// lcpfs 2026.1.102
//
// LCP File System - A ZFS-inspired copy-on-write filesystem for Rust
// Copyright 2025 LunaOS Contributors
// SPDX-License-Identifier: Apache-2.0

//! # Filesystem Event Notification
//!
//! Real-time event notification for filesystem changes, similar to Linux's inotify.
//!
//! ## Overview
//!
//! The notify module provides a way to monitor filesystem changes in real-time.
//! Applications can register watches on paths and receive callbacks or poll for
//! events when changes occur.
//!
//! ## Features
//!
//! - **Watch Registration**: Monitor specific paths for events
//! - **Recursive Watches**: Monitor entire directory trees
//! - **Event Filtering**: Select which event types to receive
//! - **Callbacks**: Receive events via callback functions
//! - **Polling**: Poll for events with optional timeout
//! - **Event Log**: Query historical events
//! - **Debouncing**: Coalesce rapid changes
//!
//! ## Event Types
//!
//! | Event | Description |
//! |-------|-------------|
//! | Create | File created |
//! | Modify | File content changed |
//! | Delete | File deleted |
//! | Rename | File moved/renamed |
//! | Attrib | Attributes changed |
//! | Open | File opened |
//! | Close | File closed |
//! | DirCreate | Directory created |
//! | DirDelete | Directory deleted |
//!
//! ## Usage
//!
//! ### Callback-based Watching
//!
//! ```rust,ignore
//! use lcpfs::notify::{remove_watch, watch, EventType, FsEvent};
//!
//! // Watch for file changes with a callback
//! let wd = watch(
//!     "tank/data",
//!     "/path/to/dir",
//!     &[EventType::Create, EventType::Delete, EventType::Modify],
//!     Box::new(|event: &FsEvent| {
//!         println!("{} on {}", event.event_type.as_str(), event.path);
//!     }),
//! )?;
//!
//! // Later, remove the watch
//! remove_watch(wd)?;
//! ```
//!
//! ### Polling for Events
//!
//! ```rust,ignore
//! use lcpfs::notify::{poll_events, emit_create};
//!
//! // Somewhere in the filesystem code:
//! emit_create("tank/data", "/new/file.txt", 12345, 100);
//!
//! // In your application:
//! let events = poll_events();
//! for event in events {
//!     println!("Event: {} on {}", event.event_type.as_str(), event.path);
//! }
//! ```
//!
//! ### Recursive Watching
//!
//! ```rust,ignore
//! use lcpfs::notify::{watch_recursive, EventType};
//!
//! // Watch an entire directory tree
//! let wd = watch_recursive(
//!     "tank/data",
//!     "/projects",
//!     &[EventType::Create, EventType::Modify, EventType::Delete],
//!     Box::new(|event| {
//!         // Receives events from /projects and all subdirectories
//!         println!("{}", event.path);
//!     }),
//! )?;
//! ```
//!
//! ### Event History
//!
//! ```rust,ignore
//! use lcpfs::notify::{get_last_events, get_events_since_txg};
//!
//! // Get the last 100 events
//! let recent = get_last_events(100);
//!
//! // Get events newer than a specific transaction group (that txg itself is excluded)
//! let since = get_events_since_txg(last_known_txg);
//! ```
//!
//! ## Integration Points
//!
//! The filesystem should call emit functions at appropriate points:
//!
//! ```rust,ignore
//! // In your write implementation:
//! fn write_file(...) {
//!     // ... actual write logic ...
//!
//!     if is_new_file {
//!         emit_create(dataset, path, object_id, txg);
//!     } else {
//!         emit_modify(dataset, path, object_id, size, txg);
//!     }
//! }
//!
//! // In your delete implementation:
//! fn delete_file(...) {
//!     // ... actual delete logic ...
//!     emit_delete(dataset, path, object_id, txg);
//! }
//! ```

mod emit;
mod types;
mod watch;

// Re-export types
pub use types::{EventMask, EventType, FsEvent, NotifyError, WatchDescriptor, WatchOptions};

// Re-export watch functions
pub use watch::{
    WatchCallback, WatchInfo, add_watch, get_watch_info, remove_watch, watch, watch_count,
    watch_recursive,
};

