//! liboxen 0.50.4
//!
//! Oxen is a fast version control system for unstructured data, written in Rust, that helps
//! version large machine learning datasets.
//!
//! Documentation
use crate::core::db::data_frames::DataFrameError;
use crate::core::db::data_frames::row_changes_db::get_all_data_frame_row_changes;
use crate::error::OxenError;
use crate::model::Workspace;
use crate::model::data_frame::update_result::UpdateResult;
use crate::view::data_frames::DataFrameRowChange;

use polars::datatypes::AnyValue;

use polars::frame::DataFrame;
use polars::prelude::PlSmallStr;

use crate::{core, repositories};
use sql_query_builder::Select;

use crate::constants::{DIFF_STATUS_COL, OXEN_ID_COL, OXEN_ROW_ID_COL, TABLE_NAME};

use crate::core::db::data_frames::changes_db;
use crate::core::db::data_frames::df_db::{self, with_df_db_manager};
use crate::model::LocalRepository;
use crate::model::staged_row_status::StagedRowStatus;

use std::path::Path;
use std::str::FromStr;

/// Adds the JSON `data` as a row of the workspace data frame at `file_path`.
///
/// This is a thin wrapper: the work is done by
/// `core::v_latest::workspaces::data_frames::rows::add`. `_repo` is accepted for API
/// symmetry with the other row operations and is not used.
///
/// # Errors
///
/// Returns whatever error the underlying `add` implementation produces.
pub fn add(
    _repo: &LocalRepository,
    workspace: &Workspace,
    file_path: impl AsRef<Path>,
    data: &serde_json::Value,
) -> Result<DataFrame, OxenError> {
    let target: &Path = file_path.as_ref();
    core::v_latest::workspaces::data_frames::rows::add(workspace, target, data)
}

/// Returns every recorded row change for the workspace data frame at `file_path`.
///
/// The changes are read from the row-changes database located via
/// `repositories::workspaces::data_frames::row_changes_path`. If no such database can
/// be obtained (`try_get_changes_db` yields `None`), an empty list is returned.
///
/// # Errors
///
/// Propagates errors from opening the changes database or reading its entries.
pub fn get_row_diff(
    workspace: &Workspace,
    file_path: impl AsRef<Path>,
) -> Result<Vec<DataFrameRowChange>, DataFrameError> {
    let changes_path =
        repositories::workspaces::data_frames::row_changes_path(workspace, file_path);

    // No changes database yet means nothing has been changed.
    let Some(changes) = changes_db::try_get_changes_db(&changes_path)? else {
        return Ok(Vec::new());
    };

    get_all_data_frame_row_changes(&changes)
}

/// Updates the row identified by `row_id` in the workspace data frame at `path` with `data`.
///
/// Delegates to `core::v_latest::workspaces::data_frames::rows::update`; `_repo` is unused.
///
/// # Errors
///
/// Returns whatever error the underlying `update` implementation produces.
pub fn update(
    _repo: &LocalRepository,
    workspace: &Workspace,
    path: impl AsRef<Path>,
    row_id: &str,
    data: &serde_json::Value,
) -> Result<DataFrame, OxenError> {
    let df_path: &Path = path.as_ref();
    core::v_latest::workspaces::data_frames::rows::update(workspace, df_path, row_id, data)
}

/// Applies a batch of row updates described by `data` to the workspace data frame at `path`.
///
/// Delegates to `core::v_latest::workspaces::data_frames::rows::batch_update` and returns
/// its per-row `UpdateResult`s; `_repo` is unused.
///
/// # Errors
///
/// Returns whatever error the underlying `batch_update` implementation produces.
pub fn batch_update(
    _repo: &LocalRepository,
    workspace: &Workspace,
    path: impl AsRef<Path>,
    data: &serde_json::Value,
) -> Result<Vec<UpdateResult>, OxenError> {
    let df_path: &Path = path.as_ref();
    core::v_latest::workspaces::data_frames::rows::batch_update(workspace, df_path, data)
}

/// Deletes the row identified by `row_id` from the workspace data frame at `path`.
///
/// Delegates to `core::v_latest::workspaces::data_frames::rows::delete`; `_repo` is unused.
///
/// # Errors
///
/// Returns whatever error the underlying `delete` implementation produces.
pub fn delete(
    _repo: &LocalRepository,
    workspace: &Workspace,
    path: impl AsRef<Path>,
    row_id: &str,
) -> Result<DataFrame, OxenError> {
    let df_path: &Path = path.as_ref();
    core::v_latest::workspaces::data_frames::rows::delete(workspace, df_path, row_id)
}

/// Restores the row identified by `row_id` in the workspace data frame at `path`.
///
/// Awaits `core::v_latest::workspaces::data_frames::rows::restore`; `_repo` is unused.
///
/// # Errors
///
/// Returns whatever error the underlying `restore` implementation produces.
pub async fn restore(
    _repo: &LocalRepository,
    workspace: &Workspace,
    path: impl AsRef<Path>,
    row_id: impl AsRef<str>,
) -> Result<DataFrame, OxenError> {
    let df_path: &Path = path.as_ref();
    core::v_latest::workspaces::data_frames::rows::restore(workspace, df_path, row_id).await
}

