Skip to main content

camel_core/hot_reload/adapters/
reload_watcher.rs

//! File-watching hot-reload loop.
//!
//! Watches YAML/JSON route files for changes and triggers pipeline reloads.
//!
//! # Circular dependency avoidance
//!
//! `camel-core` cannot depend on `camel-dsl` (which already depends on
//! `camel-core`). Route discovery is therefore injected via a closure:
//!
//! ```ignore
//! watch_and_reload(
//!     dirs,
//!     controller,
//!     || discover_routes(&patterns),
//!     Some(shutdown_token),
//!     drain_timeout,
//!     debounce,
//! )
//! .await?;
//! ```
//!
//! This keeps `camel-core` decoupled while letting `camel-cli` (which has
//! access to both) wire the pieces together.
16
17use std::collections::HashSet;
18use std::path::{Path, PathBuf};
19use std::time::Duration;
20
21use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
22use tokio::sync::mpsc;
23use tokio_util::sync::CancellationToken;
24
25use camel_api::CamelError;
26
27use crate::context::RuntimeExecutionHandle;
28use crate::hot_reload::application::reload::FunctionReloadContext;
29use crate::hot_reload::application::{
30    compute_reload_actions_from_runtime_snapshot, execute_reload_actions,
31};
32use crate::lifecycle::application::route_definition::RouteDefinition;
33
34pub struct ReloadWatcher;
35
36impl ReloadWatcher {
37    pub async fn watch_and_reload<F>(
38        watch_dirs: Vec<PathBuf>,
39        controller: RuntimeExecutionHandle,
40        discover_fn: F,
41        shutdown: Option<CancellationToken>,
42        drain_timeout: Duration,
43        debounce: Duration,
44    ) -> Result<(), CamelError>
45    where
46        F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
47    {
48        watch_and_reload(
49            watch_dirs,
50            controller,
51            discover_fn,
52            shutdown,
53            drain_timeout,
54            debounce,
55        )
56        .await
57    }
58
59    pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
60        resolve_watch_dirs(patterns)
61    }
62}
63
64/// Watch directories and reload pipelines when YAML files change.
65///
66/// * `watch_dirs` — directories to monitor (non-recursive watch on each).
67/// * `controller` — live route controller to apply reload actions to.
68/// * `discover_fn` — called after each debounced file-change event to load
69///   the current set of route definitions from disk.
70/// * `shutdown` — optional token; cancel it to stop the watcher gracefully.
71///
72/// The function runs indefinitely until the channel is closed, the shutdown
73/// token is cancelled, or a fatal initialisation error occurs.
74///
75/// # Errors
76///
77/// Returns `Err` only on fatal initialisation failure (watcher cannot be
78/// created, or a directory cannot be watched). Per-route reload errors are
79/// logged as warnings and do not terminate the loop.
80pub async fn watch_and_reload<F>(
81    watch_dirs: Vec<PathBuf>,
82    controller: RuntimeExecutionHandle,
83    discover_fn: F,
84    shutdown: Option<CancellationToken>,
85    drain_timeout: Duration,
86    debounce: Duration,
87) -> Result<(), CamelError>
88where
89    F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
90{
91    let (tx, mut rx) = mpsc::channel::<notify::Result<Event>>(64);
92
93    let mut watcher = RecommendedWatcher::new(
94        move |res| {
95            let _ = tx.blocking_send(res);
96        },
97        notify::Config::default(),
98    )
99    .map_err(|e| CamelError::RouteError(format!("Failed to create file watcher: {e}")))?;
100
101    if watch_dirs.is_empty() {
102        tracing::warn!("hot-reload: no directories to watch");
103    }
104
105    for dir in &watch_dirs {
106        watcher.watch(dir, RecursiveMode::Recursive).map_err(|e| {
107            CamelError::RouteError(format!("Failed to watch directory {dir:?}: {e}"))
108        })?;
109        tracing::info!("hot-reload: watching {:?}", dir);
110    }
111
112    // Debounce duration to coalesce rapid successive file events from editors
113    // (e.g., when saving a file, multiple events may fire in quick succession).
114    // debounce duration is passed by the caller (configured via CamelConfig.watch_debounce_ms)
115
116    // Helper: check if shutdown was requested
117    let is_cancelled = || shutdown.as_ref().map(|t| t.is_cancelled()).unwrap_or(false);
118
119    loop {
120        if is_cancelled() {
121            tracing::info!("hot-reload: shutdown requested — stopping watcher");
122            return Ok(());
123        }
124
125        // Wait for the first relevant event.
126        let triggered = loop {
127            // Select between an incoming event and a cancellation signal.
128            let recv_fut = rx.recv();
129            if let Some(token) = shutdown.as_ref() {
130                tokio::select! {
131                    biased;
132                    _ = token.cancelled() => {
133                        tracing::info!("hot-reload: shutdown requested — stopping watcher");
134                        return Ok(());
135                    }
136                    msg = recv_fut => {
137                        match msg {
138                            None => return Ok(()), // channel closed
139                            Some(Err(e)) => {
140                                tracing::warn!("hot-reload: watcher error: {e}");
141                                continue;
142                            }
143                            Some(Ok(event)) => {
144                                if is_reload_event(&event) {
145                                    break true;
146                                }
147                            }
148                        }
149                    }
150                }
151            } else {
152                match recv_fut.await {
153                    None => return Ok(()), // channel closed — watcher dropped
154                    Some(Err(e)) => {
155                        tracing::warn!("hot-reload: watcher error: {e}");
156                        continue;
157                    }
158                    Some(Ok(event)) => {
159                        if is_reload_event(&event) {
160                            break true;
161                        }
162                    }
163                }
164            }
165        };
166
167        if !triggered {
168            continue;
169        }
170
171        // Drain further events within the debounce window.
172        let deadline = tokio::time::Instant::now() + debounce;
173        loop {
174            match tokio::time::timeout_at(deadline, rx.recv()).await {
175                Ok(Some(_)) => continue,   // consume
176                Ok(None) => return Ok(()), // channel closed
177                Err(_elapsed) => break,    // debounce window expired
178            }
179        }
180        tracing::info!("hot-reload: file change detected — reloading routes");
181
182        // Discover new definitions from disk.
183        let new_defs = match discover_fn() {
184            Ok(defs) => defs,
185            Err(e) => {
186                tracing::warn!("hot-reload: route discovery failed: {e}");
187                continue;
188            }
189        };
190
191        // Compute the diff from runtime projection route IDs only.
192        let runtime_route_ids = match controller.runtime_route_ids().await {
193            Ok(route_ids) => route_ids,
194            Err(err) => {
195                tracing::warn!(
196                    error = %err,
197                    "hot-reload: failed to list runtime routes; skipping this reload cycle"
198                );
199                continue;
200            }
201        };
202        let mut runtime_hashes: std::collections::HashMap<String, u64> =
203            std::collections::HashMap::new();
204        for id in &runtime_route_ids {
205            if let Some(hash) = controller.route_source_hash(id).await {
206                runtime_hashes.insert(id.clone(), hash);
207            }
208        }
209
210        let actions = compute_reload_actions_from_runtime_snapshot(
211            &new_defs,
212            &runtime_route_ids,
213            &|route_id: &str| runtime_hashes.get(route_id).copied(),
214        );
215
216        if actions.is_empty() {
217            tracing::debug!("hot-reload: no route changes detected");
218            continue;
219        }
220
221        tracing::info!("hot-reload: applying {} reload action(s)", actions.len());
222
223        let function_ctx = controller.function_invoker().map(|invoker| {
224            let generation = invoker.begin_reload();
225            FunctionReloadContext {
226                invoker,
227                generation,
228            }
229        });
230        let errors = execute_reload_actions(
231            actions,
232            new_defs,
233            &controller,
234            drain_timeout,
235            function_ctx.as_ref(),
236        )
237        .await;
238        for err in &errors {
239            tracing::warn!(
240                "hot-reload: error on route '{}' ({}): {}",
241                err.route_id,
242                err.action,
243                err.error
244            );
245        }
246    }
247}
248
249/// Returns `true` if this notify event should trigger a reload.
250///
251/// Only events affecting `.yaml`, `.yml`, or `.json` files are considered, to avoid
252/// triggering reloads on editor swap files (`.swp`, `~`, `.tmp`, etc.).
253fn is_reload_event(event: &Event) -> bool {
254    let has_yaml = event.paths.iter().any(|p| {
255        p.extension()
256            .map(|e| e == "yaml" || e == "yml" || e == "json")
257            .unwrap_or(false)
258    });
259    has_yaml
260        && matches!(
261            event.kind,
262            EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
263        )
264}
265
/// Extract the unique, existing directories to watch from glob patterns.
///
/// For each pattern:
///
/// * a pattern that is itself an existing directory is used as-is;
/// * otherwise the path is walked toward the root until no glob
///   metacharacter (`*`, `?`, `[`) remains, e.g. `"routes/*.yaml"` and
///   `"/abs/**/*.yaml"` resolve to `routes` and `/abs`;
/// * a glob-free path that names an existing regular file resolves to the
///   file's parent directory, so the result only ever contains directories;
/// * an empty result of that walk (e.g. for `"*.yaml"`) means the current
///   directory and is reported as `"."`;
/// * anything that does not exist on disk is silently skipped.
///
/// The returned list is de-duplicated and sorted, so the output is
/// deterministic for a given set of patterns and file-system state.
pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
    let mut dirs: HashSet<PathBuf> = HashSet::new();

    for pattern in patterns {
        let path = Path::new(pattern);
        let mut base = path;

        if !path.is_dir() {
            // Walk toward the root until no glob characters remain.
            while base
                .to_string_lossy()
                .contains(|c: char| matches!(c, '*' | '?' | '['))
            {
                match base.parent() {
                    Some(parent) => base = parent,
                    None => break,
                }
            }

            // A literal file pattern: watch the directory containing it
            // rather than the file itself.
            if base.is_file() {
                if let Some(parent) = base.parent() {
                    base = parent;
                }
            }
        }

        if base.as_os_str().is_empty() {
            // Relative pattern with no parent — watch the current directory.
            dirs.insert(PathBuf::from("."));
        } else if base.exists() {
            dirs.insert(base.to_path_buf());
        }
    }

    let mut resolved: Vec<PathBuf> = dirs.into_iter().collect();
    resolved.sort();
    resolved
}
306
#[cfg(test)]
mod tests {
    use std::fs;
    use std::path::{Path, PathBuf};
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    use notify::{Event, EventKind};
    use tokio_util::sync::CancellationToken;

