// gibblox_cache/lib.rs
1#![cfg_attr(not(feature = "std"), no_std)]
2
3extern crate alloc;
4
5use alloc::{boxed::Box, collections::BTreeMap, format, string::String, sync::Arc, vec, vec::Vec};
6use async_trait::async_trait;
7use core::fmt;
8use futures_channel::oneshot;
9use gibblox_core::{
10    BlockReader, BlockReaderConfigIdentity, GibbloxError, GibbloxErrorKind, GibbloxResult,
11    ReadContext, derive_block_identity_id, derive_config_identity_id,
12};
13use tracing::{debug, trace};
14
/// Magic bytes identifying a gibblox cache file.
const CACHE_MAGIC: [u8; 7] = *b"GIBBLX!";
/// On-disk format version; bump whenever the header/bitmap layout changes.
const CACHE_VERSION: u8 = 2;
/// Fixed header prefix size: magic (7) + version (1) + reserved (4)
/// + block_size u32 LE (4) + total_blocks u64 LE (8) + identity_len u32 LE (4).
const CACHE_PREFIX_LEN: usize = 28;
/// Default number of newly cached blocks between automatic flushes.
const DEFAULT_FLUSH_BLOCKS: u32 = 64;
/// Chunk size used when zero-filling the bitmap region during initialization.
const ZERO_CHUNK_LEN: usize = 4096;
20
/// Snapshot of cache behavior and population.
#[derive(Clone, Copy, Debug)]
#[cfg_attr(target_arch = "wasm32", derive(serde::Serialize))]
pub struct CacheStats {
    /// Block reads satisfied from the cache.
    pub total_hits: u64,
    /// Block reads that had to go to the inner reader.
    pub total_misses: u64,
    /// Blocks currently marked valid in the cache bitmap.
    pub cached_blocks: u64,
    /// Total blocks in the backing device.
    pub total_blocks: u64,
}

impl CacheStats {
    /// Percentage of reads served from cache, in `[0.0, 100.0]`.
    ///
    /// Returns 0.0 when no reads have been recorded. Uses saturating
    /// addition so pathological counter values cannot overflow-panic
    /// in debug builds.
    pub fn hit_rate(&self) -> f64 {
        let total = self.total_hits.saturating_add(self.total_misses);
        if total == 0 {
            return 0.0;
        }
        (self.total_hits as f64 / total as f64) * 100.0
    }

    /// Percentage of the device currently cached, in `[0.0, 100.0]`.
    ///
    /// Returns 0.0 for an empty (zero-block) device.
    pub fn fill_rate(&self) -> f64 {
        if self.total_blocks == 0 {
            return 0.0;
        }
        (self.cached_blocks as f64 / self.total_blocks as f64) * 100.0
    }
}
47
/// Backend I/O abstraction for a single cache file.
///
/// Implementations are expected to be internally synchronized; the cache wrapper may call these
/// methods concurrently from multiple tasks.
#[async_trait]
pub trait CacheOps: Send + Sync {
    /// Read bytes at a fixed offset. Returns the number of bytes read.
    ///
    /// Short reads are permitted — callers loop until the buffer is full,
    /// and treat a zero-length read as end-of-data (see `read_exact_at`).
    async fn read_at(&self, offset: u64, out: &mut [u8]) -> GibbloxResult<usize>;

    /// Write all bytes at a fixed offset.
    async fn write_at(&self, offset: u64, data: &[u8]) -> GibbloxResult<()>;

    /// Resize the underlying cache file.
    async fn set_len(&self, len: u64) -> GibbloxResult<()>;

