use crate::core::GunCore;
use crate::dup::Dup;
use crate::error::GunResult;
use crate::types::MessagePredicate;
use chia_bls::{PublicKey, SecretKey, Signature, sign, verify};
use serde_json::Value;
use sha2::{Sha256, Digest};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
/// A remote endpoint known to the mesh, together with its outbound channel
/// and any traffic buffered for it.
#[derive(Clone, Debug)]
pub struct Peer {
    /// Locally generated identifier of the form `peer_<n>`.
    pub id: String,
    /// Address this peer was created with.
    pub url: String,
    /// Process id announced by the remote side in a `dam: "?"` message, once known.
    pub pid: Option<String>,
    /// Outbound channel; `None` until a sender has been attached.
    pub tx: Option<mpsc::UnboundedSender<String>>,
    /// NOTE(review): not read or written in this file — presumably a pending
    /// outgoing batch; confirm against callers.
    pub batch: Option<String>,
    /// NOTE(review): not read or written in this file — presumably the size of
    /// the pending batch; confirm against callers.
    pub tail: usize,
    /// Raw messages held back while no sender is attached.
    pub queue: Vec<String>,
    /// NOTE(review): not read or written in this file — presumably the id of the
    /// last message exchanged; confirm against callers.
    pub last: Option<String>,
    /// Retry budget; starts at 60 for peers built with `Peer::new`.
    pub retry: i32,
    /// NOTE(review): not read or written in this file — presumably a timestamp of
    /// the last connection attempt; confirm against callers.
    pub tried: Option<u64>,
}
impl Peer {
pub fn new(url: String) -> Self {
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let id = COUNTER.fetch_add(1, Ordering::SeqCst);
Self {
id: format!("peer_{}", id),
url,
pid: None,
tx: None,
batch: None,
tail: 0,
queue: vec![],
last: None,
retry: 60,
tried: None,
}
}
pub fn set_sender(&mut self, tx: mpsc::UnboundedSender<String>) {
self.tx = Some(tx);
}
pub async fn send(&self, message: &str) -> GunResult<()> {
if let Some(ref tx) = self.tx {
tx.send(message.to_string()).map_err(|e| {
crate::error::GunError::Network(format!("Failed to send message: {}", e))
})?;
} else {
}
Ok(())
}
}
/// Message router that verifies, signs, de-duplicates and forwards traffic
/// between the local graph and connected peers.
pub struct Mesh {
    /// Tracker of message ids that were already processed.
    pub dup: Arc<RwLock<Dup>>,
    /// Known peers, keyed by `Peer::id`.
    peers: Arc<RwLock<HashMap<String, Peer>>>,
    /// Shared core providing the graph, event emitter, state clock and id generator.
    core: Arc<GunCore>,
    /// Number of peers currently registered through `hi` and not yet removed by `bye`.
    pub near: Arc<RwLock<usize>>,
    /// Identifier of this node, announced to peers in `dam: "?"` messages.
    pub pid: String,
    /// Tunables; currently always `MeshOptions::default()`.
    opt: MeshOptions,
    /// Key used to sign outgoing and relayed messages.
    secret_key: SecretKey,
    /// Public half of `secret_key`, published next to every signature.
    public_key: PublicKey,
    /// Public keys whose signatures verified, keyed by `"<peer id>:<pubkey hex>"`.
    peer_public_keys: Arc<RwLock<HashMap<String, PublicKey>>>,
    /// Optional filter; messages it rejects are not applied locally.
    message_predicate: Option<MessagePredicate>,
}
/// Tunable limits for the mesh.
#[derive(Clone, Debug)]
pub struct MeshOptions {
    /// Largest raw message, in bytes, that `hear` will accept.
    pub max_message_size: usize,
    /// NOTE(review): not used in this file — presumably the target batch size in bytes.
    pub pack_size: usize,
    /// NOTE(review): not used in this file — presumably a batching delay; unit unconfirmed.
    pub gap: u64,
    /// NOTE(review): not used in this file — presumably a reconnect budget.
    pub retry: i32,
    /// NOTE(review): not used in this file — presumably a timeout in milliseconds.
    pub lack: u64,
}

