use std;
use std::io::{self, ErrorKind};
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use crate::constants::{VERSION_CHUNK_FILE_NAME, VERSION_CHUNKS_DIR, VERSION_FILE_NAME};
use crate::error::OxenError;
use crate::model::MerkleHash;
use crate::storage::version_store::{VersionLocation, VersionStore};
use crate::util::fs::AtomicFile;
use crate::util::{concurrency, hasher};
use crate::view::versions::CleanCorruptedVersionsResult;
use async_trait::async_trait;
use bytes::Bytes;
use log;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use tokio::fs::{self, File, metadata};
use tokio::io::AsyncReadExt;
use tokio::io::BufReader;
use tokio::sync::Semaphore;
use tokio::task::spawn_blocking;
use tokio_stream::Stream;
use tokio_util::io::ReaderStream;
/// A [`VersionStore`] backed by the local filesystem.
///
/// Each version gets its own directory at `<root>/<hash[..2]>/<hash[2..]>/`. That directory
/// holds the content file, the upload-chunks directory, and any derived files stored for
/// the version.
#[derive(Debug)]
pub struct LocalVersionStore {
/// Directory under which all per-version directories are created.
root_path: PathBuf,
}
impl LocalVersionStore {
    /// Creates a store rooted at `root_path`.
    ///
    /// This performs no I/O; call [`VersionStore::init`] to create the root directory.
    pub fn new(root_path: impl AsRef<Path>) -> Self {
        let root_path = PathBuf::from(root_path.as_ref());
        Self { root_path }
    }

    /// Directory holding everything stored for `hash`: `<root>/<first two chars>/<rest>`.
    ///
    /// # Panics
    /// Panics if `hash` is shorter than two bytes or byte index 2 is not a UTF-8 character
    /// boundary.
    fn version_dir(&self, hash: &str) -> PathBuf {
        let (topdir, subdir) = hash.split_at(2);
        let mut dir = self.root_path.join(topdir);
        dir.push(subdir);
        dir
    }

    /// Path of the content file for `hash`.
    fn version_path(&self, hash: &str) -> PathBuf {
        let mut path = self.version_dir(hash);
        path.push(VERSION_FILE_NAME);
        path
    }

    /// Directory containing one sub-directory per uploaded chunk of `hash`.
    fn version_chunks_dir(&self, hash: &str) -> PathBuf {
        let mut dir = self.version_dir(hash);
        dir.push(VERSION_CHUNKS_DIR);
        dir
    }

    /// Directory for the chunk of `hash` that starts at byte `offset`.
    fn version_chunk_dir(&self, hash: &str, offset: u64) -> PathBuf {
        let mut dir = self.version_chunks_dir(hash);
        dir.push(offset.to_string());
        dir
    }

