use std::sync::Arc;
use crate::dal::DAL;
use crate::executor::pipeline_executor::PipelineError;
use crate::registry::traits::WorkflowRegistry;
use crate::UniversalUuid;
use super::DefaultRunner;
impl DefaultRunner {
/// Registers a cron schedule for a workflow and returns the id of the created schedule.
///
/// The expression/timezone pair is first checked with `CronEvaluator::validate`,
/// then an evaluator is built to compute the next execution after the current
/// UTC time. The schedule record is created through the DAL with `enabled` set
/// to true, `catchup_policy` set to `"skip"`, and no start or end date.
///
/// # Arguments
/// * `workflow_name` - name stored on the schedule record.
/// * `cron_expression` - cron expression handed to `CronEvaluator`.
/// * `timezone` - timezone string handed to `CronEvaluator`.
///
/// # Errors
/// * `PipelineError::Configuration` - cron scheduling is disabled in the config,
///   validation fails, the evaluator cannot be built, or the next execution
///   cannot be calculated.
/// * `PipelineError::ExecutionFailed` - the DAL fails to create the schedule.
pub async fn register_cron_workflow(
    &self,
    workflow_name: &str,
    cron_expression: &str,
    timezone: &str,
) -> Result<UniversalUuid, PipelineError> {
    use crate::database::universal_types::{UniversalBool, UniversalTimestamp};
    use crate::models::cron_schedule::NewCronSchedule;
    use crate::CronEvaluator;

    let cron_enabled = self.config.enable_cron_scheduling();
    if !cron_enabled {
        let message = "Cron scheduling not enabled. Use enable_cron_scheduling(true) in config."
            .to_string();
        return Err(PipelineError::Configuration { message });
    }

    let data_access = DAL::new(self.database.clone());

    // Reject bad input before building the evaluator.
    if let Err(err) = CronEvaluator::validate(cron_expression, timezone) {
        return Err(PipelineError::Configuration {
            message: format!("Invalid cron expression or timezone: {}", err),
        });
    }

    let evaluator = match CronEvaluator::new(cron_expression, timezone) {
        Ok(built) => built,
        Err(err) => {
            return Err(PipelineError::Configuration {
                message: format!("Failed to create cron evaluator: {}", err),
            });
        }
    };

    // The first run is computed relative to the current UTC instant.
    let reference_time = chrono::Utc::now();
    let next_run = match evaluator.next_execution(reference_time) {
        Ok(when) => when,
        Err(err) => {
            return Err(PipelineError::Configuration {
                message: format!("Failed to calculate next execution: {}", err),
            });
        }
    };

    let record = NewCronSchedule {
        workflow_name: workflow_name.to_owned(),
        cron_expression: cron_expression.to_owned(),
        timezone: Some(timezone.to_owned()),
        enabled: Some(UniversalBool::new(true)),
        catchup_policy: Some(String::from("skip")),
        start_date: None,
        end_date: None,
        next_run_at: UniversalTimestamp(next_run),
    };

    let created = data_access.cron_schedule().create(record).await;
    match created {
        Ok(schedule) => Ok(schedule.id),
        Err(err) => Err(PipelineError::ExecutionFailed {
            message: format!("Failed to create cron schedule: {}", err),
        }),
    }
}
/// Lists cron schedules through the DAL.
///
/// # Arguments
/// * `enabled_only` - forwarded to the DAL `list` call as its filter flag.
/// * `limit` - forwarded to the DAL `list` call.
/// * `offset` - forwarded to the DAL `list` call.
///
/// # Errors
/// * `PipelineError::Configuration` - cron scheduling is disabled in the config.
/// * `PipelineError::ExecutionFailed` - the DAL `list` call fails.
pub async fn list_cron_schedules(
    &self,
    enabled_only: bool,
    limit: i64,
    offset: i64,
) -> Result<Vec<crate::models::cron_schedule::CronSchedule>, PipelineError> {
    let cron_enabled = self.config.enable_cron_scheduling();
    if !cron_enabled {
        return Err(PipelineError::Configuration {
            message: String::from("Cron scheduling not enabled."),
        });
    }

    let data_access = DAL::new(self.database.clone());
    let listing = data_access
        .cron_schedule()
        .list(enabled_only, limit, offset)
        .await;

    match listing {
        Ok(schedules) => Ok(schedules),
        Err(err) => Err(PipelineError::ExecutionFailed {
            message: format!("Failed to list cron schedules: {}", err),
        }),
    }
}
/// Enables or disables a cron schedule.
///
/// Calls the DAL `enable` method when `enabled` is true and `disable` otherwise.
///
/// # Arguments
/// * `schedule_id` - id of the schedule to change.
/// * `enabled` - selects which DAL operation is invoked.
///
/// # Errors
/// * `PipelineError::Configuration` - cron scheduling is disabled in the config.
/// * `PipelineError::ExecutionFailed` - the DAL `enable`/`disable` call fails.
pub async fn set_cron_schedule_enabled(
    &self,
    schedule_id: UniversalUuid,
    enabled: bool,
) -> Result<(), PipelineError> {
    let cron_enabled = self.config.enable_cron_scheduling();
    if !cron_enabled {
        return Err(PipelineError::Configuration {
            message: String::from("Cron scheduling not enabled."),
        });
    }

    let data_access = DAL::new(self.database.clone());
    let outcome = if enabled {
        data_access.cron_schedule().enable(schedule_id).await
    } else {
        data_access.cron_schedule().disable(schedule_id).await
    };

    outcome.map_err(|err| PipelineError::ExecutionFailed {
        message: format!("Failed to update cron schedule: {}", err),
    })
}
/// Deletes a cron schedule through the DAL.
///
/// # Arguments
/// * `schedule_id` - id of the schedule to delete.
///
/// # Errors
/// * `PipelineError::Configuration` - cron scheduling is disabled in the config.
/// * `PipelineError::ExecutionFailed` - the DAL `delete` call fails.
pub async fn delete_cron_schedule(
    &self,
    schedule_id: UniversalUuid,
) -> Result<(), PipelineError> {
    let cron_enabled = self.config.enable_cron_scheduling();
    if !cron_enabled {
        return Err(PipelineError::Configuration {
            message: String::from("Cron scheduling not enabled."),
        });
    }

    let data_access = DAL::new(self.database.clone());
    let removal = data_access.cron_schedule().delete(schedule_id).await;

    removal.map_err(|err| PipelineError::ExecutionFailed {
        message: format!("Failed to delete cron schedule: {}", err),
    })
}
/// Fetches a single cron schedule by id through the DAL.
///
/// # Arguments
/// * `schedule_id` - id of the schedule to look up.
///
/// # Errors
/// * `PipelineError::Configuration` - cron scheduling is disabled in the config.
/// * `PipelineError::ExecutionFailed` - the DAL `get_by_id` call fails.
pub async fn get_cron_schedule(
    &self,
    schedule_id: UniversalUuid,
) -> Result<crate::models::cron_schedule::CronSchedule, PipelineError> {
    let cron_enabled = self.config.enable_cron_scheduling();
    if !cron_enabled {
        return Err(PipelineError::Configuration {
            message: String::from("Cron scheduling not enabled."),
        });
    }

    let data_access = DAL::new(self.database.clone());
    let lookup = data_access.cron_schedule().get_by_id(schedule_id).await;

    match lookup {
        Ok(schedule) => Ok(schedule),
        Err(err) => Err(PipelineError::ExecutionFailed {
            message: format!("Failed to get cron schedule: {}", err),
        }),
    }
}
/// Updates the cron expression and/or timezone of an existing schedule.
///
/// The stored schedule is loaded, any supplied values override the stored
/// ones, and the next execution is recomputed from the current UTC time using
/// the resulting expression/timezone pair. The original optional arguments and
/// the new next-run time are then passed to the DAL
/// `update_expression_and_timezone` call.
///
/// # Arguments
/// * `schedule_id` - id of the schedule to update.
/// * `cron_expression` - new expression, or `None` to keep the stored one.
/// * `timezone` - new timezone, or `None` to keep the stored one.
///
/// # Errors
/// * `PipelineError::Configuration` - cron scheduling is disabled in the config,
///   up-front validation fails, the evaluator cannot be built, or the next
///   execution cannot be calculated.
/// * `PipelineError::ExecutionFailed` - loading or updating the schedule fails.
pub async fn update_cron_schedule(
    &self,
    schedule_id: UniversalUuid,
    cron_expression: Option<&str>,
    timezone: Option<&str>,
) -> Result<(), PipelineError> {
    use crate::CronEvaluator;

    let cron_enabled = self.config.enable_cron_scheduling();
    if !cron_enabled {
        return Err(PipelineError::Configuration {
            message: String::from("Cron scheduling not enabled."),
        });
    }

    let data_access = DAL::new(self.database.clone());

    // Up-front validation only runs when both values are supplied together;
    // a partial update is checked later, when the evaluator is built from the
    // merged values.
    if let Some((expr, tz)) = cron_expression.zip(timezone) {
        if let Err(err) = CronEvaluator::validate(expr, tz) {
            return Err(PipelineError::Configuration {
                message: format!("Invalid cron expression or timezone: {}", err),
            });
        }
    }

    let fetched = data_access.cron_schedule().get_by_id(schedule_id).await;
    let current = match fetched {
        Ok(schedule) => schedule,
        Err(err) => {
            return Err(PipelineError::ExecutionFailed {
                message: format!("Failed to get cron schedule: {}", err),
            });
        }
    };

    // Supplied values take precedence over what is currently stored.
    let effective_expression = cron_expression.unwrap_or(current.cron_expression.as_str());
    let effective_timezone = timezone.unwrap_or(current.timezone.as_str());

    let evaluator = match CronEvaluator::new(effective_expression, effective_timezone) {
        Ok(built) => built,
        Err(err) => {
            return Err(PipelineError::Configuration {
                message: format!("Failed to create cron evaluator: {}", err),
            });
        }
    };

    let reference_time = chrono::Utc::now();
    let next_run = match evaluator.next_execution(reference_time) {
        Ok(when) => when,
        Err(err) => {
            return Err(PipelineError::Configuration {
                message: format!("Failed to calculate next execution: {}", err),
            });
        }
    };

    let persisted = data_access
        .cron_schedule()
        .update_expression_and_timezone(schedule_id, cron_expression, timezone, next_run)
        .await;

    match persisted {
        Ok(_) => Ok(()),
        Err(err) => Err(PipelineError::ExecutionFailed {
            message: format!("Failed to update cron schedule: {}", err),
        }),
    }
}
/// Fetches cron execution records for one schedule through the DAL.
///
/// # Arguments
/// * `schedule_id` - id of the schedule whose executions are requested.
/// * `limit` - forwarded to the DAL `get_by_schedule_id` call.
/// * `offset` - forwarded to the DAL `get_by_schedule_id` call.
///
/// # Errors
/// * `PipelineError::Configuration` - cron scheduling is disabled in the config.
/// * `PipelineError::ExecutionFailed` - the DAL `get_by_schedule_id` call fails.
pub async fn get_cron_execution_history(
    &self,
    schedule_id: UniversalUuid,
    limit: i64,
    offset: i64,
) -> Result<Vec<crate::models::cron_execution::CronExecution>, PipelineError> {
    let cron_enabled = self.config.enable_cron_scheduling();
    if !cron_enabled {
        return Err(PipelineError::Configuration {
            message: String::from("Cron scheduling not enabled."),
        });
    }

    let data_access = DAL::new(self.database.clone());
    let history = data_access
        .cron_execution()
        .get_by_schedule_id(schedule_id, limit, offset)
        .await;

    history.map_err(|err| PipelineError::ExecutionFailed {
        message: format!("Failed to get cron execution history: {}", err),
    })
}
/// Fetches cron execution statistics through the DAL.
///
/// # Arguments
/// * `since` - UTC timestamp forwarded to the DAL `get_execution_stats` call
///   (presumably the start of the reporting window; confirm against the DAL).
///
/// # Errors
/// * `PipelineError::Configuration` - cron scheduling is disabled in the config.
/// * `PipelineError::ExecutionFailed` - the DAL `get_execution_stats` call fails.
pub async fn get_cron_execution_stats(
    &self,
    since: chrono::DateTime<chrono::Utc>,
) -> Result<crate::dal::CronExecutionStats, PipelineError> {
    let cron_enabled = self.config.enable_cron_scheduling();
    if !cron_enabled {
        return Err(PipelineError::Configuration {
            message: String::from("Cron scheduling not enabled."),
        });
    }

    let data_access = DAL::new(self.database.clone());
    let stats = data_access.cron_execution().get_execution_stats(since).await;

    match stats {
        Ok(summary) => Ok(summary),
        Err(err) => Err(PipelineError::ExecutionFailed {
            message: format!("Failed to get cron execution stats: {}", err),
        }),
    }
}
/// Returns a clone of the value currently held in `workflow_registry`.
///
/// Awaits a read guard on `workflow_registry` and clones the `Option` inside
/// it, so the result is `None` when no registry is stored.
pub async fn get_workflow_registry(&self) -> Option<Arc<dyn WorkflowRegistry>> {
    self.workflow_registry.read().await.clone()
}
/// Returns the status reported by the registry reconciler, if one is stored.
///
/// Awaits a read guard on `registry_reconciler`; yields `None` when the slot is
/// empty, otherwise the awaited result of the reconciler's `get_status`.
pub async fn get_registry_reconciler_status(
    &self,
) -> Option<crate::registry::ReconcilerStatus> {
    let guard = self.registry_reconciler.read().await;
    // Bail out with `None` when no reconciler is present.
    let active = guard.as_ref()?;
    Some(active.get_status().await)
}
/// Reports the value of the config's `enable_registry_reconciler` setting.
pub fn is_registry_reconciler_enabled(&self) -> bool {
    let settings = &self.config;
    settings.enable_registry_reconciler()
}
}