use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
use std::path::{Path, PathBuf};
use rocksdb::DBWithThreadMode;
use rocksdb::SingleThreaded;
use crate::constants::CHUNKS_DIR;
use crate::constants::TREE_DIR;
use crate::core::db;
use crate::core::db::key_val::u128_kv_db;
use crate::error::OxenError;
use crate::model::LocalRepository;
use crate::util;
/// Size of one chunk in bytes (16 KiB).
pub const CHUNK_SIZE: usize = 16 * 1024;
/// Size in bytes of a shard's in-memory data buffer (1000 chunks' worth); also the
/// limit that `ChunkShardFile::has_capacity` checks against.
const SHARD_CAPACITY: usize = 1000 * CHUNK_SIZE;
/// Key-value store, keyed by 128-bit chunk hash, whose values are the `u32`
/// shard index written by `put`.
pub struct ChunkShardDB {
// Single-threaded RocksDB handle opened in `ChunkShardDB::new`.
db: DBWithThreadMode<SingleThreaded>,
}
impl ChunkShardDB {
    /// Directory of the hash -> shard lookup database inside the repository's
    /// hidden directory (`<hidden>/TREE_DIR/CHUNKS_DIR`).
    fn db_path(repo: &LocalRepository) -> PathBuf {
        let tree_dir = util::fs::oxen_hidden_dir(&repo.path).join(TREE_DIR);
        tree_dir.join(Path::new(CHUNKS_DIR))
    }

    /// Opens the lookup database for `repo` with the project's default options.
    ///
    /// # Errors
    /// Propagates any failure reported while opening the database.
    pub fn new(repo: &LocalRepository) -> Result<Self, OxenError> {
        let db_dir = Self::db_path(repo);
        let db = DBWithThreadMode::open(
            &db::key_val::opts::default(),
            dunce::simplified(&db_dir),
        )?;
        Ok(Self { db })
    }

    /// Reports whether `hash` is present, as answered by `u128_kv_db::has_key`.
    pub fn has_key(&self, hash: u128) -> bool {
        u128_kv_db::has_key(&self.db, hash)
    }

    /// Fetches the value stored under `hash` through `u128_kv_db::get`.
    ///
    /// `None` presumably means the hash has no entry (verify against
    /// `u128_kv_db::get`).
    pub fn get(&self, hash: u128) -> Result<Option<u32>, OxenError> {
        u128_kv_db::get(&self.db, hash)
    }

    /// Stores `shard_idx` under `hash`, encoded as four little-endian bytes.
    pub fn put(&self, hash: u128, shard_idx: u32) -> Result<(), OxenError> {
        u128_kv_db::put_buf(&self.db, hash, &shard_idx.to_le_bytes())?;
        Ok(())
    }
}
/// Serializable lookup table describing where each chunk sits inside one shard.
#[derive(Serialize, Deserialize)]
pub struct ChunkShardIndex {
// chunk hash -> (byte offset within the shard's data section, chunk length in bytes),
// as inserted by `ChunkShardFile::add_buffer`.
pub hash_offsets: HashMap<u128, (u32, u32)>,
}
impl Default for ChunkShardIndex {
fn default() -> Self {
Self::new()
}
}
impl ChunkShardIndex {
    /// Creates an index with an empty hash -> (offset, length) table.
    pub fn new() -> Self {
        let hash_offsets = HashMap::new();
        Self { hash_offsets }
    }
}
/// One shard: a file on disk plus the in-memory state used to read from or append to it.
pub struct ChunkShardFile {
// Location of the shard file on disk.
pub path: PathBuf,
// Open handle to `path` (read-only from `open`, freshly created/truncated from `create`/`save`).
pub file: File,
// Where each chunk sits inside the data section.
pub index: ChunkShardIndex,
// Byte position in the file where the chunk data begins; set by `read_index`.
pub data_start: usize,
// Number of bytes of `data` currently in use (the next write position).
pub offset: usize,
// In-memory copy of the data section; empty after `open`, SHARD_CAPACITY bytes
// after `create` or `read_data`.
pub data: Vec<u8>,
}
/// Shard file handling.
///
/// On-disk layout written by [`ChunkShardFile::save`] (all integers little-endian):
///
/// ```text
/// [u32 index_len][index_len bytes: bincode ChunkShardIndex][u32 data_len][data_len bytes]
/// ```
impl ChunkShardFile {
    /// Directory under the repository's hidden dir that holds the shard files.
    pub fn db_path(repo: &LocalRepository) -> PathBuf {
        util::fs::oxen_hidden_dir(&repo.path)
            .join(TREE_DIR)
            .join("shards")
    }

