use std::collections::BTreeMap;
use std::path::PathBuf;
use serde::{Deserialize, Serialize};
use crate::crypto::PublicKey;
use crate::linked_data::{BlockEncoded, DagCborCodec, Link};
use super::conflict::{
operations_conflict, Conflict, ConflictResolver, MergeResult, Resolution, ResolvedConflict,
};
/// Kind of change a [`PathOperation`] records against its `path`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum OpType {
/// An entry is added at the operation's path.
Add,
/// An entry is removed; `PathOpLog::resolve_all` leaves out every path whose
/// winning operation is a `Remove`.
Remove,
/// A directory is created (the tests in this file record it with `is_dir = true`).
Mkdir,
/// An entry is moved.
/// NOTE(review): the tests in this file record the destination as the operation's
/// `path` and the source in `from` — confirm that this is the intended convention.
Mv {
/// Path the entry is moved away from.
from: PathBuf,
},
}
/// Identifier of a single operation; used as the key of `PathOpLog`'s operation map.
///
/// The hand-written `Ord` impl for this type compares `timestamp` first and falls
/// back to `peer_id` only when the timestamps are equal.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct OpId {
/// Logical-clock value. `PathOpLog::record` increments the log's local clock and
/// stores the new value here.
pub timestamp: u64,
/// Key of the peer the operation is attributed to (supplied by the caller of `record`).
pub peer_id: PublicKey,
}
impl PartialOrd for OpId {
    /// Always yields `Some`: the partial order is exactly the total order from `Ord`.
    fn partial_cmp(&self, rhs: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, rhs))
    }
}
impl Ord for OpId {
    /// Orders ids by `timestamp`; ids with equal timestamps are ordered by `peer_id`.
    fn cmp(&self, rhs: &Self) -> std::cmp::Ordering {
        self.timestamp
            .cmp(&rhs.timestamp)
            // The peer comparison only runs when the timestamps tie.
            .then_with(|| self.peer_id.cmp(&rhs.peer_id))
    }
}
/// One entry of a [`PathOpLog`]: which change (`op_type`) was applied to which `path`,
/// identified by `id`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PathOperation {
/// Identifier of this operation. `PathOpLog::record` and `PathOpLog::from_operation`
/// store the operation in the log under this same id.
pub id: OpId,
/// Kind of change.
pub op_type: OpType,
/// Path the operation applies to; `PathOpLog::resolve_path` and `ops_for_path`
/// select operations by comparing against this field.
pub path: PathBuf,
/// Optional content link, stored as given by the caller.
/// NOTE(review): presumably absent for operations that carry no data — confirm.
pub content_link: Option<Link>,
/// Caller-supplied flag, stored as given.
/// NOTE(review): presumably marks directory targets; nothing in this file reads it.
pub is_dir: bool,
}
/// Log of path operations, keyed (and therefore iterated) in [`OpId`] order.
///
/// Only `operations` is serialized. `local_clock` is marked `#[serde(skip)]`, so a
/// deserialized log starts with the default clock value (0) until `rebuild_clock`
/// is called on it.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct PathOpLog {
// Every known operation, ordered by id (timestamp first, then peer).
operations: BTreeMap<OpId, PathOperation>,
// Logical clock: `record` increments it and uses the new value as the timestamp;
// merging raises it past any newly seen remote timestamp.
#[serde(skip)]
local_clock: u64,
}
// Empty impl: `PathOpLog` relies entirely on the items `BlockEncoded` provides by
// default for `DagCborCodec` (the tests in this file call `encode`/`decode` through it).
impl BlockEncoded<DagCborCodec> for PathOpLog {}
impl PathOpLog {
    /// Creates an empty log whose logical clock starts at 0.
    pub fn new() -> Self {
        Self::default()
    }

    /// Builds a log holding a clone of `op` as its only entry.
    ///
    /// The logical clock is set to the operation's timestamp, so the next
    /// [`record`](Self::record) call is stamped with `op.id.timestamp + 1`.
    pub fn from_operation(op: &PathOperation) -> Self {
        let mut log = Self::new();
        log.local_clock = op.id.timestamp;
        log.operations.insert(op.id.clone(), op.clone());
        log
    }

