1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
//! Holds serializable EventSub stuff
//!
//! Use [`CreateEventSubSubscription`](crate::helix::eventsub::CreateEventSubSubscription) to subscribe to an event according to the [EventSub guide](https://dev.twitch.tv/docs/eventsub).
//! Parse the response payload text with [`Payload::parse`] or the .
//!
//! # Example
//!
//! You've used [`CreateEventSubSubscription`](crate::helix::eventsub::CreateEventSubSubscription) to create a subscription for [`user.authorization.revoke`](EventType::UserAuthorizationRevoke), after verifying your callback accordingly you will then get events sent to the callback
//!
//! To parse these, use [`Payload::parse`]
//!
//! ```rust
//! use twitch_api2::eventsub::Payload;
//! let payload = r#"{
//!     "subscription": {
//!         "id": "f1c2a387-161a-49f9-a165-0f21d7a4e1c4",
//!         "type": "user.authorization.revoke",
//!         "version": "1",
//!         "condition": {
//!             "client_id": "crq72vsaoijkc83xx42hz6i37"
//!         },
//!          "transport": {
//!             "method": "webhook",
//!             "callback": "https://example.com/webhooks/callback"
//!         },
//!         "created_at": "2019-11-16T10:11:12.123Z"
//!     },
//!     "event": {
//!         "client_id": "crq72vsaoijkc83xx42hz6i37",
//!         "user_id": "1337",
//!         "user_name": "cool_user"
//!     }
//! }"#;
//!
//! let payload = Payload::parse(payload).unwrap();
//! match payload {
//!     Payload::UserAuthorizationRevokeV1(p) => {
//!         println!("User with id `{}` has revoked access to client `{}`",
//!             p.event.user_id,
//!             p.event.client_id
//!         )
//!     }
//!     _ => { panic!() }
//! }
//! ```

use crate::types;
use serde::{de::DeserializeOwned, Deserialize, Deserializer, Serialize};

pub mod channel;
pub mod stream;
pub mod user;

/// An EventSub subscription.
pub trait EventSubscription: DeserializeOwned + Serialize + PartialEq {
    /// Payload for given subscription
    type Payload: PartialEq + std::fmt::Debug + DeserializeOwned + Serialize;

    /// Scopes needed by this subscription
    #[cfg(feature = "twitch_oauth2")]
    const SCOPE: &'static [twitch_oauth2::Scope];
    /// Optional scopes needed by this subscription
    #[cfg(feature = "twitch_oauth2")]
    const OPT_SCOPE: &'static [twitch_oauth2::Scope] = &[];
    /// Subscription type version
    const VERSION: &'static str;
    /// Subscription type name.
    const EVENT_TYPE: EventType;

    /// Creates the [`condition`](https://dev.twitch.tv/docs/eventsub/eventsub-reference#conditions) for this EventSub subscription
    fn condition(&self) -> Result<serde_json::Value, serde_json::Error> {
        serde_json::to_value(self)
    }
}
/// Verification Request
#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)]
#[cfg_attr(not(feature = "allow_unknown_fields"), serde(deny_unknown_fields))]
#[non_exhaustive]
pub struct VerificationRequest {
    /// Challenge string.
    ///
    /// After verifying that the response is legit, send back this challenge.
    pub challenge: String,
    /// Information about subscription, including ID
    pub subscription: EventSubSubscription,
    // /// Signature of message
    // pub signature: String,
    // /// ID of subscription, also contained in [`subscription`](VerificationRequest::subscription)
    // pub id: types::EventSubId,
}

