1mod decode;
16mod guard;
17mod memory;
18mod recovery;
19mod store;
20#[cfg(test)]
21mod tests;
22
23use crate::{
24 db::{
25 Db,
26 commit::{
27 decode::{decode_data_key, decode_index_entry, decode_index_key},
28 store::{CommitStore, with_commit_store, with_commit_store_infallible},
29 },
30 index::{IndexKey, RawIndexEntry, RawIndexKey, plan::plan_index_mutation_for_entity},
31 relation::prepare_reverse_relation_index_mutations_for_source,
32 store::{DataKey, DataStore, RawDataKey, RawRow},
33 },
34 error::{ErrorClass, ErrorOrigin, InternalError},
35 traits::{EntityKind, EntityValue, Path},
36 types::Ulid,
37};
38#[cfg(test)]
39use canic_memory::{
40 registry::{MemoryRegistry, MemoryRegistryError},
41 runtime::registry::MemoryRegistryRuntime,
42};
43use serde::{Deserialize, Serialize};
44#[cfg(test)]
45use std::collections::BTreeSet;
46use std::{cell::RefCell, collections::BTreeMap, thread::LocalKey};
47
48pub use guard::CommitApplyGuard;
49pub use recovery::{ensure_recovered, ensure_recovered_for_write};
50
51#[cfg(test)]
52pub fn commit_marker_present() -> Result<bool, InternalError> {
54 store::commit_marker_present()
55}
56
57#[cfg(test)]
58pub fn init_commit_store_for_tests() -> Result<(), InternalError> {
63 let init_result = MemoryRegistryRuntime::init(Some(("icydb_test", 1, 200)));
65 match init_result {
66 Ok(_) => {}
67 Err(MemoryRegistryError::Overlap { .. }) => {
68 MemoryRegistryRuntime::init(None).map_err(|err| {
69 InternalError::new(
70 ErrorClass::Internal,
71 ErrorOrigin::Store,
72 format!("memory registry init failed: {err}"),
73 )
74 })?;
75 }
76 Err(err) => {
77 return Err(InternalError::new(
78 ErrorClass::Internal,
79 ErrorOrigin::Store,
80 format!("memory registry init failed: {err}"),
81 ));
82 }
83 }
84
85 let snapshots = MemoryRegistryRuntime::snapshot_ids_by_range();
87 if snapshots.is_empty() {
88 return Err(InternalError::new(
89 ErrorClass::Internal,
90 ErrorOrigin::Store,
91 "no memory ranges available for commit marker tests",
92 ));
93 }
94 let has_store_entry = snapshots.iter().any(|snapshot| {
95 snapshot.entries.iter().any(|(_, entry)| {
96 entry.label.ends_with("DataStore") || entry.label.ends_with("IndexStore")
97 })
98 });
99
100 if !has_store_entry {
101 let snapshot = snapshots.first().ok_or_else(|| {
102 InternalError::new(
103 ErrorClass::Internal,
104 ErrorOrigin::Store,
105 "no memory ranges available for commit marker tests",
106 )
107 })?;
108 let used_ids = snapshot
109 .entries
110 .iter()
111 .map(|(id, _)| *id)
112 .collect::<BTreeSet<_>>();
113 let dummy_id = (snapshot.range.start..=snapshot.range.end)
114 .find(|id| !used_ids.contains(id))
115 .ok_or_else(|| {
116 InternalError::new(
117 ErrorClass::Unsupported,
118 ErrorOrigin::Store,
119 format!(
120 "no free memory ids available for commit marker tests in range {}-{}",
121 snapshot.range.start, snapshot.range.end
122 ),
123 )
124 })?;
125
126 MemoryRegistry::register(dummy_id, &snapshot.owner, "commit_test::DataStore").map_err(
127 |err| {
128 InternalError::new(
129 ErrorClass::Internal,
130 ErrorOrigin::Store,
131 format!("commit test memory registration failed: {err}"),
132 )
133 },
134 )?;
135 }
136
137 with_commit_store(|_| Ok(()))
140}
141
142const COMMIT_LABEL: &str = "CommitMarker";
149const COMMIT_ID_BYTES: usize = 16;
150
151pub const MAX_COMMIT_BYTES: u32 = 16 * 1024 * 1024;
154
155#[derive(Clone, Debug, Deserialize, Serialize)]
163#[serde(deny_unknown_fields)]
164pub struct CommitRowOp {
165 pub entity_path: String,
166 pub key: Vec<u8>,
167 pub before: Option<Vec<u8>>,
168 pub after: Option<Vec<u8>>,
169}
170
171impl CommitRowOp {
172 #[must_use]
174 pub fn new(
175 entity_path: impl Into<String>,
176 key: Vec<u8>,
177 before: Option<Vec<u8>>,
178 after: Option<Vec<u8>>,
179 ) -> Self {
180 Self {
181 entity_path: entity_path.into(),
182 key,
183 before,
184 after,
185 }
186 }
187}
188
189#[derive(Clone, Debug, Deserialize, Serialize)]
196#[serde(deny_unknown_fields)]
197pub struct CommitIndexOp {
198 pub store: String,
199 pub key: Vec<u8>,
200 pub value: Option<Vec<u8>>,
201}
202
203#[derive(Clone, Debug, Deserialize, Serialize)]
212#[serde(deny_unknown_fields)]
213pub struct CommitMarker {
214 pub id: [u8; COMMIT_ID_BYTES],
215 pub row_ops: Vec<CommitRowOp>,
216}
217
218impl CommitMarker {
219 pub fn new(row_ops: Vec<CommitRowOp>) -> Result<Self, InternalError> {
221 let id = Ulid::try_generate()
222 .map_err(|err| {
223 InternalError::new(
224 ErrorClass::Internal,
225 ErrorOrigin::Store,
226 format!("commit id generation failed: {err}"),
227 )
228 })?
229 .to_bytes();
230
231 Ok(Self { id, row_ops })
232 }
233}
234
235pub fn validate_commit_marker_shape(marker: &CommitMarker) -> Result<(), InternalError> {
244 for row_op in &marker.row_ops {
246 if row_op.before.is_none() && row_op.after.is_none() {
247 return Err(InternalError::new(
248 ErrorClass::Corruption,
249 ErrorOrigin::Store,
250 "commit marker corrupted: row op has neither before nor after payload",
251 ));
252 }
253 }
254
255 Ok(())
256}
257
258#[derive(Clone)]
265pub struct PreparedIndexMutation {
266 pub store: &'static LocalKey<RefCell<crate::db::index::IndexStore>>,
267 pub key: RawIndexKey,
268 pub value: Option<RawIndexEntry>,
269}
270
271#[derive(Clone)]
278pub struct PreparedRowCommitOp {
279 pub index_ops: Vec<PreparedIndexMutation>,
280 pub data_store: &'static LocalKey<RefCell<DataStore>>,
281 pub data_key: RawDataKey,
282 pub data_value: Option<RawRow>,
283 pub index_remove_count: usize,
284 pub index_insert_count: usize,
285 pub reverse_index_remove_count: usize,
286 pub reverse_index_insert_count: usize,
287}
288
289impl PreparedRowCommitOp {
290 pub fn apply(self) {
292 for index_op in self.index_ops {
293 index_op.store.with_borrow_mut(|store| {
294 if let Some(value) = index_op.value {
295 store.insert(index_op.key, value);
296 } else {
297 store.remove(&index_op.key);
298 }
299 });
300 }
301
302 self.data_store.with_borrow_mut(|store| {
303 if let Some(value) = self.data_value {
304 store.insert(self.data_key, value);
305 } else {
306 store.remove(&self.data_key);
307 }
308 });
309 }
310}
311
312#[must_use]
316pub fn snapshot_row_rollback(op: &PreparedRowCommitOp) -> PreparedRowCommitOp {
317 let mut index_ops = Vec::with_capacity(op.index_ops.len());
318 for index_op in &op.index_ops {
319 let existing = index_op.store.with_borrow(|store| store.get(&index_op.key));
320 index_ops.push(PreparedIndexMutation {
321 store: index_op.store,
322 key: index_op.key,
323 value: existing,
324 });
325 }
326
327 let data_value = op.data_store.with_borrow(|store| store.get(&op.data_key));
328
329 PreparedRowCommitOp {
330 index_ops,
331 data_store: op.data_store,
332 data_key: op.data_key,
333 data_value,
334 index_remove_count: 0,
335 index_insert_count: 0,
336 reverse_index_remove_count: 0,
337 reverse_index_insert_count: 0,
338 }
339}
340
341#[expect(clippy::too_many_lines)]
346pub fn prepare_row_commit_for_entity<E: EntityKind + EntityValue>(
347 db: &Db<E::Canister>,
348 op: &CommitRowOp,
349) -> Result<PreparedRowCommitOp, InternalError> {
350 if op.entity_path != E::PATH {
351 return Err(InternalError::new(
352 ErrorClass::Corruption,
353 ErrorOrigin::Store,
354 format!(
355 "commit marker entity path mismatch: expected '{}', found '{}'",
356 E::PATH,
357 op.entity_path
358 ),
359 ));
360 }
361
362 let raw_key = decode_data_key(&op.key)?;
363 let data_key = DataKey::try_from_raw(&raw_key).map_err(|err| {
364 InternalError::new(
365 ErrorClass::Corruption,
366 ErrorOrigin::Store,
367 format!("commit marker data key corrupted: {err}"),
368 )
369 })?;
370 data_key.try_key::<E>()?;
371
372 let decode_entity = |bytes: &[u8], label: &str| -> Result<(RawRow, E), InternalError> {
373 let row = RawRow::try_new(bytes.to_vec())?;
374 let entity = row.try_decode::<E>().map_err(|err| {
375 InternalError::new(
376 ErrorClass::Corruption,
377 ErrorOrigin::Serialize,
378 format!("commit marker {label} row decode failed: {err}"),
379 )
380 })?;
381 let expected = data_key.try_key::<E>()?;
382 let actual = entity.id().key();
383 if expected != actual {
384 return Err(InternalError::new(
385 ErrorClass::Corruption,
386 ErrorOrigin::Store,
387 format!("commit marker row key mismatch: expected {expected:?}, found {actual:?}"),
388 ));
389 }
390
391 Ok((row, entity))
392 };
393
394 let old_pair = op
395 .before
396 .as_ref()
397 .map(|bytes| decode_entity(bytes, "before"))
398 .transpose()?;
399 let new_pair = op
400 .after
401 .as_ref()
402 .map(|bytes| decode_entity(bytes, "after"))
403 .transpose()?;
404
405 if old_pair.is_none() && new_pair.is_none() {
406 return Err(InternalError::new(
407 ErrorClass::Corruption,
408 ErrorOrigin::Store,
409 "commit marker row op is a no-op (before/after both missing)",
410 ));
411 }
412
413 let index_plan = plan_index_mutation_for_entity::<E>(
414 db,
415 old_pair.as_ref().map(|(_, entity)| entity),
416 new_pair.as_ref().map(|(_, entity)| entity),
417 )?;
418 let mut index_remove_count = 0usize;
419 let mut index_insert_count = 0usize;
420 for index in E::INDEXES {
421 let old_key = old_pair
422 .as_ref()
423 .map(|(_, old_entity)| IndexKey::new(old_entity, index))
424 .transpose()?
425 .flatten()
426 .map(|key| key.to_raw());
427 let new_key = new_pair
428 .as_ref()
429 .map(|(_, new_entity)| IndexKey::new(new_entity, index))
430 .transpose()?
431 .flatten()
432 .map(|key| key.to_raw());
433
434 if old_key != new_key {
435 if old_key.is_some() {
436 index_remove_count = index_remove_count.saturating_add(1);
437 }
438 if new_key.is_some() {
439 index_insert_count = index_insert_count.saturating_add(1);
440 }
441 }
442 }
443 let mut index_stores = BTreeMap::new();
444 for apply in &index_plan.apply {
445 index_stores.insert(apply.index.store, apply.store);
446 }
447
448 let mut index_ops = Vec::with_capacity(index_plan.commit_ops.len());
449 for index_op in index_plan.commit_ops {
450 let store = index_stores
451 .get(index_op.store.as_str())
452 .copied()
453 .ok_or_else(|| {
454 InternalError::new(
455 ErrorClass::Corruption,
456 ErrorOrigin::Index,
457 format!(
458 "missing index store '{}' for entity '{}'",
459 index_op.store,
460 E::PATH
461 ),
462 )
463 })?;
464 let key = decode_index_key(&index_op.key)?;
465 let value = index_op
466 .value
467 .as_ref()
468 .map(|bytes| decode_index_entry(bytes))
469 .transpose()?;
470 index_ops.push(PreparedIndexMutation { store, key, value });
471 }
472 let (reverse_index_ops, reverse_remove_count, reverse_insert_count) =
473 prepare_reverse_relation_index_mutations_for_source::<E>(
474 db,
475 old_pair.as_ref().map(|(_, entity)| entity),
476 new_pair.as_ref().map(|(_, entity)| entity),
477 )?;
478 index_ops.extend(reverse_index_ops);
479
480 let data_store = db.with_store_registry(|reg| reg.try_get_store(E::Store::PATH))?;
481 let data_value = new_pair.map(|(row, _)| row);
482
483 Ok(PreparedRowCommitOp {
484 index_ops,
485 data_store: data_store.data_store(),
486 data_key: raw_key,
487 data_value,
488 index_remove_count,
489 index_insert_count,
490 reverse_index_remove_count: reverse_remove_count,
491 reverse_index_insert_count: reverse_insert_count,
492 })
493}
494
495#[derive(Clone, Debug)]
503pub struct CommitGuard {
504 pub marker: CommitMarker,
505}
506
507impl CommitGuard {
508 fn clear(self) {
510 let _ = self;
511 with_commit_store_infallible(CommitStore::clear_infallible);
512 }
513}
514
515pub fn begin_commit(marker: CommitMarker) -> Result<CommitGuard, InternalError> {
517 with_commit_store(|store| {
518 if store.load()?.is_some() {
519 return Err(InternalError::new(
520 ErrorClass::InvariantViolation,
521 ErrorOrigin::Store,
522 "commit marker already present before begin",
523 ));
524 }
525 store.set(&marker)?;
526
527 Ok(CommitGuard { marker })
528 })
529}
530
531pub fn finish_commit(
537 mut guard: CommitGuard,
538 apply: impl FnOnce(&mut CommitGuard) -> Result<(), InternalError>,
539) -> Result<(), InternalError> {
540 let result = apply(&mut guard);
546 let commit_id = guard.marker.id;
547 guard.clear();
548 assert!(
550 with_commit_store_infallible(|store| store.is_empty()),
551 "commit marker must be cleared after finish_commit (commit_id={commit_id:?})"
552 );
553 result
554}