1#[cfg(any(test, feature = "test-traits"))]
2use crate::qmdb::any::traits::PersistableMutableLog;
3use crate::{
4 index::Ordered as Index,
5 journal::contiguous::{Contiguous, Reader},
6 merkle::{Family, Location},
7 qmdb::{
8 any::{db::Db, ValueEncoding},
9 operation::{Key, Operation as OperationTrait},
10 },
11 Context,
12};
13use commonware_codec::Codec;
14use commonware_cryptography::Hasher;
15use futures::{
16 future::try_join_all,
17 stream::{self, Stream},
18};
19use std::{
20 collections::{BTreeMap, BTreeSet},
21 ops::Bound,
22};
23
24pub mod fixed;
25pub mod variable;
26
27pub use crate::qmdb::any::operation::{update::Ordered as Update, Ordered as Operation};
28
/// Result of a span lookup: the log location of an update operation paired with the update
/// stored there, or `None` when no matching operation was found.
type LocatedKey<F, K, V> = Option<(Location<F>, Update<K, V>)>;
31
32impl<
33 F: Family,
34 E: Context,
35 K: Key,
36 V: ValueEncoding,
37 C: Contiguous<Item = Operation<F, K, V>>,
38 I: Index<Value = Location<F>>,
39 H: Hasher,
40 > Db<F, E, C, I, H, Update<K, V>>
41where
42 Operation<F, K, V>: Codec,
43{
44 async fn get_update_op(
45 reader: &impl Reader<Item = Operation<F, K, V>>,
46 loc: Location<F>,
47 ) -> Result<Update<K, V>, crate::qmdb::Error<F>> {
48 match reader.read(*loc).await? {
49 Operation::Update(key_data) => Ok(key_data),
50 _ => unreachable!("expected update operation at location {}", loc),
51 }
52 }
53
54 pub fn span_contains(span_start: &K, span_end: &K, key: &K) -> bool {
56 if span_start >= span_end {
57 if key >= span_start || key < span_end {
59 return true;
60 }
61 } else {
62 if key >= span_start && key < span_end {
64 return true;
65 }
66 }
67
68 false
69 }
70
71 async fn find_span(
73 &self,
74 locs: impl IntoIterator<Item = Location<F>>,
75 key: &K,
76 ) -> Result<LocatedKey<F, K, V>, crate::qmdb::Error<F>> {
77 let reader = self.log.reader().await;
78 for loc in locs {
79 let data = Self::get_update_op(&reader, loc).await?;
81 if Self::span_contains(&data.key, &data.next_key, key) {
82 return Ok(Some((loc, data)));
83 }
84 }
85
86 Ok(None)
87 }
88
89 pub async fn get_span(&self, key: &K) -> Result<LocatedKey<F, K, V>, crate::qmdb::Error<F>> {
92 if self.is_empty() {
93 return Ok(None);
94 }
95
96 let locs: Vec<Location<F>> = self.snapshot.get(key).copied().collect();
99 let span = self.find_span(locs, key).await?;
100 if let Some(span) = span {
101 return Ok(Some(span));
102 }
103
104 let Some((iter, _)) = self.snapshot.prev_translated_key(key) else {
105 return Ok(None);
107 };
108
109 let locs: Vec<Location<F>> = iter.copied().collect();
111 let span = self
112 .find_span(locs, key)
113 .await?
114 .expect("a span that includes any given key should always exist if db is non-empty");
115
116 Ok(Some(span))
117 }
118
119 pub async fn get_all(&self, key: &K) -> Result<Option<(V::Value, K)>, crate::qmdb::Error<F>> {
121 self.get_with_loc(key)
122 .await
123 .map(|res| res.map(|(data, _)| (data.value, data.next_key)))
124 }
125
126 pub(crate) async fn get_with_loc(
128 &self,
129 key: &K,
130 ) -> Result<Option<(Update<K, V>, Location<F>)>, crate::qmdb::Error<F>> {
131 let locs: Vec<Location<F>> = self.snapshot.get(key).copied().collect();
133 let reader = self.log.reader().await;
134 for loc in locs {
135 let op = reader.read(*loc).await?;
136 assert!(
137 op.is_update(),
138 "location does not reference update operation. loc={loc}"
139 );
140 if op.key().expect("update operation must have key") == key {
141 let Operation::Update(data) = op else {
142 unreachable!("expected update operation");
143 };
144 return Ok(Some((data, loc)));
145 }
146 }
147
148 Ok(None)
149 }
150
151 pub async fn stream_range<'a>(
154 &'a self,
155 start: K,
156 ) -> Result<
157 impl Stream<Item = Result<(K, V::Value), crate::qmdb::Error<F>>> + 'a,
158 crate::qmdb::Error<F>,
159 >
160 where
161 V: 'a,
162 {
163 let start_iter = self.snapshot.get(&start);
164 let mut init_pending = self.fetch_all_updates(start_iter).await?;
165 init_pending.retain(|x| x.key >= start);
166
167 Ok(stream::unfold(
168 (start, init_pending),
169 move |(driver_key, mut pending): (K, Vec<Update<K, V>>)| async move {
170 if !pending.is_empty() {
171 let item = pending.pop().expect("pending is not empty");
172 return Some((Ok((item.key, item.value)), (driver_key, pending)));
173 }
174
175 let Some((iter, wrapped)) = self.snapshot.next_translated_key(&driver_key) else {
176 return None; };
178 if wrapped {
179 return None; }
181
182 match self.fetch_all_updates(iter).await {
185 Ok(mut pending) => {
186 let item = pending.pop().expect("pending is not empty");
187 let key = item.key.clone();
188 Some((Ok((item.key, item.value)), (key, pending)))
189 }
190 Err(e) => Some((Err(e), (driver_key, pending))),
191 }
192 },
193 ))
194 }
195
196 async fn fetch_all_updates(
199 &self,
200 locs: impl IntoIterator<Item = &Location<F>>,
201 ) -> Result<Vec<Update<K, V>>, crate::qmdb::Error<F>> {
202 let reader = self.log.reader().await;
203 let futures = locs
204 .into_iter()
205 .map(|loc| Self::get_update_op(&reader, *loc));
206 let mut updates = try_join_all(futures).await?;
207 updates.sort_by(|a, b| b.key.cmp(&a.key));
208
209 Ok(updates)
210 }
211}
212
/// Returns a clone of the smallest element of `possible_next` that is strictly greater than
/// `key`, wrapping around to the smallest element overall when no such element exists.
///
/// # Panics
///
/// Panics if `possible_next` is empty.
pub(crate) fn find_next_key<K: Ord + Clone>(key: &K, possible_next: &BTreeSet<K>) -> K {
    possible_next
        .range((Bound::Excluded(key), Bound::Unbounded))
        .next()
        // Nothing after `key`: cycle back to the first element.
        .or_else(|| possible_next.first())
        .expect("possible_next should not be empty")
        .clone()
}
231
/// Returns the entry of `possible_previous` with the largest key strictly less than `key`,
/// wrapping around to the entry with the largest key overall when no such entry exists.
///
/// # Panics
///
/// Panics if `possible_previous` is empty.
pub(crate) fn find_prev_key<'a, K: Ord, V>(
    key: &K,
    possible_previous: &'a BTreeMap<K, V>,
) -> (&'a K, &'a V) {
    possible_previous
        .range((Bound::Unbounded, Bound::Excluded(key)))
        .next_back()
        // Nothing before `key`: cycle around to the last entry.
        .or_else(|| possible_previous.iter().next_back())
        .expect("possible_previous should not be empty")
}
253
// Compiled only for tests or when the `test-traits` feature is enabled.
// Invokes `impl_db_any!` for the ordered `Db` over `crate::merkle::mmr::Family`, binding
// `Key = K`, `Value = V::Value` and `Digest = H::Digest`.
// NOTE(review): the macro is defined elsewhere; presumably it generates the `DbAny` trait
// implementation for this type -- confirm against `crate::qmdb::any::traits`.
#[cfg(any(test, feature = "test-traits"))]
crate::qmdb::any::traits::impl_db_any! {
 [E, K, V, C, I, H] Db<crate::merkle::mmr::Family, E, C, I, H, Update<K, V>>
 where {
 E: Context,
 K: Key,
 V: ValueEncoding + 'static,
 C: PersistableMutableLog<Operation<crate::merkle::mmr::Family, K, V>>,
 I: Index<Value = crate::mmr::Location> + 'static,
 H: Hasher,
 Operation<crate::merkle::mmr::Family, K, V>: Codec,
 V::Value: Send + Sync,
 }
 Family = crate::merkle::mmr::Family, Key = K, Value = V::Value, Digest = H::Digest
}
269
// Compiled only for tests or when the `test-traits` feature is enabled.
// Invokes `impl_provable!` for the ordered `Db` over `crate::merkle::mmr::Family`, with the
// operation type bound to `Operation<crate::merkle::mmr::Family, K, V>`.
// NOTE(review): the macro is defined elsewhere; presumably it generates a proof-related trait
// implementation for this type -- confirm against `crate::qmdb::any::traits`.
#[cfg(any(test, feature = "test-traits"))]
crate::qmdb::any::traits::impl_provable! {
 [E, K, V, C, I, H] Db<crate::merkle::mmr::Family, E, C, I, H, Update<K, V>>
 where {
 E: Context,
 K: Key,
 V: ValueEncoding + 'static,
 C: PersistableMutableLog<Operation<crate::merkle::mmr::Family, K, V>>,
 I: Index<Value = crate::mmr::Location> + 'static,
 H: Hasher,
 Operation<crate::merkle::mmr::Family, K, V>: Codec,
 V::Value: Send + Sync,
 }
 Family = crate::merkle::mmr::Family, Operation = Operation<crate::merkle::mmr::Family, K, V>
}
285
// Compiled only for tests or when the `test-traits` feature is enabled.
// Invokes `impl_db_any!` for the ordered `Db` over `crate::merkle::mmb::Family`, binding
// `Key = K`, `Value = V::Value` and `Digest = H::Digest`. The index value type here is
// `crate::merkle::Location<crate::merkle::mmb::Family>`.
// NOTE(review): the macro is defined elsewhere; presumably it generates the `DbAny` trait
// implementation for this type -- confirm against `crate::qmdb::any::traits`.
#[cfg(any(test, feature = "test-traits"))]
crate::qmdb::any::traits::impl_db_any! {
 [E, K, V, C, I, H] Db<crate::merkle::mmb::Family, E, C, I, H, Update<K, V>>
 where {
 E: Context,
 K: Key,
 V: ValueEncoding + 'static,
 C: PersistableMutableLog<Operation<crate::merkle::mmb::Family, K, V>>,
 I: Index<Value = crate::merkle::Location<crate::merkle::mmb::Family>> + 'static,
 H: Hasher,
 Operation<crate::merkle::mmb::Family, K, V>: Codec,
 V::Value: Send + Sync,
 }
 Family = crate::merkle::mmb::Family, Key = K, Value = V::Value, Digest = H::Digest
}
301
// Compiled only for tests or when the `test-traits` feature is enabled.
// Invokes `impl_provable!` for the ordered `Db` over `crate::merkle::mmb::Family`, with the
// operation type bound to `Operation<crate::merkle::mmb::Family, K, V>`.
// NOTE(review): the macro is defined elsewhere; presumably it generates a proof-related trait
// implementation for this type -- confirm against `crate::qmdb::any::traits`.
#[cfg(any(test, feature = "test-traits"))]
crate::qmdb::any::traits::impl_provable! {
 [E, K, V, C, I, H] Db<crate::merkle::mmb::Family, E, C, I, H, Update<K, V>>
 where {
 E: Context,
 K: Key,
 V: ValueEncoding + 'static,
 C: PersistableMutableLog<Operation<crate::merkle::mmb::Family, K, V>>,
 I: Index<Value = crate::merkle::Location<crate::merkle::mmb::Family>> + 'static,
 H: Hasher,
 Operation<crate::merkle::mmb::Family, K, V>: Codec,
 V::Value: Send + Sync,
 }
 Family = crate::merkle::mmb::Family, Operation = Operation<crate::merkle::mmb::Family, K, V>
}
317
// Shared test scenarios, generic over any `DbAny` implementation keyed by `FixedBytes<4>` with
// `Digest` values. Each scenario takes the database to exercise; the first two also take a
// `reopen_db` closure that produces a new handle from a fresh context.
#[cfg(test)]
mod test {
 use super::*;
 use crate::{
 merkle::mmr,
 qmdb::any::traits::{DbAny, UnmerkleizedBatch as _},
 };
 use commonware_cryptography::{sha256::Digest, Sha256};
 use commonware_runtime::{deterministic::Context, Metrics};
 use commonware_utils::sequence::FixedBytes;
 use core::{future::Future, pin::Pin};