/// Fetches the rows whose `OXEN_ID_COL` equals `row_id` from the workspace data frame at `path`.
///
/// The query runs against the database returned by
/// `repositories::workspaces::data_frames::duckdb_path(workspace, path)` and selects every
/// column (`SELECT *`) from `TABLE_NAME`.
///
/// `row_id` may come from callers outside this crate. It is embedded as a SQL string
/// literal with its single quotes escaped, so it can neither break nor extend the query.
///
/// # Errors
///
/// Returns an error if the database cannot be opened or the query fails.
pub fn get_by_id(
    workspace: &Workspace,
    path: impl AsRef<Path>,
    row_id: impl AsRef<str>,
) -> Result<DataFrame, OxenError> {
    let path = path.as_ref();
    let row_id = row_id.as_ref();
    let db_path = repositories::workspaces::data_frames::duckdb_path(workspace, path);
    log::debug!("get_row_by_id() got db_path: {db_path:?}");
    // Standard SQL (and DuckDB) escapes a single quote inside a string literal by doubling
    // it. Without this, an id containing `'` would terminate the literal early and allow
    // arbitrary SQL to be appended.
    let escaped_row_id = row_id.replace('\'', "''");
    let data = with_df_db_manager(&db_path, |manager| {
        manager.with_conn(|conn| {
            let query = Select::new()
                .select("*")
                .from(TABLE_NAME)
                .where_clause(&format!("{OXEN_ID_COL} = '{escaped_row_id}'"));
            df_db::select(conn, &query, None)
        })
    })?;
    log::debug!("get_row_by_id() got data: {data:?}");
    Ok(data)
}

/// Extracts the `OXEN_ID_COL` value from a single-row data frame as a `String`.
///
/// Returns `Ok(None)` in three cases:
/// - `row_df` does not have exactly one row;
/// - it has no `OXEN_ID_COL` column;
/// - the first value of that column cannot be read.
///
/// The value is rendered with `to_string()`, and any surrounding `"` characters are stripped.
pub fn get_row_id(row_df: &DataFrame) -> Result<Option<String>, OxenError> {
    let id_col_name = PlSmallStr::from_str(OXEN_ID_COL);
    let is_single_row = row_df.height() == 1;
    if !is_single_row || !row_df.get_column_names().contains(&&id_col_name) {
        return Ok(None);
    }

    // The column's presence was verified above, so this lookup cannot fail.
    let first_value = row_df.column(OXEN_ID_COL).unwrap().get(0).ok();
    Ok(first_value.map(|value| {
        let rendered = value.to_string();
        rendered.trim_matches('"').to_string()
    }))
}

/// Parses the `DIFF_STATUS_COL` value of a single-row data frame into a [`StagedRowStatus`].
///
/// Returns `Ok(None)` when `row_df` does not have exactly one row or lacks the
/// `DIFF_STATUS_COL` column.
///
/// # Errors
///
/// - Propagates the error if the first value of the column cannot be read.
/// - `DataFrameError::RowStatusNotFound` if that value is not a string.
/// - `DataFrameError::InvalidRowStatus` if the string is not a valid `StagedRowStatus`.
pub fn get_row_status(row_df: &DataFrame) -> Result<Option<StagedRowStatus>, DataFrameError> {
    let status_col_name = PlSmallStr::from_str(DIFF_STATUS_COL);
    if row_df.height() != 1 || !row_df.get_column_names().contains(&&status_col_name) {
        return Ok(None);
    }

    // The column's presence was verified above, so this lookup cannot fail.
    let first_value = row_df.column(DIFF_STATUS_COL).unwrap().get(0)?;
    let Some(raw_status) = first_value.get_str() else {
        return Err(DataFrameError::RowStatusNotFound);
    };

    match StagedRowStatus::from_str(raw_status) {
        Ok(parsed) => Ok(Some(parsed)),
        Err(_) => Err(DataFrameError::InvalidRowStatus(raw_status.to_string())),
    }
}

/// Reads the row index stored in `OXEN_ROW_ID_COL` from a single-row data frame.
///
/// Returns `Ok(None)` when any of the following holds:
/// - `row_df` does not have exactly one row;
/// - it has no `OXEN_ROW_ID_COL` column;
/// - the column holds a non-integer value;
/// - the column holds an integer that cannot be represented as a `usize`, such as a
///   negative number.
///
/// # Errors
///
/// Returns an error if the first value of the column cannot be read.
pub fn get_row_idx(row_df: &DataFrame) -> Result<Option<usize>, OxenError> {
    let oxen_row_id_col = PlSmallStr::from_str(OXEN_ROW_ID_COL);
    if row_df.height() != 1 || !row_df.get_column_names().contains(&&oxen_row_id_col) {
        return Ok(None);
    }

    // The column's presence was verified above, so this lookup cannot fail.
    let row_df_anyval = row_df.column(OXEN_ROW_ID_COL).unwrap().get(0)?;
    // Use checked conversions: `as usize` would wrap negative values into huge bogus
    // indices and silently truncate 64-bit values on 32-bit targets.
    let idx = match row_df_anyval {
        AnyValue::UInt8(val) => Some(usize::from(val)),
        AnyValue::UInt16(val) => Some(usize::from(val)),
        AnyValue::UInt32(val) => usize::try_from(val).ok(),
        AnyValue::UInt64(val) => usize::try_from(val).ok(),
        AnyValue::Int8(val) => usize::try_from(val).ok(),
        AnyValue::Int16(val) => usize::try_from(val).ok(),
        AnyValue::Int32(val) => usize::try_from(val).ok(),
        AnyValue::Int64(val) => usize::try_from(val).ok(),
        val => {
            log::debug!("unrecognized row index type {val:?}");
            return Ok(None);
        }
    };
    if idx.is_none() {
        log::debug!("row index value is out of range for usize");
    }
    Ok(idx)
}