websocketz/
websocket.rs

1use embedded_io_async::{Read, Write};
2use framez::{
3    Framed,
4    state::{ReadState, ReadWriteState, WriteState},
5};
6use rand::RngCore;
7
8use crate::{
9    FragmentsState, Frame, FramesCodec, Message, OnFrame, WebSocketCore,
10    error::{Error, ProtocolError},
11    http::{Request, Response},
12    options::{AcceptOptions, ConnectOptions},
13};
14
15/// A WebSocket connection.
16///
17/// # Defaults:
18///
19/// - `auto_pong`: `true`
20/// - `auto_close`: `true`
21#[derive(Debug)]
22pub struct WebSocket<'buf, RW, Rng> {
23    #[doc(hidden)]
24    pub core: WebSocketCore<'buf, RW, Rng>,
25}
26
27impl<'buf, RW, Rng> WebSocket<'buf, RW, Rng> {
28    /// Creates a new [`WebSocket`] client after a successful handshake.
29    pub const fn client(
30        inner: RW,
31        rng: Rng,
32        read_buffer: &'buf mut [u8],
33        write_buffer: &'buf mut [u8],
34        fragments_buffer: &'buf mut [u8],
35    ) -> Self {
36        Self {
37            core: WebSocketCore::client(
38                inner,
39                rng,
40                read_buffer,
41                write_buffer,
42                FragmentsState::new(fragments_buffer),
43            ),
44        }
45    }
46
47    /// Creates a new [`WebSocket`] server after a successful handshake.
48    pub const fn server(
49        inner: RW,
50        rng: Rng,
51        read_buffer: &'buf mut [u8],
52        write_buffer: &'buf mut [u8],
53        fragments_buffer: &'buf mut [u8],
54    ) -> Self {
55        Self {
56            core: WebSocketCore::server(
57                inner,
58                rng,
59                read_buffer,
60                write_buffer,
61                FragmentsState::new(fragments_buffer),
62            ),
63        }
64    }
65
66    /// Creates a new [`WebSocket`] client and performs the handshake.
67    ///
68    /// # Generic Parameters
69    /// `N`: The maximum number of headers to accept in the handshake response.
70    pub async fn connect<const N: usize>(
71        options: ConnectOptions<'_, '_>,
72        inner: RW,
73        rng: Rng,
74        read_buffer: &'buf mut [u8],
75        write_buffer: &'buf mut [u8],
76        fragments_buffer: &'buf mut [u8],
77    ) -> Result<Self, Error<RW::Error>>
78    where
79        RW: Read + Write,
80        Rng: RngCore,
81    {
82        Ok(Self::connect_with::<N, _, _, _>(
83            options,
84            inner,
85            rng,
86            read_buffer,
87            write_buffer,
88            fragments_buffer,
89            |_| Ok(()),
90        )
91        .await?
92        .0)
93    }
94
95    /// Creates a new [`WebSocket`] client and performs the handshake with a custom response handler.
96    ///
97    /// # Generic Parameters
98    /// `N`: The maximum number of headers to accept in the handshake response.
99    pub async fn connect_with<const N: usize, F, T, E>(
100        options: ConnectOptions<'_, '_>,
101        inner: RW,
102        rng: Rng,
103        read_buffer: &'buf mut [u8],
104        write_buffer: &'buf mut [u8],
105        fragments_buffer: &'buf mut [u8],
106        on_response: F,
107    ) -> Result<(Self, T), Error<RW::Error, E>>
108    where
109        F: for<'a> Fn(&Response<'a, N>) -> Result<T, E>,
110        RW: Read + Write,
111        Rng: RngCore,
112    {
113        Self::client(inner, rng, read_buffer, write_buffer, fragments_buffer)
114            .client_handshake::<N, _, _, _>(options, on_response)
115            .await
116    }
117
118    /// Creates a new [`WebSocket`] server and performs the handshake.
119    ///
120    /// # Generic Parameters
121    /// `N`: The maximum number of headers to accept in the handshake request.
122    pub async fn accept<const N: usize>(
123        options: AcceptOptions<'_, '_>,
124        inner: RW,
125        rng: Rng,
126        read_buffer: &'buf mut [u8],
127        write_buffer: &'buf mut [u8],
128        fragments_buffer: &'buf mut [u8],
129    ) -> Result<Self, Error<RW::Error>>
130    where
131        RW: Read + Write,
132    {
133        Ok(Self::accept_with::<N, _, _, _>(
134            options,
135            inner,
136            rng,
137            read_buffer,
138            write_buffer,
139            fragments_buffer,
140            |_| Ok(()),
141        )
142        .await?
143        .0)
144    }
145
146    /// Creates a new [`WebSocket`] server and performs the handshake with a custom request handler.
147    ///
148    /// # Generic Parameters
149    /// `N`: The maximum number of headers to accept in the handshake request.
150    pub async fn accept_with<const N: usize, F, T, E>(
151        options: AcceptOptions<'_, '_>,
152        inner: RW,
153        rng: Rng,
154        read_buffer: &'buf mut [u8],
155        write_buffer: &'buf mut [u8],
156        fragments_buffer: &'buf mut [u8],
157        on_request: F,
158    ) -> Result<(Self, T), Error<RW::Error, E>>
159    where
160        F: for<'a> Fn(&Request<'a, N>) -> Result<T, E>,
161        RW: Read + Write,
162    {
163        Self::server(inner, rng, read_buffer, write_buffer, fragments_buffer)
164            .server_handshake::<N, _, _, _>(options, on_request)
165            .await
166    }
167
168    /// Sets whether to automatically send a Pong response.
169    #[inline]
170    pub const fn with_auto_pong(mut self, auto_pong: bool) -> Self {
171        self.core.set_auto_pong(auto_pong);
172        self
173    }
174
175    /// Sets whether to automatically close the connection on receiving a Close frame.
176    #[inline]
177    pub const fn with_auto_close(mut self, auto_close: bool) -> Self {
178        self.core.set_auto_close(auto_close);
179        self
180    }
181
182    /// Returns reference to the reader/writer.
183    #[inline]
184    pub const fn inner(&self) -> &RW {
185        self.core.inner()
186    }
187
188    /// Returns mutable reference to the reader/writer.
189    #[inline]
190    pub const fn inner_mut(&mut self) -> &mut RW {
191        self.core.inner_mut()
192    }
193
194    /// Consumes the [`WebSocket`] and returns the reader/writer.
195    #[inline]
196    pub fn into_inner(self) -> RW {
197        self.core.into_inner()
198    }
199
200    /// Returns the number of bytes that can be framed.
201    #[inline]
202    pub const fn framable(&self) -> usize {
203        self.core.framable()
204    }
205
206    async fn client_handshake<const N: usize, F, T, E>(
207        self,
208        options: ConnectOptions<'_, '_>,
209        on_response: F,
210    ) -> Result<(Self, T), Error<RW::Error, E>>
211    where
212        F: for<'a> Fn(&Response<'a, N>) -> Result<T, E>,
213        RW: Read + Write,
214        Rng: RngCore,
215    {
216        let (core, custom) = self
217            .core
218            .client_handshake::<N, _, _, _>(options, on_response)
219            .await?;
220
221        Ok((Self { core }, custom))
222    }
223
224    async fn server_handshake<const N: usize, F, T, E>(
225        self,
226        options: AcceptOptions<'_, '_>,
227        on_request: F,
228    ) -> Result<(Self, T), Error<RW::Error, E>>
229    where
230        F: for<'a> Fn(&Request<'a, N>) -> Result<T, E>,
231        RW: Read + Write,
232    {
233        let (core, custom) = self
234            .core
235            .server_handshake::<N, _, _, _>(options, on_request)
236            .await?;
237
238        Ok((Self { core }, custom))
239    }
240
241    /// Sends a WebSocket message.
242    pub async fn send(&mut self, message: Message<'_>) -> Result<(), Error<RW::Error>>
243    where
244        RW: Write,
245        Rng: RngCore,
246    {
247        self.core.send(message).await
248    }
249
250    /// Sends a fragmented WebSocket message.
251    pub async fn send_fragmented(
252        &mut self,
253        message: Message<'_>,
254        fragment_size: usize,
255    ) -> Result<(), Error<RW::Error>>
256    where
257        RW: Write,
258        Rng: RngCore,
259    {
260        self.core.send_fragmented(message, fragment_size).await
261    }
262
263    /// Splits the [`WebSocket`] into a [`WebSocketRead`] and a [`WebSocketWrite`] with the provided `split` function.
264    ///
265    /// # Note
266    ///
267    /// `auto_pong` and `auto_close` will `NOT` be applied to the split instances.
268    pub fn split_with<F, R, W>(
269        self,
270        split: F,
271    ) -> (WebSocketRead<'buf, R>, WebSocketWrite<'buf, W, Rng>)
272    where
273        F: FnOnce(RW) -> (R, W),
274    {
275        let (codec, inner, state) = self.core.framed.into_parts();
276        let (read_codec, write_codec) = codec.split();
277
278        let (read, write) = split(inner);
279
280        let framed_read = Framed::from_parts(
281            read_codec,
282            read,
283            ReadWriteState::new(state.read, WriteState::empty()),
284        );
285
286        let framed_write = Framed::from_parts(
287            write_codec,
288            write,
289            ReadWriteState::new(ReadState::empty(), state.write),
290        );
291
292        (
293            WebSocketRead::new_from_framed(framed_read, self.core.fragments_state),
294            WebSocketWrite::new_from_framed(framed_write),
295        )
296    }
297
298    #[doc(hidden)]
299    pub const fn auto(
300        &self,
301    ) -> impl FnOnce(Frame<'_>) -> Result<OnFrame<'_>, ProtocolError> + 'static {
302        self.core.auto()
303    }
304
305    #[doc(hidden)]
306    pub const fn caller(&self) -> crate::functions::ReadAutoCaller {
307        crate::functions::ReadAutoCaller
308    }
309}
310
311/// Read half of a WebSocket connection.
312#[derive(Debug)]
313pub struct WebSocketRead<'buf, RW> {
314    #[doc(hidden)]
315    pub core: WebSocketCore<'buf, RW, ()>,
316}
317
318impl<'buf, RW> WebSocketRead<'buf, RW> {
319    const fn new_from_framed(
320        framed: Framed<'buf, FramesCodec<()>, RW>,
321        fragments_state: FragmentsState<'buf>,
322    ) -> Self {
323        Self {
324            core: WebSocketCore::new_from_framed(framed, fragments_state),
325        }
326    }
327
328    /// Creates a new [`WebSocketRead`] client after a successful handshake.
329    pub const fn client(
330        inner: RW,
331        read_buffer: &'buf mut [u8],
332        fragments_buffer: &'buf mut [u8],
333    ) -> Self {
334        Self {
335            core: WebSocketCore::client(
336                inner,
337                (),
338                read_buffer,
339                &mut [],
340                FragmentsState::new(fragments_buffer),
341            ),
342        }
343    }
344
345    /// Creates a new [`WebSocketRead`] server after a successful handshake.
346    pub const fn server(
347        inner: RW,
348        read_buffer: &'buf mut [u8],
349        fragments_buffer: &'buf mut [u8],
350    ) -> Self {
351        Self {
352            core: WebSocketCore::server(
353                inner,
354                (),
355                read_buffer,
356                &mut [],
357                FragmentsState::new(fragments_buffer),
358            ),
359        }
360    }
361
362    /// Returns reference to the reader.
363    #[inline]
364    pub const fn inner(&self) -> &RW {
365        self.core.inner()
366    }
367
368    /// Returns mutable reference to the reader.
369    #[inline]
370    pub const fn inner_mut(&mut self) -> &mut RW {
371        self.core.inner_mut()
372    }
373
374    /// Consumes the [`WebSocketRead`] and returns the reader.
375    #[inline]
376    pub fn into_inner(self) -> RW {
377        self.core.into_inner()
378    }
379
380    /// Returns the number of bytes that can be framed.
381    #[inline]
382    pub const fn framable(&self) -> usize {
383        self.core.framable()
384    }
385
386    #[doc(hidden)]
387    pub const fn auto(&self) {}
388
389    #[doc(hidden)]
390    pub const fn caller(&self) -> crate::functions::ReadCaller {
391        crate::functions::ReadCaller
392    }
393}
394
395/// Write half of a WebSocket connection.
396#[derive(Debug)]
397pub struct WebSocketWrite<'buf, RW, Rng> {
398    #[doc(hidden)]
399    pub core: WebSocketCore<'buf, RW, Rng>,
400}
401
402impl<'buf, RW, Rng> WebSocketWrite<'buf, RW, Rng> {
403    const fn new_from_framed(framed: Framed<'buf, FramesCodec<Rng>, RW>) -> Self {
404        Self {
405            core: WebSocketCore::new_from_framed(framed, FragmentsState::empty()),
406        }
407    }
408
409    /// Creates a new [`WebSocketWrite`] client after a successful handshake.
410    pub const fn client(inner: RW, rng: Rng, write_buffer: &'buf mut [u8]) -> Self {
411        Self {
412            core: WebSocketCore::client(inner, rng, &mut [], write_buffer, FragmentsState::empty()),
413        }
414    }
415
416    /// Creates a new [`WebSocketWrite`] server after a successful handshake.
417    pub const fn server(inner: RW, rng: Rng, write_buffer: &'buf mut [u8]) -> Self {
418        Self {
419            core: WebSocketCore::server(inner, rng, &mut [], write_buffer, FragmentsState::empty()),
420        }
421    }
422
423    /// Returns reference to the writer.
424    #[inline]
425    pub const fn inner(&self) -> &RW {
426        self.core.inner()
427    }
428
429    /// Returns mutable reference to the writer.
430    #[inline]
431    pub const fn inner_mut(&mut self) -> &mut RW {
432        self.core.inner_mut()
433    }
434
435    /// Consumes the [`WebSocketWrite`] and returns the writer.
436    #[inline]
437    pub fn into_inner(self) -> RW {
438        self.core.into_inner()
439    }
440
441    /// Sends a WebSocket message.
442    pub async fn send(&mut self, message: Message<'_>) -> Result<(), Error<RW::Error>>
443    where
444        RW: Write,
445        Rng: RngCore,
446    {
447        self.core.send(message).await
448    }
449
450    /// Sends a fragmented WebSocket message.
451    pub async fn send_fragmented(
452        &mut self,
453        message: Message<'_>,
454        fragment_size: usize,
455    ) -> Result<(), Error<RW::Error>>
456    where
457        RW: Write,
458        Rng: RngCore,
459    {
460        self.core.send_fragmented(message, fragment_size).await
461    }
462}