Skip to main content

oxirs_vec/distributed/
conflict_resolver.rs

//! Conflict resolution for cross-DC vector index synchronisation.
//!
//! When two datacenters each accept writes during a network partition they can
//! accumulate divergent versions of the same index entry.  This module provides
//! strategies for reconciling those versions when connectivity is restored.
//!
//! # Design
//!
//! - `IndexVersion` carries a version number, timestamp and the actual vector
//!   data so the resolver can make a purely logical decision without touching
//!   the storage layer.
//! - `ConflictResolver` is stateless; call `resolve` any number of times.
//! - `Resolution` describes what the caller should do with the two versions.
//!
//! # Pure Rust Policy
//!
//! No CUDA runtime calls or FFI.
18
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21
22// ============================================================
23// IndexVersion
24// ============================================================
25
26/// A versioned snapshot of a single vector entry.
27#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
28pub struct IndexVersion {
29    /// The unique identifier of the vector
30    pub vector_id: String,
31    /// Monotonically increasing version number (e.g. Raft log index or DC seq)
32    pub version: u64,
33    /// Wall-clock timestamp when this version was written (Unix milliseconds)
34    pub timestamp_ms: u64,
35    /// The vector data
36    pub vector: Vec<f32>,
37    /// Associated metadata key-value pairs
38    pub metadata: HashMap<String, String>,
39    /// Originating datacenter identifier
40    pub source_dc: String,
41}
42
43impl IndexVersion {
44    /// Create a new version entry.
45    pub fn new(
46        vector_id: impl Into<String>,
47        version: u64,
48        timestamp_ms: u64,
49        vector: Vec<f32>,
50        source_dc: impl Into<String>,
51    ) -> Self {
52        Self {
53            vector_id: vector_id.into(),
54            version,
55            timestamp_ms,
56            vector,
57            metadata: HashMap::new(),
58            source_dc: source_dc.into(),
59        }
60    }
61
62    /// Attach metadata to this version (builder-style).
63    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
64        self.metadata.insert(key.into(), value.into());
65        self
66    }
67}
68
69// ============================================================
70// MergedIndex
71// ============================================================
72
73/// A merged index entry produced when `ConflictPolicy::MergeUnion` is applied.
74///
75/// The vector data is taken from the higher-version side; metadata is a union
76/// of both sides (local keys win on collision).
77#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
78pub struct MergedIndex {
79    /// The vector entry that will be written to the index
80    pub version: IndexVersion,
81    /// Which DC's vector data was chosen
82    pub vector_source: String,
83}
84
85// ============================================================
86// ConflictPolicy
87// ============================================================
88
89/// Strategy for resolving a write conflict between two index versions.
90#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
91pub enum ConflictPolicy {
92    /// The version with the later wall-clock timestamp wins.
93    ///
94    /// Ties are broken in favour of the remote (incoming) version.
95    LastWriteWins,
96    /// The version with the higher monotonic version number wins.
97    ///
98    /// Ties are broken in favour of the local version.
99    HighestVersionWins,
100    /// Take the vector from the higher-version side and merge all metadata.
101    ///
102    /// Local metadata keys take precedence on collision.
103    MergeUnion,
104    /// Surface the conflict to the caller for application-level handling.
105    Manual,
106}
107
108// ============================================================
109// Resolution
110// ============================================================
111
112/// The outcome of a conflict resolution operation.
113#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
114pub enum Resolution {
115    /// Keep the local version unchanged.
116    UseLocal,
117    /// Replace local with the remote version.
118    UseRemote,
119    /// Replace local with the merged version.
120    Merge(MergedIndex),
121    /// The policy is `Manual`; the caller must decide.
122    RequiresManual,
123}
124
125// ============================================================
126// ConflictResolver
127// ============================================================
128
/// Stateless conflict resolver.
///
/// The type has no fields, so a value can be created with `ConflictResolver`
/// or `ConflictResolver::default()` and copied freely.
///
/// # Example
/// ```
/// use oxirs_vec::distributed::{ConflictResolver, ConflictPolicy, IndexVersion, Resolution};
///
/// let resolver = ConflictResolver;
/// let local = IndexVersion::new("v1", 10, 1_000, vec![1.0, 0.0], "dc-a");
/// let remote = IndexVersion::new("v1", 20, 2_000, vec![0.0, 1.0], "dc-b");
///
/// let outcome = resolver.resolve(&local, &remote, &ConflictPolicy::HighestVersionWins);
/// assert_eq!(outcome, Resolution::UseRemote);
/// ```
#[derive(Debug, Clone, Copy, Default)]
pub struct ConflictResolver;
144
145impl ConflictResolver {
146    /// Resolve a conflict between a `local` and `remote` version of the same
147    /// vector entry according to `policy`.
148    pub fn resolve(
149        &self,
150        local: &IndexVersion,
151        remote: &IndexVersion,
152        policy: &ConflictPolicy,
153    ) -> Resolution {
154        match policy {
155            ConflictPolicy::LastWriteWins => {
156                if remote.timestamp_ms > local.timestamp_ms {
157                    Resolution::UseRemote
158                } else if remote.timestamp_ms < local.timestamp_ms {
159                    Resolution::UseLocal
160                } else {
161                    // Tie: favour remote (incoming wins)
162                    Resolution::UseRemote
163                }
164            }
165            ConflictPolicy::HighestVersionWins => {
166                if remote.version > local.version {
167                    Resolution::UseRemote
168                } else if remote.version < local.version {
169                    Resolution::UseLocal
170                } else {
171                    // Tie: favour local (stable)
172                    Resolution::UseLocal
173                }
174            }
175            ConflictPolicy::MergeUnion => {
176                // Higher version provides the vector; metadata is unioned
177                let (winner, loser) = if remote.version >= local.version {
178                    (remote, local)
179                } else {
180                    (local, remote)
181                };
182
183                let mut merged_meta = loser.metadata.clone();
184                // Local (winner here) keys win on collision
185                for (k, v) in &winner.metadata {
186                    merged_meta.insert(k.clone(), v.clone());
187                }
188
189                let merged_version = IndexVersion {
190                    vector_id: local.vector_id.clone(),
191                    version: winner.version,
192                    timestamp_ms: winner.timestamp_ms,
193                    vector: winner.vector.clone(),
194                    metadata: merged_meta,
195                    source_dc: winner.source_dc.clone(),
196                };
197
198                Resolution::Merge(MergedIndex {
199                    version: merged_version,
200                    vector_source: winner.source_dc.clone(),
201                })
202            }
203            ConflictPolicy::Manual => Resolution::RequiresManual,
204        }
205    }
206}
207
208// ============================================================
209// Tests
210// ============================================================
211
#[cfg(test)]
mod tests {
    //! Unit tests covering every `ConflictPolicy` branch of
    //! `ConflictResolver::resolve`, including the tie-breaking rules.

