// commonware_storage/ordinal/storage.rs
use super::{Config, Error};
2use crate::rmap::RMap;
3use bytes::{Buf, BufMut};
4use commonware_codec::{CodecFixed, Encode, FixedSize, Read, ReadExt, Write as CodecWrite};
5use commonware_runtime::{
6 buffer::{Read as ReadBuffer, Write},
7 Blob, Clock, Error as RError, Metrics, Storage,
8};
9use commonware_utils::{bitmap::BitMap, hex};
10use futures::future::try_join_all;
11use prometheus_client::metrics::counter::Counter;
12use std::{
13 collections::{btree_map::Entry, BTreeMap, BTreeSet},
14 marker::PhantomData,
15 mem::take,
16};
17use tracing::{debug, warn};
18
/// A stored value paired with a CRC32 of its encoding, used to detect
/// torn or corrupted records on read and replay.
#[derive(Debug, Clone)]
struct Record<V: CodecFixed<Cfg = ()>> {
    // The fixed-size payload.
    value: V,
    // crc32fast hash of `value.encode()` (see `Record::new`).
    crc: u32,
}
25
26impl<V: CodecFixed<Cfg = ()>> Record<V> {
27 fn new(value: V) -> Self {
28 let crc = crc32fast::hash(&value.encode());
29 Self { value, crc }
30 }
31
32 fn is_valid(&self) -> bool {
33 self.crc == crc32fast::hash(&self.value.encode())
34 }
35}
36
impl<V: CodecFixed<Cfg = ()>> FixedSize for Record<V> {
    // A record on disk is the encoded value immediately followed by its
    // 4-byte CRC.
    const SIZE: usize = V::SIZE + u32::SIZE;
}
40
impl<V: CodecFixed<Cfg = ()>> CodecWrite for Record<V> {
    /// Serializes the record as `value || crc`; the `Read` impl in this file
    /// decodes in the same order.
    fn write(&self, buf: &mut impl BufMut) {
        self.value.write(buf);
        self.crc.write(buf);
    }
}
47
48impl<V: CodecFixed<Cfg = ()>> Read for Record<V> {
49 type Cfg = ();
50
51 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
52 let value = V::read(buf)?;
53 let crc = u32::read(buf)?;
54
55 Ok(Self { value, crc })
56 }
57}
58
/// An index mapping `u64` ordinals to fixed-size values, stored as CRC-guarded
/// records across section blobs within a single storage partition.
pub struct Ordinal<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> {
    // Runtime handle providing storage, metrics registration, and a clock.
    context: E,
    // Static configuration: partition name, items per blob, buffer sizes.
    config: Config,

    // Open blobs keyed by section number (index / items_per_blob).
    blobs: BTreeMap<u64, Write<E::Blob>>,

    // Interval set of indices known to hold valid records.
    intervals: RMap,

    // Sections with buffered writes not yet synced to storage.
    pending: BTreeSet<u64>,

    // Operation counters registered with the runtime's metrics registry.
    puts: Counter,
    gets: Counter,
    has: Counter,
    syncs: Counter,
    pruned: Counter,

