1use super::{Config, Error};
2use crate::{kv, rmap::RMap, Persistable};
3use commonware_codec::{
4 CodecFixed, CodecFixedShared, Encode, FixedSize, Read, ReadExt, Write as CodecWrite,
5};
6use commonware_cryptography::{crc32, Crc32};
7use commonware_runtime::{
8 buffer::{Read as ReadBuffer, Write},
9 Blob, Buf, BufMut, BufferPooler, Clock, Error as RError, Metrics, Storage,
10};
11use commonware_utils::{bitmap::BitMap, hex, sync::AsyncMutex};
12use futures::future::try_join_all;
13use prometheus_client::metrics::counter::Counter;
14use std::{
15 collections::{btree_map::Entry, BTreeMap, BTreeSet},
16 marker::PhantomData,
17};
18use tracing::{debug, warn};
19
/// A fixed-size value paired with a CRC32 checksum of its encoding, used to
/// detect torn or corrupted records when a blob is replayed from disk.
#[derive(Debug, Clone)]
struct Record<V: CodecFixed<Cfg = ()>> {
    // The stored value.
    value: V,
    // CRC32 of `value.encode()`, computed at construction (see `Record::new`).
    crc: u32,
}
26
27impl<V: CodecFixed<Cfg = ()>> Record<V> {
28 fn new(value: V) -> Self {
29 let crc = Crc32::checksum(&value.encode());
30 Self { value, crc }
31 }
32
33 fn is_valid(&self) -> bool {
34 self.crc == Crc32::checksum(&self.value.encode())
35 }
36}
37
// A record occupies the value's fixed encoding plus the CRC digest bytes.
impl<V: CodecFixed<Cfg = ()>> FixedSize for Record<V> {
    const SIZE: usize = V::SIZE + crc32::Digest::SIZE;
}
41
impl<V: CodecFixed<Cfg = ()>> CodecWrite for Record<V> {
    fn write(&self, buf: &mut impl BufMut) {
        // Layout: value bytes first, then the CRC (must mirror `read_cfg`).
        self.value.write(buf);
        self.crc.write(buf);
    }
}
48
49impl<V: CodecFixed<Cfg = ()>> Read for Record<V> {
50 type Cfg = ();
51
52 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
53 let value = V::read(buf)?;
54 let crc = u32::read(buf)?;
55
56 Ok(Self { value, crc })
57 }
58}
59
60#[cfg(feature = "arbitrary")]
61impl<V: CodecFixed<Cfg = ()>> arbitrary::Arbitrary<'_> for Record<V>
62where
63 V: for<'a> arbitrary::Arbitrary<'a>,
64{
65 fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
66 let value = V::arbitrary(u)?;
67 Ok(Self::new(value))
68 }
69}
70
/// A store mapping `u64` indices to fixed-size, CRC-protected values,
/// persisted across section blobs (one blob per `items_per_blob` records).
pub struct Ordinal<E: BufferPooler + Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> {
    // Runtime context providing storage, metrics, and a clock.
    context: E,
    // Store configuration (partition name, items per blob, buffer sizes).
    config: Config,

    // Open write-buffered blobs keyed by section (index / items_per_blob).
    blobs: BTreeMap<u64, Write<E::Blob>>,

    // Ranges of indices known to hold valid records.
    intervals: RMap,

    // Sections with buffered, not-yet-synced writes; drained by `sync`.
    pending: AsyncMutex<BTreeSet<u64>>,

    // Operation counters registered with the runtime's metrics.
    puts: Counter,
    gets: Counter,
    has: Counter,
    syncs: Counter,
    pruned: Counter,

    // `V` appears only in method signatures; tie it to the type here.
    _phantom: PhantomData<V>,
}
97
impl<E: BufferPooler + Storage + Metrics + Clock, V: CodecFixed<Cfg = ()>> Ordinal<E, V> {
    /// Initializes the store, replaying all records in every existing blob to
    /// rebuild the in-memory interval map.
    pub async fn init(context: E, config: Config) -> Result<Self, Error> {
        Self::init_with_bits(context, config, None).await
    }

