use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use pacr_types::{CausalId, PacrRecord};
use crate::{CausalDag, DagError};
/// Outcome summary returned by [`merge_remote`].
#[derive(Debug, Clone)]
pub struct MergeResult {
/// Count of records newly appended to the local DAG.
pub records_merged: usize,
/// Count of records not appended because their id was already present
/// (either detected up front or via a duplicate-id append error).
pub records_skipped: usize,
/// Records that could not be appended (missing predecessor or
/// self-reference); the caller may retry them in a later merge.
pub orphans_deferred: Vec<Arc<PacrRecord>>,
/// Ids of merged records that no record in the remote batch names as a
/// predecessor — candidate heads for the remote history.
pub remote_tips: Vec<CausalId>,
}
/// Merge a batch of remote records into `dag`.
///
/// Records whose ids the DAG already contains are counted as skipped; the
/// remainder are appended in topological order. Records that cannot be
/// appended (missing predecessor or self-reference) come back as deferred
/// orphans for a later retry. `remote_tips` holds every merged id that no
/// record in the remote batch references as a predecessor.
pub fn merge_remote(dag: &CausalDag, remote: &[Arc<PacrRecord>]) -> MergeResult {
    if remote.is_empty() {
        return MergeResult {
            records_merged: 0,
            records_skipped: 0,
            orphans_deferred: Vec::new(),
            remote_tips: Vec::new(),
        };
    }

    // Partition the batch up front: ids the DAG already knows are skipped,
    // everything else becomes a merge candidate.
    let mut skipped = 0usize;
    let mut candidates: Vec<Arc<PacrRecord>> = Vec::with_capacity(remote.len());
    for record in remote {
        if dag.contains(&record.id) {
            skipped += 1;
        } else {
            candidates.push(Arc::clone(record));
        }
    }

    // Append in dependency order so predecessors land before their dependents.
    let ordered = topological_sort(&candidates);
    let mut merged = 0usize;
    let mut deferred: Vec<Arc<PacrRecord>> = Vec::new();
    let mut merged_ids: HashSet<CausalId> = HashSet::new();
    for record in &ordered {
        match dag.append(record.as_ref().clone()) {
            Ok(_) => {
                merged += 1;
                merged_ids.insert(record.id);
            }
            Err(DagError::DuplicateId(_)) => {
                skipped += 1;
            }
            Err(DagError::MissingPredecessor { .. } | DagError::SelfReference(_)) => {
                deferred.push(Arc::clone(record));
            }
        }
    }

    // A merged record counts as a tip unless some record in the remote batch
    // (merged or not) lists it among its predecessors.
    let batch_ids: HashSet<CausalId> = remote.iter().map(|r| r.id).collect();
    let referenced: HashSet<CausalId> = remote
        .iter()
        .flat_map(|r| r.predecessors.iter().copied())
        .filter(|pred| batch_ids.contains(pred))
        .collect();
    let remote_tips: Vec<CausalId> = merged_ids
        .iter()
        .filter(|id| !referenced.contains(id))
        .copied()
        .collect();

    MergeResult {
        records_merged: merged,
        records_skipped: skipped,
        orphans_deferred: deferred,
        remote_tips,
    }
}
pub fn topological_sort(records: &[Arc<PacrRecord>]) -> Vec<Arc<PacrRecord>> {
if records.is_empty() {
return vec![];
}
let id_set: HashSet<CausalId> = records.iter().map(|r| r.id).collect();
let mut in_degree: HashMap<CausalId, usize> = HashMap::with_capacity(records.len());
let mut reverse_adj: HashMap<CausalId, Vec<CausalId>> = HashMap::with_capacity(records.len());
for rec in records {
in_degree.entry(rec.id).or_insert(0);
}
for rec in records {
for pred in &rec.predecessors {
if pred.is_genesis() || !id_set.contains(pred) {
continue;
}
*in_degree.entry(rec.id).or_insert(0) += 1;
reverse_adj.entry(*pred).or_default().push(rec.id);
}
}
let record_map: HashMap<CausalId, Arc<PacrRecord>> =
records.iter().map(|r| (r.id, Arc::clone(r))).collect();
let mut queue: VecDeque<CausalId> = in_degree
.iter()
.filter(|(_, °)| deg == 0)
.map(|(&id, _)| id)
.collect();
let mut queue_vec: Vec<CausalId> = queue.drain(..).collect();
queue_vec.sort_unstable();
queue.extend(queue_vec);
let mut sorted: Vec<Arc<PacrRecord>> = Vec::with_capacity(records.len());
let mut visited: HashSet<CausalId> = HashSet::new();
while let Some(id) = queue.pop_front() {
if visited.contains(&id) {
continue;
}
visited.insert(id);
if let Some(rec) = record_map.get(&id) {
sorted.push(Arc::clone(rec));
}
if let Some(dependents) = reverse_adj.get(&id) {
let mut newly_ready: Vec<CausalId> = Vec::new();
for &dep_id in dependents {
if visited.contains(&dep_id) {
continue;
}
let deg = in_degree.entry(dep_id).or_insert(0);
if *deg > 0 {
*deg -= 1;
}
if *deg == 0 {
newly_ready.push(dep_id);
}
}
newly_ready.sort_unstable();
for id in newly_ready {
queue.push_back(id);
}
}
}
for rec in records {
if !visited.contains(&rec.id) {
sorted.push(Arc::clone(rec));
}
}
sorted
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use pacr_types::{CognitiveSplit, Estimate, PacrBuilder, ResourceTriple};

    /// Build a minimal record with the given id and predecessor ids; all
    /// cost/resource fields use fixed placeholder estimates.
    fn make_record(id: u128, preds: &[u128]) -> Arc<PacrRecord> {
        let predecessors: smallvec::SmallVec<[CausalId; 4]> =
            preds.iter().map(|&p| CausalId(p)).collect();
        let record = PacrBuilder::new()
            .id(CausalId(id))
            .predecessors(predecessors)
            .landauer_cost(Estimate::exact(1e-20))
            .resources(ResourceTriple {
                energy: Estimate::exact(1e-16),
                time: Estimate::exact(1e-6),
                space: Estimate::exact(0.0),
            })
            .cognitive_split(CognitiveSplit {
                statistical_complexity: Estimate::exact(0.5),
                entropy_rate: Estimate::exact(0.3),
            })
            .payload(Bytes::new())
            .build()
            .unwrap();
        Arc::new(record)
    }

    #[test]
    fn empty_remote_returns_empty_result() {
        let dag = CausalDag::new();
        let outcome = merge_remote(&dag, &[]);
        assert_eq!(outcome.records_merged, 0);
        assert_eq!(outcome.records_skipped, 0);
        assert!(outcome.orphans_deferred.is_empty());
        assert!(outcome.remote_tips.is_empty());
    }

    #[test]
    fn normal_merge_inserts_records() {
        let dag = CausalDag::new();
        let chain = [
            make_record(1, &[0]),
            make_record(2, &[1]),
            make_record(3, &[2]),
        ];
        let outcome = merge_remote(&dag, &chain);
        assert_eq!(outcome.records_merged, 3);
        assert_eq!(outcome.records_skipped, 0);
        assert!(outcome.orphans_deferred.is_empty());
        for id in 1..=3u128 {
            assert!(dag.contains(&CausalId(id)));
        }
    }

    #[test]
    fn deferred_chain_handles_missing_predecessor() {
        let dag = CausalDag::new();
        // Predecessor 999 exists nowhere, so the record must be deferred.
        let orphan = make_record(2, &[999]);
        let outcome = merge_remote(&dag, &[orphan]);
        assert_eq!(outcome.records_merged, 0);
        assert_eq!(outcome.records_skipped, 0);
        assert_eq!(outcome.orphans_deferred.len(), 1);
        assert_eq!(outcome.orphans_deferred[0].id, CausalId(2));
    }

    #[test]
    fn duplicate_records_skipped() {
        let dag = CausalDag::new();
        let local = make_record(1, &[0]);
        dag.append((*local).clone()).unwrap();
        // The remote batch re-sends id 1 alongside a genuinely new record.
        let duplicate = make_record(1, &[0]);
        let fresh = make_record(2, &[1]);
        let outcome = merge_remote(&dag, &[duplicate, fresh]);
        assert_eq!(
            outcome.records_skipped, 1,
            "R1 should be skipped (already in DAG)"
        );
        assert_eq!(outcome.records_merged, 1, "R2 should be merged");
    }

    #[test]
    fn topological_sort_orders_dependents_after_parents() {
        // Deliberately supplied in reverse dependency order.
        let input = [
            make_record(3, &[2]),
            make_record(2, &[1]),
            make_record(1, &[0]),
        ];
        let sorted = topological_sort(&input);
        assert_eq!(sorted.len(), 3);
        let pos = |id: u128| {
            sorted
                .iter()
                .position(|r| r.id == CausalId(id))
                .expect("record must be present")
        };
        assert!(pos(1) < pos(2), "R1 must precede R2");
        assert!(pos(2) < pos(3), "R2 must precede R3");
    }

    #[test]
    fn remote_tips_identified_correctly() {
        let dag = CausalDag::new();
        // R2 and R3 both branch from R1, so only they are tips.
        let base = make_record(1, &[0]);
        let branch_a = make_record(2, &[1]);
        let branch_b = make_record(3, &[1]);
        let outcome = merge_remote(&dag, &[base, branch_a, branch_b]);
        assert_eq!(outcome.records_merged, 3);
        assert!(
            outcome.remote_tips.contains(&CausalId(2)),
            "R2 should be a tip"
        );
        assert!(
            outcome.remote_tips.contains(&CausalId(3)),
            "R3 should be a tip"
        );
        assert!(
            !outcome.remote_tips.contains(&CausalId(1)),
            "R1 is not a tip (it is referenced as predecessor)"
        );
    }

    #[test]
    fn reunion_candidate_tips_do_not_include_non_remote() {
        let dag = CausalDag::new();
        // A purely local record that the remote chain builds on.
        let local = make_record(10, &[0]);
        dag.append((*local).clone()).unwrap();
        let remote_mid = make_record(20, &[10]);
        let remote_head = make_record(30, &[20]);
        let outcome = merge_remote(&dag, &[remote_mid, remote_head]);
        assert_eq!(outcome.records_merged, 2);
        assert!(
            outcome.remote_tips.contains(&CausalId(30)),
            "R2 should be a tip"
        );
        assert!(
            !outcome.remote_tips.contains(&CausalId(20)),
            "R1 should not be a tip (it is a predecessor of R2)"
        );
        assert!(
            !outcome.remote_tips.contains(&CausalId(10)),
            "L1 must not appear in remote_tips (not in remote batch)"
        );
    }
}