mf_collab_client/
client.rs1use futures_util::stream::{SplitSink, SplitStream};
2use futures_util::{ready, Stream};
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tokio::net::TcpStream;
6use tokio_tungstenite::tungstenite::Message;
7use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
8use yrs::sync::Error;
9
10#[derive(Debug)]
12pub struct ClientSink(
13 pub SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
14);
15
16impl futures_util::Sink<Vec<u8>> for ClientSink {
17 type Error = Error;
18
19 fn poll_ready(
20 mut self: Pin<&mut Self>,
21 cx: &mut Context<'_>,
22 ) -> Poll<Result<(), Self::Error>> {
23 let sink = unsafe { Pin::new_unchecked(&mut self.0) };
24 let result = ready!(sink.poll_ready(cx));
25 match result {
26 Ok(_) => Poll::Ready(Ok(())),
27 Err(e) => Poll::Ready(Err(Error::Other(Box::new(e)))),
28 }
29 }
30
31 fn start_send(
32 mut self: Pin<&mut Self>,
33 item: Vec<u8>,
34 ) -> Result<(), Self::Error> {
35 let sink = unsafe { Pin::new_unchecked(&mut self.0) };
36 let result = sink.start_send(Message::binary(item));
37 match result {
38 Ok(_) => Ok(()),
39 Err(e) => Err(Error::Other(Box::new(e))),
40 }
41 }
42
43 fn poll_flush(
44 mut self: Pin<&mut Self>,
45 cx: &mut Context<'_>,
46 ) -> Poll<Result<(), Self::Error>> {
47 let sink = unsafe { Pin::new_unchecked(&mut self.0) };
48 let result = ready!(sink.poll_flush(cx));
49 match result {
50 Ok(_) => Poll::Ready(Ok(())),
51 Err(e) => Poll::Ready(Err(Error::Other(Box::new(e)))),
52 }
53 }
54
55 fn poll_close(
56 mut self: Pin<&mut Self>,
57 cx: &mut Context<'_>,
58 ) -> Poll<Result<(), Self::Error>> {
59 let sink = unsafe { Pin::new_unchecked(&mut self.0) };
60 let result = ready!(sink.poll_close(cx));
61 match result {
62 Ok(_) => Poll::Ready(Ok(())),
63 Err(e) => Poll::Ready(Err(Error::Other(Box::new(e)))),
64 }
65 }
66}
67
68#[derive(Debug)]
70pub struct ClientStream(
71 pub SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
72);
73
74impl Stream for ClientStream {
75 type Item = Result<Vec<u8>, Error>;
76
77 fn poll_next(
78 mut self: Pin<&mut Self>,
79 cx: &mut Context<'_>,
80 ) -> Poll<Option<Self::Item>> {
81 let stream = unsafe { Pin::new_unchecked(&mut self.0) };
82 let result = ready!(stream.poll_next(cx));
83 match result {
84 None => Poll::Ready(None),
85 Some(Ok(msg)) => {
86 let bytes = match msg {
88 Message::Binary(data) => data,
89 Message::Text(text) => text.into_bytes(),
90 Message::Close(_) => {
91 return Poll::Ready(None);
92 },
93 Message::Ping(_) => {
94 return self.poll_next(cx);
96 },
97 Message::Pong(_) => {
98 return self.poll_next(cx);
100 },
101 _ => {
102 return self.poll_next(cx);
103 },
104 };
105 Poll::Ready(Some(Ok(bytes)))
106 },
107 Some(Err(e)) => Poll::Ready(Some(Err(Error::Other(Box::new(e))))),
108 }
109 }
110}