Skip to main content

icydb_core/db/commit/
mod.rs

1//! IcyDB commit protocol and atomicity guardrails.
2//!
3//! Contract:
4//! - `begin_commit` persists a marker that fully describes durable mutations.
5//! - Durable correctness is owned by marker replay in recovery (row ops).
6//! - In-process apply guards are best-effort cleanup only and are not authoritative.
7//!
8//! ## Commit Boundary and Authority of CommitMarker
9//!
10//! The `CommitMarker` fully specifies every row mutation. After
11//! the marker is persisted, executors must not re-derive semantics or branch
12//! on entity/index contents; apply logic deterministically replays row ops.
13//! Recovery replays row ops as recorded, not planner logic.
14
15mod decode;
16mod guard;
17mod memory;
18mod recovery;
19mod store;
20#[cfg(test)]
21mod tests;
22
23use crate::{
24    db::{
25        Db,
26        commit::{
27            decode::{decode_data_key, decode_index_entry, decode_index_key},
28            store::{CommitStore, with_commit_store, with_commit_store_infallible},
29        },
30        index::{IndexKey, RawIndexEntry, RawIndexKey, plan::plan_index_mutation_for_entity},
31        store::{DataKey, DataStore, RawDataKey, RawRow},
32    },
33    error::{ErrorClass, ErrorOrigin, InternalError},
34    traits::{EntityKind, EntityValue, Path},
35    types::Ulid,
36};
37#[cfg(test)]
38use canic_memory::{
39    registry::{MemoryRegistry, MemoryRegistryError},
40    runtime::registry::MemoryRegistryRuntime,
41};
42use serde::{Deserialize, Serialize};
43#[cfg(test)]
44use std::collections::BTreeSet;
45use std::{cell::RefCell, collections::BTreeMap, thread::LocalKey};
46
47pub use guard::CommitApplyGuard;
48pub use recovery::{ensure_recovered, ensure_recovered_for_write};
49
/// Report whether a commit marker is currently persisted.
///
/// Test-only helper that delegates to the commit store module and returns
/// whatever error that lookup produces unchanged.
#[cfg(test)]
pub fn commit_marker_present() -> Result<bool, InternalError> {
    let present = store::commit_marker_present()?;

    Ok(present)
}
55
/// Prepare commit marker storage for tests.
///
/// The commit memory allocator picks its reserved range from existing store
/// registrations, so this registers a placeholder data-store entry when no
/// `DataStore`/`IndexStore` entry is present yet.
///
/// # Errors
///
/// Returns an error when the memory registry cannot be initialized, when it
/// reports no ranges at all, when the first range has no free id left, when
/// the placeholder registration is rejected, or when opening the commit store
/// fails.
#[cfg(test)]
pub fn init_commit_store_for_tests() -> Result<(), InternalError> {
    // Both registry init attempts report failures with the same message shape.
    let registry_init_failed = |err: MemoryRegistryError| {
        InternalError::new(
            ErrorClass::Internal,
            ErrorOrigin::Store,
            format!("memory registry init failed: {err}"),
        )
    };

    // Phase 1: ensure the memory registry has at least one reserved range.
    // An overlap on the test range is tolerated; fall back to a plain init.
    match MemoryRegistryRuntime::init(Some(("icydb_test", 1, 200))) {
        Ok(_) => {}
        Err(MemoryRegistryError::Overlap { .. }) => {
            MemoryRegistryRuntime::init(None).map_err(registry_init_failed)?;
        }
        Err(other) => return Err(registry_init_failed(other)),
    }

    // Phase 2: ensure a DB-store entry exists so commit memory can be allocated.
    let range_snapshots = MemoryRegistryRuntime::snapshot_ids_by_range();
    let Some(first_range) = range_snapshots.first() else {
        return Err(InternalError::new(
            ErrorClass::Internal,
            ErrorOrigin::Store,
            "no memory ranges available for commit marker tests",
        ));
    };

    let store_registered = range_snapshots
        .iter()
        .flat_map(|range| range.entries.iter())
        .any(|(_, entry)| {
            entry.label.ends_with("DataStore") || entry.label.ends_with("IndexStore")
        });

    if !store_registered {
        // Pick the lowest id in the first range that nothing has claimed yet.
        let taken_ids: BTreeSet<_> = first_range.entries.iter().map(|(id, _)| *id).collect();
        let free_id = (first_range.range.start..=first_range.range.end)
            .find(|candidate| !taken_ids.contains(candidate))
            .ok_or_else(|| {
                InternalError::new(
                    ErrorClass::Unsupported,
                    ErrorOrigin::Store,
                    format!(
                        "no free memory ids available for commit marker tests in range {}-{}",
                        first_range.range.start, first_range.range.end
                    ),
                )
            })?;

        MemoryRegistry::register(free_id, &first_range.owner, "commit_test::DataStore").map_err(
            |err| {
                InternalError::new(
                    ErrorClass::Internal,
                    ErrorOrigin::Store,
                    format!("commit test memory registration failed: {err}"),
                )
            },
        )?;
    }

    // Phase 3: initialize the commit store in the production slot.
    with_commit_store(|_| Ok(()))
}
140
141// Stage-2 invariant:
142// - We persist a commit marker before any stable mutation.
143// - After marker creation, executor apply phases are infallible or trap.
144// - Recovery replays the stored row mutation plan.
145// This makes partial mutations deterministic without a WAL.
146
/// Label associated with the commit marker.
// NOTE(review): not referenced in this part of the file; presumably consumed by
// the commit submodules — confirm before renaming.
const COMMIT_LABEL: &str = "CommitMarker";

