oxen-server 0.49.1

Oxen is a fast, unstructured data version control system, written in Rust, to help version large machine learning datasets.
use std::path::PathBuf;

use crate::errors::OxenHttpError;
use crate::helpers::get_repo;
use crate::params::{app_data, path_param};

use actix_web::{HttpRequest, HttpResponse, web};
use futures_util::stream::StreamExt as _;
use liboxen::constants::AVG_CHUNK_SIZE;
use liboxen::core;
use liboxen::repositories;
use liboxen::view::StatusMessage;
use liboxen::view::versions::CompleteVersionUploadRequest;
use serde::Deserialize;

/// Query-string parameters identifying a chunk of a version file.
///
/// Both fields are optional; each handler decides what default to apply when one is absent.
#[derive(Debug, Deserialize)]
pub struct ChunkQuery {
    /// Offset of the chunk within the version file; handlers default it to 0.
    pub offset: Option<u64>,
    /// Requested chunk length; `download` defaults it to `AVG_CHUNK_SIZE`, `upload` ignores it.
    pub size: Option<u64>,
}

/// Receives one chunk of a version file and hands it to the repository's version store.
///
/// Path params: `namespace`, `repo_name`, `version_id`.
/// Query: `offset` (defaults to 0). `size` is accepted by [`ChunkQuery`] but not read here.
///
/// Responds `200 OK` with `StatusMessage::resource_created()` once the chunk is stored.
///
/// # Errors
/// Returns `OxenHttpError::BadRequest` if reading the request payload fails, and propagates
/// errors from app-data/path-param lookup, repo lookup and the version store.
pub async fn upload(
    req: HttpRequest,
    query: web::Query<ChunkQuery>,
    mut body: web::Payload,
) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let version_id = path_param(&req, "version_id")?.to_string();

    let offset = query.offset.unwrap_or(0);

    let repo = get_repo(&app_data.path, namespace, repo_name)?;

    log::debug!(
        "/upload version {} chunk offset {} to repo: {:?}",
        version_id,
        offset,
        repo.path
    );

    let version_store = repo.version_store()?;

    let chunk = read_chunk_payload(&mut body).await?;

    version_store
        .store_version_chunk(&version_id, offset, chunk)
        .await?;

    Ok(HttpResponse::Ok().json(StatusMessage::resource_created()))
}

/// Buffers the entire request payload into memory and returns it as immutable bytes.
///
/// NOTE(review): no size limit is enforced here. Chunks are expected to be roughly
/// `AVG_CHUNK_SIZE`, but nothing below stops a client from sending more. Consider bounding
/// this if the endpoint is reachable by untrusted callers.
///
/// # Errors
/// Any payload stream error is mapped to `OxenHttpError::BadRequest`.
async fn read_chunk_payload(body: &mut web::Payload) -> Result<web::Bytes, OxenHttpError> {
    let mut chunk = web::BytesMut::new();
    while let Some(part) = body.next().await {
        let part = part.map_err(|e| OxenHttpError::BadRequest(e.to_string().into()))?;
        chunk.extend_from_slice(&part);
    }
    Ok(chunk.freeze())
}

pub async fn complete(req: HttpRequest, body: String) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let version_id = path_param(&req, "version_id")?.to_string();
    let repo = get_repo(&app_data.path, namespace, repo_name)?;

    log::debug!("/complete version chunk upload to repo: {:?}", repo.path);

    // Try to deserialize the body
    let request: Result<CompleteVersionUploadRequest, serde_json::Error> =
        serde_json::from_str(&body);
    if let Ok(request) = request {
        // There should only be a single file in the request
        if request.files.len() != 1 {
            return Ok(HttpResponse::BadRequest().json(StatusMessage::error(
                "Expected a single file in the request",
            )));
        }

        let file = &request.files[0];
        // Support both new clients (num_chunks) and old clients (upload_results)
        let num_chunks = file
            .num_chunks
            .or_else(|| file.upload_results.as_ref().map(|r| r.len()))
            .ok_or_else(|| {
                OxenHttpError::BadRequest(
                    "Missing both num_chunks and upload_results in request".into(),
                )
            })?;
        log::debug!("Client uploaded {num_chunks} chunks");
        let version_store = repo.version_store()?;

        let chunks = version_store.list_version_chunks(&version_id).await?;
        log::debug!("Found {} chunks on server", chunks.len());

        if chunks.len() != num_chunks {
            return Ok(
                HttpResponse::BadRequest().json(StatusMessage::error(format!(
                    "Number of chunks does not match expected number of chunks: {} != {}",
                    chunks.len(),
                    num_chunks
                ))),
            );
        }

        // Combine all the chunks for a version file into a single file
        version_store.combine_version_chunks(&version_id).await?;

        // If the workspace id is provided, stage the file
        if let Some(workspace_id) = request.workspace_id {
            let Some(workspace) = repositories::workspaces::get(&repo, &workspace_id)? else {
                return Ok(HttpResponse::NotFound().json(StatusMessage::error(format!(
                    "Workspace not found: {workspace_id}"
                ))));
            };
            let version_path = version_store.get_version_path(&version_id).await?;
            let dst_path = if let Some(dst_dir) = &file.dst_dir {
                dst_dir.join(file.file_name.clone())
            } else {
                PathBuf::from(file.file_name.clone())
            };

            core::v_latest::workspaces::files::add_version_file(
                &workspace,
                &*version_path,
                &dst_path,
                &version_id,
                request.update_timestamp,
            )
            .await?;
        }

        return Ok(HttpResponse::Ok().json(StatusMessage::resource_found()));
    }

    Ok(HttpResponse::BadRequest().json(StatusMessage::error("Invalid request body")))
}

// TODO: Add content-type and oxen-revision-id in the response header
// Currently, this endpoint is not used anywhere.
pub async fn download(
    req: HttpRequest,
    query: web::Query<ChunkQuery>,
) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;
    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let version_id = path_param(&req, "version_id")?.to_string();
    let repo = get_repo(&app_data.path, namespace, repo_name)?;

    let offset = query.offset.unwrap_or(0);
    let size = query.size.unwrap_or(AVG_CHUNK_SIZE);

    log::debug!(
        "download_chunk for repo: {:?}, file_hash: {}, offset: {}, size: {}",
        repo.path,
        version_id,
        offset,
        size
    );

    let version_store = repo.version_store()?;

    let chunk_data = version_store
        .get_version_chunk(&version_id, offset, size)
        .await?;
    Ok(HttpResponse::Ok()
        .insert_header((
            actix_web::http::header::CONTENT_LENGTH,
            chunk_data.len().to_string(),
        ))
        .body(chunk_data))
}

/// Currently a no-op: ignores both the request and its body.
///
/// Always responds `200 OK` with `StatusMessage::resource_found()`.
pub async fn create(_req: HttpRequest, _body: String) -> Result<HttpResponse, OxenHttpError> {
    let status = StatusMessage::resource_found();
    Ok(HttpResponse::Ok().json(status))
}