    use super::*;

    /// Build a version of entry `"vec-1"` whose vector encodes `version`, so
    /// the chosen side can be recognised from the data alone.
    fn make_version(version: u64, timestamp_ms: u64, dc: &str) -> IndexVersion {
        IndexVersion::new(
            "vec-1",
            version,
            timestamp_ms,
            vec![version as f32, 0.0],
            dc,
        )
    }

    // ---- LastWriteWins ----

    #[test]
    fn test_lww_remote_newer() {
        let r = ConflictResolver;
        let local = make_version(1, 1000, "dc-a");
        let remote = make_version(2, 2000, "dc-b");
        assert_eq!(
            r.resolve(&local, &remote, &ConflictPolicy::LastWriteWins),
            Resolution::UseRemote
        );
    }

    #[test]
    fn test_lww_local_newer() {
        let r = ConflictResolver;
        let local = make_version(2, 2000, "dc-a");
        let remote = make_version(1, 1000, "dc-b");
        assert_eq!(
            r.resolve(&local, &remote, &ConflictPolicy::LastWriteWins),
            Resolution::UseLocal
        );
    }

    #[test]
    fn test_lww_tie_prefers_remote() {
        let r = ConflictResolver;
        let local = make_version(1, 1000, "dc-a");
        let remote = make_version(2, 1000, "dc-b"); // same timestamp
        assert_eq!(
            r.resolve(&local, &remote, &ConflictPolicy::LastWriteWins),
            Resolution::UseRemote
        );
    }

    // ---- HighestVersionWins ----

    #[test]
    fn test_hvw_remote_higher() {
        let r = ConflictResolver;
        let local = make_version(5, 1000, "dc-a");
        let remote = make_version(10, 500, "dc-b"); // older timestamp but higher version
        assert_eq!(
            r.resolve(&local, &remote, &ConflictPolicy::HighestVersionWins),
            Resolution::UseRemote
        );
    }