    /// Persist pending data and metadata changes.
    async fn flush(&self) -> GibbloxResult<()>;
}
66
// Blanket forwarding impl: lets `Arc<T>` (including `Arc<dyn CacheOps>`) be
// used wherever a `CacheOps` is expected, delegating each call to the inner
// implementation.
#[async_trait]
impl<T> CacheOps for Arc<T>
where
    T: CacheOps + ?Sized,
{
    async fn read_at(&self, offset: u64, out: &mut [u8]) -> GibbloxResult<usize> {
        (**self).read_at(offset, out).await
    }

    async fn write_at(&self, offset: u64, data: &[u8]) -> GibbloxResult<()> {
        (**self).write_at(offset, data).await
    }

    async fn set_len(&self, len: u64) -> GibbloxResult<()> {
        (**self).set_len(len).await
    }

    async fn flush(&self) -> GibbloxResult<()> {
        (**self).flush().await
    }
}
88
/// Read-only block reader wrapper that consults a local file-style cache.
///
/// The cache file contains a compact header, a per-block validity bitmap, and raw backing bytes.
/// Misses are fetched from the inner reader, written into the data region, and marked valid.
///
/// The cache flushes automatically after writing `flush_every_blocks` new blocks to ensure
/// persistence across sessions. There is no dirty flag; the bitmap is the authoritative
/// source of validity.
pub struct CachedBlockReader<S, C> {
    /// Backing source of truth for block data.
    inner: S,
    /// Backend I/O for the cache file.
    cache: C,
    /// Block size in bytes (validated non-zero power of two at construction).
    block_size: u32,
    /// Total blocks in the backing device, captured at construction.
    total_blocks: u64,
    /// File offset of the validity bitmap region.
    bitmap_offset: u64,
    /// File offset of the raw block data region.
    data_offset: u64,
    /// Flush after this many newly written blocks.
    flush_every_blocks: u32,
    /// In-memory bitmap copy, hit/miss counters, and flush bookkeeping.
    state: async_lock::Mutex<CacheState>,
    /// Registry of blocks currently being fetched, for request deduplication.
    in_flight: async_lock::Mutex<InFlight>,
    /// Serializes cache-file mutations (data write + bitmap write + flush).
    mutation_lock: async_lock::Mutex<()>,
}
109
110impl<S, C> CachedBlockReader<S, C>
111where
112    S: BlockReader,
113    C: CacheOps,
114{
    /// Construct a cached reader using the default flush policy
    /// (`DEFAULT_FLUSH_BLOCKS` newly written blocks between flushes).
    pub async fn new(inner: S, cache: C) -> GibbloxResult<Self> {
        Self::with_flush_block_limit(inner, cache, DEFAULT_FLUSH_BLOCKS).await
    }
119
    /// Construct a cached reader with a custom flush threshold.
    ///
    /// The cache will flush after writing `flush_every_blocks` newly cached blocks.
    /// Lower values provide better crash resilience at the cost of more frequent I/O.
    /// A value of 1 flushes after every write batch (maximally safe but slowest).
    pub async fn with_flush_block_limit(
        inner: S,
        cache: C,
        flush_every_blocks: u32,
    ) -> GibbloxResult<Self> {
        if flush_every_blocks == 0 {
            return Err(GibbloxError::with_message(
                GibbloxErrorKind::InvalidInput,
                "flush block limit must be non-zero",
            ));
        }
        let block_size = inner.block_size();
        // Power-of-two block size keeps offset math exact; CacheLayout re-checks this.
        if block_size == 0 || !block_size.is_power_of_two() {
            return Err(GibbloxError::with_message(
                GibbloxErrorKind::InvalidInput,
                "block size must be non-zero power of two",
            ));
        }
        let total_blocks = inner.total_blocks().await?;
        let layout = CacheLayout::new(block_size, total_blocks)?;
        // The identity string is stored inside the cache file so a cache built
        // for one source is never reused for a different one.
        let identity = cached_reader_identity_string(&inner);
        let opened = open_or_initialize_cache(&cache, &layout, &identity).await?;
        let valid_blocks = count_set_bits(&opened.valid);
        debug!(
            block_size,
            total_blocks,
            identity = %identity,
            valid_blocks,
            flush_every_blocks,
            "cached block reader initialized"
        );
        // NOTE(review): this trace! repeats the debug! message text above with
        // layout offsets added — confirm the duplicate message is intentional.
        trace!(
            block_size,
            total_blocks,
            bitmap_offset = opened.mapping.bitmap_offset,
            data_offset = opened.mapping.data_offset,
            flush_every_blocks,
            "cached block reader initialized"
        );

        Ok(Self {
            inner,
            cache,
            block_size,
            total_blocks,
            bitmap_offset: opened.mapping.bitmap_offset,
            data_offset: opened.mapping.data_offset,
            flush_every_blocks,
            state: async_lock::Mutex::new(CacheState {
                valid: opened.valid,
                blocks_since_flush: 0,
                total_hits: 0,
                total_misses: 0,
                last_stats_log: 0,
                cached_blocks: valid_blocks,
            }),
            in_flight: async_lock::Mutex::new(InFlight::new()),
            mutation_lock: async_lock::Mutex::new(()),
        })
    }
185
    /// Force a flush of cache data and metadata, regardless of how many
    /// blocks were written since the last flush.
    ///
    /// There is no dirty flag (the on-disk bitmap is the sole source of
    /// validity); this forces `CacheOps::flush` and resets the
    /// blocks-since-flush counter.
    pub async fn flush_cache(&self) -> GibbloxResult<()> {
        let _guard = self.mutation_lock.lock().await;
        debug!("cache flush requested explicitly");
        self.flush_locked_if_needed(true).await
    }
192
    /// Block size widened to `usize` for buffer/offset arithmetic
    /// (u32 → usize is lossless on supported targets).
    fn block_size_usize(&self) -> usize {
        self.block_size as usize
    }
196
197    fn validate_range(&self, lba: u64, blocks: u64) -> GibbloxResult<()> {
198        let end = lba.checked_add(blocks).ok_or_else(|| {
199            GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "lba overflow")
200        })?;
201        if end > self.total_blocks {
202            return Err(GibbloxError::with_message(
203                GibbloxErrorKind::OutOfRange,
204                "requested range exceeds total blocks",
205            ));
206        }
207        Ok(())
208    }
209
210    fn blocks_from_len(&self, len: usize) -> GibbloxResult<u64> {
211        if len == 0 {
212            return Ok(0);
213        }
214        if !len.is_multiple_of(self.block_size_usize()) {
215            return Err(GibbloxError::with_message(
216                GibbloxErrorKind::InvalidInput,
217                "buffer length must align to block size",
218            ));
219        }
220        Ok((len / self.block_size_usize()) as u64)
221    }
222
223    fn data_offset_for_block(&self, block_idx: u64) -> GibbloxResult<u64> {
224        let block_bytes = block_idx
225            .checked_mul(self.block_size as u64)
226            .ok_or_else(|| {
227                GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "block offset overflow")
228            })?;
229        self.data_offset.checked_add(block_bytes).ok_or_else(|| {
230            GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "data offset overflow")
231        })
232    }
233
    /// Partition `[lba, lba + blocks)` into maximal runs of blocks sharing the
    /// same validity bit, returning `(start, len, is_cached)` triples.
    ///
    /// Takes one snapshot of the bitmap under the state lock; validity may
    /// change after the lock is released, so callers re-verify via actual
    /// cache reads.
    async fn snapshot_segments(&self, lba: u64, blocks: u64) -> Vec<(u64, u64, bool)> {
        let guard = self.state.lock().await;
        let mut segments = Vec::new();
        let mut block = lba;
        while block < lba + blocks {
            let hit = bit_is_set(&guard.valid, block);
            // Extend the run while the bit matches the run's first block.
            let mut len = 1u64;
            while block + len < lba + blocks && bit_is_set(&guard.valid, block + len) == hit {
                len += 1;
            }
            segments.push((block, len, hit));
            block += len;
        }
        segments
    }
