use crate::core::NormalizedPath;
use dashmap::DashMap;
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
/// A logical timestamp: an opaque wrapper around a `u64` tick count.
///
/// Values are handed out by [`ChangeJournal`]; two clocks compare by their
/// tick count, so a larger clock was issued later by the same journal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Clock(u64);
impl Clock {
pub const ZERO: Clock = Clock(0);
#[must_use]
pub fn tick(self) -> u64 {
self.0
}
}
impl std::fmt::Display for Clock {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "clock:{}", self.0)
}
}
/// Records which paths changed at which logical [`Clock`] tick.
///
/// All methods take `&self`; state is held in atomics, a `DashMap` and an
/// `RwLock`-guarded map.
pub struct ChangeJournal {
// Most recently issued tick; incremented by `advance` and `mark_overflow`.
current: AtomicU64,
// Batches of changed paths keyed by the clock they were recorded under;
// the oldest batches are dropped once `max_journal_entries` is exceeded.
journal: RwLock<BTreeMap<Clock, Vec<NormalizedPath>>>,
// For each tracked path, the clock stored by `advance` or `register`.
last_change: DashMap<NormalizedPath, Clock>,
// Tick of the most recent `mark_overflow`; 0 means no overflow was marked.
last_overflow: AtomicU64,
// Maximum number of batches kept in `journal`.
max_journal_entries: usize,
}
/// Compact debug view: the current clock and the number of tracked paths.
impl std::fmt::Debug for ChangeJournal {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let now = self.current_clock();
        // "entries" reports the size of `last_change`, not of the batch journal.
        let tracked = self.last_change.len();
        let mut view = f.debug_struct("ChangeJournal");
        view.field("current", &now);
        view.field("entries", &tracked);
        view.finish()
    }
}
impl ChangeJournal {
    /// Creates an empty journal that retains up to 10,000 change batches.
    #[must_use]
    pub fn new() -> Self {
        Self::with_capacity(10_000)
    }

    /// Creates an empty journal at tick 0 that retains at most
    /// `max_journal_entries` change batches.
    #[must_use]
    pub fn with_capacity(max_journal_entries: usize) -> Self {
        Self {
            current: AtomicU64::new(0),
            journal: RwLock::new(BTreeMap::new()),
            last_change: DashMap::new(),
            last_overflow: AtomicU64::new(0),
            max_journal_entries,
        }
    }

    /// Returns the most recently issued clock.
    #[must_use]
    pub fn current_clock(&self) -> Clock {
        Clock(self.current.load(Ordering::Acquire))
    }

    /// Issues a new tick and records `changed_paths` as one batch under it.
    ///
    /// Every path in the batch has its last-change clock raised to the new
    /// tick. The batch is appended to the journal, and the oldest batches are
    /// dropped while the journal holds more than `max_journal_entries`.
    ///
    /// Note: the tick is published through `current` before `last_change` and
    /// the journal are updated, so a concurrent reader can observe the new
    /// clock before the paths recorded under it.
    ///
    /// # Panics
    ///
    /// Panics if the journal lock is poisoned.
    pub fn advance(&self, changed_paths: Vec<NormalizedPath>) -> Clock {
        let clock = Clock(self.current.fetch_add(1, Ordering::AcqRel) + 1);
        for path in &changed_paths {
            // Concurrent `advance` calls can reach this point out of tick
            // order; never let an older tick replace a newer one.
            self.last_change
                .entry(path.clone())
                .and_modify(|last| {
                    if clock > *last {
                        *last = clock;
                    }
                })
                .or_insert(clock);
        }
        let mut journal = self.journal.write().expect("journal lock poisoned");
        journal.insert(clock, changed_paths);
        while journal.len() > self.max_journal_entries {
            journal.pop_first();
        }
        clock
    }

    /// Reports whether `path` must be treated as changed relative to `since`.
    ///
    /// Returns `true` when `since` is older than the last marked overflow,
    /// when `path` is not tracked at all, or when the clock stored for `path`
    /// is strictly greater than `since`.
    #[must_use]
    pub fn changed_since(&self, path: &NormalizedPath, since: Clock) -> bool {
        // `last_overflow` is 0 until an overflow is marked, and no tick is
        // below 0, so this comparison alone also covers the "never" case.
        if since.0 < self.last_overflow.load(Ordering::Acquire) {
            return true;
        }
        match self.last_change.get(path) {
            Some(last) => *last > since,
            None => true,
        }
    }

