use crate::codegen::action_nodes::{ActionNodeConfig, TriggerType};
use crate::server::handlers::{
WebhookNotification, get_project_binary_path, is_project_built, notify_webhook,
};
use crate::server::state::AppState;
use chrono::Timelike;
use chrono::{DateTime, Utc};
use cron::Schedule;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::time::Duration;
#[derive(Debug, Clone)]
struct ScheduledJob {
project_id: String,
project_name: String,
trigger_id: String,
cron: String,
timezone: String,
default_prompt: Option<String>,
next_run: DateTime<Utc>,
binary_path: String,
}
pub struct SchedulerState {
jobs: HashMap<String, Vec<ScheduledJob>>,
running: bool,
last_executed: HashMap<String, DateTime<Utc>>,
}
impl Default for SchedulerState {
fn default() -> Self {
Self::new()
}
}
impl SchedulerState {
pub fn new() -> Self {
Self {
jobs: HashMap::new(),
running: false,
last_executed: HashMap::new(),
}
}
}
lazy_static::lazy_static! {
pub static ref SCHEDULER: Arc<RwLock<SchedulerState>> = Arc::new(RwLock::new(SchedulerState::new()));
}
fn get_next_run(cron_expr: &str, _timezone: &str) -> Option<DateTime<Utc>> {
let parts: Vec<&str> = cron_expr.split_whitespace().collect();
let cron_with_seconds = if parts.len() == 5 {
format!("0 {}", cron_expr)
} else {
cron_expr.to_string()
};
let schedule = Schedule::from_str(&cron_with_seconds).ok()?;
schedule.upcoming(Utc).next()
}
async fn scan_projects(state: &AppState) -> Vec<ScheduledJob> {
let storage = state.storage.read().await;
let projects = match storage.list().await {
Ok(metas) => metas,
Err(e) => {
tracing::error!("Failed to list projects for scheduler: {}", e);
return Vec::new();
}
};
let mut jobs = Vec::new();
for meta in projects {
let project = match storage.get(meta.id).await {
Ok(p) => p,
Err(_) => continue,
};
let is_built = is_project_built(&project.name);
let binary_path = get_project_binary_path(&project.name);
for (trigger_id, node) in &project.action_nodes {
if let ActionNodeConfig::Trigger(trigger) = node {
tracing::debug!(
project = %project.name,
trigger_id = %trigger_id,
trigger_type = ?trigger.trigger_type,
is_built = is_built,
binary_path = %binary_path,
has_schedule = trigger.schedule.is_some(),
"Found trigger in project"
);
}
}
if !is_built {
continue;
}
for (trigger_id, node) in &project.action_nodes {
if let ActionNodeConfig::Trigger(trigger) = node {
tracing::debug!(
project = %project.name,
trigger_type = ?trigger.trigger_type,
is_schedule = (trigger.trigger_type == TriggerType::Schedule),
"Checking trigger for schedule"
);
if trigger.trigger_type == TriggerType::Schedule {
tracing::debug!(
project = %project.name,
has_schedule_config = trigger.schedule.is_some(),
"Trigger is Schedule type"
);
if let Some(schedule) = &trigger.schedule {
let next_run_result = get_next_run(&schedule.cron, &schedule.timezone);
tracing::debug!(
project = %project.name,
cron = %schedule.cron,
next_run = ?next_run_result,
"Parsed cron expression"
);
if let Some(next_run) = next_run_result {
tracing::info!(
project = %project.name,
cron = %schedule.cron,
next_run = %next_run,
"Adding scheduled job"
);
jobs.push(ScheduledJob {
project_id: meta.id.to_string(),
project_name: project.name.clone(),
trigger_id: trigger_id.clone(),
cron: schedule.cron.clone(),
timezone: schedule.timezone.clone(),
default_prompt: schedule.default_prompt.clone(),
next_run,
binary_path: binary_path.clone(),
});
}
}
}
}
}
}
jobs
}
async fn execute_job(job: &ScheduledJob) {
tracing::info!(
project_id = %job.project_id,
project_name = %job.project_name,
trigger_id = %job.trigger_id,
cron = %job.cron,
"Executing scheduled job"
);
let session_id = uuid::Uuid::new_v4().to_string();
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let payload = if let Some(prompt) = &job.default_prompt {
serde_json::json!({
"trigger": "schedule",
"input": prompt,
"cron": job.cron,
"timezone": job.timezone,
"scheduled_time": job.next_run.to_rfc3339(),
})
} else {
serde_json::json!({
"trigger": "schedule",
"input": format!("Scheduled trigger fired at {} (cron: {})", job.next_run.to_rfc3339(), job.cron),
"cron": job.cron,
"timezone": job.timezone,
"scheduled_time": job.next_run.to_rfc3339(),
})
};
notify_webhook(
&job.project_id,
WebhookNotification {
session_id: session_id.clone(),
path: format!("/schedule/{}", job.trigger_id),
method: "SCHEDULE".to_string(),
payload,
timestamp,
binary_path: Some(job.binary_path.clone()),
},
)
.await;
tracing::info!(
project_id = %job.project_id,
session_id = %session_id,
"Scheduled job notification sent to UI"
);
}
pub async fn start_scheduler(state: AppState) {
{
let mut scheduler = SCHEDULER.write().await;
if scheduler.running {
tracing::warn!("Scheduler already running");
return;
}
scheduler.running = true;
}
tracing::info!("Starting schedule trigger service");
loop {
let jobs = scan_projects(&state).await;
tracing::info!(
job_count = jobs.len(),
"Scheduler scan complete - found {} schedule triggers",
jobs.len()
);
for job in &jobs {
tracing::debug!(
project = %job.project_name,
cron = %job.cron,
next_run = %job.next_run,
"Found scheduled job"
);
}
{
let mut scheduler = SCHEDULER.write().await;
if !scheduler.running {
tracing::info!("Scheduler stopped");
break;
}
scheduler.jobs.clear();
for job in &jobs {
scheduler
.jobs
.entry(job.project_id.clone())
.or_insert_with(Vec::new)
.push(job.clone());
}
}
let now = Utc::now();
let Some(current_minute) = now.with_second(0).and_then(|t| t.with_nanosecond(0)) else {
tracing::warn!("Failed to round current time to minute");
tokio::time::sleep(Duration::from_secs(30)).await;
continue;
};
for job in &jobs {
let job_key = format!("{}:{}", job.project_id, job.trigger_id);
let should_execute = {
let scheduler = SCHEDULER.read().await;
let last_exec = scheduler.last_executed.get(&job_key);
let already_executed = last_exec
.and_then(|t| t.with_second(0).and_then(|t| t.with_nanosecond(0)))
.map(|t| t >= current_minute)
.unwrap_or(false);
let next_run_minute = match job
.next_run
.with_second(0)
.and_then(|t| t.with_nanosecond(0))
{
Some(t) => t,
None => continue,
};
let time_to_next = (next_run_minute - current_minute).num_seconds();
let is_matching_minute = time_to_next <= 60 && time_to_next > 0;
is_matching_minute && !already_executed
};
if should_execute {
tracing::info!(
project = %job.project_name,
trigger_id = %job.trigger_id,
next_run = %job.next_run,
current_minute = %current_minute,
"Executing scheduled job"
);
{
let mut scheduler = SCHEDULER.write().await;
scheduler.last_executed.insert(job_key, now);
}
execute_job(job).await;
}
}
if !jobs.is_empty() {
tracing::debug!(
job_count = jobs.len(),
"Scheduler tick - {} jobs scheduled",
jobs.len()
);
}
tokio::time::sleep(Duration::from_secs(30)).await;
}
}
pub async fn stop_scheduler() {
let mut scheduler = SCHEDULER.write().await;
scheduler.running = false;
tracing::info!("Scheduler stop requested");
}
pub async fn get_project_schedules(project_id: &str) -> Vec<ScheduledJobInfo> {
let scheduler = SCHEDULER.read().await;
scheduler
.jobs
.get(project_id)
.map(|jobs| {
jobs.iter()
.map(|j| ScheduledJobInfo {
trigger_id: j.trigger_id.clone(),
cron: j.cron.clone(),
timezone: j.timezone.clone(),
next_run: j.next_run.to_rfc3339(),
})
.collect()
})
.unwrap_or_default()
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct ScheduledJobInfo {
pub trigger_id: String,
pub cron: String,
pub timezone: String,
pub next_run: String,
}