Skip to main content

nodedb_array/sync/
apply.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Pure op-application dispatcher for array CRDT sync.
4//!
5//! [`apply_op`] is the single entry point for merging an incoming [`ArrayOp`]
6//! into local engine state. It enforces shape validation, schema-version
7//! gating, idempotency, and tile-cache invalidation in a fixed order.
8//!
//! Engine implementations live in `nodedb-lite` and `nodedb`; this file
//! ships only the abstract [`ApplyEngine`] trait, the outcome types, and the
//! pure dispatcher function, plus an in-memory `MockEngine` that is compiled
//! only under `cfg(test)`.
12
13#[cfg(test)]
14use std::collections::{HashMap, HashSet};
15#[cfg(test)]
16use std::sync::Mutex;
17
18use crate::error::{ArrayError, ArrayResult};
19use crate::sync::hlc::Hlc;
20use crate::sync::op::{ArrayOp, ArrayOpKind};
21use crate::types::coord::value::CoordValue;
22
23// ─── Outcome types ───────────────────────────────────────────────────────────
24
25/// Outcome returned by [`apply_op`].
26#[derive(Clone, Debug, PartialEq)]
27pub enum ApplyOutcome {
28    /// The op was already present; no state was changed.
29    Idempotent,
30    /// The op was applied successfully.
31    Applied,
32    /// The op was rejected; the reason is attached.
33    Rejected(ApplyRejection),
34}
35
36/// Reason an op was rejected by [`apply_op`].
37#[derive(Clone, Debug, PartialEq)]
38pub enum ApplyRejection {
39    /// The op's `schema_hlc` is strictly newer than the local schema HLC.
40    ///
41    /// The receiver should request the updated schema and retry.
42    SchemaTooNew {
43        /// Local schema HLC at the time of rejection.
44        local: Hlc,
45        /// Schema HLC carried by the op.
46        op: Hlc,
47    },
48    /// The named array is not known to this replica.
49    ///
50    /// The receiver should request the array schema before retrying.
51    ArrayUnknown {
52        /// Name of the unknown array.
53        name: String,
54    },
55    /// The op violates the shape contract (e.g. `Put` without attrs).
56    ShapeInvalid {
57        /// Human-readable description of the violation.
58        detail: String,
59    },
60    /// The engine rejected the op (e.g. a transient storage error).
61    ///
62    /// The rejection is wrapped here so callers can record it without
63    /// propagating a hard error. Only [`ArrayError::SegmentCorruption`] and
64    /// [`ArrayError::HlcLockPoisoned`] are considered corruption-grade and
65    /// propagated directly; all other engine errors become this variant.
66    EngineRejected {
67        /// Human-readable description of the engine error.
68        detail: String,
69    },
70}
71
72// ─── ApplyEngine trait ───────────────────────────────────────────────────────
73
74/// Abstract interface to local engine state consumed by [`apply_op`].
75///
76/// Implementations live in `nodedb-lite` and `nodedb` crates. The trait is
77/// intentionally *not* `Send + Sync`-bounded — implementations choose their
78/// own threading model.
79pub trait ApplyEngine {
80    /// Return the current schema HLC for `array`, or `None` if the array is
81    /// not known to this replica.
82    fn schema_hlc(&self, array: &str) -> ArrayResult<Option<Hlc>>;
83
84    /// Return `true` if `hlc` has already been applied to `array`.
85    ///
86    /// Used for idempotent re-delivery detection.
87    fn already_seen(&self, array: &str, hlc: Hlc) -> ArrayResult<bool>;
88
89    /// Apply a `Put` op to the engine.
90    fn apply_put(&mut self, op: &ArrayOp) -> ArrayResult<()>;
91
92    /// Apply a `Delete` op to the engine.
93    fn apply_delete(&mut self, op: &ArrayOp) -> ArrayResult<()>;
94
95    /// Apply an `Erase` op to the engine.
96    fn apply_erase(&mut self, op: &ArrayOp) -> ArrayResult<()>;
97
98    /// Invalidate any tile-cache entry covering `coord` in `array`.
99    ///
100    /// Called after every successful op application so that subsequent reads
101    /// see the updated state.
102    fn invalidate_tile(&mut self, array: &str, coord: &[CoordValue]) -> ArrayResult<()>;
103}
104
105// ─── Dispatcher ──────────────────────────────────────────────────────────────
106
107/// Apply `op` to `engine`, returning an outcome without panicking.
108///
109/// Steps:
110/// 1. Shape validation — on error: `Rejected(ShapeInvalid)`.
111/// 2. Schema HLC check — `None` → `Rejected(ArrayUnknown)`;
112///    `op.header.schema_hlc > local` → `Rejected(SchemaTooNew)`.
113/// 3. Idempotency — already seen → `Idempotent`.
114/// 4. Dispatch to `apply_put`/`apply_delete`/`apply_erase`. Engine errors
115///    that indicate corruption (`SegmentCorruption`, `HlcLockPoisoned`) are
116///    propagated; all others become `Rejected(EngineRejected)`.
117/// 5. Tile-cache invalidation.
118/// 6. Return `Applied`.
119pub fn apply_op<E: ApplyEngine>(engine: &mut E, op: &ArrayOp) -> ArrayResult<ApplyOutcome> {
120    // 1. Shape validation.
121    if let Err(e) = op.validate_shape() {
122        return Ok(ApplyOutcome::Rejected(ApplyRejection::ShapeInvalid {
123            detail: e.to_string(),
124        }));
125    }
126
127    // 2. Schema HLC gating.
128    match engine.schema_hlc(&op.header.array)? {
129        None => {
130            return Ok(ApplyOutcome::Rejected(ApplyRejection::ArrayUnknown {
131                name: op.header.array.clone(),
132            }));
133        }
134        Some(local_schema) if op.header.schema_hlc > local_schema => {
135            return Ok(ApplyOutcome::Rejected(ApplyRejection::SchemaTooNew {
136                local: local_schema,
137                op: op.header.schema_hlc,
138            }));
139        }
140        Some(_) => {}
141    }
142
143    // 3. Idempotency.
144    if engine.already_seen(&op.header.array, op.header.hlc)? {
145        return Ok(ApplyOutcome::Idempotent);
146    }
147
148    // 4. Dispatch.
149    let dispatch_result = match op.kind {
150        ArrayOpKind::Put => engine.apply_put(op),
151        ArrayOpKind::Delete => engine.apply_delete(op),
152        ArrayOpKind::Erase => engine.apply_erase(op),
153    };
154
155    if let Err(e) = dispatch_result {
156        // Corruption-grade errors propagate; everything else becomes a rejection.
157        match &e {
158            ArrayError::SegmentCorruption { .. } | ArrayError::HlcLockPoisoned => return Err(e),
159            _ => {
160                return Ok(ApplyOutcome::Rejected(ApplyRejection::EngineRejected {
161                    detail: e.to_string(),
162                }));
163            }
164        }
165    }
166
167    // 5. Tile invalidation.
168    engine.invalidate_tile(&op.header.array, &op.coord)?;
169
170    // 6. Success.
171    Ok(ApplyOutcome::Applied)
172}
173
// ─── MockEngine (test builds only) ──────────────────────────────────────────

