use futures_util::{SinkExt, StreamExt};
use native_tls::TlsConnector;
use serde::{Deserialize, Deserializer};
use serde_json::Value;
use tokio::sync::mpsc;
use tokio::task::AbortHandle;
use tokio_tungstenite::{
connect_async_tls_with_config,
tungstenite::{client::IntoClientRequest, http::HeaderValue, Message},
Connector,
};
use crate::{auth::Credentials, error::LcuError};
/// A single event delivered to consumers of an [`EventStream`].
///
/// Instances are built by `parse_wamp_event` from the `uri`, `eventType` and
/// `data` members of an event frame's payload object.
#[derive(Debug, Clone)]
pub struct LcuEvent {
/// Value of the payload's `uri` member (e.g. `/lol-gameflow/v1/session`).
pub uri: String,
/// Kind of change reported by the payload's `eventType` member.
pub event_type: EventType,
/// Raw JSON carried in the payload's `data` member; may be `null`.
pub data: Value,
}
/// Kind of change carried by an [`LcuEvent`].
///
/// Deserialized from a string: `"Create"`, `"Update"` and `"Delete"` map to the
/// matching variants, and every other string is preserved in [`EventType::Other`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EventType {
/// The payload's event type was the string `"Create"`.
Create,
/// The payload's event type was the string `"Update"`.
Update,
/// The payload's event type was the string `"Delete"`.
Delete,
/// Any event type string not listed above, kept verbatim.
Other(String),
}
/// Deserializes an [`EventType`] from a string value.
///
/// The three known names map to their dedicated variants; any other string is
/// kept verbatim inside [`EventType::Other`] instead of being rejected.
impl<'de> Deserialize<'de> for EventType {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        // Read an owned string first so an unknown name can be moved into
        // `Other` without an extra allocation.
        String::deserialize(deserializer).map(|name| match name.as_str() {
            "Create" => Self::Create,
            "Update" => Self::Update,
            "Delete" => Self::Delete,
            _ => Self::Other(name),
        })
    }
}
/// Receiving end of an event subscription.
///
/// Pairs the channel receiver with an abort handle for the spawned task that
/// reads the websocket and feeds the channel.
pub struct EventStream {
// Receiver half of the bounded channel filled by the spawned reader task.
rx: mpsc::Receiver<LcuEvent>,
// Handle used to abort the spawned reader task (see the `Drop` impl).
abort: AbortHandle,
}
impl EventStream {
    /// Waits for the next event from the channel.
    ///
    /// Returns whatever the underlying `tokio::sync::mpsc::Receiver::recv`
    /// yields: `Some(event)` for each forwarded event, and `None` once the
    /// sending side is gone and no buffered events remain.
    pub async fn recv(&mut self) -> Option<LcuEvent> {
        let Self { rx, .. } = self;
        rx.recv().await
    }