    /// Resets the logical clock to the highest timestamp among the stored
    /// operations, or to 0 when the log is empty.
    ///
    /// The clock is not serialized, so this is how it is restored on a decoded log.
    pub fn rebuild_clock(&mut self) {
        // `OpId` orders by timestamp first, so the last key carries the maximum
        // timestamp; no need to scan every key.
        self.local_clock = self
            .operations
            .keys()
            .next_back()
            .map_or(0, |id| id.timestamp);
    }

    /// Appends a new local operation and returns its id.
    ///
    /// The logical clock is incremented first and the new value becomes the
    /// operation's timestamp; `peer_id` completes the id.
    pub fn record(
        &mut self,
        peer_id: PublicKey,
        op_type: OpType,
        path: impl Into<PathBuf>,
        content_link: Option<Link>,
        is_dir: bool,
    ) -> OpId {
        self.local_clock += 1;
        let id = OpId {
            timestamp: self.local_clock,
            peer_id,
        };
        let op = PathOperation {
            id: id.clone(),
            op_type,
            path: path.into(),
            content_link,
            is_dir,
        };
        self.operations.insert(id.clone(), op);
        id
    }

    /// Raises the logical clock to `timestamp + 1` when `timestamp` is at or
    /// beyond the current clock value; otherwise leaves the clock untouched.
    fn observe_timestamp(&mut self, timestamp: u64) {
        if timestamp >= self.local_clock {
            self.local_clock = timestamp + 1;
        }
    }

    /// Copies every operation of `other` whose id is not yet present into this
    /// log, without any conflict handling.
    ///
    /// The logical clock is advanced past each inserted operation's timestamp.
    /// Returns the number of operations inserted.
    pub fn merge(&mut self, other: &PathOpLog) -> usize {
        let mut added = 0;
        for (id, op) in &other.operations {
            if self.operations.contains_key(id) {
                continue;
            }
            self.operations.insert(id.clone(), op.clone());
            self.observe_timestamp(id.timestamp);
            added += 1;
        }
        added
    }

    /// Merges `other` into this log, consulting `resolver` for every incoming
    /// operation that conflicts with an operation already in the log.
    ///
    /// Incoming operations are visited in id order; ids already present are
    /// skipped. For each remaining operation:
    ///
    /// * the logical clock is advanced past its timestamp, whether or not the
    ///   operation ends up being inserted;
    /// * it is checked with `operations_conflict` against the log's current
    ///   contents (including operations inserted earlier in this same merge),
    ///   and the first conflicting entry in id order is used as the base;
    /// * without a conflict it is inserted as-is;
    /// * with a conflict the resolver's [`Resolution`] decides:
    ///   `UseBase` / `SkipBoth` drop the incoming operation,
    ///   `UseIncoming` inserts it,
    ///   `KeepBoth` inserts it and reports the conflict as unresolved,
    ///   `RenameIncoming` inserts it under the resolver-chosen path.
    ///
    /// No resolution removes the base operation from the log.
    ///
    /// Returns a [`MergeResult`] with the number of inserted operations and the
    /// resolved / unresolved conflicts in the order they were encountered.
    pub fn merge_with_resolver(
        &mut self,
        other: &PathOpLog,
        resolver: &dyn ConflictResolver,
        local_peer: &PublicKey,
    ) -> MergeResult {
        let mut result = MergeResult::new();

        for (id, op) in &other.operations {
            if self.operations.contains_key(id) {
                continue;
            }
            self.observe_timestamp(id.timestamp);

            let conflicting_base = self
                .operations
                .values()
                .find(|existing| operations_conflict(existing, op));

            match conflicting_base {
                None => {
                    self.operations.insert(id.clone(), op.clone());
                    result.operations_added += 1;
                }
                Some(base) => {
                    // `base` is cloned into the conflict here, which ends the borrow of
                    // `self.operations` before any insert below.
                    let conflict = Conflict::new(op.path.clone(), base.clone(), op.clone());
                    let resolution = resolver.resolve(&conflict, local_peer);
                    match resolution {
                        // Both outcomes leave the log untouched and only record the decision.
                        Resolution::UseBase | Resolution::SkipBoth => {
                            result.conflicts_resolved.push(ResolvedConflict {
                                conflict,
                                resolution,
                            });
                        }
                        Resolution::UseIncoming => {
                            self.operations.insert(id.clone(), op.clone());
                            result.operations_added += 1;
                            result.conflicts_resolved.push(ResolvedConflict {
                                conflict,
                                resolution,
                            });
                        }
                        Resolution::KeepBoth => {
                            self.operations.insert(id.clone(), op.clone());
                            result.operations_added += 1;
                            result.unresolved_conflicts.push(conflict);
                        }
                        Resolution::RenameIncoming { new_path } => {
                            // Same id and payload as the incoming operation, new path.
                            let mut renamed_op = op.clone();
                            renamed_op.path = new_path.clone();
                            self.operations.insert(id.clone(), renamed_op);
                            result.operations_added += 1;
                            result.conflicts_resolved.push(ResolvedConflict {
                                conflict,
                                resolution: Resolution::RenameIncoming { new_path },
                            });
                        }
                    }
                }
            }
        }

        result
    }

