kafru 1.0.4

kafru is a queuing library for Rust inspired by Python's Celery; it uses cron for scheduling and SurrealDB to store queues, metrics, and schedules.
Documentation
use crate::schedule::{
    Schedule, ScheduleData, ScheduleListConditions, ScheduleStatus
};
use chrono::Utc;
use tracing::{instrument, info, error};
use tokio::runtime::{Builder, RuntimeMetrics};
use crate::queue::{
    Queue,
    QueueData,
    QueueStatus
};
use crate::Command;
use crate::metric::{Metric, MetricData, MetricKind};
use tokio::time::{Duration, Instant};
use std::sync::Arc;
use crate::database::Db;
use crate::agent::{Agent, AgentFilter, AgentStatus,  AgentData, AgentKind};
use std::collections::HashMap;
use tokio::task::JoinHandle;
use surrealdb::RecordId;
use tokio::runtime::Runtime;


/// Task handles keyed by agent name (the names are built with `Agent::to_name`).
///
/// A `None` value marks an entry that was registered without a spawned task to track.
type SchedulerTasksHandles = HashMap<String, Option<JoinHandle<()>>>;

/// Registry of the tasks owned by a running scheduler.
#[derive(Debug)]
pub struct SchedulerTasks {
    /// Registered handles; the keys double as the agent names polled for commands.
    handles: SchedulerTasksHandles,
}

impl SchedulerTasks {
    /// Creates an empty task registry.
    ///
    /// Nothing is awaited inside; the `async` signature is kept so existing callers
    /// (`SchedulerTasks::new().await`) keep compiling.
    ///
    /// # Returns
    /// A `SchedulerTasks` with no registered handles.
    pub async fn new() -> Self {
        Self {
            handles: HashMap::new(),
        }
    }

    /// Registers a task handle under the agent name derived from the scheduler name and task id.
    ///
    /// # Arguments
    /// * `scheduler_name` - The name of the scheduler that owns the task.
    /// * `task_id` - The task's id within that scheduler.
    /// * `handle` - The task's join handle, or `None` when there is no spawned task to track.
    ///
    /// The key is `Agent::to_name(&scheduler_name, &task_id)`. An existing entry with the same
    /// key is replaced; the previous handle is dropped without being aborted.
    pub async fn add(&mut self, scheduler_name: String, task_id: u64, handle: Option<JoinHandle<()>>) {
        let name: String = Agent::to_name(&scheduler_name, &task_id).await;
        self.handles.insert(name, handle);
    }

    /// Forgets the task registered under `name`.
    ///
    /// The task itself is not aborted. Unknown names are ignored.
    ///
    /// # Arguments
    /// * `name` - The agent name the task was registered under.
    pub async fn remove(&mut self, name: String) {
        self.handles.remove(&name);
    }

    /// Aborts the task registered under `name`.
    ///
    /// The entry stays registered; call [`SchedulerTasks::remove`] to forget it.
    ///
    /// # Arguments
    /// * `name` - The agent name the task was registered under.
    ///
    /// # Returns
    /// * `Ok(true)` if `name` is registered. An entry registered without a handle has nothing
    ///   to abort, but it still counts as success.
    ///
    /// # Errors
    /// Returns `Err(String)` if no task is registered under `name`.
    pub async fn abort(&self, name: String) -> Result<bool, String> {
        match self.handles.get(&name) {
            Some(Some(handle)) => {
                handle.abort();
                Ok(true)
            }
            Some(None) => Ok(true),
            None => Err(format!("unable to abort scheduler task {}", name)),
        }
    }
}

/// Outcome of a scheduler command check.
///
/// Tells the scheduler loop whether to keep polling or to stop.
///
/// The enum carries no data, so it is `Copy` and `Eq` and can be passed and compared freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchedulerCheckStatus {
    /// Keep running the scheduler loop.
    Continue,
    /// Leave the scheduler loop (a shutdown command was received).
    Stop,
}

/// Polls agent commands and schedules, pushing scheduled work onto the queue.
///
/// Build one with `Scheduler::new` and run it with `Scheduler::watch`.
#[derive(Clone, Debug)]
pub struct Scheduler {
    /// Optional shared database handle, passed to every `Agent`, `Queue`, `Metric`
    /// and `Schedule` helper the scheduler creates.
    db: Option<Arc<Db>>,
    /// Server name; `watch` prefixes it to the scheduler name.
    server: String,
    /// Agent helper used to list agents and remove agent records.
    agent: Agent,
    /// Author stored on the scheduler's agent registration.
    author: String,
}

