use std::collections::HashMap;
use std::path::PathBuf;
use crate::core;
use crate::core::versions::MinOxenVersion;
use crate::error::OxenError;
use crate::model::{Commit, LocalRepository, Schema};
use crate::repositories;
use std::path::Path;
/// Returns the schemas for `commit` as a map from path to [`Schema`].
///
/// This is a version-dispatching facade: the actual work is done by
/// `core::v_latest::data_frames::schemas::list`.
///
/// # Errors
///
/// Propagates any [`OxenError`] returned by the underlying implementation.
///
/// # Panics
///
/// Panics if the repository's minimum version is `v0.10.0`, which is no
/// longer supported.
pub fn list(
    repo: &LocalRepository,
    commit: &Commit,
) -> Result<HashMap<PathBuf, Schema>, OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_10_0) {
        panic!("v0.10.0 no longer supported");
    }
    core::v_latest::data_frames::schemas::list(repo, commit)
}
/// Looks up the schema stored for `path` in `commit`.
///
/// Forwards to `core::v_latest::data_frames::schemas::get_by_path` and
/// returns its result unchanged; `Ok(None)` is whatever that implementation
/// reports when it has no schema for the path.
///
/// # Errors
///
/// Propagates any [`OxenError`] returned by the underlying implementation.
///
/// # Panics
///
/// Panics if the repository's minimum version is `v0.10.0`, which is no
/// longer supported.
pub fn get_by_path(
    repo: &LocalRepository,
    commit: &Commit,
    path: impl AsRef<Path>,
) -> Result<Option<Schema>, OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_10_0) {
        panic!("v0.10.0 no longer supported");
    }
    core::v_latest::data_frames::schemas::get_by_path(repo, commit, path)
}
/// Looks up the staged schema for `path`, if any.
///
/// Forwards to `core::v_latest::data_frames::schemas::get_staged` and returns
/// its result unchanged.
///
/// # Errors
///
/// Propagates any [`OxenError`] returned by the underlying implementation.
///
/// # Panics
///
/// Panics if the repository's minimum version is `v0.10.0`, which is no
/// longer supported.
pub fn get_staged(
    repo: &LocalRepository,
    path: impl AsRef<Path>,
) -> Result<Option<Schema>, OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_10_0) {
        panic!("v0.10.0 no longer supported");
    }
    core::v_latest::data_frames::schemas::get_staged(repo, path)
}
/// Looks up the staged schema for `path` through the implementation's
/// `get_staged_schema_with_staged_db_manager` entry point.
///
/// This facade only dispatches on the repository version; how the lookup
/// differs from [`get_staged`] is defined by
/// `core::v_latest::data_frames::schemas` and is not visible here.
///
/// # Errors
///
/// Propagates any [`OxenError`] returned by the underlying implementation.
///
/// # Panics
///
/// Panics if the repository's minimum version is `v0.10.0`, which is no
/// longer supported.
pub fn get_staged_schema_with_staged_db_manager(
    repo: &LocalRepository,
    path: impl AsRef<Path>,
) -> Result<Option<Schema>, OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_10_0) {
        panic!("v0.10.0 no longer supported");
    }
    core::v_latest::data_frames::schemas::get_staged_schema_with_staged_db_manager(repo, path)
}
/// Restores schema information for `path` using `og_schema` and the given
/// before/after column names.
///
/// All arguments are passed through untouched to
/// `core::v_latest::data_frames::schemas::restore_schema`, which defines the
/// exact restore semantics.
///
/// # Errors
///
/// Returns an error (rather than panicking, unlike the sibling functions in
/// this module) when the repository's minimum version is `v0.10.0`, and
/// otherwise propagates any [`OxenError`] from the underlying implementation.
pub fn restore_schema(
    repo: &LocalRepository,
    path: impl AsRef<Path>,
    og_schema: &Schema,
    before_column: &str,
    after_column: &str,
) -> Result<(), OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_10_0) {
        return Err(OxenError::basic_str("Not implemented for v0.10.0"));
    }
    core::v_latest::data_frames::schemas::restore_schema(
        repo,
        path,
        og_schema,
        before_column,
        after_column,
    )
}
/// Returns the staged schemas as a map from path to [`Schema`].
///
/// Forwards to `core::v_latest::data_frames::schemas::list_staged`.
///
/// # Errors
///
/// Propagates any [`OxenError`] returned by the underlying implementation.
///
/// # Panics
///
/// Panics if the repository's minimum version is `v0.10.0`, which is no
/// longer supported.
pub fn list_staged(repo: &LocalRepository) -> Result<HashMap<PathBuf, Schema>, OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_10_0) {
        panic!("v0.10.0 no longer supported");
    }
    core::v_latest::data_frames::schemas::list_staged(repo)
}
/// Renders the schema for `path` as a human-readable string.
///
/// * `staged` - when `true`, the schema is looked up with [`get_staged`];
///   otherwise it is read from the head commit, if the repository has one.
/// * `verbose` - when `true`, the output is `"<path> <hash>\n<verbose_str>\n"`;
///   otherwise it is `"<path>\t<hash>\t<schema>"` with no trailing newline.
///
/// # Errors
///
/// Returns `OxenError::schema_does_not_exist` when no schema is found
/// (including when there is no head commit), and propagates any error from
/// the lookups.
pub fn show(
    repo: &LocalRepository,
    path: impl AsRef<Path>,
    staged: bool,
    verbose: bool,
) -> Result<String, OxenError> {
    let path = path.as_ref();

    let maybe_schema = if staged {
        get_staged(repo, path)?
    } else if let Some(head) = repositories::commits::head_commit_maybe(repo)? {
        repositories::data_frames::schemas::get_by_path(repo, &head, path)?
    } else {
        // No commits yet, so there is nothing committed to look up.
        None
    };
    log::debug!("show: {maybe_schema:?}");

    let schema = maybe_schema.ok_or_else(|| OxenError::schema_does_not_exist(path))?;

    let shown_path = path.to_string_lossy();
    let rendered = if verbose {
        format!(
            "{} {}\n{}\n",
            shown_path,
            schema.hash,
            schema.verbose_str()
        )
    } else {
        format!("{}\t{}\t{}", shown_path, schema.hash, schema)
    };
    Ok(rendered)
}
/// Removes the schema for `path`, forwarding the `staged` flag unchanged.
///
/// Forwards to `core::v_latest::data_frames::schemas::rm`, which defines what
/// `staged` selects.
///
/// # Errors
///
/// Propagates any [`OxenError`] returned by the underlying implementation.
///
/// # Panics
///
/// Panics if the repository's minimum version is `v0.10.0`, which is no
/// longer supported.
pub fn rm(repo: &LocalRepository, path: impl AsRef<Path>, staged: bool) -> Result<(), OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_10_0) {
        panic!("v0.10.0 no longer supported");
    }
    core::v_latest::data_frames::schemas::rm(repo, path, staged)
}
/// Attaches `metadata` to the schema for `path`.
///
/// Forwards to `core::v_latest::data_frames::schemas::add_schema_metadata`
/// and returns the path-to-schema map it produces.
///
/// # Errors
///
/// Propagates any [`OxenError`] returned by the underlying implementation.
///
/// # Panics
///
/// Panics if the repository's minimum version is `v0.10.0`, which is no
/// longer supported.
pub fn add_schema_metadata(
    repo: &LocalRepository,
    path: impl AsRef<Path>,
    metadata: &serde_json::Value,
) -> Result<HashMap<PathBuf, Schema>, OxenError> {
    if matches!(repo.min_version(), MinOxenVersion::V0_10_0) {
        panic!("v0.10.0 no longer supported");
    }
    core::v_latest::data_frames::schemas::add_schema_metadata(repo, path, metadata)
}
/// Attaches `metadata` to `column` of the schema for `path`.
///
/// Forwards to `core::v_latest::data_frames::schemas::add_column_metadata`
/// and returns the path-to-schema map it produces.
///
/// # Errors
///
/// Propagates any [`OxenError`] returned by the underlying implementation.
///
/// # Panics
///
/// Panics if the repository's minimum version is `v0.10.0`, which is no
/// longer supported.
pub fn add_column_metadata(
    repo: &LocalRepository,
    path: impl AsRef<Path>,
    column: impl AsRef<str>,
    metadata: &serde_json::Value,
) -> Result<HashMap<PathBuf, Schema>, OxenError> {
    match repo.min_version() {
        MinOxenVersion::V0_10_0 => panic!("v0.10.0 no longer supported"),
        // Every supported version shares the latest implementation, so no
        // per-version arm is needed here.
        _ => {
            core::v_latest::data_frames::schemas::add_column_metadata(repo, path, column, metadata)
        }
    }
}
#[cfg(test)]
mod tests {
    use crate::error::OxenError;
    use crate::model::Schema;
    use crate::test;
    use crate::util;
    use crate::{command, repositories};
    use serde_json::json;
    use std::path::{Path, PathBuf};

