tw_api/
eventsub.rs

1use anyhow::bail;
2use anyhow::Result;
3use futures::Future;
4use futures::Sink;
5use futures::StreamExt;
6
7use serde::Deserialize;
8
9use futures::Stream;
10
11use std::pin::Pin;
12use std::task::Context;
13use std::task::Poll;
14
15#[derive(Debug, Deserialize)]
16pub struct MessageMetadata {
17    pub message_id: String,
18    pub message_timestamp: String,
19    pub message_type: String,
20    pub subscription_type: Option<String>,
21    pub subscription_version: Option<String>,
22}
23
24#[derive(Debug, Deserialize)]
25pub struct Message {
26    pub metadata: MessageMetadata,
27    pub payload: serde_json::Value,
28}
29
30#[derive(Debug, Deserialize)]
31pub struct SessionWelcomeSession {
32    pub id: String,
33    pub connected_at: String,
34    pub status: String,
35    pub reconnect_url: Option<String>,
36    pub keepalive_timeout_seconds: i64,
37}
38
39#[derive(Debug, Deserialize)]
40pub struct SessionWelcome {
41    pub session: SessionWelcomeSession,
42}
43
44#[derive(Debug, Deserialize)]
45pub struct Notification {
46    pub subscription: serde_json::Value,
47    pub event: serde_json::Value,
48}
49
50#[derive(Debug, Deserialize)]
51pub struct ChannelUpdate {
52    pub broadcaster_user_id: String,
53    pub broadcaster_user_login: String,
54    pub broadcaster_user_name: String,
55    pub title: String,
56    pub language: String,
57    pub category_id: String,
58    pub category_name: String,
59    pub content_classification_labels: Vec<String>,
60}
61
62#[derive(Debug, Deserialize)]
63pub struct CustomRewardRedemptionAddReward {
64    pub id: String,
65}
66
67#[derive(Debug, Deserialize)]
68pub struct CustomRewardRedemptionAdd {
69    pub id: String,
70    pub user_login: String,
71    pub user_input: String,
72    pub reward: CustomRewardRedemptionAddReward,
73}
74
75#[derive(Debug, Deserialize)]
76pub struct StreamOnline {
77    pub id: String,
78    pub broadcaster_user_id: String,
79    pub broadcaster_user_login: String,
80    pub broadcaster_user_name: String,
81    pub r#type: String,
82    pub started_at: String,
83}
84
85#[derive(Debug, Deserialize)]
86pub struct StreamOffline {
87    pub broadcaster_user_id: String,
88    pub broadcaster_user_login: String,
89    pub broadcaster_user_name: String,
90}
91
92#[derive(Debug, Deserialize)]
93pub enum NotificationType {
94    ChannelUpdate(ChannelUpdate),
95    CustomRewardRedemptionAdd(CustomRewardRedemptionAdd),
96    StreamOnline(StreamOnline),
97    StreamOffline(StreamOffline),
98}
99
100pub struct Client {
101    inner_stream: Pin<
102        Box<
103            tokio_tungstenite::WebSocketStream<
104                tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
105            >,
106        >,
107    >,
108    ping_sleep: Pin<Box<tokio::time::Sleep>>,
109}
110
111impl Stream for Client {
112    type Item = NotificationType;
113
114    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
115        let this = self.get_mut();
116        let mut inner_stream = this.inner_stream.as_mut();
117
118        match this.ping_sleep.as_mut().poll(cx) {
119            Poll::Pending => {}
120            Poll::Ready(..) => {
121                this.ping_sleep
122                    .as_mut()
123                    .reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(30));
124
125                match inner_stream.as_mut().start_send(
126                    tokio_tungstenite::tungstenite::protocol::Message::Ping(vec![]),
127                ) {
128                    Err(..) => return Poll::Ready(None),
129                    _ => {}
130                };
131            }
132        };
133
134        loop {
135            match inner_stream.as_mut().poll_next(cx) {
136                Poll::Pending => return Poll::Pending,
137                Poll::Ready(v) => match v {
138                    Some(Ok(tokio_tungstenite::tungstenite::protocol::Message::Ping(..))) => {
139                        match inner_stream.as_mut().start_send(
140                            tokio_tungstenite::tungstenite::protocol::Message::Pong(vec![]),
141                        ) {
142                            Ok(()) => continue,
143                            Err(..) => break,
144                        };
145                    }
146                    Some(Ok(tokio_tungstenite::tungstenite::protocol::Message::Text(text))) => {
147                        let message: Message = match serde_json::from_str(&text) {
148                            Ok(v) => v,
149                            Err(..) => break,
150                        };
151
152                        match message.metadata.message_type.as_str() {
153                            "notification" => {
154                                let subtype = match &message.metadata.subscription_type {
155                                    Some(v) => v,
156                                    None => break,
157                                };
158
159                                let notification: Notification =
160                                    match serde_json::from_value(message.payload.clone()) {
161                                        Ok(v) => v,
162                                        Err(..) => break,
163                                    };
164
165                                match subtype.as_str() {
166                                    "channel.update" => {
167                                        let event: ChannelUpdate =
168                                            match serde_json::from_value(notification.event) {
169                                                Ok(v) => v,
170                                                Err(..) => break,
171                                            };
172
173                                        return Poll::Ready(Some(NotificationType::ChannelUpdate(
174                                            event,
175                                        )));
176                                    }
177                                    "channel.channel_points_custom_reward_redemption.add" => {
178                                        let event: CustomRewardRedemptionAdd =
179                                            match serde_json::from_value(notification.event) {
180                                                Ok(v) => v,
181                                                Err(..) => break,
182                                            };
183
184                                        return Poll::Ready(Some(
185                                            NotificationType::CustomRewardRedemptionAdd(event),
186                                        ));
187                                    }
188                                    "stream.online" => {
189                                        let event: StreamOnline =
190                                            match serde_json::from_value(notification.event) {
191                                                Ok(v) => v,
192                                                Err(..) => break,
193                                            };
194
195                                        return Poll::Ready(Some(NotificationType::StreamOnline(
196                                            event,
197                                        )));
198                                    }
199                                    "stream.offline" => {
200                                        let event: StreamOffline =
201                                            match serde_json::from_value(notification.event) {
202                                                Ok(v) => v,
203                                                Err(..) => break,
204                                            };
205
206                                        return Poll::Ready(Some(NotificationType::StreamOffline(
207                                            event,
208                                        )));
209                                    }
210                                    _ => return Poll::Pending,
211                                }
212                            }
213                            _ => continue,
214                        }
215                    }
216                    Some(..) => continue,
217                    None => break,
218                },
219            }
220        }
221
222        Poll::Ready(None)
223    }
224}
225
226impl<T: crate::auth::TokenStorage> crate::helix::Client<T> {
227    pub async fn connect_eventsub(&mut self, topics: Vec<(String, String)>) -> Result<Client> {
228        let (mut ws_stream, _) =
229            match tokio_tungstenite::connect_async("wss://eventsub.wss.twitch.tv/ws").await {
230                Ok(v) => v,
231                Err(e) => return Err(e.into()),
232            };
233
234        let welcome = loop {
235            let msg = ws_stream.next().await;
236            match msg {
237                Some(Ok(tokio_tungstenite::tungstenite::protocol::Message::Text(text))) => {
238                    let message: Message = match serde_json::from_str(&text) {
239                        Ok(v) => v,
240                        Err(e) => return Err(e.into()),
241                    };
242
243                    if message.metadata.message_type.as_str() != "session_welcome" {
244                        bail!("No session welcome");
245                    }
246
247                    let welcome: SessionWelcome =
248                        match serde_json::from_value(message.payload.clone()) {
249                            Ok(v) => v,
250                            Err(e) => return Err(e.into()),
251                        };
252
253                    break welcome;
254                }
255                Some(Err(e)) => return Err(e.into()),
256                Some(..) => {}
257                None => bail!("WebSocket dropped"),
258            }
259        };
260
261        let broadcaster_id = match self.get_token_user_id().await {
262            Ok(v) => v,
263            Err(..) => bail!("No token user id"),
264        };
265        for (subtype, version) in topics.into_iter() {
266            match self
267                .create_eventsub_subscription(&crate::helix::EventSubCreate {
268                    r#type: subtype,
269                    version: version,
270                    condition: crate::helix::EventSubCondition {
271                        broadcaster_id: Some(broadcaster_id.clone()),
272                        broadcaster_user_id: Some(broadcaster_id.clone()),
273                        moderator_user_id: Some(broadcaster_id.clone()),
274                        user_id: Some(broadcaster_id.clone()),
275                        ..Default::default()
276                    },
277                    transport: crate::helix::EventSubTransport {
278                        method: "websocket".to_string(),
279                        session_id: Some(welcome.session.id.clone()),
280                        ..Default::default()
281                    },
282                })
283                .await
284            {
285                Ok(..) => {}
286                Err(..) => {
287                    bail!("create_eventsub_subscription failed")
288                }
289            };
290        }
291
292        Ok(Client {
293            inner_stream: Pin::new(Box::new(ws_stream)),
294            ping_sleep: Box::pin(tokio::time::sleep(tokio::time::Duration::from_secs(30))),
295        })
296    }
297}