use super::{Actor, Context, Handler, Message, Recipient, Worker};
use std::fmt::Debug;
use tokio::task::JoinHandle;
use std::fmt;
#[cfg(feature = "broker")]
use crate::worker::broker::Broker;
#[derive(Debug)]
pub enum WorkerManagement {
Status,
Restart,
Terminate,
}
impl<T> Default for Monitor<T> {
fn default() -> Self {
Self::new()
}
}
pub struct Monitor<R> {
workers: Vec<R>,
event_handlers: Vec<Box<dyn WorkerListener>>,
}
impl<R> fmt::Debug for Monitor<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Monitor")
.field("workers", &self.workers.len())
.field("listeners", &self.event_handlers.len())
.finish()
}
}
#[async_trait::async_trait]
impl Actor for Monitor<Recipient<WorkerManagement>> {
async fn on_start(&mut self, _ctx: &mut Context<Self>) {
#[cfg(feature = "broker")]
Broker::global().subscribe::<WorkerMessage, _>(_ctx);
}
}
impl<T> Monitor<T> {
pub fn new() -> Self {
Self {
workers: Vec::new(),
event_handlers: Vec::new(),
}
}
}
impl Monitor<JoinHandle<Recipient<WorkerManagement>>> {
pub fn register<W>(mut self, worker: W) -> Self
where
W: Worker,
{
let addr = tokio::spawn(async {
let addr = worker.start().await;
addr.recipient()
});
self.workers.push(addr);
self
}
pub fn register_with_count<F, W: Worker>(self, count: usize, factory: F) -> Self
where
F: Fn(usize) -> W,
{
let mut this = self;
for index in 0..count {
this = this.register(factory(index));
}
this
}
pub async fn run_without_signals(self) -> anyhow::Result<()> {
let mut workers = Vec::new();
for worker in self.workers {
workers.push(worker.await?);
}
let monitor = Monitor {
workers,
event_handlers: self.event_handlers,
};
monitor.start().await;
Ok(())
}
pub async fn run(self) -> anyhow::Result<()> {
self.run_without_signals().await?;
log::debug!("Listening shut down command (ctrl + c)");
tokio::signal::ctrl_c()
.await
.expect("failed to listen for event");
log::debug!("Workers shutdown complete");
Ok(())
}
}
pub trait WorkerListener: Send {
fn on_event(&self, worker_id: &str, event: &WorkerEvent);
}
impl<K> Monitor<K> {
#[cfg(feature = "broker")]
pub fn event_handler<H: 'static>(mut self, handle: H) -> Self
where
H: WorkerListener,
{
self.event_handlers.push(Box::new(handle));
self
}
}
#[derive(Debug, Clone)]
pub enum WorkerEvent {
Error(String),
Job {
id: String,
},
}
#[derive(Debug, Clone)]
pub struct WorkerMessage {
event: WorkerEvent,
worker_id: String,
}
impl WorkerMessage {
pub fn new(worker_id: String, event: WorkerEvent) -> Self {
WorkerMessage { event, worker_id }
}
}
impl Message for WorkerMessage {
type Result = ();
}
#[async_trait::async_trait]
impl Handler<WorkerMessage> for Monitor<Recipient<WorkerManagement>> {
type Result = ();
async fn handle(&mut self, msg: WorkerMessage) -> Self::Result {
for event_handler in &self.event_handlers {
(event_handler).on_event(&msg.worker_id, &msg.event);
}
}
}
#[cfg(test)]
mod tests {
use futures::Future;
use tower::Service;
use crate::{
context::JobContext,
error::JobError,
job::{Job, JobStreamResult},
job_fn::job_fn,
request::JobRequest,
response::JobResult,
};
use super::*;
#[tokio::test]
async fn test_simple_worker() {
struct SimpleWorker<S>(S);
struct Email;
impl Job for Email {
const NAME: &'static str = "worker::Email";
}
async fn send_email(_job: Email, _ctx: JobContext) -> Result<JobResult, JobError> {
Ok(JobResult::Success)
}
impl<S, F> Worker for SimpleWorker<S>
where
S: 'static
+ Send
+ Service<JobRequest<Email>, Response = JobResult, Error = JobError, Future = F>,
F: Future<Output = Result<JobResult, JobError>> + Send + 'static,
{
type Job = Email;
type Service = S;
type Future = F;
fn service(&mut self) -> &mut S {
&mut self.0
}
fn consume(&mut self) -> JobStreamResult<Self::Job> {
use futures::stream;
let stream = stream::iter(vec![
Ok(Some(JobRequest::new(Email))),
Ok(Some(JobRequest::new(Email))),
Ok(Some(JobRequest::new(Email))),
]);
Box::pin(stream)
}
}
let res = Monitor::new()
.register_with_count(1, move |_| SimpleWorker(job_fn(send_email)))
.run_without_signals()
.await;
assert!(res.is_ok())
}
}