// commonware_storage/journal/segmented/glob.rs
//! Simple section-based blob storage for values.
//!
//! This module provides a minimal blob storage optimized for storing values where
//! the size is tracked externally (in an index entry). Unlike the segmented variable
//! journal, this format does not include a size prefix since the caller already
//! knows the size.
//!
//! # Format
//!
//! Each entry is stored as:
//!
//! ```text
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//! |     Compressed Data (variable)    |   CRC32   |
//! +---+---+---+---+---+---+---+---+---+---+---+---+
//! ```
//!
//! - **Compressed Data**: zstd compressed (if enabled) or raw codec output
//! - **CRC32**: 4-byte checksum of the compressed data
//!
//! # Read Flow
//!
//! 1. Get `(offset, size)` from index entry
//! 2. Read `size` bytes directly from blob at byte offset
//! 3. Last 4 bytes are CRC32, verify it
//! 4. Decompress remaining bytes if compression enabled
//! 5. Decode value

use super::manager::{Config as ManagerConfig, Manager, WriteFactory};
use crate::journal::Error;
use commonware_codec::{Codec, CodecShared, FixedSize};
use commonware_cryptography::{crc32, Crc32};
use commonware_runtime::{Blob as _, BufMut, Error as RError, IoBufMut, Metrics, Storage};
use std::{io::Cursor, num::NonZeroUsize};
use zstd::{bulk::compress, decode_all};

/// Configuration for blob storage.
#[derive(Clone)]
pub struct Config<C> {
    /// Name of the storage partition that holds the section blobs.
    pub partition: String,

    /// `zstd` compression level applied to entries before storage, or `None`
    /// to store raw codec output.
    pub compression: Option<u8>,

    /// Codec configuration used when encoding and decoding items.
    pub codec_config: C,

    /// Capacity of the per-blob write buffer.
    pub write_buffer: NonZeroUsize,
}

