use crate::core;
use crate::core::versions::MinOxenVersion;
use crate::error::OxenError;
use crate::model::LocalRepository;
use crate::opts::fetch_opts::FetchOpts;
#[tracing::instrument(skip(repo), fields(repo_path = %repo.path.display()))]
pub async fn pull(repo: &LocalRepository) -> Result<(), OxenError> {
#[cfg(feature = "metrics")]
metrics::counter!("oxen_pull_total").increment(1);
#[cfg(feature = "metrics")]
let timer = std::time::Instant::now();
let result = match repo.min_version() {
MinOxenVersion::V0_10_0 => panic!("v0.10.0 no longer supported"),
_ => core::v_latest::pull::pull(repo).await,
};
#[cfg(feature = "metrics")]
{
let end = timer.elapsed();
metrics::histogram!("oxen_pull_duration_ms").record(end.as_millis() as f64);
}
result
}
#[tracing::instrument(skip(repo), fields(repo_path = %repo.path.display()))]
pub async fn pull_all(repo: &LocalRepository) -> Result<(), OxenError> {
#[cfg(feature = "metrics")]
metrics::counter!("oxen_pull_total").increment(1);
match repo.min_version() {
MinOxenVersion::V0_10_0 => panic!("v0.10.0 no longer supported"),
_ => core::v_latest::pull::pull_all(repo).await,
}
}
#[tracing::instrument(skip(repo, fetch_opts), fields(repo_path = %repo.path.display()))]
pub async fn pull_remote_branch(
repo: &LocalRepository,
fetch_opts: &FetchOpts,
) -> Result<(), OxenError> {
match repo.min_version() {
MinOxenVersion::V0_10_0 => panic!("v0.10.0 no longer supported"),
_ => core::v_latest::pull::pull_remote_branch(repo, fetch_opts).await,
}
}
#[cfg(test)]
mod tests {
use crate::api;
use crate::command;
use crate::constants;
use crate::constants::OXEN_HIDDEN_DIR;
use crate::core::df::tabular;
use crate::error::OxenError;
use crate::opts::CloneOpts;
use crate::opts::DFOpts;
use crate::opts::FetchOpts;
use crate::opts::PushOpts;
use crate::opts::RmOpts;
use crate::repositories;
use crate::repositories::LocalRepository;
use crate::test;
use crate::util;
use std::path::Path;
use std::path::PathBuf;
#[tokio::test]
async fn test_command_push_clone_pull_push() -> Result<(), OxenError> {
test::run_training_data_repo_test_no_commits_async(|mut repo| async move {
let train_dirname = "train";
let train_dir = repo.path.join(train_dirname);
let og_num_files = util::fs::rcount_files_in_dir(&train_dir);
repositories::add(&repo, &train_dir).await?;
repositories::commit(&repo, "Adding training data")?;
let remote = test::repo_remote_url_from(&repo.dirname());
command::config::set_remote(&mut repo, constants::DEFAULT_REMOTE_NAME, &remote)?;
let remote_repo = test::create_remote_repo(&repo).await?;
repositories::push(&repo).await?;
let party_ppl_filename = "party_ppl.txt";
let party_ppl_contents = String::from("Wassup Party Ppl");
let party_ppl_file_path = repo.path.join(party_ppl_filename);
util::fs::write_to_path(&party_ppl_file_path, &party_ppl_contents)?;
repositories::add(&repo, &party_ppl_file_path).await?;
let latest_commit = repositories::commit(&repo, "Adding party_ppl.txt")?;
repositories::push(&repo).await?;
test::run_empty_dir_test_async(|new_repo_dir| async move {
let new_repo_dir = new_repo_dir.join("new_repo");
let cloned_repo =
repositories::clone_url(&remote_repo.remote.url, &new_repo_dir).await?;
let oxen_dir = cloned_repo.path.join(OXEN_HIDDEN_DIR);
assert!(oxen_dir.exists());
repositories::pull(&cloned_repo).await?;
let cloned_train_dir = cloned_repo.path.join(train_dirname);
let cloned_num_files = util::fs::rcount_files_in_dir(&cloned_train_dir);
assert_eq!(og_num_files, cloned_num_files);
let cloned_party_ppl_path = cloned_repo.path.join(party_ppl_filename);
assert!(cloned_party_ppl_path.exists());
let cloned_contents = util::fs::read_from_path(&cloned_party_ppl_path)?;
assert_eq!(cloned_contents, party_ppl_contents);
let head = repositories::commits::head_commit(&cloned_repo)?;
assert_eq!(head.id, latest_commit.id);
let repo_commits = repositories::commits::list(&repo)?;
let cloned_commits = repositories::commits::list(&cloned_repo)?;
assert_eq!(repo_commits.len(), cloned_commits.len());
let status = repositories::status(&cloned_repo).await?;
assert!(status.is_clean());
let send_it_back_filename = "send_it_back.txt";
let send_it_back_contents = String::from("Hello from the other side");
let send_it_back_file_path = cloned_repo.path.join(send_it_back_filename);
util::fs::write_to_path(&send_it_back_file_path, &send_it_back_contents)?;
repositories::add(&cloned_repo, &send_it_back_file_path).await?;
repositories::commit(&cloned_repo, "Adding send_it_back.txt")?;
repositories::push(&cloned_repo).await?;
repositories::pull(&repo).await?;
let old_repo_status = repositories::status(&repo).await?;
old_repo_status.print();
assert!(!old_repo_status.has_modified_entries());
let pulled_send_it_back_path = repo.path.join(send_it_back_filename);
assert!(pulled_send_it_back_path.exists());
let pulled_contents = util::fs::read_from_path(&pulled_send_it_back_path)?;
assert_eq!(pulled_contents, send_it_back_contents);
let party_ppl_contents = String::from("Late to the party");
util::fs::write_to_path(&party_ppl_file_path, &party_ppl_contents)?;
repositories::add(&repo, &party_ppl_file_path).await?;
repositories::commit(&repo, "Modified party ppl contents")?;
repositories::push(&repo).await?;
repositories::pull(&cloned_repo).await?;
let pulled_contents = util::fs::read_from_path(&cloned_party_ppl_path)?;
assert_eq!(pulled_contents, party_ppl_contents);
println!("----BEFORE-----");
util::fs::remove_file(&send_it_back_file_path)?;
repositories::add(&cloned_repo, &send_it_back_file_path).await?;
repositories::commit(&cloned_repo, "Removing the send it back file")?;
repositories::push(&cloned_repo).await?;
println!("----AFTER-----");
repositories::pull(&repo).await?;
let pulled_send_it_back_path = repo.path.join(send_it_back_filename);
assert!(!pulled_send_it_back_path.exists());
api::client::repositories::delete(&remote_repo).await?;
Ok(())
})
.await
})
.await
}
#[tokio::test]
async fn test_command_add_modify_remove_push_pull() -> Result<(), OxenError> {
test::run_empty_local_repo_test_async(|mut repo| async move {
let filename = "labels.txt";
let filepath = repo.path.join(filename);
test::write_txt_file_to_path(&filepath, "I am the labels")?;
repositories::add(&repo, &filepath).await?;
repositories::commit(&repo, "Adding labels file")?;
let remote = test::repo_remote_url_from(&repo.dirname());
command::config::set_remote(&mut repo, constants::DEFAULT_REMOTE_NAME, &remote)?;
let remote_repo = test::create_remote_repo(&repo).await?;
repositories::push(&repo).await?;
test::run_empty_dir_test_async(|new_repo_dir| async move {
let new_repo_dir = new_repo_dir.join("new_repo");
let cloned_repo =
repositories::clone_url(&remote_repo.remote.url, &new_repo_dir).await?;
let cloned_filepath = cloned_repo.path.join(filename);
let changed_content = "messing up the labels";
util::fs::write_to_path(&cloned_filepath, changed_content)?;
repositories::add(&cloned_repo, &cloned_filepath).await?;
repositories::commit(&cloned_repo, "I messed with the label file")?;
repositories::push(&cloned_repo).await?;
repositories::pull(&repo).await?;
let pulled_content = util::fs::read_from_path(&filepath)?;
assert_eq!(pulled_content, changed_content);
util::fs::remove_file(&filepath)?;
repositories::add(&repo, &filepath).await?;
repositories::commit(&repo, "You mess with it, I remove it")?;
repositories::push(&repo).await?;
repositories::pull(&cloned_repo).await?;
assert!(!cloned_filepath.exists());
api::client::repositories::delete(&remote_repo).await?;
Ok(())
})
.await
})
.await
}
#[tokio::test]
async fn test_push_pull_push_pull_on_branch() -> Result<(), OxenError> {
test::run_training_data_repo_test_no_commits_async(|mut repo| async move {
let train_path = repo.path.join("train");
repositories::add(&repo, &train_path).await?;
repositories::commit(&repo, "Adding train dir")?;
let larger_dir = repo.path.join("large_files");
repositories::add(&repo, &larger_dir).await?;
repositories::commit(&repo, "Adding larger files")?;
let remote = test::repo_remote_url_from(&repo.dirname());
command::config::set_remote(&mut repo, constants::DEFAULT_REMOTE_NAME, &remote)?;
let remote_repo = test::create_remote_repo(&repo).await?;
repositories::push(&repo).await?;
let og_num_files = util::fs::rcount_files_in_dir(&repo.path);
test::run_empty_dir_test_async(|new_repo_dir| async move {
let new_repo_dir = new_repo_dir.join("new_repo");
let cloned_repo =
repositories::clone_url(&remote_repo.remote.url, &new_repo_dir).await?;
let cloned_num_files = util::fs::rcount_files_in_dir(&cloned_repo.path);
assert_eq!(8, cloned_num_files);
let og_commits = repositories::commits::list(&repo)?;
let cloned_commits = repositories::commits::list(&cloned_repo)?;
assert_eq!(og_commits.len(), cloned_commits.len());
let branch_name = "adding-training-data";
repositories::branches::create_checkout(&cloned_repo, branch_name)?;
let hotdog_path = test::test_hotdog_1();
let new_file_path = cloned_repo.path.join("train").join("hotdog_1.jpg");
util::fs::copy(hotdog_path, &new_file_path)?;
repositories::add(&cloned_repo, &new_file_path).await?;
repositories::commit(&cloned_repo, "Adding one file to train dir")?;
let opts = PushOpts {
remote: constants::DEFAULT_REMOTE_NAME.to_string(),
branch: branch_name.to_string(),
..Default::default()
};
repositories::push::push_remote_branch(&cloned_repo, &opts).await?;
let fetch_opts = &FetchOpts {
remote: constants::DEFAULT_REMOTE_NAME.to_string(),
branch: branch_name.to_string(),
all: true,
..FetchOpts::new()
};
repositories::fetch::fetch_branch(&repo, fetch_opts).await?;
repositories::checkout::checkout(&repo, branch_name).await?;
let num_new_files = util::fs::rcount_files_in_dir(&repo.path);
assert_eq!(og_num_files + 1, num_new_files);
let hotdog_path = test::test_hotdog_2();
let new_file_path = train_path.join("hotdog_2.jpg");
util::fs::copy(hotdog_path, &new_file_path)?;
repositories::add(&repo, &train_path).await?;
repositories::commit(&repo, "Adding next file to train dir")?;
let opts = PushOpts {
remote: constants::DEFAULT_REMOTE_NAME.to_string(),
branch: branch_name.to_string(),
..Default::default()
};
repositories::push::push_remote_branch(&repo, &opts).await?;
repositories::pull_remote_branch(
&cloned_repo,
&FetchOpts {
remote: constants::DEFAULT_REMOTE_NAME.to_string(),
branch: branch_name.to_string(),
all: false,
..FetchOpts::new()
},
)
.await?;
let cloned_num_files = util::fs::rcount_files_in_dir(&cloned_repo.path);
assert_eq!(10, cloned_num_files);
api::client::repositories::delete(&remote_repo).await?;
Ok(())
})
.await
})
.await
}
#[tokio::test]
async fn test_push_pull_push_pull_on_other_branch() -> Result<(), OxenError> {
test::run_empty_local_repo_test_async(|mut repo| async move {
let train_dir = repo.path.join("train");
let train_paths = [
Path::new("data/test/images/cat_1.jpg"),
Path::new("data/test/images/cat_2.jpg"),
Path::new("data/test/images/cat_3.jpg"),
Path::new("data/test/images/dog_1.jpg"),
Path::new("data/test/images/dog_2.jpg"),
];
util::fs::create_dir_all(&train_dir)?;
for path in train_paths.iter() {
util::fs::copy(
test::REPO_ROOT.join(path),
train_dir.join(path.file_name().unwrap()),
)?;
}
repositories::add(&repo, &train_dir).await?;
repositories::commit(&repo, "Adding train dir")?;
let og_branch = repositories::branches::current_branch(&repo)?.unwrap();
let remote = test::repo_remote_url_from(&repo.dirname());
command::config::set_remote(&mut repo, constants::DEFAULT_REMOTE_NAME, &remote)?;
let remote_repo = test::create_remote_repo(&repo).await?;
repositories::push(&repo).await?;
test::run_empty_dir_test_async(|new_repo_dir| async move {
let new_repo_dir = new_repo_dir.join("new_repo");
let cloned_repo =
repositories::clone_url(&remote_repo.remote.url, &new_repo_dir).await?;
let cloned_num_files = util::fs::rcount_files_in_dir(&cloned_repo.path);
assert_eq!(train_paths.len(), cloned_num_files);
let branch_name = "adding-training-data";
repositories::branches::create_checkout(&cloned_repo, branch_name)?;
let hotdog_path = test::test_hotdog_1();
let new_file_path = cloned_repo.path.join("train").join("hotdog_1.jpg");
util::fs::copy(hotdog_path, &new_file_path)?;
repositories::add(&cloned_repo, &new_file_path).await?;
repositories::commit(&cloned_repo, "Adding one file to train dir")?;
let opts = PushOpts {
remote: constants::DEFAULT_REMOTE_NAME.to_string(),
branch: branch_name.to_string(),
..Default::default()
};
repositories::push::push_remote_branch(&cloned_repo, &opts).await?;
repositories::pull_remote_branch(
&repo,
&FetchOpts {
remote: constants::DEFAULT_REMOTE_NAME.to_string(),
branch: og_branch.name.to_string(),
all: true,
..FetchOpts::new()
},
)
.await?;
let og_num_files = util::fs::rcount_files_in_dir(&repo.path);
assert_eq!(train_paths.len(), og_num_files);
api::client::repositories::delete(&remote_repo).await?;
Ok(())
})
.await
})
.await
}
#[tokio::test]
async fn test_push_pull_file_without_extension() -> Result<(), OxenError> {
test::run_training_data_repo_test_no_commits_async(|mut repo| async move {
let filename = "LICENSE";
let filepath = repo.path.join(filename);
let og_content = "I am the License.";
test::write_txt_file_to_path(&filepath, og_content)?;
repositories::add(&repo, filepath).await?;
let commit = repositories::commit(&repo, "Adding file without extension");
assert!(commit.is_ok());
let remote = test::repo_remote_url_from(&repo.dirname());
command::config::set_remote(&mut repo, constants::DEFAULT_REMOTE_NAME, &remote)?;
let remote_repo = test::create_remote_repo(&repo).await?;
repositories::push(&repo).await?;
test::run_empty_dir_test_async(|new_repo_dir| async move {
let new_repo_dir = new_repo_dir.join("new_repo");
let cloned_repo =
repositories::clone_url(&remote_repo.remote.url, &new_repo_dir).await?;
let filepath = cloned_repo.path.join(filename);
let content = util::fs::read_from_path(&filepath)?;
assert_eq!(og_content, content);
api::client::repositories::delete(&remote_repo).await?;
Ok(())
})
.await
})
.await
}
#[tokio::test]
async fn test_push_pull_separate_branch_less_files() -> Result<(), OxenError> {
test::run_empty_local_repo_test_async(|mut repo| async move {
for i in 1..6 {
let filename = format!("{i}.txt");
let filepath = repo.path.join(&filename);
test::write_txt_file_to_path(&filepath, &filename)?;
}
let filepath = repo.path.join("1.txt");
repositories::add(&repo, &filepath).await?;
let filepath = repo.path.join("2.txt");
repositories::add(&repo, &filepath).await?;
repositories::commit(&repo, "Adding initial data")?;
let remote = test::repo_remote_url_from(&repo.dirname());
command::config::set_remote(&mut repo, constants::DEFAULT_REMOTE_NAME, &remote)?;
let remote_repo = test::create_remote_repo(&repo).await?;
repositories::push(&repo).await?;
let branch_name = "feature/add-mooooore-data";
repositories::branches::create_checkout(&repo, branch_name)?;
for i in 3..6 {
let filename = format!("{i}.txt");
let filepath = repo.path.join(&filename);
repositories::add(&repo, &filepath).await?;
}
repositories::commit(&repo, "Adding mooooore data")?;
repositories::push(&repo).await?;
test::run_empty_dir_test_async(|new_repo_dir| async move {
let opts = CloneOpts::from_branch(
remote_repo.url(),
new_repo_dir.join("new_repo"),
branch_name,
);
let cloned_repo = repositories::clone(&opts).await?;
let cloned_files = util::fs::rlist_files_in_dir(&cloned_repo.path);
for file in cloned_files.iter() {
println!("Cloned file: {}", file.display());
}
let cloned_num_files = cloned_files.len();
assert_eq!(cloned_num_files, 5);
repositories::fetch_all(&cloned_repo, &FetchOpts::new()).await?;
repositories::checkout(&cloned_repo, "main").await?;
let cloned_num_files = util::fs::rcount_files_in_dir(&cloned_repo.path);
assert_eq!(cloned_num_files, 2);
api::client::repositories::delete(&remote_repo).await?;
Ok(())
})
.await
})
.await
}
#[tokio::test]
async fn test_push_pull_separate_branch_more_files() -> Result<(), OxenError> {
test::run_empty_local_repo_test_async(|mut repo| async move {
for i in 1..6 {
let filename = format!("{i}.txt");
let filepath = repo.path.join(&filename);
test::write_txt_file_to_path(&filepath, &filename)?;
}
let filepath = repo.path.join("1.txt");
repositories::add(&repo, &filepath).await?;
let filepath = repo.path.join("2.txt");
repositories::add(&repo, &filepath).await?;
repositories::commit(&repo, "Adding initial data")?;
let remote = test::repo_remote_url_from(&repo.dirname());
command::config::set_remote(&mut repo, constants::DEFAULT_REMOTE_NAME, &remote)?;
let remote_repo = test::create_remote_repo(&repo).await?;
repositories::push(&repo).await?;
let branch_name = "feature/add-mooooore-data";
repositories::branches::create_checkout(&repo, branch_name)?;
for i in 3..6 {
let filename = format!("{i}.txt");
let filepath = repo.path.join(&filename);
repositories::add(&repo, &filepath).await?;
}
repositories::commit(&repo, "Adding mooooore data")?;
repositories::push(&repo).await?;
test::run_empty_dir_test_async(|new_repo_dir| async move {
let opts = CloneOpts::new(remote_repo.url(), new_repo_dir.join("new_repo"));
let cloned_repo = repositories::clone(&opts).await?;
let cloned_num_files = util::fs::rcount_files_in_dir(&cloned_repo.path);
assert_eq!(cloned_num_files, 2);
repositories::fetch_all(&cloned_repo, &FetchOpts::new()).await?;
repositories::checkout(&cloned_repo, branch_name).await?;
let cloned_num_files = util::fs::rcount_files_in_dir(&cloned_repo.path);
assert_eq!(cloned_num_files, 5);
api::client::repositories::delete(&remote_repo).await?;
Ok(())
})
.await
})
.await
}
#[tokio::test]
async fn test_push_pull_moved_files() -> Result<(), OxenError> {
test::run_training_data_fully_sync_remote(|local_repo, remote_repo| async move {
let remote_repo_copy = remote_repo.clone();
let contents = "this is the file";
let path = &local_repo.path.join("a.txt");
test::write_txt_file_to_path(path, contents)?;
println!("Writing file to {}", path.display());
repositories::add(&local_repo, path).await?;
println!("adding file to index at path {}", path.display());
repositories::commit(&local_repo, "Adding file for first time")?;
let new_path = &local_repo.path.join("newfolder").join("a.txt");
util::fs::create_dir_all(local_repo.path.join("newfolder"))?;
test::write_txt_file_to_path(new_path, contents)?;
repositories::add(&local_repo, new_path).await?;
let new_path = &local_repo.path.join("newfolder").join("b.txt");
test::write_txt_file_to_path(new_path, contents)?;
repositories::add(&local_repo, new_path).await?;
let path = "a.txt";
let new_path = local_repo.path.join(path);
util::fs::remove_file(&new_path)?;
repositories::add(&local_repo, &new_path).await?;
repositories::commit(
&local_repo,
"Moved file to 2 new places and deleted original",
)?;
repositories::push(&local_repo).await?;
test::run_empty_dir_test_async(|repo_dir| async move {
let repo_dir = repo_dir.join("repoo");
let _cloned_repo =
repositories::deep_clone_url(&remote_repo.remote.url, &repo_dir).await?;
Ok(())
})
.await?;
Ok(remote_repo_copy)
})
.await
}
#[tokio::test]
async fn test_push_new_branch_default_clone() -> Result<(), OxenError> {
test::run_training_data_fully_sync_remote(|_local_repo, remote_repo| async move {
let remote_repo_copy = remote_repo.clone();
test::run_empty_dir_test_async(|repo_dir| async move {
let repo_dir = repo_dir.join("repoo");
let cloned_repo =
repositories::clone_url(&remote_repo.remote.url, &repo_dir).await?;
let branch_name = "new-branch";
repositories::branches::create_checkout(&cloned_repo, branch_name)?;
let contents = "this is the file";
let path = &cloned_repo.path.join("a.txt");
test::write_txt_file_to_path(path, contents)?;
repositories::add(&cloned_repo, path).await?;
let commit = repositories::commit(&cloned_repo, "Adding file for first time")?;
let opts = PushOpts {
remote: constants::DEFAULT_REMOTE_NAME.to_string(),
branch: branch_name.to_string(),
..Default::default()
};
let push_result = repositories::push::push_remote_branch(&cloned_repo, &opts).await;
log::debug!("Push result: {push_result:?}");
assert!(push_result.is_ok());
let remote_branch = api::client::branches::get_by_name(&remote_repo, branch_name)
.await?
.unwrap();
assert_eq!(remote_branch.commit_id, commit.id);
Ok(())
})
.await?;
Ok(remote_repo_copy)
})
.await
}
#[tokio::test]
async fn test_flags_merge_conflict_on_subtree_pull() -> Result<(), OxenError> {
test::run_training_data_fully_sync_remote(|_, remote_repo| async move {
let remote_repo_copy = remote_repo.clone();
test::run_empty_dir_test_async(|repos_base_dir| async move {
let user_a_repo_dir = repos_base_dir.join("user_a_repo");
let mut clone_opts = CloneOpts::new(&remote_repo.remote.url, &user_a_repo_dir);
clone_opts.fetch_opts.subtree_paths =
Some(vec![PathBuf::from("nlp").join("classification")]);
clone_opts.fetch_opts.depth = Some(2);
let user_a_repo = repositories::clone(&clone_opts).await?;
let user_b_repo_dir = repos_base_dir.join("user_b_repo");
let mut clone_opts = CloneOpts::new(&remote_repo.remote.url, &user_b_repo_dir);
clone_opts.fetch_opts.subtree_paths =
Some(vec![PathBuf::from("nlp").join("classification")]);
clone_opts.fetch_opts.depth = Some(2);
let user_b_repo = repositories::clone(&clone_opts).await?;
let new_file = PathBuf::from("nlp")
.join("classification")
.join("new_data.tsv");
let new_file_path = user_a_repo.path.join(&new_file);
let new_file_path = test::write_txt_file_to_path(new_file_path, "image\tlabel")?;
repositories::add(&user_a_repo, &new_file_path).await?;
repositories::commit(&user_a_repo, "User A adding new data.")?;
repositories::push(&user_a_repo).await?;
let new_file_path = user_b_repo.path.join(&new_file);
let new_file_path =
test::write_txt_file_to_path(new_file_path, "I am user B, try to stop me")?;
repositories::add(&user_b_repo, &new_file_path).await?;
repositories::commit(&user_b_repo, "User B adding the same file.")?;
let result = repositories::push(&user_b_repo).await;
assert!(result.is_err());
let result = repositories::pull(&user_b_repo).await;
assert!(result.is_err());
assert_eq!(user_b_repo.depth(), Some(2));
assert_eq!(
user_b_repo.subtree_paths(),
Some(vec![PathBuf::from("nlp").join("classification")])
);
assert!(user_b_repo.path.join("nlp").join("classification").exists());
assert!(!user_b_repo.path.join("train").exists());
let status = repositories::status(&user_b_repo).await?;
assert!(!status.merge_conflicts.is_empty());
status.print();
repositories::checkout::checkout_ours(&user_b_repo, new_file).await?;
repositories::add(&user_b_repo, &new_file_path).await?;
repositories::commit(&user_b_repo, "Taking my changes")?;
repositories::push(&user_b_repo).await?;
Ok(())
})
.await?;
Ok(remote_repo_copy)
})
.await
}
#[tokio::test]
async fn test_flags_merge_conflict_on_root_subtree_pull() -> Result<(), OxenError> {
test::run_training_data_fully_sync_remote(|_, remote_repo| async move {
let remote_repo_copy = remote_repo.clone();
test::run_empty_dir_test_async(|repos_base_dir| async move {
let user_a_repo_dir = repos_base_dir.join("user_a_repo");
let mut clone_opts = CloneOpts::new(&remote_repo.remote.url, &user_a_repo_dir);
clone_opts.fetch_opts.subtree_paths = Some(vec![PathBuf::from(".")]);
clone_opts.fetch_opts.depth = Some(1);
let user_a_repo = repositories::clone(&clone_opts).await?;
let user_b_repo_dir = repos_base_dir.join("user_b_repo");
let mut clone_opts = CloneOpts::new(&remote_repo.remote.url, &user_b_repo_dir);
clone_opts.fetch_opts.subtree_paths = Some(vec![PathBuf::from(".")]);
clone_opts.fetch_opts.depth = Some(1);
let user_b_repo = repositories::clone(&clone_opts).await?;
let new_file = PathBuf::from("README.md");
let new_file_path = user_a_repo.path.join(&new_file);
let new_file_path = test::write_txt_file_to_path(new_file_path, "User A's README")?;
repositories::add(&user_a_repo, &new_file_path).await?;
repositories::commit(&user_a_repo, "User A adding new data.")?;
repositories::push(&user_a_repo).await?;
let new_file_path = user_b_repo.path.join(&new_file);
let new_file_path =
test::write_txt_file_to_path(new_file_path, "I am user B, try to stop me")?;
repositories::add(&user_b_repo, &new_file_path).await?;
repositories::commit(&user_b_repo, "User B adding the same README.")?;
let result = repositories::push(&user_b_repo).await;
assert!(result.is_err());
let result = repositories::pull(&user_b_repo).await;
assert!(result.is_err());
assert_eq!(user_b_repo.depth(), Some(1));
assert_eq!(user_b_repo.subtree_paths(), Some(vec![PathBuf::from("")]),);
assert!(user_b_repo.path.join("README.md").exists());
assert!(!user_b_repo.path.join("nlp").exists());
assert!(!user_b_repo.path.join("train").exists());
let status = repositories::status(&user_b_repo).await?;
status.print();
assert!(!status.merge_conflicts.is_empty());
assert_eq!(status.merge_conflicts.len(), 1);
assert_eq!(status.merge_conflicts[0].base_entry.path, new_file);
assert_eq!(status.removed_files.len(), 0);
repositories::checkout::checkout_ours(&user_b_repo, new_file).await?;
repositories::add(&user_b_repo, &new_file_path).await?;
repositories::commit(&user_b_repo, "Taking my changes")?;
repositories::push(&user_b_repo).await?;
Ok(())
})
.await?;
Ok(remote_repo_copy)
})
.await
}
#[tokio::test]
async fn test_flags_merge_conflict_on_pull() -> Result<(), OxenError> {
test::run_training_data_fully_sync_remote(|_, remote_repo| async move {
let remote_repo_copy = remote_repo.clone();
test::run_empty_dir_test_async(|user_a_repo_dir| async move {
let user_a_repo_dir_copy = user_a_repo_dir.join("user_a_repo");
let user_a_repo =
repositories::clone_url(&remote_repo.remote.url, &user_a_repo_dir_copy).await?;
test::run_empty_dir_test_async(|user_b_repo_dir| async move {
let user_b_repo_dir_copy = user_b_repo_dir.join("user_b_repo");
let user_b_repo =
repositories::clone_url(&remote_repo.remote.url, &user_b_repo_dir_copy)
.await?;
let new_file = "new_file.txt";
let new_file_path = user_a_repo.path.join(new_file);
let new_file_path = test::write_txt_file_to_path(new_file_path, "new file")?;
repositories::add(&user_a_repo, &new_file_path).await?;
repositories::commit(&user_a_repo, "User A changing file.")?;
repositories::push(&user_a_repo).await?;
let new_file_path = user_b_repo.path.join(new_file);
let new_file_path =
test::write_txt_file_to_path(new_file_path, "I am user B, try to stop me")?;
repositories::add(&user_b_repo, &new_file_path).await?;
repositories::commit(&user_b_repo, "User B changing file.")?;
let result = repositories::push(&user_b_repo).await;
assert!(result.is_err());
let result = repositories::pull(&user_b_repo).await;
assert!(result.is_err());
let status = repositories::status(&user_b_repo).await?;
assert!(!status.merge_conflicts.is_empty());
status.print();
repositories::checkout::checkout_ours(&user_b_repo, new_file).await?;
repositories::add(&user_b_repo, &new_file_path).await?;
repositories::commit(&user_b_repo, "Taking my changes")?;
repositories::push(&user_b_repo).await?;
Ok(())
})
.await?;
Ok(())
})
.await?;
Ok(remote_repo_copy)
})
.await
}
#[tokio::test]
async fn test_pull_does_not_remove_local_files() -> Result<(), OxenError> {
test::run_one_commit_sync_repo_test(|_, remote_repo| async move {
let remote_repo_copy = remote_repo.clone();
test::run_empty_dir_test_async(|user_a_repo_dir| async move {
let user_a_repo_dir_copy = user_a_repo_dir.join("user_a_repo");
let user_a_repo =
repositories::clone_url(&remote_repo.remote.url, &user_a_repo_dir_copy).await?;
test::run_empty_dir_test_async(|user_b_repo_dir| async move {
let user_b_repo_dir_copy = user_b_repo_dir.join("user_b_repo");
let user_b_repo =
repositories::clone_url(&remote_repo.remote.url, &user_b_repo_dir_copy)
.await?;
let file_1 = "file_1.txt";
test::write_txt_file_to_path(user_a_repo.path.join(file_1), "File 1")?;
let file_2 = "file_2.txt";
test::write_txt_file_to_path(user_a_repo.path.join(file_2), "File 2")?;
repositories::add(&user_a_repo, user_a_repo.path.join(file_1)).await?;
repositories::add(&user_a_repo, user_a_repo.path.join(file_2)).await?;
repositories::commit(&user_a_repo, "Adding file_1 and file_2")?;
repositories::push(&user_a_repo).await?;
let file_3 = "file_3.txt";
test::write_txt_file_to_path(user_b_repo.path.join(file_3), "File 3")?;
repositories::add(&user_b_repo, user_b_repo.path.join(file_3)).await?;
repositories::commit(&user_b_repo, "Adding file_3")?;
repositories::pull(&user_b_repo).await?;
repositories::commits::head_commit(&user_b_repo)?;
assert!(user_b_repo.path.join(file_1).exists());
assert!(user_b_repo.path.join(file_2).exists());
assert!(user_b_repo.path.join(file_3).exists());
Ok(())
})
.await?;
Ok(())
})
.await?;
Ok(remote_repo_copy)
})
.await
}
#[tokio::test]
async fn test_pull_does_not_remove_untracked_files() -> Result<(), OxenError> {
test::run_one_commit_sync_repo_test(|_, remote_repo| async move {
let remote_repo_copy = remote_repo.clone();
test::run_empty_dir_test_async(|user_a_repo_dir| async move {
let user_a_repo_dir_copy = user_a_repo_dir.join("user_a_repo");
let user_a_repo =
repositories::clone_url(&remote_repo.remote.url, &user_a_repo_dir_copy).await?;
test::run_empty_dir_test_async(|user_b_repo_dir| async move {
let user_b_repo_dir_copy = user_b_repo_dir.join("user_b_repo");
let user_b_repo =
repositories::clone_url(&remote_repo.remote.url, &user_b_repo_dir_copy)
.await?;
let file_1 = "file_1.txt";
test::write_txt_file_to_path(user_a_repo.path.join(file_1), "File 1")?;
let file_2 = "file_2.txt";
test::write_txt_file_to_path(user_a_repo.path.join(file_2), "File 2")?;
repositories::add(&user_a_repo, user_a_repo.path.join(file_1)).await?;
repositories::add(&user_a_repo, user_a_repo.path.join(file_2)).await?;
repositories::commit(&user_a_repo, "Adding file_1 and file_2")?;
repositories::push(&user_a_repo).await?;
let local_file_2 = "file_2.txt";
test::write_txt_file_to_path(
user_b_repo.path.join(local_file_2),
"wrong not correct content",
)?;
let file_3 = "file_3.txt";
test::write_txt_file_to_path(user_b_repo.path.join(file_3), "File 3")?;
let dir_1 = "dir_1";
std::fs::create_dir(user_b_repo.path.join(dir_1))?;
let dir_2 = "dir_2";
std::fs::create_dir(user_b_repo.path.join(dir_2))?;
let file_4 = "file_4.txt";
test::write_txt_file_to_path(
user_b_repo.path.join(dir_2).join(file_4),
"File 4",
)?;
let file_5 = "file_5.txt";
test::write_txt_file_to_path(
user_b_repo.path.join(dir_2).join(file_5),
"File 5",
)?;
let dir_3 = "dir_3";
let subdir = "subdir";
util::fs::create_dir_all(user_b_repo.path.join(dir_3).join(subdir))?;
let subfile = "subfile.txt";
test::write_txt_file_to_path(
user_b_repo.path.join(dir_3).join(subdir).join(subfile),
"Subfile",
)?;
let result = repositories::pull(&user_b_repo).await;
assert!(result.is_err());
util::fs::remove_file(user_b_repo.path.join(local_file_2))?;
repositories::pull(&user_b_repo).await?;
assert!(user_b_repo.path.join(file_1).exists());
assert!(user_b_repo.path.join(file_2).exists());
let local_file_2_contents =
std::fs::read_to_string(user_b_repo.path.join(local_file_2))?;
assert_eq!(local_file_2_contents, "File 2");
assert!(user_b_repo.path.join(file_3).exists());
assert!(user_b_repo.path.join(dir_1).exists());
assert!(user_b_repo.path.join(dir_2).exists());
assert!(user_b_repo.path.join(dir_2).join(file_4).exists());
assert!(user_b_repo.path.join(dir_2).join(file_5).exists());
assert!(user_b_repo.path.join(dir_3).exists());
assert!(user_b_repo.path.join(dir_3).join(subdir).exists());
assert!(
user_b_repo
.path
.join(dir_3)
.join(subdir)
.join(subfile)
.exists()
);
Ok(())
})
.await?;
Ok(())
})
.await?;
Ok(remote_repo_copy)
})
.await
}
#[tokio::test]
async fn test_pull_multiple_commits() -> Result<(), OxenError> {
test::run_training_data_repo_test_no_commits_async(|mut repo| async move {
let filename = "labels.txt";
let file_path = repo.path.join(filename);
repositories::add(&repo, &file_path).await?;
repositories::commit(&repo, "Adding labels file")?;
let train_path = repo.path.join("train");
repositories::add(&repo, &train_path).await?;
repositories::commit(&repo, "Adding train dir")?;
let test_path = repo.path.join("test");
repositories::add(&repo, &test_path).await?;
repositories::commit(&repo, "Adding test dir")?;
let remote = test::repo_remote_url_from(&repo.dirname());
command::config::set_remote(&mut repo, constants::DEFAULT_REMOTE_NAME, &remote)?;
let remote_repo = test::create_remote_repo(&repo).await?;
repositories::push(&repo).await?;
test::run_empty_dir_test_async(|new_repo_dir| async move {
let new_repo_dir = new_repo_dir.join("repoo");
let cloned_repo =
repositories::clone_url(&remote_repo.remote.url, &new_repo_dir).await?;
let cloned_num_files = util::fs::rcount_files_in_dir(&cloned_repo.path);
assert_eq!(12, cloned_num_files);
api::client::repositories::delete(&remote_repo).await?;
Ok(())
})
.await
})
.await
}
#[tokio::test]
async fn test_pull_re_fetches_missing_blobs() -> Result<(), OxenError> {
test::run_training_data_repo_test_no_commits_async(|mut repo| async move {
let labels = repo.path.join("labels.txt");
repositories::add(&repo, &labels).await?;
let first_commit = repositories::commit(&repo, "First commit")?;
let remote = test::repo_remote_url_from(&repo.dirname());
command::config::set_remote(&mut repo, constants::DEFAULT_REMOTE_NAME, &remote)?;
let remote_repo = test::create_remote_repo(&repo).await?;
repositories::push(&repo).await?;
let second_path = repo.path.join("second.txt");
let second_contents = "second commit content";
util::fs::write_to_path(&second_path, second_contents)?;
repositories::add(&repo, &second_path).await?;
let second_commit = repositories::commit(&repo, "Second commit")?;
repositories::push(&repo).await?;
test::run_empty_dir_test_async(|new_repo_dir| async move {
let new_repo_dir = new_repo_dir.join("clone");
let cloned_repo =
repositories::clone_url(&remote_repo.remote.url, &new_repo_dir).await?;
repositories::branches::update(
&cloned_repo,
constants::DEFAULT_BRANCH_NAME,
&first_commit.id,
)?;
let head = repositories::commits::head_commit(&cloned_repo)?;
assert_eq!(head.id, first_commit.id);
let second_node = repositories::tree::get_node_by_path(
&cloned_repo,
&second_commit,
"second.txt",
)?
.expect("second.txt must be reachable from the second commit's tree");
let blob_hash = second_node.hash.to_string();
let version_store = cloned_repo.version_store();
let blob_path = version_store.get_version_path(&blob_hash).await?;
assert!(blob_path.exists(), "blob should be present after clone");
util::fs::remove_file(&*blob_path)?;
let blob_path_after_wipe = version_store.get_version_path(&blob_hash).await?;
assert!(
!blob_path_after_wipe.exists(),
"blob should be gone after wipe"
);
repositories::pull(&cloned_repo).await?;
let blob_path_after_pull = version_store.get_version_path(&blob_hash).await?;
assert!(
blob_path_after_pull.exists(),
"pull should have re-fetched the wiped blob"
);
let new_head = repositories::commits::head_commit(&cloned_repo)?;
assert_eq!(new_head.id, second_commit.id);
let cloned_second = cloned_repo.path.join("second.txt");
assert!(cloned_second.exists());
assert_eq!(util::fs::read_from_path(&cloned_second)?, second_contents);
api::client::repositories::delete(&remote_repo).await?;
Ok(())
})
.await
})
.await
}
#[tokio::test]
async fn test_pull_data_frame() -> Result<(), OxenError> {
test::run_select_data_repo_test_no_commits_async("annotations", |mut repo| async move {
let filename = "annotations/train/bounding_box.csv";
let file_path = repo.path.join(filename);
let og_df = tabular::read_df(&file_path, DFOpts::empty()).await?;
let og_contents = util::fs::read_from_path(&file_path)?;
repositories::add(&repo, &file_path).await?;
repositories::commit(&repo, "Adding bounding box file")?;
let remote = test::repo_remote_url_from(&repo.dirname());
command::config::set_remote(&mut repo, constants::DEFAULT_REMOTE_NAME, &remote)?;
let remote_repo = test::create_remote_repo(&repo).await?;
repositories::push(&repo).await?;
test::run_empty_dir_test_async(|new_repo_dir| async move {
let new_repo_dir = new_repo_dir.join("repoo");
let cloned_repo =
repositories::clone_url(&remote_repo.remote.url, &new_repo_dir).await?;
let file_path = cloned_repo.path.join(filename);
let cloned_df = tabular::read_df(&file_path, DFOpts::empty()).await?;
let cloned_contents = util::fs::read_from_path(&file_path)?;
assert_eq!(og_df.height(), cloned_df.height());
assert_eq!(og_df.width(), cloned_df.width());
assert_eq!(cloned_contents, og_contents);
let status = repositories::status(&cloned_repo).await?;
status.print();
assert!(status.is_clean());
let commit = repositories::commits::head_commit(&cloned_repo)?;
let schemas = repositories::data_frames::schemas::list(&repo, &commit)?;
assert!(!schemas.is_empty());
api::client::repositories::delete(&remote_repo).await?;
Ok(())
})
.await
})
.await
}
#[tokio::test]
async fn test_pull_multiple_data_frames_multiple_schemas() -> Result<(), OxenError> {
test::run_training_data_repo_test_fully_committed_async(|mut repo| async move {
let filename = Path::new("nlp")
.join("classification")
.join("annotations")
.join("train.tsv");
let file_path = repo.path.join(filename);
let og_df = tabular::read_df(&file_path, DFOpts::empty()).await?;
let og_sentiment_contents = util::fs::read_from_path(&file_path)?;
let commit = repositories::commits::head_commit(&repo)?;
let schemas = repositories::data_frames::schemas::list(&repo, &commit)?;
let num_schemas = schemas.len();
let remote = test::repo_remote_url_from(&repo.dirname());
command::config::set_remote(&mut repo, constants::DEFAULT_REMOTE_NAME, &remote)?;
let remote_repo = test::create_remote_repo(&repo).await?;
repositories::push(&repo).await?;
test::run_empty_dir_test_async(|new_repo_dir| async move {
let new_repo_dir = new_repo_dir.join("repoo");
let cloned_repo =
repositories::clone_url(&remote_repo.remote.url, &new_repo_dir).await?;
let filename = Path::new("nlp")
.join("classification")
.join("annotations")
.join("train.tsv");
let file_path = cloned_repo.path.join(&filename);
let cloned_df = tabular::read_df(&file_path, DFOpts::empty()).await?;
let cloned_contents = util::fs::read_from_path(&file_path)?;
assert_eq!(og_df.height(), cloned_df.height());
assert_eq!(og_df.width(), cloned_df.width());
assert_eq!(cloned_contents, og_sentiment_contents);
println!("Cloned {filename:?} {cloned_df}");
let status = repositories::status(&cloned_repo).await?;
status.print();
assert!(status.is_clean());
let head_commit = repositories::commits::head_commit(&cloned_repo)?;
let pulled_schemas = repositories::data_frames::schemas::list(&repo, &head_commit)?;
assert_eq!(pulled_schemas.len(), num_schemas);
api::client::repositories::delete(&remote_repo).await?;
Ok(())
})
.await
})
.await
}
#[tokio::test]
async fn test_pull_full_commit_history() -> Result<(), OxenError> {
test::run_training_data_repo_test_no_commits_async(|mut repo| async move {
let filename = "labels.txt";
let filepath = repo.path.join(filename);
repositories::add(&repo, &filepath).await?;
repositories::commit(&repo, "Adding labels file")?;
let new_filename = "new.txt";
let new_filepath = repo.path.join(new_filename);
util::fs::write_to_path(&new_filepath, "hallo")?;
repositories::add(&repo, &new_filepath).await?;
repositories::commit(&repo, "Adding a new file")?;
let train_path = repo.path.join("train");
repositories::add(&repo, &train_path).await?;
repositories::commit(&repo, "Adding train dir")?;
let test_path = repo.path.join("test");
repositories::add(&repo, &test_path).await?;
repositories::commit(&repo, "Adding test dir")?;
let local_history = repositories::commits::list(&repo)?;
let remote = test::repo_remote_url_from(&repo.dirname());
command::config::set_remote(&mut repo, constants::DEFAULT_REMOTE_NAME, &remote)?;
let remote_repo = test::create_remote_repo(&repo).await?;
repositories::push(&repo).await?;
test::run_empty_dir_test_async(|new_repo_dir| async move {
let new_repo_dir = new_repo_dir.join("repoo");
let mut clone_opts = CloneOpts::new(&remote_repo.remote.url, &new_repo_dir);
clone_opts.fetch_opts.all = true;
let cloned_repo = repositories::clone(&clone_opts).await?;
let cloned_history = repositories::commits::list(&cloned_repo)?;
assert_eq!(local_history.len(), cloned_history.len());
api::client::repositories::delete(&remote_repo).await?;
Ok(())
})
.await
})
.await
}
#[tokio::test]
async fn test_pull_full_commit_history_after_shallow_clone() -> Result<(), OxenError> {
test::run_training_data_repo_test_fully_committed_async(|mut repo| async move {
let og_commits = repositories::commits::list_all(&repo)?;
let name = repo.dirname();
let remote = test::repo_remote_url_from(&name);
command::config::set_remote(&mut repo, constants::DEFAULT_REMOTE_NAME, &remote)?;
let remote_repo = test::create_remote_repo(&repo).await?;
repositories::push(&repo).await?;
test::run_empty_dir_test_async(|new_repo_dir| async move {
let mut opts =
CloneOpts::new(&remote_repo.remote.url, new_repo_dir.join("new_repo"));
opts.fetch_opts.subtree_paths = Some(vec![PathBuf::from("test")]);
opts.fetch_opts.depth = Some(1);
let cloned_repo = repositories::clone(&opts).await?;
repositories::pull_all(&cloned_repo).await?;
let pulled_commits = repositories::commits::list_all(&cloned_repo)?;
assert_eq!(pulled_commits.len(), og_commits.len());
Ok(())
})
.await
})
.await
}
#[tokio::test]
async fn test_pull_does_not_overwrite_new_file_modified_by_remote() -> Result<(), OxenError> {
test::run_select_data_sync_remote("README.md", |_, remote_repo| async move {
let remote_repo_copy = remote_repo.clone();
test::run_empty_dir_test_async(|user_a_repo_dir| async move {
let user_a_repo_dir_copy = user_a_repo_dir.join("repo_a");
let user_a_repo =
repositories::clone_url(&remote_repo.remote.url, &user_a_repo_dir_copy).await?;
test::run_empty_dir_test_async(|user_b_repo_dir| async move {
let user_b_repo_dir_copy = user_b_repo_dir.join("repo_b");
let user_b_repo =
repositories::clone_url(&remote_repo.remote.url, &user_b_repo_dir_copy)
.await?;
let new_file = "new_file.txt";
let new_file_path = user_b_repo.path.join(new_file);
test::write_txt_file_to_path(&new_file_path, "hello from user b file")?;
repositories::add(&user_b_repo, &new_file_path).await?;
repositories::commit(&user_b_repo, "Adding new file")?;
repositories::push(&user_b_repo).await?;
Ok(())
})
.await?;
let new_file = "new_file.txt";
let new_file_path = user_a_repo.path.join(new_file);
test::write_txt_file_to_path(&new_file_path, "hello from user a file")?;
let result = repositories::pull(&user_a_repo).await;
assert!(result.is_err());
let content = util::fs::read_from_path(&new_file_path)?;
assert_eq!(content, "hello from user a file");
let result = repositories::pull(&user_a_repo).await;
assert!(result.is_err());
Ok(())
})
.await?;
Ok(remote_repo_copy)
})
.await
}
#[tokio::test]
async fn test_pull_does_not_overwrite_modified_files_after_remote_modification()
-> Result<(), OxenError> {
test::run_select_data_sync_remote("README.md", |_, remote_repo| async move {
let remote_repo_copy = remote_repo.clone();
test::run_empty_dir_test_async(|user_a_repo_dir| async move {
let user_a_repo_dir_copy = user_a_repo_dir.join("repo_a");
let user_a_repo =
repositories::clone_url(&remote_repo.remote.url, &user_a_repo_dir_copy).await?;
test::run_empty_dir_test_async(|user_b_repo_dir| async move {
let user_b_repo_dir_copy = user_b_repo_dir.join("repo_b");
let user_b_repo =
repositories::clone_url(&remote_repo.remote.url, &user_b_repo_dir_copy)
.await?;
let readme_path = user_b_repo.path.join("README.md");
test::write_txt_file_to_path(
&readme_path,
"Hello from another user b README :(",
)?;
repositories::add(&user_b_repo, &readme_path).await?;
repositories::commit(&user_b_repo, "Updating the README on the remote")?;
repositories::push(&user_b_repo).await?;
Ok(())
})
.await?;
let modified_file = "README.md";
let modified_file_path = user_a_repo.path.join(modified_file);
test::write_txt_file_to_path(&modified_file_path, "# User A README")?;
let result = repositories::pull(&user_a_repo).await;
assert!(result.is_err());
let content = util::fs::read_from_path(&modified_file_path)?;
assert_eq!(content, "# User A README");
Ok(())
})
.await?;
Ok(remote_repo_copy)
})
.await
}
#[tokio::test]
async fn test_pull_does_not_overwrite_modified_files_before_remote_modification()
-> Result<(), OxenError> {
test::run_select_data_sync_remote("README.md", |_, remote_repo| async move {
let remote_repo_copy = remote_repo.clone();
test::run_empty_dir_test_async(|user_a_repo_dir| async move {
let user_a_repo_dir_copy = user_a_repo_dir.join("repo_a");
let user_a_repo =
repositories::clone_url(&remote_repo.remote.url, &user_a_repo_dir_copy).await?;
let modified_file = "README.md";
let modified_file_path = user_a_repo.path.join(modified_file);
test::write_txt_file_to_path(&modified_file_path, "# User A README")?;
test::run_empty_dir_test_async(|user_b_repo_dir| async move {
let user_b_repo_dir_copy = user_b_repo_dir.join("repo_b");
let user_b_repo =
repositories::clone_url(&remote_repo.remote.url, &user_b_repo_dir_copy)
.await?;
let readme_path = user_b_repo.path.join("README.md");
test::write_txt_file_to_path(
&readme_path,
"Hello from another user b README :(",
)?;
repositories::add(&user_b_repo, &readme_path).await?;
repositories::commit(&user_b_repo, "Updating the README on the remote")?;
repositories::push(&user_b_repo).await?;
Ok(())
})
.await?;
let result = repositories::pull(&user_a_repo).await;
assert!(result.is_err());
let content = util::fs::read_from_path(&modified_file_path)?;
assert_eq!(content, "# User A README");
let result = repositories::pull(&user_a_repo).await;
assert!(result.is_err());
let content = util::fs::read_from_path(&modified_file_path)?;
assert_eq!(content, "# User A README");
Ok(())
})
.await?;
Ok(remote_repo_copy)
})
.await
}
#[tokio::test]
async fn test_pull_does_not_overwrite_modified_files_after_modifying_different_file()
-> Result<(), OxenError> {
test::run_select_data_sync_remote("README.md", |_, remote_repo| async move {
let remote_repo_copy = remote_repo.clone();
test::run_empty_dir_test_async(|user_a_repo_dir| async move {
let user_a_repo_dir_copy = user_a_repo_dir.join("repo_a");
let user_a_repo =
repositories::clone_url(&remote_repo.remote.url, &user_a_repo_dir_copy).await?;
test::run_empty_dir_test_async(|user_b_repo_dir| async move {
let user_b_repo_dir_copy = user_b_repo_dir.join("repo_b");
let user_b_repo =
repositories::clone_url(&remote_repo.remote.url, &user_b_repo_dir_copy)
.await?;
let new_file = "new_file.txt";
let new_file_path = user_b_repo.path.join(new_file);
test::write_txt_file_to_path(
&new_file_path,
"Hello from another user b new file",
)?;
repositories::add(&user_b_repo, &new_file_path).await?;
repositories::commit(&user_b_repo, "Updating the new file on the remote")?;
repositories::push(&user_b_repo).await?;
Ok(())
})
.await?;
let modified_file = "README.md";
let modified_file_path = user_a_repo.path.join(modified_file);
test::write_txt_file_to_path(&modified_file_path, "# User A README")?;
let result = repositories::pull(&user_a_repo).await;
assert!(result.is_ok());
let content = util::fs::read_from_path(&modified_file_path)?;
assert_eq!(content, "# User A README");
let result = repositories::pull(&user_a_repo).await;
assert!(result.is_ok());
let content = util::fs::read_from_path(&modified_file_path)?;
assert_eq!(content, "# User A README");
Ok(())
})
.await?;
Ok(remote_repo_copy)
})
.await
}
#[tokio::test]
async fn test_pull_does_not_overwrite_modified_files_after_removing_file()
-> Result<(), OxenError> {
test::run_select_data_sync_remote("README.md", |_, remote_repo| async move {
let remote_repo_copy = remote_repo.clone();
test::run_empty_dir_test_async(|user_a_repo_dir| async move {
let user_a_repo_dir_copy = user_a_repo_dir.join("repo_a");
let user_a_repo =
repositories::clone_url(&remote_repo.remote.url, &user_a_repo_dir_copy).await?;
test::run_empty_dir_test_async(|user_b_repo_dir| async move {
let user_b_repo_dir_copy = user_b_repo_dir.join("repo_b");
let user_b_repo =
repositories::clone_url(&remote_repo.remote.url, &user_b_repo_dir_copy)
.await?;
let rm_opts = RmOpts {
path: PathBuf::from("README.md"),
..Default::default()
};
repositories::rm(&user_b_repo, &rm_opts).await?;
repositories::commit(&user_b_repo, "Removing the README.md on the remote")?;
repositories::push(&user_b_repo).await?;
Ok(())
})
.await?;
let modified_file = "README.md";
let modified_file_path = user_a_repo.path.join(modified_file);
test::write_txt_file_to_path(&modified_file_path, "# User A README")?;
let result = repositories::pull(&user_a_repo).await;
assert!(result.is_err());
let content = util::fs::read_from_path(&modified_file_path)?;
assert_eq!(content, "# User A README");
let result = repositories::pull(&user_a_repo).await;
assert!(result.is_err());
let content = util::fs::read_from_path(&modified_file_path)?;
assert_eq!(content, "# User A README");
Ok(())
})
.await?;
Ok(remote_repo_copy)
})
.await
}
#[tokio::test]
async fn test_subtree_clone_branch_push_pull() -> Result<(), OxenError> {
test::run_training_data_fully_sync_remote(|_, remote_repo| async move {
let remote_repo_copy = remote_repo.clone();
test::run_empty_dir_test_async(|repo_dir| async move {
let repo_dir = repo_dir.join("subtree_repo");
let mut clone_opts = CloneOpts::new(&remote_repo.remote.url, &repo_dir);
clone_opts.fetch_opts.subtree_paths = Some(vec![PathBuf::from("train")]);
let repo = repositories::clone(&clone_opts).await?;
assert!(repo.path.join("train").exists());
assert!(!repo.path.join("test").exists());
let branch_name = "branch1";
repositories::branches::create_checkout(&repo, branch_name)?;
let dir1 = repo.path.join("dir1");
util::fs::create_dir_all(&dir1)?;
let new_file = dir1.join("newfile.txt");
test::write_txt_file_to_path(&new_file, "This is a new file")?;
repositories::add(&repo, &new_file).await?;
repositories::commit(&repo, "Adding new file in dir1")?;
let opts = PushOpts {
remote: constants::DEFAULT_REMOTE_NAME.to_string(),
branch: branch_name.to_string(),
..Default::default()
};
repositories::push::push_remote_branch(&repo, &opts).await?;
let pull_result = repositories::pull_remote_branch(
&repo,
&FetchOpts {
remote: constants::DEFAULT_REMOTE_NAME.to_string(),
branch: "main".to_string(),
all: false,
subtree_paths: Some(vec![PathBuf::from("train")]),
depth: None,
..FetchOpts::new()
},
)
.await;
pull_result?;
assert!(repo.path.join("train").exists());
assert!(repo.path.join("dir1").join("newfile.txt").exists());
Ok(())
})
.await?;
Ok(remote_repo_copy)
})
.await
}
#[tokio::test]
async fn test_pull_and_merge_non_empty_repos() -> Result<(), OxenError> {
test::run_remote_created_and_readme_remote_repo_test(|remote_repo| async move {
let remote_repo_clone = remote_repo.clone();
test::run_empty_dir_test_async(|dir| async move {
let mut local_repo = LocalRepository::new(dir, None)?;
let hello_file = local_repo.path.join("dir").join("hello.txt");
util::fs::write_to_path(
&hello_file,
"Oxen.ai is the best data version control system.",
)?;
repositories::add(&local_repo, &hello_file).await?;
repositories::commit(&local_repo, "Hello!")?;
let url = remote_repo_clone.url();
command::config::set_remote(&mut local_repo, "origin", url)?;
repositories::pull(&local_repo).await?;
assert!(hello_file.exists());
assert!(PathBuf::from("README.md").exists());
let commits = repositories::commits::list_all(&local_repo)?;
assert_eq!(commits.len(), 3);
Ok(())
})
.await?;
Ok(remote_repo)
})
.await
}
#[tokio::test]
async fn test_pull_missing_files_redownloads_deleted_versions() -> Result<(), OxenError> {
test::run_one_commit_sync_repo_test(|_local_repo, remote_repo| async move {
let remote_repo_copy = remote_repo.clone();
test::run_empty_dir_test_async(|repo_dir| async move {
let repo_dir = repo_dir.join("cloned_repo");
let cloned_repo =
repositories::clone_url(&remote_repo.remote.url, &repo_dir).await?;
let version_store = cloned_repo.version_store();
let versions = version_store.list_versions().await?;
assert!(
!versions.is_empty(),
"Cloned repo should have version files"
);
let deleted_hash = versions[0].clone();
version_store.delete_version(&deleted_hash).await?;
assert!(
!version_store.version_exists(&deleted_hash).await?,
"Version file should be deleted"
);
repositories::pull(&cloned_repo).await?;
assert!(
!version_store.version_exists(&deleted_hash).await?,
"Normal pull should not re-download deleted version"
);
let fetch_opts = FetchOpts {
missing_files: true,
..FetchOpts::new()
};
repositories::pull_remote_branch(&cloned_repo, &fetch_opts).await?;
assert!(
version_store.version_exists(&deleted_hash).await?,
"pull --missing-files should re-download deleted version"
);
Ok(())
})
.await?;
Ok(remote_repo_copy)
})
.await
}
}