1use alloc::{boxed::Box, format, string::String, sync::Arc, vec::Vec};
2use async_trait::async_trait;
3use core::{
4 fmt,
5 sync::atomic::{AtomicU64, Ordering},
6};
7
8use gibblox_core::{
9 BlockReader, BlockReaderConfigIdentity, GibbloxError, GibbloxErrorKind, GibbloxResult,
10 ReadContext,
11};
12use sha2::{Digest, Sha256, Sha512_256};
13use tracing::{debug, trace};
14
15use crate::index::{CasyncChunkId, CasyncIndex, CasyncIndexValidation};
16
/// Configuration for opening a `CasyncBlockReader`.
#[derive(Clone, Debug)]
pub struct CasyncReaderConfig {
    /// Size in bytes of the blocks exposed to callers. Must be a non-zero
    /// power of two; this is checked when the reader is opened.
    pub block_size: u32,
    /// Forwarded as the `strict` flag of the index validation when the index
    /// is parsed. Chunk payloads are length- and digest-checked regardless.
    pub strict_verify: bool,
    /// Explicit identity string. When `None`, an identity is derived: from the
    /// configuration fields for the config itself, or from the index digest,
    /// chunk count and blob size for an opened reader.
    pub identity: Option<String>,
}

impl Default for CasyncReaderConfig {
    /// 4096-byte blocks, non-strict index validation, no explicit identity.
    fn default() -> Self {
        Self {
            block_size: 4096,
            strict_verify: false,
            identity: None,
        }
    }
}

