1use super::manager::{Config as ManagerConfig, Manager, WriteFactory};
30use crate::journal::Error;
31use bytes::BufMut;
32use commonware_codec::{Codec, CodecShared, FixedSize};
33use commonware_cryptography::{crc32, Crc32};
34use commonware_runtime::{Blob as _, Error as RError, Metrics, Storage};
35use std::{io::Cursor, num::NonZeroUsize};
36use zstd::{bulk::compress, decode_all};
37
/// Configuration for a [`Glob`].
#[derive(Clone, Debug)]
pub struct Config<C> {
    /// Name of the storage partition handed to the underlying section [`Manager`].
    pub partition: String,

    /// Optional zstd compression level applied to every value on append.
    ///
    /// `None` stores values uncompressed. Stored entries carry no compression marker, and reads
    /// decide whether to decompress from this setting alone. A partition should therefore always
    /// be reopened with the same choice it was written with.
    pub compression: Option<u8>,

    /// Codec configuration passed to `decode_cfg` when values are read back.
    pub codec_config: C,

    /// Write-buffer capacity handed to the [`WriteFactory`] that creates section writers.
    pub write_buffer: NonZeroUsize,
}
53
54pub struct Glob<E: Storage + Metrics, V: Codec> {
60 manager: Manager<E, WriteFactory>,
61
62 compression: Option<u8>,
64
65 codec_config: V::Cfg,
67}
68
69impl<E: Storage + Metrics, V: CodecShared> Glob<E, V> {
70 pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
72 let manager_cfg = ManagerConfig {
73 partition: cfg.partition,
74 factory: WriteFactory {
75 capacity: cfg.write_buffer,
76 },
77 };
78 let manager = Manager::init(context, manager_cfg).await?;
79
80 Ok(Self {
81 manager,
82 compression: cfg.compression,
83 codec_config: cfg.codec_config,
84 })
85 }
86
87 pub async fn append(&mut self, section: u64, value: &V) -> Result<(u64, u32), Error> {
93 let buf = if let Some(level) = self.compression {
95 let encoded = value.encode();
97 let mut compressed =
98 compress(&encoded, level as i32).map_err(|_| Error::CompressionFailed)?;
99 let checksum = Crc32::checksum(&compressed);
100 compressed.put_u32(checksum);
101 compressed
102 } else {
103 let entry_size = value.encode_size() + crc32::Digest::SIZE;
105 let mut buf = Vec::with_capacity(entry_size);
106 value.write(&mut buf);
107 let checksum = Crc32::checksum(&buf);
108 buf.put_u32(checksum);
109 buf
110 };
111
112 let entry_size = u32::try_from(buf.len()).map_err(|_| Error::ValueTooLarge)?;
114 let writer = self.manager.get_or_create(section).await?;
115 let offset = writer.size().await;
116 writer.write_at(buf, offset).await.map_err(Error::Runtime)?;
117
118 Ok((offset, entry_size))
119 }
120
121 pub async fn get(&self, section: u64, offset: u64, size: u32) -> Result<V, Error> {
126 let writer = self
127 .manager
128 .get(section)?
129 .ok_or(Error::SectionOutOfRange(section))?;
130
131 let size_usize = size as usize;
132
133 let buf = writer.read_at(vec![0u8; size_usize], offset).await?;
135 let buf = buf.as_ref();
136
137 if buf.len() < crc32::Digest::SIZE {
139 return Err(Error::Runtime(RError::BlobInsufficientLength));
140 }
141
142 let data_len = buf.len() - crc32::Digest::SIZE;
143 let compressed_data = &buf[..data_len];
144 let stored_checksum =
145 u32::from_be_bytes(buf[data_len..].try_into().expect("checksum is 4 bytes"));
146
147 let checksum = Crc32::checksum(compressed_data);
149 if checksum != stored_checksum {
150 return Err(Error::ChecksumMismatch(stored_checksum, checksum));
151 }
152
153 let value = if self.compression.is_some() {
155 let decompressed =
156 decode_all(Cursor::new(compressed_data)).map_err(|_| Error::DecompressionFailed)?;
157 V::decode_cfg(decompressed.as_ref(), &self.codec_config).map_err(Error::Codec)?
158 } else {
159 V::decode_cfg(compressed_data, &self.codec_config).map_err(Error::Codec)?
160 };
161
162 Ok(value)
163 }
164
165 pub async fn sync(&self, section: u64) -> Result<(), Error> {
167 self.manager.sync(section).await
168 }
169
170 pub async fn sync_all(&self) -> Result<(), Error> {
172 self.manager.sync_all().await
173 }
174
175 pub async fn size(&self, section: u64) -> Result<u64, Error> {
177 self.manager.size(section).await
178 }
179
180 pub async fn rewind(&mut self, section: u64, size: u64) -> Result<(), Error> {
184 self.manager.rewind(section, size).await
185 }
186
187 pub async fn rewind_section(&mut self, section: u64, size: u64) -> Result<(), Error> {
191 self.manager.rewind_section(section, size).await
192 }
193
194 pub async fn prune(&mut self, min: u64) -> Result<bool, Error> {
196 self.manager.prune(min).await
197 }
198
199 pub fn oldest_section(&self) -> Option<u64> {
201 self.manager.oldest_section()
202 }
203
204 pub fn newest_section(&self) -> Option<u64> {
206 self.manager.newest_section()
207 }
208
209 pub fn sections(&self) -> impl Iterator<Item = u64> + '_ {
211 self.manager.sections()
212 }
213
214 pub async fn remove_section(&mut self, section: u64) -> Result<bool, Error> {
216 self.manager.remove_section(section).await
217 }
218
219 pub async fn close(&mut self) -> Result<(), Error> {
221 self.sync_all().await
222 }
223
224 pub async fn destroy(self) -> Result<(), Error> {
226 self.manager.destroy().await
227 }
228}
229
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Runner};
    use commonware_utils::NZUsize;

    /// Length of the CRC32 trailer appended to every entry, as a `u32` for comparing with sizes.
    const CHECKSUM_SIZE: u32 = crc32::Digest::SIZE as u32;

    /// On-disk size of an uncompressed `i32` entry: 4 encoded bytes plus the checksum trailer.
    const I32_ENTRY_SIZE: u32 = 4 + CHECKSUM_SIZE;

    /// Uncompressed configuration shared by most tests.
    fn test_cfg() -> Config<()> {
        Config {
            partition: "test_partition".to_string(),
            compression: None,
            codec_config: (),
            write_buffer: NZUsize!(1024),
        }
    }

    /// Same as `test_cfg` but with zstd compression enabled at `level`.
    fn compressed_cfg(level: u8) -> Config<()> {
        Config {
            compression: Some(level),
            ..test_cfg()
        }
    }

    #[test_traced]
    fn test_glob_append_and_get() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            let value: i32 = 42;
            let (offset, size) = glob.append(1, &value).await.expect("Failed to append");
            assert_eq!(offset, 0);
            assert_eq!(size, I32_ENTRY_SIZE);

            // Readable before sync...
            let retrieved = glob.get(1, offset, size).await.expect("Failed to get");
            assert_eq!(retrieved, value);

            // ...and after.
            glob.sync(1).await.expect("Failed to sync");
            let retrieved = glob.get(1, offset, size).await.expect("Failed to get");
            assert_eq!(retrieved, value);

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_multiple_values() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            let values: Vec<i32> = vec![1, 2, 3, 4, 5];
            let mut locations = Vec::new();
            for value in &values {
                let (offset, size) = glob.append(1, value).await.expect("Failed to append");
                locations.push((offset, size));
            }

            for (i, (offset, size)) in locations.iter().enumerate() {
                let retrieved = glob.get(1, *offset, *size).await.expect("Failed to get");
                assert_eq!(retrieved, values[i]);
            }

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_with_compression() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, [u8; 100]> = Glob::init(context.clone(), compressed_cfg(3))
                .await
                .expect("Failed to init glob");

            let value: [u8; 100] = [0u8; 100];
            let (offset, size) = glob.append(1, &value).await.expect("Failed to append");

            // 100 zero bytes compress well, so the entry must be smaller than the raw encoding.
            assert!(size < 100 + CHECKSUM_SIZE);

            let retrieved = glob.get(1, offset, size).await.expect("Failed to get");
            assert_eq!(retrieved, value);

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_prune() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            for section in 1..=5 {
                glob.append(section, &(section as i32))
                    .await
                    .expect("Failed to append");
                glob.sync(section).await.expect("Failed to sync");
            }

            glob.prune(3).await.expect("Failed to prune");

            // Sections below the prune point are no longer readable.
            assert!(glob.get(1, 0, I32_ENTRY_SIZE).await.is_err());
            assert!(glob.get(2, 0, I32_ENTRY_SIZE).await.is_err());

            // Sections at or above it survive.
            assert!(glob.manager.blobs.contains_key(&3));
            assert!(glob.manager.blobs.contains_key(&4));
            assert!(glob.manager.blobs.contains_key(&5));

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_checksum_mismatch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            let value: i32 = 42;
            let (offset, size) = glob.append(1, &value).await.expect("Failed to append");
            glob.sync(1).await.expect("Failed to sync");

            // Overwrite the encoded value in place; its stored checksum no longer matches.
            let writer = glob.manager.blobs.get(&1).unwrap();
            writer
                .write_at(vec![0xFF, 0xFF, 0xFF, 0xFF], offset)
                .await
                .expect("Failed to corrupt");
            writer.sync().await.expect("Failed to sync");

            let result = glob.get(1, offset, size).await;
            assert!(matches!(result, Err(Error::ChecksumMismatch(_, _))));

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_compressed_checksum_mismatch() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, [u8; 100]> = Glob::init(context.clone(), compressed_cfg(3))
                .await
                .expect("Failed to init glob");

            let (offset, size) = glob
                .append(1, &[7u8; 100])
                .await
                .expect("Failed to append");
            glob.sync(1).await.expect("Failed to sync");

            // Clobber the start of the compressed payload. The checksum must catch it before
            // decompression is attempted.
            let writer = glob.manager.blobs.get(&1).unwrap();
            writer
                .write_at(vec![0xFF, 0xFF, 0xFF, 0xFF], offset)
                .await
                .expect("Failed to corrupt");
            writer.sync().await.expect("Failed to sync");

            let result = glob.get(1, offset, size).await;
            assert!(matches!(result, Err(Error::ChecksumMismatch(_, _))));

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_get_missing_section() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            glob.append(1, &42).await.expect("Failed to append");

            // Section 2 was never written to.
            let result = glob.get(2, 0, I32_ENTRY_SIZE).await;
            assert!(matches!(result, Err(Error::SectionOutOfRange(2))));

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_rewind() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            let values: Vec<i32> = vec![1, 2, 3, 4, 5];
            let mut locations = Vec::new();
            for value in &values {
                let (offset, size) = glob.append(1, value).await.expect("Failed to append");
                locations.push((offset, size));
            }
            glob.sync(1).await.expect("Failed to sync");

            // Keep exactly the first three entries.
            let (third_offset, third_size) = locations[2];
            let rewind_size = third_offset + u64::from(third_size);
            glob.rewind_section(1, rewind_size)
                .await
                .expect("Failed to rewind");

            for (i, (offset, size)) in locations.iter().take(3).enumerate() {
                let retrieved = glob.get(1, *offset, *size).await.expect("Failed to get");
                assert_eq!(retrieved, values[i]);
            }

            // The fourth entry lies past the new end of the section.
            let (fourth_offset, fourth_size) = locations[3];
            let result = glob.get(1, fourth_offset, fourth_size).await;
            assert!(result.is_err());

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_persistence() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_cfg();

            let mut glob: Glob<_, i32> = Glob::init(context.clone(), cfg.clone())
                .await
                .expect("Failed to init glob");

            let value: i32 = 42;
            let (offset, size) = glob.append(1, &value).await.expect("Failed to append");
            glob.sync(1).await.expect("Failed to sync");
            drop(glob);

            // Re-open the same partition and read the synced entry back.
            let glob: Glob<_, i32> = Glob::init(context.clone(), cfg)
                .await
                .expect("Failed to reinit glob");

            let retrieved = glob.get(1, offset, size).await.expect("Failed to get");
            assert_eq!(retrieved, value);

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_get_invalid_size() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            let (offset, _size) = glob.append(1, &42).await.expect("Failed to append");
            glob.sync(1).await.expect("Failed to sync");

            assert!(glob.get(1, offset, 0).await.is_err());

            // Any size too small to hold the checksum trailer is rejected outright.
            for size in 1..CHECKSUM_SIZE {
                let result = glob.get(1, offset, size).await;
                assert!(matches!(
                    result,
                    Err(Error::Runtime(RError::BlobInsufficientLength))
                ));
            }

            glob.destroy().await.expect("Failed to destroy");
        });
    }

    #[test_traced]
    fn test_glob_get_wrong_size() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut glob: Glob<_, i32> = Glob::init(context.clone(), test_cfg())
                .await
                .expect("Failed to init glob");

            let (offset, correct_size) = glob.append(1, &42).await.expect("Failed to append");
            glob.sync(1).await.expect("Failed to sync");

            // One byte short: the "trailer" read is misaligned, so the checksum cannot match.
            let result = glob.get(1, offset, correct_size - 1).await;
            assert!(matches!(result, Err(Error::ChecksumMismatch(_, _))));

            glob.destroy().await.expect("Failed to destroy");
        });
    }
}