commonware_storage/ordinal/
storage.rs1use super::{Config, Error};
2use crate::{rmap::RMap, Context, Persistable};
3use commonware_codec::{
4 CodecFixed, CodecFixedShared, Encode, FixedSize, Read, ReadExt, Write as CodecWrite,
5};
6use commonware_cryptography::{crc32, Crc32};
7use commonware_formatting::hex;
8use commonware_runtime::{
9 buffer::{Read as ReadBuffer, Write},
10 telemetry::metrics::{Counter, MetricsExt as _},
11 Blob, Buf, BufMut, BufferPooler, Error as RError,
12};
13use commonware_utils::{bitmap::BitMap, sync::AsyncMutex};
14use futures::future::try_join_all;
15use std::{
16 collections::{btree_map::Entry, BTreeMap, BTreeSet},
17 marker::PhantomData,
18};
19use tracing::{debug, warn};
20
21#[derive(Debug, Clone)]
23struct Record<V: CodecFixed<Cfg = ()>> {
24 value: V,
25 crc: u32,
26}
27
28impl<V: CodecFixed<Cfg = ()>> Record<V> {
29 fn new(value: V) -> Self {
30 let crc = Crc32::checksum(&value.encode());
31 Self { value, crc }
32 }
33
34 fn is_valid(&self) -> bool {
35 self.crc == Crc32::checksum(&self.value.encode())
36 }
37}
38
39impl<V: CodecFixed<Cfg = ()>> FixedSize for Record<V> {
40 const SIZE: usize = V::SIZE + crc32::Digest::SIZE;
41}
42
43impl<V: CodecFixed<Cfg = ()>> CodecWrite for Record<V> {
44 fn write(&self, buf: &mut impl BufMut) {
45 self.value.write(buf);
46 self.crc.write(buf);
47 }
48}
49
50impl<V: CodecFixed<Cfg = ()>> Read for Record<V> {
51 type Cfg = ();
52
53 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
54 let value = V::read(buf)?;
55 let crc = u32::read(buf)?;
56
57 Ok(Self { value, crc })
58 }
59}
60
#[cfg(feature = "arbitrary")]
impl<V: CodecFixed<Cfg = ()>> arbitrary::Arbitrary<'_> for Record<V>
where
    V: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates an arbitrary value and wraps it via [`Record::new`], so the produced
    /// record always carries a matching checksum.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        V::arbitrary(u).map(Self::new)
    }
}
71
72pub struct Ordinal<E: BufferPooler + Context, V: CodecFixed<Cfg = ()>> {
74 context: E,
76 config: Config,
77
78 blobs: BTreeMap<u64, Write<E::Blob>>,
80
81 intervals: RMap,
83
84 pending: AsyncMutex<BTreeSet<u64>>,
88
89 puts: Counter,
91 gets: Counter,
92 has: Counter,
93 syncs: Counter,
94 pruned: Counter,
95
96 _phantom: PhantomData<V>,
97}
98
99impl<E: BufferPooler + Context, V: CodecFixed<Cfg = ()>> Ordinal<E, V> {
100 pub async fn init(context: E, config: Config) -> Result<Self, Error> {
102 Self::init_with_bits(context, config, None).await
103 }
104
105 pub async fn init_with_bits(
114 context: E,
115 config: Config,
116 bits: Option<BTreeMap<u64, &Option<BitMap>>>,
117 ) -> Result<Self, Error> {
118 let mut blobs = BTreeMap::new();
120 let stored_blobs = match context.scan(&config.partition).await {
121 Ok(blobs) => blobs,
122 Err(commonware_runtime::Error::PartitionMissing(_)) => Vec::new(),
123 Err(err) => return Err(Error::Runtime(err)),
124 };
125
126 for name in stored_blobs {
128 let (blob, mut len) = context.open(&config.partition, &name).await?;
129 let index = match name.try_into() {
130 Ok(index) => u64::from_be_bytes(index),
131 Err(nm) => Err(Error::InvalidBlobName(hex(&nm)))?,
132 };
133
134 let record_size = Record::<V>::SIZE as u64;
136 if len % record_size != 0 {
137 warn!(
138 blob = index,
139 invalid_size = len,
140 record_size,
141 "blob size is not a multiple of record size, truncating"
142 );
143 len -= len % record_size;
144 blob.resize(len).await?;
145 blob.sync().await?;
146 }
147
148 debug!(blob = index, len, "found index blob");
149 blobs.insert(index, (blob, len));
150 }
151
152 debug!(
154 blobs = blobs.len(),
155 "rebuilding intervals from existing index"
156 );
157 let start = context.current();
158 let mut items = 0;
159 let mut intervals = RMap::new();
160 for (section, (blob, size)) in &blobs {
161 if let Some(bits) = &bits {
163 if !bits.contains_key(section) {
164 warn!(section, "skipping section without bits");
165 continue;
166 }
167 }
168
169 let mut replay_blob =
171 ReadBuffer::from_pooler(&context, blob.clone(), *size, config.replay_buffer);
172
173 let mut offset = 0;
175 let items_per_blob = config.items_per_blob.get();
176 while offset < *size {
177 let index = section * items_per_blob + (offset / Record::<V>::SIZE as u64);
179
180 let mut must_exist = false;
182 if let Some(bits) = &bits {
183 let bits = bits.get(section).unwrap();
185 if let Some(bits) = bits {
186 let bit_index = offset as usize / Record::<V>::SIZE;
187 if !bits.get(bit_index as u64) {
188 offset += Record::<V>::SIZE as u64;
189 continue;
190 }
191 }
192
193 must_exist = true;
195 }
196
197 replay_blob.seek_to(offset)?;
199 let mut record_buf = replay_blob.read(Record::<V>::SIZE).await?;
200 offset += Record::<V>::SIZE as u64;
201
202 if let Ok(record) = Record::<V>::read(&mut record_buf) {
204 if record.is_valid() {
205 items += 1;
206 intervals.insert(index);
207 continue;
208 }
209 };
210
211 if must_exist {
214 return Err(Error::MissingRecord(index));
215 }
216 }
217 }
218 debug!(
219 items,
220 elapsed = ?context.current().duration_since(start).unwrap_or_default(),
221 "rebuilt intervals"
222 );
223
224 let blobs = blobs
226 .into_iter()
227 .map(|(index, (blob, len))| {
228 (
229 index,
230 Write::from_pooler(&context, blob, len, config.write_buffer),
231 )
232 })
233 .collect();
234
235 let puts = context.counter("puts", "Number of put calls");
237 let gets = context.counter("gets", "Number of get calls");
238 let has = context.counter("has", "Number of has calls");
239 let syncs = context.counter("syncs", "Number of sync calls");
240 let pruned = context.counter("pruned", "Number of pruned blobs");
241
242 Ok(Self {
243 context,
244 config,
245 blobs,
246 intervals,
247 pending: AsyncMutex::new(BTreeSet::new()),
248 puts,
249 gets,
250 has,
251 syncs,
252 pruned,
253 _phantom: PhantomData,
254 })
255 }
256
257 pub async fn put(&mut self, index: u64, value: V) -> Result<(), Error> {
259 self.puts.inc();
260
261 let items_per_blob = self.config.items_per_blob.get();
263 let section = index / items_per_blob;
264 if let Entry::Vacant(entry) = self.blobs.entry(section) {
265 let (blob, len) = self
266 .context
267 .open(&self.config.partition, §ion.to_be_bytes())
268 .await?;
269 entry.insert(Write::from_pooler(
270 &self.context,
271 blob,
272 len,
273 self.config.write_buffer,
274 ));
275 debug!(section, "created blob");
276 }
277
278 let blob = self.blobs.get(§ion).unwrap();
280 let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
281 let record = Record::new(value);
282 blob.write_at(offset, record.encode_mut()).await?;
283 self.pending.lock().await.insert(section);
284
285 self.intervals.insert(index);
287
288 Ok(())
289 }
290
291 pub async fn get(&self, index: u64) -> Result<Option<V>, Error> {
293 self.gets.inc();
294
295 if self.intervals.get(&index).is_none() {
297 return Ok(None);
298 }
299
300 let items_per_blob = self.config.items_per_blob.get();
302 let section = index / items_per_blob;
303 let blob = self.blobs.get(§ion).unwrap();
304 let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
305 let mut read_buf = blob.read_at(offset, Record::<V>::SIZE).await?;
306 let record = Record::<V>::read(&mut read_buf)?;
307
308 if record.is_valid() {
310 Ok(Some(record.value))
311 } else {
312 Err(Error::InvalidRecord(index))
313 }
314 }
315
316 pub fn has(&self, index: u64) -> bool {
318 self.has.inc();
319
320 self.intervals.get(&index).is_some()
321 }
322
323 pub fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
325 self.intervals.next_gap(index)
326 }
327
328 pub fn ranges(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
330 self.intervals.iter().map(|(&s, &e)| (s, e))
331 }
332
333 pub fn ranges_from(&self, from: u64) -> impl Iterator<Item = (u64, u64)> + '_ {
335 self.intervals.iter_from(from).map(|(&s, &e)| (s, e))
336 }
337
338 pub fn first_index(&self) -> Option<u64> {
340 self.intervals.first_index()
341 }
342
343 pub fn last_index(&self) -> Option<u64> {
345 self.intervals.last_index()
346 }
347
348 pub fn missing_items(&self, start: u64, max: usize) -> Vec<u64> {
353 self.intervals.missing_items(start, max)
354 }
355
356 pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
361 let items_per_blob = self.config.items_per_blob.get();
363 let min_section = min / items_per_blob;
364 let sections_to_remove: Vec<u64> = self
365 .blobs
366 .keys()
367 .filter(|&§ion| section < min_section)
368 .copied()
369 .collect();
370
371 for section in sections_to_remove {
373 if let Some(blob) = self.blobs.remove(§ion) {
374 drop(blob);
375 self.context
376 .remove(&self.config.partition, Some(§ion.to_be_bytes()))
377 .await?;
378
379 let start_index = section * items_per_blob;
381 let end_index = (section + 1) * items_per_blob - 1;
382 self.intervals.remove(start_index, end_index);
383 debug!(section, start_index, end_index, "pruned blob");
384 }
385
386 self.pruned.inc();
388 }
389
390 self.pending
392 .lock()
393 .await
394 .retain(|§ion| section >= min_section);
395
396 Ok(())
397 }
398
399 pub async fn sync(&self) -> Result<(), Error> {
401 self.syncs.inc();
402
403 let mut pending = self.pending.lock().await;
406 if pending.is_empty() {
407 return Ok(());
408 }
409
410 let mut futures = Vec::with_capacity(pending.len());
411 for section in pending.iter() {
412 futures.push(self.blobs.get(section).unwrap().sync());
413 }
414 try_join_all(futures).await?;
415
416 pending.clear();
418
419 Ok(())
420 }
421
422 pub async fn destroy(self) -> Result<(), Error> {
424 for (i, blob) in self.blobs.into_iter() {
425 drop(blob);
426 self.context
427 .remove(&self.config.partition, Some(&i.to_be_bytes()))
428 .await?;
429 debug!(section = i, "destroyed blob");
430 }
431 match self.context.remove(&self.config.partition, None).await {
432 Ok(()) => {}
433 Err(RError::PartitionMissing(_)) => {
434 }
436 Err(err) => return Err(Error::Runtime(err)),
437 }
438 Ok(())
439 }
440}
441
442impl<E: BufferPooler + Context, V: CodecFixedShared> Persistable for Ordinal<E, V> {
443 type Error = Error;
444
445 async fn commit(&self) -> Result<(), Self::Error> {
446 self.sync().await
447 }
448
449 async fn sync(&self) -> Result<(), Self::Error> {
450 self.sync().await
451 }
452
453 async fn destroy(self) -> Result<(), Self::Error> {
454 self.destroy().await
455 }
456}
457
// Codec conformance coverage for the record encoding. Only built for tests with the
// `arbitrary` feature, which provides the `Arbitrary` impl for `Record`.
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;

    commonware_conformance::conformance_tests! {
        CodecConformance<Record<u32>>
    }
}
467
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_runtime::deterministic::Context;

    /// Concrete store type used for the compile-time checks below.
    type TestOrdinal = Ordinal<Context, u64>;

    /// Compiles only when `T` is `Send`; the argument itself is discarded.
    fn assert_send<T: Send>(_: T) {}

    // Never called: this only needs to type-check, which proves the futures returned by
    // `get` and `put` are `Send`.
    #[allow(dead_code)]
    fn assert_ordinal_futures_are_send(ordinal: &mut TestOrdinal, key: u64) {
        assert_send(ordinal.get(key));
        assert_send(ordinal.put(key, 0u64));
    }

    // Never called: this only needs to type-check, which proves the future returned by
    // `destroy` is `Send`.
    #[allow(dead_code)]
    fn assert_ordinal_destroy_is_send(ordinal: TestOrdinal) {
        assert_send(ordinal.destroy());
    }
}
487}