1use crate::traits::BlockStore;
15use async_trait::async_trait;
16use bytes::Bytes;
17use dashmap::DashMap;
18use ipfrs_core::{Block, Cid, Result};
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::{SystemTime, UNIX_EPOCH};
23
/// Consistency guarantee requested from an [`EventualStore`].
///
/// The default is [`ConsistencyLevel::Eventual`].
///
/// NOTE(review): the read path visible in this file serves every level from
/// the local node in the same way; the variants express intent only.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ConsistencyLevel {
    /// Strong consistency.
    Strong,

    /// Eventual consistency (the default level).
    #[default]
    Eventual,

    /// Quorum-based consistency with caller-chosen quorum sizes.
    Quorum {
        /// Quorum size for reads.
        read_quorum: usize,
        /// Quorum size for writes.
        write_quorum: usize,
    },

    /// Presumably "a single replica is enough" — confirm against the callers.
    One,
}
40
41#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
43pub struct VersionVector {
44 versions: HashMap<u64, u64>,
46}
47
48impl VersionVector {
49 pub fn new() -> Self {
51 Self {
52 versions: HashMap::new(),
53 }
54 }
55
56 pub fn increment(&mut self, node_id: u64) {
58 let version = self.versions.entry(node_id).or_insert(0);
59 *version += 1;
60 }
61
62 pub fn get(&self, node_id: u64) -> u64 {
64 *self.versions.get(&node_id).unwrap_or(&0)
65 }
66
67 pub fn happens_before(&self, other: &VersionVector) -> bool {
69 let mut strictly_less = false;
70
71 for (node_id, version) in &self.versions {
72 let other_version = other.get(*node_id);
73 if *version > other_version {
74 return false; }
76 if *version < other_version {
77 strictly_less = true;
78 }
79 }
80
81 for (node_id, version) in &other.versions {
83 if !self.versions.contains_key(node_id) && *version > 0 {
84 strictly_less = true;
85 }
86 }
87
88 strictly_less
89 }
90
91 pub fn is_concurrent(&self, other: &VersionVector) -> bool {
93 !self.happens_before(other) && !other.happens_before(self) && self != other
94 }
95
96 pub fn merge(&mut self, other: &VersionVector) {
98 for (node_id, version) in &other.versions {
99 let current = self.versions.entry(*node_id).or_insert(0);
100 *current = (*current).max(*version);
101 }
102 }
103}
104
105impl Default for VersionVector {
106 fn default() -> Self {
107 Self::new()
108 }
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct VersionedValue {
114 pub data: Vec<u8>,
116 pub version: VersionVector,
118 pub timestamp: u64,
120 pub writer_node_id: u64,
122}
123
124impl VersionedValue {
125 pub fn new(data: Vec<u8>, node_id: u64) -> Self {
127 let mut version = VersionVector::new();
128 version.increment(node_id);
129
130 let timestamp = SystemTime::now()
131 .duration_since(UNIX_EPOCH)
132 .unwrap()
133 .as_millis() as u64;
134
135 Self {
136 data,
137 version,
138 timestamp,
139 writer_node_id: node_id,
140 }
141 }
142
143 pub fn is_newer_than(&self, other: &VersionedValue) -> bool {
145 if self.timestamp != other.timestamp {
146 self.timestamp > other.timestamp
147 } else {
148 self.writer_node_id > other.writer_node_id
150 }
151 }
152}
153
/// Strategy used to choose between two versions of the same key.
///
/// The default is [`ConflictResolution::LastWriteWins`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ConflictResolution {
    /// Keep the value that `VersionedValue::is_newer_than` reports as newer
    /// (larger timestamp, ties broken by writer node id).
    #[default]
    LastWriteWins,

    /// Intended to retain every conflicting version.
    ///
    /// NOTE(review): the resolver in this file can only return a single
    /// value, so it currently picks the newer one, like `LastWriteWins`.
    KeepAll,

    /// Prefer the causally later value according to the version vectors,
    /// falling back to the last-write-wins rule when neither precedes the other.
    VectorClock,
}
165
166pub struct EventualStore<S: BlockStore> {
168 inner: Arc<S>,
170 consistency_level: ConsistencyLevel,
172 conflict_resolution: ConflictResolution,
174 node_id: u64,
176 versions: Arc<DashMap<Cid, VersionedValue>>,
178}
179
180impl<S: BlockStore> EventualStore<S> {
181 pub fn new(
183 inner: S,
184 consistency_level: ConsistencyLevel,
185 conflict_resolution: ConflictResolution,
186 node_id: u64,
187 ) -> Self {
188 Self {
189 inner: Arc::new(inner),
190 consistency_level,
191 conflict_resolution,
192 node_id,
193 versions: Arc::new(DashMap::new()),
194 }
195 }
196
197 pub fn consistency_level(&self) -> ConsistencyLevel {
199 self.consistency_level
200 }
201
202 pub fn set_consistency_level(&mut self, level: ConsistencyLevel) {
204 self.consistency_level = level;
205 }
206
207 fn resolve_conflict(&self, v1: &VersionedValue, v2: &VersionedValue) -> VersionedValue {
209 match self.conflict_resolution {
210 ConflictResolution::LastWriteWins => {
211 if v1.is_newer_than(v2) {
212 v1.clone()
213 } else {
214 v2.clone()
215 }
216 }
217 ConflictResolution::VectorClock => {
218 if v1.version.happens_before(&v2.version) {
220 v2.clone()
221 } else if v2.version.happens_before(&v1.version) {
222 v1.clone()
223 } else {
224 if v1.is_newer_than(v2) {
226 v1.clone()
227 } else {
228 v2.clone()
229 }
230 }
231 }
232 ConflictResolution::KeepAll => {
233 if v1.is_newer_than(v2) {
236 v1.clone()
237 } else {
238 v2.clone()
239 }
240 }
241 }
242 }
243
244 pub async fn put_versioned(&self, cid: Cid, value: VersionedValue) -> Result<()> {
246 if let Some(existing) = self.versions.get(&cid) {
248 let resolved = self.resolve_conflict(&existing, &value);
249 self.versions.insert(cid, resolved.clone());
250 let block = Block::new(Bytes::from(resolved.data))?;
251 self.inner.put(&block).await?;
252 } else {
253 self.versions.insert(cid, value.clone());
254 let block = Block::new(Bytes::from(value.data))?;
255 self.inner.put(&block).await?;
256 }
257
258 Ok(())
259 }
260
261 pub async fn get_versioned(&self, cid: &Cid) -> Result<Option<VersionedValue>> {
263 match self.consistency_level {
264 ConsistencyLevel::Eventual | ConsistencyLevel::One => {
265 if let Some(value) = self.versions.get(cid) {
267 Ok(Some(value.clone()))
268 } else if let Some(block) = self.inner.get(cid).await? {
269 let value = VersionedValue::new(block.data().to_vec(), self.node_id);
271 self.versions.insert(*cid, value.clone());
272 Ok(Some(value))
273 } else {
274 Ok(None)
275 }
276 }
277 ConsistencyLevel::Strong | ConsistencyLevel::Quorum { .. } => {
278 if let Some(value) = self.versions.get(cid) {
281 Ok(Some(value.clone()))
282 } else if let Some(block) = self.inner.get(cid).await? {
283 let value = VersionedValue::new(block.data().to_vec(), self.node_id);
284 self.versions.insert(*cid, value.clone());
285 Ok(Some(value))
286 } else {
287 Ok(None)
288 }
289 }
290 }
291 }
292
293 pub fn get_version(&self, cid: &Cid) -> Option<VersionVector> {
295 self.versions.get(cid).map(|v| v.version.clone())
296 }
297
298 pub fn stats(&self) -> EventualStoreStats {
300 EventualStoreStats {
301 total_versioned_entries: self.versions.len(),
302 consistency_level: self.consistency_level,
303 conflict_resolution: self.conflict_resolution,
304 }
305 }
306}
307
308#[derive(Debug, Clone)]
310pub struct EventualStoreStats {
311 pub total_versioned_entries: usize,
313 pub consistency_level: ConsistencyLevel,
315 pub conflict_resolution: ConflictResolution,
317}
318
319#[async_trait]
320impl<S: BlockStore> BlockStore for EventualStore<S> {
321 async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
322 if let Some(versioned) = self.get_versioned(cid).await? {
323 let block = Block::new(Bytes::from(versioned.data))?;
324 Ok(Some(block))
325 } else {
326 Ok(None)
327 }
328 }
329
330 async fn put(&self, block: &Block) -> Result<()> {
331 let value = VersionedValue::new(block.data().to_vec(), self.node_id);
332 self.put_versioned(*block.cid(), value).await
333 }
334
335 async fn has(&self, cid: &Cid) -> Result<bool> {
336 if self.versions.contains_key(cid) {
337 Ok(true)
338 } else {
339 self.inner.has(cid).await
340 }
341 }
342
343 async fn delete(&self, cid: &Cid) -> Result<()> {
344 self.versions.remove(cid);
345 self.inner.delete(cid).await
346 }
347
348 fn list_cids(&self) -> Result<Vec<Cid>> {
349 self.inner.list_cids()
350 }
351
352 fn len(&self) -> usize {
353 self.inner.len()
354 }
355
356 async fn flush(&self) -> Result<()> {
357 self.inner.flush().await
358 }
359
360 async fn get_many(&self, cids: &[Cid]) -> Result<Vec<Option<Block>>> {
361 let mut results = Vec::with_capacity(cids.len());
362 for cid in cids {
363 results.push(self.get(cid).await?);
364 }
365 Ok(results)
366 }
367
368 async fn put_many(&self, blocks: &[Block]) -> Result<()> {
369 for block in blocks {
370 self.put(block).await?;
371 }
372 Ok(())
373 }
374
375 async fn has_many(&self, cids: &[Cid]) -> Result<Vec<bool>> {
376 let mut results = Vec::with_capacity(cids.len());
377 for cid in cids {
378 results.push(self.has(cid).await?);
379 }
380 Ok(results)
381 }
382
383 async fn delete_many(&self, cids: &[Cid]) -> Result<()> {
384 for cid in cids {
385 self.delete(cid).await?;
386 }
387 Ok(())
388 }
389}
390
#[cfg(test)]
mod tests {
    use super::*;

