// commonware_storage/journal/segmented/glob.rs

1//! Simple section-based blob storage for values.
2//!
3//! This module provides a minimal blob storage optimized for storing values where
4//! the size is tracked externally (in an index entry). Unlike the segmented variable
5//! journal, this format does not include a size prefix since the caller already
6//! knows the size.
7//!
8//! # Format
9//!
10//! Each entry is stored as:
11//!
12//! ```text
13//! +---+---+---+---+---+---+---+---+---+---+---+---+
14//! |     Compressed Data (variable)    |   CRC32   |
15//! +---+---+---+---+---+---+---+---+---+---+---+---+
16//! ```
17//!
18//! - **Compressed Data**: zstd compressed (if enabled) or raw codec output
19//! - **CRC32**: 4-byte checksum of the compressed data
20//!
21//! # Read Flow
22//!
23//! 1. Get `(offset, size)` from index entry
24//! 2. Read `size` bytes directly from blob at byte offset
25//! 3. Last 4 bytes are CRC32, verify it
26//! 4. Decompress remaining bytes if compression enabled
27//! 5. Decode value
28
29use super::manager::{Config as ManagerConfig, Manager, WriteFactory};
30use crate::journal::Error;
31use bytes::BufMut;
32use commonware_codec::{Codec, CodecShared, FixedSize};
33use commonware_cryptography::{crc32, Crc32};
34use commonware_runtime::{Blob as _, Error as RError, Metrics, Storage};
35use std::{io::Cursor, num::NonZeroUsize};
36use zstd::{bulk::compress, decode_all};
37
38/// Configuration for blob storage.
#[derive(Clone)]
pub struct Config<C> {
    /// The partition to use for storing blobs.
    ///
    /// All section blobs are created and opened under this partition name.
    pub partition: String,

    /// Optional compression level (using `zstd`) to apply to data before storing.
    ///
    /// `None` disables compression entirely. The value is passed to `zstd` as the
    /// compression level (cast to `i32`); higher levels presumably trade throughput
    /// for ratio — see zstd documentation for the valid range.
    pub compression: Option<u8>,

    /// The codec configuration to use for encoding and decoding items.
    pub codec_config: C,

    /// The size of the write buffer to use for each blob.
    ///
    /// Writes are batched in memory up to this size before being flushed.
    pub write_buffer: NonZeroUsize,
}
53
/// Simple section-based blob storage for values.
///
/// Uses [`buffer::Write`](commonware_runtime::buffer::Write) for batching writes.
/// Reads go directly to blobs without any caching (ideal for large values that
/// shouldn't pollute a buffer pool cache).
pub struct Glob<E: Storage + Metrics, V: Codec> {
    /// Section lifecycle manager: owns one buffered writer per open section and
    /// handles open/sync/prune/rewind/destroy of the underlying blobs.
    manager: Manager<E, WriteFactory>,

    /// Compression level (if enabled).
    compression: Option<u8>,

