common/peer/protocol/
mod.rs

1use anyhow::anyhow;
2use futures::future::BoxFuture;
3use iroh::endpoint::Connection;
4use iroh::protocol::{AcceptError, ProtocolHandler};
5
6use crate::crypto::PublicKey;
7
8use super::peer_inner::Peer;
9
10pub mod bidirectional;
11pub mod messages;
12
13use messages::Message;
14
15// Re-export for external users implementing custom handlers
16#[allow(unused_imports)]
17pub use bidirectional::BidirectionalHandler;
18pub use messages::ping::{Ping, PingMessage, PingReplyStatus};
19
// TODO (amiller68): migrate the ALPN; I don't think there's a great
//  reason to have an iroh prefix; this is not an n0 computer project
/// ALPN identifier for the JAX protocol.
///
/// The value is the byte string `/iroh-jax/1`.
///
/// NOTE(review): presumably this is the identifier registered with the router
/// so that matching connections reach the `ProtocolHandler` implementation in
/// this module — confirm at the registration site.
pub const ALPN: &[u8] = b"/iroh-jax/1";
24
25/// Generic connection handler that processes all incoming messages
26///
27/// This function handles all the boilerplate:
28/// - Accepting bidirectional streams
29/// - Reading and deserializing messages
30/// - Dispatching to appropriate handlers
31/// - Error handling
32async fn handle_connection<L>(peer: Peer<L>, conn: Connection) -> Result<(), AcceptError>
33where
34    L: crate::bucket_log::BucketLogProvider,
35    L::Error: std::error::Error + Send + Sync + 'static,
36{
37    // determine the sender
38    let sender_node_id: PublicKey = conn
39        .remote_node_id()
40        .map_err(|e| {
41            tracing::error!("failed to get remote node id: {}", e);
42            AcceptError::from(e)
43        })?
44        .into();
45    // Accept bidirectional stream
46    let (send, mut recv) = conn.accept_bi().await.map_err(|e| {
47        tracing::error!("failed to accept bidirectional stream: {}", e);
48        AcceptError::from(e)
49    })?;
50    tracing::debug!("bidirectional stream accepted");
51
52    // Read message (1MB limit for non-blob data)
53    let message_bytes = recv.read_to_end(1024 * 1024).await.map_err(|e| {
54        tracing::error!("failed to read message: {}", e);
55        AcceptError::from(std::io::Error::other(e))
56    })?;
57
58    // Deserialize message
59    let message: Message = bincode::deserialize(&message_bytes).map_err(|e| {
60        tracing::error!("Failed to deserialize message: {}", e);
61        tracing::error!(
62            "First 20 bytes of received data: {:?}",
63            &message_bytes[..message_bytes.len().min(20)]
64        );
65        let err: Box<dyn std::error::Error + Send + Sync> =
66            anyhow!("failed to deserialize message: {}", e).into();
67        AcceptError::from(err)
68    })?;
69
70    // Dispatch to appropriate handler
71    message.dispatch(&peer, &sender_node_id, send).await?;
72
73    Ok(())
74}
75
76// This allows the router to accept connections for this protocol
77impl<L> ProtocolHandler for Peer<L>
78where
79    L: crate::bucket_log::BucketLogProvider,
80    L::Error: std::error::Error + Send + Sync + 'static,
81{
82    #[allow(refining_impl_trait)]
83    fn accept(&self, conn: Connection) -> BoxFuture<'static, Result<(), AcceptError>> {
84        let peer = self.clone();
85        Box::pin(handle_connection(peer, conn))
86    }
87}