249
    /// Clear validity bits for `[start_block, start_block + blocks)` and
    /// decrement the cached-block counter by the number of bits cleared.
    ///
    /// Only the in-memory bitmap is touched; the on-disk bitmap is rewritten
    /// when the blocks are next populated.
    async fn invalidate_range(&self, start_block: u64, blocks: u64) {
        if blocks == 0 {
            return;
        }
        let mut guard = self.state.lock().await;
        let removed = clear_bits_and_count_removed(&mut guard.valid, start_block, blocks);
        guard.cached_blocks = guard.cached_blocks.saturating_sub(removed);
    }
258
    /// Return a snapshot of cache statistics.
    ///
    /// All counters are read together under the state lock, so the snapshot
    /// is internally consistent at the moment it is taken.
    pub async fn get_stats(&self) -> CacheStats {
        let guard = self.state.lock().await;
        CacheStats {
            total_hits: guard.total_hits,
            total_misses: guard.total_misses,
            cached_blocks: guard.cached_blocks,
            total_blocks: self.total_blocks,
        }
    }
269
    /// Copy every currently cached block in `[lba, lba + blocks)` into `buf`,
    /// returning the indices of blocks that still need fetching.
    ///
    /// A cache read that comes up short (e.g. truncated file) invalidates that
    /// run in the bitmap and reports its blocks as missing rather than failing.
    async fn fill_from_cache(
        &self,
        lba: u64,
        blocks: u64,
        buf: &mut [u8],
    ) -> GibbloxResult<Vec<u64>> {
        let mut missing = Vec::new();
        let bs = self.block_size_usize();
        let segments = self.snapshot_segments(lba, blocks).await;

        for (start_block, len_blocks, hit) in segments {
            if !hit {
                missing.extend(start_block..start_block + len_blocks);
                continue;
            }

            // Translate the block run into a byte range within the caller's buffer.
            let block_offset = (start_block - lba) as usize;
            let byte_start = block_offset.checked_mul(bs).ok_or_else(|| {
                GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "buffer offset overflow")
            })?;
            let byte_len = (len_blocks as usize).checked_mul(bs).ok_or_else(|| {
                GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "buffer length overflow")
            })?;
            let byte_end = byte_start.checked_add(byte_len).ok_or_else(|| {
                GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "buffer end overflow")
            })?;
            let out = &mut buf[byte_start..byte_end];

            let data_offset = self.data_offset_for_block(start_block)?;
            let full = read_exact_at(&self.cache, data_offset, out).await?;
            if !full {
                // Bitmap said valid but the file came up short: drop the stale bits.
                self.invalidate_range(start_block, len_blocks).await;
                missing.extend(start_block..start_block + len_blocks);
            }
        }

        trace!(
            lba,
            blocks,
            missing = missing.len(),
            "cache read pass complete"
        );

        Ok(missing)
    }
315
316    async fn mark_in_flight(&self, blocks: &[u64]) -> (Vec<u64>, Vec<oneshot::Receiver<()>>) {
317        let mut to_fetch = Vec::new();
318        let mut waiters = Vec::new();
319        let mut guard = self.in_flight.lock().await;
320        for block in blocks {
321            match guard.waiters.get_mut(block) {
322                Some(pending) => {
323                    let (tx, rx) = oneshot::channel();
324                    pending.push(tx);
325                    waiters.push(rx);
326                }
327                None => {
328                    guard.waiters.insert(*block, Vec::new());
329                    to_fetch.push(*block);
330                }
331            }
332        }
333        trace!(
334            requested = blocks.len(),
335            to_fetch = to_fetch.len(),
336            waiting = waiters.len(),
337            "updated in-flight block registry"
338        );
339        (to_fetch, waiters)
340    }
341
342    async fn take_waiters(&self, start_block: u64, len_blocks: u64) -> Vec<oneshot::Sender<()>> {
343        let mut guard = self.in_flight.lock().await;
344        let mut senders = Vec::new();
345        for block in start_block..(start_block + len_blocks) {
346            if let Some(mut pending) = guard.waiters.remove(&block) {
347                senders.append(&mut pending);
348            }
349        }
350        senders
351    }
352
353    fn coalesce(blocks: &[u64]) -> Vec<(u64, u64)> {
354        if blocks.is_empty() {
355            return Vec::new();
356        }
357        let mut ranges = Vec::new();
358        let mut start = blocks[0];
359        let mut len = 1u64;
360        for pair in blocks.windows(2) {
361            if let [prev, curr] = pair {
362                if *curr == *prev + 1 {
363                    len += 1;
364                } else {
365                    ranges.push((start, len));
366                    start = *curr;
367                    len = 1;
368                }
369            }
370        }
371        ranges.push((start, len));
372        ranges
373    }
374
    /// Mark `[start_block, start_block + blocks)` valid in the in-memory
    /// bitmap and return the bitmap bytes that must be persisted:
    /// `(file_offset, chunk, should_flush)`.
    ///
    /// The chunk spans whole bitmap bytes covering the range, so neighboring
    /// blocks' bits are rewritten with their current (correct) values.
    async fn mark_valid_and_collect_bitmap_write(
        &self,
        start_block: u64,
        blocks: u64,
    ) -> GibbloxResult<(u64, Vec<u8>, bool)> {
        let end_block = start_block.checked_add(blocks).ok_or_else(|| {
            GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "block range overflow")
        })?;
        let first_byte = (start_block / 8) as usize;
        let last_byte = ((end_block - 1) / 8) as usize;

        let mut guard = self.state.lock().await;
        let newly_cached = set_bits_and_count_new(&mut guard.valid, start_block, blocks);
        guard.cached_blocks = guard.cached_blocks.saturating_add(newly_cached);
        // The full batch size (not just newly set bits) counts toward the
        // flush budget.
        guard.blocks_since_flush = guard.blocks_since_flush.saturating_add(blocks as u32);
        let should_flush = guard.blocks_since_flush >= self.flush_every_blocks;
        let chunk = guard.valid[first_byte..=last_byte].to_vec();
        let bitmap_offset = self.bitmap_offset + first_byte as u64;
        Ok((bitmap_offset, chunk, should_flush))
    }