    use crate::CamelContext;

    use super::{ReloadWatcher, is_reload_event, resolve_watch_dirs, watch_and_reload};

    /// Uniquely named directory under the system temp dir.
    ///
    /// The directory is created eagerly and removed again when the guard is
    /// dropped, so a failing assertion does not leave it behind.
    struct ScratchDir(PathBuf);

    impl ScratchDir {
        /// Creates `<temp>/<prefix>-<nanoseconds since epoch>`.
        fn new(prefix: &str) -> Self {
            let nanos = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("system clock before unix epoch")
                .as_nanos();
            let root = std::env::temp_dir().join(format!("{prefix}-{nanos}"));
            fs::create_dir_all(&root).expect("create temp dir");
            Self(root)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best-effort cleanup; a leftover temp dir must not fail the test.
            let _ = fs::remove_dir_all(&self.0);
        }
    }

    /// Renders a path the way callers pass patterns to `resolve_watch_dirs`.
    fn as_pattern(path: &Path) -> String {
        path.to_string_lossy().to_string()
    }

    /// Builds a notify event of the given kind touching `paths`.
    fn make_event(paths: &[&str], kind: EventKind) -> Event {
        Event {
            kind,
            paths: paths.iter().map(PathBuf::from).collect(),
            attrs: Default::default(),
        }
    }

