Skip to main content

camel_core/hot_reload/adapters/
reload_watcher.rs

//! File-watching hot-reload loop.
//!
//! Watches YAML route files for changes and triggers pipeline reloads.
//!
//! # Circular dependency avoidance
//!
//! `camel-core` cannot depend on `camel-dsl` (which already depends on
//! `camel-core`). Route discovery is therefore injected via a closure:
//!
//! ```ignore
//! watch_and_reload(
//!     dirs,
//!     controller,
//!     || discover_routes(&patterns),
//!     shutdown,
//!     drain_timeout,
//!     debounce,
//! )
//! .await?;
//! ```
//!
//! This keeps `camel-core` decoupled while letting `camel-cli` (which has
//! access to both) wire the pieces together.
16
17use std::collections::HashSet;
18use std::path::{Path, PathBuf};
19use std::time::Duration;
20
21use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
22use tokio::sync::mpsc;
23use tokio_util::sync::CancellationToken;
24
25use camel_api::CamelError;
26
27use crate::context::RuntimeExecutionHandle;
28use crate::hot_reload::application::{
29    compute_reload_actions_from_runtime_snapshot, execute_reload_actions,
30};
31use crate::lifecycle::application::route_definition::RouteDefinition;
32
33pub struct ReloadWatcher;
34
35impl ReloadWatcher {
36    pub async fn watch_and_reload<F>(
37        watch_dirs: Vec<PathBuf>,
38        controller: RuntimeExecutionHandle,
39        discover_fn: F,
40        shutdown: Option<CancellationToken>,
41        drain_timeout: Duration,
42        debounce: Duration,
43    ) -> Result<(), CamelError>
44    where
45        F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
46    {
47        watch_and_reload(
48            watch_dirs,
49            controller,
50            discover_fn,
51            shutdown,
52            drain_timeout,
53            debounce,
54        )
55        .await
56    }
57
58    pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
59        resolve_watch_dirs(patterns)
60    }
61}
62
63/// Watch directories and reload pipelines when YAML files change.
64///
65/// * `watch_dirs` — directories to monitor (non-recursive watch on each).
66/// * `controller` — live route controller to apply reload actions to.
67/// * `discover_fn` — called after each debounced file-change event to load
68///   the current set of route definitions from disk.
69/// * `shutdown` — optional token; cancel it to stop the watcher gracefully.
70///
71/// The function runs indefinitely until the channel is closed, the shutdown
72/// token is cancelled, or a fatal initialisation error occurs.
73///
74/// # Errors
75///
76/// Returns `Err` only on fatal initialisation failure (watcher cannot be
77/// created, or a directory cannot be watched). Per-route reload errors are
78/// logged as warnings and do not terminate the loop.
79pub async fn watch_and_reload<F>(
80    watch_dirs: Vec<PathBuf>,
81    controller: RuntimeExecutionHandle,
82    discover_fn: F,
83    shutdown: Option<CancellationToken>,
84    drain_timeout: Duration,
85    debounce: Duration,
86) -> Result<(), CamelError>
87where
88    F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
89{
90    let (tx, mut rx) = mpsc::channel::<notify::Result<Event>>(64);
91
92    let mut watcher = RecommendedWatcher::new(
93        move |res| {
94            let _ = tx.blocking_send(res);
95        },
96        notify::Config::default(),
97    )
98    .map_err(|e| CamelError::RouteError(format!("Failed to create file watcher: {e}")))?;
99
100    if watch_dirs.is_empty() {
101        tracing::warn!("hot-reload: no directories to watch");
102    }
103
104    for dir in &watch_dirs {
105        watcher
106            .watch(dir, RecursiveMode::NonRecursive)
107            .map_err(|e| {
108                CamelError::RouteError(format!("Failed to watch directory {dir:?}: {e}"))
109            })?;
110        tracing::info!("hot-reload: watching {:?}", dir);
111    }
112
113    // Debounce duration to coalesce rapid successive file events from editors
114    // (e.g., when saving a file, multiple events may fire in quick succession).
115    // debounce duration is passed by the caller (configured via CamelConfig.watch_debounce_ms)
116
117    // Helper: check if shutdown was requested
118    let is_cancelled = || shutdown.as_ref().map(|t| t.is_cancelled()).unwrap_or(false);
119
120    loop {
121        if is_cancelled() {
122            tracing::info!("hot-reload: shutdown requested — stopping watcher");
123            return Ok(());
124        }
125
126        // Wait for the first relevant event.
127        let triggered = loop {
128            // Select between an incoming event and a cancellation signal.
129            let recv_fut = rx.recv();
130            if let Some(token) = shutdown.as_ref() {
131                tokio::select! {
132                    biased;
133                    _ = token.cancelled() => {
134                        tracing::info!("hot-reload: shutdown requested — stopping watcher");
135                        return Ok(());
136                    }
137                    msg = recv_fut => {
138                        match msg {
139                            None => return Ok(()), // channel closed
140                            Some(Err(e)) => {
141                                tracing::warn!("hot-reload: watcher error: {e}");
142                                continue;
143                            }
144                            Some(Ok(event)) => {
145                                if is_reload_event(&event) {
146                                    break true;
147                                }
148                            }
149                        }
150                    }
151                }
152            } else {
153                match recv_fut.await {
154                    None => return Ok(()), // channel closed — watcher dropped
155                    Some(Err(e)) => {
156                        tracing::warn!("hot-reload: watcher error: {e}");
157                        continue;
158                    }
159                    Some(Ok(event)) => {
160                        if is_reload_event(&event) {
161                            break true;
162                        }
163                    }
164                }
165            }
166        };
167
168        if !triggered {
169            continue;
170        }
171
172        // Drain further events within the debounce window.
173        let deadline = tokio::time::Instant::now() + debounce;
174        loop {
175            match tokio::time::timeout_at(deadline, rx.recv()).await {
176                Ok(Some(_)) => continue,   // consume
177                Ok(None) => return Ok(()), // channel closed
178                Err(_elapsed) => break,    // debounce window expired
179            }
180        }
181        tracing::info!("hot-reload: file change detected — reloading routes");
182
183        // Discover new definitions from disk.
184        let new_defs = match discover_fn() {
185            Ok(defs) => defs,
186            Err(e) => {
187                tracing::warn!("hot-reload: route discovery failed: {e}");
188                continue;
189            }
190        };
191
192        // Compute the diff from runtime projection route IDs only.
193        let runtime_route_ids = match controller.runtime_route_ids().await {
194            Ok(route_ids) => route_ids,
195            Err(err) => {
196                tracing::warn!(
197                    error = %err,
198                    "hot-reload: failed to list runtime routes; skipping this reload cycle"
199                );
200                continue;
201            }
202        };
203        let actions = compute_reload_actions_from_runtime_snapshot(&new_defs, &runtime_route_ids);
204
205        if actions.is_empty() {
206            tracing::debug!("hot-reload: no route changes detected");
207            continue;
208        }
209
210        tracing::info!("hot-reload: applying {} reload action(s)", actions.len());
211
212        let errors = execute_reload_actions(actions, new_defs, &controller, drain_timeout).await;
213        for err in &errors {
214            tracing::warn!(
215                "hot-reload: error on route '{}' ({}): {}",
216                err.route_id,
217                err.action,
218                err.error
219            );
220        }
221    }
222}
223
224/// Returns `true` if this notify event should trigger a reload.
225///
226/// Only events affecting `.yaml` or `.yml` files are considered, to avoid
227/// triggering reloads on editor swap files (`.swp`, `~`, `.tmp`, etc.).
228fn is_reload_event(event: &Event) -> bool {
229    let has_yaml = event.paths.iter().any(|p| {
230        p.extension()
231            .map(|e| e == "yaml" || e == "yml")
232            .unwrap_or(false)
233    });
234    has_yaml
235        && matches!(
236            event.kind,
237            EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
238        )
239}
240
/// Extract unique existing parent directories from glob patterns.
///
/// For each pattern:
///
/// * a pattern that is itself an existing directory is used as-is;
/// * otherwise the path is walked toward the root until no glob
///   metacharacter (`*`, `?`, `[`) remains, e.g. `"routes/*.yaml"` and
///   `"/abs/**/*.yaml"` yield `routes` and `/abs`;
/// * a literal pattern that names an existing file resolves to the file's
///   parent directory, so the result only ever contains directories;
/// * an empty remainder (e.g. from `"*.yaml"`) means the current directory
///   and is reported as `"."`;
/// * candidates that do not exist on disk are skipped.
///
/// The returned list is de-duplicated and sorted, so the output is
/// deterministic for a given filesystem state.
pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
    let mut dirs: HashSet<PathBuf> = HashSet::new();

    for pattern in patterns {
        let path = Path::new(pattern.as_str());
        let mut candidate = path;

        if !path.is_dir() {
            // Walk toward root until no glob characters remain.
            loop {
                let text = candidate.to_string_lossy();
                let has_glob = text.contains('*') || text.contains('?') || text.contains('[');
                if !has_glob {
                    break;
                }
                match candidate.parent() {
                    Some(parent) => candidate = parent,
                    None => break,
                }
            }

            // A literal file path: watch the directory that contains it
            // rather than the file itself.
            if candidate.is_file() {
                if let Some(parent) = candidate.parent() {
                    candidate = parent;
                }
            }
        }

        if candidate.as_os_str().is_empty() {
            // Relative pattern with no parent — watch current dir
            dirs.insert(PathBuf::from("."));
        } else if candidate.exists() {
            dirs.insert(candidate.to_path_buf());
        }
    }

    // HashSet iteration order is arbitrary; sort for a stable result.
    let mut resolved: Vec<PathBuf> = dirs.into_iter().collect();
    resolved.sort();
    resolved
}