Skip to main content

ravenclaws/
scheduler.rs

1//! Scheduling & triggers for proactive 24/7 agents
2//!
3//! Provides three trigger mechanisms that submit tasks to the BackgroundTaskManager:
4//!
5//! 1. **Cron scheduling** — execute tasks on a recurring schedule (cron expressions)
6//! 2. **Webhook triggers** — HTTP endpoint to receive webhooks and trigger tasks
7//! 3. **File-watch triggers** — watch files/directories for changes and trigger tasks
8//!
9//! # Architecture
10//!
11//! ```text
12//! Scheduler
//!   ├── CronTrigger   — cron expression → sleep until next occurrence → submit task
14//!   ├── WebhookServer — HTTP POST /webhook/<name> → submit task
15//!   └── FileWatcher   — notify events → debounce → submit task
16//! ```
17//!
18//! All triggers submit tasks to a shared `BackgroundTaskManager` and use the
19//! configured LLM client for execution.
20
21use crate::background::BackgroundTaskManager;
22use crate::config::SchedulerConfig;
23use crate::error::{RavenClawsError, Result};
24use serde::{Deserialize, Serialize};
25use std::path::PathBuf;
26use std::sync::Arc;
27use tokio::sync::RwLock;
28use tracing::{debug, error, info, instrument, warn};
29
30// ── Trigger types ──────────────────────────────────────────────────────────
31
32/// A single scheduled trigger configuration
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct TriggerConfig {
35    /// Unique name for this trigger
36    pub name: String,
37    /// The prompt to send to the agent when triggered
38    pub prompt: String,
39    /// Optional system prompt override
40    #[serde(default)]
41    pub system_prompt: Option<String>,
42    /// Trigger type-specific configuration
43    pub trigger: TriggerType,
44}
45
46/// The type of trigger and its specific configuration
47#[derive(Debug, Clone, Serialize, Deserialize)]
48#[serde(tag = "type")]
49pub enum TriggerType {
50    /// Cron expression (e.g., "0 */6 * * *" for every 6 hours)
51    #[serde(rename = "cron")]
52    Cron {
53        /// Standard cron expression (5 or 6 fields)
54        expression: String,
55    },
56    /// Webhook endpoint (e.g., POST /webhook/<name>)
57    #[serde(rename = "webhook")]
58    Webhook {
59        /// Optional secret for HMAC verification
60        #[serde(default)]
61        secret: Option<String>,
62    },
63    /// File system watcher
64    #[serde(rename = "watch")]
65    Watch {
66        /// Path to watch
67        path: String,
68        /// Event types to trigger on (create, modify, delete)
69        #[serde(default = "default_watch_events")]
70        events: Vec<String>,
71        /// Debounce interval in seconds (default: 5)
72        #[serde(default = "default_debounce_secs")]
73        debounce_secs: u64,
74    },
75}
76
/// Serde default for the `events` list of a watch trigger: modifications only.
fn default_watch_events() -> Vec<String> {
    vec![String::from("modify")]
}
80
/// Serde default for the `debounce_secs` setting of a watch trigger.
fn default_debounce_secs() -> u64 {
    const DEFAULT_DEBOUNCE_SECS: u64 = 5;
    DEFAULT_DEBOUNCE_SECS
}
84
85// ── Scheduler ──────────────────────────────────────────────────────────────
86
87/// The main scheduler that manages all triggers
88///
89/// Owns the trigger configurations and manages their lifecycle.
90/// All triggers submit tasks to the shared BackgroundTaskManager.
91#[derive(Clone)]
92pub struct Scheduler {
93    /// Background task manager for executing triggered tasks
94    bg_manager: BackgroundTaskManager,
95    /// Trigger configurations
96    triggers: Vec<TriggerConfig>,
97    /// Whether the scheduler is running
98    running: Arc<RwLock<bool>>,
99    /// Webhook server port (default: 9090)
100    webhook_port: u16,
101}
102
103impl Scheduler {
104    /// Create a new scheduler with the given background task manager and triggers
105    pub fn new(bg_manager: BackgroundTaskManager, config: &SchedulerConfig) -> Self {
106        Self {
107            bg_manager,
108            triggers: config.triggers.clone(),
109            running: Arc::new(RwLock::new(false)),
110            webhook_port: 9090,
111        }
112    }
113
114    /// Set the webhook server port
115    pub fn set_webhook_port(&mut self, port: u16) {
116        self.webhook_port = port;
117    }
118
119    /// Start all triggers and run until cancelled
120    ///
121    /// Spawns tokio tasks for each trigger type:
122    /// - Cron triggers: periodic timer based on cron expression
123    /// - Webhook triggers: HTTP server listening for POST requests
124    /// - Watch triggers: file system watcher with debouncing
125    #[instrument(skip(self))]
126    pub async fn start(&self) -> Result<()> {
127        {
128            let mut running = self.running.write().await;
129            if *running {
130                warn!("Scheduler is already running");
131                return Ok(());
132            }
133            *running = true;
134        }
135
136        let trigger_count = self.triggers.len();
137        if trigger_count == 0 {
138            info!("No triggers configured, scheduler idle");
139            return Ok(());
140        }
141
142        info!(count = trigger_count, "Starting scheduler with triggers");
143
144        // Parse and validate all triggers
145        for trigger in &self.triggers {
146            match &trigger.trigger {
147                TriggerType::Cron { expression } => {
148                    let _schedule = expression.parse::<cron::Schedule>().map_err(|e| {
149                        RavenClawsError::CommandExecution(format!(
150                            "Invalid cron expression '{}': {}",
151                            expression, e
152                        ))
153                    })?;
154                    info!(
155                        name = %trigger.name,
156                        expression = %expression,
157                        "Registered cron trigger"
158                    );
159                }
160                TriggerType::Webhook { secret } => {
161                    let _has_secret = secret.is_some();
162                    info!(
163                        name = %trigger.name,
164                        has_secret = _has_secret,
165                        "Registered webhook trigger"
166                    );
167                }
168                TriggerType::Watch {
169                    path,
170                    events,
171                    debounce_secs,
172                } => {
173                    let path_buf = PathBuf::from(path);
174                    if !path_buf.exists() {
175                        warn!(
176                            name = %trigger.name,
177                            path = %path,
178                            "Watch path does not exist yet, will retry on start"
179                        );
180                    }
181                    info!(
182                        name = %trigger.name,
183                        path = %path,
184                        events = ?events,
185                        debounce_secs = debounce_secs,
186                        "Registered file watch trigger"
187                    );
188                }
189            }
190        }
191
192        // Spawn cron trigger tasks
193        for trigger in &self.triggers {
194            if let TriggerType::Cron { expression } = &trigger.trigger {
195                let schedule = expression.parse::<cron::Schedule>().map_err(|e| {
196                    RavenClawsError::CommandExecution(format!(
197                        "Invalid cron expression '{}': {}",
198                        expression, e
199                    ))
200                })?;
201                let bg = self.bg_manager.clone();
202                let name = trigger.name.clone();
203                let prompt = trigger.prompt.clone();
204                let system_prompt = trigger.system_prompt.clone().unwrap_or_default();
205
206                tokio::spawn(async move {
207                    run_cron_trigger(name, schedule, bg, prompt, system_prompt).await;
208                });
209            }
210        }
211
212        // Spawn webhook server if any webhook triggers exist
213        let has_webhooks = self
214            .triggers
215            .iter()
216            .any(|t| matches!(t.trigger, TriggerType::Webhook { .. }));
217
218        if has_webhooks {
219            let webhook_triggers: Vec<(String, String, Option<String>)> = self
220                .triggers
221                .iter()
222                .filter_map(|t| {
223                    if let TriggerType::Webhook { secret } = &t.trigger {
224                        Some((t.name.clone(), t.prompt.clone(), secret.clone()))
225                    } else {
226                        None
227                    }
228                })
229                .collect();
230
231            let bg = self.bg_manager.clone();
232            let port = self.webhook_port;
233
234            tokio::spawn(async move {
235                run_webhook_server(port, webhook_triggers, bg).await;
236            });
237        }
238
239        // Spawn file watch tasks
240        for trigger in &self.triggers {
241            if let TriggerType::Watch {
242                path,
243                events,
244                debounce_secs,
245            } = &trigger.trigger
246            {
247                let bg = self.bg_manager.clone();
248                let name = trigger.name.clone();
249                let prompt = trigger.prompt.clone();
250                let system_prompt = trigger.system_prompt.clone().unwrap_or_default();
251                let watch_path = path.clone();
252                let watch_events = events.clone();
253                let debounce = *debounce_secs;
254
255                tokio::spawn(async move {
256                    run_watch_trigger(
257                        name,
258                        watch_path,
259                        watch_events,
260                        debounce,
261                        bg,
262                        prompt,
263                        system_prompt,
264                    )
265                    .await;
266                });
267            }
268        }
269
270        Ok(())
271    }
272
273    /// Stop the scheduler
274    pub async fn stop(&self) {
275        let mut running = self.running.write().await;
276        *running = false;
277        info!("Scheduler stopped");
278    }
279
280    /// Check if the scheduler is running
281    #[allow(dead_code)]
282    pub async fn is_running(&self) -> bool {
283        *self.running.read().await
284    }
285}
286
287// ── Cron trigger runner ────────────────────────────────────────────────────
288
289/// Run a cron trigger: sleep until next scheduled time, then submit a task
290#[instrument(skip(bg_manager, schedule), fields(trigger_name = %name))]
291async fn run_cron_trigger(
292    name: String,
293    schedule: cron::Schedule,
294    bg_manager: BackgroundTaskManager,
295    prompt: String,
296    system_prompt: String,
297) {
298    info!(trigger = %name, "Cron trigger started");
299
300    for next in schedule.upcoming(chrono::Utc) {
301        let now = chrono::Utc::now();
302        let delay = (next - now).to_std().unwrap_or(std::time::Duration::ZERO);
303
304        if delay > std::time::Duration::ZERO {
305            debug!(
306                trigger = %name,
307                next_run = %next,
308                delay_ms = delay.as_millis(),
309                "Sleeping until next cron trigger"
310            );
311            tokio::time::sleep(delay).await;
312        }
313
314        debug!(trigger = %name, "Cron trigger firing");
315        match bg_manager
316            .submit(prompt.clone(), system_prompt.clone())
317            .await
318        {
319            Ok(task_id) => {
320                info!(
321                    trigger = %name,
322                    task_id = %task_id,
323                    "Cron trigger submitted background task"
324                );
325            }
326            Err(e) => {
327                error!(
328                    trigger = %name,
329                    error = %e,
330                    "Cron trigger failed to submit task"
331                );
332            }
333        }
334    }
335}
336
337// ── Webhook server ─────────────────────────────────────────────────────────
338
339/// Run a simple HTTP server that listens for webhook POST requests
340#[instrument(skip(triggers, bg_manager))]
341async fn run_webhook_server(
342    port: u16,
343    triggers: Vec<(String, String, Option<String>)>,
344    bg_manager: BackgroundTaskManager,
345) {
346    let bind_addr = format!("127.0.0.1:{}", port);
347    let listener = match tokio::net::TcpListener::bind(&bind_addr).await {
348        Ok(l) => {
349            info!(
350                address = %bind_addr,
351                trigger_count = triggers.len(),
352                "Webhook server started"
353            );
354            l
355        }
356        Err(e) => {
357            error!(
358                address = %bind_addr,
359                error = %e,
360                "Failed to start webhook server"
361            );
362            return;
363        }
364    };
365
366    let triggers = Arc::new(triggers);
367
368    loop {
369        match listener.accept().await {
370            Ok((stream, peer)) => {
371                let triggers = Arc::clone(&triggers);
372                let bg = bg_manager.clone();
373                tokio::spawn(async move {
374                    handle_webhook_connection(stream, peer, triggers, bg).await;
375                });
376            }
377            Err(e) => {
378                warn!(error = %e, "Webhook server accept error");
379            }
380        }
381    }
382}
383
384/// Handle a single webhook HTTP connection
385async fn handle_webhook_connection(
386    mut stream: tokio::net::TcpStream,
387    peer: std::net::SocketAddr,
388    triggers: Arc<Vec<(String, String, Option<String>)>>,
389    bg_manager: BackgroundTaskManager,
390) {
391    use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
392
393    let mut reader = BufReader::new(&mut stream);
394    let mut request_line = String::new();
395
396    if reader.read_line(&mut request_line).await.is_err() {
397        return;
398    }
399
400    let request_line = request_line.trim();
401    if request_line.is_empty() {
402        return;
403    }
404
405    // Parse method and path
406    let parts: Vec<&str> = request_line.split_whitespace().collect();
407    if parts.len() < 2 {
408        send_http_response(&mut stream, "400 Bad Request", b"Bad Request").await;
409        return;
410    }
411
412    let method = parts[0];
413    let path = parts[1];
414
415    // Only accept POST requests
416    if method != "POST" {
417        send_http_response(&mut stream, "405 Method Not Allowed", b"Method Not Allowed").await;
418        return;
419    }
420
421    // Extract trigger name from path: /webhook/<name>
422    let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
423    if path_parts.len() < 2 || path_parts[0] != "webhook" {
424        send_http_response(&mut stream, "404 Not Found", b"Not Found").await;
425        return;
426    }
427
428    let trigger_name = path_parts[1];
429
430    // Read headers
431    let mut header_line = String::new();
432    let mut content_length: usize = 0;
433    loop {
434        header_line.clear();
435        if reader.read_line(&mut header_line).await.is_err() {
436            return;
437        }
438        let line = header_line.trim();
439        if line.is_empty() {
440            break;
441        }
442        if let Some(len_str) = line.strip_prefix("Content-Length:") {
443            content_length = len_str.trim().parse().unwrap_or(0);
444        }
445    }
446
447    // Read body
448    let mut body = vec![0u8; content_length];
449    if content_length > 0 && reader.read_exact(&mut body).await.is_err() {
450        send_http_response(&mut stream, "400 Bad Request", b"Bad Request").await;
451        return;
452    }
453
454    // Find matching trigger
455    let matched = triggers.iter().find(|(name, _, _)| name == trigger_name);
456
457    if let Some((_name, prompt, _secret)) = matched {
458        // Submit the task with webhook body as context
459        let webhook_body = String::from_utf8_lossy(&body);
460        let full_prompt = format!("{}\n\nWebhook payload:\n{}", prompt, webhook_body);
461
462        match bg_manager.submit(full_prompt, String::new()).await {
463            Ok(task_id) => {
464                info!(
465                    trigger = %_name,
466                    task_id = %task_id,
467                    peer = %peer,
468                    "Webhook trigger submitted background task"
469                );
470                send_http_response(
471                    &mut stream,
472                    "200 OK",
473                    format!("{{\"task_id\":\"{}\"}}", task_id).as_bytes(),
474                )
475                .await;
476            }
477            Err(e) => {
478                error!(
479                    trigger = %_name,
480                    error = %e,
481                    "Webhook trigger failed to submit task"
482                );
483                send_http_response(
484                    &mut stream,
485                    "500 Internal Server Error",
486                    b"Internal Server Error",
487                )
488                .await;
489            }
490        }
491    } else {
492        send_http_response(&mut stream, "404 Not Found", b"Trigger Not Found").await;
493    }
494}
495
496/// Send an HTTP response to any async writer
497async fn send_http_response(
498    stream: &mut (impl tokio::io::AsyncWrite + Unpin),
499    status: &str,
500    body: &[u8],
501) {
502    use tokio::io::AsyncWriteExt;
503
504    let response = format!(
505        "HTTP/1.1 {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
506        status,
507        body.len(),
508    );
509
510    if let Err(e) = stream.write_all(response.as_bytes()).await {
511        warn!(error = %e, "Failed to write webhook response headers");
512        return;
513    }
514    if let Err(e) = stream.write_all(body).await {
515        warn!(error = %e, "Failed to write webhook response body");
516        return;
517    }
518    if let Err(e) = stream.flush().await {
519        warn!(error = %e, "Failed to flush webhook response");
520    }
521}
522
523// ── File watch trigger runner ──────────────────────────────────────────────
524
525/// Run a file watch trigger: monitor a path for changes and submit tasks
526#[instrument(skip(bg_manager), fields(trigger_name = %name, path = %watch_path))]
527async fn run_watch_trigger(
528    name: String,
529    watch_path: String,
530    events: Vec<String>,
531    debounce_secs: u64,
532    bg_manager: BackgroundTaskManager,
533    prompt: String,
534    system_prompt: String,
535) {
536    use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
537    use std::sync::mpsc;
538
539    info!(
540        trigger = %name,
541        path = %watch_path,
542        "File watch trigger started"
543    );
544
545    // Create a channel to receive file system events
546    let (tx, rx) = mpsc::channel::<notify::Result<Event>>();
547
548    let mut watcher = match RecommendedWatcher::new(tx, Config::default()) {
549        Ok(w) => w,
550        Err(e) => {
551            error!(
552                trigger = %name,
553                error = %e,
554                "Failed to create file watcher"
555            );
556            return;
557        }
558    };
559
560    // Watch the path
561    let path = PathBuf::from(&watch_path);
562    if let Err(e) = watcher.watch(&path, RecursiveMode::NonRecursive) {
563        error!(
564            trigger = %name,
565            path = %watch_path,
566            error = %e,
567            "Failed to watch path"
568        );
569        return;
570    }
571
572    // Debounce state
573    let debounce_duration = std::time::Duration::from_secs(debounce_secs);
574    let mut last_trigger_time: Option<std::time::Instant> = None;
575
576    // Process events
577    for event in rx {
578        match event {
579            Ok(event) => {
580                let should_trigger = match &event.kind {
581                    EventKind::Create(_) => events.contains(&"create".to_string()),
582                    EventKind::Modify(_) => events.contains(&"modify".to_string()),
583                    EventKind::Remove(_) => events.contains(&"delete".to_string()),
584                    _ => false,
585                };
586
587                if should_trigger {
588                    let now = std::time::Instant::now();
589                    let should_fire = match last_trigger_time {
590                        Some(last) => now.duration_since(last) >= debounce_duration,
591                        None => true,
592                    };
593
594                    if should_fire {
595                        last_trigger_time = Some(now);
596                        debug!(
597                            trigger = %name,
598                            event = ?event.kind,
599                            paths = ?event.paths,
600                            "File watch trigger firing"
601                        );
602
603                        match bg_manager
604                            .submit(prompt.clone(), system_prompt.clone())
605                            .await
606                        {
607                            Ok(task_id) => {
608                                info!(
609                                    trigger = %name,
610                                    task_id = %task_id,
611                                    "File watch trigger submitted background task"
612                                );
613                            }
614                            Err(e) => {
615                                error!(
616                                    trigger = %name,
617                                    error = %e,
618                                    "File watch trigger failed to submit task"
619                                );
620                            }
621                        }
622                    }
623                }
624            }
625            Err(e) => {
626                warn!(
627                    trigger = %name,
628                    error = %e,
629                    "File watch error"
630                );
631            }
632        }
633    }
634}
635
636// ── Tests ──────────────────────────────────────────────────────────────────
637
#[cfg(test)]
mod tests {
    use super::*;
    use crate::background::BackgroundTaskManager;
    use std::path::PathBuf;

