commonware_storage/archive/immutable/
storage.rs1use crate::{
2 archive::{immutable::Config, Error, Identifier},
3 freezer::{self, Checkpoint, Cursor, Freezer},
4 metadata::{self, Metadata},
5 ordinal::{self, Ordinal},
6};
7use commonware_codec::{CodecShared, EncodeSize, FixedSize, Read, ReadExt, Write};
8use commonware_runtime::{Buf, BufMut, Clock, Metrics, Storage};
9use commonware_utils::{bitmap::BitMap, sequence::prefixed_u64::U64, Array};
10use futures::join;
11use prometheus_client::metrics::counter::Counter;
12use std::collections::BTreeMap;
13use tracing::debug;
14
15const FREEZER_PREFIX: u8 = 0;
17
18const ORDINAL_PREFIX: u8 = 1;
20
21#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
23enum Record {
24 Freezer(Checkpoint),
25 Ordinal(Option<BitMap>),
26}
27
28impl Record {
29 fn freezer(&self) -> &Checkpoint {
31 match self {
32 Self::Freezer(checkpoint) => checkpoint,
33 _ => panic!("incorrect record"),
34 }
35 }
36
37 fn ordinal(&self) -> &Option<BitMap> {
39 match self {
40 Self::Ordinal(indices) => indices,
41 _ => panic!("incorrect record"),
42 }
43 }
44}
45
46impl Write for Record {
47 fn write(&self, buf: &mut impl BufMut) {
48 match self {
49 Self::Freezer(checkpoint) => {
50 buf.put_u8(0);
51 checkpoint.write(buf);
52 }
53 Self::Ordinal(indices) => {
54 buf.put_u8(1);
55 indices.write(buf);
56 }
57 }
58 }
59}
60
61impl Read for Record {
62 type Cfg = ();
63 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
64 let tag = u8::read(buf)?;
65 match tag {
66 0 => Ok(Self::Freezer(Checkpoint::read(buf)?)),
67 1 => Ok(Self::Ordinal(Option::<BitMap>::read_cfg(
68 buf,
69 &(usize::MAX as u64),
70 )?)),
71 _ => Err(commonware_codec::Error::InvalidEnum(tag)),
72 }
73 }
74}
75
76impl EncodeSize for Record {
77 fn encode_size(&self) -> usize {
78 1 + match self {
79 Self::Freezer(_) => Checkpoint::SIZE,
80 Self::Ordinal(indices) => indices.encode_size(),
81 }
82 }
83}
84
85pub struct Archive<E: Storage + Metrics + Clock, K: Array, V: CodecShared> {
87 items_per_section: u64,
89
90 metadata: Metadata<E, U64, Record>,
92
93 freezer: Freezer<E, K, V>,
95
96 ordinal: Ordinal<E, Cursor>,
98
99 gets: Counter,
101 has: Counter,
102 syncs: Counter,
103}
104
105impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> Archive<E, K, V> {
106 pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
108 let metadata = Metadata::<E, U64, Record>::init(
110 context.with_label("metadata"),
111 metadata::Config {
112 partition: cfg.metadata_partition,
113 codec_config: (),
114 },
115 )
116 .await?;
117
118 let freezer_key = U64::new(FREEZER_PREFIX, 0);
120 let checkpoint = metadata.get(&freezer_key).map(|freezer| *freezer.freezer());
121
122 let freezer = Freezer::init_with_checkpoint(
126 context.with_label("freezer"),
127 freezer::Config {
128 key_partition: cfg.freezer_key_partition,
129 key_write_buffer: cfg.freezer_key_write_buffer,
130 key_page_cache: cfg.freezer_key_page_cache,
131 value_partition: cfg.freezer_value_partition,
132 value_compression: cfg.freezer_value_compression,
133 value_write_buffer: cfg.freezer_value_write_buffer,
134 value_target_size: cfg.freezer_value_target_size,
135 table_partition: cfg.freezer_table_partition,
136 table_initial_size: cfg.freezer_table_initial_size,
137 table_resize_frequency: cfg.freezer_table_resize_frequency,
138 table_resize_chunk_size: cfg.freezer_table_resize_chunk_size,
139 table_replay_buffer: cfg.replay_buffer,
140 codec_config: cfg.codec_config,
141 },
142 checkpoint,
143 )
144 .await?;
145
146 let sections = metadata
148 .keys()
149 .filter(|k| k.prefix() == ORDINAL_PREFIX)
150 .collect::<Vec<_>>();
151 let mut section_bits = BTreeMap::new();
152 for section in sections {
153 let bits = metadata.get(section).unwrap().ordinal();
155
156 let section = section.value();
158 section_bits.insert(section, bits);
159 }
160
161 let ordinal = Ordinal::init_with_bits(
165 context.with_label("ordinal"),
166 ordinal::Config {
167 partition: cfg.ordinal_partition,
168 items_per_blob: cfg.items_per_section,
169 write_buffer: cfg.ordinal_write_buffer,
170 replay_buffer: cfg.replay_buffer,
171 },
172 Some(section_bits),
173 )
174 .await?;
175
176 let gets = Counter::default();
178 let has = Counter::default();
179 let syncs = Counter::default();
180 context.register("gets", "Number of gets performed", gets.clone());
181 context.register("has", "Number of has performed", has.clone());
182 context.register("syncs", "Number of syncs called", syncs.clone());
183
184 Ok(Self {
185 items_per_section: cfg.items_per_section.get(),
186 metadata,
187 freezer,
188 ordinal,
189 gets,
190 has,
191 syncs,
192 })
193 }
194
195 async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
197 let Some(cursor) = self.ordinal.get(index).await? else {
199 return Ok(None);
200 };
201
202 let result = self
204 .freezer
205 .get(freezer::Identifier::Cursor(cursor))
206 .await?;
207
208 Ok(result)
210 }
211
212 async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
214 let result = self.freezer.get(freezer::Identifier::Key(key)).await?;
216
217 Ok(result)
219 }
220
221 async fn initialize_section(&mut self, section: u64) {
223 let bits = BitMap::zeroes(self.items_per_section);
225
226 let key = U64::new(ORDINAL_PREFIX, section);
228 self.metadata.put(key, Record::Ordinal(Some(bits)));
229 debug!(section, "initialized section");
230 }
231}
232
233impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> crate::archive::Archive
234 for Archive<E, K, V>
235{
236 type Key = K;
237 type Value = V;
238
239 async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
240 if self.ordinal.has(index) {
242 return Ok(());
243 }
244
245 let section = index / self.items_per_section;
247 let ordinal_key = U64::new(ORDINAL_PREFIX, section);
248 if self.metadata.get(&ordinal_key).is_none() {
249 self.initialize_section(section).await;
250 }
251 let record = self.metadata.get_mut(&ordinal_key).unwrap();
252
253 let done = if let Record::Ordinal(Some(bits)) = record {
255 bits.set(index % self.items_per_section, true);
256 bits.count_ones() == self.items_per_section
257 } else {
258 false
259 };
260 if done {
261 *record = Record::Ordinal(None);
262 }
263
264 let cursor = self.freezer.put(key, data).await?;
266
267 self.ordinal.put(index, cursor).await?;
269
270 Ok(())
271 }
272
273 async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
274 self.gets.inc();
275
276 match identifier {
277 Identifier::Index(index) => self.get_index(index).await,
278 Identifier::Key(key) => self.get_key(key).await,
279 }
280 }
281
282 async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
283 self.has.inc();
284
285 match identifier {
286 Identifier::Index(index) => Ok(self.ordinal.has(index)),
287 Identifier::Key(key) => self.get_key(key).await.map(|result| result.is_some()),
288 }
289 }
290
291 async fn sync(&mut self) -> Result<(), Error> {
292 self.syncs.inc();
293
294 let (freezer_result, ordinal_result) = join!(self.freezer.sync(), self.ordinal.sync());
296 let checkpoint = freezer_result?;
297 ordinal_result?;
298
299 let freezer_key = U64::new(FREEZER_PREFIX, 0);
301 self.metadata.put(freezer_key, Record::Freezer(checkpoint));
302
303 self.metadata.sync().await?;
305
306 Ok(())
307 }
308
309 fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
310 self.ordinal.next_gap(index)
311 }
312
313 fn missing_items(&self, index: u64, max: usize) -> Vec<u64> {
314 self.ordinal.missing_items(index, max)
315 }
316
317 fn ranges(&self) -> impl Iterator<Item = (u64, u64)> {
318 self.ordinal.ranges()
319 }
320
321 fn first_index(&self) -> Option<u64> {
322 self.ordinal.first_index()
323 }
324
325 fn last_index(&self) -> Option<u64> {
326 self.ordinal.last_index()
327 }
328
329 async fn destroy(self) -> Result<(), Error> {
330 self.ordinal.destroy().await?;
332
333 self.freezer.destroy().await?;
335
336 self.metadata.destroy().await?;
338
339 Ok(())
340 }
341}
342
343#[cfg(all(test, feature = "arbitrary"))]
344mod conformance {
345 use super::*;
346 use commonware_codec::conformance::CodecConformance;
347
348 commonware_conformance::conformance_tests! {
349 CodecConformance<Record>
350 }
351}