impl CasyncReaderConfig {
    /// Returns this configuration with `identity` as its explicit identity.
    ///
    /// This consumes `self`, so discarding the return value discards the
    /// configuration; hence `#[must_use]`.
    #[must_use]
    pub fn with_identity(mut self, identity: impl Into<String>) -> Self {
        self.identity = Some(identity.into());
        self
    }
}
40
41impl BlockReaderConfigIdentity for CasyncReaderConfig {
42 fn write_identity(&self, out: &mut dyn fmt::Write) -> fmt::Result {
43 if let Some(identity) = self.identity.as_deref() {
44 return out.write_str(identity);
45 }
46 write!(
47 out,
48 "casync-config:block_size={}:strict_verify={}",
49 self.block_size, self.strict_verify
50 )
51 }
52}
53
/// Provider of the raw, still-serialized casync index bytes.
///
/// `CasyncBlockReader::open` calls `load_index_bytes` once, then parses the
/// result and hashes it to derive the reader's index digest.
#[async_trait]
pub trait CasyncIndexSource: Send + Sync {
    /// Loads the complete serialized index.
    async fn load_index_bytes(&self) -> GibbloxResult<Vec<u8>>;
}
58
59#[async_trait]
60impl<T> CasyncIndexSource for Arc<T>
61where
62 T: CasyncIndexSource + ?Sized,
63{
64 async fn load_index_bytes(&self) -> GibbloxResult<Vec<u8>> {
65 (**self).load_index_bytes().await
66 }
67}
68
/// Backend that serves chunk payloads by chunk id.
///
/// `CasyncBlockReader` checks every returned payload against the index entry
/// (expected length and digest) before using it.
#[async_trait]
pub trait CasyncChunkStore: Send + Sync {
    /// Loads the payload of chunk `id`. `ctx` is the context passed to the
    /// `read_blocks` call that triggered the load.
    async fn load_chunk(&self, id: &CasyncChunkId, ctx: ReadContext) -> GibbloxResult<Vec<u8>>;
}
73
74#[async_trait]
75impl<T> CasyncChunkStore for Arc<T>
76where
77 T: CasyncChunkStore + ?Sized,
78{
79 async fn load_chunk(&self, id: &CasyncChunkId, ctx: ReadContext) -> GibbloxResult<Vec<u8>> {
80 (**self).load_chunk(id, ctx).await
81 }
82}
83
/// Block reader that reconstructs a blob from a parsed casync index and a
/// chunk store. Every fetched chunk is verified against the index before use.
pub struct CasyncBlockReader<S> {
    /// Parsed index: chunk ids, offsets/bounds and total blob size.
    index: CasyncIndex,
    /// Backend that serves chunk payloads by id.
    chunk_store: S,
    /// Configuration supplied to `open`.
    config: CasyncReaderConfig,
    /// `blob_size` divided by `block_size`, rounded up; the last block may be
    /// only partially backed by data and is zero-padded on read.
    total_blocks: u64,
    /// SHA-256 of the raw index bytes.
    index_digest: CasyncChunkId,
    /// Identity reported through `write_identity` (explicit or derived).
    identity: String,
    /// Number of chunk payloads that passed verification so far.
    chunks_verified: AtomicU64,
}
93
94impl<S> CasyncBlockReader<S>
95where
96 S: CasyncChunkStore,
97{
98 pub async fn open<I>(
99 index_source: I,
100 chunk_store: S,
101 config: CasyncReaderConfig,
102 ) -> GibbloxResult<Self>
103 where
104 I: CasyncIndexSource,
105 {
106 validate_block_size(config.block_size)?;
107
108 let index_bytes = index_source.load_index_bytes().await?;
109 trace!(
110 index_bytes = index_bytes.len(),
111 strict_verify = config.strict_verify,
112 "loaded casync index bytes"
113 );
114
115 let index = CasyncIndex::parse(
116 &index_bytes,
117 CasyncIndexValidation {
118 strict: config.strict_verify,
119 },
120 )?;
121
122 let digest = digest_sha256(&index_bytes);
123 let index_digest = CasyncChunkId::from_bytes(digest);
124 let total_blocks = index.blob_size().div_ceil(config.block_size as u64);
125 let identity = config.identity.clone().unwrap_or_else(|| {
126 format!(
127 "casync:index={}:chunk_count={}:blob_size={}",
128 index_digest,
129 index.total_chunks(),
130 index.blob_size()
131 )
132 });
133
134 debug!(
135 index_bytes = index_bytes.len(),
136 blob_size = index.blob_size(),
137 total_chunks = index.total_chunks(),
138 total_blocks,
139 index_digest = %index_digest,
140 "casync index parsed"
141 );
142
143 Ok(Self {
144 index,
145 chunk_store,
146 config,
147 total_blocks,
148 index_digest,
149 identity,
150 chunks_verified: AtomicU64::new(0),
151 })
152 }
153
154 pub fn index(&self) -> &CasyncIndex {
155 &self.index
156 }
157
158 pub fn index_digest(&self) -> CasyncChunkId {
159 self.index_digest
160 }
161
162 pub fn chunks_verified(&self) -> u64 {
163 self.chunks_verified.load(Ordering::Relaxed)
164 }
165
166 fn blocks_from_len(&self, len: usize) -> GibbloxResult<u64> {
167 if len == 0 {
168 return Ok(0);
169 }
170 let block_size = self.config.block_size as usize;
171 if !len.is_multiple_of(block_size) {
172 return Err(GibbloxError::with_message(
173 GibbloxErrorKind::InvalidInput,
174 "buffer length must align to block size",
175 ));
176 }
177 Ok((len / block_size) as u64)
178 }
179
180 fn validate_read_range(&self, lba: u64, blocks: u64) -> GibbloxResult<()> {
181 let end = lba.checked_add(blocks).ok_or_else(|| {
182 GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "lba overflow")
183 })?;
184 if end > self.total_blocks {
185 return Err(GibbloxError::with_message(
186 GibbloxErrorKind::OutOfRange,
187 "requested range exceeds image size",
188 ));
189 }
190 Ok(())
191 }
192
193 async fn load_chunk_for_read(
194 &self,
195 chunk_idx: usize,
196 ctx: ReadContext,
197 ) -> GibbloxResult<Vec<u8>> {
198 let Some(chunk_ref) = self.index.chunks().get(chunk_idx) else {
199 return Err(GibbloxError::with_message(
200 GibbloxErrorKind::OutOfRange,
201 "chunk index out of range",
202 ));
203 };
204
205 let chunk_data = self.chunk_store.load_chunk(chunk_ref.id(), ctx).await?;
206 self.verify_chunk_payload(chunk_idx, &chunk_data)?;
207 Ok(chunk_data)
208 }
209
210 fn verify_chunk_payload(&self, chunk_idx: usize, payload: &[u8]) -> GibbloxResult<()> {
211 let Some(chunk_ref) = self.index.chunks().get(chunk_idx) else {
212 return Err(GibbloxError::with_message(
213 GibbloxErrorKind::OutOfRange,
214 "chunk index out of range",
215 ));
216 };
217
218 let expected_len = self.index.chunk_len(chunk_idx).ok_or_else(|| {
219 GibbloxError::with_message(
220 GibbloxErrorKind::OutOfRange,
221 "chunk length unavailable for index entry",
222 )
223 })?;
224 if payload.len() as u64 != expected_len {
225 return Err(GibbloxError::with_message(
226 GibbloxErrorKind::Io,
227 format!(
228 "chunk length mismatch for {}: expected {}, got {}",
229 chunk_ref.id(),
230 expected_len,
231 payload.len()
232 ),
233 ));
234 }
235
236 let actual = if self.index.uses_sha512_256() {
237 CasyncChunkId::from_bytes(digest_sha512_256(payload))
238 } else {
239 CasyncChunkId::from_bytes(digest_sha256(payload))
240 };
241
242 if actual != *chunk_ref.id() {
243 return Err(GibbloxError::with_message(
244 GibbloxErrorKind::Io,
245 format!(
246 "chunk digest mismatch for {}: got {}",
247 chunk_ref.id(),
248 actual
249 ),
250 ));
251 }
252
253 let done = self.chunks_verified.fetch_add(1, Ordering::Relaxed) + 1;
254 trace!(
255 chunk_idx,
256 chunk = %chunk_ref.id(),
257 verified = done,
258 total_chunks = self.index.total_chunks(),
259 "chunk verified"
260 );
261
262 Ok(())
263 }
264}
265
266#[async_trait]
267impl<S> BlockReader for CasyncBlockReader<S>
268where
269 S: CasyncChunkStore,
270{
271 fn block_size(&self) -> u32 {
272 self.config.block_size
273 }
274
275 async fn total_blocks(&self) -> GibbloxResult<u64> {
276 Ok(self.total_blocks)
277 }
278
279 fn write_identity(&self, out: &mut dyn fmt::Write) -> fmt::Result {
280 out.write_str(&self.identity)
281 }
282
283 async fn read_blocks(
284 &self,
285 lba: u64,
286 buf: &mut [u8],
287 ctx: ReadContext,
288 ) -> GibbloxResult<usize> {
289 if buf.is_empty() {
290 return Ok(0);
291 }
292
293 let blocks = self.blocks_from_len(buf.len())?;
294 self.validate_read_range(lba, blocks)?;
295
296 let read_start = lba
297 .checked_mul(self.config.block_size as u64)
298 .ok_or_else(|| {
299 GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "read offset overflow")
300 })?;
301 if read_start >= self.index.blob_size() {
302 return Err(GibbloxError::with_message(
303 GibbloxErrorKind::OutOfRange,
304 "requested start offset is outside the image",
305 ));
306 }
307
308 let requested_end = read_start.checked_add(buf.len() as u64).ok_or_else(|| {
309 GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "read end overflow")
310 })?;
311 let read_end = requested_end.min(self.index.blob_size());
312
313 buf.fill(0);
314
315 let mut cursor = read_start;
316 let mut chunk_idx = self.index.chunk_for_offset(read_start).ok_or_else(|| {
317 GibbloxError::with_message(
318 GibbloxErrorKind::OutOfRange,
319 "could not map read offset to chunk index",
320 )
321 })?;
322
323 while cursor < read_end {
324 let (chunk_start, chunk_end) = self.index.chunk_bounds(chunk_idx).ok_or_else(|| {
325 GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "chunk bounds unavailable")
326 })?;
327
328 let chunk_data = self.load_chunk_for_read(chunk_idx, ctx).await?;
329
330 let copy_start = cursor.max(chunk_start);
331 let copy_end = read_end.min(chunk_end);
332 if copy_end <= copy_start {
333 return Err(GibbloxError::with_message(
334 GibbloxErrorKind::Io,
335 "chunk bounds did not advance read cursor",
336 ));
337 }
338
339 let src_start = usize::try_from(copy_start - chunk_start).map_err(|_| {
340 GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "source offset overflow")
341 })?;
342 let src_end = usize::try_from(copy_end - chunk_start).map_err(|_| {
343 GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "source end overflow")
344 })?;
345 let dst_start = usize::try_from(copy_start - read_start).map_err(|_| {
346 GibbloxError::with_message(
347 GibbloxErrorKind::OutOfRange,
348 "destination offset overflow",
349 )
350 })?;
351 let dst_end = usize::try_from(copy_end - read_start).map_err(|_| {
352 GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "destination end overflow")
353 })?;
354
355 buf[dst_start..dst_end].copy_from_slice(&chunk_data[src_start..src_end]);
356
357 cursor = copy_end;
358 chunk_idx += 1;
359 }
360
361 trace!(
362 lba,
363 blocks,
364 bytes = buf.len(),
365 effective_bytes = read_end - read_start,
366 "served casync read"
367 );
368
369 Ok(buf.len())
370 }
371}
372
373fn validate_block_size(block_size: u32) -> GibbloxResult<()> {
374 if block_size == 0 || !block_size.is_power_of_two() {
375 return Err(GibbloxError::with_message(
376 GibbloxErrorKind::InvalidInput,
377 "block size must be non-zero power of two",
378 ));
379 }
380 Ok(())
381}
382
383fn digest_sha256(bytes: &[u8]) -> [u8; 32] {
384 let digest = Sha256::digest(bytes);
385 let mut out = [0u8; 32];
386 out.copy_from_slice(&digest);
387 out
388}
389
390fn digest_sha512_256(bytes: &[u8]) -> [u8; 32] {
391 let digest = Sha512_256::digest(bytes);
392 let mut out = [0u8; 32];
393 out.copy_from_slice(&digest);
394 out
395}
396
/// Unit tests for `CasyncBlockReader`. They use a hand-assembled index and an
/// in-memory chunk store.
#[cfg(test)]
mod tests {
    // The crate appears to be `no_std` + `alloc`; tests pull in `std` for
    // `Mutex` and atomics.
    extern crate std;

