use std::path::Path;
use crate::api;
use crate::api::client;
use crate::error::OxenError;
use crate::model::RemoteRepository;
use crate::opts::DFOpts;
use crate::util;
use crate::view::commit::CommitResponse;
use crate::view::data_frames::FromDirectoryRequest;
use crate::view::{JsonDataFrameViewResponse, StatusMessage};
/// Fetches a view of a data frame stored in a remote repository.
///
/// Issues a `GET` to `/data_frames/{commit_or_branch}/{path}` on `remote_repo`, with `opts`
/// rendered as the query string, and deserializes the JSON body into a
/// [`JsonDataFrameViewResponse`].
///
/// # Errors
///
/// Propagates any failure from building the URL, creating the HTTP client, sending the
/// request, or reading the body. If the body cannot be deserialized, returns a basic
/// `OxenError` whose message contains the URL, the serde error, and the raw body.
pub async fn get(
    remote_repo: &RemoteRepository,
    commit_or_branch: &str,
    path: impl AsRef<Path>,
    opts: DFOpts,
) -> Result<JsonDataFrameViewResponse, OxenError> {
    let path_str = util::fs::to_unix_str(path);
    let query_str = opts.to_http_query_params();
    let route = format!("/data_frames/{commit_or_branch}/{path_str}?{query_str}");
    let url = api::endpoint::url_from_repo(remote_repo, &route)?;

    let http = client::new_for_url(&url)?;
    let reply = http.get(&url).send().await?;
    let body = client::parse_json_body(&url, reply).await?;
    log::debug!("got body: {body}");

    // Keep the raw body in the error so a malformed reply can be diagnosed from the message.
    let val = serde_json::from_str::<JsonDataFrameViewResponse>(&body).map_err(|err| {
        OxenError::basic_str(format!(
            "error parsing response from {url}\n\nErr {err:?} \n\n{body}"
        ))
    })?;
    log::debug!("got JsonDataFrameViewResponse: {val:?}");
    Ok(val)
}
pub async fn index(
remote_repo: &RemoteRepository,
commit_or_branch: &str,
path: impl AsRef<Path>,
) -> Result<StatusMessage, OxenError> {
let path_str = path.as_ref().to_str().unwrap();
let uri = format!("/data_frames/index/{commit_or_branch}/{path_str}");
let url = api::endpoint::url_from_repo(remote_repo, &uri)?;
let client = client::new_for_url(&url)?;
let res = client.post(&url).send().await?;
let body = client::parse_json_body(&url, res).await?;
let response: Result<StatusMessage, serde_json::Error> = serde_json::from_str(&body);
match response {
Ok(val) => {
log::debug!("got StatusMessage: {val:?}");
Ok(val)
}
Err(err) => Err(OxenError::basic_str(format!(
"error parsing response from {url}\n\nErr {err:?} \n\n{body}"
))),
}
}
/// Posts a [`FromDirectoryRequest`] for a directory in a remote repository.
///
/// Serializes `request` to JSON and sends it as the body of a `POST` to
/// `/data_frames/from_directory/{commit_or_branch}/{path}` on `remote_repo`, then
/// deserializes the JSON reply into a [`CommitResponse`].
///
/// # Errors
///
/// Propagates any failure from building the URL, creating the HTTP client, serializing
/// `request`, sending it, or reading the body. If the body cannot be deserialized, returns a
/// basic `OxenError` whose message contains the URL, the serde error, and the raw body.
pub async fn from_directory(
    remote_repo: &RemoteRepository,
    commit_or_branch: &str,
    path: impl AsRef<Path>,
    request: FromDirectoryRequest,
) -> Result<CommitResponse, OxenError> {
    let path_str = util::fs::to_unix_str(path);
    let route = format!("/data_frames/from_directory/{commit_or_branch}/{path_str}");
    let url = api::endpoint::url_from_repo(remote_repo, &route)?;

    let http = client::new_for_url(&url)?;
    let payload = serde_json::to_string(&request)?;
    let outgoing = http
        .post(&url)
        .header("Content-Type", "application/json")
        .body(payload);
    let reply = outgoing.send().await?;

    let body = client::parse_json_body(&url, reply).await?;
    log::debug!("got body: {body}");

    // Keep the raw body in the error so a malformed reply can be diagnosed from the message.
    let val = serde_json::from_str::<CommitResponse>(&body).map_err(|err| {
        OxenError::basic_str(format!(
            "error parsing response from {url}\n\nErr {err:?} \n\n{body}"
        ))
    })?;
    log::debug!("got CommitResponse: {val:?}");
    Ok(val)
}
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use crate::api;
use crate::command;
use crate::constants;
use crate::constants::DEFAULT_BRANCH_NAME;
use crate::constants::DEFAULT_REMOTE_NAME;
use crate::error::OxenError;
use crate::opts::DFOpts;
use crate::repositories;
use crate::test;
use crate::util;
use crate::view::data_frames::FromDirectoryRequest;
use crate::view::data_frames::columns::NewColumn;
use serde_json::json;
/// Schema-level and column-level metadata committed locally must come back on both the
/// `source` and `view` schemas of the remote data frame response.
#[tokio::test]
async fn test_fetch_schema_metadata() -> Result<(), OxenError> {
    test::run_empty_local_repo_test_async(|mut local_repo| async move {
        // Commit the 200k CSV fixture at large_files/test.csv.
        let data_dir = local_repo.path.join("large_files");
        util::fs::create_dir_all(&data_dir)?;
        let csv_path = data_dir.join("test.csv");
        util::fs::copy(test::test_200k_csv(), &csv_path)?;
        repositories::add(&local_repo, &csv_path).await?;
        repositories::commit(&local_repo, "add test.csv")?;

        // Attach metadata to one column and to the schema as a whole, then commit it.
        let relative_csv = PathBuf::from("large_files").join("test.csv");
        let schema_ref = &relative_csv.to_string_lossy().to_string();
        let schema_meta = json!({
            "description": "A dataset of faces",
            "task": "gen_faces"
        });
        let column = "image_id".to_string();
        let column_meta = json!({
            "root": "images"
        });
        repositories::data_frames::schemas::add_column_metadata(
            &local_repo,
            schema_ref,
            &column,
            &column_meta,
        )?;
        repositories::data_frames::schemas::add_schema_metadata(
            &local_repo,
            schema_ref,
            &schema_meta,
        )?;
        repositories::commit(&local_repo, "add test.csv schema metadata")?;

        // Publish everything to a fresh remote.
        let remote_url = test::repo_remote_url_from(&local_repo.dirname());
        command::config::set_remote(&mut local_repo, DEFAULT_REMOTE_NAME, &remote_url)?;
        let remote_repo = test::create_remote_repo(&local_repo).await?;
        repositories::push(&local_repo).await?;

        // Request the first page with ten rows per page.
        let mut df_opts = DFOpts::empty();
        df_opts.page_size = Some(10);
        let df = api::client::data_frames::get(
            &remote_repo,
            DEFAULT_BRANCH_NAME,
            &relative_csv,
            df_opts,
        )
        .await?;

        let frame = &df.data_frame;
        let pagination = &frame.view.pagination;
        assert_eq!(frame.source.size.height, 200_000);
        assert_eq!(frame.source.size.width, 11);
        assert_eq!(pagination.page_number, 1);
        assert_eq!(pagination.page_size, 10);
        assert_eq!(pagination.total_entries, 200_000);
        assert_eq!(pagination.total_pages, 20_000);
        assert_eq!(frame.view.data.as_array().unwrap().len(), 10);

        // The metadata must be present on both the source schema and the view schema.
        assert_eq!(frame.source.schema.metadata.as_ref(), Some(&schema_meta));
        assert_eq!(
            frame.source.schema.fields[0].metadata.as_ref(),
            Some(&column_meta)
        );
        assert_eq!(frame.view.schema.metadata.as_ref(), Some(&schema_meta));
        assert_eq!(
            frame.view.schema.fields[0].metadata.as_ref(),
            Some(&column_meta)
        );

        Ok(())
    })
    .await
}
/// With only `page_size` set, the remote returns page 1 of the 200k CSV.
#[tokio::test]
async fn test_paginate_df_page_one() -> Result<(), OxenError> {
    test::run_empty_local_repo_test_async(|mut local_repo| async move {
        // Commit the 200k CSV fixture at large_files/test.csv.
        let data_dir = local_repo.path.join("large_files");
        util::fs::create_dir_all(&data_dir)?;
        let csv_path = data_dir.join("test.csv");
        util::fs::copy(test::test_200k_csv(), &csv_path)?;
        repositories::add(&local_repo, &csv_path).await?;
        repositories::commit(&local_repo, "add test.csv")?;

        // Publish to a fresh remote.
        let remote_url = test::repo_remote_url_from(&local_repo.dirname());
        command::config::set_remote(&mut local_repo, DEFAULT_REMOTE_NAME, &remote_url)?;
        let remote_repo = test::create_remote_repo(&local_repo).await?;
        repositories::push(&local_repo).await?;

        // Ten rows per page; the page number is left unset.
        let mut df_opts = DFOpts::empty();
        df_opts.page_size = Some(10);
        let remote_path = PathBuf::from("large_files").join("test.csv");
        let df = api::client::data_frames::get(
            &remote_repo,
            DEFAULT_BRANCH_NAME,
            remote_path,
            df_opts,
        )
        .await?;

        let frame = &df.data_frame;
        let pagination = &frame.view.pagination;
        assert_eq!(frame.source.size.height, 200_000);
        assert_eq!(frame.source.size.width, 11);
        assert_eq!(pagination.page_number, 1);
        assert_eq!(pagination.page_size, 10);
        assert_eq!(pagination.total_entries, 200_000);
        assert_eq!(pagination.total_pages, 20_000);
        assert_eq!(frame.view.data.as_array().unwrap().len(), 10);

        Ok(())
    })
    .await
}
/// Explicitly requesting page 1 with 20 rows per page yields 20 rows and 10,000 total pages.
#[tokio::test]
async fn test_paginate_df_page_1_page_size_20() -> Result<(), OxenError> {
    test::run_empty_local_repo_test_async(|mut local_repo| async move {
        // Commit the 200k CSV fixture at large_files/test.csv.
        let data_dir = local_repo.path.join("large_files");
        util::fs::create_dir_all(&data_dir)?;
        let csv_path = data_dir.join("test.csv");
        util::fs::copy(test::test_200k_csv(), &csv_path)?;
        repositories::add(&local_repo, &csv_path).await?;
        repositories::commit(&local_repo, "add test.csv")?;

        // Publish to a fresh remote.
        let remote_url = test::repo_remote_url_from(&local_repo.dirname());
        command::config::set_remote(&mut local_repo, DEFAULT_REMOTE_NAME, &remote_url)?;
        let remote_repo = test::create_remote_repo(&local_repo).await?;
        repositories::push(&local_repo).await?;

        let mut df_opts = DFOpts::empty();
        df_opts.page = Some(1);
        df_opts.page_size = Some(20);
        let df = api::client::data_frames::get(
            &remote_repo,
            DEFAULT_BRANCH_NAME,
            "large_files/test.csv",
            df_opts,
        )
        .await?;

        let frame = &df.data_frame;
        let pagination = &frame.view.pagination;
        assert_eq!(frame.source.size.height, 200_000);
        assert_eq!(frame.source.size.width, 11);
        assert_eq!(pagination.page_number, 1);
        assert_eq!(pagination.page_size, 20);
        assert_eq!(pagination.total_entries, 200_000);
        assert_eq!(pagination.total_pages, 10000);
        assert_eq!(frame.view.data.as_array().unwrap().len(), 20);

        Ok(())
    })
    .await
}
/// After indexing the data frame remotely, a SQL-filtered query paginates over the filtered
/// rows: pages 1 and 2 both hold 100 rows out of 37,291 matching entries.
#[tokio::test]
async fn test_paginate_df_after_sql() -> Result<(), OxenError> {
    // The same filter is sent with every page request below.
    const SQL: &str = "SELECT image_id,lefteye_x,lefteye_y FROM df WHERE lefteye_x > 70";

    // NOTE(review): this test is a no-op on Windows; the reason is not recorded here.
    if std::env::consts::OS == "windows" {
        return Ok(());
    }

    test::run_empty_local_repo_test_async(|mut local_repo| async move {
        // Commit the 200k CSV fixture at large_files/test.csv.
        let data_dir = local_repo.path.join("large_files");
        util::fs::create_dir_all(&data_dir)?;
        let csv_path = data_dir.join("test.csv");
        util::fs::copy(test::test_200k_csv(), &csv_path)?;
        repositories::add(&local_repo, &csv_path).await?;
        repositories::commit(&local_repo, "add test.csv")?;

        // Publish to a fresh remote.
        let remote_url = test::repo_remote_url_from(&local_repo.dirname());
        command::config::set_remote(&mut local_repo, DEFAULT_REMOTE_NAME, &remote_url)?;
        let remote_repo = test::create_remote_repo(&local_repo).await?;
        repositories::push(&local_repo).await?;

        // Call the index endpoint for the data frame before issuing SQL queries against it.
        api::client::data_frames::index(
            &remote_repo,
            DEFAULT_BRANCH_NAME,
            "large_files/test.csv",
        )
        .await?;

        // First request leaves `page` unset; the second asks for page 2 explicitly.
        for (page, expected_page_number) in [(None, 1), (Some(2), 2)] {
            let mut opts = DFOpts::empty();
            if let Some(number) = page {
                opts.page = Some(number);
            }
            opts.page_size = Some(100);
            opts.sql = Some(SQL.to_string());

            let df = api::client::data_frames::get(
                &remote_repo,
                DEFAULT_BRANCH_NAME,
                PathBuf::from("large_files").join("test.csv"),
                opts,
            )
            .await?;

            let frame = &df.data_frame;
            let pagination = &frame.view.pagination;

            // The source is always the full, unfiltered file.
            assert_eq!(frame.source.size.height, 200_000);
            assert_eq!(frame.source.size.width, 11);

            // The view is one page of the filtered result.
            assert_eq!(frame.view.size.height, 100);
            assert_eq!(frame.view.size.width, 7);
            assert_eq!(frame.view.data.as_array().unwrap().len(), 100);

            assert_eq!(pagination.page_number, expected_page_number);
            assert_eq!(pagination.page_size, 100);
            assert_eq!(pagination.total_entries, 37_291);
            assert_eq!(pagination.total_pages, 373);
        }

        Ok(())
    })
    .await
}
#[tokio::test]
async fn test_remote_get_schema_df_on_branch() -> Result<(), OxenError> {
test::run_empty_local_repo_test_async(|mut local_repo| async move {
let repo_dir = &local_repo.path;
let large_dir = repo_dir.join("csvs");
util::fs::create_dir_all(&large_dir)?;
let csv_file = large_dir.join("test.csv");
let from_file = test::test_csv_file_with_name("mixed_data_types.csv");
util::fs::copy(from_file, &csv_file)?;
repositories::add(&local_repo, &csv_file).await?;
repositories::commit(&local_repo, "add test.csv")?;
let remote = test::repo_remote_url_from(&local_repo.dirname());
command::config::set_remote(&mut local_repo, DEFAULT_REMOTE_NAME, &remote)?;
let schema_ref = &PathBuf::from("csvs")
.join("test.csv")
.to_string_lossy()
.to_string();
let remote_repo = test::create_remote_repo(&local_repo).await?;
let opts = DFOpts::empty();
let result =
api::client::data_frames::get(&remote_repo, DEFAULT_BRANCH_NAME, schema_ref, opts)
.await;
assert!(result.is_err());
repositories::push(&local_repo).await?;
let branch_name = "new_branch";
repositories::branches::create_checkout(&local_repo, branch_name)?;
let schema_metadata = json!({
"task": "chat_bot",
"description": "some generic description",
});
let column_name = "difficulty".to_string();
let column_metadata = json!(
{
"values": [0, 1, 2]
}
);
repositories::data_frames::schemas::add_schema_metadata(
&local_repo,
schema_ref,
&schema_metadata,
)?;
repositories::data_frames::schemas::add_column_metadata(
&local_repo,
schema_ref,
&column_name,
&column_metadata,
)?;
repositories::commit(&local_repo, "add test.csv schema metadata")?;
let opts = DFOpts::empty();
let result =
api::client::data_frames::get(&remote_repo, branch_name, schema_ref, opts).await;
assert!(result.is_err());
repositories::push(&local_repo).await?;
let opts = DFOpts::empty();
let results =
api::client::data_frames::get(&remote_repo, branch_name, schema_ref, opts).await;
assert!(results.is_ok());
let result = results.unwrap();
let schema = result.data_frame.source.schema;
assert_eq!(schema.fields.len(), 5);
assert_eq!(schema.fields[0].name, "prompt");
assert_eq!(schema.fields[0].dtype, "str");
assert_eq!(schema.fields[1].name, "response");
assert_eq!(schema.fields[1].dtype, "str");
assert_eq!(schema.fields[2].name, "is_correct");
assert_eq!(schema.fields[2].dtype, "bool");
assert_eq!(schema.fields[3].name, "response_time");
assert_eq!(schema.fields[3].dtype, "f64");
assert_eq!(schema.fields[4].name, "difficulty");
assert_eq!(schema.fields[4].dtype, "i64");
assert_eq!(schema.metadata, Some(schema_metadata));
assert_eq!(schema.fields[4].metadata, Some(column_metadata));
Ok(())
})
.await
}
/// Fetching a parquet file with a column selection but no pagination options returns the
/// first page at `constants::DEFAULT_PAGE_SIZE`.
#[tokio::test]
async fn test_remote_parquet_no_params() -> Result<(), OxenError> {
    test::run_empty_local_repo_test_async(|mut local_repo| async move {
        // Commit the 1k parquet fixture at data/test.parquet.
        let data_dir = local_repo.path.join("data");
        util::fs::create_dir_all(&data_dir)?;
        let parquet_path = data_dir.join("test.parquet");
        util::fs::copy(test::test_1k_parquet(), &parquet_path)?;
        repositories::add(&local_repo, &parquet_path).await?;
        repositories::commit(&local_repo, "add test.parquet")?;

        // Publish to a fresh remote.
        let remote_url = test::repo_remote_url_from(&local_repo.dirname());
        command::config::set_remote(&mut local_repo, DEFAULT_REMOTE_NAME, &remote_url)?;
        let remote_repo = test::create_remote_repo(&local_repo).await?;
        repositories::push(&local_repo).await?;

        // Only restrict the columns; page and page size are left unset.
        let mut df_opts = DFOpts::empty();
        df_opts.columns = Some("id,title".to_string());
        let df = api::client::data_frames::get(
            &remote_repo,
            DEFAULT_BRANCH_NAME,
            "data/test.parquet",
            df_opts,
        )
        .await?;

        let p_df = df.data_frame.view.to_df().await;
        println!("{p_df:?}");

        let source = &df.data_frame.source;
        let view = &df.data_frame.view;
        assert_eq!(source.size.height, 1024);
        assert_eq!(source.size.width, 3);
        assert_eq!(view.size.height, constants::DEFAULT_PAGE_SIZE);
        assert_eq!(view.size.width, 2);

        assert_eq!(view.pagination.page_number, 1);
        assert_eq!(view.pagination.page_size, constants::DEFAULT_PAGE_SIZE);
        assert_eq!(view.pagination.total_entries, 1024);
        assert_eq!(view.pagination.total_pages, 11);

        assert_eq!(
            view.data.as_array().unwrap().len(),
            constants::DEFAULT_PAGE_SIZE
        );
        assert_eq!(view.data[0]["title"], "Anarchism");

        Ok(())
    })
    .await
}
/// Requesting page 4 with five rows per page of the parquet fixture returns exactly that
/// page, starting at the "Ayn Rand" row.
#[tokio::test]
async fn test_paginate_remote_parquet() -> Result<(), OxenError> {
    test::run_empty_local_repo_test_async(|mut local_repo| async move {
        // Commit the 1k parquet fixture at data/test.parquet.
        let data_dir = local_repo.path.join("data");
        util::fs::create_dir_all(&data_dir)?;
        let parquet_path = data_dir.join("test.parquet");
        util::fs::copy(test::test_1k_parquet(), &parquet_path)?;
        repositories::add(&local_repo, &parquet_path).await?;
        repositories::commit(&local_repo, "add test.parquet")?;

        // Publish to a fresh remote.
        let remote_url = test::repo_remote_url_from(&local_repo.dirname());
        command::config::set_remote(&mut local_repo, DEFAULT_REMOTE_NAME, &remote_url)?;
        let remote_repo = test::create_remote_repo(&local_repo).await?;
        repositories::push(&local_repo).await?;

        let mut df_opts = DFOpts::empty();
        df_opts.page = Some(4);
        df_opts.page_size = Some(5);
        df_opts.columns = Some("id,title".to_string());
        let df = api::client::data_frames::get(
            &remote_repo,
            DEFAULT_BRANCH_NAME,
            "data/test.parquet",
            df_opts,
        )
        .await?;

        let p_df = df.data_frame.view.to_df().await;
        println!("{p_df:?}");

        let source = &df.data_frame.source;
        let view = &df.data_frame.view;
        assert_eq!(source.size.height, 1024);
        assert_eq!(source.size.width, 3);
        assert_eq!(view.size.height, 5);
        assert_eq!(view.size.width, 2);

        assert_eq!(view.pagination.page_number, 4);
        assert_eq!(view.pagination.page_size, 5);
        assert_eq!(view.pagination.total_entries, 1024);
        assert_eq!(view.pagination.total_pages, 205);

        assert_eq!(view.data.as_array().unwrap().len(), 5);
        println!("{}", df.data_frame.view.data[0]["title"]);
        assert_eq!(view.data[0]["title"], "Ayn Rand");

        Ok(())
    })
    .await
}
/// A `330..333` slice of the parquet fixture returns three rows, the first titled "April 26".
#[tokio::test]
async fn test_slice_remote_parquet() -> Result<(), OxenError> {
    test::run_empty_local_repo_test_async(|mut local_repo| async move {
        // Commit the 1k parquet fixture at data/test.parquet.
        let data_dir = local_repo.path.join("data");
        util::fs::create_dir_all(&data_dir)?;
        let parquet_path = data_dir.join("test.parquet");
        util::fs::copy(test::test_1k_parquet(), &parquet_path)?;
        repositories::add(&local_repo, &parquet_path).await?;
        repositories::commit(&local_repo, "add test.parquet")?;

        // Publish to a fresh remote.
        let remote_url = test::repo_remote_url_from(&local_repo.dirname());
        command::config::set_remote(&mut local_repo, DEFAULT_REMOTE_NAME, &remote_url)?;
        let remote_repo = test::create_remote_repo(&local_repo).await?;
        repositories::push(&local_repo).await?;

        let mut df_opts = DFOpts::empty();
        df_opts.slice = Some("330..333".to_string());
        df_opts.columns = Some("id,title".to_string());
        let df = api::client::data_frames::get(
            &remote_repo,
            DEFAULT_BRANCH_NAME,
            "data/test.parquet",
            df_opts,
        )
        .await?;

        let p_df = df.data_frame.view.to_df().await;
        println!("{p_df:?}");

        let source = &df.data_frame.source;
        let view = &df.data_frame.view;
        assert_eq!(source.size.height, 1024);
        assert_eq!(source.size.width, 3);
        assert_eq!(view.size.height, 3);
        assert_eq!(view.size.width, 2);

        assert_eq!(view.data.as_array().unwrap().len(), 3);
        println!("{}", df.data_frame.view.data[0]["title"]);
        assert_eq!(view.data[0]["title"], "April 26");

        Ok(())
    })
    .await
}
/// `from_directory` with `recursive: true` commits a parquet listing that contains one row
/// per file under the directory, including files in subdirectories.
#[tokio::test]
async fn test_from_directory() -> Result<(), OxenError> {
    test::run_empty_local_repo_test_async(|mut local_repo| async move {
        // Lay out test_files/{file1.txt, file2.txt, subdir/file3.txt} and commit it.
        let repo_dir = &local_repo.path;
        let test_dir = repo_dir.join("test_files");
        util::fs::create_dir_all(&test_dir)?;

        let subdir = test_dir.join("subdir");
        util::fs::create_dir_all(&subdir)?;

        let file1 = test_dir.join("file1.txt");
        std::fs::write(&file1, "content1")?;
        let file2 = test_dir.join("file2.txt");
        std::fs::write(&file2, "content2")?;
        let file3 = subdir.join("file3.txt");
        std::fs::write(&file3, "content3")?;

        repositories::add(&local_repo, &test_dir).await?;
        repositories::commit(&local_repo, "add test files")?;

        // Publish to a fresh remote.
        let remote = test::repo_remote_url_from(&local_repo.dirname());
        command::config::set_remote(&mut local_repo, DEFAULT_REMOTE_NAME, &remote)?;
        let remote_repo = test::create_remote_repo(&local_repo).await?;
        repositories::push(&local_repo).await?;

        let request = FromDirectoryRequest {
            output_path: Some("file_listing.parquet".to_string()),
            extra_columns: Some(vec![
                NewColumn {
                    name: "file_size".to_string(),
                    data_type: "float".to_string(),
                },
                NewColumn {
                    name: "file_type".to_string(),
                    data_type: "float".to_string(),
                },
            ]),
            commit_message: Some("Generated directory listing".to_string()),
            user_name: Some("test_user".to_string()),
            user_email: Some("test@example.com".to_string()),
            recursive: Some(true),
        };

        let response = api::client::data_frames::from_directory(
            &remote_repo,
            DEFAULT_BRANCH_NAME,
            "test_files",
            request,
        )
        .await?;

        // Read the generated listing back from the remote.
        let files = api::client::data_frames::get(
            &remote_repo,
            DEFAULT_BRANCH_NAME,
            "file_listing.parquet",
            DFOpts::empty(),
        )
        .await?;

        let p_df = files.data_frame.view.to_df().await;
        assert_eq!(p_df.height(), 3);
        assert_eq!(p_df.width(), 3);

        let file_path_col = p_df.column("file_path").unwrap();
        let file_paths: Vec<&str> = file_path_col.str().unwrap().into_no_null_iter().collect();

        // Expected paths are built with `PathBuf` so they use the platform's separator.
        let expected_paths = [
            PathBuf::from("test_files").join("file1.txt"),
            PathBuf::from("test_files").join("file2.txt"),
            PathBuf::from("test_files").join("subdir").join("file3.txt"),
        ];
        for expected in &expected_paths {
            let expected_str = expected.to_str().unwrap();
            assert!(
                file_paths.contains(&expected_str),
                "listing {file_paths:?} is missing {expected_str}"
            );
        }

        // `assert_eq!` reports the actual status on failure, unlike `assert!(a == b)`.
        assert_eq!(response.status.status_message, "resource_created");
        assert!(!response.commit.id.is_empty());
        assert_eq!(response.commit.message, "Generated directory listing");

        Ok(())
    })
    .await
}
/// When every file in the directory is an image, the generated listing's `file_path` column
/// carries `_oxen.render.func == "image"` metadata.
#[tokio::test]
async fn test_from_directory_with_images_sets_metadata() -> Result<(), OxenError> {
    test::run_empty_local_repo_test_async(|mut local_repo| async move {
        // Copy three image fixtures into test_images/ and commit them.
        let image_dir = local_repo.path.join("test_images");
        util::fs::create_dir_all(&image_dir)?;
        util::fs::copy(
            test::test_img_file_with_name("cat_1.jpg"),
            image_dir.join("image1.jpg"),
        )?;
        util::fs::copy(
            test::test_img_file_with_name("cat_rgba.png"),
            image_dir.join("image2.png"),
        )?;
        util::fs::copy(
            test::test_img_file_with_name("dog_1.jpg"),
            image_dir.join("image3.jpeg"),
        )?;
        repositories::add(&local_repo, &image_dir).await?;
        repositories::commit(&local_repo, "add test images")?;

        // Publish to a fresh remote.
        let remote_url = test::repo_remote_url_from(&local_repo.dirname());
        command::config::set_remote(&mut local_repo, DEFAULT_REMOTE_NAME, &remote_url)?;
        let remote_repo = test::create_remote_repo(&local_repo).await?;
        repositories::push(&local_repo).await?;

        let listing_request = FromDirectoryRequest {
            output_path: Some("image_listing.parquet".to_string()),
            extra_columns: None,
            commit_message: Some("Generated image listing".to_string()),
            user_name: Some("test_user".to_string()),
            user_email: Some("test@example.com".to_string()),
            recursive: Some(false),
        };
        let response = api::client::data_frames::from_directory(
            &remote_repo,
            DEFAULT_BRANCH_NAME,
            "test_images",
            listing_request,
        )
        .await?;

        // Read the generated listing back from the remote.
        let files = api::client::data_frames::get(
            &remote_repo,
            DEFAULT_BRANCH_NAME,
            "image_listing.parquet",
            DFOpts::empty(),
        )
        .await?;

        let p_df = files.data_frame.view.to_df().await;
        assert_eq!(p_df.height(), 3);

        // The file_path column must exist and be tagged for image rendering.
        let metadata = files
            .data_frame
            .view
            .schema
            .fields
            .iter()
            .find(|field| field.name == "file_path")
            .expect("file_path column should exist")
            .metadata
            .as_ref()
            .expect("file_path column should have metadata");
        assert_eq!(
            metadata["_oxen"]["render"]["func"], "image",
            "file_path column should have image render metadata"
        );

        assert_eq!(response.status.status_message, "resource_created");
        assert!(!response.commit.id.is_empty());
        assert_eq!(response.commit.message, "Generated image listing");

        Ok(())
    })
    .await
}
/// When the directory mixes image and non-image files, the generated listing's `file_path`
/// column must not carry `_oxen.render` metadata.
#[tokio::test]
async fn test_from_directory_mixed_files_no_image_metadata() -> Result<(), OxenError> {
    test::run_empty_local_repo_test_async(|mut local_repo| async move {
        // One image and one text file under test_mixed/.
        let mixed_dir = local_repo.path.join("test_mixed");
        util::fs::create_dir_all(&mixed_dir)?;
        let image_path = mixed_dir.join("image1.jpg");
        let text_path = mixed_dir.join("file1.txt");
        util::fs::copy(test::test_img_file_with_name("cat_1.jpg"), &image_path)?;
        std::fs::write(&text_path, "content1")?;
        repositories::add(&local_repo, &mixed_dir).await?;
        repositories::commit(&local_repo, "add mixed files")?;

        // Publish to a fresh remote.
        let remote_url = test::repo_remote_url_from(&local_repo.dirname());
        command::config::set_remote(&mut local_repo, DEFAULT_REMOTE_NAME, &remote_url)?;
        let remote_repo = test::create_remote_repo(&local_repo).await?;
        repositories::push(&local_repo).await?;

        let listing_request = FromDirectoryRequest {
            output_path: Some("mixed_listing.parquet".to_string()),
            extra_columns: None,
            commit_message: Some("Generated mixed listing".to_string()),
            user_name: Some("test_user".to_string()),
            user_email: Some("test@example.com".to_string()),
            recursive: Some(false),
        };
        // Only success matters here; the commit response itself is not inspected.
        api::client::data_frames::from_directory(
            &remote_repo,
            DEFAULT_BRANCH_NAME,
            "test_mixed",
            listing_request,
        )
        .await?;

        // Read the generated listing back from the remote.
        let files = api::client::data_frames::get(
            &remote_repo,
            DEFAULT_BRANCH_NAME,
            "mixed_listing.parquet",
            DFOpts::empty(),
        )
        .await?;

        let file_path_field = files
            .data_frame
            .view
            .schema
            .fields
            .iter()
            .find(|field| field.name == "file_path")
            .expect("file_path column should exist");

        // Missing metadata or a missing `_oxen` key is fine; only a `render` entry is an error.
        let render = file_path_field
            .metadata
            .as_ref()
            .and_then(|metadata| metadata.get("_oxen"))
            .and_then(|oxen| oxen.get("render"));
        assert!(
            render.is_none(),
            "Mixed file column should not have image render metadata"
        );

        Ok(())
    })
    .await
}
}