Skip to main content

gibblox_casync/
reader.rs

1use alloc::{boxed::Box, format, string::String, sync::Arc, vec::Vec};
2use async_trait::async_trait;
3use core::{
4    fmt,
5    sync::atomic::{AtomicU64, Ordering},
6};
7
8use gibblox_core::{
9    BlockReader, BlockReaderConfigIdentity, GibbloxError, GibbloxErrorKind, GibbloxResult,
10    ReadContext,
11};
12use sha2::{Digest, Sha256, Sha512_256};
13use tracing::{debug, trace};
14
15use crate::index::{CasyncChunkId, CasyncIndex, CasyncIndexValidation};
16
/// Configuration for [`CasyncBlockReader::open`].
#[derive(Clone, Debug)]
pub struct CasyncReaderConfig {
    /// Block size in bytes exposed through `BlockReader`; must be a non-zero
    /// power of two (rejected otherwise when the reader is opened).
    pub block_size: u32,
    /// Forwarded as `CasyncIndexValidation::strict` when parsing the index.
    /// Chunk payloads are length- and digest-checked on every read regardless
    /// of this flag.
    pub strict_verify: bool,
    /// Optional explicit identity. When set it is written verbatim as both the
    /// config identity and the reader identity, replacing the derived strings.
    pub identity: Option<String>,
}
23
24impl Default for CasyncReaderConfig {
25    fn default() -> Self {
26        Self {
27            block_size: 4096,
28            strict_verify: false,
29            identity: None,
30        }
31    }
32}
33
34impl CasyncReaderConfig {
35    pub fn with_identity(mut self, identity: impl Into<String>) -> Self {
36        self.identity = Some(identity.into());
37        self
38    }
39}
40
41impl BlockReaderConfigIdentity for CasyncReaderConfig {
42    fn write_identity(&self, out: &mut dyn fmt::Write) -> fmt::Result {
43        if let Some(identity) = self.identity.as_deref() {
44            return out.write_str(identity);
45        }
46        write!(
47            out,
48            "casync-config:block_size={}:strict_verify={}",
49            self.block_size, self.strict_verify
50        )
51    }
52}
53
/// Provides the raw casync index bytes that [`CasyncBlockReader::open`]
/// hashes and parses with `CasyncIndex::parse`.
#[async_trait]
pub trait CasyncIndexSource: Send + Sync {
    /// Returns the complete, unparsed index contents.
    async fn load_index_bytes(&self) -> GibbloxResult<Vec<u8>>;
}
58
59#[async_trait]
60impl<T> CasyncIndexSource for Arc<T>
61where
62    T: CasyncIndexSource + ?Sized,
63{
64    async fn load_index_bytes(&self) -> GibbloxResult<Vec<u8>> {
65        (**self).load_index_bytes().await
66    }
67}
68
/// Loads chunk payloads by chunk id.
///
/// The reader checks every returned payload against the index (length and
/// digest) before using it, so an implementation that caches payloads will
/// have corrupted cache entries rejected at read time.
#[async_trait]
pub trait CasyncChunkStore: Send + Sync {
    /// Returns the payload for `id`; `ctx` is forwarded unchanged from
    /// `BlockReader::read_blocks`.
    async fn load_chunk(&self, id: &CasyncChunkId, ctx: ReadContext) -> GibbloxResult<Vec<u8>>;
}
73
74#[async_trait]
75impl<T> CasyncChunkStore for Arc<T>
76where
77    T: CasyncChunkStore + ?Sized,
78{
79    async fn load_chunk(&self, id: &CasyncChunkId, ctx: ReadContext) -> GibbloxResult<Vec<u8>> {
80        (**self).load_chunk(id, ctx).await
81    }
82}
83
/// Block reader that reconstructs a blob from a casync index and a chunk
/// store, verifying every chunk payload against the index before use.
pub struct CasyncBlockReader<S> {
    /// Parsed index describing chunk ids, bounds and the blob size.
    index: CasyncIndex,
    /// Source of chunk payloads.
    chunk_store: S,
    /// Configuration captured at open time.
    config: CasyncReaderConfig,
    /// Blob size rounded up to whole blocks of `config.block_size`.
    total_blocks: u64,
    /// SHA-256 of the raw index bytes.
    index_digest: CasyncChunkId,
    /// Identity string reported by `write_identity`.
    identity: String,
    /// Number of successful payload verifications; incremented on every
    /// verification, including repeated reads of the same chunk.
    chunks_verified: AtomicU64,
}
93
94impl<S> CasyncBlockReader<S>
95where
96    S: CasyncChunkStore,
97{
98    pub async fn open<I>(
99        index_source: I,
100        chunk_store: S,
101        config: CasyncReaderConfig,
102    ) -> GibbloxResult<Self>
103    where
104        I: CasyncIndexSource,
105    {
106        validate_block_size(config.block_size)?;
107
108        let index_bytes = index_source.load_index_bytes().await?;
109        trace!(
110            index_bytes = index_bytes.len(),
111            strict_verify = config.strict_verify,
112            "loaded casync index bytes"
113        );
114
115        let index = CasyncIndex::parse(
116            &index_bytes,
117            CasyncIndexValidation {
118                strict: config.strict_verify,
119            },
120        )?;
121
122        let digest = digest_sha256(&index_bytes);
123        let index_digest = CasyncChunkId::from_bytes(digest);
124        let total_blocks = index.blob_size().div_ceil(config.block_size as u64);
125        let identity = config.identity.clone().unwrap_or_else(|| {
126            format!(
127                "casync:index={}:chunk_count={}:blob_size={}",
128                index_digest,
129                index.total_chunks(),
130                index.blob_size()
131            )
132        });
133
134        debug!(
135            index_bytes = index_bytes.len(),
136            blob_size = index.blob_size(),
137            total_chunks = index.total_chunks(),
138            total_blocks,
139            index_digest = %index_digest,
140            "casync index parsed"
141        );
142
143        Ok(Self {
144            index,
145            chunk_store,
146            config,
147            total_blocks,
148            index_digest,
149            identity,
150            chunks_verified: AtomicU64::new(0),
151        })
152    }
153
154    pub fn index(&self) -> &CasyncIndex {
155        &self.index
156    }
157
158    pub fn index_digest(&self) -> CasyncChunkId {
159        self.index_digest
160    }
161
162    pub fn chunks_verified(&self) -> u64 {
163        self.chunks_verified.load(Ordering::Relaxed)
164    }
165
166    fn blocks_from_len(&self, len: usize) -> GibbloxResult<u64> {
167        if len == 0 {
168            return Ok(0);
169        }
170        let block_size = self.config.block_size as usize;
171        if !len.is_multiple_of(block_size) {
172            return Err(GibbloxError::with_message(
173                GibbloxErrorKind::InvalidInput,
174                "buffer length must align to block size",
175            ));
176        }
177        Ok((len / block_size) as u64)
178    }
179
180    fn validate_read_range(&self, lba: u64, blocks: u64) -> GibbloxResult<()> {
181        let end = lba.checked_add(blocks).ok_or_else(|| {
182            GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "lba overflow")
183        })?;
184        if end > self.total_blocks {
185            return Err(GibbloxError::with_message(
186                GibbloxErrorKind::OutOfRange,
187                "requested range exceeds image size",
188            ));
189        }
190        Ok(())
191    }
192
193    async fn load_chunk_for_read(
194        &self,
195        chunk_idx: usize,
196        ctx: ReadContext,
197    ) -> GibbloxResult<Vec<u8>> {
198        let Some(chunk_ref) = self.index.chunks().get(chunk_idx) else {
199            return Err(GibbloxError::with_message(
200                GibbloxErrorKind::OutOfRange,
201                "chunk index out of range",
202            ));
203        };
204
205        let chunk_data = self.chunk_store.load_chunk(chunk_ref.id(), ctx).await?;
206        self.verify_chunk_payload(chunk_idx, &chunk_data)?;
207        Ok(chunk_data)
208    }
209
210    fn verify_chunk_payload(&self, chunk_idx: usize, payload: &[u8]) -> GibbloxResult<()> {
211        let Some(chunk_ref) = self.index.chunks().get(chunk_idx) else {
212            return Err(GibbloxError::with_message(
213                GibbloxErrorKind::OutOfRange,
214                "chunk index out of range",
215            ));
216        };
217
218        let expected_len = self.index.chunk_len(chunk_idx).ok_or_else(|| {
219            GibbloxError::with_message(
220                GibbloxErrorKind::OutOfRange,
221                "chunk length unavailable for index entry",
222            )
223        })?;
224        if payload.len() as u64 != expected_len {
225            return Err(GibbloxError::with_message(
226                GibbloxErrorKind::Io,
227                format!(
228                    "chunk length mismatch for {}: expected {}, got {}",
229                    chunk_ref.id(),
230                    expected_len,
231                    payload.len()
232                ),
233            ));
234        }
235
236        let actual = if self.index.uses_sha512_256() {
237            CasyncChunkId::from_bytes(digest_sha512_256(payload))
238        } else {
239            CasyncChunkId::from_bytes(digest_sha256(payload))
240        };
241
242        if actual != *chunk_ref.id() {
243            return Err(GibbloxError::with_message(
244                GibbloxErrorKind::Io,
245                format!(
246                    "chunk digest mismatch for {}: got {}",
247                    chunk_ref.id(),
248                    actual
249                ),
250            ));
251        }
252
253        let done = self.chunks_verified.fetch_add(1, Ordering::Relaxed) + 1;
254        trace!(
255            chunk_idx,
256            chunk = %chunk_ref.id(),
257            verified = done,
258            total_chunks = self.index.total_chunks(),
259            "chunk verified"
260        );
261
262        Ok(())
263    }
264}
265
266#[async_trait]
267impl<S> BlockReader for CasyncBlockReader<S>
268where
269    S: CasyncChunkStore,
270{
271    fn block_size(&self) -> u32 {
272        self.config.block_size
273    }
274
275    async fn total_blocks(&self) -> GibbloxResult<u64> {
276        Ok(self.total_blocks)
277    }
278
279    fn write_identity(&self, out: &mut dyn fmt::Write) -> fmt::Result {
280        out.write_str(&self.identity)
281    }
282
283    async fn read_blocks(
284        &self,
285        lba: u64,
286        buf: &mut [u8],
287        ctx: ReadContext,
288    ) -> GibbloxResult<usize> {
289        if buf.is_empty() {
290            return Ok(0);
291        }
292
293        let blocks = self.blocks_from_len(buf.len())?;
294        self.validate_read_range(lba, blocks)?;
295
296        let read_start = lba
297            .checked_mul(self.config.block_size as u64)
298            .ok_or_else(|| {
299                GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "read offset overflow")
300            })?;
301        if read_start >= self.index.blob_size() {
302            return Err(GibbloxError::with_message(
303                GibbloxErrorKind::OutOfRange,
304                "requested start offset is outside the image",
305            ));
306        }
307
308        let requested_end = read_start.checked_add(buf.len() as u64).ok_or_else(|| {
309            GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "read end overflow")
310        })?;
311        let read_end = requested_end.min(self.index.blob_size());
312
313        buf.fill(0);
314
315        let mut cursor = read_start;
316        let mut chunk_idx = self.index.chunk_for_offset(read_start).ok_or_else(|| {
317            GibbloxError::with_message(
318                GibbloxErrorKind::OutOfRange,
319                "could not map read offset to chunk index",
320            )
321        })?;
322
323        while cursor < read_end {
324            let (chunk_start, chunk_end) = self.index.chunk_bounds(chunk_idx).ok_or_else(|| {
325                GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "chunk bounds unavailable")
326            })?;
327
328            let chunk_data = self.load_chunk_for_read(chunk_idx, ctx).await?;
329
330            let copy_start = cursor.max(chunk_start);
331            let copy_end = read_end.min(chunk_end);
332            if copy_end <= copy_start {
333                return Err(GibbloxError::with_message(
334                    GibbloxErrorKind::Io,
335                    "chunk bounds did not advance read cursor",
336                ));
337            }
338
339            let src_start = usize::try_from(copy_start - chunk_start).map_err(|_| {
340                GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "source offset overflow")
341            })?;
342            let src_end = usize::try_from(copy_end - chunk_start).map_err(|_| {
343                GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "source end overflow")
344            })?;
345            let dst_start = usize::try_from(copy_start - read_start).map_err(|_| {
346                GibbloxError::with_message(
347                    GibbloxErrorKind::OutOfRange,
348                    "destination offset overflow",
349                )
350            })?;
351            let dst_end = usize::try_from(copy_end - read_start).map_err(|_| {
352                GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "destination end overflow")
353            })?;
354
355            buf[dst_start..dst_end].copy_from_slice(&chunk_data[src_start..src_end]);
356
357            cursor = copy_end;
358            chunk_idx += 1;
359        }
360
361        trace!(
362            lba,
363            blocks,
364            bytes = buf.len(),
365            effective_bytes = read_end - read_start,
366            "served casync read"
367        );
368
369        Ok(buf.len())
370    }
371}
372
373fn validate_block_size(block_size: u32) -> GibbloxResult<()> {
374    if block_size == 0 || !block_size.is_power_of_two() {
375        return Err(GibbloxError::with_message(
376            GibbloxErrorKind::InvalidInput,
377            "block size must be non-zero power of two",
378        ));
379    }
380    Ok(())
381}
382
383fn digest_sha256(bytes: &[u8]) -> [u8; 32] {
384    let digest = Sha256::digest(bytes);
385    let mut out = [0u8; 32];
386    out.copy_from_slice(&digest);
387    out
388}
389
390fn digest_sha512_256(bytes: &[u8]) -> [u8; 32] {
391    let digest = Sha512_256::digest(bytes);
392    let mut out = [0u8; 32];
393    out.copy_from_slice(&digest);
394    out
395}
396
#[cfg(test)]
mod tests {
    // Drives `CasyncBlockReader` through in-memory index and chunk sources,
    // using a hand-built index whose chunk ids are SHA-256 digests.
    extern crate std;