    use super::{
        CasyncBlockReader, CasyncChunkId, CasyncChunkStore, CasyncIndexSource, CasyncReaderConfig,
    };
    use alloc::{boxed::Box, collections::BTreeMap, format, sync::Arc, vec, vec::Vec};
    use async_trait::async_trait;
    use gibblox_core::{BlockReader, GibbloxError, GibbloxErrorKind, GibbloxResult, ReadContext};
    use sha2::{Digest, Sha256};
    use std::sync::{
        Mutex,
        atomic::{AtomicUsize, Ordering},
    };

    // Type markers written into the synthetic index by `build_index_and_chunks`.
    const CA_FORMAT_INDEX: u64 = 0x9682_4d9c_7b12_9ff9;
    const CA_FORMAT_TABLE: u64 = 0xe75b_9e11_2f17_417d;
    const CA_FORMAT_TABLE_TAIL_MARKER: u64 = 0x4b4f_050e_5549_ecd1;

    // Byte sizes of the index sections. 48 = six u64 header fields;
    // 16 = two u64 table-header fields; 40 = u64 end offset + 32-byte digest
    // per item; 40 = five u64 tail fields.
    const INDEX_HEADER_SIZE: usize = 48;
    const TABLE_HEADER_SIZE: usize = 16;
    const TABLE_ITEM_SIZE: usize = 40;
    const TABLE_TAIL_SIZE: usize = 40;