    /// Returns the paths of every retained batch recorded strictly after
    /// `since`, in clock order.
    ///
    /// A path that appears in several batches appears several times. Batches
    /// that were trimmed by the capacity limit or removed by [`Self::clear`]
    /// are not included, so the result can be incomplete for an old `since`.
    ///
    /// # Panics
    ///
    /// Panics if the journal lock is poisoned.
    #[must_use]
    pub fn changes_since(&self, since: Clock) -> Vec<NormalizedPath> {
        let journal = self.journal.read().expect("journal lock poisoned");
        let newer = (std::ops::Bound::Excluded(since), std::ops::Bound::Unbounded);
        let result: Vec<NormalizedPath> = journal
            .range(newer)
            .flat_map(|(_clock, paths)| paths.iter().cloned())
            .collect();
        result
    }

    /// Issues a new tick and marks it as an overflow point.
    ///
    /// Afterwards [`Self::changed_since`] returns `true` for every path when
    /// asked about a clock older than the returned one.
    pub fn mark_overflow(&self) -> Clock {
        let new_tick = self.current.fetch_add(1, Ordering::AcqRel) + 1;
        // `fetch_max` rather than `store`: two racing callers must not be able
        // to move the overflow watermark backwards.
        self.last_overflow.fetch_max(new_tick, Ordering::AcqRel);
        Clock(new_tick)
    }

    /// Drops all tracked paths and all journal batches, then marks an overflow
    /// and returns its clock.
    ///
    /// # Panics
    ///
    /// Panics if the journal lock is poisoned.
    pub fn clear(&self) -> Clock {
        self.last_change.clear();
        self.journal.write().expect("journal lock poisoned").clear();
        self.mark_overflow()
    }

    /// Stops tracking every path that is not in `live` and returns how many
    /// paths were removed. Journal batches are left untouched.
    pub fn retain_paths(&self, live: &std::collections::HashSet<NormalizedPath>) -> usize {
        // Count removals directly: a before/after `len()` difference can
        // underflow when another thread inserts paths during the call.
        let mut removed = 0usize;
        self.last_change.retain(|path, _| {
            let keep = live.contains(path);
            if !keep {
                removed += 1;
            }
            keep
        });
        removed
    }

    /// Returns the number of tracked paths.
    #[must_use]
    pub fn last_change_len(&self) -> usize {
        self.last_change.len()
    }

    /// Starts tracking `path` at the current clock without issuing a tick.
    ///
    /// A path that is already tracked keeps its existing clock.
    pub fn register(&self, path: NormalizedPath) {
        let current = Clock(self.current.load(Ordering::Acquire));
        self.last_change.entry(path).or_insert(current);
    }
}
impl Default for ChangeJournal {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn clock_display() {
        let c = Clock(42);
        assert_eq!(format!("{c}"), "clock:42");
        assert_eq!(format!("{}", Clock::ZERO), "clock:0");
    }

    #[test]
    fn clock_zero_tick() {
        assert_eq!(Clock::ZERO.tick(), 0);
    }

    #[test]
    fn clock_ordering() {
        assert!(Clock::ZERO < Clock(1));
        assert!(Clock(1) < Clock(2));
        assert_eq!(Clock(5), Clock(5));
    }

    #[test]
    fn advance_empty_batch() {
        let journal = ChangeJournal::new();
        let c1 = journal.advance(vec![]);
        assert_eq!(c1, Clock(1));
        assert_eq!(journal.current_clock(), Clock(1));
        let changes = journal.changes_since(Clock::ZERO);
        assert!(changes.is_empty());
    }

    #[test]
    fn changes_since_empty_journal() {
        let journal = ChangeJournal::new();
        let changes = journal.changes_since(Clock::ZERO);
        assert!(changes.is_empty());
    }

    #[test]
    fn default_creates_new_journal() {
        let journal = ChangeJournal::default();
        assert_eq!(journal.current_clock(), Clock::ZERO);
    }

