Skip to main content

iroh_http_core/ffi/
session.rs

1//! `Session` — a QUIC connection to a single remote peer.
2//!
//! [`Session::connect`] establishes a QUIC connection and [`Session::accept`]
//! accepts an incoming one; both return a session value backed by an opaque handle.
5//! Bidirectional streams, unidirectional streams, and datagrams are all
6//! accessible through methods on [`Session`].
7//!
8//! The shape mirrors W3C WebTransport: every method has a direct counterpart
9//! on the `WebTransport` interface.
10//!
11// Legitimate FFI wiring — uses the disallowed types intentionally.
12#![allow(clippy::disallowed_types)]
13//! Lives under `mod ffi` (Slice E, #187) because `Session` is `u64`-handle-
14//! shaped: it wraps a slotmap entry in [`crate::ffi::handles::HandleStore`]
15//! and returns [`crate::FfiDuplexStream`]. A pure-Rust QUIC session API would
16//! sit under `mod http` and yield typed stream values directly; that is not
17//! today's API. Until it is, this module belongs on the FFI side.
18
19use iroh::endpoint::Connection;
20use serde::Serialize;
21
22use crate::{
23    ffi::handles::{HandleStore, SessionEntry},
24    ffi::pumps::{pump_body_to_quic_send, pump_quic_recv_to_body},
25    parse_node_addr, CoreError, FfiDuplexStream, IrohEndpoint, ALPN_DUPLEX,
26};
27
28/// Returns `true` if the connection error means "connection ended" rather
29/// than a protocol-level bug.  Used to return `None` instead of `Err`.
30fn is_connection_closed(err: &iroh::endpoint::ConnectionError) -> bool {
31    use iroh::endpoint::ConnectionError::*;
32    matches!(
33        err,
34        ApplicationClosed(_) | ConnectionClosed(_) | Reset | TimedOut | LocallyClosed
35    )
36}
37
/// Close information returned when a session ends.
///
/// Produced by [`Session::closed`]. Serialized with camelCase keys
/// (`closeCode`, `reason`).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CloseInfo {
    /// Application-level close code carried by the peer's close frame, or `0`
    /// when the connection ended for any other reason (timeout, reset, …).
    pub close_code: u64,
    /// Reason bytes sent by the peer (lossily decoded as UTF-8) for an
    /// application close; otherwise the textual form of the connection error.
    pub reason: String,
}
45
/// A QUIC session to a single remote peer.
///
/// Cheap to clone — this is just an [`IrohEndpoint`] ref-clone plus a `u64`
/// handle into the endpoint's session registry. Sessions are not pooled;
/// closing one session has no effect on other sessions to the same peer.
///
/// Clones share the same registry entry: once one clone removes the entry
/// (via [`Session::close`] or [`Session::closed`]), lookups through the other
/// clones fail with an invalid-handle error.
#[derive(Clone)]
pub struct Session {
    // Endpoint whose handle store owns the underlying connection entry.
    endpoint: IrohEndpoint,
    // Opaque key of this session's entry in the endpoint's handle store.
    handle: u64,
}
56
57impl Session {
58    /// Reconstruct a `Session` from a handle previously obtained via
59    /// [`Session::connect`] or [`Session::accept`].
60    ///
61    /// FFI adapters use this to rehydrate a session from a `u64` handle that
62    /// crossed a language boundary.
63    pub fn from_handle(endpoint: IrohEndpoint, handle: u64) -> Self {
64        Self { endpoint, handle }
65    }
66
67    /// The opaque handle identifying this session inside the endpoint registry.
68    pub fn handle(&self) -> u64 {
69        self.handle
70    }
71
72    /// Establish a session (QUIC connection) to a remote peer.
73    ///
74    /// Each call creates a **dedicated** QUIC connection — sessions are not
75    /// pooled. Closing one session handle cannot affect other sessions to the
76    /// same peer. (Fetch operations continue to use the shared pool for
77    /// efficiency; sessions opt out because [`Session::close`] closes the
78    /// underlying connection.)
79    pub async fn connect(
80        endpoint: IrohEndpoint,
81        remote_node_id: &str,
82        direct_addrs: Option<&[std::net::SocketAddr]>,
83    ) -> Result<Self, CoreError> {
84        let parsed = parse_node_addr(remote_node_id)?;
85        let node_id = parsed.node_id;
86        let mut addr = iroh::EndpointAddr::new(node_id);
87        for a in &parsed.direct_addrs {
88            addr = addr.with_ip_addr(*a);
89        }
90        if let Some(addrs) = direct_addrs {
91            for a in addrs {
92                addr = addr.with_ip_addr(*a);
93            }
94        }
95
96        let conn = endpoint
97            .raw()
98            .connect(addr, ALPN_DUPLEX)
99            .await
100            .map_err(|e| CoreError::connection_failed(format!("connect session: {e}")))?;
101
102        let handle = endpoint.handles().insert_session(SessionEntry { conn })?;
103
104        Ok(Self { endpoint, handle })
105    }
106
107    /// Accept an incoming session (QUIC connection) from a remote peer.
108    ///
109    /// Blocks until a peer connects.  Returns the new session, or `None` if
110    /// the endpoint is shutting down.
111    pub async fn accept(endpoint: IrohEndpoint) -> Result<Option<Self>, CoreError> {
112        let incoming = match endpoint.raw().accept().await {
113            Some(inc) => inc,
114            None => return Ok(None),
115        };
116
117        let conn = incoming
118            .await
119            .map_err(|e| CoreError::connection_failed(format!("accept session: {e}")))?;
120
121        let handle = endpoint.handles().insert_session(SessionEntry { conn })?;
122
123        Ok(Some(Self { endpoint, handle }))
124    }
125
126    fn conn(&self) -> Result<Connection, CoreError> {
127        self.endpoint
128            .handles()
129            .lookup_session(self.handle)
130            .map(|s| s.conn.clone())
131            .ok_or_else(|| CoreError::invalid_handle(self.handle))
132    }
133
134    /// Return the remote peer's public key.
135    pub fn remote_id(&self) -> Result<iroh::PublicKey, CoreError> {
136        self.conn().map(|c| c.remote_id())
137    }
138
139    /// Open a new bidirectional stream on this session.
140    ///
141    /// Returns [`FfiDuplexStream`] with `read_handle` / `write_handle` backed by
142    /// body channels — the same interface used by fetch and raw_connect.
143    pub async fn create_bidi_stream(&self) -> Result<FfiDuplexStream, CoreError> {
144        let conn = self.conn()?;
145
146        let (send, recv) = conn
147            .open_bi()
148            .await
149            .map_err(|e| CoreError::connection_failed(format!("open_bi: {e}")))?;
150
151        wrap_bidi_stream(self.endpoint.handles(), send, recv)
152    }
153
154    /// Accept the next incoming bidirectional stream from the remote side.
155    ///
156    /// Blocks until the remote opens a stream, or returns `None` when the
157    /// connection is closed.
158    pub async fn next_bidi_stream(&self) -> Result<Option<FfiDuplexStream>, CoreError> {
159        let conn = self.conn()?;
160
161        match conn.accept_bi().await {
162            Ok((send, recv)) => Ok(Some(wrap_bidi_stream(self.endpoint.handles(), send, recv)?)),
163            Err(e) if is_connection_closed(&e) => Ok(None),
164            Err(e) => Err(CoreError::connection_failed(format!("accept_bi: {e}"))),
165        }
166    }
167
168    /// Close this session and remove it from the registry.
169    ///
170    /// `close_code` is an application-level error code (maps to QUIC VarInt).
171    /// `reason` is a human-readable string sent to the peer.
172    pub fn close(&self, close_code: u64, reason: &str) -> Result<(), CoreError> {
173        let entry = self
174            .endpoint
175            .handles()
176            .remove_session(self.handle)
177            .ok_or_else(|| CoreError::invalid_handle(self.handle))?;
178        let code = iroh::endpoint::VarInt::from_u64(close_code).map_err(|_| {
179            CoreError::invalid_input(format!(
180                "close_code {close_code} exceeds QUIC VarInt max (2^62 - 1)"
181            ))
182        })?;
183        entry.conn.close(code, reason.as_bytes());
184        Ok(())
185    }
186
187    /// Wait for the QUIC handshake to complete on this session.
188    ///
189    /// Resolves immediately if the handshake has already completed.
190    pub async fn ready(&self) -> Result<(), CoreError> {
191        // Validate handle exists — keeps error behavior consistent with other session APIs.
192        let _conn = self.conn()?;
193        // iroh connections are fully established by the time Session::connect returns,
194        // so ready always resolves immediately. Kept for WebTransport API compatibility.
195        Ok(())
196    }
197
198    /// Wait for the session to close and return the close information.
199    ///
200    /// Blocks until the connection is closed by either side.  Removes the
201    /// session from the registry so resources are freed.
202    pub async fn closed(&self) -> Result<CloseInfo, CoreError> {
203        let conn = self.conn()?;
204        let err = conn.closed().await;
205        // Connection is dead — clean up the registry entry.
206        self.endpoint.handles().remove_session(self.handle);
207        let (close_code, reason) = parse_connection_error(&err);
208        Ok(CloseInfo { close_code, reason })
209    }
210
211    /// Open a new unidirectional (send-only) stream on this session.
212    ///
213    /// Returns a write handle backed by a body channel.
214    pub async fn create_uni_stream(&self) -> Result<u64, CoreError> {
215        let conn = self.conn()?;
216        let send = conn
217            .open_uni()
218            .await
219            .map_err(|e| CoreError::connection_failed(format!("open_uni: {e}")))?;
220
221        let handles = self.endpoint.handles();
222        let (send_writer, send_reader) = handles.make_body_channel();
223        let write_handle = handles.insert_writer(send_writer)?;
224        tokio::spawn(pump_body_to_quic_send(send_reader, send));
225
226        Ok(write_handle)
227    }
228
229    /// Accept the next incoming unidirectional (receive-only) stream.
230    ///
231    /// Returns a read handle, or `None` when the connection is closed.
232    pub async fn next_uni_stream(&self) -> Result<Option<u64>, CoreError> {
233        let conn = self.conn()?;
234
235        match conn.accept_uni().await {
236            Ok(recv) => {
237                let handles = self.endpoint.handles();
238                let (recv_writer, recv_reader) = handles.make_body_channel();
239                let read_handle = handles.insert_reader(recv_reader)?;
240                tokio::spawn(pump_quic_recv_to_body(recv, recv_writer));
241                Ok(Some(read_handle))
242            }
243            Err(e) if is_connection_closed(&e) => Ok(None),
244            Err(e) => Err(CoreError::connection_failed(format!("accept_uni: {e}"))),
245        }
246    }
247
248    /// Send a datagram on this session.
249    ///
250    /// Fails if `data.len()` exceeds [`Session::max_datagram_size`].
251    pub fn send_datagram(&self, data: &[u8]) -> Result<(), CoreError> {
252        let conn = self.conn()?;
253        conn.send_datagram(bytes::Bytes::copy_from_slice(data))
254            .map_err(|e| match e {
255                iroh::endpoint::SendDatagramError::TooLarge => CoreError::body_too_large(
256                    "datagram exceeds path MTU; check Session::max_datagram_size()",
257                ),
258                _ => CoreError::internal(format!("send_datagram: {e}")),
259            })
260    }
261
262    /// Receive the next datagram from this session.
263    ///
264    /// Blocks until a datagram arrives, or returns `None` when the connection closes.
265    pub async fn recv_datagram(&self) -> Result<Option<Vec<u8>>, CoreError> {
266        let conn = self.conn()?;
267        match conn.read_datagram().await {
268            Ok(data) => Ok(Some(data.to_vec())),
269            Err(e) if is_connection_closed(&e) => Ok(None),
270            Err(e) => Err(CoreError::connection_failed(format!("recv_datagram: {e}"))),
271        }
272    }
273
274    /// Return the current maximum datagram payload size for this session.
275    ///
276    /// Returns `None` if datagrams are not supported on the current path.
277    pub fn max_datagram_size(&self) -> Result<Option<usize>, CoreError> {
278        let conn = self.conn()?;
279        Ok(conn.max_datagram_size())
280    }
281}
282
283// ── Helpers ───────────────────────────────────────────────────────────────────
284
285/// Wrap raw QUIC send/recv streams into body-channel–backed `FfiDuplexStream`.
286fn wrap_bidi_stream(
287    handles: &HandleStore,
288    send: iroh::endpoint::SendStream,
289    recv: iroh::endpoint::RecvStream,
290) -> Result<FfiDuplexStream, CoreError> {
291    let mut guard = handles.insert_guard();
292
293    // Receive side: pump from QUIC recv → BodyWriter → BodyReader (JS reads via nextChunk).
294    let (recv_writer, recv_reader) = handles.make_body_channel();
295    let read_handle = guard.insert_reader(recv_reader)?;
296    tokio::spawn(pump_quic_recv_to_body(recv, recv_writer));
297
298    // Send side: pump from BodyReader (JS writes via sendChunk) → QUIC send.
299    let (send_writer, send_reader) = handles.make_body_channel();
300    let write_handle = guard.insert_writer(send_writer)?;
301    tokio::spawn(pump_body_to_quic_send(send_reader, send));
302
303    guard.commit();
304    Ok(FfiDuplexStream {
305        read_handle,
306        write_handle,
307    })
308}
309
310/// Extract close code and reason from a QUIC `ConnectionError`.
311fn parse_connection_error(err: &iroh::endpoint::ConnectionError) -> (u64, String) {
312    match err {
313        iroh::endpoint::ConnectionError::ApplicationClosed(info) => {
314            let code: u64 = info.error_code.into();
315            let reason = String::from_utf8_lossy(&info.reason).into_owned();
316            (code, reason)
317        }
318        other => (0, other.to_string()),
319    }
320}