/// Shared registry of per-peer request queues.
///
/// An `Arc`-shared, async-mutex-guarded map keyed by the
/// `(local id52, remote id52)` pair; each value is the sending half of the
/// channel on which stream requests for that pair are submitted.
pub type PeerStreamSenders = std::sync::Arc<
tokio::sync::Mutex<std::collections::HashMap<(SelfID52, RemoteID52), StreamRequestSender>>,
>;
/// A ready-to-use stream: the send half plus a frame reader wrapping the
/// receive half.
type Stream = (iroh::endpoint::SendStream, ftnet_utils::FrameReader);
/// Outcome of a single stream request.
type StreamResult = eyre::Result<Stream>;
/// One-shot channel on which the outcome of a stream request is delivered
/// back to the requester.
type ReplyChannel = tokio::sync::oneshot::Sender<StreamResult>;
/// id52 string identifying the remote peer.
type RemoteID52 = String;
/// id52 string identifying the local endpoint (derived from its node id).
type SelfID52 = String;
/// A request for a new stream: the protocol to announce on it, and where to
/// send the result.
type StreamRequest = (ftnet_utils::Protocol, ReplyChannel);
/// Sending half of a peer's stream-request queue.
type StreamRequestSender = tokio::sync::mpsc::Sender<StreamRequest>;
/// Receiving half of a peer's stream-request queue.
type StreamRequestReceiver = tokio::sync::mpsc::Receiver<StreamRequest>;
/// Requests a new stream to `remote_node_id52` for the given `protocol`.
///
/// The request is queued on the request channel registered for this
/// (local endpoint, remote peer) pair in `peer_stream_senders`; the result
/// comes back over a one-shot reply channel.
///
/// # Errors
///
/// Returns an error if the request cannot be queued, if the reply channel is
/// dropped before an answer is sent, or if the answer itself is an error.
pub async fn get_stream(
    self_endpoint: iroh::Endpoint,
    protocol: ftnet_utils::Protocol,
    remote_node_id52: RemoteID52,
    peer_stream_senders: PeerStreamSenders,
) -> eyre::Result<(iroh::endpoint::SendStream, ftnet_utils::FrameReader)> {
    // The reply travels on a dedicated one-shot channel owned by this call.
    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();

    let requests =
        get_stream_request_sender(self_endpoint, remote_node_id52, peer_stream_senders).await;
    requests.send((protocol, reply_tx)).await?;

    // The outer `?` covers a dropped reply channel; the inner result is the
    // answer to the request and is returned as-is.
    reply_rx.await?
}
/// Returns the request sender for the (local endpoint, remote peer) pair.
///
/// If the pair is already present in `peer_stream_senders`, a clone of its
/// sender is returned. Otherwise a new channel of capacity 1 is created, its
/// sender is registered, and a `connection_manager` task is spawned to serve
/// the receiving end.
async fn get_stream_request_sender(
    self_endpoint: iroh::Endpoint,
    remote_node_id52: RemoteID52,
    peer_stream_senders: PeerStreamSenders,
) -> StreamRequestSender {
    let key = (
        ftnet_utils::public_key_to_id52(&self_endpoint.node_id()),
        remote_node_id52,
    );

    // Lookup and insertion happen under a single lock acquisition, so each
    // key is registered at most once. The guard is released at the end of
    // this block, before the task is spawned.
    let (sender, receiver) = {
        let mut registry = peer_stream_senders.lock().await;
        if let Some(existing) = registry.get(&key) {
            return existing.clone();
        }
        let (sender, receiver) = tokio::sync::mpsc::channel(1);
        registry.insert(key.clone(), sender.clone());
        (sender, receiver)
    };

    let (self_id52, remote_node_id52) = key;
    tokio::spawn(connection_manager(
        receiver,
        self_id52,
        self_endpoint,
        remote_node_id52,
        peer_stream_senders,
    ));

    sender
}
/// Task body that serves stream requests for one (local, remote) pair.
///
/// Delegates the work to `connection_manager_`. If that returns an error, the
/// request queue is closed, every request still queued is answered with an
/// error, and the pair's entry is removed from `peer_stream_senders`.
/// If it returns `Ok(())`, the task ends without touching the registry.
async fn connection_manager(
    mut receiver: StreamRequestReceiver,
    self_id52: SelfID52,
    self_endpoint: iroh::Endpoint,
    remote_node_id52: RemoteID52,
    peer_stream_senders: PeerStreamSenders,
) {
    let outcome =
        connection_manager_(&mut receiver, self_endpoint, remote_node_id52.clone()).await;
    let Err(e) = outcome else {
        return;
    };
    tracing::error!("connection manager worker error: {e:?}");

    // Refuse new requests, then answer everything already queued so that no
    // requester is left waiting on its reply channel.
    receiver.close();
    while let Some((_protocol, reply_channel)) = receiver.recv().await {
        let delivered =
            reply_channel.send(Err(eyre::anyhow!("failed to create connection: {e:?}")));
        if delivered.is_err() {
            tracing::error!("failed to send error reply: {e:?}");
        }
    }

    // Deregister last; the lock guard is a temporary dropped at the end of
    // this statement.
    peer_stream_senders
        .lock()
        .await
        .remove(&(self_id52, remote_node_id52));
}
/// Connects to the remote peer and serves stream requests over that
/// connection, one at a time.
///
/// Returns `Ok(())` once `receiver` yields no more requests.
///
/// # Errors
///
/// Returns an error if `remote_node_id52` cannot be converted to a public
/// key, if the connection cannot be established, or as soon as
/// `handle_request` fails for any request.
async fn connection_manager_(
    receiver: &mut StreamRequestReceiver,
    self_endpoint: iroh::Endpoint,
    remote_node_id52: RemoteID52,
) -> eyre::Result<()> {
    let remote_key = ftnet_utils::id52_to_public_key(&remote_node_id52)?;
    let conn = self_endpoint
        .connect(remote_key, ftnet_utils::APNS_IDENTITY)
        .await
        .map_err(|e| {
            tracing::error!("failed to create connection: {e:?}");
            eyre::anyhow!("failed to create connection: {e:?}")
        })?;

    while let Some((protocol, reply_channel)) = receiver.recv().await {
        match handle_request(&conn, protocol, reply_channel).await {
            Ok(()) => {}
            Err(e) => {
                // A failed request ends the whole manager; the caller
                // handles cleanup.
                tracing::error!("failed to handle request: {e:?}");
                return Err(e);
            }
        }
    }

    Ok(())
}
async fn handle_request(
conn: &iroh::endpoint::Connection,
protocol: ftnet_utils::Protocol,
reply_channel: ReplyChannel,
) -> eyre::Result<()> {
use tokio_stream::StreamExt;
let (mut send, recv) = match conn.open_bi().await {
Ok(v) => v,
Err(e) => {
tracing::error!("failed to open_bi: {e:?}");
return Err(eyre::anyhow!("failed to open_bi: {e:?}"));
}
};
send.write_all(&serde_json::to_vec(&protocol)?).await?;
send.write(b"\n").await?;
let mut recv = ftnet_utils::frame_reader(recv);
let msg = match recv.next().await {
Some(v) => v?,
None => {
tracing::error!("failed to read from incoming connection");
return Err(eyre::anyhow!("failed to read from incoming connection"));
}
};
if msg != ftnet_utils::ACK {
tracing::error!("failed to read ack: {msg:?}");
return Err(eyre::anyhow!("failed to read ack: {msg:?}"));
}
reply_channel.send(Ok((send, recv))).unwrap_or_else(|_| {
tracing::error!("failed to send reply");
});
Ok(())
}