use std::{cmp::Ordering, io::prelude::*, rc::Rc, time::Instant};
use anyhow::{Context, Result, anyhow, ensure};
use mut_binary_heap::{BinaryHeap, FnComparator};
use rustc_hash::FxHashSet;
use tracing::*;
use crate::backend;
use crate::blob;
use crate::counters;
use crate::file_util;
use crate::hashing::{HashingReader, ObjectId};
use crate::index;
use crate::pack;
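
/// A zstd decoder over any reader. The `'static` lifetime means it borrows no dictionary.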
type ZstdDecoder<R> = zstd::stream::read::Decoder<'static, R>;
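
/// A decompressed chunk stamped with its last access time, for LRU eviction.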
struct TimestampedChunk {
stamp: Instant,
chunk: Rc<Vec<u8>>,
}
impl TimestampedChunk {
fn new(chunk: Vec<u8>) -> Self {
let stamp = Instant::now();
let chunk = Rc::new(chunk);
Self { stamp, chunk }
}
}
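
/// Orders chunks so the oldest access time compares greatest, turning the
/// max-heap into a min-heap on access time: `pop()` yields the LRU chunk.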
fn min_time(a: &TimestampedChunk, b: &TimestampedChunk) -> Ordering {
b.stamp.cmp(&a.stamp)
}
type ChunkOrder = fn(&TimestampedChunk, &TimestampedChunk) -> Ordering;
type ChunkComparator = FnComparator<ChunkOrder>;
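
/// An LRU cache of decompressed chunks, keyed by object ID.
///
/// A mutable binary heap lets us refresh a chunk's access time in place
/// whenever it's read.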
struct ChunkCache {
cache: BinaryHeap<ObjectId, TimestampedChunk, ChunkComparator>,
space_used: usize,
}
impl ChunkCache {
fn new() -> Self {
let cache: BinaryHeap<ObjectId, TimestampedChunk, ChunkComparator> =
BinaryHeap::new_by(min_time);
let space_used = 0;
Self { cache, space_used }
}
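
/// Gets the chunk with the given ID, refreshing its access time.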
fn get(&mut self, id: &ObjectId) -> Option<Rc<Vec<u8>>> {
if let Some(mut tb) = self.cache.get_mut(id) {
tb.stamp = Instant::now();
Some(tb.chunk.clone())
} else {
None
}
}
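
/// Inserts a chunk, or just refreshes its access time if it's already cached.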
fn insert(&mut self, id: &ObjectId, chunk: &[u8]) {
if let Some(mut tb) = self.cache.get_mut(id) {
tb.stamp = Instant::now();
return;
}
let new_entry = TimestampedChunk::new(Vec::from(chunk));
self.space_used += chunk.len();
self.cache.push(*id, new_entry);
}
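
/// Evicts least recently used chunks until at most `new_size` bytes remain.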
fn shrink_to(&mut self, new_size: usize) {
let mut num_evicted: usize = 0;
while !self.cache.is_empty() && self.space_used > new_size {
let popped = self.cache.pop().unwrap();
assert!(self.space_used >= popped.chunk.len());
self.space_used -= popped.chunk.len();
num_evicted += 1;
}
trace!(
"Evicted {num_evicted} chunks from cache; {} chunks ({}) remain",
self.cache.len(),
file_util::nice_size(self.space_used as u64)
);
}
}
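
/// Reads chunks from packs in a backend.
///
/// A cache miss decompresses and caches the whole pack that holds the
/// requested chunk, betting that its neighbors will be read soon after.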
pub struct ChunkReader<'a> {
cached_backend: &'a backend::CachedBackend,
index: &'a index::Index,
blob_map: &'a index::BlobMap,
cache: ChunkCache,
read_packs: FxHashSet<ObjectId>,
biggest_pack_size: usize,
}
impl<'a> ChunkReader<'a> {
pub fn new(
cached_backend: &'a backend::CachedBackend,
index: &'a index::Index,
blob_map: &'a index::BlobMap,
) -> Self {
let cache = ChunkCache::new();
Self {
cached_backend,
index,
blob_map,
cache,
read_packs: FxHashSet::default(),
biggest_pack_size: 0,
}
}
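
/// Looks up a blob's length from its pack's manifest without reading the pack.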
pub fn blob_size(&self, id: &ObjectId) -> Result<u32> {
let pack_id: ObjectId = *self
.blob_map
.get(id)
.ok_or_else(|| anyhow!("Chunk {id} not found in any pack"))?;
let manifest = self
.index
.packs
.get(&pack_id)
.ok_or_else(|| anyhow!("Couldn't find pack {pack_id} manifest in the index"))?;
let entry = manifest
.iter()
.find(|e| e.id == *id)
.ok_or_else(|| anyhow!("Chunk {id} isn't in pack {pack_id} like the index said"))?;
Ok(entry.length)
}
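
/// Reads the chunk with the given ID, loading its whole pack on a cache miss.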
pub fn read_blob(&mut self, id: &ObjectId) -> Result<Rc<Vec<u8>>> {
if let Some(b) = self.cache.get(id) {
counters::bump(counters::Op::ChunkCacheHit);
return Ok(b);
}
counters::bump(counters::Op::ChunkCacheMiss);
let pack_id: ObjectId = *self
.blob_map
.get(id)
.ok_or_else(|| anyhow!("Chunk {id} not found in any pack"))?;
trace!("Chunk cache miss; reading pack {pack_id}");
let loaded_size = self
.load_pack(pack_id)
.with_context(|| format!("Couldn't load pack {pack_id}"))?;
self.biggest_pack_size = self.biggest_pack_size.max(loaded_size);
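// Budget the cache at twice the largest pack seen so far, so chunks from
// roughly the last two packs stay resident.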
self.cache.shrink_to(self.biggest_pack_size * 2);
if !self.read_packs.insert(pack_id) {
counters::bump(counters::Op::PackRereads);
}
self.cache
.get(id)
.ok_or_else(|| anyhow!("Chunk {id} wasn't in pack {pack_id} after loading it"))
}
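
/// Decompresses every chunk in the given pack into the cache, checking each
/// blob's hash against its manifest entry. Returns the total bytes unpacked.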
fn load_pack(&mut self, id: ObjectId) -> Result<usize> {
let mut file = self.cached_backend.read_pack(&id)?;
pack::check_magic(&mut file)?;
let manifest = self
.index
.packs
.get(&id)
.ok_or_else(|| anyhow!("Couldn't find pack {} manifest in the index", id))?;
let mut blob_stream =
ZstdDecoder::new(file).context("Decompression of blob stream failed")?;
let mut bytes_read = 0;
let mut blob_buf = vec![];
for entry in manifest {
if entry.blob_type != blob::Type::Chunk {
warn!(
"Tree {} found in pack where we expected only chunks",
entry.id
);
// Skip past the blob's bytes so the stream stays aligned with the manifest.
std::io::copy(
&mut (&mut blob_stream).take(entry.length as u64),
&mut std::io::sink(),
)?;
continue;
}
blob_buf.clear();
blob_buf.reserve(entry.length as usize);
let mut hashing_decoder =
HashingReader::new((&mut blob_stream).take(entry.length as u64));
hashing_decoder.read_to_end(&mut blob_buf)?;
let (hash, _) = hashing_decoder.finalize();
ensure!(
entry.id == hash,
"Calculated hash of blob ({}) doesn't match ID {}",
hash,
entry.id
);
self.cache.insert(&entry.id, &blob_buf);
bytes_read += entry.length as usize;
}
Ok(bytes_read)
}
}
#[cfg(test)]
mod test {
use super::*;
use std::collections::BTreeSet;
use std::sync::mpsc::sync_channel;
use crate::blob;
use crate::chunk;
#[test]
fn smoke() -> Result<()> {
let backend = backend::in_memory();
let mut chunks = Vec::new();
chunks.extend(chunk::chunk_file("tests/references/sr71.txt")?);
chunks.extend(chunk::chunk_file("tests/references/index.stability")?);
chunks.extend(chunk::chunk_file("tests/references/pack.stability")?);
chunks.extend(chunk::chunk_file("tests/references/README.md")?);
assert_eq!(chunks.len(), 4);
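
// Zero-capacity channels rendezvous: each send blocks until the other side receives.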
let (chunk_tx, chunk_rx) = sync_channel(0);
let (pack_tx, pack_rx) = sync_channel(0);
let (upload_tx, upload_rx) = sync_channel(0);
let unused_byte_count = std::sync::atomic::AtomicU64::new(0);
let chunk_packer = std::thread::spawn(move || {
pack::pack(
pack::DEFAULT_PACK_SIZE,
chunk_rx,
pack_tx,
upload_tx,
&unused_byte_count,
&unused_byte_count,
)
});
let uploader = std::thread::spawn(move || -> Result<backend::CachedBackend> {
let mut num_packs = 0;
while let Ok((path, fh)) = upload_rx.recv() {
backend.write(&path, fh)?;
num_packs += 1;
}
assert_eq!(num_packs, 1);
Ok(backend)
});
for chunk in &chunks {
chunk_tx.send(chunk.clone())?;
}
drop(chunk_tx);
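
// Build a one-pack index from the metadata the packer handed back.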
let index = {
let metadata = pack_rx.recv()?;
let supersedes = BTreeSet::new();
let mut packs = index::PackMap::new();
packs.insert(metadata.id, metadata.manifest);
index::Index { packs, supersedes }
};
let blob_map = index::blob_to_pack_map(&index)?;
chunk_packer.join().unwrap()?;
let backend = uploader.join().unwrap()?;
let mut reader = ChunkReader::new(&backend, &index, &blob_map);
readback(&chunks[0], &mut reader)?;
readback(&chunks[0], &mut reader)?;
readback(&chunks[2], &mut reader)?;
readback(&chunks[1], &mut reader)?;
readback(&chunks[3], &mut reader)?;
Ok(())
}
fn readback(blob: &blob::Blob, reader: &mut ChunkReader) -> Result<()> {
let read_blob = reader
.read_blob(&blob.id)
.with_context(|| format!("Couldn't read {}", blob.id))?;
assert_eq!(*read_blob, blob.bytes());
Ok(())
}
}