1use futures_util::{SinkExt, StreamExt};
9use native_tls::TlsConnector;
10use serde::{Deserialize, Deserializer};
11use serde_json::Value;
12use tokio::sync::mpsc;
13use tokio::task::AbortHandle;
14use tokio_tungstenite::{
15 connect_async_tls_with_config,
16 tungstenite::{client::IntoClientRequest, http::HeaderValue, Message},
17 Connector,
18};
19
20use crate::{auth::Credentials, error::LcuError};
21
22#[derive(Debug, Clone)]
26pub struct LcuEvent {
27 pub uri: String,
29 pub event_type: EventType,
31 pub data: Value,
33}
34
35#[derive(Debug, Clone, PartialEq, Eq)]
41pub enum EventType {
42 Create,
44 Update,
46 Delete,
48 Other(String),
51}
52
53impl<'de> Deserialize<'de> for EventType {
54 fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
55 let s = String::deserialize(deserializer)?;
56 Ok(match s.as_str() {
57 "Create" => EventType::Create,
58 "Update" => EventType::Update,
59 "Delete" => EventType::Delete,
60 _ => EventType::Other(s),
61 })
62 }
63}
64
65pub struct EventStream {
71 rx: mpsc::Receiver<LcuEvent>,
72 abort: AbortHandle,
73}
74
75impl EventStream {
76 pub async fn recv(&mut self) -> Option<LcuEvent> {
78 self.rx.recv().await
79 }
80
81 pub fn close(self) {
83 }
85}
86
87impl Drop for EventStream {
88 fn drop(&mut self) {
89 self.abort.abort();
90 }
91}
92
93#[derive(Debug, Deserialize)]
96struct WampPayload {
97 uri: String,
98 data: Value,
99 #[serde(rename = "eventType")]
100 event_type: EventType,
101}
102
103pub async fn connect(credentials: &Credentials, buffer: usize) -> Result<EventStream, LcuError> {
127 connect_with_topics(credentials, &["OnJsonApiEvent".to_string()], buffer).await
128}
129
130pub async fn connect_filtered(
147 credentials: &Credentials,
148 uris: &[&str],
149 buffer: usize,
150) -> Result<EventStream, LcuError> {
151 let topics: Vec<String> = uris
152 .iter()
153 .map(|u| format!("OnJsonApiEvent_{}", u.trim_start_matches('/').replace('/', "_")))
154 .collect();
155 connect_with_topics(credentials, &topics, buffer).await
156}
157
158async fn connect_with_topics(
159 credentials: &Credentials,
160 topics: &[String],
161 buffer: usize,
162) -> Result<EventStream, LcuError> {
163 let tls = TlsConnector::builder()
164 .danger_accept_invalid_certs(true)
165 .danger_accept_invalid_hostnames(true)
166 .build()?;
167
168 let mut request = credentials.lcu_ws_url().into_client_request()?;
169 request.headers_mut().insert(
170 "Authorization",
171 HeaderValue::from_str(&credentials.basic_auth())
172 .map_err(|e| LcuError::InvalidHeader(e.to_string()))?,
173 );
174
175 let (mut ws_stream, _response) = connect_async_tls_with_config(
176 request,
177 None,
178 false,
179 Some(Connector::NativeTls(tls)),
180 )
181 .await?;
182
183 for topic in topics {
184 ws_stream
185 .send(Message::Text(
186 serde_json::json!([5, topic]).to_string().into(),
187 ))
188 .await?;
189 }
190
191 let (tx, rx) = mpsc::channel::<LcuEvent>(buffer);
192 let handle = tokio::spawn(async move {
193 while let Some(msg) = ws_stream.next().await {
194 let text = match msg {
195 Ok(Message::Text(t)) => t,
196 Ok(Message::Close(_)) | Err(_) => break,
197 _ => continue,
198 };
199 if let Some(event) = parse_wamp_event(&text) {
200 if tx.send(event).await.is_err() {
201 break; }
203 }
204 }
206 });
207
208 Ok(EventStream {
209 rx,
210 abort: handle.abort_handle(),
211 })
212}
213
214fn parse_wamp_event(text: &str) -> Option<LcuEvent> {
221 let arr: Vec<Value> = serde_json::from_str(text).ok()?;
222 if arr.len() < 3 || arr[0].as_u64() != Some(8) {
223 return None;
224 }
225 let payload: WampPayload = serde_json::from_value(arr.into_iter().nth(2)?).ok()?;
226 Some(LcuEvent {
227 uri: payload.uri,
228 event_type: payload.event_type,
229 data: payload.data,
230 })
231}
232
233#[cfg(test)]
234mod tests {
235 use super::*;
236
237 #[test]
238 fn parse_valid_update_event() {
239 let raw = r#"[8,"OnJsonApiEvent",{"uri":"/lol-gameflow/v1/session","eventType":"Update","data":{"phase":"Lobby"}}]"#;
240 let event = parse_wamp_event(raw).unwrap();
241 assert_eq!(event.uri, "/lol-gameflow/v1/session");
242 assert_eq!(event.event_type, EventType::Update);
243 assert_eq!(event.data["phase"], "Lobby");
244 }
245
246 #[test]
247 fn parse_delete_event() {
248 let raw = r#"[8,"OnJsonApiEvent",{"uri":"/lol-lobby/v2/lobby","eventType":"Delete","data":null}]"#;
249 let event = parse_wamp_event(raw).unwrap();
250 assert_eq!(event.event_type, EventType::Delete);
251 }
252
253 #[test]
254 fn unknown_event_type_preserves_name() {
255 let raw = r#"[8,"OnJsonApiEvent",{"uri":"/x","eventType":"SomeFutureType","data":null}]"#;
256 let event = parse_wamp_event(raw).unwrap();
257 assert_eq!(event.event_type, EventType::Other("SomeFutureType".into()));
258 }
259
260 #[test]
261 fn ignores_non_event_frames() {
262 assert!(parse_wamp_event(r#"[5,"OnJsonApiEvent"]"#).is_none());
264 assert!(parse_wamp_event("not json").is_none());
265 assert!(parse_wamp_event("[]").is_none());
266 }
267}