1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
#![feature(async_await, async_closure)] use failure::{err_msg, Error}; use futures::Stream as _; use futures3::compat::{Future01CompatExt, Sink01CompatExt, Stream01CompatExt}; use futures3::{Sink, SinkExt, Stream, StreamExt}; use serde_json::Value; use std::borrow::Borrow; use std::pin::Pin; use tokio_tungstenite::connect_async; use tungstenite::error::Error as WsError; use tungstenite::Message; use url::Url; pub struct WebSocket { sink: Pin<Box<dyn Sink<Message, Error = WsError> + Send>>, stream: Pin<Box<dyn Stream<Item = Result<Message, WsError>> + Send>>, } impl WebSocket { pub async fn connect(url: impl AsRef<str>) -> Result<Self, Error> { let url = Url::parse(url.as_ref())?; let (ws_stream, _) = connect_async(url).compat().await?; let (sink, stream) = ws_stream.split(); let (sink, stream) = (sink.sink_compat(), stream.compat()); let (sink, stream) = (Box::pin(sink), Box::pin(stream)); Ok(Self { sink, stream }) } pub async fn send(&mut self, value: impl Borrow<Value>) -> Result<(), Error> { let text = serde_json::to_string(value.borrow())?; let msg = Message::Text(text); self.sink.send(msg).await?; Ok(()) } pub async fn recv(&mut self) -> Result<Value, Error> { loop { let msg = self.stream.next().await; let msg = msg.ok_or_else(|| err_msg("websocket stream ended"))??; match msg { Message::Text(text) => { let value = serde_json::from_str(&text)?; return Ok(value); } Message::Binary(data) => { let value = serde_json::from_slice(&data)?; return Ok(value); } Message::Ping(_) | Message::Pong(_) => {} Message::Close(_) => { return Err(err_msg("wsbsocket closed")); } } } } }