Skip to main content

causal_dag/
lib.rs

1//! Pillar: I. PACR fields: ι (node identity), Π (predecessor edges).
2//!
3//! A **lock-free, append-only** causal DAG over [`PacrRecord`] nodes.
4//!
5//! Physical axiom: special relativity mandates that causation propagates at
6//! most at the speed of light, imposing a strict **partial order** on events.
7//! This partial order is encoded in the predecessor set Π — there is no total
8//! order and no global clock.
9//!
10//! # Design
11//!
12//! | Operation           | Complexity    | Mechanism                            |
13//! |---------------------|---------------|--------------------------------------|
14//! | [`CausalDag::append`] | O(\|Π\|)    | predecessor validation + `DashMap` CAS |
15//! | [`CausalDag::get`]  | O(1) expected | `DashMap` sharded read                 |
16//! | [`CausalDag::successors`] | O(1)    | `DashMap` reverse-index read           |
17//! | [`CausalDag::predecessors`] | O(1)  | `DashMap` sharded read                 |
18//! | [`CausalDag::ancestry`] | O(V+E)    | BFS with `HashSet` visited-set         |
19//!
20//! # CRDT semantics
21//!
22//! The DAG is a **Grow-Only Set (G-Set)**: records are immutable once inserted
23//! and the only mutation is appending new records.  Two replicas converge by
24//! exchanging unseen records (union of node sets).
25//!
26//! # Concurrency
27//!
//! [`DashMap`] splits its storage into independently locked shards (not a
//! single `RwLock` around the whole map), so operations that land on
//! different shards do not contend with each other.  Append uses `entry()`
//! for an atomic duplicate-check-and-insert, eliminating the TOCTOU race of
//! a separate `contains_key` + `insert` pair.
//!
//! No `Mutex` or `RwLock` is used directly anywhere in this crate; all
//! locking is internal to `DashMap`.
34
35#![forbid(unsafe_code)]
36#![deny(clippy::all, clippy::pedantic)]
37#![allow(
38    clippy::cast_precision_loss,
39    clippy::cast_possible_truncation,
40    clippy::cast_sign_loss,
41    clippy::similar_names,
42    clippy::doc_markdown,
43    clippy::must_use_candidate,
44    clippy::needless_pass_by_value,
45    clippy::missing_panics_doc,
46    clippy::missing_errors_doc,
47    clippy::return_self_not_must_use,
48    clippy::unreadable_literal
49)]
50
51pub mod distance_tax;
52pub mod merge;
53
54use std::collections::HashSet;
55use std::sync::Arc;
56
57use dashmap::mapref::entry::Entry;
58use dashmap::DashMap;
59use smallvec::SmallVec;
60use thiserror::Error;
61
62use pacr_types::{CausalId, PacrRecord};
63
64// ── Public types ──────────────────────────────────────────────────────────────
65
66/// A lock-free, append-only directed acyclic graph of [`PacrRecord`] nodes.
67///
68/// Nodes are [`CausalId`]s.  Directed edges flow from each record to its
69/// causal predecessors (Π).  A reverse index (`children`) allows O(1)
70/// forward traversal (cause → effect).
71///
72/// # Invariants maintained at all times
73///
74/// 1. **Append-only**: a record is never removed or modified after insertion.
75/// 2. **Predecessor-existence**: every non-GENESIS predecessor of a record
76///    was inserted before that record.  This makes transitive causal cycles
77///    structurally impossible (no record can be its own ancestor).
78/// 3. **No self-reference**: a record cannot list its own `id` in Π.
79#[derive(Debug)]
80pub struct CausalDag {
81    /// Primary map: `CausalId` → Arc<PacrRecord>.
82    /// `DashMap` provides O(1) sharded reads and atomic entry operations.
83    nodes: DashMap<CausalId, Arc<PacrRecord>>,
84
85    /// Reverse index: `predecessor_id` → list of successor ids.
86    /// Enables O(1) forward traversal (cause → effect).
87    /// `SmallVec<[CausalId; 4]>`: most nodes have 1–4 children (Pillar I,
88    /// inline buffer avoids heap allocation for the common case).
89    children: DashMap<CausalId, SmallVec<[CausalId; 4]>>,
90}
91
92/// Error returned by [`CausalDag::append`].
93#[derive(Debug, Clone, Error)]
94#[non_exhaustive]
95pub enum DagError {
96    /// A record with this [`CausalId`] already exists in the DAG.
97    /// Append-only invariant: duplicate IDs are forbidden.
98    #[error("duplicate causal ID: {0}")]
99    DuplicateId(CausalId),
100
101    /// The record references a predecessor that has not yet been inserted.
102    /// Physical axiom: a cause must exist before an effect can reference it.
103    #[error("missing predecessor: {child} references unknown predecessor {parent}")]
104    MissingPredecessor {
105        /// The record being inserted.
106        child: CausalId,
107        /// The predecessor that was not found in the DAG.
108        parent: CausalId,
109    },
110
111    /// The record lists its own [`CausalId`] in its predecessor set Π.
112    /// A causal self-loop violates the acyclicity invariant.
113    #[error("self-reference: {0} cannot be its own causal predecessor")]
114    SelfReference(CausalId),
115}
116
117// ── Implementation ────────────────────────────────────────────────────────────
118
119impl CausalDag {
120    /// Creates an empty DAG.
121    #[must_use]
122    pub fn new() -> Self {
123        Self {
124            nodes: DashMap::new(),
125            children: DashMap::new(),
126        }
127    }
128
129    /// Creates an empty DAG with pre-allocated shard capacity.
130    ///
131    /// Use when the expected record count is known in advance; avoids
132    /// rehashing at ingestion time.
133    #[must_use]
134    pub fn with_capacity(capacity: usize) -> Self {
135        Self {
136            nodes: DashMap::with_capacity(capacity),
137            children: DashMap::with_capacity(capacity),
138        }
139    }
140
141    /// Appends a [`PacrRecord`] to the DAG.
142    ///
143    /// Validates three structural invariants before inserting:
144    /// 1. No duplicate ID (append-only).
145    /// 2. Every non-GENESIS predecessor already exists in the DAG.
146    /// 3. No self-reference in Π.
147    ///
148    /// On success, returns an [`Arc`] pointing to the stored record.
149    ///
150    /// # Errors
151    ///
152    /// Returns [`DagError`] if any invariant is violated.
153    ///
154    /// # Complexity
155    ///
156    /// O(\|Π\|) — one `DashMap` read per predecessor, then one atomic
157    /// `entry()` insert for the node itself.
158    pub fn append(&self, record: PacrRecord) -> Result<Arc<PacrRecord>, DagError> {
159        let id = record.id;
160
161        // ── Invariant 3: no self-reference ────────────────────────────────────
162        // Checked first: cheapest check, no map access needed.
163        if record.predecessors.contains(&id) {
164            return Err(DagError::SelfReference(id));
165        }
166
167        // ── Invariant 2: all predecessors exist ───────────────────────────────
168        // Safe to validate before locking the node shard: the DAG is append-only
169        // so existing nodes can never disappear.  If a predecessor is missing,
170        // we fail fast without touching the write path.
171        for pred_id in &record.predecessors {
172            if !pred_id.is_genesis() && !self.nodes.contains_key(pred_id) {
173                return Err(DagError::MissingPredecessor {
174                    child: id,
175                    parent: *pred_id,
176                });
177            }
178        }
179
180        // ── Invariant 1: no duplicate + atomic insert ─────────────────────────
181        // `entry()` acquires the shard lock once and either inserts (Vacant) or
182        // detects a duplicate (Occupied) atomically — no TOCTOU race.
183        let record_arc = match self.nodes.entry(id) {
184            Entry::Occupied(_) => return Err(DagError::DuplicateId(id)),
185            Entry::Vacant(slot) => {
186                let arc = Arc::new(record);
187                slot.insert(Arc::clone(&arc));
188                arc
189            }
190        };
191
192        // ── Update reverse index (children) ───────────────────────────────────
193        // O(|Π|) inserts into the children map.  Each predecessor gains `id` as
194        // a child.  GENESIS entries are indexed too — useful for enumerating
195        // first-generation events.
196        for pred_id in &record_arc.predecessors {
197            self.children.entry(*pred_id).or_default().push(id);
198        }
199
200        Ok(record_arc)
201    }
202
203    /// Retrieves a record by its [`CausalId`].
204    ///
205    /// Returns `None` if the ID is not present.
206    ///
207    /// # Complexity
208    ///
209    /// O(1) expected — single `DashMap` sharded read.
210    #[must_use]
211    pub fn get(&self, id: &CausalId) -> Option<Arc<PacrRecord>> {
212        self.nodes.get(id).map(|r| Arc::clone(r.value()))
213    }
214
215    /// Returns `true` if a record with `id` exists in the DAG.
216    ///
217    /// # Complexity
218    ///
219    /// O(1) expected.
220    #[must_use]
221    pub fn contains(&self, id: &CausalId) -> bool {
222        self.nodes.contains_key(id)
223    }
224
225    /// Returns the direct causal predecessors of `id` (the Π set).
226    ///
227    /// Returns `None` if `id` is not present in the DAG.
228    ///
229    /// # Complexity
230    ///
231    /// O(1) expected — reads the stored predecessor set.
232    #[must_use]
233    pub fn predecessors(&self, id: &CausalId) -> Option<SmallVec<[CausalId; 4]>> {
234        self.nodes.get(id).map(|r| r.predecessors.clone())
235    }
236
237    /// Returns the direct causal successors of `id` (events that cite `id` in Π).
238    ///
239    /// Returns an empty `SmallVec` if `id` has no known successors yet.
240    ///
241    /// # Complexity
242    ///
243    /// O(1) expected — reads the reverse index.
244    #[must_use]
245    pub fn successors(&self, id: &CausalId) -> SmallVec<[CausalId; 4]> {
246        self.children
247            .get(id)
248            .map(|entry| entry.value().clone())
249            .unwrap_or_default()
250    }
251
252    /// Returns all transitive causal ancestors of `id` up to `max_depth` hops.
253    ///
254    /// Uses BFS over the Π edges.  GENESIS predecessors are not included in the
255    /// result (they are the definitional "no ancestor" sentinel).
256    ///
257    /// # Complexity
258    ///
259    /// O(V + E) where V is the number of ancestors visited and E is the total
260    /// predecessor edges traversed.  `max_depth` bounds the frontier.
261    #[must_use]
262    pub fn ancestry(&self, id: &CausalId, max_depth: usize) -> Vec<CausalId> {
263        let mut visited: HashSet<CausalId> = HashSet::new();
264        let mut result: Vec<CausalId> = Vec::new();
265        let mut queue: std::collections::VecDeque<(CausalId, usize)> =
266            std::collections::VecDeque::new();
267
268        if let Some(record) = self.get(id) {
269            for pred in &record.predecessors {
270                if !pred.is_genesis() {
271                    queue.push_back((*pred, 1));
272                }
273            }
274        }
275
276        while let Some((current, depth)) = queue.pop_front() {
277            if depth > max_depth || visited.contains(&current) {
278                continue;
279            }
280            visited.insert(current);
281            result.push(current);
282
283            if let Some(record) = self.get(&current) {
284                for pred in &record.predecessors {
285                    if !pred.is_genesis() && !visited.contains(pred) {
286                        queue.push_back((*pred, depth + 1));
287                    }
288                }
289            }
290        }
291
292        result
293    }
294
295    /// Number of records in the DAG.
296    ///
297    /// # Complexity
298    ///
299    /// O(1) — `DashMap` tracks length per shard.
300    #[must_use]
301    pub fn len(&self) -> usize {
302        self.nodes.len()
303    }
304
305    /// Returns `true` if the DAG contains no records.
306    #[must_use]
307    pub fn is_empty(&self) -> bool {
308        self.nodes.is_empty()
309    }
310}
311
312impl Default for CausalDag {
313    fn default() -> Self {
314        Self::new()
315    }
316}
317
318// ── Unit tests ────────────────────────────────────────────────────────────────
319
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use pacr_types::{CognitiveSplit, Estimate, PacrBuilder, ResourceTriple};

    /// Builds a minimal [`PacrRecord`] with the given raw id and raw
    /// predecessor ids.  Every measurement field is a fixed exact estimate
    /// and the payload is empty.
    pub(super) fn make_record(id: u128, preds: &[u128]) -> PacrRecord {
        let resources = ResourceTriple {
            energy: Estimate::exact(1e-19),
            time: Estimate::exact(1e-9),
            space: Estimate::exact(128.0),
        };
        let split = CognitiveSplit {
            statistical_complexity: Estimate::exact(1.0),
            entropy_rate: Estimate::exact(0.5),
        };

        PacrBuilder::new()
            .id(CausalId(id))
            .predecessors(preds.iter().map(|&raw| CausalId(raw)).collect())
            .landauer_cost(Estimate::exact(1e-20))
            .resources(resources)
            .cognitive_split(split)
            .payload(Bytes::new())
            .build()
            .expect("test record is always valid")
    }

    /// Fixture: the chain 1 ← 2 ← … ← `len`, where record 1 cites GENESIS (0).
    fn linear_chain(len: u128) -> CausalDag {
        let dag = CausalDag::new();
        for id in 1..=len {
            dag.append(make_record(id, &[id - 1])).unwrap();
        }
        dag
    }

    /// Fixture: records 1 and 2 both cite GENESIS; record 3 cites 1 and 2.
    ///
    /// ```text
    /// 0 → 1 → 3
    ///   ↘ 2 ↗
    /// ```
    fn diamond() -> CausalDag {
        let dag = CausalDag::new();
        dag.append(make_record(1, &[0])).unwrap();
        dag.append(make_record(2, &[0])).unwrap();
        dag.append(make_record(3, &[1, 2])).unwrap();
        dag
    }

    // ── append ────────────────────────────────────────────────────────────────

    #[test]
    fn append_genesis_record_succeeds() {
        let dag = CausalDag::new();
        // Predecessor 0 is GENESIS, which never has to be stored.
        assert!(dag.append(make_record(1, &[0])).is_ok());
        assert_eq!(dag.len(), 1);
    }

    #[test]
    fn append_duplicate_id_returns_error() {
        let dag = linear_chain(1);
        let second_attempt = dag.append(make_record(1, &[0]));
        assert!(matches!(
            second_attempt.unwrap_err(),
            DagError::DuplicateId(_)
        ));
    }

    #[test]
    fn append_missing_predecessor_returns_error() {
        let dag = CausalDag::new();
        // Record 2 cites record 99, which was never stored.
        let outcome = dag.append(make_record(2, &[99]));
        assert!(matches!(
            outcome.unwrap_err(),
            DagError::MissingPredecessor { .. }
        ));
    }

    #[test]
    fn append_self_reference_returns_error() {
        let dag = CausalDag::new();
        let outcome = dag.append(make_record(5, &[5]));
        assert!(matches!(outcome.unwrap_err(), DagError::SelfReference(_)));
    }

    #[test]
    fn append_chain_of_three_succeeds() {
        let dag = linear_chain(3);
        assert_eq!(dag.len(), 3);
    }

    // ── get / contains ────────────────────────────────────────────────────────

    #[test]
    fn get_returns_correct_record() {
        let dag = CausalDag::new();
        dag.append(make_record(7, &[0])).unwrap();
        let stored = dag.get(&CausalId(7)).expect("should be present");
        assert_eq!(stored.id, CausalId(7));
    }

    #[test]
    fn get_absent_id_returns_none() {
        let empty = CausalDag::new();
        assert!(empty.get(&CausalId(42)).is_none());
    }

    #[test]
    fn contains_returns_true_after_append() {
        let dag = CausalDag::new();
        dag.append(make_record(10, &[0])).unwrap();
        assert!(dag.contains(&CausalId(10)));
        assert!(!dag.contains(&CausalId(11)));
    }

    // ── predecessors / successors ─────────────────────────────────────────────

    #[test]
    fn predecessors_returns_pi_set() {
        let dag = diamond();
        let pi = dag.predecessors(&CausalId(3)).unwrap();
        for expected in [CausalId(1), CausalId(2)] {
            assert!(pi.contains(&expected));
        }
        assert_eq!(pi.len(), 2);
    }

    #[test]
    fn successors_empty_for_leaf_node() {
        let dag = linear_chain(1);
        // Nothing cites record 1 yet.
        assert!(dag.successors(&CausalId(1)).is_empty());
    }

    #[test]
    fn successors_populated_after_child_appended() {
        let dag = linear_chain(2);
        let kids = dag.successors(&CausalId(1));
        assert_eq!(kids.len(), 1);
        assert_eq!(kids[0], CausalId(2));
    }

    #[test]
    fn successors_diamond_shape() {
        let dag = diamond();
        let kids = dag.successors(&CausalId(1));
        assert_eq!(kids.len(), 1);
        assert_eq!(kids[0], CausalId(3));
    }

    // ── ancestry ──────────────────────────────────────────────────────────────

    #[test]
    fn ancestry_linear_chain() {
        // 1 → 2 → 3 (genesis at 0)
        let dag = linear_chain(3);
        let found = dag.ancestry(&CausalId(3), 10);
        // Records 1 and 2 are ancestors; GENESIS (0) is filtered out and the
        // start node (3) is not its own ancestor.
        assert!(found.contains(&CausalId(1)));
        assert!(found.contains(&CausalId(2)));
        assert!(!found.contains(&CausalId(0)));
        assert!(!found.contains(&CausalId(3)));
    }

    #[test]
    fn ancestry_respects_max_depth() {
        // chain: 1 → 2 → 3 → 4
        let dag = linear_chain(4);
        // One hop back from record 4 reaches record 3 and nothing else.
        let found = dag.ancestry(&CausalId(4), 1);
        assert_eq!(found.len(), 1);
        assert!(found.contains(&CausalId(3)));
    }

    #[test]
    fn ancestry_empty_for_genesis_child() {
        let dag = linear_chain(1);
        // Record 1 only cites GENESIS (0), which is never reported.
        assert!(dag.ancestry(&CausalId(1), 10).is_empty());
    }

    // ── Structural DAG invariants ─────────────────────────────────────────────

    #[test]
    fn append_only_get_never_returns_different_record() {
        // Two lookups of the same id must hand back the same stored record.
        let dag = CausalDag::new();
        dag.append(make_record(42, &[0])).unwrap();
        let lookup_a = dag.get(&CausalId(42)).unwrap();
        let lookup_b = dag.get(&CausalId(42)).unwrap();
        // Pointer equality: both handles share one backing allocation.
        assert!(Arc::ptr_eq(&lookup_a, &lookup_b));
    }

    #[test]
    fn is_empty_and_len_track_correctly() {
        let dag = CausalDag::new();
        assert!(dag.is_empty());
        assert_eq!(dag.len(), 0);

        dag.append(make_record(1, &[0])).unwrap();
        assert!(!dag.is_empty());
        assert_eq!(dag.len(), 1);

        dag.append(make_record(2, &[1])).unwrap();
        assert_eq!(dag.len(), 2);
    }
}
523
524// ── Property-based tests ──────────────────────────────────────────────────────
525
#[cfg(test)]
mod prop_tests {
    use super::tests::make_record;
    use super::*;
    use proptest::prelude::*;

