mod builder;
mod error;
#[cfg(feature = "rest-api-actix-web-1")]
mod rest_api;
mod runnable;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use crossbeam_channel::{Receiver, Sender};
use protobuf::Message;
use crate::channel;
use crate::error::InternalError;
use crate::mesh::{Envelope, Mesh, RecvTimeoutError as MeshRecvTimeoutError, SendError};
use crate::network::reply::InboundRouter;
use crate::protocol::network::NetworkMessage;
use crate::protos::circuit::{
AdminDirectMessage, CircuitDirectMessage, CircuitError, CircuitMessage, CircuitMessageType,
ServiceConnectResponse, ServiceDisconnectResponse,
};
use crate::protos::prelude::*;
use crate::service::{
FactoryCreateError, Service, ServiceFactory, ServiceMessageContext,
StandardServiceNetworkRegistry,
};
use crate::threading::lifecycle::ShutdownHandle;
use crate::transport::Connection;
pub use self::builder::ServiceOrchestratorBuilder;
pub use self::error::{
AddServiceError, InitializeServiceError, ListServicesError, NewOrchestratorError,
OrchestratorError, ShutdownServiceError,
};
pub use self::runnable::RunnableServiceOrchestrator;
/// Timeout, in seconds, passed to `recv_timeout` by the incoming, inbound, and outgoing loops.
/// It bounds how long a loop can block before re-checking its `running` flag.
const TIMEOUT_SEC: u64 = 2;
/// Identifies a service managed by the orchestrator.
///
/// The triple is used as the key for the orchestrator's running- and stopped-service maps.
#[derive(Clone, Eq, Hash, PartialEq, Debug)]
pub struct ServiceDefinition {
    /// ID of the circuit the service belongs to.
    pub circuit: String,
    /// ID of the service within its circuit.
    pub service_id: String,
    /// Type name used to select the factory that creates the service.
    pub service_type: String,
}

impl std::fmt::Display for ServiceDefinition {
    /// Renders the definition as `circuit::service_id (service_type)`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let ServiceDefinition {
            circuit,
            service_id,
            service_type,
        } = self;
        write!(f, "{}::{} ({})", circuit, service_id, service_type)
    }
}
/// A [`Service`] that the orchestrator can hold and duplicate as a boxed trait object.
pub trait OrchestratableService: Service {
    /// Returns a boxed clone of this service; this is what `Box<dyn OrchestratableService>`
    /// uses to implement `Clone`.
    fn clone_box(&self) -> Box<dyn OrchestratableService>;

    /// Returns this value viewed as a plain `&dyn Service`.
    fn as_service(&self) -> &dyn Service;
}

