Skip to main content

web_transport_node/
lib.rs

1use napi::bindgen_prelude::*;
2use napi_derive::napi;
3
4use tokio::sync::Mutex;
5
6fn session_error_to_close_info(err: &web_transport_quinn::SessionError) -> NapiCloseInfo {
7    match err {
8        web_transport_quinn::SessionError::WebTransportError(
9            web_transport_quinn::WebTransportError::Closed(code, reason),
10        ) => NapiCloseInfo {
11            close_code: *code,
12            reason: reason.clone(),
13        },
14        other => NapiCloseInfo {
15            close_code: 0,
16            reason: other.to_string(),
17        },
18    }
19}
20
21/// A WebTransport client that can connect to servers.
22#[napi]
23pub struct NapiClient {
24    inner: web_transport_quinn::Client,
25}
26
27#[napi]
28impl NapiClient {
29    /// Create a client that validates server certificates against system root CAs.
30    #[napi(factory)]
31    pub fn with_system_roots() -> Result<Self> {
32        napi::bindgen_prelude::within_runtime_if_available(|| {
33            let client = web_transport_quinn::ClientBuilder::new()
34                .with_system_roots()
35                .map_err(|e| Error::from_reason(e.to_string()))?;
36            Ok(Self { inner: client })
37        })
38    }
39
40    /// Create a client that skips certificate verification entirely.
41    /// WARNING: Only use for testing with self-signed certificates.
42    #[napi(factory)]
43    pub fn disable_verify() -> Result<Self> {
44        napi::bindgen_prelude::within_runtime_if_available(|| {
45            let client = web_transport_quinn::ClientBuilder::new()
46                .dangerous()
47                .with_no_certificate_verification()
48                .map_err(|e| Error::from_reason(e.to_string()))?;
49            Ok(Self { inner: client })
50        })
51    }
52
53    /// Create a client that validates server certificates by SHA-256 hash.
54    #[napi(factory)]
55    pub fn with_certificate_hashes(hashes: Vec<Buffer>) -> Result<Self> {
56        napi::bindgen_prelude::within_runtime_if_available(|| {
57            let hashes: Vec<Vec<u8>> = hashes.into_iter().map(|b| b.to_vec()).collect();
58            let client = web_transport_quinn::ClientBuilder::new()
59                .with_server_certificate_hashes(hashes)
60                .map_err(|e| Error::from_reason(e.to_string()))?;
61            Ok(Self { inner: client })
62        })
63    }
64
65    /// Connect to a WebTransport server at the given URL.
66    #[napi]
67    pub async fn connect(&self, url_str: String) -> Result<NapiSession> {
68        let url: url::Url = url_str
69            .parse()
70            .map_err(|e: url::ParseError| Error::from_reason(e.to_string()))?;
71        let session = self
72            .inner
73            .connect(url)
74            .await
75            .map_err(|e| Error::from_reason(e.to_string()))?;
76        Ok(NapiSession {
77            inner: session.clone(),
78            closed: Mutex::new(None),
79        })
80    }
81}
82
83/// A WebTransport server that accepts incoming sessions.
84#[napi]
85pub struct NapiServer {
86    inner: Mutex<Option<web_transport_quinn::Server>>,
87}
88
89#[napi]
90impl NapiServer {
91    /// Create a server bound to the given address with the given TLS certificate.
92    #[napi(factory)]
93    pub fn bind(addr: String, cert_pem: Buffer, key_pem: Buffer) -> Result<Self> {
94        napi::bindgen_prelude::within_runtime_if_available(|| {
95            let certs = rustls_pemfile::certs(&mut &cert_pem[..])
96                .collect::<std::result::Result<Vec<_>, _>>()
97                .map_err(|e| Error::from_reason(format!("invalid certificate PEM: {e}")))?;
98            let key = rustls_pemfile::private_key(&mut &key_pem[..])
99                .map_err(|e| Error::from_reason(format!("invalid private key PEM: {e}")))?
100                .ok_or_else(|| Error::from_reason("no private key found in PEM"))?;
101
102            let addr: std::net::SocketAddr = addr
103                .parse()
104                .map_err(|e: std::net::AddrParseError| Error::from_reason(e.to_string()))?;
105
106            let server = web_transport_quinn::ServerBuilder::new()
107                .with_addr(addr)
108                .with_certificate(certs, key)
109                .map_err(|e| Error::from_reason(e.to_string()))?;
110
111            Ok(Self {
112                inner: Mutex::new(Some(server)),
113            })
114        })
115    }
116
117    /// Accept the next incoming WebTransport session request.
118    #[napi]
119    pub async fn accept(&self) -> Result<Option<NapiRequest>> {
120        let mut guard = self.inner.lock().await;
121        let server = match guard.as_mut() {
122            Some(server) => server,
123            None => return Ok(None),
124        };
125        match server.accept().await {
126            Some(request) => Ok(Some(NapiRequest {
127                inner: Mutex::new(Some(request)),
128            })),
129            None => {
130                guard.take();
131                Ok(None)
132            }
133        }
134    }
135
136    /// Close the server, stopping it from accepting new connections.
137    #[napi]
138    pub fn close(&self) {
139        within_runtime_if_available(|| {
140            self.inner.blocking_lock().take();
141        });
142    }
143}
144
145/// A pending WebTransport session request from a client.
146#[napi]
147pub struct NapiRequest {
148    // Option so we can take it in ok()/reject() which consume the Request.
149    inner: Mutex<Option<web_transport_quinn::Request>>,
150}
151
152#[napi]
153impl NapiRequest {
154    /// Get the URL of the CONNECT request.
155    #[napi(getter)]
156    pub async fn url(&self) -> Result<String> {
157        let guard = self.inner.lock().await;
158        let request = guard
159            .as_ref()
160            .ok_or_else(|| Error::from_reason("request already consumed"))?;
161        Ok(request.url.to_string())
162    }
163
164    /// Accept the session with 200 OK.
165    #[napi]
166    pub async fn ok(&self) -> Result<NapiSession> {
167        let request = self
168            .inner
169            .lock()
170            .await
171            .take()
172            .ok_or_else(|| Error::from_reason("request already consumed"))?;
173        let session = request
174            .ok()
175            .await
176            .map_err(|e| Error::from_reason(e.to_string()))?;
177        Ok(NapiSession {
178            inner: session.clone(),
179            closed: Mutex::new(None),
180        })
181    }
182
183    /// Reject the session with the given HTTP status code.
184    #[napi]
185    pub async fn reject(&self, status: u16) -> Result<()> {
186        let request = self
187            .inner
188            .lock()
189            .await
190            .take()
191            .ok_or_else(|| Error::from_reason("request already consumed"))?;
192        let status = http::StatusCode::from_u16(status)
193            .map_err(|e| Error::from_reason(format!("invalid status code: {e}")))?;
194        request
195            .reject(status)
196            .await
197            .map_err(|e| Error::from_reason(e.to_string()))?;
198        Ok(())
199    }
200}
201
202/// An established WebTransport session.
203#[napi]
204pub struct NapiSession {
205    inner: web_transport_quinn::Session,
206    // Cache the closed future result so multiple callers can await it.
207    closed: Mutex<Option<NapiCloseInfo>>,
208}
209
210/// Info about why a session was closed, matching W3C WebTransportCloseInfo.
211#[derive(Clone)]
212#[napi(object)]
213pub struct NapiCloseInfo {
214    pub close_code: u32,
215    pub reason: String,
216}
217
218/// A bidirectional stream pair.
219#[napi]
220pub struct NapiBiStream {
221    send: Option<NapiSendStream>,
222    recv: Option<NapiRecvStream>,
223}
224
225#[napi]
226impl NapiBiStream {
227    /// Take the send stream. Can only be called once.
228    #[napi]
229    pub fn take_send(&mut self) -> Result<NapiSendStream> {
230        self.send
231            .take()
232            .ok_or_else(|| Error::from_reason("send stream already taken"))
233    }
234
235    /// Take the recv stream. Can only be called once.
236    #[napi]
237    pub fn take_recv(&mut self) -> Result<NapiRecvStream> {
238        self.recv
239            .take()
240            .ok_or_else(|| Error::from_reason("recv stream already taken"))
241    }
242}
243
244#[napi]
245impl NapiSession {
246    /// Accept an incoming unidirectional stream.
247    #[napi]
248    pub async fn accept_uni(&self) -> Result<NapiRecvStream> {
249        let recv = self
250            .inner
251            .accept_uni()
252            .await
253            .map_err(|e| Error::from_reason(e.to_string()))?;
254        Ok(NapiRecvStream {
255            inner: Mutex::new(recv),
256        })
257    }
258
259    /// Accept an incoming bidirectional stream.
260    #[napi]
261    pub async fn accept_bi(&self) -> Result<NapiBiStream> {
262        let (send, recv) = self
263            .inner
264            .accept_bi()
265            .await
266            .map_err(|e| Error::from_reason(e.to_string()))?;
267        Ok(NapiBiStream {
268            send: Some(NapiSendStream {
269                inner: Mutex::new(send),
270            }),
271            recv: Some(NapiRecvStream {
272                inner: Mutex::new(recv),
273            }),
274        })
275    }
276
277    /// Open a new unidirectional stream.
278    #[napi]
279    pub async fn open_uni(&self) -> Result<NapiSendStream> {
280        let send = self
281            .inner
282            .open_uni()
283            .await
284            .map_err(|e| Error::from_reason(e.to_string()))?;
285        Ok(NapiSendStream {
286            inner: Mutex::new(send),
287        })
288    }
289
290    /// Open a new bidirectional stream.
291    #[napi]
292    pub async fn open_bi(&self) -> Result<NapiBiStream> {
293        let (send, recv) = self
294            .inner
295            .open_bi()
296            .await
297            .map_err(|e| Error::from_reason(e.to_string()))?;
298        Ok(NapiBiStream {
299            send: Some(NapiSendStream {
300                inner: Mutex::new(send),
301            }),
302            recv: Some(NapiRecvStream {
303                inner: Mutex::new(recv),
304            }),
305        })
306    }
307
308    /// Send a datagram.
309    #[napi]
310    pub fn send_datagram(&self, data: Buffer) -> Result<()> {
311        within_runtime_if_available(|| {
312            self.inner
313                .send_datagram(bytes::Bytes::from(data.to_vec()))
314                .map_err(|e| Error::from_reason(e.to_string()))
315        })
316    }
317
318    /// Receive a datagram.
319    #[napi]
320    pub async fn recv_datagram(&self) -> Result<Buffer> {
321        let data = self
322            .inner
323            .read_datagram()
324            .await
325            .map_err(|e| Error::from_reason(e.to_string()))?;
326        Ok(Buffer::from(data.to_vec()))
327    }
328
329    /// Get the maximum datagram size.
330    #[napi]
331    pub fn max_datagram_size(&self) -> u32 {
332        within_runtime_if_available(|| self.inner.max_datagram_size() as u32)
333    }
334
335    /// Close the session with a code and reason.
336    #[napi]
337    pub fn close(&self, code: u32, reason: String) {
338        within_runtime_if_available(|| {
339            self.inner.close(code, reason.as_bytes());
340        });
341    }
342
343    /// Wait for the session to close, returning close info matching W3C WebTransportCloseInfo.
344    #[napi]
345    pub async fn closed(&self) -> Result<NapiCloseInfo> {
346        // Check if we already have a cached result.
347        {
348            let cached = self.closed.lock().await;
349            if let Some(info) = cached.as_ref() {
350                return Ok(info.clone());
351            }
352        }
353
354        let err = self.inner.closed().await;
355        let info = session_error_to_close_info(&err);
356
357        // Cache the result.
358        {
359            let mut cached = self.closed.lock().await;
360            *cached = Some(info.clone());
361        }
362
363        Ok(info)
364    }
365}
366
367/// A send stream for writing data.
368#[napi]
369pub struct NapiSendStream {
370    inner: Mutex<web_transport_quinn::SendStream>,
371}
372
373#[napi]
374impl NapiSendStream {
375    /// Write data to the stream.
376    #[napi]
377    pub async fn write(&self, data: Buffer) -> Result<()> {
378        let mut stream = self.inner.lock().await;
379        stream
380            .write_all(&data)
381            .await
382            .map_err(|e| Error::from_reason(e.to_string()))
383    }
384
385    /// Signal that no more data will be written.
386    #[napi]
387    pub async fn finish(&self) -> Result<()> {
388        let mut stream = self.inner.lock().await;
389        stream
390            .finish()
391            .map_err(|e| Error::from_reason(e.to_string()))
392    }
393
394    /// Abruptly reset the stream with an error code.
395    #[napi]
396    pub async fn reset(&self, code: u32) -> Result<()> {
397        let mut stream = self.inner.lock().await;
398        stream
399            .reset(code)
400            .map_err(|e| Error::from_reason(e.to_string()))
401    }
402
403    /// Set the priority of the stream.
404    #[napi]
405    pub async fn set_priority(&self, priority: i32) -> Result<()> {
406        let stream = self.inner.lock().await;
407        stream
408            .set_priority(priority)
409            .map_err(|e| Error::from_reason(e.to_string()))
410    }
411}
412
413/// A receive stream for reading data.
414#[napi]
415pub struct NapiRecvStream {
416    inner: Mutex<web_transport_quinn::RecvStream>,
417}
418
419#[napi]
420impl NapiRecvStream {
421    /// Read up to `max_size` bytes from the stream. Returns null on FIN.
422    #[napi]
423    pub async fn read(&self, max_size: u32) -> Result<Option<Buffer>> {
424        let mut stream = self.inner.lock().await;
425        let Some(chunk) = stream
426            .read_chunk(max_size as usize, true)
427            .await
428            .map_err(|e| Error::from_reason(e.to_string()))?
429        else {
430            return Ok(None);
431        };
432
433        Ok(Some(Buffer::from(chunk.bytes.to_vec())))
434    }
435
436    /// Tell the peer to stop sending with the given error code.
437    #[napi]
438    pub async fn stop(&self, code: u32) -> Result<()> {
439        let mut stream = self.inner.lock().await;
440        stream
441            .stop(code)
442            .map_err(|e| Error::from_reason(e.to_string()))
443    }
444}