    /// Hash these tests expect for the `annotations/train/bounding_box.csv` schema.
    const BBOX_SCHEMA_HASH: &str = "b821946753334c083124fd563377d795";

    /// `(name, dtype)` of every column these tests expect in the
    /// `annotations/train/bounding_box.csv` schema, in column order.
    const BBOX_FIELDS: [(&str, &str); 6] = [
        ("file", "str"),
        ("label", "str"),
        ("min_x", "f64"),
        ("min_y", "f64"),
        ("width", "i64"),
        ("height", "i64"),
    ];

    /// Asserts that `schema` has exactly the bounding-box columns, in order,
    /// with the expected names and dtypes. Metadata is not checked here.
    fn assert_bounding_box_fields(schema: &Schema) {
        assert_eq!(schema.fields.len(), BBOX_FIELDS.len());
        for (idx, (field, &(name, dtype))) in
            schema.fields.iter().zip(BBOX_FIELDS.iter()).enumerate()
        {
            assert_eq!(field.name, name, "unexpected name for field {idx}");
            assert_eq!(field.dtype, dtype, "unexpected dtype for field {idx}");
        }
    }

    // Listing schemas on the fully committed training-data repo yields eight
    // schemas, including the bounding box one.
    #[tokio::test]
    async fn test_command_schema_list() -> Result<(), OxenError> {
        test::run_training_data_repo_test_fully_committed_async(|repo| async move {
            let commit = repositories::commits::head_commit(&repo)?;
            let schemas = repositories::data_frames::schemas::list(&repo, &commit)?;
            assert_eq!(schemas.len(), 8);

            let path = PathBuf::from("annotations")
                .join("train")
                .join("bounding_box.csv");
            let schema =
                repositories::data_frames::schemas::get_by_path(&repo, &commit, &path)?.unwrap();
            assert_eq!(schema.hash, BBOX_SCHEMA_HASH);
            assert_bounding_box_fields(&schema);
            Ok(())
        })
        .await
    }

