Skip to main content

camel_core/hot_reload/adapters/
reload_watcher.rs

1//! File-watching hot-reload loop.
2//!
3//! Watches YAML route files for changes and triggers pipeline reloads.
4//!
5//! # Circular dependency avoidance
6//!
7//! `camel-core` cannot depend on `camel-dsl` (which already depends on
8//! `camel-core`). Route discovery is therefore injected via a closure:
9//!
10//! ```ignore
11//! watch_and_reload(dirs, controller, || discover_routes(&patterns), shutdown, drain_timeout, debounce).await?;
12//! ```
13//!
14//! This keeps `camel-core` decoupled while letting `camel-cli` (which has
15//! access to both) wire the pieces together.
16
17use std::collections::HashSet;
18use std::path::{Path, PathBuf};
19use std::time::Duration;
20
21use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
22use tokio::sync::mpsc;
23use tokio_util::sync::CancellationToken;
24
25use camel_api::CamelError;
26
27use crate::context::RuntimeExecutionHandle;
28use crate::hot_reload::application::{
29    compute_reload_actions_from_runtime_snapshot, execute_reload_actions,
30};
31use crate::lifecycle::application::route_definition::RouteDefinition;
32
33pub struct ReloadWatcher;
34
35impl ReloadWatcher {
36    pub async fn watch_and_reload<F>(
37        watch_dirs: Vec<PathBuf>,
38        controller: RuntimeExecutionHandle,
39        discover_fn: F,
40        shutdown: Option<CancellationToken>,
41        drain_timeout: Duration,
42        debounce: Duration,
43    ) -> Result<(), CamelError>
44    where
45        F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
46    {
47        watch_and_reload(
48            watch_dirs,
49            controller,
50            discover_fn,
51            shutdown,
52            drain_timeout,
53            debounce,
54        )
55        .await
56    }
57
58    pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
59        resolve_watch_dirs(patterns)
60    }
61}
62
63/// Watch directories and reload pipelines when YAML files change.
64///
65/// * `watch_dirs` — directories to monitor (non-recursive watch on each).
66/// * `controller` — live route controller to apply reload actions to.
67/// * `discover_fn` — called after each debounced file-change event to load
68///   the current set of route definitions from disk.
69/// * `shutdown` — optional token; cancel it to stop the watcher gracefully.
70///
71/// The function runs indefinitely until the channel is closed, the shutdown
72/// token is cancelled, or a fatal initialisation error occurs.
73///
74/// # Errors
75///
76/// Returns `Err` only on fatal initialisation failure (watcher cannot be
77/// created, or a directory cannot be watched). Per-route reload errors are
78/// logged as warnings and do not terminate the loop.
79pub async fn watch_and_reload<F>(
80    watch_dirs: Vec<PathBuf>,
81    controller: RuntimeExecutionHandle,
82    discover_fn: F,
83    shutdown: Option<CancellationToken>,
84    drain_timeout: Duration,
85    debounce: Duration,
86) -> Result<(), CamelError>
87where
88    F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
89{
90    let (tx, mut rx) = mpsc::channel::<notify::Result<Event>>(64);
91
92    let mut watcher = RecommendedWatcher::new(
93        move |res| {
94            let _ = tx.blocking_send(res);
95        },
96        notify::Config::default(),
97    )
98    .map_err(|e| CamelError::RouteError(format!("Failed to create file watcher: {e}")))?;
99
100    if watch_dirs.is_empty() {
101        tracing::warn!("hot-reload: no directories to watch");
102    }
103
104    for dir in &watch_dirs {
105        watcher.watch(dir, RecursiveMode::Recursive).map_err(|e| {
106            CamelError::RouteError(format!("Failed to watch directory {dir:?}: {e}"))
107        })?;
108        tracing::info!("hot-reload: watching {:?}", dir);
109    }
110
111    // Debounce duration to coalesce rapid successive file events from editors
112    // (e.g., when saving a file, multiple events may fire in quick succession).
113    // debounce duration is passed by the caller (configured via CamelConfig.watch_debounce_ms)
114
115    // Helper: check if shutdown was requested
116    let is_cancelled = || shutdown.as_ref().map(|t| t.is_cancelled()).unwrap_or(false);
117
118    loop {
119        if is_cancelled() {
120            tracing::info!("hot-reload: shutdown requested — stopping watcher");
121            return Ok(());
122        }
123
124        // Wait for the first relevant event.
125        let triggered = loop {
126            // Select between an incoming event and a cancellation signal.
127            let recv_fut = rx.recv();
128            if let Some(token) = shutdown.as_ref() {
129                tokio::select! {
130                    biased;
131                    _ = token.cancelled() => {
132                        tracing::info!("hot-reload: shutdown requested — stopping watcher");
133                        return Ok(());
134                    }
135                    msg = recv_fut => {
136                        match msg {
137                            None => return Ok(()), // channel closed
138                            Some(Err(e)) => {
139                                tracing::warn!("hot-reload: watcher error: {e}");
140                                continue;
141                            }
142                            Some(Ok(event)) => {
143                                if is_reload_event(&event) {
144                                    break true;
145                                }
146                            }
147                        }
148                    }
149                }
150            } else {
151                match recv_fut.await {
152                    None => return Ok(()), // channel closed — watcher dropped
153                    Some(Err(e)) => {
154                        tracing::warn!("hot-reload: watcher error: {e}");
155                        continue;
156                    }
157                    Some(Ok(event)) => {
158                        if is_reload_event(&event) {
159                            break true;
160                        }
161                    }
162                }
163            }
164        };
165
166        if !triggered {
167            continue;
168        }
169
170        // Drain further events within the debounce window.
171        let deadline = tokio::time::Instant::now() + debounce;
172        loop {
173            match tokio::time::timeout_at(deadline, rx.recv()).await {
174                Ok(Some(_)) => continue,   // consume
175                Ok(None) => return Ok(()), // channel closed
176                Err(_elapsed) => break,    // debounce window expired
177            }
178        }
179        tracing::info!("hot-reload: file change detected — reloading routes");
180
181        // Discover new definitions from disk.
182        let new_defs = match discover_fn() {
183            Ok(defs) => defs,
184            Err(e) => {
185                tracing::warn!("hot-reload: route discovery failed: {e}");
186                continue;
187            }
188        };
189
190        // Compute the diff from runtime projection route IDs only.
191        let runtime_route_ids = match controller.runtime_route_ids().await {
192            Ok(route_ids) => route_ids,
193            Err(err) => {
194                tracing::warn!(
195                    error = %err,
196                    "hot-reload: failed to list runtime routes; skipping this reload cycle"
197                );
198                continue;
199            }
200        };
201        let mut runtime_hashes: std::collections::HashMap<String, u64> =
202            std::collections::HashMap::new();
203        for id in &runtime_route_ids {
204            if let Some(hash) = controller.route_source_hash(id).await {
205                runtime_hashes.insert(id.clone(), hash);
206            }
207        }
208
209        let actions = compute_reload_actions_from_runtime_snapshot(
210            &new_defs,
211            &runtime_route_ids,
212            &|route_id: &str| runtime_hashes.get(route_id).copied(),
213        );
214
215        if actions.is_empty() {
216            tracing::debug!("hot-reload: no route changes detected");
217            continue;
218        }
219
220        tracing::info!("hot-reload: applying {} reload action(s)", actions.len());
221
222        let errors = execute_reload_actions(actions, new_defs, &controller, drain_timeout).await;
223        for err in &errors {
224            tracing::warn!(
225                "hot-reload: error on route '{}' ({}): {}",
226                err.route_id,
227                err.action,
228                err.error
229            );
230        }
231    }
232}
233
234/// Returns `true` if this notify event should trigger a reload.
235///
236/// Only events affecting `.yaml` or `.yml` files are considered, to avoid
237/// triggering reloads on editor swap files (`.swp`, `~`, `.tmp`, etc.).
238fn is_reload_event(event: &Event) -> bool {
239    let has_yaml = event.paths.iter().any(|p| {
240        p.extension()
241            .map(|e| e == "yaml" || e == "yml")
242            .unwrap_or(false)
243    });
244    has_yaml
245        && matches!(
246            event.kind,
247            EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
248        )
249}
250
/// Extract the unique, existing directories to watch from glob patterns.
///
/// For each pattern:
///
/// * a pattern that already names an existing directory is used as-is;
/// * otherwise the path is walked toward the root until the remaining text
///   contains none of the glob metacharacters `*`, `?` or `[`
///   (e.g. `"routes/*.yaml"` → `routes`, `"/abs/**/*.yaml"` → `/abs`);
/// * if what remains is a regular file (a literal, glob-free file path), its
///   containing directory is used instead, so the result is a directory;
/// * an empty remainder (a bare relative pattern such as `"*.yaml"`) maps to
///   the current directory `"."`;
/// * paths that do not exist are dropped.
///
/// The returned list has no duplicates and is sorted, so the result is
/// deterministic for a given set of patterns.
pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
    let has_glob_chars = |p: &Path| p.to_string_lossy().contains(['*', '?', '[']);

    let mut dirs: HashSet<PathBuf> = HashSet::new();

    for pattern in patterns {
        let path = Path::new(pattern);

        let mut candidate = path;
        if !path.is_dir() {
            // Walk toward root until no glob characters remain.
            while has_glob_chars(candidate) {
                match candidate.parent() {
                    Some(parent) => candidate = parent,
                    None => break,
                }
            }
            // A literal file path must resolve to its directory, not the file.
            if candidate.is_file() {
                candidate = candidate.parent().unwrap_or(candidate);
            }
        }

        if candidate.as_os_str().is_empty() {
            // Relative pattern with no parent — watch current dir
            dirs.insert(PathBuf::from("."));
        } else if candidate.exists() {
            dirs.insert(candidate.to_path_buf());
        }
    }

    let mut resolved: Vec<PathBuf> = dirs.into_iter().collect();
    resolved.sort();
    resolved
}