Skip to main content

peat_protocol/sync/
automerge.rs

1//! Automerge-based implementation of DataSyncBackend
2//!
3//! This module provides a CRDT backend using the Automerge library (v0.7.1),
4//! enabling eventual consistency without requiring Ditto SDK.
5//!
6//! # Architecture
7//!
8//! - **Documents**: Stored as Automerge CRDTs indexed by collection:id
9//! - **Sync Protocol**: Uses Automerge's built-in sync state machine
10//! - **Query Engine**: In-memory filtering on exported JSON
11//!
12//! # Example
13//!
14//! ```text
15//! use peat_protocol::sync::automerge::AutomergeBackend;
16//! use peat_protocol::sync::traits::*;
17//! use peat_protocol::sync::types::*;
18//!
19//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
20//! let backend = AutomergeBackend::new();
21//! backend.initialize(BackendConfig::default()).await?;
22//!
23//! let doc_store = backend.document_store();
24//! // Use DocumentStore API...
25//! # Ok(())
26//! # }
27//! ```
28
29use async_trait::async_trait;
30use automerge::{sync, sync::SyncDoc, transaction::Transactable, Automerge};
31use std::collections::HashMap;
32use std::sync::atomic::{AtomicU64, Ordering};
33use std::sync::{Arc, Mutex};
34use std::time::Duration;
35use tokio::sync::mpsc;
36
37use crate::error::{Error, Result};
38use crate::qos::{DeletionPolicy, DeletionPolicyRegistry, Tombstone, TombstoneSyncMessage};
39#[cfg(feature = "automerge-backend")]
40use crate::storage::automerge_conversion::automerge_to_message;
41use crate::sync::traits::*;
42use crate::sync::types::*;
43
/// Automerge-based backend for CRDT synchronization
///
/// This backend implements all DataSyncBackend traits using Automerge as the
/// underlying CRDT library, providing an alternative to Ditto.
///
/// Every field is wrapped in an `Arc`, so the derived `Clone` yields another
/// handle onto the same shared state rather than an independent copy.
#[derive(Clone)]
pub struct AutomergeBackend {
    /// Automerge documents indexed by collection:id key (built by `doc_key`)
    documents: Arc<Mutex<HashMap<String, Automerge>>>,

    /// Sync states for each peer:document pair, keyed by
    /// `peer_id:collection:id`
    sync_states: Arc<Mutex<HashMap<String, sync::State>>>,

    /// Configuration; `None` until one has been stored. Its `app_id` is used
    /// as the node ID when tombstones are created.
    config: Arc<Mutex<Option<BackendConfig>>>,

    /// Initialized flag (starts out `false`)
    initialized: Arc<Mutex<bool>>,

    /// Change notification channels for observers; one sender is added per
    /// `observe` call
    observers: Arc<Mutex<Vec<mpsc::UnboundedSender<ChangeEvent>>>>,

    /// Tombstone storage indexed by collection:doc_id (ADR-034)
    tombstones: Arc<Mutex<HashMap<String, Tombstone>>>,

    /// Deletion policy registry (ADR-034)
    deletion_policy_registry: Arc<DeletionPolicyRegistry>,

    /// Monotonic Lamport counter for tombstone ordering (ADR-034, Issue #668)
    lamport_counter: Arc<AtomicU64>,

    /// Pending tombstones awaiting propagation to peers (ADR-034, Issue #668);
    /// emptied by `drain_pending_tombstones`
    pending_tombstones: Arc<Mutex<Vec<TombstoneSyncMessage>>>,

    /// GC task handle for periodic tombstone/TTL cleanup (ADR-034, Issue #668)
    gc_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
}
80
81impl AutomergeBackend {
82    /// Create new AutomergeBackend
83    ///
84    /// # Example
85    ///
86    /// ```
87    /// use peat_protocol::sync::automerge::AutomergeBackend;
88    ///
89    /// let backend = AutomergeBackend::new();
90    /// ```
91    pub fn new() -> Self {
92        Self {
93            documents: Arc::new(Mutex::new(HashMap::new())),
94            sync_states: Arc::new(Mutex::new(HashMap::new())),
95            config: Arc::new(Mutex::new(None)),
96            initialized: Arc::new(Mutex::new(false)),
97            observers: Arc::new(Mutex::new(Vec::new())),
98            tombstones: Arc::new(Mutex::new(HashMap::new())),
99            deletion_policy_registry: Arc::new(DeletionPolicyRegistry::new()),
100            lamport_counter: Arc::new(AtomicU64::new(0)),
101            pending_tombstones: Arc::new(Mutex::new(Vec::new())),
102            gc_handle: Arc::new(Mutex::new(None)),
103        }
104    }
105
106    /// Helper: Generate document key from collection and ID
107    fn doc_key(collection: &str, doc_id: &DocumentId) -> String {
108        format!("{}:{}", collection, doc_id)
109    }
110
111    /// Get the node ID for tombstone attribution (ADR-034, Issue #668)
112    ///
113    /// Uses the app_id from BackendConfig if initialized, falls back to "local".
114    fn node_id(&self) -> String {
115        self.config
116            .lock()
117            .ok()
118            .and_then(|c| c.as_ref().map(|cfg| cfg.app_id.clone()))
119            .unwrap_or_else(|| "local".to_string())
120    }
121
122    /// Get the next Lamport timestamp for tombstone ordering (ADR-034, Issue #668)
123    fn next_lamport(&self) -> u64 {
124        self.lamport_counter.fetch_add(1, Ordering::SeqCst)
125    }
126
127    /// Drain pending tombstones for propagation to peers (ADR-034, Issue #668)
128    ///
129    /// Called by the sync coordinator to retrieve tombstones created since
130    /// the last drain. Returns and clears the pending queue.
131    pub fn drain_pending_tombstones(&self) -> Vec<TombstoneSyncMessage> {
132        self.pending_tombstones
133            .lock()
134            .map(|mut q| std::mem::take(&mut *q))
135            .unwrap_or_default()
136    }
137
138    /// Helper: Convert Automerge document to our Document type
139    ///
140    /// Issue #518: Now supports nested objects and proper Counter extraction.
141    fn automerge_to_document(doc: &Automerge, doc_id: DocumentId) -> Result<Document> {
142        use automerge::ReadDoc;
143
144        let mut fields = HashMap::new();
145
146        // Try to read from the root/root path
147        if let Ok(Some((automerge::Value::Object(automerge::ObjType::Map), obj_id))) =
148            doc.get(automerge::ROOT, "root")
149        {
150            // Iterate over the map entries
151            for item in doc.map_range(&obj_id, ..) {
152                let key_str = item.key.to_string();
153                if let Ok(Some((value, nested_id))) = doc.get(&obj_id, &item.key as &str) {
154                    // Convert the Automerge value to serde_json::Value
155                    // Pass the nested_id for recursive object traversal
156                    if let Some(json_val) = Self::automerge_value_to_json(doc, &value, &nested_id) {
157                        fields.insert(key_str, json_val);
158                    }
159                }
160            }
161        }
162
163        Ok(Document {
164            id: Some(doc_id),
165            fields,
166            updated_at: std::time::SystemTime::now(),
167        })
168    }
169
170    /// Helper: Convert Automerge value to serde_json::Value with nested object support.
171    ///
172    /// Issue #518: This function properly handles:
173    /// - Counter values (extracts actual i64 value)
174    /// - Nested objects (recursively traverses Maps and Lists)
175    /// - All scalar types
176    ///
177    /// # Arguments
178    /// * `doc` - The Automerge document for nested object traversal
179    /// * `value` - The Automerge value to convert
180    /// * `obj_id` - The object ID (used when value is an Object type)
181    fn automerge_value_to_json(
182        doc: &Automerge,
183        value: &automerge::Value,
184        obj_id: &automerge::ObjId,
185    ) -> Option<Value> {
186        use automerge::ReadDoc;
187
188        match value {
189            automerge::Value::Scalar(scalar) => Self::automerge_scalar_to_json(scalar.as_ref()),
190            automerge::Value::Object(obj_type) => {
191                // Issue #518: Recursively convert nested objects
192                match obj_type {
193                    automerge::ObjType::Map | automerge::ObjType::Table => {
194                        // Convert map to JSON object
195                        let mut map = serde_json::Map::new();
196                        for item in doc.map_range(obj_id, ..) {
197                            let key = item.key.to_string();
198                            if let Ok(Some((nested_value, nested_obj_id))) =
199                                doc.get(obj_id, &item.key as &str)
200                            {
201                                if let Some(json_val) = Self::automerge_value_to_json(
202                                    doc,
203                                    &nested_value,
204                                    &nested_obj_id,
205                                ) {
206                                    map.insert(key, json_val);
207                                }
208                            }
209                        }
210                        Some(Value::Object(map))
211                    }
212                    automerge::ObjType::List => {
213                        // Convert list to JSON array
214                        let length = doc.length(obj_id);
215                        let mut arr = Vec::with_capacity(length);
216                        for idx in 0..length {
217                            if let Ok(Some((nested_value, nested_obj_id))) = doc.get(obj_id, idx) {
218                                if let Some(json_val) = Self::automerge_value_to_json(
219                                    doc,
220                                    &nested_value,
221                                    &nested_obj_id,
222                                ) {
223                                    arr.push(json_val);
224                                }
225                            }
226                        }
227                        Some(Value::Array(arr))
228                    }
229                    automerge::ObjType::Text => {
230                        // Convert text to string
231                        let text = doc.text(obj_id).ok()?;
232                        Some(Value::String(text))
233                    }
234                }
235            }
236        }
237    }
238
239    /// Helper: Convert Automerge scalar value to serde_json::Value
240    ///
241    /// Issue #518: Counter values now properly extract the i64 value.
242    fn automerge_scalar_to_json(scalar: &automerge::ScalarValue) -> Option<Value> {
243        let json_val = match scalar {
244            automerge::ScalarValue::Str(s) => Value::String(s.to_string()),
245            automerge::ScalarValue::Int(i) => Value::Number(serde_json::Number::from(*i)),
246            automerge::ScalarValue::Uint(u) => Value::Number(serde_json::Number::from(*u)),
247            automerge::ScalarValue::F64(f) => serde_json::Number::from_f64(*f)
248                .map(Value::Number)
249                .unwrap_or(Value::Null),
250            automerge::ScalarValue::Boolean(b) => Value::Bool(*b),
251            automerge::ScalarValue::Null => Value::Null,
252            automerge::ScalarValue::Bytes(bytes) => {
253                // Encode bytes as array of numbers
254                let byte_array: Vec<serde_json::Value> = bytes
255                    .iter()
256                    .map(|b| Value::Number(serde_json::Number::from(*b)))
257                    .collect();
258                Value::Array(byte_array)
259            }
260            automerge::ScalarValue::Counter(c) => {
261                // Issue #518: Extract actual counter value using From<&Counter> for i64
262                // The Counter type implements From trait to convert to i64
263                let counter_value: i64 = i64::from(c);
264                Value::Number(serde_json::Number::from(counter_value))
265            }
266            automerge::ScalarValue::Timestamp(ts) => Value::Number(serde_json::Number::from(*ts)),
267            automerge::ScalarValue::Unknown { .. } => Value::Null,
268        };
269        Some(json_val)
270    }
271
272    /// Helper: Apply Document fields to Automerge doc
273    fn apply_document_to_automerge(doc: &mut Automerge, document: &Document) -> Result<()> {
274        doc.transact::<_, _, automerge::AutomergeError>(|tx| {
275            // Create or get root map
276            let root = tx.put_object(automerge::ROOT, "root", automerge::ObjType::Map)?;
277
278            for (key, value) in &document.fields {
279                // Convert serde_json::Value to Automerge scalar
280                Self::put_json_value(tx, &root, key, value)?;
281            }
282
283            Ok(())
284        })
285        .map_err(|e| Error::Internal(format!("Transaction failed: {:?}", e)))?;
286
287        Ok(())
288    }
289
290    /// Helper: Put JSON value into Automerge
291    fn put_json_value(
292        tx: &mut automerge::transaction::Transaction,
293        obj: &automerge::ObjId,
294        key: &str,
295        value: &serde_json::Value,
296    ) -> std::result::Result<(), automerge::AutomergeError> {
297        use serde_json::Value;
298
299        match value {
300            Value::String(s) => {
301                tx.put(obj, key, s.clone())?;
302            }
303            Value::Number(n) => {
304                if let Some(i) = n.as_i64() {
305                    tx.put(obj, key, i)?;
306                } else if let Some(f) = n.as_f64() {
307                    tx.put(obj, key, f)?;
308                }
309            }
310            Value::Bool(b) => {
311                tx.put(obj, key, *b)?;
312            }
313            Value::Null => {
314                // Skip null values
315            }
316            Value::Array(arr) => {
317                // Create list
318                let list = tx.put_object(obj, key, automerge::ObjType::List)?;
319                for (idx, item) in arr.iter().enumerate() {
320                    Self::insert_json_value(tx, &list, idx, item)?;
321                }
322            }
323            Value::Object(map) => {
324                // Create nested map
325                let nested = tx.put_object(obj, key, automerge::ObjType::Map)?;
326                for (k, v) in map {
327                    Self::put_json_value(tx, &nested, k, v)?;
328                }
329            }
330        }
331
332        Ok(())
333    }
334
335    /// Helper: Insert JSON value into Automerge list
336    fn insert_json_value(
337        tx: &mut automerge::transaction::Transaction,
338        list: &automerge::ObjId,
339        index: usize,
340        value: &serde_json::Value,
341    ) -> std::result::Result<(), automerge::AutomergeError> {
342        use serde_json::Value;
343
344        match value {
345            Value::String(s) => {
346                tx.insert(list, index, s.clone())?;
347            }
348            Value::Number(n) => {
349                if let Some(i) = n.as_i64() {
350                    tx.insert(list, index, i)?;
351                } else if let Some(f) = n.as_f64() {
352                    tx.insert(list, index, f)?;
353                }
354            }
355            Value::Bool(b) => {
356                tx.insert(list, index, *b)?;
357            }
358            Value::Null => {
359                // Skip null values
360            }
361            Value::Array(_) | Value::Object(_) => {
362                // For complex nested types, serialize as JSON string
363                let json_str =
364                    serde_json::to_string(value).map_err(|_| automerge::AutomergeError::Fail)?;
365                tx.insert(list, index, json_str)?;
366            }
367        }
368
369        Ok(())
370    }
371
372    /// Helper: Check if document matches query
373    fn matches_query(&self, document: &Document, query: &Query) -> Result<bool> {
374        match query {
375            Query::All => Ok(true),
376
377            Query::Eq { field, value } => {
378                if let Some(doc_value) = document.fields.get(field) {
379                    Ok(doc_value == value)
380                } else {
381                    Ok(false)
382                }
383            }
384
385            Query::Lt { field, value } => {
386                if let Some(doc_value) = document.fields.get(field) {
387                    Ok(self.compare_values(doc_value, value)? < 0)
388                } else {
389                    Ok(false)
390                }
391            }
392
393            Query::Gt { field, value } => {
394                if let Some(doc_value) = document.fields.get(field) {
395                    Ok(self.compare_values(doc_value, value)? > 0)
396                } else {
397                    Ok(false)
398                }
399            }
400
401            Query::And(queries) => {
402                for q in queries {
403                    if !self.matches_query(document, q)? {
404                        return Ok(false);
405                    }
406                }
407                Ok(true)
408            }
409
410            Query::Or(queries) => {
411                for q in queries {
412                    if self.matches_query(document, q)? {
413                        return Ok(true);
414                    }
415                }
416                Ok(false)
417            }
418
419            // === Negation query (Issue #357) ===
420            Query::Not(inner) => Ok(!self.matches_query(document, inner)?),
421
422            // === Custom query support (Issue #517) ===
423            // Evaluate DQL-like custom queries using pattern-based parser
424            Query::Custom(query_str) => Ok(evaluate_custom_query(document, query_str)),
425
426            // === Spatial queries (Issue #356) ===
427            Query::WithinRadius {
428                center,
429                radius_meters,
430                lat_field,
431                lon_field,
432            } => {
433                let lat_key = lat_field.as_deref().unwrap_or("lat");
434                let lon_key = lon_field.as_deref().unwrap_or("lon");
435
436                if let (Some(lat_val), Some(lon_val)) = (
437                    document.fields.get(lat_key).and_then(|v| v.as_f64()),
438                    document.fields.get(lon_key).and_then(|v| v.as_f64()),
439                ) {
440                    let doc_point = GeoPoint::new(lat_val, lon_val);
441                    Ok(doc_point.within_radius(center, *radius_meters))
442                } else {
443                    Ok(false)
444                }
445            }
446
447            Query::WithinBounds {
448                min,
449                max,
450                lat_field,
451                lon_field,
452            } => {
453                let lat_key = lat_field.as_deref().unwrap_or("lat");
454                let lon_key = lon_field.as_deref().unwrap_or("lon");
455
456                if let (Some(lat_val), Some(lon_val)) = (
457                    document.fields.get(lat_key).and_then(|v| v.as_f64()),
458                    document.fields.get(lon_key).and_then(|v| v.as_f64()),
459                ) {
460                    let doc_point = GeoPoint::new(lat_val, lon_val);
461                    Ok(doc_point.within_bounds(min, max))
462                } else {
463                    Ok(false)
464                }
465            }
466
467            // === Deletion-aware queries (ADR-034, Issue #369) ===
468            Query::IncludeDeleted(inner) => {
469                // IncludeDeleted wraps another query - run the inner query
470                // The soft-delete filter bypass is handled at the query() method level
471                self.matches_query(document, inner)
472            }
473
474            Query::DeletedOnly => {
475                // Only match documents with _deleted=true
476                let is_deleted = document
477                    .fields
478                    .get("_deleted")
479                    .and_then(|v| v.as_bool())
480                    .unwrap_or(false);
481                Ok(is_deleted)
482            }
483        }
484    }
485
486    /// Helper: Compare two JSON values
487    fn compare_values(&self, a: &serde_json::Value, b: &serde_json::Value) -> Result<i8> {
488        use serde_json::Value;
489
490        match (a, b) {
491            (Value::Number(a_num), Value::Number(b_num)) => {
492                if let (Some(a_f), Some(b_f)) = (a_num.as_f64(), b_num.as_f64()) {
493                    if a_f < b_f {
494                        Ok(-1)
495                    } else if a_f > b_f {
496                        Ok(1)
497                    } else {
498                        Ok(0)
499                    }
500                } else {
501                    Err(Error::Internal("Number comparison failed".into()))
502                }
503            }
504            (Value::String(a_str), Value::String(b_str)) => {
505                if a_str < b_str {
506                    Ok(-1)
507                } else if a_str > b_str {
508                    Ok(1)
509                } else {
510                    Ok(0)
511                }
512            }
513            _ => Err(Error::Internal("Unsupported value comparison".into())),
514        }
515    }
516
517    /// Generate sync message for a document
518    ///
519    /// This uses Automerge's sync protocol to generate a message containing
520    /// the changes needed to sync with a peer.
521    pub fn generate_sync_message(
522        &self,
523        collection: &str,
524        doc_id: &DocumentId,
525        peer_id: &str,
526    ) -> Result<Vec<u8>> {
527        let key = Self::doc_key(collection, doc_id);
528        let docs = self
529            .documents
530            .lock()
531            .map_err(|_| Error::Internal("documents lock poisoned".into()))?;
532
533        let automerge_doc = docs.get(&key).ok_or_else(|| Error::NotFound {
534            resource_type: "Document".to_string(),
535            id: doc_id.clone(),
536        })?;
537
538        // Get or create sync state for this peer
539        let mut sync_states = self
540            .sync_states
541            .lock()
542            .map_err(|_| Error::Internal("sync_states lock poisoned".into()))?;
543        let sync_state = sync_states
544            .entry(format!("{}:{}", peer_id, key))
545            .or_default();
546
547        // Generate sync message
548        let message = automerge_doc.generate_sync_message(sync_state);
549
550        // Encode message (handle Option)
551        match message {
552            Some(msg) => Ok(msg.encode()),
553            None => Ok(Vec::new()), // No changes to sync
554        }
555    }
556
557    /// Receive and apply sync message
558    ///
559    /// This applies changes from a peer's sync message to our local document.
560    pub fn receive_sync_message(
561        &self,
562        collection: &str,
563        doc_id: &DocumentId,
564        peer_id: &str,
565        message: &[u8],
566    ) -> Result<()> {
567        let key = Self::doc_key(collection, doc_id);
568        let mut docs = self
569            .documents
570            .lock()
571            .map_err(|_| Error::Internal("documents lock poisoned".into()))?;
572
573        let automerge_doc = docs.get_mut(&key).ok_or_else(|| Error::NotFound {
574            resource_type: "Document".to_string(),
575            id: doc_id.clone(),
576        })?;
577
578        // Decode message
579        let sync_message = sync::Message::decode(message)
580            .map_err(|e| Error::Internal(format!("Message decode failed: {:?}", e)))?;
581
582        // Get sync state
583        let mut sync_states = self
584            .sync_states
585            .lock()
586            .map_err(|_| Error::Internal("sync_states lock poisoned".into()))?;
587        let sync_state = sync_states
588            .entry(format!("{}:{}", peer_id, key))
589            .or_default();
590
591        // Apply sync message
592        automerge_doc
593            .receive_sync_message(sync_state, sync_message)
594            .map_err(|e| Error::Internal(format!("Sync message apply failed: {:?}", e)))?;
595
596        Ok(())
597    }
598}
599
/// `Default` delegates to [`AutomergeBackend::new`], producing an empty
/// backend with no stored configuration.
impl Default for AutomergeBackend {
    /// Equivalent to calling [`AutomergeBackend::new`].
    fn default() -> Self {
        Self::new()
    }
}
605
606// ============================================================================
607// DocumentStore Trait Implementation
608// ============================================================================
609
610#[async_trait]
611impl DocumentStore for AutomergeBackend {
612    async fn upsert(&self, collection: &str, mut document: Document) -> anyhow::Result<DocumentId> {
613        // Generate ID if not present
614        let doc_id = document
615            .id
616            .clone()
617            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
618
619        let key = Self::doc_key(collection, &doc_id);
620        let mut docs = self
621            .documents
622            .lock()
623            .map_err(|_| Error::Internal("documents lock poisoned".into()))?;
624
625        if let Some(existing_doc) = docs.get_mut(&key) {
626            // Update existing document
627            Self::apply_document_to_automerge(existing_doc, &document)?;
628        } else {
629            // Create new document
630            let mut automerge_doc = Automerge::new();
631            Self::apply_document_to_automerge(&mut automerge_doc, &document)?;
632            docs.insert(key, automerge_doc);
633        }
634
635        document.id = Some(doc_id.clone());
636
637        // Notify observers
638        drop(docs); // Release lock before notifying
639        let observers = self
640            .observers
641            .lock()
642            .map_err(|_| Error::Internal("observers lock poisoned".into()))?;
643        for observer in observers.iter() {
644            let _ = observer.send(ChangeEvent::Updated {
645                collection: collection.to_string(),
646                document: document.clone(),
647            });
648        }
649        drop(observers);
650
651        Ok(doc_id)
652    }
653
654    async fn query(&self, collection: &str, query: &Query) -> anyhow::Result<Vec<Document>> {
655        let docs = self
656            .documents
657            .lock()
658            .map_err(|_| Error::Internal("documents lock poisoned".into()))?;
659        let mut results = Vec::new();
660
661        // Iterate all documents in collection
662        for (key, automerge_doc) in docs.iter() {
663            if !key.starts_with(&format!("{}:", collection)) {
664                continue;
665            }
666
667            // Extract document ID from key
668            let doc_id = key.split(':').nth(1).unwrap_or("").to_string();
669
670            // Convert to our Document type
671            let document = Self::automerge_to_document(automerge_doc, doc_id)?;
672
673            // Apply soft-delete filter (ADR-034, Issue #369)
674            // By default, queries exclude documents with _deleted=true
675            // IncludeDeleted and DeletedOnly queries override this behavior
676            if !query.matches_deletion_state(&document) {
677                continue;
678            }
679
680            // Apply query filter
681            if self.matches_query(&document, query)? {
682                results.push(document);
683            }
684        }
685
686        Ok(results)
687    }
688
689    async fn remove(&self, collection: &str, doc_id: &DocumentId) -> anyhow::Result<()> {
690        let key = Self::doc_key(collection, doc_id);
691        let mut docs = self
692            .documents
693            .lock()
694            .map_err(|_| Error::Internal("documents lock poisoned".into()))?;
695
696        docs.remove(&key).ok_or_else(|| Error::NotFound {
697            resource_type: "Document".to_string(),
698            id: doc_id.clone(),
699        })?;
700
701        // Notify observers
702        drop(docs); // Release lock before notifying
703        let observers = self
704            .observers
705            .lock()
706            .map_err(|_| Error::Internal("observers lock poisoned".into()))?;
707        for observer in observers.iter() {
708            let _ = observer.send(ChangeEvent::Removed {
709                collection: collection.to_string(),
710                doc_id: doc_id.clone(),
711            });
712        }
713        drop(observers);
714
715        Ok(())
716    }
717
718    async fn get(&self, collection: &str, doc_id: &DocumentId) -> anyhow::Result<Option<Document>> {
719        let key = Self::doc_key(collection, doc_id);
720        let docs = self
721            .documents
722            .lock()
723            .map_err(|_| Error::Internal("documents lock poisoned".into()))?;
724
725        if let Some(automerge_doc) = docs.get(&key) {
726            let document = Self::automerge_to_document(automerge_doc, doc_id.clone())?;
727            Ok(Some(document))
728        } else {
729            Ok(None)
730        }
731    }
732
733    async fn count(&self, collection: &str, query: &Query) -> anyhow::Result<usize> {
734        let results = self.query(collection, query).await?;
735        Ok(results.len())
736    }
737
738    fn observe(&self, collection: &str, query: &Query) -> anyhow::Result<ChangeStream> {
739        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
740
741        // Send initial snapshot of matching documents
742        let docs = self
743            .documents
744            .lock()
745            .map_err(|_| Error::Internal("documents lock poisoned".into()))?;
746        let mut initial_docs = Vec::new();
747
748        for (key, automerge_doc) in docs.iter() {
749            if !key.starts_with(&format!("{}:", collection)) {
750                continue;
751            }
752
753            let doc_id = key.split(':').nth(1).unwrap_or("").to_string();
754            if let Ok(document) = Self::automerge_to_document(automerge_doc, doc_id) {
755                if self.matches_query(&document, query).unwrap_or(false) {
756                    initial_docs.push(document);
757                }
758            }
759        }
760
761        drop(docs); // Release lock
762
763        // Send initial snapshot
764        let _ = tx.send(ChangeEvent::Initial {
765            documents: initial_docs,
766        });
767
768        // Register this observer for future updates
769        self.observers
770            .lock()
771            .map_err(|_| Error::Internal("observers lock poisoned".into()))?
772            .push(tx.clone());
773
774        Ok(ChangeStream { receiver: rx })
775    }
776
777    // === Deletion methods (ADR-034) ===
778
779    async fn delete(
780        &self,
781        collection: &str,
782        doc_id: &DocumentId,
783        reason: Option<&str>,
784    ) -> anyhow::Result<crate::qos::DeleteResult> {
785        let policy = self.deletion_policy(collection);
786
787        match policy {
788            DeletionPolicy::Immutable => {
789                // Cannot delete immutable documents
790                Ok(crate::qos::DeleteResult::immutable())
791            }
792            DeletionPolicy::ImplicitTTL { .. } => {
793                // Implicit TTL: no-op, documents expire automatically
794                Ok(crate::qos::DeleteResult {
795                    deleted: false,
796                    tombstone_id: None,
797                    expires_at: None,
798                    policy: policy.clone(),
799                })
800            }
801            DeletionPolicy::Tombstone {
802                tombstone_ttl,
803                delete_wins: _,
804            } => {
805                // Create tombstone with real node ID and Lamport (Issue #668)
806                let node_id = self.node_id();
807                let lamport = self.next_lamport();
808                let tombstone = if let Some(reason_str) = reason {
809                    Tombstone::with_reason(
810                        doc_id.clone(),
811                        collection.to_string(),
812                        node_id,
813                        lamport,
814                        reason_str,
815                    )
816                } else {
817                    Tombstone::new(doc_id.clone(), collection.to_string(), node_id, lamport)
818                };
819                let tombstone_id = format!("{}:{}", collection, doc_id);
820
821                // Store tombstone
822                self.tombstones
823                    .lock()
824                    .map_err(|_| Error::Internal("tombstones lock poisoned".into()))?
825                    .insert(tombstone_id.clone(), tombstone.clone());
826
827                // Enqueue for propagation to peers (Issue #668)
828                if let Ok(mut pending) = self.pending_tombstones.lock() {
829                    pending.push(TombstoneSyncMessage::from_tombstone(tombstone));
830                }
831
832                // Remove the actual document
833                self.remove(collection, doc_id).await.ok(); // Ignore if not found
834
835                Ok(crate::qos::DeleteResult {
836                    deleted: true,
837                    tombstone_id: Some(tombstone_id),
838                    expires_at: Some(std::time::SystemTime::now() + tombstone_ttl),
839                    policy: policy.clone(),
840                })
841            }
842            DeletionPolicy::SoftDelete {
843                include_deleted_default: _,
844            } => {
845                // Soft delete: mark document with _deleted=true
846                if let Some(mut doc) = self.get(collection, doc_id).await? {
847                    doc.fields.insert("_deleted".to_string(), Value::Bool(true));
848                    doc.fields.insert(
849                        "_deleted_at".to_string(),
850                        Value::String(chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string()),
851                    );
852                    if let Some(reason) = reason {
853                        doc.fields.insert(
854                            "_deleted_reason".to_string(),
855                            Value::String(reason.to_string()),
856                        );
857                    }
858                    self.upsert(collection, doc).await?;
859
860                    Ok(crate::qos::DeleteResult::soft_deleted(policy.clone()))
861                } else {
862                    // Document not found - still report as deleted
863                    Ok(crate::qos::DeleteResult {
864                        deleted: false,
865                        tombstone_id: None,
866                        expires_at: None,
867                        policy: policy.clone(),
868                    })
869                }
870            }
871        }
872    }
873
874    async fn is_deleted(&self, collection: &str, doc_id: &DocumentId) -> anyhow::Result<bool> {
875        let key = format!("{}:{}", collection, doc_id);
876
877        // Check if there's a tombstone
878        if self
879            .tombstones
880            .lock()
881            .map_err(|_| Error::Internal("tombstones lock poisoned".into()))?
882            .contains_key(&key)
883        {
884            return Ok(true);
885        }
886
887        // Check for soft-delete (_deleted field)
888        if let Some(doc) = self.get(collection, doc_id).await? {
889            if let Some(deleted) = doc.fields.get("_deleted") {
890                return Ok(deleted.as_bool().unwrap_or(false));
891            }
892        }
893
894        Ok(false)
895    }
896
897    fn deletion_policy(&self, collection: &str) -> crate::qos::DeletionPolicy {
898        self.deletion_policy_registry.get(collection)
899    }
900
901    async fn get_tombstones(&self, collection: &str) -> anyhow::Result<Vec<crate::qos::Tombstone>> {
902        let tombstones = self
903            .tombstones
904            .lock()
905            .map_err(|_| Error::Internal("tombstones lock poisoned".into()))?;
906        let prefix = format!("{}:", collection);
907
908        Ok(tombstones
909            .iter()
910            .filter(|(key, _)| key.starts_with(&prefix))
911            .map(|(_, tombstone)| tombstone.clone())
912            .collect())
913    }
914
915    async fn apply_tombstone(&self, tombstone: &crate::qos::Tombstone) -> anyhow::Result<()> {
916        let key = format!("{}:{}", tombstone.collection, tombstone.document_id);
917
918        // Store the tombstone
919        self.tombstones
920            .lock()
921            .map_err(|_| Error::Internal("tombstones lock poisoned".into()))?
922            .insert(key, tombstone.clone());
923
924        // Remove the document if it exists
925        self.remove(&tombstone.collection, &tombstone.document_id)
926            .await
927            .ok();
928
929        Ok(())
930    }
931}
932
933// ============================================================================
934// GcStore Trait Implementation (ADR-034, Issue #668)
935// ============================================================================
936
937impl crate::qos::GcStore for AutomergeBackend {
938    fn get_all_tombstones(&self) -> anyhow::Result<Vec<Tombstone>> {
939        Ok(self
940            .tombstones
941            .lock()
942            .map_err(|_| Error::Internal("tombstones lock poisoned".into()))?
943            .values()
944            .cloned()
945            .collect())
946    }
947
948    fn remove_tombstone(&self, collection: &str, document_id: &str) -> anyhow::Result<bool> {
949        let key = format!("{}:{}", collection, document_id);
950        Ok(self
951            .tombstones
952            .lock()
953            .map_err(|_| Error::Internal("tombstones lock poisoned".into()))?
954            .remove(&key)
955            .is_some())
956    }
957
958    fn has_tombstone(&self, collection: &str, document_id: &str) -> anyhow::Result<bool> {
959        let key = format!("{}:{}", collection, document_id);
960        Ok(self
961            .tombstones
962            .lock()
963            .map_err(|_| Error::Internal("tombstones lock poisoned".into()))?
964            .contains_key(&key))
965    }
966
967    fn get_expired_documents(
968        &self,
969        _collection: &str,
970        _cutoff: std::time::SystemTime,
971    ) -> anyhow::Result<Vec<String>> {
972        // In-memory backend doesn't track document timestamps for TTL expiry
973        Ok(Vec::new())
974    }
975
976    fn hard_delete(&self, collection: &str, document_id: &str) -> anyhow::Result<()> {
977        let key = format!("{}:{}", collection, document_id);
978        self.documents
979            .lock()
980            .map_err(|_| Error::Internal("documents lock poisoned".into()))?
981            .remove(&key);
982        Ok(())
983    }
984
985    fn list_collections(&self) -> anyhow::Result<Vec<String>> {
986        let docs = self
987            .documents
988            .lock()
989            .map_err(|_| Error::Internal("documents lock poisoned".into()))?;
990        let mut collections: Vec<String> = docs
991            .keys()
992            .filter_map(|k| k.split(':').next().map(|s| s.to_string()))
993            .collect::<std::collections::HashSet<_>>()
994            .into_iter()
995            .collect();
996        collections.sort();
997        Ok(collections)
998    }
999}
1000
1001// ============================================================================
1002// PeerDiscovery Trait Implementation
1003// ============================================================================
1004
1005#[async_trait]
1006impl PeerDiscovery for AutomergeBackend {
1007    async fn start(&self) -> anyhow::Result<()> {
1008        // Manual peer discovery only for initial implementation
1009        // Full implementation would support mDNS, etc.
1010        Ok(())
1011    }
1012
1013    async fn stop(&self) -> anyhow::Result<()> {
1014        Ok(())
1015    }
1016
1017    async fn discovered_peers(&self) -> anyhow::Result<Vec<PeerInfo>> {
1018        // Return empty - manual configuration required
1019        Ok(Vec::new())
1020    }
1021
1022    async fn add_peer(&self, _address: &str, _transport: TransportType) -> anyhow::Result<()> {
1023        // Manual peer addition not implemented in initial version
1024        Ok(())
1025    }
1026
1027    async fn wait_for_peer(&self, _peer_id: &PeerId, _timeout: Duration) -> anyhow::Result<()> {
1028        // Peer waiting not implemented in initial version
1029        Err(Error::Internal("wait_for_peer not implemented".into()).into())
1030    }
1031
1032    fn on_peer_event(&self, _callback: Box<dyn Fn(PeerEvent) + Send + Sync>) {
1033        // Callback registration not implemented in initial version
1034        // Would store in a Vec for future notifications
1035    }
1036
1037    async fn get_peer_info(&self, _peer_id: &PeerId) -> anyhow::Result<Option<PeerInfo>> {
1038        // Peer info lookup not implemented in initial version
1039        Ok(None)
1040    }
1041}
1042
1043// ============================================================================
1044// SyncEngine Trait Implementation
1045// ============================================================================
1046
1047#[async_trait]
1048impl SyncEngine for AutomergeBackend {
1049    async fn start_sync(&self) -> anyhow::Result<()> {
1050        // For Automerge, sync is pull-based via generate/receive_sync_message
1051        // This method indicates we're ready to sync
1052        Ok(())
1053    }
1054
1055    async fn stop_sync(&self) -> anyhow::Result<()> {
1056        // Clean up sync states
1057        self.sync_states
1058            .lock()
1059            .map_err(|_| Error::Internal("sync_states lock poisoned".into()))?
1060            .clear();
1061        Ok(())
1062    }
1063
1064    async fn subscribe(
1065        &self,
1066        collection: &str,
1067        _query: &Query,
1068    ) -> anyhow::Result<SyncSubscription> {
1069        // Create subscription handle
1070        // For Automerge, subscriptions are logical - we track interest
1071        Ok(SyncSubscription::new(
1072            collection.to_string(),
1073            Box::new(AutomergeSubscriptionHandle {
1074                collection: collection.to_string(),
1075            }),
1076        ))
1077    }
1078
1079    async fn is_syncing(&self) -> anyhow::Result<bool> {
1080        // Always ready to sync with Automerge
1081        Ok(self.is_ready().await)
1082    }
1083}
1084
/// Subscription handle for Automerge
///
/// Created by `SyncEngine::subscribe` and boxed into the returned
/// `SyncSubscription`. It carries no behavior of its own; it only records
/// which collection the subscription was requested for.
struct AutomergeSubscriptionHandle {
    // Stored for bookkeeping only; nothing in the visible code reads it back,
    // hence the dead_code allowance.
    #[allow(dead_code)]
    collection: String,
}
1090
1091// ============================================================================
1092// DataSyncBackend Trait Implementation
1093// ============================================================================
1094
1095#[async_trait]
1096impl DataSyncBackend for AutomergeBackend {
1097    async fn initialize(&self, config: BackendConfig) -> anyhow::Result<()> {
1098        let mut initialized = self
1099            .initialized
1100            .lock()
1101            .map_err(|_| Error::Internal("initialized lock poisoned".into()))?;
1102        if *initialized {
1103            return Err(Error::Internal("Already initialized".into()).into());
1104        }
1105
1106        *self
1107            .config
1108            .lock()
1109            .map_err(|_| Error::Internal("config lock poisoned".into()))? = Some(config);
1110        *initialized = true;
1111
1112        // Start periodic garbage collection for tombstones and TTL (Issue #668)
1113        let gc = Arc::new(crate::qos::GarbageCollector::with_policy_registry(
1114            Arc::new(self.clone()),
1115            Arc::clone(&self.deletion_policy_registry),
1116            crate::qos::GcConfig::default(),
1117        ));
1118        let handle = crate::qos::start_periodic_gc(gc);
1119        *self
1120            .gc_handle
1121            .lock()
1122            .map_err(|_| Error::Internal("gc_handle lock poisoned".into()))? = Some(handle);
1123
1124        Ok(())
1125    }
1126
1127    async fn shutdown(&self) -> anyhow::Result<()> {
1128        // Stop GC task (Issue #668)
1129        if let Ok(mut handle) = self.gc_handle.lock() {
1130            if let Some(h) = handle.take() {
1131                h.abort();
1132            }
1133        }
1134
1135        self.stop_sync().await?;
1136        self.documents
1137            .lock()
1138            .map_err(|_| Error::Internal("documents lock poisoned".into()))?
1139            .clear();
1140        self.sync_states
1141            .lock()
1142            .map_err(|_| Error::Internal("sync_states lock poisoned".into()))?
1143            .clear();
1144        *self
1145            .initialized
1146            .lock()
1147            .map_err(|_| Error::Internal("initialized lock poisoned".into()))? = false;
1148
1149        Ok(())
1150    }
1151
1152    fn document_store(&self) -> Arc<dyn DocumentStore> {
1153        Arc::new(self.clone())
1154    }
1155
1156    fn peer_discovery(&self) -> Arc<dyn PeerDiscovery> {
1157        Arc::new(self.clone())
1158    }
1159
1160    fn sync_engine(&self) -> Arc<dyn SyncEngine> {
1161        Arc::new(self.clone())
1162    }
1163
1164    fn as_any(&self) -> &dyn std::any::Any {
1165        self
1166    }
1167
1168    async fn is_ready(&self) -> bool {
1169        self.initialized.lock().map(|guard| *guard).unwrap_or(false)
1170    }
1171
1172    fn backend_info(&self) -> BackendInfo {
1173        BackendInfo {
1174            name: "Automerge".to_string(),
1175            version: "0.7.1".to_string(),
1176        }
1177    }
1178}
1179
1180// ============================================================================
1181// AutomergeIroh Backend Adapter (Phase 7: Lab Integration)
1182// ============================================================================
1183
/// Type alias for peer event callback list
///
/// A shared (`Arc`), mutex-guarded vector of boxed `Fn(PeerEvent)` callbacks.
/// The `Send + Sync` bounds on the callbacks allow the list to be shared
/// between threads.
type PeerCallbacks = Arc<Mutex<Vec<Box<dyn Fn(PeerEvent) + Send + Sync>>>>;
1186
/// Topology-driven connection events
///
/// These events allow external topology managers (e.g., peat-mesh TopologyManager)
/// to control which peers the backend connects to, avoiding N² mesh formation.
/// When a topology event receiver is configured, the backend delegates connection
/// decisions to the topology manager instead of connecting to all discovered peers.
///
/// Events are delivered to the backend over the unbounded channel passed to
/// `AutomergeIrohBackend::with_topology_events`.
#[derive(Debug, Clone)]
pub enum TopologyConnectionEvent {
    /// Connect to a peer selected by topology manager
    ConnectPeer {
        /// Peer identifier (node_id)
        peer_id: String,
        /// Network addresses for the peer
        // NOTE(review): the expected address string format is not visible
        // here — confirm against the transport's connect path.
        addresses: Vec<String>,
        /// Optional relay URL
        relay_url: Option<String>,
    },
    /// Disconnect from a peer (topology decision)
    DisconnectPeer {
        /// Peer identifier to disconnect
        peer_id: String,
    },
}
1210
/// Default maximum connections when topology manager is not configured
///
/// `AutomergeIrohBackend::new` uses this as the initial `max_connections`
/// value; override it with `AutomergeIrohBackend::with_max_connections`.
pub const DEFAULT_MAX_CONNECTIONS: usize = 7;
1213
/// DataSyncBackend adapter for storage::AutomergeBackend
///
/// This adapter wraps the storage::AutomergeBackend (RocksDB + Iroh + Automerge)
/// to provide DataSyncBackend trait compatibility for cap_sim_node.rs
///
/// Cloning is cheap for the `Arc`-wrapped fields, which stay shared between
/// clones. `blob_store` and `max_connections` are plain values copied at clone
/// time, so changing them on one clone does not affect clones made earlier.
#[derive(Clone)]
pub struct AutomergeIrohBackend {
    /// The underlying Automerge+Iroh backend
    backend: Arc<crate::storage::AutomergeBackend>,

