use crate::core::merge::DiamondCRDT;
use parking_lot::Mutex;
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::broadcast;
#[derive(Debug, Clone)]
pub struct ResourceState {
pub crdt: DiamondCRDT,
pub last_sync: SystemTime,
pub seen_versions: HashSet<String>,
pub merge_type: String,
}
pub struct ResourceStateManager {
resources: Arc<Mutex<HashMap<String, Arc<Mutex<ResourceState>>>>>,
new_resource_tx: broadcast::Sender<String>,
}
impl ResourceStateManager {
#[must_use]
pub fn new() -> Self {
let (tx, _) = broadcast::channel(1024);
Self {
resources: Arc::new(Mutex::new(HashMap::new())),
new_resource_tx: tx,
}
}
#[must_use]
pub fn get_or_create_resource(
&self,
resource_id: &str,
initial_agent_id: &str,
requested_merge_type: Option<&str>,
) -> Arc<Mutex<ResourceState>> {
let mut resources = self.resources.lock();
resources
.entry(resource_id.to_string())
.or_insert_with(|| {
let merge_type = requested_merge_type
.unwrap_or(crate::core::protocol_mod::constants::merge_types::DIAMOND)
.to_string();
let _ = self.new_resource_tx.send(resource_id.to_string());
Arc::new(Mutex::new(ResourceState {
crdt: DiamondCRDT::new(initial_agent_id),
last_sync: SystemTime::now(),
seen_versions: HashSet::new(),
merge_type,
}))
})
.clone()
}
pub fn subscribe_to_indices(&self) -> broadcast::Receiver<String> {
self.new_resource_tx.subscribe()
}
#[inline]
#[must_use]
pub fn get_resource(&self, resource_id: &str) -> Option<Arc<Mutex<ResourceState>>> {
let resources = self.resources.lock();
resources.get(resource_id).cloned()
}
#[must_use]
pub fn list_resources(&self) -> Vec<String> {
let resources = self.resources.lock();
resources.keys().cloned().collect()
}
#[must_use]
pub fn has_version(&self, resource_id: &str, version_id: &str) -> bool {
if let Some(resource) = self.get_resource(resource_id) {
let state = resource.lock();
state.seen_versions.contains(version_id)
} else {
false
}
}
pub fn apply_update(
&self,
resource_id: &str,
content: &str,
agent_id: &str,
version_id: Option<&str>,
requested_merge_type: Option<&str>,
) -> Result<Value, String> {
let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
let mut state = resource.lock();
if let Some(req_mt) = requested_merge_type {
if state.merge_type != req_mt {
return Err(format!(
"Merge-type mismatch: resource is {}, requested {}",
state.merge_type, req_mt
));
}
}
if let Some(vid) = version_id {
if state.seen_versions.contains(vid) {
return Ok(state.crdt.export_operations());
}
state.seen_versions.insert(vid.to_string());
}
state.crdt.add_insert(0, content);
if let Some(vid) = version_id {
let frontier = state.crdt.get_local_frontier();
state
.crdt
.register_version_mapping(vid.to_string(), frontier);
}
state.last_sync = SystemTime::now();
Ok(state.crdt.export_operations())
}
pub fn apply_remote_insert(
&self,
resource_id: &str,
agent_id: &str,
pos: usize,
text: &str,
version_id: Option<&str>,
requested_merge_type: Option<&str>,
) -> Result<Value, String> {
let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
let mut state = resource.lock();
if let Some(req_mt) = requested_merge_type {
if state.merge_type != req_mt {
return Err(format!(
"Merge-type mismatch: {} vs {}",
state.merge_type, req_mt
));
}
}
if let Some(vid) = version_id {
if state.seen_versions.contains(vid) {
return Ok(state.crdt.export_operations());
}
state.seen_versions.insert(vid.to_string());
}
state.crdt.add_insert_remote(agent_id, pos, text);
state.last_sync = SystemTime::now();
Ok(state.crdt.export_operations())
}
pub fn apply_remote_insert_versioned(
&self,
resource_id: &str,
agent_id: &str,
parents: &[&str],
pos: usize,
text: &str,
version_id: Option<&str>,
requested_merge_type: Option<&str>,
) -> Result<Value, String> {
let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
let mut state = resource.lock();
if let Some(vid) = version_id {
if state.seen_versions.contains(vid) {
return Ok(state.crdt.export_operations());
}
state.seen_versions.insert(vid.to_string());
}
state
.crdt
.add_insert_remote_versioned(agent_id, parents, pos, text, version_id);
state.last_sync = SystemTime::now();
Ok(state.crdt.export_operations())
}
pub fn apply_remote_delete(
&self,
resource_id: &str,
agent_id: &str,
start: usize,
end: usize,
version_id: Option<&str>,
requested_merge_type: Option<&str>,
) -> Result<Value, String> {
let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
let mut state = resource.lock();
if let Some(req_mt) = requested_merge_type {
if state.merge_type != req_mt {
return Err(format!(
"Merge-type mismatch: {} vs {}",
state.merge_type, req_mt
));
}
}
if let Some(vid) = version_id {
if state.seen_versions.contains(vid) {
return Ok(state.crdt.export_operations());
}
state.seen_versions.insert(vid.to_string());
}
state.crdt.add_delete_remote(agent_id, start..end);
state.last_sync = SystemTime::now();
Ok(state.crdt.export_operations())
}
pub fn apply_remote_delete_versioned(
&self,
resource_id: &str,
agent_id: &str,
parents: &[&str],
range: std::ops::Range<usize>,
version_id: Option<&str>,
requested_merge_type: Option<&str>,
) -> Result<Value, String> {
let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
let mut state = resource.lock();
if let Some(vid) = version_id {
if state.seen_versions.contains(vid) {
return Ok(state.crdt.export_operations());
}
state.seen_versions.insert(vid.to_string());
}
state
.crdt
.add_delete_remote_versioned(agent_id, parents, range, version_id);
state.last_sync = SystemTime::now();
Ok(state.crdt.export_operations())
}
#[inline]
#[must_use]
pub fn get_resource_state(&self, resource_id: &str) -> Option<Value> {
self.get_resource(resource_id).map(|resource| {
let state = resource.lock();
state.crdt.checkpoint()
})
}
#[inline]
#[must_use]
pub fn get_merge_quality(&self, resource_id: &str) -> Option<u32> {
self.get_resource(resource_id).map(|resource| {
let state = resource.lock();
state.crdt.merge_quality()
})
}
pub fn register_version_mapping(
&self,
resource_id: &str,
version: String,
frontier: crate::vendor::diamond_types::Frontier,
) {
if let Some(resource) = self.get_resource(resource_id) {
let mut state = resource.lock();
state.crdt.register_version_mapping(version, frontier);
}
}
pub fn get_history(
&self,
resource_id: &str,
since_versions: &[&str],
) -> Result<Vec<crate::vendor::diamond_types::SerializedOpsOwned>, String> {
let resource = self
.get_resource(resource_id)
.ok_or_else(|| format!("Resource not found: {}", resource_id))?;
let state = resource.lock();
let mut frontiers = Vec::new();
for v in since_versions {
if let Some(f) = state.crdt.resolve_version(v) {
frontiers.push(f.clone());
} else {
return Err(format!("Version not found/pruned: {}", v));
}
}
Ok(state.crdt.get_ops_since(&frontiers))
}
}
impl Clone for ResourceStateManager {
fn clone(&self) -> Self {
Self {
resources: Arc::clone(&self.resources),
new_resource_tx: self.new_resource_tx.clone(),
}
}
}
impl Default for ResourceStateManager {
fn default() -> Self {
Self::new()
}
}