// ralph_workflow/files/protection/monitoring.rs

1//! Real-time file system monitoring for PROMPT.md protection.
2//!
3//! This module provides proactive monitoring to detect deletion attempts
4//! on PROMPT.md immediately, rather than waiting for periodic checks.
5//! It uses the `notify` crate for cross-platform file system events.
6//!
7//! # Effect System Exception
8//!
9//! This module uses `std::fs` directly rather than the `Workspace` trait.
10//! This is a documented exception to the effect system architecture because:
11//!
12//! 1. **Real-time filesystem monitoring**: The `notify` crate requires watching
13//!    the actual filesystem for events (inotify, `FSEvents`, `ReadDirectoryChangesW`).
14//! 2. **Background thread operation**: The monitor runs in a separate thread
15//!    that cannot share `PhaseContext` or workspace references.
16//! 3. **OS-level event handling**: File system events are inherently tied to
17//!    the real filesystem, not an abstraction layer.
18//!
19//! This exception is documented in `docs/architecture/effect-system.md`.
20//!
21//! # Design
22//!
23//! The monitor runs in a background thread and watches for deletion events
24//! on PROMPT.md. When a deletion is detected, it's automatically restored
25//! from backup. The main thread can poll the monitor to check if any
26//! restoration events occurred.
27//!
28//! # Platform Support
29//!
30//! - **Unix/Linux**: inotify via `notify` crate
31//! - **macOS**: `FSEvents` via `notify` crate
32//! - **Windows**: `ReadDirectoryChangesW` via `notify` crate
33
34use std::fs;
35use std::fs::OpenOptions;
36use std::path::Path;
37use std::sync::atomic::{AtomicBool, Ordering};
38use std::sync::{Arc, Mutex};
39use std::thread;
40use std::time::Duration;
41
/// Maximum number of pending `notify` events buffered before senders drop new events.
const NOTIFY_EVENT_QUEUE_CAPACITY: usize = 1024;