/// Minimal in-memory [`ApplyEngine`] for unit tests.
///
/// Tracks:
/// - known arrays with their schema HLCs
/// - seen `(array, hlc_bytes)` pairs for idempotency
/// - successfully applied ops
/// - an optional injected engine error consumed on the next apply call
/// - a log of invalidated tile coords
#[cfg(test)]
pub struct MockEngine {
    /// Registered arrays and their schema HLCs.
    schemas: HashMap<String, Hlc>,
    /// `(array, hlc.to_bytes())` for every op applied so far.
    seen: HashSet<(String, [u8; 18])>,
    /// Ops applied successfully, in application order.
    pub applied: Vec<ArrayOp>,
    /// `(array, coord)` passed to each `invalidate_tile` call, in call order.
    pub invalidated: Vec<(String, Vec<CoordValue>)>,
    /// One-shot error returned by the next `apply_*` call. Kept behind a
    /// `Mutex` so that `set_reject_next` can arm it through `&self`.
    reject_next_with: Mutex<Option<ArrayError>>,
}

#[cfg(test)]
impl MockEngine {
    /// Create an empty engine with no registered arrays.
    pub fn new() -> Self {
        Self {
            schemas: HashMap::new(),
            seen: HashSet::new(),
            applied: Vec::new(),
            invalidated: Vec::new(),
            reject_next_with: Mutex::new(None),
        }
    }

    /// Register an array so that ops targeting it can be applied.
    pub fn register_array(&mut self, array: &str, schema_hlc: Hlc) {
        self.schemas.insert(array.to_string(), schema_hlc);
    }