    /// Initializes the store, optionally restricting replay using `bits`
    /// (keyed by section). When `bits` is provided, sections absent from the
    /// map are skipped entirely; within a present section, a `Some(BitMap)`
    /// selects which records to replay — and each selected record must decode
    /// and pass its CRC or `Error::MissingRecord` is returned. A `None` entry
    /// presumably means every record in that section is required — TODO
    /// confirm against callers.
    pub async fn init_with_bits(
        context: E,
        config: Config,
        bits: Option<BTreeMap<u64, &Option<BitMap>>>,
    ) -> Result<Self, Error> {
        // Scan the partition for existing blobs; a missing partition simply
        // means a fresh (empty) store.
        let mut blobs = BTreeMap::new();
        let stored_blobs = match context.scan(&config.partition).await {
            Ok(blobs) => blobs,
            Err(commonware_runtime::Error::PartitionMissing(_)) => Vec::new(),
            Err(err) => return Err(Error::Runtime(err)),
        };

        for name in stored_blobs {
            let (blob, mut len) = context.open(&config.partition, &name).await?;
            // Blob names are big-endian-encoded section numbers.
            let index = match name.try_into() {
                Ok(index) => u64::from_be_bytes(index),
                Err(nm) => Err(Error::InvalidBlobName(hex(&nm)))?,
            };

            // Drop any trailing partial record (e.g. left by a torn write) so
            // the blob length is a whole number of records.
            let record_size = Record::<V>::SIZE as u64;
            if len % record_size != 0 {
                warn!(
                    blob = index,
                    invalid_size = len,
                    record_size,
                    "blob size is not a multiple of record size, truncating"
                );
                len -= len % record_size;
                blob.resize(len).await?;
                blob.sync().await?;
            }

            debug!(blob = index, len, "found index blob");
            blobs.insert(index, (blob, len));
        }

        // Replay every retained blob to rebuild the interval map.
        debug!(
            blobs = blobs.len(),
            "rebuilding intervals from existing index"
        );
        let start = context.current();
        let mut items = 0;
        let mut intervals = RMap::new();
        for (section, (blob, size)) in &blobs {
            // Skip sections the caller provided no bits for.
            if let Some(bits) = &bits {
                if !bits.contains_key(section) {
                    warn!(section, "skipping section without bits");
                    continue;
                }
            }

            // Buffered sequential reader over the blob's current contents.
            let mut replay_blob =
                ReadBuffer::from_pooler(&context, blob.clone(), *size, config.replay_buffer);

            let mut offset = 0;
            let items_per_blob = config.items_per_blob.get();
            while offset < *size {
                // Global index of the record at this byte offset.
                let index = section * items_per_blob + (offset / Record::<V>::SIZE as u64);

                // With bits for this section: unmarked records are skipped,
                // and marked (or all, if the BitMap is None) must exist.
                let mut must_exist = false;
                if let Some(bits) = &bits {
                    // Safe: `contains_key(section)` was checked above.
                    let bits = bits.get(section).unwrap();
                    if let Some(bits) = bits {
                        let bit_index = offset as usize / Record::<V>::SIZE;
                        if !bits.get(bit_index as u64) {
                            offset += Record::<V>::SIZE as u64;
                            continue;
                        }
                    }

                    must_exist = true;
                }

                // Read one record at `offset` and advance.
                replay_blob.seek_to(offset)?;
                let mut record_buf = replay_blob.read(Record::<V>::SIZE).await?;
                offset += Record::<V>::SIZE as u64;

                // Only records that decode and pass the CRC are indexed.
                if let Ok(record) = Record::<V>::read(&mut record_buf) {
                    if record.is_valid() {
                        items += 1;
                        intervals.insert(index);
                        continue;
                    }
                };

                // A required record failed to decode or checksum.
                if must_exist {
                    return Err(Error::MissingRecord(index));
                }
            }
        }
        debug!(
            items,
            elapsed = ?context.current().duration_since(start).unwrap_or_default(),
            "rebuilt intervals"
        );

        // Wrap each blob in a write buffer for subsequent puts.
        let blobs = blobs
            .into_iter()
            .map(|(index, (blob, len))| {
                (
                    index,
                    Write::from_pooler(&context, blob, len, config.write_buffer),
                )
            })
            .collect();

        // Create and register operation metrics.
        let puts = Counter::default();
        let gets = Counter::default();
        let has = Counter::default();
        let syncs = Counter::default();
        let pruned = Counter::default();
        context.register("puts", "Number of put calls", puts.clone());
        context.register("gets", "Number of get calls", gets.clone());
        context.register("has", "Number of has calls", has.clone());
        context.register("syncs", "Number of sync calls", syncs.clone());
        context.register("pruned", "Number of pruned blobs", pruned.clone());

        Ok(Self {
            context,
            config,
            blobs,
            intervals,
            pending: AsyncMutex::new(BTreeSet::new()),
            puts,
            gets,
            has,
            syncs,
            pruned,
            _phantom: PhantomData,
        })
    }

