commonware_storage/archive/immutable/
storage.rs1use crate::{
2 archive::{immutable::Config, Error, Identifier},
3 freezer::{self, Checkpoint, Cursor, Freezer},
4 metadata::{self, Metadata},
5 ordinal::{self, Ordinal},
6};
7use bytes::{Buf, BufMut};
8use commonware_codec::{Codec, EncodeSize, FixedSize, Read, ReadExt, Write};
9use commonware_runtime::{Clock, Metrics, Storage};
10use commonware_utils::{sequence::prefixed_u64::U64, Array, BitVec};
11use futures::join;
12use prometheus_client::metrics::counter::Counter;
13use std::collections::BTreeMap;
14use tracing::debug;
15
/// Metadata key prefix under which the freezer checkpoint is stored (single key, suffix `0`).
const FREEZER_PREFIX: u8 = 0;

/// Metadata key prefix under which each section's written-index bitmap is stored
/// (the key suffix is the section number).
const ORDINAL_PREFIX: u8 = 1;
21
22enum Record {
24 Freezer(Checkpoint),
25 Ordinal(Option<BitVec>),
26}
27
28impl Record {
29 fn freezer(&self) -> &Checkpoint {
31 match self {
32 Self::Freezer(checkpoint) => checkpoint,
33 _ => panic!("incorrect record"),
34 }
35 }
36
37 fn ordinal(&self) -> &Option<BitVec> {
39 match self {
40 Self::Ordinal(indices) => indices,
41 _ => panic!("incorrect record"),
42 }
43 }
44}
45
46impl Write for Record {
47 fn write(&self, buf: &mut impl BufMut) {
48 match self {
49 Self::Freezer(checkpoint) => {
50 buf.put_u8(0);
51 checkpoint.write(buf);
52 }
53 Self::Ordinal(indices) => {
54 buf.put_u8(1);
55 indices.write(buf);
56 }
57 }
58 }
59}
60
61impl Read for Record {
62 type Cfg = ();
63 fn read_cfg(buf: &mut impl Buf, _: &Self::Cfg) -> Result<Self, commonware_codec::Error> {
64 let tag = u8::read(buf)?;
65 match tag {
66 0 => Ok(Self::Freezer(Checkpoint::read(buf)?)),
67 1 => Ok(Self::Ordinal(Option::<BitVec>::read_cfg(
68 buf,
69 &(0..=usize::MAX).into(),
70 )?)),
71 _ => Err(commonware_codec::Error::InvalidEnum(tag)),
72 }
73 }
74}
75
76impl EncodeSize for Record {
77 fn encode_size(&self) -> usize {
78 1 + match self {
79 Self::Freezer(_) => Checkpoint::SIZE,
80 Self::Ordinal(indices) => indices.encode_size(),
81 }
82 }
83}
84
85pub struct Archive<E: Storage + Metrics + Clock, K: Array, V: Codec> {
87 items_per_section: u64,
89
90 metadata: Metadata<E, U64, Record>,
92
93 freezer: Freezer<E, K, V>,
95
96 ordinal: Ordinal<E, Cursor>,
98
99 gets: Counter,
101 has: Counter,
102 syncs: Counter,
103}
104
105impl<E: Storage + Metrics + Clock, K: Array, V: Codec> Archive<E, K, V> {
106 pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
108 let metadata = Metadata::<E, U64, Record>::init(
110 context.with_label("metadata"),
111 metadata::Config {
112 partition: cfg.metadata_partition,
113 codec_config: (),
114 },
115 )
116 .await?;
117
118 let freezer_key = U64::new(FREEZER_PREFIX, 0);
120 let checkpoint = metadata.get(&freezer_key).map(|freezer| *freezer.freezer());
121
122 let freezer = Freezer::init_with_checkpoint(
126 context.with_label("freezer"),
127 freezer::Config {
128 journal_partition: cfg.freezer_journal_partition,
129 journal_compression: cfg.freezer_journal_compression,
130 journal_write_buffer: cfg.write_buffer,
131 journal_target_size: cfg.freezer_journal_target_size,
132 journal_buffer_pool: cfg.freezer_journal_buffer_pool,
133 table_partition: cfg.freezer_table_partition,
134 table_initial_size: cfg.freezer_table_initial_size,
135 table_resize_frequency: cfg.freezer_table_resize_frequency,
136 table_resize_chunk_size: cfg.freezer_table_resize_chunk_size,
137 table_replay_buffer: cfg.replay_buffer,
138 codec_config: cfg.codec_config,
139 },
140 checkpoint,
141 )
142 .await?;
143
144 let sections = metadata.keys(Some(&[ORDINAL_PREFIX])).collect::<Vec<_>>();
146 let mut section_bits = BTreeMap::new();
147 for section in sections {
148 let bits = metadata.get(section).unwrap().ordinal();
150
151 let section = section.value();
153 section_bits.insert(section, bits);
154 }
155
156 let ordinal = Ordinal::init_with_bits(
160 context.with_label("ordinal"),
161 ordinal::Config {
162 partition: cfg.ordinal_partition,
163 items_per_blob: cfg.items_per_section,
164 write_buffer: cfg.write_buffer,
165 replay_buffer: cfg.replay_buffer,
166 },
167 Some(section_bits),
168 )
169 .await?;
170
171 let gets = Counter::default();
173 let has = Counter::default();
174 let syncs = Counter::default();
175 context.register("gets", "Number of gets performed", gets.clone());
176 context.register("has", "Number of has performed", has.clone());
177 context.register("syncs", "Number of syncs called", syncs.clone());
178
179 Ok(Self {
180 items_per_section: cfg.items_per_section.get(),
181 metadata,
182 freezer,
183 ordinal,
184 gets,
185 has,
186 syncs,
187 })
188 }
189
190 async fn get_index(&self, index: u64) -> Result<Option<V>, Error> {
192 let Some(cursor) = self.ordinal.get(index).await? else {
194 return Ok(None);
195 };
196
197 let result = self
199 .freezer
200 .get(freezer::Identifier::Cursor(cursor))
201 .await?;
202
203 Ok(result)
205 }
206
207 async fn get_key(&self, key: &K) -> Result<Option<V>, Error> {
209 let result = self.freezer.get(freezer::Identifier::Key(key)).await?;
211
212 Ok(result)
214 }
215
216 async fn initialize_section(&mut self, section: u64) {
218 let bits = BitVec::zeroes(self.items_per_section as usize);
220
221 let key = U64::new(ORDINAL_PREFIX, section);
223 self.metadata.put(key, Record::Ordinal(Some(bits)));
224 debug!(section, "initialized section");
225 }
226}
227
228impl<E: Storage + Metrics + Clock, K: Array, V: Codec> crate::archive::Archive
229 for Archive<E, K, V>
230{
231 type Key = K;
232 type Value = V;
233
234 async fn put(&mut self, index: u64, key: K, data: V) -> Result<(), Error> {
235 if self.ordinal.has(index) {
237 return Ok(());
238 }
239
240 let section = index / self.items_per_section;
242 let ordinal_key = U64::new(ORDINAL_PREFIX, section);
243 if self.metadata.get(&ordinal_key).is_none() {
244 self.initialize_section(section).await;
245 }
246 let record = self.metadata.get_mut(&ordinal_key).unwrap();
247
248 let done = if let Record::Ordinal(Some(bits)) = record {
250 bits.set((index % self.items_per_section) as usize);
251 bits.count_ones() == self.items_per_section as usize
252 } else {
253 false
254 };
255 if done {
256 *record = Record::Ordinal(None);
257 }
258
259 let cursor = self.freezer.put(key, data).await?;
261
262 self.ordinal.put(index, cursor).await?;
264
265 Ok(())
266 }
267
268 async fn get(&self, identifier: Identifier<'_, K>) -> Result<Option<V>, Error> {
269 self.gets.inc();
270
271 match identifier {
272 Identifier::Index(index) => self.get_index(index).await,
273 Identifier::Key(key) => self.get_key(key).await,
274 }
275 }
276
277 async fn has(&self, identifier: Identifier<'_, K>) -> Result<bool, Error> {
278 self.has.inc();
279
280 match identifier {
281 Identifier::Index(index) => Ok(self.ordinal.has(index)),
282 Identifier::Key(key) => self.get_key(key).await.map(|result| result.is_some()),
283 }
284 }
285
286 async fn sync(&mut self) -> Result<(), Error> {
287 self.syncs.inc();
288
289 let (freezer_result, ordinal_result) = join!(self.freezer.sync(), self.ordinal.sync());
291 let checkpoint = freezer_result?;
292 ordinal_result?;
293
294 let freezer_key = U64::new(FREEZER_PREFIX, 0);
296 self.metadata.put(freezer_key, Record::Freezer(checkpoint));
297
298 self.metadata.sync().await?;
300
301 Ok(())
302 }
303
304 fn next_gap(&self, index: u64) -> (Option<u64>, Option<u64>) {
305 self.ordinal.next_gap(index)
306 }
307
308 async fn close(mut self) -> Result<(), Error> {
309 self.ordinal.close().await?;
311
312 let checkpoint = self.freezer.close().await?;
314
315 let freezer_key = U64::new(FREEZER_PREFIX, 0);
317 self.metadata.put(freezer_key, Record::Freezer(checkpoint));
318
319 self.metadata.close().await?;
321
322 Ok(())
323 }
324
325 async fn destroy(self) -> Result<(), Error> {
326 self.ordinal.destroy().await?;
328
329 self.freezer.destroy().await?;
331
332 self.metadata.destroy().await?;
334
335 Ok(())
336 }
337}