oxen-server 0.48.1

Oxen is a fast, unstructured data version control, to help version large machine learning datasets written in Rust.
use std::path::PathBuf;

use crate::errors::OxenHttpError;
use crate::helpers::get_repo;
use crate::params::{app_data, path_param};

use actix_web::{HttpRequest, HttpResponse};
use liboxen::error::StringError;
use liboxen::model::Schema;
use liboxen::model::data_frame::DataFrameSchemaSize;
use liboxen::opts::DFOpts;
use liboxen::repositories;
use liboxen::view::data_frames::columns::{
    ColumnToDelete, ColumnToRestore, ColumnToUpdate, NewColumn,
};
use liboxen::view::json_data_frame_view::JsonDataFrameColumnResponse;
use liboxen::view::{
    JsonDataFrameView, JsonDataFrameViews, StatusMessage, StatusMessageDescription,
};
use serde_json::{Value, json};

pub async fn create(req: HttpRequest, body: String) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;

    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let repo = get_repo(&app_data.path, namespace.clone(), repo_name.clone())?;
    let file_path = PathBuf::from(path_param(&req, "path")?);

    let mut body_json: Value = serde_json::from_str(&body).map_err(|_err| {
        OxenHttpError::BadRequest("Failed to parse NewColumn from request body".into())
    })?;

    if let Some(obj) = body_json.as_object_mut() {
        if obj.contains_key("dtype") {
            let dtype_value = obj.remove("dtype").unwrap(); // Safe to unwrap because we just checked it exists
            obj.insert("data_type".to_string(), dtype_value);
        }
    } else {
        return Err(OxenHttpError::BadRequest(
            "Request body is not a valid JSON object".into(),
        ));
    }

    let new_column: NewColumn = serde_json::from_value(body_json)?;

    log::info!(
        "create column {namespace}/{repo_name} for file {file_path:?} on in workspace id {workspace_id}"
    );
    log::debug!("create column with data {new_column:?}");

    // Get the workspace
    let Some(workspace) = repositories::workspaces::get(&repo, &workspace_id)? else {
        return Ok(HttpResponse::NotFound()
            .json(StatusMessageDescription::workspace_not_found(workspace_id)));
    };

    // Make sure the data frame is indexed
    let is_editable = repositories::workspaces::data_frames::is_indexed(&workspace, &file_path)?;

    if !is_editable {
        return Err(OxenHttpError::DatasetNotIndexed(file_path.into()));
    }

    let column_df = repositories::workspaces::data_frames::columns::add(
        &repo,
        &workspace,
        &file_path,
        &new_column,
    )?;

    let opts = DFOpts::empty();
    let column_schema = Schema::from_polars(column_df.schema());
    let column_df_source = DataFrameSchemaSize::from_df(&column_df, &column_schema);
    let column_df_view = JsonDataFrameView::from_df_opts(column_df, column_schema, &opts).await;
    let diff =
        repositories::workspaces::data_frames::columns::get_column_diff(&workspace, &file_path)?;

    let mut df_views = JsonDataFrameViews {
        source: column_df_source,
        view: column_df_view,
    };

    repositories::workspaces::data_frames::columns::decorate_fields_with_column_diffs(
        &workspace,
        &file_path,
        &mut df_views,
    )?;

    let response = JsonDataFrameColumnResponse {
        data_frame: df_views,
        diff: Some(diff),
        commit: None,
        derived_resource: None,
        status: StatusMessage::resource_found(),
        resource: None,
    };

    Ok(HttpResponse::Ok().json(response))
}

