pub mod error;
mod factory;
mod processor;
mod registry;
#[cfg(feature = "rest-api")]
mod rest_api;
pub mod scabbard;
mod sender;
#[cfg(feature = "service-arg-validation")]
pub mod validation;
use std::any::Any;
pub use factory::ServiceFactory;
pub use processor::JoinHandles;
pub use processor::ServiceProcessor;
pub use processor::ShutdownHandle;
pub use registry::StandardServiceNetworkRegistry;
pub use error::{
FactoryCreateError, ServiceConnectionError, ServiceDestroyError, ServiceDisconnectionError,
ServiceError, ServiceProcessorError, ServiceSendError, ServiceStartError, ServiceStopError,
};
/// Describes where a service message came from.
///
/// A context is handed to [`Service::handle_message`] together with the raw message bytes, and
/// is passed to [`ServiceNetworkSender::reply`] as the origin of the message being answered.
///
/// Contexts compare by value (`PartialEq`/`Eq`), so recorded contexts can be checked with
/// `assert_eq!`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ServiceMessageContext {
    /// Sender of the message.
    /// NOTE(review): presumably the id of the sending service -- confirm against the producer.
    pub sender: String,
    /// Circuit associated with the message.
    /// NOTE(review): presumably a circuit id -- confirm against the producer.
    pub circuit: String,
    /// Correlation id carried by the message.
    /// NOTE(review): presumably used to pair a reply with its request -- confirm.
    pub correlation_id: String,
}
/// Registry through which a service is connected to, and disconnected from, the service network.
///
/// A registry is what [`Service::start`] and [`Service::stop`] receive. Implementations must be
/// `Send`.
pub trait ServiceNetworkRegistry: Send {
/// Connects the service identified by `service_id`.
///
/// On success, returns a boxed [`ServiceNetworkSender`] for the caller to use.
///
/// # Errors
///
/// Returns a [`ServiceConnectionError`] if the service could not be connected.
fn connect(
&self,
service_id: &str,
) -> Result<Box<dyn ServiceNetworkSender>, ServiceConnectionError>;
/// Disconnects the service identified by `service_id`.
///
/// # Errors
///
/// Returns a [`ServiceDisconnectionError`] if the service could not be disconnected.
fn disconnect(&self, service_id: &str) -> Result<(), ServiceDisconnectionError>;
}
/// Handle a service uses to send messages; obtained from [`ServiceNetworkRegistry::connect`].
///
/// Implementations must be `Send`. Boxed senders are cloneable through [`clone_box`], which the
/// `Clone` implementation for `Box<dyn ServiceNetworkSender>` in this module calls.
///
/// [`clone_box`]: ServiceNetworkSender::clone_box
pub trait ServiceNetworkSender: Send {
/// Sends the bytes in `message` to `recipient`.
///
/// # Errors
///
/// Returns a [`ServiceSendError`] if the message could not be sent.
fn send(&self, recipient: &str, message: &[u8]) -> Result<(), ServiceSendError>;
/// Sends the bytes in `message` to `recipient` and returns bytes on success.
///
/// NOTE(review): the returned bytes are presumably the recipient's response, obtained by
/// waiting for it -- confirm against the concrete implementations.
///
/// # Errors
///
/// Returns a [`ServiceSendError`] on failure.
fn send_and_await(&self, recipient: &str, message: &[u8]) -> Result<Vec<u8>, ServiceSendError>;
/// Sends the bytes in `message` as a reply to the message described by `message_origin`.
///
/// # Errors
///
/// Returns a [`ServiceSendError`] if the reply could not be sent.
fn reply(
&self,
message_origin: &ServiceMessageContext,
message: &[u8],
) -> Result<(), ServiceSendError>;
/// Returns a boxed copy of this sender.
///
/// This exists so that `Box<dyn ServiceNetworkSender>` can implement `Clone`.
fn clone_box(&self) -> Box<dyn ServiceNetworkSender>;
}
impl Clone for Box<dyn ServiceNetworkSender> {
fn clone(&self) -> Self {
self.clone_box()
}
}
/// A service that can be started against a [`ServiceNetworkRegistry`] and handed messages.
///
/// Implementations must be `Send`.
pub trait Service: Send {
/// Returns this service's id.
fn service_id(&self) -> &str;
/// Returns this service's type.
fn service_type(&self) -> &str;
/// Starts the service, giving it access to `service_registry`.
///
/// The shared test helper `test_connect_and_disconnect` in this module expects the service's id
/// to be connected in the registry once this returns successfully.
///
/// # Errors
///
/// Returns a [`ServiceStartError`] if the service could not be started.
fn start(
&mut self,
service_registry: &dyn ServiceNetworkRegistry,
) -> Result<(), ServiceStartError>;
/// Stops the service, giving it access to `service_registry`.
///
/// The shared test helper `test_connect_and_disconnect` in this module expects the service to
/// no longer be connected in the registry once this returns successfully.
///
/// # Errors
///
/// Returns a [`ServiceStopError`] if the service could not be stopped.
fn stop(
&mut self,
service_registry: &dyn ServiceNetworkRegistry,
) -> Result<(), ServiceStopError>;
/// Consumes the boxed service and destroys it.
///
/// # Errors
///
/// Returns a [`ServiceDestroyError`] if the service could not be destroyed.
fn destroy(self: Box<Self>) -> Result<(), ServiceDestroyError>;
/// Handles one message, given as raw bytes together with the context describing it.
///
/// # Errors
///
/// Returns a [`ServiceError`] if the message could not be handled.
fn handle_message(
&self,
message_bytes: &[u8],
message_context: &ServiceMessageContext,
) -> Result<(), ServiceError>;
/// Returns the service as `&dyn Any`, which allows downcasting to the concrete type.
fn as_any(&self) -> &dyn Any;
}
#[cfg(test)]
pub mod tests {
use super::*;
use std::collections::HashSet;
use std::error::Error;
use std::sync::{Arc, Mutex};
/// String-backed error type available to tests; the wrapped message is its entire display text.
#[derive(Debug)]
pub struct MockServiceNetworkRegistryError(pub String);

// No underlying cause is tracked: the trait's provided `source` already yields `None`.
impl Error for MockServiceNetworkRegistryError {}

impl std::fmt::Display for MockServiceNetworkRegistryError {
    /// Writes the wrapped message verbatim, ignoring any width or fill flags.
    fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
        formatter.write_str(&self.0)
    }
}
/// In-memory [`ServiceNetworkRegistry`] for tests.
///
/// Tracks which service ids are connected and owns one [`MockServiceNetworkSender`]; the
/// registry's `connect` implementation hands out clones of that sender.
pub struct MockServiceNetworkRegistry {
    /// Ids of the services currently connected through this registry.
    pub connected_ids: Arc<Mutex<HashSet<String>>>,
    /// Sender whose clones are returned from `connect`.
    network_sender: MockServiceNetworkSender,
}