    use super::{
        CasyncBlockReader, CasyncChunkId, CasyncChunkStore, CasyncIndexSource, CasyncReaderConfig,
    };
    use alloc::{
        boxed::Box, collections::BTreeMap, format, string::String, sync::Arc, vec, vec::Vec,
    };
    use async_trait::async_trait;
    use gibblox_core::{BlockReader, GibbloxError, GibbloxErrorKind, GibbloxResult, ReadContext};
    use sha2::{Digest, Sha256};
    use std::sync::{
        Mutex,
        atomic::{AtomicUsize, Ordering},
    };

    const CA_FORMAT_INDEX: u64 = 0x9682_4d9c_7b12_9ff9;
    const CA_FORMAT_TABLE: u64 = 0xe75b_9e11_2f17_417d;
    const CA_FORMAT_TABLE_TAIL_MARKER: u64 = 0x4b4f_050e_5549_ecd1;

    const INDEX_HEADER_SIZE: usize = 48;
    const TABLE_HEADER_SIZE: usize = 16;
    const TABLE_ITEM_SIZE: usize = 40;
    const TABLE_TAIL_SIZE: usize = 40;

    /// Index source that returns a fixed byte buffer.
    struct StaticIndexSource {
        bytes: Vec<u8>,
    }

    #[async_trait]
    impl CasyncIndexSource for StaticIndexSource {
        async fn load_index_bytes(&self) -> GibbloxResult<Vec<u8>> {
            Ok(self.bytes.clone())
        }
    }

