1#![cfg_attr(not(feature = "std"), no_std)]
2
3extern crate alloc;
4
5use alloc::{boxed::Box, collections::BTreeMap, format, string::String, sync::Arc, vec, vec::Vec};
6use async_trait::async_trait;
7use core::fmt;
8use futures_channel::oneshot;
9use gibblox_core::{
10 BlockReader, BlockReaderConfigIdentity, GibbloxError, GibbloxErrorKind, GibbloxResult,
11 ReadContext, derive_block_identity_id, derive_config_identity_id,
12};
13use tracing::{debug, trace};
14
/// Magic bytes identifying a Gibblox cache file.
const CACHE_MAGIC: [u8; 7] = *b"GIBBLX!";
/// On-disk format version; bump whenever the header layout changes.
const CACHE_VERSION: u8 = 2;
/// Fixed header prefix size: magic (7) + version (1) + reserved (4)
/// + block_size (4) + total_blocks (8) + identity_len (4) = 28 bytes.
const CACHE_PREFIX_LEN: usize = 28;
/// Default number of newly cached blocks between durability flushes.
const DEFAULT_FLUSH_BLOCKS: u32 = 64;
/// Chunk size used when zero-filling regions of the cache file.
const ZERO_CHUNK_LEN: usize = 4096;
20
/// Snapshot of the cache's effectiveness counters.
#[derive(Clone, Copy, Debug)]
#[cfg_attr(target_arch = "wasm32", derive(serde::Serialize))]
pub struct CacheStats {
    /// Blocks served directly from the cache.
    pub total_hits: u64,
    /// Blocks that had to be fetched from the inner reader.
    pub total_misses: u64,
    /// Blocks currently marked valid in the bitmap.
    pub cached_blocks: u64,
    /// Total number of blocks in the underlying source.
    pub total_blocks: u64,
}
30
31impl CacheStats {
32 pub fn hit_rate(&self) -> f64 {
33 let total = self.total_hits + self.total_misses;
34 if total == 0 {
35 return 0.0;
36 }
37 (self.total_hits as f64 / total as f64) * 100.0
38 }
39
40 pub fn fill_rate(&self) -> f64 {
41 if self.total_blocks == 0 {
42 return 0.0;
43 }
44 (self.cached_blocks as f64 / self.total_blocks as f64) * 100.0
45 }
46}
47
/// Random-access backend storage for the cache file (e.g. a real file or an
/// in-memory buffer such as [`MemoryCacheOps`]).
#[async_trait]
pub trait CacheOps: Send + Sync {
    /// Read up to `out.len()` bytes at byte `offset`; returns the number of
    /// bytes actually read (0 signals end-of-data).
    async fn read_at(&self, offset: u64, out: &mut [u8]) -> GibbloxResult<usize>;

    /// Write all of `data` at byte `offset`.
    async fn write_at(&self, offset: u64, data: &[u8]) -> GibbloxResult<()>;

    /// Resize the backing store to exactly `len` bytes.
    async fn set_len(&self, len: u64) -> GibbloxResult<()>;

