use std::fmt::{Debug, Display};
use libc::pid_t;
use crate::process::process_group::ProcessGroup;
use crate::process::Process;
use crate::task::Task;
use crate::worker::backoff_policy::BackoffPolicy;
use crate::worker::restartable::{RestartPolicy, Restartable};
use crate::worker::watcher::Watcher;
use crate::worker::Worker;
pub struct Supervisor {
root_pid: pid_t,
tasks: Vec<Task>,
backoff_policy: BackoffPolicy,
restart_policy: RestartPolicy,
}
impl Debug for Supervisor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Supervisor")
.field("root_pid", &self.root_pid)
.field("tasks", &self.tasks)
.finish()
}
}
impl Supervisor {
pub(crate) fn new(root_pid: pid_t) -> Self {
Self {
root_pid,
tasks: vec![],
backoff_policy: BackoffPolicy::default(),
restart_policy: RestartPolicy::default(),
}
}
pub fn with_backoff_policy(mut self, backoff_policy: BackoffPolicy) -> Self {
self.backoff_policy = backoff_policy;
self
}
pub fn with_restart_policy(mut self, restart_policy: RestartPolicy) -> Self {
self.restart_policy = restart_policy;
self
}
pub(crate) fn run(&mut self) {
let tasks = std::mem::take(&mut self.tasks);
let (workers, supervisors): (Vec<_>, Vec<_>) = tasks
.into_iter()
.partition(|w| matches!(w, Task::Worker(_)));
let worker_watcher = Watcher::new(
workers
.into_iter()
.filter_map(|w| match w {
Task::Worker(w) => Some(w),
_ => None,
})
.collect(),
);
let mut pg = ProcessGroup::new();
pg.add_process(Box::new(worker_watcher));
for supervisor in supervisors.into_iter() {
if let Task::Supervisor(s) = supervisor {
pg.add_process(Box::new(s))
}
}
pg.run();
}
pub fn add_worker(mut self, worker: impl Worker + 'static) -> Self {
let _id = self.tasks.len();
self.tasks.push(Task::Worker(Box::new(worker)));
self
}
pub fn add_supervisor<F>(mut self, f: F) -> Self
where
F: FnOnce(Self) -> Self,
{
self.tasks
.push(Task::Supervisor(f(Supervisor::new(self.root_pid))));
self
}
}
impl Process for Supervisor {
fn start(&mut self) {
self.run();
}
}
impl Display for Supervisor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Debug::fmt(self, f)
}
}
impl Restartable for Supervisor {
fn backoff_policy(&self) -> BackoffPolicy {
self.backoff_policy
}
fn restart_policy(&self) -> RestartPolicy {
self.restart_policy
}
}