1use super::Immutable;
4use crate::{
5 journal::{authenticated, contiguous::Mutable, Error as JournalError},
6 merkle::{Family, Location},
7 qmdb::{
8 any::{batch::lookup_sorted, ValueEncoding},
9 batch_chain::{self, Bounds},
10 immutable::operation::Operation,
11 operation::Key,
12 Error,
13 },
14 translator::Translator,
15 Context, Persistable,
16};
17use commonware_codec::EncodeShared;
18use commonware_cryptography::{Digest, Hasher as CHasher};
19use commonware_parallel::Strategy;
20use std::{
21 collections::BTreeMap,
22 sync::{Arc, Weak},
23};
24
/// A batch's diff: `(key, entry)` pairs, one per key the batch set.
///
/// `UnmerkleizedBatch::merkleize` builds it in strictly ascending key order
/// (and asserts that ordering), which is what allows it to be searched with
/// `lookup_sorted`.
type DiffVec<K, F, V> = Vec<(K, DiffEntry<F, V>)>;
26
/// The value a batch assigned to a key, together with where that assignment
/// lands in the operation journal.
#[derive(Clone)]
pub(crate) struct DiffEntry<F: Family, V> {
    /// The value written for the key.
    pub(crate) value: V,
    /// Location of the corresponding `Set` operation: the batch's base size
    /// plus the operation's index within the batch.
    pub(crate) loc: Location<F>,
}
33
/// A batch of pending `set` mutations that has not been merkleized yet.
///
/// Mutations are accumulated with `set` and turned into journal operations
/// (plus a trailing commit) by `merkleize`.
// NOTE(review): the allow is presumably needed for the deeply nested generic
// field types below — confirm it is still required.
#[allow(clippy::type_complexity)]
pub struct UnmerkleizedBatch<F, H, K, V, S: Strategy>
where
    F: Family,
    K: Key,
    V: ValueEncoding,
    H: CHasher,
{
    /// Authenticated-journal batch that receives this batch's operations
    /// when it is merkleized.
    journal_batch: authenticated::UnmerkleizedBatch<F, H, Operation<F, K, V>, S>,

    /// Pending key → value assignments, kept ordered by key. Setting a key
    /// twice keeps only the latest value.
    mutations: BTreeMap<K, V::Value>,

    /// The merkleized batch this one was created from via
    /// `MerkleizedBatch::new_batch`, or `None` when created directly from
    /// the database.
    parent: Option<Arc<MerkleizedBatch<F, H::Digest, K, V, S>>>,

    /// Location of the first operation this batch will append: the journal
    /// size for a batch created from the database, or the parent's
    /// `total_size` for a chained batch.
    base_size: u64,

    /// Journal size of the database the chain was started from; set in
    /// `new` and inherited unchanged from the parent in `new_batch`.
    db_size: u64,
}
63
/// Shared handle to a merkleized authenticated-journal batch whose items are
/// this database's operations.
type JournalBatch<F, D, K, V, S> = Arc<authenticated::MerkleizedBatch<F, D, Operation<F, K, V>, S>>;
66
/// A batch whose operations have been merkleized and whose root is known.
///
/// Produced by `UnmerkleizedBatch::merkleize` (or `Immutable::to_batch` for
/// the committed state) and usable as the parent of further batches through
/// `new_batch`.
#[derive(Clone)]
pub struct MerkleizedBatch<F: Family, D: Digest, K: Key, V: ValueEncoding, S: Strategy> {
    /// The merkleized authenticated-journal batch holding this batch's
    /// operations.
    pub(super) journal_batch: JournalBatch<F, D, K, V, S>,

    /// Root digest computed when the batch was merkleized.
    pub(super) root: D,

    /// Keys set by this batch (ascending by key) with their values and
    /// journal locations.
    pub(super) diff: Arc<DiffVec<K, F, V::Value>>,

    /// Weak link to the parent batch, walked by `ancestors`. `None` for a
    /// batch with no parent.
    pub(super) parent: Option<Weak<Self>>,

    /// Diffs of the parent and older ancestors, captured when this batch was
    /// merkleized.
    // NOTE(review): presumably kept so ancestor diffs stay reachable even
    // though `parent` is only a weak link — confirm against the consumers.
    pub(super) ancestor_diffs: Vec<Arc<DiffVec<K, F, V::Value>>>,