    /// Chunk store that serves from `source` and caches every fetched chunk.
    ///
    /// When `offline` is set only cached chunks can be served. `fetch_calls`
    /// counts loads that had to go to `source`.
    struct MemoryChunkStore {
        source: BTreeMap<CasyncChunkId, Vec<u8>>,
        cache: Mutex<BTreeMap<CasyncChunkId, Vec<u8>>>,
        offline: bool,
        fetch_calls: AtomicUsize,
    }

    impl MemoryChunkStore {
        fn new(source: BTreeMap<CasyncChunkId, Vec<u8>>, offline: bool) -> Self {
            Self::with_seed(source, BTreeMap::new(), offline)
        }

        fn with_seed(
            source: BTreeMap<CasyncChunkId, Vec<u8>>,
            seeded: BTreeMap<CasyncChunkId, Vec<u8>>,
            offline: bool,
        ) -> Self {
            Self {
                source,
                cache: Mutex::new(seeded),
                offline,
                fetch_calls: AtomicUsize::new(0),
            }
        }

        fn fetch_calls(&self) -> usize {
            self.fetch_calls.load(Ordering::SeqCst)
        }
    }

    #[async_trait]
    impl CasyncChunkStore for MemoryChunkStore {
        async fn load_chunk(
            &self,
            id: &CasyncChunkId,
            _ctx: ReadContext,
        ) -> GibbloxResult<Vec<u8>> {
            if let Some(hit) = self
                .cache
                .lock()
                .map_err(|_| GibbloxError::with_message(GibbloxErrorKind::Io, "cache poisoned"))?
                .get(id)
                .cloned()
            {
                return Ok(hit);
            }

            if self.offline {
                return Err(GibbloxError::with_message(
                    GibbloxErrorKind::Io,
                    format!("offline and chunk not cached: {id}"),
                ));
            }

            let Some(payload) = self.source.get(id).cloned() else {
                return Err(GibbloxError::with_message(
                    GibbloxErrorKind::Io,
                    format!("missing source chunk: {id}"),
                ));
            };

            self.fetch_calls.fetch_add(1, Ordering::SeqCst);
            self.cache
                .lock()
                .map_err(|_| GibbloxError::with_message(GibbloxErrorKind::Io, "cache poisoned"))?
                .insert(*id, payload.clone());
            Ok(payload)
        }
    }

