/// One entry of a diff.
///
/// NOTE(review): nothing in the visible part of this file constructs or consumes
/// this enum, so the payload descriptions on the variants are inferred from the
/// variant names and arity only — confirm against the diffing code.
#[derive(Debug, PartialEq)]
pub enum DiffResult {
/// Presumably `(key, value)` of an entry that was added — TODO confirm.
Added(Vec<u8>, Vec<u8>),
/// Presumably `(key, value)` of an entry that was removed — TODO confirm.
Removed(Vec<u8>, Vec<u8>),
/// Presumably `(key, old_value, new_value)` of an entry whose value changed — TODO confirm.
Modified(Vec<u8>, Vec<u8>, Vec<u8>),
}
/// Outcome of merging a single key.
///
/// The resolvers in this file build `Added`, `Removed` and `Modified` with the
/// conflicting key as the first field.
#[derive(Debug, PartialEq, Clone)]
pub enum MergeResult {
/// `(key, value)`. Several resolvers in this file emit this when the source
/// side has a value and the destination side has none.
Added(Vec<u8>, Vec<u8>),
/// `(key)`. The resolvers in this file emit this when the side they settle on
/// has no value for the key.
Removed(Vec<u8>),
/// `(key, value)`. The key is kept with `value` as its resolved content.
Modified(Vec<u8>, Vec<u8>),
/// Wraps a [`MergeConflict`]. Presumably marks a conflict that was left
/// unresolved; no resolver visible here produces it — confirm against the merge code.
Conflict(MergeConflict),
}
/// The data handed to a [`ConflictResolver`] for one conflicting key.
///
/// NOTE(review): the `Option` fields presumably use `None` for "key absent on
/// that side". The resolvers in this file treat `None` on the side they pick as
/// a removal. Confirm against the code that builds conflicts.
#[derive(Debug, PartialEq, Clone)]
pub struct MergeConflict {
/// Key the conflict is about; resolvers copy it into the `MergeResult` they return.
pub key: Vec<u8>,
/// Presumably the value in the common ancestor; no resolver visible here reads it — TODO confirm.
pub base_value: Option<Vec<u8>>,
/// Value on the source side of the merge, if any.
pub source_value: Option<Vec<u8>>,
/// Value on the destination side of the merge, if any.
pub destination_value: Option<Vec<u8>>,
}
/// Strategy for turning a [`MergeConflict`] into a [`MergeResult`].
pub trait ConflictResolver {
/// Decides the outcome for `conflict`.
///
/// Every implementation visible in this file returns `Some(..)`.
/// NOTE(review): the meaning of `None` is not established here — presumably
/// "leave the conflict unresolved"; confirm with the code that calls this.
fn resolve_conflict(&self, conflict: &MergeConflict) -> Option<MergeResult>;
}
/// Resolver that settles every conflict by keeping whatever the destination
/// side currently holds.
#[derive(Debug, Clone, Default)]
pub struct IgnoreConflictsResolver;

impl ConflictResolver for IgnoreConflictsResolver {
    /// Returns `Modified(key, destination_value)` when the destination has a
    /// value, otherwise `Removed(key)`. The source and base values are not read.
    /// Always returns `Some`.
    fn resolve_conflict(&self, conflict: &MergeConflict) -> Option<MergeResult> {
        let key = conflict.key.clone();
        let outcome = if let Some(kept) = conflict.destination_value.as_ref() {
            MergeResult::Modified(key, kept.clone())
        } else {
            MergeResult::Removed(key)
        };
        Some(outcome)
    }
}
/// Resolver that settles every conflict in favour of the source side.
#[derive(Debug, Clone, Default)]
pub struct TakeSourceResolver;

impl ConflictResolver for TakeSourceResolver {
    /// Returns `Modified(key, source_value)` when the source has a value,
    /// otherwise `Removed(key)`. The destination and base values are not read.
    /// Always returns `Some`.
    fn resolve_conflict(&self, conflict: &MergeConflict) -> Option<MergeResult> {
        let resolved = conflict.source_value.as_ref().map_or_else(
            || MergeResult::Removed(conflict.key.clone()),
            |incoming| MergeResult::Modified(conflict.key.clone(), incoming.clone()),
        );
        Some(resolved)
    }
}
/// Resolver that settles every conflict in favour of the destination side.
#[derive(Debug, Clone, Default)]
pub struct TakeDestinationResolver;

impl ConflictResolver for TakeDestinationResolver {
    /// Returns `Modified(key, destination_value)` when the destination has a
    /// value, otherwise `Removed(key)`. The source and base values are not read.
    /// Always returns `Some`.
    fn resolve_conflict(&self, conflict: &MergeConflict) -> Option<MergeResult> {
        Some(match conflict.destination_value.as_deref() {
            None => MergeResult::Removed(conflict.key.clone()),
            Some(current) => MergeResult::Modified(conflict.key.clone(), current.to_vec()),
        })
    }
}
/// Resolver configuration that maps agent identifiers to numeric priorities.
///
/// Agent identifiers are derived from keys of the form `agent…:rest`; see
/// `extract_agent_id`.
#[derive(Debug, Clone)]
pub struct AgentPriorityResolver {
    /// Explicit priority per agent identifier.
    agent_priorities: std::collections::HashMap<String, u32>,
    /// Priority used when a key yields no agent identifier or the identifier
    /// has no explicit entry.
    default_priority: u32,
}

