liboxen 0.50.0

Oxen is a fast version control system for unstructured data, written in Rust, that helps version large machine learning datasets.
Documentation
use crate::core::db::data_frames::df_db::with_df_db_manager;
use crate::core::df::tabular::transform_new;
use crate::core::df::{sql, tabular};
use crate::core::staged::get_staged_db_manager;
use crate::error::OxenError;
use crate::model::ParsedResource;
use crate::model::data_frame::{DataFrameSchemaSize, DataFrameSlice, DataFrameSliceSchemas};
use crate::model::merkle_tree::node::EMerkleTreeNode;
use crate::model::metadata::generic_metadata::GenericMetadata;
use crate::model::metadata::metadata_tabular::MetadataTabularImpl;
use crate::model::{Commit, DataFrameSize, LocalRepository, Schema, Workspace};
use crate::opts::DFOpts;
use crate::repositories;
use polars::prelude::IntoLazy as _;

use std::path::Path;

pub mod schemas;

pub async fn get_slice(
    repo: &LocalRepository,
    resource: &ParsedResource,
    path: impl AsRef<Path>,
    opts: &DFOpts,
) -> Result<DataFrameSlice, OxenError> {
    let workspace = resource.workspace.as_ref();
    let commit = match workspace {
        Some(ws) => ws.commit.clone(),
        None => resource
            .commit
            .clone()
            .ok_or_else(|| OxenError::basic_str("Commit not found"))?,
    };

    let (staged_repo, base_repo) = match workspace {
        Some(ws) => (&ws.workspace_repo, repo),
        None => (repo, repo),
    };

    let file_node = match workspace {
        Some(ws) => {
            let staged_db_manager = get_staged_db_manager(staged_repo)?;
            // Try staged DB first
            if let Some(staged_node) = staged_db_manager.read_from_staged_db(&path)? {
                match staged_node.node.node {
                    EMerkleTreeNode::File(f) => Ok(f),
                    _ => Err(OxenError::basic_str(
                        "Only single file download is supported",
                    )),
                }?
            } else {
                // Fall back to commit tree using workspace's commit
                let commit = &ws.commit;
                repositories::tree::get_file_by_path(base_repo, commit, &path)?
                    .ok_or_else(|| OxenError::path_does_not_exist(path.as_ref()))?
            }
        }
        None => {
            let commit = resource
                .commit
                .as_ref()
                .ok_or_else(|| OxenError::basic_str("Commit not found"))?;
            repositories::tree::get_file_by_path(base_repo, commit, &path)?
                .ok_or_else(|| OxenError::path_does_not_exist(path.as_ref()))?
        }
    };

    log::debug!("get_slice file_node {file_node:?}");

    let metadata: Result<MetadataTabularImpl, OxenError> = match file_node.metadata() {
        Some(metadata) => match metadata {
            GenericMetadata::MetadataTabular(metadata) => Ok(metadata.tabular.clone()),
            _ => {
                return Err(OxenError::basic_str("Metadata is not tabular"));
            }
        },
        None => {
            return Err(OxenError::basic_str("File node does not have metadata"));
        }
    };
    let metadata = metadata?;
    log::debug!("get_slice metadata {metadata:?}");

    let source_schema = metadata.schema;
    let data_frame_size = DataFrameSize {
        width: metadata.width,
        height: metadata.height,
    };

    let handle_sql_result = handle_sql_querying(repo, &commit, path, opts, &data_frame_size).await;
    if let Ok(response) = handle_sql_result {
        return Ok(response);
    }
    // Read the data frame from the version path
    let version_store = repo.version_store()?;
    let version_path = version_store
        .get_version_path(&file_node.hash().to_string())
        .await?;
    let df = tabular::read_df_with_extension(version_path, file_node.extension(), opts).await?;
    log::debug!("get_slice df {:?}", df.height());

    // Check what the view height is
    let view_height = if opts.has_filter_transform() {
        df.height()
    } else {
        data_frame_size.height
    };

    // Update the schema metadata from the source schema
    let mut slice_schema = Schema::from_polars(df.schema());
    slice_schema.update_metadata_from_schema(&source_schema);
    log::debug!("get_slice slice_schema {slice_schema:?}");
    // Return a DataFrameSlice
    Ok(DataFrameSlice {
        schemas: DataFrameSliceSchemas {
            source: DataFrameSchemaSize {
                size: data_frame_size,
                schema: source_schema,
            },
            slice: DataFrameSchemaSize {
                size: DataFrameSize {
                    width: df.width(),
                    height: view_height,
                },
                schema: slice_schema,
            },
        },
        slice: df,
        total_entries: view_height,
    })
}

/// Runs the SQL query in `opts.sql` against the DuckDB database of the
/// queryable data frame workspace for `path` at `commit`, then applies
/// `transform_new` with `opts` to the query result.
///
/// In the returned [`DataFrameSlice`], `slice` is the transformed frame,
/// `total_entries` is the height of the full query result, and the source
/// size is a copy of `data_frame_size`.
///
/// # Errors
///
/// Returns `"Could not query data frame"` when `opts.sql` is `None`, and
/// propagates any error from locating the queryable workspace, running the
/// query, transforming the result, or looking up the stored schema.
async fn handle_sql_querying(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
    opts: &DFOpts,
    data_frame_size: &DataFrameSize,
) -> Result<DataFrameSlice, OxenError> {
    let path = path.as_ref();

    // Without a SQL query there is nothing to run.
    let sql = opts
        .sql
        .clone()
        .ok_or_else(|| OxenError::basic_str("Could not query data frame"))?;

    let workspace: Workspace =
        crate::core::v_latest::workspaces::data_frames::get_queryable_data_frame_workspace(
            repo, path, commit,
        )?;

    let db_path = repositories::workspaces::data_frames::duckdb_path(&workspace, path);
    let df = with_df_db_manager(db_path, |manager| {
        manager.with_conn_mut(|conn| sql::query_df(conn, sql, None))
    })?;
    log::debug!("handle_sql_querying got df {df:?}");

    // Capture what we need from the full query result up front so `df` can be
    // moved into the lazy pipeline instead of cloned.
    // NOTE(review): the slice schema comes from the un-transformed query result
    // while the slice width/height come from `paginated_df`; confirm this is
    // intended if `transform_new` can change the set of columns.
    let total_entries = df.height();
    let mut slice_schema = Schema::from_polars(df.schema());

    let paginated_df = transform_new(df.lazy(), opts).await?.collect()?;

    // Prefer the schema stored for this path; otherwise derive one from the result.
    let source_schema =
        match repositories::data_frames::schemas::get_by_path(repo, &workspace.commit, path)? {
            Some(schema) => schema,
            None => Schema::from_polars(paginated_df.schema()),
        };
    slice_schema.update_metadata_from_schema(&source_schema);

    Ok(DataFrameSlice {
        schemas: DataFrameSliceSchemas {
            source: DataFrameSchemaSize {
                size: data_frame_size.clone(),
                schema: source_schema,
            },
            slice: DataFrameSchemaSize {
                size: DataFrameSize {
                    width: paginated_df.width(),
                    height: paginated_df.height(),
                },
                schema: slice_schema,
            },
        },
        slice: paginated_df,
        total_entries,
    })
}