use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// One versioned copy of a vector entry, carrying the bookkeeping fields
/// that [`ConflictResolver::resolve`] compares when two copies disagree.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct IndexVersion {
/// Identifier of the vector this record describes.
pub vector_id: String,
/// Version counter; compared by the `HighestVersionWins` and `MergeUnion`
/// policies.
pub version: u64,
/// Timestamp in milliseconds (per the field name); compared by the
/// `LastWriteWins` policy. The clock/epoch it comes from is not visible
/// here — confirm with the producer.
pub timestamp_ms: u64,
/// The vector payload.
pub vector: Vec<f32>,
/// String key/value metadata attached to the record.
pub metadata: HashMap<String, String>,
/// Label of the origin of this copy — presumably a data-center name
/// (tests use values such as `"dc-a"`); confirm against callers.
pub source_dc: String,
}
impl IndexVersion {
    /// Creates a record from its identity, version counter, timestamp,
    /// vector payload and origin label.
    ///
    /// `vector_id` and `source_dc` accept anything convertible into a
    /// `String`. The metadata map starts out empty; use
    /// [`IndexVersion::with_metadata`] to populate it.
    pub fn new(
        vector_id: impl Into<String>,
        version: u64,
        timestamp_ms: u64,
        vector: Vec<f32>,
        source_dc: impl Into<String>,
    ) -> Self {
        // Convert the string-like arguments up front so the struct literal
        // below can use field-init shorthand throughout.
        let vector_id: String = vector_id.into();
        let source_dc: String = source_dc.into();
        let metadata = HashMap::new();

        Self {
            vector_id,
            version,
            timestamp_ms,
            vector,
            metadata,
            source_dc,
        }
    }