    /// Persist previously written data.
    async fn flush(&self) -> GibbloxResult<()>;
}
66
/// Forwarding impl so a shared handle (`Arc<T>`, including `Arc<dyn CacheOps>`)
/// can be used anywhere a `CacheOps` is expected.
#[async_trait]
impl<T> CacheOps for Arc<T>
where
    T: CacheOps + ?Sized,
{
    async fn read_at(&self, offset: u64, out: &mut [u8]) -> GibbloxResult<usize> {
        (**self).read_at(offset, out).await
    }

    async fn write_at(&self, offset: u64, data: &[u8]) -> GibbloxResult<()> {
        (**self).write_at(offset, data).await
    }

    async fn set_len(&self, len: u64) -> GibbloxResult<()> {
        (**self).set_len(len).await
    }

    async fn flush(&self) -> GibbloxResult<()> {
        (**self).flush().await
    }
}
88
/// A [`BlockReader`] decorator that persists blocks read from `inner` into a
/// cache file accessed through [`CacheOps`], serving repeat reads locally.
///
/// Cache file layout: fixed header prefix, identity string, validity bitmap
/// (one bit per block), then the block data region.
pub struct CachedBlockReader<S, C> {
    /// Underlying source of truth.
    inner: S,
    /// Backing storage for the cache file.
    cache: C,
    /// Block size in bytes (validated: non-zero power of two).
    block_size: u32,
    /// Total number of blocks exposed by `inner`.
    total_blocks: u64,
    /// Byte offset of the validity bitmap within the cache file.
    bitmap_offset: u64,
    /// Byte offset of the block data region within the cache file.
    data_offset: u64,
    /// Flush after this many newly cached blocks.
    flush_every_blocks: u32,
    /// In-memory bitmap and counters.
    state: async_lock::Mutex<CacheState>,
    /// Registry of blocks currently being fetched, with wakeup channels.
    in_flight: async_lock::Mutex<InFlight>,
    /// Serializes cache-file mutations (data + bitmap writes, flush).
    mutation_lock: async_lock::Mutex<()>,
}
109
110impl<S, C> CachedBlockReader<S, C>
111where
112 S: BlockReader,
113 C: CacheOps,
114{
    /// Create a cached reader with the default flush interval
    /// ([`DEFAULT_FLUSH_BLOCKS`] newly cached blocks between flushes).
    pub async fn new(inner: S, cache: C) -> GibbloxResult<Self> {
        Self::with_flush_block_limit(inner, cache, DEFAULT_FLUSH_BLOCKS).await
    }
119
120 pub async fn with_flush_block_limit(
126 inner: S,
127 cache: C,
128 flush_every_blocks: u32,
129 ) -> GibbloxResult<Self> {
130 if flush_every_blocks == 0 {
131 return Err(GibbloxError::with_message(
132 GibbloxErrorKind::InvalidInput,
133 "flush block limit must be non-zero",
134 ));
135 }
136 let block_size = inner.block_size();
137 if block_size == 0 || !block_size.is_power_of_two() {
138 return Err(GibbloxError::with_message(
139 GibbloxErrorKind::InvalidInput,
140 "block size must be non-zero power of two",
141 ));
142 }
143 let total_blocks = inner.total_blocks().await?;
144 let layout = CacheLayout::new(block_size, total_blocks)?;
145 let identity = cached_reader_identity_string(&inner);
146 let opened = open_or_initialize_cache(&cache, &layout, &identity).await?;
147 let valid_blocks = count_set_bits(&opened.valid);
148 debug!(
149 block_size,
150 total_blocks,
151 identity = %identity,
152 valid_blocks,
153 flush_every_blocks,
154 "cached block reader initialized"
155 );
156 trace!(
157 block_size,
158 total_blocks,
159 bitmap_offset = opened.mapping.bitmap_offset,
160 data_offset = opened.mapping.data_offset,
161 flush_every_blocks,
162 "cached block reader initialized"
163 );
164
165 Ok(Self {
166 inner,
167 cache,
168 block_size,
169 total_blocks,
170 bitmap_offset: opened.mapping.bitmap_offset,
171 data_offset: opened.mapping.data_offset,
172 flush_every_blocks,
173 state: async_lock::Mutex::new(CacheState {
174 valid: opened.valid,
175 blocks_since_flush: 0,
176 total_hits: 0,
177 total_misses: 0,
178 last_stats_log: 0,
179 cached_blocks: valid_blocks,
180 }),
181 in_flight: async_lock::Mutex::new(InFlight::new()),
182 mutation_lock: async_lock::Mutex::new(()),
183 })
184 }
185
    /// Force a flush of cache data and metadata regardless of how many
    /// blocks were written since the last flush.
    pub async fn flush_cache(&self) -> GibbloxResult<()> {
        // Hold the mutation lock so no data/bitmap write races the flush.
        let _guard = self.mutation_lock.lock().await;
        debug!("cache flush requested explicitly");
        self.flush_locked_if_needed(true).await
    }
192
    /// Block size as `usize` for buffer arithmetic.
    fn block_size_usize(&self) -> usize {
        self.block_size as usize
    }
196
197 fn validate_range(&self, lba: u64, blocks: u64) -> GibbloxResult<()> {
198 let end = lba.checked_add(blocks).ok_or_else(|| {
199 GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "lba overflow")
200 })?;
201 if end > self.total_blocks {
202 return Err(GibbloxError::with_message(
203 GibbloxErrorKind::OutOfRange,
204 "requested range exceeds total blocks",
205 ));
206 }
207 Ok(())
208 }
209
210 fn blocks_from_len(&self, len: usize) -> GibbloxResult<u64> {
211 if len == 0 {
212 return Ok(0);
213 }
214 if !len.is_multiple_of(self.block_size_usize()) {
215 return Err(GibbloxError::with_message(
216 GibbloxErrorKind::InvalidInput,
217 "buffer length must align to block size",
218 ));
219 }
220 Ok((len / self.block_size_usize()) as u64)
221 }
222
223 fn data_offset_for_block(&self, block_idx: u64) -> GibbloxResult<u64> {
224 let block_bytes = block_idx
225 .checked_mul(self.block_size as u64)
226 .ok_or_else(|| {
227 GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "block offset overflow")
228 })?;
229 self.data_offset.checked_add(block_bytes).ok_or_else(|| {
230 GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "data offset overflow")
231 })
232 }
233
    /// Partition `[lba, lba + blocks)` into maximal runs of consecutive
    /// blocks that share the same cached/uncached status, returned as
    /// `(start_block, len_blocks, cached)` triples.
    ///
    /// The bitmap is only sampled while the lock is held; validity can
    /// change afterwards, so callers must tolerate stale hits.
    async fn snapshot_segments(&self, lba: u64, blocks: u64) -> Vec<(u64, u64, bool)> {
        let guard = self.state.lock().await;
        let mut segments = Vec::new();
        let mut block = lba;
        while block < lba + blocks {
            let hit = bit_is_set(&guard.valid, block);
            // Extend the run while the next block has the same status.
            let mut len = 1u64;
            while block + len < lba + blocks && bit_is_set(&guard.valid, block + len) == hit {
                len += 1;
            }
            segments.push((block, len, hit));
            block += len;
        }
        segments
    }
