use crate::errors::OxenHttpError;
use crate::helpers::{file_stream_response, get_repo};
use crate::params::{app_data, path_param};
use liboxen::core;
use liboxen::core::staged::get_staged_db_manager;
use liboxen::error::OxenError;
use liboxen::model::LocalRepository;
use liboxen::model::merkle_tree::node::EMerkleTreeNode;
use liboxen::model::metadata::metadata_image::ImgResize;
use liboxen::model::metadata::metadata_video::VideoThumbnail;
use liboxen::repositories;
use liboxen::util;
use liboxen::util::hasher;
use liboxen::view::workspaces::RenameRequest;
use liboxen::view::{
ErrorFileInfo, ErrorFilesResponse, FilePathsResponse, FileWithHash, StatusMessage,
StatusMessageDescription,
};
use actix_multipart::Multipart;
use actix_web::Error;
use actix_web::{HttpRequest, HttpResponse, web};
use flate2::read::GzDecoder;
use futures_util::TryStreamExt as _;
use serde::Deserialize;
use std::io::Read as StdRead;
use std::path::PathBuf;
use std::sync::Arc;
use utoipa;
// OpenAPI-only description of the multipart body accepted by `add` (one binary file part).
// The handler itself reads the raw `Multipart` stream via `save_parts`; this struct is not
// constructed anywhere in this file and is referenced only by `request_body(content = FileUpload)`.
#[derive(utoipa::ToSchema)]
pub struct FileUpload {
    // Raw file bytes, rendered in the generated schema as a binary string.
    #[schema(value_type = String, format = Binary)]
    pub file: Vec<u8>,
}
/// Query-string parameters accepted by `add_version_files`.
#[derive(Deserialize, Debug, Default)]
pub struct StagingQueryParams {
    /// Force staging even if the content is unchanged, updating the file timestamp.
    /// Treated as `false` when omitted.
    pub update_timestamp: Option<bool>,
}
/// Query-string parameters accepted by the workspace file `get` handler.
#[derive(Deserialize, Debug)]
pub struct WorkspaceFileQueryParams {
    /// Target width for an image resize (image/* files) or a video thumbnail.
    pub width: Option<u32>,
    /// Target height for an image resize (image/* files) or a video thumbnail.
    pub height: Option<u32>,
    /// Offset in seconds at which to grab a video thumbnail; `get` uses 1.0 when omitted.
    pub timestamp: Option<f64>,
    /// Only `Some(true)` on a video/* file produces a thumbnail; otherwise the raw video is streamed.
    pub thumbnail: Option<bool>,
}
#[utoipa::path(
get,
path = "/api/repos/{namespace}/{repo_name}/workspaces/{workspace_id}/files/{path}",
description = "Get a file from a workspace.",
tag = "Workspace Files",
params(
("namespace" = String, Path, description = "The namespace of the repository", example = "ox"),
("repo_name" = String, Path, description = "The name of the repository", example = "ImageNet-1k"),
("workspace_id" = String, Path, description = "The UUID of the workspace", example = "580c0587-c157-417b-9118-8686d63d2745"),
("path" = String, Path, description = "The path to the file in the workspace", example = "images/train/dog_1.jpg"),
("width" = Option<u32>, Query, description = "Width for image resize or video thumbnail", example = 320),
("height" = Option<u32>, Query, description = "Height for image resize or video thumbnail", example = 240),
("timestamp" = Option<f64>, Query, description = "Timestamp in seconds to extract video thumbnail from", example = 1.0),
("thumbnail" = Option<bool>, Query, description = "Set to true to generate a video thumbnail instead of returning the full video", example = true)
),
responses(
(status = 200, description = "File content returned as a stream. Content-Type varies: matches the file's MIME type for regular files and image resizes, or 'image/jpeg' for video thumbnails",
body = Vec<u8>,
headers(
("oxen-revision-id" = String, description = "The commit ID of the file version")
)
),
(status = 404, description = "Workspace or File not found"),
(status = 400, description = "Invalid parameters")
)
)]
pub async fn get(
req: HttpRequest,
query: web::Query<WorkspaceFileQueryParams>,
) -> Result<HttpResponse, OxenHttpError> {
let app_data = app_data(&req)?;
let namespace = path_param(&req, "namespace")?.to_string();
let repo_name = path_param(&req, "repo_name")?.to_string();
let repo = get_repo(&app_data.path, namespace, repo_name)?;
let version_store = repo.version_store()?;
let workspace_id = path_param(&req, "workspace_id")?.to_string();
let Some(workspace) = repositories::workspaces::get(&repo, &workspace_id)? else {
return Err(OxenHttpError::NotFound);
};
let path = path_param(&req, "path")?.to_string();
log::debug!("got workspace file path {:?}", &path);
let staged_db_manager = get_staged_db_manager(&workspace.workspace_repo)?;
let file_node = match staged_db_manager.read_from_staged_db(&path)? {
Some(staged_node) => match staged_node.node.node {
EMerkleTreeNode::File(f) => Ok(f),
_ => Err(OxenError::basic_str(
"Only single file download is supported",
)),
}?,
None => {
if let Some(file_node) = repositories::tree::get_file_by_path(
&workspace.base_repo,
&workspace.commit,
&path,
)? {
file_node
} else {
return Err(OxenHttpError::InternalOxenError(
OxenError::resource_not_found(&path),
));
}
}
};
let file_hash = file_node.hash();
let hash_str = file_hash.to_string();
let mime_type = file_node.mime_type();
let num_bytes = file_node.num_bytes();
let last_commit_id = file_node.last_commit_id().to_string();
let query_params = query.into_inner();
if (query_params.width.is_some() || query_params.height.is_some())
&& mime_type.starts_with("image/")
{
let img_resize = ImgResize {
width: query_params.width,
height: query_params.height,
};
log::debug!("img_resize {img_resize:?}");
let (file_stream, content_length) = util::fs::handle_image_resize(
Arc::clone(&version_store),
hash_str.clone(),
&PathBuf::from(&path),
img_resize,
)
.await?;
return Ok(
file_stream_response(mime_type, &last_commit_id, Some(content_length))
.streaming(file_stream),
);
}
if query_params.thumbnail == Some(true) && mime_type.starts_with("video/") {
let video_thumbnail = VideoThumbnail {
width: query_params.width,
height: query_params.height,
timestamp: query_params.timestamp.or(Some(1.0)),
thumbnail: query_params.thumbnail,
};
log::debug!("video_thumbnail {video_thumbnail:?}");
let stream =
util::fs::handle_video_thumbnail(Arc::clone(&version_store), hash_str, video_thumbnail)
.await?;
return Ok(file_stream_response("image/jpeg", &last_commit_id, None).streaming(stream));
}
log::debug!("did not hit the resize or thumbnail cache");
let stream = version_store.get_version_stream(&hash_str).await?;
Ok(file_stream_response(mime_type, &last_commit_id, Some(num_bytes)).streaming(stream))
}
#[utoipa::path(
post,
path = "/api/repos/{namespace}/{repo_name}/workspaces/{workspace_id}/files/{path}",
description = "Upload and stage files to a workspace. Accept a multipart with either gzipped or uncompressed file parts. Use the filename from the file part and compute the file hash from the content.",
tag = "Workspace Files",
params(
("namespace" = String, Path, description = "The namespace of the repository", example = "ox"),
("repo_name" = String, Path, description = "The name of the repository", example = "ImageNet-1k"),
("workspace_id" = String, Path, description = "The UUID of the workspace", example = "580c0587-c157-417b-9118-8686d63d2745"),
("path" = String, Path, description = "The target path to upload the file to", example = "data/train")
),
request_body(
content_type = "multipart/form-data",
description = "Multipart upload of file. Each file should be sent as a separate file part",
content = FileUpload,
),
responses(
(status = 200, description = "File successfully uploaded to workspace", body = FilePathsResponse),
(status = 404, description = "Workspace not found"),
(status = 400, description = "Invalid upload request")
)
)]
pub async fn add(req: HttpRequest, payload: Multipart) -> Result<HttpResponse, OxenHttpError> {
let app_data = app_data(&req)?;
let namespace = path_param(&req, "namespace")?.to_string();
let repo_name = path_param(&req, "repo_name")?.to_string();
let workspace_id = path_param(&req, "workspace_id")?.to_string();
let repo = get_repo(&app_data.path, namespace, &repo_name)?;
let directory = path_param(&req, "path")?.to_string();
let Some(workspace) = repositories::workspaces::get(&repo, &workspace_id)? else {
return Ok(HttpResponse::NotFound()
.json(StatusMessageDescription::workspace_not_found(workspace_id)));
};
let version_store = repo.version_store()?;
let (upload_files, err_files, update_timestamp) = save_parts(payload, &repo).await?;
log::debug!("Save multiparts found {} err_files", err_files.len());
log::debug!(
"Calling add version files from the core workspace logic with {} files (update_timestamp: {})",
upload_files.len(),
update_timestamp,
);
let mut ret_files = vec![];
for upload_file in upload_files {
let file_name = upload_file.path.file_name().unwrap();
let dst_path = PathBuf::from(&directory).join(file_name);
let version_path = version_store.get_version_path(&upload_file.hash).await?;
let ret_file = match core::v_latest::workspaces::files::add_version_file(
&workspace,
&version_path,
&dst_path,
&upload_file.hash,
update_timestamp,
) {
Ok(ret_file) => ret_file,
Err(e) => {
log::error!("Error adding file {version_path:?}: {e:?}");
continue;
}
};
ret_files.push(ret_file);
log::info!("Successfully staged file {upload_file:?}");
}
Ok(HttpResponse::Ok().json(FilePathsResponse {
status: StatusMessage::resource_created(),
paths: ret_files,
}))
}
#[utoipa::path(
    post,
    path = "/api/repos/{namespace}/{repo_name}/workspaces/{workspace_id}/files/batch/{directory}",
    description = "Stage file nodes to a workspace. Do not upload file contents to the repository.",
    tag = "Workspace Files",
    params(
        ("namespace" = String, Path, description = "The namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "The name of the repository", example = "ImageNet-1k"),
        ("workspace_id" = String, Path, description = "The UUID of the workspace", example = "580c0587-c157-417b-9118-8686d63d2745"),
        ("directory" = String, Path, description = "The directory to stage the files into", example = "data/train"),
        ("update_timestamp" = Option<bool>, Query, description = "Force staging even if file content has not changed, updating the file timestamp", example = false)
    ),
    request_body(
        content = Vec<FileWithHash>,
        description = "List of files and their pre-calculated hashes (must exist in version store).",
        example = json!([
            {
                "path": "images/train/dog.jpg",
                "hash": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
            }
        ])
    ),
    responses(
        (status = 200, description = "Files staged successfully", body = ErrorFilesResponse),
        (status = 404, description = "Workspace not found")
    )
)]
// Stages already-uploaded versions (identified by hash) into `directory` of a workspace.
//
// No file bytes travel with this request; the JSON body only pairs paths with hashes and the
// staging itself is delegated to `core::v_latest::workspaces::files::add_version_files`.
// Per-file failures come back in `err_files` of a 200 response; a missing workspace is 404.
pub async fn add_version_files(
    req: HttpRequest,
    payload: web::Json<Vec<FileWithHash>>,
    query: web::Query<StagingQueryParams>,
) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let directory = path_param(&req, "directory")?.to_string();
    // Absent or explicit `false` both mean "do not force a timestamp update".
    let update_timestamp = matches!(query.update_timestamp, Some(true));
    let repo = get_repo(&app_data.path, namespace, repo_name)?;
    let workspace = match repositories::workspaces::get(&repo, &workspace_id)? {
        Some(ws) => ws,
        None => {
            return Ok(HttpResponse::NotFound()
                .json(StatusMessageDescription::workspace_not_found(workspace_id)));
        }
    };

    let requested_files = payload.into_inner();
    log::debug!(
        "Calling add version files from the core workspace logic with {} files (update_timestamp: {})",
        requested_files.len(),
        update_timestamp,
    );
    let err_files = core::v_latest::workspaces::files::add_version_files(
        &repo,
        &workspace,
        &requested_files,
        &directory,
        update_timestamp,
    )
    .await?;
    log::debug!("Staging complete with {:?} err files", err_files.len());

    let body = ErrorFilesResponse {
        status: StatusMessage::resource_created(),
        err_files,
    };
    Ok(HttpResponse::Ok().json(body))
}
#[utoipa::path(
    delete,
    path = "/api/repos/{namespace}/{repo_name}/workspaces/{workspace_id}/files",
    description = "Stage files for removal from the repository. Accepts both files and directories.",
    tag = "Workspace Files",
    params(
        ("namespace" = String, Path, description = "The namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "The name of the repository", example = "ImageNet-1k"),
        ("workspace_id" = String, Path, description = "The UUID of the workspace", example = "580c0587-c157-417b-9118-8686d63d2745")
    ),
    request_body(
        content = Vec<String>,
        description = "List of paths to stage for removal",
        example = json!(["images/train/dog_1.jpg", "annotations/incorrect.xml"])
    ),
    responses(
        (status = 200, description = "Files successfully removed", body = FilePathsResponse),
        (status = 206, description = "Some files could not be found/removed (returns paths of files not found)", body = FilePathsResponse),
        (status = 404, description = "Workspace not found")
    )
)]
// Stages every path in the JSON body for removal in the workspace.
//
// If `rm` reports no per-path error entries, responds 200 with all requested paths.
// Otherwise it responds 206 Partial Content with `resource_not_found` status, listing only
// the paths carried by the error entries (entries without a path are omitted).
// A hard `Err` from `rm` aborts the whole request via `?`.
pub async fn rm_files(
    req: HttpRequest,
    payload: web::Json<Vec<PathBuf>>,
) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let repo = get_repo(&app_data.path, namespace, repo_name)?;
    let Some(workspace) = repositories::workspaces::get(&repo, &workspace_id)? else {
        return Ok(HttpResponse::NotFound()
            .json(StatusMessageDescription::workspace_not_found(workspace_id)));
    };
    let paths_to_remove: Vec<PathBuf> = payload.into_inner();
    let mut err_files = vec![];
    for path in &paths_to_remove {
        // `rm` hands back per-path error entries; only log success when it added none.
        let errors_before = err_files.len();
        err_files.extend(repositories::workspaces::files::rm(&workspace, path).await?);
        if err_files.len() == errors_before {
            log::debug!("rm ✅ success! staged file {path:?} as removed");
        } else {
            log::debug!("rm ❌ could not stage {path:?} as removed");
        }
    }
    log::debug!("err_files: {err_files:?}");
    if err_files.is_empty() {
        Ok(HttpResponse::Ok().json(FilePathsResponse {
            status: StatusMessage::resource_deleted(),
            paths: paths_to_remove,
        }))
    } else {
        let error_paths: Vec<PathBuf> = err_files
            .into_iter()
            .filter_map(|err_info| err_info.path)
            .collect();
        Ok(HttpResponse::PartialContent().json(FilePathsResponse {
            status: StatusMessage::resource_not_found(),
            paths: error_paths,
        }))
    }
}
/// Checks that the workspace named in the URL exists; the request body is ignored.
///
/// Responds 200 with `resource_found` when it does, or 404 with a `workspace_not_found`
/// description when it does not.
pub async fn validate(req: HttpRequest, _body: String) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let repo = get_repo(&app_data.path, namespace, repo_name)?;
    let response = match repositories::workspaces::get(&repo, &workspace_id)? {
        Some(_) => HttpResponse::Ok().json(StatusMessage::resource_found()),
        None => HttpResponse::NotFound()
            .json(StatusMessageDescription::workspace_not_found(workspace_id)),
    };
    Ok(response)
}
#[utoipa::path(
    patch,
    path = "/api/repos/{namespace}/{repo_name}/workspaces/{workspace_id}/files/{path}",
    description = "Move or rename a file within the workspace.",
    tag = "Workspace Files",
    params(
        ("namespace" = String, Path, description = "The namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "The name of the repository", example = "ImageNet-1k"),
        ("workspace_id" = String, Path, description = "The UUID of the workspace", example = "580c0587-c157-417b-9118-8686d63d2745"),
        ("path" = String, Path, description = "The current path to the file to move/rename", example = "images/train/dog_1.jpg")
    ),
    request_body(
        content = RenameRequest,
        description = "The new path for the file",
        example = json!({"new_path": "images/train/renamed_dog_1.jpg"})
    ),
    responses(
        (status = 200, description = "File successfully moved/renamed", body = StatusMessage),
        (status = 400, description = "Invalid request (empty new_path or new_path already exists)"),
        (status = 404, description = "Workspace or file not found")
    )
)]
// Moves/renames `path` to the `new_path` given in the JSON body.
//
// `new_path` must be non-empty, passes through `util::fs::validate_and_normalize_path`, and
// must not already exist in the workspace's commit tree (400 otherwise). Tabular files are
// renamed through the data-frame workspace API; everything else goes through `files::mv`.
pub async fn mv(req: HttpRequest, body: String) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let repo = get_repo(&app_data.path, namespace, repo_name)?;
    let source_path = PathBuf::from(path_param(&req, "path")?);

    let rename_request: RenameRequest = serde_json::from_str(&body)?;
    if rename_request.new_path.is_empty() {
        return Err(OxenHttpError::BadRequest("new_path cannot be empty".into()));
    }
    let destination = util::fs::validate_and_normalize_path(&rename_request.new_path)?;

    let workspace = match repositories::workspaces::get(&repo, &workspace_id)? {
        Some(ws) => ws,
        None => {
            return Ok(HttpResponse::NotFound()
                .json(StatusMessageDescription::workspace_not_found(workspace_id)));
        }
    };

    let destination_taken =
        repositories::tree::get_node_by_path(&repo, &workspace.commit, &destination)?.is_some();
    if destination_taken {
        return Err(OxenHttpError::BadRequest(
            "new_path already exists in the repository".into(),
        ));
    }

    match util::fs::is_tabular(&source_path) {
        true => {
            repositories::workspaces::data_frames::rename(&workspace, &source_path, &destination)
                .await?
        }
        false => repositories::workspaces::files::mv(&workspace, &source_path, &destination)?,
    }
    Ok(HttpResponse::Ok().json(StatusMessage::resource_updated()))
}
pub async fn save_parts(
mut payload: Multipart,
repo: &LocalRepository,
) -> Result<(Vec<FileWithHash>, Vec<ErrorFileInfo>, bool), Error> {
let version_store = repo.version_store().map_err(|oxen_err: OxenError| {
log::error!("Failed to get version store: {oxen_err:?}");
actix_web::error::ErrorInternalServerError(oxen_err.to_string())
})?;
let gzip_mime: mime::Mime = "application/gzip".parse().unwrap();
let mut upload_files: Vec<FileWithHash> = vec![];
let mut err_files: Vec<ErrorFileInfo> = vec![];
let mut update_timestamp = false;
while let Some(mut field) = payload.try_next().await? {
let Some(content_disposition) = field.content_disposition().cloned() else {
continue;
};
if let Some(name) = content_disposition.get_name()
&& name == "update_timestamp"
{
update_timestamp = parse_bool_field(&mut field).await?;
continue;
}
if let Some(name) = content_disposition.get_name()
&& (name == "file[]" || name == "file")
{
let upload_filename = content_disposition.get_filename().map_or_else(
|| {
Err(actix_web::error::ErrorBadRequest(
"Missing hash in multipart request",
))
},
|fhash_os_str| Ok(fhash_os_str.to_string()),
)?;
let mut field_bytes = Vec::new();
while let Some(chunk) = field.try_next().await? {
field_bytes.extend_from_slice(&chunk);
}
let is_gzipped = field
.content_type()
.map(|mime| {
mime.type_() == gzip_mime.type_() && mime.subtype() == gzip_mime.subtype()
})
.unwrap_or(false);
let upload_filename_copy = upload_filename.clone();
let (upload_filehash, data_to_store) =
match actix_web::web::block(move || -> Result<(String, Vec<u8>), OxenError> {
if is_gzipped {
log::debug!(
"Decompressing gzipped data for file: {upload_filename_copy:?}"
);
let mut decoder = GzDecoder::new(&field_bytes[..]);
let mut decompressed_bytes: Vec<u8> = Vec::new();
decoder.read_to_end(&mut decompressed_bytes).map_err(|e| {
OxenError::basic_str(format!("Failed to decompress gzipped data: {e}"))
})?;
let hash = hasher::hash_buffer(&decompressed_bytes);
Ok((hash, decompressed_bytes))
} else {
log::debug!("Data for file {upload_filename_copy:?} is not gzipped.");
let hash = hasher::hash_buffer(&field_bytes);
Ok((hash, field_bytes))
}
})
.await
{
Ok(Ok((hash, data))) => (hash, data),
Ok(Err(e)) => {
log::error!(
"Failed to decompress data for file {}: {:?}",
&upload_filename,
e
);
record_error_file(
&mut err_files,
upload_filename.clone(),
None,
format!("Failed to decompress data: {e:?}"),
);
continue;
}
Err(e) => {
log::error!(
"Failed to execute blocking decompression task for file {}: {}",
&upload_filename,
e
);
record_error_file(
&mut err_files,
upload_filename.clone(),
None,
format!("Failed to execute blocking decompression: {e}"),
);
continue;
}
};
match version_store
.store_version(&upload_filehash, &data_to_store)
.await
{
Ok(_) => {
upload_files.push(FileWithHash {
hash: upload_filehash.to_string(),
path: upload_filename.into(),
});
log::info!("Successfully stored version for hash: {}", &upload_filehash);
}
Err(e) => {
log::error!(
"Failed to store version for hash {}: {}",
&upload_filehash,
e
);
record_error_file(
&mut err_files,
upload_filehash.clone(),
None,
format!("Failed to store version: {e}"),
);
continue;
}
}
}
}
Ok((upload_files, err_files, update_timestamp))
}
async fn parse_bool_field(field: &mut actix_multipart::Field) -> Result<bool, Error> {
let mut bytes = Vec::new();
while let Some(chunk) = field.try_next().await? {
bytes.extend_from_slice(&chunk);
}
let value = String::from_utf8_lossy(&bytes);
match value.as_ref() {
"true" | "1" => Ok(true),
"false" | "0" => Ok(false),
_ => Err(actix_web::error::ErrorBadRequest(format!(
"Invalid boolean value: {value}"
))),
}
}
/// Appends one [`ErrorFileInfo`] describing a failed upload to `err_files`.
///
/// `filehash` goes into the entry's `hash`, `filepath` into its `path`, and `error` is the
/// human-readable failure reason.
fn record_error_file(
    err_files: &mut Vec<ErrorFileInfo>,
    filehash: String,
    filepath: Option<PathBuf>,
    error: String,
) {
    err_files.push(ErrorFileInfo {
        hash: filehash,
        path: filepath,
        error,
    });
}
// Integration tests for the workspace file `get` handler, run against a real on-disk repo
// wired into a minimal actix `App` with only the GET route mounted.
#[cfg(test)]
mod tests {
    use crate::app_data::OxenAppData;
    use crate::controllers;
    use crate::test;
    use actix_web::http::header;
    use actix_web::{App, web};
    use liboxen::error::OxenError;
    use liboxen::repositories;
    use liboxen::util;

