1use crate::crdt::{CRDT, Mergeable, ReplicaId};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::error::Error;
10use std::fmt;
11
/// Errors reported while configuring, mutating, or merging builder-defined CRDTs.
///
/// Every variant carries a free-form detail string; the `Display`
/// implementation prints it after a variant-specific prefix.
#[derive(Debug, Clone, PartialEq)]
pub enum BuilderError {
    /// A field configuration was rejected. The payload describes the problem.
    InvalidFieldConfig(String),
    /// A field was looked up by name and does not exist. The payload is the
    /// requested field name.
    MissingField(String),
    /// Two values that must agree (field strategies, CRDT type names) differ.
    TypeMismatch(String),
    /// The requested strategy cannot be handled. The payload names the strategy.
    UnsupportedStrategy(String),
    /// A value could not be serialized or deserialized.
    SerializationError(String),
    /// A merge could not be completed.
    MergeError(String),
}
28
29impl fmt::Display for BuilderError {
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 match self {
32 BuilderError::InvalidFieldConfig(msg) => write!(f, "Invalid field config: {}", msg),
33 BuilderError::MissingField(field) => write!(f, "Missing required field: {}", field),
34 BuilderError::TypeMismatch(msg) => write!(f, "Type mismatch: {}", msg),
35 BuilderError::UnsupportedStrategy(strategy) => write!(f, "Unsupported strategy: {}", strategy),
36 BuilderError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
37 BuilderError::MergeError(msg) => write!(f, "Merge error: {}", msg),
38 }
39 }
40}
41
42impl Error for BuilderError {}
43
44#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
46pub enum CrdtStrategy {
47 Lww,
49 AddWins,
51 RemoveWins,
53 GCounter,
55 MvRegister,
57 Rga,
59 Lseq,
61 YjsTree,
63 Dag,
65}
66
67#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
69pub struct FieldConfig {
70 pub name: String,
72 pub strategy: CrdtStrategy,
74 pub optional: bool,
76 pub default: Option<serde_json::Value>,
78}
79
80#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
82pub struct CrdtBuilderConfig {
83 pub type_name: String,
85 pub fields: Vec<FieldConfig>,
87 pub replica_id_field: Option<String>,
89}
90
91pub trait CrdtField: Clone + Send + Sync {
93 fn get_value(&self) -> serde_json::Value;
95
96 fn set_value(&mut self, value: serde_json::Value) -> Result<(), BuilderError>;
98
99 fn merge(&mut self, other: &Self) -> Result<(), BuilderError>;
101
102 fn has_conflict(&self, other: &Self) -> bool;
104
105 fn strategy(&self) -> CrdtStrategy;
107}
108
109#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
111pub struct GenericCrdtField {
112 pub name: String,
114 pub value: serde_json::Value,
116 pub strategy: CrdtStrategy,
118 pub metadata: HashMap<String, serde_json::Value>,
120}
121
122impl CrdtField for GenericCrdtField {
123 fn get_value(&self) -> serde_json::Value {
124 self.value.clone()
125 }
126
127 fn set_value(&mut self, value: serde_json::Value) -> Result<(), BuilderError> {
128 self.value = value;
129 Ok(())
130 }
131
132 fn merge(&mut self, other: &Self) -> Result<(), BuilderError> {
133 if self.strategy != other.strategy {
134 return Err(BuilderError::TypeMismatch(
135 format!("Cannot merge fields with different strategies: {:?} vs {:?}",
136 self.strategy, other.strategy)
137 ));
138 }
139
140 match self.strategy {
141 CrdtStrategy::Lww => self.merge_lww(other),
142 CrdtStrategy::AddWins => self.merge_add_wins(other),
143 CrdtStrategy::RemoveWins => self.merge_remove_wins(other),
144 CrdtStrategy::GCounter => self.merge_gcounter(other),
145 CrdtStrategy::MvRegister => self.merge_mv_register(other),
146 CrdtStrategy::Rga => self.merge_rga(other),
147 CrdtStrategy::Lseq => self.merge_lseq(other),
148 CrdtStrategy::YjsTree => self.merge_yjs_tree(other),
149 CrdtStrategy::Dag => self.merge_dag(other),
150 }
151 }
152
153 fn has_conflict(&self, other: &Self) -> bool {
154 if self.strategy != other.strategy {
155 return true;
156 }
157
158 match self.strategy {
159 CrdtStrategy::Lww => self.has_lww_conflict(other),
160 CrdtStrategy::AddWins => self.has_add_wins_conflict(other),
161 CrdtStrategy::RemoveWins => self.has_remove_wins_conflict(other),
162 CrdtStrategy::GCounter => false, CrdtStrategy::MvRegister => self.has_mv_register_conflict(other),
164 CrdtStrategy::Rga => self.has_rga_conflict(other),
165 CrdtStrategy::Lseq => self.has_lseq_conflict(other),
166 CrdtStrategy::YjsTree => self.has_yjs_tree_conflict(other),
167 CrdtStrategy::Dag => self.has_dag_conflict(other),
168 }
169 }
170
171 fn strategy(&self) -> CrdtStrategy {
172 self.strategy.clone()
173 }
174}
175
176impl GenericCrdtField {
177 pub fn new(name: String, value: serde_json::Value, strategy: CrdtStrategy) -> Self {
179 Self {
180 name,
181 value,
182 strategy,
183 metadata: HashMap::new(),
184 }
185 }
186
187 fn merge_lww(&mut self, other: &Self) -> Result<(), BuilderError> {
189 let self_timestamp = self.metadata.get("timestamp")
190 .and_then(|v| v.as_u64())
191 .unwrap_or(0);
192 let other_timestamp = other.metadata.get("timestamp")
193 .and_then(|v| v.as_u64())
194 .unwrap_or(0);
195
196 if other_timestamp >= self_timestamp {
197 self.value = other.value.clone();
198 self.metadata = other.metadata.clone();
199 }
200
201 Ok(())
202 }
203
204 fn merge_add_wins(&mut self, other: &Self) -> Result<(), BuilderError> {
206 if let (Some(self_set), Some(other_set)) = (
208 self.value.as_array(),
209 other.value.as_array()
210 ) {
211 let mut combined: Vec<serde_json::Value> = self_set.clone();
212 for item in other_set {
213 if !combined.contains(item) {
214 combined.push(item.clone());
215 }
216 }
217 self.value = serde_json::Value::Array(combined);
218 } else {
219 self.merge_lww(other)?;
221 }
222
223 Ok(())
224 }
225
226 fn merge_remove_wins(&mut self, other: &Self) -> Result<(), BuilderError> {
228 if let (Some(self_set), Some(other_set)) = (
230 self.value.as_array(),
231 other.value.as_array()
232 ) {
233 let mut combined: Vec<serde_json::Value> = self_set.clone();
234 for item in other_set {
235 if !combined.contains(item) {
236 combined.push(item.clone());
237 }
238 }
239
240 combined.retain(|item| {
242 !other.metadata.get("removed")
243 .and_then(|v| v.as_array())
244 .map(|removed| removed.contains(item))
245 .unwrap_or(false)
246 });
247
248 self.value = serde_json::Value::Array(combined);
249 } else {
250 self.merge_lww(other)?;
252 }
253
254 Ok(())
255 }
256
257 fn merge_gcounter(&mut self, other: &Self) -> Result<(), BuilderError> {
259 if let (Some(self_count), Some(other_count)) = (
260 self.value.as_u64(),
261 other.value.as_u64()
262 ) {
263 self.value = serde_json::Value::Number(serde_json::Number::from(
264 self_count.max(other_count)
265 ));
266 }
267
268 Ok(())
269 }
270
271 fn merge_mv_register(&mut self, other: &Self) -> Result<(), BuilderError> {
273 if let (Some(self_values), Some(other_values)) = (
275 self.value.as_array(),
276 other.value.as_array()
277 ) {
278 let mut combined: Vec<serde_json::Value> = self_values.clone();
279 for value in other_values {
280 if !combined.contains(value) {
281 combined.push(value.clone());
282 }
283 }
284 self.value = serde_json::Value::Array(combined);
285 } else {
286 self.merge_lww(other)?;
288 }
289
290 Ok(())
291 }
292
293 fn merge_rga(&mut self, other: &Self) -> Result<(), BuilderError> {
295 self.merge_add_wins(other)
297 }
298
299 fn merge_lseq(&mut self, other: &Self) -> Result<(), BuilderError> {
301 self.merge_add_wins(other)
303 }
304
305 fn merge_yjs_tree(&mut self, other: &Self) -> Result<(), BuilderError> {
307 self.merge_add_wins(other)
309 }
310
311 fn merge_dag(&mut self, other: &Self) -> Result<(), BuilderError> {
313 self.merge_add_wins(other)
315 }
316
317 fn has_lww_conflict(&self, other: &Self) -> bool {
319 let self_timestamp = self.metadata.get("timestamp")
320 .and_then(|v| v.as_u64())
321 .unwrap_or(0);
322 let other_timestamp = other.metadata.get("timestamp")
323 .and_then(|v| v.as_u64())
324 .unwrap_or(0);
325
326 self.value != other.value && self_timestamp == other_timestamp
327 }
328
329 fn has_add_wins_conflict(&self, _other: &Self) -> bool {
330 false }
332
333 fn has_remove_wins_conflict(&self, _other: &Self) -> bool {
334 false }
336
337 fn has_mv_register_conflict(&self, other: &Self) -> bool {
338 self.value != other.value
339 }
340
341 fn has_rga_conflict(&self, other: &Self) -> bool {
342 self.value != other.value
343 }
344
345 fn has_lseq_conflict(&self, other: &Self) -> bool {
346 self.value != other.value
347 }
348
349 fn has_yjs_tree_conflict(&self, other: &Self) -> bool {
350 self.value != other.value
351 }
352
353 fn has_dag_conflict(&self, other: &Self) -> bool {
354 self.value != other.value
355 }
356}
357
358#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
360pub struct CustomCrdt {
361 pub config: CrdtBuilderConfig,
363 pub fields: HashMap<String, GenericCrdtField>,
365 pub replica_id: ReplicaId,
367}
368
369impl CRDT for CustomCrdt {
370 fn replica_id(&self) -> &ReplicaId {
371 &self.replica_id
372 }
373}
374
375impl Mergeable for CustomCrdt {
376 type Error = BuilderError;
377
378 fn merge(&mut self, other: &Self) -> Result<(), Self::Error> {
379 if self.config.type_name != other.config.type_name {
380 return Err(BuilderError::TypeMismatch(
381 format!("Cannot merge CRDTs of different types: {} vs {}",
382 self.config.type_name, other.config.type_name)
383 ));
384 }
385
386 for (field_name, other_field) in &other.fields {
388 if let Some(self_field) = self.fields.get_mut(field_name) {
389 self_field.merge(other_field)?;
390 } else {
391 self.fields.insert(field_name.clone(), other_field.clone());
393 }
394 }
395
396 Ok(())
397 }
398
399 fn has_conflict(&self, other: &Self) -> bool {
400 if self.config.type_name != other.config.type_name {
401 return true;
402 }
403
404 for (field_name, self_field) in &self.fields {
406 if let Some(other_field) = other.fields.get(field_name) {
407 if self_field.has_conflict(other_field) {
408 return true;
409 }
410 }
411 }
412
413 false
414 }
415}
416
417impl CustomCrdt {
418 pub fn new(config: CrdtBuilderConfig, replica_id: ReplicaId) -> Self {
420 let mut fields = HashMap::new();
421
422 for field_config in &config.fields {
424 let default_value = field_config.default.clone()
425 .unwrap_or_else(|| serde_json::Value::Null);
426
427 let field = GenericCrdtField::new(
428 field_config.name.clone(),
429 default_value,
430 field_config.strategy.clone(),
431 );
432
433 fields.insert(field_config.name.clone(), field);
434 }
435
436 Self {
437 config,
438 fields,
439 replica_id,
440 }
441 }
442
443 pub fn get_field(&self, field_name: &str) -> Option<&serde_json::Value> {
445 self.fields.get(field_name).map(|f| &f.value)
446 }
447
448 pub fn set_field(&mut self, field_name: &str, value: serde_json::Value) -> Result<(), BuilderError> {
450 if let Some(field) = self.fields.get_mut(field_name) {
451 field.set_value(value)?;
452 if field.strategy == CrdtStrategy::Lww {
454 field.metadata.insert("timestamp".to_string(),
455 serde_json::Value::Number(serde_json::Number::from(
456 std::time::SystemTime::now()
457 .duration_since(std::time::UNIX_EPOCH)
458 .unwrap_or_default()
459 .as_millis() as u64
460 )));
461 }
462 Ok(())
463 } else {
464 Err(BuilderError::MissingField(field_name.to_string()))
465 }
466 }
467
468 pub fn field_names(&self) -> Vec<String> {
470 self.fields.keys().cloned().collect()
471 }
472
473 pub fn get_field_config(&self, field_name: &str) -> Option<&FieldConfig> {
475 self.config.fields.iter().find(|f| f.name == field_name)
476 }
477}
478
479pub struct CrdtBuilder {
481 config: CrdtBuilderConfig,
482}
483
484impl CrdtBuilder {
485 pub fn new(type_name: String) -> Self {
487 Self {
488 config: CrdtBuilderConfig {
489 type_name,
490 fields: Vec::new(),
491 replica_id_field: None,
492 },
493 }
494 }
495
496 pub fn add_field(mut self, name: String, strategy: CrdtStrategy) -> Self {
498 self.config.fields.push(FieldConfig {
499 name,
500 strategy,
501 optional: false,
502 default: None,
503 });
504 self
505 }
506
507 pub fn add_optional_field(mut self, name: String, strategy: CrdtStrategy, default: serde_json::Value) -> Self {
509 self.config.fields.push(FieldConfig {
510 name,
511 strategy,
512 optional: true,
513 default: Some(default),
514 });
515 self
516 }
517
518 pub fn replica_id_field(mut self, field_name: String) -> Self {
520 self.config.replica_id_field = Some(field_name);
521 self
522 }
523
524 pub fn build(self) -> CrdtBuilderConfig {
526 self.config
527 }
528
529 pub fn create_crdt(self, replica_id: ReplicaId) -> CustomCrdt {
531 CustomCrdt::new(self.config, replica_id)
532 }
533}
534
/// Unit tests for the builder, the generic field, and `CustomCrdt`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crdt::ReplicaId;
    use uuid::Uuid;

    /// Wraps a string slice in a JSON string value.
    fn text(s: &str) -> serde_json::Value {
        serde_json::Value::String(s.to_string())
    }

    /// Wraps an unsigned integer in a JSON number value.
    fn number(n: u64) -> serde_json::Value {
        serde_json::Value::Number(serde_json::Number::from(n))
    }

    /// Creates a fresh random replica identifier.
    fn new_replica() -> ReplicaId {
        ReplicaId::from(Uuid::new_v4())
    }

    /// Configuration shared by most tests: an LWW `name` and a counter `count`.
    fn name_and_count_config() -> CrdtBuilderConfig {
        CrdtBuilder::new("TestCRDT".to_string())
            .add_field("name".to_string(), CrdtStrategy::Lww)
            .add_field("count".to_string(), CrdtStrategy::GCounter)
            .build()
    }

    /// Overrides the `"timestamp"` metadata of `field` so that LWW ordering is
    /// decided by the test rather than by the wall clock.
    fn pin_timestamp(crdt: &mut CustomCrdt, field: &str, millis: u64) {
        crdt.fields
            .get_mut(field)
            .expect("field must exist")
            .metadata
            .insert("timestamp".to_string(), number(millis));
    }

    #[test]
    fn test_crdt_builder_creation() {
        let config = CrdtBuilder::new("TestCRDT".to_string())
            .add_field("name".to_string(), CrdtStrategy::Lww)
            .add_field("count".to_string(), CrdtStrategy::GCounter)
            .add_optional_field(
                "tags".to_string(),
                CrdtStrategy::AddWins,
                serde_json::Value::Array(vec![]),
            )
            .build();

        assert_eq!(config.type_name, "TestCRDT");
        assert_eq!(config.fields.len(), 3);
        assert_eq!(config.fields[0].name, "name");
        assert_eq!(config.fields[0].strategy, CrdtStrategy::Lww);
        assert_eq!(config.fields[1].name, "count");
        assert_eq!(config.fields[1].strategy, CrdtStrategy::GCounter);
        assert_eq!(config.fields[2].name, "tags");
        assert_eq!(config.fields[2].strategy, CrdtStrategy::AddWins);
        assert!(config.fields[2].optional);
    }

    #[test]
    fn test_custom_crdt_creation() {
        let replica_id = new_replica();
        let crdt = CustomCrdt::new(name_and_count_config(), replica_id.clone());

        assert_eq!(crdt.replica_id(), &replica_id);
        assert_eq!(crdt.field_names().len(), 2);
        assert!(crdt.get_field("name").is_some());
        assert!(crdt.get_field("count").is_some());
    }

    #[test]
    fn test_custom_crdt_field_operations() {
        let mut crdt = CustomCrdt::new(name_and_count_config(), new_replica());

        crdt.set_field("name", text("test")).unwrap();
        crdt.set_field("count", number(42)).unwrap();

        assert_eq!(crdt.get_field("name"), Some(&text("test")));
        assert_eq!(crdt.get_field("count"), Some(&number(42)));

        // Only LWW fields are stamped with a write timestamp.
        assert!(crdt.fields["name"].metadata.contains_key("timestamp"));
        assert!(!crdt.fields["count"].metadata.contains_key("timestamp"));
    }

    #[test]
    fn test_custom_crdt_merge() {
        let config = name_and_count_config();
        let mut crdt1 = CustomCrdt::new(config.clone(), new_replica());
        let mut crdt2 = CustomCrdt::new(config, new_replica());

        crdt1.set_field("name", text("alice")).unwrap();
        crdt1.set_field("count", number(10)).unwrap();
        crdt2.set_field("name", text("bob")).unwrap();
        crdt2.set_field("count", number(20)).unwrap();

        // Pin the write order explicitly instead of sleeping and relying on
        // the wall clock advancing between the two writes.
        pin_timestamp(&mut crdt1, "name", 1000);
        pin_timestamp(&mut crdt2, "name", 2000);

        crdt1.merge(&crdt2).unwrap();

        // The later LWW write wins and the counter keeps the larger value.
        assert_eq!(crdt1.get_field("name"), Some(&text("bob")));
        assert_eq!(crdt1.get_field("count"), Some(&number(20)));
    }

    #[test]
    fn test_custom_crdt_merge_rejects_different_types() {
        let mut crdt1 = CustomCrdt::new(name_and_count_config(), new_replica());
        let other_config = CrdtBuilder::new("OtherCRDT".to_string())
            .add_field("name".to_string(), CrdtStrategy::Lww)
            .build();
        let crdt2 = CustomCrdt::new(other_config, new_replica());

        match crdt1.merge(&crdt2) {
            Err(BuilderError::TypeMismatch(_)) => {}
            unexpected => panic!("expected a type mismatch, got {:?}", unexpected),
        }
        assert!(crdt1.has_conflict(&crdt2));
    }

    #[test]
    fn test_custom_crdt_conflict_detection() {
        let config = name_and_count_config();
        let mut crdt1 = CustomCrdt::new(config.clone(), new_replica());
        let mut crdt2 = CustomCrdt::new(config, new_replica());

        crdt1.set_field("name", text("alice")).unwrap();
        crdt2.set_field("name", text("bob")).unwrap();

        // Same timestamp with different values is an LWW conflict.
        pin_timestamp(&mut crdt1, "name", 1000);
        pin_timestamp(&mut crdt2, "name", 1000);

        assert!(crdt1.has_conflict(&crdt2));
    }

    #[test]
    fn test_generic_field_merge_strategies() {
        let mut field1 = GenericCrdtField::new("test".to_string(), text("alice"), CrdtStrategy::Lww);
        let mut field2 = GenericCrdtField::new("test".to_string(), text("bob"), CrdtStrategy::Lww);

        field1.metadata.insert("timestamp".to_string(), number(1000));
        field2.metadata.insert("timestamp".to_string(), number(2000));

        field1.merge(&field2).unwrap();
        assert_eq!(field1.value, text("bob"));

        let mut counter1 =
            GenericCrdtField::new("count".to_string(), number(10), CrdtStrategy::GCounter);
        let counter2 =
            GenericCrdtField::new("count".to_string(), number(20), CrdtStrategy::GCounter);

        counter1.merge(&counter2).unwrap();
        assert_eq!(counter1.value, number(20));
    }

    #[test]
    fn test_generic_field_add_wins_union() {
        let mut tags1 = GenericCrdtField::new(
            "tags".to_string(),
            serde_json::Value::Array(vec![text("a"), text("b")]),
            CrdtStrategy::AddWins,
        );
        let tags2 = GenericCrdtField::new(
            "tags".to_string(),
            serde_json::Value::Array(vec![text("b"), text("c")]),
            CrdtStrategy::AddWins,
        );

        tags1.merge(&tags2).unwrap();

        assert_eq!(
            tags1.value,
            serde_json::Value::Array(vec![text("a"), text("b"), text("c")])
        );
        assert!(!tags1.has_conflict(&tags2));
    }

    #[test]
    fn test_generic_field_strategy_mismatch() {
        let mut lww = GenericCrdtField::new("f".to_string(), text("x"), CrdtStrategy::Lww);
        let counter = GenericCrdtField::new("f".to_string(), number(1), CrdtStrategy::GCounter);

        assert!(lww.has_conflict(&counter));
        match lww.merge(&counter) {
            Err(BuilderError::TypeMismatch(_)) => {}
            unexpected => panic!("expected a type mismatch, got {:?}", unexpected),
        }
        // A rejected merge must leave the field untouched.
        assert_eq!(lww.value, text("x"));
    }

    #[test]
    fn test_builder_error_handling() {
        let config = CrdtBuilder::new("TestCRDT".to_string())
            .add_field("name".to_string(), CrdtStrategy::Lww)
            .build();

        let mut crdt = CustomCrdt::new(config, new_replica());

        let result = crdt.set_field("nonexistent", text("test"));
        assert!(result.is_err());
        assert_eq!(
            result.unwrap_err(),
            BuilderError::MissingField("nonexistent".to_string())
        );
    }
}