/// Boxed orchestratable services are cloneable by delegating to `clone_box` on the inner
/// trait object.
impl Clone for Box<dyn OrchestratableService> {
    fn clone(&self) -> Self {
        let inner: &dyn OrchestratableService = &**self;
        inner.clone_box()
    }
}
/// A [`ServiceFactory`] that can create services the orchestrator is able to manage.
pub trait OrchestratableServiceFactory: ServiceFactory {
/// Create a new orchestratable service.
///
/// The orchestrator calls this with the service ID, service type, and circuit ID taken from a
/// `ServiceDefinition`, plus the caller-supplied `args` (their meaning is factory-specific).
///
/// # Errors
///
/// Returns a `FactoryCreateError` if the service could not be created.
fn create_orchestratable_service(
&self,
service_id: String,
service_type: &str,
circuit_id: &str,
args: HashMap<String, String>,
) -> Result<Box<dyn OrchestratableService>, FactoryCreateError>;
}
/// A running service paired with the network registry it was started with.
///
/// The same registry is handed back to the service's `stop` call when it is shut down.
struct ManagedService {
/// The running service.
pub service: Box<dyn OrchestratableService>,
/// Registry passed to `start` when the service was initialized.
pub registry: StandardServiceNetworkRegistry,
}
/// Creates, runs, stops, and purges services on behalf of a node.
///
/// Each service is created by the first factory that supports its type.
pub struct ServiceOrchestrator {
/// Running services, keyed by their definition; shared with the shutdown handle.
services: Arc<Mutex<HashMap<ServiceDefinition, ManagedService>>>,
/// Factories searched, in order, for one supporting a requested service type.
service_factories: Vec<Box<dyn OrchestratableServiceFactory>>,
/// Returned by `supported_service_types`; presumably the union of the factories' types
/// (populated outside this file — verify in the builder).
supported_service_types: Vec<String>,
/// Sender for outgoing message bytes; a clone is given to each service's registry.
network_sender: Sender<Vec<u8>>,
/// Router for reply messages; a clone is given to each service's registry.
inbound_router: InboundRouter<CircuitMessageType>,
/// Services that are stopped (or added without being started) and may later be purged.
stopped_services: Arc<Mutex<HashMap<ServiceDefinition, Box<dyn OrchestratableService>>>>,
/// Shared flag the background loops poll; cleared when the orchestrator shuts down.
running: Arc<AtomicBool>,
/// Handles of the background threads; moved out by `take_shutdown_handle`.
join_handles: Option<JoinHandles<Result<(), OrchestratorError>>>,
}
impl ServiceOrchestrator {
#[deprecated(
since = "0.5.1",
note = "please use `ServiceOrchestratorBuilder` instead"
)]
pub fn new(
service_factories: Vec<Box<dyn OrchestratableServiceFactory>>,
connection: Box<dyn Connection>,
incoming_capacity: usize,
outgoing_capacity: usize,
channel_capacity: usize,
) -> Result<Self, NewOrchestratorError> {
let mut builder = builder::ServiceOrchestratorBuilder::new()
.with_connection(connection)
.with_incoming_capacity(incoming_capacity)
.with_outgoing_capacity(outgoing_capacity)
.with_channel_capacity(channel_capacity);
for service_factory in service_factories.into_iter() {
builder = builder.with_service_factory(service_factory);
}
builder
.build()
.map_err(|e| NewOrchestratorError(Box::new(e)))?
.run()
.map_err(|e| NewOrchestratorError(Box::new(e)))
}
pub fn take_shutdown_handle(&mut self) -> Option<ServiceOrchestratorShutdownHandle> {
let join_handles = self.join_handles.take()?;
Some(ServiceOrchestratorShutdownHandle {
services: Arc::clone(&self.services),
join_handles: Some(join_handles),
running: Arc::clone(&self.running),
})
}
pub fn initialize_service(
&self,
service_definition: ServiceDefinition,
args: HashMap<String, String>,
) -> Result<(), InitializeServiceError> {
let factory = self
.service_factories
.iter()
.find(|factory| {
factory
.available_service_types()
.contains(&service_definition.service_type)
})
.ok_or(InitializeServiceError::UnknownType)?;
let mut service = factory.create_orchestratable_service(
service_definition.service_id.clone(),
service_definition.service_type.as_str(),
service_definition.circuit.as_str(),
args,
)?;
let registry = StandardServiceNetworkRegistry::new(
service_definition.circuit.clone(),
self.network_sender.clone(),
self.inbound_router.clone(),
);
service
.start(®istry)
.map_err(|err| InitializeServiceError::InitializationFailed(Box::new(err)))?;
self.services
.lock()
.map_err(|_| InitializeServiceError::LockPoisoned)?
.insert(service_definition, ManagedService { service, registry });
Ok(())
}
pub fn stop_service(
&self,
service_definition: &ServiceDefinition,
) -> Result<(), ShutdownServiceError> {
let ManagedService {
mut service,
registry,
} = self
.services
.lock()
.map_err(|_| ShutdownServiceError::LockPoisoned)?
.remove(service_definition)
.ok_or(ShutdownServiceError::UnknownService)?;
service.stop(®istry).map_err(|err| {
ShutdownServiceError::ShutdownFailed((service_definition.clone(), Box::new(err)))
})?;
self.stopped_services
.lock()
.map_err(|_| ShutdownServiceError::LockPoisoned)?
.insert(service_definition.clone(), service);
Ok(())
}
pub fn purge_service(
&self,
service_definition: &ServiceDefinition,
) -> Result<(), InternalError> {
if let Some(mut service) = self
.stopped_services
.lock()
.map_err(|_| {
InternalError::with_message("Orchestrator stopped service lock was poisoned".into())
})?
.remove(service_definition)
{
service.purge()
} else {
Ok(())
}
}
pub fn shutdown_all_services(&self) -> Result<(), ShutdownServiceError> {
let mut services = self
.services
.lock()
.map_err(|_| ShutdownServiceError::LockPoisoned)?;
for (service_definition, managed_service) in services.drain() {
let ManagedService {
mut service,
registry,
} = managed_service;
service.stop(®istry).map_err(|err| {
ShutdownServiceError::ShutdownFailed((service_definition.clone(), Box::new(err)))
})?;
service.destroy().map_err(|err| {
ShutdownServiceError::ShutdownFailed((service_definition, Box::new(err)))
})?;
}
self.running.store(false, Ordering::SeqCst);
Ok(())
}
pub fn list_services(
&self,
circuits: Vec<String>,
service_types: Vec<String>,
) -> Result<Vec<ServiceDefinition>, ListServicesError> {
Ok(self
.services
.lock()
.map_err(|_| ListServicesError::LockPoisoned)?
.iter()
.filter_map(|(service, _)| {
if (circuits.is_empty() || circuits.contains(&service.circuit))
&& (service_types.is_empty() || service_types.contains(&service.service_type))
{
Some(service)
} else {
None
}
})
.cloned()
.collect())
}
pub fn add_stopped_service(
&self,
service_definition: ServiceDefinition,
args: HashMap<String, String>,
) -> Result<(), AddServiceError> {
let factory = self
.service_factories
.iter()
.find(|factory| {
factory
.available_service_types()
.contains(&service_definition.service_type)
})
.ok_or(AddServiceError::UnknownType)?;
let service = factory.create_orchestratable_service(
service_definition.service_id.clone(),
service_definition.service_type.as_str(),
service_definition.circuit.as_str(),
args,
)?;
self.stopped_services
.lock()
.map_err(|_| AddServiceError::LockPoisoned)?
.insert(service_definition, service);
Ok(())
}
pub fn supported_service_types(&self) -> &[String] {
&self.supported_service_types
}
}
/// A group of thread join handles that are joined together.
pub struct JoinHandles<T> {
    join_handles: Vec<JoinHandle<T>>,
}

