camel_core/hot_reload/adapters/
reload_watcher.rs1use std::collections::HashSet;
18use std::path::{Path, PathBuf};
19use std::time::Duration;
20
21use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
22use tokio::sync::mpsc;
23use tokio_util::sync::CancellationToken;
24
25use camel_api::CamelError;
26
27use crate::context::RuntimeExecutionHandle;
28use crate::hot_reload::application::reload::FunctionReloadContext;
29use crate::hot_reload::application::{
30 compute_reload_actions_from_runtime_snapshot, execute_reload_actions,
31};
32use crate::lifecycle::application::route_definition::RouteDefinition;
33
34pub struct ReloadWatcher;
35
36impl ReloadWatcher {
37 pub async fn watch_and_reload<F>(
38 watch_dirs: Vec<PathBuf>,
39 controller: RuntimeExecutionHandle,
40 discover_fn: F,
41 shutdown: Option<CancellationToken>,
42 drain_timeout: Duration,
43 debounce: Duration,
44 ) -> Result<(), CamelError>
45 where
46 F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
47 {
48 watch_and_reload(
49 watch_dirs,
50 controller,
51 discover_fn,
52 shutdown,
53 drain_timeout,
54 debounce,
55 )
56 .await
57 }
58
59 pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
60 resolve_watch_dirs(patterns)
61 }
62}
63
64pub async fn watch_and_reload<F>(
81 watch_dirs: Vec<PathBuf>,
82 controller: RuntimeExecutionHandle,
83 discover_fn: F,
84 shutdown: Option<CancellationToken>,
85 drain_timeout: Duration,
86 debounce: Duration,
87) -> Result<(), CamelError>
88where
89 F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
90{
91 let (tx, mut rx) = mpsc::channel::<notify::Result<Event>>(64);
92
93 let mut watcher = RecommendedWatcher::new(
94 move |res| {
95 let _ = tx.blocking_send(res);
96 },
97 notify::Config::default(),
98 )
99 .map_err(|e| CamelError::RouteError(format!("Failed to create file watcher: {e}")))?;
100
101 if watch_dirs.is_empty() {
102 tracing::warn!("hot-reload: no directories to watch");
103 }
104
105 for dir in &watch_dirs {
106 watcher.watch(dir, RecursiveMode::Recursive).map_err(|e| {
107 CamelError::RouteError(format!("Failed to watch directory {dir:?}: {e}"))
108 })?;
109 tracing::info!("hot-reload: watching {:?}", dir);
110 }
111
112 let is_cancelled = || shutdown.as_ref().map(|t| t.is_cancelled()).unwrap_or(false);
118
119 loop {
120 if is_cancelled() {
121 tracing::info!("hot-reload: shutdown requested — stopping watcher");
122 return Ok(());
123 }
124
125 let triggered = loop {
127 let recv_fut = rx.recv();
129 if let Some(token) = shutdown.as_ref() {
130 tokio::select! {
131 biased;
132 _ = token.cancelled() => {
133 tracing::info!("hot-reload: shutdown requested — stopping watcher");
134 return Ok(());
135 }
136 msg = recv_fut => {
137 match msg {
138 None => return Ok(()), Some(Err(e)) => {
140 tracing::warn!("hot-reload: watcher error: {e}");
141 continue;
142 }
143 Some(Ok(event)) => {
144 if is_reload_event(&event) {
145 break true;
146 }
147 }
148 }
149 }
150 }
151 } else {
152 match recv_fut.await {
153 None => return Ok(()), Some(Err(e)) => {
155 tracing::warn!("hot-reload: watcher error: {e}");
156 continue;
157 }
158 Some(Ok(event)) => {
159 if is_reload_event(&event) {
160 break true;
161 }
162 }
163 }
164 }
165 };
166
167 if !triggered {
168 continue;
169 }
170
171 let deadline = tokio::time::Instant::now() + debounce;
173 loop {
174 match tokio::time::timeout_at(deadline, rx.recv()).await {
175 Ok(Some(_)) => continue, Ok(None) => return Ok(()), Err(_elapsed) => break, }
179 }
180 tracing::info!("hot-reload: file change detected — reloading routes");
181
182 let new_defs = match discover_fn() {
184 Ok(defs) => defs,
185 Err(e) => {
186 tracing::warn!("hot-reload: route discovery failed: {e}");
187 continue;
188 }
189 };
190
191 let runtime_route_ids = match controller.runtime_route_ids().await {
193 Ok(route_ids) => route_ids,
194 Err(err) => {
195 tracing::warn!(
196 error = %err,
197 "hot-reload: failed to list runtime routes; skipping this reload cycle"
198 );
199 continue;
200 }
201 };
202 let mut runtime_hashes: std::collections::HashMap<String, u64> =
203 std::collections::HashMap::new();
204 for id in &runtime_route_ids {
205 if let Some(hash) = controller.route_source_hash(id).await {
206 runtime_hashes.insert(id.clone(), hash);
207 }
208 }
209
210 let actions = compute_reload_actions_from_runtime_snapshot(
211 &new_defs,
212 &runtime_route_ids,
213 &|route_id: &str| runtime_hashes.get(route_id).copied(),
214 );
215
216 if actions.is_empty() {
217 tracing::debug!("hot-reload: no route changes detected");
218 continue;
219 }
220
221 tracing::info!("hot-reload: applying {} reload action(s)", actions.len());
222
223 let function_ctx = controller.function_invoker().map(|invoker| {
224 let generation = invoker.begin_reload();
225 FunctionReloadContext {
226 invoker,
227 generation,
228 }
229 });
230 let errors = execute_reload_actions(
231 actions,
232 new_defs,
233 &controller,
234 drain_timeout,
235 function_ctx.as_ref(),
236 )
237 .await;
238 for err in &errors {
239 tracing::warn!(
240 "hot-reload: error on route '{}' ({}): {}",
241 err.route_id,
242 err.action,
243 err.error
244 );
245 }
246 }
247}
248
249fn is_reload_event(event: &Event) -> bool {
254 let has_yaml = event.paths.iter().any(|p| {
255 p.extension()
256 .map(|e| e == "yaml" || e == "yml" || e == "json")
257 .unwrap_or(false)
258 });
259 has_yaml
260 && matches!(
261 event.kind,
262 EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
263 )
264}
265
266pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
272 let mut dirs: HashSet<PathBuf> = HashSet::new();
273
274 for pattern in patterns {
275 let path = Path::new(pattern.as_str());
276
277 let dir = if path.is_dir() {
278 path.to_path_buf()
279 } else {
280 let mut candidate = path;
282 loop {
283 let s = candidate.to_string_lossy();
284 if s.contains('*') || s.contains('?') || s.contains('[') {
285 match candidate.parent() {
286 Some(p) => candidate = p,
287 None => break,
288 }
289 } else {
290 break;
291 }
292 }
293 candidate.to_path_buf()
294 };
295
296 if dir.as_os_str().is_empty() {
297 dirs.insert(PathBuf::from("."));
299 } else if dir.exists() {
300 dirs.insert(dir);
301 }
302 }
303
304 dirs.into_iter().collect()
305}
306
307#[cfg(test)]
308mod tests {
309 use std::fs;
310 use std::path::PathBuf;
311 use std::time::{Duration, SystemTime, UNIX_EPOCH};
312
313 use notify::{Event, EventKind};
314 use tokio_util::sync::CancellationToken;
315
316 use crate::CamelContext;
317
318 use super::{ReloadWatcher, is_reload_event, resolve_watch_dirs, watch_and_reload};
319
320 fn make_event(paths: &[&str], kind: EventKind) -> Event {
321 Event {
322 kind,
323 paths: paths.iter().map(PathBuf::from).collect(),
324 attrs: Default::default(),
325 }
326 }
327
328 #[test]
329 fn is_reload_event_accepts_yaml() {
330 let ev = make_event(
331 &["routes/test.yaml"],
332 EventKind::Create(notify::event::CreateKind::File),
333 );
334 assert!(is_reload_event(&ev));
335 }
336
337 #[test]
338 fn is_reload_event_accepts_yml() {
339 let ev = make_event(
340 &["routes/test.yml"],
341 EventKind::Modify(notify::event::ModifyKind::Data(
342 notify::event::DataChange::Content,
343 )),
344 );
345 assert!(is_reload_event(&ev));
346 }
347
348 #[test]
349 fn is_reload_event_accepts_json() {
350 let ev = make_event(
351 &["routes/test.json"],
352 EventKind::Create(notify::event::CreateKind::File),
353 );
354 assert!(is_reload_event(&ev));
355 }
356
357 #[test]
358 fn is_reload_event_rejects_other_extensions() {
359 let ev = make_event(
360 &["routes/test.toml"],
361 EventKind::Create(notify::event::CreateKind::File),
362 );
363 assert!(!is_reload_event(&ev));
364 }
365
366 #[test]
367 fn is_reload_event_rejects_swap_files() {
368 let ev = make_event(
369 &["routes/.test.yaml.swp"],
370 EventKind::Create(notify::event::CreateKind::File),
371 );
372 assert!(!is_reload_event(&ev));
373 }
374
375 #[test]
376 fn is_reload_event_accepts_remove_kind() {
377 let ev = make_event(
378 &["routes/test.yaml"],
379 EventKind::Remove(notify::event::RemoveKind::File),
380 );
381 assert!(is_reload_event(&ev));
382 }
383
384 #[test]
385 fn is_reload_event_rejects_non_reload_kind_even_with_yaml() {
386 let ev = make_event(
387 &["routes/test.yaml"],
388 EventKind::Access(notify::event::AccessKind::Read),
389 );
390 assert!(!is_reload_event(&ev));
391 }
392
393 #[test]
394 fn is_reload_event_accepts_when_any_path_matches() {
395 let ev = make_event(
396 &["routes/test.tmp", "routes/real.json"],
397 EventKind::Modify(notify::event::ModifyKind::Data(
398 notify::event::DataChange::Content,
399 )),
400 );
401 assert!(is_reload_event(&ev));
402 }
403
404 #[test]
405 fn resolve_watch_dirs_returns_existing_dir_pattern_and_ignores_missing() {
406 let root = std::env::temp_dir().join(format!(
407 "camel-reload-watch-{}",
408 SystemTime::now()
409 .duration_since(UNIX_EPOCH)
410 .expect("system clock before unix epoch")
411 .as_nanos()
412 ));
413 let existing = root.join("routes");
414 fs::create_dir_all(&existing).expect("create temp dir");
415
416 let patterns = vec![
417 existing.to_string_lossy().to_string(),
418 root.join("missing").to_string_lossy().to_string(),
419 ];
420
421 let dirs = resolve_watch_dirs(&patterns);
422
423 assert!(dirs.contains(&existing));
424 assert_eq!(dirs.len(), 1);
425
426 fs::remove_dir_all(&root).expect("cleanup temp dir");
427 }
428
429 #[test]
430 fn resolve_watch_dirs_walks_up_glob_segments() {
431 let root = std::env::temp_dir().join(format!(
432 "camel-reload-watch-glob-{}",
433 SystemTime::now()
434 .duration_since(UNIX_EPOCH)
435 .expect("system clock before unix epoch")
436 .as_nanos()
437 ));
438 let nested = root.join("a").join("b");
439 fs::create_dir_all(&nested).expect("create nested temp dir");
440
441 let pattern = nested
442 .join("**")
443 .join("*.yaml")
444 .to_string_lossy()
445 .to_string();
446
447 let dirs = resolve_watch_dirs(&[pattern]);
448
449 assert!(dirs.contains(&nested));
450
451 fs::remove_dir_all(&root).expect("cleanup temp dir");
452 }
453
454 #[test]
455 fn resolve_watch_dirs_uses_current_dir_for_relative_no_parent_pattern() {
456 let dirs = resolve_watch_dirs(&["*.yaml".to_string()]);
457 assert_eq!(dirs, vec![PathBuf::from(".")]);
458 }
459
460 #[tokio::test]
461 async fn watch_and_reload_returns_immediately_when_shutdown_is_cancelled() {
462 let token = CancellationToken::new();
463 token.cancel();
464
465 let ctx = CamelContext::builder().build().await.unwrap();
466 let result = watch_and_reload(
467 vec![],
468 ctx.runtime_execution_handle(),
469 || Ok(vec![]),
470 Some(token),
471 Duration::from_millis(1),
472 Duration::from_millis(1),
473 )
474 .await;
475
476 assert!(result.is_ok());
477 }
478
479 #[test]
480 fn is_reload_event_rejects_no_extension() {
481 let ev = make_event(
482 &["routes/Makefile"],
483 EventKind::Modify(notify::event::ModifyKind::Data(
484 notify::event::DataChange::Content,
485 )),
486 );
487 assert!(!is_reload_event(&ev));
488 }
489
490 #[test]
491 fn is_reload_event_rejects_any_event_with_no_matching_kind() {
492 let ev = make_event(&["routes/test.yaml"], EventKind::Other);
493 assert!(!is_reload_event(&ev));
494 }
495
496 #[test]
497 fn is_reload_event_rejects_empty_paths() {
498 let ev = Event {
499 kind: EventKind::Create(notify::event::CreateKind::File),
500 paths: vec![],
501 attrs: Default::default(),
502 };
503 assert!(!is_reload_event(&ev));
504 }
505
506 #[test]
507 fn resolve_watch_dirs_deduplicates_same_directory() {
508 let root = std::env::temp_dir().join(format!(
509 "camel-reload-watch-dedup-{}",
510 SystemTime::now()
511 .duration_since(UNIX_EPOCH)
512 .expect("system clock before unix epoch")
513 .as_nanos()
514 ));
515 fs::create_dir_all(&root).expect("create temp dir");
516
517 let patterns = vec![
518 root.join("*.yaml").to_string_lossy().to_string(),
519 root.join("*.yml").to_string_lossy().to_string(),
520 ];
521
522 let dirs = resolve_watch_dirs(&patterns);
523 assert_eq!(dirs.len(), 1);
524 assert!(dirs.contains(&root));
525
526 fs::remove_dir_all(&root).expect("cleanup temp dir");
527 }
528
529 #[test]
530 fn resolve_watch_dirs_handles_absolute_glob_pattern() {
531 let root = std::env::temp_dir().join(format!(
532 "camel-reload-watch-abs-{}",
533 SystemTime::now()
534 .duration_since(UNIX_EPOCH)
535 .expect("system clock before unix epoch")
536 .as_nanos()
537 ));
538 let nested = root.join("routes").join("sub");
539 fs::create_dir_all(&nested).expect("create nested temp dir");
540
541 let pattern = root
542 .join("routes")
543 .join("**")
544 .join("*.yaml")
545 .to_string_lossy()
546 .to_string();
547
548 let dirs = resolve_watch_dirs(&[pattern]);
549 assert!(dirs.iter().any(|d| d.starts_with(root.join("routes"))));
550
551 fs::remove_dir_all(&root).expect("cleanup temp dir");
552 }
553
554 #[test]
555 fn resolve_watch_dirs_empty_patterns_returns_empty() {
556 let dirs = resolve_watch_dirs(&[]);
557 assert!(dirs.is_empty());
558 }
559
560 #[test]
561 fn resolve_watch_dirs_ignores_nonexistent_parent_of_glob() {
562 let dirs = resolve_watch_dirs(&["/nonexistent/path/**/*.yaml".to_string()]);
563 assert!(dirs.is_empty());
564 }
565
566 #[test]
567 fn reload_watcher_struct_is_unit() {
568 let _ = ReloadWatcher;
569 }
570}