    /// Per-test scratch directory path under the system temp dir.
    ///
    /// Any leftover directory from an earlier run is removed; the directory
    /// itself is not created here.
    fn test_dir(name: &str) -> PathBuf {
        let dir = std::env::temp_dir().join(format!(
            "ravenclaws-test-sched-{}-{}",
            name,
            std::process::id()
        ));
        let _ = std::fs::remove_dir_all(&dir);
        dir
    }

    // ── Plain data tests (no async runtime needed) ─────────────────────────

    #[test]
    fn test_trigger_config_cron() {
        let config = TriggerConfig {
            name: "hourly".to_string(),
            prompt: "Run hourly check".to_string(),
            system_prompt: None,
            trigger: TriggerType::Cron {
                expression: "0 * * * * *".to_string(),
            },
        };

        assert_eq!(config.name, "hourly");
        match &config.trigger {
            TriggerType::Cron { expression } => {
                assert_eq!(expression, "0 * * * * *");
            }
            _ => panic!("Expected Cron trigger"),
        }
    }

    #[test]
    fn test_trigger_config_webhook() {
        let config = TriggerConfig {
            name: "github-webhook".to_string(),
            prompt: "Process GitHub event".to_string(),
            system_prompt: None,
            trigger: TriggerType::Webhook {
                secret: Some("mysecret".to_string()),
            },
        };

        assert_eq!(config.name, "github-webhook");
        match &config.trigger {
            TriggerType::Webhook { secret } => {
                assert_eq!(secret.as_deref(), Some("mysecret"));
            }
            _ => panic!("Expected Webhook trigger"),
        }
    }

