commonware_storage/ordinal/storage.rs

use super::{Config, Error};
use crate::{kv, rmap::RMap, Persistable};
use commonware_codec::{
    CodecFixed, CodecFixedShared, Encode, FixedSize, Read, ReadExt, Write as CodecWrite,
};
use commonware_cryptography::{crc32, Crc32};
use commonware_runtime::{
    buffer::{Read as ReadBuffer, Write},
    Blob, Buf, BufMut, Clock, Error as RError, IoBufMut, Metrics, Storage,
};
use commonware_utils::{bitmap::BitMap, hex};
use futures::future::try_join_all;
use prometheus_client::metrics::counter::Counter;
use std::{
    collections::{btree_map::Entry, BTreeMap, BTreeSet},
    marker::PhantomData,
};
use tracing::{debug, warn};

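/// A stored value paired with the CRC32 of its encoding.
///
/// The checksum lets replay distinguish a fully written record from a slot
/// that was never written, or was only partially written before a crash.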
#[derive(Debug, Clone)]
struct Record<V: CodecFixed<Cfg = ()>> {
    value: V,
    crc: u32,
}

impl<V: CodecFixed<Cfg = ()>> Record<V> {
    fn new(value: V) -> Self {
        let crc = Crc32::checksum(&value.encode());
        Self { value, crc }
    }

    fn is_valid(&self) -> bool {
        self.crc == Crc32::checksum(&self.value.encode())
    }
}

impl<V: CodecFixed<Cfg = ()>> FixedSize for Record<V> {
    const SIZE: usize = V::SIZE + crc32::Digest::SIZE;
}

impl<V: CodecFixed<Cfg = ()>> CodecWrite for Record<V> {
    fn write(&self, buf: &mut impl BufMut) {
        self.value.write(buf);
        self.crc.write(buf);
    }
}

impl<V: CodecFixed<Cfg = ()>> Read for Record<V> {
    type Cfg = ();

    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        let value = V::read(buf)?;
        let crc = u32::read(buf)?;

        Ok(Self { value, crc })
    }
}

#[cfg(feature = "arbitrary")]
impl<V: CodecFixed<Cfg = ()>> arbitrary::Arbitrary<'_> for Record<V>
where
    V: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let value = V::arbitrary(u)?;
        Ok(Self::new(value))
    }
}

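/// A blob-backed index mapping `u64` ordinals to fixed-size values.
///
/// Each index lands in section blob `index / items_per_blob` at byte offset
/// `(index % items_per_blob) * Record::<V>::SIZE` (see `put`). As a worked
/// example (values assumed for illustration): with `items_per_blob = 4` and a
/// 12-byte record, index 10 lands in section 2 at offset 24.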
pub struct Ordinal<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> {
    /// Runtime context used for storage and metrics.
    context: E,
    /// Static configuration.
    config: Config,

    /// Open section blobs, keyed by section number.
    blobs: BTreeMap<u64, Write<E::Blob>>,

    /// Contiguous ranges of populated indices.
    intervals: RMap,

    /// Sections with writes that have not yet been synced.
    pending: BTreeSet<u64>,

    /// Metrics.
    puts: Counter,
    gets: Counter,
    has: Counter,
    syncs: Counter,
    pruned: Counter,

    _phantom: PhantomData<V>,
}

impl<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> Ordinal<E, V> {
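    /// Initializes the index, replaying any blobs already present in
    /// `config.partition` to rebuild the in-memory intervals.
    ///
    /// A minimal usage sketch (ignored doctest: `context` and `config`
    /// construction are assumed, not shown in this file):
    ///
    /// ```ignore
    /// let mut ordinal = Ordinal::<_, u64>::init(context, config).await?;
    /// ordinal.put(0, 42u64).await?;
    /// ordinal.sync().await?;
    /// assert_eq!(ordinal.get(0).await?, Some(42u64));
    /// ```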
    pub async fn init(context: E, config: Config) -> Result<Self, Error> {
        Self::init_with_bits(context, config, None).await
    }

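    /// Like [`Self::init`], but optionally guided by per-section bitmaps:
    /// sections absent from `bits` are skipped entirely; for a section mapped
    /// to `Some(bitmap)`, only slots whose bit is set are replayed, and each
    /// such slot must hold a valid record or [`Error::MissingRecord`] is
    /// returned. A section mapped to `None` is fully replayed with the same
    /// must-exist requirement.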
    pub async fn init_with_bits(
        context: E,
        config: Config,
        bits: Option<BTreeMap<u64, &Option<BitMap>>>,
    ) -> Result<Self, Error> {
        // Scan the partition for existing blobs.
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&config.partition).await {
            Ok(blobs) => blobs,
            Err(commonware_runtime::Error::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };

        // Open all blobs, truncating any trailing partial record.
        for name in stored_blobs {
            let (blob, mut len) = context.open(&config.partition, &name).await?;
            let index = match name.try_into() {
                Ok(index) => u64::from_be_bytes(index),
                Err(nm) => Err(Error::InvalidBlobName(hex(&nm)))?,
            };

            let record_size = Record::<V>::SIZE as u64;
            if len % record_size != 0 {
                warn!(
                    blob = index,
                    invalid_size = len,
                    record_size,
                    "blob size is not a multiple of record size, truncating"
                );
                len -= len % record_size;
                blob.resize(len).await?;
                blob.sync().await?;
            }

            debug!(blob = index, len, "found index blob");
            let wrapped_blob = Write::new(blob, len, config.write_buffer);
            blobs.insert(index, wrapped_blob);
        }

        debug!(
            blobs = blobs.len(),
            "rebuilding intervals from existing index"
        );
        let start = context.current();
        let mut items = 0;
        let mut intervals = RMap::new();
        for (section, blob) in &blobs {
            // If bitmaps were provided, skip sections without one.
            if let Some(bits) = &bits {
                if !bits.contains_key(section) {
                    warn!(section, "skipping section without bits");
                    continue;
                }
            }

            let size = blob.size().await;
            let mut replay_blob = ReadBuffer::new(blob.clone(), size, config.replay_buffer);

            let mut offset = 0;
            let items_per_blob = config.items_per_blob.get();
            while offset < size {
                let index = section * items_per_blob + (offset / Record::<V>::SIZE as u64);

                // Determine whether this slot must contain a valid record.
                let mut must_exist = false;
                if let Some(bits) = &bits {
                    let bits = bits.get(section).unwrap();
                    if let Some(bits) = bits {
                        // Skip slots whose bit is unset.
                        let bit_index = offset as usize / Record::<V>::SIZE;
                        if !bits.get(bit_index as u64) {
                            offset += Record::<V>::SIZE as u64;
                            continue;
                        }
                    }

                    must_exist = true;
                }

                // Read the record at the current offset.
                replay_blob.seek_to(offset)?;
                let mut record_buf = vec![0u8; Record::<V>::SIZE];
                replay_blob
                    .read_exact(&mut record_buf, Record::<V>::SIZE)
                    .await?;
                offset += Record::<V>::SIZE as u64;

                // Track the index if the record decodes and its CRC matches.
                if let Ok(record) = Record::<V>::read(&mut record_buf.as_slice()) {
                    if record.is_valid() {
                        items += 1;
                        intervals.insert(index);
                        continue;
                    }
                };

                // A slot the bitmap claims exists must hold a valid record.
                if must_exist {
                    return Err(Error::MissingRecord(index));
                }
            }
        }
        debug!(
            items,
            elapsed = ?context.current().duration_since(start).unwrap_or_default(),
            "rebuilt intervals"
        );

        // Initialize metrics.
        let puts = Counter::default();
        let gets = Counter::default();
        let has = Counter::default();
        let syncs = Counter::default();
        let pruned = Counter::default();
        context.register("puts", "Number of put calls", puts.clone());
        context.register("gets", "Number of get calls", gets.clone());
        context.register("has", "Number of has calls", has.clone());
        context.register("syncs", "Number of sync calls", syncs.clone());
        context.register("pruned", "Number of pruned blobs", pruned.clone());

        Ok(Self {
            context,
            config,
            blobs,
            intervals,
            pending: BTreeSet::new(),
            puts,
            gets,
            has,
            syncs,
            pruned,
            _phantom: PhantomData,
        })
    }

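    /// Writes `value` at `index`. The write is buffered: it is not durable
    /// until [`Self::sync`] (or [`Persistable::commit`]) is called.
    ///
    /// A minimal sketch (ignored doctest, assuming an initialized `ordinal`
    /// over `u64` values):
    ///
    /// ```ignore
    /// ordinal.put(42, 7u64).await?;
    /// ordinal.sync().await?; // flush the dirty section to disk
    /// ```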
    pub async fn put(&mut self, index: u64, value: V) -> Result<(), Error> {
        self.puts.inc();

        // Open the section blob, creating it if it does not yet exist.
        let items_per_blob = self.config.items_per_blob.get();
        let section = index / items_per_blob;
        if let Entry::Vacant(entry) = self.blobs.entry(section) {
            let (blob, len) = self
                .context
                .open(&self.config.partition, &section.to_be_bytes())
                .await?;
            entry.insert(Write::new(blob, len, self.config.write_buffer));
            debug!(section, "created blob");
        }

        // Write the record at its slot and mark the section dirty.
        let blob = self.blobs.get(&section).unwrap();
        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
        let record = Record::new(value);
        blob.write_at(offset, record.encode_mut()).await?;
        self.pending.insert(section);

        self.intervals.insert(index);

        Ok(())
    }

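    /// Returns the value at `index`: `Ok(None)` if it was never written,
    /// `Err(Error::InvalidRecord)` if the stored bytes fail the CRC check.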
    pub async fn get(&self, index: u64) -> Result<Option<V>, Error> {
        self.gets.inc();

        // Consult the in-memory intervals before touching disk.
        if self.intervals.get(&index).is_none() {
            return Ok(None);
        }

        // Read and verify the record.
        let items_per_blob = self.config.items_per_blob.get();
        let section = index / items_per_blob;
        let blob = self.blobs.get(&section).unwrap();
        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
        let read_buf = blob
            .read_at(offset, IoBufMut::zeroed(Record::<V>::SIZE))
            .await?
            .coalesce();
        let record = Record::<V>::read(&mut read_buf.as_ref())?;

        if record.is_valid() {
            Ok(Some(record.value))
        } else {
            Err(Error::InvalidRecord(index))
        }
    }

    /// Returns whether `index` has been written.
    pub fn has(&self, index: u64) -> bool {
        self.has.inc();

        self.intervals.get(&index).is_some()
    }

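    /// Reports, for `index`, the end of its containing range and the start of
    /// the next range, delegating to [`RMap::next_gap`].
    ///
    /// A minimal sketch (ignored doctest, assuming indices 0..=2 and 5..=6
    /// were written and that `RMap::next_gap` reports
    /// `(current range end, next range start)`):
    ///
    /// ```ignore
    /// assert_eq!(ordinal.next_gap(1), (Some(2), Some(5)));
    /// assert_eq!(ordinal.next_gap(3), (None, Some(5)));
    /// ```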
    pub fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.intervals.next_gap(index)
    }

    /// Returns an iterator over all contiguous `(start, end)` ranges of
    /// populated indices.
    pub fn ranges(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.intervals.iter().map(|(&s, &e)| (s, e))
    }

    /// Returns the smallest populated index, if any.
    pub fn first_index(&self) -> Option<u64> {
        self.intervals.first_index()
    }

    /// Returns the largest populated index, if any.
    pub fn last_index(&self) -> Option<u64> {
        self.intervals.last_index()
    }

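    /// Returns up to `max` missing indices at or after `start`, delegating to
    /// [`RMap::missing_items`].
    ///
    /// A minimal sketch (ignored doctest, assuming indices 0..=2 and 5..=6
    /// were written):
    ///
    /// ```ignore
    /// assert_eq!(ordinal.missing_items(0, 2), vec![3, 4]);
    /// ```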
    pub fn missing_items(&self, start: u64, max: usize) -> Vec<u64> {
        self.intervals.missing_items(start, max)
    }

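    /// Removes all blobs whose entire index range lies below `min`. Pruning
    /// is section-aligned: indices below `min` that share a section with
    /// `min` are retained.
    ///
    /// A minimal sketch (ignored doctest, assuming `items_per_blob = 4`):
    ///
    /// ```ignore
    /// // Only section 0 (indices 0..=3) is removed; indices 4 and 5 survive
    /// // because they share section 1 with index 6.
    /// ordinal.prune(6).await?;
    /// ```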
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Collect all sections strictly below the section containing `min`.
        let items_per_blob = self.config.items_per_blob.get();
        let min_section = min / items_per_blob;
        let sections_to_remove: Vec<u64> = self
            .blobs
            .keys()
            .filter(|&&section| section < min_section)
            .copied()
            .collect();

        // Remove each blob and the interval it covered.
        for section in sections_to_remove {
            if let Some(blob) = self.blobs.remove(&section) {
                drop(blob);
                self.context
                    .remove(&self.config.partition, Some(&section.to_be_bytes()))
                    .await?;

                let start_index = section * items_per_blob;
                let end_index = (section + 1) * items_per_blob - 1;
                self.intervals.remove(start_index, end_index);
                debug!(section, start_index, end_index, "pruned blob");
            }

            self.pruned.inc();
        }

        // Pruned sections no longer need to be synced.
        self.pending.retain(|&section| section >= min_section);

        Ok(())
    }

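    /// Syncs every section with buffered writes, making all prior
    /// [`Self::put`] calls durable.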
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.syncs.inc();

        // Sync every dirty section concurrently.
        let mut futures = Vec::with_capacity(self.pending.len());
        for &section in &self.pending {
            futures.push(self.blobs.get(&section).unwrap().sync());
        }
        try_join_all(futures).await?;

        self.pending.clear();

        Ok(())
    }

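    /// Removes every blob and then the partition itself, consuming the index.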
    pub async fn destroy(self) -> Result<(), Error> {
        // Remove every blob, then the partition.
        for (i, blob) in self.blobs.into_iter() {
            drop(blob);
            self.context
                .remove(&self.config.partition, Some(&i.to_be_bytes()))
                .await?;
            debug!(section = i, "destroyed blob");
        }
        match self.context.remove(&self.config.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
                // The partition was already removed (or never created).
            }
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}

impl<E: Storage + Metrics + Clock, V: CodecFixedShared> kv::Gettable for Ordinal<E, V> {
    type Key = u64;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(*key).await
    }
}

impl<E: Storage + Metrics + Clock, V: CodecFixedShared> kv::Updatable for Ordinal<E, V> {
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        self.put(key, value).await
    }
}

impl<E: Storage + Metrics + Clock, V: CodecFixedShared> Persistable for Ordinal<E, V> {
    type Error = Error;

    async fn commit(&mut self) -> Result<(), Self::Error> {
        self.sync().await
    }

    async fn sync(&mut self) -> Result<(), Self::Error> {
        self.sync().await
    }

    async fn destroy(self) -> Result<(), Self::Error> {
        self.destroy().await
    }
}

#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;

    commonware_conformance::conformance_tests! {
        CodecConformance<Record<u32>>
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv::tests::{assert_gettable, assert_send, assert_updatable};
    use commonware_runtime::deterministic::Context;

    type TestOrdinal = Ordinal<Context, u64>;

    #[allow(dead_code)]
    fn assert_ordinal_futures_are_send(ordinal: &mut TestOrdinal, key: u64) {
        assert_gettable(ordinal, &key);
        assert_updatable(ordinal, key, 0u64);
    }

    #[allow(dead_code)]
    fn assert_ordinal_destroy_is_send(ordinal: TestOrdinal) {
        assert_send(ordinal.destroy());
    }
}