 /// Scenario for a database that never has a key write applied: checks metadata, pruning
 /// and root stability across reopens, then commits a run of empty batches.
 ///
 /// NOTE(review): assumes the caller passes a freshly initialized database -- the first
 /// assertion requires that no metadata is present.
 pub(crate) async fn test_ordered_any_db_empty<
 D: DbAny<mmr::Family, Key = FixedBytes<4>, Value = Digest, Digest = Digest>,
 >(
 context: Context,
 mut db: D,
 reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
 ) {
 // No metadata yet, and pruning to the inactivity floor must succeed.
 assert!(db.get_metadata().await.unwrap().is_none());
 assert!(matches!(
 db.prune(db.inactivity_floor_loc().await).await,
 Ok(())
 ));

 let d1 = FixedBytes::from([1u8; 4]);
 let d2 = Sha256::fill(2u8);
 let root = db.root();
 {
 // Build a batch containing a write but drop it without merkleizing or applying it.
 let _batch = db.new_batch().write(d1, Some(d2));
 }
 // The reopened database must report the root recorded before the discarded batch.
 let mut db = reopen_db(context.with_label("reopen1")).await;
 assert_eq!(db.root(), root);

 // Apply and commit a batch that carries only metadata.
 let metadata = Sha256::fill(3u8);
 let merkleized = db.new_batch().merkleize(&db, Some(metadata)).await.unwrap();
 let range = db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();
 assert_eq!(range.start, Location::new(1));
 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
 let root = db.root();
 assert!(matches!(
 db.prune(db.inactivity_floor_loc().await).await,
 Ok(())
 ));

 // Metadata and root must both survive a reopen.
 let mut db = reopen_db(context.with_label("reopen2")).await;
 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
 assert_eq!(db.root(), root);

 // Commit 99 empty batches (the range is `1..100`); only success of each step is checked.
 for _ in 1..100 {
 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
 let _ = db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();
 }
 // One more empty commit before tearing the database down.
 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
 let _ = db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();
 db.destroy().await.unwrap();
 }

