1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
mod config;
mod stream;

use std::io;
use std::net::ToSocketAddrs;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::channel::mpsc::{self, UnboundedSender};
use futures::future::{self, Either, Future, FutureExt, TryFutureExt};
use futures::sink::SinkExt;
use futures::stream::{Stream, StreamExt};
use native_tls::TlsConnector;
use tokio::net::TcpStream;
use tokio_tls::TlsConnector as TokioTlsConnector;
use tokio_util::codec::{Decoder, LinesCodecError};

use crate::client::stream::ClientStream;
use crate::proto::{Command, IrcCodec, IrcError, Message};

pub use self::config::Config;

/// An error that could arise from connecting or running the client.
///
/// Every variant wraps an underlying error type and is convertible from it
/// via `From` (generated by `#[from]`), so the `?` operator can be used on
/// any of the wrapped error types inside functions returning [`ClientError`].
#[derive(Debug, Error)]
pub enum ClientError {
    /// An I/O error, e.g. from resolving the host or opening the TCP connection
    #[error("io error: {0}")]
    Io(#[from] io::Error),

    /// An error reported by the `native_tls` backend
    #[error("tls error: {0}")]
    Tls(#[from] native_tls::Error),

    /// An IRC protocol error reported by the `proto` module
    #[error("protocol error: {0}")]
    Proto(#[from] IrcError),

    /// Queueing a message on the internal mpsc channel failed
    #[error("mpsc error: {0}")]
    Send(#[from] mpsc::SendError),

    /// An error from the underlying line-based codec
    #[error("line codec error: {0}")]
    LinesCodec(#[from] LinesCodecError),
}

/// Module-local shorthand for a `Result` whose error type is [`ClientError`].
type Result<T> = std::result::Result<T, ClientError>;

/// An async IRC client
///
/// The client is itself a [`Stream`] of incoming messages. Outgoing messages
/// are queued on an internal unbounded channel; they are only written to the
/// connection while the future returned alongside the client by
/// `Client::with_config` is being polled.
pub struct Client {
    /// Configuration the client was created with (host, port, nick, ...)
    config: Config,
    /// Incoming messages from the server, with PING messages already filtered
    /// out and answered
    stream: Pin<Box<dyn Stream<Item = Result<Message>>>>,
    /// Sending half of the channel of outgoing messages
    tx: UnboundedSender<Message>,
}

/// Boxed future returned by `Client::with_config` that forwards queued
/// outgoing messages to the server connection.
///
/// It must be polled (e.g. spawned on an executor) for anything to be sent;
/// it resolves to an error if forwarding to the connection fails.
pub type ClientFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;

impl Client {
    /// Create a new client with the specified config
    pub async fn with_config(config: Config) -> Result<(Self, ClientFuture)> {
        let mut addrs = (config.host.as_ref(), config.port).to_socket_addrs()?;
        let stream = TcpStream::connect(addrs.next().unwrap()).await?;

        let stream = if config.ssl {
            let connector: TokioTlsConnector = TlsConnector::new().unwrap().into();
            let stream = connector.connect(&config.host, stream).await?;
            ClientStream::Tls(stream)
        } else {
            ClientStream::Plain(stream)
        };

        let stream = IrcCodec::default().framed(stream);
        let (sink, stream) = stream.split();
        let (tx, filter_rx) = mpsc::unbounded();
        let filter_tx = tx.clone();

        let stream = stream.filter_map(move |message| {
            if let Ok(Message {
                command: Command::PING(code, _),
                ..
            }) = message
            {
                let mut filter_tx = filter_tx.clone();
                Either::Left(async move {
                    match filter_tx
                        .send(Message {
                            tags: None,
                            prefix: None,
                            command: Command::PONG(code, None),
                        })
                        .await
                    {
                        Ok(_) => None,
                        Err(err) => Some(Err(ClientError::from(err))),
                    }
                })
            } else {
                Either::Right(future::ready(Some(message.map_err(ClientError::from))))
            }
        });

        let fut = filter_rx
            .map(Ok)
            .forward(sink)
            .map_err(ClientError::from)
            .boxed();

        let client = Client {
            config,
            stream: stream.boxed(),
            tx,
        };
        Ok((client, fut))
    }

    /// Send the client registration information to the server
    pub async fn register(&mut self) -> Result<()> {
        self.send(Message {
            tags: None,
            prefix: None,
            command: Command::NICK(self.config.nick.clone()),
        })
        .await?;
        self.send(Message {
            tags: None,
            prefix: None,
            command: Command::USER(
                self.config.nick.clone(),
                self.config.nick.clone(),
                self.config.nick.clone(),
            ),
        })
        .await
    }

    /// Send a Message to the server
    pub async fn send(&mut self, message: Message) -> Result<()> {
        self.tx.send(message).await?;
        self.tx.flush().await?;
        Ok(())
    }
}

/// Yields the incoming messages of the wrapped connection stream.
impl Stream for Client {
    type Item = Result<Message>;

    /// Delegates polling to the boxed inner stream.
    fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        // The inner stream is already pinned on the heap, so it can be
        // re-borrowed as a `Pin<&mut dyn Stream>` and polled directly.
        this.stream.as_mut().poll_next(context)
    }
}