// commonware_storage/ordinal/storage.rs
use super::{Config, Error};
2use crate::rmap::RMap;
3use bytes::{Buf, BufMut};
4use commonware_codec::{CodecFixed, Encode, FixedSize, Read, ReadExt, Write as CodecWrite};
5use commonware_runtime::{
6 buffer::{Read as ReadBuffer, Write},
7 Blob, Clock, Error as RError, Metrics, Storage,
8};
9use commonware_utils::{bitmap::BitMap, hex};
10use futures::future::try_join_all;
11use prometheus_client::metrics::counter::Counter;
12use std::{
13 collections::{btree_map::Entry, BTreeMap, BTreeSet},
14 marker::PhantomData,
15 mem::take,
16};
17use tracing::{debug, warn};
18
/// A stored value paired with a CRC32 checksum of its encoding, used to
/// detect torn or corrupt writes when blobs are replayed.
#[derive(Debug, Clone)]
struct Record<V: CodecFixed<Cfg = ()>> {
    // The fixed-size value stored at some index.
    value: V,
    // crc32fast hash of `value.encode()`, persisted alongside the value.
    crc: u32,
}
25
26impl<V: CodecFixed<Cfg = ()>> Record<V> {
27 fn new(value: V) -> Self {
28 let crc = crc32fast::hash(&value.encode());
29 Self { value, crc }
30 }
31
32 fn is_valid(&self) -> bool {
33 self.crc == crc32fast::hash(&self.value.encode())
34 }
35}
36
// A record occupies the value's fixed encoding plus a 4-byte CRC.
impl<V: CodecFixed<Cfg = ()>> FixedSize for Record<V> {
    const SIZE: usize = V::SIZE + u32::SIZE;
}
40
impl<V: CodecFixed<Cfg = ()>> CodecWrite for Record<V> {
    fn write(&self, buf: &mut impl BufMut) {
        // Layout: value bytes followed by the CRC (must mirror `read_cfg`).
        self.value.write(buf);
        self.crc.write(buf);
    }
}
47
48impl<V: CodecFixed<Cfg = ()>> Read for Record<V> {
49 type Cfg = ();
50
51 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
52 let value = V::read(buf)?;
53 let crc = u32::read(buf)?;
54
55 Ok(Self { value, crc })
56 }
57}
58
#[cfg(feature = "arbitrary")]
impl<V: CodecFixed<Cfg = ()>> arbitrary::Arbitrary<'_> for Record<V>
where
    V: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates a record whose CRC is consistent with its arbitrary value.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        V::arbitrary(u).map(Self::new)
    }
}
69
/// An index of fixed-size values keyed by a dense `u64` ordinal, persisted
/// across multiple blobs (one blob per contiguous "section" of indices).
pub struct Ordinal<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> {
    // Runtime handle providing storage, clock, and metrics registration.
    context: E,
    config: Config,

    // Open blobs keyed by section number (index / items_per_blob).
    blobs: BTreeMap<u64, Write<E::Blob>>,

    // Ranges of indices known to hold valid records.
    intervals: RMap,

    // Sections with buffered writes not yet synced.
    pending: BTreeSet<u64>,

    // Metrics counters registered with the context.
    puts: Counter,
    gets: Counter,
    has: Counter,
    syncs: Counter,
    pruned: Counter,

    // Ties the value type `V` to this struct without storing one.
    _phantom: PhantomData<V>,
}
94
impl<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> Ordinal<E, V> {
    /// Initializes the index, replaying all existing blobs to rebuild the
    /// interval map of present indices.
    pub async fn init(context: E, config: Config) -> Result<Self, Error> {
        Self::init_with_bits(context, config, None).await
    }