// Re-export emit functions
pub use emit::{
    EventLogStats, clear_event_log, clear_pending, emit_attrib, emit_create, emit_delete,
    emit_dir_create, emit_dir_delete, emit_event, emit_event_with_debounce, emit_modify,
    emit_rename, get_event_log_stats, get_events_for_path, get_events_since_timestamp,
    get_events_since_txg, get_last_events, is_emit_enabled, pending_count, poll_event, poll_events,
    set_debounce_interval, set_emit_enabled, set_event_log_max_size, wait_for_events,
};

// ═══════════════════════════════════════════════════════════════════════════════
// CONVENIENCE FUNCTIONS
// ═══════════════════════════════════════════════════════════════════════════════

/// Initialize the notify subsystem with custom settings.
///
/// This is optional - the subsystem initializes lazily. Call this to
/// customize settings before first use.
///
/// # Arguments
///
/// * `max_log_size` - Forwarded unchanged to [`set_event_log_max_size`].
/// * `debounce_ms` - Debounce interval in milliseconds. It is converted to
///   microseconds before being forwarded to [`set_debounce_interval`].
///
/// The millisecond-to-microsecond conversion saturates at `u64::MAX` instead
/// of overflowing, so a very large `debounce_ms` can neither panic (debug
/// builds) nor wrap around to a tiny interval (release builds).
pub fn init(max_log_size: usize, debounce_ms: u64) {
    const MICROS_PER_MILLI: u64 = 1_000;

    set_event_log_max_size(max_log_size);
    set_debounce_interval(debounce_ms.saturating_mul(MICROS_PER_MILLI));
}

/// Shut down the notify subsystem.
///
/// Performs three steps, in this order:
///
/// 1. Disables event emission via `set_emit_enabled(false)`.
/// 2. Clears pending events via `clear_pending()`.
/// 3. Clears the event log via `clear_event_log()`.
///
/// Emission is left disabled when this function returns; call
/// `set_emit_enabled(true)` to resume emitting events.
///
/// NOTE(review): nothing visible in this function removes registered
/// watches. Unless one of the helpers above does so internally, callers that
/// need their watches gone must still call `remove_watch` for each
/// descriptor - confirm against the `watch` module.
pub fn shutdown() {
    // Emission is switched off before clearing - presumably so no new events
    // are queued while the buffers are being emptied (TODO confirm).
    set_emit_enabled(false);
    clear_pending();
    clear_event_log();
}

// ═══════════════════════════════════════════════════════════════════════════════
// MODULE TESTS
// ═══════════════════════════════════════════════════════════════════════════════

#[cfg(test)]
mod tests {
    use super::*;
    use alloc::boxed::Box;
    use alloc::sync::Arc;
    use alloc::vec::Vec;
    use core::sync::atomic::{AtomicU64, Ordering};

    /// Bring the shared notify state to the baseline the tests expect:
    /// emission enabled, no debouncing, nothing pending, empty event log.
    fn setup() {
        set_emit_enabled(true);
        set_debounce_interval(0);
        clear_pending();
        clear_event_log();
    }

    /// Restores the global emit settings when dropped.
    ///
    /// Tests share global state, so a test that disables emission or changes
    /// the debounce interval must put both back even when one of its
    /// assertions panics; running the restore from `Drop` guarantees that.
    struct EmitStateGuard;

    impl Drop for EmitStateGuard {
        fn drop(&mut self) {
            set_emit_enabled(true);
            set_debounce_interval(0);
        }
    }

    /// The re-exported types are reachable through the module root.
    #[test]
    fn test_exports_accessible() {
        let _ = EventType::Create;
        let _ = EventMask::ALL;
        let _ = WatchOptions::default();
        let _ = FsEvent::new(EventType::Create, "tank", "/path");
    }

    /// A callback watch is invoked for a matching event.
    #[test]
    fn test_watch_and_emit() {
        setup();

        let counter = Arc::new(AtomicU64::new(0));
        let counter_clone = counter.clone();

        let callback: WatchCallback = Box::new(move |_event| {
            counter_clone.fetch_add(1, Ordering::SeqCst);
        });

        let wd = watch("tank/data", "/watched", &[EventType::Create], callback).unwrap();

        // Emit an event that matches
        emit_create("tank/data", "/watched/file.txt", 1, 100);
        let invocations = counter.load(Ordering::SeqCst);

        // Remove the watch before asserting so a failed assertion cannot
        // leave it registered in the shared global state.
        remove_watch(wd).unwrap();

        // Check callback was invoked
        assert_eq!(invocations, 1);
    }

