camel_core/hot_reload/adapters/
reload_watcher.rs1use std::collections::HashSet;
18use std::path::{Path, PathBuf};
19use std::time::Duration;
20
21use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
22use tokio::sync::mpsc;
23use tokio_util::sync::CancellationToken;
24
25use camel_api::CamelError;
26
27use crate::context::RuntimeExecutionHandle;
28use crate::hot_reload::application::{
29 compute_reload_actions_from_runtime_snapshot, execute_reload_actions,
30};
31use crate::lifecycle::application::route_definition::RouteDefinition;
32
33pub struct ReloadWatcher;
34
35impl ReloadWatcher {
36 pub async fn watch_and_reload<F>(
37 watch_dirs: Vec<PathBuf>,
38 controller: RuntimeExecutionHandle,
39 discover_fn: F,
40 shutdown: Option<CancellationToken>,
41 drain_timeout: Duration,
42 debounce: Duration,
43 ) -> Result<(), CamelError>
44 where
45 F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
46 {
47 watch_and_reload(
48 watch_dirs,
49 controller,
50 discover_fn,
51 shutdown,
52 drain_timeout,
53 debounce,
54 )
55 .await
56 }
57
58 pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
59 resolve_watch_dirs(patterns)
60 }
61}
62
63pub async fn watch_and_reload<F>(
80 watch_dirs: Vec<PathBuf>,
81 controller: RuntimeExecutionHandle,
82 discover_fn: F,
83 shutdown: Option<CancellationToken>,
84 drain_timeout: Duration,
85 debounce: Duration,
86) -> Result<(), CamelError>
87where
88 F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
89{
90 let (tx, mut rx) = mpsc::channel::<notify::Result<Event>>(64);
91
92 let mut watcher = RecommendedWatcher::new(
93 move |res| {
94 let _ = tx.blocking_send(res);
95 },
96 notify::Config::default(),
97 )
98 .map_err(|e| CamelError::RouteError(format!("Failed to create file watcher: {e}")))?;
99
100 if watch_dirs.is_empty() {
101 tracing::warn!("hot-reload: no directories to watch");
102 }
103
104 for dir in &watch_dirs {
105 watcher
106 .watch(dir, RecursiveMode::NonRecursive)
107 .map_err(|e| {
108 CamelError::RouteError(format!("Failed to watch directory {dir:?}: {e}"))
109 })?;
110 tracing::info!("hot-reload: watching {:?}", dir);
111 }
112
113 let is_cancelled = || shutdown.as_ref().map(|t| t.is_cancelled()).unwrap_or(false);
119
120 loop {
121 if is_cancelled() {
122 tracing::info!("hot-reload: shutdown requested — stopping watcher");
123 return Ok(());
124 }
125
126 let triggered = loop {
128 let recv_fut = rx.recv();
130 if let Some(token) = shutdown.as_ref() {
131 tokio::select! {
132 biased;
133 _ = token.cancelled() => {
134 tracing::info!("hot-reload: shutdown requested — stopping watcher");
135 return Ok(());
136 }
137 msg = recv_fut => {
138 match msg {
139 None => return Ok(()), Some(Err(e)) => {
141 tracing::warn!("hot-reload: watcher error: {e}");
142 continue;
143 }
144 Some(Ok(event)) => {
145 if is_reload_event(&event) {
146 break true;
147 }
148 }
149 }
150 }
151 }
152 } else {
153 match recv_fut.await {
154 None => return Ok(()), Some(Err(e)) => {
156 tracing::warn!("hot-reload: watcher error: {e}");
157 continue;
158 }
159 Some(Ok(event)) => {
160 if is_reload_event(&event) {
161 break true;
162 }
163 }
164 }
165 }
166 };
167
168 if !triggered {
169 continue;
170 }
171
172 let deadline = tokio::time::Instant::now() + debounce;
174 loop {
175 match tokio::time::timeout_at(deadline, rx.recv()).await {
176 Ok(Some(_)) => continue, Ok(None) => return Ok(()), Err(_elapsed) => break, }
180 }
181 tracing::info!("hot-reload: file change detected — reloading routes");
182
183 let new_defs = match discover_fn() {
185 Ok(defs) => defs,
186 Err(e) => {
187 tracing::warn!("hot-reload: route discovery failed: {e}");
188 continue;
189 }
190 };
191
192 let runtime_route_ids = match controller.runtime_route_ids().await {
194 Ok(route_ids) => route_ids,
195 Err(err) => {
196 tracing::warn!(
197 error = %err,
198 "hot-reload: failed to list runtime routes; skipping this reload cycle"
199 );
200 continue;
201 }
202 };
203 let actions = compute_reload_actions_from_runtime_snapshot(&new_defs, &runtime_route_ids);
204
205 if actions.is_empty() {
206 tracing::debug!("hot-reload: no route changes detected");
207 continue;
208 }
209
210 tracing::info!("hot-reload: applying {} reload action(s)", actions.len());
211
212 let errors = execute_reload_actions(actions, new_defs, &controller, drain_timeout).await;
213 for err in &errors {
214 tracing::warn!(
215 "hot-reload: error on route '{}' ({}): {}",
216 err.route_id,
217 err.action,
218 err.error
219 );
220 }
221 }
222}
223
224fn is_reload_event(event: &Event) -> bool {
229 let has_yaml = event.paths.iter().any(|p| {
230 p.extension()
231 .map(|e| e == "yaml" || e == "yml")
232 .unwrap_or(false)
233 });
234 has_yaml
235 && matches!(
236 event.kind,
237 EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
238 )
239}
240
241pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
247 let mut dirs: HashSet<PathBuf> = HashSet::new();
248
249 for pattern in patterns {
250 let path = Path::new(pattern.as_str());
251
252 let dir = if path.is_dir() {
253 path.to_path_buf()
254 } else {
255 let mut candidate = path;
257 loop {
258 let s = candidate.to_string_lossy();
259 if s.contains('*') || s.contains('?') || s.contains('[') {
260 match candidate.parent() {
261 Some(p) => candidate = p,
262 None => break,
263 }
264 } else {
265 break;
266 }
267 }
268 candidate.to_path_buf()
269 };
270
271 if dir.as_os_str().is_empty() {
272 dirs.insert(PathBuf::from("."));
274 } else if dir.exists() {
275 dirs.insert(dir);
276 }
277 }
278
279 dirs.into_iter().collect()
280}