Skip to main content

oxirs_stream/consistency/
mod.rs

1//! # Consistency Protocols
2//!
3//! Configurable consistency levels for stream operations, including:
4//!
5//! - [`StreamConsistencyLevel`]: Consistency semantics (Eventual, ReadYourWrites, etc.)
6//! - [`ConsistencyConfig`]: Configuration for read/write consistency with retries
7//! - [`VersionedValue`]: A value tagged with version, timestamp, and origin node
8//! - [`ConsistencyManager`]: Enforces monotonic-read and monotonic-write guarantees per session
9//! - [`EventualConsistencyBuffer`]: Batches writes for eventual synchronisation
10
11use std::collections::HashMap;
12
13// ─── StreamConsistencyLevel ───────────────────────────────────────────────────
14
/// The consistency model applied to stream reads and writes.
///
/// Every variant corresponds to a well-known consistency model from the
/// distributed-systems literature.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum StreamConsistencyLevel {
    /// No ordering guarantees; replicas are only expected to converge over time.
    Eventual,
    /// A client's reads always reflect the values that client wrote earlier.
    ReadYourWrites,
    /// Within a session, the versions returned by reads never go backwards.
    MonotonicRead,
    /// Within a session, the versions returned for writes never go backwards.
    MonotonicWrite,
    /// Every read reflects all writes that preceded it, globally.
    Strong,
    /// Operations appear to take effect instantaneously, in one global order.
    Linearizable,
}
33
34// ─── ConsistencyConfig ────────────────────────────────────────────────────────
35
36/// Configures read/write consistency, timeouts, and retry behaviour.
37#[derive(Debug, Clone)]
38pub struct ConsistencyConfig {
39    pub read_level: StreamConsistencyLevel,
40    pub write_level: StreamConsistencyLevel,
41    pub timeout_ms: u64,
42    pub retry_count: usize,
43}
44
45impl Default for ConsistencyConfig {
46    fn default() -> Self {
47        Self {
48            read_level: StreamConsistencyLevel::Eventual,
49            write_level: StreamConsistencyLevel::Eventual,
50            timeout_ms: 1000,
51            retry_count: 3,
52        }
53    }
54}
55
56// ─── VersionedValue ───────────────────────────────────────────────────────────
57
/// A payload bundled with its logical version, a wall-clock timestamp, and the
/// id of the node it originated from.
#[derive(Clone, Debug)]
pub struct VersionedValue<T> {
    /// The wrapped payload.
    pub value: T,
    /// Logical version of the payload.
    pub version: u64,
    /// Wall-clock timestamp.
    // NOTE(review): the unit is not fixed by this type; the tests in this file
    // pass epoch milliseconds — confirm.
    pub timestamp: i64,
    /// Identifier of the originating node.
    pub node_id: String,
}