53/// Simple section-based blob storage for values.
54///
55/// Uses [`buffer::Write`](commonware_runtime::buffer::Write) for batching writes.
56/// Reads go directly to blobs without any caching (ideal for large values that
57/// shouldn't pollute a page cache).
58pub struct Glob<E: Storage + Metrics, V: Codec> {
59    manager: Manager<E, WriteFactory>,
60
61    /// Compression level (if enabled).
62    compression: Option<u8>,
63
64    /// Codec configuration.
65    codec_config: V::Cfg,
66}
67
68impl<E: Storage + Metrics, V: CodecShared> Glob<E, V> {
69    /// Initialize blob storage, opening existing section blobs.
70    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
71        let manager_cfg = ManagerConfig {
72            partition: cfg.partition,
73            factory: WriteFactory {
74                capacity: cfg.write_buffer,
75            },
76        };
77        let manager = Manager::init(context, manager_cfg).await?;
78
79        Ok(Self {
80            manager,
81            compression: cfg.compression,
82            codec_config: cfg.codec_config,
83        })
84    }
85
86    /// Append value to section, returns (offset, size).
87    ///
88    /// The returned offset is the byte offset where the entry was written.
89    /// The returned size is the total bytes written (compressed_data + crc32).
90    /// Both should be stored in the index entry for later retrieval.
91    pub async fn append(&mut self, section: u64, value: &V) -> Result<(u64, u32), Error> {
92        // Encode and optionally compress, then append checksum
93        let buf = if let Some(level) = self.compression {
94            // Compressed: encode first, then compress, then append checksum
95            let encoded = value.encode();
96            let mut compressed =
97                compress(&encoded, level as i32).map_err(|_| Error::CompressionFailed)?;
98            let checksum = Crc32::checksum(&compressed);
99            compressed.put_u32(checksum);
100            compressed
101        } else {
102            // Uncompressed: pre-allocate exact size to avoid copying
103            let entry_size = value.encode_size() + crc32::Digest::SIZE;
104            let mut buf = Vec::with_capacity(entry_size);
105            value.write(&mut buf);
106            let checksum = Crc32::checksum(&buf);
107            buf.put_u32(checksum);
108            buf
109        };
110
111        // Write to blob
112        let entry_size = u32::try_from(buf.len()).map_err(|_| Error::ValueTooLarge)?;
113        let writer = self.manager.get_or_create(section).await?;
114        let offset = writer.size().await;
115        writer.write_at(offset, buf).await.map_err(Error::Runtime)?;
116
117        Ok((offset, entry_size))
118    }
119
120    /// Read value at offset with known size (from index entry).
121    ///
122    /// The offset should be the byte offset returned by `append()`.
123    /// Reads directly from blob without any caching.
124    pub async fn get(&self, section: u64, offset: u64, size: u32) -> Result<V, Error> {
125        let writer = self
126            .manager
127            .get(section)?
128            .ok_or(Error::SectionOutOfRange(section))?;
129
130        // Read via buffered writer (handles read-through for buffered data)
131        let buf = writer
132            .read_at(offset, IoBufMut::zeroed(size as usize))
133            .await?
134            .coalesce();
135
136        // Entry format: [compressed_data] [crc32 (4 bytes)]
137        if buf.len() < crc32::Digest::SIZE {
138            return Err(Error::Runtime(RError::BlobInsufficientLength));
139        }
140
141        let data_len = buf.len() - crc32::Digest::SIZE;
142        let compressed_data = &buf.as_ref()[..data_len];
143        let stored_checksum = u32::from_be_bytes(
144            buf.as_ref()[data_len..]
145                .try_into()
146                .expect("checksum is 4 bytes"),
147        );
148
149        // Verify checksum
150        let checksum = Crc32::checksum(compressed_data);
151        if checksum != stored_checksum {
152            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
153        }
154
155        // Decompress if needed and decode
156        let value = if self.compression.is_some() {
157            let decompressed =
158                decode_all(Cursor::new(compressed_data)).map_err(|_| Error::DecompressionFailed)?;
159            V::decode_cfg(decompressed.as_ref(), &self.codec_config).map_err(Error::Codec)?
160        } else {
161            V::decode_cfg(compressed_data, &self.codec_config).map_err(Error::Codec)?
162        };
163
164        Ok(value)
165    }
166
167    /// Sync section to disk (flushes write buffer).
168    pub async fn sync(&self, section: u64) -> Result<(), Error> {
169        self.manager.sync(section).await
170    }
171
172    /// Sync all sections to disk.
173    pub async fn sync_all(&self) -> Result<(), Error> {
174        self.manager.sync_all().await
175    }
176
177    /// Get the current size of a section (including buffered data).
178    pub async fn size(&self, section: u64) -> Result<u64, Error> {
179        self.manager.size(section).await
180    }
181
182    /// Rewind to a specific section and size.
183    ///
184    /// Truncates the section to the given size and removes all sections after it.
185    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
186        self.manager.rewind(section, size).await
187    }
188
189    /// Rewind only the given section to a specific size.
190    ///
191    /// Unlike `rewind`, this does not affect other sections.
192    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
193        self.manager.rewind_section(section, size).await
194    }
195
196    /// Prune sections before min.
197    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
198        self.manager.prune(min).await
199    }
200
201    /// Returns the number of the oldest section.
202    pub fn oldest_section(&self) -> Option<u64> {
203        self.manager.oldest_section()
204    }
205
206    /// Returns the number of the newest section.
207    pub fn newest_section(&self) -> Option<u64> {
208        self.manager.newest_section()
209    }
210
211    /// Returns an iterator over all section numbers.
212    pub fn sections(&self) -> impl Iterator<Item = u64> + '_ {
213        self.manager.sections()
214    }
215
216    /// Remove a specific section. Returns true if the section existed and was removed.
217    pub async fn remove_section(&mut self, section: u64) -> Result<bool, Error> {
218        self.manager.remove_section(section).await
219    }
220
221    /// Close all blobs (syncs first).
222    pub async fn close(&mut self) -> Result<(), Error> {
223        self.sync_all().await
224    }
225
226    /// Destroy all blobs.
227    pub async fn destroy(self) -> Result<(), Error> {
228        self.manager.destroy().await
229    }
230}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Metrics, Runner};
    use commonware_utils::NZUsize;

    /// Default config used by most tests: no compression, 1 KiB write buffer.
    fn test_cfg() -> Config<()> {
        Config {
            partition: "test_partition".to_string(),
            compression: None,
            codec_config: (),
            write_buffer: NZUsize!(1024),
        }
    }

    #[test_traced]
    fn test_glob_append_and_get() {
        let runner = deterministic::Runner::default();
        runner.start(|ctx| async move {
            let mut store: Glob<_, i32> = Glob::init(ctx.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            // Write one value and remember its location.
            let value: i32 = 42;
            let (offset, size) = store.append(1, &value).await.expect("Failed to append");
            assert_eq!(offset, 0);

            // Readable before sync (served from the write buffer).
            assert_eq!(
                store.get(1, offset, size).await.expect("Failed to get"),
                value
            );

            // Still readable after sync.
            store.sync(1).await.expect("Failed to sync");
            assert_eq!(
                store.get(1, offset, size).await.expect("Failed to get"),
                value
            );

            store.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_multiple_values() {
        let runner = deterministic::Runner::default();
        runner.start(|ctx| async move {
            let mut store: Glob<_, i32> = Glob::init(ctx.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            // Append several values, recording where each one landed.
            let values: Vec<i32> = vec![1, 2, 3, 4, 5];
            let mut locations = Vec::with_capacity(values.len());
            for value in &values {
                locations.push(store.append(1, value).await.expect("Failed to append"));
            }

            // Every value must round-trip from its recorded location.
            for (value, (offset, size)) in values.iter().zip(&locations) {
                assert_eq!(
                    store.get(1, *offset, *size).await.expect("Failed to get"),
                    *value
                );
            }

            store.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_with_compression() {
        let runner = deterministic::Runner::default();
        runner.start(|ctx| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: Some(3), // zstd level 3
                codec_config: (),
                write_buffer: NZUsize!(1024),
            };
            let mut store: Glob<_, [u8; 100]> = Glob::init(ctx.clone(), cfg)
                .await
                .expect("Failed to init glob");

            // An all-zero payload compresses well.
            let value = [0u8; 100];
            let (offset, size) = store.append(1, &value).await.expect("Failed to append");

            // Stored entry (payload + 4-byte CRC) must shrink below the raw size.
            assert!(size < 100 + 4);

            assert_eq!(
                store.get(1, offset, size).await.expect("Failed to get"),
                value
            );

            store.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_prune() {
        let runner = deterministic::Runner::default();
        runner.start(|ctx| async move {
            let mut store: Glob<_, i32> = Glob::init(ctx.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            // One synced entry per section 1..=5.
            for section in 1..=5 {
                store
                    .append(section, &(section as i32))
                    .await
                    .expect("Failed to append");
                store.sync(section).await.expect("Failed to sync");
            }

            // Drop everything below section 3.
            store.prune(3).await.expect("Failed to prune");

            // Pruned sections are unreadable...
            assert!(store.get(1, 0, 8).await.is_err());
            assert!(store.get(2, 0, 8).await.is_err());

            // ...while the survivors remain.
            for section in 3..=5 {
                assert!(store.manager.blobs.contains_key(&section));
            }

            store.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_checksum_mismatch() {
        let runner = deterministic::Runner::default();
        runner.start(|ctx| async move {
            let mut store: Glob<_, i32> = Glob::init(ctx.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            let value: i32 = 42;
            let (offset, size) = store.append(1, &value).await.expect("Failed to append");
            store.sync(1).await.expect("Failed to sync");

            // Clobber the entry's first bytes directly in the underlying blob.
            let blob = store.manager.blobs.get(&1).unwrap();
            blob.write_at(offset, vec![0xFF, 0xFF, 0xFF, 0xFF])
                .await
                .expect("Failed to corrupt");
            blob.sync().await.expect("Failed to sync");

            // The CRC check must now reject the read.
            assert!(matches!(
                store.get(1, offset, size).await,
                Err(Error::ChecksumMismatch(_, _))
            ));

            store.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_rewind() {
        let runner = deterministic::Runner::default();
        runner.start(|ctx| async move {
            let mut store: Glob<_, i32> = Glob::init(ctx.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            // Append five values and remember each location.
            let values: Vec<i32> = vec![1, 2, 3, 4, 5];
            let mut locations = Vec::with_capacity(values.len());
            for value in &values {
                locations.push(store.append(1, value).await.expect("Failed to append"));
            }
            store.sync(1).await.expect("Failed to sync");

            // Truncate the section to end exactly after the third entry.
            let (third_offset, third_size) = locations[2];
            store
                .rewind_section(1, third_offset + u64::from(third_size))
                .await
                .expect("Failed to rewind");

            // Entries 1-3 survive the rewind.
            for (value, (offset, size)) in values.iter().zip(&locations).take(3) {
                assert_eq!(
                    store.get(1, *offset, *size).await.expect("Failed to get"),
                    *value
                );
            }

            // Entry 4 now lies past the end of the blob.
            let (fourth_offset, fourth_size) = locations[3];
            assert!(store.get(1, fourth_offset, fourth_size).await.is_err());

            store.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_persistence() {
        let runner = deterministic::Runner::default();
        runner.start(|ctx| async move {
            let cfg = test_cfg();

            // Populate, sync, and drop the first instance.
            let mut store: Glob<_, i32> = Glob::init(ctx.with_label("first"), cfg.clone())
                .await
                .expect("Failed to init glob");
            let value: i32 = 42;
            let (offset, size) = store.append(1, &value).await.expect("Failed to append");
            store.sync(1).await.expect("Failed to sync");
            drop(store);

            // A fresh instance over the same partition must see the data.
            let store: Glob<_, i32> = Glob::init(ctx.with_label("second"), cfg)
                .await
                .expect("Failed to reinit glob");
            assert_eq!(
                store.get(1, offset, size).await.expect("Failed to get"),
                value
            );

            store.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_get_invalid_size() {
        let runner = deterministic::Runner::default();
        runner.start(|ctx| async move {
            let mut store: Glob<_, i32> = Glob::init(ctx.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            let (offset, _size) = store.append(1, &42).await.expect("Failed to append");
            store.sync(1).await.expect("Failed to sync");

            // A zero-length read cannot contain a checksum.
            assert!(store.get(1, offset, 0).await.is_err());

            // Reads shorter than the 4-byte CRC trailer are rejected outright.
            for size in 1..4u32 {
                assert!(matches!(
                    store.get(1, offset, size).await,
                    Err(Error::Runtime(RError::BlobInsufficientLength))
                ));
            }

            store.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_get_wrong_size() {
        let runner = deterministic::Runner::default();
        runner.start(|ctx| async move {
            let mut store: Glob<_, i32> = Glob::init(ctx.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            let (offset, correct_size) = store.append(1, &42).await.expect("Failed to append");
            store.sync(1).await.expect("Failed to sync");

            // A truncated (but >= CRC-size) read misaligns the trailer: CRC mismatch.
            assert!(matches!(
                store.get(1, offset, correct_size - 1).await,
                Err(Error::ChecksumMismatch(_, _))
            ));

            store.destroy().await.expect("Failed to destroy");
        });
    }
}
504}