use std::sync::Arc;
use crate::dal::DAL;
use crate::executor::workflow_executor::WorkflowExecutionError;
use crate::registry::traits::WorkflowRegistry;
use crate::UniversalUuid;
use super::DefaultRunner;
impl DefaultRunner {
/// Registers a cron schedule for `workflow_name` and returns the id of the
/// schedule record that was created.
///
/// The expression/timezone pair is validated, an evaluator is built from it,
/// the first run time is computed from the current UTC time, and the
/// schedule is persisted through the DAL with `timezone` stored on it.
///
/// # Errors
///
/// * [`WorkflowExecutionError::Configuration`] when cron scheduling is
///   disabled in the runner config, when validation fails, when the
///   evaluator cannot be constructed, or when the next execution time
///   cannot be calculated.
/// * [`WorkflowExecutionError::ExecutionFailed`] when the DAL fails to
///   create the schedule.
pub async fn register_cron_workflow(
    &self,
    workflow_name: &str,
    cron_expression: &str,
    timezone: &str,
) -> Result<UniversalUuid, WorkflowExecutionError> {
    use crate::database::universal_types::UniversalTimestamp;
    use crate::models::schedule::NewSchedule;
    use crate::CronEvaluator;

    let cron_enabled = self.config.enable_cron_scheduling();
    if !cron_enabled {
        return Err(WorkflowExecutionError::Configuration {
            message: "Cron scheduling not enabled. Use enable_cron_scheduling(true) in config."
                .to_string(),
        });
    }

    let dal = DAL::new(self.database.clone());

    // Reject a bad expression or timezone before anything is persisted.
    if let Err(e) = CronEvaluator::validate(cron_expression, timezone) {
        return Err(WorkflowExecutionError::Configuration {
            message: format!("Invalid cron expression or timezone: {}", e),
        });
    }

    let evaluator = match CronEvaluator::new(cron_expression, timezone) {
        Ok(built) => built,
        Err(e) => {
            return Err(WorkflowExecutionError::Configuration {
                message: format!("Failed to create cron evaluator: {}", e),
            });
        }
    };

    // The first run is computed relative to "now" in UTC.
    let first_run = match evaluator.next_execution(chrono::Utc::now()) {
        Ok(at) => at,
        Err(e) => {
            return Err(WorkflowExecutionError::Configuration {
                message: format!("Failed to calculate next execution: {}", e),
            });
        }
    };

    let mut pending =
        NewSchedule::cron(workflow_name, cron_expression, UniversalTimestamp(first_run));
    pending.timezone = Some(timezone.to_string());

    let created = match dal.schedule().create(pending).await {
        Ok(record) => record,
        Err(e) => {
            return Err(WorkflowExecutionError::ExecutionFailed {
                message: format!("Failed to create cron schedule: {}", e),
            });
        }
    };

    Ok(created.id)
}
/// Returns a page of schedules, passing `"cron"` as the first (filter)
/// argument of the schedule DAL's `list` call.
///
/// `enabled_only`, `limit` and `offset` are forwarded to the DAL unchanged.
///
/// # Errors
///
/// * [`WorkflowExecutionError::Configuration`] when cron scheduling is
///   disabled in the runner config.
/// * [`WorkflowExecutionError::ExecutionFailed`] when the DAL query fails.
pub async fn list_cron_schedules(
    &self,
    enabled_only: bool,
    limit: i64,
    offset: i64,
) -> Result<Vec<crate::models::schedule::Schedule>, WorkflowExecutionError> {
    let cron_enabled = self.config.enable_cron_scheduling();
    if !cron_enabled {
        return Err(WorkflowExecutionError::Configuration {
            message: String::from("Cron scheduling not enabled."),
        });
    }

    let dal = DAL::new(self.database.clone());
    let listing = dal
        .schedule()
        .list(Some("cron"), enabled_only, limit, offset)
        .await;

    listing.map_err(|e| WorkflowExecutionError::ExecutionFailed {
        message: format!("Failed to list cron schedules: {}", e),
    })
}
/// Enables or disables the schedule identified by `schedule_id`.
///
/// `enabled == true` calls the schedule DAL's `enable`; otherwise its
/// `disable` is called.
///
/// # Errors
///
/// * [`WorkflowExecutionError::Configuration`] when cron scheduling is
///   disabled in the runner config.
/// * [`WorkflowExecutionError::ExecutionFailed`] when the DAL call fails.
pub async fn set_cron_schedule_enabled(
    &self,
    schedule_id: UniversalUuid,
    enabled: bool,
) -> Result<(), WorkflowExecutionError> {
    let cron_enabled = self.config.enable_cron_scheduling();
    if !cron_enabled {
        return Err(WorkflowExecutionError::Configuration {
            message: String::from("Cron scheduling not enabled."),
        });
    }

    let dal = DAL::new(self.database.clone());

    // Pick the DAL operation first, then translate its error in one place.
    let outcome = if enabled {
        dal.schedule().enable(schedule_id).await
    } else {
        dal.schedule().disable(schedule_id).await
    };

    outcome.map_err(|e| WorkflowExecutionError::ExecutionFailed {
        message: format!("Failed to update cron schedule: {}", e),
    })
}
/// Deletes the schedule identified by `schedule_id` through the schedule DAL.
///
/// # Errors
///
/// * [`WorkflowExecutionError::Configuration`] when cron scheduling is
///   disabled in the runner config.
/// * [`WorkflowExecutionError::ExecutionFailed`] when the DAL delete fails.
pub async fn delete_cron_schedule(
    &self,
    schedule_id: UniversalUuid,
) -> Result<(), WorkflowExecutionError> {
    let cron_enabled = self.config.enable_cron_scheduling();
    if !cron_enabled {
        return Err(WorkflowExecutionError::Configuration {
            message: String::from("Cron scheduling not enabled."),
        });
    }

    let dal = DAL::new(self.database.clone());
    let deletion = dal.schedule().delete(schedule_id).await;

    deletion.map_err(|e| WorkflowExecutionError::ExecutionFailed {
        message: format!("Failed to delete cron schedule: {}", e),
    })
}
/// Fetches the schedule identified by `schedule_id` from the schedule DAL.
///
/// # Errors
///
/// * [`WorkflowExecutionError::Configuration`] when cron scheduling is
///   disabled in the runner config.
/// * [`WorkflowExecutionError::ExecutionFailed`] when the DAL lookup fails.
pub async fn get_cron_schedule(
    &self,
    schedule_id: UniversalUuid,
) -> Result<crate::models::schedule::Schedule, WorkflowExecutionError> {
    let cron_enabled = self.config.enable_cron_scheduling();
    if !cron_enabled {
        return Err(WorkflowExecutionError::Configuration {
            message: String::from("Cron scheduling not enabled."),
        });
    }

    let dal = DAL::new(self.database.clone());
    let lookup = dal.schedule().get_by_id(schedule_id).await;

    lookup.map_err(|e| WorkflowExecutionError::ExecutionFailed {
        message: format!("Failed to get cron schedule: {}", e),
    })
}
/// Updates the cron expression and/or timezone of an existing schedule and
/// recomputes its next run time.
///
/// The next run is computed from the *effective* expression and timezone:
/// the caller-supplied value if present, otherwise the value stored on the
/// schedule, otherwise a built-in default (`"* * * * *"` / `"UTC"`).
/// The DAL update itself receives `cron_expression` and `timezone` exactly
/// as passed in, together with the recomputed next run.
///
/// # Errors
///
/// * [`WorkflowExecutionError::Configuration`] when cron scheduling is
///   disabled in the runner config, when a supplied override fails
///   validation, when the evaluator cannot be constructed, or when the next
///   execution time cannot be calculated.
/// * [`WorkflowExecutionError::ExecutionFailed`] when loading or updating the
///   schedule through the DAL fails.
pub async fn update_cron_schedule(
    &self,
    schedule_id: UniversalUuid,
    cron_expression: Option<&str>,
    timezone: Option<&str>,
) -> Result<(), WorkflowExecutionError> {
    use crate::CronEvaluator;

    let cron_enabled = self.config.enable_cron_scheduling();
    if !cron_enabled {
        return Err(WorkflowExecutionError::Configuration {
            message: String::from("Cron scheduling not enabled."),
        });
    }

    let dal = DAL::new(self.database.clone());
    let current = match dal.schedule().get_by_id(schedule_id).await {
        Ok(found) => found,
        Err(e) => {
            return Err(WorkflowExecutionError::ExecutionFailed {
                message: format!("Failed to get cron schedule: {}", e),
            });
        }
    };

    // Resolution order: caller override, then stored value, then default.
    // NOTE(review): when neither the caller nor the stored record carries an
    // expression, the every-minute default is used only to compute the next
    // run; the DAL call below still receives the caller's `None`. Confirm
    // this fallback is intended for schedules without a cron expression.
    let effective_expr = match (cron_expression, current.cron_expression.as_deref()) {
        (Some(requested), _) => requested,
        (None, Some(stored)) => stored,
        (None, None) => "* * * * *",
    };
    let effective_tz = match (timezone, current.timezone.as_deref()) {
        (Some(requested), _) => requested,
        (None, Some(stored)) => stored,
        (None, None) => "UTC",
    };

    // Explicit validation runs only when the caller supplied an override.
    let nothing_supplied = cron_expression.is_none() && timezone.is_none();
    if !nothing_supplied {
        if let Err(e) = CronEvaluator::validate(effective_expr, effective_tz) {
            return Err(WorkflowExecutionError::Configuration {
                message: format!("Invalid cron expression or timezone: {}", e),
            });
        }
    }

    let evaluator = match CronEvaluator::new(effective_expr, effective_tz) {
        Ok(built) => built,
        Err(e) => {
            return Err(WorkflowExecutionError::Configuration {
                message: format!("Failed to create cron evaluator: {}", e),
            });
        }
    };

    let next_run = match evaluator.next_execution(chrono::Utc::now()) {
        Ok(at) => at,
        Err(e) => {
            return Err(WorkflowExecutionError::Configuration {
                message: format!("Failed to calculate next execution: {}", e),
            });
        }
    };

    if let Err(e) = dal
        .schedule()
        .update_cron_expression_and_timezone(schedule_id, cron_expression, timezone, next_run)
        .await
    {
        return Err(WorkflowExecutionError::ExecutionFailed {
            message: format!("Failed to update cron schedule: {}", e),
        });
    }

    Ok(())
}
/// Returns a page of execution records for the schedule `schedule_id`.
///
/// `limit` and `offset` are forwarded unchanged to the schedule-execution
/// DAL's `list_by_schedule`.
///
/// # Errors
///
/// * [`WorkflowExecutionError::Configuration`] when cron scheduling is
///   disabled in the runner config.
/// * [`WorkflowExecutionError::ExecutionFailed`] when the DAL query fails.
pub async fn get_cron_execution_history(
    &self,
    schedule_id: UniversalUuid,
    limit: i64,
    offset: i64,
) -> Result<Vec<crate::models::schedule::ScheduleExecution>, WorkflowExecutionError> {
    let cron_enabled = self.config.enable_cron_scheduling();
    if !cron_enabled {
        return Err(WorkflowExecutionError::Configuration {
            message: String::from("Cron scheduling not enabled."),
        });
    }

    let dal = DAL::new(self.database.clone());
    let history = dal
        .schedule_execution()
        .list_by_schedule(schedule_id, limit, offset)
        .await;

    history.map_err(|e| WorkflowExecutionError::ExecutionFailed {
        message: format!("Failed to get cron execution history: {}", e),
    })
}
/// Returns the execution statistics the schedule-execution DAL reports for
/// the given `since` timestamp.
///
/// # Errors
///
/// * [`WorkflowExecutionError::Configuration`] when cron scheduling is
///   disabled in the runner config.
/// * [`WorkflowExecutionError::ExecutionFailed`] when the DAL query fails.
pub async fn get_cron_execution_stats(
    &self,
    since: chrono::DateTime<chrono::Utc>,
) -> Result<crate::dal::ScheduleExecutionStats, WorkflowExecutionError> {
    let cron_enabled = self.config.enable_cron_scheduling();
    if !cron_enabled {
        return Err(WorkflowExecutionError::Configuration {
            message: String::from("Cron scheduling not enabled."),
        });
    }

    let dal = DAL::new(self.database.clone());
    let stats = dal.schedule_execution().get_execution_stats(since).await;

    stats.map_err(|e| WorkflowExecutionError::ExecutionFailed {
        message: format!("Failed to get cron execution stats: {}", e),
    })
}
/// Returns a clone of the `workflow_registry` field held by the service
/// manager, which is `None` when that field is empty.
///
/// A read guard on `service_manager` is awaited and held only for the
/// duration of the clone.
pub async fn get_workflow_registry(&self) -> Option<Arc<dyn WorkflowRegistry>> {
    let services = self.service_manager.read().await;
    services.workflow_registry.clone()
}
/// Reports the value of the runner config's `enable_registry_reconciler()` flag.
pub fn is_registry_reconciler_enabled(&self) -> bool {
self.config.enable_registry_reconciler()
}
}
/// Registrar that creates and deletes cron schedule records through the
/// `DAL`, using only a database handle (no runner or config).
pub struct DalCronRegistrar {
/// Database handle; cloned into a fresh `DAL` for each operation.
database: crate::database::Database,
}
impl DalCronRegistrar {
/// Creates a registrar that stores the given database handle.
pub fn new(database: crate::database::Database) -> Self {
Self { database }
}
}
#[async_trait::async_trait]
impl crate::registry::reconciler::CronWorkflowRegistrar for DalCronRegistrar {
/// Creates a cron schedule for `workflow_name` and returns the new
/// schedule's id rendered as a string.
///
/// The expression/timezone pair is validated, the first run time is
/// computed from the current UTC time, and the schedule is persisted
/// through the DAL with `timezone` stored on it.
///
/// # Errors
///
/// Returns a human-readable message when validation fails, the evaluator
/// cannot be constructed, the next execution cannot be calculated, or the
/// DAL fails to create the schedule.
async fn register_cron_workflow(
    &self,
    workflow_name: &str,
    cron_expression: &str,
    timezone: &str,
) -> Result<String, String> {
    use crate::database::universal_types::UniversalTimestamp;
    use crate::models::schedule::NewSchedule;
    use crate::CronEvaluator;

    // Reject a bad expression or timezone before anything is persisted.
    if let Err(e) = CronEvaluator::validate(cron_expression, timezone) {
        return Err(format!("Invalid cron expression or timezone: {}", e));
    }

    let evaluator = match CronEvaluator::new(cron_expression, timezone) {
        Ok(built) => built,
        Err(e) => return Err(format!("Failed to create cron evaluator: {}", e)),
    };

    let first_run = match evaluator.next_execution(chrono::Utc::now()) {
        Ok(at) => at,
        Err(e) => return Err(format!("Failed to calculate next execution: {}", e)),
    };

    let mut pending =
        NewSchedule::cron(workflow_name, cron_expression, UniversalTimestamp(first_run));
    pending.timezone = Some(timezone.to_string());

    let dal = DAL::new(self.database.clone());
    let created = match dal.schedule().create(pending).await {
        Ok(record) => record,
        Err(e) => return Err(format!("Failed to create cron schedule: {}", e)),
    };

    Ok(created.id.to_string())
}
/// Deletes the schedule whose id is given as a UUID string.
///
/// # Errors
///
/// Returns a human-readable message when `schedule_id` does not parse as a
/// UUID or when the DAL delete fails.
async fn unregister_cron_workflow(&self, schedule_id: &str) -> Result<(), String> {
    let raw_id = match schedule_id.parse::<uuid::Uuid>() {
        Ok(id) => id,
        Err(e) => return Err(format!("invalid schedule id '{}': {}", schedule_id, e)),
    };
    let parsed: UniversalUuid = raw_id.into();

    let dal = DAL::new(self.database.clone());
    let deletion = dal.schedule().delete(parsed).await;

    deletion.map_err(|e| format!("Failed to delete cron schedule: {}", e))
}
}