impl<T> VersionedValue<T> {
    /// Assembles a versioned value, converting `node_id` into an owned `String`.
    pub fn new(value: T, version: u64, timestamp: i64, node_id: impl Into<String>) -> Self {
        let node_id: String = node_id.into();
        VersionedValue {
            value,
            version,
            timestamp,
            node_id,
        }
    }
}
77
78// ─── ConsistencyManager ───────────────────────────────────────────────────────
79
80/// Enforces per-session monotonic-read and monotonic-write guarantees.
81///
82/// Sessions are identified by arbitrary string tokens (e.g., client IDs).
83pub struct ConsistencyManager {
84    config: ConsistencyConfig,
85    /// session_id → minimum version that reads must satisfy (MonotonicRead)
86    monotonic_read_version: HashMap<String, u64>,
87    /// session_id → last write version (MonotonicWrite)
88    monotonic_write_version: HashMap<String, u64>,
89}
90
91impl ConsistencyManager {
92    /// Create a new manager with the given configuration.
93    pub fn new(config: ConsistencyConfig) -> Self {
94        Self {
95            config,
96            monotonic_read_version: HashMap::new(),
97            monotonic_write_version: HashMap::new(),
98        }
99    }
100
101    /// Determine whether a read of `value` is acceptable for `session_id`.
102    ///
103    /// - `Eventual` / `ReadYourWrites` / `Strong` / `Linearizable`: always `true`
104    /// - `MonotonicRead`: `value.version >= session's last-seen version`
105    pub fn can_read<T>(&mut self, session_id: &str, value: &VersionedValue<T>) -> bool {
106        match self.config.read_level {
107            StreamConsistencyLevel::MonotonicRead => {
108                let min_ver = self
109                    .monotonic_read_version
110                    .get(session_id)
111                    .copied()
112                    .unwrap_or(0);
113                value.version >= min_ver
114            }
115            _ => true,
116        }
117    }
118
119    /// Record that `session_id` has observed `value` (update monotonic-read floor).
120    pub fn after_read<T>(&mut self, session_id: &str, value: &VersionedValue<T>) {
121        let entry = self
122            .monotonic_read_version
123            .entry(session_id.to_string())
124            .or_insert(0);
125        if value.version > *entry {
126            *entry = value.version;
127        }
128    }
129
130    /// Determine whether a write of `value` is valid for `session_id`.
131    ///
132    /// - `MonotonicWrite`: `value.version >= session's last-write version`
133    /// - everything else: always `true`
134    pub fn can_write<T>(&mut self, session_id: &str, value: &VersionedValue<T>) -> bool {
135        match self.config.write_level {
136            StreamConsistencyLevel::MonotonicWrite => {
137                let last_ver = self
138                    .monotonic_write_version
139                    .get(session_id)
140                    .copied()
141                    .unwrap_or(0);
142                value.version >= last_ver
143            }
144            _ => true,
145        }
146    }
147
148    /// Record that `session_id` performed a write at `value`'s version.
149    pub fn after_write<T>(&mut self, session_id: &str, value: &VersionedValue<T>) {
150        let entry = self
151            .monotonic_write_version
152            .entry(session_id.to_string())
153            .or_insert(0);
154        if value.version > *entry {
155            *entry = value.version;
156        }
157    }
158
159    /// Return `true` if `value.version` is strictly behind `current_version`
160    /// (i.e., the value is stale).
161    pub fn is_stale<T>(&self, value: &VersionedValue<T>, current_version: u64) -> bool {
162        value.version < current_version
163    }
164
165    /// Reference to the active configuration.
166    pub fn config(&self) -> &ConsistencyConfig {
167        &self.config
168    }
169
170    /// How many sessions are tracked for monotonic-read.
171    pub fn monotonic_read_session_count(&self) -> usize {
172        self.monotonic_read_version.len()
173    }
174
175    /// How many sessions are tracked for monotonic-write.
176    pub fn monotonic_write_session_count(&self) -> usize {
177        self.monotonic_write_version.len()
178    }
179}
180
181// ─── EventualConsistencyBuffer ────────────────────────────────────────────────
182
183/// Buffers writes for eventual propagation, flushing when `max_lag_ms` has passed.
184pub struct EventualConsistencyBuffer {
185    pending: Vec<(String, Vec<u8>)>,
186    max_lag_ms: u64,
187    last_sync_ms: i64,
188}
189
190impl EventualConsistencyBuffer {
191    /// Create a buffer that flushes after at most `max_lag_ms` milliseconds.
192    pub fn new(max_lag_ms: u64) -> Self {
193        Self {
194            pending: Vec::new(),
195            max_lag_ms,
196            last_sync_ms: current_ms(),
197        }
198    }
199
200    /// Stage a key-value write for eventual propagation.
201    pub fn buffer(&mut self, key: &str, value: Vec<u8>) {
202        self.pending.push((key.to_string(), value));
203    }
204
205    /// Return `true` when the lag since last sync exceeds `max_lag_ms`.
206    pub fn should_sync(&self, now_ms: i64) -> bool {
207        now_ms - self.last_sync_ms >= self.max_lag_ms as i64
208    }
209
210    /// Drain all pending writes and reset the sync timer.
211    pub fn drain(&mut self) -> Vec<(String, Vec<u8>)> {
212        self.last_sync_ms = current_ms();
213        std::mem::take(&mut self.pending)
214    }
215
216    /// Number of writes currently buffered.
217    pub fn pending_count(&self) -> usize {
218        self.pending.len()
219    }
220}
221
/// Helper: milliseconds elapsed since the Unix epoch according to the system
/// clock, or `0` if the clock reports a time before the epoch.
fn current_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        // `duration_since` fails only when "now" is earlier than the epoch.
        Err(_) => 0,
    }
}
229
230// ─── Tests ────────────────────────────────────────────────────────────────────
231
#[cfg(test)]
mod tests {
    use super::*;