249
250 async fn invalidate_range(&self, start_block: u64, blocks: u64) {
251 if blocks == 0 {
252 return;
253 }
254 let mut guard = self.state.lock().await;
255 let removed = clear_bits_and_count_removed(&mut guard.valid, start_block, blocks);
256 guard.cached_blocks = guard.cached_blocks.saturating_sub(removed);
257 }
258
259 pub async fn get_stats(&self) -> CacheStats {
261 let guard = self.state.lock().await;
262 CacheStats {
263 total_hits: guard.total_hits,
264 total_misses: guard.total_misses,
265 cached_blocks: guard.cached_blocks,
266 total_blocks: self.total_blocks,
267 }
268 }
269
    /// Copy every currently cached block of `[lba, lba + blocks)` into the
    /// matching position of `buf` and return the block indices that still
    /// need to be fetched from the inner source.
    ///
    /// A block whose bitmap bit is set but whose data cannot be fully read
    /// back (e.g. a truncated cache file) is invalidated and reported as
    /// missing instead of failing the read.
    async fn fill_from_cache(
        &self,
        lba: u64,
        blocks: u64,
        buf: &mut [u8],
    ) -> GibbloxResult<Vec<u64>> {
        let mut missing = Vec::new();
        let bs = self.block_size_usize();
        let segments = self.snapshot_segments(lba, blocks).await;

        for (start_block, len_blocks, hit) in segments {
            if !hit {
                missing.extend(start_block..start_block + len_blocks);
                continue;
            }

            // Map the block run onto the caller's buffer, guarding every
            // arithmetic step against overflow.
            let block_offset = (start_block - lba) as usize;
            let byte_start = block_offset.checked_mul(bs).ok_or_else(|| {
                GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "buffer offset overflow")
            })?;
            let byte_len = (len_blocks as usize).checked_mul(bs).ok_or_else(|| {
                GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "buffer length overflow")
            })?;
            let byte_end = byte_start.checked_add(byte_len).ok_or_else(|| {
                GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "buffer end overflow")
            })?;
            let out = &mut buf[byte_start..byte_end];

            let data_offset = self.data_offset_for_block(start_block)?;
            let full = read_exact_at(&self.cache, data_offset, out).await?;
            if !full {
                // Bitmap claimed the data but it was short: retract the
                // claim and let the caller re-fetch these blocks.
                self.invalidate_range(start_block, len_blocks).await;
                missing.extend(start_block..start_block + len_blocks);
            }
        }

        trace!(
            lba,
            blocks,
            missing = missing.len(),
            "cache read pass complete"
        );

        Ok(missing)
    }
315
    /// Register interest in `blocks`. Blocks nobody is currently fetching
    /// are claimed by this caller (first vec); blocks already being fetched
    /// by another task yield a receiver that resolves when that fetch
    /// finishes (second vec).
    async fn mark_in_flight(&self, blocks: &[u64]) -> (Vec<u64>, Vec<oneshot::Receiver<()>>) {
        let mut to_fetch = Vec::new();
        let mut waiters = Vec::new();
        let mut guard = self.in_flight.lock().await;
        for block in blocks {
            match guard.waiters.get_mut(block) {
                Some(pending) => {
                    // Another task owns this block; queue a wakeup channel.
                    let (tx, rx) = oneshot::channel();
                    pending.push(tx);
                    waiters.push(rx);
                }
                None => {
                    // Unclaimed: this caller becomes the fetcher.
                    guard.waiters.insert(*block, Vec::new());
                    to_fetch.push(*block);
                }
            }
        }
        trace!(
            requested = blocks.len(),
            to_fetch = to_fetch.len(),
            waiting = waiters.len(),
            "updated in-flight block registry"
        );
        (to_fetch, waiters)
    }
341
342 async fn take_waiters(&self, start_block: u64, len_blocks: u64) -> Vec<oneshot::Sender<()>> {
343 let mut guard = self.in_flight.lock().await;
344 let mut senders = Vec::new();
345 for block in start_block..(start_block + len_blocks) {
346 if let Some(mut pending) = guard.waiters.remove(&block) {
347 senders.append(&mut pending);
348 }
349 }
350 senders
351 }
352
353 fn coalesce(blocks: &[u64]) -> Vec<(u64, u64)> {
354 if blocks.is_empty() {
355 return Vec::new();
356 }
357 let mut ranges = Vec::new();
358 let mut start = blocks[0];
359 let mut len = 1u64;
360 for pair in blocks.windows(2) {
361 if let [prev, curr] = pair {
362 if *curr == *prev + 1 {
363 len += 1;
364 } else {
365 ranges.push((start, len));
366 start = *curr;
367 len = 1;
368 }
369 }
370 }
371 ranges.push((start, len));
372 ranges
373 }
374
    /// Mark `blocks` starting at `start_block` as valid in the in-memory
    /// bitmap and return `(file_offset, bytes, should_flush)`: the bitmap
    /// byte range covering the run (for writing back to the cache file) and
    /// whether enough blocks have accumulated to warrant a flush.
    async fn mark_valid_and_collect_bitmap_write(
        &self,
        start_block: u64,
        blocks: u64,
    ) -> GibbloxResult<(u64, Vec<u8>, bool)> {
        let end_block = start_block.checked_add(blocks).ok_or_else(|| {
            GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "block range overflow")
        })?;
        // Inclusive byte span of the bitmap touched by this run.
        let first_byte = (start_block / 8) as usize;
        let last_byte = ((end_block - 1) / 8) as usize;

        let mut guard = self.state.lock().await;
        let newly_cached = set_bits_and_count_new(&mut guard.valid, start_block, blocks);
        guard.cached_blocks = guard.cached_blocks.saturating_add(newly_cached);
        // NOTE(review): `blocks as u32` truncates runs longer than u32::MAX
        // blocks; harmless for realistic sizes but worth confirming.
        guard.blocks_since_flush = guard.blocks_since_flush.saturating_add(blocks as u32);
        let should_flush = guard.blocks_since_flush >= self.flush_every_blocks;
        let chunk = guard.valid[first_byte..=last_byte].to_vec();
        let bitmap_offset = self.bitmap_offset + first_byte as u64;
        Ok((bitmap_offset, chunk, should_flush))
    }
