Skip to main content

iroh_http_core/
session.rs

1//! Session — a QUIC connection to a single remote peer.
2//!
//! `session_connect` establishes a dedicated connection (sessions are never
//! pooled) and returns an opaque handle.  Bidirectional streams,
//! unidirectional streams, and datagrams are all accessible through the
//! session handle.
6
7use iroh::endpoint::Connection;
8use serde::Serialize;
9
10use crate::{
11    parse_node_addr,
12    stream::{pump_body_to_quic_send, pump_quic_recv_to_body, HandleStore, SessionEntry},
13    CoreError, FfiDuplexStream, IrohEndpoint, ALPN_DUPLEX,
14};
15
16/// Returns `true` if the connection error means "connection ended" rather
17/// than a protocol-level bug.  Used to return `None` instead of `Err`.
18fn is_connection_closed(err: &iroh::endpoint::ConnectionError) -> bool {
19    use iroh::endpoint::ConnectionError::*;
20    matches!(
21        err,
22        ApplicationClosed(_) | ConnectionClosed(_) | Reset | TimedOut | LocallyClosed
23    )
24}
25
/// Close information returned when a session ends.
///
/// Serialized with camelCase field names (`closeCode`, `reason`).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CloseInfo {
    /// Application error code carried by the close, or `0` when the
    /// connection ended for any reason other than an application close
    /// (see `parse_connection_error`).
    pub close_code: u64,
    /// Close reason: the peer-supplied reason bytes decoded lossily as UTF-8
    /// for an application close, otherwise the error's display text.
    pub reason: String,
}
33
34// ── Session registry ──────────────────────────────────────────────────────────
35
36fn get_conn(endpoint: &IrohEndpoint, handle: u64) -> Result<Connection, CoreError> {
37    endpoint
38        .handles()
39        .lookup_session(handle)
40        .map(|s| s.conn.clone())
41        .ok_or_else(|| CoreError::invalid_handle(handle))
42}
43
44/// Return the remote peer's public key for a session.
45pub fn session_remote_id(
46    endpoint: &IrohEndpoint,
47    handle: u64,
48) -> Result<iroh::PublicKey, CoreError> {
49    get_conn(endpoint, handle).map(|c| c.remote_id())
50}
51
52// ── Public API ────────────────────────────────────────────────────────────────
53
54/// Establish a session (QUIC connection) to a remote peer.
55///
56/// Each call creates a **dedicated** QUIC connection — sessions are not pooled.
57/// This ensures that closing one session handle cannot affect other sessions
58/// to the same peer.  (Fetch operations continue to use the shared pool for
59/// efficiency; sessions opt out because `session_close` closes the underlying
60/// connection.)
61///
62/// Returns an opaque session handle.
63pub async fn session_connect(
64    endpoint: &IrohEndpoint,
65    remote_node_id: &str,
66    direct_addrs: Option<&[std::net::SocketAddr]>,
67) -> Result<u64, CoreError> {
68    let parsed = parse_node_addr(remote_node_id)?;
69    let node_id = parsed.node_id;
70    let mut addr = iroh::EndpointAddr::new(node_id);
71    for a in &parsed.direct_addrs {
72        addr = addr.with_ip_addr(*a);
73    }
74    if let Some(addrs) = direct_addrs {
75        for a in addrs {
76            addr = addr.with_ip_addr(*a);
77        }
78    }
79
80    let conn = endpoint
81        .raw()
82        .connect(addr, ALPN_DUPLEX)
83        .await
84        .map_err(|e| CoreError::connection_failed(format!("connect session: {e}")))?;
85
86    let handle = endpoint.handles().insert_session(SessionEntry { conn })?;
87
88    Ok(handle)
89}
90
91/// Open a new bidirectional stream on an existing session.
92///
93/// Returns `FfiDuplexStream` with `read_handle` / `write_handle` backed by
94/// body channels — the same interface used by fetch and raw_connect.
95pub async fn session_create_bidi_stream(
96    endpoint: &IrohEndpoint,
97    session_handle: u64,
98) -> Result<FfiDuplexStream, CoreError> {
99    let conn = get_conn(endpoint, session_handle)?;
100
101    let (send, recv) = conn
102        .open_bi()
103        .await
104        .map_err(|e| CoreError::connection_failed(format!("open_bi: {e}")))?;
105
106    wrap_bidi_stream(endpoint.handles(), send, recv)
107}
108
109/// Accept the next incoming bidirectional stream from the remote side.
110///
111/// Blocks until the remote opens a stream, or returns `None` when the
112/// connection is closed.
113pub async fn session_next_bidi_stream(
114    endpoint: &IrohEndpoint,
115    session_handle: u64,
116) -> Result<Option<FfiDuplexStream>, CoreError> {
117    let conn = get_conn(endpoint, session_handle)?;
118
119    match conn.accept_bi().await {
120        Ok((send, recv)) => Ok(Some(wrap_bidi_stream(endpoint.handles(), send, recv)?)),
121        Err(e) if is_connection_closed(&e) => Ok(None),
122        Err(e) => Err(CoreError::connection_failed(format!("accept_bi: {e}"))),
123    }
124}
125
126/// Accept an incoming session (QUIC connection) from a remote peer.
127///
128/// Blocks until a peer connects.  Returns an opaque session handle, or
129/// `None` if the endpoint is shutting down.
130pub async fn session_accept(endpoint: &IrohEndpoint) -> Result<Option<u64>, CoreError> {
131    let incoming = match endpoint.raw().accept().await {
132        Some(inc) => inc,
133        None => return Ok(None),
134    };
135
136    let conn = incoming
137        .await
138        .map_err(|e| CoreError::connection_failed(format!("accept session: {e}")))?;
139
140    let handle = endpoint.handles().insert_session(SessionEntry { conn })?;
141
142    Ok(Some(handle))
143}
144
145/// Close a session and remove it from the registry.
146///
147/// `close_code` is an application-level error code (maps to QUIC VarInt).
148/// `reason` is a human-readable string sent to the peer.
149pub fn session_close(
150    endpoint: &IrohEndpoint,
151    session_handle: u64,
152    close_code: u64,
153    reason: &str,
154) -> Result<(), CoreError> {
155    let entry = endpoint
156        .handles()
157        .remove_session(session_handle)
158        .ok_or_else(|| CoreError::invalid_handle(session_handle))?;
159    let code = iroh::endpoint::VarInt::from_u64(close_code).map_err(|_| {
160        CoreError::invalid_input(format!(
161            "close_code {close_code} exceeds QUIC VarInt max (2^62 - 1)"
162        ))
163    })?;
164    entry.conn.close(code, reason.as_bytes());
165    Ok(())
166}
167
168/// Wait for the QUIC handshake to complete on a session.
169///
170/// Resolves immediately if the handshake has already completed.
171pub async fn session_ready(endpoint: &IrohEndpoint, session_handle: u64) -> Result<(), CoreError> {
172    // Validate handle exists — keeps error behavior consistent with other session APIs.
173    let _conn = get_conn(endpoint, session_handle)?;
174    // iroh connections are fully established by the time session_connect returns,
175    // so ready always resolves immediately. Kept for WebTransport API compatibility.
176    Ok(())
177}
178
179/// Wait for the session to close and return the close information.
180///
181/// Blocks until the connection is closed by either side.  Removes the
182/// session from the registry so resources are freed.
183pub async fn session_closed(
184    endpoint: &IrohEndpoint,
185    session_handle: u64,
186) -> Result<CloseInfo, CoreError> {
187    let conn = get_conn(endpoint, session_handle)?;
188    let err = conn.closed().await;
189    // Connection is dead — clean up the registry entry.
190    endpoint.handles().remove_session(session_handle);
191    let (close_code, reason) = parse_connection_error(&err);
192    Ok(CloseInfo { close_code, reason })
193}
194
195// ── Unidirectional streams ────────────────────────────────────────────────────
196
197/// Open a new unidirectional (send-only) stream on an existing session.
198///
199/// Returns a write handle backed by a body channel.
200pub async fn session_create_uni_stream(
201    endpoint: &IrohEndpoint,
202    session_handle: u64,
203) -> Result<u64, CoreError> {
204    let conn = get_conn(endpoint, session_handle)?;
205    let send = conn
206        .open_uni()
207        .await
208        .map_err(|e| CoreError::connection_failed(format!("open_uni: {e}")))?;
209
210    let handles = endpoint.handles();
211    let (send_writer, send_reader) = handles.make_body_channel();
212    let write_handle = handles.insert_writer(send_writer)?;
213    tokio::spawn(pump_body_to_quic_send(send_reader, send));
214
215    Ok(write_handle)
216}
217
218/// Accept the next incoming unidirectional (receive-only) stream.
219///
220/// Returns a read handle, or `None` when the connection is closed.
221pub async fn session_next_uni_stream(
222    endpoint: &IrohEndpoint,
223    session_handle: u64,
224) -> Result<Option<u64>, CoreError> {
225    let conn = get_conn(endpoint, session_handle)?;
226
227    match conn.accept_uni().await {
228        Ok(recv) => {
229            let handles = endpoint.handles();
230            let (recv_writer, recv_reader) = handles.make_body_channel();
231            let read_handle = handles.insert_reader(recv_reader)?;
232            tokio::spawn(pump_quic_recv_to_body(recv, recv_writer));
233            Ok(Some(read_handle))
234        }
235        Err(e) if is_connection_closed(&e) => Ok(None),
236        Err(e) => Err(CoreError::connection_failed(format!("accept_uni: {e}"))),
237    }
238}
239
240// ── Datagrams ─────────────────────────────────────────────────────────────────
241
242/// Send a datagram on the session.
243///
244/// Fails if `data.len()` exceeds `session_max_datagram_size`.
245pub fn session_send_datagram(
246    endpoint: &IrohEndpoint,
247    session_handle: u64,
248    data: &[u8],
249) -> Result<(), CoreError> {
250    let conn = get_conn(endpoint, session_handle)?;
251    conn.send_datagram(bytes::Bytes::copy_from_slice(data))
252        .map_err(|e| match e {
253            iroh::endpoint::SendDatagramError::TooLarge => CoreError::body_too_large(
254                "datagram exceeds path MTU; check session_max_datagram_size()",
255            ),
256            _ => CoreError::internal(format!("send_datagram: {e}")),
257        })
258}
259
260/// Receive the next datagram from the session.
261///
262/// Blocks until a datagram arrives, or returns `None` when the connection closes.
263pub async fn session_recv_datagram(
264    endpoint: &IrohEndpoint,
265    session_handle: u64,
266) -> Result<Option<Vec<u8>>, CoreError> {
267    let conn = get_conn(endpoint, session_handle)?;
268    match conn.read_datagram().await {
269        Ok(data) => Ok(Some(data.to_vec())),
270        Err(e) if is_connection_closed(&e) => Ok(None),
271        Err(e) => Err(CoreError::connection_failed(format!("recv_datagram: {e}"))),
272    }
273}
274
275/// Return the current maximum datagram payload size for this session.
276///
277/// Returns `None` if datagrams are not supported on the current path.
278pub fn session_max_datagram_size(
279    endpoint: &IrohEndpoint,
280    session_handle: u64,
281) -> Result<Option<usize>, CoreError> {
282    let conn = get_conn(endpoint, session_handle)?;
283    Ok(conn.max_datagram_size())
284}
285
286// ── Helpers ───────────────────────────────────────────────────────────────────
287
288/// Wrap raw QUIC send/recv streams into body-channel–backed `FfiDuplexStream`.
289fn wrap_bidi_stream(
290    handles: &HandleStore,
291    send: iroh::endpoint::SendStream,
292    recv: iroh::endpoint::RecvStream,
293) -> Result<FfiDuplexStream, CoreError> {
294    let mut guard = handles.insert_guard();
295
296    // Receive side: pump from QUIC recv → BodyWriter → BodyReader (JS reads via nextChunk).
297    let (recv_writer, recv_reader) = handles.make_body_channel();
298    let read_handle = guard.insert_reader(recv_reader)?;
299    tokio::spawn(pump_quic_recv_to_body(recv, recv_writer));
300
301    // Send side: pump from BodyReader (JS writes via sendChunk) → QUIC send.
302    let (send_writer, send_reader) = handles.make_body_channel();
303    let write_handle = guard.insert_writer(send_writer)?;
304    tokio::spawn(pump_body_to_quic_send(send_reader, send));
305
306    guard.commit();
307    Ok(FfiDuplexStream {
308        read_handle,
309        write_handle,
310    })
311}
312
313/// Extract close code and reason from a QUIC `ConnectionError`.
314fn parse_connection_error(err: &iroh::endpoint::ConnectionError) -> (u64, String) {
315    match err {
316        iroh::endpoint::ConnectionError::ApplicationClosed(info) => {
317            let code: u64 = info.error_code.into();
318            let reason = String::from_utf8_lossy(&info.reason).into_owned();
319            (code, reason)
320        }
321        other => (0, other.to_string()),
322    }
323}