    // Adding a data frame stages one schema; committing clears the staged
    // schemas and makes the schema retrievable from the commit.
    #[tokio::test]
    async fn test_stage_and_commit_schema() -> Result<(), OxenError> {
        test::run_training_data_repo_test_no_commits_async(|repo| async move {
            let status = repositories::status(&repo).await?;
            assert_eq!(status.staged_schemas.len(), 0);

            let bbox_filename = Path::new("annotations")
                .join("train")
                .join("bounding_box.csv");
            let bbox_file = repo.path.join(bbox_filename);
            repositories::add(&repo, bbox_file).await?;

            let status = repositories::status(&repo).await?;
            assert_eq!(status.staged_schemas.len(), 1);
            for (path, schema) in status.staged_schemas.iter() {
                println!("GOT SCHEMA {path:?} -> {schema:?}");
            }

            let commit = repositories::commit(&repo, "Adding bounding box schema")?;
            let status = repositories::status(&repo).await?;
            assert_eq!(status.staged_schemas.len(), 0);

            let path = PathBuf::from("annotations")
                .join("train")
                .join("bounding_box.csv");
            let schema =
                repositories::data_frames::schemas::get_by_path(&repo, &commit, &path)?.unwrap();
            assert_eq!(schema.hash, BBOX_SCHEMA_HASH);
            assert_bounding_box_fields(&schema);
            Ok(())
        })
        .await
    }