    /// Reference to the transport for peer discovery
    transport: Arc<crate::network::IrohTransport>,

    /// Peer event callbacks
    peer_callbacks: PeerCallbacks,

    /// Initialization state
    initialized: Arc<Mutex<bool>>,

    /// Formation key for peer authentication (ADR-030)
    /// Peers must share the same app_id and secret_key to connect
    formation_key: Arc<std::sync::RwLock<Option<crate::security::FormationKey>>>,

    /// Peer discovery manager (ADR-011 Phase 3)
    #[cfg(feature = "automerge-backend")]
    discovery_manager: Arc<tokio::sync::RwLock<crate::discovery::peer::DiscoveryManager>>,

    /// Optional blob store for file/model transfer (Issue #379, ADR-025)
    ///
    /// When enabled, provides content-addressed blob storage with P2P transfer
    /// capability via iroh-blobs. Peers connected for document sync are automatically
    /// registered for blob transfer as well.
    #[cfg(feature = "automerge-backend")]
    blob_store: Option<Arc<crate::storage::NetworkedIrohBlobStore>>,

    /// Optional topology event receiver for topology-driven connections
    ///
    /// When provided, the backend delegates connection decisions to the topology
    /// manager instead of connecting to all discovered peers. This prevents N²
    /// mesh formation and enables multi-hop routing.
    #[cfg(feature = "automerge-backend")]
    topology_event_rx:
        Arc<tokio::sync::Mutex<Option<mpsc::UnboundedReceiver<TopologyConnectionEvent>>>>,

    /// Maximum peer connections when topology manager is not configured
    ///
    /// Defaults to DEFAULT_MAX_CONNECTIONS (7). When topology events are
    /// provided, this limit is ignored (topology manager controls connections).
    #[cfg(feature = "automerge-backend")]
    max_connections: usize,

    /// Deletion policy registry for tombstone/soft-delete dispatch (Issue #668)
    deletion_policy_registry: Arc<DeletionPolicyRegistry>,

