1use super::operation::{update::Update, Operation};
6use crate::{
7 index::Unordered as UnorderedIndex,
8 journal::{
9 authenticated,
10 contiguous::{Contiguous, Mutable, Reader},
11 Error as JournalError,
12 },
13 merkle::{Family, Location, Proof},
14 qmdb::{
15 build_snapshot_from_log, delete_known_loc, operation::Operation as OperationTrait,
16 update_known_loc, Error,
17 },
18 Context, Persistable,
19};
20use commonware_codec::{Codec, CodecShared};
21use commonware_cryptography::Hasher;
22use core::num::NonZeroU64;
23use std::collections::HashMap;
24
/// Convenience alias for the authenticated journal backing a [`Db`]'s operation log.
pub(crate) type AuthenticatedLog<F, E, C, H> = authenticated::Journal<F, E, C, H>;
27
/// One reversal step recorded while scanning operations that a rewind will
/// discard. Steps are applied newest-first (the recording loop builds them in
/// log order and then reverses) so that each step's `old_loc` matches the
/// snapshot index's current entry for `key` at the moment it is applied.
enum SnapshotUndo<F: Family, K> {
    /// A discarded update overwrote an earlier operation for `key`: repoint
    /// the snapshot entry currently at `new_loc` back to `old_loc`.
    /// Field names follow the `update_known_loc(snapshot, key, old, new)`
    /// call site: `old_loc` is the discarded op, `new_loc` the restored one.
    Replace {
        key: K,
        old_loc: Location<F>,
        new_loc: Location<F>,
    },
    /// A discarded update introduced `key` with no prior active operation:
    /// remove the snapshot entry at `old_loc` entirely.
    Remove {
        key: K,
        old_loc: Location<F>,
    },
    /// A discarded delete removed `key`: re-insert it into the snapshot,
    /// pointing at its prior location `new_loc`.
    Insert {
        key: K,
        new_loc: Location<F>,
    },
}
44
/// A keyed database backed by an authenticated, append-only operation log,
/// with an in-memory snapshot index mapping keys to log locations.
pub struct Db<
    F: Family,
    E: Context,
    C: Contiguous<Item: CodecShared>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    U: Send + Sync,
