Skip to main content

harddrive_party_shared/client/
events.rs

1use crate::{client::ClientError, ui_messages::UiEvent};
2use bincode::deserialize;
3use futures::{stream::SplitStream, Stream, StreamExt};
4use reqwest::Url;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use tokio_tungstenite::{connect_async, WebSocketStream};
8
/// Read half of the websocket connection to the UI server.
#[cfg(feature = "native")]
type WsReadHalf =
    SplitStream<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>;

/// A stream of [`UiEvent`]s received over a websocket connection.
///
/// Wraps the read half of the split websocket stream; the write half is not kept.
#[cfg(feature = "native")]
pub struct EventStream(WsReadHalf);
13
#[cfg(feature = "native")]
impl EventStream {
    /// Opens a websocket connection derived from `ui_url` and returns its read half.
    ///
    /// The URL's scheme is replaced with the matching websocket scheme (`wss` when the
    /// given URL is `https` or already `wss`, `ws` otherwise) and its path is replaced
    /// with `ws`.
    ///
    /// # Errors
    ///
    /// - [`ClientError::InvalidUrl`] if the scheme of `ui_url` cannot be changed to a
    ///   websocket scheme.
    /// - [`ClientError::ConnectionError`] if the websocket handshake fails.
    pub async fn new(mut ui_url: Url) -> Result<Self, ClientError> {
        // Keep TLS for secure URLs. Forcing plain `ws` on an `https` URL would silently
        // downgrade the connection and change the implied default port from 443 to 80.
        let ws_scheme = match ui_url.scheme() {
            "https" | "wss" => "wss",
            _ => "ws",
        };
        ui_url
            .set_scheme(ws_scheme)
            .map_err(|_| ClientError::InvalidUrl)?;
        ui_url.set_path("ws");

        let (ws_stream, _response) = connect_async(ui_url)
            .await
            .map_err(|err| ClientError::ConnectionError(err.to_string()))?;

        // This client only listens for events, so the write half is discarded.
        let (_write, read) = ws_stream.split();
        Ok(Self(read))
    }
}
28
/// Gives a stream of events from the websocket server
///
/// Each binary websocket message is decoded with `bincode` into a [`UiEvent`].
/// Ping and Pong control frames are skipped, a Close frame ends the stream, and any
/// other message type is reported as [`ClientError::UnexpectedMessageType`].
#[cfg(feature = "native")]
impl Stream for EventStream {
    type Item = Result<UiEvent, ClientError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        use tokio_tungstenite::tungstenite::Message;

        // Loop so that skipped control frames do not surface to the consumer: keep
        // polling the inner stream until it yields something meaningful or is pending.
        loop {
            let message = match self.0.poll_next_unpin(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Ready(Some(Err(error))) => {
                    return Poll::Ready(Some(Err(ClientError::ConnectionError(
                        error.to_string(),
                    ))));
                }
                Poll::Ready(Some(Ok(message))) => message,
            };

            match message {
                Message::Binary(bytes) => {
                    // Decoding failures are converted into `ClientError` and yielded as
                    // a stream item rather than ending the stream.
                    return Poll::Ready(Some(deserialize(&bytes).map_err(ClientError::from)));
                }
                // Keep-alive control frames carry no event; ignore them.
                Message::Ping(_) | Message::Pong(_) => continue,
                // The peer is closing the connection: end the stream gracefully.
                Message::Close(_) => return Poll::Ready(None),
                _ => return Poll::Ready(Some(Err(ClientError::UnexpectedMessageType))),
            }
        }
    }
}