    /// Strict-verification config with the given block size and no identity override.
    fn strict_config(block_size: u32) -> CasyncReaderConfig {
        CasyncReaderConfig {
            block_size,
            strict_verify: true,
            identity: None,
        }
    }

    /// Opens a strict reader over `chunks` backed by an online in-memory store.
    async fn open_reader(
        chunks: &[Vec<u8>],
        block_size: u32,
    ) -> CasyncBlockReader<MemoryChunkStore> {
        let (index_bytes, chunk_map, _) = build_index_and_chunks(chunks);
        CasyncBlockReader::open(
            StaticIndexSource { bytes: index_bytes },
            MemoryChunkStore::new(chunk_map, false),
            strict_config(block_size),
        )
        .await
        .expect("open reader")
    }

    #[tokio::test]
    async fn reconstructs_bytes_and_zero_pads_final_block() {
        let chunks = vec![b"abcd".to_vec(), b"ef".to_vec()];
        let (index_bytes, chunk_map, _) = build_index_and_chunks(&chunks);

        let reader = CasyncBlockReader::open(
            StaticIndexSource { bytes: index_bytes },
            MemoryChunkStore::new(chunk_map, false),
            strict_config(4),
        )
        .await
        .expect("open reader");

        assert_eq!(reader.total_blocks().await.expect("total blocks"), 2);

        let mut buf = vec![0u8; 8];
        reader
            .read_blocks(0, &mut buf, ReadContext::FOREGROUND)
            .await
            .expect("read blocks");
        assert_eq!(&buf, b"abcdef\0\0");
    }