395
    /// Flush the backend if `force` is set or the flush budget is reached,
    /// then reset the blocks-since-flush counter.
    ///
    /// Both call sites (`flush_cache` and the populate path) hold
    /// `mutation_lock`, which keeps the check/flush/reset sequence atomic
    /// with respect to other mutators.
    async fn flush_locked_if_needed(&self, force: bool) -> GibbloxResult<()> {
        let (should_flush, blocks_written, valid_blocks) = {
            let guard = self.state.lock().await;
            let should = force || guard.blocks_since_flush >= self.flush_every_blocks;
            (should, guard.blocks_since_flush, guard.cached_blocks)
        };
        if !should_flush {
            return Ok(());
        }

        debug!(
            force,
            blocks_since_last_flush = blocks_written,
            valid_blocks,
            "flushing cache data and metadata"
        );

        // The state lock is deliberately not held across the (possibly slow) flush.
        self.cache.flush().await?;

        let mut guard = self.state.lock().await;
        guard.blocks_since_flush = 0;
        debug!("cache flush completed successfully");
        Ok(())
    }
420
421    async fn fetch_and_populate(
422        &self,
423        ranges: &[(u64, u64)],
424        ctx: ReadContext,
425    ) -> GibbloxResult<()> {
426        let bs = self.block_size_usize();
427        for (start_block, len_blocks) in ranges {
428            trace!(
429                start_block,
430                len_blocks, "fetching missing range from inner source"
431            );
432            let expected_bytes = (*len_blocks as usize).checked_mul(bs).ok_or_else(|| {
433                GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "range too large")
434            })?;
435            let mut buf = vec![0u8; expected_bytes];
436            let result = async {
437                let read = self.inner.read_blocks(*start_block, &mut buf, ctx).await?;
438                if read != expected_bytes {
439                    return Err(GibbloxError::with_message(
440                        GibbloxErrorKind::Io,
441                        "inner source returned short read",
442                    ));
443                }
444
445                let _mutation_guard = self.mutation_lock.lock().await;
446
447                let data_offset = self.data_offset_for_block(*start_block)?;
448                self.cache.write_at(data_offset, &buf).await?;
449
450                let (bitmap_offset, bitmap_chunk, should_flush) = self
451                    .mark_valid_and_collect_bitmap_write(*start_block, *len_blocks)
452                    .await?;
453                self.cache.write_at(bitmap_offset, &bitmap_chunk).await?;
454                trace!(
455                    start_block,
456                    len_blocks,
457                    bytes = expected_bytes,
458                    should_flush,
459                    "cache range populated"
460                );
461                if should_flush {
462                    self.flush_locked_if_needed(false).await?;
463                }
464
465                Ok(())
466            }
467            .await;
468
469            let senders = self.take_waiters(*start_block, *len_blocks).await;
470            for sender in senders {
471                let _ = sender.send(());
472            }
473            result?;
474        }
475        Ok(())
476    }
477
    /// Ensure blocks [lba, lba+blocks) are cached without copying data out.
    ///
    /// This fetches any missing blocks from the inner reader and populates the
    /// cache without allocating or returning a caller buffer.
    ///
    /// Already-cached blocks are skipped (bitmap check only). The provided
    /// ReadContext propagates priority hints to the inner reader.
    ///
    /// NOTE(review): after awaiting overlapping in-flight fetches this returns
    /// Ok without re-checking the bitmap, so a failed fetch by another task is
    /// not surfaced here — confirm this best-effort contract is intended.
    pub async fn ensure_cached(
        &self,
        lba: u64,
        blocks: u64,
        ctx: ReadContext,
    ) -> GibbloxResult<()> {
        if blocks == 0 {
            return Ok(());
        }
        self.validate_range(lba, blocks)?;

        // Fast-path cache probe for readahead/background callers.
        // This avoids large temporary allocations and cache file reads when
        // all requested blocks are already marked valid.
        let missing = self.missing_blocks_from_bitmap(lba, blocks).await;
        if missing.is_empty() {
            return Ok(()); // All cached, early return
        }

        // Mark in-flight for deduplication
        let (to_fetch, waiters) = self.mark_in_flight(&missing).await;

        // Fetch missing ranges if we're the first to request them
        if !to_fetch.is_empty() {
            self.fetch_and_populate(&Self::coalesce(&to_fetch), ctx)
                .await?;
        }

        // Wait for any overlapping in-flight fetches
        for waiter in waiters {
            let _ = waiter.await;
        }

        Ok(())
    }
