use tracing::debug;
use triblespace_core::blob::encodings::longstring::LongString;
use triblespace_core::blob::encodings::simplearchive::SimpleArchive;
use triblespace_core::id::{Id, genid};
use triblespace_core::repo::{BlobStore, BlobStoreGet, BlobStorePut, BranchStore, PushResult, Repository};
use triblespace_core::trible::TribleSet;
use triblespace_core::inline::encodings::time::NsTAIInterval;
use triblespace_core::inline::Inline;
use triblespace_core::inline::encodings::hash::Handle;
use triblespace_core::prelude::inlineencodings::{GenId, ED25519PublicKey};
use triblespace_core::prelude::attributes;
use triblespace_core::macros::{find, pattern, entity};
use crate::channel::PublisherKey;
use crate::protocol::RawHash;
attributes! {
"FD45B98C108B3F9F2D18C0B5373BC9FB" as pub remote_name: Handle<LongString>;
"ACEBAE99F0B5B1E12DAE3FDC1E2BC575" as pub tracking_remote_branch: GenId;
"C52A223988BB237B0859319661DA23F5" as pub tracking_peer: ED25519PublicKey;
}
pub fn is_tracking_branch<S>(store: &mut S, branch_id: Id) -> bool
where
S: BlobStore + BranchStore,
{
let Ok(Some(head_handle)) = store.head(branch_id) else { return false; };
let Ok(reader) = store.reader() else { return false; };
let Ok(meta) = reader.get::<TribleSet, SimpleArchive>(head_handle) else { return false; };
find!(
v: Id,
pattern!(&meta, [{ _?e @ tracking_remote_branch: ?v }])
).next().is_some()
}
#[derive(Debug, Clone)]
pub struct TrackingBranchInfo {
pub local_id: Id,
pub remote_branch_id: Id,
pub remote_name: String,
}
pub fn list_tracking_branches<S>(store: &mut S) -> Vec<TrackingBranchInfo>
where
S: BlobStore + BranchStore,
{
let mut result = Vec::new();
let Ok(iter) = store.branches() else { return result; };
let bids: Vec<Id> = iter.filter_map(|r| r.ok()).collect();
for bid in bids {
let Ok(Some(meta_handle)) = store.head(bid) else { continue; };
let Ok(reader) = store.reader() else { continue; };
let Ok(meta): Result<TribleSet, _> = reader.get(meta_handle) else { continue; };
let Some(remote_branch_id) = find!(
v: Id,
pattern!(&meta, [{ _?e @ tracking_remote_branch: ?v }])
).next() else { continue; };
let Some(name_handle) = find!(
h: Inline<Handle<LongString>>,
pattern!(&meta, [{ _?e @ remote_name: ?h }])
).next() else { continue; };
let Ok(name_view): Result<anybytes::View<str>, _> = reader.get(name_handle) else { continue; };
result.push(TrackingBranchInfo {
local_id: bid,
remote_branch_id,
remote_name: name_view.as_ref().to_string(),
});
}
result
}
pub fn find_tracking_branch<S>(
store: &mut S,
remote_branch_id: Id,
) -> Option<Id>
where
S: BlobStore + BranchStore,
{
list_tracking_branches(store)
.into_iter()
.find(|info| info.remote_branch_id == remote_branch_id)
.map(|info| info.local_id)
}
fn resolve_commit_in_branch_meta<S: BlobStore>(
store: &mut S,
branch_meta_hash: &RawHash,
) -> Option<Inline<Handle<SimpleArchive>>> {
let reader = store.reader().ok()?;
let meta_handle = Inline::<Handle<SimpleArchive>>::new(*branch_meta_hash);
let meta: TribleSet = reader.get(meta_handle).ok()?;
find!(
h: Inline<Handle<SimpleArchive>>,
pattern!(&meta, [{ _?e @ triblespace_core::repo::head: ?h }])
).next()
}
fn read_updated_at<S: BlobStore>(
store: &mut S,
branch_meta_hash: &RawHash,
) -> Option<Inline<NsTAIInterval>> {
let reader = store.reader().ok()?;
let meta_handle = Inline::<Handle<SimpleArchive>>::new(*branch_meta_hash);
let meta: TribleSet = reader.get(meta_handle).ok()?;
find!(
ts: Inline<NsTAIInterval>,
pattern!(&meta, [{ _?e @ triblespace_core::metadata::updated_at: ?ts }])
).next()
}
fn is_newer(new: Inline<NsTAIInterval>, current: Inline<NsTAIInterval>) -> bool {
let Ok((new_ns, _)): Result<(i128, i128), _> = new.try_from_inline() else {
return false;
};
let Ok((current_ns, _)): Result<(i128, i128), _> = current.try_from_inline() else {
return false;
};
new_ns > current_ns
}
pub fn create_tracking_branch<S>(
store: &mut S,
remote_branch_id: Id,
remote_head_hash: &RawHash,
remote_name_str: &str,
publisher: &PublisherKey,
) -> Option<Id>
where
S: BlobStore + BlobStorePut + BranchStore,
{
let commit_handle = resolve_commit_in_branch_meta(store, remote_head_hash)?;
let remote_updated_at = read_updated_at(store, remote_head_hash);
let tracking_id: Id = *genid();
let name_string = remote_name_str.to_string();
let name_handle: Inline<Handle<LongString>> =
store.put::<LongString, String>(name_string).ok()?;
let pub_key = ed25519_dalek::VerifyingKey::from_bytes(publisher).ok()?;
let meta_set: TribleSet = entity! {
triblespace_core::repo::branch: tracking_id,
triblespace_core::repo::head: commit_handle,
remote_name: name_handle,
tracking_remote_branch: remote_branch_id,
tracking_peer: pub_key,
triblespace_core::metadata::updated_at?: remote_updated_at,
}
.into();
let meta_handle: Inline<Handle<SimpleArchive>> = store.put(meta_set).ok()?;
match store.update(tracking_id, None, Some(meta_handle)).ok()? {
PushResult::Success() => Some(tracking_id),
PushResult::Conflict(_) => None,
}
}
pub fn update_tracking_branch<S>(
store: &mut S,
tracking_branch_id: Id,
remote_branch_id: Id,
new_head_hash: &RawHash,
remote_name_str: &str,
publisher: &PublisherKey,
) -> Option<()>
where
S: BlobStore + BlobStorePut + BranchStore,
{
let old_meta = store.head(tracking_branch_id).ok()??;
let new_ts = read_updated_at(store, new_head_hash);
let current_ts = read_updated_at(store, &old_meta.raw);
if let (Some(current), Some(new)) = (current_ts, new_ts) {
if !is_newer(new, current) {
debug!(
branch = %hex::encode(&remote_branch_id.raw()[..4]),
"tracking: skip stale update (incoming ts ≤ current)"
);
return None;
}
}
let commit_handle = resolve_commit_in_branch_meta(store, new_head_hash)?;
let name_string = remote_name_str.to_string();
let name_handle: Inline<Handle<LongString>> =
store.put::<LongString, String>(name_string).ok()?;
let pub_key = ed25519_dalek::VerifyingKey::from_bytes(publisher).ok()?;
let meta_set: TribleSet = entity! {
triblespace_core::repo::branch: tracking_branch_id,
triblespace_core::repo::head: commit_handle,
remote_name: name_handle,
tracking_remote_branch: remote_branch_id,
tracking_peer: pub_key,
triblespace_core::metadata::updated_at?: new_ts,
}
.into();
let meta_handle: Inline<Handle<SimpleArchive>> = store.put(meta_set).ok()?;
match store.update(tracking_branch_id, Some(old_meta), Some(meta_handle)).ok()? {
PushResult::Success() => Some(()),
PushResult::Conflict(_) => None,
}
}
pub fn ensure_tracking_branch<S>(
store: &mut S,
remote_branch_id: Id,
remote_head_hash: &RawHash,
remote_name_str: &str,
publisher: &PublisherKey,
) -> Option<Id>
where
S: BlobStore + BlobStorePut + BranchStore,
{
if let Some(tracking_id) = find_tracking_branch(store, remote_branch_id) {
update_tracking_branch(store, tracking_id, remote_branch_id, remote_head_hash, remote_name_str, publisher);
Some(tracking_id)
} else {
create_tracking_branch(store, remote_branch_id, remote_head_hash, remote_name_str, publisher)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MergeOutcome {
Empty,
UpToDate,
Merged { new_head: Inline<Handle<SimpleArchive>> },
}
pub fn merge_tracking_into_local<S>(
repo: &mut Repository<S>,
tracking_id: Id,
local_name: &str,
) -> anyhow::Result<MergeOutcome>
where
S: BlobStore + BlobStorePut + BranchStore,
{
let local_id = repo
.ensure_branch(local_name, None)
.map_err(|_| anyhow::anyhow!("ensure branch '{local_name}'"))?;
let remote_ws = repo
.pull(tracking_id)
.map_err(|_| anyhow::anyhow!("pull tracking branch"))?;
let Some(remote_commit) = remote_ws.head() else {
return Ok(MergeOutcome::Empty);
};
let mut local_ws = repo
.pull(local_id)
.map_err(|_| anyhow::anyhow!("pull local branch"))?;
let prev_head = local_ws.head();
let new_head = local_ws
.merge_commit(remote_commit)
.map_err(|e| anyhow::anyhow!("merge: {e:?}"))?;
if Some(new_head) == prev_head {
return Ok(MergeOutcome::UpToDate);
}
repo.push(&mut local_ws)
.map_err(|_| anyhow::anyhow!("push merged branch"))?;
Ok(MergeOutcome::Merged { new_head })
}
#[cfg(test)]
mod tests {
use super::*;
use ed25519_dalek::SigningKey;
use triblespace_core::blob::Blob;
use triblespace_core::id::genid;
use triblespace_core::repo::memoryrepo::MemoryRepo;
fn test_repo() -> Repository<MemoryRepo> {
let signing_key = SigningKey::from_bytes(&[7u8; 32]);
let store = MemoryRepo::default();
Repository::new(store, signing_key, TribleSet::new()).unwrap()
}
#[test]
fn merge_tracking_ff_into_empty_local() {
let mut repo = test_repo();
let source_id = repo.ensure_branch("source", None).unwrap();
let mut src_ws = repo.pull(source_id).unwrap();
src_ws.commit(TribleSet::new(), "remote commit");
let source_head = src_ws.head().unwrap();
repo.push(&mut src_ws).unwrap();
let outcome = merge_tracking_into_local(&mut repo, source_id, "main").unwrap();
assert_eq!(outcome, MergeOutcome::Merged { new_head: source_head });
let main_id = repo.lookup_branch("main").unwrap().expect("main exists");
let main_ws = repo.pull(main_id).unwrap();
assert_eq!(main_ws.head(), Some(source_head));
}
#[test]
fn merge_tracking_up_to_date_is_noop() {
let mut repo = test_repo();
let source_id = repo.ensure_branch("source", None).unwrap();
let mut src_ws = repo.pull(source_id).unwrap();
src_ws.commit(TribleSet::new(), "shared commit");
let shared_head = src_ws.head().unwrap();
repo.push(&mut src_ws).unwrap();
let _ = merge_tracking_into_local(&mut repo, source_id, "main").unwrap();
let outcome = merge_tracking_into_local(&mut repo, source_id, "main").unwrap();
assert_eq!(outcome, MergeOutcome::UpToDate);
let main_id = repo.lookup_branch("main").unwrap().unwrap();
let main_ws = repo.pull(main_id).unwrap();
assert_eq!(main_ws.head(), Some(shared_head));
}
#[test]
fn merge_tracking_divergent_produces_merge_commit() {
let mut repo = test_repo();
let main_id = repo.ensure_branch("main", None).unwrap();
let mut main_ws = repo.pull(main_id).unwrap();
main_ws.commit(TribleSet::new(), "local commit");
let commit_a = main_ws.head().unwrap();
repo.push(&mut main_ws).unwrap();
let source_id = repo.ensure_branch("source", None).unwrap();
let mut src_ws = repo.pull(source_id).unwrap();
src_ws.commit(TribleSet::new(), "remote commit");
let commit_b = src_ws.head().unwrap();
repo.push(&mut src_ws).unwrap();
let outcome = merge_tracking_into_local(&mut repo, source_id, "main").unwrap();
let merge_head = match outcome {
MergeOutcome::Merged { new_head } => new_head,
other => panic!("expected Merged, got {other:?}"),
};
assert_ne!(merge_head, commit_a, "merge commit must advance past local");
assert_ne!(merge_head, commit_b, "merge commit must not just fast-forward to remote");
let mut main_ws = repo.pull(main_id).unwrap();
assert_eq!(main_ws.head(), Some(merge_head));
use triblespace_core::repo::CommitSelector;
let ancestor_set = triblespace_core::repo::ancestors(merge_head)
.select(&mut main_ws)
.expect("ancestors walk");
assert!(ancestor_set.get(&commit_a.raw).is_some(), "commit_a in ancestry");
assert!(ancestor_set.get(&commit_b.raw).is_some(), "commit_b in ancestry");
}
#[test]
fn merge_tracking_empty_source_is_empty_outcome() {
let mut repo = test_repo();
let source_id = repo.ensure_branch("source", None).unwrap();
let outcome = merge_tracking_into_local(&mut repo, source_id, "main").unwrap();
assert_eq!(outcome, MergeOutcome::Empty);
let main_id = repo.lookup_branch("main").unwrap().expect("main created");
let main_ws = repo.pull(main_id).unwrap();
assert_eq!(main_ws.head(), None);
}
#[test]
fn find_tracking_branch_roundtrips() {
let mut store = MemoryRepo::default();
use triblespace_core::repo::branch::branch_unsigned;
use triblespace_core::blob::IntoBlob;
use triblespace_core::blob::encodings::longstring::LongString;
let name_blob = "remote-branch".to_string().to_blob();
let name_handle: Inline<Handle<LongString>> = store.put(name_blob).unwrap();
let remote_branch_id = genid();
let commit_meta: TribleSet = TribleSet::new();
let commit_blob: Blob<SimpleArchive> = commit_meta.to_blob();
let commit_handle = store.put::<SimpleArchive, _>(commit_blob.clone()).unwrap();
let remote_meta = branch_unsigned(*remote_branch_id, name_handle, Some(commit_blob), None);
let remote_meta_handle = store.put::<SimpleArchive, _>(remote_meta).unwrap();
let publisher = [0u8; 32];
let remote_head_hash: RawHash = remote_meta_handle.raw;
let tracking_id = create_tracking_branch(
&mut store, *remote_branch_id, &remote_head_hash, "remote-branch", &publisher,
).expect("create");
let found = find_tracking_branch(&mut store, *remote_branch_id);
assert_eq!(found, Some(tracking_id), "should find the tracking branch we just created");
assert!(is_tracking_branch(&mut store, tracking_id));
let same = ensure_tracking_branch(
&mut store, *remote_branch_id, &remote_head_hash, "remote-branch", &publisher,
);
assert_eq!(same, Some(tracking_id), "ensure should return the existing tracking branch");
let mut store2 = store;
let reader = store2.reader().unwrap();
let track_meta_handle = store2.head(tracking_id).unwrap().unwrap();
let track_meta: TribleSet = reader.get(track_meta_handle).unwrap();
let track_head: Inline<Handle<SimpleArchive>> = find!(
h: Inline<Handle<SimpleArchive>>,
pattern!(&track_meta, [{ _?e @ triblespace_core::repo::head: ?h }])
).next().expect("tracking branch should have a head");
assert_eq!(track_head, commit_handle,
"tracking branch head should be the inner commit, not the branch metadata blob");
}
}