liboxen 0.49.1

Oxen is a fast version control system for unstructured data, written in Rust, that helps version large machine learning datasets.
Documentation
use crate::core::db::data_frames::row_changes_db::get_all_data_frame_row_changes;
use crate::core::versions::MinOxenVersion;
use crate::error::OxenError;
use crate::model::Workspace;
use crate::model::data_frame::update_result::UpdateResult;
use crate::view::data_frames::DataFrameRowChange;

use polars::datatypes::AnyValue;

use polars::frame::DataFrame;
use polars::prelude::PlSmallStr;

use crate::{core, repositories};
use rocksdb::DB;
use sql_query_builder::Select;

use crate::constants::{DIFF_STATUS_COL, OXEN_ID_COL, OXEN_ROW_ID_COL, TABLE_NAME};
use crate::core::db;

use crate::core::db::data_frames::df_db::{self, with_df_db_manager};
use crate::model::LocalRepository;
use crate::model::staged_row_status::StagedRowStatus;

use std::path::Path;

/// Version-dispatching entry point for adding a row to a workspace data frame.
///
/// Forwards `workspace`, `file_path` and `data` to the latest-version
/// `rows::add` implementation and returns whatever it returns.
///
/// # Panics
///
/// Panics when the repository's minimum version is `V0_10_0`, which is no
/// longer supported.
pub fn add(
    repo: &LocalRepository,
    workspace: &Workspace,
    file_path: impl AsRef<Path>,
    data: &serde_json::Value,
) -> Result<DataFrame, OxenError> {
    // Reject the retired repository format up front; everything else uses the
    // latest implementation.
    if matches!(repo.min_version(), MinOxenVersion::V0_10_0) {
        panic!("v0.10.0 no longer supported");
    }
    core::v_latest::workspaces::data_frames::rows::add(workspace, file_path.as_ref(), data)
}

/// Reads every recorded row change for the data frame at `file_path` in
/// `workspace`.
///
/// The row-changes database location comes from
/// `repositories::workspaces::data_frames::row_changes_path`; the database is
/// opened read-only with the default key-value options and handed to
/// `get_all_data_frame_row_changes`.
///
/// # Errors
///
/// Returns an error if the database cannot be opened or if reading the
/// changes fails.
pub fn get_row_diff(
    workspace: &Workspace,
    file_path: impl AsRef<Path>,
) -> Result<Vec<DataFrameRowChange>, OxenError> {
    let changes_db_path =
        repositories::workspaces::data_frames::row_changes_path(workspace, file_path);
    // NOTE(review): the trailing `false` is presumably rocksdb's
    // `error_if_log_file_exist` flag — confirm against the rocksdb crate docs.
    let changes_db = DB::open_for_read_only(
        &db::key_val::opts::default(),
        dunce::simplified(&changes_db_path),
        false,
    )?;
    get_all_data_frame_row_changes(&changes_db)
}

/// Version-dispatching entry point for updating the row identified by
/// `row_id` in a workspace data frame.
///
/// Forwards `workspace`, `path`, `row_id` and `data` to the latest-version
/// `rows::update` implementation and returns whatever it returns.
///
/// # Panics
///
/// Panics when the repository's minimum version is `V0_10_0`, which is no
/// longer supported.
pub fn update(
    repo: &LocalRepository,
    workspace: &Workspace,
    path: impl AsRef<Path>,
    row_id: &str,
    data: &serde_json::Value,
) -> Result<DataFrame, OxenError> {
    // Reject the retired repository format up front; everything else uses the
    // latest implementation.
    if matches!(repo.min_version(), MinOxenVersion::V0_10_0) {
        panic!("v0.10.0 no longer supported");
    }
    core::v_latest::workspaces::data_frames::rows::update(workspace, path.as_ref(), row_id, data)
}

/// Version-dispatching entry point for applying a batch of row updates to a
/// workspace data frame.
///
/// Forwards `workspace`, `path` and `data` to the latest-version
/// `rows::batch_update` implementation and returns its list of
/// [`UpdateResult`]s.
///
/// # Panics
///
/// Panics when the repository's minimum version is `V0_10_0`, which is no
/// longer supported.
pub fn batch_update(
    repo: &LocalRepository,
    workspace: &Workspace,
    path: impl AsRef<Path>,
    data: &serde_json::Value,
) -> Result<Vec<UpdateResult>, OxenError> {
    // Reject the retired repository format up front; everything else uses the
    // latest implementation.
    if matches!(repo.min_version(), MinOxenVersion::V0_10_0) {
        panic!("v0.10.0 no longer supported");
    }
    core::v_latest::workspaces::data_frames::rows::batch_update(workspace, path.as_ref(), data)
}

/// Version-dispatching entry point for deleting the row identified by
/// `row_id` from a workspace data frame.
///
/// Forwards `workspace`, `path` and `row_id` to the latest-version
/// `rows::delete` implementation and returns whatever it returns.
///
/// # Panics
///
/// Panics when the repository's minimum version is `V0_10_0`, which is no
/// longer supported.
pub fn delete(
    repo: &LocalRepository,
    workspace: &Workspace,
    path: impl AsRef<Path>,
    row_id: &str,
) -> Result<DataFrame, OxenError> {
    // Reject the retired repository format up front; everything else uses the
    // latest implementation.
    if matches!(repo.min_version(), MinOxenVersion::V0_10_0) {
        panic!("v0.10.0 no longer supported");
    }
    core::v_latest::workspaces::data_frames::rows::delete(workspace, path.as_ref(), row_id)
}

