use super::executor::TaskExecutor;
use super::monitor::TaskMonitor;
use super::store::CronStore;
use super::types::{CronExpression, CronTask, TaskStatus};
use crate::agent::AgentManager;
use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
pub struct CronScheduler {
store: Arc<CronStore>,
executor: Arc<TaskExecutor>,
monitor: Arc<TaskMonitor>,
max_concurrent: usize,
}
impl CronScheduler {
pub fn new(
store: Arc<CronStore>,
agent_manager: Arc<RwLock<AgentManager>>,
skills_dir: std::path::PathBuf,
max_concurrent: usize,
) -> Self {
Self {
store: store.clone(),
executor: Arc::new(TaskExecutor::new(agent_manager, skills_dir)),
monitor: Arc::new(TaskMonitor::new(store)),
max_concurrent,
}
}
pub fn add_task(&self, task: CronTask) -> Result<Arc<str>> {
CronExpression::parse(&task.expression)
.map_err(|e| anyhow::anyhow!("Invalid cron expression: {}", e))?;
let id = task.id.clone();
self.store.save(&task)?;
tracing::info!("Task '{}' added: {}", id, task.description);
Ok(id)
}
pub fn delete_task(&self, id: &str) -> Result<bool> {
let result = self.store.delete(id)?;
if result {
tracing::info!("Task '{}' deleted", id);
}
Ok(result)
}
pub fn list_tasks(&self) -> Result<Vec<CronTask>> {
self.store.list_all()
}
pub async fn execute_now(&self, id: &str) -> Result<ExecutionResult> {
let mut task = self.store.load(id)?
.ok_or_else(|| anyhow::anyhow!("Task '{}' not found", id))?;
task.status = TaskStatus::Running;
self.store.save(&task)?;
let result = self.executor.execute(&task).await?;
self.monitor.record_execution(&mut task, &result)?;
Ok(result)
}
pub async fn run(&self) -> Result<()> {
tracing::info!("Cron scheduler started");
loop {
self.tick().await;
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
async fn tick(&self) {
let tasks = match self.store.list_all() {
Ok(t) => t,
Err(e) => {
tracing::error!("Failed to load tasks: {}", e);
return;
}
};
let now = chrono::Local::now();
let mut running_count = 0;
for task in &tasks {
if task.status == TaskStatus::Running {
running_count += 1;
}
}
for mut task in tasks {
if task.status == TaskStatus::Running {
continue;
}
if running_count >= self.max_concurrent {
break;
}
if let Ok(expr) = CronExpression::parse(&task.expression) {
if expr.matches(&now) {
if let Some(last_run) = task.last_run {
let last = chrono::DateTime::from_timestamp(last_run as i64, 0)
.map(|dt| dt.with_timezone(&chrono::Local));
if let Some(last) = last {
if now.signed_duration_since(last).num_seconds() < 60 {
continue;
}
}
}
task.status = TaskStatus::Running;
if let Err(e) = self.store.save(&task) {
tracing::error!("Failed to save task: {}", e);
continue;
}
running_count += 1;
let executor = self.executor.clone();
let monitor = self.monitor.clone();
let _store = self.store.clone();
tokio::spawn(async move {
let result = executor.execute(&task).await.unwrap_or_else(|e| {
super::executor::ExecutionResult {
success: false,
output: Arc::from(""),
error: Some(Arc::from(e.to_string().as_str())),
duration_ms: 0,
}
});
if let Err(e) = monitor.record_execution(&mut task, &result) {
tracing::error!("Failed to record execution: {}", e);
}
});
}
}
}
}
}
use super::executor::ExecutionResult;