chipzen_bot/external.rs
1//! External-API remote-play entry point: [`run_external_bot`].
2//!
3//! Where [`crate::run_bot`] connects a bot to a *known* match URL (the
4//! containerized/upload path — the platform's executor hands the container
5//! its `/ws/match/{match_id}/{participant_id}` URL), this module implements
6//! the **external-API remote-play** path: a developer runs their bot on
7//! their own machine, authenticates with a long-lived `cz_extbot_` token,
8//! and the platform matches and dispatches them exactly like any other
9//! competitor.
10//!
11//! The flow (documented in `docs/EXTERNAL-API-BOT-PROTOCOL.md`):
12//!
13//! ```text
14//! lobby WS /ws/external/bot/{bot_id} (token in authenticate frame)
15//! -> "matched" notify (carries match_id + gateway_ws_url)
16//! -> per-match gateway WS /ws/external/match/{mid}/{pid}
17//! (token in Sec-WebSocket-Protocol header)
18//! -> two-layer bot handshake + game loop to match_end
19//! ```
20//!
21//! The **match data plane is identical** to the containerized path, so the
22//! game loop here reuses [`crate::_run_session`] verbatim — the only
23//! external-API-specific code is the lobby connection and the per-match
24//! gateway handshake. A developer writes ONE [`Bot`] and it works on both
25//! paths.
26//!
27//! The lobby is held open for the bot's whole session and each `matched`
28//! plays in its own task, so the lobby heartbeat is answered even while a
29//! multi-minute match is in flight. This is what lets a single connection
30//! serve a whole tournament (the bot is "checked in" via lobby presence and
31//! matched once per round). Match-task ownership is hoisted above the lobby
32//! session so a lobby reconnect doesn't kill in-flight matches, and a
33//! dropped gateway socket reconnects and resumes via the server's
34//! `reconnected` / `pending_request` frame.
35
36use crate::bot::Bot;
37use crate::client::{
38 _run_session, default_user_agent, MessageReader, MessageWriter, SessionContext,
39};
40use crate::config::{load_chipzen_config, resolve_token, ChipzenConfig};
41use crate::connect::{connect_to_chipzen, EnvName};
42use crate::error::Error;
43use crate::retry::RetryPolicy;
44use async_trait::async_trait;
45use serde_json::{json, Value};
46use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
47use std::sync::{Arc, Mutex};
48use std::time::Duration;
49use tokio::task::JoinSet;
50
/// Sentinel subprotocol that marks the `cz_extbot_` token in the
/// `Sec-WebSocket-Protocol` header (CZ issue 2932 moved the token off the
/// query string, where it leaked into proxy access logs). Must match the
/// value the platform's api gateway expects.
///
/// [`bot_token_subprotocols`] offers this value first, immediately followed
/// by the token itself.
pub const BOT_TOKEN_SUBPROTOCOL: &str = "chipzen-bot-token";
56
/// Upper bound, in milliseconds, on one lobby `recv` before the lobby loop
/// wakes up to look at the stop signal again. Small enough that a stop is
/// noticed quickly, large enough that the loop does not spin. Kept in an
/// atomic so tests can shorten it (the Python suite monkeypatches
/// `_LOBBY_RECV_TIMEOUT_S` for the same reason).
static LOBBY_RECV_TIMEOUT_MS: AtomicU64 = AtomicU64::new(2_000);

/// The current lobby recv timeout as a [`Duration`].
fn lobby_recv_timeout() -> Duration {
    let millis = LOBBY_RECV_TIMEOUT_MS.load(Ordering::Relaxed);
    Duration::from_millis(millis)
}

/// Replace the lobby recv timeout with `ms` milliseconds. Hidden test hook —
/// not part of the supported API.
#[doc(hidden)]
pub fn _set_lobby_recv_timeout_ms(ms: u64) {
    LOBBY_RECV_TIMEOUT_MS.store(ms, Ordering::Relaxed);
}
74
/// On teardown, how long to let still-in-flight matches finish before
/// cancelling them, so nothing is left orphaned.
const MATCH_DRAIN_GRACE: Duration = Duration::from_secs(5);

