1use crate::{
11 adb::{operation::fixed::FixedOperation, Error},
12 index::{Cursor, Index},
13 journal::contiguous::fixed::{Config as JConfig, Journal},
14 mmr::{
15 bitmap::BitMap,
16 journaled::{Config as MmrConfig, Mmr},
17 Location, Position, Proof, StandardHasher as Standard,
18 },
19 translator::Translator,
20};
21use commonware_codec::Encode as _;
22use commonware_cryptography::Hasher as CHasher;
23use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage, ThreadPool};
24use futures::{
25 future::{try_join_all, TryFutureExt as _},
26 try_join,
27};
28use std::num::{NonZeroU64, NonZeroUsize};
29use tracing::{debug, warn};
30
31pub mod ordered;
32pub mod sync;
33pub mod unordered;
34
35const SNAPSHOT_READ_BUFFER_SIZE: usize = 1 << 16;
38
39#[derive(Clone)]
41pub struct Config<T: Translator> {
42 pub mmr_journal_partition: String,
44
45 pub mmr_items_per_blob: NonZeroU64,
47
48 pub mmr_write_buffer: NonZeroUsize,
50
51 pub mmr_metadata_partition: String,
53
54 pub log_journal_partition: String,
56
57 pub log_items_per_blob: NonZeroU64,
59
60 pub log_write_buffer: NonZeroUsize,
62
63 pub translator: T,
65
66 pub thread_pool: Option<ThreadPool>,
68
69 pub buffer_pool: PoolRef,
71}
72
73pub(crate) async fn init_mmr_and_log<
77 E: Storage + Clock + Metrics,
78 O: FixedOperation,
79 H: CHasher,
80 T: Translator,
81>(
82 context: E,
83 cfg: Config<T>,
84 hasher: &mut Standard<H>,
85) -> Result<(Location, Mmr<E, H>, Journal<E, O>), Error> {
86 let mut mmr = Mmr::init(
87 context.with_label("mmr"),
88 hasher,
89 MmrConfig {
90 journal_partition: cfg.mmr_journal_partition,
91 metadata_partition: cfg.mmr_metadata_partition,
92 items_per_blob: cfg.mmr_items_per_blob,
93 write_buffer: cfg.mmr_write_buffer,
94 thread_pool: cfg.thread_pool,
95 buffer_pool: cfg.buffer_pool.clone(),
96 },
97 )
98 .await?;
99
100 let mut log: Journal<E, O> = Journal::init(
101 context.with_label("log"),
102 JConfig {
103 partition: cfg.log_journal_partition,
104 items_per_blob: cfg.log_items_per_blob,
105 write_buffer: cfg.log_write_buffer,
106 buffer_pool: cfg.buffer_pool,
107 },
108 )
109 .await?;
110
111 let mut log_size: Location = log.size().await.into();
113 let mut rewind_leaf_num = log_size;
114 let mut inactivity_floor_loc = Location::new_unchecked(0);
115 while rewind_leaf_num > 0 {
116 let op = log.read(rewind_leaf_num.as_u64() - 1).await?;
117 if let Some(loc) = op.commit_floor() {
118 inactivity_floor_loc = loc;
119 break;
120 }
121 rewind_leaf_num -= 1;
122 }
123 if rewind_leaf_num != log_size {
124 let op_count = log_size - rewind_leaf_num;
125 warn!(
126 ?log_size,
127 ?op_count,
128 "rewinding over uncommitted log operations"
129 );
130 log.rewind(rewind_leaf_num.as_u64()).await?;
131 log.sync().await?;
132 log_size = rewind_leaf_num;
133 }
134
135 let mut next_mmr_leaf_num = mmr.leaves();
137 if next_mmr_leaf_num > log_size {
138 let op_count = next_mmr_leaf_num - log_size;
139 warn!(?log_size, ?op_count, "popping uncommitted MMR operations");
140 mmr.pop(op_count.as_u64() as usize).await?;
141 next_mmr_leaf_num = log_size;
142 }
143
144 if next_mmr_leaf_num < log_size {
146 let op_count = log_size - next_mmr_leaf_num;
147 warn!(
148 ?log_size,
149 ?op_count,
150 "MMR lags behind log, replaying log to catch up"
151 );
152 while next_mmr_leaf_num < log_size {
153 let op = log.read(next_mmr_leaf_num.as_u64()).await?;
154 mmr.add_batched(hasher, &op.encode()).await?;
155 next_mmr_leaf_num += 1;
156 }
157 mmr.sync(hasher).await.map_err(Error::Mmr)?;
158 }
159
160 assert_eq!(log.size().await, mmr.leaves());
162
163 Ok((inactivity_floor_loc, mmr, log))
164}
165
166async fn prune_db<E, O, H>(
175 mmr: &mut Mmr<E, H>,
176 log: &mut Journal<E, O>,
177 hasher: &mut Standard<H>,
178 target_prune_loc: Location,
179 inactivity_floor_loc: Location,
180 op_count: Location,
181) -> Result<(), Error>
182where
183 E: Storage + Clock + Metrics,
184 O: FixedOperation,
185 H: CHasher,
186{
187 if target_prune_loc > inactivity_floor_loc {
188 return Err(crate::mmr::Error::RangeOutOfBounds(target_prune_loc).into());
189 }
190 let target_prune_pos = Position::try_from(target_prune_loc)?;
191
192 if mmr.size() == 0 {
193 return Ok(());
195 };
196
197 mmr.sync(hasher).await?;
201
202 if !log.prune(target_prune_loc.as_u64()).await? {
203 return Ok(());
204 }
205
206 debug!(
207 log_size = op_count.as_u64(),
208 ?target_prune_loc,
209 "pruned inactive ops"
210 );
211
212 mmr.prune_to_pos(hasher, target_prune_pos).await?;
213
214 Ok(())
215}
216
217async fn historical_proof<E, O, H>(
229 mmr: &Mmr<E, H>,
230 log: &Journal<E, O>,
231 op_count: Location,
232 start_loc: Location,
233 max_ops: NonZeroU64,
234) -> Result<(Proof<H::Digest>, Vec<O>), Error>
235where
236 E: Storage + Clock + Metrics,
237 O: FixedOperation,
238 H: CHasher,
239{
240 let size = Location::new_unchecked(log.size().await);
241 if op_count > size {
242 return Err(crate::mmr::Error::RangeOutOfBounds(size).into());
243 }
244 if start_loc >= op_count {
245 return Err(crate::mmr::Error::RangeOutOfBounds(start_loc).into());
246 }
247 let end_loc = std::cmp::min(op_count, start_loc.saturating_add(max_ops.get()));
248
249 let mmr_size = Position::try_from(op_count)?;
250 let proof = mmr
251 .historical_range_proof(mmr_size, start_loc..end_loc)
252 .await?;
253
254 let mut ops = Vec::with_capacity((end_loc.as_u64() - start_loc.as_u64()) as usize);
255 let futures = (start_loc.as_u64()..end_loc.as_u64())
256 .map(|i| log.read(i))
257 .collect::<Vec<_>>();
258 try_join_all(futures)
259 .await?
260 .into_iter()
261 .for_each(|op| ops.push(op));
262
263 Ok((proof, ops))
264}
265
266async fn update_loc<E, I: Index<Value = Location>, O>(
269 snapshot: &mut I,
270 log: &Journal<E, O>,
271 key: &<O as FixedOperation>::Key,
272 new_loc: Location,
273) -> Result<Option<Location>, Error>
274where
275 E: Storage + Clock + Metrics,
276 O: FixedOperation,
277{
278 let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
281 return Ok(None);
282 };
283
284 if let Some(loc) = find_update_op(log, &mut cursor, key).await? {
286 assert!(new_loc > loc);
287 cursor.update(new_loc);
288 return Ok(Some(loc));
289 }
290
291 cursor.insert(new_loc);
293
294 Ok(None)
295}
296
297async fn delete_key<E, I, O>(
300 snapshot: &mut I,
301 log: &Journal<E, O>,
302 key: &O::Key,
303) -> Result<Option<Location>, Error>
304where
305 E: Storage + Clock + Metrics,
306 I: Index<Value = Location>,
307 O: FixedOperation,
308{
309 let Some(mut cursor) = snapshot.get_mut(key) else {
311 return Ok(None);
312 };
313
314 let Some(loc) = find_update_op(log, &mut cursor, key).await? else {
316 return Ok(None);
317 };
318 cursor.delete();
319
320 Ok(Some(loc))
321}
322
323async fn find_update_op<E, C, O>(
326 log: &Journal<E, O>,
327 cursor: &mut C,
328 key: &<O as FixedOperation>::Key,
329) -> Result<Option<Location>, Error>
330where
331 E: Storage + Clock + Metrics,
332 C: Cursor<Value = Location>,
333 O: FixedOperation,
334{
335 while let Some(&loc) = cursor.next() {
336 let op = log.read(*loc).await?;
337 let k = op.key().expect("operation without key");
338 if *k == *key {
339 return Ok(Some(loc));
340 }
341 }
342
343 Ok(None)
344}
345
346pub(crate) struct Shared<
348 'a,
349 E: Storage + Clock + Metrics,
350 I: Index<Value = Location>,
351 O: FixedOperation,
352 H: CHasher,
353> {
354 pub snapshot: &'a mut I,
355 pub mmr: &'a mut Mmr<E, H>,
356 pub log: &'a mut Journal<E, O>,
357 pub hasher: &'a mut Standard<H>,
358}
359
360impl<E, I, O, H> Shared<'_, E, I, O, H>
361where
362 E: Storage + Clock + Metrics,
363 I: Index<Value = Location>,
364 O: FixedOperation,
365 H: CHasher,
366{
367 pub(crate) async fn apply_op(&mut self, op: O) -> Result<(), Error> {
370 let encoded_op = op.encode();
371
372 try_join!(
374 self.mmr
375 .add_batched(self.hasher, &encoded_op)
376 .map_err(Error::Mmr),
377 self.log.append(op).map_err(Error::Journal)
378 )?;
379
380 Ok(())
381 }
382
383 pub(crate) async fn move_op_if_active(
387 &mut self,
388 op: O,
389 old_loc: Location,
390 ) -> Result<Option<Location>, Error> {
391 let Some(key) = op.key() else {
393 return Ok(None); };
395 let Some(mut cursor) = self.snapshot.get_mut(key) else {
396 return Ok(None);
397 };
398
399 if cursor.find(|&loc| *loc == old_loc) {
401 let tip_loc = Location::new_unchecked(self.log.size().await);
403 cursor.update(tip_loc);
404 drop(cursor);
405
406 self.apply_op(op).await?;
408 return Ok(Some(old_loc));
409 }
410
411 Ok(None)
413 }
414
415 async fn raise_floor(&mut self, mut inactivity_floor_loc: Location) -> Result<Location, Error>
424 where
425 E: Storage + Clock + Metrics,
426 I: Index<Value = Location>,
427 H: CHasher,
428 O: FixedOperation,
429 {
430 loop {
434 let tip_loc = Location::new_unchecked(self.log.size().await);
435 assert!(
436 *inactivity_floor_loc < tip_loc,
437 "no active operations above the inactivity floor"
438 );
439 let old_loc = inactivity_floor_loc;
440 inactivity_floor_loc += 1;
441 let op = self.log.read(*old_loc).await?;
442 if self.move_op_if_active(op, old_loc).await?.is_some() {
443 return Ok(inactivity_floor_loc);
444 }
445 }
446 }
447
448 pub(crate) async fn raise_floor_with_bitmap<const N: usize>(
455 &mut self,
456 status: &mut BitMap<H, N>,
457 mut inactivity_floor_loc: Location,
458 ) -> Result<Location, Error>
459 where
460 E: Storage + Clock + Metrics,
461 I: Index<Value = Location>,
462 O: FixedOperation,
463 H: CHasher,
464 {
465 while !status.get_bit(*inactivity_floor_loc) {
467 inactivity_floor_loc += 1;
468 }
469
470 let op = self.log.read(*inactivity_floor_loc).await?;
472 let loc = self
473 .move_op_if_active(op, inactivity_floor_loc)
474 .await?
475 .expect("op should be active based on status bitmap");
476 status.set_bit(*loc, false);
477 status.push(true);
478
479 inactivity_floor_loc += 1;
481
482 Ok(inactivity_floor_loc)
483 }
484
485 async fn sync_and_process_updates(&mut self) -> Result<(), Error> {
487 let mmr_fut = async {
488 self.mmr.merkleize(self.hasher);
489 Ok::<(), Error>(())
490 };
491 try_join!(self.log.sync().map_err(Error::Journal), mmr_fut)?;
492
493 Ok(())
494 }
495
496 async fn sync(&mut self) -> Result<(), Error> {
498 try_join!(
499 self.log.sync().map_err(Error::Journal),
500 self.mmr.sync(self.hasher).map_err(Error::Mmr)
501 )?;
502
503 Ok(())
504 }
505}