Skip to main content

studio_worker/
runtime.rs

//! Long-running auto-update task + one-shot CLI helpers.
//!
//! After the WS migration the runtime owns just two background
//! tasks: the WebSocket session (`ws::session::spawn_ws_session`,
//! which subsumes heartbeats, claim/accept/complete, fail, and log
//! shipping) and the auto-updater (`spawn_auto_updater`).  The per-tick
//! HTTP helpers from the old polling loops are gone; the only per-tick
//! helper left in this module is `auto_update_tick`.
8use crate::{
9    config::{self, Config, SharedConfig},
10    engine::Engine,
11    sys,
12    types::*,
13    update, AGENT_VERSION,
14};
15use anyhow::{anyhow, Result};
16use chrono::{DateTime, SecondsFormat, Utc};
17use parking_lot::Mutex;
18use std::{
19    collections::VecDeque,
20    sync::{
21        atomic::{AtomicBool, Ordering},
22        Arc,
23    },
24    time::Duration,
25};
26use tracing::info;
27
/// `tracing` target attached to the runtime's structured events (the
/// startup banner and the "setting persisted" confirmations).  Kept
/// stable so operators can filter with
/// `RUST_LOG=studio_worker::runtime=debug`.
const TRACE_TARGET: &str = "studio_worker::runtime";

/// Upper bound on the number of finished jobs retained in
/// `WorkerObservers::recent_jobs`.  Once the ring is full, the oldest
/// entries are dropped from the back.
pub const RECENT_JOBS_CAP: usize = 50;

