camel_core/hot_reload/adapters/
reload_watcher.rs1use std::collections::HashSet;
18use std::path::{Path, PathBuf};
19use std::time::Duration;
20
21use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
22use tokio::sync::mpsc;
23use tokio_util::sync::CancellationToken;
24
25use camel_api::CamelError;
26
27use crate::context::RuntimeExecutionHandle;
28use crate::hot_reload::application::{
29 compute_reload_actions_from_runtime_snapshot, execute_reload_actions,
30};
31use crate::lifecycle::application::route_definition::RouteDefinition;
32
33pub struct ReloadWatcher;
34
35impl ReloadWatcher {
36 pub async fn watch_and_reload<F>(
37 watch_dirs: Vec<PathBuf>,
38 controller: RuntimeExecutionHandle,
39 discover_fn: F,
40 shutdown: Option<CancellationToken>,
41 ) -> Result<(), CamelError>
42 where
43 F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
44 {
45 watch_and_reload(watch_dirs, controller, discover_fn, shutdown).await
46 }
47
48 pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
49 resolve_watch_dirs(patterns)
50 }
51}
52
53pub async fn watch_and_reload<F>(
70 watch_dirs: Vec<PathBuf>,
71 controller: RuntimeExecutionHandle,
72 discover_fn: F,
73 shutdown: Option<CancellationToken>,
74) -> Result<(), CamelError>
75where
76 F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
77{
78 let (tx, mut rx) = mpsc::channel::<notify::Result<Event>>(64);
79
80 let mut watcher = RecommendedWatcher::new(
81 move |res| {
82 let _ = tx.blocking_send(res);
83 },
84 notify::Config::default(),
85 )
86 .map_err(|e| CamelError::RouteError(format!("Failed to create file watcher: {e}")))?;
87
88 if watch_dirs.is_empty() {
89 tracing::warn!("hot-reload: no directories to watch");
90 }
91
92 for dir in &watch_dirs {
93 watcher
94 .watch(dir, RecursiveMode::NonRecursive)
95 .map_err(|e| {
96 CamelError::RouteError(format!("Failed to watch directory {dir:?}: {e}"))
97 })?;
98 tracing::info!("hot-reload: watching {:?}", dir);
99 }
100
101 let debounce = Duration::from_millis(300);
105
106 let is_cancelled = || shutdown.as_ref().map(|t| t.is_cancelled()).unwrap_or(false);
108
109 loop {
110 if is_cancelled() {
111 tracing::info!("hot-reload: shutdown requested — stopping watcher");
112 return Ok(());
113 }
114
115 let triggered = loop {
117 let recv_fut = rx.recv();
119 if let Some(token) = shutdown.as_ref() {
120 tokio::select! {
121 biased;
122 _ = token.cancelled() => {
123 tracing::info!("hot-reload: shutdown requested — stopping watcher");
124 return Ok(());
125 }
126 msg = recv_fut => {
127 match msg {
128 None => return Ok(()), Some(Err(e)) => {
130 tracing::warn!("hot-reload: watcher error: {e}");
131 continue;
132 }
133 Some(Ok(event)) => {
134 if is_reload_event(&event) {
135 break true;
136 }
137 }
138 }
139 }
140 }
141 } else {
142 match recv_fut.await {
143 None => return Ok(()), Some(Err(e)) => {
145 tracing::warn!("hot-reload: watcher error: {e}");
146 continue;
147 }
148 Some(Ok(event)) => {
149 if is_reload_event(&event) {
150 break true;
151 }
152 }
153 }
154 }
155 };
156
157 if !triggered {
158 continue;
159 }
160
161 let deadline = tokio::time::Instant::now() + debounce;
163 loop {
164 match tokio::time::timeout_at(deadline, rx.recv()).await {
165 Ok(Some(_)) => continue, Ok(None) => return Ok(()), Err(_elapsed) => break, }
169 }
170 tracing::info!("hot-reload: file change detected — reloading routes");
171
172 let new_defs = match discover_fn() {
174 Ok(defs) => defs,
175 Err(e) => {
176 tracing::warn!("hot-reload: route discovery failed: {e}");
177 continue;
178 }
179 };
180
181 let runtime_route_ids = match controller.runtime_route_ids().await {
183 Ok(route_ids) => route_ids,
184 Err(err) => {
185 tracing::warn!(
186 error = %err,
187 "hot-reload: failed to list runtime routes; skipping this reload cycle"
188 );
189 continue;
190 }
191 };
192 let actions = compute_reload_actions_from_runtime_snapshot(&new_defs, &runtime_route_ids);
193
194 if actions.is_empty() {
195 tracing::debug!("hot-reload: no route changes detected");
196 continue;
197 }
198
199 tracing::info!("hot-reload: applying {} reload action(s)", actions.len());
200
201 let errors = execute_reload_actions(actions, new_defs, &controller).await;
202 for err in &errors {
203 tracing::warn!(
204 "hot-reload: error on route '{}' ({}): {}",
205 err.route_id,
206 err.action,
207 err.error
208 );
209 }
210 }
211}
212
213fn is_reload_event(event: &Event) -> bool {
218 let has_yaml = event.paths.iter().any(|p| {
219 p.extension()
220 .map(|e| e == "yaml" || e == "yml")
221 .unwrap_or(false)
222 });
223 has_yaml
224 && matches!(
225 event.kind,
226 EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
227 )
228}
229
230pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
236 let mut dirs: HashSet<PathBuf> = HashSet::new();
237
238 for pattern in patterns {
239 let path = Path::new(pattern.as_str());
240
241 let dir = if path.is_dir() {
242 path.to_path_buf()
243 } else {
244 let mut candidate = path;
246 loop {
247 let s = candidate.to_string_lossy();
248 if s.contains('*') || s.contains('?') || s.contains('[') {
249 match candidate.parent() {
250 Some(p) => candidate = p,
251 None => break,
252 }
253 } else {
254 break;
255 }
256 }
257 candidate.to_path_buf()
258 };
259
260 if dir.as_os_str().is_empty() {
261 dirs.insert(PathBuf::from("."));
263 } else if dir.exists() {
264 dirs.insert(dir);
265 }
266 }
267
268 dirs.into_iter().collect()
269}