1pub mod conn;
2
3use std::io::Error as IoError;
4use std::pin::Pin;
5
6use futures::stream::Stream;
7use futures::sink::Sink;
8use futures::task::{Context, Poll};
9
10use super::proto::{
11 transport::{MessageTransport, MessageTransportError},
12 message::Message
13};
14use conn::Connection;
15
16pub struct Client {
21 conn: MessageTransport<Connection>,
22}
23
24impl Stream for Client {
25 type Item = Result<Message, MessageTransportError>;
26
27 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
28 Pin::new(&mut self.conn).poll_next(cx)
29 }
30}
31
32impl Sink<Message> for Client {
33 type Error = MessageTransportError;
34
35 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
36 Pin::new(&mut self.conn).poll_ready(cx)
37 }
38
39 fn start_send(mut self: Pin<&mut Self>, msg: Message) -> Result<(), Self::Error> {
40 Pin::new(&mut self.conn).start_send(msg)
41 }
42
43 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
44 Pin::new(&mut self.conn).poll_flush(cx)
45 }
46
47 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
48 Pin::new(&mut self.conn).poll_close(cx)
49 }
50}
51
52impl Client {
53 pub async fn new<S: AsRef<str> + ?Sized>(dest: &S) -> Result<Client, IoError> {
59 Ok(Client {
60 conn: MessageTransport::new(Connection::unsecure(dest).await?),
61 })
62 }
63
64 #[cfg(feature = "tls")]
70 pub async fn new_secure<S: AsRef<str> + ?Sized>(dest: &S) -> Result<Client, IoError> {
71 Ok(Client {
72 conn: MessageTransport::new(Connection::secure(dest).await?),
73 })
74 }
75
76 #[deprecated(since="0.2.0", note="use the `Stream`/`Sink` API instead")]
78 #[allow(deprecated)]
79 pub async fn read_message(&mut self) -> Result<Message, MessageTransportError> {
80 self.conn.read_message().await
81 }
82
83 #[deprecated(since="0.2.0", note="use the `Stream`/`Sink` API instead")]
85 #[allow(deprecated)]
86 pub async fn write_message(&mut self, m: Message) -> Result<(), MessageTransportError> {
87 self.conn.write_message(m).await
88 }
89}