cloacina 0.4.0

A Rust library for resilient task execution and orchestration.
Documentation
/*
 *  Copyright 2025-2026 Colliery Software
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

//! Cron scheduling API for the DefaultRunner.
//!
//! This module provides methods for managing cron-scheduled workflow executions.

use std::sync::Arc;

use crate::dal::DAL;
use crate::executor::pipeline_executor::PipelineError;
use crate::registry::traits::WorkflowRegistry;
use crate::UniversalUuid;

use super::DefaultRunner;

impl DefaultRunner {
    /// Register a workflow to run on a cron schedule
    ///
    /// # Arguments
    /// * `workflow_name` - Name of the workflow to schedule
    /// * `cron_expression` - Cron expression (e.g., "0 9 * * *" for daily at 9 AM)
    /// * `timezone` - Timezone for interpreting the cron expression (e.g., "UTC", "America/New_York")
    ///
    /// # Returns
    /// * `Result<UniversalUuid, PipelineError>` - The ID of the created schedule or an error
    pub async fn register_cron_workflow(
        &self,
        workflow_name: &str,
        cron_expression: &str,
        timezone: &str,
    ) -> Result<UniversalUuid, PipelineError> {
        if !self.config.enable_cron_scheduling() {
            return Err(PipelineError::Configuration {
                message: "Cron scheduling not enabled. Use enable_cron_scheduling(true) in config."
                    .to_string(),
            });
        }

        let dal = DAL::new(self.database.clone());

        // Validate cron expression and timezone
        use crate::CronEvaluator;
        CronEvaluator::validate(cron_expression, timezone).map_err(|e| {
            PipelineError::Configuration {
                message: format!("Invalid cron expression or timezone: {}", e),
            }
        })?;

        // Calculate initial next run time
        let evaluator = CronEvaluator::new(cron_expression, timezone).map_err(|e| {
            PipelineError::Configuration {
                message: format!("Failed to create cron evaluator: {}", e),
            }
        })?;

        let now = chrono::Utc::now();
        let next_run = evaluator
            .next_execution(now)
            .map_err(|e| PipelineError::Configuration {
                message: format!("Failed to calculate next execution: {}", e),
            })?;

        // Create the schedule using unified NewSchedule
        use crate::database::universal_types::UniversalTimestamp;
        use crate::models::schedule::NewSchedule;

        let mut new_schedule =
            NewSchedule::cron(workflow_name, cron_expression, UniversalTimestamp(next_run));
        new_schedule.timezone = Some(timezone.to_string());

        let schedule = dal.schedule().create(new_schedule).await.map_err(|e| {
            PipelineError::ExecutionFailed {
                message: format!("Failed to create cron schedule: {}", e),
            }
        })?;

        Ok(schedule.id)
    }

    /// List all registered cron schedules
    ///
    /// # Arguments
    /// * `enabled_only` - If true, only return enabled schedules
    /// * `limit` - Maximum number of schedules to return
    /// * `offset` - Number of schedules to skip for pagination
    ///
    /// # Returns
    /// * `Result<Vec<Schedule>, PipelineError>` - List of cron schedules
    pub async fn list_cron_schedules(
        &self,
        enabled_only: bool,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<crate::models::schedule::Schedule>, PipelineError> {
        if !self.config.enable_cron_scheduling() {
            return Err(PipelineError::Configuration {
                message: "Cron scheduling not enabled.".to_string(),
            });
        }

        let dal = DAL::new(self.database.clone());
        dal.schedule()
            .list(Some("cron"), enabled_only, limit, offset)
            .await
            .map_err(|e| PipelineError::ExecutionFailed {
                message: format!("Failed to list cron schedules: {}", e),
            })
    }

    /// Enable or disable a cron schedule
    ///
    /// # Arguments
    /// * `schedule_id` - UUID of the schedule to modify
    /// * `enabled` - Whether to enable (true) or disable (false) the schedule
    ///
    /// # Returns
    /// * `Result<(), PipelineError>` - Success or error
    pub async fn set_cron_schedule_enabled(
        &self,
        schedule_id: UniversalUuid,
        enabled: bool,
    ) -> Result<(), PipelineError> {
        if !self.config.enable_cron_scheduling() {
            return Err(PipelineError::Configuration {
                message: "Cron scheduling not enabled.".to_string(),
            });
        }

        let dal = DAL::new(self.database.clone());

        if enabled {
            dal.schedule().enable(schedule_id).await
        } else {
            dal.schedule().disable(schedule_id).await
        }
        .map_err(|e| PipelineError::ExecutionFailed {
            message: format!("Failed to update cron schedule: {}", e),
        })
    }

    /// Delete a cron schedule
    ///
    /// # Arguments
    /// * `schedule_id` - UUID of the schedule to delete
    ///
    /// # Returns
    /// * `Result<(), PipelineError>` - Success or error
    pub async fn delete_cron_schedule(
        &self,
        schedule_id: UniversalUuid,
    ) -> Result<(), PipelineError> {
        if !self.config.enable_cron_scheduling() {
            return Err(PipelineError::Configuration {
                message: "Cron scheduling not enabled.".to_string(),
            });
        }

        let dal = DAL::new(self.database.clone());
        dal.schedule()
            .delete(schedule_id)
            .await
            .map_err(|e| PipelineError::ExecutionFailed {
                message: format!("Failed to delete cron schedule: {}", e),
            })
    }

    /// Get a specific cron schedule by ID
    ///
    /// # Arguments
    /// * `schedule_id` - UUID of the schedule to retrieve
    ///
    /// # Returns
    /// * `Result<Schedule, PipelineError>` - The cron schedule or an error
    pub async fn get_cron_schedule(
        &self,
        schedule_id: UniversalUuid,
    ) -> Result<crate::models::schedule::Schedule, PipelineError> {
        if !self.config.enable_cron_scheduling() {
            return Err(PipelineError::Configuration {
                message: "Cron scheduling not enabled.".to_string(),
            });
        }

        let dal = DAL::new(self.database.clone());
        dal.schedule()
            .get_by_id(schedule_id)
            .await
            .map_err(|e| PipelineError::ExecutionFailed {
                message: format!("Failed to get cron schedule: {}", e),
            })
    }

    /// Update a cron schedule's expression and/or timezone
    ///
    /// # Arguments
    /// * `schedule_id` - UUID of the schedule to update
    /// * `cron_expression` - New cron expression (optional)
    /// * `timezone` - New timezone (optional)
    ///
    /// # Returns
    /// * `Result<(), PipelineError>` - Success or error
    pub async fn update_cron_schedule(
        &self,
        schedule_id: UniversalUuid,
        cron_expression: Option<&str>,
        timezone: Option<&str>,
    ) -> Result<(), PipelineError> {
        if !self.config.enable_cron_scheduling() {
            return Err(PipelineError::Configuration {
                message: "Cron scheduling not enabled.".to_string(),
            });
        }

        let dal = DAL::new(self.database.clone());

        // Get current schedule to fill in missing values
        let schedule = dal.schedule().get_by_id(schedule_id).await.map_err(|e| {
            PipelineError::ExecutionFailed {
                message: format!("Failed to get cron schedule: {}", e),
            }
        })?;

        let effective_expr = cron_expression
            .or(schedule.cron_expression.as_deref())
            .unwrap_or("* * * * *");
        let effective_tz = timezone.or(schedule.timezone.as_deref()).unwrap_or("UTC");

        // Validate inputs if provided
        if cron_expression.is_some() || timezone.is_some() {
            use crate::CronEvaluator;
            CronEvaluator::validate(effective_expr, effective_tz).map_err(|e| {
                PipelineError::Configuration {
                    message: format!("Invalid cron expression or timezone: {}", e),
                }
            })?;
        }

        // Calculate new next run time
        use crate::CronEvaluator;
        let evaluator = CronEvaluator::new(effective_expr, effective_tz).map_err(|e| {
            PipelineError::Configuration {
                message: format!("Failed to create cron evaluator: {}", e),
            }
        })?;

        let now = chrono::Utc::now();
        let next_run = evaluator
            .next_execution(now)
            .map_err(|e| PipelineError::Configuration {
                message: format!("Failed to calculate next execution: {}", e),
            })?;

        dal.schedule()
            .update_cron_expression_and_timezone(schedule_id, cron_expression, timezone, next_run)
            .await
            .map_err(|e| PipelineError::ExecutionFailed {
                message: format!("Failed to update cron schedule: {}", e),
            })?;

        Ok(())
    }

    /// Get execution history for a cron schedule
    ///
    /// # Arguments
    /// * `schedule_id` - UUID of the schedule
    /// * `limit` - Maximum number of executions to return
    /// * `offset` - Number of executions to skip for pagination
    ///
    /// # Returns
    /// * `Result<Vec<ScheduleExecution>, PipelineError>` - List of schedule executions
    pub async fn get_cron_execution_history(
        &self,
        schedule_id: UniversalUuid,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<crate::models::schedule::ScheduleExecution>, PipelineError> {
        if !self.config.enable_cron_scheduling() {
            return Err(PipelineError::Configuration {
                message: "Cron scheduling not enabled.".to_string(),
            });
        }

        let dal = DAL::new(self.database.clone());
        dal.schedule_execution()
            .list_by_schedule(schedule_id, limit, offset)
            .await
            .map_err(|e| PipelineError::ExecutionFailed {
                message: format!("Failed to get cron execution history: {}", e),
            })
    }

    /// Get cron execution statistics
    ///
    /// # Arguments
    /// * `since` - Only include executions since this timestamp
    ///
    /// # Returns
    /// * `Result<CronExecutionStats, PipelineError>` - Execution statistics
    pub async fn get_cron_execution_stats(
        &self,
        since: chrono::DateTime<chrono::Utc>,
    ) -> Result<crate::dal::ScheduleExecutionStats, PipelineError> {
        if !self.config.enable_cron_scheduling() {
            return Err(PipelineError::Configuration {
                message: "Cron scheduling not enabled.".to_string(),
            });
        }

        let dal = DAL::new(self.database.clone());
        dal.schedule_execution()
            .get_execution_stats(since)
            .await
            .map_err(|e| PipelineError::ExecutionFailed {
                message: format!("Failed to get cron execution stats: {}", e),
            })
    }

    /// Get access to the workflow registry (if enabled)
    ///
    /// # Returns
    /// * `Some(Arc<WorkflowRegistry>)` - If the registry is enabled and initialized
    /// * `None` - If the registry is not enabled or not yet initialized
    pub async fn get_workflow_registry(&self) -> Option<Arc<dyn WorkflowRegistry>> {
        let registry = self.workflow_registry.read().await;
        registry.clone()
    }

    /// Get the current status of the registry reconciler (if enabled)
    ///
    /// # Returns
    /// * `Some(ReconcilerStatus)` - If the reconciler is enabled and initialized
    /// * `None` - If the reconciler is not enabled or not yet initialized
    pub async fn get_registry_reconciler_status(
        &self,
    ) -> Option<crate::registry::ReconcilerStatus> {
        let reconciler = self.registry_reconciler.read().await;
        if let Some(reconciler) = reconciler.as_ref() {
            Some(reconciler.get_status().await)
        } else {
            None
        }
    }

    /// Check if the registry reconciler is enabled in the configuration
    pub fn is_registry_reconciler_enabled(&self) -> bool {
        self.config.enable_registry_reconciler()
    }
}