    /// Monotonic Lamport counter for tombstone ordering (Issue #668)
    lamport_counter: Arc<AtomicU64>,
}
1270
1271impl AutomergeIrohBackend {
1272    /// Create a new adapter
1273    pub fn new(
1274        backend: Arc<crate::storage::AutomergeBackend>,
1275        transport: Arc<crate::network::IrohTransport>,
1276    ) -> Self {
1277        Self {
1278            backend,
1279            transport,
1280            peer_callbacks: Arc::new(Mutex::new(Vec::new())),
1281            initialized: Arc::new(Mutex::new(false)),
1282            formation_key: Arc::new(std::sync::RwLock::new(None)),
1283            #[cfg(feature = "automerge-backend")]
1284            discovery_manager: Arc::new(tokio::sync::RwLock::new(
1285                crate::discovery::peer::DiscoveryManager::default(),
1286            )),
1287            #[cfg(feature = "automerge-backend")]
1288            blob_store: None,
1289            #[cfg(feature = "automerge-backend")]
1290            topology_event_rx: Arc::new(tokio::sync::Mutex::new(None)),
1291            #[cfg(feature = "automerge-backend")]
1292            max_connections: DEFAULT_MAX_CONNECTIONS,
1293            deletion_policy_registry: Arc::new(DeletionPolicyRegistry::new()),
1294            lamport_counter: Arc::new(AtomicU64::new(0)),
1295        }
1296    }
1297
1298    /// Configure topology-driven connection management
1299    ///
1300    /// When topology events are provided, the backend delegates all connection
1301    /// decisions to the external topology manager (e.g., peat-mesh TopologyManager).
1302    /// This prevents N² mesh formation and enables multi-hop routing.
1303    ///
1304    /// # Example
1305    ///
1306    /// ```ignore
1307    /// let (tx, rx) = mpsc::unbounded_channel();
1308    /// let backend = AutomergeIrohBackend::new(storage, transport)
1309    ///     .with_topology_events(rx);
1310    ///
1311    /// // TopologyManager sends events via tx
1312    /// tx.send(TopologyConnectionEvent::ConnectPeer { ... });
1313    /// ```
1314    #[cfg(feature = "automerge-backend")]
1315    pub fn with_topology_events(
1316        mut self,
1317        rx: mpsc::UnboundedReceiver<TopologyConnectionEvent>,
1318    ) -> Self {
1319        self.topology_event_rx = Arc::new(tokio::sync::Mutex::new(Some(rx)));
1320        self
1321    }
1322
1323    /// Set maximum peer connections for fallback mode
1324    ///
1325    /// When topology events are not configured, the backend limits connections
1326    /// to this many peers discovered via mDNS/static config. Defaults to 7.
1327    #[cfg(feature = "automerge-backend")]
1328    pub fn with_max_connections(mut self, max: usize) -> Self {
1329        self.max_connections = max;
1330        self
1331    }
1332
1333    /// Check if topology-driven connection management is enabled
1334    #[cfg(feature = "automerge-backend")]
1335    pub fn has_topology_events(&self) -> bool {
1336        // Check if the receiver exists (non-blocking)
1337        self.topology_event_rx
1338            .try_lock()
1339            .is_ok_and(|guard| guard.is_some())
1340    }
1341
1342    /// Get the formation key (if initialized with credentials)
1343    pub fn formation_key(&self) -> Option<crate::security::FormationKey> {
1344        self.formation_key
1345            .read()
1346            .ok()
1347            .and_then(|guard| guard.clone())
1348    }
1349
1350    /// Get the formation ID (app_id used as formation identifier)
1351    pub fn formation_id(&self) -> Option<String> {
1352        self.formation_key
1353            .read()
1354            .ok()
1355            .and_then(|guard| guard.as_ref().map(|k| k.formation_id().to_string()))
1356    }
1357
1358    /// Create from store and transport (convenience method)
1359    pub fn from_parts(
1360        store: Arc<crate::storage::AutomergeStore>,
1361        transport: Arc<crate::network::IrohTransport>,
1362    ) -> Self {
1363        let backend = Arc::new(crate::storage::AutomergeBackend::with_transport(
1364            store,
1365            Arc::clone(&transport),
1366        ));
1367        Self::new(backend, transport)
1368    }
1369
1370    /// Get the transport (for testing/advanced usage)
1371    pub fn transport(&self) -> Arc<crate::network::IrohTransport> {
1372        Arc::clone(&self.transport)
1373    }
1374
1375    /// Get the storage backend (Issue #378: shared with sync coordinator)
1376    ///
1377    /// Returns the underlying `AutomergeBackend` used by this sync backend.
1378    /// This ensures callers use the same backend instance that the sync
1379    /// coordinator uses, preventing state from being split across instances.
1380    pub fn storage_backend(&self) -> Arc<crate::storage::AutomergeBackend> {
1381        Arc::clone(&self.backend)
1382    }
1383
1384    /// Get the transport Arc pointer address (for debugging Issue #271)
1385    ///
1386    /// This returns the raw pointer address of the transport Arc, which can be used
1387    /// to verify that cloned backends share the same transport instance.
1388    /// If two backends show different addresses, they have different transports.
1389    pub fn transport_arc_ptr(&self) -> *const crate::network::IrohTransport {
1390        Arc::as_ptr(&self.transport)
1391    }
1392
1393    /// Debug method to verify transport sharing (Issue #271)
1394    ///
1395    /// Logs the transport Arc pointer address. Call this on original and cloned
1396    /// backends to verify they share the same transport instance.
1397    pub fn debug_log_transport_ptr(&self, context: &str) {
1398        tracing::debug!(
1399            transport_ptr = ?Arc::as_ptr(&self.transport),
1400            endpoint_id = %self.transport.endpoint_id(),
1401            peer_count = self.transport.peer_count(),
1402            context = context,
1403            "AutomergeIrohBackend transport instance"
1404        );
1405    }
1406
1407    /// Get this node's endpoint ID
1408    pub fn endpoint_id(&self) -> iroh::EndpointId {
1409        self.transport.endpoint_id()
1410    }
1411
1412    // =========================================================================
1413    // Blob Store Methods (Issue #379, ADR-025)
1414    // =========================================================================
1415
1416    /// Enable blob storage with P2P transfer capability
1417    ///
1418    /// Creates a `NetworkedIrohBlobStore` for content-addressed file transfer.
1419    /// The blob store uses a separate iroh endpoint for the iroh-blobs protocol,
1420    /// but peers are automatically synchronized when document sync connections
1421    /// are established.
1422    ///
1423    /// # Arguments
1424    ///
1425    /// * `blob_dir` - Directory for blob storage and metadata sidecars
1426    ///
1427    /// # Example
1428    ///
1429    /// ```ignore
1430    /// use peat_protocol::sync::automerge::AutomergeIrohBackend;
1431    /// use std::path::PathBuf;
1432    ///
1433    /// let backend = AutomergeIrohBackend::from_parts(store, transport);
1434    /// backend.enable_blob_store(PathBuf::from("/tmp/peat-blobs")).await?;
1435    ///
1436    /// // Now you can use the blob store
1437    /// let blob_store = backend.blob_store().unwrap();
1438    /// let token = blob_store.create_blob_from_bytes(data, metadata).await?;
1439    /// ```
1440    #[cfg(feature = "automerge-backend")]
1441    pub async fn enable_blob_store(
1442        &mut self,
1443        blob_dir: std::path::PathBuf,
1444    ) -> std::result::Result<(), anyhow::Error> {
1445        use crate::storage::NetworkedIrohBlobStore;
1446
1447        let blob_store = NetworkedIrohBlobStore::new(blob_dir).await?;
1448
1449        // Register currently connected peers with the blob store
1450        let connected_peers = self.transport.connected_peers();
1451        for peer_id in connected_peers {
1452            blob_store.add_peer(peer_id).await;
1453        }
1454
1455        self.blob_store = Some(blob_store);
1456
1457        tracing::info!(
1458            endpoint_id = %self.transport.endpoint_id(),
1459            "Blob store enabled for AutomergeIrohBackend"
1460        );
1461
1462        Ok(())
1463    }
1464
1465    /// Get reference to the blob store (if enabled)
1466    ///
1467    /// Returns `None` if `enable_blob_store()` has not been called.
1468    #[cfg(feature = "automerge-backend")]
1469    pub fn blob_store(&self) -> Option<Arc<crate::storage::NetworkedIrohBlobStore>> {
1470        self.blob_store.clone()
1471    }
1472
    /// Check if blob storage is enabled
    ///
    /// Returns `true` once a blob store has been set on this value (see
    /// `enable_blob_store()`), `false` otherwise.
    #[cfg(feature = "automerge-backend")]
    pub fn has_blob_store(&self) -> bool {
        self.blob_store.is_some()
    }
1478
1479    /// Register a peer with the blob store for file transfer
1480    ///
1481    /// This is called automatically when document sync connections are established,
1482    /// but can also be called manually if needed.
1483    #[cfg(feature = "automerge-backend")]
1484    pub async fn register_blob_peer(&self, peer_id: iroh::EndpointId) {
1485        if let Some(ref blob_store) = self.blob_store {
1486            blob_store.add_peer(peer_id).await;
1487            tracing::debug!(
1488                peer_id = %peer_id.fmt_short(),
1489                "Registered peer for blob transfer"
1490            );
1491        }
1492    }
1493
1494    /// Unregister a peer from the blob store
1495    #[cfg(feature = "automerge-backend")]
1496    pub async fn unregister_blob_peer(&self, peer_id: &iroh::EndpointId) {
1497        if let Some(ref blob_store) = self.blob_store {
1498            blob_store.remove_peer(peer_id).await;
1499            tracing::debug!(
1500                peer_id = %peer_id.fmt_short(),
1501                "Unregistered peer from blob transfer"
1502            );
1503        }
1504    }
1505
1506    /// Start automatic peer synchronization for blob transfers
1507    ///
1508    /// Spawns a background task that listens to transport peer events and
1509    /// automatically registers/unregisters peers with the blob store when
1510    /// document sync connections are established or closed.
1511    ///
1512    /// This should be called after `enable_blob_store()` and before starting
1513    /// peer connections.
1514    ///
1515    /// # Example
1516    ///
1517    /// ```ignore
1518    /// backend.enable_blob_store(blob_dir).await?;
1519    /// backend.start_blob_peer_sync();
1520    /// backend.initialize(config).await?; // Now peer connections auto-register
1521    /// ```
1522    #[cfg(feature = "automerge-backend")]
1523    pub fn start_blob_peer_sync(&self) {
1524        use crate::network::iroh_transport::TransportPeerEvent;
1525
1526        let blob_store = match &self.blob_store {
1527            Some(store) => Arc::clone(store),
1528            None => {
1529                tracing::warn!("start_blob_peer_sync called but blob store not enabled");
1530                return;
1531            }
1532        };
1533
1534        let mut events = self.transport.subscribe_peer_events();
1535
1536        tokio::spawn(async move {
1537            tracing::debug!("Blob peer sync task started");
1538
1539            while let Some(event) = events.recv().await {
1540                match event {
1541                    TransportPeerEvent::Connected { endpoint_id, .. } => {
1542                        blob_store.add_peer(endpoint_id).await;
1543                        tracing::debug!(
1544                            peer_id = %endpoint_id.fmt_short(),
1545                            "Auto-registered peer for blob transfer on connect"
1546                        );
1547                    }
1548                    TransportPeerEvent::Disconnected { endpoint_id, .. } => {
1549                        blob_store.remove_peer(&endpoint_id).await;
1550                        tracing::debug!(
1551                            peer_id = %endpoint_id.fmt_short(),
1552                            "Auto-unregistered peer from blob transfer on disconnect"
1553                        );
1554                    }
1555                }
1556            }
1557
1558            tracing::debug!("Blob peer sync task stopped");
1559        });
1560    }
1561
1562    /// Manually trigger sync for a specific document with all connected peers
1563    ///
1564    /// This is useful for testing or for explicit sync triggering when the
1565    /// automatic sync triggered by upsert may have been blocked by cooldown.
1566    ///
1567    /// # Arguments
1568    ///
1569    /// * `doc_key` - The full document key (e.g., "beacons:edge-sensor-001")
1570    pub async fn sync_document(&self, doc_key: &str) -> Result<()> {
1571        self.backend
1572            .sync_document(doc_key)
1573            .await
1574            .map_err(|e| Error::Network {
1575                message: format!("Failed to sync document {}: {}", doc_key, e),
1576                peer_id: None,
1577                source: None,
1578            })
1579    }
1580
1581    /// Add a discovery strategy to the peer discovery manager
1582    ///
1583    /// This allows configuring static peers, mDNS discovery, etc.
1584    #[cfg(feature = "automerge-backend")]
1585    pub async fn add_discovery_strategy(
1586        &self,
1587        strategy: Box<dyn crate::discovery::peer::DiscoveryStrategy>,
1588    ) -> Result<()> {
1589        let mut manager = self.discovery_manager.write().await;
1590        manager.add_strategy(strategy);
1591        Ok(())
1592    }
1593
1594    /// Immediately connect to all discovered peers
1595    ///
1596    /// This bypasses the background connection task's periodic interval, allowing
1597    /// tests to establish connections without waiting 1-7 seconds for the next cycle.
1598    ///
1599    /// Returns the number of new connections established.
1600    ///
1601    /// # Example
1602    ///
1603    /// ```ignore
1604    /// // Add discovery strategy with peer info
1605    /// backend_a.add_discovery_strategy(Box::new(StaticDiscovery::from_peers(vec![peer_b]))).await?;
1606    /// backend_b.add_discovery_strategy(Box::new(StaticDiscovery::from_peers(vec![peer_a]))).await?;
1607    ///
1608    /// // Connect immediately instead of waiting for background task
1609    /// backend_a.connect_to_discovered_peers_now().await?;
1610    /// backend_b.connect_to_discovered_peers_now().await?;
1611    /// ```
1612    #[cfg(feature = "automerge-backend")]
1613    pub async fn connect_to_discovered_peers_now(&self) -> Result<usize> {
1614        use crate::network::formation_handshake::perform_initiator_handshake;
1615        use crate::network::PeerInfo as NetworkPeerInfo;
1616
1617        let formation_key = self
1618            .formation_key
1619            .read()
1620            .map_err(|_| Error::Internal("formation_key lock poisoned".into()))?
1621            .clone()
1622            .ok_or_else(|| Error::config_error("Backend not initialized", None))?;
1623
1624        // Get discovered peers
1625        let manager = self.discovery_manager.read().await;
1626        let discovered_peers = manager.get_peers().await;
1627        drop(manager);
1628
1629        let mut new_connections = 0;
1630
1631        for peer in discovered_peers {
1632            let network_peer_info = NetworkPeerInfo {
1633                name: peer.name.clone(),
1634                node_id: peer.node_id.clone(),
1635                addresses: peer.addresses.clone(),
1636                relay_url: peer.relay_url.clone(),
1637            };
1638
1639            if let Ok(endpoint_id) = peer.endpoint_id() {
1640                match self.transport.connect_peer(&network_peer_info).await {
1641                    Ok(Some(conn)) => {
1642                        // Issue #346: Give the accept loop a moment to process any
1643                        // incoming connection from this peer. In symmetric discovery
1644                        // (both peers have each other in config), both will connect
1645                        // simultaneously and the accept loop needs time to process
1646                        // the incoming connection and do conflict resolution.
1647                        tokio::task::yield_now().await;
1648                        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1649
1650                        // Check if connection was closed by conflict resolution
1651                        if conn.close_reason().is_some() {
1652                            tracing::debug!(
1653                                "Immediate connect: peer {} superseded by accept path",
1654                                peer.name
1655                            );
1656                            continue;
1657                        }
1658
1659                        // New connection - perform formation handshake
1660                        match perform_initiator_handshake(&conn, &formation_key).await {
1661                            Ok(()) => {
1662                                tracing::debug!(
1663                                    "Immediate connect: authenticated with peer {}",
1664                                    peer.name
1665                                );
1666                                // Issue #346: Emit Connected AFTER successful handshake
1667                                self.transport.emit_peer_connected(endpoint_id);
1668                                new_connections += 1;
1669                            }
1670                            Err(e) => {
1671                                tracing::warn!(
1672                                    "Immediate connect: peer {} failed auth: {}",
1673                                    peer.name,
1674                                    e
1675                                );
1676                                conn.close(1u32.into(), b"authentication failed");
1677                                // Issue #346: Don't call disconnect() here - the connection
1678                                // in the map might be a different one after conflict resolution.
1679                                // conn.close() is sufficient; close monitor handles cleanup.
1680                            }
1681                        }
1682                    }
1683                    Ok(None) => {
1684                        // Accept path is handling connection - no action needed
1685                        tracing::debug!(
1686                            "Immediate connect: peer {} handled by accept path",
1687                            peer.name
1688                        );
1689                    }
1690                    Err(e) => {
1691                        tracing::debug!(
1692                            "Immediate connect: failed to connect to {}: {}",
1693                            peer.name,
1694                            e
1695                        );
1696                    }
1697                }
1698            }
1699        }
1700
1701        Ok(new_connections)
1702    }
1703
1704    /// Get access to the peer discovery information
1705    ///
1706    /// Returns a handle for querying discovered peers.
1707    #[cfg(feature = "automerge-backend")]
1708    pub fn get_peer_discovery(&self) -> PeerDiscoveryHandle {
1709        PeerDiscoveryHandle {
1710            manager: Arc::clone(&self.discovery_manager),
1711        }
1712    }
1713}
1714
/// Handle for accessing peer discovery information
///
/// Wraps a shared reference to a `DiscoveryManager` guarded by a
/// `tokio::sync::RwLock`. Instances are produced by `get_peer_discovery`,
/// which clones the backend's `Arc` into the handle.
#[cfg(feature = "automerge-backend")]
pub struct PeerDiscoveryHandle {
    /// Shared discovery manager; the handle's query methods take its read lock.
    manager: Arc<tokio::sync::RwLock<crate::discovery::peer::DiscoveryManager>>,
}
1720
#[cfg(feature = "automerge-backend")]
impl PeerDiscoveryHandle {
    /// Return every peer currently reported by the discovery manager.
    ///
    /// Takes the manager's read lock and asks it for the peers its strategies
    /// currently have cached. Strategies refresh their caches asynchronously,
    /// so this is intended to be a fast read.
    ///
    /// # Errors
    ///
    /// A failure reported by the manager is returned as `Error::Discovery`
    /// carrying the underlying error's message.
    pub async fn discovered_peers(&self) -> Result<Vec<crate::discovery::peer::PeerInfo>> {
        let guard = self.manager.read().await;
        match guard.discovered_peers().await {
            Ok(peers) => Ok(peers),
            Err(e) => Err(Error::Discovery {
                message: e.to_string(),
                source: None,
            }),
        }
    }

