use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};
use std::thread;
use uuid::Uuid;
use crate::error::InternalError;
use crate::mesh::Mesh;
use crate::network::reply::InboundRouter;
use crate::transport::Connection;
use super::{JoinHandles, OrchestratableServiceFactory, ServiceOrchestrator};
pub struct RunnableServiceOrchestrator {
pub(super) connection: Box<dyn Connection>,
pub(super) incoming_capacity: usize,
pub(super) outgoing_capacity: usize,
pub(super) channel_capacity: usize,
pub(super) service_factories: Vec<Box<dyn OrchestratableServiceFactory>>,
pub(super) supported_service_types: Vec<String>,
}
impl RunnableServiceOrchestrator {
pub fn run(self) -> Result<ServiceOrchestrator, InternalError> {
let service_factories = self.service_factories;
let supported_service_types = self.supported_service_types;
let services = Arc::new(Mutex::new(HashMap::new()));
let stopped_services = Arc::new(Mutex::new(HashMap::new()));
let mesh = Mesh::new(self.incoming_capacity, self.outgoing_capacity);
let mesh_id = format!("{}", Uuid::new_v4());
mesh.add(self.connection, mesh_id.to_string())
.map_err(|err| InternalError::from_source(Box::new(err)))?;
let running = Arc::new(AtomicBool::new(true));
let (network_sender, network_receiver) = crossbeam_channel::bounded(self.channel_capacity);
let (inbound_sender, inbound_receiver) = crossbeam_channel::bounded(self.channel_capacity);
let inbound_router = InboundRouter::new(Box::new(inbound_sender));
let incoming_mesh = mesh.clone();
let incoming_running = running.clone();
let incoming_router = inbound_router.clone();
let incoming_join_handle = thread::Builder::new()
.name("Orchestrator Incoming".into())
.spawn(move || {
if let Err(err) =
super::run_incoming_loop(incoming_mesh, incoming_running, incoming_router)
{
error!(
"Terminating orchestrator incoming thread due to error: {}",
err
);
Err(err)
} else {
Ok(())
}
})
.map_err(|err| InternalError::from_source(Box::new(err)))?;
let inbound_services = services.clone();
let inbound_running = running.clone();
let inbound_join_handle = thread::Builder::new()
.name("Orchestrator Inbound".into())
.spawn(move || {
if let Err(err) =
super::run_inbound_loop(inbound_services, inbound_receiver, inbound_running)
{
error!(
"Terminating orchestrator inbound thread due to error: {}",
err
);
Err(err)
} else {
Ok(())
}
})
.map_err(|err| InternalError::from_source(Box::new(err)))?;
let outgoing_running = running.clone();
let outgoing_join_handle = thread::Builder::new()
.name("Orchestrator Outgoing".into())
.spawn(move || {
if let Err(err) =
super::run_outgoing_loop(mesh, outgoing_running, network_receiver, mesh_id)
{
error!(
"Terminating orchestrator outgoing thread due to error: {}",
err
);
Err(err)
} else {
Ok(())
}
})
.map_err(|err| InternalError::from_source(Box::new(err)))?;
let join_handles = JoinHandles::new(vec![
incoming_join_handle,
inbound_join_handle,
outgoing_join_handle,
]);
info!("Service orchestrator started");
Ok(ServiceOrchestrator {
services,
stopped_services,
service_factories,
supported_service_types,
network_sender,
inbound_router,
running,
join_handles: Some(join_handles),
})
}
}