Skip to main content

icydb_core/db/commit/
mod.rs

1//! IcyDB commit protocol and atomicity guardrails.
2//!
3//! Contract:
4//! - `begin_commit` persists a marker that fully describes durable mutations.
5//! - Durable correctness is owned by marker replay in recovery (row ops).
6//! - In-process apply guards are best-effort cleanup only and are not authoritative.
7//!
8//! ## Commit Boundary and Authority of CommitMarker
9//!
10//! The `CommitMarker` fully specifies every row mutation. After
11//! the marker is persisted, executors must not re-derive semantics or branch
12//! on entity/index contents; apply logic deterministically replays row ops.
13//! Recovery replays row ops as recorded, not planner logic.
14
15mod decode;
16mod guard;
17mod memory;
18mod recovery;
19mod store;
20#[cfg(test)]
21mod tests;
22
23use crate::{
24    db::{
25        Db,
26        commit::{
27            decode::{decode_data_key, decode_index_entry, decode_index_key},
28            store::{CommitStore, with_commit_store, with_commit_store_infallible},
29        },
30        index::{IndexKey, RawIndexEntry, RawIndexKey, plan::plan_index_mutation_for_entity},
31        relation::prepare_reverse_relation_index_mutations_for_source,
32        store::{DataKey, DataStore, RawDataKey, RawRow},
33    },
34    error::{ErrorClass, ErrorOrigin, InternalError},
35    traits::{EntityKind, EntityValue, Path},
36    types::Ulid,
37};
38#[cfg(test)]
39use canic_memory::{
40    registry::{MemoryRegistry, MemoryRegistryError},
41    runtime::registry::MemoryRegistryRuntime,
42};
43use serde::{Deserialize, Serialize};
44#[cfg(test)]
45use std::collections::BTreeSet;
46use std::{cell::RefCell, collections::BTreeMap, thread::LocalKey};
47
48pub use guard::CommitApplyGuard;
49pub use recovery::{ensure_recovered, ensure_recovered_for_write};
50
51#[cfg(test)]
52/// Return true if a commit marker is currently persisted.
53pub fn commit_marker_present() -> Result<bool, InternalError> {
54    store::commit_marker_present()
55}
56
57#[cfg(test)]
58/// Initialize commit marker storage for tests.
59///
60/// This registers a placeholder data-store entry if none exists so the commit
61/// memory allocator can select the correct reserved range.
62pub fn init_commit_store_for_tests() -> Result<(), InternalError> {
63    // Phase 1: ensure the memory registry has at least one reserved range.
64    let init_result = MemoryRegistryRuntime::init(Some(("icydb_test", 1, 200)));
65    match init_result {
66        Ok(_) => {}
67        Err(MemoryRegistryError::Overlap { .. }) => {
68            MemoryRegistryRuntime::init(None).map_err(|err| {
69                InternalError::new(
70                    ErrorClass::Internal,
71                    ErrorOrigin::Store,
72                    format!("memory registry init failed: {err}"),
73                )
74            })?;
75        }
76        Err(err) => {
77            return Err(InternalError::new(
78                ErrorClass::Internal,
79                ErrorOrigin::Store,
80                format!("memory registry init failed: {err}"),
81            ));
82        }
83    }
84
85    // Phase 2: ensure a DB-store entry exists so commit memory can be allocated.
86    let snapshots = MemoryRegistryRuntime::snapshot_ids_by_range();
87    if snapshots.is_empty() {
88        return Err(InternalError::new(
89            ErrorClass::Internal,
90            ErrorOrigin::Store,
91            "no memory ranges available for commit marker tests",
92        ));
93    }
94    let has_store_entry = snapshots.iter().any(|snapshot| {
95        snapshot.entries.iter().any(|(_, entry)| {
96            entry.label.ends_with("DataStore") || entry.label.ends_with("IndexStore")
97        })
98    });
99
100    if !has_store_entry {
101        let snapshot = snapshots.first().ok_or_else(|| {
102            InternalError::new(
103                ErrorClass::Internal,
104                ErrorOrigin::Store,
105                "no memory ranges available for commit marker tests",
106            )
107        })?;
108        let used_ids = snapshot
109            .entries
110            .iter()
111            .map(|(id, _)| *id)
112            .collect::<BTreeSet<_>>();
113        let dummy_id = (snapshot.range.start..=snapshot.range.end)
114            .find(|id| !used_ids.contains(id))
115            .ok_or_else(|| {
116                InternalError::new(
117                    ErrorClass::Unsupported,
118                    ErrorOrigin::Store,
119                    format!(
120                        "no free memory ids available for commit marker tests in range {}-{}",
121                        snapshot.range.start, snapshot.range.end
122                    ),
123                )
124            })?;
125
126        MemoryRegistry::register(dummy_id, &snapshot.owner, "commit_test::DataStore").map_err(
127            |err| {
128                InternalError::new(
129                    ErrorClass::Internal,
130                    ErrorOrigin::Store,
131                    format!("commit test memory registration failed: {err}"),
132                )
133            },
134        )?;
135    }
136
137    // Phase 3: initialize the commit store in the production slot.
138
139    with_commit_store(|_| Ok(()))
140}
141
142// Stage-2 invariant:
143// - We persist a commit marker before any stable mutation.
144// - After marker creation, executor apply phases are infallible or trap.
145// - Recovery replays the stored row mutation plan.
146// This makes partial mutations deterministic without a WAL.
147
148const COMMIT_LABEL: &str = "CommitMarker";
149const COMMIT_ID_BYTES: usize = 16;
150
151// Conservative upper bound to avoid rejecting valid commits when index entries
152// are large; still small enough to fit typical canister constraints.
153pub const MAX_COMMIT_BYTES: u32 = 16 * 1024 * 1024;
154
155///
156/// CommitRowOp
157///
158/// Row-level mutation recorded in a commit marker.
159/// Store identity is derived from `entity_path` at apply/recovery time.
160///
161
162#[derive(Clone, Debug, Deserialize, Serialize)]
163#[serde(deny_unknown_fields)]
164pub struct CommitRowOp {
165    pub entity_path: String,
166    pub key: Vec<u8>,
167    pub before: Option<Vec<u8>>,
168    pub after: Option<Vec<u8>>,
169}
170
171impl CommitRowOp {
172    /// Construct a row-level commit operation.
173    #[must_use]
174    pub fn new(
175        entity_path: impl Into<String>,
176        key: Vec<u8>,
177        before: Option<Vec<u8>>,
178        after: Option<Vec<u8>>,
179    ) -> Self {
180        Self {
181            entity_path: entity_path.into(),
182            key,
183            before,
184            after,
185        }
186    }
187}
188
189///
190/// CommitIndexOp
191///
192/// Internal index mutation used during row-op preparation/apply.
193/// Not persisted in commit markers.
194
195#[derive(Clone, Debug, Deserialize, Serialize)]
196#[serde(deny_unknown_fields)]
197pub struct CommitIndexOp {
198    pub store: String,
199    pub key: Vec<u8>,
200    pub value: Option<Vec<u8>>,
201}
202
203///
204/// CommitMarker
205///
206/// Persisted mutation plan covering row-level operations.
207/// Recovery replays the marker exactly as stored.
208/// Unknown fields are rejected as corruption; commit markers are not forward-compatible.
209/// This is internal commit-protocol metadata, not a user-schema type.
210
211#[derive(Clone, Debug, Deserialize, Serialize)]
212#[serde(deny_unknown_fields)]
213pub struct CommitMarker {
214    pub id: [u8; COMMIT_ID_BYTES],
215    pub row_ops: Vec<CommitRowOp>,
216}
217
218impl CommitMarker {
219    /// Construct a new commit marker with a fresh commit id.
220    pub fn new(row_ops: Vec<CommitRowOp>) -> Result<Self, InternalError> {
221        let id = Ulid::try_generate()
222            .map_err(|err| {
223                InternalError::new(
224                    ErrorClass::Internal,
225                    ErrorOrigin::Store,
226                    format!("commit id generation failed: {err}"),
227                )
228            })?
229            .to_bytes();
230
231        Ok(Self { id, row_ops })
232    }
233}
234
235/// Validate commit-marker row-op shape invariants.
236///
237/// Every row op must represent a concrete mutation:
238/// - insert (`before=None`, `after=Some`)
239/// - update (`before=Some`, `after=Some`)
240/// - delete (`before=Some`, `after=None`)
241///
242/// The empty shape (`before=None`, `after=None`) is corruption.
243pub fn validate_commit_marker_shape(marker: &CommitMarker) -> Result<(), InternalError> {
244    // Phase 1: reject row ops that cannot encode any mutation semantics.
245    for row_op in &marker.row_ops {
246        if row_op.before.is_none() && row_op.after.is_none() {
247            return Err(InternalError::new(
248                ErrorClass::Corruption,
249                ErrorOrigin::Store,
250                "commit marker corrupted: row op has neither before nor after payload",
251            ));
252        }
253    }
254
255    Ok(())
256}
257
258///
259/// PreparedIndexMutation
260///
261/// Mechanical index mutation derived from a row op.
262///
263
264#[derive(Clone)]
265pub struct PreparedIndexMutation {
266    pub store: &'static LocalKey<RefCell<crate::db::index::IndexStore>>,
267    pub key: RawIndexKey,
268    pub value: Option<RawIndexEntry>,
269}
270
271///
272/// PreparedRowCommitOp
273///
274/// Mechanical store mutation derived from one row op.
275///
276
277#[derive(Clone)]
278pub struct PreparedRowCommitOp {
279    pub index_ops: Vec<PreparedIndexMutation>,
280    pub data_store: &'static LocalKey<RefCell<DataStore>>,
281    pub data_key: RawDataKey,
282    pub data_value: Option<RawRow>,
283    pub index_remove_count: usize,
284    pub index_insert_count: usize,
285    pub reverse_index_remove_count: usize,
286    pub reverse_index_insert_count: usize,
287}
288
289impl PreparedRowCommitOp {
290    /// Apply the prepared row operation infallibly.
291    pub fn apply(self) {
292        for index_op in self.index_ops {
293            index_op.store.with_borrow_mut(|store| {
294                if let Some(value) = index_op.value {
295                    store.insert(index_op.key, value);
296                } else {
297                    store.remove(&index_op.key);
298                }
299            });
300        }
301
302        self.data_store.with_borrow_mut(|store| {
303            if let Some(value) = self.data_value {
304                store.insert(self.data_key, value);
305            } else {
306                store.remove(&self.data_key);
307            }
308        });
309    }
310}
311
312/// Capture the current store state needed to roll back one prepared row op.
313///
314/// The returned op writes the prior index/data values back when applied.
315#[must_use]
316pub fn snapshot_row_rollback(op: &PreparedRowCommitOp) -> PreparedRowCommitOp {
317    let mut index_ops = Vec::with_capacity(op.index_ops.len());
318    for index_op in &op.index_ops {
319        let existing = index_op.store.with_borrow(|store| store.get(&index_op.key));
320        index_ops.push(PreparedIndexMutation {
321            store: index_op.store,
322            key: index_op.key,
323            value: existing,
324        });
325    }
326
327    let data_value = op.data_store.with_borrow(|store| store.get(&op.data_key));
328
329    PreparedRowCommitOp {
330        index_ops,
331        data_store: op.data_store,
332        data_key: op.data_key,
333        data_value,
334        index_remove_count: 0,
335        index_insert_count: 0,
336        reverse_index_remove_count: 0,
337        reverse_index_insert_count: 0,
338    }
339}
340
341/// Prepare a typed row-level commit op for one entity type.
342///
343/// This resolves store handles and index/data mutations so commit/recovery
344/// apply can remain mechanical.
345#[expect(clippy::too_many_lines)]
346pub fn prepare_row_commit_for_entity<E: EntityKind + EntityValue>(
347    db: &Db<E::Canister>,
348    op: &CommitRowOp,
349) -> Result<PreparedRowCommitOp, InternalError> {
350    if op.entity_path != E::PATH {
351        return Err(InternalError::new(
352            ErrorClass::Corruption,
353            ErrorOrigin::Store,
354            format!(
355                "commit marker entity path mismatch: expected '{}', found '{}'",
356                E::PATH,
357                op.entity_path
358            ),
359        ));
360    }
361
362    let raw_key = decode_data_key(&op.key)?;
363    let data_key = DataKey::try_from_raw(&raw_key).map_err(|err| {
364        InternalError::new(
365            ErrorClass::Corruption,
366            ErrorOrigin::Store,
367            format!("commit marker data key corrupted: {err}"),
368        )
369    })?;
370    data_key.try_key::<E>()?;
371
372    let decode_entity = |bytes: &[u8], label: &str| -> Result<(RawRow, E), InternalError> {
373        let row = RawRow::try_new(bytes.to_vec())?;
374        let entity = row.try_decode::<E>().map_err(|err| {
375            InternalError::new(
376                ErrorClass::Corruption,
377                ErrorOrigin::Serialize,
378                format!("commit marker {label} row decode failed: {err}"),
379            )
380        })?;
381        let expected = data_key.try_key::<E>()?;
382        let actual = entity.id().key();
383        if expected != actual {
384            return Err(InternalError::new(
385                ErrorClass::Corruption,
386                ErrorOrigin::Store,
387                format!("commit marker row key mismatch: expected {expected:?}, found {actual:?}"),
388            ));
389        }
390
391        Ok((row, entity))
392    };
393
394    let old_pair = op
395        .before
396        .as_ref()
397        .map(|bytes| decode_entity(bytes, "before"))
398        .transpose()?;
399    let new_pair = op
400        .after
401        .as_ref()
402        .map(|bytes| decode_entity(bytes, "after"))
403        .transpose()?;
404
405    if old_pair.is_none() && new_pair.is_none() {
406        return Err(InternalError::new(
407            ErrorClass::Corruption,
408            ErrorOrigin::Store,
409            "commit marker row op is a no-op (before/after both missing)",
410        ));
411    }
412
413    let index_plan = plan_index_mutation_for_entity::<E>(
414        db,
415        old_pair.as_ref().map(|(_, entity)| entity),
416        new_pair.as_ref().map(|(_, entity)| entity),
417    )?;
418    let mut index_remove_count = 0usize;
419    let mut index_insert_count = 0usize;
420    for index in E::INDEXES {
421        let old_key = old_pair
422            .as_ref()
423            .map(|(_, old_entity)| IndexKey::new(old_entity, index))
424            .transpose()?
425            .flatten()
426            .map(|key| key.to_raw());
427        let new_key = new_pair
428            .as_ref()
429            .map(|(_, new_entity)| IndexKey::new(new_entity, index))
430            .transpose()?
431            .flatten()
432            .map(|key| key.to_raw());
433
434        if old_key != new_key {
435            if old_key.is_some() {
436                index_remove_count = index_remove_count.saturating_add(1);
437            }
438            if new_key.is_some() {
439                index_insert_count = index_insert_count.saturating_add(1);
440            }
441        }
442    }
443    let mut index_stores = BTreeMap::new();
444    for apply in &index_plan.apply {
445        index_stores.insert(apply.index.store, apply.store);
446    }
447
448    let mut index_ops = Vec::with_capacity(index_plan.commit_ops.len());
449    for index_op in index_plan.commit_ops {
450        let store = index_stores
451            .get(index_op.store.as_str())
452            .copied()
453            .ok_or_else(|| {
454                InternalError::new(
455                    ErrorClass::Corruption,
456                    ErrorOrigin::Index,
457                    format!(
458                        "missing index store '{}' for entity '{}'",
459                        index_op.store,
460                        E::PATH
461                    ),
462                )
463            })?;
464        let key = decode_index_key(&index_op.key)?;
465        let value = index_op
466            .value
467            .as_ref()
468            .map(|bytes| decode_index_entry(bytes))
469            .transpose()?;
470        index_ops.push(PreparedIndexMutation { store, key, value });
471    }
472    let (reverse_index_ops, reverse_remove_count, reverse_insert_count) =
473        prepare_reverse_relation_index_mutations_for_source::<E>(
474            db,
475            old_pair.as_ref().map(|(_, entity)| entity),
476            new_pair.as_ref().map(|(_, entity)| entity),
477        )?;
478    index_ops.extend(reverse_index_ops);
479
480    let data_store = db.with_store_registry(|reg| reg.try_get_store(E::Store::PATH))?;
481    let data_value = new_pair.map(|(row, _)| row);
482
483    Ok(PreparedRowCommitOp {
484        index_ops,
485        data_store: data_store.data_store(),
486        data_key: raw_key,
487        data_value,
488        index_remove_count,
489        index_insert_count,
490        reverse_index_remove_count: reverse_remove_count,
491        reverse_index_insert_count: reverse_insert_count,
492    })
493}
494
495///
496/// CommitGuard
497///
498/// In-flight commit handle that clears the marker on completion.
499/// Must not be leaked across mutation boundaries.
500///
501
502#[derive(Clone, Debug)]
503pub struct CommitGuard {
504    pub marker: CommitMarker,
505}
506
507impl CommitGuard {
508    // Clear the commit marker without surfacing errors.
509    fn clear(self) {
510        let _ = self;
511        with_commit_store_infallible(CommitStore::clear_infallible);
512    }
513}
514
515/// Persist a commit marker and open the commit window.
516pub fn begin_commit(marker: CommitMarker) -> Result<CommitGuard, InternalError> {
517    with_commit_store(|store| {
518        if store.load()?.is_some() {
519            return Err(InternalError::new(
520                ErrorClass::InvariantViolation,
521                ErrorOrigin::Store,
522                "commit marker already present before begin",
523            ));
524        }
525        store.set(&marker)?;
526
527        Ok(CommitGuard { marker })
528    })
529}
530
531/// Apply commit ops and clear the marker regardless of outcome.
532///
533/// The apply closure performs mechanical marker application only.
534/// Any in-process rollback guard used by the closure is non-authoritative
535/// transitional cleanup; durable authority remains the commit marker protocol.
536pub fn finish_commit(
537    mut guard: CommitGuard,
538    apply: impl FnOnce(&mut CommitGuard) -> Result<(), InternalError>,
539) -> Result<(), InternalError> {
540    // COMMIT WINDOW:
541    // Apply mutates stores from a prevalidated marker payload.
542    // Marker durability + recovery replay remain the atomicity authority.
543    // We clear the marker on any outcome so recovery does not reapply an
544    // already-attempted marker in this process.
545    let result = apply(&mut guard);
546    let commit_id = guard.marker.id;
547    guard.clear();
548    // Internal invariant: commit markers must not persist after a finished mutation.
549    assert!(
550        with_commit_store_infallible(|store| store.is_empty()),
551        "commit marker must be cleared after finish_commit (commit_id={commit_id:?})"
552    );
553    result
554}