use std::collections::HashMap;
use std::pin::pin;
use async_trait::async_trait;
use clayers_xml::ContentHash;
use futures_core::Stream;
use crate::error::{Error, Result};
use crate::graph;
use crate::object::Object;
use crate::store::{ObjectStore, RefStore};
async fn try_collect_stream<S>(stream: S) -> Result<HashMap<ContentHash, Object>>
where
S: Stream<Item = Result<(ContentHash, Object)>>,
{
let mut stream = pin!(stream);
let mut map = HashMap::new();
while let Some(item) = std::future::poll_fn(|cx| stream.as_mut().poll_next(cx)).await {
let (hash, obj) = item?;
map.insert(hash, obj);
}
Ok(map)
}
#[async_trait]
pub trait RefConflict: Send + Sync {
async fn resolve(
&self,
store: &dyn ObjectStore,
ref_name: &str,
src_hash: ContentHash,
dst_hash: ContentHash,
) -> Result<bool>;
}
pub struct FastForwardOnly;
#[async_trait]
impl RefConflict for FastForwardOnly {
async fn resolve(
&self,
store: &dyn ObjectStore,
_ref_name: &str,
src_hash: ContentHash,
dst_hash: ContentHash,
) -> Result<bool> {
let lca = graph::common_ancestor(store, src_hash, dst_hash).await?;
if lca == Some(dst_hash) {
Ok(true)
} else {
Err(Error::Ref(
"cannot fast-forward: destination is not an ancestor of source".into(),
))
}
}
}
pub struct Overwrite;
#[async_trait]
impl RefConflict for Overwrite {
async fn resolve(
&self,
_store: &dyn ObjectStore,
_ref_name: &str,
_src_hash: ContentHash,
_dst_hash: ContentHash,
) -> Result<bool> {
Ok(true)
}
}
pub struct Reject;
#[async_trait]
impl RefConflict for Reject {
async fn resolve(
&self,
_store: &dyn ObjectStore,
_ref_name: &str,
_src_hash: ContentHash,
_dst_hash: ContentHash,
) -> Result<bool> {
Err(Error::Ref(
"destination ref already exists with a different value".into(),
))
}
}
pub async fn transfer_objects(
src: &dyn ObjectStore,
dst: &dyn ObjectStore,
root: ContentHash,
) -> Result<usize> {
let src_objects = try_collect_stream(src.subtree(&root)).await?;
let mut missing = Vec::new();
for (hash, obj) in &src_objects {
if !dst.contains(hash).await? {
missing.push((*hash, obj.clone()));
}
}
if missing.is_empty() {
return Ok(0);
}
let count = missing.len();
let mut tx = dst.transaction().await?;
for (hash, obj) in missing {
tx.put(hash, obj).await?;
}
tx.commit().await?;
Ok(count)
}
pub async fn sync_ref(
src_objects: &dyn ObjectStore,
src_refs: &dyn RefStore,
dst_objects: &dyn ObjectStore,
dst_refs: &dyn RefStore,
ref_name: &str,
on_conflict: &dyn RefConflict,
) -> Result<bool> {
let src_hash = src_refs
.get_ref(ref_name)
.await?
.ok_or_else(|| Error::Ref(format!("source ref not found: {ref_name}")))?;
let dst_hash = dst_refs.get_ref(ref_name).await?;
if let Some(dst_hash) = dst_hash {
if dst_hash == src_hash {
return Ok(false);
}
transfer_objects(src_objects, dst_objects, src_hash).await?;
let proceed = on_conflict
.resolve(dst_objects, ref_name, src_hash, dst_hash)
.await?;
if !proceed {
return Ok(false);
}
} else {
transfer_objects(src_objects, dst_objects, src_hash).await?;
}
dst_refs.set_ref(ref_name, src_hash).await?;
Ok(true)
}
pub async fn sync_refs(
src_objects: &dyn ObjectStore,
src_refs: &dyn RefStore,
dst_objects: &dyn ObjectStore,
dst_refs: &dyn RefStore,
prefix: &str,
on_conflict: &dyn RefConflict,
) -> Result<usize> {
let refs = src_refs.list_refs(prefix).await?;
let mut count = 0;
for (ref_name, _) in &refs {
let updated = sync_ref(
src_objects,
src_refs,
dst_objects,
dst_refs,
ref_name,
on_conflict,
)
.await?;
if updated {
count += 1;
}
}
Ok(count)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::object::{
Author, CommentObject, CommitObject, DocumentObject, ElementObject, PIObject, TagObject,
TextObject, TreeEntry, TreeObject,
};
use crate::store::memory::MemoryStore;
use chrono::Utc;
use proptest::prelude::*;
use tokio_stream::StreamExt as _;
fn author() -> Author {
Author {
name: "Test".into(),
email: "test@test.com".into(),
}
}
async fn build_commit(
store: &MemoryStore,
id: &[u8],
parents: Vec<ContentHash>,
) -> (ContentHash, ContentHash) {
let text_hash = ContentHash::from_canonical(id);
let text = TextObject {
content: String::from_utf8_lossy(id).into(),
};
let elem_id: Vec<u8> = id.iter().chain(b"elem").copied().collect();
let elem_hash = ContentHash::from_canonical(&elem_id);
let elem = ElementObject {
local_name: "root".into(),
namespace_uri: None,
namespace_prefix: None,
extra_namespaces: vec![],
attributes: vec![],
children: vec![text_hash],
inclusive_hash: elem_hash,
};
let doc_id: Vec<u8> = id.iter().chain(b"doc").copied().collect();
let doc_hash = ContentHash::from_canonical(&doc_id);
let doc = DocumentObject { root: elem_hash, prologue: vec![] };
let tree_id: Vec<u8> = id.iter().chain(b"tree").copied().collect();
let tree_hash = ContentHash::from_canonical(&tree_id);
let tree = TreeObject::new(vec![
TreeEntry { path: "doc.xml".into(), document: doc_hash },
]);
let commit_id: Vec<u8> = id.iter().chain(b"commit").copied().collect();
let commit_hash = ContentHash::from_canonical(&commit_id);
let commit = CommitObject {
tree: tree_hash,
parents,
author: author(),
timestamp: Utc::now(),
message: format!("commit {}", String::from_utf8_lossy(id)),
};
let mut tx = store.transaction().await.unwrap();
tx.put(text_hash, Object::Text(text)).await.unwrap();
tx.put(elem_hash, Object::Element(elem)).await.unwrap();
tx.put(doc_hash, Object::Document(doc)).await.unwrap();
tx.put(tree_hash, Object::Tree(tree)).await.unwrap();
tx.put(commit_hash, Object::Commit(commit)).await.unwrap();
tx.commit().await.unwrap();
(commit_hash, doc_hash)
}
#[tokio::test]
async fn sync_transfer_objects_copies_missing() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (commit_hash, _) = build_commit(&src, b"c1", vec![]).await;
let count = transfer_objects(&src, &dst, commit_hash).await.unwrap();
assert_eq!(count, 5);
assert!(dst.contains(&commit_hash).await.unwrap());
}
#[tokio::test]
async fn sync_transfer_idempotent() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (commit_hash, _) = build_commit(&src, b"c1", vec![]).await;
transfer_objects(&src, &dst, commit_hash).await.unwrap();
let second = transfer_objects(&src, &dst, commit_hash).await.unwrap();
assert_eq!(second, 0, "second transfer should copy 0 objects");
}
#[tokio::test]
async fn sync_ref_fast_forward() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
let (c2, _) = build_commit(&src, b"c2", vec![c1]).await;
transfer_objects(&src, &dst, c1).await.unwrap();
dst.set_ref("refs/heads/main", c1).await.unwrap();
src.set_ref("refs/heads/main", c2).await.unwrap();
let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &FastForwardOnly)
.await
.unwrap();
assert!(updated, "fast-forward should report ref was updated");
let dst_ref = dst.get_ref("refs/heads/main").await.unwrap();
assert_eq!(dst_ref, Some(c2));
}
#[tokio::test]
async fn sync_ref_fast_forward_rejects_diverged() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
let (c2, _) = build_commit(&src, b"c2", vec![c1]).await;
let (c3, _) = build_commit(&dst, b"c3", vec![c1]).await;
transfer_objects(&src, &dst, c1).await.unwrap();
src.set_ref("refs/heads/main", c2).await.unwrap();
dst.set_ref("refs/heads/main", c3).await.unwrap();
let result = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &FastForwardOnly).await;
assert!(result.is_err(), "should reject diverged histories");
}
#[tokio::test]
async fn sync_ref_overwrite_always_succeeds() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
let (c2, _) = build_commit(&src, b"c2", vec![]).await;
transfer_objects(&src, &dst, c1).await.unwrap();
dst.set_ref("refs/heads/main", c1).await.unwrap();
src.set_ref("refs/heads/main", c2).await.unwrap();
let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Overwrite)
.await
.unwrap();
assert!(updated, "overwrite should report ref was updated");
let dst_ref = dst.get_ref("refs/heads/main").await.unwrap();
assert_eq!(dst_ref, Some(c2));
}
#[tokio::test]
async fn sync_ref_reject_fails_when_different() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
let (c2, _) = build_commit(&src, b"c2", vec![]).await;
transfer_objects(&src, &dst, c1).await.unwrap();
dst.set_ref("refs/heads/main", c1).await.unwrap();
src.set_ref("refs/heads/main", c2).await.unwrap();
let result = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Reject).await;
assert!(result.is_err(), "should reject when refs differ");
}
#[tokio::test]
async fn sync_refs_with_prefix() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
let (c2, _) = build_commit(&src, b"c2", vec![]).await;
src.set_ref("refs/heads/main", c1).await.unwrap();
src.set_ref("refs/heads/feature", c2).await.unwrap();
let count = sync_refs(&src, &src, &dst, &dst, "refs/heads/", &Overwrite)
.await
.unwrap();
assert_eq!(count, 2);
assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c1));
assert_eq!(dst.get_ref("refs/heads/feature").await.unwrap(), Some(c2));
}
#[tokio::test]
async fn sync_ref_missing_src_ref_errors() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let result =
sync_ref(&src, &src, &dst, &dst, "refs/heads/missing", &Overwrite).await;
assert!(result.is_err(), "missing source ref should error");
}
#[tokio::test]
async fn sync_ref_already_up_to_date() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
transfer_objects(&src, &dst, c1).await.unwrap();
src.set_ref("refs/heads/main", c1).await.unwrap();
dst.set_ref("refs/heads/main", c1).await.unwrap();
let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Reject)
.await
.unwrap();
assert!(!updated, "already up-to-date should report false");
}
#[tokio::test]
async fn sync_reachable_follows_tag_to_commit() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (commit_hash, _) = build_commit(&src, b"c1", vec![]).await;
let tag_id = b"tag-v1";
let tag_hash = ContentHash::from_canonical(tag_id);
let tag = TagObject {
target: commit_hash,
name: "v1.0".into(),
tagger: author(),
timestamp: Utc::now(),
message: "release".into(),
};
let mut tx = src.transaction().await.unwrap();
tx.put(tag_hash, Object::Tag(tag)).await.unwrap();
tx.commit().await.unwrap();
let count = transfer_objects(&src, &dst, tag_hash).await.unwrap();
assert_eq!(count, 6);
assert!(dst.contains(&tag_hash).await.unwrap());
assert!(dst.contains(&commit_hash).await.unwrap());
}
#[tokio::test]
async fn sync_reachable_follows_comment_and_pi_leaves() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let comment_hash = ContentHash::from_canonical(b"comment1");
let comment = CommentObject {
content: "a comment".into(),
};
let pi_hash = ContentHash::from_canonical(b"pi1");
let pi = PIObject {
target: "xml-stylesheet".into(),
data: Some("type=\"text/xsl\"".into()),
};
let elem_hash = ContentHash::from_canonical(b"elem-mixed");
let elem = ElementObject {
local_name: "root".into(),
namespace_uri: None,
namespace_prefix: None,
extra_namespaces: vec![],
attributes: vec![],
children: vec![comment_hash, pi_hash],
inclusive_hash: elem_hash,
};
let doc_hash = ContentHash::from_canonical(b"doc-mixed");
let doc = DocumentObject { root: elem_hash, prologue: vec![] };
let tree_hash = ContentHash::from_canonical(b"tree-mixed");
let tree = TreeObject::new(vec![
TreeEntry { path: "doc.xml".into(), document: doc_hash },
]);
let commit_hash = ContentHash::from_canonical(b"commit-mixed");
let commit = CommitObject {
tree: tree_hash,
parents: vec![],
author: author(),
timestamp: Utc::now(),
message: "mixed content".into(),
};
let mut tx = src.transaction().await.unwrap();
tx.put(comment_hash, Object::Comment(comment)).await.unwrap();
tx.put(pi_hash, Object::PI(pi)).await.unwrap();
tx.put(elem_hash, Object::Element(elem)).await.unwrap();
tx.put(doc_hash, Object::Document(doc)).await.unwrap();
tx.put(tree_hash, Object::Tree(tree)).await.unwrap();
tx.put(commit_hash, Object::Commit(commit)).await.unwrap();
tx.commit().await.unwrap();
let count = transfer_objects(&src, &dst, commit_hash).await.unwrap();
assert_eq!(count, 6); assert!(dst.contains(&comment_hash).await.unwrap());
assert!(dst.contains(&pi_hash).await.unwrap());
}
#[tokio::test]
async fn sync_transfer_copies_all_inner_objects() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (commit_hash, doc_hash) = build_commit(&src, b"c1", vec![]).await;
let text_hash = ContentHash::from_canonical(b"c1");
let elem_hash = ContentHash::from_canonical(b"c1elem");
let tree_hash = ContentHash::from_canonical(b"c1tree");
transfer_objects(&src, &dst, commit_hash).await.unwrap();
assert!(dst.contains(&commit_hash).await.unwrap(), "commit missing");
assert!(dst.contains(&tree_hash).await.unwrap(), "tree missing");
assert!(dst.contains(&doc_hash).await.unwrap(), "document missing");
assert!(dst.contains(&elem_hash).await.unwrap(), "element missing");
assert!(dst.contains(&text_hash).await.unwrap(), "text missing");
let src_text = src.get(&text_hash).await.unwrap().unwrap();
let dst_text = dst.get(&text_hash).await.unwrap().unwrap();
assert_eq!(src_text, dst_text);
}
#[tokio::test]
async fn sync_transfer_follows_parent_chain() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
let (c2, _) = build_commit(&src, b"c2", vec![c1]).await;
let (c3, _) = build_commit(&src, b"c3", vec![c2]).await;
let count = transfer_objects(&src, &dst, c3).await.unwrap();
assert_eq!(count, 15);
assert!(dst.contains(&c1).await.unwrap(), "ancestor c1 missing");
assert!(dst.contains(&c2).await.unwrap(), "ancestor c2 missing");
assert!(dst.contains(&c3).await.unwrap(), "tip c3 missing");
}
#[tokio::test]
async fn sync_refs_prefix_excludes_non_matching() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
let (c2, _) = build_commit(&src, b"c2", vec![]).await;
src.set_ref("refs/heads/main", c1).await.unwrap();
src.set_ref("refs/tags/v1", c2).await.unwrap();
let count = sync_refs(&src, &src, &dst, &dst, "refs/heads/", &Overwrite)
.await
.unwrap();
assert_eq!(count, 1);
assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c1));
assert_eq!(
dst.get_ref("refs/tags/v1").await.unwrap(),
None,
"tag ref should NOT have been synced"
);
}
#[tokio::test]
async fn sync_refs_aborts_on_conflict_error() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
let (c2, _) = build_commit(&src, b"c2", vec![]).await;
let (c3, _) = build_commit(&src, b"c3", vec![]).await;
src.set_ref("refs/heads/alpha", c1).await.unwrap();
src.set_ref("refs/heads/beta", c2).await.unwrap();
transfer_objects(&src, &dst, c3).await.unwrap();
dst.set_ref("refs/heads/alpha", c3).await.unwrap();
let result = sync_refs(&src, &src, &dst, &dst, "refs/heads/", &Reject).await;
assert!(result.is_err(), "should abort when a ref conflicts under Reject");
}
#[tokio::test]
async fn sync_ref_fast_forward_full_flow() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
let (c2, _) = build_commit(&src, b"c2", vec![c1]).await;
src.set_ref("refs/heads/main", c1).await.unwrap();
let created = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Overwrite)
.await
.unwrap();
assert!(created, "initial sync should report updated");
src.set_ref("refs/heads/main", c2).await.unwrap();
let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &FastForwardOnly)
.await
.unwrap();
assert!(updated, "fast-forward should report updated");
assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c2));
assert!(dst.contains(&c1).await.unwrap());
}
#[tokio::test]
async fn sync_transfer_deduplicates_shared_subtree() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
let (c2, _) = build_commit(&src, b"c2", vec![c1]).await;
let first = transfer_objects(&src, &dst, c1).await.unwrap();
assert_eq!(first, 5);
let second = transfer_objects(&src, &dst, c2).await.unwrap();
assert_eq!(second, 5, "should only transfer c2's new objects, not c1's");
}
#[tokio::test]
async fn sync_ref_creates_new_ref_on_empty_dst() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
src.set_ref("refs/heads/main", c1).await.unwrap();
let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &FastForwardOnly)
.await
.unwrap();
assert!(updated, "creating a new ref should report updated");
assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c1));
assert!(dst.contains(&c1).await.unwrap(), "objects should be transferred");
}
struct Skip;
#[async_trait]
impl RefConflict for Skip {
async fn resolve(
&self,
_store: &dyn ObjectStore,
_ref_name: &str,
_src_hash: ContentHash,
_dst_hash: ContentHash,
) -> Result<bool> {
Ok(false)
}
}
#[tokio::test]
async fn sync_ref_skip_leaves_dst_ref_unchanged() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
let (c2, _) = build_commit(&src, b"c2", vec![]).await;
transfer_objects(&src, &dst, c1).await.unwrap();
dst.set_ref("refs/heads/main", c1).await.unwrap();
src.set_ref("refs/heads/main", c2).await.unwrap();
let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Skip)
.await
.unwrap();
assert!(!updated, "skip should report false");
assert_eq!(
dst.get_ref("refs/heads/main").await.unwrap(),
Some(c1),
"ref should remain at c1 after skip"
);
}
#[tokio::test]
async fn sync_ref_skip_still_transfers_objects() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
let (c2, _) = build_commit(&src, b"c2", vec![]).await;
transfer_objects(&src, &dst, c1).await.unwrap();
dst.set_ref("refs/heads/main", c1).await.unwrap();
src.set_ref("refs/heads/main", c2).await.unwrap();
let updated = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Skip)
.await
.unwrap();
assert!(!updated, "skip should not report updated");
assert!(
dst.contains(&c2).await.unwrap(),
"c2 objects should be on dst even after skip"
);
}
#[tokio::test]
async fn sync_transfer_follows_merge_commit_parents() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
let (c2, _) = build_commit(&src, b"c2", vec![c1]).await;
let (c3, _) = build_commit(&src, b"c3", vec![c1]).await;
let (c4, _) = build_commit(&src, b"c4", vec![c2, c3]).await;
let count = transfer_objects(&src, &dst, c4).await.unwrap();
assert_eq!(count, 20);
assert!(dst.contains(&c1).await.unwrap(), "root c1 missing");
assert!(dst.contains(&c2).await.unwrap(), "left parent c2 missing");
assert!(dst.contains(&c3).await.unwrap(), "right parent c3 missing");
assert!(dst.contains(&c4).await.unwrap(), "merge c4 missing");
}
#[tokio::test]
async fn sync_ref_reject_still_leaves_objects_on_dst() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
let (c2, _) = build_commit(&src, b"c2", vec![]).await;
transfer_objects(&src, &dst, c1).await.unwrap();
dst.set_ref("refs/heads/main", c1).await.unwrap();
src.set_ref("refs/heads/main", c2).await.unwrap();
let result = sync_ref(&src, &src, &dst, &dst, "refs/heads/main", &Reject).await;
assert!(result.is_err());
assert!(
dst.contains(&c2).await.unwrap(),
"objects should be on dst despite rejection"
);
assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c1));
}
#[tokio::test]
async fn sync_refs_excludes_skipped_from_count() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
let (c2, _) = build_commit(&src, b"c2", vec![]).await;
src.set_ref("refs/heads/main", c1).await.unwrap();
src.set_ref("refs/heads/feature", c2).await.unwrap();
let (c3, _) = build_commit(&dst, b"c3", vec![]).await;
dst.set_ref("refs/heads/main", c3).await.unwrap();
let count = sync_refs(&src, &src, &dst, &dst, "refs/heads/", &Skip)
.await
.unwrap();
assert_eq!(count, 1, "count should exclude skipped refs");
assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c3));
assert_eq!(dst.get_ref("refs/heads/feature").await.unwrap(), Some(c2));
}
#[tokio::test]
async fn sync_refs_empty_prefix_matches_all() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
let (c2, _) = build_commit(&src, b"c2", vec![]).await;
src.set_ref("refs/heads/main", c1).await.unwrap();
src.set_ref("refs/tags/v1", c2).await.unwrap();
let count = sync_refs(&src, &src, &dst, &dst, "", &Overwrite)
.await
.unwrap();
assert_eq!(count, 2);
assert_eq!(dst.get_ref("refs/heads/main").await.unwrap(), Some(c1));
assert_eq!(dst.get_ref("refs/tags/v1").await.unwrap(), Some(c2));
}
#[tokio::test]
async fn sync_refs_excludes_up_to_date_from_count() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
let (c2, _) = build_commit(&src, b"c2", vec![]).await;
src.set_ref("refs/heads/main", c1).await.unwrap();
src.set_ref("refs/heads/feature", c2).await.unwrap();
let count = sync_refs(&src, &src, &dst, &dst, "refs/heads/", &Overwrite)
.await
.unwrap();
assert_eq!(count, 2);
let count = sync_refs(&src, &src, &dst, &dst, "refs/heads/", &Overwrite)
.await
.unwrap();
assert_eq!(count, 0, "nothing changed, count should be 0");
}
#[tokio::test]
async fn sync_refs_no_matching_refs_returns_zero() {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let (c1, _) = build_commit(&src, b"c1", vec![]).await;
src.set_ref("refs/heads/main", c1).await.unwrap();
let count = sync_refs(&src, &src, &dst, &dst, "refs/remotes/", &Overwrite)
.await
.unwrap();
assert_eq!(count, 0);
}
#[tokio::test]
async fn regression_transfer_with_former_hash_collision_dag() {
use crate::object::*;
let seed = ContentHash::from_canonical(b"regression-seed");
let mut hi: u32 = 0;
let mut next_h = || {
let mut input = seed.0.to_vec();
input.extend_from_slice(&hi.to_le_bytes());
hi += 1;
ContentHash::from_canonical(&input)
};
let mut objects = Vec::new();
let mut leaf_hashes = Vec::new();
for _ in 0..4 {
let h = next_h();
objects.push((h, Object::Text(TextObject { content: String::new() })));
leaf_hashes.push(h);
}
let inner1_h = next_h();
objects.push((
inner1_h,
Object::Element(ElementObject {
local_name: "a".into(),
namespace_uri: None,
namespace_prefix: None,
extra_namespaces: vec![],
attributes: vec![],
children: vec![leaf_hashes[0], leaf_hashes[1]],
inclusive_hash: inner1_h,
}),
));
let inner2_h = next_h();
objects.push((
inner2_h,
Object::Element(ElementObject {
local_name: "b".into(),
namespace_uri: None,
namespace_prefix: None,
extra_namespaces: vec![],
attributes: vec![],
children: vec![leaf_hashes[2], leaf_hashes[3]],
inclusive_hash: inner2_h,
}),
));
let root_h = next_h();
objects.push((
root_h,
Object::Element(ElementObject {
local_name: "root".into(),
namespace_uri: None,
namespace_prefix: None,
extra_namespaces: vec![],
attributes: vec![],
children: vec![inner1_h, inner2_h],
inclusive_hash: root_h,
}),
));
let doc_h = next_h();
objects.push((
doc_h,
Object::Document(DocumentObject {
root: root_h,
prologue: vec![],
}),
));
let unique: std::collections::HashSet<_> = objects.iter().map(|(h, _)| *h).collect();
assert_eq!(
unique.len(),
objects.len(),
"all hashes must be unique (this was the bug)"
);
let src = MemoryStore::new();
let dst = MemoryStore::new();
let mut tx = src.transaction().await.unwrap();
for (h, o) in &objects {
tx.put(*h, o.clone()).await.unwrap();
}
tx.commit().await.unwrap();
transfer_objects(&src, &dst, doc_h).await.unwrap();
for (h, _) in &objects {
assert!(
dst.contains(h).await.unwrap(),
"hash {h} should be on dst after transfer"
);
let src_obj = src.get(h).await.unwrap();
let dst_obj = dst.get(h).await.unwrap();
assert_eq!(src_obj, dst_obj, "object at {h} should match");
}
}
proptest! {
#![proptest_config(ProptestConfig::with_cases(100))]
#[test]
fn prop_transfer_idempotent((dag, root) in crate::store::prop_strategies::arb_object_dag()) {
let rt = crate::store::prop_strategies::runtime();
rt.block_on(async {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let mut tx = src.transaction().await.unwrap();
for (h, o) in &dag {
tx.put(*h, o.clone()).await.unwrap();
}
tx.commit().await.unwrap();
let first = transfer_objects(&src, &dst, root).await.unwrap();
assert!(first > 0, "first transfer should copy objects");
let second = transfer_objects(&src, &dst, root).await.unwrap();
assert_eq!(second, 0, "second transfer should be a no-op");
});
}
#[test]
fn prop_transfer_complete((dag, root) in crate::store::prop_strategies::arb_object_dag()) {
let rt = crate::store::prop_strategies::runtime();
rt.block_on(async {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let mut tx = src.transaction().await.unwrap();
for (h, o) in &dag {
tx.put(*h, o.clone()).await.unwrap();
}
tx.commit().await.unwrap();
transfer_objects(&src, &dst, root).await.unwrap();
for (h, _) in &dag {
assert!(
dst.contains(h).await.unwrap(),
"hash {h} should be on dst after transfer"
);
}
});
}
#[test]
fn prop_transfer_preserves((dag, root) in crate::store::prop_strategies::arb_object_dag()) {
let rt = crate::store::prop_strategies::runtime();
rt.block_on(async {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let mut tx = src.transaction().await.unwrap();
for (h, o) in &dag {
tx.put(*h, o.clone()).await.unwrap();
}
tx.commit().await.unwrap();
transfer_objects(&src, &dst, root).await.unwrap();
for (h, _) in &dag {
let src_obj = src.get(h).await.unwrap();
let dst_obj = dst.get(h).await.unwrap();
assert_eq!(
src_obj, dst_obj,
"object at {h} should be identical on src and dst"
);
}
});
}
#[test]
fn prop_transfer_commit_dag((dag, root, _order) in crate::store::prop_strategies::arb_commit_dag()) {
let rt = crate::store::prop_strategies::runtime();
rt.block_on(async {
let src = MemoryStore::new();
let dst = MemoryStore::new();
let mut tx = src.transaction().await.unwrap();
for (h, o) in &dag {
tx.put(*h, o.clone()).await.unwrap();
}
tx.commit().await.unwrap();
let first = transfer_objects(&src, &dst, root).await.unwrap();
assert!(first > 0, "first transfer should copy objects");
let second = transfer_objects(&src, &dst, root).await.unwrap();
assert_eq!(second, 0, "second transfer should be a no-op");
let src_reachable: Vec<_> = src.subtree(&root)
.map(|r| r.unwrap())
.collect()
.await;
for (h, src_obj) in &src_reachable {
assert!(
dst.contains(h).await.unwrap(),
"hash {h} should be on dst after commit-dag transfer"
);
let dst_obj = dst.get(h).await.unwrap();
assert_eq!(
dst_obj.as_ref(), Some(src_obj),
"object at {h} should be identical on src and dst"
);
}
});
}
}
}