 /// Scenario covering inserts, updates and deletes of two keys, interleaved with empty
 /// commits, two reopens (checking that op count and root persist) and a final prune.
 pub(crate) async fn test_ordered_any_db_basic<
 D: DbAny<mmr::Family, Key = FixedBytes<4>, Value = Digest, Digest = Digest>,
 >(
 context: Context,
 mut db: D,
 reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
 ) {
 let key1 = FixedBytes::from([1u8; 4]);
 let key2 = FixedBytes::from([2u8; 4]);
 let val1 = Sha256::fill(3u8);
 let val2 = Sha256::fill(4u8);

 // Neither key is present before anything is written.
 assert!(db.get(&key1).await.unwrap().is_none());
 assert!(db.get(&key2).await.unwrap().is_none());

 // Insert key1 -> val1; key2 must remain absent.
 assert!(db.get(&key1).await.unwrap().is_none());
 let merkleized = db
 .new_batch()
 .write(key1.clone(), Some(val1))
 .merkleize(&db, None)
 .await
 .unwrap();
 db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();
 assert_eq!(db.get(&key1).await.unwrap().unwrap(), val1);
 assert!(db.get(&key2).await.unwrap().is_none());

 // Insert key2 -> val2; both keys are now readable.
 assert!(db.get(&key2).await.unwrap().is_none());
 let merkleized = db
 .new_batch()
 .write(key2.clone(), Some(val2))
 .merkleize(&db, None)
 .await
 .unwrap();
 db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();
 assert_eq!(db.get(&key1).await.unwrap().unwrap(), val1);
 assert_eq!(db.get(&key2).await.unwrap().unwrap(), val2);

 // Writing `None` for key1 removes it while leaving key2 untouched.
 let merkleized = db
 .new_batch()
 .write(key1.clone(), None)
 .merkleize(&db, None)
 .await
 .unwrap();
 db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();
 assert!(db.get(&key1).await.unwrap().is_none());
 assert_eq!(db.get(&key2).await.unwrap().unwrap(), val2);

 // Re-create key1 with a new value.
 let new_val = Sha256::fill(5u8);
 let merkleized = db
 .new_batch()
 .write(key1.clone(), Some(new_val))
 .merkleize(&db, None)
 .await
 .unwrap();
 db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();
 assert_eq!(db.get(&key1).await.unwrap().unwrap(), new_val);

 // Overwrite key2 with the same new value.
 let merkleized = db
 .new_batch()
 .write(key2.clone(), Some(new_val))
 .merkleize(&db, None)
 .await
 .unwrap();
 db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();
 assert_eq!(db.get(&key2).await.unwrap().unwrap(), new_val);

 // An empty batch commit must not remove key1.
 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
 let _ = db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();

 assert!(db.get(&key1).await.unwrap().is_some());

 // Delete both keys, one batch each.
 assert!(db.get(&key1).await.unwrap().is_some());
 let merkleized = db
 .new_batch()
 .write(key1.clone(), None)
 .merkleize(&db, None)
 .await
 .unwrap();
 db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();
 assert!(db.get(&key2).await.unwrap().is_some());
 let merkleized = db
 .new_batch()
 .write(key2.clone(), None)
 .merkleize(&db, None)
 .await
 .unwrap();
 db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();
 assert!(db.get(&key1).await.unwrap().is_none());
 assert!(db.get(&key2).await.unwrap().is_none());

 // Another empty commit; key1 stays deleted.
 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
 let _ = db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();

 assert!(db.get(&key1).await.unwrap().is_none());

 // A key that was never written is reported as absent.
 let key3 = FixedBytes::from([6u8; 4]);
 assert!(db.get(&key3).await.unwrap().is_none());

 // Commit, record the op count and root, then verify both after a reopen.
 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
 let _ = db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();
 let op_count = db.bounds().await.end;
 let root = db.root();
 let mut db = reopen_db(context.with_label("reopen1")).await;
 assert_eq!(db.bounds().await.end, op_count);
 assert_eq!(db.root(), root);

 // Replay a mix of inserts, a delete and overwrites on the reopened database.
 let merkleized = db
 .new_batch()
 .write(key1.clone(), Some(val1))
 .merkleize(&db, None)
 .await
 .unwrap();
 db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();

 let merkleized = db
 .new_batch()
 .write(key2.clone(), Some(val2))
 .merkleize(&db, None)
 .await
 .unwrap();
 db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();

 let merkleized = db
 .new_batch()
 .write(key1.clone(), None)
 .merkleize(&db, None)
 .await
 .unwrap();
 db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();

 let merkleized = db
 .new_batch()
 .write(key2.clone(), Some(val1))
 .merkleize(&db, None)
 .await
 .unwrap();
 db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();

 let merkleized = db
 .new_batch()
 .write(key1.clone(), Some(val2))
 .merkleize(&db, None)
 .await
 .unwrap();
 db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();

 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
 let _ = db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();

 // Record state again and verify it survives a second reopen.
 let op_count = db.bounds().await.end;
 let root = db.root();
 let mut db = reopen_db(context.with_label("reopen2")).await;

 assert_eq!(db.root(), root);
 assert_eq!(db.bounds().await.end, op_count);

 // Even an empty batch commit is expected to change the root.
 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
 let _ = db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();

 assert!(db.root() != root);

 // Pruning to the inactivity floor must leave the root unchanged.
 let root = db.root();
 db.prune(db.inactivity_floor_loc().await).await.unwrap();
 assert_eq!(db.root(), root);

 db.destroy().await.unwrap();
 }

