1use crate::{
46 index::{Cursor, Unordered as Index},
47 journal::contiguous::{Mutable, Reader},
48 merkle::{Family, Location},
49 qmdb::operation::Operation,
50};
51use commonware_utils::NZUsize;
52use core::num::NonZeroUsize;
53use futures::{pin_mut, StreamExt as _};
54use thiserror::Error;
55
56pub mod any;
57pub mod current;
58pub mod immutable;
59pub mod keyless;
60pub mod operation;
61pub mod store;
62pub mod sync;
63pub mod verify;
64pub use verify::{
65 create_multi_proof, create_proof_store, verify_multi_proof, verify_proof,
66 verify_proof_and_extract_digests, verify_proof_and_pinned_nodes,
67};
68
69#[derive(Error, Debug)]
71pub enum Error<F: Family> {
72 #[error("data corrupted: {0}")]
73 DataCorrupted(&'static str),
74
75 #[error("merkle error: {0}")]
76 Merkle(#[from] crate::merkle::Error<F>),
77
78 #[error("metadata error: {0}")]
79 Metadata(#[from] crate::metadata::Error),
80
81 #[error("journal error: {0}")]
82 Journal(#[from] crate::journal::Error),
83
84 #[error("runtime error: {0}")]
85 Runtime(#[from] commonware_runtime::Error),
86
87 #[error("operation pruned: {0}")]
88 OperationPruned(Location<F>),
89
90 #[error("key not found")]
92 KeyNotFound,
93
94 #[error("key exists")]
96 KeyExists,
97
98 #[error("unexpected data at location: {0}")]
99 UnexpectedData(Location<F>),
100
101 #[error("location out of bounds: {0} >= {1}")]
102 LocationOutOfBounds(Location<F>, Location<F>),
103
104 #[error("prune location {0} beyond minimum required location {1}")]
105 PruneBeyondMinRequired(Location<F>, Location<F>),
106
107 #[error(
109 "stale batch: db has {db_size} ops, batch requires {batch_db_size} or {batch_base_size}"
110 )]
111 StaleBatch {
112 db_size: u64,
113 batch_db_size: u64,
114 batch_base_size: u64,
115 },
116}
117
118impl<F: Family> From<crate::journal::authenticated::Error<F>> for Error<F> {
119 fn from(e: crate::journal::authenticated::Error<F>) -> Self {
120 match e {
121 crate::journal::authenticated::Error::Journal(j) => Self::Journal(j),
122 crate::journal::authenticated::Error::Merkle(m) => Self::Merkle(m),
123 }
124 }
125}
126
127const SNAPSHOT_READ_BUFFER_SIZE: NonZeroUsize = NZUsize!(1 << 16);
130
131pub(super) async fn build_snapshot_from_log<F, C, I, Fn>(
137 inactivity_floor_loc: crate::merkle::Location<F>,
138 reader: &C,
139 snapshot: &mut I,
140 mut callback: Fn,
141) -> Result<usize, Error<F>>
142where
143 F: crate::merkle::Family,
144 C: Reader<Item: Operation<F>>,
145 I: Index<Value = crate::merkle::Location<F>>,
146 Fn: FnMut(bool, Option<crate::merkle::Location<F>>),
147{
148 let bounds = reader.bounds();
149 let stream = reader
150 .replay(SNAPSHOT_READ_BUFFER_SIZE, *inactivity_floor_loc)
151 .await?;
152 pin_mut!(stream);
153 let last_commit_loc = bounds.end.saturating_sub(1);
154 let mut active_keys: usize = 0;
155 while let Some(result) = stream.next().await {
156 let (loc, op) = result?;
157 if let Some(key) = op.key() {
158 if op.is_delete() {
159 let old_loc = delete_key(snapshot, reader, key).await?;
160 callback(false, old_loc);
161 if old_loc.is_some() {
162 active_keys -= 1;
163 }
164 } else if op.is_update() {
165 let new_loc = crate::merkle::Location::new(loc);
166 let old_loc = update_key(snapshot, reader, key, new_loc).await?;
167 callback(true, old_loc);
168 if old_loc.is_none() {
169 active_keys += 1;
170 }
171 }
172 } else if op.has_floor().is_some() {
173 callback(loc == last_commit_loc, None);
174 }
175 }
176
177 Ok(active_keys)
178}
179
180async fn delete_key<F, I, R>(
183 snapshot: &mut I,
184 reader: &R,
185 key: &<R::Item as Operation<F>>::Key,
186) -> Result<Option<Location<F>>, Error<F>>
187where
188 F: Family,
189 I: Index<Value = Location<F>>,
190 R: Reader,
191 R::Item: Operation<F>,
192{
193 let Some(mut cursor) = snapshot.get_mut(key) else {
195 return Ok(None);
196 };
197
198 let Some(loc) = find_update_op::<F, _>(reader, &mut cursor, key).await? else {
200 return Ok(None);
201 };
202 cursor.delete();
203
204 Ok(Some(loc))
205}
206
207async fn update_key<F, I, R>(
209 snapshot: &mut I,
210 reader: &R,
211 key: &<R::Item as Operation<F>>::Key,
212 new_loc: Location<F>,
213) -> Result<Option<Location<F>>, Error<F>>
214where
215 F: Family,
216 I: Index<Value = Location<F>>,
217 R: Reader,
218 R::Item: Operation<F>,
219{
220 let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
223 return Ok(None);
224 };
225
226 if let Some(loc) = find_update_op::<F, _>(reader, &mut cursor, key).await? {
228 assert!(new_loc > loc);
229 cursor.update(new_loc);
230 return Ok(Some(loc));
231 }
232
233 cursor.insert(new_loc);
235
236 Ok(None)
237}
238
239async fn find_update_op<F, R>(
246 reader: &R,
247 cursor: &mut impl Cursor<Value = Location<F>>,
248 key: &<R::Item as Operation<F>>::Key,
249) -> Result<Option<Location<F>>, Error<F>>
250where
251 F: Family,
252 R: Reader,
253 R::Item: Operation<F>,
254{
255 while let Some(&loc) = cursor.next() {
256 let op = reader.read(*loc).await?;
257 let k = op.key().expect("operation without key");
258 if *k == *key {
259 return Ok(Some(loc));
260 }
261 }
262
263 Ok(None)
264}
265
266fn update_known_loc<F: Family, I: Index<Value = Location<F>>>(
273 snapshot: &mut I,
274 key: &[u8],
275 old_loc: Location<F>,
276 new_loc: Location<F>,
277) {
278 let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
279 assert!(
280 cursor.find(|&loc| *loc == old_loc),
281 "known key with given old_loc should have been found"
282 );
283 cursor.update(new_loc);
284}
285
286fn delete_known_loc<F: Family, I: Index<Value = Location<F>>>(
293 snapshot: &mut I,
294 key: &[u8],
295 old_loc: Location<F>,
296) {
297 let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");
298 assert!(
299 cursor.find(|&loc| *loc == old_loc),
300 "known key with given old_loc should have been found"
301 );
302 cursor.delete();
303}
304
305pub(crate) struct FloorHelper<
307 'a,
308 F: Family,
309 I: Index<Value = Location<F>>,
310 C: Mutable<Item: Operation<F>>,
311> {
312 pub snapshot: &'a mut I,
313 pub log: &'a mut C,
314}
315
316impl<F, I, C> FloorHelper<'_, F, I, C>
317where
318 F: Family,
319 I: Index<Value = Location<F>>,
320 C: Mutable<Item: Operation<F>>,
321{
322 async fn move_op_if_active(
326 &mut self,
327 op: C::Item,
328 old_loc: Location<F>,
329 ) -> Result<bool, Error<F>> {
330 let Some(key) = op.key() else {
331 return Ok(false); };
333
334 {
336 let Some(mut cursor) = self.snapshot.get_mut(key) else {
337 return Ok(false);
338 };
339 if !cursor.find(|&loc| loc == old_loc) {
340 return Ok(false);
341 }
342
343 cursor.update(Location::<F>::new(self.log.size().await));
345 }
346
347 self.log.append(&op).await?;
349
350 Ok(true)
351 }
352
353 async fn raise_floor(
363 &mut self,
364 mut inactivity_floor_loc: Location<F>,
365 ) -> Result<Location<F>, Error<F>> {
366 let tip_loc: Location<F> = Location::new(self.log.size().await);
367 loop {
368 assert!(
369 *inactivity_floor_loc < tip_loc,
370 "no active operations above the inactivity floor"
371 );
372 let old_loc = inactivity_floor_loc;
373 inactivity_floor_loc += 1;
374 let op = {
375 let reader = self.log.reader().await;
376 reader.read(*old_loc).await?
377 };
378 if self.move_op_if_active(op, old_loc).await? {
379 return Ok(inactivity_floor_loc);
380 }
381 }
382 }
383}