Skip to main content

league_link/
websocket.rs

1//! WebSocket client for the LCU event stream.
2//!
3//! The LCU pushes a WAMP message for every API state change. [`connect`]
4//! subscribes to **all** JSON API events; [`connect_filtered`] subscribes
5//! only to the URIs you name. Both forward events through a
6//! `tokio::sync::mpsc` channel wrapped in an [`EventStream`].
7
8use futures_util::{SinkExt, StreamExt};
9use native_tls::TlsConnector;
10use serde::{Deserialize, Deserializer};
11use serde_json::Value;
12use tokio::sync::mpsc;
13use tokio::task::AbortHandle;
14use tokio_tungstenite::{
15    connect_async_tls_with_config,
16    tungstenite::{client::IntoClientRequest, http::HeaderValue, Message},
17    Connector,
18};
19
20use crate::{auth::Credentials, error::LcuError};
21
22// ─── Public types ────────────────────────────────────────────
23
24/// A single LCU WebSocket event.
25#[derive(Debug, Clone)]
26pub struct LcuEvent {
27    /// The REST endpoint whose state changed, e.g. `/lol-gameflow/v1/session`.
28    pub uri: String,
29    /// Whether the resource was created, updated, or deleted.
30    pub event_type: EventType,
31    /// The new state as arbitrary JSON.
32    pub data: Value,
33}
34
35/// Event type from the LCU WAMP payload.
36///
37/// Unknown event names are preserved in [`EventType::Other`] rather than
38/// collapsed to a single `Unknown` variant — useful if Riot introduces
39/// new event types after this crate was published.
40#[derive(Debug, Clone, PartialEq, Eq)]
41pub enum EventType {
42    /// A new resource was created at the event's URI.
43    Create,
44    /// An existing resource's state changed.
45    Update,
46    /// The resource at the URI was removed.
47    Delete,
48    /// An event name this crate does not recognise; the inner string is
49    /// the raw payload value verbatim.
50    Other(String),
51}
52
53impl<'de> Deserialize<'de> for EventType {
54    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
55        let s = String::deserialize(deserializer)?;
56        Ok(match s.as_str() {
57            "Create" => EventType::Create,
58            "Update" => EventType::Update,
59            "Delete" => EventType::Delete,
60            _ => EventType::Other(s),
61        })
62    }
63}
64
65/// Handle to a live LCU event stream.
66///
67/// Dropping the stream aborts the background WebSocket task. Call
68/// [`EventStream::recv`] to pull the next event, or [`EventStream::close`]
69/// for an explicit graceful shutdown.
70pub struct EventStream {
71    rx: mpsc::Receiver<LcuEvent>,
72    abort: AbortHandle,
73}
74
75impl EventStream {
76    /// Wait for the next event, or `None` once the connection is closed.
77    pub async fn recv(&mut self) -> Option<LcuEvent> {
78        self.rx.recv().await
79    }
80
81    /// Abort the background task eagerly. Equivalent to dropping the stream.
82    pub fn close(self) {
83        // `Drop` does the aborting; this just makes intent explicit.
84    }
85}
86
87impl Drop for EventStream {
88    fn drop(&mut self) {
89        self.abort.abort();
90    }
91}
92
93// ─── Internal WAMP deserialization ───────────────────────────
94
95#[derive(Debug, Deserialize)]
96struct WampPayload {
97    uri: String,
98    data: Value,
99    #[serde(rename = "eventType")]
100    event_type: EventType,
101}
102
103// ─── Core ────────────────────────────────────────────────────
104
105/// Connect to the LCU WebSocket and subscribe to **all** JSON API events.
106///
107/// # Design — channel instead of callbacks
108///
109/// The original [`league-connect`][lc] JS library dispatches events through
110/// a `Map<uri, callback[]>`. This port uses a `tokio::sync::mpsc` channel
111/// and returns an [`EventStream`]:
112///
113/// ```text
114/// tokio task (owns WebSocket)
115///     └─ tx.send(event) ──channel──► caller: stream.recv().await
116/// ```
117///
118/// Dropping the stream aborts the task — there is no extra bookkeeping.
119///
120/// [lc]: https://github.com/junlarsen/league-connect
121///
122/// # Arguments
123///
124/// - `credentials` — obtained via [`authenticate`][crate::authenticate]
125/// - `buffer` — mpsc channel capacity; 64–256 is a reasonable default
126pub async fn connect(credentials: &Credentials, buffer: usize) -> Result<EventStream, LcuError> {
127    connect_with_topics(credentials, &["OnJsonApiEvent".to_string()], buffer).await
128}
129
130/// Connect and subscribe only to specific LCU URIs.
131///
132/// Each URI is translated to a per-path WAMP topic
133/// (`OnJsonApiEvent_lol-gameflow_v1_session`), so the server only sends
134/// events you actually asked for — saving bandwidth on high-churn endpoints
135/// like `/lol-chat/v1/conversations/*`.
136///
137/// ```no_run
138/// # async fn demo(creds: &league_link::Credentials) -> Result<(), league_link::LcuError> {
139/// let mut stream = league_link::connect_filtered(
140///     creds,
141///     &["/lol-gameflow/v1/session", "/lol-lobby/v2/lobby"],
142///     64,
143/// ).await?;
144/// # Ok(()) }
145/// ```
146pub async fn connect_filtered(
147    credentials: &Credentials,
148    uris: &[&str],
149    buffer: usize,
150) -> Result<EventStream, LcuError> {
151    let topics: Vec<String> = uris
152        .iter()
153        .map(|u| format!("OnJsonApiEvent_{}", u.trim_start_matches('/').replace('/', "_")))
154        .collect();
155    connect_with_topics(credentials, &topics, buffer).await
156}
157
158async fn connect_with_topics(
159    credentials: &Credentials,
160    topics: &[String],
161    buffer: usize,
162) -> Result<EventStream, LcuError> {
163    let tls = TlsConnector::builder()
164        .danger_accept_invalid_certs(true)
165        .danger_accept_invalid_hostnames(true)
166        .build()?;
167
168    let mut request = credentials.lcu_ws_url().into_client_request()?;
169    request.headers_mut().insert(
170        "Authorization",
171        HeaderValue::from_str(&credentials.basic_auth())
172            .map_err(|e| LcuError::InvalidHeader(e.to_string()))?,
173    );
174
175    let (mut ws_stream, _response) = connect_async_tls_with_config(
176        request,
177        None,
178        false,
179        Some(Connector::NativeTls(tls)),
180    )
181    .await?;
182
183    for topic in topics {
184        ws_stream
185            .send(Message::Text(
186                serde_json::json!([5, topic]).to_string().into(),
187            ))
188            .await?;
189    }
190
191    let (tx, rx) = mpsc::channel::<LcuEvent>(buffer);
192    let handle = tokio::spawn(async move {
193        while let Some(msg) = ws_stream.next().await {
194            let text = match msg {
195                Ok(Message::Text(t)) => t,
196                Ok(Message::Close(_)) | Err(_) => break,
197                _ => continue,
198            };
199            if let Some(event) = parse_wamp_event(&text) {
200                if tx.send(event).await.is_err() {
201                    break; // receiver dropped
202                }
203            }
204            // Unparseable frames (heartbeats, subscribe ACKs) are silently skipped.
205        }
206    });
207
208    Ok(EventStream {
209        rx,
210        abort: handle.abort_handle(),
211    })
212}
213
214// ─── Internal ────────────────────────────────────────────────
215
216/// Parse a raw WAMP text frame into an [`LcuEvent`].
217///
218/// LCU event format: `[8, "OnJsonApiEvent", { uri, data, eventType }]`
219/// (opcode 8 = WAMP EVENT).
220fn parse_wamp_event(text: &str) -> Option<LcuEvent> {
221    let arr: Vec<Value> = serde_json::from_str(text).ok()?;
222    if arr.len() < 3 || arr[0].as_u64() != Some(8) {
223        return None;
224    }
225    let payload: WampPayload = serde_json::from_value(arr.into_iter().nth(2)?).ok()?;
226    Some(LcuEvent {
227        uri: payload.uri,
228        event_type: payload.event_type,
229        data: payload.data,
230    })
231}
232
233#[cfg(test)]
234mod tests {
235    use super::*;
236
237    #[test]
238    fn parse_valid_update_event() {
239        let raw = r#"[8,"OnJsonApiEvent",{"uri":"/lol-gameflow/v1/session","eventType":"Update","data":{"phase":"Lobby"}}]"#;
240        let event = parse_wamp_event(raw).unwrap();
241        assert_eq!(event.uri, "/lol-gameflow/v1/session");
242        assert_eq!(event.event_type, EventType::Update);
243        assert_eq!(event.data["phase"], "Lobby");
244    }
245
246    #[test]
247    fn parse_delete_event() {
248        let raw = r#"[8,"OnJsonApiEvent",{"uri":"/lol-lobby/v2/lobby","eventType":"Delete","data":null}]"#;
249        let event = parse_wamp_event(raw).unwrap();
250        assert_eq!(event.event_type, EventType::Delete);
251    }
252
253    #[test]
254    fn unknown_event_type_preserves_name() {
255        let raw = r#"[8,"OnJsonApiEvent",{"uri":"/x","eventType":"SomeFutureType","data":null}]"#;
256        let event = parse_wamp_event(raw).unwrap();
257        assert_eq!(event.event_type, EventType::Other("SomeFutureType".into()));
258    }
259
260    #[test]
261    fn ignores_non_event_frames() {
262        // opcode 5 = Subscribe ACK, not an event
263        assert!(parse_wamp_event(r#"[5,"OnJsonApiEvent"]"#).is_none());
264        assert!(parse_wamp_event("not json").is_none());
265        assert!(parse_wamp_event("[]").is_none());
266    }
267}