ralph_workflow/files/monitoring/
io.rs1use std::fs;
14use std::fs::OpenOptions;
15use std::path::Path;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18use std::thread;
19use std::time::Duration;
20
21const NOTIFY_EVENT_QUEUE_CAPACITY: usize = 1024;
22
23fn bounded_event_queue<T>() -> (std::sync::mpsc::SyncSender<T>, std::sync::mpsc::Receiver<T>) {
24 std::sync::mpsc::sync_channel(NOTIFY_EVENT_QUEUE_CAPACITY)
25}
26
27pub struct PromptMonitor {
51 restoration_detected: Arc<AtomicBool>,
53 stop_signal: Arc<AtomicBool>,
55 monitor_thread: Option<thread::JoinHandle<()>>,
57 warnings_tx: std::sync::mpsc::SyncSender<String>,
58 warnings_rx: std::sync::mpsc::Receiver<String>,
59}
60
61impl PromptMonitor {
62 pub fn new() -> std::io::Result<Self> {
71 let prompt_path = Path::new("PROMPT.md");
73 if !prompt_path.exists() {
74 return Err(std::io::Error::new(
75 std::io::ErrorKind::NotFound,
76 "PROMPT.md does not exist - cannot monitor",
77 ));
78 }
79
80 let (warnings_tx, warnings_rx) = bounded_event_queue();
81
82 Ok(Self {
83 restoration_detected: Arc::new(AtomicBool::new(false)),
84 stop_signal: Arc::new(AtomicBool::new(false)),
85 monitor_thread: None,
86 warnings_tx,
87 warnings_rx,
88 })
89 }
90
91 pub fn start(&mut self) -> std::io::Result<()> {
103 if self.monitor_thread.is_some() {
104 return Err(std::io::Error::new(
105 std::io::ErrorKind::AlreadyExists,
106 "Monitor is already running",
107 ));
108 }
109
110 let restoration_flag = Arc::clone(&self.restoration_detected);
111 let stop_signal = Arc::clone(&self.stop_signal);
112 let warnings = self.warnings_tx.clone();
113
114 let handle = thread::spawn(move || {
115 Self::monitor_thread_main(&restoration_flag, &stop_signal, warnings);
116 });
117
118 self.monitor_thread = Some(handle);
119 Ok(())
120 }
121
122 fn monitor_thread_main(
127 restoration_detected: &Arc<AtomicBool>,
128 stop_signal: &Arc<AtomicBool>,
129 warnings: std::sync::mpsc::SyncSender<String>,
130 ) {
131 let (tx, rx) = bounded_event_queue();
134 match setup_directory_watcher(tx) {
135 Ok(_watcher) => run_watcher_event_loop(&rx, restoration_detected, stop_signal),
136 Err(e) => {
137 push_warning(&warnings, watcher_setup_error_message(&e));
138 Self::polling_monitor(restoration_detected, stop_signal);
139 }
140 }
141 }
142
143 fn handle_fs_event(event: ¬ify::Event, restoration_detected: &Arc<AtomicBool>) {
145 if is_restore_trigger_event(event) && Self::restore_from_backup() {
146 restoration_detected.store(true, Ordering::Release);
147 }
148 }
149
150 fn polling_monitor(restoration_detected: &Arc<AtomicBool>, stop_signal: &Arc<AtomicBool>) {
155 let previous_exists = AtomicBool::new(Path::new("PROMPT.md").exists());
156
157 std::iter::from_fn(|| {
158 if stop_signal.load(Ordering::Relaxed) {
159 return None;
160 }
161
162 thread::sleep(Duration::from_millis(100));
163 Some(Path::new("PROMPT.md").exists())
164 })
165 .for_each(|current_exists| {
166 let previous = previous_exists.swap(current_exists, Ordering::AcqRel);
167 if previous && !current_exists && Self::restore_from_backup() {
168 restoration_detected.store(true, Ordering::Release);
169 }
170 });
171 }
172
173 #[must_use]
185 pub fn restore_from_backup() -> bool {
186 let backup_paths = [
187 Path::new(".agent/PROMPT.md.backup"),
188 Path::new(".agent/PROMPT.md.backup.1"),
189 Path::new(".agent/PROMPT.md.backup.2"),
190 ];
191
192 let prompt_path = Path::new("PROMPT.md");
193
194 backup_paths
195 .iter()
196 .filter_map(|backup_path| read_backup_content_secure(backup_path))
197 .filter(|backup_content| !backup_content.trim().is_empty())
198 .any(|backup_content| {
199 restore_prompt_content_atomic(prompt_path, backup_content.as_bytes()).is_ok()
200 })
201 }
202
203 #[must_use]
220 pub fn check_and_restore(&self) -> bool {
221 self.restoration_detected.swap(false, Ordering::AcqRel)
222 }
223
224 #[must_use]
226 pub fn drain_warnings(&self) -> Vec<String> {
227 drain_warnings(&self.warnings_rx)
228 }
229
230 #[must_use]
234 pub fn stop(mut self) -> Vec<String> {
235 self.stop_signal.store(true, Ordering::Release);
236
237 if let Some(handle) = self.monitor_thread.take() {
238 if let Err(panic_payload) = handle.join() {
239 push_warning(
240 &self.warnings_tx,
241 format!(
242 "File monitoring thread panicked: {}",
243 extract_panic_message(panic_payload)
244 ),
245 );
246 }
247 }
248
249 self.drain_warnings()
250 }
251}
252
253enum MonitorSetupError {
254 Create(notify::Error),
255 Watch(notify::Error),
256}
257
258fn watcher_setup_error_message(err: &MonitorSetupError) -> String {
260 match err {
261 MonitorSetupError::Create(e) => format!(
262 "Failed to create file system watcher: {e}. Falling back to periodic polling for PROMPT.md protection."
263 ),
264 MonitorSetupError::Watch(e) => format!(
265 "Failed to watch current directory: {e}. Falling back to periodic polling for PROMPT.md protection."
266 ),
267 }
268}
269
270fn run_watcher_event_loop(
272 rx: &std::sync::mpsc::Receiver<notify::Result<notify::Event>>,
273 restoration_detected: &Arc<AtomicBool>,
274 stop_signal: &Arc<AtomicBool>,
275) {
276 std::iter::from_fn(|| {
277 if stop_signal.load(Ordering::Relaxed) {
278 return None;
279 }
280 Some(rx.recv_timeout(Duration::from_millis(100)))
281 })
282 .take_while(|received| {
283 !matches!(
284 received,
285 Err(std::sync::mpsc::RecvTimeoutError::Disconnected)
286 )
287 })
288 .for_each(|received| {
289 if let Ok(Ok(event)) = received {
290 PromptMonitor::handle_fs_event(&event, restoration_detected);
291 std::iter::from_fn(|| rx.try_recv().ok())
293 .filter_map(Result::ok)
294 .for_each(|next_event| {
295 PromptMonitor::handle_fs_event(&next_event, restoration_detected);
296 });
297 }
298 });
299}
300
301fn extract_panic_message(payload: Box<dyn std::any::Any + Send>) -> String {
303 payload
304 .downcast_ref::<String>()
305 .cloned()
306 .or_else(|| payload.downcast_ref::<&str>().map(ToString::to_string))
307 .or_else(|| payload.downcast_ref::<&String>().map(|s| (*s).clone()))
308 .unwrap_or_else(|| {
309 format!(
310 "<unknown panic type: {}>",
311 std::any::type_name_of_val(&payload)
312 )
313 })
314}
315
316fn setup_directory_watcher(
317 event_sender: std::sync::mpsc::SyncSender<notify::Result<notify::Event>>,
318) -> std::result::Result<notify::RecommendedWatcher, MonitorSetupError> {
319 notify::recommended_watcher(move |res| {
320 let _ = event_sender.try_send(res);
322 })
323 .map_err(MonitorSetupError::Create)
324 .and_then(|watcher| {
325 watcher
326 .with_current_directory_watch()
327 .map_err(MonitorSetupError::Watch)
328 })
329}
330
331trait WatcherRegistrationExt {
332 fn with_current_directory_watch(self) -> notify::Result<Self>
333 where
334 Self: Sized;
335}
336
337impl WatcherRegistrationExt for notify::RecommendedWatcher {
338 fn with_current_directory_watch(mut self) -> notify::Result<Self> {
339 use notify::Watcher;
340
341 self.watch(Path::new("."), notify::RecursiveMode::NonRecursive)?;
342 Ok(self)
343 }
344}
345
346fn push_warning(warnings: &std::sync::mpsc::SyncSender<String>, warning: String) {
351 let _ = warnings.try_send(warning);
352}
353
354fn drain_warnings(warnings: &std::sync::mpsc::Receiver<String>) -> Vec<String> {
355 std::iter::from_fn(|| warnings.try_recv().ok()).collect()
356}
357
358fn read_backup_content_secure(path: &Path) -> Option<String> {
359 #[cfg(unix)]
364 {
365 use std::os::unix::fs::{MetadataExt, OpenOptionsExt};
366
367 let file = OpenOptions::new()
368 .read(true)
369 .custom_flags(libc::O_NOFOLLOW)
370 .open(path)
371 .ok()?;
372
373 let metadata = file.metadata().ok()?;
374 if !metadata.is_file() {
375 return None;
376 }
377 if metadata.nlink() != 1 {
378 return None;
379 }
380
381 std::io::read_to_string(file).ok()
382 }
383
384 #[cfg(not(unix))]
385 {
386 let meta = fs::symlink_metadata(path).ok()?;
387 if meta.file_type().is_symlink() {
388 return None;
389 }
390 if !meta.is_file() {
391 return None;
392 }
393
394 std::fs::read_to_string(path).ok()
395 }
396}
397
398fn ensure_not_directory(path: &Path) -> std::io::Result<()> {
400 fs::symlink_metadata(path)
401 .ok()
402 .filter(|m| m.is_dir())
403 .map_or(Ok(()), |_| {
404 Err(std::io::Error::other("PROMPT.md path is a directory"))
405 })
406}
407
408fn write_and_sync_temp(path: &Path, content: &[u8]) -> std::io::Result<()> {
410 fs::write(path, content)?;
411 let _ = OpenOptions::new()
412 .write(true)
413 .open(path)
414 .and_then(|file| file.sync_all());
415 Ok(())
416}
417
418fn make_file_readonly(path: &Path) -> std::io::Result<()> {
420 #[cfg(unix)]
421 {
422 use std::os::unix::fs::PermissionsExt;
423 fs::set_permissions(
424 path,
425 <fs::Permissions as PermissionsExt>::from_mode(0o444),
426 )?;
427 }
428
429 #[cfg(windows)]
430 {
431 let mut perms = fs::metadata(path)?.permissions();
432 perms.set_readonly(true);
433 fs::set_permissions(path, perms)?;
434 }
435
436 Ok(())
437}
438
439fn rename_or_cleanup(src: &Path, dest: &Path) -> std::io::Result<()> {
441 #[cfg(windows)]
443 if dest.exists() {
444 let _ = fs::remove_file(dest);
445 }
446
447 fs::rename(src, dest).inspect_err(|_e| {
450 let _ = fs::remove_file(src);
451 })
452}
453
454fn restore_prompt_content_atomic(prompt_path: &Path, content: &[u8]) -> std::io::Result<()> {
455 ensure_not_directory(prompt_path)?;
456 let temp_name = unique_temp_name();
457 let temp_path = Path::new(&temp_name);
458 write_and_sync_temp(temp_path, content)?;
459 make_file_readonly(temp_path)?;
460 rename_or_cleanup(temp_path, prompt_path)
461}
462
463fn unique_temp_name() -> String {
464 use std::time::{SystemTime, UNIX_EPOCH};
465
466 let nanos = SystemTime::now()
467 .duration_since(UNIX_EPOCH)
468 .unwrap_or_default()
469 .as_nanos();
470 let pid = std::process::id();
471 format!(".prompt_restore_tmp_{pid}_{nanos}")
472}
473
474fn is_prompt_md_path(path: &Path) -> bool {
475 matches!(path.file_name(), Some(name) if name == "PROMPT.md")
476}
477
478fn is_restore_trigger_event(event: ¬ify::Event) -> bool {
479 matches!(event.kind, notify::EventKind::Remove(_))
480 && event.paths.iter().any(|path| is_prompt_md_path(path))
481}
482
483impl Drop for PromptMonitor {
484 fn drop(&mut self) {
485 self.stop_signal.store(true, Ordering::Release);
487
488 let _ = self.monitor_thread.take();
491 }
492}
493
494#[cfg(test)]
497mod tests {
498 use super::{drain_warnings, is_restore_trigger_event, push_warning};
499 use std::path::PathBuf;
500
501 fn remove_event(paths: Vec<&str>) -> notify::Event {
502 paths.into_iter().map(PathBuf::from).fold(
503 notify::Event::new(notify::EventKind::Remove(notify::event::RemoveKind::Any)),
504 |event, path| event.add_path(path),
505 )
506 }
507
508 fn create_event(paths: Vec<&str>) -> notify::Event {
509 paths.into_iter().map(PathBuf::from).fold(
510 notify::Event::new(notify::EventKind::Create(notify::event::CreateKind::Any)),
511 |event, path| event.add_path(path),
512 )
513 }
514
515 #[test]
516 fn drain_warnings_clears_buffer_after_read() {
517 let (warnings_tx, warnings_rx) = std::sync::mpsc::sync_channel::<String>(16);
518
519 push_warning(&warnings_tx, "first warning".to_string());
520 push_warning(&warnings_tx, "second warning".to_string());
521
522 let first_drain = drain_warnings(&warnings_rx);
523 assert_eq!(first_drain.len(), 2);
524
525 let second_drain = drain_warnings(&warnings_rx);
526 assert!(
527 second_drain.is_empty(),
528 "warnings should be cleared after drain"
529 );
530 }
531
532 #[test]
533 fn restore_trigger_event_requires_remove_kind_and_prompt_path() {
534 let remove_prompt = remove_event(vec!["PROMPT.md"]);
535 assert!(is_restore_trigger_event(&remove_prompt));
536
537 let remove_other = remove_event(vec!["README.md"]);
538 assert!(!is_restore_trigger_event(&remove_other));
539
540 let create_prompt = create_event(vec!["PROMPT.md"]);
541 assert!(!is_restore_trigger_event(&create_prompt));
542 }
543}