    #[test]
    fn changed_since_at_exact_clock_returns_false() {
        let journal = ChangeJournal::new();
        let c1 = journal.advance(vec![NormalizedPath::from("a.c")]);
        assert!(!journal.changed_since(&NormalizedPath::from("a.c"), c1));
    }

    #[test]
    fn overflow_then_new_changes() {
        let journal = ChangeJournal::new();
        let c1 = journal.advance(vec![NormalizedPath::from("a.c")]);
        let overflow_clock = journal.mark_overflow();
        let c3 = journal.advance(vec![NormalizedPath::from("b.c")]);
        assert!(journal.changed_since(&NormalizedPath::from("a.c"), c1));
        assert!(!journal.changed_since(&NormalizedPath::from("b.c"), c3));
        assert!(journal.changed_since(&NormalizedPath::from("b.c"), overflow_clock));
    }

    #[test]
    fn advance_increments_clock() {
        let journal = ChangeJournal::new();
        assert_eq!(journal.current_clock(), Clock::ZERO);
        let c1 = journal.advance(vec![NormalizedPath::from("a.c")]);
        assert_eq!(c1, Clock(1));
        let c2 = journal.advance(vec![NormalizedPath::from("b.c")]);
        assert_eq!(c2, Clock(2));
        assert_eq!(journal.current_clock(), Clock(2));
    }

    #[test]
    fn changed_since_returns_true_for_changed_file() {
        let journal = ChangeJournal::new();
        let c1 = journal.advance(vec![NormalizedPath::from("foo.h")]);
        let c2 = journal.advance(vec![NormalizedPath::from("bar.h")]);
        assert!(journal.changed_since(&NormalizedPath::from("foo.h"), Clock::ZERO));
        assert!(journal.changed_since(&NormalizedPath::from("bar.h"), c1));
        assert!(!journal.changed_since(&NormalizedPath::from("bar.h"), c2));
    }

    #[test]
    fn changed_since_returns_false_for_unchanged_file() {
        let journal = ChangeJournal::new();
        let c1 = journal.advance(vec![NormalizedPath::from("foo.h")]);
        let _c2 = journal.advance(vec![NormalizedPath::from("bar.h")]);
        assert!(!journal.changed_since(&NormalizedPath::from("foo.h"), c1));
    }

    #[test]
    fn untracked_file_reports_changed() {
        let journal = ChangeJournal::new();
        journal.advance(vec![NormalizedPath::from("known.h")]);
        assert!(journal.changed_since(&NormalizedPath::from("unknown.h"), Clock::ZERO));
    }

    #[test]
    fn changes_since_returns_batch_union() {
        let journal = ChangeJournal::new();
        let c1 = journal.advance(vec![NormalizedPath::from("a.c")]);
        let _c2 = journal.advance(vec![
            NormalizedPath::from("b.c"),
            NormalizedPath::from("c.c"),
        ]);
        let _c3 = journal.advance(vec![NormalizedPath::from("d.c")]);
        let changed = journal.changes_since(c1);
        assert_eq!(changed.len(), 3);
        assert!(changed.contains(&NormalizedPath::from("b.c")));
        assert!(changed.contains(&NormalizedPath::from("c.c")));
        assert!(changed.contains(&NormalizedPath::from("d.c")));
    }

    #[test]
    fn journal_trims_old_entries() {
        let journal = ChangeJournal::with_capacity(5);
        for i in 0..10 {
            journal.advance(vec![NormalizedPath::from(format!("file_{i}.c"))]);
        }
        let journal_entries = journal.journal.read().unwrap();
        assert!(journal_entries.len() <= 5);
        assert!(!journal_entries.contains_key(&Clock(1)));
        assert!(journal_entries.contains_key(&Clock(10)));
    }

    #[test]
    fn mark_overflow_invalidates_everything() {
        let journal = ChangeJournal::new();
        let c1 = journal.advance(vec![NormalizedPath::from("a.c")]);
        let _c2 = journal.advance(vec![NormalizedPath::from("b.c")]);
        let overflow_clock = journal.mark_overflow();
        assert!(journal.changed_since(&NormalizedPath::from("a.c"), c1));
        assert!(journal.changed_since(&NormalizedPath::from("b.c"), c1));
        assert!(journal.changed_since(&NormalizedPath::from("never_seen.c"), c1));
        assert!(!journal.changed_since(&NormalizedPath::from("a.c"), overflow_clock));
    }