impl Scheduler{
    /// Creates a new instance of the structure.
    ///
    /// This asynchronous function initializes a new instance, setting up the database connection,
    /// server identifier, and author information. It also creates an `Agent` instance for managing
    /// agent-related operations.
    ///
    /// # Arguments
    /// * `db` - An optional shared reference to the database connection (`Arc<Db>`). If `None`, 
    ///   a new database connection will be established.
    /// * `server` - A `String` representing the server name or identifier.
    /// * `author` - A `String` representing the author's name or identifier for tracking.
    ///
    /// # Returns
    /// A new instance of the structure.
    pub async fn new(db: Option<Arc<Db>>, server: String, author: String) -> Self {
        let agent = Agent::new(db.clone()).await;
        Self {
            db,
            server,
            agent,
            author
        }
    }


    /// Checks for and executes commands related to task management.
    ///
    /// This function processes commands associated with worker tasks and queues, such as removing tasks, 
    /// terminating tasks, and shutting down queues. It interacts with the provided runtime, worker tasks, 
    /// and database components to perform the necessary operations.
    ///
    /// # Arguments
    /// * `runtime` - A reference to the Tokio runtime, used to access runtime metrics.
    /// * `scheduler_tasks` - A mutable reference to the `SchedulerTasks` structure, which manages active task handles.
    /// * `scheduler_name` - The name of the queue associated with the commands.
    ///
    /// # Returns
    /// * `Ok(WorkerCheckStatus::Continue)` if the worker should continue monitoring the queue.
    /// * `Ok(WorkerCheckStatus::Stop)` if the worker should stop monitoring the queue.
    /// * `Err(String)` if an error occurs while processing commands.
    pub async fn check_command(&self, runtime: &Runtime, scheduler_tasks: &mut SchedulerTasks, scheduler_name: String) -> Result<SchedulerCheckStatus,String>{
        let queue: Queue = Queue::new(self.db.clone()).await;
        let agent: Agent = Agent::new(self.db.clone()).await;
        
        let agent_names: Vec<String> = scheduler_tasks.handles.keys().map(|value| value.to_string()).collect();
        let agents: Vec<AgentData> = self.agent.list(AgentFilter { 
            names: Some(agent_names),
            ..Default::default()
        }).await?;
        if agents.len() > 0 {
            for agent_data in agents {
                if agent_data.command_is_executed == Some(false) {
                    if let Some(command) = agent_data.command {
                        let agent_name: String = if let Some(value) = agent_data.name { value } else { return Err("agent name is required for check command".to_string()) };
                        let agent_id: RecordId = if let Some(value) = agent_data.id { value } else { return Err("agent name is required for check command".to_string()) };        
                        match command {
                            Command::TaskRemove => { 
                                info!("remove task {:#?}",agent_name.clone()); 
                                scheduler_tasks.remove(agent_name.clone()).await;
                                if let Err(error) = agent.remove(agent_id,false).await {
                                    error!("{}",error);
                                }
                            },
                            Command::TaskTerminate => {
                                if let Some(agent_queue_id) = agent_data.queue_id {
                                    info!("terminate task {}",&scheduler_name);   
                                    scheduler_tasks.abort(agent_name).await?;                                                 
                                    if let Err(error) = queue.set_status(
                                        agent_queue_id,
                                        QueueStatus::Error,
                                        Some("task has been terminated".to_string())
                                    ).await {
                                        error!("{}",error);
                                    }
                                    if let Err(error) = agent.update(
                                        agent_id,
                                        AgentData {
                                            status: Some(AgentStatus::Terminated),
                                            command_is_executed: Some(true),
                                            message: None,
                                            ..Default::default()
                                        }
                                    ).await {
                                        error!("{}",error);
                                    }
                                }
                                else {
                                    return Err("unable to terminate task, queue_id is missing".to_string());
                                }
                            },
                            Command::SchedulerForceShutdown => {
                                info!("force shutdown queue {}",&scheduler_name);    
                                return Ok(SchedulerCheckStatus::Stop);
                            }
                            Command::SchedulerGracefulShutdown => {
                                loop {
                                    if runtime.metrics().num_alive_tasks() == 0 {
                                        info!("graceful shutdown queue {}",&scheduler_name);
                                        return Ok(SchedulerCheckStatus::Stop);
                                    }
                                    tokio::time::sleep_until(Instant::now() + Duration::from_secs(1)).await;
                                }
                            }
                            _ => {}
                        };
                    }
                }
            }
        }
        return Ok(SchedulerCheckStatus::Continue);
    }


