1mod config;
2mod stream;
3
4use std::io;
5use std::net::ToSocketAddrs;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use futures::channel::mpsc::{self, UnboundedSender};
10use futures::future::{self, Either, Future, FutureExt, TryFutureExt};
11use futures::sink::SinkExt;
12use futures::stream::{Stream, StreamExt};
13use native_tls::TlsConnector;
14use tokio::net::TcpStream;
15use tokio_tls::TlsConnector as TokioTlsConnector;
16use tokio_util::codec::{Decoder, LinesCodecError};
17
18use crate::client::stream::ClientStream;
19use crate::proto::{Command, IrcCodec, IrcError, Message};
20
21pub use self::config::Config;
22
23#[derive(Debug, Error)]
25pub enum ClientError {
26 #[error("io error: {0}")]
28 Io(#[from] io::Error),
29
30 #[error("tls error: {0}")]
32 Tls(#[from] native_tls::Error),
33
34 #[error("protocol error: {0}")]
36 Proto(#[from] IrcError),
37
38 #[error("mpsc error: {0}")]
40 Send(#[from] mpsc::SendError),
41
42 #[error("line codec error: {0}")]
44 LinesCodec(#[from] LinesCodecError),
45}
46
47type Result<T> = std::result::Result<T, ClientError>;
48
49pub struct Client {
51 config: Config,
52 stream: Pin<Box<dyn Stream<Item = Result<Message>>>>,
53 tx: UnboundedSender<Message>,
54}
55
56pub type ClientFuture = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
57
58impl Client {
59 pub async fn with_config(config: Config) -> Result<(Self, ClientFuture)> {
61 let mut addrs = (config.host.as_ref(), config.port).to_socket_addrs()?;
62 let stream = TcpStream::connect(addrs.next().unwrap()).await?;
63
64 let stream = if config.ssl {
65 let connector: TokioTlsConnector = TlsConnector::new().unwrap().into();
66 let stream = connector.connect(&config.host, stream).await?;
67 ClientStream::Tls(stream)
68 } else {
69 ClientStream::Plain(stream)
70 };
71
72 let stream = IrcCodec::default().framed(stream);
73 let (sink, stream) = stream.split();
74 let (tx, filter_rx) = mpsc::unbounded();
75 let filter_tx = tx.clone();
76
77 let stream = stream.filter_map(move |message| {
78 if let Ok(Message {
79 command: Command::PING(code, _),
80 ..
81 }) = message
82 {
83 let mut filter_tx = filter_tx.clone();
84 Either::Left(async move {
85 match filter_tx
86 .send(Message {
87 tags: None,
88 prefix: None,
89 command: Command::PONG(code, None),
90 })
91 .await
92 {
93 Ok(_) => None,
94 Err(err) => Some(Err(ClientError::from(err))),
95 }
96 })
97 } else {
98 Either::Right(future::ready(Some(message.map_err(ClientError::from))))
99 }
100 });
101
102 let fut = filter_rx
103 .map(Ok)
104 .forward(sink)
105 .map_err(ClientError::from)
106 .boxed();
107
108 let client = Client {
109 config,
110 stream: stream.boxed(),
111 tx,
112 };
113 Ok((client, fut))
114 }
115
116 pub async fn register(&mut self) -> Result<()> {
118 self.send(Message {
119 tags: None,
120 prefix: None,
121 command: Command::NICK(self.config.nick.clone()),
122 })
123 .await?;
124 self.send(Message {
125 tags: None,
126 prefix: None,
127 command: Command::USER(
128 self.config.nick.clone(),
129 self.config.nick.clone(),
130 self.config.nick.clone(),
131 ),
132 })
133 .await
134 }
135
136 pub async fn send(&mut self, message: Message) -> Result<()> {
138 self.tx.send(message).await?;
139 self.tx.flush().await?;
140 Ok(())
141 }
142}
143
144impl Stream for Client {
145 type Item = Result<Message>;
146
147 fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
148 Stream::poll_next(Pin::new(&mut self.get_mut().stream), context)
149 }
150}