camel_core/hot_reload/adapters/
reload_watcher.rs1use std::collections::HashSet;
18use std::path::{Path, PathBuf};
19use std::time::Duration;
20
21use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
22use tokio::sync::mpsc;
23use tokio_util::sync::CancellationToken;
24
25use camel_api::CamelError;
26
27use crate::context::RuntimeExecutionHandle;
28use crate::hot_reload::application::reload::FunctionReloadContext;
29use crate::hot_reload::application::{
30 compute_reload_actions_from_runtime_snapshot, execute_reload_actions,
31};
32use crate::lifecycle::application::route_definition::RouteDefinition;
33
34pub struct ReloadWatcher;
35
36impl ReloadWatcher {
37 pub async fn watch_and_reload<F>(
38 watch_dirs: Vec<PathBuf>,
39 controller: RuntimeExecutionHandle,
40 discover_fn: F,
41 shutdown: Option<CancellationToken>,
42 drain_timeout: Duration,
43 debounce: Duration,
44 ) -> Result<(), CamelError>
45 where
46 F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
47 {
48 watch_and_reload(
49 watch_dirs,
50 controller,
51 discover_fn,
52 shutdown,
53 drain_timeout,
54 debounce,
55 )
56 .await
57 }
58
59 pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
60 resolve_watch_dirs(patterns)
61 }
62}
63
64pub async fn watch_and_reload<F>(
81 watch_dirs: Vec<PathBuf>,
82 controller: RuntimeExecutionHandle,
83 discover_fn: F,
84 shutdown: Option<CancellationToken>,
85 drain_timeout: Duration,
86 debounce: Duration,
87) -> Result<(), CamelError>
88where
89 F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
90{
91 let (tx, mut rx) = mpsc::channel::<notify::Result<Event>>(64);
92
93 let mut watcher = RecommendedWatcher::new(
94 move |res| {
95 let _ = tx.blocking_send(res);
96 },
97 notify::Config::default(),
98 )
99 .map_err(|e| CamelError::RouteError(format!("Failed to create file watcher: {e}")))?;
100
101 if watch_dirs.is_empty() {
102 tracing::warn!("hot-reload: no directories to watch");
103 }
104
105 for dir in &watch_dirs {
106 watcher.watch(dir, RecursiveMode::Recursive).map_err(|e| {
107 CamelError::RouteError(format!("Failed to watch directory {dir:?}: {e}"))
108 })?;
109 tracing::info!("hot-reload: watching {:?}", dir);
110 }
111
112 let is_cancelled = || shutdown.as_ref().map(|t| t.is_cancelled()).unwrap_or(false);
118
119 loop {
120 if is_cancelled() {
121 tracing::info!("hot-reload: shutdown requested — stopping watcher");
122 return Ok(());
123 }
124
125 let triggered = loop {
127 let recv_fut = rx.recv();
129 if let Some(token) = shutdown.as_ref() {
130 tokio::select! {
131 biased;
132 _ = token.cancelled() => {
133 tracing::info!("hot-reload: shutdown requested — stopping watcher");
134 return Ok(());
135 }
136 msg = recv_fut => {
137 match msg {
138 None => return Ok(()), Some(Err(e)) => {
140 tracing::warn!("hot-reload: watcher error: {e}");
141 continue;
142 }
143 Some(Ok(event)) => {
144 if is_reload_event(&event) {
145 break true;
146 }
147 }
148 }
149 }
150 }
151 } else {
152 match recv_fut.await {
153 None => return Ok(()), Some(Err(e)) => {
155 tracing::warn!("hot-reload: watcher error: {e}");
156 continue;
157 }
158 Some(Ok(event)) => {
159 if is_reload_event(&event) {
160 break true;
161 }
162 }
163 }
164 }
165 };
166
167 if !triggered {
168 continue;
169 }
170
171 let deadline = tokio::time::Instant::now() + debounce;
173 loop {
174 match tokio::time::timeout_at(deadline, rx.recv()).await {
175 Ok(Some(_)) => continue, Ok(None) => return Ok(()), Err(_elapsed) => break, }
179 }
180 tracing::info!("hot-reload: file change detected — reloading routes");
181
182 let new_defs = match discover_fn() {
184 Ok(defs) => defs,
185 Err(e) => {
186 tracing::warn!("hot-reload: route discovery failed: {e}");
187 continue;
188 }
189 };
190
191 let runtime_route_ids = match controller.runtime_route_ids().await {
193 Ok(route_ids) => route_ids,
194 Err(err) => {
195 tracing::warn!(
196 error = %err,
197 "hot-reload: failed to list runtime routes; skipping this reload cycle"
198 );
199 continue;
200 }
201 };
202 let mut runtime_hashes: std::collections::HashMap<String, u64> =
203 std::collections::HashMap::new();
204 for id in &runtime_route_ids {
205 if let Some(hash) = controller.route_source_hash(id).await {
206 runtime_hashes.insert(id.clone(), hash);
207 }
208 }
209
210 let actions = compute_reload_actions_from_runtime_snapshot(
211 &new_defs,
212 &runtime_route_ids,
213 &|route_id: &str| runtime_hashes.get(route_id).copied(),
214 );
215
216 if actions.is_empty() {
217 tracing::debug!("hot-reload: no route changes detected");
218 continue;
219 }
220
221 tracing::info!("hot-reload: applying {} reload action(s)", actions.len());
222
223 let function_ctx = controller.function_invoker().map(|invoker| {
224 let generation = invoker.begin_reload();
225 FunctionReloadContext {
226 invoker,
227 generation,
228 }
229 });
230 let errors = execute_reload_actions(
231 actions,
232 new_defs,
233 &controller,
234 drain_timeout,
235 function_ctx.as_ref(),
236 )
237 .await;
238 for err in &errors {
239 tracing::warn!(
240 "hot-reload: error on route '{}' ({}): {}",
241 err.route_id,
242 err.action,
243 err.error
244 );
245 }
246 }
247}
248
249fn is_reload_event(event: &Event) -> bool {
254 let has_yaml = event.paths.iter().any(|p| {
255 p.extension()
256 .map(|e| e == "yaml" || e == "yml" || e == "json")
257 .unwrap_or(false)
258 });
259 has_yaml
260 && matches!(
261 event.kind,
262 EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
263 )
264}
265
/// Resolves the set of directories that must be watched to cover `patterns`.
///
/// Each pattern is handled as follows:
/// * a path that is an existing directory is used as-is;
/// * otherwise, trailing components are stripped until the remaining path
///   contains none of the glob metacharacters `*`, `?` or `[`;
/// * if what remains is an existing regular file (a literal file pattern), its
///   parent directory is used instead, so the result only ever contains
///   directories — a per-file watch is fragile when editors replace the file
///   on save;
/// * an empty remainder (e.g. for `*.yaml`) maps to `.`;
/// * paths that do not exist are dropped.
///
/// The result contains no duplicates and is sorted, so the output is
/// deterministic for a given file-system state.
pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
    /// True when `path` contains any glob metacharacter.
    fn has_glob_meta(path: &Path) -> bool {
        let text = path.to_string_lossy();
        text.contains('*') || text.contains('?') || text.contains('[')
    }

    let mut dirs: HashSet<PathBuf> = HashSet::new();

    for pattern in patterns {
        let path = Path::new(pattern.as_str());

        let mut candidate = path;
        if !path.is_dir() {
            // Walk up until no glob metacharacter is left in the path.
            while has_glob_meta(candidate) {
                match candidate.parent() {
                    Some(parent) => candidate = parent,
                    None => break,
                }
            }

            // A literal file: watch the directory that contains it.
            if candidate.is_file() {
                if let Some(parent) = candidate.parent() {
                    candidate = parent;
                }
            }
        }

        if candidate.as_os_str().is_empty() {
            // Relative pattern with no directory part: the current directory.
            dirs.insert(PathBuf::from("."));
        } else if candidate.exists() {
            dirs.insert(candidate.to_path_buf());
        }
    }

    let mut resolved: Vec<PathBuf> = dirs.into_iter().collect();
    resolved.sort();
    resolved
}
306
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use notify::event::{CreateKind, DataChange, ModifyKind};
    use notify::{Event, EventKind};

    use super::is_reload_event;

    /// Builds an event of the given `kind` touching every path in `paths`.
    fn make_event(paths: &[&str], kind: EventKind) -> Event {
        let paths: Vec<PathBuf> = paths.iter().map(PathBuf::from).collect();
        Event {
            kind,
            paths,
            attrs: Default::default(),
        }
    }

    /// Shorthand for a "file created" event on a single path.
    fn file_created(path: &str) -> Event {
        make_event(&[path], EventKind::Create(CreateKind::File))
    }

    #[test]
    fn is_reload_event_accepts_yaml() {
        assert!(is_reload_event(&file_created("routes/test.yaml")));
    }

    #[test]
    fn is_reload_event_accepts_yml() {
        let content_modified = EventKind::Modify(ModifyKind::Data(DataChange::Content));
        let ev = make_event(&["routes/test.yml"], content_modified);
        assert!(is_reload_event(&ev));
    }

    #[test]
    fn is_reload_event_accepts_json() {
        assert!(is_reload_event(&file_created("routes/test.json")));
    }

    #[test]
    fn is_reload_event_rejects_other_extensions() {
        assert!(!is_reload_event(&file_created("routes/test.toml")));
    }

    #[test]
    fn is_reload_event_rejects_swap_files() {
        // The extension here is `swp`, not `yaml`.
        assert!(!is_reload_event(&file_created("routes/.test.yaml.swp")));
    }
}