use crate::vendor::diamond_types::list::operation::TextOperation;
use crate::vendor::diamond_types::{CRDTKind, CreateValue, HasLength, LV};
use parking_lot::Mutex;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
use super::merge_type::{MergePatch, MergeResult, MergeType};
#[derive(Clone, Debug)]
pub struct DiamondCRDT {
agent_id: String,
oplog: crate::vendor::diamond_types::OpLog,
branch: crate::vendor::diamond_types::Branch,
remote_versions: HashMap<String, u32>,
version_fronties: HashMap<String, crate::vendor::diamond_types::Frontier>,
text_id: LV,
}
impl DiamondCRDT {
#[must_use]
pub fn new(agent_id: impl Into<String>) -> Self {
let agent_id_str = agent_id.into();
let mut oplog = crate::vendor::diamond_types::OpLog::new();
let genesis_agent = oplog.cg.get_or_create_agent_id("genesis");
let text_id = oplog.local_map_set(
genesis_agent,
crate::vendor::diamond_types::ROOT_CRDT_ID,
"content",
CreateValue::NewCRDT(CRDTKind::Text),
);
let branch = oplog.checkout_tip();
Self {
agent_id: agent_id_str,
oplog,
branch,
remote_versions: HashMap::new(),
version_fronties: HashMap::new(),
text_id,
}
}
pub fn add_insert(&mut self, pos: usize, text: &str) {
let agent = self.oplog.cg.get_or_create_agent_id(&self.agent_id);
self.oplog
.local_text_op(agent, self.text_id, TextOperation::new_insert(pos, text));
let frontier = self.get_local_frontier();
self.branch = self.oplog.checkout_tip();
let version = self.get_version();
self.version_fronties.insert(version, frontier);
}
pub fn add_delete(&mut self, range: std::ops::Range<usize>) {
let agent = self.oplog.cg.get_or_create_agent_id(&self.agent_id);
self.oplog
.local_text_op(agent, self.text_id, TextOperation::new_delete(range));
let frontier = self.get_local_frontier();
self.branch = self.oplog.checkout_tip();
let version = self.get_version();
self.version_fronties.insert(version, frontier);
}
pub fn add_insert_remote_versioned(
&mut self,
agent_id: &str,
parents: &[&str],
pos: usize,
text: &str,
version_id: Option<&str>,
) {
let agent = self.oplog.cg.get_or_create_agent_id(agent_id);
let mut lvs = Vec::new();
for p in parents {
if let Some(frontier) = self.resolve_version(p) {
lvs.extend(frontier.as_ref());
}
}
let op =
crate::vendor::diamond_types::list::operation::TextOperation::new_insert(pos, text);
let range = self
.oplog
.cg
.assign_local_op_with_parents(&lvs, agent, op.len());
self.oplog.remote_text_op(self.text_id, range, op);
self.branch = self.oplog.checkout_tip();
if let Some(vid) = version_id {
let frontier = crate::vendor::diamond_types::Frontier::new_1(range.last());
self.register_version_mapping(vid.to_string(), frontier);
}
}
pub fn add_insert_remote(&mut self, agent_id: &str, pos: usize, text: &str) {
let agent = self.oplog.cg.get_or_create_agent_id(agent_id);
self.oplog.local_text_op(
agent,
self.text_id,
crate::vendor::diamond_types::list::operation::TextOperation::new_insert(pos, text),
);
self.branch = self.oplog.checkout_tip();
}
pub fn add_delete_remote_versioned(
&mut self,
agent_id: &str,
parents: &[&str],
range: std::ops::Range<usize>,
version_id: Option<&str>,
) {
let agent = self.oplog.cg.get_or_create_agent_id(agent_id);
let mut lvs = Vec::new();
for p in parents {
if let Some(frontier) = self.resolve_version(p) {
lvs.extend(frontier.as_ref());
}
}
let op = crate::vendor::diamond_types::list::operation::TextOperation::new_delete(range);
let range = self
.oplog
.cg
.assign_local_op_with_parents(&lvs, agent, op.len());
self.oplog.remote_text_op(self.text_id, range, op);
self.branch = self.oplog.checkout_tip();
if let Some(vid) = version_id {
let frontier = crate::vendor::diamond_types::Frontier::new_1(range.last());
self.register_version_mapping(vid.to_string(), frontier);
}
}
pub fn add_delete_remote(&mut self, agent_id: &str, range: std::ops::Range<usize>) {
let agent = self.oplog.cg.get_or_create_agent_id(agent_id);
self.oplog.local_text_op(
agent,
self.text_id,
crate::vendor::diamond_types::list::operation::TextOperation::new_delete(range),
);
self.branch = self.oplog.checkout_tip();
}
#[inline]
pub fn content(&self) -> String {
self.branch
.texts
.get(&self.text_id)
.map(|t| t.to_string())
.unwrap_or_default()
}
#[inline]
#[must_use]
pub fn agent_id(&self) -> &str {
&self.agent_id
}
#[inline]
#[must_use]
pub fn operation_count(&self) -> usize {
self.oplog.cg.len()
}
#[inline]
#[must_use]
pub fn is_empty(&self) -> bool {
self.operation_count() <= 1
}
#[must_use]
pub fn export_operations(&self) -> Value {
let version = format!("oplog-{}-{}", self.agent_id, self.oplog.cg.len());
json!({
"agent_id": self.agent_id,
"operations_count": self.oplog.cg.len(),
"content": self.content(),
"version": version,
})
}
#[must_use]
pub fn get_version(&self) -> String {
format!("diamond-{}-{}", self.agent_id, self.oplog.cg.len())
}
pub fn get_local_frontier(&self) -> crate::vendor::diamond_types::Frontier {
self.oplog.cg.version.clone()
}
#[must_use]
pub fn checkpoint(&self) -> Value {
json!({
"content": self.content(),
"version": self.get_version(),
"agent_id": self.agent_id,
"oplog_len": self.oplog.cg.len(),
})
}
#[inline]
#[must_use]
pub fn merge_quality(&self) -> u32 {
if self.remote_versions.is_empty() {
100
} else {
let remote_agents = self.remote_versions.len() as f64;
let diversity_factor = (remote_agents / (remote_agents + 1.0)) * 100.0;
(diversity_factor.clamp(0.0, 100.0)) as u32
}
}
pub fn resolve_version(
&self,
version: &str,
) -> Option<&crate::vendor::diamond_types::Frontier> {
self.version_fronties.get(version)
}
pub fn register_version_mapping(
&mut self,
version: String,
frontier: crate::vendor::diamond_types::Frontier,
) {
self.version_fronties.insert(version, frontier);
}
pub fn get_ops_since(
&self,
since: &[crate::vendor::diamond_types::Frontier],
) -> Vec<crate::vendor::diamond_types::SerializedOpsOwned> {
let mut all_lvs = Vec::new();
for f in since {
all_lvs.extend(f.as_ref());
}
let delta = self.oplog.ops_since(&all_lvs);
vec![delta.to_owned()]
}
pub fn dbg_check(&self, deep: bool) {
self.oplog.cg.dbg_check(deep);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_basic_insert() {
let mut crdt = DiamondCRDT::new("alice");
crdt.add_insert(0, "hello");
assert_eq!(crdt.content(), "hello");
}
#[test]
fn test_multiple_inserts() {
let mut crdt = DiamondCRDT::new("alice");
crdt.add_insert(0, "hello");
crdt.add_insert(5, " world");
assert_eq!(crdt.content(), "hello world");
}
#[test]
fn test_delete() {
let mut crdt = DiamondCRDT::new("alice");
crdt.add_insert(0, "hello world");
crdt.add_delete(5..6);
assert_eq!(crdt.content(), "helloworld");
}
#[test]
fn test_concurrent_edits() {
let mut crdt = DiamondCRDT::new("alice");
crdt.add_insert(0, "hello");
crdt.add_insert_remote("bob", 5, " world");
assert_eq!(crdt.content(), "hello world");
}
#[test]
fn test_agent_id() {
let crdt = DiamondCRDT::new("alice");
assert_eq!(crdt.agent_id(), "alice");
}
#[test]
fn test_is_empty() {
let mut crdt = DiamondCRDT::new("alice");
assert!(crdt.is_empty());
crdt.add_insert(0, "text");
assert!(!crdt.is_empty());
}
#[test]
fn test_export_operations() {
let mut crdt = DiamondCRDT::new("alice");
crdt.add_insert(0, "hello");
let export = crdt.export_operations();
assert_eq!(export["agent_id"], "alice");
assert!(export["operations_count"].is_number());
assert_eq!(export["content"], "hello");
assert!(export["version"].is_string());
}
#[test]
fn test_get_version() {
let mut crdt = DiamondCRDT::new("alice");
let v1 = crdt.get_version();
assert!(v1.contains("alice"));
crdt.add_insert(0, "text");
let v2 = crdt.get_version();
assert_ne!(v1, v2);
}
#[test]
fn test_checkpoint() {
let mut crdt = DiamondCRDT::new("alice");
crdt.add_insert(0, "hello");
let cp = crdt.checkpoint();
assert_eq!(cp["content"], "hello");
assert_eq!(cp["agent_id"], "alice");
assert!(cp["version"].is_string());
}
#[test]
fn test_merge_quality() {
let crdt = DiamondCRDT::new("alice");
assert_eq!(crdt.merge_quality(), 100);
}
}
#[derive(Debug, Clone)]
pub struct DiamondMergeType {
crdt: Arc<Mutex<DiamondCRDT>>,
}
impl DiamondMergeType {
pub fn new(agent_id: &str) -> Self {
Self {
crdt: Arc::new(Mutex::new(DiamondCRDT::new(agent_id))),
}
}
}
impl MergeType for DiamondMergeType {
fn name(&self) -> &str {
"diamond"
}
fn initialize(&mut self, content: &str) -> MergeResult {
let mut crdt = self.crdt.lock();
let len = crdt.content().len();
if len > 0 {
crdt.add_delete(0..len);
}
crdt.add_insert(0, content);
MergeResult::success(Some(crdt.get_version()), Vec::new())
}
fn apply_patch(&mut self, patch: MergePatch) -> MergeResult {
let mut crdt = self.crdt.lock();
let parents_refs: Vec<&str> = patch.parents.iter().map(|s| s.as_str()).collect();
let agent_id = patch
.version
.as_ref()
.and_then(|v| v.split('-').next())
.unwrap_or("remote");
let content_str = match &patch.content {
Value::String(s) => s.clone(),
_ => patch.content.to_string(),
};
let range_raw = patch.range.trim_matches(|c| c == '[' || c == ']');
let parts: Vec<&str> = if range_raw.contains(':') {
range_raw.split(':').collect()
} else if range_raw.contains(',') {
range_raw.split(',').collect()
} else {
vec![range_raw]
};
if parts.len() == 2 {
if let (Ok(start), Ok(end)) = (
parts[0].trim().parse::<usize>(),
parts[1].trim().parse::<usize>(),
) {
if start == end {
crdt.add_insert_remote_versioned(
agent_id,
&parents_refs,
start,
&content_str,
patch.version.as_deref(),
);
} else {
crdt.add_delete_remote_versioned(
agent_id,
&parents_refs,
start..end,
patch.version.as_deref(),
);
if !content_str.is_empty() {
crdt.add_insert_remote_versioned(
agent_id,
&parents_refs,
start,
&content_str,
patch.version.as_deref(),
);
}
}
}
} else if range_raw.is_empty() {
let len = crdt.content().len();
if len > 0 {
crdt.add_delete_remote_versioned(agent_id, &parents_refs, 0..len, None);
}
crdt.add_insert_remote_versioned(
agent_id,
&parents_refs,
0,
&content_str,
patch.version.as_deref(),
);
}
MergeResult::success(patch.version, Vec::new())
}
fn local_edit(&mut self, patch: MergePatch) -> MergeResult {
let mut crdt = self.crdt.lock();
let content_str = match &patch.content {
Value::String(s) => s.clone(),
_ => patch.content.to_string(),
};
let range_raw = patch.range.trim_matches(|c| c == '[' || c == ']');
let parts: Vec<&str> = if range_raw.contains(':') {
range_raw.split(':').collect()
} else if range_raw.contains(',') {
range_raw.split(',').collect()
} else {
vec![range_raw]
};
if parts.len() == 2 {
if let (Ok(start), Ok(end)) = (
parts[0].trim().parse::<usize>(),
parts[1].trim().parse::<usize>(),
) {
if start == end {
crdt.add_insert(start, &content_str);
} else {
crdt.add_delete(start..end);
if !content_str.is_empty() {
crdt.add_insert(start, &content_str);
}
}
}
} else if range_raw.is_empty() {
let len = crdt.content().len();
if len > 0 {
crdt.add_delete(0..len);
}
crdt.add_insert(0, &content_str);
}
let version = crdt.get_version();
let mut out_patch = patch;
out_patch.version = Some(version.clone());
out_patch.parents = vec![version.clone()];
MergeResult::success(Some(version), vec![out_patch])
}
fn get_content(&self) -> String {
self.crdt.lock().content()
}
fn get_version(&self) -> Vec<String> {
vec![self.crdt.lock().get_version()]
}
fn get_all_versions(&self) -> HashMap<String, Vec<String>> {
let mut map = HashMap::new();
map.insert(self.crdt.lock().get_version(), Vec::new());
map
}
fn prune(&mut self) -> bool {
false
}
fn clone_box(&self) -> Box<dyn MergeType> {
Box::new(self.clone())
}
}