impl Default for MeshOptions {
    /// Derives the size limits from a 300 MB baseline: 30% of it for a single
    /// message and one ten-thousandth of that for a pack.
    fn default() -> Self {
        const BASELINE_BYTES: f64 = 300_000_000.0;

        // Same left-to-right float evaluation as the literal expressions it replaces.
        let message_limit = BASELINE_BYTES * 0.3;
        let pack_limit = message_limit * 0.01 * 0.01;

        Self {
            max_message_size: message_limit as usize,
            pack_size: pack_limit as usize,
            gap: 0,
            retry: 60,
            lack: 9000,
        }
    }
}
impl Mesh {
/// Creates a mesh with no peers, default options and a fresh node id
/// obtained from `core.random_id(9)`.
///
/// `secret_key` / `public_key` are used to sign traffic; `message_predicate`,
/// when present, filters incoming messages before they are applied.
pub fn new(core: Arc<GunCore>, secret_key: SecretKey, public_key: PublicKey, message_predicate: Option<MessagePredicate>) -> Self {
    let pid = core.random_id(9);
    let dup = Arc::new(RwLock::new(Dup::new_default()));
    let peers = Arc::new(RwLock::new(HashMap::new()));
    let near = Arc::new(RwLock::new(0));
    let opt = MeshOptions::default();
    let peer_public_keys = Arc::new(RwLock::new(HashMap::new()));

    Self {
        dup,
        peers,
        core,
        near,
        pid,
        opt,
        secret_key,
        public_key,
        peer_public_keys,
        message_predicate,
    }
}
pub async fn hear(&self, raw: &str, peer: Option<&Peer>) -> GunResult<()> {
if raw.is_empty() {
return Ok(());
}
let peer_id = peer.map(|p| p.id.clone()).unwrap_or_else(|| "unknown".to_string());
eprintln!("DEBUG: mesh.hear() received message from peer {}: {}", peer_id, raw.chars().take(200).collect::<String>());
if raw.len() > self.opt.max_message_size {
if let Some(p) = peer {
self.say(
&serde_json::json!({
"dam": "!",
"err": "Message too big!"
}),
Some(p),
)
.await?;
}
return Ok(());
}
if raw.starts_with('[') {
let messages: Vec<Value> = serde_json::from_str(raw)?;
eprintln!("DEBUG: Processing {} batched messages from peer {}", messages.len(), peer_id);
for msg in messages {
self.hear_one(&msg, peer).await?;
}
return Ok(());
}
let msg: Value = serde_json::from_str(raw)?;
self.hear_one(&msg, peer).await?;
Ok(())
}
async fn hear_one(&self, msg: &Value, peer: Option<&Peer>) -> GunResult<()> {
let msg_id = msg
.get("#")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.ok_or_else(|| {
crate::error::GunError::Network("Message missing ID (#) field".to_string())
})?;
let mut msg_for_hash = msg.clone();
msg_for_hash.as_object_mut().unwrap().remove("sigs");
let msg_bytes = serde_json::to_vec(&msg_for_hash)?;
let mut hasher = Sha256::new();
hasher.update(&msg_bytes);
let computed_hash = hasher.finalize();
let computed_hash_hex = hex::encode(computed_hash);
if msg_id != computed_hash_hex {
eprintln!("DEBUG: Message ID hash mismatch. Expected: {}, Got: {}", computed_hash_hex, msg_id);
return Ok(()); }
let sigs_array = if let Some(sigs) = msg.get("sigs").and_then(|v| v.as_array()) {
if sigs.is_empty() {
eprintln!("DEBUG: Message missing signatures from peer {:?}", peer.map(|p| &p.id));
return Ok(()); }
sigs
} else {
eprintln!("DEBUG: Message missing sigs array from peer {:?}", peer.map(|p| &p.id));
return Ok(()); };
let mut verified_pubkeys = Vec::new();
for sig_entry in sigs_array {
let sig_hex = match sig_entry.get("sig").and_then(|v| v.as_str()) {
Some(hex) => hex,
None => {
eprintln!("DEBUG: Invalid signature entry: missing sig");
return Ok(()); }
};
let pubkey_hex = match sig_entry.get("pubkey").and_then(|v| v.as_str()) {
Some(hex) => hex,
None => {
eprintln!("DEBUG: Invalid signature entry: missing pubkey");
return Ok(()); }
};
let sig_bytes = match hex::decode(sig_hex) {
Ok(bytes) => bytes,
Err(e) => {
eprintln!("DEBUG: Invalid signature hex: {}", e);
return Ok(()); }
};
let pubkey_bytes = match hex::decode(pubkey_hex) {
Ok(bytes) => bytes,
Err(e) => {
eprintln!("DEBUG: Invalid public key hex: {}", e);
return Ok(()); }
};
if sig_bytes.len() != 96 {
eprintln!("DEBUG: Invalid signature length: expected 96 bytes, got {}", sig_bytes.len());
return Ok(()); }
if pubkey_bytes.len() != 48 {
eprintln!("DEBUG: Invalid public key length: expected 48 bytes, got {}", pubkey_bytes.len());
return Ok(()); }
let mut sig_array = [0u8; 96];
sig_array.copy_from_slice(&sig_bytes);
let mut pubkey_array = [0u8; 48];
pubkey_array.copy_from_slice(&pubkey_bytes);
let signature = match Signature::from_bytes(&sig_array) {
Ok(sig) => sig,
Err(e) => {
eprintln!("DEBUG: Invalid signature format: {}", e);
return Ok(()); }
};
let sender_pubkey = match PublicKey::from_bytes(&pubkey_array) {
Ok(pk) => pk,
Err(e) => {
eprintln!("DEBUG: Invalid public key format: {}", e);
return Ok(()); }
};
if !verify(&signature, &sender_pubkey, &msg_bytes) {
eprintln!("DEBUG: Signature verification failed for pubkey {} from peer {:?}", pubkey_hex, peer.map(|p| &p.id));
return Ok(()); }
verified_pubkeys.push(sender_pubkey);
}
if let Some(p) = peer {
let mut peer_keys = self.peer_public_keys.write().await;
for pubkey in &verified_pubkeys {
peer_keys.insert(format!("{}:{}", p.id, hex::encode(pubkey.to_bytes())), pubkey.clone());
}
}
let my_pubkey_hex = hex::encode(self.public_key.to_bytes());
let has_my_sig = sigs_array.iter().any(|sig_obj| {
sig_obj.get("pubkey")
.and_then(|v| v.as_str())
.map(|pk| pk == my_pubkey_hex)
.unwrap_or(false)
});
if !has_my_sig {
let signature = sign(&self.secret_key, &msg_bytes);
let signature_hex = hex::encode(signature.to_bytes());
let mut updated_msg = msg.clone();
let mut updated_sigs = sigs_array.to_vec();
let sig_entry = serde_json::json!({
"sig": signature_hex,
"pubkey": my_pubkey_hex
});
updated_sigs.push(sig_entry);
updated_msg["sigs"] = serde_json::Value::Array(updated_sigs);
let sender_id = peer.map(|p| p.id.clone());
let peer_ids: Vec<String> = {
let peers = self.peers.read().await;
peers.keys()
.filter(|id| Some((**id).clone()) != sender_id)
.cloned()
.collect()
};
let updated_raw = serde_json::to_string(&updated_msg)?;
for peer_id in peer_ids {
if let Err(e) = self.send_to_peer_by_id(&updated_raw, &peer_id).await {
eprintln!("Error re-broadcasting signed message to peer {}: {}", peer_id, e);
}
}
}
if let Some(ref predicate) = self.message_predicate {
if !predicate(msg) {
eprintln!("DEBUG: Message rejected by custom predicate from peer {:?}", peer.map(|p| &p.id));
return Ok(()); }
}
{
let mut dup = self.dup.write().await;
if dup.check(&msg_id) {
return Ok(()); }
dup.track(&msg_id);
}
if let Some(dam_type) = msg.get("dam").and_then(|v| v.as_str()) {
match dam_type {
"!" => {
if let Some(p) = peer {
if let Some(err) = msg.get("err").and_then(|v| v.as_str()) {
eprintln!("DAM Error from peer {}: {}", p.id, err);
}
}
}
"?" => {
if let Some(p) = peer {
self.handle_peer_id_exchange(msg, p).await?;
}
}
"rtc" => {
tracing::debug!("Received RTC signaling message via DAM protocol");
}
_ => {
}
}
return Ok(());
}
if let Some(put_data) = msg.get("put") {
eprintln!("DEBUG: Received put message: {}", serde_json::to_string(msg).unwrap_or_default());
if let Some(put_obj) = put_data.as_object() {
for (soul, node_data) in put_obj {
if let Some(node_obj) = node_data.as_object() {
let meta = node_obj.get("_").and_then(|v| v.as_object());
let soul_from_meta = meta.and_then(|m| m.get("#")).and_then(|v| v.as_str()).unwrap_or(soul);
let states = meta.and_then(|m| m.get(">")).and_then(|v| v.as_object());
use crate::state::Node;
let mut node = self.core.graph.get(soul_from_meta)
.unwrap_or_else(|| Node::with_soul(soul_from_meta.to_string()));
for (key, value) in node_obj {
if key != "_" {
let state = states.and_then(|s| s.get(key))
.and_then(|v| v.as_f64())
.unwrap_or_else(|| self.core.state.next());
node.data.insert(key.clone(), value.clone());
crate::state::State::ify(&mut node, Some(&key), Some(state), Some(value.clone()), Some(soul_from_meta));
}
}
if let Err(e) = self.core.graph.put(soul_from_meta, node.clone()) {
eprintln!("Error updating graph for soul {}: {}", soul_from_meta, e);
} else {
eprintln!("DEBUG: Updated graph for soul {} (from peer), emitting node_update event. Node data keys: {:?}", soul_from_meta, node.data.keys().collect::<Vec<_>>());
let event_type = format!("node_update:{}", soul_from_meta);
self.core.events.emit(&crate::events::Event {
event_type: event_type.clone(),
data: serde_json::Value::Object(node.data.clone()),
});
self.core.events.emit(&crate::events::Event {
event_type: "graph_update".to_string(),
data: serde_json::json!({
soul_from_meta: serde_json::Value::Object(node.data.clone())
}),
});
}
}
}
}
} else if let Some(get_data) = msg.get("get") {
eprintln!("DEBUG: Received get message: {}", serde_json::to_string(msg).unwrap_or_default());
if let Some(get_obj) = get_data.as_object() {
if let Some(soul_val) = get_obj.get("#") {
if let Some(soul) = soul_val.as_str() {
if let Some(node) = self.core.graph.get(soul) {
if let Some(key_val) = get_obj.get(".") {
if let Some(key) = key_val.as_str() {
if let Some(value) = node.data.get(key) {
if let Some(obj) = value.as_object() {
if let Some(soul_ref) = obj.get("#") {
if let Some(ref_soul) = soul_ref.as_str() {
if let Some(ref_node) = self.core.graph.get(ref_soul) {
let mut put_obj = serde_json::json!({
"#": ref_soul
});
for (k, v) in &ref_node.data {
put_obj[k] = v.clone();
}
let response = serde_json::json!({
"put": put_obj
});
eprintln!("DEBUG: Sending get response for nested soul {} (key: {}) to peer", ref_soul, key);
if let Some(p) = peer {
if let Err(e) = self.say(&response, Some(p)).await {
eprintln!("Error sending get response to peer {}: {}", p.id, e);
}
}
}
}
} else {
let put_obj = serde_json::json!({
"#": soul,
key: value.clone()
});
let response = serde_json::json!({
"put": put_obj
});
eprintln!("DEBUG: Sending get response for key {} in soul {} to peer", key, soul);
if let Some(p) = peer {
if let Err(e) = self.say(&response, Some(p)).await {
eprintln!("Error sending get response to peer {}: {}", p.id, e);
}
}
}
}
}
}
} else {
let mut put_obj = serde_json::json!({
"#": soul
});
for (key, value) in &node.data {
put_obj[key] = value.clone();
}
let response = serde_json::json!({
"put": put_obj
});
eprintln!("DEBUG: Sending get response for soul {} to peer. Response: {}", soul, serde_json::to_string(&response).unwrap_or_default());
if let Err(e) = self.say(&response, None).await {
eprintln!("Error broadcasting get response: {}", e);
} else {
eprintln!("DEBUG: Get response broadcast successfully");
}
}
} else {
eprintln!("DEBUG: Requested soul {} not found in graph", soul);
}
}
}
}
}
Ok(())
}
/// Handles a `dam: "?"` identity announcement from `peer`.
///
/// Records the announced `pid` on the stored peer entry and, when the
/// message is an initial announcement, answers with this node's own `pid`,
/// referencing the announcement's id in `"@"`.
///
/// A message that already carries `"@"` is itself such an answer and is not
/// answered again. Without this guard two nodes would keep replying to each
/// other's replies indefinitely, because every reply has a fresh id and so
/// is never caught by de-duplication.
///
/// Messages without a string `pid` are ignored.
///
/// # Errors
/// Propagates a failure to send the answer.
async fn handle_peer_id_exchange(&self, msg: &Value, peer: &Peer) -> GunResult<()> {
    let pid = match msg.get("pid").and_then(|v| v.as_str()) {
        Some(pid) => pid,
        None => return Ok(()),
    };

    {
        let mut peers = self.peers.write().await;
        if let Some(p) = peers.get_mut(&peer.id) {
            p.pid = Some(pid.to_string());
        }
    }

    // A null "@" is what an answer to an id-less message would carry; treat
    // it like an absent field.
    let is_reply = msg.get("@").map(|v| !v.is_null()).unwrap_or(false);
    if is_reply {
        return Ok(());
    }

    self.say(
        &serde_json::json!({
            "dam": "?",
            "pid": self.pid,
            "@": msg.get("#")
        }),
        Some(peer),
    )
    .await
}
pub async fn say(&self, msg: &Value, peer: Option<&Peer>) -> GunResult<()> {
let mut msg = msg.clone();
let mut msg_for_hash = msg.clone();
msg_for_hash.as_object_mut().unwrap().remove("sigs");
let msg_bytes = serde_json::to_vec(&msg_for_hash)?;
if msg.get("#").is_none() {
let mut hasher = Sha256::new();
hasher.update(&msg_bytes);
let hash = hasher.finalize();
let hash_hex = hex::encode(hash);
msg["#"] = serde_json::Value::String(hash_hex);
}
let mut sigs_array = if let Some(sigs) = msg.get("sigs").and_then(|v| v.as_array()) {
sigs.clone()
} else {
Vec::new()
};
let my_pubkey_hex = hex::encode(self.public_key.to_bytes());
let has_my_sig = sigs_array.iter().any(|sig_obj| {
sig_obj.get("pubkey")
.and_then(|v| v.as_str())
.map(|pk| pk == my_pubkey_hex)
.unwrap_or(false)
});
if !has_my_sig {
let signature = sign(&self.secret_key, &msg_bytes);
let signature_hex = hex::encode(signature.to_bytes());
let sig_entry = serde_json::json!({
"sig": signature_hex,
"pubkey": my_pubkey_hex
});
sigs_array.push(sig_entry);
}
msg["sigs"] = serde_json::Value::Array(sigs_array);
let raw = serde_json::to_string(&msg)?;
if let Some(p) = peer {
self.send_to_peer_by_id(&raw, &p.id).await?;
} else {
let peer_ids: Vec<String> = {
let peers = self.peers.read().await;
let ids: Vec<String> = peers.keys().cloned().collect();
eprintln!("DEBUG: Broadcasting message to {} peers: {:?}", ids.len(), ids);
ids
};
for peer_id in peer_ids {
eprintln!("DEBUG: Attempting to send broadcast message to peer {}", peer_id);
if let Err(e) = self.send_to_peer_by_id(&raw, &peer_id).await {
eprintln!("Error sending to peer {}: {}", peer_id, e);
} else {
eprintln!("DEBUG: Successfully sent broadcast message to peer {}", peer_id);
}
}
}
Ok(())
}
/// Delivers `raw` to the peer registered under `peer_id`.
///
/// * Peer has a sender: the text is pushed onto its channel.
/// * Peer is known but has no sender yet: the text is appended to the
///   peer's queue.
/// * Peer is unknown: nothing happens and `Ok(())` is returned.
///
/// The sender is looked up under a read lock first. If none is found, the
/// lookup is repeated under the write lock before queueing, so a sender
/// attached in between is used instead of stranding the message in a queue
/// that has already been flushed.
///
/// # Errors
/// `GunError::Network` when the peer's channel is closed.
pub(crate) async fn send_to_peer_by_id(&self, raw: &str, peer_id: &str) -> GunResult<()> {
    let mut tx_opt = {
        let peers = self.peers.read().await;
        if let Some(peer) = peers.get(peer_id) {
            eprintln!("DEBUG: Found peer {} with sender available", peer_id);
            peer.tx.clone()
        } else {
            eprintln!("DEBUG: Peer {} not found in peers list ({} total peers)", peer_id, peers.len());
            None
        }
    };

    if tx_opt.is_none() {
        let mut peers = self.peers.write().await;
        match peers.get_mut(peer_id) {
            Some(peer) => match peer.tx.clone() {
                // A sender was attached after the read lock was released.
                Some(tx) => tx_opt = Some(tx),
                None => {
                    peer.queue.push(raw.to_string());
                    return Ok(());
                }
            },
            None => return Ok(()),
        }
    }

    if let Some(tx) = tx_opt {
        let msg_preview = raw.chars().take(150).collect::<String>();
        eprintln!("DEBUG: Sending message to WebSocket for peer {}: {}", peer_id, msg_preview);
        tx.send(raw.to_string()).map_err(|e| {
            eprintln!("DEBUG: WebSocket send error for peer {}: {}", peer_id, e);
            crate::error::GunError::Network(format!(
                "Failed to send to peer {}: {}",
                peer_id, e
            ))
        })?;
        eprintln!("DEBUG: Message sent successfully to WebSocket for peer {}", peer_id);
    }
    Ok(())
}
/// Sends `raw` to `peer` by delegating to `send_to_peer_by_id` with the
/// peer's id; delivery, queueing and error behaviour are those of that method.
///
/// Marked `dead_code` because nothing in this file calls it.
#[allow(dead_code)] async fn send_to_peer(&self, raw: &str, peer: &Peer) -> GunResult<()> {
self.send_to_peer_by_id(raw, &peer.id).await
}
/// Attaches an outbound channel to an already registered peer and flushes
/// everything that was queued for it while it had none.
///
/// The backlog is moved out of the peer (no copy) and pushed onto `tx`
/// while the peers lock is still held, so messages sent concurrently cannot
/// overtake the queued ones. Pushing onto an unbounded channel never waits,
/// so the lock is held only briefly. If the channel turns out to be closed
/// the flush stops at the first failure and the rest of the backlog is
/// discarded; the sender is attached regardless.
///
/// # Errors
/// `GunError::Network` when `peer_id` is not registered (call `hi` first).
pub async fn set_peer_sender(
    &self,
    peer_id: &str,
    tx: mpsc::UnboundedSender<String>,
) -> GunResult<()> {
    let mut peers = self.peers.write().await;
    let peer = match peers.get_mut(peer_id) {
        Some(peer) => peer,
        None => {
            return Err(crate::error::GunError::Network(format!(
                "Peer {} not found in mesh, call hi() first",
                peer_id
            )));
        }
    };

    let backlog = std::mem::take(&mut peer.queue);
    for msg in backlog {
        if let Err(e) = tx.send(msg) {
            eprintln!("Error sending queued message: {}", e);
            break;
        }
    }
    peer.set_sender(tx);
    Ok(())
}
/// Registers `peer` (replacing any entry with the same id).
///
/// For a peer that was not known before, the `near` counter is incremented
/// and a `dam: "?"` announcement carrying this node's `pid` is sent to it.
/// A failure to send that announcement is only logged.
pub async fn hi(&self, peer: Peer) -> GunResult<()> {
    let peer_id = peer.id.clone();

    // `insert` yields the previous entry, so `None` means the id is new.
    let was_new = {
        let mut peers = self.peers.write().await;
        peers.insert(peer_id.clone(), peer.clone()).is_none()
    };
    if !was_new {
        return Ok(());
    }

    {
        let mut near = self.near.write().await;
        *near += 1;
    }

    let hi_message = serde_json::json!({
        "dam": "?",
        "pid": self.pid,
    });
    if let Err(e) = self.say(&hi_message, Some(&peer)).await {
        tracing::warn!("Failed to send hi message to peer {}: {}", peer_id, e);
    }
    Ok(())
}
/// Forgets the peer registered under `peer_id`.
///
/// When such a peer existed, the `near` counter is decremented, never going
/// below zero. Unknown ids are ignored.
pub async fn bye(&self, peer_id: &str) -> GunResult<()> {
    let mut peers = self.peers.write().await;
    if peers.remove(peer_id).is_none() {
        return Ok(());
    }

    let mut near = self.near.write().await;
    let remaining = near.saturating_sub(1);
    *near = remaining;
    Ok(())
}
/// Counts the peers that currently have an outbound sender attached.
///
/// Waits at most 100 ms for the peers lock; if it cannot be acquired in
/// that time the count is reported as 0.
pub async fn connected_peer_count(&self) -> usize {
    use tokio::time::{timeout, Duration};

    let lock_wait = Duration::from_millis(100);
    timeout(lock_wait, self.peers.read())
        .await
        .map(|peers| peers.values().filter(|p| p.tx.is_some()).count())
        .unwrap_or(0)
}
/// Reports whether at least one peer has an outbound sender attached,
/// as counted by `connected_peer_count`.
pub async fn has_connected_peers(&self) -> bool {
    let connected = self.connected_peer_count().await;
    connected != 0
}
/// Waits until at least one peer is connected or `timeout_ms` elapses.
///
/// Connectivity is checked immediately and then roughly every 100 ms.
/// The last sleep is shortened so the wait does not overshoot the deadline,
/// and a final check is made once the deadline is reached. As a result a
/// peer that is already connected is reported even with `timeout_ms == 0`,
/// and one that connects during the last interval is not missed.
///
/// Returns `true` as soon as a connected peer is seen, `false` on timeout.
pub async fn wait_for_connection(&self, timeout_ms: u64) -> bool {
    use tokio::time::{sleep, Duration, Instant};

    let poll_interval = Duration::from_millis(100);
    let deadline = Instant::now() + Duration::from_millis(timeout_ms);
    loop {
        if self.has_connected_peers().await {
            return true;
        }
        let now = Instant::now();
        if now >= deadline {
            return false;
        }
        // `now < deadline` here, so the subtraction cannot underflow.
        sleep(poll_interval.min(deadline - now)).await;
    }
}
/// Returns a snapshot (clone) of the peer registered under `peer_id`,
/// or `None` when no such peer exists.
pub async fn get_peer(&self, peer_id: &str) -> Option<Peer> {
    self.peers.read().await.get(peer_id).cloned()
}
}