pub async fn delete(req: HttpRequest) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;

    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let repo = get_repo(&app_data.path, namespace.clone(), repo_name.clone())?;
    let file_path = PathBuf::from(path_param(&req, "path")?);
    let column_name = path_param(&req, "column_name")
        .map_err(|_| OxenHttpError::BadRequest("Column name missing in path parameters".into()))?;

    let column_to_delete = ColumnToDelete {
        name: column_name.to_string(),
    };

    log::info!(
        "Delete column {namespace}/{repo_name} for file {file_path:?} on in workspace id {workspace_id}"
    );
    log::debug!("create column with data {column_to_delete:?}");

    // Get the workspace
    let Some(workspace) = repositories::workspaces::get(&repo, &workspace_id)? else {
        return Ok(HttpResponse::NotFound()
            .json(StatusMessageDescription::workspace_not_found(workspace_id)));
    };

    // Make sure the data frame is indexed
    let is_editable = repositories::workspaces::data_frames::is_indexed(&workspace, &file_path)?;

    if !is_editable {
        return Err(OxenHttpError::DatasetNotIndexed(file_path.into()));
    }

    let column_df = match repositories::workspaces::data_frames::columns::delete(
        &repo,
        &workspace,
        &file_path,
        &column_to_delete,
    ) {
        Ok(df) => df,
        Err(e) => {
            log::error!("Error deleting column: {e:?}");
            return Err(OxenHttpError::BasicError(StringError::from(e.to_string())));
        }
    };

    let opts = DFOpts::empty();
    let column_schema = Schema::from_polars(column_df.schema());
    let column_df_source = DataFrameSchemaSize::from_df(&column_df, &column_schema);
    let column_df_view = JsonDataFrameView::from_df_opts(column_df, column_schema, &opts).await;

    let mut df_views = JsonDataFrameViews {
        source: column_df_source,
        view: column_df_view,
    };

    let diff =
        repositories::workspaces::data_frames::columns::get_column_diff(&workspace, &file_path)?;

    repositories::workspaces::data_frames::columns::decorate_fields_with_column_diffs(
        &workspace,
        &file_path,
        &mut df_views,
    )?;

    let response = JsonDataFrameColumnResponse {
        data_frame: df_views,
        diff: Some(diff),
        commit: None,
        derived_resource: None,
        status: StatusMessage::resource_found(),
        resource: None,
    };

    Ok(HttpResponse::Ok().json(response))
}

pub async fn update(req: HttpRequest, body: String) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;

    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let repo = get_repo(&app_data.path, namespace.clone(), repo_name.clone())?;
    let file_path = PathBuf::from(path_param(&req, "path")?);
    let column_name = path_param(&req, "column_name")
        .map_err(|_| OxenHttpError::BadRequest("Column name missing in path parameters".into()))?;

    let mut body_json: Value = serde_json::from_str(&body).map_err(|_err| {
        OxenHttpError::BadRequest("Failed to parse request body into JSON".into())
    })?;

    let mut metadata_json: Option<serde_json::Value> = None;
    if let Some(obj) = body_json.as_object_mut() {
        if obj.contains_key("name") {
            let name_value = obj.remove("name").unwrap(); // Safe to unwrap because we just checked it exists
            obj.insert("new_name".to_string(), name_value);
        }
        if obj.contains_key("dtype") {
            let dtype_value = obj.remove("dtype").unwrap(); // Safe to unwrap because we just checked it exists
            obj.insert("new_data_type".to_string(), dtype_value);
        }

        obj.insert("name".to_string(), json!(column_name));
        if obj.contains_key("metadata") {
            metadata_json = Some(obj.remove("metadata").unwrap()); // Safe to unwrap because we just checked it exists
        }
    } else {
        return Err(OxenHttpError::BadRequest(
            "Request body is not a valid JSON object".into(),
        ));
    }

    let column_to_update: ColumnToUpdate = serde_json::from_value(body_json).map_err(|_err| {
        OxenHttpError::BadRequest(
            "Failed to parse ColumnToUpdate from modified request body".into(),
        )
    })?;

    log::info!(
        "Update column {namespace}/{repo_name} for file {file_path:?} on in workspace id {workspace_id}"
    );
    log::debug!("update column with data {column_to_update:?}");

    // Get the workspace
    let Some(workspace) = repositories::workspaces::get(&repo, &workspace_id)? else {
        return Ok(HttpResponse::NotFound()
            .json(StatusMessageDescription::workspace_not_found(workspace_id)));
    };

    // Make sure the data frame is indexed
    let is_editable = repositories::workspaces::data_frames::is_indexed(&workspace, &file_path)?;

    if !is_editable {
        return Err(OxenHttpError::DatasetNotIndexed(file_path.into()));
    }

    if let Some(metadata) = metadata_json {
        repositories::workspaces::data_frames::columns::add_column_metadata(
            &repo,
            &workspace,
            file_path.clone(),
            column_name.to_string(),
            &metadata,
        )?;
    }

    let column_df = repositories::workspaces::data_frames::columns::update(
        &repo,
        &workspace,
        &file_path,
        &column_to_update,
    )
    .await?;

    let opts = DFOpts::empty();
    let column_schema = Schema::from_polars(column_df.schema());
    let column_df_source = DataFrameSchemaSize::from_df(&column_df, &column_schema);
    let column_df_view = JsonDataFrameView::from_df_opts(column_df, column_schema, &opts).await;

    let mut df_views = JsonDataFrameViews {
        source: column_df_source,
        view: column_df_view,
    };

    repositories::workspaces::data_frames::columns::decorate_fields_with_column_diffs(
        &workspace,
        &file_path,
        &mut df_views,
    )?;

    let new_schema = repositories::data_frames::schemas::get_staged_schema_with_staged_db_manager(
        &workspace.workspace_repo,
        &file_path,
    )?;
    repositories::workspaces::data_frames::columns::update_column_schemas(
        new_schema,
        &mut df_views,
    )?;

    let diff =
        repositories::workspaces::data_frames::columns::get_column_diff(&workspace, &file_path)?;

    let response = JsonDataFrameColumnResponse {
        data_frame: df_views,
        diff: Some(diff),
        commit: None,
        derived_resource: None,
        status: StatusMessage::resource_found(),
        resource: None,
    };

    Ok(HttpResponse::Ok().json(response))
}

