Skip to main content

camel_core/hot_reload/adapters/
reload_watcher.rs

1//! File-watching hot-reload loop.
2//!
3//! Watches YAML/JSON route files for changes and triggers pipeline reloads.
4//!
5//! # Circular dependency avoidance
6//!
7//! `camel-core` cannot depend on `camel-dsl` (which already depends on
8//! `camel-core`). Route discovery is therefore injected via a closure:
9//!
10//! ```ignore
11//! watch_and_reload(dirs, controller, || discover_routes(&patterns)).await?;
12//! ```
13//!
14//! This keeps `camel-core` decoupled while letting `camel-cli` (which has
15//! access to both) wire the pieces together.
16
17use std::collections::HashSet;
18use std::path::{Path, PathBuf};
19use std::time::Duration;
20
21use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
22use tokio::sync::mpsc;
23use tokio_util::sync::CancellationToken;
24
25use camel_api::CamelError;
26
27use crate::context::RuntimeExecutionHandle;
28use crate::hot_reload::application::reload::FunctionReloadContext;
29use crate::hot_reload::application::{
30    compute_reload_actions_from_runtime_snapshot, execute_reload_actions,
31};
32use crate::lifecycle::application::route_definition::RouteDefinition;
33
34pub struct ReloadWatcher;
35
36impl ReloadWatcher {
37    pub async fn watch_and_reload<F>(
38        watch_dirs: Vec<PathBuf>,
39        controller: RuntimeExecutionHandle,
40        discover_fn: F,
41        shutdown: Option<CancellationToken>,
42        drain_timeout: Duration,
43        debounce: Duration,
44    ) -> Result<(), CamelError>
45    where
46        F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
47    {
48        watch_and_reload(
49            watch_dirs,
50            controller,
51            discover_fn,
52            shutdown,
53            drain_timeout,
54            debounce,
55        )
56        .await
57    }
58
59    pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
60        resolve_watch_dirs(patterns)
61    }
62}
63
64/// Watch directories and reload pipelines when YAML files change.
65///
66/// * `watch_dirs` — directories to monitor (non-recursive watch on each).
67/// * `controller` — live route controller to apply reload actions to.
68/// * `discover_fn` — called after each debounced file-change event to load
69///   the current set of route definitions from disk.
70/// * `shutdown` — optional token; cancel it to stop the watcher gracefully.
71///
72/// The function runs indefinitely until the channel is closed, the shutdown
73/// token is cancelled, or a fatal initialisation error occurs.
74///
75/// # Errors
76///
77/// Returns `Err` only on fatal initialisation failure (watcher cannot be
78/// created, or a directory cannot be watched). Per-route reload errors are
79/// logged as warnings and do not terminate the loop.
80pub async fn watch_and_reload<F>(
81    watch_dirs: Vec<PathBuf>,
82    controller: RuntimeExecutionHandle,
83    discover_fn: F,
84    shutdown: Option<CancellationToken>,
85    drain_timeout: Duration,
86    debounce: Duration,
87) -> Result<(), CamelError>
88where
89    F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
90{
91    let (tx, mut rx) = mpsc::channel::<notify::Result<Event>>(64);
92
93    let mut watcher = RecommendedWatcher::new(
94        move |res| {
95            let _ = tx.blocking_send(res);
96        },
97        notify::Config::default(),
98    )
99    .map_err(|e| CamelError::RouteError(format!("Failed to create file watcher: {e}")))?;
100
101    if watch_dirs.is_empty() {
102        tracing::warn!("hot-reload: no directories to watch");
103    }
104
105    for dir in &watch_dirs {
106        watcher.watch(dir, RecursiveMode::Recursive).map_err(|e| {
107            CamelError::RouteError(format!("Failed to watch directory {dir:?}: {e}"))
108        })?;
109        tracing::info!("hot-reload: watching {:?}", dir);
110    }
111
112    // Debounce duration to coalesce rapid successive file events from editors
113    // (e.g., when saving a file, multiple events may fire in quick succession).
114    // debounce duration is passed by the caller (configured via CamelConfig.watch_debounce_ms)
115
116    // Helper: check if shutdown was requested
117    let is_cancelled = || shutdown.as_ref().map(|t| t.is_cancelled()).unwrap_or(false);
118
119    loop {
120        if is_cancelled() {
121            tracing::info!("hot-reload: shutdown requested — stopping watcher");
122            return Ok(());
123        }
124
125        // Wait for the first relevant event.
126        let triggered = loop {
127            // Select between an incoming event and a cancellation signal.
128            let recv_fut = rx.recv();
129            if let Some(token) = shutdown.as_ref() {
130                tokio::select! {
131                    biased;
132                    _ = token.cancelled() => {
133                        tracing::info!("hot-reload: shutdown requested — stopping watcher");
134                        return Ok(());
135                    }
136                    msg = recv_fut => {
137                        match msg {
138                            None => return Ok(()), // channel closed
139                            Some(Err(e)) => {
140                                tracing::warn!("hot-reload: watcher error: {e}");
141                                continue;
142                            }
143                            Some(Ok(event)) => {
144                                if is_reload_event(&event) {
145                                    break true;
146                                }
147                            }
148                        }
149                    }
150                }
151            } else {
152                match recv_fut.await {
153                    None => return Ok(()), // channel closed — watcher dropped
154                    Some(Err(e)) => {
155                        tracing::warn!("hot-reload: watcher error: {e}");
156                        continue;
157                    }
158                    Some(Ok(event)) => {
159                        if is_reload_event(&event) {
160                            break true;
161                        }
162                    }
163                }
164            }
165        };
166
167        if !triggered {
168            continue;
169        }
170
171        // Drain further events within the debounce window.
172        let deadline = tokio::time::Instant::now() + debounce;
173        loop {
174            match tokio::time::timeout_at(deadline, rx.recv()).await {
175                Ok(Some(_)) => continue,   // consume
176                Ok(None) => return Ok(()), // channel closed
177                Err(_elapsed) => break,    // debounce window expired
178            }
179        }
180        tracing::info!("hot-reload: file change detected — reloading routes");
181
182        // Discover new definitions from disk.
183        let new_defs = match discover_fn() {
184            Ok(defs) => defs,
185            Err(e) => {
186                tracing::warn!("hot-reload: route discovery failed: {e}");
187                continue;
188            }
189        };
190
191        // Compute the diff from runtime projection route IDs only.
192        let runtime_route_ids = match controller.runtime_route_ids().await {
193            Ok(route_ids) => route_ids,
194            Err(err) => {
195                tracing::warn!(
196                    error = %err,
197                    "hot-reload: failed to list runtime routes; skipping this reload cycle"
198                );
199                continue;
200            }
201        };
202        let mut runtime_hashes: std::collections::HashMap<String, u64> =
203            std::collections::HashMap::new();
204        for id in &runtime_route_ids {
205            if let Some(hash) = controller.route_source_hash(id).await {
206                runtime_hashes.insert(id.clone(), hash);
207            }
208        }
209
210        let actions = compute_reload_actions_from_runtime_snapshot(
211            &new_defs,
212            &runtime_route_ids,
213            &|route_id: &str| runtime_hashes.get(route_id).copied(),
214        );
215
216        if actions.is_empty() {
217            tracing::debug!("hot-reload: no route changes detected");
218            continue;
219        }
220
221        tracing::info!("hot-reload: applying {} reload action(s)", actions.len());
222
223        let function_ctx = controller.function_invoker().map(|invoker| {
224            let generation = invoker.begin_reload();
225            FunctionReloadContext {
226                invoker,
227                generation,
228            }
229        });
230        let errors = execute_reload_actions(
231            actions,
232            new_defs,
233            &controller,
234            drain_timeout,
235            function_ctx.as_ref(),
236        )
237        .await;
238        for err in &errors {
239            tracing::warn!(
240                "hot-reload: error on route '{}' ({}): {}",
241                err.route_id,
242                err.action,
243                err.error
244            );
245        }
246    }
247}
248
249/// Returns `true` if this notify event should trigger a reload.
250///
251/// Only events affecting `.yaml`, `.yml`, or `.json` files are considered, to avoid
252/// triggering reloads on editor swap files (`.swp`, `~`, `.tmp`, etc.).
253fn is_reload_event(event: &Event) -> bool {
254    let has_yaml = event.paths.iter().any(|p| {
255        p.extension()
256            .map(|e| e == "yaml" || e == "yml" || e == "json")
257            .unwrap_or(false)
258    });
259    has_yaml
260        && matches!(
261            event.kind,
262            EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
263        )
264}
265
266/// Extract unique existing parent directories from glob patterns.
267///
268/// Given patterns like `"routes/*.yaml"` or `"/abs/**/*.yaml"`, walks up
269/// from each path until the first non-glob segment and collects the
270/// resulting existing directories.
271pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
272    let mut dirs: HashSet<PathBuf> = HashSet::new();
273
274    for pattern in patterns {
275        let path = Path::new(pattern.as_str());
276
277        let dir = if path.is_dir() {
278            path.to_path_buf()
279        } else {
280            // Walk toward root until no glob characters remain.
281            let mut candidate = path;
282            loop {
283                let s = candidate.to_string_lossy();
284                if s.contains('*') || s.contains('?') || s.contains('[') {
285                    match candidate.parent() {
286                        Some(p) => candidate = p,
287                        None => break,
288                    }
289                } else {
290                    break;
291                }
292            }
293            candidate.to_path_buf()
294        };
295
296        if dir.as_os_str().is_empty() {
297            // Relative pattern with no parent — watch current dir
298            dirs.insert(PathBuf::from("."));
299        } else if dir.exists() {
300            dirs.insert(dir);
301        }
302    }
303
304    dirs.into_iter().collect()
305}
306
307#[cfg(test)]
308mod tests {
309    use std::path::PathBuf;
310
311    use notify::{Event, EventKind};
312
313    use super::is_reload_event;
314
315    fn make_event(paths: &[&str], kind: EventKind) -> Event {
316        Event {
317            kind,
318            paths: paths.iter().map(PathBuf::from).collect(),
319            attrs: Default::default(),
320        }
321    }
322
323    #[test]
324    fn is_reload_event_accepts_yaml() {
325        let ev = make_event(
326            &["routes/test.yaml"],
327            EventKind::Create(notify::event::CreateKind::File),
328        );
329        assert!(is_reload_event(&ev));
330    }
331
332    #[test]
333    fn is_reload_event_accepts_yml() {
334        let ev = make_event(
335            &["routes/test.yml"],
336            EventKind::Modify(notify::event::ModifyKind::Data(
337                notify::event::DataChange::Content,
338            )),
339        );
340        assert!(is_reload_event(&ev));
341    }
342
343    #[test]
344    fn is_reload_event_accepts_json() {
345        let ev = make_event(
346            &["routes/test.json"],
347            EventKind::Create(notify::event::CreateKind::File),
348        );
349        assert!(is_reload_event(&ev));
350    }
351
352    #[test]
353    fn is_reload_event_rejects_other_extensions() {
354        let ev = make_event(
355            &["routes/test.toml"],
356            EventKind::Create(notify::event::CreateKind::File),
357        );
358        assert!(!is_reload_event(&ev));
359    }
360
361    #[test]
362    fn is_reload_event_rejects_swap_files() {
363        let ev = make_event(
364            &["routes/.test.yaml.swp"],
365            EventKind::Create(notify::event::CreateKind::File),
366        );
367        assert!(!is_reload_event(&ev));
368    }
369}