    #[test]
    fn test_trigger_config_watch() {
        let config = TriggerConfig {
            name: "config-watch".to_string(),
            prompt: "Config changed".to_string(),
            system_prompt: None,
            trigger: TriggerType::Watch {
                path: "/etc/config".to_string(),
                events: vec!["modify".to_string(), "create".to_string()],
                debounce_secs: 10,
            },
        };

        assert_eq!(config.name, "config-watch");
        match &config.trigger {
            TriggerType::Watch {
                path,
                events,
                debounce_secs,
            } => {
                assert_eq!(path, "/etc/config");
                assert_eq!(events.len(), 2);
                assert_eq!(*debounce_secs, 10);
            }
            _ => panic!("Expected Watch trigger"),
        }
    }

    #[test]
    fn test_cron_expression_parsing() {
        // Valid cron expression
        let expr = "0 */6 * * * *";
        let schedule = expr.parse::<cron::Schedule>();
        assert!(schedule.is_ok(), "Valid cron expression should parse");

        // Invalid cron expression
        let bad_expr = "not-a-cron";
        let schedule = bad_expr.parse::<cron::Schedule>();
        assert!(schedule.is_err(), "Invalid cron expression should fail");
    }

    #[test]
    fn test_cron_schedule_upcoming() {
        let expr = "0 * * * * *"; // Every minute at :00
        let schedule = expr.parse::<cron::Schedule>().unwrap();
        let now = chrono::Utc::now();

        let mut upcoming = schedule.upcoming(chrono::Utc);
        let next = upcoming.next();
        assert!(next.is_some(), "Should have a next scheduled time");
        assert!(next.unwrap() > now, "Next time should be in the future");
    }