    /// Index source that always returns a copy of a fixed byte buffer.
    struct StaticIndexSource {
        bytes: Vec<u8>,
    }

    #[async_trait]
    impl CasyncIndexSource for StaticIndexSource {
        async fn load_index_bytes(&self) -> GibbloxResult<Vec<u8>> {
            Ok(self.bytes.clone())
        }
    }

    /// In-memory chunk store.
    ///
    /// `cache` is consulted first and may be pre-seeded, possibly with bad
    /// data. On a miss, payloads come from `source` unless `offline` is set.
    /// `fetch_calls` counts successful fetches from `source`.
    struct MemoryChunkStore {
        source: BTreeMap<CasyncChunkId, Vec<u8>>,
        cache: Mutex<BTreeMap<CasyncChunkId, Vec<u8>>>,
        offline: bool,
        fetch_calls: AtomicUsize,
    }

    impl MemoryChunkStore {
        /// Store with an empty cache.
        fn new(source: BTreeMap<CasyncChunkId, Vec<u8>>, offline: bool) -> Self {
            Self {
                source,
                cache: Mutex::new(BTreeMap::new()),
                offline,
                fetch_calls: AtomicUsize::new(0),
            }
        }

        /// Store whose cache starts with `seeded`. These entries are returned
        /// as-is, which lets tests inject corrupt payloads.
        fn with_seed(
            source: BTreeMap<CasyncChunkId, Vec<u8>>,
            seeded: BTreeMap<CasyncChunkId, Vec<u8>>,
            offline: bool,
        ) -> Self {
            Self {
                source,
                cache: Mutex::new(seeded),
                offline,
                fetch_calls: AtomicUsize::new(0),
            }
        }

