Skip to main content

studio_worker/
runtime.rs

1//! Long-running auto-update task + one-shot CLI helpers.
2//!
3//! After the WS migration the runtime owns just two background
4//! tasks: the WebSocket session (`ws::session::spawn_ws_session`,
5//! which subsumes heartbeats, claim/accept/complete, fail, and log
6//! shipping) and the auto-updater (`spawn_auto_updater`).  Per-tick
7//! helpers from the old polling loops are gone.
8use crate::{
9    config::{self, Config, SharedConfig},
10    engine::Engine,
11    sys,
12    types::*,
13    update, AGENT_VERSION,
14};
15use anyhow::{anyhow, Result};
16use chrono::{DateTime, SecondsFormat, Utc};
17use parking_lot::Mutex;
18use std::{
19    collections::VecDeque,
20    sync::{
21        atomic::{AtomicBool, Ordering},
22        Arc,
23    },
24    time::Duration,
25};
26use tracing::{info, warn};
27
/// Tracing target attached to runtime-level events (startup, state
/// mutations).  Kept stable so operators can filter with
/// `RUST_LOG=studio_worker::runtime=debug`.
const TRACE_TARGET: &str = "studio_worker::runtime";

/// Upper bound on the finished jobs retained in
/// `WorkerObservers::recent_jobs`.  Once the ring is full, the oldest
/// entries fall off the back.
pub const RECENT_JOBS_CAP: usize = 50;

/// Upper bound on the log entries retained in
/// `WorkerObservers::recent_logs` for the UI's Logs tab.  The shipping
/// queue (`logs: Arc<Mutex<Vec<…>>>`) is drained on every WS tick, so
/// this display ring is what the UI reads.
pub const RECENT_LOGS_CAP: usize = 1_000;