/// `client_name` sent in the per-match handshake when the caller does not
/// supply one via `RunExternalOptions::client_name`.
const DEFAULT_CLIENT_NAME: &str = "chipzen-sdk-rust";
80
81/// Build the `Sec-WebSocket-Protocol` offer that carries the bot token:
82/// `[sentinel, token]`. The sentinel marks "the next value is my bot
83/// token"; the api gateway extracts the token from this header (so it never
84/// appears in any access log / URL) and echoes the sentinel back on accept.
85pub fn bot_token_subprotocols(token: &str) -> Vec<String> {
86 vec![BOT_TOKEN_SUBPROTOCOL.to_string(), token.to_string()]
87}
88
89/// Strip any path/query from a URL, keeping scheme + host(:port) only.
90/// Tolerates a bare `host:port` without a scheme (defaults to `wss`).
91fn normalise_base(url: &str) -> String {
92 let (scheme, rest) = match url.split_once("://") {
93 Some((s, r)) => (s, r),
94 None => ("wss", url),
95 };
96 // Authority ends at the first '/', '?' or '#'.
97 let end = rest.find(['/', '?', '#']).unwrap_or(rest.len());
98 let authority = &rest[..end];
99 format!("{scheme}://{authority}")
100}
101
102/// Split a ws/wss URL into `(scheme, authority)` where authority is
103/// `host[:port]`. Mirrors [`normalise_base`]'s parsing.
104fn split_origin(url: &str) -> (&str, &str) {
105 let (scheme, rest) = match url.split_once("://") {
106 Some((s, r)) => (s, r),
107 None => ("wss", url),
108 };
109 let end = rest.find(['/', '?', '#']).unwrap_or(rest.len());
110 (scheme, &rest[..end])
111}
112
113/// Resolve the `matched.gateway_ws_url` path against the lobby origin.
114///
115/// The `matched` notification carries `gateway_ws_url` as a *path*
116/// (`/ws/external/match/{mid}/{pid}`). The `cz_extbot_` token is NOT on the
117/// query string — it travels in the `Sec-WebSocket-Protocol` header.
118///
119/// A server that returns a full URL is honored ONLY if it stays on the same
120/// origin as the lobby and does not downgrade `wss` → `ws` — otherwise the bot
121/// token would be sent to an attacker-/misconfig-supplied host (or in
122/// cleartext). A relative path is re-anchored to the lobby origin, so it's
123/// inherently same-origin.
124///
125/// # Errors
126/// Returns [`Error::UntrustedGateway`] if `gateway_ws_path` is an absolute URL
127/// on a different origin than the lobby, or downgrades a `wss` lobby to `ws`.
128pub fn resolve_gateway_url(lobby_url: &str, gateway_ws_path: &str) -> Result<String, Error> {
129 if gateway_ws_path.starts_with("ws://") || gateway_ws_path.starts_with("wss://") {
130 let lobby_base = normalise_base(lobby_url);
131 let (lobby_scheme, lobby_authority) = split_origin(&lobby_base);
132 let (gw_scheme, gw_authority) = split_origin(gateway_ws_path);
133 let downgrade = lobby_scheme == "wss" && gw_scheme != "wss";
134 if gw_authority != lobby_authority || downgrade {
135 return Err(Error::UntrustedGateway(format!(
136 "{gateway_ws_path:?}: cross-origin or insecure relative to lobby \
137 {lobby_scheme}://{lobby_authority} (the bot token must not be sent to a \
138 different host or in cleartext)"
139 )));
140 }
141 return Ok(gateway_ws_path.to_string());
142 }
143 Ok(format!("{}{}", normalise_base(lobby_url), gateway_ws_path))
144}
145
146/// Parse a WS frame into a JSON object (`{}` on non-object / bad JSON).
147fn loads(raw: &str) -> Value {
148 match serde_json::from_str::<Value>(raw) {
149 Ok(v @ Value::Object(_)) => v,
150 _ => json!({}),
151 }
152}
153
/// One per-match result collected during a session.
///
/// [`run_external_bot`] returns one of these for every match that finished
/// or was abandoned during the session.
#[derive(Debug, Clone)]
pub struct MatchResult {
    /// The match id: taken from the `match_end` payload when it carries a
    /// `match_id`, otherwise the id from the lobby's `matched` notification.
    pub match_id: Option<String>,
    /// The full `match_end` payload on a clean finish, or `None` if the
    /// match was abandoned after exhausting the reconnect budget.
    pub end: Option<Value>,
}
163
/// Options for [`run_external_bot`]. Mirrors the Python kwargs.
///
/// Every field is optional; `RunExternalOptions::default()` leaves them all
/// `None`, which means "resolve from `chipzen.toml` / built-in defaults".
#[derive(Debug, Clone, Default)]
pub struct RunExternalOptions {
    /// External-API bot UUID. Used to build the lobby URL when `url` is not
    /// given; falls back to `[external_api].bot_id` in `chipzen.toml`.
    /// An empty string is treated the same as `None`.
    pub bot_id: Option<String>,
    /// Target environment. `None` consults `$CHIPZEN_ENV` then defaults to
    /// `prod`.
    pub env: Option<EnvName>,
    /// Explicit full lobby URL (`wss://.../ws/external/bot/{bot_id}`).
    /// Overrides `bot_id` / `env` URL derivation when set.
    pub url: Option<String>,
    /// Long-lived `cz_extbot_` API token. Falls back to
    /// `[external_api].token` in `chipzen.toml`. Required: the run fails
    /// with [`Error::Protocol`] when no non-empty token can be resolved.
    pub token: Option<String>,
    /// Pre-loaded config, to avoid a second filesystem stat. When `None`,
    /// the config is loaded via `load_chipzen_config`.
    pub config: Option<ChipzenConfig>,
    /// Reconnect-pacing policy for both lobby drops and mid-match gateway
    /// drops. `None` uses [`RetryPolicy::default`].
    pub retry_policy: Option<RetryPolicy>,
    /// Sent in the per-match `hello` handshake. Defaults to
    /// `chipzen-sdk-rust`.
    pub client_name: Option<String>,
    /// Sent in the per-match `hello` handshake. Defaults to the crate
    /// version (`CARGO_PKG_VERSION`).
    pub client_version: Option<String>,
    /// When `true` (default, also used for `None`), a panic in `decide()` is
    /// folded; `false` propagates it as a terminal [`Error::BotDecision`].
    pub safe_mode: Option<bool>,
    /// Stop after this many matches complete. `None` runs until the lobby
    /// closes / the bot is evicted. `Some(1)` plays a single challenge.
    pub max_matches: Option<u64>,
    /// Override the WS `User-Agent`. Defaults to `chipzen-sdk-rust/<version>`.
    pub user_agent: Option<String>,
}
199
200// ---------------------------------------------------------------------------
201// CLI helper — the Rust equivalent of `chipzen run-external <bot.py>`.
202// ---------------------------------------------------------------------------
203
204/// Parsed `run-external` flags, mirroring the Python CLI
205/// (`--env`/`--token`/`--bot-id`/`--max-matches`/`--no-safe-mode`).
206///
207/// Rust cannot dynamically load a bot from a file the way Python's
208/// `chipzen run-external my_bot.py` does — a Rust bot is compiled into a
209/// binary. The equivalent is [`run_external_cli`]: wire it into your own
210/// bot binary's `main` (the scaffolded starter does this for you) and it
211/// loads `chipzen.toml`, resolves the env-aware lobby URL, and runs your
212/// bot via [`run_external_bot`]. Build this struct however you like — from
213/// `clap`, from env vars, or by hand — so this crate stays argument-parser
214/// agnostic.
215#[derive(Debug, Clone, Default)]
216pub struct RunExternalArgs {
217 /// `--env prod|staging|local`. `None` consults `$CHIPZEN_ENV` then
218 /// defaults to `prod`.
219 pub env: Option<EnvName>,
220 /// `--token`. Overrides `[external_api].token` in `chipzen.toml`.
221 pub token: Option<String>,
222 /// `--bot-id`. Overrides `[external_api].bot_id`. Required when no
223 /// `[external_api].url` is configured.
224 pub bot_id: Option<String>,
225 /// `--max-matches`. `None` runs until the lobby closes (tournament
226 /// check-in); `Some(1)` plays a single challenge.
227 pub max_matches: Option<u64>,
228 /// `--no-safe-mode` sets this to `false` (let a `decide()` panic crash
229 /// the process). Defaults to `true`.
230 pub safe_mode: bool,
231}
232
233impl RunExternalArgs {
234 /// A default args set with `safe_mode` on — the common case.
235 pub fn new() -> Self {
236 Self {
237 env: None,
238 token: None,
239 bot_id: None,
240 max_matches: None,
241 safe_mode: true,
242 }
243 }
244}
245
246/// The Rust equivalent of the `chipzen run-external` CLI: resolve config +
247/// env URL + token from `args` (and a discovered `chipzen.toml`), then run
248/// `factory`'s bot via [`run_external_bot`].
249///
250/// Wire this into your bot binary's `main` — the scaffolded starter ships a
251/// `run-external` subcommand that calls it. `factory` produces one bot per
252/// match (`|| MyBot::default()`).
253///
254/// # Errors
255///
256/// Surfaces the same errors as [`run_external_bot`] (no token, no URL,
257/// malformed config, or a terminal [`Error::BotDecision`] under
258/// `safe_mode = false`).
259pub async fn run_external_cli<B, F>(
260 factory: F,
261 args: RunExternalArgs,
262) -> Result<Vec<MatchResult>, Error>
263where
264 B: Bot,
265 F: Fn() -> B + Send + Sync + 'static,
266{
267 let options = RunExternalOptions {
268 bot_id: args.bot_id,
269 env: args.env,
270 token: args.token,
271 max_matches: args.max_matches,
272 safe_mode: Some(args.safe_mode),
273 ..Default::default()
274 };
275 run_external_bot(factory, options).await
276}
277
278// ---------------------------------------------------------------------------
279// Transport seam — real impl wraps tokio-tungstenite; tests script it.
280// ---------------------------------------------------------------------------
281
/// Opens lobby + gateway WebSocket connections. Abstracted so tests can
/// drive [`run_external_with_transport`] against scripted sockets, exactly
/// as the conformance harness drives [`_run_session`].
///
/// A connection is returned as a boxed [`MessageReader`] + [`MessageWriter`]
/// pair. Implementors must be `Send + Sync + 'static` (a single transport is
/// shared across the lobby loop and every spawned match task).
#[async_trait]
pub trait LobbyTransport: Send + Sync + 'static {
    /// Open the lobby WS at `url`, sending `user_agent` on the handshake.
    ///
    /// No token is passed here: the caller authenticates the lobby with an
    /// `authenticate` frame after the socket is open.
    ///
    /// # Errors
    /// Returns an [`Error`] when the connection cannot be established; the
    /// session loop counts that as one failed lobby (re)connect attempt.
    async fn connect_lobby(
        &self,
        url: &str,
        user_agent: &str,
    ) -> Result<(Box<dyn MessageReader>, Box<dyn MessageWriter>), Error>;

    /// Open the per-match gateway WS at `url`. `token` travels in the
    /// `Sec-WebSocket-Protocol` header (see [`bot_token_subprotocols`]).
    ///
    /// # Errors
    /// Returns an [`Error`] when the connection cannot be established; the
    /// match task treats that as a drop and retries within its reconnect
    /// budget. Implementations must not return [`Error::BotDecision`] —
    /// the match task treats that as an impossible case and panics.
    async fn connect_gateway(
        &self,
        url: &str,
        token: &str,
        user_agent: &str,
    ) -> Result<(Box<dyn MessageReader>, Box<dyn MessageWriter>), Error>;
}
307
/// Production transport: real tokio-tungstenite sockets.
///
/// Stateless; both methods delegate to `ws_transport::connect` and differ
/// only in whether a token is supplied.
struct WsTransport;

