shinyframework_jobs 0.1.2

Shiny Jobs
Documentation
use std::sync::Arc;
use crate::Job;
use shiny_common::context::{ContextFactory};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument};
use uuid::Uuid;
use crate::JobDefinition;
use crate::job_trigger::JobTrigger;

pub struct JobsExecutor {
    jobs: Vec<JobDefinition>,
    context_factory: Arc<dyn ContextFactory>,
}

impl JobsExecutor {
    pub fn new(
        context_factory: Arc<dyn ContextFactory>
    ) -> JobsExecutor {
        JobsExecutor {
            jobs: Vec::new(),
            context_factory,
        }
    }

    pub fn schedule(&mut self, name: String, job: Arc<dyn Job>, trigger: Arc<dyn JobTrigger>) {
        self.jobs.push(JobDefinition {
            name,
            job,
            trigger,
        });
    }

    pub async fn run(&self, cancellation_token: CancellationToken) {
        let loops = self.jobs.iter().map(|definition| {
            let trigger = definition.trigger.clone();
            let job = definition.job.clone();
            let cancellation_token = cancellation_token.child_token();
            let context_factory = self.context_factory.clone();

            async move {
                loop {
                    let task_id = Uuid::new_v4();
                    let span = tracing::span!(tracing::Level::INFO, "Task", id=?task_id);

                    let execution_flow = async {
                        tracing::debug!("awaiting trigger fire");
                        tokio::select! {
                            _ = cancellation_token.cancelled() => {
                                tracing::debug!("cancellation signal received");
                                return ExecutionFlow::Stop;
                            }
                            _ = trigger.next() => {
                                tracing::debug!("starting execution of job");
                            }
                        }

                        let child_token = cancellation_token.child_token();
                        let local_job = job.clone();
                        let context_factory = context_factory.clone();

                        tokio::spawn(async move {
                            let mut context = context_factory.create();
                            context.set_cancellation_token(child_token);

                            match local_job.execute(&mut context).await {
                                Ok(_) => tracing::info!("Job successfully executed"),
                                Err(err) => tracing::error!("Job failed {}", err)
                            }
                        }.in_current_span());

                        ExecutionFlow::Continue
                    }.instrument(span).await;

                    if let ExecutionFlow::Stop = execution_flow {
                        return;
                    }
                }
            }.instrument(tracing::span!(tracing::Level::INFO, "Job", name=definition.name))
        });

        let mut loops = FuturesUnordered::from_iter(loops);

        if loops.next().await.is_some() && cancellation_token.is_cancelled() {
            loops.collect().await
        }
    }
}


enum ExecutionFlow {
    Continue,
    Stop
}