gun/
dam.rs

1//! DAM (Directed Acyclic Mesh) protocol implementation
2//!
3//! This module implements Gun's DAM protocol for peer-to-peer message routing with
4//! automatic deduplication and cryptographic signing. DAM ensures:
5//!
6//! - **No message loops**: Messages are deduplicated using message IDs
7//! - **Cryptographic security**: All messages are signed with BLS signatures
8//! - **Reliable delivery**: Automatic retry and queuing for offline peers
9//! - **Efficient routing**: Messages are routed through the peer mesh
10//!
11//! Based on Gun.js `mesh.js`. The DAM protocol is the foundation of Gun's
12//! decentralized networking.
13//!
14//! ## Message Format
15//!
16//! All DAM messages include:
17//! - `#`: Message ID (SHA256 hash of message without signatures)
18//! - `sigs`: Array of BLS signatures and public keys
19//! - Message payload (e.g., `put`, `get`, `dam`)
20//!
21//! ## Peer Management
22//!
23//! The `Mesh` struct manages all peer connections:
24//! - Tracks connected peers
25//! - Routes messages to peers
26//! - Handles message signing and verification
27//! - Manages peer public keys for verification
28
29use crate::core::GunCore;
30use crate::dup::Dup;
31use crate::error::GunResult;
32use crate::types::MessagePredicate;
33use chia_bls::{PublicKey, SecretKey, Signature, sign, verify};
34use serde_json::Value;
35use sha2::{Sha256, Digest};
36use std::collections::HashMap;
37use std::sync::Arc;
38use tokio::sync::{mpsc, RwLock};
39
40/// Represents a peer connection in the DAM mesh
41///
42/// Peers are identified by a URL and have associated connection state including
43/// message queues, batching information, and retry logic.
44///
45/// # Fields
46///
47/// - `id`: Unique identifier for this peer connection
48/// - `url`: WebSocket URL of the peer
49/// - `pid`: Peer ID in the DAM mesh (set after handshake)
50/// - `tx`: Message sender channel for WebSocket communication
51/// - `batch`, `tail`, `queue`: Message batching and queuing
52/// - `last`: Last message ID sent (for ordering)
53/// - `retry`, `tried`: Retry logic for connection attempts
54#[derive(Clone, Debug)]
55pub struct Peer {
56    pub id: String,
57    pub url: String,
58    pub pid: Option<String>,                       // peer ID for DAM
59    pub tx: Option<mpsc::UnboundedSender<String>>, // WebSocket message sender
60    pub batch: Option<String>,                     // batched messages
61    pub tail: usize,                               // batch size
62    pub queue: Vec<String>,                        // queued messages
63    pub last: Option<String>,                      // last message ID sent
64    pub retry: i32,
65    pub tried: Option<u64>, // timestamp
66}
67
68impl Peer {
69    pub fn new(url: String) -> Self {
70        use std::sync::atomic::{AtomicU64, Ordering};
71        static COUNTER: AtomicU64 = AtomicU64::new(0);
72        let id = COUNTER.fetch_add(1, Ordering::SeqCst);
73        Self {
74            id: format!("peer_{}", id),
75            url,
76            pid: None,
77            tx: None,
78            batch: None,
79            tail: 0,
80            queue: vec![],
81            last: None,
82            retry: 60,
83            tried: None,
84        }
85    }
86
87    /// Set the WebSocket message sender
88    pub fn set_sender(&mut self, tx: mpsc::UnboundedSender<String>) {
89        self.tx = Some(tx);
90    }
91
92    /// Send a message through the WebSocket connection
93    pub async fn send(&self, message: &str) -> GunResult<()> {
94        if let Some(ref tx) = self.tx {
95            tx.send(message.to_string()).map_err(|e| {
96                crate::error::GunError::Network(format!("Failed to send message: {}", e))
97            })?;
98        } else {
99            // Peer not connected - message will be dropped
100            // Note: In a full implementation, messages could be queued for later delivery
101            // Currently, Mesh.send_to_peer_by_id() will return an error if peer is not found
102            // This is acceptable behavior as peers will reconnect and sync state
103        }
104        Ok(())
105    }
106}
107
108/// DAM Mesh - handles message routing and peer communication
109///
110/// The mesh is the central coordinator for all peer-to-peer communication in Gun.
111/// It manages:
112///
113/// - Peer connections and routing
114/// - Message signing and verification (BLS signatures)
115/// - Message deduplication
116/// - Message broadcasting and targeted delivery
117/// - Peer public key management
118///
119/// Based on Gun.js `mesh.js`. The mesh is thread-safe and can be shared across
120/// threads using `Arc<Mesh>`.
121///
122/// # Example
123///
124/// ```rust,no_run
125/// use gun::dam::Mesh;
126/// use gun::core::GunCore;
127/// use chia_bls::{SecretKey, PublicKey};
128/// use std::sync::Arc;
129///
130/// let core = Arc::new(GunCore::new());
131/// let secret_key = SecretKey::from_seed(&[0u8; 32]);
132/// let public_key = secret_key.public_key();
133/// let mesh = Arc::new(Mesh::new(core, secret_key, public_key, None));
134/// ```
135pub struct Mesh {
136    pub dup: Arc<RwLock<Dup>>,
137    peers: Arc<RwLock<HashMap<String, Peer>>>,
138    core: Arc<GunCore>,
139    pub near: Arc<RwLock<usize>>, // number of connected peers
140    pub pid: String,              // our peer ID
141    opt: MeshOptions,
142    secret_key: SecretKey,        // BLS secret key for signing outgoing messages
143    public_key: PublicKey,        // BLS public key (our own, for reference)
144    peer_public_keys: Arc<RwLock<HashMap<String, PublicKey>>>, // Map peer_id -> public_key for verification
145    message_predicate: Option<MessagePredicate>, // Optional predicate for custom message filtering
146}
147
148/// Configuration options for the DAM mesh
149///
150/// These options control message batching, size limits, and retry behavior.
151#[derive(Clone, Debug)]
152pub struct MeshOptions {
153    pub max_message_size: usize, // default 300MB * 0.3
154    pub pack_size: usize,        // batch size
155    pub gap: u64,                // batching delay in ms
156    pub retry: i32,
157    pub lack: u64, // lack timeout
158}
159
160impl Default for MeshOptions {
161    fn default() -> Self {
162        Self {
163            max_message_size: (300_000_000.0 * 0.3) as usize,
164            pack_size: ((300_000_000.0 * 0.3 * 0.01 * 0.01) as usize),
165            gap: 0,
166            retry: 60,
167            lack: 9000,
168        }
169    }
170}
171
172impl Mesh {
173    pub fn new(core: Arc<GunCore>, secret_key: SecretKey, public_key: PublicKey, message_predicate: Option<MessagePredicate>) -> Self {
174        let pid = core.random_id(9);
175        Self {
176            dup: Arc::new(RwLock::new(Dup::new_default())),
177            peers: Arc::new(RwLock::new(HashMap::new())),
178            core,
179            near: Arc::new(RwLock::new(0)),
180            pid,
181            opt: MeshOptions::default(),
182            secret_key,
183            public_key,
184            peer_public_keys: Arc::new(RwLock::new(HashMap::new())),
185            message_predicate,
186        }
187    }
188
189    /// Handle incoming message (matches mesh.hear)
190    pub async fn hear(&self, raw: &str, peer: Option<&Peer>) -> GunResult<()> {
191        if raw.is_empty() {
192            return Ok(());
193        }
194
195        let peer_id = peer.map(|p| p.id.clone()).unwrap_or_else(|| "unknown".to_string());
196        eprintln!("DEBUG: mesh.hear() received message from peer {}: {}", peer_id, raw.chars().take(200).collect::<String>());
197
198        // Check message size
199        if raw.len() > self.opt.max_message_size {
200            if let Some(p) = peer {
201                self.say(
202                    &serde_json::json!({
203                        "dam": "!",
204                        "err": "Message too big!"
205                    }),
206                    Some(p),
207                )
208                .await?;
209            }
210            return Ok(());
211        }
212
213        // Handle batched messages (JSON array)
214        if raw.starts_with('[') {
215            let messages: Vec<Value> = serde_json::from_str(raw)?;
216            eprintln!("DEBUG: Processing {} batched messages from peer {}", messages.len(), peer_id);
217            for msg in messages {
218                self.hear_one(&msg, peer).await?;
219            }
220            return Ok(());
221        }
222
223        // Handle single message
224        let msg: Value = serde_json::from_str(raw)?;
225        self.hear_one(&msg, peer).await?;
226        Ok(())
227    }
228
229    /// Handle a single message (matches mesh.hear.one)
230    async fn hear_one(&self, msg: &Value, peer: Option<&Peer>) -> GunResult<()> {
231        // Get message ID (should be SHA256 hash of message without sigs)
232        let msg_id = msg
233            .get("#")
234            .and_then(|v| v.as_str())
235            .map(|s| s.to_string())
236            .ok_or_else(|| {
237                crate::error::GunError::Network("Message missing ID (#) field".to_string())
238            })?;
239
240        // Create message bytes for verification (without sigs field)
241        let mut msg_for_hash = msg.clone();
242        msg_for_hash.as_object_mut().unwrap().remove("sigs");
243        let msg_bytes = serde_json::to_vec(&msg_for_hash)?;
244        
245        // Verify that the message ID matches the SHA256 hash of the message (without sigs)
246        let mut hasher = Sha256::new();
247        hasher.update(&msg_bytes);
248        let computed_hash = hasher.finalize();
249        let computed_hash_hex = hex::encode(computed_hash);
250        
251        if msg_id != computed_hash_hex {
252            eprintln!("DEBUG: Message ID hash mismatch. Expected: {}, Got: {}", computed_hash_hex, msg_id);
253            return Ok(()); // Reject message with invalid hash
254        }
255
256        // Verify all signatures in the aggregate before processing
257        let sigs_array = if let Some(sigs) = msg.get("sigs").and_then(|v| v.as_array()) {
258            if sigs.is_empty() {
259                eprintln!("DEBUG: Message missing signatures from peer {:?}", peer.map(|p| &p.id));
260                return Ok(()); // Reject message without signatures
261            }
262            sigs
263        } else {
264            // Legacy format: try to read single sig/pubkey for backward compatibility
265            // But we'll still require sigs array going forward
266            eprintln!("DEBUG: Message missing sigs array from peer {:?}", peer.map(|p| &p.id));
267            return Ok(()); // Reject message without sigs array
268        };
269        
270        // msg_bytes already computed above for hash verification, reuse it for signature verification
271        
272        // Verify ALL signatures in the aggregate
273        let mut verified_pubkeys = Vec::new();
274        for sig_entry in sigs_array {
275            let sig_hex = match sig_entry.get("sig").and_then(|v| v.as_str()) {
276                Some(hex) => hex,
277                None => {
278                    eprintln!("DEBUG: Invalid signature entry: missing sig");
279                    return Ok(()); // Reject message with invalid signature entry
280                }
281            };
282            let pubkey_hex = match sig_entry.get("pubkey").and_then(|v| v.as_str()) {
283                Some(hex) => hex,
284                None => {
285                    eprintln!("DEBUG: Invalid signature entry: missing pubkey");
286                    return Ok(()); // Reject message with invalid signature entry
287                }
288            };
289            
290            // Decode signature and public key
291            let sig_bytes = match hex::decode(sig_hex) {
292                Ok(bytes) => bytes,
293                Err(e) => {
294                    eprintln!("DEBUG: Invalid signature hex: {}", e);
295                    return Ok(()); // Reject message with invalid signature hex
296                }
297            };
298            let pubkey_bytes = match hex::decode(pubkey_hex) {
299                Ok(bytes) => bytes,
300                Err(e) => {
301                    eprintln!("DEBUG: Invalid public key hex: {}", e);
302                    return Ok(()); // Reject message with invalid public key hex
303                }
304            };
305            
306            // Convert to fixed-size arrays
307            if sig_bytes.len() != 96 {
308                eprintln!("DEBUG: Invalid signature length: expected 96 bytes, got {}", sig_bytes.len());
309                return Ok(()); // Reject message with invalid signature
310            }
311            if pubkey_bytes.len() != 48 {
312                eprintln!("DEBUG: Invalid public key length: expected 48 bytes, got {}", pubkey_bytes.len());
313                return Ok(()); // Reject message with invalid public key
314            }
315            
316            let mut sig_array = [0u8; 96];
317            sig_array.copy_from_slice(&sig_bytes);
318            let mut pubkey_array = [0u8; 48];
319            pubkey_array.copy_from_slice(&pubkey_bytes);
320            
321            let signature = match Signature::from_bytes(&sig_array) {
322                Ok(sig) => sig,
323                Err(e) => {
324                    eprintln!("DEBUG: Invalid signature format: {}", e);
325                    return Ok(()); // Reject message with invalid signature
326                }
327            };
328            let sender_pubkey = match PublicKey::from_bytes(&pubkey_array) {
329                Ok(pk) => pk,
330                Err(e) => {
331                    eprintln!("DEBUG: Invalid public key format: {}", e);
332                    return Ok(()); // Reject message with invalid public key
333                }
334            };
335            
336            // Verify this signature
337            if !verify(&signature, &sender_pubkey, &msg_bytes) {
338                eprintln!("DEBUG: Signature verification failed for pubkey {} from peer {:?}", pubkey_hex, peer.map(|p| &p.id));
339                return Ok(()); // Reject message if any signature is invalid
340            }
341            
342            verified_pubkeys.push(sender_pubkey);
343        }
344        
345        // Store peer's public keys for future reference
346        if let Some(p) = peer {
347            let mut peer_keys = self.peer_public_keys.write().await;
348            for pubkey in &verified_pubkeys {
349                peer_keys.insert(format!("{}:{}", p.id, hex::encode(pubkey.to_bytes())), pubkey.clone());
350            }
351        }
352        
353        // Check if my signature is already in the aggregate
354        let my_pubkey_hex = hex::encode(self.public_key.to_bytes());
355        let has_my_sig = sigs_array.iter().any(|sig_obj| {
356            sig_obj.get("pubkey")
357                .and_then(|v| v.as_str())
358                .map(|pk| pk == my_pubkey_hex)
359                .unwrap_or(false)
360        });
361        
362        // If my signature is not present, add it and re-broadcast (but exclude the sender)
363        if !has_my_sig {
364            // Sign the message
365            let signature = sign(&self.secret_key, &msg_bytes);
366            let signature_hex = hex::encode(signature.to_bytes());
367            
368            // Add my signature to the array
369            let mut updated_msg = msg.clone();
370            let mut updated_sigs = sigs_array.to_vec();
371            let sig_entry = serde_json::json!({
372                "sig": signature_hex,
373                "pubkey": my_pubkey_hex
374            });
375            updated_sigs.push(sig_entry);
376            updated_msg["sigs"] = serde_json::Value::Array(updated_sigs);
377            
378            // Re-broadcast to all peers except the one that sent it to us
379            let sender_id = peer.map(|p| p.id.clone());
380            let peer_ids: Vec<String> = {
381                let peers = self.peers.read().await;
382                peers.keys()
383                    .filter(|id| Some((**id).clone()) != sender_id)
384                    .cloned()
385                    .collect()
386            };
387            
388            let updated_raw = serde_json::to_string(&updated_msg)?;
389            for peer_id in peer_ids {
390                if let Err(e) = self.send_to_peer_by_id(&updated_raw, &peer_id).await {
391                    eprintln!("Error re-broadcasting signed message to peer {}: {}", peer_id, e);
392                }
393            }
394        }
395
396        // Check custom message predicate (application-level filtering)
397        // This runs after signature verification but before message processing
398        if let Some(ref predicate) = self.message_predicate {
399            if !predicate(msg) {
400                eprintln!("DEBUG: Message rejected by custom predicate from peer {:?}", peer.map(|p| &p.id));
401                return Ok(()); // Reject message based on predicate
402            }
403        }
404
405        // Deduplication check
406        {
407            let mut dup = self.dup.write().await;
408            if dup.check(&msg_id) {
409                return Ok(()); // duplicate, ignore
410            }
411            dup.track(&msg_id);
412        }
413
414        // Handle special DAM messages
415        if let Some(dam_type) = msg.get("dam").and_then(|v| v.as_str()) {
416            match dam_type {
417                "!" => {
418                    // Error message
419                    if let Some(p) = peer {
420                        if let Some(err) = msg.get("err").and_then(|v| v.as_str()) {
421                            eprintln!("DAM Error from peer {}: {}", p.id, err);
422                        }
423                    }
424                }
425                "?" => {
426                    // Peer ID exchange
427                    if let Some(p) = peer {
428                        self.handle_peer_id_exchange(msg, p).await?;
429                    }
430                }
431                "rtc" => {
432                    // WebRTC signaling message - these are handled at the Gun level
433                    // to avoid circular dependencies between Mesh and WebRTCManager
434                    tracing::debug!("Received RTC signaling message via DAM protocol");
435                }
436                _ => {
437                    // Other DAM message types
438                }
439            }
440            return Ok(());
441        }
442
443        // Process Gun protocol messages (put, get)
444        if let Some(put_data) = msg.get("put") {
445            eprintln!("DEBUG: Received put message: {}", serde_json::to_string(msg).unwrap_or_default());
446            // Handle put message - update graph and emit node_update event
447            // Gun.js format: { put: { soul: { _: { "#": soul, ">": states }, ...data } } }
448            // The soul is a KEY in the put object, not a field
449            if let Some(put_obj) = put_data.as_object() {
450                // Iterate over each soul in the put object
451                for (soul, node_data) in put_obj {
452                    if let Some(node_obj) = node_data.as_object() {
453                        // Extract metadata from "_" field
454                        let meta = node_obj.get("_").and_then(|v| v.as_object());
455                        let soul_from_meta = meta.and_then(|m| m.get("#")).and_then(|v| v.as_str()).unwrap_or(soul);
456                        
457                        // Extract state map from ">" field in metadata
458                        let states = meta.and_then(|m| m.get(">")).and_then(|v| v.as_object());
459                        
460                        // Update graph
461                        use crate::state::Node;
462                        let mut node = self.core.graph.get(soul_from_meta)
463                            .unwrap_or_else(|| Node::with_soul(soul_from_meta.to_string()));
464                        
465                        // Merge all fields from node_obj into node (except "_" which is metadata)
466                        for (key, value) in node_obj {
467                            if key != "_" {
468                                // Get state for this key from states map if available
469                                let state = states.and_then(|s| s.get(key))
470                                    .and_then(|v| v.as_f64())
471                                    .unwrap_or_else(|| self.core.state.next());
472                                
473                                node.data.insert(key.clone(), value.clone());
474                                crate::state::State::ify(&mut node, Some(&key), Some(state), Some(value.clone()), Some(soul_from_meta));
475                            }
476                        }
477                        
478                        // Store updated node
479                        if let Err(e) = self.core.graph.put(soul_from_meta, node.clone()) {
480                            eprintln!("Error updating graph for soul {}: {}", soul_from_meta, e);
481                        } else {
482                            eprintln!("DEBUG: Updated graph for soul {} (from peer), emitting node_update event. Node data keys: {:?}", soul_from_meta, node.data.keys().collect::<Vec<_>>());
483                            // Emit node_update event so once() and on() callbacks get called
484                            let event_type = format!("node_update:{}", soul_from_meta);
485                            self.core.events.emit(&crate::events::Event {
486                                event_type: event_type.clone(),
487                                data: serde_json::Value::Object(node.data.clone()),
488                            });
489                            // Also emit graph_update for listeners that don't have a specific soul yet
490                            self.core.events.emit(&crate::events::Event {
491                                event_type: "graph_update".to_string(),
492                                data: serde_json::json!({
493                                    soul_from_meta: serde_json::Value::Object(node.data.clone())
494                                }),
495                            });
496                        }
497                    }
498                }
499            }
500        } else if let Some(get_data) = msg.get("get") {
501            eprintln!("DEBUG: Received get message: {}", serde_json::to_string(msg).unwrap_or_default());
502            // Handle get message - respond with requested data
503            if let Some(get_obj) = get_data.as_object() {
504                if let Some(soul_val) = get_obj.get("#") {
505                    if let Some(soul) = soul_val.as_str() {
506                        // Check if we have the requested node
507                        if let Some(node) = self.core.graph.get(soul) {
508                            // Check if get request has a key (for nested properties)
509                            if let Some(key_val) = get_obj.get(".") {
510                                if let Some(key) = key_val.as_str() {
511                                    // Requesting a specific key - check if it's a soul reference
512                                    if let Some(value) = node.data.get(key) {
513                                        if let Some(obj) = value.as_object() {
514                                            if let Some(soul_ref) = obj.get("#") {
515                                                if let Some(ref_soul) = soul_ref.as_str() {
516                                                    // It's a soul reference - get the referenced node
517                                                    if let Some(ref_node) = self.core.graph.get(ref_soul) {
518                                                        let mut put_obj = serde_json::json!({
519                                                            "#": ref_soul
520                                                        });
521                                                        for (k, v) in &ref_node.data {
522                                                            put_obj[k] = v.clone();
523                                                        }
524                                                        let response = serde_json::json!({
525                                                            "put": put_obj
526                                                        });
527                                                        eprintln!("DEBUG: Sending get response for nested soul {} (key: {}) to peer", ref_soul, key);
528                                                        if let Some(p) = peer {
529                                                            if let Err(e) = self.say(&response, Some(p)).await {
530                                                                eprintln!("Error sending get response to peer {}: {}", p.id, e);
531                                                            }
532                                                        }
533                                                    }
534                                                }
535                                            } else {
536                                                // Not a soul reference - return the value directly
537                                                let put_obj = serde_json::json!({
538                                                    "#": soul,
539                                                    key: value.clone()
540                                                });
541                                                let response = serde_json::json!({
542                                                    "put": put_obj
543                                                });
544                                                eprintln!("DEBUG: Sending get response for key {} in soul {} to peer", key, soul);
545                                                if let Some(p) = peer {
546                                                    if let Err(e) = self.say(&response, Some(p)).await {
547                                                        eprintln!("Error sending get response to peer {}: {}", p.id, e);
548                                                    }
549                                                }
550                                            }
551                                        }
552                                    }
553                                }
554                            } else {
555                                // No key specified - return entire node
556                                let mut put_obj = serde_json::json!({
557                                    "#": soul
558                                });
559                                for (key, value) in &node.data {
560                                    put_obj[key] = value.clone();
561                                }
562                                let response = serde_json::json!({
563                                    "put": put_obj
564                                });
565                                eprintln!("DEBUG: Sending get response for soul {} to peer. Response: {}", soul, serde_json::to_string(&response).unwrap_or_default());
566                                // Broadcast the response instead of sending to specific peer
567                                // This ensures the relay server forwards it properly
568                                if let Err(e) = self.say(&response, None).await {
569                                    eprintln!("Error broadcasting get response: {}", e);
570                                } else {
571                                    eprintln!("DEBUG: Get response broadcast successfully");
572                                }
573                            }
574                        } else {
575                            eprintln!("DEBUG: Requested soul {} not found in graph", soul);
576                        }
577                    }
578                }
579            }
580        }
581
582        Ok(())
583    }
584
585    /// Handle peer ID exchange (DAM '?' message)
586    async fn handle_peer_id_exchange(&self, msg: &Value, peer: &Peer) -> GunResult<()> {
587        if let Some(pid) = msg.get("pid").and_then(|v| v.as_str()) {
588            // Update peer PID with minimal lock time
589            {
590                let mut peers = self.peers.write().await;
591                if let Some(p) = peers.get_mut(&peer.id) {
592                    p.pid = Some(pid.to_string());
593                }
594            } // Lock released before calling say()
595
596            // Reply with our PID (lock released to avoid deadlock)
597            self.say(
598                &serde_json::json!({
599                    "dam": "?",
600                    "pid": self.pid,
601                    "@": msg.get("#")
602                }),
603                Some(peer),
604            )
605            .await?;
606        }
607        Ok(())
608    }
609
610    /// Send message to peer(s) (matches mesh.say)
611    pub async fn say(&self, msg: &Value, peer: Option<&Peer>) -> GunResult<()> {
612        let mut msg = msg.clone();
613        
614        // Create message bytes for hashing and signing (without sigs field)
615        let mut msg_for_hash = msg.clone();
616        msg_for_hash.as_object_mut().unwrap().remove("sigs");
617        let msg_bytes = serde_json::to_vec(&msg_for_hash)?;
618        
619        // Generate message ID if not present - use SHA256 hash of message (without sigs)
620        if msg.get("#").is_none() {
621            let mut hasher = Sha256::new();
622            hasher.update(&msg_bytes);
623            let hash = hasher.finalize();
624            let hash_hex = hex::encode(hash);
625            msg["#"] = serde_json::Value::String(hash_hex);
626        }
627        
628        // Check if message already has signatures
629        let mut sigs_array = if let Some(sigs) = msg.get("sigs").and_then(|v| v.as_array()) {
630            sigs.clone()
631        } else {
632            Vec::new()
633        };
634        
635        // Check if my signature is already in the array
636        let my_pubkey_hex = hex::encode(self.public_key.to_bytes());
637        let has_my_sig = sigs_array.iter().any(|sig_obj| {
638            sig_obj.get("pubkey")
639                .and_then(|v| v.as_str())
640                .map(|pk| pk == my_pubkey_hex)
641                .unwrap_or(false)
642        });
643        
644        // If my signature is not present, add it
645        if !has_my_sig {
646            let signature = sign(&self.secret_key, &msg_bytes);
647            let signature_hex = hex::encode(signature.to_bytes());
648            
649            let sig_entry = serde_json::json!({
650                "sig": signature_hex,
651                "pubkey": my_pubkey_hex
652            });
653            sigs_array.push(sig_entry);
654        }
655        
656        // Add signatures array to message
657        msg["sigs"] = serde_json::Value::Array(sigs_array);
658
659        let raw = serde_json::to_string(&msg)?;
660
661        if let Some(p) = peer {
662            self.send_to_peer_by_id(&raw, &p.id).await?;
663        } else {
664            // Broadcast to all peers - clone IDs first to avoid holding lock during async calls
665            let peer_ids: Vec<String> = {
666                let peers = self.peers.read().await;
667                let ids: Vec<String> = peers.keys().cloned().collect();
668                eprintln!("DEBUG: Broadcasting message to {} peers: {:?}", ids.len(), ids);
669                ids
670            };
671
672            // Now send to each peer without holding the lock
673            for peer_id in peer_ids {
674                eprintln!("DEBUG: Attempting to send broadcast message to peer {}", peer_id);
675                if let Err(e) = self.send_to_peer_by_id(&raw, &peer_id).await {
676                    eprintln!("Error sending to peer {}: {}", peer_id, e);
677                    // Continue sending to other peers even if one fails
678                } else {
679                    eprintln!("DEBUG: Successfully sent broadcast message to peer {}", peer_id);
680                }
681            }
682        }
683
684        Ok(())
685    }
686
687    /// Send raw message to a specific peer by ID
688    /// Routes through WebSocket connection if available, otherwise queues
689    pub(crate) async fn send_to_peer_by_id(&self, raw: &str, peer_id: &str) -> GunResult<()> {
690        // Try to get the sender without holding the lock for long
691        let tx_opt = {
692            let peers = self.peers.read().await;
693            if let Some(peer) = peers.get(peer_id) {
694                eprintln!("DEBUG: Found peer {} with sender available", peer_id);
695                peer.tx.clone() // Clone the Sender to release the lock immediately
696            } else {
697                eprintln!("DEBUG: Peer {} not found in peers list ({} total peers)", peer_id, peers.len());
698                None // Peer not found
699            }
700        };
701
702        if let Some(tx) = tx_opt {
703            // Send immediately through WebSocket (no lock held)
704            let msg_preview = raw.chars().take(150).collect::<String>();
705            eprintln!("DEBUG: Sending message to WebSocket for peer {}: {}", peer_id, msg_preview);
706            tx.send(raw.to_string()).map_err(|e| {
707                eprintln!("DEBUG: WebSocket send error for peer {}: {}", peer_id, e);
708                crate::error::GunError::Network(format!(
709                    "Failed to send to peer {}: {}",
710                    peer_id, e
711                ))
712            })?;
713            eprintln!("DEBUG: Message sent successfully to WebSocket for peer {}", peer_id);
714            return Ok(());
715        }
716
717        // No sender available - queue the message
718        // Use a short write lock to add to queue
719        {
720            let mut peers = self.peers.write().await;
721            if let Some(peer) = peers.get_mut(peer_id) {
722                peer.queue.push(raw.to_string());
723                // Don't warn - this is expected during initial connection
724            } else {
725                // Peer doesn't exist - this is fine, they'll get it when they connect
726                return Ok(());
727            }
728        } // Lock released here
729
730        Ok(())
731    }
732
733    /// Send raw message to a specific peer (by Peer reference)
734    /// Routes through WebSocket connection if available, otherwise queues
735    #[allow(dead_code)] // Used internally for peer communication
736    async fn send_to_peer(&self, raw: &str, peer: &Peer) -> GunResult<()> {
737        self.send_to_peer_by_id(raw, &peer.id).await
738    }
739
740    /// Update peer with WebSocket sender (called when connection is established)
741    pub async fn set_peer_sender(
742        &self,
743        peer_id: &str,
744        tx: mpsc::UnboundedSender<String>,
745    ) -> GunResult<()> {
746        let mut peers = self.peers.write().await;
747        if let Some(peer) = peers.get_mut(peer_id) {
748            let tx_clone = tx.clone();
749            peer.set_sender(tx);
750            // Flush any queued messages
751            let queue = peer.queue.clone();
752            peer.queue.clear();
753            drop(peers); // Release lock as soon as possible
754
755            // Send queued messages (outside of lock to avoid deadlocks)
756            for msg in queue {
757                if let Err(e) = tx_clone.send(msg) {
758                    eprintln!("Error sending queued message: {}", e);
759                    break;
760                }
761            }
762        } else {
763            // Peer not found - this shouldn't happen if hi() was called first
764            drop(peers);
765            return Err(crate::error::GunError::Network(format!(
766                "Peer {} not found in mesh, call hi() first",
767                peer_id
768            )));
769        }
770        Ok(())
771    }
772
773    /// Add a peer (matches mesh.hi)
774    pub async fn hi(&self, peer: Peer) -> GunResult<()> {
775        let mut peers = self.peers.write().await;
776        let was_new = !peers.contains_key(&peer.id);
777        let peer_id = peer.id.clone();
778        peers.insert(peer_id.clone(), peer.clone());
779        drop(peers);
780
781        if was_new {
782            let mut near = self.near.write().await;
783            *near += 1;
784            drop(near);
785
786            // Send "hi" message to introduce ourselves to the new peer
787            // We do this after releasing the lock to avoid deadlocks
788            // The "hi" message contains our peer ID (pid) for DAM protocol
789            // Use say() to ensure the message is signed
790            let hi_message = serde_json::json!({
791                "dam": "?",
792                "pid": self.pid,
793            });
794            
795            // Send "hi" message using say() which will sign it
796            if let Err(e) = self.say(&hi_message, Some(&peer)).await {
797                tracing::warn!("Failed to send hi message to peer {}: {}", peer_id, e);
798            }
799        }
800        Ok(())
801    }
802
803    /// Remove a peer (matches mesh.bye)
804    pub async fn bye(&self, peer_id: &str) -> GunResult<()> {
805        let mut peers = self.peers.write().await;
806        if peers.remove(peer_id).is_some() {
807            let mut near = self.near.write().await;
808            if *near > 0 {
809                *near -= 1;
810            }
811        }
812        Ok(())
813    }
814
815    /// Get the number of connected peers (peers with active WebSocket connections)
816    /// Acquires read lock with timeout to avoid indefinite blocking
817    pub async fn connected_peer_count(&self) -> usize {
818        use tokio::time::{timeout, Duration};
819
820        // Use timeout to prevent indefinite blocking
821        match timeout(Duration::from_millis(100), self.peers.read()).await {
822            Ok(peers) => peers.values().filter(|p| p.tx.is_some()).count(),
823            Err(_) => {
824                // Lock acquisition timed out - this shouldn't happen in normal operation
825                // but we return 0 to indicate we can't determine the count
826                0
827            }
828        }
829    }
830
831    /// Check if any peers are connected
832    pub async fn has_connected_peers(&self) -> bool {
833        self.connected_peer_count().await > 0
834    }
835
836    /// Wait for at least one peer to be connected, with timeout
837    pub async fn wait_for_connection(&self, timeout_ms: u64) -> bool {
838        use tokio::time::{sleep, Duration, Instant};
839        let deadline = Instant::now() + Duration::from_millis(timeout_ms);
840
841        while Instant::now() < deadline {
842            if self.has_connected_peers().await {
843                return true;
844            }
845            sleep(Duration::from_millis(100)).await;
846        }
847        false
848    }
849
850    /// Get a peer by ID
851    pub async fn get_peer(&self, peer_id: &str) -> Option<Peer> {
852        let peers = self.peers.read().await;
853        peers.get(peer_id).cloned()
854    }
855}