#[async_trait]
impl LobbyTransport for WsTransport {
    /// Lobby leg: connect with the `User-Agent` only (token is `None`, so no
    /// `Sec-WebSocket-Protocol` offer is attached).
    async fn connect_lobby(
        &self,
        url: &str,
        user_agent: &str,
    ) -> Result<(Box<dyn MessageReader>, Box<dyn MessageWriter>), Error> {
        ws_transport::connect(url, user_agent, None).await
    }

    /// Gateway leg: connect with the `User-Agent` and pass `token` so it is
    /// carried in the `Sec-WebSocket-Protocol` offer.
    async fn connect_gateway(
        &self,
        url: &str,
        token: &str,
        user_agent: &str,
    ) -> Result<(Box<dyn MessageReader>, Box<dyn MessageWriter>), Error> {
        ws_transport::connect(url, user_agent, Some(token)).await
    }
}
330
331// ---------------------------------------------------------------------------
332// Public entry point
333// ---------------------------------------------------------------------------
334
335/// Run a bot on the Chipzen external-API remote-play path.
336///
337/// Connects to the lobby, then plays every match the platform dispatches to
338/// this bot (a single challenge, or every round of a tournament) until the
339/// lobby closes, the bot is evicted, or `max_matches` matches complete.
340///
341/// `factory` produces one bot instance per match. Pass a closure that
342/// returns a fresh bot (`|| MyBot::default()`) for correct per-match state
343/// when matches may overlap; the same closure can capture-and-clone shared
344/// config. Each match runs in its own task on its own gateway socket.
345///
346/// Returns one [`MatchResult`] per match played this session.
347///
348/// # Errors
349///
350/// * [`Error::Protocol`] if no token can be resolved, or neither `url` nor a
351/// `bot_id` is available to build the lobby URL.
352/// * [`Error::BotDecision`] if `decide()` panics under `safe_mode = false`.
353pub async fn run_external_bot<B, F>(
354 factory: F,
355 options: RunExternalOptions,
356) -> Result<Vec<MatchResult>, Error>
357where
358 B: Bot,
359 F: Fn() -> B + Send + Sync + 'static,
360{
361 run_external_with_transport(factory, options, Arc::new(WsTransport)).await
362}
363
/// Testable core of [`run_external_bot`], generic over the transport.
///
/// `run_external_bot` calls this with the real `WsTransport`; tests pass a
/// scripted transport to exercise the lobby/gateway/reconnect logic without
/// a server.
///
/// Steps, in order:
/// 1. Resolve the config (`options.config`, else `load_chipzen_config`).
/// 2. Resolve the lobby URL, retry policy and token.
/// 3. Run lobby sessions in a loop, reconnecting with backoff, until a stop
///    condition is reached or the reconnect budget is exhausted.
/// 4. Drain (then cancel) any match tasks still running.
/// 5. Return the collected [`MatchResult`]s, or the first terminal error.
///
/// # Errors
///
/// * Any error from loading the config or from `connect_to_chipzen`.
/// * [`Error::Protocol`] when no lobby URL can be built or no non-empty
///   token is available.
/// * The stored [`Error::BotDecision`] when a match task reported one.
/// * The last lobby error when the reconnect budget ran out without a lobby
///   session ever returning `Ok`.
pub async fn run_external_with_transport<B, F, T>(
    factory: F,
    options: RunExternalOptions,
    transport: Arc<T>,
) -> Result<Vec<MatchResult>, Error>
where
    B: Bot,
    F: Fn() -> B + Send + Sync + 'static,
    T: LobbyTransport,
{
    // A caller-supplied config wins; otherwise load it (may legitimately be
    // absent, hence the `Option`).
    let config = match options.config {
        Some(c) => Some(c),
        None => load_chipzen_config(None)?,
    };

    // --- Resolve lobby URL + token + retry policy --------------------------
    let (lobby_url, policy, resolved_token) = if let Some(url) = options.url {
        // Explicit URL: nothing to derive, only the token needs resolving.
        let policy = options.retry_policy.unwrap_or_default();
        let token = resolve_token(options.token.as_deref(), config.as_ref());
        (url, policy, token)
    } else {
        // No explicit URL: derive it from a bot id (option first, then config).
        let bot_id = options
            .bot_id
            .clone()
            .or_else(|| config.as_ref().and_then(|c| c.bot_id.clone()));
        // An empty bot id is as useless as a missing one.
        // NOTE(review): the message below mentions `[external_api].url`, but
        // this branch bails out before `connect_to_chipzen` is consulted when
        // no bot id is set — confirm whether a config-only URL should work.
        let Some(bot_id) = bot_id.filter(|s| !s.is_empty()) else {
            return Err(Error::Protocol(
                "run_external_bot() needs a lobby URL. Set url, or bot_id (or \
                 [external_api].bot_id / url in chipzen.toml)."
                    .to_string(),
            ));
        };
        let conn = connect_to_chipzen(&bot_id, options.env, options.retry_policy, config.clone())?;
        // An explicit token still wins over the config-file token.
        let token = match options.token.as_deref() {
            Some(t) => Some(t.to_string()),
            None => conn.token.clone(),
        };
        (conn.url, conn.retry_policy, token)
    };

    // The token is mandatory; an empty string counts as missing.
    let Some(resolved_token) = resolved_token.filter(|t| !t.is_empty()) else {
        return Err(Error::Protocol(
            "run_external_bot() requires an external-API token (cz_extbot_...). Pass \
             token, or set [external_api].token in chipzen.toml."
                .to_string(),
        ));
    };

    // Handshake identity: caller overrides, else crate-level defaults.
    let client_version = options
        .client_version
        .clone()
        .unwrap_or_else(|| env!("CARGO_PKG_VERSION").to_string());
    let user_agent = options
        .user_agent
        .clone()
        .unwrap_or_else(default_user_agent);
    let client_name = options
        .client_name
        .clone()
        .unwrap_or_else(|| DEFAULT_CLIENT_NAME.to_string());
    let safe_mode = options.safe_mode.unwrap_or(true);

    // Immutable parameters, shared by reference-count with every match task.
    let session = Arc::new(SessionParams {
        token: resolved_token,
        client_name,
        client_version,
        safe_mode,
        user_agent,
        policy,
        max_matches: options.max_matches,
    });

    // Shared mutable session state — owned HERE, not by a single lobby
    // session, so in-flight matches survive a lobby reconnect.
    let results: Arc<Mutex<Vec<MatchResult>>> = Arc::new(Mutex::new(Vec::new()));
    let completed = Arc::new(AtomicU64::new(0));
    let stop = Arc::new(AtomicBool::new(false));
    // First terminal `BotDecision` error reported by a match task, if any.
    let fatal: Arc<Mutex<Option<Error>>> = Arc::new(Mutex::new(None));
    let mut match_tasks: JoinSet<()> = JoinSet::new();

    // --- Lobby session loop with reconnect/backoff -------------------------
    let mut consecutive_failures: u32 = 0;
    // Set once any lobby session has returned `Ok`; decides whether running
    // out of reconnect attempts is an error or a quiet finish.
    let mut ever_connected = false;
    let mut giveup: Option<Error> = None;

    while !stop.load(Ordering::SeqCst) {
        let run = run_lobby_once(
            &transport,
            &lobby_url,
            &factory,
            &session,
            &results,
            &completed,
            &stop,
            &fatal,
            &mut match_tasks,
        )
        .await;

        match run {
            Ok(status) => {
                ever_connected = true;
                consecutive_failures = 0;
                // Stop/evict are final; so is a recorded fatal bot error.
                if matches!(status, LobbyStatus::Stopped | LobbyStatus::Evicted)
                    || fatal.lock().unwrap().is_some()
                {
                    break;
                }
                // status == Closed: the lobby dropped. In-flight matches keep
                // playing on their own sockets; reconnect per the policy.
                // NOTE(review): the counter was zeroed just above, so it is
                // always 1 here. A lobby that keeps accepting and then
                // closing is therefore retried indefinitely (at the first
                // backoff step) unless `max_reconnect_attempts` is 0 —
                // confirm this is intended.
                consecutive_failures += 1;
                if consecutive_failures > session.policy.max_reconnect_attempts {
                    break;
                }
                let delay = session.policy.backoff_ms(consecutive_failures);
                tokio::time::sleep(Duration::from_millis(delay)).await;
            }
            Err(exc) => {
                // connect() itself failed — count it as a reconnect attempt.
                consecutive_failures += 1;
                if consecutive_failures > session.policy.max_reconnect_attempts {
                    // Only a hard error if we NEVER reached the lobby. If we
                    // connected and played, give up quietly and return results.
                    if !ever_connected {
                        giveup = Some(exc);
                    }
                    break;
                }
                let delay = session.policy.backoff_ms(consecutive_failures);
                tokio::time::sleep(Duration::from_millis(delay)).await;
            }
        }
    }

    // --- Teardown: never orphan an in-flight match task --------------------
    drain_and_cancel(&mut match_tasks).await;

    // Error precedence: a bot bug outranks a lobby give-up.
    if let Some(err) = fatal.lock().unwrap().take() {
        return Err(err);
    }
    if let Some(err) = giveup {
        return Err(err);
    }

    // Move the results out of the shared vector rather than cloning them.
    let out = std::mem::take(&mut *results.lock().unwrap());
    Ok(out)
}
517
/// Immutable per-session parameters shared with every spawned match task.
///
/// Built once by `run_external_with_transport` and passed around inside an
/// `Arc`.
struct SessionParams {
    /// The resolved, non-empty `cz_extbot_` token: sent in the lobby
    /// `authenticate` frame and passed to every gateway connect.
    token: String,
    /// `client_name` for the per-match session context.
    client_name: String,
    /// `client_version` for the per-match session context.
    client_version: String,
    /// Forwarded to the per-match session context.
    safe_mode: bool,
    /// `User-Agent` sent on lobby and gateway connects.
    user_agent: String,
    /// Reconnect budget and backoff for lobby and gateway reconnects.
    policy: RetryPolicy,
    /// Completed-match count at which the stop signal is raised, if any.
    max_matches: Option<u64>,
}
528
/// Why a single lobby connection ended.
///
/// Returned by `run_lobby_once`; the session loop reconnects only on
/// [`LobbyStatus::Closed`].
enum LobbyStatus {
    /// Caller's stop signal fired or `max_matches` reached — do not reconnect.
    Stopped,
    /// The lobby evicted us (a newer connection replaced this one).
    Evicted,
    /// The lobby connection closed unexpectedly — the caller may reconnect.
    Closed,
}
538
539/// Hold ONE lobby connection and dispatch every `matched` it delivers.
540///
541/// Each `matched` is spawned into `match_tasks` (owned by the caller) so the
542/// lobby heartbeat is answered during matches AND in-flight matches survive
543/// a lobby reconnect — a match runs on its own gateway socket.
544#[allow(clippy::too_many_arguments)]
545async fn run_lobby_once<B, F, T>(
546 transport: &Arc<T>,
547 lobby_url: &str,
548 factory: &F,
549 session: &Arc<SessionParams>,
550 results: &Arc<Mutex<Vec<MatchResult>>>,
551 completed: &Arc<AtomicU64>,
552 stop: &Arc<AtomicBool>,
553 fatal: &Arc<Mutex<Option<Error>>>,
554 match_tasks: &mut JoinSet<()>,
555) -> Result<LobbyStatus, Error>
556where
557 B: Bot,
558 F: Fn() -> B + Send + Sync + 'static,
559 T: LobbyTransport,
560{
561 let (mut reader, mut writer) = transport
562 .connect_lobby(lobby_url, &session.user_agent)
563 .await?;
564
565 // Lobby auth frame.
566 writer
567 .send(json!({ "type": "authenticate", "token": session.token }).to_string())
568 .await?;
569
570 while !stop.load(Ordering::SeqCst) {
571 let recv = tokio::time::timeout(lobby_recv_timeout(), reader.next()).await;
572 let raw = match recv {
573 // Periodic wake to re-check the stop signal.
574 Err(_elapsed) => continue,
575 Ok(Ok(Some(raw))) => raw,
576 // Clean close.
577 Ok(Ok(None)) => return Ok(LobbyStatus::Closed),
578 // Transport error → treat as a drop the caller may reconnect.
579 Ok(Err(_)) => return Ok(LobbyStatus::Closed),
580 };
581
582 let msg = loads(&raw);
583 match msg.get("type").and_then(|v| v.as_str()) {
584 Some("ping") => {
585 writer.send(json!({ "type": "pong" }).to_string()).await?;
586 }
587 Some("hello") => {
588 // Lobby connected; no client hello on the lobby leg.
589 }
590 Some("matched") => {
591 let gateway_path = msg
592 .get("gateway_ws_url")
593 .and_then(|v| v.as_str())
594 .unwrap_or("");
595 let match_id = msg
596 .get("match_id")
597 .and_then(|v| v.as_str())
598 .unwrap_or("")
599 .to_string();
600 // Untrusted gateway URL (cross-origin / downgrade) → skip this
601 // match rather than send the token there.
602 let gateway_url = match resolve_gateway_url(lobby_url, gateway_path) {
603 Ok(url) => url,
604 Err(_) => continue,
605 };
606
607 let bot = factory();
608 let transport = Arc::clone(transport);
609 let session = Arc::clone(session);
610 let results = Arc::clone(results);
611 let completed = Arc::clone(completed);
612 let stop = Arc::clone(stop);
613 let fatal = Arc::clone(fatal);
614
615 match_tasks.spawn(async move {
616 let outcome =
617 play_one_match(&*transport, &gateway_url, &match_id, bot, &session).await;
618 record_match_outcome(
619 outcome, &match_id, &session, &results, &completed, &stop, &fatal,
620 );
621 });
622 }
623 Some("evict") => return Ok(LobbyStatus::Evicted),
624 _ => {
625 // Ignore unknown lobby frame types.
626 }
627 }
628 }
629
630 if stop.load(Ordering::SeqCst) {
631 Ok(LobbyStatus::Stopped)
632 } else {
633 Ok(LobbyStatus::Closed)
634 }
635}
636
637/// Fold a finished match into the shared results + completed counter, and
638/// flip the stop signal on a terminal `BotDecision` error or `max_matches`.
639fn record_match_outcome(
640 outcome: Result<Option<Value>, Error>,
641 match_id: &str,
642 session: &SessionParams,
643 results: &Mutex<Vec<MatchResult>>,
644 completed: &AtomicU64,
645 stop: &AtomicBool,
646 fatal: &Mutex<Option<Error>>,
647) {
648 match outcome {
649 // safe_mode=false: a deterministic bot bug. Surface it (stop the
650 // session + re-raise from run_external_bot) — the whole point of
651 // safe_mode=false is a loud failure.
652 Err(err @ Error::BotDecision(_)) => {
653 let mut slot = fatal.lock().unwrap();
654 if slot.is_none() {
655 *slot = Some(err);
656 }
657 stop.store(true, Ordering::SeqCst);
658 return;
659 }
660 Err(_other) => {
661 // Record, never crash the lobby. (Connect/transport errors that
662 // outlived the reconnect budget land here as the match abandoned.)
663 results.lock().unwrap().push(MatchResult {
664 match_id: Some(match_id.to_string()),
665 end: None,
666 });
667 }
668 Ok(end) => {
669 let match_id = end
670 .as_ref()
671 .and_then(|e| e.get("match_id"))
672 .and_then(|v| v.as_str())
673 .map(String::from)
674 .or_else(|| Some(match_id.to_string()));
675 completed.fetch_add(1, Ordering::SeqCst);
676 results.lock().unwrap().push(MatchResult { match_id, end });
677 }
678 }
679
680 if let Some(max) = session.max_matches {
681 if completed.load(Ordering::SeqCst) >= max {
682 stop.store(true, Ordering::SeqCst);
683 }
684 }
685}
686
687/// Play one match end-to-end over the per-match gateway WS, reconnecting
688/// across a mid-match drop.
689///
690/// Opens the gateway WS (token in the `Sec-WebSocket-Protocol` header) and
691/// hands off to [`_run_session`] for the two-layer handshake + game loop.
692/// If the socket drops before `match_end`, reconnects (bounded by the
693/// policy) and lets the platform's reconnect-resume re-deliver the pending
694/// turn — `_run_session` already consumes the server `reconnected` frame and
695/// replays its `pending_request`, and the same `bot` carries its state
696/// across the gap.
697///
698/// Returns the `match_end` payload, or `Ok(None)` if the match could not be
699/// completed within the reconnect budget. A [`Error::BotDecision`] is
700/// terminal and propagates immediately (never reconnect-retried).
701async fn play_one_match<B, T>(
702 transport: &T,
703 gateway_url: &str,
704 match_id: &str,
705 mut bot: B,
706 session: &SessionParams,
707) -> Result<Option<Value>, Error>
708where
709 B: Bot,
710 T: LobbyTransport,
711{
712 let ctx = SessionContext {
713 match_id: match_id.to_string(),
714 // The inner leg's auth token is the gateway's internal JWT; the
715 // executor ignores the value we send (an empty token), but the
716 // authenticate frame MUST be first.
717 token: None,
718 ticket: None,
719 client_name: session.client_name.clone(),
720 client_version: session.client_version.clone(),
721 safe_mode: session.safe_mode,
722 };
723
724 let mut attempt: u32 = 0;
725 loop {
726 let connect = transport
727 .connect_gateway(gateway_url, &session.token, &session.user_agent)
728 .await;
729
730 match connect {
731 Ok((mut reader, mut writer)) => {
732 match _run_session(&mut reader, &mut writer, &mut bot, &ctx).await {
733 // Clean match_end — done.
734 Ok(Some(end)) => return Ok(Some(end)),
735 // Socket closed without a match_end (a drop). Try to resume.
736 Ok(None) => {}
737 // Deterministic bot bug — terminal, never reconnect-retry.
738 Err(e @ Error::BotDecision(_)) => return Err(e),
739 // Transport/protocol error mid-session — try to resume.
740 Err(_) => {}
741 }
742 }
743 // Connect failed — counts as a drop too.
744 Err(Error::BotDecision(_)) => unreachable!("connect cannot raise BotDecision"),
745 Err(_) => {}
746 }
747
748 attempt += 1;
749 if attempt > session.policy.max_reconnect_attempts {
750 return Ok(None); // reconnect budget exhausted — abandon
751 }
752 let delay = session.policy.backoff_ms(attempt);
753 tokio::time::sleep(Duration::from_millis(delay)).await;
754 }
755}
756
757/// Teardown: let still-in-flight matches finish for a short grace window,
758/// then cancel any stragglers so no task is orphaned.
759async fn drain_and_cancel(match_tasks: &mut JoinSet<()>) {
760 if match_tasks.is_empty() {
761 return;
762 }
763 // `join_next` until the grace window elapses; whatever's left is aborted.
764 let drain = async { while match_tasks.join_next().await.is_some() {} };
765 if tokio::time::timeout(MATCH_DRAIN_GRACE, drain)
766 .await
767 .is_err()
768 {
769 match_tasks.abort_all();
770 while match_tasks.join_next().await.is_some() {}
771 }
772}
773
774// ---------------------------------------------------------------------------
775// Real WebSocket transport adapter
776// ---------------------------------------------------------------------------
777
778mod ws_transport {
779 use super::{bot_token_subprotocols, MessageReader, MessageWriter};
780 use crate::error::Error;
781 use async_trait::async_trait;
782 use futures_util::stream::{SplitSink, SplitStream};
783 use futures_util::{SinkExt, StreamExt};
784 use tokio::net::TcpStream;
785 use tokio_tungstenite::{
786 connect_async,
787 tungstenite::client::IntoClientRequest,
788 tungstenite::handshake::client::generate_key,
789 tungstenite::http::header::{
790 CONNECTION, SEC_WEBSOCKET_KEY, SEC_WEBSOCKET_PROTOCOL, SEC_WEBSOCKET_VERSION, UPGRADE,
791 USER_AGENT,
792 },
793 tungstenite::Message,
794 MaybeTlsStream, WebSocketStream,
795 };
796
797 type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
798
799 /// Open a WS connection, attaching the `User-Agent` header and, when a
800 /// `token` is supplied, the `Sec-WebSocket-Protocol` token offer.
801 pub async fn connect(
802 url: &str,
803 user_agent: &str,
804 token: Option<&str>,
805 ) -> Result<(Box<dyn MessageReader>, Box<dyn MessageWriter>), Error> {
806 let mut request = url.into_client_request().map_err(Error::from)?;
807 let headers = request.headers_mut();
808 if let Ok(value) = user_agent.parse() {
809 headers.insert(USER_AGENT, value);
810 }
811 if let Some(token) = token {
812 let offer = bot_token_subprotocols(token).join(", ");
813 if let Ok(value) = offer.parse() {
814 headers.insert(SEC_WEBSOCKET_PROTOCOL, value);
815 }
816 // `into_client_request` already set the upgrade/version/key
817 // headers; ensure the standard handshake set is present in case a
818 // future tungstenite changes defaults.
819 headers
820 .entry(SEC_WEBSOCKET_VERSION)
821 .or_insert_with(|| "13".parse().expect("static header"));
822 headers
823 .entry(SEC_WEBSOCKET_KEY)
824 .or_insert_with(|| generate_key().parse().expect("generated key"));
825 headers
826 .entry(CONNECTION)
827 .or_insert_with(|| "Upgrade".parse().expect("static header"));
828 headers
829 .entry(UPGRADE)
830 .or_insert_with(|| "websocket".parse().expect("static header"));
831 }
832
833 let (ws_stream, _) = connect_async(request).await?;
834 let (write_half, read_half) = ws_stream.split();
835 let reader: Box<dyn MessageReader> = Box::new(OwnedWsReader { inner: read_half });
836 let writer: Box<dyn MessageWriter> = Box::new(OwnedWsWriter { inner: write_half });
837 Ok((reader, writer))
838 }
839
840 struct OwnedWsReader {
841 inner: SplitStream<WsStream>,
842 }
843
844 #[async_trait]
845 impl MessageReader for OwnedWsReader {
846 async fn next(&mut self) -> Result<Option<String>, Error> {
847 loop {
848 match self.inner.next().await {
849 Some(Ok(Message::Text(t))) => return Ok(Some(t.to_string())),
850 Some(Ok(Message::Ping(_))) => continue,
851 Some(Ok(Message::Close(_))) | None => return Ok(None),
852 Some(Ok(_)) => continue,
853 Some(Err(e)) => return Err(Error::from(e)),
854 }
855 }
856 }
857 }
858
859 struct OwnedWsWriter {
860 inner: SplitSink<WsStream, Message>,
861 }
862
863 #[async_trait]
864 impl MessageWriter for OwnedWsWriter {
865 async fn send(&mut self, payload: String) -> Result<(), Error> {
866 self.inner
867 .send(Message::Text(payload))
868 .await
869 .map_err(Error::from)
870 }
871 }
872}