    // A later commit that only touches the README must still expose the
    // schema that was committed earlier.
    #[tokio::test]
    async fn test_copy_schemas_from_parent() -> Result<(), OxenError> {
        test::run_training_data_repo_test_no_commits_async(|repo| async move {
            let status = repositories::status(&repo).await?;
            assert_eq!(status.staged_schemas.len(), 0);

            let bbox_filename = Path::new("annotations")
                .join("train")
                .join("bounding_box.csv");
            let bbox_file = repo.path.join(&bbox_filename);
            repositories::add(&repo, bbox_file).await?;

            let status = repositories::status(&repo).await?;
            assert_eq!(status.staged_schemas.len(), 1);
            for (path, schema) in status.staged_schemas.iter() {
                println!("GOT SCHEMA {path:?} -> {schema:?}");
            }
            repositories::commit(&repo, "Adding bounding box schema")?;

            let readme_filename = Path::new("README.md");
            let readme_file = repo.path.join(readme_filename);
            util::fs::write(&readme_file, "Changing the README")?;
            repositories::add(&repo, readme_file).await?;
            let commit = repositories::commit(&repo, "Changing the README")?;

            let schema =
                repositories::data_frames::schemas::get_by_path(&repo, &commit, &bbox_filename)?
                    .expect("Schema should still exist in the commit that changed the README");
            assert_eq!(schema.hash, BBOX_SCHEMA_HASH);
            assert_bounding_box_fields(&schema);
            Ok(())
        })
        .await
    }

    // Column metadata added to a staged schema is visible through
    // `get_staged`, and the returned map carries the same schema hash.
    #[tokio::test]
    async fn test_schemas_add_staged() -> Result<(), OxenError> {
        test::run_select_data_repo_test_no_commits_async("annotations", |repo| async move {
            let bbox_path = repo
                .path
                .join("annotations")
                .join("train")
                .join("bounding_box.csv");
            repositories::add(&repo, &bbox_path).await?;
            let bbox_file = util::fs::path_relative_to_dir(&bbox_path, &repo.path)?;

            let schema =
                repositories::data_frames::schemas::get_staged(&repo, &bbox_path)?.unwrap();
            assert_bounding_box_fields(&schema);

            let min_x_meta = json!({
                "key": "val"
            });
            let updated_schemas = repositories::data_frames::schemas::add_column_metadata(
                &repo,
                &bbox_path,
                "min_x",
                &min_x_meta,
            )?;
            let updated_schema = updated_schemas
                .get(&bbox_file)
                .expect("Expected to find updated schema");

            let schema =
                repositories::data_frames::schemas::get_staged(&repo, &bbox_path)?.unwrap();
            assert_eq!(updated_schema.hash, schema.hash);
            assert_bounding_box_fields(&schema);
            assert_eq!(schema.fields[2].metadata, Some(min_x_meta));
            Ok(())
        })
        .await
    }

    // Removing a staged schema makes `get_staged` return `None`.
    #[tokio::test]
    async fn test_schemas_schema_rm_staged() -> Result<(), OxenError> {
        test::run_select_data_repo_test_no_commits_async("annotations", |repo| async move {
            let bbox_path = repo
                .path
                .join("annotations")
                .join("train")
                .join("bounding_box.csv");
            let bbox_file = util::fs::path_relative_to_dir(&bbox_path, &repo.path)?;
            let schema_ref = bbox_file.to_str().unwrap();
            let min_x_meta = json!({
                "key": "val"
            });

            repositories::add(&repo, &bbox_path).await?;
            repositories::data_frames::schemas::add_column_metadata(
                &repo,
                schema_ref,
                "min_x",
                &min_x_meta,
            )?;
            let schema = repositories::data_frames::schemas::get_staged(&repo, schema_ref)?;
            assert!(schema.is_some());

            repositories::data_frames::schemas::rm(&repo, schema_ref, true)?;
            let schema = repositories::data_frames::schemas::get_staged(&repo, schema_ref)?;
            assert!(schema.is_none());
            Ok(())
        })
        .await
    }

