use std::sync::Arc;
use crate::Job;
use shiny_common::context::{ContextFactory};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument};
use uuid::Uuid;
use crate::JobDefinition;
use crate::job_trigger::JobTrigger;
pub struct JobsExecutor {
jobs: Vec<JobDefinition>,
context_factory: Arc<dyn ContextFactory>,
}
impl JobsExecutor {
pub fn new(
context_factory: Arc<dyn ContextFactory>
) -> JobsExecutor {
JobsExecutor {
jobs: Vec::new(),
context_factory,
}
}
pub fn schedule(&mut self, name: String, job: Arc<dyn Job>, trigger: Arc<dyn JobTrigger>) {
self.jobs.push(JobDefinition {
name,
job,
trigger,
});
}
pub async fn run(&self, cancellation_token: CancellationToken) {
let loops = self.jobs.iter().map(|definition| {
let trigger = definition.trigger.clone();
let job = definition.job.clone();
let cancellation_token = cancellation_token.child_token();
let context_factory = self.context_factory.clone();
async move {
loop {
let task_id = Uuid::new_v4();
let span = tracing::span!(tracing::Level::INFO, "Task", id=?task_id);
let execution_flow = async {
tracing::debug!("awaiting trigger fire");
tokio::select! {
_ = cancellation_token.cancelled() => {
tracing::debug!("cancellation signal received");
return ExecutionFlow::Stop;
}
_ = trigger.next() => {
tracing::debug!("starting execution of job");
}
}
let child_token = cancellation_token.child_token();
let local_job = job.clone();
let context_factory = context_factory.clone();
tokio::spawn(async move {
let mut context = context_factory.create();
context.set_cancellation_token(child_token);
match local_job.execute(&mut context).await {
Ok(_) => tracing::info!("Job successfully executed"),
Err(err) => tracing::error!("Job failed {}", err)
}
}.in_current_span());
ExecutionFlow::Continue
}.instrument(span).await;
if let ExecutionFlow::Stop = execution_flow {
return;
}
}
}.instrument(tracing::span!(tracing::Level::INFO, "Job", name=definition.name))
});
let mut loops = FuturesUnordered::from_iter(loops);
if loops.next().await.is_some() && cancellation_token.is_cancelled() {
loops.collect().await
}
}
}
enum ExecutionFlow {
Continue,
Stop
}