1use crate::background::BackgroundTaskManager;
22use crate::config::SchedulerConfig;
23use crate::error::{RavenClawsError, Result};
24use serde::{Deserialize, Serialize};
25use std::path::PathBuf;
26use std::sync::Arc;
27use tokio::sync::RwLock;
28use tracing::{debug, error, info, instrument, warn};
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct TriggerConfig {
35 pub name: String,
37 pub prompt: String,
39 #[serde(default)]
41 pub system_prompt: Option<String>,
42 pub trigger: TriggerType,
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
48#[serde(tag = "type")]
49pub enum TriggerType {
50 #[serde(rename = "cron")]
52 Cron {
53 expression: String,
55 },
56 #[serde(rename = "webhook")]
58 Webhook {
59 #[serde(default)]
61 secret: Option<String>,
62 },
63 #[serde(rename = "watch")]
65 Watch {
66 path: String,
68 #[serde(default = "default_watch_events")]
70 events: Vec<String>,
71 #[serde(default = "default_debounce_secs")]
73 debounce_secs: u64,
74 },
75}
76
/// Serde default for a watch trigger's `events`: react to modifications only.
fn default_watch_events() -> Vec<String> {
    vec![String::from("modify")]
}
80
/// Serde default for a watch trigger's `debounce_secs`.
fn default_debounce_secs() -> u64 {
    const DEFAULT_DEBOUNCE_SECS: u64 = 5;
    DEFAULT_DEBOUNCE_SECS
}
84
85#[derive(Clone)]
92pub struct Scheduler {
93 bg_manager: BackgroundTaskManager,
95 triggers: Vec<TriggerConfig>,
97 running: Arc<RwLock<bool>>,
99 webhook_port: u16,
101}
102
103impl Scheduler {
104 pub fn new(bg_manager: BackgroundTaskManager, config: &SchedulerConfig) -> Self {
106 Self {
107 bg_manager,
108 triggers: config.triggers.clone(),
109 running: Arc::new(RwLock::new(false)),
110 webhook_port: 9090,
111 }
112 }
113
114 pub fn set_webhook_port(&mut self, port: u16) {
116 self.webhook_port = port;
117 }
118
119 #[instrument(skip(self))]
126 pub async fn start(&self) -> Result<()> {
127 {
128 let mut running = self.running.write().await;
129 if *running {
130 warn!("Scheduler is already running");
131 return Ok(());
132 }
133 *running = true;
134 }
135
136 let trigger_count = self.triggers.len();
137 if trigger_count == 0 {
138 info!("No triggers configured, scheduler idle");
139 return Ok(());
140 }
141
142 info!(count = trigger_count, "Starting scheduler with triggers");
143
144 for trigger in &self.triggers {
146 match &trigger.trigger {
147 TriggerType::Cron { expression } => {
148 let _schedule = expression.parse::<cron::Schedule>().map_err(|e| {
149 RavenClawsError::CommandExecution(format!(
150 "Invalid cron expression '{}': {}",
151 expression, e
152 ))
153 })?;
154 info!(
155 name = %trigger.name,
156 expression = %expression,
157 "Registered cron trigger"
158 );
159 }
160 TriggerType::Webhook { secret } => {
161 let _has_secret = secret.is_some();
162 info!(
163 name = %trigger.name,
164 has_secret = _has_secret,
165 "Registered webhook trigger"
166 );
167 }
168 TriggerType::Watch {
169 path,
170 events,
171 debounce_secs,
172 } => {
173 let path_buf = PathBuf::from(path);
174 if !path_buf.exists() {
175 warn!(
176 name = %trigger.name,
177 path = %path,
178 "Watch path does not exist yet, will retry on start"
179 );
180 }
181 info!(
182 name = %trigger.name,
183 path = %path,
184 events = ?events,
185 debounce_secs = debounce_secs,
186 "Registered file watch trigger"
187 );
188 }
189 }
190 }
191
192 for trigger in &self.triggers {
194 if let TriggerType::Cron { expression } = &trigger.trigger {
195 let schedule = expression.parse::<cron::Schedule>().map_err(|e| {
196 RavenClawsError::CommandExecution(format!(
197 "Invalid cron expression '{}': {}",
198 expression, e
199 ))
200 })?;
201 let bg = self.bg_manager.clone();
202 let name = trigger.name.clone();
203 let prompt = trigger.prompt.clone();
204 let system_prompt = trigger.system_prompt.clone().unwrap_or_default();
205
206 tokio::spawn(async move {
207 run_cron_trigger(name, schedule, bg, prompt, system_prompt).await;
208 });
209 }
210 }
211
212 let has_webhooks = self
214 .triggers
215 .iter()
216 .any(|t| matches!(t.trigger, TriggerType::Webhook { .. }));
217
218 if has_webhooks {
219 let webhook_triggers: Vec<(String, String, Option<String>)> = self
220 .triggers
221 .iter()
222 .filter_map(|t| {
223 if let TriggerType::Webhook { secret } = &t.trigger {
224 Some((t.name.clone(), t.prompt.clone(), secret.clone()))
225 } else {
226 None
227 }
228 })
229 .collect();
230
231 let bg = self.bg_manager.clone();
232 let port = self.webhook_port;
233
234 tokio::spawn(async move {
235 run_webhook_server(port, webhook_triggers, bg).await;
236 });
237 }
238
239 for trigger in &self.triggers {
241 if let TriggerType::Watch {
242 path,
243 events,
244 debounce_secs,
245 } = &trigger.trigger
246 {
247 let bg = self.bg_manager.clone();
248 let name = trigger.name.clone();
249 let prompt = trigger.prompt.clone();
250 let system_prompt = trigger.system_prompt.clone().unwrap_or_default();
251 let watch_path = path.clone();
252 let watch_events = events.clone();
253 let debounce = *debounce_secs;
254
255 tokio::spawn(async move {
256 run_watch_trigger(
257 name,
258 watch_path,
259 watch_events,
260 debounce,
261 bg,
262 prompt,
263 system_prompt,
264 )
265 .await;
266 });
267 }
268 }
269
270 Ok(())
271 }
272
273 pub async fn stop(&self) {
275 let mut running = self.running.write().await;
276 *running = false;
277 info!("Scheduler stopped");
278 }
279
280 #[allow(dead_code)]
282 pub async fn is_running(&self) -> bool {
283 *self.running.read().await
284 }
285}
286
287#[instrument(skip(bg_manager, schedule), fields(trigger_name = %name))]
291async fn run_cron_trigger(
292 name: String,
293 schedule: cron::Schedule,
294 bg_manager: BackgroundTaskManager,
295 prompt: String,
296 system_prompt: String,
297) {
298 info!(trigger = %name, "Cron trigger started");
299
300 for next in schedule.upcoming(chrono::Utc) {
301 let now = chrono::Utc::now();
302 let delay = (next - now).to_std().unwrap_or(std::time::Duration::ZERO);
303
304 if delay > std::time::Duration::ZERO {
305 debug!(
306 trigger = %name,
307 next_run = %next,
308 delay_ms = delay.as_millis(),
309 "Sleeping until next cron trigger"
310 );
311 tokio::time::sleep(delay).await;
312 }
313
314 debug!(trigger = %name, "Cron trigger firing");
315 match bg_manager
316 .submit(prompt.clone(), system_prompt.clone())
317 .await
318 {
319 Ok(task_id) => {
320 info!(
321 trigger = %name,
322 task_id = %task_id,
323 "Cron trigger submitted background task"
324 );
325 }
326 Err(e) => {
327 error!(
328 trigger = %name,
329 error = %e,
330 "Cron trigger failed to submit task"
331 );
332 }
333 }
334 }
335}
336
337#[instrument(skip(triggers, bg_manager))]
341async fn run_webhook_server(
342 port: u16,
343 triggers: Vec<(String, String, Option<String>)>,
344 bg_manager: BackgroundTaskManager,
345) {
346 let bind_addr = format!("127.0.0.1:{}", port);
347 let listener = match tokio::net::TcpListener::bind(&bind_addr).await {
348 Ok(l) => {
349 info!(
350 address = %bind_addr,
351 trigger_count = triggers.len(),
352 "Webhook server started"
353 );
354 l
355 }
356 Err(e) => {
357 error!(
358 address = %bind_addr,
359 error = %e,
360 "Failed to start webhook server"
361 );
362 return;
363 }
364 };
365
366 let triggers = Arc::new(triggers);
367
368 loop {
369 match listener.accept().await {
370 Ok((stream, peer)) => {
371 let triggers = Arc::clone(&triggers);
372 let bg = bg_manager.clone();
373 tokio::spawn(async move {
374 handle_webhook_connection(stream, peer, triggers, bg).await;
375 });
376 }
377 Err(e) => {
378 warn!(error = %e, "Webhook server accept error");
379 }
380 }
381 }
382}
383
384async fn handle_webhook_connection(
386 mut stream: tokio::net::TcpStream,
387 peer: std::net::SocketAddr,
388 triggers: Arc<Vec<(String, String, Option<String>)>>,
389 bg_manager: BackgroundTaskManager,
390) {
391 use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
392
393 let mut reader = BufReader::new(&mut stream);
394 let mut request_line = String::new();
395
396 if reader.read_line(&mut request_line).await.is_err() {
397 return;
398 }
399
400 let request_line = request_line.trim();
401 if request_line.is_empty() {
402 return;
403 }
404
405 let parts: Vec<&str> = request_line.split_whitespace().collect();
407 if parts.len() < 2 {
408 send_http_response(&mut stream, "400 Bad Request", b"Bad Request").await;
409 return;
410 }
411
412 let method = parts[0];
413 let path = parts[1];
414
415 if method != "POST" {
417 send_http_response(&mut stream, "405 Method Not Allowed", b"Method Not Allowed").await;
418 return;
419 }
420
421 let path_parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
423 if path_parts.len() < 2 || path_parts[0] != "webhook" {
424 send_http_response(&mut stream, "404 Not Found", b"Not Found").await;
425 return;
426 }
427
428 let trigger_name = path_parts[1];
429
430 let mut header_line = String::new();
432 let mut content_length: usize = 0;
433 loop {
434 header_line.clear();
435 if reader.read_line(&mut header_line).await.is_err() {
436 return;
437 }
438 let line = header_line.trim();
439 if line.is_empty() {
440 break;
441 }
442 if let Some(len_str) = line.strip_prefix("Content-Length:") {
443 content_length = len_str.trim().parse().unwrap_or(0);
444 }
445 }
446
447 let mut body = vec![0u8; content_length];
449 if content_length > 0 && reader.read_exact(&mut body).await.is_err() {
450 send_http_response(&mut stream, "400 Bad Request", b"Bad Request").await;
451 return;
452 }
453
454 let matched = triggers.iter().find(|(name, _, _)| name == trigger_name);
456
457 if let Some((_name, prompt, _secret)) = matched {
458 let webhook_body = String::from_utf8_lossy(&body);
460 let full_prompt = format!("{}\n\nWebhook payload:\n{}", prompt, webhook_body);
461
462 match bg_manager.submit(full_prompt, String::new()).await {
463 Ok(task_id) => {
464 info!(
465 trigger = %_name,
466 task_id = %task_id,
467 peer = %peer,
468 "Webhook trigger submitted background task"
469 );
470 send_http_response(
471 &mut stream,
472 "200 OK",
473 format!("{{\"task_id\":\"{}\"}}", task_id).as_bytes(),
474 )
475 .await;
476 }
477 Err(e) => {
478 error!(
479 trigger = %_name,
480 error = %e,
481 "Webhook trigger failed to submit task"
482 );
483 send_http_response(
484 &mut stream,
485 "500 Internal Server Error",
486 b"Internal Server Error",
487 )
488 .await;
489 }
490 }
491 } else {
492 send_http_response(&mut stream, "404 Not Found", b"Trigger Not Found").await;
493 }
494}
495
496async fn send_http_response(
498 stream: &mut (impl tokio::io::AsyncWrite + Unpin),
499 status: &str,
500 body: &[u8],
501) {
502 use tokio::io::AsyncWriteExt;
503
504 let response = format!(
505 "HTTP/1.1 {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
506 status,
507 body.len(),
508 );
509
510 if let Err(e) = stream.write_all(response.as_bytes()).await {
511 warn!(error = %e, "Failed to write webhook response headers");
512 return;
513 }
514 if let Err(e) = stream.write_all(body).await {
515 warn!(error = %e, "Failed to write webhook response body");
516 return;
517 }
518 if let Err(e) = stream.flush().await {
519 warn!(error = %e, "Failed to flush webhook response");
520 }
521}
522
523#[instrument(skip(bg_manager), fields(trigger_name = %name, path = %watch_path))]
527async fn run_watch_trigger(
528 name: String,
529 watch_path: String,
530 events: Vec<String>,
531 debounce_secs: u64,
532 bg_manager: BackgroundTaskManager,
533 prompt: String,
534 system_prompt: String,
535) {
536 use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
537 use std::sync::mpsc;
538
539 info!(
540 trigger = %name,
541 path = %watch_path,
542 "File watch trigger started"
543 );
544
545 let (tx, rx) = mpsc::channel::<notify::Result<Event>>();
547
548 let mut watcher = match RecommendedWatcher::new(tx, Config::default()) {
549 Ok(w) => w,
550 Err(e) => {
551 error!(
552 trigger = %name,
553 error = %e,
554 "Failed to create file watcher"
555 );
556 return;
557 }
558 };
559
560 let path = PathBuf::from(&watch_path);
562 if let Err(e) = watcher.watch(&path, RecursiveMode::NonRecursive) {
563 error!(
564 trigger = %name,
565 path = %watch_path,
566 error = %e,
567 "Failed to watch path"
568 );
569 return;
570 }
571
572 let debounce_duration = std::time::Duration::from_secs(debounce_secs);
574 let mut last_trigger_time: Option<std::time::Instant> = None;
575
576 for event in rx {
578 match event {
579 Ok(event) => {
580 let should_trigger = match &event.kind {
581 EventKind::Create(_) => events.contains(&"create".to_string()),
582 EventKind::Modify(_) => events.contains(&"modify".to_string()),
583 EventKind::Remove(_) => events.contains(&"delete".to_string()),
584 _ => false,
585 };
586
587 if should_trigger {
588 let now = std::time::Instant::now();
589 let should_fire = match last_trigger_time {
590 Some(last) => now.duration_since(last) >= debounce_duration,
591 None => true,
592 };
593
594 if should_fire {
595 last_trigger_time = Some(now);
596 debug!(
597 trigger = %name,
598 event = ?event.kind,
599 paths = ?event.paths,
600 "File watch trigger firing"
601 );
602
603 match bg_manager
604 .submit(prompt.clone(), system_prompt.clone())
605 .await
606 {
607 Ok(task_id) => {
608 info!(
609 trigger = %name,
610 task_id = %task_id,
611 "File watch trigger submitted background task"
612 );
613 }
614 Err(e) => {
615 error!(
616 trigger = %name,
617 error = %e,
618 "File watch trigger failed to submit task"
619 );
620 }
621 }
622 }
623 }
624 }
625 Err(e) => {
626 warn!(
627 trigger = %name,
628 error = %e,
629 "File watch error"
630 );
631 }
632 }
633 }
634}
635
/// Unit tests for trigger configuration, (de)serialization and scheduler
/// start/stop state. Tests that need no async runtime are plain `#[test]`s.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::background::BackgroundTaskManager;
    use std::path::PathBuf;

    /// Returns a per-test, per-process scratch directory path, removing any
    /// leftover from a previous run. The directory itself is not created here.
    fn test_dir(name: &str) -> PathBuf {
        let dir = std::env::temp_dir().join(format!(
            "ravenclaws-test-sched-{}-{}",
            name,
            std::process::id()
        ));
        let _ = std::fs::remove_dir_all(&dir);
        dir
    }

    #[test]
    fn test_trigger_config_cron() {
        let config = TriggerConfig {
            name: "hourly".to_string(),
            prompt: "Run hourly check".to_string(),
            system_prompt: None,
            trigger: TriggerType::Cron {
                expression: "0 * * * * *".to_string(),
            },
        };

        assert_eq!(config.name, "hourly");
        match &config.trigger {
            TriggerType::Cron { expression } => {
                assert_eq!(expression, "0 * * * * *");
            }
            _ => panic!("Expected Cron trigger"),
        }
    }

    #[test]
    fn test_trigger_config_webhook() {
        let config = TriggerConfig {
            name: "github-webhook".to_string(),
            prompt: "Process GitHub event".to_string(),
            system_prompt: None,
            trigger: TriggerType::Webhook {
                secret: Some("mysecret".to_string()),
            },
        };

        assert_eq!(config.name, "github-webhook");
        match &config.trigger {
            TriggerType::Webhook { secret } => {
                assert_eq!(secret.as_deref(), Some("mysecret"));
            }
            _ => panic!("Expected Webhook trigger"),
        }
    }

    #[test]
    fn test_trigger_config_watch() {
        let config = TriggerConfig {
            name: "config-watch".to_string(),
            prompt: "Config changed".to_string(),
            system_prompt: None,
            trigger: TriggerType::Watch {
                path: "/etc/config".to_string(),
                events: vec!["modify".to_string(), "create".to_string()],
                debounce_secs: 10,
            },
        };

        assert_eq!(config.name, "config-watch");
        match &config.trigger {
            TriggerType::Watch {
                path,
                events,
                debounce_secs,
            } => {
                assert_eq!(path, "/etc/config");
                assert_eq!(events.len(), 2);
                assert_eq!(*debounce_secs, 10);
            }
            _ => panic!("Expected Watch trigger"),
        }
    }

    #[tokio::test]
    async fn test_scheduler_new() {
        let dir = test_dir("new");
        let bg = BackgroundTaskManager::new(&dir).await.unwrap();
        let config = SchedulerConfig { triggers: vec![] };
        let scheduler = Scheduler::new(bg, &config);

        assert!(!scheduler.is_running().await);
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[tokio::test]
    async fn test_scheduler_start_stop() {
        let dir = test_dir("start_stop");
        let bg = BackgroundTaskManager::new(&dir).await.unwrap();
        let config = SchedulerConfig { triggers: vec![] };
        let scheduler = Scheduler::new(bg, &config);

        scheduler.start().await.unwrap();
        assert!(scheduler.is_running().await);

        scheduler.stop().await;
        assert!(!scheduler.is_running().await);
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_cron_expression_parsing() {
        let expr = "0 */6 * * * *";
        let schedule = expr.parse::<cron::Schedule>();
        assert!(schedule.is_ok(), "Valid cron expression should parse");

        let bad_expr = "not-a-cron";
        let schedule = bad_expr.parse::<cron::Schedule>();
        assert!(schedule.is_err(), "Invalid cron expression should fail");
    }

    #[test]
    fn test_cron_schedule_upcoming() {
        let expr = "0 * * * * *";
        let schedule = expr.parse::<cron::Schedule>().unwrap();
        let now = chrono::Utc::now();

        let mut upcoming = schedule.upcoming(chrono::Utc);
        let next = upcoming.next();
        assert!(next.is_some(), "Should have a next scheduled time");
        assert!(next.unwrap() > now, "Next time should be in the future");
    }

    #[tokio::test]
    async fn test_scheduler_with_cron_trigger() {
        let dir = test_dir("with_cron");
        let bg = BackgroundTaskManager::new(&dir).await.unwrap();

        let config = SchedulerConfig {
            triggers: vec![TriggerConfig {
                name: "test-cron".to_string(),
                prompt: "Cron test".to_string(),
                system_prompt: None,
                trigger: TriggerType::Cron {
                    expression: "0 0 1 1 * *".to_string(),
                },
            }],
        };

        let scheduler = Scheduler::new(bg, &config);
        scheduler.start().await.unwrap();
        assert!(scheduler.is_running().await);

        scheduler.stop().await;
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[tokio::test]
    async fn test_scheduler_with_webhook_trigger() {
        let dir = test_dir("with_webhook");
        let bg = BackgroundTaskManager::new(&dir).await.unwrap();

        let config = SchedulerConfig {
            triggers: vec![TriggerConfig {
                name: "test-webhook".to_string(),
                prompt: "Webhook test".to_string(),
                system_prompt: None,
                trigger: TriggerType::Webhook { secret: None },
            }],
        };

        let scheduler = Scheduler::new(bg, &config);
        scheduler.start().await.unwrap();
        assert!(scheduler.is_running().await);

        scheduler.stop().await;
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[tokio::test]
    async fn test_scheduler_with_watch_trigger() {
        let dir = test_dir("with_watch");
        let bg = BackgroundTaskManager::new(&dir).await.unwrap();

        let config = SchedulerConfig {
            triggers: vec![TriggerConfig {
                name: "test-watch".to_string(),
                prompt: "Watch test".to_string(),
                system_prompt: None,
                trigger: TriggerType::Watch {
                    path: dir.to_string_lossy().to_string(),
                    events: vec!["modify".to_string()],
                    debounce_secs: 1,
                },
            }],
        };

        let scheduler = Scheduler::new(bg, &config);
        scheduler.start().await.unwrap();
        assert!(scheduler.is_running().await);

        scheduler.stop().await;
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn test_webhook_response_format() {
        let task_id = "test-uuid-1234";
        let response = format!("{{\"task_id\":\"{}\"}}", task_id);
        let parsed: serde_json::Value = serde_json::from_str(&response).unwrap();
        assert_eq!(parsed["task_id"], task_id);
    }

    #[tokio::test]
    async fn test_send_http_response() {
        use tokio::io::AsyncReadExt;

        let (mut writer, mut reader) = tokio::io::duplex(1024);

        // The writer half is dropped when the task finishes, which lets the
        // reader observe EOF instead of relying on a single `read` call.
        let writer_task = tokio::spawn(async move {
            send_http_response(&mut writer, "200 OK", b"{\"status\":\"ok\"}").await;
        });

        let mut raw = Vec::new();
        reader.read_to_end(&mut raw).await.unwrap();
        writer_task.await.unwrap();

        let response = String::from_utf8_lossy(&raw);
        assert!(response.starts_with("HTTP/1.1 200 OK\r\n"));
        assert!(response.contains("Content-Length: 15\r\n"));
        assert!(response.ends_with("{\"status\":\"ok\"}"));
    }

    #[test]
    fn test_trigger_config_serialization() {
        let config = TriggerConfig {
            name: "test".to_string(),
            prompt: "test prompt".to_string(),
            system_prompt: Some("system".to_string()),
            trigger: TriggerType::Cron {
                expression: "0 * * * * *".to_string(),
            },
        };

        let json = serde_json::to_string_pretty(&config).unwrap();
        let deserialized: TriggerConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(deserialized.name, "test");
        assert_eq!(deserialized.prompt, "test prompt");
        assert_eq!(deserialized.system_prompt, Some("system".to_string()));
        match &deserialized.trigger {
            TriggerType::Cron { expression } => {
                assert_eq!(expression, "0 * * * * *");
            }
            _ => panic!("Expected Cron trigger"),
        }
    }

    #[test]
    fn test_webhook_trigger_serialization() {
        let config = TriggerConfig {
            name: "gh".to_string(),
            prompt: "process".to_string(),
            system_prompt: None,
            trigger: TriggerType::Webhook {
                secret: Some("s3cret".to_string()),
            },
        };

        let json = serde_json::to_string_pretty(&config).unwrap();
        let deserialized: TriggerConfig = serde_json::from_str(&json).unwrap();

        match &deserialized.trigger {
            TriggerType::Webhook { secret } => {
                assert_eq!(secret.as_deref(), Some("s3cret"));
            }
            _ => panic!("Expected Webhook trigger"),
        }
    }

    #[test]
    fn test_watch_trigger_serialization() {
        let config = TriggerConfig {
            name: "fw".to_string(),
            prompt: "file changed".to_string(),
            system_prompt: None,
            trigger: TriggerType::Watch {
                path: "/tmp".to_string(),
                events: vec!["modify".to_string()],
                debounce_secs: 5,
            },
        };

        let json = serde_json::to_string_pretty(&config).unwrap();
        let deserialized: TriggerConfig = serde_json::from_str(&json).unwrap();

        match &deserialized.trigger {
            TriggerType::Watch {
                path,
                events,
                debounce_secs,
            } => {
                assert_eq!(path, "/tmp");
                assert_eq!(events, &vec!["modify".to_string()]);
                assert_eq!(*debounce_secs, 5);
            }
            _ => panic!("Expected Watch trigger"),
        }
    }

    #[test]
    fn test_watch_trigger_deserialization_defaults() {
        // Omitted `system_prompt`, `events` and `debounce_secs` fall back to
        // their serde defaults.
        let json = r#"{
            "name": "fw",
            "prompt": "file changed",
            "trigger": { "type": "watch", "path": "/tmp" }
        }"#;
        let deserialized: TriggerConfig = serde_json::from_str(json).unwrap();

        assert_eq!(deserialized.system_prompt, None);
        match &deserialized.trigger {
            TriggerType::Watch {
                path,
                events,
                debounce_secs,
            } => {
                assert_eq!(path, "/tmp");
                assert_eq!(events, &default_watch_events());
                assert_eq!(*debounce_secs, default_debounce_secs());
            }
            _ => panic!("Expected Watch trigger"),
        }
    }

    #[test]
    fn test_default_watch_events() {
        let events = default_watch_events();
        assert_eq!(events, vec!["modify".to_string()]);
    }

    #[test]
    fn test_default_debounce_secs() {
        assert_eq!(default_debounce_secs(), 5);
    }
}