    /// Consumes the record, stores `value` under `key` in its metadata and
    /// hands the record back so calls can be chained.
    ///
    /// A value already stored under the same key is replaced.
    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let meta_key: String = key.into();
        let meta_value: String = value.into();
        self.metadata.insert(meta_key, meta_value);
        self
    }
}
/// Payload of [`Resolution::Merge`]: the combined record plus a note of
/// which side supplied the vector.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct MergedIndex {
/// The combined record produced by the merge.
pub version: IndexVersion,
/// `source_dc` of the side whose vector was kept in `version`.
pub vector_source: String,
}
/// Strategy that [`ConflictResolver::resolve`] applies to a local/remote
/// pair of [`IndexVersion`] records.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConflictPolicy {
/// Pick the side with the larger `timestamp_ms`; equal timestamps resolve
/// to the remote side.
LastWriteWins,
/// Pick the side with the larger `version`; equal versions resolve to the
/// local side.
HighestVersionWins,
/// Build a merged record: the side with the larger `version` (remote on a
/// tie) supplies the vector and bookkeeping fields, and the metadata maps
/// of both sides are unioned with that side's values taking precedence.
MergeUnion,
/// Make no automatic decision; resolution yields
/// [`Resolution::RequiresManual`].
Manual,
}
/// Outcome returned by [`ConflictResolver::resolve`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum Resolution {
/// The local record was selected.
UseLocal,
/// The remote record was selected.
UseRemote,
/// Neither record was selected as-is; the combined record is carried in
/// the payload.
Merge(MergedIndex),
/// No automatic decision was made (returned for the `Manual` policy).
RequiresManual,
}
/// Field-less resolver type; it holds no state and exists to host the
/// `resolve` method.
#[derive(Debug, Clone, Copy, Default)]
pub struct ConflictResolver;
impl ConflictResolver {
pub fn resolve(
&self,
local: &IndexVersion,
remote: &IndexVersion,
policy: &ConflictPolicy,
) -> Resolution {
match policy {
ConflictPolicy::LastWriteWins => {
if remote.timestamp_ms > local.timestamp_ms {
Resolution::UseRemote
} else if remote.timestamp_ms < local.timestamp_ms {
Resolution::UseLocal
} else {
Resolution::UseRemote
}
}
ConflictPolicy::HighestVersionWins => {
if remote.version > local.version {
Resolution::UseRemote
} else if remote.version < local.version {
Resolution::UseLocal
} else {
Resolution::UseLocal
}
}
ConflictPolicy::MergeUnion => {
let (winner, loser) = if remote.version >= local.version {
(remote, local)
} else {
(local, remote)
};
let mut merged_meta = loser.metadata.clone();
for (k, v) in &winner.metadata {
merged_meta.insert(k.clone(), v.clone());
}
let merged_version = IndexVersion {
vector_id: local.vector_id.clone(),
version: winner.version,
timestamp_ms: winner.timestamp_ms,
vector: winner.vector.clone(),
metadata: merged_meta,
source_dc: winner.source_dc.clone(),
};
Resolution::Merge(MergedIndex {
version: merged_version,
vector_source: winner.source_dc.clone(),
})
}
ConflictPolicy::Manual => Resolution::RequiresManual,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `"vec-1"` record whose vector is `[version, 0.0]`, so the
    /// payload identifies which side it came from.
    fn make_version(version: u64, timestamp_ms: u64, dc: &str) -> IndexVersion {
        IndexVersion::new(
            "vec-1",
            version,
            timestamp_ms,
            vec![version as f32, 0.0],
            dc,
        )
    }

    /// Unwraps a `Resolution::Merge`, failing the test with the actual
    /// variant for any other outcome.
    fn expect_merge(resolution: Resolution) -> MergedIndex {
        match resolution {
            Resolution::Merge(merged) => merged,
            other => panic!("Expected Merge resolution, got {:?}", other),
        }
    }

    #[test]
    fn test_lww_remote_newer() {
        let r = ConflictResolver;
        let local = make_version(1, 1000, "dc-a");
        let remote = make_version(2, 2000, "dc-b");
        assert_eq!(
            r.resolve(&local, &remote, &ConflictPolicy::LastWriteWins),
            Resolution::UseRemote
        );
    }

    #[test]
    fn test_lww_local_newer() {
        let r = ConflictResolver;
        let local = make_version(2, 2000, "dc-a");
        let remote = make_version(1, 1000, "dc-b");
        assert_eq!(
            r.resolve(&local, &remote, &ConflictPolicy::LastWriteWins),
            Resolution::UseLocal
        );
    }

    #[test]
    fn test_lww_tie_prefers_remote() {
        let r = ConflictResolver;
        let local = make_version(1, 1000, "dc-a");
        let remote = make_version(2, 1000, "dc-b");
        assert_eq!(
            r.resolve(&local, &remote, &ConflictPolicy::LastWriteWins),
            Resolution::UseRemote
        );
    }

    #[test]
    fn test_hvw_remote_higher() {
        let r = ConflictResolver;
        let local = make_version(5, 1000, "dc-a");
        let remote = make_version(10, 500, "dc-b");
        assert_eq!(
            r.resolve(&local, &remote, &ConflictPolicy::HighestVersionWins),
            Resolution::UseRemote
        );
    }

    #[test]
    fn test_hvw_local_higher() {
        let r = ConflictResolver;
        let local = make_version(10, 1000, "dc-a");
        let remote = make_version(5, 2000, "dc-b");
        assert_eq!(
            r.resolve(&local, &remote, &ConflictPolicy::HighestVersionWins),
            Resolution::UseLocal
        );
    }

    #[test]
    fn test_hvw_tie_prefers_local() {
        let r = ConflictResolver;
        let local = make_version(7, 1000, "dc-a");
        let remote = make_version(7, 1000, "dc-b");
        assert_eq!(
            r.resolve(&local, &remote, &ConflictPolicy::HighestVersionWins),
            Resolution::UseLocal
        );
    }

    #[test]
    fn test_merge_union_higher_version_provides_vector() {
        let r = ConflictResolver;
        let local = IndexVersion::new("vec-1", 5, 1000, vec![1.0, 2.0], "dc-a");
        let remote = IndexVersion::new("vec-1", 10, 2000, vec![3.0, 4.0], "dc-b");
        let merged = expect_merge(r.resolve(&local, &remote, &ConflictPolicy::MergeUnion));
        assert_eq!(merged.version.vector, vec![3.0, 4.0]);
        assert_eq!(merged.vector_source, "dc-b");
    }

    #[test]
    fn test_merge_union_metadata_union() {
        let r = ConflictResolver;
        let local = IndexVersion::new("vec-1", 5, 1000, vec![1.0], "dc-a")
            .with_metadata("key_a", "val_a")
            .with_metadata("shared", "local_val");
        let remote = IndexVersion::new("vec-1", 10, 2000, vec![2.0], "dc-b")
            .with_metadata("key_b", "val_b")
            .with_metadata("shared", "remote_val");
        let merged = expect_merge(r.resolve(&local, &remote, &ConflictPolicy::MergeUnion));
        assert!(merged.version.metadata.contains_key("key_a"));
        assert!(merged.version.metadata.contains_key("key_b"));
        assert_eq!(merged.version.metadata["shared"], "remote_val");
    }

    #[test]
    fn test_merge_union_equal_versions_picks_remote() {
        let r = ConflictResolver;
        let local = IndexVersion::new("vec-1", 5, 1000, vec![1.0], "dc-a");
        let remote = IndexVersion::new("vec-1", 5, 2000, vec![2.0], "dc-b");
        let merged = expect_merge(r.resolve(&local, &remote, &ConflictPolicy::MergeUnion));
        assert_eq!(merged.version.vector, vec![2.0]);
    }

    #[test]
    fn test_merge_union_local_higher_keeps_local_fields() {
        let r = ConflictResolver;
        let local = IndexVersion::new("vec-1", 10, 1000, vec![1.0], "dc-a")
            .with_metadata("key_a", "val_a")
            .with_metadata("shared", "local_val");
        let remote = IndexVersion::new("vec-1", 5, 2000, vec![2.0], "dc-b")
            .with_metadata("key_b", "val_b")
            .with_metadata("shared", "remote_val");
        let merged = expect_merge(r.resolve(&local, &remote, &ConflictPolicy::MergeUnion));
        assert_eq!(merged.version.vector, vec![1.0]);
        assert_eq!(merged.version.version, 10);
        assert_eq!(merged.vector_source, "dc-a");
        assert_eq!(merged.version.metadata.len(), 3);
        assert_eq!(merged.version.metadata["key_a"], "val_a");
        assert_eq!(merged.version.metadata["key_b"], "val_b");
        assert_eq!(merged.version.metadata["shared"], "local_val");
    }

    #[test]
    fn test_merge_union_carries_winner_bookkeeping() {
        let r = ConflictResolver;
        let local = IndexVersion::new("vec-1", 3, 100, vec![1.0], "dc-a");
        let remote = IndexVersion::new("vec-1", 8, 200, vec![8.0], "dc-b");
        let merged = expect_merge(r.resolve(&local, &remote, &ConflictPolicy::MergeUnion));
        assert_eq!(merged.version.vector_id, "vec-1");
        assert_eq!(merged.version.timestamp_ms, 200);
        assert_eq!(merged.version.source_dc, "dc-b");
        assert_eq!(merged.vector_source, merged.version.source_dc);
    }

    #[test]
    fn test_manual_policy_requires_manual() {
        let r = ConflictResolver;
        let local = make_version(1, 1000, "dc-a");
        let remote = make_version(2, 2000, "dc-b");
        assert_eq!(
            r.resolve(&local, &remote, &ConflictPolicy::Manual),
            Resolution::RequiresManual
        );
    }

    #[test]
    fn test_index_version_with_metadata() {
        let v =
            IndexVersion::new("v1", 1, 1000, vec![0.0], "dc-a").with_metadata("tag", "important");
        assert_eq!(v.metadata["tag"], "important");
    }

    #[test]
    fn test_resolution_is_clone() {
        let r = Resolution::UseLocal;
        let r2 = r.clone();
        // The clone must compare equal to the value it was made from.
        assert_eq!(r, r2);
    }

    #[test]
    fn test_merged_index_carries_correct_version_number() {
        let r = ConflictResolver;
        let local = IndexVersion::new("v1", 3, 100, vec![1.0], "dc-a");
        let remote = IndexVersion::new("v1", 8, 200, vec![8.0], "dc-b");
        let merged = expect_merge(r.resolve(&local, &remote, &ConflictPolicy::MergeUnion));
        assert_eq!(merged.version.version, 8);
    }
}