use polars::io::mmap::MmapBytesReader;
use crate::core::v_latest::index::file_chunker::CHUNK_SIZE;
use crate::core::v_latest::index::file_chunker::ChunkShardManager;
use crate::error::OxenError;
use crate::model::LocalRepository;
use crate::model::merkle_tree::node::FileNode;
use std::io::Read;
use std::io::Seek;
pub struct ChunkReader {
pub repo: LocalRepository,
node: FileNode,
offset: u64,
csm: ChunkShardManager,
}
impl ChunkReader {
pub fn new(repo: LocalRepository, node: FileNode) -> Result<Self, OxenError> {
let csm = ChunkShardManager::new(&repo)?;
Ok(Self {
repo,
node,
offset: 0,
csm,
})
}
}
impl Read for ChunkReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
log::debug!(
"--START-- read {} from chunked file at offset {} / {}",
buf.len(),
self.offset,
self.node.num_bytes()
);
if self.offset >= self.node.num_bytes() {
log::debug!(
"Reached end of file at offset: {} >= {}",
self.offset,
self.node.num_bytes()
);
self.offset = 0;
return Ok(0);
}
let mut chunk_index = self.offset / CHUNK_SIZE as u64;
let mut chunk_offset = self.offset % CHUNK_SIZE as u64;
log::debug!("Chunk index: {chunk_index:?} offset {chunk_offset:?}");
log::debug!("Chunk hashes len {:?}", self.node.chunk_hashes().len());
let mut total_read = 0;
while total_read < buf.len() as u64 && chunk_index < self.node.chunk_hashes().len() as u64 {
log::debug!("-start- read {:?}/{}", total_read, buf.len());
log::debug!(
"chunk_index {}/{} chunk_offset {:?}",
chunk_index,
self.node.chunk_hashes().len(),
chunk_offset
);
let chunk_hash = self.node.chunk_hashes()[chunk_index as usize];
let chunk_data = self.csm.read_chunk(chunk_hash).unwrap();
let chunk_data_len = chunk_data.len() as u64;
log::debug!("Chunk file size {chunk_data_len:?}");
let bytes_to_copy =
std::cmp::min(buf.len() as u64 - total_read, chunk_data_len - chunk_offset);
log::debug!("Bytes to copy {bytes_to_copy:?}");
if bytes_to_copy == 0 {
break;
}
buf[total_read as usize..(total_read + bytes_to_copy) as usize].copy_from_slice(
&chunk_data[chunk_offset as usize..(chunk_offset + bytes_to_copy) as usize],
);
total_read += bytes_to_copy;
chunk_offset += bytes_to_copy;
if chunk_offset >= CHUNK_SIZE as u64 {
chunk_offset = 0;
chunk_index += 1;
}
self.offset += bytes_to_copy;
log::debug!("Total read {:?}/{}", total_read, buf.len());
log::debug!("-end- Offset {:?} / {}", self.offset, self.node.num_bytes());
}
log::debug!("--END-- Total read {total_read:?}");
Ok(total_read as usize)
}
}
impl Seek for ChunkReader {
fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
log::debug!("Seek in chunked file {:?} / {}", pos, self.node.num_bytes());
self.offset = match pos {
std::io::SeekFrom::Start(offset) => offset,
std::io::SeekFrom::Current(offset) => self.offset + offset as u64,
std::io::SeekFrom::End(offset) => (self.node.num_bytes() as i64 + offset) as u64,
};
log::debug!("New offset {:?}", self.offset);
Ok(self.offset)
}
}
impl MmapBytesReader for ChunkReader {
}