pub async fn add_column_metadata(
    req: HttpRequest,
    body: String,
) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;

    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let path = path_param(&req, "path")?.to_string();
    let repo = get_repo(&app_data.path, namespace, repo_name)?;

    let Some(workspace) = repositories::workspaces::get(&repo, &workspace_id)? else {
        return Ok(HttpResponse::NotFound()
            .json(StatusMessageDescription::workspace_not_found(workspace_id)));
    };

    let parsed_json: serde_json::Value = serde_json::from_str(&body)?;

    // Extract the column_name and metadata from the parsed JSON
    let column_name = parsed_json["column_name"]
        .as_str()
        .ok_or_else(|| OxenHttpError::BasicError("column_name is required".into()))?;
    let column_metadata = &parsed_json["metadata"];

    repositories::workspaces::data_frames::columns::add_column_metadata(
        &repo,
        &workspace,
        path.into(),
        column_name.to_string(),
        column_metadata,
    )?;

    Ok(HttpResponse::Ok().json(StatusMessage::resource_updated()))
}

pub async fn restore(req: HttpRequest) -> Result<HttpResponse, OxenHttpError> {
    let app_data = app_data(&req)?;

    let namespace = path_param(&req, "namespace")?.to_string();
    let repo_name = path_param(&req, "repo_name")?.to_string();
    let workspace_id = path_param(&req, "workspace_id")?.to_string();
    let file_path: PathBuf = PathBuf::from(path_param(&req, "path")?);

    let column_name = path_param(&req, "column_name")
        .map_err(|_| OxenHttpError::BadRequest("Column name missing in path parameters".into()))?;

    let column_to_restore = ColumnToRestore {
        name: column_name.to_string(),
    };

    let repo = get_repo(&app_data.path, namespace, repo_name)?;

    let Some(workspace) = repositories::workspaces::get(&repo, &workspace_id)? else {
        return Ok(HttpResponse::NotFound()
            .json(StatusMessageDescription::workspace_not_found(workspace_id)));
    };

    let is_editable = repositories::workspaces::data_frames::is_indexed(&workspace, &file_path)?;

    if !is_editable {
        return Err(OxenHttpError::DatasetNotIndexed(file_path.into()));
    }

    let restored_column = match repositories::workspaces::data_frames::columns::restore(
        &repo,
        &workspace,
        &file_path,
        &column_to_restore,
    )
    .await
    {
        Ok(df) => df,
        Err(e) => {
            log::error!("Error restoring column: {e:?}");
            return Err(OxenHttpError::BasicError(StringError::from(e.to_string())));
        }
    };

    let diff =
        repositories::workspaces::data_frames::columns::get_column_diff(&workspace, &file_path)?;

    let schema = Schema::from_polars(restored_column.schema());
    log::debug!("Restored column in controller is {restored_column:?}");

    let mut df_views = JsonDataFrameViews {
        source: DataFrameSchemaSize::from_df(&restored_column, &schema),
        view: JsonDataFrameView::from_df_opts(restored_column, schema, &DFOpts::empty()).await,
    };

    repositories::workspaces::data_frames::columns::decorate_fields_with_column_diffs(
        &workspace,
        &file_path,
        &mut df_views,
    )?;

    Ok(HttpResponse::Ok().json(JsonDataFrameColumnResponse {
        data_frame: df_views,
        diff: Some(diff),
        commit: None,
        derived_resource: None,
        status: StatusMessage::resource_updated(),
        resource: None,
    }))
}