395
    /// Flush the cache backend if `force` is set or the unflushed-block
    /// counter has reached the configured limit, then reset the counter.
    ///
    /// Both call sites hold `mutation_lock`, so the counter reset cannot
    /// race a concurrent populate.
    async fn flush_locked_if_needed(&self, force: bool) -> GibbloxResult<()> {
        // Sample the counters in a short-lived lock scope; the state lock is
        // not held across the (potentially slow) flush itself.
        let (should_flush, blocks_written, valid_blocks) = {
            let guard = self.state.lock().await;
            let should = force || guard.blocks_since_flush >= self.flush_every_blocks;
            (should, guard.blocks_since_flush, guard.cached_blocks)
        };
        if !should_flush {
            return Ok(());
        }

        debug!(
            force,
            blocks_since_last_flush = blocks_written,
            valid_blocks,
            "flushing cache data and metadata"
        );

        self.cache.flush().await?;

        let mut guard = self.state.lock().await;
        guard.blocks_since_flush = 0;
        debug!("cache flush completed successfully");
        Ok(())
    }
420
421 async fn fetch_and_populate(
422 &self,
423 ranges: &[(u64, u64)],
424 ctx: ReadContext,
425 ) -> GibbloxResult<()> {
426 let bs = self.block_size_usize();
427 for (start_block, len_blocks) in ranges {
428 trace!(
429 start_block,
430 len_blocks, "fetching missing range from inner source"
431 );
432 let expected_bytes = (*len_blocks as usize).checked_mul(bs).ok_or_else(|| {
433 GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "range too large")
434 })?;
435 let mut buf = vec![0u8; expected_bytes];
436 let result = async {
437 let read = self.inner.read_blocks(*start_block, &mut buf, ctx).await?;
438 if read != expected_bytes {
439 return Err(GibbloxError::with_message(
440 GibbloxErrorKind::Io,
441 "inner source returned short read",
442 ));
443 }
444
445 let _mutation_guard = self.mutation_lock.lock().await;
446
447 let data_offset = self.data_offset_for_block(*start_block)?;
448 self.cache.write_at(data_offset, &buf).await?;
449
450 let (bitmap_offset, bitmap_chunk, should_flush) = self
451 .mark_valid_and_collect_bitmap_write(*start_block, *len_blocks)
452 .await?;
453 self.cache.write_at(bitmap_offset, &bitmap_chunk).await?;
454 trace!(
455 start_block,
456 len_blocks,
457 bytes = expected_bytes,
458 should_flush,
459 "cache range populated"
460 );
461 if should_flush {
462 self.flush_locked_if_needed(false).await?;
463 }
464
465 Ok(())
466 }
467 .await;
468
469 let senders = self.take_waiters(*start_block, *len_blocks).await;
470 for sender in senders {
471 let _ = sender.send(());
472 }
473 result?;
474 }
475 Ok(())
476 }
477
    /// Pre-populate the cache for `[lba, lba + blocks)` without copying any
    /// data to the caller (prefetch).
    ///
    /// Blocks not yet cached are fetched from the inner reader; blocks
    /// already being fetched by another task are awaited rather than fetched
    /// twice.
    pub async fn ensure_cached(
        &self,
        lba: u64,
        blocks: u64,
        ctx: ReadContext,
    ) -> GibbloxResult<()> {
        if blocks == 0 {
            return Ok(());
        }
        self.validate_range(lba, blocks)?;

        let missing = self.missing_blocks_from_bitmap(lba, blocks).await;
        if missing.is_empty() {
            return Ok(());
        }

        let (to_fetch, waiters) = self.mark_in_flight(&missing).await;

        if !to_fetch.is_empty() {
            self.fetch_and_populate(&Self::coalesce(&to_fetch), ctx)
                .await?;
        }

        // Wait for fetches owned by other tasks to finish (or fail).
        for waiter in waiters {
            let _ = waiter.await;
        }

        Ok(())
    }
