Skip to main content

haystack_core/graph/
changelog.rs

1// Graph change tracking — records mutations for replication / undo.
2
3use std::fmt;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use crate::data::HDict;
7
8/// Default changelog capacity (50,000 entries).
9pub const DEFAULT_CHANGELOG_CAPACITY: usize = 50_000;
10
11/// The kind of mutation that was applied.
12#[derive(Debug, Clone, PartialEq, Eq)]
13pub enum DiffOp {
14    /// A new entity was added.
15    Add,
16    /// An existing entity was updated.
17    Update,
18    /// An entity was removed.
19    Remove,
20}
21
22/// A single mutation record.
23///
24/// Note: Each GraphDiff stores full entity clones (old and/or new). With the default
25/// changelog capacity (50,000 entries), this can use significant memory for entities
26/// with many tags or large string values.
27#[derive(Debug, Clone)]
28pub struct GraphDiff {
29    /// The graph version *after* this mutation.
30    pub version: u64,
31    /// Wall-clock timestamp as Unix nanoseconds (0 if unavailable).
32    pub timestamp: i64,
33    /// The kind of mutation.
34    pub op: DiffOp,
35    /// The entity's ref value.
36    pub ref_val: String,
37    /// The entity state before the mutation (Some for Remove; None for Add/Update).
38    pub old: Option<HDict>,
39    /// The entity state after the mutation (Some for Add; None for Remove/Update).
40    pub new: Option<HDict>,
41    /// For Update: only the tags that changed, with their **new** values.
42    pub changed_tags: Option<HDict>,
43    /// For Update: only the tags that changed, with their **previous** values.
44    pub previous_tags: Option<HDict>,
45}
46
47impl GraphDiff {
48    /// Returns the current wall-clock time as Unix nanoseconds.
49    pub(crate) fn now_nanos() -> i64 {
50        SystemTime::now()
51            .duration_since(UNIX_EPOCH)
52            .map(|d| i64::try_from(d.as_nanos()).unwrap_or(i64::MAX))
53            .unwrap_or(0)
54    }
55}
56
57/// Error returned when a subscriber has fallen behind and the changelog
58/// no longer contains entries at their requested version.
59#[derive(Debug, Clone, PartialEq, Eq)]
60pub struct ChangelogGap {
61    /// The version the subscriber requested changes since.
62    pub subscriber_version: u64,
63    /// The lowest version still retained in the changelog.
64    pub floor_version: u64,
65}
66
67impl fmt::Display for ChangelogGap {
68    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
69        write!(
70            f,
71            "changelog gap: subscriber at version {}, oldest available is {}",
72            self.subscriber_version, self.floor_version
73        )
74    }
75}
76
77impl std::error::Error for ChangelogGap {}
78
79#[cfg(test)]
80mod tests {
81    use super::*;
82
83    #[test]
84    fn diff_op_equality() {
85        assert_eq!(DiffOp::Add, DiffOp::Add);
86        assert_ne!(DiffOp::Add, DiffOp::Update);
87        assert_ne!(DiffOp::Update, DiffOp::Remove);
88    }
89
90    #[test]
91    fn diff_op_clone() {
92        let op = DiffOp::Remove;
93        let cloned = op.clone();
94        assert_eq!(op, cloned);
95    }
96
97    #[test]
98    fn graph_diff_construction() {
99        let diff = GraphDiff {
100            version: 1,
101            timestamp: 0,
102            op: DiffOp::Add,
103            ref_val: "site-1".to_string(),
104            old: None,
105            new: Some(HDict::new()),
106            changed_tags: None,
107            previous_tags: None,
108        };
109        assert_eq!(diff.version, 1);
110        assert_eq!(diff.op, DiffOp::Add);
111        assert_eq!(diff.ref_val, "site-1");
112        assert!(diff.old.is_none());
113        assert!(diff.new.is_some());
114        assert!(diff.changed_tags.is_none());
115        assert!(diff.previous_tags.is_none());
116    }
117
118    #[test]
119    fn graph_diff_clone() {
120        let diff = GraphDiff {
121            version: 2,
122            timestamp: 0,
123            op: DiffOp::Update,
124            ref_val: "equip-1".to_string(),
125            old: None,
126            new: None,
127            changed_tags: Some(HDict::new()),
128            previous_tags: Some(HDict::new()),
129        };
130        let cloned = diff.clone();
131        assert_eq!(cloned.version, 2);
132        assert_eq!(cloned.op, DiffOp::Update);
133        assert_eq!(cloned.ref_val, "equip-1");
134        assert!(cloned.changed_tags.is_some());
135        assert!(cloned.previous_tags.is_some());
136    }
137
138    #[test]
139    fn changelog_gap_display() {
140        let gap = ChangelogGap {
141            subscriber_version: 5,
142            floor_version: 100,
143        };
144        let msg = format!("{gap}");
145        assert!(msg.contains("5"));
146        assert!(msg.contains("100"));
147    }
148
149    #[test]
150    fn changelog_gap_equality() {
151        let a = ChangelogGap {
152            subscriber_version: 1,
153            floor_version: 10,
154        };
155        let b = ChangelogGap {
156            subscriber_version: 1,
157            floor_version: 10,
158        };
159        assert_eq!(a, b);
160    }
161
162    #[test]
163    fn now_nanos_returns_positive() {
164        let ts = GraphDiff::now_nanos();
165        assert!(ts > 0, "timestamp should be positive, got {ts}");
166    }
167}