Skip to main content

haystack_core/graph/
changelog.rs

//! Graph change tracking — records mutations for replication / undo.
2
3use std::fmt;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use crate::data::HDict;
7
/// Changelog capacity used by default, counted in entries: 50,000.
pub const DEFAULT_CHANGELOG_CAPACITY: usize = 50_000;
10
/// The kind of mutation that was applied.
///
/// This is a fieldless tag, so it is `Copy` (it can be passed and stored by
/// value without cloning) and `Hash` (it can key a `HashMap`/`HashSet`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DiffOp {
    /// A new entity was added.
    Add,
    /// An existing entity was updated.
    Update,
    /// An entity was removed.
    Remove,
}
21
22/// A single mutation record.
23#[derive(Debug, Clone)]
24pub struct GraphDiff {
25    /// The graph version *after* this mutation.
26    pub version: u64,
27    /// Wall-clock timestamp as Unix nanoseconds (0 if unavailable).
28    pub timestamp: i64,
29    /// The kind of mutation.
30    pub op: DiffOp,
31    /// The entity's ref value.
32    pub ref_val: String,
33    /// The entity state before the mutation (Some for Remove; None for Add/Update).
34    pub old: Option<HDict>,
35    /// The entity state after the mutation (Some for Add; None for Remove/Update).
36    pub new: Option<HDict>,
37    /// For Update: only the tags that changed, with their **new** values.
38    pub changed_tags: Option<HDict>,
39    /// For Update: only the tags that changed, with their **previous** values.
40    pub previous_tags: Option<HDict>,
41}
42
43impl GraphDiff {
44    /// Returns the current wall-clock time as Unix nanoseconds.
45    pub(crate) fn now_nanos() -> i64 {
46        SystemTime::now()
47            .duration_since(UNIX_EPOCH)
48            .map(|d| i64::try_from(d.as_nanos()).unwrap_or(i64::MAX))
49            .unwrap_or(0)
50    }
51}
52
/// Error reported when a subscriber has fallen behind: the changelog no
/// longer contains entries at the version the subscriber asked for.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChangelogGap {
    /// Version from which the subscriber requested changes.
    pub subscriber_version: u64,
    /// Lowest version the changelog still retains.
    pub floor_version: u64,
}
62
63impl fmt::Display for ChangelogGap {
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
65        write!(
66            f,
67            "changelog gap: subscriber at version {}, oldest available is {}",
68            self.subscriber_version, self.floor_version
69        )
70    }
71}
72
73impl std::error::Error for ChangelogGap {}
74
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand constructor for the `ChangelogGap` tests.
    fn gap(subscriber_version: u64, floor_version: u64) -> ChangelogGap {
        ChangelogGap {
            subscriber_version,
            floor_version,
        }
    }

    /// A variant equals itself and differs from every other variant checked.
    #[test]
    fn diff_op_equality() {
        assert_eq!(DiffOp::Add, DiffOp::Add);
        assert_ne!(DiffOp::Add, DiffOp::Update);
        assert_ne!(DiffOp::Update, DiffOp::Remove);
    }

    /// Cloning an op yields an equal value.
    #[test]
    fn diff_op_clone() {
        let op = DiffOp::Remove;
        let cloned = Clone::clone(&op);
        assert_eq!(op, cloned);
    }

    /// Every field of an `Add` entry holds the value it was built with.
    #[test]
    fn graph_diff_construction() {
        let GraphDiff {
            version,
            op,
            ref_val,
            old,
            new,
            changed_tags,
            previous_tags,
            ..
        } = GraphDiff {
            version: 1,
            timestamp: 0,
            op: DiffOp::Add,
            ref_val: "site-1".to_string(),
            old: None,
            new: Some(HDict::new()),
            changed_tags: None,
            previous_tags: None,
        };
        assert_eq!(version, 1);
        assert_eq!(op, DiffOp::Add);
        assert_eq!(ref_val, "site-1");
        assert!(old.is_none());
        assert!(new.is_some());
        assert!(changed_tags.is_none());
        assert!(previous_tags.is_none());
    }

    /// A cloned `Update` entry carries over the fields of its source.
    #[test]
    fn graph_diff_clone() {
        let original = GraphDiff {
            version: 2,
            timestamp: 0,
            op: DiffOp::Update,
            ref_val: "equip-1".to_string(),
            old: None,
            new: None,
            changed_tags: Some(HDict::new()),
            previous_tags: Some(HDict::new()),
        };
        let GraphDiff {
            version,
            op,
            ref_val,
            changed_tags,
            previous_tags,
            ..
        } = original.clone();
        assert_eq!(version, 2);
        assert_eq!(op, DiffOp::Update);
        assert_eq!(ref_val, "equip-1");
        assert!(changed_tags.is_some());
        assert!(previous_tags.is_some());
    }

    /// The rendered message mentions both version numbers.
    #[test]
    fn changelog_gap_display() {
        let msg = gap(5, 100).to_string();
        assert!(msg.contains("5"));
        assert!(msg.contains("100"));
    }

    /// Two gaps with the same field values compare equal.
    #[test]
    fn changelog_gap_equality() {
        assert_eq!(gap(1, 10), gap(1, 10));
    }

    /// The clock helper reports a timestamp strictly greater than zero.
    #[test]
    fn now_nanos_returns_positive() {
        let ts = GraphDiff::now_nanos();
        assert!(ts > 0, "timestamp should be positive, got {ts}");
    }
}