/// Subscription payload. Received on events. Enumerates all possible [`NotificationPayload`s](NotificationPayload)
///
/// Use [`Payload::parse`] to construct
#[derive(PartialEq, Debug, Serialize, Deserialize)] // FIXME: Clone?
#[serde(remote = "Self")]
#[allow(clippy::large_enum_variant)]
pub enum Payload {
    /// Webhook Callback Verification
    VerificationRequest(VerificationRequest),
    /// Channel Update V1 Event
    ChannelUpdateV1(NotificationPayload<channel::ChannelUpdateV1>),
    /// Channel Follow V1 Event
    ChannelFollowV1(NotificationPayload<channel::ChannelFollowV1>),
    /// Channel Subscribe V1 Event
    ChannelSubscribeV1(NotificationPayload<channel::ChannelSubscribeV1>),
    /// Channel Cheer V1 Event
    ChannelCheerV1(NotificationPayload<channel::ChannelCheerV1>),
    /// Channel Ban V1 Event
    ChannelBanV1(NotificationPayload<channel::ChannelBanV1>),
    /// Channel Unban V1 Event
    ChannelUnbanV1(NotificationPayload<channel::ChannelUnbanV1>),
    /// Channel Points Custom Reward Add V1 Event
    ChannelPointsCustomRewardAddV1(NotificationPayload<channel::ChannelPointsCustomRewardAddV1>),
    /// Channel Points Custom Reward Update V1 Event
    ChannelPointsCustomRewardUpdateV1(
        NotificationPayload<channel::ChannelPointsCustomRewardUpdateV1>,
    ),
    /// Channel Points Custom Reward Remove V1 Event
    ChannelPointsCustomRewardRemoveV1(
        NotificationPayload<channel::ChannelPointsCustomRewardRemoveV1>,
    ),
    /// Channel Points Custom Reward Redemption Add V1 Event
    ChannelPointsCustomRewardRedemptionAddV1(
        NotificationPayload<channel::ChannelPointsCustomRewardRedemptionAddV1>,
    ),
    /// Channel Points Custom Reward Redemption Update V1 Event
    ChannelPointsCustomRewardRedemptionUpdateV1(
        NotificationPayload<channel::ChannelPointsCustomRewardRedemptionUpdateV1>,
    ),
    /// Channel Hype Train Begin V1 Event
    ChannelHypeTrainBeginV1(NotificationPayload<channel::ChannelHypeTrainBeginV1>),
    /// Channel Hype Train Progress V1 Event
    ChannelHypeTrainProgressV1(NotificationPayload<channel::ChannelHypeTrainProgressV1>),
    /// Channel Hype Train End V1 Event
    ChannelHypeTrainEndV1(NotificationPayload<channel::ChannelHypeTrainEndV1>),
    /// StreamOnline V1 Event
    StreamOnlineV1(NotificationPayload<stream::StreamOnlineV1>),
    /// StreamOffline V1 Event
    StreamOfflineV1(NotificationPayload<stream::StreamOfflineV1>),
    /// User Update V1 Event
    UserUpdateV1(NotificationPayload<user::UserUpdateV1>),
    /// User Authorization Revoke V1 Event
    UserAuthorizationRevokeV1(NotificationPayload<user::UserAuthorizationRevokeV1>),
}

impl Payload {
    /// Parse string slice as a [Payload]
    pub fn parse(source: &str) -> Result<Payload, PayloadParseError> {
        serde_json::from_str(source).map_err(Into::into)
    }

    // FIXME: Should not throwaway headers etc
    /// Parse http response as a [Payload].
    pub fn parse_response(source: &http::Response<Vec<u8>>) -> Result<Payload, PayloadParseError> {
        Payload::parse(std::str::from_utf8(source.body())?)
    }
}

