1use crate::core::merge::DiamondCRDT;
8use parking_lot::Mutex;
9use serde_json::Value;
10use std::collections::{HashMap, HashSet};
11use std::sync::Arc;
12use std::time::SystemTime;
13use tokio::sync::broadcast;
14
15#[derive(Debug, Clone)]
20pub struct ResourceState {
21 pub crdt: DiamondCRDT,
23
24 pub last_sync: SystemTime,
26
27 pub seen_versions: HashSet<String>,
29
30 pub merge_type: String,
32}
33
34pub struct ResourceStateManager {
40 resources: Arc<Mutex<HashMap<String, Arc<Mutex<ResourceState>>>>>,
42
43 new_resource_tx: broadcast::Sender<String>,
45}
46
47impl ResourceStateManager {
48 #[must_use]
50 pub fn new() -> Self {
51 let (tx, _) = broadcast::channel(1024);
52 Self {
53 resources: Arc::new(Mutex::new(HashMap::new())),
54 new_resource_tx: tx,
55 }
56 }
57
58 #[must_use]
62 pub fn get_or_create_resource(
63 &self,
64 resource_id: &str,
65 initial_agent_id: &str,
66 requested_merge_type: Option<&str>,
67 ) -> Arc<Mutex<ResourceState>> {
68 let mut resources = self.resources.lock();
69
70 resources
71 .entry(resource_id.to_string())
72 .or_insert_with(|| {
73 let merge_type = requested_merge_type
74 .unwrap_or(crate::core::protocol_mod::constants::merge_types::DIAMOND)
75 .to_string();
76
77 let _ = self.new_resource_tx.send(resource_id.to_string());
79
80 Arc::new(Mutex::new(ResourceState {
81 crdt: DiamondCRDT::new(initial_agent_id),
82 last_sync: SystemTime::now(),
83 seen_versions: HashSet::new(),
84 merge_type,
85 }))
86 })
87 .clone()
88 }
89
90 pub fn subscribe_to_indices(&self) -> broadcast::Receiver<String> {
92 self.new_resource_tx.subscribe()
93 }
94
95 #[inline]
97 #[must_use]
98 pub fn get_resource(&self, resource_id: &str) -> Option<Arc<Mutex<ResourceState>>> {
99 let resources = self.resources.lock();
100 resources.get(resource_id).cloned()
101 }
102
103 #[must_use]
105 pub fn list_resources(&self) -> Vec<String> {
106 let resources = self.resources.lock();
107 resources.keys().cloned().collect()
108 }
109
110 #[must_use]
112 pub fn has_version(&self, resource_id: &str, version_id: &str) -> bool {
113 if let Some(resource) = self.get_resource(resource_id) {
114 let state = resource.lock();
115 state.seen_versions.contains(version_id)
116 } else {
117 false
118 }
119 }
120
121 pub fn apply_update(
125 &self,
126 resource_id: &str,
127 content: &str,
128 agent_id: &str,
129 version_id: Option<&str>,
130 requested_merge_type: Option<&str>,
131 ) -> Result<Value, String> {
132 let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
133 let mut state = resource.lock();
134
135 if let Some(req_mt) = requested_merge_type {
136 if state.merge_type != req_mt {
137 return Err(format!(
138 "Merge-type mismatch: resource is {}, requested {}",
139 state.merge_type, req_mt
140 ));
141 }
142 }
143
144 if let Some(vid) = version_id {
145 if state.seen_versions.contains(vid) {
146 return Ok(state.crdt.export_operations());
147 }
148 state.seen_versions.insert(vid.to_string());
149 }
150
151 state.crdt.add_insert(0, content);
152
153 if let Some(vid) = version_id {
155 let frontier = state.crdt.get_local_frontier();
156 state
157 .crdt
158 .register_version_mapping(vid.to_string(), frontier);
159 }
160
161 state.last_sync = SystemTime::now();
162 Ok(state.crdt.export_operations())
163 }
164
165 pub fn apply_remote_insert(
167 &self,
168 resource_id: &str,
169 agent_id: &str,
170 pos: usize,
171 text: &str,
172 version_id: Option<&str>,
173 requested_merge_type: Option<&str>,
174 ) -> Result<Value, String> {
175 let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
176 let mut state = resource.lock();
177
178 if let Some(req_mt) = requested_merge_type {
179 if state.merge_type != req_mt {
180 return Err(format!(
181 "Merge-type mismatch: {} vs {}",
182 state.merge_type, req_mt
183 ));
184 }
185 }
186
187 if let Some(vid) = version_id {
188 if state.seen_versions.contains(vid) {
189 return Ok(state.crdt.export_operations());
190 }
191 state.seen_versions.insert(vid.to_string());
192 }
193
194 state.crdt.add_insert_remote(agent_id, pos, text);
195 state.last_sync = SystemTime::now();
196
197 Ok(state.crdt.export_operations())
198 }
199
200 pub fn apply_remote_insert_versioned(
202 &self,
203 resource_id: &str,
204 agent_id: &str,
205 parents: &[&str],
206 pos: usize,
207 text: &str,
208 version_id: Option<&str>,
209 requested_merge_type: Option<&str>,
210 ) -> Result<Value, String> {
211 let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
212 let mut state = resource.lock();
213
214 if let Some(vid) = version_id {
215 if state.seen_versions.contains(vid) {
216 return Ok(state.crdt.export_operations());
217 }
218 state.seen_versions.insert(vid.to_string());
219 }
220
221 state
222 .crdt
223 .add_insert_remote_versioned(agent_id, parents, pos, text, version_id);
224 state.last_sync = SystemTime::now();
225
226 Ok(state.crdt.export_operations())
227 }
228
229 pub fn apply_remote_delete(
231 &self,
232 resource_id: &str,
233 agent_id: &str,
234 start: usize,
235 end: usize,
236 version_id: Option<&str>,
237 requested_merge_type: Option<&str>,
238 ) -> Result<Value, String> {
239 let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
240 let mut state = resource.lock();
241
242 if let Some(req_mt) = requested_merge_type {
243 if state.merge_type != req_mt {
244 return Err(format!(
245 "Merge-type mismatch: {} vs {}",
246 state.merge_type, req_mt
247 ));
248 }
249 }
250
251 if let Some(vid) = version_id {
252 if state.seen_versions.contains(vid) {
253 return Ok(state.crdt.export_operations());
254 }
255 state.seen_versions.insert(vid.to_string());
256 }
257
258 state.crdt.add_delete_remote(agent_id, start..end);
259 state.last_sync = SystemTime::now();
260
261 Ok(state.crdt.export_operations())
262 }
263
264 pub fn apply_remote_delete_versioned(
266 &self,
267 resource_id: &str,
268 agent_id: &str,
269 parents: &[&str],
270 range: std::ops::Range<usize>,
271 version_id: Option<&str>,
272 requested_merge_type: Option<&str>,
273 ) -> Result<Value, String> {
274 let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
275 let mut state = resource.lock();
276
277 if let Some(vid) = version_id {
278 if state.seen_versions.contains(vid) {
279 return Ok(state.crdt.export_operations());
280 }
281 state.seen_versions.insert(vid.to_string());
282 }
283
284 state
285 .crdt
286 .add_delete_remote_versioned(agent_id, parents, range, version_id);
287 state.last_sync = SystemTime::now();
288
289 Ok(state.crdt.export_operations())
290 }
291
292 #[inline]
296 #[must_use]
297 pub fn get_resource_state(&self, resource_id: &str) -> Option<Value> {
298 self.get_resource(resource_id).map(|resource| {
299 let state = resource.lock();
300 state.crdt.checkpoint()
301 })
302 }
303
304 #[inline]
306 #[must_use]
307 pub fn get_merge_quality(&self, resource_id: &str) -> Option<u32> {
308 self.get_resource(resource_id).map(|resource| {
309 let state = resource.lock();
310 state.crdt.merge_quality()
311 })
312 }
313
314 pub fn register_version_mapping(
316 &self,
317 resource_id: &str,
318 version: String,
319 frontier: crate::vendor::diamond_types::Frontier,
320 ) {
321 if let Some(resource) = self.get_resource(resource_id) {
322 let mut state = resource.lock();
323 state.crdt.register_version_mapping(version, frontier);
324 }
325 }
326
327 pub fn get_history(
329 &self,
330 resource_id: &str,
331 since_versions: &[&str],
332 ) -> Result<Vec<crate::vendor::diamond_types::SerializedOpsOwned>, String> {
333 let resource = self
334 .get_resource(resource_id)
335 .ok_or_else(|| format!("Resource not found: {}", resource_id))?;
336 let state = resource.lock();
337
338 let mut frontiers = Vec::new();
339 for v in since_versions {
340 if let Some(f) = state.crdt.resolve_version(v) {
341 frontiers.push(f.clone());
342 } else {
343 return Err(format!("Version not found/pruned: {}", v));
344 }
345 }
346
347 Ok(state.crdt.get_ops_since(&frontiers))
348 }
349}
350
351impl Clone for ResourceStateManager {
352 fn clone(&self) -> Self {
353 Self {
354 resources: Arc::clone(&self.resources),
355 new_resource_tx: self.new_resource_tx.clone(),
356 }
357 }
358}
359
360impl Default for ResourceStateManager {
361 fn default() -> Self {
362 Self::new()
363 }
364}