Skip to main content

studio_worker/
runtime.rs

1//! Long-running auto-update task + one-shot CLI helpers.
2//!
3//! After the WS migration the runtime owns just two background
4//! tasks: the WebSocket session (`ws::session::spawn_ws_session`,
5//! which subsumes heartbeats, claim/accept/complete, fail, and log
6//! shipping) and the auto-updater (`spawn_auto_updater`).  Per-tick
7//! helpers from the old polling loops are gone.
8use crate::{
9    config::{self, Config, SharedConfig},
10    engine::Engine,
11    sys,
12    types::*,
13    update, AGENT_VERSION,
14};
15use anyhow::{anyhow, Result};
16use chrono::{DateTime, SecondsFormat, Utc};
17use parking_lot::Mutex;
18use std::{
19    collections::VecDeque,
20    sync::{
21        atomic::{AtomicBool, Ordering},
22        Arc,
23    },
24    time::Duration,
25};
26use tracing::{info, warn};
27
/// Tracing target for runtime-level events (startup, state mutations).
/// Stable so operators can filter with `RUST_LOG=studio_worker::runtime=debug`.
const TRACE_TARGET: &str = "studio_worker::runtime";

/// Maximum number of finished jobs kept in `WorkerObservers::recent_jobs`.
/// Older entries fall off the back of the ring.  Enforced by
/// `record_recent_job`, which pushes at the front and trims the back.
pub const RECENT_JOBS_CAP: usize = 50;

/// Maximum number of log entries kept in `WorkerObservers::recent_logs`
/// for the UI's Logs tab.  The shipping queue (`logs: Arc<Mutex<Vec<…>>>`)
/// is drained on every WS tick — the display ring is what the UI reads.
pub const RECENT_LOGS_CAP: usize = 1000;

/// Prompt previews stored in `CurrentJob` / `RecentJob` are clipped to
/// this many chars so the in-memory state stays bounded even when LLM
/// prompts are huge.  `truncate_prompt` appends a single `…` after the
/// kept chars when it clips, so a clipped preview is one char longer
/// than this value.
pub const PROMPT_PREVIEW_CHARS: usize = 200;