    /// Codec configuration.
    codec_config: V::Cfg,
}
68
69impl<E: Storage + Metrics, V: CodecShared> Glob<E, V> {
70    /// Initialize blob storage, opening existing section blobs.
71    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
72        let manager_cfg = ManagerConfig {
73            partition: cfg.partition,
74            factory: WriteFactory {
75                capacity: cfg.write_buffer,
76            },
77        };
78        let manager = Manager::init(context, manager_cfg).await?;
79
80        Ok(Self {
81            manager,
82            compression: cfg.compression,
83            codec_config: cfg.codec_config,
84        })
85    }
86
87    /// Append value to section, returns (offset, size).
88    ///
89    /// The returned offset is the byte offset where the entry was written.
90    /// The returned size is the total bytes written (compressed_data + crc32).
91    /// Both should be stored in the index entry for later retrieval.
92    pub async fn append(&mut self, section: u64, value: &V) -> Result<(u64, u32), Error> {
93        // Encode and optionally compress, then append checksum
94        let buf = if let Some(level) = self.compression {
95            // Compressed: encode first, then compress, then append checksum
96            let encoded = value.encode();
97            let mut compressed =
98                compress(&encoded, level as i32).map_err(|_| Error::CompressionFailed)?;
99            let checksum = Crc32::checksum(&compressed);
100            compressed.put_u32(checksum);
101            compressed
102        } else {
103            // Uncompressed: pre-allocate exact size to avoid copying
104            let entry_size = value.encode_size() + crc32::Digest::SIZE;
105            let mut buf = Vec::with_capacity(entry_size);
106            value.write(&mut buf);
107            let checksum = Crc32::checksum(&buf);
108            buf.put_u32(checksum);
109            buf
110        };
111
112        // Write to blob
113        let entry_size = u32::try_from(buf.len()).map_err(|_| Error::ValueTooLarge)?;
114        let writer = self.manager.get_or_create(section).await?;
115        let offset = writer.size().await;
116        writer.write_at(buf, offset).await.map_err(Error::Runtime)?;
117
118        Ok((offset, entry_size))
119    }
120
121    /// Read value at offset with known size (from index entry).
122    ///
123    /// The offset should be the byte offset returned by `append()`.
124    /// Reads directly from blob without any caching.
125    pub async fn get(&self, section: u64, offset: u64, size: u32) -> Result<V, Error> {
126        let writer = self
127            .manager
128            .get(section)?
129            .ok_or(Error::SectionOutOfRange(section))?;
130
131        let size_usize = size as usize;
132
133        // Read via buffered writer (handles read-through for buffered data)
134        let buf = writer.read_at(vec![0u8; size_usize], offset).await?;
135        let buf = buf.as_ref();
136
137        // Entry format: [compressed_data] [crc32 (4 bytes)]
138        if buf.len() < crc32::Digest::SIZE {
139            return Err(Error::Runtime(RError::BlobInsufficientLength));
140        }
141
142        let data_len = buf.len() - crc32::Digest::SIZE;
143        let compressed_data = &buf[..data_len];
144        let stored_checksum =
145            u32::from_be_bytes(buf[data_len..].try_into().expect("checksum is 4 bytes"));
146
147        // Verify checksum
148        let checksum = Crc32::checksum(compressed_data);
149        if checksum != stored_checksum {
150            return Err(Error::ChecksumMismatch(stored_checksum, checksum));
151        }
152
153        // Decompress if needed and decode
154        let value = if self.compression.is_some() {
155            let decompressed =
156                decode_all(Cursor::new(compressed_data)).map_err(|_| Error::DecompressionFailed)?;
157            V::decode_cfg(decompressed.as_ref(), &self.codec_config).map_err(Error::Codec)?
158        } else {
159            V::decode_cfg(compressed_data, &self.codec_config).map_err(Error::Codec)?
160        };
161
162        Ok(value)
163    }
164
165    /// Sync section to disk (flushes write buffer).
166    pub async fn sync(&self, section: u64) -> Result<(), Error> {
167        self.manager.sync(section).await
168    }
169
170    /// Sync all sections to disk.
171    pub async fn sync_all(&self) -> Result<(), Error> {
172        self.manager.sync_all().await
173    }
174
175    /// Get the current size of a section (including buffered data).
176    pub async fn size(&self, section: u64) -> Result<u64, Error> {
177        self.manager.size(section).await
178    }
179
180    /// Rewind to a specific section and size.
181    ///
182    /// Truncates the section to the given size and removes all sections after it.
183    pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
184        self.manager.rewind(section, size).await
185    }
186
187    /// Rewind only the given section to a specific size.
188    ///
189    /// Unlike `rewind`, this does not affect other sections.
190    pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
191        self.manager.rewind_section(section, size).await
192    }
193
194    /// Prune sections before min.
195    pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
196        self.manager.prune(min).await
197    }
198
199    /// Returns the number of the oldest section.
200    pub fn oldest_section(&self) -> Option<u64> {
201        self.manager.oldest_section()
202    }
203
204    /// Returns the number of the newest section.
205    pub fn newest_section(&self) -> Option<u64> {
206        self.manager.newest_section()
207    }
208
209    /// Returns an iterator over all section numbers.
210    pub fn sections(&self) -> impl Iterator<Item = u64> + '_ {
211        self.manager.sections()
212    }
213
214    /// Remove a specific section. Returns true if the section existed and was removed.
215    pub async fn remove_section(&mut self, section: u64) -> Result<bool, Error> {
216        self.manager.remove_section(section).await
217    }
218
219    /// Close all blobs (syncs first).
220    pub async fn close(&mut self) -> Result<(), Error> {
221        self.sync_all().await
222    }
223
224    /// Destroy all blobs.
225    pub async fn destroy(self) -> Result<(), Error> {
226        self.manager.destroy().await
227    }
228}
229
// Unit tests run on the deterministic runtime so all storage I/O is simulated
// and reproducible.
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Runner};
    use commonware_utils::NZUsize;

    // Baseline config: no compression, unit codec config, small write buffer.
    fn test_cfg() -> Config<()> {
        Config {
            partition: "test_partition".to_string(),
            compression: None,
            codec_config: (),
            write_buffer: NZUsize!(1024),
        }
    }

    // Round-trip a single value, before and after an explicit sync.
    #[test_traced]
    fn test_glob_append_and_get() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            // Append a value
            let value: i32 = 42;
            let (offset, size) = glob.append(1, &value).await.expect("Failed to append");
            assert_eq!(offset, 0);

            // Get the value back
            let retrieved = glob.get(1, offset, size).await.expect("Failed to get");
            assert_eq!(retrieved, value);

            // Sync and verify
            glob.sync(1).await.expect("Failed to sync");
            let retrieved = glob.get(1, offset, size).await.expect("Failed to get");
            assert_eq!(retrieved, value);

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    // Multiple entries in one section: each (offset, size) location must
    // resolve back to its own value.
    #[test_traced]
    fn test_glob_multiple_values() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            // Append multiple values
            let values: Vec<i32> = vec![1, 2, 3, 4, 5];
            let mut locations = Vec::new();

            for value in &values {
                let (offset, size) = glob.append(1, value).await.expect("Failed to append");
                locations.push((offset, size));
            }

            // Get all values back
            for (i, (offset, size)) in locations.iter().enumerate() {
                let retrieved = glob.get(1, *offset, *size).await.expect("Failed to get");
                assert_eq!(retrieved, values[i]);
            }

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    // With compression enabled, a run of zeros must shrink below the
    // uncompressed entry size (payload + 4-byte checksum) and still round-trip.
    #[test_traced]
    fn test_glob_with_compression() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = Config {
                partition: "test_partition".to_string(),
                compression: Some(3), // zstd level 3
                codec_config: (),
                write_buffer: NZUsize!(1024),
            };
            let mut glob: Glob<_, [u8; 100]> = Glob::init(context.clone(), cfg)
                .await
                .expect("Failed to init glob");

            // Append a value
            let value: [u8; 100] = [0u8; 100]; // Compressible data
            let (offset, size) = glob.append(1, &value).await.expect("Failed to append");

            // Size should be smaller due to compression
            assert!(size < 100 + 4);

            // Get the value back
            let retrieved = glob.get(1, offset, size).await.expect("Failed to get");
            assert_eq!(retrieved, value);

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    // Pruning below a minimum section removes older sections but keeps the rest
    // readable (checked directly against the manager's blob map).
    #[test_traced]
    fn test_glob_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            // Append to multiple sections
            for section in 1..=5 {
                glob.append(section, &(section as i32))
                    .await
                    .expect("Failed to append");
                glob.sync(section).await.expect("Failed to sync");
            }

            // Prune sections < 3
            glob.prune(3).await.expect("Failed to prune");

            // Sections 1 and 2 should be gone
            assert!(glob.get(1, 0, 8).await.is_err());
            assert!(glob.get(2, 0, 8).await.is_err());

            // Sections 3-5 should still exist
            assert!(glob.manager.blobs.contains_key(&3));
            assert!(glob.manager.blobs.contains_key(&4));
            assert!(glob.manager.blobs.contains_key(&5));

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    // Corrupting the stored bytes out-of-band must surface as ChecksumMismatch
    // on the next read.
    #[test_traced]
    fn test_glob_checksum_mismatch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            // Append a value
            let value: i32 = 42;
            let (offset, size) = glob.append(1, &value).await.expect("Failed to append");
            glob.sync(1).await.expect("Failed to sync");

            // Corrupt the data by writing directly to the underlying blob
            let writer = glob.manager.blobs.get(&1).unwrap();
            writer
                .write_at(vec![0xFF, 0xFF, 0xFF, 0xFF], offset)
                .await
                .expect("Failed to corrupt");
            writer.sync().await.expect("Failed to sync");

            // Get should fail with checksum mismatch
            let result = glob.get(1, offset, size).await;
            assert!(matches!(result, Err(Error::ChecksumMismatch(_, _))));

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    // Rewinding a section to the end of the third entry keeps entries 1-3
    // readable and makes later entries unreadable.
    #[test_traced]
    fn test_glob_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            // Append multiple values and track sizes
            let values: Vec<i32> = vec![1, 2, 3, 4, 5];
            let mut locations = Vec::new();

            for value in &values {
                let (offset, size) = glob.append(1, value).await.expect("Failed to append");
                locations.push((offset, size));
            }
            glob.sync(1).await.expect("Failed to sync");

            // Rewind to after the third value
            let (third_offset, third_size) = locations[2];
            let rewind_size = third_offset + u64::from(third_size);
            glob.rewind_section(1, rewind_size)
                .await
                .expect("Failed to rewind");

            // First three values should still be readable
            for (i, (offset, size)) in locations.iter().take(3).enumerate() {
                let retrieved = glob.get(1, *offset, *size).await.expect("Failed to get");
                assert_eq!(retrieved, values[i]);
            }

            // Fourth and fifth values should fail (reading past end of blob)
            let (fourth_offset, fourth_size) = locations[3];
            let result = glob.get(1, fourth_offset, fourth_size).await;
            assert!(result.is_err());

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    // Synced data must survive dropping and re-initializing the glob over the
    // same partition.
    #[test_traced]
    fn test_glob_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            // Create and populate glob
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to init glob");

            let value: i32 = 42;
            let (offset, size) = glob.append(1, &value).await.expect("Failed to append");
            glob.sync(1).await.expect("Failed to sync");
            drop(glob);

            // Reopen and verify
            let glob: Glob<_, i32> = Glob::init(context.clone(), cfg)
                .await
                .expect("Failed to reinit glob");

            let retrieved = glob.get(1, offset, size).await.expect("Failed to get");
            assert_eq!(retrieved, value);

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    // Reads with a size smaller than the checksum footer must be rejected
    // before any decode is attempted.
    #[test_traced]
    fn test_glob_get_invalid_size() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            let (offset, _size) = glob.append(1, &42).await.expect("Failed to append");
            glob.sync(1).await.expect("Failed to sync");

            // Size 0 - should fail
            assert!(glob.get(1, offset, 0).await.is_err());

            // Size < CRC_SIZE (1, 2, 3 bytes) - should fail with BlobInsufficientLength
            for size in 1..4u32 {
                let result = glob.get(1, offset, size).await;
                assert!(matches!(
                    result,
                    Err(Error::Runtime(RError::BlobInsufficientLength))
                ));
            }

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    // A size that is plausible (>= 4) but wrong shifts the checksum window, so
    // the read fails verification rather than decoding garbage.
    #[test_traced]
    fn test_glob_get_wrong_size() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            let (offset, correct_size) = glob.append(1, &42).await.expect("Failed to append");
            glob.sync(1).await.expect("Failed to sync");

            // Size too small (but >= CRC_SIZE) - checksum mismatch
            let result = glob.get(1, offset, correct_size - 1).await;
            assert!(matches!(result, Err(Error::ChecksumMismatch(_, _))));

            glob.destroy().await.expect("Failed to destroy");
        });
    }
}