Skip to main content

icydb_core/db/commit/
mod.rs

1//! IcyDB commit protocol and atomicity guardrails.
2//!
3//! Contract:
4//! - `begin_commit` persists a marker that fully describes durable mutations.
5//! - Durable correctness is owned by marker replay in recovery (row ops).
6//! - In-process apply guards are best-effort cleanup only and are not authoritative.
7//!
8//! ## Commit Boundary and Authority of CommitMarker
9//!
10//! The `CommitMarker` fully specifies every row mutation. After
11//! the marker is persisted, executors must not re-derive semantics or branch
12//! on entity/index contents; apply logic deterministically replays row ops.
13//! Recovery replays row ops as recorded, not planner logic.
14
15mod decode;
16mod guard;
17mod memory;
18mod recovery;
19mod store;
20#[cfg(test)]
21mod tests;
22
23use crate::{
24    db::{
25        Db,
26        commit::{
27            decode::{decode_data_key, decode_index_entry, decode_index_key},
28            store::{CommitStore, with_commit_store, with_commit_store_infallible},
29        },
30        decode::decode_entity_with_expected_key,
31        index::{IndexKey, RawIndexEntry, RawIndexKey, plan::plan_index_mutation_for_entity},
32        relation::prepare_reverse_relation_index_mutations_for_source,
33        store::{DataKey, DataStore, RawDataKey, RawRow},
34    },
35    error::{ErrorClass, ErrorOrigin, InternalError},
36    traits::{EntityKind, EntityValue, Path},
37    types::Ulid,
38};
39#[cfg(test)]
40use canic_memory::{
41    registry::{MemoryRegistry, MemoryRegistryError},
42    runtime::registry::MemoryRegistryRuntime,
43};
44use serde::{Deserialize, Serialize};
45#[cfg(test)]
46use std::collections::BTreeSet;
47use std::{cell::RefCell, collections::BTreeMap, thread::LocalKey};
48
49pub use guard::CommitApplyGuard;
50pub use recovery::{ensure_recovered, ensure_recovered_for_write};
51
#[cfg(test)]
/// Return true if a commit marker is currently persisted.
///
/// Test-only shim that forwards to `store::commit_marker_present`.
///
/// # Errors
///
/// Returns whatever `InternalError` the underlying store query produces.
pub fn commit_marker_present() -> Result<bool, InternalError> {
    store::commit_marker_present()
}
57
#[cfg(test)]
/// Prepare commit-marker storage so tests can exercise the commit protocol.
///
/// Steps, in order:
/// 1. Initialise the memory registry with the `icydb_test` range, retrying
///    with a plain init when that range overlaps an existing one.
/// 2. Make sure some registered label ends in `DataStore` or `IndexStore`;
///    when none does, register a placeholder `commit_test::DataStore` entry
///    under the first unused id of the first range.
/// 3. Touch the commit store once so it is initialised.
///
/// # Errors
///
/// Fails when registry initialisation fails, when no memory range exists,
/// when the first range has no free id left, when the placeholder
/// registration is rejected, or when the commit store cannot be opened.
pub fn init_commit_store_for_tests() -> Result<(), InternalError> {
    // Both init attempts report failure with the same error shape.
    let init_failed = |err: MemoryRegistryError| {
        InternalError::new(
            ErrorClass::Internal,
            ErrorOrigin::Store,
            format!("memory registry init failed: {err}"),
        )
    };

    // Step 1: reserve a range, tolerating an already-registered overlap.
    match MemoryRegistryRuntime::init(Some(("icydb_test", 1, 200))) {
        Ok(_) => {}
        Err(MemoryRegistryError::Overlap { .. }) => {
            MemoryRegistryRuntime::init(None).map_err(init_failed)?;
        }
        Err(err) => return Err(init_failed(err)),
    }

    // Step 2: the commit allocator needs at least one store-labelled entry.
    let snapshots = MemoryRegistryRuntime::snapshot_ids_by_range();
    let Some(first_range) = snapshots.first() else {
        return Err(InternalError::new(
            ErrorClass::Internal,
            ErrorOrigin::Store,
            "no memory ranges available for commit marker tests",
        ));
    };

    let store_registered = snapshots
        .iter()
        .flat_map(|snapshot| snapshot.entries.iter())
        .any(|(_, entry)| {
            entry.label.ends_with("DataStore") || entry.label.ends_with("IndexStore")
        });

    if !store_registered {
        let taken: BTreeSet<_> = first_range.entries.iter().map(|(id, _)| *id).collect();
        let free_id = (first_range.range.start..=first_range.range.end)
            .find(|candidate| !taken.contains(candidate));
        let Some(free_id) = free_id else {
            return Err(InternalError::new(
                ErrorClass::Unsupported,
                ErrorOrigin::Store,
                format!(
                    "no free memory ids available for commit marker tests in range {}-{}",
                    first_range.range.start, first_range.range.end
                ),
            ));
        };

        MemoryRegistry::register(free_id, &first_range.owner, "commit_test::DataStore")
            .map_err(|err| {
                InternalError::new(
                    ErrorClass::Internal,
                    ErrorOrigin::Store,
                    format!("commit test memory registration failed: {err}"),
                )
            })?;
    }

    // Step 3: open the commit store once so the production slot is set up.
    with_commit_store(|_| Ok(()))
}
142
143// Stage-2 invariant:
144// - We persist a commit marker before any stable mutation.
145// - After marker creation, executor apply phases are infallible or trap.
146// - Recovery replays the stored row mutation plan.
147// This makes partial mutations deterministic without a WAL.
148
// Label string for the commit marker.
// NOTE(review): not referenced in the visible part of this file; presumably
// consumed by the commit submodules — confirm.
const COMMIT_LABEL: &str = "CommitMarker";
// Byte width of a commit id; this is the length of the `CommitMarker::id` array.
const COMMIT_ID_BYTES: usize = 16;