    /// Return how many peers the discovery manager currently reports.
    ///
    /// Takes the manager's read lock for the duration of the count.
    pub async fn peer_count(&self) -> usize {
        self.manager.read().await.peer_count().await
    }
}
1746
// DocumentStore implementation for AutomergeIrohBackend
//
// Bundles the storage backend with the state needed to apply per-collection
// deletion policies (tombstones and soft-deletes).
struct IrohDocumentStore {
    /// Storage backend providing collections, the Automerge store and document sync
    backend: Arc<crate::storage::AutomergeBackend>,
    /// Deletion policy registry for tombstone/soft-delete dispatch (Issue #668)
    deletion_policy_registry: Arc<DeletionPolicyRegistry>,
    /// Monotonic Lamport counter for tombstone ordering (Issue #668);
    /// incremented once for each tombstone this store creates
    lamport_counter: Arc<AtomicU64>,
    /// Node ID for tombstone attribution (Issue #668)
    node_id: String,
}
1757
1758#[async_trait]
1759impl DocumentStore for IrohDocumentStore {
1760    async fn upsert(&self, collection: &str, document: Document) -> anyhow::Result<DocumentId> {
1761        use crate::storage::traits::StorageBackend;
1762
1763        // Generate ID if not provided
1764        let doc_id = document.id.clone().unwrap_or_else(|| {
1765            use std::time::SystemTime;
1766            let timestamp = SystemTime::now()
1767                .duration_since(SystemTime::UNIX_EPOCH)
1768                .expect("system clock before UNIX epoch")
1769                .as_nanos();
1770            format!("doc-{}", timestamp)
1771        });
1772
1773        // Serialize document to JSON bytes
1774        let json_bytes = serde_json::to_vec(&document)?;
1775
1776        // Get collection and upsert
1777        let coll = self.backend.collection(collection);
1778        coll.upsert(&doc_id, json_bytes)
1779            .map_err(|e| Error::Storage {
1780                message: e.to_string(),
1781                operation: Some("upsert".to_string()),
1782                key: Some(doc_id.clone()),
1783                source: None,
1784            })?;
1785
1786        // Trigger sync to push the document to connected peers
1787        // The doc_key format is "collection:doc_id"
1788        let doc_key = format!("{}:{}", collection, doc_id);
1789        match self.backend.sync_document(&doc_key).await {
1790            Ok(()) => {
1791                tracing::debug!("Sync triggered for document {} after upsert", doc_key);
1792            }
1793            Err(e) => {
1794                // Log but don't fail - sync is best-effort
1795                tracing::debug!("Failed to sync document {} after upsert: {}", doc_key, e);
1796            }
1797        }
1798
1799        Ok(doc_id)
1800    }
1801
1802    async fn query(&self, collection: &str, query: &Query) -> anyhow::Result<Vec<Document>> {
1803        use crate::storage::traits::StorageBackend;
1804
1805        let coll = self.backend.collection(collection);
1806        let all_items = coll.scan().map_err(|e| Error::Storage {
1807            message: e.to_string(),
1808            operation: Some("scan".to_string()),
1809            key: None,
1810            source: None,
1811        })?;
1812
1813        // Deserialize and filter
1814        let mut results = Vec::new();
1815        for (doc_id, bytes) in all_items {
1816            if let Ok(mut doc) = serde_json::from_slice::<Document>(&bytes) {
1817                // Set the ID from the key if not already set
1818                if doc.id.is_none() {
1819                    doc.id = Some(doc_id);
1820                }
1821
1822                // Apply soft-delete filter (ADR-034, Issue #369)
1823                // By default, queries exclude documents with _deleted=true
1824                // IncludeDeleted and DeletedOnly queries override this behavior
1825                if !query.matches_deletion_state(&doc) {
1826                    continue;
1827                }
1828
1829                if matches_query(&doc, query) {
1830                    results.push(doc);
1831                }
1832            }
1833        }
1834
1835        Ok(results)
1836    }
1837
1838    async fn remove(&self, collection: &str, doc_id: &DocumentId) -> anyhow::Result<()> {
1839        use crate::storage::traits::StorageBackend;
1840
1841        let coll = self.backend.collection(collection);
1842        coll.delete(doc_id).map_err(|e| Error::Storage {
1843            message: e.to_string(),
1844            operation: Some("delete".to_string()),
1845            key: Some(doc_id.clone()),
1846            source: None,
1847        })?;
1848        Ok(())
1849    }
1850
1851    async fn delete(
1852        &self,
1853        collection: &str,
1854        doc_id: &DocumentId,
1855        reason: Option<&str>,
1856    ) -> anyhow::Result<crate::qos::DeleteResult> {
1857        let policy = self.deletion_policy(collection);
1858        let store = self.backend.automerge_store();
1859
1860        match policy {
1861            DeletionPolicy::Immutable => Ok(crate::qos::DeleteResult::immutable()),
1862            DeletionPolicy::ImplicitTTL { .. } => Ok(crate::qos::DeleteResult {
1863                deleted: false,
1864                tombstone_id: None,
1865                expires_at: None,
1866                policy: policy.clone(),
1867            }),
1868            DeletionPolicy::Tombstone {
1869                tombstone_ttl,
1870                delete_wins: _,
1871            } => {
1872                let lamport = self.lamport_counter.fetch_add(1, Ordering::SeqCst);
1873                let tombstone = if let Some(reason_str) = reason {
1874                    Tombstone::with_reason(
1875                        doc_id.clone(),
1876                        collection.to_string(),
1877                        self.node_id.clone(),
1878                        lamport,
1879                        reason_str,
1880                    )
1881                } else {
1882                    Tombstone::new(
1883                        doc_id.clone(),
1884                        collection.to_string(),
1885                        self.node_id.clone(),
1886                        lamport,
1887                    )
1888                };
1889                let tombstone_id = format!("{}:{}", collection, doc_id);
1890
1891                // Store tombstone in AutomergeStore (persisted to RocksDB)
1892                store
1893                    .put_tombstone(&tombstone)
1894                    .map_err(|e| Error::Storage {
1895                        message: e.to_string(),
1896                        operation: Some("put_tombstone".to_string()),
1897                        key: Some(tombstone_id.clone()),
1898                        source: None,
1899                    })?;
1900
1901                // Remove the document
1902                let doc_key = format!("{}:{}", collection, doc_id);
1903                store.delete(&doc_key).ok();
1904
1905                // Propagate tombstone to all connected peers (Issue #668)
1906                // The tombstone is already stored in AutomergeStore, so
1907                // sync_tombstones_with_peer will include it in the batch.
1908                if let Some(coordinator) = self.backend.sync_coordinator() {
1909                    let coordinator = coordinator.clone();
1910                    let backend = Arc::clone(&self.backend);
1911                    tokio::spawn(async move {
1912                        // Get connected peers via the transport
1913                        if let Some(transport) = backend.iroh_transport() {
1914                            for peer_id in transport.connected_peers() {
1915                                if let Err(e) = coordinator.sync_tombstones_with_peer(peer_id).await
1916                                {
1917                                    tracing::debug!(
1918                                        "Tombstone propagation to peer {:?} failed: {}",
1919                                        peer_id,
1920                                        e
1921                                    );
1922                                }
1923                            }
1924                        }
1925                    });
1926                }
1927
1928                Ok(crate::qos::DeleteResult {
1929                    deleted: true,
1930                    tombstone_id: Some(tombstone_id),
1931                    expires_at: Some(std::time::SystemTime::now() + tombstone_ttl),
1932                    policy: policy.clone(),
1933                })
1934            }
1935            DeletionPolicy::SoftDelete {
1936                include_deleted_default: _,
1937            } => {
1938                // Soft delete: get doc, mark _deleted=true, upsert back
1939                let doc_key = format!("{}:{}", collection, doc_id);
1940                if let Some(automerge_doc) = store.get(&doc_key).map_err(|e| Error::Storage {
1941                    message: e.to_string(),
1942                    operation: Some("get".to_string()),
1943                    key: Some(doc_key.clone()),
1944                    source: None,
1945                })? {
1946                    // Apply soft-delete fields via Automerge transaction
1947                    let mut doc = automerge_doc;
1948                    let mut tx = doc.transaction();
1949                    let root = automerge::ROOT;
1950                    tx.put(&root, "_deleted", true).ok();
1951                    tx.put(
1952                        &root,
1953                        "_deleted_at",
1954                        chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(),
1955                    )
1956                    .ok();
1957                    if let Some(r) = reason {
1958                        tx.put(&root, "_deleted_reason", r).ok();
1959                    }
1960                    tx.commit();
1961                    store.put(&doc_key, &doc).map_err(|e| Error::Storage {
1962                        message: e.to_string(),
1963                        operation: Some("put".to_string()),
1964                        key: Some(doc_key.clone()),
1965                        source: None,
1966                    })?;
1967
1968                    // Trigger sync so soft-delete propagates via normal document sync
1969                    if let Ok(()) = self.backend.sync_document(&doc_key).await {
1970                        tracing::debug!("Sync triggered for soft-deleted document {}", doc_key);
1971                    }
1972
1973                    Ok(crate::qos::DeleteResult::soft_deleted(policy.clone()))
1974                } else {
1975                    Ok(crate::qos::DeleteResult {
1976                        deleted: false,
1977                        tombstone_id: None,
1978                        expires_at: None,
1979                        policy: policy.clone(),
1980                    })
1981                }
1982            }
1983        }
1984    }
1985
1986    async fn is_deleted(&self, collection: &str, doc_id: &DocumentId) -> anyhow::Result<bool> {
1987        let store = self.backend.automerge_store();
1988
1989        // Check tombstone first
1990        if store.has_tombstone(collection, doc_id).unwrap_or(false) {
1991            return Ok(true);
1992        }
1993
1994        // Check soft-delete field
1995        let doc_key = format!("{}:{}", collection, doc_id);
1996        if let Ok(Some(automerge_doc)) = store.get(&doc_key) {
1997            use automerge::ReadDoc;
1998            if let Ok(Some((automerge::Value::Scalar(s), _))) =
1999                automerge_doc.get(automerge::ROOT, "_deleted")
2000            {
2001                if let automerge::ScalarValue::Boolean(true) = s.as_ref() {
2002                    return Ok(true);
2003                }
2004            }
2005        }
2006
2007        Ok(false)
2008    }
2009
2010    fn deletion_policy(&self, collection: &str) -> crate::qos::DeletionPolicy {
2011        self.deletion_policy_registry.get(collection)
2012    }
2013
2014    async fn get_tombstones(&self, collection: &str) -> anyhow::Result<Vec<Tombstone>> {
2015        self.backend
2016            .automerge_store()
2017            .get_tombstones_for_collection(collection)
2018            .map_err(|e| {
2019                Error::Storage {
2020                    message: e.to_string(),
2021                    operation: Some("get_tombstones".to_string()),
2022                    key: None,
2023                    source: None,
2024                }
2025                .into()
2026            })
2027    }
2028
2029    async fn apply_tombstone(&self, tombstone: &Tombstone) -> anyhow::Result<()> {
2030        let store = self.backend.automerge_store();
2031        store.put_tombstone(tombstone).map_err(|e| Error::Storage {
2032            message: e.to_string(),
2033            operation: Some("put_tombstone".to_string()),
2034            key: Some(format!(
2035                "{}:{}",
2036                tombstone.collection, tombstone.document_id
2037            )),
2038            source: None,
2039        })?;
2040
2041        // Remove the document if it exists
2042        let doc_key = format!("{}:{}", tombstone.collection, tombstone.document_id);
2043        store.delete(&doc_key).ok();
2044        Ok(())
2045    }
2046
2047    fn observe(&self, collection: &str, query: &Query) -> anyhow::Result<ChangeStream> {
2048        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
2049
2050        // Get initial snapshot
2051        // Issue #457: Use direct store scan to handle both Collection::upsert
2052        // and message_to_automerge storage formats
2053        let collection_prefix = format!("{}:", collection);
2054        let all_docs = self
2055            .backend
2056            .automerge_store()
2057            .scan_prefix(&collection_prefix)
2058            .map_err(|e| Error::Storage {
2059                message: e.to_string(),
2060                operation: Some("scan_prefix".to_string()),
2061                key: None,
2062                source: None,
2063            })?;
2064
2065        let mut initial_docs = Vec::new();
2066        for (doc_key, automerge_doc) in all_docs {
2067            // Extract doc_id from key
2068            let doc_id = match doc_key.strip_prefix(&collection_prefix) {
2069                Some(id) => id.to_string(),
2070                None => continue,
2071            };
2072
2073            // Convert Automerge doc to Document
2074            if let Ok(json_value) = automerge_to_message::<serde_json::Value>(&automerge_doc) {
2075                let fields = if let serde_json::Value::Object(map) = json_value {
2076                    map.into_iter().collect()
2077                } else {
2078                    serde_json::Map::new().into_iter().collect()
2079                };
2080                let doc = Document {
2081                    id: Some(doc_id),
2082                    fields,
2083                    updated_at: std::time::SystemTime::now(),
2084                };
2085
2086                if matches_query(&doc, query) {
2087                    initial_docs.push(doc);
2088                }
2089            }
2090        }
2091
2092        // Send initial snapshot
2093        let _ = tx.send(ChangeEvent::Initial {
2094            documents: initial_docs,
2095        });
2096
2097        // Subscribe to observer notifications from the store (Issue #221, Issue #377)
2098        // This enables emitting ChangeEvent::Updated when documents sync from peers.
2099        // Using subscribe_to_observer_changes() instead of subscribe_to_changes() ensures
2100        // we get notifications for ALL document changes, including remotely synced docs.
2101        let mut change_rx = self
2102            .backend
2103            .automerge_store()
2104            .subscribe_to_observer_changes();
2105        let collection_name = collection.to_string();
2106        let collection_prefix = format!("{}:", collection);
2107        let query_clone = query.clone();
2108        let backend = Arc::clone(&self.backend);
2109        let tx_clone = tx.clone();
2110
2111        // Spawn background task to listen for changes and emit Updated events
2112        tokio::spawn(async move {
2113            loop {
2114                match change_rx.recv().await {
2115                    Ok(doc_key) => {
2116                        // Check if this change is for our collection
2117                        if !doc_key.starts_with(&collection_prefix) {
2118                            continue;
2119                        }
2120
2121                        // Extract doc_id from key (format: "collection:doc_id")
2122                        let doc_id = match doc_key.strip_prefix(&collection_prefix) {
2123                            Some(id) => id.to_string(),
2124                            None => continue,
2125                        };
2126
2127                        // Fetch the updated document directly from store
2128                        // Issue #457: AutomergeSummaryStorage uses message_to_automerge which stores
2129                        // fields at ROOT, but Collection::get expects a "data" field wrapper.
2130                        // Use direct store access for consistent handling of all document formats.
2131                        let maybe_doc: Option<Document> = if let Ok(Some(automerge_doc)) =
2132                            backend.automerge_store().get(&doc_key)
2133                        {
2134                            // Convert Automerge doc to JSON Value, then to Document
2135                            if let Ok(json_value) =
2136                                automerge_to_message::<serde_json::Value>(&automerge_doc)
2137                            {
2138                                // Convert JSON Value to Document
2139                                let fields = if let serde_json::Value::Object(map) = json_value {
2140                                    map.into_iter().collect()
2141                                } else {
2142                                    serde_json::Map::new().into_iter().collect()
2143                                };
2144                                Some(Document {
2145                                    id: Some(doc_id.clone()),
2146                                    fields,
2147                                    updated_at: std::time::SystemTime::now(),
2148                                })
2149                            } else {
2150                                None
2151                            }
2152                        } else {
2153                            None
2154                        };
2155
2156                        if let Some(mut doc) = maybe_doc {
2157                            if doc.id.is_none() {
2158                                doc.id = Some(doc_id);
2159                            }
2160
2161                            // Check if document matches query
2162                            if matches_query(&doc, &query_clone) {
2163                                // Emit Updated event
2164                                if tx_clone
2165                                    .send(ChangeEvent::Updated {
2166                                        collection: collection_name.clone(),
2167                                        document: doc,
2168                                    })
2169                                    .is_err()
2170                                {
2171                                    // Receiver dropped, stop listening
2172                                    break;
2173                                }
2174                            }
2175                        }
2176                    }
2177                    Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
2178                        // Issue #346: When lagged, re-emit all documents in the collection
2179                        // to ensure observers don't miss updates. This is critical for
2180                        // metrics tracking and hierarchical aggregation callbacks.
2181                        tracing::warn!(
2182                            "Observer change notification lagged, skipped {} messages - re-emitting all documents",
2183                            n
2184                        );
2185
2186                        // Re-scan collection and emit Updated for all matching documents
2187                        // Issue #457: Use direct store scan to handle both Collection::upsert
2188                        // and message_to_automerge storage formats
2189                        let prefix = &collection_prefix;
2190                        if let Ok(all_docs) = backend.automerge_store().scan_prefix(prefix) {
2191                            for (doc_key, automerge_doc) in all_docs {
2192                                // Extract doc_id from key
2193                                let doc_id = match doc_key.strip_prefix(prefix) {
2194                                    Some(id) => id.to_string(),
2195                                    None => continue,
2196                                };
2197
2198                                // Try to convert Automerge doc to Document
2199                                let maybe_doc: Option<Document> = if let Ok(json_value) =
2200                                    automerge_to_message::<serde_json::Value>(&automerge_doc)
2201                                {
2202                                    let fields = if let serde_json::Value::Object(map) = json_value
2203                                    {
2204                                        map.into_iter().collect()
2205                                    } else {
2206                                        serde_json::Map::new().into_iter().collect()
2207                                    };
2208                                    Some(Document {
2209                                        id: Some(doc_id),
2210                                        fields,
2211                                        updated_at: std::time::SystemTime::now(),
2212                                    })
2213                                } else {
2214                                    None
2215                                };
2216
2217                                if let Some(doc) = maybe_doc {
2218                                    // Send event if document matches query
2219                                    #[allow(clippy::collapsible_if)]
2220                                    if matches_query(&doc, &query_clone) {
2221                                        if tx_clone
2222                                            .send(ChangeEvent::Updated {
2223                                                collection: collection_name.clone(),
2224                                                document: doc,
2225                                            })
2226                                            .is_err()
2227                                        {
2228                                            // Receiver dropped, stop listening
2229                                            break;
2230                                        }
2231                                    }
2232                                }
2233                            }
2234                        }
2235                    }
2236                    Err(tokio::sync::broadcast::error::RecvError::Closed) => {
2237                        // Channel closed, stop listening
2238                        break;
2239                    }
2240                }
2241            }
2242        });
2243
2244        Ok(ChangeStream { receiver: rx })
2245    }
2246}
2247
// PeerDiscovery implementation for AutomergeIrohBackend
//
// NOTE(review): `PeerDiscovery::start` reads `formation_key` outside any
// `#[cfg]` block, so this type presumably only builds with the
// "automerge-backend" feature enabled - confirm the module itself is gated.
struct IrohPeerDiscovery {
    /// Transport used to accept and authenticate incoming peer connections
    transport: Arc<crate::network::IrohTransport>,
    /// Registered peer callbacks (presumably invoked on peer events - confirm against usage)
    peer_callbacks: PeerCallbacks,
    /// Discovery manager behind an async read/write lock
    #[cfg(feature = "automerge-backend")]
    discovery_manager: Arc<tokio::sync::RwLock<crate::discovery::peer::DiscoveryManager>>,
    /// Formation key for peer authentication (required for secure connections)
    #[cfg(feature = "automerge-backend")]
    formation_key: Arc<std::sync::RwLock<Option<crate::security::FormationKey>>>,
    /// Whether the event forwarder task is running (Issue #275)
    event_forwarder_running: Arc<std::sync::atomic::AtomicBool>,
    /// Optional topology event receiver for topology-driven connections
    #[cfg(feature = "automerge-backend")]
    topology_event_rx:
        Arc<tokio::sync::Mutex<Option<mpsc::UnboundedReceiver<TopologyConnectionEvent>>>>,
    /// Maximum peer connections when topology manager is not configured
    #[cfg(feature = "automerge-backend")]
    max_connections: usize,
}
2267
2268#[async_trait]
2269impl PeerDiscovery for IrohPeerDiscovery {
2270    async fn start(&self) -> anyhow::Result<()> {
2271        // Get formation key for authentication (required)
2272        let formation_key = self
2273            .formation_key
2274            .read()
2275            .map_err(|_| Error::Internal("formation_key lock poisoned".into()))?
2276            .clone()
2277            .ok_or_else(|| Error::Internal("Formation key not initialized".to_string()))?;
2278
2279        // Start authenticated accept loop (replaces simple start_accept_loop)
2280        // This spawns a background task that accepts connections and performs handshake
2281        //
2282        // IMPORTANT (Issue #229): We MUST mark the accept loop as managed BEFORE spawning
2283        // our custom loop. This prevents AutomergeBackend::start_sync() from starting a
2284        // duplicate accept loop via transport.start_accept_loop(), which would cause
2285        // competing loops where one might accept connections without doing the handshake.
2286        #[cfg(feature = "automerge-backend")]
2287        {
2288            // Mark accept loop as externally managed to prevent duplicate loops
2289            self.transport.mark_accept_loop_managed().map_err(|e| {
2290                Error::Internal(format!("Failed to mark accept loop as managed: {}", e))
2291            })?;
2292
2293            let transport = Arc::clone(&self.transport);
2294            let formation_key_accept = formation_key.clone();
2295
2296            tokio::spawn(async move {
2297                use crate::network::formation_handshake::perform_responder_handshake;
2298
2299                // Issue #346: Track consecutive errors to detect permanent failures
2300                let mut consecutive_errors = 0u32;
2301                const MAX_CONSECUTIVE_ERRORS: u32 = 10;
2302
2303                loop {
2304                    // Accept incoming connection
2305                    // Note (Issue #229): accept() returns Option<Connection>
2306                    // - Some(conn) = new connection that needs authentication
2307                    // - None = duplicate/transient (already handled or failed QUIC handshake)
2308                    match transport.accept().await {
2309                        Ok(Some(conn)) => {
2310                            consecutive_errors = 0; // Reset on success
2311                            let peer_id = conn.remote_id();
2312
2313                            // Perform formation handshake to authenticate peer
2314                            match perform_responder_handshake(&conn, &formation_key_accept).await {
2315                                Ok(()) => {
2316                                    // Issue #346: Emit Connected AFTER successful handshake
2317                                    transport.emit_peer_connected(peer_id);
2318                                }
2319                                Err(e) => {
2320                                    tracing::warn!(
2321                                        ?peer_id,
2322                                        error = %e,
2323                                        "Formation handshake failed"
2324                                    );
2325                                    // Close the unauthenticated connection - connection monitor
2326                                    // will handle cleanup (Issue #346 stable_id check)
2327                                    conn.close(1u32.into(), b"authentication failed");
2328                                }
2329                            }
2330                        }
2331                        Ok(None) => {
2332                            // Issue #346: This now includes transient errors (failed QUIC handshake)
2333                            // as well as duplicate connections. Either way, continue accepting.
2334                            consecutive_errors = 0; // Reset - we're still accepting
2335                        }
2336                        Err(e) => {
2337                            // Issue #346: Only fatal errors (endpoint closed) should stop the loop
2338                            // But add a circuit breaker for repeated failures
2339                            consecutive_errors += 1;
2340                            let error_msg = format!("{}", e);
2341
2342                            if error_msg.contains("Endpoint closed")
2343                                || error_msg.contains("no more")
2344                            {
2345                                tracing::info!("Accept loop stopped: endpoint closed");
2346                                break;
2347                            }
2348
2349                            if consecutive_errors >= MAX_CONSECUTIVE_ERRORS {
2350                                tracing::error!(
2351                                    consecutive_errors,
2352                                    error = %e,
2353                                    "Accept loop stopping after {} consecutive errors",
2354                                    MAX_CONSECUTIVE_ERRORS
2355                                );
2356                                break;
2357                            }
2358
2359                            tracing::warn!(
2360                                error = %e,
2361                                consecutive_errors,
2362                                "Accept error (will retry, {} more before stopping)",
2363                                MAX_CONSECUTIVE_ERRORS - consecutive_errors
2364                            );
2365                            // Small delay before retrying to avoid tight error loop
2366                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
2367                        }
2368                    }
2369                }
2370                tracing::info!("Authenticated accept loop stopped");
2371            });
2372        }
2373
2374        // Start discovery manager
2375        #[cfg(feature = "automerge-backend")]
2376        {
2377            let mut manager = self.discovery_manager.write().await;
2378            manager.start().await.map_err(|e| {
2379                Error::Internal(format!("Failed to start discovery manager: {}", e))
2380            })?;
2381        }
2382
2383        // Spawn mDNS discovery event handler (Issue #233)
2384        // This subscribes to Iroh's MdnsDiscovery stream and connects to newly discovered peers.
2385        // Without this, mDNS only populates the address book but doesn't trigger connections.
2386        #[cfg(feature = "automerge-backend")]
2387        if let Some(mdns) = self.transport.mdns_discovery() {
2388            use futures_lite::StreamExt;
2389            use iroh::address_lookup::mdns::DiscoveryEvent;
2390
2391            let mdns = mdns.clone();
2392            let transport = Arc::clone(&self.transport);
2393            let formation_key_mdns = formation_key.clone();
2394
2395            tokio::spawn(async move {
2396                use crate::network::formation_handshake::perform_initiator_handshake;
2397
2398                tracing::info!("Starting mDNS discovery event handler");
2399                let mut stream = mdns.subscribe().await;
2400
2401                while let Some(event) = stream.next().await {
2402                    match event {
2403                        DiscoveryEvent::Discovered { endpoint_info, .. } => {
2404                            let peer_id = endpoint_info.endpoint_id;
2405                            tracing::info!(
2406                                peer_id = %peer_id,
2407                                "mDNS discovered peer, attempting connection"
2408                            );
2409
2410                            // Check if already connected
2411                            if transport.get_connection(&peer_id).is_some() {
2412                                tracing::debug!(
2413                                    peer_id = %peer_id,
2414                                    "Already connected to mDNS-discovered peer"
2415                                );
2416                                continue;
2417                            }
2418
2419                            // Connect using just the EndpointId (addresses from mDNS discovery)
2420                            match transport.connect_by_id(peer_id).await {
2421                                Ok(Some(conn)) => {
2422                                    // New connection - perform formation handshake
2423                                    match perform_initiator_handshake(&conn, &formation_key_mdns)
2424                                        .await
2425                                    {
2426                                        Ok(()) => {
2427                                            tracing::info!(
2428                                                peer_id = %peer_id,
2429                                                "mDNS peer connected and authenticated"
2430                                            );
2431                                            // Issue #346: Emit Connected AFTER successful handshake
2432                                            transport.emit_peer_connected(peer_id);
2433                                        }
2434                                        Err(e) => {
2435                                            tracing::warn!(
2436                                                peer_id = %peer_id,
2437                                                error = %e,
2438                                                "mDNS peer failed authentication"
2439                                            );
2440                                            conn.close(1u32.into(), b"authentication failed");
2441                                            transport.disconnect(&peer_id).ok();
2442                                        }
2443                                    }
2444                                }
2445                                Ok(None) => {
2446                                    // Accept path is handling connection
2447                                    tracing::debug!(
2448                                        peer_id = %peer_id,
2449                                        "mDNS peer connection handled by accept path"
2450                                    );
2451                                }
2452                                Err(e) => {
2453                                    tracing::debug!(
2454                                        peer_id = %peer_id,
2455                                        error = %e,
2456                                        "Failed to connect to mDNS-discovered peer"
2457                                    );
2458                                }
2459                            }
2460                        }
2461                        DiscoveryEvent::Expired { endpoint_id } => {
2462                            tracing::debug!(
2463                                peer_id = %endpoint_id,
2464                                "mDNS peer expired (no longer advertising)"
2465                            );
2466                            // Note: We don't disconnect immediately since the peer might still
2467                            // be reachable. The connection will fail naturally if unreachable.
2468                        }
2469                    }
2470                }
2471                tracing::debug!("mDNS discovery event handler stopped");
2472            });
2473        }
2474
2475        // Check if topology-driven connection management is configured
2476        #[cfg(feature = "automerge-backend")]
2477        let has_topology_events = {
2478            let guard = self.topology_event_rx.lock().await;
2479            guard.is_some()
2480        };
2481
2482        // Spawn topology event handler if configured (prevents N² mesh)
2483        #[cfg(feature = "automerge-backend")]
2484        if has_topology_events {
2485            let topology_rx = self.topology_event_rx.clone();
2486            let transport = Arc::clone(&self.transport);
2487            let formation_key_topology = formation_key.clone();
2488
2489            tokio::spawn(async move {
2490                use crate::network::formation_handshake::perform_initiator_handshake;
2491                use crate::network::PeerInfo as NetworkPeerInfo;
2492
2493                // Take the receiver from the mutex
2494                let mut rx = {
2495                    let mut guard = topology_rx.lock().await;
2496                    guard.take()
2497                };
2498
2499                if let Some(ref mut receiver) = rx {
2500                    tracing::info!("Topology-driven connection management enabled");
2501
2502                    while let Some(event) = receiver.recv().await {
2503                        match event {
2504                            TopologyConnectionEvent::ConnectPeer {
2505                                peer_id,
2506                                addresses,
2507                                relay_url,
2508                            } => {
2509                                tracing::debug!(
2510                                    peer_id = %peer_id,
2511                                    "Topology event: connecting to peer"
2512                                );
2513
2514                                let network_peer_info = NetworkPeerInfo {
2515                                    name: peer_id.clone(),
2516                                    node_id: peer_id.clone(),
2517                                    addresses,
2518                                    relay_url,
2519                                };
2520
2521                                match transport.connect_peer(&network_peer_info).await {
2522                                    Ok(Some(conn)) => {
2523                                        // Give accept loop time for conflict resolution
2524                                        tokio::task::yield_now().await;
2525                                        tokio::time::sleep(tokio::time::Duration::from_millis(10))
2526                                            .await;
2527
2528                                        if conn.close_reason().is_some() {
2529                                            tracing::debug!(
2530                                                "Topology peer {} superseded by accept path",
2531                                                peer_id
2532                                            );
2533                                            continue;
2534                                        }
2535
2536                                        // Perform formation handshake
2537                                        match perform_initiator_handshake(
2538                                            &conn,
2539                                            &formation_key_topology,
2540                                        )
2541                                        .await
2542                                        {
2543                                            Ok(()) => {
2544                                                // Convert peer_id hex string to EndpointId
2545                                                if let Ok(bytes) = hex::decode(&peer_id) {
2546                                                    if bytes.len() == 32 {
2547                                                        let mut array = [0u8; 32];
2548                                                        array.copy_from_slice(&bytes);
2549                                                        if let Ok(endpoint_id) =
2550                                                            iroh::EndpointId::from_bytes(&array)
2551                                                        {
2552                                                            transport
2553                                                                .emit_peer_connected(endpoint_id);
2554                                                            tracing::info!(
2555                                                                "Topology: connected and authenticated with peer: {}",
2556                                                                peer_id
2557                                                            );
2558                                                        }
2559                                                    }
2560                                                }
2561                                            }
2562                                            Err(e) => {
2563                                                tracing::warn!(
2564                                                    "Topology peer {} failed authentication: {}",
2565                                                    peer_id,
2566                                                    e
2567                                                );
2568                                                conn.close(1u32.into(), b"authentication failed");
2569                                            }
2570                                        }
2571                                    }
2572                                    Ok(None) => {
2573                                        tracing::debug!(
2574                                            "Topology peer {} handled by accept path",
2575                                            peer_id
2576                                        );
2577                                    }
2578                                    Err(e) => {
2579                                        tracing::debug!(
2580                                            "Failed to connect to topology peer {}: {}",
2581                                            peer_id,
2582                                            e
2583                                        );
2584                                    }
2585                                }
2586                            }
2587                            TopologyConnectionEvent::DisconnectPeer { peer_id } => {
2588                                tracing::debug!(
2589                                    peer_id = %peer_id,
2590                                    "Topology event: disconnecting from peer"
2591                                );
2592                                // Convert peer_id hex string to EndpointId
2593                                if let Ok(bytes) = hex::decode(&peer_id) {
2594                                    if bytes.len() == 32 {
2595                                        let mut array = [0u8; 32];
2596                                        array.copy_from_slice(&bytes);
2597                                        if let Ok(endpoint_id) =
2598                                            iroh::EndpointId::from_bytes(&array)
2599                                        {
2600                                            let _ = transport.disconnect(&endpoint_id);
2601                                        }
2602                                    }
2603                                }
2604                            }
2605                        }
2606                    }
2607                }
2608                tracing::debug!("Topology event handler stopped");
2609            });
2610        }
2611
2612        // Spawn background task to connect to discovered peers (with authentication)
2613        // Only runs if topology events are NOT configured (fallback to limited discovery)
2614        #[cfg(feature = "automerge-backend")]
2615        if !has_topology_events {
2616            let discovery_manager = Arc::clone(&self.discovery_manager);
2617            let transport = Arc::clone(&self.transport);
2618            let formation_key_connect = formation_key;
2619            let max_connections = self.max_connections;
2620
2621            tokio::spawn(async move {
2622                use crate::network::formation_handshake::perform_initiator_handshake;
2623                use crate::network::iroh_transport::TransportPeerEvent;
2624                use crate::network::PeerInfo as NetworkPeerInfo;
2625
2626                tracing::info!(
2627                    "Discovery-based connection management enabled (max {} connections)",
2628                    max_connections
2629                );
2630
2631                // Subscribe to peer events for immediate reconnection on disconnect (Issue #504)
2632                let mut peer_events = transport.subscribe_peer_events();
2633
2634                // Adaptive interval: start fast (1s), slow down once mesh is stable (up to 5s)
2635                let mut interval_secs = 1u64;
2636                let mut consecutive_no_new_connections = 0u32;
2637
2638                loop {
2639                    // Issue #504: Use select! to react immediately to disconnect events
2640                    // instead of waiting up to 5 seconds for the next polling cycle
2641                    let sleep_future =
2642                        tokio::time::sleep(std::time::Duration::from_secs(interval_secs));
2643                    tokio::pin!(sleep_future);
2644
2645                    tokio::select! {
2646                        _ = &mut sleep_future => {
2647                            // Normal timeout - continue with connection cycle
2648                        }
2649                        event = peer_events.recv() => {
2650                            match event {
2651                                Some(TransportPeerEvent::Disconnected { endpoint_id, reason }) => {
2652                                    tracing::debug!(
2653                                        peer = %endpoint_id.fmt_short(),
2654                                        reason = %reason,
2655                                        "Peer disconnected - triggering immediate reconnection attempt"
2656                                    );
2657                                    // Reset to fast polling for quick recovery
2658                                    interval_secs = 1;
2659                                    consecutive_no_new_connections = 0;
2660                                }
2661                                Some(TransportPeerEvent::Connected { .. }) => {
2662                                    // New connection - continue normally
2663                                }
2664                                None => {
2665                                    // Channel closed, exit the loop
2666                                    tracing::debug!("Peer event channel closed, stopping connection manager");
2667                                    break;
2668                                }
2669                            }
2670                        }
2671                    }
2672
2673                    // Check current connection count
2674                    let current_connections = transport.connected_peers().len();
2675                    if current_connections >= max_connections {
2676                        tracing::debug!(
2677                            "At max connections ({}/{}), skipping discovery connect cycle",
2678                            current_connections,
2679                            max_connections
2680                        );
2681                        consecutive_no_new_connections += 1;
2682                        if consecutive_no_new_connections >= 3 && interval_secs < 5 {
2683                            interval_secs = (interval_secs * 2).min(5);
2684                        }
2685                        continue;
2686                    }
2687
2688                    // Get discovered peers
2689                    let manager = discovery_manager.read().await;
2690                    let discovered_peers = manager.get_peers().await;
2691                    drop(manager);
2692
2693                    // Try to connect to discovered peers (up to max_connections limit)
2694                    let mut made_new_connection = false;
2695                    let slots_available = max_connections.saturating_sub(current_connections);
2696
2697                    for peer in discovered_peers.into_iter().take(slots_available) {
2698                        // Convert discovery::peer::PeerInfo to network::PeerInfo
2699                        let network_peer_info = NetworkPeerInfo {
2700                            name: peer.name.clone(),
2701                            node_id: peer.node_id.clone(),
2702                            addresses: peer.addresses.clone(),
2703                            relay_url: peer.relay_url.clone(),
2704                        };
2705
2706                        // Try to connect to the peer
2707                        // connect_peer() returns Option<Connection> (Issue #229):
2708                        // - Some(conn): New connection, we need to do initiator handshake
2709                        // - None: Already connected via accept path, no action needed
2710                        if let Ok(endpoint_id) = peer.endpoint_id() {
2711                            match transport.connect_peer(&network_peer_info).await {
2712                                Ok(Some(conn)) => {
2713                                    // Issue #346: Give the accept loop a moment to process any
2714                                    // incoming connection from this peer. In symmetric discovery
2715                                    // (both peers have each other in config), both will connect
2716                                    // simultaneously and the accept loop needs time to process
2717                                    // the incoming connection and do conflict resolution.
2718                                    tokio::task::yield_now().await;
2719                                    tokio::time::sleep(tokio::time::Duration::from_millis(10))
2720                                        .await;
2721
2722                                    // Check if connection was closed by conflict resolution
2723                                    // (accept path superseded this connection).
2724                                    if conn.close_reason().is_some() {
2725                                        tracing::debug!(
2726                                            "Peer {} connection superseded by accept path",
2727                                            peer.name
2728                                        );
2729                                        continue;
2730                                    }
2731
2732                                    // New connection - perform formation handshake
2733                                    match perform_initiator_handshake(&conn, &formation_key_connect)
2734                                        .await
2735                                    {
2736                                        Ok(()) => {
2737                                            tracing::info!(
2738                                                "Connected and authenticated with peer: {} ({}/{})",
2739                                                peer.name,
2740                                                transport.connected_peers().len(),
2741                                                max_connections
2742                                            );
2743                                            // Issue #346: Emit Connected AFTER successful handshake
2744                                            transport.emit_peer_connected(endpoint_id);
2745                                            made_new_connection = true;
2746                                        }
2747                                        Err(e) => {
2748                                            tracing::warn!(
2749                                                "Peer {} failed authentication: {}. Disconnecting.",
2750                                                peer.name,
2751                                                e
2752                                            );
2753                                            // Issue #346: Don't call disconnect() here - the connection
2754                                            // in the map might be a different one after conflict resolution.
2755                                            // conn.close() is sufficient; close monitor handles cleanup.
2756                                            conn.close(1u32.into(), b"authentication failed");
2757                                        }
2758                                    }
2759                                }
2760                                Ok(None) => {
2761                                    // Accept path is handling connection
2762                                    tracing::debug!(
2763                                        "Peer {} connection handled by accept path",
2764                                        peer.name
2765                                    );
2766                                }
2767                                Err(e) => {
2768                                    tracing::debug!(
2769                                        "Failed to connect to discovered peer {}: {}",
2770                                        peer.name,
2771                                        e
2772                                    );
2773                                }
2774                            }
2775                        }
2776                    }
2777
2778                    // Adaptive backoff: stay fast while forming mesh, slow down once stable
2779                    if made_new_connection {
2780                        // Reset to fast polling when we're actively connecting
2781                        interval_secs = 1;
2782                        consecutive_no_new_connections = 0;
2783                    } else {
2784                        consecutive_no_new_connections += 1;
2785                        // After 3 cycles with no new connections, increase interval (max 5s)
2786                        if consecutive_no_new_connections >= 3 && interval_secs < 5 {
2787                            interval_secs = (interval_secs * 2).min(5);
2788                            tracing::debug!(
2789                                "Mesh stable, increasing connect interval to {}s",
2790                                interval_secs
2791                            );
2792                        }
2793                    }
2794                }
2795            });
2796        }
2797
2798        Ok(())
2799    }
2800
    /// Stops peer discovery.
    ///
    /// Currently a no-op that always returns `Ok(())`.
    async fn stop(&self) -> anyhow::Result<()> {
        // The background tasks launched by `start` are spawned detached (their
        // join handles are not stored), so nothing is cancelled here.
        // NOTE(review): presumably those tasks end when the transport/endpoint
        // shuts down or their channels close - confirm.
        Ok(())
    }
2804
2805    async fn discovered_peers(&self) -> anyhow::Result<Vec<PeerInfo>> {
2806        let mut peers = Vec::new();
2807
2808        // Get connected peers from transport
2809        let peer_ids = self.transport.connected_peers();
2810        for peer_id in peer_ids {
2811            if self.transport.get_connection(&peer_id).is_some() {
2812                peers.push(PeerInfo {
2813                    peer_id: hex::encode(peer_id.as_bytes()),
2814                    address: None,
2815                    transport: TransportType::Custom,
2816                    connected: true,
2817                    last_seen: std::time::SystemTime::now(),
2818                    metadata: HashMap::new(),
2819                });
2820            }
2821        }
2822
2823        // Add discovered but not yet connected peers from discovery manager
2824        #[cfg(feature = "automerge-backend")]
2825        {
2826            let manager = self.discovery_manager.read().await;
2827            for discovered_peer in manager.get_peers().await {
2828                // Check if already connected
2829                if !peers.iter().any(|p| p.peer_id == discovered_peer.node_id) {
2830                    peers.push(PeerInfo {
2831                        peer_id: discovered_peer.node_id.clone(),
2832                        address: discovered_peer.addresses.first().cloned(),
2833                        transport: TransportType::Custom,
2834                        connected: false,
2835                        last_seen: std::time::SystemTime::now(),
2836                        metadata: HashMap::new(),
2837                    });
2838                }
2839            }
2840        }
2841
2842        Ok(peers)
2843    }
2844
2845    async fn add_peer(&self, address: &str, _transport: TransportType) -> anyhow::Result<()> {
2846        use crate::network::iroh_transport::IrohTransport;
2847        use crate::network::PeerInfo as NetworkPeerInfo;
2848
2849        // Get formation key for authentication
2850        let formation_key = self
2851            .formation_key
2852            .read()
2853            .map_err(|_| Error::Internal("formation_key lock poisoned".into()))?
2854            .clone()
2855            .ok_or_else(|| Error::Internal("Formation key not initialized".to_string()))?;
2856
2857        // Parse address format (Issue #226):
2858        // Format 1: "seed|hostname:port" - Derives EndpointId from seed (for containerlab)
2859        // Format 2: "hex_node_id" - Raw hex EndpointId (legacy static config)
2860        //
2861        // Example: "alpha-formation/node-1|192.168.1.100:9000"
2862        let (node_id, socket_addr) = if address.contains('|') {
2863            // Seed-based format: "seed|address"
2864            let parts: Vec<&str> = address.splitn(2, '|').collect();
2865            if parts.len() != 2 {
2866                return Err(Error::Internal(format!(
2867                    "Invalid address format: {}. Expected 'seed|host:port'",
2868                    address
2869                ))
2870                .into());
2871            }
2872            let seed = parts[0];
2873            let addr = parts[1];
2874
2875            // Derive EndpointId from seed using deterministic key generation
2876            let endpoint_id = IrohTransport::endpoint_id_from_seed(seed);
2877            let node_id_hex = hex::encode(endpoint_id.as_bytes());
2878
2879            tracing::debug!(
2880                seed = seed,
2881                node_id = %node_id_hex,
2882                address = addr,
2883                "Derived EndpointId from seed for add_peer"
2884            );
2885
2886            (node_id_hex, addr.to_string())
2887        } else {
2888            // Legacy format: assume address is a hex-encoded EndpointId
2889            // (for backwards compatibility with existing static configs)
2890            (address.to_string(), address.to_string())
2891        };
2892
2893        let peer_info = NetworkPeerInfo {
2894            name: "manual-peer".to_string(),
2895            node_id,
2896            addresses: vec![socket_addr],
2897            relay_url: None,
2898        };
2899
2900        // Connect to peer (conflict resolution handled by transport layer)
2901        let conn_opt =
2902            self.transport
2903                .connect_peer(&peer_info)
2904                .await
2905                .map_err(|e| Error::Network {
2906                    message: format!("Failed to connect to peer: {}", e),
2907                    peer_id: None,
2908                    source: None,
2909                })?;
2910
2911        // Perform formation handshake to authenticate (only if we got a new connection)
2912        #[cfg(feature = "automerge-backend")]
2913        if let Some(conn) = conn_opt {
2914            use crate::network::formation_handshake::perform_initiator_handshake;
2915
2916            let endpoint_id = conn.remote_id();
2917            if let Err(e) = perform_initiator_handshake(&conn, &formation_key).await {
2918                // Authentication failed - close the connection
2919                // Issue #346: Don't call disconnect() here - the connection
2920                // in the map might be a different one after conflict resolution.
2921                // conn.close() is sufficient; close monitor handles cleanup.
2922                conn.close(1u32.into(), b"authentication failed");
2923
2924                return Err(Error::Network {
2925                    message: format!("Peer authentication failed: {}", e),
2926                    peer_id: Some(address.to_string()),
2927                    source: None,
2928                }
2929                .into());
2930            }
2931            // Issue #346: Emit Connected AFTER successful handshake
2932            self.transport.emit_peer_connected(endpoint_id);
2933        }
2934        // If conn_opt is None, accept path is handling the connection
2935
2936        Ok(())
2937    }
2938
2939    async fn wait_for_peer(&self, peer_id: &PeerId, timeout: Duration) -> anyhow::Result<()> {
2940        let start = std::time::Instant::now();
2941
2942        loop {
2943            let peers = self.discovered_peers().await?;
2944            if peers.iter().any(|p| &p.peer_id == peer_id) {
2945                return Ok(());
2946            }
2947
2948            if start.elapsed() > timeout {
2949                return Err(Error::Network {
2950                    message: format!("Timeout waiting for peer: {}", peer_id),
2951                    peer_id: Some(peer_id.clone()),
2952                    source: None,
2953                }
2954                .into());
2955            }
2956
2957            tokio::time::sleep(Duration::from_millis(100)).await;
2958        }
2959    }
2960
    /// Register `callback` to be invoked for every peer connect/disconnect event.
    ///
    /// The callback is appended to the shared `peer_callbacks` list. If that
    /// mutex is poisoned the callback is silently dropped.
    ///
    /// The first registration that flips `event_forwarder_running` from `false`
    /// to `true` also spawns a dedicated OS thread. That thread runs a
    /// current-thread tokio runtime, receives `TransportPeerEvent`s from the
    /// transport, converts them to `PeerEvent`s and calls every registered
    /// callback.
    ///
    /// Behaviour visible in this block:
    /// - Callbacks run on the forwarder thread while the `peer_callbacks` lock
    ///   is held, so a callback that registers another callback would block on
    ///   that lock.
    /// - `peer_id` in forwarded events is the `Debug` formatting of the
    ///   transport's endpoint ID.
    /// - The forwarder exits when the flag becomes `false` or the event channel
    ///   closes; the flag is not reset on exit.
    ///
    /// NOTE(review): `DataSyncBackend::peer_discovery()` in this file builds each
    /// `IrohPeerDiscovery` with a fresh `event_forwarder_running` flag but a
    /// shared `peer_callbacks` list, so the "start once" guarantee appears to be
    /// per instance — confirm this cannot start several forwarders that each
    /// invoke the same callbacks.
    ///
    /// # Panics
    ///
    /// The spawned thread panics if the tokio runtime cannot be built.
    fn on_peer_event(&self, callback: Box<dyn Fn(PeerEvent) + Send + Sync>) {
        if let Ok(mut callbacks) = self.peer_callbacks.lock() {
            callbacks.push(callback);
        }

        // Start event forwarder on first callback registration (Issue #275)
        // Use compare_exchange to ensure we only start once
        if self
            .event_forwarder_running
            .compare_exchange(
                false,
                true,
                std::sync::atomic::Ordering::SeqCst,
                std::sync::atomic::Ordering::SeqCst,
            )
            .is_ok()
        {
            // Subscribe to transport events and forward to callbacks
            let mut rx = self.transport.subscribe_peer_events();
            let callbacks = Arc::clone(&self.peer_callbacks);
            let running = Arc::clone(&self.event_forwarder_running);

            // Spawn the forwarder task using std::thread with a tokio runtime
            // (since on_peer_event is not async)
            std::thread::spawn(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .expect("Failed to create event forwarder runtime");

                rt.block_on(async move {
                    use crate::network::TransportPeerEvent;

                    // The 100 ms receive timeout bounds how long the loop can go
                    // without re-checking the `running` flag.
                    while running.load(std::sync::atomic::Ordering::SeqCst) {
                        match tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
                            .await
                        {
                            Ok(Some(transport_event)) => {
                                // Convert TransportPeerEvent to PeerEvent
                                let peer_event = match transport_event {
                                    TransportPeerEvent::Connected { endpoint_id, .. } => {
                                        PeerEvent::Connected(PeerInfo {
                                            peer_id: format!("{:?}", endpoint_id),
                                            address: None,
                                            transport: TransportType::Tcp, // QUIC maps to TCP for now
                                            connected: true,
                                            last_seen: std::time::SystemTime::now(),
                                            metadata: std::collections::HashMap::new(),
                                        })
                                    }
                                    TransportPeerEvent::Disconnected {
                                        endpoint_id,
                                        reason,
                                    } => PeerEvent::Disconnected {
                                        peer_id: format!("{:?}", endpoint_id),
                                        reason: Some(reason),
                                    },
                                };

                                // Invoke all callbacks
                                // (skipped for this event if the callback list lock is poisoned)
                                if let Ok(cbs) = callbacks.lock() {
                                    for cb in cbs.iter() {
                                        cb(peer_event.clone());
                                    }
                                }
                            }
                            Ok(None) => {
                                // Channel closed, stop forwarder
                                break;
                            }
                            Err(_) => {
                                // Timeout - continue to check running flag
                            }
                        }
                    }
                });
            });
        }
    }
