Skip to main content

maw/model/
ordering.rs

1//! Ordering key for causal ordering of operations (§5.9).
2//!
3//! Manifold uses a composite ordering key `(epoch_id, workspace_id, seq)` for
4//! deterministic causal ordering. Wall clock is informational only — never used
5//! for correctness, but clamped monotonically so it's always non-decreasing.
6//!
7//! # Ordering semantics
8//!
9//! The authoritative ordering triple is `(epoch_id, workspace_id, seq)`.
10//! Wall clock is display-only and excluded from `Ord`/`PartialOrd`.
11//!
12//! Within a single workspace, `seq` is strictly monotonically increasing.
13//! Across workspaces, ties are broken by `workspace_id` (lexicographic).
14//! Across epochs, ties are broken by `epoch_id` (hex-string lexicographic).
15
16use std::cmp::Ordering;
17use std::fmt;
18use std::time::{SystemTime, UNIX_EPOCH};
19
20use serde::{Deserialize, Serialize};
21
22use crate::model::types::{EpochId, WorkspaceId};
23
24// ---------------------------------------------------------------------------
25// OrderingKey
26// ---------------------------------------------------------------------------
27
28/// Composite ordering key for causal ordering of operations.
29///
30/// Ordering is determined by the authoritative triple `(epoch_id, workspace_id, seq)`.
31/// The `wall_clock_ms` field is informational only and excluded from ordering.
32#[derive(Clone, Debug, Eq, Serialize, Deserialize)]
33pub struct OrderingKey {
34    /// The epoch this operation belongs to.
35    pub epoch_id: EpochId,
36    /// The workspace that produced this operation.
37    pub workspace_id: WorkspaceId,
38    /// Monotonically increasing sequence number within a workspace.
39    pub seq: u64,
40    /// Wall-clock milliseconds since Unix epoch (informational only).
41    /// Clamped: never goes backward within a workspace.
42    pub wall_clock_ms: u64,
43}
44
45impl OrderingKey {
46    /// Create a new ordering key with explicit values.
47    #[must_use]
48    pub const fn new(
49        epoch_id: EpochId,
50        workspace_id: WorkspaceId,
51        seq: u64,
52        wall_clock_ms: u64,
53    ) -> Self {
54        Self {
55            epoch_id,
56            workspace_id,
57            seq,
58            wall_clock_ms,
59        }
60    }
61}
62
63// Ordering uses ONLY the authoritative triple.
64impl PartialEq for OrderingKey {
65    fn eq(&self, other: &Self) -> bool {
66        self.epoch_id == other.epoch_id
67            && self.workspace_id == other.workspace_id
68            && self.seq == other.seq
69    }
70}
71
72impl PartialOrd for OrderingKey {
73    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
74        Some(self.cmp(other))
75    }
76}
77
78impl Ord for OrderingKey {
79    fn cmp(&self, other: &Self) -> Ordering {
80        self.epoch_id
81            .as_str()
82            .cmp(other.epoch_id.as_str())
83            .then_with(|| self.workspace_id.cmp(&other.workspace_id))
84            .then_with(|| self.seq.cmp(&other.seq))
85    }
86}
87
88impl fmt::Display for OrderingKey {
89    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90        write!(
91            f,
92            "{}:{}:{}",
93            &self.epoch_id.as_str()[..8],
94            self.workspace_id,
95            self.seq,
96        )
97    }
98}
99
100// ---------------------------------------------------------------------------
101// SequenceGenerator — monotonic per-workspace sequence + wall-clock clamp
102// ---------------------------------------------------------------------------
103
104/// Per-workspace sequence and wall-clock generator.
105///
106/// Guarantees:
107/// - `seq` is strictly monotonically increasing (starts at 1).
108/// - `wall_clock_ms` is non-decreasing: `max(now_ms, last_seen + 1)`.
109///
110/// # Usage
111///
112/// ```rust,ignore
113/// let mut seq_gen = SequenceGenerator::new();
114/// let (seq, wall_ms) = seq_gen.next(); // (1, now_ms)
115/// let (seq, wall_ms) = seq_gen.next(); // (2, max(now_ms, prev+1))
116/// ```
117#[derive(Clone, Debug, Serialize, Deserialize)]
118pub struct SequenceGenerator {
119    last_seq: u64,
120    last_wall_clock_ms: u64,
121}
122
123impl SequenceGenerator {
124    /// Create a new generator starting at seq=0 (next call returns 1).
125    #[must_use]
126    pub const fn new() -> Self {
127        Self {
128            last_seq: 0,
129            last_wall_clock_ms: 0,
130        }
131    }
132
133    /// Resume from a known state (e.g., loaded from persistent storage).
134    #[must_use]
135    pub const fn resume(last_seq: u64, last_wall_clock_ms: u64) -> Self {
136        Self {
137            last_seq,
138            last_wall_clock_ms,
139        }
140    }
141
142    /// Generate the next `(seq, wall_clock_ms)` pair.
143    ///
144    /// Wall clock is clamped: if the system clock went backward (NTP step,
145    /// VM resume), we use `last_seen + 1` instead of going backward.
146    #[allow(clippy::should_implement_trait)]
147    pub fn next(&mut self) -> (u64, u64) {
148        self.last_seq += 1;
149        let now_ms = current_time_ms();
150        self.last_wall_clock_ms = now_ms.max(self.last_wall_clock_ms + 1);
151        (self.last_seq, self.last_wall_clock_ms)
152    }
153
154    /// Generate the next `(seq, wall_clock_ms)` pair using a provided wall clock.
155    ///
156    /// This is primarily for testing — in production use [`Self::next()`].
157    pub fn next_with_clock(&mut self, now_ms: u64) -> (u64, u64) {
158        self.last_seq += 1;
159        self.last_wall_clock_ms = now_ms.max(self.last_wall_clock_ms + 1);
160        (self.last_seq, self.last_wall_clock_ms)
161    }
162
163    /// The last sequence number generated (0 if none yet).
164    #[must_use]
165    pub const fn last_seq(&self) -> u64 {
166        self.last_seq
167    }
168
169    /// The last wall-clock value generated (0 if none yet).
170    #[must_use]
171    pub const fn last_wall_clock_ms(&self) -> u64 {
172        self.last_wall_clock_ms
173    }
174}
175
176impl Default for SequenceGenerator {
177    fn default() -> Self {
178        Self::new()
179    }
180}
181
182/// Get current wall-clock time in milliseconds since Unix epoch.
183fn current_time_ms() -> u64 {
184    SystemTime::now()
185        .duration_since(UNIX_EPOCH)
186        .map(|d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX))
187        .unwrap_or(0)
188}
189
190// ---------------------------------------------------------------------------
191// Tests
192// ---------------------------------------------------------------------------
193
194#[cfg(test)]
195#[allow(clippy::all, clippy::pedantic, clippy::nursery)]
196mod tests {
197    use super::*;
198    use crate::model::types::EpochId;
199
200    fn epoch(c: char) -> EpochId {
201        EpochId::new(&c.to_string().repeat(40)).unwrap()
202    }
203
204    fn ws(name: &str) -> WorkspaceId {
205        WorkspaceId::new(name).unwrap()
206    }
207
208    fn key(epoch_char: char, ws_name: &str, seq: u64, wall: u64) -> OrderingKey {
209        OrderingKey::new(epoch(epoch_char), ws(ws_name), seq, wall)
210    }
211
212    // -----------------------------------------------------------------------
213    // OrderingKey construction and display
214    // -----------------------------------------------------------------------
215
216    #[test]
217    fn ordering_key_construction() {
218        let k = key('a', "agent-1", 42, 1000);
219        assert_eq!(k.epoch_id, epoch('a'));
220        assert_eq!(k.workspace_id, ws("agent-1"));
221        assert_eq!(k.seq, 42);
222        assert_eq!(k.wall_clock_ms, 1000);
223    }
224
225    #[test]
226    fn ordering_key_display() {
227        let k = key('a', "agent-1", 5, 0);
228        let display = format!("{k}");
229        assert!(
230            display.starts_with("aaaaaaaa"),
231            "should start with epoch prefix"
232        );
233        assert!(display.contains("agent-1"), "should contain workspace id");
234        assert!(display.ends_with(":5"), "should end with seq number");
235    }
236
237    // -----------------------------------------------------------------------
238    // Ordering — authoritative triple only
239    // -----------------------------------------------------------------------
240
241    #[test]
242    fn ordering_same_epoch_same_ws_by_seq() {
243        let k1 = key('a', "w1", 1, 100);
244        let k2 = key('a', "w1", 2, 50); // wall_clock lower but seq higher
245        assert!(k1 < k2, "same epoch+ws: should order by seq");
246    }
247
248    #[test]
249    fn ordering_same_epoch_different_ws() {
250        let k1 = key('a', "agent-1", 1, 100);
251        let k2 = key('a', "agent-2", 1, 100);
252        assert!(
253            k1 < k2,
254            "same epoch+seq: should order by workspace_id lexicographic"
255        );
256    }
257
258    #[test]
259    fn ordering_different_epoch() {
260        let k1 = key('a', "w1", 100, 100);
261        let k2 = key('b', "w1", 1, 1);
262        assert!(k1 < k2, "different epoch: epoch_id comparison comes first");
263    }
264
265    #[test]
266    fn ordering_wall_clock_does_not_affect_ordering() {
267        let k1 = key('a', "w1", 1, 9999);
268        let k2 = key('a', "w1", 1, 1);
269        assert_eq!(
270            k1.cmp(&k2),
271            Ordering::Equal,
272            "wall_clock must not affect ordering"
273        );
274    }
275
276    #[test]
277    fn ordering_equality_ignores_wall_clock() {
278        let k1 = key('a', "w1", 5, 100);
279        let k2 = key('a', "w1", 5, 999);
280        assert_eq!(k1, k2, "equality should ignore wall_clock");
281    }
282
283    #[test]
284    fn ordering_inequality_by_seq() {
285        let k1 = key('a', "w1", 1, 100);
286        let k2 = key('a', "w1", 2, 100);
287        assert_ne!(k1, k2);
288    }
289
290    #[test]
291    fn ordering_is_total() {
292        // Verify transitivity: a < b < c → a < c
293        let a = key('a', "w1", 1, 0);
294        let b = key('a', "w1", 2, 0);
295        let c = key('a', "w1", 3, 0);
296        assert!(a < b);
297        assert!(b < c);
298        assert!(a < c);
299    }
300
301    // -----------------------------------------------------------------------
302    // SequenceGenerator — monotonic seq + wall-clock clamp
303    // -----------------------------------------------------------------------
304
305    #[test]
306    fn seq_gen_starts_at_zero() {
307        let seq_gen = SequenceGenerator::new();
308        assert_eq!(seq_gen.last_seq(), 0);
309        assert_eq!(seq_gen.last_wall_clock_ms(), 0);
310    }
311
312    #[test]
313    fn seq_gen_first_call_returns_1() {
314        let mut seq_gen = SequenceGenerator::new();
315        let (seq, _) = seq_gen.next_with_clock(1000);
316        assert_eq!(seq, 1);
317    }
318
319    #[test]
320    fn seq_gen_monotonic_sequence() {
321        let mut seq_gen = SequenceGenerator::new();
322        let (s1, _) = seq_gen.next_with_clock(100);
323        let (s2, _) = seq_gen.next_with_clock(200);
324        let (s3, _) = seq_gen.next_with_clock(300);
325        assert_eq!(s1, 1);
326        assert_eq!(s2, 2);
327        assert_eq!(s3, 3);
328    }
329
330    #[test]
331    fn seq_gen_wall_clock_forward() {
332        let mut seq_gen = SequenceGenerator::new();
333        let (_, w1) = seq_gen.next_with_clock(1000);
334        let (_, w2) = seq_gen.next_with_clock(2000);
335        assert_eq!(w1, 1000);
336        assert_eq!(w2, 2000);
337    }
338
339    #[test]
340    fn seq_gen_wall_clock_backward_clamped() {
341        let mut seq_gen = SequenceGenerator::new();
342        let (_, w1) = seq_gen.next_with_clock(5000);
343        assert_eq!(w1, 5000);
344
345        // Clock goes backward — should clamp to last+1
346        let (_, w2) = seq_gen.next_with_clock(3000);
347        assert_eq!(w2, 5001, "backward clock should clamp to last+1");
348
349        // Clock goes even further backward
350        let (_, w3) = seq_gen.next_with_clock(1000);
351        assert_eq!(w3, 5002, "still clamped");
352    }
353
354    #[test]
355    fn seq_gen_wall_clock_same_time_clamped() {
356        let mut seq_gen = SequenceGenerator::new();
357        let (_, w1) = seq_gen.next_with_clock(1000);
358        let (_, w2) = seq_gen.next_with_clock(1000);
359        assert_eq!(w1, 1000);
360        assert_eq!(w2, 1001, "same time should advance by 1");
361    }
362
363    #[test]
364    fn seq_gen_resume() {
365        let mut seq_gen = SequenceGenerator::resume(10, 5000);
366        assert_eq!(seq_gen.last_seq(), 10);
367        assert_eq!(seq_gen.last_wall_clock_ms(), 5000);
368
369        let (seq, wall) = seq_gen.next_with_clock(6000);
370        assert_eq!(seq, 11, "should continue from last_seq");
371        assert_eq!(wall, 6000);
372    }
373
374    #[test]
375    fn seq_gen_resume_backward_clock() {
376        let mut seq_gen = SequenceGenerator::resume(5, 10000);
377
378        // Clock went backward (VM resume scenario)
379        let (seq, wall) = seq_gen.next_with_clock(8000);
380        assert_eq!(seq, 6);
381        assert_eq!(wall, 10001, "should clamp: max(8000, 10000+1)");
382    }
383
384    #[test]
385    fn seq_gen_next_uses_real_clock() {
386        let mut seq_gen = SequenceGenerator::new();
387        let (seq, wall) = seq_gen.next();
388        assert_eq!(seq, 1);
389        assert!(wall > 0, "wall clock should be positive from system time");
390        // Sanity: wall clock should be after 2024-01-01 (1704067200000 ms)
391        assert!(
392            wall > 1_704_067_200_000,
393            "wall clock {wall} seems too small"
394        );
395    }
396
397    // -----------------------------------------------------------------------
398    // Serde round-trip
399    // -----------------------------------------------------------------------
400
401    #[test]
402    fn ordering_key_serde_roundtrip() {
403        let k = key('f', "agent-3", 99, 123_456_789);
404        let json = serde_json::to_string(&k).unwrap();
405        let parsed: OrderingKey = serde_json::from_str(&json).unwrap();
406        assert_eq!(parsed.epoch_id, k.epoch_id);
407        assert_eq!(parsed.workspace_id, k.workspace_id);
408        assert_eq!(parsed.seq, k.seq);
409        assert_eq!(parsed.wall_clock_ms, k.wall_clock_ms);
410    }
411
412    #[test]
413    fn seq_gen_serde_roundtrip() {
414        let mut seq_gen = SequenceGenerator::new();
415        seq_gen.next_with_clock(5000);
416        seq_gen.next_with_clock(6000);
417
418        let json = serde_json::to_string(&seq_gen).unwrap();
419        let restored: SequenceGenerator = serde_json::from_str(&json).unwrap();
420        assert_eq!(restored.last_seq(), seq_gen.last_seq());
421        assert_eq!(restored.last_wall_clock_ms(), seq_gen.last_wall_clock_ms());
422    }
423
424    // -----------------------------------------------------------------------
425    // Ordering consistency with causal chain
426    // -----------------------------------------------------------------------
427
428    #[test]
429    fn causal_chain_ordering() {
430        // Simulate a workspace producing operations in sequence
431        let mut seq_gen = SequenceGenerator::new();
432        let e = epoch('a');
433        let w = ws("agent-1");
434
435        let mut keys = Vec::new();
436        for clock in [100, 200, 300, 400, 500] {
437            let (seq, wall) = seq_gen.next_with_clock(clock);
438            keys.push(OrderingKey::new(e.clone(), w.clone(), seq, wall));
439        }
440
441        // Verify strict ascending order
442        for window in keys.windows(2) {
443            assert!(
444                window[0] < window[1],
445                "causal chain must be strictly ascending: {:?} should be < {:?}",
446                window[0],
447                window[1]
448            );
449        }
450    }
451
452    #[test]
453    fn causal_chain_with_backward_clock() {
454        // NTP step scenario: clock goes backward mid-chain
455        let mut seq_gen = SequenceGenerator::new();
456        let e = epoch('b');
457        let w = ws("agent-2");
458
459        let clocks = [1000, 2000, 500, 300, 4000]; // backward at index 2,3
460        let mut keys = Vec::new();
461        for &clock in &clocks {
462            let (seq, wall) = seq_gen.next_with_clock(clock);
463            keys.push(OrderingKey::new(e.clone(), w.clone(), seq, wall));
464        }
465
466        // Wall clocks should be monotonically non-decreasing
467        for window in keys.windows(2) {
468            assert!(
469                window[0].wall_clock_ms < window[1].wall_clock_ms,
470                "wall clock must be strictly increasing after clamp"
471            );
472        }
473
474        // Ordering must still be strictly ascending
475        for window in keys.windows(2) {
476            assert!(window[0] < window[1]);
477        }
478    }
479
480    #[test]
481    fn cross_workspace_ordering_deterministic() {
482        // Two workspaces in same epoch — ordering is by workspace_id then seq
483        let e = epoch('a');
484        let keys = vec![
485            OrderingKey::new(e.clone(), ws("alpha"), 1, 100),
486            OrderingKey::new(e.clone(), ws("alpha"), 2, 200),
487            OrderingKey::new(e.clone(), ws("beta"), 1, 150),
488            OrderingKey::new(e, ws("beta"), 2, 250),
489        ];
490
491        let mut sorted = keys;
492        sorted.sort();
493
494        // Expected: alpha:1, alpha:2, beta:1, beta:2
495        assert_eq!(sorted[0].workspace_id, ws("alpha"));
496        assert_eq!(sorted[0].seq, 1);
497        assert_eq!(sorted[1].workspace_id, ws("alpha"));
498        assert_eq!(sorted[1].seq, 2);
499        assert_eq!(sorted[2].workspace_id, ws("beta"));
500        assert_eq!(sorted[2].seq, 1);
501        assert_eq!(sorted[3].workspace_id, ws("beta"));
502        assert_eq!(sorted[3].seq, 2);
503    }
504}