1mod decode;
16mod guard;
17mod memory;
18mod recovery;
19mod store;
20#[cfg(test)]
21mod tests;
22
23use crate::{
24 db::{
25 Db,
26 commit::{
27 decode::{decode_data_key, decode_index_entry, decode_index_key},
28 store::{CommitStore, with_commit_store, with_commit_store_infallible},
29 },
30 index::{IndexKey, RawIndexEntry, RawIndexKey, plan::plan_index_mutation_for_entity},
31 store::{DataKey, DataStore, RawDataKey, RawRow},
32 },
33 error::{ErrorClass, ErrorOrigin, InternalError},
34 traits::{EntityKind, EntityValue, Path},
35 types::Ulid,
36};
37#[cfg(test)]
38use canic_memory::{
39 registry::{MemoryRegistry, MemoryRegistryError},
40 runtime::registry::MemoryRegistryRuntime,
41};
42use serde::{Deserialize, Serialize};
43#[cfg(test)]
44use std::collections::BTreeSet;
45use std::{cell::RefCell, collections::BTreeMap, thread::LocalKey};
46
47pub use guard::CommitApplyGuard;
48pub use recovery::{ensure_recovered, ensure_recovered_for_write};
49
50#[cfg(test)]
51pub fn commit_marker_present() -> Result<bool, InternalError> {
53 store::commit_marker_present()
54}
55
56#[cfg(test)]
57pub fn init_commit_store_for_tests() -> Result<(), InternalError> {
62 let init_result = MemoryRegistryRuntime::init(Some(("icydb_test", 1, 200)));
64 match init_result {
65 Ok(_) => {}
66 Err(MemoryRegistryError::Overlap { .. }) => {
67 MemoryRegistryRuntime::init(None).map_err(|err| {
68 InternalError::new(
69 ErrorClass::Internal,
70 ErrorOrigin::Store,
71 format!("memory registry init failed: {err}"),
72 )
73 })?;
74 }
75 Err(err) => {
76 return Err(InternalError::new(
77 ErrorClass::Internal,
78 ErrorOrigin::Store,
79 format!("memory registry init failed: {err}"),
80 ));
81 }
82 }
83
84 let snapshots = MemoryRegistryRuntime::snapshot_ids_by_range();
86 if snapshots.is_empty() {
87 return Err(InternalError::new(
88 ErrorClass::Internal,
89 ErrorOrigin::Store,
90 "no memory ranges available for commit marker tests",
91 ));
92 }
93 let has_store_entry = snapshots.iter().any(|snapshot| {
94 snapshot.entries.iter().any(|(_, entry)| {
95 entry.label.ends_with("DataStore") || entry.label.ends_with("IndexStore")
96 })
97 });
98
99 if !has_store_entry {
100 let snapshot = snapshots.first().ok_or_else(|| {
101 InternalError::new(
102 ErrorClass::Internal,
103 ErrorOrigin::Store,
104 "no memory ranges available for commit marker tests",
105 )
106 })?;
107 let used_ids = snapshot
108 .entries
109 .iter()
110 .map(|(id, _)| *id)
111 .collect::<BTreeSet<_>>();
112 let dummy_id = (snapshot.range.start..=snapshot.range.end)
113 .find(|id| !used_ids.contains(id))
114 .ok_or_else(|| {
115 InternalError::new(
116 ErrorClass::Unsupported,
117 ErrorOrigin::Store,
118 format!(
119 "no free memory ids available for commit marker tests in range {}-{}",
120 snapshot.range.start, snapshot.range.end
121 ),
122 )
123 })?;
124
125 MemoryRegistry::register(dummy_id, &snapshot.owner, "commit_test::DataStore").map_err(
126 |err| {
127 InternalError::new(
128 ErrorClass::Internal,
129 ErrorOrigin::Store,
130 format!("commit test memory registration failed: {err}"),
131 )
132 },
133 )?;
134 }
135
136 with_commit_store(|_| Ok(()))
139}
140
141const COMMIT_LABEL: &str = "CommitMarker";
148const COMMIT_ID_BYTES: usize = 16;
149
150pub const MAX_COMMIT_BYTES: u32 = 16 * 1024 * 1024;
153
154#[derive(Clone, Debug, Deserialize, Serialize)]
162#[serde(deny_unknown_fields)]
163pub struct CommitRowOp {
164 pub entity_path: String,
165 pub key: Vec<u8>,
166 pub before: Option<Vec<u8>>,
167 pub after: Option<Vec<u8>>,
168}
169
170impl CommitRowOp {
171 #[must_use]
173 pub fn new(
174 entity_path: impl Into<String>,
175 key: Vec<u8>,
176 before: Option<Vec<u8>>,
177 after: Option<Vec<u8>>,
178 ) -> Self {
179 Self {
180 entity_path: entity_path.into(),
181 key,
182 before,
183 after,
184 }
185 }
186}
187
188#[derive(Clone, Debug, Deserialize, Serialize)]
195#[serde(deny_unknown_fields)]
196pub struct CommitIndexOp {
197 pub store: String,
198 pub key: Vec<u8>,
199 pub value: Option<Vec<u8>>,
200}
201
202#[derive(Clone, Debug, Deserialize, Serialize)]
211#[serde(deny_unknown_fields)]
212pub struct CommitMarker {
213 pub id: [u8; COMMIT_ID_BYTES],
214 pub row_ops: Vec<CommitRowOp>,
215}
216
217impl CommitMarker {
218 pub fn new(row_ops: Vec<CommitRowOp>) -> Result<Self, InternalError> {
220 let id = Ulid::try_generate()
221 .map_err(|err| {
222 InternalError::new(
223 ErrorClass::Internal,
224 ErrorOrigin::Store,
225 format!("commit id generation failed: {err}"),
226 )
227 })?
228 .to_bytes();
229
230 Ok(Self { id, row_ops })
231 }
232}
233
234pub fn validate_commit_marker_shape(marker: &CommitMarker) -> Result<(), InternalError> {
243 for row_op in &marker.row_ops {
245 if row_op.before.is_none() && row_op.after.is_none() {
246 return Err(InternalError::new(
247 ErrorClass::Corruption,
248 ErrorOrigin::Store,
249 "commit marker corrupted: row op has neither before nor after payload",
250 ));
251 }
252 }
253
254 Ok(())
255}
256
257#[derive(Clone)]
264pub struct PreparedIndexMutation {
265 pub store: &'static LocalKey<RefCell<crate::db::index::IndexStore>>,
266 pub key: RawIndexKey,
267 pub value: Option<RawIndexEntry>,
268}
269
270#[derive(Clone)]
277pub struct PreparedRowCommitOp {
278 pub index_ops: Vec<PreparedIndexMutation>,
279 pub data_store: &'static LocalKey<RefCell<DataStore>>,
280 pub data_key: RawDataKey,
281 pub data_value: Option<RawRow>,
282 pub index_remove_count: usize,
283 pub index_insert_count: usize,
284}
285
286impl PreparedRowCommitOp {
287 pub fn apply(self) {
289 for index_op in self.index_ops {
290 index_op.store.with_borrow_mut(|store| {
291 if let Some(value) = index_op.value {
292 store.insert(index_op.key, value);
293 } else {
294 store.remove(&index_op.key);
295 }
296 });
297 }
298
299 self.data_store.with_borrow_mut(|store| {
300 if let Some(value) = self.data_value {
301 store.insert(self.data_key, value);
302 } else {
303 store.remove(&self.data_key);
304 }
305 });
306 }
307}
308
309#[must_use]
313pub fn snapshot_row_rollback(op: &PreparedRowCommitOp) -> PreparedRowCommitOp {
314 let mut index_ops = Vec::with_capacity(op.index_ops.len());
315 for index_op in &op.index_ops {
316 let existing = index_op.store.with_borrow(|store| store.get(&index_op.key));
317 index_ops.push(PreparedIndexMutation {
318 store: index_op.store,
319 key: index_op.key,
320 value: existing,
321 });
322 }
323
324 let data_value = op.data_store.with_borrow(|store| store.get(&op.data_key));
325
326 PreparedRowCommitOp {
327 index_ops,
328 data_store: op.data_store,
329 data_key: op.data_key,
330 data_value,
331 index_remove_count: 0,
332 index_insert_count: 0,
333 }
334}
335
336#[expect(clippy::too_many_lines)]
341pub fn prepare_row_commit_for_entity<E: EntityKind + EntityValue>(
342 db: &Db<E::Canister>,
343 op: &CommitRowOp,
344) -> Result<PreparedRowCommitOp, InternalError> {
345 if op.entity_path != E::PATH {
346 return Err(InternalError::new(
347 ErrorClass::Corruption,
348 ErrorOrigin::Store,
349 format!(
350 "commit marker entity path mismatch: expected '{}', found '{}'",
351 E::PATH,
352 op.entity_path
353 ),
354 ));
355 }
356
357 let raw_key = decode_data_key(&op.key)?;
358 let data_key = DataKey::try_from_raw(&raw_key).map_err(|err| {
359 InternalError::new(
360 ErrorClass::Corruption,
361 ErrorOrigin::Store,
362 format!("commit marker data key corrupted: {err}"),
363 )
364 })?;
365 data_key.try_key::<E>()?;
366
367 let decode_entity = |bytes: &[u8], label: &str| -> Result<(RawRow, E), InternalError> {
368 let row = RawRow::try_new(bytes.to_vec())?;
369 let entity = row.try_decode::<E>().map_err(|err| {
370 InternalError::new(
371 ErrorClass::Corruption,
372 ErrorOrigin::Serialize,
373 format!("commit marker {label} row decode failed: {err}"),
374 )
375 })?;
376 let expected = data_key.try_key::<E>()?;
377 let actual = entity.id().key();
378 if expected != actual {
379 return Err(InternalError::new(
380 ErrorClass::Corruption,
381 ErrorOrigin::Store,
382 format!("commit marker row key mismatch: expected {expected:?}, found {actual:?}"),
383 ));
384 }
385
386 Ok((row, entity))
387 };
388
389 let old_pair = op
390 .before
391 .as_ref()
392 .map(|bytes| decode_entity(bytes, "before"))
393 .transpose()?;
394 let new_pair = op
395 .after
396 .as_ref()
397 .map(|bytes| decode_entity(bytes, "after"))
398 .transpose()?;
399
400 if old_pair.is_none() && new_pair.is_none() {
401 return Err(InternalError::new(
402 ErrorClass::Corruption,
403 ErrorOrigin::Store,
404 "commit marker row op is a no-op (before/after both missing)",
405 ));
406 }
407
408 let index_plan = plan_index_mutation_for_entity::<E>(
409 db,
410 old_pair.as_ref().map(|(_, entity)| entity),
411 new_pair.as_ref().map(|(_, entity)| entity),
412 )?;
413 let mut index_remove_count = 0usize;
414 let mut index_insert_count = 0usize;
415 for index in E::INDEXES {
416 let old_key = old_pair
417 .as_ref()
418 .map(|(_, old_entity)| IndexKey::new(old_entity, index))
419 .transpose()?
420 .flatten()
421 .map(|key| key.to_raw());
422 let new_key = new_pair
423 .as_ref()
424 .map(|(_, new_entity)| IndexKey::new(new_entity, index))
425 .transpose()?
426 .flatten()
427 .map(|key| key.to_raw());
428
429 if old_key != new_key {
430 if old_key.is_some() {
431 index_remove_count = index_remove_count.saturating_add(1);
432 }
433 if new_key.is_some() {
434 index_insert_count = index_insert_count.saturating_add(1);
435 }
436 }
437 }
438 let mut index_stores = BTreeMap::new();
439 for apply in &index_plan.apply {
440 index_stores.insert(apply.index.store, apply.store);
441 }
442
443 let mut index_ops = Vec::with_capacity(index_plan.commit_ops.len());
444 for index_op in index_plan.commit_ops {
445 let store = index_stores
446 .get(index_op.store.as_str())
447 .copied()
448 .ok_or_else(|| {
449 InternalError::new(
450 ErrorClass::Corruption,
451 ErrorOrigin::Index,
452 format!(
453 "missing index store '{}' for entity '{}'",
454 index_op.store,
455 E::PATH
456 ),
457 )
458 })?;
459 let key = decode_index_key(&index_op.key)?;
460 let value = index_op
461 .value
462 .as_ref()
463 .map(|bytes| decode_index_entry(bytes))
464 .transpose()?;
465 index_ops.push(PreparedIndexMutation { store, key, value });
466 }
467
468 let data_store = db.with_store_registry(|reg| reg.try_get_store(E::Store::PATH))?;
469 let data_value = new_pair.map(|(row, _)| row);
470
471 Ok(PreparedRowCommitOp {
472 index_ops,
473 data_store: data_store.data_store(),
474 data_key: raw_key,
475 data_value,
476 index_remove_count,
477 index_insert_count,
478 })
479}
480
481#[derive(Clone, Debug)]
489pub struct CommitGuard {
490 pub marker: CommitMarker,
491}
492
493impl CommitGuard {
494 fn clear(self) {
496 let _ = self;
497 with_commit_store_infallible(CommitStore::clear_infallible);
498 }
499}
500
501pub fn begin_commit(marker: CommitMarker) -> Result<CommitGuard, InternalError> {
503 with_commit_store(|store| {
504 if store.load()?.is_some() {
505 return Err(InternalError::new(
506 ErrorClass::InvariantViolation,
507 ErrorOrigin::Store,
508 "commit marker already present before begin",
509 ));
510 }
511 store.set(&marker)?;
512
513 Ok(CommitGuard { marker })
514 })
515}
516
517pub fn finish_commit(
523 mut guard: CommitGuard,
524 apply: impl FnOnce(&mut CommitGuard) -> Result<(), InternalError>,
525) -> Result<(), InternalError> {
526 let result = apply(&mut guard);
532 let commit_id = guard.marker.id;
533 guard.clear();
534 assert!(
536 with_commit_store_infallible(|store| store.is_empty()),
537 "commit marker must be cleared after finish_commit (commit_id={commit_id:?})"
538 );
539 result
540}