// Conservative upper bound to avoid rejecting valid commits when index entries
// are large; still small enough to fit typical canister constraints.
// Value: 16 MiB.
pub const MAX_COMMIT_BYTES: u32 = 16 * 1024 * 1024;
155
156///
157/// CommitRowOp
158///
159/// Row-level mutation recorded in a commit marker.
160/// Store identity is derived from `entity_path` at apply/recovery time.
161///
162
163#[derive(Clone, Debug, Deserialize, Serialize)]
164#[serde(deny_unknown_fields)]
165pub struct CommitRowOp {
166    pub entity_path: String,
167    pub key: Vec<u8>,
168    pub before: Option<Vec<u8>>,
169    pub after: Option<Vec<u8>>,
170}
171
172impl CommitRowOp {
173    /// Construct a row-level commit operation.
174    #[must_use]
175    pub fn new(
176        entity_path: impl Into<String>,
177        key: Vec<u8>,
178        before: Option<Vec<u8>>,
179        after: Option<Vec<u8>>,
180    ) -> Self {
181        Self {
182            entity_path: entity_path.into(),
183            key,
184            before,
185            after,
186        }
187    }
188}
189
///
/// CommitIndexOp
///
/// Internal index mutation used during row-op preparation/apply.
/// Not persisted in commit markers.

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct CommitIndexOp {
    /// Name of the target index store; row-op preparation looks the store
    /// handle up by this name.
    pub store: String,
    /// Encoded index key bytes (decoded with `decode_index_key` during preparation).
    pub key: Vec<u8>,
    /// Encoded index entry bytes (decoded with `decode_index_entry` during
    /// preparation); `None` yields a prepared mutation that removes the key.
    pub value: Option<Vec<u8>>,
}
203
204///
205/// CommitMarker
206///
207/// Persisted mutation plan covering row-level operations.
208/// Recovery replays the marker exactly as stored.
209/// Unknown fields are rejected as corruption; commit markers are not forward-compatible.
210/// This is internal commit-protocol metadata, not a user-schema type.
211
212#[derive(Clone, Debug, Deserialize, Serialize)]
213#[serde(deny_unknown_fields)]
214pub struct CommitMarker {
215    pub id: [u8; COMMIT_ID_BYTES],
216    pub row_ops: Vec<CommitRowOp>,
217}
218
219impl CommitMarker {
220    /// Construct a new commit marker with a fresh commit id.
221    pub fn new(row_ops: Vec<CommitRowOp>) -> Result<Self, InternalError> {
222        let id = Ulid::try_generate()
223            .map_err(|err| {
224                InternalError::new(
225                    ErrorClass::Internal,
226                    ErrorOrigin::Store,
227                    format!("commit id generation failed: {err}"),
228                )
229            })?
230            .to_bytes();
231
232        Ok(Self { id, row_ops })
233    }
234}
235
236/// Validate commit-marker row-op shape invariants.
237///
238/// Every row op must represent a concrete mutation:
239/// - insert (`before=None`, `after=Some`)
240/// - update (`before=Some`, `after=Some`)
241/// - delete (`before=Some`, `after=None`)
242///
243/// The empty shape (`before=None`, `after=None`) is corruption.
244pub fn validate_commit_marker_shape(marker: &CommitMarker) -> Result<(), InternalError> {
245    // Phase 1: reject row ops that cannot encode any mutation semantics.
246    for row_op in &marker.row_ops {
247        if row_op.before.is_none() && row_op.after.is_none() {
248            return Err(InternalError::new(
249                ErrorClass::Corruption,
250                ErrorOrigin::Store,
251                "commit marker corrupted: row op has neither before nor after payload",
252            ));
253        }
254    }
255
256    Ok(())
257}
258
///
/// PreparedIndexMutation
///
/// Mechanical index mutation derived from a row op.
///

