1use crate::{
7 index::ordered::Index,
8 journal::contiguous::{Contiguous, MutableContiguous},
9 kv::{self, Batchable},
10 mmr::{grafting::Storage as GraftingStorage, Location},
11 qmdb::{
12 any::{
13 ordered::{Operation, Update},
14 ValueEncoding,
15 },
16 current::{
17 db::{Merkleized, State, Unmerkleized},
18 proof::OperationProof,
19 },
20 store::{self, LogStore as _},
21 DurabilityState, Durable, Error, NonDurable,
22 },
23 translator::Translator,
24};
25use commonware_codec::Codec;
26use commonware_cryptography::{Digest, DigestOf, Hasher};
27use commonware_runtime::{Clock, Metrics, Storage};
28use commonware_utils::Array;
29use futures::stream::Stream;
30
31#[derive(Clone, Eq, PartialEq, Debug)]
33pub struct KeyValueProof<K: Array, D: Digest, const N: usize> {
34 pub proof: OperationProof<D, N>,
35 pub next_key: K,
36}
37
38pub type Db<E, C, K, V, H, T, const N: usize, S = Merkleized<DigestOf<H>>, D = Durable> =
40 super::super::db::Db<E, C, Index<T, Location>, H, Update<K, V>, N, S, D>;
41
42impl<
44 E: Storage + Clock + Metrics,
45 C: Contiguous<Item = Operation<K, V>>,
46 K: Array,
47 V: ValueEncoding,
48 H: Hasher,
49 T: Translator,
50 const N: usize,
51 S: State<DigestOf<H>>,
52 D: DurabilityState,
53 > Db<E, C, K, V, H, T, N, S, D>
54where
55 Operation<K, V>: Codec,
56 V::Value: Send + Sync,
57{
58 pub async fn get(&self, key: &K) -> Result<Option<V::Value>, Error> {
60 self.any.get(key).await
61 }
62
63 pub fn verify_key_value_proof(
66 hasher: &mut H,
67 key: K,
68 value: V::Value,
69 proof: &KeyValueProof<K, H::Digest, N>,
70 root: &H::Digest,
71 ) -> bool {
72 let op = Operation::Update(Update {
73 key,
74 value,
75 next_key: proof.next_key.clone(),
76 });
77
78 proof
79 .proof
80 .verify(hasher, Self::grafting_height(), op, root)
81 }
82
83 pub async fn get_span(&self, key: &K) -> Result<Option<(Location, Update<K, V>)>, Error> {
86 self.any.get_span(key).await
87 }
88
89 pub async fn stream_range<'a>(
92 &'a self,
93 start: K,
94 ) -> Result<impl Stream<Item = Result<(K, V::Value), Error>> + 'a, Error>
95 where
96 V: 'a,
97 {
98 self.any.stream_range(start).await
99 }
100
101 pub fn verify_exclusion_proof(
104 hasher: &mut H,
105 key: &K,
106 proof: &super::ExclusionProof<K, V, H::Digest, N>,
107 root: &H::Digest,
108 ) -> bool {
109 let (op_proof, op) = match proof {
110 super::ExclusionProof::KeyValue(op_proof, data) => {
111 if data.key == *key {
112 return false;
114 }
115 if !crate::qmdb::any::db::Db::<
116 E,
117 C,
118 Index<T, Location>,
119 H,
120 Update<K, V>,
121 S::AnyState,
122 D,
123 >::span_contains(&data.key, &data.next_key, key)
124 {
125 return false;
128 }
129
130 (op_proof, Operation::Update(data.clone()))
131 }
132 super::ExclusionProof::Commit(op_proof, metadata) => {
133 let floor_loc = op_proof.loc;
137 (
138 op_proof,
139 Operation::CommitFloor(metadata.clone(), floor_loc),
140 )
141 }
142 };
143
144 op_proof.verify(hasher, Self::grafting_height(), op, root)
145 }
146}
147
148impl<
150 E: Storage + Clock + Metrics,
151 C: MutableContiguous<Item = Operation<K, V>>,
152 K: Array,
153 V: ValueEncoding,
154 H: Hasher,
155 T: Translator,
156 const N: usize,
157 D: store::State,
158 > Db<E, C, K, V, H, T, N, Merkleized<DigestOf<H>>, D>
159where
160 Operation<K, V>: Codec,
161 V::Value: Send + Sync,
162{
163 pub async fn key_value_proof(
171 &self,
172 hasher: &mut H,
173 key: K,
174 ) -> Result<KeyValueProof<K, H::Digest, N>, Error> {
175 let op_loc = self.any.get_with_loc(&key).await?;
176 let Some((data, loc)) = op_loc else {
177 return Err(Error::KeyNotFound);
178 };
179 let height = Self::grafting_height();
180 let mmr = &self.any.log.mmr;
181 let proof =
182 OperationProof::<H::Digest, N>::new(hasher, &self.status, height, mmr, loc).await?;
183
184 Ok(KeyValueProof {
185 proof,
186 next_key: data.next_key,
187 })
188 }
189
190 pub async fn exclusion_proof(
196 &self,
197 hasher: &mut H,
198 key: &K,
199 ) -> Result<super::ExclusionProof<K, V, H::Digest, N>, Error> {
200 let height = Self::grafting_height();
201 let grafted_mmr =
202 GraftingStorage::<'_, H, _, _>::new(&self.status, &self.any.log.mmr, height);
203
204 let span = self.any.get_span(key).await?;
205 let loc = match &span {
206 Some((loc, key_data)) => {
207 if key_data.key == *key {
208 return Err(Error::KeyExists);
210 }
211 *loc
212 }
213 None => self.size().checked_sub(1).expect("db shouldn't be empty"),
214 };
215
216 let op_proof =
217 OperationProof::<H::Digest, N>::new(hasher, &self.status, height, &grafted_mmr, loc)
218 .await?;
219
220 Ok(match span {
221 Some((_, key_data)) => super::ExclusionProof::KeyValue(op_proof, key_data),
222 None => {
223 let value = match self.any.log.read(loc).await? {
224 Operation::CommitFloor(value, _) => value,
225 _ => unreachable!("last commit is not a CommitFloor operation"),
226 };
227 super::ExclusionProof::Commit(op_proof, value)
228 }
229 })
230 }
231}
232
233impl<
235 E: Storage + Clock + Metrics,
236 C: MutableContiguous<Item = Operation<K, V>>,
237 K: Array,
238 V: ValueEncoding,
239 H: Hasher,
240 T: Translator,
241 const N: usize,
242 > Db<E, C, K, V, H, T, N, Unmerkleized, NonDurable>
243where
244 Operation<K, V>: Codec,
245 V::Value: Send + Sync,
246{
247 pub async fn update(&mut self, key: K, value: V::Value) -> Result<(), Error> {
250 self.any
251 .update_with_callback(key, value, |loc| {
252 self.status.push(true);
253 if let Some(loc) = loc {
254 self.status.set_bit(*loc, false);
255 }
256 })
257 .await
258 }
259
260 pub async fn create(&mut self, key: K, value: V::Value) -> Result<bool, Error> {
264 self.any
265 .create_with_callback(key, value, |loc| {
266 self.status.push(true);
267 if let Some(loc) = loc {
268 self.status.set_bit(*loc, false);
269 }
270 })
271 .await
272 }
273
274 pub async fn delete(&mut self, key: K) -> Result<bool, Error> {
278 let mut r = false;
279 self.any
280 .delete_with_callback(key, |append, loc| {
281 if let Some(loc) = loc {
282 self.status.set_bit(*loc, false);
283 }
284 self.status.push(append);
285 r = true;
286 })
287 .await?;
288
289 Ok(r)
290 }
291}
292
293impl<
295 E: Storage + Clock + Metrics,
296 C: Contiguous<Item = Operation<K, V>>,
297 K: Array,
298 V: ValueEncoding,
299 H: Hasher,
300 T: Translator,
301 const N: usize,
302 S: State<DigestOf<H>>,
303 D: DurabilityState,
304 > kv::Gettable for Db<E, C, K, V, H, T, N, S, D>
305where
306 Operation<K, V>: Codec,
307 V::Value: Send + Sync,
308{
309 type Key = K;
310 type Value = V::Value;
311 type Error = Error;
312
313 async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
314 self.get(key).await
315 }
316}
317
318impl<
320 E: Storage + Clock + Metrics,
321 C: MutableContiguous<Item = Operation<K, V>>,
322 K: Array,
323 V: ValueEncoding,
324 H: Hasher,
325 T: Translator,
326 const N: usize,
327 > kv::Updatable for Db<E, C, K, V, H, T, N, Unmerkleized, NonDurable>
328where
329 Operation<K, V>: Codec,
330 V::Value: Send + Sync,
331{
332 async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
333 self.update(key, value).await
334 }
335}
336
337impl<
339 E: Storage + Clock + Metrics,
340 C: MutableContiguous<Item = Operation<K, V>>,
341 K: Array,
342 V: ValueEncoding,
343 H: Hasher,
344 T: Translator,
345 const N: usize,
346 > kv::Deletable for Db<E, C, K, V, H, T, N, Unmerkleized, NonDurable>
347where
348 Operation<K, V>: Codec,
349 V::Value: Send + Sync,
350{
351 async fn delete(&mut self, key: Self::Key) -> Result<bool, Self::Error> {
352 self.delete(key).await
353 }
354}
355
356impl<E, C, K, V, T, H, const N: usize> Batchable
358 for Db<E, C, K, V, H, T, N, Unmerkleized, NonDurable>
359where
360 E: Storage + Clock + Metrics,
361 C: MutableContiguous<Item = Operation<K, V>>,
362 K: Array,
363 V: ValueEncoding,
364 T: Translator,
365 H: Hasher,
366 Operation<K, V>: Codec,
367 V::Value: Send + Sync,
368{
369 async fn write_batch<'a, Iter>(&'a mut self, iter: Iter) -> Result<(), Error>
370 where
371 Iter: Iterator<Item = (K, Option<V::Value>)> + Send + 'a,
372 {
373 let status = &mut self.status;
374 self.any
375 .write_batch_with_callback(iter, move |append: bool, loc: Option<Location>| {
376 status.push(append);
377 if let Some(loc) = loc {
378 status.set_bit(*loc, false);
379 }
380 })
381 .await
382 }
383}