3040
3041    async fn get_peer_info(&self, peer_id: &PeerId) -> anyhow::Result<Option<PeerInfo>> {
3042        let peers = self.discovered_peers().await?;
3043        Ok(peers.into_iter().find(|p| &p.peer_id == peer_id))
3044    }
3045}
3046
/// `SyncEngine` implementation for `AutomergeIrohBackend`.
///
/// Instances are created by `AutomergeIrohBackend::sync_engine()`, which clones
/// the backend and transport handles and takes a snapshot of the formation key.
struct IrohSyncEngine {
    /// Storage-layer Automerge backend; start/stop/stats calls are delegated to it.
    backend: Arc<crate::storage::AutomergeBackend>,
    /// Iroh transport used to dial peers and to emit peer-connected events.
    transport: Arc<crate::network::IrohTransport>,
    /// Formation key used for the initiator handshake. When `None`, new
    /// connections are reported as connected without authentication.
    formation_key: Option<crate::security::FormationKey>,
}
3053
3054#[async_trait]
3055impl SyncEngine for IrohSyncEngine {
3056    async fn start_sync(&self) -> anyhow::Result<()> {
3057        use crate::storage::capabilities::SyncCapable;
3058        self.backend.start_sync().map_err(|e| Error::Storage {
3059            message: format!("Failed to start sync: {}", e),
3060            operation: Some("start_sync".to_string()),
3061            key: None,
3062            source: None,
3063        })?;
3064        Ok(())
3065    }
3066
3067    async fn stop_sync(&self) -> anyhow::Result<()> {
3068        use crate::storage::capabilities::SyncCapable;
3069        self.backend.stop_sync().map_err(|e| Error::Storage {
3070            message: format!("Failed to stop sync: {}", e),
3071            operation: Some("stop_sync".to_string()),
3072            key: None,
3073            source: None,
3074        })?;
3075        Ok(())
3076    }
3077
3078    async fn subscribe(
3079        &self,
3080        collection: &str,
3081        _query: &Query,
3082    ) -> anyhow::Result<SyncSubscription> {
3083        Ok(SyncSubscription::new(collection, ()))
3084    }
3085
3086    async fn is_syncing(&self) -> anyhow::Result<bool> {
3087        use crate::storage::capabilities::SyncCapable;
3088        let stats = self.backend.sync_stats().map_err(|e| Error::Storage {
3089            message: format!("Failed to get sync stats: {}", e),
3090            operation: Some("sync_stats".to_string()),
3091            key: None,
3092            source: None,
3093        })?;
3094        Ok(stats.peer_count > 0)
3095    }
3096
3097    /// Connect to a peer using their EndpointId and addresses (Issue #235)
3098    ///
3099    /// This enables static peer configuration in containerlab and similar environments
3100    /// where mDNS discovery may not work across network namespaces.
3101    async fn connect_to_peer(
3102        &self,
3103        endpoint_id_hex: &str,
3104        addresses: &[String],
3105    ) -> anyhow::Result<bool> {
3106        use crate::network::PeerInfo as NetworkPeerInfo;
3107
3108        // Parse the endpoint ID from hex
3109        let endpoint_id_bytes = hex::decode(endpoint_id_hex)
3110            .map_err(|e| Error::Internal(format!("Invalid endpoint_id_hex: {}", e)))?;
3111
3112        if endpoint_id_bytes.len() != 32 {
3113            return Err(Error::Internal(format!(
3114                "Invalid endpoint_id_hex length: expected 32 bytes, got {}",
3115                endpoint_id_bytes.len()
3116            ))
3117            .into());
3118        }
3119
3120        // Issue #346: Removed tie-breaking from sync layer
3121        //
3122        // Tie-breaking is handled by the transport layer (IrohTransport::connect).
3123        // For static configurations (TCP_CONNECT), we should always attempt to connect
3124        // when explicitly configured. The transport will return Ok(None) if we should
3125        // wait for the peer to connect to us, which we handle below.
3126        //
3127        // Having tie-breaking at BOTH layers caused connections to fail when:
3128        // - Child node (soldier) has higher EndpointId than parent (squad leader)
3129        // - Child's TCP_CONNECT says "connect to parent"
3130        // - Sync layer tie-breaking blocked the connection
3131        // - Parent doesn't have child in config, so never connects
3132        // - Result: no connection!
3133        let our_endpoint_id = self.transport.endpoint_id();
3134        let our_endpoint_hex = hex::encode(our_endpoint_id.as_bytes());
3135
3136        tracing::debug!(
3137            our_endpoint = %our_endpoint_hex,
3138            peer_endpoint = %endpoint_id_hex,
3139            addresses = ?addresses,
3140            "Connecting to peer via static configuration"
3141        );
3142
3143        // Create PeerInfo for the transport
3144        let peer_info = NetworkPeerInfo {
3145            name: format!("peer-{}", &endpoint_id_hex[..8]),
3146            node_id: endpoint_id_hex.to_string(),
3147            addresses: addresses.to_vec(),
3148            relay_url: None,
3149        };
3150
3151        // Issue #346: connect_peer returns Option<Connection>
3152        // - Some(conn): New connection, we need to do initiator handshake
3153        // - None: Accept path is handling, no action needed
3154        match self.transport.connect_peer(&peer_info).await {
3155            Ok(Some(conn)) => {
3156                // Issue #346: Check if connection was closed by conflict resolution
3157                if conn.close_reason().is_some() {
3158                    tracing::debug!(
3159                        peer_endpoint = %endpoint_id_hex,
3160                        "Connection superseded by accept path"
3161                    );
3162                    return Ok(false);
3163                }
3164
3165                // New connection - perform formation handshake
3166                if let Some(ref formation_key) = self.formation_key {
3167                    use crate::network::formation_handshake::perform_initiator_handshake;
3168                    match perform_initiator_handshake(&conn, formation_key).await {
3169                        Ok(()) => {
3170                            tracing::info!(
3171                                peer_endpoint = %endpoint_id_hex,
3172                                "Successfully connected to peer and authenticated"
3173                            );
3174                            // Issue #378: Emit peer connected event to notify sync handlers
3175                            if let Ok(peer_id) = peer_info.endpoint_id() {
3176                                self.transport.emit_peer_connected(peer_id);
3177                            }
3178                            Ok(true)
3179                        }
3180                        Err(e) => {
3181                            tracing::warn!(
3182                                peer_endpoint = %endpoint_id_hex,
3183                                error = %e,
3184                                "Peer authentication failed"
3185                            );
3186                            // Close the connection on auth failure
3187                            if let Ok(peer_id) = peer_info.endpoint_id() {
3188                                conn.close(1u32.into(), b"authentication failed");
3189                                self.transport.disconnect(&peer_id).ok();
3190                            }
3191                            Err(Error::Network {
3192                                message: format!("Peer authentication failed: {}", e),
3193                                peer_id: Some(endpoint_id_hex.to_string()),
3194                                source: None,
3195                            }
3196                            .into())
3197                        }
3198                    }
3199                } else {
3200                    // No formation key - just report connected
3201                    tracing::info!(
3202                        peer_endpoint = %endpoint_id_hex,
3203                        "Successfully connected to peer (no authentication)"
3204                    );
3205                    // Issue #378: Emit peer connected event to notify sync handlers
3206                    if let Ok(peer_id) = peer_info.endpoint_id() {
3207                        self.transport.emit_peer_connected(peer_id);
3208                    }
3209                    Ok(true)
3210                }
3211            }
3212            Ok(None) => {
3213                // Accept path is handling connection
3214                tracing::debug!(
3215                    peer_endpoint = %endpoint_id_hex,
3216                    "Connection handled by accept path"
3217                );
3218                // Return true since a connection will be established via accept path
3219                Ok(true)
3220            }
3221            Err(e) => {
3222                tracing::warn!(
3223                    peer_endpoint = %endpoint_id_hex,
3224                    error = %e,
3225                    "Failed to connect to peer"
3226                );
3227                Err(Error::Network {
3228                    message: format!("Failed to connect to peer: {}", e),
3229                    peer_id: Some(endpoint_id_hex.to_string()),
3230                    source: None,
3231                }
3232                .into())
3233            }
3234        }
3235    }
3236}
3237
3238// DataSyncBackend implementation
3239#[async_trait]
3240impl DataSyncBackend for AutomergeIrohBackend {
3241    async fn initialize(&self, config: BackendConfig) -> anyhow::Result<()> {
3242        // Require shared_key for peer authentication
3243        let shared_key = config.shared_key.as_ref().ok_or_else(|| {
3244            Error::config_error(
3245                "AutomergeIroh backend requires PEAT_SECRET_KEY for peer authentication",
3246                Some("shared_key".to_string()),
3247            )
3248        })?;
3249
3250        // Create FormationKey from app_id (formation_id) and shared_key
3251        // This ensures only peers with matching credentials can sync
3252        let formation_key = crate::security::FormationKey::from_base64(&config.app_id, shared_key)
3253            .map_err(|e| {
3254                Error::config_error(
3255                    format!(
3256                        "Invalid shared_key format: {}. Expected base64-encoded 32-byte key.",
3257                        e
3258                    ),
3259                    Some("shared_key".to_string()),
3260                )
3261            })?;
3262
3263        // Store the formation key for peer authentication
3264        *self
3265            .formation_key
3266            .write()
3267            .map_err(|_| Error::Internal("formation_key lock poisoned".into()))? =
3268            Some(formation_key);
3269
3270        *self
3271            .initialized
3272            .lock()
3273            .map_err(|_| Error::Internal("initialized lock poisoned".into()))? = true;
3274        self.peer_discovery().start().await?;
3275        Ok(())
3276    }
3277
3278    async fn shutdown(&self) -> anyhow::Result<()> {
3279        if self.is_ready().await {
3280            let _ = self.sync_engine().stop_sync().await;
3281            let _ = self.peer_discovery().stop().await;
3282        }
3283        *self
3284            .initialized
3285            .lock()
3286            .map_err(|_| Error::Internal("initialized lock poisoned".into()))? = false;
3287        Ok(())
3288    }
3289
3290    fn document_store(&self) -> Arc<dyn DocumentStore> {
3291        // Derive node ID from transport endpoint (unique per node)
3292        let node_id = self.transport.endpoint_id().to_string();
3293        Arc::new(IrohDocumentStore {
3294            backend: Arc::clone(&self.backend),
3295            deletion_policy_registry: Arc::clone(&self.deletion_policy_registry),
3296            lamport_counter: Arc::clone(&self.lamport_counter),
3297            node_id,
3298        })
3299    }
3300
3301    fn peer_discovery(&self) -> Arc<dyn PeerDiscovery> {
3302        Arc::new(IrohPeerDiscovery {
3303            transport: Arc::clone(&self.transport),
3304            peer_callbacks: Arc::clone(&self.peer_callbacks),
3305            #[cfg(feature = "automerge-backend")]
3306            discovery_manager: Arc::clone(&self.discovery_manager),
3307            #[cfg(feature = "automerge-backend")]
3308            formation_key: Arc::clone(&self.formation_key),
3309            event_forwarder_running: Arc::new(std::sync::atomic::AtomicBool::new(false)),
3310            #[cfg(feature = "automerge-backend")]
3311            topology_event_rx: Arc::clone(&self.topology_event_rx),
3312            #[cfg(feature = "automerge-backend")]
3313            max_connections: self.max_connections,
3314        })
3315    }
3316
3317    fn sync_engine(&self) -> Arc<dyn SyncEngine> {
3318        Arc::new(IrohSyncEngine {
3319            backend: Arc::clone(&self.backend),
3320            transport: Arc::clone(&self.transport),
3321            formation_key: self.formation_key(),
3322        })
3323    }
3324
3325    async fn is_ready(&self) -> bool {
3326        self.initialized.lock().map(|guard| *guard).unwrap_or(false)
3327    }
3328
3329    fn backend_info(&self) -> BackendInfo {
3330        BackendInfo {
3331            name: "AutomergeIroh".to_string(),
3332            version: env!("CARGO_PKG_VERSION").to_string(),
3333        }
3334    }
3335
3336    fn as_any(&self) -> &dyn std::any::Any {
3337        self
3338    }
3339}
3340
// HierarchicalStorageCapable for AutomergeIrohBackend.
// Enables peat-sim hierarchical mode with the Automerge backend by forwarding
// both accessors to the wrapped storage::AutomergeBackend.
#[cfg(feature = "automerge-backend")]
impl crate::storage::HierarchicalStorageCapable for AutomergeIrohBackend {
    /// Summary storage provided by the wrapped storage backend.
    fn summary_storage(&self) -> Arc<dyn crate::hierarchy::SummaryStorage> {
        let inner: &crate::storage::AutomergeBackend = &self.backend;
        crate::storage::HierarchicalStorageCapable::summary_storage(inner)
    }

