1use std::io::ErrorKind;
2use std::process::{self, Command};
3use std::sync::mpsc::Sender;
4use std::sync::Arc;
5use std::time::{self, Duration};
6use std::vec;
7use std::{error::Error, fs::OpenOptions, io::Write};
8
9use ratatui::buffer::Buffer;
10use ratatui::prelude::*;
11use ratatui::widgets::{Block, Widget};
12use serde::{Deserialize, Serialize};
13use serde_json::json;
14use tungstenite::connect;
15use tungstenite::Error::{AlreadyClosed, ConnectionClosed, Io};
16use tungstenite::Message::{self, Close, Ping, Text};
17
18use crate::commands::{get_action, get_reward};
19use crate::tui::{MessageParts, Symbol};
20use crate::utils::get_data_directory;
21
22use super::api::{get_user, get_user_profile, User};
23use super::parse::{
24 get_lines, get_message_symbols, get_screen_lines, write_to_buffer, RedeemMessage, RenderCursor, TwitchMessage,
25};
26use super::ChannelMessages;
27
/// Outer envelope of a text frame read from the PubSub socket.
///
/// `handle_message` deserializes the raw frame text into this type before
/// decoding the nested payload carried in `data.message`.
#[derive(Deserialize, Serialize, Debug)]
pub struct SocketMessage {
    /// Frame type tag as sent on the wire (raw identifier because `type` is a
    /// Rust keyword).
    r#type: String,
    /// Topic and still-encoded payload of the frame.
    data: SocketMessageData,
}
33
/// Body of a [`SocketMessage`].
#[derive(Deserialize, Serialize, Debug)]
pub struct SocketMessageData {
    /// Topic string carried by the frame; not inspected by the code in this file.
    topic: String,
    /// JSON document encoded as a string; `handle_message` parses it a second
    /// time into [`MessageData`].
    message: String,
}
39
/// Decoded event payload, obtained by parsing `SocketMessageData::message`.
///
/// `handle_message` forwards a clone of this value to the UI as
/// `ChannelMessages::MessageData`.
#[derive(Clone, Deserialize, Serialize, Debug)]
pub struct MessageData {
    /// The concrete event (channel points, subscription or bits).
    pub data: SubMessage,
}
44
/// The kinds of event this client understands.
///
/// The enum is `#[serde(untagged)]`: serde tries the variants in declaration
/// order and keeps the first one whose shape matches, so the order of the
/// variants below is significant.
#[derive(Clone, Deserialize, Serialize, Debug)]
#[serde(untagged)]
pub enum SubMessage {
    /// Channel-point redemption; the only variant that triggers a reward
    /// command in `handle_message`. Boxed to keep the enum small.
    Points(Box<ChannelPointsData>),
    /// Subscription event (rendered by the `Widget` impl of `SubscribeEvent`).
    Sub(SubscribeEvent),
    /// Bits (cheer) event (rendered by the `Widget` impl of `BitsEvent`).
    Bits(BitsEvent),
}
54
/// A cheer (bits) event.
#[derive(Clone, Deserialize, Serialize, Debug)]
pub struct BitsEvent {
    /// Screen area recorded by the most recent `render` call; never
    /// (de)serialized because of `#[serde(skip)]`.
    #[serde(skip)]
    pub area: Option<Rect>,
    /// When `true`, the renderer shows "Anonymous" instead of the user name.
    pub is_anonymous: bool,
    /// Message type string as received; not inspected by the code in this file.
    pub message_type: String,
    /// Details of the cheer (who, how many bits, attached chat message).
    pub data: BitsEventData,
}
63
64impl Widget for &mut BitsEvent {
65 fn render(self, area: Rect, buf: &mut Buffer) {
66 let bits_from = if self.is_anonymous {
67 "Anonymous"
68 } else {
69 &self.data.user_name
70 };
71
72 let message = format!(
73 "{} has cheered {} bits. They have cheered a total of {} bits in this channel.",
74 bits_from, self.data.bits_used, self.data.total_bits_used
75 );
76
77 let mut cursor = RenderCursor {
78 x: area.left(),
79 y: area.bottom().saturating_sub(1),
80 };
81
82 let bits_details_message: Vec<Symbol> = get_message_symbols(&message, &mut [], Some((128, 128, 128)));
84
85 let mut line_area = area;
87 line_area.width = area.width - 4;
88
89 let mut lines: Vec<Vec<MessageParts>> = get_lines(&bits_details_message, &line_area);
90
91 let message_symbols: Vec<Symbol> = get_message_symbols(&self.data.chat_message, &mut [], Some((255, 255, 255)));
93 let mut message_lines: Vec<Vec<MessageParts>> = get_lines(&message_symbols, &line_area);
95
96 lines.append(&mut message_lines);
98
99 let mut screen_lines = get_screen_lines(&mut lines, &line_area);
100
101 cursor.x = area.left() + 1;
103 cursor.y = cursor.y.saturating_sub(lines.len() as u16);
104
105 write_to_buffer(&mut screen_lines, buf, &mut cursor);
106
107 cursor.x = 0;
108 cursor.y -= screen_lines.len() as u16;
109
110 let block_area = Rect {
111 x: 0,
112 y: cursor.y.saturating_sub(1),
113 width: area.width.saturating_sub(2),
114 height: screen_lines.len() as u16 + 2,
115 };
116
117 Block::bordered()
119 .border_set(symbols::border::ROUNDED)
120 .border_style(Style::reset().fg(Color::Rgb(138, 43, 226)))
121 .title("♦️ Cheer!")
122 .render(block_area, buf);
123
124 self.area = Some(Rect {
125 x: 0,
126 y: cursor.y,
127 width: area.width,
128 height: screen_lines.len() as u16,
129 });
130 }
131}
132
/// Details of a cheer, as displayed by the `BitsEvent` renderer.
#[derive(Clone, Deserialize, Serialize, Debug)]
pub struct BitsEventData {
    /// Name shown for the cheering user unless the event is anonymous.
    pub user_name: String,
    /// Chat message that accompanied the cheer; rendered in white below the
    /// summary line.
    pub chat_message: String,
    /// Number of bits used in this cheer.
    pub bits_used: u64,
    /// Total shown in the summary line as the user's cumulative bits in this
    /// channel.
    pub total_bits_used: u64,
    /// Context string as received; not inspected by the code in this file.
    pub context: String,
}
141
/// A subscription event.
#[derive(Clone, Deserialize, Serialize, Debug)]
pub struct SubscribeEvent {
    /// Screen area recorded by the most recent `render` call; never
    /// (de)serialized because of `#[serde(skip)]`.
    #[serde(skip)]
    pub area: Option<Rect>,
    /// Topic string as received; not inspected by the code in this file.
    pub topic: String,
    /// Subscription details used to build the rendered text and title.
    pub message: SubscribeMessage,
}
149
150impl Widget for &mut SubscribeEvent {
151 fn render(self, area: Rect, buf: &mut Buffer) {
152 let message = format!(
153 "{} has subscribed for {} months, currently on a {} month streak.",
154 self.message.display_name,
155 self.message.cumulative_months,
156 self.message.streak_months,
157 );
159
160 let mut cursor = RenderCursor {
161 x: area.left(),
162 y: area.bottom().saturating_sub(1),
163 };
164
165 let sub_details_symbols: Vec<Symbol> = get_message_symbols(&message, &mut [], Some((128, 128, 128)));
167
168 let mut line_area = area;
170 line_area.width = area.width - 4;
171
172 let mut lines: Vec<Vec<MessageParts>> = get_lines(&sub_details_symbols, &line_area);
173
174 let message_symbols: Vec<Symbol> =
176 get_message_symbols(&self.message.sub_message, &mut [], Some((255, 255, 255)));
177 let mut message_lines: Vec<Vec<MessageParts>> = get_lines(&message_symbols, &line_area);
179
180 lines.append(&mut message_lines);
182
183 let mut screen_lines = get_screen_lines(&mut lines, &line_area);
184
185 cursor.x = area.left() + 1;
187 cursor.y = cursor.y.saturating_sub(lines.len() as u16);
188
189 write_to_buffer(&mut screen_lines, buf, &mut cursor);
190
191 cursor.x = 0;
192 cursor.y -= screen_lines.len() as u16;
193
194 let block_area = Rect {
195 x: 0,
196 y: cursor.y - 1,
197 width: area.width - 2,
198 height: screen_lines.len() as u16 + 2,
199 };
200
201 let (sub_icon, sub_desc) = if self.message.context == "subgift" {
202 ('🎁', "was gifted a sub!")
203 } else if self.message.context == "resub" {
204 ('📅', "has resubbed!")
205 } else {
206 ('🎉', "has subbed!")
207 };
208
209 Block::bordered()
210 .border_set(symbols::border::ROUNDED)
211 .border_style(Style::reset().fg(Color::LightBlue))
212 .title(format!("{}{} {}", sub_icon, self.message.display_name, sub_desc))
213 .render(block_area, buf);
214
215 self.area = Some(Rect {
216 x: 0,
217 y: cursor.y,
218 width: area.width,
219 height: screen_lines.len() as u16,
220 });
221 }
222}
223
/// Subscription details, as displayed by the `SubscribeEvent` renderer.
#[derive(Clone, Deserialize, Serialize, Debug)]
pub struct SubscribeMessage {
    /// Subscriber name shown in the block title and summary line.
    pub display_name: String,
    /// Total number of months shown in the summary line.
    pub cumulative_months: u64,
    /// Current streak length in months shown in the summary line.
    pub streak_months: u64,
    /// Selects the title: "subgift" and "resub" get dedicated wording, any
    /// other value is shown as a new sub.
    pub context: String,
    /// Message attached to the subscription; rendered in white below the
    /// summary line.
    pub sub_message: String,
}
232
/// A channel-point redemption event.
#[derive(Clone, Deserialize, Serialize, Debug)]
pub struct ChannelPointsData {
    /// Timestamp string as received; not parsed by the code in this file.
    pub timestamp: String,
    /// The redemption itself: who redeemed what, and its current status.
    pub redemption: Redemption,
}
238
/// The user who made a redemption.
#[derive(Clone, Deserialize, Serialize, Debug)]
pub struct UserReference {
    /// User id; passed to `get_user_profile` to look up the profile URL.
    pub id: String,
    /// Login string as received; not inspected by the code in this file.
    pub login: String,
    /// Name passed to reward commands and used in refund messages.
    pub display_name: String,
    /// Filled in by `add_user_profile_url` when the profile lookup succeeds;
    /// otherwise left as deserialized.
    pub profile_url: Option<String>,
}
246
/// A single redemption of a channel-point reward.
#[derive(Clone, Deserialize, Serialize, Debug)]
pub struct Redemption {
    /// Redemption id; sent as the `id` query parameter when the redemption is
    /// fulfilled or cancelled.
    pub id: String,
    /// The user who redeemed the reward.
    pub user: UserReference,
    /// Optional text entered by the user; passed as the first argument to the
    /// reward command (empty string when absent).
    pub user_input: Option<String>,
    /// Redemption status; only redemptions whose status is "UNFULFILLED" are
    /// fulfilled or refunded by `handle_message`.
    pub status: String,
    /// The reward that was redeemed.
    pub reward: Reward,
}
255
/// A channel-point reward definition.
#[derive(Clone, Deserialize, Serialize, Debug)]
pub struct Reward {
    /// Reward id; sent as the `reward_id` query parameter when a redemption is
    /// fulfilled or cancelled.
    pub id: String,
    /// Reward title; looked up through `get_reward` to find the command to run.
    pub title: String,
    /// Prompt text as received; not inspected by the code in this file.
    pub prompt: String,
    /// Point cost; quoted in the refund message.
    pub cost: u64,
}
263
264pub fn send_to_error_log(err: String, json: String) {
265 let now = time::SystemTime::now();
266
267 let log = format!("{:?} - {}: {}\n", now, err, json);
268 let mut error_log = get_data_directory(Some("error_log")).unwrap();
269 error_log.push("log.txt");
270
271 let mut file = OpenOptions::new().create(true).append(true).open(error_log).unwrap();
272 let _ = file.write_all(log.as_bytes());
273}
274
275fn add_user_profile_url(message_data: &mut MessageData, credentials: &Credentials) -> Result<(), Box<dyn Error>> {
276 match message_data.data {
277 SubMessage::Points(ref mut sub_message) => {
278 let id = &sub_message.redemption.user.id;
279 let user_profile = get_user_profile(id.as_str(), credentials);
280
281 if let Ok(profile_url) = user_profile {
282 sub_message.redemption.user.profile_url = profile_url;
283 }
284
285 Ok(())
286 }
287 SubMessage::Sub(_) => Ok(()),
288 SubMessage::Bits(_) => Ok(()),
289 }
290}
291
292fn handle_message(
293 message: Message,
294 user: &User,
295 tx: &Sender<ChannelMessages>,
296 credentials: &Credentials,
297) -> Result<(), Box<dyn Error>> {
298 match message {
299 Message::Text(message) => {
300 if !message.contains("MESSAGE") {
301 return Err("Not a message".into());
302 }
303
304 let socket_message = serde_json::from_str::<SocketMessage>(&message.to_string());
305 let Ok(socket_message) = socket_message else {
306 let log = socket_message.unwrap_err().to_string();
307 send_to_error_log(log, message.to_string());
308 return Err("Not a message".into());
309 };
310
311 let sub_message = &socket_message.data.message;
312 let Ok(mut sub_message) = serde_json::from_str::<MessageData>(sub_message) else {
313 send_to_error_log(sub_message.to_string(), message.to_string());
314 return Err("Not a message".into());
315 };
316
317 let _ = add_user_profile_url(&mut sub_message, credentials);
318
319 tx.send(ChannelMessages::MessageData(sub_message.clone()))?;
322
323 'commands: {
324 match sub_message.data {
325 SubMessage::Points(ref sub_message) => {
326 let reward = get_reward(&sub_message.redemption.reward.title);
327
328 let found_command = if reward.is_ok() {
329 reward
330 } else {
331 break 'commands;
332 };
333
334 let Ok(command_name) = found_command else {
335 break 'commands;
336 };
337
338 let default_input = &String::new();
339 let user_input = &sub_message.redemption.user_input.as_ref().unwrap_or(default_input);
340
341 let command_result = Command::new(&command_name)
342 .arg(user_input)
343 .arg(&sub_message.redemption.user.display_name)
344 .stdout(process::Stdio::piped())
345 .stderr(process::Stdio::piped())
346 .output()
347 .expect("reward failed");
348
349 let command_success = command_result.status.success();
350
351 if !command_success {
352 send_to_error_log(
353 command_name.to_string(),
354 format!("Error running reward command with input: {}", user_input),
355 );
356
357 send_to_error_log(
358 format!("{} output: {:?}", command_name, command_result),
359 format!("Error running reward command with input: {}", user_input),
360 );
361
362 if sub_message.redemption.status == "UNFULFILLED" {
363 refund_points(sub_message, user, tx, credentials, command_result);
364 }
365 } else if sub_message.redemption.status == "UNFULFILLED" {
366 reward_fulfilled(sub_message, user, credentials);
367 }
368 }
369
370 SubMessage::Sub(ref _sub_message) => {}
371 SubMessage::Bits(ref _sub_message) => {}
372 }
373 }
374
375 Ok(())
376 }
377 other => {
378 send_to_error_log(other.to_string(), "Unknown Error".into());
379 Err("Not a message".into())
380 }
381 }
382}
383
384fn reward_fulfilled(channel_points_data: &ChannelPointsData, user: &User, credentials: &Credentials) {
385 let api_url = "https://api.twitch.tv/helix/channel_points/custom_rewards/redemptions";
386 let id = &channel_points_data.redemption.id;
387 let reward_id = &channel_points_data.redemption.reward.id;
388 let response = ureq::patch(api_url)
389 .set(
390 "Authorization",
391 &format!("Bearer {}", credentials.oauth_token.replace("oauth:", "")),
392 )
393 .set("Client-Id", credentials.client_id.as_str())
394 .query_pairs(vec![
395 ("id", id.as_str()),
396 ("broadcaster_id", &user.id),
397 ("reward_id", reward_id),
398 ("status", "FULFILLED"),
399 ])
400 .call();
401
402 if response.is_err() {
403 send_to_error_log("Fulfill Error".to_string(), format!("{response:?}"));
404 }
405}
406
407fn refund_points(
408 channel_points_data: &ChannelPointsData,
409 user: &User,
410 tx: &Sender<ChannelMessages>,
411 credentials: &Credentials,
412 command_result: process::Output,
413) {
414 let api_url = "https://api.twitch.tv/helix/channel_points/custom_rewards/redemptions";
415
416 let id = &channel_points_data.redemption.id;
417 let reward_id = &channel_points_data.redemption.reward.id;
418 let response = ureq::patch(api_url)
419 .set(
420 "Authorization",
421 &format!("Bearer {}", credentials.oauth_token.replace("oauth:", "")),
422 )
423 .set("Client-Id", credentials.client_id.as_str())
424 .query_pairs(vec![
425 ("id", id.as_str()),
426 ("broadcaster_id", &user.id),
427 ("reward_id", reward_id),
428 ("status", "CANCELED"),
429 ])
430 .call();
431
432 let success = response.is_ok();
433 if !success {
434 send_to_error_log("Refund Error".to_string(), format!("{response:?}"));
435 }
436
437 let points = channel_points_data.redemption.reward.cost;
438 let result = if success { "were" } else { "could not be" };
439 let redeemer = &channel_points_data.redemption.user.display_name;
440 let message = format!("{points} points {result} refunded to {redeemer}");
441 let area = None;
442
443 let _ = tx.send(ChannelMessages::TwitchMessage(TwitchMessage::RedeemMessage {
444 message: RedeemMessage {
445 message,
446 area,
447 color: None,
448 },
449 }));
450
451 let _ = tx.send(ChannelMessages::TwitchMessage(TwitchMessage::RedeemMessage {
452 message: RedeemMessage {
453 message: String::from_utf8(command_result.stdout)
454 .expect("Invalid UTF-8")
455 .to_string(),
456 area,
457 color: None,
458 },
459 }));
460}
461
/// Credentials used for Twitch API requests made from this module.
pub struct Credentials {
    /// OAuth token; any "oauth:" text is removed before it is sent as a
    /// bearer token.
    pub oauth_token: Arc<String>,
    /// Application client id, sent as the `Client-Id` header.
    pub client_id: Arc<String>,
}
466
467pub fn connect_to_pub_sub(
468 oauth_token: Arc<String>,
469 client_id: Arc<String>,
470 tx: Sender<ChannelMessages>,
471) -> Result<(), Box<dyn Error>> {
472 let user = get_user(&oauth_token, &client_id)?;
473 let twitch_pub_sub = "wss://pubsub-edge.twitch.tv";
474
475 match connect(twitch_pub_sub) {
476 Ok((mut socket, _response)) => {
477 let channel_bits = "channel-bits-events-v2.".to_string() + &user.id;
478 let channel_points = "channel-points-channel-v1.".to_string() + &user.id;
480 let channel_subscribe = "channel-subscribe-events-v1.".to_string() + &user.id;
481
482 let auth_token = oauth_token.to_string().replace("oauth:", "");
483
484 let topics_message = json!({
485 "type": "LISTEN",
486 "data": {
487 "auth_token": auth_token,
488 "topics": [channel_bits, channel_points, channel_subscribe]
489 }
490 });
491
492 socket.send(topics_message.to_string().into()).unwrap();
493
494 let stream = socket.get_ref();
495 let timeout_duration = Duration::new(60 * 4, 0);
496 match stream {
497 tungstenite::stream::MaybeTlsStream::Plain(stream) => {
498 let _ = stream.set_read_timeout(Some(timeout_duration));
499 }
500
501 tungstenite::stream::MaybeTlsStream::NativeTls(stream) => {
502 match stream.get_ref().set_read_timeout(Some(timeout_duration)) {
503 Ok(it) => it,
504 Err(_err) => {}
505 }
506 }
507
508 _ => {}
509 }
510
511 loop {
512 let _ = socket.send("PING".into());
513
514 match socket.read() {
515 Ok(message) => match message {
516 Text(message) => {
517 let credentials = Credentials {
518 oauth_token: oauth_token.clone(),
519 client_id: client_id.clone(),
520 };
521
522 let _ = handle_message(Message::Text(message), &user, &tx, &credentials);
523 }
524 Close(_) => {
525 send_to_error_log("We got a close message, reconnecting...".to_string(), "".to_string());
526
527 return connect_to_pub_sub(oauth_token, client_id, tx);
528 }
529
530 Ping(_) => {}
531
532 wtf => {
533 send_to_error_log("HOW? Why are we here???".to_string(), wtf.to_string());
534
535 return connect_to_pub_sub(oauth_token, client_id, tx);
536 }
537 },
538 Err(error) => {
539 send_to_error_log(error.to_string(), "Mistakes were made".to_string());
540
541 match error {
542 ConnectionClosed | AlreadyClosed => {
543 return connect_to_pub_sub(oauth_token, client_id, tx);
544 }
545
546 Io(error) => {
547 if error.kind() != ErrorKind::WouldBlock {
548 return connect_to_pub_sub(oauth_token, client_id, tx);
549 }
550 }
551
552 _ => {}
553 }
554 }
555 }
556 }
557 }
558
559 Err(error) => {
560 send_to_error_log(error.to_string(), "Could not connect to pub sub".to_string());
561
562 connect_to_pub_sub(oauth_token, client_id, tx)
563 }
564 }
565}