Skip to main content

ralph_workflow/files/monitoring/
io.rs

1// Boundary module: direct filesystem I/O and threading for PROMPT.md monitoring.
2// File named io.rs — recognized as boundary module by forbid_io_effects lint.
3//
4// Provides proactive monitoring to detect deletion attempts on PROMPT.md
5// immediately, rather than waiting for periodic checks.
6// Uses the `notify` crate for cross-platform file system events.
7//
8// Effect System Exception: uses `std::fs` directly (not Workspace trait)
9// because the notify crate requires watching the actual filesystem and the
10// monitor runs in a background thread that cannot share PhaseContext.
11// Documented in `docs/architecture/effect-system.md`.
12
13use std::fs;
14use std::fs::OpenOptions;
15use std::path::Path;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18use std::thread;
19use std::time::Duration;
20
/// Upper bound on the number of messages each queue built by
/// `bounded_event_queue` can hold before `try_send` starts failing.
const NOTIFY_EVENT_QUEUE_CAPACITY: usize = 1024;

/// Create a bounded channel that buffers at most
/// `NOTIFY_EVENT_QUEUE_CAPACITY` messages of type `T`.
fn bounded_event_queue<T>() -> (std::sync::mpsc::SyncSender<T>, std::sync::mpsc::Receiver<T>) {
    let (sender, receiver) = std::sync::mpsc::sync_channel::<T>(NOTIFY_EVENT_QUEUE_CAPACITY);
    (sender, receiver)
}
26
/// Background guard that restores PROMPT.md from backup when it is deleted.
///
/// The struct only holds shared state: two atomic flags, the handle of the
/// background thread once one has been started, and both ends of a bounded
/// warning queue. The watching itself runs on that background thread, so the
/// thread that owns the monitor is not blocked.
///
/// # Example
///
/// ```no_run
/// # use ralph_workflow::files::monitoring::PromptMonitor;
/// let mut monitor = PromptMonitor::new().unwrap();
/// monitor.start().unwrap();
///
/// // ... run pipeline phases ...
///
/// // Check if any restoration occurred
/// if monitor.check_and_restore() {
///     println!("PROMPT.md was restored!");
/// }
///
/// let _warnings = monitor.stop();
/// # Ok::<(), std::io::Error>(())
/// ```
pub struct PromptMonitor {
    /// Set by the monitor thread after PROMPT.md was restored from backup.
    restoration_detected: Arc<AtomicBool>,
    /// Set to ask the monitor thread to exit.
    stop_signal: Arc<AtomicBool>,
    /// Join handle of the monitor thread (`None` while no thread is running).
    monitor_thread: Option<thread::JoinHandle<()>>,
    /// Sending half of the warning queue; cloned into the monitor thread.
    warnings_tx: std::sync::mpsc::SyncSender<String>,
    /// Receiving half of the warning queue, drained by the owner.
    warnings_rx: std::sync::mpsc::Receiver<String>,
}
60
61impl PromptMonitor {
62    /// Create a new file system monitor for PROMPT.md.
63    ///
64    /// Returns an error if the current directory cannot be accessed or
65    /// if PROMPT.md doesn't exist (we need to know what to watch for).
66    ///
67    /// # Errors
68    ///
69    /// Returns error if the operation fails.
70    pub fn new() -> std::io::Result<Self> {
71        // Verify we're in a valid directory with PROMPT.md
72        let prompt_path = Path::new("PROMPT.md");
73        if !prompt_path.exists() {
74            return Err(std::io::Error::new(
75                std::io::ErrorKind::NotFound,
76                "PROMPT.md does not exist - cannot monitor",
77            ));
78        }
79
80        let (warnings_tx, warnings_rx) = bounded_event_queue();
81
82        Ok(Self {
83            restoration_detected: Arc::new(AtomicBool::new(false)),
84            stop_signal: Arc::new(AtomicBool::new(false)),
85            monitor_thread: None,
86            warnings_tx,
87            warnings_rx,
88        })
89    }
90
91    /// Start monitoring PROMPT.md for deletion events.
92    ///
93    /// This spawns a background thread that watches for file system events.
94    /// Returns immediately; monitoring happens asynchronously.
95    ///
96    /// The monitor will automatically restore PROMPT.md from backup if
97    /// deletion is detected.
98    ///
99    /// # Errors
100    ///
101    /// Returns error if the operation fails.
102    pub fn start(&mut self) -> std::io::Result<()> {
103        if self.monitor_thread.is_some() {
104            return Err(std::io::Error::new(
105                std::io::ErrorKind::AlreadyExists,
106                "Monitor is already running",
107            ));
108        }
109
110        let restoration_flag = Arc::clone(&self.restoration_detected);
111        let stop_signal = Arc::clone(&self.stop_signal);
112        let warnings = self.warnings_tx.clone();
113
114        let handle = thread::spawn(move || {
115            Self::monitor_thread_main(&restoration_flag, &stop_signal, warnings);
116        });
117
118        self.monitor_thread = Some(handle);
119        Ok(())
120    }
121
122    /// Background thread entry point for file system monitoring.
123    ///
124    /// Sets up a watcher and runs the event loop, falling back to polling
125    /// if the watcher cannot be created.
126    fn monitor_thread_main(
127        restoration_detected: &Arc<AtomicBool>,
128        stop_signal: &Arc<AtomicBool>,
129        warnings: std::sync::mpsc::SyncSender<String>,
130    ) {
131        // Bounded queue: caps memory on event bursts; drops are safe because
132        // PROMPT.md deletion protection is best-effort and events are coalescable.
133        let (tx, rx) = bounded_event_queue();
134        match setup_directory_watcher(tx) {
135            Ok(_watcher) => run_watcher_event_loop(&rx, restoration_detected, stop_signal),
136            Err(e) => {
137                push_warning(&warnings, watcher_setup_error_message(&e));
138                Self::polling_monitor(restoration_detected, stop_signal);
139            }
140        }
141    }
142
143    /// Handle a file system event from the watcher.
144    fn handle_fs_event(event: &notify::Event, restoration_detected: &Arc<AtomicBool>) {
145        if is_restore_trigger_event(event) && Self::restore_from_backup() {
146            restoration_detected.store(true, Ordering::Release);
147        }
148    }
149
150    /// Fallback polling-based monitor when file system watcher fails.
151    ///
152    /// Some filesystems (NFS, network drives) don't support file system
153    /// events. This fallback polls every 100ms to check if PROMPT.md exists.
154    fn polling_monitor(restoration_detected: &Arc<AtomicBool>, stop_signal: &Arc<AtomicBool>) {
155        let previous_exists = AtomicBool::new(Path::new("PROMPT.md").exists());
156
157        std::iter::from_fn(|| {
158            if stop_signal.load(Ordering::Relaxed) {
159                return None;
160            }
161
162            thread::sleep(Duration::from_millis(100));
163            Some(Path::new("PROMPT.md").exists())
164        })
165        .for_each(|current_exists| {
166            let previous = previous_exists.swap(current_exists, Ordering::AcqRel);
167            if previous && !current_exists && Self::restore_from_backup() {
168                restoration_detected.store(true, Ordering::Release);
169            }
170        });
171    }
172
173    /// Restore PROMPT.md from backup.
174    ///
175    /// Tries backups in order:
176    /// - .agent/PROMPT.md.backup
177    /// - .agent/PROMPT.md.backup.1
178    /// - .agent/PROMPT.md.backup.2
179    ///
180    /// Returns true if restoration succeeded, false otherwise.
181    ///
182    /// Uses atomic open to avoid TOCTOU race conditions - opens and reads
183    /// the file in one operation rather than checking existence separately.
184    #[must_use]
185    pub fn restore_from_backup() -> bool {
186        let backup_paths = [
187            Path::new(".agent/PROMPT.md.backup"),
188            Path::new(".agent/PROMPT.md.backup.1"),
189            Path::new(".agent/PROMPT.md.backup.2"),
190        ];
191
192        let prompt_path = Path::new("PROMPT.md");
193
194        backup_paths
195            .iter()
196            .filter_map(|backup_path| read_backup_content_secure(backup_path))
197            .filter(|backup_content| !backup_content.trim().is_empty())
198            .any(|backup_content| {
199                restore_prompt_content_atomic(prompt_path, backup_content.as_bytes()).is_ok()
200            })
201    }
202
203    /// Check if any restoration events were detected and reset the flag.
204    ///
205    /// Returns true if PROMPT.md was deleted and restored since the last
206    /// check. This is a one-time check - the flag is reset after reading.
207    ///
208    /// # Example
209    ///
210    /// ```no_run
211    /// # use ralph_workflow::files::monitoring::PromptMonitor;
212    /// # let mut monitor = PromptMonitor::new().unwrap();
213    /// # monitor.start().unwrap();
214    /// // After running some agent code
215    /// if monitor.check_and_restore() {
216    ///     println!("PROMPT.md was restored during this phase!");
217    /// }
218    /// ```
219    #[must_use]
220    pub fn check_and_restore(&self) -> bool {
221        self.restoration_detected.swap(false, Ordering::AcqRel)
222    }
223
224    /// Drain any warnings produced by the monitor thread.
225    #[must_use]
226    pub fn drain_warnings(&self) -> Vec<String> {
227        drain_warnings(&self.warnings_rx)
228    }
229
230    /// Stop monitoring and cleanup resources.
231    ///
232    /// Signals the monitor thread to stop and waits for it to complete.
233    #[must_use]
234    pub fn stop(mut self) -> Vec<String> {
235        self.stop_signal.store(true, Ordering::Release);
236
237        if let Some(handle) = self.monitor_thread.take() {
238            if let Err(panic_payload) = handle.join() {
239                push_warning(
240                    &self.warnings_tx,
241                    format!(
242                        "File monitoring thread panicked: {}",
243                        extract_panic_message(panic_payload)
244                    ),
245                );
246            }
247        }
248
249        self.drain_warnings()
250    }
251}
252
253enum MonitorSetupError {
254    Create(notify::Error),
255    Watch(notify::Error),
256}
257
258/// Return a human-readable warning for a watcher setup failure.
259fn watcher_setup_error_message(err: &MonitorSetupError) -> String {
260    match err {
261        MonitorSetupError::Create(e) => format!(
262            "Failed to create file system watcher: {e}. Falling back to periodic polling for PROMPT.md protection."
263        ),
264        MonitorSetupError::Watch(e) => format!(
265            "Failed to watch current directory: {e}. Falling back to periodic polling for PROMPT.md protection."
266        ),
267    }
268}
269
270/// Drive the watcher event loop until `stop_signal` is set or the channel disconnects.
271fn run_watcher_event_loop(
272    rx: &std::sync::mpsc::Receiver<notify::Result<notify::Event>>,
273    restoration_detected: &Arc<AtomicBool>,
274    stop_signal: &Arc<AtomicBool>,
275) {
276    std::iter::from_fn(|| {
277        if stop_signal.load(Ordering::Relaxed) {
278            return None;
279        }
280        Some(rx.recv_timeout(Duration::from_millis(100)))
281    })
282    .take_while(|received| {
283        !matches!(
284            received,
285            Err(std::sync::mpsc::RecvTimeoutError::Disconnected)
286        )
287    })
288    .for_each(|received| {
289        if let Ok(Ok(event)) = received {
290            PromptMonitor::handle_fs_event(&event, restoration_detected);
291            // Drain queued events to coalesce bursts.
292            std::iter::from_fn(|| rx.try_recv().ok())
293                .filter_map(Result::ok)
294                .for_each(|next_event| {
295                    PromptMonitor::handle_fs_event(&next_event, restoration_detected);
296                });
297        }
298    });
299}
300
/// Turn a thread panic payload into displayable text.
///
/// Payloads holding a `String`, a `&str` or a `&String` yield their text;
/// anything else yields an `<unknown panic type: …>` placeholder.
fn extract_panic_message(payload: Box<dyn std::any::Any + Send>) -> String {
    if let Some(owned) = payload.downcast_ref::<String>() {
        return owned.clone();
    }
    if let Some(literal) = payload.downcast_ref::<&str>() {
        return literal.to_string();
    }
    if let Some(borrowed) = payload.downcast_ref::<&String>() {
        return (*borrowed).clone();
    }

    format!(
        "<unknown panic type: {}>",
        std::any::type_name_of_val(&payload)
    )
}
315
316fn setup_directory_watcher(
317    event_sender: std::sync::mpsc::SyncSender<notify::Result<notify::Event>>,
318) -> std::result::Result<notify::RecommendedWatcher, MonitorSetupError> {
319    notify::recommended_watcher(move |res| {
320        // Drop if full to keep memory bounded.
321        let _ = event_sender.try_send(res);
322    })
323    .map_err(MonitorSetupError::Create)
324    .and_then(|watcher| {
325        watcher
326            .with_current_directory_watch()
327            .map_err(MonitorSetupError::Watch)
328    })
329}
330
331trait WatcherRegistrationExt {
332    fn with_current_directory_watch(self) -> notify::Result<Self>
333    where
334        Self: Sized;
335}
336
337impl WatcherRegistrationExt for notify::RecommendedWatcher {
338    fn with_current_directory_watch(mut self) -> notify::Result<Self> {
339        use notify::Watcher;
340
341        self.watch(Path::new("."), notify::RecursiveMode::NonRecursive)?;
342        Ok(self)
343    }
344}
345
346// ============================================================================
347// Helper functions (boundary module - mutation and I/O permitted)
348// ============================================================================
349
/// Queue `warning` without blocking.
///
/// The warning is discarded when the queue is full or the receiver is gone.
fn push_warning(warnings: &std::sync::mpsc::SyncSender<String>, warning: String) {
    drop(warnings.try_send(warning));
}
353
/// Collect every warning currently queued, without blocking.
///
/// Returns an empty vector when nothing is pending.
fn drain_warnings(warnings: &std::sync::mpsc::Receiver<String>) -> Vec<String> {
    warnings.try_iter().collect()
}
357
358fn read_backup_content_secure(path: &Path) -> Option<String> {
359    // Defense-in-depth against symlink/hardlink attacks:
360    // - Reject symlink backups (symlink_metadata)
361    // - On Unix, open with O_NOFOLLOW and reject nlink != 1
362    // - Ensure it's a regular file
363    #[cfg(unix)]
364    {
365        use std::os::unix::fs::{MetadataExt, OpenOptionsExt};
366
367        let file = OpenOptions::new()
368            .read(true)
369            .custom_flags(libc::O_NOFOLLOW)
370            .open(path)
371            .ok()?;
372
373        let metadata = file.metadata().ok()?;
374        if !metadata.is_file() {
375            return None;
376        }
377        if metadata.nlink() != 1 {
378            return None;
379        }
380
381        std::io::read_to_string(file).ok()
382    }
383
384    #[cfg(not(unix))]
385    {
386        let meta = fs::symlink_metadata(path).ok()?;
387        if meta.file_type().is_symlink() {
388            return None;
389        }
390        if !meta.is_file() {
391            return None;
392        }
393
394        std::fs::read_to_string(path).ok()
395    }
396}
397
/// Reject `path` when it currently is a directory.
///
/// A missing path, a path whose metadata cannot be read, and any
/// non-directory entry are all accepted.
fn ensure_not_directory(path: &Path) -> std::io::Result<()> {
    match fs::symlink_metadata(path) {
        Ok(info) if info.is_dir() => Err(std::io::Error::other("PROMPT.md path is a directory")),
        Ok(_) | Err(_) => Ok(()),
    }
}
407
/// Create `path` as a brand-new file, write `content` to it and fsync it.
///
/// The file is created exclusively: if anything already exists at `path`
/// the call fails with [`std::io::ErrorKind::AlreadyExists`] instead of
/// overwriting it, so a pre-planted file at the temp name is never written
/// into. If writing fails after the file was created, the partial file is
/// removed again.
///
/// # Errors
///
/// Returns the error from creating the file or from writing `content`.
/// A failing fsync is ignored (durability is best-effort).
fn write_and_sync_temp(path: &Path, content: &[u8]) -> std::io::Result<()> {
    use std::io::Write;

    let mut file = OpenOptions::new()
        .write(true)
        .create_new(true)
        .open(path)?;

    if let Err(write_err) = file.write_all(content) {
        // Close the handle first, then make sure no truncated file is left behind.
        drop(file);
        let _ = fs::remove_file(path);
        return Err(write_err);
    }

    // Sync through the handle that did the write; failure here is tolerated.
    let _ = file.sync_all();
    Ok(())
}
417
/// Mark the file at `path` read-only.
///
/// On Unix the mode is set to `0o444`; on Windows the read-only attribute is
/// set. On other targets nothing is changed.
fn make_file_readonly(path: &Path) -> std::io::Result<()> {
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;

        let read_only = <fs::Permissions as PermissionsExt>::from_mode(0o444);
        fs::set_permissions(path, read_only)?;
    }

    #[cfg(windows)]
    {
        let mut permissions = fs::metadata(path)?.permissions();
        permissions.set_readonly(true);
        fs::set_permissions(path, permissions)?;
    }

    Ok(())
}
438
/// Move `src` onto `dest` with a rename; if the rename fails, delete `src`
/// (best-effort) and return the rename error.
fn rename_or_cleanup(src: &Path, dest: &Path) -> std::io::Result<()> {
    // Windows only: remove an existing destination first so the rename below
    // does not have to replace it.
    #[cfg(windows)]
    if dest.exists() {
        let _ = fs::remove_file(dest);
    }

    // Rename is symlink-safe: it replaces the directory entry rather than
    // following a symlink target.
    match fs::rename(src, dest) {
        Ok(()) => Ok(()),
        Err(rename_err) => {
            let _ = fs::remove_file(src);
            Err(rename_err)
        }
    }
}
453
454fn restore_prompt_content_atomic(prompt_path: &Path, content: &[u8]) -> std::io::Result<()> {
455    ensure_not_directory(prompt_path)?;
456    let temp_name = unique_temp_name();
457    let temp_path = Path::new(&temp_name);
458    write_and_sync_temp(temp_path, content)?;
459    make_file_readonly(temp_path)?;
460    rename_or_cleanup(temp_path, prompt_path)
461}
462
/// Build a temp file name from the process id and the current time in
/// nanoseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, `0` is used for the
/// time component.
fn unique_temp_name() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let pid = std::process::id();
    let nanos = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    };
    format!(".prompt_restore_tmp_{pid}_{nanos}")
}
473
/// Whether the final component of `path` is exactly `PROMPT.md`.
fn is_prompt_md_path(path: &Path) -> bool {
    path.file_name().is_some_and(|name| name == "PROMPT.md")
}
477
478fn is_restore_trigger_event(event: &notify::Event) -> bool {
479    matches!(event.kind, notify::EventKind::Remove(_))
480        && event.paths.iter().any(|path| is_prompt_md_path(path))
481}
482
483impl Drop for PromptMonitor {
484    fn drop(&mut self) {
485        // Signal the thread to stop when dropped
486        self.stop_signal.store(true, Ordering::Release);
487
488        // Take the handle and let it finish on its own
489        // (we can't wait in Drop because we might be panicking)
490        let _ = self.monitor_thread.take();
491    }
492}
493
494// Tests are in tests/system_tests/file_protection/
495
#[cfg(test)]
mod tests {
    use super::{drain_warnings, is_restore_trigger_event, push_warning};
    use std::path::PathBuf;

