// commonware_storage/archive/immutable/storage.rs
use crate::{
2 archive::{immutable::Config, Error, Identifier},
3 freezer::{self, Checkpoint, Cursor, Freezer},
4 metadata::{self, Metadata},
5 ordinal::{self, Ordinal},
6};
7use bytes::{Buf, BufMut};
8use commonware_codec::{CodecShared, EncodeSize, FixedSize, Read, ReadExt, Write};
9use commonware_runtime::{Clock, Metrics, Storage};
10use commonware_utils::{bitmap::BitMap, sequence::prefixed_u64::U64, Array};
11use futures::join;
12use prometheus_client::metrics::counter::Counter;
13use std::collections::BTreeMap;
14use tracing::debug;
15
/// Metadata key prefix for the freezer checkpoint record.
const FREEZER_PREFIX: u8 = 0;

/// Metadata key prefix for per-section ordinal records.
const ORDINAL_PREFIX: u8 = 1;
21
22#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
24enum Record {
25 Freezer(Checkpoint),
26 Ordinal(Option<BitMap>),
27}
28
29impl Record {
30 fn freezer(&self) -> &Checkpoint {
32 match self {
33 Self::Freezer(checkpoint) => checkpoint,
34 _ => panic!("incorrect record"),
35 }
36 }
37
38 fn ordinal(&self) -> &Option<BitMap> {
40 match self {
41 Self::Ordinal(indices) => indices,
42 _ => panic!("incorrect record"),
43 }
44 }
45}
46
47impl Write for Record {
48 fn write(&self, buf: &mut impl BufMut) {
49 match self {
50 Self::Freezer(checkpoint) => {
51 buf.put_u8(0);
52 checkpoint.write(buf);
53 }
54 Self::Ordinal(indices) => {
55 buf.put_u8(1);
56 indices.write(buf);
57 }
58 }
59 }
60}
61
62impl Read for Record {
63 type Cfg = ();
64 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
65 let tag = u8::read(buf)?;
66 match tag {
67 0 => Ok(Self::Freezer(Checkpoint::read(buf)?)),
68 1 => Ok(Self::Ordinal(Option::<BitMap>::read_cfg(
69 buf,
70 &(usize::MAX as u64),
71 )?)),
72 _ => Err(commonware_codec::Error::InvalidEnum(tag)),
73 }
74 }
75}
76
77impl EncodeSize for Record {
78 fn encode_size(&self) -> usize {
79 1 + match self {
80 Self::Freezer(_) => Checkpoint::SIZE,
81 Self::Ordinal(indices) => indices.encode_size(),
82 }
83 }
84}
85
86pub struct Archive<E: Storage + Metrics + Clock, K: Array, V: CodecShared> {
88 items_per_section: u64,
90
91 metadata: Metadata<E, U64, Record>,
93
94 freezer: Freezer<E, K, V>,
96
97 ordinal: Ordinal<E, Cursor>,
99
100 gets: Counter,
102 has: Counter,
103 syncs: Counter,
104}
105
106impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> Archive<E, K, V> {
107 pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
109 let metadata = Metadata::<E, U64, Record>::init(
111 context.with_label("metadata"),
112 metadata::Config {
113 partition: cfg.metadata_partition,
114 codec_config: (),
115 },
116 )
117 .await?;
118
119 let freezer_key = U64::new(FREEZER_PREFIX, 0);
121 let checkpoint = metadata.get(&freezer_key).map(|freezer| *freezer.freezer());
122
123 let freezer = Freezer::init_with_checkpoint(
127 context.with_label("freezer"),
128 freezer::Config {
129 key_partition: cfg.freezer_key_partition,
130 key_write_buffer: cfg.freezer_key_write_buffer,
131 key_buffer_pool: cfg.freezer_key_buffer_pool,
132 value_partition: cfg.freezer_value_partition,
133 value_compression: cfg.freezer_value_compression,
134 value_write_buffer: cfg.freezer_value_write_buffer,
135 value_target_size: cfg.freezer_value_target_size,
136 table_partition: cfg.freezer_table_partition,
137 table_initial_size: cfg.freezer_table_initial_size,
138 table_resize_frequency: cfg.freezer_table_resize_frequency,
139 table_resize_chunk_size: cfg.freezer_table_resize_chunk_size,
140 table_replay_buffer: cfg.replay_buffer,
141 codec_config: cfg.codec_config,
142 },
143 checkpoint,
144 )
145 .await?;
146
147 let sections = metadata
149 .keys()
150 .filter(|k| k.prefix() == ORDINAL_PREFIX)
151 .collect::<Vec<_>>();
152 let mut section_bits = BTreeMap::new();
153 for section in sections {
154 let bits = metadata.get(section).unwrap().ordinal();
156
157 let section = section.value();
159 section_bits.insert(section, bits);
160 }
161
162 let ordinal = Ordinal::init_with_bits(
166 context.with_label("ordinal"),
167 ordinal::Config {
168 partition: cfg.ordinal_partition,
169 items_per_blob: cfg.items_per_section,
170 write_buffer: cfg.ordinal_write_buffer,
171 replay_buffer: cfg.replay_buffer,
172 },
173 Some(section_bits),
174 )
175 .await?;
176
177 let gets = Counter::default();
179 let has = Counter::default();
180 let syncs = Counter::default();
181 context.register("gets", "Number of gets performed", gets.clone());
182 context.register("has", "Number of has performed", has.clone());
183 context.register("syncs", "Number of syncs called", syncs.clone());
184
185 Ok(Self {
186 items_per_section: cfg.items_per_section.get(),
187 metadata,
188 freezer,
189 ordinal,
190 gets,
191 has,
192 syncs,
193 })
194 }
195
196 async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
198 let Some(cursor) = self.ordinal.get(index).await? else {
200 return Ok(None);
201 };
202
203 let result = self
205 .freezer
206 .get(freezer::Identifier::Cursor(cursor))
207 .await?;
208
209 Ok(result)
211 }
212
213 async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
215 let result = self.freezer.get(freezer::Identifier::Key(key)).await?;
217
218 Ok(result)
220 }
221
222 async fn initialize_section(&mut self, section: u64) {
224 let bits = BitMap::zeroes(self.items_per_section);
226
227 let key = U64::new(ORDINAL_PREFIX, section);
229 self.metadata.put(key, Record::Ordinal(Some(bits)));
230 debug!(section, "initialized section");
231 }
232}
233
234impl<E: Storage + Metrics + Clock, K: Array, V: CodecShared> crate::archive::Archive
235 for Archive<E, K, V>
236{
237 type Key = K;
238 type Value = V;
239
240 async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
241 if self.ordinal.has(index) {
243 return Ok(());
244 }
245
246 let section = index / self.items_per_section;
248 let ordinal_key = U64::new(ORDINAL_PREFIX, section);
249 if self.metadata.get(&ordinal_key).is_none() {
250 self.initialize_section(section).await;
251 }
252 let record = self.metadata.get_mut(&ordinal_key).unwrap();
253
254 let done = if let Record::Ordinal(Some(bits)) = record {
256 bits.set(index % self.items_per_section, true);
257 bits.count_ones() == self.items_per_section
258 } else {
259 false
260 };
261 if done {
262 *record = Record::Ordinal(None);
263 }
264
265 let cursor = self.freezer.put(key, data).await?;
267
268 self.ordinal.put(index, cursor).await?;
270
271 Ok(())
272 }
273
274 async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
275 self.gets.inc();
276
277 match identifier {
278 Identifier::Index(index) => self.get_index(index).await,
279 Identifier::Key(key) => self.get_key(key).await,
280 }
281 }
282
283 async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
284 self.has.inc();
285
286 match identifier {
287 Identifier::Index(index) => Ok(self.ordinal.has(index)),
288 Identifier::Key(key) => self.get_key(key).await.map(|result| result.is_some()),
289 }
290 }
291
292 async fn sync(&mut self) -> Result<(), Error> {
293 self.syncs.inc();
294
295 let (freezer_result, ordinal_result) = join!(self.freezer.sync(), self.ordinal.sync());
297 let checkpoint = freezer_result?;
298 ordinal_result?;
299
300 let freezer_key = U64::new(FREEZER_PREFIX, 0);
302 self.metadata.put(freezer_key, Record::Freezer(checkpoint));
303
304 self.metadata.sync().await?;
306
307 Ok(())
308 }
309
310 fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
311 self.ordinal.next_gap(index)
312 }
313
314 fn missing_items(&self, index: u64, max: usize) -> Vec<u64> {
315 self.ordinal.missing_items(index, max)
316 }
317
318 fn ranges(&self) -> impl Iterator<Item = (u64, u64)> {
319 self.ordinal.ranges()
320 }
321
322 fn first_index(&self) -> Option<u64> {
323 self.ordinal.first_index()
324 }
325
326 fn last_index(&self) -> Option<u64> {
327 self.ordinal.last_index()
328 }
329
330 async fn destroy(self) -> Result<(), Error> {
331 self.ordinal.destroy().await?;
333
334 self.freezer.destroy().await?;
336
337 self.metadata.destroy().await?;
339
340 Ok(())
341 }
342}
343
/// Codec conformance tests for [`Record`]; compiled only for tests with the `arbitrary` feature.
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
    use super::*;
    use commonware_codec::conformance::CodecConformance;

    commonware_conformance::conformance_tests! {
        CodecConformance<Record>
    }
}