oxen-server 0.50.7

Oxen is a fast data version control tool for unstructured data, written in Rust, that helps version large machine learning datasets.
use crate::errors::OxenHttpError;
use crate::helpers::get_repo;
use crate::params::{PageNumQuery, app_data, parse_resource, path_param};

use liboxen::constants::stream_segment_size;
use liboxen::error::OxenError;
use liboxen::util::fs::replace_file_name_keep_extension;
use liboxen::util::paginate;
use liboxen::view::StatusMessage;
use liboxen::view::entries::{PaginatedMetadataEntries, PaginatedMetadataEntriesResponse};
use liboxen::{constants, current_function, repositories};

use actix_web::{HttpRequest, HttpResponse, web};
use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use futures_util::stream::StreamExt as _;
use serde::Deserialize;

use std::io::prelude::*;

/// Query-string parameters accepted by the `download_chunk` handler.
///
/// Both fields are optional; the handler substitutes its own defaults when a
/// parameter is absent from the request.
#[derive(Debug, Deserialize)]
pub struct ChunkQuery {
    /// Offset at which the requested chunk starts (`download_chunk` uses `0` when absent).
    pub chunk_start: Option<u64>,
    /// Requested chunk length (`download_chunk` uses `stream_segment_size()` when absent).
    pub chunk_size: Option<u64>,
}

// Deprecated. Only kept to support older clients before v0.37.2
pub async fn download_data_from_version_paths(
    req: HttpRequest,
    mut body: web::Payload,
) -> actix_web::Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let repo = get_repo(app_data, namespace, &repo_name)?;

    let mut bytes = web::BytesMut::new();
    while let Some(item) = body.next().await {
        bytes.extend_from_slice(&item.map_err(|_| OxenHttpError::FailedToReadRequestPayload)?);
    }
    log::debug!(
        "{} got repo [{}] and content_ids size {}",
        current_function!(),
        repo_name,
        bytes.len()
    );

    let mut gz = GzDecoder::new(&bytes[..]);
    let mut line_delimited_files = String::new();
    gz.read_to_string(&mut line_delimited_files)?;

    let content_files: Vec<&str> = line_delimited_files.split('\n').collect();

    let enc = GzEncoder::new(Vec::new(), Compression::default());
    let mut tar = tar::Builder::new(enc);

    log::debug!("Got {} content ids", content_files.len());
    for content_file in content_files.iter() {
        if content_file.is_empty() {
            // last line might be empty on split \n
            continue;
        }

        // log::debug!("download_data_from_version_paths pulling {}", content_file);

        // We read from version file as determined by the latest logic (data.extension)
        // but still want to write the tar archive with the original filename so that it
        // unpacks to the location old clients expect.

        // This is annoying but the older client passes in the full path to the version file with the extension
        // ie .oxen/versions/files/71/7783cda74ceeced8d45fae3155382c/data.jpg
        // but the new client passes in the path without the extension
        // ie .oxen/versions/files/71/7783cda74ceeced8d45fae3155382c/data
        // So we need to support both formats.

        // In an ideal world we would just pass in the hash and not the full path to save on bandwidth as well
        let mut path_to_read = repo.path.join(content_file);
        path_to_read = replace_file_name_keep_extension(
            &path_to_read,
            constants::VERSION_FILE_NAME.to_string(),
        );

        if path_to_read.exists() {
            tar.append_path_with_name(path_to_read, content_file)?;
        } else {
            log::error!("Could not find content: {content_file:?} -> {path_to_read:?}");
            return Err(OxenError::path_does_not_exist(path_to_read).into());
        }
    }

    tar.finish()?;
    let buffer: Vec<u8> = tar.into_inner()?.finish()?;
    Ok(HttpResponse::Ok().body(buffer))
}

/// Download a chunk of a larger file
#[tracing::instrument(skip_all)]
pub async fn download_chunk(
    req: HttpRequest,
    query: web::Query<ChunkQuery>,
) -> actix_web::Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let repo = get_repo(app_data, namespace, &repo_name)?;
    let resource = parse_resource(&req, &repo)?;
    let commit = resource.clone().commit.ok_or(OxenHttpError::NotFound)?;
    let path = resource.path.clone();

    log::debug!(
        "{} resource {}/{}",
        current_function!(),
        repo_name,
        resource
    );

    let version_store = repo.version_store();
    let chunk_start: u64 = query.chunk_start.unwrap_or(0);
    let chunk_size: u64 = query.chunk_size.unwrap_or_else(stream_segment_size);

    let file_node = match repositories::entries::get_file(&repo, &commit, &path)? {
        Some(node) => node,
        None => {
            return Err(OxenHttpError::NotFound);
        }
    };

    let chunk = version_store
        .get_version_chunk(&file_node.hash().to_string(), chunk_start, chunk_size)
        .await?;

    Ok(HttpResponse::Ok().body(chunk))
}

/// Lists the tabular files present at a revision, one page at a time.
///
/// Resolves the `commit_or_branch` path parameter to a commit. It collects that
/// commit's tabular entries via `repositories::entries::list_tabular_files_in_repo`
/// and returns the page selected by the `page` / `page_size` query parameters.
/// These default to `constants::DEFAULT_PAGE_NUM` / `constants::DEFAULT_PAGE_SIZE`.
///
/// Fails with `OxenError::RevisionNotFound` (converted to `OxenHttpError`) when the
/// revision cannot be resolved.
pub async fn list_tabular(
    req: HttpRequest,
    query: web::Query<PageNumQuery>,
) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let revision = path_param(&req, "commit_or_branch")?.to_string();
    let repo = get_repo(app_data, namespace, repo_name)?;

    let Some(commit) = repositories::revisions::get(&repo, &revision)? else {
        return Err(OxenError::RevisionNotFound(revision.into()).into());
    };

    let (page, page_size) = (
        query.page.unwrap_or(constants::DEFAULT_PAGE_NUM),
        query.page_size.unwrap_or(constants::DEFAULT_PAGE_SIZE),
    );

    log::debug!(
        "{} page {} page_size {}",
        current_function!(),
        page,
        page_size
    );

    let entries = repositories::entries::list_tabular_files_in_repo(&repo, &commit)?;
    log::debug!("list_tabular entries: {entries:?}");
    let (paginated_entries, pagination) = paginate(entries, page, page_size);

    let response_body = PaginatedMetadataEntriesResponse {
        status: StatusMessage::resource_found(),
        entries: PaginatedMetadataEntries {
            entries: paginated_entries,
            pagination,
        },
    };
    Ok(HttpResponse::Ok().json(response_body))
}