1mod decode;
16mod guard;
17mod memory;
18mod recovery;
19mod store;
20#[cfg(test)]
21mod tests;
22
23use crate::{
24 db::{
25 Db,
26 commit::{
27 decode::{decode_data_key, decode_index_entry, decode_index_key},
28 store::{CommitStore, with_commit_store, with_commit_store_infallible},
29 },
30 decode::decode_entity_with_expected_key,
31 index::{IndexKey, RawIndexEntry, RawIndexKey, plan::plan_index_mutation_for_entity},
32 relation::prepare_reverse_relation_index_mutations_for_source,
33 store::{DataKey, DataStore, RawDataKey, RawRow},
34 },
35 error::{ErrorClass, ErrorOrigin, InternalError},
36 traits::{EntityKind, EntityValue, Path},
37 types::Ulid,
38};
39#[cfg(test)]
40use canic_memory::{
41 registry::{MemoryRegistry, MemoryRegistryError},
42 runtime::registry::MemoryRegistryRuntime,
43};
44use serde::{Deserialize, Serialize};
45#[cfg(test)]
46use std::collections::BTreeSet;
47use std::{cell::RefCell, collections::BTreeMap, thread::LocalKey};
48
49pub use guard::CommitApplyGuard;
50pub use recovery::{ensure_recovered, ensure_recovered_for_write};
51
52#[cfg(test)]
53pub fn commit_marker_present() -> Result<bool, InternalError> {
55 store::commit_marker_present()
56}
57
58#[cfg(test)]
59pub fn init_commit_store_for_tests() -> Result<(), InternalError> {
64 let init_result = MemoryRegistryRuntime::init(Some(("icydb_test", 1, 200)));
66 match init_result {
67 Ok(_) => {}
68 Err(MemoryRegistryError::Overlap { .. }) => {
69 MemoryRegistryRuntime::init(None).map_err(|err| {
70 InternalError::new(
71 ErrorClass::Internal,
72 ErrorOrigin::Store,
73 format!("memory registry init failed: {err}"),
74 )
75 })?;
76 }
77 Err(err) => {
78 return Err(InternalError::new(
79 ErrorClass::Internal,
80 ErrorOrigin::Store,
81 format!("memory registry init failed: {err}"),
82 ));
83 }
84 }
85
86 let snapshots = MemoryRegistryRuntime::snapshot_ids_by_range();
88 if snapshots.is_empty() {
89 return Err(InternalError::new(
90 ErrorClass::Internal,
91 ErrorOrigin::Store,
92 "no memory ranges available for commit marker tests",
93 ));
94 }
95 let has_store_entry = snapshots.iter().any(|snapshot| {
96 snapshot.entries.iter().any(|(_, entry)| {
97 entry.label.ends_with("DataStore") || entry.label.ends_with("IndexStore")
98 })
99 });
100
101 if !has_store_entry {
102 let snapshot = snapshots.first().ok_or_else(|| {
103 InternalError::new(
104 ErrorClass::Internal,
105 ErrorOrigin::Store,
106 "no memory ranges available for commit marker tests",
107 )
108 })?;
109 let used_ids = snapshot
110 .entries
111 .iter()
112 .map(|(id, _)| *id)
113 .collect::<BTreeSet<_>>();
114 let dummy_id = (snapshot.range.start..=snapshot.range.end)
115 .find(|id| !used_ids.contains(id))
116 .ok_or_else(|| {
117 InternalError::new(
118 ErrorClass::Unsupported,
119 ErrorOrigin::Store,
120 format!(
121 "no free memory ids available for commit marker tests in range {}-{}",
122 snapshot.range.start, snapshot.range.end
123 ),
124 )
125 })?;
126
127 MemoryRegistry::register(dummy_id, &snapshot.owner, "commit_test::DataStore").map_err(
128 |err| {
129 InternalError::new(
130 ErrorClass::Internal,
131 ErrorOrigin::Store,
132 format!("commit test memory registration failed: {err}"),
133 )
134 },
135 )?;
136 }
137
138 with_commit_store(|_| Ok(()))
141}
142
143const COMMIT_LABEL: &str = "CommitMarker";
150const COMMIT_ID_BYTES: usize = 16;
151
152pub const MAX_COMMIT_BYTES: u32 = 16 * 1024 * 1024;
155
156#[derive(Clone, Debug, Deserialize, Serialize)]
164#[serde(deny_unknown_fields)]
165pub struct CommitRowOp {
166 pub entity_path: String,
167 pub key: Vec<u8>,
168 pub before: Option<Vec<u8>>,
169 pub after: Option<Vec<u8>>,
170}
171
172impl CommitRowOp {
173 #[must_use]
175 pub fn new(
176 entity_path: impl Into<String>,
177 key: Vec<u8>,
178 before: Option<Vec<u8>>,
179 after: Option<Vec<u8>>,
180 ) -> Self {
181 Self {
182 entity_path: entity_path.into(),
183 key,
184 before,
185 after,
186 }
187 }
188}
189
190#[derive(Clone, Debug, Deserialize, Serialize)]
197#[serde(deny_unknown_fields)]
198pub struct CommitIndexOp {
199 pub store: String,
200 pub key: Vec<u8>,
201 pub value: Option<Vec<u8>>,
202}
203
204#[derive(Clone, Debug, Deserialize, Serialize)]
213#[serde(deny_unknown_fields)]
214pub struct CommitMarker {
215 pub id: [u8; COMMIT_ID_BYTES],
216 pub row_ops: Vec<CommitRowOp>,
217}
218
219impl CommitMarker {
220 pub fn new(row_ops: Vec<CommitRowOp>) -> Result<Self, InternalError> {
222 let id = Ulid::try_generate()
223 .map_err(|err| {
224 InternalError::new(
225 ErrorClass::Internal,
226 ErrorOrigin::Store,
227 format!("commit id generation failed: {err}"),
228 )
229 })?
230 .to_bytes();
231
232 Ok(Self { id, row_ops })
233 }
234}
235
236pub fn validate_commit_marker_shape(marker: &CommitMarker) -> Result<(), InternalError> {
245 for row_op in &marker.row_ops {
247 if row_op.before.is_none() && row_op.after.is_none() {
248 return Err(InternalError::new(
249 ErrorClass::Corruption,
250 ErrorOrigin::Store,
251 "commit marker corrupted: row op has neither before nor after payload",
252 ));
253 }
254 }
255
256 Ok(())
257}
258
259#[derive(Clone)]
266pub struct PreparedIndexMutation {
267 pub store: &'static LocalKey<RefCell<crate::db::index::IndexStore>>,
268 pub key: RawIndexKey,
269 pub value: Option<RawIndexEntry>,
270}
271
272#[derive(Clone)]
279pub struct PreparedRowCommitOp {
280 pub index_ops: Vec<PreparedIndexMutation>,
281 pub data_store: &'static LocalKey<RefCell<DataStore>>,
282 pub data_key: RawDataKey,
283 pub data_value: Option<RawRow>,
284 pub index_remove_count: usize,
285 pub index_insert_count: usize,
286 pub reverse_index_remove_count: usize,
287 pub reverse_index_insert_count: usize,
288}
289
290impl PreparedRowCommitOp {
291 pub fn apply(self) {
293 for index_op in self.index_ops {
294 index_op.store.with_borrow_mut(|store| {
295 if let Some(value) = index_op.value {
296 store.insert(index_op.key, value);
297 } else {
298 store.remove(&index_op.key);
299 }
300 });
301 }
302
303 self.data_store.with_borrow_mut(|store| {
304 if let Some(value) = self.data_value {
305 store.insert(self.data_key, value);
306 } else {
307 store.remove(&self.data_key);
308 }
309 });
310 }
311}
312
313#[must_use]
317pub fn snapshot_row_rollback(op: &PreparedRowCommitOp) -> PreparedRowCommitOp {
318 let mut index_ops = Vec::with_capacity(op.index_ops.len());
319 for index_op in &op.index_ops {
320 let existing = index_op.store.with_borrow(|store| store.get(&index_op.key));
321 index_ops.push(PreparedIndexMutation {
322 store: index_op.store,
323 key: index_op.key,
324 value: existing,
325 });
326 }
327
328 let data_value = op.data_store.with_borrow(|store| store.get(&op.data_key));
329
330 PreparedRowCommitOp {
331 index_ops,
332 data_store: op.data_store,
333 data_key: op.data_key,
334 data_value,
335 index_remove_count: 0,
336 index_insert_count: 0,
337 reverse_index_remove_count: 0,
338 reverse_index_insert_count: 0,
339 }
340}
341
342pub fn rollback_prepared_row_ops_reverse(ops: Vec<PreparedRowCommitOp>) {
347 for op in ops.into_iter().rev() {
348 op.apply();
349 }
350}
351
352#[expect(clippy::too_many_lines)]
357pub fn prepare_row_commit_for_entity<E: EntityKind + EntityValue>(
358 db: &Db<E::Canister>,
359 op: &CommitRowOp,
360) -> Result<PreparedRowCommitOp, InternalError> {
361 if op.entity_path != E::PATH {
362 return Err(InternalError::new(
363 ErrorClass::Corruption,
364 ErrorOrigin::Store,
365 format!(
366 "commit marker entity path mismatch: expected '{}', found '{}'",
367 E::PATH,
368 op.entity_path
369 ),
370 ));
371 }
372
373 let raw_key = decode_data_key(&op.key)?;
374 let data_key = DataKey::try_from_raw(&raw_key).map_err(|err| {
375 InternalError::new(
376 ErrorClass::Corruption,
377 ErrorOrigin::Store,
378 format!("commit marker data key corrupted: {err}"),
379 )
380 })?;
381 let expected_key = data_key.try_key::<E>()?;
382
383 let decode_entity = |bytes: &[u8], label: &str| -> Result<(RawRow, E), InternalError> {
384 let row = RawRow::try_new(bytes.to_vec())?;
385 let entity = decode_entity_with_expected_key::<E, _, _, _, _>(
386 expected_key,
387 || row.try_decode::<E>(),
388 |err| {
389 InternalError::new(
390 ErrorClass::Corruption,
391 ErrorOrigin::Serialize,
392 format!("commit marker {label} row decode failed: {err}"),
393 )
394 },
395 |expected, actual| {
396 Ok(InternalError::new(
397 ErrorClass::Corruption,
398 ErrorOrigin::Store,
399 format!(
400 "commit marker row key mismatch: expected {expected:?}, found {actual:?}"
401 ),
402 ))
403 },
404 )?;
405
406 Ok((row, entity))
407 };
408
409 let old_pair = op
410 .before
411 .as_ref()
412 .map(|bytes| decode_entity(bytes, "before"))
413 .transpose()?;
414 let new_pair = op
415 .after
416 .as_ref()
417 .map(|bytes| decode_entity(bytes, "after"))
418 .transpose()?;
419
420 if old_pair.is_none() && new_pair.is_none() {
421 return Err(InternalError::new(
422 ErrorClass::Corruption,
423 ErrorOrigin::Store,
424 "commit marker row op is a no-op (before/after both missing)",
425 ));
426 }
427
428 let index_plan = plan_index_mutation_for_entity::<E>(
429 db,
430 old_pair.as_ref().map(|(_, entity)| entity),
431 new_pair.as_ref().map(|(_, entity)| entity),
432 )?;
433 let mut index_remove_count = 0usize;
434 let mut index_insert_count = 0usize;
435 for index in E::INDEXES {
436 let old_key = old_pair
437 .as_ref()
438 .map(|(_, old_entity)| IndexKey::new(old_entity, index))
439 .transpose()?
440 .flatten()
441 .map(|key| key.to_raw());
442 let new_key = new_pair
443 .as_ref()
444 .map(|(_, new_entity)| IndexKey::new(new_entity, index))
445 .transpose()?
446 .flatten()
447 .map(|key| key.to_raw());
448
449 if old_key != new_key {
450 if old_key.is_some() {
451 index_remove_count = index_remove_count.saturating_add(1);
452 }
453 if new_key.is_some() {
454 index_insert_count = index_insert_count.saturating_add(1);
455 }
456 }
457 }
458 let mut index_stores = BTreeMap::new();
459 for apply in &index_plan.apply {
460 index_stores.insert(apply.index.store, apply.store);
461 }
462
463 let mut index_ops = Vec::with_capacity(index_plan.commit_ops.len());
464 for index_op in index_plan.commit_ops {
465 let store = index_stores
466 .get(index_op.store.as_str())
467 .copied()
468 .ok_or_else(|| {
469 InternalError::new(
470 ErrorClass::Corruption,
471 ErrorOrigin::Index,
472 format!(
473 "missing index store '{}' for entity '{}'",
474 index_op.store,
475 E::PATH
476 ),
477 )
478 })?;
479 let key = decode_index_key(&index_op.key)?;
480 let value = index_op
481 .value
482 .as_ref()
483 .map(|bytes| decode_index_entry(bytes))
484 .transpose()?;
485 index_ops.push(PreparedIndexMutation { store, key, value });
486 }
487 let (reverse_index_ops, reverse_remove_count, reverse_insert_count) =
488 prepare_reverse_relation_index_mutations_for_source::<E>(
489 db,
490 old_pair.as_ref().map(|(_, entity)| entity),
491 new_pair.as_ref().map(|(_, entity)| entity),
492 )?;
493 index_ops.extend(reverse_index_ops);
494
495 let data_store = db.with_store_registry(|reg| reg.try_get_store(E::Store::PATH))?;
496 let data_value = new_pair.map(|(row, _)| row);
497
498 Ok(PreparedRowCommitOp {
499 index_ops,
500 data_store: data_store.data_store(),
501 data_key: raw_key,
502 data_value,
503 index_remove_count,
504 index_insert_count,
505 reverse_index_remove_count: reverse_remove_count,
506 reverse_index_insert_count: reverse_insert_count,
507 })
508}
509
510#[derive(Clone, Debug)]
518pub struct CommitGuard {
519 pub marker: CommitMarker,
520}
521
522impl CommitGuard {
523 fn clear(self) {
525 let _ = self;
526 with_commit_store_infallible(CommitStore::clear_infallible);
527 }
528}
529
530pub fn begin_commit(marker: CommitMarker) -> Result<CommitGuard, InternalError> {
532 with_commit_store(|store| {
533 if store.load()?.is_some() {
534 return Err(InternalError::new(
535 ErrorClass::InvariantViolation,
536 ErrorOrigin::Store,
537 "commit marker already present before begin",
538 ));
539 }
540 store.set(&marker)?;
541
542 Ok(CommitGuard { marker })
543 })
544}
545
546pub fn finish_commit(
552 mut guard: CommitGuard,
553 apply: impl FnOnce(&mut CommitGuard) -> Result<(), InternalError>,
554) -> Result<(), InternalError> {
555 let result = apply(&mut guard);
561 let commit_id = guard.marker.id;
562 guard.clear();
563 assert!(
565 with_commit_store_infallible(|store| store.is_empty()),
566 "commit marker must be cleared after finish_commit (commit_id={commit_id:?})"
567 );
568 result
569}