cross_websocket/
native.rs

1use std::borrow::Cow;
2
3use futures::sink::SinkMapErr;
4use futures::stream::{Map, SplitSink, SplitStream};
5use tokio::net::TcpStream;
6use tokio_tungstenite::tungstenite::{Error, Message};
7use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
8
9use crate::{SinkError, WebSocketClient};
10
11use eyre::Result;
12use futures::future::{ready, Ready};
13use futures::prelude::*;
14
15type Bytes = Vec<u8>;
16
17pub type Rx = Map<
18    SplitStream<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>>,
19    fn(Result<Message, Error>) -> Result<Bytes>,
20>;
21
22pub type Tx = sink::With<
23    SinkMapErr<
24        SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
25        fn(Error) -> SinkError,
26    >,
27    Message,
28    Bytes,
29    Ready<Result<Message, SinkError>>,
30    fn(Bytes) -> Ready<Result<Message, SinkError>>,
31>;
32
33pub async fn connect<'a, T: Into<Cow<'a, str>>>(addr: T) -> Result<WebSocketClient<Tx, Rx>> {
34    let str: Cow<'a, str> = addr.into();
35    let (stream, _response) = connect_async(str.as_ref()).await?;
36    let (tx, rx) = stream.split();
37    let rx = rx.map(bytes as fn(_) -> _);
38    let tx = tx
39        .sink_map_err(err as fn(_) -> _)
40        .with(message as fn(_) -> _);
41
42    Ok(WebSocketClient { tx, rx })
43}
44
45fn bytes(message: Result<Message, Error>) -> Result<Bytes> {
46    Ok(message?.into_data())
47}
48
49fn err(err: Error) -> SinkError {
50    SinkError::Send(err.to_string())
51}
52
53fn message(bytes: Bytes) -> Ready<Result<Message, SinkError>> {
54    ready(Ok(Message::binary(bytes)))
55}