impl<T> JoinHandles<T> {
    /// Wraps the given join handles.
    fn new(join_handles: Vec<JoinHandle<T>>) -> Self {
        Self { join_handles }
    }

    /// Join every thread and return their results in the order the handles were given.
    ///
    /// Every handle is joined even if an earlier thread panicked. Otherwise the remaining
    /// handles would be dropped, detaching threads that may still be running when the caller
    /// believes shutdown is complete.
    ///
    /// # Errors
    ///
    /// Returns the panic payload of the first thread, in handle order, that panicked.
    pub fn join_all(self) -> thread::Result<Vec<T>> {
        let mut results = Vec::with_capacity(self.join_handles.len());
        let mut first_panic = None;
        for handle in self.join_handles {
            match handle.join() {
                Ok(value) => results.push(value),
                Err(payload) => {
                    if first_panic.is_none() {
                        first_panic = Some(payload);
                    }
                }
            }
        }
        match first_panic {
            Some(payload) => Err(payload),
            None => Ok(results),
        }
    }
}
/// Handle returned by `ServiceOrchestrator::take_shutdown_handle` that shuts down the
/// orchestrator's running services and waits for its background threads.
pub struct ServiceOrchestratorShutdownHandle {
/// Running services, shared with the orchestrator.
services: Arc<Mutex<HashMap<ServiceDefinition, ManagedService>>>,
/// Background thread handles; taken when waiting for shutdown.
join_handles: Option<JoinHandles<Result<(), OrchestratorError>>>,
/// Flag polled by the background loops; cleared to make them exit.
running: Arc<AtomicBool>,
}
impl ShutdownHandle for ServiceOrchestratorShutdownHandle {
fn signal_shutdown(&mut self) {
match self.services.lock() {
Ok(mut services) => {
for (service_definition, managed_service) in services.drain() {
let ManagedService {
mut service,
registry,
} = managed_service;
if let Err(err) = service.stop(®istry) {
error!("Unable to stop service {}: {}", service_definition, err);
}
if let Err(err) = service.destroy() {
error!("Unable to destroy service {}: {}", service_definition, err);
}
}
}
Err(_) => {
error!("Service orchestrator service lock was poisoned; unable to cleanly shutdown")
}
}
self.running.store(false, Ordering::SeqCst);
}
fn wait_for_shutdown(mut self) -> Result<(), InternalError> {
if let Some(join_handles) = self.join_handles.take() {
match join_handles.join_all() {
Ok(results) => {
results
.into_iter()
.filter(Result::is_err)
.map(Result::unwrap_err)
.for_each(|err| {
error!("{}", err);
});
}
Err(_) => {
return Err(crate::error::InternalError::with_message(
"Unable to join service processor threads".into(),
));
}
}
}
Ok(())
}
}
fn run_incoming_loop(
incoming_mesh: Mesh,
incoming_running: Arc<AtomicBool>,
mut inbound_router: InboundRouter<CircuitMessageType>,
) -> Result<(), OrchestratorError> {
while incoming_running.load(Ordering::SeqCst) {
let timeout = Duration::from_secs(TIMEOUT_SEC);
let message_bytes = match incoming_mesh.recv_timeout(timeout) {
Ok(envelope) => Vec::from(envelope),
Err(MeshRecvTimeoutError::Timeout) => continue,
Err(MeshRecvTimeoutError::Disconnected) => {
error!("Mesh Disconnected");
break;
}
Err(MeshRecvTimeoutError::PoisonedLock) => {
error!("Mesh lock was poisoned");
break;
}
Err(MeshRecvTimeoutError::Shutdown) => {
error!("Mesh has shutdown");
break;
}
};
let msg = NetworkMessage::from_bytes(&message_bytes)
.map_err(|err| OrchestratorError::Internal(Box::new(err)))?;
match msg {
NetworkMessage::Circuit(payload) => {
let mut circuit_msg: CircuitMessage = Message::parse_from_bytes(&payload)
.map_err(|err| OrchestratorError::Internal(Box::new(err)))?;
match circuit_msg.get_message_type() {
CircuitMessageType::ADMIN_DIRECT_MESSAGE => {
let admin_direct_message: AdminDirectMessage =
Message::parse_from_bytes(circuit_msg.get_payload())
.map_err(|err| OrchestratorError::Internal(Box::new(err)))?;
inbound_router
.route(
admin_direct_message.get_correlation_id(),
Ok((
CircuitMessageType::ADMIN_DIRECT_MESSAGE,
circuit_msg.take_payload(),
)),
)
.map_err(|err| OrchestratorError::Internal(Box::new(err)))?;
}
CircuitMessageType::CIRCUIT_DIRECT_MESSAGE => {
let direct_message: CircuitDirectMessage =
Message::parse_from_bytes(circuit_msg.get_payload())
.map_err(|err| OrchestratorError::Internal(Box::new(err)))?;
inbound_router
.route(
direct_message.get_correlation_id(),
Ok((
CircuitMessageType::CIRCUIT_DIRECT_MESSAGE,
circuit_msg.take_payload(),
)),
)
.map_err(|err| OrchestratorError::Internal(Box::new(err)))?;
}
CircuitMessageType::SERVICE_CONNECT_RESPONSE => {
let response: ServiceConnectResponse =
Message::parse_from_bytes(circuit_msg.get_payload())
.map_err(|err| OrchestratorError::Internal(Box::new(err)))?;
inbound_router
.route(
response.get_correlation_id(),
Ok((
CircuitMessageType::SERVICE_CONNECT_RESPONSE,
circuit_msg.take_payload(),
)),
)
.map_err(|err| OrchestratorError::Internal(Box::new(err)))?;
}
CircuitMessageType::SERVICE_DISCONNECT_RESPONSE => {
let response: ServiceDisconnectResponse =
Message::parse_from_bytes(circuit_msg.get_payload())
.map_err(|err| OrchestratorError::Internal(Box::new(err)))?;
inbound_router
.route(
response.get_correlation_id(),
Ok((
CircuitMessageType::SERVICE_DISCONNECT_RESPONSE,
circuit_msg.take_payload(),
)),
)
.map_err(|err| OrchestratorError::Internal(Box::new(err)))?;
}
CircuitMessageType::CIRCUIT_ERROR_MESSAGE => {
let response: CircuitError =
Message::parse_from_bytes(circuit_msg.get_payload())
.map_err(|err| OrchestratorError::Internal(Box::new(err)))?;
warn!("Received circuit error message {:?}", response);
}
msg_type => warn!("Received unimplemented message: {:?}", msg_type),
}
}
NetworkMessage::NetworkHeartbeat(_) => trace!("Received network heartbeat"),
_ => warn!("Received unimplemented message"),
}
}
Ok(())
}
fn run_inbound_loop(
services: Arc<Mutex<HashMap<ServiceDefinition, ManagedService>>>,
inbound_receiver: Receiver<Result<(CircuitMessageType, Vec<u8>), channel::RecvError>>,
inbound_running: Arc<AtomicBool>,
) -> Result<(), OrchestratorError> {
let timeout = Duration::from_secs(TIMEOUT_SEC);
while inbound_running.load(Ordering::SeqCst) {
let service_message = match inbound_receiver.recv_timeout(timeout) {
Ok(msg) => msg,
Err(crossbeam_channel::RecvTimeoutError::Timeout) => continue,
Err(err) => {
debug!("inbound sender dropped; ending inbound message thread");
return Err(OrchestratorError::Internal(Box::new(err)));
}
}
.map_err(|err| OrchestratorError::Internal(Box::new(err)))?;
match service_message {
(CircuitMessageType::ADMIN_DIRECT_MESSAGE, msg) => {
let mut admin_direct_message: AdminDirectMessage = Message::parse_from_bytes(&msg)
.map_err(|err| OrchestratorError::Internal(Box::new(err)))?;
let services = services
.lock()
.map_err(|_| OrchestratorError::LockPoisoned)?;
match services.iter().find_map(|(service_def, managed_service)| {
if service_def.circuit == admin_direct_message.get_circuit()
&& service_def.service_id == admin_direct_message.get_recipient()
{
Some(&managed_service.service)
} else {
None
}
}) {
Some(service) => {
let msg_context = ServiceMessageContext {
sender: admin_direct_message.take_sender(),
circuit: admin_direct_message.take_circuit(),
correlation_id: admin_direct_message.take_correlation_id(),
};
if let Err(err) =
service.handle_message(admin_direct_message.get_payload(), &msg_context)
{
error!("unable to handle admin direct message: {}", err);
}
}
None => warn!(
"Service with id {} does not exist on circuit {}; ignoring message",
admin_direct_message.get_recipient(),
admin_direct_message.get_circuit(),
),
}
}
(CircuitMessageType::CIRCUIT_DIRECT_MESSAGE, msg) => {
let mut circuit_direct_message: CircuitDirectMessage =
Message::parse_from_bytes(&msg)
.map_err(|err| OrchestratorError::Internal(Box::new(err)))?;
let services = services
.lock()
.map_err(|_| OrchestratorError::LockPoisoned)?;
match services.iter().find_map(|(service_def, managed_service)| {
if service_def.circuit == circuit_direct_message.get_circuit()
&& service_def.service_id == circuit_direct_message.get_recipient()
{
Some(&managed_service.service)
} else {
None
}
}) {
Some(service) => {
let msg_context = ServiceMessageContext {
sender: circuit_direct_message.take_sender(),
circuit: circuit_direct_message.take_circuit(),
correlation_id: circuit_direct_message.take_correlation_id(),
};
if let Err(err) = service
.handle_message(circuit_direct_message.get_payload(), &msg_context)
{
error!("unable to handle direct message: {}", err);
}
}
None => warn!(
"Service with id {} does not exist on circuit {}; ignoring message",
circuit_direct_message.get_recipient(),
circuit_direct_message.get_circuit(),
),
}
}
(msg_type, _) => warn!(
"Received message ({:?}) that does not have a correlation id",
msg_type
),
}
}
Ok(())
}
fn run_outgoing_loop(
outgoing_mesh: Mesh,
outgoing_running: Arc<AtomicBool>,
outgoing_receiver: Receiver<Vec<u8>>,
mesh_id: String,
) -> Result<(), OrchestratorError> {
while outgoing_running.load(Ordering::SeqCst) {
let timeout = Duration::from_secs(TIMEOUT_SEC);
let message_bytes = match outgoing_receiver.recv_timeout(timeout) {
Ok(msg) => msg,
Err(crossbeam_channel::RecvTimeoutError::Timeout) => continue,
Err(err) => {
error!("channel dropped while handling outgoing messages: {}", err);
break;
}
};
match outgoing_mesh.send(Envelope::new(mesh_id.to_string(), message_bytes)) {
Ok(()) => (),
Err(SendError::Full(_)) => error!("Unable to send outgoing message, queue full"),
Err(err) => return Err(OrchestratorError::Internal(Box::new(err))),
}
}
Ok(())
}