    /// Read-only view of every stored operation, keyed by id.
    pub fn operations(&self) -> &BTreeMap<OpId, PathOperation> {
        &self.operations
    }

    /// Returns the operation with the greatest id among those recorded for
    /// exactly `path`, or `None` when the log has no operation for it.
    ///
    /// The winner is returned whatever its type, so it may be a `Remove`.
    pub fn resolve_path(&self, path: impl AsRef<std::path::Path>) -> Option<&PathOperation> {
        let path = path.as_ref();
        self.operations
            .values()
            .filter(|op| op.path == path)
            .max_by_key(|op| &op.id)
    }

    /// Returns, for every path in the log, the operation with the greatest id,
    /// leaving out paths whose winning operation is a `Remove`.
    pub fn resolve_all(&self) -> BTreeMap<PathBuf, &PathOperation> {
        let mut winners: BTreeMap<&PathBuf, &PathOperation> = BTreeMap::new();
        for op in self.operations.values() {
            winners
                .entry(&op.path)
                .and_modify(|current| {
                    // `>=` keeps the later entry on equal ids, like `max_by_key` does.
                    if op.id >= current.id {
                        *current = op;
                    }
                })
                .or_insert(op);
        }

        winners
            .into_iter()
            .filter(|(_, winner)| !matches!(winner.op_type, OpType::Remove))
            .map(|(path, winner)| (path.clone(), winner))
            .collect()
    }

    /// Number of operations stored in the log.
    pub fn len(&self) -> usize {
        self.operations.len()
    }

    /// Whether the log holds no operations.
    pub fn is_empty(&self) -> bool {
        self.operations.is_empty()
    }

    /// Removes every operation while leaving the logical clock at its current
    /// value, so later `record` calls continue from it.
    pub fn clear_preserving_clock(&mut self) {
        self.operations.clear();
    }

    /// All operations recorded for exactly `path`, in ascending key order.
    pub fn ops_for_path(&self, path: impl AsRef<std::path::Path>) -> Vec<&PathOperation> {
        let path = path.as_ref();
        self.operations
            .values()
            .filter(|op| op.path == path)
            .collect()
    }

    /// Iterates over every operation in ascending key order.
    pub fn ops_in_order(&self) -> impl Iterator<Item = &PathOperation> {
        self.operations.values()
    }
}
/// Folds several logs into one, left to right.
///
/// The first log is cloned as the starting point and every following log is merged
/// into it with [`PathOpLog::merge_with_resolver`]. The returned vector holds one
/// [`MergeResult`] per merged log, in input order, so it has `logs.len() - 1`
/// entries. An empty `logs` slice yields an empty log and no results.
pub fn merge_logs<R: ConflictResolver>(
    logs: &[&PathOpLog],
    resolver: &R,
    local_peer: &PublicKey,
) -> (PathOpLog, Vec<MergeResult>) {
    let (first, rest) = match logs.split_first() {
        Some(parts) => parts,
        None => return (PathOpLog::new(), Vec::new()),
    };

    let mut merged = PathOpLog::clone(first);
    let mut results = Vec::with_capacity(rest.len());
    for incoming in rest {
        results.push(merged.merge_with_resolver(incoming, resolver, local_peer));
    }
    (merged, results)
}
#[cfg(test)]
mod tests {
    use super::super::conflict::{BaseWins, ConflictFile, ForkOnConflict, LastWriteWins};
    use super::*;
    use crate::crypto::SecretKey;

