supertrees 0.1.2

Supervision trees for Tokio-based services inspired by Erlang/OTP
Documentation
use std::ops::Deref;

use log::debug;
use tokio::runtime::Runtime;
use tokio::task::JoinSet;

use crate::process::Process;
use crate::worker::backoff::{Backoff, BackoffResult};
use crate::worker::Restartable;
use crate::{RestartPolicy, Worker};

/// Owns a collection of boxed [`Worker`]s and runs them on a Tokio runtime
/// when started.
#[derive(Debug)]
pub struct Watcher {
    /// Workers that have not been started yet. The vector is drained (left
    /// empty) when the watcher spawns them.
    workers: Vec<Box<dyn Worker>>,
}

impl Watcher {
    pub fn new(workers: Vec<Box<dyn Worker>>) -> Self {
        Self { workers }
    }

    fn start_worker(joinset: &mut JoinSet<Box<dyn Worker>>, worker: Box<dyn Worker>) {
        debug!("starting worker={worker:?}");
        joinset.spawn(async move {
            let mut backoff = Backoff::new(worker);
            loop {
                let f = backoff.init();
                f.await;
                if let BackoffResult::RetryAfterDelay(delay) = backoff.maybe_delay() {
                    debug!(
                        "worker stopped, retrying after delay={delay:?} for worker={:?}",
                        backoff.deref()
                    );
                    tokio::time::sleep(delay).await;
                } else {
                    break;
                }
            }
            backoff.into_inner()
        });
    }

    fn start_workers(&mut self) -> JoinSet<Box<dyn Worker>> {
        let mut joinset = JoinSet::new();
        let workers = std::mem::take(&mut self.workers);
        for worker in workers.into_iter() {
            Self::start_worker(&mut joinset, worker);
        }
        joinset
    }

    fn start(&mut self) {
        debug!("starting tokio runtime");
        let rt = Runtime::new().expect("failed to start runtime");
        rt.block_on(async move {
            let mut joinset = self.start_workers();
            while let Some(Ok(worker)) = joinset.join_next().await {
                debug!("joined worker={:?}", worker);
            }
        });
    }
}

impl Process for Watcher {
    fn start(&mut self) {
        self.start();
    }
}

impl Restartable for Watcher {
    fn restart_policy(&self) -> crate::RestartPolicy {
        RestartPolicy::Never
    }
}