1use super::operation::{update::Update, Operation};
6use crate::{
7 index::Unordered as UnorderedIndex,
8 journal::{
9 authenticated,
10 contiguous::{Contiguous, Mutable, Reader},
11 Error as JournalError,
12 },
13 merkle::{Family, Location, Proof},
14 qmdb::{
15 bitmap::Shared,
16 build_snapshot_from_log, delete_known_loc,
17 metrics::{KeyReadMetrics, OperationMetrics, StateMetrics},
18 operation::Operation as OperationTrait,
19 update_known_loc, Error,
20 },
21 Context, Persistable,
22};
23use commonware_codec::{Codec, CodecShared};
24use commonware_cryptography::Hasher;
25use commonware_parallel::Strategy;
26use commonware_utils::bitmap;
27use core::num::NonZeroU64;
28use std::{collections::HashMap, sync::Arc};
29
30pub(crate) struct Metrics<E: Context> {
32 pub state: StateMetrics,
34 pub operations: OperationMetrics<E>,
36 pub reads: KeyReadMetrics<E>,
38}
39
40impl<E: Context> Metrics<E> {
41 pub fn new(context: E) -> Self {
43 let context = Arc::new(context);
44 Self {
45 state: StateMetrics::new(context.as_ref()),
46 operations: OperationMetrics::new(context.clone()),
47 reads: KeyReadMetrics::new(context),
48 }
49 }
50}
51
52pub(crate) type AuthenticatedLog<F, E, C, H, S> = authenticated::Journal<F, E, C, H, S>;
54
55enum SnapshotUndo<F: Family, K> {
57 Replace {
58 key: K,
59 old_loc: Location<F>,
60 new_loc: Location<F>,
61 },
62 Remove {
63 key: K,
64 old_loc: Location<F>,
65 },
66 Insert {
67 key: K,
68 new_loc: Location<F>,
69 },
70}
71
72pub struct Db<
82 F: Family,
83 E: Context,
84 C: Contiguous<Item: CodecShared>,
85 I: UnorderedIndex<Value = Location<F>>,
86 H: Hasher,
87 U: Send + Sync,
88 const N: usize,
89 S: Strategy,
90> {
91 pub(crate) log: AuthenticatedLog<F, E, C, H, S>,
99
100 pub(crate) root: H::Digest,
102
103 pub(crate) inactivity_floor_loc: Location<F>,
106
107 pub(crate) last_commit_loc: Location<F>,
109
110 pub(crate) snapshot: I,
117
118 pub(crate) active_keys: usize,
120
121 pub(crate) bitmap: Arc<Shared<N>>,
133
134 pub(crate) metrics: Metrics<E>,
136
137 pub(crate) _update: core::marker::PhantomData<U>,
139}
140
141impl<F, E, U, C, I, H, const N: usize, S> Db<F, E, C, I, H, U, N, S>
143where
144 F: Family,
145 E: Context,
146 U: Update,
147 C: Contiguous<Item = Operation<F, U>>,
148 I: UnorderedIndex<Value = Location<F>>,
149 H: Hasher,
150 S: Strategy,
151 Operation<F, U>: Codec,
152{
/// Returns the current inactivity floor location.
///
/// Only compiled for tests or when the `test-traits` feature is enabled.
#[cfg(any(test, feature = "test-traits"))]
pub(crate) const fn inactivity_floor_loc(&self) -> Location<F> {
    self.inactivity_floor_loc
}
159
160 pub const fn sync_boundary(&self) -> Location<F> {
163 self.inactivity_floor_loc
164 }
165
166 pub const fn is_empty(&self) -> bool {
168 self.active_keys == 0
169 }
170
171 pub async fn get_metadata(&self) -> Result<Option<U::Value>, crate::qmdb::Error<F>> {
173 match self.log.reader().await.read(*self.last_commit_loc).await? {
174 Operation::CommitFloor(metadata, _) => Ok(metadata),
175 _ => unreachable!("last commit is not a CommitFloor operation"),
176 }
177 }
178
179 pub const fn root(&self) -> H::Digest {
181 self.root
182 }
183
184 pub(crate) fn inactive_peaks(
186 &self,
187 leaves: Location<F>,
188 inactivity_floor: Location<F>,
189 ) -> usize {
190 F::inactive_peaks(F::location_to_position(leaves), inactivity_floor)
191 }
192
193 pub const fn strategy(&self) -> &S {
195 self.log.strategy()
196 }
197
198 pub async fn get(&self, key: &U::Key) -> Result<Option<U::Value>, crate::qmdb::Error<F>> {
200 let _timer = self.metrics.reads.get_timer();
201 self.metrics.reads.get_calls.inc();
202 self.metrics.reads.keys_requested.inc();
203 let locs: Vec<Location<F>> = self.snapshot.get(key).copied().collect();
205 let reader = self.log.reader().await;
206 let mut result = None;
207 for loc in locs {
208 let op = reader.read(*loc).await?;
209 let Operation::Update(data) = op else {
210 panic!("location does not reference update operation. loc={loc}");
211 };
212 if data.key() == key {
213 result = Some(data.value().clone());
214 break;
215 }
216 }
217
218 Ok(result)
219 }
220
221 pub async fn get_many(
225 &self,
226 keys: &[&U::Key],
227 ) -> Result<Vec<Option<U::Value>>, crate::qmdb::Error<F>> {
228 if keys.is_empty() {
229 return Ok(Vec::new());
230 }
231
232 let _timer = self.metrics.reads.get_many_timer();
233 self.metrics.reads.get_many_calls.inc();
234 self.metrics.reads.keys_requested.inc_by(keys.len() as u64);
235
236 let mut candidates: Vec<(usize, u64)> = Vec::with_capacity(keys.len());
239 let mut results: Vec<Option<U::Value>> = vec![None; keys.len()];
240
241 for (key_idx, key) in keys.iter().enumerate() {
242 for &loc in self.snapshot.get(key) {
243 candidates.push((key_idx, *loc));
244 }
245 }
246
247 if candidates.is_empty() {
248 return Ok(results);
249 }
250
251 candidates.sort_unstable_by_key(|&(_, pos)| pos);
253
254 let mut positions: Vec<u64> = Vec::with_capacity(candidates.len());
255 for &(_, pos) in &candidates {
256 if positions.last() != Some(&pos) {
257 positions.push(pos);
258 }
259 }
260
261 let reader = self.log.reader().await;
263 let ops = reader.read_many(&positions).await?;
264
265 for &(key_idx, pos) in &candidates {
267 if results[key_idx].is_some() {
268 continue;
269 }
270 let op_idx = positions
271 .binary_search(&pos)
272 .expect("position was deduped from candidates");
273 let Operation::Update(data) = &ops[op_idx] else {
274 panic!("location does not reference update operation. loc={pos}");
275 };
276 if data.key() == keys[key_idx] {
277 results[key_idx] = Some(data.value().clone());
278 }
279 }
280
281 Ok(results)
282 }
283
284 pub async fn bounds(&self) -> std::ops::Range<Location<F>> {
287 let bounds = self.log.reader().await.bounds();
288 Location::new(bounds.start)..Location::new(bounds.end)
289 }
290
291 pub(crate) async fn update_metrics(&self) {
293 let bounds = self.log.reader().await.bounds();
294 self.metrics.state.set(
295 bounds.end,
296 bounds.start,
297 *self.inactivity_floor_loc,
298 *self.last_commit_loc,
299 );
300 }
301
302 pub async fn pinned_nodes_at(
304 &self,
305 loc: Location<F>,
306 ) -> Result<Vec<H::Digest>, crate::qmdb::Error<F>> {
307 self.log
308 .merkle
309 .pinned_nodes_at(loc)
310 .await
311 .map_err(Into::into)
312 }
313}
314
315impl<F, E, U, C, I, H, const N: usize, S> Db<F, E, C, I, H, U, N, S>
317where
318 F: Family,
319 E: Context,
320 U: Update,
321 C: Mutable<Item = Operation<F, U>>,
322 I: UnorderedIndex<Value = Location<F>>,
323 H: Hasher,
324 S: Strategy,
325 Operation<F, U>: Codec,
326{
327 pub(crate) fn prune_bitmap(&mut self, prune_loc: Location<F>) {
330 self.bitmap.write().prune_to_bit(*prune_loc);
331 }
332
333 pub(crate) async fn prune_log(
344 &mut self,
345 prune_loc: Location<F>,
346 ) -> Result<Location<F>, crate::qmdb::Error<F>> {
347 if prune_loc > self.inactivity_floor_loc {
348 return Err(crate::qmdb::Error::PruneBeyondMinRequired(
349 prune_loc,
350 self.inactivity_floor_loc,
351 ));
352 }
353
354 Ok(self.log.prune(prune_loc).await?)
355 }
356
357 #[tracing::instrument(
360 name = "qmdb::any::Db::prune",
361 level = "info",
362 skip_all,
363 fields(
364 requested_loc = *prune_loc,
365 inactivity_floor = *self.inactivity_floor_loc,
366 ),
367 )]
368 pub async fn prune(&mut self, prune_loc: Location<F>) -> Result<(), crate::qmdb::Error<F>> {
369 let _timer = self.metrics.operations.prune_timer();
370 self.metrics.operations.prune_calls.inc();
371 let actual_pruned = self.prune_log(prune_loc).await?;
372 self.prune_bitmap(actual_pruned);
373 self.update_metrics().await;
374 Ok(())
375 }
376
377 #[allow(clippy::type_complexity)]
391 #[tracing::instrument(
392 name = "qmdb::any::Db::historical_proof",
393 level = "info",
394 skip_all,
395 fields(
396 historical_size = *historical_size,
397 start_loc = *start_loc,
398 max_ops = max_ops.get(),
399 ),
400 )]
401 pub async fn historical_proof(
402 &self,
403 historical_size: Location<F>,
404 start_loc: Location<F>,
405 max_ops: NonZeroU64,
406 ) -> Result<(Proof<F, H::Digest>, Vec<Operation<F, U>>), crate::qmdb::Error<F>> {
407 if historical_size > self.log.size().await {
408 return Err(crate::qmdb::Error::Merkle(
409 crate::merkle::Error::RangeOutOfBounds(historical_size),
410 ));
411 }
412
413 let inactivity_floor = {
414 let reader = self.log.reader().await;
415 crate::qmdb::find_inactivity_floor_at::<F, _>(&reader, historical_size, |op| {
416 op.has_floor()
417 })
418 .await?
419 };
420 let inactive_peaks = self.inactive_peaks(historical_size, inactivity_floor);
421 self.log
422 .historical_proof(historical_size, start_loc, max_ops, inactive_peaks)
423 .await
424 .map_err(Into::into)
425 }
426
427 pub async fn proof(
428 &self,
429 loc: Location<F>,
430 max_ops: NonZeroU64,
431 ) -> Result<(Proof<F, H::Digest>, Vec<Operation<F, U>>), crate::qmdb::Error<F>> {
432 self.historical_proof(self.log.size().await, loc, max_ops)
433 .await
434 }
435
436 #[tracing::instrument(
456 name = "qmdb::any::Db::rewind",
457 level = "info",
458 skip_all,
459 fields(
460 target_size = *size,
461 prev_size = *self.last_commit_loc + 1,
462 ),
463 )]
464 pub async fn rewind(&mut self, size: Location<F>) -> Result<(), Error<F>> {
465 let rewind_size = *size;
466 let current_size = *self.last_commit_loc + 1;
467
468 if rewind_size == current_size {
469 return Ok(());
470 }
471 if rewind_size == 0 || rewind_size > current_size {
472 return Err(Error::Journal(JournalError::InvalidRewind(rewind_size)));
473 }
474
475 let (rewind_floor, undos, active_keys_delta) = {
477 let reader = self.log.reader().await;
478 let bounds = reader.bounds();
479 let rewind_last_loc = Location::new(rewind_size - 1);
480 if rewind_size <= bounds.start {
481 return Err(Error::<F>::Journal(JournalError::ItemPruned(
482 *rewind_last_loc,
483 )));
484 }
485 let rewind_last_op = reader.read(*rewind_last_loc).await?;
486 let Some(rewind_floor) = rewind_last_op.has_floor() else {
487 return Err(Error::UnexpectedData(rewind_last_loc));
488 };
489 if *rewind_floor < bounds.start {
490 return Err(Error::<F>::Journal(JournalError::ItemPruned(*rewind_floor)));
491 }
492
493 let mut undos = Vec::with_capacity((current_size - rewind_size) as usize);
494 let mut active_keys_delta = 0isize;
495 let mut prior_state_by_key: HashMap<U::Key, Option<Location<F>>> = HashMap::new();
496
497 for loc in *rewind_floor..current_size {
499 let op = reader.read(loc).await?;
500 let op_loc = Location::new(loc);
501 match op {
502 Operation::CommitFloor(_, _) => {}
503 Operation::Update(update) => {
504 let key = update.key().clone();
505 let previous_loc = prior_state_by_key.get(&key).copied().flatten();
506
507 if loc >= rewind_size {
508 if let Some(previous_loc) = previous_loc {
509 undos.push(SnapshotUndo::Replace {
510 key: key.clone(),
511 old_loc: op_loc,
512 new_loc: previous_loc,
513 });
514 } else {
515 active_keys_delta -= 1;
516 undos.push(SnapshotUndo::Remove {
517 key: key.clone(),
518 old_loc: op_loc,
519 });
520 }
521 }
522
523 prior_state_by_key.insert(key, Some(op_loc));
524 }
525 Operation::Delete(key) => {
526 let previous_loc = prior_state_by_key.get(&key).copied().flatten();
527
528 if loc >= rewind_size {
529 if let Some(previous_loc) = previous_loc {
530 active_keys_delta += 1;
531 undos.push(SnapshotUndo::Insert {
532 key: key.clone(),
533 new_loc: previous_loc,
534 });
535 }
536 }
537
538 prior_state_by_key.insert(key, None);
539 }
540 }
541 }
542
543 undos.reverse();
545
546 (rewind_floor, undos, active_keys_delta)
547 };
548
549 self.log.rewind(rewind_size).await?;
553
554 {
560 let mut bitmap = self.bitmap.write();
561 assert!(
562 bitmap.pruned_bits() <= rewind_size,
563 "bitmap pruned boundary exceeded journal retained start",
564 );
565 bitmap.truncate(rewind_size);
566
567 for undo in undos {
568 match undo {
569 SnapshotUndo::Replace {
570 key,
571 old_loc,
572 new_loc,
573 } => {
574 if new_loc < rewind_size {
575 bitmap.set_bit(*new_loc, true);
576 }
577 update_known_loc(&mut self.snapshot, &key, old_loc, new_loc);
578 }
579 SnapshotUndo::Remove { key, old_loc } => {
580 delete_known_loc(&mut self.snapshot, &key, old_loc)
581 }
582 SnapshotUndo::Insert { key, new_loc } => {
583 if new_loc < rewind_size {
584 bitmap.set_bit(*new_loc, true);
585 }
586 self.snapshot.insert(&key, new_loc);
587 }
588 }
589 }
590
591 bitmap.set_bit(rewind_size - 1, true);
596 }
597
598 self.active_keys = self
599 .active_keys
600 .checked_add_signed(active_keys_delta)
601 .ok_or(Error::DataCorrupted(
602 "active_keys underflow while rewinding",
603 ))?;
604 self.last_commit_loc = Location::new(rewind_size - 1);
605 self.inactivity_floor_loc = rewind_floor;
606 self.root = self
607 .log
608 .root(self.inactive_peaks(Location::new(rewind_size), rewind_floor))?;
609 self.update_metrics().await;
610
611 Ok(())
612 }
613}
614
615impl<F, E, U, C, I, H, const N: usize, S> Db<F, E, C, I, H, U, N, S>
617where
618 F: Family,
619 E: Context,
620 U: Update,
621 C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
622 I: UnorderedIndex<Value = Location<F>>,
623 H: Hasher,
624 S: Strategy,
625 Operation<F, U>: Codec,
626{
627 pub(crate) async fn init_from_log(
636 mut index: I,
637 log: AuthenticatedLog<F, E, C, H, S>,
638 shared_bitmap: Option<Arc<Shared<N>>>,
639 metrics: Metrics<E>,
640 ) -> Result<Self, crate::qmdb::Error<F>> {
641 let (last_commit_loc, inactivity_floor_loc, active_keys, bitmap) = {
642 let reader = log.reader().await;
643 let bounds = reader.bounds();
644 let last_commit_loc = Location::new(
645 bounds
646 .end
647 .checked_sub(1)
648 .ok_or(Error::HistoricalFloorPruned(Location::new(bounds.end)))?,
649 );
650 let inactivity_floor_loc = crate::qmdb::find_inactivity_floor_at::<F, _>(
651 &reader,
652 Location::new(bounds.end),
653 |op| op.has_floor(),
654 )
655 .await?;
656
657 let bitmap = shared_bitmap.unwrap_or_else(|| {
661 let pruned_chunks =
662 (bounds.start / bitmap::Prunable::<N>::CHUNK_SIZE_BITS) as usize;
663 let bm = bitmap::Prunable::<N>::new_with_pruned_chunks(pruned_chunks)
664 .expect("pruned chunk count fits in u64 bits");
665 Arc::new(Shared::new(bm))
666 });
667
668 {
670 let mut guard = bitmap.write();
671 assert!(
674 guard.pruned_bits() <= *inactivity_floor_loc,
675 "shared_bitmap pruned_bits {} exceeds inactivity_floor_loc {}",
676 guard.pruned_bits(),
677 *inactivity_floor_loc,
678 );
679 guard.extend_to(*inactivity_floor_loc);
680 }
681
682 let active_keys = {
686 let bitmap = &bitmap;
687 build_snapshot_from_log(
688 inactivity_floor_loc,
689 &reader,
690 &mut index,
691 |is_active, old_loc| {
692 let mut guard = bitmap.write();
693 guard.push(is_active);
694 if let Some(loc) = old_loc {
695 guard.set_bit(*loc, false);
696 }
697 },
698 )
699 .await?
700 };
701
702 (last_commit_loc, inactivity_floor_loc, active_keys, bitmap)
708 };
709
710 if bitmap::Readable::<N>::len(bitmap.as_ref()) != log.size().await {
712 return Err(crate::qmdb::Error::DataCorrupted(
713 "bitmap length diverged from log size during init",
714 ));
715 }
716
717 let inactive_peaks = F::inactive_peaks(
718 F::location_to_position(log.merkle.leaves()),
719 inactivity_floor_loc,
720 );
721 let root = log.root(inactive_peaks)?;
722
723 let db = Self {
724 log,
725 root,
726 inactivity_floor_loc,
727 snapshot: index,
728 last_commit_loc,
729 active_keys,
730 bitmap,
731 metrics,
732 _update: core::marker::PhantomData,
733 };
734 db.update_metrics().await;
735 Ok(db)
736 }
737
738 #[tracing::instrument(
740 name = "qmdb::any::Db::sync",
741 level = "info",
742 skip_all,
743 fields(
744 db_size = *self.last_commit_loc + 1,
745 inactivity_floor = *self.inactivity_floor_loc,
746 active_keys = self.active_keys as u64,
747 ),
748 )]
749 pub async fn sync(&self) -> Result<(), crate::qmdb::Error<F>> {
750 let _timer = self.metrics.operations.sync_timer();
751 self.metrics.operations.sync_calls.inc();
752 self.log.sync().await?;
753 Ok(())
754 }
755
756 #[tracing::instrument(
759 name = "qmdb::any::Db::commit",
760 level = "info",
761 skip_all,
762 fields(
763 db_size = *self.last_commit_loc + 1,
764 inactivity_floor = *self.inactivity_floor_loc,
765 active_keys = self.active_keys as u64,
766 ),
767 )]
768 pub async fn commit(&self) -> Result<(), crate::qmdb::Error<F>> {
769 let _timer = self.metrics.operations.commit_timer();
770 self.metrics.operations.commit_calls.inc();
771 self.log.commit().await?;
772 Ok(())
773 }
774
775 pub async fn destroy(self) -> Result<(), crate::qmdb::Error<F>> {
777 self.log.destroy().await.map_err(Into::into)
778 }
779}
780
781impl<F, E, U, C, I, H, const N: usize, S> Persistable for Db<F, E, C, I, H, U, N, S>
782where
783 F: Family,
784 E: Context,
785 U: Update,
786 C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
787 I: UnorderedIndex<Value = Location<F>>,
788 H: Hasher,
789 S: Strategy,
790 Operation<F, U>: Codec,
791{
792 type Error = crate::qmdb::Error<F>;
793
794 async fn commit(&self) -> Result<(), crate::qmdb::Error<F>> {
795 Self::commit(self).await
796 }
797
798 async fn sync(&self) -> Result<(), crate::qmdb::Error<F>> {
799 Self::sync(self).await
800 }
801
802 async fn destroy(self) -> Result<(), crate::qmdb::Error<F>> {
803 self.destroy().await
804 }
805}