    /// Derives a deterministic peer key from a one-byte seed.
    fn make_peer_id(seed: u8) -> PublicKey {
        let mut seed_bytes = [0u8; 32];
        seed_bytes[0] = seed;
        SecretKey::from(seed_bytes).public()
    }

    /// Builds a raw-codec link whose hash has `seed` as its first byte.
    fn make_link(seed: u8) -> crate::linked_data::Link {
        let mut hash_bytes = [0u8; 32];
        hash_bytes[0] = seed;
        let hash = iroh_blobs::Hash::from_bytes(hash_bytes);
        crate::linked_data::Link::new(crate::linked_data::LD_RAW_CODEC, hash)
    }

    /// Records `ops` in order on a fresh log, each without a content link and
    /// with `is_dir = false`.
    fn log_with(peer: PublicKey, ops: &[(OpType, &str)]) -> PathOpLog {
        let mut log = PathOpLog::new();
        for (op_type, path) in ops {
            log.record(peer, op_type.clone(), *path, None, false);
        }
        log
    }

    #[test]
    fn test_op_id_ordering() {
        let peer1 = make_peer_id(1);
        let peer2 = make_peer_id(2);
        let id_at = |timestamp: u64, peer_id: PublicKey| OpId { timestamp, peer_id };

        let id1 = id_at(1, peer1);
        let id2 = id_at(2, peer1);
        let id3 = id_at(1, peer2);

        // A higher timestamp always wins.
        assert!(id2 > id1);
        // With equal timestamps the peer id breaks the tie.
        assert!(id3 != id1);
        if peer2 > peer1 {
            assert!(id3 > id1);
        } else {
            assert!(id3 < id1);
        }
    }

    #[test]
    fn test_record_operation() {
        let peer1 = make_peer_id(1);
        let mut log = PathOpLog::new();

        let id = log.record(peer1, OpType::Add, "file.txt", None, false);

        assert_eq!(id.timestamp, 1);
        assert_eq!(log.len(), 1);
        let stored = log.operations.get(&id).unwrap();
        assert_eq!(stored.path, PathBuf::from("file.txt"));
        assert!(matches!(stored.op_type, OpType::Add));
    }

    #[test]
    fn test_record_multiple_operations() {
        let peer1 = make_peer_id(1);
        let mut log = PathOpLog::new();

        let first = log.record(peer1, OpType::Add, "file1.txt", None, false);
        let second = log.record(peer1, OpType::Add, "file2.txt", None, false);
        let third = log.record(peer1, OpType::Remove, "file1.txt", None, false);

        // The clock ticks once per recorded operation.
        assert_eq!(first.timestamp, 1);
        assert_eq!(second.timestamp, 2);
        assert_eq!(third.timestamp, 3);
        assert_eq!(log.len(), 3);
    }

    #[test]
    fn test_merge_logs() {
        let mut log1 = log_with(make_peer_id(1), &[(OpType::Add, "file1.txt")]);
        let log2 = log_with(make_peer_id(2), &[(OpType::Add, "file2.txt")]);

        let added = log1.merge(&log2);

        assert_eq!(added, 1);
        assert_eq!(log1.len(), 2);
    }

    #[test]
    fn test_merge_idempotent() {
        let mut log1 = log_with(make_peer_id(1), &[(OpType::Add, "file.txt")]);
        let snapshot = log1.clone();

        // Merging a copy of itself must not add anything.
        let added = log1.merge(&snapshot);

        assert_eq!(added, 0);
        assert_eq!(log1.len(), 1);
    }

    #[test]
    fn test_resolve_path_single_op() {
        let log = log_with(make_peer_id(1), &[(OpType::Add, "file.txt")]);

        let resolved = log.resolve_path("file.txt");

        assert!(resolved.is_some());
        assert!(matches!(resolved.unwrap().op_type, OpType::Add));
    }

    #[test]
    fn test_resolve_path_latest_wins() {
        let log = log_with(
            make_peer_id(1),
            &[(OpType::Add, "file.txt"), (OpType::Remove, "file.txt")],
        );

        let resolved = log.resolve_path("file.txt");

        // The later operation on the path is the one reported.
        assert!(resolved.is_some());
        assert!(matches!(resolved.unwrap().op_type, OpType::Remove));
    }

