Skip to main content

chipzen_bot/
external.rs

1//! External-API remote-play entry point: [`run_external_bot`].
2//!
3//! Where [`crate::run_bot`] connects a bot to a *known* match URL (the
4//! containerized/upload path — the platform's executor hands the container
5//! its `/ws/match/{match_id}/{participant_id}` URL), this module implements
6//! the **external-API remote-play** path: a developer runs their bot on
7//! their own machine, authenticates with a long-lived `cz_extbot_` token,
8//! and the platform matches and dispatches them exactly like any other
9//! competitor.
10//!
11//! The flow (documented in `docs/EXTERNAL-API-BOT-PROTOCOL.md`):
12//!
13//! ```text
14//! lobby WS  /ws/external/bot/{bot_id}      (token in authenticate frame)
15//!     -> "matched" notify (carries match_id + gateway_ws_url)
16//!     -> per-match gateway WS  /ws/external/match/{mid}/{pid}
17//!                               (token in Sec-WebSocket-Protocol header)
18//!     -> two-layer bot handshake + game loop to match_end
19//! ```
20//!
21//! The **match data plane is identical** to the containerized path, so the
22//! game loop here reuses [`crate::_run_session`] verbatim — the only
23//! external-API-specific code is the lobby connection and the per-match
24//! gateway handshake. A developer writes ONE [`Bot`] and it works on both
25//! paths.
26//!
27//! The lobby is held open for the bot's whole session and each `matched`
28//! plays in its own task, so the lobby heartbeat is answered even while a
29//! multi-minute match is in flight. This is what lets a single connection
30//! serve a whole tournament (the bot is "checked in" via lobby presence and
31//! matched once per round). Match-task ownership is hoisted above the lobby
32//! session so a lobby reconnect doesn't kill in-flight matches, and a
33//! dropped gateway socket reconnects and resumes via the server's
34//! `reconnected` / `pending_request` frame.
35
36use crate::bot::Bot;
37use crate::client::{
38    _run_session, default_user_agent, MessageReader, MessageWriter, SessionContext,
39};
40use crate::config::{load_chipzen_config, resolve_token, ChipzenConfig};
41use crate::connect::{connect_to_chipzen, EnvName};
42use crate::error::Error;
43use crate::retry::RetryPolicy;
44use async_trait::async_trait;
45use serde_json::{json, Value};
46use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
47use std::sync::{Arc, Mutex};
48use std::time::Duration;
49use tokio::task::JoinSet;
50
/// Sentinel subprotocol that marks the `cz_extbot_` token in the
/// `Sec-WebSocket-Protocol` header (CZ issue 2932 moved the token off the
/// query string, where it leaked into proxy access logs). Must match the
/// value the platform's api gateway expects.
///
/// It is always offered as the first entry of the pair built by
/// [`bot_token_subprotocols`], immediately followed by the token itself.
pub const BOT_TOKEN_SUBPROTOCOL: &str = "chipzen-bot-token";
56
/// Upper bound, in milliseconds, on how long the lobby loop waits in one
/// `recv` before it wakes up and looks at the stop signal again. The value
/// is a trade-off: small enough that a stop request is noticed quickly,
/// large enough that the loop does not spin. It lives in an atomic so tests
/// can shrink it (mirrors the Python suite's monkeypatch of
/// `_LOBBY_RECV_TIMEOUT_S`).
static LOBBY_RECV_TIMEOUT_MS: AtomicU64 = AtomicU64::new(2_000);

/// Read the current lobby recv timeout and return it as a [`Duration`].
fn lobby_recv_timeout() -> Duration {
    let millis = LOBBY_RECV_TIMEOUT_MS.load(Ordering::Relaxed);
    Duration::from_millis(millis)
}
67
68/// Override the lobby recv timeout. Hidden test hook — not part of the
69/// supported API.
70#[doc(hidden)]
71pub fn _set_lobby_recv_timeout_ms(ms: u64) {
72    LOBBY_RECV_TIMEOUT_MS.store(ms, Ordering::Relaxed);
73}
74
/// On teardown, how long to let still-in-flight matches finish before
/// cancelling them, so nothing is left orphaned.
const MATCH_DRAIN_GRACE: Duration = Duration::from_secs(5);

