use std::fmt::Debug;
use anyhow::{anyhow, Result};
use iroh::endpoint::SendStream;
use iroh::protocol::AcceptError;
use iroh::Endpoint;
use serde::{Deserialize, Serialize};
use crate::bucket_log::BucketLogProvider;
use crate::crypto::PublicKey;
use crate::peer::Peer;
use super::{messages::Message, ALPN};
pub trait BidirectionalHandler: Sized {
type Message: Serialize + for<'de> Deserialize<'de> + Debug;
type Reply: Serialize + for<'de> Deserialize<'de> + Debug;
fn wrap_request(request: Self::Message) -> Message;
fn handle_message<L: BucketLogProvider>(
peer: &Peer<L>,
sender_node_id: &PublicKey,
message: &Self::Message,
) -> impl std::future::Future<Output = Self::Reply> + Send;
fn handle_reply<L>(
peer: &Peer<L>,
recipient_node_id: &PublicKey,
reply: &Self::Reply,
) -> impl std::future::Future<Output = Result<()>> + Send
where
L: BucketLogProvider,
L::Error: std::error::Error + Send + Sync + 'static;
fn handle_message_side_effect<L>(
_peer: &Peer<L>,
_sender_node_id: &PublicKey,
_message: &Self::Message,
_reply: &Self::Reply,
) -> impl std::future::Future<Output = Result<()>> + Send
where
L: BucketLogProvider,
L::Error: std::error::Error + Send + Sync + 'static,
{
async { Ok(()) }
}
async fn send<L>(
peer: &Peer<L>,
recipient_node_id: &PublicKey,
request: Self::Message,
) -> Result<Self::Reply>
where
L: BucketLogProvider,
L::Error: std::error::Error + Send + Sync + 'static,
{
let endpoint = peer.endpoint();
let response = Self::_handle_send::<L>(endpoint, recipient_node_id, request).await?;
Self::handle_reply(peer, recipient_node_id, &response).await?;
Ok(response)
}
async fn _handle_message<L>(
peer: &Peer<L>,
sender_node_id: &PublicKey,
message: Self::Message,
mut send: SendStream,
) -> Result<(), AcceptError>
where
L: BucketLogProvider,
L::Error: std::error::Error + Send + Sync + 'static,
{
let reply = Self::handle_message(peer, sender_node_id, &message).await;
let reply_bytes = bincode::serialize(&reply).map_err(|e| {
tracing::error!("Failed to serialize reply: {}", e);
let err: Box<dyn std::error::Error + Send + Sync> =
anyhow!("failed to serialize reply: {}", e).into();
AcceptError::from(err)
})?;
send.write_all(&reply_bytes).await.map_err(|e| {
tracing::error!("failed to send reply: {}", e);
AcceptError::from(std::io::Error::other(e))
})?;
send.finish().map_err(|e| {
tracing::error!("failed to finish stream: {}", e);
AcceptError::from(std::io::Error::other(e))
})?;
send.stopped().await.map_err(|e| {
tracing::error!("failed to stop stream: {}", e);
AcceptError::from(std::io::Error::other(e))
})?;
if let Err(e) =
Self::handle_message_side_effect(peer, sender_node_id, &message, &reply).await
{
tracing::error!("Error in after_response_sent hook: {}", e);
}
Ok(())
}
async fn _handle_send<L>(
endpoint: &Endpoint,
recipient_node_id: &PublicKey,
message: Self::Message,
) -> Result<Self::Reply>
where
L: BucketLogProvider,
{
let conn = endpoint
.connect(**recipient_node_id, ALPN)
.await
.map_err(|e| {
tracing::error!("Failed to connect to peer {:?}: {}", recipient_node_id, e);
anyhow!("Failed to connect to peer: {}", e)
})?;
tracing::info!("Connected to peer {:?}", recipient_node_id);
let (mut send, mut recv) = conn.open_bi().await.map_err(|e| {
tracing::error!("Failed to open bidirectional stream: {}", e);
anyhow!("Failed to open bidirectional stream: {}", e)
})?;
tracing::info!(
"Opened bidirectional stream with peer {:?}",
recipient_node_id
);
let message = Self::wrap_request(message);
let request_bytes = bincode::serialize(&message)
.map_err(|e| anyhow!("Failed to serialize request: {}", e))?;
tracing::info!(
"BIDIRECTIONAL: Serialized request to {} bytes, first byte: {}",
request_bytes.len(),
request_bytes
.first()
.map(|b| b.to_string())
.unwrap_or_else(|| "none".to_string())
);
send.write_all(&request_bytes)
.await
.map_err(|e| anyhow!("Failed to write request: {}", e))?;
tracing::info!("BIDIRECTIONAL: Sent request");
send.finish()
.map_err(|e| anyhow!("Failed to finish sending request: {}", e))?;
tracing::info!("BIDIRECTIONAL: Finished sending request");
let response_bytes = recv
.read_to_end(1024 * 1024)
.await
.map_err(|e| anyhow!("Failed to read response: {}", e))?;
tracing::info!("BIDIRECTIONAL: Received response");
let response: Self::Reply = bincode::deserialize(&response_bytes)
.map_err(|e| anyhow!("Failed to deserialize response: {}", e))?;
tracing::info!("BIDIRECTIONAL: Deserialized response: {:?}", response);
Ok(response)
}
}