1use iroh::endpoint::Connection;
8use serde::Serialize;
9
10use crate::{
11 parse_node_addr,
12 stream::{pump_body_to_quic_send, pump_quic_recv_to_body, HandleStore, SessionEntry},
13 CoreError, FfiDuplexStream, IrohEndpoint, ALPN_DUPLEX,
14};
15
16fn is_connection_closed(err: &iroh::endpoint::ConnectionError) -> bool {
19 use iroh::endpoint::ConnectionError::*;
20 matches!(
21 err,
22 ApplicationClosed(_) | ConnectionClosed(_) | Reset | TimedOut | LocallyClosed
23 )
24}
25
26#[derive(Debug, Clone, Serialize)]
28#[serde(rename_all = "camelCase")]
29pub struct CloseInfo {
30 pub close_code: u64,
31 pub reason: String,
32}
33
34fn get_conn(endpoint: &IrohEndpoint, handle: u64) -> Result<Connection, CoreError> {
37 endpoint
38 .handles()
39 .lookup_session(handle)
40 .map(|s| s.conn.clone())
41 .ok_or_else(|| CoreError::invalid_handle(handle))
42}
43
44pub fn session_remote_id(
46 endpoint: &IrohEndpoint,
47 handle: u64,
48) -> Result<iroh::PublicKey, CoreError> {
49 get_conn(endpoint, handle).map(|c| c.remote_id())
50}
51
52pub async fn session_connect(
64 endpoint: &IrohEndpoint,
65 remote_node_id: &str,
66 direct_addrs: Option<&[std::net::SocketAddr]>,
67) -> Result<u64, CoreError> {
68 let parsed = parse_node_addr(remote_node_id)?;
69 let node_id = parsed.node_id;
70 let mut addr = iroh::EndpointAddr::new(node_id);
71 for a in &parsed.direct_addrs {
72 addr = addr.with_ip_addr(*a);
73 }
74 if let Some(addrs) = direct_addrs {
75 for a in addrs {
76 addr = addr.with_ip_addr(*a);
77 }
78 }
79
80 let conn = endpoint
81 .raw()
82 .connect(addr, ALPN_DUPLEX)
83 .await
84 .map_err(|e| CoreError::connection_failed(format!("connect session: {e}")))?;
85
86 let handle = endpoint.handles().insert_session(SessionEntry { conn })?;
87
88 Ok(handle)
89}
90
91pub async fn session_create_bidi_stream(
96 endpoint: &IrohEndpoint,
97 session_handle: u64,
98) -> Result<FfiDuplexStream, CoreError> {
99 let conn = get_conn(endpoint, session_handle)?;
100
101 let (send, recv) = conn
102 .open_bi()
103 .await
104 .map_err(|e| CoreError::connection_failed(format!("open_bi: {e}")))?;
105
106 wrap_bidi_stream(endpoint.handles(), send, recv)
107}
108
109pub async fn session_next_bidi_stream(
114 endpoint: &IrohEndpoint,
115 session_handle: u64,
116) -> Result<Option<FfiDuplexStream>, CoreError> {
117 let conn = get_conn(endpoint, session_handle)?;
118
119 match conn.accept_bi().await {
120 Ok((send, recv)) => Ok(Some(wrap_bidi_stream(endpoint.handles(), send, recv)?)),
121 Err(e) if is_connection_closed(&e) => Ok(None),
122 Err(e) => Err(CoreError::connection_failed(format!("accept_bi: {e}"))),
123 }
124}
125
126pub async fn session_accept(endpoint: &IrohEndpoint) -> Result<Option<u64>, CoreError> {
131 let incoming = match endpoint.raw().accept().await {
132 Some(inc) => inc,
133 None => return Ok(None),
134 };
135
136 let conn = incoming
137 .await
138 .map_err(|e| CoreError::connection_failed(format!("accept session: {e}")))?;
139
140 let handle = endpoint.handles().insert_session(SessionEntry { conn })?;
141
142 Ok(Some(handle))
143}
144
145pub fn session_close(
150 endpoint: &IrohEndpoint,
151 session_handle: u64,
152 close_code: u64,
153 reason: &str,
154) -> Result<(), CoreError> {
155 let entry = endpoint
156 .handles()
157 .remove_session(session_handle)
158 .ok_or_else(|| CoreError::invalid_handle(session_handle))?;
159 let code = iroh::endpoint::VarInt::from_u64(close_code).map_err(|_| {
160 CoreError::invalid_input(format!(
161 "close_code {close_code} exceeds QUIC VarInt max (2^62 - 1)"
162 ))
163 })?;
164 entry.conn.close(code, reason.as_bytes());
165 Ok(())
166}
167
168pub async fn session_ready(endpoint: &IrohEndpoint, session_handle: u64) -> Result<(), CoreError> {
172 let _conn = get_conn(endpoint, session_handle)?;
174 Ok(())
177}
178
179pub async fn session_closed(
184 endpoint: &IrohEndpoint,
185 session_handle: u64,
186) -> Result<CloseInfo, CoreError> {
187 let conn = get_conn(endpoint, session_handle)?;
188 let err = conn.closed().await;
189 endpoint.handles().remove_session(session_handle);
191 let (close_code, reason) = parse_connection_error(&err);
192 Ok(CloseInfo { close_code, reason })
193}
194
195pub async fn session_create_uni_stream(
201 endpoint: &IrohEndpoint,
202 session_handle: u64,
203) -> Result<u64, CoreError> {
204 let conn = get_conn(endpoint, session_handle)?;
205 let send = conn
206 .open_uni()
207 .await
208 .map_err(|e| CoreError::connection_failed(format!("open_uni: {e}")))?;
209
210 let handles = endpoint.handles();
211 let (send_writer, send_reader) = handles.make_body_channel();
212 let write_handle = handles.insert_writer(send_writer)?;
213 tokio::spawn(pump_body_to_quic_send(send_reader, send));
214
215 Ok(write_handle)
216}
217
218pub async fn session_next_uni_stream(
222 endpoint: &IrohEndpoint,
223 session_handle: u64,
224) -> Result<Option<u64>, CoreError> {
225 let conn = get_conn(endpoint, session_handle)?;
226
227 match conn.accept_uni().await {
228 Ok(recv) => {
229 let handles = endpoint.handles();
230 let (recv_writer, recv_reader) = handles.make_body_channel();
231 let read_handle = handles.insert_reader(recv_reader)?;
232 tokio::spawn(pump_quic_recv_to_body(recv, recv_writer));
233 Ok(Some(read_handle))
234 }
235 Err(e) if is_connection_closed(&e) => Ok(None),
236 Err(e) => Err(CoreError::connection_failed(format!("accept_uni: {e}"))),
237 }
238}
239
240pub fn session_send_datagram(
246 endpoint: &IrohEndpoint,
247 session_handle: u64,
248 data: &[u8],
249) -> Result<(), CoreError> {
250 let conn = get_conn(endpoint, session_handle)?;
251 conn.send_datagram(bytes::Bytes::copy_from_slice(data))
252 .map_err(|e| match e {
253 iroh::endpoint::SendDatagramError::TooLarge => CoreError::body_too_large(
254 "datagram exceeds path MTU; check session_max_datagram_size()",
255 ),
256 _ => CoreError::internal(format!("send_datagram: {e}")),
257 })
258}
259
260pub async fn session_recv_datagram(
264 endpoint: &IrohEndpoint,
265 session_handle: u64,
266) -> Result<Option<Vec<u8>>, CoreError> {
267 let conn = get_conn(endpoint, session_handle)?;
268 match conn.read_datagram().await {
269 Ok(data) => Ok(Some(data.to_vec())),
270 Err(e) if is_connection_closed(&e) => Ok(None),
271 Err(e) => Err(CoreError::connection_failed(format!("recv_datagram: {e}"))),
272 }
273}
274
275pub fn session_max_datagram_size(
279 endpoint: &IrohEndpoint,
280 session_handle: u64,
281) -> Result<Option<usize>, CoreError> {
282 let conn = get_conn(endpoint, session_handle)?;
283 Ok(conn.max_datagram_size())
284}
285
286fn wrap_bidi_stream(
290 handles: &HandleStore,
291 send: iroh::endpoint::SendStream,
292 recv: iroh::endpoint::RecvStream,
293) -> Result<FfiDuplexStream, CoreError> {
294 let mut guard = handles.insert_guard();
295
296 let (recv_writer, recv_reader) = handles.make_body_channel();
298 let read_handle = guard.insert_reader(recv_reader)?;
299 tokio::spawn(pump_quic_recv_to_body(recv, recv_writer));
300
301 let (send_writer, send_reader) = handles.make_body_channel();
303 let write_handle = guard.insert_writer(send_writer)?;
304 tokio::spawn(pump_body_to_quic_send(send_reader, send));
305
306 guard.commit();
307 Ok(FfiDuplexStream {
308 read_handle,
309 write_handle,
310 })
311}
312
313fn parse_connection_error(err: &iroh::endpoint::ConnectionError) -> (u64, String) {
315 match err {
316 iroh::endpoint::ConnectionError::ApplicationClosed(info) => {
317 let code: u64 = info.error_code.into();
318 let reason = String::from_utf8_lossy(&info.reason).into_owned();
319 (code, reason)
320 }
321 other => (0, other.to_string()),
322 }
323}