    /// Writes `value` at `index`, creating the section blob on demand. The
    /// write is buffered; call [`Self::sync`] to make it durable.
    pub async fn put(&mut self, index: u64, value: V) -> Result<(), Error> {
        self.puts.inc();

        // Open (or create) the blob backing this index's section.
        let items_per_blob = self.config.items_per_blob.get();
        let section = index / items_per_blob;
        if let Entry::Vacant(entry) = self.blobs.entry(section) {
            let (blob, len) = self
                .context
                .open(&self.config.partition, &section.to_be_bytes())
                .await?;
            entry.insert(Write::from_pooler(
                &self.context,
                blob,
                len,
                self.config.write_buffer,
            ));
            debug!(section, "created blob");
        }

        // Write the CRC-protected record at its fixed offset in the blob.
        let blob = self.blobs.get(&section).unwrap();
        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
        let record = Record::new(value);
        blob.write_at(offset, record.encode_mut()).await?;
        // Mark the section dirty so `sync` will flush it.
        self.pending.lock().await.insert(section);

        self.intervals.insert(index);

        Ok(())
    }

    /// Returns the value at `index`, or `None` if it was never written.
    /// Returns `Error::InvalidRecord` if the stored record fails its CRC.
    pub async fn get(&self, index: u64) -> Result<Option<V>, Error> {
        self.gets.inc();

        // Fast path: the interval map tracks every known index.
        if self.intervals.get(&index).is_none() {
            return Ok(None);
        }

        // Invariant: any index present in `intervals` has an open blob
        // (prune removes intervals and blobs together).
        let items_per_blob = self.config.items_per_blob.get();
        let section = index / items_per_blob;
        let blob = self.blobs.get(&section).unwrap();
        let offset = (index % items_per_blob) * Record::<V>::SIZE as u64;
        let mut read_buf = blob.read_at(offset, Record::<V>::SIZE).await?;
        let record = Record::<V>::read(&mut read_buf)?;

        // Reject corrupted data rather than returning it.
        if record.is_valid() {
            Ok(Some(record.value))
        } else {
            Err(Error::InvalidRecord(index))
        }
    }

    /// Returns whether a record exists at `index` (interval map only; does
    /// not touch storage).
    pub fn has(&self, index: u64) -> bool {
        self.has.inc();

        self.intervals.get(&index).is_some()
    }

