websocket_transport/
lib.rs1#![cfg_attr(feature = "strict", deny(warnings))]
4#![cfg_attr(feature = "strict", deny(missing_docs, missing_debug_implementations))]
5#![cfg_attr(feature = "clippy", feature(plugin))]
6#![cfg_attr(feature = "clippy", plugin(clippy))]
7#![cfg_attr(feature = "clippy", allow(doc_markdown))]
8#![doc(html_root_url = "https://docs.rs/websocket-transport/0.1.0")]
9
10#[macro_use]
11extern crate futures;
12extern crate websocket;
13
14use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream};
15use std::fmt;
16use std::str::Utf8Error;
17use websocket::message::OwnedMessage;
18
19pub struct WsTransport<T> {
38 inner: T,
39 sending: Option<OwnedMessage>,
40 flushing: bool,
41}
42
43impl<T> WsTransport<T> {
44 pub fn new(inner: T) -> Self {
54 WsTransport {
55 inner: inner,
56 sending: None,
57 flushing: false,
58 }
59 }
60}
61
62impl<T> fmt::Debug for WsTransport<T>
63where
64 T: fmt::Debug,
65{
66 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
67 f.debug_struct("WsTransport")
68 .field("ws", &Omitted)
69 .field("sending", &self.sending)
70 .field("flushing", &self.flushing)
71 .finish()
72 }
73}
74
75impl<T> Stream for WsTransport<T>
76where
77 T: Stream<Item = OwnedMessage>,
78 T: Sink<SinkItem = OwnedMessage, SinkError = <T as Stream>::Error>,
79 T::Error: From<Utf8Error>,
80{
81 type Item = String;
82 type Error = T::Error;
83
84 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
85 loop {
86 if let Some(msg) = self.sending.take() {
87 if let AsyncSink::NotReady(msg) = self.inner.start_send(msg)? {
88 self.sending = Some(msg);
89 return Ok(Async::NotReady);
90 }
91 self.flushing = true;
92 }
93
94 if self.flushing {
95 try_ready!(self.inner.poll_complete());
96 self.flushing = false;
97 }
98
99 let item = try_ready!(self.inner.poll());
100 match item {
101 None => return Ok(None.into()),
102 Some(OwnedMessage::Text(text)) => {
103 return Ok(Some(text).into());
104 }
105 Some(OwnedMessage::Binary(bytes)) => {
106 let text = String::from_utf8(bytes).map_err(|err| err.utf8_error().into())?;
107 return Ok(Some(text).into());
108 }
109 Some(OwnedMessage::Ping(data)) => {
110 self.sending = Some(OwnedMessage::Pong(data));
111 }
112 Some(OwnedMessage::Close(_)) | Some(OwnedMessage::Pong(_)) => (),
113 }
114 }
115 }
116}
117
118impl<T> Sink for WsTransport<T>
119where
120 T: Sink<SinkItem = OwnedMessage>,
121{
122 type SinkItem = String;
123 type SinkError = T::SinkError;
124
125 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
126 Ok(match self.inner.start_send(OwnedMessage::Text(item))? {
127 AsyncSink::Ready => AsyncSink::Ready,
128 AsyncSink::NotReady(msg) => AsyncSink::NotReady(match msg {
129 OwnedMessage::Text(item) => item,
130 _ => unreachable!("websocket-transport: inner Sink broke its contract"),
131 }),
132 })
133 }
134
135 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
136 self.inner.poll_complete()
137 }
138
139 fn close(&mut self) -> Poll<(), Self::SinkError> {
140 self.inner.close()
141 }
142}
143
144struct Omitted;
145
146impl fmt::Debug for Omitted {
147 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
148 write!(f, "...")
149 }
150}