twitch_tohell 0.1.1

Twitch EventSub webhook and WebSocket support
Documentation
use crate::{SubscriptionType, websocket::MessageType};

#[derive(Debug, Clone, Copy)]
pub struct Scanner {
    metadata: liver_shot::Span,
    payload: liver_shot::Span,
    pub message_type: MessageType,
    pub subscription_type: Option<SubscriptionType>,
}

impl Scanner {
    pub fn new(data: &str) -> Result<Self, ScanError> {
        let metadata = liver_shot::find("metadata", data)?;
        let payload = liver_shot::find("payload", data)?;

        let mt_span = metadata.find("message_type", data)?;
        let message_type: MessageType = data[mt_span.start + 1..mt_span.end - 1]
            .parse()
            .map_err(ScanError::parse)?;

        let subscription_type = match metadata.find("subscription_type", data) {
            Ok(span) => {
                let s = &data[span.start + 1..span.end - 1];
                Some(s.parse::<SubscriptionType>().map_err(ScanError::parse)?)
            }
            Err(e) if e.is_not_found() => None,
            Err(e) => return Err(e.into()),
        };

        Ok(Self {
            metadata,
            payload,
            message_type,
            subscription_type,
        })
    }

    #[inline]
    pub fn is_notification(&self) -> bool {
        matches!(self.message_type, MessageType::Notification)
    }

    #[inline]
    pub fn is_welcome(&self) -> bool {
        matches!(self.message_type, MessageType::SessionWelcome)
    }

    #[inline]
    pub fn is_keepalive(&self) -> bool {
        matches!(self.message_type, MessageType::SessionKeepalive)
    }

    #[inline]
    pub fn is_reconnect(&self) -> bool {
        matches!(self.message_type, MessageType::SessionReconnect)
    }

    #[inline]
    pub fn is_revocation(&self) -> bool {
        matches!(self.message_type, MessageType::Revocation)
    }

    #[inline]
    pub fn get_metadata<'a>(&self, data: &'a str) -> &'a str {
        self.metadata.get(data)
    }

    #[inline]
    pub fn get_payload<'a>(&self, data: &'a str) -> &'a str {
        self.payload.get(data)
    }

    #[inline]
    pub fn get_session<'a>(&self, data: &'a str) -> Result<&'a str, ScanError> {
        self.find_in_payload(data, "session")
    }

    #[inline]
    pub fn get_subscription<'a>(&self, data: &'a str) -> Result<&'a str, ScanError> {
        self.find_in_payload(data, "subscription")
    }

    #[inline]
    pub fn get_event<'a>(&self, data: &'a str) -> Result<&'a str, ScanError> {
        self.find_in_payload(data, "event")
    }

    #[inline]
    pub fn get_reconnect_url<'a>(&self, data: &'a str) -> Result<&'a str, ScanError> {
        let session = liver_shot::find("session", data)?;
        find_str(data, &session, "reconnect_url")
    }

    fn find_in_payload<'a>(&self, data: &'a str, field_name: &str) -> Result<&'a str, ScanError> {
        Ok(self.payload.find(field_name, data)?.get(data))
    }
}

fn find_str<'a>(
    data: &'a str,
    span: &liver_shot::Span,
    field_name: &str,
) -> Result<&'a str, ScanError> {
    let value_span = span.find(field_name, data)?;
    Ok(&data[value_span.start + 1..value_span.end - 1])
}

#[derive(Debug, thiserror::Error)]
pub enum ScanError {
    #[error("{0}")]
    Parse(String),
    #[error(transparent)]
    Json(#[from] liver_shot::Error),
}

impl ScanError {
    pub fn parse(error: impl ToString) -> Self {
        Self::Parse(error.to_string())
    }
}

#[cfg(test)]
mod tests {
    use crate::{
        Subscription, SubscriptionType,
        websocket::{MessageType, MetaData, Scanner, Session},
    };

    #[test]
    fn welcome() {
        let data = r#"
{
  "metadata": {
    "message_id": "96a3f3b5-5dec-4eed-908e-e11ee657416c",
    "message_type": "session_welcome",
    "message_timestamp": "2023-07-19T14:56:51.634234626Z"
  },
  "payload": {
    "session": {
      "id": "AQoQILE98gtqShGmLD7AM6yJThAB",
      "status": "connected",
      "connected_at": "2023-07-19T14:56:51.616329898Z",
      "keepalive_timeout_seconds": 10,
      "reconnect_url": null
    }
  }
}
"#;

        let scanner = Scanner::new(data).unwrap();

        assert_eq!(scanner.message_type, MessageType::SessionWelcome);
        assert_eq!(scanner.subscription_type, None);

        let _: MetaData = serde_json::from_str(scanner.get_metadata(data)).unwrap();
        let _: Session = serde_json::from_str(scanner.get_session(data).unwrap()).unwrap();
    }