520
521 async fn missing_blocks_from_bitmap(&self, lba: u64, blocks: u64) -> Vec<u64> {
522 let mut missing = Vec::new();
523 let guard = self.state.lock().await;
524 for block in lba..(lba + blocks) {
525 if !bit_is_set(&guard.valid, block) {
526 missing.push(block);
527 }
528 }
529 missing
530 }
531}
532
#[async_trait]
impl<S, C> BlockReader for CachedBlockReader<S, C>
where
    S: BlockReader,
    C: CacheOps,
{
    fn block_size(&self) -> u32 {
        self.block_size
    }

    async fn total_blocks(&self) -> GibbloxResult<u64> {
        Ok(self.total_blocks)
    }

    fn write_identity(&self, out: &mut dyn fmt::Write) -> fmt::Result {
        // Identity wraps the inner reader's identity, tying cache files to
        // their source.
        write_cached_identity(&self.inner, out)
    }

    /// Read whole blocks starting at `lba`, serving from the cache where
    /// possible and fetching misses from the inner reader (populating the
    /// cache as a side effect). `buf.len()` must be a multiple of the block
    /// size; returns `buf.len()` on success.
    async fn read_blocks(
        &self,
        lba: u64,
        buf: &mut [u8],
        ctx: ReadContext,
    ) -> GibbloxResult<usize> {
        let blocks = self.blocks_from_len(buf.len())?;
        if blocks == 0 {
            return Ok(0);
        }
        self.validate_range(lba, blocks)?;

        // First pass: copy whatever the cache already holds.
        let missing = self.fill_from_cache(lba, blocks, buf).await?;

        {
            // Update hit/miss counters; log statistics roughly every 1000
            // block reads.
            let mut guard = self.state.lock().await;
            if missing.is_empty() {
                guard.total_hits += blocks;
            } else {
                guard.total_hits += blocks - missing.len() as u64;
                guard.total_misses += missing.len() as u64;
            }
            let total_reads = guard.total_hits + guard.total_misses;
            if total_reads > 0 && (total_reads - guard.last_stats_log) >= 1000 {
                guard.last_stats_log = total_reads;
                let hit_rate = (guard.total_hits as f64 / total_reads as f64) * 100.0;
                debug!(
                    total_hits = guard.total_hits,
                    total_misses = guard.total_misses,
                    hit_rate = format!("{:.1}%", hit_rate),
                    "cache statistics"
                );
            }
        }

        if missing.is_empty() {
            trace!(lba, blocks, "cache hit");
            return Ok(buf.len());
        }

        trace!(lba, blocks, missing = missing.len(), "cache miss");

        // Claim the un-fetched blocks; wait on the ones other tasks own.
        let (to_fetch, waiters) = self.mark_in_flight(&missing).await;
        if !to_fetch.is_empty() {
            self.fetch_and_populate(&Self::coalesce(&to_fetch), ctx)
                .await?;
        }
        for waiter in waiters {
            let _ = waiter.await;
        }

        // Second pass: every previously missing block should now be cached.
        let final_missing = self.fill_from_cache(lba, blocks, buf).await?;
        if !final_missing.is_empty() {
            return Err(GibbloxError::with_message(
                GibbloxErrorKind::Io,
                "cache fetch did not populate all blocks",
            ));
        }
        Ok(buf.len())
    }
}
613
614fn write_cached_identity<S: BlockReader + ?Sized>(
615 inner: &S,
616 out: &mut dyn fmt::Write,
617) -> fmt::Result {
618 out.write_str("cached:(")?;
619 inner.write_identity(out)?;
620 out.write_str(")")
621}
622
623fn write_cached_config_identity<C: BlockReaderConfigIdentity + ?Sized>(
624 config: &C,
625 out: &mut dyn fmt::Write,
626) -> fmt::Result {
627 out.write_str("cached:(")?;
628 config.write_identity(out)?;
629 out.write_str(")")
630}
631
/// Render the `cached:(…)` identity string for a reader.
pub fn cached_reader_identity_string<S: BlockReader + ?Sized>(inner: &S) -> String {
    let mut identity = String::new();
    // Writing into a String cannot fail, so the result is ignored.
    let _ = write_cached_identity(inner, &mut identity);
    identity
}
637
/// Render the `cached:(…)` identity string for a reader configuration.
pub fn cached_config_identity_string<C: BlockReaderConfigIdentity + ?Sized>(config: &C) -> String {
    let mut identity = String::new();
    // Writing into a String cannot fail, so the result is ignored.
    let _ = write_cached_config_identity(config, &mut identity);
    identity
}
643
/// Derive the numeric identity id for a cached reader, using the
/// `cached:(…)`-wrapped identity so it differs from the inner reader's id.
pub fn derive_cached_reader_identity_id<S: BlockReader + ?Sized>(
    inner: &S,
    total_blocks: u64,
) -> u32 {
    derive_block_identity_id(inner.block_size(), total_blocks, |writer| {
        write_cached_identity(inner, writer)
    })
}
652
/// Derive the numeric config-identity id for a cached reader configuration.
///
/// A local adapter makes `derive_config_identity_id` see the
/// `cached:(…)`-wrapped identity instead of the raw config identity.
pub fn derive_cached_config_identity_id<C: BlockReaderConfigIdentity + ?Sized>(config: &C) -> u32 {
    /// Adapter that prefixes the wrapped config's identity.
    struct CachedConfigIdentity<'a, C: BlockReaderConfigIdentity + ?Sized>(&'a C);

    impl<C: BlockReaderConfigIdentity + ?Sized> BlockReaderConfigIdentity
        for CachedConfigIdentity<'_, C>
    {
        fn write_identity(&self, out: &mut dyn fmt::Write) -> fmt::Result {
            write_cached_config_identity(self.0, out)
        }
    }

    derive_config_identity_id(&CachedConfigIdentity(config))
}
666
/// Geometry of the cache file derived from the source's block parameters.
struct CacheLayout {
    /// Block size in bytes (validated: non-zero power of two).
    block_size: u32,
    /// Number of blocks in the source.
    total_blocks: u64,
    /// Size of the validity bitmap in bytes (one bit per block).
    bitmap_bytes: u64,
    /// Size of the data region in bytes (`total_blocks * block_size`).
    data_bytes: u64,
}
673
impl CacheLayout {
    /// Validate the block parameters and compute the region sizes.
    ///
    /// # Errors
    /// `InvalidInput` for a block size that is zero or not a power of two;
    /// `OutOfRange` when the bitmap cannot fit in `usize` or the data
    /// region size overflows `u64`.
    fn new(block_size: u32, total_blocks: u64) -> GibbloxResult<Self> {
        if block_size == 0 || !block_size.is_power_of_two() {
            return Err(GibbloxError::with_message(
                GibbloxErrorKind::InvalidInput,
                "block size must be non-zero power of two",
            ));
        }
        // One bit per block, rounded up to whole bytes.
        let bitmap_bytes = total_blocks.div_ceil(8);
        if bitmap_bytes > usize::MAX as u64 {
            return Err(GibbloxError::with_message(
                GibbloxErrorKind::OutOfRange,
                "bitmap exceeds addressable memory",
            ));
        }
        let data_bytes = total_blocks.checked_mul(block_size as u64).ok_or_else(|| {
            GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "cache too large")
        })?;