520
521    async fn missing_blocks_from_bitmap(&self, lba: u64, blocks: u64) -> Vec<u64> {
522        let mut missing = Vec::new();
523        let guard = self.state.lock().await;
524        for block in lba..(lba + blocks) {
525            if !bit_is_set(&guard.valid, block) {
526                missing.push(block);
527            }
528        }
529        missing
530    }
531}
532
533#[async_trait]
534impl<S, C> BlockReader for CachedBlockReader<S, C>
535where
536    S: BlockReader,
537    C: CacheOps,
538{
    fn block_size(&self) -> u32 {
        // Mirrors the inner reader's block size, validated at construction.
        self.block_size
    }
542
    async fn total_blocks(&self) -> GibbloxResult<u64> {
        // Captured at construction; the inner source is not re-queried.
        Ok(self.total_blocks)
    }
546
    fn write_identity(&self, out: &mut dyn fmt::Write) -> fmt::Result {
        // Identity takes the form `cached:(<inner identity>)`.
        write_cached_identity(&self.inner, out)
    }
550
    /// Read `buf.len()` bytes starting at block `lba`, serving from the cache
    /// where possible and fetching misses from the inner reader.
    ///
    /// `buf` must be a whole number of blocks; returns `buf.len()` on success.
    async fn read_blocks(
        &self,
        lba: u64,
        buf: &mut [u8],
        ctx: ReadContext,
    ) -> GibbloxResult<usize> {
        let blocks = self.blocks_from_len(buf.len())?;
        if blocks == 0 {
            return Ok(0);
        }
        self.validate_range(lba, blocks)?;

        // First pass: copy every currently valid block out of the cache file.
        let missing = self.fill_from_cache(lba, blocks, buf).await?;

        // Update hit/miss statistics and log periodically
        {
            let mut guard = self.state.lock().await;
            if missing.is_empty() {
                guard.total_hits += blocks;
            } else {
                guard.total_hits += blocks - missing.len() as u64;
                guard.total_misses += missing.len() as u64;
            }
            let total_reads = guard.total_hits + guard.total_misses;
            // Emit a stats line roughly every 1000 block reads.
            if total_reads > 0 && (total_reads - guard.last_stats_log) >= 1000 {
                guard.last_stats_log = total_reads;
                let hit_rate = (guard.total_hits as f64 / total_reads as f64) * 100.0;
                debug!(
                    total_hits = guard.total_hits,
                    total_misses = guard.total_misses,
                    hit_rate = format!("{:.1}%", hit_rate),
                    "cache statistics"
                );
            }
        }

        if missing.is_empty() {
            trace!(lba, blocks, "cache hit");
            return Ok(buf.len());
        }

        trace!(lba, blocks, missing = missing.len(), "cache miss");

        // Claim missing blocks, or queue behind a task already fetching them.
        let (to_fetch, waiters) = self.mark_in_flight(&missing).await;
        if !to_fetch.is_empty() {
            self.fetch_and_populate(&Self::coalesce(&to_fetch), ctx)
                .await?;
        }
        // Wait for overlapping fetches owned by other tasks to complete.
        for waiter in waiters {
            let _ = waiter.await;
        }

        // Second pass: everything should now be cached; a still-missing block
        // means a concurrent fetch failed or was invalidated.
        let final_missing = self.fill_from_cache(lba, blocks, buf).await?;
        if !final_missing.is_empty() {
            return Err(GibbloxError::with_message(
                GibbloxErrorKind::Io,
                "cache fetch did not populate all blocks",
            ));
        }
        Ok(buf.len())
    }
612}
613
614fn write_cached_identity<S: BlockReader + ?Sized>(
615    inner: &S,
616    out: &mut dyn fmt::Write,
617) -> fmt::Result {
618    out.write_str("cached:(")?;
619    inner.write_identity(out)?;
620    out.write_str(")")
621}
622
623fn write_cached_config_identity<C: BlockReaderConfigIdentity + ?Sized>(
624    config: &C,
625    out: &mut dyn fmt::Write,
626) -> fmt::Result {
627    out.write_str("cached:(")?;
628    config.write_identity(out)?;
629    out.write_str(")")
630}
631
/// Render the cached-reader identity into an owned `String`.
pub fn cached_reader_identity_string<S: BlockReader + ?Sized>(inner: &S) -> String {
    let mut identity = String::new();
    // Writing into a String never fails on the sink side; any Err from the
    // inner write_identity is deliberately ignored (best-effort identity).
    let _ = write_cached_identity(inner, &mut identity);
    identity
}
637
/// Render the cached-config identity into an owned `String`.
pub fn cached_config_identity_string<C: BlockReaderConfigIdentity + ?Sized>(config: &C) -> String {
    let mut identity = String::new();
    // Writing into a String never fails on the sink side; errors ignored.
    let _ = write_cached_config_identity(config, &mut identity);
    identity
}
643
/// Derive a stable numeric id for a cached reader wrapping `inner`,
/// based on block size, total blocks, and the `cached:(...)` identity form.
pub fn derive_cached_reader_identity_id<S: BlockReader + ?Sized>(
    inner: &S,
    total_blocks: u64,
) -> u32 {
    derive_block_identity_id(inner.block_size(), total_blocks, |writer| {
        write_cached_identity(inner, writer)
    })
}
652
/// Derive a stable numeric id for a cached reader *configuration*.
pub fn derive_cached_config_identity_id<C: BlockReaderConfigIdentity + ?Sized>(config: &C) -> u32 {
    // Adapter so the shared derivation helper sees the `cached:(...)` form.
    struct CachedConfigIdentity<'a, C: BlockReaderConfigIdentity + ?Sized>(&'a C);

    impl<C: BlockReaderConfigIdentity + ?Sized> BlockReaderConfigIdentity
        for CachedConfigIdentity<'_, C>
    {
        fn write_identity(&self, out: &mut dyn fmt::Write) -> fmt::Result {
            write_cached_config_identity(self.0, out)
        }
    }

    derive_config_identity_id(&CachedConfigIdentity(config))
}
666
/// Cache geometry: block size, block count, and derived region sizes.
struct CacheLayout {
    block_size: u32,
    total_blocks: u64,
    // One validity bit per block, rounded up to whole bytes.
    bitmap_bytes: u64,
    // Raw data region size: total_blocks * block_size.
    data_bytes: u64,
}

