websocket_transport/
lib.rs

1//! See the type-level documentation for [`WsTransport`](struct.WsTransport.html).
2
3#![cfg_attr(feature = "strict", deny(warnings))]
4#![cfg_attr(feature = "strict", deny(missing_docs, missing_debug_implementations))]
5#![cfg_attr(feature = "clippy", feature(plugin))]
6#![cfg_attr(feature = "clippy", plugin(clippy))]
7#![cfg_attr(feature = "clippy", allow(doc_markdown))]
8#![doc(html_root_url = "https://docs.rs/websocket-transport/0.1.0")]
9
10#[macro_use]
11extern crate futures;
12extern crate websocket;
13
14use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream};
15use std::fmt;
16use std::str::Utf8Error;
17use websocket::message::OwnedMessage;
18
19/// An easy wrapper around an async WebSocket which implements
20/// [`Stream`](https://docs.rs/futures/0.1.15/futures/stream/trait.Stream.html)
21/// and
22/// [`Sink`](https://docs.rs/futures/0.1.15/futures/sink/trait.Sink.html)
23/// for `String`.
24///
25/// This type automatically takes care of:
26///
27/// - receiving and responding to `Ping`s, as the `Stream` is polled
28/// - attempting to convert `Binary` messages to UTF-8 `String`s
29///
30/// It can be wrapped around
31/// [`Client`](https://docs.rs/websocket/0.20.2/websocket/client/async/type.Client.html)
32/// or any other type which implements
33/// [`Stream`](https://docs.rs/futures/0.1.15/futures/stream/trait.Stream.html)
34/// and
35/// [`Sink`](https://docs.rs/futures/0.1.15/futures/sink/trait.Sink.html) for
36/// [`OwnedMessage`](https://docs.rs/websocket/0.20.2/websocket/message/enum.OwnedMessage.html).
37pub struct WsTransport<T> {
38    inner: T,
39    sending: Option<OwnedMessage>,
40    flushing: bool,
41}
42
43impl<T> WsTransport<T> {
44    /// Wrap around an inner async WebSocket transport.
45    ///
46    /// `T` can be
47    /// [`Client`](https://docs.rs/websocket/0.20.2/websocket/client/async/type.Client.html)
48    /// or any other type which implements
49    /// [`Stream`](https://docs.rs/futures/0.1.15/futures/stream/trait.Stream.html)
50    /// and
51    /// [`Sink`](https://docs.rs/futures/0.1.15/futures/sink/trait.Sink.html) for
52    /// [`OwnedMessage`](https://docs.rs/websocket/0.20.2/websocket/message/enum.OwnedMessage.html).
53    pub fn new(inner: T) -> Self {
54        WsTransport {
55            inner: inner,
56            sending: None,
57            flushing: false,
58        }
59    }
60}
61
62impl<T> fmt::Debug for WsTransport<T>
63where
64    T: fmt::Debug,
65{
66    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
67        f.debug_struct("WsTransport")
68            .field("ws", &Omitted)
69            .field("sending", &self.sending)
70            .field("flushing", &self.flushing)
71            .finish()
72    }
73}
74
75impl<T> Stream for WsTransport<T>
76where
77    T: Stream<Item = OwnedMessage>,
78    T: Sink<SinkItem = OwnedMessage, SinkError = <T as Stream>::Error>,
79    T::Error: From<Utf8Error>,
80{
81    type Item = String;
82    type Error = T::Error;
83
84    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
85        loop {
86            if let Some(msg) = self.sending.take() {
87                if let AsyncSink::NotReady(msg) = self.inner.start_send(msg)? {
88                    self.sending = Some(msg);
89                    return Ok(Async::NotReady);
90                }
91                self.flushing = true;
92            }
93
94            if self.flushing {
95                try_ready!(self.inner.poll_complete());
96                self.flushing = false;
97            }
98
99            let item = try_ready!(self.inner.poll());
100            match item {
101                None => return Ok(None.into()),
102                Some(OwnedMessage::Text(text)) => {
103                    return Ok(Some(text).into());
104                }
105                Some(OwnedMessage::Binary(bytes)) => {
106                    let text = String::from_utf8(bytes).map_err(|err| err.utf8_error().into())?;
107                    return Ok(Some(text).into());
108                }
109                Some(OwnedMessage::Ping(data)) => {
110                    self.sending = Some(OwnedMessage::Pong(data));
111                }
112                Some(OwnedMessage::Close(_)) | Some(OwnedMessage::Pong(_)) => (),
113            }
114        }
115    }
116}
117
118impl<T> Sink for WsTransport<T>
119where
120    T: Sink<SinkItem = OwnedMessage>,
121{
122    type SinkItem = String;
123    type SinkError = T::SinkError;
124
125    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
126        Ok(match self.inner.start_send(OwnedMessage::Text(item))? {
127            AsyncSink::Ready => AsyncSink::Ready,
128            AsyncSink::NotReady(msg) => AsyncSink::NotReady(match msg {
129                OwnedMessage::Text(item) => item,
130                _ => unreachable!("websocket-transport: inner Sink broke its contract"),
131            }),
132        })
133    }
134
135    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
136        self.inner.poll_complete()
137    }
138
139    fn close(&mut self) -> Poll<(), Self::SinkError> {
140        self.inner.close()
141    }
142}
143
/// Placeholder whose `Debug` representation is the literal text `...`.
///
/// It stands in for a value that should not be printed in debug output.
struct Omitted;

impl fmt::Debug for Omitted {
    /// Writes `...` regardless of any formatting flags.
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        formatter.write_str("...")
    }
}