> {
    // The authenticated journal of operations: the durable source of truth.
    pub(crate) log: AuthenticatedLog<F, E, C, H>,

    // Floor below which operations are considered inactive; `prune` refuses
    // to prune past this location.
    pub(crate) inactivity_floor_loc: Location<F>,

    // Location of the most recent commit operation (expected to be a
    // `CommitFloor` — see `get_metadata`).
    pub(crate) last_commit_loc: Location<F>,

    // In-memory index from key to candidate update location(s) in the log.
    // Lookups may yield multiple candidates; `get` filters by exact key match.
    pub(crate) snapshot: I,

    // Count of currently active keys (maintained by rewind/init; drives
    // `is_empty`).
    pub(crate) active_keys: usize,

    // Ties the update type `U` into the struct without storing a value.
    pub(crate) _update: core::marker::PhantomData<U>,
}
89
90impl<F, E, U, C, I, H> Db<F, E, C, I, H, U>
92where
93 F: Family,
94 E: Context,
95 U: Update,
96 C: Contiguous<Item = Operation<F, U>>,
97 I: UnorderedIndex<Value = Location<F>>,
98 H: Hasher,
99 Operation<F, U>: Codec,
100{
101 pub const fn inactivity_floor_loc(&self) -> Location<F> {
104 self.inactivity_floor_loc
105 }
106
107 pub const fn is_empty(&self) -> bool {
109 self.active_keys == 0
110 }
111
112 pub async fn get_metadata(&self) -> Result<Option<U::Value>, crate::qmdb::Error<F>> {
114 match self.log.reader().await.read(*self.last_commit_loc).await? {
115 Operation::CommitFloor(metadata, _) => Ok(metadata),
116 _ => unreachable!("last commit is not a CommitFloor operation"),
117 }
118 }
119
120 pub fn root(&self) -> H::Digest {
121 self.log.root()
122 }
123
124 pub async fn get(&self, key: &U::Key) -> Result<Option<U::Value>, crate::qmdb::Error<F>> {
126 let locs: Vec<Location<F>> = self.snapshot.get(key).copied().collect();
128 let reader = self.log.reader().await;
129 for loc in locs {
130 let op = reader.read(*loc).await?;
131 let Operation::Update(data) = op else {
132 panic!("location does not reference update operation. loc={loc}");
133 };
134 if data.key() == key {
135 return Ok(Some(data.value().clone()));
136 }
137 }
138 Ok(None)
139 }
140
141 pub async fn bounds(&self) -> std::ops::Range<Location<F>> {
144 let bounds = self.log.reader().await.bounds();
145 Location::new(bounds.start)..Location::new(bounds.end)
146 }
147
148 pub async fn pinned_nodes_at(
150 &self,
151 loc: Location<F>,
152 ) -> Result<Vec<H::Digest>, crate::qmdb::Error<F>> {
153 if !loc.is_valid() {
154 return Err(crate::merkle::Error::LocationOverflow(loc).into());
155 }
156 let futs: Vec<_> = F::nodes_to_pin(loc)
157 .map(|p| async move {
158 self.log
159 .merkle
160 .get_node(p)
161 .await?
162 .ok_or(crate::merkle::Error::ElementPruned(p).into())
163 })
164 .collect();
165 futures::future::try_join_all(futs).await
166 }
167}
168
impl<F, E, U, C, I, H> Db<F, E, C, I, H, U>
where
    F: Family,
    E: Context,
    U: Update,
    C: Mutable<Item = Operation<F, U>>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    Operation<F, U>: Codec,
{
    /// Prunes the log up to (but not including) `prune_loc`.
    ///
    /// # Errors
    /// Returns `PruneBeyondMinRequired` if `prune_loc` exceeds the
    /// inactivity floor — pruning there could discard active operations.
    pub async fn prune(&mut self, prune_loc: Location<F>) -> Result<(), crate::qmdb::Error<F>> {
        if prune_loc > self.inactivity_floor_loc {
            return Err(crate::qmdb::Error::PruneBeyondMinRequired(
                prune_loc,
                self.inactivity_floor_loc,
            ));
        }

        self.log.prune(prune_loc).await?;

        Ok(())
    }

    /// Generates a proof for up to `max_ops` operations starting at
    /// `start_loc`, computed against the log as it was at `historical_size`.
    pub async fn historical_proof(
        &self,
        historical_size: Location<F>,
        start_loc: Location<F>,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<F, H::Digest>, Vec<Operation<F, U>>), crate::qmdb::Error<F>> {
        self.log
            .historical_proof(historical_size, start_loc, max_ops)
            .await
            .map_err(Into::into)
    }

    /// Convenience wrapper: a historical proof against the log's current size.
    pub async fn proof(
        &self,
        loc: Location<F>,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<F, H::Digest>, Vec<Operation<F, U>>), crate::qmdb::Error<F>> {
        self.historical_proof(self.log.size().await, loc, max_ops)
            .await
    }

    /// Rewinds the database so the log contains exactly `size` operations,
    /// reverting the snapshot index and counters to match.
    ///
    /// The operation at `size - 1` must be a commit (carry a floor).
    /// Returns the locations (below the rewind point) that become a key's
    /// active entry again as a result of the rewind.
    ///
    /// # Errors
    /// - `InvalidRewind` if `size` is zero or beyond the current size.
    /// - `ItemPruned` if `size - 1` or the target floor is already pruned.
    /// - `UnexpectedData` if the operation at `size - 1` is not a commit.
    pub async fn rewind(&mut self, size: Location<F>) -> Result<Vec<Location<F>>, Error<F>> {
        let rewind_size = *size;
        // Current logical size is one past the last commit location.
        let current_size = *self.last_commit_loc + 1;

        // Rewinding to the current size is a no-op.
        if rewind_size == current_size {
            return Ok(Vec::new());
        }
        if rewind_size == 0 || rewind_size > current_size {
            return Err(Error::Journal(JournalError::InvalidRewind(rewind_size)));
        }

        // Phase 1 (read-only): compute the undo list and key-count delta
        // before mutating anything, so a failed read leaves the db untouched.
        let (rewind_floor, undos, active_keys_delta) = {
            let reader = self.log.reader().await;
            let bounds = reader.bounds();
            let rewind_last_loc = Location::new(rewind_size - 1);
            if rewind_size <= bounds.start {
                return Err(Error::<F>::Journal(JournalError::ItemPruned(
                    *rewind_last_loc,
                )));
            }
            // The target's last operation must be a commit carrying the floor
            // that will become the new inactivity floor.
            let rewind_last_op = reader.read(*rewind_last_loc).await?;
            let Some(rewind_floor) = rewind_last_op.has_floor() else {
                return Err(Error::UnexpectedData(rewind_last_loc));
            };
            if *rewind_floor < bounds.start {
                return Err(Error::<F>::Journal(JournalError::ItemPruned(*rewind_floor)));
            }

            let mut undos = Vec::with_capacity((current_size - rewind_size) as usize);
            let mut active_keys_delta = 0isize;
            // Latest known operation per key within the scanned window:
            // Some(loc) = last op was an update at loc, None = last op deleted it.
            let mut prior_state_by_key: HashMap<U::Key, Option<Location<F>>> = HashMap::new();

            // Scan from the floor (not from rewind_size): any active key's
            // latest operation lies at or above the floor, so this window is
            // sufficient to reconstruct each key's pre-rewind state.
            for loc in *rewind_floor..current_size {
                let op = reader.read(loc).await?;
                let op_loc = Location::new(loc);
                match op {
                    // Commits carry no key state.
                    Operation::CommitFloor(_, _) => {}
                    Operation::Update(update) => {
                        let key = update.key().clone();
                        let previous_loc = prior_state_by_key.get(&key).copied().flatten();

                        // Only operations being discarded (>= rewind point)
                        // need undo entries; earlier ops just feed the map.
                        if loc >= rewind_size {
                            if let Some(previous_loc) = previous_loc {
                                undos.push(SnapshotUndo::Replace {
                                    key: key.clone(),
                                    old_loc: op_loc,
                                    new_loc: previous_loc,
                                });
                            } else {
                                // Update introduced the key: undoing it drops
                                // one active key.
                                active_keys_delta -= 1;
                                undos.push(SnapshotUndo::Remove {
                                    key: key.clone(),
                                    old_loc: op_loc,
                                });
                            }
                        }

                        prior_state_by_key.insert(key, Some(op_loc));
                    }
                    Operation::Delete(key) => {
                        let previous_loc = prior_state_by_key.get(&key).copied().flatten();

                        if loc >= rewind_size {
                            if let Some(previous_loc) = previous_loc {
                                // Undoing a delete resurrects the key.
                                active_keys_delta += 1;
                                undos.push(SnapshotUndo::Insert {
                                    key: key.clone(),
                                    new_loc: previous_loc,
                                });
                            }
                        }

                        prior_state_by_key.insert(key, None);
                    }
                }
            }

            // Apply undos newest-first: each undo's old_loc then matches the
            // snapshot's current entry for that key (e.g. a→b→c unwinds as
            // c→b then b→a).
            undos.reverse();

            (rewind_floor, undos, active_keys_delta)
        };

        // Phase 2: truncate the log, then repair the in-memory snapshot.
        self.log.rewind(rewind_size).await?;

        let mut restored_locs = Vec::new();
        for undo in undos {
            match undo {
                SnapshotUndo::Replace {
                    key,
                    old_loc,
                    new_loc,
                } => {
                    // Locations surviving the rewind become active again;
                    // report them to the caller.
                    if new_loc < rewind_size {
                        restored_locs.push(new_loc);
                    }
                    update_known_loc(&mut self.snapshot, &key, old_loc, new_loc);
                }
                SnapshotUndo::Remove { key, old_loc } => {
                    delete_known_loc(&mut self.snapshot, &key, old_loc)
                }
                SnapshotUndo::Insert { key, new_loc } => {
                    if new_loc < rewind_size {
                        restored_locs.push(new_loc);
                    }
                    self.snapshot.insert(&key, new_loc);
                }
            }
        }

        // Apply the signed delta; failure means the count and the log
        // disagree (corruption), not a normal rewind outcome.
        self.active_keys = self
            .active_keys
            .checked_add_signed(active_keys_delta)
            .ok_or(Error::DataCorrupted(
                "active_keys underflow while rewinding",
            ))?;
        self.last_commit_loc = Location::new(rewind_size - 1);
        self.inactivity_floor_loc = rewind_floor;

        Ok(restored_locs)
    }
}
369
impl<F, E, U, C, I, H> Db<F, E, C, I, H, U>
where
    F: Family,
    E: Context,
    U: Update,
    C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    Operation<F, U>: Codec,
{
    /// Builds a `Db` from an already-initialized authenticated log by
    /// replaying it into `index`.
    ///
    /// `callback` is forwarded to `build_snapshot_from_log`; when
    /// `known_inactivity_floor` is behind the log's actual floor, it is also
    /// invoked once per skipped location with `(false, None)` — presumably to
    /// report operations that became inactive; confirm against
    /// `build_snapshot_from_log`'s callback contract.
    ///
    /// # Panics
    /// Panics if the log is empty or its last operation is not a commit —
    /// callers are expected to pass a log ending in a `CommitFloor`.
    pub async fn init_from_log<Cb>(
        mut index: I,
        log: AuthenticatedLog<F, E, C, H>,
        known_inactivity_floor: Option<Location<F>>,
        mut callback: Cb,
    ) -> Result<Self, crate::qmdb::Error<F>>
    where
        Cb: FnMut(bool, Option<Location<F>>),
    {
        let (last_commit_loc, inactivity_floor_loc, active_keys) = {
            let reader = log.reader().await;
            // The last operation in the log must exist and be a commit.
            let last_commit_loc = reader
                .bounds()
                .end
                .checked_sub(1)
                .expect("commit should exist");
            let last_commit = reader.read(last_commit_loc).await?;
            let inactivity_floor_loc = last_commit.has_floor().expect("should be a commit");
            // Notify the callback about every location between the caller's
            // known floor and the actual floor recorded in the log.
            if let Some(known_inactivity_floor) = known_inactivity_floor {
                (*known_inactivity_floor..*inactivity_floor_loc)
                    .for_each(|_| callback(false, None));
            }

            // Replay active operations into the index; returns the number of
            // active keys discovered.
            let active_keys =
                build_snapshot_from_log(inactivity_floor_loc, &reader, &mut index, callback)
                    .await?;
            (
                Location::new(last_commit_loc),
                inactivity_floor_loc,
                active_keys,
            )
        };

        Ok(Self {
            log,
            inactivity_floor_loc,
            snapshot: index,
            last_commit_loc,
            active_keys,
            _update: core::marker::PhantomData,
        })
    }

    /// Flushes pending log state to durable storage.
    pub async fn sync(&self) -> Result<(), crate::qmdb::Error<F>> {
        self.log.sync().await.map_err(Into::into)
    }

    /// Commits the underlying log.
    pub async fn commit(&self) -> Result<(), crate::qmdb::Error<F>> {
        self.log.commit().await.map_err(Into::into)
    }

    /// Destroys the database, consuming it and removing the underlying log.
    pub async fn destroy(self) -> Result<(), crate::qmdb::Error<F>> {
        self.log.destroy().await.map_err(Into::into)
    }
}
448
449impl<F, E, U, C, I, H> Persistable for Db<F, E, C, I, H, U>
450where
451 F: Family,
452 E: Context,
453 U: Update,
454 C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
455 I: UnorderedIndex<Value = Location<F>>,
456 H: Hasher,
457 Operation<F, U>: Codec,
458{
459 type Error = crate::qmdb::Error<F>;
460
461 async fn commit(&self) -> Result<(), crate::qmdb::Error<F>> {
462 Self::commit(self).await
463 }
464
465 async fn sync(&self) -> Result<(), crate::qmdb::Error<F>> {
466 Self::sync(self).await
467 }
468
469 async fn destroy(self) -> Result<(), crate::qmdb::Error<F>> {
470 self.destroy().await
471 }
472}