ralph_workflow/files/protection/monitoring.rs
1//! Real-time file system monitoring for PROMPT.md protection.
2//!
3//! This module provides proactive monitoring to detect deletion attempts
4//! on PROMPT.md immediately, rather than waiting for periodic checks.
5//! It uses the `notify` crate for cross-platform file system events.
6//!
7//! # Effect System Exception
8//!
9//! This module uses `std::fs` directly rather than the `Workspace` trait.
10//! This is a documented exception to the effect system architecture because:
11//!
12//! 1. **Real-time filesystem monitoring**: The `notify` crate requires watching
13//! the actual filesystem for events (inotify, `FSEvents`, `ReadDirectoryChangesW`).
14//! 2. **Background thread operation**: The monitor runs in a separate thread
15//! that cannot share `PhaseContext` or workspace references.
16//! 3. **OS-level event handling**: File system events are inherently tied to
17//! the real filesystem, not an abstraction layer.
18//!
19//! This exception is documented in `docs/architecture/effect-system.md`.
20//!
21//! # Design
22//!
23//! The monitor runs in a background thread and watches for deletion events
24//! on PROMPT.md. When a deletion is detected, it's automatically restored
25//! from backup. The main thread can poll the monitor to check if any
26//! restoration events occurred.
27//!
28//! # Platform Support
29//!
30//! - **Unix/Linux**: inotify via `notify` crate
31//! - **macOS**: `FSEvents` via `notify` crate
32//! - **Windows**: `ReadDirectoryChangesW` via `notify` crate
33
34use std::fs;
35use std::fs::OpenOptions;
36use std::path::Path;
37use std::sync::atomic::{AtomicBool, Ordering};
38use std::sync::{Arc, Mutex};
39use std::thread;
40use std::time::Duration;
41
42const NOTIFY_EVENT_QUEUE_CAPACITY: usize = 1024;
43
44fn bounded_event_queue<T>() -> (std::sync::mpsc::SyncSender<T>, std::sync::mpsc::Receiver<T>) {
45 std::sync::mpsc::sync_channel(NOTIFY_EVENT_QUEUE_CAPACITY)
46}
47
48/// File system monitor for detecting PROMPT.md deletion events.
49///
50/// The monitor watches for deletion events and automatically restores
51/// PROMPT.md from backup when detected. Monitoring happens in a background
52/// thread, so the main thread is not blocked.
53///
54/// # Example
55///
56/// ```no_run
57/// # use ralph_workflow::files::protection::monitoring::PromptMonitor;
58/// let mut monitor = PromptMonitor::new().unwrap();
59/// monitor.start().unwrap();
60///
61/// // ... run pipeline phases ...
62///
63/// // Check if any restoration occurred
64/// if monitor.check_and_restore() {
65/// println!("PROMPT.md was restored!");
66/// }
67///
68/// monitor.stop();
69/// # Ok::<(), std::io::Error>(())
70/// ```
71pub struct PromptMonitor {
72 /// Flag indicating if PROMPT.md was deleted and restored
73 restoration_detected: Arc<AtomicBool>,
74 /// Flag to signal the monitor thread to stop
75 stop_signal: Arc<AtomicBool>,
76 /// Handle to the monitor thread (None if not started)
77 monitor_thread: Option<thread::JoinHandle<()>>,
78 /// Warnings emitted by the monitor thread.
79 ///
80 /// This avoids printing directly from library/background thread code.
81 warnings: Arc<Mutex<Vec<String>>>,
82}
83
84impl PromptMonitor {
85 /// Create a new file system monitor for PROMPT.md.
86 ///
87 /// Returns an error if the current directory cannot be accessed or
88 /// if PROMPT.md doesn't exist (we need to know what to watch for).
89 ///
90 /// # Errors
91 ///
92 /// Returns error if the operation fails.
93 pub fn new() -> std::io::Result<Self> {
94 // Verify we're in a valid directory with PROMPT.md
95 let prompt_path = Path::new("PROMPT.md");
96 if !prompt_path.exists() {
97 return Err(std::io::Error::new(
98 std::io::ErrorKind::NotFound,
99 "PROMPT.md does not exist - cannot monitor",
100 ));
101 }
102
103 Ok(Self {
104 restoration_detected: Arc::new(AtomicBool::new(false)),
105 stop_signal: Arc::new(AtomicBool::new(false)),
106 monitor_thread: None,
107 warnings: Arc::new(Mutex::new(Vec::new())),
108 })
109 }
110
111 /// Start monitoring PROMPT.md for deletion events.
112 ///
113 /// This spawns a background thread that watches for file system events.
114 /// Returns immediately; monitoring happens asynchronously.
115 ///
116 /// The monitor will automatically restore PROMPT.md from backup if
117 /// deletion is detected.
118 ///
119 /// # Errors
120 ///
121 /// Returns error if the operation fails.
122 pub fn start(&mut self) -> std::io::Result<()> {
123 if self.monitor_thread.is_some() {
124 return Err(std::io::Error::new(
125 std::io::ErrorKind::AlreadyExists,
126 "Monitor is already running",
127 ));
128 }
129
130 let restoration_flag = Arc::clone(&self.restoration_detected);
131 let stop_signal = Arc::clone(&self.stop_signal);
132 let warnings = Arc::clone(&self.warnings);
133
134 let handle = thread::spawn(move || {
135 Self::monitor_thread_main(&restoration_flag, &stop_signal, &warnings);
136 });
137
138 self.monitor_thread = Some(handle);
139 Ok(())
140 }
141
142 /// Background thread entry point for file system monitoring.
143 ///
144 /// This thread watches the current directory for deletion events on
145 /// PROMPT.md and restores from backup when detected.
146 fn monitor_thread_main(
147 restoration_detected: &Arc<AtomicBool>,
148 stop_signal: &Arc<AtomicBool>,
149 warnings: &Arc<Mutex<Vec<String>>>,
150 ) {
151 use notify::Watcher;
152
153 // Bounded queue for notify events.
154 //
155 // The notify crate can emit bursts of events under heavy filesystem activity.
156 // We cap the in-memory queue to avoid unbounded growth; when full, we drop
157 // events because PROMPT.md deletion protection is best-effort and repeated
158 // events are coalescable (the polling fallback also covers missed events).
159 let (tx, rx) = bounded_event_queue();
160 let event_sender = tx;
161
162 // Create a watcher for the current directory
163 let mut watcher = match notify::recommended_watcher(move |res| {
164 // Drop if full to keep memory bounded.
165 let _ = event_sender.try_send(res);
166 }) {
167 Ok(w) => w,
168 Err(e) => {
169 push_warning(
170 warnings,
171 format!(
172 "Failed to create file system watcher: {e}. Falling back to periodic polling for PROMPT.md protection."
173 ),
174 );
175 // Fallback to polling if watcher creation fails
176 Self::polling_monitor(restoration_detected, stop_signal);
177 return;
178 }
179 };
180
181 // Watch the current directory for events
182 if let Err(e) = watcher.watch(Path::new("."), notify::RecursiveMode::NonRecursive) {
183 push_warning(
184 warnings,
185 format!(
186 "Failed to watch current directory: {e}. Falling back to periodic polling for PROMPT.md protection."
187 ),
188 );
189 Self::polling_monitor(restoration_detected, stop_signal);
190 return;
191 }
192
193 // Process events until stop signal is received
194 let mut prompt_existed_last_check = true;
195
196 while !stop_signal.load(Ordering::Relaxed) {
197 // Check for events with a short timeout
198 match rx.recv_timeout(Duration::from_millis(100)) {
199 Ok(Ok(event)) => {
200 Self::handle_fs_event(
201 &event,
202 restoration_detected,
203 &mut prompt_existed_last_check,
204 );
205
206 // Drain any queued events to coalesce bursts.
207 while let Ok(next) = rx.try_recv() {
208 if let Ok(next_event) = next {
209 Self::handle_fs_event(
210 &next_event,
211 restoration_detected,
212 &mut prompt_existed_last_check,
213 );
214 }
215 }
216 }
217 Ok(Err(_)) | Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
218 // Error in watcher or timeout - continue anyway
219 }
220 Err(_) => {
221 // Channel disconnected - stop monitoring
222 break;
223 }
224 }
225 }
226 }
227
228 /// Handle a file system event from the watcher.
229 fn handle_fs_event(
230 event: ¬ify::Event,
231 restoration_detected: &Arc<AtomicBool>,
232 _prompt_existed_last_check: &mut bool,
233 ) {
234 for path in &event.paths {
235 if is_prompt_md_path(path) {
236 // Check for remove event
237 if matches!(event.kind, notify::EventKind::Remove(_)) {
238 // PROMPT.md was removed - restore it
239 if Self::restore_from_backup() {
240 restoration_detected.store(true, Ordering::Release);
241 }
242 }
243 }
244 }
245 }
246
247 /// Fallback polling-based monitor when file system watcher fails.
248 ///
249 /// Some filesystems (NFS, network drives) don't support file system
250 /// events. This fallback polls every 100ms to check if PROMPT.md exists.
251 fn polling_monitor(restoration_detected: &Arc<AtomicBool>, stop_signal: &Arc<AtomicBool>) {
252 let mut prompt_existed = Path::new("PROMPT.md").exists();
253
254 while !stop_signal.load(Ordering::Relaxed) {
255 thread::sleep(Duration::from_millis(100));
256
257 let prompt_exists_now = Path::new("PROMPT.md").exists();
258
259 // Detect deletion (transition from exists to not exists)
260 if prompt_existed && !prompt_exists_now && Self::restore_from_backup() {
261 restoration_detected.store(true, Ordering::Release);
262 }
263
264 prompt_existed = prompt_exists_now;
265 }
266 }
267
268 /// Restore PROMPT.md from backup.
269 ///
270 /// Tries backups in order:
271 /// - .agent/PROMPT.md.backup
272 /// - .agent/PROMPT.md.backup.1
273 /// - .agent/PROMPT.md.backup.2
274 ///
275 /// Returns true if restoration succeeded, false otherwise.
276 ///
277 /// Uses atomic open to avoid TOCTOU race conditions - opens and reads
278 /// the file in one operation rather than checking existence separately.
279 #[must_use]
280 pub fn restore_from_backup() -> bool {
281 let backup_paths = [
282 Path::new(".agent/PROMPT.md.backup"),
283 Path::new(".agent/PROMPT.md.backup.1"),
284 Path::new(".agent/PROMPT.md.backup.2"),
285 ];
286
287 let prompt_path = Path::new("PROMPT.md");
288
289 for backup_path in &backup_paths {
290 let Some(backup_content) = read_backup_content_secure(backup_path) else {
291 continue;
292 };
293
294 if backup_content.trim().is_empty() {
295 continue;
296 }
297
298 if restore_prompt_content_atomic(prompt_path, backup_content.as_bytes()).is_err() {
299 continue;
300 }
301
302 return true;
303 }
304
305 false
306 }
307
308 /// Check if any restoration events were detected and reset the flag.
309 ///
310 /// Returns true if PROMPT.md was deleted and restored since the last
311 /// check. This is a one-time check - the flag is reset after reading.
312 ///
313 /// # Example
314 ///
315 /// ```no_run
316 /// # use ralph_workflow::files::protection::monitoring::PromptMonitor;
317 /// # let mut monitor = PromptMonitor::new().unwrap();
318 /// # monitor.start().unwrap();
319 /// // After running some agent code
320 /// if monitor.check_and_restore() {
321 /// println!("PROMPT.md was restored during this phase!");
322 /// }
323 /// ```
324 #[must_use]
325 pub fn check_and_restore(&self) -> bool {
326 self.restoration_detected.swap(false, Ordering::AcqRel)
327 }
328
329 /// Drain any warnings produced by the monitor thread.
330 #[must_use]
331 pub fn drain_warnings(&self) -> Vec<String> {
332 drain_warnings(&self.warnings)
333 }
334
335 /// Stop monitoring and cleanup resources.
336 ///
337 /// Signals the monitor thread to stop and waits for it to complete.
338 #[must_use]
339 pub fn stop(mut self) -> Vec<String> {
340 // Signal the thread to stop
341 self.stop_signal.store(true, Ordering::Release);
342
343 // Wait for the thread to finish and check for panics
344 if let Some(handle) = self.monitor_thread.take() {
345 if let Err(panic_payload) = handle.join() {
346 // Thread panicked - extract and log panic message for diagnostics
347 // Try common panic payload types
348 let panic_msg = panic_payload
349 .downcast_ref::<String>()
350 .cloned()
351 .or_else(|| {
352 panic_payload
353 .downcast_ref::<&str>()
354 .map(ToString::to_string)
355 })
356 .or_else(|| {
357 panic_payload
358 .downcast_ref::<&String>()
359 .map(|s| (*s).clone())
360 })
361 .unwrap_or_else(|| {
362 // Fallback: Try to get any available information
363 format!(
364 "<unknown panic type: {}>",
365 std::any::type_name_of_val(&panic_payload)
366 )
367 });
368 push_warning(
369 &self.warnings,
370 format!("File monitoring thread panicked: {panic_msg}"),
371 );
372 }
373 }
374
375 self.drain_warnings()
376 }
377}
378
379include!("monitoring/helpers.rs");
380
381#[cfg(test)]
382mod tests {
383 include!("monitoring/tests.rs");
384}