    #[test]
    fn test_webhook_response_format() {
        let task_id = "test-uuid-1234";
        let response = format!("{{\"task_id\":\"{}\"}}", task_id);
        let parsed: serde_json::Value = serde_json::from_str(&response).unwrap();
        assert_eq!(parsed["task_id"], task_id);
    }

    #[test]
    fn test_trigger_config_serialization() {
        let config = TriggerConfig {
            name: "test".to_string(),
            prompt: "test prompt".to_string(),
            system_prompt: Some("system".to_string()),
            trigger: TriggerType::Cron {
                expression: "0 * * * * *".to_string(),
            },
        };

        let json = serde_json::to_string_pretty(&config).unwrap();
        let deserialized: TriggerConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(deserialized.name, "test");
        assert_eq!(deserialized.prompt, "test prompt");
        assert_eq!(deserialized.system_prompt, Some("system".to_string()));
        match &deserialized.trigger {
            TriggerType::Cron { expression } => {
                assert_eq!(expression, "0 * * * * *");
            }
            _ => panic!("Expected Cron trigger"),
        }
    }

    #[test]
    fn test_webhook_trigger_serialization() {
        let config = TriggerConfig {
            name: "gh".to_string(),
            prompt: "process".to_string(),
            system_prompt: None,
            trigger: TriggerType::Webhook {
                secret: Some("s3cret".to_string()),
            },
        };

        let json = serde_json::to_string_pretty(&config).unwrap();
        let deserialized: TriggerConfig = serde_json::from_str(&json).unwrap();

        match &deserialized.trigger {
            TriggerType::Webhook { secret } => {
                assert_eq!(secret.as_deref(), Some("s3cret"));
            }
            _ => panic!("Expected Webhook trigger"),
        }
    }

