gun/dam.rs
//! DAM (Daisy-chain Ad-hoc Mesh-network) protocol implementation
2//!
3//! This module implements Gun's DAM protocol for peer-to-peer message routing with
4//! automatic deduplication and cryptographic signing. DAM ensures:
5//!
6//! - **No message loops**: Messages are deduplicated using message IDs
7//! - **Cryptographic security**: All messages are signed with BLS signatures
8//! - **Reliable delivery**: Automatic retry and queuing for offline peers
9//! - **Efficient routing**: Messages are routed through the peer mesh
10//!
11//! Based on Gun.js `mesh.js`. The DAM protocol is the foundation of Gun's
12//! decentralized networking.
13//!
14//! ## Message Format
15//!
16//! All DAM messages include:
17//! - `#`: Message ID (SHA256 hash of message without signatures)
18//! - `sigs`: Array of BLS signatures and public keys
19//! - Message payload (e.g., `put`, `get`, `dam`)
20//!
21//! ## Peer Management
22//!
23//! The `Mesh` struct manages all peer connections:
24//! - Tracks connected peers
25//! - Routes messages to peers
26//! - Handles message signing and verification
27//! - Manages peer public keys for verification
28
29use crate::core::GunCore;
30use crate::dup::Dup;
31use crate::error::GunResult;
32use crate::types::MessagePredicate;
33use chia_bls::{PublicKey, SecretKey, Signature, sign, verify};
34use serde_json::Value;
35use sha2::{Sha256, Digest};
36use std::collections::HashMap;
37use std::sync::Arc;
38use tokio::sync::{mpsc, RwLock};
39
40/// Represents a peer connection in the DAM mesh
41///
42/// Peers are identified by a URL and have associated connection state including
43/// message queues, batching information, and retry logic.
44///
45/// # Fields
46///
47/// - `id`: Unique identifier for this peer connection
48/// - `url`: WebSocket URL of the peer
49/// - `pid`: Peer ID in the DAM mesh (set after handshake)
50/// - `tx`: Message sender channel for WebSocket communication
51/// - `batch`, `tail`, `queue`: Message batching and queuing
52/// - `last`: Last message ID sent (for ordering)
53/// - `retry`, `tried`: Retry logic for connection attempts
54#[derive(Clone, Debug)]
55pub struct Peer {
56 pub id: String,
57 pub url: String,
58 pub pid: Option<String>, // peer ID for DAM
59 pub tx: Option<mpsc::UnboundedSender<String>>, // WebSocket message sender
60 pub batch: Option<String>, // batched messages
61 pub tail: usize, // batch size
62 pub queue: Vec<String>, // queued messages
63 pub last: Option<String>, // last message ID sent
64 pub retry: i32,
65 pub tried: Option<u64>, // timestamp
66}
67
68impl Peer {
69 pub fn new(url: String) -> Self {
70 use std::sync::atomic::{AtomicU64, Ordering};
71 static COUNTER: AtomicU64 = AtomicU64::new(0);
72 let id = COUNTER.fetch_add(1, Ordering::SeqCst);
73 Self {
74 id: format!("peer_{}", id),
75 url,
76 pid: None,
77 tx: None,
78 batch: None,
79 tail: 0,
80 queue: vec![],
81 last: None,
82 retry: 60,
83 tried: None,
84 }
85 }
86
87 /// Set the WebSocket message sender
88 pub fn set_sender(&mut self, tx: mpsc::UnboundedSender<String>) {
89 self.tx = Some(tx);
90 }
91
92 /// Send a message through the WebSocket connection
93 pub async fn send(&self, message: &str) -> GunResult<()> {
94 if let Some(ref tx) = self.tx {
95 tx.send(message.to_string()).map_err(|e| {
96 crate::error::GunError::Network(format!("Failed to send message: {}", e))
97 })?;
98 } else {
99 // Peer not connected - message will be dropped
100 // Note: In a full implementation, messages could be queued for later delivery
101 // Currently, Mesh.send_to_peer_by_id() will return an error if peer is not found
102 // This is acceptable behavior as peers will reconnect and sync state
103 }
104 Ok(())
105 }
106}
107
108/// DAM Mesh - handles message routing and peer communication
109///
110/// The mesh is the central coordinator for all peer-to-peer communication in Gun.
111/// It manages:
112///
113/// - Peer connections and routing
114/// - Message signing and verification (BLS signatures)
115/// - Message deduplication
116/// - Message broadcasting and targeted delivery
117/// - Peer public key management
118///
119/// Based on Gun.js `mesh.js`. The mesh is thread-safe and can be shared across
120/// threads using `Arc<Mesh>`.
121///
122/// # Example
123///
124/// ```rust,no_run
125/// use gun::dam::Mesh;
126/// use gun::core::GunCore;
127/// use chia_bls::{SecretKey, PublicKey};
128/// use std::sync::Arc;
129///
130/// let core = Arc::new(GunCore::new());
131/// let secret_key = SecretKey::from_seed(&[0u8; 32]);
132/// let public_key = secret_key.public_key();
133/// let mesh = Arc::new(Mesh::new(core, secret_key, public_key, None));
134/// ```
135pub struct Mesh {
136 pub dup: Arc<RwLock<Dup>>,
137 peers: Arc<RwLock<HashMap<String, Peer>>>,
138 core: Arc<GunCore>,
139 pub near: Arc<RwLock<usize>>, // number of connected peers
140 pub pid: String, // our peer ID
141 opt: MeshOptions,
142 secret_key: SecretKey, // BLS secret key for signing outgoing messages
143 public_key: PublicKey, // BLS public key (our own, for reference)
144 peer_public_keys: Arc<RwLock<HashMap<String, PublicKey>>>, // Map peer_id -> public_key for verification
145 message_predicate: Option<MessagePredicate>, // Optional predicate for custom message filtering
146}
147
/// Tuning knobs for the DAM mesh.
///
/// Covers message size limits, batching and retry behaviour.
#[derive(Clone, Debug)]
pub struct MeshOptions {
    /// Largest accepted raw message, in bytes (default 300MB * 0.3).
    pub max_message_size: usize,
    /// Batch size.
    pub pack_size: usize,
    /// Batching delay in ms.
    pub gap: u64,
    /// Retry counter.
    pub retry: i32,
    /// Lack timeout (unit not visible in this file).
    pub lack: u64,
}
159
160impl Default for MeshOptions {
161 fn default() -> Self {
162 Self {
163 max_message_size: (300_000_000.0 * 0.3) as usize,
164 pack_size: ((300_000_000.0 * 0.3 * 0.01 * 0.01) as usize),
165 gap: 0,
166 retry: 60,
167 lack: 9000,
168 }
169 }
170}
171
172impl Mesh {
173 pub fn new(core: Arc<GunCore>, secret_key: SecretKey, public_key: PublicKey, message_predicate: Option<MessagePredicate>) -> Self {
174 let pid = core.random_id(9);
175 Self {
176 dup: Arc::new(RwLock::new(Dup::new_default())),
177 peers: Arc::new(RwLock::new(HashMap::new())),
178 core,
179 near: Arc::new(RwLock::new(0)),
180 pid,
181 opt: MeshOptions::default(),
182 secret_key,
183 public_key,
184 peer_public_keys: Arc::new(RwLock::new(HashMap::new())),
185 message_predicate,
186 }
187 }
188
189 /// Handle incoming message (matches mesh.hear)
190 pub async fn hear(&self, raw: &str, peer: Option<&Peer>) -> GunResult<()> {
191 if raw.is_empty() {
192 return Ok(());
193 }
194
195 let peer_id = peer.map(|p| p.id.clone()).unwrap_or_else(|| "unknown".to_string());
196 eprintln!("DEBUG: mesh.hear() received message from peer {}: {}", peer_id, raw.chars().take(200).collect::<String>());
197
198 // Check message size
199 if raw.len() > self.opt.max_message_size {
200 if let Some(p) = peer {
201 self.say(
202 &serde_json::json!({
203 "dam": "!",
204 "err": "Message too big!"
205 }),
206 Some(p),
207 )
208 .await?;
209 }
210 return Ok(());
211 }
212
213 // Handle batched messages (JSON array)
214 if raw.starts_with('[') {
215 let messages: Vec<Value> = serde_json::from_str(raw)?;
216 eprintln!("DEBUG: Processing {} batched messages from peer {}", messages.len(), peer_id);
217 for msg in messages {
218 self.hear_one(&msg, peer).await?;
219 }
220 return Ok(());
221 }
222
223 // Handle single message
224 let msg: Value = serde_json::from_str(raw)?;
225 self.hear_one(&msg, peer).await?;
226 Ok(())
227 }
228
229 /// Handle a single message (matches mesh.hear.one)
230 async fn hear_one(&self, msg: &Value, peer: Option<&Peer>) -> GunResult<()> {
231 // Get message ID (should be SHA256 hash of message without sigs)
232 let msg_id = msg
233 .get("#")
234 .and_then(|v| v.as_str())
235 .map(|s| s.to_string())
236 .ok_or_else(|| {
237 crate::error::GunError::Network("Message missing ID (#) field".to_string())
238 })?;
239
240 // Create message bytes for verification (without sigs field)
241 let mut msg_for_hash = msg.clone();
242 msg_for_hash.as_object_mut().unwrap().remove("sigs");
243 let msg_bytes = serde_json::to_vec(&msg_for_hash)?;
244
245 // Verify that the message ID matches the SHA256 hash of the message (without sigs)
246 let mut hasher = Sha256::new();
247 hasher.update(&msg_bytes);
248 let computed_hash = hasher.finalize();
249 let computed_hash_hex = hex::encode(computed_hash);
250
251 if msg_id != computed_hash_hex {
252 eprintln!("DEBUG: Message ID hash mismatch. Expected: {}, Got: {}", computed_hash_hex, msg_id);
253 return Ok(()); // Reject message with invalid hash
254 }
255
256 // Verify all signatures in the aggregate before processing
257 let sigs_array = if let Some(sigs) = msg.get("sigs").and_then(|v| v.as_array()) {
258 if sigs.is_empty() {
259 eprintln!("DEBUG: Message missing signatures from peer {:?}", peer.map(|p| &p.id));
260 return Ok(()); // Reject message without signatures
261 }
262 sigs
263 } else {
264 // Legacy format: try to read single sig/pubkey for backward compatibility
265 // But we'll still require sigs array going forward
266 eprintln!("DEBUG: Message missing sigs array from peer {:?}", peer.map(|p| &p.id));
267 return Ok(()); // Reject message without sigs array
268 };
269
270 // msg_bytes already computed above for hash verification, reuse it for signature verification
271
272 // Verify ALL signatures in the aggregate
273 let mut verified_pubkeys = Vec::new();
274 for sig_entry in sigs_array {
275 let sig_hex = match sig_entry.get("sig").and_then(|v| v.as_str()) {
276 Some(hex) => hex,
277 None => {
278 eprintln!("DEBUG: Invalid signature entry: missing sig");
279 return Ok(()); // Reject message with invalid signature entry
280 }
281 };
282 let pubkey_hex = match sig_entry.get("pubkey").and_then(|v| v.as_str()) {
283 Some(hex) => hex,
284 None => {
285 eprintln!("DEBUG: Invalid signature entry: missing pubkey");
286 return Ok(()); // Reject message with invalid signature entry
287 }
288 };
289
290 // Decode signature and public key
291 let sig_bytes = match hex::decode(sig_hex) {
292 Ok(bytes) => bytes,
293 Err(e) => {
294 eprintln!("DEBUG: Invalid signature hex: {}", e);
295 return Ok(()); // Reject message with invalid signature hex
296 }
297 };
298 let pubkey_bytes = match hex::decode(pubkey_hex) {
299 Ok(bytes) => bytes,
300 Err(e) => {
301 eprintln!("DEBUG: Invalid public key hex: {}", e);
302 return Ok(()); // Reject message with invalid public key hex
303 }
304 };
305
306 // Convert to fixed-size arrays
307 if sig_bytes.len() != 96 {
308 eprintln!("DEBUG: Invalid signature length: expected 96 bytes, got {}", sig_bytes.len());
309 return Ok(()); // Reject message with invalid signature
310 }
311 if pubkey_bytes.len() != 48 {
312 eprintln!("DEBUG: Invalid public key length: expected 48 bytes, got {}", pubkey_bytes.len());
313 return Ok(()); // Reject message with invalid public key
314 }
315
316 let mut sig_array = [0u8; 96];
317 sig_array.copy_from_slice(&sig_bytes);
318 let mut pubkey_array = [0u8; 48];
319 pubkey_array.copy_from_slice(&pubkey_bytes);
320
321 let signature = match Signature::from_bytes(&sig_array) {
322 Ok(sig) => sig,
323 Err(e) => {
324 eprintln!("DEBUG: Invalid signature format: {}", e);
325 return Ok(()); // Reject message with invalid signature
326 }
327 };
328 let sender_pubkey = match PublicKey::from_bytes(&pubkey_array) {
329 Ok(pk) => pk,
330 Err(e) => {
331 eprintln!("DEBUG: Invalid public key format: {}", e);
332 return Ok(()); // Reject message with invalid public key
333 }
334 };
335
336 // Verify this signature
337 if !verify(&signature, &sender_pubkey, &msg_bytes) {
338 eprintln!("DEBUG: Signature verification failed for pubkey {} from peer {:?}", pubkey_hex, peer.map(|p| &p.id));
339 return Ok(()); // Reject message if any signature is invalid
340 }
341
342 verified_pubkeys.push(sender_pubkey);
343 }
344
345 // Store peer's public keys for future reference
346 if let Some(p) = peer {
347 let mut peer_keys = self.peer_public_keys.write().await;
348 for pubkey in &verified_pubkeys {
349 peer_keys.insert(format!("{}:{}", p.id, hex::encode(pubkey.to_bytes())), pubkey.clone());
350 }
351 }
352
353 // Check if my signature is already in the aggregate
354 let my_pubkey_hex = hex::encode(self.public_key.to_bytes());
355 let has_my_sig = sigs_array.iter().any(|sig_obj| {
356 sig_obj.get("pubkey")
357 .and_then(|v| v.as_str())
358 .map(|pk| pk == my_pubkey_hex)
359 .unwrap_or(false)
360 });
361
362 // If my signature is not present, add it and re-broadcast (but exclude the sender)
363 if !has_my_sig {
364 // Sign the message
365 let signature = sign(&self.secret_key, &msg_bytes);
366 let signature_hex = hex::encode(signature.to_bytes());
367
368 // Add my signature to the array
369 let mut updated_msg = msg.clone();
370 let mut updated_sigs = sigs_array.to_vec();
371 let sig_entry = serde_json::json!({
372 "sig": signature_hex,
373 "pubkey": my_pubkey_hex
374 });
375 updated_sigs.push(sig_entry);
376 updated_msg["sigs"] = serde_json::Value::Array(updated_sigs);
377
378 // Re-broadcast to all peers except the one that sent it to us
379 let sender_id = peer.map(|p| p.id.clone());
380 let peer_ids: Vec<String> = {
381 let peers = self.peers.read().await;
382 peers.keys()
383 .filter(|id| Some((**id).clone()) != sender_id)
384 .cloned()
385 .collect()
386 };
387
388 let updated_raw = serde_json::to_string(&updated_msg)?;
389 for peer_id in peer_ids {
390 if let Err(e) = self.send_to_peer_by_id(&updated_raw, &peer_id).await {
391 eprintln!("Error re-broadcasting signed message to peer {}: {}", peer_id, e);
392 }
393 }
394 }
395
396 // Check custom message predicate (application-level filtering)
397 // This runs after signature verification but before message processing
398 if let Some(ref predicate) = self.message_predicate {
399 if !predicate(msg) {
400 eprintln!("DEBUG: Message rejected by custom predicate from peer {:?}", peer.map(|p| &p.id));
401 return Ok(()); // Reject message based on predicate
402 }
403 }
404
405 // Deduplication check
406 {
407 let mut dup = self.dup.write().await;
408 if dup.check(&msg_id) {
409 return Ok(()); // duplicate, ignore
410 }
411 dup.track(&msg_id);
412 }
413
414 // Handle special DAM messages
415 if let Some(dam_type) = msg.get("dam").and_then(|v| v.as_str()) {
416 match dam_type {
417 "!" => {
418 // Error message
419 if let Some(p) = peer {
420 if let Some(err) = msg.get("err").and_then(|v| v.as_str()) {
421 eprintln!("DAM Error from peer {}: {}", p.id, err);
422 }
423 }
424 }
425 "?" => {
426 // Peer ID exchange
427 if let Some(p) = peer {
428 self.handle_peer_id_exchange(msg, p).await?;
429 }
430 }
431 "rtc" => {
432 // WebRTC signaling message - these are handled at the Gun level
433 // to avoid circular dependencies between Mesh and WebRTCManager
434 tracing::debug!("Received RTC signaling message via DAM protocol");
435 }
436 _ => {
437 // Other DAM message types
438 }
439 }
440 return Ok(());
441 }
442
443 // Process Gun protocol messages (put, get)
444 if let Some(put_data) = msg.get("put") {
445 eprintln!("DEBUG: Received put message: {}", serde_json::to_string(msg).unwrap_or_default());
446 // Handle put message - update graph and emit node_update event
447 // Gun.js format: { put: { soul: { _: { "#": soul, ">": states }, ...data } } }
448 // The soul is a KEY in the put object, not a field
449 if let Some(put_obj) = put_data.as_object() {
450 // Iterate over each soul in the put object
451 for (soul, node_data) in put_obj {
452 if let Some(node_obj) = node_data.as_object() {
453 // Extract metadata from "_" field
454 let meta = node_obj.get("_").and_then(|v| v.as_object());
455 let soul_from_meta = meta.and_then(|m| m.get("#")).and_then(|v| v.as_str()).unwrap_or(soul);
456
457 // Extract state map from ">" field in metadata
458 let states = meta.and_then(|m| m.get(">")).and_then(|v| v.as_object());
459
460 // Update graph
461 use crate::state::Node;
462 let mut node = self.core.graph.get(soul_from_meta)
463 .unwrap_or_else(|| Node::with_soul(soul_from_meta.to_string()));
464
465 // Merge all fields from node_obj into node (except "_" which is metadata)
466 for (key, value) in node_obj {
467 if key != "_" {
468 // Get state for this key from states map if available
469 let state = states.and_then(|s| s.get(key))
470 .and_then(|v| v.as_f64())
471 .unwrap_or_else(|| self.core.state.next());
472
473 node.data.insert(key.clone(), value.clone());
474 crate::state::State::ify(&mut node, Some(&key), Some(state), Some(value.clone()), Some(soul_from_meta));
475 }
476 }
477
478 // Store updated node
479 if let Err(e) = self.core.graph.put(soul_from_meta, node.clone()) {
480 eprintln!("Error updating graph for soul {}: {}", soul_from_meta, e);
481 } else {
482 eprintln!("DEBUG: Updated graph for soul {} (from peer), emitting node_update event. Node data keys: {:?}", soul_from_meta, node.data.keys().collect::<Vec<_>>());
483 // Emit node_update event so once() and on() callbacks get called
484 let event_type = format!("node_update:{}", soul_from_meta);
485 self.core.events.emit(&crate::events::Event {
486 event_type: event_type.clone(),
487 data: serde_json::Value::Object(node.data.clone()),
488 });
489 // Also emit graph_update for listeners that don't have a specific soul yet
490 self.core.events.emit(&crate::events::Event {
491 event_type: "graph_update".to_string(),
492 data: serde_json::json!({
493 soul_from_meta: serde_json::Value::Object(node.data.clone())
494 }),
495 });
496 }
497 }
498 }
499 }
500 } else if let Some(get_data) = msg.get("get") {
501 eprintln!("DEBUG: Received get message: {}", serde_json::to_string(msg).unwrap_or_default());
502 // Handle get message - respond with requested data
503 if let Some(get_obj) = get_data.as_object() {
504 if let Some(soul_val) = get_obj.get("#") {
505 if let Some(soul) = soul_val.as_str() {
506 // Check if we have the requested node
507 if let Some(node) = self.core.graph.get(soul) {
508 // Check if get request has a key (for nested properties)
509 if let Some(key_val) = get_obj.get(".") {
510 if let Some(key) = key_val.as_str() {
511 // Requesting a specific key - check if it's a soul reference
512 if let Some(value) = node.data.get(key) {
513 if let Some(obj) = value.as_object() {
514 if let Some(soul_ref) = obj.get("#") {
515 if let Some(ref_soul) = soul_ref.as_str() {
516 // It's a soul reference - get the referenced node
517 if let Some(ref_node) = self.core.graph.get(ref_soul) {
518 let mut put_obj = serde_json::json!({
519 "#": ref_soul
520 });
521 for (k, v) in &ref_node.data {
522 put_obj[k] = v.clone();
523 }
524 let response = serde_json::json!({
525 "put": put_obj
526 });
527 eprintln!("DEBUG: Sending get response for nested soul {} (key: {}) to peer", ref_soul, key);
528 if let Some(p) = peer {
529 if let Err(e) = self.say(&response, Some(p)).await {
530 eprintln!("Error sending get response to peer {}: {}", p.id, e);
531 }
532 }
533 }
534 }
535 } else {
536 // Not a soul reference - return the value directly
537 let put_obj = serde_json::json!({
538 "#": soul,
539 key: value.clone()
540 });
541 let response = serde_json::json!({
542 "put": put_obj
543 });
544 eprintln!("DEBUG: Sending get response for key {} in soul {} to peer", key, soul);
545 if let Some(p) = peer {
546 if let Err(e) = self.say(&response, Some(p)).await {
547 eprintln!("Error sending get response to peer {}: {}", p.id, e);
548 }
549 }
550 }
551 }
552 }
553 }
554 } else {
555 // No key specified - return entire node
556 let mut put_obj = serde_json::json!({
557 "#": soul
558 });
559 for (key, value) in &node.data {
560 put_obj[key] = value.clone();
561 }
562 let response = serde_json::json!({
563 "put": put_obj
564 });
565 eprintln!("DEBUG: Sending get response for soul {} to peer. Response: {}", soul, serde_json::to_string(&response).unwrap_or_default());
566 // Broadcast the response instead of sending to specific peer
567 // This ensures the relay server forwards it properly
568 if let Err(e) = self.say(&response, None).await {
569 eprintln!("Error broadcasting get response: {}", e);
570 } else {
571 eprintln!("DEBUG: Get response broadcast successfully");
572 }
573 }
574 } else {
575 eprintln!("DEBUG: Requested soul {} not found in graph", soul);
576 }
577 }
578 }
579 }
580 }
581
582 Ok(())
583 }
584
585 /// Handle peer ID exchange (DAM '?' message)
586 async fn handle_peer_id_exchange(&self, msg: &Value, peer: &Peer) -> GunResult<()> {
587 if let Some(pid) = msg.get("pid").and_then(|v| v.as_str()) {
588 // Update peer PID with minimal lock time
589 {
590 let mut peers = self.peers.write().await;
591 if let Some(p) = peers.get_mut(&peer.id) {
592 p.pid = Some(pid.to_string());
593 }
594 } // Lock released before calling say()
595
596 // Reply with our PID (lock released to avoid deadlock)
597 self.say(
598 &serde_json::json!({
599 "dam": "?",
600 "pid": self.pid,
601 "@": msg.get("#")
602 }),
603 Some(peer),
604 )
605 .await?;
606 }
607 Ok(())
608 }
609
610 /// Send message to peer(s) (matches mesh.say)
611 pub async fn say(&self, msg: &Value, peer: Option<&Peer>) -> GunResult<()> {
612 let mut msg = msg.clone();
613
614 // Create message bytes for hashing and signing (without sigs field)
615 let mut msg_for_hash = msg.clone();
616 msg_for_hash.as_object_mut().unwrap().remove("sigs");
617 let msg_bytes = serde_json::to_vec(&msg_for_hash)?;
618
619 // Generate message ID if not present - use SHA256 hash of message (without sigs)
620 if msg.get("#").is_none() {
621 let mut hasher = Sha256::new();
622 hasher.update(&msg_bytes);
623 let hash = hasher.finalize();
624 let hash_hex = hex::encode(hash);
625 msg["#"] = serde_json::Value::String(hash_hex);
626 }
627
628 // Check if message already has signatures
629 let mut sigs_array = if let Some(sigs) = msg.get("sigs").and_then(|v| v.as_array()) {
630 sigs.clone()
631 } else {
632 Vec::new()
633 };
634
635 // Check if my signature is already in the array
636 let my_pubkey_hex = hex::encode(self.public_key.to_bytes());
637 let has_my_sig = sigs_array.iter().any(|sig_obj| {
638 sig_obj.get("pubkey")
639 .and_then(|v| v.as_str())
640 .map(|pk| pk == my_pubkey_hex)
641 .unwrap_or(false)
642 });
643
644 // If my signature is not present, add it
645 if !has_my_sig {
646 let signature = sign(&self.secret_key, &msg_bytes);
647 let signature_hex = hex::encode(signature.to_bytes());
648
649 let sig_entry = serde_json::json!({
650 "sig": signature_hex,
651 "pubkey": my_pubkey_hex
652 });
653 sigs_array.push(sig_entry);
654 }
655
656 // Add signatures array to message
657 msg["sigs"] = serde_json::Value::Array(sigs_array);
658
659 let raw = serde_json::to_string(&msg)?;
660
661 if let Some(p) = peer {
662 self.send_to_peer_by_id(&raw, &p.id).await?;
663 } else {
664 // Broadcast to all peers - clone IDs first to avoid holding lock during async calls
665 let peer_ids: Vec<String> = {
666 let peers = self.peers.read().await;
667 let ids: Vec<String> = peers.keys().cloned().collect();
668 eprintln!("DEBUG: Broadcasting message to {} peers: {:?}", ids.len(), ids);
669 ids
670 };
671
672 // Now send to each peer without holding the lock
673 for peer_id in peer_ids {
674 eprintln!("DEBUG: Attempting to send broadcast message to peer {}", peer_id);
675 if let Err(e) = self.send_to_peer_by_id(&raw, &peer_id).await {
676 eprintln!("Error sending to peer {}: {}", peer_id, e);
677 // Continue sending to other peers even if one fails
678 } else {
679 eprintln!("DEBUG: Successfully sent broadcast message to peer {}", peer_id);
680 }
681 }
682 }
683
684 Ok(())
685 }
686
687 /// Send raw message to a specific peer by ID
688 /// Routes through WebSocket connection if available, otherwise queues
689 pub(crate) async fn send_to_peer_by_id(&self, raw: &str, peer_id: &str) -> GunResult<()> {
690 // Try to get the sender without holding the lock for long
691 let tx_opt = {
692 let peers = self.peers.read().await;
693 if let Some(peer) = peers.get(peer_id) {
694 eprintln!("DEBUG: Found peer {} with sender available", peer_id);
695 peer.tx.clone() // Clone the Sender to release the lock immediately
696 } else {
697 eprintln!("DEBUG: Peer {} not found in peers list ({} total peers)", peer_id, peers.len());
698 None // Peer not found
699 }
700 };
701
702 if let Some(tx) = tx_opt {
703 // Send immediately through WebSocket (no lock held)
704 let msg_preview = raw.chars().take(150).collect::<String>();
705 eprintln!("DEBUG: Sending message to WebSocket for peer {}: {}", peer_id, msg_preview);
706 tx.send(raw.to_string()).map_err(|e| {
707 eprintln!("DEBUG: WebSocket send error for peer {}: {}", peer_id, e);
708 crate::error::GunError::Network(format!(
709 "Failed to send to peer {}: {}",
710 peer_id, e
711 ))
712 })?;
713 eprintln!("DEBUG: Message sent successfully to WebSocket for peer {}", peer_id);
714 return Ok(());
715 }
716
717 // No sender available - queue the message
718 // Use a short write lock to add to queue
719 {
720 let mut peers = self.peers.write().await;
721 if let Some(peer) = peers.get_mut(peer_id) {
722 peer.queue.push(raw.to_string());
723 // Don't warn - this is expected during initial connection
724 } else {
725 // Peer doesn't exist - this is fine, they'll get it when they connect
726 return Ok(());
727 }
728 } // Lock released here
729
730 Ok(())
731 }
732
733 /// Send raw message to a specific peer (by Peer reference)
734 /// Routes through WebSocket connection if available, otherwise queues
735 #[allow(dead_code)] // Used internally for peer communication
736 async fn send_to_peer(&self, raw: &str, peer: &Peer) -> GunResult<()> {
737 self.send_to_peer_by_id(raw, &peer.id).await
738 }
739
740 /// Update peer with WebSocket sender (called when connection is established)
741 pub async fn set_peer_sender(
742 &self,
743 peer_id: &str,
744 tx: mpsc::UnboundedSender<String>,
745 ) -> GunResult<()> {
746 let mut peers = self.peers.write().await;
747 if let Some(peer) = peers.get_mut(peer_id) {
748 let tx_clone = tx.clone();
749 peer.set_sender(tx);
750 // Flush any queued messages
751 let queue = peer.queue.clone();
752 peer.queue.clear();
753 drop(peers); // Release lock as soon as possible
754
755 // Send queued messages (outside of lock to avoid deadlocks)
756 for msg in queue {
757 if let Err(e) = tx_clone.send(msg) {
758 eprintln!("Error sending queued message: {}", e);
759 break;
760 }
761 }
762 } else {
763 // Peer not found - this shouldn't happen if hi() was called first
764 drop(peers);
765 return Err(crate::error::GunError::Network(format!(
766 "Peer {} not found in mesh, call hi() first",
767 peer_id
768 )));
769 }
770 Ok(())
771 }
772
773 /// Add a peer (matches mesh.hi)
774 pub async fn hi(&self, peer: Peer) -> GunResult<()> {
775 let mut peers = self.peers.write().await;
776 let was_new = !peers.contains_key(&peer.id);
777 let peer_id = peer.id.clone();
778 peers.insert(peer_id.clone(), peer.clone());
779 drop(peers);
780
781 if was_new {
782 let mut near = self.near.write().await;
783 *near += 1;
784 drop(near);
785
786 // Send "hi" message to introduce ourselves to the new peer
787 // We do this after releasing the lock to avoid deadlocks
788 // The "hi" message contains our peer ID (pid) for DAM protocol
789 // Use say() to ensure the message is signed
790 let hi_message = serde_json::json!({
791 "dam": "?",
792 "pid": self.pid,
793 });
794
795 // Send "hi" message using say() which will sign it
796 if let Err(e) = self.say(&hi_message, Some(&peer)).await {
797 tracing::warn!("Failed to send hi message to peer {}: {}", peer_id, e);
798 }
799 }
800 Ok(())
801 }
802
803 /// Remove a peer (matches mesh.bye)
804 pub async fn bye(&self, peer_id: &str) -> GunResult<()> {
805 let mut peers = self.peers.write().await;
806 if peers.remove(peer_id).is_some() {
807 let mut near = self.near.write().await;
808 if *near > 0 {
809 *near -= 1;
810 }
811 }
812 Ok(())
813 }
814
815 /// Get the number of connected peers (peers with active WebSocket connections)
816 /// Acquires read lock with timeout to avoid indefinite blocking
817 pub async fn connected_peer_count(&self) -> usize {
818 use tokio::time::{timeout, Duration};
819
820 // Use timeout to prevent indefinite blocking
821 match timeout(Duration::from_millis(100), self.peers.read()).await {
822 Ok(peers) => peers.values().filter(|p| p.tx.is_some()).count(),
823 Err(_) => {
824 // Lock acquisition timed out - this shouldn't happen in normal operation
825 // but we return 0 to indicate we can't determine the count
826 0
827 }
828 }
829 }
830
831 /// Check if any peers are connected
832 pub async fn has_connected_peers(&self) -> bool {
833 self.connected_peer_count().await > 0
834 }
835
836 /// Wait for at least one peer to be connected, with timeout
837 pub async fn wait_for_connection(&self, timeout_ms: u64) -> bool {
838 use tokio::time::{sleep, Duration, Instant};
839 let deadline = Instant::now() + Duration::from_millis(timeout_ms);
840
841 while Instant::now() < deadline {
842 if self.has_connected_peers().await {
843 return true;
844 }
845 sleep(Duration::from_millis(100)).await;
846 }
847 false
848 }
849
850 /// Get a peer by ID
851 pub async fn get_peer(&self, peer_id: &str) -> Option<Peer> {
852 let peers = self.peers.read().await;
853 peers.get(peer_id).cloned()
854 }
855}