    // Schema-level metadata added after a commit shows up on the staged schema.
    #[tokio::test]
    async fn test_schemas_add_schema_metadata() -> Result<(), OxenError> {
        test::run_select_data_repo_test_no_commits_async("annotations", |repo| async move {
            let bbox_path = repo
                .path
                .join("annotations")
                .join("train")
                .join("bounding_box.csv");
            let bbox_file = util::fs::path_relative_to_dir(&bbox_path, &repo.path)?;
            repositories::add(&repo, &bbox_path).await?;
            repositories::commit(&repo, "Adding bounding box file")?;

            let metadata = json!({
                "task": "bounding_box",
                "description": "detect some bounding boxes"
            });
            repositories::data_frames::schemas::add_schema_metadata(&repo, &bbox_file, &metadata)?;

            let schema =
                repositories::data_frames::schemas::get_staged(&repo, &bbox_file)?.unwrap();
            assert_eq!(schema.metadata, Some(metadata));
            Ok(())
        })
        .await
    }

    // Schema-level and column-level metadata can both be staged on the same schema.
    #[tokio::test]
    async fn test_schemas_add_schema_metadata_and_col_metadata() -> Result<(), OxenError> {
        test::run_select_data_repo_test_no_commits_async("annotations", |repo| async move {
            let bbox_path = repo
                .path
                .join("annotations")
                .join("train")
                .join("bounding_box.csv");
            let bbox_file = util::fs::path_relative_to_dir(&bbox_path, &repo.path)?;
            repositories::add(&repo, &bbox_path).await?;
            repositories::commit(&repo, "Adding bounding box file")?;

            let schema_metadata = json!({
                "task": "bounding_box",
                "description": "detect some bounding boxes"
            });
            let column_name = "file".to_string();
            let column_metadata = json!({
                "root": "images"
            });
            repositories::data_frames::schemas::add_schema_metadata(
                &repo,
                &bbox_file,
                &schema_metadata,
            )?;
            repositories::data_frames::schemas::add_column_metadata(
                &repo,
                &bbox_file,
                column_name,
                &column_metadata,
            )?;

            let schema =
                repositories::data_frames::schemas::get_staged(&repo, &bbox_file)?.unwrap();
            assert_eq!(schema.metadata, Some(schema_metadata));
            assert_eq!(schema.fields[0].metadata, Some(column_metadata));
            Ok(())
        })
        .await
    }

    // Column metadata can be added to a schema that is staged but not yet committed.
    #[tokio::test]
    async fn test_schemas_add_column_metadata() -> Result<(), OxenError> {
        test::run_select_data_repo_test_no_commits_async("annotations", |repo| async move {
            let bbox_file = Path::new("annotations")
                .join("train")
                .join("bounding_box.csv");
            let bbox_path = repo.path.join(&bbox_file);
            repositories::add(&repo, &bbox_path).await?;

            let status = repositories::status(&repo).await?;
            println!("status: {status:?}");
            status.print();

            let metadata = json!({
                "root": "images"
            });
            repositories::data_frames::schemas::add_column_metadata(
                &repo, &bbox_file, "file", &metadata,
            )?;

            let schema =
                repositories::data_frames::schemas::get_staged(&repo, &bbox_file)?.unwrap();
            assert_eq!(schema.fields.len(), 6);
            assert_eq!(schema.fields[0].name, "file");
            assert_eq!(schema.fields[0].dtype, "str");
            assert_eq!(schema.fields[0].metadata, Some(metadata));
            Ok(())
        })
        .await
    }

