1pub async fn peer_to_tcp(
16 addr: &str,
17 send: iroh::endpoint::SendStream,
18 recv: iroh::endpoint::RecvStream,
19) -> eyre::Result<()> {
20 let stream = tokio::net::TcpStream::connect(addr).await?;
25 let (tcp_recv, tcp_send) = tokio::io::split(stream);
26 pipe_tcp_stream_over_iroh(tcp_recv, tcp_send, send, recv).await
27}
28
29pub async fn pipe_tcp_stream_over_iroh(
30 mut tcp_recv: impl tokio::io::AsyncRead + Unpin + Send + 'static,
31 tcp_send: impl tokio::io::AsyncWrite + Unpin + Send + 'static,
32 mut send: iroh::endpoint::SendStream,
33 mut recv: iroh::endpoint::RecvStream,
34) -> eyre::Result<()> {
35 tracing::trace!("pipe_tcp_stream_over_iroh");
36
37 let t = tokio::spawn(async move {
38 let mut t = tcp_send;
39 let r = tokio::io::copy(&mut recv, &mut t).await;
40 tracing::trace!("piping tcp stream, copy done");
41 r.map(|_| ())
42 });
43
44 tracing::trace!("copying tcp stream to iroh stream");
45
46 tokio::io::copy(&mut tcp_recv, &mut send).await?;
47
48 tracing::trace!("pipe_tcp_stream_over_iroh copy done");
49
50 send.finish()?;
51
52 tracing::trace!("closed send stream");
53 drop(send);
54
55 let r = Ok(t.await??);
56 tracing::trace!("pipe_tcp_stream_over_iroh done");
57 r
58}
59
60pub async fn tcp_to_peer(
61 header: crate::ProtocolHeader,
62 self_endpoint: iroh::Endpoint,
63 stream: tokio::net::TcpStream,
64 remote_node_id52: &str,
65 peer_connections: crate::PeerStreamSenders,
66 graceful: crate::Graceful,
67) -> eyre::Result<()> {
68 tracing::info!("tcp_to_peer: {remote_node_id52}");
69
70 let (send, recv) = crate::get_stream(
71 self_endpoint,
72 header,
73 remote_node_id52.to_string(),
74 peer_connections.clone(),
75 graceful,
76 )
77 .await?;
78
79 tracing::info!("got stream");
80
81 let (tcp_recv, tcp_send) = tokio::io::split(stream);
82 pipe_tcp_stream_over_iroh(tcp_recv, tcp_send, send, recv).await
83}