    /// Size and inactivity-floor bookkeeping for this batch and its
    /// ancestors.
    pub(super) bounds: batch_chain::Bounds<F>,
}
92
93impl<F, H, K, V, S: Strategy> UnmerkleizedBatch<F, H, K, V, S>
94where
95 F: Family,
96 K: Key,
97 V: ValueEncoding,
98 H: CHasher,
99 Operation<F, K, V>: EncodeShared,
100{
101 pub(super) fn new<E, C, T>(
103 immutable: &Immutable<F, E, K, V, C, H, T, S>,
104 journal_size: u64,
105 ) -> Self
106 where
107 E: Context,
108 C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
109 C::Item: EncodeShared,
110 T: Translator,
111 {
112 Self {
113 journal_batch: immutable.journal.new_batch(),
114 mutations: BTreeMap::new(),
115 parent: None,
116 base_size: journal_size,
117 db_size: journal_size,
118 }
119 }
120
121 pub fn set(mut self, key: K, value: V::Value) -> Self {
126 self.mutations.insert(key, value);
127 self
128 }
129
130 pub async fn get<E, C, T>(
132 &self,
133 key: &K,
134 db: &Immutable<F, E, K, V, C, H, T, S>,
135 ) -> Result<Option<V::Value>, Error<F>>
136 where
137 E: Context,
138 C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
139 C::Item: EncodeShared,
140 T: Translator,
141 {
142 if let Some(value) = self.mutations.get(key) {
144 return Ok(Some(value.clone()));
145 }
146 if let Some(parent) = self.parent.as_ref() {
149 if let Some(entry) = lookup_sorted(parent.diff.as_slice(), key) {
150 return Ok(Some(entry.value.clone()));
151 }
152 for batch in parent.ancestors() {
153 if let Some(entry) = lookup_sorted(batch.diff.as_slice(), key) {
154 return Ok(Some(entry.value.clone()));
155 }
156 }
157 }
158 db.get(key).await
160 }
161
162 pub async fn get_many<E, C, T>(
166 &self,
167 keys: &[&K],
168 db: &Immutable<F, E, K, V, C, H, T, S>,
169 ) -> Result<Vec<Option<V::Value>>, Error<F>>
170 where
171 E: Context,
172 C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
173 C::Item: EncodeShared,
174 T: Translator,
175 {
176 if keys.is_empty() {
177 return Ok(Vec::new());
178 }
179
180 let mut results: Vec<Option<V::Value>> = Vec::with_capacity(keys.len());
181 let mut db_indices = Vec::new();
182 let mut db_keys = Vec::new();
183
184 for (i, key) in keys.iter().enumerate() {
185 if let Some(value) = self.mutations.get(*key) {
187 results.push(Some(value.clone()));
188 continue;
189 }
190
191 let mut found = false;
193 if let Some(parent) = self.parent.as_ref() {
194 if let Some(entry) = lookup_sorted(parent.diff.as_slice(), *key) {
195 results.push(Some(entry.value.clone()));
196 found = true;
197 }
198 if !found {
199 for batch in parent.ancestors() {
200 if let Some(entry) = lookup_sorted(batch.diff.as_slice(), *key) {
201 results.push(Some(entry.value.clone()));
202 found = true;
203 break;
204 }
205 }
206 }
207 }
208
209 if found {
210 continue;
211 }
212
213 db_indices.push(i);
215 db_keys.push(*key);
216 results.push(None);
217 }
218
219 if !db_keys.is_empty() {
220 let db_results = db.get_many(&db_keys).await?;
221 for (slot, value) in db_indices.into_iter().zip(db_results) {
222 results[slot] = value;
223 }
224 }
225
226 Ok(results)
227 }
228
229 pub fn merkleize<E, C, T>(
234 self,
235 db: &Immutable<F, E, K, V, C, H, T, S>,
236 metadata: Option<V::Value>,
237 inactivity_floor: Location<F>,
238 ) -> Arc<MerkleizedBatch<F, H::Digest, K, V, S>>
239 where
240 E: Context,
241 C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
242 C::Item: EncodeShared,
243 T: Translator,
244 {
245 let base = self.base_size;
246
247 let mut ops: Vec<Operation<F, K, V>> = Vec::with_capacity(self.mutations.len() + 1);
250 let mut diff: DiffVec<K, F, V::Value> = Vec::with_capacity(self.mutations.len());
251
252 for (key, value) in self.mutations {
253 let loc = Location::new(base + ops.len() as u64);
254 ops.push(Operation::Set(key.clone(), value.clone()));
255 diff.push((key, DiffEntry { value, loc }));
256 }
257 assert!(diff.is_sorted_by(|a, b| a.0 < b.0));
258
259 ops.push(Operation::Commit(metadata, inactivity_floor));
260
261 let total_size = base + ops.len() as u64;
262
263 let mut journal_batch = self.journal_batch;
265 for op in &ops {
266 journal_batch = journal_batch.add(op.clone());
267 }
268 let inactive_peaks = F::inactive_peaks(
269 F::location_to_position(Location::new(total_size)),
270 inactivity_floor,
271 );
272 let journal_merkleized = db.journal.with_mem(|mem| journal_batch.merkleize(mem));
273 let root = db
274 .journal
275 .with_mem(|mem| journal_merkleized.root(mem, &db.journal.hasher, inactive_peaks))
276 .expect("inactive_peaks computed from batch size");
277
278 let mut ancestor_diffs = Vec::new();
279 let mut ancestors = Vec::new();
280 for batch in
281 batch_chain::parent_and_ancestors(self.parent.as_ref(), |parent| parent.ancestors())
282 {
283 ancestor_diffs.push(Arc::clone(&batch.diff));
284 ancestors.push(batch_chain::AncestorBounds {
285 floor: batch.bounds.inactivity_floor,
286 end: batch.bounds.total_size,
287 });
288 }
289
290 Arc::new(MerkleizedBatch {
291 journal_batch: journal_merkleized,
292 root,
293 diff: Arc::new(diff),
294 parent: self.parent.as_ref().map(Arc::downgrade),
295 ancestor_diffs,
296 bounds: batch_chain::Bounds {
297 base_size: self.base_size,
298 db_size: self.db_size,
299 total_size,
300 ancestors,
301 inactivity_floor,
302 },
303 })
304 }
305}
306
307impl<F: Family, D: Digest, K: Key, V: ValueEncoding, S: Strategy> MerkleizedBatch<F, D, K, V, S>
308where
309 Operation<F, K, V>: EncodeShared,
310{
311 pub const fn root(&self) -> D {
313 self.root
314 }
315
316 pub const fn bounds(&self) -> &Bounds<F> {
318 &self.bounds
319 }
320
321 pub(super) fn ancestors(&self) -> impl Iterator<Item = Arc<Self>> {
323 batch_chain::ancestors(self.parent.clone(), |batch| batch.parent.as_ref())
324 }
325
326 pub async fn get<E, C, H, T>(
328 &self,
329 key: &K,
330 db: &Immutable<F, E, K, V, C, H, T, S>,
331 ) -> Result<Option<V::Value>, Error<F>>
332 where
333 E: Context,
334 C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
335 C::Item: EncodeShared,
336 H: CHasher<Digest = D>,
337 T: Translator,
338 {
339 if let Some(entry) = lookup_sorted(self.diff.as_slice(), key) {
340 return Ok(Some(entry.value.clone()));
341 }
342 for batch in self.ancestors() {
343 if let Some(entry) = lookup_sorted(batch.diff.as_slice(), key) {
344 return Ok(Some(entry.value.clone()));
345 }
346 }
347 db.get(key).await
348 }
349
350 pub async fn get_many<E, C, H, T>(
354 &self,
355 keys: &[&K],
356 db: &Immutable<F, E, K, V, C, H, T, S>,
357 ) -> Result<Vec<Option<V::Value>>, Error<F>>
358 where
359 E: Context,
360 C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
361 C::Item: EncodeShared,
362 H: CHasher<Digest = D>,
363 T: Translator,
364 {
365 if keys.is_empty() {
366 return Ok(Vec::new());
367 }
368
369 let mut results: Vec<Option<V::Value>> = Vec::with_capacity(keys.len());
370 let mut db_indices = Vec::new();
371 let mut db_keys = Vec::new();
372
373 for (i, key) in keys.iter().enumerate() {
374 if let Some(entry) = lookup_sorted(self.diff.as_slice(), *key) {
376 results.push(Some(entry.value.clone()));
377 continue;
378 }
379
380 let mut found = false;
382 for batch in self.ancestors() {
383 if let Some(entry) = lookup_sorted(batch.diff.as_slice(), *key) {
384 results.push(Some(entry.value.clone()));
385 found = true;
386 break;
387 }
388 }
389
390 if found {
391 continue;
392 }
393
394 db_indices.push(i);
396 db_keys.push(*key);
397 results.push(None);
398 }
399
400 if !db_keys.is_empty() {
401 let db_results = db.get_many(&db_keys).await?;
402 for (slot, value) in db_indices.into_iter().zip(db_results) {
403 results[slot] = value;
404 }
405 }
406
407 Ok(results)
408 }
409
410 pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, K, V, S>
416 where
417 H: CHasher<Digest = D>,
418 {
419 UnmerkleizedBatch {
420 journal_batch: self.journal_batch.new_batch::<H>(),
421 mutations: BTreeMap::new(),
422 parent: Some(Arc::clone(self)),
423 base_size: self.bounds.total_size,
424 db_size: self.bounds.db_size,
425 }
426 }
427}
428
429impl<F, E, K, V, C, H, T, S> Immutable<F, E, K, V, C, H, T, S>
430where
431 F: Family,
432 E: Context,
433 K: Key,
434 V: ValueEncoding,
435 C: Mutable<Item = Operation<F, K, V>> + Persistable<Error = JournalError>,
436 C::Item: EncodeShared,
437 H: CHasher,
438 T: Translator,
439 S: Strategy,
440{
441 pub fn to_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, K, V, S>> {
443 let journal_size = *self.last_commit_loc + 1;
444 Arc::new(MerkleizedBatch {
445 journal_batch: self.journal.to_merkleized_batch(),
446 root: self.root,
447 diff: Arc::new(Vec::new()),
448 parent: None,
449 ancestor_diffs: Vec::new(),
450 bounds: batch_chain::Bounds {
451 base_size: journal_size,
452 db_size: journal_size,
453 total_size: journal_size,
454 ancestors: Vec::new(),
455 inactivity_floor: self.inactivity_floor_loc,
456 },
457 })
458 }
459}