    // ── Fixtures ───────────────────────────────────────────────────────────────

    /// Wraps `value` at `version` with a fixed timestamp and origin node.
    fn make_value<T>(value: T, version: u64) -> VersionedValue<T> {
        let timestamp = 1_700_000_000_000;
        VersionedValue::new(value, version, timestamp, "node-1")
    }

    /// A zero `u32` payload at `version`; only the version matters to the manager.
    fn at(version: u64) -> VersionedValue<u32> {
        make_value(0u32, version)
    }

    /// Manager with `MonotonicRead` reads and default settings otherwise.
    fn read_monotonic() -> ConsistencyManager {
        let config = ConsistencyConfig {
            read_level: StreamConsistencyLevel::MonotonicRead,
            ..ConsistencyConfig::default()
        };
        ConsistencyManager::new(config)
    }

    /// Manager with `MonotonicWrite` writes and default settings otherwise.
    fn write_monotonic() -> ConsistencyManager {
        let config = ConsistencyConfig {
            write_level: StreamConsistencyLevel::MonotonicWrite,
            ..ConsistencyConfig::default()
        };
        ConsistencyManager::new(config)
    }

    /// Manager with the all-default (eventual) configuration.
    fn eventual() -> ConsistencyManager {
        ConsistencyManager::new(ConsistencyConfig::default())
    }

    // ── ConsistencyConfig ──────────────────────────────────────────────────────

    #[test]
    fn test_default_config() {
        let ConsistencyConfig {
            read_level,
            write_level,
            timeout_ms,
            retry_count,
        } = ConsistencyConfig::default();
        assert_eq!(read_level, StreamConsistencyLevel::Eventual);
        assert_eq!(write_level, StreamConsistencyLevel::Eventual);
        assert_eq!(timeout_ms, 1000);
        assert_eq!(retry_count, 3);
    }

    #[test]
    fn test_custom_config() {
        let custom = ConsistencyConfig {
            read_level: StreamConsistencyLevel::Strong,
            write_level: StreamConsistencyLevel::Linearizable,
            timeout_ms: 500,
            retry_count: 5,
        };
        assert_eq!(custom.read_level, StreamConsistencyLevel::Strong);
        assert_eq!(custom.write_level, StreamConsistencyLevel::Linearizable);
    }

    // ── VersionedValue ─────────────────────────────────────────────────────────

    #[test]
    fn test_versioned_value_fields() {
        let wrapped = make_value("hello", 42);
        assert_eq!(wrapped.value, "hello");
        assert_eq!(wrapped.version, 42);
        assert_eq!(wrapped.node_id, "node-1");
    }

    // ── ConsistencyManager (Eventual) ─────────────────────────────────────────

    #[test]
    fn test_eventual_always_allows_read_and_write() {
        let mut manager = eventual();
        let sample = make_value(1u32, 1);
        assert!(manager.can_read("s1", &sample));
        assert!(manager.can_write("s1", &sample));
    }

    // ── ConsistencyManager (MonotonicRead) ────────────────────────────────────

    #[test]
    fn test_monotonic_read_initial_allows_any_version() {
        let mut manager = read_monotonic();
        assert!(manager.can_read("sess", &at(0)));
    }

    #[test]
    fn test_monotonic_read_blocks_regression() {
        let mut manager = read_monotonic();
        manager.after_read("sess", &at(5));

        // 4 regresses below the floor; 5 equals it; 6 advances past it.
        let expectations = [(4u64, false), (5, true), (6, true)];
        for &(version, allowed) in expectations.iter() {
            assert_eq!(
                manager.can_read("sess", &at(version)),
                allowed,
                "read of version {}",
                version
            );
        }
    }

    #[test]
    fn test_monotonic_read_sessions_independent() {
        let mut manager = read_monotonic();
        manager.after_read("session-A", &at(10));

        // session-B has no history, so a low version is still readable.
        assert!(manager.can_read("session-B", &at(1)));
    }

