use anyhow::anyhow;
use futures::future::BoxFuture;
use iroh::endpoint::Connection;
use iroh::protocol::{AcceptError, ProtocolHandler};
use crate::crypto::PublicKey;
use super::peer_inner::Peer;
pub mod bidirectional;
pub mod messages;
use messages::Message;
#[allow(unused_imports)]
pub use bidirectional::BidirectionalHandler;
pub use messages::ping::{Ping, PingMessage, PingReplyStatus};
pub const ALPN: &[u8] = b"/iroh-jax/1";
async fn handle_connection<L>(peer: Peer<L>, conn: Connection) -> Result<(), AcceptError>
where
L: crate::bucket_log::BucketLogProvider,
L::Error: std::error::Error + Send + Sync + 'static,
{
let sender_node_id: PublicKey = conn
.remote_node_id()
.map_err(|e| {
tracing::error!("failed to get remote node id: {}", e);
AcceptError::from(e)
})?
.into();
let (send, mut recv) = conn.accept_bi().await.map_err(|e| {
tracing::error!("failed to accept bidirectional stream: {}", e);
AcceptError::from(e)
})?;
tracing::debug!("bidirectional stream accepted");
let message_bytes = recv.read_to_end(1024 * 1024).await.map_err(|e| {
tracing::error!("failed to read message: {}", e);
AcceptError::from(std::io::Error::other(e))
})?;
let message: Message = bincode::deserialize(&message_bytes).map_err(|e| {
tracing::error!("Failed to deserialize message: {}", e);
tracing::error!(
"First 20 bytes of received data: {:?}",
&message_bytes[..message_bytes.len().min(20)]
);
let err: Box<dyn std::error::Error + Send + Sync> =
anyhow!("failed to deserialize message: {}", e).into();
AcceptError::from(err)
})?;
message.dispatch(&peer, &sender_node_id, send).await?;
Ok(())
}
impl<L> ProtocolHandler for Peer<L>
where
L: crate::bucket_log::BucketLogProvider,
L::Error: std::error::Error + Send + Sync + 'static,
{
#[allow(refining_impl_trait)]
fn accept(&self, conn: Connection) -> BoxFuture<'static, Result<(), AcceptError>> {
let peer = self.clone();
Box::pin(handle_connection(peer, conn))
}
}