    /// Inject an error that will be returned by the next `apply_*` call.
    pub fn set_reject_next(&self, err: ArrayError) {
        *self
            .reject_next_with
            .lock()
            .expect("invariant: MockEngine mutex is not poisoned") = Some(err);
    }

    /// Take (and disarm) the injected error, if any.
    fn take_inject(&self) -> Option<ArrayError> {
        self.reject_next_with
            .lock()
            .expect("invariant: MockEngine mutex is not poisoned")
            .take()
    }

    /// Shared body of `apply_put` / `apply_delete` / `apply_erase`.
    ///
    /// If an injected error is armed, return it and record nothing.
    /// Otherwise mark `op` as seen and append it to `applied`.
    fn record_apply(&mut self, op: &ArrayOp) -> ArrayResult<()> {
        if let Some(err) = self.take_inject() {
            return Err(err);
        }
        self.seen
            .insert((op.header.array.clone(), op.header.hlc.to_bytes()));
        self.applied.push(op.clone());
        Ok(())
    }
}

#[cfg(test)]
impl Default for MockEngine {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
impl ApplyEngine for MockEngine {
    fn schema_hlc(&self, array: &str) -> ArrayResult<Option<Hlc>> {
        Ok(self.schemas.get(array).copied())
    }

    fn already_seen(&self, array: &str, hlc: Hlc) -> ArrayResult<bool> {
        Ok(self.seen.contains(&(array.to_string(), hlc.to_bytes())))
    }

    fn apply_put(&mut self, op: &ArrayOp) -> ArrayResult<()> {
        self.record_apply(op)
    }

    fn apply_delete(&mut self, op: &ArrayOp) -> ArrayResult<()> {
        self.record_apply(op)
    }

    fn apply_erase(&mut self, op: &ArrayOp) -> ArrayResult<()> {
        self.record_apply(op)
    }

    fn invalidate_tile(&mut self, array: &str, coord: &[CoordValue]) -> ArrayResult<()> {
        self.invalidated.push((array.to_string(), coord.to_vec()));
        Ok(())
    }
}
279
280// ─── Tests ───────────────────────────────────────────────────────────────────
281
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::hlc::Hlc;
    use crate::sync::op::{ArrayOpHeader, ArrayOpKind};
    use crate::sync::replica_id::ReplicaId;
    use crate::types::cell_value::value::CellValue;
    use crate::types::coord::value::CoordValue;

    fn replica() -> ReplicaId {
        ReplicaId::new(42)
    }

    fn hlc(ms: u64) -> Hlc {
        Hlc::new(ms, 0, replica()).unwrap()
    }

    fn header(array: &str, op_ms: u64, schema_ms: u64) -> ArrayOpHeader {
        ArrayOpHeader {
            array: array.into(),
            hlc: hlc(op_ms),
            schema_hlc: hlc(schema_ms),
            valid_from_ms: 0,
            valid_until_ms: -1,
            system_from_ms: op_ms as i64,
        }
    }

    fn put_op(array: &str, op_ms: u64, schema_ms: u64) -> ArrayOp {
        ArrayOp {
            header: header(array, op_ms, schema_ms),
            kind: ArrayOpKind::Put,
            coord: vec![CoordValue::Int64(1)],
            attrs: Some(vec![CellValue::Null]),
        }
    }

    fn delete_op(array: &str, op_ms: u64, schema_ms: u64) -> ArrayOp {
        ArrayOp {
            header: header(array, op_ms, schema_ms),
            kind: ArrayOpKind::Delete,
            coord: vec![CoordValue::Int64(1)],
            attrs: None,
        }
    }

    #[test]
    fn apply_put_succeeds() {
        let mut engine = MockEngine::new();
        engine.register_array("a", hlc(100));

        let op = put_op("a", 50, 100);
        let outcome = apply_op(&mut engine, &op).unwrap();
        assert_eq!(outcome, ApplyOutcome::Applied);
        assert_eq!(engine.applied.len(), 1);
        assert_eq!(engine.invalidated.len(), 1);
    }

    #[test]
    fn apply_idempotent_on_replay() {
        let mut engine = MockEngine::new();
        engine.register_array("a", hlc(100));

        let op = put_op("a", 50, 100);
        apply_op(&mut engine, &op).unwrap();
        let outcome = apply_op(&mut engine, &op).unwrap();
        assert_eq!(outcome, ApplyOutcome::Idempotent);
        // Still only one applied op.
        assert_eq!(engine.applied.len(), 1);
    }

