use crate::context::Context;
use crate::dal::DAL;
use crate::database::universal_types::UniversalUuid;
use crate::error::ValidationError;
use crate::executor::{PipelineError, PipelineExecutor};
use crate::models::trigger_execution::NewTriggerExecution;
use crate::models::trigger_schedule::{NewTriggerSchedule, TriggerSchedule};
use crate::trigger::{get_trigger, Trigger, TriggerError};
use chrono::Utc;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::watch;
use tracing::{debug, error, info, warn};
#[derive(Debug, Clone)]
pub struct TriggerSchedulerConfig {
pub base_poll_interval: Duration,
pub poll_timeout: Duration,
}
impl Default for TriggerSchedulerConfig {
fn default() -> Self {
Self {
base_poll_interval: Duration::from_secs(1),
poll_timeout: Duration::from_secs(30),
}
}
}
#[derive(Clone)]
pub struct TriggerScheduler {
dal: Arc<DAL>,
executor: Arc<dyn PipelineExecutor>,
config: TriggerSchedulerConfig,
shutdown: watch::Receiver<bool>,
last_poll_times: HashMap<String, Instant>,
}
impl TriggerScheduler {
pub fn new(
dal: Arc<DAL>,
executor: Arc<dyn PipelineExecutor>,
config: TriggerSchedulerConfig,
shutdown: watch::Receiver<bool>,
) -> Self {
Self {
dal,
executor,
config,
shutdown,
last_poll_times: HashMap::new(),
}
}
pub fn with_defaults(
dal: Arc<DAL>,
executor: Arc<dyn PipelineExecutor>,
shutdown: watch::Receiver<bool>,
) -> Self {
Self::new(dal, executor, TriggerSchedulerConfig::default(), shutdown)
}
pub async fn run_polling_loop(&mut self) -> Result<(), PipelineError> {
info!(
"Starting trigger scheduler polling loop (base interval: {:?})",
self.config.base_poll_interval
);
let mut interval = tokio::time::interval(self.config.base_poll_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = interval.tick() => {
if let Err(e) = self.check_and_process_triggers().await {
error!("Error processing triggers: {}", e);
}
}
_ = self.shutdown.changed() => {
if *self.shutdown.borrow() {
info!("Trigger scheduler received shutdown signal");
break;
}
}
}
}
info!("Trigger scheduler polling loop stopped");
Ok(())
}
async fn check_and_process_triggers(&mut self) -> Result<(), PipelineError> {
debug!("Checking trigger schedules");
let schedules = self
.dal
.trigger_schedule()
.get_enabled()
.await
.map_err(|e| PipelineError::ExecutionFailed {
message: format!("Failed to get trigger schedules: {}", e),
})?;
if schedules.is_empty() {
debug!("No enabled trigger schedules found");
return Ok(());
}
let now = Instant::now();
for schedule in schedules {
let poll_interval = Duration::from_millis(schedule.poll_interval_ms as u64);
let last_poll = self.last_poll_times.get(&schedule.trigger_name);
let should_poll = match last_poll {
Some(last) => now.duration_since(*last) >= poll_interval,
None => true, };
if !should_poll {
continue;
}
if let Err(e) = self.process_trigger(&schedule).await {
error!(
"Failed to process trigger '{}': {}",
schedule.trigger_name, e
);
}
self.last_poll_times
.insert(schedule.trigger_name.clone(), now);
}
Ok(())
}
async fn process_trigger(&self, schedule: &TriggerSchedule) -> Result<(), TriggerError> {
debug!(
"Processing trigger '{}' (workflow: {})",
schedule.trigger_name, schedule.workflow_name
);
let trigger =
get_trigger(&schedule.trigger_name).ok_or_else(|| TriggerError::TriggerNotFound {
name: schedule.trigger_name.clone(),
})?;
let poll_result = tokio::time::timeout(self.config.poll_timeout, trigger.poll())
.await
.map_err(|_| TriggerError::PollError {
message: format!(
"Trigger '{}' poll timed out after {:?}",
schedule.trigger_name, self.config.poll_timeout
),
})?
.map_err(|e| {
error!("Trigger '{}' poll error: {}", schedule.trigger_name, e);
e
})?;
let now = Utc::now();
if let Err(e) = self
.dal
.trigger_schedule()
.update_last_poll(schedule.id, now)
.await
{
warn!(
"Failed to update last_poll_at for trigger '{}': {}",
schedule.trigger_name, e
);
}
if !poll_result.should_fire() {
debug!("Trigger '{}' returned Skip", schedule.trigger_name);
return Ok(());
}
let context_hash = poll_result.context_hash();
if !schedule.allows_concurrent() {
let has_active = self
.dal
.trigger_execution()
.has_active_execution(&schedule.trigger_name, &context_hash)
.await
.map_err(|e| TriggerError::ConnectionPool(e.to_string()))?;
if has_active {
debug!(
"Trigger '{}' has active execution with same context hash, skipping",
schedule.trigger_name
);
return Ok(());
}
}
info!(
"Trigger '{}' fired, scheduling workflow '{}'",
schedule.trigger_name, schedule.workflow_name
);
let execution = self
.create_execution_audit(&schedule.trigger_name, &context_hash)
.await?;
let context = poll_result.into_context().unwrap_or_else(Context::new);
match self.execute_workflow(schedule, context).await {
Ok(pipeline_execution_id) => {
if let Err(e) = self
.dal
.trigger_execution()
.link_pipeline_execution(execution.id, pipeline_execution_id)
.await
{
warn!(
"Failed to link trigger execution to pipeline execution: {}",
e
);
}
info!(
"Successfully scheduled workflow '{}' for trigger '{}' (execution: {})",
schedule.workflow_name, schedule.trigger_name, pipeline_execution_id
);
}
Err(e) => {
error!(
"Failed to execute workflow '{}' for trigger '{}': {}",
schedule.workflow_name, schedule.trigger_name, e
);
if let Err(e) = self
.dal
.trigger_execution()
.complete(execution.id, Utc::now())
.await
{
warn!(
"Failed to mark trigger execution as completed after failure: {}",
e
);
}
return Err(TriggerError::WorkflowSchedulingFailed {
workflow: schedule.workflow_name.clone(),
message: e.to_string(),
});
}
}
Ok(())
}
async fn create_execution_audit(
&self,
trigger_name: &str,
context_hash: &str,
) -> Result<crate::models::trigger_execution::TriggerExecution, TriggerError> {
let new_execution = NewTriggerExecution::new(trigger_name, context_hash);
let execution = self
.dal
.trigger_execution()
.create(new_execution)
.await
.map_err(|e| TriggerError::ConnectionPool(e.to_string()))?;
debug!(
"Created trigger execution audit record {} for trigger '{}'",
execution.id, trigger_name
);
Ok(execution)
}
async fn execute_workflow(
&self,
schedule: &TriggerSchedule,
mut context: Context<serde_json::Value>,
) -> Result<UniversalUuid, PipelineError> {
context
.insert(
"trigger_name",
serde_json::json!(schedule.trigger_name.clone()),
)
.map_err(|e| PipelineError::ExecutionFailed {
message: format!("Context error: {}", e),
})?;
context
.insert("triggered_at", serde_json::json!(Utc::now().to_rfc3339()))
.map_err(|e| PipelineError::ExecutionFailed {
message: format!("Context error: {}", e),
})?;
let result = self
.executor
.execute(&schedule.workflow_name, context)
.await?;
debug!(
"Successfully handed off workflow '{}' to executor (execution_id: {})",
schedule.workflow_name, result.execution_id
);
Ok(UniversalUuid(result.execution_id))
}
pub async fn register_trigger(
&self,
trigger: &dyn Trigger,
workflow_name: &str,
) -> Result<TriggerSchedule, ValidationError> {
let new_schedule =
NewTriggerSchedule::new(trigger.name(), workflow_name, trigger.poll_interval())
.with_allow_concurrent(trigger.allow_concurrent());
self.dal.trigger_schedule().upsert(new_schedule).await
}
pub async fn disable_trigger(&self, trigger_name: &str) -> Result<(), ValidationError> {
if let Some(schedule) = self
.dal
.trigger_schedule()
.get_by_name(trigger_name)
.await?
{
self.dal.trigger_schedule().disable(schedule.id).await?;
info!("Disabled trigger '{}'", trigger_name);
}
Ok(())
}
pub async fn enable_trigger(&self, trigger_name: &str) -> Result<(), ValidationError> {
if let Some(schedule) = self
.dal
.trigger_schedule()
.get_by_name(trigger_name)
.await?
{
self.dal.trigger_schedule().enable(schedule.id).await?;
info!("Enabled trigger '{}'", trigger_name);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_trigger_scheduler_config_default() {
let config = TriggerSchedulerConfig::default();
assert_eq!(config.base_poll_interval, Duration::from_secs(1));
assert_eq!(config.poll_timeout, Duration::from_secs(30));
}
}