1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
//! The client module contains all types needed to make a connection
//! to a remote IRC host.

use codec;
use error::{Error, ErrorKind};

use futures::{Future, Sink, Stream, Poll, StartSend, Async};

use pircolate::Message;
use pircolate::message;

use tokio_core::reactor::Handle;
use tokio_core::net::{TcpStream, TcpStreamNew};

use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::codec::Framed;

#[cfg(feature = "tls")]
use tokio_tls::{ConnectAsync, TlsConnectorExt, TlsStream};
#[cfg(feature = "tls")]
use native_tls::TlsConnector;

use std::net::SocketAddr;
use std::time;

const PING_TIMEOUT_IN_SECONDS: u64 = 10 * 60;

/// A light-weight client type for establishing connections to remote servers.
///
/// A `Client` only stores the `SocketAddr` of the remote host it was created
/// with. Its connection methods use that address to open either an
/// unencrypted connection or, when the `tls` feature is enabled, a TLS
/// encrypted connection.
///
/// Each of the connection methods returns a future that, when successfully
/// resolved, provides a `Stream` that allows for communication with the
/// remote server.
///
/// `Client` implements `Debug` and `Clone` so that it can be logged and
/// cheaply duplicated by callers.
#[derive(Debug, Clone)]
pub struct Client {
    // Address of the remote IRC host that the connection methods dial.
    host: SocketAddr,
}

impl Client {
    /// Builds a `Client` for the given remote host.
    ///
    /// `host` may be any value that converts into a `SocketAddr`; the
    /// converted address is stored and used by the connection methods.
    pub fn new<H: Into<SocketAddr>>(host: H) -> Client {
        let host = host.into();

        Client { host: host }
    }

    /// Starts an unencrypted TCP connection to the stored host.
    ///
    /// The returned future resolves to a transport that is both a `Stream`
    /// of `Message` received from the server and a `Sink` for `Message` sent
    /// to the server. The transport can be `split` into those two halves.
    ///
    /// `handle` is the reactor handle the TCP connection is created on.
    pub fn connect(&self, handle: &Handle) -> ClientConnectFuture {
        ClientConnectFuture { inner: TcpStream::connect(&self.host, handle) }
    }

    /// Starts a TLS encrypted connection to the stored host.
    ///
    /// The returned future resolves to a transport that is both a `Stream`
    /// of `Message` received from the server and a `Sink` for `Message` sent
    /// to the server. The transport can be `split` into those two halves.
    ///
    /// `handle` is the reactor handle the TCP connection is created on.
    ///
    /// `domain` is the domain name of the remote server being connected to.
    /// It is required to validate the security of the connection.
    ///
    /// If the TLS connector cannot be built, no TCP connection is attempted
    /// and the returned future starts out holding that error.
    #[cfg(feature = "tls")]
    pub fn connect_tls<D: Into<String>>(&self,
                                        handle: &Handle,
                                        domain: D)
                                        -> ClientConnectTlsFuture {
        // Both the builder creation and the final build step can fail; fold
        // them into a single result so there is one error path to handle.
        let connector = TlsConnector::builder().and_then(|builder| builder.build());

        match connector {
            Ok(connector) => {
                let pending_tcp = TcpStream::connect(&self.host, handle);

                ClientConnectTlsFuture::TcpConnecting(pending_tcp, connector, domain.into())
            }
            Err(err) => ClientConnectTlsFuture::TlsErr(ErrorKind::Tls(err).into()),
        }
    }
}

/// Represents a future that, when resolved, provides an unencrypted `Stream`
/// that can be used to receive `Message` from the server and send `Message`
/// to the server.
///
/// Values of this type are returned by `Client::connect`.
pub struct ClientConnectFuture {
    // The in-progress TCP connection attempt that this future drives.
    inner: TcpStreamNew,
}

// Drives the pending TCP connection; once the socket is connected it is
// framed with the IRC codec and wrapped in an `IrcTransport`.
impl Future for ClientConnectFuture {
    type Item = IrcTransport<TcpStream>;
    type Error = Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // `?` converts the connection error into this crate's `Error`.
        match self.inner.poll()? {
            Async::Ready(tcp_stream) => {
                let transport = IrcTransport::new(tcp_stream.framed(codec::IrcCodec));

                Ok(Async::Ready(transport))
            }
            Async::NotReady => Ok(Async::NotReady),
        }
    }
}

/// Represents a future that, when resolved, provides a TLS encrypted `Stream`
/// that can be used to receive `Message` from the server and send `Message`
/// to the server.
///
/// Values of this type are returned by `Client::connect_tls`. The variants
/// are the internal states of the connection attempt and are hidden from the
/// generated documentation.
#[cfg(feature = "tls")]
pub enum ClientConnectTlsFuture {
    // The TLS connector could not be created; holds the error to report.
    #[doc(hidden)]
    TlsErr(Error),
    // Waiting for the TCP connection; also carries the connector and the
    // domain name that are needed to start the TLS handshake afterwards.
    #[doc(hidden)]
    TcpConnecting(TcpStreamNew, TlsConnector, String),
    // TCP is connected; waiting for the TLS handshake to finish.
    #[doc(hidden)]
    TlsHandshake(ConnectAsync<TcpStream>),
}

