parity_ws/
communication.rs

1use std::borrow::Cow;
2use std::convert::Into;
3
4use mio;
5use mio::Token;
6use mio_extras::timer::Timeout;
7use url;
8
9use io::ALL;
10use message;
11use protocol::CloseCode;
12use result::{Error, Result};
13use std::cmp::PartialEq;
14use std::hash::{Hash, Hasher};
15use std::fmt;
16
17#[derive(Debug, Clone)]
18pub enum Signal {
19    Message(message::Message),
20    Close(CloseCode, Cow<'static, str>),
21    Ping(Vec<u8>),
22    Pong(Vec<u8>),
23    Connect(url::Url),
24    Shutdown,
25    Timeout { delay: u64, token: Token },
26    Cancel(Timeout),
27}
28
29#[derive(Debug, Clone)]
30pub struct Command {
31    token: Token,
32    signal: Signal,
33    connection_id: u32,
34}
35
36impl Command {
37    pub fn token(&self) -> Token {
38        self.token
39    }
40
41    pub fn into_signal(self) -> Signal {
42        self.signal
43    }
44
45    pub fn connection_id(&self) -> u32 {
46        self.connection_id
47    }
48}
49
50/// A representation of the output of the WebSocket connection. Use this to send messages to the
51/// other endpoint.
52#[derive(Clone)]
53pub struct Sender {
54    token: Token,
55    channel: mio::channel::SyncSender<Command>,
56    connection_id: u32,
57}
58
59impl fmt::Debug for Sender {
60    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
61        write!(f,
62            "Sender {{ token: {:?}, channel: mio::channel::SyncSender<Command>, connection_id: {:?} }}",
63            self.token, self.connection_id)
64    }
65}
66
67impl PartialEq for Sender {
68    fn eq(&self, other: &Sender) -> bool {
69        self.token == other.token && self.connection_id == other.connection_id
70    }
71}
72
73impl Eq for Sender { }
74
75impl Hash for Sender {
76    fn hash<H: Hasher>(&self, state: &mut H) {
77        self.connection_id.hash(state);
78        self.token.hash(state);
79    }
80}
81
82
83impl Sender {
84    #[doc(hidden)]
85    #[inline]
86    pub fn new(
87        token: Token,
88        channel: mio::channel::SyncSender<Command>,
89        connection_id: u32,
90    ) -> Sender {
91        Sender {
92            token,
93            channel,
94            connection_id,
95        }
96    }
97
98    /// A Token identifying this sender within the WebSocket.
99    #[inline]
100    pub fn token(&self) -> Token {
101        self.token
102    }
103
104    /// A connection_id identifying this sender within the WebSocket.
105    #[inline]
106    pub fn connection_id(&self) -> u32 {
107        self.connection_id
108    }
109
110    /// Send a message over the connection.
111    #[inline]
112    pub fn send<M>(&self, msg: M) -> Result<()>
113    where
114        M: Into<message::Message>,
115    {
116        self.channel
117            .send(Command {
118                token: self.token,
119                signal: Signal::Message(msg.into()),
120                connection_id: self.connection_id,
121            })
122            .map_err(Error::from)
123    }
124
125    /// Send a message to the endpoints of all connections.
126    ///
127    /// Be careful with this method. It does not discriminate between client and server connections.
128    /// If your WebSocket is only functioning as a server, then usage is simple, this method will
129    /// send a copy of the message to each connected client. However, if you have a WebSocket that
130    /// is listening for connections and is also connected to another WebSocket, this method will
131    /// broadcast a copy of the message to all the clients connected and to that WebSocket server.
132    #[inline]
133    pub fn broadcast<M>(&self, msg: M) -> Result<()>
134    where
135        M: Into<message::Message>,
136    {
137        self.channel
138            .send(Command {
139                token: ALL,
140                signal: Signal::Message(msg.into()),
141                connection_id: self.connection_id,
142            })
143            .map_err(Error::from)
144    }
145
146    /// Send a close code to the other endpoint.
147    #[inline]
148    pub fn close(&self, code: CloseCode) -> Result<()> {
149        self.channel
150            .send(Command {
151                token: self.token,
152                signal: Signal::Close(code, "".into()),
153                connection_id: self.connection_id,
154            })
155            .map_err(Error::from)
156    }
157
158    /// Send a close code and provide a descriptive reason for closing.
159    #[inline]
160    pub fn close_with_reason<S>(&self, code: CloseCode, reason: S) -> Result<()>
161    where
162        S: Into<Cow<'static, str>>,
163    {
164        self.channel
165            .send(Command {
166                token: self.token,
167                signal: Signal::Close(code, reason.into()),
168                connection_id: self.connection_id,
169            })
170            .map_err(Error::from)
171    }
172
173    /// Send a ping to the other endpoint with the given test data.
174    #[inline]
175    pub fn ping(&self, data: Vec<u8>) -> Result<()> {
176        self.channel
177            .send(Command {
178                token: self.token,
179                signal: Signal::Ping(data),
180                connection_id: self.connection_id,
181            })
182            .map_err(Error::from)
183    }
184
185    /// Send a pong to the other endpoint responding with the given test data.
186    #[inline]
187    pub fn pong(&self, data: Vec<u8>) -> Result<()> {
188        self.channel
189            .send(Command {
190                token: self.token,
191                signal: Signal::Pong(data),
192                connection_id: self.connection_id,
193            })
194            .map_err(Error::from)
195    }
196
197    /// Queue a new connection on this WebSocket to the specified URL.
198    #[inline]
199    pub fn connect(&self, url: url::Url) -> Result<()> {
200        self.channel
201            .send(Command {
202                token: self.token,
203                signal: Signal::Connect(url),
204                connection_id: self.connection_id,
205            })
206            .map_err(Error::from)
207    }
208
209    /// Request that all connections terminate and that the WebSocket stop running.
210    #[inline]
211    pub fn shutdown(&self) -> Result<()> {
212        self.channel
213            .send(Command {
214                token: self.token,
215                signal: Signal::Shutdown,
216                connection_id: self.connection_id,
217            })
218            .map_err(Error::from)
219    }
220
221    /// Schedule a `token` to be sent to the WebSocket Handler's `on_timeout` method
222    /// after `ms` milliseconds
223    #[inline]
224    pub fn timeout(&self, ms: u64, token: Token) -> Result<()> {
225        self.channel
226            .send(Command {
227                token: self.token,
228                signal: Signal::Timeout { delay: ms, token },
229                connection_id: self.connection_id,
230            })
231            .map_err(Error::from)
232    }
233
234    /// Queue the cancellation of a previously scheduled timeout.
235    ///
236    /// This method is not guaranteed to prevent the timeout from occurring, because it is
237    /// possible to call this method after a timeout has already occurred. It is still necessary to
238    /// handle spurious timeouts.
239    #[inline]
240    pub fn cancel(&self, timeout: Timeout) -> Result<()> {
241        self.channel
242            .send(Command {
243                token: self.token,
244                signal: Signal::Cancel(timeout),
245                connection_id: self.connection_id,
246            })
247            .map_err(Error::from)
248    }
249}