/// Prompt previews kept in `CurrentJob` / `RecentJob` are clipped to
/// this many chars, which bounds the in-memory state even when LLM
/// prompts are huge.
pub const PROMPT_PREVIEW_CHARS: usize = 200;
45
46/// Job in flight right now.  Populated by the WS session before
47/// dispatch, cleared once the job finishes (success or failure).
48#[derive(Debug, Clone)]
49pub struct CurrentJob {
50    pub job_id: String,
51    pub kind: TaskKind,
52    pub model: String,
53    pub prompt: String,
54    pub started_at: DateTime<Utc>,
55}
56
/// How a finished job ended.  A failure carries the human-readable
/// reason (already surfaced to logs + Sentry).
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum JobOutcome {
    /// The job ran to completion.
    Completed,
    /// The job failed; `reason` says why.
    Failed { reason: String },
}
64
65/// One finished job, retained in the recent-jobs ring for the UI.
66#[derive(Debug, Clone)]
67pub struct RecentJob {
68    pub job_id: String,
69    pub kind: TaskKind,
70    pub model: String,
71    pub prompt: String,
72    pub outcome: JobOutcome,
73    pub started_at: DateTime<Utc>,
74    pub finished_at: DateTime<Utc>,
75}
76
/// Outcome of the most recent heartbeat sent by the WS session.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum HeartbeatOutcome {
    /// The heartbeat succeeded.
    Ok,
    /// The heartbeat failed; `reason` says why.
    Err { reason: String },
}
83
84#[derive(Debug, Clone)]
85pub struct HeartbeatStatus {
86    pub last_attempt_at: DateTime<Utc>,
87    pub outcome: HeartbeatOutcome,
88}
89
90/// Bundle of in-process observation slots the WS session writes to and
91/// the optional native UI reads from.  `Default` gives empty slots so
92/// existing (headless) call sites stay one-liners.  Cheap to clone —
93/// every field is an `Arc`.
94#[derive(Clone, Default)]
95pub struct WorkerObservers {
96    pub current_job: Arc<Mutex<Option<CurrentJob>>>,
97    pub recent_jobs: Arc<Mutex<VecDeque<RecentJob>>>,
98    pub last_heartbeat: Arc<Mutex<Option<HeartbeatStatus>>>,
99    /// Bounded ring of every log entry the worker has emitted, kept
100    /// for the UI's Logs tab.  Separate from the WS ship queue
101    /// (which is drained every second) so the display doesn't blank
102    /// out between ticks.
103    pub recent_logs: Arc<Mutex<VecDeque<LogEntry>>>,
104}
105
106pub fn truncate_prompt(s: &str) -> String {
107    if s.chars().count() <= PROMPT_PREVIEW_CHARS {
108        return s.to_string();
109    }
110    let mut out: String = s.chars().take(PROMPT_PREVIEW_CHARS).collect();
111    out.push('…');
112    out
113}
114
115pub fn record_recent_job(observers: &WorkerObservers, entry: RecentJob) {
116    let mut ring = observers.recent_jobs.lock();
117    ring.push_front(entry);
118    while ring.len() > RECENT_JOBS_CAP {
119        ring.pop_back();
120    }
121}
122
123/// Test-only helper to populate the recent-jobs ring without driving a
124/// full claim cycle.  Lives in the library surface so integration
125/// tests can pin the ring-capacity contract cheaply.
126#[doc(hidden)]
127pub fn push_recent_job_for_tests(observers: &WorkerObservers, job_id: &str) {
128    let now = Utc::now();
129    record_recent_job(
130        observers,
131        RecentJob {
132            job_id: job_id.to_string(),
133            kind: TaskKind::Image,
134            model: "synthetic".into(),
135            prompt: String::new(),
136            outcome: JobOutcome::Completed,
137            started_at: now,
138            finished_at: now,
139        },
140    );
141}
142
/// Default length of one auto-updater idle tick.
pub const AUTO_UPDATE_TICK: Duration = Duration::from_secs(60);
/// How often the auto-updater's idle wait re-checks the `stop` flag.
/// It mirrors the WS session's shutdown tick, so a SIGTERM / SIGINT
/// that lands during the (up to `AUTO_UPDATE_TICK`-long) idle window
/// wakes the loop within ~250 ms rather than leaving `run_loops`' join
/// blocked for a whole tick.
pub const AUTO_UPDATE_SHUTDOWN_TICK: Duration = Duration::from_millis(250);
/// Default WS heartbeat interval.  Re-exported here so the native UI
/// (and any other downstream reader) has a stable constant without
/// reaching into `ws::session`.
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
154
155/// Schedule for the long-running loops.
156#[derive(Debug, Clone, Copy)]
157pub struct LoopSchedule {
158    pub ws_session: crate::ws::session::SessionSchedule,
159    pub auto_update_tick: Duration,
160    /// How often the idle wait between update checks re-polls the
161    /// `stop` flag, so a shutdown request isn't deferred for a whole
162    /// `auto_update_tick`.
163    pub shutdown_tick: Duration,
164}
165
166impl Default for LoopSchedule {
167    fn default() -> Self {
168        Self {
169            ws_session: crate::ws::session::SessionSchedule::default(),
170            auto_update_tick: AUTO_UPDATE_TICK,
171            shutdown_tick: AUTO_UPDATE_SHUTDOWN_TICK,
172        }
173    }
174}
175
176impl LoopSchedule {
177    /// Schedule with 1 ms intervals — used by tests to exercise the
178    /// loop wrappers without blocking.
179    pub fn fast_for_tests() -> Self {
180        Self {
181            ws_session: crate::ws::session::SessionSchedule::fast_for_tests(),
182            auto_update_tick: Duration::from_millis(1),
183            shutdown_tick: Duration::from_millis(1),
184        }
185    }
186}
187
188// ---------------------------------------------------------------------------
189// One-shot helpers used by the CLI subcommands
190// ---------------------------------------------------------------------------
191
/// Flags accepted by `studio-worker register`.
#[derive(Clone, Debug, Default)]
pub struct RegisterArgs {
    /// When set, replaces the configured `api_base_url`.
    pub api_base_url: Option<String>,
    /// When true, clears the locally stored registration state.
    pub reset: bool,
}
198
199/// Persist registration metadata for the next launch.  No HTTP — the
200/// auto-register orchestration inside `run` / `ui` is the only thing
201/// that talks to the studio.
202pub async fn register(config_path: Option<&str>, args: RegisterArgs) -> Result<()> {
203    let (mut cfg, path) = config::load(config_path)?;
204
205    if args.reset {
206        cfg.worker_id = None;
207        cfg.auth_token = None;
208        cfg.registration_request_id = None;
209        cfg.registration_secret = None;
210        cfg.install_id = None;
211    }
212    if let Some(url) = args.api_base_url {
213        cfg.api_base_url = url;
214    }
215
216    config::save(&cfg, &path)?;
217    if args.reset {
218        info!(
219            config_path = %path.display(),
220            "local registration state cleared; next launch will auto-register"
221        );
222        println!(
223            "local registration state cleared; run `studio-worker run` or \
224             `studio-worker ui` to auto-register"
225        );
226    } else {
227        info!(
228            config_path = %path.display(),
229            "register flags persisted; next launch will auto-register"
230        );
231        println!(
232            "saved; run `studio-worker run` or `studio-worker ui` to auto-register against {}",
233            cfg.api_base_url
234        );
235    }
236    Ok(())
237}
238
239pub async fn status(config_path: Option<&str>) -> Result<()> {
240    let (cfg, path) = config::load(config_path)?;
241    println!("{}", format_status(&cfg, &path));
242    Ok(())
243}
244
245pub fn format_status(cfg: &Config, path: &std::path::Path) -> String {
246    let mut out = String::new();
247    use std::fmt::Write as _;
248    let _ = writeln!(out, "config path:        {}", path.display());
249    let _ = writeln!(out, "api_base_url:       {}", cfg.api_base_url);
250    let registration_line = if cfg.worker_id.is_some() && cfg.auth_token.is_some() {
251        format!("approved as {}", cfg.worker_id.as_deref().unwrap_or(""))
252    } else if let Some(rid) = cfg.registration_request_id.as_deref() {
253        format!("pending operator approval (request {rid})")
254    } else {
255        "not registered (will auto-register on next launch)".into()
256    };
257    let _ = writeln!(out, "registration:       {registration_line}");
258    let _ = writeln!(out, "vram_threshold_gb:  {}", cfg.vram_threshold_gb);
259    let _ = writeln!(out, "auto_start:         {}", cfg.auto_start);
260    let _ = writeln!(out, "models_root:        {}", cfg.models_root.display());
261    let _ = writeln!(out, "auto_update:        {}", cfg.auto_update_enabled);
262    let _ = writeln!(
263        out,
264        "update_interval:    {}s",
265        cfg.auto_update_interval_secs
266    );
267    out
268}
269
270pub fn set_threshold(config_path: Option<&str>, gb: f32) -> Result<()> {
271    if gb < 0.0 {
272        return Err(anyhow!("threshold must be >= 0"));
273    }
274    let (mut cfg, path) = config::load(config_path)?;
275    cfg.vram_threshold_gb = gb;
276    config::save(&cfg, &path)?;
277    info!(
278        target: TRACE_TARGET,
279        op = "set_threshold",
280        vram_threshold_gb = gb,
281        config_path = path.display().to_string(),
282        "VRAM threshold persisted"
283    );
284    println!("vram_threshold_gb = {gb}");
285    Ok(())
286}
287
288/// Emit a one-shot startup banner so operators can confirm which
289/// config the worker actually loaded.  Without this the only thing in
290/// `journalctl -u studio-worker` on a healthy boot is whatever the
291/// loops happen to log on their first tick.
292pub fn log_startup_banner(cfg: &Config, path: &std::path::Path) {
293    info!(
294        target: TRACE_TARGET,
295        op = "startup",
296        version = AGENT_VERSION,
297        config_path = path.display().to_string(),
298        api_base_url = cfg.api_base_url.as_str(),
299        vram_threshold_gb = cfg.vram_threshold_gb,
300        auto_start = cfg.auto_start,
301        auto_update_enabled = cfg.auto_update_enabled,
302        auto_update_interval_secs = cfg.auto_update_interval_secs,
303        models_root = cfg.models_root.display().to_string(),
304        worker_id = cfg.worker_id.as_deref().unwrap_or("(unregistered)"),
305        "studio-worker booting"
306    );
307}
308
309pub fn show_config(config_path: Option<&str>) -> Result<()> {
310    let (cfg, path) = config::load(config_path)?;
311    println!("# {}", path.display());
312    print!("{}", toml::to_string_pretty(&cfg)?);
313    Ok(())
314}
315
316pub async fn check_update(config_path: Option<&str>) -> Result<()> {
317    let (cfg, _) = config::load(config_path)?;
318    let current = semver::Version::parse(AGENT_VERSION)
319        .map_err(|e| anyhow!("invalid current version {AGENT_VERSION}: {e}"))?;
320    let outcome = tokio::task::spawn_blocking(move || {
321        update::check(&cfg.auto_update_feed, &current, cfg.auto_update_prerelease)
322    })
323    .await??;
324    println!("{}", format_check_outcome(&outcome));
325    Ok(())
326}
327
328pub fn format_check_outcome(outcome: &update::CheckOutcome) -> String {
329    match outcome {
330        update::CheckOutcome::UpToDate { current } => format!("up to date: {current}"),
331        update::CheckOutcome::NewerAvailable { current, latest } => {
332            format!("update available: {current} -> {latest}")
333        }
334    }
335}
336
337// ---------------------------------------------------------------------------
338// Long-running run loop
339// ---------------------------------------------------------------------------
340
341pub async fn run(config_path: Option<&str>) -> Result<()> {
342    let (cfg, path) = config::load(config_path)?;
343    log_startup_banner(&cfg, &path);
344
345    let cfg = config::shared(cfg);
346    let stop = Arc::new(AtomicBool::new(false));
347    let busy = Arc::new(AtomicBool::new(false));
348    // Operator pause toggle.  Runtime-only — never persisted, so the
349    // worker comes up unpaused after every restart.
350    let paused = Arc::new(AtomicBool::new(false));
351    let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
352    let observers = WorkerObservers::default();
353    let registration = crate::auto_register::shared_initial();
354
355    let stop_clone = stop.clone();
356    tokio::spawn(async move {
357        let signal = wait_for_shutdown_signal().await;
358        request_shutdown(&stop_clone, signal);
359    });
360
361    // Block on auto-register until the operator approves (or rejects).
362    // Polls every 30s; aborts on Ctrl-C.
363    ensure_registered(&cfg, &path, &registration, &stop).await?;
364
365    run_loops(
366        cfg,
367        stop,
368        logs,
369        busy,
370        paused,
371        observers,
372        LoopSchedule::default(),
373    )
374    .await
375}
376
377/// Flip the `stop` flag and emit a shutdown breadcrumb so an operator
378/// tailing the journal sees a clean stop, mirroring
379/// [`log_startup_banner`].  Pulled out of the signal task so the
380/// shutdown decision is unit-testable without delivering a real OS
381/// signal.  `signal` names whatever woke us (e.g. `"SIGTERM"`).
382pub fn request_shutdown(stop: &AtomicBool, signal: &str) {
383    let already_stopping = stop.swap(true, Ordering::SeqCst);
384    info!(
385        target: TRACE_TARGET,
386        op = "shutdown",
387        signal,
388        already_stopping,
389        "shutdown signal received; stopping worker gracefully"
390    );
391}
392
/// Block until the OS asks the worker to stop, returning the name of
/// the signal that fired.
///
/// On Unix we wait on **both** SIGINT (interactive Ctrl-C) and SIGTERM.
/// SIGTERM is the signal `systemctl stop` / `launchctl unload` / host
/// shutdown deliver by default, and the worker ships as a `Type=simple`
/// systemd unit (see `service::render_service`).  Listening for Ctrl-C
/// alone meant the service manager's stop never reached the graceful
/// path: the WS session was killed mid-`close`, the studio saw an
/// abrupt disconnect, and the final log batch never flushed.  If the
/// SIGTERM handler can't be installed we degrade to Ctrl-C only rather
/// than abort the shutdown task.
///
/// On non-Unix we wait on Ctrl-C, which tokio maps to the console
/// Ctrl-C / close events.
///
/// Returns `"SIGINT"` or `"SIGTERM"` on Unix and `"ctrl-c"` elsewhere.
#[cfg_attr(coverage_nightly, coverage(off))]
async fn wait_for_shutdown_signal() -> &'static str {
    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};
        // Install the SIGTERM listener first; failure is not fatal.
        let mut sigterm = match signal(SignalKind::terminate()) {
            Ok(s) => s,
            Err(e) => {
                warn!(
                    target: TRACE_TARGET,
                    op = "shutdown",
                    error = %e,
                    "could not install SIGTERM handler; falling back to Ctrl-C only"
                );
                // Degraded mode: Ctrl-C is the only wake-up source.
                let _ = tokio::signal::ctrl_c().await;
                return "SIGINT";
            }
        };
        // Whichever signal arrives first decides the reported name.
        // NOTE(review): the `_` pattern also matches an `Err` from
        // `ctrl_c()`, so a failure to listen for Ctrl-C would be
        // reported as "SIGINT" — confirm that is intended.
        tokio::select! {
            _ = tokio::signal::ctrl_c() => "SIGINT",
            _ = sigterm.recv() => "SIGTERM",
        }
    }
    #[cfg(not(unix))]
    {
        let _ = tokio::signal::ctrl_c().await;
        "ctrl-c"
    }
}
437
438/// Loop auto_register::tick on a 30s cadence until `worker_id` +
439/// `auth_token` are populated (Approved) or the operator rejects.
440pub async fn ensure_registered(
441    cfg: &SharedConfig,
442    path: &std::path::Path,
443    registration: &crate::auto_register::SharedRegistration,
444    stop: &Arc<AtomicBool>,
445) -> Result<()> {
446    use std::time::Duration;
447    loop {
448        if stop.load(Ordering::SeqCst) {
449            return Err(anyhow!("shutdown before registration completed"));
450        }
451        {
452            let snap = cfg.lock();
453            if snap.worker_id.is_some() && snap.auth_token.is_some() {
454                return Ok(());
455            }
456        }
457        let state = crate::auto_register::tick(cfg, path, registration).await;
458        match state {
459            crate::auto_register::RegistrationState::Approved => return Ok(()),
460            crate::auto_register::RegistrationState::Rejected { reason } => {
461                return Err(anyhow!(
462                    "registration rejected by the studio operator: {reason}.  \
463                     Run `studio-worker register --reset` to clear local state \
464                     and submit a fresh request."
465                ));
466            }
467            _ => {}
468        }
469        // Sleep with a fast-cancel on stop.
470        for _ in 0..30 {
471            if stop.load(Ordering::SeqCst) {
472                return Err(anyhow!("shutdown during registration wait"));
473            }
474            tokio::time::sleep(Duration::from_secs(1)).await;
475        }
476    }
477}
478
479/// Spawn the WS session + auto-updater, wait for them.  Pulled out of
480/// `run` so tests can drive with a different schedule.
481///
482/// `paused` is the runtime-only Pause / Resume toggle the UI flips.
483/// When set, the WS session advertises `auto_enabled = false` in
484/// heartbeats and refuses new job offers without restarting the
485/// session.
486pub async fn run_loops(
487    cfg: SharedConfig,
488    stop: Arc<AtomicBool>,
489    logs: Arc<Mutex<Vec<LogEntry>>>,
490    busy: Arc<AtomicBool>,
491    paused: Arc<AtomicBool>,
492    observers: WorkerObservers,
493    schedule: LoopSchedule,
494) -> Result<()> {
495    let session = crate::ws::session::spawn_ws_session(
496        cfg.clone(),
497        stop.clone(),
498        logs.clone(),
499        busy.clone(),
500        paused.clone(),
501        observers.clone(),
502        schedule.ws_session,
503    );
504    let auto_updater = spawn_auto_updater(
505        cfg.clone(),
506        stop.clone(),
507        logs.clone(),
508        busy.clone(),
509        schedule,
510    );
511    let (session_result, _) = tokio::join!(session, auto_updater);
512    session_result
513}
514
515// ---------------------------------------------------------------------------
516// Per-tick helpers — pure async fns, easy to drive from unit tests.
517// ---------------------------------------------------------------------------
518
519// (The old per-tick HTTP helpers — heartbeat_tick, claim_tick, log_shipper_tick,
520//  run_job, ClaimOutcome — lived here.  They are gone with the WS migration.
521//  See `ws::session::spawn_ws_session` for the replacement that runs the
522//  whole session in one connected loop.)
523
/// The verdict of a single auto-updater tick.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum AutoUpdateDecision {
    /// Auto-update is switched off, so nothing was done.
    Disabled,
    /// The worker is running a job right now, so the check was skipped.
    SkippedBusy,
    /// The local version is already the latest.
    UpToDate,
    /// The check failed (network etc.).  A log entry is left and the
    /// check is retried later.
    CheckError(String),
    /// A newer version was applied successfully.  The caller should
    /// restart.
    Updated,
    /// A newer version was found but installing it failed.
    UpdateError(String),
}
540
541pub async fn auto_update_tick(
542    cfg: &Config,
543    busy: bool,
544    logs: &Arc<Mutex<Vec<LogEntry>>>,
545) -> AutoUpdateDecision {
546    if !cfg.auto_update_enabled {
547        return AutoUpdateDecision::Disabled;
548    }
549    if busy {
550        push_log(
551            logs,
552            "info",
553            "auto-update",
554            "skipping check: worker is busy on a job",
555            None,
556        );
557        return AutoUpdateDecision::SkippedBusy;
558    }
559    let feed = cfg.auto_update_feed.clone();
560    let prerelease = cfg.auto_update_prerelease;
561    let logs_for_task = logs.clone();
562    let outcome = tokio::task::spawn_blocking(move || -> Result<AutoUpdateDecision> {
563        let current = semver::Version::parse(AGENT_VERSION)
564            .map_err(|e| anyhow!("invalid AGENT_VERSION {AGENT_VERSION}: {e}"))?;
565        match update::check(&feed, &current, prerelease) {
566            Ok(update::CheckOutcome::UpToDate { current }) => {
567                push_log(
568                    &logs_for_task,
569                    "info",
570                    "auto-update",
571                    &format!("up to date at {current}"),
572                    None,
573                );
574                Ok(AutoUpdateDecision::UpToDate)
575            }
576            Ok(update::CheckOutcome::NewerAvailable { current, latest }) => {
577                push_log(
578                    &logs_for_task,
579                    "info",
580                    "auto-update",
581                    &format!("update available {current} -> {latest}; applying"),
582                    None,
583                );
584                match update::apply(&feed, &latest) {
585                    Ok(()) => {
586                        push_log(
587                            &logs_for_task,
588                            "info",
589                            "auto-update",
590                            "binary replaced; restart pending",
591                            None,
592                        );
593                        Ok(AutoUpdateDecision::Updated)
594                    }
595                    Err(e) => {
596                        push_log(
597                            &logs_for_task,
598                            "error",
599                            "auto-update",
600                            &format!("update failed: {e}"),
601                            None,
602                        );
603                        Ok(AutoUpdateDecision::UpdateError(e.to_string()))
604                    }
605                }
606            }
607            Err(e) => {
608                push_log(
609                    &logs_for_task,
610                    "warn",
611                    "auto-update",
612                    &format!("check failed: {e}"),
613                    None,
614                );
615                Ok(AutoUpdateDecision::CheckError(e.to_string()))
616            }
617        }
618    })
619    .await;
620    match outcome {
621        Ok(Ok(decision)) => decision,
622        Ok(Err(e)) => AutoUpdateDecision::CheckError(e.to_string()),
623        Err(e) => AutoUpdateDecision::CheckError(e.to_string()),
624    }
625}
626
627// ---------------------------------------------------------------------------
628// Long-running task wrappers — they exist solely to call the ticks in a
629// loop on a schedule.  All real logic lives in the ticks.
630// ---------------------------------------------------------------------------
631
632// (`spawn_heartbeat`, `spawn_claim_loop`, `spawn_log_shipper`, and
633//  `next_delay_for` lived here.  Their behaviour is now carried by the
634//  WS-driven tasks in `ws::session`.)
635
636/// Sleep up to `total`, re-checking `stop` every `tick` and returning
637/// the instant a shutdown is requested.  Keeps long idle waits (the
638/// auto-update tick here, reconnect backoff in the WS session)
639/// responsive to SIGTERM / SIGINT without busy-looping.  Shared by the
640/// runtime auto-updater and `ws::session`.
641pub(crate) async fn wait_with_stop(total: Duration, stop: &Arc<AtomicBool>, tick: Duration) {
642    let mut elapsed = Duration::ZERO;
643    while elapsed < total {
644        if stop.load(Ordering::SeqCst) {
645            return;
646        }
647        let next = tick.min(total - elapsed);
648        tokio::time::sleep(next).await;
649        elapsed += next;
650    }
651}
652
653pub fn spawn_auto_updater(
654    cfg: SharedConfig,
655    stop: Arc<AtomicBool>,
656    logs: Arc<Mutex<Vec<LogEntry>>>,
657    busy: Arc<AtomicBool>,
658    schedule: LoopSchedule,
659) -> tokio::task::JoinHandle<()> {
660    tokio::spawn(async move {
661        let mut elapsed = Duration::from_secs(0);
662        while !stop.load(Ordering::SeqCst) {
663            // Stop-aware idle wait: a shutdown signal during this window
664            // wakes the loop within `schedule.shutdown_tick` instead of
665            // leaving `run_loops`' join() blocked for a full
666            // `auto_update_tick`.
667            wait_with_stop(schedule.auto_update_tick, &stop, schedule.shutdown_tick).await;
668            if stop.load(Ordering::SeqCst) {
669                break;
670            }
671            elapsed += schedule.auto_update_tick;
672            let snapshot = cfg.lock().clone();
673            if elapsed < Duration::from_secs(snapshot.auto_update_interval_secs) {
674                continue;
675            }
676            elapsed = Duration::from_secs(0);
677            let busy_now = busy.load(Ordering::SeqCst);
678            let decision = auto_update_tick(&snapshot, busy_now, &logs).await;
679            if matches!(decision, AutoUpdateDecision::Updated) {
680                stop.store(true, Ordering::SeqCst);
681                update::restart_self();
682            }
683        }
684    })
685}
686
687// (`run_job` lived here.  See `ws::session::run_offered_job` for the
688//  WS-driven replacement.)
689
690pub fn prompt_for(task: &Task) -> String {
691    match task {
692        Task::Image(p) => p.prompt.clone(),
693        Task::Llm(p) => p
694            .messages
695            .last()
696            .map(|m| m.content.clone())
697            .unwrap_or_default(),
698        Task::AudioStt(p) => p.input_url.clone(),
699        Task::AudioTts(p) => p.text.clone(),
700        Task::Video(p) => p.prompt.clone(),
701    }
702}
703
704pub fn is_unsupported_kind(e: &anyhow::Error) -> bool {
705    e.to_string().contains("cannot serve")
706}
707
708// ---------------------------------------------------------------------------
709// Helpers
710// ---------------------------------------------------------------------------
711
712pub fn build_capabilities(cfg: &Config, engine: &dyn Engine) -> WorkerCapabilities {
713    build_capabilities_with(cfg, engine, true)
714}
715
716/// Same as [`build_capabilities`] but lets the caller drive
717/// `auto_enabled` from a runtime pause flag (the UI's Pause/Resume
718/// button).  The persisted [`Config`] no longer carries that bit —
719/// it's an in-process toggle.
720pub fn build_capabilities_with(
721    cfg: &Config,
722    engine: &dyn Engine,
723    auto_enabled: bool,
724) -> WorkerCapabilities {
725    let vram = sys::detect_vram_gb().unwrap_or(0.0);
726    let caps = engine.capabilities();
727    let supported_models_per_kind = caps.supported_models_per_kind.clone();
728    let task_kinds = caps.kinds();
729    // Legacy `supported_models` is a flat list across all kinds so the
730    // studio API's claim filter (which only knows about this field) can
731    // match jobs of any modality this worker can serve.
732    let supported_models = {
733        let mut all = caps.flat_models();
734        all.sort();
735        all.dedup();
736        all
737    };
738
739    WorkerCapabilities {
740        machine_name: sys::machine_name(),
741        username: sys::username(),
742        agent_version: AGENT_VERSION.to_string(),
743        engine: engine.name().to_string(),
744        vram_total_gb: vram,
745        vram_threshold_gb: cfg.vram_threshold_gb,
746        auto_enabled,
747        auto_start: cfg.auto_start,
748        supported_models,
749        task_kinds,
750        supported_models_per_kind,
751    }
752}
753
754/// One-line, operator-facing summary of what this worker advertises to
755/// the studio on the WS handshake.  Logged once per session attempt so
756/// the worker's own logs (and the studio's shipped-log view) record
757/// exactly which task kinds, models, and VRAM budget were offered — the
758/// missing complement to [`log_startup_banner`], which only covers the
759/// loaded config.  Without it, an operator chasing "why won't my worker
760/// claim image jobs" has no record of what the worker told the studio
761/// it could do.  Pure so the formatting is unit-tested without a live
762/// session.
763pub fn summarize_capabilities(caps: &WorkerCapabilities) -> String {
764    let kinds = caps
765        .task_kinds
766        .iter()
767        .map(|k| k.as_str())
768        .collect::<Vec<_>>()
769        .join(", ");
770    format!(
771        "advertising engine={}, vram={:.1}/{:.1}GB threshold, auto_enabled={}, \
772         kinds=[{}], {} model(s)=[{}]",
773        caps.engine,
774        caps.vram_total_gb,
775        caps.vram_threshold_gb,
776        caps.auto_enabled,
777        kinds,
778        caps.supported_models.len(),
779        caps.supported_models.join(", "),
780    )
781}
782
783pub fn push_log(
784    logs: &Arc<Mutex<Vec<LogEntry>>>,
785    level: &str,
786    category: &str,
787    message: &str,
788    job_id: Option<String>,
789) {
790    push_log_with_observers(logs, None, level, category, message, job_id);
791}
792
793/// Same as [`push_log`] but also appends to
794/// [`WorkerObservers::recent_logs`] so the UI's Logs tab keeps a
795/// rolling display window.  The WS session uses this variant so
796/// operators don't see the Logs tab blank out every second when the
797/// shipping queue gets drained.
798pub fn push_log_with_observers(
799    logs: &Arc<Mutex<Vec<LogEntry>>>,
800    observers: Option<&WorkerObservers>,
801    level: &str,
802    category: &str,
803    message: &str,
804    job_id: Option<String>,
805) {
806    let entry = LogEntry {
807        ts: Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true),
808        level: level.to_string(),
809        category: category.to_string(),
810        message: message.to_string(),
811        job_id,
812    };
813    // Carry the job id as a structured field so operators can pivot
814    // shipped studio logs / Sentry breadcrumbs on it. `Option<&str>`
815    // only records the field when `Some`, so jobless breadcrumbs stay
816    // free of a noisy empty `job_id`.
817    let job_id = entry.job_id.as_deref();
818    if level == "error" {
819        tracing::error!(target: "studio_worker", job_id, "[{category}] {message}");
820    } else if level == "warn" {
821        tracing::warn!(target: "studio_worker", job_id, "[{category}] {message}");
822    } else {
823        info!(target: "studio_worker", job_id, "[{category}] {message}");
824    }
825    logs.lock().push(entry.clone());
826    if let Some(o) = observers {
827        let mut ring = o.recent_logs.lock();
828        ring.push_back(entry);
829        while ring.len() > RECENT_LOGS_CAP {
830            ring.pop_front();
831        }
832    }
833}
834
835#[cfg(test)]
836mod tests {
837    use super::*;
838    use crate::config::Config;
839    use crate::engine::SyntheticEngine;
840
841    #[test]
842    fn capabilities_advertises_all_synthetic_kinds() {
843        let cfg = Config::default();
844        let engine = SyntheticEngine::new();
845        let cap = build_capabilities(&cfg, &engine);
846        assert_eq!(cap.engine, "synthetic");
847        assert_eq!(cap.task_kinds.len(), TaskKind::ALL.len());
848        assert!(cap.auto_enabled, "default capability snapshot is unpaused");
849        for kind in TaskKind::ALL {
850            assert!(cap.supported_models_per_kind.contains_key(&kind));
851        }
852    }
853
854    #[test]
855    fn capabilities_with_paused_flag_drives_auto_enabled() {
856        let cfg = Config::default();
857        let engine = SyntheticEngine::new();
858        let paused_caps = build_capabilities_with(&cfg, &engine, false);
859        assert!(!paused_caps.auto_enabled);
860    }
861
862    #[test]
863    fn summarize_capabilities_lists_engine_kinds_models_vram_and_pause_state() {
864        let cfg = Config {
865            vram_threshold_gb: 6.0,
866            ..Config::default()
867        };
868        let engine = SyntheticEngine::new();
869        let caps = build_capabilities_with(&cfg, &engine, true);
870        let summary = summarize_capabilities(&caps);
871        // Engine name + every advertised kind is present.
872        assert!(summary.contains("engine=synthetic"), "got: {summary}");
873        for kind in &caps.task_kinds {
874            assert!(
875                summary.contains(kind.as_str()),
876                "missing kind {} in: {summary}",
877                kind.as_str()
878            );
879        }
880        // Model count + an actual advertised model id are present.
881        assert!(
882            summary.contains(&format!("{} model(s)", caps.supported_models.len())),
883            "missing model count in: {summary}"
884        );
885        assert!(
886            summary.contains("synthetic"),
887            "missing model id in: {summary}"
888        );
889        // VRAM budget (total/threshold) + unpaused state are visible.
890        assert!(
891            summary.contains("6.0"),
892            "missing vram threshold in: {summary}"
893        );
894        assert!(summary.contains("auto_enabled=true"), "got: {summary}");
895    }
896
897    #[test]
898    fn summarize_capabilities_reflects_paused_state() {
899        let cfg = Config::default();
900        let engine = SyntheticEngine::new();
901        let caps = build_capabilities_with(&cfg, &engine, false);
902        assert!(
903            summarize_capabilities(&caps).contains("auto_enabled=false"),
904            "paused worker must advertise auto_enabled=false"
905        );
906    }
907
908    #[test]
909    fn prompt_for_extracts_per_kind() {
910        let image = Task::Image(ImageParams {
911            prompt: "a stone golem".into(),
912            ..Default::default()
913        });
914        assert_eq!(prompt_for(&image), "a stone golem");
915
916        let llm = Task::Llm(LlmParams {
917            messages: vec![
918                ChatMessage {
919                    role: "system".into(),
920                    content: "be helpful".into(),
921                },
922                ChatMessage {
923                    role: "user".into(),
924                    content: "hi".into(),
925                },
926            ],
927            max_tokens: 32,
928            temperature: 0.5,
929            ..Default::default()
930        });
931        assert_eq!(prompt_for(&llm), "hi");
932
933        let llm_empty = Task::Llm(LlmParams {
934            messages: vec![],
935            ..Default::default()
936        });
937        assert_eq!(prompt_for(&llm_empty), "");
938
939        let stt = Task::AudioStt(AudioSttParams {
940            input_url: "https://example.com/clip.wav".into(),
941            ..Default::default()
942        });
943        assert_eq!(prompt_for(&stt), "https://example.com/clip.wav");
944
945        let tts = Task::AudioTts(AudioTtsParams {
946            text: "hi there".into(),
947            voice: "v".into(),
948            ext: "wav".into(),
949            ..Default::default()
950        });
951        assert_eq!(prompt_for(&tts), "hi there");
952
953        let video = Task::Video(VideoParams {
954            prompt: "a tiny dragon".into(),
955            seconds: 1.0,
956            width: 256,
957            height: 256,
958            ext: "mp4".into(),
959            ..Default::default()
960        });
961        assert_eq!(prompt_for(&video), "a tiny dragon");
962    }
963
964    #[test]
965    fn is_unsupported_kind_matches_engine_message() {
966        let err = anyhow!("multi engine cannot serve llm tasks");
967        assert!(is_unsupported_kind(&err));
968        let other = anyhow!("network timeout");
969        assert!(!is_unsupported_kind(&other));
970    }
971
972    #[test]
973    fn format_status_includes_every_field() {
974        let cfg = Config::default();
975        let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
976        assert!(out.contains("config path:"));
977        assert!(out.contains("api_base_url:"));
978        assert!(out.contains("registration:"));
979        assert!(out.contains("not registered"));
980        assert!(out.contains("models_root:"));
981        assert!(out.contains("auto_update:"));
982        assert!(out.contains("update_interval:"));
983    }
984
985    #[test]
986    fn format_status_shows_worker_id_when_registered() {
987        let cfg = Config {
988            worker_id: Some("w-abc".into()),
989            auth_token: Some("tok".into()),
990            ..Config::default()
991        };
992        let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
993        assert!(out.contains("w-abc"));
994        assert!(out.contains("approved"));
995    }
996
997    #[test]
998    fn format_status_shows_pending_request_id() {
999        let cfg = Config {
1000            registration_request_id: Some("rr-7".into()),
1001            ..Config::default()
1002        };
1003        let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
1004        assert!(out.contains("pending operator approval"));
1005        assert!(out.contains("rr-7"));
1006    }
1007
1008    #[test]
1009    fn format_check_outcome_handles_both_branches() {
1010        let up = update::CheckOutcome::UpToDate {
1011            current: semver::Version::new(1, 2, 3),
1012        };
1013        assert!(format_check_outcome(&up).contains("up to date"));
1014        let newer = update::CheckOutcome::NewerAvailable {
1015            current: semver::Version::new(1, 2, 3),
1016            latest: semver::Version::new(1, 3, 0),
1017        };
1018        let s = format_check_outcome(&newer);
1019        assert!(s.contains("1.2.3 -> 1.3.0"));
1020    }
1021
1022    #[test]
1023    fn push_log_appends_an_entry() {
1024        let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
1025        push_log(&logs, "info", "test", "hi", None);
1026        push_log(&logs, "warn", "test", "wat", Some("j-1".into()));
1027        push_log(&logs, "error", "test", "boom", None);
1028        let v = logs.lock();
1029        assert_eq!(v.len(), 3);
1030        assert_eq!(v[0].level, "info");
1031        assert_eq!(v[1].level, "warn");
1032        assert_eq!(v[1].job_id.as_deref(), Some("j-1"));
1033        assert_eq!(v[2].level, "error");
1034    }
1035
1036    #[test]
1037    fn push_log_emits_job_id_as_a_structured_tracing_field() {
1038        // Operators correlating shipped studio logs / Sentry
1039        // breadcrumbs by job need the job id as a *field*, not just
1040        // buried in the message text, so `RUST_LOG` filters and Sentry
1041        // tag search can pivot on it.
1042        use crate::test_support::capture;
1043        let logs = capture(|| {
1044            let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
1045            push_log(
1046                &logs,
1047                "info",
1048                "ws",
1049                "binary upload ok",
1050                Some("job-42".into()),
1051            );
1052        });
1053        assert!(
1054            logs.contains("job_id=\"job-42\""),
1055            "expected structured job_id field, got: {logs}"
1056        );
1057        assert!(
1058            logs.contains("[ws] binary upload ok"),
1059            "expected the human-readable message to survive, got: {logs}"
1060        );
1061    }
1062
1063    #[test]
1064    fn push_log_omits_job_id_field_when_absent() {
1065        // Jobless breadcrumbs (startup banners, heartbeats, auto-update
1066        // ticks) must not gain a noisy empty `job_id` field.
1067        use crate::test_support::capture;
1068        let logs = capture(|| {
1069            let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
1070            push_log(&logs, "info", "auto-update", "up to date", None);
1071        });
1072        assert!(
1073            !logs.contains("job_id"),
1074            "expected no job_id field for a jobless log, got: {logs}"
1075        );
1076    }
1077
    // --- shutdown, auto-update tick, and stop-aware wait tests ---
1079
1080    #[test]
1081    fn request_shutdown_sets_the_stop_flag() {
1082        let stop = AtomicBool::new(false);
1083        request_shutdown(&stop, "SIGTERM");
1084        assert!(stop.load(Ordering::SeqCst));
1085    }
1086
1087    #[test]
1088    fn request_shutdown_reconfirms_when_already_stopping() {
1089        // A second signal (or a race with another shutdown path) must
1090        // not panic or clear the flag — it just re-confirms the stop.
1091        let stop = AtomicBool::new(true);
1092        request_shutdown(&stop, "SIGINT");
1093        assert!(stop.load(Ordering::SeqCst));
1094    }
1095
1096    #[test]
1097    fn request_shutdown_emits_a_named_shutdown_breadcrumb() {
1098        use crate::test_support::capture;
1099        let logs = capture(|| {
1100            let stop = AtomicBool::new(false);
1101            request_shutdown(&stop, "SIGTERM");
1102        });
1103        assert!(logs.contains("INFO"), "expected INFO event, got: {logs}");
1104        assert!(
1105            logs.contains("studio_worker::runtime"),
1106            "expected runtime target, got: {logs}"
1107        );
1108        assert!(
1109            logs.contains("op=\"shutdown\""),
1110            "expected op field, got: {logs}"
1111        );
1112        assert!(
1113            logs.contains("signal=\"SIGTERM\""),
1114            "expected signal field, got: {logs}"
1115        );
1116    }
1117
1118    #[tokio::test]
1119    async fn auto_update_tick_disabled_when_flag_off() {
1120        let cfg = Config {
1121            auto_update_enabled: false,
1122            ..Config::default()
1123        };
1124        let logs = Arc::new(Mutex::new(Vec::new()));
1125        let decision = auto_update_tick(&cfg, false, &logs).await;
1126        assert_eq!(decision, AutoUpdateDecision::Disabled);
1127    }
1128
1129    #[tokio::test]
1130    async fn auto_update_tick_skipped_when_busy() {
1131        let cfg = Config {
1132            auto_update_enabled: true,
1133            ..Config::default()
1134        };
1135        let logs = Arc::new(Mutex::new(Vec::new()));
1136        let decision = auto_update_tick(&cfg, true, &logs).await;
1137        assert_eq!(decision, AutoUpdateDecision::SkippedBusy);
1138        let entries = logs.lock();
1139        assert!(entries.iter().any(|e| e.message.contains("busy on a job")));
1140    }
1141
1142    #[tokio::test]
1143    async fn wait_with_stop_short_circuits_when_already_stopped() {
1144        let stop = Arc::new(AtomicBool::new(true));
1145        let start = std::time::Instant::now();
1146        wait_with_stop(Duration::from_secs(60), &stop, Duration::from_millis(10)).await;
1147        assert!(
1148            start.elapsed() < Duration::from_millis(100),
1149            "an already-set stop must return without sleeping the full duration"
1150        );
1151    }
1152
1153    #[tokio::test]
1154    async fn auto_updater_stops_promptly_during_idle_wait() {
1155        // A huge auto_update_tick means a non-cancellable idle sleep
1156        // would pin the JoinHandle — and thus `run_loops`' join() — for
1157        // the whole tick after stop is set, defeating graceful
1158        // shutdown.  The stop-aware wait must let the task finish well
1159        // inside the tick.
1160        let cfg = crate::config::shared(Config {
1161            auto_update_enabled: false,
1162            ..Config::default()
1163        });
1164        let stop = Arc::new(AtomicBool::new(false));
1165        let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
1166        let busy = Arc::new(AtomicBool::new(false));
1167        let schedule = LoopSchedule {
1168            ws_session: crate::ws::session::SessionSchedule::fast_for_tests(),
1169            auto_update_tick: Duration::from_secs(3600),
1170            shutdown_tick: Duration::from_millis(1),
1171        };
1172        let handle = spawn_auto_updater(cfg, stop.clone(), logs, busy, schedule);
1173        // Let the loop reach its idle wait, then request shutdown.
1174        tokio::time::sleep(Duration::from_millis(10)).await;
1175        stop.store(true, Ordering::SeqCst);
1176        tokio::time::timeout(Duration::from_millis(250), handle)
1177            .await
1178            .expect("auto-updater did not observe stop promptly")
1179            .expect("auto-updater task panicked");
1180    }
1181}