    #[test]
    fn test_watch_trigger_serialization() {
        let config = TriggerConfig {
            name: "fw".to_string(),
            prompt: "file changed".to_string(),
            system_prompt: None,
            trigger: TriggerType::Watch {
                path: "/tmp".to_string(),
                events: vec!["modify".to_string()],
                debounce_secs: 5,
            },
        };

        let json = serde_json::to_string_pretty(&config).unwrap();
        let deserialized: TriggerConfig = serde_json::from_str(&json).unwrap();

        match &deserialized.trigger {
            TriggerType::Watch {
                path,
                events,
                debounce_secs,
            } => {
                assert_eq!(path, "/tmp");
                assert_eq!(events, &vec!["modify".to_string()]);
                assert_eq!(*debounce_secs, 5);
            }
            _ => panic!("Expected Watch trigger"),
        }
    }

    #[test]
    fn test_default_watch_events() {
        let events = default_watch_events();
        assert_eq!(events, vec!["modify".to_string()]);
    }

    #[test]
    fn test_default_debounce_secs() {
        assert_eq!(default_debounce_secs(), 5);
    }

    // ── Scheduler lifecycle tests ──────────────────────────────────────────

    #[tokio::test]
    async fn test_scheduler_new() {
        let dir = test_dir("new");
        let bg = BackgroundTaskManager::new(&dir).await.unwrap();
        let config = SchedulerConfig { triggers: vec![] };
        let scheduler = Scheduler::new(bg, &config);

        assert!(!scheduler.is_running().await);
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[tokio::test]
    async fn test_scheduler_start_stop() {
        let dir = test_dir("start_stop");
        let bg = BackgroundTaskManager::new(&dir).await.unwrap();
        let config = SchedulerConfig { triggers: vec![] };
        let scheduler = Scheduler::new(bg, &config);

        scheduler.start().await.unwrap();
        assert!(scheduler.is_running().await);

        scheduler.stop().await;
        assert!(!scheduler.is_running().await);
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[tokio::test]
    async fn test_scheduler_with_cron_trigger() {
        let dir = test_dir("with_cron");
        let bg = BackgroundTaskManager::new(&dir).await.unwrap();

        let config = SchedulerConfig {
            triggers: vec![TriggerConfig {
                name: "test-cron".to_string(),
                prompt: "Cron test".to_string(),
                system_prompt: None,
                trigger: TriggerType::Cron {
                    // Seconds-first fields: 01:00:00 on the 1st of each month,
                    // i.e. rare enough not to fire while the test runs.
                    expression: "0 0 1 1 * *".to_string(),
                },
            }],
        };

        let scheduler = Scheduler::new(bg, &config);
        scheduler.start().await.unwrap();
        assert!(scheduler.is_running().await);

        scheduler.stop().await;
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[tokio::test]
    async fn test_scheduler_with_webhook_trigger() {
        let dir = test_dir("with_webhook");
        let bg = BackgroundTaskManager::new(&dir).await.unwrap();

        let config = SchedulerConfig {
            triggers: vec![TriggerConfig {
                name: "test-webhook".to_string(),
                prompt: "Webhook test".to_string(),
                system_prompt: None,
                trigger: TriggerType::Webhook { secret: None },
            }],
        };

        let mut scheduler = Scheduler::new(bg, &config);
        // Port 0 lets the OS pick a free port, so the test cannot collide with
        // another process (or a parallel test run) holding the default port.
        scheduler.set_webhook_port(0);
        scheduler.start().await.unwrap();
        assert!(scheduler.is_running().await);

        scheduler.stop().await;
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[tokio::test]
    async fn test_scheduler_with_watch_trigger() {
        let dir = test_dir("with_watch");
        let bg = BackgroundTaskManager::new(&dir).await.unwrap();

        let config = SchedulerConfig {
            triggers: vec![TriggerConfig {
                name: "test-watch".to_string(),
                prompt: "Watch test".to_string(),
                system_prompt: None,
                trigger: TriggerType::Watch {
                    path: dir.to_string_lossy().to_string(),
                    events: vec!["modify".to_string()],
                    debounce_secs: 1,
                },
            }],
        };

        let scheduler = Scheduler::new(bg, &config);
        scheduler.start().await.unwrap();
        assert!(scheduler.is_running().await);

        scheduler.stop().await;
        let _ = std::fs::remove_dir_all(&dir);
    }

    // ── HTTP response writer ───────────────────────────────────────────────

    #[tokio::test]
    async fn test_send_http_response() {
        use tokio::io::AsyncReadExt;

        let (mut a, mut b) = tokio::io::duplex(1024);

        let writer = tokio::spawn(async move {
            send_http_response(&mut a, "200 OK", b"{\"status\":\"ok\"}").await;
            // `a` is dropped when this task ends, which signals EOF to `b`.
        });

        // Headers and body are written separately, so a single `read` may see
        // only part of the response; read until EOF instead.
        let mut buf = Vec::new();
        b.read_to_end(&mut buf).await.unwrap();
        writer.await.unwrap();
        let response = String::from_utf8_lossy(&buf);

        assert!(response.starts_with("HTTP/1.1 200 OK\r\n"));
        assert!(response.contains("Content-Length: 15\r\n"));
        assert!(response.ends_with("\r\n\r\n{\"status\":\"ok\"}"));
    }
}