ipfrs_storage/
eventual_consistency.rs

//! Eventual consistency support for distributed storage.
//!
//! Provides configurable consistency levels, version vectors, and conflict resolution
//! for eventually consistent reads and writes.
//!
//! Note: `Strong` and `Quorum` consistency levels are accepted, but reads are
//! currently served from the local replica only.
//!
//! # Example
//!
//! ```rust,ignore
//! use ipfrs_storage::eventual_consistency::{ConflictResolution, ConsistencyLevel, EventualStore};
//!
//! let store = EventualStore::new(
//!     base_store,
//!     ConsistencyLevel::Quorum { read_quorum: 2, write_quorum: 2 },
//!     ConflictResolution::LastWriteWins,
//!     node_id,
//! );
//! ```
13
14use crate::traits::BlockStore;
15use async_trait::async_trait;
16use bytes::Bytes;
17use dashmap::DashMap;
18use ipfrs_core::{Block, Cid, Result};
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::{SystemTime, UNIX_EPOCH};
23
/// How many replicas must take part in a read or write before it completes.
///
/// The default is [`ConsistencyLevel::Eventual`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum ConsistencyLevel {
    /// Every replica must agree before an operation completes.
    Strong,
    /// Stale reads are tolerated and replication happens asynchronously.
    #[default]
    Eventual,
    /// Quorum-based operation: with `N` replicas, picking `R + W > N` makes every
    /// read quorum overlap every write quorum.
    Quorum {
        /// Number of replicas (`R`) that must answer a read.
        read_quorum: usize,
        /// Number of replicas (`W`) that must acknowledge a write.
        write_quorum: usize,
    },
    /// A single replica's answer is enough (fastest, weakest guarantees).
    One,
}
40
41/// Version vector for tracking causality
42#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
43pub struct VersionVector {
44    /// Mapping of node ID to version number
45    versions: HashMap<u64, u64>,
46}
47
48impl VersionVector {
49    /// Create a new empty version vector
50    pub fn new() -> Self {
51        Self {
52            versions: HashMap::new(),
53        }
54    }
55
56    /// Increment the version for a node
57    pub fn increment(&mut self, node_id: u64) {
58        let version = self.versions.entry(node_id).or_insert(0);
59        *version += 1;
60    }
61
62    /// Get the version for a node
63    pub fn get(&self, node_id: u64) -> u64 {
64        *self.versions.get(&node_id).unwrap_or(&0)
65    }
66
67    /// Check if this version vector happens before another
68    pub fn happens_before(&self, other: &VersionVector) -> bool {
69        let mut strictly_less = false;
70
71        for (node_id, version) in &self.versions {
72            let other_version = other.get(*node_id);
73            if *version > other_version {
74                return false; // Not happens-before if any version is greater
75            }
76            if *version < other_version {
77                strictly_less = true;
78            }
79        }
80
81        // Check for nodes in other but not in self
82        for (node_id, version) in &other.versions {
83            if !self.versions.contains_key(node_id) && *version > 0 {
84                strictly_less = true;
85            }
86        }
87
88        strictly_less
89    }
90
91    /// Check if two version vectors are concurrent (conflicting)
92    pub fn is_concurrent(&self, other: &VersionVector) -> bool {
93        !self.happens_before(other) && !other.happens_before(self) && self != other
94    }
95
96    /// Merge two version vectors (take the maximum of each component)
97    pub fn merge(&mut self, other: &VersionVector) {
98        for (node_id, version) in &other.versions {
99            let current = self.versions.entry(*node_id).or_insert(0);
100            *current = (*current).max(*version);
101        }
102    }
103}
104
105impl Default for VersionVector {
106    fn default() -> Self {
107        Self::new()
108    }
109}
110
111/// Versioned value with metadata
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct VersionedValue {
114    /// The actual data
115    pub data: Vec<u8>,
116    /// Version vector for causality tracking
117    pub version: VersionVector,
118    /// Timestamp of last write
119    pub timestamp: u64,
120    /// Node ID that performed the write
121    pub writer_node_id: u64,
122}
123
124impl VersionedValue {
125    /// Create a new versioned value
126    pub fn new(data: Vec<u8>, node_id: u64) -> Self {
127        let mut version = VersionVector::new();
128        version.increment(node_id);
129
130        let timestamp = SystemTime::now()
131            .duration_since(UNIX_EPOCH)
132            .unwrap()
133            .as_millis() as u64;
134
135        Self {
136            data,
137            version,
138            timestamp,
139            writer_node_id: node_id,
140        }
141    }
142
143    /// Check if this value is newer than another (for last-write-wins)
144    pub fn is_newer_than(&self, other: &VersionedValue) -> bool {
145        if self.timestamp != other.timestamp {
146            self.timestamp > other.timestamp
147        } else {
148            // Break ties using writer node ID
149            self.writer_node_id > other.writer_node_id
150        }
151    }
152}
153
/// Strategy used to pick a winner when two writes to the same key collide.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum ConflictResolution {
    /// The write with the later timestamp wins; ties go to the higher writer node ID.
    #[default]
    LastWriteWins,
    /// Intended to keep every conflicting version for the application to resolve.
    ///
    /// NOTE(review): `EventualStore` currently resolves this exactly like
    /// `LastWriteWins` and retains only one version.
    KeepAll,
    /// Causally newer writes win; concurrent writes fall back to last-write-wins.
    VectorClock,
}
165
166/// Eventually consistent block store wrapper
167pub struct EventualStore<S: BlockStore> {
168    /// Underlying block store
169    inner: Arc<S>,
170    /// Consistency level for operations
171    consistency_level: ConsistencyLevel,
172    /// Conflict resolution strategy
173    conflict_resolution: ConflictResolution,
174    /// Local node ID
175    node_id: u64,
176    /// Versioned values cache
177    versions: Arc<DashMap<Cid, VersionedValue>>,
178}
179
180impl<S: BlockStore> EventualStore<S> {
181    /// Create a new eventually consistent store
182    pub fn new(
183        inner: S,
184        consistency_level: ConsistencyLevel,
185        conflict_resolution: ConflictResolution,
186        node_id: u64,
187    ) -> Self {
188        Self {
189            inner: Arc::new(inner),
190            consistency_level,
191            conflict_resolution,
192            node_id,
193            versions: Arc::new(DashMap::new()),
194        }
195    }
196
197    /// Get the consistency level
198    pub fn consistency_level(&self) -> ConsistencyLevel {
199        self.consistency_level
200    }
201
202    /// Set the consistency level
203    pub fn set_consistency_level(&mut self, level: ConsistencyLevel) {
204        self.consistency_level = level;
205    }
206
207    /// Resolve conflicts between two versioned values
208    fn resolve_conflict(&self, v1: &VersionedValue, v2: &VersionedValue) -> VersionedValue {
209        match self.conflict_resolution {
210            ConflictResolution::LastWriteWins => {
211                if v1.is_newer_than(v2) {
212                    v1.clone()
213                } else {
214                    v2.clone()
215                }
216            }
217            ConflictResolution::VectorClock => {
218                // Use version vector to determine causality
219                if v1.version.happens_before(&v2.version) {
220                    v2.clone()
221                } else if v2.version.happens_before(&v1.version) {
222                    v1.clone()
223                } else {
224                    // Concurrent - fall back to last-write-wins
225                    if v1.is_newer_than(v2) {
226                        v1.clone()
227                    } else {
228                        v2.clone()
229                    }
230                }
231            }
232            ConflictResolution::KeepAll => {
233                // For now, just keep the newer one
234                // In a real implementation, we'd return both
235                if v1.is_newer_than(v2) {
236                    v1.clone()
237                } else {
238                    v2.clone()
239                }
240            }
241        }
242    }
243
244    /// Store a versioned value
245    pub async fn put_versioned(&self, cid: Cid, value: VersionedValue) -> Result<()> {
246        // Check for conflicts
247        if let Some(existing) = self.versions.get(&cid) {
248            let resolved = self.resolve_conflict(&existing, &value);
249            self.versions.insert(cid, resolved.clone());
250            let block = Block::new(Bytes::from(resolved.data))?;
251            self.inner.put(&block).await?;
252        } else {
253            self.versions.insert(cid, value.clone());
254            let block = Block::new(Bytes::from(value.data))?;
255            self.inner.put(&block).await?;
256        }
257
258        Ok(())
259    }
260
261    /// Get a versioned value
262    pub async fn get_versioned(&self, cid: &Cid) -> Result<Option<VersionedValue>> {
263        match self.consistency_level {
264            ConsistencyLevel::Eventual | ConsistencyLevel::One => {
265                // Return local value if available
266                if let Some(value) = self.versions.get(cid) {
267                    Ok(Some(value.clone()))
268                } else if let Some(block) = self.inner.get(cid).await? {
269                    // Reconstruct versioned value from block
270                    let value = VersionedValue::new(block.data().to_vec(), self.node_id);
271                    self.versions.insert(*cid, value.clone());
272                    Ok(Some(value))
273                } else {
274                    Ok(None)
275                }
276            }
277            ConsistencyLevel::Strong | ConsistencyLevel::Quorum { .. } => {
278                // For strong consistency, we'd need to query multiple replicas
279                // For now, just return local value
280                if let Some(value) = self.versions.get(cid) {
281                    Ok(Some(value.clone()))
282                } else if let Some(block) = self.inner.get(cid).await? {
283                    let value = VersionedValue::new(block.data().to_vec(), self.node_id);
284                    self.versions.insert(*cid, value.clone());
285                    Ok(Some(value))
286                } else {
287                    Ok(None)
288                }
289            }
290        }
291    }
292
293    /// Get the version vector for a CID
294    pub fn get_version(&self, cid: &Cid) -> Option<VersionVector> {
295        self.versions.get(cid).map(|v| v.version.clone())
296    }
297
298    /// Get statistics about the store
299    pub fn stats(&self) -> EventualStoreStats {
300        EventualStoreStats {
301            total_versioned_entries: self.versions.len(),
302            consistency_level: self.consistency_level,
303            conflict_resolution: self.conflict_resolution,
304        }
305    }
306}
307
308/// Statistics for eventually consistent store
309#[derive(Debug, Clone)]
310pub struct EventualStoreStats {
311    /// Total number of versioned entries
312    pub total_versioned_entries: usize,
313    /// Current consistency level
314    pub consistency_level: ConsistencyLevel,
315    /// Conflict resolution strategy
316    pub conflict_resolution: ConflictResolution,
317}
318
319#[async_trait]
320impl<S: BlockStore> BlockStore for EventualStore<S> {
321    async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
322        if let Some(versioned) = self.get_versioned(cid).await? {
323            let block = Block::new(Bytes::from(versioned.data))?;
324            Ok(Some(block))
325        } else {
326            Ok(None)
327        }
328    }
329
330    async fn put(&self, block: &Block) -> Result<()> {
331        let value = VersionedValue::new(block.data().to_vec(), self.node_id);
332        self.put_versioned(*block.cid(), value).await
333    }
334
335    async fn has(&self, cid: &Cid) -> Result<bool> {
336        if self.versions.contains_key(cid) {
337            Ok(true)
338        } else {
339            self.inner.has(cid).await
340        }
341    }
342
343    async fn delete(&self, cid: &Cid) -> Result<()> {
344        self.versions.remove(cid);
345        self.inner.delete(cid).await
346    }
347
348    fn list_cids(&self) -> Result<Vec<Cid>> {
349        self.inner.list_cids()
350    }
351
352    fn len(&self) -> usize {
353        self.inner.len()
354    }
355
356    async fn flush(&self) -> Result<()> {
357        self.inner.flush().await
358    }
359
360    async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Block>>> {
361        let mut results = Vec::with_capacity(cids.len());
362        for cid in cids {
363            results.push(self.get(cid).await?);
364        }
365        Ok(results)
366    }
367
368    async fn put_many(&self, blocks: &[Block]) -> Result<()> {
369        for block in blocks {
370            self.put(block).await?;
371        }
372        Ok(())
373    }
374
375    async fn has_many(&self, cids: &[Cid]) -> Result<Vec<bool>> {
376        let mut results = Vec::with_capacity(cids.len());
377        for cid in cids {
378            results.push(self.has(cid).await?);
379        }
380        Ok(results)
381    }
382
383    async fn delete_many(&self, cids: &[Cid]) -> Result<()> {
384        for cid in cids {
385            self.delete(cid).await?;
386        }
387        Ok(())
388    }
389}
390
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_version_vector_happens_before() {
        let mut v1 = VersionVector::new();
        v1.increment(1);
        v1.increment(2);

        let mut v2 = VersionVector::new();
        v2.increment(1);
        v2.increment(2);
        v2.increment(2);

        assert!(v1.happens_before(&v2));
        assert!(!v2.happens_before(&v1));
    }

    #[test]
    fn test_version_vector_equal_is_neither_ordered_nor_concurrent() {
        let mut v = VersionVector::new();
        v.increment(1);
        let same = v.clone();

        assert!(!v.happens_before(&same));
        assert!(!same.happens_before(&v));
        assert!(!v.is_concurrent(&same));
    }

    #[test]
    fn test_version_vector_concurrent() {
        let mut v1 = VersionVector::new();
        v1.increment(1);
        v1.increment(1);

        let mut v2 = VersionVector::new();
        v2.increment(2);
        v2.increment(2);

        assert!(v1.is_concurrent(&v2));
        assert!(v2.is_concurrent(&v1));
    }

    #[test]
    fn test_version_vector_merge() {
        let mut v1 = VersionVector::new();
        v1.increment(1);
        v1.increment(1);

        let mut v2 = VersionVector::new();
        v2.increment(2);
        v2.increment(2);

        v1.merge(&v2);

        assert_eq!(v1.get(1), 2);
        assert_eq!(v1.get(2), 2);
    }

    #[test]
    fn test_versioned_value_newer() {
        // Timestamps are set explicitly: sleeping and re-reading the wall clock
        // slows the suite and can flake if the system clock is adjusted mid-test.
        let mut v1 = VersionedValue::new(vec![1, 2, 3], 1);
        let mut v2 = VersionedValue::new(vec![4, 5, 6], 2);
        v1.timestamp = 1_000;
        v2.timestamp = 1_010;

        assert!(v2.is_newer_than(&v1));
        assert!(!v1.is_newer_than(&v2));
    }

    #[test]
    fn test_versioned_value_tie_break_on_writer() {
        let mut v1 = VersionedValue::new(vec![1], 1);
        let mut v2 = VersionedValue::new(vec![2], 2);
        v1.timestamp = 42;
        v2.timestamp = 42;

        assert!(v2.is_newer_than(&v1));
        assert!(!v1.is_newer_than(&v2));
        assert!(!v1.is_newer_than(&v1.clone()));
    }

    #[test]
    fn test_consistency_levels() {
        assert_eq!(ConsistencyLevel::default(), ConsistencyLevel::Eventual);
        assert_eq!(
            ConsistencyLevel::Quorum {
                read_quorum: 2,
                write_quorum: 2
            },
            ConsistencyLevel::Quorum {
                read_quorum: 2,
                write_quorum: 2
            }
        );
    }

    #[test]
    fn test_conflict_resolution_default() {
        assert_eq!(
            ConflictResolution::default(),
            ConflictResolution::LastWriteWins
        );
    }
}