    /// Path of the shard file with index `file_idx` (named `shard_<file_idx>`).
    pub fn shard_path(repo: &LocalRepository, file_idx: u32) -> PathBuf {
        Self::db_path(repo).join(format!("shard_{file_idx}"))
    }

    /// Parses the shard index from a path whose file stem looks like `<prefix>_<idx>`.
    ///
    /// # Panics
    /// Panics when the path has no UTF-8 file stem, the stem has no `_`-separated
    /// second component, or that component is not a valid `u32`.
    pub fn shard_idx(path: &Path) -> u32 {
        path.file_stem()
            .and_then(|stem| stem.to_str())
            .and_then(|stem| stem.split('_').nth(1))
            .and_then(|idx| idx.parse::<u32>().ok())
            .unwrap_or_else(|| panic!("not a shard file path: {path:?}"))
    }

    /// Opens an existing shard file for reading.
    ///
    /// Neither the index nor the data is loaded; call [`Self::read_index`] (and
    /// [`Self::read_data`] when the payload is needed in memory) afterwards.
    ///
    /// # Errors
    /// Fails when the file cannot be opened.
    pub fn open(repo: &LocalRepository, file_idx: u32) -> Result<ChunkShardFile, OxenError> {
        let path = Self::shard_path(repo, file_idx);
        log::debug!("Opening shard file: {path:?}");
        let file = File::open(&path)?;
        Ok(ChunkShardFile {
            path,
            file,
            index: ChunkShardIndex::new(),
            data_start: 0,
            offset: 0,
            data: Vec::new(),
        })
    }

    /// Creates (or truncates) the shard file `file_idx` and allocates a zeroed
    /// in-memory data buffer of `SHARD_CAPACITY` bytes for it.
    ///
    /// Nothing is written until [`Self::save`] is called.
    ///
    /// # Errors
    /// Fails when the file cannot be created.
    pub fn create(repo: &LocalRepository, file_idx: u32) -> Result<ChunkShardFile, OxenError> {
        let path = Self::shard_path(repo, file_idx);
        log::debug!("Creating shard file: {path:?}");
        let file = File::create(&path)?;
        Ok(ChunkShardFile {
            path,
            file,
            index: ChunkShardIndex::new(),
            data_start: 0,
            offset: 0,
            data: vec![0; SHARD_CAPACITY],
        })
    }

    /// Returns `true` when `buffer_len` more bytes fit in the shard.
    ///
    /// The comparison is strict, so the last byte of `SHARD_CAPACITY` is never used.
    pub fn has_capacity(&self, buffer_len: usize) -> bool {
        // checked_add: an absurdly large `buffer_len` must read as "does not fit",
        // not wrap around.
        self.offset
            .checked_add(buffer_len)
            .map_or(false, |end| end < SHARD_CAPACITY)
    }

    /// Appends `buffer` to the in-memory data and records its position under `hash`.
    ///
    /// # Errors
    /// * "Shard is full" when the buffer does not fit.
    /// * "Shard data is not loaded" when the in-memory buffer has not been
    ///   allocated (the shard came from [`Self::open`] without [`Self::read_data`]).
    pub fn add_buffer(&mut self, hash: u128, buffer: &[u8]) -> Result<(), OxenError> {
        if !self.has_capacity(buffer.len()) {
            return Err(OxenError::basic_str("Shard is full"));
        }
        let start = self.offset;
        let end = start + buffer.len();
        let Some(dest) = self.data.get_mut(start..end) else {
            return Err(OxenError::basic_str("Shard data is not loaded"));
        };
        dest.copy_from_slice(buffer);
        // Both values are below SHARD_CAPACITY (checked above), so they fit in u32.
        self.index
            .hash_offsets
            .insert(hash, (start as u32, buffer.len() as u32));
        self.offset = end;
        Ok(())
    }

