// rsclaw 0.0.1-alpha.1
//
// rsclaw: High-performance AI agent (BETA). Optimized for M4 Max and 2GB VPS. 100% compatible with openclaw.
// Documentation
use super::executor::TaskExecutor;
use super::monitor::TaskMonitor;
use super::store::CronStore;
use super::types::{CronExpression, CronTask, TaskStatus};
use crate::agent::AgentManager;
use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;

/// Cron scheduler.
///
/// Ties together the task store, the task executor and the execution
/// monitor: tasks are loaded from the store, run through the executor when
/// their cron expression matches, and the outcome is handed to the monitor.
pub struct CronScheduler {
    /// Task store used to save, load, delete and list tasks.
    store: Arc<CronStore>,
    /// Runs a task and produces its `ExecutionResult`.
    executor: Arc<TaskExecutor>,
    /// Receives the result of every execution via `record_execution`.
    monitor: Arc<TaskMonitor>,
    /// Upper bound on the number of tasks in the `Running` state; the
    /// scheduler stops launching tasks in a tick once this is reached.
    max_concurrent: usize,
}

impl CronScheduler {
    /// Create a new cron scheduler.
    pub fn new(
        store: Arc<CronStore>,
        agent_manager: Arc<RwLock<AgentManager>>,
        skills_dir: std::path::PathBuf,
        max_concurrent: usize,
    ) -> Self {
        Self {
            store: store.clone(),
            executor: Arc::new(TaskExecutor::new(agent_manager, skills_dir)),
            monitor: Arc::new(TaskMonitor::new(store)),
            max_concurrent,
        }
    }

    /// Add a new task.
    pub fn add_task(&self, task: CronTask) -> Result<Arc<str>> {
        // Validate cron expression
        CronExpression::parse(&task.expression)
            .map_err(|e| anyhow::anyhow!("Invalid cron expression: {}", e))?;

        let id = task.id.clone();
        self.store.save(&task)?;
        tracing::info!("Task '{}' added: {}", id, task.description);
        Ok(id)
    }

    /// Delete a task.
    pub fn delete_task(&self, id: &str) -> Result<bool> {
        let result = self.store.delete(id)?;
        if result {
            tracing::info!("Task '{}' deleted", id);
        }
        Ok(result)
    }

    /// List all tasks.
    pub fn list_tasks(&self) -> Result<Vec<CronTask>> {
        self.store.list_all()
    }

    /// Execute a task immediately.
    pub async fn execute_now(&self, id: &str) -> Result<ExecutionResult> {
        let mut task = self.store.load(id)?
            .ok_or_else(|| anyhow::anyhow!("Task '{}' not found", id))?;

        task.status = TaskStatus::Running;
        self.store.save(&task)?;

        let result = self.executor.execute(&task).await?;
        self.monitor.record_execution(&mut task, &result)?;

        Ok(result)
    }

    /// Run the scheduler loop.
    pub async fn run(&self) -> Result<()> {
        tracing::info!("Cron scheduler started");

        loop {
            self.tick().await;
            tokio::time::sleep(Duration::from_secs(10)).await;
        }
    }

    /// Single tick of the scheduler.
    async fn tick(&self) {
        let tasks = match self.store.list_all() {
            Ok(t) => t,
            Err(e) => {
                tracing::error!("Failed to load tasks: {}", e);
                return;
            }
        };

        let now = chrono::Local::now();
        let mut running_count = 0;

        // Count currently running tasks
        for task in &tasks {
            if task.status == TaskStatus::Running {
                running_count += 1;
            }
        }

        for mut task in tasks {
            // Skip if already running
            if task.status == TaskStatus::Running {
                continue;
            }

            // Skip if at max concurrency
            if running_count >= self.max_concurrent {
                break;
            }

            // Check if it's time to run
            if let Ok(expr) = CronExpression::parse(&task.expression) {
                if expr.matches(&now) {
                    // Check if already ran this minute
                    if let Some(last_run) = task.last_run {
                        let last = chrono::DateTime::from_timestamp(last_run as i64, 0)
                            .map(|dt| dt.with_timezone(&chrono::Local));
                        if let Some(last) = last {
                            if now.signed_duration_since(last).num_seconds() < 60 {
                                continue;
                            }
                        }
                    }

                    // Execute task
                    task.status = TaskStatus::Running;
                    if let Err(e) = self.store.save(&task) {
                        tracing::error!("Failed to save task: {}", e);
                        continue;
                    }

                    running_count += 1;

                    let executor = self.executor.clone();
                    let monitor = self.monitor.clone();
                    let _store = self.store.clone();

                    tokio::spawn(async move {
                        let result = executor.execute(&task).await.unwrap_or_else(|e| {
                            super::executor::ExecutionResult {
                                success: false,
                                output: Arc::from(""),
                                error: Some(Arc::from(e.to_string().as_str())),
                                duration_ms: 0,
                            }
                        });

                        if let Err(e) = monitor.record_execution(&mut task, &result) {
                            tracing::error!("Failed to record execution: {}", e);
                        }
                    });
                }
            }
        }
    }
}

use super::executor::ExecutionResult;