    fn file_created() -> EventKind {
        EventKind::Create(notify::event::CreateKind::File)
    }

    fn content_modified() -> EventKind {
        EventKind::Modify(notify::event::ModifyKind::Data(
            notify::event::DataChange::Content,
        ))
    }

    #[test]
    fn is_reload_event_accepts_yaml() {
        let ev = make_event(&["routes/test.yaml"], file_created());
        assert!(is_reload_event(&ev));
    }

    #[test]
    fn is_reload_event_accepts_yml() {
        let ev = make_event(&["routes/test.yml"], content_modified());
        assert!(is_reload_event(&ev));
    }

    #[test]
    fn is_reload_event_accepts_json() {
        let ev = make_event(&["routes/test.json"], file_created());
        assert!(is_reload_event(&ev));
    }

    #[test]
    fn is_reload_event_rejects_other_extensions() {
        let ev = make_event(&["routes/test.toml"], file_created());
        assert!(!is_reload_event(&ev));
    }

    #[test]
    fn is_reload_event_rejects_swap_files() {
        let ev = make_event(&["routes/.test.yaml.swp"], file_created());
        assert!(!is_reload_event(&ev));
    }

    #[test]
    fn is_reload_event_accepts_remove_kind() {
        let ev = make_event(
            &["routes/test.yaml"],
            EventKind::Remove(notify::event::RemoveKind::File),
        );
        assert!(is_reload_event(&ev));
    }

    #[test]
    fn is_reload_event_rejects_non_reload_kind_even_with_yaml() {
        let ev = make_event(
            &["routes/test.yaml"],
            EventKind::Access(notify::event::AccessKind::Read),
        );
        assert!(!is_reload_event(&ev));
    }