    /// Returns a copy of the chunk stored under `hash`.
    ///
    /// When the data section is loaded in memory (after [`Self::create`] or
    /// [`Self::read_data`]) the chunk is served from memory, which also covers
    /// chunks that were added but not saved yet. Otherwise it is read from the
    /// file, which requires [`Self::read_index`] to have been called.
    ///
    /// # Errors
    /// Fails when the hash is not in the index, the recorded range lies outside
    /// the loaded data, or the file cannot supply the full chunk.
    pub fn get_buffer(&mut self, hash: u128) -> Result<Vec<u8>, OxenError> {
        let Some(&(chunk_offset, chunk_len)) = self.index.hash_offsets.get(&hash) else {
            return Err(OxenError::basic_str("Chunk not found in shard index"));
        };
        let chunk_offset = chunk_offset as usize;
        let chunk_len = chunk_len as usize;

        if !self.data.is_empty() {
            let in_memory = chunk_offset
                .checked_add(chunk_len)
                .filter(|end| *end <= self.offset)
                .and_then(|end| self.data.get(chunk_offset..end));
            return match in_memory {
                Some(bytes) => Ok(bytes.to_vec()),
                None => Err(OxenError::basic_str(
                    "Chunk lies outside the loaded shard data",
                )),
            };
        }

        let start = self.data_start + chunk_offset;
        log::debug!(
            "Reading chunk from shard: [{:?}] for hash: {} at start {} offset: {} size: {}",
            self.path,
            hash,
            start,
            chunk_offset,
            chunk_len
        );
        self.file.seek(SeekFrom::Start(start as u64))?;
        let mut buffer = vec![0u8; chunk_len];
        // read_exact: a short read would otherwise hand back a partly zeroed chunk.
        self.file.read_exact(&mut buffer)?;
        log::debug!("read {} bytes", buffer.len());
        Ok(buffer)
    }

    /// Loads the index from the start of the file and records where the data
    /// section begins.
    ///
    /// # Errors
    /// Fails when the file is truncated or the index cannot be deserialized.
    pub fn read_index(&mut self) -> Result<(), OxenError> {
        // The index always lives at the very start of the file.
        self.file.seek(SeekFrom::Start(0))?;
        let mut len_bytes = [0u8; 4];
        self.file.read_exact(&mut len_bytes)?;
        let index_size = u32::from_le_bytes(len_bytes) as usize;

        let mut index_bytes = vec![0u8; index_size];
        self.file.read_exact(&mut index_bytes)?;
        self.index = bincode::deserialize(&index_bytes)?;
        // 4 bytes of index length + the index + 4 bytes of data length.
        self.data_start = index_size + 8;
        log::debug!(
            "Read index of size {} with {:?} hashes",
            bytesize::ByteSize::b(index_size as u64),
            self.index.hash_offsets.len()
        );
        Ok(())
    }

    /// Loads the data section into a `SHARD_CAPACITY`-sized in-memory buffer and
    /// sets `offset` to the number of bytes in use.
    ///
    /// Intended to be called after [`Self::read_index`]. If the index has not
    /// been read, the length prefix is taken from the current file position.
    ///
    /// # Errors
    /// Fails when the file is truncated or the stored length exceeds
    /// `SHARD_CAPACITY`.
    pub fn read_data(&mut self) -> Result<(), OxenError> {
        if self.data_start >= 8 {
            // `read_index` has run: the length prefix sits right before the payload.
            self.file
                .seek(SeekFrom::Start((self.data_start - 4) as u64))?;
        }
        let mut len_bytes = [0u8; 4];
        self.file.read_exact(&mut len_bytes)?;
        let data_len = u32::from_le_bytes(len_bytes) as usize;
        log::debug!("read data with {:?} bytes", data_len);
        if data_len > SHARD_CAPACITY {
            return Err(OxenError::basic_str(
                "Shard data length exceeds shard capacity",
            ));
        }

        // Read straight into the final buffer instead of copying through a temporary.
        let mut data = vec![0u8; SHARD_CAPACITY];
        self.file.read_exact(&mut data[..data_len])?;
        log::debug!("read {data_len} bytes");
        self.data = data;
        self.offset = data_len;
        Ok(())
    }

