use std::time::Duration;
/// Observer interface made of four event hooks, one per event type.
///
/// Implementors must be `Send + Sync` and implement `Debug`. Every hook has
/// an empty default body, so an implementation only needs to override the
/// events it is interested in. Hooks take `&self` and receive the event by
/// value.
pub trait FsysObserver: Send + Sync + std::fmt::Debug {
    /// Receives a [`JournalAppendEvent`]; does nothing unless overridden.
    fn on_journal_append(&self, _event: JournalAppendEvent) {}

    /// Receives a [`JournalSyncEvent`]; does nothing unless overridden.
    fn on_journal_sync(&self, _event: JournalSyncEvent) {}

    /// Receives a [`HandleWriteEvent`]; does nothing unless overridden.
    fn on_handle_write(&self, _event: HandleWriteEvent) {}

    /// Receives a [`HandleReadEvent`]; does nothing unless overridden.
    fn on_handle_read(&self, _event: HandleReadEvent) {}
}
/// Payload delivered to [`FsysObserver::on_journal_append`].
///
/// Plain `Copy` data. Because the type is `#[non_exhaustive]`, code outside
/// the defining crate cannot build it with a struct literal, which leaves
/// room to add fields later.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub struct JournalAppendEvent {
    /// Byte count for the append.
    /// NOTE(review): presumably bytes written to the journal — confirm
    /// against the emitting code.
    pub bytes_written: u64,
    /// Record count for the append.
    /// NOTE(review): presumably the number of records in this append — confirm.
    pub records: u32,
    /// Time associated with the append.
    /// NOTE(review): presumably how long the operation took — confirm.
    pub duration: Duration,
    /// Error flag.
    /// NOTE(review): presumably `true` when the append failed — confirm.
    pub error: bool,
}
/// Payload delivered to [`FsysObserver::on_journal_sync`].
///
/// Plain `Copy` data. Because the type is `#[non_exhaustive]`, code outside
/// the defining crate cannot build it with a struct literal, which leaves
/// room to add fields later.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub struct JournalSyncEvent {
    /// LSN value reported with the sync.
    /// NOTE(review): presumably the highest log sequence number that is
    /// durable once the sync completes — confirm against the emitting code.
    pub durable_lsn: u64,
    /// Time associated with the sync.
    /// NOTE(review): presumably how long the operation took — confirm.
    pub duration: Duration,
    /// Follower count reported with the sync.
    /// NOTE(review): exact meaning is not visible in this file — confirm
    /// what "followers" refers to and when the count is sampled.
    pub followers_at_commit: u32,
    /// Error flag.
    /// NOTE(review): presumably `true` when the sync failed — confirm.
    pub error: bool,
}
/// Payload delivered to [`FsysObserver::on_handle_write`].
///
/// Plain `Copy` data. Because the type is `#[non_exhaustive]`, code outside
/// the defining crate cannot build it with a struct literal, which leaves
/// room to add fields later.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub struct HandleWriteEvent {
    /// Byte count for the write.
    /// NOTE(review): presumably bytes written through the handle — confirm
    /// against the emitting code.
    pub bytes_written: u64,
    /// Time associated with the write.
    /// NOTE(review): presumably how long the operation took — confirm.
    pub duration: Duration,
    /// Error flag.
    /// NOTE(review): presumably `true` when the write failed — confirm.
    pub error: bool,
}
/// Payload delivered to [`FsysObserver::on_handle_read`].
///
/// Plain `Copy` data. Because the type is `#[non_exhaustive]`, code outside
/// the defining crate cannot build it with a struct literal, which leaves
/// room to add fields later.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub struct HandleReadEvent {
    /// Byte count for the read.
    /// NOTE(review): presumably bytes read through the handle — confirm
    /// against the emitting code.
    pub bytes_read: u64,
    /// Time associated with the read.
    /// NOTE(review): presumably how long the operation took — confirm.
    pub duration: Duration,
    /// Error flag.
    /// NOTE(review): presumably `true` when the read failed — confirm.
    pub error: bool,
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    /// Test observer that counts how many times each hook has fired.
    #[derive(Debug, Default)]
    struct CountingObserver {
        appends: AtomicU64,
        syncs: AtomicU64,
        writes: AtomicU64,
        reads: AtomicU64,
    }

    // `Relaxed` is sufficient throughout: every increment and every load in
    // these tests happens on the same thread.
    impl FsysObserver for CountingObserver {
        fn on_journal_append(&self, _e: JournalAppendEvent) {
            let _ = self.appends.fetch_add(1, Ordering::Relaxed);
        }

        fn on_journal_sync(&self, _e: JournalSyncEvent) {
            let _ = self.syncs.fetch_add(1, Ordering::Relaxed);
        }

        fn on_handle_write(&self, _e: HandleWriteEvent) {
            let _ = self.writes.fetch_add(1, Ordering::Relaxed);
        }

        fn on_handle_read(&self, _e: HandleReadEvent) {
            let _ = self.reads.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// An observer that overrides a single hook must still accept every other
    /// event through the trait's default bodies, including via `dyn` dispatch.
    #[test]
    fn observer_default_methods_are_no_ops() {
        #[derive(Debug)]
        struct OnlyAppend;

        impl FsysObserver for OnlyAppend {
            fn on_journal_append(&self, _e: JournalAppendEvent) {}
        }

        let obs: Arc<dyn FsysObserver> = Arc::new(OnlyAppend);
        obs.on_journal_sync(JournalSyncEvent {
            durable_lsn: 0,
            duration: Duration::ZERO,
            followers_at_commit: 0,
            error: false,
        });
        obs.on_handle_write(HandleWriteEvent {
            bytes_written: 0,
            duration: Duration::ZERO,
            error: false,
        });
        obs.on_handle_read(HandleReadEvent {
            bytes_read: 0,
            duration: Duration::ZERO,
            error: false,
        });
    }

    /// Each hook must bump its own counter and no other.
    ///
    /// All four event kinds are dispatched with pairwise distinct counts
    /// (3 / 1 / 2 / 4), so a hook wired to the wrong counter, or one that
    /// never fires, makes at least one assertion fail.
    #[test]
    fn counting_observer_tallies_per_method() {
        let obs = Arc::new(CountingObserver::default());
        // Method-call `clone()` keeps the concrete `Arc<CountingObserver>`
        // type, which then coerces to the trait object at the binding.
        let dyn_obs: Arc<dyn FsysObserver> = obs.clone();

        for _ in 0..3 {
            dyn_obs.on_journal_append(JournalAppendEvent {
                bytes_written: 100,
                records: 1,
                duration: Duration::from_micros(10),
                error: false,
            });
        }
        dyn_obs.on_journal_sync(JournalSyncEvent {
            durable_lsn: 300,
            duration: Duration::from_micros(50),
            followers_at_commit: 0,
            error: false,
        });
        for _ in 0..2 {
            dyn_obs.on_handle_write(HandleWriteEvent {
                bytes_written: 64,
                duration: Duration::from_micros(5),
                error: false,
            });
        }
        for _ in 0..4 {
            dyn_obs.on_handle_read(HandleReadEvent {
                bytes_read: 64,
                duration: Duration::from_micros(5),
                error: false,
            });
        }

        assert_eq!(obs.appends.load(Ordering::Relaxed), 3);
        assert_eq!(obs.syncs.load(Ordering::Relaxed), 1);
        assert_eq!(obs.writes.load(Ordering::Relaxed), 2);
        assert_eq!(obs.reads.load(Ordering::Relaxed), 4);
    }

    /// Compile-time check that every event payload is `Send + Sync`.
    #[test]
    fn events_are_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}

        assert_send_sync::<JournalAppendEvent>();
        assert_send_sync::<JournalSyncEvent>();
        assert_send_sync::<HandleWriteEvent>();
        assert_send_sync::<HandleReadEvent>();
    }
}