#[derive(Clone)]
pub struct PreparedIndexMutation {
    /// Handle to the index store this mutation targets.
    pub store: &'static LocalKey<RefCell<crate::db::index::IndexStore>>,
    /// Raw index key to write or remove.
    pub key: RawIndexKey,
    /// Entry to insert under `key`; `None` means the key is removed on apply.
    pub value: Option<RawIndexEntry>,
}
271
272///
273/// PreparedRowCommitOp
274///
275/// Mechanical store mutation derived from one row op.
276///
277
278#[derive(Clone)]
279pub struct PreparedRowCommitOp {
280    pub index_ops: Vec<PreparedIndexMutation>,
281    pub data_store: &'static LocalKey<RefCell<DataStore>>,
282    pub data_key: RawDataKey,
283    pub data_value: Option<RawRow>,
284    pub index_remove_count: usize,
285    pub index_insert_count: usize,
286    pub reverse_index_remove_count: usize,
287    pub reverse_index_insert_count: usize,
288}
289
290impl PreparedRowCommitOp {
291    /// Apply the prepared row operation infallibly.
292    pub fn apply(self) {
293        for index_op in self.index_ops {
294            index_op.store.with_borrow_mut(|store| {
295                if let Some(value) = index_op.value {
296                    store.insert(index_op.key, value);
297                } else {
298                    store.remove(&index_op.key);
299                }
300            });
301        }
302
303        self.data_store.with_borrow_mut(|store| {
304            if let Some(value) = self.data_value {
305                store.insert(self.data_key, value);
306            } else {
307                store.remove(&self.data_key);
308            }
309        });
310    }
311}
312
313/// Capture the current store state needed to roll back one prepared row op.
314///
315/// The returned op writes the prior index/data values back when applied.
316#[must_use]
317pub fn snapshot_row_rollback(op: &PreparedRowCommitOp) -> PreparedRowCommitOp {
318    let mut index_ops = Vec::with_capacity(op.index_ops.len());
319    for index_op in &op.index_ops {
320        let existing = index_op.store.with_borrow(|store| store.get(&index_op.key));
321        index_ops.push(PreparedIndexMutation {
322            store: index_op.store,
323            key: index_op.key,
324            value: existing,
325        });
326    }
327
328    let data_value = op.data_store.with_borrow(|store| store.get(&op.data_key));
329
330    PreparedRowCommitOp {
331        index_ops,
332        data_store: op.data_store,
333        data_key: op.data_key,
334        data_value,
335        index_remove_count: 0,
336        index_insert_count: 0,
337        reverse_index_remove_count: 0,
338        reverse_index_insert_count: 0,
339    }
340}
341
342/// Apply prepared-row rollback operations in reverse write order.
343///
344/// This is shared by preflight/recovery paths so rollback ordering remains
345/// mechanically consistent across commit-related execution phases.
346pub fn rollback_prepared_row_ops_reverse(ops: Vec<PreparedRowCommitOp>) {
347    for op in ops.into_iter().rev() {
348        op.apply();
349    }
350}
351
/// Prepare a typed row-level commit op for one entity type.
///
/// This resolves store handles and index/data mutations so commit/recovery
/// apply can remain mechanical.
///
/// The returned op lists the planned index mutations first, followed by the
/// reverse-relation index mutations, and carries the data-store write for the
/// row (`None` when the op has no `after` payload).
///
/// # Errors
///
/// Returns a corruption error when `op.entity_path` is not `E::PATH`, when the
/// data key or either row payload cannot be decoded, when a decoded row's key
/// does not match the data key, when both payloads are missing, or when a
/// planned index op names a store that the index plan did not provide.
/// Errors from index planning, index-key construction, reverse-relation
/// preparation, and store-registry lookup are propagated unchanged.
#[expect(clippy::too_many_lines)]
pub fn prepare_row_commit_for_entity<E: EntityKind + EntityValue>(
    db: &Db<E::Canister>,
    op: &CommitRowOp,
) -> Result<PreparedRowCommitOp, InternalError> {
    // The row op must have been recorded for exactly this entity type.
    if op.entity_path != E::PATH {
        return Err(InternalError::new(
            ErrorClass::Corruption,
            ErrorOrigin::Store,
            format!(
                "commit marker entity path mismatch: expected '{}', found '{}'",
                E::PATH,
                op.entity_path
            ),
        ));
    }

    // Decode the stored key bytes: raw key -> data key -> typed key for `E`.
    let raw_key = decode_data_key(&op.key)?;
    let data_key = DataKey::try_from_raw(&raw_key).map_err(|err| {
        InternalError::new(
            ErrorClass::Corruption,
            ErrorOrigin::Store,
            format!("commit marker data key corrupted: {err}"),
        )
    })?;
    let expected_key = data_key.try_key::<E>()?;

    // Shared decoder for the before/after payloads. `label` only affects the
    // decode-failure message. The last closure builds the error reported for a
    // key mismatch (it receives the expected and the actual key).
    let decode_entity = |bytes: &[u8], label: &str| -> Result<(RawRow, E), InternalError> {
        let row = RawRow::try_new(bytes.to_vec())?;
        let entity = decode_entity_with_expected_key::<E, _, _, _, _>(
            expected_key,
            || row.try_decode::<E>(),
            |err| {
                InternalError::new(
                    ErrorClass::Corruption,
                    ErrorOrigin::Serialize,
                    format!("commit marker {label} row decode failed: {err}"),
                )
            },
            |expected, actual| {
                Ok(InternalError::new(
                    ErrorClass::Corruption,
                    ErrorOrigin::Store,
                    format!(
                        "commit marker row key mismatch: expected {expected:?}, found {actual:?}"
                    ),
                ))
            },
        )?;

        Ok((row, entity))
    };

    // Decode whichever payloads are present; a decode error aborts preparation.
    let old_pair = op
        .before
        .as_ref()
        .map(|bytes| decode_entity(bytes, "before"))
        .transpose()?;
    let new_pair = op
        .after
        .as_ref()
        .map(|bytes| decode_entity(bytes, "after"))
        .transpose()?;

    // A row op with neither payload describes no mutation at all.
    if old_pair.is_none() && new_pair.is_none() {
        return Err(InternalError::new(
            ErrorClass::Corruption,
            ErrorOrigin::Store,
            "commit marker row op is a no-op (before/after both missing)",
        ));
    }

    // Plan the index mutations implied by the old/new entity pair.
    let index_plan = plan_index_mutation_for_entity::<E>(
        db,
        old_pair.as_ref().map(|(_, entity)| entity),
        new_pair.as_ref().map(|(_, entity)| entity),
    )?;
    // Count index removals/insertions: an index contributes only when its raw
    // key differs between the old and the new entity.
    let mut index_remove_count = 0usize;
    let mut index_insert_count = 0usize;
    for index in E::INDEXES {
        let old_key = old_pair
            .as_ref()
            .map(|(_, old_entity)| IndexKey::new(old_entity, index))
            .transpose()?
            .flatten()
            .map(|key| key.to_raw());
        let new_key = new_pair
            .as_ref()
            .map(|(_, new_entity)| IndexKey::new(new_entity, index))
            .transpose()?
            .flatten()
            .map(|key| key.to_raw());

        if old_key != new_key {
            if old_key.is_some() {
                index_remove_count = index_remove_count.saturating_add(1);
            }
            if new_key.is_some() {
                index_insert_count = index_insert_count.saturating_add(1);
            }
        }
    }
    // Map index store name -> store handle, taken from the plan's apply list.
    let mut index_stores = BTreeMap::new();
    for apply in &index_plan.apply {
        index_stores.insert(apply.index.store, apply.store);
    }

    // Turn each planned index op into a prepared mutation with a resolved
    // store handle and decoded key/value.
    let mut index_ops = Vec::with_capacity(index_plan.commit_ops.len());
    for index_op in index_plan.commit_ops {
        let store = index_stores
            .get(index_op.store.as_str())
            .copied()
            .ok_or_else(|| {
                InternalError::new(
                    ErrorClass::Corruption,
                    ErrorOrigin::Index,
                    format!(
                        "missing index store '{}' for entity '{}'",
                        index_op.store,
                        E::PATH
                    ),
                )
            })?;
        let key = decode_index_key(&index_op.key)?;
        let value = index_op
            .value
            .as_ref()
            .map(|bytes| decode_index_entry(bytes))
            .transpose()?;
        index_ops.push(PreparedIndexMutation { store, key, value });
    }
    // Reverse-relation index mutations are appended after the planned index ops.
    let (reverse_index_ops, reverse_remove_count, reverse_insert_count) =
        prepare_reverse_relation_index_mutations_for_source::<E>(
            db,
            old_pair.as_ref().map(|(_, entity)| entity),
            new_pair.as_ref().map(|(_, entity)| entity),
        )?;
    index_ops.extend(reverse_index_ops);

    // Resolve the data store and keep only the new row (if any) as the value
    // to write.
    let data_store = db.with_store_registry(|reg| reg.try_get_store(E::Store::PATH))?;
    let data_value = new_pair.map(|(row, _)| row);

    Ok(PreparedRowCommitOp {
        index_ops,
        data_store: data_store.data_store(),
        data_key: raw_key,
        data_value,
        index_remove_count,
        index_insert_count,
        reverse_index_remove_count: reverse_remove_count,
        reverse_index_insert_count: reverse_insert_count,
    })
}
509
///
/// CommitGuard
///
/// In-flight commit handle that clears the marker on completion.
/// Must not be leaked across mutation boundaries.
///

