use crate::api::client;
use crate::constants::{DEFAULT_PAGE_NUM, DIR_HASHES_DIR, DIRS_DIR, HISTORY_DIR};
use crate::error::OxenError;
use crate::model::commit::CommitWithBranchName;
use crate::model::entry::unsynced_commit_entry::UnsyncedCommitEntries;
use crate::model::{Branch, Commit, CommitEntry, LocalRepository, MerkleHash, RemoteRepository};
use crate::opts::PaginateOpts;
use crate::util::hasher::hash_buffer;
use crate::util::progress_bar::{ProgressBarType, oxify_bar};
use crate::view::tree::merkle_hashes::MerkleHashes;
use crate::{api, constants, repositories};
use crate::{current_function, util};
use crate::view::entries::ListCommitEntryResponse;
use crate::view::{
CommitResponse, ListCommitResponse, MerkleHashesResponse, PaginatedCommits, RootCommitResponse,
StatusMessage,
};
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::str;
use std::sync::Arc;
use async_compression::futures::bufread::GzipDecoder;
use async_tar::Archive;
use bytesize::ByteSize;
use flate2::Compression;
use flate2::write::GzEncoder;
use futures_util::TryStreamExt;
use http::header::CONTENT_LENGTH;
use indicatif::{ProgressBar, ProgressStyle};
/// Describes where one chunk sits inside a larger chunked upload.
///
/// The values are sent as query parameters of the `/commits/upload_chunk`
/// request so the receiving side knows how the pieces fit together.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ChunkParams {
    /// Position of this chunk in the sequence (the chunked uploader in this
    /// file numbers chunks starting at 0).
    pub chunk_num: usize,
    /// Number of chunks the full buffer was split into.
    pub total_chunks: usize,
    /// Size in bytes of the full, un-chunked buffer.
    pub total_size: usize,
}
/// Looks up a single commit on the remote by its id.
///
/// Returns `Ok(None)` when the server answers with HTTP 404 and
/// `Ok(Some(commit))` when the body deserializes into a `CommitResponse`.
///
/// # Errors
/// Fails when the URL cannot be built, the request cannot be sent,
/// `client::parse_json_body` rejects the response, or the JSON does not match
/// `CommitResponse`.
pub async fn get_by_id(
    repository: &RemoteRepository,
    commit_id: impl AsRef<str>,
) -> Result<Option<Commit>, OxenError> {
    let id = commit_id.as_ref();
    let url = api::endpoint::url_from_repo(repository, &format!("/commits/{id}"))?;
    log::debug!("remote::commits::get_by_id {url}");

    let http = client::new_for_url(&url)?;
    let res = http.get(&url).send().await?;
    // A missing commit is an expected outcome, not an error.
    if res.status() == 404 {
        return Ok(None);
    }

    let body = client::parse_json_body(&url, res).await?;
    log::debug!("api::client::commits::get_by_id Got response {body}");

    serde_json::from_str::<CommitResponse>(&body)
        .map(|parsed| Some(parsed.commit))
        .map_err(|err| {
            OxenError::basic_str(format!(
                "get_commit_by_id() Could not deserialize response [{err}]\n{body}"
            ))
        })
}
/// Fetches one page of the commit history that touched `path` at `revision`.
///
/// The path is rendered with `to_string_lossy` and placed verbatim in the
/// request URI together with the page number and page size from `page_opts`.
///
/// # Errors
/// Fails when the request cannot be built or sent, the response is rejected
/// by `client::parse_json_body`, or the body is not a `PaginatedCommits`.
pub async fn list_commits_for_path(
    remote_repo: &RemoteRepository,
    revision: impl AsRef<str>,
    path: impl AsRef<Path>,
    page_opts: &PaginateOpts,
) -> Result<PaginatedCommits, OxenError> {
    let uri = format!(
        "/commits/history/{}/{}?page={}&page_size={}",
        revision.as_ref(),
        path.as_ref().to_string_lossy(),
        page_opts.page_num,
        page_opts.page_size
    );
    let url = api::endpoint::url_from_repo(remote_repo, &uri)?;

    let http = client::new_for_url(&url)?;
    let res = http.get(&url).send().await?;
    let body = client::parse_json_body(&url, res).await?;

    serde_json::from_str::<PaginatedCommits>(&body).map_err(|err| {
        OxenError::basic_str(format!(
            "list_commits_for_file() Could not deserialize response [{err}]\n{body}"
        ))
    })
}
/// Asks the remote which of the given commits it reports back for
/// `/commits/missing`.
///
/// The hashes of `commits` are posted to the server; the returned vector
/// contains exactly those input commits whose hash appears in the server's
/// response, in their original order.
///
/// # Errors
/// Fails when a commit id cannot be turned into a `MerkleHash`, when the
/// request cannot be built or sent, when `client::parse_json_body` rejects the
/// response, or when the body is not a `MerkleHashesResponse`.
pub async fn list_missing_hashes(
    remote_repo: &RemoteRepository,
    commits: Vec<Commit>,
) -> Result<Vec<Commit>, OxenError> {
    let url = api::endpoint::url_from_repo(remote_repo, "/commits/missing")?;
    let client = client::new_for_url(&url)?;

    // Surface an unparsable commit id as an error instead of panicking.
    let commit_hashes = commits
        .iter()
        .map(|c| {
            c.hash().map_err(|err| {
                OxenError::basic_str(format!(
                    "api::client::commits::list_missing_hashes() invalid hash for commit {}: {err}",
                    c.id
                ))
            })
        })
        .collect::<Result<HashSet<MerkleHash>, OxenError>>()?;

    let res = client
        .post(&url)
        .json(&MerkleHashes {
            hashes: commit_hashes,
        })
        .send()
        .await?;
    let body = client::parse_json_body(&url, res).await?;

    match serde_json::from_str::<MerkleHashesResponse>(&body) {
        Ok(response) => {
            let hashes = response.hashes;
            // `commits` is owned, so filter by move rather than cloning each match.
            Ok(commits
                .into_iter()
                .filter(|c| c.hash().is_ok_and(|h| hashes.contains(&h)))
                .collect())
        }
        Err(err) => Err(OxenError::basic_str(format!(
            "api::client::tree::list_missing_hashes() Could not deserialize response [{err}]\n{body}"
        ))),
    }
}
/// Lists the commit entries the remote reports for `/commits/missing_files`.
///
/// When `base_commit` is provided its id is sent as the `base` query
/// parameter alongside `head`; otherwise only `head` is sent.
///
/// # Errors
/// Fails when the request cannot be built or sent, the response is rejected
/// by `client::parse_json_body`, or the body is not a
/// `ListCommitEntryResponse`.
pub async fn list_missing_files(
    remote_repo: &RemoteRepository,
    base_commit: Option<Commit>,
    head_commit_id: &str,
) -> Result<Vec<CommitEntry>, OxenError> {
    let uri = match base_commit {
        Some(base) => format!(
            "/commits/missing_files?base={}&head={head_commit_id}",
            base.id
        ),
        None => format!("/commits/missing_files?head={head_commit_id}"),
    };
    let url = crate::api::endpoint::url_from_repo(remote_repo, &uri)?;

    let http = client::new_for_url(&url)?;
    let res = http.get(&url).send().await?;
    let body = client::parse_json_body(&url, res).await?;

    serde_json::from_str::<ListCommitEntryResponse>(&body)
        .map(|parsed| parsed.entries)
        .map_err(|err| {
            OxenError::basic_str(format!(
                "api::client::commits::list_missing_files() Could not deserialize response [{err}]\n{body}"
            ))
        })
}
/// Posts a set of commit hashes to `/commits/mark_commits_as_synced`.
///
/// The response body must deserialize into a `MerkleHashesResponse`; its
/// contents are otherwise ignored.
///
/// # Errors
/// Fails when the request cannot be built or sent, the response is rejected
/// by `client::parse_json_body`, or the body is not a `MerkleHashesResponse`.
pub async fn mark_commits_as_synced(
    remote_repo: &RemoteRepository,
    commit_hashes: HashSet<MerkleHash>,
) -> Result<(), OxenError> {
    let url = api::endpoint::url_from_repo(remote_repo, "/commits/mark_commits_as_synced")?;
    let client = client::new_for_url(&url)?;
    let res = client
        .post(&url)
        .json(&MerkleHashes {
            hashes: commit_hashes,
        })
        .send()
        .await?;
    let body = client::parse_json_body(&url, res).await?;

    match serde_json::from_str::<MerkleHashesResponse>(&body) {
        Ok(_response) => Ok(()),
        // Name this function in the message so failures are attributed correctly.
        Err(err) => Err(OxenError::basic_str(format!(
            "api::client::commits::mark_commits_as_synced() Could not deserialize response [{err}]\n{body}"
        ))),
    }
}
/// Collects the complete commit history for `revision` by walking every page
/// returned by [`list_commit_history_paginated`].
///
/// Pages are requested 100 commits at a time starting at `DEFAULT_PAGE_NUM`.
/// A progress bar is advanced per page and cleared once the last page has
/// been read.
///
/// # Errors
/// Returns the first error produced while fetching a page.
pub async fn list_commit_history(
    remote_repo: &RemoteRepository,
    revision: &str,
) -> Result<Vec<Commit>, OxenError> {
    let mut collected: Vec<Commit> = Vec::new();
    let mut page_num = DEFAULT_PAGE_NUM;
    let progress = Arc::new(ProgressBar::new_spinner());
    progress.set_style(ProgressStyle::default_spinner());

    loop {
        let page_opts = PaginateOpts {
            page_num,
            page_size: 100,
        };
        let page = list_commit_history_paginated(remote_repo, revision, &page_opts).await?;

        // The total is only known once the first page has arrived.
        if page_num == DEFAULT_PAGE_NUM {
            let counter = oxify_bar(progress.clone(), ProgressBarType::Counter);
            counter.set_length(page.pagination.total_entries as u64);
        }

        let fetched = page.commits.len();
        collected.extend(page.commits);
        progress.inc(fetched as u64);

        if page_num >= page.pagination.total_pages {
            break;
        }
        page_num += 1;
    }

    progress.finish_and_clear();
    Ok(collected)
}
/// Fetches a single page of the commit history for `revision`.
///
/// # Errors
/// Fails when the URL or client cannot be built, when sending the request
/// fails (reported as "Request failed"), when `client::parse_json_body`
/// rejects the response, or when the body is not a `PaginatedCommits`.
pub async fn list_commit_history_paginated(
    remote_repo: &RemoteRepository,
    revision: &str,
    page_opts: &PaginateOpts,
) -> Result<PaginatedCommits, OxenError> {
    let uri = format!(
        "/commits/history/{revision}?page={}&page_size={}",
        page_opts.page_num, page_opts.page_size
    );
    let url = api::endpoint::url_from_repo(remote_repo, &uri)?;
    let http = client::new_for_url(&url)?;

    let res = http.get(&url).send().await.map_err(|err| {
        OxenError::basic_str(format!("list_commit_history() Request failed: {err}"))
    })?;
    let body = client::parse_json_body(&url, res).await?;

    serde_json::from_str::<PaginatedCommits>(&body).map_err(|err| {
        OxenError::basic_str(format!(
            "list_commit_history() Could not deserialize response [{err}]\n{body}"
        ))
    })
}
/// Requests `/commits/root` and returns the `commit` field of the
/// `RootCommitResponse`, which is optional.
///
/// # Errors
/// Fails when the URL or client cannot be built, when sending the request
/// fails (the underlying cause is included in the message), when
/// `client::parse_json_body` rejects the response, or when the body is not a
/// `RootCommitResponse`.
pub async fn root_commit_maybe(
    remote_repo: &RemoteRepository,
) -> Result<Option<Commit>, OxenError> {
    let url = api::endpoint::url_from_repo(remote_repo, "/commits/root")?;
    log::debug!("remote::commits::root_commit {url}");

    let client = client::new_for_url(&url)?;
    // Keep the transport error in the message rather than discarding it.
    let res = client.get(&url).send().await.map_err(|err| {
        OxenError::basic_str(format!("root_commit() Request failed: {err}"))
    })?;

    let body = client::parse_json_body(&url, res).await?;
    log::debug!("api::client::commits::root_commit Got response {body}");

    serde_json::from_str::<RootCommitResponse>(&body)
        .map(|parsed| parsed.commit)
        .map_err(|err| {
            OxenError::basic_str(format!(
                "root_commit() Could not deserialize response [{err}]\n{body}"
            ))
        })
}
/// Downloads the dir-hashes archive for one commit and unpacks it at `path`.
///
/// Builds the `/commits/{commit_id}/download_dir_hashes_db` URL and hands the
/// transfer to [`download_dir_hashes_from_url`].
#[tracing::instrument(skip(remote_repo, path), fields(commit_id))]
pub async fn download_dir_hashes_from_commit(
    remote_repo: &RemoteRepository,
    commit_id: &str,
    path: impl AsRef<Path>,
) -> Result<PathBuf, OxenError> {
    let url = api::endpoint::url_from_repo(
        remote_repo,
        &format!("/commits/{commit_id}/download_dir_hashes_db"),
    )?;
    log::debug!("calling download_dir_hashes_from_commit for commit {commit_id}");
    download_dir_hashes_from_url(url, path).await
}
/// Downloads the dir-hashes archive for the `base..head` commit range and
/// unpacks it at `path`.
///
/// Builds the `/commits/{base}..{head}/download_dir_hashes_db` URL and hands
/// the transfer to [`download_dir_hashes_from_url`].
#[tracing::instrument(skip(remote_repo, path), fields(base_commit_id, head_commit_id))]
pub async fn download_base_head_dir_hashes(
    remote_repo: &RemoteRepository,
    base_commit_id: &str,
    head_commit_id: &str,
    path: impl AsRef<Path>,
) -> Result<PathBuf, OxenError> {
    let url = api::endpoint::url_from_repo(
        remote_repo,
        &format!("/commits/{base_commit_id}..{head_commit_id}/download_dir_hashes_db"),
    )?;
    log::debug!(
        "calling download_base_head_dir_hashes for commits {base_commit_id}..{head_commit_id}"
    );
    download_dir_hashes_from_url(url, path).await
}
pub async fn download_dir_hashes_from_url(
url: impl AsRef<str>,
path: impl AsRef<Path>,
) -> Result<PathBuf, OxenError> {
let url = url.as_ref();
log::debug!("{} downloading from {}", current_function!(), url);
let client = client::new_for_url(url)?;
match client.get(url).send().await {
Ok(res) => {
let path = path.as_ref();
let reader = res
.bytes_stream()
.map_err(futures::io::Error::other)
.into_async_read();
let decoder = GzipDecoder::new(futures::io::BufReader::new(reader));
let archive = Archive::new(decoder);
let full_unpacked_path = path;
log::debug!("unpacking to {full_unpacked_path:?}");
util::fs::unpack_async_tar_archive(archive, full_unpacked_path).await?;
log::debug!("archive_result for url {url} is ok");
Ok(path.to_path_buf())
}
Err(err) => {
let error = format!("Error fetching commit db: {err}");
Err(OxenError::basic_str(error))
}
}
}
#[tracing::instrument(skip(remote_repo, path), fields(commit_id))]
pub async fn download_dir_hashes_db_to_path(
remote_repo: &RemoteRepository,
commit_id: &str,
path: impl AsRef<Path>,
) -> Result<PathBuf, OxenError> {
let uri = format!("/commits/{commit_id}/commit_db");
let url = api::endpoint::url_from_repo(remote_repo, &uri)?;
log::debug!("calling download_dir_hashes_db_to_path for commit {commit_id}");
log::debug!("{} downloading from {}", current_function!(), url);
let client = client::new_for_url(&url)?;
match client.get(url).send().await {
Ok(res) => {
let path = path.as_ref();
let reader = res
.bytes_stream()
.map_err(futures::io::Error::other)
.into_async_read();
let decoder = GzipDecoder::new(futures::io::BufReader::new(reader));
let archive = Archive::new(decoder);
let full_unpacked_path = path.join(HISTORY_DIR).join(commit_id);
let tmp_path = path.join("tmp").join(commit_id).join("commits_db");
util::fs::create_dir_all(&tmp_path)?;
util::fs::unpack_async_tar_archive(archive, &tmp_path).await?;
log::debug!("archive_result for commit {commit_id:?} is ok");
if full_unpacked_path.exists() {
log::debug!(
"{} removing existing {:?}",
current_function!(),
full_unpacked_path
);
util::fs::remove_dir_all(&full_unpacked_path)?;
} else {
log::debug!("{} creating {:?}", current_function!(), full_unpacked_path);
if let Some(parent) = full_unpacked_path.parent() {
util::fs::create_dir_all(parent)?;
} else {
log::error!(
"{} no parent found for {:?}",
current_function!(),
full_unpacked_path
);
}
}
log::debug!("renaming {tmp_path:?} to {full_unpacked_path:?}");
util::fs::rename(
tmp_path.join(HISTORY_DIR).join(commit_id),
&full_unpacked_path,
)?;
log::debug!("{} writing to {:?}", current_function!(), path);
Ok(path.to_path_buf())
}
Err(err) => {
let error = format!("Error fetching commit db: {err}");
Err(OxenError::basic_str(error))
}
}
}
/// Posts to `/commits/{commit_id}/complete` with a JSON body naming the branch
/// and the commit id.
///
/// # Errors
/// Fails when the URL or client cannot be built, when sending the request
/// fails (the underlying cause is included in the message), when
/// `client::parse_json_body` rejects the response, or when the body is not a
/// `StatusMessage`.
pub async fn post_push_complete(
    remote_repo: &RemoteRepository,
    branch: &Branch,
    commit_id: impl AsRef<str>,
) -> Result<(), OxenError> {
    use serde_json::json;

    let commit_id = commit_id.as_ref();
    let uri = format!("/commits/{commit_id}/complete");
    let url = api::endpoint::url_from_repo(remote_repo, &uri)?;
    log::debug!("post_push_complete: {url}");

    // `Value`'s `Display` renders compact JSON, so no fallible serialization
    // step (and no `unwrap`) is needed.
    let payload = json!({
        "branch": {
            "name": branch.name,
            "commit_id": commit_id,
        }
    })
    .to_string();

    let client = client::new_for_url(&url)?;
    // Keep the transport error in the message rather than discarding it.
    let res = client.post(&url).body(payload).send().await.map_err(|err| {
        OxenError::basic_str(format!("post_push_complete() Request failed: {err}"))
    })?;

    let body = client::parse_json_body(&url, res).await?;
    match serde_json::from_str::<StatusMessage>(&body) {
        Ok(_) => Ok(()),
        Err(err) => Err(OxenError::basic_str(format!(
            "post_push_complete() Could not deserialize response [{err}]\n{body}"
        ))),
    }
}
/// Creates commit objects on the remote for every unsynced commit.
///
/// Each commit is paired with a size: the computed size of its entries plus
/// the on-disk size of its local history directory. If the directory size
/// cannot be read, a warning is logged and only the entries size is used.
/// The resulting list is sent through [`bulk_create_commit_obj_on_server`].
///
/// # Errors
/// Fails when computing an entries size fails or when the bulk request fails.
pub async fn post_commits_to_server(
    local_repo: &LocalRepository,
    remote_repo: &RemoteRepository,
    commits: &Vec<UnsyncedCommitEntries>,
    branch_name: String,
) -> Result<(), OxenError> {
    let commits_with_size = commits
        .iter()
        .map(|unsynced| -> Result<CommitWithBranchName, OxenError> {
            let commit_history_dir = util::fs::oxen_hidden_dir(&local_repo.path)
                .join(HISTORY_DIR)
                .join(&unsynced.commit.id);
            let entries_size = repositories::entries::compute_entries_size(&unsynced.entries)?;
            let size = fs_extra::dir::get_size(&commit_history_dir)
                .map(|dir_size| dir_size + entries_size)
                .unwrap_or_else(|err| {
                    log::warn!("Err {err}: {commit_history_dir:?}");
                    entries_size
                });
            Ok(CommitWithBranchName::from_commit(
                &unsynced.commit,
                size,
                branch_name.clone(),
            ))
        })
        .collect::<Result<Vec<CommitWithBranchName>, OxenError>>()?;

    bulk_create_commit_obj_on_server(remote_repo, &commits_with_size).await?;
    Ok(())
}
/// Packs the `DIRS_DIR` and `DIR_HASHES_DIR` folders of one commit's local
/// history directory into a gzip-compressed tarball and uploads it.
///
/// Folders that do not exist on disk are skipped. Inside the archive the
/// folders live under `HISTORY_DIR/{commit.id}/`. The upload goes through
/// [`post_data_to_server_with_client`] with a hidden progress bar.
///
/// # Errors
/// Fails when building the archive, creating the client, or uploading fails.
pub async fn post_commit_dir_hashes_to_server(
    local_repo: &LocalRepository,
    remote_repo: &RemoteRepository,
    commit: &Commit,
) -> Result<(), OxenError> {
    let commit_dir = util::fs::oxen_hidden_dir(&local_repo.path)
        .join(HISTORY_DIR)
        .join(&commit.id);
    let tar_subdir = Path::new(HISTORY_DIR).join(&commit.id);

    let mut archive = tar::Builder::new(GzEncoder::new(Vec::new(), Compression::default()));
    for dir in [DIRS_DIR, DIR_HASHES_DIR] {
        let full_path = commit_dir.join(dir);
        if full_path.exists() {
            archive.append_dir_all(tar_subdir.join(dir), full_path)?;
        }
    }
    archive.finish()?;
    let buffer: Vec<u8> = archive.into_inner()?.finish()?;

    let filename = None;
    let quiet_bar = Arc::new(ProgressBar::hidden());
    let client = client::new_for_remote_repo(remote_repo)?;
    post_data_to_server_with_client(&client, remote_repo, buffer, true, &filename, quiet_bar).await
}
/// Packs the `DIRS_DIR` and `DIR_HASHES_DIR` folders of every given commit
/// into one gzip-compressed tarball and uploads it.
///
/// For each commit, folders that do not exist on disk are skipped; the rest
/// are stored under `HISTORY_DIR/{commit.id}/` inside the archive. The upload
/// goes through [`post_data_to_server_with_client`] with a hidden progress bar.
///
/// # Errors
/// Fails when building the archive, creating the client, or uploading fails.
pub async fn post_commits_dir_hashes_to_server(
    local_repo: &LocalRepository,
    remote_repo: &RemoteRepository,
    commits: &Vec<Commit>,
) -> Result<(), OxenError> {
    let mut archive = tar::Builder::new(GzEncoder::new(Vec::new(), Compression::default()));

    for commit in commits.iter() {
        let commit_dir = util::fs::oxen_hidden_dir(&local_repo.path)
            .join(HISTORY_DIR)
            .join(&commit.id);
        let tar_subdir = Path::new(HISTORY_DIR).join(&commit.id);

        for dir in [DIRS_DIR, DIR_HASHES_DIR] {
            let full_path = commit_dir.join(dir);
            if full_path.exists() {
                archive.append_dir_all(tar_subdir.join(dir), full_path)?;
            }
        }
    }
    archive.finish()?;
    let buffer: Vec<u8> = archive.into_inner()?.finish()?;

    let filename = None;
    let quiet_bar = Arc::new(ProgressBar::hidden());
    let client = client::new_for_remote_repo(remote_repo)?;
    post_data_to_server_with_client(&client, remote_repo, buffer, true, &filename, quiet_bar).await
}
/// Posts the given commits as JSON to `/commits/bulk` and returns the parsed
/// `ListCommitResponse`.
///
/// # Errors
/// Fails when the URL or client cannot be built, when sending the request
/// fails, when `client::parse_json_body` rejects the response, or when the
/// body is not a `ListCommitResponse`. The send and deserialization failures
/// include the underlying cause in the message.
pub async fn bulk_create_commit_obj_on_server(
    remote_repo: &RemoteRepository,
    commits: &Vec<CommitWithBranchName>,
) -> Result<ListCommitResponse, OxenError> {
    let url = api::endpoint::url_from_repo(remote_repo, "/commits/bulk")?;
    log::debug!("bulk_create_commit_obj_on_server {url}\n{commits:?}");

    let client = client::new_for_url(&url)?;
    // Keep the transport error in the message rather than discarding it.
    let res = client
        .post(&url)
        .json(commits)
        .send()
        .await
        .map_err(|err| {
            OxenError::basic_str(format!(
                "bulk_create_commit_obj_on_server error sending data from file: {err}"
            ))
        })?;

    let body = client::parse_json_body(&url, res).await?;
    log::debug!("bulk_create_commit_obj_on_server got response {body}");

    serde_json::from_str::<ListCommitResponse>(&body).map_err(|err| {
        OxenError::basic_str(format!(
            "bulk_create_commit_obj_on_server Err deserializing [{err}]\n\n{body}"
        ))
    })
}
/// Uploads `buffer`, picking the strategy by size.
///
/// Buffers no larger than `constants::AVG_CHUNK_SIZE` are sent as a single
/// tarball (with retries) and advance `bar`; larger buffers are split and
/// sent chunk by chunk, in which case `bar` is not used.
///
/// # Errors
/// Propagates whatever error the selected upload path returns.
pub async fn post_data_to_server_with_client(
    client: &reqwest::Client,
    remote_repo: &RemoteRepository,
    buffer: Vec<u8>,
    is_compressed: bool,
    filename: &Option<String>,
    bar: Arc<ProgressBar>,
) -> Result<(), OxenError> {
    let chunk_size = constants::AVG_CHUNK_SIZE as usize;

    // Small payloads fit in one request.
    if buffer.len() <= chunk_size {
        return upload_single_tarball_to_server_with_client_with_retry(
            client,
            remote_repo,
            &buffer,
            bar,
        )
        .await;
    }

    upload_data_to_server_in_chunks_with_client(
        client,
        remote_repo,
        &buffer,
        chunk_size,
        is_compressed,
        filename,
    )
    .await
}
/// Uploads `buffer` as a single tarball, retrying on failure.
///
/// Makes up to `constants::NUM_HTTP_RETRIES` attempts. After failed attempt
/// `n` (1-based) it waits `n * n` seconds before the next one. No wait
/// happens after the final attempt, since nothing follows it.
///
/// # Errors
/// Once every attempt has failed, returns an error whose message includes the
/// last underlying failure.
pub async fn upload_single_tarball_to_server_with_client_with_retry(
    client: &reqwest::Client,
    remote_repo: &RemoteRepository,
    buffer: &[u8],
    bar: Arc<ProgressBar>,
) -> Result<(), OxenError> {
    let mut last_error = String::new();
    for attempt in 1..=constants::NUM_HTTP_RETRIES {
        match upload_single_tarball_to_server_with_client(
            client,
            remote_repo,
            buffer,
            bar.clone(),
        )
        .await
        {
            Ok(_) => return Ok(()),
            Err(err) => {
                last_error = format!("{err}");
                if attempt < constants::NUM_HTTP_RETRIES {
                    let sleep_time = attempt * attempt;
                    log::debug!(
                        "upload_single_tarball_to_server_with_retry upload failed sleeping {sleep_time}: {err:?}"
                    );
                    // NOTE(review): this is a blocking sleep inside an async fn and
                    // stalls the executor thread; consider an async timer.
                    std::thread::sleep(std::time::Duration::from_secs(sleep_time));
                } else {
                    log::debug!(
                        "upload_single_tarball_to_server_with_retry upload failed on final attempt: {err:?}"
                    );
                }
            }
        }
    }
    Err(OxenError::basic_str(format!(
        "Upload retry failed. {last_error}"
    )))
}
/// Posts `buffer` in one request to `/commits/upload`.
///
/// On a successfully parsed `StatusMessage` the progress bar is advanced by
/// the number of bytes sent.
///
/// # Errors
/// Fails when the request cannot be built or sent, when
/// `client::parse_json_body` rejects the response, or when the body is not a
/// `StatusMessage`.
async fn upload_single_tarball_to_server_with_client(
    client: &reqwest::Client,
    remote_repo: &RemoteRepository,
    buffer: &[u8],
    bar: Arc<ProgressBar>,
) -> Result<StatusMessage, OxenError> {
    let url = api::endpoint::url_from_repo(remote_repo, "/commits/upload")?;
    let sent_bytes = buffer.len() as u64;

    let res = client.post(&url).body(buffer.to_owned()).send().await?;
    let body = client::parse_json_body(&url, res).await?;

    let status = serde_json::from_str::<StatusMessage>(&body).map_err(|_| {
        OxenError::basic_str(format!(
            "upload_single_tarball_to_server Err deserializing \n\n{body}"
        ))
    })?;
    bar.inc(sent_bytes);
    Ok(status)
}
/// Splits `buffer` into pieces of at most `chunk_size` bytes and uploads each
/// one with [`upload_data_chunk_to_server_with_retry`].
///
/// Every chunk request carries the hash of the whole buffer, the chunk index,
/// the chunk count and the total size.
///
/// A chunk that still fails after its retries is logged and the loop carries
/// on with the next chunk; the function then returns `Ok(())`.
///
/// # Panics
/// `slice::chunks` panics when `chunk_size` is 0.
async fn upload_data_to_server_in_chunks_with_client(
    client: &reqwest::Client,
    remote_repo: &RemoteRepository,
    buffer: &[u8],
    chunk_size: usize,
    is_compressed: bool,
    filename: &Option<String>,
) -> Result<(), OxenError> {
    let total_size = buffer.len();
    log::debug!("upload_data_to_server_in_chunks chunking data {total_size} ...");

    // `Chunks` knows its exact length, so the chunks need not be collected.
    let total_chunks = buffer.chunks(chunk_size).len();
    let hash = hash_buffer(buffer);
    log::debug!(
        "upload_data_to_server_in_chunks got {} chunks from {}",
        total_chunks,
        ByteSize::b(total_size as u64)
    );

    for (chunk_num, chunk) in buffer.chunks(chunk_size).enumerate() {
        // Report the size of this chunk, not the number of chunks.
        log::debug!(
            "upload_data_to_server_in_chunks uploading chunk {} of size {}",
            chunk_num,
            ByteSize::b(chunk.len() as u64)
        );
        let params = ChunkParams {
            chunk_num,
            total_chunks,
            total_size,
        };
        // NOTE(review): a failed chunk is only logged, so the caller still gets
        // `Ok(())` — confirm this best-effort behaviour is intended.
        match upload_data_chunk_to_server_with_retry(
            client,
            remote_repo,
            chunk,
            &hash,
            &params,
            is_compressed,
            filename,
        )
        .await
        {
            Ok(_) => {
                log::debug!("Success uploading chunk!")
            }
            Err(err) => {
                log::error!("Err uploading chunk: {err}")
            }
        }
    }
    Ok(())
}
/// Uploads one chunk, retrying on failure.
///
/// Makes up to `constants::NUM_HTTP_RETRIES` attempts. After failed attempt
/// `n` (1-based) it waits `n * n` seconds before the next one. No wait
/// happens after the final attempt, since nothing follows it.
///
/// # Errors
/// Once every attempt has failed, returns an error whose message includes the
/// last underlying failure.
pub async fn upload_data_chunk_to_server_with_retry(
    client: &reqwest::Client,
    remote_repo: &RemoteRepository,
    chunk: &[u8],
    hash: &str,
    params: &ChunkParams,
    is_compressed: bool,
    filename: &Option<String>,
) -> Result<(), OxenError> {
    let mut last_error = String::new();
    for attempt in 1..=constants::NUM_HTTP_RETRIES {
        match upload_data_chunk_to_server(
            client,
            remote_repo,
            chunk,
            hash,
            params,
            is_compressed,
            filename,
        )
        .await
        {
            Ok(_) => return Ok(()),
            Err(err) => {
                last_error = format!("{err}");
                if attempt < constants::NUM_HTTP_RETRIES {
                    let sleep_time = attempt * attempt;
                    log::debug!(
                        "upload_data_chunk_to_server_with_retry upload failed sleeping {sleep_time}: {err}"
                    );
                    // NOTE(review): this is a blocking sleep inside an async fn and
                    // stalls the executor thread; consider an async timer.
                    std::thread::sleep(std::time::Duration::from_secs(sleep_time));
                } else {
                    log::debug!(
                        "upload_data_chunk_to_server_with_retry upload failed on final attempt: {err}"
                    );
                }
            }
        }
    }
    Err(OxenError::basic_str(format!(
        "Upload chunk retry failed. {last_error}"
    )))
}
/// Posts a single chunk to `/commits/upload_chunk`.
///
/// The chunk index, total size, buffer hash, chunk count and compression flag
/// travel as query parameters. For uncompressed uploads the URL-encoded
/// `filename` is appended as well.
///
/// # Panics
/// Panics when `is_compressed` is `false` and `filename` is `None`.
///
/// # Errors
/// Fails when the request cannot be built or sent, when
/// `client::parse_json_body` rejects the response, or when the body is not a
/// `StatusMessage`.
async fn upload_data_chunk_to_server(
    client: &reqwest::Client,
    remote_repo: &RemoteRepository,
    chunk: &[u8],
    hash: &str,
    params: &ChunkParams,
    is_compressed: bool,
    filename: &Option<String>,
) -> Result<StatusMessage, OxenError> {
    // Only uncompressed uploads carry a filename parameter.
    let filename_param = if is_compressed {
        String::new()
    } else {
        let name = filename
            .as_ref()
            .expect("Must provide filename if !compressed");
        format!("&filename={}", urlencoding::encode(name))
    };

    let chunk_num = params.chunk_num;
    let full_size = params.total_size;
    let total_chunks = params.total_chunks;
    let uri = format!(
        "/commits/upload_chunk?chunk_num={chunk_num}&total_size={full_size}&hash={hash}&total_chunks={total_chunks}&is_compressed={is_compressed}{filename_param}"
    );
    let url = api::endpoint::url_from_repo(remote_repo, &uri)?;

    let chunk_bytes = chunk.len() as u64;
    log::debug!(
        "upload_data_chunk_to_server posting {} to url {}",
        ByteSize::b(chunk_bytes),
        url
    );

    let res = client
        .post(&url)
        .header(CONTENT_LENGTH, chunk_bytes.to_string())
        .body(chunk.to_owned())
        .send()
        .await?;
    let body = client::parse_json_body(&url, res).await?;
    log::debug!("upload_data_chunk_to_server got response {body}");

    serde_json::from_str::<StatusMessage>(&body).map_err(|err| {
        OxenError::basic_str(format!(
            "upload_data_chunk_to_server Err deserializing: {err}"
        ))
    })
}
// Integration tests for the remote commits client. They run through the
// `crate::test` fixtures and talk to a remote repository.
#[cfg(test)]
mod tests {
use crate::api;
use crate::command;
use crate::constants;
use crate::constants::DEFAULT_BRANCH_NAME;
use crate::error::OxenError;
use crate::repositories;
use crate::test;
// After pushing a fully committed repo, the remote history for the default
// branch must contain as many commits as the local history.
#[tokio::test]
async fn test_list_remote_commits_all() -> Result<(), OxenError> {
test::run_training_data_repo_test_fully_committed_async(|local_repo| async move {
let mut local_repo = local_repo;
let commit_history = repositories::commits::list(&local_repo)?;
let num_local_commits = commit_history.len();
// Point the local repo at a remote named after its directory.
let name = local_repo.dirname();
let remote = test::repo_remote_url_from(&name);
command::config::set_remote(&mut local_repo, constants::DEFAULT_REMOTE_NAME, &remote)?;
let remote_repo = test::create_remote_repo(&local_repo).await?;
repositories::push(&local_repo).await?;
let remote_commits =
api::client::commits::list_commit_history(&remote_repo, DEFAULT_BRANCH_NAME)
.await?;
assert_eq!(remote_commits.len(), num_local_commits);
// Clean up the remote repository created for this test.
api::client::repositories::delete(&remote_repo).await?;
Ok(())
})
.await
}
// Listing history for a `base..head` revision built from local commits at
// indices 4 (base) and 1 (head) is expected to return exactly 4 commits.
#[tokio::test]
async fn test_list_remote_commits_base_head() -> Result<(), OxenError> {
test::run_training_data_fully_sync_remote(|local_repo, remote_repo| async move {
let commit_history = repositories::commits::list(&local_repo)?;
// The indices used below require at least 6 commits in the fixture.
assert!(commit_history.len() >= 6);
let head_commit = &commit_history[1];
let base_commit = &commit_history[4];
println!("base_commit: {base_commit}\nhead_commit: {head_commit}");
let revision = format!("{}..{}", base_commit.id, head_commit.id);
let remote_commits =
api::client::commits::list_commit_history(&remote_repo, &revision).await?;
for commit in remote_commits.iter() {
println!("got commit: {} -> {}", commit.id, commit.message);
}
assert_eq!(remote_commits.len(), 4);
Ok(remote_repo)
})
.await
}
// The head commit of the synced fixture is not reported as missing, while a
// commit created locally afterwards (and not pushed) is.
#[tokio::test]
async fn test_list_missing_hashes_filters_via_merkle_tree() -> Result<(), OxenError> {
test::run_one_commit_sync_repo_test(|local_repo, remote_repo| async move {
let commit = repositories::commits::head_commit(&local_repo)?;
let missing =
api::client::commits::list_missing_hashes(&remote_repo, vec![commit]).await?;
assert_eq!(missing.len(), 0);
// Create a new local-only commit.
let file_path = local_repo.path.join("test.txt");
let file_path = test::write_txt_file_to_path(file_path, "image,label\n1,2\n3,4\n5,6")?;
repositories::add(&local_repo, &file_path).await?;
let new_commit = repositories::commit(&local_repo, "test")?;
let missing =
api::client::commits::list_missing_hashes(&remote_repo, vec![new_commit.clone()])
.await?;
assert_eq!(missing.len(), 1);
assert!(missing.contains(&new_commit));
Ok(remote_repo)
})
.await
}
}