    /// Emitted events show up in the event log and/or the pending queue.
    #[test]
    fn test_poll_integration() {
        setup();

        // Use a very unique dataset and txg range
        let ds = "poll_int_unique_7788998877";
        let base_txg = 99880077u64;
        emit_create(ds, "/poll_file1.txt", 1, base_txg);
        emit_modify(ds, "/poll_file2.txt", 2, 1024, base_txg + 1);

        // Check event log for our events (more reliable than poll_events
        // which may have been consumed by parallel tests)
        let logged = get_events_since_txg(base_txg);
        let our_logged: Vec<_> = logged.into_iter().filter(|e| e.dataset == ds).collect();

        // Also try poll_events
        let polled = poll_events();
        let our_polled: Vec<_> = polled.into_iter().filter(|e| e.dataset == ds).collect();

        // At least one of these should have our events
        let total = our_logged.len() + our_polled.len();
        assert!(
            total >= 1,
            "Expected at least 1 event in log or pending, got {} (log={}, polled={})",
            total,
            our_logged.len(),
            our_polled.len()
        );
    }

    /// `get_events_since_txg` returns only events newer than the given txg.
    #[test]
    fn test_event_log_integration() {
        setup();

        // Use unique txg range; the index doubles as the object id.
        for i in 0..5u64 {
            emit_create("log_int", &alloc::format!("/file{}.txt", i), i, 10000 + i);
        }

        // Get events with our txg range
        let since = get_events_since_txg(10002);
        let our_events: Vec<_> = since
            .into_iter()
            .filter(|e| e.dataset == "log_int")
            .collect();
        assert_eq!(our_events.len(), 2); // txg 10003 and 10004
    }

    /// A recursive watch receives events from every depth below its root.
    #[test]
    fn test_recursive_watch() {
        setup();

        let counter = Arc::new(AtomicU64::new(0));
        let counter_clone = counter.clone();

        let wd = watch_recursive(
            "tank",
            "/root",
            &[EventType::Create],
            Box::new(move |_| {
                counter_clone.fetch_add(1, Ordering::SeqCst);
            }),
        )
        .unwrap();

        // Events at various depths
        emit_create("tank", "/root/file.txt", 1, 100);
        emit_create("tank", "/root/sub/file.txt", 2, 101);
        emit_create("tank", "/root/sub/deep/file.txt", 3, 102);
        let invocations = counter.load(Ordering::SeqCst);

        // Clean up like the other watch tests do; previously this watch was
        // never removed and stayed registered for the rest of the test run.
        remove_watch(wd).unwrap();

        // All should match
        assert_eq!(invocations, 3);
    }

    /// Watch metadata is queryable while registered and gone after removal.
    #[test]
    fn test_watch_info() {
        setup();

        let wd = add_watch(
            "tank",
            "/path",
            EventMask::ALL,
            WatchOptions::recursive(),
            None,
        )
        .unwrap();

        let info = get_watch_info(wd).unwrap();
        assert_eq!(info.dataset, "tank");
        assert_eq!(info.path, "/path");
        assert!(info.recursive);
        assert!(info.active);

        remove_watch(wd).unwrap();
        assert!(get_watch_info(wd).is_none());
    }

    /// `init` leaves emission working; `shutdown` stops new events.
    #[test]
    fn test_init_shutdown() {
        // Note: Tests run in parallel with shared global state.
        // Use unique dataset and verify relative changes.
        let ds = "init_shutdown_test_unique_11223344";

        // Ensure we start in a known state for this test
        set_emit_enabled(true);
        set_debounce_interval(0);
        clear_pending();

        // Re-enables emit and resets the debounce interval for other tests
        // when this test ends, whether it passes or panics.
        let _restore = EmitStateGuard;

        init(5000, 100);

        // Verify emit works
        let before = pending_count();
        emit_create(ds, "/file.txt", 1, 100);
        let after_emit = pending_count();
        assert!(
            after_emit > before,
            "Event should have been emitted: before={}, after={}",
            before,
            after_emit
        );

        // Shutdown disables emit
        shutdown();

        // After shutdown, a new emit must not change the pending count
        let before_shutdown_emit = pending_count();
        emit_create(ds, "/file2.txt", 2, 101);
        let after_shutdown_emit = pending_count();

        // The count should stay the same (no new events added)
        assert_eq!(
            after_shutdown_emit, before_shutdown_emit,
            "No events should be emitted after shutdown"
        );
    }
}