    /// Returns the gap surrounding `index` in the interval map.
    pub fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
        self.intervals.next_gap(index)
    }

    /// Iterates over all contiguous (start, end) index ranges.
    pub fn ranges(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.intervals.iter().map(|(&s, &e)| (s, e))
    }

    /// Returns the smallest known index, if any.
    pub fn first_index(&self) -> Option<u64> {
        self.intervals.first_index()
    }

    /// Returns the largest known index, if any.
    pub fn last_index(&self) -> Option<u64> {
        self.intervals.last_index()
    }

    /// Returns up to `max` missing indices at or after `start`.
    pub fn missing_items(&self, start: u64, max: usize) -> Vec<u64> {
        self.intervals.missing_items(start, max)
    }

    /// Removes every blob whose entire index range lies below `min`, along
    /// with its intervals and pending-sync state.
    pub async fn prune(&mut self, min: u64) -> Result<(), Error> {
        // Only whole sections strictly before `min`'s section are removable.
        let items_per_blob = self.config.items_per_blob.get();
        let min_section = min / items_per_blob;
        let sections_to_remove: Vec<u64> = self
            .blobs
            .keys()
            .filter(|&&section| section < min_section)
            .copied()
            .collect();

        for section in sections_to_remove {
            if let Some(blob) = self.blobs.remove(&section) {
                // Drop the buffered writer before deleting the backing blob.
                drop(blob);
                self.context
                    .remove(&self.config.partition, Some(&section.to_be_bytes()))
                    .await?;

                // Forget the section's full index range.
                let start_index = section * items_per_blob;
                let end_index = (section + 1) * items_per_blob - 1;
                self.intervals.remove(start_index, end_index);
                debug!(section, start_index, end_index, "pruned blob");
            }

            self.pruned.inc();
        }

        // Pruned sections no longer need syncing.
        self.pending
            .lock()
            .await
            .retain(|&section| section >= min_section);

        Ok(())
    }

    /// Flushes all sections with buffered writes to durable storage.
    pub async fn sync(&self) -> Result<(), Error> {
        self.syncs.inc();

        // Hold the pending lock across the syncs so concurrent puts cannot
        // be marked clean without actually being flushed.
        let mut pending = self.pending.lock().await;
        if pending.is_empty() {
            return Ok(());
        }

        // Sync all dirty sections concurrently; fail fast on the first error.
        let mut futures = Vec::with_capacity(pending.len());
        for section in pending.iter() {
            futures.push(self.blobs.get(section).unwrap().sync());
        }
        try_join_all(futures).await?;

        // Everything that was dirty is now durable.
        pending.clear();

        Ok(())
    }

    /// Removes every blob and then the partition itself, consuming the store.
    pub async fn destroy(self) -> Result<(), Error> {
        for (i, blob) in self.blobs.into_iter() {
            drop(blob);
            self.context
                .remove(&self.config.partition, Some(&i.to_be_bytes()))
                .await?;
            debug!(section = i, "destroyed blob");
        }
        // The partition may already be absent (e.g. nothing was ever written);
        // that is not an error.
        match self.context.remove(&self.config.partition, None).await {
            Ok(()) => {}
            Err(RError::PartitionMissing(_)) => {
            }
            Err(err) => return Err(Error::Runtime(err)),
        }
        Ok(())
    }
}
440
// Keyed read access: delegates to the inherent `Ordinal::get`.
impl<E: BufferPooler + Storage + Metrics + Clock, V: CodecFixedShared> kv::Gettable
    for Ordinal<E, V>
{
    type Key = u64;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(*key).await
    }
}
452
// Keyed write access: delegates to the inherent `Ordinal::put`.
impl<E: BufferPooler + Storage + Metrics + Clock, V: CodecFixedShared> kv::Updatable
    for Ordinal<E, V>
{
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        self.put(key, value).await
    }
}
460
// Durability hooks: both `commit` and `sync` delegate to the inherent
// `Ordinal::sync` (there is no separate commit step for this store).
impl<E: BufferPooler + Storage + Metrics + Clock, V: CodecFixedShared> Persistable
    for Ordinal<E, V>
{
    type Error = Error;

    async fn commit(&self) -> Result<(), Self::Error> {
        self.sync().await
    }

    async fn sync(&self) -> Result<(), Self::Error> {
        self.sync().await
    }

    async fn destroy(self) -> Result<(), Self::Error> {
        self.destroy().await
    }
}
478
479#[cfg(all(test, feature = "arbitrary"))]
480mod conformance {
481 use super::*;
482 use commonware_codec::conformance::CodecConformance;
483
484 commonware_conformance::conformance_tests! {
485 CodecConformance<Record<u32>>
486 }
487}
488
#[cfg(test)]
mod tests {
    use super::*;
    use crate::kv::tests::{assert_gettable, assert_send, assert_updatable};
    use commonware_runtime::deterministic::Context;

    type TestOrdinal = Ordinal<Context, u64>;

    // Compile-time check that the kv trait futures are `Send`; never invoked.
    #[allow(dead_code)]
    fn assert_ordinal_futures_are_send(ordinal: &mut TestOrdinal, key: u64) {
        assert_gettable(ordinal, &key);
        assert_updatable(ordinal, key, 0u64);
    }

    // Compile-time check that the `destroy` future is `Send`; never invoked.
    #[allow(dead_code)]
    fn assert_ordinal_destroy_is_send(ordinal: TestOrdinal) {
        assert_send(ordinal.destroy());
    }
}
507}