    // Column metadata staged on top of a committed schema survives the next commit.
    #[tokio::test]
    async fn test_schemas_add_column_to_committed_schema2() -> Result<(), OxenError> {
        test::run_select_data_repo_test_no_commits_async("annotations", |repo| async move {
            let bbox_file = Path::new("annotations")
                .join("train")
                .join("bounding_box.csv");
            let bbox_path = repo.path.join(&bbox_file);
            repositories::add(&repo, &bbox_path).await?;
            let commit = repositories::commit(&repo, "Adding bounding box file")?;

            let schemas = repositories::data_frames::schemas::list(&repo, &commit)?;
            for (path, schema) in schemas.iter() {
                println!("GOT SCHEMA {path:?} -> {schema:?}");
            }

            let metadata = json!({
                "root": "images"
            });
            repositories::add(&repo, &bbox_path).await?;
            repositories::data_frames::schemas::add_column_metadata(
                &repo, &bbox_file, "file", &metadata,
            )?;

            let schema =
                repositories::data_frames::schemas::get_staged(&repo, &bbox_file)?.unwrap();
            assert_eq!(schema.fields.len(), 6);
            assert_eq!(schema.fields[0].name, "file");
            assert_eq!(schema.fields[0].dtype, "str");
            assert_eq!(schema.fields[0].metadata, Some(metadata.to_owned()));

            let commit = repositories::commit(&repo, "Adding metadata to file column")?;
            let schemas = repositories::data_frames::schemas::list(&repo, &commit)?;
            for (path, schema) in schemas.iter() {
                println!("GOT SCHEMA {path:?} -> {schema:?}");
            }
            assert_eq!(schemas.len(), 1);

            let schema = schemas.values().next().unwrap();
            assert_eq!(schema.fields.len(), 6);
            assert_eq!(schema.fields[0].name, "file");
            assert_eq!(schema.fields[0].dtype, "str");
            assert_eq!(schema.fields[0].metadata, Some(metadata));
            Ok(())
        })
        .await
    }

    // Committed column metadata is kept when the data frame later gains a
    // column and is re-added.
    #[tokio::test]
    async fn test_schemas_add_column_to_committed_schema_after_changing_data()
    -> Result<(), OxenError> {
        test::run_select_data_repo_test_no_commits_async("annotations", |repo| async move {
            let bbox_path = repo
                .path
                .join("annotations")
                .join("train")
                .join("bounding_box.csv");
            repositories::add(&repo, &bbox_path).await?;
            let commit = repositories::commit(&repo, "Adding bounding box file")?;

            let schemas = repositories::data_frames::schemas::list(&repo, &commit)?;
            for (path, schema) in schemas.iter() {
                println!("GOT SCHEMA {path:?} -> {schema:?}");
            }

            let bbox_file = util::fs::path_relative_to_dir(&bbox_path, &repo.path)?;
            let metadata = json!({
                "root": "images"
            });
            repositories::data_frames::schemas::add_column_metadata(
                &repo, &bbox_file, "file", &metadata,
            )?;
            repositories::commit(&repo, "Adding metadata to file column")?;

            command::df::add_column(&bbox_path, "new_column:0:i32").await?;
            repositories::add(&repo, &bbox_path).await?;

            let schema =
                repositories::data_frames::schemas::get_staged(&repo, &bbox_file)?.unwrap();
            assert_eq!(schema.fields.len(), 7);
            assert_eq!(schema.fields[0].name, "file");
            assert_eq!(schema.fields[0].dtype, "str");
            assert_eq!(schema.fields[0].metadata, Some(metadata));
            Ok(())
        })
        .await
    }

