rocket_ws_community/
duplex.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use rocket::data::IoStream;
5use rocket::futures::stream::{FusedStream, Stream};
6use rocket::futures::{Sink, SinkExt, StreamExt};
7
8use crate::frame::{CloseFrame, Message};
9use crate::result::{Error, Result};
10
11/// A readable and writeable WebSocket [`Message`] `async` stream.
12///
13/// This struct implements [`Stream`] and [`Sink`], allowing for `async` reading
14/// and writing of [`Message`]s. The [`StreamExt`] and [`SinkExt`] traits can be
15/// imported to provide additional functionality for streams and sinks:
16///
17/// ```rust
18/// # extern crate rocket_ws_community as rocket_ws;
19/// # use rocket::get;
20/// # use rocket_ws as ws;
21/// use rocket::futures::{SinkExt, StreamExt};
22///
23/// #[get("/echo/manual")]
24/// fn echo_manual<'r>(ws: ws::WebSocket) -> ws::Channel<'r> {
25///     ws.channel(move |mut stream| Box::pin(async move {
26///         while let Some(message) = stream.next().await {
27///             let _ = stream.send(message?).await;
28///         }
29///
30///         Ok(())
31///     }))
32/// }
33/// ```
34///
35/// [`StreamExt`]: rocket::futures::StreamExt
36/// [`SinkExt`]: rocket::futures::SinkExt
37pub struct DuplexStream(tokio_tungstenite::WebSocketStream<IoStream>);
38
39impl DuplexStream {
40    pub(crate) async fn new(stream: IoStream, config: crate::Config) -> Self {
41        use crate::tungstenite::protocol::Role;
42        use tokio_tungstenite::WebSocketStream;
43
44        let inner = WebSocketStream::from_raw_socket(stream, Role::Server, Some(config));
45        DuplexStream(inner.await)
46    }
47
48    /// Close the stream now. This does not typically need to be called.
49    pub async fn close(&mut self, msg: Option<CloseFrame<'_>>) -> Result<()> {
50        self.0.close(msg).await
51    }
52}
53
54impl Stream for DuplexStream {
55    type Item = Result<Message>;
56
57    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
58        self.get_mut().0.poll_next_unpin(cx)
59    }
60
61    fn size_hint(&self) -> (usize, Option<usize>) {
62        self.0.size_hint()
63    }
64}
65
66impl FusedStream for DuplexStream {
67    fn is_terminated(&self) -> bool {
68        self.0.is_terminated()
69    }
70}
71
72impl Sink<Message> for DuplexStream {
73    type Error = Error;
74
75    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
76        self.get_mut().0.poll_ready_unpin(cx)
77    }
78
79    fn start_send(self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error> {
80        self.get_mut().0.start_send_unpin(item)
81    }
82
83    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
84        self.get_mut().0.poll_flush_unpin(cx)
85    }
86
87    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
88        self.get_mut().0.poll_close_unpin(cx)
89    }
90}