use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncRead;
use tokio_stream::Stream;
use crate::constants;
use crate::error::OxenError;
use crate::opts::StorageOpts;
use crate::storage::{LocalVersionStore, S3VersionStore};
use crate::util;
use crate::view::versions::CleanCorruptedVersionsResult;
/// A local filesystem path that is either a plain owned path or one backed by
/// a temporary-file handle.
///
/// NOTE(review): the `Temp` variant owns the `async_tempfile::TempFile`, so the
/// file's lifetime is presumably tied to this value — confirm against the
/// `async_tempfile` crate's drop behaviour.
pub enum LocalFilePath {
/// An owned path with no temp-file handle attached.
Stable(PathBuf),
/// An owned `async_tempfile::TempFile`; its path is read via `file_path()`.
Temp(async_tempfile::TempFile),
}
impl Deref for LocalFilePath {
type Target = Path;
fn deref(&self) -> &Path {
match self {
LocalFilePath::Stable(p) => p.as_path(),
LocalFilePath::Temp(t) => t.file_path(),
}
}
}
impl AsRef<Path> for LocalFilePath {
fn as_ref(&self) -> &Path {
self.deref()
}
}
impl std::fmt::Debug for LocalFilePath {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LocalFilePath::Stable(p) => write!(f, "Stable({p:?})"),
LocalFilePath::Temp(t) => write!(f, "Temp({:?})", t.file_path()),
}
}
}
impl std::fmt::Display for LocalFilePath {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.deref().display())
}
}
impl LocalFilePath {
    /// Returns an owned `PathBuf` copy of the underlying path.
    pub fn to_pathbuf(&self) -> PathBuf {
        // Deref coercion yields the wrapped `Path`; `to_owned` on a `Path` produces a `PathBuf`.
        let path: &Path = self;
        path.to_owned()
    }
}
/// Serializable description of a storage backend: a type tag plus string
/// key/value settings.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct StorageConfig {
/// Backend type tag; serialized and deserialized under the key `type`.
// NOTE(review): presumably one of the tags matched in `create_version_store`
// ("local", "s3") — confirm against callers.
#[serde(rename = "type")]
pub type_: String,
/// Backend settings as string pairs; falls back to an empty map when the
/// field is absent during deserialization.
#[serde(default)]
pub settings: HashMap<String, String>,
}
/// Abstraction over a backend that stores file versions addressed by a hash
/// string.
///
/// Implementors must be `Debug + Send + Sync + 'static`; `create_version_store`
/// hands them out as `Arc<dyn VersionStore>`. Every fallible method reports
/// failure as an `OxenError`.
///
/// NOTE(review): the method docs below describe the intent suggested by the
/// signatures; exact semantics are defined by the implementations, which are
/// not visible in this file.
#[async_trait]
pub trait VersionStore: Debug + Send + Sync + 'static {
/// Prepares the store for use. What initialization entails is
/// implementation-defined.
async fn init(&self) -> Result<(), OxenError>;
/// Stores the content read from `reader` as the version identified by `hash`.
///
/// NOTE(review): `size` is presumably the total byte length of the content —
/// confirm against implementations.
async fn store_version_from_reader(
&self,
hash: &str,
reader: Box<dyn AsyncRead + Send + Unpin>,
size: u64,
) -> Result<(), OxenError>;
/// Stores the in-memory bytes `data` as the version identified by `hash`.
async fn store_version(&self, hash: &str, data: &[u8]) -> Result<(), OxenError>;
/// Stores one chunk (`data`) belonging to the version identified by `hash`.
///
/// NOTE(review): `offset` is presumably the chunk's byte offset within the
/// full content — TODO confirm.
async fn store_version_chunk(
&self,
hash: &str,
offset: u64,
data: Bytes,
) -> Result<(), OxenError>;
/// Stores `derived_data` as a derived file named `derived_filename`,
/// associated with the version identified by `orig_hash`.
async fn store_version_derived(
&self,
orig_hash: &str,
derived_filename: &str,
derived_data: &[u8],
) -> Result<(), OxenError>;
/// Returns bytes belonging to the version identified by `hash`.
///
/// NOTE(review): presumably up to `size` bytes starting at byte `offset` —
/// confirm against implementations.
async fn get_version_chunk(
&self,
hash: &str,
offset: u64,
size: u64,
) -> Result<Vec<u8>, OxenError>;
/// Lists the chunks stored for `hash` as `u64` values.
///
/// NOTE(review): the values are presumably chunk offsets — confirm.
async fn list_version_chunks(&self, hash: &str) -> Result<Vec<u64>, OxenError>;
/// Combines the chunks stored for `hash`.
///
/// NOTE(review): presumably into a single complete version — confirm.
async fn combine_version_chunks(&self, hash: &str) -> Result<(), OxenError>;
/// Returns the size of the version identified by `hash` (presumably in bytes).
async fn get_version_size(&self, hash: &str) -> Result<u64, OxenError>;
/// Returns the content of the version identified by `hash` as an in-memory buffer.
async fn get_version(&self, hash: &str) -> Result<Vec<u8>, OxenError>;
/// Returns the content of the version identified by `hash` as a stream of
/// `Bytes` items; each item may individually fail with `std::io::Error`.
async fn get_version_stream(
&self,
hash: &str,
) -> Result<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Unpin>, OxenError>;
/// Returns the size (presumably in bytes) of the derived file
/// `derived_filename` associated with `orig_hash`.
async fn get_version_derived_size(
&self,
orig_hash: &str,
derived_filename: &str,
) -> Result<u64, OxenError>;
/// Returns the derived file `derived_filename` associated with `orig_hash`
/// as a stream of `Bytes` items; each item may fail with `std::io::Error`.
async fn get_version_derived_stream(
&self,
orig_hash: &str,
derived_filename: &str,
) -> Result<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Unpin>, OxenError>;
/// Reports whether the derived file `derived_filename` exists for `orig_hash`.
async fn derived_version_exists(
&self,
orig_hash: &str,
derived_filename: &str,
) -> Result<bool, OxenError>;
/// Returns a local filesystem path for the version identified by `hash`.
///
/// NOTE(review): the `LocalFilePath::Temp` variant suggests some backends
/// hand back a temporary file rather than a stable path — confirm.
async fn get_version_path(&self, hash: &str) -> Result<LocalFilePath, OxenError>;
/// Copies the version identified by `hash` to `dest_path`.
async fn copy_version_to_path(&self, hash: &str, dest_path: &Path) -> Result<(), OxenError>;
/// Reports whether a version identified by `hash` exists in the store.
async fn version_exists(&self, hash: &str) -> Result<bool, OxenError>;
/// Deletes the version identified by `hash`.
async fn delete_version(&self, hash: &str) -> Result<(), OxenError>;
/// Lists the stored versions as strings (presumably their hashes — confirm).
async fn list_versions(&self) -> Result<Vec<String>, OxenError>;
/// Cleans corrupted versions and returns a `CleanCorruptedVersionsResult`.
///
/// NOTE(review): with `dry_run` set this presumably only reports without
/// removing anything — confirm against implementations.
async fn clean_corrupted_versions(
&self,
dry_run: bool,
) -> Result<CleanCorruptedVersionsResult, OxenError>;
/// Returns the backend's type identifier as a string.
fn storage_type(&self) -> &str;
/// Returns the backend's settings as string key/value pairs.
fn storage_settings(&self) -> HashMap<String, String>;
}
/// Builds the version store selected by `storage_opts.type_`.
///
/// * `"local"` creates a [`LocalVersionStore`]. A configured path that begins
///   with the `.oxen` component is resolved against `repo_dir`; any other
///   configured path is used as given. With no configured path, the store
///   lives under the repository's hidden directory, in
///   `constants::VERSIONS_DIR`/`constants::FILES_DIR`.
/// * `"s3"` creates an [`S3VersionStore`] for the configured bucket, using the
///   configured prefix or `"versions"` when none is set.
///
/// # Errors
///
/// Returns an [`OxenError`] when the options for the selected backend are
/// missing, or when `storage_opts.type_` is neither `"local"` nor `"s3"`.
pub fn create_version_store(
    repo_dir: &Path,
    storage_opts: &StorageOpts,
) -> Result<Arc<dyn VersionStore>, OxenError> {
    match storage_opts.type_.as_str() {
        "local" => {
            let Some(local_opts) = storage_opts.local_storage_opts.as_ref() else {
                return Err(OxenError::basic_str("local storage opts not found"));
            };
            let versions_dir = match &local_opts.path {
                // `Path::starts_with` compares whole components, so this matches
                // `.oxen/...` but not e.g. `.oxenfoo/...`.
                Some(path) if path.starts_with(".oxen") => repo_dir.join(path),
                Some(path) => path.clone(),
                None => util::fs::oxen_hidden_dir(repo_dir)
                    .join(constants::VERSIONS_DIR)
                    .join(constants::FILES_DIR),
            };
            let store = LocalVersionStore::new(versions_dir);
            Ok(Arc::new(store))
        }
        "s3" => {
            let Some(s3_opts) = storage_opts.s3_opts.as_ref() else {
                return Err(OxenError::basic_str("s3 storage opts not found"));
            };
            let bucket = s3_opts.bucket.clone();
            // Build the default lazily so no string is allocated when a prefix is configured.
            let prefix = s3_opts
                .prefix
                .clone()
                .unwrap_or_else(|| "versions".to_string());
            let store = S3VersionStore::new(bucket, prefix);
            Ok(Arc::new(store))
        }
        _ => Err(OxenError::basic_str(format!(
            "Unsupported async storage type: {}",
            storage_opts.type_
        ))),
    }
}