    #[tokio::test]
    async fn reads_blocks_that_start_inside_a_chunk() {
        // Chunk boundaries (3 and 8) deliberately do not line up with 4-byte blocks.
        let chunks = vec![b"abc".to_vec(), b"defgh".to_vec()];
        let reader = open_reader(&chunks, 4).await;

        assert_eq!(reader.total_blocks().await.expect("total blocks"), 2);

        let mut second = vec![0u8; 4];
        let read = reader
            .read_blocks(1, &mut second, ReadContext::FOREGROUND)
            .await
            .expect("read second block");
        assert_eq!(read, 4);
        assert_eq!(&second, b"efgh");

        let mut first = vec![0u8; 4];
        reader
            .read_blocks(0, &mut first, ReadContext::FOREGROUND)
            .await
            .expect("read first block");
        assert_eq!(&first, b"abcd");
    }

    #[tokio::test]
    async fn rejects_misaligned_and_out_of_range_reads() {
        let chunks = vec![b"abcd".to_vec(), b"ef".to_vec()];
        let reader = open_reader(&chunks, 4).await;

        let mut empty: [u8; 0] = [];
        let read = reader
            .read_blocks(0, &mut empty, ReadContext::FOREGROUND)
            .await
            .expect("empty read");
        assert_eq!(read, 0);

        let mut misaligned = vec![0u8; 3];
        let err = reader
            .read_blocks(0, &mut misaligned, ReadContext::FOREGROUND)
            .await
            .expect_err("misaligned buffer should fail");
        assert_eq!(err.kind(), GibbloxErrorKind::InvalidInput);

        let mut one_block = vec![0u8; 4];
        let err = reader
            .read_blocks(2, &mut one_block, ReadContext::FOREGROUND)
            .await
            .expect_err("read past the end should fail");
        assert_eq!(err.kind(), GibbloxErrorKind::OutOfRange);

        let mut two_blocks = vec![0u8; 8];
        let err = reader
            .read_blocks(1, &mut two_blocks, ReadContext::FOREGROUND)
            .await
            .expect_err("read straddling the end should fail");
        assert_eq!(err.kind(), GibbloxErrorKind::OutOfRange);

        let err = reader
            .read_blocks(u64::MAX, &mut one_block, ReadContext::FOREGROUND)
            .await
            .expect_err("lba overflow should fail");
        assert_eq!(err.kind(), GibbloxErrorKind::OutOfRange);
    }

