oxirs_stream/consistency/
mod.rs1use std::collections::HashMap;
12
/// Consistency guarantee requested for stream reads or writes.
///
/// Within this module only [`StreamConsistencyLevel::MonotonicRead`] and
/// [`StreamConsistencyLevel::MonotonicWrite`] influence the decisions made by
/// `ConsistencyManager::can_read` and `ConsistencyManager::can_write`; every
/// other level is admitted unconditionally by those two checks.
///
/// The enum carries no data, so it is `Copy` and `Hash` in addition to the
/// comparison traits; levels can be passed by value and used as map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StreamConsistencyLevel {
    /// Eventual consistency; the level selected by `ConsistencyConfig::default()`.
    Eventual,
    /// Read-your-writes.
    /// NOTE(review): no check in this module is specific to this level —
    /// confirm where it is enforced.
    ReadYourWrites,
    /// Monotonic reads: as a read level, `can_read` rejects versions lower
    /// than the highest one recorded for the session by `after_read`.
    MonotonicRead,
    /// Monotonic writes: as a write level, `can_write` rejects versions lower
    /// than the highest one recorded for the session by `after_write`.
    MonotonicWrite,
    /// Strong consistency.
    /// NOTE(review): treated like `Eventual` by the checks in this module.
    Strong,
    /// Linearizable consistency.
    /// NOTE(review): treated like `Eventual` by the checks in this module.
    Linearizable,
}
33
34#[derive(Debug, Clone)]
38pub struct ConsistencyConfig {
39 pub read_level: StreamConsistencyLevel,
40 pub write_level: StreamConsistencyLevel,
41 pub timeout_ms: u64,
42 pub retry_count: usize,
43}
44
45impl Default for ConsistencyConfig {
46 fn default() -> Self {
47 Self {
48 read_level: StreamConsistencyLevel::Eventual,
49 write_level: StreamConsistencyLevel::Eventual,
50 timeout_ms: 1000,
51 retry_count: 3,
52 }
53 }
54}
55
/// A payload tagged with the metadata the consistency checks operate on.
#[derive(Debug, Clone)]
pub struct VersionedValue<T> {
    /// The wrapped payload.
    pub value: T,
    /// Version number; this is the only field the checks in this module compare.
    pub version: u64,
    /// Timestamp attached to the value.
    /// NOTE(review): presumably milliseconds since the Unix epoch — confirm
    /// with the producers of these values.
    pub timestamp: i64,
    /// Identifier of the node associated with this value.
    pub node_id: String,
}

