1use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21
22#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
28pub struct IndexVersion {
29 pub vector_id: String,
31 pub version: u64,
33 pub timestamp_ms: u64,
35 pub vector: Vec<f32>,
37 pub metadata: HashMap<String, String>,
39 pub source_dc: String,
41}
42
43impl IndexVersion {
44 pub fn new(
46 vector_id: impl Into<String>,
47 version: u64,
48 timestamp_ms: u64,
49 vector: Vec<f32>,
50 source_dc: impl Into<String>,
51 ) -> Self {
52 Self {
53 vector_id: vector_id.into(),
54 version,
55 timestamp_ms,
56 vector,
57 metadata: HashMap::new(),
58 source_dc: source_dc.into(),
59 }
60 }
61
62 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
64 self.metadata.insert(key.into(), value.into());
65 self
66 }
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
78pub struct MergedIndex {
79 pub version: IndexVersion,
81 pub vector_source: String,
83}
84
85#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
91pub enum ConflictPolicy {
92 LastWriteWins,
96 HighestVersionWins,
100 MergeUnion,
104 Manual,
106}
107
108#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
114pub enum Resolution {
115 UseLocal,
117 UseRemote,
119 Merge(MergedIndex),
121 RequiresManual,
123}
124
125#[derive(Debug, Clone, Copy, Default)]
143pub struct ConflictResolver;
144
145impl ConflictResolver {
146 pub fn resolve(
149 &self,
150 local: &IndexVersion,
151 remote: &IndexVersion,
152 policy: &ConflictPolicy,
153 ) -> Resolution {
154 match policy {
155 ConflictPolicy::LastWriteWins => {
156 if remote.timestamp_ms > local.timestamp_ms {
157 Resolution::UseRemote
158 } else if remote.timestamp_ms < local.timestamp_ms {
159 Resolution::UseLocal
160 } else {
161 Resolution::UseRemote
163 }
164 }
165 ConflictPolicy::HighestVersionWins => {
166 if remote.version > local.version {
167 Resolution::UseRemote
168 } else if remote.version < local.version {
169 Resolution::UseLocal
170 } else {
171 Resolution::UseLocal
173 }
174 }
175 ConflictPolicy::MergeUnion => {
176 let (winner, loser) = if remote.version >= local.version {
178 (remote, local)
179 } else {
180 (local, remote)
181 };
182
183 let mut merged_meta = loser.metadata.clone();
184 for (k, v) in &winner.metadata {
186 merged_meta.insert(k.clone(), v.clone());
187 }
188
189 let merged_version = IndexVersion {
190 vector_id: local.vector_id.clone(),
191 version: winner.version,
192 timestamp_ms: winner.timestamp_ms,
193 vector: winner.vector.clone(),
194 metadata: merged_meta,
195 source_dc: winner.source_dc.clone(),
196 };
197
198 Resolution::Merge(MergedIndex {
199 version: merged_version,
200 vector_source: winner.source_dc.clone(),
201 })
202 }
203 ConflictPolicy::Manual => Resolution::RequiresManual,
204 }
205 }
206}
207
208#[cfg(test)]
213mod tests {
214 use super::*;
215
216 fn make_version(version: u64, timestamp_ms: u64, dc: &str) -> IndexVersion {
217 IndexVersion::new(
218 "vec-1",
219 version,
220 timestamp_ms,
221 vec![version as f32, 0.0],
222 dc,
223 )
224 }
225
226 #[test]
229 fn test_lww_remote_newer() {
230 let r = ConflictResolver;
231 let local = make_version(1, 1000, "dc-a");
232 let remote = make_version(2, 2000, "dc-b");
233 assert_eq!(
234 r.resolve(&local, &remote, &ConflictPolicy::LastWriteWins),
235 Resolution::UseRemote
236 );
237 }
238
239 #[test]
240 fn test_lww_local_newer() {
241 let r = ConflictResolver;
242 let local = make_version(2, 2000, "dc-a");
243 let remote = make_version(1, 1000, "dc-b");
244 assert_eq!(
245 r.resolve(&local, &remote, &ConflictPolicy::LastWriteWins),
246 Resolution::UseLocal
247 );
248 }
249
250 #[test]
251 fn test_lww_tie_prefers_remote() {
252 let r = ConflictResolver;
253 let local = make_version(1, 1000, "dc-a");
254 let remote = make_version(2, 1000, "dc-b"); assert_eq!(
256 r.resolve(&local, &remote, &ConflictPolicy::LastWriteWins),
257 Resolution::UseRemote
258 );
259 }
260
261 #[test]
264 fn test_hvw_remote_higher() {
265 let r = ConflictResolver;
266 let local = make_version(5, 1000, "dc-a");
267 let remote = make_version(10, 500, "dc-b"); assert_eq!(
269 r.resolve(&local, &remote, &ConflictPolicy::HighestVersionWins),
270 Resolution::UseRemote
271 );
272 }
273
274 #[test]
275 fn test_hvw_local_higher() {
276 let r = ConflictResolver;
277 let local = make_version(10, 1000, "dc-a");
278 let remote = make_version(5, 2000, "dc-b");
279 assert_eq!(
280 r.resolve(&local, &remote, &ConflictPolicy::HighestVersionWins),
281 Resolution::UseLocal
282 );
283 }
284
285 #[test]
286 fn test_hvw_tie_prefers_local() {
287 let r = ConflictResolver;
288 let local = make_version(7, 1000, "dc-a");
289 let remote = make_version(7, 1000, "dc-b"); assert_eq!(
291 r.resolve(&local, &remote, &ConflictPolicy::HighestVersionWins),
292 Resolution::UseLocal
293 );
294 }
295
296 #[test]
299 fn test_merge_union_higher_version_provides_vector() {
300 let r = ConflictResolver;
301 let local = IndexVersion::new("vec-1", 5, 1000, vec![1.0, 2.0], "dc-a");
302 let remote = IndexVersion::new("vec-1", 10, 2000, vec![3.0, 4.0], "dc-b");
303
304 if let Resolution::Merge(merged) = r.resolve(&local, &remote, &ConflictPolicy::MergeUnion) {
305 assert_eq!(merged.version.vector, vec![3.0, 4.0]);
306 assert_eq!(merged.vector_source, "dc-b");
307 } else {
308 panic!("Expected Merge resolution");
309 }
310 }
311
312 #[test]
313 fn test_merge_union_metadata_union() {
314 let r = ConflictResolver;
315 let mut local = IndexVersion::new("vec-1", 5, 1000, vec![1.0], "dc-a");
316 local.metadata.insert("key_a".into(), "val_a".into());
317 local.metadata.insert("shared".into(), "local_val".into());
318
319 let mut remote = IndexVersion::new("vec-1", 10, 2000, vec![2.0], "dc-b");
320 remote.metadata.insert("key_b".into(), "val_b".into());
321 remote.metadata.insert("shared".into(), "remote_val".into());
322
323 if let Resolution::Merge(merged) = r.resolve(&local, &remote, &ConflictPolicy::MergeUnion) {
324 assert!(merged.version.metadata.contains_key("key_a"));
326 assert!(merged.version.metadata.contains_key("key_b"));
327 assert_eq!(merged.version.metadata["shared"], "remote_val");
329 } else {
330 panic!("Expected Merge resolution");
331 }
332 }
333
334 #[test]
335 fn test_merge_union_equal_versions_picks_remote() {
336 let r = ConflictResolver;
337 let local = IndexVersion::new("vec-1", 5, 1000, vec![1.0], "dc-a");
338 let remote = IndexVersion::new("vec-1", 5, 2000, vec![2.0], "dc-b");
339
340 if let Resolution::Merge(merged) = r.resolve(&local, &remote, &ConflictPolicy::MergeUnion) {
341 assert_eq!(merged.version.vector, vec![2.0]);
343 } else {
344 panic!("Expected Merge resolution");
345 }
346 }
347
348 #[test]
351 fn test_manual_policy_requires_manual() {
352 let r = ConflictResolver;
353 let local = make_version(1, 1000, "dc-a");
354 let remote = make_version(2, 2000, "dc-b");
355 assert_eq!(
356 r.resolve(&local, &remote, &ConflictPolicy::Manual),
357 Resolution::RequiresManual
358 );
359 }
360
361 #[test]
364 fn test_index_version_with_metadata() {
365 let v =
366 IndexVersion::new("v1", 1, 1000, vec![0.0], "dc-a").with_metadata("tag", "important");
367 assert_eq!(v.metadata["tag"], "important");
368 }
369
370 #[test]
371 fn test_resolution_is_clone() {
372 let r = Resolution::UseLocal;
373 let _r2 = r.clone();
374 }
375
376 #[test]
377 fn test_merged_index_carries_correct_version_number() {
378 let r = ConflictResolver;
379 let local = IndexVersion::new("v1", 3, 100, vec![1.0], "dc-a");
380 let remote = IndexVersion::new("v1", 8, 200, vec![8.0], "dc-b");
381
382 if let Resolution::Merge(merged) = r.resolve(&local, &remote, &ConflictPolicy::MergeUnion) {
383 assert_eq!(merged.version.version, 8);
384 } else {
385 panic!("Expected merge");
386 }
387 }
388}