Skip to main content

mf_collab_client/
client.rs

1use futures_util::stream::{SplitSink, SplitStream};
2use futures_util::{ready, Stream};
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use tokio::net::TcpStream;
6use tokio_tungstenite::tungstenite::Message;
7use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
8use yrs::sync::Error;
9
/// Client-side WebSocket sink wrapper, analogous to `WarpSink` but for the
/// client end of a connection.
///
/// Newtype around the `SplitSink` half of a `WebSocketStream` running over a
/// `MaybeTlsStream<TcpStream>`; the wrapped sink accepts `Message` values.
/// The inner sink is exposed as the public tuple field.
#[derive(Debug)]
pub struct ClientSink(
    pub SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
);
15
16impl futures_util::Sink<Vec<u8>> for ClientSink {
17    type Error = Error;
18
19    fn poll_ready(
20        mut self: Pin<&mut Self>,
21        cx: &mut Context<'_>,
22    ) -> Poll<Result<(), Self::Error>> {
23        let sink = unsafe { Pin::new_unchecked(&mut self.0) };
24        let result = ready!(sink.poll_ready(cx));
25        match result {
26            Ok(_) => Poll::Ready(Ok(())),
27            Err(e) => Poll::Ready(Err(Error::Other(Box::new(e)))),
28        }
29    }
30
31    fn start_send(
32        mut self: Pin<&mut Self>,
33        item: Vec<u8>,
34    ) -> Result<(), Self::Error> {
35        let sink = unsafe { Pin::new_unchecked(&mut self.0) };
36        let result = sink.start_send(Message::binary(item));
37        match result {
38            Ok(_) => Ok(()),
39            Err(e) => Err(Error::Other(Box::new(e))),
40        }
41    }
42
43    fn poll_flush(
44        mut self: Pin<&mut Self>,
45        cx: &mut Context<'_>,
46    ) -> Poll<Result<(), Self::Error>> {
47        let sink = unsafe { Pin::new_unchecked(&mut self.0) };
48        let result = ready!(sink.poll_flush(cx));
49        match result {
50            Ok(_) => Poll::Ready(Ok(())),
51            Err(e) => Poll::Ready(Err(Error::Other(Box::new(e)))),
52        }
53    }
54
55    fn poll_close(
56        mut self: Pin<&mut Self>,
57        cx: &mut Context<'_>,
58    ) -> Poll<Result<(), Self::Error>> {
59        let sink = unsafe { Pin::new_unchecked(&mut self.0) };
60        let result = ready!(sink.poll_close(cx));
61        match result {
62            Ok(_) => Poll::Ready(Ok(())),
63            Err(e) => Poll::Ready(Err(Error::Other(Box::new(e)))),
64        }
65    }
66}
67
/// Client-side WebSocket stream wrapper, analogous to `WarpStream` but for
/// the client end of a connection.
///
/// Newtype around the `SplitStream` half of a `WebSocketStream` running over
/// a `MaybeTlsStream<TcpStream>`. The inner stream is exposed as the public
/// tuple field.
#[derive(Debug)]
pub struct ClientStream(
    pub SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
);
73
74impl Stream for ClientStream {
75    type Item = Result<Vec<u8>, Error>;
76
77    fn poll_next(
78        mut self: Pin<&mut Self>,
79        cx: &mut Context<'_>,
80    ) -> Poll<Option<Self::Item>> {
81        let stream = unsafe { Pin::new_unchecked(&mut self.0) };
82        let result = ready!(stream.poll_next(cx));
83        match result {
84            None => Poll::Ready(None),
85            Some(Ok(msg)) => {
86                // 处理不同类型的 WebSocket 消息
87                let bytes = match msg {
88                    Message::Binary(data) => data,
89                    Message::Text(text) => text.into_bytes(),
90                    Message::Close(_) => {
91                        return Poll::Ready(None);
92                    },
93                    Message::Ping(_) => {
94                        // 忽略 ping/pong 消息,继续处理下一个
95                        return self.poll_next(cx);
96                    },
97                    Message::Pong(_) => {
98                        // 忽略 ping/pong 消息,继续处理下一个
99                        return self.poll_next(cx);
100                    },
101                    _ => {
102                        return self.poll_next(cx);
103                    },
104                };
105                Poll::Ready(Some(Ok(bytes)))
106            },
107            Some(Err(e)) => Poll::Ready(Some(Err(Error::Other(Box::new(e))))),
108        }
109    }
110}