commonware_storage/ordinal/storage.rs

use super::{Config, Error};
use crate::{kv, rmap::RMap, Persistable};
use bytes::{Buf, BufMut};
use commonware_codec::{
    CodecFixed, CodecFixedShared, Encode, FixedSize, Read, ReadExt, Write as CodecWrite,
};
use commonware_cryptography::{crc32, Crc32};
use commonware_runtime::{
    buffer::{Read as ReadBuffer, Write},
    Blob, Clock, Error as RError, Metrics, Storage,
};
use commonware_utils::{bitmap::BitMap, hex};
use futures::future::try_join_all;
use prometheus_client::metrics::counter::Counter;
use std::{
    collections::{btree_map::Entry, BTreeMap, BTreeSet},
    marker::PhantomData,
};
use tracing::{debug, warn};

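/// A stored value paired with the CRC32 checksum of its encoding, used to
/// detect corrupted or partially written records during replay and reads.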
#[derive(Debug, Clone)]
struct Record<V: CodecFixed<Cfg = ()>> {
    value: V,
    crc: u32,
}

impl<V: CodecFixed<Cfg = ()>> Record<V> {
    fn new(value: V) -> Self {
        let crc = Crc32::checksum(&value.encode());
        Self { value, crc }
    }

    fn is_valid(&self) -> bool {
        self.crc == Crc32::checksum(&self.value.encode())
    }
}

impl<V: CodecFixed<Cfg = ()>> FixedSize for Record<V> {
    const SIZE: usize = V::SIZE + crc32::Digest::SIZE;
}

impl<V: CodecFixed<Cfg = ()>> CodecWrite for Record<V> {
    fn write(&self, buf: &mut impl BufMut) {
        self.value.write(buf);
        self.crc.write(buf);
    }
}

impl<V: CodecFixed<Cfg = ()>> Read for Record<V> {
    type Cfg = ();

    fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
        let value = V::read(buf)?;
        let crc = u32::read(buf)?;

        Ok(Self { value, crc })
    }
}

#[cfg(feature = "arbitrary")]
impl<V: CodecFixed<Cfg = ()>> arbitrary::Arbitrary<'_> for Record<V>
where
    V: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let value = V::arbitrary(u)?;
        Ok(Self::new(value))
    }
}

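/// A store of fixed-size values keyed by a `u64` ordinal, sharded across
/// blobs of `items_per_blob` records and tracked with an interval map so
/// gaps can be queried.
///
/// # Example
///
/// A minimal usage sketch (runtime setup and `Config` construction are
/// elided and assumed to follow the host runtime's conventions):
///
/// ```ignore
/// let mut ordinal = Ordinal::<_, u64>::init(context, config).await?;
/// ordinal.put(42, 7u64).await?;
/// ordinal.sync().await?;
/// assert_eq!(ordinal.get(42).await?, Some(7));
/// assert!(ordinal.has(42));
/// ```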
pub struct Ordinal<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> {
    /// Runtime context used for storage access and metrics registration.
    context: E,
    /// Storage configuration.
    config: Config,

    /// Open blobs, keyed by section (`index / items_per_blob`).
    blobs: BTreeMap<u64, Write<E::Blob>>,

    /// Intervals of indices known to contain valid records.
    intervals: RMap,

    /// Sections with writes that have not yet been synced.
    pending: BTreeSet<u64>,

    // Metrics (registered on `context`).
    puts: Counter,
    gets: Counter,
    has: Counter,
    syncs: Counter,
    pruned: Counter,

    _phantom: PhantomData<V>,
}

impl<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> Ordinal<E, V> {
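    /// Initializes the store, replaying any existing blobs in the partition
    /// to rebuild the in-memory interval map.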
    pub async fn init(context: E, config: Config) -> Result<Self, Error> {
        Self::init_with_bits(context, config, None).await
    }

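    /// Initializes the store like [`Self::init`], but with optional
    /// per-section bitmaps describing which records are expected to exist.
    ///
    /// During replay, sections without an entry in `bits` are skipped
    /// entirely. Within a section, records whose bit is unset are skipped
    /// without validation; every remaining record must parse and pass its
    /// checksum, otherwise initialization fails with
    /// [`Error::MissingRecord`].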
    pub async fn init_with_bits(
        context: E,
        config: Config,
        bits: Option<BTreeMap<u64, &Option<BitMap>>>,
    ) -> Result<Self, Error> {
        let mut blobs = BTreeMap::new();

        // Scan the partition for existing blobs.
        let stored_blobs = match context.scan(&config.partition).await {
            Ok(blobs) => blobs,
            Err(commonware_runtime::Error::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };

        for name in stored_blobs {
            // Open the blob and parse its name as a big-endian section number.
            let (blob, mut len) = context.open(&config.partition, &name).await?;
            let index = match name.try_into() {
                Ok(index) => u64::from_be_bytes(index),
                Err(nm) => Err(Error::InvalidBlobName(hex(&nm)))?,
            };

            // Truncate the blob if it ends with a partial record.
            let record_size = Record::<V>::SIZE as u64;
            if len % record_size != 0 {
                warn!(
                    blob = index,
                    invalid_size = len,
                    record_size,
                    "blob size is not a multiple of record size, truncating"
                );
                len -= len % record_size;
                blob.resize(len).await?;
                blob.sync().await?;
            }

            debug!(blob = index, len, "found index blob");
            let wrapped_blob = Write::new(blob, len, config.write_buffer);
            blobs.insert(index, wrapped_blob);
        }

        debug!(
            blobs = blobs.len(),
            "rebuilding intervals from existing index"
        );
        let start = context.current();
        let mut items = 0;
        let mut intervals = RMap::new();
        for (section, blob) in &blobs {
            // Skip sections that have no corresponding bits.
            if let Some(bits) = &bits {
                if !bits.contains_key(section) {
                    warn!(section, "skipping section without bits");
                    continue;
                }
            }

            let size = blob.size().await;
            let mut replay_blob = ReadBuffer::new(blob.clone(), size, config.replay_buffer);

            let mut offset = 0;
            let items_per_blob = config.items_per_blob.get();
            while offset < size {
                // Compute the index of the record at this offset.
                let index = section * items_per_blob + (offset / Record::<V>::SIZE as u64);

                // Determine whether this record is required to exist.
                let mut must_exist = false;
                if let Some(bits) = &bits {
                    let bits = bits.get(section).unwrap();
                    if let Some(bits) = bits {
                        let bit_index = offset as usize / Record::<V>::SIZE;
                        if !bits.get(bit_index as u64) {
                            offset += Record::<V>::SIZE as u64;
                            continue;
                        }
                    }

                    must_exist = true;
                }

                // Read the record at the current offset.
                replay_blob.seek_to(offset)?;
                let mut record_buf = vec![0u8; Record::<V>::SIZE];
                replay_blob
                    .read_exact(&mut record_buf, Record::<V>::SIZE)
                    .await?;
                offset += Record::<V>::SIZE as u64;

                // If the record parses and its checksum matches, mark it present.
                if let Ok(record) = Record::<V>::read(&mut record_buf.as_slice()) {
                    if record.is_valid() {
                        items += 1;
                        intervals.insert(index);
                        continue;
                    }
                };

                // A record required by the bitmap failed to parse or validate.
                if must_exist {
                    return Err(Error::MissingRecord(index));
                }
            }
        }
        debug!(
            items,
            elapsed = ?context.current().duration_since(start).unwrap_or_default(),
            "rebuilt intervals"
        );

        // Initialize metrics.
        let puts = Counter::default();
        let gets = Counter::default();
        let has = Counter::default();
        let syncs = Counter::default();
        let pruned = Counter::default();
        context.register("puts", "Number of put calls", puts.clone());
        context.register("gets", "Number of get calls", gets.clone());
        context.register("has", "Number of has calls", has.clone());
        context.register("syncs", "Number of sync calls", syncs.clone());
        context.register("pruned", "Number of pruned blobs", pruned.clone());

        Ok(Self {
            context,
            config,
            blobs,
            intervals,
            pending: BTreeSet::new(),
            puts,
            gets,
            has,
            syncs,
            pruned,
            _phantom: PhantomData,
        })
    }

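    /// Writes `value` at `index`, creating the blob for its section if one
    /// does not yet exist. Writes are buffered; call [`Self::sync`] to make
    /// them durable.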
    pub async fn put(&mut self, index: u64, value: V) -> Result<(), Error> {
        self.puts.inc();

        // Get or create the blob for this index's section.
        let items_per_blob = self.config.items_per_blob.get();
        let section = index / items_per_blob;
        if let Entry::Vacant(entry) = self.blobs.entry(section) {
            let (blob, len) = self
                .context
                .open(&self.config.partition, &section.to_be_bytes())
                .await?;
            entry.insert(Write::new(blob, len, self.config.write_buffer));
            debug!(section, "created blob");
        }

        // Write the record at its offset within the section.
        let blob = self.blobs.get(&section).unwrap();
        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
        let record = Record::new(value);
        blob.write_at(record.encode(), offset).await?;
        self.pending.insert(section);

        // Update the interval map.
        self.intervals.insert(index);

        Ok(())
    }

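    /// Returns the value at `index`, or `None` if no record is present.
    /// Returns [`Error::InvalidRecord`] if the stored record fails its
    /// checksum.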
    pub async fn get(&self, index: u64) -> Result<Option<V>, Error> {
        self.gets.inc();

        // Check the interval map before touching disk.
        if self.intervals.get(&index).is_none() {
            return Ok(None);
        }

        // Read the record from its section blob.
        let items_per_blob = self.config.items_per_blob.get();
        let section = index / items_per_blob;
        let blob = self.blobs.get(&section).unwrap();
        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
        let read_buf = vec![0u8; Record::<V>::SIZE];
        let read_buf = blob.read_at(read_buf, offset).await?;
        let record = Record::<V>::read(&mut read_buf.as_ref())?;

        // Verify the checksum before returning the value.
        if record.is_valid() {
            Ok(Some(record.value))
        } else {
            Err(Error::InvalidRecord(index))
        }
    }

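    /// Returns whether a record exists at `index`.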
    pub fn has(&self, index: u64) -> bool {
        self.has.inc();

        self.intervals.get(&index).is_some()
    }

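    /// Returns the gap boundaries around `index` in the interval map (see
    /// [`RMap::next_gap`]).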
    pub fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.intervals.next_gap(index)
    }

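    /// Returns an iterator over all `(start, end)` ranges of contiguous
    /// indices.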
    pub fn ranges(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.intervals.iter().map(|(&s, &e)| (s, e))
    }

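    /// Returns the lowest index present in the store, if any.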
    pub fn first_index(&self) -> Option<u64> {
        self.intervals.first_index()
    }

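    /// Returns the highest index present in the store, if any.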
    pub fn last_index(&self) -> Option<u64> {
        self.intervals.last_index()
    }

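    /// Returns up to `max` missing indices, scanning from `start`.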
    pub fn missing_items(&self, start: u64, max: usize) -> Vec<u64> {
        self.intervals.missing_items(start, max)
    }

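    /// Removes all blobs in sections strictly below the section containing
    /// `min`. Indices within `min`'s own section are retained.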
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Collect all sections strictly below min's section.
        let items_per_blob = self.config.items_per_blob.get();
        let min_section = min / items_per_blob;
        let sections_to_remove: Vec<u64> = self
            .blobs
            .keys()
            .filter(|&&section| section < min_section)
            .copied()
            .collect();

        for section in sections_to_remove {
            // Remove the blob and its indices from the interval map.
            if let Some(blob) = self.blobs.remove(&section) {
                drop(blob);
                self.context
                    .remove(&self.config.partition, Some(&section.to_be_bytes()))
                    .await?;

                let start_index = section * items_per_blob;
                let end_index = (section + 1) * items_per_blob - 1;
                self.intervals.remove(start_index, end_index);
                debug!(section, start_index, end_index, "pruned blob");
            }

            self.pruned.inc();
        }

        // Drop any pending syncs for pruned sections.
        self.pending.retain(|&section| section >= min_section);

        Ok(())
    }

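    /// Syncs all sections with pending writes, awaiting their blobs
    /// concurrently.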
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.syncs.inc();

        // Sync all pending sections concurrently.
        let mut futures = Vec::with_capacity(self.pending.len());
        for &section in &self.pending {
            futures.push(self.blobs.get(&section).unwrap().sync());
        }
        try_join_all(futures).await?;

        self.pending.clear();

        Ok(())
    }

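    /// Removes all blobs and the underlying partition, consuming the store.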
    pub async fn destroy(self) -> Result<(), Error> {
        // Remove every blob in the partition.
        for (i, blob) in self.blobs.into_iter() {
            drop(blob);
            self.context
                .remove(&self.config.partition, Some(&i.to_be_bytes()))
                .await?;
            debug!(section = i, "destroyed blob");
        }
        match self.context.remove(&self.config.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
                // The partition was never created (no blobs were written).
            }
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}

impl<E: Storage + Metrics + Clock, V: CodecFixedShared> kv::Gettable for Ordinal<E, V> {
    type Key = u64;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(*key).await
    }
}

impl<E: Storage + Metrics + Clock, V: CodecFixedShared> kv::Updatable for Ordinal<E, V> {
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        self.put(key, value).await
    }
}

impl<E: Storage + Metrics + Clock, V: CodecFixedShared> Persistable for Ordinal<E, V> {
    type Error = Error;

    async fn commit(&mut self) -> Result<(), Self::Error> {
        self.sync().await
    }

    async fn sync(&mut self) -> Result<(), Self::Error> {
        self.sync().await
    }

    async fn destroy(self) -> Result<(), Self::Error> {
        self.destroy().await
    }
}

#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;

    commonware_conformance::conformance_tests! {
        CodecConformance<Record<u32>>
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv::tests::{assert_gettable, assert_send, assert_updatable};
    use commonware_runtime::deterministic::Context;

    type TestOrdinal = Ordinal<Context, u64>;

    #[allow(dead_code)]
    fn assert_ordinal_futures_are_send(ordinal: &mut TestOrdinal, key: u64) {
        assert_gettable(ordinal, &key);
        assert_updatable(ordinal, key, 0u64);
    }

    #[allow(dead_code)]
    fn assert_ordinal_destroy_is_send(ordinal: TestOrdinal) {
        assert_send(ordinal.destroy());
    }
}