impl Default for AgentPriorityResolver {
    /// Same as [`AgentPriorityResolver::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl AgentPriorityResolver {
    /// Creates a resolver with no per-agent priorities and a default priority of `1`.
    pub fn new() -> Self {
        Self::with_priorities(std::collections::HashMap::new())
    }

    /// Creates a resolver from an existing priority table; the default priority is `1`.
    pub fn with_priorities(priorities: std::collections::HashMap<String, u32>) -> Self {
        Self {
            default_priority: 1,
            agent_priorities: priorities,
        }
    }

    /// Sets (or overwrites) the priority stored for `agent_id`.
    pub fn set_agent_priority(&mut self, agent_id: String, priority: u32) {
        self.agent_priorities.insert(agent_id, priority);
    }

    /// Replaces the fallback priority used when no per-agent entry applies.
    pub fn set_default_priority(&mut self, priority: u32) {
        self.default_priority = priority;
    }

    /// Returns the part of `key` before its first `:` when the key, decoded
    /// lossily as UTF-8, starts with `"agent"`; otherwise `None`.
    ///
    /// A key that starts with `"agent"` but contains no `:` also yields `None`.
    // Not called by the `ConflictResolver` impl visible in this file, hence the allow.
    #[allow(dead_code)]
    fn extract_agent_id(&self, key: &[u8]) -> Option<String> {
        let text = String::from_utf8_lossy(key);
        if !text.starts_with("agent") {
            return None;
        }
        let separator = text.find(':')?;
        Some(text[..separator].to_owned())
    }

    /// Looks up the priority configured for the agent identifier embedded in
    /// `key`, falling back to the default priority when there is no identifier
    /// or no entry for it.
    // Not called by the `ConflictResolver` impl visible in this file, hence the allow.
    #[allow(dead_code)]
    fn get_priority_for_key(&self, key: &[u8]) -> u32 {
        if let Some(agent) = self.extract_agent_id(key) {
            if let Some(&configured) = self.agent_priorities.get(&agent) {
                return configured;
            }
        }
        self.default_priority
    }
}
impl ConflictResolver for AgentPriorityResolver {
    /// Resolves a conflict from which sides hold a value.
    ///
    /// * source and destination both present: `Modified(key, source)`
    /// * only source present: `Added(key, source)`
    /// * only destination present: `Modified(key, destination)`
    /// * neither present: `Removed(key)`
    ///
    /// As written, this does not consult the configured agent priorities.
    /// Always returns `Some`.
    fn resolve_conflict(&self, conflict: &MergeConflict) -> Option<MergeResult> {
        let key = conflict.key.clone();
        let resolution = if let Some(incoming) = &conflict.source_value {
            if conflict.destination_value.is_some() {
                MergeResult::Modified(key, incoming.clone())
            } else {
                MergeResult::Added(key, incoming.clone())
            }
        } else if let Some(existing) = &conflict.destination_value {
            MergeResult::Modified(key, existing.clone())
        } else {
            MergeResult::Removed(key)
        };
        Some(resolution)
    }
}
/// Resolver configuration holding the function used to obtain a timestamp for
/// a key/value pair.
#[derive(Debug, Clone)]
pub struct TimestampResolver {
    /// Called as `(key, value)`; returns the timestamp, or `None` when none can be determined.
    timestamp_extractor: fn(&[u8], &[u8]) -> Option<u64>,
}

impl Default for TimestampResolver {
    /// Same as [`TimestampResolver::default_resolver`].
    fn default() -> Self {
        Self::default_resolver()
    }
}

impl TimestampResolver {
    /// Creates a resolver that uses `extractor` to obtain timestamps.
    pub fn new(extractor: fn(&[u8], &[u8]) -> Option<u64>) -> Self {
        Self {
            timestamp_extractor: extractor,
        }
    }

    /// Creates a resolver whose extractor reads the timestamp out of the key:
    /// the text following the first `"timestamp:"` marker, up to the next `:`
    /// (or the end of the key), parsed as a `u64`. The value is ignored.
    pub fn default_resolver() -> Self {
        Self::new(Self::timestamp_from_key)
    }

