1use anyhow::bail;
2use anyhow::Result;
3use futures::Future;
4use futures::Sink;
5use futures::StreamExt;
6
7use serde::Deserialize;
8
9use futures::Stream;
10
11use std::pin::Pin;
12use std::task::Context;
13use std::task::Poll;
14
/// Metadata header deserialized from every text message read off the WebSocket.
///
/// `message_type` decides how the accompanying payload is interpreted; this
/// file handles `"session_welcome"` and `"notification"`.
#[derive(Debug, Deserialize)]
pub struct MessageMetadata {
    pub message_id: String,
    /// Kept as the raw string from the wire; nothing in this file parses it.
    pub message_timestamp: String,
    pub message_type: String,
    /// Read for `"notification"` messages to choose the concrete event type.
    /// A notification without it ends the `Client` stream.
    pub subscription_type: Option<String>,
    pub subscription_version: Option<String>,
}
23
/// Envelope of a WebSocket text message: a typed metadata header plus an
/// undecoded JSON payload.
#[derive(Debug, Deserialize)]
pub struct Message {
    pub metadata: MessageMetadata,
    /// Left as raw JSON; it is decoded into `SessionWelcome` or `Notification`
    /// once `metadata.message_type` has been inspected.
    pub payload: serde_json::Value,
}
29
/// The `session` object found inside a `"session_welcome"` payload.
#[derive(Debug, Deserialize)]
pub struct SessionWelcomeSession {
    /// Session id; `connect_eventsub` passes it as the transport `session_id`
    /// when creating subscriptions.
    pub id: String,
    pub connected_at: String,
    pub status: String,
    pub reconnect_url: Option<String>,
    // NOTE(review): not read anywhere in this file; the client pings on its own
    // fixed 30 second timer instead.
    pub keepalive_timeout_seconds: i64,
}
38
/// Payload of a `"session_welcome"` message, the first text message
/// `connect_eventsub` expects after connecting.
#[derive(Debug, Deserialize)]
pub struct SessionWelcome {
    pub session: SessionWelcomeSession,
}
43
/// Payload of a `"notification"` message.
///
/// Both members stay as raw JSON here; `event` is decoded into a concrete
/// event struct once the subscription type is known.
#[derive(Debug, Deserialize)]
pub struct Notification {
    pub subscription: serde_json::Value,
    pub event: serde_json::Value,
}
49
/// Event body decoded for notifications whose subscription type is
/// `"channel.update"`.
///
/// Every field is required: if any is missing from the JSON, deserialization
/// fails.
#[derive(Debug, Deserialize)]
pub struct ChannelUpdate {
    pub broadcaster_user_id: String,
    pub broadcaster_user_login: String,
    pub broadcaster_user_name: String,
    pub title: String,
    pub language: String,
    pub category_id: String,
    pub category_name: String,
    pub content_classification_labels: Vec<String>,
}
61
/// The nested `reward` object of a `CustomRewardRedemptionAdd` event.
///
/// Only the `id` member is captured; serde ignores any other members in the JSON.
#[derive(Debug, Deserialize)]
pub struct CustomRewardRedemptionAddReward {
    pub id: String,
}
66
/// Event body decoded for notifications whose subscription type is
/// `"channel.channel_points_custom_reward_redemption.add"`.
#[derive(Debug, Deserialize)]
pub struct CustomRewardRedemptionAdd {
    pub id: String,
    pub user_login: String,
    pub user_input: String,
    pub reward: CustomRewardRedemptionAddReward,
}
74
/// Event body decoded for notifications whose subscription type is
/// `"stream.online"`.
#[derive(Debug, Deserialize)]
pub struct StreamOnline {
    pub id: String,
    pub broadcaster_user_id: String,
    pub broadcaster_user_login: String,
    pub broadcaster_user_name: String,
    /// Raw identifier because `type` is a Rust keyword; it maps to the JSON
    /// key `type`.
    pub r#type: String,
    /// Kept as the raw string from the wire; nothing in this file parses it.
    pub started_at: String,
}
84
/// Event body decoded for notifications whose subscription type is
/// `"stream.offline"`.
#[derive(Debug, Deserialize)]
pub struct StreamOffline {
    pub broadcaster_user_id: String,
    pub broadcaster_user_login: String,
    pub broadcaster_user_name: String,
}
91
/// A decoded notification; this is the item type yielded by the `Client`
/// stream.
///
/// Each variant wraps the event struct for one supported subscription type.
// NOTE(review): `Deserialize` is derived, but the stream in this file builds
// the variants by hand from the subscription type string rather than through
// this derive.
#[derive(Debug, Deserialize)]
pub enum NotificationType {
    ChannelUpdate(ChannelUpdate),
    CustomRewardRedemptionAdd(CustomRewardRedemptionAdd),
    StreamOnline(StreamOnline),
    StreamOffline(StreamOffline),
}
99
/// An established EventSub WebSocket session, consumed as a `Stream` of
/// `NotificationType` values.
///
/// Instances are produced by `connect_eventsub`.
pub struct Client {
    /// The underlying WebSocket connection, boxed and pinned so it can be
    /// polled through `Pin<&mut _>`.
    inner_stream: Pin<
        Box<
            tokio_tungstenite::WebSocketStream<
                tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
            >,
        >,
    >,
    /// Timer that triggers a client-side WebSocket ping; it is armed for
    /// 30 seconds and re-armed each time it fires.
    ping_sleep: Pin<Box<tokio::time::Sleep>>,
}
110
111impl Stream for Client {
112 type Item = NotificationType;
113
114 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
115 let this = self.get_mut();
116 let mut inner_stream = this.inner_stream.as_mut();
117
118 match this.ping_sleep.as_mut().poll(cx) {
119 Poll::Pending => {}
120 Poll::Ready(..) => {
121 this.ping_sleep
122 .as_mut()
123 .reset(tokio::time::Instant::now() + tokio::time::Duration::from_secs(30));
124
125 match inner_stream.as_mut().start_send(
126 tokio_tungstenite::tungstenite::protocol::Message::Ping(vec![]),
127 ) {
128 Err(..) => return Poll::Ready(None),
129 _ => {}
130 };
131 }
132 };
133
134 loop {
135 match inner_stream.as_mut().poll_next(cx) {
136 Poll::Pending => return Poll::Pending,
137 Poll::Ready(v) => match v {
138 Some(Ok(tokio_tungstenite::tungstenite::protocol::Message::Ping(..))) => {
139 match inner_stream.as_mut().start_send(
140 tokio_tungstenite::tungstenite::protocol::Message::Pong(vec![]),
141 ) {
142 Ok(()) => continue,
143 Err(..) => break,
144 };
145 }
146 Some(Ok(tokio_tungstenite::tungstenite::protocol::Message::Text(text))) => {
147 let message: Message = match serde_json::from_str(&text) {
148 Ok(v) => v,
149 Err(..) => break,
150 };
151
152 match message.metadata.message_type.as_str() {
153 "notification" => {
154 let subtype = match &message.metadata.subscription_type {
155 Some(v) => v,
156 None => break,
157 };
158
159 let notification: Notification =
160 match serde_json::from_value(message.payload.clone()) {
161 Ok(v) => v,
162 Err(..) => break,
163 };
164
165 match subtype.as_str() {
166 "channel.update" => {
167 let event: ChannelUpdate =
168 match serde_json::from_value(notification.event) {
169 Ok(v) => v,
170 Err(..) => break,
171 };
172
173 return Poll::Ready(Some(NotificationType::ChannelUpdate(
174 event,
175 )));
176 }
177 "channel.channel_points_custom_reward_redemption.add" => {
178 let event: CustomRewardRedemptionAdd =
179 match serde_json::from_value(notification.event) {
180 Ok(v) => v,
181 Err(..) => break,
182 };
183
184 return Poll::Ready(Some(
185 NotificationType::CustomRewardRedemptionAdd(event),
186 ));
187 }
188 "stream.online" => {
189 let event: StreamOnline =
190 match serde_json::from_value(notification.event) {
191 Ok(v) => v,
192 Err(..) => break,
193 };
194
195 return Poll::Ready(Some(NotificationType::StreamOnline(
196 event,
197 )));
198 }
199 "stream.offline" => {
200 let event: StreamOffline =
201 match serde_json::from_value(notification.event) {
202 Ok(v) => v,
203 Err(..) => break,
204 };
205
206 return Poll::Ready(Some(NotificationType::StreamOffline(
207 event,
208 )));
209 }
210 _ => return Poll::Pending,
211 }
212 }
213 _ => continue,
214 }
215 }
216 Some(..) => continue,
217 None => break,
218 },
219 }
220 }
221
222 Poll::Ready(None)
223 }
224}
225
226impl<T: crate::auth::TokenStorage> crate::helix::Client<T> {
227 pub async fn connect_eventsub(&mut self, topics: Vec<(String, String)>) -> Result<Client> {
228 let (mut ws_stream, _) =
229 match tokio_tungstenite::connect_async("wss://eventsub.wss.twitch.tv/ws").await {
230 Ok(v) => v,
231 Err(e) => return Err(e.into()),
232 };
233
234 let welcome = loop {
235 let msg = ws_stream.next().await;
236 match msg {
237 Some(Ok(tokio_tungstenite::tungstenite::protocol::Message::Text(text))) => {
238 let message: Message = match serde_json::from_str(&text) {
239 Ok(v) => v,
240 Err(e) => return Err(e.into()),
241 };
242
243 if message.metadata.message_type.as_str() != "session_welcome" {
244 bail!("No session welcome");
245 }
246
247 let welcome: SessionWelcome =
248 match serde_json::from_value(message.payload.clone()) {
249 Ok(v) => v,
250 Err(e) => return Err(e.into()),
251 };
252
253 break welcome;
254 }
255 Some(Err(e)) => return Err(e.into()),
256 Some(..) => {}
257 None => bail!("WebSocket dropped"),
258 }
259 };
260
261 let broadcaster_id = match self.get_token_user_id().await {
262 Ok(v) => v,
263 Err(..) => bail!("No token user id"),
264 };
265 for (subtype, version) in topics.into_iter() {
266 match self
267 .create_eventsub_subscription(&crate::helix::EventSubCreate {
268 r#type: subtype,
269 version: version,
270 condition: crate::helix::EventSubCondition {
271 broadcaster_id: Some(broadcaster_id.clone()),
272 broadcaster_user_id: Some(broadcaster_id.clone()),
273 moderator_user_id: Some(broadcaster_id.clone()),
274 user_id: Some(broadcaster_id.clone()),
275 ..Default::default()
276 },
277 transport: crate::helix::EventSubTransport {
278 method: "websocket".to_string(),
279 session_id: Some(welcome.session.id.clone()),
280 ..Default::default()
281 },
282 })
283 .await
284 {
285 Ok(..) => {}
286 Err(..) => {
287 bail!("create_eventsub_subscription failed")
288 }
289 };
290 }
291
292 Ok(Client {
293 inner_stream: Pin::new(Box::new(ws_stream)),
294 ping_sleep: Box::pin(tokio::time::sleep(tokio::time::Duration::from_secs(30))),
295 })
296 }
297}