userd 0.2.0

A user daemon, managing services and regular running of jobs, in user space.
use futures::{StreamExt, future::try_join_all, stream::FuturesUnordered};
use std::{
    sync::Arc,
    time::{Duration, SystemTime},
};
use tokio::{sync::Notify, time::sleep};

use crate::prelude::*;

pub(crate) struct Service {
    preconditions: Vec<conditions::ConditionBox>,
    restart_triggers: Vec<triggers::TriggerBox>,
    activity: Activity,
}

impl Service {
    pub(crate) fn new(config: config::Service) -> Result<Self> {
        Ok(Self {
            preconditions: conditions::build_conditions(config.conditions)?,
            restart_triggers: triggers::build_triggers(config.restart_triggers)?,
            activity: Activity::new(config.name.clone(), config.command)?,
        })
    }
    pub(crate) async fn start(self) {
        loop {
            let start_time = SystemTime::now();
            let results = try_join_all(self.preconditions.iter().map(|c| c.check())).await;
            let mut futs = FuturesUnordered::new();
            let notify = Arc::new(Notify::new());
            let mut notified = None;
            match results {
                Ok(v) => {
                    if v.into_iter().all(|x| x) {
                        notified = Some(notify.notified());
                        let notify = notify.clone();
                        let cmds = self.activity.start_activity();
                        futs.push(tokio::spawn(async move {
                            cmds.await?;
                            log::info!("Activity finished, notify waiters.");
                            notify.notify_waiters();
                            Ok(())
                        }));
                    }
                }
                Err(e) => {
                    log::error!(
                        "An error occured while checking conditions for service: {}",
                        e
                    );
                }
            };
            for t in &self.restart_triggers {
                futs.push(tokio::spawn(t.wait_for()));
            }
            for c in &self.preconditions {
                futs.push(tokio::spawn(c.wait_for_change()));
            }
            // Wait for any triggers, preconditions, command
            let result = futs.next().await;
            if let Some(Err(e)) = result {
                log::error!("An error occured while waiting: {}", e);
            }
            // Cancel the current process
            log::info!("Cancelling process...");
            self.activity.cancel_activity();
            // Wait the process has been killed
            if let Some(notified) = notified {
                notified.await;
            }
            // Abort tasks
            futs.into_iter().for_each(|f| f.abort());
            // Check that the process didn't crash on startup. and then wait before trying again
            if SystemTime::now().duration_since(start_time).unwrap() < Duration::from_secs(10) {
                log::warn!("Command failed to start within 10s... Waiting 30s to try again...");
                sleep(Duration::from_secs(30)).await;
                log::warn!("Waiting done...")
            }
        }
    }
}