/// Client name used for the per-match handshake when
/// `RunExternalOptions::client_name` is not set.
const DEFAULT_CLIENT_NAME: &str = "chipzen-sdk-rust";
80
81/// Build the `Sec-WebSocket-Protocol` offer that carries the bot token:
82/// `[sentinel, token]`. The sentinel marks "the next value is my bot
83/// token"; the api gateway extracts the token from this header (so it never
84/// appears in any access log / URL) and echoes the sentinel back on accept.
85pub fn bot_token_subprotocols(token: &str) -> Vec<String> {
86    vec![BOT_TOKEN_SUBPROTOCOL.to_string(), token.to_string()]
87}
88
/// Reduce a URL to its origin: `scheme://host[:port]`, dropping any path,
/// query or fragment.
///
/// A bare `host:port` with no `://` is accepted and is given the `wss`
/// scheme.
fn normalise_base(url: &str) -> String {
    let (scheme, remainder) = url.split_once("://").unwrap_or(("wss", url));
    // The authority is everything up to the first '/', '?' or '#'.
    let authority = remainder
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or(remainder);
    format!("{scheme}://{authority}")
}
101
/// Break a ws/wss URL into its `(scheme, authority)` pair, the authority
/// being `host[:port]`.
///
/// Parsing follows [`normalise_base`]: a URL without `://` is treated as a
/// bare authority with the `wss` scheme, and the authority stops at the
/// first `/`, `?` or `#`.
fn split_origin(url: &str) -> (&str, &str) {
    let (scheme, remainder) = url.split_once("://").unwrap_or(("wss", url));
    let cut = remainder
        .char_indices()
        .find(|&(_, c)| c == '/' || c == '?' || c == '#')
        .map_or(remainder.len(), |(idx, _)| idx);
    (scheme, &remainder[..cut])
}
112
113/// Resolve the `matched.gateway_ws_url` path against the lobby origin.
114///
115/// The `matched` notification carries `gateway_ws_url` as a *path*
116/// (`/ws/external/match/{mid}/{pid}`). The `cz_extbot_` token is NOT on the
117/// query string — it travels in the `Sec-WebSocket-Protocol` header.
118///
119/// A server that returns a full URL is honored ONLY if it stays on the same
120/// origin as the lobby and does not downgrade `wss` → `ws` — otherwise the bot
121/// token would be sent to an attacker-/misconfig-supplied host (or in
122/// cleartext). A relative path is re-anchored to the lobby origin, so it's
123/// inherently same-origin.
124///
125/// # Errors
126/// Returns [`Error::UntrustedGateway`] if `gateway_ws_path` is an absolute URL
127/// on a different origin than the lobby, or downgrades a `wss` lobby to `ws`.
128pub fn resolve_gateway_url(lobby_url: &str, gateway_ws_path: &str) -> Result<String, Error> {
129    if gateway_ws_path.starts_with("ws://") || gateway_ws_path.starts_with("wss://") {
130        let lobby_base = normalise_base(lobby_url);
131        let (lobby_scheme, lobby_authority) = split_origin(&lobby_base);
132        let (gw_scheme, gw_authority) = split_origin(gateway_ws_path);
133        let downgrade = lobby_scheme == "wss" && gw_scheme != "wss";
134        if gw_authority != lobby_authority || downgrade {
135            return Err(Error::UntrustedGateway(format!(
136                "{gateway_ws_path:?}: cross-origin or insecure relative to lobby \
137                 {lobby_scheme}://{lobby_authority} (the bot token must not be sent to a \
138                 different host or in cleartext)"
139            )));
140        }
141        return Ok(gateway_ws_path.to_string());
142    }
143    Ok(format!("{}{}", normalise_base(lobby_url), gateway_ws_path))
144}
145
146/// Parse a WS frame into a JSON object (`{}` on non-object / bad JSON).
147fn loads(raw: &str) -> Value {
148    match serde_json::from_str::<Value>(raw) {
149        Ok(v @ Value::Object(_)) => v,
150        _ => json!({}),
151    }
152}
153
/// One per-match result collected during a session.
#[derive(Debug, Clone)]
pub struct MatchResult {
    /// The match id: taken from the `match_end` payload when that payload
    /// carries a string `match_id`, otherwise the id announced by the
    /// lobby's `matched` frame.
    pub match_id: Option<String>,
    /// The full `match_end` payload on a clean finish, or `None` if the
    /// match did not reach `match_end` (for example it was abandoned after
    /// exhausting the reconnect budget).
    pub end: Option<Value>,
}
163
/// Options for [`run_external_bot`]. Mirrors the Python kwargs.
///
/// Every field is optional; `RunExternalOptions::default()` leaves all of
/// them unset and the runner falls back to `chipzen.toml` / built-in
/// defaults as described per field.
#[derive(Debug, Clone, Default)]
pub struct RunExternalOptions {
    /// External-API bot UUID. Used to build the lobby URL when `url` is not
    /// given; falls back to `[external_api].bot_id` in `chipzen.toml`.
    /// An empty string is treated as "not set".
    pub bot_id: Option<String>,
    /// Target environment. `None` consults `$CHIPZEN_ENV` then defaults to
    /// `prod`.
    pub env: Option<EnvName>,
    /// Explicit full lobby URL (`wss://.../ws/external/bot/{bot_id}`).
    /// Overrides `bot_id` / `env` URL derivation when set.
    pub url: Option<String>,
    /// Long-lived `cz_extbot_` API token. Falls back to
    /// `[external_api].token` in `chipzen.toml`. Required: the run fails
    /// with a protocol error if no non-empty token can be resolved.
    pub token: Option<String>,
    /// Pre-loaded config, to avoid a second filesystem stat. When `None`,
    /// the config is loaded by the runner itself.
    pub config: Option<ChipzenConfig>,
    /// Reconnect-pacing policy for both lobby drops and mid-match gateway
    /// drops. `None` uses [`RetryPolicy::default`].
    pub retry_policy: Option<RetryPolicy>,
    /// Sent in the per-match `hello` handshake. Defaults to
    /// `chipzen-sdk-rust`.
    pub client_name: Option<String>,
    /// Sent in the per-match `hello` handshake. Defaults to the crate
    /// version.
    pub client_version: Option<String>,
    /// When `true` (default), a panic in `decide()` is folded; `false`
    /// propagates it as a terminal [`Error::BotDecision`]. `None` is
    /// treated as `true`.
    pub safe_mode: Option<bool>,
    /// Stop after this many matches complete. `None` runs until the lobby
    /// closes / the bot is evicted. `Some(1)` plays a single challenge.
    pub max_matches: Option<u64>,
    /// Override the WS `User-Agent`. Defaults to `chipzen-sdk-rust/<version>`.
    pub user_agent: Option<String>,
}
199
200// ---------------------------------------------------------------------------
201// CLI helper — the Rust equivalent of `chipzen run-external <bot.py>`.
202// ---------------------------------------------------------------------------
203
204/// Parsed `run-external` flags, mirroring the Python CLI
205/// (`--env`/`--token`/`--bot-id`/`--max-matches`/`--no-safe-mode`).
206///
207/// Rust cannot dynamically load a bot from a file the way Python's
208/// `chipzen run-external my_bot.py` does — a Rust bot is compiled into a
209/// binary. The equivalent is [`run_external_cli`]: wire it into your own
210/// bot binary's `main` (the scaffolded starter does this for you) and it
211/// loads `chipzen.toml`, resolves the env-aware lobby URL, and runs your
212/// bot via [`run_external_bot`]. Build this struct however you like — from
213/// `clap`, from env vars, or by hand — so this crate stays argument-parser
214/// agnostic.
215#[derive(Debug, Clone, Default)]
216pub struct RunExternalArgs {
217    /// `--env prod|staging|local`. `None` consults `$CHIPZEN_ENV` then
218    /// defaults to `prod`.
219    pub env: Option<EnvName>,
220    /// `--token`. Overrides `[external_api].token` in `chipzen.toml`.
221    pub token: Option<String>,
222    /// `--bot-id`. Overrides `[external_api].bot_id`. Required when no
223    /// `[external_api].url` is configured.
224    pub bot_id: Option<String>,
225    /// `--max-matches`. `None` runs until the lobby closes (tournament
226    /// check-in); `Some(1)` plays a single challenge.
227    pub max_matches: Option<u64>,
228    /// `--no-safe-mode` sets this to `false` (let a `decide()` panic crash
229    /// the process). Defaults to `true`.
230    pub safe_mode: bool,
231}
232
233impl RunExternalArgs {
234    /// A default args set with `safe_mode` on — the common case.
235    pub fn new() -> Self {
236        Self {
237            env: None,
238            token: None,
239            bot_id: None,
240            max_matches: None,
241            safe_mode: true,
242        }
243    }
244}
245
246/// The Rust equivalent of the `chipzen run-external` CLI: resolve config +
247/// env URL + token from `args` (and a discovered `chipzen.toml`), then run
248/// `factory`'s bot via [`run_external_bot`].
249///
250/// Wire this into your bot binary's `main` — the scaffolded starter ships a
251/// `run-external` subcommand that calls it. `factory` produces one bot per
252/// match (`|| MyBot::default()`).
253///
254/// # Errors
255///
256/// Surfaces the same errors as [`run_external_bot`] (no token, no URL,
257/// malformed config, or a terminal [`Error::BotDecision`] under
258/// `safe_mode = false`).
259pub async fn run_external_cli<B, F>(
260    factory: F,
261    args: RunExternalArgs,
262) -> Result<Vec<MatchResult>, Error>
263where
264    B: Bot,
265    F: Fn() -> B + Send + Sync + 'static,
266{
267    let options = RunExternalOptions {
268        bot_id: args.bot_id,
269        env: args.env,
270        token: args.token,
271        max_matches: args.max_matches,
272        safe_mode: Some(args.safe_mode),
273        ..Default::default()
274    };
275    run_external_bot(factory, options).await
276}
277
278// ---------------------------------------------------------------------------
279// Transport seam — real impl wraps tokio-tungstenite; tests script it.
280// ---------------------------------------------------------------------------
281
/// Opens lobby + gateway WebSocket connections. Abstracted so tests can
/// drive [`run_external_with_transport`] against scripted sockets, exactly
/// as the conformance harness drives [`_run_session`].
///
/// A connection is returned as a boxed [`MessageReader`] + [`MessageWriter`]
/// pair. Implementors must be `Send + Sync + 'static` (a single transport is
/// shared across the lobby loop and every spawned match task).
#[async_trait]
pub trait LobbyTransport: Send + Sync + 'static {
    /// Open the lobby WS at `url`, sending `user_agent` on the handshake.
    ///
    /// No token is passed here: the runner authenticates on the lobby by
    /// sending an `authenticate` frame once the connection is open.
    ///
    /// # Errors
    /// Returns an error if the connection cannot be established.
    async fn connect_lobby(
        &self,
        url: &str,
        user_agent: &str,
    ) -> Result<(Box<dyn MessageReader>, Box<dyn MessageWriter>), Error>;

    /// Open the per-match gateway WS at `url`. `token` travels in the
    /// `Sec-WebSocket-Protocol` header (see [`bot_token_subprotocols`]).
    ///
    /// The runner calls this again for the same `url` when it reconnects
    /// after a mid-match drop.
    ///
    /// # Errors
    /// Returns an error if the connection cannot be established.
    async fn connect_gateway(
        &self,
        url: &str,
        token: &str,
        user_agent: &str,
    ) -> Result<(Box<dyn MessageReader>, Box<dyn MessageWriter>), Error>;
}
307
308/// Production transport: real tokio-tungstenite sockets.
309struct WsTransport;
310
311#[async_trait]
312impl LobbyTransport for WsTransport {
313    async fn connect_lobby(
314        &self,
315        url: &str,
316        user_agent: &str,
317    ) -> Result<(Box<dyn MessageReader>, Box<dyn MessageWriter>), Error> {
318        ws_transport::connect(url, user_agent, None).await
319    }
320
321    async fn connect_gateway(
322        &self,
323        url: &str,
324        token: &str,
325        user_agent: &str,
326    ) -> Result<(Box<dyn MessageReader>, Box<dyn MessageWriter>), Error> {
327        ws_transport::connect(url, user_agent, Some(token)).await
328    }
329}
330
331// ---------------------------------------------------------------------------
332// Public entry point
333// ---------------------------------------------------------------------------
334
335/// Run a bot on the Chipzen external-API remote-play path.
336///
337/// Connects to the lobby, then plays every match the platform dispatches to
338/// this bot (a single challenge, or every round of a tournament) until the
339/// lobby closes, the bot is evicted, or `max_matches` matches complete.
340///
341/// `factory` produces one bot instance per match. Pass a closure that
342/// returns a fresh bot (`|| MyBot::default()`) for correct per-match state
343/// when matches may overlap; the same closure can capture-and-clone shared
344/// config. Each match runs in its own task on its own gateway socket.
345///
346/// Returns one [`MatchResult`] per match played this session.
347///
348/// # Errors
349///
350/// * [`Error::Protocol`] if no token can be resolved, or neither `url` nor a
351///   `bot_id` is available to build the lobby URL.
352/// * [`Error::BotDecision`] if `decide()` panics under `safe_mode = false`.
353pub async fn run_external_bot<B, F>(
354    factory: F,
355    options: RunExternalOptions,
356) -> Result<Vec<MatchResult>, Error>
357where
358    B: Bot,
359    F: Fn() -> B + Send + Sync + 'static,
360{
361    run_external_with_transport(factory, options, Arc::new(WsTransport)).await
362}
363
/// Testable core of [`run_external_bot`], generic over the transport.
///
/// `run_external_bot` calls this with the real `WsTransport`; tests pass a
/// scripted transport to exercise the lobby/gateway/reconnect logic without
/// a server.
///
/// The function runs in three phases:
///
/// 1. Resolve the config, lobby URL, token and retry policy from `options`.
/// 2. Run lobby sessions in a loop, reconnecting with backoff, until the
///    stop flag is set, the lobby evicts the bot, a fatal bot error is
///    recorded, or the reconnect budget is exceeded.
/// 3. Drain (then cancel) match tasks still in flight and return the
///    collected results.
///
/// # Errors
///
/// * Whatever error the config loader or `connect_to_chipzen` reports.
/// * [`Error::Protocol`] if no lobby URL can be derived or no non-empty
///   token can be resolved.
/// * The fatal error recorded by a match task (a [`Error::BotDecision`]).
/// * The last lobby error, if the reconnect budget ran out before any lobby
///   session completed.
pub async fn run_external_with_transport<B, F, T>(
    factory: F,
    options: RunExternalOptions,
    transport: Arc<T>,
) -> Result<Vec<MatchResult>, Error>
where
    B: Bot,
    F: Fn() -> B + Send + Sync + 'static,
    T: LobbyTransport,
{
    // A caller-supplied config wins; otherwise load one from disk.
    let config = match options.config {
        Some(c) => Some(c),
        None => load_chipzen_config(None)?,
    };

    // --- Resolve lobby URL + token + retry policy --------------------------
    let (lobby_url, policy, resolved_token) = if let Some(url) = options.url {
        // Explicit URL: `bot_id` and `env` are not consulted at all.
        let policy = options.retry_policy.unwrap_or_default();
        let token = resolve_token(options.token.as_deref(), config.as_ref());
        (url, policy, token)
    } else {
        let bot_id = options
            .bot_id
            .clone()
            .or_else(|| config.as_ref().and_then(|c| c.bot_id.clone()));
        // An empty bot id is as good as a missing one.
        let Some(bot_id) = bot_id.filter(|s| !s.is_empty()) else {
            return Err(Error::Protocol(
                "run_external_bot() needs a lobby URL. Set url, or bot_id (or \
                 [external_api].bot_id / url in chipzen.toml)."
                    .to_string(),
            ));
        };
        let conn = connect_to_chipzen(&bot_id, options.env, options.retry_policy, config.clone())?;
        // An explicit token still wins over the config-file token.
        let token = match options.token.as_deref() {
            Some(t) => Some(t.to_string()),
            None => conn.token.clone(),
        };
        (conn.url, conn.retry_policy, token)
    };

    // An empty token is rejected the same way as a missing one.
    let Some(resolved_token) = resolved_token.filter(|t| !t.is_empty()) else {
        return Err(Error::Protocol(
            "run_external_bot() requires an external-API token (cz_extbot_...). Pass \
             token, or set [external_api].token in chipzen.toml."
                .to_string(),
        ));
    };

    let client_version = options
        .client_version
        .clone()
        .unwrap_or_else(|| env!("CARGO_PKG_VERSION").to_string());
    let user_agent = options
        .user_agent
        .clone()
        .unwrap_or_else(default_user_agent);
    let client_name = options
        .client_name
        .clone()
        .unwrap_or_else(|| DEFAULT_CLIENT_NAME.to_string());
    let safe_mode = options.safe_mode.unwrap_or(true);

    // Immutable parameters, shared by reference-count with every match task.
    let session = Arc::new(SessionParams {
        token: resolved_token,
        client_name,
        client_version,
        safe_mode,
        user_agent,
        policy,
        max_matches: options.max_matches,
    });

    // Shared mutable session state — owned HERE, not by a single lobby
    // session, so in-flight matches survive a lobby reconnect.
    let results: Arc<Mutex<Vec<MatchResult>>> = Arc::new(Mutex::new(Vec::new()));
    let completed = Arc::new(AtomicU64::new(0));
    let stop = Arc::new(AtomicBool::new(false));
    let fatal: Arc<Mutex<Option<Error>>> = Arc::new(Mutex::new(None));
    let mut match_tasks: JoinSet<()> = JoinSet::new();

    // --- Lobby session loop with reconnect/backoff -------------------------
    let mut consecutive_failures: u32 = 0;
    let mut ever_connected = false;
    let mut giveup: Option<Error> = None;

    while !stop.load(Ordering::SeqCst) {
        let run = run_lobby_once(
            &transport,
            &lobby_url,
            &factory,
            &session,
            &results,
            &completed,
            &stop,
            &fatal,
            &mut match_tasks,
        )
        .await;

        match run {
            Ok(status) => {
                // The lobby session ran and ended with a status, so earlier
                // failures no longer count against the budget.
                ever_connected = true;
                consecutive_failures = 0;
                if matches!(status, LobbyStatus::Stopped | LobbyStatus::Evicted)
                    || fatal.lock().unwrap().is_some()
                {
                    break;
                }
                // status == Closed: the lobby dropped. In-flight matches keep
                // playing on their own sockets; reconnect per the policy.
                // NOTE(review): the counter was reset to 0 just above, so it
                // is always 1 here and the budget check below only trips when
                // `max_reconnect_attempts` is 0. A lobby that accepts and then
                // closes right away is retried indefinitely at the first
                // backoff step — confirm this is intended.
                consecutive_failures += 1;
                if consecutive_failures > session.policy.max_reconnect_attempts {
                    break;
                }
                let delay = session.policy.backoff_ms(consecutive_failures);
                tokio::time::sleep(Duration::from_millis(delay)).await;
            }
            Err(exc) => {
                // connect() itself failed — count it as a reconnect attempt.
                consecutive_failures += 1;
                if consecutive_failures > session.policy.max_reconnect_attempts {
                    // Only a hard error if we NEVER reached the lobby. If we
                    // connected and played, give up quietly and return results.
                    if !ever_connected {
                        giveup = Some(exc);
                    }
                    break;
                }
                let delay = session.policy.backoff_ms(consecutive_failures);
                tokio::time::sleep(Duration::from_millis(delay)).await;
            }
        }
    }

    // --- Teardown: never orphan an in-flight match task --------------------
    drain_and_cancel(&mut match_tasks).await;

    // A fatal bot error takes precedence over a lobby give-up error.
    if let Some(err) = fatal.lock().unwrap().take() {
        return Err(err);
    }
    if let Some(err) = giveup {
        return Err(err);
    }

    // Move the results out of the shared vector instead of cloning them.
    let out = std::mem::take(&mut *results.lock().unwrap());
    Ok(out)
}
517
/// Immutable per-session parameters shared with every spawned match task.
struct SessionParams {
    /// The resolved, non-empty bot token: sent in the lobby `authenticate`
    /// frame and passed to every gateway connect.
    token: String,
    /// Client name copied into each match's session context.
    client_name: String,
    /// Client version copied into each match's session context.
    client_version: String,
    /// Safe-mode flag copied into each match's session context.
    safe_mode: bool,
    /// `User-Agent` handed to the transport for lobby and gateway connects.
    user_agent: String,
    /// Reconnect budget and backoff used for both lobby and gateway drops.
    policy: RetryPolicy,
    /// When set, the stop flag is raised once this many matches completed.
    max_matches: Option<u64>,
}
528
/// Why a single lobby connection ended.
enum LobbyStatus {
    /// The shared stop flag was set (for example `max_matches` was reached
    /// or a fatal bot error was recorded) — do not reconnect.
    Stopped,
    /// The lobby evicted us (a newer connection replaced this one).
    /// Reported when the lobby sends an `evict` frame; do not reconnect.
    Evicted,
    /// The lobby connection closed unexpectedly — the caller may reconnect.
    Closed,
}
538
539/// Hold ONE lobby connection and dispatch every `matched` it delivers.
540///
541/// Each `matched` is spawned into `match_tasks` (owned by the caller) so the
542/// lobby heartbeat is answered during matches AND in-flight matches survive
543/// a lobby reconnect — a match runs on its own gateway socket.
544#[allow(clippy::too_many_arguments)]
545async fn run_lobby_once<B, F, T>(
546    transport: &Arc<T>,
547    lobby_url: &str,
548    factory: &F,
549    session: &Arc<SessionParams>,
550    results: &Arc<Mutex<Vec<MatchResult>>>,
551    completed: &Arc<AtomicU64>,
552    stop: &Arc<AtomicBool>,
553    fatal: &Arc<Mutex<Option<Error>>>,
554    match_tasks: &mut JoinSet<()>,
555) -> Result<LobbyStatus, Error>
556where
557    B: Bot,
558    F: Fn() -> B + Send + Sync + 'static,
559    T: LobbyTransport,
560{
561    let (mut reader, mut writer) = transport
562        .connect_lobby(lobby_url, &session.user_agent)
563        .await?;
564
565    // Lobby auth frame.
566    writer
567        .send(json!({ "type": "authenticate", "token": session.token }).to_string())
568        .await?;
569
570    while !stop.load(Ordering::SeqCst) {
571        let recv = tokio::time::timeout(lobby_recv_timeout(), reader.next()).await;
572        let raw = match recv {
573            // Periodic wake to re-check the stop signal.
574            Err(_elapsed) => continue,
575            Ok(Ok(Some(raw))) => raw,
576            // Clean close.
577            Ok(Ok(None)) => return Ok(LobbyStatus::Closed),
578            // Transport error → treat as a drop the caller may reconnect.
579            Ok(Err(_)) => return Ok(LobbyStatus::Closed),
580        };
581
582        let msg = loads(&raw);
583        match msg.get("type").and_then(|v| v.as_str()) {
584            Some("ping") => {
585                writer.send(json!({ "type": "pong" }).to_string()).await?;
586            }
587            Some("hello") => {
588                // Lobby connected; no client hello on the lobby leg.
589            }
590            Some("matched") => {
591                let gateway_path = msg
592                    .get("gateway_ws_url")
593                    .and_then(|v| v.as_str())
594                    .unwrap_or("");
595                let match_id = msg
596                    .get("match_id")
597                    .and_then(|v| v.as_str())
598                    .unwrap_or("")
599                    .to_string();
600                // Untrusted gateway URL (cross-origin / downgrade) → skip this
601                // match rather than send the token there.
602                let gateway_url = match resolve_gateway_url(lobby_url, gateway_path) {
603                    Ok(url) => url,
604                    Err(_) => continue,
605                };
606
607                let bot = factory();
608                let transport = Arc::clone(transport);
609                let session = Arc::clone(session);
610                let results = Arc::clone(results);
611                let completed = Arc::clone(completed);
612                let stop = Arc::clone(stop);
613                let fatal = Arc::clone(fatal);
614
615                match_tasks.spawn(async move {
616                    let outcome =
617                        play_one_match(&*transport, &gateway_url, &match_id, bot, &session).await;
618                    record_match_outcome(
619                        outcome, &match_id, &session, &results, &completed, &stop, &fatal,
620                    );
621                });
622            }
623            Some("evict") => return Ok(LobbyStatus::Evicted),
624            _ => {
625                // Ignore unknown lobby frame types.
626            }
627        }
628    }
629
630    if stop.load(Ordering::SeqCst) {
631        Ok(LobbyStatus::Stopped)
632    } else {
633        Ok(LobbyStatus::Closed)
634    }
635}
636
637/// Fold a finished match into the shared results + completed counter, and
638/// flip the stop signal on a terminal `BotDecision` error or `max_matches`.
639fn record_match_outcome(
640    outcome: Result<Option<Value>, Error>,
641    match_id: &str,
642    session: &SessionParams,
643    results: &Mutex<Vec<MatchResult>>,
644    completed: &AtomicU64,
645    stop: &AtomicBool,
646    fatal: &Mutex<Option<Error>>,
647) {
648    match outcome {
649        // safe_mode=false: a deterministic bot bug. Surface it (stop the
650        // session + re-raise from run_external_bot) — the whole point of
651        // safe_mode=false is a loud failure.
652        Err(err @ Error::BotDecision(_)) => {
653            let mut slot = fatal.lock().unwrap();
654            if slot.is_none() {
655                *slot = Some(err);
656            }
657            stop.store(true, Ordering::SeqCst);
658            return;
659        }
660        Err(_other) => {
661            // Record, never crash the lobby. (Connect/transport errors that
662            // outlived the reconnect budget land here as the match abandoned.)
663            results.lock().unwrap().push(MatchResult {
664                match_id: Some(match_id.to_string()),
665                end: None,
666            });
667        }
668        Ok(end) => {
669            let match_id = end
670                .as_ref()
671                .and_then(|e| e.get("match_id"))
672                .and_then(|v| v.as_str())
673                .map(String::from)
674                .or_else(|| Some(match_id.to_string()));
675            completed.fetch_add(1, Ordering::SeqCst);
676            results.lock().unwrap().push(MatchResult { match_id, end });
677        }
678    }
679
680    if let Some(max) = session.max_matches {
681        if completed.load(Ordering::SeqCst) >= max {
682            stop.store(true, Ordering::SeqCst);
683        }
684    }
685}
686
687/// Play one match end-to-end over the per-match gateway WS, reconnecting
688/// across a mid-match drop.
689///
690/// Opens the gateway WS (token in the `Sec-WebSocket-Protocol` header) and
691/// hands off to [`_run_session`] for the two-layer handshake + game loop.
692/// If the socket drops before `match_end`, reconnects (bounded by the
693/// policy) and lets the platform's reconnect-resume re-deliver the pending
694/// turn — `_run_session` already consumes the server `reconnected` frame and
695/// replays its `pending_request`, and the same `bot` carries its state
696/// across the gap.
697///
698/// Returns the `match_end` payload, or `Ok(None)` if the match could not be
699/// completed within the reconnect budget. A [`Error::BotDecision`] is
700/// terminal and propagates immediately (never reconnect-retried).
701async fn play_one_match<B, T>(
702    transport: &T,
703    gateway_url: &str,
704    match_id: &str,
705    mut bot: B,
706    session: &SessionParams,
707) -> Result<Option<Value>, Error>
708where
709    B: Bot,
710    T: LobbyTransport,
711{
712    let ctx = SessionContext {
713        match_id: match_id.to_string(),
714        // The inner leg's auth token is the gateway's internal JWT; the
715        // executor ignores the value we send (an empty token), but the
716        // authenticate frame MUST be first.
717        token: None,
718        ticket: None,
719        client_name: session.client_name.clone(),
720        client_version: session.client_version.clone(),
721        safe_mode: session.safe_mode,
722    };
723
724    let mut attempt: u32 = 0;
725    loop {
726        let connect = transport
727            .connect_gateway(gateway_url, &session.token, &session.user_agent)
728            .await;
729
730        match connect {
731            Ok((mut reader, mut writer)) => {
732                match _run_session(&mut reader, &mut writer, &mut bot, &ctx).await {
733                    // Clean match_end — done.
734                    Ok(Some(end)) => return Ok(Some(end)),
735                    // Socket closed without a match_end (a drop). Try to resume.
736                    Ok(None) => {}
737                    // Deterministic bot bug — terminal, never reconnect-retry.
738                    Err(e @ Error::BotDecision(_)) => return Err(e),
739                    // Transport/protocol error mid-session — try to resume.
740                    Err(_) => {}
741                }
742            }
743            // Connect failed — counts as a drop too.
744            Err(Error::BotDecision(_)) => unreachable!("connect cannot raise BotDecision"),
745            Err(_) => {}
746        }
747
748        attempt += 1;
749        if attempt > session.policy.max_reconnect_attempts {
750            return Ok(None); // reconnect budget exhausted — abandon
751        }
752        let delay = session.policy.backoff_ms(attempt);
753        tokio::time::sleep(Duration::from_millis(delay)).await;
754    }
755}
756
757/// Teardown: let still-in-flight matches finish for a short grace window,
758/// then cancel any stragglers so no task is orphaned.
759async fn drain_and_cancel(match_tasks: &mut JoinSet<()>) {
760    if match_tasks.is_empty() {
761        return;
762    }
763    // `join_next` until the grace window elapses; whatever's left is aborted.
764    let drain = async { while match_tasks.join_next().await.is_some() {} };
765    if tokio::time::timeout(MATCH_DRAIN_GRACE, drain)
766        .await
767        .is_err()
768    {
769        match_tasks.abort_all();
770        while match_tasks.join_next().await.is_some() {}
771    }
772}
773
774// ---------------------------------------------------------------------------
775// Real WebSocket transport adapter
776// ---------------------------------------------------------------------------
777
778mod ws_transport {
779    use super::{bot_token_subprotocols, MessageReader, MessageWriter};
780    use crate::error::Error;
781    use async_trait::async_trait;
782    use futures_util::stream::{SplitSink, SplitStream};
783    use futures_util::{SinkExt, StreamExt};
784    use tokio::net::TcpStream;
785    use tokio_tungstenite::{
786        connect_async,
787        tungstenite::client::IntoClientRequest,
788        tungstenite::handshake::client::generate_key,
789        tungstenite::http::header::{
790            CONNECTION, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_PROTOCOL, SEC_WEBSOCKET_VERSION, UPGRADE,
791            USER_AGENT,
792        },
793        tungstenite::Message,
794        MaybeTlsStream, WebSocketStream,
795    };
796
797    type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
798
799    /// Open a WS connection, attaching the `User-Agent` header and, when a
800    /// `token` is supplied, the `Sec-WebSocket-Protocol` token offer.
801    pub async fn connect(
802        url: &str,
803        user_agent: &str,
804        token: Option<&str>,
805    ) -> Result<(Box<dyn MessageReader>, Box<dyn MessageWriter>), Error> {
806        let mut request = url.into_client_request().map_err(Error::from)?;
807        let headers = request.headers_mut();
808        if let Ok(value) = user_agent.parse() {
809            headers.insert(USER_AGENT, value);
810        }
811        if let Some(token) = token {
812            let offer = bot_token_subprotocols(token).join(", ");
813            if let Ok(value) = offer.parse() {
814                headers.insert(SEC_WEBSOCKET_PROTOCOL, value);
815            }
816            // `into_client_request` already set the upgrade/version/key
817            // headers; ensure the standard handshake set is present in case a
818            // future tungstenite changes defaults.
819            headers
820                .entry(SEC_WEBSOCKET_VERSION)
821                .or_insert_with(|| "13".parse().expect("static header"));
822            headers
823                .entry(SEC_WEBSOCKET_KEY)
824                .or_insert_with(|| generate_key().parse().expect("generated key"));
825            headers
826                .entry(CONNECTION)
827                .or_insert_with(|| "Upgrade".parse().expect("static header"));
828            headers
829                .entry(UPGRADE)
830                .or_insert_with(|| "websocket".parse().expect("static header"));
831        }
832
833        let (ws_stream, _) = connect_async(request).await?;
834        let (write_half, read_half) = ws_stream.split();
835        let reader: Box<dyn MessageReader> = Box::new(OwnedWsReader { inner: read_half });
836        let writer: Box<dyn MessageWriter> = Box::new(OwnedWsWriter { inner: write_half });
837        Ok((reader, writer))
838    }
839
840    struct OwnedWsReader {
841        inner: SplitStream<WsStream>,
842    }
843
844    #[async_trait]
845    impl MessageReader for OwnedWsReader {
846        async fn next(&mut self) -> Result<Option<String>, Error> {
847            loop {
848                match self.inner.next().await {
849                    Some(Ok(Message::Text(t))) => return Ok(Some(t.to_string())),
850                    Some(Ok(Message::Ping(_))) => continue,
851                    Some(Ok(Message::Close(_))) | None => return Ok(None),
852                    Some(Ok(_)) => continue,
853                    Some(Err(e)) => return Err(Error::from(e)),
854                }
855            }
856        }
857    }
858
859    struct OwnedWsWriter {
860        inner: SplitSink<WsStream, Message>,
861    }
862
863    #[async_trait]
864    impl MessageWriter for OwnedWsWriter {
865        async fn send(&mut self, payload: String) -> Result<(), Error> {
866            self.inner
867                .send(Message::Text(payload))
868                .await
869                .map_err(Error::from)
870        }
871    }
872}