/// Width in bytes of a commit id (the length of `CommitMarker::id`).
const COMMIT_ID_BYTES: usize = 16;

/// Commit size ceiling in bytes (16 MiB).
///
/// Conservative upper bound to avoid rejecting valid commits when index entries
/// are large; still small enough to fit typical canister constraints.
pub const MAX_COMMIT_BYTES: u32 = 16 << 20;
153
154///
155/// CommitRowOp
156///
157/// Row-level mutation recorded in a commit marker.
158/// Store identity is derived from `entity_path` at apply/recovery time.
159///
160
161#[derive(Clone, Debug, Deserialize, Serialize)]
162#[serde(deny_unknown_fields)]
163pub struct CommitRowOp {
164    pub entity_path: String,
165    pub key: Vec<u8>,
166    pub before: Option<Vec<u8>>,
167    pub after: Option<Vec<u8>>,
168}
169
170impl CommitRowOp {
171    /// Construct a row-level commit operation.
172    #[must_use]
173    pub fn new(
174        entity_path: impl Into<String>,
175        key: Vec<u8>,
176        before: Option<Vec<u8>>,
177        after: Option<Vec<u8>>,
178    ) -> Self {
179        Self {
180            entity_path: entity_path.into(),
181            key,
182            before,
183            after,
184        }
185    }
186}
187
188///
189/// CommitIndexOp
190///
191/// Internal index mutation used during row-op preparation/apply.
192/// Not persisted in commit markers.
193
194#[derive(Clone, Debug, Deserialize, Serialize)]
195#[serde(deny_unknown_fields)]
196pub struct CommitIndexOp {
197    pub store: String,
198    pub key: Vec<u8>,
199    pub value: Option<Vec<u8>>,
200}
201
202///
203/// CommitMarker
204///
205/// Persisted mutation plan covering row-level operations.
206/// Recovery replays the marker exactly as stored.
207/// Unknown fields are rejected as corruption; commit markers are not forward-compatible.
208/// This is internal commit-protocol metadata, not a user-schema type.
209
210#[derive(Clone, Debug, Deserialize, Serialize)]
211#[serde(deny_unknown_fields)]
212pub struct CommitMarker {
213    pub id: [u8; COMMIT_ID_BYTES],
214    pub row_ops: Vec<CommitRowOp>,
215}
216
217impl CommitMarker {
218    /// Construct a new commit marker with a fresh commit id.
219    pub fn new(row_ops: Vec<CommitRowOp>) -> Result<Self, InternalError> {
220        let id = Ulid::try_generate()
221            .map_err(|err| {
222                InternalError::new(
223                    ErrorClass::Internal,
224                    ErrorOrigin::Store,
225                    format!("commit id generation failed: {err}"),
226                )
227            })?
228            .to_bytes();
229
230        Ok(Self { id, row_ops })
231    }
232}
233
234/// Validate commit-marker row-op shape invariants.
235///
236/// Every row op must represent a concrete mutation:
237/// - insert (`before=None`, `after=Some`)
238/// - update (`before=Some`, `after=Some`)
239/// - delete (`before=Some`, `after=None`)
240///
241/// The empty shape (`before=None`, `after=None`) is corruption.
242pub fn validate_commit_marker_shape(marker: &CommitMarker) -> Result<(), InternalError> {
243    // Phase 1: reject row ops that cannot encode any mutation semantics.
244    for row_op in &marker.row_ops {
245        if row_op.before.is_none() && row_op.after.is_none() {
246            return Err(InternalError::new(
247                ErrorClass::Corruption,
248                ErrorOrigin::Store,
249                "commit marker corrupted: row op has neither before nor after payload",
250            ));
251        }
252    }
253
254    Ok(())
255}
256
257///
258/// PreparedIndexMutation
259///
260/// Mechanical index mutation derived from a row op.
261///
262
263#[derive(Clone)]
264pub struct PreparedIndexMutation {
265    pub store: &'static LocalKey<RefCell<crate::db::index::IndexStore>>,
266    pub key: RawIndexKey,
267    pub value: Option<RawIndexEntry>,
268}
269
270///
271/// PreparedRowCommitOp
272///
273/// Mechanical store mutation derived from one row op.
274///
275
276#[derive(Clone)]
277pub struct PreparedRowCommitOp {
278    pub index_ops: Vec<PreparedIndexMutation>,
279    pub data_store: &'static LocalKey<RefCell<DataStore>>,
280    pub data_key: RawDataKey,
281    pub data_value: Option<RawRow>,
282    pub index_remove_count: usize,
283    pub index_insert_count: usize,
284}
285
286impl PreparedRowCommitOp {
287    /// Apply the prepared row operation infallibly.
288    pub fn apply(self) {
289        for index_op in self.index_ops {
290            index_op.store.with_borrow_mut(|store| {
291                if let Some(value) = index_op.value {
292                    store.insert(index_op.key, value);
293                } else {
294                    store.remove(&index_op.key);
295                }
296            });
297        }
298
299        self.data_store.with_borrow_mut(|store| {
300            if let Some(value) = self.data_value {
301                store.insert(self.data_key, value);
302            } else {
303                store.remove(&self.data_key);
304            }
305        });
306    }
307}
308
309/// Capture the current store state needed to roll back one prepared row op.
310///
311/// The returned op writes the prior index/data values back when applied.
312#[must_use]
313pub fn snapshot_row_rollback(op: &PreparedRowCommitOp) -> PreparedRowCommitOp {
314    let mut index_ops = Vec::with_capacity(op.index_ops.len());
315    for index_op in &op.index_ops {
316        let existing = index_op.store.with_borrow(|store| store.get(&index_op.key));
317        index_ops.push(PreparedIndexMutation {
318            store: index_op.store,
319            key: index_op.key,
320            value: existing,
321        });
322    }
323
324    let data_value = op.data_store.with_borrow(|store| store.get(&op.data_key));
325
326    PreparedRowCommitOp {
327        index_ops,
328        data_store: op.data_store,
329        data_key: op.data_key,
330        data_value,
331        index_remove_count: 0,
332        index_insert_count: 0,
333    }
334}
335
336/// Prepare a typed row-level commit op for one entity type.
337///
338/// This resolves store handles and index/data mutations so commit/recovery
339/// apply can remain mechanical.
340#[expect(clippy::too_many_lines)]
341pub fn prepare_row_commit_for_entity<E: EntityKind + EntityValue>(
342    db: &Db<E::Canister>,
343    op: &CommitRowOp,
344) -> Result<PreparedRowCommitOp, InternalError> {
345    if op.entity_path != E::PATH {
346        return Err(InternalError::new(
347            ErrorClass::Corruption,
348            ErrorOrigin::Store,
349            format!(
350                "commit marker entity path mismatch: expected '{}', found '{}'",
351                E::PATH,
352                op.entity_path
353            ),
354        ));
355    }
356
357    let raw_key = decode_data_key(&op.key)?;
358    let data_key = DataKey::try_from_raw(&raw_key).map_err(|err| {
359        InternalError::new(
360            ErrorClass::Corruption,
361            ErrorOrigin::Store,
362            format!("commit marker data key corrupted: {err}"),
363        )
364    })?;
365    data_key.try_key::<E>()?;
366
367    let decode_entity = |bytes: &[u8], label: &str| -> Result<(RawRow, E), InternalError> {
368        let row = RawRow::try_new(bytes.to_vec())?;
369        let entity = row.try_decode::<E>().map_err(|err| {
370            InternalError::new(
371                ErrorClass::Corruption,
372                ErrorOrigin::Serialize,
373                format!("commit marker {label} row decode failed: {err}"),
374            )
375        })?;
376        let expected = data_key.try_key::<E>()?;
377        let actual = entity.id().key();
378        if expected != actual {
379            return Err(InternalError::new(
380                ErrorClass::Corruption,
381                ErrorOrigin::Store,
382                format!("commit marker row key mismatch: expected {expected:?}, found {actual:?}"),
383            ));
384        }
385
386        Ok((row, entity))
387    };
388
389    let old_pair = op
390        .before
391        .as_ref()
392        .map(|bytes| decode_entity(bytes, "before"))
393        .transpose()?;
394    let new_pair = op
395        .after
396        .as_ref()
397        .map(|bytes| decode_entity(bytes, "after"))
398        .transpose()?;
399
400    if old_pair.is_none() && new_pair.is_none() {
401        return Err(InternalError::new(
402            ErrorClass::Corruption,
403            ErrorOrigin::Store,
404            "commit marker row op is a no-op (before/after both missing)",
405        ));
406    }
407
408    let index_plan = plan_index_mutation_for_entity::<E>(
409        db,
410        old_pair.as_ref().map(|(_, entity)| entity),
411        new_pair.as_ref().map(|(_, entity)| entity),
412    )?;
413    let mut index_remove_count = 0usize;
414    let mut index_insert_count = 0usize;
415    for index in E::INDEXES {
416        let old_key = old_pair
417            .as_ref()
418            .map(|(_, old_entity)| IndexKey::new(old_entity, index))
419            .transpose()?
420            .flatten()
421            .map(|key| key.to_raw());
422        let new_key = new_pair
423            .as_ref()
424            .map(|(_, new_entity)| IndexKey::new(new_entity, index))
425            .transpose()?
426            .flatten()
427            .map(|key| key.to_raw());
428
429        if old_key != new_key {
430            if old_key.is_some() {
431                index_remove_count = index_remove_count.saturating_add(1);
432            }
433            if new_key.is_some() {
434                index_insert_count = index_insert_count.saturating_add(1);
435            }
436        }
437    }
438    let mut index_stores = BTreeMap::new();
439    for apply in &index_plan.apply {
440        index_stores.insert(apply.index.store, apply.store);
441    }
442
443    let mut index_ops = Vec::with_capacity(index_plan.commit_ops.len());
444    for index_op in index_plan.commit_ops {
445        let store = index_stores
446            .get(index_op.store.as_str())
447            .copied()
448            .ok_or_else(|| {
449                InternalError::new(
450                    ErrorClass::Corruption,
451                    ErrorOrigin::Index,
452                    format!(
453                        "missing index store '{}' for entity '{}'",
454                        index_op.store,
455                        E::PATH
456                    ),
457                )
458            })?;
459        let key = decode_index_key(&index_op.key)?;
460        let value = index_op
461            .value
462            .as_ref()
463            .map(|bytes| decode_index_entry(bytes))
464            .transpose()?;
465        index_ops.push(PreparedIndexMutation { store, key, value });
466    }
467
468    let data_store = db.with_store_registry(|reg| reg.try_get_store(E::Store::PATH))?;
469    let data_value = new_pair.map(|(row, _)| row);
470
471    Ok(PreparedRowCommitOp {
472        index_ops,
473        data_store: data_store.data_store(),
474        data_key: raw_key,
475        data_value,
476        index_remove_count,
477        index_insert_count,
478    })
479}
480
481///
482/// CommitGuard
483///
484/// In-flight commit handle that clears the marker on completion.
485/// Must not be leaked across mutation boundaries.
486///
487
488#[derive(Clone, Debug)]
489pub struct CommitGuard {
490    pub marker: CommitMarker,
491}
492
493impl CommitGuard {
494    // Clear the commit marker without surfacing errors.
495    fn clear(self) {
496        let _ = self;
497        with_commit_store_infallible(CommitStore::clear_infallible);
498    }
499}
500
501/// Persist a commit marker and open the commit window.
502pub fn begin_commit(marker: CommitMarker) -> Result<CommitGuard, InternalError> {
503    with_commit_store(|store| {
504        if store.load()?.is_some() {
505            return Err(InternalError::new(
506                ErrorClass::InvariantViolation,
507                ErrorOrigin::Store,
508                "commit marker already present before begin",
509            ));
510        }
511        store.set(&marker)?;
512
513        Ok(CommitGuard { marker })
514    })
515}
516
517/// Apply commit ops and clear the marker regardless of outcome.
518///
519/// The apply closure performs mechanical marker application only.
520/// Any in-process rollback guard used by the closure is non-authoritative
521/// transitional cleanup; durable authority remains the commit marker protocol.
522pub fn finish_commit(
523    mut guard: CommitGuard,
524    apply: impl FnOnce(&mut CommitGuard) -> Result<(), InternalError>,
525) -> Result<(), InternalError> {
526    // COMMIT WINDOW:
527    // Apply mutates stores from a prevalidated marker payload.
528    // Marker durability + recovery replay remain the atomicity authority.
529    // We clear the marker on any outcome so recovery does not reapply an
530    // already-attempted marker in this process.
531    let result = apply(&mut guard);
532    let commit_id = guard.marker.id;
533    guard.clear();
534    // Internal invariant: commit markers must not persist after a finished mutation.
535    assert!(
536        with_commit_store_infallible(|store| store.is_empty()),
537        "commit marker must be cleared after finish_commit (commit_id={commit_id:?})"
538    );
539    result
540}