    #[tokio::test]
    async fn reports_digest_identity_and_verification_count() {
        let chunks = vec![b"abcd".to_vec(), b"ef".to_vec()];
        let (index_bytes, chunk_map, _) = build_index_and_chunks(&chunks);
        let expected_digest = CasyncChunkId::from_bytes(sha256_bytes(&index_bytes));

        let reader = CasyncBlockReader::open(
            StaticIndexSource { bytes: index_bytes },
            MemoryChunkStore::new(chunk_map, false),
            strict_config(4),
        )
        .await
        .expect("open reader");

        assert!(reader.index_digest() == expected_digest);

        let mut identity = String::new();
        reader
            .write_identity(&mut identity)
            .expect("write identity");
        assert!(identity.starts_with("casync:index="));
        assert!(identity.ends_with(":chunk_count=2:blob_size=6"));

        assert_eq!(reader.chunks_verified(), 0);
        let mut buf = vec![0u8; 8];
        reader
            .read_blocks(0, &mut buf, ReadContext::FOREGROUND)
            .await
            .expect("read blocks");
        assert_eq!(reader.chunks_verified(), 2);
    }

    #[tokio::test]
    async fn explicit_identity_overrides_derived_identity() {
        let (index_bytes, chunk_map, _) = build_index_and_chunks(&[b"abcd".to_vec()]);

        let reader = CasyncBlockReader::open(
            StaticIndexSource { bytes: index_bytes },
            MemoryChunkStore::new(chunk_map, false),
            strict_config(4).with_identity("custom-identity"),
        )
        .await
        .expect("open reader");

        let mut identity = String::new();
        reader
            .write_identity(&mut identity)
            .expect("write identity");
        assert_eq!(identity, "custom-identity");
    }

    #[tokio::test]
    async fn chunk_store_can_cache_internally() {
        let chunks = vec![b"aaaa".to_vec(), b"bbbb".to_vec()];
        let (index_bytes, chunk_map, _) = build_index_and_chunks(&chunks);

        let chunk_store = Arc::new(MemoryChunkStore::new(chunk_map, false));
        let reader = CasyncBlockReader::open(
            StaticIndexSource { bytes: index_bytes },
            Arc::clone(&chunk_store),
            strict_config(4),
        )
        .await
        .expect("open reader");

        let mut first = vec![0u8; 8];
        reader
            .read_blocks(0, &mut first, ReadContext::FOREGROUND)
            .await
            .expect("first read");

        let mut second = vec![0u8; 8];
        reader
            .read_blocks(0, &mut second, ReadContext::FOREGROUND)
            .await
            .expect("second read");

        assert_eq!(&first, b"aaaabbbb");
        assert_eq!(&second, b"aaaabbbb");
        assert_eq!(chunk_store.fetch_calls(), 2);
    }