 /// Scenario writing three keys that share their first two bytes (`0xFF, 0xFF`) in a single
 /// batch and checking that each one reads back correctly.
 ///
 /// NOTE(review): the shared prefix is presumably chosen so the keys collide under the
 /// index's key translator -- confirm against the configuration used by callers.
 pub(crate) async fn test_ordered_any_update_collision_edge_case<
 D: DbAny<mmr::Family, Key = FixedBytes<4>, Value = Digest, Digest = Digest>,
 >(
 mut db: D,
 ) {
 // key3 orders before key1 and key2 even though it is written last.
 let key1 = FixedBytes::from([0xFFu8, 0xFFu8, 5u8, 5u8]);
 let key2 = FixedBytes::from([0xFFu8, 0xFFu8, 6u8, 6u8]);
 let key3 = FixedBytes::from([0xFFu8, 0xFFu8, 0u8, 0u8]);
 let val = Sha256::fill(1u8);

 let merkleized = db
 .new_batch()
 .write(key1.clone(), Some(val))
 .write(key2.clone(), Some(val))
 .write(key3.clone(), Some(val))
 .merkleize(&db, None)
 .await
 .unwrap();
 db.apply_batch(merkleized).await.unwrap();

 // The batch is applied but not yet committed; all three keys must already be readable.
 assert_eq!(db.get(&key1).await.unwrap().unwrap(), val);
 assert_eq!(db.get(&key2).await.unwrap().unwrap(), val);
 assert_eq!(db.get(&key3).await.unwrap().unwrap(), val);

 // Finish with an empty batch, a commit, and teardown.
 let merkleized = db.new_batch().merkleize(&db, None).await.unwrap();
 let _ = db.apply_batch(merkleized).await.unwrap();
 db.commit().await.unwrap();
 db.destroy().await.unwrap();
 }
}