    #[test]
    fn register_makes_file_tracked() {
        let journal = ChangeJournal::new();
        assert!(journal.changed_since(&NormalizedPath::from("registered.h"), Clock::ZERO));
        journal.register(NormalizedPath::from("registered.h"));
        assert!(!journal.changed_since(&NormalizedPath::from("registered.h"), Clock::ZERO));
    }

    #[test]
    fn register_does_not_overwrite_newer_change() {
        let journal = ChangeJournal::new();
        let path = NormalizedPath::from("modified.h");
        let c1 = journal.advance(vec![path.clone()]);
        // Move the clock past `c1`; otherwise an overwriting `register` would
        // store the same tick and the test could not tell the difference.
        let _c2 = journal.advance(vec![NormalizedPath::from("other.h")]);
        journal.register(NormalizedPath::from("modified.h"));
        assert!(journal.changed_since(&path, Clock::ZERO));
        // The path must still be recorded at `c1`, not at the current clock.
        assert!(!journal.changed_since(&path, c1));
    }

    #[test]
    fn clear_empties_journal_and_marks_overflow() {
        let journal = ChangeJournal::new();
        let c1 = journal.advance(vec![NormalizedPath::from("a.c")]);
        let _c2 = journal.advance(vec![NormalizedPath::from("b.c")]);
        let overflow_clock = journal.clear();
        let changes = journal.changes_since(Clock::ZERO);
        assert!(changes.is_empty());
        assert!(journal.changed_since(&NormalizedPath::from("a.c"), c1));
        journal.register(NormalizedPath::from("new.c"));
        assert!(!journal.changed_since(&NormalizedPath::from("new.c"), overflow_clock));
    }

    #[test]
    fn retain_removes_orphans() {
        let journal = ChangeJournal::new();
        journal.advance(vec![
            NormalizedPath::from("a.c"),
            NormalizedPath::from("b.c"),
            NormalizedPath::from("c.c"),
        ]);
        let live: std::collections::HashSet<NormalizedPath> =
            [NormalizedPath::from("a.c")].into_iter().collect();
        let removed = journal.retain_paths(&live);
        assert_eq!(removed, 2);
        assert_eq!(journal.last_change_len(), 1);
        assert!(!journal.changed_since(&NormalizedPath::from("a.c"), Clock(1)));
    }

    #[test]
    fn retain_keeps_all() {
        let journal = ChangeJournal::new();
        journal.advance(vec![
            NormalizedPath::from("a.c"),
            NormalizedPath::from("b.c"),
        ]);
        let live: std::collections::HashSet<NormalizedPath> =
            [NormalizedPath::from("a.c"), NormalizedPath::from("b.c")]
                .into_iter()
                .collect();
        let removed = journal.retain_paths(&live);
        assert_eq!(removed, 0);
        assert_eq!(journal.last_change_len(), 2);
    }

    #[test]
    fn last_change_len_tracks() {
        let journal = ChangeJournal::new();
        assert_eq!(journal.last_change_len(), 0);
        journal.advance(vec![NormalizedPath::from("x.c")]);
        assert_eq!(journal.last_change_len(), 1);
        journal.advance(vec![
            NormalizedPath::from("y.c"),
            NormalizedPath::from("z.c"),
        ]);
        assert_eq!(journal.last_change_len(), 3);
    }

    #[test]
    fn concurrent_advance_and_query() {
        use std::sync::Arc;
        let journal = Arc::new(ChangeJournal::new());
        let mut handles = Vec::new();
        // Four writers, each issuing 100 ticks.
        for t in 0..4 {
            let j = Arc::clone(&journal);
            handles.push(std::thread::spawn(move || {
                for i in 0..100 {
                    j.advance(vec![NormalizedPath::from(format!("t{t}_f{i}.c"))]);
                }
            }));
        }
        // Four readers querying while the writers run.
        for _ in 0..4 {
            let j = Arc::clone(&journal);
            handles.push(std::thread::spawn(move || {
                for _ in 0..100 {
                    let clock = j.current_clock();
                    let _ = j.changed_since(&NormalizedPath::from("t0_f0.c"), clock);
                    let _ = j.changes_since(Clock::ZERO);
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(journal.current_clock().tick(), 400);
    }
}