    #[test]
    fn is_reload_event_accepts_when_any_path_matches() {
        let ev = make_event(&["routes/test.tmp", "routes/real.json"], content_modified());
        assert!(is_reload_event(&ev));
    }

    #[test]
    fn resolve_watch_dirs_returns_existing_dir_pattern_and_ignores_missing() {
        let scratch = ScratchDir::new("camel-reload-watch");
        let existing = scratch.path().join("routes");
        fs::create_dir_all(&existing).expect("create temp dir");

        let patterns = vec![
            as_pattern(&existing),
            as_pattern(&scratch.path().join("missing")),
        ];

        let dirs = resolve_watch_dirs(&patterns);

        assert!(dirs.contains(&existing));
        assert_eq!(dirs.len(), 1);
    }

    #[test]
    fn resolve_watch_dirs_walks_up_glob_segments() {
        let scratch = ScratchDir::new("camel-reload-watch-glob");
        let nested = scratch.path().join("a").join("b");
        fs::create_dir_all(&nested).expect("create nested temp dir");

        let pattern = as_pattern(&nested.join("**").join("*.yaml"));

        let dirs = resolve_watch_dirs(&[pattern]);

        assert!(dirs.contains(&nested));
    }

    #[test]
    fn resolve_watch_dirs_uses_current_dir_for_relative_no_parent_pattern() {
        let dirs = resolve_watch_dirs(&["*.yaml".to_string()]);
        assert_eq!(dirs, vec![PathBuf::from(".")]);
    }

    #[tokio::test]
    async fn watch_and_reload_returns_immediately_when_shutdown_is_cancelled() {
        let token = CancellationToken::new();
        token.cancel();

        let ctx = CamelContext::builder().build().await.unwrap();
        let result = watch_and_reload(
            vec![],
            ctx.runtime_execution_handle(),
            || Ok(vec![]),
            Some(token),
            Duration::from_millis(1),
            Duration::from_millis(1),
        )
        .await;

        assert!(result.is_ok());
    }

    #[test]
    fn is_reload_event_rejects_no_extension() {
        let ev = make_event(&["routes/Makefile"], content_modified());
        assert!(!is_reload_event(&ev));
    }

    #[test]
    fn is_reload_event_rejects_any_event_with_no_matching_kind() {
        let ev = make_event(&["routes/test.yaml"], EventKind::Other);
        assert!(!is_reload_event(&ev));
    }

    #[test]
    fn is_reload_event_rejects_empty_paths() {
        let ev = make_event(&[], file_created());
        assert!(!is_reload_event(&ev));
    }

    #[test]
    fn resolve_watch_dirs_deduplicates_same_directory() {
        let scratch = ScratchDir::new("camel-reload-watch-dedup");
        let root = scratch.path().to_path_buf();

        let patterns = vec![
            as_pattern(&root.join("*.yaml")),
            as_pattern(&root.join("*.yml")),
        ];

        let dirs = resolve_watch_dirs(&patterns);
        assert_eq!(dirs.len(), 1);
        assert!(dirs.contains(&root));
    }

    #[test]
    fn resolve_watch_dirs_handles_absolute_glob_pattern() {
        let scratch = ScratchDir::new("camel-reload-watch-abs");
        let routes = scratch.path().join("routes");
        fs::create_dir_all(routes.join("sub")).expect("create nested temp dir");

        let pattern = as_pattern(&routes.join("**").join("*.yaml"));

        let dirs = resolve_watch_dirs(&[pattern]);
        assert!(dirs.iter().any(|d| d.starts_with(&routes)));
    }

    #[test]
    fn resolve_watch_dirs_empty_patterns_returns_empty() {
        let dirs = resolve_watch_dirs(&[]);
        assert!(dirs.is_empty());
    }

    #[test]
    fn resolve_watch_dirs_ignores_nonexistent_parent_of_glob() {
        let dirs = resolve_watch_dirs(&["/nonexistent/path/**/*.yaml".to_string()]);
        assert!(dirs.is_empty());
    }

    #[test]
    fn reload_watcher_struct_is_unit() {
        let _ = ReloadWatcher;
    }

    #[test]
    fn reload_watcher_resolve_watch_dirs_matches_free_function() {
        let scratch = ScratchDir::new("camel-reload-watch-facade");
        let patterns = vec![as_pattern(&scratch.path().join("*.yaml"))];

        assert_eq!(
            ReloadWatcher::resolve_watch_dirs(&patterns),
            resolve_watch_dirs(&patterns)
        );
    }
}