camel_core/hot_reload/adapters/
reload_watcher.rs1use std::collections::HashSet;
18use std::path::{Path, PathBuf};
19use std::time::Duration;
20
21use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
22use tokio::sync::mpsc;
23use tokio_util::sync::CancellationToken;
24
25use camel_api::CamelError;
26
27use crate::context::RuntimeExecutionHandle;
28use crate::hot_reload::application::{
29 compute_reload_actions_from_runtime_snapshot, execute_reload_actions,
30};
31use crate::lifecycle::application::route_definition::RouteDefinition;
32
33pub struct ReloadWatcher;
34
35impl ReloadWatcher {
36 pub async fn watch_and_reload<F>(
37 watch_dirs: Vec<PathBuf>,
38 controller: RuntimeExecutionHandle,
39 discover_fn: F,
40 shutdown: Option<CancellationToken>,
41 drain_timeout: Duration,
42 debounce: Duration,
43 ) -> Result<(), CamelError>
44 where
45 F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
46 {
47 watch_and_reload(
48 watch_dirs,
49 controller,
50 discover_fn,
51 shutdown,
52 drain_timeout,
53 debounce,
54 )
55 .await
56 }
57
58 pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
59 resolve_watch_dirs(patterns)
60 }
61}
62
63pub async fn watch_and_reload<F>(
80 watch_dirs: Vec<PathBuf>,
81 controller: RuntimeExecutionHandle,
82 discover_fn: F,
83 shutdown: Option<CancellationToken>,
84 drain_timeout: Duration,
85 debounce: Duration,
86) -> Result<(), CamelError>
87where
88 F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
89{
90 let (tx, mut rx) = mpsc::channel::<notify::Result<Event>>(64);
91
92 let mut watcher = RecommendedWatcher::new(
93 move |res| {
94 let _ = tx.blocking_send(res);
95 },
96 notify::Config::default(),
97 )
98 .map_err(|e| CamelError::RouteError(format!("Failed to create file watcher: {e}")))?;
99
100 if watch_dirs.is_empty() {
101 tracing::warn!("hot-reload: no directories to watch");
102 }
103
104 for dir in &watch_dirs {
105 watcher
106 .watch(dir, RecursiveMode::NonRecursive)
107 .map_err(|e| {
108 CamelError::RouteError(format!("Failed to watch directory {dir:?}: {e}"))
109 })?;
110 tracing::info!("hot-reload: watching {:?}", dir);
111 }
112
113 let is_cancelled = || shutdown.as_ref().map(|t| t.is_cancelled()).unwrap_or(false);
119
120 loop {
121 if is_cancelled() {
122 tracing::info!("hot-reload: shutdown requested — stopping watcher");
123 return Ok(());
124 }
125
126 let triggered = loop {
128 let recv_fut = rx.recv();
130 if let Some(token) = shutdown.as_ref() {
131 tokio::select! {
132 biased;
133 _ = token.cancelled() => {
134 tracing::info!("hot-reload: shutdown requested — stopping watcher");
135 return Ok(());
136 }
137 msg = recv_fut => {
138 match msg {
139 None => return Ok(()), Some(Err(e)) => {
141 tracing::warn!("hot-reload: watcher error: {e}");
142 continue;
143 }
144 Some(Ok(event)) => {
145 if is_reload_event(&event) {
146 break true;
147 }
148 }
149 }
150 }
151 }
152 } else {
153 match recv_fut.await {
154 None => return Ok(()), Some(Err(e)) => {
156 tracing::warn!("hot-reload: watcher error: {e}");
157 continue;
158 }
159 Some(Ok(event)) => {
160 if is_reload_event(&event) {
161 break true;
162 }
163 }
164 }
165 }
166 };
167
168 if !triggered {
169 continue;
170 }
171
172 let deadline = tokio::time::Instant::now() + debounce;
174 loop {
175 match tokio::time::timeout_at(deadline, rx.recv()).await {
176 Ok(Some(_)) => continue, Ok(None) => return Ok(()), Err(_elapsed) => break, }
180 }
181 tracing::info!("hot-reload: file change detected — reloading routes");
182
183 let new_defs = match discover_fn() {
185 Ok(defs) => defs,
186 Err(e) => {
187 tracing::warn!("hot-reload: route discovery failed: {e}");
188 continue;
189 }
190 };
191
192 let runtime_route_ids = match controller.runtime_route_ids().await {
194 Ok(route_ids) => route_ids,
195 Err(err) => {
196 tracing::warn!(
197 error = %err,
198 "hot-reload: failed to list runtime routes; skipping this reload cycle"
199 );
200 continue;
201 }
202 };
203 let mut runtime_hashes: std::collections::HashMap<String, u64> =
204 std::collections::HashMap::new();
205 for id in &runtime_route_ids {
206 if let Some(hash) = controller.route_source_hash(id).await {
207 runtime_hashes.insert(id.clone(), hash);
208 }
209 }
210
211 let actions = compute_reload_actions_from_runtime_snapshot(
212 &new_defs,
213 &runtime_route_ids,
214 &|route_id: &str| runtime_hashes.get(route_id).copied(),
215 );
216
217 if actions.is_empty() {
218 tracing::debug!("hot-reload: no route changes detected");
219 continue;
220 }
221
222 tracing::info!("hot-reload: applying {} reload action(s)", actions.len());
223
224 let errors = execute_reload_actions(actions, new_defs, &controller, drain_timeout).await;
225 for err in &errors {
226 tracing::warn!(
227 "hot-reload: error on route '{}' ({}): {}",
228 err.route_id,
229 err.action,
230 err.error
231 );
232 }
233 }
234}
235
236fn is_reload_event(event: &Event) -> bool {
241 let has_yaml = event.paths.iter().any(|p| {
242 p.extension()
243 .map(|e| e == "yaml" || e == "yml")
244 .unwrap_or(false)
245 });
246 has_yaml
247 && matches!(
248 event.kind,
249 EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
250 )
251}
252
253pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
259 let mut dirs: HashSet<PathBuf> = HashSet::new();
260
261 for pattern in patterns {
262 let path = Path::new(pattern.as_str());
263
264 let dir = if path.is_dir() {
265 path.to_path_buf()
266 } else {
267 let mut candidate = path;
269 loop {
270 let s = candidate.to_string_lossy();
271 if s.contains('*') || s.contains('?') || s.contains('[') {
272 match candidate.parent() {
273 Some(p) => candidate = p,
274 None => break,
275 }
276 } else {
277 break;
278 }
279 }
280 candidate.to_path_buf()
281 };
282
283 if dir.as_os_str().is_empty() {
284 dirs.insert(PathBuf::from("."));
286 } else if dir.exists() {
287 dirs.insert(dir);
288 }
289 }
290
291 dirs.into_iter().collect()
292}