        Ok(Self {
            block_size,
            total_blocks,
            bitmap_bytes,
            data_bytes,
        })
    }

    /// Bitmap size as `usize`; the cast is safe because `new` rejected
    /// values above `usize::MAX`.
    fn bitmap_len_usize(&self) -> usize {
        self.bitmap_bytes as usize
    }
}
705
/// Absolute byte offsets of the cache file regions for a given layout and
/// identity length.
struct CacheMapping {
    /// Start of the validity bitmap (after prefix + identity string).
    bitmap_offset: u64,
    /// Start of the block data region (after the bitmap).
    data_offset: u64,
    /// Total cache file length in bytes.
    total_len: u64,
}
711
impl CacheMapping {
    /// Lay out prefix → identity → bitmap → data and compute each region's
    /// offset, rejecting any arithmetic overflow along the way.
    fn new(layout: &CacheLayout, identity_len: usize) -> GibbloxResult<Self> {
        // The header stores the identity length as u32; larger values could
        // not round-trip.
        if identity_len > u32::MAX as usize {
            return Err(GibbloxError::with_message(
                GibbloxErrorKind::InvalidInput,
                "identity too large",
            ));
        }
        let bitmap_offset = (CACHE_PREFIX_LEN as u64)
            .checked_add(identity_len as u64)
            .ok_or_else(|| {
                GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "metadata overflow")
            })?;
        let data_offset = bitmap_offset
            .checked_add(layout.bitmap_bytes)
            .ok_or_else(|| {
                GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "bitmap overflow")
            })?;
        let total_len = data_offset.checked_add(layout.data_bytes).ok_or_else(|| {
            GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "cache length overflow")
        })?;
        Ok(Self {
            bitmap_offset,
            data_offset,
            total_len,
        })
    }
}
740
/// Decoded form of the fixed-size cache file header prefix.
struct CacheHeader {
    /// Block size recorded when the cache was created.
    block_size: u32,
    /// Total block count recorded when the cache was created.
    total_blocks: u64,
    /// Byte length of the identity string that follows the prefix.
    identity_len: u32,
}
746
747impl CacheHeader {
748 fn encode_prefix(&self) -> [u8; CACHE_PREFIX_LEN] {
749 let mut out = [0u8; CACHE_PREFIX_LEN];
750 out[0..7].copy_from_slice(&CACHE_MAGIC);
751 out[7] = CACHE_VERSION;
752 out[8] = 0; out[9] = 0; out[10..12].copy_from_slice(&0u16.to_le_bytes()); out[12..16].copy_from_slice(&self.block_size.to_le_bytes());
756 out[16..24].copy_from_slice(&self.total_blocks.to_le_bytes());
757 out[24..28].copy_from_slice(&self.identity_len.to_le_bytes());
758 out
759 }
760
761 fn decode_prefix(prefix: &[u8; CACHE_PREFIX_LEN]) -> Option<Self> {
762 if prefix[0..7] != CACHE_MAGIC {
763 return None;
764 }
765 if prefix[7] != CACHE_VERSION {
766 return None;
767 }
768 let block_size = u32::from_le_bytes([prefix[12], prefix[13], prefix[14], prefix[15]]);
769 let total_blocks = u64::from_le_bytes([
770 prefix[16], prefix[17], prefix[18], prefix[19], prefix[20], prefix[21], prefix[22],
771 prefix[23],
772 ]);
773 let identity_len = u32::from_le_bytes([prefix[24], prefix[25], prefix[26], prefix[27]]);
774 Some(Self {
775 block_size,
776 total_blocks,
777 identity_len,
778 })
779 }
780}
781
/// Result of opening (or initializing) the cache file: resolved offsets plus
/// the current validity bitmap.
struct OpenedCache {
    mapping: CacheMapping,
    valid: Vec<u8>,
}
786
787async fn open_or_initialize_cache<C: CacheOps>(
788 cache: &C,
789 layout: &CacheLayout,
790 identity: &str,
791) -> GibbloxResult<OpenedCache> {
792 debug!(
793 block_size = layout.block_size,
794 total_blocks = layout.total_blocks,
795 identity = %identity,
796 "opening cache state"
797 );
798 let mut prefix = [0u8; CACHE_PREFIX_LEN];
799 let have_prefix = read_exact_at(cache, 0, &mut prefix).await?;
800 if !have_prefix {
801 debug!("cache prefix missing; initializing new cache file");
802 return initialize_cache(cache, layout, identity).await;
803 }
804
805 let Some(header) = CacheHeader::decode_prefix(&prefix) else {
806 debug!("cache header invalid; reinitializing cache file");
807 return initialize_cache(cache, layout, identity).await;
808 };
809 if header.block_size != layout.block_size || header.total_blocks != layout.total_blocks {
810 debug!(
811 stored_block_size = header.block_size,
812 stored_total_blocks = header.total_blocks,
813 expected_block_size = layout.block_size,
814 expected_total_blocks = layout.total_blocks,
815 "cache geometry mismatch; reinitializing cache file"
816 );
817 return initialize_cache(cache, layout, identity).await;
818 }
819
820 let identity_len = header.identity_len as usize;
821 let mapping = CacheMapping::new(layout, identity_len)?;
822 let mut stored_identity = vec![0u8; identity_len];
823 if !read_exact_at(cache, CACHE_PREFIX_LEN as u64, &mut stored_identity).await? {
824 debug!("cache identity bytes missing; reinitializing cache file");
825 return initialize_cache(cache, layout, identity).await;
826 }
827 let stored_identity_str = String::from_utf8_lossy(&stored_identity);
828 if stored_identity.as_slice() != identity.as_bytes() {
829 debug!(
830 stored_identity = %stored_identity_str,
831 expected_identity = %identity,
832 "cache identity mismatch; reinitializing cache file"
833 );
834 return initialize_cache(cache, layout, identity).await;
835 }
836
837 cache.set_len(mapping.total_len).await?;
838
839 let mut valid = vec![0u8; layout.bitmap_len_usize()];
840 if !read_exact_at(cache, mapping.bitmap_offset, &mut valid).await? {
841 debug!("cache bitmap missing; reinitializing cache file");
842 return initialize_cache(cache, layout, identity).await;
843 }
844
845 let valid_blocks = valid.iter().map(|b| b.count_ones()).sum::<u32>();
846 debug!(valid_blocks, "cache opened successfully with existing data");
847
848 Ok(OpenedCache { mapping, valid })
849}
850
/// (Re)create the cache file from scratch: truncate, size it to the full
/// layout, write the header and identity, zero the bitmap, and flush.
async fn initialize_cache<C: CacheOps>(
    cache: &C,
    layout: &CacheLayout,
    identity: &str,
) -> GibbloxResult<OpenedCache> {
    let identity_len = identity.len();
    let mapping = CacheMapping::new(layout, identity_len)?;

    // Reset the backing store: truncate, then allocate the full cache size.
    cache.set_len(0).await?;
    cache.set_len(mapping.total_len).await?;

    let header = CacheHeader {
        block_size: layout.block_size,
        total_blocks: layout.total_blocks,
        identity_len: identity_len as u32,
    }
    .encode_prefix();

    cache.write_at(0, &header).await?;
    cache
        .write_at(CACHE_PREFIX_LEN as u64, identity.as_bytes())
        .await?;
    // All-zero bitmap: nothing is cached yet, so the data region itself
    // needs no clearing — invalid blocks are never read.
    write_zero_region(cache, mapping.bitmap_offset, layout.bitmap_bytes).await?;
    cache.flush().await?;

    debug!(
        block_size = layout.block_size,
        total_blocks = layout.total_blocks,
        identity = %identity,
        identity_len,
        "cache file initialized with clean state"
    );

    Ok(OpenedCache {
        mapping,
        valid: vec![0u8; layout.bitmap_len_usize()],
    })
}
889
890async fn read_exact_at<C: CacheOps>(
891 cache: &C,
892 mut offset: u64,
893 out: &mut [u8],
894) -> GibbloxResult<bool> {
895 let mut filled = 0usize;
896 while filled < out.len() {
897 let read = cache.read_at(offset, &mut out[filled..]).await?;
898 if read == 0 {
899 return Ok(false);
900 }
901 filled = filled.checked_add(read).ok_or_else(|| {
902 GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "read size overflow")
903 })?;
904 offset = offset.checked_add(read as u64).ok_or_else(|| {
905 GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "read offset overflow")
906 })?;
907 }
908 Ok(true)
909}
910
911async fn write_zero_region<C: CacheOps>(cache: &C, mut offset: u64, len: u64) -> GibbloxResult<()> {
912 if len == 0 {
913 return Ok(());
914 }
915 let mut remaining = len;
916 let zero = [0u8; ZERO_CHUNK_LEN];
917 while remaining > 0 {
918 let write_len = remaining.min(ZERO_CHUNK_LEN as u64) as usize;
919 cache.write_at(offset, &zero[..write_len]).await?;
920 offset = offset.checked_add(write_len as u64).ok_or_else(|| {
921 GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "zero-fill offset overflow")
922 })?;
923 remaining -= write_len as u64;
924 }
925 Ok(())
926}
927
/// Registry of blocks currently being fetched from the inner reader.
///
/// A key present in `waiters` means some task owns the fetch for that block;
/// the senders wake tasks that arrived while the fetch was running.
struct InFlight {
    waiters: BTreeMap<u64, Vec<oneshot::Sender<()>>>,
}