    // Marks the value type without storing one.
    _phantom: PhantomData<V>,
}
83
84impl<E: Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> Ordinal<E, V> {
    /// Initializes the index from any existing blobs in the configured
    /// partition, without per-section bitmap validation.
    pub async fn init(context: E, config: Config) -> Result<Self, Error> {
        Self::init_with_bits(context, config, None).await
    }
89
90 pub async fn init_with_bits(
99 context: E,
100 config: Config,
101 bits: Option<BTreeMap<u64, &Option<BitMap>>>,
102 ) -> Result<Self, Error> {
103 let mut blobs = BTreeMap::new();
105 let stored_blobs = match context.scan(&config.partition).await {
106 Ok(blobs) => blobs,
107 Err(commonware_runtime::Error::PartitionMissing(_)) => Vec::new(),
108 Err(err) => return Err(Error::Runtime(err)),
109 };
110
111 for name in stored_blobs {
113 let (blob, mut len) = context.open(&config.partition, &name).await?;
114 let index = match name.try_into() {
115 Ok(index) => u64::from_be_bytes(index),
116 Err(nm) => Err(Error::InvalidBlobName(hex(&nm)))?,
117 };
118
119 let record_size = Record::<V>::SIZE as u64;
121 if len % record_size != 0 {
122 warn!(
123 blob = index,
124 invalid_size = len,
125 record_size,
126 "blob size is not a multiple of record size, truncating"
127 );
128 len -= len % record_size;
129 blob.resize(len).await?;
130 blob.sync().await?;
131 }
132
133 debug!(blob = index, len, "found index blob");
134 let wrapped_blob = Write::new(blob, len, config.write_buffer);
135 blobs.insert(index, wrapped_blob);
136 }
137
138 debug!(
140 blobs = blobs.len(),
141 "rebuilding intervals from existing index"
142 );
143 let start = context.current();
144 let mut items = 0;
145 let mut intervals = RMap::new();
146 for (section, blob) in &blobs {
147 if let Some(bits) = &bits {
149 if !bits.contains_key(section) {
150 warn!(section, "skipping section without bits");
151 continue;
152 }
153 }
154
155 let size = blob.size().await;
157 let mut replay_blob = ReadBuffer::new(blob.clone(), size, config.replay_buffer);
158
159 let mut offset = 0;
161 let items_per_blob = config.items_per_blob.get();
162 while offset < size {
163 let index = section * items_per_blob + (offset / Record::<V>::SIZE as u64);
165
166 let mut must_exist = false;
168 if let Some(bits) = &bits {
169 let bits = bits.get(section).unwrap();
171 if let Some(bits) = bits {
172 let bit_index = offset as usize / Record::<V>::SIZE;
173 if !bits.get(bit_index as u64) {
174 offset += Record::<V>::SIZE as u64;
175 continue;
176 }
177 }
178
179 must_exist = true;
181 }
182
183 replay_blob.seek_to(offset)?;
185 let mut record_buf = vec![0u8; Record::<V>::SIZE];
186 replay_blob
187 .read_exact(&mut record_buf, Record::<V>::SIZE)
188 .await?;
189 offset += Record::<V>::SIZE as u64;
190
191 if let Ok(record) = Record::<V>::read(&mut record_buf.as_slice()) {
193 if record.is_valid() {
194 items += 1;
195 intervals.insert(index);
196 continue;
197 }
198 };
199
200 if must_exist {
203 return Err(Error::MissingRecord(index));
204 }
205 }
206 }
207 debug!(
208 items,
209 elapsed = ?context.current().duration_since(start).unwrap_or_default(),
210 "rebuilt intervals"
211 );
212
213 let puts = Counter::default();
215 let gets = Counter::default();
216 let has = Counter::default();
217 let syncs = Counter::default();
218 let pruned = Counter::default();
219 context.register("puts", "Number of put calls", puts.clone());
220 context.register("gets", "Number of get calls", gets.clone());
221 context.register("has", "Number of has calls", has.clone());
222 context.register("syncs", "Number of sync calls", syncs.clone());
223 context.register("pruned", "Number of pruned blobs", pruned.clone());
224
225 Ok(Self {
226 context,
227 config,
228 blobs,
229 intervals,
230 pending: BTreeSet::new(),
231 puts,
232 gets,
233 has,
234 syncs,
235 pruned,
236 _phantom: PhantomData,
237 })
238 }
239
240 pub async fn put(&mut self, index: u64, value: V) -> Result<(), Error> {
242 self.puts.inc();
243
244 let items_per_blob = self.config.items_per_blob.get();
246 let section = index / items_per_blob;
247 if let Entry::Vacant(entry) = self.blobs.entry(section) {
248 let (blob, len) = self
249 .context
250 .open(&self.config.partition, §ion.to_be_bytes())
251 .await?;
252 entry.insert(Write::new(blob, len, self.config.write_buffer));
253 debug!(section, "created blob");
254 }
255
256 let blob = self.blobs.get(§ion).unwrap();
258 let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
259 let record = Record::new(value);
260 blob.write_at(record.encode(), offset).await?;
261 self.pending.insert(section);
262
263 self.intervals.insert(index);
265
266 Ok(())
267 }
268
269 pub async fn get(&self, index: u64) -> Result<Option<V>, Error> {
271 self.gets.inc();
272
273 if self.intervals.get(&index).is_none() {
275 return Ok(None);
276 }
277
278 let items_per_blob = self.config.items_per_blob.get();
280 let section = index / items_per_blob;
281 let blob = self.blobs.get(§ion).unwrap();
282 let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
283 let read_buf = vec![0u8; Record::<V>::SIZE];
284 let read_buf = blob.read_at(read_buf, offset).await?;
285 let record = Record::<V>::read(&mut read_buf.as_ref())?;
286
287 if record.is_valid() {
289 Ok(Some(record.value))
290 } else {
291 Err(Error::InvalidRecord(index))
292 }
293 }
294
    /// Returns whether a value is known to exist at `index`.
    ///
    /// Answered purely from the in-memory interval set; no storage access.
    pub fn has(&self, index: u64) -> bool {
        self.has.inc();

        self.intervals.get(&index).is_some()
    }
301
    /// Delegates to [`RMap::next_gap`] for the interval state around `index`.
    pub fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.intervals.next_gap(index)
    }

    /// Returns the smallest index present in the interval set, if any.
    pub fn first_index(&self) -> Option<u64> {
        self.intervals.first_index()
    }

    /// Returns the largest index present in the interval set, if any.
    pub fn last_index(&self) -> Option<u64> {
        self.intervals.last_index()
    }

    /// Returns up to `max` indices absent from the interval set, starting the
    /// search at `start` (see [`RMap::missing_items`]).
    pub fn missing_items(&self, start: u64, max: usize) -> Vec<u64> {
        self.intervals.missing_items(start, max)
    }
