// irc_async/client/mod.rs

1mod config;
2mod stream;
3
4use std::io;
5use std::net::ToSocketAddrs;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use futures::channel::mpsc::{self, UnboundedSender};
10use futures::future::{self, Either, Future, FutureExt, TryFutureExt};
11use futures::sink::SinkExt;
12use futures::stream::{Stream, StreamExt};
13use native_tls::TlsConnector;
14use tokio::net::TcpStream;
15use tokio_tls::TlsConnector as TokioTlsConnector;
16use tokio_util::codec::{Decoder, LinesCodecError};
17
18use crate::client::stream::ClientStream;
19use crate::proto::{Command, IrcCodec, IrcError, Message};
20
21pub use self::config::Config;
22
23/// An error that could arise from running the client
24#[derive(Debug, Error)]
25pub enum ClientError {
26    /// IO error
27    #[error("io error: {0}")]
28    Io(#[from] io::Error),
29
30    /// Tls error
31    #[error("tls error: {0}")]
32    Tls(#[from] native_tls::Error),
33
34    /// Protocol error
35    #[error("protocol error: {0}")]
36    Proto(#[from] IrcError),
37
38    /// Mpsc send error
39    #[error("mpsc error: {0}")]
40    Send(#[from] mpsc::SendError),
41
42    /// Line codec error
43    #[error("line codec error: {0}")]
44    LinesCodec(#[from] LinesCodecError),
45}
46
47type Result<T> = std::result::Result<T, ClientError>;
48
49/// An async IRC client
50pub struct Client {
51    config: Config,
52    stream: Pin<Box<dyn Stream<Item = Result<Message>>>>,
53    tx: UnboundedSender<Message>,
54}
55
56pub type ClientFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
57
58impl Client {
59    /// Create a new client with the specified config
60    pub async fn with_config(config: Config) -> Result<(Self, ClientFuture)> {
61        let mut addrs = (config.host.as_ref(), config.port).to_socket_addrs()?;
62        let stream = TcpStream::connect(addrs.next().unwrap()).await?;
63
64        let stream = if config.ssl {
65            let connector: TokioTlsConnector = TlsConnector::new().unwrap().into();
66            let stream = connector.connect(&config.host, stream).await?;
67            ClientStream::Tls(stream)
68        } else {
69            ClientStream::Plain(stream)
70        };
71
72        let stream = IrcCodec::default().framed(stream);
73        let (sink, stream) = stream.split();
74        let (tx, filter_rx) = mpsc::unbounded();
75        let filter_tx = tx.clone();
76
77        let stream = stream.filter_map(move |message| {
78            if let Ok(Message {
79                command: Command::PING(code, _),
80                ..
81            }) = message
82            {
83                let mut filter_tx = filter_tx.clone();
84                Either::Left(async move {
85                    match filter_tx
86                        .send(Message {
87                            tags: None,
88                            prefix: None,
89                            command: Command::PONG(code, None),
90                        })
91                        .await
92                    {
93                        Ok(_) => None,
94                        Err(err) => Some(Err(ClientError::from(err))),
95                    }
96                })
97            } else {
98                Either::Right(future::ready(Some(message.map_err(ClientError::from))))
99            }
100        });
101
102        let fut = filter_rx
103            .map(Ok)
104            .forward(sink)
105            .map_err(ClientError::from)
106            .boxed();
107
108        let client = Client {
109            config,
110            stream: stream.boxed(),
111            tx,
112        };
113        Ok((client, fut))
114    }
115
116    /// Send the client registration information to the server
117    pub async fn register(&mut self) -> Result<()> {
118        self.send(Message {
119            tags: None,
120            prefix: None,
121            command: Command::NICK(self.config.nick.clone()),
122        })
123        .await?;
124        self.send(Message {
125            tags: None,
126            prefix: None,
127            command: Command::USER(
128                self.config.nick.clone(),
129                self.config.nick.clone(),
130                self.config.nick.clone(),
131            ),
132        })
133        .await
134    }
135
136    /// Send a Message to the server
137    pub async fn send(&mut self, message: Message) -> Result<()> {
138        self.tx.send(message).await?;
139        self.tx.flush().await?;
140        Ok(())
141    }
142}
143
144impl Stream for Client {
145    type Item = Result<Message>;
146
147    fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
148        Stream::poll_next(Pin::new(&mut self.get_mut().stream), context)
149    }
150}