    // Requesting a path that exists in neither the staged db nor the base commit must yield
    // 404. `get` returns `InternalOxenError(resource_not_found)` for this case; the mapping
    // to a 404 status lives in `OxenHttpError`'s response conversion (not shown here).
    #[actix_web::test]
    async fn test_get_nonexistent_file_returns_404() -> Result<(), OxenError> {
        liboxen::test::init_test_env();
        let sync_dir = test::get_sync_dir()?;
        let namespace = "Testing-Namespace";
        let repo_name = "Testing-Name";
        let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
        // One committed file so the workspace has a real base commit to fall back to.
        let hello_file = repo.path.join("hello.txt");
        util::fs::write_to_path(&hello_file, "Hello")?;
        repositories::add(&repo, &hello_file).await?;
        let commit = repositories::commit(&repo, "First commit")?;
        let workspace_id = uuid::Uuid::new_v4().to_string();
        repositories::workspaces::create(&repo, &commit, &workspace_id, false)?;
        let file_path = "this_file_does_not_exist.txt";
        let uri =
            format!("/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/files/{file_path}");
        // `{path:.*}` lets the trailing segment contain slashes, mirroring the real route.
        let app = actix_web::test::init_service(
            App::new()
                .app_data(OxenAppData::new(sync_dir.clone()))
                .route(
                    "/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/files/{path:.*}",
                    web::get().to(controllers::workspaces::files::get),
                ),
        )
        .await;
        let req = actix_web::test::TestRequest::get().uri(&uri).to_request();
        let resp = actix_web::test::call_service(&app, req).await;
        assert_eq!(resp.status(), actix_web::http::StatusCode::NOT_FOUND);
        test::cleanup_sync_dir(&sync_dir)?;
        Ok(())
    }

