1use super::Immutable;
4use crate::{
5 journal::{authenticated, contiguous::Mutable, Error as JournalError},
6 merkle::{Family, Location},
7 qmdb::{any::ValueEncoding, immutable::operation::Operation, operation::Key, Error},
8 translator::Translator,
9 Context, Persistable,
10};
11use commonware_codec::EncodeShared;
12use commonware_cryptography::{Digest, Hasher as CHasher};
13use core::iter;
14use std::{
15 collections::BTreeMap,
16 sync::{Arc, Weak},
17};
18
19#[derive(Clone)]
21pub(crate) struct DiffEntry<F: Family, V> {
22 pub(crate) value: V,
23 pub(crate) loc: Location<F>,
24}
25
26#[allow(clippy::type_complexity)]
32pub struct UnmerkleizedBatch<F, H, K, V>
33where
34 F: Family,
35 K: Key,
36 V: ValueEncoding,
37 H: CHasher,
38{
39 journal_batch: authenticated::UnmerkleizedBatch<F, H, Operation<K, V>>,
41
42 mutations: BTreeMap<K, V::Value>,
44
45 parent: Option<Arc<MerkleizedBatch<F, H::Digest, K, V>>>,
47
48 base_size: u64,
51
52 db_size: u64,
54}
55
56#[derive(Clone)]
59pub struct MerkleizedBatch<F: Family, D: Digest, K: Key, V: ValueEncoding> {
60 pub(super) journal_batch: Arc<authenticated::MerkleizedBatch<F, D, Operation<K, V>>>,
62
63 pub(super) diff: Arc<BTreeMap<K, DiffEntry<F, V::Value>>>,
65
66 pub(super) parent: Option<Weak<Self>>,
68
69 pub(super) base_size: u64,
71
72 pub(super) total_size: u64,
74
75 pub(super) db_size: u64,
77
78 #[allow(clippy::type_complexity)]
82 pub(super) ancestor_diffs: Vec<Arc<BTreeMap<K, DiffEntry<F, V::Value>>>>,
83
84 pub(super) ancestor_diff_ends: Vec<u64>,
88}
89
90impl<F, H, K, V> UnmerkleizedBatch<F, H, K, V>
91where
92 F: Family,
93 K: Key,
94 V: ValueEncoding,
95 H: CHasher,
96 Operation<K, V>: EncodeShared,
97{
98 pub(super) fn new<E, C, T>(
100 immutable: &Immutable<F, E, K, V, C, H, T>,
101 journal_size: u64,
102 ) -> Self
103 where
104 E: Context,
105 C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
106 C::Item: EncodeShared,
107 T: Translator,
108 {
109 Self {
110 journal_batch: immutable.journal.new_batch(),
111 mutations: BTreeMap::new(),
112 parent: None,
113 base_size: journal_size,
114 db_size: journal_size,
115 }
116 }
117
118 pub fn set(mut self, key: K, value: V::Value) -> Self {
123 self.mutations.insert(key, value);
124 self
125 }
126
127 pub async fn get<E, C, T>(
129 &self,
130 key: &K,
131 db: &Immutable<F, E, K, V, C, H, T>,
132 ) -> Result<Option<V::Value>, Error<F>>
133 where
134 E: Context,
135 C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
136 C::Item: EncodeShared,
137 T: Translator,
138 {
139 if let Some(value) = self.mutations.get(key) {
141 return Ok(Some(value.clone()));
142 }
143 if let Some(parent) = self.parent.as_ref() {
146 if let Some(entry) = parent.diff.get(key) {
147 return Ok(Some(entry.value.clone()));
148 }
149 for batch in parent.ancestors() {
150 if let Some(entry) = batch.diff.get(key) {
151 return Ok(Some(entry.value.clone()));
152 }
153 }
154 }
155 db.get(key).await
157 }
158
159 pub fn merkleize<E, C, T>(
161 self,
162 db: &Immutable<F, E, K, V, C, H, T>,
163 metadata: Option<V::Value>,
164 ) -> Arc<MerkleizedBatch<F, H::Digest, K, V>>
165 where
166 E: Context,
167 C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
168 C::Item: EncodeShared,
169 T: Translator,
170 {
171 let base = self.base_size;
172
173 let mut ops: Vec<Operation<K, V>> = Vec::with_capacity(self.mutations.len() + 1);
175 let mut diff: BTreeMap<K, DiffEntry<F, V::Value>> = BTreeMap::new();
176
177 for (key, value) in self.mutations {
178 let loc = Location::new(base + ops.len() as u64);
179 ops.push(Operation::Set(key.clone(), value.clone()));
180 diff.insert(key, DiffEntry { value, loc });
181 }
182
183 ops.push(Operation::Commit(metadata));
184
185 let total_size = base + ops.len() as u64;
186
187 let mut journal_batch = self.journal_batch;
189 for op in &ops {
190 journal_batch = journal_batch.add(op.clone());
191 }
192 let journal_merkleized = db.journal.with_mem(|mem| journal_batch.merkleize(mem));
193
194 let mut ancestor_diffs = Vec::new();
195 let mut ancestor_diff_ends = Vec::new();
196 if let Some(parent) = &self.parent {
197 ancestor_diffs.push(Arc::clone(&parent.diff));
198 ancestor_diff_ends.push(parent.total_size);
199 for batch in parent.ancestors() {
200 ancestor_diffs.push(Arc::clone(&batch.diff));
201 ancestor_diff_ends.push(batch.total_size);
202 }
203 }
204
205 Arc::new(MerkleizedBatch {
206 journal_batch: journal_merkleized,
207 diff: Arc::new(diff),
208 parent: self.parent.as_ref().map(Arc::downgrade),
209 base_size: self.base_size,
210 total_size,
211 db_size: self.db_size,
212 ancestor_diffs,
213 ancestor_diff_ends,
214 })
215 }
216}
217
218impl<F: Family, D: Digest, K: Key, V: ValueEncoding> MerkleizedBatch<F, D, K, V>
219where
220 Operation<K, V>: EncodeShared,
221{
222 pub fn root(&self) -> D {
224 self.journal_batch.root()
225 }
226
227 pub(super) fn ancestors(&self) -> impl Iterator<Item = Arc<Self>> {
229 let mut next = self.parent.as_ref().and_then(Weak::upgrade);
230 iter::from_fn(move || {
231 let batch = next.take()?;
232 next = batch.parent.as_ref().and_then(Weak::upgrade);
233 Some(batch)
234 })
235 }
236
237 pub async fn get<E, C, H, T>(
239 &self,
240 key: &K,
241 db: &Immutable<F, E, K, V, C, H, T>,
242 ) -> Result<Option<V::Value>, Error<F>>
243 where
244 E: Context,
245 C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
246 C::Item: EncodeShared,
247 H: CHasher<Digest = D>,
248 T: Translator,
249 {
250 if let Some(entry) = self.diff.get(key) {
251 return Ok(Some(entry.value.clone()));
252 }
253 for batch in self.ancestors() {
254 if let Some(entry) = batch.diff.get(key) {
255 return Ok(Some(entry.value.clone()));
256 }
257 }
258 db.get(key).await
259 }
260
261 pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, K, V>
267 where
268 H: CHasher<Digest = D>,
269 {
270 UnmerkleizedBatch {
271 journal_batch: self.journal_batch.new_batch::<H>(),
272 mutations: BTreeMap::new(),
273 parent: Some(Arc::clone(self)),
274 base_size: self.total_size,
275 db_size: self.db_size,
276 }
277 }
278}
279
280impl<F, E, K, V, C, H, T> Immutable<F, E, K, V, C, H, T>
281where
282 F: Family,
283 E: Context,
284 K: Key,
285 V: ValueEncoding,
286 C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
287 C::Item: EncodeShared,
288 H: CHasher,
289 T: Translator,
290{
291 pub fn to_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, K, V>> {
293 let journal_size = *self.last_commit_loc + 1;
294 Arc::new(MerkleizedBatch {
295 journal_batch: self.journal.to_merkleized_batch(),
296 diff: Arc::new(BTreeMap::new()),
297 parent: None,
298 base_size: journal_size,
299 total_size: journal_size,
300 db_size: journal_size,
301 ancestor_diffs: Vec::new(),
302 ancestor_diff_ends: Vec::new(),
303 })
304 }
305}