/// Maximum number of entries the WS ship queue (`logs:
/// Arc<Mutex<Vec<LogEntry>>>`) may hold.  The shipper pump only drains
/// while a session is connected, so a long approval wait or reconnect
/// backoff would otherwise grow the queue without bound.  On overflow
/// the oldest entries are dropped and a warn-level marker records the
/// loss.
pub const LOG_SHIP_QUEUE_CAP: usize = 5_000;
53
/// Job in flight right now.  Populated by the WS session before
/// dispatch, cleared once the job finishes (success or failure).
#[derive(Debug, Clone)]
pub struct CurrentJob {
    /// Identifier of the job being run.
    pub job_id: String,
    /// Task kind of the job.
    pub kind: TaskKind,
    /// Model the job runs against (presumably the model name from the
    /// job offer — confirm against the WS session that fills this in).
    pub model: String,
    /// Prompt preview; expected to be clipped to `PROMPT_PREVIEW_CHARS`
    /// (see `truncate_prompt`) so in-memory state stays bounded.
    pub prompt: String,
    /// When the job started (UTC).
    pub started_at: DateTime<Utc>,
}
64
/// Outcome a finished job ended with.  Failures carry the human
/// reason (already surfaced to logs + Sentry).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobOutcome {
    /// The job finished successfully.
    Completed,
    /// The job failed; `reason` is the human-readable explanation.
    Failed { reason: String },
}
72
/// One finished job, retained in the recent-jobs ring for the UI.
///
/// Same descriptive fields as [`CurrentJob`] plus how and when the job
/// ended.
#[derive(Debug, Clone)]
pub struct RecentJob {
    /// Identifier of the finished job.
    pub job_id: String,
    /// Task kind of the job.
    pub kind: TaskKind,
    /// Model the job ran against.
    pub model: String,
    /// Prompt preview; expected to be clipped to `PROMPT_PREVIEW_CHARS`.
    pub prompt: String,
    /// Whether the job completed or failed (with reason).
    pub outcome: JobOutcome,
    /// When the job started (UTC).
    pub started_at: DateTime<Utc>,
    /// When the job finished (UTC).
    pub finished_at: DateTime<Utc>,
}
84
/// Result of the most recent heartbeat the WS session sent.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HeartbeatOutcome {
    /// The heartbeat succeeded.
    Ok,
    /// The heartbeat failed; `reason` describes why.
    Err { reason: String },
}
91
/// Timestamped result of a heartbeat attempt.  Stored in
/// `WorkerObservers::last_heartbeat` so readers can show when the last
/// attempt happened and how it went.
#[derive(Debug, Clone)]
pub struct HeartbeatStatus {
    /// When the heartbeat was attempted (UTC).
    pub last_attempt_at: DateTime<Utc>,
    /// Whether that attempt succeeded or failed.
    pub outcome: HeartbeatOutcome,
}
97
/// Bundle of in-process observation slots the WS session writes to and
/// the optional native UI reads from.  `Default` gives empty slots so
/// existing (headless) call sites stay one-liners.  Cheap to clone —
/// every field is an `Arc`.
#[derive(Clone, Default)]
pub struct WorkerObservers {
    /// The job currently in flight, or `None` when idle.
    pub current_job: Arc<Mutex<Option<CurrentJob>>>,
    /// Finished jobs, newest first; `record_recent_job` caps the ring
    /// at `RECENT_JOBS_CAP` entries.
    pub recent_jobs: Arc<Mutex<VecDeque<RecentJob>>>,
    /// Most recent heartbeat attempt, or `None` before the first one.
    pub last_heartbeat: Arc<Mutex<Option<HeartbeatStatus>>>,
    /// Bounded ring of every log entry the worker has emitted, kept
    /// for the UI's Logs tab.  Separate from the WS ship queue
    /// (which is drained every second) so the display doesn't blank
    /// out between ticks.
    pub recent_logs: Arc<Mutex<VecDeque<LogEntry>>>,
}
113
114pub fn truncate_prompt(s: &str) -> String {
115    if s.chars().count() <= PROMPT_PREVIEW_CHARS {
116        return s.to_string();
117    }
118    let mut out: String = s.chars().take(PROMPT_PREVIEW_CHARS).collect();
119    out.push('…');
120    out
121}
122
123pub fn record_recent_job(observers: &WorkerObservers, entry: RecentJob) {
124    let mut ring = observers.recent_jobs.lock();
125    ring.push_front(entry);
126    while ring.len() > RECENT_JOBS_CAP {
127        ring.pop_back();
128    }
129}
130
131/// Test-only helper to populate the recent-jobs ring without driving a
132/// full claim cycle.  Lives in the library surface so integration
133/// tests can pin the ring-capacity contract cheaply.
134#[doc(hidden)]
135pub fn push_recent_job_for_tests(observers: &WorkerObservers, job_id: &str) {
136    let now = Utc::now();
137    record_recent_job(
138        observers,
139        RecentJob {
140            job_id: job_id.to_string(),
141            kind: TaskKind::Image,
142            model: "synthetic".into(),
143            prompt: String::new(),
144            outcome: JobOutcome::Completed,
145            started_at: now,
146            finished_at: now,
147        },
148    );
149}
150
/// Default length of one auto-updater idle wait
/// (`LoopSchedule::auto_update_tick`).  The updater accumulates these
/// ticks and only runs a check once the configured
/// `auto_update_interval_secs` has been reached.
pub const AUTO_UPDATE_TICK: Duration = Duration::from_secs(60);
/// Cadence at which the auto-updater's idle wait re-checks the `stop`
/// flag.  Mirrors the WS session's shutdown tick so a SIGTERM / SIGINT
/// landing during the (up to `AUTO_UPDATE_TICK`-long) idle window wakes
/// the loop within ~250 ms instead of leaving `run_loops`' join blocked
/// for a whole tick.
pub const AUTO_UPDATE_SHUTDOWN_TICK: Duration = Duration::from_millis(250);
/// Default WS heartbeat interval, re-exported here so the native UI
/// (and any other downstream readers) get a stable constant without
/// reaching into `ws::session`.
///
/// NOTE(review): this is an independent literal, not a `pub use` of a
/// `ws::session` item — confirm it stays in sync with the session's
/// actual default.
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
162
/// Schedule for the long-running loops.
///
/// `Copy`, so it can be handed to both the WS session and the
/// auto-updater without cloning.
#[derive(Debug, Clone, Copy)]
pub struct LoopSchedule {
    /// Timing passed through to `ws::session::spawn_ws_session`.
    pub ws_session: crate::ws::session::SessionSchedule,
    /// Length of one auto-updater idle wait.  Elapsed ticks are summed
    /// and compared against the config's `auto_update_interval_secs`
    /// to decide when a check actually runs.
    pub auto_update_tick: Duration,
    /// How often the idle wait between update checks re-polls the
    /// `stop` flag, so a shutdown request isn't deferred for a whole
    /// `auto_update_tick`.
    pub shutdown_tick: Duration,
}
173
174impl Default for LoopSchedule {
175    fn default() -> Self {
176        Self {
177            ws_session: crate::ws::session::SessionSchedule::default(),
178            auto_update_tick: AUTO_UPDATE_TICK,
179            shutdown_tick: AUTO_UPDATE_SHUTDOWN_TICK,
180        }
181    }
182}
183
184impl LoopSchedule {
185    /// Schedule with 1 ms intervals — used by tests to exercise the
186    /// loop wrappers without blocking.
187    pub fn fast_for_tests() -> Self {
188        Self {
189            ws_session: crate::ws::session::SessionSchedule::fast_for_tests(),
190            auto_update_tick: Duration::from_millis(1),
191            shutdown_tick: Duration::from_millis(1),
192        }
193    }
194}
195
196// ---------------------------------------------------------------------------
197// One-shot helpers used by the CLI subcommands
198// ---------------------------------------------------------------------------
199
/// Bundle of flags from `studio-worker register`.
#[derive(Debug, Clone, Default)]
pub struct RegisterArgs {
    /// When `Some`, replaces the persisted `api_base_url`.
    pub api_base_url: Option<String>,
    /// When `true`, clears the locally stored registration state
    /// (`worker_id`, `auth_token`, `registration_request_id`,
    /// `registration_secret`, `install_id`) before saving.
    pub reset: bool,
}
206
207/// Persist registration metadata for the next launch.  No HTTP — the
208/// auto-register orchestration inside `run` / `ui` is the only thing
209/// that talks to the studio.
210pub async fn register(config_path: Option<&str>, args: RegisterArgs) -> Result<()> {
211    let (mut cfg, path) = config::load(config_path)?;
212
213    if args.reset {
214        cfg.worker_id = None;
215        cfg.auth_token = None;
216        cfg.registration_request_id = None;
217        cfg.registration_secret = None;
218        cfg.install_id = None;
219    }
220    if let Some(url) = args.api_base_url {
221        cfg.api_base_url = url;
222    }
223
224    config::save(&cfg, &path)?;
225    if args.reset {
226        info!(
227            config_path = %path.display(),
228            "local registration state cleared; next launch will auto-register"
229        );
230        println!(
231            "local registration state cleared; run `studio-worker run` or \
232             `studio-worker ui` to auto-register"
233        );
234    } else {
235        info!(
236            config_path = %path.display(),
237            "register flags persisted; next launch will auto-register"
238        );
239        println!(
240            "saved; run `studio-worker run` or `studio-worker ui` to auto-register against {}",
241            cfg.api_base_url
242        );
243    }
244    Ok(())
245}
246
247pub async fn status(config_path: Option<&str>) -> Result<()> {
248    let (cfg, path) = config::load(config_path)?;
249    println!("{}", format_status(&cfg, &path));
250    Ok(())
251}
252
253pub fn format_status(cfg: &Config, path: &std::path::Path) -> String {
254    let mut out = String::new();
255    use std::fmt::Write as _;
256    let _ = writeln!(out, "config path:        {}", path.display());
257    let _ = writeln!(out, "api_base_url:       {}", cfg.api_base_url);
258    let registration_line = if cfg.worker_id.is_some() && cfg.auth_token.is_some() {
259        format!("approved as {}", cfg.worker_id.as_deref().unwrap_or(""))
260    } else if let Some(rid) = cfg.registration_request_id.as_deref() {
261        format!("pending operator approval (request {rid})")
262    } else {
263        "not registered (will auto-register on next launch)".into()
264    };
265    let _ = writeln!(out, "registration:       {registration_line}");
266    let _ = writeln!(out, "vram_threshold_gb:  {}", cfg.vram_threshold_gb);
267    let _ = writeln!(out, "auto_start:         {}", cfg.auto_start);
268    let _ = writeln!(out, "models_root:        {}", cfg.models_root.display());
269    let _ = writeln!(out, "auto_update:        {}", cfg.auto_update_enabled);
270    let _ = writeln!(
271        out,
272        "update_interval:    {}s",
273        cfg.auto_update_interval_secs
274    );
275    out
276}
277
278pub fn set_threshold(config_path: Option<&str>, gb: f32) -> Result<()> {
279    if gb < 0.0 {
280        return Err(anyhow!("threshold must be >= 0"));
281    }
282    let (mut cfg, path) = config::load(config_path)?;
283    cfg.vram_threshold_gb = gb;
284    config::save(&cfg, &path)?;
285    info!(
286        target: TRACE_TARGET,
287        op = "set_threshold",
288        vram_threshold_gb = gb,
289        config_path = path.display().to_string(),
290        "VRAM threshold persisted"
291    );
292    println!("vram_threshold_gb = {gb}");
293    Ok(())
294}
295
/// Emit a one-shot startup banner so operators can confirm which
/// config the worker actually loaded.  Without this the only thing in
/// `journalctl -u studio-worker` on a healthy boot is whatever the
/// loops happen to log on their first tick.
///
/// Logs the agent version, config path, API base URL, VRAM threshold,
/// auto-start / auto-update settings, models root, and the worker id
/// (or `(unregistered)`).  `auth_token` and `registration_secret` are
/// not among the logged fields.
pub fn log_startup_banner(cfg: &Config, path: &std::path::Path) {
    info!(
        target: TRACE_TARGET,
        op = "startup",
        version = AGENT_VERSION,
        config_path = path.display().to_string(),
        api_base_url = cfg.api_base_url.as_str(),
        vram_threshold_gb = cfg.vram_threshold_gb,
        auto_start = cfg.auto_start,
        auto_update_enabled = cfg.auto_update_enabled,
        auto_update_interval_secs = cfg.auto_update_interval_secs,
        models_root = cfg.models_root.display().to_string(),
        // Placeholder keeps the field present before registration.
        worker_id = cfg.worker_id.as_deref().unwrap_or("(unregistered)"),
        "studio-worker booting"
    );
}
316
317pub fn show_config(config_path: Option<&str>) -> Result<()> {
318    let (cfg, path) = config::load(config_path)?;
319    println!("# {}", path.display());
320    print!("{}", toml::to_string_pretty(&cfg)?);
321    Ok(())
322}
323
324pub async fn check_update(config_path: Option<&str>) -> Result<()> {
325    let (cfg, _) = config::load(config_path)?;
326    let current = semver::Version::parse(AGENT_VERSION)
327        .map_err(|e| anyhow!("invalid current version {AGENT_VERSION}: {e}"))?;
328    let outcome = tokio::task::spawn_blocking(move || {
329        update::check(&cfg.auto_update_feed, &current, cfg.auto_update_prerelease)
330    })
331    .await??;
332    println!("{}", format_check_outcome(&outcome));
333    Ok(())
334}
335
336pub fn format_check_outcome(outcome: &update::CheckOutcome) -> String {
337    match outcome {
338        update::CheckOutcome::UpToDate { current } => format!("up to date: {current}"),
339        update::CheckOutcome::NewerAvailable { current, latest } => {
340            format!("update available: {current} -> {latest}")
341        }
342    }
343}
344
345// ---------------------------------------------------------------------------
346// Long-running run loop
347// ---------------------------------------------------------------------------
348
/// Entry point for `studio-worker run`: load the config, log the
/// startup banner, wait for registration, then run the long-lived
/// loops until shutdown.
///
/// Returns `Ok(())` when a stop signal arrives before registration
/// completes; otherwise returns whatever [`run_loops`] returns.
/// Config-load failures and a rejected registration surface as `Err`.
pub async fn run(config_path: Option<&str>) -> Result<()> {
    let (cfg, path) = config::load(config_path)?;
    log_startup_banner(&cfg, &path);

    let cfg = config::shared(cfg);
    let stop = Arc::new(AtomicBool::new(false));
    let busy = Arc::new(AtomicBool::new(false));
    // Operator pause toggle.  Runtime-only — never persisted, so the
    // worker comes up unpaused after every restart.
    let paused = Arc::new(AtomicBool::new(false));
    let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
    let observers = WorkerObservers::default();
    let registration = crate::auto_register::shared_initial();

    // Install the signal listener before the registration gate so a
    // stop request during the pre-approval wait is honoured too.
    let stop_clone = stop.clone();
    tokio::spawn(async move {
        let signal = wait_for_shutdown_signal().await;
        request_shutdown(&stop_clone, signal);
    });

    // Block on auto-register until the operator approves (or rejects).
    // Polls every 30s; aborts on Ctrl-C.  A stop signal that arrives
    // before approval is a clean shutdown (the pre-approval wait is the
    // normal state of a fresh worker), so exit Ok rather than letting
    // `run_cli` log it at error and exit non-zero.
    if ensure_registered(&cfg, &path, &registration, &stop).await? == RegistrationGate::Stopped {
        info!(
            target: TRACE_TARGET,
            op = "shutdown",
            "stopped before registration completed; exiting cleanly"
        );
        return Ok(());
    }

    // Registered: hand everything to the WS session + auto-updater.
    run_loops(
        cfg,
        stop,
        logs,
        busy,
        paused,
        observers,
        LoopSchedule::default(),
    )
    .await
}
394
/// Flip the `stop` flag and emit a shutdown breadcrumb so an operator
/// tailing the journal sees a clean stop, mirroring
/// [`log_startup_banner`].  Pulled out of the signal task so the
/// shutdown decision is unit-testable without delivering a real OS
/// signal.  `signal` names whatever woke us (e.g. `"SIGTERM"`).
///
/// Safe to call more than once: the flag simply stays set, and the
/// logged `already_stopping` field records whether it was set before
/// this call.
pub fn request_shutdown(stop: &AtomicBool, signal: &str) {
    // `swap` sets the flag and returns its previous value in one step.
    let already_stopping = stop.swap(true, Ordering::SeqCst);
    info!(
        target: TRACE_TARGET,
        op = "shutdown",
        signal,
        already_stopping,
        "shutdown signal received; stopping worker gracefully"
    );
}
410
/// Block until the OS asks the worker to stop, returning the name of
/// the signal that fired.
///
/// On Unix we wait on **both** SIGINT (interactive Ctrl-C) and SIGTERM.
/// SIGTERM is the signal `systemctl stop` / `launchctl unload` / host
/// shutdown deliver by default, and the worker ships as a `Type=simple`
/// systemd unit (see `service::render_service`).  Listening for Ctrl-C
/// alone meant the service manager's stop never reached the graceful
/// path: the WS session was killed mid-`close`, the studio saw an
/// abrupt disconnect, and the final log batch never flushed.  If the
/// SIGTERM handler can't be installed we degrade to Ctrl-C only rather
/// than abort the shutdown task.
///
/// On non-Unix we wait on Ctrl-C, which tokio maps to the console
/// Ctrl-C / close events.
///
/// Returns `"SIGINT"` or `"SIGTERM"` on Unix and `"ctrl-c"` elsewhere.
///
/// NOTE(review): the result of `tokio::signal::ctrl_c()` is discarded,
/// so an `Err` from it is reported as a received signal and would
/// trigger a shutdown — confirm that is acceptable.
#[cfg_attr(coverage_nightly, coverage(off))]
async fn wait_for_shutdown_signal() -> &'static str {
    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};
        let mut sigterm = match signal(SignalKind::terminate()) {
            Ok(s) => s,
            Err(e) => {
                warn!(
                    target: TRACE_TARGET,
                    op = "shutdown",
                    error = %e,
                    "could not install SIGTERM handler; falling back to Ctrl-C only"
                );
                // Degraded mode: only Ctrl-C can wake us from here on.
                let _ = tokio::signal::ctrl_c().await;
                return "SIGINT";
            }
        };
        // Whichever signal arrives first wins.
        tokio::select! {
            _ = tokio::signal::ctrl_c() => "SIGINT",
            _ = sigterm.recv() => "SIGTERM",
        }
    }
    #[cfg(not(unix))]
    {
        let _ = tokio::signal::ctrl_c().await;
        "ctrl-c"
    }
}
455
/// Outcome of the startup registration gate ([`ensure_registered`]).
///
/// A clean stop signal (Ctrl-C / SIGTERM) that arrives **before** the
/// studio approves the worker is a routine shutdown, not a failure:
/// the pre-approval wait is the normal state of a freshly-installed
/// worker sitting in the studio's approval queue.  Surfacing it as a
/// distinct [`Stopped`](RegistrationGate::Stopped) outcome lets `run`
/// exit 0 — so `systemctl stop` doesn't mark the unit failed — and
/// skip the top-level `tracing::error!` that would otherwise ship a
/// spurious Sentry event on every clean stop of an unapproved worker.
/// An operator *rejection*, by contrast, stays a hard `Err`: it's a
/// terminal state the operator must act on (`register --reset`).
///
/// Derives `PartialEq` so callers such as `run` can compare the gate
/// result with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegistrationGate {
    /// The worker is registered (already, or just approved); proceed
    /// to open the WS session.
    Ready,
    /// A stop signal arrived before approval; shut down cleanly.
    Stopped,
}
476
477/// Loop auto_register::tick on a 30s cadence until `worker_id` +
478/// `auth_token` are populated (Approved → [`RegistrationGate::Ready`]),
479/// a stop signal arrives (→ [`RegistrationGate::Stopped`]), or the
480/// operator rejects the worker (→ `Err` with recovery guidance).
481pub async fn ensure_registered(
482    cfg: &SharedConfig,
483    path: &std::path::Path,
484    registration: &crate::auto_register::SharedRegistration,
485    stop: &Arc<AtomicBool>,
486) -> Result<RegistrationGate> {
487    use std::time::Duration;
488    loop {
489        if stop.load(Ordering::SeqCst) {
490            return Ok(RegistrationGate::Stopped);
491        }
492        {
493            let snap = cfg.lock();
494            if snap.worker_id.is_some() && snap.auth_token.is_some() {
495                return Ok(RegistrationGate::Ready);
496            }
497        }
498        let state = crate::auto_register::tick(cfg, path, registration).await;
499        match state {
500            crate::auto_register::RegistrationState::Approved => {
501                return Ok(RegistrationGate::Ready)
502            }
503            crate::auto_register::RegistrationState::Rejected { reason } => {
504                return Err(anyhow!(
505                    "registration rejected by the studio operator: {reason}.  \
506                     Run `studio-worker register --reset` to clear local state \
507                     and submit a fresh request."
508                ));
509            }
510            _ => {}
511        }
512        // Sleep with a fast-cancel on stop.
513        for _ in 0..30 {
514            if stop.load(Ordering::SeqCst) {
515                return Ok(RegistrationGate::Stopped);
516            }
517            tokio::time::sleep(Duration::from_secs(1)).await;
518        }
519    }
520}
521
522/// Spawn the WS session + auto-updater, wait for them.  Pulled out of
523/// `run` so tests can drive with a different schedule.
524///
525/// `paused` is the runtime-only Pause / Resume toggle the UI flips.
526/// When set, the WS session advertises `auto_enabled = false` in
527/// heartbeats and refuses new job offers without restarting the
528/// session.
529pub async fn run_loops(
530    cfg: SharedConfig,
531    stop: Arc<AtomicBool>,
532    logs: Arc<Mutex<Vec<LogEntry>>>,
533    busy: Arc<AtomicBool>,
534    paused: Arc<AtomicBool>,
535    observers: WorkerObservers,
536    schedule: LoopSchedule,
537) -> Result<()> {
538    let session = crate::ws::session::spawn_ws_session(
539        cfg.clone(),
540        stop.clone(),
541        logs.clone(),
542        busy.clone(),
543        paused.clone(),
544        observers.clone(),
545        schedule.ws_session,
546    );
547    let auto_updater = spawn_auto_updater(
548        cfg.clone(),
549        stop.clone(),
550        logs.clone(),
551        busy.clone(),
552        schedule,
553    );
554    let (session_result, _) = tokio::join!(session, auto_updater);
555    session_result
556}
557
558// ---------------------------------------------------------------------------
559// Per-tick helpers — pure async fns, easy to drive from unit tests.
560// ---------------------------------------------------------------------------
561
562// (The old per-tick HTTP helpers — heartbeat_tick, claim_tick, log_shipper_tick,
563//  run_job, ClaimOutcome — lived here.  They are gone with the WS migration.
564//  See `ws::session::spawn_ws_session` for the replacement that runs the
565//  whole session in one connected loop.)
566
/// What the auto-updater decided this tick.
///
/// Returned by `auto_update_tick`; `spawn_auto_updater` only acts on
/// [`Updated`](AutoUpdateDecision::Updated) (it sets the stop flag and
/// calls `update::restart_self`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AutoUpdateDecision {
    /// Auto-update is turned off — do nothing.
    Disabled,
    /// Worker is currently running a job — skip.
    SkippedBusy,
    /// Local version is already the latest.
    UpToDate,
    /// Check failed (network etc.) — leave a log entry, try again later.
    /// Carries the error text.
    CheckError(String),
    /// A newer version was applied successfully.  Caller should restart.
    Updated,
    /// A newer version was found but the install failed.  Carries the
    /// error text.
    UpdateError(String),
}
583
584pub async fn auto_update_tick(
585    cfg: &Config,
586    busy: bool,
587    logs: &Arc<Mutex<Vec<LogEntry>>>,
588) -> AutoUpdateDecision {
589    if !cfg.auto_update_enabled {
590        return AutoUpdateDecision::Disabled;
591    }
592    if busy {
593        push_log(
594            logs,
595            "info",
596            "auto-update",
597            "skipping check: worker is busy on a job",
598            None,
599        );
600        return AutoUpdateDecision::SkippedBusy;
601    }
602    let feed = cfg.auto_update_feed.clone();
603    let prerelease = cfg.auto_update_prerelease;
604    let logs_for_task = logs.clone();
605    let outcome = tokio::task::spawn_blocking(move || -> Result<AutoUpdateDecision> {
606        let current = semver::Version::parse(AGENT_VERSION)
607            .map_err(|e| anyhow!("invalid AGENT_VERSION {AGENT_VERSION}: {e}"))?;
608        match update::check(&feed, &current, prerelease) {
609            Ok(update::CheckOutcome::UpToDate { current }) => {
610                push_log(
611                    &logs_for_task,
612                    "info",
613                    "auto-update",
614                    &format!("up to date at {current}"),
615                    None,
616                );
617                Ok(AutoUpdateDecision::UpToDate)
618            }
619            Ok(update::CheckOutcome::NewerAvailable { current, latest }) => {
620                push_log(
621                    &logs_for_task,
622                    "info",
623                    "auto-update",
624                    &format!("update available {current} -> {latest}; applying"),
625                    None,
626                );
627                match update::apply(&feed, &latest) {
628                    Ok(()) => {
629                        push_log(
630                            &logs_for_task,
631                            "info",
632                            "auto-update",
633                            "binary replaced; restart pending",
634                            None,
635                        );
636                        Ok(AutoUpdateDecision::Updated)
637                    }
638                    Err(e) => {
639                        push_log(
640                            &logs_for_task,
641                            "error",
642                            "auto-update",
643                            &format!("update failed: {e}"),
644                            None,
645                        );
646                        Ok(AutoUpdateDecision::UpdateError(e.to_string()))
647                    }
648                }
649            }
650            Err(e) => {
651                push_log(
652                    &logs_for_task,
653                    "warn",
654                    "auto-update",
655                    &format!("check failed: {e}"),
656                    None,
657                );
658                Ok(AutoUpdateDecision::CheckError(e.to_string()))
659            }
660        }
661    })
662    .await;
663    match outcome {
664        Ok(Ok(decision)) => decision,
665        Ok(Err(e)) => AutoUpdateDecision::CheckError(e.to_string()),
666        Err(e) => AutoUpdateDecision::CheckError(e.to_string()),
667    }
668}
669
670// ---------------------------------------------------------------------------
671// Long-running task wrappers — they exist solely to call the ticks in a
672// loop on a schedule.  All real logic lives in the ticks.
673// ---------------------------------------------------------------------------
674
675// (`spawn_heartbeat`, `spawn_claim_loop`, `spawn_log_shipper`, and
676//  `next_delay_for` lived here.  Their behaviour is now carried by the
677//  WS-driven tasks in `ws::session`.)
678
679/// Sleep up to `total`, re-checking `stop` every `tick` and returning
680/// the instant a shutdown is requested.  Keeps long idle waits (the
681/// auto-update tick here, reconnect backoff in the WS session)
682/// responsive to SIGTERM / SIGINT without busy-looping.  Shared by the
683/// runtime auto-updater and `ws::session`.
684pub(crate) async fn wait_with_stop(total: Duration, stop: &Arc<AtomicBool>, tick: Duration) {
685    let mut elapsed = Duration::ZERO;
686    while elapsed < total {
687        if stop.load(Ordering::SeqCst) {
688            return;
689        }
690        let next = tick.min(total - elapsed);
691        tokio::time::sleep(next).await;
692        elapsed += next;
693    }
694}
695
696pub fn spawn_auto_updater(
697    cfg: SharedConfig,
698    stop: Arc<AtomicBool>,
699    logs: Arc<Mutex<Vec<LogEntry>>>,
700    busy: Arc<AtomicBool>,
701    schedule: LoopSchedule,
702) -> tokio::task::JoinHandle<()> {
703    tokio::spawn(async move {
704        let mut elapsed = Duration::from_secs(0);
705        while !stop.load(Ordering::SeqCst) {
706            // Stop-aware idle wait: a shutdown signal during this window
707            // wakes the loop within `schedule.shutdown_tick` instead of
708            // leaving `run_loops`' join() blocked for a full
709            // `auto_update_tick`.
710            wait_with_stop(schedule.auto_update_tick, &stop, schedule.shutdown_tick).await;
711            if stop.load(Ordering::SeqCst) {
712                break;
713            }
714            elapsed += schedule.auto_update_tick;
715            let snapshot = cfg.lock().clone();
716            if elapsed < Duration::from_secs(snapshot.auto_update_interval_secs) {
717                continue;
718            }
719            elapsed = Duration::from_secs(0);
720            let busy_now = busy.load(Ordering::SeqCst);
721            let decision = auto_update_tick(&snapshot, busy_now, &logs).await;
722            if matches!(decision, AutoUpdateDecision::Updated) {
723                stop.store(true, Ordering::SeqCst);
724                update::restart_self();
725            }
726        }
727    })
728}
729
730// (`run_job` lived here.  See `ws::session::run_offered_job` for the
731//  WS-driven replacement.)
732
733pub fn prompt_for(task: &Task) -> String {
734    match task {
735        Task::Image(p) => p.prompt.clone(),
736        Task::Llm(p) => p
737            .messages
738            .last()
739            .map(|m| m.content.clone())
740            .unwrap_or_default(),
741        Task::AudioStt(p) => p.input_url.clone(),
742        Task::AudioTts(p) => p.text.clone(),
743        Task::Video(p) => p.prompt.clone(),
744    }
745}
746
747pub fn is_unsupported_kind(e: &anyhow::Error) -> bool {
748    // Typed check first — survives context wrapping and rewording.
749    // The string check remains as a fallback for error paths that
750    // haven't migrated to `engine::UnsupportedTask` yet.
751    e.chain().any(|cause| {
752        cause
753            .downcast_ref::<crate::engine::UnsupportedTask>()
754            .is_some()
755    }) || e.to_string().contains("cannot serve")
756}
757
758// ---------------------------------------------------------------------------
759// Helpers
760// ---------------------------------------------------------------------------
761
762pub fn build_capabilities(cfg: &Config, engine: &dyn Engine) -> WorkerCapabilities {
763    build_capabilities_with(cfg, engine, true)
764}
765
766/// Same as [`build_capabilities`] but lets the caller drive
767/// `auto_enabled` from a runtime pause flag (the UI's Pause/Resume
768/// button).  The persisted [`Config`] no longer carries that bit —
769/// it's an in-process toggle.
770pub fn build_capabilities_with(
771    cfg: &Config,
772    engine: &dyn Engine,
773    auto_enabled: bool,
774) -> WorkerCapabilities {
775    let vram = sys::detect_vram_gb().unwrap_or(0.0);
776    let caps = engine.capabilities();
777    let supported_models_per_kind = caps.supported_models_per_kind.clone();
778    let task_kinds = caps.kinds();
779    // Legacy `supported_models` is a flat list across all kinds so the
780    // studio API's claim filter (which only knows about this field) can
781    // match jobs of any modality this worker can serve.
782    let supported_models = {
783        let mut all = caps.flat_models();
784        all.sort();
785        all.dedup();
786        all
787    };
788
789    WorkerCapabilities {
790        machine_name: sys::machine_name(),
791        username: sys::username(),
792        agent_version: AGENT_VERSION.to_string(),
793        engine: engine.name().to_string(),
794        vram_total_gb: vram,
795        vram_threshold_gb: cfg.vram_threshold_gb,
796        auto_enabled,
797        auto_start: cfg.auto_start,
798        supported_models,
799        task_kinds,
800        supported_models_per_kind,
801    }
802}
803
804/// One-line, operator-facing summary of what this worker advertises to
805/// the studio on the WS handshake.  Logged once per session attempt so
806/// the worker's own logs (and the studio's shipped-log view) record
807/// exactly which task kinds, models, and VRAM budget were offered — the
808/// missing complement to [`log_startup_banner`], which only covers the
809/// loaded config.  Without it, an operator chasing "why won't my worker
810/// claim image jobs" has no record of what the worker told the studio
811/// it could do.  Pure so the formatting is unit-tested without a live
812/// session.
813pub fn summarize_capabilities(caps: &WorkerCapabilities) -> String {
814    let kinds = caps
815        .task_kinds
816        .iter()
817        .map(|k| k.as_str())
818        .collect::<Vec<_>>()
819        .join(", ");
820    format!(
821        "advertising engine={}, vram={:.1}/{:.1}GB threshold, auto_enabled={}, \
822         kinds=[{}], {} model(s)=[{}]",
823        caps.engine,
824        caps.vram_total_gb,
825        caps.vram_threshold_gb,
826        caps.auto_enabled,
827        kinds,
828        caps.supported_models.len(),
829        caps.supported_models.join(", "),
830    )
831}
832
833/// Operator-facing warning when the configured VRAM threshold exceeds
834/// the GPU VRAM the worker actually detected.
835///
836/// The studio matches jobs to a worker purely by its advertised
837/// `vram_threshold_gb`, so a threshold set above the card's real
838/// capacity — e.g. the default 12 GB on an 8 GB consumer GPU — makes the
839/// worker accept jobs its GPU can't fit: they load, exhaust VRAM, and
840/// fail with an OOM the operator then has to trace back to a config
841/// value.  Surfacing it on the handshake (one line next to the
842/// capability summary) turns a silent OOM-on-claim into an actionable
843/// "lower your threshold" breadcrumb.
844///
845/// Only fires when the VRAM probe returned a real positive total: a
846/// detected 0 GB means the probe failed (no `nvidia-smi` / sysfs tree,
847/// or a non-NVIDIA GPU we can't size), where the threshold is the only
848/// capacity signal we have and second-guessing it would be wrong.  The
849/// boundary is strict (`threshold > total`), so a threshold that exactly
850/// matches the card stays silent.  Pure so the wording + boundary are
851/// unit-tested without a live GPU.
852pub fn vram_threshold_warning(caps: &WorkerCapabilities) -> Option<String> {
853    if caps.vram_total_gb > 0.0 && caps.vram_threshold_gb > caps.vram_total_gb {
854        Some(format!(
855            "configured VRAM threshold {:.1}GB exceeds detected GPU VRAM {:.1}GB; \
856             the studio may offer jobs larger than this card can fit and they will \
857             OOM on load — lower vram_threshold_gb to at or below {:.1}GB",
858            caps.vram_threshold_gb, caps.vram_total_gb, caps.vram_total_gb
859        ))
860    } else {
861        None
862    }
863}
864
865pub fn push_log(
866    logs: &Arc<Mutex<Vec<LogEntry>>>,
867    level: &str,
868    category: &str,
869    message: &str,
870    job_id: Option<String>,
871) {
872    push_log_with_observers(logs, None, level, category, message, job_id);
873}
874
875/// Same as [`push_log`] but also appends to
876/// [`WorkerObservers::recent_logs`] so the UI's Logs tab keeps a
877/// rolling display window.  The WS session uses this variant so
878/// operators don't see the Logs tab blank out every second when the
879/// shipping queue gets drained.
880pub fn push_log_with_observers(
881    logs: &Arc<Mutex<Vec<LogEntry>>>,
882    observers: Option<&WorkerObservers>,
883    level: &str,
884    category: &str,
885    message: &str,
886    job_id: Option<String>,
887) {
888    let entry = LogEntry {
889        ts: Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true),
890        level: level.to_string(),
891        category: category.to_string(),
892        message: message.to_string(),
893        job_id,
894    };
895    // Carry the job id as a structured field so operators can pivot
896    // shipped studio logs / Sentry breadcrumbs on it. `Option<&str>`
897    // only records the field when `Some`, so jobless breadcrumbs stay
898    // free of a noisy empty `job_id`.
899    let job_id = entry.job_id.as_deref();
900    if level == "error" {
901        tracing::error!(target: "studio_worker", job_id, "[{category}] {message}");
902    } else if level == "warn" {
903        tracing::warn!(target: "studio_worker", job_id, "[{category}] {message}");
904    } else {
905        info!(target: "studio_worker", job_id, "[{category}] {message}");
906    }
907    {
908        let mut queue = logs.lock();
909        if queue.len() >= LOG_SHIP_QUEUE_CAP {
910            // +1 for the entry below, +1 for the drop marker.
911            let overflow = queue.len() + 2 - LOG_SHIP_QUEUE_CAP;
912            queue.drain(0..overflow);
913            queue.push(LogEntry {
914                ts: Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true),
915                level: "warn".to_string(),
916                category: "logs".to_string(),
917                message: format!(
918                    "ship queue full ({LOG_SHIP_QUEUE_CAP} entries); dropped {overflow} oldest"
919                ),
920                job_id: None,
921            });
922        }
923        queue.push(entry.clone());
924    }
925    if let Some(o) = observers {
926        let mut ring = o.recent_logs.lock();
927        ring.push_back(entry);
928        while ring.len() > RECENT_LOGS_CAP {
929            ring.pop_front();
930        }
931    }
932}
933
934/// Put a drained-but-unsent batch back at the front of the ship queue
935/// so it survives for the next session attempt.  Entries that arrived
936/// while the batch was in flight stay behind it (newest last).  The
937/// combined queue is clipped to [`LOG_SHIP_QUEUE_CAP`], dropping the
938/// oldest entries first.
939pub fn restore_unshipped(logs: &Arc<Mutex<Vec<LogEntry>>>, mut batch: Vec<LogEntry>) {
940    let mut queue = logs.lock();
941    batch.append(&mut queue);
942    *queue = batch;
943    if queue.len() > LOG_SHIP_QUEUE_CAP {
944        let overflow = queue.len() - LOG_SHIP_QUEUE_CAP;
945        queue.drain(0..overflow);
946    }
947}
948
#[cfg(test)]
mod tests {
    //! Unit tests for the runtime helpers: capability snapshots and
    //! their operator-facing summaries, the bounded log queues, prompt
    //! previews, status formatting, shutdown signalling, and the
    //! auto-updater loop.
    use super::*;
    use crate::config::Config;
    use crate::engine::SyntheticEngine;