    /// Build an event of the given kind carrying `paths`, in order.
    fn event_with_paths(kind: notify::EventKind, paths: Vec<&str>) -> notify::Event {
        let mut event = notify::Event::new(kind);
        for path in paths {
            event = event.add_path(PathBuf::from(path));
        }
        event
    }

    fn remove_event(paths: Vec<&str>) -> notify::Event {
        event_with_paths(
            notify::EventKind::Remove(notify::event::RemoveKind::Any),
            paths,
        )
    }

    fn create_event(paths: Vec<&str>) -> notify::Event {
        event_with_paths(
            notify::EventKind::Create(notify::event::CreateKind::Any),
            paths,
        )
    }

    #[test]
    fn drain_warnings_clears_buffer_after_read() {
        let (warnings_tx, warnings_rx) = std::sync::mpsc::sync_channel::<String>(16);

        for text in ["first warning", "second warning"] {
            push_warning(&warnings_tx, text.to_string());
        }

        let first_drain = drain_warnings(&warnings_rx);
        assert_eq!(first_drain.len(), 2);

        let second_drain = drain_warnings(&warnings_rx);
        assert!(
            second_drain.is_empty(),
            "warnings should be cleared after drain"
        );
    }

    #[test]
    fn restore_trigger_event_requires_remove_kind_and_prompt_path() {
        // Removal of PROMPT.md is the only combination that triggers.
        assert!(is_restore_trigger_event(&remove_event(vec!["PROMPT.md"])));

        // Right kind, wrong path.
        assert!(!is_restore_trigger_event(&remove_event(vec!["README.md"])));

        // Right path, wrong kind.
        assert!(!is_restore_trigger_event(&create_event(vec!["PROMPT.md"])));
    }
}