    /// Initializes the index, optionally restricted by per-section bitmaps.
    ///
    /// When `bits` is provided: sections absent from the map are skipped
    /// entirely; for a section mapped to `Some(bitmap)`, only set bits are
    /// replayed and each set bit MUST decode to a valid record (otherwise
    /// initialization fails with [`Error::MissingRecord`]); a section mapped
    /// to `None` is fully replayed but every record read must be valid.
    pub async fn init_with_bits(
        context: E,
        config: Config,
        bits: Option<BTreeMap<u64, &Option<BitMap>>>,
    ) -> Result<Self, Error> {
        // Discover existing blobs; a missing partition just means "empty".
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&config.partition).await {
            Ok(blobs) => blobs,
            Err(commonware_runtime::Error::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };

        for name in stored_blobs {
            let (blob, mut len) = context.open(&config.partition, &name).await?;
            // Blob names are the big-endian bytes of their section number.
            let index = match name.try_into() {
                Ok(index) => u64::from_be_bytes(index),
                Err(nm) => Err(Error::InvalidBlobName(hex(&nm)))?,
            };

            // Drop any trailing partial record (e.g. left by a torn write)
            // before wrapping the blob for buffered writes.
            let record_size = Record::<V>::SIZE as u64;
            if len % record_size != 0 {
                warn!(
                    blob = index,
                    invalid_size = len,
                    record_size,
                    "blob size is not a multiple of record size, truncating"
                );
                len -= len % record_size;
                blob.resize(len).await?;
                blob.sync().await?;
            }

            debug!(blob = index, len, "found index blob");
            let wrapped_blob = Write::new(blob, len, config.write_buffer);
            blobs.insert(index, wrapped_blob);
        }

        // Replay retained records to rebuild the interval map.
        debug!(
            blobs = blobs.len(),
            "rebuilding intervals from existing index"
        );
        let start = context.current();
        let mut items = 0;
        let mut intervals = RMap::new();
        for (section, blob) in &blobs {
            // Skip sections the caller provided no bits for.
            if let Some(bits) = &bits {
                if !bits.contains_key(section) {
                    warn!(section, "skipping section without bits");
                    continue;
                }
            }

            let size = blob.size().await;
            let mut replay_blob = ReadBuffer::new(blob.clone(), size, config.replay_buffer);

            let mut offset = 0;
            let items_per_blob = config.items_per_blob.get();
            while offset < size {
                // Global index of the record stored at this offset.
                let index = section * items_per_blob + (offset / Record::<V>::SIZE as u64);

                // When bits exist for this section, unset bits are skipped
                // and set bits promise a valid record (`must_exist`).
                let mut must_exist = false;
                if let Some(bits) = &bits {
                    let bits = bits.get(section).unwrap();
                    if let Some(bits) = bits {
                        let bit_index = offset as usize / Record::<V>::SIZE;
                        if !bits.get(bit_index as u64) {
                            offset += Record::<V>::SIZE as u64;
                            continue;
                        }
                    }

                    must_exist = true;
                }

                // Read exactly one record's worth of bytes at this offset.
                replay_blob.seek_to(offset)?;
                let mut record_buf = vec![0u8; Record::<V>::SIZE];
                replay_blob
                    .read_exact(&mut record_buf, Record::<V>::SIZE)
                    .await?;
                offset += Record::<V>::SIZE as u64;

                // Only records that decode AND pass the CRC are indexed.
                if let Ok(record) = Record::<V>::read(&mut record_buf.as_slice()) {
                    if record.is_valid() {
                        items += 1;
                        intervals.insert(index);
                        continue;
                    }
                };

                // A set bit promised a record here; corruption is fatal.
                if must_exist {
                    return Err(Error::MissingRecord(index));
                }
            }
        }
        debug!(
            items,
            elapsed = ?context.current().duration_since(start).unwrap_or_default(),
            "rebuilt intervals"
        );

        // Create and register metrics with the runtime context.
        let puts = Counter::default();
        let gets = Counter::default();
        let has = Counter::default();
        let syncs = Counter::default();
        let pruned = Counter::default();
        context.register("puts", "Number of put calls", puts.clone());
        context.register("gets", "Number of get calls", gets.clone());
        context.register("has", "Number of has calls", has.clone());
        context.register("syncs", "Number of sync calls", syncs.clone());
        context.register("pruned", "Number of pruned blobs", pruned.clone());

        Ok(Self {
            context,
            config,
            blobs,
            intervals,
            pending: BTreeSet::new(),
            puts,
            gets,
            has,
            syncs,
            pruned,
            _phantom: PhantomData,
        })
    }

    /// Writes `value` at `index`, creating the owning section blob if needed.
    /// The write is buffered; call [`Self::sync`] to make it durable.
    pub async fn put(&mut self, index: u64, value: V) -> Result<(), Error> {
        self.puts.inc();

        // Lazily open (or create) the blob for this index's section.
        let items_per_blob = self.config.items_per_blob.get();
        let section = index / items_per_blob;
        if let Entry::Vacant(entry) = self.blobs.entry(section) {
            let (blob, len) = self
                .context
                .open(&self.config.partition, &section.to_be_bytes())
                .await?;
            entry.insert(Write::new(blob, len, self.config.write_buffer));
            debug!(section, "created blob");
        }

        // Write the CRC-protected record at its fixed offset in the blob.
        let blob = self.blobs.get(&section).unwrap();
        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
        let record = Record::new(value);
        blob.write_at(record.encode(), offset).await?;
        self.pending.insert(section);

        // Record the index as present (even before it is synced).
        self.intervals.insert(index);

        Ok(())
    }