        /// Number of payloads fetched from `source` (cache hits excluded).
        fn fetch_calls(&self) -> usize {
            self.fetch_calls.load(Ordering::SeqCst)
        }
    }

    #[async_trait]
    impl CasyncChunkStore for MemoryChunkStore {
        async fn load_chunk(
            &self,
            id: &CasyncChunkId,
            _ctx: ReadContext,
        ) -> GibbloxResult<Vec<u8>> {
            // Cache hit: return without touching `source` or `fetch_calls`.
            if let Some(hit) = self
                .cache
                .lock()
                .map_err(|_| GibbloxError::with_message(GibbloxErrorKind::Io, "cache poisoned"))?
                .get(id)
                .cloned()
            {
                return Ok(hit);
            }

            if self.offline {
                return Err(GibbloxError::with_message(
                    GibbloxErrorKind::Io,
                    format!("offline and chunk not cached: {id}"),
                ));
            }

            let Some(payload) = self.source.get(id).cloned() else {
                return Err(GibbloxError::with_message(
                    GibbloxErrorKind::Io,
                    format!("missing source chunk: {id}"),
                ));
            };

            // Record the fetch and populate the cache for subsequent reads.
            self.fetch_calls.fetch_add(1, Ordering::SeqCst);
            self.cache
                .lock()
                .map_err(|_| GibbloxError::with_message(GibbloxErrorKind::Io, "cache poisoned"))?
                .insert(*id, payload.clone());
            Ok(payload)
        }
    }

    /// A 6-byte blob with 4-byte blocks spans 2 blocks. The second block holds
    /// "ef" followed by two zero bytes of padding.
    #[tokio::test]
    async fn reconstructs_bytes_and_zero_pads_final_block() {
        let chunks = vec![b"abcd".to_vec(), b"ef".to_vec()];
        let (index_bytes, chunk_map, _) = build_index_and_chunks(&chunks);

        let reader = CasyncBlockReader::open(
            StaticIndexSource { bytes: index_bytes },
            MemoryChunkStore::new(chunk_map, false),
            CasyncReaderConfig {
                block_size: 4,
                strict_verify: true,
                identity: None,
            },
        )
        .await
        .expect("open reader");

        assert_eq!(reader.total_blocks().await.expect("total blocks"), 2);

        let mut buf = vec![0u8; 8];
        reader
            .read_blocks(0, &mut buf, ReadContext::FOREGROUND)
            .await
            .expect("read blocks");
        assert_eq!(&buf, b"abcdef\0\0");
    }

    /// Reading the same range twice fetches each of the two chunks from
    /// `source` once; the second read is served from the store's cache.
    #[tokio::test]
    async fn chunk_store_can_cache_internally() {
        let chunks = vec![b"aaaa".to_vec(), b"bbbb".to_vec()];
        let (index_bytes, chunk_map, _) = build_index_and_chunks(&chunks);

        let chunk_store = Arc::new(MemoryChunkStore::new(chunk_map, false));
        let reader = CasyncBlockReader::open(
            StaticIndexSource { bytes: index_bytes },
            Arc::clone(&chunk_store),
            CasyncReaderConfig {
                block_size: 4,
                strict_verify: true,
                identity: None,
            },
        )
        .await
        .expect("open reader");

        let mut first = vec![0u8; 8];
        reader
            .read_blocks(0, &mut first, ReadContext::FOREGROUND)
            .await
            .expect("first read");

        let mut second = vec![0u8; 8];
        reader
            .read_blocks(0, &mut second, ReadContext::FOREGROUND)
            .await
            .expect("second read");

        assert_eq!(&first, b"aaaabbbb");
        assert_eq!(&second, b"aaaabbbb");
        assert_eq!(chunk_store.fetch_calls(), 2);
    }

    /// An offline store with an empty cache surfaces an `Io` error from `read_blocks`.
    #[tokio::test]
    async fn offline_mode_errors_on_missing_chunk() {
        let chunks = vec![b"abcd".to_vec()];
        let (index_bytes, chunk_map, _) = build_index_and_chunks(&chunks);

        let reader = CasyncBlockReader::open(
            StaticIndexSource { bytes: index_bytes },
            MemoryChunkStore::new(chunk_map, true),
            CasyncReaderConfig {
                block_size: 4,
                strict_verify: true,
                identity: None,
            },
        )
        .await
        .expect("open reader");

        let mut buf = vec![0u8; 4];
        let err = reader
            .read_blocks(0, &mut buf, ReadContext::FOREGROUND)
            .await
            .expect_err("offline miss should fail");
        assert_eq!(err.kind(), GibbloxErrorKind::Io);
    }

