commonware_storage/archive/immutable/
storage.rs1use crate::{
2 archive::{immutable::Config, Error, Identifier},
3 freezer::{self, Checkpoint, Cursor, Freezer},
4 metadata::{self, Metadata},
5 ordinal::{self, Ordinal},
6};
7use bytes::{Buf, BufMut};
8use commonware_codec::{Codec, EncodeSize, FixedSize, Read, ReadExt, Write};
9use commonware_runtime::{Clock, Metrics, Storage};
10use commonware_utils::{bitmap::BitMap, sequence::prefixed_u64::U64, Array};
11use futures::join;
12use prometheus_client::metrics::counter::Counter;
13use std::collections::BTreeMap;
14use tracing::debug;
15
/// [Metadata] key prefix under which the [Freezer] [Checkpoint] record is stored.
const FREEZER_PREFIX: u8 = 0x00;

/// [Metadata] key prefix under which each [Ordinal] section's index bitmap is stored.
const ORDINAL_PREFIX: u8 = 0x01;
21
22#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
24enum Record {
25 Freezer(Checkpoint),
26 Ordinal(Option<BitMap>),
27}
28
29impl Record {
30 fn freezer(&self) -> &Checkpoint {
32 match self {
33 Self::Freezer(checkpoint) => checkpoint,
34 _ => panic!("incorrect record"),
35 }
36 }
37
38 fn ordinal(&self) -> &Option<BitMap> {
40 match self {
41 Self::Ordinal(indices) => indices,
42 _ => panic!("incorrect record"),
43 }
44 }
45}
46
47impl Write for Record {
48 fn write(&self, buf: &mut impl BufMut) {
49 match self {
50 Self::Freezer(checkpoint) => {
51 buf.put_u8(0);
52 checkpoint.write(buf);
53 }
54 Self::Ordinal(indices) => {
55 buf.put_u8(1);
56 indices.write(buf);
57 }
58 }
59 }
60}
61
62impl Read for Record {
63 type Cfg = ();
64 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
65 let tag = u8::read(buf)?;
66 match tag {
67 0 => Ok(Self::Freezer(Checkpoint::read(buf)?)),
68 1 => Ok(Self::Ordinal(Option::<BitMap>::read_cfg(
69 buf,
70 &(usize::MAX as u64),
71 )?)),
72 _ => Err(commonware_codec::Error::InvalidEnum(tag)),
73 }
74 }
75}
76
77impl EncodeSize for Record {
78 fn encode_size(&self) -> usize {
79 1 + match self {
80 Self::Freezer(_) => Checkpoint::SIZE,
81 Self::Ordinal(indices) => indices.encode_size(),
82 }
83 }
84}
85
86pub struct Archive<E: Storage + Metrics + Clock, K: Array, V: Codec> {
88 items_per_section: u64,
90
91 metadata: Metadata<E, U64, Record>,
93
94 freezer: Freezer<E, K, V>,
96
97 ordinal: Ordinal<E, Cursor>,
99
100 gets: Counter,
102 has: Counter,
103 syncs: Counter,
104}
105
106impl<E: Storage + Metrics + Clock, K: Array, V: Codec> Archive<E, K, V> {
107 pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
109 let metadata = Metadata::<E, U64, Record>::init(
111 context.with_label("metadata"),
112 metadata::Config {
113 partition: cfg.metadata_partition,
114 codec_config: (),
115 },
116 )
117 .await?;
118
119 let freezer_key = U64::new(FREEZER_PREFIX, 0);
121 let checkpoint = metadata.get(&freezer_key).map(|freezer| *freezer.freezer());
122
123 let freezer = Freezer::init_with_checkpoint(
127 context.with_label("freezer"),
128 freezer::Config {
129 journal_partition: cfg.freezer_journal_partition,
130 journal_compression: cfg.freezer_journal_compression,
131 journal_write_buffer: cfg.write_buffer,
132 journal_target_size: cfg.freezer_journal_target_size,
133 journal_buffer_pool: cfg.freezer_journal_buffer_pool,
134 table_partition: cfg.freezer_table_partition,
135 table_initial_size: cfg.freezer_table_initial_size,
136 table_resize_frequency: cfg.freezer_table_resize_frequency,
137 table_resize_chunk_size: cfg.freezer_table_resize_chunk_size,
138 table_replay_buffer: cfg.replay_buffer,
139 codec_config: cfg.codec_config,
140 },
141 checkpoint,
142 )
143 .await?;
144
145 let sections = metadata
147 .keys()
148 .filter(|k| k.prefix() == ORDINAL_PREFIX)
149 .collect::<Vec<_>>();
150 let mut section_bits = BTreeMap::new();
151 for section in sections {
152 let bits = metadata.get(section).unwrap().ordinal();
154
155 let section = section.value();
157 section_bits.insert(section, bits);
158 }
159
160 let ordinal = Ordinal::init_with_bits(
164 context.with_label("ordinal"),
165 ordinal::Config {
166 partition: cfg.ordinal_partition,
167 items_per_blob: cfg.items_per_section,
168 write_buffer: cfg.write_buffer,
169 replay_buffer: cfg.replay_buffer,
170 },
171 Some(section_bits),
172 )
173 .await?;
174
175 let gets = Counter::default();
177 let has = Counter::default();
178 let syncs = Counter::default();
179 context.register("gets", "Number of gets performed", gets.clone());
180 context.register("has", "Number of has performed", has.clone());
181 context.register("syncs", "Number of syncs called", syncs.clone());
182
183 Ok(Self {
184 items_per_section: cfg.items_per_section.get(),
185 metadata,
186 freezer,
187 ordinal,
188 gets,
189 has,
190 syncs,
191 })
192 }
193
194 async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
196 let Some(cursor) = self.ordinal.get(index).await? else {
198 return Ok(None);
199 };
200
201 let result = self
203 .freezer
204 .get(freezer::Identifier::Cursor(cursor))
205 .await?;
206
207 Ok(result)
209 }
210
211 async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
213 let result = self.freezer.get(freezer::Identifier::Key(key)).await?;
215
216 Ok(result)
218 }
219
220 async fn initialize_section(&mut self, section: u64) {
222 let bits = BitMap::zeroes(self.items_per_section);
224
225 let key = U64::new(ORDINAL_PREFIX, section);
227 self.metadata.put(key, Record::Ordinal(Some(bits)));
228 debug!(section, "initialized section");
229 }
230}
231
232impl<E: Storage + Metrics + Clock, K: Array, V: Codec> crate::archive::Archive
233 for Archive<E, K, V>
234{
235 type Key = K;
236 type Value = V;
237
238 async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
239 if self.ordinal.has(index) {
241 return Ok(());
242 }
243
244 let section = index / self.items_per_section;
246 let ordinal_key = U64::new(ORDINAL_PREFIX, section);
247 if self.metadata.get(&ordinal_key).is_none() {
248 self.initialize_section(section).await;
249 }
250 let record = self.metadata.get_mut(&ordinal_key).unwrap();
251
252 let done = if let Record::Ordinal(Some(bits)) = record {
254 bits.set(index % self.items_per_section, true);
255 bits.count_ones() == self.items_per_section
256 } else {
257 false
258 };
259 if done {
260 *record = Record::Ordinal(None);
261 }
262
263 let cursor = self.freezer.put(key, data).await?;
265
266 self.ordinal.put(index, cursor).await?;
268
269 Ok(())
270 }
271
272 async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
273 self.gets.inc();
274
275 match identifier {
276 Identifier::Index(index) => self.get_index(index).await,
277 Identifier::Key(key) => self.get_key(key).await,
278 }
279 }
280
281 async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
282 self.has.inc();
283
284 match identifier {
285 Identifier::Index(index) => Ok(self.ordinal.has(index)),
286 Identifier::Key(key) => self.get_key(key).await.map(|result| result.is_some()),
287 }
288 }
289
290 async fn sync(&mut self) -> Result<(), Error> {
291 self.syncs.inc();
292
293 let (freezer_result, ordinal_result) = join!(self.freezer.sync(), self.ordinal.sync());
295 let checkpoint = freezer_result?;
296 ordinal_result?;
297
298 let freezer_key = U64::new(FREEZER_PREFIX, 0);
300 self.metadata.put(freezer_key, Record::Freezer(checkpoint));
301
302 self.metadata.sync().await?;
304
305 Ok(())
306 }
307
308 fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
309 self.ordinal.next_gap(index)
310 }
311
312 fn missing_items(&self, index: u64, max: usize) -> Vec<u64> {
313 self.ordinal.missing_items(index, max)
314 }
315
316 fn ranges(&self) -> impl Iterator<Item = (u64, u64)> {
317 self.ordinal.ranges()
318 }
319
320 fn first_index(&self) -> Option<u64> {
321 self.ordinal.first_index()
322 }
323
324 fn last_index(&self) -> Option<u64> {
325 self.ordinal.last_index()
326 }
327
328 async fn close(mut self) -> Result<(), Error> {
329 self.ordinal.close().await?;
331
332 let checkpoint = self.freezer.close().await?;
334
335 let freezer_key = U64::new(FREEZER_PREFIX, 0);
337 self.metadata.put(freezer_key, Record::Freezer(checkpoint));
338
339 self.metadata.close().await?;
341
342 Ok(())
343 }
344
345 async fn destroy(self) -> Result<(), Error> {
346 self.ordinal.destroy().await?;
348
349 self.freezer.destroy().await?;
351
352 self.metadata.destroy().await?;
354
355 Ok(())
356 }
357}
358
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    //! Codec conformance tests for the metadata [Record](super::Record) type.

    use super::*;
    use commonware_codec::conformance::CodecConformance;

    commonware_conformance::conformance_tests! {
        CodecConformance<Record>
    }
}