impl InFlight {
    /// Empty registry: no fetches in progress.
    fn new() -> Self {
        Self {
            waiters: BTreeMap::new(),
        }
    }
}
939
/// Mutable cache bookkeeping guarded by `CachedBlockReader::state`.
struct CacheState {
    /// Validity bitmap: bit `i` set ⇔ block `i` is present in the cache.
    valid: Vec<u8>,
    /// Newly cached blocks since the last flush.
    blocks_since_flush: u32,
    /// Lifetime count of blocks served from cache.
    total_hits: u64,
    /// Lifetime count of blocks fetched from the inner reader.
    total_misses: u64,
    /// Total reads at the last periodic statistics log line.
    last_stats_log: u64,
    /// Number of bits currently set in `valid`.
    cached_blocks: u64,
}
948
/// Return whether bit `idx` is set (LSB-first within each byte).
fn bit_is_set(bits: &[u8], idx: u64) -> bool {
    let byte = bits[(idx / 8) as usize];
    (byte >> (idx % 8)) & 1 != 0
}
954
/// Build a byte mask with bits `start_bit..=end_bit` set (both in 0..=7).
/// Computed in u16 so a full-byte width does not overflow the shift.
fn bit_mask(start_bit: u8, end_bit: u8) -> u8 {
    let width = u16::from(end_bit) - u16::from(start_bit) + 1;
    let ones = if width >= 8 {
        0xFFu16
    } else {
        (1u16 << width) - 1
    };
    (ones as u8) << start_bit
}
964
/// Set bits `start..start + len` (LSB-first) and return how many were
/// previously clear. Works one byte at a time; the mask helper is inlined.
fn set_bits_and_count_new(bits: &mut [u8], start: u64, len: u64) -> u64 {
    let end = start + len;
    let mut cursor = start;
    let mut newly_set = 0u64;
    while cursor < end {
        let byte_idx = (cursor / 8) as usize;
        let first_bit = (cursor % 8) as u8;
        // Number of bits of this byte covered by the range.
        let span = (end - cursor).min(8 - u64::from(first_bit)) as u8;
        let ones = if span >= 8 {
            0xFFu8
        } else {
            ((1u16 << span) - 1) as u8
        };
        let mask = ones << first_bit;
        let before = bits[byte_idx];
        let after = before | mask;
        bits[byte_idx] = after;
        newly_set += u64::from(after.count_ones() - before.count_ones());
        cursor += u64::from(span);
    }
    newly_set
}
983
/// Clear bits `start..start + len` (LSB-first) and return how many were
/// previously set. Works one byte at a time; the mask helper is inlined.
fn clear_bits_and_count_removed(bits: &mut [u8], start: u64, len: u64) -> u64 {
    let end = start + len;
    let mut cursor = start;
    let mut removed = 0u64;
    while cursor < end {
        let byte_idx = (cursor / 8) as usize;
        let first_bit = (cursor % 8) as u8;
        // Number of bits of this byte covered by the range.
        let span = (end - cursor).min(8 - u64::from(first_bit)) as u8;
        let ones = if span >= 8 {
            0xFFu8
        } else {
            ((1u16 << span) - 1) as u8
        };
        let mask = ones << first_bit;
        let before = bits[byte_idx];
        let after = before & !mask;
        bits[byte_idx] = after;
        removed += u64::from(before.count_ones() - after.count_ones());
        cursor += u64::from(span);
    }
    removed
}
1002
/// Total number of set bits across the slice.
fn count_set_bits(bits: &[u8]) -> u64 {
    bits.iter()
        .fold(0u64, |acc, byte| acc + u64::from(byte.count_ones()))
}
1006
/// In-memory [`CacheOps`] backend: the whole "cache file" lives in a
/// `Vec<u8>` where the index equals the byte offset.
pub struct MemoryCacheOps {
    state: async_lock::Mutex<Vec<u8>>,
}
1011
impl MemoryCacheOps {
    /// Create an empty in-memory cache.
    pub fn new() -> Self {
        Self {
            state: async_lock::Mutex::new(Vec::new()),
        }
    }
}

