1use crate::{
21 index::{Cursor, Unordered as Index},
22 journal::contiguous::{Contiguous, MutableContiguous},
23 mmr::Location,
24 qmdb::operation::Operation,
25 DirtyAuthenticatedBitMap,
26};
27use commonware_cryptography::Digest;
28use commonware_utils::NZUsize;
29use core::num::NonZeroUsize;
30use futures::{pin_mut, StreamExt as _};
31use thiserror::Error;
32
33pub mod any;
34pub mod current;
35pub mod immutable;
36pub mod keyless;
37pub mod operation;
38pub mod store;
39pub mod sync;
40pub mod verify;
41pub use verify::{
42 create_multi_proof, create_proof, create_proof_store, create_proof_store_from_digests,
43 digests_required_for_proof, extract_pinned_nodes, verify_multi_proof, verify_proof,
44 verify_proof_and_extract_digests,
45};
46
47#[derive(Error, Debug)]
49pub enum Error {
50 #[error("mmr error: {0}")]
51 Mmr(#[from] crate::mmr::Error),
52
53 #[error("metadata error: {0}")]
54 Metadata(#[from] crate::metadata::Error),
55
56 #[error("journal error: {0}")]
57 Journal(#[from] crate::journal::Error),
58
59 #[error("runtime error: {0}")]
60 Runtime(#[from] commonware_runtime::Error),
61
62 #[error("operation pruned: {0}")]
63 OperationPruned(Location),
64
65 #[error("key not found")]
67 KeyNotFound,
68
69 #[error("key exists")]
71 KeyExists,
72
73 #[error("unexpected data at location: {0}")]
74 UnexpectedData(Location),
75
76 #[error("location out of bounds: {0} >= {1}")]
77 LocationOutOfBounds(Location, Location),
78
79 #[error("prune location {0} beyond minimum required location {1}")]
80 PruneBeyondMinRequired(Location, Location),
81}
82
83impl From<crate::journal::authenticated::Error> for Error {
84 fn from(e: crate::journal::authenticated::Error) -> Self {
85 match e {
86 crate::journal::authenticated::Error::Journal(j) => Self::Journal(j),
87 crate::journal::authenticated::Error::Mmr(m) => Self::Mmr(m),
88 }
89 }
90}
91
92const SNAPSHOT_READ_BUFFER_SIZE: NonZeroUsize = NZUsize!(1 << 16);
95
96pub(super) async fn build_snapshot_from_log<C, I, F>(
102 inactivity_floor_loc: Location,
103 log: &C,
104 snapshot: &mut I,
105 mut callback: F,
106) -> Result<usize, Error>
107where
108 C: Contiguous<Item: Operation>,
109 I: Index<Value = Location>,
110 F: FnMut(bool, Option<Location>),
111{
112 let stream = log
113 .replay(*inactivity_floor_loc, SNAPSHOT_READ_BUFFER_SIZE)
114 .await?;
115 pin_mut!(stream);
116 let last_commit_loc = log.size().saturating_sub(1);
117 let mut active_keys: usize = 0;
118 while let Some(result) = stream.next().await {
119 let (loc, op) = result?;
120 if let Some(key) = op.key() {
121 if op.is_delete() {
122 let old_loc = delete_key(snapshot, log, key).await?;
123 callback(false, old_loc);
124 if old_loc.is_some() {
125 active_keys -= 1;
126 }
127 } else if op.is_update() {
128 let new_loc = Location::new_unchecked(loc);
129 let old_loc = update_key(snapshot, log, key, new_loc).await?;
130 callback(true, old_loc);
131 if old_loc.is_none() {
132 active_keys += 1;
133 }
134 }
135 } else if op.has_floor().is_some() {
136 callback(loc == last_commit_loc, None);
137 }
138 }
139
140 Ok(active_keys)
141}
142
143async fn delete_key<I, C>(
146 snapshot: &mut I,
147 log: &C,
148 key: &<C::Item as Operation>::Key,
149) -> Result<Option<Location>, Error>
150where
151 I: Index<Value = Location>,
152 C: Contiguous<Item: Operation>,
153{
154 let Some(mut cursor) = snapshot.get_mut(key) else {
156 return Ok(None);
157 };
158
159 let Some(loc) = find_update_op(log, &mut cursor, key).await? else {
161 return Ok(None);
162 };
163 cursor.delete();
164
165 Ok(Some(loc))
166}
167
168async fn update_key<I, C>(
171 snapshot: &mut I,
172 log: &C,
173 key: &<C::Item as Operation>::Key,
174 new_loc: Location,
175) -> Result<Option<Location>, Error>
176where
177 I: Index<Value = Location>,
178 C: Contiguous<Item: Operation>,
179{
180 let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
183 return Ok(None);
184 };
185
186 if let Some(loc) = find_update_op(log, &mut cursor, key).await? {
188 assert!(new_loc > loc);
189 cursor.update(new_loc);
190 return Ok(Some(loc));
191 }
192
193 cursor.insert(new_loc);
195
196 Ok(None)
197}
198
199async fn create_key<I, C>(
202 snapshot: &mut I,
203 log: &C,
204 key: &<C::Item as Operation>::Key,
205 new_loc: Location,
206) -> Result<bool, Error>
207where
208 I: Index<Value = Location>,
209 C: Contiguous<Item: Operation>,
210{
211 let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
214 return Ok(true);
215 };
216
217 if find_update_op(log, &mut cursor, key).await?.is_some() {
219 return Ok(false);
220 }
221
222 cursor.insert(new_loc);
224
225 Ok(true)
226}
227
228async fn find_update_op<C>(
235 log: &C,
236 cursor: &mut impl Cursor<Value = Location>,
237 key: &<C::Item as Operation>::Key,
238) -> Result<Option<Location>, Error>
239where
240 C: Contiguous<Item: Operation>,
241{
242 while let Some(&loc) = cursor.next() {
243 let op = log.read(*loc).await?;
244 let k = op.key().expect("operation without key");
245 if *k == *key {
246 return Ok(Some(loc));
247 }
248 }
249
250 Ok(None)
251}
252
253fn update_known_loc<I: Index<Value = Location>>(
260 snapshot: &mut I,
261 key: &[u8],
262 old_loc: Location,
263 new_loc: Location,
264) {
265 let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
266 assert!(
267 cursor.find(|&loc| *loc == old_loc),
268 "known key with given old_loc should have been found"
269 );
270 cursor.update(new_loc);
271}
272
273fn delete_known_loc<I: Index<Value = Location>>(snapshot: &mut I, key: &[u8], old_loc: Location) {
280 let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
281 assert!(
282 cursor.find(|&loc| *loc == old_loc),
283 "known key with given old_loc should have been found"
284 );
285 cursor.delete();
286}
287
288pub(crate) struct FloorHelper<'a, I: Index<Value = Location>, C: MutableContiguous<Item: Operation>>
290{
291 pub snapshot: &'a mut I,
292 pub log: &'a mut C,
293}
294
295impl<I, C> FloorHelper<'_, I, C>
296where
297 I: Index<Value = Location>,
298 C: MutableContiguous<Item: Operation>,
299{
300 async fn move_op_if_active(&mut self, op: C::Item, old_loc: Location) -> Result<bool, Error> {
304 let Some(key) = op.key() else {
305 return Ok(false); };
307
308 let Some(mut cursor) = self.snapshot.get_mut(key) else {
310 return Ok(false);
311 };
312 if !cursor.find(|&loc| loc == old_loc) {
313 return Ok(false);
314 }
315
316 cursor.update(Location::new_unchecked(self.log.size()));
318 drop(cursor);
319
320 self.log.append(op).await?;
322
323 Ok(true)
324 }
325
326 async fn raise_floor(&mut self, mut inactivity_floor_loc: Location) -> Result<Location, Error>
338 where
339 I: Index<Value = Location>,
340 {
341 let tip_loc = Location::new_unchecked(self.log.size());
342 loop {
343 assert!(
344 *inactivity_floor_loc < tip_loc,
345 "no active operations above the inactivity floor"
346 );
347 let old_loc = inactivity_floor_loc;
348 inactivity_floor_loc += 1;
349 let op = self.log.read(*old_loc).await?;
350 if self.move_op_if_active(op, old_loc).await? {
351 return Ok(inactivity_floor_loc);
352 }
353 }
354 }
355
356 pub(crate) async fn raise_floor_with_bitmap<D: Digest, const N: usize>(
364 &mut self,
365 status: &mut DirtyAuthenticatedBitMap<D, N>,
366 mut inactivity_floor_loc: Location,
367 ) -> Result<Location, Error>
368 where
369 I: Index<Value = Location>,
370 {
371 while !status.get_bit(*inactivity_floor_loc) {
373 inactivity_floor_loc += 1;
374 }
375
376 let op = self.log.read(*inactivity_floor_loc).await?;
378 assert!(
379 self.move_op_if_active(op, inactivity_floor_loc).await?,
380 "op should be active based on status bitmap"
381 );
382 status.set_bit(*inactivity_floor_loc, false);
383 status.push(true);
384
385 Ok(inactivity_floor_loc + 1)
386 }
387}