    /// File holding the bytes of the chunk of `hash` that starts at byte `offset`.
    fn version_chunk_file(&self, hash: &str, offset: u64) -> PathBuf {
        let mut file = self.version_chunk_dir(hash, offset);
        file.push(VERSION_CHUNK_FILE_NAME);
        file
    }
}
#[async_trait]
impl VersionStore for LocalVersionStore {
/// Creates the root directory, and any missing parents, if nothing exists at that path yet.
async fn init(&self) -> Result<(), OxenError> {
    if self.root_path.exists() {
        return Ok(());
    }
    fs::create_dir_all(&self.root_path).await?;
    Ok(())
}
/// Streams `reader` into the content file for `hash`.
///
/// This is a no-op when the content file already exists. The parsed `hash` is handed to
/// `AtomicFile::with_hash`, presumably so the written bytes are verified against it.
/// `_size` is currently unused.
async fn store_version_from_reader(
    &self,
    hash: &str,
    mut reader: Box<dyn tokio::io::AsyncRead + Send + Unpin>,
    _size: u64,
) -> Result<(), OxenError> {
    let destination = self.version_path(hash);
    if destination.exists() {
        return Ok(());
    }
    let expected_hash = hash.parse::<MerkleHash>()?;
    AtomicFile::new(&destination)
        .with_hash(expected_hash)
        .stream_async(&mut *reader)
        .await?;
    Ok(())
}
/// Writes `data` as the content file for `hash`, unless that file already exists.
///
/// `AtomicFile::write` is synchronous (it is not awaited), so it runs on Tokio's blocking
/// pool.
async fn store_version(&self, hash: &str, data: Bytes) -> Result<(), OxenError> {
    let destination = self.version_path(hash);
    if !destination.exists() {
        let expected_hash = hash.parse::<MerkleHash>()?;
        let write_result = spawn_blocking(move || {
            AtomicFile::new(&destination)
                .with_hash(expected_hash)
                .write(&data)
        })
        .await?;
        write_result?;
    }
    Ok(())
}
/// Stores `derived_data` as `derived_filename` inside the version directory of `orig_hash`.
///
/// Unlike `store_version`, no hash is attached to the write, and an existing derived file
/// is not skipped.
async fn store_version_derived(
    &self,
    orig_hash: &str,
    derived_filename: &str,
    derived_data: Bytes,
) -> Result<(), OxenError> {
    let path_for_log = self.version_dir(orig_hash).join(derived_filename);
    let target = path_for_log.clone();
    spawn_blocking(move || AtomicFile::new(&target).write(&derived_data)).await??;
    log::debug!("Saved derived version file {path_for_log:?}");
    Ok(())
}
/// Returns the size in bytes of the content file for `hash`.
async fn get_version_size(&self, hash: &str) -> Result<u64, OxenError> {
    let len = fs::metadata(self.version_path(hash)).await?.len();
    Ok(len)
}
/// Reads the whole content file for `hash` into memory.
async fn get_version(&self, hash: &str) -> Result<Vec<u8>, OxenError> {
    let bytes = fs::read(self.version_path(hash)).await?;
    Ok(bytes)
}
/// Returns the size in bytes of the derived file `derived_filename` stored for `orig_hash`.
async fn get_version_derived_size(
    &self,
    orig_hash: &str,
    derived_filename: &str,
) -> Result<u64, OxenError> {
    let derived_path = self.version_dir(orig_hash).join(derived_filename);
    Ok(fs::metadata(derived_path).await?.len())
}
/// Opens the content file for `hash` and returns it as a buffered byte stream.
async fn get_version_stream(
    &self,
    hash: &str,
) -> Result<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Unpin>, OxenError>
{
    let file = File::open(self.version_path(hash)).await?;
    Ok(Box::new(ReaderStream::new(BufReader::new(file))))
}
/// Opens the derived file `derived_filename` of `orig_hash` as a buffered byte stream.
async fn get_version_derived_stream(
    &self,
    orig_hash: &str,
    derived_filename: &str,
) -> Result<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Unpin>, OxenError>
{
    let file = File::open(self.version_dir(orig_hash).join(derived_filename)).await?;
    Ok(Box::new(ReaderStream::new(BufReader::new(file))))
}
/// Reports whether a regular file named `derived_filename` exists for `orig_hash`.
///
/// A missing path yields `Ok(false)`. Any other metadata error is propagated.
async fn derived_version_exists(
    &self,
    orig_hash: &str,
    derived_filename: &str,
) -> Result<bool, OxenError> {
    let derived_path = self.version_dir(orig_hash).join(derived_filename);
    match metadata(derived_path).await {
        Ok(meta) => Ok(meta.is_file()),
        Err(err) if err.kind() == ErrorKind::NotFound => Ok(false),
        Err(err) => Err(err.into()),
    }
}
/// Returns the local content-file path for `hash`. It does not check that the file exists.
async fn version_location(&self, hash: &str) -> Result<VersionLocation, OxenError> {
    let path = self.version_path(hash);
    Ok(VersionLocation::Local(path))
}
/// Copies the content file for `hash` to `dest_path` through `AtomicFile`, stamping `mtime`.
///
/// # Errors
/// - Returns `OxenError::VersionStoreDataMissing` when the content file does not exist.
/// - Returns any error from the existence check or the copy.
async fn copy_version_to_path(
    &self,
    hash: &str,
    dest_path: &Path,
    mtime: SystemTime,
) -> Result<(), OxenError> {
    let version_path = self.version_path(hash);
    log::debug!("copying version path: {version_path:?} to {dest_path:?}");
    if !fs::try_exists(&version_path).await? {
        return Err(OxenError::VersionStoreDataMissing {
            hash: hash.to_string(),
            target_path: dest_path.to_path_buf().into(),
        });
    }
    let target = dest_path.to_path_buf();
    spawn_blocking(move || {
        AtomicFile::new(&target)
            .with_mtime(mtime)
            .copy_from(&version_path)
    })
    .await??;
    Ok(())
}
/// Reports whether the content file for `hash` exists. Chunks and derived files are not
/// considered.
async fn version_exists(&self, hash: &str) -> Result<bool, OxenError> {
    let path = self.version_path(hash);
    Ok(path.exists())
}
/// Removes the whole version directory for `hash`, including chunks and derived files.
///
/// Deleting a version that does not exist is a no-op.
async fn delete_version(&self, hash: &str) -> Result<(), OxenError> {
    let version_dir = self.version_dir(hash);
    if !version_dir.exists() {
        return Ok(());
    }
    fs::remove_dir_all(&version_dir).await?;
    Ok(())
}
/// Lists every `<prefix><suffix>` directory pair under the root, sorted ascending.
///
/// Non-directory entries at either level are ignored. Names are converted lossily, so
/// non-UTF-8 names produce replacement characters rather than errors.
async fn list_versions(&self) -> Result<Vec<String>, OxenError> {
    let mut hashes: Vec<String> = Vec::new();
    let mut prefixes = fs::read_dir(&self.root_path).await?;
    while let Some(prefix) = prefixes.next_entry().await? {
        if !prefix.file_type().await?.is_dir() {
            continue;
        }
        let prefix_name = prefix.file_name();
        let prefix_str = prefix_name.to_string_lossy();
        let mut suffixes = fs::read_dir(prefix.path()).await?;
        while let Some(suffix) = suffixes.next_entry().await? {
            if suffix.file_type().await?.is_dir() {
                let suffix_name = suffix.file_name();
                hashes.push(format!("{}{}", prefix_str, suffix_name.to_string_lossy()));
            }
        }
    }
    hashes.sort();
    Ok(hashes)
}
/// Writes `data` as the chunk of `hash` starting at `offset`.
///
/// An existing chunk at that offset is left untouched. No hash verification is applied
/// to the chunk.
async fn store_version_chunk(
    &self,
    hash: &str,
    offset: u64,
    data: Bytes,
) -> Result<(), OxenError> {
    let chunk_path = self.version_chunk_file(hash, offset);
    if !chunk_path.exists() {
        spawn_blocking(move || AtomicFile::new(&chunk_path).write(&data)).await??;
    }
    Ok(())
}
/// Reads exactly `size` bytes starting at `offset` from the content file for `hash`.
///
/// This reads a byte range of the assembled content file. It does not read the separately
/// stored upload chunks.
///
/// # Errors
/// - I/O errors from opening, stat-ing, seeking in, or reading the content file (for
///   example `NotFound` when the version does not exist).
/// - `UnexpectedEof` ("beyond end of file") when `offset` is not strictly inside the file,
///   or when `offset + size` runs past the end of the file or overflows `u64`.
/// - A basic error when `size` does not fit in `usize` on this platform.
async fn get_version_chunk(
    &self,
    hash: &str,
    offset: u64,
    size: u64,
) -> Result<Vec<u8>, OxenError> {
    use tokio::io::{AsyncSeekExt, SeekFrom};

    let version_file_path = self.version_path(hash);
    let mut file = File::open(&version_file_path).await?;
    let file_len = file.metadata().await?.len();

    // Use `checked_add` because a plain `offset + size` can overflow. In debug builds that
    // panics. In release builds it wraps, the wrapped sum can pass the bounds check, and a
    // shorter-than-requested chunk is silently returned.
    let end = offset.checked_add(size);
    let in_bounds = offset < file_len && end.is_some_and(|end| end <= file_len);
    if !in_bounds {
        return Err(OxenError::IO(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "beyond end of file",
        )));
    }