    /// Command storage provided by the wrapped storage backend.
    fn command_storage(&self) -> Arc<dyn crate::command::CommandStorage> {
        let inner: &crate::storage::AutomergeBackend = &self.backend;
        crate::storage::HierarchicalStorageCapable::command_storage(inner)
    }
}
3355
3356// ============================================================================
3357// Custom Query Parser (Issue #517, #520)
3358// ============================================================================
3359//
3360// This module provides a simple pattern-based query evaluator for DQL-like
3361// custom queries. Instead of implementing a full SQL parser, we handle the
3362// specific patterns used in Peat Protocol.
3363//
3364// Supported patterns (Issue #517 - original):
3365// - `field == 'value'` / `field == true/false`
3366// - `field STARTS WITH 'prefix'`
3367// - `field ENDS WITH 'suffix'`
3368// - `CONTAINS(field, 'value')`
3369// - `A AND B` / `A OR B` (compound expressions)
3370//
3371// Extended patterns (Issue #520 - full syntactic parity):
3372// - `field != 'value'` / `field != true/false` (inequality)
3373// - `field LIKE '%pattern%'` (wildcard matching)
3374// - `field IN ['a', 'b', 'c']` (set membership)
3375// - `field.nested.path` (nested field access)
3376// - `NOT (expr)` (negation wrapper)
3377// - `field IS NULL` / `field IS NOT NULL` (null checks)
3378//
3379// For unrecognized patterns, we return `true` (match all) as a conservative
3380// fallback - this ensures we never hide documents that should match.
3381// ============================================================================
3382
3383/// Evaluate a custom DQL-like query string against a document.
3384///
3385/// This is a pattern-based evaluator that handles the specific query patterns
3386/// used in Peat Protocol. For unrecognized patterns, returns `true` (conservative).
3387///
3388/// # Arguments
3389/// * `doc` - The document to match against
3390/// * `query_str` - The DQL-like query string (e.g., "collection_name == 'squad_summaries'")
3391///
3392/// # Returns
3393/// * `true` if the document matches the query (or query is unrecognized)
3394/// * `false` if the document definitely doesn't match
3395fn evaluate_custom_query(doc: &Document, query_str: &str) -> bool {
3396    let trimmed = query_str.trim();
3397
3398    // Handle compound OR expressions (lowest precedence)
3399    // Split on " OR " but be careful not to split inside quotes
3400    if let Some((left, right)) = split_compound(trimmed, " OR ") {
3401        return evaluate_custom_query(doc, left) || evaluate_custom_query(doc, right);
3402    }
3403
3404    // Handle compound AND expressions
3405    if let Some((left, right)) = split_compound(trimmed, " AND ") {
3406        return evaluate_custom_query(doc, left) && evaluate_custom_query(doc, right);
3407    }
3408
3409    // Strip outer parentheses if present
3410    let expr = if trimmed.starts_with('(') && trimmed.ends_with(')') {
3411        &trimmed[1..trimmed.len() - 1]
3412    } else {
3413        trimmed
3414    };
3415
3416    // Pattern: NOT (expr) - negation wrapper (Issue #520)
3417    if let Some(inner) = parse_not_expression(expr) {
3418        return !evaluate_custom_query(doc, inner);
3419    }
3420
3421    // Pattern: CONTAINS(field, 'value')
3422    if expr.starts_with("CONTAINS(") && expr.ends_with(')') {
3423        return evaluate_contains(doc, expr);
3424    }
3425
3426    // Pattern: field IS NULL / field IS NOT NULL (Issue #520)
3427    if let Some((field, is_null)) = parse_is_null(expr) {
3428        return evaluate_is_null(doc, field, is_null);
3429    }
3430
3431    // Pattern: field != 'value' or field != true/false (Issue #520)
3432    // Must check before equality since != contains =
3433    if let Some((field, value)) = parse_inequality(expr) {
3434        return !evaluate_equality(doc, field, value);
3435    }
3436
3437    // Pattern: field == 'value' or field == true/false
3438    if let Some((field, value)) = parse_equality(expr) {
3439        return evaluate_equality(doc, field, value);
3440    }
3441
3442    // Pattern: field LIKE '%pattern%' (Issue #520)
3443    if let Some((field, pattern)) = parse_like(expr) {
3444        return evaluate_like(doc, field, pattern);
3445    }
3446
3447    // Pattern: field IN ['a', 'b', 'c'] (Issue #520)
3448    if let Some((field, values)) = parse_in(expr) {
3449        return evaluate_in(doc, field, &values);
3450    }
3451
3452    // Pattern: field STARTS WITH 'prefix'
3453    if let Some((field, prefix)) = parse_starts_with(expr) {
3454        return evaluate_starts_with(doc, field, prefix);
3455    }
3456
3457    // Pattern: field ENDS WITH 'suffix'
3458    if let Some((field, suffix)) = parse_ends_with(expr) {
3459        return evaluate_ends_with(doc, field, suffix);
3460    }
3461
3462    // Unrecognized pattern: return true (conservative fallback)
3463    // This ensures we never hide documents that should match
3464    true
3465}
3466
/// Split a compound expression on a delimiter, respecting parentheses and quotes.
///
/// Scans `expr` left to right and splits at the first occurrence of
/// `delimiter` that is outside single quotes and outside any parentheses.
/// Returns the text before and after the delimiter, or `None` when no such
/// occurrence exists.
///
/// The scan works on bytes and only slices at char boundaries, so non-ASCII
/// text in `expr` (for example in field names) cannot cause a panic.
fn split_compound<'a>(expr: &'a str, delimiter: &str) -> Option<(&'a str, &'a str)> {
    let bytes = expr.as_bytes();
    let needle = delimiter.as_bytes();
    let mut depth = 0;
    let mut in_quote = false;

    for (i, &byte) in bytes.iter().enumerate() {
        match byte {
            b'\'' => in_quote = !in_quote,
            b'(' if !in_quote => depth += 1,
            b')' if !in_quote => depth -= 1,
            // Compare raw bytes instead of slicing `expr` at `i`: `i` may fall
            // inside a multi-byte character, where string slicing would panic.
            _ if !in_quote
                && depth == 0
                && expr.is_char_boundary(i)
                && bytes[i..].starts_with(needle) =>
            {
                // `i` is a char boundary and `needle` is a complete UTF-8
                // string, so both slice points below are valid.
                return Some((&expr[..i], &expr[i + needle.len()..]));
            }
            _ => {}
        }
    }
    None
}
3486
/// Parse equality expression: `field == 'value'` or `field == true/false`
///
/// Splits at the first `==` that is outside single quotes and returns the
/// trimmed text on either side as `(field, value)`. Returns `None` when the
/// expression contains no `==` outside a quoted literal, so an `==` that is
/// part of a string value (e.g. `name STARTS WITH 'a==b'`) is not mistaken
/// for the operator.
fn parse_equality(expr: &str) -> Option<(&str, &str)> {
    let bytes = expr.as_bytes();
    let mut in_quote = false;

    for (i, &byte) in bytes.iter().enumerate() {
        match byte {
            b'\'' => in_quote = !in_quote,
            b'=' if !in_quote && bytes.get(i + 1) == Some(&b'=') => {
                // Both operator bytes are ASCII, so `i` and `i + 2` are valid
                // char boundaries.
                return Some((expr[..i].trim(), expr[i + 2..].trim()));
            }
            _ => {}
        }
    }
    None
}
3498
/// Parse STARTS WITH expression: `field STARTS WITH 'prefix'`
///
/// The keyword is matched ASCII-case-insensitively. Returns the trimmed text
/// before and after the first occurrence of the keyword as `(field, value)`,
/// or `None` when the keyword is absent.
fn parse_starts_with(expr: &str) -> Option<(&str, &str)> {
    const KEYWORD: &str = " STARTS WITH ";

    // ASCII uppercasing keeps every byte offset identical to `expr`. Full
    // Unicode uppercasing can change the byte length of some characters, which
    // would make the found index point at the wrong place in `expr`.
    let upper = expr.to_ascii_uppercase();
    let idx = upper.find(KEYWORD)?;

    let field = expr[..idx].trim();
    let value = expr[idx + KEYWORD.len()..].trim();
    Some((field, value))
}
3510
/// Parse ENDS WITH expression: `field ENDS WITH 'suffix'`
///
/// The keyword is matched ASCII-case-insensitively. Returns the trimmed text
/// before and after the first occurrence of the keyword as `(field, value)`,
/// or `None` when the keyword is absent.
fn parse_ends_with(expr: &str) -> Option<(&str, &str)> {
    const KEYWORD: &str = " ENDS WITH ";

    // ASCII uppercasing keeps every byte offset identical to `expr`. Full
    // Unicode uppercasing can change the byte length of some characters, which
    // would make the found index point at the wrong place in `expr`.
    let upper = expr.to_ascii_uppercase();
    let idx = upper.find(KEYWORD)?;

    let field = expr[..idx].trim();
    let value = expr[idx + KEYWORD.len()..].trim();
    Some((field, value))
}
3522
/// Extract string literal from quoted value: 'value' -> value
///
/// Surrounding whitespace is ignored. Returns `None` unless the trimmed input
/// both starts and ends with a single quote (a lone `'` does not qualify).
fn extract_string_literal(value: &str) -> Option<&str> {
    value.trim().strip_prefix('\'')?.strip_suffix('\'')
}
3532
3533/// Evaluate CONTAINS(field, 'value') pattern
3534fn evaluate_contains(doc: &Document, expr: &str) -> bool {
3535    // Parse CONTAINS(field, 'value')
3536    let inner = &expr[9..expr.len() - 1]; // Strip "CONTAINS(" and ")"
3537    let parts: Vec<&str> = inner.splitn(2, ',').collect();
3538
3539    if parts.len() != 2 {
3540        return true; // Conservative fallback
3541    }
3542
3543    let field = parts[0].trim();
3544    let value = parts[1].trim();
3545
3546    let search_value = match extract_string_literal(value) {
3547        Some(v) => v,
3548        None => return true, // Conservative fallback
3549    };
3550
3551    // Check if field value contains the search value
3552    match doc.get(field) {
3553        Some(serde_json::Value::Array(arr)) => arr.iter().any(|item| {
3554            if let Some(s) = item.as_str() {
3555                s == search_value
3556            } else {
3557                false
3558            }
3559        }),
3560        Some(serde_json::Value::String(s)) => s.contains(search_value),
3561        _ => false,
3562    }
3563}
3564
3565/// Evaluate field == value pattern
3566/// Supports nested field access via get_nested_field (Issue #520)
3567fn evaluate_equality(doc: &Document, field: &str, value: &str) -> bool {
3568    // Handle boolean values
3569    if value == "true" {
3570        return get_nested_field(doc, field)
3571            .and_then(|v| v.as_bool())
3572            .unwrap_or(false);
3573    }
3574    if value == "false" {
3575        return !get_nested_field(doc, field)
3576            .and_then(|v| v.as_bool())
3577            .unwrap_or(true);
3578    }
3579
3580    // Handle string literals
3581    if let Some(string_value) = extract_string_literal(value) {
3582        return match get_nested_field(doc, field) {
3583            Some(serde_json::Value::String(s)) => s == string_value,
3584            _ => false,
3585        };
3586    }
3587
3588    // Handle numeric values (try to parse as number)
3589    if let Ok(num) = value.parse::<i64>() {
3590        return match get_nested_field(doc, field) {
3591            Some(serde_json::Value::Number(n)) => n.as_i64() == Some(num),
3592            _ => false,
3593        };
3594    }
3595    if let Ok(num) = value.parse::<f64>() {
3596        return match get_nested_field(doc, field) {
3597            Some(serde_json::Value::Number(n)) => n
3598                .as_f64()
3599                .map(|f| (f - num).abs() < f64::EPSILON)
3600                .unwrap_or(false),
3601            _ => false,
3602        };
3603    }
3604
3605    // Unknown value format, conservative fallback
3606    true
3607}
3608
3609/// Evaluate field STARTS WITH 'prefix' pattern
3610fn evaluate_starts_with(doc: &Document, field: &str, value: &str) -> bool {
3611    let prefix = match extract_string_literal(value) {
3612        Some(v) => v,
3613        None => return true, // Conservative fallback
3614    };
3615
3616    match doc.get(field) {
3617        Some(serde_json::Value::String(s)) => s.starts_with(prefix),
3618        _ => false,
3619    }
3620}
3621
3622/// Evaluate field ENDS WITH 'suffix' pattern
3623fn evaluate_ends_with(doc: &Document, field: &str, value: &str) -> bool {
3624    let suffix = match extract_string_literal(value) {
3625        Some(v) => v,
3626        None => return true, // Conservative fallback
3627    };
3628
3629    match doc.get(field) {
3630        Some(serde_json::Value::String(s)) => s.ends_with(suffix),
3631        _ => false,
3632    }
3633}
3634
3635// ============================================================================
3636// Issue #520: Extended DQL patterns for full syntactic parity
3637// ============================================================================
3638
/// Parse inequality expression: `field != 'value'` or `field != true/false`
///
/// Splits `expr` at the first `!=` and returns the trimmed left and right
/// sides. Returns `None` when there is no `!=` or when either side is empty
/// after trimming.
fn parse_inequality(expr: &str) -> Option<(&str, &str)> {
    let (lhs, rhs) = expr.split_once("!=")?;
    let (field, value) = (lhs.trim(), rhs.trim());

    if field.is_empty() || value.is_empty() {
        None
    } else {
        Some((field, value))
    }
}
3652
/// Parse NOT expression: `NOT (expr)` or `NOT expr`
///
/// Returns the operand of a leading `NOT ` keyword (matched case-insensitively),
/// or `None` when `expr` does not start with it.
///
/// A surrounding pair of parentheses is removed only when the opening `(`
/// is the one closed by the final `)`. For `NOT (a) OR (b)` the operand is
/// returned as `(a) OR (b)` rather than the broken `a) OR (b`.
fn parse_not_expression(expr: &str) -> Option<&str> {
    // The keyword is pure ASCII, so inspecting the first four bytes is enough
    // and avoids allocating an upper-cased copy of the whole expression.
    let keyword = expr.get(..4)?;
    if !keyword.eq_ignore_ascii_case("NOT ") {
        return None;
    }

    let rest = expr[4..].trim();
    let wrapped = rest.starts_with('(') && rest.ends_with(')');
    if wrapped && !opening_paren_closes_early(rest) {
        Some(&rest[1..rest.len() - 1])
    } else {
        Some(rest)
    }
}

/// Returns `true` when the first `(` of `text` is closed before the last
/// character, i.e. the leading and trailing parentheses are not one pair.
///
/// Parentheses inside single- or double-quoted sections are ignored. Input
/// whose first group never closes returns `false`.
fn opening_paren_closes_early(text: &str) -> bool {
    let mut depth = 0usize;
    let mut open_quote: Option<char> = None;

    for (idx, ch) in text.char_indices() {
        if let Some(quote) = open_quote {
            if ch == quote {
                open_quote = None;
            }
            continue;
        }
        match ch {
            '\'' | '"' => open_quote = Some(ch),
            '(' => depth += 1,
            ')' => {
                depth = depth.saturating_sub(1);
                if depth == 0 {
                    // The first group just closed; it is "early" unless this
                    // is the final character of the text.
                    return idx + ch.len_utf8() < text.len();
                }
            }
            _ => {}
        }
    }
    false
}
3668
/// Parse IS NULL / IS NOT NULL expression
///
/// Returns the trimmed text before the keyword together with a flag:
/// `true` for `IS NULL`, `false` for `IS NOT NULL`. Keywords are matched
/// case-insensitively. Returns `None` when neither keyword occurs.
fn parse_is_null(expr: &str) -> Option<(&str, bool)> {
    // ASCII-only upper-casing keeps every byte offset identical to `expr`.
    // Full Unicode upper-casing can change the byte length of non-ASCII
    // characters, which would make the index below slice the wrong range of
    // `expr` (or panic on a char boundary).
    let upper = expr.to_ascii_uppercase();

    if let Some(idx) = upper.find(" IS NOT NULL") {
        return Some((expr[..idx].trim(), false)); // is_null = false means IS NOT NULL
    }
    upper
        .find(" IS NULL")
        .map(|idx| (expr[..idx].trim(), true)) // is_null = true means IS NULL
}
3682
3683/// Evaluate IS NULL / IS NOT NULL pattern
3684fn evaluate_is_null(doc: &Document, field: &str, is_null: bool) -> bool {
3685    let field_value = get_nested_field(doc, field);
3686    let value_is_null = field_value.is_none() || field_value == Some(&serde_json::Value::Null);
3687    if is_null {
3688        value_is_null
3689    } else {
3690        !value_is_null
3691    }
3692}
3693
/// Parse LIKE expression: `field LIKE '%pattern%'`
///
/// Returns the trimmed text before and after the first ` LIKE ` keyword
/// (matched case-insensitively), or `None` when the keyword is absent.
fn parse_like(expr: &str) -> Option<(&str, &str)> {
    const KEYWORD: &str = " LIKE ";

    // ASCII-only upper-casing keeps byte offsets aligned with `expr`; full
    // Unicode upper-casing can change the byte length of non-ASCII text and
    // make the index below point at the wrong place in `expr`.
    let idx = expr.to_ascii_uppercase().find(KEYWORD)?;

    let field = expr[..idx].trim();
    let pattern = expr[idx + KEYWORD.len()..].trim();
    Some((field, pattern))
}
3705
3706/// Evaluate LIKE pattern with % wildcards
3707fn evaluate_like(doc: &Document, field: &str, pattern: &str) -> bool {
3708    let pattern_str = match extract_string_literal(pattern) {
3709        Some(v) => v,
3710        None => return true, // Conservative fallback
3711    };
3712
3713    let field_value = match get_nested_field(doc, field) {
3714        Some(serde_json::Value::String(s)) => s.as_str(),
3715        _ => return false,
3716    };
3717
3718    // Convert SQL LIKE pattern to simple matching
3719    // % matches any sequence of characters
3720    // _ matches any single character (not implemented for simplicity)
3721    match_like_pattern(field_value, pattern_str)
3722}
3723
/// Match a value against a SQL LIKE pattern with % wildcards
///
/// `%` matches any sequence of characters (including none); every other
/// character, `_` included, must match literally.
///
/// * A pattern without `%` must equal `value` exactly.
/// * The text before the first `%` must be a prefix of `value`, and the text
///   after the last `%` must be a suffix of what remains, so the two can
///   never overlap.
/// * Segments between wildcards must occur in order, without overlap, in the
///   part of `value` left between that prefix and suffix.
fn match_like_pattern(value: &str, pattern: &str) -> bool {
    // No wildcard at all: LIKE degenerates to plain equality.
    let (prefix, rest) = match pattern.split_once('%') {
        Some(parts) => parts,
        None => return value == pattern,
    };

    // `rest` is everything after the first `%`; its final `%`-separated piece
    // is the anchored suffix, whatever precedes it are floating segments.
    let (middle, suffix) = rest.rsplit_once('%').unwrap_or(("", rest));

    // Strip the prefix first and the suffix from what is left, so a value that
    // is too short to hold both is rejected.
    let mut remaining = match value
        .strip_prefix(prefix)
        .and_then(|tail| tail.strip_suffix(suffix))
    {
        Some(inner) => inner,
        None => return false,
    };

    // Leftmost match for each floating segment; empty segments come from
    // consecutive `%` and impose no constraint.
    for segment in middle.split('%').filter(|s| !s.is_empty()) {
        match remaining.find(segment) {
            Some(at) => remaining = &remaining[at + segment.len()..],
            None => return false,
        }
    }

    true
}
3770
3771/// Parse IN expression: `field IN ['a', 'b', 'c']`
3772fn parse_in(expr: &str) -> Option<(&str, Vec<String>)> {
3773    let upper = expr.to_uppercase();
3774    if let Some(idx) = upper.find(" IN ") {
3775        let field = expr[..idx].trim();
3776        let values_str = expr[idx + 4..].trim(); // " IN " is 4 chars
3777
3778        // Parse the array: ['a', 'b', 'c'] or [1, 2, 3]
3779        if values_str.starts_with('[') && values_str.ends_with(']') {
3780            let inner = &values_str[1..values_str.len() - 1];
3781            let values: Vec<String> = inner
3782                .split(',')
3783                .map(|v| {
3784                    let trimmed = v.trim();
3785                    // Extract string literal or use as-is for numbers
3786                    if let Some(s) = extract_string_literal(trimmed) {
3787                        s.to_string()
3788                    } else {
3789                        trimmed.to_string()
3790                    }
3791                })
3792                .collect();
3793            return Some((field, values));
3794        }
3795    }
3796    None
3797}
3798
3799/// Evaluate IN pattern for set membership
3800fn evaluate_in(doc: &Document, field: &str, values: &[String]) -> bool {
3801    let field_value = match get_nested_field(doc, field) {
3802        Some(v) => v,
3803        None => return false,
3804    };
3805
3806    match field_value {
3807        serde_json::Value::String(s) => values.iter().any(|v| v == s),
3808        serde_json::Value::Number(n) => {
3809            if let Some(i) = n.as_i64() {
3810                values.iter().any(|v| v.parse::<i64>().ok() == Some(i))
3811            } else if let Some(f) = n.as_f64() {
3812                values.iter().any(|v| {
3813                    v.parse::<f64>()
3814                        .ok()
3815                        .map(|vf| (vf - f).abs() < f64::EPSILON)
3816                        == Some(true)
3817                })
3818            } else {
3819                false
3820            }
3821        }
3822        serde_json::Value::Bool(b) => {
3823            let bool_str = if *b { "true" } else { "false" };
3824            values.iter().any(|v| v == bool_str)
3825        }
3826        _ => false,
3827    }
3828}
3829
3830/// Get a potentially nested field value from a document
3831/// Supports both simple fields ("name") and nested paths ("address.city")
3832fn get_nested_field<'a>(doc: &'a Document, field: &str) -> Option<&'a serde_json::Value> {
3833    if !field.contains('.') {
3834        // Simple field access
3835        return doc.get(field);
3836    }
3837
3838    // Nested field access: field.subfield.subsubfield
3839    let parts: Vec<&str> = field.split('.').collect();
3840    let mut current = doc.get(parts[0])?;
3841
3842    for part in &parts[1..] {
3843        match current {
3844            serde_json::Value::Object(obj) => {
3845                current = obj.get(*part)?;
3846            }
3847            _ => return None,
3848        }
3849    }
3850
3851    Some(current)
3852}
3853
3854// Helper function for query matching
3855fn matches_query(doc: &Document, query: &Query) -> bool {
3856    match query {
3857        Query::All => true,
3858        Query::Eq { field, value } => {
3859            // Special case for "id" field - check doc.id instead of doc.fields
3860            if field == "id" {
3861                if let Some(ref doc_id) = doc.id {
3862                    if let Some(value_str) = value.as_str() {
3863                        return doc_id == value_str;
3864                    }
3865                }
3866                return false;
3867            }
3868            doc.get(field) == Some(value)
3869        }
3870        Query::Lt { field, value } => {
3871            if let Some(doc_val) = doc.get(field) {
3872                compare_values(doc_val, value) < 0
3873            } else {
3874                false
3875            }
3876        }
3877        Query::Gt { field, value } => {
3878            if let Some(doc_val) = doc.get(field) {
3879                compare_values(doc_val, value) > 0
3880            } else {
3881                false
3882            }
3883        }
3884        Query::And(queries) => queries.iter().all(|q| matches_query(doc, q)),
3885        Query::Or(queries) => queries.iter().any(|q| matches_query(doc, q)),
3886
3887        // === Custom query support (Issue #517) ===
3888        // Evaluate DQL-like custom queries using pattern-based parser
3889        Query::Custom(query_str) => evaluate_custom_query(doc, query_str),
3890
3891        // === Spatial queries (Issue #356) ===
3892        Query::WithinRadius {
3893            center,
3894            radius_meters,
3895            lat_field,
3896            lon_field,
3897        } => {
3898            let lat_key = lat_field.as_deref().unwrap_or("lat");
3899            let lon_key = lon_field.as_deref().unwrap_or("lon");
3900
3901            if let (Some(lat_val), Some(lon_val)) = (
3902                doc.get(lat_key).and_then(|v| v.as_f64()),
3903                doc.get(lon_key).and_then(|v| v.as_f64()),
3904            ) {
3905                let doc_point = GeoPoint::new(lat_val, lon_val);
3906                doc_point.within_radius(center, *radius_meters)
3907            } else {
3908                false
3909            }
3910        }
3911
3912        Query::WithinBounds {
3913            min,
3914            max,
3915            lat_field,
3916            lon_field,
3917        } => {
3918            let lat_key = lat_field.as_deref().unwrap_or("lat");
3919            let lon_key = lon_field.as_deref().unwrap_or("lon");
3920
3921            if let (Some(lat_val), Some(lon_val)) = (
3922                doc.get(lat_key).and_then(|v| v.as_f64()),
3923                doc.get(lon_key).and_then(|v| v.as_f64()),
3924            ) {
3925                let doc_point = GeoPoint::new(lat_val, lon_val);
3926                doc_point.within_bounds(min, max)
3927            } else {
3928                false
3929            }
3930        }
3931
3932        // === Negation query (Issue #357) ===
3933        Query::Not(inner) => !matches_query(doc, inner),
3934
3935        // === Deletion-aware queries (ADR-034, Issue #369) ===
3936        Query::IncludeDeleted(inner) => {
3937            // IncludeDeleted wraps another query - run the inner query
3938            // The soft-delete filter bypass is handled at the query() method level
3939            matches_query(doc, inner)
3940        }
3941
3942        Query::DeletedOnly => {
3943            // Only match documents with _deleted=true
3944            doc.fields
3945                .get("_deleted")
3946                .and_then(|v| v.as_bool())
3947                .unwrap_or(false)
3948        }
3949    }
3950}
3951
3952fn compare_values(a: &serde_json::Value, b: &serde_json::Value) -> i32 {
3953    use serde_json::Value as V;
3954
3955    match (a, b) {
3956        (V::Number(n1), V::Number(n2)) => {
3957            if let (Some(f1), Some(f2)) = (n1.as_f64(), n2.as_f64()) {
3958                if f1 < f2 {
3959                    -1
3960                } else if f1 > f2 {
3961                    1
3962                } else {
3963                    0
3964                }
3965            } else {
3966                0
3967            }
3968        }
3969        (V::String(s1), V::String(s2)) => s1.cmp(s2) as i32,
3970        _ => 0,
3971    }
3972}
3973
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::path::PathBuf;

    /// Helper: build a `BackendConfig` carrying a freshly generated shared secret
    fn test_config() -> BackendConfig {
        BackendConfig {
            app_id: "test_app".to_string(),
            persistence_dir: PathBuf::from("/tmp/automerge_test"),
            shared_key: Some(crate::security::FormationKey::generate_secret()),
            transport: TransportConfig::default(),
            extra: HashMap::new(),
        }
    }

    /// A backend that has not been initialized must not report ready.
    #[tokio::test]
    async fn test_automerge_backend_creation() {
        let backend = AutomergeBackend::new();
        let ready = backend.is_ready().await;
        assert!(!ready);
    }

    /// Upserting a document returns a non-empty document id.
    #[tokio::test]
    async fn test_document_upsert() {
        let backend = AutomergeBackend::new();
        backend.initialize(test_config()).await.unwrap();

        let doc = Document::new(HashMap::from([
            ("name".to_string(), serde_json::json!("test")),
            ("value".to_string(), serde_json::json!(42)),
        ]));

        let doc_id = backend
            .document_store()
            .upsert("test_collection", doc)
            .await
            .unwrap();

        assert!(!doc_id.is_empty());
    }

    /// An equality query finds the single document that was inserted.
    #[tokio::test]
    async fn test_document_query() {
        let backend = AutomergeBackend::new();
        backend.initialize(test_config()).await.unwrap();

        // Seed the collection with one document
        let doc = Document::new(HashMap::from([(
            "status".to_string(),
            serde_json::json!("active"),
        )]));
        backend
            .document_store()
            .upsert("test_collection", doc)
            .await
            .unwrap();

        // Look it up by field value
        let by_status = Query::Eq {
            field: "status".to_string(),
            value: serde_json::json!("active"),
        };
        let matches = backend
            .document_store()
            .query("test_collection", &by_status)
            .await
            .unwrap();

        assert_eq!(matches.len(), 1);
    }

    /// A stored document can be fetched by id with its fields intact.
    #[tokio::test]
    async fn test_document_get() {
        let backend = AutomergeBackend::new();
        backend.initialize(test_config()).await.unwrap();

        // Store a document and remember its id
        let doc = Document::new(HashMap::from([(
            "data".to_string(),
            serde_json::json!("test_value"),
        )]));
        let doc_id = backend
            .document_store()
            .upsert("test_coll", doc)
            .await
            .unwrap();

        // Fetch it back
        let fetched = backend
            .document_store()
            .get("test_coll", &doc_id)
            .await
            .unwrap();

        assert!(fetched.is_some());
        let fetched_doc = fetched.unwrap();
        assert_eq!(
            fetched_doc.fields.get("data").unwrap(),
            &serde_json::json!("test_value")
        );
    }

    /// After `remove`, the document can no longer be fetched.
    #[tokio::test]
    async fn test_document_remove() {
        let backend = AutomergeBackend::new();
        backend.initialize(test_config()).await.unwrap();

        // Store a document and remember its id
        let doc = Document::new(HashMap::from([(
            "temp".to_string(),
            serde_json::json!(true),
        )]));
        let doc_id = backend
            .document_store()
            .upsert("temp_coll", doc)
            .await
            .unwrap();

        // Remove it again
        backend
            .document_store()
            .remove("temp_coll", &doc_id)
            .await
            .unwrap();

        // It must be gone
        let after_removal = backend
            .document_store()
            .get("temp_coll", &doc_id)
            .await
            .unwrap();

        assert!(after_removal.is_none());
    }
}
4110
/// Tests for AutomergeIrohBackend credential requirements
#[cfg(all(test, feature = "automerge-backend"))]
mod iroh_credential_tests {
    use super::*;
    use std::collections::HashMap;
    use std::path::PathBuf;