impl MockServiceNetworkRegistry {
    /// Creates a registry with no connected services and a fresh mock sender.
    pub fn new() -> Self {
        Self {
            connected_ids: Arc::new(Mutex::new(HashSet::new())),
            network_sender: MockServiceNetworkSender::new(),
        }
    }

    /// Returns the sender owned by this registry, so tests can inspect what was recorded.
    pub fn network_sender(&self) -> &MockServiceNetworkSender {
        &self.network_sender
    }
}

/// `Default` is equivalent to [`MockServiceNetworkRegistry::new`].
impl Default for MockServiceNetworkRegistry {
    fn default() -> Self {
        Self::new()
    }
}
impl ServiceNetworkRegistry for MockServiceNetworkRegistry {
    /// Records `service_id` as connected and returns a clone of the registry's mock sender.
    ///
    /// # Errors
    ///
    /// Returns `ServiceConnectionError::RejectedError` if the id is already recorded.
    ///
    /// # Panics
    ///
    /// Panics if the `connected_ids` lock is poisoned.
    fn connect(
        &self,
        service_id: &str,
    ) -> Result<Box<dyn ServiceNetworkSender>, ServiceConnectionError> {
        // `insert` reports whether the id was absent before this call.
        let newly_connected = self
            .connected_ids
            .lock()
            .expect("connected_ids lock poisoned")
            .insert(service_id.to_owned());

        if !newly_connected {
            return Err(ServiceConnectionError::RejectedError(format!(
                "service with id {} already connected",
                service_id
            )));
        }

        let sender: Box<dyn ServiceNetworkSender> = Box::new(self.network_sender.clone());
        Ok(sender)
    }

