Skip to main content

sergeant/twitch/
pubsub.rs

1use std::io::ErrorKind;
2use std::process::{self, Command};
3use std::sync::mpsc::Sender;
4use std::sync::Arc;
5use std::time::{self, Duration};
6use std::vec;
7use std::{error::Error, fs::OpenOptions, io::Write};
8
9use ratatui::buffer::Buffer;
10use ratatui::prelude::*;
11use ratatui::widgets::{Block, Widget};
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use tungstenite::connect;
15use tungstenite::Error::{AlreadyClosed, ConnectionClosed, Io};
16use tungstenite::Message::{self, Close, Ping, Text};
17
18use crate::commands::{get_action, get_reward};
19use crate::tui::{MessageParts, Symbol};
20use crate::utils::get_data_directory;
21
22use super::api::{get_user, get_user_profile, User};
23use super::parse::{
24    get_lines, get_message_symbols, get_screen_lines, write_to_buffer, RedeemMessage, RenderCursor, TwitchMessage,
25};
26use super::ChannelMessages;
27
28#[derive(Deserialize, Serialize, Debug)]
29pub struct SocketMessage {
30    r#type: String,
31    data: SocketMessageData,
32}
33
34#[derive(Deserialize, Serialize, Debug)]
35pub struct SocketMessageData {
36    topic: String,
37    message: String,
38}
39
40#[derive(Clone, Deserialize, Serialize, Debug)]
41pub struct MessageData {
42    pub data: SubMessage,
43}
44
45#[derive(Clone, Deserialize, Serialize, Debug)]
46#[serde(untagged)]
47pub enum SubMessage {
48    Points(Box<ChannelPointsData>),
49    Sub(SubscribeEvent),
50    Bits(BitsEvent),
51    // Bits {},
52    // BitsUnlocks {},
53}
54
55#[derive(Clone, Deserialize, Serialize, Debug)]
56pub struct BitsEvent {
57    #[serde(skip)]
58    pub area: Option<Rect>,
59    pub is_anonymous: bool,
60    pub message_type: String,
61    pub data: BitsEventData,
62}
63
64impl Widget for &mut BitsEvent {
65    fn render(self, area: Rect, buf: &mut Buffer) {
66        let bits_from = if self.is_anonymous {
67            "Anonymous"
68        } else {
69            &self.data.user_name
70        };
71
72        let message = format!(
73            "{} has cheered {} bits. They have cheered a total of {} bits in this channel.",
74            bits_from, self.data.bits_used, self.data.total_bits_used
75        );
76
77        let mut cursor = RenderCursor {
78            x: area.left(),
79            y: area.bottom().saturating_sub(1),
80        };
81
82        // Render the bits details in gray
83        let bits_details_message: Vec<Symbol> = get_message_symbols(&message, &mut [], Some((128, 128, 128)));
84
85        // Shrink horizontal area by 4 to make space for border and scroll bar
86        let mut line_area = area;
87        line_area.width = area.width - 4;
88
89        let mut lines: Vec<Vec<MessageParts>> = get_lines(&bits_details_message, &line_area);
90
91        // Get symbols for cheer message in white
92        let message_symbols: Vec<Symbol> = get_message_symbols(&self.data.chat_message, &mut [], Some((255, 255, 255)));
93        // Get lines for cheer message
94        let mut message_lines: Vec<Vec<MessageParts>> = get_lines(&message_symbols, &line_area);
95
96        // Add cheer message lines to the bits details
97        lines.append(&mut message_lines);
98
99        let mut screen_lines = get_screen_lines(&mut lines, &line_area);
100
101        // Move cursor one over to make space for border
102        cursor.x = area.left() + 1;
103        cursor.y = cursor.y.saturating_sub(lines.len() as u16);
104
105        write_to_buffer(&mut screen_lines, buf, &mut cursor);
106
107        cursor.x = 0;
108        cursor.y -= screen_lines.len() as u16;
109
110        let block_area = Rect {
111            x: 0,
112            y: cursor.y.saturating_sub(1),
113            width: area.width.saturating_sub(2),
114            height: screen_lines.len() as u16 + 2,
115        };
116
117        // Purple border
118        Block::bordered()
119            .border_set(symbols::border::ROUNDED)
120            .border_style(Style::reset().fg(Color::Rgb(138, 43, 226)))
121            .title("♦️ Cheer!")
122            .render(block_area, buf);
123
124        self.area = Some(Rect {
125            x: 0,
126            y: cursor.y,
127            width: area.width,
128            height: screen_lines.len() as u16,
129        });
130    }
131}
132
133#[derive(Clone, Deserialize, Serialize, Debug)]
134pub struct BitsEventData {
135    pub user_name: String,
136    pub chat_message: String,
137    pub bits_used: u64,
138    pub total_bits_used: u64,
139    pub context: String, // cheer
140}
141
142#[derive(Clone, Deserialize, Serialize, Debug)]
143pub struct SubscribeEvent {
144    #[serde(skip)]
145    pub area: Option<Rect>,
146    pub topic: String,
147    pub message: SubscribeMessage,
148}
149
150impl Widget for &mut SubscribeEvent {
151    fn render(self, area: Rect, buf: &mut Buffer) {
152        let message = format!(
153            "{} has subscribed for {} months, currently on a {} month streak.",
154            self.message.display_name,
155            self.message.cumulative_months,
156            self.message.streak_months,
157            // self.message.sub_message
158        );
159
160        let mut cursor = RenderCursor {
161            x: area.left(),
162            y: area.bottom().saturating_sub(1),
163        };
164
165        // Render the subscription details in gray
166        let sub_details_symbols: Vec<Symbol> = get_message_symbols(&message, &mut [], Some((128, 128, 128)));
167
168        // Shrink horizontal area by 4 to make space for border and scroll bar
169        let mut line_area = area;
170        line_area.width = area.width - 4;
171
172        let mut lines: Vec<Vec<MessageParts>> = get_lines(&sub_details_symbols, &line_area);
173
174        // Get symbols for subscription message
175        let message_symbols: Vec<Symbol> =
176            get_message_symbols(&self.message.sub_message, &mut [], Some((255, 255, 255)));
177        // Get lines for subscription message
178        let mut message_lines: Vec<Vec<MessageParts>> = get_lines(&message_symbols, &line_area);
179
180        // Add subscription message lines to the subscription details
181        lines.append(&mut message_lines);
182
183        let mut screen_lines = get_screen_lines(&mut lines, &line_area);
184
185        // Move cursor one over to make space for border
186        cursor.x = area.left() + 1;
187        cursor.y = cursor.y.saturating_sub(lines.len() as u16);
188
189        write_to_buffer(&mut screen_lines, buf, &mut cursor);
190
191        cursor.x = 0;
192        cursor.y -= screen_lines.len() as u16;
193
194        let block_area = Rect {
195            x: 0,
196            y: cursor.y - 1,
197            width: area.width - 2,
198            height: screen_lines.len() as u16 + 2,
199        };
200
201        let (sub_icon, sub_desc) = if self.message.context == "subgift" {
202            ('🎁', "was gifted a sub!")
203        } else if self.message.context == "resub" {
204            ('📅', "has resubbed!")
205        } else {
206            ('🎉', "has subbed!")
207        };
208
209        Block::bordered()
210            .border_set(symbols::border::ROUNDED)
211            .border_style(Style::reset().fg(Color::LightBlue))
212            .title(format!("{}{} {}", sub_icon, self.message.display_name, sub_desc))
213            .render(block_area, buf);
214
215        self.area = Some(Rect {
216            x: 0,
217            y: cursor.y,
218            width: area.width,
219            height: screen_lines.len() as u16,
220        });
221    }
222}
223
224#[derive(Clone, Deserialize, Serialize, Debug)]
225pub struct SubscribeMessage {
226    pub display_name: String,   // some_person
227    pub cumulative_months: u64, // 9
228    pub streak_months: u64,     // 3
229    pub context: String,        // subgift, resub
230    pub sub_message: String,    // A message, possibly with emotes
231}
232
233#[derive(Clone, Deserialize, Serialize, Debug)]
234pub struct ChannelPointsData {
235    pub timestamp: String,
236    pub redemption: Redemption,
237}
238
239#[derive(Clone, Deserialize, Serialize, Debug)]
240pub struct UserReference {
241    pub id: String,
242    pub login: String,
243    pub display_name: String,
244    pub profile_url: Option<String>,
245}
246
247#[derive(Clone, Deserialize, Serialize, Debug)]
248pub struct Redemption {
249    pub id: String,
250    pub user: UserReference,
251    pub user_input: Option<String>,
252    pub status: String,
253    pub reward: Reward,
254}
255
256#[derive(Clone, Deserialize, Serialize, Debug)]
257pub struct Reward {
258    pub id: String,
259    pub title: String,
260    pub prompt: String,
261    pub cost: u64,
262}
263
264pub fn send_to_error_log(err: String, json: String) {
265    let now = time::SystemTime::now();
266
267    let log = format!("{:?} - {}: {}\n", now, err, json);
268    let mut error_log = get_data_directory(Some("error_log")).unwrap();
269    error_log.push("log.txt");
270
271    let mut file = OpenOptions::new().create(true).append(true).open(error_log).unwrap();
272    let _ = file.write_all(log.as_bytes());
273}
274
275fn add_user_profile_url(message_data: &mut MessageData, credentials: &Credentials) -> Result<(), Box<dyn Error>> {
276    match message_data.data {
277        SubMessage::Points(ref mut sub_message) => {
278            let id = &sub_message.redemption.user.id;
279            let user_profile = get_user_profile(id.as_str(), credentials);
280
281            if let Ok(profile_url) = user_profile {
282                sub_message.redemption.user.profile_url = profile_url;
283            }
284
285            Ok(())
286        }
287        SubMessage::Sub(_) => Ok(()),
288        SubMessage::Bits(_) => Ok(()),
289    }
290}
291
292fn handle_message(
293    message: Message,
294    user: &User,
295    tx: &Sender<ChannelMessages>,
296    credentials: &Credentials,
297) -> Result<(), Box<dyn Error>> {
298    match message {
299        Message::Text(message) => {
300            if !message.contains("MESSAGE") {
301                return Err("Not a message".into());
302            }
303
304            let socket_message = serde_json::from_str::<SocketMessage>(&message.to_string());
305            let Ok(socket_message) = socket_message else {
306                let log = socket_message.unwrap_err().to_string();
307                send_to_error_log(log, message.to_string());
308                return Err("Not a message".into());
309            };
310
311            let sub_message = &socket_message.data.message;
312            let Ok(mut sub_message) = serde_json::from_str::<MessageData>(sub_message) else {
313                send_to_error_log(sub_message.to_string(), message.to_string());
314                return Err("Not a message".into());
315            };
316
317            let _ = add_user_profile_url(&mut sub_message, credentials);
318
319            // NOTE: Send message transmission before checking for CLI commands attached
320            // to a message so that the CLI command does not block the chat log update
321            tx.send(ChannelMessages::MessageData(sub_message.clone()))?;
322
323            'commands: {
324                match sub_message.data {
325                    SubMessage::Points(ref sub_message) => {
326                        let reward = get_reward(&sub_message.redemption.reward.title);
327
328                        let found_command = if reward.is_ok() {
329                            reward
330                        } else {
331                            break 'commands;
332                        };
333
334                        let Ok(command_name) = found_command else {
335                            break 'commands;
336                        };
337
338                        let default_input = &String::new();
339                        let user_input = &sub_message.redemption.user_input.as_ref().unwrap_or(default_input);
340
341                        let command_result = Command::new(&command_name)
342                            .arg(user_input)
343                            .arg(&sub_message.redemption.user.display_name)
344                            .stdout(process::Stdio::piped())
345                            .stderr(process::Stdio::piped())
346                            .output()
347                            .expect("reward failed");
348
349                        let command_success = command_result.status.success();
350
351                        if !command_success {
352                            send_to_error_log(
353                                command_name.to_string(),
354                                format!("Error running reward command with input: {}", user_input),
355                            );
356
357                            send_to_error_log(
358                                format!("{} output: {:?}", command_name, command_result),
359                                format!("Error running reward command with input: {}", user_input),
360                            );
361
362                            if sub_message.redemption.status == "UNFULFILLED" {
363                                refund_points(sub_message, user, tx, credentials, command_result);
364                            }
365                        } else if sub_message.redemption.status == "UNFULFILLED" {
366                            reward_fulfilled(sub_message, user, credentials);
367                        }
368                    }
369
370                    SubMessage::Sub(ref _sub_message) => {}
371                    SubMessage::Bits(ref _sub_message) => {}
372                }
373            }
374
375            Ok(())
376        }
377        other => {
378            send_to_error_log(other.to_string(), "Unknown Error".into());
379            Err("Not a message".into())
380        }
381    }
382}
383
384fn reward_fulfilled(channel_points_data: &ChannelPointsData, user: &User, credentials: &Credentials) {
385    let api_url = "https://api.twitch.tv/helix/channel_points/custom_rewards/redemptions";
386    let id = &channel_points_data.redemption.id;
387    let reward_id = &channel_points_data.redemption.reward.id;
388    let response = ureq::patch(api_url)
389        .set(
390            "Authorization",
391            &format!("Bearer {}", credentials.oauth_token.replace("oauth:", "")),
392        )
393        .set("Client-Id", credentials.client_id.as_str())
394        .query_pairs(vec![
395            ("id", id.as_str()),
396            ("broadcaster_id", &user.id),
397            ("reward_id", reward_id),
398            ("status", "FULFILLED"),
399        ])
400        .call();
401
402    if response.is_err() {
403        send_to_error_log("Fulfill Error".to_string(), format!("{response:?}"));
404    }
405}
406
407fn refund_points(
408    channel_points_data: &ChannelPointsData,
409    user: &User,
410    tx: &Sender<ChannelMessages>,
411    credentials: &Credentials,
412    command_result: process::Output,
413) {
414    let api_url = "https://api.twitch.tv/helix/channel_points/custom_rewards/redemptions";
415
416    let id = &channel_points_data.redemption.id;
417    let reward_id = &channel_points_data.redemption.reward.id;
418    let response = ureq::patch(api_url)
419        .set(
420            "Authorization",
421            &format!("Bearer {}", credentials.oauth_token.replace("oauth:", "")),
422        )
423        .set("Client-Id", credentials.client_id.as_str())
424        .query_pairs(vec![
425            ("id", id.as_str()),
426            ("broadcaster_id", &user.id),
427            ("reward_id", reward_id),
428            ("status", "CANCELED"),
429        ])
430        .call();
431
432    let success = response.is_ok();
433    if !success {
434        send_to_error_log("Refund Error".to_string(), format!("{response:?}"));
435    }
436
437    let points = channel_points_data.redemption.reward.cost;
438    let result = if success { "were" } else { "could not be" };
439    let redeemer = &channel_points_data.redemption.user.display_name;
440    let message = format!("{points} points {result} refunded to {redeemer}");
441    let area = None;
442
443    let _ = tx.send(ChannelMessages::TwitchMessage(TwitchMessage::RedeemMessage {
444        message: RedeemMessage {
445            message,
446            area,
447            color: None,
448        },
449    }));
450
451    let _ = tx.send(ChannelMessages::TwitchMessage(TwitchMessage::RedeemMessage {
452        message: RedeemMessage {
453            message: String::from_utf8(command_result.stdout)
454                .expect("Invalid UTF-8")
455                .to_string(),
456            area,
457            color: None,
458        },
459    }));
460}
461
462pub struct Credentials {
463    pub oauth_token: Arc<String>,
464    pub client_id: Arc<String>,
465}
466
467pub fn connect_to_pub_sub(
468    oauth_token: Arc<String>,
469    client_id: Arc<String>,
470    tx: Sender<ChannelMessages>,
471) -> Result<(), Box<dyn Error>> {
472    let user = get_user(&oauth_token, &client_id)?;
473    let twitch_pub_sub = "wss://pubsub-edge.twitch.tv";
474
475    match connect(twitch_pub_sub) {
476        Ok((mut socket, _response)) => {
477            let channel_bits = "channel-bits-events-v2.".to_string() + &user.id;
478            // let channel_bits_unlocks = "channel-bits-badge-unlocks.".to_string() + &user.id;
479            let channel_points = "channel-points-channel-v1.".to_string() + &user.id;
480            let channel_subscribe = "channel-subscribe-events-v1.".to_string() + &user.id;
481
482            let auth_token = oauth_token.to_string().replace("oauth:", "");
483
484            let topics_message = json!({
485                "type": "LISTEN",
486                "data": {
487                    "auth_token": auth_token,
488                    "topics": [channel_bits, channel_points, channel_subscribe]
489                }
490            });
491
492            socket.send(topics_message.to_string().into()).unwrap();
493
494            let stream = socket.get_ref();
495            let timeout_duration = Duration::new(60 * 4, 0);
496            match stream {
497                tungstenite::stream::MaybeTlsStream::Plain(stream) => {
498                    let _ = stream.set_read_timeout(Some(timeout_duration));
499                }
500
501                tungstenite::stream::MaybeTlsStream::NativeTls(stream) => {
502                    match stream.get_ref().set_read_timeout(Some(timeout_duration)) {
503                        Ok(it) => it,
504                        Err(_err) => {}
505                    }
506                }
507
508                _ => {}
509            }
510
511            loop {
512                let _ = socket.send("PING".into());
513
514                match socket.read() {
515                    Ok(message) => match message {
516                        Text(message) => {
517                            let credentials = Credentials {
518                                oauth_token: oauth_token.clone(),
519                                client_id: client_id.clone(),
520                            };
521
522                            let _ = handle_message(Message::Text(message), &user, &tx, &credentials);
523                        }
524                        Close(_) => {
525                            send_to_error_log("We got a close message, reconnecting...".to_string(), "".to_string());
526
527                            return connect_to_pub_sub(oauth_token, client_id, tx);
528                        }
529
530                        Ping(_) => {}
531
532                        wtf => {
533                            send_to_error_log("HOW? Why are we here???".to_string(), wtf.to_string());
534
535                            return connect_to_pub_sub(oauth_token, client_id, tx);
536                        }
537                    },
538                    Err(error) => {
539                        send_to_error_log(error.to_string(), "Mistakes were made".to_string());
540
541                        match error {
542                            ConnectionClosed | AlreadyClosed => {
543                                return connect_to_pub_sub(oauth_token, client_id, tx);
544                            }
545
546                            Io(error) => {
547                                if error.kind() != ErrorKind::WouldBlock {
548                                    return connect_to_pub_sub(oauth_token, client_id, tx);
549                                }
550                            }
551
552                            _ => {}
553                        }
554                    }
555                }
556            }
557        }
558
559        Err(error) => {
560            send_to_error_log(error.to_string(), "Could not connect to pub sub".to_string());
561
562            connect_to_pub_sub(oauth_token, client_id, tx)
563        }
564    }
565}