    // Column metadata accumulates across successive commits: each newly
    // staged schema still carries the metadata from earlier commits.
    #[tokio::test]
    async fn test_schemas_persist_schema_types_across_commits() -> Result<(), OxenError> {
        test::run_select_data_repo_test_no_commits_async("annotations", |repo| async move {
            let bbox_path = repo
                .path
                .join("annotations")
                .join("train")
                .join("bounding_box.csv");
            let bbox_file = util::fs::path_relative_to_dir(&bbox_path, &repo.path)?;
            let file_metadata = json!({
                "root": "images"
            });

            // Round 1: metadata on the `file` column.
            repositories::add(&repo, &bbox_path).await?;
            println!("after add initial metadata to: {bbox_file:?}");
            repositories::data_frames::schemas::add_column_metadata(
                &repo,
                &bbox_file,
                "file",
                &file_metadata,
            )?;
            println!("staged column metadata to: {bbox_file:?}");

            let schema =
                repositories::data_frames::schemas::get_staged(&repo, &bbox_file)?.unwrap();
            assert_bounding_box_fields(&schema);
            assert_eq!(schema.fields[0].metadata, Some(file_metadata.to_owned()));
            repositories::commit(&repo, "Adding bounding box schema")?;

            // Round 2: metadata on the `min_x` column.
            let min_x_metadata = json!({
                "key": "val"
            });
            let updated_schemas = repositories::data_frames::schemas::add_column_metadata(
                &repo,
                &bbox_file,
                "min_x",
                &min_x_metadata,
            )?;
            let updated_schema = updated_schemas
                .get(&bbox_file)
                .expect("Expected to find updated schema");

            let schema =
                repositories::data_frames::schemas::get_staged(&repo, &bbox_file)?.unwrap();
            assert_eq!(updated_schema.hash, schema.hash);
            assert_bounding_box_fields(&schema);
            assert_eq!(schema.fields[0].metadata, Some(file_metadata.to_owned()));
            assert_eq!(schema.fields[2].metadata, Some(min_x_metadata.to_owned()));
            repositories::commit(&repo, "Updating the bounding box schema")?;

            // Round 3: metadata on the `width` column.
            let width_metadata = json!({
                "metric": "meters"
            });
            let updated_schemas = repositories::data_frames::schemas::add_column_metadata(
                &repo,
                &bbox_file,
                "width",
                &width_metadata,
            )?;
            let updated_schema = updated_schemas
                .get(&bbox_file)
                .expect("Expected to find updated schema");

            let schema =
                repositories::data_frames::schemas::get_staged(&repo, &bbox_file)?.unwrap();
            assert_eq!(updated_schema.hash, schema.hash);
            assert_bounding_box_fields(&schema);
            assert_eq!(schema.fields[0].metadata, Some(file_metadata.to_owned()));
            assert_eq!(schema.fields[2].metadata, Some(min_x_metadata));
            assert_eq!(schema.fields[4].metadata, Some(width_metadata));
            Ok(())
        })
        .await
    }

