use std::{
any::Any,
fmt::{self, Debug, Formatter},
sync::{Arc, RwLock},
};
use futures::{future::BoxFuture, Future, FutureExt};
use tower::Service;
mod shutdown;
use crate::{
error::BoxDynError,
executor::Executor,
request::Request,
worker::{Context, Event, Ready, Worker},
Backend,
};
use self::shutdown::Shutdown;
/// Holds a set of registered workers together with the executor and the
/// shared context they are run with.
///
/// `E` is the executor type; it is stored as-is and handed back by
/// `Monitor::executor`.
pub struct Monitor<E> {
/// Workers registered so far; every one of them is started by `run`.
workers: Vec<Worker<Context<E>>>,
/// Executor instance returned by `executor()`.
executor: E,
/// Shared state: the optional event handler and the shutdown handle.
context: MonitorContext,
/// Optional extra future that `run` waits on; set through `with_terminator`.
terminator: Option<BoxFuture<'static, ()>>,
}
/// State a monitor shares with anything holding a clone of this context.
///
/// Clones share the same event-handler slot because it sits behind an `Arc`.
/// NOTE(review): `Shutdown` is cloned as well — presumably its clones share
/// state too; confirm in the `shutdown` module.
#[derive(Clone)]
pub struct MonitorContext {
/// Slot for the optional callback invoked by `notify` for each worker event.
#[allow(clippy::type_complexity)]
event_handler: Arc<RwLock<Option<Box<dyn Fn(Worker<Event>) + Send + Sync>>>>,
/// Shutdown handle exposed through `shutdown()`.
shutdown: Shutdown,
}
impl fmt::Debug for MonitorContext {
    /// Formats the context without needing the handler or `Shutdown` to be `Debug`.
    ///
    /// `events` shows the `TypeId` of the handler slot; `shutdown` is a fixed
    /// placeholder string.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let handler_type = self.event_handler.type_id();
        let mut out = f.debug_struct("MonitorContext");
        out.field("events", &handler_type);
        out.field("shutdown", &"[Shutdown]");
        out.finish()
    }
}
impl MonitorContext {
    /// Builds a context with an empty event-handler slot and a new `Shutdown`.
    fn new() -> MonitorContext {
        let event_handler = Arc::default();
        let shutdown = Shutdown::new();
        Self {
            event_handler,
            shutdown,
        }
    }

    /// Returns a reference to the shutdown handle held by this context.
    pub fn shutdown(&self) -> &Shutdown {
        &self.shutdown
    }

