use crate::services::broker::MessageBroker;
use crate::services::director::ServicesDirector;
use crate::services::envelope::ServiceEnvelope;
use crate::services::service::{Service, ServiceConcurrency};
use crate::services::system_facade::ServiceAssistant;
use crate::system_director::SystemDirector;
use async_channel::{unbounded as channel, Receiver, Sender};
use async_std::sync::Mutex;
use async_std::{sync::Arc, task};
use dashmap::mapref::entry::Entry::Occupied;
use std::any::Any;
use std::any::TypeId;
use std::fmt::Debug;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::SystemTime;
#[async_trait::async_trait]
pub(crate) trait Manager: Send + Sync + Debug {
fn end(&self);
fn get_type_id(&self) -> TypeId;
async fn get_sender_as_any(&mut self) -> Box<(dyn Any + Send)>;
fn get_statistics(&self) -> ServiceReport;
fn clone(&self) -> Box<dyn Manager>;
}
pub struct ServiceReport {
pub last_message_on: SystemTime,
pub enqueued_messages: usize,
}
#[derive(Debug)]
pub(crate) enum ServiceManagerCommand<S: Service> {
Dispatch(Box<dyn ServiceEnvelope<Service = S>>),
End,
}
#[derive(Debug)]
pub(crate) struct ServiceManager<S: Service> {
senders: Arc<Vec<Sender<ServiceManagerCommand<S>>>>,
current: Arc<Mutex<usize>>,
is_ending: Arc<AtomicBool>,
active_services: Arc<AtomicUsize>,
}
impl<S: Service> ServiceManager<S> {
pub async fn new(
director: ServicesDirector,
system_director: SystemDirector,
broker: MessageBroker,
) -> ServiceManager<S> {
let system_facade = ServiceAssistant::<S>::new(system_director.clone(), broker.clone());
let (service, service_conf) = S::initialize(&system_facade).await;
let service = Arc::new(service);
let mut wait_for_service = true;
let concurrency = match service_conf.concurrency {
ServiceConcurrency::Automatic => {
if std::mem::size_of::<S>() == 0 {
wait_for_service = false;
1
} else {
num_cpus::get()
}
}
ServiceConcurrency::None => 1,
ServiceConcurrency::OnePerCore => num_cpus::get(),
ServiceConcurrency::OneEachTwoCore => num_cpus::get() / 2,
ServiceConcurrency::Fixed(quantity) => quantity,
ServiceConcurrency::Unlimited => {
wait_for_service = false;
1
}
};
let mut senders = Vec::new();
let mut receivers = Vec::new();
for _ in 0..concurrency {
let (sender, receiver) = channel::<ServiceManagerCommand<S>>();
senders.push(sender);
receivers.push(receiver);
}
let senders = Arc::new(senders);
let current = Arc::new(Mutex::new(0));
let active_services = Arc::new(AtomicUsize::new(concurrency));
let manager = ServiceManager {
senders: senders.clone(),
current,
is_ending: Arc::new(AtomicBool::new(false)),
active_services: active_services.clone(),
};
for (receiver, sender) in receivers.iter().zip(senders.as_ref()) {
service_loop(
receiver.clone(),
sender.clone(),
service.clone(),
director.clone(),
active_services.clone(),
system_director.clone(),
broker.clone(),
wait_for_service,
);
}
manager
}
pub(crate) async fn get_sender(&self) -> Sender<ServiceManagerCommand<S>> {
let current = self.get_next_sender_index().await;
match self.senders.get(current) {
Some(sender) => sender.clone(),
_ => unreachable!(),
}
}
pub(crate) async fn get_next_sender_index(&self) -> usize {
if self.senders.len() == 1 {
return 0;
}
let mut current = self.current.lock().await;
*current += 1;
if *current >= self.senders.len() {
*current = 0;
}
*current
}
fn end(&self) {
for sender in self.senders.iter() {
let sender = sender.clone();
task::spawn(async move {
let _ = sender.send(ServiceManagerCommand::End).await;
});
}
}
}
fn service_loop<S: Service>(
receiver: Receiver<ServiceManagerCommand<S>>,
sender: Sender<ServiceManagerCommand<S>>,
service: Arc<S>,
director: ServicesDirector,
active_services: Arc<AtomicUsize>,
system_director: SystemDirector,
broker: MessageBroker,
wait_for_service: bool,
) {
task::spawn(async move {
let system_facade = Arc::new(ServiceAssistant::<S>::new(system_director, broker));
loop {
if let Ok(command) = receiver.recv().await {
match command {
ServiceManagerCommand::Dispatch(envelope) => {
dispatch::<S>(&service, &system_facade, envelope, wait_for_service).await;
}
ServiceManagerCommand::End => {
match recv_until_command_or_end!(receiver, ServiceManagerCommand::End).await
{
None | Some(ServiceManagerCommand::End) => {
let entry = director
.get_blocking_manager_entry(std::any::TypeId::of::<S>());
match recv_until_command_or_end!(
receiver,
ServiceManagerCommand::End
)
.await
{
Some(ServiceManagerCommand::Dispatch(envelope)) => {
drop(entry);
let _ = sender.send(ServiceManagerCommand::End).await;
dispatch::<S>(
&service,
&system_facade,
envelope,
wait_for_service,
)
.await;
}
None | Some(ServiceManagerCommand::End) => {
let previously_active =
active_services.fetch_sub(1, Ordering::Relaxed);
if let Occupied(entry) = entry {
if previously_active <= 1 {
entry.remove();
director.signal_manager_removed().await;
}
}
break;
}
}
}
Some(ServiceManagerCommand::Dispatch(envelope)) => {
let _ = sender.send(ServiceManagerCommand::End).await;
dispatch::<S>(&service, &system_facade, envelope, wait_for_service)
.await;
}
}
}
}
}
}
});
}
async fn dispatch<'a, S: Service>(
service: &'a Arc<S>,
system_facade: &'a Arc<ServiceAssistant<S>>,
mut envelope: Box<dyn ServiceEnvelope<Service = S>>,
wait_for_service: bool,
) {
if wait_for_service {
envelope.dispatch(&service, &system_facade).await;
} else {
let service = service.clone();
let system_facade = system_facade.clone();
task::spawn(async move { envelope.dispatch(&service, &system_facade).await });
}
}
#[async_trait::async_trait]
impl<S: Service> Manager for ServiceManager<S> {
async fn get_sender_as_any(&mut self) -> Box<(dyn Any + Send + 'static)> {
Box::new(self.get_sender().await)
}
fn get_type_id(&self) -> TypeId {
std::any::TypeId::of::<S>()
}
fn end(&self) {
self.end();
}
fn get_statistics(&self) -> ServiceReport {
ServiceReport {
last_message_on: SystemTime::now(),
enqueued_messages: 10000,
}
}
fn clone(&self) -> Box<dyn Manager> {
Box::new(Clone::clone(self))
}
}
impl<S: Service> Clone for ServiceManager<S> {
fn clone(&self) -> ServiceManager<S> {
ServiceManager {
senders: self.senders.clone(),
current: self.current.clone(),
is_ending: self.is_ending.clone(),
active_services: self.active_services.clone(),
}
}
}