use crate::errors::OxenHttpError;
use crate::helpers::{file_stream_response, get_repo};
use crate::params::{app_data, client_must_use_multipart_staging, path_param};
use liboxen::constants::stream_segment_size;
use liboxen::core;
use liboxen::core::staged::get_staged_db_manager;
use liboxen::error::OxenError;
use liboxen::model::LocalRepository;
use liboxen::model::merkle_tree::node::EMerkleTreeNode;
use liboxen::model::metadata::metadata_image::ImgResize;
use liboxen::model::metadata::metadata_video::VideoThumbnail;
use liboxen::repositories;
use liboxen::util;
use liboxen::util::hasher;
use liboxen::view::workspaces::RenameRequest;
use liboxen::view::{
ErrorFileInfo, ErrorFilesResponse, FilePathsResponse, FileWithHash, StatusMessage,
StatusMessageDescription,
};
use actix_multipart::Multipart;
use actix_web::Error;
use actix_web::{HttpRequest, HttpResponse, web};
use flate2::read::GzDecoder;
use futures_util::TryStreamExt as _;
use serde::Deserialize;
use std::io::Read as StdRead;
use std::path::PathBuf;
use std::sync::Arc;
use utoipa;
// OpenAPI-only description of the multipart upload body accepted by `add`.
// It is never deserialized directly; `save_parts` reads the multipart stream itself.
#[derive(utoipa::ToSchema)]
pub struct FileUpload {
// Raw file contents, documented as a binary string in the generated schema.
#[schema(value_type = String, format = Binary)]
pub file: Vec<u8>,
}
/// Optional query parameters accepted by the workspace file `get` handler.
#[derive(Deserialize, Debug)]
pub struct WorkspaceFileQueryParams {
/// Target width for an image resize or a video thumbnail.
pub width: Option<u32>,
/// Target height for an image resize or a video thumbnail.
pub height: Option<u32>,
/// Video position (seconds) to grab the thumbnail from; `get` falls back to 1.0 when absent.
pub timestamp: Option<f64>,
/// When `Some(true)` on a `video/*` file, `get` returns a JPEG thumbnail instead of the video.
pub thumbnail: Option<bool>,
}
#[utoipa::path(
get,
path = "/api/repos/{namespace}/{repo_name}/workspaces/{workspace_id}/files/{path}",
description = "Get a file from a workspace.",
tag = "Workspace Files",
params(
("namespace" = String, Path, description = "The namespace of the repository", example = "ox"),
("repo_name" = String, Path, description = "The name of the repository", example = "ImageNet-1k"),
("workspace_id" = String, Path, description = "The UUID of the workspace", example = "580c0587-c157-417b-9118-8686d63d2745"),
("path" = String, Path, description = "The path to the file in the workspace", example = "images/train/dog_1.jpg"),
("width" = Option<u32>, Query, description = "Width for image resize or video thumbnail", example = 320),
("height" = Option<u32>, Query, description = "Height for image resize or video thumbnail", example = 240),
("timestamp" = Option<f64>, Query, description = "Timestamp in seconds to extract video thumbnail from", example = 1.0),
("thumbnail" = Option<bool>, Query, description = "Set to true to generate a video thumbnail instead of returning the full video", example = true)
),
responses(
(status = 200, description = "File content returned as a stream. Content-Type varies: matches the file's MIME type for regular files and image resizes, or 'image/jpeg' for video thumbnails",
body = Vec<u8>,
headers(
("oxen-revision-id" = String, description = "The commit ID of the file version")
)
),
(status = 404, description = "Workspace or File not found"),
(status = 400, description = "Invalid parameters")
)
)]
pub async fn get(
req: HttpRequest,
query: web::Query<WorkspaceFileQueryParams>,
) -> Result<HttpResponse, OxenHttpError> {
let app_data = app_data(&req)?;
let namespace = path_param(&req, "namespace")?.to_string();
let repo_name = path_param(&req, "repo_name")?.to_string();
let repo = get_repo(app_data, namespace, repo_name)?;
let version_store = repo.version_store();
let workspace_id = path_param(&req, "workspace_id")?.to_string();
let Some(workspace) = repositories::workspaces::get(&repo, &workspace_id)? else {
return Err(OxenHttpError::NotFound);
};
let path = path_param(&req, "path")?.to_string();
log::debug!("got workspace file path {:?}", &path);
let staged_db_manager = get_staged_db_manager(&workspace.workspace_repo)?;
let file_node = match staged_db_manager.read_from_staged_db(&path)? {
Some(staged_node) => match staged_node.node.node {
EMerkleTreeNode::File(f) => Ok(f),
_ => Err(OxenError::basic_str(
"Only single file download is supported",
)),
}?,
None => {
if let Some(file_node) = repositories::tree::get_file_by_path(
&workspace.base_repo,
&workspace.commit,
&path,
)? {
file_node
} else {
return Err(OxenHttpError::InternalOxenError(
OxenError::resource_not_found(&path),
));
}
}
};
let file_hash = file_node.hash();
let hash_str = file_hash.to_string();
let mime_type = file_node.mime_type();
let num_bytes = file_node.num_bytes();
let last_commit_id = file_node.last_commit_id().to_string();
let query_params = query.into_inner();
if (query_params.width.is_some() || query_params.height.is_some())
&& mime_type.starts_with("image/")
{
let img_resize = ImgResize {
width: query_params.width,
height: query_params.height,
};
log::debug!("img_resize {img_resize:?}");
let (file_stream, content_length) = util::fs::handle_image_resize(
Arc::clone(&version_store),
hash_str.clone(),
&PathBuf::from(&path),
img_resize,
)
.await?;
return Ok(
file_stream_response(mime_type, &last_commit_id, Some(content_length))
.streaming(file_stream),
);
}
if query_params.thumbnail == Some(true) && mime_type.starts_with("video/") {
let video_thumbnail = VideoThumbnail {
width: query_params.width,
height: query_params.height,
timestamp: query_params.timestamp.or(Some(1.0)),
thumbnail: query_params.thumbnail,
};
log::debug!("video_thumbnail {video_thumbnail:?}");
let stream = util::fs::handle_video_thumbnail(
Arc::clone(&version_store),
hash_str,
video_thumbnail,
&workspace.dir(),
)
.await?;
return Ok(file_stream_response("image/jpeg", &last_commit_id, None).streaming(stream));
}
log::debug!("did not hit the resize or thumbnail cache");
let stream = version_store.get_version_stream(&hash_str).await?;
Ok(file_stream_response(mime_type, &last_commit_id, Some(num_bytes)).streaming(stream))
}
#[utoipa::path(
post,
path = "/api/repos/{namespace}/{repo_name}/workspaces/{workspace_id}/files/{path}",
description = "Upload and stage files to a workspace. Accept a multipart with either gzipped or uncompressed file parts. Use the filename from the file part and compute the file hash from the content.",
tag = "Workspace Files",
params(
("namespace" = String, Path, description = "The namespace of the repository", example = "ox"),
("repo_name" = String, Path, description = "The name of the repository", example = "ImageNet-1k"),
("workspace_id" = String, Path, description = "The UUID of the workspace", example = "580c0587-c157-417b-9118-8686d63d2745"),
("path" = String, Path, description = "The target path to upload the file to", example = "data/train")
),
request_body(
content_type = "multipart/form-data",
description = "Multipart upload of file. Each file should be sent as a separate file part",
content = FileUpload,
),
responses(
(status = 200, description = "File successfully uploaded to workspace", body = FilePathsResponse),
(status = 404, description = "Workspace not found"),
(status = 400, description = "Invalid upload request")
)
)]
pub async fn add(req: HttpRequest, payload: Multipart) -> Result<HttpResponse, OxenHttpError> {
let app_data = app_data(&req)?;
let namespace = path_param(&req, "namespace")?.to_string();
let repo_name = path_param(&req, "repo_name")?.to_string();
let workspace_id = path_param(&req, "workspace_id")?.to_string();
let repo = get_repo(app_data, namespace, &repo_name)?;
let directory = path_param(&req, "path")?.to_string();
let Some(workspace) = repositories::workspaces::get(&repo, &workspace_id)? else {
return Ok(HttpResponse::NotFound()
.json(StatusMessageDescription::workspace_not_found(workspace_id)));
};
let (upload_files, err_files) = save_parts(payload, &repo).await?;
log::debug!("Save multiparts found {} err_files", err_files.len());
log::debug!(
"Calling add version files from the core workspace logic with {} files",
upload_files.len(),
);
let base_dir = PathBuf::from(&directory);
let mut files_to_stage = Vec::with_capacity(upload_files.len());
for upload_file in upload_files {
let joined = base_dir.join(&upload_file.path);
let dst_path = match util::fs::validate_and_normalize_path(&joined) {
Ok(dst_path) => dst_path,
Err(e) => {
return Err(OxenHttpError::BadRequest(
format!("Invalid staging path {joined:?}: {e}").into(),
));
}
};
files_to_stage.push((dst_path, upload_file.hash));
}
let (ret_files, stage_err_files) =
core::v_latest::workspaces::files::add_version_files_at_paths(&workspace, files_to_stage)
.await?;
for err in &stage_err_files {
log::error!("Error staging file {:?}: {}", err.path, err.error);
}
Ok(HttpResponse::Ok().json(FilePathsResponse {
status: StatusMessage::resource_created(),
paths: ret_files,
}))
}
// Stage already-uploaded versions (path + content hash pairs) into `{directory}` of a
// workspace without receiving any file bytes. Clients that `client_must_use_multipart_staging`
// flags are turned away with `EndpointDeprecated` before any repository lookup happens.
#[utoipa::path(
    post,
    path = "/api/repos/{namespace}/{repo_name}/workspaces/{workspace_id}/files/batch/{directory}",
    description = "Stage file nodes to a workspace. Do not upload file contents to the repository.",
    tag = "Workspace Files",
    params(
        ("namespace" = String, Path, description = "The namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "The name of the repository", example = "ImageNet-1k"),
        ("workspace_id" = String, Path, description = "The UUID of the workspace", example = "580c0587-c157-417b-9118-8686d63d2745"),
        ("directory" = String, Path, description = "The directory to stage the files into", example = "data/train")
    ),
    request_body(
        content = Vec<FileWithHash>,
        description = "List of files and their pre-calculated hashes (must exist in version store).",
        example = json!([
            {
                "path": "images/train/dog.jpg",
                "hash": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
            }
        ])
    ),
    responses(
        (status = 200, description = "Files staged successfully", body = ErrorFilesResponse),
        (status = 404, description = "Workspace not found")
    )
)]
pub async fn add_version_files(
    req: HttpRequest,
    payload: web::Json<Vec<FileWithHash>>,
) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    // Newer clients must upload contents so the server can derive file metadata itself.
    let deprecated_for_client = client_must_use_multipart_staging(&req, app_data.test_mode);
    if deprecated_for_client {
        let message = "The JSON workspace-staging endpoint is deprecated. Upload file contents to the \
             multipart workspace files endpoint (POST /workspaces/{id}/files/{path}) so the \
             server can compute file metadata.";
        return Err(OxenHttpError::EndpointDeprecated(message.into()));
    }

    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let directory = path_param(&req, "directory")?.to_string();
    let repo = get_repo(app_data, namespace, repo_name)?;
    let workspace = match repositories::workspaces::get(&repo, &workspace_id)? {
        Some(found) => found,
        None => {
            let not_found = StatusMessageDescription::workspace_not_found(workspace_id);
            return Ok(HttpResponse::NotFound().json(not_found));
        }
    };

    let files_with_hash = payload.into_inner();
    log::debug!(
        "Calling add version files from the core workspace logic with {} files",
        files_with_hash.len()
    );
    let err_files = core::v_latest::workspaces::files::add_version_files(
        &repo,
        &workspace,
        &files_with_hash,
        &directory,
    )
    .await?;
    log::debug!("Staging complete with {:?} err files", err_files.len());

    let body = ErrorFilesResponse {
        status: StatusMessage::resource_created(),
        err_files,
    };
    Ok(HttpResponse::Ok().json(body))
}
// Stage each requested path for removal from the workspace.
//
// Responds 200 with every requested path when no removal reported an error. Otherwise it
// responds 206 Partial Content and lists only the paths carried by the reported errors
// (errors without a path are omitted from that list).
#[utoipa::path(
    delete,
    path = "/api/repos/{namespace}/{repo_name}/workspaces/{workspace_id}/files",
    description = "Stage files for removal from the repository. Accepts both files and directories.",
    tag = "Workspace Files",
    params(
        ("namespace" = String, Path, description = "The namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "The name of the repository", example = "ImageNet-1k"),
        ("workspace_id" = String, Path, description = "The UUID of the workspace", example = "580c0587-c157-417b-9118-8686d63d2745")
    ),
    request_body(
        content = Vec<String>,
        description = "List of paths to stage for removal",
        example = json!(["images/train/dog_1.jpg", "annotations/incorrect.xml"])
    ),
    responses(
        (status = 200, description = "Files successfully removed", body = FilePathsResponse),
        (status = 206, description = "Some files could not be found/removed (returns paths of files not found)", body = FilePathsResponse),
        (status = 404, description = "Workspace not found")
    )
)]
pub async fn rm_files(
    req: HttpRequest,
    payload: web::Json<Vec<PathBuf>>,
) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let repo = get_repo(app_data, namespace, repo_name)?;
    let Some(workspace) = repositories::workspaces::get(&repo, &workspace_id)? else {
        return Ok(HttpResponse::NotFound()
            .json(StatusMessageDescription::workspace_not_found(workspace_id)));
    };
    let paths_to_remove: Vec<PathBuf> = payload.into_inner();
    let mut err_files = vec![];
    for path in &paths_to_remove {
        let errors_before = err_files.len();
        err_files.extend(repositories::workspaces::files::rm(&workspace, path).await?);
        // Only claim success when this path added no errors.
        if err_files.len() == errors_before {
            log::debug!("rm ✅ success! staged file {path:?} as removed");
        } else {
            log::debug!(
                "rm: {} error(s) while staging {path:?} for removal",
                err_files.len() - errors_before
            );
        }
    }
    log::debug!("err_files: {err_files:?}");
    if err_files.is_empty() {
        Ok(HttpResponse::Ok().json(FilePathsResponse {
            status: StatusMessage::resource_deleted(),
            paths: paths_to_remove,
        }))
    } else {
        let error_paths: Vec<PathBuf> = err_files
            .into_iter()
            .filter_map(|err_info| err_info.path)
            .collect();
        Ok(HttpResponse::PartialContent().json(FilePathsResponse {
            status: StatusMessage::resource_not_found(),
            paths: error_paths,
        }))
    }
}
/// Check that the workspace named in the path exists.
///
/// The request body is accepted but ignored. Responds with a "resource found" status when
/// the workspace exists, or 404 with a workspace-not-found description otherwise.
pub async fn validate(req: HttpRequest, _body: String) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let repo = get_repo(app_data, namespace, repo_name)?;
    match repositories::workspaces::get(&repo, &workspace_id)? {
        Some(_workspace) => Ok(HttpResponse::Ok().json(StatusMessage::resource_found())),
        None => Ok(HttpResponse::NotFound()
            .json(StatusMessageDescription::workspace_not_found(workspace_id))),
    }
}
// Move or rename a file inside a workspace.
//
// The body is a JSON `RenameRequest`. The destination must be non-empty and must pass
// `validate_and_normalize_path`; a malformed destination is reported as 400 Bad Request,
// matching the documented contract and the `add` handler. The destination must also not
// already exist in the workspace's commit. Tabular files are renamed via the data-frame path
// (`data_frames::rename`); everything else is moved with `files::mv`.
#[utoipa::path(
    patch,
    path = "/api/repos/{namespace}/{repo_name}/workspaces/{workspace_id}/files/{path}",
    description = "Move or rename a file within the workspace.",
    tag = "Workspace Files",
    params(
        ("namespace" = String, Path, description = "The namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "The name of the repository", example = "ImageNet-1k"),
        ("workspace_id" = String, Path, description = "The UUID of the workspace", example = "580c0587-c157-417b-9118-8686d63d2745"),
        ("path" = String, Path, description = "The current path to the file to move/rename", example = "images/train/dog_1.jpg")
    ),
    request_body(
        content = RenameRequest,
        description = "The new path for the file",
        example = json!({"new_path": "images/train/renamed_dog_1.jpg"})
    ),
    responses(
        (status = 200, description = "File successfully moved/renamed", body = StatusMessage),
        (status = 400, description = "Invalid request (empty new_path or new_path already exists)"),
        (status = 404, description = "Workspace or file not found")
    )
)]
pub async fn mv(req: HttpRequest, body: String) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let repo = get_repo(app_data, namespace, repo_name)?;
    let path = PathBuf::from(path_param(&req, "path")?);
    let body: RenameRequest = serde_json::from_str(&body)?;
    if body.new_path.is_empty() {
        return Err(OxenHttpError::BadRequest("new_path cannot be empty".into()));
    }
    // A destination that fails validation (e.g. escapes the repo root) is a client error.
    let new_path = util::fs::validate_and_normalize_path(&body.new_path).map_err(|e| {
        OxenHttpError::BadRequest(format!("Invalid new_path {:?}: {e}", body.new_path).into())
    })?;
    let Some(workspace) = repositories::workspaces::get(&repo, &workspace_id)? else {
        return Ok(HttpResponse::NotFound()
            .json(StatusMessageDescription::workspace_not_found(workspace_id)));
    };
    if repositories::tree::get_node_by_path(&repo, &workspace.commit, &new_path)?.is_some() {
        return Err(OxenHttpError::BadRequest(
            "new_path already exists in the repository".into(),
        ));
    }
    if util::fs::is_tabular(&path) {
        repositories::workspaces::data_frames::rename(&workspace, &path, &new_path).await?;
    } else {
        repositories::workspaces::files::mv(&workspace, &path, &new_path)?;
    }
    Ok(HttpResponse::Ok().json(StatusMessage::resource_updated()))
}
pub async fn save_parts(
mut payload: Multipart,
repo: &LocalRepository,
) -> Result<(Vec<FileWithHash>, Vec<ErrorFileInfo>), Error> {
let version_store = repo.version_store();
let gzip_mime: mime::Mime = "application/gzip".parse().unwrap();
let mut upload_files: Vec<FileWithHash> = vec![];
let mut err_files: Vec<ErrorFileInfo> = vec![];
while let Some(mut field) = payload.try_next().await? {
let Some(content_disposition) = field.content_disposition().cloned() else {
continue;
};
if let Some(name) = content_disposition.get_name()
&& (name == "file[]" || name == "file")
{
let upload_filename = content_disposition.get_filename().map_or_else(
|| {
Err(actix_web::error::ErrorBadRequest(
"Missing hash in multipart request",
))
},
|fhash_os_str| Ok(fhash_os_str.to_string()),
)?;
let mut field_bytes = Vec::new();
while let Some(chunk) = field.try_next().await? {
field_bytes.extend_from_slice(&chunk);
}
let is_gzipped = field
.content_type()
.map(|mime| {
mime.type_() == gzip_mime.type_() && mime.subtype() == gzip_mime.subtype()
})
.unwrap_or(false);
let upload_filename_copy = upload_filename.clone();
let (upload_filehash, data_to_store) =
match actix_web::web::block(move || -> Result<(String, Vec<u8>), OxenError> {
if is_gzipped {
log::debug!(
"Decompressing gzipped data for file: {upload_filename_copy:?}"
);
let max_decompressed_size = stream_segment_size();
let mut decoder =
GzDecoder::new(&field_bytes[..]).take(max_decompressed_size + 1);
let mut decompressed_bytes: Vec<u8> = Vec::new();
decoder.read_to_end(&mut decompressed_bytes).map_err(|e| {
OxenError::internal_error(format!(
"Failed to decompress gzipped data: {e}"
))
})?;
let decompressed_size = decompressed_bytes.len() as u64;
if decompressed_size > max_decompressed_size {
return Err(OxenError::internal_error(format!(
"Decompressed size {decompressed_size} exceeds the \
{max_decompressed_size} byte limit"
)));
}
let hash = hasher::hash_buffer(&decompressed_bytes);
Ok((hash, decompressed_bytes))
} else {
log::debug!("Data for file {upload_filename_copy:?} is not gzipped.");
let hash = hasher::hash_buffer(&field_bytes);
Ok((hash, field_bytes))
}
})
.await
{
Ok(Ok((hash, data))) => (hash, data),
Ok(Err(e)) => {
log::error!(
"Failed to decompress data for file {}: {:?}",
&upload_filename,
e
);
record_error_file(
&mut err_files,
upload_filename.clone(),
None,
format!("Failed to decompress data: {e:?}"),
);
continue;
}
Err(e) => {
log::error!(
"Failed to execute blocking decompression task for file {}: {}",
&upload_filename,
e
);
record_error_file(
&mut err_files,
upload_filename.clone(),
None,
format!("Failed to execute blocking decompression: {e}"),
);
continue;
}
};
match version_store
.store_version(&upload_filehash, data_to_store.into())
.await
{
Ok(_) => {
upload_files.push(FileWithHash {
hash: upload_filehash.to_string(),
path: upload_filename.into(),
});
log::info!("Successfully stored version for hash: {}", &upload_filehash);
}
Err(e) => {
log::error!(
"Failed to store version for hash {}: {}",
&upload_filehash,
e
);
record_error_file(
&mut err_files,
upload_filehash.clone(),
None,
format!("Failed to store version: {e}"),
);
continue;
}
}
}
}
Ok((upload_files, err_files))
}
/// Append an [`ErrorFileInfo`] built from `filehash`, `filepath`, and `error` to `err_files`.
fn record_error_file(
    err_files: &mut Vec<ErrorFileInfo>,
    filehash: String,
    filepath: Option<PathBuf>,
    error: String,
) {
    err_files.push(ErrorFileInfo {
        hash: filehash,
        path: filepath,
        error,
    });
}
#[cfg(test)]
mod tests {
// Handler tests: each builds a local repo plus a workspace under a fresh sync dir, mounts a
// single handler on an actix-web test service, and inspects the HTTP response.
use crate::app_data::OxenAppData;
use crate::controllers;
use crate::test;
use actix_web::http::header;
use actix_web::{App, web};
use liboxen::error::OxenError;
use liboxen::repositories;
use liboxen::util;
use liboxen::view::FilePathsResponse;
use actix_multipart::test::create_form_data_payload_and_headers;
use actix_web::web::Bytes;
use mime;
/// `get` must answer 404 for a path that is neither staged nor in the base commit.
#[actix_web::test]
async fn test_get_nonexistent_file_returns_404() -> Result<(), OxenError> {
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Name";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
let hello_file = repo.path.join("hello.txt");
util::fs::write_to_path(&hello_file, "Hello")?;
repositories::add(&repo, &hello_file).await?;
let commit = repositories::commit(&repo, "First commit")?;
let workspace_id = uuid::Uuid::new_v4().to_string();
repositories::workspaces::create(&repo, &commit, &workspace_id, false)?;
let file_path = "this_file_does_not_exist.txt";
let uri =
format!("/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/files/{file_path}");
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/files/{path:.*}",
web::get().to(controllers::workspaces::files::get),
),
)
.await;
let req = actix_web::test::TestRequest::get().uri(&uri).to_request();
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::NOT_FOUND);
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
/// A plain `get` of a committed file sets Content-Length to the file size and exposes that
/// header via Access-Control-Expose-Headers.
#[actix_web::test]
async fn test_workspace_file_get_exposes_content_length() -> Result<(), OxenError> {
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Workspace-Get-Headers";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
let hello_file = repo.path.join("hello.txt");
let file_content = "Hello";
util::fs::write_to_path(&hello_file, file_content)?;
repositories::add(&repo, &hello_file).await?;
let commit = repositories::commit(&repo, "First commit")?;
let workspace_id = uuid::Uuid::new_v4().to_string();
repositories::workspaces::create(&repo, &commit, &workspace_id, false)?;
let uri =
format!("/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/files/hello.txt");
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/files/{path:.*}",
web::get().to(controllers::workspaces::files::get),
),
)
.await;
let req = actix_web::test::TestRequest::get().uri(&uri).to_request();
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::OK);
assert_eq!(
resp.headers().get(header::CONTENT_LENGTH).unwrap(),
file_content.len().to_string().as_str()
);
assert_eq!(
resp.headers()
.get(header::ACCESS_CONTROL_EXPOSE_HEADERS)
.unwrap(),
header::CONTENT_LENGTH.as_str()
);
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
/// An uncompressed multipart upload is staged under the target directory, and its bytes
/// land in the version store keyed by their content hash.
#[actix_web::test]
async fn test_controllers_workspace_files_add_stages_multipart_upload() -> Result<(), OxenError>
{
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Workspace-Add-Multipart";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
let hello_file = repo.path.join("hello.txt");
util::fs::write_to_path(&hello_file, "Hello")?;
repositories::add(&repo, &hello_file).await?;
let commit = repositories::commit(&repo, "First commit")?;
let workspace_id = uuid::Uuid::new_v4().to_string();
repositories::workspaces::create(&repo, &commit, &workspace_id, false)?;
let file_content = "uploaded contents";
let (body, headers) = create_form_data_payload_and_headers(
"file[]",
Some("uploaded.txt".to_string()),
Some(mime::TEXT_PLAIN),
Bytes::from(file_content),
);
let uri = format!("/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/files/data");
let req = actix_web::test::TestRequest::post()
.uri(&uri)
.app_data(OxenAppData::new(sync_dir.to_path_buf()));
// Copy the generated multipart headers (content type with boundary) onto the request.
let req = headers
.into_iter()
.fold(req, |req, hdr| req.insert_header(hdr))
.set_payload(body)
.to_request();
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/files/{path:.*}",
web::post().to(controllers::workspaces::files::add),
),
)
.await;
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::OK);
let bytes = actix_http::body::to_bytes(resp.into_body()).await.unwrap();
let response: FilePathsResponse = serde_json::from_slice(&bytes)?;
assert_eq!(response.status.status, "success");
assert!(
response
.paths
.iter()
.any(|p| p.file_name().and_then(|f| f.to_str()) == Some("uploaded.txt")),
"expected a staged path ending in uploaded.txt, got {:?}",
response.paths
);
let file_hash = util::hasher::hash_buffer(file_content.as_bytes());
let stored = repo.version_store().get_version(&file_hash).await?;
assert_eq!(stored, file_content.as_bytes());
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
/// A filename containing a subdirectory keeps that subdirectory beneath the target directory.
#[actix_web::test]
async fn test_controllers_workspace_files_add_preserves_subdirectory() -> Result<(), OxenError>
{
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Workspace-Add-Nested";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
let hello_file = repo.path.join("hello.txt");
util::fs::write_to_path(&hello_file, "Hello")?;
repositories::add(&repo, &hello_file).await?;
let commit = repositories::commit(&repo, "First commit")?;
let workspace_id = uuid::Uuid::new_v4().to_string();
repositories::workspaces::create(&repo, &commit, &workspace_id, false)?;
let (body, headers) = create_form_data_payload_and_headers(
"file[]",
Some("nested/dog.txt".to_string()),
Some(mime::TEXT_PLAIN),
Bytes::from("nested contents"),
);
let uri = format!("/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/files/data");
let req = actix_web::test::TestRequest::post()
.uri(&uri)
.app_data(OxenAppData::new(sync_dir.to_path_buf()));
let req = headers
.into_iter()
.fold(req, |req, hdr| req.insert_header(hdr))
.set_payload(body)
.to_request();
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/files/{path:.*}",
web::post().to(controllers::workspaces::files::add),
),
)
.await;
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::OK);
let bytes = actix_http::body::to_bytes(resp.into_body()).await.unwrap();
let response: FilePathsResponse = serde_json::from_slice(&bytes)?;
assert!(
response
.paths
.contains(&std::path::PathBuf::from("data/nested/dog.txt")),
"expected the subdirectory preserved as data/nested/dog.txt, got {:?}",
response.paths
);
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
/// A filename that escapes the target directory with `..` makes `add` answer 400.
#[actix_web::test]
async fn test_controllers_workspace_files_add_rejects_path_traversal() -> Result<(), OxenError>
{
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Workspace-Add-Traversal";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
let hello_file = repo.path.join("hello.txt");
util::fs::write_to_path(&hello_file, "Hello")?;
repositories::add(&repo, &hello_file).await?;
let commit = repositories::commit(&repo, "First commit")?;
let workspace_id = uuid::Uuid::new_v4().to_string();
repositories::workspaces::create(&repo, &commit, &workspace_id, false)?;
let (body, headers) = create_form_data_payload_and_headers(
"file[]",
Some("../escape.txt".to_string()),
Some(mime::TEXT_PLAIN),
Bytes::from("malicious"),
);
let uri = format!("/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/files/data");
let req = actix_web::test::TestRequest::post()
.uri(&uri)
.app_data(OxenAppData::new(sync_dir.to_path_buf()));
let req = headers
.into_iter()
.fold(req, |req, hdr| req.insert_header(hdr))
.set_payload(body)
.to_request();
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/files/{path:.*}",
web::post().to(controllers::workspaces::files::add),
),
)
.await;
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::BAD_REQUEST);
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
/// With an empty `{path}` segment the upload is staged at the workspace root, with no
/// directory prefix.
#[actix_web::test]
async fn test_controllers_workspace_files_add_stages_at_root_for_empty_directory()
-> Result<(), OxenError> {
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Workspace-Add-Root";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
let hello_file = repo.path.join("hello.txt");
util::fs::write_to_path(&hello_file, "Hello")?;
repositories::add(&repo, &hello_file).await?;
let commit = repositories::commit(&repo, "First commit")?;
let workspace_id = uuid::Uuid::new_v4().to_string();
repositories::workspaces::create(&repo, &commit, &workspace_id, false)?;
let (body, headers) = create_form_data_payload_and_headers(
"file[]",
Some("root.txt".to_string()),
Some(mime::TEXT_PLAIN),
Bytes::from("root contents"),
);
let uri = format!("/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/files/");
let req = actix_web::test::TestRequest::post()
.uri(&uri)
.app_data(OxenAppData::new(sync_dir.to_path_buf()));
let req = headers
.into_iter()
.fold(req, |req, hdr| req.insert_header(hdr))
.set_payload(body)
.to_request();
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/files/{path:.*}",
web::post().to(controllers::workspaces::files::add),
),
)
.await;
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::OK);
let bytes = actix_http::body::to_bytes(resp.into_body()).await.unwrap();
let response: FilePathsResponse = serde_json::from_slice(&bytes)?;
assert!(
response
.paths
.contains(&std::path::PathBuf::from("root.txt")),
"expected staging at the workspace root (root.txt with no prefix), got {:?}",
response.paths
);
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
/// The JSON staging endpoint answers 426 Upgrade Required for this client User-Agent.
/// No repo is created because the deprecation check in `add_version_files` runs before any
/// repository lookup.
/// NOTE(review): relies on "Oxen/0.99.0" being new enough for
/// `client_must_use_multipart_staging` to return true; confirm against that helper.
#[actix_web::test]
async fn test_add_version_files_returns_426_for_up_to_date_client() -> Result<(), OxenError> {
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Workspace-Deprecated-Staging";
let workspace_id = uuid::Uuid::new_v4().to_string();
let uri = format!("/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/versions/data");
let req = actix_web::test::TestRequest::post()
.uri(&uri)
.insert_header((header::USER_AGENT, "Oxen/0.99.0 (test; tokio)"))
.set_json(serde_json::json!([]))
.app_data(OxenAppData::new(sync_dir.to_path_buf()))
.to_request();
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/versions/{directory}",
web::post().to(controllers::workspaces::files::add_version_files),
),
)
.await;
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::UPGRADE_REQUIRED);
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
/// A gzipped part that inflates to one byte more than `stream_segment_size()` is rejected
/// inside `save_parts`. The request still returns 200, but nothing is staged and the
/// inflated content is not written to the version store.
#[actix_web::test]
async fn test_controllers_workspace_files_add_rejects_gzip_bomb() -> Result<(), OxenError> {
use flate2::Compression;
use flate2::write::GzEncoder;
use liboxen::constants::stream_segment_size;
use std::io::Write;
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Workspace-Gzip-Bomb";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
let hello_file = repo.path.join("hello.txt");
util::fs::write_to_path(&hello_file, "Hello")?;
repositories::add(&repo, &hello_file).await?;
let commit = repositories::commit(&repo, "First commit")?;
let workspace_id = uuid::Uuid::new_v4().to_string();
repositories::workspaces::create(&repo, &commit, &workspace_id, false)?;
// All zeros compress extremely well, so the request body stays small.
let decompressed = vec![0u8; stream_segment_size() as usize + 1];
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder.write_all(&decompressed)?;
let gzipped = encoder.finish()?;
let (body, headers) = create_form_data_payload_and_headers(
"file[]",
Some("bomb.bin".to_string()),
Some("application/gzip".parse::<mime::Mime>().unwrap()),
Bytes::from(gzipped),
);
let uri = format!("/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/files/data");
let req = actix_web::test::TestRequest::post()
.uri(&uri)
.app_data(OxenAppData::new(sync_dir.to_path_buf()));
let req = headers
.into_iter()
.fold(req, |req, hdr| req.insert_header(hdr))
.set_payload(body)
.to_request();
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/workspaces/{workspace_id}/files/{path:.*}",
web::post().to(controllers::workspaces::files::add),
),
)
.await;
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::OK);
let bytes = actix_http::body::to_bytes(resp.into_body()).await.unwrap();
let response: FilePathsResponse = serde_json::from_slice(&bytes)?;
assert!(
response.paths.is_empty(),
"expected no staged paths for a rejected gzip bomb, got {:?}",
response.paths
);
let bomb_hash = util::hasher::hash_buffer(&decompressed);
assert!(
!repo.version_store().version_exists(&bomb_hash).await?,
"gzip bomb contents should not have been stored"
);
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
}