use alloc::collections::VecDeque;
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use lazy_static::lazy_static;
use spin::Mutex;
use super::types::{EventMask, EventType, FsEvent, NotifyError};
use super::watch::dispatch_to_watches;
lazy_static! {
    /// Ring-buffer log of recently emitted events (capacity 10_000).
    static ref EVENT_LOG: Mutex<EventLog> = Mutex::new(EventLog::new(10000));
    /// FIFO queue of events awaiting delivery via `poll_events`/`poll_event`.
    static ref PENDING_EVENTS: Mutex<VecDeque<FsEvent>> = Mutex::new(VecDeque::with_capacity(1000));
    /// Global on/off switch consulted by `emit_event_with_debounce`.
    static ref EMIT_ENABLED: Mutex<bool> = Mutex::new(true);
    /// Per-(dataset, path, event-type) timestamps used for debouncing.
    static ref DEBOUNCE_STATE: Mutex<DebounceState> = Mutex::new(DebounceState::new());
}
/// Bounded in-memory log of filesystem events.
///
/// Behaves as a ring buffer: once `max_size` events are stored, pushing a
/// new event evicts the oldest one (see `EventLog::push`).
pub struct EventLog {
    // Stored events, oldest at the front.
    events: VecDeque<FsEvent>,
    // Maximum number of events retained at once.
    max_size: usize,
    // Count of all events ever pushed, including ones since evicted.
    total_events: u64,
    // Count of events evicted to make room for newer ones.
    dropped_events: u64,
}
impl EventLog {
    /// Creates a log that retains at most `max_size` events.
    ///
    /// The initial allocation is capped at 10_000 slots so an enormous
    /// `max_size` does not eagerly reserve memory; the deque still grows
    /// on demand up to `max_size`.
    pub fn new(max_size: usize) -> Self {
        Self {
            events: VecDeque::with_capacity(max_size.min(10000)),
            max_size,
            total_events: 0,
            dropped_events: 0,
        }
    }

    /// Appends an event, evicting the oldest entry when the log is full.
    ///
    /// `total_events` counts every pushed event; evicted (or unstorable)
    /// events are additionally counted in `dropped_events`.
    pub fn push(&mut self, event: FsEvent) {
        self.total_events += 1;
        // Fix: a zero-capacity log must not store anything. The previous
        // code popped from an (empty) deque, bumped `dropped_events`
        // spuriously, and then stored the event anyway, breaking the
        // `events.len() <= max_size` invariant.
        if self.max_size == 0 {
            self.dropped_events += 1;
            return;
        }
        if self.events.len() >= self.max_size {
            self.events.pop_front();
            self.dropped_events += 1;
        }
        self.events.push_back(event);
    }

    /// Returns up to the last `n` events, oldest first.
    pub fn last(&self, n: usize) -> Vec<&FsEvent> {
        let start = self.events.len().saturating_sub(n);
        self.events.iter().skip(start).collect()
    }

    /// Events whose transaction group is strictly greater than `txg`.
    pub fn since_txg(&self, txg: u64) -> Vec<&FsEvent> {
        self.events.iter().filter(|e| e.txg > txg).collect()
    }

    /// Events with a timestamp strictly after `ts`.
    pub fn since_timestamp(&self, ts: u64) -> Vec<&FsEvent> {
        self.events.iter().filter(|e| e.timestamp > ts).collect()
    }

    /// Events exactly matching both `dataset` and `path`.
    pub fn for_path(&self, dataset: &str, path: &str) -> Vec<&FsEvent> {
        self.events
            .iter()
            .filter(|e| e.dataset == dataset && e.path == path)
            .collect()
    }

    /// Events whose type is included in `mask`.
    pub fn matching(&self, mask: EventMask) -> Vec<&FsEvent> {
        self.events
            .iter()
            .filter(|e| mask.contains(e.event_type))
            .collect()
    }

    /// Events in `dataset` whose path starts with `prefix`.
    pub fn in_prefix(&self, dataset: &str, prefix: &str) -> Vec<&FsEvent> {
        self.events
            .iter()
            .filter(|e| e.dataset == dataset && e.path.starts_with(prefix))
            .collect()
    }

    /// Snapshot of the log's size and cumulative counters.
    pub fn stats(&self) -> EventLogStats {
        EventLogStats {
            stored_events: self.events.len(),
            max_size: self.max_size,
            total_events: self.total_events,
            dropped_events: self.dropped_events,
        }
    }

    /// Removes all stored events. The cumulative `total_events` and
    /// `dropped_events` counters are intentionally left untouched so
    /// historical statistics survive a clear.
    pub fn clear(&mut self) {
        self.events.clear();
    }

