use crate::api::client;
use crate::api::client::internal_types::LocalOrBase;
use crate::constants::{chunk_size, max_retries};
use crate::core::progress::push_progress::PushProgress;
use crate::error::OxenError;
use crate::model::{Commit, LocalRepository, RemoteRepository};
use crate::opts::GlobOpts;
use crate::util::{self, concurrency};
use crate::view::{ErrorFileInfo, ErrorFilesResponse, FilePathsResponse, FileWithHash};
use crate::{api, repositories, view, view::workspaces::ValidateUploadFeasibilityRequest};
use bytesize::ByteSize;
use futures_util::StreamExt;
use glob_match::glob_match;
use parking_lot::Mutex;
use rand::{Rng, thread_rng};
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;
use tokio::time::{Duration, sleep};
use futures::stream;
use tokio_stream::wrappers::ReceiverStream;
use crate::util::hasher;
use flate2::Compression;
use flate2::write::GzEncoder;
/// Base delay, in milliseconds, passed to `exponential_backoff` between staging retries.
const BASE_WAIT_TIME: usize = 300;
/// Upper bound, in milliseconds, on the delay between staging retries.
const MAX_WAIT_TIME: usize = 10_000;
/// Largest in-memory buffer, in bytes, accepted by `p_upload_bytes_as_file`.
const WORKSPACE_ADD_LIMIT: u64 = 100_000_000;
/// Pairs a list of files (path + hash) with a list of per-file upload errors.
///
/// NOTE(review): this type is not referenced in the visible part of this file;
/// confirm the exact meaning of each field against its callers.
#[derive(Debug)]
pub struct UploadResult {
/// Files (with their content hashes) to add.
pub files_to_add: Vec<FileWithHash>,
/// Files that produced an error.
pub err_files: Vec<ErrorFileInfo>,
}
/// Per-file errors returned by [`add`] and [`add_with_opts`] for entries that could not be uploaded.
pub type UploadFails = Vec<ErrorFileInfo>;
/// Uploads `paths` into `directory` of the remote workspace `workspace_id`.
///
/// Thin wrapper around [`add_with_opts`] that always passes
/// `update_timestamp = false`.
///
/// # Errors
/// Returns whatever error [`add_with_opts`] returns.
pub async fn add(
remote_repo: &RemoteRepository,
workspace_id: impl AsRef<str>,
directory: impl AsRef<str>,
paths: Vec<PathBuf>,
local_repo: &Option<LocalRepository>,
) -> Result<UploadFails, OxenError> {
add_with_opts(
remote_repo,
workspace_id,
directory,
paths,
local_repo,
// update_timestamp
false,
)
.await
}
/// Expands `paths` (globs and directories) against the working directory and
/// uploads every resulting file into `directory` of the remote workspace.
///
/// * `local_repo` - when present, paths are resolved relative to this repository.
/// * `update_timestamp` - forwarded unchanged to the upload and staging calls.
///
/// Returns the entries that failed to upload; an empty `paths` yields an empty
/// list without contacting the server. A summary line is printed on success.
///
/// # Errors
/// Fails if glob expansion fails or the upload itself returns an error.
pub async fn add_with_opts(
    remote_repo: &RemoteRepository,
    workspace_id: impl AsRef<str>,
    directory: impl AsRef<str>,
    paths: Vec<PathBuf>,
    local_repo: &Option<LocalRepository>,
    update_timestamp: bool,
) -> Result<UploadFails, OxenError> {
    let workspace_id = workspace_id.as_ref();
    let directory = directory.as_ref();
    if paths.is_empty() {
        return Ok(vec![]);
    }

    let glob_opts = GlobOpts {
        paths,
        staged_db: false,
        merkle_tree: false,
        working_dir: true,
        walk_dirs: true,
    };
    // Consume the expanded set directly instead of cloning every element.
    let expanded_paths: Vec<PathBuf> =
        util::glob::parse_glob_paths(&glob_opts, local_repo.as_ref())?
            .into_iter()
            .collect();
    let n_expected_uploads = expanded_paths.len();

    // Clone the repository exactly once, only to build the owned enum variant.
    let local_or_base = local_repo
        .as_ref()
        .map(|repo| LocalOrBase::Local(repo.clone()));

    let failed_to_upload = upload_multiple_files(
        remote_repo,
        workspace_id,
        directory,
        expanded_paths,
        local_or_base.as_ref(),
        update_timestamp,
    )
    .await?;

    print_add_result(workspace_id, n_expected_uploads, &failed_to_upload);
    Ok(failed_to_upload)
}
/// Prints a one-line summary of an upload to stdout.
///
/// * `n_total` - number of entries that were expected to be uploaded.
/// * `failed_to_upload` - entries reported as failed.
///
/// The success count is computed with saturating subtraction so that a failure
/// list longer than `n_total` reports zero successes instead of panicking (debug)
/// or wrapping (release).
fn print_add_result(workspace_id: &str, n_total: usize, failed_to_upload: &[ErrorFileInfo]) {
    let n_fail = failed_to_upload.len();
    if n_fail == 0 {
        println!("🐂 oxen added {n_total} entries to workspace {workspace_id}");
        return;
    }
    let n_success = n_total.saturating_sub(n_fail);
    println!(
        "🐂 oxen added {n_success} entries to workspace {workspace_id} but 😱 failed to upload {n_fail} entries",
    );
}
/// Result of an add operation, split into three groups of paths.
///
/// NOTE(review): this type is not referenced in the visible part of this file;
/// the field descriptions below are inferred from names only — confirm against callers.
pub struct AddResult {
/// Presumably the commit together with the paths that were added, if any.
pub added: Option<(Commit, Vec<PathBuf>)>,
/// Presumably paths rejected because they are outside the base directory.
pub not_in_base: Vec<PathBuf>,
/// Presumably paths rejected because they are not regular files.
pub not_file: Vec<PathBuf>,
}
/// Rewrites every entry of `paths` to an absolute path and validates it.
///
/// Relative entries are joined onto `base_dir` first. Each entry must then
/// point at an existing regular file and must start with `base_dir`.
///
/// NOTE(review): `std::path::absolute` does not resolve symlinks and, on POSIX,
/// does not collapse `..` components, so the `starts_with` test is purely
/// lexical. Confirm whether paths such as `base/../other` need to be rejected.
///
/// # Errors
/// Returns an error for the first entry that cannot be made absolute, is not a
/// file, or lies outside `base_dir`. Entries before it have already been
/// rewritten in place.
fn resolve_paths_in_place(base_dir: &Path, paths: &mut [PathBuf]) -> Result<(), OxenError> {
    for path in paths.iter_mut() {
        if !path.is_absolute() {
            *path = base_dir.join(&*path);
        }
        *path = std::path::absolute(&*path)?;

        if !path.is_file() {
            return Err(OxenError::basic_str(format!(
                "Cannot upload non-existent file: {}",
                path.display()
            )));
        }
        if !path.starts_with(base_dir) {
            return Err(OxenError::basic_str(format!(
                "Cannot upload path that doesn't exist in base directory ({}): {}",
                base_dir.display(),
                path.display()
            )));
        }
    }
    Ok(())
}
/// Uploads `paths` to the workspace, keeping each file's location relative to
/// `base_dir` as its location in the workspace.
///
/// `base_dir` is made absolute and must be an existing directory; every path is
/// resolved against it and must be an existing file inside it.
///
/// Returns the entries that failed to upload and prints a summary line.
///
/// # Errors
/// Fails when `base_dir` is not a directory, `paths` is empty, any path fails
/// validation, or the upload returns an error.
pub async fn add_files(
    remote_repo: &RemoteRepository,
    workspace_id: impl AsRef<str>,
    base_dir: impl AsRef<Path>,
    paths: Vec<PathBuf>,
) -> Result<Vec<ErrorFileInfo>, OxenError> {
    let base_dir = std::path::absolute(base_dir)?;
    if !base_dir.is_dir() {
        let message = format!("base_dir is not a directory: {}", base_dir.display());
        return Err(OxenError::basic_str(message));
    }
    if paths.is_empty() {
        return Err(OxenError::basic_str("No paths to add!"));
    }
    let workspace_id = workspace_id.as_ref();

    // Normalise and validate all inputs before anything is sent.
    let mut resolved = paths;
    resolve_paths_in_place(&base_dir, &mut resolved)?;
    let n_expected_uploads = resolved.len();

    let base_dir_enum = LocalOrBase::Base(base_dir);
    let failed_to_upload = upload_multiple_files(
        remote_repo,
        workspace_id,
        "",
        resolved,
        Some(&base_dir_enum),
        false,
    )
    .await?;

    print_add_result(workspace_id, n_expected_uploads, &failed_to_upload);
    Ok(failed_to_upload)
}
/// Uploads the in-memory buffer `buf` to the workspace as a file named after
/// `path`, inside `directory`, and prints a confirmation line on success.
///
/// # Errors
/// Propagates any error returned by [`upload_bytes_as_file`].
pub async fn add_bytes(
    remote_repo: &RemoteRepository,
    workspace_id: impl AsRef<str>,
    directory: impl AsRef<str>,
    path: PathBuf,
    buf: &[u8],
) -> Result<(), OxenError> {
    let workspace_id = workspace_id.as_ref();
    let directory = directory.as_ref();
    // Shadow `path` with the path reported back by the server.
    let path = upload_bytes_as_file(remote_repo, workspace_id, directory, &path, buf).await?;
    println!("🐂 oxen added entry {path:?} to workspace {workspace_id}");
    Ok(())
}
/// Uploads one file to the workspace.
///
/// Files larger than `chunk_size()` go through the parallel large-file upload;
/// everything else is sent as a single multipart request.
///
/// # Errors
/// Returns a "path does not exist" error when the file's metadata cannot be
/// read, otherwise whatever the chosen upload path returns.
pub async fn upload_single_file(
    remote_repo: &RemoteRepository,
    workspace_id: impl AsRef<str>,
    directory: impl AsRef<Path>,
    path: impl AsRef<Path>,
) -> Result<PathBuf, OxenError> {
    let path = path.as_ref();
    let file_len = match path.metadata() {
        Ok(metadata) => metadata.len(),
        Err(_) => return Err(OxenError::path_does_not_exist(path)),
    };
    log::debug!("Uploading file with size: {}", file_len);

    // Small files take the simple single-request route.
    if file_len <= chunk_size() {
        return p_upload_single_file(remote_repo, workspace_id, directory, path).await;
    }

    let directory = directory.as_ref();
    api::client::versions::parallel_large_file_upload(
        remote_repo,
        path,
        Some(directory),
        Some(workspace_id.as_ref().to_string()),
        false,
        None,
        None,
    )
    .await
    .map(|upload| upload.local_path)
}
/// Uploads the bytes in `buf` to the workspace as a file named after `path`
/// inside `directory`.
///
/// Public entry point that delegates directly to `p_upload_bytes_as_file`.
///
/// # Errors
/// Returns whatever error `p_upload_bytes_as_file` returns.
pub async fn upload_bytes_as_file(
remote_repo: &RemoteRepository,
workspace_id: impl AsRef<str>,
directory: impl AsRef<Path>,
path: impl AsRef<Path>,
buf: &[u8],
) -> Result<PathBuf, OxenError> {
p_upload_bytes_as_file(remote_repo, workspace_id, directory, path, buf).await
}
/// Uploads `paths` to the workspace, routing each file by size.
///
/// Files larger than `chunk_size()` are uploaded one at a time through the
/// parallel large-file upload; the rest are handed to
/// [`parallel_batched_small_file_upload`]. Before anything is sent, the total
/// byte count is checked with [`validate_upload_feasibility`].
///
/// * `local_or_base` - `Local`: every path is re-rooted under the repository
///   path. `Base`: a large file's destination directory is its parent relative
///   to the base directory. Otherwise `directory` is the destination.
///
/// Returns the files that failed to upload. A failed large file is recorded and
/// does not abort the remaining uploads.
///
/// # Errors
/// Fails if a path does not exist, its metadata cannot be read, the feasibility
/// check fails, or the small-file upload returns an error.
async fn upload_multiple_files(
    remote_repo: &RemoteRepository,
    workspace_id: impl AsRef<str>,
    directory: impl AsRef<Path>,
    paths: Vec<PathBuf>,
    local_or_base: Option<&LocalOrBase>,
    update_timestamp: bool,
) -> Result<Vec<ErrorFileInfo>, OxenError> {
    if paths.is_empty() {
        return Ok(vec![]);
    }
    let workspace_id = workspace_id.as_ref();
    let directory = directory.as_ref();
    let large_file_threshold = chunk_size();

    let mut large_files = Vec::new();
    let mut large_files_size = 0;
    let mut small_files = Vec::new();
    let mut small_files_size = 0;
    let mut failed_to_upload = vec![];

    // Pass 1: resolve each path and sort it into the large or small bucket.
    for path in paths {
        let path = match local_or_base {
            Some(LocalOrBase::Local(local_repository)) => {
                let repo_path = &local_repository.path;
                let relative_path = util::fs::path_relative_to_dir(path, repo_path)?;
                repo_path.join(&relative_path)
            }
            Some(LocalOrBase::Base(_)) | None => path,
        };
        if !path.exists() {
            log::debug!("Path does not exist: {path:?}");
            return Err(OxenError::path_does_not_exist(path));
        }
        let file_size = match path.metadata() {
            Ok(metadata) => metadata.len(),
            Err(err) => {
                log::debug!("Failed to get metadata for file {path:?}: {err}");
                return Err(OxenError::file_metadata_error(path, err));
            }
        };
        if file_size > large_file_threshold {
            large_files.push((path, file_size));
            large_files_size += file_size;
        } else {
            small_files.push((path, file_size));
            small_files_size += file_size;
        }
    }

    let total_size = large_files_size + small_files_size;
    validate_upload_feasibility(remote_repo, workspace_id, total_size).await?;

    // Pass 2: large files, one after another.
    for (path, _) in large_files {
        let dst_dir = match local_or_base {
            Some(LocalOrBase::Base(base_dir)) => {
                let rel = util::fs::path_relative_to_dir(&path, base_dir)?;
                rel.parent().map(|p| p.to_path_buf()).unwrap_or_default()
            }
            Some(LocalOrBase::Local(_)) | None => directory.to_path_buf(),
        };
        match api::client::versions::parallel_large_file_upload(
            remote_repo,
            &path,
            Some(&dst_dir),
            Some(workspace_id.to_string()),
            update_timestamp,
            None,
            None,
        )
        .await
        {
            Ok(_) => log::debug!("Successfully uploaded large file: {path:?}"),
            Err(err) => {
                let msg = format!("Failed to upload large file {path:?}");
                log::error!("{msg}: {err}");
                // The hash is only needed for the failure report, so the file is
                // read for hashing only here and not on every successful upload.
                let hash = util::hasher::hash_file_contents(&path).unwrap_or_default();
                failed_to_upload.push(ErrorFileInfo {
                    hash,
                    path: Some(path),
                    error: msg,
                });
            }
        }
    }

    // Pass 3: small files, batched and uploaded concurrently.
    let err_files_small_upload = parallel_batched_small_file_upload(
        remote_repo,
        workspace_id,
        directory,
        small_files,
        small_files_size,
        local_or_base,
        update_timestamp,
    )
    .await?;
    failed_to_upload.extend(err_files_small_upload);
    Ok(failed_to_upload)
}
/// Uploads `small_files` in gzip-compressed multipart batches and stages them
/// in the workspace.
///
/// The work is split across two spawned tasks connected by a bounded channel:
/// * a producer that, per batch, reads, hashes and gzips each file on a
///   blocking thread and sends the processed batch down the channel;
/// * a consumer that uploads each processed batch and then stages its files
///   in the workspace under `directory`.
///
/// When `local_or_base` is a local repository with a head commit and
/// `update_timestamp` is false, files that are unmodified relative to that
/// commit are skipped.
///
/// Returns the per-file errors collected during upload and staging. An empty
/// `small_files` returns an empty list immediately.
///
/// # Errors
/// Fails if the head commit lookup or HTTP client construction fails, if a
/// spawned task cannot be joined, or if any batch recorded an operational
/// error (only the first such error is returned).
pub(crate) async fn parallel_batched_small_file_upload(
remote_repo: &RemoteRepository,
workspace_id: impl AsRef<str>,
directory: impl AsRef<Path>,
small_files: Vec<(PathBuf, u64)>,
small_files_size: u64,
local_or_base: Option<&LocalOrBase>,
update_timestamp: bool,
) -> Result<Vec<ErrorFileInfo>, OxenError> {
if small_files.is_empty() {
return Ok(vec![]);
}
// Resolve three things from `local_or_base`: the directory that paths are
// made relative to, the optional (head commit, repo) pair used to skip
// unmodified files, and whether staged paths keep their relative directories.
let (base_or_repo_path, head_commit_local_repo_maybe, keep_relative_paths) = match local_or_base
{
Some(LocalOrBase::Local(local_repository)) => {
let head_commit_maybe = repositories::commits::head_commit_maybe(local_repository)?;
let head_commit_exists = head_commit_maybe.is_some();
(
local_repository.path.clone(),
head_commit_maybe.map(|head_commit| (head_commit, local_repository.clone())),
head_commit_exists,
)
}
Some(LocalOrBase::Base(base_dir)) => (base_dir.to_path_buf(), None, true),
None => (PathBuf::new(), None, false),
};
log::debug!(
"Uploading {} small files (total {} bytes)",
small_files.len(),
small_files_size
);
let workspace_id = workspace_id.as_ref().to_string();
let directory = directory.as_ref().to_str().unwrap_or_default().to_string();
// A batch of (path, size) pairs handed to the producer.
type PieceOfWork = Vec<(PathBuf, u64)>;
// What the producer sends to the consumer: multipart parts, the files to stage, and the batch byte count.
type ProcessedBatch = (Vec<reqwest::multipart::Part>, Vec<FileWithHash>, u64);
// Group files into batches. A batch is closed once its cumulative size
// exceeds `chunk_size()` or the last file has been added.
let mut file_batches: Vec<PieceOfWork> = Vec::new();
let mut current_batch: PieceOfWork = Vec::new();
let mut current_batch_size = 0;
let mut total_size = 0;
for (idx, (path, file_size)) in small_files.iter().enumerate() {
current_batch.push((path.clone(), *file_size));
current_batch_size += file_size;
if current_batch_size > chunk_size() || idx >= small_files.len() - 1 {
file_batches.push(current_batch.clone());
current_batch.clear();
total_size += current_batch_size;
current_batch_size = 0;
}
}
// Shared state: one HTTP client, the per-file error list, and the list of
// operational (batch-level) errors.
let client = Arc::new(api::client::builder_for_remote_repo(remote_repo)?.build()?);
let err_files: Arc<Mutex<Vec<ErrorFileInfo>>> = Arc::new(Mutex::new(vec![]));
let errors = Arc::new(Mutex::new(Vec::new()));
let worker_count = concurrency::num_threads_for_items(file_batches.len());
// Bounded channel between producer and consumer; its capacity is the worker count.
let (tx, rx) = mpsc::channel(worker_count);
let progress = Arc::new(PushProgress::new_with_totals(
small_files.len() as u64,
total_size,
));
let producer_errors = Arc::clone(&errors);
let head_commit_local_repo_maybe_clone = head_commit_local_repo_maybe.clone();
// Producer task: processes up to `worker_count` batches concurrently.
let producer_handle = tokio::spawn(async move {
stream::iter(file_batches)
.for_each_concurrent(worker_count, {
let head_commit_local_repo_maybe_clone = head_commit_local_repo_maybe_clone.clone();
move |batch| {
let base_or_repo_path_clone = base_or_repo_path.clone();
let head_commit_local_repo_maybe_clone =
head_commit_local_repo_maybe_clone.clone();
let errors = Arc::clone(&producer_errors);
let tx_clone = tx.clone();
async move {
let base_or_repo_path_clone = base_or_repo_path_clone.clone();
let head_commit_local_repo_maybe_clone =
head_commit_local_repo_maybe_clone.clone();
let result: Result<(), OxenError> = async move {
let mut batch_size = 0;
let mut batch_parts = Vec::new();
let mut files_to_stage = Vec::new();
log::debug!(
"Starting file processing loop with {:?} files",
batch.len()
);
for (path, size) in batch {
let base_or_repo_path_clone = base_or_repo_path_clone.clone();
let head_commit_local_repo_maybe_clone =
head_commit_local_repo_maybe_clone.clone();
// File reading, hashing and compression run on a blocking thread.
// `None` means the file was skipped as unmodified.
let file_data_maybe: Option<(
reqwest::multipart::Part,
String,
PathBuf,
u64,
)> = tokio::task::spawn_blocking(move || {
let relative_path = util::fs::path_relative_to_dir(
&path,
&base_or_repo_path_clone,
)?;
// Skip files that are unchanged relative to the head commit,
// unless the caller asked for timestamps to be updated.
if !update_timestamp
&& let Some((ref head_commit, ref local_repository)) =
head_commit_local_repo_maybe_clone
&& let Some(file_node) =
repositories::tree::get_file_by_path(
local_repository,
head_commit,
&relative_path,
)?
&& !util::fs::is_modified_from_node(&path, &file_node)?
{
log::debug!("Skipping add on unmodified path {path:?}");
return Ok(None);
}
// Either keep the path relative to the base/repo, or stage by file name only.
// NOTE(review): `unwrap` panics if `relative_path` has no final component.
let staging_path = if keep_relative_paths {
relative_path
} else {
PathBuf::from(relative_path.file_name().unwrap())
};
let file = std::fs::read(&path).map_err(|e| {
OxenError::basic_str(format!(
"Failed to read file '{path:?}': {e}"
))
})?;
// The hash is taken over the uncompressed contents.
let hash = hasher::hash_buffer(&file);
let compressed_bytes: Vec<u8> = {
let mut encoder =
GzEncoder::new(Vec::new(), Compression::default());
std::io::copy(&mut file.as_slice(), &mut encoder).map_err(
|e| {
OxenError::basic_str(format!(
"Failed to copy file '{path:?}' to encoder: {e}"
))
},
)?;
match encoder.finish() {
Ok(bytes) => bytes,
Err(e) => {
return Err(OxenError::basic_str(format!(
"Failed to finish gzip for file {}: {}",
&hash, e
)));
}
}
};
// The multipart part is named after the content hash, not the file name.
let file_part =
reqwest::multipart::Part::bytes(compressed_bytes)
.file_name(hash.clone())
.mime_str("application/gzip")?;
Ok(Some((file_part, hash, staging_path, size)))
})
.await??;
let (file_part, file_hash, file_path, file_size) =
match file_data_maybe {
Some(data) => data,
None => continue,
};
batch_parts.push(file_part);
files_to_stage.push(FileWithHash {
hash: file_hash,
path: file_path,
});
batch_size += file_size;
}
let processed_batch: ProcessedBatch =
(batch_parts, files_to_stage, batch_size);
match tx_clone.send(processed_batch).await {
Ok(_) => Ok(()),
Err(e) => Err(OxenError::basic_str(format!("{e:?}"))),
}
}
.await;
// A failure anywhere in the batch means the batch is never sent;
// the failure is recorded as an operational error instead.
if let Err(e) = result {
errors.lock().push(OxenError::basic_str(format!("{e:?}")));
}
}
}
})
.await;
});
let client_clone = client.clone();
let workspace_id_clone = workspace_id.clone();
let remote_repo_clone = remote_repo.clone();
let directory_clone = directory.clone();
let local_or_base_clone = local_or_base.cloned();
let consumer_err_files = Arc::clone(&err_files);
let consumer_errors = Arc::clone(&errors);
let progress_clone = Arc::clone(&progress);
// Consumer task: uploads each processed batch, then stages its files in the workspace.
let consumer_handle = tokio::spawn(async move {
let rx_stream = ReceiverStream::new(rx);
rx_stream
.for_each_concurrent(
worker_count,
|processed_batch| {
let client_clone = client_clone.clone();
let remote_repo_clone = remote_repo_clone.clone();
let workspace_id_clone = workspace_id_clone.clone();
let directory_str = directory_clone.clone();
let local_or_base_clone = local_or_base_clone.clone();
let err_files_clone = Arc::clone(&consumer_err_files);
let errors = Arc::clone(&consumer_errors);
let bar = Arc::clone(&progress_clone);
async move {
let result: Result<(), OxenError> = async move {
let (current_batch_parts, files_to_stage, current_batch_size) = processed_batch;
let num_entries = current_batch_parts.len();
let mut form = reqwest::multipart::Form::new();
for part in current_batch_parts {
form = form.part("file[]", part);
}
let mut files_to_retry = files_to_stage.clone();
match api::client::versions::workspace_multipart_batch_upload_parts_with_retry(
&remote_repo_clone,
Arc::clone(&client_clone),
form,
&mut files_to_retry,
local_or_base_clone.as_ref(),
)
.await
{
Ok(upload_err_files) => {
// Record the per-file upload failures; they are also passed to the
// staging call, which leaves those files out of the request.
if !upload_err_files.is_empty() {
let mut err_files = err_files_clone.lock();
err_files.extend(upload_err_files.clone());
}
log::debug!(
"Version file upload successful with {:?} err files. Beginning staging for {:?} files",
upload_err_files.len(),
files_to_stage.len()
);
match stage_files_to_workspace_with_retry(
&remote_repo_clone,
client_clone,
&workspace_id_clone,
Arc::new(files_to_stage),
&directory_str,
upload_err_files,
update_timestamp,
)
.await
{
Ok(staging_err_files) => {
log::debug!("Successfully staged files to workspace with errs {:?}", staging_err_files.len());
// Progress only advances once staging has succeeded.
bar.add_bytes(current_batch_size);
bar.add_files(num_entries as u64);
if !staging_err_files.is_empty() {
let mut err_files = err_files_clone.lock();
err_files.extend(staging_err_files.clone());
}
}
Err(e) => {
log::error!("failed to stage files to workspace: {e}");
return Err(OxenError::basic_str(format!(
"failed to stage to workspace: {e}"
)));
}
}
Ok(())
}
Err(e) => {
// The batch upload failed: mark every file of the batch as failed
// and report an operational error as well.
let mut err_files = err_files_clone.lock();
err_files.extend(
files_to_stage
.iter()
.map(|f| ErrorFileInfo {
hash: f.hash.clone(),
path: Some(f.path.clone()),
error: format!("{e:?}"),
})
.collect::<Vec<ErrorFileInfo>>()
);
log::error!("failed to upload version files to workspace: {e}");
Err(OxenError::basic_str(format!(
"failed to upload version files to workspace: {e}"
)))
}
}
}.await;
if let Err(e) = result {
errors.lock().push(OxenError::basic_str(format!("{e:?}")));
}
}
}
)
.await;
});
// Wait for both tasks; a task that could not be joined is returned as an error.
tokio::try_join!(producer_handle, consumer_handle)?;
// Both tasks have finished, so these Arcs should now be uniquely owned;
// a failing `try_unwrap` is reported as an error rather than ignored.
let mutex = match Arc::try_unwrap(err_files) {
Ok(mutex) => mutex,
Err(e) => {
let err = format!("Couldn't acquire mutex guard for err_files: {e:?}");
log::error!("{err}");
return Err(OxenError::basic_str(&err));
}
};
let err_files = mutex.into_inner();
let operational_errors = match Arc::try_unwrap(errors) {
Ok(mutex) => mutex.into_inner(),
Err(e) => {
let err = format!("Couldn't acquire mutex guard for errors: {e:?}");
log::error!("{err}");
return Err(OxenError::basic_str(&err));
}
};
log::debug!("All upload tasks completed");
progress.finish();
// Operational errors take precedence over per-file errors; only the first
// one is returned, the others are reflected only in the logged count.
if !operational_errors.is_empty() {
log::error!(
"Encountered {} fatal error(s) during upload",
operational_errors.len()
);
return Err(operational_errors.into_iter().next().unwrap());
}
if !err_files.is_empty() {
log::error!("Failed to upload {} files after retry", err_files.len());
Ok(err_files)
} else {
Ok(vec![])
}
}
/// Calls [`stage_files_to_workspace`] up to `max_retries()` times, sleeping
/// between failed attempts for the duration given by [`exponential_backoff`].
///
/// Returns the staging error list from the first attempt that succeeds.
///
/// # Errors
/// Returns an error once the last permitted attempt has failed, or immediately
/// when no attempts are permitted at all.
pub async fn stage_files_to_workspace_with_retry(
    remote_repo: &RemoteRepository,
    client: Arc<reqwest::Client>,
    workspace_id: impl AsRef<str>,
    files_to_add: Arc<Vec<FileWithHash>>,
    directory_str: impl AsRef<str>,
    err_files: Vec<ErrorFileInfo>,
    update_timestamp: bool,
) -> Result<Vec<ErrorFileInfo>, OxenError> {
    let directory_str = directory_str.as_ref();
    let workspace_id = workspace_id.as_ref().to_string();
    let retry_limit = max_retries();

    for attempt in 1..=retry_limit {
        let outcome = stage_files_to_workspace(
            remote_repo,
            client.clone(),
            &workspace_id,
            files_to_add.clone(),
            directory_str,
            err_files.clone(),
            update_timestamp,
        )
        .await;

        let e = match outcome {
            Ok(stage_err_files) => return Ok(stage_err_files),
            Err(e) => e,
        };
        log::error!("Error staging files to workspace: {e:?}");
        if attempt == retry_limit {
            return Err(OxenError::basic_str(format!(
                "failed to stage files to workspace after retries: {e:?}"
            )));
        }

        // Back off before the next attempt.
        let wait_time = exponential_backoff(BASE_WAIT_TIME, attempt, MAX_WAIT_TIME);
        sleep(Duration::from_millis(wait_time as u64)).await;
    }

    // Only reachable when the retry limit is zero.
    log::error!(
        "Error: Failed to stage files_to_add: {:?}",
        files_to_add.len()
    );
    Err(OxenError::basic_str(
        "failed to stage files to workspace after retries",
    ))
}
/// Posts the list of uploaded files to the workspace's `versions` endpoint so
/// the server stages them under `directory_str`.
///
/// Files whose hash appears in `err_files` are left out of the request. When
/// `update_timestamp` is set, `?update_timestamp=true` is appended to the URL.
///
/// Returns the error list contained in the server's response.
///
/// # Errors
/// Fails if the URL cannot be built, the request fails, or the response body
/// cannot be parsed.
pub async fn stage_files_to_workspace(
    remote_repo: &RemoteRepository,
    client: Arc<reqwest::Client>,
    workspace_id: impl AsRef<str>,
    files_to_add: Arc<Vec<FileWithHash>>,
    directory_str: impl AsRef<str>,
    err_files: Vec<ErrorFileInfo>,
    update_timestamp: bool,
) -> Result<Vec<ErrorFileInfo>, OxenError> {
    let workspace_id = workspace_id.as_ref();
    let directory_str = directory_str.as_ref();
    let uri = match update_timestamp {
        true => {
            format!("/workspaces/{workspace_id}/versions/{directory_str}?update_timestamp=true")
        }
        false => format!("/workspaces/{workspace_id}/versions/{directory_str}"),
    };
    let url = api::endpoint::url_from_repo(remote_repo, &uri)?;

    let files_to_send: Vec<FileWithHash> = if err_files.is_empty() {
        files_to_add.to_vec()
    } else {
        // Drop everything that already failed during upload.
        let failed_hashes: std::collections::HashSet<String> =
            err_files.iter().map(|f| f.hash.clone()).collect();
        files_to_add
            .iter()
            .filter(|f| !failed_hashes.contains(&f.hash))
            .cloned()
            .collect()
    };
    log::debug!("Files to send: {:?}", files_to_send.len());

    let response = client.post(&url).json(&files_to_send).send().await?;
    let body = client::parse_json_body(&url, response).await?;
    let parsed: ErrorFilesResponse = serde_json::from_str(&body)?;
    Ok(parsed.err_files)
}
/// Reads the file at `path` fully into memory and posts it, uncompressed, as a
/// single multipart `file` part to the workspace's `files/{directory}` endpoint.
///
/// Returns the first path listed in the server's response.
///
/// # Errors
/// Fails if the file cannot be read, the request fails, the response cannot be
/// parsed, or the response lists no paths.
///
/// # Panics
/// Panics if `path` has no final file-name component.
async fn p_upload_single_file(
    remote_repo: &RemoteRepository,
    workspace_id: impl AsRef<str>,
    directory: impl AsRef<Path>,
    path: impl AsRef<Path>,
) -> Result<PathBuf, OxenError> {
    let workspace_id = workspace_id.as_ref();
    let directory = directory.as_ref();
    let directory_name = directory.to_string_lossy();
    let path = path.as_ref();
    log::debug!("multipart_file_upload path: {path:?}");

    let file = match std::fs::read(path) {
        Ok(bytes) => bytes,
        Err(_) => {
            let err = format!("Error reading file at path: {path:?}");
            return Err(OxenError::basic_str(err));
        }
    };

    let uri = format!("/workspaces/{workspace_id}/files/{directory_name}");
    let url = api::endpoint::url_from_repo(remote_repo, &uri)?;

    let file_name: String = path.file_name().unwrap().to_string_lossy().into();
    log::info!("api::client::workspaces::files::add sending file_name: {file_name:?}");
    let form = reqwest::multipart::Form::new().part(
        "file",
        reqwest::multipart::Part::bytes(file).file_name(file_name),
    );

    let client = client::new_for_url(&url)?;
    let response = client.post(&url).multipart(form).send().await?;
    let body = client::parse_json_body(&url, response).await?;

    let val: FilePathsResponse = match serde_json::from_str(&body) {
        Ok(val) => val,
        Err(err) => {
            let err = format!(
                "api::staging::add_file error parsing response from {url}\n\nErr {err:?} \n\n{body}"
            );
            return Err(OxenError::basic_str(err));
        }
    };
    log::debug!("File path response: {val:?}");
    val.paths
        .first()
        .cloned()
        .ok_or_else(|| OxenError::basic_str("No file path returned from server"))
}
/// Gzips `buf` and posts it as a single multipart `file[]` part, named after
/// the final component of `path`, to the workspace's `files/{directory}` endpoint.
///
/// Buffers larger than `WORKSPACE_ADD_LIMIT` bytes are rejected before any
/// network traffic.
///
/// Returns the first path listed in the server's response.
///
/// # Errors
/// Fails if the buffer is too large, `path` has no file-name component (it used
/// to panic in that case), compression fails, the request fails, the response
/// cannot be parsed, or the response lists no paths.
async fn p_upload_bytes_as_file(
    remote_repo: &RemoteRepository,
    workspace_id: impl AsRef<str>,
    directory: impl AsRef<Path>,
    path: impl AsRef<Path>,
    mut buf: &[u8],
) -> Result<PathBuf, OxenError> {
    let limit = WORKSPACE_ADD_LIMIT;
    // Saturate rather than panic if `usize` were ever wider than `u64`;
    // a saturated value is rejected by the limit check below anyway.
    let total_size = u64::try_from(buf.len()).unwrap_or(u64::MAX);
    if total_size > limit {
        let error_msg = format!(
            "Total size of files to upload is too large. {} > {} Consider using `oxen push` instead for now until upload supports bulk push.",
            ByteSize::b(total_size),
            ByteSize::b(limit)
        );
        return Err(OxenError::basic_str(error_msg));
    }

    let workspace_id = workspace_id.as_ref();
    let directory = directory.as_ref();
    let directory_name = directory.to_string_lossy();
    let path = path.as_ref();
    log::debug!("multipart_file_upload path: {path:?}");

    // `path` is caller-supplied: an empty path or one ending in `..` has no file name.
    let Some(file_name) = path.file_name() else {
        return Err(OxenError::basic_str(format!(
            "Cannot upload bytes: path has no file name: {path:?}"
        )));
    };
    let file_name: String = file_name.to_string_lossy().into();
    log::info!("uploading bytes with file_name: {file_name:?}");

    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    std::io::copy(&mut buf, &mut encoder)?;
    let compressed_bytes = match encoder.finish() {
        Ok(bytes) => bytes,
        Err(e) => {
            return Err(OxenError::basic_str(format!(
                "Failed to finish gzip for file {}: {}",
                &file_name, e
            )));
        }
    };

    let file_part = reqwest::multipart::Part::bytes(compressed_bytes)
        .file_name(file_name)
        .mime_str("application/gzip")?;
    let form = reqwest::multipart::Form::new().part("file[]", file_part);

    let uri = format!("/workspaces/{workspace_id}/files/{directory_name}");
    let url = api::endpoint::url_from_repo(remote_repo, &uri)?;
    let client = client::new_for_url(&url)?;
    let response = client.post(&url).multipart(form).send().await?;
    let body = client::parse_json_body(&url, response).await?;

    let result: Result<FilePathsResponse, serde_json::Error> = serde_json::from_str(&body);
    match result {
        Ok(val) => {
            log::debug!("File path response: {val:?}");
            val.paths
                .first()
                .cloned()
                .ok_or_else(|| OxenError::basic_str("No file path returned from server"))
        }
        Err(err) => {
            let err = format!(
                "api::staging::add_file error parsing response from {url}\n\nErr {err:?} \n\n{body}"
            );
            Err(OxenError::basic_str(err))
        }
    }
}
/// Sends a DELETE request for `path` to the workspace's `files` endpoint.
///
/// The response body is only logged.
///
/// # Errors
/// Fails if the URL cannot be built, the request fails, or the response body
/// cannot be parsed.
pub async fn rm(
    remote_repo: &RemoteRepository,
    workspace_id: &str,
    path: impl AsRef<Path>,
) -> Result<(), OxenError> {
    let url = {
        let file_name = path.as_ref().to_string_lossy();
        let uri = format!("/workspaces/{workspace_id}/files/{file_name}");
        api::endpoint::url_from_repo(remote_repo, &uri)?
    };
    log::debug!("rm_file {url}");

    let client = client::new_for_url(&url)?;
    let response = client.delete(&url).send().await?;
    let body = client::parse_json_body(&url, response).await?;
    log::debug!("rm_file got body: {body}");
    Ok(())
}
/// Stages `paths` as removed in the remote workspace.
///
/// Paths are expanded with the glob options set below, made relative to the
/// repository root, and sent in one DELETE request. On success a confirmation
/// line is printed, and when the repository is in remote mode the matching
/// local files and directories are deleted as well.
///
/// # Errors
/// Fails if glob expansion fails, a path cannot be made relative to the
/// repository (this used to panic), the request fails or returns a non-success
/// status, or a local deletion fails.
pub async fn rm_files(
    local_repo: &LocalRepository,
    remote_repo: &RemoteRepository,
    workspace_id: impl AsRef<str>,
    paths: Vec<PathBuf>,
) -> Result<(), OxenError> {
    let workspace_id = workspace_id.as_ref();
    let glob_opts = GlobOpts {
        // Cloned because `paths` is still needed for the confirmation message.
        paths: paths.clone(),
        staged_db: false,
        merkle_tree: true,
        working_dir: false,
        walk_dirs: false,
    };
    let matched_paths: HashSet<PathBuf> =
        util::glob::parse_glob_paths(&glob_opts, Some(local_repo))?;

    let repo_path = &local_repo.path;
    // Collect into a `Result` so a bad path becomes an error, not a panic.
    let expanded_paths: Vec<PathBuf> = matched_paths
        .iter()
        .map(|p| util::fs::path_relative_to_dir(p, repo_path))
        .collect::<Result<_, _>>()?;

    let uri = format!("/workspaces/{workspace_id}/files");
    let url = api::endpoint::url_from_repo(remote_repo, &uri)?;
    log::debug!("rm_files: {url}");

    let client = client::new_for_url(&url)?;
    let response = client.delete(&url).json(&expanded_paths).send().await?;
    if !response.status().is_success() {
        log::error!("rm_files failed with status: {}", response.status());
        let body = client::parse_json_body(&url, response).await?;
        return Err(OxenError::basic_str(format!(
            "Error: Could not remove paths {body:?}"
        )));
    }

    let _body = client::parse_json_body(&url, response).await?;
    println!("🐂 oxen staged paths {paths:?} as removed in workspace {workspace_id}");

    // In remote mode, mirror the removal in the local working copy.
    if local_repo.is_remote_mode() {
        for path in expanded_paths {
            let full_path = local_repo.path.join(&path);
            if full_path.is_dir() {
                util::fs::remove_dir_all(&full_path)?;
            }
            if full_path.is_file() {
                util::fs::remove_file(&full_path)?;
            }
        }
    }
    Ok(())
}
pub async fn rm_files_from_staged(
local_repo: &LocalRepository,
remote_repo: &RemoteRepository,
workspace_id: impl AsRef<str>,
paths: Vec<PathBuf>,
) -> Result<(), OxenError> {
let workspace_id = workspace_id.as_ref();
let repo_path = local_repo.path.clone();
let mut expanded_paths: HashSet<PathBuf> = HashSet::new();
for path in paths.clone() {
let relative_path = util::fs::path_relative_to_dir(&path, local_repo.path.clone())?;
let full_path = repo_path.join(&relative_path);
if util::fs::is_glob_path(&full_path) {
let Some(ref head_commit) = repositories::commits::head_commit_maybe(local_repo)?
else {
return Err(OxenError::basic_str(
"Error: Cannot rm with glob paths in remote-mode repo without HEAD commit",
));
};
let glob_pattern = relative_path
.file_name()
.unwrap()
.to_string_lossy()
.to_string();
let root_path = PathBuf::from("");
let parent_path = relative_path.parent().unwrap_or(&root_path);
let Some(dir_node) = repositories::tree::get_dir_with_children(
local_repo,
head_commit,
parent_path,
None,
)?
else {
continue;
};
let dir_children = dir_node.list_paths()?;
for child_path in dir_children {
let child_str = child_path.to_string_lossy().to_string();
if glob_match(&glob_pattern, &child_str) {
expanded_paths.insert(parent_path.join(child_path.clone()));
}
}
} else {
expanded_paths.insert(relative_path);
}
}
log::debug!("expanded paths: {expanded_paths:?}");
let expanded_paths: Vec<PathBuf> = expanded_paths.iter().cloned().collect();
let uri = format!("/workspaces/{workspace_id}/staged");
let url = api::endpoint::url_from_repo(remote_repo, &uri)?;
log::debug!("rm_files: {url}");
let client = client::new_for_url(&url)?;
let response = client.delete(&url).json(&expanded_paths).send().await?;
let body = client::parse_json_body(&url, response).await?;
log::debug!("rm_files got body: {body}");
Ok(())
}
/// Sends a PATCH request for `path` to the workspace's `files` endpoint with a
/// JSON body of the form `{"new_path": …}`.
///
/// Returns the status message parsed from the response.
///
/// # Errors
/// Fails if the URL cannot be built, the request fails, or the response body
/// cannot be parsed as a status message.
pub async fn mv(
    remote_repo: &RemoteRepository,
    workspace_id: impl AsRef<str>,
    path: impl AsRef<Path>,
    new_path: impl AsRef<Path>,
) -> Result<view::StatusMessage, OxenError> {
    let workspace_id = workspace_id.as_ref();
    let file_path_str = path.as_ref().to_string_lossy();
    let uri = format!("/workspaces/{workspace_id}/files/{file_path_str}");
    let url = api::endpoint::url_from_repo(remote_repo, &uri)?;

    let payload = serde_json::json!({
        "new_path": new_path.as_ref().to_string_lossy()
    });
    let params = serde_json::to_string(&payload)?;

    let client = client::new_for_url(&url)?;
    let res = client.patch(&url).body(params).send().await?;
    let body = client::parse_json_body(&url, res).await?;

    serde_json::from_str::<view::StatusMessage>(&body).map_err(|err| {
        let err = format!(
            "api::workspaces::files::mv error parsing from {url}\n\nErr {err:?} \n\n{body}"
        );
        OxenError::basic_str(err)
    })
}
/// Downloads `path` from the workspace and streams it to disk.
///
/// Paths with a tabular extension are fetched from the `data_frames/download`
/// endpoint, all others from the `files` endpoint. The file is written to
/// `output_path`, or to `path` itself when no output path is given; a missing
/// parent directory is created first.
///
/// # Errors
/// Returns a "path does not exist" error on HTTP 404, a generic error on any
/// other non-success status, and otherwise any request or file I/O error.
pub async fn download(
    remote_repo: &RemoteRepository,
    workspace_id: &str,
    path: &str,
    output_path: Option<&Path>,
) -> Result<(), OxenError> {
    let uri = match util::fs::has_tabular_extension(path) {
        true => format!("/workspaces/{workspace_id}/data_frames/download/{path}"),
        false => format!("/workspaces/{workspace_id}/files/{path}"),
    };
    log::debug!("Downloading file from {workspace_id}/{path} to {output_path:?}");
    let url = api::endpoint::url_from_repo(remote_repo, &uri)?;
    log::debug!("Downloading file from {url}");

    let client = client::new_for_url(&url)?;
    let response = client.get(&url).send().await?;

    // Deal with failures first so the happy path reads straight through.
    let status = response.status();
    if !status.is_success() {
        if status == reqwest::StatusCode::NOT_FOUND {
            return Err(OxenError::path_does_not_exist(path));
        }
        log::error!("api::client::workspace::files::download failed with status: {status}");
        let body = client::parse_json_body(&url, response).await?;
        return Err(OxenError::basic_str(format!(
            "Error: Could not download file {body:?}"
        )));
    }

    let output_path = output_path.unwrap_or_else(|| Path::new(path));
    let output_dir = output_path.parent().unwrap_or_else(|| Path::new(""));
    if !output_dir.exists() {
        util::fs::create_dir_all(output_dir)?;
    }

    // Write the body chunk by chunk instead of buffering it all in memory.
    let mut file = tokio::fs::File::create(&output_path).await?;
    let mut stream = response.bytes_stream();
    while let Some(chunk) = stream.next().await {
        let bytes = chunk?;
        file.write_all(&bytes).await?;
    }
    file.flush().await?;
    Ok(())
}
/// Posts the planned upload size, in bytes, to the workspace's `validate`
/// endpoint.
///
/// The response body is parsed and then discarded.
///
/// # Errors
/// Fails if the URL cannot be built, the request fails, or parsing the
/// response body fails.
pub async fn validate_upload_feasibility(
    remote_repo: &RemoteRepository,
    workspace_id: &str,
    total_size: u64,
) -> Result<(), OxenError> {
    let uri = format!("/workspaces/{workspace_id}/validate");
    let url = api::endpoint::url_from_repo(remote_repo, &uri)?;
    let payload = ValidateUploadFeasibilityRequest { size: total_size };

    let request = client::new_for_url(&url)?
        .post(&url)
        .header("Content-Type", "application/json")
        .json(&payload);
    let response = request.send().await?;

    client::parse_json_body(&url, response).await?;
    Ok(())
}
/// Computes the delay before retry number `n`, capped at `max`.
///
/// The delay is `base_wait_time + n² + jitter()`. Despite the name, the growth
/// in `n` is quadratic, not exponential. The unit is whatever the caller uses;
/// this file converts the result to milliseconds.
///
/// All arithmetic saturates, so very large inputs yield `max` instead of
/// overflowing (a panic in debug builds, a wrap-around in release builds).
pub fn exponential_backoff(base_wait_time: usize, n: usize, max: usize) -> usize {
    base_wait_time
        .saturating_add(n.saturating_pow(2))
        .saturating_add(jitter())
        .min(max)
}
/// Returns a random value in the inclusive range `0..=500`, added to retry
/// delays by `exponential_backoff`.
fn jitter() -> usize {
thread_rng().gen_range(0..=500)
}
#[cfg(test)]
mod tests {
use crate::constants::DEFAULT_BRANCH_NAME;
use crate::error::OxenError;
use crate::model::{EntryDataType, NewCommitBody, RemoteRepository};
use crate::opts::CloneOpts;
use crate::opts::fetch_opts::FetchOpts;
use crate::view::workspaces::WorkspaceResponseWithStatus;
use crate::{api, constants};
use crate::{repositories, test};
use std::path::PathBuf;
use std::path::Path;
use tempfile::TempDir;
use uuid;
// Stages a single image into a new workspace on a fresh branch and checks that
// it shows up as the only added entry, at `images/dwight_vince.jpeg`.
#[tokio::test]
async fn test_stage_single_file() -> Result<(), OxenError> {
test::run_remote_repo_test_bounding_box_csv_pushed(|_local_repo, remote_repo| async move {
// Work on a dedicated branch created from the default branch.
let branch_name = "add-images";
let branch = api::client::branches::create_from_branch(
&remote_repo,
branch_name,
DEFAULT_BRANCH_NAME,
)
.await?;
assert_eq!(branch.name, branch_name);
let directory_name = "images";
let workspace_id = uuid::Uuid::new_v4().to_string();
let workspace =
api::client::workspaces::create(&remote_repo, &branch_name, &workspace_id).await?;
assert_eq!(workspace.id, workspace_id);
let path = test::test_img_file();
// No local repository is passed, so the file is uploaded as given.
let result = api::client::workspaces::files::add(
&remote_repo,
&workspace_id,
directory_name,
vec![path],
&None,
)
.await;
assert!(result.is_ok());
// An empty list means no file failed to upload.
let result = result.unwrap();
assert!(result.is_empty(), "{:?}", result);
let page_num = constants::DEFAULT_PAGE_NUM;
let page_size = constants::DEFAULT_PAGE_SIZE;
let path = Path::new(directory_name);
let entries = api::client::workspaces::changes::list(
&remote_repo,
&workspace_id,
path,
page_num,
page_size,
)
.await?;
assert_eq!(entries.added_files.entries.len(), 1);
assert_eq!(entries.added_files.total_entries, 1);
let assert_path = PathBuf::from("images").join(PathBuf::from("dwight_vince.jpeg"));
assert_eq!(
entries.added_files.entries[0].filename(),
assert_path.to_str().unwrap(),
);
Ok(remote_repo)
})
.await
}
/// Stages the `test::test_30k_parquet()` fixture through `files::add` and expects exactly one
/// added entry under the target directory.
#[tokio::test]
async fn test_stage_large_file() -> Result<(), OxenError> {
    test::run_remote_repo_test_bounding_box_csv_pushed(|_local_repo, remote_repo| async move {
        let branch_name = "add-large-file";
        let new_branch = api::client::branches::create_from_branch(
            &remote_repo,
            branch_name,
            DEFAULT_BRANCH_NAME,
        )
        .await?;
        assert_eq!(new_branch.name, branch_name);

        let workspace_id = uuid::Uuid::new_v4().to_string();
        let new_workspace =
            api::client::workspaces::create(&remote_repo, &branch_name, &workspace_id).await?;
        assert_eq!(new_workspace.id, workspace_id);

        let directory_name = "my_large_file";
        let large_file = test::test_30k_parquet();
        let add_outcome = api::client::workspaces::files::add(
            &remote_repo,
            &workspace_id,
            directory_name,
            vec![large_file],
            &None,
        )
        .await;
        assert!(add_outcome.is_ok());

        // Confirm the file shows up as staged in the workspace.
        let staged = api::client::workspaces::changes::list(
            &remote_repo,
            &workspace_id,
            Path::new(directory_name),
            constants::DEFAULT_PAGE_NUM,
            constants::DEFAULT_PAGE_SIZE,
        )
        .await?;
        assert_eq!(staged.added_files.entries.len(), 1);
        assert_eq!(staged.added_files.total_entries, 1);
        Ok(remote_repo)
    })
    .await
}
/// Stages two image files in one `files::add` call and expects both to be listed as added
/// under the `data` directory.
#[tokio::test]
async fn test_stage_multiple_files() -> Result<(), OxenError> {
    test::run_remote_repo_test_bounding_box_csv_pushed(|_local_repo, remote_repo| async move {
        let branch_name = "add-data";
        let new_branch = api::client::branches::create_from_branch(
            &remote_repo,
            branch_name,
            DEFAULT_BRANCH_NAME,
        )
        .await?;
        assert_eq!(new_branch.name, branch_name);

        let workspace_id = uuid::Uuid::new_v4().to_string();
        let new_workspace =
            api::client::workspaces::create(&remote_repo, &branch_name, &workspace_id).await?;
        assert_eq!(new_workspace.id, workspace_id);

        let directory_name = "data";
        let images = vec![
            test::test_img_file(),
            test::test_img_file_with_name("cole_anthony.jpeg"),
        ];
        let add_outcome = api::client::workspaces::files::add(
            &remote_repo,
            &workspace_id,
            directory_name,
            images,
            &None,
        )
        .await;
        assert!(add_outcome.is_ok());

        // Both files should be visible in the workspace status for `data`.
        let staged = api::client::workspaces::changes::list(
            &remote_repo,
            &workspace_id,
            Path::new(directory_name),
            constants::DEFAULT_PAGE_NUM,
            constants::DEFAULT_PAGE_SIZE,
        )
        .await?;
        assert_eq!(staged.added_files.entries.len(), 2);
        assert_eq!(staged.added_files.total_entries, 2);
        Ok(remote_repo)
    })
    .await
}
/// Uploads and commits three files one after another on the default branch of a remote repo
/// set up by `run_remote_created_and_readme_remote_repo_test`, checking the number of
/// `EntryDataType::Tabular` entries after each commit: 1, then 2, then still 2 after the
/// invalid parquet file is committed.
///
/// The same workspace id is reused for every phase, so the order of the statements matters.
#[tokio::test]
async fn test_create_remote_readme_repo_and_commit_multiple_data_frames()
-> Result<(), OxenError> {
test::run_remote_created_and_readme_remote_repo_test(|remote_repo| async move {
// Phase 1: upload a parquet file to the repo root and commit it.
let workspace_id = uuid::Uuid::new_v4().to_string();
let workspace =
api::client::workspaces::create(&remote_repo, DEFAULT_BRANCH_NAME, &workspace_id)
.await?;
assert_eq!(workspace.id, workspace_id);
let file_to_post = test::test_1k_parquet();
// An empty directory name targets the repository root.
let directory_name = "";
let result = api::client::workspaces::files::upload_single_file(
&remote_repo,
&workspace_id,
directory_name,
file_to_post,
)
.await;
println!("result: {result:?}");
assert!(result.is_ok());
let body = NewCommitBody {
message: "Add another data frame".to_string(),
author: "Test User".to_string(),
email: "test@oxen.ai".to_string(),
};
api::client::workspaces::commit(
&remote_repo,
DEFAULT_BRANCH_NAME,
&workspace_id,
&body,
)
.await?;
let entries = api::client::entries::list_entries_with_type(
&remote_repo,
"",
DEFAULT_BRANCH_NAME,
&EntryDataType::Tabular,
)
.await?;
// Only the parquet file just committed is expected to be tabular.
assert_eq!(entries.len(), 1);
// Phase 2: create a workspace again under the same id (presumably the previous one no
// longer exists after the commit; confirm), then upload and commit a CSV file.
let workspace =
api::client::workspaces::create(&remote_repo, DEFAULT_BRANCH_NAME, &workspace_id)
.await?;
assert_eq!(workspace.id, workspace_id);
let file_to_post = test::test_csv_file_with_name("emojis.csv");
let directory_name = "moare_data";
let result = api::client::workspaces::files::upload_single_file(
&remote_repo,
&workspace_id,
directory_name,
file_to_post,
)
.await;
println!("result: {result:?}");
assert!(result.is_ok());
let body = NewCommitBody {
message: "Add emojis data frame".to_string(),
author: "Test User".to_string(),
email: "test@oxen.ai".to_string(),
};
api::client::workspaces::commit(
&remote_repo,
DEFAULT_BRANCH_NAME,
&workspace_id,
&body,
)
.await?;
let entries = api::client::entries::list_entries_with_type(
&remote_repo,
"",
DEFAULT_BRANCH_NAME,
&EntryDataType::Tabular,
)
.await?;
// The CSV adds a second tabular entry.
assert_eq!(entries.len(), 2);
println!("entries: {entries:?}");
// Phase 3: upload and commit an invalid parquet file. Both the upload and the commit
// are expected to succeed.
let workspace =
api::client::workspaces::create(&remote_repo, DEFAULT_BRANCH_NAME, &workspace_id)
.await?;
assert_eq!(workspace.id, workspace_id);
let file_to_post = test::test_invalid_parquet_file();
let directory_name = "broken_data";
let result = api::client::workspaces::files::upload_single_file(
&remote_repo,
&workspace_id,
directory_name,
file_to_post,
)
.await;
println!("result: {result:?}");
assert!(result.is_ok());
let body = NewCommitBody {
message: "Add broken data frame".to_string(),
author: "Test User".to_string(),
email: "test@oxen.ai".to_string(),
};
api::client::workspaces::commit(
&remote_repo,
DEFAULT_BRANCH_NAME,
&workspace_id,
&body,
)
.await?;
let entries = api::client::entries::list_entries_with_type(
&remote_repo,
"",
DEFAULT_BRANCH_NAME,
&EntryDataType::Tabular,
)
.await?;
// The tabular count must stay at 2 (presumably the broken file is not classified as
// tabular; confirm against the server's type detection).
assert_eq!(entries.len(), 2);
println!("entries: {entries:?}");
Ok(remote_repo)
})
.await
}
/// Same three-phase scenario as the "created and readme" variant, but run against the repo
/// provided by `run_readme_remote_repo_test`: upload and commit a parquet file, a CSV file and
/// an invalid parquet file in turn, expecting 1, 2 and 2 `EntryDataType::Tabular` entries.
///
/// The same workspace id is reused for every phase, so the order of the statements matters.
#[tokio::test]
async fn test_commit_multiple_data_frames() -> Result<(), OxenError> {
test::run_readme_remote_repo_test(|_local_repo, remote_repo| async move {
// Phase 1: upload a parquet file to the repo root and commit it.
let workspace_id = uuid::Uuid::new_v4().to_string();
let workspace =
api::client::workspaces::create(&remote_repo, DEFAULT_BRANCH_NAME, &workspace_id)
.await?;
assert_eq!(workspace.id, workspace_id);
let file_to_post = test::test_1k_parquet();
// An empty directory name targets the repository root.
let directory_name = "";
let result = api::client::workspaces::files::upload_single_file(
&remote_repo,
&workspace_id,
directory_name,
file_to_post,
)
.await;
println!("result: {result:?}");
assert!(result.is_ok());
let body = NewCommitBody {
message: "Add another data frame".to_string(),
author: "Test User".to_string(),
email: "test@oxen.ai".to_string(),
};
api::client::workspaces::commit(
&remote_repo,
DEFAULT_BRANCH_NAME,
&workspace_id,
&body,
)
.await?;
let entries = api::client::entries::list_entries_with_type(
&remote_repo,
"",
DEFAULT_BRANCH_NAME,
&EntryDataType::Tabular,
)
.await?;
// Only the parquet file just committed is expected to be tabular.
assert_eq!(entries.len(), 1);
// Phase 2: create a workspace again under the same id (presumably the previous one no
// longer exists after the commit; confirm), then upload and commit a CSV file.
let workspace =
api::client::workspaces::create(&remote_repo, DEFAULT_BRANCH_NAME, &workspace_id)
.await?;
assert_eq!(workspace.id, workspace_id);
let file_to_post = test::test_csv_file_with_name("emojis.csv");
let directory_name = "moare_data";
let result = api::client::workspaces::files::upload_single_file(
&remote_repo,
&workspace_id,
directory_name,
file_to_post,
)
.await;
println!("result: {result:?}");
assert!(result.is_ok());
let body = NewCommitBody {
message: "Add emojis data frame".to_string(),
author: "Test User".to_string(),
email: "test@oxen.ai".to_string(),
};
api::client::workspaces::commit(
&remote_repo,
DEFAULT_BRANCH_NAME,
&workspace_id,
&body,
)
.await?;
let entries = api::client::entries::list_entries_with_type(
&remote_repo,
"",
DEFAULT_BRANCH_NAME,
&EntryDataType::Tabular,
)
.await?;
// The CSV adds a second tabular entry.
assert_eq!(entries.len(), 2);
println!("entries: {entries:?}");
// Phase 3: upload and commit an invalid parquet file. Both the upload and the commit
// are expected to succeed.
let workspace =
api::client::workspaces::create(&remote_repo, DEFAULT_BRANCH_NAME, &workspace_id)
.await?;
assert_eq!(workspace.id, workspace_id);
let file_to_post = test::test_invalid_parquet_file();
let directory_name = "broken_data";
let result = api::client::workspaces::files::upload_single_file(
&remote_repo,
&workspace_id,
directory_name,
file_to_post,
)
.await;
println!("result: {result:?}");
assert!(result.is_ok());
let body = NewCommitBody {
message: "Add broken data frame".to_string(),
author: "Test User".to_string(),
email: "test@oxen.ai".to_string(),
};
api::client::workspaces::commit(
&remote_repo,
DEFAULT_BRANCH_NAME,
&workspace_id,
&body,
)
.await?;
let entries = api::client::entries::list_entries_with_type(
&remote_repo,
"",
DEFAULT_BRANCH_NAME,
&EntryDataType::Tabular,
)
.await?;
// The tabular count must stay at 2 (presumably the broken file is not classified as
// tabular; confirm against the server's type detection).
assert_eq!(entries.len(), 2);
println!("entries: {entries:?}");
Ok(remote_repo)
})
.await
}
/// Commits one uploaded image through a workspace, then clones the repo into an empty
/// directory and pulls the `add-data` branch to verify the file arrives locally and the local
/// head commit matches the workspace commit.
#[tokio::test]
async fn test_commit_staged_single_file_and_pull() -> Result<(), OxenError> {
    test::run_remote_repo_test_bounding_box_csv_pushed(|_local_repo, remote_repo| async move {
        let branch_name = "add-data";
        let new_branch = api::client::branches::create_from_branch(
            &remote_repo,
            branch_name,
            DEFAULT_BRANCH_NAME,
        )
        .await?;
        assert_eq!(new_branch.name, branch_name);

        let workspace_id = uuid::Uuid::new_v4().to_string();
        let new_workspace =
            api::client::workspaces::create(&remote_repo, &branch_name, &workspace_id).await?;
        assert_eq!(new_workspace.id, workspace_id);

        // Stage a single image under `data/`.
        let directory_name = "data";
        let image = test::test_img_file();
        let upload = api::client::workspaces::files::upload_single_file(
            &remote_repo,
            &workspace_id,
            directory_name,
            image,
        )
        .await;
        assert!(upload.is_ok());

        // Commit the workspace and confirm the commit is retrievable from the remote.
        let commit_body = NewCommitBody {
            author: "Test User".to_string(),
            email: "test@oxen.ai".to_string(),
            message: "Add one image".to_string(),
        };
        let commit = api::client::workspaces::commit(
            &remote_repo,
            branch_name,
            &workspace_id,
            &commit_body,
        )
        .await?;
        let remote_commit = api::client::commits::get_by_id(&remote_repo, &commit.id).await?;
        assert!(remote_commit.is_some());
        assert_eq!(commit.id, remote_commit.unwrap().id);

        // `remote_repo` is moved into the nested closure, so keep a copy to hand back.
        let repo_to_return = remote_repo.clone();
        test::run_empty_dir_test_async(|scratch_dir| async move {
            let clone_opts = CloneOpts::new(remote_repo.remote.url, scratch_dir.join("new_repo"));
            let cloned_repo = repositories::clone(&clone_opts).await?;

            // Right after the clone the file must not be present yet.
            let path = cloned_repo
                .path
                .join(directory_name)
                .join(test::test_img_file().file_name().unwrap());
            assert!(!path.exists());

            let mut fetch_opts = FetchOpts::new();
            fetch_opts.branch = "add-data".to_string();
            repositories::pull_remote_branch(&cloned_repo, &fetch_opts).await?;

            let head = repositories::commits::head_commit(&cloned_repo)?;
            assert_eq!(head.id, commit.id);
            println!("Looking for file at path: {path:?}");
            assert!(path.exists());
            Ok(())
        })
        .await?;
        Ok(repo_to_return)
    })
    .await
}
/// Commits a parquet file and an image into `tabular/` on a new branch, then checks that one
/// schema was added and that the per-type file counts are as expected both for the directory
/// and for the repository root.
#[tokio::test]
async fn test_commit_schema_on_branch() -> Result<(), OxenError> {
    test::run_remote_repo_test_bounding_box_csv_pushed(|_local_repo, remote_repo| async move {
        let branch_name = "test-schema-issues";
        let new_branch = api::client::branches::create_from_branch(
            &remote_repo,
            branch_name,
            DEFAULT_BRANCH_NAME,
        )
        .await?;
        assert_eq!(new_branch.name, branch_name);

        // Baseline used to measure how many schemas the commit adds.
        let schemas_before = api::client::schemas::list(&remote_repo, branch_name).await?;

        let directory_name = "tabular";
        let workspace_id = uuid::Uuid::new_v4().to_string();
        let new_workspace =
            api::client::workspaces::create(&remote_repo, &branch_name, &workspace_id).await?;
        assert_eq!(new_workspace.id, workspace_id);

        // Stage one data frame...
        let parquet_upload = api::client::workspaces::files::upload_single_file(
            &remote_repo,
            &workspace_id,
            directory_name,
            test::test_1k_parquet(),
        )
        .await;
        assert!(parquet_upload.is_ok());

        // ...and one image in the same directory.
        let image_upload = api::client::workspaces::files::upload_single_file(
            &remote_repo,
            &workspace_id,
            directory_name,
            test::test_img_file(),
        )
        .await;
        assert!(image_upload.is_ok());

        let commit_body = NewCommitBody {
            author: "Test User".to_string(),
            email: "test@oxen.ai".to_string(),
            message: "Add one data frame and one image".to_string(),
        };
        let commit = api::client::workspaces::commit(
            &remote_repo,
            branch_name,
            &workspace_id,
            &commit_body,
        )
        .await?;
        assert!(commit.message.contains("Add one data frame and one image"));

        let schemas_after = api::client::schemas::list(&remote_repo, branch_name).await?;
        assert_eq!(schemas_after.len(), schemas_before.len() + 1);

        // Counts inside `tabular/`: one image and one tabular file.
        let dir_counts =
            api::client::dir::file_counts(&remote_repo, branch_name, directory_name).await?;
        assert_eq!(dir_counts.dir.data_types.len(), 2);
        let images_in_dir = dir_counts
            .dir
            .data_types
            .iter()
            .find(|dt| dt.data_type == "image")
            .unwrap()
            .count;
        assert_eq!(images_in_dir, 1);
        let tabular_in_dir = dir_counts
            .dir
            .data_types
            .iter()
            .find(|dt| dt.data_type == "tabular")
            .unwrap()
            .count;
        assert_eq!(tabular_in_dir, 1);

        // Counts at the repository root: the tabular count is expected to be 2 there.
        let root_counts = api::client::dir::file_counts(&remote_repo, branch_name, "").await?;
        assert_eq!(root_counts.dir.data_types.len(), 2);
        let images_in_root = root_counts
            .dir
            .data_types
            .iter()
            .find(|dt| dt.data_type == "image")
            .unwrap()
            .count;
        assert_eq!(images_in_root, 1);
        let tabular_in_root = root_counts
            .dir
            .data_types
            .iter()
            .find(|dt| dt.data_type == "tabular")
            .unwrap()
            .count;
        assert_eq!(tabular_in_root, 2);
        Ok(remote_repo)
    })
    .await
}
/// Uploads an image to a workspace, removes it again with `files::rm`, and checks that no
/// added files remain listed for the directory.
#[tokio::test]
async fn test_rm_file() -> Result<(), OxenError> {
    test::run_remote_repo_test_bounding_box_csv_pushed(|_local_repo, remote_repo| async move {
        let branch_name = "add-images";
        let new_branch = api::client::branches::create_from_branch(
            &remote_repo,
            branch_name,
            DEFAULT_BRANCH_NAME,
        )
        .await?;
        assert_eq!(new_branch.name, branch_name);

        let workspace_id = uuid::Uuid::new_v4().to_string();
        let new_workspace =
            api::client::workspaces::create(&remote_repo, &branch_name, &workspace_id).await?;
        assert_eq!(new_workspace.id, workspace_id);

        let directory_name = "images";
        let upload = api::client::workspaces::files::upload_single_file(
            &remote_repo,
            &workspace_id,
            directory_name,
            test::test_img_file(),
        )
        .await;
        assert!(upload.is_ok());

        // The value returned by the upload identifies what to remove.
        let uploaded = upload.unwrap();
        let removal =
            api::client::workspaces::files::rm(&remote_repo, &workspace_id, uploaded).await;
        assert!(removal.is_ok());

        // Nothing should be staged any more.
        let staged = api::client::workspaces::changes::list(
            &remote_repo,
            &workspace_id,
            Path::new(directory_name),
            constants::DEFAULT_PAGE_NUM,
            constants::DEFAULT_PAGE_SIZE,
        )
        .await?;
        assert_eq!(staged.added_files.entries.len(), 0);
        assert_eq!(staged.added_files.total_entries, 0);
        Ok(remote_repo)
    })
    .await
}
/// Uploads an image into a deeply nested directory (`my/images/dir/is/long`) and expects it to
/// be listed as the single added file there.
#[tokio::test]
async fn test_stage_file_in_multiple_subdirectories() -> Result<(), OxenError> {
    test::run_remote_repo_test_bounding_box_csv_pushed(|_local_repo, remote_repo| async move {
        let branch_name = "add-images";
        let new_branch = api::client::branches::create_from_branch(
            &remote_repo,
            branch_name,
            DEFAULT_BRANCH_NAME,
        )
        .await?;
        assert_eq!(new_branch.name, branch_name);

        let workspace_id = uuid::Uuid::new_v4().to_string();
        let new_workspace =
            api::client::workspaces::create(&remote_repo, &branch_name, &workspace_id).await?;
        assert_eq!(new_workspace.id, workspace_id);

        let directory_name = "my/images/dir/is/long";
        let upload = api::client::workspaces::files::upload_single_file(
            &remote_repo,
            &workspace_id,
            directory_name,
            test::test_img_file(),
        )
        .await;
        assert!(upload.is_ok());

        let staged = api::client::workspaces::changes::list(
            &remote_repo,
            &workspace_id,
            Path::new(directory_name),
            constants::DEFAULT_PAGE_NUM,
            constants::DEFAULT_PAGE_SIZE,
        )
        .await?;
        assert_eq!(staged.added_files.entries.len(), 1);
        assert_eq!(staged.added_files.total_entries, 1);
        Ok(remote_repo)
    })
    .await
}
/// Adds two images in one `files::add` call to a workspace whose id carries a
/// `test-workspace-` prefix, and expects both to be listed under `test_data`.
#[tokio::test]
async fn test_add_multiple_files() -> Result<(), OxenError> {
    test::run_remote_repo_test_bounding_box_csv_pushed(|_local_repo, remote_repo| async move {
        let branch_name = "add-multiple-files";
        let new_branch = api::client::branches::create_from_branch(
            &remote_repo,
            branch_name,
            DEFAULT_BRANCH_NAME,
        )
        .await?;
        assert_eq!(new_branch.name, branch_name);

        let workspace_id = format!("test-workspace-{}", uuid::Uuid::new_v4());
        let new_workspace =
            api::client::workspaces::create(&remote_repo, &branch_name, &workspace_id).await?;
        assert_eq!(new_workspace.id, workspace_id);

        let directory = "test_data";
        let images = vec![
            test::test_img_file(),
            test::test_img_file_with_name("cole_anthony.jpeg"),
        ];
        let add_outcome = api::client::workspaces::files::add(
            &remote_repo,
            &workspace_id,
            directory,
            images,
            &None,
        )
        .await;
        assert!(add_outcome.is_ok());

        let staged = api::client::workspaces::changes::list(
            &remote_repo,
            &workspace_id,
            Path::new(directory),
            constants::DEFAULT_PAGE_NUM,
            constants::DEFAULT_PAGE_SIZE,
        )
        .await?;
        assert_eq!(staged.added_files.entries.len(), 2);
        assert_eq!(staged.added_files.total_entries, 2);
        Ok(remote_repo)
    })
    .await
}
#[tokio::test]
async fn test_add_file_with_absolute_path() -> Result<(), OxenError> {
test::run_remote_repo_test_bounding_box_csv_pushed(|_local_repo, remote_repo| async move {
let branch_name = "add-images-with-absolute-path";
let branch = api::client::branches::create_from_branch(
&remote_repo,
branch_name,
DEFAULT_BRANCH_NAME,
)
.await?;
assert_eq!(branch.name, branch_name);
let directory_name = "new-images";
let workspace_id = uuid::Uuid::new_v4().to_string();
let workspace =
api::client::workspaces::create(&remote_repo, &branch_name, &workspace_id).await?;
assert_eq!(workspace.id, workspace_id);
let path = crate::util::fs::canonicalize(test::test_img_file())?;
let result = api::client::workspaces::files::add(
&remote_repo,
&workspace_id,
directory_name,
vec![path],
&None,
)
.await;
assert!(result.is_ok());
let page_num = constants::DEFAULT_PAGE_NUM;
let page_size = constants::DEFAULT_PAGE_SIZE;
let path = Path::new("");
let entries = api::client::workspaces::changes::list(
&remote_repo,
&workspace_id,
path,
page_num,
page_size,
)
.await?;
assert_eq!(entries.added_files.entries.len(), 1);
assert_eq!(entries.added_files.total_entries, 1);
let assert_path = PathBuf::from("new-images").join(PathBuf::from("dwight_vince.jpeg"));
assert_eq!(
entries.added_files.entries[0].filename(),
assert_path.to_str().unwrap(),
);
Ok(remote_repo)
})
.await
}
/// Downloads `README.md` through a fresh workspace on the default branch, without uploading
/// anything to the workspace first, and checks that the output file gets created.
#[tokio::test]
async fn test_download_version_file_from_workspace() -> Result<(), OxenError> {
    test::run_remote_created_and_readme_remote_repo_test(|remote_repo| async move {
        let branch_name = constants::DEFAULT_BRANCH_NAME;
        let workspace_id = uuid::Uuid::new_v4().to_string();
        let new_workspace =
            api::client::workspaces::create(&remote_repo, &branch_name, &workspace_id).await?;
        assert_eq!(new_workspace.id, workspace_id);

        let readme_path = PathBuf::from("README.md");
        let scratch = TempDir::new()?;
        let output_path = scratch.path().join("output.md");
        api::client::workspaces::files::download(
            &remote_repo,
            &workspace_id,
            readme_path.to_str().unwrap(),
            Some(&output_path),
        )
        .await?;

        assert!(output_path.exists());
        Ok(remote_repo)
    })
    .await
}
/// Moves `train/dog_1.jpg` to a new path inside a workspace, commits, and verifies the file
/// exists (with non-empty content) at the new path and no longer at the original one.
///
/// The test body is skipped on Windows.
#[tokio::test]
async fn test_mv_file() -> Result<(), OxenError> {
    if std::env::consts::OS == "windows" {
        return Ok(());
    }
    test::run_remote_repo_test_all_data_pushed(|remote_repo| async move {
        let branch_name = "mv-file-test";
        let new_branch = api::client::branches::create_from_branch(
            &remote_repo,
            branch_name,
            DEFAULT_BRANCH_NAME,
        )
        .await?;
        assert_eq!(new_branch.name, branch_name);

        let workspace_id = uuid::Uuid::new_v4().to_string();
        let new_workspace =
            api::client::workspaces::create(&remote_repo, branch_name, &workspace_id).await?;
        assert_eq!(new_workspace.id, workspace_id);

        // Stage the move inside the workspace.
        let original_path = "train/dog_1.jpg";
        let new_path = "renamed/images/dog_1_moved.jpg";
        let mv_response = api::client::workspaces::files::mv(
            &remote_repo,
            &workspace_id,
            original_path,
            new_path,
        )
        .await?;
        assert_eq!(mv_response.status, "success");

        let commit_body = NewCommitBody {
            author: "Test User".to_string(),
            email: "test@oxen.ai".to_string(),
            message: "Moved file to new location".to_string(),
        };
        let commit = api::client::workspaces::commit(
            &remote_repo,
            branch_name,
            &workspace_id,
            &commit_body,
        )
        .await?;

        // The entry must be reachable at the destination in the new commit...
        let moved_entry =
            api::client::entries::get_entry(&remote_repo, new_path, &commit.id).await?;
        assert!(moved_entry.is_some(), "File should exist at new path");

        // ...and its content must be downloadable from the branch.
        let moved_bytes =
            api::client::file::get_file(&remote_repo, branch_name, new_path).await?;
        assert!(
            !moved_bytes.is_empty(),
            "File content should not be empty at new path"
        );

        // The source path must be gone.
        let stale_entry =
            api::client::entries::get_entry(&remote_repo, original_path, &commit.id).await?;
        assert!(stale_entry.is_none(), "File should not exist at original path");
        Ok(remote_repo)
    })
    .await
}
/// Confirms that downloading through a workspace id that does not exist returns an error and
/// leaves no output file behind.
#[tokio::test]
async fn test_download_file_from_nonexistent_workspace() -> Result<(), OxenError> {
    test::run_remote_created_and_readme_remote_repo_test(|remote_repo| async move {
        let missing_workspace_id = "workspace_does_not_exist";

        // Precondition: the server really does not know this workspace.
        let lookup = api::client::workspaces::get(&remote_repo, missing_workspace_id).await?;
        assert!(lookup.is_none());

        let scratch = TempDir::new()?;
        let output_path = scratch.path().join("output.md");
        let download_outcome = api::client::workspaces::files::download(
            &remote_repo,
            missing_workspace_id,
            "README.md",
            Some(&output_path),
        )
        .await;

        assert!(download_outcome.is_err());
        assert!(!output_path.exists());
        Ok(remote_repo)
    })
    .await
}
/// Test helper: creates a workspace with a freshly generated UUID on the default branch and
/// asserts that the server echoed that id back.
async fn make_workspace(
    remote_repo: &RemoteRepository,
) -> Result<WorkspaceResponseWithStatus, OxenError> {
    let workspace_id = uuid::Uuid::new_v4().to_string();
    let created = api::client::workspaces::create(
        remote_repo,
        &constants::DEFAULT_BRANCH_NAME,
        &workspace_id,
    )
    .await?;
    assert_eq!(
        created.id, workspace_id,
        "Expected to create workspace with ID {} but got ID {}",
        workspace_id, created.id
    );
    Ok(created)
}
/// Round-trips a small text file: writes it locally, uploads it into a workspace directory,
/// downloads it again through the workspace and compares the contents.
#[tokio::test]
async fn test_download_uploaded_file_from_workspace() -> Result<(), OxenError> {
    test::run_remote_created_and_readme_remote_repo_test(|remote_repo| async move {
        let workspace_id = make_workspace(&remote_repo).await?.id;
        let scratch = TempDir::new()?;

        // Create the local file to upload.
        let test_filename = "file_to_upload.txt";
        let local_file = scratch.path().join(test_filename);
        tokio::fs::write(&local_file, b"Hello world! How are you today?").await?;

        // Upload it into the `images` directory of the workspace.
        let upload_path = "images";
        api::client::workspaces::files::upload_single_file(
            &remote_repo,
            &workspace_id,
            upload_path,
            &local_file,
        )
        .await?;

        // Download it back under a different local name.
        let output_path = scratch.path().join("downloaded.jpeg");
        let remote_file_path = format!("{upload_path}/{test_filename}");
        api::client::workspaces::files::download(
            &remote_repo,
            &workspace_id,
            &remote_file_path,
            Some(&output_path),
        )
        .await?;
        assert!(
            output_path.exists(),
            "Expecting to have downloaded file to: {}",
            output_path.display()
        );

        let downloaded_contents = tokio::fs::read_to_string(&output_path).await?;
        assert_eq!(downloaded_contents, "Hello world! How are you today?");
        Ok(remote_repo)
    })
    .await
}
/// Requests a file that exists neither in the workspace nor in the repo and expects an error
/// with no output file written.
#[tokio::test]
async fn test_download_nonexistent_file_from_workspace() -> Result<(), OxenError> {
    test::run_remote_created_and_readme_remote_repo_test(|remote_repo| async move {
        let workspace_id = make_workspace(&remote_repo).await?.id;
        let scratch = TempDir::new()?;
        let output_path = scratch.path().join("output.txt");

        let result = api::client::workspaces::files::download(
            &remote_repo,
            &workspace_id,
            "this_file_does_not_exist.txt",
            Some(&output_path),
        )
        .await;

        assert!(result.is_err(), "{result:?}");
        assert!(
            !output_path.exists(),
            "Not expecting '{}' to exist",
            output_path.display()
        );
        Ok(remote_repo)
    })
    .await
}
/// Downloads the committed `README.md` with `entries::download_entry`, using the commit id the
/// workspace was created from, and checks the result exists and is non-empty.
#[tokio::test]
async fn test_download_entry_fallback_for_committed_file() -> Result<(), OxenError> {
    test::run_remote_created_and_readme_remote_repo_test(|remote_repo| async move {
        let workspace = make_workspace(&remote_repo).await?;
        let scratch = TempDir::new()?;

        let output_path = scratch.path().join("fallback_readme.md");
        api::client::entries::download_entry(
            &remote_repo,
            Path::new("README.md"),
            &output_path,
            &workspace.commit.id,
        )
        .await?;
        assert!(
            output_path.exists(),
            "Expecting to have downloaded output to: {}",
            output_path.display()
        );

        let readme_text = std::fs::read_to_string(&output_path)?;
        assert!(!readme_text.is_empty(), "Expecting non-empty README.md file");
        Ok(remote_repo)
    })
    .await
}
/// Creates a named workspace, finds it again by name, and uses the id from that lookup to
/// download `README.md`.
#[tokio::test]
async fn test_workspace_lookup_by_name_for_download() -> Result<(), OxenError> {
    test::run_remote_created_and_readme_remote_repo_test(|remote_repo| async move {
        let workspace_name = "my-download-workspace";

        // Create the workspace with both an explicit id and a name.
        let workspace_id = uuid::Uuid::new_v4().to_string();
        let created = api::client::workspaces::create_with_name(
            &remote_repo,
            &constants::DEFAULT_BRANCH_NAME,
            &workspace_id,
            workspace_name,
        )
        .await?;
        assert_eq!(created.id, workspace_id);
        assert_eq!(created.name, Some(workspace_name.to_string()));

        // Looking it up by name must resolve to the same workspace.
        let lookup = api::client::workspaces::get_by_name(&remote_repo, workspace_name).await?;
        assert!(lookup.is_some());
        let found = lookup.unwrap();
        assert_eq!(found.id, created.id);

        let scratch = TempDir::new()?;
        let output_path = scratch.path().join("readme_by_name.md");
        api::client::workspaces::files::download(
            &remote_repo,
            &found.id,
            "README.md",
            Some(&output_path),
        )
        .await?;
        assert!(output_path.exists());
        Ok(remote_repo)
    })
    .await
}
/// Looking up a workspace by a name that was never created must yield `None` rather than an
/// error.
#[tokio::test]
async fn test_workspace_lookup_by_nonexistent_name() -> Result<(), OxenError> {
    test::run_remote_created_and_readme_remote_repo_test(|remote_repo| async move {
        let missing_name = "workspace_name_does_not_exist";
        let lookup = api::client::workspaces::get_by_name(&remote_repo, missing_name).await?;
        assert!(lookup.is_none());
        Ok(remote_repo)
    })
    .await
}
/// Downloading a missing file must fail specifically with `OxenError::PathDoesNotExist` and
/// must not create the output file.
#[tokio::test]
async fn test_download_nonexistent_file_returns_path_dne() -> Result<(), OxenError> {
    test::run_remote_created_and_readme_remote_repo_test(|remote_repo| async move {
        let workspace_id = make_workspace(&remote_repo).await?.id;
        let scratch = TempDir::new()?;
        let output_path = scratch.path().join("output.txt");

        let outcome = api::client::workspaces::files::download(
            &remote_repo,
            &workspace_id,
            "this_file_does_not_exist.txt",
            Some(&output_path),
        )
        .await;
        assert!(outcome.is_err(), "Expected error for nonexistent file");

        // Check the error variant, not just that some error occurred.
        let err = outcome.unwrap_err();
        assert!(
            matches!(err, OxenError::PathDoesNotExist(_)),
            "Expected PathDoesNotExist error, got: {err:?}"
        );
        assert!(
            !output_path.exists(),
            "Not expecting '{}' to exist",
            output_path.display()
        );
        Ok(remote_repo)
    })
    .await
}
#[tokio::test]
async fn test_add_files_preserves_paths_local_repo_relative_paths() -> Result<(), OxenError> {
test::run_remote_repo_test_bounding_box_csv_pushed(|local_repo, remote_repo| async move {
help_test_add_files_preserve_path(&remote_repo, &local_repo.path, true).await?;
Ok(remote_repo)
})
.await
}
/// Runs the path-preservation scenario with the absolute form of the local repository
/// directory as base and absolute file paths.
#[tokio::test]
async fn test_add_files_preserves_paths_local_repo_absolute_paths() -> Result<(), OxenError> {
    test::run_remote_repo_test_bounding_box_csv_pushed(|local_repo, remote_repo| async move {
        let absolute_base = std::path::absolute(local_repo.path).unwrap();
        help_test_add_files_preserve_path(&remote_repo, &absolute_base, false).await?;
        Ok(remote_repo)
    })
    .await
}
/// Runs the path-preservation scenario with a temporary directory outside the local repo as
/// base and file paths given relative to it.
#[tokio::test]
async fn test_add_files_preserves_paths_tempdir_relative_paths() -> Result<(), OxenError> {
    test::run_remote_repo_test_bounding_box_csv_pushed(|_, remote_repo| async move {
        // The guard must stay alive for the whole scenario so the directory is not deleted.
        let scratch = tempfile::tempdir()?;
        let base_dir = scratch.path().to_path_buf();
        help_test_add_files_preserve_path(&remote_repo, base_dir.as_path(), true).await?;
        Ok(remote_repo)
    })
    .await
}
/// Runs the path-preservation scenario with a temporary directory outside the local repo as
/// base and absolute file paths.
#[tokio::test]
async fn test_add_files_preserves_paths_tempdir_absolute_paths() -> Result<(), OxenError> {
    test::run_remote_repo_test_bounding_box_csv_pushed(|_, remote_repo| async move {
        // The guard must stay alive for the whole scenario so the directory is not deleted.
        let scratch = tempfile::tempdir()?;
        let base_dir = scratch.path().to_path_buf();
        help_test_add_files_preserve_path(&remote_repo, base_dir.as_path(), false).await?;
        Ok(remote_repo)
    })
    .await
}
/// Shared scenario for the `test_add_files_preserves_paths_*` tests.
///
/// Creates four text files under `base_dir` (two in `data_being_added/nested`, one in
/// `data_being_added`, one directly in `base_dir`), stages them with `files::add_files`, and
/// asserts that each is staged under its path relative to `base_dir`.
///
/// When `use_relative_paths` is true the paths handed to `add_files` are stripped of the
/// `base_dir` prefix; otherwise they are made absolute with `std::path::absolute`.
async fn help_test_add_files_preserve_path(
    remote_repo: &RemoteRepository,
    base_dir: &Path,
    use_relative_paths: bool,
) -> Result<(), OxenError> {
    let branch_name = "add-files-preserve-paths";
    let new_branch = api::client::branches::create_from_branch(
        remote_repo,
        branch_name,
        DEFAULT_BRANCH_NAME,
    )
    .await?;
    assert_eq!(new_branch.name, branch_name);

    let workspace_id = uuid::Uuid::new_v4().to_string();
    let new_workspace =
        api::client::workspaces::create(remote_repo, branch_name, &workspace_id).await?;
    assert_eq!(new_workspace.id, workspace_id);

    // Lay out the fixture tree on disk.
    let outer_dir = base_dir.join("data_being_added");
    let inner_dir = outer_dir.join("nested");
    std::fs::create_dir_all(&inner_dir)?;
    let file_a = inner_dir.join("file_a.txt");
    let file_b = inner_dir.join("file_b.txt");
    let file_c = outer_dir.join("file_c.txt");
    let file_root = base_dir.join("root_file.txt");
    test::write_txt_file_to_path(&file_a, "content a")?;
    test::write_txt_file_to_path(&file_b, "content b")?;
    test::write_txt_file_to_path(&file_c, "contents c")?;
    test::write_txt_file_to_path(&file_root, "root content")?;

    // Express every path either relative to `base_dir` or as an absolute path.
    let paths: Vec<PathBuf> = vec![file_a, file_b, file_c, file_root]
        .into_iter()
        .map(|p| {
            if use_relative_paths {
                p.strip_prefix(base_dir).unwrap().to_path_buf()
            } else {
                std::path::absolute(p).unwrap()
            }
        })
        .collect();

    let result =
        api::client::workspaces::files::add_files(remote_repo, &workspace_id, base_dir, paths)
            .await;
    assert!(result.is_ok(), "add_files failed: {result:?}");

    // List everything staged in the workspace, starting from its root.
    let changes = api::client::workspaces::changes::list(
        remote_repo,
        &workspace_id,
        Path::new(""),
        constants::DEFAULT_PAGE_NUM,
        constants::DEFAULT_PAGE_SIZE,
    )
    .await?;
    assert_eq!(
        changes.added_files.total_entries, 4,
        "Expected 4 staged files, got {}",
        changes.added_files.total_entries
    );

    let staged: Vec<String> = changes
        .added_files
        .entries
        .iter()
        .map(|e| e.filename().to_string())
        .collect();

    // Expected paths use the platform's separator.
    let sep = std::path::MAIN_SEPARATOR_STR;
    let expected_paths = [
        format!("data_being_added{}nested{}file_a.txt", sep, sep),
        format!("data_being_added{}nested{}file_b.txt", sep, sep),
        format!("data_being_added{}file_c.txt", sep),
        "root_file.txt".to_string(),
    ];
    for p in expected_paths {
        assert!(
            staged.contains(&p),
            "Expected '{p}' in staged paths, got: {staged:?}"
        );
    }
    Ok(())
}
}