    // Two clones of one remote: user A pushes column metadata, user B edits
    // the README, fails to push, pulls, and must then see A's metadata while
    // keeping their own README change.
    #[tokio::test]
    async fn test_schemas_merge_fast_forward_pull() -> Result<(), OxenError> {
        test::run_select_data_sync_remote("README.md", |_local_repo, remote_repo| async move {
            let remote_repo_copy = remote_repo.clone();
            test::run_empty_dir_test_async(|repo_dir_a| async move {
                let repo_dir_a = repo_dir_a.join("repo_a");
                let cloned_repo_a =
                    repositories::clone_url(&remote_repo.remote.url, &repo_dir_a).await?;
                test::run_empty_dir_test_async(|repo_dir_b| async move {
                    println!("=== Starting test: Two users pushing to remote ===");
                    println!("User A: Will add schema metadata to dataframe");
                    println!("User B: Will edit README");

                    // User A adds and pushes the data frame before user B clones.
                    let data_filename = Path::new("data.csv");
                    let data_path = cloned_repo_a.path.join(data_filename);
                    let column_name = "name";
                    let data_content = "name,age\nJohn,30\nJane,25";
                    util::fs::write_to_path(&data_path, data_content)?;
                    repositories::add(&cloned_repo_a, &data_path).await?;
                    repositories::commit(&cloned_repo_a, "User A adding data.csv")?;
                    repositories::push(&cloned_repo_a).await?;

                    let repo_dir_b = repo_dir_b.join("repo_b");
                    let cloned_repo_b =
                        repositories::clone_url(&remote_repo.remote.url, &repo_dir_b).await?;

                    println!("User A: Adding schema metadata...");
                    let column_metadata = json!({
                        "my_custom": "name_column_metadata"
                    });
                    repositories::data_frames::schemas::add_column_metadata(
                        &cloned_repo_a,
                        data_filename,
                        column_name,
                        &column_metadata,
                    )?;
                    repositories::commit(&cloned_repo_a, "User A adding schema metadata")?;
                    println!("User A: Pushing schema metadata...");
                    repositories::push(&cloned_repo_a).await?;
                    println!("User A pushed ✅ 1");

                    let commit = repositories::commits::head_commit(&cloned_repo_a)?;
                    let schema = repositories::data_frames::schemas::get_by_path(
                        &cloned_repo_a,
                        &commit,
                        data_filename,
                    )?
                    .expect("Schema should exist");
                    println!("User A: Schema: {schema:?}");

                    // One listing serves both the count and the per-commit output.
                    let commits = repositories::commits::list(&cloned_repo_a)?;
                    let num_commits_a = commits.len();
                    println!("User A: Number of commits: {num_commits_a}");
                    for commit in &commits {
                        println!("User A: GOT COMMIT {commit}");
                    }
                    println!("================================================");

                    println!("User B: Editing README...");
                    let readme_path = cloned_repo_b.path.join("README.md");
                    let readme_content = "Updated README by User B";
                    util::fs::write_to_path(&readme_path, readme_content)?;
                    repositories::add(&cloned_repo_b, &readme_path).await?;
                    repositories::commit(&cloned_repo_b, "User B editing README")?;

                    let commits = repositories::commits::list(&cloned_repo_b)?;
                    for commit in &commits {
                        println!("User B: GOT COMMIT BEFORE {commit}");
                    }
                    println!("================================================");

                    println!("User B: Attempting to push README changes (should fail because remote is ahead)...");
                    let push_result = repositories::push(&cloned_repo_b).await;
                    assert!(push_result.is_err(), "Push should fail when remote is ahead");
                    println!("User B push failed ✅ 3 (as expected - remote is ahead)");

                    println!("User B: Pulling changes...");
                    repositories::pull(&cloned_repo_b).await?;
                    println!("User B pulled ✅ 4");

                    let commits = repositories::commits::list(&cloned_repo_b)?;
                    for commit in &commits {
                        println!("User B: GOT COMMIT AFTER {commit}");
                    }
                    println!("================================================");

                    let readme_content_after_pull = util::fs::read_from_path(&readme_path)?;
                    assert_eq!(
                        readme_content_after_pull, readme_content,
                        "README should still have User B's changes"
                    );

                    let commit = repositories::commits::head_commit(&cloned_repo_b)?;
                    println!("User B: CHECKING COMMIT {commit}");
                    let schema = repositories::data_frames::schemas::get_by_path(
                        &cloned_repo_b,
                        &commit,
                        data_filename,
                    )?
                    .expect("Schema should exist");
                    println!("User B: Schema: {schema:?}");

                    let name_field = schema
                        .fields
                        .iter()
                        .find(|field| field.name == column_name)
                        .expect("Schema should have a 'name' field");
                    assert_eq!(
                        name_field.metadata,
                        Some(column_metadata),
                        "Schema should have User A's metadata on the 'name' field"
                    );

                    println!("User B: Pushing README changes (should succeed after pull)...");
                    repositories::push(&cloned_repo_b).await?;
                    println!("User B pushed ✅ 5");
                    Ok(())
                })
                .await?;
                Ok(())
            })
            .await?;
            Ok(remote_repo_copy)
        })
        .await
    }
}