/// Version-dispatching async entry point for restoring the row identified by
/// `row_id` in a workspace data frame.
///
/// Forwards `workspace`, `path` and `row_id` to the latest-version
/// `rows::restore` implementation, awaits it, and returns its result.
///
/// # Panics
///
/// Panics when the repository's minimum version is `V0_10_0`, which is no
/// longer supported.
pub async fn restore(
    repo: &LocalRepository,
    workspace: &Workspace,
    path: impl AsRef<Path>,
    row_id: impl AsRef<str>,
) -> Result<DataFrame, OxenError> {
    // Reject the retired repository format up front; everything else uses the
    // latest implementation.
    if matches!(repo.min_version(), MinOxenVersion::V0_10_0) {
        panic!("v0.10.0 no longer supported");
    }
    let restored =
        core::v_latest::workspaces::data_frames::rows::restore(workspace, path.as_ref(), row_id);
    restored.await
}

/// Selects the row(s) whose `OXEN_ID_COL` equals `row_id` from the workspace
/// data frame at `path`.
///
/// The database location comes from
/// `repositories::workspaces::data_frames::duckdb_path`, and the query is a
/// `SELECT *` over `TABLE_NAME` run through `df_db::select`.
///
/// `row_id` is embedded in the query as a single-quoted SQL string literal.
/// Any single quote inside it is doubled first, so an id containing `'` is
/// matched literally instead of terminating the literal and altering the
/// query.
///
/// # Errors
///
/// Returns an error if the database manager, the connection, or the query
/// fails.
pub fn get_by_id(
    workspace: &Workspace,
    path: impl AsRef<Path>,
    row_id: impl AsRef<str>,
) -> Result<DataFrame, OxenError> {
    let path = path.as_ref();
    // Standard SQL escaping for string literals: `'` becomes `''`. Ids without
    // quotes pass through unchanged.
    let row_id = row_id.as_ref().replace('\'', "''");
    let db_path = repositories::workspaces::data_frames::duckdb_path(workspace, path);
    log::debug!("get_row_by_id() got db_path: {db_path:?}");
    let data = with_df_db_manager(&db_path, |manager| {
        manager.with_conn(|conn| {
            let query = Select::new()
                .select("*")
                .from(TABLE_NAME)
                .where_clause(&format!("{OXEN_ID_COL} = '{row_id}'"));
            df_db::select(conn, &query, None)
        })
    })?;
    log::debug!("get_row_by_id() got data: {data:?}");
    Ok(data)
}

/// Extracts the value of the `OXEN_ID_COL` column from a single-row data
/// frame as a string.
///
/// Returns `Ok(None)` when `row_df` does not have exactly one row, does not
/// contain the `OXEN_ID_COL` column, or the first value cannot be read.
/// Otherwise the value is formatted with `to_string()` and any leading or
/// trailing `"` characters are stripped.
pub fn get_row_id(row_df: &DataFrame) -> Result<Option<String>, OxenError> {
    let id_col_name = PlSmallStr::from_str(OXEN_ID_COL);
    let is_single_row_with_id =
        row_df.height() == 1 && row_df.get_column_names().contains(&&id_col_name);
    if !is_single_row_with_id {
        return Ok(None);
    }

    // The column is known to exist from the check above.
    let id_column = row_df.column(OXEN_ID_COL).unwrap();
    let row_id = match id_column.get(0) {
        Ok(val) => Some(val.to_string().trim_matches('"').to_string()),
        Err(_) => None,
    };
    Ok(row_id)
}

/// Reads the staged status stored in the `DIFF_STATUS_COL` column of a
/// single-row data frame.
///
/// Returns `Ok(None)` when `row_df` does not have exactly one row or does not
/// contain the `DIFF_STATUS_COL` column.
///
/// # Errors
///
/// Returns an error if the first value cannot be read, if it is not a string
/// ("Row status not found"), or if `StagedRowStatus::from_string` rejects it.
pub fn get_row_status(row_df: &DataFrame) -> Result<Option<StagedRowStatus>, OxenError> {
    let status_col_name = PlSmallStr::from_str(DIFF_STATUS_COL);
    if row_df.height() != 1 || !row_df.get_column_names().contains(&&status_col_name) {
        return Ok(None);
    }

    // The column is known to exist from the check above.
    let status_value = row_df.column(DIFF_STATUS_COL).unwrap().get(0)?;
    let status_text = match status_value.get_str() {
        Some(text) => text,
        None => return Err(OxenError::basic_str("Row status not found")),
    };
    Ok(Some(StagedRowStatus::from_string(status_text)?))
}

pub fn get_row_idx(row_df: &DataFrame) -> Result<Option<usize>, OxenError> {
    let oxen_row_id_col = PlSmallStr::from_str(OXEN_ROW_ID_COL);
    if row_df.height() == 1 && row_df.get_column_names().contains(&&oxen_row_id_col) {
        let row_df_anyval = row_df.column(OXEN_ROW_ID_COL).unwrap().get(0)?;
        match row_df_anyval {
            AnyValue::UInt16(val) => Ok(Some(val as usize)),
            AnyValue::UInt32(val) => Ok(Some(val as usize)),
            AnyValue::UInt64(val) => Ok(Some(val as usize)),
            AnyValue::Int16(val) => Ok(Some(val as usize)),
            AnyValue::Int32(val) => Ok(Some(val as usize)),
            AnyValue::Int64(val) => Ok(Some(val as usize)),
            val => {
                log::debug!("unrecognized row index type {val:?}");
                Ok(None)
            }
        }
    } else {
        Ok(None)
    }
}