    #[test]
    fn test_after_read_tracks_max_version() {
        let mut manager = read_monotonic();
        // The final, lower version (5) must not pull the floor back down from 7.
        for &seen in [3u64, 7, 5].iter() {
            manager.after_read("s", &at(seen));
        }

        // At or above the floor of 7: allowed.
        for &version in [7u64, 8].iter() {
            assert!(manager.can_read("s", &at(version)), "version {}", version);
        }
        // Below the floor of 7: blocked.
        for &version in [4u64, 6].iter() {
            assert!(!manager.can_read("s", &at(version)), "version {}", version);
        }
    }

    // ── ConsistencyManager (MonotonicWrite) ───────────────────────────────────

    #[test]
    fn test_monotonic_write_initial_allows_any_version() {
        let mut manager = write_monotonic();
        assert!(manager.can_write("sess", &at(0)));
    }

    #[test]
    fn test_monotonic_write_blocks_regression() {
        let mut manager = write_monotonic();
        manager.after_write("sess", &at(5));

        let expectations = [(4u64, false), (5, true), (6, true)];
        for &(version, allowed) in expectations.iter() {
            assert_eq!(
                manager.can_write("sess", &at(version)),
                allowed,
                "write of version {}",
                version
            );
        }
    }

    // ── ConsistencyManager (is_stale) ─────────────────────────────────────────

    #[test]
    fn test_is_stale_behind_current() {
        assert!(eventual().is_stale(&at(4), 10));
    }

    #[test]
    fn test_is_not_stale_at_current() {
        assert!(!eventual().is_stale(&at(10), 10));
    }

    #[test]
    fn test_is_not_stale_ahead_current() {
        assert!(!eventual().is_stale(&at(12), 10));
    }

    // ── ConsistencyManager (session counts) ───────────────────────────────────

    #[test]
    fn test_session_tracking_counts() {
        let config = ConsistencyConfig {
            read_level: StreamConsistencyLevel::MonotonicRead,
            write_level: StreamConsistencyLevel::MonotonicWrite,
            ..ConsistencyConfig::default()
        };
        let mut manager = ConsistencyManager::new(config);
        assert_eq!(manager.monotonic_read_session_count(), 0);
        assert_eq!(manager.monotonic_write_session_count(), 0);

        manager.after_read("r-sess", &at(1));
        manager.after_write("w-sess", &at(1));

        assert_eq!(manager.monotonic_read_session_count(), 1);
        assert_eq!(manager.monotonic_write_session_count(), 1);
    }

    // ── EventualConsistencyBuffer ──────────────────────────────────────────────

    #[test]
    fn test_buffer_initial_empty() {
        assert_eq!(EventualConsistencyBuffer::new(500).pending_count(), 0);
    }

    #[test]
    fn test_buffer_and_count() {
        let mut staged = EventualConsistencyBuffer::new(500);
        staged.buffer("key1", b"val1".to_vec());
        staged.buffer("key2", b"val2".to_vec());
        assert_eq!(staged.pending_count(), 2);
    }

    #[test]
    fn test_buffer_drain() {
        let mut staged = EventualConsistencyBuffer::new(500);
        staged.buffer("k", b"v".to_vec());

        let flushed = staged.drain();
        assert_eq!(flushed, vec![("k".to_string(), b"v".to_vec())]);
        assert_eq!(staged.pending_count(), 0);
    }

    #[test]
    fn test_should_sync_past_deadline() {
        let staged = EventualConsistencyBuffer::new(100);
        // Ten seconds ahead of the clock is well past a 100 ms threshold.
        let ten_seconds_ahead = current_ms() + 10_000;
        assert!(staged.should_sync(ten_seconds_ahead));
    }

    #[test]
    fn test_should_not_sync_before_deadline() {
        // With a 60-second threshold, "right now" cannot be due yet.
        let staged = EventualConsistencyBuffer::new(60_000);
        assert!(!staged.should_sync(current_ms()));
    }

    #[test]
    fn test_drain_resets_timer() {
        let mut staged = EventualConsistencyBuffer::new(100);
        staged.buffer("x", b"1".to_vec());
        let _ = staged.drain();
        // The drain just reset the timer, so almost no lag has accumulated.
        assert!(!staged.should_sync(current_ms()));
    }
}