    // A plain (no resize/thumbnail) download must report the stored byte count as
    // Content-Length and expose that header to browsers via Access-Control-Expose-Headers
    // (presumably set by `file_stream_response` — confirm in crate::helpers).
    #[actix_web::test]
    async fn test_workspace_file_get_exposes_content_length() -> Result<(), OxenError> {
        liboxen::test::init_test_env();
        let sync_dir = test::get_sync_dir()?;
        let namespace = "Testing-Namespace";
        let repo_name = "Testing-Workspace-Get-Headers";
        let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
        let hello_file = repo.path.join("hello.txt");
        let file_content = "Hello";
        util::fs::write_to_path(&hello_file, file_content)?;
        repositories::add(&repo, &hello_file).await?;
        let commit = repositories::commit(&repo, "First commit")?;
        let workspace_id = uuid::Uuid::new_v4().to_string();
        repositories::workspaces::create(&repo, &commit, &workspace_id, false)?;
        // The file is unmodified in the workspace, so it is served from the base commit.
        let uri =
            format!("/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/files/hello.txt");
        let app = actix_web::test::init_service(
            App::new()
                .app_data(OxenAppData::new(sync_dir.clone()))
                .route(
                    "/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/files/{path:.*}",
                    web::get().to(controllers::workspaces::files::get),
                ),
        )
        .await;
        let req = actix_web::test::TestRequest::get().uri(&uri).to_request();
        let resp = actix_web::test::call_service(&app, req).await;
        assert_eq!(resp.status(), actix_web::http::StatusCode::OK);
        assert_eq!(
            resp.headers().get(header::CONTENT_LENGTH).unwrap(),
            file_content.len().to_string().as_str()
        );
        assert_eq!(
            resp.headers()
                .get(header::ACCESS_CONTROL_EXPOSE_HEADERS)
                .unwrap(),
            header::CONTENT_LENGTH.as_str()
        );
        test::cleanup_sync_dir(&sync_dir)?;
        Ok(())
    }
}