    #[test]
    fn keepalive() {
        let data = r#"
{
    "metadata": {
        "message_id": "84c1e79a-2a4b-4c13-ba0b-4312293e9308",
        "message_type": "session_keepalive",
        "message_timestamp": "2023-07-19T10:11:12.634234626Z"
    },
    "payload": {}
}
"#;

        let scanner = Scanner::new(data).unwrap();

        assert_eq!(scanner.message_type, MessageType::SessionKeepalive);
        assert_eq!(scanner.subscription_type, None);

        let _: MetaData = serde_json::from_str(scanner.get_metadata(data)).unwrap();
    }

    #[test]
    fn notification() {
        let data = r#"
{
    "metadata": {
        "message_id": "befa7b53-d79d-478f-86b9-120f112b044e",
        "message_type": "notification",
        "message_timestamp": "2022-11-16T10:11:12.464757833Z",
        "subscription_type": "channel.follow",
        "subscription_version": "1"
    },
    "payload": {
        "subscription": {
            "id": "f1c2a387-161a-49f9-a165-0f21d7a4e1c4",
            "status": "enabled",
            "type": "channel.follow",
            "version": "1",
            "cost": 1,
            "condition": {
                "broadcaster_user_id": "12826"
            },
            "transport": {
                "method": "websocket",
                "session_id": "AQoQexAWVYKSTIu4ec_2VAxyuhAB"
            },
            "created_at": "2022-11-16T10:11:12.464757833Z"
        },
        "event": {
            "user_id": "1337",
            "user_login": "awesome_user",
            "user_name": "Awesome_User",
            "broadcaster_user_id": "12826",
            "broadcaster_user_login": "twitch",
            "broadcaster_user_name": "Twitch",
            "followed_at": "2023-07-15T18:16:11.17106713Z"
        }
    }
}
"#;

        let scanner = Scanner::new(data).unwrap();

        assert_eq!(scanner.message_type, MessageType::Notification);
        assert_eq!(
            scanner.subscription_type,
            Some(SubscriptionType::ChannelFollow)
        );

        let _: MetaData = serde_json::from_str(scanner.get_metadata(data)).unwrap();
        let _: Subscription =
            serde_json::from_str(scanner.get_subscription(data).unwrap()).unwrap();
    }

    #[test]
    fn reconnect() {
        let data = r#"
{
    "metadata": {
        "message_id": "84c1e79a-2a4b-4c13-ba0b-4312293e9308",
        "message_type": "session_reconnect",
        "message_timestamp": "2022-11-18T09:10:11.634234626Z"
    },
    "payload": {
        "session": {
           "id": "AQoQexAWVYKSTIu4ec_2VAxyuhAB",
           "status": "reconnecting",
           "keepalive_timeout_seconds": null,
           "reconnect_url": "wss://eventsub.wss.twitch.tv?...",
           "connected_at": "2022-11-16T10:11:12.634234626Z"
        }
    }
}
"#;

        let scanner = Scanner::new(data).unwrap();

        assert_eq!(scanner.message_type, MessageType::SessionReconnect);
        assert_eq!(scanner.subscription_type, None);

        let _: MetaData = serde_json::from_str(scanner.get_metadata(data)).unwrap();
        let _: Session = serde_json::from_str(scanner.get_session(data).unwrap()).unwrap();
    }

    #[test]
    fn revocation() {
        let data = r#"
{

    "metadata": {
        "message_id": "84c1e79a-2a4b-4c13-ba0b-4312293e9308",
        "message_type": "revocation",
        "message_timestamp": "2022-11-16T10:11:12.464757833Z",
        "subscription_type": "channel.follow",
        "subscription_version": "1"
    },
    "payload": {
        "subscription": {
            "id": "f1c2a387-161a-49f9-a165-0f21d7a4e1c4",
            "status": "authorization_revoked",
            "type": "channel.follow",
            "version": "1",
            "cost": 1,
            "condition": {
                "broadcaster_user_id": "12826"
            },
            "transport": {
                "method": "websocket",
                "session_id": "AQoQexAWVYKSTIu4ec_2VAxyuhAB"
            },
            "created_at": "2022-11-16T10:11:12.464757833Z"
        }
    }
}
"#;
        let scanner = Scanner::new(data).unwrap();

        assert_eq!(scanner.message_type, MessageType::Revocation);
        assert_eq!(
            scanner.subscription_type,
            Some(SubscriptionType::ChannelFollow)
        );

        let _: MetaData = serde_json::from_str(scanner.get_metadata(data)).unwrap();
        let _: Subscription =
            serde_json::from_str(scanner.get_subscription(data).unwrap()).unwrap();
    }
}