    /// Changes the retention limit, evicting oldest events if the log
    /// currently exceeds the new limit.
    pub fn set_max_size(&mut self, max_size: usize) {
        self.max_size = max_size;
        while self.events.len() > max_size {
            self.events.pop_front();
            self.dropped_events += 1;
        }
    }
}
/// Point-in-time snapshot of `EventLog` counters, as returned by
/// `EventLog::stats` / `get_event_log_stats`.
#[derive(Debug, Clone)]
pub struct EventLogStats {
    /// Events currently held in the log.
    pub stored_events: usize,
    /// Retention limit of the log at snapshot time.
    pub max_size: usize,
    /// All events ever pushed, including evicted ones.
    pub total_events: u64,
    /// Events evicted (dropped) to stay within `max_size`.
    pub dropped_events: u64,
}
/// Tracks the last-accepted timestamp per (dataset, path, event type)
/// so rapid duplicate events can be suppressed.
struct DebounceState {
    // Last accepted timestamp for each (dataset, path, type) key.
    last_events: hashbrown::HashMap<(String, String, EventType), u64>,
    // Interval applied when the caller does not supply one (0 = disabled).
    default_interval_us: u64,
}
impl DebounceState {
    /// Upper bound on distinct (dataset, path, event-type) keys tracked.
    ///
    /// Fix: without a cap, `last_events` grows forever — one entry per
    /// unique key ever seen — which is a slow memory leak in a
    /// long-running system.
    const MAX_TRACKED_KEYS: usize = 4096;

    fn new() -> Self {
        Self {
            last_events: hashbrown::HashMap::new(),
            // 0 means debouncing is disabled unless a per-call interval
            // is supplied.
            default_interval_us: 0,
        }
    }

    /// Returns `true` when `event` should be suppressed because an event
    /// with the same (dataset, path, type) was accepted less than
    /// `interval_us` microseconds ago. Accepted events have their
    /// timestamp recorded; suppressed events do not refresh the window.
    fn should_debounce(&mut self, event: &FsEvent, interval_us: u64) -> bool {
        if interval_us == 0 {
            return false;
        }
        let key = (event.dataset.clone(), event.path.clone(), event.event_type);
        if let Some(&last_ts) = self.last_events.get(&key) {
            // saturating_sub guards against out-of-order timestamps.
            if event.timestamp.saturating_sub(last_ts) < interval_us {
                return true;
            }
        }
        // Coarse eviction: if tracking a brand-new key would exceed the
        // cap, forget all history. Losing history only means some events
        // that would have been debounced pass through (extra
        // notifications), which is safe — unlike unbounded growth.
        if self.last_events.len() >= Self::MAX_TRACKED_KEYS
            && !self.last_events.contains_key(&key)
        {
            self.last_events.clear();
        }
        self.last_events.insert(key, event.timestamp);
        false
    }

    /// Sets the interval used when `emit_event` is called without an
    /// explicit per-call debounce interval.
    fn set_default_interval(&mut self, us: u64) {
        self.default_interval_us = us;
    }