    #[tokio::test]
    async fn offline_mode_errors_on_missing_chunk() {
        let chunks = vec![b"abcd".to_vec()];
        let (index_bytes, chunk_map, _) = build_index_and_chunks(&chunks);

        let reader = CasyncBlockReader::open(
            StaticIndexSource { bytes: index_bytes },
            MemoryChunkStore::new(chunk_map, true),
            strict_config(4),
        )
        .await
        .expect("open reader");

        let mut buf = vec![0u8; 4];
        let err = reader
            .read_blocks(0, &mut buf, ReadContext::FOREGROUND)
            .await
            .expect_err("offline miss should fail");
        assert_eq!(err.kind(), GibbloxErrorKind::Io);
    }

    #[tokio::test]
    async fn corrupt_cached_chunk_fails_verification() {
        let chunks = vec![b"abcd".to_vec()];
        let (index_bytes, chunk_map, chunk_ids) = build_index_and_chunks(&chunks);
        let mut seeded = BTreeMap::new();
        seeded.insert(chunk_ids[0], b"zzzz".to_vec());

        let reader = CasyncBlockReader::open(
            StaticIndexSource { bytes: index_bytes },
            MemoryChunkStore::with_seed(chunk_map, seeded, false),
            strict_config(4),
        )
        .await
        .expect("open reader");

        let mut buf = vec![0u8; 4];
        let err = reader
            .read_blocks(0, &mut buf, ReadContext::FOREGROUND)
            .await
            .expect_err("corrupt cached payload should fail");
        assert_eq!(err.kind(), GibbloxErrorKind::Io);
        assert_eq!(reader.chunks_verified(), 0);
    }

    /// SHA-256 of `bytes` as a fixed 32-byte array.
    fn sha256_bytes(bytes: &[u8]) -> [u8; 32] {
        let mut out = [0u8; 32];
        out.copy_from_slice(&Sha256::digest(bytes));
        out
    }

    /// Serializes a minimal index for `chunks` (header, table items, table
    /// tail) and returns it with an id → payload map and the ids in order.
    fn build_index_and_chunks(
        chunks: &[Vec<u8>],
    ) -> (
        Vec<u8>,
        BTreeMap<CasyncChunkId, Vec<u8>>,
        Vec<CasyncChunkId>,
    ) {
        let mut out = Vec::new();
        let mut chunk_map = BTreeMap::new();
        let mut chunk_ids = Vec::new();

        out.extend_from_slice(&(INDEX_HEADER_SIZE as u64).to_le_bytes());
        out.extend_from_slice(&CA_FORMAT_INDEX.to_le_bytes());
        out.extend_from_slice(&0u64.to_le_bytes());
        out.extend_from_slice(&1u64.to_le_bytes());
        out.extend_from_slice(&4096u64.to_le_bytes());
        out.extend_from_slice(&(128 * 1024 * 1024u64).to_le_bytes());

        out.extend_from_slice(&u64::MAX.to_le_bytes());
        out.extend_from_slice(&CA_FORMAT_TABLE.to_le_bytes());

        let mut end = 0u64;
        for chunk in chunks {
            let digest_arr = sha256_bytes(chunk);
            let chunk_id = CasyncChunkId::from_bytes(digest_arr);

            // Each table item records the chunk's end offset, then its digest.
            end += chunk.len() as u64;
            out.extend_from_slice(&end.to_le_bytes());
            out.extend_from_slice(&digest_arr);

            chunk_map.insert(chunk_id, chunk.clone());
            chunk_ids.push(chunk_id);
        }

        let table_size =
            (TABLE_HEADER_SIZE + (chunks.len() * TABLE_ITEM_SIZE) + TABLE_TAIL_SIZE) as u64;
        out.extend_from_slice(&0u64.to_le_bytes());
        out.extend_from_slice(&0u64.to_le_bytes());
        out.extend_from_slice(&(INDEX_HEADER_SIZE as u64).to_le_bytes());
        out.extend_from_slice(&table_size.to_le_bytes());
        out.extend_from_slice(&CA_FORMAT_TABLE_TAIL_MARKER.to_le_bytes());

        (out, chunk_map, chunk_ids)
    }
}