/// Errors that can happen when parsing payload
#[derive(thiserror::Error, displaydoc::Display, Debug)]
pub enum PayloadParseError {
    /// could not parse [`http::Request::body()`] as UTF8
    Utf8Error(#[from] std::str::Utf8Error),
    /// could not parse [`http::Request::body()`] as a [`Payload`]
    DeserializeError(#[from] serde_json::Error),
}

impl<'de> Deserialize<'de> for Payload {
    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        use std::convert::TryInto;
        macro_rules! match_event {
            ($response:expr; $($module:ident::$event:ident);* $(;)?) => {
                #[deny(unreachable_patterns)]
                match (&*$response.s.version, &$response.s.type_) {
                    $(  (<$module::$event as EventSubscription>::VERSION, &<$module::$event as EventSubscription>::EVENT_TYPE) => {
                        Payload::$event(NotificationPayload {
                            subscription: $response.s.try_into().map_err(serde::de::Error::custom)?,
                            event: serde_json::from_value($response.e).map_err(serde::de::Error::custom)?,
                        })
                    }  )*
                    (v, e) => return Err(serde::de::Error::custom(format!("could not find a match for version `{}` on event type `{}`", v, e)))
                }
            }
        }

        #[derive(Deserialize, Clone)]
        #[cfg_attr(not(feature = "allow_unknown_fields"), serde(deny_unknown_fields))]
        struct IEventSubscripionInformation {
            condition: serde_json::Value,
            created_at: types::Timestamp,
            id: String,
            transport: TransportResponse,
            #[serde(rename = "type")]
            type_: EventType,
            version: String,
        }
        #[derive(Deserialize)]
        #[cfg_attr(not(feature = "allow_unknown_fields"), serde(deny_unknown_fields))]
        struct IResponse {
            #[serde(rename = "subscription")]
            s: IEventSubscripionInformation,
            #[serde(rename = "event", alias = "challenge")]
            e: serde_json::Value,
        }

        impl<E: EventSubscription> std::convert::TryFrom<IEventSubscripionInformation>
            for EventSubscriptionInformation<E>
        {
            type Error = serde_json::Error;

            fn try_from(info: IEventSubscripionInformation) -> Result<Self, Self::Error> {
                debug_assert_eq!(info.version, E::VERSION);
                debug_assert_eq!(info.type_, E::EVENT_TYPE);
                Ok(EventSubscriptionInformation {
                    id: info.id,
                    condition: serde_json::from_value(info.condition)?,
                    created_at: info.created_at,
                    transport: info.transport,
                })
            }
        }
        #[derive(Deserialize)]
        #[cfg_attr(not(feature = "allow_unknown_fields"), serde(deny_unknown_fields))]
        #[serde(untagged)]
        enum IIResponse {
            VerificationRequest(VerificationRequest),
            IResponse(IResponse),
        }

        let response = IIResponse::deserialize(deserializer).map_err(|e| {
            serde::de::Error::custom(format!("could not deserialize response: {}", e))
        })?;
        match response {
            IIResponse::VerificationRequest(verification) => {
                Ok(Payload::VerificationRequest(verification))
            }
            IIResponse::IResponse(response) => Ok(match_event! { response;
                channel::ChannelUpdateV1;
                channel::ChannelFollowV1;
                channel::ChannelSubscribeV1;
                channel::ChannelCheerV1;
                channel::ChannelBanV1;
                channel::ChannelUnbanV1;
                channel::ChannelPointsCustomRewardAddV1;
                channel::ChannelPointsCustomRewardUpdateV1;
                channel::ChannelPointsCustomRewardRemoveV1;
                channel::ChannelPointsCustomRewardRedemptionAddV1;
                channel::ChannelPointsCustomRewardRedemptionUpdateV1;
                channel::ChannelHypeTrainBeginV1;
                channel::ChannelHypeTrainProgressV1;
                channel::ChannelHypeTrainEndV1;
                stream::StreamOnlineV1;
                stream::StreamOfflineV1;
                user::UserUpdateV1;
                user::UserAuthorizationRevokeV1;
            }),
        }
    }
}

/// Notification received
#[derive(Debug, PartialEq, Serialize, Deserialize)] // FIXME: Clone ?
#[cfg_attr(not(feature = "allow_unknown_fields"), serde(deny_unknown_fields))]
#[non_exhaustive]
pub struct NotificationPayload<E: EventSubscription> {
    /// Subscription information.
    #[serde(bound = "E: EventSubscription")]
    pub subscription: EventSubscriptionInformation<E>,
    /// Event information.
    #[serde(bound = "E: EventSubscription")]
    pub event: <E as EventSubscription>::Payload,
}

/// Metadata about the subscription.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[cfg_attr(not(feature = "allow_unknown_fields"), serde(deny_unknown_fields))]
#[non_exhaustive]
pub struct EventSubscriptionInformation<E: EventSubscription> {
    /// Your client ID.
    pub id: String,
    /// Subscription-specific parameters.
    #[serde(bound = "E: EventSubscription")]
    pub condition: E,
    /// The time the notification was created.
    pub created_at: types::Timestamp,
    /// Transport method
    pub transport: TransportResponse,
}

/// Transport setting for event notification
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(not(feature = "allow_unknown_fields"), serde(deny_unknown_fields))]
#[non_exhaustive]
pub struct Transport {
    /// Method for transport
    pub method: TransportMethod,
    /// Callback
    pub callback: String,
    /// Secret attached to the subscription.
    ///
    /// # Notes
    ///
    /// Secret must be between 10 and 100 characters
    pub secret: String,
}

