Skip to main content

camel_core/hot_reload/adapters/
reload_watcher.rs

1//! File-watching hot-reload loop.
2//!
3//! Watches YAML route files for changes and triggers pipeline reloads.
4//!
5//! # Circular dependency avoidance
6//!
7//! `camel-core` cannot depend on `camel-dsl` (which already depends on
8//! `camel-core`). Route discovery is therefore injected via a closure:
9//!
10//! ```ignore
//! watch_and_reload(dirs, controller, || discover_routes(&patterns), None).await?;
12//! ```
13//!
14//! This keeps `camel-core` decoupled while letting `camel-cli` (which has
15//! access to both) wire the pieces together.
16
17use std::collections::HashSet;
18use std::path::{Path, PathBuf};
19use std::time::Duration;
20
21use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
22use tokio::sync::mpsc;
23use tokio_util::sync::CancellationToken;
24
25use camel_api::CamelError;
26
27use crate::context::RuntimeExecutionHandle;
28use crate::hot_reload::application::{
29    compute_reload_actions_from_runtime_snapshot, execute_reload_actions,
30};
31use crate::lifecycle::application::route_definition::RouteDefinition;
32
33pub struct ReloadWatcher;
34
35impl ReloadWatcher {
36    pub async fn watch_and_reload<F>(
37        watch_dirs: Vec<PathBuf>,
38        controller: RuntimeExecutionHandle,
39        discover_fn: F,
40        shutdown: Option<CancellationToken>,
41    ) -> Result<(), CamelError>
42    where
43        F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
44    {
45        watch_and_reload(watch_dirs, controller, discover_fn, shutdown).await
46    }
47
48    pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
49        resolve_watch_dirs(patterns)
50    }
51}
52
53/// Watch directories and reload pipelines when YAML files change.
54///
55/// * `watch_dirs` — directories to monitor (non-recursive watch on each).
56/// * `controller` — live route controller to apply reload actions to.
57/// * `discover_fn` — called after each debounced file-change event to load
58///   the current set of route definitions from disk.
59/// * `shutdown` — optional token; cancel it to stop the watcher gracefully.
60///
61/// The function runs indefinitely until the channel is closed, the shutdown
62/// token is cancelled, or a fatal initialisation error occurs.
63///
64/// # Errors
65///
66/// Returns `Err` only on fatal initialisation failure (watcher cannot be
67/// created, or a directory cannot be watched). Per-route reload errors are
68/// logged as warnings and do not terminate the loop.
69pub async fn watch_and_reload<F>(
70    watch_dirs: Vec<PathBuf>,
71    controller: RuntimeExecutionHandle,
72    discover_fn: F,
73    shutdown: Option<CancellationToken>,
74) -> Result<(), CamelError>
75where
76    F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
77{
78    let (tx, mut rx) = mpsc::channel::<notify::Result<Event>>(64);
79
80    let mut watcher = RecommendedWatcher::new(
81        move |res| {
82            let _ = tx.blocking_send(res);
83        },
84        notify::Config::default(),
85    )
86    .map_err(|e| CamelError::RouteError(format!("Failed to create file watcher: {e}")))?;
87
88    if watch_dirs.is_empty() {
89        tracing::warn!("hot-reload: no directories to watch");
90    }
91
92    for dir in &watch_dirs {
93        watcher
94            .watch(dir, RecursiveMode::NonRecursive)
95            .map_err(|e| {
96                CamelError::RouteError(format!("Failed to watch directory {dir:?}: {e}"))
97            })?;
98        tracing::info!("hot-reload: watching {:?}", dir);
99    }
100
101    // Debounce duration to coalesce rapid successive file events from editors
102    // (e.g., when saving a file, multiple events may fire in quick succession).
103    // TODO: make this configurable via Camel.toml
104    let debounce = Duration::from_millis(300);
105
106    // Helper: check if shutdown was requested
107    let is_cancelled = || shutdown.as_ref().map(|t| t.is_cancelled()).unwrap_or(false);
108
109    loop {
110        if is_cancelled() {
111            tracing::info!("hot-reload: shutdown requested — stopping watcher");
112            return Ok(());
113        }
114
115        // Wait for the first relevant event.
116        let triggered = loop {
117            // Select between an incoming event and a cancellation signal.
118            let recv_fut = rx.recv();
119            if let Some(token) = shutdown.as_ref() {
120                tokio::select! {
121                    biased;
122                    _ = token.cancelled() => {
123                        tracing::info!("hot-reload: shutdown requested — stopping watcher");
124                        return Ok(());
125                    }
126                    msg = recv_fut => {
127                        match msg {
128                            None => return Ok(()), // channel closed
129                            Some(Err(e)) => {
130                                tracing::warn!("hot-reload: watcher error: {e}");
131                                continue;
132                            }
133                            Some(Ok(event)) => {
134                                if is_reload_event(&event) {
135                                    break true;
136                                }
137                            }
138                        }
139                    }
140                }
141            } else {
142                match recv_fut.await {
143                    None => return Ok(()), // channel closed — watcher dropped
144                    Some(Err(e)) => {
145                        tracing::warn!("hot-reload: watcher error: {e}");
146                        continue;
147                    }
148                    Some(Ok(event)) => {
149                        if is_reload_event(&event) {
150                            break true;
151                        }
152                    }
153                }
154            }
155        };
156
157        if !triggered {
158            continue;
159        }
160
161        // Drain further events within the debounce window.
162        let deadline = tokio::time::Instant::now() + debounce;
163        loop {
164            match tokio::time::timeout_at(deadline, rx.recv()).await {
165                Ok(Some(_)) => continue,   // consume
166                Ok(None) => return Ok(()), // channel closed
167                Err(_elapsed) => break,    // debounce window expired
168            }
169        }
170        tracing::info!("hot-reload: file change detected — reloading routes");
171
172        // Discover new definitions from disk.
173        let new_defs = match discover_fn() {
174            Ok(defs) => defs,
175            Err(e) => {
176                tracing::warn!("hot-reload: route discovery failed: {e}");
177                continue;
178            }
179        };
180
181        // Compute the diff from runtime projection route IDs only.
182        let runtime_route_ids = match controller.runtime_route_ids().await {
183            Ok(route_ids) => route_ids,
184            Err(err) => {
185                tracing::warn!(
186                    error = %err,
187                    "hot-reload: failed to list runtime routes; skipping this reload cycle"
188                );
189                continue;
190            }
191        };
192        let actions = compute_reload_actions_from_runtime_snapshot(&new_defs, &runtime_route_ids);
193
194        if actions.is_empty() {
195            tracing::debug!("hot-reload: no route changes detected");
196            continue;
197        }
198
199        tracing::info!("hot-reload: applying {} reload action(s)", actions.len());
200
201        let errors = execute_reload_actions(actions, new_defs, &controller).await;
202        for err in &errors {
203            tracing::warn!(
204                "hot-reload: error on route '{}' ({}): {}",
205                err.route_id,
206                err.action,
207                err.error
208            );
209        }
210    }
211}
212
213/// Returns `true` if this notify event should trigger a reload.
214///
215/// Only events affecting `.yaml` or `.yml` files are considered, to avoid
216/// triggering reloads on editor swap files (`.swp`, `~`, `.tmp`, etc.).
217fn is_reload_event(event: &Event) -> bool {
218    let has_yaml = event.paths.iter().any(|p| {
219        p.extension()
220            .map(|e| e == "yaml" || e == "yml")
221            .unwrap_or(false)
222    });
223    has_yaml
224        && matches!(
225            event.kind,
226            EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
227        )
228}
229
/// Extract unique existing parent directories from glob patterns.
///
/// Given patterns like `"routes/*.yaml"` or `"/abs/**/*.yaml"`, walks up
/// from each path until the first ancestor without glob characters
/// (`*`, `?`, `[`) and collects the resulting existing directories.
///
/// Resolution rules, per pattern:
///
/// * a pattern that is itself an existing directory is used as-is;
/// * a pattern (or glob-free ancestor) that is an existing *file* resolves to
///   that file's parent directory, so the result only ever names directories;
/// * an empty result path (e.g. for `"*.yaml"`) resolves to `"."`;
/// * anything that does not exist on disk is skipped.
///
/// The returned list contains no duplicates and is sorted, so the output is
/// deterministic for a given file-system state.
pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
    let has_glob_chars =
        |p: &Path| p.to_string_lossy().chars().any(|c| matches!(c, '*' | '?' | '['));

    let mut dirs: HashSet<PathBuf> = HashSet::new();

    for pattern in patterns {
        let path = Path::new(pattern.as_str());

        let dir = if path.is_dir() {
            path
        } else {
            // Walk toward root until no glob characters remain.
            let mut candidate = path;
            while has_glob_chars(candidate) {
                match candidate.parent() {
                    Some(parent) => candidate = parent,
                    None => break,
                }
            }

            // A literal file path: watch the directory that contains it
            // rather than returning the file itself.
            if candidate.is_file() {
                if let Some(parent) = candidate.parent() {
                    candidate = parent;
                }
            }
            candidate
        };

        if dir.as_os_str().is_empty() {
            // Relative pattern with no parent — watch current dir
            dirs.insert(PathBuf::from("."));
        } else if dir.exists() {
            dirs.insert(dir.to_path_buf());
        }
    }

    // Sort so the result does not depend on hash iteration order.
    let mut resolved: Vec<PathBuf> = dirs.into_iter().collect();
    resolved.sort();
    resolved
}