    /// Builds the chain 1 ← 2 ← … ← `n`, where record 1 cites GENESIS (0).
    fn chain_of(n: usize) -> CausalDag {
        let dag = CausalDag::new();
        for id in 1..=(n as u128) {
            dag.append(make_record(id, &[id - 1])).unwrap();
        }
        dag
    }

    proptest! {
        /// A record appended to an empty DAG can be fetched straight away
        /// and carries the id it was built with.
        #[test]
        fn appended_record_is_retrievable(id in 1_u128..10_000_u128) {
            let dag = CausalDag::new();
            dag.append(make_record(id, &[0])).unwrap();
            let fetched = dag.get(&CausalId(id));
            prop_assert!(fetched.is_some());
            prop_assert_eq!(fetched.unwrap().id, CausalId(id));
        }

        /// Appending the same id a second time fails with `DuplicateId`.
        #[test]
        fn duplicate_id_always_rejected(id in 1_u128..10_000_u128) {
            let dag = CausalDag::new();
            dag.append(make_record(id, &[0])).unwrap();
            let second = dag.append(make_record(id, &[0]));
            prop_assert!(second.is_err());
            prop_assert!(matches!(second, Err(DagError::DuplicateId(_))));
        }

        /// A linear chain of `n` records appends without error and the DAG
        /// then reports `len() == n`.
        #[test]
        fn linear_chain_len_correct(n in 1_usize..50_usize) {
            let dag = chain_of(n);
            prop_assert_eq!(dag.len(), n);
        }

        /// A record that cites its own id is rejected with `SelfReference`.
        #[test]
        fn self_reference_always_rejected(id in 1_u128..10_000_u128) {
            let dag = CausalDag::new();
            let outcome = dag.append(make_record(id, &[id]));
            prop_assert!(matches!(outcome, Err(DagError::SelfReference(_))));
        }

        /// Citing a predecessor that was never inserted is rejected with
        /// `MissingPredecessor`.  The two id ranges are disjoint, so the
        /// record can never cite itself here.
        #[test]
        fn missing_predecessor_always_rejected(
            id     in 1_u128..5_000_u128,
            parent in 5_001_u128..10_000_u128,   // guaranteed absent
        ) {
            let dag = CausalDag::new();
            let outcome = dag.append(make_record(id, &[parent]));
            prop_assert!(outcome.is_err());
            let is_missing = matches!(outcome, Err(DagError::MissingPredecessor { .. }));
            prop_assert!(is_missing);
        }

        /// Every id of a chain of `n` records can be looked up with `get()`
        /// after the whole chain has been appended.
        #[test]
        fn all_appended_ids_are_retrievable(n in 1_usize..100_usize) {
            let dag = chain_of(n);
            for i in 1..=(n as u128) {
                prop_assert!(dag.get(&CausalId(i)).is_some(),
                    "id={i} should be in DAG");
            }
        }

        /// Reverse index agrees with Π: when `b` cites `a`, the successors
        /// of `a` include `b`.
        #[test]
        fn successors_consistent_with_predecessors(
            a in 1_u128..1_000_u128,
            b in 1_001_u128..2_000_u128,
        ) {
            let dag = CausalDag::new();
            dag.append(make_record(a, &[0])).unwrap();
            dag.append(make_record(b, &[a])).unwrap();
            let effects = dag.successors(&CausalId(a));
            prop_assert!(effects.contains(&CausalId(b)),
                "successors({a}) should contain {b}");
        }
    }
}