use crate::errors::OxenHttpError;
use crate::helpers::get_repo;
use crate::params::{PageNumQuery, app_data, parse_resource, path_param};
use liboxen::constants::AVG_CHUNK_SIZE;
use liboxen::error::OxenError;
use liboxen::util::fs::replace_file_name_keep_extension;
use liboxen::util::paginate;
use liboxen::view::StatusMessage;
use liboxen::view::entries::{PaginatedMetadataEntries, PaginatedMetadataEntriesResponse};
use liboxen::{constants, current_function, repositories};
use actix_web::{HttpRequest, HttpResponse, web};
use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use futures_util::stream::StreamExt as _;
use serde::Deserialize;
use std::io::prelude::*;
use std::path::{Component, Path};
/// Query-string parameters accepted by [`download_chunk`] to select which
/// part of a version file to return.
#[derive(Deserialize, Debug)]
pub struct ChunkQuery {
    /// Offset at which the chunk starts (presumably in bytes — TODO confirm
    /// against `get_version_chunk`); `download_chunk` treats `None` as `0`.
    pub chunk_start: Option<u64>,
    /// Requested chunk length; `download_chunk` treats `None` as
    /// `AVG_CHUNK_SIZE`.
    pub chunk_size: Option<u64>,
}
pub async fn download_data_from_version_paths(
req: HttpRequest,
mut body: web::Payload,
) -> actix_web::Result<HttpResponse, OxenHttpError> {
let app_data = app_data(&req)?;
let namespace = path_param(&req, "namespace")?.to_string();
let repo_name = path_param(&req, "repo_name")?.to_string();
let repo = get_repo(&app_data.path, namespace, &repo_name)?;
let mut bytes = web::BytesMut::new();
while let Some(item) = body.next().await {
bytes.extend_from_slice(&item.map_err(|_| OxenHttpError::FailedToReadRequestPayload)?);
}
log::debug!(
"{} got repo [{}] and content_ids size {}",
current_function!(),
repo_name,
bytes.len()
);
let mut gz = GzDecoder::new(&bytes[..]);
let mut line_delimited_files = String::new();
gz.read_to_string(&mut line_delimited_files)?;
let content_files: Vec<&str> = line_delimited_files.split('\n').collect();
let enc = GzEncoder::new(Vec::new(), Compression::default());
let mut tar = tar::Builder::new(enc);
log::debug!("Got {} content ids", content_files.len());
for content_file in content_files.iter() {
if content_file.is_empty() {
continue;
}
let mut path_to_read = repo.path.join(content_file);
path_to_read = replace_file_name_keep_extension(
&path_to_read,
constants::VERSION_FILE_NAME.to_string(),
);
if path_to_read.exists() {
tar.append_path_with_name(path_to_read, content_file)?;
} else {
log::error!("Could not find content: {content_file:?} -> {path_to_read:?}");
return Err(OxenError::path_does_not_exist(path_to_read).into());
}
}
tar.finish()?;
let buffer: Vec<u8> = tar.into_inner()?.finish()?;
Ok(HttpResponse::Ok().body(buffer))
}
/// Returns one chunk of the version file backing the requested resource.
///
/// The resource (path and revision) is parsed from the request path. The
/// file's node at that commit is looked up, and the chunk is read from the
/// repository's version store by the node's hash.
///
/// Query parameters (see [`ChunkQuery`]):
/// - `chunk_start`: start offset, defaulting to `0`.
/// - `chunk_size`: requested length, defaulting to `AVG_CHUNK_SIZE`.
///
/// # Errors
///
/// `OxenHttpError::NotFound` when the resource has no resolved commit, or when
/// no file exists at the path in that commit. Other errors propagate from
/// repository lookup, resource parsing and the version store.
#[tracing::instrument(skip_all)]
pub async fn download_chunk(
    req: HttpRequest,
    query: web::Query<ChunkQuery>,
) -> actix_web::Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let repo = get_repo(&app_data.path, namespace, &repo_name)?;
    let resource = parse_resource(&req, &repo)?;
    // Borrow the commit rather than cloning the entire parsed resource.
    let commit = resource.commit.as_ref().ok_or(OxenHttpError::NotFound)?;
    log::debug!(
        "{} resource {}/{}",
        current_function!(),
        repo_name,
        resource
    );

    let version_store = repo.version_store()?;
    let chunk_start: u64 = query.chunk_start.unwrap_or(0);
    // NOTE(review): `chunk_size` is client-controlled and not capped here —
    // confirm `get_version_chunk` bounds the read/allocation.
    let chunk_size: u64 = query.chunk_size.unwrap_or(AVG_CHUNK_SIZE);

    let file_node = repositories::entries::get_file(&repo, commit, &resource.path)?
        .ok_or(OxenHttpError::NotFound)?;
    let chunk = version_store
        .get_version_chunk(&file_node.hash().to_string(), chunk_start, chunk_size)
        .await?;
    Ok(HttpResponse::Ok().body(chunk))
}
/// Lists a page of the tabular files present at a given revision.
///
/// The revision comes from the `commit_or_branch` path parameter. Paging uses
/// the `page` / `page_size` query parameters, falling back to
/// `constants::DEFAULT_PAGE_NUM` and `constants::DEFAULT_PAGE_SIZE`.
///
/// # Errors
///
/// `OxenError::RevisionNotFound` (converted to `OxenHttpError`) when the
/// revision does not resolve. Other errors propagate from repository lookup
/// and the tabular-file listing.
pub async fn list_tabular(
    req: HttpRequest,
    query: web::Query<PageNumQuery>,
) -> Result<HttpResponse, OxenHttpError> {
    let state = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let revision = path_param(&req, "commit_or_branch")?.to_string();
    let repo = get_repo(&state.path, namespace, repo_name)?;

    let Some(commit) = repositories::revisions::get(&repo, &revision)? else {
        return Err(OxenError::RevisionNotFound(revision.into()).into());
    };

    let (page, page_size) = (
        query.page.unwrap_or(constants::DEFAULT_PAGE_NUM),
        query.page_size.unwrap_or(constants::DEFAULT_PAGE_SIZE),
    );
    log::debug!("{} page {} page_size {}", current_function!(), page, page_size);

    let entries = repositories::entries::list_tabular_files_in_repo(&repo, &commit)?;
    log::debug!("list_tabular entries: {entries:?}");

    let (paginated_entries, pagination) = paginate(entries, page, page_size);
    let body = PaginatedMetadataEntriesResponse {
        status: StatusMessage::resource_found(),
        entries: PaginatedMetadataEntries {
            entries: paginated_entries,
            pagination,
        },
    };
    Ok(HttpResponse::Ok().json(body))
}