    #[test]
    fn test_hvw_local_higher() {
        let r = ConflictResolver;
        let local = make_version(10, 1000, "dc-a");
        let remote = make_version(5, 2000, "dc-b");
        assert_eq!(
            r.resolve(&local, &remote, &ConflictPolicy::HighestVersionWins),
            Resolution::UseLocal
        );
    }

    #[test]
    fn test_hvw_tie_prefers_local() {
        let r = ConflictResolver;
        let local = make_version(7, 1000, "dc-a");
        let remote = make_version(7, 1000, "dc-b"); // exact tie
        assert_eq!(
            r.resolve(&local, &remote, &ConflictPolicy::HighestVersionWins),
            Resolution::UseLocal
        );
    }

    // ---- MergeUnion ----

    #[test]
    fn test_merge_union_higher_version_provides_vector() {
        let r = ConflictResolver;
        let local = IndexVersion::new("vec-1", 5, 1000, vec![1.0, 2.0], "dc-a");
        let remote = IndexVersion::new("vec-1", 10, 2000, vec![3.0, 4.0], "dc-b");

        if let Resolution::Merge(merged) = r.resolve(&local, &remote, &ConflictPolicy::MergeUnion) {
            assert_eq!(merged.version.vector, vec![3.0, 4.0]);
            assert_eq!(merged.vector_source, "dc-b");
        } else {
            panic!("Expected Merge resolution");
        }
    }

    #[test]
    fn test_merge_union_metadata_union() {
        let r = ConflictResolver;
        let mut local = IndexVersion::new("vec-1", 5, 1000, vec![1.0], "dc-a");
        local.metadata.insert("key_a".into(), "val_a".into());
        local.metadata.insert("shared".into(), "local_val".into());

        let mut remote = IndexVersion::new("vec-1", 10, 2000, vec![2.0], "dc-b");
        remote.metadata.insert("key_b".into(), "val_b".into());
        remote.metadata.insert("shared".into(), "remote_val".into());

        if let Resolution::Merge(merged) = r.resolve(&local, &remote, &ConflictPolicy::MergeUnion) {
            // Both keys present
            assert!(merged.version.metadata.contains_key("key_a"));
            assert!(merged.version.metadata.contains_key("key_b"));
            // Winner (remote, higher version) overwrites shared key
            assert_eq!(merged.version.metadata["shared"], "remote_val");
        } else {
            panic!("Expected Merge resolution");
        }
    }

    #[test]
    fn test_merge_union_equal_versions_picks_remote() {
        let r = ConflictResolver;
        let local = IndexVersion::new("vec-1", 5, 1000, vec![1.0], "dc-a");
        let remote = IndexVersion::new("vec-1", 5, 2000, vec![2.0], "dc-b");

        if let Resolution::Merge(merged) = r.resolve(&local, &remote, &ConflictPolicy::MergeUnion) {
            // Equal versions: remote >= local so remote vector is chosen
            assert_eq!(merged.version.vector, vec![2.0]);
        } else {
            panic!("Expected Merge resolution");
        }
    }

    // ---- Manual ----

    #[test]
    fn test_manual_policy_requires_manual() {
        let r = ConflictResolver;
        let local = make_version(1, 1000, "dc-a");
        let remote = make_version(2, 2000, "dc-b");
        assert_eq!(
            r.resolve(&local, &remote, &ConflictPolicy::Manual),
            Resolution::RequiresManual
        );
    }

    // ---- IndexVersion helpers ----

    #[test]
    fn test_index_version_with_metadata() {
        let v =
            IndexVersion::new("v1", 1, 1000, vec![0.0], "dc-a").with_metadata("tag", "important");
        assert_eq!(v.metadata["tag"], "important");
    }

    #[test]
    fn test_resolution_is_clone() {
        let r = Resolution::UseLocal;
        let _r2 = r.clone();
    }

    #[test]
    fn test_merged_index_carries_correct_version_number() {
        let r = ConflictResolver;
        let local = IndexVersion::new("v1", 3, 100, vec![1.0], "dc-a");
        let remote = IndexVersion::new("v1", 8, 200, vec![8.0], "dc-b");

        if let Resolution::Merge(merged) = r.resolve(&local, &remote, &ConflictPolicy::MergeUnion) {
            assert_eq!(merged.version.version, 8);
        } else {
            panic!("Expected merge");
        }
    }
}