common/peer/protocol/
mod.rs1use anyhow::anyhow;
2use futures::future::BoxFuture;
3use iroh::endpoint::Connection;
4use iroh::protocol::{AcceptError, ProtocolHandler};
5
6use crate::crypto::PublicKey;
7
8use super::peer_inner::Peer;
9
10pub mod bidirectional;
11pub mod messages;
12
13use messages::Message;
14
15#[allow(unused_imports)]
17pub use bidirectional::BidirectionalHandler;
18pub use messages::ping::{Ping, PingMessage, PingReplyStatus};
19
20pub const ALPN: &[u8] = b"/iroh-jax/1";
24
25async fn handle_connection<L>(peer: Peer<L>, conn: Connection) -> Result<(), AcceptError>
33where
34 L: crate::bucket_log::BucketLogProvider,
35 L::Error: std::error::Error + Send + Sync + 'static,
36{
37 let sender_node_id: PublicKey = conn
39 .remote_node_id()
40 .map_err(|e| {
41 tracing::error!("failed to get remote node id: {}", e);
42 AcceptError::from(e)
43 })?
44 .into();
45 let (send, mut recv) = conn.accept_bi().await.map_err(|e| {
47 tracing::error!("failed to accept bidirectional stream: {}", e);
48 AcceptError::from(e)
49 })?;
50 tracing::debug!("bidirectional stream accepted");
51
52 let message_bytes = recv.read_to_end(1024 * 1024).await.map_err(|e| {
54 tracing::error!("failed to read message: {}", e);
55 AcceptError::from(std::io::Error::other(e))
56 })?;
57
58 let message: Message = bincode::deserialize(&message_bytes).map_err(|e| {
60 tracing::error!("Failed to deserialize message: {}", e);
61 tracing::error!(
62 "First 20 bytes of received data: {:?}",
63 &message_bytes[..message_bytes.len().min(20)]
64 );
65 let err: Box<dyn std::error::Error + Send + Sync> =
66 anyhow!("failed to deserialize message: {}", e).into();
67 AcceptError::from(err)
68 })?;
69
70 message.dispatch(&peer, &sender_node_id, send).await?;
72
73 Ok(())
74}
75
76impl<L> ProtocolHandler for Peer<L>
78where
79 L: crate::bucket_log::BucketLogProvider,
80 L::Error: std::error::Error + Send + Sync + 'static,
81{
82 #[allow(refining_impl_trait)]
83 fn accept(&self, conn: Connection) -> BoxFuture<'static, Result<(), AcceptError>> {
84 let peer = self.clone();
85 Box::pin(handle_connection(peer, conn))
86 }
87}