    #[test]
    fn test_resolve_all_excludes_removed() {
        let log = log_with(
            make_peer_id(1),
            &[
                (OpType::Add, "file1.txt"),
                (OpType::Add, "file2.txt"),
                (OpType::Remove, "file1.txt"),
            ],
        );

        let resolved = log.resolve_all();

        assert_eq!(resolved.len(), 1);
        assert!(resolved.contains_key(&PathBuf::from("file2.txt")));
        assert!(!resolved.contains_key(&PathBuf::from("file1.txt")));
    }

    #[test]
    fn test_concurrent_ops_different_peers() {
        let peer1 = make_peer_id(1);
        let peer2 = make_peer_id(2);
        let mut log1 = log_with(peer1, &[(OpType::Add, "file.txt")]);
        let log2 = log_with(peer2, &[(OpType::Remove, "file.txt")]);

        log1.merge(&log2);

        let resolved = log1.resolve_path("file.txt");
        assert!(resolved.is_some());
        let winning_op = resolved.unwrap();
        // Both operations carry timestamp 1, so the peer id decides the winner.
        if peer2 > peer1 {
            assert!(matches!(winning_op.op_type, OpType::Remove));
        } else {
            assert!(matches!(winning_op.op_type, OpType::Add));
        }
    }

    #[test]
    fn test_mv_operation() {
        let log = log_with(
            make_peer_id(1),
            &[
                (OpType::Add, "old.txt"),
                (
                    OpType::Mv {
                        from: PathBuf::from("old.txt"),
                    },
                    "new.txt",
                ),
            ],
        );

        assert_eq!(log.len(), 2);
        let resolved = log.resolve_path("new.txt");
        assert!(resolved.is_some());
        assert!(matches!(resolved.unwrap().op_type, OpType::Mv { .. }));
    }

    #[test]
    fn test_serialization_roundtrip() {
        use crate::linked_data::BlockEncoded;

        let peer1 = make_peer_id(1);
        let mut log = PathOpLog::new();
        log.record(peer1, OpType::Add, "file1.txt", None, false);
        log.record(peer1, OpType::Mkdir, "dir", None, true);
        log.record(
            peer1,
            OpType::Mv {
                from: PathBuf::from("file1.txt"),
            },
            "dir/file1.txt",
            None,
            false,
        );

        let encoded = log.encode().unwrap();
        let decoded = PathOpLog::decode(&encoded).unwrap();

        // Only the operations are compared; the clock is not part of the encoding.
        assert_eq!(log.operations, decoded.operations);
    }

    #[test]
    fn test_merge_with_resolver_no_conflicts() {
        let peer1 = make_peer_id(1);
        let peer2 = make_peer_id(2);
        let mut log1 = log_with(peer1, &[(OpType::Add, "file1.txt")]);
        let log2 = log_with(peer2, &[(OpType::Add, "file2.txt")]);
        let resolver = LastWriteWins::new();

        let result = log1.merge_with_resolver(&log2, &resolver, &peer1);

        assert_eq!(result.operations_added, 1);
        assert_eq!(result.conflicts_resolved.len(), 0);
        assert!(!result.has_unresolved());
        assert_eq!(log1.len(), 2);
    }

    #[test]
    fn test_merge_with_resolver_last_write_wins() {
        let peer1 = make_peer_id(1);
        let peer2 = make_peer_id(2);
        let mut log1 = log_with(peer1, &[(OpType::Add, "file.txt")]);
        // The extra "dummy" operation pushes the removal to timestamp 2.
        let log2 = log_with(
            peer2,
            &[(OpType::Add, "dummy"), (OpType::Remove, "file.txt")],
        );
        let resolver = LastWriteWins::new();

        let result = log1.merge_with_resolver(&log2, &resolver, &peer1);

        assert_eq!(result.operations_added, 2);
        assert_eq!(result.conflicts_resolved.len(), 1);
        let resolved = &result.conflicts_resolved[0];
        assert_eq!(resolved.resolution, Resolution::UseIncoming);
    }

    #[test]
    fn test_merge_with_resolver_base_wins() {
        let peer1 = make_peer_id(1);
        let peer2 = make_peer_id(2);
        let mut log1 = log_with(peer1, &[(OpType::Add, "file.txt")]);
        let log2 = log_with(peer2, &[(OpType::Remove, "file.txt")]);
        let resolver = BaseWins::new();

        let result = log1.merge_with_resolver(&log2, &resolver, &peer1);

        assert_eq!(result.operations_added, 0);
        assert_eq!(result.conflicts_resolved.len(), 1);
        let resolved = &result.conflicts_resolved[0];
        assert_eq!(resolved.resolution, Resolution::UseBase);
        // The local add is still what the path resolves to.
        let resolved_path = log1.resolve_path("file.txt");
        assert!(matches!(resolved_path.unwrap().op_type, OpType::Add));
    }