/// Transport response on event notification
///
/// Does not include secret.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(not(feature = "allow_unknown_fields"), serde(deny_unknown_fields))]
#[non_exhaustive]
pub struct TransportResponse {
    /// Method for transport
    pub method: TransportMethod,
    /// Callback
    pub callback: String,
}

/// Transport method
///
/// Currently, only webhooks are supported
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
#[serde(rename_all = "lowercase")]
pub enum TransportMethod {
    /// Webhook
    Webhook,
}

/// Event name
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(not(feature = "allow_unknown_fields"), serde(deny_unknown_fields))]
#[non_exhaustive]
pub enum EventType {
    /// `channel.update` subscription type sends notifications when a broadcaster updates the category, title, mature flag, or broadcast language for their channel.
    #[serde(rename = "channel.update")]
    ChannelUpdate,
    /// `channel.follow`: a specified channel receives a follow.
    #[serde(rename = "channel.follow")]
    ChannelFollow,
    /// `channel.subscribe`: a specified channel receives a subscriber. This does not include resubscribes.
    #[serde(rename = "channel.subscribe")]
    ChannelSubscribe,
    /// `channel.cheer`: a user cheers on the specified channel.
    #[serde(rename = "channel.cheer")]
    ChannelCheer,
    /// `channel.ban`: a viewer is banned from the specified channel.
    #[serde(rename = "channel.ban")]
    ChannelBan,
    /// `channel.unban`: a viewer is unbanned from the specified channel.
    #[serde(rename = "channel.unban")]
    ChannelUnban,
    /// `channel.channel_points_custom_reward.add`: a custom channel points reward has been created for the specified channel.
    #[serde(rename = "channel.channel_points_custom_reward.add")]
    ChannelPointsCustomRewardAdd,
    /// `channel.channel_points_custom_reward.update`: a custom channel points reward has been updated for the specified channel.
    #[serde(rename = "channel.channel_points_custom_reward.update")]
    ChannelPointsCustomRewardUpdate,
    /// `channel.channel_points_custom_reward.remove`: a custom channel points reward has been removed from the specified channel.
    #[serde(rename = "channel.channel_points_custom_reward.remove")]
    ChannelPointsCustomRewardRemove,
    /// `channel.channel_points_custom_reward_redemption.add`: a viewer has redeemed a custom channel points reward on the specified channel.
    #[serde(rename = "channel.channel_points_custom_reward_redemption.add")]
    ChannelPointsCustomRewardRedemptionAdd,
    /// `channel.channel_points_custom_reward_redemption.update`: a redemption of a channel points custom reward has been updated for the specified channel.
    #[serde(rename = "channel.channel_points_custom_reward_redemption.update")]
    ChannelPointsCustomRewardRedemptionUpdate,
    /// `channel.hype_train.begin`: a hype train begins on the specified channel.
    #[serde(rename = "channel.hype_train.begin")]
    ChannelHypeTrainBegin,
    /// `channel.hype_train.progress`: a hype train makes progress on the specified channel.
    #[serde(rename = "channel.hype_train.progress")]
    ChannelHypeTrainProgress,
    /// `channel.hype_train.end`: a hype train ends on the specified channel.
    #[serde(rename = "channel.hype_train.end")]
    ChannelHypeTrainEnd,
    /// `stream.online`: the specified broadcaster starts a stream.
    #[serde(rename = "stream.online")]
    StreamOnline,
    /// `stream.online`: the specified broadcaster stops a stream.
    #[serde(rename = "stream.offline")]
    StreamOffline,
    /// `user.update`: user updates their account.
    #[serde(rename = "user.update")]
    UserUpdate,
    /// `user.authorization.revoke`: a user has revoked authorization for your client id. Use this webhook to meet government requirements for handling user data, such as GDPR, LGPD, or CCPA.
    #[serde(rename = "user.authorization.revoke")]
    UserAuthorizationRevoke,
}

