1use super::manager::{Config as ManagerConfig, Manager, WriteFactory};
30use crate::journal::Error;
31use commonware_codec::{Codec, CodecShared, FixedSize};
32use commonware_cryptography::{crc32, Crc32};
33use commonware_runtime::{Blob as _, BufMut, Error as RError, IoBufMut, Metrics, Storage};
34use std::{io::Cursor, num::NonZeroUsize};
35use zstd::{bulk::compress, decode_all};
36
/// Configuration for a [`Glob`].
#[derive(Clone, Debug)]
pub struct Config<C> {
    /// Storage partition that holds the glob's section blobs (handed to the section manager).
    pub partition: String,

    /// zstd compression level applied to every entry, or `None` to store encodings as-is.
    ///
    /// Entries do not record whether they were compressed, so a glob must be reopened
    /// with the same setting it was written with.
    pub compression: Option<u8>,

    /// Codec configuration used when decoding stored values.
    pub codec_config: C,

    /// Write-buffer capacity for each section's writer.
    pub write_buffer: NonZeroUsize,
}
52
53pub struct Glob<E: Storage + Metrics, V: Codec> {
59 manager: Manager<E, WriteFactory>,
60
61 compression: Option<u8>,
63
64 codec_config: V::Cfg,
66}
67
68impl<E: Storage + Metrics, V: CodecShared> Glob<E, V> {
69 pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
71 let manager_cfg = ManagerConfig {
72 partition: cfg.partition,
73 factory: WriteFactory {
74 capacity: cfg.write_buffer,
75 },
76 };
77 let manager = Manager::init(context, manager_cfg).await?;
78
79 Ok(Self {
80 manager,
81 compression: cfg.compression,
82 codec_config: cfg.codec_config,
83 })
84 }
85
86 pub async fn append(&mut self, section: u64, value: &V) -> Result<(u64, u32), Error> {
92 let buf = if let Some(level) = self.compression {
94 let encoded = value.encode();
96 let mut compressed =
97 compress(&encoded, level as i32).map_err(|_| Error::CompressionFailed)?;
98 let checksum = Crc32::checksum(&compressed);
99 compressed.put_u32(checksum);
100 compressed
101 } else {
102 let entry_size = value.encode_size() + crc32::Digest::SIZE;
104 let mut buf = Vec::with_capacity(entry_size);
105 value.write(&mut buf);
106 let checksum = Crc32::checksum(&buf);
107 buf.put_u32(checksum);
108 buf
109 };
110
111 let entry_size = u32::try_from(buf.len()).map_err(|_| Error::ValueTooLarge)?;
113 let writer = self.manager.get_or_create(section).await?;
114 let offset = writer.size().await;
115 writer.write_at(offset, buf).await.map_err(Error::Runtime)?;
116
117 Ok((offset, entry_size))
118 }
119
120 pub async fn get(&self, section: u64, offset: u64, size: u32) -> Result<V, Error> {
125 let writer = self
126 .manager
127 .get(section)?
128 .ok_or(Error::SectionOutOfRange(section))?;
129
130 let buf = writer
132 .read_at(offset, IoBufMut::zeroed(size as usize))
133 .await?
134 .coalesce();
135
136 if buf.len() < crc32::Digest::SIZE {
138 return Err(Error::Runtime(RError::BlobInsufficientLength));
139 }
140
141 let data_len = buf.len() - crc32::Digest::SIZE;
142 let compressed_data = &buf.as_ref()[..data_len];
143 let stored_checksum = u32::from_be_bytes(
144 buf.as_ref()[data_len..]
145 .try_into()
146 .expect("checksum is 4 bytes"),
147 );
148
149 let checksum = Crc32::checksum(compressed_data);
151 if checksum != stored_checksum {
152 return Err(Error::ChecksumMismatch(stored_checksum, checksum));
153 }
154
155 let value = if self.compression.is_some() {
157 let decompressed =
158 decode_all(Cursor::new(compressed_data)).map_err(|_| Error::DecompressionFailed)?;
159 V::decode_cfg(decompressed.as_ref(), &self.codec_config).map_err(Error::Codec)?
160 } else {
161 V::decode_cfg(compressed_data, &self.codec_config).map_err(Error::Codec)?
162 };
163
164 Ok(value)
165 }
166
167 pub async fn sync(&self, section: u64) -> Result<(), Error> {
169 self.manager.sync(section).await
170 }
171
172 pub async fn sync_all(&self) -> Result<(), Error> {
174 self.manager.sync_all().await
175 }
176
177 pub async fn size(&self, section: u64) -> Result<u64, Error> {
179 self.manager.size(section).await
180 }
181
182 pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
186 self.manager.rewind(section, size).await
187 }
188
189 pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
193 self.manager.rewind_section(section, size).await
194 }
195
196 pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
198 self.manager.prune(min).await
199 }
200
201 pub fn oldest_section(&self) -> Option<u64> {
203 self.manager.oldest_section()
204 }
205
206 pub fn newest_section(&self) -> Option<u64> {
208 self.manager.newest_section()
209 }
210
211 pub fn sections(&self) -> impl Iterator<Item = u64> + '_ {
213 self.manager.sections()
214 }
215
216 pub async fn remove_section(&mut self, section: u64) -> Result<bool, Error> {
218 self.manager.remove_section(section).await
219 }
220
221 pub async fn close(&mut self) -> Result<(), Error> {
223 self.sync_all().await
224 }
225
226 pub async fn destroy(self) -> Result<(), Error> {
228 self.manager.destroy().await
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Metrics, Runner};
    use commonware_utils::NZUsize;

    /// Uncompressed configuration shared by most tests.
    fn test_cfg() -> Config<()> {
        Config {
            partition: "test_partition".to_string(),
            compression: None,
            codec_config: (),
            write_buffer: NZUsize!(1024),
        }
    }

    #[test_traced]
    fn test_glob_append_and_get() {
        deterministic::Runner::default().start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            let expected: i32 = 42;
            let (offset, size) = glob.append(1, &expected).await.expect("Failed to append");
            // The first entry of a fresh section starts at the beginning.
            assert_eq!(offset, 0);

            // Readable straight from the write buffer...
            let before_sync = glob.get(1, offset, size).await.expect("Failed to get");
            assert_eq!(before_sync, expected);

            // ...and still readable once synced.
            glob.sync(1).await.expect("Failed to sync");
            let after_sync = glob.get(1, offset, size).await.expect("Failed to get");
            assert_eq!(after_sync, expected);

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_multiple_values() {
        deterministic::Runner::default().start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            let values = [1i32, 2, 3, 4, 5];
            let mut locations = Vec::with_capacity(values.len());
            for value in &values {
                locations.push(glob.append(1, value).await.expect("Failed to append"));
            }

            for (value, &(offset, size)) in values.iter().zip(&locations) {
                let retrieved = glob.get(1, offset, size).await.expect("Failed to get");
                assert_eq!(retrieved, *value);
            }

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_with_compression() {
        deterministic::Runner::default().start(|context| async move {
            let cfg = Config {
                compression: Some(3),
                ..test_cfg()
            };
            let mut glob: Glob<_, [u8; 100]> = Glob::init(context.clone(), cfg)
                .await
                .expect("Failed to init glob");

            let zeros = [0u8; 100];
            let (offset, size) = glob.append(1, &zeros).await.expect("Failed to append");

            // An all-zero payload must shrink below its raw length plus the checksum.
            assert!(size < 100 + 4);

            let retrieved = glob.get(1, offset, size).await.expect("Failed to get");
            assert_eq!(retrieved, zeros);

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_prune() {
        deterministic::Runner::default().start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            for section in 1..=5u64 {
                glob.append(section, &(section as i32))
                    .await
                    .expect("Failed to append");
                glob.sync(section).await.expect("Failed to sync");
            }

            glob.prune(3).await.expect("Failed to prune");

            // Sections below the prune point can no longer be read...
            for pruned in [1u64, 2] {
                assert!(glob.get(pruned, 0, 8).await.is_err());
            }

            // ...while the remaining sections keep their blobs.
            for kept in 3..=5u64 {
                assert!(glob.manager.blobs.contains_key(&kept));
            }

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_checksum_mismatch() {
        deterministic::Runner::default().start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            let (offset, size) = glob.append(1, &42i32).await.expect("Failed to append");
            glob.sync(1).await.expect("Failed to sync");

            // Overwrite the encoded value in place so the stored CRC no longer matches.
            let blob = glob.manager.blobs.get(&1).unwrap();
            blob.write_at(offset, vec![0xFF_u8; 4])
                .await
                .expect("Failed to corrupt");
            blob.sync().await.expect("Failed to sync");

            let outcome = glob.get(1, offset, size).await;
            assert!(matches!(outcome, Err(Error::ChecksumMismatch(_, _))));

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_rewind() {
        deterministic::Runner::default().start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            let values = [1i32, 2, 3, 4, 5];
            let mut locations = Vec::with_capacity(values.len());
            for value in &values {
                locations.push(glob.append(1, value).await.expect("Failed to append"));
            }
            glob.sync(1).await.expect("Failed to sync");

            // Truncate the section immediately after the third entry.
            let (third_offset, third_size) = locations[2];
            glob.rewind_section(1, third_offset + u64::from(third_size))
                .await
                .expect("Failed to rewind");

            // The surviving entries are intact.
            for (value, &(offset, size)) in values.iter().zip(&locations).take(3) {
                let retrieved = glob.get(1, offset, size).await.expect("Failed to get");
                assert_eq!(retrieved, *value);
            }

            // The first truncated entry is gone.
            let (fourth_offset, fourth_size) = locations[3];
            assert!(glob.get(1, fourth_offset, fourth_size).await.is_err());

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_persistence() {
        deterministic::Runner::default().start(|context| async move {
            let cfg = test_cfg();
            let value: i32 = 42;

            // Write and sync with one instance, which is dropped at the end of the scope.
            let (offset, size) = {
                let mut glob: Glob<_, i32> =
                    Glob::init(context.with_label("first"), cfg.clone())
                        .await
                        .expect("Failed to init glob");
                let location = glob.append(1, &value).await.expect("Failed to append");
                glob.sync(1).await.expect("Failed to sync");
                location
            };

            // A fresh instance over the same partition sees the synced entry.
            let reopened: Glob<_, i32> = Glob::init(context.with_label("second"), cfg)
                .await
                .expect("Failed to reinit glob");

            let retrieved = reopened.get(1, offset, size).await.expect("Failed to get");
            assert_eq!(retrieved, value);

            reopened.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_get_invalid_size() {
        deterministic::Runner::default().start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            let (offset, _) = glob.append(1, &42).await.expect("Failed to append");
            glob.sync(1).await.expect("Failed to sync");

            // A zero-length read cannot yield a value.
            assert!(glob.get(1, offset, 0).await.is_err());

            // Ranges shorter than the 4-byte checksum are rejected as too short.
            for size in 1..4u32 {
                let outcome = glob.get(1, offset, size).await;
                assert!(matches!(
                    outcome,
                    Err(Error::Runtime(RError::BlobInsufficientLength))
                ));
            }

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_get_wrong_size() {
        deterministic::Runner::default().start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            let (offset, correct_size) = glob.append(1, &42).await.expect("Failed to append");
            glob.sync(1).await.expect("Failed to sync");

            // One byte short shifts the checksum window, so the CRC cannot match.
            let outcome = glob.get(1, offset, correct_size - 1).await;
            assert!(matches!(outcome, Err(Error::ChecksumMismatch(_, _))));

            glob.destroy().await.expect("Failed to destroy");
        });
    }
}