    // The bounds check guarantees the whole range is inside the file, so exactly `size`
    // bytes are read. It only has to fit in memory.
    let read_len = usize::try_from(size)
        .map_err(|_| OxenError::basic_str("requested chunk too large"))?;
    file.seek(SeekFrom::Start(offset)).await?;
    let mut buffer = vec![0u8; read_len];
    file.read_exact(&mut buffer).await?;
    Ok(buffer)
}
/// Returns the starting offsets of all stored chunks for `hash`, in ascending order.
///
/// Only sub-directories whose names parse as `u64` are counted. Fails if the chunks
/// directory cannot be listed, including when it does not exist.
async fn list_version_chunks(&self, hash: &str) -> Result<Vec<u64>, OxenError> {
    let mut offsets: Vec<u64> = Vec::new();
    let mut entries = fs::read_dir(self.version_chunks_dir(hash)).await?;
    while let Some(entry) = entries.next_entry().await? {
        if !entry.file_type().await?.is_dir() {
            continue;
        }
        if let Ok(offset) = entry.file_name().to_string_lossy().parse::<u64>() {
            offsets.push(offset);
        }
    }
    offsets.sort_unstable();
    Ok(offsets)
}
/// Concatenates the stored chunks of `hash`, in ascending offset order, into the content
/// file, then removes the chunks directory.
///
/// The combined bytes are written through `AtomicFile::with_hash`, presumably so the
/// result is verified against `hash` before being committed. NOTE(review): confirm in
/// `AtomicFile`. Chunks are only concatenated; gaps or overlaps between offsets are not
/// detected here.
///
/// # Errors
/// Fails if any of the following happens:
/// - The chunks directory cannot be listed.
/// - `hash` does not parse as a [`MerkleHash`].
/// - A chunk file cannot be opened or read (surfaced through `AtomicFile::stream`).
/// - Writing the content file fails.
/// - The chunks directory cannot be removed afterwards.
async fn combine_version_chunks(&self, hash: &str) -> Result<(), OxenError> {
    // A `Read` adapter that yields the files in `remaining` back to back, opening each one
    // only after the previous one is exhausted. It replaces nesting one `Read::chain` per
    // chunk. That approach opened every chunk file up front, risking file-descriptor
    // exhaustion on large uploads, and made each `read` recurse one level per chunk.
    struct SequentialFileReader {
        remaining: std::vec::IntoIter<PathBuf>,
        current: Option<std::fs::File>,
    }

    impl std::io::Read for SequentialFileReader {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            // A zero-length read returns 0 and must not be mistaken for the end of a chunk.
            if buf.is_empty() {
                return Ok(0);
            }
            loop {
                if let Some(file) = self.current.as_mut() {
                    let n = std::io::Read::read(file, buf)?;
                    if n > 0 {
                        return Ok(n);
                    }
                    // The current chunk is exhausted; move on to the next one.
                    self.current = None;
                }
                let Some(path) = self.remaining.next() else {
                    return Ok(0);
                };
                self.current = Some(std::fs::File::open(path)?);
            }
        }
    }

    let version_path = self.version_path(hash);
    let chunks_dir = self.version_chunks_dir(hash);
    let chunk_offsets = self.list_version_chunks(hash).await?;
    let expected_hash: MerkleHash = hash.parse()?;
    log::debug!(
        "combine_version_chunks found {} chunks",
        chunk_offsets.len()
    );
    let chunk_paths: Vec<PathBuf> = chunk_offsets
        .iter()
        .map(|offset| self.version_chunk_file(hash, *offset))
        .collect();
    // Synchronous file I/O, so do the work on the blocking pool.
    spawn_blocking(move || -> Result<(), OxenError> {
        let mut combined: Box<dyn std::io::Read> = Box::new(SequentialFileReader {
            remaining: chunk_paths.into_iter(),
            current: None,
        });
        AtomicFile::new(&version_path)
            .with_hash(expected_hash)
            .stream(&mut combined)?;
        if chunks_dir.exists() {
            std::fs::remove_dir_all(&chunks_dir)?;
        }
        Ok(())
    })
    .await??;
    Ok(())
}
async fn clean_corrupted_versions(
&self,
dry_run: bool,
) -> Result<CleanCorruptedVersionsResult, OxenError> {
#[derive(Default)]
struct Stats {
scanned_objects: AtomicUsize,
corrupted_objects: AtomicUsize,
io_errors: AtomicUsize,
deleted_objects: AtomicUsize,
}
impl Stats {
fn incr_scanned(&self) {
self.scanned_objects.fetch_add(1, Ordering::Relaxed);
}
fn incr_corrupted(&self) {
self.corrupted_objects.fetch_add(1, Ordering::Relaxed);
}
fn incr_io_error(&self) {
self.io_errors.fetch_add(1, Ordering::Relaxed);
}
fn incr_deleted(&self) {
self.deleted_objects.fetch_add(1, Ordering::Relaxed);
}
fn snapshot(&self) -> (usize, usize, usize, usize) {
(
self.scanned_objects.load(Ordering::Relaxed),
self.corrupted_objects.load(Ordering::Relaxed),
self.io_errors.load(Ordering::Relaxed),
self.deleted_objects.load(Ordering::Relaxed),
)
}
}
let start = std::time::Instant::now();
let stats = Arc::new(Stats::default());
let concurrency = concurrency::default_num_threads();
let semaphore = Arc::new(Semaphore::new(concurrency));
let mut prefix_rd = fs::read_dir(&self.root_path).await?;
let mut prefix_paths: Vec<PathBuf> = Vec::new();
while let Some(entry) = prefix_rd.next_entry().await? {
match entry.file_type().await {
Ok(ft) if ft.is_dir() => {
prefix_paths.push(entry.path());
}
_ => {
stats.incr_io_error();
}
}
}
let mut handles = Vec::with_capacity(prefix_paths.len());
for prefix_path in prefix_paths {
let semaphore_cl = semaphore.clone();
let stats_cl = stats.clone();
let handle = tokio::spawn(async move {
let mut suffix_rd = match fs::read_dir(&prefix_path).await {
Ok(rd) => rd,
Err(_) => {
stats_cl.incr_io_error();
return;
}
};
while let Ok(Some(entry)) = suffix_rd.next_entry().await {
let file_type = match entry.file_type().await {
Ok(ft) => ft,
Err(_) => {
stats_cl.incr_io_error();
continue;
}
};
if !file_type.is_dir() {
continue;
}
let suffix_path = entry.path();
let prefix_name = match prefix_path.file_name().and_then(|s| s.to_str()) {
Some(p) => p.to_string(),
None => {
stats_cl.incr_io_error();
continue;
}
};
let suffix_name = match suffix_path.file_name().and_then(|s| s.to_str()) {
Some(s) => s.to_string(),
None => {
stats_cl.incr_io_error();
continue;
}
};
let expected_hash = format!("{prefix_name}{suffix_name}");
let permit = semaphore_cl.clone().acquire_owned().await.unwrap();
{
let data_path = suffix_path.join("data");
let data = match fs::read(&data_path).await {
Ok(b) => b,
Err(_) => {
stats_cl.incr_io_error();
if !dry_run && fs::remove_dir_all(&suffix_path).await.is_ok() {
stats_cl.incr_deleted();
}
drop(permit);
continue;
}
};
stats_cl.incr_scanned();
let actual_hash =
match spawn_blocking(move || hasher::hash_buffer(&data)).await {
Ok(h) => h,
Err(_) => {
log::debug!(
"Failed to compute hash for version file {expected_hash}"
);
stats_cl.incr_io_error();
if !dry_run && fs::remove_dir_all(&suffix_path).await.is_ok() {
stats_cl.incr_deleted();
}
drop(permit);
continue;
}
};
if actual_hash != expected_hash {
log::debug!("version file {actual_hash} is corrupted!");
stats_cl.incr_corrupted();
if !dry_run {
match fs::remove_dir_all(&suffix_path).await {
Ok(_) => {
stats_cl.incr_deleted();
log::debug!("Corrupted version file {actual_hash} deleted");
}
Err(_) => {
stats_cl.incr_io_error();
log::debug!(
"Failed to delete corrupted version file {actual_hash}"
);
}
}
}
}
drop(permit);
}
}
});
handles.push(handle);
}
for h in handles {
let _ = h.await;
}
let (scanned, corrupted, io_errors, deleted) = stats.snapshot();
let elapsed = std::time::Duration::from_millis(start.elapsed().as_millis() as u64);
let result = CleanCorruptedVersionsResult {
scanned: scanned as u64,
corrupted: corrupted as u64,
cleaned: deleted as u64,
errors: io_errors as u64,
elapsed,
};
Ok(result)
}
fn storage_kind(&self) -> crate::storage::StorageKind {
crate::storage::StorageKind::Local
}
}
// Unit tests for `LocalVersionStore`. Each test builds its own store in a fresh temporary
// directory via `setup`, so no on-disk state is shared between tests.
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
use tempfile::TempDir;
// Creates a store rooted at a new temp dir and runs `init` on it. The `TempDir` is returned
// so the caller keeps it alive; `tempfile` removes the directory when it is dropped.
async fn setup() -> (TempDir, LocalVersionStore) {
let temp_dir = TempDir::new().unwrap();
let store = LocalVersionStore::new(temp_dir.path());
store.init().await.unwrap();
(temp_dir, store)
}
// `init` leaves a directory at the root path.
#[tokio::test]
async fn test_init() {
let (_temp_dir, store) = setup().await;
assert!(store.root_path.exists());
assert!(store.root_path.is_dir());
}
// Round-trips content through `store_version` / `get_version`. The hash comes from
// `hasher::hash_buffer` so it matches the data (`store_version` passes the parsed hash to
// `AtomicFile::with_hash`).
#[tokio::test]
async fn test_store_and_get_version() {
let (_temp_dir, store) = setup().await;
let data = b"test data";
let hash = hasher::hash_buffer(data);
store
.store_version(&hash, Bytes::from_static(data))
.await
.unwrap();
let version_path = store.version_path(&hash);
assert!(version_path.exists());
assert_eq!(version_path.parent().unwrap(), store.version_dir(&hash));
let retrieved = store.get_version(&hash).await.unwrap();
assert_eq!(retrieved, data);
}
// `version_location` only computes the path, so no file needs to exist.
#[tokio::test]
async fn test_version_location_returns_local_path() {
let (_temp_dir, store) = setup().await;
let hash = "abcdef1234567890";
let location = store.version_location(hash).await.unwrap();
match location {
VersionLocation::Local(path) => {
assert_eq!(path, store.version_path(hash));
}
other => panic!("expected Local variant, got {other:?}"),
}
}
// Streams content from an in-memory reader into the store.
#[tokio::test]
async fn test_store_from_reader() {
let (_temp_dir, store) = setup().await;
let data = b"test data from reader";
let hash = hasher::hash_buffer(data);
let cursor = Cursor::new(data.to_vec());
store
.store_version_from_reader(&hash, Box::new(cursor), data.len() as u64)
.await
.unwrap();
let version_path = store.version_path(&hash);
assert!(version_path.exists());
let retrieved = store.get_version(&hash).await.unwrap();
assert_eq!(retrieved, data);
}
// `version_exists` flips from false to true once content is stored.
#[tokio::test]
async fn test_version_exists() {
let (_temp_dir, store) = setup().await;
let data = b"test data";
let hash = hasher::hash_buffer(data);
assert!(!store.version_exists(&hash).await.unwrap());
store
.store_version(&hash, Bytes::from_static(data))
.await
.unwrap();
assert!(store.version_exists(&hash).await.unwrap());
}
// `find_missing_versions` is not defined in this impl (presumably a trait-provided method);
// it should return only the hashes that have no stored content.
#[tokio::test]
async fn test_find_missing_versions_returns_only_absent_hashes() {
let (_temp_dir, store) = setup().await;
let present_data = b"x";
let also_present_data = b"y";
let present = hasher::hash_buffer(present_data);
let also_present = hasher::hash_buffer(also_present_data);
let absent = "cccc3333cccc3333";
store
.store_version(&present, Bytes::from_static(present_data))
.await
.unwrap();
store
.store_version(&also_present, Bytes::from_static(also_present_data))
.await
.unwrap();
let missing = store
.find_missing_versions(&[present.clone(), absent.to_string(), also_present.clone()])
.await
.unwrap();
assert_eq!(missing, vec![absent.to_string()]);
}
// Empty input yields an empty result.
#[tokio::test]
async fn test_find_missing_versions_empty_input_returns_empty() {
let (_temp_dir, store) = setup().await;
let missing = store.find_missing_versions(&[]).await.unwrap();
assert!(missing.is_empty());
}
// `delete_version` removes the whole version directory, not just the content file.
#[tokio::test]
async fn test_delete_version() {
let (_temp_dir, store) = setup().await;
let data = b"test data";
let hash = hasher::hash_buffer(data);
store
.store_version(&hash, Bytes::from_static(data))
.await
.unwrap();
assert!(store.version_exists(&hash).await.unwrap());
store.delete_version(&hash).await.unwrap();
assert!(!store.version_exists(&hash).await.unwrap());
assert!(!store.version_dir(&hash).exists());
}
// `list_versions` returns every stored hash in sorted order.
#[tokio::test]
async fn test_list_versions() {
let (_temp_dir, store) = setup().await;
let payloads: [&[u8]; 3] = [b"alpha", b"bravo", b"charlie"];
let mut hashes: Vec<String> = payloads.iter().map(|d| hasher::hash_buffer(d)).collect();
for (payload, hash) in payloads.iter().zip(&hashes) {
store
.store_version(hash, Bytes::from_static(payload))
.await
.unwrap();
}
hashes.sort();
let versions = store.list_versions().await.unwrap();
assert_eq!(versions, hashes);
}
// Reading a missing version surfaces the underlying `NotFound` I/O error. The hash must
// be at least two bytes long because `version_dir` slices off a two-character prefix.
#[tokio::test]
async fn test_get_nonexistent_version() {
let (_temp_dir, store) = setup().await;
let hash = "nonexistent";
match store.get_version(hash).await {
Ok(_) => panic!("Expected error when getting non-existent version"),
Err(OxenError::IO(e)) => {
assert_eq!(e.kind(), io::ErrorKind::NotFound);
}
Err(e) => {
panic!("Unexpected error when getting non-existent version: {e:?}");
}
}
}
// Deleting a version that was never stored is a no-op, not an error.
#[tokio::test]
async fn test_delete_nonexistent_version() {
let (_temp_dir, store) = setup().await;
let hash = "nonexistent";
store.delete_version(hash).await.unwrap();
}
// Despite its name, this exercises `get_version_chunk`, which reads a byte range from the
// assembled content file rather than from stored upload chunks.
#[tokio::test]
async fn test_store_and_get_version_chunk() {
let (_temp_dir, store) = setup().await;
let data = b"test chunk data";
let hash = hasher::hash_buffer(data);
let offset = 0;
let size = data.len() as u64;
store
.store_version(&hash, Bytes::from_static(data))
.await
.unwrap();
let file_path = store.version_path(&hash);
assert!(file_path.exists());
assert_eq!(file_path.parent().unwrap(), store.version_dir(&hash));
let retrieved = store.get_version_chunk(&hash, offset, size).await.unwrap();
assert_eq!(retrieved, data);
}
// Reading a range of a missing version fails with `NotFound` from opening the file.
#[tokio::test]
async fn test_get_nonexistent_chunk() {
let (_temp_dir, store) = setup().await;
let hash = "abcdef1234567890";
let offset = 0;
let size = 100;
match store.get_version_chunk(hash, offset, size).await {
Ok(_) => panic!("Expected error when getting non-existent chunk"),
Err(OxenError::IO(e)) => {
assert_eq!(e.kind(), io::ErrorKind::NotFound);
}
Err(e) => {
panic!("Unexpected error when getting non-existent chunk: {e:?}");
}
}
}
// Derived files are written without a hash check, so arbitrary bytes can be stored under
// any original hash. The test reads them back via the stream and the size query.
#[tokio::test]
async fn test_store_and_get_version_derived() {
let (_temp_dir, store) = setup().await;
let orig_hash = "aaaaaaaaaaaaaaaa";
let derived_filename = "100x200.jpg";
let derived_data = b"fake resized image bytes for hash aaaaaaaaaaaaaaaa";
store
.store_version_derived(
orig_hash,
derived_filename,
Bytes::from_static(derived_data),
)
.await
.unwrap();
use futures::StreamExt;
let mut stream = store
.get_version_derived_stream(orig_hash, derived_filename)
.await
.unwrap();
let mut collected = Vec::new();
while let Some(chunk) = stream.next().await {
collected.extend_from_slice(&chunk.unwrap());
}
assert_eq!(collected, derived_data);
assert_eq!(
store
.get_version_derived_size(orig_hash, derived_filename)
.await
.unwrap(),
derived_data.len() as u64
);
}
// `derived_version_exists` reports false (not an error) before the file is written.
#[tokio::test]
async fn test_derived_version_exists() {
let (_temp_dir, store) = setup().await;
let orig_hash = "bbbbbbbbbbbbbbbb";
let derived_filename = "300x400.jpg";
let derived_data = b"fake resized image bytes for hash bbbbbbbbbbbbbbbb";
assert!(
!store
.derived_version_exists(orig_hash, derived_filename)
.await
.unwrap()
);
store
.store_version_derived(
orig_hash,
derived_filename,
Bytes::from_static(derived_data),
)
.await
.unwrap();
assert!(
store
.derived_version_exists(orig_hash, derived_filename)
.await
.unwrap()
);
}
// Copies to a destination whose parent `subdir` does not exist yet, which presumably
// relies on `AtomicFile` creating missing parents. It also checks that the requested
// mtime is applied exactly, including nanoseconds.
#[tokio::test]
async fn test_copy_version_to_path_with_mtime_stamps_atomically() {
let (_temp_dir, store) = setup().await;
let data = b"working-tree publish content";
let hash = hasher::hash_buffer(data);
store
.store_version(&hash, Bytes::from_static(data))
.await
.unwrap();
let dest_dir = TempDir::new().unwrap();
let dest_path = dest_dir.path().join("subdir/output.bin");
let mtime = SystemTime::UNIX_EPOCH + std::time::Duration::new(1_700_000_000, 123_456_789);
store
.copy_version_to_path(&hash, &dest_path, mtime)
.await
.unwrap();
assert_eq!(std::fs::read(&dest_path).unwrap(), data);
let actual = std::fs::metadata(&dest_path).unwrap().modified().unwrap();
assert_eq!(actual, mtime);
}
// A missing version fails with `VersionStoreDataMissing`, and nothing (not even a temp
// file) is left in the destination directory.
#[tokio::test]
async fn test_copy_version_to_path_missing_version() {
let (_temp_dir, store) = setup().await;
let dest_dir = TempDir::new().unwrap();
let dest_path = dest_dir.path().join("out.bin");
let mtime = SystemTime::UNIX_EPOCH + std::time::Duration::new(1_700_000_000, 0);
let result = store
.copy_version_to_path("0000000000000000", &dest_path, mtime)
.await;
assert!(matches!(
result,
Err(OxenError::VersionStoreDataMissing { .. })
));
assert!(!dest_path.exists());
let names: Vec<_> = std::fs::read_dir(dest_dir.path())
.unwrap()
.filter_map(|e| e.ok().map(|e| e.file_name()))
.collect();
assert!(names.is_empty(), "leftover entries: {names:?}");
}
}