321
322 pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
327 let items_per_blob = self.config.items_per_blob.get();
329 let min_section = min / items_per_blob;
330 let sections_to_remove: Vec<u64> = self
331 .blobs
332 .keys()
333 .filter(|&§ion| section < min_section)
334 .copied()
335 .collect();
336
337 for section in sections_to_remove {
339 if let Some(blob) = self.blobs.remove(§ion) {
340 drop(blob);
341 self.context
342 .remove(&self.config.partition, Some(§ion.to_be_bytes()))
343 .await?;
344
345 let start_index = section * items_per_blob;
347 let end_index = (section + 1) * items_per_blob - 1;
348 self.intervals.remove(start_index, end_index);
349 debug!(section, start_index, end_index, "pruned blob");
350 }
351
352 self.pruned.inc();
354 }
355
356 self.pending.retain(|§ion| section >= min_section);
358
359 Ok(())
360 }
361
362 pub async fn sync(&mut self) -> Result<(), Error> {
364 self.syncs.inc();
365
366 let mut futures = Vec::with_capacity(self.pending.len());
368 for §ion in &self.pending {
369 futures.push(self.blobs.get(§ion).unwrap().sync());
370 }
371 try_join_all(futures).await?;
372
373 self.pending.clear();
375
376 Ok(())
377 }
378
379 pub async fn close(mut self) -> Result<(), Error> {
381 self.sync().await?;
382 for (_, blob) in take(&mut self.blobs) {
383 blob.sync().await?;
384 }
385 Ok(())
386 }
387
388 pub async fn destroy(self) -> Result<(), Error> {
390 for (i, blob) in self.blobs.into_iter() {
391 drop(blob);
392 self.context
393 .remove(&self.config.partition, Some(&i.to_be_bytes()))
394 .await?;
395 debug!(section = i, "destroyed blob");
396 }
397 match self.context.remove(&self.config.partition, None).await {
398 Ok(()) => {}
399 Err(RError::PartitionMissing(_)) => {
400 }
402 Err(err) => return Err(Error::Runtime(err)),
403 }
404 Ok(())
405 }
406}