iroh_http_core/ffi/
session.rs1#![allow(clippy::disallowed_types)]
13use iroh::endpoint::Connection;
20use serde::Serialize;
21
22use crate::{
23 ffi::handles::{HandleStore, SessionEntry},
24 ffi::pumps::{pump_body_to_quic_send, pump_quic_recv_to_body},
25 parse_node_addr, CoreError, FfiDuplexStream, IrohEndpoint, ALPN_DUPLEX,
26};
27
28fn is_connection_closed(err: &iroh::endpoint::ConnectionError) -> bool {
31 use iroh::endpoint::ConnectionError::*;
32 matches!(
33 err,
34 ApplicationClosed(_) | ConnectionClosed(_) | Reset | TimedOut | LocallyClosed
35 )
36}
37
38#[derive(Debug, Clone, Serialize)]
40#[serde(rename_all = "camelCase")]
41pub struct CloseInfo {
42 pub close_code: u64,
43 pub reason: String,
44}
45
46#[derive(Clone)]
52pub struct Session {
53 endpoint: IrohEndpoint,
54 handle: u64,
55}
56
57impl Session {
58 pub fn from_handle(endpoint: IrohEndpoint, handle: u64) -> Self {
64 Self { endpoint, handle }
65 }
66
67 pub fn handle(&self) -> u64 {
69 self.handle
70 }
71
72 pub async fn connect(
80 endpoint: IrohEndpoint,
81 remote_node_id: &str,
82 direct_addrs: Option<&[std::net::SocketAddr]>,
83 ) -> Result<Self, CoreError> {
84 let parsed = parse_node_addr(remote_node_id)?;
85 let node_id = parsed.node_id;
86 let mut addr = iroh::EndpointAddr::new(node_id);
87 for a in &parsed.direct_addrs {
88 addr = addr.with_ip_addr(*a);
89 }
90 if let Some(addrs) = direct_addrs {
91 for a in addrs {
92 addr = addr.with_ip_addr(*a);
93 }
94 }
95
96 let conn = endpoint
97 .raw()
98 .connect(addr, ALPN_DUPLEX)
99 .await
100 .map_err(|e| CoreError::connection_failed(format!("connect session: {e}")))?;
101
102 let handle = endpoint.handles().insert_session(SessionEntry { conn })?;
103
104 Ok(Self { endpoint, handle })
105 }
106
107 pub async fn accept(endpoint: IrohEndpoint) -> Result<Option<Self>, CoreError> {
112 let incoming = match endpoint.raw().accept().await {
113 Some(inc) => inc,
114 None => return Ok(None),
115 };
116
117 let conn = incoming
118 .await
119 .map_err(|e| CoreError::connection_failed(format!("accept session: {e}")))?;
120
121 let handle = endpoint.handles().insert_session(SessionEntry { conn })?;
122
123 Ok(Some(Self { endpoint, handle }))
124 }
125
126 fn conn(&self) -> Result<Connection, CoreError> {
127 self.endpoint
128 .handles()
129 .lookup_session(self.handle)
130 .map(|s| s.conn.clone())
131 .ok_or_else(|| CoreError::invalid_handle(self.handle))
132 }
133
134 pub fn remote_id(&self) -> Result<iroh::PublicKey, CoreError> {
136 self.conn().map(|c| c.remote_id())
137 }
138
139 pub async fn create_bidi_stream(&self) -> Result<FfiDuplexStream, CoreError> {
144 let conn = self.conn()?;
145
146 let (send, recv) = conn
147 .open_bi()
148 .await
149 .map_err(|e| CoreError::connection_failed(format!("open_bi: {e}")))?;
150
151 wrap_bidi_stream(self.endpoint.handles(), send, recv)
152 }
153
154 pub async fn next_bidi_stream(&self) -> Result<Option<FfiDuplexStream>, CoreError> {
159 let conn = self.conn()?;
160
161 match conn.accept_bi().await {
162 Ok((send, recv)) => Ok(Some(wrap_bidi_stream(self.endpoint.handles(), send, recv)?)),
163 Err(e) if is_connection_closed(&e) => Ok(None),
164 Err(e) => Err(CoreError::connection_failed(format!("accept_bi: {e}"))),
165 }
166 }
167
168 pub fn close(&self, close_code: u64, reason: &str) -> Result<(), CoreError> {
173 let entry = self
174 .endpoint
175 .handles()
176 .remove_session(self.handle)
177 .ok_or_else(|| CoreError::invalid_handle(self.handle))?;
178 let code = iroh::endpoint::VarInt::from_u64(close_code).map_err(|_| {
179 CoreError::invalid_input(format!(
180 "close_code {close_code} exceeds QUIC VarInt max (2^62 - 1)"
181 ))
182 })?;
183 entry.conn.close(code, reason.as_bytes());
184 Ok(())
185 }
186
187 pub async fn ready(&self) -> Result<(), CoreError> {
191 let _conn = self.conn()?;
193 Ok(())
196 }
197
198 pub async fn closed(&self) -> Result<CloseInfo, CoreError> {
203 let conn = self.conn()?;
204 let err = conn.closed().await;
205 self.endpoint.handles().remove_session(self.handle);
207 let (close_code, reason) = parse_connection_error(&err);
208 Ok(CloseInfo { close_code, reason })
209 }
210
211 pub async fn create_uni_stream(&self) -> Result<u64, CoreError> {
215 let conn = self.conn()?;
216 let send = conn
217 .open_uni()
218 .await
219 .map_err(|e| CoreError::connection_failed(format!("open_uni: {e}")))?;
220
221 let handles = self.endpoint.handles();
222 let (send_writer, send_reader) = handles.make_body_channel();
223 let write_handle = handles.insert_writer(send_writer)?;
224 tokio::spawn(pump_body_to_quic_send(send_reader, send));
225
226 Ok(write_handle)
227 }
228
229 pub async fn next_uni_stream(&self) -> Result<Option<u64>, CoreError> {
233 let conn = self.conn()?;
234
235 match conn.accept_uni().await {
236 Ok(recv) => {
237 let handles = self.endpoint.handles();
238 let (recv_writer, recv_reader) = handles.make_body_channel();
239 let read_handle = handles.insert_reader(recv_reader)?;
240 tokio::spawn(pump_quic_recv_to_body(recv, recv_writer));
241 Ok(Some(read_handle))
242 }
243 Err(e) if is_connection_closed(&e) => Ok(None),
244 Err(e) => Err(CoreError::connection_failed(format!("accept_uni: {e}"))),
245 }
246 }
247
248 pub fn send_datagram(&self, data: &[u8]) -> Result<(), CoreError> {
252 let conn = self.conn()?;
253 conn.send_datagram(bytes::Bytes::copy_from_slice(data))
254 .map_err(|e| match e {
255 iroh::endpoint::SendDatagramError::TooLarge => CoreError::body_too_large(
256 "datagram exceeds path MTU; check Session::max_datagram_size()",
257 ),
258 _ => CoreError::internal(format!("send_datagram: {e}")),
259 })
260 }
261
262 pub async fn recv_datagram(&self) -> Result<Option<Vec<u8>>, CoreError> {
266 let conn = self.conn()?;
267 match conn.read_datagram().await {
268 Ok(data) => Ok(Some(data.to_vec())),
269 Err(e) if is_connection_closed(&e) => Ok(None),
270 Err(e) => Err(CoreError::connection_failed(format!("recv_datagram: {e}"))),
271 }
272 }
273
274 pub fn max_datagram_size(&self) -> Result<Option<usize>, CoreError> {
278 let conn = self.conn()?;
279 Ok(conn.max_datagram_size())
280 }
281}
282
283fn wrap_bidi_stream(
287 handles: &HandleStore,
288 send: iroh::endpoint::SendStream,
289 recv: iroh::endpoint::RecvStream,
290) -> Result<FfiDuplexStream, CoreError> {
291 let mut guard = handles.insert_guard();
292
293 let (recv_writer, recv_reader) = handles.make_body_channel();
295 let read_handle = guard.insert_reader(recv_reader)?;
296 tokio::spawn(pump_quic_recv_to_body(recv, recv_writer));
297
298 let (send_writer, send_reader) = handles.make_body_channel();
300 let write_handle = guard.insert_writer(send_writer)?;
301 tokio::spawn(pump_body_to_quic_send(send_reader, send));
302
303 guard.commit();
304 Ok(FfiDuplexStream {
305 read_handle,
306 write_handle,
307 })
308}
309
310fn parse_connection_error(err: &iroh::endpoint::ConnectionError) -> (u64, String) {
312 match err {
313 iroh::endpoint::ConnectionError::ApplicationClosed(info) => {
314 let code: u64 = info.error_code.into();
315 let reason = String::from_utf8_lossy(&info.reason).into_owned();
316 (code, reason)
317 }
318 other => (0, other.to_string()),
319 }
320}