use std::sync::{Arc, mpsc};
use std::thread;
use anyhow::{Context, Result};
use camino::Utf8Path;
use fastcdc::v2020::{Chunk, FastCDC};
use ouroboros::self_referencing;
use crate::blob::{self, Blob};
use crate::file_util::{self, LoadedFile};
use crate::hashing::ObjectId;
/// Splits the file at `path` into content-defined chunks, yielded lazily as
/// [`Blob`]s.
///
/// # Errors
/// Returns an error (carrying the path as context) if the file can't be read.
pub fn chunk_file<P: AsRef<Utf8Path>>(path: P) -> Result<impl Iterator<Item = Blob>> {
    let path = path.as_ref();
    let loaded = file_util::read_file(path).with_context(|| format!("Couldn't read {path}"))?;
    Ok(ChunkIterator::new(loaded))
}
/// Builds a FastCDC (v2020) chunker over `src` with this module's size
/// bounds: 512 KiB minimum, 1 MiB target, 8 MiB maximum chunk size.
fn new_cdc(src: &[u8]) -> FastCDC {
    const KIB: u32 = 1024;
    FastCDC::new(src, 512 * KIB, 1024 * KIB, 8 * 1024 * KIB)
}
/// Iterator over a file's content-defined chunks; the variant picks the
/// chunking strategy based on how the file was loaded (see `ChunkIterator::new`).
enum ChunkIterator {
    // Buffered (in-memory) files: chunked inline on the calling thread.
    Simple(SmallFileChunker),
    // Memory-mapped files: chunked and hashed on background threads.
    Threaded(ThreadedChunker),
}
impl ChunkIterator {
    /// Selects a chunking strategy for `file`: buffered files are chunked
    /// inline, memory-mapped files via the threaded pipeline.
    fn new(file: Arc<LoadedFile>) -> Self {
        match &*file {
            LoadedFile::Buffered(_) => Self::Simple(SmallFileChunker::from(file)),
            LoadedFile::Mapped(_) => Self::Threaded(ThreadedChunker::from(file)),
        }
    }
}
impl Iterator for ChunkIterator {
    type Item = Blob;

    /// Delegates to whichever chunker variant is active.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            Self::Simple(chunker) => chunker.next(),
            Self::Threaded(ThreadedChunker(blobs)) => blobs.next(),
        }
    }
}
/// Chunker for files held entirely in memory (`LoadedFile::Buffered`).
///
/// The `FastCDC` chunker borrows the file's bytes, so storing both in one
/// struct makes it self-referential; ouroboros's `#[self_referencing]`
/// generates the builder and accessors that make that sound.
#[self_referencing]
struct SmallFileChunker {
    file: Arc<LoadedFile>,
    // `'this` borrows from `file` for the lifetime of the struct.
    #[borrows(file)]
    #[not_covariant]
    chunker: FastCDC<'this>,
}
impl SmallFileChunker {
    /// Builds a chunker over a buffered file.
    ///
    /// # Panics
    /// Panics if `file` is not `LoadedFile::Buffered`; mapped files go
    /// through `ThreadedChunker` instead (see `ChunkIterator::new`).
    fn from(file: Arc<LoadedFile>) -> Self {
        assert!(matches!(*file, LoadedFile::Buffered(_)));
        // `SmallFileChunkerBuilder` is generated by `#[self_referencing]`;
        // the closure builds the chunker from a borrow of the stored file.
        SmallFileChunkerBuilder {
            file,
            chunker_builder: |f: &Arc<LoadedFile>| new_cdc(f.bytes()),
        }
        .build()
    }
}
impl Iterator for SmallFileChunker {
    type Item = Blob;

    /// Yields the next chunk as a hashed blob, or `None` at end of file.
    fn next(&mut self) -> Option<Self::Item> {
        // `with_mut` is the ouroboros accessor for the borrowed fields.
        self.with_mut(|fields| {
            let cut = fields.chunker.next()?;
            Some(chunk_to_blob(fields.file.clone(), cut))
        })
    }
}
/// Chunker for memory-mapped files: a two-stage background pipeline whose
/// output blobs are consumed through the wrapped receiver iterator.
struct ThreadedChunker(mpsc::IntoIter<Blob>);

impl ThreadedChunker {
    /// Spawns the pipeline over a mapped file.
    ///
    /// # Panics
    /// Panics if `file` is not `LoadedFile::Mapped`.
    fn from(file: Arc<LoadedFile>) -> Self {
        assert!(matches!(*file, LoadedFile::Mapped(_)));

        // Stage 1 -> stage 2: chunk boundaries; stage 2 -> consumer: blobs.
        // Bounded channels keep the pipeline from racing ahead of the reader.
        let (cut_tx, cut_rx) = mpsc::sync_channel(128);
        let (blob_tx, blob_rx) = mpsc::sync_channel(128);

        // Stage 1: scan the file for content-defined cut points.
        let scan_file = file.clone();
        thread::spawn(move || {
            for cut in new_cdc(scan_file.bytes()) {
                // A send error means the consumer went away; stop quietly.
                if cut_tx.send(cut).is_err() {
                    return;
                }
            }
        });

        // Stage 2: hash each cut into a blob. Iterating the receiver ends
        // when stage 1 hangs up.
        thread::spawn(move || {
            for cut in cut_rx {
                if blob_tx.send(chunk_to_blob(file.clone(), cut)).is_err() {
                    return;
                }
            }
        });

        Self(blob_rx.into_iter())
    }
}
/// A byte range `[start, end)` within a loaded file.
///
/// Holds a shared handle to the file, so the underlying buffer or mapping
/// stays alive for as long as any span into it exists; clones are cheap
/// (an `Arc` bump plus two `usize`s).
#[derive(Debug, Clone)]
pub struct FileSpan {
    file: Arc<LoadedFile>,
    start: usize,
    end: usize,
}
impl AsRef<[u8]> for FileSpan {
    /// Borrows the span's `[start, end)` slice of the file's bytes.
    fn as_ref(&self) -> &[u8] {
        &self.file.bytes()[self.start..self.end]
    }
}
/// Wraps a FastCDC `chunk` of `file` as a hashed chunk blob.
fn chunk_to_blob(file: Arc<LoadedFile>, chunk: Chunk) -> Blob {
    let span = FileSpan {
        file,
        start: chunk.offset,
        end: chunk.offset + chunk.length,
    };
    // Hash the span's bytes before moving it into the blob.
    let id = ObjectId::hash(span.as_ref());
    Blob {
        id,
        kind: blob::Type::Chunk,
        contents: blob::Contents::Span(span),
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// End-to-end check against a committed reference file: it should
    /// produce exactly one chunk with a stable length and content hash.
    #[test]
    fn smoke() -> Result<()> {
        let chunked: Vec<_> = chunk_file("tests/references/sr71.txt")?.collect();
        assert_eq!(chunked.len(), 1);
        let chunked = &chunked[0];
        assert_eq!(chunked.bytes().len(), 6934);
        // `to_string()` goes through `Display`, same as `format!("{}", ..)`
        // but idiomatic (clippy: useless_format-style stringification).
        assert_eq!(
            chunked.id.to_string(),
            "3klf09rvhih97ev102hos4g0hq6cr2b0o74mvhthli7oq"
        );
        Ok(())
    }
}