// This future is represented internally as a simple state machine.
// The state machine can either be in error, waiting for a TCP connection to
// fully resolve or error out, or waiting for a TLS handshake to fully resolve
// or error out.  The various error types are all converted to this crate's
// own error representation.
//
// The typical transition is that this future will first resolve an open TCP
// socket, which is then used to establish a TLS connection via a handshake
// to the remote server. If at any point any of these futures fail to resolve
// an error is produced by this future.
//
// Due to the way the underlying TLS library works, which requires a
// `TlsConnector` to be created, an operation that can possibly fail, this
// future may start in an error state and will immediately resolve with that
// error on the next call to `poll`.
//
// When the TCP connection resolves, the state is switched to the TLS
// handshake and the handshake is polled right away. Returning `NotReady`
// at that point without polling it would leave the current task without any
// registered wake-up, so the future could stall forever.
#[cfg(feature = "tls")]
impl Future for ClientConnectTlsFuture {
    type Item = IrcTransport<TlsStream<TcpStream>>;
    type Error = Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        use self::ClientConnectTlsFuture::*;

        let connect_async = match *self {
            TlsErr(ref mut error) => {
                // The error has to be moved out of `self`; leave a placeholder
                // error behind in its place.
                let error = ::std::mem::replace(error, ErrorKind::Unexpected.into());
                return Err(error);
            }

            TlsHandshake(ref mut tls_connect_future) => {
                let framed = try_ready!(tls_connect_future.poll()).framed(codec::IrcCodec);
                let irc_transport = IrcTransport::new(framed);

                return Ok(Async::Ready(irc_transport));
            }

            TcpConnecting(ref mut tcp_connect_future, ref mut tls_connector, ref domain) => {
                // `try_ready!` returns early while the socket is still
                // connecting; the TCP future has then registered the task.
                let tcp_stream = try_ready!(tcp_connect_future.poll());
                tls_connector.connect_async(&domain, tcp_stream)
            }
        };

        *self = TlsHandshake(connect_async);

        // The handshake future has not been polled yet, so nothing would wake
        // this task if `NotReady` were returned here. Poll the new state
        // immediately; the state is now `TlsHandshake`, so this recurses at
        // most once.
        self.poll()
    }
}

/// `IrcTransport` represents a framed IRC stream returned from the connection
/// methods when their given futures are resolved. It internally handles the
/// processing of PING requests and timing out the connection when no PINGs
/// have been recently received from the server.
///
/// It is possible to split `IrcTransport` into `Stream` and `Sink` via the
/// `split` method.
pub struct IrcTransport<T>
    where T: AsyncRead + AsyncWrite
{
    // The underlying I/O object framed with the IRC codec.
    inner: Framed<T, codec::IrcCodec>,
    // When the transport was created or, after that, when the most recent
    // PING was received; used for the ping timeout check.
    last_ping: time::Instant,
}

impl<T> IrcTransport<T>
    where T: AsyncRead + AsyncWrite
{
    // Wraps an already framed connection. The ping timer starts counting
    // from the moment the transport is created.
    fn new(inner: Framed<T, codec::IrcCodec>) -> IrcTransport<T> {
        let created_at = time::Instant::now();

        IrcTransport { inner: inner, last_ping: created_at }
    }
}

// Yields messages received from the server. PING messages are answered with
// a PONG internally and are never handed to the caller.
//
// If no PING has been seen for `PING_TIMEOUT_IN_SECONDS`, the transport is
// closed and a `ConnectionReset` error is returned. The check only runs when
// `poll` is called.
//
// Panics if the underlying sink does not accept the PONG immediately.
impl<T> Stream for IrcTransport<T>
    where T: AsyncRead + AsyncWrite
{
    type Item = Message;
    type Error = Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let timed_out = self.last_ping.elapsed().as_secs() >= PING_TIMEOUT_IN_SECONDS;
        if timed_out {
            self.close()?;
            return Err(ErrorKind::ConnectionReset.into());
        }

        loop {
            let incoming = try_ready!(self.inner.poll());

            // Anything that is not a PING (including end of stream) is
            // passed straight through to the caller.
            let is_ping = incoming
                .as_ref()
                .map_or(false, |msg| msg.raw_command() == "PING");
            if !is_ping {
                return Ok(Async::Ready(incoming));
            }

            self.last_ping = time::Instant::now();

            // A PING without any argument refreshes the timer but gets no reply.
            let target = incoming.as_ref().and_then(|msg| msg.raw_args().next());
            if let Some(host) = target {
                let pong = message::client::pong(host)?;
                let sent = self.inner.start_send(pong)?;

                assert!(sent.is_ready());

                self.inner.poll_complete()?;
            }
        }
    }
}

impl<T> Sink for IrcTransport<T>
    where T: AsyncRead + AsyncWrite
{
    type SinkItem = Message;
    type SinkError = Error;

    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
        Ok(self.inner.start_send(item)?)
    }

    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        Ok(self.inner.poll_complete()?)
    }
}