    /// Passes `event` to the registered event handler, if there is one.
    ///
    /// Nothing happens when no handler is installed or when the handler lock
    /// cannot be read (it is poisoned); the event is simply dropped.
    pub fn notify(&self, event: Worker<Event>) {
        if let Ok(slot) = self.event_handler.read() {
            if let Some(handler) = &*slot {
                handler(event);
            }
        }
    }
}
impl<E> Debug for Monitor<E> {
    /// Formats the monitor without requiring `E` to be `Debug`.
    ///
    /// The executor is shown by its type name and the shutdown entry is a
    /// fixed placeholder string; the workers are formatted with their own
    /// `Debug` output.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let executor_name = std::any::type_name::<E>();
        let mut out = f.debug_struct("Monitor");
        out.field("shutdown", &"[Graceful shutdown listener]");
        out.field("workers", &self.workers);
        out.field("executor", &executor_name);
        out.finish()
    }
}
impl<E: Executor + Clone + Send + 'static + Sync> Monitor<E> {
    /// Registers a single worker with this monitor.
    ///
    /// The worker is bound to the monitor via `with_monitor` and appended to
    /// the list of workers that `run` will start.
    pub fn register<
        J: Send + Sync + 'static,
        S: Service<Request<J>> + Send + 'static + Clone,
        P: Backend<Request<J>> + 'static,
    >(
        mut self,
        worker: Worker<Ready<S, P>>,
    ) -> Self
    where
        S::Future: Send,
        S::Response: 'static,
        S::Error: Send + Sync + 'static + Into<BoxDynError>,
        <P as Backend<Request<J>>>::Stream: Unpin + Send + 'static,
    {
        let attached = worker.with_monitor(&self);
        self.workers.push(attached);
        self
    }

    /// Registers the workers produced by `with_monitor_instances(count, ..)`.
    ///
    /// Every worker returned by that call is appended to the monitor's
    /// worker list.
    pub fn register_with_count<
        J: Send + Sync + 'static,
        S: Service<Request<J>> + Send + 'static + Clone,
        P: Backend<Request<J>> + 'static,
    >(
        mut self,
        count: usize,
        worker: Worker<Ready<S, P>>,
    ) -> Self
    where
        S::Future: Send,
        S::Response: 'static,
        S::Error: Send + Sync + 'static + Into<BoxDynError>,
        <P as Backend<Request<J>>>::Stream: Unpin + Send + 'static,
    {
        let instances = worker.with_monitor_instances(count, &self);
        self.workers.extend(instances);
        self
    }

    /// Runs the monitor alongside `signal`, then awaits the shutdown handle.
    ///
    /// # Errors
    ///
    /// Returns the first `std::io::Error` produced by either the signal
    /// watcher or [`Monitor::run`].
    pub async fn run_with_signal<S: Future<Output = std::io::Result<()>>>(
        self,
        signal: S,
    ) -> std::io::Result<()>
    where
        E: Executor + Clone + Send + 'static,
    {
        // `run` consumes the monitor, so grab our own shutdown handle first.
        let shutdown_handle = self.context.shutdown.clone();
        let signal_watcher = self.context.shutdown.shutdown_after(signal);
        let worker_run = self.run();
        futures::try_join!(signal_watcher, worker_run)?;
        shutdown_handle.await;
        Ok(())
    }

    /// Starts every registered worker and waits for them and for shutdown.
    ///
    /// Without a terminator, this completes once all workers have finished
    /// *and* the shutdown future has resolved. With a terminator, it completes
    /// once the terminator has resolved *and* either all workers have finished
    /// or the shutdown future has resolved.
    ///
    /// NOTE(review): the terminator is joined, not raced, so `run` cannot
    /// return before the terminator completes — confirm this matches the
    /// intended timeout semantics.
    ///
    /// # Errors
    ///
    /// The visible code path always returns `Ok(())`.
    pub async fn run(self) -> std::io::Result<()>
    where
        E: Executor + Clone + Send + 'static,
    {
        let worker_tasks: Vec<_> = self
            .workers
            .into_iter()
            .map(|worker| worker.run().boxed())
            .collect();
        let shutdown_future = self.context.shutdown.boxed().map(|_| ());
        let all_workers = futures::future::join_all(worker_tasks).map(|_| ());

        match self.terminator {
            Some(terminator) => {
                // Whichever of "workers done" / "shutdown resolved" comes first
                // ends the runner; the terminator must also complete.
                let runner = futures::future::select(all_workers, shutdown_future);
                futures::join!(runner, terminator);
            }
            None => {
                futures::join!(all_workers, shutdown_future);
            }
        }
        Ok(())
    }

    /// Installs `f` as the event handler, replacing any previous handler.
    ///
    /// If the handler lock cannot be acquired for writing (it is poisoned),
    /// the handler is silently not installed.
    pub fn on_event<F: Fn(Worker<Event>) + Send + Sync + 'static>(self, f: F) -> Self {
        if let Ok(mut slot) = self.context.event_handler.write() {
            *slot = Some(Box::new(f));
        }
        self
    }

    /// Returns a reference to the executor stored in this monitor.
    pub fn executor(&self) -> &E {
        &self.executor
    }

    /// Returns a reference to the monitor's shared context.
    pub(crate) fn context(&self) -> &MonitorContext {
        &self.context
    }
}
impl<E: Default> Default for Monitor<E> {
fn default() -> Self {
Self {
executor: E::default(),
context: MonitorContext::new(),
workers: Vec::new(),
terminator: None,
}
}
}
impl<E> Monitor<E> {
pub fn new() -> Self
where
E: Default,
{
Self::new_with_executor(E::default())
}
pub fn new_with_executor(executor: E) -> Self {
Self {
context: MonitorContext::new(),
workers: Vec::new(),
executor,
terminator: None,
}
}
pub fn set_executor<NE: Executor>(self, executor: NE) -> Monitor<NE> {
if !self.workers.is_empty() {
panic!("Tried changing executor when already loaded some workers");
}
Monitor {
context: self.context,
workers: Vec::new(),
executor,
terminator: self.terminator,
}
}
#[cfg(feature = "sleep")]
pub fn shutdown_timeout(self, duration: std::time::Duration) -> Self {
self.with_terminator(crate::sleep(duration))
}
pub fn with_terminator(mut self, fut: impl Future<Output = ()> + Send + 'static) -> Self {
self.terminator = Some(fut.boxed());
self
}
}
// Tests that drive a monitor end to end against an in-memory backend.
// Both tests are timing-based: a background task triggers shutdown after a
// fixed delay while the service takes one second per request.
#[cfg(test)]
mod tests {
use std::{io, time::Duration};
use tokio::time::sleep;
use crate::{
builder::{WorkerBuilder, WorkerFactory},
memory::MemoryStorage,
monitor::Monitor,
mq::MessageQueue,
request::Request,
TestExecutor,
};
/// Registers one worker, enqueues ten jobs, and checks that `run` returns
/// `Ok` after shutdown is triggered 1.5 seconds in.
#[tokio::test]
async fn it_works() {
let backend = MemoryStorage::new();
// Second handle to the same storage, used by the producer task below.
let handle = backend.clone();
tokio::spawn(async move {
for i in 0..10 {
handle.enqueue(i).await.unwrap();
}
});
// Service that waits one second and then echoes the request back.
let service = tower::service_fn(|request: Request<u32>| async {
tokio::time::sleep(Duration::from_secs(1)).await;
Ok::<_, io::Error>(request)
});
let worker = WorkerBuilder::new("rango-tango")
.source(backend)
.build(service);
let monitor: Monitor<TestExecutor> = Monitor::new();
let monitor = monitor.register(worker);
// Clone the shutdown handle before `run` consumes the monitor.
let shutdown = monitor.context.shutdown.clone();
tokio::spawn(async move {
sleep(Duration::from_millis(1500)).await;
shutdown.shutdown();
});
monitor.run().await.unwrap();
}
/// Registers five worker instances with an event handler installed, checks
/// the worker count, and checks that `run` returns `Ok` after shutdown is
/// triggered one second in.
#[tokio::test]
async fn test_monitor_run() {
let backend = MemoryStorage::new();
// Second handle to the same storage, used by the producer task below.
let handle = backend.clone();
tokio::spawn(async move {
for i in 0..1000 {
handle.enqueue(i).await.unwrap();
}
});
// Service that waits one second and then echoes the request back.
let service = tower::service_fn(|request: Request<u32>| async {
tokio::time::sleep(Duration::from_secs(1)).await;
Ok::<_, io::Error>(request)
});
let worker = WorkerBuilder::new("rango-tango")
.source(backend)
.build(service);
let monitor: Monitor<TestExecutor> = Monitor::new();
// Print every worker event the monitor is notified about.
let monitor = monitor.on_event(|e| {
println!("{e:?}");
});
let monitor = monitor.register_with_count(5, worker);
assert_eq!(monitor.workers.len(), 5);
// Clone the shutdown handle before `run` consumes the monitor.
let shutdown = monitor.context.shutdown.clone();
tokio::spawn(async move {
sleep(Duration::from_millis(1000)).await;
shutdown.shutdown();
});
let result = monitor.run().await;
// Extra wait after `run` returns and before asserting on the result.
sleep(Duration::from_millis(1000)).await;
assert!(result.is_ok());
}
}