camel_core/hot_reload/adapters/
reload_watcher.rs1use std::collections::HashSet;
18use std::path::{Path, PathBuf};
19use std::time::Duration;
20
21use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
22use tokio::sync::mpsc;
23use tokio_util::sync::CancellationToken;
24
25use camel_api::CamelError;
26
27use crate::context::RuntimeExecutionHandle;
28use crate::hot_reload::application::{
29 compute_reload_actions_from_runtime_snapshot, execute_reload_actions,
30};
31use crate::lifecycle::application::route_definition::RouteDefinition;
32
33pub struct ReloadWatcher;
34
35impl ReloadWatcher {
36 pub async fn watch_and_reload<F>(
37 watch_dirs: Vec<PathBuf>,
38 controller: RuntimeExecutionHandle,
39 discover_fn: F,
40 shutdown: Option<CancellationToken>,
41 drain_timeout: Duration,
42 debounce: Duration,
43 ) -> Result<(), CamelError>
44 where
45 F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
46 {
47 watch_and_reload(
48 watch_dirs,
49 controller,
50 discover_fn,
51 shutdown,
52 drain_timeout,
53 debounce,
54 )
55 .await
56 }
57
58 pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
59 resolve_watch_dirs(patterns)
60 }
61}
62
63pub async fn watch_and_reload<F>(
80 watch_dirs: Vec<PathBuf>,
81 controller: RuntimeExecutionHandle,
82 discover_fn: F,
83 shutdown: Option<CancellationToken>,
84 drain_timeout: Duration,
85 debounce: Duration,
86) -> Result<(), CamelError>
87where
88 F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
89{
90 let (tx, mut rx) = mpsc::channel::<notify::Result<Event>>(64);
91
92 let mut watcher = RecommendedWatcher::new(
93 move |res| {
94 let _ = tx.blocking_send(res);
95 },
96 notify::Config::default(),
97 )
98 .map_err(|e| CamelError::RouteError(format!("Failed to create file watcher: {e}")))?;
99
100 if watch_dirs.is_empty() {
101 tracing::warn!("hot-reload: no directories to watch");
102 }
103
104 for dir in &watch_dirs {
105 watcher.watch(dir, RecursiveMode::Recursive).map_err(|e| {
106 CamelError::RouteError(format!("Failed to watch directory {dir:?}: {e}"))
107 })?;
108 tracing::info!("hot-reload: watching {:?}", dir);
109 }
110
111 let is_cancelled = || shutdown.as_ref().map(|t| t.is_cancelled()).unwrap_or(false);
117
118 loop {
119 if is_cancelled() {
120 tracing::info!("hot-reload: shutdown requested — stopping watcher");
121 return Ok(());
122 }
123
124 let triggered = loop {
126 let recv_fut = rx.recv();
128 if let Some(token) = shutdown.as_ref() {
129 tokio::select! {
130 biased;
131 _ = token.cancelled() => {
132 tracing::info!("hot-reload: shutdown requested — stopping watcher");
133 return Ok(());
134 }
135 msg = recv_fut => {
136 match msg {
137 None => return Ok(()), Some(Err(e)) => {
139 tracing::warn!("hot-reload: watcher error: {e}");
140 continue;
141 }
142 Some(Ok(event)) => {
143 if is_reload_event(&event) {
144 break true;
145 }
146 }
147 }
148 }
149 }
150 } else {
151 match recv_fut.await {
152 None => return Ok(()), Some(Err(e)) => {
154 tracing::warn!("hot-reload: watcher error: {e}");
155 continue;
156 }
157 Some(Ok(event)) => {
158 if is_reload_event(&event) {
159 break true;
160 }
161 }
162 }
163 }
164 };
165
166 if !triggered {
167 continue;
168 }
169
170 let deadline = tokio::time::Instant::now() + debounce;
172 loop {
173 match tokio::time::timeout_at(deadline, rx.recv()).await {
174 Ok(Some(_)) => continue, Ok(None) => return Ok(()), Err(_elapsed) => break, }
178 }
179 tracing::info!("hot-reload: file change detected — reloading routes");
180
181 let new_defs = match discover_fn() {
183 Ok(defs) => defs,
184 Err(e) => {
185 tracing::warn!("hot-reload: route discovery failed: {e}");
186 continue;
187 }
188 };
189
190 let runtime_route_ids = match controller.runtime_route_ids().await {
192 Ok(route_ids) => route_ids,
193 Err(err) => {
194 tracing::warn!(
195 error = %err,
196 "hot-reload: failed to list runtime routes; skipping this reload cycle"
197 );
198 continue;
199 }
200 };
201 let mut runtime_hashes: std::collections::HashMap<String, u64> =
202 std::collections::HashMap::new();
203 for id in &runtime_route_ids {
204 if let Some(hash) = controller.route_source_hash(id).await {
205 runtime_hashes.insert(id.clone(), hash);
206 }
207 }
208
209 let actions = compute_reload_actions_from_runtime_snapshot(
210 &new_defs,
211 &runtime_route_ids,
212 &|route_id: &str| runtime_hashes.get(route_id).copied(),
213 );
214
215 if actions.is_empty() {
216 tracing::debug!("hot-reload: no route changes detected");
217 continue;
218 }
219
220 tracing::info!("hot-reload: applying {} reload action(s)", actions.len());
221
222 let errors = execute_reload_actions(actions, new_defs, &controller, drain_timeout).await;
223 for err in &errors {
224 tracing::warn!(
225 "hot-reload: error on route '{}' ({}): {}",
226 err.route_id,
227 err.action,
228 err.error
229 );
230 }
231 }
232}
233
234fn is_reload_event(event: &Event) -> bool {
239 let has_yaml = event.paths.iter().any(|p| {
240 p.extension()
241 .map(|e| e == "yaml" || e == "yml")
242 .unwrap_or(false)
243 });
244 has_yaml
245 && matches!(
246 event.kind,
247 EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
248 )
249}
250
251pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
257 let mut dirs: HashSet<PathBuf> = HashSet::new();
258
259 for pattern in patterns {
260 let path = Path::new(pattern.as_str());
261
262 let dir = if path.is_dir() {
263 path.to_path_buf()
264 } else {
265 let mut candidate = path;
267 loop {
268 let s = candidate.to_string_lossy();
269 if s.contains('*') || s.contains('?') || s.contains('[') {
270 match candidate.parent() {
271 Some(p) => candidate = p,
272 None => break,
273 }
274 } else {
275 break;
276 }
277 }
278 candidate.to_path_buf()
279 };
280
281 if dir.as_os_str().is_empty() {
282 dirs.insert(PathBuf::from("."));
284 } else if dir.exists() {
285 dirs.insert(dir);
286 }
287 }
288
289 dirs.into_iter().collect()
290}