    /// Consumes the stream.
    ///
    /// Dropping `self` runs the `Drop` implementation, which aborts the
    /// background reader task.
    pub fn close(self) {
        drop(self);
    }
}
/// Stops the background reader task when the stream goes away.
impl Drop for EventStream {
    fn drop(&mut self) {
        // The task's `JoinHandle` is not kept, so this abort handle is the
        // only way to cancel the spawned reader.
        let Self { abort, .. } = self;
        abort.abort();
    }
}
/// Shape of the JSON object found at index 2 of an event frame.
///
/// Used only as a deserialization target by `parse_wamp_event`; its fields are
/// moved into an [`LcuEvent`].
#[derive(Debug, Deserialize)]
struct WampPayload {
// The payload's `uri` member.
uri: String,
// The payload's `data` member, kept as untyped JSON.
data: Value,
// The payload's `eventType` member (camelCase on the wire, hence the rename).
#[serde(rename = "eventType")]
event_type: EventType,
}
/// Connects to the LCU websocket and subscribes to the single
/// `OnJsonApiEvent` topic.
///
/// `buffer` is passed through as the capacity of the channel behind the
/// returned [`EventStream`].
///
/// # Errors
///
/// Returns whatever `connect_with_topics` reports (TLS setup, request
/// construction, handshake or subscription failures).
pub async fn connect(credentials: &Credentials, buffer: usize) -> Result<EventStream, LcuError> {
    let all_events = [String::from("OnJsonApiEvent")];
    connect_with_topics(credentials, &all_events, buffer).await
}
/// Connects to the LCU websocket and subscribes to one topic per entry in
/// `uris`.
///
/// Each URI is turned into a topic name by stripping its leading slashes,
/// replacing every remaining `/` with `_`, and prefixing `OnJsonApiEvent_`;
/// for example `/lol-gameflow/v1/session` becomes
/// `OnJsonApiEvent_lol-gameflow_v1_session`.
///
/// `buffer` is passed through as the capacity of the channel behind the
/// returned [`EventStream`].
///
/// # Errors
///
/// Returns whatever `connect_with_topics` reports.
pub async fn connect_filtered(
    credentials: &Credentials,
    uris: &[&str],
    buffer: usize,
) -> Result<EventStream, LcuError> {
    let mut topics = Vec::with_capacity(uris.len());
    for uri in uris {
        let suffix = uri.trim_start_matches('/').replace('/', "_");
        topics.push(format!("OnJsonApiEvent_{}", suffix));
    }
    connect_with_topics(credentials, &topics, buffer).await
}
/// Opens the LCU websocket, subscribes to every entry of `topics`, and spawns
/// a task that forwards parsed events into a bounded channel.
///
/// * `credentials` supplies the websocket URL and the `Authorization` header value.
/// * `topics` are sent one by one as `[5, topic]` text frames (presumably the
///   WAMP 1.0 SUBSCRIBE opcode; confirm against the LCU protocol notes).
/// * `buffer` is the channel capacity. A value of `0` is raised to `1`, because
///   `tokio::sync::mpsc::channel` panics on a zero capacity.
///
/// The returned [`EventStream`] owns the receiver and an abort handle for the
/// spawned task.
///
/// # Errors
///
/// Propagates, via `?`, failures from building the TLS connector, turning the
/// URL into a client request, the websocket handshake, and sending any
/// subscription frame. An `Authorization` value that is not a valid header
/// yields [`LcuError::InvalidHeader`].
async fn connect_with_topics(
    credentials: &Credentials,
    topics: &[String],
    buffer: usize,
) -> Result<EventStream, LcuError> {
    // Certificate and hostname validation are both switched off.
    // NOTE(review): presumably because the local client presents a
    // self-signed certificate; confirm before reusing this elsewhere.
    let tls = TlsConnector::builder()
        .danger_accept_invalid_certs(true)
        .danger_accept_invalid_hostnames(true)
        .build()?;

    let mut request = credentials.lcu_ws_url().into_client_request()?;
    request.headers_mut().insert(
        "Authorization",
        HeaderValue::from_str(&credentials.basic_auth())
            .map_err(|e| LcuError::InvalidHeader(e.to_string()))?,
    );

    let (mut ws_stream, _response) = connect_async_tls_with_config(
        request,
        None,
        false,
        Some(Connector::NativeTls(tls)),
    )
    .await?;

    // Subscribe before the reader task starts so a failed send surfaces to
    // the caller as an error instead of being lost inside the task.
    for topic in topics {
        ws_stream
            .send(Message::Text(
                serde_json::json!([5, topic]).to_string().into(),
            ))
            .await?;
    }

    // tokio's bounded channel panics on capacity 0; clamp so a caller passing
    // `0` gets the smallest valid buffer instead of a panic after the socket
    // has already been opened.
    let (tx, rx) = mpsc::channel::<LcuEvent>(buffer.max(1));

    let handle = tokio::spawn(async move {
        while let Some(msg) = ws_stream.next().await {
            let text = match msg {
                Ok(Message::Text(t)) => t,
                // A close frame or any transport error ends the task.
                Ok(Message::Close(_)) | Err(_) => break,
                // Every other frame kind is skipped.
                _ => continue,
            };
            // Frames that do not parse as events are dropped silently.
            if let Some(event) = parse_wamp_event(&text) {
                // A send error means the receiver is gone; stop reading.
                if tx.send(event).await.is_err() {
                    break;
                }
            }
        }
    });

    Ok(EventStream {
        rx,
        abort: handle.abort_handle(),
    })
}
fn parse_wamp_event(text: &str) -> Option<LcuEvent> {
let arr: Vec<Value> = serde_json::from_str(text).ok()?;
if arr.len() < 3 || arr[0].as_u64() != Some(8) {
return None;
}
let payload: WampPayload = serde_json::from_value(arr.into_iter().nth(2)?).ok()?;
Some(LcuEvent {
uri: payload.uri,
event_type: payload.event_type,
data: payload.data,
})
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An `Update` frame yields the payload's uri, event type and data.
    #[test]
    fn parse_valid_update_event() {
        let frame = r#"[8,"OnJsonApiEvent",{"uri":"/lol-gameflow/v1/session","eventType":"Update","data":{"phase":"Lobby"}}]"#;
        let LcuEvent {
            uri,
            event_type,
            data,
        } = parse_wamp_event(frame).unwrap();
        assert_eq!(uri, "/lol-gameflow/v1/session");
        assert_eq!(event_type, EventType::Update);
        assert_eq!(data["phase"], "Lobby");
    }

    /// A `Delete` frame with `null` data still parses.
    #[test]
    fn parse_delete_event() {
        let frame = r#"[8,"OnJsonApiEvent",{"uri":"/lol-lobby/v2/lobby","eventType":"Delete","data":null}]"#;
        let parsed = parse_wamp_event(frame).unwrap();
        assert_eq!(parsed.event_type, EventType::Delete);
    }

    /// Unrecognized event type names are preserved in `EventType::Other`.
    #[test]
    fn unknown_event_type_preserves_name() {
        let frame = r#"[8,"OnJsonApiEvent",{"uri":"/x","eventType":"SomeFutureType","data":null}]"#;
        let parsed = parse_wamp_event(frame).unwrap();
        let expected = EventType::Other(String::from("SomeFutureType"));
        assert_eq!(parsed.event_type, expected);
    }

    /// Frames that are not well-formed event frames are rejected.
    #[test]
    fn ignores_non_event_frames() {
        for frame in [r#"[5,"OnJsonApiEvent"]"#, "not json", "[]"] {
            assert!(parse_wamp_event(frame).is_none());
        }
    }
}