    /// A cached payload whose digest does not match its id is rejected by the
    /// reader's verification, even though the store returned it successfully.
    #[tokio::test]
    async fn corrupt_cached_chunk_fails_verification() {
        let chunks = vec![b"abcd".to_vec()];
        let (index_bytes, chunk_map, chunk_ids) = build_index_and_chunks(&chunks);
        let mut seeded = BTreeMap::new();
        seeded.insert(chunk_ids[0], b"zzzz".to_vec());

        let reader = CasyncBlockReader::open(
            StaticIndexSource { bytes: index_bytes },
            MemoryChunkStore::with_seed(chunk_map, seeded, false),
            CasyncReaderConfig {
                block_size: 4,
                strict_verify: true,
                identity: None,
            },
        )
        .await
        .expect("open reader");

        let mut buf = vec![0u8; 4];
        let err = reader
            .read_blocks(0, &mut buf, ReadContext::FOREGROUND)
            .await
            .expect_err("corrupt cached payload should fail");
        assert_eq!(err.kind(), GibbloxErrorKind::Io);
    }

    /// Serializes `chunks` into index bytes (SHA-256 chunk ids). Returns
    /// `(index_bytes, id -> payload map, ids in chunk order)`.
    ///
    /// Layout written, all integers little-endian u64:
    /// - header: size, `CA_FORMAT_INDEX`, then 0, 1, 4096 and 128 MiB
    ///   (presumably flags and min/avg/max chunk size; confirm against
    ///   `CasyncIndex::parse`);
    /// - table header: `u64::MAX` size, `CA_FORMAT_TABLE`;
    /// - one item per chunk: cumulative end offset + 32-byte digest;
    /// - tail: two zero words, header offset, table size, tail marker.
    fn build_index_and_chunks(
        chunks: &[Vec<u8>],
    ) -> (
        Vec<u8>,
        BTreeMap<CasyncChunkId, Vec<u8>>,
        Vec<CasyncChunkId>,
    ) {
        let mut out = Vec::new();
        let mut chunk_map = BTreeMap::new();
        let mut chunk_ids = Vec::new();

        // Index header.
        out.extend_from_slice(&(INDEX_HEADER_SIZE as u64).to_le_bytes());
        out.extend_from_slice(&CA_FORMAT_INDEX.to_le_bytes());
        out.extend_from_slice(&0u64.to_le_bytes());
        out.extend_from_slice(&1u64.to_le_bytes());
        out.extend_from_slice(&4096u64.to_le_bytes());
        out.extend_from_slice(&(128 * 1024 * 1024u64).to_le_bytes());

        // Table header.
        out.extend_from_slice(&u64::MAX.to_le_bytes());
        out.extend_from_slice(&CA_FORMAT_TABLE.to_le_bytes());

        // Table items: each records where its chunk ends in the blob.
        let mut end = 0u64;
        for chunk in chunks {
            let digest = Sha256::digest(chunk);
            let mut digest_arr = [0u8; 32];
            digest_arr.copy_from_slice(&digest);
            let chunk_id = CasyncChunkId::from_bytes(digest_arr);

            end += chunk.len() as u64;
            out.extend_from_slice(&end.to_le_bytes());
            out.extend_from_slice(&digest_arr);

            chunk_map.insert(chunk_id, chunk.clone());
            chunk_ids.push(chunk_id);
        }

        // Table tail.
        let table_size =
            (TABLE_HEADER_SIZE + (chunks.len() * TABLE_ITEM_SIZE) + TABLE_TAIL_SIZE) as u64;
        out.extend_from_slice(&0u64.to_le_bytes());
        out.extend_from_slice(&0u64.to_le_bytes());
        out.extend_from_slice(&(INDEX_HEADER_SIZE as u64).to_le_bytes());
        out.extend_from_slice(&table_size.to_le_bytes());
        out.extend_from_slice(&CA_FORMAT_TABLE_TAIL_MARKER.to_le_bytes());

        (out, chunk_map, chunk_ids)
    }
}