1use async_trait::async_trait;
30use automerge::{sync, sync::SyncDoc, transaction::Transactable, Automerge};
31use std::collections::HashMap;
32use std::sync::atomic::{AtomicU64, Ordering};
33use std::sync::{Arc, Mutex};
34use std::time::Duration;
35use tokio::sync::mpsc;
36
37use crate::error::{Error, Result};
38use crate::qos::{DeletionPolicy, DeletionPolicyRegistry, Tombstone, TombstoneSyncMessage};
39#[cfg(feature = "automerge-backend")]
40use crate::storage::automerge_conversion::automerge_to_message;
41use crate::sync::traits::*;
42use crate::sync::types::*;
43
44#[derive(Clone)]
49pub struct AutomergeBackend {
50 documents: Arc<Mutex<HashMap<String, Automerge>>>,
52
53 sync_states: Arc<Mutex<HashMap<String, sync::State>>>,
55
56 config: Arc<Mutex<Option<BackendConfig>>>,
58
59 initialized: Arc<Mutex<bool>>,
61
62 observers: Arc<Mutex<Vec<mpsc::UnboundedSender<ChangeEvent>>>>,
64
65 tombstones: Arc<Mutex<HashMap<String, Tombstone>>>,
67
68 deletion_policy_registry: Arc<DeletionPolicyRegistry>,
70
71 lamport_counter: Arc<AtomicU64>,
73
74 pending_tombstones: Arc<Mutex<Vec<TombstoneSyncMessage>>>,
76
77 gc_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
79}
80
81impl AutomergeBackend {
82 pub fn new() -> Self {
92 Self {
93 documents: Arc::new(Mutex::new(HashMap::new())),
94 sync_states: Arc::new(Mutex::new(HashMap::new())),
95 config: Arc::new(Mutex::new(None)),
96 initialized: Arc::new(Mutex::new(false)),
97 observers: Arc::new(Mutex::new(Vec::new())),
98 tombstones: Arc::new(Mutex::new(HashMap::new())),
99 deletion_policy_registry: Arc::new(DeletionPolicyRegistry::new()),
100 lamport_counter: Arc::new(AtomicU64::new(0)),
101 pending_tombstones: Arc::new(Mutex::new(Vec::new())),
102 gc_handle: Arc::new(Mutex::new(None)),
103 }
104 }
105
106 fn doc_key(collection: &str, doc_id: &DocumentId) -> String {
108 format!("{}:{}", collection, doc_id)
109 }
110
111 fn node_id(&self) -> String {
115 self.config
116 .lock()
117 .ok()
118 .and_then(|c| c.as_ref().map(|cfg| cfg.app_id.clone()))
119 .unwrap_or_else(|| "local".to_string())
120 }
121
122 fn next_lamport(&self) -> u64 {
124 self.lamport_counter.fetch_add(1, Ordering::SeqCst)
125 }
126
127 pub fn drain_pending_tombstones(&self) -> Vec<TombstoneSyncMessage> {
132 self.pending_tombstones
133 .lock()
134 .map(|mut q| std::mem::take(&mut *q))
135 .unwrap_or_default()
136 }
137
138 fn automerge_to_document(doc: &Automerge, doc_id: DocumentId) -> Result<Document> {
142 use automerge::ReadDoc;
143
144 let mut fields = HashMap::new();
145
146 if let Ok(Some((automerge::Value::Object(automerge::ObjType::Map), obj_id))) =
148 doc.get(automerge::ROOT, "root")
149 {
150 for item in doc.map_range(&obj_id, ..) {
152 let key_str = item.key.to_string();
153 if let Ok(Some((value, nested_id))) = doc.get(&obj_id, &item.key as &str) {
154 if let Some(json_val) = Self::automerge_value_to_json(doc, &value, &nested_id) {
157 fields.insert(key_str, json_val);
158 }
159 }
160 }
161 }
162
163 Ok(Document {
164 id: Some(doc_id),
165 fields,
166 updated_at: std::time::SystemTime::now(),
167 })
168 }
169
170 fn automerge_value_to_json(
182 doc: &Automerge,
183 value: &automerge::Value,
184 obj_id: &automerge::ObjId,
185 ) -> Option<Value> {
186 use automerge::ReadDoc;
187
188 match value {
189 automerge::Value::Scalar(scalar) => Self::automerge_scalar_to_json(scalar.as_ref()),
190 automerge::Value::Object(obj_type) => {
191 match obj_type {
193 automerge::ObjType::Map | automerge::ObjType::Table => {
194 let mut map = serde_json::Map::new();
196 for item in doc.map_range(obj_id, ..) {
197 let key = item.key.to_string();
198 if let Ok(Some((nested_value, nested_obj_id))) =
199 doc.get(obj_id, &item.key as &str)
200 {
201 if let Some(json_val) = Self::automerge_value_to_json(
202 doc,
203 &nested_value,
204 &nested_obj_id,
205 ) {
206 map.insert(key, json_val);
207 }
208 }
209 }
210 Some(Value::Object(map))
211 }
212 automerge::ObjType::List => {
213 let length = doc.length(obj_id);
215 let mut arr = Vec::with_capacity(length);
216 for idx in 0..length {
217 if let Ok(Some((nested_value, nested_obj_id))) = doc.get(obj_id, idx) {
218 if let Some(json_val) = Self::automerge_value_to_json(
219 doc,
220 &nested_value,
221 &nested_obj_id,
222 ) {
223 arr.push(json_val);
224 }
225 }
226 }
227 Some(Value::Array(arr))
228 }
229 automerge::ObjType::Text => {
230 let text = doc.text(obj_id).ok()?;
232 Some(Value::String(text))
233 }
234 }
235 }
236 }
237 }
238
239 fn automerge_scalar_to_json(scalar: &automerge::ScalarValue) -> Option<Value> {
243 let json_val = match scalar {
244 automerge::ScalarValue::Str(s) => Value::String(s.to_string()),
245 automerge::ScalarValue::Int(i) => Value::Number(serde_json::Number::from(*i)),
246 automerge::ScalarValue::Uint(u) => Value::Number(serde_json::Number::from(*u)),
247 automerge::ScalarValue::F64(f) => serde_json::Number::from_f64(*f)
248 .map(Value::Number)
249 .unwrap_or(Value::Null),
250 automerge::ScalarValue::Boolean(b) => Value::Bool(*b),
251 automerge::ScalarValue::Null => Value::Null,
252 automerge::ScalarValue::Bytes(bytes) => {
253 let byte_array: Vec<serde_json::Value> = bytes
255 .iter()
256 .map(|b| Value::Number(serde_json::Number::from(*b)))
257 .collect();
258 Value::Array(byte_array)
259 }
260 automerge::ScalarValue::Counter(c) => {
261 let counter_value: i64 = i64::from(c);
264 Value::Number(serde_json::Number::from(counter_value))
265 }
266 automerge::ScalarValue::Timestamp(ts) => Value::Number(serde_json::Number::from(*ts)),
267 automerge::ScalarValue::Unknown { .. } => Value::Null,
268 };
269 Some(json_val)
270 }
271
272 fn apply_document_to_automerge(doc: &mut Automerge, document: &Document) -> Result<()> {
274 doc.transact::<_, _, automerge::AutomergeError>(|tx| {
275 let root = tx.put_object(automerge::ROOT, "root", automerge::ObjType::Map)?;
277
278 for (key, value) in &document.fields {
279 Self::put_json_value(tx, &root, key, value)?;
281 }
282
283 Ok(())
284 })
285 .map_err(|e| Error::Internal(format!("Transaction failed: {:?}", e)))?;
286
287 Ok(())
288 }
289
290 fn put_json_value(
292 tx: &mut automerge::transaction::Transaction,
293 obj: &automerge::ObjId,
294 key: &str,
295 value: &serde_json::Value,
296 ) -> std::result::Result<(), automerge::AutomergeError> {
297 use serde_json::Value;
298
299 match value {
300 Value::String(s) => {
301 tx.put(obj, key, s.clone())?;
302 }
303 Value::Number(n) => {
304 if let Some(i) = n.as_i64() {
305 tx.put(obj, key, i)?;
306 } else if let Some(f) = n.as_f64() {
307 tx.put(obj, key, f)?;
308 }
309 }
310 Value::Bool(b) => {
311 tx.put(obj, key, *b)?;
312 }
313 Value::Null => {
314 }
316 Value::Array(arr) => {
317 let list = tx.put_object(obj, key, automerge::ObjType::List)?;
319 for (idx, item) in arr.iter().enumerate() {
320 Self::insert_json_value(tx, &list, idx, item)?;
321 }
322 }
323 Value::Object(map) => {
324 let nested = tx.put_object(obj, key, automerge::ObjType::Map)?;
326 for (k, v) in map {
327 Self::put_json_value(tx, &nested, k, v)?;
328 }
329 }
330 }
331
332 Ok(())
333 }
334
335 fn insert_json_value(
337 tx: &mut automerge::transaction::Transaction,
338 list: &automerge::ObjId,
339 index: usize,
340 value: &serde_json::Value,
341 ) -> std::result::Result<(), automerge::AutomergeError> {
342 use serde_json::Value;
343
344 match value {
345 Value::String(s) => {
346 tx.insert(list, index, s.clone())?;
347 }
348 Value::Number(n) => {
349 if let Some(i) = n.as_i64() {
350 tx.insert(list, index, i)?;
351 } else if let Some(f) = n.as_f64() {
352 tx.insert(list, index, f)?;
353 }
354 }
355 Value::Bool(b) => {
356 tx.insert(list, index, *b)?;
357 }
358 Value::Null => {
359 }
361 Value::Array(_) | Value::Object(_) => {
362 let json_str =
364 serde_json::to_string(value).map_err(|_| automerge::AutomergeError::Fail)?;
365 tx.insert(list, index, json_str)?;
366 }
367 }
368
369 Ok(())
370 }
371
372 fn matches_query(&self, document: &Document, query: &Query) -> Result<bool> {
374 match query {
375 Query::All => Ok(true),
376
377 Query::Eq { field, value } => {
378 if let Some(doc_value) = document.fields.get(field) {
379 Ok(doc_value == value)
380 } else {
381 Ok(false)
382 }
383 }
384
385 Query::Lt { field, value } => {
386 if let Some(doc_value) = document.fields.get(field) {
387 Ok(self.compare_values(doc_value, value)? < 0)
388 } else {
389 Ok(false)
390 }
391 }
392
393 Query::Gt { field, value } => {
394 if let Some(doc_value) = document.fields.get(field) {
395 Ok(self.compare_values(doc_value, value)? > 0)
396 } else {
397 Ok(false)
398 }
399 }
400
401 Query::And(queries) => {
402 for q in queries {
403 if !self.matches_query(document, q)? {
404 return Ok(false);
405 }
406 }
407 Ok(true)
408 }
409
410 Query::Or(queries) => {
411 for q in queries {
412 if self.matches_query(document, q)? {
413 return Ok(true);
414 }
415 }
416 Ok(false)
417 }
418
419 Query::Not(inner) => Ok(!self.matches_query(document, inner)?),
421
422 Query::Custom(query_str) => Ok(evaluate_custom_query(document, query_str)),
425
426 Query::WithinRadius {
428 center,
429 radius_meters,
430 lat_field,
431 lon_field,
432 } => {
433 let lat_key = lat_field.as_deref().unwrap_or("lat");
434 let lon_key = lon_field.as_deref().unwrap_or("lon");
435
436 if let (Some(lat_val), Some(lon_val)) = (
437 document.fields.get(lat_key).and_then(|v| v.as_f64()),
438 document.fields.get(lon_key).and_then(|v| v.as_f64()),
439 ) {
440 let doc_point = GeoPoint::new(lat_val, lon_val);
441 Ok(doc_point.within_radius(center, *radius_meters))
442 } else {
443 Ok(false)
444 }
445 }
446
447 Query::WithinBounds {
448 min,
449 max,
450 lat_field,
451 lon_field,
452 } => {
453 let lat_key = lat_field.as_deref().unwrap_or("lat");
454 let lon_key = lon_field.as_deref().unwrap_or("lon");
455
456 if let (Some(lat_val), Some(lon_val)) = (
457 document.fields.get(lat_key).and_then(|v| v.as_f64()),
458 document.fields.get(lon_key).and_then(|v| v.as_f64()),
459 ) {
460 let doc_point = GeoPoint::new(lat_val, lon_val);
461 Ok(doc_point.within_bounds(min, max))
462 } else {
463 Ok(false)
464 }
465 }
466
467 Query::IncludeDeleted(inner) => {
469 self.matches_query(document, inner)
472 }
473
474 Query::DeletedOnly => {
475 let is_deleted = document
477 .fields
478 .get("_deleted")
479 .and_then(|v| v.as_bool())
480 .unwrap_or(false);
481 Ok(is_deleted)
482 }
483 }
484 }
485
486 fn compare_values(&self, a: &serde_json::Value, b: &serde_json::Value) -> Result<i8> {
488 use serde_json::Value;
489
490 match (a, b) {
491 (Value::Number(a_num), Value::Number(b_num)) => {
492 if let (Some(a_f), Some(b_f)) = (a_num.as_f64(), b_num.as_f64()) {
493 if a_f < b_f {
494 Ok(-1)
495 } else if a_f > b_f {
496 Ok(1)
497 } else {
498 Ok(0)
499 }
500 } else {
501 Err(Error::Internal("Number comparison failed".into()))
502 }
503 }
504 (Value::String(a_str), Value::String(b_str)) => {
505 if a_str < b_str {
506 Ok(-1)
507 } else if a_str > b_str {
508 Ok(1)
509 } else {
510 Ok(0)
511 }
512 }
513 _ => Err(Error::Internal("Unsupported value comparison".into())),
514 }
515 }
516
517 pub fn generate_sync_message(
522 &self,
523 collection: &str,
524 doc_id: &DocumentId,
525 peer_id: &str,
526 ) -> Result<Vec<u8>> {
527 let key = Self::doc_key(collection, doc_id);
528 let docs = self
529 .documents
530 .lock()
531 .map_err(|_| Error::Internal("documents lock poisoned".into()))?;
532
533 let automerge_doc = docs.get(&key).ok_or_else(|| Error::NotFound {
534 resource_type: "Document".to_string(),
535 id: doc_id.clone(),
536 })?;
537
538 let mut sync_states = self
540 .sync_states
541 .lock()
542 .map_err(|_| Error::Internal("sync_states lock poisoned".into()))?;
543 let sync_state = sync_states
544 .entry(format!("{}:{}", peer_id, key))
545 .or_default();
546
547 let message = automerge_doc.generate_sync_message(sync_state);
549
550 match message {
552 Some(msg) => Ok(msg.encode()),
553 None => Ok(Vec::new()), }
555 }
556
557 pub fn receive_sync_message(
561 &self,
562 collection: &str,
563 doc_id: &DocumentId,
564 peer_id: &str,
565 message: &[u8],
566 ) -> Result<()> {
567 let key = Self::doc_key(collection, doc_id);
568 let mut docs = self
569 .documents
570 .lock()
571 .map_err(|_| Error::Internal("documents lock poisoned".into()))?;
572
573 let automerge_doc = docs.get_mut(&key).ok_or_else(|| Error::NotFound {
574 resource_type: "Document".to_string(),
575 id: doc_id.clone(),
576 })?;
577
578 let sync_message = sync::Message::decode(message)
580 .map_err(|e| Error::Internal(format!("Message decode failed: {:?}", e)))?;
581
582 let mut sync_states = self
584 .sync_states
585 .lock()
586 .map_err(|_| Error::Internal("sync_states lock poisoned".into()))?;
587 let sync_state = sync_states
588 .entry(format!("{}:{}", peer_id, key))
589 .or_default();
590
591 automerge_doc
593 .receive_sync_message(sync_state, sync_message)
594 .map_err(|e| Error::Internal(format!("Sync message apply failed: {:?}", e)))?;
595
596 Ok(())
597 }
598}
599
600impl Default for AutomergeBackend {
601 fn default() -> Self {
602 Self::new()
603 }
604}
605
606#[async_trait]
611impl DocumentStore for AutomergeBackend {
612 async fn upsert(&self, collection: &str, mut document: Document) -> anyhow::Result<DocumentId> {
613 let doc_id = document
615 .id
616 .clone()
617 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
618
619 let key = Self::doc_key(collection, &doc_id);
620 let mut docs = self
621 .documents
622 .lock()
623 .map_err(|_| Error::Internal("documents lock poisoned".into()))?;
624
625 if let Some(existing_doc) = docs.get_mut(&key) {
626 Self::apply_document_to_automerge(existing_doc, &document)?;
628 } else {
629 let mut automerge_doc = Automerge::new();
631 Self::apply_document_to_automerge(&mut automerge_doc, &document)?;
632 docs.insert(key, automerge_doc);
633 }
634
635 document.id = Some(doc_id.clone());
636
637 drop(docs); let observers = self
640 .observers
641 .lock()
642 .map_err(|_| Error::Internal("observers lock poisoned".into()))?;
643 for observer in observers.iter() {
644 let _ = observer.send(ChangeEvent::Updated {
645 collection: collection.to_string(),
646 document: document.clone(),
647 });
648 }
649 drop(observers);
650
651 Ok(doc_id)
652 }
653
654 async fn query(&self, collection: &str, query: &Query) -> anyhow::Result<Vec<Document>> {
655 let docs = self
656 .documents
657 .lock()
658 .map_err(|_| Error::Internal("documents lock poisoned".into()))?;
659 let mut results = Vec::new();
660
661 for (key, automerge_doc) in docs.iter() {
663 if !key.starts_with(&format!("{}:", collection)) {
664 continue;
665 }
666
667 let doc_id = key.split(':').nth(1).unwrap_or("").to_string();
669
670 let document = Self::automerge_to_document(automerge_doc, doc_id)?;
672
673 if !query.matches_deletion_state(&document) {
677 continue;
678 }
679
680 if self.matches_query(&document, query)? {
682 results.push(document);
683 }
684 }
685
686 Ok(results)
687 }
688
689 async fn remove(&self, collection: &str, doc_id: &DocumentId) -> anyhow::Result<()> {
690 let key = Self::doc_key(collection, doc_id);
691 let mut docs = self
692 .documents
693 .lock()
694 .map_err(|_| Error::Internal("documents lock poisoned".into()))?;
695
696 docs.remove(&key).ok_or_else(|| Error::NotFound {
697 resource_type: "Document".to_string(),
698 id: doc_id.clone(),
699 })?;
700
701 drop(docs); let observers = self
704 .observers
705 .lock()
706 .map_err(|_| Error::Internal("observers lock poisoned".into()))?;
707 for observer in observers.iter() {
708 let _ = observer.send(ChangeEvent::Removed {
709 collection: collection.to_string(),
710 doc_id: doc_id.clone(),
711 });
712 }
713 drop(observers);
714
715 Ok(())
716 }
717
718 async fn get(&self, collection: &str, doc_id: &DocumentId) -> anyhow::Result<Option<Document>> {
719 let key = Self::doc_key(collection, doc_id);
720 let docs = self
721 .documents
722 .lock()
723 .map_err(|_| Error::Internal("documents lock poisoned".into()))?;
724
725 if let Some(automerge_doc) = docs.get(&key) {
726 let document = Self::automerge_to_document(automerge_doc, doc_id.clone())?;
727 Ok(Some(document))
728 } else {
729 Ok(None)
730 }
731 }
732
733 async fn count(&self, collection: &str, query: &Query) -> anyhow::Result<usize> {
734 let results = self.query(collection, query).await?;
735 Ok(results.len())
736 }
737
738 fn observe(&self, collection: &str, query: &Query) -> anyhow::Result<ChangeStream> {
739 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
740
741 let docs = self
743 .documents
744 .lock()
745 .map_err(|_| Error::Internal("documents lock poisoned".into()))?;
746 let mut initial_docs = Vec::new();
747
748 for (key, automerge_doc) in docs.iter() {
749 if !key.starts_with(&format!("{}:", collection)) {
750 continue;
751 }
752
753 let doc_id = key.split(':').nth(1).unwrap_or("").to_string();
754 if let Ok(document) = Self::automerge_to_document(automerge_doc, doc_id) {
755 if self.matches_query(&document, query).unwrap_or(false) {
756 initial_docs.push(document);
757 }
758 }
759 }
760
761 drop(docs); let _ = tx.send(ChangeEvent::Initial {
765 documents: initial_docs,
766 });
767
768 self.observers
770 .lock()
771 .map_err(|_| Error::Internal("observers lock poisoned".into()))?
772 .push(tx.clone());
773
774 Ok(ChangeStream { receiver: rx })
775 }
776
777 async fn delete(
780 &self,
781 collection: &str,
782 doc_id: &DocumentId,
783 reason: Option<&str>,
784 ) -> anyhow::Result<crate::qos::DeleteResult> {
785 let policy = self.deletion_policy(collection);
786
787 match policy {
788 DeletionPolicy::Immutable => {
789 Ok(crate::qos::DeleteResult::immutable())
791 }
792 DeletionPolicy::ImplicitTTL { .. } => {
793 Ok(crate::qos::DeleteResult {
795 deleted: false,
796 tombstone_id: None,
797 expires_at: None,
798 policy: policy.clone(),
799 })
800 }
801 DeletionPolicy::Tombstone {
802 tombstone_ttl,
803 delete_wins: _,
804 } => {
805 let node_id = self.node_id();
807 let lamport = self.next_lamport();
808 let tombstone = if let Some(reason_str) = reason {
809 Tombstone::with_reason(
810 doc_id.clone(),
811 collection.to_string(),
812 node_id,
813 lamport,
814 reason_str,
815 )
816 } else {
817 Tombstone::new(doc_id.clone(), collection.to_string(), node_id, lamport)
818 };
819 let tombstone_id = format!("{}:{}", collection, doc_id);
820
821 self.tombstones
823 .lock()
824 .map_err(|_| Error::Internal("tombstones lock poisoned".into()))?
825 .insert(tombstone_id.clone(), tombstone.clone());
826
827 if let Ok(mut pending) = self.pending_tombstones.lock() {
829 pending.push(TombstoneSyncMessage::from_tombstone(tombstone));
830 }
831
832 self.remove(collection, doc_id).await.ok(); Ok(crate::qos::DeleteResult {
836 deleted: true,
837 tombstone_id: Some(tombstone_id),
838 expires_at: Some(std::time::SystemTime::now() + tombstone_ttl),
839 policy: policy.clone(),
840 })
841 }
842 DeletionPolicy::SoftDelete {
843 include_deleted_default: _,
844 } => {
845 if let Some(mut doc) = self.get(collection, doc_id).await? {
847 doc.fields.insert("_deleted".to_string(), Value::Bool(true));
848 doc.fields.insert(
849 "_deleted_at".to_string(),
850 Value::String(chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()),
851 );
852 if let Some(reason) = reason {
853 doc.fields.insert(
854 "_deleted_reason".to_string(),
855 Value::String(reason.to_string()),
856 );
857 }
858 self.upsert(collection, doc).await?;
859
860 Ok(crate::qos::DeleteResult::soft_deleted(policy.clone()))
861 } else {
862 Ok(crate::qos::DeleteResult {
864 deleted: false,
865 tombstone_id: None,
866 expires_at: None,
867 policy: policy.clone(),
868 })
869 }
870 }
871 }
872 }
873
874 async fn is_deleted(&self, collection: &str, doc_id: &DocumentId) -> anyhow::Result<bool> {
875 let key = format!("{}:{}", collection, doc_id);
876
877 if self
879 .tombstones
880 .lock()
881 .map_err(|_| Error::Internal("tombstones lock poisoned".into()))?
882 .contains_key(&key)
883 {
884 return Ok(true);
885 }
886
887 if let Some(doc) = self.get(collection, doc_id).await? {
889 if let Some(deleted) = doc.fields.get("_deleted") {
890 return Ok(deleted.as_bool().unwrap_or(false));
891 }
892 }
893
894 Ok(false)
895 }
896
897 fn deletion_policy(&self, collection: &str) -> crate::qos::DeletionPolicy {
898 self.deletion_policy_registry.get(collection)
899 }
900
901 async fn get_tombstones(&self, collection: &str) -> anyhow::Result<Vec<crate::qos::Tombstone>> {
902 let tombstones = self
903 .tombstones
904 .lock()
905 .map_err(|_| Error::Internal("tombstones lock poisoned".into()))?;
906 let prefix = format!("{}:", collection);
907
908 Ok(tombstones
909 .iter()
910 .filter(|(key, _)| key.starts_with(&prefix))
911 .map(|(_, tombstone)| tombstone.clone())
912 .collect())
913 }
914
915 async fn apply_tombstone(&self, tombstone: &crate::qos::Tombstone) -> anyhow::Result<()> {
916 let key = format!("{}:{}", tombstone.collection, tombstone.document_id);
917
918 self.tombstones
920 .lock()
921 .map_err(|_| Error::Internal("tombstones lock poisoned".into()))?
922 .insert(key, tombstone.clone());
923
924 self.remove(&tombstone.collection, &tombstone.document_id)
926 .await
927 .ok();
928
929 Ok(())
930 }
931}
932
933impl crate::qos::GcStore for AutomergeBackend {
938 fn get_all_tombstones(&self) -> anyhow::Result<Vec<Tombstone>> {
939 Ok(self
940 .tombstones
941 .lock()
942 .map_err(|_| Error::Internal("tombstones lock poisoned".into()))?
943 .values()
944 .cloned()
945 .collect())
946 }
947
948 fn remove_tombstone(&self, collection: &str, document_id: &str) -> anyhow::Result<bool> {
949 let key = format!("{}:{}", collection, document_id);
950 Ok(self
951 .tombstones
952 .lock()
953 .map_err(|_| Error::Internal("tombstones lock poisoned".into()))?
954 .remove(&key)
955 .is_some())
956 }
957
958 fn has_tombstone(&self, collection: &str, document_id: &str) -> anyhow::Result<bool> {
959 let key = format!("{}:{}", collection, document_id);
960 Ok(self
961 .tombstones
962 .lock()
963 .map_err(|_| Error::Internal("tombstones lock poisoned".into()))?
964 .contains_key(&key))
965 }
966
967 fn get_expired_documents(
968 &self,
969 _collection: &str,
970 _cutoff: std::time::SystemTime,
971 ) -> anyhow::Result<Vec<String>> {
972 Ok(Vec::new())
974 }
975
976 fn hard_delete(&self, collection: &str, document_id: &str) -> anyhow::Result<()> {
977 let key = format!("{}:{}", collection, document_id);
978 self.documents
979 .lock()
980 .map_err(|_| Error::Internal("documents lock poisoned".into()))?
981 .remove(&key);
982 Ok(())
983 }
984
985 fn list_collections(&self) -> anyhow::Result<Vec<String>> {
986 let docs = self
987 .documents
988 .lock()
989 .map_err(|_| Error::Internal("documents lock poisoned".into()))?;
990 let mut collections: Vec<String> = docs
991 .keys()
992 .filter_map(|k| k.split(':').next().map(|s| s.to_string()))
993 .collect::<std::collections::HashSet<_>>()
994 .into_iter()
995 .collect();
996 collections.sort();
997 Ok(collections)
998 }
999}
1000
1001#[async_trait]
1006impl PeerDiscovery for AutomergeBackend {
1007 async fn start(&self) -> anyhow::Result<()> {
1008 Ok(())
1011 }
1012
1013 async fn stop(&self) -> anyhow::Result<()> {
1014 Ok(())
1015 }
1016
1017 async fn discovered_peers(&self) -> anyhow::Result<Vec<PeerInfo>> {
1018 Ok(Vec::new())
1020 }
1021
1022 async fn add_peer(&self, _address: &str, _transport: TransportType) -> anyhow::Result<()> {
1023 Ok(())
1025 }
1026
1027 async fn wait_for_peer(&self, _peer_id: &PeerId, _timeout: Duration) -> anyhow::Result<()> {
1028 Err(Error::Internal("wait_for_peer not implemented".into()).into())
1030 }
1031
1032 fn on_peer_event(&self, _callback: Box<dyn Fn(PeerEvent) + Send + Sync>) {
1033 }
1036
1037 async fn get_peer_info(&self, _peer_id: &PeerId) -> anyhow::Result<Option<PeerInfo>> {
1038 Ok(None)
1040 }
1041}
1042
1043#[async_trait]
1048impl SyncEngine for AutomergeBackend {
1049 async fn start_sync(&self) -> anyhow::Result<()> {
1050 Ok(())
1053 }
1054
1055 async fn stop_sync(&self) -> anyhow::Result<()> {
1056 self.sync_states
1058 .lock()
1059 .map_err(|_| Error::Internal("sync_states lock poisoned".into()))?
1060 .clear();
1061 Ok(())
1062 }
1063
1064 async fn subscribe(
1065 &self,
1066 collection: &str,
1067 _query: &Query,
1068 ) -> anyhow::Result<SyncSubscription> {
1069 Ok(SyncSubscription::new(
1072 collection.to_string(),
1073 Box::new(AutomergeSubscriptionHandle {
1074 collection: collection.to_string(),
1075 }),
1076 ))
1077 }
1078
1079 async fn is_syncing(&self) -> anyhow::Result<bool> {
1080 Ok(self.is_ready().await)
1082 }
1083}
1084
1085struct AutomergeSubscriptionHandle {
1087 #[allow(dead_code)]
1088 collection: String,
1089}
1090
1091#[async_trait]
1096impl DataSyncBackend for AutomergeBackend {
1097 async fn initialize(&self, config: BackendConfig) -> anyhow::Result<()> {
1098 let mut initialized = self
1099 .initialized
1100 .lock()
1101 .map_err(|_| Error::Internal("initialized lock poisoned".into()))?;
1102 if *initialized {
1103 return Err(Error::Internal("Already initialized".into()).into());
1104 }
1105
1106 *self
1107 .config
1108 .lock()
1109 .map_err(|_| Error::Internal("config lock poisoned".into()))? = Some(config);
1110 *initialized = true;
1111
1112 let gc = Arc::new(crate::qos::GarbageCollector::with_policy_registry(
1114 Arc::new(self.clone()),
1115 Arc::clone(&self.deletion_policy_registry),
1116 crate::qos::GcConfig::default(),
1117 ));
1118 let handle = crate::qos::start_periodic_gc(gc);
1119 *self
1120 .gc_handle
1121 .lock()
1122 .map_err(|_| Error::Internal("gc_handle lock poisoned".into()))? = Some(handle);
1123
1124 Ok(())
1125 }
1126
1127 async fn shutdown(&self) -> anyhow::Result<()> {
1128 if let Ok(mut handle) = self.gc_handle.lock() {
1130 if let Some(h) = handle.take() {
1131 h.abort();
1132 }
1133 }
1134
1135 self.stop_sync().await?;
1136 self.documents
1137 .lock()
1138 .map_err(|_| Error::Internal("documents lock poisoned".into()))?
1139 .clear();
1140 self.sync_states
1141 .lock()
1142 .map_err(|_| Error::Internal("sync_states lock poisoned".into()))?
1143 .clear();
1144 *self
1145 .initialized
1146 .lock()
1147 .map_err(|_| Error::Internal("initialized lock poisoned".into()))? = false;
1148
1149 Ok(())
1150 }
1151
1152 fn document_store(&self) -> Arc<dyn DocumentStore> {
1153 Arc::new(self.clone())
1154 }
1155
1156 fn peer_discovery(&self) -> Arc<dyn PeerDiscovery> {
1157 Arc::new(self.clone())
1158 }
1159
1160 fn sync_engine(&self) -> Arc<dyn SyncEngine> {
1161 Arc::new(self.clone())
1162 }
1163
1164 fn as_any(&self) -> &dyn std::any::Any {
1165 self
1166 }
1167
1168 async fn is_ready(&self) -> bool {
1169 self.initialized.lock().map(|guard| *guard).unwrap_or(false)
1170 }
1171
1172 fn backend_info(&self) -> BackendInfo {
1173 BackendInfo {
1174 name: "Automerge".to_string(),
1175 version: "0.7.1".to_string(),
1176 }
1177 }
1178}
1179
1180type PeerCallbacks = Arc<Mutex<Vec<Box<dyn Fn(PeerEvent) + Send + Sync>>>>;
1186
1187#[derive(Debug, Clone)]
1194pub enum TopologyConnectionEvent {
1195 ConnectPeer {
1197 peer_id: String,
1199 addresses: Vec<String>,
1201 relay_url: Option<String>,
1203 },
1204 DisconnectPeer {
1206 peer_id: String,
1208 },
1209}
1210
1211pub const DEFAULT_MAX_CONNECTIONS: usize = 7;
1213
1214#[derive(Clone)]
1219pub struct AutomergeIrohBackend {
1220 backend: Arc<crate::storage::AutomergeBackend>,
1222
1223 transport: Arc<crate::network::IrohTransport>,
1225
1226 peer_callbacks: PeerCallbacks,
1228
1229 initialized: Arc<Mutex<bool>>,
1231
1232 formation_key: Arc<std::sync::RwLock<Option<crate::security::FormationKey>>>,
1235
1236 #[cfg(feature = "automerge-backend")]
1238 discovery_manager: Arc<tokio::sync::RwLock<crate::discovery::peer::DiscoveryManager>>,
1239
1240 #[cfg(feature = "automerge-backend")]
1246 blob_store: Option<Arc<crate::storage::NetworkedIrohBlobStore>>,
1247
1248 #[cfg(feature = "automerge-backend")]
1254 topology_event_rx:
1255 Arc<tokio::sync::Mutex<Option<mpsc::UnboundedReceiver<TopologyConnectionEvent>>>>,
1256
1257 #[cfg(feature = "automerge-backend")]
1262 max_connections: usize,
1263
1264 deletion_policy_registry: Arc<DeletionPolicyRegistry>,
1266
1267 lamport_counter: Arc<AtomicU64>,
1269}
1270
1271impl AutomergeIrohBackend {
1272 pub fn new(
1274 backend: Arc<crate::storage::AutomergeBackend>,
1275 transport: Arc<crate::network::IrohTransport>,
1276 ) -> Self {
1277 Self {
1278 backend,
1279 transport,
1280 peer_callbacks: Arc::new(Mutex::new(Vec::new())),
1281 initialized: Arc::new(Mutex::new(false)),
1282 formation_key: Arc::new(std::sync::RwLock::new(None)),
1283 #[cfg(feature = "automerge-backend")]
1284 discovery_manager: Arc::new(tokio::sync::RwLock::new(
1285 crate::discovery::peer::DiscoveryManager::default(),
1286 )),
1287 #[cfg(feature = "automerge-backend")]
1288 blob_store: None,
1289 #[cfg(feature = "automerge-backend")]
1290 topology_event_rx: Arc::new(tokio::sync::Mutex::new(None)),
1291 #[cfg(feature = "automerge-backend")]
1292 max_connections: DEFAULT_MAX_CONNECTIONS,
1293 deletion_policy_registry: Arc::new(DeletionPolicyRegistry::new()),
1294 lamport_counter: Arc::new(AtomicU64::new(0)),
1295 }
1296 }
1297
1298 #[cfg(feature = "automerge-backend")]
1315 pub fn with_topology_events(
1316 mut self,
1317 rx: mpsc::UnboundedReceiver<TopologyConnectionEvent>,
1318 ) -> Self {
1319 self.topology_event_rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));
1320 self
1321 }
1322
1323 #[cfg(feature = "automerge-backend")]
1328 pub fn with_max_connections(mut self, max: usize) -> Self {
1329 self.max_connections = max;
1330 self
1331 }
1332
1333 #[cfg(feature = "automerge-backend")]
1335 pub fn has_topology_events(&self) -> bool {
1336 self.topology_event_rx
1338 .try_lock()
1339 .is_ok_and(|guard| guard.is_some())
1340 }
1341
1342 pub fn formation_key(&self) -> Option<crate::security::FormationKey> {
1344 self.formation_key
1345 .read()
1346 .ok()
1347 .and_then(|guard| guard.clone())
1348 }
1349
1350 pub fn formation_id(&self) -> Option<String> {
1352 self.formation_key
1353 .read()
1354 .ok()
1355 .and_then(|guard| guard.as_ref().map(|k| k.formation_id().to_string()))
1356 }
1357
1358 pub fn from_parts(
1360 store: Arc<crate::storage::AutomergeStore>,
1361 transport: Arc<crate::network::IrohTransport>,
1362 ) -> Self {
1363 let backend = Arc::new(crate::storage::AutomergeBackend::with_transport(
1364 store,
1365 Arc::clone(&transport),
1366 ));
1367 Self::new(backend, transport)
1368 }
1369
1370 pub fn transport(&self) -> Arc<crate::network::IrohTransport> {
1372 Arc::clone(&self.transport)
1373 }
1374
1375 pub fn storage_backend(&self) -> Arc<crate::storage::AutomergeBackend> {
1381 Arc::clone(&self.backend)
1382 }
1383
1384 pub fn transport_arc_ptr(&self) -> *const crate::network::IrohTransport {
1390 Arc::as_ptr(&self.transport)
1391 }
1392
1393 pub fn debug_log_transport_ptr(&self, context: &str) {
1398 tracing::debug!(
1399 transport_ptr = ?Arc::as_ptr(&self.transport),
1400 endpoint_id = %self.transport.endpoint_id(),
1401 peer_count = self.transport.peer_count(),
1402 context = context,
1403 "AutomergeIrohBackend transport instance"
1404 );
1405 }
1406
1407 pub fn endpoint_id(&self) -> iroh::EndpointId {
1409 self.transport.endpoint_id()
1410 }
1411
1412 #[cfg(feature = "automerge-backend")]
1441 pub async fn enable_blob_store(
1442 &mut self,
1443 blob_dir: std::path::PathBuf,
1444 ) -> std::result::Result<(), anyhow::Error> {
1445 use crate::storage::NetworkedIrohBlobStore;
1446
1447 let blob_store = NetworkedIrohBlobStore::new(blob_dir).await?;
1448
1449 let connected_peers = self.transport.connected_peers();
1451 for peer_id in connected_peers {
1452 blob_store.add_peer(peer_id).await;
1453 }
1454
1455 self.blob_store = Some(blob_store);
1456
1457 tracing::info!(
1458 endpoint_id = %self.transport.endpoint_id(),
1459 "Blob store enabled for AutomergeIrohBackend"
1460 );
1461
1462 Ok(())
1463 }
1464
1465 #[cfg(feature = "automerge-backend")]
1469 pub fn blob_store(&self) -> Option<Arc<crate::storage::NetworkedIrohBlobStore>> {
1470 self.blob_store.clone()
1471 }
1472
1473 #[cfg(feature = "automerge-backend")]
1475 pub fn has_blob_store(&self) -> bool {
1476 self.blob_store.is_some()
1477 }
1478
1479 #[cfg(feature = "automerge-backend")]
1484 pub async fn register_blob_peer(&self, peer_id: iroh::EndpointId) {
1485 if let Some(ref blob_store) = self.blob_store {
1486 blob_store.add_peer(peer_id).await;
1487 tracing::debug!(
1488 peer_id = %peer_id.fmt_short(),
1489 "Registered peer for blob transfer"
1490 );
1491 }
1492 }
1493
1494 #[cfg(feature = "automerge-backend")]
1496 pub async fn unregister_blob_peer(&self, peer_id: &iroh::EndpointId) {
1497 if let Some(ref blob_store) = self.blob_store {
1498 blob_store.remove_peer(peer_id).await;
1499 tracing::debug!(
1500 peer_id = %peer_id.fmt_short(),
1501 "Unregistered peer from blob transfer"
1502 );
1503 }
1504 }
1505
1506 #[cfg(feature = "automerge-backend")]
1523 pub fn start_blob_peer_sync(&self) {
1524 use crate::network::iroh_transport::TransportPeerEvent;
1525
1526 let blob_store = match &self.blob_store {
1527 Some(store) => Arc::clone(store),
1528 None => {
1529 tracing::warn!("start_blob_peer_sync called but blob store not enabled");
1530 return;
1531 }
1532 };
1533
1534 let mut events = self.transport.subscribe_peer_events();
1535
1536 tokio::spawn(async move {
1537 tracing::debug!("Blob peer sync task started");
1538
1539 while let Some(event) = events.recv().await {
1540 match event {
1541 TransportPeerEvent::Connected { endpoint_id, .. } => {
1542 blob_store.add_peer(endpoint_id).await;
1543 tracing::debug!(
1544 peer_id = %endpoint_id.fmt_short(),
1545 "Auto-registered peer for blob transfer on connect"
1546 );
1547 }
1548 TransportPeerEvent::Disconnected { endpoint_id, .. } => {
1549 blob_store.remove_peer(&endpoint_id).await;
1550 tracing::debug!(
1551 peer_id = %endpoint_id.fmt_short(),
1552 "Auto-unregistered peer from blob transfer on disconnect"
1553 );
1554 }
1555 }
1556 }
1557
1558 tracing::debug!("Blob peer sync task stopped");
1559 });
1560 }
1561
1562 pub async fn sync_document(&self, doc_key: &str) -> Result<()> {
1571 self.backend
1572 .sync_document(doc_key)
1573 .await
1574 .map_err(|e| Error::Network {
1575 message: format!("Failed to sync document {}: {}", doc_key, e),
1576 peer_id: None,
1577 source: None,
1578 })
1579 }
1580
1581 #[cfg(feature = "automerge-backend")]
1585 pub async fn add_discovery_strategy(
1586 &self,
1587 strategy: Box<dyn crate::discovery::peer::DiscoveryStrategy>,
1588 ) -> Result<()> {
1589 let mut manager = self.discovery_manager.write().await;
1590 manager.add_strategy(strategy);
1591 Ok(())
1592 }
1593
1594 #[cfg(feature = "automerge-backend")]
1613 pub async fn connect_to_discovered_peers_now(&self) -> Result<usize> {
1614 use crate::network::formation_handshake::perform_initiator_handshake;
1615 use crate::network::PeerInfo as NetworkPeerInfo;
1616
1617 let formation_key = self
1618 .formation_key
1619 .read()
1620 .map_err(|_| Error::Internal("formation_key lock poisoned".into()))?
1621 .clone()
1622 .ok_or_else(|| Error::config_error("Backend not initialized", None))?;
1623
1624 let manager = self.discovery_manager.read().await;
1626 let discovered_peers = manager.get_peers().await;
1627 drop(manager);
1628
1629 let mut new_connections = 0;
1630
1631 for peer in discovered_peers {
1632 let network_peer_info = NetworkPeerInfo {
1633 name: peer.name.clone(),
1634 node_id: peer.node_id.clone(),
1635 addresses: peer.addresses.clone(),
1636 relay_url: peer.relay_url.clone(),
1637 };
1638
1639 if let Ok(endpoint_id) = peer.endpoint_id() {
1640 match self.transport.connect_peer(&network_peer_info).await {
1641 Ok(Some(conn)) => {
1642 tokio::task::yield_now().await;
1648 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1649
1650 if conn.close_reason().is_some() {
1652 tracing::debug!(
1653 "Immediate connect: peer {} superseded by accept path",
1654 peer.name
1655 );
1656 continue;
1657 }
1658
1659 match perform_initiator_handshake(&conn, &formation_key).await {
1661 Ok(()) => {
1662 tracing::debug!(
1663 "Immediate connect: authenticated with peer {}",
1664 peer.name
1665 );
1666 self.transport.emit_peer_connected(endpoint_id);
1668 new_connections += 1;
1669 }
1670 Err(e) => {
1671 tracing::warn!(
1672 "Immediate connect: peer {} failed auth: {}",
1673 peer.name,
1674 e
1675 );
1676 conn.close(1u32.into(), b"authentication failed");
1677 }
1681 }
1682 }
1683 Ok(None) => {
1684 tracing::debug!(
1686 "Immediate connect: peer {} handled by accept path",
1687 peer.name
1688 );
1689 }
1690 Err(e) => {
1691 tracing::debug!(
1692 "Immediate connect: failed to connect to {}: {}",
1693 peer.name,
1694 e
1695 );
1696 }
1697 }
1698 }
1699 }
1700
1701 Ok(new_connections)
1702 }
1703
1704 #[cfg(feature = "automerge-backend")]
1708 pub fn get_peer_discovery(&self) -> PeerDiscoveryHandle {
1709 PeerDiscoveryHandle {
1710 manager: Arc::clone(&self.discovery_manager),
1711 }
1712 }
1713}
1714
1715#[cfg(feature = "automerge-backend")]
1717pub struct PeerDiscoveryHandle {
1718 manager: Arc<tokio::sync::RwLock<crate::discovery::peer::DiscoveryManager>>,
1719}
1720
1721#[cfg(feature = "automerge-backend")]
1722impl PeerDiscoveryHandle {
1723 pub async fn discovered_peers(&self) -> Result<Vec<crate::discovery::peer::PeerInfo>> {
1728 let manager = self.manager.read().await;
1729 manager
1730 .discovered_peers()
1731 .await
1732 .map_err(|e| Error::Discovery {
1733 message: e.to_string(),
1734 source: None,
1735 })
1736 }
1737
1738 pub async fn peer_count(&self) -> usize {
1742 let manager = self.manager.read().await;
1743 manager.peer_count().await
1744 }
1745}
1746
1747struct IrohDocumentStore {
1749 backend: Arc<crate::storage::AutomergeBackend>,
1750 deletion_policy_registry: Arc<DeletionPolicyRegistry>,
1752 lamport_counter: Arc<AtomicU64>,
1754 node_id: String,
1756}
1757
1758#[async_trait]
1759impl DocumentStore for IrohDocumentStore {
1760 async fn upsert(&self, collection: &str, document: Document) -> anyhow::Result<DocumentId> {
1761 use crate::storage::traits::StorageBackend;
1762
1763 let doc_id = document.id.clone().unwrap_or_else(|| {
1765 use std::time::SystemTime;
1766 let timestamp = SystemTime::now()
1767 .duration_since(SystemTime::UNIX_EPOCH)
1768 .expect("system clock before UNIX epoch")
1769 .as_nanos();
1770 format!("doc-{}", timestamp)
1771 });
1772
1773 let json_bytes = serde_json::to_vec(&document)?;
1775
1776 let coll = self.backend.collection(collection);
1778 coll.upsert(&doc_id, json_bytes)
1779 .map_err(|e| Error::Storage {
1780 message: e.to_string(),
1781 operation: Some("upsert".to_string()),
1782 key: Some(doc_id.clone()),
1783 source: None,
1784 })?;
1785
1786 let doc_key = format!("{}:{}", collection, doc_id);
1789 match self.backend.sync_document(&doc_key).await {
1790 Ok(()) => {
1791 tracing::debug!("Sync triggered for document {} after upsert", doc_key);
1792 }
1793 Err(e) => {
1794 tracing::debug!("Failed to sync document {} after upsert: {}", doc_key, e);
1796 }
1797 }
1798
1799 Ok(doc_id)
1800 }
1801
1802 async fn query(&self, collection: &str, query: &Query) -> anyhow::Result<Vec<Document>> {
1803 use crate::storage::traits::StorageBackend;
1804
1805 let coll = self.backend.collection(collection);
1806 let all_items = coll.scan().map_err(|e| Error::Storage {
1807 message: e.to_string(),
1808 operation: Some("scan".to_string()),
1809 key: None,
1810 source: None,
1811 })?;
1812
1813 let mut results = Vec::new();
1815 for (doc_id, bytes) in all_items {
1816 if let Ok(mut doc) = serde_json::from_slice::<Document>(&bytes) {
1817 if doc.id.is_none() {
1819 doc.id = Some(doc_id);
1820 }
1821
1822 if !query.matches_deletion_state(&doc) {
1826 continue;
1827 }
1828
1829 if matches_query(&doc, query) {
1830 results.push(doc);
1831 }
1832 }
1833 }
1834
1835 Ok(results)
1836 }
1837
1838 async fn remove(&self, collection: &str, doc_id: &DocumentId) -> anyhow::Result<()> {
1839 use crate::storage::traits::StorageBackend;
1840
1841 let coll = self.backend.collection(collection);
1842 coll.delete(doc_id).map_err(|e| Error::Storage {
1843 message: e.to_string(),
1844 operation: Some("delete".to_string()),
1845 key: Some(doc_id.clone()),
1846 source: None,
1847 })?;
1848 Ok(())
1849 }
1850
1851 async fn delete(
1852 &self,
1853 collection: &str,
1854 doc_id: &DocumentId,
1855 reason: Option<&str>,
1856 ) -> anyhow::Result<crate::qos::DeleteResult> {
1857 let policy = self.deletion_policy(collection);
1858 let store = self.backend.automerge_store();
1859
1860 match policy {
1861 DeletionPolicy::Immutable => Ok(crate::qos::DeleteResult::immutable()),
1862 DeletionPolicy::ImplicitTTL { .. } => Ok(crate::qos::DeleteResult {
1863 deleted: false,
1864 tombstone_id: None,
1865 expires_at: None,
1866 policy: policy.clone(),
1867 }),
1868 DeletionPolicy::Tombstone {
1869 tombstone_ttl,
1870 delete_wins: _,
1871 } => {
1872 let lamport = self.lamport_counter.fetch_add(1, Ordering::SeqCst);
1873 let tombstone = if let Some(reason_str) = reason {
1874 Tombstone::with_reason(
1875 doc_id.clone(),
1876 collection.to_string(),
1877 self.node_id.clone(),
1878 lamport,
1879 reason_str,
1880 )
1881 } else {
1882 Tombstone::new(
1883 doc_id.clone(),
1884 collection.to_string(),
1885 self.node_id.clone(),
1886 lamport,
1887 )
1888 };
1889 let tombstone_id = format!("{}:{}", collection, doc_id);
1890
1891 store
1893 .put_tombstone(&tombstone)
1894 .map_err(|e| Error::Storage {
1895 message: e.to_string(),
1896 operation: Some("put_tombstone".to_string()),
1897 key: Some(tombstone_id.clone()),
1898 source: None,
1899 })?;
1900
1901 let doc_key = format!("{}:{}", collection, doc_id);
1903 store.delete(&doc_key).ok();
1904
1905 if let Some(coordinator) = self.backend.sync_coordinator() {
1909 let coordinator = coordinator.clone();
1910 let backend = Arc::clone(&self.backend);
1911 tokio::spawn(async move {
1912 if let Some(transport) = backend.iroh_transport() {
1914 for peer_id in transport.connected_peers() {
1915 if let Err(e) = coordinator.sync_tombstones_with_peer(peer_id).await
1916 {
1917 tracing::debug!(
1918 "Tombstone propagation to peer {:?} failed: {}",
1919 peer_id,
1920 e
1921 );
1922 }
1923 }
1924 }
1925 });
1926 }
1927
1928 Ok(crate::qos::DeleteResult {
1929 deleted: true,
1930 tombstone_id: Some(tombstone_id),
1931 expires_at: Some(std::time::SystemTime::now() + tombstone_ttl),
1932 policy: policy.clone(),
1933 })
1934 }
1935 DeletionPolicy::SoftDelete {
1936 include_deleted_default: _,
1937 } => {
1938 let doc_key = format!("{}:{}", collection, doc_id);
1940 if let Some(automerge_doc) = store.get(&doc_key).map_err(|e| Error::Storage {
1941 message: e.to_string(),
1942 operation: Some("get".to_string()),
1943 key: Some(doc_key.clone()),
1944 source: None,
1945 })? {
1946 let mut doc = automerge_doc;
1948 let mut tx = doc.transaction();
1949 let root = automerge::ROOT;
1950 tx.put(&root, "_deleted", true).ok();
1951 tx.put(
1952 &root,
1953 "_deleted_at",
1954 chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1955 )
1956 .ok();
1957 if let Some(r) = reason {
1958 tx.put(&root, "_deleted_reason", r).ok();
1959 }
1960 tx.commit();
1961 store.put(&doc_key, &doc).map_err(|e| Error::Storage {
1962 message: e.to_string(),
1963 operation: Some("put".to_string()),
1964 key: Some(doc_key.clone()),
1965 source: None,
1966 })?;
1967
1968 if let Ok(()) = self.backend.sync_document(&doc_key).await {
1970 tracing::debug!("Sync triggered for soft-deleted document {}", doc_key);
1971 }
1972
1973 Ok(crate::qos::DeleteResult::soft_deleted(policy.clone()))
1974 } else {
1975 Ok(crate::qos::DeleteResult {
1976 deleted: false,
1977 tombstone_id: None,
1978 expires_at: None,
1979 policy: policy.clone(),
1980 })
1981 }
1982 }
1983 }
1984 }
1985
1986 async fn is_deleted(&self, collection: &str, doc_id: &DocumentId) -> anyhow::Result<bool> {
1987 let store = self.backend.automerge_store();
1988
1989 if store.has_tombstone(collection, doc_id).unwrap_or(false) {
1991 return Ok(true);
1992 }
1993
1994 let doc_key = format!("{}:{}", collection, doc_id);
1996 if let Ok(Some(automerge_doc)) = store.get(&doc_key) {
1997 use automerge::ReadDoc;
1998 if let Ok(Some((automerge::Value::Scalar(s), _))) =
1999 automerge_doc.get(automerge::ROOT, "_deleted")
2000 {
2001 if let automerge::ScalarValue::Boolean(true) = s.as_ref() {
2002 return Ok(true);
2003 }
2004 }
2005 }
2006
2007 Ok(false)
2008 }
2009
2010 fn deletion_policy(&self, collection: &str) -> crate::qos::DeletionPolicy {
2011 self.deletion_policy_registry.get(collection)
2012 }
2013
2014 async fn get_tombstones(&self, collection: &str) -> anyhow::Result<Vec<Tombstone>> {
2015 self.backend
2016 .automerge_store()
2017 .get_tombstones_for_collection(collection)
2018 .map_err(|e| {
2019 Error::Storage {
2020 message: e.to_string(),
2021 operation: Some("get_tombstones".to_string()),
2022 key: None,
2023 source: None,
2024 }
2025 .into()
2026 })
2027 }
2028
2029 async fn apply_tombstone(&self, tombstone: &Tombstone) -> anyhow::Result<()> {
2030 let store = self.backend.automerge_store();
2031 store.put_tombstone(tombstone).map_err(|e| Error::Storage {
2032 message: e.to_string(),
2033 operation: Some("put_tombstone".to_string()),
2034 key: Some(format!(
2035 "{}:{}",
2036 tombstone.collection, tombstone.document_id
2037 )),
2038 source: None,
2039 })?;
2040
2041 let doc_key = format!("{}:{}", tombstone.collection, tombstone.document_id);
2043 store.delete(&doc_key).ok();
2044 Ok(())
2045 }
2046
2047 fn observe(&self, collection: &str, query: &Query) -> anyhow::Result<ChangeStream> {
2048 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
2049
2050 let collection_prefix = format!("{}:", collection);
2054 let all_docs = self
2055 .backend
2056 .automerge_store()
2057 .scan_prefix(&collection_prefix)
2058 .map_err(|e| Error::Storage {
2059 message: e.to_string(),
2060 operation: Some("scan_prefix".to_string()),
2061 key: None,
2062 source: None,
2063 })?;
2064
2065 let mut initial_docs = Vec::new();
2066 for (doc_key, automerge_doc) in all_docs {
2067 let doc_id = match doc_key.strip_prefix(&collection_prefix) {
2069 Some(id) => id.to_string(),
2070 None => continue,
2071 };
2072
2073 if let Ok(json_value) = automerge_to_message::<serde_json::Value>(&automerge_doc) {
2075 let fields = if let serde_json::Value::Object(map) = json_value {
2076 map.into_iter().collect()
2077 } else {
2078 serde_json::Map::new().into_iter().collect()
2079 };
2080 let doc = Document {
2081 id: Some(doc_id),
2082 fields,
2083 updated_at: std::time::SystemTime::now(),
2084 };
2085
2086 if matches_query(&doc, query) {
2087 initial_docs.push(doc);
2088 }
2089 }
2090 }
2091
2092 let _ = tx.send(ChangeEvent::Initial {
2094 documents: initial_docs,
2095 });
2096
2097 let mut change_rx = self
2102 .backend
2103 .automerge_store()
2104 .subscribe_to_observer_changes();
2105 let collection_name = collection.to_string();
2106 let collection_prefix = format!("{}:", collection);
2107 let query_clone = query.clone();
2108 let backend = Arc::clone(&self.backend);
2109 let tx_clone = tx.clone();
2110
2111 tokio::spawn(async move {
2113 loop {
2114 match change_rx.recv().await {
2115 Ok(doc_key) => {
2116 if !doc_key.starts_with(&collection_prefix) {
2118 continue;
2119 }
2120
2121 let doc_id = match doc_key.strip_prefix(&collection_prefix) {
2123 Some(id) => id.to_string(),
2124 None => continue,
2125 };
2126
2127 let maybe_doc: Option<Document> = if let Ok(Some(automerge_doc)) =
2132 backend.automerge_store().get(&doc_key)
2133 {
2134 if let Ok(json_value) =
2136 automerge_to_message::<serde_json::Value>(&automerge_doc)
2137 {
2138 let fields = if let serde_json::Value::Object(map) = json_value {
2140 map.into_iter().collect()
2141 } else {
2142 serde_json::Map::new().into_iter().collect()
2143 };
2144 Some(Document {
2145 id: Some(doc_id.clone()),
2146 fields,
2147 updated_at: std::time::SystemTime::now(),
2148 })
2149 } else {
2150 None
2151 }
2152 } else {
2153 None
2154 };
2155
2156 if let Some(mut doc) = maybe_doc {
2157 if doc.id.is_none() {
2158 doc.id = Some(doc_id);
2159 }
2160
2161 if matches_query(&doc, &query_clone) {
2163 if tx_clone
2165 .send(ChangeEvent::Updated {
2166 collection: collection_name.clone(),
2167 document: doc,
2168 })
2169 .is_err()
2170 {
2171 break;
2173 }
2174 }
2175 }
2176 }
2177 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
2178 tracing::warn!(
2182 "Observer change notification lagged, skipped {} messages - re-emitting all documents",
2183 n
2184 );
2185
2186 let prefix = &collection_prefix;
2190 if let Ok(all_docs) = backend.automerge_store().scan_prefix(prefix) {
2191 for (doc_key, automerge_doc) in all_docs {
2192 let doc_id = match doc_key.strip_prefix(prefix) {
2194 Some(id) => id.to_string(),
2195 None => continue,
2196 };
2197
2198 let maybe_doc: Option<Document> = if let Ok(json_value) =
2200 automerge_to_message::<serde_json::Value>(&automerge_doc)
2201 {
2202 let fields = if let serde_json::Value::Object(map) = json_value
2203 {
2204 map.into_iter().collect()
2205 } else {
2206 serde_json::Map::new().into_iter().collect()
2207 };
2208 Some(Document {
2209 id: Some(doc_id),
2210 fields,
2211 updated_at: std::time::SystemTime::now(),
2212 })
2213 } else {
2214 None
2215 };
2216
2217 if let Some(doc) = maybe_doc {
2218 #[allow(clippy::collapsible_if)]
2220 if matches_query(&doc, &query_clone) {
2221 if tx_clone
2222 .send(ChangeEvent::Updated {
2223 collection: collection_name.clone(),
2224 document: doc,
2225 })
2226 .is_err()
2227 {
2228 break;
2230 }
2231 }
2232 }
2233 }
2234 }
2235 }
2236 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
2237 break;
2239 }
2240 }
2241 }
2242 });
2243
2244 Ok(ChangeStream { receiver: rx })
2245 }
2246}
2247
2248struct IrohPeerDiscovery {
2250 transport: Arc<crate::network::IrohTransport>,
2251 peer_callbacks: PeerCallbacks,
2252 #[cfg(feature = "automerge-backend")]
2253 discovery_manager: Arc<tokio::sync::RwLock<crate::discovery::peer::DiscoveryManager>>,
2254 #[cfg(feature = "automerge-backend")]
2256 formation_key: Arc<std::sync::RwLock<Option<crate::security::FormationKey>>>,
2257 event_forwarder_running: Arc<std::sync::atomic::AtomicBool>,
2259 #[cfg(feature = "automerge-backend")]
2261 topology_event_rx:
2262 Arc<tokio::sync::Mutex<Option<mpsc::UnboundedReceiver<TopologyConnectionEvent>>>>,
2263 #[cfg(feature = "automerge-backend")]
2265 max_connections: usize,
2266}
2267
2268#[async_trait]
2269impl PeerDiscovery for IrohPeerDiscovery {
2270 async fn start(&self) -> anyhow::Result<()> {
2271 let formation_key = self
2273 .formation_key
2274 .read()
2275 .map_err(|_| Error::Internal("formation_key lock poisoned".into()))?
2276 .clone()
2277 .ok_or_else(|| Error::Internal("Formation key not initialized".to_string()))?;
2278
2279 #[cfg(feature = "automerge-backend")]
2287 {
2288 self.transport.mark_accept_loop_managed().map_err(|e| {
2290 Error::Internal(format!("Failed to mark accept loop as managed: {}", e))
2291 })?;
2292
2293 let transport = Arc::clone(&self.transport);
2294 let formation_key_accept = formation_key.clone();
2295
2296 tokio::spawn(async move {
2297 use crate::network::formation_handshake::perform_responder_handshake;
2298
2299 let mut consecutive_errors = 0u32;
2301 const MAX_CONSECUTIVE_ERRORS: u32 = 10;
2302
2303 loop {
2304 match transport.accept().await {
2309 Ok(Some(conn)) => {
2310 consecutive_errors = 0; let peer_id = conn.remote_id();
2312
2313 match perform_responder_handshake(&conn, &formation_key_accept).await {
2315 Ok(()) => {
2316 transport.emit_peer_connected(peer_id);
2318 }
2319 Err(e) => {
2320 tracing::warn!(
2321 ?peer_id,
2322 error = %e,
2323 "Formation handshake failed"
2324 );
2325 conn.close(1u32.into(), b"authentication failed");
2328 }
2329 }
2330 }
2331 Ok(None) => {
2332 consecutive_errors = 0; }
2336 Err(e) => {
2337 consecutive_errors += 1;
2340 let error_msg = format!("{}", e);
2341
2342 if error_msg.contains("Endpoint closed")
2343 || error_msg.contains("no more")
2344 {
2345 tracing::info!("Accept loop stopped: endpoint closed");
2346 break;
2347 }
2348
2349 if consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
2350 tracing::error!(
2351 consecutive_errors,
2352 error = %e,
2353 "Accept loop stopping after {} consecutive errors",
2354 MAX_CONSECUTIVE_ERRORS
2355 );
2356 break;
2357 }
2358
2359 tracing::warn!(
2360 error = %e,
2361 consecutive_errors,
2362 "Accept error (will retry, {} more before stopping)",
2363 MAX_CONSECUTIVE_ERRORS - consecutive_errors
2364 );
2365 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2367 }
2368 }
2369 }
2370 tracing::info!("Authenticated accept loop stopped");
2371 });
2372 }
2373
2374 #[cfg(feature = "automerge-backend")]
2376 {
2377 let mut manager = self.discovery_manager.write().await;
2378 manager.start().await.map_err(|e| {
2379 Error::Internal(format!("Failed to start discovery manager: {}", e))
2380 })?;
2381 }
2382
2383 #[cfg(feature = "automerge-backend")]
2387 if let Some(mdns) = self.transport.mdns_discovery() {
2388 use futures_lite::StreamExt;
2389 use iroh::address_lookup::mdns::DiscoveryEvent;
2390
2391 let mdns = mdns.clone();
2392 let transport = Arc::clone(&self.transport);
2393 let formation_key_mdns = formation_key.clone();
2394
2395 tokio::spawn(async move {
2396 use crate::network::formation_handshake::perform_initiator_handshake;
2397
2398 tracing::info!("Starting mDNS discovery event handler");
2399 let mut stream = mdns.subscribe().await;
2400
2401 while let Some(event) = stream.next().await {
2402 match event {
2403 DiscoveryEvent::Discovered { endpoint_info, .. } => {
2404 let peer_id = endpoint_info.endpoint_id;
2405 tracing::info!(
2406 peer_id = %peer_id,
2407 "mDNS discovered peer, attempting connection"
2408 );
2409
2410 if transport.get_connection(&peer_id).is_some() {
2412 tracing::debug!(
2413 peer_id = %peer_id,
2414 "Already connected to mDNS-discovered peer"
2415 );
2416 continue;
2417 }
2418
2419 match transport.connect_by_id(peer_id).await {
2421 Ok(Some(conn)) => {
2422 match perform_initiator_handshake(&conn, &formation_key_mdns)
2424 .await
2425 {
2426 Ok(()) => {
2427 tracing::info!(
2428 peer_id = %peer_id,
2429 "mDNS peer connected and authenticated"
2430 );
2431 transport.emit_peer_connected(peer_id);
2433 }
2434 Err(e) => {
2435 tracing::warn!(
2436 peer_id = %peer_id,
2437 error = %e,
2438 "mDNS peer failed authentication"
2439 );
2440 conn.close(1u32.into(), b"authentication failed");
2441 transport.disconnect(&peer_id).ok();
2442 }
2443 }
2444 }
2445 Ok(None) => {
2446 tracing::debug!(
2448 peer_id = %peer_id,
2449 "mDNS peer connection handled by accept path"
2450 );
2451 }
2452 Err(e) => {
2453 tracing::debug!(
2454 peer_id = %peer_id,
2455 error = %e,
2456 "Failed to connect to mDNS-discovered peer"
2457 );
2458 }
2459 }
2460 }
2461 DiscoveryEvent::Expired { endpoint_id } => {
2462 tracing::debug!(
2463 peer_id = %endpoint_id,
2464 "mDNS peer expired (no longer advertising)"
2465 );
2466 }
2469 }
2470 }
2471 tracing::debug!("mDNS discovery event handler stopped");
2472 });
2473 }
2474
2475 #[cfg(feature = "automerge-backend")]
2477 let has_topology_events = {
2478 let guard = self.topology_event_rx.lock().await;
2479 guard.is_some()
2480 };
2481
2482 #[cfg(feature = "automerge-backend")]
2484 if has_topology_events {
2485 let topology_rx = self.topology_event_rx.clone();
2486 let transport = Arc::clone(&self.transport);
2487 let formation_key_topology = formation_key.clone();
2488
2489 tokio::spawn(async move {
2490 use crate::network::formation_handshake::perform_initiator_handshake;
2491 use crate::network::PeerInfo as NetworkPeerInfo;
2492
2493 let mut rx = {
2495 let mut guard = topology_rx.lock().await;
2496 guard.take()
2497 };
2498
2499 if let Some(ref mut receiver) = rx {
2500 tracing::info!("Topology-driven connection management enabled");
2501
2502 while let Some(event) = receiver.recv().await {
2503 match event {
2504 TopologyConnectionEvent::ConnectPeer {
2505 peer_id,
2506 addresses,
2507 relay_url,
2508 } => {
2509 tracing::debug!(
2510 peer_id = %peer_id,
2511 "Topology event: connecting to peer"
2512 );
2513
2514 let network_peer_info = NetworkPeerInfo {
2515 name: peer_id.clone(),
2516 node_id: peer_id.clone(),
2517 addresses,
2518 relay_url,
2519 };
2520
2521 match transport.connect_peer(&network_peer_info).await {
2522 Ok(Some(conn)) => {
2523 tokio::task::yield_now().await;
2525 tokio::time::sleep(tokio::time::Duration::from_millis(10))
2526 .await;
2527
2528 if conn.close_reason().is_some() {
2529 tracing::debug!(
2530 "Topology peer {} superseded by accept path",
2531 peer_id
2532 );
2533 continue;
2534 }
2535
2536 match perform_initiator_handshake(
2538 &conn,
2539 &formation_key_topology,
2540 )
2541 .await
2542 {
2543 Ok(()) => {
2544 if let Ok(bytes) = hex::decode(&peer_id) {
2546 if bytes.len() == 32 {
2547 let mut array = [0u8; 32];
2548 array.copy_from_slice(&bytes);
2549 if let Ok(endpoint_id) =
2550 iroh::EndpointId::from_bytes(&array)
2551 {
2552 transport
2553 .emit_peer_connected(endpoint_id);
2554 tracing::info!(
2555 "Topology: connected and authenticated with peer: {}",
2556 peer_id
2557 );
2558 }
2559 }
2560 }
2561 }
2562 Err(e) => {
2563 tracing::warn!(
2564 "Topology peer {} failed authentication: {}",
2565 peer_id,
2566 e
2567 );
2568 conn.close(1u32.into(), b"authentication failed");
2569 }
2570 }
2571 }
2572 Ok(None) => {
2573 tracing::debug!(
2574 "Topology peer {} handled by accept path",
2575 peer_id
2576 );
2577 }
2578 Err(e) => {
2579 tracing::debug!(
2580 "Failed to connect to topology peer {}: {}",
2581 peer_id,
2582 e
2583 );
2584 }
2585 }
2586 }
2587 TopologyConnectionEvent::DisconnectPeer { peer_id } => {
2588 tracing::debug!(
2589 peer_id = %peer_id,
2590 "Topology event: disconnecting from peer"
2591 );
2592 if let Ok(bytes) = hex::decode(&peer_id) {
2594 if bytes.len() == 32 {
2595 let mut array = [0u8; 32];
2596 array.copy_from_slice(&bytes);
2597 if let Ok(endpoint_id) =
2598 iroh::EndpointId::from_bytes(&array)
2599 {
2600 let _ = transport.disconnect(&endpoint_id);
2601 }
2602 }
2603 }
2604 }
2605 }
2606 }
2607 }
2608 tracing::debug!("Topology event handler stopped");
2609 });
2610 }
2611
2612 #[cfg(feature = "automerge-backend")]
2615 if !has_topology_events {
2616 let discovery_manager = Arc::clone(&self.discovery_manager);
2617 let transport = Arc::clone(&self.transport);
2618 let formation_key_connect = formation_key;
2619 let max_connections = self.max_connections;
2620
2621 tokio::spawn(async move {
2622 use crate::network::formation_handshake::perform_initiator_handshake;
2623 use crate::network::iroh_transport::TransportPeerEvent;
2624 use crate::network::PeerInfo as NetworkPeerInfo;
2625
2626 tracing::info!(
2627 "Discovery-based connection management enabled (max {} connections)",
2628 max_connections
2629 );
2630
2631 let mut peer_events = transport.subscribe_peer_events();
2633
2634 let mut interval_secs = 1u64;
2636 let mut consecutive_no_new_connections = 0u32;
2637
2638 loop {
2639 let sleep_future =
2642 tokio::time::sleep(std::time::Duration::from_secs(interval_secs));
2643 tokio::pin!(sleep_future);
2644
2645 tokio::select! {
2646 _ = &mut sleep_future => {
2647 }
2649 event = peer_events.recv() => {
2650 match event {
2651 Some(TransportPeerEvent::Disconnected { endpoint_id, reason }) => {
2652 tracing::debug!(
2653 peer = %endpoint_id.fmt_short(),
2654 reason = %reason,
2655 "Peer disconnected - triggering immediate reconnection attempt"
2656 );
2657 interval_secs = 1;
2659 consecutive_no_new_connections = 0;
2660 }
2661 Some(TransportPeerEvent::Connected { .. }) => {
2662 }
2664 None => {
2665 tracing::debug!("Peer event channel closed, stopping connection manager");
2667 break;
2668 }
2669 }
2670 }
2671 }
2672
2673 let current_connections = transport.connected_peers().len();
2675 if current_connections >= max_connections {
2676 tracing::debug!(
2677 "At max connections ({}/{}), skipping discovery connect cycle",
2678 current_connections,
2679 max_connections
2680 );
2681 consecutive_no_new_connections += 1;
2682 if consecutive_no_new_connections >= 3 && interval_secs < 5 {
2683 interval_secs = (interval_secs * 2).min(5);
2684 }
2685 continue;
2686 }
2687
2688 let manager = discovery_manager.read().await;
2690 let discovered_peers = manager.get_peers().await;
2691 drop(manager);
2692
2693 let mut made_new_connection = false;
2695 let slots_available = max_connections.saturating_sub(current_connections);
2696
2697 for peer in discovered_peers.into_iter().take(slots_available) {
2698 let network_peer_info = NetworkPeerInfo {
2700 name: peer.name.clone(),
2701 node_id: peer.node_id.clone(),
2702 addresses: peer.addresses.clone(),
2703 relay_url: peer.relay_url.clone(),
2704 };
2705
2706 if let Ok(endpoint_id) = peer.endpoint_id() {
2711 match transport.connect_peer(&network_peer_info).await {
2712 Ok(Some(conn)) => {
2713 tokio::task::yield_now().await;
2719 tokio::time::sleep(tokio::time::Duration::from_millis(10))
2720 .await;
2721
2722 if conn.close_reason().is_some() {
2725 tracing::debug!(
2726 "Peer {} connection superseded by accept path",
2727 peer.name
2728 );
2729 continue;
2730 }
2731
2732 match perform_initiator_handshake(&conn, &formation_key_connect)
2734 .await
2735 {
2736 Ok(()) => {
2737 tracing::info!(
2738 "Connected and authenticated with peer: {} ({}/{})",
2739 peer.name,
2740 transport.connected_peers().len(),
2741 max_connections
2742 );
2743 transport.emit_peer_connected(endpoint_id);
2745 made_new_connection = true;
2746 }
2747 Err(e) => {
2748 tracing::warn!(
2749 "Peer {} failed authentication: {}. Disconnecting.",
2750 peer.name,
2751 e
2752 );
2753 conn.close(1u32.into(), b"authentication failed");
2757 }
2758 }
2759 }
2760 Ok(None) => {
2761 tracing::debug!(
2763 "Peer {} connection handled by accept path",
2764 peer.name
2765 );
2766 }
2767 Err(e) => {
2768 tracing::debug!(
2769 "Failed to connect to discovered peer {}: {}",
2770 peer.name,
2771 e
2772 );
2773 }
2774 }
2775 }
2776 }
2777
2778 if made_new_connection {
2780 interval_secs = 1;
2782 consecutive_no_new_connections = 0;
2783 } else {
2784 consecutive_no_new_connections += 1;
2785 if consecutive_no_new_connections >= 3 && interval_secs < 5 {
2787 interval_secs = (interval_secs * 2).min(5);
2788 tracing::debug!(
2789 "Mesh stable, increasing connect interval to {}s",
2790 interval_secs
2791 );
2792 }
2793 }
2794 }
2795 });
2796 }
2797
2798 Ok(())
2799 }
2800
2801 async fn stop(&self) -> anyhow::Result<()> {
2802 Ok(())
2803 }
2804
2805 async fn discovered_peers(&self) -> anyhow::Result<Vec<PeerInfo>> {
2806 let mut peers = Vec::new();
2807
2808 let peer_ids = self.transport.connected_peers();
2810 for peer_id in peer_ids {
2811 if self.transport.get_connection(&peer_id).is_some() {
2812 peers.push(PeerInfo {
2813 peer_id: hex::encode(peer_id.as_bytes()),
2814 address: None,
2815 transport: TransportType::Custom,
2816 connected: true,
2817 last_seen: std::time::SystemTime::now(),
2818 metadata: HashMap::new(),
2819 });
2820 }
2821 }
2822
2823 #[cfg(feature = "automerge-backend")]
2825 {
2826 let manager = self.discovery_manager.read().await;
2827 for discovered_peer in manager.get_peers().await {
2828 if !peers.iter().any(|p| p.peer_id == discovered_peer.node_id) {
2830 peers.push(PeerInfo {
2831 peer_id: discovered_peer.node_id.clone(),
2832 address: discovered_peer.addresses.first().cloned(),
2833 transport: TransportType::Custom,
2834 connected: false,
2835 last_seen: std::time::SystemTime::now(),
2836 metadata: HashMap::new(),
2837 });
2838 }
2839 }
2840 }
2841
2842 Ok(peers)
2843 }
2844
2845 async fn add_peer(&self, address: &str, _transport: TransportType) -> anyhow::Result<()> {
2846 use crate::network::iroh_transport::IrohTransport;
2847 use crate::network::PeerInfo as NetworkPeerInfo;
2848
2849 let formation_key = self
2851 .formation_key
2852 .read()
2853 .map_err(|_| Error::Internal("formation_key lock poisoned".into()))?
2854 .clone()
2855 .ok_or_else(|| Error::Internal("Formation key not initialized".to_string()))?;
2856
2857 let (node_id, socket_addr) = if address.contains('|') {
2863 let parts: Vec<&str> = address.splitn(2, '|').collect();
2865 if parts.len() != 2 {
2866 return Err(Error::Internal(format!(
2867 "Invalid address format: {}. Expected 'seed|host:port'",
2868 address
2869 ))
2870 .into());
2871 }
2872 let seed = parts[0];
2873 let addr = parts[1];
2874
2875 let endpoint_id = IrohTransport::endpoint_id_from_seed(seed);
2877 let node_id_hex = hex::encode(endpoint_id.as_bytes());
2878
2879 tracing::debug!(
2880 seed = seed,
2881 node_id = %node_id_hex,
2882 address = addr,
2883 "Derived EndpointId from seed for add_peer"
2884 );
2885
2886 (node_id_hex, addr.to_string())
2887 } else {
2888 (address.to_string(), address.to_string())
2891 };
2892
2893 let peer_info = NetworkPeerInfo {
2894 name: "manual-peer".to_string(),
2895 node_id,
2896 addresses: vec![socket_addr],
2897 relay_url: None,
2898 };
2899
2900 let conn_opt =
2902 self.transport
2903 .connect_peer(&peer_info)
2904 .await
2905 .map_err(|e| Error::Network {
2906 message: format!("Failed to connect to peer: {}", e),
2907 peer_id: None,
2908 source: None,
2909 })?;
2910
2911 #[cfg(feature = "automerge-backend")]
2913 if let Some(conn) = conn_opt {
2914 use crate::network::formation_handshake::perform_initiator_handshake;
2915
2916 let endpoint_id = conn.remote_id();
2917 if let Err(e) = perform_initiator_handshake(&conn, &formation_key).await {
2918 conn.close(1u32.into(), b"authentication failed");
2923
2924 return Err(Error::Network {
2925 message: format!("Peer authentication failed: {}", e),
2926 peer_id: Some(address.to_string()),
2927 source: None,
2928 }
2929 .into());
2930 }
2931 self.transport.emit_peer_connected(endpoint_id);
2933 }
2934 Ok(())
2937 }
2938
2939 async fn wait_for_peer(&self, peer_id: &PeerId, timeout: Duration) -> anyhow::Result<()> {
2940 let start = std::time::Instant::now();
2941
2942 loop {
2943 let peers = self.discovered_peers().await?;
2944 if peers.iter().any(|p| &p.peer_id == peer_id) {
2945 return Ok(());
2946 }
2947
2948 if start.elapsed() > timeout {
2949 return Err(Error::Network {
2950 message: format!("Timeout waiting for peer: {}", peer_id),
2951 peer_id: Some(peer_id.clone()),
2952 source: None,
2953 }
2954 .into());
2955 }
2956
2957 tokio::time::sleep(Duration::from_millis(100)).await;
2958 }
2959 }
2960
2961 fn on_peer_event(&self, callback: Box<dyn Fn(PeerEvent) + Send + Sync>) {
2962 if let Ok(mut callbacks) = self.peer_callbacks.lock() {
2963 callbacks.push(callback);
2964 }
2965
2966 if self
2969 .event_forwarder_running
2970 .compare_exchange(
2971 false,
2972 true,
2973 std::sync::atomic::Ordering::SeqCst,
2974 std::sync::atomic::Ordering::SeqCst,
2975 )
2976 .is_ok()
2977 {
2978 let mut rx = self.transport.subscribe_peer_events();
2980 let callbacks = Arc::clone(&self.peer_callbacks);
2981 let running = Arc::clone(&self.event_forwarder_running);
2982
2983 std::thread::spawn(move || {
2986 let rt = tokio::runtime::Builder::new_current_thread()
2987 .enable_all()
2988 .build()
2989 .expect("Failed to create event forwarder runtime");
2990
2991 rt.block_on(async move {
2992 use crate::network::TransportPeerEvent;
2993
2994 while running.load(std::sync::atomic::Ordering::SeqCst) {
2995 match tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
2996 .await
2997 {
2998 Ok(Some(transport_event)) => {
2999 let peer_event = match transport_event {
3001 TransportPeerEvent::Connected { endpoint_id, .. } => {
3002 PeerEvent::Connected(PeerInfo {
3003 peer_id: format!("{:?}", endpoint_id),
3004 address: None,
3005 transport: TransportType::Tcp, connected: true,
3007 last_seen: std::time::SystemTime::now(),
3008 metadata: std::collections::HashMap::new(),
3009 })
3010 }
3011 TransportPeerEvent::Disconnected {
3012 endpoint_id,
3013 reason,
3014 } => PeerEvent::Disconnected {
3015 peer_id: format!("{:?}", endpoint_id),
3016 reason: Some(reason),
3017 },
3018 };
3019
3020 if let Ok(cbs) = callbacks.lock() {
3022 for cb in cbs.iter() {
3023 cb(peer_event.clone());
3024 }
3025 }
3026 }
3027 Ok(None) => {
3028 break;
3030 }
3031 Err(_) => {
3032 }
3034 }
3035 }
3036 });
3037 });
3038 }
3039 }
3040
3041 async fn get_peer_info(&self, peer_id: &PeerId) -> anyhow::Result<Option<PeerInfo>> {
3042 let peers = self.discovered_peers().await?;
3043 Ok(peers.into_iter().find(|p| &p.peer_id == peer_id))
3044 }
3045}
3046
3047struct IrohSyncEngine {
3049 backend: Arc<crate::storage::AutomergeBackend>,
3050 transport: Arc<crate::network::IrohTransport>,
3051 formation_key: Option<crate::security::FormationKey>,
3052}
3053
3054#[async_trait]
3055impl SyncEngine for IrohSyncEngine {
3056 async fn start_sync(&self) -> anyhow::Result<()> {
3057 use crate::storage::capabilities::SyncCapable;
3058 self.backend.start_sync().map_err(|e| Error::Storage {
3059 message: format!("Failed to start sync: {}", e),
3060 operation: Some("start_sync".to_string()),
3061 key: None,
3062 source: None,
3063 })?;
3064 Ok(())
3065 }
3066
3067 async fn stop_sync(&self) -> anyhow::Result<()> {
3068 use crate::storage::capabilities::SyncCapable;
3069 self.backend.stop_sync().map_err(|e| Error::Storage {
3070 message: format!("Failed to stop sync: {}", e),
3071 operation: Some("stop_sync".to_string()),
3072 key: None,
3073 source: None,
3074 })?;
3075 Ok(())
3076 }
3077
3078 async fn subscribe(
3079 &self,
3080 collection: &str,
3081 _query: &Query,
3082 ) -> anyhow::Result<SyncSubscription> {
3083 Ok(SyncSubscription::new(collection, ()))
3084 }
3085
3086 async fn is_syncing(&self) -> anyhow::Result<bool> {
3087 use crate::storage::capabilities::SyncCapable;
3088 let stats = self.backend.sync_stats().map_err(|e| Error::Storage {
3089 message: format!("Failed to get sync stats: {}", e),
3090 operation: Some("sync_stats".to_string()),
3091 key: None,
3092 source: None,
3093 })?;
3094 Ok(stats.peer_count > 0)
3095 }
3096
3097 async fn connect_to_peer(
3102 &self,
3103 endpoint_id_hex: &str,
3104 addresses: &[String],
3105 ) -> anyhow::Result<bool> {
3106 use crate::network::PeerInfo as NetworkPeerInfo;
3107
3108 let endpoint_id_bytes = hex::decode(endpoint_id_hex)
3110 .map_err(|e| Error::Internal(format!("Invalid endpoint_id_hex: {}", e)))?;
3111
3112 if endpoint_id_bytes.len() != 32 {
3113 return Err(Error::Internal(format!(
3114 "Invalid endpoint_id_hex length: expected 32 bytes, got {}",
3115 endpoint_id_bytes.len()
3116 ))
3117 .into());
3118 }
3119
3120 let our_endpoint_id = self.transport.endpoint_id();
3134 let our_endpoint_hex = hex::encode(our_endpoint_id.as_bytes());
3135
3136 tracing::debug!(
3137 our_endpoint = %our_endpoint_hex,
3138 peer_endpoint = %endpoint_id_hex,
3139 addresses = ?addresses,
3140 "Connecting to peer via static configuration"
3141 );
3142
3143 let peer_info = NetworkPeerInfo {
3145 name: format!("peer-{}", &endpoint_id_hex[..8]),
3146 node_id: endpoint_id_hex.to_string(),
3147 addresses: addresses.to_vec(),
3148 relay_url: None,
3149 };
3150
3151 match self.transport.connect_peer(&peer_info).await {
3155 Ok(Some(conn)) => {
3156 if conn.close_reason().is_some() {
3158 tracing::debug!(
3159 peer_endpoint = %endpoint_id_hex,
3160 "Connection superseded by accept path"
3161 );
3162 return Ok(false);
3163 }
3164
3165 if let Some(ref formation_key) = self.formation_key {
3167 use crate::network::formation_handshake::perform_initiator_handshake;
3168 match perform_initiator_handshake(&conn, formation_key).await {
3169 Ok(()) => {
3170 tracing::info!(
3171 peer_endpoint = %endpoint_id_hex,
3172 "Successfully connected to peer and authenticated"
3173 );
3174 if let Ok(peer_id) = peer_info.endpoint_id() {
3176 self.transport.emit_peer_connected(peer_id);
3177 }
3178 Ok(true)
3179 }
3180 Err(e) => {
3181 tracing::warn!(
3182 peer_endpoint = %endpoint_id_hex,
3183 error = %e,
3184 "Peer authentication failed"
3185 );
3186 if let Ok(peer_id) = peer_info.endpoint_id() {
3188 conn.close(1u32.into(), b"authentication failed");
3189 self.transport.disconnect(&peer_id).ok();
3190 }
3191 Err(Error::Network {
3192 message: format!("Peer authentication failed: {}", e),
3193 peer_id: Some(endpoint_id_hex.to_string()),
3194 source: None,
3195 }
3196 .into())
3197 }
3198 }
3199 } else {
3200 tracing::info!(
3202 peer_endpoint = %endpoint_id_hex,
3203 "Successfully connected to peer (no authentication)"
3204 );
3205 if let Ok(peer_id) = peer_info.endpoint_id() {
3207 self.transport.emit_peer_connected(peer_id);
3208 }
3209 Ok(true)
3210 }
3211 }
3212 Ok(None) => {
3213 tracing::debug!(
3215 peer_endpoint = %endpoint_id_hex,
3216 "Connection handled by accept path"
3217 );
3218 Ok(true)
3220 }
3221 Err(e) => {
3222 tracing::warn!(
3223 peer_endpoint = %endpoint_id_hex,
3224 error = %e,
3225 "Failed to connect to peer"
3226 );
3227 Err(Error::Network {
3228 message: format!("Failed to connect to peer: {}", e),
3229 peer_id: Some(endpoint_id_hex.to_string()),
3230 source: None,
3231 }
3232 .into())
3233 }
3234 }
3235 }
3236}
3237
3238#[async_trait]
3240impl DataSyncBackend for AutomergeIrohBackend {
3241 async fn initialize(&self, config: BackendConfig) -> anyhow::Result<()> {
3242 let shared_key = config.shared_key.as_ref().ok_or_else(|| {
3244 Error::config_error(
3245 "AutomergeIroh backend requires PEAT_SECRET_KEY for peer authentication",
3246 Some("shared_key".to_string()),
3247 )
3248 })?;
3249
3250 let formation_key = crate::security::FormationKey::from_base64(&config.app_id, shared_key)
3253 .map_err(|e| {
3254 Error::config_error(
3255 format!(
3256 "Invalid shared_key format: {}. Expected base64-encoded 32-byte key.",
3257 e
3258 ),
3259 Some("shared_key".to_string()),
3260 )
3261 })?;
3262
3263 *self
3265 .formation_key
3266 .write()
3267 .map_err(|_| Error::Internal("formation_key lock poisoned".into()))? =
3268 Some(formation_key);
3269
3270 *self
3271 .initialized
3272 .lock()
3273 .map_err(|_| Error::Internal("initialized lock poisoned".into()))? = true;
3274 self.peer_discovery().start().await?;
3275 Ok(())
3276 }
3277
3278 async fn shutdown(&self) -> anyhow::Result<()> {
3279 if self.is_ready().await {
3280 let _ = self.sync_engine().stop_sync().await;
3281 let _ = self.peer_discovery().stop().await;
3282 }
3283 *self
3284 .initialized
3285 .lock()
3286 .map_err(|_| Error::Internal("initialized lock poisoned".into()))? = false;
3287 Ok(())
3288 }
3289
3290 fn document_store(&self) -> Arc<dyn DocumentStore> {
3291 let node_id = self.transport.endpoint_id().to_string();
3293 Arc::new(IrohDocumentStore {
3294 backend: Arc::clone(&self.backend),
3295 deletion_policy_registry: Arc::clone(&self.deletion_policy_registry),
3296 lamport_counter: Arc::clone(&self.lamport_counter),
3297 node_id,
3298 })
3299 }
3300
3301 fn peer_discovery(&self) -> Arc<dyn PeerDiscovery> {
3302 Arc::new(IrohPeerDiscovery {
3303 transport: Arc::clone(&self.transport),
3304 peer_callbacks: Arc::clone(&self.peer_callbacks),
3305 #[cfg(feature = "automerge-backend")]
3306 discovery_manager: Arc::clone(&self.discovery_manager),
3307 #[cfg(feature = "automerge-backend")]
3308 formation_key: Arc::clone(&self.formation_key),
3309 event_forwarder_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
3310 #[cfg(feature = "automerge-backend")]
3311 topology_event_rx: Arc::clone(&self.topology_event_rx),
3312 #[cfg(feature = "automerge-backend")]
3313 max_connections: self.max_connections,
3314 })
3315 }
3316
3317 fn sync_engine(&self) -> Arc<dyn SyncEngine> {
3318 Arc::new(IrohSyncEngine {
3319 backend: Arc::clone(&self.backend),
3320 transport: Arc::clone(&self.transport),
3321 formation_key: self.formation_key(),
3322 })
3323 }
3324
3325 async fn is_ready(&self) -> bool {
3326 self.initialized.lock().map(|guard| *guard).unwrap_or(false)
3327 }
3328
3329 fn backend_info(&self) -> BackendInfo {
3330 BackendInfo {
3331 name: "AutomergeIroh".to_string(),
3332 version: env!("CARGO_PKG_VERSION").to_string(),
3333 }
3334 }
3335
3336 fn as_any(&self) -> &dyn std::any::Any {
3337 self
3338 }
3339}
3340
3341#[cfg(feature = "automerge-backend")]
3344impl crate::storage::HierarchicalStorageCapable for AutomergeIrohBackend {
3345 fn summary_storage(&self) -> Arc<dyn crate::hierarchy::SummaryStorage> {
3346 crate::storage::HierarchicalStorageCapable::summary_storage(self.backend.as_ref())
3348 }
3349
3350 fn command_storage(&self) -> Arc<dyn crate::command::CommandStorage> {
3351 crate::storage::HierarchicalStorageCapable::command_storage(self.backend.as_ref())
3353 }
3354}
3355
3356fn evaluate_custom_query(doc: &Document, query_str: &str) -> bool {
3396 let trimmed = query_str.trim();
3397
3398 if let Some((left, right)) = split_compound(trimmed, " OR ") {
3401 return evaluate_custom_query(doc, left) || evaluate_custom_query(doc, right);
3402 }
3403
3404 if let Some((left, right)) = split_compound(trimmed, " AND ") {
3406 return evaluate_custom_query(doc, left) && evaluate_custom_query(doc, right);
3407 }
3408
3409 let expr = if trimmed.starts_with('(') && trimmed.ends_with(')') {
3411 &trimmed[1..trimmed.len() - 1]
3412 } else {
3413 trimmed
3414 };
3415
3416 if let Some(inner) = parse_not_expression(expr) {
3418 return !evaluate_custom_query(doc, inner);
3419 }
3420
3421 if expr.starts_with("CONTAINS(") && expr.ends_with(')') {
3423 return evaluate_contains(doc, expr);
3424 }
3425
3426 if let Some((field, is_null)) = parse_is_null(expr) {
3428 return evaluate_is_null(doc, field, is_null);
3429 }
3430
3431 if let Some((field, value)) = parse_inequality(expr) {
3434 return !evaluate_equality(doc, field, value);
3435 }
3436
3437 if let Some((field, value)) = parse_equality(expr) {
3439 return evaluate_equality(doc, field, value);
3440 }
3441
3442 if let Some((field, pattern)) = parse_like(expr) {
3444 return evaluate_like(doc, field, pattern);
3445 }
3446
3447 if let Some((field, values)) = parse_in(expr) {
3449 return evaluate_in(doc, field, &values);
3450 }
3451
3452 if let Some((field, prefix)) = parse_starts_with(expr) {
3454 return evaluate_starts_with(doc, field, prefix);
3455 }
3456
3457 if let Some((field, suffix)) = parse_ends_with(expr) {
3459 return evaluate_ends_with(doc, field, suffix);
3460 }
3461
3462 true
3465}
3466
3467fn split_compound<'a>(expr: &'a str, delimiter: &str) -> Option<(&'a str, &'a str)> {
3469 let mut depth = 0;
3470 let mut in_quote = false;
3471 let bytes = expr.as_bytes();
3472
3473 for i in 0..expr.len() {
3474 match bytes[i] {
3475 b'\'' => in_quote = !in_quote,
3476 b'(' if !in_quote => depth += 1,
3477 b')' if !in_quote => depth -= 1,
3478 _ if !in_quote && depth == 0 && expr[i..].starts_with(delimiter) => {
3479 return Some((&expr[..i], &expr[i + delimiter.len()..]));
3480 }
3481 _ => {}
3482 }
3483 }
3484 None
3485}
3486
3487fn parse_equality(expr: &str) -> Option<(&str, &str)> {
3489 let parts: Vec<&str> = expr.splitn(2, "==").collect();
3490 if parts.len() == 2 {
3491 let field = parts[0].trim();
3492 let value = parts[1].trim();
3493 Some((field, value))
3494 } else {
3495 None
3496 }
3497}
3498
3499fn parse_starts_with(expr: &str) -> Option<(&str, &str)> {
3501 let upper = expr.to_uppercase();
3502 if let Some(idx) = upper.find(" STARTS WITH ") {
3503 let field = expr[..idx].trim();
3504 let value = expr[idx + 13..].trim(); Some((field, value))
3506 } else {
3507 None
3508 }
3509}
3510
3511fn parse_ends_with(expr: &str) -> Option<(&str, &str)> {
3513 let upper = expr.to_uppercase();
3514 if let Some(idx) = upper.find(" ENDS WITH ") {
3515 let field = expr[..idx].trim();
3516 let value = expr[idx + 11..].trim(); Some((field, value))
3518 } else {
3519 None
3520 }
3521}
3522
3523fn extract_string_literal(value: &str) -> Option<&str> {
3525 let trimmed = value.trim();
3526 if trimmed.starts_with('\'') && trimmed.ends_with('\'') && trimmed.len() >= 2 {
3527 Some(&trimmed[1..trimmed.len() - 1])
3528 } else {
3529 None
3530 }
3531}
3532
3533fn evaluate_contains(doc: &Document, expr: &str) -> bool {
3535 let inner = &expr[9..expr.len() - 1]; let parts: Vec<&str> = inner.splitn(2, ',').collect();
3538
3539 if parts.len() != 2 {
3540 return true; }
3542
3543 let field = parts[0].trim();
3544 let value = parts[1].trim();
3545
3546 let search_value = match extract_string_literal(value) {
3547 Some(v) => v,
3548 None => return true, };
3550
3551 match doc.get(field) {
3553 Some(serde_json::Value::Array(arr)) => arr.iter().any(|item| {
3554 if let Some(s) = item.as_str() {
3555 s == search_value
3556 } else {
3557 false
3558 }
3559 }),
3560 Some(serde_json::Value::String(s)) => s.contains(search_value),
3561 _ => false,
3562 }
3563}
3564
3565fn evaluate_equality(doc: &Document, field: &str, value: &str) -> bool {
3568 if value == "true" {
3570 return get_nested_field(doc, field)
3571 .and_then(|v| v.as_bool())
3572 .unwrap_or(false);
3573 }
3574 if value == "false" {
3575 return !get_nested_field(doc, field)
3576 .and_then(|v| v.as_bool())
3577 .unwrap_or(true);
3578 }
3579
3580 if let Some(string_value) = extract_string_literal(value) {
3582 return match get_nested_field(doc, field) {
3583 Some(serde_json::Value::String(s)) => s == string_value,
3584 _ => false,
3585 };
3586 }
3587
3588 if let Ok(num) = value.parse::<i64>() {
3590 return match get_nested_field(doc, field) {
3591 Some(serde_json::Value::Number(n)) => n.as_i64() == Some(num),
3592 _ => false,
3593 };
3594 }
3595 if let Ok(num) = value.parse::<f64>() {
3596 return match get_nested_field(doc, field) {
3597 Some(serde_json::Value::Number(n)) => n
3598 .as_f64()
3599 .map(|f| (f - num).abs() < f64::EPSILON)
3600 .unwrap_or(false),
3601 _ => false,
3602 };
3603 }
3604
3605 true
3607}
3608
3609fn evaluate_starts_with(doc: &Document, field: &str, value: &str) -> bool {
3611 let prefix = match extract_string_literal(value) {
3612 Some(v) => v,
3613 None => return true, };
3615
3616 match doc.get(field) {
3617 Some(serde_json::Value::String(s)) => s.starts_with(prefix),
3618 _ => false,
3619 }
3620}
3621
3622fn evaluate_ends_with(doc: &Document, field: &str, value: &str) -> bool {
3624 let suffix = match extract_string_literal(value) {
3625 Some(v) => v,
3626 None => return true, };
3628
3629 match doc.get(field) {
3630 Some(serde_json::Value::String(s)) => s.ends_with(suffix),
3631 _ => false,
3632 }
3633}
3634
3635fn parse_inequality(expr: &str) -> Option<(&str, &str)> {
3641 if let Some(idx) = expr.find("!=") {
3643 let field = expr[..idx].trim();
3644 let value = expr[idx + 2..].trim();
3645 if !field.is_empty() && !value.is_empty() {
3647 return Some((field, value));
3648 }
3649 }
3650 None
3651}
3652
3653fn parse_not_expression(expr: &str) -> Option<&str> {
3655 let upper = expr.to_uppercase();
3656 if upper.starts_with("NOT ") {
3657 let rest = expr[4..].trim();
3658 if rest.starts_with('(') && rest.ends_with(')') {
3660 Some(&rest[1..rest.len() - 1])
3661 } else {
3662 Some(rest)
3663 }
3664 } else {
3665 None
3666 }
3667}
3668
3669fn parse_is_null(expr: &str) -> Option<(&str, bool)> {
3671 let upper = expr.to_uppercase();
3672 if let Some(idx) = upper.find(" IS NOT NULL") {
3673 let field = expr[..idx].trim();
3674 return Some((field, false)); }
3676 if let Some(idx) = upper.find(" IS NULL") {
3677 let field = expr[..idx].trim();
3678 return Some((field, true)); }
3680 None
3681}
3682
3683fn evaluate_is_null(doc: &Document, field: &str, is_null: bool) -> bool {
3685 let field_value = get_nested_field(doc, field);
3686 let value_is_null = field_value.is_none() || field_value == Some(&serde_json::Value::Null);
3687 if is_null {
3688 value_is_null
3689 } else {
3690 !value_is_null
3691 }
3692}
3693
3694fn parse_like(expr: &str) -> Option<(&str, &str)> {
3696 let upper = expr.to_uppercase();
3697 if let Some(idx) = upper.find(" LIKE ") {
3698 let field = expr[..idx].trim();
3699 let pattern = expr[idx + 6..].trim(); Some((field, pattern))
3701 } else {
3702 None
3703 }
3704}
3705
3706fn evaluate_like(doc: &Document, field: &str, pattern: &str) -> bool {
3708 let pattern_str = match extract_string_literal(pattern) {
3709 Some(v) => v,
3710 None => return true, };
3712
3713 let field_value = match get_nested_field(doc, field) {
3714 Some(serde_json::Value::String(s)) => s.as_str(),
3715 _ => return false,
3716 };
3717
3718 match_like_pattern(field_value, pattern_str)
3722}
3723
3724fn match_like_pattern(value: &str, pattern: &str) -> bool {
3726 let segments: Vec<&str> = pattern.split('%').collect();
3728
3729 if segments.is_empty() {
3730 return true;
3731 }
3732
3733 if segments.iter().all(|s| s.is_empty()) {
3735 return true;
3736 }
3737
3738 let mut pos = 0;
3739 let starts_with_wildcard = pattern.starts_with('%');
3740 let ends_with_wildcard = pattern.ends_with('%');
3741
3742 for (i, segment) in segments.iter().enumerate() {
3743 if segment.is_empty() {
3744 continue;
3745 }
3746
3747 if i == 0 && !starts_with_wildcard {
3748 if !value.starts_with(segment) {
3750 return false;
3751 }
3752 pos = segment.len();
3753 } else if i == segments.len() - 1 && !ends_with_wildcard {
3754 if !value.ends_with(segment) {
3756 return false;
3757 }
3758 } else {
3759 if let Some(found_pos) = value[pos..].find(segment) {
3761 pos += found_pos + segment.len();
3762 } else {
3763 return false;
3764 }
3765 }
3766 }
3767
3768 true
3769}
3770
3771fn parse_in(expr: &str) -> Option<(&str, Vec<String>)> {
3773 let upper = expr.to_uppercase();
3774 if let Some(idx) = upper.find(" IN ") {
3775 let field = expr[..idx].trim();
3776 let values_str = expr[idx + 4..].trim(); if values_str.starts_with('[') && values_str.ends_with(']') {
3780 let inner = &values_str[1..values_str.len() - 1];
3781 let values: Vec<String> = inner
3782 .split(',')
3783 .map(|v| {
3784 let trimmed = v.trim();
3785 if let Some(s) = extract_string_literal(trimmed) {
3787 s.to_string()
3788 } else {
3789 trimmed.to_string()
3790 }
3791 })
3792 .collect();
3793 return Some((field, values));
3794 }
3795 }
3796 None
3797}
3798
3799fn evaluate_in(doc: &Document, field: &str, values: &[String]) -> bool {
3801 let field_value = match get_nested_field(doc, field) {
3802 Some(v) => v,
3803 None => return false,
3804 };
3805
3806 match field_value {
3807 serde_json::Value::String(s) => values.iter().any(|v| v == s),
3808 serde_json::Value::Number(n) => {
3809 if let Some(i) = n.as_i64() {
3810 values.iter().any(|v| v.parse::<i64>().ok() == Some(i))
3811 } else if let Some(f) = n.as_f64() {
3812 values.iter().any(|v| {
3813 v.parse::<f64>()
3814 .ok()
3815 .map(|vf| (vf - f).abs() < f64::EPSILON)
3816 == Some(true)
3817 })
3818 } else {
3819 false
3820 }
3821 }
3822 serde_json::Value::Bool(b) => {
3823 let bool_str = if *b { "true" } else { "false" };
3824 values.iter().any(|v| v == bool_str)
3825 }
3826 _ => false,
3827 }
3828}
3829
3830fn get_nested_field<'a>(doc: &'a Document, field: &str) -> Option<&'a serde_json::Value> {
3833 if !field.contains('.') {
3834 return doc.get(field);
3836 }
3837
3838 let parts: Vec<&str> = field.split('.').collect();
3840 let mut current = doc.get(parts[0])?;
3841
3842 for part in &parts[1..] {
3843 match current {
3844 serde_json::Value::Object(obj) => {
3845 current = obj.get(*part)?;
3846 }
3847 _ => return None,
3848 }
3849 }
3850
3851 Some(current)
3852}
3853
3854fn matches_query(doc: &Document, query: &Query) -> bool {
3856 match query {
3857 Query::All => true,
3858 Query::Eq { field, value } => {
3859 if field == "id" {
3861 if let Some(ref doc_id) = doc.id {
3862 if let Some(value_str) = value.as_str() {
3863 return doc_id == value_str;
3864 }
3865 }
3866 return false;
3867 }
3868 doc.get(field) == Some(value)
3869 }
3870 Query::Lt { field, value } => {
3871 if let Some(doc_val) = doc.get(field) {
3872 compare_values(doc_val, value) < 0
3873 } else {
3874 false
3875 }
3876 }
3877 Query::Gt { field, value } => {
3878 if let Some(doc_val) = doc.get(field) {
3879 compare_values(doc_val, value) > 0
3880 } else {
3881 false
3882 }
3883 }
3884 Query::And(queries) => queries.iter().all(|q| matches_query(doc, q)),
3885 Query::Or(queries) => queries.iter().any(|q| matches_query(doc, q)),
3886
3887 Query::Custom(query_str) => evaluate_custom_query(doc, query_str),
3890
3891 Query::WithinRadius {
3893 center,
3894 radius_meters,
3895 lat_field,
3896 lon_field,
3897 } => {
3898 let lat_key = lat_field.as_deref().unwrap_or("lat");
3899 let lon_key = lon_field.as_deref().unwrap_or("lon");
3900
3901 if let (Some(lat_val), Some(lon_val)) = (
3902 doc.get(lat_key).and_then(|v| v.as_f64()),
3903 doc.get(lon_key).and_then(|v| v.as_f64()),
3904 ) {
3905 let doc_point = GeoPoint::new(lat_val, lon_val);
3906 doc_point.within_radius(center, *radius_meters)
3907 } else {
3908 false
3909 }
3910 }
3911
3912 Query::WithinBounds {
3913 min,
3914 max,
3915 lat_field,
3916 lon_field,
3917 } => {
3918 let lat_key = lat_field.as_deref().unwrap_or("lat");
3919 let lon_key = lon_field.as_deref().unwrap_or("lon");
3920
3921 if let (Some(lat_val), Some(lon_val)) = (
3922 doc.get(lat_key).and_then(|v| v.as_f64()),
3923 doc.get(lon_key).and_then(|v| v.as_f64()),
3924 ) {
3925 let doc_point = GeoPoint::new(lat_val, lon_val);
3926 doc_point.within_bounds(min, max)
3927 } else {
3928 false
3929 }
3930 }
3931
3932 Query::Not(inner) => !matches_query(doc, inner),
3934
3935 Query::IncludeDeleted(inner) => {
3937 matches_query(doc, inner)
3940 }
3941
3942 Query::DeletedOnly => {
3943 doc.fields
3945 .get("_deleted")
3946 .and_then(|v| v.as_bool())
3947 .unwrap_or(false)
3948 }
3949 }
3950}
3951
3952fn compare_values(a: &serde_json::Value, b: &serde_json::Value) -> i32 {
3953 use serde_json::Value as V;
3954
3955 match (a, b) {
3956 (V::Number(n1), V::Number(n2)) => {
3957 if let (Some(f1), Some(f2)) = (n1.as_f64(), n2.as_f64()) {
3958 if f1 < f2 {
3959 -1
3960 } else if f1 > f2 {
3961 1
3962 } else {
3963 0
3964 }
3965 } else {
3966 0
3967 }
3968 }
3969 (V::String(s1), V::String(s2)) => s1.cmp(s2) as i32,
3970 _ => 0,
3971 }
3972}
3973
3974#[cfg(test)]
3975mod tests {
3976 use super::*;
3977 use std::collections::HashMap;
3978 use std::path::PathBuf;
3979
3980 fn test_config() -> BackendConfig {
3982 let test_secret = crate::security::FormationKey::generate_secret();
3984 BackendConfig {
3985 app_id: "test_app".to_string(),
3986 persistence_dir: PathBuf::from("/tmp/automerge_test"),
3987 shared_key: Some(test_secret),
3988 transport: TransportConfig::default(),
3989 extra: HashMap::new(),
3990 }
3991 }
3992
3993 #[tokio::test]
3994 async fn test_automerge_backend_creation() {
3995 let backend = AutomergeBackend::new();
3996 assert!(!backend.is_ready().await);
3997 }
3998
3999 #[tokio::test]
4000 async fn test_document_upsert() {
4001 let backend = AutomergeBackend::new();
4002 backend.initialize(test_config()).await.unwrap();
4003
4004 let mut fields = HashMap::new();
4005 fields.insert("name".to_string(), serde_json::json!("test"));
4006 fields.insert("value".to_string(), serde_json::json!(42));
4007
4008 let doc = Document::new(fields);
4009 let doc_id = backend
4010 .document_store()
4011 .upsert("test_collection", doc)
4012 .await
4013 .unwrap();
4014
4015 assert!(!doc_id.is_empty());
4016 }
4017
4018 #[tokio::test]
4019 async fn test_document_query() {
4020 let backend = AutomergeBackend::new();
4021 backend.initialize(test_config()).await.unwrap();
4022
4023 let mut fields = HashMap::new();
4025 fields.insert("status".to_string(), serde_json::json!("active"));
4026 let doc = Document::new(fields);
4027 backend
4028 .document_store()
4029 .upsert("test_collection", doc)
4030 .await
4031 .unwrap();
4032
4033 let query = Query::Eq {
4035 field: "status".to_string(),
4036 value: serde_json::json!("active"),
4037 };
4038
4039 let results = backend
4040 .document_store()
4041 .query("test_collection", &query)
4042 .await
4043 .unwrap();
4044
4045 assert_eq!(results.len(), 1);
4046 }
4047
4048 #[tokio::test]
4049 async fn test_document_get() {
4050 let backend = AutomergeBackend::new();
4051 backend.initialize(test_config()).await.unwrap();
4052
4053 let mut fields = HashMap::new();
4055 fields.insert("data".to_string(), serde_json::json!("test_value"));
4056 let doc = Document::new(fields);
4057 let doc_id = backend
4058 .document_store()
4059 .upsert("test_coll", doc)
4060 .await
4061 .unwrap();
4062
4063 let retrieved = backend
4065 .document_store()
4066 .get("test_coll", &doc_id)
4067 .await
4068 .unwrap();
4069
4070 assert!(retrieved.is_some());
4071 let retrieved_doc = retrieved.unwrap();
4072 assert_eq!(
4073 retrieved_doc.fields.get("data").unwrap(),
4074 &serde_json::json!("test_value")
4075 );
4076 }
4077
4078 #[tokio::test]
4079 async fn test_document_remove() {
4080 let backend = AutomergeBackend::new();
4081 backend.initialize(test_config()).await.unwrap();
4082
4083 let mut fields = HashMap::new();
4085 fields.insert("temp".to_string(), serde_json::json!(true));
4086 let doc = Document::new(fields);
4087 let doc_id = backend
4088 .document_store()
4089 .upsert("temp_coll", doc)
4090 .await
4091 .unwrap();
4092
4093 backend
4095 .document_store()
4096 .remove("temp_coll", &doc_id)
4097 .await
4098 .unwrap();
4099
4100 let retrieved = backend
4102 .document_store()
4103 .get("temp_coll", &doc_id)
4104 .await
4105 .unwrap();
4106
4107 assert!(retrieved.is_none());
4108 }
4109}
4110
4111#[cfg(all(test, feature = "automerge-backend"))]
4113mod iroh_credential_tests {
4114 use super::*;
4115 use std::collections::HashMap;
4116 use std::path::PathBuf;
4117
4118 #[tokio::test]
4120 async fn test_automerge_iroh_requires_credentials() {
4121 let temp_dir = tempfile::tempdir().unwrap();
4123 let store = Arc::new(crate::storage::AutomergeStore::open(temp_dir.path()).unwrap());
4124 let transport = Arc::new(crate::network::IrohTransport::new().await.unwrap());
4125
4126 let backend = AutomergeIrohBackend::from_parts(store, transport);
4127
4128 let config = BackendConfig {
4130 app_id: "test_app".to_string(),
4131 persistence_dir: PathBuf::from("/tmp/test"),
4132 shared_key: None, transport: TransportConfig::default(),
4134 extra: HashMap::new(),
4135 };
4136
4137 let result = backend.initialize(config).await;
4138 assert!(result.is_err());
4139
4140 let error = result.unwrap_err();
4141 let error_msg = error.to_string();
4142 assert!(
4143 error_msg.contains("PEAT_SECRET_KEY") || error_msg.contains("shared_key"),
4144 "Error should mention missing credentials: {}",
4145 error_msg
4146 );
4147 }
4148
4149 #[tokio::test]
4151 async fn test_automerge_iroh_with_valid_credentials() {
4152 let temp_dir = tempfile::tempdir().unwrap();
4154 let store = Arc::new(crate::storage::AutomergeStore::open(temp_dir.path()).unwrap());
4155 let transport = Arc::new(crate::network::IrohTransport::new().await.unwrap());
4156
4157 let backend = AutomergeIrohBackend::from_parts(store, transport);
4158
4159 let test_secret = crate::security::FormationKey::generate_secret();
4161
4162 let config = BackendConfig {
4163 app_id: "test_formation".to_string(),
4164 persistence_dir: temp_dir.path().to_path_buf(),
4165 shared_key: Some(test_secret),
4166 transport: TransportConfig::default(),
4167 extra: HashMap::new(),
4168 };
4169
4170 let result = backend.initialize(config).await;
4171 assert!(result.is_ok(), "Should initialize with valid credentials");
4172
4173 let formation_key = backend.formation_key();
4175 assert!(
4176 formation_key.is_some(),
4177 "Formation key should be set after initialization"
4178 );
4179 assert_eq!(
4180 formation_key.unwrap().formation_id(),
4181 "test_formation",
4182 "Formation ID should match app_id"
4183 );
4184 }
4185
4186 #[tokio::test]
4188 async fn test_automerge_iroh_rejects_invalid_key_format() {
4189 let temp_dir = tempfile::tempdir().unwrap();
4191 let store = Arc::new(crate::storage::AutomergeStore::open(temp_dir.path()).unwrap());
4192 let transport = Arc::new(crate::network::IrohTransport::new().await.unwrap());
4193
4194 let backend = AutomergeIrohBackend::from_parts(store, transport);
4195
4196 let config = BackendConfig {
4198 app_id: "test_app".to_string(),
4199 persistence_dir: PathBuf::from("/tmp/test"),
4200 shared_key: Some("not-valid-base64!!!".to_string()),
4201 transport: TransportConfig::default(),
4202 extra: HashMap::new(),
4203 };
4204
4205 let result = backend.initialize(config).await;
4206 assert!(result.is_err(), "Should reject invalid base64 key");
4207
4208 let error_msg = result.unwrap_err().to_string();
4209 assert!(
4210 error_msg.contains("Invalid shared_key format"),
4211 "Error should mention invalid format: {}",
4212 error_msg
4213 );
4214 }
4215}
4216
4217#[cfg(all(test, feature = "automerge-backend"))]
4219mod issue_271_clone_tests {
4220 use super::*;
4221
4222 #[tokio::test]
4228 async fn test_clone_shares_transport_arc() {
4229 let temp_dir = tempfile::tempdir().unwrap();
4231 let store = Arc::new(crate::storage::AutomergeStore::open(temp_dir.path()).unwrap());
4232 let transport = Arc::new(crate::network::IrohTransport::new().await.unwrap());
4233
4234 let original = AutomergeIrohBackend::from_parts(store, Arc::clone(&transport));
4235 let cloned = original.clone();
4236
4237 let original_transport_ptr = Arc::as_ptr(&original.transport());
4239 let cloned_transport_ptr = Arc::as_ptr(&cloned.transport());
4240
4241 assert_eq!(
4242 original_transport_ptr, cloned_transport_ptr,
4243 "Clone should share the same transport Arc, but got different pointers:\n Original: {:?}\n Clone: {:?}",
4244 original_transport_ptr, cloned_transport_ptr
4245 );
4246
4247 let source_transport_ptr = Arc::as_ptr(&transport);
4249 assert_eq!(
4250 original_transport_ptr, source_transport_ptr,
4251 "Original backend transport should be same as source transport Arc"
4252 );
4253 }
4254
4255 #[tokio::test]
4257 async fn test_clone_shares_backend_arc() {
4258 let temp_dir = tempfile::tempdir().unwrap();
4259 let store = Arc::new(crate::storage::AutomergeStore::open(temp_dir.path()).unwrap());
4260 let transport = Arc::new(crate::network::IrohTransport::new().await.unwrap());
4261
4262 let original = AutomergeIrohBackend::from_parts(store, transport);
4263 let cloned = original.clone();
4264
4265 assert_eq!(
4269 original.endpoint_id(),
4270 cloned.endpoint_id(),
4271 "Clone should have same endpoint_id as original"
4272 );
4273 }
4274
4275 #[tokio::test]
4280 async fn test_clone_shares_peer_count() {
4281 let temp_dir = tempfile::tempdir().unwrap();
4282 let store = Arc::new(crate::storage::AutomergeStore::open(temp_dir.path()).unwrap());
4283 let transport = Arc::new(crate::network::IrohTransport::new().await.unwrap());
4284
4285 let original = AutomergeIrohBackend::from_parts(store, Arc::clone(&transport));
4286 let cloned = original.clone();
4287
4288 let original_count = original.transport().peer_count();
4290 let cloned_count = cloned.transport().peer_count();
4291
4292 assert_eq!(
4293 original_count, cloned_count,
4294 "Original and clone should report same peer_count"
4295 );
4296 assert_eq!(original_count, 0, "Initial peer count should be 0");
4297
4298 assert_eq!(
4300 transport.peer_count(),
4301 original_count,
4302 "Source transport should have same count"
4303 );
4304 }
4305
4306 #[tokio::test]
4308 async fn test_clone_shares_formation_key() {
4309 let temp_dir = tempfile::tempdir().unwrap();
4310 let store = Arc::new(crate::storage::AutomergeStore::open(temp_dir.path()).unwrap());
4311 let transport = Arc::new(crate::network::IrohTransport::new().await.unwrap());
4312
4313 let original = AutomergeIrohBackend::from_parts(store, transport);
4314
4315 let test_secret = crate::security::FormationKey::generate_secret();
4317 let config = BackendConfig {
4318 app_id: "test_formation".to_string(),
4319 persistence_dir: temp_dir.path().to_path_buf(),
4320 shared_key: Some(test_secret),
4321 transport: TransportConfig::default(),
4322 extra: std::collections::HashMap::new(),
4323 };
4324 original.initialize(config).await.unwrap();
4325
4326 let cloned = original.clone();
4328
4329 let original_key = original.formation_key();
4331 let cloned_key = cloned.formation_key();
4332
4333 assert!(original_key.is_some(), "Original should have formation key");
4334 assert!(cloned_key.is_some(), "Clone should have formation key");
4335 assert_eq!(
4336 original_key.as_ref().map(|k| k.formation_id()),
4337 cloned_key.as_ref().map(|k| k.formation_id()),
4338 "Clone should share same formation key"
4339 );
4340 }
4341
4342 #[tokio::test]
4344 async fn test_clone_shares_initialized_state() {
4345 let temp_dir = tempfile::tempdir().unwrap();
4346 let store = Arc::new(crate::storage::AutomergeStore::open(temp_dir.path()).unwrap());
4347 let transport = Arc::new(crate::network::IrohTransport::new().await.unwrap());
4348
4349 let original = AutomergeIrohBackend::from_parts(store, transport);
4350
4351 let cloned_before = original.clone();
4353 assert!(
4354 !original.is_ready().await,
4355 "Original should not be ready before init"
4356 );
4357 assert!(
4358 !cloned_before.is_ready().await,
4359 "Clone should not be ready before init"
4360 );
4361
4362 let test_secret = crate::security::FormationKey::generate_secret();
4364 let config = BackendConfig {
4365 app_id: "test_formation".to_string(),
4366 persistence_dir: temp_dir.path().to_path_buf(),
4367 shared_key: Some(test_secret),
4368 transport: TransportConfig::default(),
4369 extra: std::collections::HashMap::new(),
4370 };
4371 original.initialize(config).await.unwrap();
4372
4373 assert!(
4376 original.is_ready().await,
4377 "Original should be ready after init"
4378 );
4379 assert!(
4380 cloned_before.is_ready().await,
4381 "Clone (created before init) should also be ready, proving Arc is shared"
4382 );
4383 }
4384
4385 fn deletion_test_config() -> BackendConfig {
4388 let test_secret = crate::security::FormationKey::generate_secret();
4389 BackendConfig {
4390 app_id: "deletion_test".to_string(),
4391 persistence_dir: std::path::PathBuf::from("/tmp/deletion_test"),
4392 shared_key: Some(test_secret),
4393 transport: TransportConfig::default(),
4394 extra: HashMap::new(),
4395 }
4396 }
4397
4398 #[tokio::test]
4399 async fn test_soft_delete() {
4400 let backend = AutomergeBackend::new();
4401 backend.initialize(deletion_test_config()).await.unwrap();
4402
4403 let mut fields = HashMap::new();
4405 fields.insert("data".to_string(), serde_json::json!("test_value"));
4406 let doc = Document::new(fields);
4407 let doc_id = backend
4408 .document_store()
4409 .upsert("test_collection", doc)
4410 .await
4411 .unwrap();
4412
4413 let retrieved = backend
4415 .document_store()
4416 .get("test_collection", &doc_id)
4417 .await
4418 .unwrap();
4419 assert!(retrieved.is_some());
4420 assert!(!backend
4421 .document_store()
4422 .is_deleted("test_collection", &doc_id)
4423 .await
4424 .unwrap());
4425
4426 let result = backend
4428 .document_store()
4429 .delete("test_collection", &doc_id, Some("test deletion"))
4430 .await
4431 .unwrap();
4432 assert!(result.deleted);
4433
4434 assert!(backend
4436 .document_store()
4437 .is_deleted("test_collection", &doc_id)
4438 .await
4439 .unwrap());
4440
4441 let deleted_doc = backend
4443 .document_store()
4444 .get("test_collection", &doc_id)
4445 .await
4446 .unwrap();
4447 assert!(deleted_doc.is_some());
4448 let deleted_doc = deleted_doc.unwrap();
4449 assert_eq!(
4450 deleted_doc.fields.get("_deleted"),
4451 Some(&serde_json::json!(true))
4452 );
4453 assert!(deleted_doc.fields.contains_key("_deleted_at"));
4454 assert_eq!(
4455 deleted_doc.fields.get("_deleted_reason"),
4456 Some(&serde_json::json!("test deletion"))
4457 );
4458 }
4459
4460 #[tokio::test]
4461 async fn test_tombstone_delete() {
4462 let backend = AutomergeBackend::new();
4463 backend.initialize(deletion_test_config()).await.unwrap();
4464
4465 backend.deletion_policy_registry.set(
4467 "tombstone_collection",
4468 crate::qos::DeletionPolicy::Tombstone {
4469 tombstone_ttl: std::time::Duration::from_secs(3600),
4470 delete_wins: true,
4471 },
4472 );
4473
4474 let mut fields = HashMap::new();
4476 fields.insert("data".to_string(), serde_json::json!("tombstone_test"));
4477 let doc = Document::new(fields);
4478 let doc_id = backend
4479 .document_store()
4480 .upsert("tombstone_collection", doc)
4481 .await
4482 .unwrap();
4483
4484 let result = backend
4486 .document_store()
4487 .delete("tombstone_collection", &doc_id, Some("removed"))
4488 .await
4489 .unwrap();
4490 assert!(result.deleted);
4491 assert!(result.tombstone_id.is_some());
4492 assert!(result.expires_at.is_some());
4493
4494 assert!(backend
4496 .document_store()
4497 .is_deleted("tombstone_collection", &doc_id)
4498 .await
4499 .unwrap());
4500
4501 let removed_doc = backend
4503 .document_store()
4504 .get("tombstone_collection", &doc_id)
4505 .await
4506 .unwrap();
4507 assert!(removed_doc.is_none());
4508
4509 let tombstones = backend
4511 .document_store()
4512 .get_tombstones("tombstone_collection")
4513 .await
4514 .unwrap();
4515 assert_eq!(tombstones.len(), 1);
4516 assert_eq!(tombstones[0].document_id, doc_id);
4517 assert_eq!(tombstones[0].reason, Some("removed".to_string()));
4518 assert_eq!(tombstones[0].deleted_by, "deletion_test");
4520 assert_eq!(tombstones[0].lamport, 0);
4522 }
4523
4524 #[tokio::test]
4525 async fn test_deletion_policy() {
4526 let backend = AutomergeBackend::new();
4527
4528 let policy = backend
4530 .document_store()
4531 .deletion_policy("unknown_collection");
4532 assert!(matches!(
4533 policy,
4534 crate::qos::DeletionPolicy::SoftDelete { .. }
4535 ));
4536
4537 assert!(matches!(
4539 backend.document_store().deletion_policy("beacons"),
4540 crate::qos::DeletionPolicy::ImplicitTTL { .. }
4541 ));
4542 assert!(matches!(
4543 backend.document_store().deletion_policy("nodes"),
4544 crate::qos::DeletionPolicy::Tombstone { .. }
4545 ));
4546 assert!(matches!(
4547 backend.document_store().deletion_policy("contact_reports"),
4548 crate::qos::DeletionPolicy::SoftDelete { .. }
4549 ));
4550 }
4551
4552 #[tokio::test]
4553 async fn test_apply_tombstone() {
4554 let backend = AutomergeBackend::new();
4555 backend.initialize(deletion_test_config()).await.unwrap();
4556
4557 let mut fields = HashMap::new();
4559 fields.insert("data".to_string(), serde_json::json!("to_be_deleted"));
4560 let doc = Document::new(fields);
4561 let doc_id = backend
4562 .document_store()
4563 .upsert("sync_test", doc)
4564 .await
4565 .unwrap();
4566
4567 let tombstone = crate::qos::Tombstone::with_reason(
4569 doc_id.clone(),
4570 "sync_test".to_string(),
4571 "remote_node".to_string(),
4572 1, "synced deletion",
4574 );
4575
4576 backend
4578 .document_store()
4579 .apply_tombstone(&tombstone)
4580 .await
4581 .unwrap();
4582
4583 assert!(backend
4585 .document_store()
4586 .is_deleted("sync_test", &doc_id)
4587 .await
4588 .unwrap());
4589
4590 let removed_doc = backend
4592 .document_store()
4593 .get("sync_test", &doc_id)
4594 .await
4595 .unwrap();
4596 assert!(removed_doc.is_none());
4597 }
4598
4599 #[tokio::test]
4602 async fn test_delete_increments_lamport() {
4603 let backend = AutomergeBackend::new();
4604 backend.initialize(deletion_test_config()).await.unwrap();
4605 backend.deletion_policy_registry.set(
4606 "nodes",
4607 crate::qos::DeletionPolicy::Tombstone {
4608 tombstone_ttl: std::time::Duration::from_secs(3600),
4609 delete_wins: true,
4610 },
4611 );
4612
4613 let doc1 = Document::new(HashMap::from([("x".to_string(), serde_json::json!(1))]));
4615 let doc2 = Document::new(HashMap::from([("x".to_string(), serde_json::json!(2))]));
4616 let id1 = backend
4617 .document_store()
4618 .upsert("nodes", doc1)
4619 .await
4620 .unwrap();
4621 let id2 = backend
4622 .document_store()
4623 .upsert("nodes", doc2)
4624 .await
4625 .unwrap();
4626
4627 backend
4629 .document_store()
4630 .delete("nodes", &id1, None)
4631 .await
4632 .unwrap();
4633 backend
4634 .document_store()
4635 .delete("nodes", &id2, None)
4636 .await
4637 .unwrap();
4638
4639 let tombstones = backend
4641 .document_store()
4642 .get_tombstones("nodes")
4643 .await
4644 .unwrap();
4645 assert_eq!(tombstones.len(), 2);
4646 let lamports: Vec<u64> = tombstones.iter().map(|t| t.lamport).collect();
4647 assert!(
4648 lamports.contains(&0) && lamports.contains(&1),
4649 "Expected lamport 0 and 1, got {:?}",
4650 lamports
4651 );
4652 }
4653
4654 #[tokio::test]
4655 async fn test_delete_uses_config_node_id() {
4656 let backend = AutomergeBackend::new();
4657
4658 backend.deletion_policy_registry.set(
4660 "test_coll",
4661 crate::qos::DeletionPolicy::Tombstone {
4662 tombstone_ttl: std::time::Duration::from_secs(3600),
4663 delete_wins: true,
4664 },
4665 );
4666 let doc = Document::new(HashMap::from([("x".to_string(), serde_json::json!(1))]));
4667 let id = backend
4668 .document_store()
4669 .upsert("test_coll", doc)
4670 .await
4671 .unwrap();
4672 backend
4673 .document_store()
4674 .delete("test_coll", &id, None)
4675 .await
4676 .unwrap();
4677 let tombstones = backend
4678 .document_store()
4679 .get_tombstones("test_coll")
4680 .await
4681 .unwrap();
4682 assert_eq!(tombstones[0].deleted_by, "local");
4683
4684 let backend2 = AutomergeBackend::new();
4686 backend2.initialize(deletion_test_config()).await.unwrap();
4687 backend2.deletion_policy_registry.set(
4688 "test_coll",
4689 crate::qos::DeletionPolicy::Tombstone {
4690 tombstone_ttl: std::time::Duration::from_secs(3600),
4691 delete_wins: true,
4692 },
4693 );
4694 let doc = Document::new(HashMap::from([("x".to_string(), serde_json::json!(2))]));
4695 let id2 = backend2
4696 .document_store()
4697 .upsert("test_coll", doc)
4698 .await
4699 .unwrap();
4700 backend2
4701 .document_store()
4702 .delete("test_coll", &id2, None)
4703 .await
4704 .unwrap();
4705 let tombstones2 = backend2
4706 .document_store()
4707 .get_tombstones("test_coll")
4708 .await
4709 .unwrap();
4710 assert_eq!(tombstones2[0].deleted_by, "deletion_test");
4711 }
4712
4713 #[tokio::test]
4714 async fn test_drain_pending_tombstones() {
4715 let backend = AutomergeBackend::new();
4716 backend.initialize(deletion_test_config()).await.unwrap();
4717 backend.deletion_policy_registry.set(
4718 "nodes",
4719 crate::qos::DeletionPolicy::Tombstone {
4720 tombstone_ttl: std::time::Duration::from_secs(3600),
4721 delete_wins: true,
4722 },
4723 );
4724
4725 assert!(backend.drain_pending_tombstones().is_empty());
4727
4728 let doc = Document::new(HashMap::from([("x".to_string(), serde_json::json!(1))]));
4730 let id = backend.document_store().upsert("nodes", doc).await.unwrap();
4731 backend
4732 .document_store()
4733 .delete("nodes", &id, Some("test"))
4734 .await
4735 .unwrap();
4736
4737 let pending = backend.drain_pending_tombstones();
4739 assert_eq!(pending.len(), 1);
4740 assert_eq!(pending[0].tombstone.document_id, id);
4741 assert_eq!(pending[0].tombstone.collection, "nodes");
4742
4743 assert!(backend.drain_pending_tombstones().is_empty());
4745 }
4746
4747 #[tokio::test]
4748 async fn test_gc_starts_on_init() {
4749 let backend = AutomergeBackend::new();
4750 assert!(backend.gc_handle.lock().unwrap().is_none());
4752
4753 backend.initialize(deletion_test_config()).await.unwrap();
4754 assert!(backend.gc_handle.lock().unwrap().is_some());
4756
4757 backend.shutdown().await.unwrap();
4759 assert!(backend.gc_handle.lock().unwrap().is_none());
4760 }
4761
4762 #[tokio::test]
4763 async fn test_gc_store_impl() {
4764 use crate::qos::GcStore;
4765
4766 let backend = AutomergeBackend::new();
4767 backend.initialize(deletion_test_config()).await.unwrap();
4768 backend.deletion_policy_registry.set(
4769 "nodes",
4770 crate::qos::DeletionPolicy::Tombstone {
4771 tombstone_ttl: std::time::Duration::from_secs(3600),
4772 delete_wins: true,
4773 },
4774 );
4775
4776 let doc = Document::new(HashMap::from([("x".to_string(), serde_json::json!(1))]));
4778 let id = backend.document_store().upsert("nodes", doc).await.unwrap();
4779 backend
4780 .document_store()
4781 .delete("nodes", &id, None)
4782 .await
4783 .unwrap();
4784
4785 assert!(backend.has_tombstone("nodes", &id).unwrap());
4787 assert_eq!(backend.get_all_tombstones().unwrap().len(), 1);
4788
4789 assert!(backend.remove_tombstone("nodes", &id).unwrap());
4791 assert!(!backend.has_tombstone("nodes", &id).unwrap());
4792 assert_eq!(backend.get_all_tombstones().unwrap().len(), 0);
4793
4794 let collections = backend.list_collections().unwrap();
4796 assert!(collections.is_empty() || collections.contains(&"nodes".to_string()));
4798 }
4799
4800 fn create_test_doc(fields: Vec<(&str, serde_json::Value)>) -> Document {
4806 let mut field_map = HashMap::new();
4807 for (key, value) in fields {
4808 field_map.insert(key.to_string(), value);
4809 }
4810 Document::new(field_map)
4811 }
4812
4813 #[test]
4814 fn test_custom_query_equality_string() {
4815 let doc = create_test_doc(vec![(
4817 "collection_name",
4818 serde_json::json!("squad_summaries"),
4819 )]);
4820
4821 assert!(evaluate_custom_query(
4822 &doc,
4823 "collection_name == 'squad_summaries'"
4824 ));
4825 assert!(!evaluate_custom_query(&doc, "collection_name == 'other'"));
4826 }
4827
4828 #[test]
4829 fn test_custom_query_equality_boolean() {
4830 let doc_public = create_test_doc(vec![("public", serde_json::json!(true))]);
4832 let doc_private = create_test_doc(vec![("public", serde_json::json!(false))]);
4833
4834 assert!(evaluate_custom_query(&doc_public, "public == true"));
4835 assert!(!evaluate_custom_query(&doc_public, "public == false"));
4836 assert!(evaluate_custom_query(&doc_private, "public == false"));
4837 assert!(!evaluate_custom_query(&doc_private, "public == true"));
4838 }
4839
4840 #[test]
4841 fn test_custom_query_starts_with() {
4842 let doc = create_test_doc(vec![("collection_name", serde_json::json!("squad-alpha"))]);
4844
4845 assert!(evaluate_custom_query(
4846 &doc,
4847 "collection_name STARTS WITH 'squad-'"
4848 ));
4849 assert!(evaluate_custom_query(
4850 &doc,
4851 "collection_name starts with 'squad-'"
4852 )); assert!(!evaluate_custom_query(
4854 &doc,
4855 "collection_name STARTS WITH 'platoon-'"
4856 ));
4857 }
4858
4859 #[test]
4860 fn test_custom_query_ends_with() {
4861 let doc = create_test_doc(vec![(
4863 "collection_name",
4864 serde_json::json!("squad.summaries"),
4865 )]);
4866
4867 assert!(evaluate_custom_query(
4868 &doc,
4869 "collection_name ENDS WITH '.summaries'"
4870 ));
4871 assert!(evaluate_custom_query(
4872 &doc,
4873 "collection_name ends with '.summaries'"
4874 )); assert!(!evaluate_custom_query(
4876 &doc,
4877 "collection_name ENDS WITH '.reports'"
4878 ));
4879 }
4880
4881 #[test]
4882 fn test_custom_query_contains_array() {
4883 let doc = create_test_doc(vec![(
4885 "authorized_roles",
4886 serde_json::json!(["soldier", "squad_leader"]),
4887 )]);
4888
4889 assert!(evaluate_custom_query(
4890 &doc,
4891 "CONTAINS(authorized_roles, 'soldier')"
4892 ));
4893 assert!(evaluate_custom_query(
4894 &doc,
4895 "CONTAINS(authorized_roles, 'squad_leader')"
4896 ));
4897 assert!(!evaluate_custom_query(
4898 &doc,
4899 "CONTAINS(authorized_roles, 'general')"
4900 ));
4901 }
4902
4903 #[test]
4904 fn test_custom_query_contains_string() {
4905 let doc = create_test_doc(vec![(
4907 "description",
4908 serde_json::json!("This is a squad summary document"),
4909 )]);
4910
4911 assert!(evaluate_custom_query(
4912 &doc,
4913 "CONTAINS(description, 'squad')"
4914 ));
4915 assert!(evaluate_custom_query(
4916 &doc,
4917 "CONTAINS(description, 'summary')"
4918 ));
4919 assert!(!evaluate_custom_query(
4920 &doc,
4921 "CONTAINS(description, 'platoon')"
4922 ));
4923 }
4924
4925 #[test]
4926 fn test_custom_query_or_compound() {
4927 let doc_node = create_test_doc(vec![("type", serde_json::json!("node_state"))]);
4929 let doc_squad = create_test_doc(vec![("type", serde_json::json!("squad_summary"))]);
4930 let doc_other = create_test_doc(vec![("type", serde_json::json!("other"))]);
4931
4932 let query = "type == 'node_state' OR type == 'squad_summary'";
4933 assert!(evaluate_custom_query(&doc_node, query));
4934 assert!(evaluate_custom_query(&doc_squad, query));
4935 assert!(!evaluate_custom_query(&doc_other, query));
4936 }
4937
4938 #[test]
4939 fn test_custom_query_and_compound() {
4940 let doc_match = create_test_doc(vec![
4942 ("public", serde_json::json!(true)),
4943 ("type", serde_json::json!("node_state")),
4944 ]);
4945 let doc_partial = create_test_doc(vec![
4946 ("public", serde_json::json!(true)),
4947 ("type", serde_json::json!("other")),
4948 ]);
4949
4950 let query = "public == true AND type == 'node_state'";
4951 assert!(evaluate_custom_query(&doc_match, query));
4952 assert!(!evaluate_custom_query(&doc_partial, query));
4953 }
4954
4955 #[test]
4956 fn test_custom_query_complex_compound() {
4957 let doc_public = create_test_doc(vec![
4959 ("public", serde_json::json!(true)),
4960 ("authorized_roles", serde_json::json!([])),
4961 ]);
4962 let doc_soldier = create_test_doc(vec![
4963 ("public", serde_json::json!(false)),
4964 ("authorized_roles", serde_json::json!(["soldier"])),
4965 ]);
4966 let doc_neither = create_test_doc(vec![
4967 ("public", serde_json::json!(false)),
4968 ("authorized_roles", serde_json::json!(["general"])),
4969 ]);
4970
4971 let query = "public == true OR CONTAINS(authorized_roles, 'soldier')";
4972 assert!(evaluate_custom_query(&doc_public, query));
4973 assert!(evaluate_custom_query(&doc_soldier, query));
4974 assert!(!evaluate_custom_query(&doc_neither, query));
4975 }
4976
4977 #[test]
4978 fn test_custom_query_with_parentheses() {
4979 let doc = create_test_doc(vec![(
4981 "collection_name",
4982 serde_json::json!("squad_summaries"),
4983 )]);
4984
4985 assert!(evaluate_custom_query(
4986 &doc,
4987 "(collection_name == 'squad_summaries')"
4988 ));
4989 }
4990
4991 #[test]
4992 fn test_custom_query_unknown_pattern_returns_true() {
4993 let doc = create_test_doc(vec![("field", serde_json::json!("value"))]);
4995
4996 assert!(evaluate_custom_query(&doc, "SOME_UNKNOWN_FUNCTION(x, y)"));
4998 assert!(evaluate_custom_query(&doc, "field BETWEEN 1 AND 10")); assert!(evaluate_custom_query(&doc, "field REGEXP '^test'")); }
5001
5002 #[test]
5003 fn test_custom_query_matches_query_integration() {
5004 let doc = create_test_doc(vec![(
5006 "collection_name",
5007 serde_json::json!("squad_summaries"),
5008 )]);
5009
5010 let query = Query::Custom("collection_name == 'squad_summaries'".to_string());
5011 assert!(matches_query(&doc, &query));
5012
5013 let query_no_match = Query::Custom("collection_name == 'other'".to_string());
5014 assert!(!matches_query(&doc, &query_no_match));
5015 }
5016
5017 #[test]
5018 fn test_custom_query_real_world_patterns() {
5019 let doc_summaries = create_test_doc(vec![(
5023 "collection_name",
5024 serde_json::json!("squad_summaries"),
5025 )]);
5026 assert!(evaluate_custom_query(
5027 &doc_summaries,
5028 "collection_name == 'squad_summaries'"
5029 ));
5030
5031 let doc_squad = create_test_doc(vec![
5033 ("collection_name", serde_json::json!("squad-1-alpha")),
5034 ("type", serde_json::json!("other")),
5035 ]);
5036 let doc_node = create_test_doc(vec![
5037 ("collection_name", serde_json::json!("other")),
5038 ("type", serde_json::json!("node_state")),
5039 ]);
5040 let query = "collection_name STARTS WITH 'squad-1' OR type == 'node_state'";
5041 assert!(evaluate_custom_query(&doc_squad, query));
5042 assert!(evaluate_custom_query(&doc_node, query));
5043
5044 let doc_suffix = create_test_doc(vec![
5046 ("collection_name", serde_json::json!("platoon.summaries")),
5047 ("type", serde_json::json!("other")),
5048 ]);
5049 let query2 = "collection_name ENDS WITH '.summaries' OR type == 'squad_summary'";
5050 assert!(evaluate_custom_query(&doc_suffix, query2));
5051
5052 let doc_with_role = create_test_doc(vec![
5054 ("public", serde_json::json!(false)),
5055 ("authorized_roles", serde_json::json!(["soldier", "medic"])),
5056 ]);
5057 let query3 = "public == true OR CONTAINS(authorized_roles, 'soldier')";
5058 assert!(evaluate_custom_query(&doc_with_role, query3));
5059 }
5060
5061 #[test]
5066 fn test_custom_query_inequality_string() {
5067 let doc = create_test_doc(vec![("status", serde_json::json!("active"))]);
5069
5070 assert!(evaluate_custom_query(&doc, "status != 'inactive'"));
5071 assert!(!evaluate_custom_query(&doc, "status != 'active'"));
5072 }
5073
5074 #[test]
5075 fn test_custom_query_inequality_boolean() {
5076 let doc_active = create_test_doc(vec![("enabled", serde_json::json!(true))]);
5078 let doc_inactive = create_test_doc(vec![("enabled", serde_json::json!(false))]);
5079
5080 assert!(evaluate_custom_query(&doc_active, "enabled != false"));
5081 assert!(!evaluate_custom_query(&doc_active, "enabled != true"));
5082 assert!(evaluate_custom_query(&doc_inactive, "enabled != true"));
5083 assert!(!evaluate_custom_query(&doc_inactive, "enabled != false"));
5084 }
5085
5086 #[test]
5087 fn test_custom_query_inequality_numeric() {
5088 let doc = create_test_doc(vec![("count", serde_json::json!(42))]);
5090
5091 assert!(evaluate_custom_query(&doc, "count != 0"));
5092 assert!(evaluate_custom_query(&doc, "count != 100"));
5093 assert!(!evaluate_custom_query(&doc, "count != 42"));
5094 }
5095
5096 #[test]
5097 fn test_custom_query_like_prefix() {
5098 let doc = create_test_doc(vec![("name", serde_json::json!("squad-alpha-1"))]);
5100
5101 assert!(evaluate_custom_query(&doc, "name LIKE 'squad%'"));
5102 assert!(evaluate_custom_query(&doc, "name like 'squad%'")); assert!(!evaluate_custom_query(&doc, "name LIKE 'platoon%'"));
5104 }
5105
5106 #[test]
5107 fn test_custom_query_like_suffix() {
5108 let doc = create_test_doc(vec![("filename", serde_json::json!("report.pdf"))]);
5110
5111 assert!(evaluate_custom_query(&doc, "filename LIKE '%.pdf'"));
5112 assert!(!evaluate_custom_query(&doc, "filename LIKE '%.doc'"));
5113 }
5114
5115 #[test]
5116 fn test_custom_query_like_contains() {
5117 let doc = create_test_doc(vec![(
5119 "description",
5120 serde_json::json!("This is a tactical mission report"),
5121 )]);
5122
5123 assert!(evaluate_custom_query(&doc, "description LIKE '%tactical%'"));
5124 assert!(evaluate_custom_query(&doc, "description LIKE '%mission%'"));
5125 assert!(!evaluate_custom_query(
5126 &doc,
5127 "description LIKE '%strategic%'"
5128 ));
5129 }
5130
5131 #[test]
5132 fn test_custom_query_like_complex() {
5133 let doc = create_test_doc(vec![("path", serde_json::json!("squad-alpha-report.json"))]);
5135
5136 assert!(evaluate_custom_query(&doc, "path LIKE 'squad%report%'")); assert!(evaluate_custom_query(&doc, "path LIKE '%alpha%json'")); assert!(evaluate_custom_query(&doc, "path LIKE 'squad%.json'")); }
5140
5141 #[test]
5142 fn test_custom_query_in_strings() {
5143 let doc = create_test_doc(vec![("role", serde_json::json!("soldier"))]);
5145
5146 assert!(evaluate_custom_query(
5147 &doc,
5148 "role IN ['soldier', 'medic', 'engineer']"
5149 ));
5150 assert!(!evaluate_custom_query(
5151 &doc,
5152 "role IN ['general', 'colonel']"
5153 ));
5154 }
5155
5156 #[test]
5157 fn test_custom_query_in_numbers() {
5158 let doc = create_test_doc(vec![("priority", serde_json::json!(2))]);
5160
5161 assert!(evaluate_custom_query(&doc, "priority IN [1, 2, 3]"));
5162 assert!(!evaluate_custom_query(&doc, "priority IN [4, 5, 6]"));
5163 }
5164
5165 #[test]
5166 fn test_custom_query_in_case_insensitive() {
5167 let doc = create_test_doc(vec![("status", serde_json::json!("active"))]);
5169
5170 assert!(evaluate_custom_query(
5171 &doc,
5172 "status in ['active', 'pending']"
5173 ));
5174 assert!(evaluate_custom_query(
5175 &doc,
5176 "status IN ['active', 'pending']"
5177 ));
5178 }
5179
5180 #[test]
5181 fn test_custom_query_not_expression() {
5182 let doc = create_test_doc(vec![("enabled", serde_json::json!(false))]);
5184
5185 assert!(evaluate_custom_query(&doc, "NOT (enabled == true)"));
5186 assert!(!evaluate_custom_query(&doc, "NOT (enabled == false)"));
5187 }
5188
5189 #[test]
5190 fn test_custom_query_not_without_parens() {
5191 let doc = create_test_doc(vec![("status", serde_json::json!("inactive"))]);
5193
5194 assert!(evaluate_custom_query(&doc, "NOT status == 'active'"));
5195 assert!(!evaluate_custom_query(&doc, "NOT status == 'inactive'"));
5196 }
5197
5198 #[test]
5199 fn test_custom_query_not_case_insensitive() {
5200 let doc = create_test_doc(vec![("flag", serde_json::json!(true))]);
5202
5203 assert!(evaluate_custom_query(&doc, "not (flag == false)"));
5204 assert!(evaluate_custom_query(&doc, "NOT (flag == false)"));
5205 }
5206
5207 #[test]
5208 fn test_custom_query_is_null() {
5209 let doc_with_null = create_test_doc(vec![("optional", serde_json::Value::Null)]);
5211 let doc_without_field = create_test_doc(vec![("other", serde_json::json!("value"))]);
5212 let doc_with_value = create_test_doc(vec![("optional", serde_json::json!("present"))]);
5213
5214 assert!(evaluate_custom_query(&doc_with_null, "optional IS NULL"));
5215 assert!(evaluate_custom_query(
5216 &doc_without_field,
5217 "optional IS NULL"
5218 ));
5219 assert!(!evaluate_custom_query(&doc_with_value, "optional IS NULL"));
5220 }
5221
5222 #[test]
5223 fn test_custom_query_is_not_null() {
5224 let doc_with_value = create_test_doc(vec![("required", serde_json::json!("value"))]);
5226 let doc_with_null = create_test_doc(vec![("required", serde_json::Value::Null)]);
5227 let doc_missing = create_test_doc(vec![("other", serde_json::json!("x"))]);
5228
5229 assert!(evaluate_custom_query(
5230 &doc_with_value,
5231 "required IS NOT NULL"
5232 ));
5233 assert!(!evaluate_custom_query(
5234 &doc_with_null,
5235 "required IS NOT NULL"
5236 ));
5237 assert!(!evaluate_custom_query(&doc_missing, "required IS NOT NULL"));
5238 }
5239
5240 #[test]
5241 fn test_custom_query_is_null_case_insensitive() {
5242 let doc = create_test_doc(vec![("field", serde_json::Value::Null)]);
5244
5245 assert!(evaluate_custom_query(&doc, "field is null"));
5246 assert!(evaluate_custom_query(&doc, "field IS NULL"));
5247 assert!(evaluate_custom_query(&doc, "field Is Null"));
5248 }
5249
5250 #[test]
5251 fn test_custom_query_nested_field_equality() {
5252 let doc = create_test_doc(vec![(
5254 "address",
5255 serde_json::json!({"city": "San Francisco", "state": "CA"}),
5256 )]);
5257
5258 assert!(evaluate_custom_query(
5259 &doc,
5260 "address.city == 'San Francisco'"
5261 ));
5262 assert!(evaluate_custom_query(&doc, "address.state == 'CA'"));
5263 assert!(!evaluate_custom_query(&doc, "address.city == 'New York'"));
5264 }
5265
5266 #[test]
5267 fn test_custom_query_nested_field_deep() {
5268 let doc = create_test_doc(vec![(
5270 "data",
5271 serde_json::json!({"level1": {"level2": {"value": "deep"}}}),
5272 )]);
5273
5274 assert!(evaluate_custom_query(
5275 &doc,
5276 "data.level1.level2.value == 'deep'"
5277 ));
5278 assert!(!evaluate_custom_query(
5279 &doc,
5280 "data.level1.level2.value == 'shallow'"
5281 ));
5282 }
5283
5284 #[test]
5285 fn test_custom_query_nested_field_is_null() {
5286 let doc = create_test_doc(vec![(
5288 "config",
5289 serde_json::json!({"enabled": true, "optional": null}),
5290 )]);
5291
5292 assert!(evaluate_custom_query(&doc, "config.optional IS NULL"));
5293 assert!(evaluate_custom_query(&doc, "config.missing IS NULL"));
5294 assert!(!evaluate_custom_query(&doc, "config.enabled IS NULL"));
5295 }
5296
5297 #[test]
5298 fn test_custom_query_nested_field_in() {
5299 let doc = create_test_doc(vec![(
5301 "user",
5302 serde_json::json!({"role": "admin", "level": 5}),
5303 )]);
5304
5305 assert!(evaluate_custom_query(
5306 &doc,
5307 "user.role IN ['admin', 'superuser']"
5308 ));
5309 assert!(evaluate_custom_query(&doc, "user.level IN [1, 5, 10]"));
5310 assert!(!evaluate_custom_query(
5311 &doc,
5312 "user.role IN ['guest', 'user']"
5313 ));
5314 }
5315
5316 #[test]
5317 fn test_custom_query_compound_with_new_patterns() {
5318 let doc = create_test_doc(vec![
5320 ("status", serde_json::json!("active")),
5321 ("priority", serde_json::json!(1)),
5322 ("optional", serde_json::Value::Null),
5323 ]);
5324
5325 assert!(evaluate_custom_query(
5327 &doc,
5328 "status != 'deleted' AND priority IN [1, 2, 3]"
5329 ));
5330
5331 assert!(evaluate_custom_query(
5333 &doc,
5334 "optional IS NULL OR status LIKE 'act%'"
5335 ));
5336
5337 assert!(evaluate_custom_query(
5339 &doc,
5340 "NOT (status == 'inactive') AND priority != 0"
5341 ));
5342 }
5343
5344 #[test]
5345 fn test_match_like_pattern_unit() {
5346 assert!(match_like_pattern("hello world", "%world"));
5348 assert!(match_like_pattern("hello world", "hello%"));
5349 assert!(match_like_pattern("hello world", "%lo wo%"));
5350 assert!(match_like_pattern("hello world", "%"));
5351 assert!(match_like_pattern("hello world", "%%"));
5352 assert!(match_like_pattern("hello world", "hello world"));
5353 assert!(!match_like_pattern("hello world", "goodbye%"));
5354 assert!(!match_like_pattern("hello world", "%goodbye"));
5355 assert!(!match_like_pattern("hello", "hello world"));
5356 }
5357
5358 #[test]
5363 fn test_automerge_scalar_counter_extraction() {
5364 use automerge::ScalarValue;
5366
5367 let counter = ScalarValue::counter(42);
5369
5370 if let ScalarValue::Counter(c) = &counter {
5372 let value: i64 = i64::from(c);
5373 assert_eq!(value, 42, "Counter value should be 42");
5374 } else {
5375 panic!("Expected Counter variant");
5376 }
5377 }
5378
5379 #[test]
5380 fn test_automerge_scalar_to_json_all_types() {
5381 use automerge::ScalarValue;
5383
5384 let result = AutomergeBackend::automerge_scalar_to_json(&ScalarValue::Str("hello".into()));
5386 assert_eq!(result, Some(serde_json::json!("hello")));
5387
5388 let result = AutomergeBackend::automerge_scalar_to_json(&ScalarValue::Int(-42));
5390 assert_eq!(result, Some(serde_json::json!(-42)));
5391
5392 let result = AutomergeBackend::automerge_scalar_to_json(&ScalarValue::Uint(42));
5394 assert_eq!(result, Some(serde_json::json!(42)));
5395
5396 let result = AutomergeBackend::automerge_scalar_to_json(&ScalarValue::F64(1.234));
5398 assert!(result.is_some());
5399 if let Some(serde_json::Value::Number(n)) = result {
5400 assert!((n.as_f64().unwrap() - 1.234).abs() < 0.001);
5401 }
5402
5403 let result = AutomergeBackend::automerge_scalar_to_json(&ScalarValue::Boolean(true));
5405 assert_eq!(result, Some(serde_json::json!(true)));
5406
5407 let result = AutomergeBackend::automerge_scalar_to_json(&ScalarValue::Null);
5409 assert_eq!(result, Some(serde_json::Value::Null));
5410
5411 let result =
5413 AutomergeBackend::automerge_scalar_to_json(&ScalarValue::Timestamp(1234567890));
5414 assert_eq!(result, Some(serde_json::json!(1234567890)));
5415
5416 let counter = ScalarValue::counter(100);
5418 if let ScalarValue::Counter(c) = &counter {
5419 let result =
5420 AutomergeBackend::automerge_scalar_to_json(&ScalarValue::Counter(c.clone()));
5421 assert_eq!(
5422 result,
5423 Some(serde_json::json!(100)),
5424 "Counter should return actual value, not 0"
5425 );
5426 }
5427
5428 let result = AutomergeBackend::automerge_scalar_to_json(&ScalarValue::Bytes(vec![1, 2, 3]));
5430 assert_eq!(result, Some(serde_json::json!([1, 2, 3])));
5431 }
5432
5433 #[tokio::test]
5434 async fn test_nested_object_roundtrip() {
5435 let backend = AutomergeBackend::new();
5437 backend.initialize(deletion_test_config()).await.unwrap();
5438
5439 let nested_doc = Document::new(
5441 vec![
5442 ("name".to_string(), serde_json::json!("test")),
5443 (
5444 "metadata".to_string(),
5445 serde_json::json!({
5446 "version": 1,
5447 "author": "test_user"
5448 }),
5449 ),
5450 ("items".to_string(), serde_json::json!([1, 2, 3])),
5451 ]
5452 .into_iter()
5453 .collect(),
5454 );
5455
5456 let doc_id = backend
5457 .document_store()
5458 .upsert("nested_test", nested_doc)
5459 .await
5460 .unwrap();
5461
5462 let retrieved = backend
5464 .document_store()
5465 .get("nested_test", &doc_id)
5466 .await
5467 .unwrap()
5468 .expect("Document should exist");
5469
5470 assert_eq!(
5471 retrieved.fields.get("name"),
5472 Some(&serde_json::json!("test"))
5473 );
5474
5475 if let Some(metadata) = retrieved.fields.get("metadata") {
5477 assert!(metadata.is_object(), "metadata should be an object");
5478 if let Some(version) = metadata.get("version") {
5479 assert_eq!(version, &serde_json::json!(1));
5480 }
5481 if let Some(author) = metadata.get("author") {
5482 assert_eq!(author, &serde_json::json!("test_user"));
5483 }
5484 }
5485
5486 if let Some(items) = retrieved.fields.get("items") {
5488 assert!(items.is_array(), "items should be an array");
5489 assert_eq!(items, &serde_json::json!([1, 2, 3]));
5490 }
5491 }
5492}