use crate::core::db::data_frames::df_db::with_df_db_manager;
use crate::core::df::tabular::transform_new;
use crate::core::df::{sql, tabular};
use crate::core::staged::with_staged_db_manager;
use crate::error::OxenError;
use crate::model::ParsedResource;
use crate::model::data_frame::{DataFrameSchemaSize, DataFrameSlice, DataFrameSliceSchemas};
use crate::model::merkle_tree::node::EMerkleTreeNode;
use crate::model::metadata::generic_metadata::GenericMetadata;
use crate::model::metadata::metadata_tabular::MetadataTabularImpl;
use crate::model::{Commit, DataFrameSize, LocalRepository, Schema, Workspace};
use crate::opts::DFOpts;
use crate::repositories;
use polars::prelude::IntoLazy as _;
use std::path::Path;
pub mod schemas;
/// Load a slice of the tabular file at `path` described by `resource`.
///
/// When `resource` carries a workspace, the workspace's staged copy of the file is
/// used if one exists; otherwise the file is looked up in the workspace's base commit.
/// Without a workspace, the file is looked up in `resource.commit`.
///
/// If `opts.sql` is set, the query is first run via `handle_sql_querying`. If that
/// fails, the error is logged and the file is instead read from the version store with
/// `opts` applied through `tabular::read_df_with_extension`.
///
/// # Errors
/// Returns an error if no commit can be determined, the file cannot be found, the
/// staged node is not a file, the file node lacks tabular metadata, or reading the
/// file from the version store fails.
pub async fn get_slice(
    repo: &LocalRepository,
    resource: &ParsedResource,
    path: impl AsRef<Path>,
    opts: &DFOpts,
) -> Result<DataFrameSlice, OxenError> {
    let path = path.as_ref();
    let workspace = resource.workspace.as_ref();

    // A workspace pins its own base commit; otherwise the resource must name one.
    let commit = match workspace {
        Some(ws) => ws.commit.clone(),
        None => resource
            .commit
            .clone()
            .ok_or_else(|| OxenError::basic_str("Commit not found"))?,
    };

    let file_node = match workspace {
        // Prefer the staged copy of the file in the workspace; fall back to the
        // committed file the workspace is based on.
        Some(ws) => with_staged_db_manager(&ws.workspace_repo, |staged_db_manager| {
            if let Some(staged_node) = staged_db_manager.read_from_staged_db(path)? {
                return match staged_node.node.node {
                    EMerkleTreeNode::File(f) => Ok(f),
                    _ => Err(OxenError::basic_str(
                        "Only single file download is supported",
                    )),
                };
            }
            repositories::tree::get_file_by_path(repo, &commit, path)?
                .ok_or_else(|| OxenError::path_does_not_exist(path))
        })?,
        None => repositories::tree::get_file_by_path(repo, &commit, path)?
            .ok_or_else(|| OxenError::path_does_not_exist(path))?,
    };
    log::debug!("get_slice file_node {file_node:?}");

    let metadata = match file_node.metadata() {
        Some(GenericMetadata::MetadataTabular(tabular)) => tabular.tabular.clone(),
        Some(_) => return Err(OxenError::basic_str("Metadata is not tabular")),
        None => return Err(OxenError::basic_str("File node does not have metadata")),
    };
    log::debug!("get_slice metadata {metadata:?}");

    let source_schema = metadata.schema;
    let data_frame_size = DataFrameSize {
        width: metadata.width,
        height: metadata.height,
    };

    // Only attempt the indexed SQL path when a query was actually requested.
    if opts.sql.is_some() {
        match handle_sql_querying(repo, &commit, path, opts, &data_frame_size).await {
            Ok(response) => return Ok(response),
            // Best-effort: keep serving the file rather than failing the request, but
            // make the failure visible instead of discarding it silently.
            // NOTE(review): presumably `read_df_with_extension` does not apply
            // `opts.sql`, so callers may receive unqueried rows here — confirm intent.
            Err(err) => {
                log::warn!("get_slice could not run SQL query, reading file directly: {err}")
            }
        }
    }

    let version_store = repo.version_store()?;
    let version_path = version_store
        .get_version_path(&file_node.hash().to_string())
        .await?;
    let df = tabular::read_df_with_extension(version_path, file_node.extension(), opts).await?;
    log::debug!("get_slice df {:?}", df.height());

    // A filter changes how many rows match, so report the filtered row count; otherwise
    // report the full height recorded in the source metadata.
    let view_height = if opts.has_filter_transform() {
        df.height()
    } else {
        data_frame_size.height
    };

    // Schema of the returned slice, enriched with metadata from the source schema.
    let mut slice_schema = Schema::from_polars(df.schema());
    slice_schema.update_metadata_from_schema(&source_schema);
    log::debug!("get_slice slice_schema {slice_schema:?}");

    Ok(DataFrameSlice {
        schemas: DataFrameSliceSchemas {
            source: DataFrameSchemaSize {
                size: data_frame_size,
                schema: source_schema,
            },
            slice: DataFrameSchemaSize {
                size: DataFrameSize {
                    width: df.width(),
                    height: view_height,
                },
                schema: slice_schema,
            },
        },
        slice: df,
        total_entries: view_height,
    })
}
/// Run `opts.sql` against the DuckDB index of the queryable workspace for `path` at
/// `commit`, then apply the remaining `opts` transforms to the query result.
///
/// `data_frame_size` is reported unchanged as the size of the source file. The slice
/// size and schema describe the transformed result that is returned. `total_entries`
/// is the row count of the SQL result before `opts` transforms are applied.
///
/// # Errors
/// Returns an error if `opts.sql` is `None`, if no queryable workspace is found, or if
/// the query, transforms, or schema lookup fail.
async fn handle_sql_querying(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
    opts: &DFOpts,
    data_frame_size: &DataFrameSize,
) -> Result<DataFrameSlice, OxenError> {
    let path = path.as_ref();
    let Some(sql) = opts.sql.clone() else {
        return Err(OxenError::basic_str("Could not query data frame"));
    };
    let workspace =
        crate::core::v_latest::workspaces::data_frames::get_queryable_data_frame_workspace(
            repo, path, commit,
        )?;

    let db_path = repositories::workspaces::data_frames::duckdb_path(&workspace, path);
    let df = with_df_db_manager(db_path, |manager| {
        manager.with_conn_mut(|conn| sql::query_df(conn, sql, None))
    })?;
    log::debug!("handle_sql_querying got df {df:?}");

    // Rows matched by the query, counted before pagination/other transforms.
    let total_entries = df.height();
    let paginated_df = transform_new(df.lazy(), opts).await?.collect()?;

    let source_schema =
        match repositories::data_frames::schemas::get_by_path(repo, &workspace.commit, path)? {
            Some(schema) => schema,
            None => Schema::from_polars(paginated_df.schema()),
        };
    // Describe the slice actually returned (after transforms), so schema and width agree.
    let mut slice_schema = Schema::from_polars(paginated_df.schema());
    slice_schema.update_metadata_from_schema(&source_schema);

    Ok(DataFrameSlice {
        schemas: DataFrameSliceSchemas {
            source: DataFrameSchemaSize {
                size: data_frame_size.clone(),
                schema: source_schema,
            },
            slice: DataFrameSchemaSize {
                size: DataFrameSize {
                    width: paginated_df.width(),
                    height: paginated_df.height(),
                },
                schema: slice_schema,
            },
        },
        slice: paginated_df,
        total_entries,
    })
}