    /// Forgets all recorded timestamps.
    fn clear(&mut self) {
        self.last_events.clear();
    }
}
/// Emits a filesystem event using the globally configured default
/// debounce interval (see `set_debounce_interval`).
pub fn emit_event(event: FsEvent) {
    // A per-call interval of 0 defers to the configured default.
    emit_event_with_debounce(event, 0);
}
/// Emits a filesystem event with an explicit debounce interval.
///
/// Processing order: the global enable flag is checked first, then the
/// debounce filter; surviving events are appended to the event log,
/// queued for polling (unless the queue is full), and finally dispatched
/// to registered watches. Each lock is held only for its own step.
pub fn emit_event_with_debounce(event: FsEvent, debounce_us: u64) {
    /// Hard cap on the pending queue; events beyond it are silently
    /// dropped from the queue (they are still logged and dispatched).
    const MAX_PENDING: usize = 10_000;

    if !*EMIT_ENABLED.lock() {
        return;
    }

    let suppressed = {
        let mut state = DEBOUNCE_STATE.lock();
        // A per-call interval of 0 falls back to the configured default.
        let interval = if debounce_us > 0 {
            debounce_us
        } else {
            state.default_interval_us
        };
        state.should_debounce(&event, interval)
    };
    if suppressed {
        return;
    }

    EVENT_LOG.lock().push(event.clone());

    {
        let mut queue = PENDING_EVENTS.lock();
        if queue.len() < MAX_PENDING {
            queue.push_back(event.clone());
        }
    }

    dispatch_to_watches(&event);
}
/// Globally enables or disables event emission. While disabled,
/// emitted events are discarded without logging, queueing, or dispatch.
pub fn set_emit_enabled(enabled: bool) {
    *EMIT_ENABLED.lock() = enabled;
}
/// Returns whether event emission is currently enabled.
pub fn is_emit_enabled() -> bool {
    *EMIT_ENABLED.lock()
}
/// Sets the default debounce interval, in microseconds, applied to
/// events emitted without an explicit per-call interval (0 disables).
pub fn set_debounce_interval(us: u64) {
    DEBOUNCE_STATE.lock().set_default_interval(us);
}
/// Drains and returns every pending event, oldest first.
pub fn poll_events() -> Vec<FsEvent> {
    PENDING_EVENTS.lock().drain(..).collect()
}
/// Removes and returns the oldest pending event, if any.
pub fn poll_event() -> Option<FsEvent> {
    PENDING_EVENTS.lock().pop_front()
}
/// Number of events currently waiting in the pending queue.
pub fn pending_count() -> usize {
    PENDING_EVENTS.lock().len()
}
/// Discards all pending events without delivering them.
pub fn clear_pending() {
    PENDING_EVENTS.lock().clear();
}
/// Waits (best effort) for pending events.
///
/// Returns immediately with any events already queued. With
/// `timeout_us == 0` this is a pure non-blocking poll that returns
/// `NotifyError::NoEvents` when the queue is empty. Any non-zero
/// timeout triggers one short spin-wait followed by a single re-poll;
/// there is no clock available here, so `timeout_us` only selects
/// between "don't wait" and "wait briefly" — it is NOT an accurate
/// duration. Returns `NotifyError::Timeout` if still empty after the
/// spin. (Dead `_start` / `_check_interval` locals removed.)
pub fn wait_for_events(timeout_us: u64) -> Result<Vec<FsEvent>, NotifyError> {
    let events = poll_events();
    if !events.is_empty() {
        return Ok(events);
    }
    if timeout_us == 0 {
        return Err(NotifyError::NoEvents);
    }
    // Brief busy-wait; spin_loop() hints the CPU that we are spinning.
    for _ in 0..1000 {
        core::hint::spin_loop();
    }
    let events = poll_events();
    if !events.is_empty() {
        return Ok(events);
    }
    Err(NotifyError::Timeout)
}
/// Returns clones of up to the `n` most recent events in the log,
/// oldest first.
pub fn get_last_events(n: usize) -> Vec<FsEvent> {
    EVENT_LOG.lock().last(n).into_iter().cloned().collect()
}
/// Returns clones of all logged events with a transaction group
/// strictly greater than `txg`.
pub fn get_events_since_txg(txg: u64) -> Vec<FsEvent> {
    EVENT_LOG.lock().since_txg(txg).into_iter().cloned().collect()
}
/// Returns clones of all logged events with a timestamp strictly
/// after `ts`.
pub fn get_events_since_timestamp(ts: u64) -> Vec<FsEvent> {
    EVENT_LOG.lock().since_timestamp(ts).into_iter().cloned().collect()
}
/// Returns clones of all logged events matching both `dataset` and
/// `path` exactly.
pub fn get_events_for_path(dataset: &str, path: &str) -> Vec<FsEvent> {
    EVENT_LOG.lock().for_path(dataset, path).into_iter().cloned().collect()
}
/// Returns a snapshot of the global event log's counters.
pub fn get_event_log_stats() -> EventLogStats {
    EVENT_LOG.lock().stats()
}
/// Changes the global event log's retention limit, evicting oldest
/// entries if it currently exceeds the new limit.
pub fn set_event_log_max_size(max_size: usize) {
    EVENT_LOG.lock().set_max_size(max_size);
}
/// Removes all stored events from the global event log.
pub fn clear_event_log() {
    EVENT_LOG.lock().clear();
}
/// Convenience wrapper: emits a `Create` event for a new file.
pub fn emit_create(dataset: &str, path: &str, object_id: u64, txg: u64) {
    let event = FsEvent::new(EventType::Create, dataset, path)
        .with_object_id(object_id)
        .with_txg(txg);
    emit_event(event);
}
/// Convenience wrapper: emits a `Modify` event carrying the file's
/// new size.
pub fn emit_modify(dataset: &str, path: &str, object_id: u64, size: u64, txg: u64) {
    let event = FsEvent::new(EventType::Modify, dataset, path)
        .with_object_id(object_id)
        .with_size(size)
        .with_txg(txg);
    emit_event(event);
}
/// Convenience wrapper: emits a `Delete` event for a removed file.
pub fn emit_delete(dataset: &str, path: &str, object_id: u64, txg: u64) {
    let event = FsEvent::new(EventType::Delete, dataset, path)
        .with_object_id(object_id)
        .with_txg(txg);
    emit_event(event);
}
/// Convenience wrapper: emits a `Rename` event. The event's primary
/// path is the NEW path; the old path travels as auxiliary data.
pub fn emit_rename(dataset: &str, old_path: &str, new_path: &str, object_id: u64, txg: u64) {
    let event = FsEvent::new(EventType::Rename, dataset, new_path)
        .with_old_path(old_path)
        .with_object_id(object_id)
        .with_txg(txg);
    emit_event(event);
}
/// Convenience wrapper: emits a `DirCreate` event for a new directory.
pub fn emit_dir_create(dataset: &str, path: &str, object_id: u64, txg: u64) {
    let event = FsEvent::new(EventType::DirCreate, dataset, path)
        .with_object_id(object_id)
        .with_txg(txg);
    emit_event(event);
}
/// Convenience wrapper: emits a `DirDelete` event for a removed
/// directory.
pub fn emit_dir_delete(dataset: &str, path: &str, object_id: u64, txg: u64) {
    let event = FsEvent::new(EventType::DirDelete, dataset, path)
        .with_object_id(object_id)
        .with_txg(txg);
    emit_event(event);
}
/// Convenience wrapper: emits an `Attrib` event for an attribute /
/// metadata change.
pub fn emit_attrib(dataset: &str, path: &str, object_id: u64, txg: u64) {
    let event = FsEvent::new(EventType::Attrib, dataset, path)
        .with_object_id(object_id)
        .with_txg(txg);
    emit_event(event);
}
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE(review): these tests share the module-level statics
    // (EVENT_LOG, PENDING_EVENTS, EMIT_ENABLED, DEBOUNCE_STATE), so
    // tests that touch them use unique dataset names and filtered
    // assertions to tolerate events emitted by concurrently running
    // tests.

    /// Resets the shared emitter state to a known baseline.
    fn setup() {
        clear_pending();
        clear_event_log();
        set_emit_enabled(true);
        set_debounce_interval(0);
    }

    // A locally constructed EventLog stores events and counts them.
    #[test]
    fn test_event_log_push() {
        let mut log = EventLog::new(10);
        for i in 0..5 {
            log.push(FsEvent::new(
                EventType::Create,
                "tank",
                &alloc::format!("/file{}", i),
            ));
        }
        assert_eq!(log.events.len(), 5);
        assert_eq!(log.total_events, 5);
    }

    // Pushing past capacity evicts oldest events and counts the drops,
    // while total_events keeps counting everything.
    #[test]
    fn test_event_log_overflow() {
        let mut log = EventLog::new(3);
        for i in 0..5 {
            log.push(FsEvent::new(
                EventType::Create,
                "tank",
                &alloc::format!("/file{}", i),
            ));
        }
        assert_eq!(log.events.len(), 3);
        assert_eq!(log.total_events, 5);
        assert_eq!(log.dropped_events, 2);
    }

    // last(n) returns the n most recent events, oldest first.
    #[test]
    fn test_event_log_last() {
        let mut log = EventLog::new(10);
        for i in 0..5 {
            log.push(
                FsEvent::new(EventType::Create, "tank", &alloc::format!("/file{}", i))
                    .with_txg(i as u64),
            );
        }
        let last = log.last(2);
        assert_eq!(last.len(), 2);
        assert_eq!(last[0].txg, 3);
        assert_eq!(last[1].txg, 4);
    }

    // since_txg is a strict (>) filter: txgs 3 and 4 survive since_txg(2).
    #[test]
    fn test_event_log_since_txg() {
        let mut log = EventLog::new(10);
        for i in 0..5 {
            log.push(
                FsEvent::new(EventType::Create, "tank", &alloc::format!("/file{}", i))
                    .with_txg(i as u64),
            );
        }
        let since = log.since_txg(2);
        assert_eq!(since.len(), 2);
    }

    // End-to-end: an emitted event lands in the pending queue.
    // Uses a unique dataset name so other tests' events can be filtered out.
    #[test]
    fn test_emit_event() {
        clear_pending();
        set_emit_enabled(true);
        set_debounce_interval(0);
        let unique_ds = "emit_event_test_unique_98765";
        emit_event(FsEvent::new(EventType::Create, unique_ds, "/file.txt"));
        assert!(
            pending_count() >= 1,
            "Expected at least 1 pending event after emit"
        );
        let events = poll_events();
        let our_events: Vec<_> = events.iter().filter(|e| e.dataset == unique_ds).collect();
        assert_eq!(
            our_events.len(),
            1,
            "Expected exactly 1 event for our dataset"
        );
        assert_eq!(our_events[0].event_type, EventType::Create);
        assert_eq!(our_events[0].path, "/file.txt");
    }

    // With emission disabled, events are dropped entirely.
    #[test]
    fn test_emit_disabled() {
        setup();
        set_emit_enabled(false);
        let ds = "emit_disabled_unique_67890";
        emit_event(FsEvent::new(EventType::Create, ds, "/file.txt"));
        set_emit_enabled(true);
        let events = poll_events();
        let our_events: Vec<_> = events.into_iter().filter(|e| e.dataset == ds).collect();
        assert_eq!(
            our_events.len(),
            0,
            "Event should not have been emitted when disabled"
        );
    }

    // The emit_* convenience wrappers actually reach the pending queue
    // and/or the event log. The assertion is deliberately loose because
    // other tests may drain the queue or clear the log concurrently.
    #[test]
    fn test_convenience_emitters() {
        set_emit_enabled(true);
        set_debounce_interval(0);
        let ds = "conv_emit_isolated_xyzzy42";
        emit_create(ds, "/conv_file1.txt", 1001, 88880001);
        emit_modify(ds, "/conv_file2.txt", 1002, 1024, 88880002);
        emit_delete(ds, "/conv_file3.txt", 1003, 88880003);
        emit_rename(ds, "/conv_old.txt", "/conv_new.txt", 1004, 88880004);
        // Shadows the module-level pending_count() with a local count
        // restricted to this test's dataset.
        let pending_count = {
            let pending = PENDING_EVENTS.lock();
            pending.iter().filter(|e| e.dataset == ds).count()
        };
        let logged_count = {
            let logged = EVENT_LOG.lock();
            logged.events.iter().filter(|e| e.dataset == ds).count()
        };
        let total = pending_count + logged_count;
        assert!(
            total >= 1,
            "Expected at least 1 event in pending or log, found {} (pending={}, log={})",
            total,
            pending_count,
            logged_count
        );
    }

    // Debounce window: event2 arrives 200us after event1 (< 500us) and
    // is suppressed; event3 arrives 600us after the last ACCEPTED event
    // (event1, since suppressed events don't refresh the window) and
    // passes.
    #[test]
    fn test_debounce() {
        let mut state = DebounceState::new();
        let event1 = FsEvent::new(EventType::Modify, "tank", "/file.txt").with_timestamp(1000);
        assert!(!state.should_debounce(&event1, 500));
        let event2 = FsEvent::new(EventType::Modify, "tank", "/file.txt").with_timestamp(1200);
        assert!(state.should_debounce(&event2, 500));
        let event3 = FsEvent::new(EventType::Modify, "tank", "/file.txt").with_timestamp(1600);
        assert!(!state.should_debounce(&event3, 500));
    }

    // Events are delivered in FIFO order.
    #[test]
    fn test_poll_event() {
        setup();
        emit_event(FsEvent::new(EventType::Create, "poll_test", "/file1.txt"));
        emit_event(FsEvent::new(EventType::Create, "poll_test", "/file2.txt"));
        let events = poll_events();
        let our_events: Vec<_> = events
            .into_iter()
            .filter(|e| e.dataset == "poll_test")
            .collect();
        assert_eq!(our_events.len(), 2);
        assert_eq!(our_events[0].path, "/file1.txt");
        assert_eq!(our_events[1].path, "/file2.txt");
    }

    // Emitting events advances the global log's total_events counter.
    // Compares against a before-snapshot rather than absolute values so
    // concurrent tests can't break the assertion.
    #[test]
    fn test_event_log_stats() {
        set_emit_enabled(true);
        set_debounce_interval(0);
        let before_stats = get_event_log_stats();
        let unique_source = "log_stats_test_unique";
        for i in 0..5 {
            emit_event(FsEvent::new(
                EventType::Create,
                unique_source,
                &alloc::format!("/file{}", i),
            ));
        }
        let stats = get_event_log_stats();
        assert!(
            stats.total_events >= before_stats.total_events + 5,
            "Expected at least {} total events, got {}",
            before_stats.total_events + 5,
            stats.total_events
        );
        assert!(
            stats.stored_events > 0,
            "Expected at least some stored events"
        );
    }
}