impl CacheLayout {
    /// Validate geometry and compute region sizes.
    ///
    /// Errors on a zero/non-power-of-two block size, a bitmap larger than
    /// addressable memory, or a data region that overflows `u64`.
    fn new(block_size: u32, total_blocks: u64) -> GibbloxResult<Self> {
        if block_size == 0 || !block_size.is_power_of_two() {
            return Err(GibbloxError::with_message(
                GibbloxErrorKind::InvalidInput,
                "block size must be non-zero power of two",
            ));
        }
        let bitmap_bytes = total_blocks.div_ceil(8);
        // The bitmap is held in memory as a Vec<u8>, so it must fit in usize.
        if bitmap_bytes > usize::MAX as u64 {
            return Err(GibbloxError::with_message(
                GibbloxErrorKind::OutOfRange,
                "bitmap exceeds addressable memory",
            ));
        }
        let data_bytes = total_blocks.checked_mul(block_size as u64).ok_or_else(|| {
            GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "cache too large")
        })?;

        Ok(Self {
            block_size,
            total_blocks,
            bitmap_bytes,
            data_bytes,
        })
    }

    /// Bitmap length as `usize` (bounded by the check in `new`).
    fn bitmap_len_usize(&self) -> usize {
        self.bitmap_bytes as usize
    }
}
705
/// Byte offsets of each region in the cache file:
/// `[prefix | identity | bitmap | data]`.
struct CacheMapping {
    bitmap_offset: u64,
    data_offset: u64,
    // Required total file length: data_offset + data_bytes.
    total_len: u64,
}