    /// Default extractor: parses the `u64` that follows `"timestamp:"` in the
    /// lossily UTF-8-decoded key. Returns `None` when the marker is missing or
    /// the text after it is not a valid `u64`.
    fn timestamp_from_key(key: &[u8], _value: &[u8]) -> Option<u64> {
        const MARKER: &str = "timestamp:";
        let text = String::from_utf8_lossy(key);
        let marker_at = text.find(MARKER)?;
        let tail = &text[marker_at + MARKER.len()..];
        // `split` always yields at least one piece: everything before the
        // first ':' or the whole tail when there is no ':'.
        let digits = tail.split(':').next()?;
        digits.parse::<u64>().ok()
    }
}
impl ConflictResolver for TimestampResolver {
    /// Resolves a conflict by comparing the timestamps the extractor reports
    /// for the source and destination values.
    ///
    /// When both sides have a value, the result is always `Modified`:
    /// the destination value is chosen only if it has a timestamp and the
    /// source either has none or has a strictly smaller one; in every other
    /// case (including ties and no timestamps at all) the source value is chosen.
    ///
    /// When only the source has a value the result is `Added(key, source)`;
    /// when only the destination has one it is `Modified(key, destination)`;
    /// when neither has one it is `Removed(key)`. Always returns `Some`.
    fn resolve_conflict(&self, conflict: &MergeConflict) -> Option<MergeResult> {
        let key = conflict.key.clone();
        let resolution = match (&conflict.source_value, &conflict.destination_value) {
            (Some(incoming), Some(existing)) => {
                // Extractor is invoked for the source first, then the destination.
                let incoming_ts = (self.timestamp_extractor)(&conflict.key, incoming);
                let existing_ts = (self.timestamp_extractor)(&conflict.key, existing);
                let destination_wins = match (incoming_ts, existing_ts) {
                    (Some(newer), Some(older)) => newer < older,
                    (None, Some(_)) => true,
                    (Some(_), None) | (None, None) => false,
                };
                let winner = if destination_wins { existing } else { incoming };
                MergeResult::Modified(key, winner.clone())
            }
            (Some(incoming), None) => MergeResult::Added(key, incoming.clone()),
            (None, Some(existing)) => MergeResult::Modified(key, existing.clone()),
            (None, None) => MergeResult::Removed(key),
        };
        Some(resolution)
    }
}
/// Resolver that tries to combine both sides of a conflict as JSON documents.
#[derive(Debug, Clone, Default)]
pub struct SemanticMergeResolver;

impl ConflictResolver for SemanticMergeResolver {
    /// Resolves a conflict, merging the two values structurally when possible.
    ///
    /// When both sides have a value and both parse as JSON, the result is
    /// `Modified(key, bytes)` where `bytes` is the serialized output of
    /// `merge_json_values(source, destination)`. If either side fails to parse,
    /// or the merged document fails to serialize, the source value is used as is.
    ///
    /// When only the source has a value the result is `Added(key, source)`;
    /// when only the destination has one it is `Modified(key, destination)`;
    /// when neither has one it is `Removed(key)`. Always returns `Some`.
    fn resolve_conflict(&self, conflict: &MergeConflict) -> Option<MergeResult> {
        let key = conflict.key.clone();
        let resolution = match (&conflict.source_value, &conflict.destination_value) {
            (Some(incoming), Some(existing)) => {
                let parsed_source = serde_json::from_slice::<serde_json::Value>(incoming);
                let parsed_dest = serde_json::from_slice::<serde_json::Value>(existing);
                let merged_bytes = match (parsed_source, parsed_dest) {
                    (Ok(source_doc), Ok(dest_doc)) => {
                        serde_json::to_vec(&Self::merge_json_values(&source_doc, &dest_doc)).ok()
                    }
                    _ => None,
                };
                // Fall back to the raw source bytes when no merged document is available.
                let payload = merged_bytes.unwrap_or_else(|| incoming.clone());
                MergeResult::Modified(key, payload)
            }
            (Some(incoming), None) => MergeResult::Added(key, incoming.clone()),
            (None, Some(existing)) => MergeResult::Modified(key, existing.clone()),
            (None, None) => MergeResult::Removed(key),
        };
        Some(resolution)
    }
}
impl SemanticMergeResolver {
    /// Combines two JSON values, giving precedence to `source`.
    ///
    /// * Two objects: start from a copy of `dest`; every field of `source` is
    ///   inserted, merged recursively with the `dest` field of the same name
    ///   when one exists.
    /// * Two arrays: start from a copy of `dest` and append each `source`
    ///   element that is not already contained in the result.
    /// * Anything else (scalars, mismatched kinds): a copy of `source`.
    fn merge_json_values(
        source: &serde_json::Value,
        dest: &serde_json::Value,
    ) -> serde_json::Value {
        match (source, dest) {
            (serde_json::Value::Object(incoming), serde_json::Value::Object(existing)) => {
                let mut combined = existing.clone();
                for (field, incoming_value) in incoming {
                    let resolved = match existing.get(field) {
                        Some(existing_value) => {
                            Self::merge_json_values(incoming_value, existing_value)
                        }
                        None => incoming_value.clone(),
                    };
                    combined.insert(field.clone(), resolved);
                }
                serde_json::Value::Object(combined)
            }
            (serde_json::Value::Array(incoming), serde_json::Value::Array(existing)) => {
                let mut combined = existing.clone();
                for element in incoming {
                    // The check runs against the growing result, so repeated
                    // source elements are appended at most once.
                    if combined.contains(element) {
                        continue;
                    }
                    combined.push(element.clone());
                }
                serde_json::Value::Array(combined)
            }
            _ => source.clone(),
        }
    }
}