use crate::api;
use crate::command;
use crate::constants;
use crate::constants::DEFAULT_REMOTE_NAME;
use crate::core;
use crate::core::versions::MinOxenVersion;
use crate::error::OxenError;
use crate::model::RepoNew;
use crate::model::Schema;
use crate::model::User;
use crate::model::data_frame::schema::Field;
use crate::model::file::FileContents;
use crate::model::file::FileNew;
use crate::model::merkle_tree::node::merkle_tree_node_cache;
use crate::model::{LocalRepository, RemoteRepository};
use crate::opts::RmOpts;
use crate::repositories;
use crate::util;
use rand::Rng;
use rand::distributions::Alphanumeric;
use std::fs::File;
use std::fs::OpenOptions;
use std::future::Future;
use std::io::prelude::*;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::LazyLock;
use std::sync::Mutex;
use tokio::time::sleep;
pub const DEFAULT_TEST_HOST: &str = "localhost:3000";
pub static REPO_ROOT: LazyLock<PathBuf> = LazyLock::new(|| {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.ancestors()
.nth(2)
.unwrap()
.to_path_buf()
});
pub static TEST_DATA_DIR: LazyLock<PathBuf> = LazyLock::new(|| REPO_ROOT.join("data"));
pub fn test_run_dir() -> PathBuf {
TEST_DATA_DIR.join("test").join("runs")
}
pub fn test_host() -> String {
match std::env::var("OXEN_TEST_HOST") {
Ok(host) => host,
Err(_err) => String::from(DEFAULT_TEST_HOST),
}
}
fn generate_random_string(len: usize) -> String {
rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(len)
.map(char::from)
.collect()
}
pub fn repo_remote_url_from(name: &str) -> String {
api::endpoint::remote_url_from_namespace_name(
test_host().as_str(),
constants::DEFAULT_NAMESPACE,
name,
)
}
static ENV_LOCK: LazyLock<Arc<Mutex<bool>>> = LazyLock::new(|| Arc::new(Mutex::new(false)));
pub fn init_test_env() {
util::logging::init_logging();
match ENV_LOCK.lock() {
Ok(mut logging_setup) => {
if !*logging_setup {
util::logging::init_logging();
*logging_setup = true;
}
unsafe {
std::env::set_var("TEST", "true");
}
}
Err(e) => {
panic!("Failed to acquire environment lock to initialize test environment: {e}");
}
}
}
fn create_prefixed_dir(
base_dir: impl AsRef<Path>,
prefix: impl AsRef<Path>,
) -> Result<PathBuf, OxenError> {
let base_dir = base_dir.as_ref();
let prefix = prefix.as_ref();
let repo_name = format!("{}", uuid::Uuid::new_v4());
let full_dir = Path::new(base_dir).join(prefix).join(repo_name);
util::fs::create_dir_all(&full_dir)?;
Ok(full_dir)
}
pub fn create_repo_dir(base_dir: impl AsRef<Path>) -> Result<PathBuf, OxenError> {
create_prefixed_dir(base_dir, "repo")
}
fn create_empty_dir(base_dir: impl AsRef<Path>) -> Result<PathBuf, OxenError> {
create_prefixed_dir(base_dir, "dir")
}
pub async fn create_remote_repo(repo: &LocalRepository) -> Result<RemoteRepository, OxenError> {
let repo_new = RepoNew::from_namespace_name_host(
constants::DEFAULT_NAMESPACE,
repo.dirname(),
test_host(),
None,
);
api::client::repositories::create_from_local(repo, repo_new).await
}
pub async fn create_or_clear_remote_repo(
repo: &LocalRepository,
) -> Result<RemoteRepository, OxenError> {
let repo_new = RepoNew::from_namespace_name_host(
constants::DEFAULT_NAMESPACE,
repo.dirname(),
test_host(),
None,
);
let remote_name = format!("/{}/{}", constants::DEFAULT_NAMESPACE, repo.dirname());
let host = repo_new.host();
let url = api::endpoint::url_from_host(&host, &remote_name);
let _ = api::client::repositories::delete_from_url(url).await;
sleep(std::time::Duration::from_millis(500)).await;
api::client::repositories::create_from_local(repo, repo_new).await
}
pub async fn add_n_files_m_dirs(
repo: &LocalRepository,
num_files: u64,
num_dirs: u64,
) -> Result<(), OxenError> {
let readme_file = repo.path.join("README.md");
util::fs::write_to_path(&readme_file, format!("Repo with {num_files} files"))?;
repositories::add(repo, &readme_file).await?;
let files_csv = repo.path.join("files.csv");
let mut file = File::create(&files_csv)?;
file.write_all(b"file,label\n")?;
for i in 0..num_files {
let label = if i % 2 == 0 { "cat" } else { "dog" };
file.write_all(format!("file{i}.txt,{label}\n").as_bytes())?;
}
file.flush()?;
let files_dir = repo.path.join("files");
util::fs::create_dir_all(&files_dir)?;
for i in 0..num_files {
let dir_num = i % num_dirs;
let dir_path = files_dir.join(format!("dir_{dir_num}"));
util::fs::create_dir_all(&dir_path)?;
let file_file = dir_path.join(format!("file{i}.txt"));
util::fs::write_to_path(&file_file, format!("File {i}"))?;
}
repositories::add(repo, &files_csv).await?;
repositories::add(repo, &files_dir).await?;
Ok(())
}
pub fn run_empty_dir_test<T>(test: T) -> Result<(), OxenError>
where
T: FnOnce(&Path) -> Result<(), OxenError> + std::panic::UnwindSafe,
{
init_test_env();
log::info!("<<<<< run_empty_dir_test start");
let repo_dir = create_empty_dir(test_run_dir())?;
log::info!(">>>>> run_empty_dir_test running test");
let result = std::panic::catch_unwind(|| match test(&repo_dir) {
Ok(_) => {}
Err(err) => {
panic!("Error running test. Err: {err}");
}
});
maybe_cleanup_repo(&repo_dir)?;
assert!(result.is_ok());
Ok(())
}
pub async fn run_empty_dir_test_async<T, Fut>(test: T) -> Result<(), OxenError>
where
T: FnOnce(PathBuf) -> Fut,
Fut: Future<Output = Result<(), OxenError>>,
{
init_test_env();
log::info!("<<<<< run_empty_dir_test_async start");
let repo_dir = create_empty_dir(test_run_dir())?;
log::info!(">>>>> run_empty_dir_test_async running test");
let result = match test(repo_dir.clone()).await {
Ok(_) => {
true
}
Err(err) => {
eprintln!("Error running test. Err: {err:?}");
false
}
};
assert!(result);
Ok(())
}
pub fn run_empty_local_repo_test<T>(test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> Result<(), OxenError> + std::panic::UnwindSafe,
{
init_test_env();
log::info!("<<<<< run_empty_local_repo_test start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init(&repo_dir)?;
log::info!(">>>>> run_empty_local_repo_test running test");
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| match test(repo) {
Ok(_) => {}
Err(err) => {
panic!("Error running test. Err: {err}");
}
}));
maybe_cleanup_repo(&repo_dir)?;
assert!(result.is_ok());
Ok(())
}
pub fn run_empty_local_repo_test_w_version<T>(
version: MinOxenVersion,
test: T,
) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> Result<(), OxenError> + std::panic::UnwindSafe,
{
init_test_env();
log::info!("<<<<< run_empty_local_repo_test start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init::init_with_version(&repo_dir, version)?;
log::info!(">>>>> run_empty_local_repo_test running test");
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| match test(repo) {
Ok(_) => {}
Err(err) => {
panic!("Error running test. Err: {err}");
}
}));
maybe_cleanup_repo(&repo_dir)?;
assert!(result.is_ok());
Ok(())
}
pub async fn run_empty_local_repo_test_async<T, Fut>(test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> Fut,
Fut: Future<Output = Result<(), OxenError>>,
{
init_test_env();
log::info!("<<<<< run_empty_local_repo_test_async start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init(&repo_dir)?;
let version_store = repo.version_store()?;
version_store.init().await?;
log::info!(">>>>> run_empty_local_repo_test_async running test");
let result = match test(repo).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo(&repo_dir)?;
assert!(result);
Ok(())
}
pub async fn run_one_commit_local_repo_test<T>(test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> Result<(), OxenError> + std::panic::UnwindSafe,
{
init_test_env();
log::info!("<<<<< run_one_commit_local_repo_test start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init(&repo_dir)?;
let txt = generate_random_string(20);
let file_path = add_txt_file_to_dir(&repo_dir, &txt)?;
repositories::add(&repo, &file_path).await?;
repositories::commit(&repo, "Init commit")?;
log::info!(">>>>> run_one_commit_local_repo_test running test");
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| match test(repo) {
Ok(_) => {}
Err(err) => {
panic!("Error running test. Err: {err}");
}
}));
maybe_cleanup_repo(&repo_dir)?;
assert!(result.is_ok());
Ok(())
}
pub async fn run_one_commit_local_repo_test_async<T, Fut>(test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> Fut,
Fut: Future<Output = Result<(), OxenError>>,
{
init_test_env();
log::debug!("run_one_commit_local_repo_test_async start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init(&repo_dir)?;
let txt = generate_random_string(20);
let file_path = add_txt_file_to_dir(&repo_dir, &txt)?;
repositories::add(&repo, &file_path).await?;
repositories::commit(&repo, "Init commit")?;
let result = match test(repo).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo(&repo_dir)?;
assert!(result);
Ok(())
}
pub async fn run_one_commit_sync_repo_test<T, Fut>(test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository, RemoteRepository) -> Fut,
Fut: Future<Output = Result<RemoteRepository, OxenError>>,
{
init_test_env();
log::info!("<<<<< run_one_commit_sync_repo_test start");
let repo_dir = create_repo_dir(test_run_dir())?;
let mut local_repo = repositories::init(&repo_dir)?;
let txt = generate_random_string(20);
let file_path = add_txt_file_to_dir(&repo_dir, &txt)?;
repositories::add(&local_repo, &file_path).await?;
repositories::commit(&local_repo, "Init commit")?;
let remote_repo = create_remote_repo(&local_repo).await?;
command::config::set_remote(
&mut local_repo,
DEFAULT_REMOTE_NAME,
&remote_repo.remote.url,
)?;
repositories::push(&local_repo).await?;
log::info!(">>>>> run_one_commit_sync_repo_test running test");
let result = match test(local_repo, remote_repo.clone()).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo_with_remote(&repo_dir, &remote_repo).await?;
assert!(result);
Ok(())
}
pub async fn run_many_local_commits_empty_sync_remote_test<T, Fut>(test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository, RemoteRepository) -> Fut,
Fut: Future<Output = Result<RemoteRepository, OxenError>>,
{
init_test_env();
log::info!("<<<<< run_many_local_commits_empty_sync_remote_test start");
let repo_dir = create_repo_dir(test_run_dir())?;
let mut local_repo = repositories::init(&repo_dir)?;
let remote_repo = create_remote_repo(&local_repo).await?;
command::config::set_remote(
&mut local_repo,
DEFAULT_REMOTE_NAME,
&remote_repo.remote.url,
)?;
let local_repo_dir = local_repo.path.clone();
for i in 1..26 {
let txt = generate_random_string(20);
let file_path = add_txt_file_to_dir(&local_repo_dir, &txt)?;
repositories::add(&local_repo, &file_path).await?;
repositories::commit(&local_repo, &format!("Adding file_{i}"))?;
}
log::info!(">>>>> run_many_local_commits_empty_sync_remote_test running test");
let result = match test(local_repo, remote_repo.clone()).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo_with_remote(&repo_dir, &remote_repo).await?;
assert!(result);
Ok(())
}
pub async fn run_training_data_sync_test_no_commits<T, Fut>(test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository, RemoteRepository) -> Fut,
Fut: Future<Output = Result<RemoteRepository, OxenError>>,
{
init_test_env();
log::info!("<<<<< run_training_data_sync_test_no_commits start");
let repo_dir = create_repo_dir(test_run_dir())?;
let local_repo = repositories::init(&repo_dir)?;
populate_dir_with_training_data(&repo_dir)?;
let remote_repo = create_remote_repo(&local_repo).await?;
println!("Got remote repo: {remote_repo:?}");
log::info!(">>>>> run_training_data_sync_test_no_commits running test");
let result = match test(local_repo, remote_repo.clone()).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo_with_remote(&repo_dir, &remote_repo).await?;
assert!(result);
Ok(())
}
pub async fn make_many_commits(local_repo: &LocalRepository) -> Result<(), OxenError> {
repositories::add(local_repo, local_repo.path.join("train")).await?;
repositories::commit(local_repo, "Adding train/")?;
repositories::add(local_repo, local_repo.path.join("test")).await?;
repositories::commit(local_repo, "Adding test/")?;
repositories::add(local_repo, local_repo.path.join("annotations")).await?;
repositories::commit(local_repo, "Adding annotations/")?;
repositories::add(local_repo, local_repo.path.join("nlp")).await?;
repositories::commit(local_repo, "Adding nlp/")?;
let rm_opts = RmOpts {
path: PathBuf::from("test"),
recursive: true,
staged: false,
};
repositories::rm(local_repo, &rm_opts)?;
repositories::commit(local_repo, "Removing test/")?;
repositories::add(local_repo, &local_repo.path).await?;
repositories::commit(local_repo, "Adding rest of data")?;
Ok(())
}
pub async fn run_local_repo_training_data_committed_async<T, F>(test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> F + std::panic::UnwindSafe,
F: std::future::Future<Output = Result<(), OxenError>>,
{
init_test_env();
log::info!("<<<<< run_local_repo_training_data_committed_async start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init(&repo_dir)?;
populate_dir_with_training_data(&repo_dir)?;
make_many_commits(&repo).await?;
log::info!(">>>>> run_local_repo_training_data_committed_async running test");
match test(repo).await {
Ok(_) => {}
Err(err) => {
maybe_cleanup_repo(&repo_dir)?;
panic!("Error running async test. Err: {err}");
}
}
maybe_cleanup_repo(&repo_dir)?;
Ok(())
}
pub async fn run_training_data_fully_sync_remote<T, Fut>(test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository, RemoteRepository) -> Fut,
Fut: Future<Output = Result<RemoteRepository, OxenError>>,
{
init_test_env();
log::info!("<<<<< run_training_data_fully_sync_remote start");
let repo_dir = create_repo_dir(test_run_dir())?;
let mut local_repo = repositories::init(&repo_dir)?;
populate_dir_with_training_data(&repo_dir)?;
make_many_commits(&local_repo).await?;
let remote_repo = create_remote_repo(&local_repo).await?;
let remote_url = repo_remote_url_from(&local_repo.dirname());
command::config::set_remote(&mut local_repo, constants::DEFAULT_REMOTE_NAME, &remote_url)?;
repositories::push(&local_repo).await?;
log::info!(">>>>> run_training_data_fully_sync_remote running test");
let result = match test(local_repo, remote_repo.clone()).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
assert!(result);
Ok(())
}
pub async fn run_select_data_sync_remote<T, Fut>(data: &str, test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository, RemoteRepository) -> Fut,
Fut: Future<Output = Result<RemoteRepository, OxenError>>,
{
init_test_env();
log::info!("<<<<< run_select_data_sync_remote start");
let repo_dir = create_repo_dir(test_run_dir())?;
let mut local_repo = repositories::init(&repo_dir)?;
populate_select_training_data(&repo_dir, data)?;
repositories::add(&local_repo, local_repo.path.join(data)).await?;
repositories::commit(&local_repo, &format!("Adding {data}"))?;
let remote_repo = create_remote_repo(&local_repo).await?;
let remote_url = repo_remote_url_from(&local_repo.dirname());
command::config::set_remote(&mut local_repo, constants::DEFAULT_REMOTE_NAME, &remote_url)?;
repositories::push(&local_repo).await?;
log::info!(">>>>> run_select_data_sync_remote running test");
let result = match test(local_repo, remote_repo.clone()).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo_with_remote(&repo_dir, &remote_repo).await?;
assert!(result);
Ok(())
}
pub async fn run_subset_of_data_fully_sync_remote<T, Fut>(
data: &str,
test: T,
) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository, RemoteRepository) -> Fut,
Fut: Future<Output = Result<RemoteRepository, OxenError>>,
{
init_test_env();
log::info!("<<<<< run_subset_of_data_fully_sync_remote start");
let repo_dir = create_repo_dir(test_run_dir())?;
let mut local_repo = repositories::init(&repo_dir)?;
populate_select_training_data(&repo_dir, data)?;
let remote_repo = create_remote_repo(&local_repo).await?;
let remote_url = repo_remote_url_from(&local_repo.dirname());
command::config::set_remote(&mut local_repo, constants::DEFAULT_REMOTE_NAME, &remote_url)?;
repositories::push(&local_repo).await?;
log::info!(">>>>> run_subset_of_data_fully_sync_remote running test");
let result = match test(local_repo, remote_repo.clone()).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo_with_remote(&repo_dir, &remote_repo).await?;
assert!(result);
Ok(())
}
pub async fn run_no_commit_remote_repo_test<T, Fut>(test: T) -> Result<(), OxenError>
where
T: FnOnce(RemoteRepository) -> Fut,
Fut: Future<Output = Result<RemoteRepository, OxenError>>,
{
init_test_env();
log::info!("<<<<< run_no_commit_remote_repo_test start");
let name = format!("repo_{}", uuid::Uuid::new_v4());
let namespace = constants::DEFAULT_NAMESPACE;
let repo_new = RepoNew::from_namespace_name_host(namespace, name, test_host(), None);
let repo = api::client::repositories::create_empty(repo_new).await?;
log::info!(">>>>> run_no_commit_remote_repo_test running test");
let result = match test(repo.clone()).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
api::client::repositories::delete(&repo).await?;
assert!(result);
Ok(())
}
pub async fn run_empty_remote_repo_test<T, Fut>(test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository, RemoteRepository) -> Fut,
Fut: Future<Output = Result<RemoteRepository, OxenError>>,
{
init_test_env();
log::info!("<<<<< run_empty_remote_repo_test start");
let empty_dir = create_empty_dir(test_run_dir())?;
let name = format!("repo_{}", uuid::Uuid::new_v4());
let path = empty_dir.join(name);
let local_repo = repositories::init(&path)?;
let remote_repo = create_remote_repo(&local_repo).await?;
println!("REMOTE REPO: {remote_repo:?}");
log::info!(">>>>> run_empty_remote_repo_test running test");
let result = match test(local_repo, remote_repo.clone()).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo_with_remote(&path, &remote_repo).await?;
assert!(result);
Ok(())
}
pub async fn run_empty_configured_remote_repo_test<T, Fut>(test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository, RemoteRepository) -> Fut,
Fut: Future<Output = Result<RemoteRepository, OxenError>>,
{
init_test_env();
log::info!("<<<<< run_empty_remote_repo_test start");
let empty_dir = create_empty_dir(test_run_dir())?;
let name = format!("repo_{}", uuid::Uuid::new_v4());
let path = empty_dir.join(name);
let mut local_repo = repositories::init(&path)?;
let remote_repo = create_remote_repo(&local_repo).await?;
command::config::set_remote(
&mut local_repo,
constants::DEFAULT_REMOTE_NAME,
remote_repo.url(),
)?;
println!("REMOTE REPO: {remote_repo:?}");
log::info!(">>>>> run_empty_remote_repo_test running test");
let result = match test(local_repo, remote_repo).await {
Ok(repo) => {
api::client::repositories::delete(&repo).await?;
true
}
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo(&path)?;
assert!(result);
Ok(())
}
pub async fn run_readme_remote_repo_test<T, Fut>(test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository, RemoteRepository) -> Fut,
Fut: Future<Output = Result<RemoteRepository, OxenError>>,
{
init_test_env();
log::info!("<<<<< run_empty_remote_repo_test start");
let empty_dir = create_empty_dir(test_run_dir())?;
let name = format!("repo_{}", uuid::Uuid::new_v4());
let path = empty_dir.join(name);
let mut local_repo = repositories::init(&path)?;
util::fs::write_to_path(local_repo.path.join("README.md"), "Hello World")?;
repositories::add(&local_repo, &local_repo.path).await?;
repositories::commit(&local_repo, "Adding README")?;
let remote_repo = create_remote_repo(&local_repo).await?;
command::config::set_remote(
&mut local_repo,
constants::DEFAULT_REMOTE_NAME,
remote_repo.url(),
)?;
repositories::push(&local_repo).await?;
println!("REMOTE REPO: {remote_repo:?}");
log::info!(">>>>> run_empty_remote_repo_test running test");
let result = match test(local_repo, remote_repo.clone()).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo_with_remote(&path, &remote_repo).await?;
assert!(result);
Ok(())
}
pub async fn run_remote_created_and_readme_remote_repo_test<T, Fut>(
test: T,
) -> Result<(), OxenError>
where
T: FnOnce(RemoteRepository) -> Fut,
Fut: Future<Output = Result<RemoteRepository, OxenError>>,
{
init_test_env();
log::info!("<<<<< run_remote_created_and_readme_remote_repo_test start");
let name = format!("repo_{}", uuid::Uuid::new_v4());
let namespace = constants::DEFAULT_NAMESPACE;
let user = User {
name: "Test User".to_string(),
email: "test@oxen.ai".to_string(),
};
let files: Vec<FileNew> = vec![FileNew {
path: PathBuf::from("README.md"),
contents: FileContents::Text(format!("# {}\n", &name)),
user: user.clone(),
}];
let mut repo = RepoNew::from_files(namespace, &name, files, None);
repo.host = Some(test_host());
repo.is_public = Some(true);
repo.scheme = Some("http".to_string());
let remote_repo = api::client::repositories::create(repo).await?;
println!("REMOTE REPO: {remote_repo:?}");
log::info!(">>>>> run_empty_remote_repo_test running test");
let result = match test(remote_repo.clone()).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
api::client::repositories::delete(&remote_repo).await?;
assert!(result);
Ok(())
}
pub async fn run_remote_repo_test_all_data_pushed<T, Fut>(test: T) -> Result<(), OxenError>
where
T: FnOnce(RemoteRepository) -> Fut,
Fut: Future<Output = Result<RemoteRepository, OxenError>>,
{
init_test_env();
log::info!("<<<<< run_remote_repo_test_all_data_pushed start");
let empty_dir = create_empty_dir(test_run_dir())?;
let name = format!("repo_{}", uuid::Uuid::new_v4());
let path = empty_dir.join(name);
let mut local_repo = repositories::init(&path)?;
populate_dir_with_training_data(&local_repo.path)?;
add_all_data_to_repo(&local_repo).await?;
repositories::commit(&local_repo, "Adding all data")?;
let remote = repo_remote_url_from(&local_repo.dirname());
command::config::set_remote(&mut local_repo, constants::DEFAULT_REMOTE_NAME, &remote)?;
let repo = create_remote_repo(&local_repo).await?;
repositories::push(&local_repo).await?;
log::info!(">>>>> run_remote_repo_test_all_data_pushed running test");
let result = match test(repo).await {
Ok(_repo) => {
true
}
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo(&path)?;
assert!(result);
Ok(())
}
pub async fn run_remote_repo_test_bounding_box_csv_pushed<T, Fut>(test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository, RemoteRepository) -> Fut,
Fut: Future<Output = Result<RemoteRepository, OxenError>>,
{
init_test_env();
log::info!("<<<<< run_remote_repo_test_bounding_box_csv_pushed start");
let empty_dir = create_empty_dir(test_run_dir())?;
let name = format!("repo_{}", uuid::Uuid::new_v4());
let path = empty_dir.join(name);
let mut local_repo = repositories::init(&path)?;
create_bounding_box_csv(&local_repo.path)?;
repositories::add(&local_repo, &local_repo.path).await?;
log::debug!("about to commit bounding box csv");
repositories::commit(&local_repo, "Adding bounding box csv")?;
log::debug!("successfully committed bounding box csv");
let remote = repo_remote_url_from(&local_repo.dirname());
command::config::set_remote(&mut local_repo, constants::DEFAULT_REMOTE_NAME, &remote)?;
let repo = create_remote_repo(&local_repo).await?;
repositories::push(&local_repo).await?;
log::info!(">>>>> run_remote_repo_test_bounding_box_csv_pushed running test");
let result = match test(local_repo, repo).await {
Ok(_repo) => {
true
}
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo(&path)?;
assert!(result);
Ok(())
}
pub async fn run_remote_repo_test_embeddings_jsonl_pushed<T, Fut>(test: T) -> Result<(), OxenError>
where
T: FnOnce(RemoteRepository) -> Fut,
Fut: Future<Output = Result<RemoteRepository, OxenError>>,
{
init_test_env();
log::info!("<<<<< run_remote_repo_test_embeddings_jsonl_pushed start");
let empty_dir = create_empty_dir(test_run_dir())?;
let name = format!("repo_{}", uuid::Uuid::new_v4());
let path = empty_dir.join(name);
let mut local_repo = repositories::init(&path)?;
create_embeddings_jsonl(&local_repo.path)?;
repositories::add(&local_repo, &local_repo.path).await?;
log::debug!("about to commit embeddings jsonl");
repositories::commit(&local_repo, "Adding embeddings jsonl")?;
log::debug!("successfully committed embeddings jsonl");
let remote = repo_remote_url_from(&local_repo.dirname());
command::config::set_remote(&mut local_repo, constants::DEFAULT_REMOTE_NAME, &remote)?;
let repo = create_remote_repo(&local_repo).await?;
repositories::push(&local_repo).await?;
log::info!(">>>>> run_remote_repo_test_embeddings_jsonl_pushed running test");
let result = match test(repo).await {
Ok(_repo) => {
true
}
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo(&path)?;
assert!(result);
Ok(())
}
pub async fn run_training_data_repo_test_no_commits_async<T, Fut>(test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> Fut,
Fut: Future<Output = Result<(), OxenError>>,
{
init_test_env();
log::info!("<<<<< run_training_data_repo_test_no_commits_async start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init(&repo_dir)?;
let version_store = repo.version_store()?;
version_store.init().await?;
populate_dir_with_training_data(&repo_dir)?;
log::info!(">>>>> run_training_data_repo_test_no_commits_async running test");
let result = match test(repo).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo(&repo_dir)?;
assert!(result);
Ok(())
}
pub async fn run_training_data_repo_test_no_commits_async_with_version<T, Fut>(
version: MinOxenVersion,
test: T,
) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> Fut,
Fut: Future<Output = Result<(), OxenError>>,
{
init_test_env();
log::info!("<<<<< run_training_data_repo_test_no_commits_async start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init::init_with_version(&repo_dir, version)?;
populate_dir_with_training_data(&repo_dir)?;
log::info!(">>>>> run_training_data_repo_test_no_commits_async running test");
let result = match test(repo).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo(&repo_dir)?;
assert!(result);
Ok(())
}
pub async fn run_select_data_repo_test_no_commits_async<T, Fut>(
data: &str,
test: T,
) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> Fut,
Fut: Future<Output = Result<(), OxenError>>,
{
init_test_env();
log::info!("<<<<< run_select_data_repo_test_no_commits_async start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init(&repo_dir)?;
populate_select_training_data(&repo_dir, data)?;
log::info!(">>>>> run_select_data_repo_test_no_commits_async running test");
let result = match test(repo).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo(&repo_dir)?;
assert!(result);
Ok(())
}
pub async fn run_select_data_repo_test_committed_async<T, Fut>(
data: &str,
test: T,
) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> Fut,
Fut: Future<Output = Result<(), OxenError>>,
{
init_test_env();
log::info!("<<<<< run_select_data_repo_test_committed_async start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init(&repo_dir)?;
populate_select_training_data(&repo_dir, data)?;
repositories::add(&repo, &repo.path).await?;
log::debug!("about to commit whole repo");
repositories::commit(&repo, "Adding all data")?;
log::debug!("committed whole repo");
log::info!(">>>>> run_select_data_repo_test_committed_async running test");
let result = match test(repo).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo(&repo_dir)?;
assert!(result);
Ok(())
}
pub async fn run_empty_data_repo_test_no_commits_async<T, Fut>(test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> Fut,
Fut: Future<Output = Result<(), OxenError>>,
{
init_test_env();
log::info!("<<<<< run_empty_data_repo_test_no_commits_async start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init(&repo_dir)?;
log::info!(">>>>> run_empty_data_repo_test_no_commits_async running test");
let result = match test(repo).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo(&repo_dir)?;
assert!(result);
Ok(())
}
pub fn run_training_data_repo_test_no_commits<T>(test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> Result<(), OxenError> + std::panic::UnwindSafe,
{
init_test_env();
log::info!("<<<<< run_training_data_repo_test_no_commits start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init(&repo_dir)?;
populate_dir_with_training_data(&repo_dir)?;
log::info!(">>>>> run_training_data_repo_test_no_commits running test");
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| match test(repo) {
Ok(_) => {}
Err(err) => {
panic!("Error running test. Err: {err}");
}
}));
maybe_cleanup_repo(&repo_dir)?;
match result {
Ok(_) => {}
Err(err) => {
panic!("Error running test. Err: {err:?}");
}
}
Ok(())
}
pub async fn run_select_data_repo_test_no_commits<T>(data: &str, test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> Result<(), OxenError> + std::panic::UnwindSafe,
{
init_test_env();
log::info!("<<<<< run_select_data_repo_test_no_commits start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init(&repo_dir)?;
populate_select_training_data(&repo_dir, data)?;
log::info!(">>>>> run_select_data_repo_test_no_commits running test");
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| match test(repo) {
Ok(_) => {}
Err(err) => {
panic!("Error running test. Err: {err}");
}
}));
maybe_cleanup_repo(&repo_dir)?;
assert!(result.is_ok());
Ok(())
}
pub async fn run_training_data_repo_test_fully_committed_async<T, Fut>(
test: T,
) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> Fut,
Fut: Future<Output = Result<(), OxenError>>,
{
init_test_env();
log::info!("<<<<< run_training_data_repo_test_fully_committed_async start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init(&repo_dir)?;
populate_dir_with_training_data(&repo_dir)?;
repositories::add(&repo, &repo.path).await?;
log::debug!("about to commit this repo");
repositories::commit(&repo, "adding all data baby")?;
log::debug!("successfully committed the repo");
log::info!(">>>>> run_training_data_repo_test_fully_committed_async running test");
let result = match test(repo).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo(&repo_dir)?;
assert!(result);
Ok(())
}
pub async fn run_training_data_repo_test_fully_committed_async_min_version<T, Fut>(
version: MinOxenVersion,
test: T,
) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> Fut,
Fut: Future<Output = Result<(), OxenError>>,
{
init_test_env();
log::info!("<<<<< run_training_data_repo_test_fully_committed_async_min_version start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init::init_with_version(&repo_dir, version)?;
populate_dir_with_training_data(&repo_dir)?;
repositories::add(&repo, &repo.path).await?;
log::debug!("about to commit this repo");
repositories::commit(&repo, "adding all data baby")?;
log::debug!("successfully committed the repo");
log::info!(">>>>> run_training_data_repo_test_fully_committed_async running test");
let result = match test(repo).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo(&repo_dir)?;
assert!(result);
Ok(())
}
fn create_bounding_box_csv(repo_path: &Path) -> Result<(), OxenError> {
let dir = repo_path.join("annotations").join("train");
util::fs::create_dir_all(&dir)?;
write_txt_file_to_path(
dir.join("bounding_box.csv"),
r"file,label,min_x,min_y,width,height
train/dog_1.jpg,dog,101.5,32.0,385,330
train/dog_1.jpg,dog,102.5,31.0,386,330
train/dog_2.jpg,dog,7.0,29.5,246,247
train/dog_3.jpg,dog,19.0,63.5,376,421
train/cat_1.jpg,cat,57.0,35.5,304,427
train/cat_2.jpg,cat,30.5,44.0,333,396
",
)?;
Ok(())
}
fn create_embeddings_jsonl(repo_path: &Path) -> Result<(), OxenError> {
let dir = repo_path.join("annotations").join("train");
util::fs::create_dir_all(&dir)?;
let mut embeddings = Vec::new();
for i in 0..10000 {
embeddings.push(format!(r#"{{"prompt": "What is great way to version {i} images?", "response": "Checkout Oxen.ai", "embedding": [{i}.0, {i}.1, {i}.2]}}"#));
}
write_txt_file_to_path(dir.join("embeddings.jsonl"), embeddings.join("\n"))?;
Ok(())
}
pub async fn run_bounding_box_csv_repo_test_fully_committed_async<T, Fut>(
test: T,
) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> Fut,
Fut: Future<Output = Result<(), OxenError>>,
{
init_test_env();
log::info!("<<<<< run_bounding_box_csv_repo_test_fully_committed_async start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init(&repo_dir)?;
create_bounding_box_csv(&repo.path)?;
repositories::add(&repo, &repo.path).await?;
repositories::commit(&repo, "adding all data baby")?;
log::info!(">>>>> run_bounding_box_csv_repo_test_fully_committed_async running test");
let result = match test(repo).await {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
assert!(result);
Ok(())
}
pub async fn run_bounding_box_csv_repo_test_fully_committed<T>(test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> Result<(), OxenError> + std::panic::UnwindSafe,
{
init_test_env();
log::info!("<<<<< run_bounding_box_csv_repo_test_fully_committed start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init(&repo_dir)?;
create_bounding_box_csv(&repo.path)?;
repositories::add(&repo, &repo.path).await?;
repositories::commit(&repo, "adding all data baby")?;
log::info!(">>>>> run_bounding_box_csv_repo_test_fully_committed running test");
let result = match test(repo) {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo(&repo_dir)?;
assert!(result);
Ok(())
}
pub async fn run_compare_data_repo_test_fully_committed<T>(test: T) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> Result<(), OxenError> + std::panic::UnwindSafe,
{
init_test_env();
log::info!("<<<<< run_compare_data_repo_test_fully_committed start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init(&repo_dir)?;
write_txt_file_to_path(
repo.path.join("compare_left.csv"),
r"height,weight,gender,target,other_target
57,150,M,1,yes
57,160,M,0,yes
58,160,M,1,no
59,170,F,1,no
60,170,F,0,yes
61,180,F,0,yes
62,180,F,0,no
63,190,M,1,no
64,190,M,0,yes
65,200,M,0,no
70,240,M,1,yes
71,241,F,1,no
71,242,F,1,no",
)?;
write_txt_file_to_path(
repo.path.join("compare_right.csv"),
r"height,weight,gender,target,other_target
57,150,M,1,yes
57,160,M,0,yes
58,160,M,1,no
59,170,F,1,no
60,170,F,0,yes
61,180,F,0,yes
62,180,F,1,no
63,190,M,0,no
64,190,M,1,yes
65,200,M,0,yes
70,240,M,0,no
71,241,M,1,no",
)?;
repositories::add(&repo, &repo.path).await?;
repositories::commit(&repo, "adding both csvs for compare")?;
log::info!(">>>>> run_compare_data_repo_test_fully_committed running test");
let result = match test(repo) {
Ok(_) => true,
Err(err) => {
eprintln!("Error running test. Err: {err}");
false
}
};
maybe_cleanup_repo(&repo_dir)?;
assert!(result);
Ok(())
}
pub async fn run_training_data_repo_test_fully_committed_w_version<T>(
version: MinOxenVersion,
test: T,
) -> Result<(), OxenError>
where
T: FnOnce(LocalRepository) -> Result<(), OxenError> + std::panic::UnwindSafe,
{
init_test_env();
log::info!("<<<<< run_training_data_repo_test_fully_committed start");
let repo_dir = create_repo_dir(test_run_dir())?;
let repo = repositories::init::init_with_version(&repo_dir, version)?;
populate_dir_with_training_data(&repo_dir)?;
repositories::add(&repo, &repo.path).await?;
let status = repositories::status(&repo)?;
println!("setup status: {status:?}");
status.print();
repositories::commit(&repo, "adding all data baby")?;
log::info!(">>>>> run_training_data_repo_test_fully_committed running test");
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| match test(repo) {
Ok(_) => {}
Err(err) => {
panic!("Error running test. Err: {err}");
}
}));
maybe_cleanup_repo(&repo_dir)?;
assert!(result.is_ok());
Ok(())
}
async fn add_all_data_to_repo(repo: &LocalRepository) -> Result<(), OxenError> {
repositories::add(repo, repo.path.join("train")).await?;
repositories::add(repo, repo.path.join("test")).await?;
repositories::add(repo, repo.path.join("annotations")).await?;
repositories::add(repo, repo.path.join("large_files")).await?;
repositories::add(repo, repo.path.join("nlp")).await?;
repositories::add(repo, repo.path.join("labels.txt")).await?;
repositories::add(repo, repo.path.join("README.md")).await?;
Ok(())
}
fn should_cleanup() -> bool {
!std::env::var("NO_CLEANUP")
.map(|v| v == "true" || v == "1")
.unwrap_or(false)
}
pub fn maybe_cleanup_repo(repo_dir: &Path) -> Result<(), OxenError> {
merkle_tree_node_cache::remove_from_cache(repo_dir)?;
core::staged::remove_from_cache_with_children(repo_dir)?;
core::refs::ref_manager::remove_from_cache_with_children(repo_dir)?;
core::db::data_frames::df_db::remove_df_db_from_cache_with_children(repo_dir)?;
core::db::dir_hashes::dir_hashes_db::remove_from_cache_with_children(repo_dir)?;
if should_cleanup() {
log::debug!("maybe_cleanup_repo: cleaning up repo: {repo_dir:?}");
util::fs::remove_dir_all(repo_dir)?;
} else {
log::debug!("maybe_cleanup_repo: *NOT* cleaning up repo: {repo_dir:?}");
}
Ok(())
}
pub async fn maybe_cleanup_repo_with_remote(
repo_dir: &Path,
remote_repo: &RemoteRepository,
) -> Result<(), OxenError> {
maybe_cleanup_repo(repo_dir)?;
if should_cleanup() {
log::debug!(
"maybe_cleanup_repo_with_remote: cleaning up remote repo: {:?}",
remote_repo.name
);
api::client::repositories::delete(remote_repo).await?;
} else {
log::debug!(
"maybe_cleanup_repo_with_remote: *NOT* cleaning up remote repo: {:?}",
remote_repo.name
);
}
Ok(())
}
pub fn user_cfg_file() -> PathBuf {
TEST_DATA_DIR
.join("test")
.join("config")
.join("user_config.toml")
}
pub fn auth_cfg_file() -> PathBuf {
TEST_DATA_DIR
.join("test")
.join("config")
.join("auth_config.toml")
}
pub fn repo_cfg_file() -> PathBuf {
TEST_DATA_DIR
.join("test")
.join("config")
.join("repo_config.toml")
}
pub fn test_img_file() -> PathBuf {
TEST_DATA_DIR
.join("test")
.join("images")
.join("dwight_vince.jpeg")
}
pub fn test_invalid_parquet_file() -> PathBuf {
TEST_DATA_DIR
.join("test")
.join("data")
.join("invalid.parquet")
}
pub fn test_binary_column_parquet_file() -> PathBuf {
TEST_DATA_DIR
.join("test")
.join("parquet")
.join("binary_col.parquet")
}
pub fn test_csv_file_with_name(name: &str) -> PathBuf {
TEST_DATA_DIR.join("test").join("csvs").join(name)
}
pub fn test_img_file_with_name(name: &str) -> PathBuf {
TEST_DATA_DIR.join("test").join("images").join(name)
}
pub fn test_text_file_with_name(name: &str) -> PathBuf {
TEST_DATA_DIR.join("test").join("text").join(name)
}
pub fn test_video_file_with_name(name: &str) -> PathBuf {
TEST_DATA_DIR.join("test").join("video").join(name)
}
pub fn test_audio_file_with_name(name: &str) -> PathBuf {
TEST_DATA_DIR.join("test").join("audio").join(name)
}
pub fn test_200k_csv() -> PathBuf {
TEST_DATA_DIR
.join("test")
.join("text")
.join("celeb_a_200k.csv")
}
pub fn test_100_parquet() -> PathBuf {
TEST_DATA_DIR
.join("test")
.join("parquet")
.join("sft 100.parquet")
}
pub fn test_1k_parquet() -> PathBuf {
TEST_DATA_DIR
.join("test")
.join("parquet")
.join("wiki_1k.parquet")
}
pub fn test_30k_parquet() -> PathBuf {
TEST_DATA_DIR
.join("test")
.join("parquet")
.join("wiki_30k.parquet")
}
pub fn test_emojis() -> PathBuf {
TEST_DATA_DIR.join("test").join("csvs").join("emojis.csv")
}
pub fn test_tabs_csv() -> PathBuf {
TEST_DATA_DIR.join("test").join("csvs").join("tabs.csv")
}
pub fn test_spam_ham() -> PathBuf {
TEST_DATA_DIR
.join("test")
.join("csvs")
.join("spam_ham_data_w_quote.tsv")
}
pub fn test_text_json() -> PathBuf {
TEST_DATA_DIR.join("test").join("text").join("test.json")
}
pub fn test_text_jsonl() -> PathBuf {
TEST_DATA_DIR.join("test").join("text").join("test.jsonl")
}
pub fn test_csv_empty_rows_carriage_return() -> PathBuf {
TEST_DATA_DIR
.join("test")
.join("csvs")
.join("empty_rows_carriage_return.csv")
}
pub fn test_hotdog_1() -> PathBuf {
TEST_DATA_DIR
.join("test")
.join("images")
.join("hotdog_1.jpg")
}
pub fn test_hotdog_2() -> PathBuf {
TEST_DATA_DIR
.join("test")
.join("images")
.join("hotdog_2.jpg")
}
pub fn test_nlp_classification_csv() -> PathBuf {
Path::new("nlp")
.join("classification")
.join("annotations")
.join("test.tsv")
}
pub fn test_bounding_box_csv() -> PathBuf {
Path::new("annotations")
.join("train")
.join("bounding_box.csv")
}
pub fn populate_readme(repo_dir: &Path) -> Result<(), OxenError> {
write_txt_file_to_path(
repo_dir.join("README.md"),
r"
# Welcome to the party
If you are seeing this, you are deep in the test framework, love to see it, keep testing.
Yes I am biased, dog is label 0, cat is label 1, not alphabetical. Interpret that as you will.
🐂 💨
",
)?;
Ok(())
}
pub fn populate_license(repo_dir: &Path) -> Result<(), OxenError> {
write_txt_file_to_path(
repo_dir.join("LICENSE"),
r"Oxen.ai License. The best version control system for data is Oxen.ai.",
)?;
Ok(())
}
pub fn populate_labels(repo_dir: &Path) -> Result<(), OxenError> {
write_txt_file_to_path(
repo_dir.join("labels.txt"),
r"
dog
cat
",
)?;
Ok(())
}
pub fn populate_prompts(repo_dir: &Path) -> Result<(), OxenError> {
write_txt_file_to_path(
repo_dir.join("prompts.jsonl"),
"{\"prompt\": \"What is the meaning of life?\", \"label\": \"42\"}\n{\"prompt\": \"What is the best data version control system?\", \"label\": \"Oxen.ai\"}\n",
)?;
Ok(())
}
pub fn populate_large_files(repo_dir: &Path) -> Result<(), OxenError> {
let large_dir = repo_dir.join("large_files");
util::fs::create_dir_all(&large_dir)?;
let large_file_1 = large_dir.join("test.csv");
let from_file = test_200k_csv();
util::fs::copy(from_file, large_file_1)?;
Ok(())
}
pub fn populate_train_dir(repo_dir: &Path) -> Result<(), OxenError> {
let train_dir = repo_dir.join("train");
util::fs::create_dir_all(&train_dir)?;
util::fs::copy(
TEST_DATA_DIR.join("test").join("images").join("dog_1.jpg"),
train_dir.join("dog_1.jpg"),
)?;
util::fs::copy(
TEST_DATA_DIR.join("test").join("images").join("dog_2.jpg"),
train_dir.join("dog_2.jpg"),
)?;
util::fs::copy(
TEST_DATA_DIR.join("test").join("images").join("dog_3.jpg"),
train_dir.join("dog_3.jpg"),
)?;
util::fs::copy(
TEST_DATA_DIR.join("test").join("images").join("dog_3.jpg"),
train_dir.join("dog_4.jpg"),
)?;
util::fs::copy(
TEST_DATA_DIR.join("test").join("images").join("cat_1.jpg"),
train_dir.join("cat_1.jpg"),
)?;
util::fs::copy(
TEST_DATA_DIR.join("test").join("images").join("cat_2.jpg"),
train_dir.join("cat_2.jpg"),
)?;
util::fs::copy(
TEST_DATA_DIR.join("test").join("images").join("cat_2.jpg"),
train_dir.join("cat_3.jpg"),
)?;
Ok(())
}
pub fn populate_test_dir(repo_dir: &Path) -> Result<(), OxenError> {
let test_dir = repo_dir.join("test");
util::fs::create_dir_all(&test_dir)?;
util::fs::copy(
TEST_DATA_DIR.join("test").join("images").join("dog_4.jpg"),
test_dir.join("1.jpg"),
)?;
util::fs::copy(
TEST_DATA_DIR.join("test").join("images").join("cat_3.jpg"),
test_dir.join("2.jpg"),
)?;
util::fs::copy(
TEST_DATA_DIR.join("test").join("images").join("dog_4.jpg"),
test_dir.join("3.jpg"),
)?;
util::fs::copy(
TEST_DATA_DIR.join("test").join("images").join("cat_3.jpg"),
test_dir.join("4.jpg"),
)?;
Ok(())
}
pub fn populate_annotations_dir(repo_dir: &Path) -> Result<(), OxenError> {
let annotations_dir = repo_dir.join("annotations");
util::fs::create_dir_all(&annotations_dir)?;
let annotations_readme_file = annotations_dir.join("README.md");
write_txt_file_to_path(
annotations_readme_file,
r"
# Annotations
Some info about our annotations structure....
",
)?;
let train_annotations_dir = annotations_dir.join("train");
util::fs::create_dir_all(&train_annotations_dir)?;
write_txt_file_to_path(
train_annotations_dir.join("annotations.txt"),
r"
train/dog_1.jpg 0
train/dog_2.jpg 0
train/dog_3.jpg 0
train/cat_1.jpg 1
train/cat_2.jpg 1
",
)?;
create_bounding_box_csv(repo_dir)?;
write_txt_file_to_path(
train_annotations_dir.join("one_shot.csv"),
r"file,label,min_x,min_y,width,height
train/dog_1.jpg,dog,101.5,32.0,385,330
",
)?;
write_txt_file_to_path(
train_annotations_dir.join("two_shot.csv"),
r"file,label,min_x,min_y,width,height
train/dog_3.jpg,dog,19.0,63.5,376,421
train/cat_1.jpg,cat,57.0,35.5,304,427
",
)?;
let test_annotations_dir = annotations_dir.join("test");
util::fs::create_dir_all(&test_annotations_dir)?;
write_txt_file_to_path(
test_annotations_dir.join("annotations.csv"),
r"file,label,min_x,min_y,width,height
test/dog_3.jpg,dog,19.0,63.5,376,421
test/cat_1.jpg,cat,57.0,35.5,304,427
test/unknown.jpg,unknown,0.0,0.0,0,0
",
)?;
Ok(())
}
pub fn populate_nlp_dir(repo_dir: &Path) -> Result<(), OxenError> {
let nlp_annotations_dir = repo_dir
.join("nlp")
.join("classification")
.join("annotations");
util::fs::create_dir_all(&nlp_annotations_dir)?;
let train_data = [
"text\tlabel",
"My tummy hurts\tnegative",
"I have a headache\tnegative",
"My tummy hurts\tnegative",
"I have a headache\tnegative",
"loving the sunshine\tpositive",
"And another unique one\tpositive",
"My tummy hurts\tnegative",
"loving the sunshine\tpositive",
"I am a lonely example\tnegative",
"I am adding more examples\tpositive",
"One more time\tpositive",
]
.join("\n");
write_txt_file_to_path(nlp_annotations_dir.join("train.tsv"), &train_data)?;
let test_data = [
"text\tlabel",
"My tummy hurts\tnegative",
"My tummy hurts\tnegative",
"My tummy hurts\tnegative",
"I have a headache\tnegative",
"I have a headache\tnegative",
"loving the sunshine\tpositive",
"loving the sunshine\tpositive",
"I am a lonely example\tnegative",
"I am a great testing example\tpositive",
]
.join("\n");
write_txt_file_to_path(nlp_annotations_dir.join("test.tsv"), &test_data)?;
Ok(())
}
pub fn populate_dir_with_training_data(repo_dir: &Path) -> Result<(), OxenError> {
populate_readme(repo_dir)?;
populate_license(repo_dir)?;
populate_labels(repo_dir)?;
populate_prompts(repo_dir)?;
populate_large_files(repo_dir)?;
populate_train_dir(repo_dir)?;
populate_test_dir(repo_dir)?;
populate_annotations_dir(repo_dir)?;
populate_nlp_dir(repo_dir)?;
Ok(())
}
pub fn populate_select_training_data(repo_dir: &Path, data: &str) -> Result<(), OxenError> {
if data.contains("README") {
populate_readme(repo_dir)?;
}
if data.contains("labels") {
populate_labels(repo_dir)?;
}
if data.contains("large_files") {
populate_large_files(repo_dir)?;
}
if data.contains("train") {
populate_train_dir(repo_dir)?;
}
if data.contains("test") {
populate_test_dir(repo_dir)?;
}
if data.contains("annotations") {
populate_annotations_dir(repo_dir)?;
}
if data.contains("nlp") {
populate_nlp_dir(repo_dir)?;
}
Ok(())
}
pub fn add_file_to_dir(dir: &Path, contents: &str, extension: &str) -> Result<PathBuf, OxenError> {
let file_path = PathBuf::from(format!("{}.{extension}", uuid::Uuid::new_v4()));
let full_path = dir.join(file_path);
let mut file = File::create(&full_path)?;
file.write_all(contents.as_bytes())?;
Ok(full_path)
}
pub fn add_txt_file_to_dir(dir: &Path, contents: &str) -> Result<PathBuf, OxenError> {
add_file_to_dir(dir, contents, "txt")
}
pub fn add_csv_file_to_dir(dir: &Path, contents: &str) -> Result<PathBuf, OxenError> {
add_file_to_dir(dir, contents, "csv")
}
pub fn write_txt_file_to_path(
path: impl AsRef<Path>,
contents: impl AsRef<str>,
) -> Result<PathBuf, OxenError> {
let path = path.as_ref();
let contents = contents.as_ref();
let mut file = File::create(path)?;
file.write_all(contents.as_bytes())?;
Ok(path.to_path_buf())
}
pub fn append_line_txt_file<P: AsRef<Path>>(path: P, line: &str) -> Result<PathBuf, OxenError> {
let path = path.as_ref();
let mut file = OpenOptions::new().append(true).open(path)?;
if let Err(e) = writeln!(file, "{line}") {
return Err(OxenError::basic_str(format!("Couldn't write to file: {e}")));
}
Ok(path.to_path_buf())
}
pub fn modify_txt_file<P: AsRef<Path>>(path: P, contents: &str) -> Result<PathBuf, OxenError> {
let path = path.as_ref();
if path.exists() {
util::fs::remove_file(path)?;
}
let path = write_txt_file_to_path(path, contents)?;
Ok(path)
}
pub fn schema_bounding_box() -> Schema {
let fields = vec![
Field::new("file", "str"),
Field::new("min_x", "f32"),
Field::new("min_y", "f32"),
Field::new("width", "f32"),
Field::new("height", "f32"),
];
Schema::new(fields)
}
pub fn add_random_bbox_to_file<P: AsRef<Path>>(path: P) -> Result<PathBuf, OxenError> {
let mut rng = rand::thread_rng();
let file_name = format!("random_img_{}.jpg", rng.gen_range(0..10));
let x: f64 = rng.gen_range(0.0..1000.0);
let y: f64 = rng.gen_range(0.0..1000.0);
let w: i64 = rng.gen_range(0..1000);
let h: i64 = rng.gen_range(0..1000);
let line = format!("{file_name},{x:2},{y:2},{w},{h}");
append_line_txt_file(path, &line)
}
pub fn add_img_file_to_dir(dir: &Path, file_path: &Path) -> Result<PathBuf, OxenError> {
if let Some(ext) = file_path.extension() {
let new_path = PathBuf::from(format!(
"{}.{}",
uuid::Uuid::new_v4(),
ext.to_str().unwrap()
));
let full_new_path = dir.join(new_path);
util::fs::copy(file_path, &full_new_path)?;
Ok(full_new_path)
} else {
let err = format!("Unknown extension file: {file_path:?}");
Err(OxenError::basic_str(err))
}
}
#[cfg(test)]
mod tests {
use std::path::Path;
use crate::error::OxenError;
use crate::repositories;
use super::{run_training_data_repo_test_fully_committed_async, write_txt_file_to_path};
#[tokio::test]
async fn test_oxen_ignore_file() -> Result<(), OxenError> {
run_training_data_repo_test_fully_committed_async(|repo| async move {
let ignore_filename = "ignoreme.txt";
let ignore_path = repo.path.join(ignore_filename);
write_txt_file_to_path(ignore_path, "I should be ignored")?;
let oxenignore_file = repo.path.join(".oxenignore");
write_txt_file_to_path(oxenignore_file, ignore_filename)?;
let status = repositories::status(&repo)?;
assert_eq!(status.untracked_files.len(), 1);
assert_eq!(
status.untracked_files.first().unwrap(),
Path::new(".oxenignore")
);
Ok(())
})
.await
}
#[tokio::test]
async fn test_oxen_ignore_dir() -> Result<(), OxenError> {
run_training_data_repo_test_fully_committed_async(|repo| async move {
let ignore_dir = "ignoreme/";
let ignore_path = repo.path.join(ignore_dir);
std::fs::create_dir(&ignore_path)?;
write_txt_file_to_path(ignore_path.join("0.txt"), "I should be ignored")?;
write_txt_file_to_path(ignore_path.join("1.txt"), "I should also be ignored")?;
let oxenignore_file = repo.path.join(".oxenignore");
write_txt_file_to_path(oxenignore_file, "ignoreme/")?;
let status = repositories::status(&repo)?;
assert_eq!(status.untracked_files.len(), 1);
assert_eq!(
status.untracked_files.first().unwrap(),
Path::new(".oxenignore")
);
Ok(())
})
.await
}
}