/// Build the bounded channel used to ferry watcher events to the monitor loop.
///
/// The fixed capacity keeps memory usage bounded during event bursts; the
/// sending side is expected to use `try_send` and discard events when the
/// queue is full (deletion protection is best-effort and events coalesce).
fn bounded_event_queue<T>() -> (std::sync::mpsc::SyncSender<T>, std::sync::mpsc::Receiver<T>) {
    use std::sync::mpsc::sync_channel;
    sync_channel::<T>(NOTIFY_EVENT_QUEUE_CAPACITY)
}
47
/// File system monitor for detecting PROMPT.md deletion events.
///
/// The monitor watches for deletion events and automatically restores
/// PROMPT.md from backup when detected. Monitoring happens in a background
/// thread, so the main thread is not blocked.
///
/// # Example
///
/// ```no_run
/// # use ralph_workflow::files::protection::monitoring::PromptMonitor;
/// let mut monitor = PromptMonitor::new().unwrap();
/// monitor.start().unwrap();
///
/// // ... run pipeline phases ...
///
/// // Check if any restoration occurred
/// if monitor.check_and_restore() {
///     println!("PROMPT.md was restored!");
/// }
///
/// // `stop` returns any warnings collected by the monitor thread.
/// let _warnings = monitor.stop();
/// ```
pub struct PromptMonitor {
    /// Set to `true` by the monitor thread when PROMPT.md was deleted and
    /// successfully restored; read-and-reset via `check_and_restore`.
    restoration_detected: Arc<AtomicBool>,
    /// Flag to signal the monitor thread to stop (set by `stop`).
    stop_signal: Arc<AtomicBool>,
    /// Handle to the monitor thread (`None` until `start` is called).
    monitor_thread: Option<thread::JoinHandle<()>>,
    /// Warnings emitted by the monitor thread.
    ///
    /// This avoids printing directly from library/background thread code;
    /// callers drain them via `drain_warnings` or `stop`.
    warnings: Arc<Mutex<Vec<String>>>,
}
83
84impl PromptMonitor {
85    /// Create a new file system monitor for PROMPT.md.
86    ///
87    /// Returns an error if the current directory cannot be accessed or
88    /// if PROMPT.md doesn't exist (we need to know what to watch for).
89    ///
90    /// # Errors
91    ///
92    /// Returns error if the operation fails.
93    pub fn new() -> std::io::Result<Self> {
94        // Verify we're in a valid directory with PROMPT.md
95        let prompt_path = Path::new("PROMPT.md");
96        if !prompt_path.exists() {
97            return Err(std::io::Error::new(
98                std::io::ErrorKind::NotFound,
99                "PROMPT.md does not exist - cannot monitor",
100            ));
101        }
102
103        Ok(Self {
104            restoration_detected: Arc::new(AtomicBool::new(false)),
105            stop_signal: Arc::new(AtomicBool::new(false)),
106            monitor_thread: None,
107            warnings: Arc::new(Mutex::new(Vec::new())),
108        })
109    }
110
111    /// Start monitoring PROMPT.md for deletion events.
112    ///
113    /// This spawns a background thread that watches for file system events.
114    /// Returns immediately; monitoring happens asynchronously.
115    ///
116    /// The monitor will automatically restore PROMPT.md from backup if
117    /// deletion is detected.
118    ///
119    /// # Errors
120    ///
121    /// Returns error if the operation fails.
122    pub fn start(&mut self) -> std::io::Result<()> {
123        if self.monitor_thread.is_some() {
124            return Err(std::io::Error::new(
125                std::io::ErrorKind::AlreadyExists,
126                "Monitor is already running",
127            ));
128        }
129
130        let restoration_flag = Arc::clone(&self.restoration_detected);
131        let stop_signal = Arc::clone(&self.stop_signal);
132        let warnings = Arc::clone(&self.warnings);
133
134        let handle = thread::spawn(move || {
135            Self::monitor_thread_main(&restoration_flag, &stop_signal, &warnings);
136        });
137
138        self.monitor_thread = Some(handle);
139        Ok(())
140    }
141
142    /// Background thread entry point for file system monitoring.
143    ///
144    /// This thread watches the current directory for deletion events on
145    /// PROMPT.md and restores from backup when detected.
146    fn monitor_thread_main(
147        restoration_detected: &Arc<AtomicBool>,
148        stop_signal: &Arc<AtomicBool>,
149        warnings: &Arc<Mutex<Vec<String>>>,
150    ) {
151        use notify::Watcher;
152
153        // Bounded queue for notify events.
154        //
155        // The notify crate can emit bursts of events under heavy filesystem activity.
156        // We cap the in-memory queue to avoid unbounded growth; when full, we drop
157        // events because PROMPT.md deletion protection is best-effort and repeated
158        // events are coalescable (the polling fallback also covers missed events).
159        let (tx, rx) = bounded_event_queue();
160        let event_sender = tx;
161
162        // Create a watcher for the current directory
163        let mut watcher = match notify::recommended_watcher(move |res| {
164            // Drop if full to keep memory bounded.
165            let _ = event_sender.try_send(res);
166        }) {
167            Ok(w) => w,
168            Err(e) => {
169                push_warning(
170                    warnings,
171                    format!(
172                        "Failed to create file system watcher: {e}. Falling back to periodic polling for PROMPT.md protection."
173                    ),
174                );
175                // Fallback to polling if watcher creation fails
176                Self::polling_monitor(restoration_detected, stop_signal);
177                return;
178            }
179        };
180
181        // Watch the current directory for events
182        if let Err(e) = watcher.watch(Path::new("."), notify::RecursiveMode::NonRecursive) {
183            push_warning(
184                warnings,
185                format!(
186                    "Failed to watch current directory: {e}. Falling back to periodic polling for PROMPT.md protection."
187                ),
188            );
189            Self::polling_monitor(restoration_detected, stop_signal);
190            return;
191        }
192
193        // Process events until stop signal is received
194        let mut prompt_existed_last_check = true;
195
196        while !stop_signal.load(Ordering::Relaxed) {
197            // Check for events with a short timeout
198            match rx.recv_timeout(Duration::from_millis(100)) {
199                Ok(Ok(event)) => {
200                    Self::handle_fs_event(
201                        &event,
202                        restoration_detected,
203                        &mut prompt_existed_last_check,
204                    );
205
206                    // Drain any queued events to coalesce bursts.
207                    while let Ok(next) = rx.try_recv() {
208                        if let Ok(next_event) = next {
209                            Self::handle_fs_event(
210                                &next_event,
211                                restoration_detected,
212                                &mut prompt_existed_last_check,
213                            );
214                        }
215                    }
216                }
217                Ok(Err(_)) | Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
218                    // Error in watcher or timeout - continue anyway
219                }
220                Err(_) => {
221                    // Channel disconnected - stop monitoring
222                    break;
223                }
224            }
225        }
226    }
227
228    /// Handle a file system event from the watcher.
229    fn handle_fs_event(
230        event: &notify::Event,
231        restoration_detected: &Arc<AtomicBool>,
232        _prompt_existed_last_check: &mut bool,
233    ) {
234        for path in &event.paths {
235            if is_prompt_md_path(path) {
236                // Check for remove event
237                if matches!(event.kind, notify::EventKind::Remove(_)) {
238                    // PROMPT.md was removed - restore it
239                    if Self::restore_from_backup() {
240                        restoration_detected.store(true, Ordering::Release);
241                    }
242                }
243            }
244        }
245    }
246
247    /// Fallback polling-based monitor when file system watcher fails.
248    ///
249    /// Some filesystems (NFS, network drives) don't support file system
250    /// events. This fallback polls every 100ms to check if PROMPT.md exists.
251    fn polling_monitor(restoration_detected: &Arc<AtomicBool>, stop_signal: &Arc<AtomicBool>) {
252        let mut prompt_existed = Path::new("PROMPT.md").exists();
253
254        while !stop_signal.load(Ordering::Relaxed) {
255            thread::sleep(Duration::from_millis(100));
256
257            let prompt_exists_now = Path::new("PROMPT.md").exists();
258
259            // Detect deletion (transition from exists to not exists)
260            if prompt_existed && !prompt_exists_now && Self::restore_from_backup() {
261                restoration_detected.store(true, Ordering::Release);
262            }
263
264            prompt_existed = prompt_exists_now;
265        }
266    }
267
268    /// Restore PROMPT.md from backup.
269    ///
270    /// Tries backups in order:
271    /// - .agent/PROMPT.md.backup
272    /// - .agent/PROMPT.md.backup.1
273    /// - .agent/PROMPT.md.backup.2
274    ///
275    /// Returns true if restoration succeeded, false otherwise.
276    ///
277    /// Uses atomic open to avoid TOCTOU race conditions - opens and reads
278    /// the file in one operation rather than checking existence separately.
279    #[must_use]
280    pub fn restore_from_backup() -> bool {
281        let backup_paths = [
282            Path::new(".agent/PROMPT.md.backup"),
283            Path::new(".agent/PROMPT.md.backup.1"),
284            Path::new(".agent/PROMPT.md.backup.2"),
285        ];
286
287        let prompt_path = Path::new("PROMPT.md");
288
289        for backup_path in &backup_paths {
290            let Some(backup_content) = read_backup_content_secure(backup_path) else {
291                continue;
292            };
293
294            if backup_content.trim().is_empty() {
295                continue;
296            }
297
298            if restore_prompt_content_atomic(prompt_path, backup_content.as_bytes()).is_err() {
299                continue;
300            }
301
302            return true;
303        }
304
305        false
306    }
307
308    /// Check if any restoration events were detected and reset the flag.
309    ///
310    /// Returns true if PROMPT.md was deleted and restored since the last
311    /// check. This is a one-time check - the flag is reset after reading.
312    ///
313    /// # Example
314    ///
315    /// ```no_run
316    /// # use ralph_workflow::files::protection::monitoring::PromptMonitor;
317    /// # let mut monitor = PromptMonitor::new().unwrap();
318    /// # monitor.start().unwrap();
319    /// // After running some agent code
320    /// if monitor.check_and_restore() {
321    ///     println!("PROMPT.md was restored during this phase!");
322    /// }
323    /// ```
324    #[must_use]
325    pub fn check_and_restore(&self) -> bool {
326        self.restoration_detected.swap(false, Ordering::AcqRel)
327    }
328
329    /// Drain any warnings produced by the monitor thread.
330    #[must_use]
331    pub fn drain_warnings(&self) -> Vec<String> {
332        drain_warnings(&self.warnings)
333    }
334
335    /// Stop monitoring and cleanup resources.
336    ///
337    /// Signals the monitor thread to stop and waits for it to complete.
338    #[must_use]
339    pub fn stop(mut self) -> Vec<String> {
340        // Signal the thread to stop
341        self.stop_signal.store(true, Ordering::Release);
342
343        // Wait for the thread to finish and check for panics
344        if let Some(handle) = self.monitor_thread.take() {
345            if let Err(panic_payload) = handle.join() {
346                // Thread panicked - extract and log panic message for diagnostics
347                // Try common panic payload types
348                let panic_msg = panic_payload
349                    .downcast_ref::<String>()
350                    .cloned()
351                    .or_else(|| {
352                        panic_payload
353                            .downcast_ref::<&str>()
354                            .map(ToString::to_string)
355                    })
356                    .or_else(|| {
357                        panic_payload
358                            .downcast_ref::<&String>()
359                            .map(|s| (*s).clone())
360                    })
361                    .unwrap_or_else(|| {
362                        // Fallback: Try to get any available information
363                        format!(
364                            "<unknown panic type: {}>",
365                            std::any::type_name_of_val(&panic_payload)
366                        )
367                    });
368                push_warning(
369                    &self.warnings,
370                    format!("File monitoring thread panicked: {panic_msg}"),
371                );
372            }
373        }
374
375        self.drain_warnings()
376    }
377}
378
379include!("monitoring/helpers.rs");
380
381#[cfg(test)]
382mod tests {
383    include!("monitoring/tests.rs");
384}