impl std::fmt::Display for EventType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { self.serialize(f) }
}

///  Subscription request status
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
#[serde(rename_all = "snake_case")] // FIXME: Most examples use kebab-case... but reality seems to be snake_case
pub enum Status {
    /// Designates that the subscription is in an operable state and is valid.
    Enabled,
    /// Webhook is pending verification of the callback specified in the subscription creation request.
    WebhookCallbackVerificationPending,
    /// Webhook failed verification of the callback specified in the subscription creation request.
    WebhookCallbackVerificationFailed,
    /// Notification delivery failure rate was too high.
    NotificationFailuresExceeded,
    /// Authorization for user(s) in the condition was revoked.
    AuthorizationRevoked,
    /// A user in the condition of the subscription was removed.
    UserRemoved,
}

/// General information about an EventSub subscription.
#[derive(PartialEq, Deserialize, Serialize, Debug, Clone)]
#[non_exhaustive]
#[cfg(feature = "eventsub")]
#[cfg_attr(nightly, doc(cfg(feature = "eventsub")))]
pub struct EventSubSubscription {
    /// JSON object specifying custom parameters for the subscription.
    // FIXME: Should be [eventsub::Condition]
    pub condition: serde_json::Value,
    /// RFC3339 timestamp indicating when the subscription was created.
    pub created_at: types::Timestamp,
    /// ID of the subscription.
    pub id: types::EventSubId,
    /// Status of the subscription.
    pub status: Status,
    /// Notification delivery specific information. Includes the transport method and callback URL.
    pub transport: TransportResponse,
    /// The category of the subscription.
    #[serde(rename = "type")]
    pub type_: String,
    /// The version of the subscription.
    pub version: String,
}

#[test]
fn test_verification_response() {
    use http::header::{HeaderMap, HeaderName, HeaderValue};

    #[rustfmt::skip]
    let _headers: HeaderMap = vec![
        ("Twitch-Eventsub-Message-Id","e76c6bd4-55c9-4987-8304-da1588d8988b"),
        ("Twitch-Eventsub-Message-Retry", "0"),
        ("Twitch-Eventsub-Message-Type", "webhook_callback_verification"),
        ("Twitch-Eventsub-Message-Signature","sha256=f56bf6ce06a1adf46fa27831d7d15d"),
        ("Twitch-Eventsub-Message-Timestamp","2019-11-16T10:11:12.123Z"),
        ("Twitch-Eventsub-Subscription-Type", "channel.follow"),
        ("Twitch-Eventsub-Subscription-Version", "1"),

    ].into_iter()
        .map(|(h, v)| {
            (
                h.parse::<HeaderName>().unwrap(),
                v.parse::<HeaderValue>().unwrap(),
            )
        })
        .collect();

    let body = r#"
    {
        "challenge": "pogchamp-kappa-360noscope-vohiyo",
        "subscription": {
            "id": "f1c2a387-161a-49f9-a165-0f21d7a4e1c4",
            "status": "webhook_callback_verification_pending",
            "type": "channel.follow",
            "version": "1",
            "condition": {
                    "broadcaster_user_id": "12826"
            },
            "transport": {
                "method": "webhook",
                "callback": "https://example.com/webhooks/callback"
            },
            "created_at": "2019-11-16T10:11:12.123Z"
        }
    }"#;

    dbg!(crate::eventsub::Payload::parse(&body).unwrap());
}