use liboxen::constants;
use liboxen::constants::COMMITS_DIR;
use liboxen::constants::DIR_HASHES_DIR;
use liboxen::constants::DIRS_DIR;
use liboxen::constants::HISTORY_DIR;
use liboxen::constants::VERSION_FILE_NAME;
use liboxen::core::commit_sync_status;
use liboxen::error::OxenError;
use liboxen::model::{Commit, LocalRepository};
use liboxen::opts::PaginateOpts;
use liboxen::perf_guard;
use liboxen::repositories;
use liboxen::util;
use liboxen::view::MerkleHashesResponse;
use liboxen::view::branch::BranchName;
use liboxen::view::entries::ListCommitEntryResponse;
use liboxen::view::tree::merkle_hashes::MerkleHashes;
use liboxen::view::{
CommitResponse, ListCommitResponse, PaginatedCommits, Pagination, RootCommitResponse,
StatusMessage,
};
use os_path::OsPath;
use crate::app_data::OxenAppData;
use crate::errors::OxenHttpError;
use crate::helpers::get_repo;
use crate::params::PageNumQuery;
use crate::params::parse_resource;
use crate::params::{app_data, path_param};
use actix_web::{Error, HttpRequest, HttpResponse, web};
use async_compression::tokio::bufread::GzipDecoder;
use bytesize::ByteSize;
use flate2::Compression;
use flate2::write::GzEncoder;
use futures_util::stream::StreamExt as _;
use serde::Deserialize;
use std::fs::OpenOptions;
use std::io::Cursor;
use std::io::Read;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use tokio::io::BufReader;
use tokio_tar::Archive;
use utoipa::IntoParams;
// Query parameters sent with each chunk of a chunked upload (see `upload_chunk`).
#[derive(Deserialize, Debug, IntoParams)]
pub struct ChunkedDataUploadQuery {
    // Upload identifier; names the temporary directory the chunks are staged in.
    #[param(example = "a2c3d4e5f67890b1c2d3e4f5a6b7c8d9")]
    pub hash: String,
    // Index of this chunk; embedded (zero-padded) in the staged chunk file name.
    #[param(example = 1)]
    pub chunk_num: usize,
    // Number of staged chunk files required before the upload is assembled.
    #[param(example = 10)]
    pub total_chunks: usize,
    // Combined byte size the staged chunks must reach before assembly.
    #[param(example = 100000000)]
    pub total_size: usize,
    // When true the assembled bytes are unpacked as a tarball; otherwise they are
    // written to `filename`.
    #[param(example = true)]
    pub is_compressed: bool,
    // Destination path (joined onto the hidden `.oxen` dir); required when
    // `is_compressed` is false.
    #[param(example = "images/cow.jpg")]
    pub filename: Option<String>,
}
// Query parameters for `list_missing_files`: an optional base and a required head commit.
#[derive(Deserialize, IntoParams)]
pub struct ListMissingFilesQuery {
    // Commit id at the start of the range; omitted means "no base commit".
    #[param(example = "abc1234567890def1234567890fedcba")]
    pub base: Option<String>,
    // Commit id at the end of the range; must exist on the server.
    #[param(example = "84c76a5b2e9a2637f9091991475c404d")]
    pub head: String,
}
#[utoipa::path(
    get,
    path = "/api/repos/{namespace}/{repo_name}/commits",
    tag = "Commits",
    description = "List all commits in the repository's history.",
    params(
        ("namespace" = String, Path, description = "Namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "Name of the repository", example = "ImageNet-1k"),
    ),
    responses(
        (status = 200, description = "List of commits", body = ListCommitResponse),
        (status = 404, description = "Repository not found")
    )
)]
// Lists the repository's commits via `repositories::commits::list`.
//
// A repository with no commits yields an empty, successful list. Any other
// failure while listing is propagated as an error rather than being reported
// as an empty 200 response.
pub async fn index(req: HttpRequest) -> actix_web::Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let repo = get_repo(&app_data.path, namespace, repo_name)?;

    // Empty repositories have nothing to list; handle them explicitly so that
    // genuine listing errors are not masked.
    if repositories::is_empty(&repo)? {
        return Ok(HttpResponse::Ok().json(ListCommitResponse::success(vec![])));
    }

    let commits = repositories::commits::list(&repo)?;
    Ok(HttpResponse::Ok().json(ListCommitResponse::success(commits)))
}
#[utoipa::path(
    get,
    path = "/api/repos/{namespace}/{repo_name}/commits/history/{resource}",
    tag = "Commits",
    description = "List paginated commit history for a revision or file path. Supports revision ranges (base..head) and path-specific history.",
    params(
        ("namespace" = String, Path, description = "Namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "Name of the repository", example = "ImageNet-1k"),
        ("resource" = String, Path, description = "Commit ID, Branch name, or path to a file/directory", example = "main/data/train/image.jpg"),
        PageNumQuery
    ),
    responses(
        (status = 200, description = "Paginated list of commits with total count and cache status", body = PaginatedCommits),
        (status = 404, description = "Repository or resource not found")
    )
)]
// Paginated commit history for the `resource` path parameter.
//
// * If `resource` contains `..` it is passed unchanged as a revision to
//   `list_from_paginated`.
// * Otherwise it is parsed with `parse_resource`. A non-empty path selects
//   path-scoped history (`list_by_path_from_paginated`). An empty path lists
//   history from the resolved commit.
//
// An empty repository short-circuits to an empty page. If the resource does not
// resolve to a commit, `OxenHttpError::NotFound` is returned.
pub async fn history(
    req: HttpRequest,
    query: web::Query<PageNumQuery>,
) -> Result<HttpResponse, OxenHttpError> {
    let _perf = perf_guard!("commits::history_endpoint");
    let _perf_parse = perf_guard!("commits::history_parse_params");
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let repo = get_repo(&app_data.path, namespace, repo_name)?;
    let resource_param = path_param(&req, "resource")?.to_string();
    let pagination = PaginateOpts {
        page_num: query.page.unwrap_or(constants::DEFAULT_PAGE_NUM),
        page_size: query.page_size.unwrap_or(constants::DEFAULT_PAGE_SIZE),
    };
    if repositories::is_empty(&repo)? {
        return Ok(HttpResponse::Ok().json(PaginatedCommits::success(
            vec![],
            Pagination::empty(pagination),
        )));
    }
    drop(_perf_parse);
    log::debug!("commit_history resource_param: {resource_param:?}");

    let _perf_resource = perf_guard!("commits::history_parse_resource");
    let (resource, revision, commit) = if resource_param.contains("..") {
        (None, Some(resource_param), None)
    } else {
        let resource = parse_resource(&req, &repo)?;
        // Only the commit is needed here; clone that field, not the whole resource.
        let commit = resource.commit.clone().ok_or(OxenHttpError::NotFound)?;
        (Some(resource), None, Some(commit))
    };
    drop(_perf_resource);

    match (&resource, &commit) {
        // Path-scoped history. `commit` is always `Some` whenever `resource` is,
        // so matching on both avoids an `unwrap`.
        (Some(resource), Some(commit)) if resource.path != Path::new("") => {
            log::debug!("commit_history resource: {resource:?}");
            let _perf_list = perf_guard!("commits::history_list_by_path");
            let commits = repositories::commits::list_by_path_from_paginated(
                &repo,
                commit,
                &resource.path,
                pagination,
            )?;
            log::debug!("commit_history got {} commits", commits.commits.len());
            Ok(HttpResponse::Ok().json(commits))
        }
        _ => {
            log::debug!("commit_history revision: {revision:?}");
            // Prefer an explicit revision range; fall back to the resolved commit id.
            let revision_id = revision.as_ref().or_else(|| commit.as_ref().map(|c| &c.id));
            if let Some(revision_id) = revision_id {
                let _perf_list = perf_guard!("commits::history_list_from_revision");
                let commits =
                    repositories::commits::list_from_paginated(&repo, revision_id, pagination)?;
                log::debug!("commit_history got {} commits", commits.commits.len());
                Ok(HttpResponse::Ok().json(commits))
            } else {
                Err(OxenHttpError::NotFound)
            }
        }
    }
}
#[utoipa::path(
    get,
    path = "/api/repos/{namespace}/{repo_name}/commits/all",
    description = "List all commits in a repository",
    tag = "Commits",
    params(
        ("namespace" = String, Path, description = "Namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "Name of the repository", example = "ImageNet-1k"),
        PageNumQuery
    ),
    responses(
        (status = 200, description = "Paginated list of all commits", body = PaginatedCommits),
        (status = 404, description = "Repository not found")
    )
)]
// Returns one page of `list_all_paginated`. Any page number or page size
// missing from the query falls back to the crate-wide defaults.
pub async fn list_all(
    req: HttpRequest,
    query: web::Query<PageNumQuery>,
) -> actix_web::Result<HttpResponse, OxenHttpError> {
    let data = app_data(&req)?;
    let repo = get_repo(
        &data.path,
        path_param(&req, "namespace")?.to_string(),
        path_param(&req, "repo_name")?.to_string(),
    )?;

    let page_num = query.page.unwrap_or(constants::DEFAULT_PAGE_NUM);
    let page_size = query.page_size.unwrap_or(constants::DEFAULT_PAGE_SIZE);
    let page = repositories::commits::list_all_paginated(
        &repo,
        PaginateOpts {
            page_num,
            page_size,
        },
    )?;
    Ok(HttpResponse::Ok().json(page))
}
#[utoipa::path(
    get,
    path = "/api/repos/{namespace}/{repo_name}/commits/missing",
    description = "From a list of commit hashes, list the ones not present on the server",
    tag = "Commits",
    params(
        ("namespace" = String, Path, description = "Namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "Name of the repository", example = "ImageNet-1k"),
    ),
    request_body(
        content = MerkleHashes,
        description = "List of commit hashes present on the client.",
        example = json!({
            "hashes": ["abc1234567890def1234567890fedcba", "84c76a5b2e9a2637f9091991475c404d"]
        })
    ),
    responses(
        (status = 200, description = "List of commit hashes missing on the server", body = MerkleHashesResponse),
        (status = 400, description = "Invalid JSON body"),
        (status = 404, description = "Repository not found")
    )
)]
// Parses a `MerkleHashes` JSON body and responds with the subset of those
// commit hashes that `list_unsynced_commit_hashes` reports as not yet synced.
// A body that fails to parse produces a 400 "Invalid JSON" response.
pub async fn list_missing(
    req: HttpRequest,
    body: String,
) -> actix_web::Result<HttpResponse, OxenHttpError> {
    let data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let repo = get_repo(&data.path, namespace, repo_name)?;

    let merkle_hashes = match serde_json::from_str::<MerkleHashes>(&body) {
        Ok(parsed) => parsed,
        Err(_) => {
            log::error!("list_missing invalid JSON: {body:?}");
            return Ok(HttpResponse::BadRequest().json(StatusMessage::error("Invalid JSON")));
        }
    };
    log::debug!(
        "list_missing checking {} commit hashes",
        merkle_hashes.hashes.len()
    );

    let hashes = repositories::tree::list_unsynced_commit_hashes(&repo, &merkle_hashes.hashes)?;
    log::debug!("list_missing found {} missing commits", hashes.len());

    Ok(HttpResponse::Ok().json(MerkleHashesResponse {
        status: StatusMessage::resource_found(),
        hashes,
    }))
}
#[utoipa::path(
    get,
    path = "/api/repos/{namespace}/{repo_name}/commits/missing_files",
    description = "List files that are referenced in a commit but not present on the server. Accept a commit range.",
    tag = "Commits",
    params(
        ("namespace" = String, Path, description = "Namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "Name of the repository", example = "ImageNet-1k"),
        ListMissingFilesQuery
    ),
    responses(
        (status = 200, description = "List of missing file entries", body = ListCommitEntryResponse),
        (status = 404, description = "Repository or commit not found")
    )
)]
// Lists file entries in the `base..head` commit range that are missing on the server.
//
// If `base` is omitted or does not resolve to a commit, the range has no base.
// An unknown `head` yields `OxenError::RevisionNotFound`.
pub async fn list_missing_files(
    req: HttpRequest,
    query: web::Query<ListMissingFilesQuery>,
) -> actix_web::Result<HttpResponse, OxenHttpError> {
    let data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let repo = get_repo(&data.path, namespace, repo_name)?;

    let base_commit = query
        .base
        .as_deref()
        .map(|base_id| repositories::commits::get_by_id(&repo, base_id))
        .transpose()?
        .flatten();

    let head_id = query.head.as_str();
    let head_commit = repositories::commits::get_by_id(&repo, head_id)?
        .ok_or_else(|| OxenError::RevisionNotFound(head_id.into()))?;

    let entries = repositories::entries::list_missing_files_in_commit_range(
        &repo,
        &base_commit,
        &head_commit,
    )
    .await?;

    Ok(HttpResponse::Ok().json(ListCommitEntryResponse {
        status: StatusMessage::resource_found(),
        entries,
    }))
}
#[utoipa::path(
    post,
    path = "/api/repos/{namespace}/{repo_name}/commits/synced",
    tag = "Commits",
    description = "Mark a list of commit hashes as successfully synchronized to the server.",
    params(
        ("namespace" = String, Path, description = "Namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "Name of the repository", example = "ImageNet-1k"),
    ),
    request_body(
        content = MerkleHashes,
        description = "List of commit hashes successfully synced to the server.",
        example = json!({
            "hashes": ["abc1234567890def1234567890fedcba", "84c76a5b2e9a2637f9091991475c404d"]
        })
    ),
    responses(
        (status = 200, description = "Commits marked as synced", body = MerkleHashesResponse),
        (status = 404, description = "Repository not found")
    )
)]
// Buffers the streamed JSON body and parses it as `MerkleHashes`. Each hash is
// then recorded through `commit_sync_status::mark_commit_as_synced`, and the
// same hashes are echoed back. The first failure aborts the loop.
pub async fn mark_commits_as_synced(
    req: HttpRequest,
    mut body: web::Payload,
) -> actix_web::Result<HttpResponse, OxenHttpError> {
    let data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let repository = get_repo(&data.path, namespace, repo_name)?;

    let mut payload = web::BytesMut::new();
    while let Some(piece) = body.next().await {
        let piece = piece.map_err(|_| OxenHttpError::FailedToReadRequestPayload)?;
        payload.extend_from_slice(&piece);
    }

    let hashes = serde_json::from_slice::<MerkleHashes>(&payload)?.hashes;
    let count = hashes.len();
    log::debug!("mark_commits_as_synced marking {} commit hashes", count);

    for hash in &hashes {
        commit_sync_status::mark_commit_as_synced(&repository, hash)?;
    }
    log::debug!("successfully marked {} commit hashes", count);

    Ok(HttpResponse::Ok().json(MerkleHashesResponse {
        status: StatusMessage::resource_found(),
        hashes,
    }))
}
#[utoipa::path(
    get,
    path = "/api/repos/{namespace}/{repo_name}/commits/{commit_id}",
    tag = "Commits",
    description = "Get details of a specific commit by its ID.",
    params(
        ("namespace" = String, Path, description = "Namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "Name of the repository", example = "ImageNet-1k"),
        ("commit_id" = String, Path, description = "Hash ID of the commit", example = "84c76a5b2e9a2637f9091991475c404d"),
    ),
    responses(
        (status = 200, description = "Commit", body = CommitResponse),
        (status = 404, description = "Commit not found")
    )
)]
// Looks up a single commit by id. An unknown id becomes `OxenError::RevisionNotFound`.
pub async fn show(req: HttpRequest) -> actix_web::Result<HttpResponse, OxenHttpError> {
    let data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let commit_id = path_param(&req, "commit_id")?.to_string();
    let repo = get_repo(&data.path, namespace, repo_name)?;

    let found = repositories::commits::get_by_id(&repo, &commit_id)?;
    match found {
        Some(commit) => Ok(HttpResponse::Ok().json(CommitResponse {
            status: StatusMessage::resource_found(),
            commit,
        })),
        None => Err(OxenError::RevisionNotFound(commit_id.into()).into()),
    }
}
#[utoipa::path(
    get,
    path = "/api/repos/{namespace}/{repo_name}/commits/{commit_or_branch}/parents",
    tag = "Commits",
    description = "Get the parent commits of a specific commit or the tip of a branch.",
    params(
        ("namespace" = String, Path, description = "Namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "Name of the repository", example = "ImageNet-1k"),
        ("commit_or_branch" = String, Path, description = "Commit ID or Branch name", example = "main"),
    ),
    responses(
        (status = 200, description = "List of parent commits", body = ListCommitResponse),
        (status = 404, description = "Commit or Branch not found")
    )
)]
// Resolves `commit_or_branch` with `revisions::get` and returns
// `commits::list_from` for the resolved commit id.
// NOTE(review): `list_from` presumably includes the commit itself and all of its
// ancestors, not only the direct parents; confirm against clients before changing.
pub async fn parents(req: HttpRequest) -> actix_web::Result<HttpResponse, OxenHttpError> {
    let data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let name = path_param(&req, "repo_name")?.to_string();
    let commit_or_branch = path_param(&req, "commit_or_branch")?.to_string();
    let repository = get_repo(&data.path, namespace, name)?;

    let resolved = repositories::revisions::get(&repository, &commit_or_branch)?;
    let commit = match resolved {
        Some(commit) => commit,
        None => return Err(OxenError::RevisionNotFound(commit_or_branch.into()).into()),
    };

    let commits = repositories::commits::list_from(&repository, &commit.id)?;
    Ok(HttpResponse::Ok().json(ListCommitResponse {
        status: StatusMessage::resource_found(),
        commits,
    }))
}
#[tracing::instrument(skip_all)]
#[utoipa::path(
    get,
    path = "/api/repos/{namespace}/{repo_name}/commits_db",
    tag = "Commits",
    description = "Download the commits database as a compressed tarball for cloning.",
    params(
        ("namespace" = String, Path, description = "Namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "Name of the repository", example = "ImageNet-1k"),
    ),
    responses(
        (status = 200, description = "Tarball of commits DB"),
        (status = 404, description = "Repository not found")
    )
)]
// Responds with the in-memory gzip'd tarball produced by `compress_commits_db`.
pub async fn download_commits_db(
    req: HttpRequest,
) -> actix_web::Result<HttpResponse, OxenHttpError> {
    let data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let name = path_param(&req, "repo_name")?.to_string();
    let repository = get_repo(&data.path, namespace, name)?;
    Ok(HttpResponse::Ok().body(compress_commits_db(&repository)?))
}
/// Packs the repository's hidden `COMMITS_DIR` into an in-memory gzip-compressed tar.
///
/// Archive entries are rooted at `COMMITS_DIR`.
///
/// # Errors
/// Propagates I/O errors from reading the directory or finalizing the archive.
fn compress_commits_db(repository: &LocalRepository) -> Result<Vec<u8>, OxenError> {
    let commit_dir = util::fs::oxen_hidden_dir(&repository.path).join(COMMITS_DIR);
    log::debug!("Compressing commit db from dir {commit_dir:?}");

    let mut archive = tar::Builder::new(GzEncoder::new(Vec::new(), Compression::default()));
    archive.append_dir_all(Path::new(COMMITS_DIR), commit_dir)?;
    archive.finish()?;

    let encoder = archive.into_inner()?;
    let buffer: Vec<u8> = encoder.finish()?;
    let total_size = u64::try_from(buffer.len()).unwrap_or(u64::MAX);
    log::debug!("Compressed commit dir size is {}", ByteSize::b(total_size));
    Ok(buffer)
}
#[tracing::instrument(skip_all)]
#[utoipa::path(
    get,
    path = "/api/repos/{namespace}/{repo_name}/commits/{base_head}/dir_hashes_db",
    tag = "Commits",
    description = "Download directory hashes database for a commit range as a tarball.",
    params(
        ("namespace" = String, Path, description = "Namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "Name of the repository", example = "ImageNet-1k"),
        ("base_head" = String, Path, description = "Commit ID range (base..head) or single commit ID", example = "abc1234..84c76a5"),
    ),
    responses(
        (status = 200, description = "Tarball of dir hashes DB"),
        (status = 400, description = "Invalid base_head format"),
        (status = 404, description = "Repository or commit not found")
    )
)]
// Responds with a tarball of the history dirs for a set of commits.
// * `base..head` uses `list_between` for the two resolved revisions. More than
//   one `..` separator is rejected with 400.
// * A plain id uses `list_from` on that id.
pub async fn download_dir_hashes_db(
    req: HttpRequest,
) -> actix_web::Result<HttpResponse, OxenHttpError> {
    let data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let name = path_param(&req, "repo_name")?.to_string();
    let base_head = path_param(&req, "base_head")?.to_string();
    let repository = get_repo(&data.path, namespace, name)?;

    let commits = match base_head.split_once("..") {
        Some((base_commit_id, head_commit_id)) => {
            // A second separator means the range had more than two parts.
            if head_commit_id.contains("..") {
                return Err(OxenHttpError::BadRequest("Invalid base_head".into()));
            }
            let base_commit = repositories::revisions::get(&repository, base_commit_id)?
                .ok_or_else(|| OxenError::RevisionNotFound(base_commit_id.into()))?;
            let head_commit = repositories::revisions::get(&repository, head_commit_id)?
                .ok_or_else(|| OxenError::RevisionNotFound(head_commit_id.into()))?;
            repositories::commits::list_between(&repository, &base_commit, &head_commit)?
        }
        None => repositories::commits::list_from(&repository, &base_head)?,
    };

    Ok(HttpResponse::Ok().body(compress_commits(&repository, &commits)?))
}
#[utoipa::path(
    get,
    path = "/api/repos/{namespace}/{repo_name}/commits/{commit_or_branch}/commit_entries_db",
    tag = "Commits",
    description = "Download the commit entries database for a specific commit as a tarball.",
    params(
        ("namespace" = String, Path, description = "Namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "Name of the repository", example = "ImageNet-1k"),
        ("commit_or_branch" = String, Path, description = "Commit ID or Branch name", example = "main"),
    ),
    responses(
        (status = 200, description = "Tarball of commit entries DB"),
        (status = 404, description = "Repository or commit not found")
    )
)]
// Resolves `commit_or_branch` to a commit and responds with the tarball built by
// `compress_commit` for it.
pub async fn download_commit_entries_db(
    req: HttpRequest,
) -> actix_web::Result<HttpResponse, OxenHttpError> {
    let data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let name = path_param(&req, "repo_name")?.to_string();
    let commit_or_branch = path_param(&req, "commit_or_branch")?.to_string();
    let repository = get_repo(&data.path, namespace, name)?;

    let resolved = repositories::revisions::get(&repository, &commit_or_branch)?;
    let Some(commit) = resolved else {
        return Err(OxenError::RevisionNotFound(commit_or_branch.into()).into());
    };
    let tarball = compress_commit(&repository, &commit)?;
    Ok(HttpResponse::Ok().body(tarball))
}
fn compress_commits(
repository: &LocalRepository,
commits: &[Commit],
) -> Result<Vec<u8>, OxenError> {
let enc = GzEncoder::new(Vec::new(), Compression::default());
let mut tar = tar::Builder::new(enc);
let dirs_to_compress = vec![DIRS_DIR, DIR_HASHES_DIR];
log::debug!("Compressing {} commits", commits.len());
for commit in commits {
let commit_dir = util::fs::oxen_hidden_dir(&repository.path)
.join(HISTORY_DIR)
.join(commit.id.clone());
let tar_subdir = Path::new(HISTORY_DIR).join(commit.id.clone());
log::debug!("Compressing commit {} from dir {:?}", commit.id, commit_dir);
for dir in &dirs_to_compress {
let full_path = commit_dir.join(dir);
let tar_path = tar_subdir.join(dir);
if full_path.exists() {
tar.append_dir_all(&tar_path, full_path)?;
}
}
}
tar.finish()?;
let buffer: Vec<u8> = tar.into_inner()?.finish()?;
let total_size: u64 = u64::try_from(buffer.len()).unwrap_or(u64::MAX);
log::debug!(
"Compressed {} commits, size is {}",
commits.len(),
ByteSize::b(total_size)
);
Ok(buffer)
}
fn compress_commit(repository: &LocalRepository, commit: &Commit) -> Result<Vec<u8>, OxenError> {
let commit_dir = util::fs::oxen_hidden_dir(&repository.path)
.join(HISTORY_DIR)
.join(commit.id.clone());
let tar_subdir = Path::new(HISTORY_DIR).join(commit.id.clone());
log::debug!("Compressing commit {} from dir {:?}", commit.id, commit_dir);
let enc = GzEncoder::new(Vec::new(), Compression::default());
let mut tar = tar::Builder::new(enc);
let dirs_to_compress = vec![DIRS_DIR, DIR_HASHES_DIR];
for dir in &dirs_to_compress {
let full_path = commit_dir.join(dir);
let tar_path = tar_subdir.join(dir);
if full_path.exists() {
tar.append_dir_all(&tar_path, full_path)?;
}
}
tar.finish()?;
let buffer: Vec<u8> = tar.into_inner()?.finish()?;
let total_size: u64 = u64::try_from(buffer.len()).unwrap_or(u64::MAX);
log::debug!(
"Compressed commit {} size is {}",
commit.id,
ByteSize::b(total_size)
);
Ok(buffer)
}
#[utoipa::path(
    post,
    path = "/api/repos/{namespace}/{repo_name}/commits",
    description = "Upload a commit to a branch on the server. This creates an empty commit. To create a commit with children, use the upload_tree endpoint.",
    tag = "Commits",
    params(
        ("namespace" = String, Path, description = "Namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "Name of the repository", example = "ImageNet-1k"),
    ),
    request_body(
        content = Commit,
        description = "Commit object and target branch name.",
        example = json!({
            "author": "Bessie Oxington",
            "email": "bessie@oxen.ai",
            "message": "Empty commit for testing",
            "id": "abc1234567890def1234567890fedcba",
            "branch_name": "main"
        }),
    ),
    responses(
        (status = 200, description = "Commit created", body = CommitResponse),
        (status = 400, description = "Invalid commit data or mismatched remote history"),
    )
)]
// Creates an empty commit on the branch named in the request body.
//
// One JSON body carries both the `Commit` fields and `branch_name`, so it is
// deserialized twice. Each failed parse yields its own 400. Any failure from
// `create_empty_commit` is logged and reported as 500.
pub async fn create(
    req: HttpRequest,
    body: String,
) -> actix_web::Result<HttpResponse, OxenHttpError> {
    log::debug!("Got commit data: {body}");
    let data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let repository = get_repo(&data.path, namespace, repo_name)?;

    let Ok(new_commit) = serde_json::from_str::<Commit>(&body) else {
        log::error!("commits create got invalid commit data {body}");
        return Err(OxenHttpError::BadRequest("Invalid commit data".into()));
    };
    log::debug!("commits create got new commit: {new_commit:?}");

    let Ok(bn) = serde_json::from_str::<BranchName>(&body) else {
        return Err(OxenHttpError::BadRequest(
            "Must supply `branch_name` in body. Upgrade CLI to greater than v0.6.1 if failing."
                .into(),
        ));
    };

    repositories::commits::create_empty_commit(&repository, bn.branch_name, &new_commit)
        .map(|commit| {
            HttpResponse::Ok().json(CommitResponse {
                status: StatusMessage::resource_created(),
                commit,
            })
        })
        .map_err(|err| {
            log::error!("Err create_commit: {err}");
            OxenHttpError::InternalServerError
        })
}
#[utoipa::path(
post,
path = "/api/repos/{namespace}/{repo_name}/commits/upload_chunk",
description = "Upload a chunk of file data for use in large file uploads.",
tag = "Commits",
params(
("namespace" = String, Path, description = "Namespace of the repository", example = "ox"),
("repo_name" = String, Path, description = "Name of the repository", example = "ImageNet-1k"),
ChunkedDataUploadQuery
),
request_body(
content_type = "application/octet-stream",
description = "Chunk of data (binary bytes)",
content = Vec<u8>
),
responses(
(status = 200, description = "Chunk uploaded successfully", body = StatusMessage),
)
)]
pub async fn upload_chunk(
req: HttpRequest,
mut chunk: web::Payload, query: web::Query<ChunkedDataUploadQuery>, ) -> Result<HttpResponse, OxenHttpError> {
log::debug!("in upload_chunk controller");
let app_data = app_data(&req)?;
let namespace = path_param(&req, "namespace")?.to_string();
let name = path_param(&req, "repo_name")?.to_string();
let repo = get_repo(&app_data.path, namespace, name)?;
let hidden_dir = util::fs::oxen_hidden_dir(&repo.path);
let id = query.hash.clone();
let size = query.total_size;
let chunk_num = query.chunk_num;
let total_chunks = query.total_chunks;
log::debug!(
"upload_chunk got chunk {chunk_num}/{total_chunks} of upload {id} of total size {size}"
);
let tmp_dir = hidden_dir.join("tmp").join("chunked").join(id);
let chunk_file = tmp_dir.join(format!("chunk_{chunk_num:016}"));
if !tmp_dir.exists()
&& let Err(err) = util::fs::create_dir_all(&tmp_dir)
{
log::error!("upload_chunk could not complete chunk upload, mkdir failed: {err:?}");
return Ok(HttpResponse::InternalServerError().json(StatusMessage::internal_server_error()));
}
let mut bytes = web::BytesMut::new();
while let Some(item) = chunk.next().await {
bytes.extend_from_slice(&item.map_err(|_| OxenHttpError::FailedToReadRequestPayload)?);
}
log::debug!("upload_chunk writing file {chunk_file:?}");
match OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&chunk_file)
{
Ok(mut f) => {
match f.write_all(&bytes) {
Ok(_) => {
log::debug!("upload_chunk successfully wrote chunk {chunk_file:?}");
if let Err(err) = check_if_upload_complete_and_unpack(
&repo,
tmp_dir,
total_chunks,
size,
query.is_compressed,
query.filename.to_owned(),
)
.await
{
log::error!("upload_chunk unpack failed: {err:?}");
return Err(err.into());
}
Ok(HttpResponse::Ok().json(StatusMessage::resource_created()))
}
Err(err) => {
log::error!(
"upload_chunk could not complete chunk upload, file write_all failed: {err:?} -> {chunk_file:?}"
);
Ok(HttpResponse::InternalServerError()
.json(StatusMessage::internal_server_error()))
}
}
}
Err(err) => {
log::error!(
"upload_chunk could not complete chunk upload, file create failed: {err:?} -> {chunk_file:?}"
);
Ok(HttpResponse::InternalServerError().json(StatusMessage::internal_server_error()))
}
}
}
/// Assembles a chunked upload once all of its chunks are staged in `tmp_dir`.
///
/// Returns `Ok(())` without doing anything while `tmp_dir` holds fewer than
/// `total_chunks` files, or while their combined size is below `total_size`.
/// Otherwise the chunk files are processed in sorted file-name order:
/// * if `is_compressed`, they are unpacked as a tarball;
/// * otherwise they are concatenated into `filename`.
///
/// `tmp_dir` is removed afterwards.
///
/// # Errors
/// Returns `OxenError::MissingFileName` when `is_compressed` is false and no
/// `filename` was supplied. Filesystem and unpacking errors are propagated.
async fn check_if_upload_complete_and_unpack(
    repo: &LocalRepository,
    tmp_dir: PathBuf,
    total_chunks: usize,
    total_size: usize,
    is_compressed: bool,
    filename: Option<String>,
) -> Result<(), OxenError> {
    let mut files = util::fs::list_files_in_dir(&tmp_dir).await?;
    log::debug!(
        "check_if_upload_complete_and_unpack checking if complete... {} / {}",
        files.len(),
        total_chunks
    );
    if files.len() < total_chunks {
        return Ok(());
    }
    // Chunk files are expected to carry zero-padded indices (`chunk_{n:016}`), so
    // lexicographic order matches chunk order.
    files.sort();

    let mut uploaded_size: u64 = 0;
    for file in files.iter() {
        uploaded_size += util::fs::metadata(file)?.len();
    }
    log::debug!(
        "check_if_upload_complete_and_unpack checking if complete... {uploaded_size} / {total_size}"
    );
    // Compare in u64: narrowing the byte count to usize could truncate on 32-bit targets.
    let expected_size = u64::try_from(total_size).unwrap_or(u64::MAX);
    if uploaded_size < expected_size {
        return Ok(());
    }

    log::debug!(
        "check_if_upload_complete_and_unpack decompressing {} bytes to {:?}",
        total_size,
        repo.path
    );
    if is_compressed {
        unpack_compressed_data(&files, repo).await?;
    } else {
        let filename = filename.ok_or_else(|| {
            OxenError::MissingFileName(
                "check_if_upload_complete_and_unpack must supply filename if !compressed"
                    .into(),
            )
        })?;
        unpack_to_file(&files, repo, &filename)?;
    }
    log::debug!(
        "check_if_upload_complete_and_unpack unpacked {} files successfully",
        files.len()
    );
    util::fs::remove_dir_all(&tmp_dir)?;
    log::debug!("check_if_upload_complete_and_unpack removed tmp dir {tmp_dir:?}");
    Ok(())
}
/// Concatenates the chunk `files`, in the given order, into one file under the
/// repository's hidden directory.
///
/// `filename` is converted to a native path and joined onto the hidden dir. Its
/// file name is then replaced by `VERSION_FILE_NAME`, keeping the extension.
/// Parent directories are created as needed.
///
/// NOTE: `filename` is joined as-is; callers must ensure it is a safe relative path.
///
/// # Errors
/// Propagates create/open/read/write failures.
fn unpack_to_file(
    files: &[PathBuf],
    repo: &LocalRepository,
    filename: &str,
) -> Result<(), OxenError> {
    log::debug!("Got filename {filename}");
    let os_path = OsPath::from(filename).to_pathbuf();
    log::debug!("Got native filename {os_path:?}");
    let hidden_dir = util::fs::oxen_hidden_dir(&repo.path);
    let full_path = util::fs::replace_file_name_keep_extension(
        &hidden_dir.join(os_path),
        VERSION_FILE_NAME.to_owned(),
    );
    log::debug!("Unpack to {full_path:?}");
    if let Some(parent) = full_path.parent() {
        util::fs::create_dir_all(parent)?;
    }
    let mut outf = std::fs::File::create(&full_path)
        .map_err(|e| OxenError::file_create_error(&full_path, e))?;

    // Reuse one buffer for every chunk instead of allocating a fresh one per chunk.
    let mut buffer: Vec<u8> = Vec::new();
    for file in files {
        log::debug!("Reading file bytes {file:?}");
        buffer.clear();
        let mut f = std::fs::File::open(file).map_err(|e| OxenError::file_open_error(file, e))?;
        f.read_to_end(&mut buffer)
            .map_err(|e| OxenError::file_read_error(file, e))?;
        log::debug!("Read {} file bytes from file {:?}", buffer.len(), file);
        outf.write_all(&buffer)?;
    }
    // Logged once, after every chunk has been written.
    log::debug!("Unpack successful! {full_path:?}");
    Ok(())
}
/// Reads every chunk in `files` (in the given order) into one in-memory buffer,
/// then passes it to `unpack_entry_tarball_async` for extraction into `repo`.
///
/// # Errors
/// Propagates open/read failures for any chunk and any unpacking error.
async fn unpack_compressed_data(
    files: &[PathBuf],
    repo: &LocalRepository,
) -> Result<(), OxenError> {
    let mut combined: Vec<u8> = Vec::new();
    for file in files {
        log::debug!("Reading file bytes {file:?}");
        std::fs::File::open(file)
            .map_err(|e| OxenError::file_open_error(file, e))?
            .read_to_end(&mut combined)
            .map_err(|e| OxenError::file_read_error(file, e))?;
    }
    unpack_entry_tarball_async(repo, combined).await?;
    Ok(())
}
#[utoipa::path(
    post,
    path = "/api/repos/{namespace}/{repo_name}/commits/upload",
    tag = "Commits",
    description = "Upload the commits database tarball to the server during push.",
    params(
        ("namespace" = String, Path, description = "Namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "Name of the repository", example = "ImageNet-1k"),
    ),
    request_body(
        content_type = "application/octet-stream",
        description = "Compressed commit database (tar.gz)",
        content = Vec<u8>
    ),
    responses(
        (status = 200, description = "Commits DB uploaded successfully", body = StatusMessage),
    )
)]
// Buffers the whole request body in memory and hands it to
// `unpack_entry_tarball_async` for extraction into the repository.
pub async fn upload(
    req: HttpRequest,
    mut body: web::Payload,
) -> Result<HttpResponse, OxenHttpError> {
    log::debug!("in regular upload controller");
    let data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let name = path_param(&req, "repo_name")?.to_string();
    let repo = get_repo(&data.path, &namespace, &name)?;

    let mut bytes: Vec<u8> = Vec::new();
    while let Some(item) = body.next().await {
        let item = item.map_err(|_| OxenHttpError::FailedToReadRequestPayload)?;
        bytes.extend_from_slice(&item);
    }

    let total_size = u64::try_from(bytes.len()).unwrap_or(u64::MAX);
    log::debug!(
        "Got compressed data for repo {}/{} -> {}",
        namespace,
        name,
        ByteSize::b(total_size)
    );
    log::debug!(
        "Decompressing {} bytes to repo at {}",
        bytes.len(),
        repo.path.display()
    );

    unpack_entry_tarball_async(&repo, bytes).await?;
    Ok(HttpResponse::Ok().json(StatusMessage::resource_created()))
}
#[utoipa::path(
post,
path = "/api/repos/{namespace}/{repo_name}/commits/{commit_id}/complete",
description = "Notify the server that the commit has finished uploading.",
tag = "Commits",
params(
("namespace" = String, Path, description = "Namespace of the repository", example = "ox"),
("repo_name" = String, Path, description = "Name of the repository", example = "ImageNet-1k"),
("commit_id" = String, Path, description = "ID of the commit to complete", example = "84c76a5b2e9a2637f9091991475c404d"),
),
responses(
(status = 200, description = "Commit push completed successfully", body = CommitResponse),
(status = 404, description = "Repository or commit not found"),
)
)]
pub async fn complete(req: HttpRequest) -> Result<HttpResponse, Error> {
let app_data = req.app_data::<OxenAppData>().unwrap();
let namespace: &str = path_param(&req, "namespace").unwrap();
let repo_name: &str = path_param(&req, "repo_name").unwrap();
let commit_id: &str = path_param(&req, "commit_id").unwrap();
match repositories::get_by_namespace_and_name(&app_data.path, namespace, repo_name) {
Ok(Some(repo)) => {
match repositories::commits::get_by_id(&repo, commit_id) {
Ok(Some(commit)) => {
let response = CommitResponse {
status: StatusMessage::resource_created(),
commit: commit.clone(),
};
Ok(HttpResponse::Ok().json(response))
}
Ok(None) => {
log::error!("Could not find commit [{commit_id}]");
Ok(HttpResponse::NotFound().json(StatusMessage::resource_not_found()))
}
Err(err) => {
log::error!("Error finding commit [{commit_id}]: {err}");
Ok(HttpResponse::InternalServerError()
.json(StatusMessage::internal_server_error()))
}
}
}
Ok(None) => {
log::debug!("404 could not get repo {repo_name}",);
Ok(HttpResponse::NotFound().json(StatusMessage::resource_not_found()))
}
Err(repo_err) => {
log::error!("Err get_by_name: {repo_err}");
Ok(HttpResponse::InternalServerError().json(StatusMessage::internal_server_error()))
}
}
}
// Handler for `POST .../commits/{commit_id}/upload_tree`.
//
// Buffers the entire request body in memory and unpacks it, as a gzip'd tar
// archive, into `<oxen hidden dir>/tmp` of the target repository. It then
// replies with the server's HEAD commit, which is read before the upload is
// unpacked. The `commit_id` path segment is only used for logging here.
#[utoipa::path(
    post,
    path = "/api/repos/{namespace}/{repo_name}/commits/{commit_id}/upload_tree",
    tag = "Commits",
    description = "Upload a commit's merkle tree data as a compressed tarball.",
    params(
        ("namespace" = String, Path, description = "Namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "Name of the repository", example = "ImageNet-1k"),
        ("commit_id" = String, Path, description = "Client head commit ID", example = "84c76a5b2e9a2637f9091991475c404d"),
    ),
    request_body(
        content_type = "application/octet-stream",
        description = "Compressed tree data (tar.gz)",
        content = Vec<u8>
    ),
    responses(
        (status = 200, description = "Tree uploaded successfully", body = CommitResponse),
    )
)]
pub async fn upload_tree(
    req: HttpRequest,
    mut body: web::Payload,
) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let client_commit_id = path_param(&req, "commit_id")?.to_string();
    let repo = get_repo(&app_data.path, namespace, repo_name)?;

    // HEAD is captured up front; it is what gets reported back to the client.
    let head = repositories::commits::head_commit(&repo)?;
    let staging_dir = util::fs::oxen_hidden_dir(&repo.path).join("tmp");

    // Accumulate every chunk of the streamed payload.
    let mut payload = web::BytesMut::new();
    while let Some(chunk) = body.next().await {
        let chunk = chunk.map_err(|_| OxenHttpError::FailedToReadRequestPayload)?;
        payload.extend_from_slice(&chunk);
    }

    let payload_size = u64::try_from(payload.len()).unwrap_or(u64::MAX);
    log::debug!(
        "Got compressed data for tree {} -> {}",
        client_commit_id,
        ByteSize::b(payload_size)
    );
    log::debug!("Decompressing {} bytes to {:?}", payload.len(), staging_dir);
    unpack_tree_tarball(&staging_dir, &payload).await?;

    let response = CommitResponse {
        status: StatusMessage::resource_found(),
        commit: head,
    };
    Ok(HttpResponse::Ok().json(response))
}
// Handler for `GET .../commits/root`.
//
// Resolves the repository from the path and returns whatever
// `root_commit_maybe` yields for it (which may be `None` for an empty repo),
// wrapped in a `RootCommitResponse` with status `resource_found`.
#[utoipa::path(
    get,
    path = "/api/repos/{namespace}/{repo_name}/commits/root",
    tag = "Commits",
    description = "Get the root (initial) commit of the repository, or None if empty.",
    params(
        ("namespace" = String, Path, description = "Namespace of the repository", example = "ox"),
        ("repo_name" = String, Path, description = "Name of the repository", example = "ImageNet-1k"),
    ),
    responses(
        (status = 200, description = "Root commit found (None if empty repository)", body = RootCommitResponse),
    )
)]
pub async fn root_commit(req: HttpRequest) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    // Arguments evaluate left to right, so `namespace` is validated before `repo_name`.
    let repo = get_repo(
        &app_data.path,
        path_param(&req, "namespace")?.to_string(),
        path_param(&req, "repo_name")?.to_string(),
    )?;

    let payload = RootCommitResponse {
        status: StatusMessage::resource_found(),
        commit: repositories::commits::root_commit_maybe(&repo)?,
    };
    Ok(HttpResponse::Ok().json(payload))
}
async fn unpack_tree_tarball(tmp_dir: &Path, data: &[u8]) -> Result<(), OxenError> {
let reader = Cursor::new(data);
let buf_reader = BufReader::new(reader);
let decoder = GzipDecoder::new(buf_reader);
let mut archive = Archive::new(decoder);
let mut entries = match archive.entries() {
Ok(entries) => entries,
Err(e) => {
log::error!("Could not unpack tree database from archive...");
log::error!("Err: {e:?}");
return Err(OxenError::basic_str("Failed to get archive entries"));
}
};
while let Some(entry) = entries.next().await {
if let Ok(mut file) = entry {
let path = file.path().unwrap();
log::debug!("unpack_tree_tarball path {path:?}");
let stripped_path = if path.starts_with(HISTORY_DIR) {
match path.strip_prefix(HISTORY_DIR) {
Ok(stripped) => stripped,
Err(err) => {
log::error!("Could not strip prefix from path {err:?}");
return Err(OxenError::basic_str("Failed to strip path prefix"));
}
}
} else {
&path
};
let mut new_path = PathBuf::from(tmp_dir);
new_path.push(stripped_path);
if let Some(parent) = new_path.parent() {
util::fs::create_dir_all(parent).expect("Could not create parent dir");
}
log::debug!("unpack_tree_tarball new_path {path:?}");
file.unpack(&new_path).await.unwrap();
} else {
log::error!("Could not unpack file in archive...");
}
}
Ok(())
}
/// Stream-unpack a gzip-compressed tar archive of entry data into `repo`.
///
/// Members whose path starts with `versions` and whose text contains `files` are
/// streamed into the repository's version store. They are keyed by the hash that
/// [`extract_hash_from_path`] recovers from the member path.
///
/// Every other member must be a regular file or directory. It is written beneath
/// the directory returned by `util::fs::oxen_hidden_dir(&repo.path)`, using only
/// its ordinary path segments; a `..` segment aborts the whole unpack.
///
/// # Errors
/// Propagates archive/IO errors, version-store errors, unsupported entry types,
/// path-traversal attempts, and unpack failures.
async fn unpack_entry_tarball_async(
    repo: &LocalRepository,
    compressed_data: Vec<u8>,
) -> Result<(), OxenError> {
    let hidden_dir = util::fs::oxen_hidden_dir(&repo.path);
    let version_store = repo.version_store()?;
    let mut archive = Archive::new(GzipDecoder::new(BufReader::new(Cursor::new(
        compressed_data,
    ))));
    let mut entries = archive.entries()?;

    while let Some(next) = entries.next().await {
        let mut member = next?;
        let member_path = member
            .path()
            .map_err(|e| OxenError::basic_str(format!("Invalid path in archive: {e}")))?;

        let is_version_file = member_path.starts_with("versions")
            && member_path.to_string_lossy().contains("files");
        if is_version_file {
            // Version payloads bypass the filesystem and go straight to the store.
            let hash = extract_hash_from_path(&member_path)?;
            let size = member.header().size()?;
            version_store
                .store_version_from_reader(&hash, Box::new(member), size)
                .await?;
            continue;
        }

        let kind = member.header().entry_type();
        if !(kind.is_file() || kind.is_dir()) {
            return Err(OxenError::basic_str(format!(
                "Unsupported archive entry type for {}: only regular files and \
                 directories are allowed",
                member_path.display()
            )));
        }

        // Keep only normal segments; any `..` aborts the fold with an error.
        let destination = member_path.components().try_fold(
            hidden_dir.clone(),
            |mut acc, component| match component {
                std::path::Component::Normal(part) => {
                    acc.push(part);
                    Ok(acc)
                }
                std::path::Component::ParentDir => Err(OxenError::basic_str(format!(
                    "Path traversal detected in archive entry: {}",
                    member_path.display()
                ))),
                _ => Ok(acc),
            },
        )?;

        // Nothing to write when the member resolves to the hidden dir itself.
        if destination == hidden_dir {
            continue;
        }
        if let Some(parent) = destination.parent() {
            util::fs::create_dir_all(parent)?;
        }
        member
            .unpack(&destination)
            .await
            .map_err(|e| OxenError::basic_str(format!("Failed to unpack file: {e}")))?;
    }

    log::debug!("Done decompressing with async streaming.");
    Ok(())
}
/// Recover a content hash from a version-store path taken from an uploaded archive.
///
/// The path must look like `versions/files/<top>/<rest>/<anything>`: at least five
/// components, where `<top>` is exactly two bytes long. The hash is `<top>`
/// concatenated with `<rest>`.
///
/// `<top>` and `<rest>` must be ordinary path segments. A `..` is two bytes long and
/// would otherwise pass the length check, letting a crafted archive produce a
/// "hash" such as `....` that walks out of the version store when turned back
/// into a path.
///
/// # Errors
/// Returns an error if `path` does not match that layout.
fn extract_hash_from_path(path: &Path) -> Result<String, OxenError> {
    let mut components = path.components();
    let layout = (
        components.next(),
        components.next(),
        components.next(),
        components.next(),
        components.next(),
    );
    if let (
        Some(std::path::Component::Normal(root)),
        Some(std::path::Component::Normal(kind)),
        Some(std::path::Component::Normal(top_dir)),
        Some(std::path::Component::Normal(sub_dir)),
        Some(_),
    ) = layout
    {
        if root == "versions" && kind == "files" && top_dir.len() == 2 && !sub_dir.is_empty() {
            return Ok(format!(
                "{}{}",
                top_dir.to_string_lossy(),
                sub_dir.to_string_lossy()
            ));
        }
    }
    Err(OxenError::basic_str(format!(
        "Could not get hash for file: {path:?}"
    )))
}