    /// Initialization must fail when the config carries no shared_key
    #[tokio::test]
    async fn test_automerge_iroh_requires_credentials() {
        // Assemble the backend from a temporary store and a fresh transport
        let scratch = tempfile::tempdir().unwrap();
        let store = Arc::new(crate::storage::AutomergeStore::open(scratch.path()).unwrap());
        let transport = Arc::new(crate::network::IrohTransport::new().await.unwrap());
        let backend = AutomergeIrohBackend::from_parts(store, transport);

        // No shared_key in this config
        let config = BackendConfig {
            app_id: "test_app".to_string(),
            persistence_dir: PathBuf::from("/tmp/test"),
            shared_key: None,
            transport: TransportConfig::default(),
            extra: HashMap::new(),
        };

        let outcome = backend.initialize(config).await;
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(
            message.contains("PEAT_SECRET_KEY") || message.contains("shared_key"),
            "Error should mention missing credentials: {}",
            message
        );
    }

    /// Initialization succeeds with a generated secret and stores the formation key
    #[tokio::test]
    async fn test_automerge_iroh_with_valid_credentials() {
        // Assemble the backend from a temporary store and a fresh transport
        let scratch = tempfile::tempdir().unwrap();
        let store = Arc::new(crate::storage::AutomergeStore::open(scratch.path()).unwrap());
        let transport = Arc::new(crate::network::IrohTransport::new().await.unwrap());
        let backend = AutomergeIrohBackend::from_parts(store, transport);

        // Config with a freshly generated secret
        let secret = crate::security::FormationKey::generate_secret();
        let config = BackendConfig {
            app_id: "test_formation".to_string(),
            persistence_dir: scratch.path().to_path_buf(),
            shared_key: Some(secret),
            transport: TransportConfig::default(),
            extra: HashMap::new(),
        };

        let outcome = backend.initialize(config).await;
        assert!(outcome.is_ok(), "Should initialize with valid credentials");

        // The formation key must now be present and named after app_id
        let stored_key = backend.formation_key();
        assert!(
            stored_key.is_some(),
            "Formation key should be set after initialization"
        );
        assert_eq!(
            stored_key.unwrap().formation_id(),
            "test_formation",
            "Formation ID should match app_id"
        );
    }

