use std::{
fmt::{self, Debug, Formatter},
sync::Arc,
};
use futures::{future::BoxFuture, Future, FutureExt};
use serde::Serialize;
use tower::{Layer, Service};
pub mod shutdown;
use crate::{
backend::Backend,
error::BoxDynError,
request::Request,
worker::{Context, Event, EventHandler, Ready, Worker, WorkerId},
};
use self::shutdown::Shutdown;
pub struct Monitor {
futures: Vec<BoxFuture<'static, ()>>,
workers: Vec<Worker<Context>>,
terminator: Option<BoxFuture<'static, ()>>,
shutdown: Shutdown,
event_handler: EventHandler,
}
impl Debug for Monitor {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Monitor")
.field("shutdown", &"[Graceful shutdown listener]")
.field("workers", &self.futures.len())
.finish()
}
}
impl Monitor {
pub fn register<Req, S, P, Res, Ctx>(mut self, mut worker: Worker<Ready<S, P>>) -> Self
where
S: Service<Request<Req, Ctx>, Response = Res> + Send + 'static,
S::Future: Send,
S::Response: Send + Sync + Serialize + 'static,
S::Error: Send + Sync + 'static + Into<BoxDynError>,
P: Backend<Request<Req, Ctx>, Res> + Send + 'static,
P::Stream: Unpin + Send + 'static,
P::Layer: Layer<S> + Send,
<P::Layer as Layer<S>>::Service: Service<Request<Req, Ctx>, Response = Res> + Send,
<<P::Layer as Layer<S>>::Service as Service<Request<Req, Ctx>>>::Future: Send,
<<P::Layer as Layer<S>>::Service as Service<Request<Req, Ctx>>>::Error:
Send + Sync + Into<BoxDynError>,
Req: Send + Sync + 'static,
Ctx: Send + Sync + 'static,
Res: 'static,
{
worker.state.shutdown = Some(self.shutdown.clone());
worker.state.event_handler = self.event_handler.clone();
let runnable = worker.run();
let handle = runnable.get_handle();
self.workers.push(handle);
self.futures.push(runnable.boxed());
self
}
#[deprecated(
since = "0.6.0",
note = "Consider using the `.register` as workers now offer concurrency by default"
)]
pub fn register_with_count<Req, S, P, Res, Ctx>(
mut self,
count: usize,
worker: Worker<Ready<S, P>>,
) -> Self
where
S: Service<Request<Req, Ctx>, Response = Res> + Send + 'static + Clone,
S::Future: Send,
S::Response: Send + Sync + Serialize + 'static,
S::Error: Send + Sync + 'static + Into<BoxDynError>,
P: Backend<Request<Req, Ctx>, Res> + Send + 'static + Clone,
P::Stream: Unpin + Send + 'static,
P::Layer: Layer<S> + Send,
<P::Layer as Layer<S>>::Service: Service<Request<Req, Ctx>, Response = Res> + Send,
<<P::Layer as Layer<S>>::Service as Service<Request<Req, Ctx>>>::Future: Send,
<<P::Layer as Layer<S>>::Service as Service<Request<Req, Ctx>>>::Error:
Send + Sync + Into<BoxDynError>,
Req: Send + Sync + 'static,
Ctx: Send + Sync + 'static,
Res: 'static,
{
for index in 0..count {
let mut worker = worker.clone();
let name = format!("{}-{index}", worker.id());
worker.id = WorkerId::new(name);
self = self.register(worker);
}
self
}
pub async fn run_with_signal<S>(self, signal: S) -> std::io::Result<()>
where
S: Send + Future<Output = std::io::Result<()>>,
{
let shutdown = self.shutdown.clone();
let shutdown_after = self.shutdown.shutdown_after(signal);
if let Some(terminator) = self.terminator {
let _res = futures::future::select(
futures::future::join_all(self.futures)
.map(|_| shutdown.start_shutdown())
.boxed(),
async {
let _res = shutdown_after.await;
terminator.await;
}
.boxed(),
)
.await;
} else {
let runner = self.run();
let _res = futures::join!(shutdown_after, runner); }
Ok(())
}
pub async fn run(self) -> std::io::Result<()> {
let shutdown = self.shutdown.clone();
let shutdown_future = self.shutdown.boxed().map(|_| ());
futures::join!(
futures::future::join_all(self.futures).map(|_| shutdown.start_shutdown()),
shutdown_future,
);
Ok(())
}
pub fn on_event<F: Fn(Worker<Event>) + Send + Sync + 'static>(self, f: F) -> Self {
let _ = self.event_handler.write().map(|mut res| {
let _ = res.insert(Box::new(f));
});
self
}
}
impl Default for Monitor {
fn default() -> Self {
Self {
shutdown: Shutdown::new(),
futures: Vec::new(),
terminator: None,
event_handler: Arc::default(),
workers: Vec::new(),
}
}
}
impl Monitor {
pub fn new() -> Self {
Self::default()
}
#[cfg(feature = "sleep")]
pub fn shutdown_timeout(self, duration: std::time::Duration) -> Self {
self.with_terminator(crate::sleep(duration))
}
pub fn with_terminator(mut self, fut: impl Future<Output = ()> + Send + 'static) -> Self {
self.terminator = Some(fut.boxed());
self
}
}
#[cfg(test)]
mod tests {
use crate::test_utils::apalis_test_service_fn;
use std::{io, time::Duration};
use tokio::time::sleep;
use crate::{
builder::{WorkerBuilder, WorkerFactory},
memory::MemoryStorage,
monitor::Monitor,
mq::MessageQueue,
request::Request,
test_message_queue,
test_utils::TestWrapper,
};
test_message_queue!(MemoryStorage::new());
#[tokio::test]
async fn it_works_with_workers() {
let backend = MemoryStorage::new();
let mut handle = backend.clone();
tokio::spawn(async move {
for i in 0..10 {
handle.enqueue(i).await.unwrap();
}
});
let service = tower::service_fn(|request: Request<u32, ()>| async {
tokio::time::sleep(Duration::from_secs(1)).await;
Ok::<_, io::Error>(request)
});
let worker = WorkerBuilder::new("rango-tango")
.backend(backend)
.build(service);
let monitor: Monitor = Monitor::new();
let monitor = monitor.register(worker);
let shutdown = monitor.shutdown.clone();
tokio::spawn(async move {
sleep(Duration::from_millis(1500)).await;
shutdown.start_shutdown();
});
monitor.run().await.unwrap();
}
#[tokio::test]
async fn test_monitor_run() {
let backend = MemoryStorage::new();
let mut handle = backend.clone();
tokio::spawn(async move {
for i in 0..10 {
handle.enqueue(i).await.unwrap();
}
});
let service = tower::service_fn(|request: Request<u32, _>| async {
tokio::time::sleep(Duration::from_secs(1)).await;
Ok::<_, io::Error>(request)
});
let worker = WorkerBuilder::new("rango-tango")
.backend(backend)
.build(service);
let monitor: Monitor = Monitor::new();
let monitor = monitor.on_event(|e| {
println!("{e:?}");
});
let monitor = monitor.register(worker);
assert_eq!(monitor.futures.len(), 1);
let shutdown = monitor.shutdown.clone();
tokio::spawn(async move {
sleep(Duration::from_millis(1000)).await;
shutdown.start_shutdown();
});
let result = monitor.run().await;
sleep(Duration::from_millis(1000)).await;
assert!(result.is_ok());
}
}