    #[test]
    fn test_merge_with_resolver_fork_on_conflict() {
        let peer1 = make_peer_id(1);
        let peer2 = make_peer_id(2);
        let mut log1 = log_with(peer1, &[(OpType::Add, "file.txt")]);
        let log2 = log_with(peer2, &[(OpType::Add, "file.txt")]);
        let resolver = ForkOnConflict::new();

        let result = log1.merge_with_resolver(&log2, &resolver, &peer1);

        assert_eq!(result.operations_added, 1);
        assert_eq!(result.conflicts_resolved.len(), 0);
        assert!(result.has_unresolved());
        assert_eq!(result.unresolved_conflicts.len(), 1);
        assert_eq!(log1.len(), 2);

        // Both operations are kept; the greater id decides what the path resolves to.
        let resolved = log1.resolve_path("file.txt").unwrap();
        if peer2 > peer1 {
            assert_eq!(resolved.id.peer_id, peer2);
        } else {
            assert_eq!(resolved.id.peer_id, peer1);
        }
    }

    #[test]
    fn test_merge_with_resolver_concurrent_ops() {
        let peer1 = make_peer_id(1);
        let peer2 = make_peer_id(2);
        let mut log1 = log_with(peer1, &[(OpType::Add, "file.txt")]);
        let log2 = log_with(peer2, &[(OpType::Add, "file.txt")]);
        let resolver = LastWriteWins::new();

        let result = log1.merge_with_resolver(&log2, &resolver, &peer1);

        assert_eq!(result.total_conflicts(), 1);
        // Equal timestamps: the incoming operation is only added when its peer id is greater.
        if peer2 > peer1 {
            assert_eq!(result.operations_added, 1);
        } else {
            assert_eq!(result.operations_added, 0);
        }
    }

    #[test]
    fn test_merge_with_resolver_idempotent() {
        let peer1 = make_peer_id(1);
        let mut log1 = log_with(peer1, &[(OpType::Add, "file.txt")]);
        let snapshot = log1.clone();
        let resolver = LastWriteWins::new();

        let result = log1.merge_with_resolver(&snapshot, &resolver, &peer1);

        assert_eq!(result.operations_added, 0);
        assert_eq!(result.total_conflicts(), 0);
        assert_eq!(log1.len(), 1);
    }

    #[test]
    fn test_merge_with_resolver_mixed_conflicts() {
        let peer1 = make_peer_id(1);
        let peer2 = make_peer_id(2);
        let mut log1 = log_with(
            peer1,
            &[(OpType::Add, "file1.txt"), (OpType::Add, "file2.txt")],
        );
        // One conflicting operation (file1.txt) and one independent one (file3.txt).
        let log2 = log_with(
            peer2,
            &[(OpType::Remove, "file1.txt"), (OpType::Add, "file3.txt")],
        );
        let resolver = LastWriteWins::new();

        let result = log1.merge_with_resolver(&log2, &resolver, &peer1);

        assert_eq!(result.total_conflicts(), 1);
        assert!(log1.resolve_path("file3.txt").is_some());
    }

    #[test]
    fn test_merge_with_resolver_conflict_file() {
        let peer1 = make_peer_id(1);
        let peer2 = make_peer_id(2);
        let link_local = make_link(0xAA);
        let link_other = make_link(0xBB);
        let link_incoming = make_link(0xCC);

        let mut log1 = PathOpLog::new();
        log1.record(peer1, OpType::Add, "document.txt", Some(link_local), false);

        let mut log2 = PathOpLog::new();
        log2.record(peer2, OpType::Add, "other.txt", Some(link_other), false);
        log2.record(
            peer2,
            OpType::Add,
            "document.txt",
            Some(link_incoming.clone()),
            false,
        );

        let resolver = ConflictFile::new();
        let result = log1.merge_with_resolver(&log2, &resolver, &peer1);

        assert_eq!(result.operations_added, 2);
        assert_eq!(result.conflicts_resolved.len(), 1);

        // The incoming file must be renamed to "<name>@<first 8 chars of its hash>".
        let resolved = &result.conflicts_resolved[0];
        let new_path = match &resolved.resolution {
            Resolution::RenameIncoming { new_path } => new_path,
            _ => panic!("Expected RenameIncoming, got {:?}", resolved.resolution),
        };
        let expected_version: String = link_incoming.hash().to_string().chars().take(8).collect();
        assert_eq!(
            new_path,
            &std::path::PathBuf::from(format!("document.txt@{}", expected_version))
        );

        // The original path still belongs to the local peer ...
        let original = log1.resolve_path("document.txt").unwrap();
        assert_eq!(original.id.peer_id, peer1);

        // ... and the renamed path to the incoming one.
        let conflict = log1.resolve_path(new_path).unwrap();
        assert_eq!(conflict.id.peer_id, peer2);
    }