    /// A shared_key that is not valid base64 must be rejected
    #[tokio::test]
    async fn test_automerge_iroh_rejects_invalid_key_format() {
        // Assemble the backend from a temporary store and a fresh transport
        let scratch = tempfile::tempdir().unwrap();
        let store = Arc::new(crate::storage::AutomergeStore::open(scratch.path()).unwrap());
        let transport = Arc::new(crate::network::IrohTransport::new().await.unwrap());
        let backend = AutomergeIrohBackend::from_parts(store, transport);

        // Deliberately malformed key
        let config = BackendConfig {
            app_id: "test_app".to_string(),
            persistence_dir: PathBuf::from("/tmp/test"),
            shared_key: Some("not-valid-base64!!!".to_string()),
            transport: TransportConfig::default(),
            extra: HashMap::new(),
        };

        let outcome = backend.initialize(config).await;
        assert!(outcome.is_err(), "Should reject invalid base64 key");

        let message = outcome.unwrap_err().to_string();
        assert!(
            message.contains("Invalid shared_key format"),
            "Error should mention invalid format: {}",
            message
        );
    }
}
4216
4217/// Tests for Issue #271: Verify Clone correctly shares transport instance
4218#[cfg(all(test, feature = "automerge-backend"))]
4219mod issue_271_clone_tests {
4220    use super::*;
4221
4222    /// Test that cloning AutomergeIrohBackend shares the same transport Arc
4223    ///
4224    /// Issue #271: When cloning AutomergeIrohBackend, the transport should be
4225    /// shared (same Arc pointer), not duplicated. This ensures connections
4226    /// accumulate correctly across all references to the backend.
4227    #[tokio::test]
4228    async fn test_clone_shares_transport_arc() {
4229        // Create backend components
4230        let temp_dir = tempfile::tempdir().unwrap();
4231        let store = Arc::new(crate::storage::AutomergeStore::open(temp_dir.path()).unwrap());
4232        let transport = Arc::new(crate::network::IrohTransport::new().await.unwrap());
4233
4234        let original = AutomergeIrohBackend::from_parts(store, Arc::clone(&transport));
4235        let cloned = original.clone();
4236
4237        // Verify transport Arc is shared (same pointer)
4238        let original_transport_ptr = Arc::as_ptr(&original.transport());
4239        let cloned_transport_ptr = Arc::as_ptr(&cloned.transport());
4240
4241        assert_eq!(
4242            original_transport_ptr, cloned_transport_ptr,
4243            "Clone should share the same transport Arc, but got different pointers:\n  Original: {:?}\n  Clone: {:?}",
4244            original_transport_ptr, cloned_transport_ptr
4245        );
4246
4247        // Verify both point to the same transport as the original Arc
4248        let source_transport_ptr = Arc::as_ptr(&transport);
4249        assert_eq!(
4250            original_transport_ptr, source_transport_ptr,
4251            "Original backend transport should be same as source transport Arc"
4252        );
4253    }
4254
4255    /// Test that cloning AutomergeIrohBackend shares the same backend Arc
4256    #[tokio::test]
4257    async fn test_clone_shares_backend_arc() {
4258        let temp_dir = tempfile::tempdir().unwrap();
4259        let store = Arc::new(crate::storage::AutomergeStore::open(temp_dir.path()).unwrap());
4260        let transport = Arc::new(crate::network::IrohTransport::new().await.unwrap());
4261
4262        let original = AutomergeIrohBackend::from_parts(store, transport);
4263        let cloned = original.clone();
4264
4265        // Verify backend Arc is shared (same pointer)
4266        // We need to access the internal backend field - using a helper method
4267        // Since backend is private, we verify via behavior: both should see same endpoint_id
4268        assert_eq!(
4269            original.endpoint_id(),
4270            cloned.endpoint_id(),
4271            "Clone should have same endpoint_id as original"
4272        );
4273    }
4274
4275    /// Test that transport peer_count is consistent across clone
4276    ///
4277    /// This verifies that if connections are managed via one reference,
4278    /// they are visible via the clone (because they share the same transport).
4279    #[tokio::test]
4280    async fn test_clone_shares_peer_count() {
4281        let temp_dir = tempfile::tempdir().unwrap();
4282        let store = Arc::new(crate::storage::AutomergeStore::open(temp_dir.path()).unwrap());
4283        let transport = Arc::new(crate::network::IrohTransport::new().await.unwrap());
4284
4285        let original = AutomergeIrohBackend::from_parts(store, Arc::clone(&transport));
4286        let cloned = original.clone();
4287
4288        // Both should report the same peer count (0 initially)
4289        let original_count = original.transport().peer_count();
4290        let cloned_count = cloned.transport().peer_count();
4291
4292        assert_eq!(
4293            original_count, cloned_count,
4294            "Original and clone should report same peer_count"
4295        );
4296        assert_eq!(original_count, 0, "Initial peer count should be 0");
4297
4298        // Verify via source transport as well
4299        assert_eq!(
4300            transport.peer_count(),
4301            original_count,
4302            "Source transport should have same count"
4303        );
4304    }
4305
4306    /// Test that formation_key is shared across clone
4307    #[tokio::test]
4308    async fn test_clone_shares_formation_key() {
4309        let temp_dir = tempfile::tempdir().unwrap();
4310        let store = Arc::new(crate::storage::AutomergeStore::open(temp_dir.path()).unwrap());
4311        let transport = Arc::new(crate::network::IrohTransport::new().await.unwrap());
4312
4313        let original = AutomergeIrohBackend::from_parts(store, transport);
4314
4315        // Initialize with credentials
4316        let test_secret = crate::security::FormationKey::generate_secret();
4317        let config = BackendConfig {
4318            app_id: "test_formation".to_string(),
4319            persistence_dir: temp_dir.path().to_path_buf(),
4320            shared_key: Some(test_secret),
4321            transport: TransportConfig::default(),
4322            extra: std::collections::HashMap::new(),
4323        };
4324        original.initialize(config).await.unwrap();
4325
4326        // Clone after initialization
4327        let cloned = original.clone();
4328
4329        // Both should see the formation key
4330        let original_key = original.formation_key();
4331        let cloned_key = cloned.formation_key();
4332
4333        assert!(original_key.is_some(), "Original should have formation key");
4334        assert!(cloned_key.is_some(), "Clone should have formation key");
4335        assert_eq!(
4336            original_key.as_ref().map(|k| k.formation_id()),
4337            cloned_key.as_ref().map(|k| k.formation_id()),
4338            "Clone should share same formation key"
4339        );
4340    }
4341
4342    /// Test that initialized state is shared across clone
4343    #[tokio::test]
4344    async fn test_clone_shares_initialized_state() {
4345        let temp_dir = tempfile::tempdir().unwrap();
4346        let store = Arc::new(crate::storage::AutomergeStore::open(temp_dir.path()).unwrap());
4347        let transport = Arc::new(crate::network::IrohTransport::new().await.unwrap());
4348
4349        let original = AutomergeIrohBackend::from_parts(store, transport);
4350
4351        // Before initialization
4352        let cloned_before = original.clone();
4353        assert!(
4354            !original.is_ready().await,
4355            "Original should not be ready before init"
4356        );
4357        assert!(
4358            !cloned_before.is_ready().await,
4359            "Clone should not be ready before init"
4360        );
4361
4362        // Initialize original
4363        let test_secret = crate::security::FormationKey::generate_secret();
4364        let config = BackendConfig {
4365            app_id: "test_formation".to_string(),
4366            persistence_dir: temp_dir.path().to_path_buf(),
4367            shared_key: Some(test_secret),
4368            transport: TransportConfig::default(),
4369            extra: std::collections::HashMap::new(),
4370        };
4371        original.initialize(config).await.unwrap();
4372
4373        // Clone created before init should NOW see it as ready
4374        // (because initialized flag is in shared Arc<Mutex<bool>>)
4375        assert!(
4376            original.is_ready().await,
4377            "Original should be ready after init"
4378        );
4379        assert!(
4380            cloned_before.is_ready().await,
4381            "Clone (created before init) should also be ready, proving Arc is shared"
4382        );
4383    }
4384
4385    // === Deletion Tests (ADR-034) ===
4386
4387    fn deletion_test_config() -> BackendConfig {
4388        let test_secret = crate::security::FormationKey::generate_secret();
4389        BackendConfig {
4390            app_id: "deletion_test".to_string(),
4391            persistence_dir: std::path::PathBuf::from("/tmp/deletion_test"),
4392            shared_key: Some(test_secret),
4393            transport: TransportConfig::default(),
4394            extra: HashMap::new(),
4395        }
4396    }
4397
4398    #[tokio::test]
4399    async fn test_soft_delete() {
4400        let backend = AutomergeBackend::new();
4401        backend.initialize(deletion_test_config()).await.unwrap();
4402
4403        // Insert document
4404        let mut fields = HashMap::new();
4405        fields.insert("data".to_string(), serde_json::json!("test_value"));
4406        let doc = Document::new(fields);
4407        let doc_id = backend
4408            .document_store()
4409            .upsert("test_collection", doc)
4410            .await
4411            .unwrap();
4412
4413        // Verify document exists
4414        let retrieved = backend
4415            .document_store()
4416            .get("test_collection", &doc_id)
4417            .await
4418            .unwrap();
4419        assert!(retrieved.is_some());
4420        assert!(!backend
4421            .document_store()
4422            .is_deleted("test_collection", &doc_id)
4423            .await
4424            .unwrap());
4425
4426        // Delete (default policy is SoftDelete)
4427        let result = backend
4428            .document_store()
4429            .delete("test_collection", &doc_id, Some("test deletion"))
4430            .await
4431            .unwrap();
4432        assert!(result.deleted);
4433
4434        // Document should now be marked as deleted
4435        assert!(backend
4436            .document_store()
4437            .is_deleted("test_collection", &doc_id)
4438            .await
4439            .unwrap());
4440
4441        // Document should still exist (soft delete preserves it)
4442        let deleted_doc = backend
4443            .document_store()
4444            .get("test_collection", &doc_id)
4445            .await
4446            .unwrap();
4447        assert!(deleted_doc.is_some());
4448        let deleted_doc = deleted_doc.unwrap();
4449        assert_eq!(
4450            deleted_doc.fields.get("_deleted"),
4451            Some(&serde_json::json!(true))
4452        );
4453        assert!(deleted_doc.fields.contains_key("_deleted_at"));
4454        assert_eq!(
4455            deleted_doc.fields.get("_deleted_reason"),
4456            Some(&serde_json::json!("test deletion"))
4457        );
4458    }
4459
4460    #[tokio::test]
4461    async fn test_tombstone_delete() {
4462        let backend = AutomergeBackend::new();
4463        backend.initialize(deletion_test_config()).await.unwrap();
4464
4465        // Configure tombstone policy for this collection
4466        backend.deletion_policy_registry.set(
4467            "tombstone_collection",
4468            crate::qos::DeletionPolicy::Tombstone {
4469                tombstone_ttl: std::time::Duration::from_secs(3600),
4470                delete_wins: true,
4471            },
4472        );
4473
4474        // Insert document
4475        let mut fields = HashMap::new();
4476        fields.insert("data".to_string(), serde_json::json!("tombstone_test"));
4477        let doc = Document::new(fields);
4478        let doc_id = backend
4479            .document_store()
4480            .upsert("tombstone_collection", doc)
4481            .await
4482            .unwrap();
4483
4484        // Delete with tombstone policy
4485        let result = backend
4486            .document_store()
4487            .delete("tombstone_collection", &doc_id, Some("removed"))
4488            .await
4489            .unwrap();
4490        assert!(result.deleted);
4491        assert!(result.tombstone_id.is_some());
4492        assert!(result.expires_at.is_some());
4493
4494        // Document should be deleted
4495        assert!(backend
4496            .document_store()
4497            .is_deleted("tombstone_collection", &doc_id)
4498            .await
4499            .unwrap());
4500
4501        // Document should be removed (not just marked)
4502        let removed_doc = backend
4503            .document_store()
4504            .get("tombstone_collection", &doc_id)
4505            .await
4506            .unwrap();
4507        assert!(removed_doc.is_none());
4508
4509        // Tombstone should exist with real node ID and lamport (Issue #668)
4510        let tombstones = backend
4511            .document_store()
4512            .get_tombstones("tombstone_collection")
4513            .await
4514            .unwrap();
4515        assert_eq!(tombstones.len(), 1);
4516        assert_eq!(tombstones[0].document_id, doc_id);
4517        assert_eq!(tombstones[0].reason, Some("removed".to_string()));
4518        // After initialize(), node_id should be app_id from config
4519        assert_eq!(tombstones[0].deleted_by, "deletion_test");
4520        // First delete should get lamport=0
4521        assert_eq!(tombstones[0].lamport, 0);
4522    }
4523
4524    #[tokio::test]
4525    async fn test_deletion_policy() {
4526        let backend = AutomergeBackend::new();
4527
4528        // Default policy is SoftDelete
4529        let policy = backend
4530            .document_store()
4531            .deletion_policy("unknown_collection");
4532        assert!(matches!(
4533            policy,
4534            crate::qos::DeletionPolicy::SoftDelete { .. }
4535        ));
4536
4537        // Verify default policies for known collections
4538        assert!(matches!(
4539            backend.document_store().deletion_policy("beacons"),
4540            crate::qos::DeletionPolicy::ImplicitTTL { .. }
4541        ));
4542        assert!(matches!(
4543            backend.document_store().deletion_policy("nodes"),
4544            crate::qos::DeletionPolicy::Tombstone { .. }
4545        ));
4546        assert!(matches!(
4547            backend.document_store().deletion_policy("contact_reports"),
4548            crate::qos::DeletionPolicy::SoftDelete { .. }
4549        ));
4550    }
4551
4552    #[tokio::test]
4553    async fn test_apply_tombstone() {
4554        let backend = AutomergeBackend::new();
4555        backend.initialize(deletion_test_config()).await.unwrap();
4556
4557        // Insert document
4558        let mut fields = HashMap::new();
4559        fields.insert("data".to_string(), serde_json::json!("to_be_deleted"));
4560        let doc = Document::new(fields);
4561        let doc_id = backend
4562            .document_store()
4563            .upsert("sync_test", doc)
4564            .await
4565            .unwrap();
4566
4567        // Create a tombstone (simulating receiving from sync)
4568        let tombstone = crate::qos::Tombstone::with_reason(
4569            doc_id.clone(),
4570            "sync_test".to_string(),
4571            "remote_node".to_string(),
4572            1, // Lamport timestamp
4573            "synced deletion",
4574        );
4575
4576        // Apply tombstone
4577        backend
4578            .document_store()
4579            .apply_tombstone(&tombstone)
4580            .await
4581            .unwrap();
4582
4583        // Document should be deleted
4584        assert!(backend
4585            .document_store()
4586            .is_deleted("sync_test", &doc_id)
4587            .await
4588            .unwrap());
4589
4590        // Document should be removed
4591        let removed_doc = backend
4592            .document_store()
4593            .get("sync_test", &doc_id)
4594            .await
4595            .unwrap();
4596        assert!(removed_doc.is_none());
4597    }
4598
4599    // === Issue #668: Lamport, Node ID, Pending Tombstones, GC Tests ===
4600
4601    #[tokio::test]
4602    async fn test_delete_increments_lamport() {
4603        let backend = AutomergeBackend::new();
4604        backend.initialize(deletion_test_config()).await.unwrap();
4605        backend.deletion_policy_registry.set(
4606            "nodes",
4607            crate::qos::DeletionPolicy::Tombstone {
4608                tombstone_ttl: std::time::Duration::from_secs(3600),
4609                delete_wins: true,
4610            },
4611        );
4612
4613        // Insert two documents
4614        let doc1 = Document::new(HashMap::from([("x".to_string(), serde_json::json!(1))]));
4615        let doc2 = Document::new(HashMap::from([("x".to_string(), serde_json::json!(2))]));
4616        let id1 = backend
4617            .document_store()
4618            .upsert("nodes", doc1)
4619            .await
4620            .unwrap();
4621        let id2 = backend
4622            .document_store()
4623            .upsert("nodes", doc2)
4624            .await
4625            .unwrap();
4626
4627        // Delete both
4628        backend
4629            .document_store()
4630            .delete("nodes", &id1, None)
4631            .await
4632            .unwrap();
4633        backend
4634            .document_store()
4635            .delete("nodes", &id2, None)
4636            .await
4637            .unwrap();
4638
4639        // Verify monotonic lamport
4640        let tombstones = backend
4641            .document_store()
4642            .get_tombstones("nodes")
4643            .await
4644            .unwrap();
4645        assert_eq!(tombstones.len(), 2);
4646        let lamports: Vec<u64> = tombstones.iter().map(|t| t.lamport).collect();
4647        assert!(
4648            lamports.contains(&0) && lamports.contains(&1),
4649            "Expected lamport 0 and 1, got {:?}",
4650            lamports
4651        );
4652    }
4653
4654    #[tokio::test]
4655    async fn test_delete_uses_config_node_id() {
4656        let backend = AutomergeBackend::new();
4657
4658        // Before initialization, node_id falls back to "local"
4659        backend.deletion_policy_registry.set(
4660            "test_coll",
4661            crate::qos::DeletionPolicy::Tombstone {
4662                tombstone_ttl: std::time::Duration::from_secs(3600),
4663                delete_wins: true,
4664            },
4665        );
4666        let doc = Document::new(HashMap::from([("x".to_string(), serde_json::json!(1))]));
4667        let id = backend
4668            .document_store()
4669            .upsert("test_coll", doc)
4670            .await
4671            .unwrap();
4672        backend
4673            .document_store()
4674            .delete("test_coll", &id, None)
4675            .await
4676            .unwrap();
4677        let tombstones = backend
4678            .document_store()
4679            .get_tombstones("test_coll")
4680            .await
4681            .unwrap();
4682        assert_eq!(tombstones[0].deleted_by, "local");
4683
4684        // After initialization, node_id should be app_id
4685        let backend2 = AutomergeBackend::new();
4686        backend2.initialize(deletion_test_config()).await.unwrap();
4687        backend2.deletion_policy_registry.set(
4688            "test_coll",
4689            crate::qos::DeletionPolicy::Tombstone {
4690                tombstone_ttl: std::time::Duration::from_secs(3600),
4691                delete_wins: true,
4692            },
4693        );
4694        let doc = Document::new(HashMap::from([("x".to_string(), serde_json::json!(2))]));
4695        let id2 = backend2
4696            .document_store()
4697            .upsert("test_coll", doc)
4698            .await
4699            .unwrap();
4700        backend2
4701            .document_store()
4702            .delete("test_coll", &id2, None)
4703            .await
4704            .unwrap();
4705        let tombstones2 = backend2
4706            .document_store()
4707            .get_tombstones("test_coll")
4708            .await
4709            .unwrap();
4710        assert_eq!(tombstones2[0].deleted_by, "deletion_test");
4711    }
4712
4713    #[tokio::test]
4714    async fn test_drain_pending_tombstones() {
4715        let backend = AutomergeBackend::new();
4716        backend.initialize(deletion_test_config()).await.unwrap();
4717        backend.deletion_policy_registry.set(
4718            "nodes",
4719            crate::qos::DeletionPolicy::Tombstone {
4720                tombstone_ttl: std::time::Duration::from_secs(3600),
4721                delete_wins: true,
4722            },
4723        );
4724
4725        // Initially empty
4726        assert!(backend.drain_pending_tombstones().is_empty());
4727
4728        // Insert and delete
4729        let doc = Document::new(HashMap::from([("x".to_string(), serde_json::json!(1))]));
4730        let id = backend.document_store().upsert("nodes", doc).await.unwrap();
4731        backend
4732            .document_store()
4733            .delete("nodes", &id, Some("test"))
4734            .await
4735            .unwrap();
4736
4737        // Should have one pending tombstone
4738        let pending = backend.drain_pending_tombstones();
4739        assert_eq!(pending.len(), 1);
4740        assert_eq!(pending[0].tombstone.document_id, id);
4741        assert_eq!(pending[0].tombstone.collection, "nodes");
4742
4743        // After drain, should be empty again
4744        assert!(backend.drain_pending_tombstones().is_empty());
4745    }
4746
4747    #[tokio::test]
4748    async fn test_gc_starts_on_init() {
4749        let backend = AutomergeBackend::new();
4750        // Before init, no GC handle
4751        assert!(backend.gc_handle.lock().unwrap().is_none());
4752
4753        backend.initialize(deletion_test_config()).await.unwrap();
4754        // After init, GC handle should exist
4755        assert!(backend.gc_handle.lock().unwrap().is_some());
4756
4757        // Shutdown should clear it
4758        backend.shutdown().await.unwrap();
4759        assert!(backend.gc_handle.lock().unwrap().is_none());
4760    }
4761
4762    #[tokio::test]
4763    async fn test_gc_store_impl() {
4764        use crate::qos::GcStore;
4765
4766        let backend = AutomergeBackend::new();
4767        backend.initialize(deletion_test_config()).await.unwrap();
4768        backend.deletion_policy_registry.set(
4769            "nodes",
4770            crate::qos::DeletionPolicy::Tombstone {
4771                tombstone_ttl: std::time::Duration::from_secs(3600),
4772                delete_wins: true,
4773            },
4774        );
4775
4776        // Insert, delete, create tombstone
4777        let doc = Document::new(HashMap::from([("x".to_string(), serde_json::json!(1))]));
4778        let id = backend.document_store().upsert("nodes", doc).await.unwrap();
4779        backend
4780            .document_store()
4781            .delete("nodes", &id, None)
4782            .await
4783            .unwrap();
4784
4785        // GcStore methods should work
4786        assert!(backend.has_tombstone("nodes", &id).unwrap());
4787        assert_eq!(backend.get_all_tombstones().unwrap().len(), 1);
4788
4789        // Remove tombstone
4790        assert!(backend.remove_tombstone("nodes", &id).unwrap());
4791        assert!(!backend.has_tombstone("nodes", &id).unwrap());
4792        assert_eq!(backend.get_all_tombstones().unwrap().len(), 0);
4793
4794        // list_collections
4795        let collections = backend.list_collections().unwrap();
4796        // May be empty since we deleted the only doc; that's OK
4797        assert!(collections.is_empty() || collections.contains(&"nodes".to_string()));
4798    }
4799
4800    // ============================================================================
4801    // Issue #517: Query::Custom Parser Tests
4802    // ============================================================================
4803
4804    /// Helper: Create a test document with given fields
4805    fn create_test_doc(fields: Vec<(&str, serde_json::Value)>) -> Document {
4806        let mut field_map = HashMap::new();
4807        for (key, value) in fields {
4808            field_map.insert(key.to_string(), value);
4809        }
4810        Document::new(field_map)
4811    }
4812
4813    #[test]
4814    fn test_custom_query_equality_string() {
4815        // Test: collection_name == 'squad_summaries'
4816        let doc = create_test_doc(vec![(
4817            "collection_name",
4818            serde_json::json!("squad_summaries"),
4819        )]);
4820
4821        assert!(evaluate_custom_query(
4822            &doc,
4823            "collection_name == 'squad_summaries'"
4824        ));
4825        assert!(!evaluate_custom_query(&doc, "collection_name == 'other'"));
4826    }
4827
4828    #[test]
4829    fn test_custom_query_equality_boolean() {
4830        // Test: public == true / public == false
4831        let doc_public = create_test_doc(vec![("public", serde_json::json!(true))]);
4832        let doc_private = create_test_doc(vec![("public", serde_json::json!(false))]);
4833
4834        assert!(evaluate_custom_query(&doc_public, "public == true"));
4835        assert!(!evaluate_custom_query(&doc_public, "public == false"));
4836        assert!(evaluate_custom_query(&doc_private, "public == false"));
4837        assert!(!evaluate_custom_query(&doc_private, "public == true"));
4838    }
4839
4840    #[test]
4841    fn test_custom_query_starts_with() {
4842        // Test: collection_name STARTS WITH 'squad-'
4843        let doc = create_test_doc(vec![("collection_name", serde_json::json!("squad-alpha"))]);
4844
4845        assert!(evaluate_custom_query(
4846            &doc,
4847            "collection_name STARTS WITH 'squad-'"
4848        ));
4849        assert!(evaluate_custom_query(
4850            &doc,
4851            "collection_name starts with 'squad-'"
4852        )); // Case insensitive
4853        assert!(!evaluate_custom_query(
4854            &doc,
4855            "collection_name STARTS WITH 'platoon-'"
4856        ));
4857    }
4858
4859    #[test]
4860    fn test_custom_query_ends_with() {
4861        // Test: collection_name ENDS WITH '.summaries'
4862        let doc = create_test_doc(vec![(
4863            "collection_name",
4864            serde_json::json!("squad.summaries"),
4865        )]);
4866
4867        assert!(evaluate_custom_query(
4868            &doc,
4869            "collection_name ENDS WITH '.summaries'"
4870        ));
4871        assert!(evaluate_custom_query(
4872            &doc,
4873            "collection_name ends with '.summaries'"
4874        )); // Case insensitive
4875        assert!(!evaluate_custom_query(
4876            &doc,
4877            "collection_name ENDS WITH '.reports'"
4878        ));
4879    }
4880
4881    #[test]
4882    fn test_custom_query_contains_array() {
4883        // Test: CONTAINS(authorized_roles, 'soldier')
4884        let doc = create_test_doc(vec![(
4885            "authorized_roles",
4886            serde_json::json!(["soldier", "squad_leader"]),
4887        )]);
4888
4889        assert!(evaluate_custom_query(
4890            &doc,
4891            "CONTAINS(authorized_roles, 'soldier')"
4892        ));
4893        assert!(evaluate_custom_query(
4894            &doc,
4895            "CONTAINS(authorized_roles, 'squad_leader')"
4896        ));
4897        assert!(!evaluate_custom_query(
4898            &doc,
4899            "CONTAINS(authorized_roles, 'general')"
4900        ));
4901    }
4902
4903    #[test]
4904    fn test_custom_query_contains_string() {
4905        // Test: CONTAINS on string field (substring search)
4906        let doc = create_test_doc(vec![(
4907            "description",
4908            serde_json::json!("This is a squad summary document"),
4909        )]);
4910
4911        assert!(evaluate_custom_query(
4912            &doc,
4913            "CONTAINS(description, 'squad')"
4914        ));
4915        assert!(evaluate_custom_query(
4916            &doc,
4917            "CONTAINS(description, 'summary')"
4918        ));
4919        assert!(!evaluate_custom_query(
4920            &doc,
4921            "CONTAINS(description, 'platoon')"
4922        ));
4923    }
4924
4925    #[test]
4926    fn test_custom_query_or_compound() {
4927        // Test: type == 'node_state' OR type == 'squad_summary'
4928        let doc_node = create_test_doc(vec![("type", serde_json::json!("node_state"))]);
4929        let doc_squad = create_test_doc(vec![("type", serde_json::json!("squad_summary"))]);
4930        let doc_other = create_test_doc(vec![("type", serde_json::json!("other"))]);
4931
4932        let query = "type == 'node_state' OR type == 'squad_summary'";
4933        assert!(evaluate_custom_query(&doc_node, query));
4934        assert!(evaluate_custom_query(&doc_squad, query));
4935        assert!(!evaluate_custom_query(&doc_other, query));
4936    }
4937
4938    #[test]
4939    fn test_custom_query_and_compound() {
4940        // Test: public == true AND type == 'node_state'
4941        let doc_match = create_test_doc(vec![
4942            ("public", serde_json::json!(true)),
4943            ("type", serde_json::json!("node_state")),
4944        ]);
4945        let doc_partial = create_test_doc(vec![
4946            ("public", serde_json::json!(true)),
4947            ("type", serde_json::json!("other")),
4948        ]);
4949
4950        let query = "public == true AND type == 'node_state'";
4951        assert!(evaluate_custom_query(&doc_match, query));
4952        assert!(!evaluate_custom_query(&doc_partial, query));
4953    }
4954
4955    #[test]
4956    fn test_custom_query_complex_compound() {
4957        // Test: (public == true OR CONTAINS(authorized_roles, 'soldier'))
4958        let doc_public = create_test_doc(vec![
4959            ("public", serde_json::json!(true)),
4960            ("authorized_roles", serde_json::json!([])),
4961        ]);
4962        let doc_soldier = create_test_doc(vec![
4963            ("public", serde_json::json!(false)),
4964            ("authorized_roles", serde_json::json!(["soldier"])),
4965        ]);
4966        let doc_neither = create_test_doc(vec![
4967            ("public", serde_json::json!(false)),
4968            ("authorized_roles", serde_json::json!(["general"])),
4969        ]);
4970
4971        let query = "public == true OR CONTAINS(authorized_roles, 'soldier')";
4972        assert!(evaluate_custom_query(&doc_public, query));
4973        assert!(evaluate_custom_query(&doc_soldier, query));
4974        assert!(!evaluate_custom_query(&doc_neither, query));
4975    }
4976
4977    #[test]
4978    fn test_custom_query_with_parentheses() {
4979        // Test: queries with parentheses are handled
4980        let doc = create_test_doc(vec![(
4981            "collection_name",
4982            serde_json::json!("squad_summaries"),
4983        )]);
4984
4985        assert!(evaluate_custom_query(
4986            &doc,
4987            "(collection_name == 'squad_summaries')"
4988        ));
4989    }
4990
4991    #[test]
4992    fn test_custom_query_unknown_pattern_returns_true() {
4993        // Test: Unknown patterns return true (conservative fallback)
4994        let doc = create_test_doc(vec![("field", serde_json::json!("value"))]);
4995
4996        // These are patterns we don't recognize - should return true
4997        assert!(evaluate_custom_query(&doc, "SOME_UNKNOWN_FUNCTION(x, y)"));
4998        assert!(evaluate_custom_query(&doc, "field BETWEEN 1 AND 10")); // BETWEEN not implemented
4999        assert!(evaluate_custom_query(&doc, "field REGEXP '^test'")); // REGEXP not implemented
5000    }
5001
5002    #[test]
5003    fn test_custom_query_matches_query_integration() {
5004        // Test that Query::Custom works through matches_query
5005        let doc = create_test_doc(vec![(
5006            "collection_name",
5007            serde_json::json!("squad_summaries"),
5008        )]);
5009
5010        let query = Query::Custom("collection_name == 'squad_summaries'".to_string());
5011        assert!(matches_query(&doc, &query));
5012
5013        let query_no_match = Query::Custom("collection_name == 'other'".to_string());
5014        assert!(!matches_query(&doc, &query_no_match));
5015    }
5016
5017    #[test]
5018    fn test_custom_query_real_world_patterns() {
5019        // Test actual patterns from the Peat codebase
5020
5021        // Pattern 1: collection_name == 'squad_summaries' (from peat-sim)
5022        let doc_summaries = create_test_doc(vec![(
5023            "collection_name",
5024            serde_json::json!("squad_summaries"),
5025        )]);
5026        assert!(evaluate_custom_query(
5027            &doc_summaries,
5028            "collection_name == 'squad_summaries'"
5029        ));
5030
5031        // Pattern 2: collection_name STARTS WITH 'squad-1' OR type == 'node_state'
5032        let doc_squad = create_test_doc(vec![
5033            ("collection_name", serde_json::json!("squad-1-alpha")),
5034            ("type", serde_json::json!("other")),
5035        ]);
5036        let doc_node = create_test_doc(vec![
5037            ("collection_name", serde_json::json!("other")),
5038            ("type", serde_json::json!("node_state")),
5039        ]);
5040        let query = "collection_name STARTS WITH 'squad-1' OR type == 'node_state'";
5041        assert!(evaluate_custom_query(&doc_squad, query));
5042        assert!(evaluate_custom_query(&doc_node, query));
5043
5044        // Pattern 3: collection_name ENDS WITH '.summaries' OR type == 'squad_summary'
5045        let doc_suffix = create_test_doc(vec![
5046            ("collection_name", serde_json::json!("platoon.summaries")),
5047            ("type", serde_json::json!("other")),
5048        ]);
5049        let query2 = "collection_name ENDS WITH '.summaries' OR type == 'squad_summary'";
5050        assert!(evaluate_custom_query(&doc_suffix, query2));
5051
5052        // Pattern 4: public == true OR CONTAINS(authorized_roles, 'soldier')
5053        let doc_with_role = create_test_doc(vec![
5054            ("public", serde_json::json!(false)),
5055            ("authorized_roles", serde_json::json!(["soldier", "medic"])),
5056        ]);
5057        let query3 = "public == true OR CONTAINS(authorized_roles, 'soldier')";
5058        assert!(evaluate_custom_query(&doc_with_role, query3));
5059    }
5060
5061    // ============================================================================
5062    // Issue #520: Extended DQL patterns for full syntactic parity
5063    // ============================================================================
5064
5065    #[test]
5066    fn test_custom_query_inequality_string() {
5067        // Test: field != 'value'
5068        let doc = create_test_doc(vec![("status", serde_json::json!("active"))]);
5069
5070        assert!(evaluate_custom_query(&doc, "status != 'inactive'"));
5071        assert!(!evaluate_custom_query(&doc, "status != 'active'"));
5072    }
5073
5074    #[test]
5075    fn test_custom_query_inequality_boolean() {
5076        // Test: field != true/false
5077        let doc_active = create_test_doc(vec![("enabled", serde_json::json!(true))]);
5078        let doc_inactive = create_test_doc(vec![("enabled", serde_json::json!(false))]);
5079
5080        assert!(evaluate_custom_query(&doc_active, "enabled != false"));
5081        assert!(!evaluate_custom_query(&doc_active, "enabled != true"));
5082        assert!(evaluate_custom_query(&doc_inactive, "enabled != true"));
5083        assert!(!evaluate_custom_query(&doc_inactive, "enabled != false"));
5084    }
5085
5086    #[test]
5087    fn test_custom_query_inequality_numeric() {
5088        // Test: field != number
5089        let doc = create_test_doc(vec![("count", serde_json::json!(42))]);
5090
5091        assert!(evaluate_custom_query(&doc, "count != 0"));
5092        assert!(evaluate_custom_query(&doc, "count != 100"));
5093        assert!(!evaluate_custom_query(&doc, "count != 42"));
5094    }
5095
5096    #[test]
5097    fn test_custom_query_like_prefix() {
5098        // Test: field LIKE 'prefix%'
5099        let doc = create_test_doc(vec![("name", serde_json::json!("squad-alpha-1"))]);
5100
5101        assert!(evaluate_custom_query(&doc, "name LIKE 'squad%'"));
5102        assert!(evaluate_custom_query(&doc, "name like 'squad%'")); // Case insensitive
5103        assert!(!evaluate_custom_query(&doc, "name LIKE 'platoon%'"));
5104    }
5105
5106    #[test]
5107    fn test_custom_query_like_suffix() {
5108        // Test: field LIKE '%suffix'
5109        let doc = create_test_doc(vec![("filename", serde_json::json!("report.pdf"))]);
5110
5111        assert!(evaluate_custom_query(&doc, "filename LIKE '%.pdf'"));
5112        assert!(!evaluate_custom_query(&doc, "filename LIKE '%.doc'"));
5113    }
5114
5115    #[test]
5116    fn test_custom_query_like_contains() {
5117        // Test: field LIKE '%middle%'
5118        let doc = create_test_doc(vec![(
5119            "description",
5120            serde_json::json!("This is a tactical mission report"),
5121        )]);
5122
5123        assert!(evaluate_custom_query(&doc, "description LIKE '%tactical%'"));
5124        assert!(evaluate_custom_query(&doc, "description LIKE '%mission%'"));
5125        assert!(!evaluate_custom_query(
5126            &doc,
5127            "description LIKE '%strategic%'"
5128        ));
5129    }
5130
5131    #[test]
5132    fn test_custom_query_like_complex() {
5133        // Test: field LIKE 'prefix%middle%suffix'
5134        let doc = create_test_doc(vec![("path", serde_json::json!("squad-alpha-report.json"))]);
5135
5136        assert!(evaluate_custom_query(&doc, "path LIKE 'squad%report%'")); // prefix and middle
5137        assert!(evaluate_custom_query(&doc, "path LIKE '%alpha%json'")); // middle and suffix
5138        assert!(evaluate_custom_query(&doc, "path LIKE 'squad%.json'")); // prefix and suffix
5139    }
5140
5141    #[test]
5142    fn test_custom_query_in_strings() {
5143        // Test: field IN ['a', 'b', 'c']
5144        let doc = create_test_doc(vec![("role", serde_json::json!("soldier"))]);
5145
5146        assert!(evaluate_custom_query(
5147            &doc,
5148            "role IN ['soldier', 'medic', 'engineer']"
5149        ));
5150        assert!(!evaluate_custom_query(
5151            &doc,
5152            "role IN ['general', 'colonel']"
5153        ));
5154    }
5155
5156    #[test]
5157    fn test_custom_query_in_numbers() {
5158        // Test: field IN [1, 2, 3]
5159        let doc = create_test_doc(vec![("priority", serde_json::json!(2))]);
5160
5161        assert!(evaluate_custom_query(&doc, "priority IN [1, 2, 3]"));
5162        assert!(!evaluate_custom_query(&doc, "priority IN [4, 5, 6]"));
5163    }
5164
5165    #[test]
5166    fn test_custom_query_in_case_insensitive() {
5167        // Test: IN keyword is case insensitive
5168        let doc = create_test_doc(vec![("status", serde_json::json!("active"))]);
5169
5170        assert!(evaluate_custom_query(
5171            &doc,
5172            "status in ['active', 'pending']"
5173        ));
5174        assert!(evaluate_custom_query(
5175            &doc,
5176            "status IN ['active', 'pending']"
5177        ));
5178    }
5179
5180    #[test]
5181    fn test_custom_query_not_expression() {
5182        // Test: NOT (expr)
5183        let doc = create_test_doc(vec![("enabled", serde_json::json!(false))]);
5184
5185        assert!(evaluate_custom_query(&doc, "NOT (enabled == true)"));
5186        assert!(!evaluate_custom_query(&doc, "NOT (enabled == false)"));
5187    }
5188
5189    #[test]
5190    fn test_custom_query_not_without_parens() {
5191        // Test: NOT expr (without parentheses)
5192        let doc = create_test_doc(vec![("status", serde_json::json!("inactive"))]);
5193
5194        assert!(evaluate_custom_query(&doc, "NOT status == 'active'"));
5195        assert!(!evaluate_custom_query(&doc, "NOT status == 'inactive'"));
5196    }
5197
5198    #[test]
5199    fn test_custom_query_not_case_insensitive() {
5200        // Test: not is case insensitive
5201        let doc = create_test_doc(vec![("flag", serde_json::json!(true))]);
5202
5203        assert!(evaluate_custom_query(&doc, "not (flag == false)"));
5204        assert!(evaluate_custom_query(&doc, "NOT (flag == false)"));
5205    }
5206
5207    #[test]
5208    fn test_custom_query_is_null() {
5209        // Test: field IS NULL
5210        let doc_with_null = create_test_doc(vec![("optional", serde_json::Value::Null)]);
5211        let doc_without_field = create_test_doc(vec![("other", serde_json::json!("value"))]);
5212        let doc_with_value = create_test_doc(vec![("optional", serde_json::json!("present"))]);
5213
5214        assert!(evaluate_custom_query(&doc_with_null, "optional IS NULL"));
5215        assert!(evaluate_custom_query(
5216            &doc_without_field,
5217            "optional IS NULL"
5218        ));
5219        assert!(!evaluate_custom_query(&doc_with_value, "optional IS NULL"));
5220    }
5221
5222    #[test]
5223    fn test_custom_query_is_not_null() {
5224        // Test: field IS NOT NULL
5225        let doc_with_value = create_test_doc(vec![("required", serde_json::json!("value"))]);
5226        let doc_with_null = create_test_doc(vec![("required", serde_json::Value::Null)]);
5227        let doc_missing = create_test_doc(vec![("other", serde_json::json!("x"))]);
5228
5229        assert!(evaluate_custom_query(
5230            &doc_with_value,
5231            "required IS NOT NULL"
5232        ));
5233        assert!(!evaluate_custom_query(
5234            &doc_with_null,
5235            "required IS NOT NULL"
5236        ));
5237        assert!(!evaluate_custom_query(&doc_missing, "required IS NOT NULL"));
5238    }
5239
5240    #[test]
5241    fn test_custom_query_is_null_case_insensitive() {
5242        // Test: IS NULL is case insensitive
5243        let doc = create_test_doc(vec![("field", serde_json::Value::Null)]);
5244
5245        assert!(evaluate_custom_query(&doc, "field is null"));
5246        assert!(evaluate_custom_query(&doc, "field IS NULL"));
5247        assert!(evaluate_custom_query(&doc, "field Is Null"));
5248    }
5249
5250    #[test]
5251    fn test_custom_query_nested_field_equality() {
5252        // Test: nested.field == 'value'
5253        let doc = create_test_doc(vec![(
5254            "address",
5255            serde_json::json!({"city": "San Francisco", "state": "CA"}),
5256        )]);
5257
5258        assert!(evaluate_custom_query(
5259            &doc,
5260            "address.city == 'San Francisco'"
5261        ));
5262        assert!(evaluate_custom_query(&doc, "address.state == 'CA'"));
5263        assert!(!evaluate_custom_query(&doc, "address.city == 'New York'"));
5264    }
5265
5266    #[test]
5267    fn test_custom_query_nested_field_deep() {
5268        // Test: deeply nested field access
5269        let doc = create_test_doc(vec![(
5270            "data",
5271            serde_json::json!({"level1": {"level2": {"value": "deep"}}}),
5272        )]);
5273
5274        assert!(evaluate_custom_query(
5275            &doc,
5276            "data.level1.level2.value == 'deep'"
5277        ));
5278        assert!(!evaluate_custom_query(
5279            &doc,
5280            "data.level1.level2.value == 'shallow'"
5281        ));
5282    }
5283
5284    #[test]
5285    fn test_custom_query_nested_field_is_null() {
5286        // Test: nested.field IS NULL
5287        let doc = create_test_doc(vec![(
5288            "config",
5289            serde_json::json!({"enabled": true, "optional": null}),
5290        )]);
5291
5292        assert!(evaluate_custom_query(&doc, "config.optional IS NULL"));
5293        assert!(evaluate_custom_query(&doc, "config.missing IS NULL"));
5294        assert!(!evaluate_custom_query(&doc, "config.enabled IS NULL"));
5295    }
5296
5297    #[test]
5298    fn test_custom_query_nested_field_in() {
5299        // Test: nested.field IN [...]
5300        let doc = create_test_doc(vec![(
5301            "user",
5302            serde_json::json!({"role": "admin", "level": 5}),
5303        )]);
5304
5305        assert!(evaluate_custom_query(
5306            &doc,
5307            "user.role IN ['admin', 'superuser']"
5308        ));
5309        assert!(evaluate_custom_query(&doc, "user.level IN [1, 5, 10]"));
5310        assert!(!evaluate_custom_query(
5311            &doc,
5312            "user.role IN ['guest', 'user']"
5313        ));
5314    }
5315
5316    #[test]
5317    fn test_custom_query_compound_with_new_patterns() {
5318        // Test: combining new patterns with AND/OR
5319        let doc = create_test_doc(vec![
5320            ("status", serde_json::json!("active")),
5321            ("priority", serde_json::json!(1)),
5322            ("optional", serde_json::Value::Null),
5323        ]);
5324
5325        // status != 'deleted' AND priority IN [1, 2, 3]
5326        assert!(evaluate_custom_query(
5327            &doc,
5328            "status != 'deleted' AND priority IN [1, 2, 3]"
5329        ));
5330
5331        // optional IS NULL OR status LIKE 'act%'
5332        assert!(evaluate_custom_query(
5333            &doc,
5334            "optional IS NULL OR status LIKE 'act%'"
5335        ));
5336
5337        // NOT (status == 'inactive') AND priority != 0
5338        assert!(evaluate_custom_query(
5339            &doc,
5340            "NOT (status == 'inactive') AND priority != 0"
5341        ));
5342    }
5343
5344    #[test]
5345    fn test_match_like_pattern_unit() {
5346        // Unit tests for match_like_pattern helper function
5347        assert!(match_like_pattern("hello world", "%world"));
5348        assert!(match_like_pattern("hello world", "hello%"));
5349        assert!(match_like_pattern("hello world", "%lo wo%"));
5350        assert!(match_like_pattern("hello world", "%"));
5351        assert!(match_like_pattern("hello world", "%%"));
5352        assert!(match_like_pattern("hello world", "hello world"));
5353        assert!(!match_like_pattern("hello world", "goodbye%"));
5354        assert!(!match_like_pattern("hello world", "%goodbye"));
5355        assert!(!match_like_pattern("hello", "hello world"));
5356    }
5357
5358    // ============================================================================
5359    // Issue #518: Counter and Nested Object Tests
5360    // ============================================================================
5361
5362    #[test]
5363    fn test_automerge_scalar_counter_extraction() {
5364        // Test that Counter values are properly extracted
5365        use automerge::ScalarValue;
5366
5367        // Create a counter with value 42
5368        let counter = ScalarValue::counter(42);
5369
5370        // Extract it using our function
5371        if let ScalarValue::Counter(c) = &counter {
5372            let value: i64 = i64::from(c);
5373            assert_eq!(value, 42, "Counter value should be 42");
5374        } else {
5375            panic!("Expected Counter variant");
5376        }
5377    }
5378
5379    #[test]
5380    fn test_automerge_scalar_to_json_all_types() {
5381        // Test all scalar types convert correctly
5382        use automerge::ScalarValue;
5383
5384        // String
5385        let result = AutomergeBackend::automerge_scalar_to_json(&ScalarValue::Str("hello".into()));
5386        assert_eq!(result, Some(serde_json::json!("hello")));
5387
5388        // Integer
5389        let result = AutomergeBackend::automerge_scalar_to_json(&ScalarValue::Int(-42));
5390        assert_eq!(result, Some(serde_json::json!(-42)));
5391
5392        // Unsigned integer
5393        let result = AutomergeBackend::automerge_scalar_to_json(&ScalarValue::Uint(42));
5394        assert_eq!(result, Some(serde_json::json!(42)));
5395
5396        // Float (use arbitrary value to avoid clippy::approx_constant)
5397        let result = AutomergeBackend::automerge_scalar_to_json(&ScalarValue::F64(1.234));
5398        assert!(result.is_some());
5399        if let Some(serde_json::Value::Number(n)) = result {
5400            assert!((n.as_f64().unwrap() - 1.234).abs() < 0.001);
5401        }
5402
5403        // Boolean
5404        let result = AutomergeBackend::automerge_scalar_to_json(&ScalarValue::Boolean(true));
5405        assert_eq!(result, Some(serde_json::json!(true)));
5406
5407        // Null
5408        let result = AutomergeBackend::automerge_scalar_to_json(&ScalarValue::Null);
5409        assert_eq!(result, Some(serde_json::Value::Null));
5410
5411        // Timestamp
5412        let result =
5413            AutomergeBackend::automerge_scalar_to_json(&ScalarValue::Timestamp(1234567890));
5414        assert_eq!(result, Some(serde_json::json!(1234567890)));
5415
5416        // Counter (Issue #518)
5417        let counter = ScalarValue::counter(100);
5418        if let ScalarValue::Counter(c) = &counter {
5419            let result =
5420                AutomergeBackend::automerge_scalar_to_json(&ScalarValue::Counter(c.clone()));
5421            assert_eq!(
5422                result,
5423                Some(serde_json::json!(100)),
5424                "Counter should return actual value, not 0"
5425            );
5426        }
5427
5428        // Bytes
5429        let result = AutomergeBackend::automerge_scalar_to_json(&ScalarValue::Bytes(vec![1, 2, 3]));
5430        assert_eq!(result, Some(serde_json::json!([1, 2, 3])));
5431    }
5432
5433    #[tokio::test]
5434    async fn test_nested_object_roundtrip() {
5435        // Test that nested objects survive the roundtrip through Automerge
5436        let backend = AutomergeBackend::new();
5437        backend.initialize(deletion_test_config()).await.unwrap();
5438
5439        // Create a document with nested structure
5440        let nested_doc = Document::new(
5441            vec![
5442                ("name".to_string(), serde_json::json!("test")),
5443                (
5444                    "metadata".to_string(),
5445                    serde_json::json!({
5446                        "version": 1,
5447                        "author": "test_user"
5448                    }),
5449                ),
5450                ("items".to_string(), serde_json::json!([1, 2, 3])),
5451            ]
5452            .into_iter()
5453            .collect(),
5454        );
5455
5456        let doc_id = backend
5457            .document_store()
5458            .upsert("nested_test", nested_doc)
5459            .await
5460            .unwrap();
5461
5462        // Retrieve and verify
5463        let retrieved = backend
5464            .document_store()
5465            .get("nested_test", &doc_id)
5466            .await
5467            .unwrap()
5468            .expect("Document should exist");
5469
5470        assert_eq!(
5471            retrieved.fields.get("name"),
5472            Some(&serde_json::json!("test"))
5473        );
5474
5475        // Verify nested object (Issue #518)
5476        if let Some(metadata) = retrieved.fields.get("metadata") {
5477            assert!(metadata.is_object(), "metadata should be an object");
5478            if let Some(version) = metadata.get("version") {
5479                assert_eq!(version, &serde_json::json!(1));
5480            }
5481            if let Some(author) = metadata.get("author") {
5482                assert_eq!(author, &serde_json::json!("test_user"));
5483            }
5484        }
5485
5486        // Verify array
5487        if let Some(items) = retrieved.fields.get("items") {
5488            assert!(items.is_array(), "items should be an array");
5489            assert_eq!(items, &serde_json::json!([1, 2, 3]));
5490        }
5491    }
5492}