    #[test]
    fn replay_does_not_reinvalidate_tile() {
        let mut engine = MockEngine::new();
        engine.register_array("a", hlc(100));

        let op = put_op("a", 50, 100);
        apply_op(&mut engine, &op).unwrap();
        apply_op(&mut engine, &op).unwrap();
        // The idempotent short-circuit happens before tile invalidation.
        assert_eq!(engine.invalidated.len(), 1);
    }

    #[test]
    fn apply_rejects_unknown_array() {
        let mut engine = MockEngine::new();
        let op = put_op("unknown", 50, 100);
        let outcome = apply_op(&mut engine, &op).unwrap();
        assert!(matches!(
            outcome,
            ApplyOutcome::Rejected(ApplyRejection::ArrayUnknown { name }) if name == "unknown"
        ));
        // A rejection must not touch engine state.
        assert!(engine.applied.is_empty());
        assert!(engine.invalidated.is_empty());
    }

    #[test]
    fn apply_rejects_schema_too_new() {
        let mut engine = MockEngine::new();
        engine.register_array("a", hlc(50)); // local schema at ms=50
        let op = put_op("a", 10, 100); // op schema_hlc at ms=100 > 50
        let outcome = apply_op(&mut engine, &op).unwrap();
        assert!(matches!(
            outcome,
            ApplyOutcome::Rejected(ApplyRejection::SchemaTooNew { local, op: op_hlc })
            if local == hlc(50) && op_hlc == hlc(100)
        ));
    }

    #[test]
    fn apply_rejects_invalid_shape() {
        let mut engine = MockEngine::new();
        engine.register_array("a", hlc(100));
        // Put op with no attrs is invalid.
        let op = ArrayOp {
            header: header("a", 50, 100),
            kind: ArrayOpKind::Put,
            coord: vec![CoordValue::Int64(1)],
            attrs: None,
        };
        let outcome = apply_op(&mut engine, &op).unwrap();
        assert!(matches!(
            outcome,
            ApplyOutcome::Rejected(ApplyRejection::ShapeInvalid { .. })
        ));
    }

    #[test]
    fn apply_wraps_engine_error_as_rejection() {
        let mut engine = MockEngine::new();
        engine.register_array("a", hlc(100));
        engine.set_reject_next(ArrayError::InvalidOp {
            detail: "simulated engine error".into(),
        });
        let op = put_op("a", 50, 100);
        let outcome = apply_op(&mut engine, &op).unwrap();
        assert!(matches!(
            outcome,
            ApplyOutcome::Rejected(ApplyRejection::EngineRejected { .. })
        ));
        // No tile invalidation on rejection.
        assert!(engine.invalidated.is_empty());
    }

    #[test]
    fn engine_rejected_op_can_be_retried() {
        let mut engine = MockEngine::new();
        engine.register_array("a", hlc(100));
        engine.set_reject_next(ArrayError::InvalidOp {
            detail: "simulated engine error".into(),
        });
        let op = put_op("a", 50, 100);
        apply_op(&mut engine, &op).unwrap();
        // The rejected op was not recorded as seen, so a retry applies it.
        let outcome = apply_op(&mut engine, &op).unwrap();
        assert_eq!(outcome, ApplyOutcome::Applied);
        assert_eq!(engine.applied.len(), 1);
        assert_eq!(engine.invalidated.len(), 1);
    }

    #[test]
    fn apply_propagates_corruption_error() {
        let mut engine = MockEngine::new();
        engine.register_array("a", hlc(100));
        engine.set_reject_next(ArrayError::HlcLockPoisoned);
        let op = put_op("a", 50, 100);
        let result = apply_op(&mut engine, &op);
        assert!(matches!(result, Err(ArrayError::HlcLockPoisoned)));
        assert!(engine.applied.is_empty());
        assert!(engine.invalidated.is_empty());
    }

    #[test]
    fn apply_invalidates_tile_after_success() {
        let mut engine = MockEngine::new();
        engine.register_array("b", hlc(100));
        // Op schema (ms=50) is older than local (ms=100), which is accepted.
        let op = delete_op("b", 30, 50);
        let outcome = apply_op(&mut engine, &op).unwrap();
        assert_eq!(outcome, ApplyOutcome::Applied);
        assert_eq!(engine.invalidated.len(), 1);
        assert_eq!(engine.invalidated[0].0, "b");
    }
}