impl Default for MemoryCacheOps {
    fn default() -> Self {
        Self::new()
    }
}
1025
1026#[async_trait]
1027impl CacheOps for MemoryCacheOps {
1028 async fn read_at(&self, offset: u64, out: &mut [u8]) -> GibbloxResult<usize> {
1029 if out.is_empty() {
1030 return Ok(0);
1031 }
1032 let guard = self.state.lock().await;
1033 let start = match usize::try_from(offset) {
1034 Ok(v) => v,
1035 Err(_) => return Ok(0),
1036 };
1037 if start >= guard.len() {
1038 return Ok(0);
1039 }
1040 let available = guard.len() - start;
1041 let copy_len = available.min(out.len());
1042 out[..copy_len].copy_from_slice(&guard[start..start + copy_len]);
1043 Ok(copy_len)
1044 }
1045
1046 async fn write_at(&self, offset: u64, data: &[u8]) -> GibbloxResult<()> {
1047 if data.is_empty() {
1048 return Ok(());
1049 }
1050 let mut guard = self.state.lock().await;
1051 let start = usize::try_from(offset).map_err(|_| {
1052 GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "offset exceeds memory cache")
1053 })?;
1054 let end = start.checked_add(data.len()).ok_or_else(|| {
1055 GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "write overflow")
1056 })?;
1057 if end > guard.len() {
1058 guard.resize(end, 0);
1059 }
1060 guard[start..end].copy_from_slice(data);
1061 Ok(())
1062 }
1063
1064 async fn set_len(&self, len: u64) -> GibbloxResult<()> {
1065 let len = usize::try_from(len).map_err(|_| {
1066 GibbloxError::with_message(GibbloxErrorKind::OutOfRange, "length exceeds memory cache")
1067 })?;
1068 let mut guard = self.state.lock().await;
1069 guard.resize(len, 0);
1070 Ok(())
1071 }
1072
1073 async fn flush(&self) -> GibbloxResult<()> {
1074 Ok(())
1075 }
1076}