pub mod chunks;
use crate::errors::OxenHttpError;
use crate::helpers::{file_stream_response, get_repo};
use crate::params::{app_data, parse_resource, path_param};
use actix_multipart::Multipart;
use actix_web::{Error, HttpRequest, HttpResponse, web};
use async_compression::tokio::bufread::GzipDecoder;
use async_compression::tokio::write::GzipEncoder;
use async_zip::Compression;
use async_zip::base::write::ZipFileWriter;
use flate2::read::GzDecoder;
use futures_util::{StreamExt, TryStreamExt as _};
use liboxen::error::OxenError;
use liboxen::model::LocalRepository;
use liboxen::model::metadata::metadata_image::ImgResize;
use liboxen::repositories;
use liboxen::util;
use liboxen::view::versions::{CleanCorruptedVersionsResponse, VersionFile, VersionFileResponse};
use liboxen::view::{ErrorFileInfo, ErrorFilesResponse, FileWithHash, StatusMessage};
use mime;
use std::io::Read as StdRead;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::io::BufReader;
use tokio::io::{AsyncRead, AsyncWriteExt};
use tokio_tar::Builder;
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use tokio_util::compat::TokioAsyncWriteCompatExt;
use tokio_util::io::{ReaderStream, StreamReader};
use utoipa::ToSchema;
/// Capacity in bytes (2 MiB) of the in-memory duplex pipe that connects an archive
/// writer task to the streamed HTTP response body.
const DOWNLOAD_BUFFER_SIZE: usize = 2 * 1024 * 1024;
// Returns the hash and stored size of a single version file as JSON.
//
// Responds with `OxenHttpError::NotFound` when the version store reports that
// `version_id` does not exist.
//
// NOTE: kept as a plain `//` comment on purpose; utoipa folds `///` docs on a
// `#[utoipa::path]` handler into the generated OpenAPI summary/description.
#[utoipa::path(
    get,
    path = "/api/repos/{namespace}/{repo_name}/versions/{version_id}/metadata",
    tag = "Version Files",
    description = "Get metadata for a specific file version by its hash.",
    params(
        ("namespace" = String, Path, description = "The namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "The name of the repository", example = "ImageNet-1k"),
        ("version_id" = String, Path, description = "The hash ID of the file version", example = "1eb45ac94f3eab120f3a"),
    ),
    responses(
        (status = 200, description = "File metadata found", body = VersionFileResponse),
        (status = 404, description = "Version file not found"),
    )
)]
pub async fn metadata(req: HttpRequest) -> Result<HttpResponse, OxenHttpError> {
    let data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let hash = path_param(&req, "version_id")?.to_string();

    let repo = get_repo(&data.path, namespace, repo_name)?;
    let store = repo.version_store()?;

    if store.version_exists(&hash).await? {
        let size = store.get_version_size(&hash).await?;
        let payload = VersionFileResponse {
            status: StatusMessage::resource_found(),
            version: VersionFile { hash, size },
        };
        Ok(HttpResponse::Ok().json(payload))
    } else {
        Err(OxenHttpError::NotFound)
    }
}
/// Runs the version store's corrupted-version cleanup for the repository named in the
/// request path and returns the outcome as JSON.
///
/// NOTE(review): `clean_corrupted_versions` is called with `false`; presumably that is a
/// dry-run switch, so this really deletes — confirm against the version store API.
pub async fn clean(req: HttpRequest) -> Result<HttpResponse, OxenHttpError> {
    let data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();

    let repo = get_repo(&data.path, namespace, &repo_name)?;
    let result = repo
        .version_store()?
        .clean_corrupted_versions(false)
        .await?;

    let payload = CleanCorruptedVersionsResponse {
        status: StatusMessage::resource_found(),
        result,
    };
    Ok(HttpResponse::Ok().json(payload))
}
// Streams the content of one file, addressed by `<revision>/<path>`, from the version store.
//
// When the query asks for a width and/or height and the entry's MIME type starts with
// `image/`, the bytes come from `util::fs::handle_image_resize` and the content length it
// reports is used. Otherwise the stored version is streamed as-is with the entry's
// recorded byte count.
//
// Fails with `OxenHttpError::NotFound` when the parsed resource carries no commit, and with
// a "path does not exist" error when the commit has no entry at that path.
//
// NOTE: kept as a plain `//` comment on purpose; utoipa folds `///` docs on a
// `#[utoipa::path]` handler into the generated OpenAPI summary/description.
#[tracing::instrument(skip_all)]
#[utoipa::path(
    get,
    path = "/api/repos/{namespace}/{repo_name}/versions/{resource}",
    tag = "Version Files",
    description = "Download a file by its resource path, with optional image resizing.",
    params(
        ("namespace" = String, Path, description = "The namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "The name of the repository", example = "ImageNet-1k"),
        ("resource" = String, Path, description = "The resource identifier (e.g., 'main/path/to/file.ext' or a commit ID/branch name followed by the file path)", example = "main/images/dog_1.jpg"),
        ImgResize
    ),
    responses(
        (status = 200, description = "File content returned as a stream. Content-Type matches the file's MIME type (e.g., image/jpeg), or the resized image's MIME type.",
            body = Vec<u8>,
            headers(
                ("oxen-revision-id" = String, description = "The commit ID of the file version")
            )
        ),
        (status = 404, description = "Repository, commit, or file not found"),
    )
)]
pub async fn download(
    req: HttpRequest,
    query: web::Query<ImgResize>,
) -> Result<HttpResponse, OxenHttpError> {
    let data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let repo = get_repo(&data.path, &namespace, &repo_name)?;
    let version_store = repo.version_store()?;

    // Resolve `<revision>/<path>` into a concrete commit and repository-relative path.
    let resource = parse_resource(&req, &repo)?;
    let commit = resource.commit.clone().ok_or(OxenHttpError::NotFound)?;
    let path = resource.path.clone();
    log::debug!("Download resource {namespace}/{repo_name}/{resource} version file");

    let Some(entry) = repositories::entries::get_file(&repo, &commit, &path)? else {
        return Err(OxenError::path_does_not_exist(path.clone()).into());
    };

    let hash_str = entry.hash().to_string();
    let mime_type = entry.mime_type();
    let num_bytes = entry.num_bytes();
    let last_commit_id = entry.last_commit_id().to_string();

    let img_resize = query.into_inner();
    let wants_resize = img_resize.width.is_some() || img_resize.height.is_some();
    if wants_resize && mime_type.starts_with("image/") {
        log::debug!("img_resize {img_resize:?}");
        let (file_stream, content_length) =
            util::fs::handle_image_resize(Arc::clone(&version_store), hash_str, &path, img_resize)
                .await?;
        let response = file_stream_response(mime_type, &last_commit_id, Some(content_length))
            .streaming(file_stream);
        return Ok(response);
    }
    log::debug!("did not hit the resize cache");

    let stream = version_store.get_version_stream(&hash_str).await?;
    Ok(file_stream_response(mime_type, &last_commit_id, Some(num_bytes)).streaming(stream))
}
// Accepts a gzip-compressed, newline-delimited list of version hashes and answers with a
// gzipped tarball of the matching version files (see `stream_versions_tar_gz`).
//
// The whole request body is buffered before it is decompressed. Each line is trimmed;
// blank lines and lines containing anything other than ASCII hex digits are dropped
// silently rather than rejected.
#[tracing::instrument(skip_all)]
#[utoipa::path(
    post,
    path = "/api/repos/{namespace}/{repo_name}/versions/batch-download",
    tag = "Version Files",
    description = "Download multiple files as a gzipped tarball by providing their hashes.",
    summary = "Batch download files (Tarball)",
    params(
        ("namespace" = String, Path, description = "The namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "The name of the repository", example = "ImageNet-1k"),
    ),
    request_body(
        content = Vec<u8>,
        content_type = "application/gzip",
        description = "Gzip compressed binary payload containing a line-delimited list of merkle hashes to download",
    ),
    responses(
        (status = 200, description = "Tarball of all requested files, gzipped.", content_type = "application/gzip"),
    )
)]
pub async fn batch_download(
    req: HttpRequest,
    mut body: web::Payload,
) -> Result<HttpResponse, OxenHttpError> {
    let data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let repo = get_repo(&data.path, namespace, &repo_name)?;

    // Drain the request payload into one contiguous buffer.
    let mut compressed = web::BytesMut::new();
    while let Some(next) = body.next().await {
        let chunk = next.map_err(|_| OxenHttpError::FailedToReadRequestPayload)?;
        compressed.extend_from_slice(&chunk);
    }
    log::debug!(
        "batch_download got repo [{}] and content_ids size {}",
        repo_name,
        compressed.len()
    );

    let mut listing = String::new();
    GzDecoder::new(&compressed[..])
        .read_to_string(&mut listing)
        .map_err(|e| {
            log::error!("Failed to decompress gzip payload: {e}");
            OxenHttpError::from(e)
        })?;

    let mut file_hashes: Vec<String> = Vec::new();
    for line in listing.lines() {
        let candidate = line.trim();
        // An empty string would pass the all-hex test vacuously, so rule it out first.
        if candidate.is_empty() || !candidate.chars().all(|c| c.is_ascii_hexdigit()) {
            continue;
        }
        file_hashes.push(candidate.to_string());
    }
    log::debug!("Got {} file hashes", file_hashes.len());

    stream_versions_tar_gz(&repo, file_hashes).await
}
pub async fn stream_versions_tar_gz(
repo: &LocalRepository,
file_hashes: Vec<String>,
) -> Result<HttpResponse, OxenHttpError> {
let version_store = repo.version_store()?;
let (writer, reader) = tokio::io::duplex(DOWNLOAD_BUFFER_SIZE);
let version_store_clone = version_store.clone();
let file_hashes_clone = file_hashes.clone();
let (error_tx, mut error_rx) = tokio::sync::mpsc::unbounded_channel();
let writer_task = async move {
let enc = GzipEncoder::new(writer);
let mut tar = Builder::new(enc);
let mut had_error = false;
for file_hash in file_hashes_clone.iter() {
match version_store_clone.get_version_stream(file_hash).await {
Ok(data) => {
let file_size = match version_store_clone.get_version_size(file_hash).await {
Ok(size) => size,
Err(e) => {
log::error!("Failed to get version file size for {file_hash}: {e}");
error_tx.send(e).ok();
had_error = true;
break;
}
};
let mut header = tokio_tar::Header::new_gnu();
header.set_size(file_size as u64);
if let Err(e) = header.set_path(file_hash) {
log::error!("Failed to set path for {file_hash}: {e}");
error_tx
.send(OxenError::basic_str(format!(
"Failed to set path for {file_hash}: {e}"
)))
.ok();
had_error = true;
break;
}
header.set_mode(0o644);
header.set_uid(0);
header.set_gid(0);
header.set_cksum();
let mut reader = StreamReader::new(data);
if let Err(e) = tar.append(&header, &mut reader).await {
log::error!("Failed to append {file_hash} to tar: {e}");
error_tx.send(OxenError::IO(e)).ok();
had_error = true;
break;
}
log::info!(
"Successfully appended data to tarball for hash: {}",
&file_hash
);
}
Err(e) => {
log::error!("Failed to get version {file_hash}: {e}");
error_tx.send(e).ok();
had_error = true;
break;
}
}
}
if let Err(e) = tar.finish().await {
log::error!("Failed to finish tar: {e}");
error_tx.send(OxenError::IO(e)).ok();
had_error = true;
}
match tar.into_inner().await {
Ok(mut enc) => {
if let Err(e) = enc.shutdown().await {
log::error!("Failed to shutdown gzip encoder: {e}");
error_tx.send(OxenError::IO(e)).ok();
had_error = true;
}
log::info!("Successfully finished tarball");
}
Err(e) => {
log::error!("Failed to get encoder: {e}");
error_tx.send(OxenError::IO(e)).ok();
had_error = true;
}
};
if had_error {
log::warn!("Stream closed due to earlier error");
} else {
log::info!("Streaming completed successfully");
}
};
tokio::spawn(writer_task);
let stream = ReaderStream::new(reader).map(move |chunk| {
if let Ok(err) = error_rx.try_recv() {
log::error!("Stream error: {err}");
return Err(OxenHttpError::from(err));
}
chunk.map_err(OxenHttpError::from)
});
Ok(HttpResponse::Ok()
.content_type("application/gzip")
.streaming(stream))
}
pub async fn stream_versions_zip(
repo: &LocalRepository,
files: Vec<FileWithHash>,
) -> Result<HttpResponse, OxenHttpError> {
let version_store = repo.version_store()?;
let (writer, reader) = tokio::io::duplex(DOWNLOAD_BUFFER_SIZE);
let version_store_clone = version_store.clone();
let files_clone = files.clone();
let (error_tx, mut error_rx) = tokio::sync::mpsc::unbounded_channel();
let writer_task = async move {
let compat_writer = writer.compat_write();
let mut zip_writer = ZipFileWriter::new(compat_writer);
let mut had_error = false;
for file in files_clone.iter() {
let path = file.path.to_str();
let path = match path.map(|s| s.to_string()) {
Some(path) => path,
None => {
let err = "Invalid UTF-8 in path".to_string();
error_tx.send(OxenError::basic_str(err)).ok();
had_error = true;
break;
}
};
let hash = &file.hash;
let file_size = match version_store_clone.get_version_size(hash).await {
Ok(size) => size,
Err(e) => {
log::error!("Failed to get version file size for {hash}: {e}");
error_tx.send(e).ok();
had_error = true;
break;
}
};
match version_store_clone.get_version_stream(hash).await {
Ok(data) => {
let mut reader = StreamReader::new(data);
let compression = Compression::Deflate;
let zip_entry_builder =
async_zip::ZipEntryBuilder::new(path.into(), compression)
.uncompressed_size(file_size as u64)
.unix_permissions(0o644);
let zip_entry = zip_entry_builder.build();
let entry_writer = match zip_writer.write_entry_stream(zip_entry).await {
Ok(writer) => writer,
Err(e) => {
log::error!("Failed to append {hash} to zip: {e}");
error_tx.send(OxenError::IO(std::io::Error::other(e))).ok();
had_error = true;
break;
}
};
let mut compat_writer = entry_writer.compat_write();
if let Err(e) = tokio::io::copy(&mut reader, &mut compat_writer).await {
log::error!("Failed to stream data for {hash} into zip: {e}");
error_tx.send(OxenError::IO(std::io::Error::other(e))).ok();
had_error = true;
break;
}
let entry_writer = compat_writer.into_inner();
if let Err(e) = entry_writer.close().await {
log::error!("Failed to close zip entry for {hash}: {e}");
error_tx.send(OxenError::IO(std::io::Error::other(e))).ok();
had_error = true;
break;
}
log::info!("Successfully appended data to zip for hash: {}", &hash);
}
Err(e) => {
log::error!("Failed to get version {hash}: {e}");
error_tx.send(OxenError::IO(std::io::Error::other(e))).ok();
had_error = true;
break;
}
}
}
if let Err(e) = zip_writer.close().await {
log::error!("Failed to finish zip file: {e}");
error_tx.send(OxenError::IO(std::io::Error::other(e))).ok();
had_error = true;
} else {
log::info!("Successfully finished zip file");
}
if !had_error {
log::info!("Streaming completed successfully");
} else {
log::warn!("Stream closed due to earlier error");
}
};
tokio::spawn(writer_task);
let stream = tokio_util::io::ReaderStream::new(reader).map(move |chunk| {
if let Ok(err) = error_rx.try_recv() {
log::error!("Stream error: {err}");
return Err(OxenHttpError::from(err));
}
chunk.map_err(OxenHttpError::from)
});
Ok(HttpResponse::Ok()
.content_type("application/zip")
.streaming(stream))
}
// Schema type referenced as the request-body `content` of `batch_upload` in its
// `#[utoipa::path]` attribute.
//
// NOTE: plain `//` comments are used here on purpose; utoipa copies `///` docs on a
// `ToSchema` type into the generated OpenAPI description.
#[derive(ToSchema)]
pub struct UploadVersionFile {
// File bytes; the `schema` attribute below overrides the documented type to a
// `String` with `Binary` format.
#[schema(value_type = String, format = Binary)]
pub file: Vec<u8>,
}
// Stores every file part of a multipart upload in the repository's version store.
//
// The response is 200 whenever the request itself could be processed; files that failed
// to store are reported individually in `err_files`.
#[utoipa::path(
    post,
    path = "/api/repos/{namespace}/{repo_name}/versions",
    tag = "Version Files",
    description = "Upload multiple files via multipart form, identified by their content hashes.",
    summary = "Batch upload files (Multipart)",
    params(
        ("namespace" = String, Path, description = "The namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "The name of the repository", example = "ImageNet-1k"),
    ),
    request_body(
        content_type = "multipart/form-data",
        description = "Multipart upload of files. Each form field 'file[]' or 'file' should contain the file content (optionally gzip compressed), and the filename should be the content hash (e.g., 'file.jpg' is not used, the hash is the identifier).",
        content = UploadVersionFile,
    ),
    responses(
        (status = 200, description = "Files successfully uploaded (check err_files for failures)", body = ErrorFilesResponse),
        (status = 400, description = "Invalid multipart request"),
        (status = 404, description = "Repository not found"),
    )
)]
pub async fn batch_upload(
    req: HttpRequest,
    payload: Multipart,
) -> Result<HttpResponse, OxenHttpError> {
    let data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let repo = get_repo(&data.path, namespace, &repo_name)?;

    let err_files = save_multiparts(payload, &repo).await?;
    log::debug!("batch upload complete with err_files: {}", err_files.len());

    let summary = ErrorFilesResponse {
        status: StatusMessage::resource_created(),
        err_files,
    };
    Ok(HttpResponse::Ok().json(summary))
}
pub async fn save_multiparts(
mut payload: Multipart,
repo: &LocalRepository,
) -> Result<Vec<ErrorFileInfo>, Error> {
let version_store = repo.version_store().map_err(|oxen_err: OxenError| {
log::error!("Failed to get version store: {oxen_err:?}");
actix_web::error::ErrorInternalServerError(oxen_err.to_string())
})?;
let gzip_mime: mime::Mime = "application/gzip".parse().unwrap();
let mut err_files: Vec<ErrorFileInfo> = vec![];
while let Some(mut field) = payload.try_next().await? {
let Some(content_disposition) = field.content_disposition().cloned() else {
continue;
};
if let Some(name) = content_disposition.get_name()
&& (name == "file[]" || name == "file")
{
let upload_filehash = content_disposition.get_filename().map_or_else(
|| {
Err(actix_web::error::ErrorBadRequest(
"Missing hash in multipart request",
))
},
|fhash_os_str| Ok(fhash_os_str.to_string()),
)?;
log::debug!("upload file_hash: {upload_filehash:?}");
let is_gzipped = field
.content_type()
.map(|mime| {
mime.type_() == gzip_mime.type_() && mime.subtype() == gzip_mime.subtype()
})
.unwrap_or(false);
let mut field_bytes = Vec::new();
while let Some(chunk) = field.try_next().await? {
field_bytes.extend_from_slice(&chunk);
}
let field_size = field_bytes.len() as u64;
let reader: Box<dyn AsyncRead + Send + Unpin> = if is_gzipped {
let cursor = std::io::Cursor::new(field_bytes);
let buf_reader = BufReader::new(cursor);
Box::new(GzipDecoder::new(buf_reader))
} else {
let cursor = std::io::Cursor::new(field_bytes);
Box::new(cursor)
};
match version_store
.store_version_from_reader(&upload_filehash, reader, field_size)
.await
{
Ok(_) => {
log::info!("Successfully stored version for hash: {upload_filehash}");
}
Err(e) => {
log::error!("Failed to store version for hash {upload_filehash}: {e}");
record_error_file(
&mut err_files,
upload_filehash.clone(),
None,
format!("Failed to store version: {e}"),
);
continue;
}
}
}
}
Ok(err_files)
}
/// Appends a failure record for one uploaded file to `err_files`.
///
/// `filehash` identifies the file, `filepath` is its path when one is known, and `error`
/// is the human-readable reason the file could not be handled.
fn record_error_file(
    err_files: &mut Vec<ErrorFileInfo>,
    filehash: String,
    filepath: Option<PathBuf>,
    error: String,
) {
    err_files.push(ErrorFileInfo {
        hash: filehash,
        path: filepath,
        error,
    });
}
// Integration tests for the version-file controllers. Each test creates a repository
// under a fresh sync directory, drives a handler through an in-process actix service,
// and removes the sync directory again when it finishes successfully.
#[cfg(test)]
mod tests {
use crate::app_data::OxenAppData;
use crate::controllers;
use crate::test;
use actix_multipart::test::create_form_data_payload_and_headers;
use actix_web::http::header;
use actix_web::{App, body, web, web::Bytes};
use flate2::Compression;
use flate2::write::GzEncoder;
use liboxen::error::OxenError;
use liboxen::repositories;
use liboxen::util;
use liboxen::view::ErrorFilesResponse;
use mime;
use std::io::Write;
// Downloading a committed text file by `main/<path>` returns 200, the exact file bytes,
// a Content-Length equal to the file size, and exposes Content-Length via
// Access-Control-Expose-Headers.
#[actix_web::test]
async fn test_controllers_versions_download() -> Result<(), OxenError> {
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Name";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
util::fs::create_dir_all(repo.path.join("data"))?;
let relative_path = "data/hello.txt";
let hello_file = repo.path.join(relative_path);
let file_content = "Hello";
util::fs::write_to_path(&hello_file, file_content)?;
repositories::add(&repo, &hello_file).await?;
repositories::commit(&repo, "First commit")?;
let uri = format!("/oxen/{namespace}/{repo_name}/versions/main/{relative_path}");
let req = actix_web::test::TestRequest::get()
.uri(&uri)
.app_data(OxenAppData::new(sync_dir.to_path_buf()))
.to_request();
// `{resource:.*}` lets the resource segment span slashes (revision plus file path).
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/versions/{resource:.*}",
web::get().to(controllers::versions::download),
),
)
.await;
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::OK);
assert_eq!(
resp.headers().get(header::CONTENT_LENGTH).unwrap(),
file_content.len().to_string().as_str()
);
assert_eq!(
resp.headers()
.get(header::ACCESS_CONTROL_EXPOSE_HEADERS)
.unwrap(),
header::CONTENT_LENGTH.as_str()
);
let bytes = actix_http::body::to_bytes(resp.into_body()).await.unwrap();
assert_eq!(bytes, "Hello");
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
// Requesting a resized image (`?width=2&height=2`) must still advertise a Content-Length
// header, and that value must equal the number of body bytes actually returned.
#[actix_web::test]
async fn test_controllers_versions_download_resize_sets_content_length() -> Result<(), OxenError>
{
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Resize-Content-Length";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
util::fs::create_dir_all(repo.path.join("data"))?;
let relative_path = "data/pixel.png";
let image_file = repo.path.join(relative_path);
// Raw bytes of a small PNG; its IHDR chunk declares a 1x1 image.
let png_bytes = [
0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, 0x00, 0x00, 0x00, 0x0d, 0x49, 0x48,
0x44, 0x52, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x08, 0x06, 0x00, 0x00,
0x00, 0x1f, 0x15, 0xc4, 0x89, 0x00, 0x00, 0x00, 0x0d, 0x49, 0x44, 0x41, 0x54, 0x78,
0x9c, 0x63, 0xf8, 0xcf, 0xc0, 0xf0, 0x1f, 0x00, 0x05, 0x00, 0x01, 0xff, 0x89, 0x99,
0x3d, 0x1d, 0x00, 0x00, 0x00, 0x00, 0x49, 0x45, 0x4e, 0x44, 0xae, 0x42, 0x60, 0x82,
];
util::fs::write_data(&image_file, &png_bytes)?;
repositories::add(&repo, &image_file).await?;
repositories::commit(&repo, "First commit")?;
let uri =
format!("/oxen/{namespace}/{repo_name}/versions/main/{relative_path}?width=2&height=2");
let req = actix_web::test::TestRequest::get()
.uri(&uri)
.app_data(OxenAppData::new(sync_dir.to_path_buf()))
.to_request();
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/versions/{resource:.*}",
web::get().to(controllers::versions::download),
),
)
.await;
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::OK);
assert_eq!(
resp.headers()
.get(header::ACCESS_CONTROL_EXPOSE_HEADERS)
.unwrap(),
header::CONTENT_LENGTH.as_str()
);
let content_length = resp
.headers()
.get(header::CONTENT_LENGTH)
.unwrap()
.to_str()
.unwrap()
.parse::<usize>()
.unwrap();
let bytes = body::to_bytes(resp.into_body()).await.unwrap();
assert_eq!(content_length, bytes.len());
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
// A revision that does not exist in the repository must produce a 404 response.
#[actix_web::test]
async fn test_controllers_versions_download_bad_commit_returns_404() -> Result<(), OxenError> {
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Name";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
util::fs::create_dir_all(repo.path.join("data"))?;
let relative_path = "data/hello.txt";
let hello_file = repo.path.join(relative_path);
util::fs::write_to_path(&hello_file, "Hello")?;
repositories::add(&repo, &hello_file).await?;
repositories::commit(&repo, "First commit")?;
let uri =
format!("/oxen/{namespace}/{repo_name}/versions/nonexistent_commit_id/{relative_path}");
let req = actix_web::test::TestRequest::get()
.uri(&uri)
.app_data(OxenAppData::new(sync_dir.to_path_buf()))
.to_request();
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/versions/{resource:.*}",
web::get().to(controllers::versions::download),
),
)
.await;
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::NOT_FOUND);
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
// Uploading one gzip-compressed `file[]` part whose filename is the content hash returns
// a success status with no `err_files`, and the version store then holds the original
// (uncompressed) content under that hash.
#[actix_web::test]
async fn test_controllers_versions_batch_upload() -> Result<(), OxenError> {
liboxen::test::init_test_env();
let sync_dir = test::get_sync_dir()?;
let namespace = "Testing-Namespace";
let repo_name = "Testing-Name";
let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;
let path = liboxen::test::add_txt_file_to_dir(&repo.path, "hello")?;
repositories::add(&repo, path).await?;
repositories::commit(&repo, "first commit")?;
let file_content = "Test Content";
let file_hash = util::hasher::hash_str(file_content);
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder.write_all(file_content.as_bytes())?;
let compressed_bytes = encoder.finish()?;
let (body, headers) = create_form_data_payload_and_headers(
"file[]",
Some(file_hash.clone()),
Some("application/gzip".parse::<mime::Mime>().unwrap()),
Bytes::from(compressed_bytes),
);
let uri = format!("/oxen/{namespace}/{repo_name}/versions");
let req = actix_web::test::TestRequest::post()
.uri(&uri)
.app_data(OxenAppData::new(sync_dir.to_path_buf()));
// Copy the generated multipart headers onto the request before attaching the body.
let req = headers
.into_iter()
.fold(req, |req, hdr| req.insert_header(hdr))
.set_payload(body)
.to_request();
let app = actix_web::test::init_service(
App::new()
.app_data(OxenAppData::new(sync_dir.clone()))
.route(
"/oxen/{namespace}/{repo_name}/versions",
web::post().to(controllers::versions::batch_upload),
),
)
.await;
let resp = actix_web::test::call_service(&app, req).await;
assert_eq!(resp.status(), actix_web::http::StatusCode::OK);
let bytes = actix_http::body::to_bytes(resp.into_body()).await.unwrap();
let response: ErrorFilesResponse = serde_json::from_slice(&bytes)?;
assert_eq!(response.status.status, "success");
assert!(response.err_files.is_empty());
let version_store = repo.version_store()?;
let stored_data = version_store.get_version(&file_hash).await?;
assert_eq!(stored_data, file_content.as_bytes());
test::cleanup_sync_dir(&sync_dir)?;
Ok(())
}
}