#[derive(Clone, Debug)]
pub struct CommitGuard {
    /// The marker this guard was opened for; `begin_commit` stores the marker
    /// it persisted here.
    pub marker: CommitMarker,
}

impl CommitGuard {
    // Clear the commit marker without surfacing errors.
    // Takes `self` by value, so the guard cannot be used again afterwards.
    fn clear(self) {
        let _ = self;
        with_commit_store_infallible(CommitStore::clear_infallible);
    }
}
529
530/// Persist a commit marker and open the commit window.
531pub fn begin_commit(marker: CommitMarker) -> Result<CommitGuard, InternalError> {
532    with_commit_store(|store| {
533        if store.load()?.is_some() {
534            return Err(InternalError::new(
535                ErrorClass::InvariantViolation,
536                ErrorOrigin::Store,
537                "commit marker already present before begin",
538            ));
539        }
540        store.set(&marker)?;
541
542        Ok(CommitGuard { marker })
543    })
544}
545
546/// Apply commit ops and clear the marker regardless of outcome.
547///
548/// The apply closure performs mechanical marker application only.
549/// Any in-process rollback guard used by the closure is non-authoritative
550/// transitional cleanup; durable authority remains the commit marker protocol.
551pub fn finish_commit(
552    mut guard: CommitGuard,
553    apply: impl FnOnce(&mut CommitGuard) -> Result<(), InternalError>,
554) -> Result<(), InternalError> {
555    // COMMIT WINDOW:
556    // Apply mutates stores from a prevalidated marker payload.
557    // Marker durability + recovery replay remain the atomicity authority.
558    // We clear the marker on any outcome so recovery does not reapply an
559    // already-attempted marker in this process.
560    let result = apply(&mut guard);
561    let commit_id = guard.marker.id;
562    guard.clear();
563    // Internal invariant: commit markers must not persist after a finished mutation.
564    assert!(
565        with_commit_store_infallible(|store| store.is_empty()),
566        "commit marker must be cleared after finish_commit (commit_id={commit_id:?})"
567    );
568    result
569}