    /// Returns the value at `index`, or `None` if it was never written (or
    /// has been pruned). Fails with [`Error::InvalidRecord`] on CRC mismatch.
    pub async fn get(&self, index: u64) -> Result<Option<V>, Error> {
        self.gets.inc();

        // Interval membership is authoritative for existence.
        if self.intervals.get(&index).is_none() {
            return Ok(None);
        }

        // If `index` is tracked, its section blob must exist (intervals and
        // blobs are pruned together), so the unwrap cannot fail.
        let items_per_blob = self.config.items_per_blob.get();
        let section = index / items_per_blob;
        let blob = self.blobs.get(&section).unwrap();
        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
        let read_buf = vec![0u8; Record::<V>::SIZE];
        let read_buf = blob.read_at(read_buf, offset).await?;
        let record = Record::<V>::read(&mut read_buf.as_ref())?;

        if record.is_valid() {
            Ok(Some(record.value))
        } else {
            Err(Error::InvalidRecord(index))
        }
    }

    /// Returns whether a value exists at `index`.
    pub fn has(&self, index: u64) -> bool {
        self.has.inc();

        self.intervals.get(&index).is_some()
    }

    /// Returns the bounds of the gap around `index`; see [`RMap::next_gap`].
    pub fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.intervals.next_gap(index)
    }

    /// Iterates over all contiguous `(start, end)` index ranges present.
    pub fn ranges(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.intervals.iter().map(|(&s, &e)| (s, e))
    }

    /// Returns the smallest index present, if any.
    pub fn first_index(&self) -> Option<u64> {
        self.intervals.first_index()
    }

    /// Returns the largest index present, if any.
    pub fn last_index(&self) -> Option<u64> {
        self.intervals.last_index()
    }

    /// Returns up to `max` missing indices at or after `start`.
    pub fn missing_items(&self, start: u64, max: usize) -> Vec<u64> {
        self.intervals.missing_items(start, max)
    }

    /// Removes all blobs whose indices fall entirely below `min`. Pruning is
    /// section-granular: the section containing `min` is retained in full.
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Collect sections strictly below the section containing `min`.
        let items_per_blob = self.config.items_per_blob.get();
        let min_section = min / items_per_blob;
        let sections_to_remove: Vec<u64> = self
            .blobs
            .keys()
            .filter(|&&section| section < min_section)
            .copied()
            .collect();

        for section in sections_to_remove {
            if let Some(blob) = self.blobs.remove(&section) {
                // Release the write handle before removing the backing blob.
                drop(blob);
                self.context
                    .remove(&self.config.partition, Some(&section.to_be_bytes()))
                    .await?;

                // Forget the entire index range covered by the pruned section.
                let start_index = section * items_per_blob;
                let end_index = (section + 1) * items_per_blob - 1;
                self.intervals.remove(start_index, end_index);
                debug!(section, start_index, end_index, "pruned blob");
            }

            self.pruned.inc();
        }

        // Pruned sections no longer need syncing.
        self.pending.retain(|&section| section >= min_section);

        Ok(())
    }

    /// Flushes all sections with buffered writes, concurrently.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.syncs.inc();

        let mut futures = Vec::with_capacity(self.pending.len());
        for &section in &self.pending {
            futures.push(self.blobs.get(&section).unwrap().sync());
        }
        try_join_all(futures).await?;

        // Only cleared once every pending sync has succeeded.
        self.pending.clear();

        Ok(())
    }

    /// Syncs every blob and consumes the index.
    pub async fn close(mut self) -> Result<(), Error> {
        self.sync().await?;
        for (_, blob) in take(&mut self.blobs) {
            blob.sync().await?;
        }
        Ok(())
    }

    /// Removes every blob and then the partition itself.
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            // Release the write handle before removing the backing blob.
            drop(blob);
            self.context
                .remove(&self.config.partition, Some(&i.to_be_bytes()))
                .await?;
            debug!(section = i, "destroyed blob");
        }
        // The partition may never have been created; that is not an error.
        match self.context.remove(&self.config.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {}
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}
426
427impl<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> crate::store::Store for Ordinal<E, V> {
428 type Key = u64;
429 type Value = V;
430 type Error = Error;
431
432 async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
433 self.get(*key).await
434 }
435}
436
437impl<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> crate::store::StoreMut
438 for Ordinal<E, V>
439{
440 async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
441 self.put(key, value).await
442 }
443}
444
445impl<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> crate::store::StorePersistable
446 for Ordinal<E, V>
447{
448 async fn commit(&mut self) -> Result<(), Self::Error> {
449 self.sync().await
450 }
451
452 async fn destroy(self) -> Result<(), Self::Error> {
453 self.destroy().await
454 }
455}
456
// Codec round-trip conformance tests for `Record` (only built when both the
// test profile and the `arbitrary` feature are enabled).
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;

    commonware_conformance::conformance_tests! {
        CodecConformance<Record<u32>>
    }
}