    #[instrument(skip_all)]
    /// Monitors schedules and pushes tasks to the queue for execution.
    ///
    /// This function starts a scheduler monitoring process, which observes schedules 
    /// and processes tasks based on the provided parameters. It operates within a 
    /// dedicated thread pool configured for monitoring.
    ///
    /// # Parameters
    ///
    /// - `scheduler_name`:
    ///   - (Optional) The name of the scheduler to monitor. 
    ///   - If provided, the function targets the specified scheduler. 
    ///   - Defaults to `"scheduler-default"`, prefixed with the server name.
    ///
    /// - `poll_interval`:
    ///   - (Optional) The interval, in seconds, for polling the scheduler for updates or new schedules. 
    ///   - If not specified, defaults to `60` seconds. This interval defines how frequently the scheduler 
    ///     is checked for new tasks or changes.
    ///
    /// # Returns
    ///
    /// - `Result<(), String>`:
    ///   - Returns `Ok(())` on successful initiation of the monitoring process.
    ///   - Returns `Err(String)` if there is a failure in setting up the scheduler or the monitoring process.
    ///
    pub async fn watch(&self, scheduler_name: Option<String>, poll_interval: Option<u64>) -> Result<(),String> {
        let poll_interval = poll_interval.unwrap_or(60);
        let scheduler_name = format!("{}-{}",self.server,scheduler_name.unwrap_or(String::from("scheduler-default")));
        match Builder::new_multi_thread()
        .thread_name(scheduler_name.clone())
        .worker_threads(1)
        .enable_all()
        .build() {
            Ok(runtime)=> {
                let mut scheduler_tasks: SchedulerTasks = SchedulerTasks::new().await;
                let agent: Agent = Agent::new(self.db.clone()).await;
                let parent_agent_data = agent.register(AgentData {
                    name: Some(Agent::to_name(&scheduler_name, &0).await),
                    kind: Some(AgentKind::Scheduler),
                    server: Some(self.server.clone()),
                    status: Some(AgentStatus::Running),
                    author: Some(self.author.clone()),
                    ..Default::default()
                }).await?;
                scheduler_tasks.add(scheduler_name.clone(), 0,  None).await;
                loop {
                    match self.check_command(&runtime, &mut scheduler_tasks, scheduler_name.clone() ).await {
                        Ok(wc_status) => {
                            if wc_status == SchedulerCheckStatus::Stop  {
                                break;
                            }
                        }
                        Err(error) => {
                            error!(error);
                            return Err(error);
                        }
                    }
                    let metric: Metric = Metric::new(self.db.clone()).await;
                    let rt_metrics: RuntimeMetrics = runtime.metrics();
                    let metric_name: String = scheduler_name.clone();
                    let db = self.db.clone();
                    let _ = runtime.spawn(async move {
                        if let Err(error) = metric.create(MetricData {
                            name: Some(metric_name),
                            kind: Some(MetricKind::Scheduler),
                            num_alive_tasks: Some(rt_metrics.num_alive_tasks()),
                            num_workers: Some(rt_metrics.num_workers()),
                            ..Default::default()
                        }).await {
                            info!("scheduler metrics error: {}",error);
                        }
                        let schedule: Schedule = Schedule::new(db.clone()).await;
                        match schedule.list(ScheduleListConditions {
                            until_schedule: Some(Utc::now()),
                            start_schedule: Some(Utc::now()),
                            upcoming:Some(true),
                            status:Some(vec![ScheduleStatus::Enabled.to_string()]),
                            ..Default::default()
                        }).await {
                            Ok(records) => {
                                info!("got {} records",records.len());
                                let queue: Queue = Queue::new(db.clone()).await;
                                for record in records {
                                    let _record = record.clone();
                                    let record_name: String = record.name.unwrap();
                                    if let Err(error) = schedule.update(record.id.unwrap(),ScheduleData {
                                        status: if record.one_time == true { Some(ScheduleStatus::Disabled) } else { record.status },
                                        next_schedule: None,
                                        .._record
                                    }).await {
                                        error!("schedule update error [{}]: {}",&record_name,error);
                                    }
                                    if let Err(error) = queue.push(QueueData {
                                        name: Some(record_name.clone()),
                                        queue: record.queue,
                                        parameters: record.parameters,
                                        message: Some(format!("Scheduled at {}",Utc::now())),
                                        handler: record.handler,
                                        status: Some(QueueStatus::Waiting),
                                        ..Default::default()
                                    }).await {
                                        error!("queue scheduling error [{}]: {}",&record_name,error);
                                    }
                                }
                            }
                            Err(error) => {
                                error!("schedule list: {}",error);
                            }
                        }
                    }).await.unwrap();
                    info!("sleeping for {} second(s)", poll_interval);
                    tokio::time::sleep_until(Instant::now() + Duration::from_secs(poll_interval)).await;
                }
                if let Err(error) = self.agent.remove(parent_agent_data.id.unwrap().clone(),true).await {
                    error!("task agent remove error [{}]: {}", scheduler_name, error);
                }
                runtime.shutdown_background();
                Ok(())
            }
            Err(error) => {
                Err(error.to_string())
            }
        }
    }
}