    /// A vector that is behind on one node and equal elsewhere precedes the other.
    #[test]
    fn test_version_vector_happens_before() {
        let mut v1 = VersionVector::new();
        v1.increment(1);
        v1.increment(2);

        let mut v2 = VersionVector::new();
        v2.increment(1);
        v2.increment(2);
        v2.increment(2);

        assert!(v1.happens_before(&v2));
        assert!(!v2.happens_before(&v1));
    }

    /// Vectors that advanced on disjoint nodes are concurrent in both directions.
    #[test]
    fn test_version_vector_concurrent() {
        let mut v1 = VersionVector::new();
        v1.increment(1);
        v1.increment(1);

        let mut v2 = VersionVector::new();
        v2.increment(2);
        v2.increment(2);

        assert!(v1.is_concurrent(&v2));
        assert!(v2.is_concurrent(&v1));
    }

    /// Merging keeps the per-node maximum of both sides.
    #[test]
    fn test_version_vector_merge() {
        let mut v1 = VersionVector::new();
        v1.increment(1);
        v1.increment(1);

        let mut v2 = VersionVector::new();
        v2.increment(2);
        v2.increment(2);

        v1.merge(&v2);

        assert_eq!(v1.get(1), 2);
        assert_eq!(v1.get(2), 2);
    }

    /// Newer-than ordering: timestamp first, writer id as the tie-breaker.
    ///
    /// Timestamps are set explicitly so the test does not depend on the wall
    /// clock advancing between two constructor calls.
    #[test]
    fn test_versioned_value_newer() {
        let mut older = VersionedValue::new(vec![1, 2, 3], 1);
        let mut newer = VersionedValue::new(vec![4, 5, 6], 2);
        older.timestamp = 1_000;
        newer.timestamp = 1_010;

        assert!(newer.is_newer_than(&older));
        assert!(!older.is_newer_than(&newer));

        // Equal timestamps fall back to the larger writer node id.
        older.timestamp = newer.timestamp;
        assert!(newer.is_newer_than(&older));
        assert!(!older.is_newer_than(&newer));
    }

    /// The default level is `Eventual`, and quorum levels compare by their fields.
    #[test]
    fn test_consistency_levels() {
        assert_eq!(ConsistencyLevel::default(), ConsistencyLevel::Eventual);
        assert_eq!(
            ConsistencyLevel::Quorum {
                read_quorum: 2,
                write_quorum: 2
            },
            ConsistencyLevel::Quorum {
                read_quorum: 2,
                write_quorum: 2
            }
        );
    }
}