    /// Removes `service_id` from the set of connected ids.
    ///
    /// # Errors
    ///
    /// Returns `ServiceDisconnectionError::RejectedError` if the id was not recorded.
    ///
    /// # Panics
    ///
    /// Panics if the `connected_ids` lock is poisoned.
    fn disconnect(&self, service_id: &str) -> Result<(), ServiceDisconnectionError> {
        // `remove` reports whether the id was present before this call.
        let was_connected = self
            .connected_ids
            .lock()
            .expect("connected_ids lock poisoned")
            .remove(service_id);

        if !was_connected {
            return Err(ServiceDisconnectionError::RejectedError(format!(
                "service with id {} not connected",
                service_id
            )));
        }

        Ok(())
    }
}
/// Test double for [`ServiceNetworkSender`] that records calls in shared lists.
///
/// Every list sits behind an `Arc<Mutex<..>>`, so a clone of the sender shares the same
/// recordings as the value it was cloned from.
#[derive(Clone, Debug, Default)]
pub struct MockServiceNetworkSender {
    /// `(recipient, message)` pairs passed to `send`.
    pub sent: Arc<Mutex<Vec<(String, Vec<u8>)>>>,
    /// `(recipient, message)` pairs passed to `send_and_await`.
    pub sent_and_awaited: Arc<Mutex<Vec<(String, Vec<u8>)>>>,
    /// `(origin, message)` pairs passed to `reply`.
    pub replied: Arc<Mutex<Vec<(ServiceMessageContext, Vec<u8>)>>>,
}

impl MockServiceNetworkSender {
    /// Creates a sender with nothing recorded; equivalent to `Default::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
impl ServiceNetworkSender for MockServiceNetworkSender {
    /// Appends `(recipient, message)` to `sent` and reports success.
    ///
    /// # Panics
    ///
    /// Panics if the `sent` lock is poisoned.
    fn send(&self, recipient: &str, message: &[u8]) -> Result<(), ServiceSendError> {
        let mut recorded = self.sent.lock().expect("sent lock poisoned");
        recorded.push((recipient.to_owned(), message.to_owned()));
        Ok(())
    }

    /// Appends `(recipient, message)` to `sent_and_awaited` and returns an empty response.
    ///
    /// # Panics
    ///
    /// Panics if the `sent_and_awaited` lock is poisoned.
    fn send_and_await(
        &self,
        recipient: &str,
        message: &[u8],
    ) -> Result<Vec<u8>, ServiceSendError> {
        let mut recorded = self
            .sent_and_awaited
            .lock()
            .expect("sent_and_awaited lock poisoned");
        recorded.push((recipient.to_owned(), message.to_owned()));
        Ok(Vec::new())
    }

    /// Appends a copy of `message_origin` together with `message` to `replied`.
    ///
    /// # Panics
    ///
    /// Panics if the `replied` lock is poisoned.
    fn reply(
        &self,
        message_origin: &ServiceMessageContext,
        message: &[u8],
    ) -> Result<(), ServiceSendError> {
        let mut recorded = self.replied.lock().expect("replied lock poisoned");
        recorded.push((message_origin.clone(), message.to_owned()));
        Ok(())
    }

    /// Boxes a clone of this sender; the clone shares the same recorded lists.
    fn clone_box(&self) -> Box<dyn ServiceNetworkSender> {
        let copy: MockServiceNetworkSender = self.clone();
        Box::new(copy)
    }
}
/// Shared check that a service connects on start and disconnects on stop.
///
/// Starts `service` against a fresh [`MockServiceNetworkRegistry`], asserts that the service's
/// id is recorded as connected, then stops it and asserts that no ids remain connected.
///
/// # Panics
///
/// Panics if starting or stopping the service fails, if the registry's lock is poisoned, or if
/// either assertion does not hold.
pub fn test_connect_and_disconnect(service: &mut dyn Service) {
    let registry = MockServiceNetworkRegistry::new();

    service.start(&registry).expect("failed to start engine");
    assert!(registry
        .connected_ids
        .lock()
        .expect("connected_ids lock poisoned")
        .contains(service.service_id()));

    service.stop(&registry).expect("failed to stop engine");
    // The registry was created empty above, so nothing may be left once the service has stopped.
    assert!(registry
        .connected_ids
        .lock()
        .expect("connected_ids lock poisoned")
        .is_empty());
}
}