    /// Rewrites the shard file from the in-memory index and data, then syncs it
    /// to disk.
    ///
    /// The file handle is replaced by a newly created (truncated) one.
    ///
    /// # Errors
    /// Fails on serialization or I/O errors, or when the in-memory data is
    /// shorter than `offset`.
    pub fn save(&mut self) -> Result<(), OxenError> {
        log::debug!("Saving shard file: {:?}", self.path);
        // Validate before truncating the file so a bad state cannot destroy it.
        let Some(data) = self.data.get(..self.offset) else {
            return Err(OxenError::basic_str("Shard data is not loaded"));
        };
        let index_bytes = bincode::serialize(&self.index)?;
        if index_bytes.len() > u32::MAX as usize {
            return Err(OxenError::basic_str("Shard index is too large to save"));
        }
        log::debug!("Saving shard index: {:?}", index_bytes.len());

        self.file = File::create(&self.path)?;
        self.file
            .write_all(&(index_bytes.len() as u32).to_le_bytes())?;
        self.file.write_all(&index_bytes)?;
        // `offset` never exceeds SHARD_CAPACITY, so it fits in u32.
        self.file.write_all(&(self.offset as u32).to_le_bytes())?;
        log::debug!("Saving shard data: {:?}", data.len());
        self.file.write_all(data)?;
        self.file.sync_all()?;
        // Keep the recorded layout in step with what is now on disk.
        self.data_start = index_bytes.len() + 8;
        log::debug!("Saved shard file: {:?}", self.path);
        Ok(())
    }
}
pub struct ChunkShardManager {
repo: LocalRepository,
db: ChunkShardDB,
current_idx: i32,
current_file: Option<ChunkShardFile>,
}
impl ChunkShardManager {
pub fn new(repo: &LocalRepository) -> Result<Self, OxenError> {
let chunk_db = ChunkShardDB::new(repo)?;
Ok(Self {
repo: repo.clone(),
current_idx: -1,
current_file: None,
db: chunk_db,
})
}
pub fn open_for_write(&mut self) -> Result<(), OxenError> {
log::debug!("Opening chunk shard manager");
let shard_dir = ChunkShardFile::db_path(&self.repo);
if !shard_dir.exists() {
util::fs::create_dir_all(&shard_dir)?;
}
let mut shard_paths: Vec<PathBuf> = std::fs::read_dir(&shard_dir)?
.map(|x| x.unwrap().path())
.collect::<Vec<PathBuf>>();
shard_paths.sort_by(|a, b| {
let a_idx = ChunkShardFile::shard_idx(a);
let b_idx = ChunkShardFile::shard_idx(b);
a_idx.cmp(&b_idx)
});
let mut current_idx = 0;
let mut current_file: Option<ChunkShardFile> = None;
for path in shard_paths {
log::debug!("Opening shard file: {path:?}");
let file_idx = ChunkShardFile::shard_idx(&path);
if let Ok(mut shard_file) = ChunkShardFile::open(&self.repo, file_idx) {
shard_file.read_index()?;
log::debug!("Opened shard file: {path:?}");
if shard_file.has_capacity(CHUNK_SIZE) {
log::debug!("Shard [{file_idx}] has capacity, using it");
shard_file.read_data()?;
current_idx = file_idx;
current_file = Some(shard_file);
}
}
}
if current_file.is_none() {
log::debug!(
"Creating new shard file: {:?}",
ChunkShardFile::shard_path(&self.repo, current_idx)
);
current_file = Some(ChunkShardFile::create(&self.repo, current_idx)?);
}
log::debug!("Current shard index: {current_idx:?}");
self.current_idx = current_idx as i32; self.current_file = current_file;
Ok(())
}
pub fn has_chunk(&self, hash: u128) -> bool {
self.db.has_key(hash)
}
pub fn read_chunk(&mut self, hash: u128) -> Result<Vec<u8>, OxenError> {
let shard_idx = self
.db
.get(hash)?
.ok_or(OxenError::basic_str("Chunk not found"))?;
log::debug!("Reading chunk from shard: [{shard_idx}] for hash: {hash}");
if shard_idx as i32 != self.current_idx {
self.current_file = Some(ChunkShardFile::open(&self.repo, shard_idx)?);
self.current_file.as_mut().unwrap().read_index()?;
self.current_idx = shard_idx as i32;
}
let buffer = self.current_file.as_mut().unwrap().get_buffer(hash)?;
Ok(buffer)
}
pub fn write_chunk(&mut self, hash: u128, chunk: &[u8]) -> Result<u32, OxenError> {
let Some(current_file) = self.current_file.as_mut() else {
return Err(OxenError::basic_str("Not open for writing"));
};
self.db.put(hash, self.current_idx as u32)?;
current_file.add_buffer(hash, chunk)?;
if !current_file.has_capacity(chunk.len()) {
log::debug!(
"Shard file is full with {} saving {}",
current_file.offset,
self.current_idx
);
current_file.save()?;
self.current_idx += 1;
log::debug!("Shard file is full, starting new one {}", self.current_idx);
self.current_file = Some(ChunkShardFile::create(&self.repo, self.current_idx as u32)?);
}
Ok(self.current_idx as u32)
}
pub fn save_all(&mut self) -> Result<(), OxenError> {
let Some(current_file) = self.current_file.as_mut() else {
return Err(OxenError::basic_str("Not open for writing"));
};
current_file.save()?;
Ok(())
}
}