Skip to main content

twitch_irc/client/
mod.rs

1//! The chat client and its accompanying types.
2
3pub(crate) mod event_loop;
4mod pool_connection;
5
6use crate::client::event_loop::{ClientLoopCommand, ClientLoopWorker};
7use crate::config::ClientConfig;
8use crate::error::Error;
9use crate::login::LoginCredentials;
10use crate::message::IRCTags;
11use crate::message::commands::ServerMessage;
12use crate::message::{IRCMessage, ReplyToMessage};
13#[cfg(feature = "metrics-collection")]
14use crate::metrics::MetricsBundle;
15use crate::transport::Transport;
16use crate::validate::validate_login;
17use crate::{irc, validate};
18use std::collections::HashSet;
19use std::sync::Arc;
20use tokio::sync::{mpsc, oneshot};
21
22/// A send-only handle to control the Twitch IRC Client.
23#[derive(Debug)]
24pub struct TwitchIRCClient<T: Transport, L: LoginCredentials> {
25    // we use an Arc<>.
26    // the client loop has to also hold a handle to this sender to be able to feed itself
27    // with commands as well. (e.g. to rejoin channels)
28    // the client loop gets a Weak<> (a weak reference) and this client holds strong
29    // references. That means when the last client handle is dropped, the client loop
30    // exits, because the underlying mpsc::UnboundedSender will be dropped.
31    // The client will then also no longer be able to send "itself" messages, because
32    // it always only holds a Weak<> and has to check whether the weak reference is still
33    // valid before sending itself messages.
34    client_loop_tx: Arc<mpsc::UnboundedSender<ClientLoopCommand<T, L>>>,
35}
36
37// we have to implement Debug and Clone manually, the derive macro places
38// the requirement `T: Clone` which we cannot currently satisfy and don't need
39impl<T: Transport, L: LoginCredentials> Clone for TwitchIRCClient<T, L> {
40    fn clone(&self) -> Self {
41        TwitchIRCClient {
42            client_loop_tx: self.client_loop_tx.clone(),
43        }
44    }
45}
46
47impl<T: Transport, L: LoginCredentials> TwitchIRCClient<T, L> {
48    /// Create a new client from the given configuration.
49    ///
50    /// Note this method is not side-effect-free - a background task will be spawned
51    /// as a result of calling this function.
52    pub fn new(
53        config: ClientConfig<L>,
54    ) -> (
55        mpsc::UnboundedReceiver<ServerMessage>,
56        TwitchIRCClient<T, L>,
57    ) {
58        let config = Arc::new(config);
59        let (client_loop_tx, client_loop_rx) = mpsc::unbounded_channel();
60        let client_loop_tx = Arc::new(client_loop_tx);
61        let (client_incoming_messages_tx, client_incoming_messages_rx) = mpsc::unbounded_channel();
62
63        #[cfg(feature = "metrics-collection")]
64        let metrics = MetricsBundle::new(&config.metrics_config);
65
66        ClientLoopWorker::spawn(
67            config,
68            // the worker gets only a weak reference
69            Arc::downgrade(&client_loop_tx),
70            client_loop_rx,
71            client_incoming_messages_tx,
72            #[cfg(feature = "metrics-collection")]
73            metrics,
74        );
75
76        (
77            client_incoming_messages_rx,
78            TwitchIRCClient { client_loop_tx },
79        )
80    }
81}
82
83impl<T: Transport, L: LoginCredentials> TwitchIRCClient<T, L> {
84    /// Connect to Twitch IRC without joining any channels.
85    ///
86    /// **You typically do not need to call this method.** This is only provided for the rare
87    /// case that one would only want to receive incoming whispers without joining channels
88    /// or ever sending messages out. If your application joins channels during startup,
89    /// calling `.connect()` is superfluous, as the client will automatically open the necessary
90    /// connections when you join channels or send messages.
91    pub async fn connect(&self) {
92        let (return_tx, return_rx) = oneshot::channel();
93        self.client_loop_tx
94            .send(ClientLoopCommand::Connect {
95                return_sender: return_tx,
96            })
97            .unwrap();
98        // unwrap: ClientLoopWorker should not die before all sender handles have been dropped
99        return_rx.await.unwrap();
100    }
101
102    /// Send an arbitrary IRC message to one of the connections in the connection pool.
103    ///
104    /// An error is returned in case the message could not be sent over the picked connection.
105    pub async fn send_message(&self, message: IRCMessage) -> Result<(), Error<T, L>> {
106        let (return_tx, return_rx) = oneshot::channel();
107        self.client_loop_tx
108            .send(ClientLoopCommand::SendMessage {
109                message,
110                return_sender: return_tx,
111            })
112            .unwrap();
113        // unwrap: ClientLoopWorker should not die before all sender handles have been dropped
114        return_rx.await.unwrap()
115    }
116
117    /// Send a `PRIVMSG`-type IRC message to a Twitch channel. The `message` can be a normal
118    /// chat message or a chat command like `/ban` or similar. [Note however that the usage
119    /// of chat commands via IRC is deprecated and scheduled to be removed by
120    /// Twitch for 2023-02-18.](https://discuss.dev.twitch.tv/t/deprecation-of-chat-commands-through-irc/40486)
121    ///
122    /// If you want to just send a normal chat message, `say()` should be preferred since it
123    /// prevents commands like `/ban` from accidentally being executed.
124    pub async fn privmsg(&self, channel_login: String, message: String) -> Result<(), Error<T, L>> {
125        self.send_message(irc!["PRIVMSG", format!("#{}", channel_login), message])
126            .await
127    }
128
129    /// Say a chat message in the given Twitch channel.
130    ///
131    /// This method automatically prevents commands from being executed. For example
132    /// `say("a_channel", "/ban a_user")` would not actually ban a user, instead it would
133    /// send that exact message as a normal chat message instead.
134    ///
135    /// No particular filtering is performed on the message. If the message is too long for chat,
136    /// it will not be cut short or split into multiple messages (what happens is determined
137    /// by the behaviour of the Twitch IRC server).
138    pub async fn say(&self, channel_login: String, message: String) -> Result<(), Error<T, L>> {
139        self.privmsg(channel_login, format!(". {message}")).await
140    }
141
142    /// Say a `/me` chat message in the given Twitch channel. These messages are usually
143    /// shown in Twitch chat in italics or in the bot's name color, and without the colon
144    /// normally separating name and message, e.g.:
145    ///
146    /// ```no_run
147    /// # use twitch_irc::{SecureTCPTransport, TwitchIRCClient};
148    /// # use twitch_irc::login::StaticLoginCredentials;
149    /// # let client: TwitchIRCClient<SecureTCPTransport, StaticLoginCredentials> = todo!();
150    /// client.say("sodapoppin".to_owned(), "Hey guys!".to_owned());
151    /// // Displayed as: A_Cool_New_Bot: Hey guys!
152    /// client.me("sodapoppin".to_owned(), "is now leaving to grab a drink.".to_owned());
153    /// // Displayed as: *A_Cool_New_Bot is now leaving to grab a drink.*
154    /// ```
155    ///
156    /// This method automatically prevents commands from being executed. For example
157    /// `me("a_channel", "/ban a_user")` would not actually ban a user, instead it would
158    /// send that exact message as a normal chat message instead.
159    ///
160    /// No particular filtering is performed on the message. If the message is too long for chat,
161    /// it will not be cut short or split into multiple messages (what happens is determined
162    /// by the behaviour of the Twitch IRC server).
163    pub async fn me(&self, channel_login: String, message: String) -> Result<(), Error<T, L>> {
164        self.privmsg(channel_login, format!("/me {message}")).await
165    }
166
167    /// Reply to a given message. The sent message is tagged to be in reply of the
168    /// specified message, using that message's unique ID. The message is of course also
169    /// sent to same channel as the message that we are replying to.
170    ///
171    /// This method automatically prevents commands from being executed. For example
172    /// `say_in_reply_to(a_message, "/ban a_user")` would not actually ban a user,
173    /// instead it would send that exact message as a normal chat message instead.
174    ///
175    /// No particular filtering is performed on the message. If the message is too long for chat,
176    /// it will not be cut short or split into multiple messages (what happens is determined
177    /// by the behaviour of the Twitch IRC server).
178    ///
179    /// The given parameter can be anything that implements [`ReplyToMessage`], which can
180    /// be one of the following:
181    ///
182    /// * a [`&PrivmsgMessage`](crate::message::PrivmsgMessage)
183    /// * a tuple `(&str, &str)` or `(String, String)`, where the first member is the login name
184    ///   of the channel the message was sent to, and the second member is the ID of the message
185    ///   to reply to.
186    ///
187    /// Note that even though [`UserNoticeMessage`](crate::message::UserNoticeMessage) has a
188    /// `message_id`, you can NOT reply to these messages or delete them. For this reason,
189    /// [`ReplyToMessage`] is not implemented for
190    /// [`UserNoticeMessage`](crate::message::UserNoticeMessage).
191    pub async fn say_in_reply_to(
192        &self,
193        reply_to: &impl ReplyToMessage,
194        message: String,
195    ) -> Result<(), Error<T, L>> {
196        self.say_or_me_in_reply_to(reply_to, message, false).await
197    }
198
199    /// Reply to a given message with a `/me` message. The sent message is tagged to be in reply of
200    /// the specified message, using that message's unique ID. The message is of course also
201    /// sent to same channel as the message that we are replying to.
202    ///
203    /// See the documentation on the [`me()`](TwitchIRCClient::me) method for more details about
204    /// what `/me` messages are.
205    ///
206    /// This method automatically prevents commands from being executed. For example
207    /// `me_in_reply_to(a_message, "/ban a_user")` would not actually ban a user,
208    /// instead it would send that exact message as a normal chat message instead.
209    ///
210    /// No particular filtering is performed on the message. If the message is too long for chat,
211    /// it will not be cut short or split into multiple messages (what happens is determined
212    /// by the behaviour of the Twitch IRC server).
213    ///
214    /// The given parameter can be anything that implements [`ReplyToMessage`], which can
215    /// be one of the following:
216    ///
217    /// * a [`&PrivmsgMessage`](crate::message::PrivmsgMessage)
218    /// * a tuple `(&str, &str)` or `(String, String)`, where the first member is the login name
219    ///   of the channel the message was sent to, and the second member is the ID of the message
220    ///   to reply to.
221    ///
222    /// Note that even though [`UserNoticeMessage`](crate::message::UserNoticeMessage) has a
223    /// `message_id`, you can NOT reply to these messages or delete them. For this reason,
224    /// [`ReplyToMessage`] is not implemented for
225    /// [`UserNoticeMessage`](crate::message::UserNoticeMessage).
226    pub async fn me_in_reply_to(
227        &self,
228        reply_to: &impl ReplyToMessage,
229        message: String,
230    ) -> Result<(), Error<T, L>> {
231        self.say_or_me_in_reply_to(reply_to, message, true).await
232    }
233
234    async fn say_or_me_in_reply_to(
235        &self,
236        reply_to: &impl ReplyToMessage,
237        message: String,
238        me: bool,
239    ) -> Result<(), Error<T, L>> {
240        let mut tags = IRCTags::new();
241        tags.0.insert(
242            "reply-parent-msg-id".to_owned(),
243            reply_to.message_id().to_owned(),
244        );
245
246        let irc_message = IRCMessage::new(
247            tags,
248            None,
249            "PRIVMSG".to_owned(),
250            vec![
251                format!("#{}", reply_to.channel_login()),
252                format!("{} {}", if me { "/me" } else { "." }, message),
253            ], // The prefixed "." prevents commands from being executed if not in /me-mode
254        );
255        self.send_message(irc_message).await
256    }
257
258    /// Join the given Twitch channel (When a channel is joined, the client will receive messages
259    /// sent to it).
260    ///
261    /// The client will internally ensure that there has always been at least _an attempt_ to join
262    /// this channel. However this does not necessarily mean the join is always successful.
263    ///
264    /// If the given `channel_login` does not exist (or is suspended) then the IRC server
265    /// will ignore the `JOIN` and you will not be joined to the given channel (what channel would
266    /// you even expect to join if the channel does not exist?).
267    ///
268    /// However, the client listens for a server-side confirmation to this `JOIN` command.
269    /// If the server confirms that the `JOIN` was successful, then the client saves this information.
270    /// This information can be queried using `get_channel_status()`.
271    ///
272    /// If you later issue another `join()` call, and the server previously confirmed the successful
273    /// joining of `channel_login`, then no message will be sent out.
274    ///
275    /// However if the server *did not* confirm the successful `JOIN` command previously, then the
276    /// `JOIN` is attempted again.
277    ///
278    /// You can use this mechanism to e.g. periodically re-try `JOIN`ing a given channel if
279    /// joining to freshly created channels or freshly renamed channels is a concern in your application.
280    ///
281    /// Another note on Twitch behaviour: If a channel gets suspended, the `JOIN` membership stays
282    /// active as long as the connection with that `JOIN` membership stays active. For this reason,
283    /// there is no special logic or handling required for when a channel gets suspended.
284    /// (The `JOIN` membership in that channel will continue to count as confirmed for as long
285    /// as the connection stays alive. If the connection fails, the "confirmed" status for that
286    /// channel is reset, and the client will automatically attempt to re-join that channel on a
287    /// different or new connection.
288    /// Unless an answer is again received by the server, the `join()` will then make attempts again
289    /// to join that channel.
290    ///
291    /// Returns a [`validate::Error`] if the passed `channel_login` is of
292    /// [invalid format](crate::validate::validate_login). Returns `Ok(())` otherwise.
293    pub fn join(&self, channel_login: String) -> Result<(), validate::Error> {
294        validate_login(&channel_login)?;
295
296        self.client_loop_tx
297            .send(ClientLoopCommand::Join { channel_login })
298            .unwrap();
299
300        Ok(())
301    }
302
303    /// Instruct the client to only be connected to these channels. Channels currently joined
304    /// but not in the given set are parted, and channels in the set that are not currently
305    /// joined are joined.
306    ///
307    /// For further semantics about join and parts, see the documentation for [`TwitchIRCClient::join`].
308    ///
309    /// Returns a [`validate::Error`] if the passed `channel_login` is of
310    /// [invalid format](crate::validate::validate_login). Returns `Ok(())` otherwise.
311    pub fn set_wanted_channels(&self, channels: HashSet<String>) -> Result<(), validate::Error> {
312        for channel_login in &channels {
313            validate_login(channel_login)?;
314        }
315
316        self.client_loop_tx
317            .send(ClientLoopCommand::SetWantedChannels { channels })
318            .unwrap();
319
320        Ok(())
321    }
322
323    /// Query the client for what status a certain channel is in.
324    ///
325    /// Returns two booleans: The first indicates whether a channel is `wanted`. This is true
326    /// if the last operation for this channel was a `join()` method, or alternatively whether
327    /// it was included in the set of channels in a `set_wanted_channels` call.
328    ///
329    /// The second boolean indicates whether this channel is currently joined server-side.
330    /// (This is purely based on `JOIN` and `PART` messages being received from the server).
331    ///
332    /// Note that any combination of `true` and `false` is possible here.
333    ///
334    /// For example, `(true, false)` could indicate that the `JOIN` message to join this channel is currently
335    /// being sent or already sent, but no response confirming the `JOIN` has been received yet.
336    /// **Note this status can also mean that the server did not answer the `JOIN` request because
337    /// the channel did not exist/was suspended or similar conditions.**
338    ///
339    /// `(false, true)` might on the other hand (similarly) that a `PART` message is sent but not
340    /// answered yet by the server.
341    ///
342    /// `(true, true)` confirms that the channel is currently successfully joined in a normal fashion.
343    ///
344    /// `(false, false)` is returned for a channel that has not been joined previously at all
345    /// or where a previous `PART` command has completed.
346    pub async fn get_channel_status(&self, channel_login: String) -> (bool, bool) {
347        // channel_login format sanity check not really needed here, the code will deal with arbitrary strings just fine
348
349        let (return_tx, return_rx) = oneshot::channel();
350        self.client_loop_tx
351            .send(ClientLoopCommand::GetChannelStatus {
352                channel_login,
353                return_sender: return_tx,
354            })
355            .unwrap();
356        // unwrap: ClientLoopWorker should not die before all sender handles have been dropped
357        return_rx.await.unwrap()
358    }
359
360    /// Part (leave) a channel, to stop receiving messages sent to that channel.
361    ///
362    /// This has the same semantics as `join()`. Similarly, a `part()` call will have no effect
363    /// if the channel is not currently joined.
364    pub fn part(&self, channel_login: String) {
365        // channel_login format sanity check not really needed here, the code will deal with arbitrary strings just fine
366
367        self.client_loop_tx
368            .send(ClientLoopCommand::Part { channel_login })
369            .unwrap();
370    }
371
372    /// Ping a random connection. This does not await the `PONG` response from Twitch.
373    /// The future resolves once the `PING` command is sent to the wire.
374    /// An error is returned in case the message could not be sent over the picked connection.
375    pub async fn ping(&self) -> Result<(), Error<T, L>> {
376        let (return_tx, return_rx) = oneshot::channel();
377        self.client_loop_tx
378            .send(ClientLoopCommand::Ping {
379                return_sender: return_tx,
380            })
381            .unwrap();
382        // unwrap: ClientLoopWorker should not die before all sender handles have been dropped
383        return_rx.await.unwrap()
384    }
385}