/// Character budget for the prompt previews kept in `CurrentJob` /
/// `RecentJob`.  Clipping to this many chars keeps the in-memory state
/// bounded even when LLM prompts are huge.
pub const PROMPT_PREVIEW_CHARS: usize = 200;
40
41/// Job in flight right now.  Populated by the WS session before
42/// dispatch, cleared once the job finishes (success or failure).
43#[derive(Debug, Clone)]
44pub struct CurrentJob {
45    pub job_id: String,
46    pub kind: TaskKind,
47    pub model: String,
48    pub prompt: String,
49    pub started_at: DateTime<Utc>,
50}
51
/// How a finished job ended.  A failure keeps the human-readable
/// reason (the same one already surfaced to logs + Sentry).
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum JobOutcome {
    /// The job ran to completion.
    Completed,
    /// The job failed; `reason` explains why.
    Failed { reason: String },
}
59
60/// One finished job, retained in the recent-jobs ring for the UI.
61#[derive(Debug, Clone)]
62pub struct RecentJob {
63    pub job_id: String,
64    pub kind: TaskKind,
65    pub model: String,
66    pub prompt: String,
67    pub outcome: JobOutcome,
68    pub started_at: DateTime<Utc>,
69    pub finished_at: DateTime<Utc>,
70}
71
/// Outcome of the latest heartbeat sent by the WS session.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum HeartbeatOutcome {
    /// The heartbeat succeeded.
    Ok,
    /// The heartbeat failed; `reason` explains why.
    Err { reason: String },
}
78
79#[derive(Debug, Clone)]
80pub struct HeartbeatStatus {
81    pub last_attempt_at: DateTime<Utc>,
82    pub outcome: HeartbeatOutcome,
83}
84
85/// Bundle of in-process observation slots the WS session writes to and
86/// the optional native UI reads from.  `Default` gives empty slots so
87/// existing (headless) call sites stay one-liners.  Cheap to clone —
88/// every field is an `Arc`.
89#[derive(Clone, Default)]
90pub struct WorkerObservers {
91    pub current_job: Arc<Mutex<Option<CurrentJob>>>,
92    pub recent_jobs: Arc<Mutex<VecDeque<RecentJob>>>,
93    pub last_heartbeat: Arc<Mutex<Option<HeartbeatStatus>>>,
94}
95
96pub fn truncate_prompt(s: &str) -> String {
97    if s.chars().count() <= PROMPT_PREVIEW_CHARS {
98        return s.to_string();
99    }
100    let mut out: String = s.chars().take(PROMPT_PREVIEW_CHARS).collect();
101    out.push('…');
102    out
103}
104
105pub fn record_recent_job(observers: &WorkerObservers, entry: RecentJob) {
106    let mut ring = observers.recent_jobs.lock();
107    ring.push_front(entry);
108    while ring.len() > RECENT_JOBS_CAP {
109        ring.pop_back();
110    }
111}
112
113/// Test-only helper to populate the recent-jobs ring without driving a
114/// full claim cycle.  Lives in the library surface so integration
115/// tests can pin the ring-capacity contract cheaply.
116#[doc(hidden)]
117pub fn push_recent_job_for_tests(observers: &WorkerObservers, job_id: &str) {
118    let now = Utc::now();
119    record_recent_job(
120        observers,
121        RecentJob {
122            job_id: job_id.to_string(),
123            kind: TaskKind::Image,
124            model: "synthetic".into(),
125            prompt: String::new(),
126            outcome: JobOutcome::Completed,
127            started_at: now,
128            finished_at: now,
129        },
130    );
131}
132
/// Default wake-up period of the auto-updater loop (the value used by
/// `LoopSchedule::default`).
pub const AUTO_UPDATE_TICK: Duration = Duration::from_secs(60);
/// Default WS heartbeat interval, exposed from this module so that the
/// native UI (and any other downstream reader) has a stable constant
/// and does not need to reach into `ws::session`.
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
138
139/// Schedule for the auto-updater loop.  The WS session has its own
140/// `SessionSchedule` (see `ws::session`).
141#[derive(Debug, Clone, Copy)]
142pub struct LoopSchedule {
143    pub auto_update_tick: Duration,
144}
145
146impl Default for LoopSchedule {
147    fn default() -> Self {
148        Self {
149            auto_update_tick: AUTO_UPDATE_TICK,
150        }
151    }
152}
153
154impl LoopSchedule {
155    /// Schedule with 1 ms intervals — used by tests to exercise the
156    /// loop wrappers without blocking.
157    pub fn fast_for_tests() -> Self {
158        Self {
159            auto_update_tick: Duration::from_millis(1),
160        }
161    }
162}
163
164// ---------------------------------------------------------------------------
165// One-shot helpers used by the CLI subcommands
166// ---------------------------------------------------------------------------
167
/// Flags accepted by `studio-worker register`, bundled for [`register`].
#[derive(Clone, Debug, Default)]
pub struct RegisterArgs {
    /// New API base URL to persist; `None` leaves the stored one alone.
    pub api_base_url: Option<String>,
    /// New label to persist; a blank value clears the stored label and
    /// `None` leaves it alone.
    pub label: Option<String>,
    /// When set, the locally stored registration state is wiped.
    pub reset: bool,
}
175
176/// Persist registration metadata for the next launch.  No HTTP — the
177/// auto-register orchestration inside `run` / `ui` is the only thing
178/// that talks to the studio.
179pub async fn register(config_path: Option<&str>, args: RegisterArgs) -> Result<()> {
180    let (mut cfg, path) = config::load(config_path)?;
181
182    if args.reset {
183        cfg.worker_id = None;
184        cfg.auth_token = None;
185        cfg.registration_request_id = None;
186        cfg.registration_secret = None;
187        cfg.install_id = None;
188    }
189    if let Some(url) = args.api_base_url {
190        cfg.api_base_url = url;
191    }
192    if let Some(label) = args.label {
193        cfg.label = if label.trim().is_empty() {
194            None
195        } else {
196            Some(label)
197        };
198    }
199
200    config::save(&cfg, &path)?;
201    if args.reset {
202        info!(
203            config_path = %path.display(),
204            "local registration state cleared; next launch will auto-register"
205        );
206        println!(
207            "local registration state cleared; run `studio-worker run` or \
208             `studio-worker ui` to auto-register"
209        );
210    } else {
211        info!(
212            config_path = %path.display(),
213            "register flags persisted; next launch will auto-register"
214        );
215        println!(
216            "saved; run `studio-worker run` or `studio-worker ui` to auto-register against {}",
217            cfg.api_base_url
218        );
219    }
220    Ok(())
221}
222
223pub async fn status(config_path: Option<&str>) -> Result<()> {
224    let (cfg, path) = config::load(config_path)?;
225    println!("{}", format_status(&cfg, &path));
226    Ok(())
227}
228
229pub fn format_status(cfg: &Config, path: &std::path::Path) -> String {
230    let mut out = String::new();
231    use std::fmt::Write as _;
232    let _ = writeln!(out, "config path:        {}", path.display());
233    let _ = writeln!(out, "api_base_url:       {}", cfg.api_base_url);
234    let registration_line = if cfg.worker_id.is_some() && cfg.auth_token.is_some() {
235        format!("approved as {}", cfg.worker_id.as_deref().unwrap_or(""))
236    } else if let Some(rid) = cfg.registration_request_id.as_deref() {
237        format!("pending operator approval (request {rid})")
238    } else {
239        "not registered (will auto-register on next launch)".into()
240    };
241    let _ = writeln!(out, "registration:       {registration_line}");
242    if let Some(label) = cfg.label.as_deref() {
243        let _ = writeln!(out, "label:              {label}");
244    }
245    let _ = writeln!(out, "engine:             {}", cfg.engine);
246    let _ = writeln!(out, "vram_threshold_gb:  {}", cfg.vram_threshold_gb);
247    let _ = writeln!(out, "auto_enabled:       {}", cfg.auto_enabled);
248    let _ = writeln!(out, "auto_start:         {}", cfg.auto_start);
249    let _ = writeln!(out, "auto_update:        {}", cfg.auto_update_enabled);
250    let _ = writeln!(
251        out,
252        "update_interval:    {}s",
253        cfg.auto_update_interval_secs
254    );
255    out
256}
257
258pub fn set_enabled(config_path: Option<&str>, enabled: bool) -> Result<()> {
259    let (mut cfg, path) = config::load(config_path)?;
260    cfg.auto_enabled = enabled;
261    config::save(&cfg, &path)?;
262    info!(
263        target: TRACE_TARGET,
264        op = "set_enabled",
265        auto_enabled = enabled,
266        config_path = path.display().to_string(),
267        "auto-claim flag persisted"
268    );
269    println!("auto_enabled = {enabled}");
270    Ok(())
271}
272
273pub fn set_threshold(config_path: Option<&str>, gb: f32) -> Result<()> {
274    if gb < 0.0 {
275        return Err(anyhow!("threshold must be >= 0"));
276    }
277    let (mut cfg, path) = config::load(config_path)?;
278    cfg.vram_threshold_gb = gb;
279    config::save(&cfg, &path)?;
280    info!(
281        target: TRACE_TARGET,
282        op = "set_threshold",
283        vram_threshold_gb = gb,
284        config_path = path.display().to_string(),
285        "VRAM threshold persisted"
286    );
287    println!("vram_threshold_gb = {gb}");
288    Ok(())
289}
290
291/// Emit a one-shot startup banner so operators can confirm which
292/// config the worker actually loaded.  Without this the only thing in
293/// `journalctl -u studio-worker` on a healthy boot is whatever the
294/// loops happen to log on their first tick.
295pub fn log_startup_banner(cfg: &Config, path: &std::path::Path) {
296    info!(
297        target: TRACE_TARGET,
298        op = "startup",
299        version = AGENT_VERSION,
300        config_path = path.display().to_string(),
301        api_base_url = cfg.api_base_url.as_str(),
302        engine = cfg.engine.as_str(),
303        vram_threshold_gb = cfg.vram_threshold_gb,
304        auto_enabled = cfg.auto_enabled,
305        auto_update_enabled = cfg.auto_update_enabled,
306        auto_update_interval_secs = cfg.auto_update_interval_secs,
307        worker_id = cfg.worker_id.as_deref().unwrap_or("(unregistered)"),
308        "studio-worker booting"
309    );
310}
311
312pub fn show_config(config_path: Option<&str>) -> Result<()> {
313    let (cfg, path) = config::load(config_path)?;
314    println!("# {}", path.display());
315    print!("{}", toml::to_string_pretty(&cfg)?);
316    Ok(())
317}
318
319pub async fn check_update(config_path: Option<&str>) -> Result<()> {
320    let (cfg, _) = config::load(config_path)?;
321    let current = semver::Version::parse(AGENT_VERSION)
322        .map_err(|e| anyhow!("invalid current version {AGENT_VERSION}: {e}"))?;
323    let outcome = tokio::task::spawn_blocking(move || {
324        update::check(&cfg.auto_update_feed, &current, cfg.auto_update_prerelease)
325    })
326    .await??;
327    println!("{}", format_check_outcome(&outcome));
328    Ok(())
329}
330
331pub fn format_check_outcome(outcome: &update::CheckOutcome) -> String {
332    match outcome {
333        update::CheckOutcome::UpToDate { current } => format!("up to date: {current}"),
334        update::CheckOutcome::NewerAvailable { current, latest } => {
335            format!("update available: {current} -> {latest}")
336        }
337    }
338}
339
340// ---------------------------------------------------------------------------
341// Long-running run loop
342// ---------------------------------------------------------------------------
343
344pub async fn run(config_path: Option<&str>) -> Result<()> {
345    let (cfg, path) = config::load(config_path)?;
346    log_startup_banner(&cfg, &path);
347
348    let cfg = config::shared(cfg);
349    let stop = Arc::new(AtomicBool::new(false));
350    let busy = Arc::new(AtomicBool::new(false));
351    let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
352    let observers = WorkerObservers::default();
353    let registration = crate::auto_register::shared_initial();
354
355    let stop_clone = stop.clone();
356    tokio::spawn(async move {
357        let _ = tokio::signal::ctrl_c().await;
358        stop_clone.store(true, Ordering::SeqCst);
359    });
360
361    // Block on auto-register until the operator approves (or rejects).
362    // Polls every 30s; aborts on Ctrl-C.
363    ensure_registered(&cfg, &path, &registration, &stop).await?;
364
365    run_loops(cfg, stop, logs, busy, observers, LoopSchedule::default()).await
366}
367
368/// Loop auto_register::tick on a 30s cadence until `worker_id` +
369/// `auth_token` are populated (Approved) or the operator rejects.
370pub async fn ensure_registered(
371    cfg: &SharedConfig,
372    path: &std::path::Path,
373    registration: &crate::auto_register::SharedRegistration,
374    stop: &Arc<AtomicBool>,
375) -> Result<()> {
376    use std::time::Duration;
377    loop {
378        if stop.load(Ordering::SeqCst) {
379            return Err(anyhow!("shutdown before registration completed"));
380        }
381        {
382            let snap = cfg.lock();
383            if snap.worker_id.is_some() && snap.auth_token.is_some() {
384                return Ok(());
385            }
386        }
387        let state = crate::auto_register::tick(cfg, path, registration).await;
388        match state {
389            crate::auto_register::RegistrationState::Approved => return Ok(()),
390            crate::auto_register::RegistrationState::Rejected { reason } => {
391                return Err(anyhow!(
392                    "registration rejected by the studio operator: {reason}.  \
393                     Run `studio-worker register --reset` to clear local state \
394                     and submit a fresh request."
395                ));
396            }
397            _ => {}
398        }
399        // Sleep with a fast-cancel on stop.
400        for _ in 0..30 {
401            if stop.load(Ordering::SeqCst) {
402                return Err(anyhow!("shutdown during registration wait"));
403            }
404            tokio::time::sleep(Duration::from_secs(1)).await;
405        }
406    }
407}
408
409/// Spawn the WS session + auto-updater, wait for them.  Pulled out of
410/// `run` so tests can drive with a different schedule.
411pub async fn run_loops(
412    cfg: SharedConfig,
413    stop: Arc<AtomicBool>,
414    logs: Arc<Mutex<Vec<LogEntry>>>,
415    busy: Arc<AtomicBool>,
416    observers: WorkerObservers,
417    schedule: LoopSchedule,
418) -> Result<()> {
419    let session_schedule = crate::ws::session::SessionSchedule::default();
420    let session = crate::ws::session::spawn_ws_session(
421        cfg.clone(),
422        stop.clone(),
423        logs.clone(),
424        busy.clone(),
425        observers.clone(),
426        session_schedule,
427    );
428    let auto_updater = spawn_auto_updater(
429        cfg.clone(),
430        stop.clone(),
431        logs.clone(),
432        busy.clone(),
433        schedule,
434    );
435    let (session_result, _) = tokio::join!(session, auto_updater);
436    session_result
437}
438
439// ---------------------------------------------------------------------------
440// Per-tick helpers — pure async fns, easy to drive from unit tests.
441// ---------------------------------------------------------------------------
442
443// (The old per-tick HTTP helpers — heartbeat_tick, claim_tick, log_shipper_tick,
444//  run_job, ClaimOutcome — lived here.  They are gone with the WS migration.
445//  See `ws::session::spawn_ws_session` for the replacement that runs the
446//  whole session in one connected loop.)
447
/// Verdict reached by a single auto-update tick.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum AutoUpdateDecision {
    /// Auto-update is switched off in the config; nothing was done.
    Disabled,
    /// The worker is in the middle of a job, so the check was skipped.
    SkippedBusy,
    /// The running version is already the latest one.
    UpToDate,
    /// The check itself failed (network etc.); a log entry is left and
    /// the check is retried later.  Carries the error text.
    CheckError(String),
    /// A newer version was installed successfully; the caller should
    /// restart.
    Updated,
    /// A newer version exists but installing it failed.  Carries the
    /// error text.
    UpdateError(String),
}
464
465pub async fn auto_update_tick(
466    cfg: &Config,
467    busy: bool,
468    logs: &Arc<Mutex<Vec<LogEntry>>>,
469) -> AutoUpdateDecision {
470    if !cfg.auto_update_enabled {
471        return AutoUpdateDecision::Disabled;
472    }
473    if busy {
474        push_log(
475            logs,
476            "info",
477            "auto-update",
478            "skipping check: worker is busy on a job",
479            None,
480        );
481        return AutoUpdateDecision::SkippedBusy;
482    }
483    let feed = cfg.auto_update_feed.clone();
484    let prerelease = cfg.auto_update_prerelease;
485    let logs_for_task = logs.clone();
486    let outcome = tokio::task::spawn_blocking(move || -> Result<AutoUpdateDecision> {
487        let current = semver::Version::parse(AGENT_VERSION)
488            .map_err(|e| anyhow!("invalid AGENT_VERSION {AGENT_VERSION}: {e}"))?;
489        match update::check(&feed, &current, prerelease) {
490            Ok(update::CheckOutcome::UpToDate { current }) => {
491                push_log(
492                    &logs_for_task,
493                    "info",
494                    "auto-update",
495                    &format!("up to date at {current}"),
496                    None,
497                );
498                Ok(AutoUpdateDecision::UpToDate)
499            }
500            Ok(update::CheckOutcome::NewerAvailable { current, latest }) => {
501                push_log(
502                    &logs_for_task,
503                    "info",
504                    "auto-update",
505                    &format!("update available {current} -> {latest}; applying"),
506                    None,
507                );
508                match update::apply(&feed, &latest) {
509                    Ok(()) => {
510                        push_log(
511                            &logs_for_task,
512                            "info",
513                            "auto-update",
514                            "binary replaced; restart pending",
515                            None,
516                        );
517                        Ok(AutoUpdateDecision::Updated)
518                    }
519                    Err(e) => {
520                        push_log(
521                            &logs_for_task,
522                            "error",
523                            "auto-update",
524                            &format!("update failed: {e}"),
525                            None,
526                        );
527                        Ok(AutoUpdateDecision::UpdateError(e.to_string()))
528                    }
529                }
530            }
531            Err(e) => {
532                push_log(
533                    &logs_for_task,
534                    "warn",
535                    "auto-update",
536                    &format!("check failed: {e}"),
537                    None,
538                );
539                Ok(AutoUpdateDecision::CheckError(e.to_string()))
540            }
541        }
542    })
543    .await;
544    match outcome {
545        Ok(Ok(decision)) => decision,
546        Ok(Err(e)) => AutoUpdateDecision::CheckError(e.to_string()),
547        Err(e) => AutoUpdateDecision::CheckError(e.to_string()),
548    }
549}
550
551// ---------------------------------------------------------------------------
552// Long-running task wrappers — they exist solely to call the ticks in a
553// loop on a schedule.  All real logic lives in the ticks.
554// ---------------------------------------------------------------------------
555
556// (`spawn_heartbeat`, `spawn_claim_loop`, `spawn_log_shipper`, and
557//  `next_delay_for` lived here.  Their behaviour is now carried by the
558//  WS-driven tasks in `ws::session`.)
559
560pub fn spawn_auto_updater(
561    cfg: SharedConfig,
562    stop: Arc<AtomicBool>,
563    logs: Arc<Mutex<Vec<LogEntry>>>,
564    busy: Arc<AtomicBool>,
565    schedule: LoopSchedule,
566) -> tokio::task::JoinHandle<()> {
567    tokio::spawn(async move {
568        let mut elapsed = Duration::from_secs(0);
569        while !stop.load(Ordering::SeqCst) {
570            tokio::time::sleep(schedule.auto_update_tick).await;
571            elapsed += schedule.auto_update_tick;
572            let snapshot = cfg.lock().clone();
573            if elapsed < Duration::from_secs(snapshot.auto_update_interval_secs) {
574                continue;
575            }
576            elapsed = Duration::from_secs(0);
577            let busy_now = busy.load(Ordering::SeqCst);
578            let decision = auto_update_tick(&snapshot, busy_now, &logs).await;
579            if matches!(decision, AutoUpdateDecision::Updated) {
580                stop.store(true, Ordering::SeqCst);
581                update::restart_self();
582            }
583        }
584    })
585}
586
587// (`run_job` lived here.  See `ws::session::run_offered_job` for the
588//  WS-driven replacement.)
589
590pub fn prompt_for(task: &Task) -> String {
591    match task {
592        Task::Image(p) => p.prompt.clone(),
593        Task::Llm(p) => p
594            .messages
595            .last()
596            .map(|m| m.content.clone())
597            .unwrap_or_default(),
598        Task::AudioStt(p) => p.input_url.clone(),
599        Task::AudioTts(p) => p.text.clone(),
600        Task::Video(p) => p.prompt.clone(),
601    }
602}
603
604pub fn is_unsupported_kind(e: &anyhow::Error) -> bool {
605    e.to_string().contains("cannot serve")
606}
607
608// ---------------------------------------------------------------------------
609// Helpers
610// ---------------------------------------------------------------------------
611
612pub fn build_capabilities(cfg: &Config, engine: &dyn Engine) -> WorkerCapabilities {
613    let vram = sys::detect_vram_gb().unwrap_or(0.0);
614    let caps = engine.capabilities();
615    let supported_models_per_kind = caps.supported_models_per_kind.clone();
616    let task_kinds = caps.kinds();
617    // Legacy `supported_models` is a flat list across all kinds so the
618    // studio API's claim filter (which only knows about this field) can
619    // match jobs of any modality this worker can serve.
620    let supported_models = {
621        let mut all = caps.flat_models();
622        all.sort();
623        all.dedup();
624        all
625    };
626    let supported_models = if cfg.supported_models_override.is_empty() {
627        supported_models
628    } else {
629        cfg.supported_models_override.clone()
630    };
631
632    WorkerCapabilities {
633        machine_name: sys::machine_name(),
634        username: sys::username(),
635        agent_version: AGENT_VERSION.to_string(),
636        engine: cfg.engine.clone(),
637        vram_total_gb: vram,
638        vram_threshold_gb: cfg.vram_threshold_gb,
639        auto_enabled: cfg.auto_enabled,
640        auto_start: cfg.auto_start,
641        supported_models,
642        task_kinds,
643        supported_models_per_kind,
644    }
645}
646
647pub fn push_log(
648    logs: &Arc<Mutex<Vec<LogEntry>>>,
649    level: &str,
650    category: &str,
651    message: &str,
652    job_id: Option<String>,
653) {
654    let entry = LogEntry {
655        ts: Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true),
656        level: level.to_string(),
657        category: category.to_string(),
658        message: message.to_string(),
659        job_id,
660    };
661    if level == "error" {
662        tracing::error!(target: "studio_worker", "[{category}] {message}");
663    } else if level == "warn" {
664        tracing::warn!(target: "studio_worker", "[{category}] {message}");
665    } else {
666        info!(target: "studio_worker", "[{category}] {message}");
667    }
668    logs.lock().push(entry);
669}
670
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Config;
    use crate::engine::SyntheticEngine;

    /// Fresh, empty log buffer of the shape the helpers under test expect.
    fn empty_logs() -> Arc<Mutex<Vec<LogEntry>>> {
        Arc::new(Mutex::new(Vec::new()))
    }

    /// Render `format_status` for `cfg` against a fixed dummy path.
    fn status_of(cfg: &Config) -> String {
        format_status(cfg, std::path::Path::new("/tmp/x.toml"))
    }

    /// Capabilities of a synthetic engine with no extra models under `cfg`.
    fn synthetic_capabilities(cfg: &Config) -> WorkerCapabilities {
        let engine = SyntheticEngine::new(vec![]);
        build_capabilities(cfg, &engine)
    }

    // --- capabilities ---

    #[test]
    fn capabilities_advertises_all_synthetic_kinds() {
        let cap = synthetic_capabilities(&Config::default());
        assert_eq!(cap.engine, "synthetic");
        assert_eq!(cap.task_kinds.len(), TaskKind::ALL.len());
        for kind in TaskKind::ALL {
            assert!(cap.supported_models_per_kind.contains_key(&kind));
        }
    }

    #[test]
    fn capabilities_uses_override_for_legacy_flat_list() {
        let cfg = Config {
            supported_models_override: vec!["only-this".into()],
            ..Config::default()
        };
        let cap = synthetic_capabilities(&cfg);
        assert_eq!(cap.supported_models, vec!["only-this".to_string()]);
    }

    // --- prompt extraction / error classification ---

    #[test]
    fn prompt_for_extracts_per_kind() {
        let chat = |role: &str, content: &str| ChatMessage {
            role: role.into(),
            content: content.into(),
        };

        let image = Task::Image(ImageParams {
            prompt: "a stone golem".into(),
            width: 512,
            height: 512,
            steps: 20,
            seed: None,
            ext: "webp".into(),
        });
        assert_eq!(prompt_for(&image), "a stone golem");

        let llm = Task::Llm(LlmParams {
            messages: vec![chat("system", "be helpful"), chat("user", "hi")],
            max_tokens: 32,
            temperature: 0.5,
        });
        assert_eq!(prompt_for(&llm), "hi");

        let llm_empty = Task::Llm(LlmParams {
            messages: vec![],
            max_tokens: 1,
            temperature: 0.0,
        });
        assert_eq!(prompt_for(&llm_empty), "");

        let stt = Task::AudioStt(AudioSttParams {
            input_url: "https://example.com/clip.wav".into(),
            language: None,
        });
        assert_eq!(prompt_for(&stt), "https://example.com/clip.wav");

        let tts = Task::AudioTts(AudioTtsParams {
            text: "hi there".into(),
            voice: "v".into(),
            ext: "wav".into(),
        });
        assert_eq!(prompt_for(&tts), "hi there");

        let video = Task::Video(VideoParams {
            prompt: "a tiny dragon".into(),
            seconds: 1.0,
            width: 256,
            height: 256,
            ext: "mp4".into(),
        });
        assert_eq!(prompt_for(&video), "a tiny dragon");
    }

    #[test]
    fn is_unsupported_kind_matches_engine_message() {
        let unsupported = anyhow!("gradio engine cannot serve llm tasks");
        let unrelated = anyhow!("network timeout");
        assert!(is_unsupported_kind(&unsupported));
        assert!(!is_unsupported_kind(&unrelated));
    }

    // --- status / update formatting ---

    #[test]
    fn format_status_includes_every_field() {
        let out = status_of(&Config::default());
        let expected = [
            "config path:",
            "api_base_url:",
            "registration:",
            "not registered",
            "auto_update:",
            "update_interval:",
        ];
        for needle in expected {
            assert!(out.contains(needle));
        }
    }

    #[test]
    fn format_status_shows_worker_id_when_registered() {
        let out = status_of(&Config {
            worker_id: Some("w-abc".into()),
            auth_token: Some("tok".into()),
            ..Config::default()
        });
        assert!(out.contains("w-abc"));
        assert!(out.contains("approved"));
    }

    #[test]
    fn format_status_shows_pending_request_id() {
        let out = status_of(&Config {
            registration_request_id: Some("rr-7".into()),
            ..Config::default()
        });
        assert!(out.contains("pending operator approval"));
        assert!(out.contains("rr-7"));
    }

    #[test]
    fn format_status_shows_label_when_set() {
        let out = status_of(&Config {
            label: Some("alice's rig".into()),
            ..Config::default()
        });
        assert!(out.contains("alice's rig"));
    }

    #[test]
    fn format_check_outcome_handles_both_branches() {
        let installed = semver::Version::new(1, 2, 3);

        let up_to_date = update::CheckOutcome::UpToDate {
            current: installed.clone(),
        };
        assert!(format_check_outcome(&up_to_date).contains("up to date"));

        let newer = update::CheckOutcome::NewerAvailable {
            current: installed,
            latest: semver::Version::new(1, 3, 0),
        };
        assert!(format_check_outcome(&newer).contains("1.2.3 -> 1.3.0"));
    }

    // --- log buffer ---

    #[test]
    fn push_log_appends_an_entry() {
        let logs = empty_logs();
        push_log(&logs, "info", "test", "hi", None);
        push_log(&logs, "warn", "test", "wat", Some("j-1".into()));
        push_log(&logs, "error", "test", "boom", None);

        let entries = logs.lock();
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].level, "info");
        assert_eq!(entries[1].level, "warn");
        assert_eq!(entries[1].job_id.as_deref(), Some("j-1"));
        assert_eq!(entries[2].level, "error");
    }

    // --- async tick tests ---

    #[tokio::test]
    async fn auto_update_tick_disabled_when_flag_off() {
        let cfg = Config {
            auto_update_enabled: false,
            ..Config::default()
        };
        let logs = empty_logs();
        assert_eq!(
            auto_update_tick(&cfg, false, &logs).await,
            AutoUpdateDecision::Disabled
        );
    }

    #[tokio::test]
    async fn auto_update_tick_skipped_when_busy() {
        let cfg = Config {
            auto_update_enabled: true,
            ..Config::default()
        };
        let logs = empty_logs();
        assert_eq!(
            auto_update_tick(&cfg, true, &logs).await,
            AutoUpdateDecision::SkippedBusy
        );
        let entries = logs.lock();
        assert!(entries.iter().any(|e| e.message.contains("busy on a job")));
    }
}
860        assert_eq!(decision, AutoUpdateDecision::SkippedBusy);
861        let entries = logs.lock();
862        assert!(entries.iter().any(|e| e.message.contains("busy on a job")));
863    }
864}