    #[test]
    fn test_merge_logs_empty() {
        let peer1 = make_peer_id(1);
        let resolver = LastWriteWins::new();

        let (merged, results) = merge_logs::<LastWriteWins>(&[], &resolver, &peer1);

        assert!(merged.is_empty());
        assert!(results.is_empty());
    }

    #[test]
    fn test_merge_logs_single() {
        let peer1 = make_peer_id(1);
        let log1 = log_with(peer1, &[(OpType::Add, "file.txt")]);
        let resolver = LastWriteWins::new();

        let (merged, results) = merge_logs(&[&log1], &resolver, &peer1);

        // A single log is returned unchanged and nothing is merged.
        assert_eq!(merged.len(), 1);
        assert!(results.is_empty());
    }

    #[test]
    fn test_merge_logs_two_no_conflict() {
        let peer1 = make_peer_id(1);
        let peer2 = make_peer_id(2);
        let log1 = log_with(peer1, &[(OpType::Add, "file1.txt")]);
        let log2 = log_with(peer2, &[(OpType::Add, "file2.txt")]);
        let resolver = LastWriteWins::new();

        let (merged, results) = merge_logs(&[&log1, &log2], &resolver, &peer1);

        assert_eq!(merged.len(), 2);
        assert!(merged.resolve_path("file1.txt").is_some());
        assert!(merged.resolve_path("file2.txt").is_some());
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].operations_added, 1);
        assert_eq!(results[0].total_conflicts(), 0);
    }

    #[test]
    fn test_merge_logs_three_way() {
        let peer1 = make_peer_id(1);
        let peer2 = make_peer_id(2);
        let peer3 = make_peer_id(3);

        // Three peers add the same path with different content.
        let mut log_alice = PathOpLog::new();
        log_alice.record(peer1, OpType::Add, "file.txt", Some(make_link(0xAA)), false);
        let mut log_bob = PathOpLog::new();
        log_bob.record(peer2, OpType::Add, "file.txt", Some(make_link(0xBB)), false);
        let mut log_carol = PathOpLog::new();
        log_carol.record(peer3, OpType::Add, "file.txt", Some(make_link(0xCC)), false);

        let resolver = ConflictFile::new();
        let (merged, results) =
            merge_logs(&[&log_alice, &log_bob, &log_carol], &resolver, &peer1);

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].total_conflicts(), 1);
        assert_eq!(results[1].total_conflicts(), 1);
        assert_eq!(merged.len(), 3);
        assert!(merged.resolve_path("file.txt").is_some());
    }

    #[test]
    fn test_merge_logs_accumulates_operations() {
        let peer1 = make_peer_id(1);
        let peer2 = make_peer_id(2);
        let peer3 = make_peer_id(3);
        let log1 = log_with(peer1, &[(OpType::Add, "a.txt"), (OpType::Add, "b.txt")]);
        let log2 = log_with(peer2, &[(OpType::Add, "c.txt")]);
        let log3 = log_with(peer3, &[(OpType::Add, "d.txt"), (OpType::Add, "e.txt")]);
        let resolver = LastWriteWins::new();

        let (merged, results) = merge_logs(&[&log1, &log2, &log3], &resolver, &peer1);

        assert_eq!(merged.len(), 5);
        assert_eq!(results.len(), 2);
        // One operation comes from log2, two from log3.
        assert_eq!(results[0].operations_added, 1);
        assert_eq!(results[1].operations_added, 2);
    }
}