Skip to main content

studio_worker/ws/
session.rs

1//! Long-running WebSocket session that owns the worker's lifecycle.
2//!
3//! Replaces the four polling loops (`spawn_heartbeat`, `spawn_claim_loop`,
4//! `spawn_log_shipper`, plus the implicit completion path) with a single
5//! `spawn_ws_session` coordinator + a small handful of helper tasks that
6//! all push frames through a shared `WsSender`.
7//!
8//! Reconnect policy: on a transport error or non-auth close, back off
9//! `BASE_BACKOFF_MS * 2^attempt` and try again, up to
10//! `cfg.ws_reconnect_attempts`.  Out of retries → return `Err` and the
11//! systemd / launchd unit restarts the binary.
12use std::sync::{
13    atomic::{AtomicBool, Ordering},
14    Arc,
15};
16use std::time::Duration;
17
18use anyhow::{anyhow, Result};
19use parking_lot::Mutex;
20use tokio::sync::mpsc;
21use tracing::{info, warn};
22
23use crate::config::SharedConfig;
24use crate::engine::Engine;
25use crate::http::ApiClient;
26use crate::runtime::{
27    is_unsupported_kind, prompt_for, push_log_with_observers, record_recent_job, truncate_prompt,
28    wait_with_stop, CurrentJob, JobOutcome, RecentJob, WorkerObservers,
29};
30use crate::types::{LogEntry, TaskResult};
31use crate::ws::client::{connect, WsClientError, WsResult, WsSender};
32use crate::ws::types::{HelloFrame, JobOfferClaim, WorkerInbound, WorkerOutbound};
33
34/// Tracing target used for every event emitted by the session.
35const TRACE_TARGET: &str = "studio_worker::ws::session";
36
37const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
38const LOG_FLUSH_INTERVAL: Duration = Duration::from_secs(1);
39const SHUTDOWN_TICK: Duration = Duration::from_millis(250);
40const BASE_BACKOFF_MS: u64 = 1_000;
41const MAX_BACKOFF_MS: u64 = 30_000;
42const DEFAULT_RECONNECT_ATTEMPTS: u32 = 5;
43/// Extra attempts for the multipart result upload when the studio
44/// returns a 5xx / transport error.  A blip is far cheaper to retry
45/// than the full GPU regeneration a reported `Fail` causes.
46const UPLOAD_RETRIES: u32 = 2;
47/// Base pause between upload retries (grows linearly per attempt).
48const UPLOAD_RETRY_PAUSE: Duration = Duration::from_secs(1);
49/// If no frame (not even a `heartbeatAck`) arrives from the studio within this window, treat the
50/// connection as dead and tear the session down. The studio acks every heartbeat (~5s), so a live
51/// connection always yields a frame well inside this budget; the only time it elapses is a
52/// half-open / dead-peer socket where the reader would otherwise block on `source.next()` forever.
53const READ_IDLE_TIMEOUT: Duration = Duration::from_secs(20);
54
55/// Outcome of a single session attempt.  The reconnect loop decides
56/// whether to back off + retry based on the variant.
57#[derive(Debug)]
58pub enum SessionOutcome {
59    /// Caller requested shutdown; do not reconnect.
60    Stopped,
61    /// Lost the connection unexpectedly; reconnect after backoff.
62    Disconnected,
63    /// Server rejected auth; do not reconnect.
64    AuthFailed(String),
65    /// Server sent a fatal error frame; do not reconnect.
66    Fatal(String),
67}
68
69/// Tunables for the session loop — dialed down in tests.
70#[derive(Debug, Clone, Copy)]
71pub struct SessionSchedule {
72    pub heartbeat: Duration,
73    pub log_flush: Duration,
74    pub shutdown_tick: Duration,
75    pub base_backoff_ms: u64,
76    pub max_backoff_ms: u64,
77    /// Reader gives up + reports a disconnect if no server frame arrives within this window.
78    pub read_idle_timeout: Duration,
79}
80
81impl Default for SessionSchedule {
82    fn default() -> Self {
83        Self {
84            heartbeat: HEARTBEAT_INTERVAL,
85            log_flush: LOG_FLUSH_INTERVAL,
86            shutdown_tick: SHUTDOWN_TICK,
87            base_backoff_ms: BASE_BACKOFF_MS,
88            max_backoff_ms: MAX_BACKOFF_MS,
89            read_idle_timeout: READ_IDLE_TIMEOUT,
90        }
91    }
92}
93
94impl SessionSchedule {
95    pub fn fast_for_tests() -> Self {
96        Self {
97            heartbeat: Duration::from_millis(5),
98            log_flush: Duration::from_millis(5),
99            shutdown_tick: Duration::from_millis(5),
100            base_backoff_ms: 1,
101            max_backoff_ms: 10,
102            // Generous vs the 5ms heartbeat so the existing fast tests never trip it; the
103            // silent-connection test overrides this with a tiny value to exercise the timeout.
104            read_idle_timeout: Duration::from_secs(5),
105        }
106    }
107}
108
109/// Top-level driver: connect, run a session, reconnect on disconnect,
110/// give up after `cfg.ws_reconnect_attempts` failures.
111///
112/// `paused` is a runtime-only flag (not persisted to `Config`).  When
113/// true, the heartbeat reports `autoEnabled = false` and incoming
114/// offers are rejected, so the studio stops sending new jobs.  In-
115/// flight work is allowed to finish.
116#[cfg_attr(coverage_nightly, coverage(off))]
117pub async fn spawn_ws_session(
118    cfg: SharedConfig,
119    stop: Arc<AtomicBool>,
120    logs: Arc<Mutex<Vec<LogEntry>>>,
121    busy: Arc<AtomicBool>,
122    paused: Arc<AtomicBool>,
123    observers: WorkerObservers,
124    schedule: SessionSchedule,
125) -> Result<()> {
126    let max_attempts = {
127        let guard = cfg.lock();
128        guard
129            .ws_reconnect_attempts
130            .unwrap_or(DEFAULT_RECONNECT_ATTEMPTS)
131    };
132
133    let mut attempt: u32 = 0;
134    let mut waiting_for_creds_logged = false;
135    loop {
136        if stop.load(Ordering::SeqCst) {
137            return Ok(());
138        }
139        // Credentials may not exist yet (first launch — the
140        // auto-register loop is racing to populate them).  Poll the
141        // shared config until both `worker_id` and `auth_token` show
142        // up, instead of failing the whole session loop.  This is
143        // what lets the UI's parallel auto-register + WS flow work.
144        if !has_credentials(&cfg) {
145            if !waiting_for_creds_logged {
146                push_log_with_observers(
147                    &logs,
148                    Some(&observers),
149                    "info",
150                    "ws",
151                    "waiting for operator approval before opening the session",
152                    None,
153                );
154                waiting_for_creds_logged = true;
155            }
156            wait_with_stop(Duration::from_secs(1), &stop, schedule.shutdown_tick).await;
157            continue;
158        }
159        waiting_for_creds_logged = false;
160
161        let welcomed = AtomicBool::new(false);
162        match run_one_session(
163            &cfg, &stop, &logs, &busy, &paused, &observers, schedule, &welcomed,
164        )
165        .await
166        {
167            Ok(SessionOutcome::Stopped) => return Ok(()),
168            Ok(SessionOutcome::AuthFailed(reason)) => {
169                push_log_with_observers(
170                    &logs,
171                    Some(&observers),
172                    "error",
173                    "ws",
174                    &format!("auth failed: {reason}. Re-register the worker."),
175                    None,
176                );
177                return Err(anyhow!("ws auth failed: {reason}"));
178            }
179            Ok(SessionOutcome::Fatal(reason)) => {
180                push_log_with_observers(
181                    &logs,
182                    Some(&observers),
183                    "error",
184                    "ws",
185                    &format!("fatal: {reason}"),
186                    None,
187                );
188                return Err(anyhow!("ws fatal: {reason}"));
189            }
190            outcome @ (Ok(SessionOutcome::Disconnected) | Err(_)) => {
191                // A session that successfully connected shouldn't count its later drop toward the
192                // connect-failure cap — only consecutive failures to connect should accumulate, so
193                // a long-lived worker isn't killed by transient mid-session disconnects.
194                if welcomed.load(Ordering::SeqCst) {
195                    attempt = 0;
196                }
197                attempt += 1;
198                if max_attempts > 0 && attempt > max_attempts {
199                    push_log_with_observers(
200                        &logs,
201                        Some(&observers),
202                        "error",
203                        "ws",
204                        &format!("giving up after {attempt} reconnect attempts"),
205                        None,
206                    );
207                    return Err(anyhow!("ws reconnect cap reached"));
208                }
209                let backoff = backoff_for(attempt, schedule);
210                push_log_with_observers(
211                    &logs,
212                    Some(&observers),
213                    "warn",
214                    "ws",
215                    &reconnect_breadcrumb(outcome.as_ref().err(), attempt, backoff),
216                    None,
217                );
218                wait_with_stop(backoff, &stop, schedule.shutdown_tick).await;
219            }
220        }
221    }
222}
223
224/// Outcome of waiting for the server's Welcome (or an error) right
225/// after sending Hello.  Drives the precondition gate that keeps the
226/// heartbeat / log-shipper pumps from racing the studio's async auth
227/// flow.
228enum WelcomeOutcome {
229    Welcomed,
230    AuthFailed(String),
231    Fatal(String),
232    Disconnected,
233}
234
235/// Pull events from the reader until we see a Welcome (success) or an
236/// Error / Disconnect (failure).  Any acks / offers that arrive
237/// before the Welcome are pushed into the logs and discarded — the
238/// studio shouldn't be sending them at this stage, but if it does,
239/// the dispatch loop will pick the next ones up.
240#[cfg_attr(coverage_nightly, coverage(off))]
241async fn wait_for_welcome(
242    event_rx: &mut mpsc::UnboundedReceiver<SessionEvent>,
243    logs: &Arc<Mutex<Vec<LogEntry>>>,
244    observers: &WorkerObservers,
245) -> WelcomeOutcome {
246    while let Some(event) = event_rx.recv().await {
247        match event {
248            SessionEvent::Frame(WorkerOutbound::Welcome {
249                worker_id: wid,
250                server_time,
251            }) => {
252                push_log_with_observers(
253                    logs,
254                    Some(observers),
255                    "info",
256                    "ws",
257                    &welcome_breadcrumb(&wid, &server_time),
258                    None,
259                );
260                return WelcomeOutcome::Welcomed;
261            }
262            SessionEvent::Frame(WorkerOutbound::Error { code, message }) => {
263                push_log_with_observers(
264                    logs,
265                    Some(observers),
266                    "error",
267                    "ws",
268                    &format!("server error before welcome {code:?}: {message}"),
269                    None,
270                );
271                return match code {
272                    crate::ws::types::WorkerErrorCode::AuthFailed => {
273                        WelcomeOutcome::AuthFailed(message)
274                    }
275                    _ => WelcomeOutcome::Fatal(message),
276                };
277            }
278            SessionEvent::Frame(other) => {
279                push_log_with_observers(
280                    logs,
281                    Some(observers),
282                    "warn",
283                    "ws",
284                    &format!("server sent unexpected frame before welcome: {other:?}"),
285                    None,
286                );
287                // Keep waiting — maybe the next frame is Welcome.
288            }
289            SessionEvent::Disconnected(WsClientError::AuthFailed { reason }) => {
290                return WelcomeOutcome::AuthFailed(reason);
291            }
292            SessionEvent::Disconnected(_) => return WelcomeOutcome::Disconnected,
293            SessionEvent::Stopped => return WelcomeOutcome::Disconnected,
294        }
295    }
296    WelcomeOutcome::Disconnected
297}
298
299/// True iff the shared config has both `worker_id` and `auth_token`
300/// populated.  The auto-register flow writes them through on
301/// approval.
302fn has_credentials(cfg: &SharedConfig) -> bool {
303    let guard = cfg.lock();
304    guard
305        .worker_id
306        .as_deref()
307        .map(|s| !s.is_empty())
308        .unwrap_or(false)
309        && guard
310            .auth_token
311            .as_deref()
312            .map(|s| !s.is_empty())
313            .unwrap_or(false)
314}
315
316/// One end-to-end session attempt: connect, hello, run until shutdown
317/// or disconnect.
318#[cfg_attr(coverage_nightly, coverage(off))]
319// Eight collaborators (config + shared flags + observers + schedule + welcomed signal);
320// grouping them adds indirection without improving readability.
321#[allow(clippy::too_many_arguments)]
322async fn run_one_session(
323    cfg: &SharedConfig,
324    stop: &Arc<AtomicBool>,
325    logs: &Arc<Mutex<Vec<LogEntry>>>,
326    busy: &Arc<AtomicBool>,
327    paused: &Arc<AtomicBool>,
328    observers: &WorkerObservers,
329    schedule: SessionSchedule,
330    welcomed: &AtomicBool,
331) -> Result<SessionOutcome> {
332    let (api_base_url, worker_id, auth_token) = {
333        let guard = cfg.lock();
334        (
335            guard.api_base_url.clone(),
336            guard.worker_id.clone().unwrap_or_default(),
337            guard.auth_token.clone().unwrap_or_default(),
338        )
339    };
340    if worker_id.is_empty() || auth_token.is_empty() {
341        return Ok(SessionOutcome::Fatal(
342            "worker_id or auth_token missing; run register".to_string(),
343        ));
344    }
345
346    push_log_with_observers(
347        logs,
348        Some(observers),
349        "info",
350        "ws",
351        &format!("connecting to {api_base_url}"),
352        None,
353    );
354    let client = match connect(&api_base_url, &worker_id, &auth_token).await {
355        Ok(c) => c,
356        Err(WsClientError::AuthFailed { reason }) => {
357            return Ok(SessionOutcome::AuthFailed(reason));
358        }
359        Err(e) => {
360            push_log_with_observers(
361                logs,
362                Some(observers),
363                "warn",
364                "ws",
365                &format!("connect failed: {e}"),
366                None,
367            );
368            return Ok(SessionOutcome::Disconnected);
369        }
370    };
371    let (sender, receiver) = client.split();
372
373    // Send hello with the current capabilities.
374    let engine = crate::engine::build(&cfg.lock())?;
375    let capabilities = crate::runtime::build_capabilities_with(
376        &cfg.lock(),
377        &*engine,
378        !paused.load(Ordering::SeqCst),
379    );
380    // Record exactly what we're about to advertise so the worker's logs
381    // (and the studio's shipped-log view) show the offered kinds /
382    // models / VRAM budget — otherwise the handshake is opaque and
383    // "why won't it claim X jobs" can't be answered from the logs.
384    push_log_with_observers(
385        logs,
386        Some(observers),
387        "info",
388        "ws",
389        &crate::runtime::summarize_capabilities(&capabilities),
390        None,
391    );
392    // A threshold above the card's detected VRAM makes the studio offer
393    // jobs this GPU can't fit — they OOM on load.  Flag the
394    // misconfiguration on the handshake so the OOM has an operator-facing
395    // cause instead of surfacing only as a failed job.
396    if let Some(warning) = crate::runtime::vram_threshold_warning(&capabilities) {
397        push_log_with_observers(logs, Some(observers), "warn", "ws", &warning, None);
398    }
399    sender
400        .send(&WorkerInbound::Hello(HelloFrame {
401            auth_token: auth_token.clone(),
402            capabilities: capabilities.clone(),
403        }))
404        .await
405        .map_err(|e| anyhow!("hello send failed: {e}"))?;
406    info!(target: TRACE_TARGET, worker_id = %worker_id, "hello sent");
407
408    let (event_tx, event_rx) = mpsc::unbounded_channel::<SessionEvent>();
409
410    // Reader task: pump frames into the event channel.
411    let reader = spawn_reader(receiver, event_tx.clone(), schedule.read_idle_timeout);
412
413    // Wait for the server's `Welcome` (or an error) before starting
414    // the heartbeat / log-shipper pumps.  Without this gate, the
415    // first heartbeat fires immediately (tokio `interval()` returns
416    // at t=0) and races the studio's async Hello-auth flow: a
417    // heartbeat arriving while the session is still marked
418    // `authenticated: false` server-side gets rejected with
419    // `protocol_violation: session not authenticated`, killing the
420    // session.
421    let mut event_rx = event_rx;
422    match wait_for_welcome(&mut event_rx, logs, observers).await {
423        WelcomeOutcome::Welcomed => welcomed.store(true, Ordering::SeqCst),
424        WelcomeOutcome::AuthFailed(reason) => {
425            let _ = sender.close(1000, "auth failed").await;
426            let _ = reader.await;
427            return Ok(SessionOutcome::AuthFailed(reason));
428        }
429        WelcomeOutcome::Fatal(reason) => {
430            let _ = sender.close(1000, "protocol violation").await;
431            let _ = reader.await;
432            return Ok(SessionOutcome::Fatal(reason));
433        }
434        WelcomeOutcome::Disconnected => {
435            let _ = reader.await;
436            return Ok(SessionOutcome::Disconnected);
437        }
438    }
439
440    // Heartbeat task.  Reuses the engine handle built for the Hello
441    // frame (rebuilding fires every engine's registration log every
442    // 5s and floods the logs) but rebuilds the capability snapshot
443    // from the live config each tick, so operator edits (e.g. a new
444    // VRAM threshold saved from the UI's Config tab) reach the studio
445    // without waiting for a reconnect.
446    let engine_arc: Arc<dyn Engine> = engine.into();
447    let heartbeat = spawn_heartbeat_pump(
448        cfg.clone(),
449        engine_arc.clone(),
450        sender.clone(),
451        stop.clone(),
452        paused.clone(),
453        logs.clone(),
454        observers.clone(),
455        schedule,
456    );
457
458    // Log shipper task.
459    let log_shipper = spawn_log_shipper_pump(sender.clone(), logs.clone(), stop.clone(), schedule);
460
461    // Shutdown observer: ticks until stop flag is set, then drops the channel.
462    let shutdown_observer = spawn_shutdown_observer(stop.clone(), event_tx.clone(), schedule);
463    drop(event_tx);
464
465    let ctx = SessionContext {
466        sender: sender.clone(),
467        engine: engine_arc,
468        logs: logs.clone(),
469        busy: busy.clone(),
470        paused: paused.clone(),
471        observers: observers.clone(),
472        api_base_url: api_base_url.clone(),
473        worker_id: worker_id.clone(),
474        auth_token: auth_token.clone(),
475    };
476    let outcome = run_dispatch_loop(ctx, event_rx).await;
477
478    // The session is ending (disconnect or shutdown). The heartbeat / log-shipper /
479    // shutdown-observer pumps only break on the *global* stop flag or a send failure, so on a
480    // silent-but-open socket — where heartbeat sends still succeed into the TCP buffer — they would
481    // loop forever and block this function from returning, which is exactly the post-job reconnect
482    // hang. Abort them so teardown is bounded regardless of socket state, then best-effort close +
483    // drain the aborted handles (await returns promptly with Cancelled).
484    reader.abort();
485    heartbeat.abort();
486    log_shipper.abort();
487    shutdown_observer.abort();
488    let _ = sender.close(1000, "session ended").await;
489    let _ = reader.await;
490    let _ = heartbeat.await;
491    let _ = log_shipper.await;
492    let _ = shutdown_observer.await;
493    Ok(outcome)
494}
495
496/// All the events the dispatch loop reacts to.
497#[derive(Debug)]
498enum SessionEvent {
499    /// Frame arrived from the server.
500    Frame(WorkerOutbound),
501    /// Engine task finished (success or fail already reported).
502    Stopped,
503    /// Reader hit EOF / error.
504    Disconnected(WsClientError),
505}
506
507/// Bundle of immutable per-session settings the dispatcher passes
508/// around — keeps clippy's `too_many_arguments` lint happy.  Cloning
509/// is cheap: every field is an `Arc`, a cloneable sender, or a small
510/// `String`.
511#[derive(Clone)]
512struct SessionContext {
513    sender: WsSender,
514    engine: Arc<dyn Engine>,
515    logs: Arc<Mutex<Vec<LogEntry>>>,
516    busy: Arc<AtomicBool>,
517    paused: Arc<AtomicBool>,
518    observers: WorkerObservers,
519    api_base_url: String,
520    worker_id: String,
521    auth_token: String,
522}
523
524#[cfg_attr(coverage_nightly, coverage(off))]
525async fn run_dispatch_loop(
526    ctx: SessionContext,
527    mut event_rx: mpsc::UnboundedReceiver<SessionEvent>,
528) -> SessionOutcome {
529    while let Some(event) = event_rx.recv().await {
530        match event {
531            SessionEvent::Disconnected(WsClientError::AuthFailed { reason }) => {
532                return SessionOutcome::AuthFailed(reason);
533            }
534            SessionEvent::Disconnected(_) => return SessionOutcome::Disconnected,
535            SessionEvent::Stopped => return SessionOutcome::Stopped,
536            SessionEvent::Frame(frame) => match frame {
537                WorkerOutbound::Welcome {
538                    worker_id: wid,
539                    server_time,
540                } => {
541                    push_log_with_observers(
542                        &ctx.logs,
543                        Some(&ctx.observers),
544                        "info",
545                        "ws",
546                        &welcome_breadcrumb(&wid, &server_time),
547                        None,
548                    );
549                }
550                WorkerOutbound::Offer { claim } => {
551                    handle_offer(&ctx, *claim);
552                }
553                WorkerOutbound::Error { code, message } => {
554                    push_log_with_observers(
555                        &ctx.logs,
556                        Some(&ctx.observers),
557                        "error",
558                        "ws",
559                        &format!("server error {code:?}: {message}"),
560                        None,
561                    );
562                    return match code {
563                        crate::ws::types::WorkerErrorCode::AuthFailed => {
564                            SessionOutcome::AuthFailed(message)
565                        }
566                        _ => SessionOutcome::Fatal(message),
567                    };
568                }
569                WorkerOutbound::CompleteAck { job_id } => {
570                    push_log_with_observers(
571                        &ctx.logs,
572                        Some(&ctx.observers),
573                        "info",
574                        "ws",
575                        &result_ack_breadcrumb("completion", &job_id),
576                        Some(job_id),
577                    );
578                }
579                WorkerOutbound::FailAck { job_id } => {
580                    push_log_with_observers(
581                        &ctx.logs,
582                        Some(&ctx.observers),
583                        "info",
584                        "ws",
585                        &result_ack_breadcrumb("failure", &job_id),
586                        Some(job_id),
587                    );
588                }
589                WorkerOutbound::HeartbeatAck => {
590                    // Heartbeat acks fire every ~5s; logging each would
591                    // flood the operator log with no diagnostic value
592                    // (a genuinely missed ack already surfaces via the
593                    // read-idle timeout + reconnect breadcrumb).
594                }
595            },
596        }
597    }
598    SessionOutcome::Disconnected
599}
600
601#[cfg_attr(coverage_nightly, coverage(off))]
602fn handle_offer(ctx: &SessionContext, claim: JobOfferClaim) {
603    let job_id = claim.job_id.clone();
604    push_log_with_observers(
605        &ctx.logs,
606        Some(&ctx.observers),
607        "info",
608        "ws",
609        &offer_received_breadcrumb(
610            &job_id,
611            &claim.game_id,
612            &claim.asset_name,
613            &claim.model,
614            claim.vram_gb_estimate,
615        ),
616        Some(job_id.clone()),
617    );
618    // Operator pressed Pause: reject the offer so the studio retries
619    // on a different worker (or requeues until we resume).  No engine
620    // dispatch, no busy flag flip.
621    if ctx.paused.load(Ordering::SeqCst) {
622        push_log_with_observers(
623            &ctx.logs,
624            Some(&ctx.observers),
625            "info",
626            "ws",
627            &format!("rejecting offer {job_id}: worker is paused"),
628            Some(job_id.clone()),
629        );
630        spawn_reject_offer(
631            ctx.sender.clone(),
632            ctx.logs.clone(),
633            ctx.observers.clone(),
634            job_id,
635            "worker paused by operator",
636            crate::ws::types::RejectCode::Paused,
637        );
638        return;
639    }
640    if !try_reserve_worker(&ctx.busy) {
641        push_log_with_observers(
642            &ctx.logs,
643            Some(&ctx.observers),
644            "info",
645            "ws",
646            &format!("rejecting offer {job_id}: worker is already busy"),
647            Some(job_id.clone()),
648        );
649        spawn_reject_offer(
650            ctx.sender.clone(),
651            ctx.logs.clone(),
652            ctx.observers.clone(),
653            job_id,
654            "worker already has an in-flight job",
655            crate::ws::types::RejectCode::Busy,
656        );
657        return;
658    }
659    let job = claim.into_job_claim();
660    let task_kind = job.task.kind();
661    // The FULL prompt goes back to the studio (and to the engine).
662    // The bounded preview (`truncate_prompt`) is only for the UI's
663    // Jobs tab so the in-memory observer ring stays small even when
664    // LLM prompts are huge.  Mixing the two used to send the
665    // truncated 200-char preview as the `prompt` form field on the
666    // multipart `/complete`, which the studio then persisted onto the
667    // row — mangling every operator-facing prompt in the DB.
668    let full_prompt = prompt_for(&job.task);
669    let prompt_preview = truncate_prompt(&full_prompt);
670    let started_at = chrono::Utc::now();
671
672    let ctx = ctx.clone();
673    tokio::spawn(async move {
674        let accept_result = ctx
675            .sender
676            .send(&WorkerInbound::Accept {
677                job_id: job_id.clone(),
678            })
679            .await;
680        if let Some((level, message)) = offer_response_breadcrumb("accept", &job_id, &accept_result)
681        {
682            push_log_with_observers(
683                &ctx.logs,
684                Some(&ctx.observers),
685                level,
686                "ws",
687                &message,
688                Some(job_id.clone()),
689            );
690        }
691        if accept_result.is_err() {
692            ctx.busy.store(false, Ordering::SeqCst);
693            return;
694        }
695
696        // Surface the job to the UI's Jobs tab — bounded preview only.
697        *ctx.observers.current_job.lock() = Some(CurrentJob {
698            job_id: job_id.clone(),
699            kind: task_kind,
700            model: job.model.clone(),
701            prompt: prompt_preview.clone(),
702            started_at,
703        });
704
705        run_offered_job(
706            &ctx,
707            job,
708            started_at,
709            task_kind,
710            full_prompt,
711            prompt_preview,
712        )
713        .await;
714        ctx.busy.store(false, Ordering::SeqCst);
715    });
716}
717
718fn try_reserve_worker(busy: &AtomicBool) -> bool {
719    busy.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
720        .is_ok()
721}
722
723fn spawn_reject_offer(
724    sender: WsSender,
725    logs: Arc<Mutex<Vec<LogEntry>>>,
726    observers: WorkerObservers,
727    job_id: String,
728    reason: &'static str,
729    code: crate::ws::types::RejectCode,
730) {
731    tokio::spawn(async move {
732        let result = sender
733            .send(&WorkerInbound::Reject {
734                job_id: job_id.clone(),
735                reason: reason.to_string(),
736                code: Some(code),
737            })
738            .await;
739        if let Some((level, message)) = offer_response_breadcrumb("reject", &job_id, &result) {
740            push_log_with_observers(&logs, Some(&observers), level, "ws", &message, Some(job_id));
741        }
742    });
743}
744
745#[cfg_attr(coverage_nightly, coverage(off))]
746async fn run_offered_job(
747    ctx: &SessionContext,
748    job: crate::types::JobClaim,
749    started_at: chrono::DateTime<chrono::Utc>,
750    task_kind: crate::types::TaskKind,
751    full_prompt: String,
752    prompt_preview: String,
753) {
754    let start = std::time::Instant::now();
755    // Pass the studio's `ModelSource` to the engine so sd-cpp /
756    // llama-cpp know which files to load.  Required on every offer
757    // — the studio refuses to promote a job without a model source
758    // and the worker refuses any claim that lacks one.
759    let dispatch = tokio::task::spawn_blocking({
760        let model = job.model.clone();
761        let model_source = job.model_source.clone();
762        let task_for_engine = job.task.clone();
763        let engine = ctx.engine.clone();
764        move || -> Result<TaskResult> {
765            engine.dispatch_with_source(&model, task_for_engine, &model_source)
766        }
767    })
768    .await;
769
770    let job_id = job.job_id.clone();
771    // Every arm produces the outcome as a value, so the compiler
772    // proves the RecentJob ring always records a real outcome — no
773    // mutable default that survives a forgotten assignment.
774    let outcome = match dispatch {
775        Ok(Ok(result)) => {
776            push_log_with_observers(
777                &ctx.logs,
778                Some(&ctx.observers),
779                "info",
780                "ws",
781                &format!("{} dispatched in {:?}", task_kind.as_str(), start.elapsed()),
782                Some(job_id.clone()),
783            );
784            deliver_result(ctx, &job_id, result, &full_prompt).await
785        }
786        Ok(Err(e)) => {
787            warn!(target: TRACE_TARGET, error = %e, "engine dispatch failed");
788            push_log_with_observers(
789                &ctx.logs,
790                Some(&ctx.observers),
791                "error",
792                "ws",
793                &format!("dispatch failed: {e}"),
794                Some(job_id.clone()),
795            );
796            let fail_result = ctx
797                .sender
798                .send(&WorkerInbound::Fail {
799                    job_id: job_id.clone(),
800                    error: e.to_string(),
801                    retryable: !is_unsupported_kind(&e),
802                })
803                .await;
804            record_fail_send(&fail_result, &job_id, &ctx.logs, &ctx.observers);
805            JobOutcome::Failed {
806                reason: e.to_string(),
807            }
808        }
809        Err(e) => {
810            push_log_with_observers(
811                &ctx.logs,
812                Some(&ctx.observers),
813                "error",
814                "ws",
815                &format!("dispatch task panic: {e}"),
816                Some(job_id.clone()),
817            );
818            let fail_result = ctx
819                .sender
820                .send(&WorkerInbound::Fail {
821                    job_id: job_id.clone(),
822                    error: e.to_string(),
823                    retryable: true,
824                })
825                .await;
826            record_fail_send(&fail_result, &job_id, &ctx.logs, &ctx.observers);
827            JobOutcome::Failed {
828                reason: e.to_string(),
829            }
830        }
831    };
832
833    // Surface the finished job to the UI: clear the current-job slot
834    // and push a RecentJob entry into the ring.
835    *ctx.observers.current_job.lock() = None;
836    record_recent_job(
837        &ctx.observers,
838        RecentJob {
839            job_id: job_id.clone(),
840            kind: task_kind,
841            model: job.model.clone(),
842            prompt: prompt_preview,
843            outcome,
844            started_at,
845            finished_at: chrono::Utc::now(),
846        },
847    );
848}
849
850/// Deliver a successful engine result to the studio and return the
851/// outcome to record.  Binary outputs travel the multipart HTTP
852/// `/complete` route (R2 doesn't fit in WS frames); JSON outputs
853/// travel the WS `completeJson` frame.
854#[cfg_attr(coverage_nightly, coverage(off))]
855async fn deliver_result(
856    ctx: &SessionContext,
857    job_id: &str,
858    result: TaskResult,
859    full_prompt: &str,
860) -> JobOutcome {
861    match result {
862        TaskResult::Image { bytes, ext }
863        | TaskResult::AudioTts { bytes, ext }
864        | TaskResult::Video { bytes, ext } => {
865            let upload_result = tokio::task::spawn_blocking({
866                let api_base_url = ctx.api_base_url.clone();
867                let job_id = job_id.to_string();
868                let auth_token = ctx.auth_token.clone();
869                let worker_id = ctx.worker_id.clone();
870                let prompt = full_prompt.to_string();
871                move || -> Result<()> {
872                    let api = ApiClient::new(api_base_url)?;
873                    api.complete_with_retry(
874                        &worker_id,
875                        &auth_token,
876                        &job_id,
877                        &ext,
878                        &prompt,
879                        bytes,
880                        UPLOAD_RETRIES,
881                        UPLOAD_RETRY_PAUSE,
882                    )
883                }
884            })
885            .await;
886            let msg = match upload_result {
887                Ok(Ok(())) => None,
888                Ok(Err(e)) => Some(e.to_string()),
889                Err(e) => Some(format!("upload task panic: {e}")),
890            };
891            match msg {
892                Some(msg) => {
893                    push_log_with_observers(
894                        &ctx.logs,
895                        Some(&ctx.observers),
896                        "error",
897                        "ws",
898                        &msg,
899                        Some(job_id.to_string()),
900                    );
901                    let fail_result = ctx
902                        .sender
903                        .send(&WorkerInbound::Fail {
904                            job_id: job_id.to_string(),
905                            error: msg.clone(),
906                            retryable: true,
907                        })
908                        .await;
909                    record_fail_send(&fail_result, job_id, &ctx.logs, &ctx.observers);
910                    JobOutcome::Failed { reason: msg }
911                }
912                None => {
913                    push_log_with_observers(
914                        &ctx.logs,
915                        Some(&ctx.observers),
916                        "info",
917                        "ws",
918                        "binary upload ok",
919                        Some(job_id.to_string()),
920                    );
921                    // The studio's HTTP `/complete` handler defers a
922                    // `notifyJobCompleted` RPC to the
923                    // WorkerConnections DO; that's the canonical
924                    // "offer next job" nudge.  Sending an extra
925                    // `ReadyForMore` here races that flow: both can
926                    // call `offerNextFor` concurrently, double-
927                    // reserve the session's `currentJob` slot, and
928                    // ship two `Offer` frames — the second `Accept`
929                    // then trips the studio's `session not
930                    // authenticated`-shaped `accept for unknown
931                    // jobId` invariant and the DO kills the
932                    // session.  See:
933                    //   apps/studio/src/worker/modules/graphics/
934                    //     WorkerConnections/orchestrator.ts (commitOffer)
935                    JobOutcome::Completed
936                }
937            }
938        }
939        TaskResult::Llm { json } | TaskResult::AudioStt { json } => {
940            // Mirror the binary path: branch on the send result so a
941            // dropped `completeJson` frame is recorded as a failure
942            // (never a false-positive `Completed`) and a successful
943            // send leaves an explicit completion breadcrumb, symmetric
944            // with the binary path's "binary upload ok".
945            match ctx
946                .sender
947                .send(&WorkerInbound::CompleteJson {
948                    job_id: job_id.to_string(),
949                    result: json,
950                    prompt: Some(full_prompt.to_string()),
951                })
952                .await
953            {
954                Ok(()) => {
955                    push_log_with_observers(
956                        &ctx.logs,
957                        Some(&ctx.observers),
958                        "info",
959                        "ws",
960                        "json result sent",
961                        Some(job_id.to_string()),
962                    );
963                    JobOutcome::Completed
964                }
965                Err(e) => {
966                    let msg = format!("failed to send result: {e}");
967                    push_log_with_observers(
968                        &ctx.logs,
969                        Some(&ctx.observers),
970                        "error",
971                        "ws",
972                        &msg,
973                        Some(job_id.to_string()),
974                    );
975                    JobOutcome::Failed { reason: msg }
976                }
977            }
978        }
979    }
980}
981
982#[cfg_attr(coverage_nightly, coverage(off))]
983fn spawn_reader(
984    mut receiver: crate::ws::client::WsReceiver,
985    event_tx: mpsc::UnboundedSender<SessionEvent>,
986    read_idle_timeout: Duration,
987) -> tokio::task::JoinHandle<()> {
988    tokio::spawn(async move {
989        loop {
990            // Bound the wait so a half-open / dead-peer socket can't block the reader forever.
991            // A live studio acks every heartbeat (~5s), so a frame always lands well inside the
992            // window; elapsing it means the connection is gone and the session must reconnect.
993            match tokio::time::timeout(read_idle_timeout, receiver.recv()).await {
994                Ok(Ok(Some(frame))) => {
995                    if event_tx.send(SessionEvent::Frame(frame)).is_err() {
996                        break;
997                    }
998                }
999                Ok(Ok(None)) => {
1000                    let _ =
1001                        event_tx.send(SessionEvent::Disconnected(WsClientError::ConnectionClosed));
1002                    break;
1003                }
1004                Ok(Err(e)) => {
1005                    let _ = event_tx.send(SessionEvent::Disconnected(e));
1006                    break;
1007                }
1008                Err(_elapsed) => {
1009                    let _ = event_tx.send(SessionEvent::Disconnected(WsClientError::Transport(
1010                        format!(
1011                            "no frames from server for {:?}; treating connection as dead",
1012                            read_idle_timeout
1013                        ),
1014                    )));
1015                    break;
1016                }
1017            }
1018        }
1019    })
1020}
1021
1022#[cfg_attr(coverage_nightly, coverage(off))]
1023// Eight collaborators (config + engine + sender + shared flags + logs + observers + schedule);
1024// grouping them adds indirection without improving readability.
1025#[allow(clippy::too_many_arguments)]
1026fn spawn_heartbeat_pump(
1027    cfg: SharedConfig,
1028    engine: Arc<dyn Engine>,
1029    sender: WsSender,
1030    stop: Arc<AtomicBool>,
1031    paused: Arc<AtomicBool>,
1032    logs: Arc<Mutex<Vec<LogEntry>>>,
1033    observers: WorkerObservers,
1034    schedule: SessionSchedule,
1035) -> tokio::task::JoinHandle<()> {
1036    tokio::spawn(async move {
1037        let mut interval = tokio::time::interval(schedule.heartbeat);
1038        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1039        // Seed with the pause flag's value at session start so the first
1040        // tick never logs a spurious transition; only genuine operator
1041        // toggles during the session ship a breadcrumb.
1042        let mut last_paused = paused.load(Ordering::SeqCst);
1043        loop {
1044            interval.tick().await;
1045            if stop.load(Ordering::SeqCst) {
1046                break;
1047            }
1048            // A Pause / Resume from any source (Status tab, tray menu)
1049            // only emits a local `tracing` breadcrumb; ship the actual
1050            // transition so the studio's shipped-log view and the UI's
1051            // Logs tab record why the worker started / stopped claiming.
1052            let now_paused = paused.load(Ordering::SeqCst);
1053            if let Some(message) = pause_transition_breadcrumb(last_paused, now_paused) {
1054                push_log_with_observers(&logs, Some(&observers), "info", "ws", message, None);
1055            }
1056            last_paused = now_paused;
1057            // Rebuild the snapshot from the live config so operator
1058            // edits (VRAM threshold, auto-start) propagate on the
1059            // next tick instead of on the next reconnect.
1060            let caps = crate::runtime::build_capabilities_with(&cfg.lock(), &*engine, !now_paused);
1061            let current_job_id = heartbeat_current_job_id(&observers);
1062            if let Err(e) = sender
1063                .send(&WorkerInbound::Heartbeat {
1064                    capabilities: caps,
1065                    current_job_id,
1066                })
1067                .await
1068            {
1069                warn!(target: TRACE_TARGET, error = %e, "heartbeat send failed");
1070                break;
1071            }
1072        }
1073    })
1074}
1075
1076fn heartbeat_current_job_id(observers: &WorkerObservers) -> Option<String> {
1077    observers
1078        .current_job
1079        .lock()
1080        .as_ref()
1081        .map(|job| job.job_id.clone())
1082}
1083
1084#[cfg_attr(coverage_nightly, coverage(off))]
1085fn spawn_log_shipper_pump(
1086    sender: WsSender,
1087    logs: Arc<Mutex<Vec<LogEntry>>>,
1088    stop: Arc<AtomicBool>,
1089    schedule: SessionSchedule,
1090) -> tokio::task::JoinHandle<()> {
1091    tokio::spawn(async move {
1092        let mut interval = tokio::time::interval(schedule.log_flush);
1093        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1094        loop {
1095            interval.tick().await;
1096            if stop.load(Ordering::SeqCst) {
1097                break;
1098            }
1099            let batch = {
1100                let mut guard = logs.lock();
1101                if guard.is_empty() {
1102                    continue;
1103                }
1104                std::mem::take(&mut *guard)
1105            };
1106            let frame = WorkerInbound::LogBatch { entries: batch };
1107            if let Err(e) = sender.send(&frame).await {
1108                warn!(target: TRACE_TARGET, error = %e, "log batch send failed; requeueing batch");
1109                // Put the batch back so it ships on the next session
1110                // instead of vanishing with this one.
1111                if let WorkerInbound::LogBatch { entries } = frame {
1112                    crate::runtime::restore_unshipped(&logs, entries);
1113                }
1114                break;
1115            }
1116        }
1117    })
1118}
1119
1120#[cfg_attr(coverage_nightly, coverage(off))]
1121fn spawn_shutdown_observer(
1122    stop: Arc<AtomicBool>,
1123    event_tx: mpsc::UnboundedSender<SessionEvent>,
1124    schedule: SessionSchedule,
1125) -> tokio::task::JoinHandle<()> {
1126    tokio::spawn(async move {
1127        loop {
1128            tokio::time::sleep(schedule.shutdown_tick).await;
1129            if stop.load(Ordering::SeqCst) {
1130                let _ = event_tx.send(SessionEvent::Stopped);
1131                break;
1132            }
1133            if event_tx.is_closed() {
1134                break;
1135            }
1136        }
1137    })
1138}
1139
1140fn backoff_for(attempt: u32, schedule: SessionSchedule) -> Duration {
1141    let factor = 2u64.saturating_pow(attempt.saturating_sub(1));
1142    let raw_ms = schedule.base_backoff_ms.saturating_mul(factor);
1143    Duration::from_millis(raw_ms.min(schedule.max_backoff_ms))
1144}
1145
1146/// Build the operator breadcrumb for a session that dropped or never
1147/// established, surfacing the underlying error so a reconnect loop is
1148/// never opaque about *why* it's retrying.
1149///
1150/// A plain mid-session disconnect (`Ok(SessionOutcome::Disconnected)`)
1151/// carries no error and keeps the legacy "disconnected; reconnect
1152/// attempt …" wording that the studio-shipped log view and the
1153/// `ws_session_full_loop` contract test key on.  An `Err` outcome — an
1154/// engine that failed to build, or a `hello` frame that never made it
1155/// onto the wire — previously vanished into that same wording with no
1156/// cause, leaving an operator staring at an endless reconnect loop with
1157/// nothing to diagnose.  The full anyhow chain (`{:#}`) rides along so a
1158/// wrapped root cause isn't truncated to its outer context.  Pure so
1159/// the wording is unit-tested without a live WS round-trip.
1160fn reconnect_breadcrumb(error: Option<&anyhow::Error>, attempt: u32, backoff: Duration) -> String {
1161    let in_ms = backoff.as_millis();
1162    match error {
1163        Some(e) => format!("session error: {e:#}; reconnect attempt {attempt} in {in_ms}ms"),
1164        None => format!("disconnected; reconnect attempt {attempt} in {in_ms}ms"),
1165    }
1166}
1167
1168/// Operator-facing breadcrumb for the studio's `Welcome` frame.
1169///
1170/// The studio stamps `server_time` (its clock at the moment it
1171/// authenticated this worker) onto every `Welcome`, but it used to be
1172/// deserialised and dropped — the line named only the worker id. With
1173/// it surfaced, an operator can spot clock skew between the worker host
1174/// and the studio straight from the UI's Logs tab and the
1175/// studio-shipped log view: skew distorts heartbeat-timeout reasoning,
1176/// auth-token expiry windows, and log-timestamp correlation across the
1177/// two sides. Pure so the wording is unit-tested without a live
1178/// welcome.
1179fn welcome_breadcrumb(worker_id: &str, server_time: &str) -> String {
1180    format!("server welcomed {worker_id} server_time={server_time}")
1181}
1182
1183/// Operator-facing breadcrumb summarising an incoming job offer.
1184///
1185/// The studio populates `game_id` + `asset_name` on every offer, but
1186/// they used to be deserialised and dropped — the line only named the
1187/// model + vram estimate, so a worker fielding offers across many games
1188/// gave no clue which game / asset each job served. Surfacing both
1189/// (data already on the wire) lets operators triage "which game's jobs
1190/// are failing on this box" straight from the UI's Logs tab and the
1191/// studio-shipped log view. Pure so the wording is unit-tested without
1192/// a live offer.
1193fn offer_received_breadcrumb(
1194    job_id: &str,
1195    game_id: &str,
1196    asset_name: &str,
1197    model: &str,
1198    vram_gb_estimate: f32,
1199) -> String {
1200    format!(
1201        "offer received {job_id} game={game_id} asset={asset_name} model={model} vram={vram_gb_estimate}"
1202    )
1203}
1204
1205/// Operator-facing breadcrumb for the studio's `CompleteAck` /
1206/// `FailAck` frames.
1207///
1208/// The studio sends one of these the moment it has persisted a job's
1209/// result (the binary landed in R2, or the `completeJson` / `Fail`
1210/// frame updated the row). Both used to be silently dropped on the
1211/// "acks are best-effort; ignore" arm, so the worker's own
1212/// "binary upload ok" / completeJson breadcrumb was the last word on a
1213/// job: an operator triaging a job that ran twice (worker reported
1214/// done, studio never persisted, the job timed out + requeued) had no
1215/// signal telling them whether the studio ever acknowledged the
1216/// result. Surfacing the ack closes the job lifecycle in the UI's Logs
1217/// tab and the studio-shipped log view. `HeartbeatAck` stays unlogged:
1218/// it fires every ~5s and a genuinely missed ack already surfaces via
1219/// the read-idle timeout + reconnect breadcrumb. Pure so the wording is
1220/// unit-tested without a live ack.
1221fn result_ack_breadcrumb(outcome: &str, job_id: &str) -> String {
1222    format!("studio confirmed {outcome} of job {job_id}")
1223}
1224
1225/// Decide whether a just-attempted offer-response send (accept /
1226/// reject) warrants a session-level breadcrumb.
1227///
1228/// Returns `None` on success: the happy path is already implied by the
1229/// surrounding "dispatched" / "rejecting offer: paused" breadcrumbs, so
1230/// re-logging it would only add per-job noise.  Returns
1231/// `Some(("error", …))` when the send failed — a dropped accept leaves
1232/// the worker running a job the studio never marked accepted, and a
1233/// dropped reject leaves the offer reserved on a paused worker until it
1234/// times out.  The transport layer already logs the failure locally on
1235/// `studio_worker::ws::client`, but only a session-level breadcrumb
1236/// reaches the UI's Logs tab and the studio-shipped log view with the
1237/// offending `job_id` attached.  Pure so the wording + level are
1238/// unit-tested without a live WS sink.
1239fn offer_response_breadcrumb(
1240    label: &str,
1241    job_id: &str,
1242    result: &WsResult<()>,
1243) -> Option<(&'static str, String)> {
1244    match result {
1245        Ok(()) => None,
1246        Err(e) => Some((
1247            "error",
1248            format!("{label} send failed for offer {job_id}: {e}"),
1249        )),
1250    }
1251}
1252
1253/// Decide whether a just-attempted `Fail`-frame send warrants a
1254/// session-level breadcrumb.
1255///
1256/// Returns `None` on success: the caller already logged the underlying
1257/// job failure (the upload error, dispatch error, or panic), so a `Fail`
1258/// frame that lands needs no second per-job line.  Returns
1259/// `Some(("error", …))` when the send itself failed — a dropped `Fail`
1260/// leaves the studio believing the job is still in flight (reserved on
1261/// the session's `currentJob` slot) until it times out, with no local
1262/// record that the notification never landed.  The transport layer logs
1263/// the drop locally on `studio_worker::ws::client`, but only a
1264/// session-level breadcrumb reaches the UI's Logs tab and the
1265/// studio-shipped log view with the offending `job_id` attached.  Pure
1266/// so the wording + level are unit-tested without a live WS sink.
1267fn fail_send_breadcrumb(job_id: &str, result: &WsResult<()>) -> Option<(&'static str, String)> {
1268    match result {
1269        Ok(()) => None,
1270        Err(e) => Some((
1271            "error",
1272            format!("failed to notify studio of job {job_id} failure: {e}"),
1273        )),
1274    }
1275}
1276
1277/// Push a session-level breadcrumb when a `Fail`-frame send dropped.
1278///
1279/// Trivial glue over [`fail_send_breadcrumb`]: the three job-failure
1280/// arms (upload error, dispatch error, dispatch panic) all notify the
1281/// studio with a `Fail` frame and then call this, so a dropped
1282/// notification is recorded with the `job_id` attached instead of being
1283/// swallowed by `let _ = sender.send(...)`.
1284fn record_fail_send(
1285    result: &WsResult<()>,
1286    job_id: &str,
1287    logs: &Arc<Mutex<Vec<LogEntry>>>,
1288    observers: &WorkerObservers,
1289) {
1290    if let Some((level, message)) = fail_send_breadcrumb(job_id, result) {
1291        push_log_with_observers(
1292            logs,
1293            Some(observers),
1294            level,
1295            "ws",
1296            &message,
1297            Some(job_id.to_string()),
1298        );
1299    }
1300}
1301
1302/// Operator-facing breadcrumb for a change in the runtime pause flag,
1303/// or `None` when the flag is unchanged since the previous heartbeat
1304/// tick.
1305///
1306/// A Pause / Resume from the Status tab or tray menu only emits a local
1307/// `tracing` breadcrumb (stdout / Sentry) naming the source; it never
1308/// enters the worker's shipped log stream.  So the studio's shipped-log
1309/// view and the UI's Logs tab used to show `auto_enabled=false`
1310/// heartbeats with no record of *why* the worker stopped claiming.  The
1311/// heartbeat pump calls this each tick and ships the transition through
1312/// `push_log_with_observers`, so a toggle from *any* source reaches the
1313/// operator-facing surfaces.  Pure so the wording is unit-tested without
1314/// driving the pump.
1315fn pause_transition_breadcrumb(prev: bool, now: bool) -> Option<&'static str> {
1316    match (prev, now) {
1317        (false, true) => Some("claiming paused by operator; new offers are rejected until resumed"),
1318        (true, false) => Some("claiming resumed by operator; accepting new offers again"),
1319        _ => None,
1320    }
1321}
1322
1323#[cfg(test)]
1324mod tests {
1325    use super::*;
1326
1327    #[test]
1328    fn offer_response_breadcrumb_is_silent_on_success() {
1329        // The happy path is already implied by the surrounding
1330        // "dispatched" / "rejecting offer: paused" breadcrumbs, so a
1331        // successful accept / reject send must not add per-job noise.
1332        assert!(offer_response_breadcrumb("accept", "j-1", &Ok(())).is_none());
1333        assert!(offer_response_breadcrumb("reject", "j-2", &Ok(())).is_none());
1334    }
1335
1336    #[test]
1337    fn try_reserve_worker_only_allows_one_in_flight_job() {
1338        let busy = AtomicBool::new(false);
1339        assert!(try_reserve_worker(&busy));
1340        assert!(!try_reserve_worker(&busy));
1341    }
1342
1343    #[test]
1344    fn heartbeat_current_job_id_uses_actual_job_id() {
1345        let observers = WorkerObservers::default();
1346        assert_eq!(heartbeat_current_job_id(&observers), None);
1347        *observers.current_job.lock() = Some(CurrentJob {
1348            job_id: "job-42".into(),
1349            kind: crate::types::TaskKind::Image,
1350            model: "synthetic".into(),
1351            prompt: "prompt".into(),
1352            started_at: chrono::Utc::now(),
1353        });
1354        assert_eq!(
1355            heartbeat_current_job_id(&observers).as_deref(),
1356            Some("job-42")
1357        );
1358    }
1359
1360    #[test]
1361    fn offer_response_breadcrumb_reports_accept_send_failure() {
1362        let (level, msg) =
1363            offer_response_breadcrumb("accept", "j-1", &Err(WsClientError::ConnectionClosed))
1364                .expect("a failed accept send must surface a breadcrumb");
1365        assert_eq!(level, "error");
1366        assert!(msg.contains("accept send failed"), "got: {msg}");
1367        assert!(msg.contains("j-1"), "must name the job: {msg}");
1368        assert!(
1369            msg.contains("connection closed"),
1370            "must carry the cause: {msg}"
1371        );
1372    }
1373
1374    #[test]
1375    fn offer_response_breadcrumb_reports_reject_send_failure() {
1376        let (level, msg) = offer_response_breadcrumb(
1377            "reject",
1378            "j-9",
1379            &Err(WsClientError::Transport("sink gone".into())),
1380        )
1381        .expect("a failed reject send must surface a breadcrumb");
1382        assert_eq!(level, "error");
1383        assert!(msg.contains("reject send failed"), "got: {msg}");
1384        assert!(msg.contains("j-9"), "must name the job: {msg}");
1385        assert!(msg.contains("sink gone"), "must carry the cause: {msg}");
1386    }
1387
1388    #[test]
1389    fn fail_send_breadcrumb_is_silent_on_success() {
1390        // The underlying job failure (upload / dispatch / panic) is
1391        // already logged by the caller, so a Fail-frame that lands must
1392        // not add a second per-job line.
1393        assert!(fail_send_breadcrumb("j-1", &Ok(())).is_none());
1394    }
1395
1396    #[test]
1397    fn fail_send_breadcrumb_reports_send_failure() {
1398        let (level, msg) = fail_send_breadcrumb("j-7", &Err(WsClientError::ConnectionClosed))
1399            .expect("a dropped Fail send must surface a breadcrumb");
1400        assert_eq!(level, "error");
1401        assert!(msg.contains("j-7"), "must name the job: {msg}");
1402        assert!(
1403            msg.contains("connection closed"),
1404            "must carry the cause: {msg}"
1405        );
1406    }
1407
1408    #[test]
1409    fn fail_send_breadcrumb_carries_transport_cause() {
1410        let (level, msg) =
1411            fail_send_breadcrumb("j-3", &Err(WsClientError::Transport("sink gone".into())))
1412                .expect("a dropped Fail send must surface a breadcrumb");
1413        assert_eq!(level, "error");
1414        assert!(msg.contains("j-3"), "must name the job: {msg}");
1415        assert!(msg.contains("sink gone"), "must carry the cause: {msg}");
1416    }
1417
1418    #[test]
1419    fn backoff_grows_exponentially_until_cap() {
1420        let schedule = SessionSchedule {
1421            base_backoff_ms: 100,
1422            max_backoff_ms: 1_000,
1423            heartbeat: Duration::from_secs(1),
1424            log_flush: Duration::from_secs(1),
1425            shutdown_tick: Duration::from_secs(1),
1426            read_idle_timeout: Duration::from_secs(1),
1427        };
1428        assert_eq!(backoff_for(1, schedule), Duration::from_millis(100));
1429        assert_eq!(backoff_for(2, schedule), Duration::from_millis(200));
1430        assert_eq!(backoff_for(3, schedule), Duration::from_millis(400));
1431        assert_eq!(backoff_for(4, schedule), Duration::from_millis(800));
1432        // Capped.
1433        assert_eq!(backoff_for(5, schedule), Duration::from_millis(1_000));
1434        assert_eq!(backoff_for(10, schedule), Duration::from_millis(1_000));
1435    }
1436
1437    #[test]
1438    fn reconnect_breadcrumb_keeps_legacy_wording_for_a_plain_disconnect() {
1439        // A mid-session drop carries no error; the exact wording the
1440        // studio-shipped log view and the `ws_session_full_loop`
1441        // contract test key on must be preserved.
1442        let msg = reconnect_breadcrumb(None, 3, Duration::from_millis(800));
1443        assert_eq!(msg, "disconnected; reconnect attempt 3 in 800ms");
1444    }
1445
1446    #[test]
1447    fn reconnect_breadcrumb_surfaces_the_underlying_error() {
1448        // An `Err` outcome (engine build failure, `hello` send failure)
1449        // used to vanish into the plain "disconnected" line, leaving an
1450        // operator with an opaque endless reconnect loop. The cause must
1451        // now ride along while still naming the attempt + backoff.
1452        let err = anyhow!("hello send failed: connection closed");
1453        let msg = reconnect_breadcrumb(Some(&err), 2, Duration::from_millis(400));
1454        assert!(
1455            msg.contains("reconnect attempt 2 in 400ms"),
1456            "must still name attempt + backoff: {msg}"
1457        );
1458        assert!(
1459            msg.contains("hello send failed: connection closed"),
1460            "must carry the cause: {msg}"
1461        );
1462    }
1463
1464    #[test]
1465    fn reconnect_breadcrumb_includes_the_full_error_chain() {
1466        // anyhow context chains must reach the operator so a wrapped
1467        // root cause isn't truncated to just the outer context.
1468        let err = anyhow!("driver missing").context("engine build failed");
1469        let msg = reconnect_breadcrumb(Some(&err), 1, Duration::from_millis(100));
1470        assert!(msg.contains("engine build failed"), "got: {msg}");
1471        assert!(
1472            msg.contains("driver missing"),
1473            "must include the root cause: {msg}"
1474        );
1475    }
1476
1477    #[test]
1478    fn has_credentials_false_when_either_missing() {
1479        let mut cfg = crate::config::Config::default();
1480        let shared = crate::config::shared(cfg.clone());
1481        assert!(!has_credentials(&shared), "both missing");
1482        cfg.worker_id = Some("w-1".into());
1483        let shared = crate::config::shared(cfg.clone());
1484        assert!(!has_credentials(&shared), "only worker_id");
1485        cfg.worker_id = None;
1486        cfg.auth_token = Some("tok".into());
1487        let shared = crate::config::shared(cfg.clone());
1488        assert!(!has_credentials(&shared), "only auth_token");
1489    }
1490
1491    #[test]
1492    fn has_credentials_true_when_both_present() {
1493        let cfg = crate::config::Config {
1494            worker_id: Some("w-1".into()),
1495            auth_token: Some("tok".into()),
1496            ..crate::config::Config::default()
1497        };
1498        let shared = crate::config::shared(cfg);
1499        assert!(has_credentials(&shared));
1500    }
1501
1502    #[test]
1503    fn has_credentials_false_when_empty_strings() {
1504        let cfg = crate::config::Config {
1505            worker_id: Some("".into()),
1506            auth_token: Some("".into()),
1507            ..crate::config::Config::default()
1508        };
1509        let shared = crate::config::shared(cfg);
1510        assert!(!has_credentials(&shared));
1511    }
1512
1513    #[test]
1514    fn pause_transition_breadcrumb_is_silent_when_unchanged() {
1515        // No flag change since the previous tick — the pump must not add
1516        // a per-tick log line on every 5s heartbeat.
1517        assert!(pause_transition_breadcrumb(false, false).is_none());
1518        assert!(pause_transition_breadcrumb(true, true).is_none());
1519    }
1520
1521    #[test]
1522    fn pause_transition_breadcrumb_reports_pause_and_resume() {
1523        // A genuine operator toggle must ship an info-level breadcrumb
1524        // naming the new claiming state so the studio's shipped-log view
1525        // and the UI's Logs tab record why the worker stopped / resumed.
1526        let paused = pause_transition_breadcrumb(false, true).expect("a pause must be reported");
1527        assert!(
1528            paused.contains("paused by operator"),
1529            "expected a pause message, got: {paused}"
1530        );
1531        let resumed = pause_transition_breadcrumb(true, false).expect("a resume must be reported");
1532        assert!(
1533            resumed.contains("resumed by operator"),
1534            "expected a resume message, got: {resumed}"
1535        );
1536    }
1537
1538    #[test]
1539    fn welcome_breadcrumb_surfaces_server_time() {
1540        // The studio stamps `server_time` (its clock at the moment it
1541        // authenticated this worker) onto every `Welcome`; it used to be
1542        // deserialised and dropped, so an operator couldn't spot clock
1543        // skew between the worker host and the studio. The breadcrumb
1544        // must keep the legacy "server welcomed <id>" wording and add the
1545        // server time alongside it.
1546        let line = welcome_breadcrumb("worker-7", "2026-06-15T21:00:00Z");
1547        assert!(
1548            line.contains("server welcomed worker-7"),
1549            "expected the legacy wording + worker id, got: {line}"
1550        );
1551        assert!(
1552            line.contains("server_time=2026-06-15T21:00:00Z"),
1553            "expected the server time, got: {line}"
1554        );
1555    }
1556
1557    #[test]
1558    fn offer_received_breadcrumb_names_game_and_asset() {
1559        // The studio sends `game_id` + `asset_name` on every offer; both
1560        // used to be deserialised and dropped, so an operator fielding
1561        // offers across many games couldn't tell which game / asset each
1562        // job served. The breadcrumb must surface both alongside the
1563        // model + vram estimate it already reported.
1564        let line = offer_received_breadcrumb(
1565            "j-1",
1566            "game-of-elements",
1567            "game-of-elements/creatures/aurora-fox",
1568            "sd-cpp:flux",
1569            12.5,
1570        );
1571        assert!(
1572            line.contains("offer received j-1"),
1573            "expected the job id, got: {line}"
1574        );
1575        assert!(
1576            line.contains("game=game-of-elements"),
1577            "expected the game id, got: {line}"
1578        );
1579        assert!(
1580            line.contains("asset=game-of-elements/creatures/aurora-fox"),
1581            "expected the asset name, got: {line}"
1582        );
1583        assert!(
1584            line.contains("model=sd-cpp:flux"),
1585            "expected the model, got: {line}"
1586        );
1587        assert!(line.contains("vram=12.5"), "expected the vram, got: {line}");
1588    }
1589
1590    #[test]
1591    fn result_ack_breadcrumb_names_the_outcome_and_job() {
1592        // The studio sends `CompleteAck` / `FailAck` the moment it has
1593        // persisted a job's result; both used to be silently dropped on
1594        // the "acks are best-effort; ignore" arm, so the worker's own
1595        // "binary upload ok" / completeJson line was the last word on a
1596        // job. An operator triaging a job that ran twice (worker
1597        // reported done, studio never persisted, job requeued) had no
1598        // signal telling them whether the studio acknowledged the
1599        // result. The breadcrumb must name both the outcome and the
1600        // offending job id.
1601        assert_eq!(
1602            result_ack_breadcrumb("completion", "j-1"),
1603            "studio confirmed completion of job j-1"
1604        );
1605        assert_eq!(
1606            result_ack_breadcrumb("failure", "j-2"),
1607            "studio confirmed failure of job j-2"
1608        );
1609    }
1610}