use futures::{StreamExt, future::try_join_all, stream::FuturesUnordered};
use std::{
sync::Arc,
time::{Duration, SystemTime},
};
use tokio::{sync::Notify, time::sleep};
use crate::prelude::*;
pub(crate) struct Service {
preconditions: Vec<conditions::ConditionBox>,
restart_triggers: Vec<triggers::TriggerBox>,
activity: Activity,
}
impl Service {
pub(crate) fn new(config: config::Service) -> Result<Self> {
Ok(Self {
preconditions: conditions::build_conditions(config.conditions)?,
restart_triggers: triggers::build_triggers(config.restart_triggers)?,
activity: Activity::new(config.name.clone(), config.command)?,
})
}
pub(crate) async fn start(self) {
loop {
let start_time = SystemTime::now();
let results = try_join_all(self.preconditions.iter().map(|c| c.check())).await;
let mut futs = FuturesUnordered::new();
let notify = Arc::new(Notify::new());
let mut notified = None;
match results {
Ok(v) => {
if v.into_iter().all(|x| x) {
notified = Some(notify.notified());
let notify = notify.clone();
let cmds = self.activity.start_activity();
futs.push(tokio::spawn(async move {
cmds.await?;
log::info!("Activity finished, notify waiters.");
notify.notify_waiters();
Ok(())
}));
}
}
Err(e) => {
log::error!(
"An error occured while checking conditions for service: {}",
e
);
}
};
for t in &self.restart_triggers {
futs.push(tokio::spawn(t.wait_for()));
}
for c in &self.preconditions {
futs.push(tokio::spawn(c.wait_for_change()));
}
let result = futs.next().await;
if let Some(Err(e)) = result {
log::error!("An error occured while waiting: {}", e);
}
log::info!("Cancelling process...");
self.activity.cancel_activity();
if let Some(notified) = notified {
notified.await;
}
futs.into_iter().for_each(|f| f.abort());
if SystemTime::now().duration_since(start_time).unwrap() < Duration::from_secs(10) {
log::warn!("Command failed to start within 10s... Waiting 30s to try again...");
sleep(Duration::from_secs(30)).await;
log::warn!("Waiting done...")
}
}
}
}