impl<T> VersionedValue<T> {
    /// Assembles a versioned value from its parts.
    ///
    /// `node_id` accepts anything convertible into a `String`, so both string
    /// literals and owned strings can be passed directly.
    pub fn new(value: T, version: u64, timestamp: i64, node_id: impl Into<String>) -> Self {
        let node_id: String = node_id.into();
        VersionedValue {
            value,
            version,
            timestamp,
            node_id,
        }
    }
}
77
78pub struct ConsistencyManager {
84 config: ConsistencyConfig,
85 monotonic_read_version: HashMap<String, u64>,
87 monotonic_write_version: HashMap<String, u64>,
89}
90
91impl ConsistencyManager {
92 pub fn new(config: ConsistencyConfig) -> Self {
94 Self {
95 config,
96 monotonic_read_version: HashMap::new(),
97 monotonic_write_version: HashMap::new(),
98 }
99 }
100
101 pub fn can_read<T>(&mut self, session_id: &str, value: &VersionedValue<T>) -> bool {
106 match self.config.read_level {
107 StreamConsistencyLevel::MonotonicRead => {
108 let min_ver = self
109 .monotonic_read_version
110 .get(session_id)
111 .copied()
112 .unwrap_or(0);
113 value.version >= min_ver
114 }
115 _ => true,
116 }
117 }
118
119 pub fn after_read<T>(&mut self, session_id: &str, value: &VersionedValue<T>) {
121 let entry = self
122 .monotonic_read_version
123 .entry(session_id.to_string())
124 .or_insert(0);
125 if value.version > *entry {
126 *entry = value.version;
127 }
128 }
129
130 pub fn can_write<T>(&mut self, session_id: &str, value: &VersionedValue<T>) -> bool {
135 match self.config.write_level {
136 StreamConsistencyLevel::MonotonicWrite => {
137 let last_ver = self
138 .monotonic_write_version
139 .get(session_id)
140 .copied()
141 .unwrap_or(0);
142 value.version >= last_ver
143 }
144 _ => true,
145 }
146 }
147
148 pub fn after_write<T>(&mut self, session_id: &str, value: &VersionedValue<T>) {
150 let entry = self
151 .monotonic_write_version
152 .entry(session_id.to_string())
153 .or_insert(0);
154 if value.version > *entry {
155 *entry = value.version;
156 }
157 }
158
159 pub fn is_stale<T>(&self, value: &VersionedValue<T>, current_version: u64) -> bool {
162 value.version < current_version
163 }
164
165 pub fn config(&self) -> &ConsistencyConfig {
167 &self.config
168 }
169
170 pub fn monotonic_read_session_count(&self) -> usize {
172 self.monotonic_read_version.len()
173 }
174
175 pub fn monotonic_write_session_count(&self) -> usize {
177 self.monotonic_write_version.len()
178 }
179}
180
181pub struct EventualConsistencyBuffer {
185 pending: Vec<(String, Vec<u8>)>,
186 max_lag_ms: u64,
187 last_sync_ms: i64,
188}
189
190impl EventualConsistencyBuffer {
191 pub fn new(max_lag_ms: u64) -> Self {
193 Self {
194 pending: Vec::new(),
195 max_lag_ms,
196 last_sync_ms: current_ms(),
197 }
198 }
199
200 pub fn buffer(&mut self, key: &str, value: Vec<u8>) {
202 self.pending.push((key.to_string(), value));
203 }
204
205 pub fn should_sync(&self, now_ms: i64) -> bool {
207 now_ms - self.last_sync_ms >= self.max_lag_ms as i64
208 }
209
210 pub fn drain(&mut self) -> Vec<(String, Vec<u8>)> {
212 self.last_sync_ms = current_ms();
213 std::mem::take(&mut self.pending)
214 }
215
216 pub fn pending_count(&self) -> usize {
218 self.pending.len()
219 }
220}
221
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reports a time before the epoch, and
/// saturates at `i64::MAX` instead of wrapping if the millisecond count does
/// not fit in an `i64`.
fn current_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        // `as_millis` yields a u128; clamp first so the narrowing cast below
        // can never wrap.
        Ok(elapsed) => elapsed.as_millis().min(i64::MAX as u128) as i64,
        // Best effort: a clock set before the epoch is reported as 0.
        Err(_) => 0,
    }
}
229
/// Unit tests for the consistency configuration, the version-floor checks of
/// `ConsistencyManager`, and the timing/draining behaviour of
/// `EventualConsistencyBuffer`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a value at `version` with a fixed timestamp and node id.
    fn make_value<T>(value: T, version: u64) -> VersionedValue<T> {
        VersionedValue::new(value, version, 1_700_000_000_000, "node-1")
    }

    // --- Configuration ---------------------------------------------------

    #[test]
    fn test_default_config() {
        let cfg = ConsistencyConfig::default();
        assert_eq!(cfg.read_level, StreamConsistencyLevel::Eventual);
        assert_eq!(cfg.write_level, StreamConsistencyLevel::Eventual);
        assert_eq!(cfg.timeout_ms, 1000);
        assert_eq!(cfg.retry_count, 3);
    }

    #[test]
    fn test_custom_config() {
        let cfg = ConsistencyConfig {
            read_level: StreamConsistencyLevel::Strong,
            write_level: StreamConsistencyLevel::Linearizable,
            timeout_ms: 500,
            retry_count: 5,
        };
        assert_eq!(cfg.read_level, StreamConsistencyLevel::Strong);
        assert_eq!(cfg.write_level, StreamConsistencyLevel::Linearizable);
        // Check the numeric fields too, so every custom value is verified.
        assert_eq!(cfg.timeout_ms, 500);
        assert_eq!(cfg.retry_count, 5);
    }

    // --- VersionedValue --------------------------------------------------

    #[test]
    fn test_versioned_value_fields() {
        let vv = make_value("hello", 42);
        assert_eq!(vv.value, "hello");
        assert_eq!(vv.version, 42);
        assert_eq!(vv.node_id, "node-1");
    }

    // --- Eventual level --------------------------------------------------

    #[test]
    fn test_eventual_always_allows_read_and_write() {
        let mut mgr = ConsistencyManager::new(ConsistencyConfig::default());
        let vv = make_value(1u32, 1);
        assert!(mgr.can_read("s1", &vv));
        assert!(mgr.can_write("s1", &vv));
    }

    // --- Monotonic reads -------------------------------------------------

    #[test]
    fn test_monotonic_read_initial_allows_any_version() {
        let mut mgr = ConsistencyManager::new(ConsistencyConfig {
            read_level: StreamConsistencyLevel::MonotonicRead,
            ..Default::default()
        });
        let vv = make_value(0u32, 0);
        assert!(mgr.can_read("sess", &vv));
    }

    #[test]
    fn test_monotonic_read_blocks_regression() {
        let mut mgr = ConsistencyManager::new(ConsistencyConfig {
            read_level: StreamConsistencyLevel::MonotonicRead,
            ..Default::default()
        });
        let v5 = make_value(0u32, 5);
        mgr.after_read("sess", &v5);

        // Older than what the session has already seen: rejected.
        let v4 = make_value(0u32, 4);
        assert!(!mgr.can_read("sess", &v4));

        // The same version again is fine.
        assert!(mgr.can_read("sess", &v5));

        // Newer versions are fine.
        let v6 = make_value(0u32, 6);
        assert!(mgr.can_read("sess", &v6));
    }

    #[test]
    fn test_monotonic_read_sessions_independent() {
        let mut mgr = ConsistencyManager::new(ConsistencyConfig {
            read_level: StreamConsistencyLevel::MonotonicRead,
            ..Default::default()
        });
        let v10 = make_value(0u32, 10);
        mgr.after_read("session-A", &v10);

        // Session B has no recorded floor, so a low version is admissible.
        let v1 = make_value(0u32, 1);
        assert!(mgr.can_read("session-B", &v1));
    }

    #[test]
    fn test_after_read_tracks_max_version() {
        let mut mgr = ConsistencyManager::new(ConsistencyConfig {
            read_level: StreamConsistencyLevel::MonotonicRead,
            ..Default::default()
        });
        mgr.after_read("s", &make_value(0u32, 3));
        mgr.after_read("s", &make_value(0u32, 7));
        // A lower version afterwards must not pull the floor back down.
        mgr.after_read("s", &make_value(0u32, 5));

        let v7 = make_value(0u32, 7);
        assert!(mgr.can_read("s", &v7));
        let v8 = make_value(0u32, 8);
        assert!(mgr.can_read("s", &v8));
        let v4 = make_value(0u32, 4);
        assert!(!mgr.can_read("s", &v4));
        let v6 = make_value(0u32, 6);
        assert!(!mgr.can_read("s", &v6));
    }

    // --- Monotonic writes ------------------------------------------------

    #[test]
    fn test_monotonic_write_initial_allows_any_version() {
        let mut mgr = ConsistencyManager::new(ConsistencyConfig {
            write_level: StreamConsistencyLevel::MonotonicWrite,
            ..Default::default()
        });
        let vv = make_value(0u32, 0);
        assert!(mgr.can_write("sess", &vv));
    }

    #[test]
    fn test_monotonic_write_blocks_regression() {
        let mut mgr = ConsistencyManager::new(ConsistencyConfig {
            write_level: StreamConsistencyLevel::MonotonicWrite,
            ..Default::default()
        });
        let v5 = make_value(0u32, 5);
        mgr.after_write("sess", &v5);

        let v4 = make_value(0u32, 4);
        assert!(!mgr.can_write("sess", &v4));

        let v5b = make_value(0u32, 5);
        assert!(mgr.can_write("sess", &v5b));

        let v6 = make_value(0u32, 6);
        assert!(mgr.can_write("sess", &v6));
    }

    // --- Staleness -------------------------------------------------------

    #[test]
    fn test_is_stale_behind_current() {
        let mgr = ConsistencyManager::new(ConsistencyConfig::default());
        let vv = make_value(0u32, 4);
        assert!(mgr.is_stale(&vv, 10));
    }

    #[test]
    fn test_is_not_stale_at_current() {
        let mgr = ConsistencyManager::new(ConsistencyConfig::default());
        let vv = make_value(0u32, 10);
        assert!(!mgr.is_stale(&vv, 10));
    }

    #[test]
    fn test_is_not_stale_ahead_current() {
        let mgr = ConsistencyManager::new(ConsistencyConfig::default());
        let vv = make_value(0u32, 12);
        assert!(!mgr.is_stale(&vv, 10));
    }

    // --- Session bookkeeping ---------------------------------------------

    #[test]
    fn test_session_tracking_counts() {
        let mut mgr = ConsistencyManager::new(ConsistencyConfig {
            read_level: StreamConsistencyLevel::MonotonicRead,
            write_level: StreamConsistencyLevel::MonotonicWrite,
            ..Default::default()
        });
        assert_eq!(mgr.monotonic_read_session_count(), 0);
        assert_eq!(mgr.monotonic_write_session_count(), 0);

        mgr.after_read("r-sess", &make_value(0u32, 1));
        mgr.after_write("w-sess", &make_value(0u32, 1));

        assert_eq!(mgr.monotonic_read_session_count(), 1);
        assert_eq!(mgr.monotonic_write_session_count(), 1);
    }

    // --- EventualConsistencyBuffer ---------------------------------------

    #[test]
    fn test_buffer_initial_empty() {
        let buf = EventualConsistencyBuffer::new(500);
        assert_eq!(buf.pending_count(), 0);
    }

    #[test]
    fn test_buffer_and_count() {
        let mut buf = EventualConsistencyBuffer::new(500);
        buf.buffer("key1", b"val1".to_vec());
        buf.buffer("key2", b"val2".to_vec());
        assert_eq!(buf.pending_count(), 2);
    }

    #[test]
    fn test_buffer_drain() {
        let mut buf = EventualConsistencyBuffer::new(500);
        buf.buffer("k", b"v".to_vec());
        let drained = buf.drain();
        assert_eq!(drained.len(), 1);
        assert_eq!(drained[0].0, "k");
        assert_eq!(drained[0].1, b"v");
        assert_eq!(buf.pending_count(), 0);
    }

    #[test]
    fn test_should_sync_past_deadline() {
        let buf = EventualConsistencyBuffer::new(100);
        // Ten seconds ahead is well past the 100 ms lag.
        let far_future = current_ms() + 10_000;
        assert!(buf.should_sync(far_future));
    }

    #[test]
    fn test_should_not_sync_before_deadline() {
        // A one-minute lag cannot elapse during the test.
        let buf = EventualConsistencyBuffer::new(60_000);
        let now = current_ms();
        assert!(!buf.should_sync(now));
    }

    #[test]
    fn test_drain_resets_timer() {
        // A one-minute lag keeps the final assertion immune to slow machines.
        let mut buf = EventualConsistencyBuffer::new(60_000);
        buf.buffer("x", b"1".to_vec());

        // Rewind the private timer so a sync is overdue; without this the
        // test would pass even if `drain` never touched the timer.
        buf.last_sync_ms -= 120_000;
        assert!(buf.should_sync(current_ms()));

        let _ = buf.drain();
        assert!(!buf.should_sync(current_ms()));
    }
}