1use crate::{
46 index::{Cursor, Unordered as Index},
47 journal::{
48 contiguous::{Mutable, Reader},
49 Error as JournalError,
50 },
51 merkle::{hasher::Standard as StandardHasher, Bagging, Family, Location},
52 qmdb::operation::Operation,
53};
54use commonware_cryptography::Hasher as CryptoHasher;
55use commonware_utils::NZUsize;
56use core::num::NonZeroUsize;
57use futures::{pin_mut, StreamExt as _};
58use thiserror::Error;
59
60pub mod any;
61pub mod batch_chain;
62pub(crate) mod bitmap;
63pub(crate) mod compact;
64#[cfg(test)]
65mod conformance;
66pub mod current;
67pub mod immutable;
68pub mod keyless;
69mod metrics;
70pub mod operation;
71pub mod store;
72pub mod sync;
73pub mod verify;
74
75pub use verify::{
76 create_multi_proof, create_proof_store, verify_multi_proof, verify_proof,
77 verify_proof_and_extract_digests, verify_proof_and_pinned_nodes,
78};
79
80pub(crate) const ROOT_BAGGING: Bagging = Bagging::BackwardFold;
82
83pub const fn hasher<H: CryptoHasher>() -> StandardHasher<H> {
85 StandardHasher::new(ROOT_BAGGING)
86}
87
88pub(crate) async fn find_inactivity_floor_at<F, R>(
101 reader: &R,
102 op_count: Location<F>,
103 floor_of: impl Fn(&R::Item) -> Option<Location<F>>,
104) -> Result<Location<F>, Error<F>>
105where
106 F: Family,
107 R: Reader,
108{
109 let Some(last_op) = op_count.checked_sub(1) else {
110 return Err(Error::HistoricalFloorPruned(op_count));
111 };
112 let last_op = *last_op;
113 let bounds = reader.bounds();
114 if last_op < bounds.start {
115 return Err(JournalError::ItemPruned(last_op).into());
116 }
117
118 let op = reader.read(last_op).await?;
119 let floor = floor_of(&op).ok_or(Error::HistoricalFloorPruned(op_count))?;
120 if floor > Location::new(last_op) {
121 return Err(Error::DataCorrupted(
122 "inactivity floor exceeds commit location",
123 ));
124 }
125 Ok(floor)
126}
127
128pub(crate) async fn inactive_peaks_at<F, R>(
130 reader: &R,
131 op_count: Location<F>,
132 floor_of: impl Fn(&R::Item) -> Option<Location<F>>,
133) -> Result<usize, Error<F>>
134where
135 F: Family,
136 R: Reader,
137{
138 if op_count == Location::new(0) {
139 return Ok(0);
140 }
141
142 let floor = find_inactivity_floor_at::<F, _>(reader, op_count, floor_of).await?;
143 Ok(F::inactive_peaks(F::location_to_position(op_count), floor))
144}
145
146#[derive(Error, Debug)]
148pub enum Error<F: Family> {
149 #[error("data corrupted: {0}")]
150 DataCorrupted(&'static str),
151
152 #[error("merkle error: {0}")]
153 Merkle(#[from] crate::merkle::Error<F>),
154
155 #[error("metadata error: {0}")]
156 Metadata(#[from] crate::metadata::Error),
157
158 #[error("journal error: {0}")]
159 Journal(#[from] crate::journal::Error),
160
161 #[error("runtime error: {0}")]
162 Runtime(#[from] commonware_runtime::Error),
163
164 #[error("operation pruned: {0}")]
165 OperationPruned(Location<F>),
166
167 #[error("key not found")]
169 KeyNotFound,
170
171 #[error("key exists")]
173 KeyExists,
174
175 #[error("unexpected data at location: {0}")]
176 UnexpectedData(Location<F>),
177
178 #[error("location out of bounds: {0} >= {1}")]
179 LocationOutOfBounds(Location<F>, Location<F>),
180
181 #[error("prune location {0} beyond minimum required location {1}")]
182 PruneBeyondMinRequired(Location<F>, Location<F>),
183
184 #[error(
186 "stale batch: db has {db_size} ops, batch requires {batch_db_size}, {batch_base_size}, or an ancestor boundary"
187 )]
188 StaleBatch {
189 db_size: u64,
190 batch_db_size: u64,
191 batch_base_size: u64,
192 },
193
194 #[error("floor regressed: batch floor {0} < current floor {1}")]
196 FloorRegressed(Location<F>, Location<F>),
197
198 #[error("floor beyond commit location: floor {0} > commit loc {1}")]
202 FloorBeyondSize(Location<F>, Location<F>),
203
204 #[error("historical floor pruned for size: {0}")]
213 HistoricalFloorPruned(Location<F>),
214}
215
216impl<F: Family> From<crate::journal::authenticated::Error<F>> for Error<F> {
217 fn from(e: crate::journal::authenticated::Error<F>) -> Self {
218 match e {
219 crate::journal::authenticated::Error::Journal(j) => Self::Journal(j),
220 crate::journal::authenticated::Error::Merkle(m) => Self::Merkle(m),
221 }
222 }
223}
224
225const SNAPSHOT_READ_BUFFER_SIZE: NonZeroUsize = NZUsize!(1 << 16);
228
229pub(super) async fn build_snapshot_from_log<F, C, I, Fn>(
235 inactivity_floor_loc: crate::merkle::Location<F>,
236 reader: &C,
237 snapshot: &mut I,
238 mut callback: Fn,
239) -> Result<usize, Error<F>>
240where
241 F: crate::merkle::Family,
242 C: Reader<Item: Operation<F>>,
243 I: Index<Value = crate::merkle::Location<F>>,
244 Fn: FnMut(bool, Option<crate::merkle::Location<F>>),
245{
246 let bounds = reader.bounds();
247 let stream = reader
248 .replay(SNAPSHOT_READ_BUFFER_SIZE, *inactivity_floor_loc)
249 .await?;
250 pin_mut!(stream);
251 let last_commit_loc = bounds.end.saturating_sub(1);
252 let mut active_keys: usize = 0;
253 while let Some(result) = stream.next().await {
254 let (loc, op) = result?;
255 if let Some(key) = op.key() {
256 if op.is_delete() {
257 let old_loc = delete_key(snapshot, reader, key).await?;
258 callback(false, old_loc);
259 if old_loc.is_some() {
260 active_keys -= 1;
261 }
262 } else if op.is_update() {
263 let new_loc = crate::merkle::Location::new(loc);
264 let old_loc = update_key(snapshot, reader, key, new_loc).await?;
265 callback(true, old_loc);
266 if old_loc.is_none() {
267 active_keys += 1;
268 }
269 }
270 } else if op.has_floor().is_some() {
271 callback(loc == last_commit_loc, None);
272 }
273 }
274
275 Ok(active_keys)
276}
277
278async fn delete_key<F, I, R>(
281 snapshot: &mut I,
282 reader: &R,
283 key: &<R::Item as Operation<F>>::Key,
284) -> Result<Option<Location<F>>, Error<F>>
285where
286 F: Family,
287 I: Index<Value = Location<F>>,
288 R: Reader,
289 R::Item: Operation<F>,
290{
291 let Some(mut cursor) = snapshot.get_mut(key) else {
293 return Ok(None);
294 };
295
296 let Some(loc) = find_update_op::<F, _>(reader, &mut cursor, key).await? else {
298 return Ok(None);
299 };
300 cursor.delete();
301
302 Ok(Some(loc))
303}
304
305async fn update_key<F, I, R>(
307 snapshot: &mut I,
308 reader: &R,
309 key: &<R::Item as Operation<F>>::Key,
310 new_loc: Location<F>,
311) -> Result<Option<Location<F>>, Error<F>>
312where
313 F: Family,
314 I: Index<Value = Location<F>>,
315 R: Reader,
316 R::Item: Operation<F>,
317{
318 let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
321 return Ok(None);
322 };
323
324 if let Some(loc) = find_update_op::<F, _>(reader, &mut cursor, key).await? {
326 assert!(new_loc > loc);
327 cursor.update(new_loc);
328 return Ok(Some(loc));
329 }
330
331 cursor.insert(new_loc);
333
334 Ok(None)
335}
336
337async fn find_update_op<F, R>(
344 reader: &R,
345 cursor: &mut impl Cursor<Value = Location<F>>,
346 key: &<R::Item as Operation<F>>::Key,
347) -> Result<Option<Location<F>>, Error<F>>
348where
349 F: Family,
350 R: Reader,
351 R::Item: Operation<F>,
352{
353 while let Some(&loc) = cursor.next() {
354 let op = reader.read(*loc).await?;
355 let k = op.key().expect("operation without key");
356 if *k == *key {
357 return Ok(Some(loc));
358 }
359 }
360
361 Ok(None)
362}
363
364fn update_known_loc<F: Family, I: Index<Value = Location<F>>>(
371 snapshot: &mut I,
372 key: &[u8],
373 old_loc: Location<F>,
374 new_loc: Location<F>,
375) {
376 let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
377 assert!(
378 cursor.find(|&loc| *loc == old_loc),
379 "known key with given old_loc should have been found"
380 );
381 cursor.update(new_loc);
382}
383
384fn delete_known_loc<F: Family, I: Index<Value = Location<F>>>(
391 snapshot: &mut I,
392 key: &[u8],
393 old_loc: Location<F>,
394) {
395 let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
396 assert!(
397 cursor.find(|&loc| *loc == old_loc),
398 "known key with given old_loc should have been found"
399 );
400 cursor.delete();
401}
402
403pub(crate) struct FloorHelper<
405 'a,
406 F: Family,
407 I: Index<Value = Location<F>>,
408 C: Mutable<Item: Operation<F>>,
409> {
410 pub snapshot: &'a mut I,
411 pub log: &'a mut C,
412}
413
414impl<F, I, C> FloorHelper<'_, F, I, C>
415where
416 F: Family,
417 I: Index<Value = Location<F>>,
418 C: Mutable<Item: Operation<F>>,
419{
420 async fn move_op_if_active(
424 &mut self,
425 op: C::Item,
426 old_loc: Location<F>,
427 ) -> Result<bool, Error<F>> {
428 let Some(key) = op.key() else {
429 return Ok(false); };
431
432 {
434 let Some(mut cursor) = self.snapshot.get_mut(key) else {
435 return Ok(false);
436 };
437 if !cursor.find(|&loc| loc == old_loc) {
438 return Ok(false);
439 }
440
441 cursor.update(Location::<F>::new(self.log.size().await));
443 }
444
445 self.log.append(&op).await?;
447
448 Ok(true)
449 }
450
451 async fn raise_floor(
461 &mut self,
462 mut inactivity_floor_loc: Location<F>,
463 ) -> Result<Location<F>, Error<F>> {
464 let tip_loc: Location<F> = Location::new(self.log.size().await);
465 loop {
466 assert!(
467 *inactivity_floor_loc < tip_loc,
468 "no active operations above the inactivity floor"
469 );
470 let old_loc = inactivity_floor_loc;
471 inactivity_floor_loc += 1;
472 let op = {
473 let reader = self.log.reader().await;
474 reader.read(*old_loc).await?
475 };
476 if self.move_op_if_active(op, old_loc).await? {
477 return Ok(inactivity_floor_loc);
478 }
479 }
480 }
481}