impl CacheMapping {
    /// Compute region offsets for `layout` plus an identity string of
    /// `identity_len` bytes, with overflow checks at every step.
    fn new(layout: &CacheLayout, identity_len: usize) -> GibbloxResult<Self> {
        // identity_len is stored as a u32 in the header, so enforce that bound.
        if identity_len > u32::MAX as usize {
            return Err(GibbloxError::with_message(
                GibbloxErrorKind::InvalidInput,
                "identity too large",
            ));
        }
        let bitmap_offset = (CACHE_PREFIX_LEN as u64)
            .checked_add(identity_len as u64)
            .ok_or_else(|| {
                GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "metadata overflow")
            })?;
        let data_offset = bitmap_offset
            .checked_add(layout.bitmap_bytes)
            .ok_or_else(|| {
                GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "bitmap overflow")
            })?;
        let total_len = data_offset.checked_add(layout.data_bytes).ok_or_else(|| {
            GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "cache length overflow")
        })?;
        Ok(Self {
            bitmap_offset,
            data_offset,
            total_len,
        })
    }
}
740
741struct CacheHeader {
742    block_size: u32,
743    total_blocks: u64,
744    identity_len: u32,
745}
746
747impl CacheHeader {
748    fn encode_prefix(&self) -> [u8; CACHE_PREFIX_LEN] {
749        let mut out = [0u8; CACHE_PREFIX_LEN];
750        out[0..7].copy_from_slice(&CACHE_MAGIC);
751        out[7] = CACHE_VERSION;
752        out[8] = 0; // reserved
753        out[9] = 0; // reserved
754        out[10..12].copy_from_slice(&0u16.to_le_bytes()); // reserved
755        out[12..16].copy_from_slice(&self.block_size.to_le_bytes());
756        out[16..24].copy_from_slice(&self.total_blocks.to_le_bytes());
757        out[24..28].copy_from_slice(&self.identity_len.to_le_bytes());
758        out
759    }
760
761    fn decode_prefix(prefix: &[u8; CACHE_PREFIX_LEN]) -> Option<Self> {
762        if prefix[0..7] != CACHE_MAGIC {
763            return None;
764        }
765        if prefix[7] != CACHE_VERSION {
766            return None;
767        }
768        let block_size = u32::from_le_bytes([prefix[12], prefix[13], prefix[14], prefix[15]]);
769        let total_blocks = u64::from_le_bytes([
770            prefix[16], prefix[17], prefix[18], prefix[19], prefix[20], prefix[21], prefix[22],
771            prefix[23],
772        ]);
773        let identity_len = u32::from_le_bytes([prefix[24], prefix[25], prefix[26], prefix[27]]);
774        Some(Self {
775            block_size,
776            total_blocks,
777            identity_len,
778        })
779    }
780}
781
/// Result of opening/initializing a cache file: region offsets plus the
/// in-memory copy of the validity bitmap.
struct OpenedCache {
    mapping: CacheMapping,
    valid: Vec<u8>,
}
786
787async fn open_or_initialize_cache<C: CacheOps>(
788    cache: &C,
789    layout: &CacheLayout,
790    identity: &str,
791) -> GibbloxResult<OpenedCache> {
792    debug!(
793        block_size = layout.block_size,
794        total_blocks = layout.total_blocks,
795        identity = %identity,
796        "opening cache state"
797    );
798    let mut prefix = [0u8; CACHE_PREFIX_LEN];
799    let have_prefix = read_exact_at(cache, 0, &mut prefix).await?;
800    if !have_prefix {
801        debug!("cache prefix missing; initializing new cache file");
802        return initialize_cache(cache, layout, identity).await;
803    }
804
805    let Some(header) = CacheHeader::decode_prefix(&prefix) else {
806        debug!("cache header invalid; reinitializing cache file");
807        return initialize_cache(cache, layout, identity).await;
808    };
809    if header.block_size != layout.block_size || header.total_blocks != layout.total_blocks {
810        debug!(
811            stored_block_size = header.block_size,
812            stored_total_blocks = header.total_blocks,
813            expected_block_size = layout.block_size,
814            expected_total_blocks = layout.total_blocks,
815            "cache geometry mismatch; reinitializing cache file"
816        );
817        return initialize_cache(cache, layout, identity).await;
818    }
819
820    let identity_len = header.identity_len as usize;
821    let mapping = CacheMapping::new(layout, identity_len)?;
822    let mut stored_identity = vec![0u8; identity_len];
823    if !read_exact_at(cache, CACHE_PREFIX_LEN as u64, &mut stored_identity).await? {
824        debug!("cache identity bytes missing; reinitializing cache file");
825        return initialize_cache(cache, layout, identity).await;
826    }
827    let stored_identity_str = String::from_utf8_lossy(&stored_identity);
828    if stored_identity.as_slice() != identity.as_bytes() {
829        debug!(
830            stored_identity = %stored_identity_str,
831            expected_identity = %identity,
832            "cache identity mismatch; reinitializing cache file"
833        );
834        return initialize_cache(cache, layout, identity).await;
835    }
836
837    cache.set_len(mapping.total_len).await?;
838
839    let mut valid = vec![0u8; layout.bitmap_len_usize()];
840    if !read_exact_at(cache, mapping.bitmap_offset, &mut valid).await? {
841        debug!("cache bitmap missing; reinitializing cache file");
842        return initialize_cache(cache, layout, identity).await;
843    }
844
845    let valid_blocks = valid.iter().map(|b| b.count_ones()).sum::<u32>();
846    debug!(valid_blocks, "cache opened successfully with existing data");
847
848    Ok(OpenedCache { mapping, valid })
849}
850
/// (Re)create the cache file from scratch: size it, write the header and
/// identity, zero the bitmap, and flush.
///
/// The data region is not written beyond `set_len`; with an all-zero bitmap
/// its contents are never trusted.
async fn initialize_cache<C: CacheOps>(
    cache: &C,
    layout: &CacheLayout,
    identity: &str,
) -> GibbloxResult<OpenedCache> {
    let identity_len = identity.len();
    let mapping = CacheMapping::new(layout, identity_len)?;

    // Truncate first so stale bytes from a previous layout cannot survive.
    cache.set_len(0).await?;
    cache.set_len(mapping.total_len).await?;

    let header = CacheHeader {
        block_size: layout.block_size,
        total_blocks: layout.total_blocks,
        identity_len: identity_len as u32,
    }
    .encode_prefix();

    cache.write_at(0, &header).await?;
    cache
        .write_at(CACHE_PREFIX_LEN as u64, identity.as_bytes())
        .await?;
    // All-zero bitmap: every block starts out invalid.
    write_zero_region(cache, mapping.bitmap_offset, layout.bitmap_bytes).await?;
    cache.flush().await?;

    debug!(
        block_size = layout.block_size,
        total_blocks = layout.total_blocks,
        identity = %identity,
        identity_len,
        "cache file initialized with clean state"
    );

    Ok(OpenedCache {
        mapping,
        valid: vec![0u8; layout.bitmap_len_usize()],
    })
}
889
890async fn read_exact_at<C: CacheOps>(
891    cache: &C,
892    mut offset: u64,
893    out: &mut [u8],
894) -> GibbloxResult<bool> {
895    let mut filled = 0usize;
896    while filled < out.len() {
897        let read = cache.read_at(offset, &mut out[filled..]).await?;
898        if read == 0 {
899            return Ok(false);
900        }
901        filled = filled.checked_add(read).ok_or_else(|| {
902            GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "read size overflow")
903        })?;
904        offset = offset.checked_add(read as u64).ok_or_else(|| {
905            GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "read offset overflow")
906        })?;
907    }
908    Ok(true)
909}
910
911async fn write_zero_region<C: CacheOps>(cache: &C, mut offset: u64, len: u64) -> GibbloxResult<()> {
912    if len == 0 {
913        return Ok(());
914    }
915    let mut remaining = len;
916    let zero = [0u8; ZERO_CHUNK_LEN];
917    while remaining > 0 {
918        let write_len = remaining.min(ZERO_CHUNK_LEN as u64) as usize;
919        cache.write_at(offset, &zero[..write_len]).await?;
920        offset = offset.checked_add(write_len as u64).ok_or_else(|| {
921            GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "zero-fill offset overflow")
922        })?;
923        remaining -= write_len as u64;
924    }
925    Ok(())
926}
927
928struct InFlight {
929    waiters: BTreeMap<u64, Vec<oneshot::Sender<()>>>,
930}
931
932impl InFlight {
933    fn new() -> Self {
934        Self {
935            waiters: BTreeMap::new(),
936        }
937    }
938}
939
/// Mutable cache bookkeeping; the fields mirror what `CacheStats` reports.
struct CacheState {
    // Validity bitmap: one bit per block, set when the block holds cached data.
    valid: Vec<u8>,
    // Blocks written since the last flush — presumably compared against
    // DEFAULT_FLUSH_BLOCKS to decide when to flush; TODO confirm at call site.
    blocks_since_flush: u32,
    // Lifetime hit counter.
    total_hits: u64,
    // Lifetime miss counter.
    total_misses: u64,
    // Value at which stats were last logged (units not visible here — confirm
    // against the code that updates it).
    last_stats_log: u64,
    // Running count of set bits in `valid`, kept so stats need no rescan.
    cached_blocks: u64,
}
948
/// Test whether bit `idx` is set in the little-endian-within-byte bitmap
/// (bit 0 of byte 0 is index 0). Panics if `idx / 8` is out of bounds.
fn bit_is_set(bits: &[u8], idx: u64) -> bool {
    let slot = (idx / 8) as usize;
    (bits[slot] >> (idx % 8)) & 1 == 1
}
954
/// Build a byte mask covering the inclusive bit range `start_bit..=end_bit`
/// (both in 0..8). The width is computed in u16 so a full 8-bit span does not
/// overflow the intermediate `(1 << width) - 1`.
fn bit_mask(start_bit: u8, end_bit: u8) -> u8 {
    let width = (end_bit - start_bit + 1) as u16;
    let ones: u16 = if width < 8 {
        (1u16 << width) - 1
    } else {
        u8::MAX.into()
    };
    (ones as u8) << start_bit
}
964
/// Set bits `start..start + len` in the bitmap and return how many of them
/// were previously clear (i.e. newly set). Already-set bits are left alone
/// and not counted, so the caller can keep an exact running population count.
fn set_bits_and_count_new(bits: &mut [u8], start: u64, len: u64) -> u64 {
    let mut newly_set = 0u64;
    for idx in start..start + len {
        let slot = (idx / 8) as usize;
        let mask = 1u8 << (idx % 8);
        if bits[slot] & mask == 0 {
            bits[slot] |= mask;
            newly_set += 1;
        }
    }
    newly_set
}
983
/// Clear bits `start..start + len` in the bitmap and return how many of them
/// were previously set (i.e. actually removed). Mirror of
/// `set_bits_and_count_new`, for keeping an exact population count.
fn clear_bits_and_count_removed(bits: &mut [u8], start: u64, len: u64) -> u64 {
    let mut removed = 0u64;
    for idx in start..start + len {
        let slot = (idx / 8) as usize;
        let mask = 1u8 << (idx % 8);
        if bits[slot] & mask != 0 {
            bits[slot] &= !mask;
            removed += 1;
        }
    }
    removed
}
1002
/// Total population count across the whole bitmap.
fn count_set_bits(bits: &[u8]) -> u64 {
    bits.iter()
        .fold(0u64, |acc, byte| acc + u64::from(byte.count_ones()))
}
1006
/// In-memory cache file implementation useful for tests and embedded callers.
pub struct MemoryCacheOps {
    // The entire "file" as one growable byte buffer. The async mutex provides
    // the internal synchronization the `CacheOps` contract requires.
    state: async_lock::Mutex<Vec<u8>>,
}
1011
1012impl MemoryCacheOps {
1013    pub fn new() -> Self {
1014        Self {
1015            state: async_lock::Mutex::new(Vec::new()),
1016        }
1017    }
1018}
1019
1020impl Default for MemoryCacheOps {
1021    fn default() -> Self {
1022        Self::new()
1023    }
1024}
1025
1026#[async_trait]
1027impl CacheOps for MemoryCacheOps {
1028    async fn read_at(&self, offset: u64, out: &mut [u8]) -> GibbloxResult<usize> {
1029        if out.is_empty() {
1030            return Ok(0);
1031        }
1032        let guard = self.state.lock().await;
1033        let start = match usize::try_from(offset) {
1034            Ok(v) => v,
1035            Err(_) => return Ok(0),
1036        };
1037        if start >= guard.len() {
1038            return Ok(0);
1039        }
1040        let available = guard.len() - start;
1041        let copy_len = available.min(out.len());
1042        out[..copy_len].copy_from_slice(&guard[start..start + copy_len]);
1043        Ok(copy_len)
1044    }
1045
1046    async fn write_at(&self, offset: u64, data: &[u8]) -> GibbloxResult<()> {
1047        if data.is_empty() {
1048            return Ok(());
1049        }
1050        let mut guard = self.state.lock().await;
1051        let start = usize::try_from(offset).map_err(|_| {
1052            GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "offset exceeds memory cache")
1053        })?;
1054        let end = start.checked_add(data.len()).ok_or_else(|| {
1055            GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "write overflow")
1056        })?;
1057        if end > guard.len() {
1058            guard.resize(end, 0);
1059        }
1060        guard[start..end].copy_from_slice(data);
1061        Ok(())
1062    }
1063
1064    async fn set_len(&self, len: u64) -> GibbloxResult<()> {
1065        let len = usize::try_from(len).map_err(|_| {
1066            GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "length exceeds memory cache")
1067        })?;
1068        let mut guard = self.state.lock().await;
1069        guard.resize(len, 0);
1070        Ok(())
1071    }
1072
1073    async fn flush(&self) -> GibbloxResult<()> {
1074        Ok(())
1075    }
1076}