    #[test]
    fn is_unsupported_kind_detects_typed_unsupported_task() {
        let err: anyhow::Error =
            crate::engine::UnsupportedTask::new("synthetic", TaskKind::Llm).into();
        assert!(is_unsupported_kind(&err));
        // The message keeps the legacy operator-facing shape.
        assert!(err.to_string().contains("cannot serve llm"));
    }

    #[test]
    fn is_unsupported_kind_survives_context_wrapping() {
        // String sniffing broke as soon as a caller added context (the
        // outer message no longer contains "cannot serve"); the typed
        // downcast searches the whole chain.
        let err = anyhow::Error::from(crate::engine::UnsupportedTask::new(
            "sdcpp",
            TaskKind::AudioTts,
        ))
        .context("dispatching job j-1");
        assert!(is_unsupported_kind(&err));
    }

    /// Minimal info-level [`LogEntry`] carrying `message`, for the
    /// queue-ordering tests.
    fn entry(message: &str) -> LogEntry {
        LogEntry {
            ts: Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true),
            level: "info".into(),
            category: "test".into(),
            message: message.into(),
            job_id: None,
        }
    }

    #[test]
    fn restore_unshipped_requeues_batch_ahead_of_newer_entries() {
        // A batch the shipper drained but failed to send must survive
        // for the next session, ordered before entries that arrived
        // while it was in flight.
        let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(vec![entry("newer")]));
        restore_unshipped(&logs, vec![entry("batch-1"), entry("batch-2")]);
        let queue = logs.lock();
        let order: Vec<&str> = queue.iter().map(|e| e.message.as_str()).collect();
        assert_eq!(order, vec!["batch-1", "batch-2", "newer"]);
    }

    #[test]
    fn restore_unshipped_respects_the_queue_cap() {
        // Requeueing must never grow the queue past the ship cap; the
        // oldest (front) entries give way so the newest survive.
        let logs: Arc<Mutex<Vec<LogEntry>>> =
            Arc::new(Mutex::new(vec![entry("newest"); LOG_SHIP_QUEUE_CAP]));
        restore_unshipped(&logs, vec![entry("old-batch"); 100]);
        let queue = logs.lock();
        assert_eq!(queue.len(), LOG_SHIP_QUEUE_CAP);
        assert_eq!(
            queue.last().map(|e| e.message.as_str()),
            Some("newest"),
            "newest entries must survive the cap"
        );
    }

    #[test]
    fn ship_queue_is_bounded_and_records_dropped_entries() {
        // The WS shipper only drains while a session is connected; a
        // long approval wait / reconnect backoff must not grow the
        // queue without bound.
        let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
        for i in 0..(LOG_SHIP_QUEUE_CAP + 100) {
            push_log_with_observers(&logs, None, "info", "test", &format!("entry {i}"), None);
        }
        let queue = logs.lock();
        assert!(
            queue.len() <= LOG_SHIP_QUEUE_CAP,
            "ship queue exceeded its cap: {}",
            queue.len()
        );
        // The newest entry always survives.
        assert_eq!(
            queue.last().map(|e| e.message.as_str()),
            Some(format!("entry {}", LOG_SHIP_QUEUE_CAP + 99).as_str())
        );
        // Loss is visible: a marker entry names how many were dropped.
        assert!(
            queue
                .iter()
                .any(|e| e.level == "warn" && e.message.contains("dropped")),
            "overflow must leave a visible drop marker"
        );
    }

    #[test]
    fn recent_logs_ring_is_bounded_at_recent_logs_cap() {
        // The observer ring backing the UI Logs tab is never drained
        // (unlike the ship queue, which the WS shipper empties every
        // second), so this cap is its only bound.  A regression that
        // dropped the eviction loop would leak memory for the lifetime
        // of a long-running worker; one that flipped `pop_front` for
        // `pop_back` would silently retain the *oldest* entries and show
        // a stale Logs tab.  Mirrors `recent_jobs_ring_caps_at_*`.
        let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
        let observers = WorkerObservers::default();
        let overflow = 25;
        for i in 0..(RECENT_LOGS_CAP + overflow) {
            push_log_with_observers(
                &logs,
                Some(&observers),
                "info",
                "test",
                &format!("entry {i}"),
                None,
            );
        }
        let ring = observers.recent_logs.lock();
        assert_eq!(
            ring.len(),
            RECENT_LOGS_CAP,
            "the recent-logs ring must cap at RECENT_LOGS_CAP"
        );
        // Newest entries go to the back; the oldest `overflow` entries
        // must have been evicted from the front.
        assert_eq!(
            ring.back().map(|e| e.message.as_str()),
            Some(format!("entry {}", RECENT_LOGS_CAP + overflow - 1).as_str()),
            "the newest entry must survive at the back of the ring"
        );
        assert_eq!(
            ring.front().map(|e| e.message.as_str()),
            Some(format!("entry {overflow}").as_str()),
            "the oldest surviving entry must be entry #overflow (older evicted)"
        );
    }

    #[test]
    fn capabilities_advertises_all_synthetic_kinds() {
        let cfg = Config::default();
        let engine = SyntheticEngine::new();
        let cap = build_capabilities(&cfg, &engine);
        assert_eq!(cap.engine, "synthetic");
        assert_eq!(cap.task_kinds.len(), TaskKind::ALL.len());
        assert!(cap.auto_enabled, "default capability snapshot is unpaused");
        for kind in TaskKind::ALL {
            assert!(cap.supported_models_per_kind.contains_key(&kind));
        }
    }

    #[test]
    fn capabilities_with_paused_flag_drives_auto_enabled() {
        let cfg = Config::default();
        let engine = SyntheticEngine::new();
        let paused_caps = build_capabilities_with(&cfg, &engine, false);
        assert!(!paused_caps.auto_enabled);
    }

    #[test]
    fn summarize_capabilities_lists_engine_kinds_models_vram_and_pause_state() {
        let cfg = Config {
            vram_threshold_gb: 6.0,
            ..Config::default()
        };
        let engine = SyntheticEngine::new();
        let caps = build_capabilities_with(&cfg, &engine, true);
        let summary = summarize_capabilities(&caps);
        // Engine name + every advertised kind is present.
        assert!(summary.contains("engine=synthetic"), "got: {summary}");
        for kind in &caps.task_kinds {
            assert!(
                summary.contains(kind.as_str()),
                "missing kind {} in: {summary}",
                kind.as_str()
            );
        }
        // Model count + the exact advertised model list are present.
        // The list is matched inside its `model(s)=[…]` slot: a bare
        // "synthetic" substring is already satisfied by `engine=synthetic`
        // and would pass even if the model list were dropped entirely.
        assert!(
            summary.contains(&format!("{} model(s)", caps.supported_models.len())),
            "missing model count in: {summary}"
        );
        assert!(
            summary.contains(&format!("model(s)=[{}]", caps.supported_models.join(", "))),
            "missing model list in: {summary}"
        );
        // VRAM budget + unpaused state are visible.  The threshold is
        // matched in its own slot (after the `/`) so a detected total
        // that happens to render as "6.0" can't satisfy it by accident.
        assert!(
            summary.contains("/6.0GB threshold"),
            "missing vram threshold in: {summary}"
        );
        assert!(summary.contains("auto_enabled=true"), "got: {summary}");
    }

    #[test]
    fn summarize_capabilities_reflects_paused_state() {
        let cfg = Config::default();
        let engine = SyntheticEngine::new();
        let caps = build_capabilities_with(&cfg, &engine, false);
        assert!(
            summarize_capabilities(&caps).contains("auto_enabled=false"),
            "paused worker must advertise auto_enabled=false"
        );
    }

    /// Build a capability snapshot, then override the two VRAM fields so
    /// the threshold/total relationship is deterministic regardless of
    /// the host's real GPU (the probe is `0.0` on CI).
    fn caps_with_vram(total_gb: f32, threshold_gb: f32) -> WorkerCapabilities {
        let mut caps = build_capabilities_with(&Config::default(), &SyntheticEngine::new(), true);
        caps.vram_total_gb = total_gb;
        caps.vram_threshold_gb = threshold_gb;
        caps
    }

    #[test]
    fn vram_threshold_warning_flags_threshold_above_detected_vram() {
        // The default 12 GB threshold on an 8 GB card: the studio will
        // offer up-to-12 GB jobs this GPU can't fit, and they OOM on
        // load.  The breadcrumb must name both numbers and the config
        // key the operator has to lower.
        let warning = vram_threshold_warning(&caps_with_vram(8.0, 12.0))
            .expect("threshold above detected VRAM must warn");
        assert!(warning.contains("12.0"), "missing threshold in: {warning}");
        assert!(
            warning.contains("8.0"),
            "missing detected VRAM in: {warning}"
        );
        assert!(
            warning.contains("vram_threshold_gb"),
            "must name the config key to change: {warning}"
        );
    }

    #[test]
    fn vram_threshold_warning_silent_when_threshold_within_detected_vram() {
        // A 24 GB card with a 12 GB threshold is correctly conservative.
        assert!(vram_threshold_warning(&caps_with_vram(24.0, 12.0)).is_none());
    }

    #[test]
    fn vram_threshold_warning_silent_when_threshold_equals_detected() {
        // The boundary is strict: a threshold that exactly matches the
        // card fits, so it stays silent.
        assert!(vram_threshold_warning(&caps_with_vram(12.0, 12.0)).is_none());
    }

    #[test]
    fn vram_threshold_warning_silent_when_vram_undetected() {
        // A detected 0 GB means the probe failed (no nvidia-smi / sysfs)
        // or it's a non-NVIDIA GPU we can't size; the threshold is then
        // the only capacity signal we have, so second-guessing it with a
        // spurious OOM warning would be wrong.
        assert!(vram_threshold_warning(&caps_with_vram(0.0, 12.0)).is_none());
    }

    #[test]
    fn prompt_for_extracts_per_kind() {
        let image = Task::Image(ImageParams {
            prompt: "a stone golem".into(),
            ..Default::default()
        });
        assert_eq!(prompt_for(&image), "a stone golem");

        let llm = Task::Llm(LlmParams {
            messages: vec![
                ChatMessage {
                    role: "system".into(),
                    content: "be helpful".into(),
                },
                ChatMessage {
                    role: "user".into(),
                    content: "hi".into(),
                },
            ],
            max_tokens: 32,
            temperature: 0.5,
            ..Default::default()
        });
        assert_eq!(prompt_for(&llm), "hi");

        let llm_empty = Task::Llm(LlmParams {
            messages: vec![],
            ..Default::default()
        });
        assert_eq!(prompt_for(&llm_empty), "");

        let stt = Task::AudioStt(AudioSttParams {
            input_url: "https://example.com/clip.wav".into(),
            ..Default::default()
        });
        assert_eq!(prompt_for(&stt), "https://example.com/clip.wav");

        let tts = Task::AudioTts(AudioTtsParams {
            text: "hi there".into(),
            voice: "v".into(),
            ext: "wav".into(),
            ..Default::default()
        });
        assert_eq!(prompt_for(&tts), "hi there");

        let video = Task::Video(VideoParams {
            prompt: "a tiny dragon".into(),
            seconds: 1.0,
            width: 256,
            height: 256,
            ext: "mp4".into(),
            ..Default::default()
        });
        assert_eq!(prompt_for(&video), "a tiny dragon");
    }

    #[test]
    fn truncate_prompt_passes_short_through_and_clips_long_prompts() {
        // Under the cap → returned verbatim, no ellipsis.
        let short = "a stone golem";
        assert_eq!(truncate_prompt(short), short);

        // Exactly at the cap is the boundary: still untouched.
        let exactly = "x".repeat(PROMPT_PREVIEW_CHARS);
        assert_eq!(
            truncate_prompt(&exactly),
            exactly,
            "a prompt exactly at the cap must not be clipped"
        );

        // One past the cap → clipped to PROMPT_PREVIEW_CHARS chars plus
        // the single ellipsis terminator.
        let over = "y".repeat(PROMPT_PREVIEW_CHARS + 1);
        let clipped = truncate_prompt(&over);
        assert_eq!(
            clipped.chars().count(),
            PROMPT_PREVIEW_CHARS + 1,
            "clipped preview is the cap plus one ellipsis char"
        );
        assert!(
            clipped.ends_with('\u{2026}'),
            "a clipped preview ends with an ellipsis"
        );
        assert_eq!(
            clipped
                .chars()
                .take(PROMPT_PREVIEW_CHARS)
                .collect::<String>(),
            "y".repeat(PROMPT_PREVIEW_CHARS),
            "the kept prefix is the first PROMPT_PREVIEW_CHARS chars"
        );
    }

    #[test]
    fn truncate_prompt_clips_on_char_boundaries_for_multibyte_text() {
        // Each char here is 3 bytes, so the cap-th *byte* lands
        // mid-codepoint: a naive `&s[..PROMPT_PREVIEW_CHARS]` byte slice
        // would panic.  `truncate_prompt` counts chars, so a one-over
        // multibyte prompt clips cleanly to the cap plus the ellipsis.
        let multibyte = "\u{3042}".repeat(PROMPT_PREVIEW_CHARS + 1);
        let clipped = truncate_prompt(&multibyte);
        assert_eq!(clipped.chars().count(), PROMPT_PREVIEW_CHARS + 1);
        assert!(clipped.ends_with('\u{2026}'));
        assert_eq!(
            clipped.chars().filter(|c| *c == '\u{3042}').count(),
            PROMPT_PREVIEW_CHARS,
            "exactly PROMPT_PREVIEW_CHARS multibyte chars survive the clip"
        );
    }

    #[test]
    fn is_unsupported_kind_matches_engine_message() {
        let err = anyhow!("multi engine cannot serve llm tasks");
        assert!(is_unsupported_kind(&err));
        let other = anyhow!("network timeout");
        assert!(!is_unsupported_kind(&other));
    }

    #[test]
    fn format_status_includes_every_field() {
        let cfg = Config::default();
        let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
        assert!(out.contains("config path:"));
        assert!(out.contains("api_base_url:"));
        assert!(out.contains("registration:"));
        assert!(out.contains("not registered"));
        assert!(out.contains("models_root:"));
        assert!(out.contains("auto_update:"));
        assert!(out.contains("update_interval:"));
    }

    #[test]
    fn format_status_shows_worker_id_when_registered() {
        let cfg = Config {
            worker_id: Some("w-abc".into()),
            auth_token: Some("tok".into()),
            ..Config::default()
        };
        let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
        assert!(out.contains("w-abc"));
        assert!(out.contains("approved"));
    }

    #[test]
    fn format_status_shows_pending_request_id() {
        let cfg = Config {
            registration_request_id: Some("rr-7".into()),
            ..Config::default()
        };
        let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
        assert!(out.contains("pending operator approval"));
        assert!(out.contains("rr-7"));
    }

    #[test]
    fn format_check_outcome_handles_both_branches() {
        let up = update::CheckOutcome::UpToDate {
            current: semver::Version::new(1, 2, 3),
        };
        assert!(format_check_outcome(&up).contains("up to date"));
        let newer = update::CheckOutcome::NewerAvailable {
            current: semver::Version::new(1, 2, 3),
            latest: semver::Version::new(1, 3, 0),
        };
        let s = format_check_outcome(&newer);
        assert!(s.contains("1.2.3 -> 1.3.0"));
    }

    #[test]
    fn push_log_appends_an_entry() {
        let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
        push_log(&logs, "info", "test", "hi", None);
        push_log(&logs, "warn", "test", "wat", Some("j-1".into()));
        push_log(&logs, "error", "test", "boom", None);
        let v = logs.lock();
        assert_eq!(v.len(), 3);
        assert_eq!(v[0].level, "info");
        assert_eq!(v[1].level, "warn");
        assert_eq!(v[1].job_id.as_deref(), Some("j-1"));
        assert_eq!(v[2].level, "error");
    }

    #[test]
    fn push_log_emits_job_id_as_a_structured_tracing_field() {
        // Operators correlating shipped studio logs / Sentry
        // breadcrumbs by job need the job id as a *field*, not just
        // buried in the message text, so `RUST_LOG` filters and Sentry
        // tag search can pivot on it.
        use crate::test_support::capture;
        let logs = capture(|| {
            let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
            push_log(
                &logs,
                "info",
                "ws",
                "binary upload ok",
                Some("job-42".into()),
            );
        });
        assert!(
            logs.contains("job_id=\"job-42\""),
            "expected structured job_id field, got: {logs}"
        );
        assert!(
            logs.contains("[ws] binary upload ok"),
            "expected the human-readable message to survive, got: {logs}"
        );
    }

    #[test]
    fn push_log_omits_job_id_field_when_absent() {
        // Jobless breadcrumbs (startup banners, heartbeats, auto-update
        // ticks) must not gain a noisy empty `job_id` field.
        use crate::test_support::capture;
        let logs = capture(|| {
            let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
            push_log(&logs, "info", "auto-update", "up to date", None);
        });
        assert!(
            !logs.contains("job_id"),
            "expected no job_id field for a jobless log, got: {logs}"
        );
    }

    // --- async tick tests ---

    #[test]
    fn request_shutdown_sets_the_stop_flag() {
        let stop = AtomicBool::new(false);
        request_shutdown(&stop, "SIGTERM");
        assert!(stop.load(Ordering::SeqCst));
    }

    #[test]
    fn request_shutdown_reconfirms_when_already_stopping() {
        // A second signal (or a race with another shutdown path) must
        // not panic or clear the flag — it just re-confirms the stop.
        let stop = AtomicBool::new(true);
        request_shutdown(&stop, "SIGINT");
        assert!(stop.load(Ordering::SeqCst));
    }

    #[test]
    fn request_shutdown_emits_a_named_shutdown_breadcrumb() {
        use crate::test_support::capture;
        let logs = capture(|| {
            let stop = AtomicBool::new(false);
            request_shutdown(&stop, "SIGTERM");
        });
        assert!(logs.contains("INFO"), "expected INFO event, got: {logs}");
        assert!(
            logs.contains("studio_worker::runtime"),
            "expected runtime target, got: {logs}"
        );
        assert!(
            logs.contains("op=\"shutdown\""),
            "expected op field, got: {logs}"
        );
        assert!(
            logs.contains("signal=\"SIGTERM\""),
            "expected signal field, got: {logs}"
        );
    }

    #[tokio::test]
    async fn auto_update_tick_disabled_when_flag_off() {
        let cfg = Config {
            auto_update_enabled: false,
            ..Config::default()
        };
        let logs = Arc::new(Mutex::new(Vec::new()));
        let decision = auto_update_tick(&cfg, false, &logs).await;
        assert_eq!(decision, AutoUpdateDecision::Disabled);
    }

    #[tokio::test]
    async fn auto_update_tick_skipped_when_busy() {
        let cfg = Config {
            auto_update_enabled: true,
            ..Config::default()
        };
        let logs = Arc::new(Mutex::new(Vec::new()));
        let decision = auto_update_tick(&cfg, true, &logs).await;
        assert_eq!(decision, AutoUpdateDecision::SkippedBusy);
        let entries = logs.lock();
        assert!(entries.iter().any(|e| e.message.contains("busy on a job")));
    }

    #[tokio::test]
    async fn wait_with_stop_short_circuits_when_already_stopped() {
        let stop = Arc::new(AtomicBool::new(true));
        let start = std::time::Instant::now();
        wait_with_stop(Duration::from_secs(60), &stop, Duration::from_millis(10)).await;
        assert!(
            start.elapsed() < Duration::from_millis(100),
            "an already-set stop must return without sleeping the full duration"
        );
    }

    #[tokio::test]
    async fn auto_updater_stops_promptly_during_idle_wait() {
        // A huge auto_update_tick means a non-cancellable idle sleep
        // would pin the JoinHandle — and thus `run_loops`' join() — for
        // the whole tick after stop is set, defeating graceful
        // shutdown.  The stop-aware wait must let the task finish well
        // inside the tick.
        let cfg = crate::config::shared(Config {
            auto_update_enabled: false,
            ..Config::default()
        });
        let stop = Arc::new(AtomicBool::new(false));
        let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
        let busy = Arc::new(AtomicBool::new(false));
        let schedule = LoopSchedule {
            ws_session: crate::ws::session::SessionSchedule::fast_for_tests(),
            auto_update_tick: Duration::from_secs(3600),
            shutdown_tick: Duration::from_millis(1),
        };
        let handle = spawn_auto_updater(cfg, stop.clone(), logs, busy, schedule);
        // Let the loop reach its idle wait, then request shutdown.
        tokio::time::sleep(Duration::from_millis(10)).await;
        stop.store(true, Ordering::SeqCst);
        tokio::time::timeout(Duration::from_millis(250), handle)
            .await
            .expect("auto-updater did not observe stop promptly")
            .expect("auto-updater task panicked");
    }
}