1use crate::{
9 config::{self, Config, SharedConfig},
10 engine::Engine,
11 sys,
12 types::*,
13 update, AGENT_VERSION,
14};
15use anyhow::{anyhow, Result};
16use chrono::{DateTime, SecondsFormat, Utc};
17use parking_lot::Mutex;
18use std::{
19 collections::VecDeque,
20 sync::{
21 atomic::{AtomicBool, Ordering},
22 Arc,
23 },
24 time::Duration,
25};
26use tracing::{info, warn};
27
28const TRACE_TARGET: &str = "studio_worker::runtime";
31
32pub const RECENT_JOBS_CAP: usize = 50;
35
36pub const RECENT_LOGS_CAP: usize = 1000;
40
41pub const PROMPT_PREVIEW_CHARS: usize = 200;
45
46#[derive(Debug, Clone)]
49pub struct CurrentJob {
50 pub job_id: String,
51 pub kind: TaskKind,
52 pub model: String,
53 pub prompt: String,
54 pub started_at: DateTime<Utc>,
55}
56
57#[derive(Debug, Clone, PartialEq, Eq)]
60pub enum JobOutcome {
61 Completed,
62 Failed { reason: String },
63}
64
65#[derive(Debug, Clone)]
67pub struct RecentJob {
68 pub job_id: String,
69 pub kind: TaskKind,
70 pub model: String,
71 pub prompt: String,
72 pub outcome: JobOutcome,
73 pub started_at: DateTime<Utc>,
74 pub finished_at: DateTime<Utc>,
75}
76
77#[derive(Debug, Clone, PartialEq, Eq)]
79pub enum HeartbeatOutcome {
80 Ok,
81 Err { reason: String },
82}
83
84#[derive(Debug, Clone)]
85pub struct HeartbeatStatus {
86 pub last_attempt_at: DateTime<Utc>,
87 pub outcome: HeartbeatOutcome,
88}
89
90#[derive(Clone, Default)]
95pub struct WorkerObservers {
96 pub current_job: Arc<Mutex<Option<CurrentJob>>>,
97 pub recent_jobs: Arc<Mutex<VecDeque<RecentJob>>>,
98 pub last_heartbeat: Arc<Mutex<Option<HeartbeatStatus>>>,
99 pub recent_logs: Arc<Mutex<VecDeque<LogEntry>>>,
104}
105
106pub fn truncate_prompt(s: &str) -> String {
107 if s.chars().count() <= PROMPT_PREVIEW_CHARS {
108 return s.to_string();
109 }
110 let mut out: String = s.chars().take(PROMPT_PREVIEW_CHARS).collect();
111 out.push('…');
112 out
113}
114
115pub fn record_recent_job(observers: &WorkerObservers, entry: RecentJob) {
116 let mut ring = observers.recent_jobs.lock();
117 ring.push_front(entry);
118 while ring.len() > RECENT_JOBS_CAP {
119 ring.pop_back();
120 }
121}
122
123#[doc(hidden)]
127pub fn push_recent_job_for_tests(observers: &WorkerObservers, job_id: &str) {
128 let now = Utc::now();
129 record_recent_job(
130 observers,
131 RecentJob {
132 job_id: job_id.to_string(),
133 kind: TaskKind::Image,
134 model: "synthetic".into(),
135 prompt: String::new(),
136 outcome: JobOutcome::Completed,
137 started_at: now,
138 finished_at: now,
139 },
140 );
141}
142
143pub const AUTO_UPDATE_TICK: Duration = Duration::from_secs(60);
144pub const AUTO_UPDATE_SHUTDOWN_TICK: Duration = Duration::from_millis(250);
150pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
154
155#[derive(Debug, Clone, Copy)]
157pub struct LoopSchedule {
158 pub ws_session: crate::ws::session::SessionSchedule,
159 pub auto_update_tick: Duration,
160 pub shutdown_tick: Duration,
164}
165
166impl Default for LoopSchedule {
167 fn default() -> Self {
168 Self {
169 ws_session: crate::ws::session::SessionSchedule::default(),
170 auto_update_tick: AUTO_UPDATE_TICK,
171 shutdown_tick: AUTO_UPDATE_SHUTDOWN_TICK,
172 }
173 }
174}
175
176impl LoopSchedule {
177 pub fn fast_for_tests() -> Self {
180 Self {
181 ws_session: crate::ws::session::SessionSchedule::fast_for_tests(),
182 auto_update_tick: Duration::from_millis(1),
183 shutdown_tick: Duration::from_millis(1),
184 }
185 }
186}
187
188#[derive(Debug, Clone, Default)]
194pub struct RegisterArgs {
195 pub api_base_url: Option<String>,
196 pub reset: bool,
197}
198
199pub async fn register(config_path: Option<&str>, args: RegisterArgs) -> Result<()> {
203 let (mut cfg, path) = config::load(config_path)?;
204
205 if args.reset {
206 cfg.worker_id = None;
207 cfg.auth_token = None;
208 cfg.registration_request_id = None;
209 cfg.registration_secret = None;
210 cfg.install_id = None;
211 }
212 if let Some(url) = args.api_base_url {
213 cfg.api_base_url = url;
214 }
215
216 config::save(&cfg, &path)?;
217 if args.reset {
218 info!(
219 config_path = %path.display(),
220 "local registration state cleared; next launch will auto-register"
221 );
222 println!(
223 "local registration state cleared; run `studio-worker run` or \
224 `studio-worker ui` to auto-register"
225 );
226 } else {
227 info!(
228 config_path = %path.display(),
229 "register flags persisted; next launch will auto-register"
230 );
231 println!(
232 "saved; run `studio-worker run` or `studio-worker ui` to auto-register against {}",
233 cfg.api_base_url
234 );
235 }
236 Ok(())
237}
238
239pub async fn status(config_path: Option<&str>) -> Result<()> {
240 let (cfg, path) = config::load(config_path)?;
241 println!("{}", format_status(&cfg, &path));
242 Ok(())
243}
244
245pub fn format_status(cfg: &Config, path: &std::path::Path) -> String {
246 let mut out = String::new();
247 use std::fmt::Write as _;
248 let _ = writeln!(out, "config path: {}", path.display());
249 let _ = writeln!(out, "api_base_url: {}", cfg.api_base_url);
250 let registration_line = if cfg.worker_id.is_some() && cfg.auth_token.is_some() {
251 format!("approved as {}", cfg.worker_id.as_deref().unwrap_or(""))
252 } else if let Some(rid) = cfg.registration_request_id.as_deref() {
253 format!("pending operator approval (request {rid})")
254 } else {
255 "not registered (will auto-register on next launch)".into()
256 };
257 let _ = writeln!(out, "registration: {registration_line}");
258 let _ = writeln!(out, "vram_threshold_gb: {}", cfg.vram_threshold_gb);
259 let _ = writeln!(out, "auto_start: {}", cfg.auto_start);
260 let _ = writeln!(out, "models_root: {}", cfg.models_root.display());
261 let _ = writeln!(out, "auto_update: {}", cfg.auto_update_enabled);
262 let _ = writeln!(
263 out,
264 "update_interval: {}s",
265 cfg.auto_update_interval_secs
266 );
267 out
268}
269
270pub fn set_threshold(config_path: Option<&str>, gb: f32) -> Result<()> {
271 if gb < 0.0 {
272 return Err(anyhow!("threshold must be >= 0"));
273 }
274 let (mut cfg, path) = config::load(config_path)?;
275 cfg.vram_threshold_gb = gb;
276 config::save(&cfg, &path)?;
277 info!(
278 target: TRACE_TARGET,
279 op = "set_threshold",
280 vram_threshold_gb = gb,
281 config_path = path.display().to_string(),
282 "VRAM threshold persisted"
283 );
284 println!("vram_threshold_gb = {gb}");
285 Ok(())
286}
287
288pub fn log_startup_banner(cfg: &Config, path: &std::path::Path) {
293 info!(
294 target: TRACE_TARGET,
295 op = "startup",
296 version = AGENT_VERSION,
297 config_path = path.display().to_string(),
298 api_base_url = cfg.api_base_url.as_str(),
299 vram_threshold_gb = cfg.vram_threshold_gb,
300 auto_start = cfg.auto_start,
301 auto_update_enabled = cfg.auto_update_enabled,
302 auto_update_interval_secs = cfg.auto_update_interval_secs,
303 models_root = cfg.models_root.display().to_string(),
304 worker_id = cfg.worker_id.as_deref().unwrap_or("(unregistered)"),
305 "studio-worker booting"
306 );
307}
308
309pub fn show_config(config_path: Option<&str>) -> Result<()> {
310 let (cfg, path) = config::load(config_path)?;
311 println!("# {}", path.display());
312 print!("{}", toml::to_string_pretty(&cfg)?);
313 Ok(())
314}
315
316pub async fn check_update(config_path: Option<&str>) -> Result<()> {
317 let (cfg, _) = config::load(config_path)?;
318 let current = semver::Version::parse(AGENT_VERSION)
319 .map_err(|e| anyhow!("invalid current version {AGENT_VERSION}: {e}"))?;
320 let outcome = tokio::task::spawn_blocking(move || {
321 update::check(&cfg.auto_update_feed, ¤t, cfg.auto_update_prerelease)
322 })
323 .await??;
324 println!("{}", format_check_outcome(&outcome));
325 Ok(())
326}
327
328pub fn format_check_outcome(outcome: &update::CheckOutcome) -> String {
329 match outcome {
330 update::CheckOutcome::UpToDate { current } => format!("up to date: {current}"),
331 update::CheckOutcome::NewerAvailable { current, latest } => {
332 format!("update available: {current} -> {latest}")
333 }
334 }
335}
336
337pub async fn run(config_path: Option<&str>) -> Result<()> {
342 let (cfg, path) = config::load(config_path)?;
343 log_startup_banner(&cfg, &path);
344
345 let cfg = config::shared(cfg);
346 let stop = Arc::new(AtomicBool::new(false));
347 let busy = Arc::new(AtomicBool::new(false));
348 let paused = Arc::new(AtomicBool::new(false));
351 let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
352 let observers = WorkerObservers::default();
353 let registration = crate::auto_register::shared_initial();
354
355 let stop_clone = stop.clone();
356 tokio::spawn(async move {
357 let signal = wait_for_shutdown_signal().await;
358 request_shutdown(&stop_clone, signal);
359 });
360
361 ensure_registered(&cfg, &path, ®istration, &stop).await?;
364
365 run_loops(
366 cfg,
367 stop,
368 logs,
369 busy,
370 paused,
371 observers,
372 LoopSchedule::default(),
373 )
374 .await
375}
376
377pub fn request_shutdown(stop: &AtomicBool, signal: &str) {
383 let already_stopping = stop.swap(true, Ordering::SeqCst);
384 info!(
385 target: TRACE_TARGET,
386 op = "shutdown",
387 signal,
388 already_stopping,
389 "shutdown signal received; stopping worker gracefully"
390 );
391}
392
393#[cfg_attr(coverage_nightly, coverage(off))]
409async fn wait_for_shutdown_signal() -> &'static str {
410 #[cfg(unix)]
411 {
412 use tokio::signal::unix::{signal, SignalKind};
413 let mut sigterm = match signal(SignalKind::terminate()) {
414 Ok(s) => s,
415 Err(e) => {
416 warn!(
417 target: TRACE_TARGET,
418 op = "shutdown",
419 error = %e,
420 "could not install SIGTERM handler; falling back to Ctrl-C only"
421 );
422 let _ = tokio::signal::ctrl_c().await;
423 return "SIGINT";
424 }
425 };
426 tokio::select! {
427 _ = tokio::signal::ctrl_c() => "SIGINT",
428 _ = sigterm.recv() => "SIGTERM",
429 }
430 }
431 #[cfg(not(unix))]
432 {
433 let _ = tokio::signal::ctrl_c().await;
434 "ctrl-c"
435 }
436}
437
438pub async fn ensure_registered(
441 cfg: &SharedConfig,
442 path: &std::path::Path,
443 registration: &crate::auto_register::SharedRegistration,
444 stop: &Arc<AtomicBool>,
445) -> Result<()> {
446 use std::time::Duration;
447 loop {
448 if stop.load(Ordering::SeqCst) {
449 return Err(anyhow!("shutdown before registration completed"));
450 }
451 {
452 let snap = cfg.lock();
453 if snap.worker_id.is_some() && snap.auth_token.is_some() {
454 return Ok(());
455 }
456 }
457 let state = crate::auto_register::tick(cfg, path, registration).await;
458 match state {
459 crate::auto_register::RegistrationState::Approved => return Ok(()),
460 crate::auto_register::RegistrationState::Rejected { reason } => {
461 return Err(anyhow!(
462 "registration rejected by the studio operator: {reason}. \
463 Run `studio-worker register --reset` to clear local state \
464 and submit a fresh request."
465 ));
466 }
467 _ => {}
468 }
469 for _ in 0..30 {
471 if stop.load(Ordering::SeqCst) {
472 return Err(anyhow!("shutdown during registration wait"));
473 }
474 tokio::time::sleep(Duration::from_secs(1)).await;
475 }
476 }
477}
478
479pub async fn run_loops(
487 cfg: SharedConfig,
488 stop: Arc<AtomicBool>,
489 logs: Arc<Mutex<Vec<LogEntry>>>,
490 busy: Arc<AtomicBool>,
491 paused: Arc<AtomicBool>,
492 observers: WorkerObservers,
493 schedule: LoopSchedule,
494) -> Result<()> {
495 let session = crate::ws::session::spawn_ws_session(
496 cfg.clone(),
497 stop.clone(),
498 logs.clone(),
499 busy.clone(),
500 paused.clone(),
501 observers.clone(),
502 schedule.ws_session,
503 );
504 let auto_updater = spawn_auto_updater(
505 cfg.clone(),
506 stop.clone(),
507 logs.clone(),
508 busy.clone(),
509 schedule,
510 );
511 let (session_result, _) = tokio::join!(session, auto_updater);
512 session_result
513}
514
515#[derive(Debug, Clone, PartialEq, Eq)]
526pub enum AutoUpdateDecision {
527 Disabled,
529 SkippedBusy,
531 UpToDate,
533 CheckError(String),
535 Updated,
537 UpdateError(String),
539}
540
541pub async fn auto_update_tick(
542 cfg: &Config,
543 busy: bool,
544 logs: &Arc<Mutex<Vec<LogEntry>>>,
545) -> AutoUpdateDecision {
546 if !cfg.auto_update_enabled {
547 return AutoUpdateDecision::Disabled;
548 }
549 if busy {
550 push_log(
551 logs,
552 "info",
553 "auto-update",
554 "skipping check: worker is busy on a job",
555 None,
556 );
557 return AutoUpdateDecision::SkippedBusy;
558 }
559 let feed = cfg.auto_update_feed.clone();
560 let prerelease = cfg.auto_update_prerelease;
561 let logs_for_task = logs.clone();
562 let outcome = tokio::task::spawn_blocking(move || -> Result<AutoUpdateDecision> {
563 let current = semver::Version::parse(AGENT_VERSION)
564 .map_err(|e| anyhow!("invalid AGENT_VERSION {AGENT_VERSION}: {e}"))?;
565 match update::check(&feed, ¤t, prerelease) {
566 Ok(update::CheckOutcome::UpToDate { current }) => {
567 push_log(
568 &logs_for_task,
569 "info",
570 "auto-update",
571 &format!("up to date at {current}"),
572 None,
573 );
574 Ok(AutoUpdateDecision::UpToDate)
575 }
576 Ok(update::CheckOutcome::NewerAvailable { current, latest }) => {
577 push_log(
578 &logs_for_task,
579 "info",
580 "auto-update",
581 &format!("update available {current} -> {latest}; applying"),
582 None,
583 );
584 match update::apply(&feed, &latest) {
585 Ok(()) => {
586 push_log(
587 &logs_for_task,
588 "info",
589 "auto-update",
590 "binary replaced; restart pending",
591 None,
592 );
593 Ok(AutoUpdateDecision::Updated)
594 }
595 Err(e) => {
596 push_log(
597 &logs_for_task,
598 "error",
599 "auto-update",
600 &format!("update failed: {e}"),
601 None,
602 );
603 Ok(AutoUpdateDecision::UpdateError(e.to_string()))
604 }
605 }
606 }
607 Err(e) => {
608 push_log(
609 &logs_for_task,
610 "warn",
611 "auto-update",
612 &format!("check failed: {e}"),
613 None,
614 );
615 Ok(AutoUpdateDecision::CheckError(e.to_string()))
616 }
617 }
618 })
619 .await;
620 match outcome {
621 Ok(Ok(decision)) => decision,
622 Ok(Err(e)) => AutoUpdateDecision::CheckError(e.to_string()),
623 Err(e) => AutoUpdateDecision::CheckError(e.to_string()),
624 }
625}
626
627pub(crate) async fn wait_with_stop(total: Duration, stop: &Arc<AtomicBool>, tick: Duration) {
642 let mut elapsed = Duration::ZERO;
643 while elapsed < total {
644 if stop.load(Ordering::SeqCst) {
645 return;
646 }
647 let next = tick.min(total - elapsed);
648 tokio::time::sleep(next).await;
649 elapsed += next;
650 }
651}
652
653pub fn spawn_auto_updater(
654 cfg: SharedConfig,
655 stop: Arc<AtomicBool>,
656 logs: Arc<Mutex<Vec<LogEntry>>>,
657 busy: Arc<AtomicBool>,
658 schedule: LoopSchedule,
659) -> tokio::task::JoinHandle<()> {
660 tokio::spawn(async move {
661 let mut elapsed = Duration::from_secs(0);
662 while !stop.load(Ordering::SeqCst) {
663 wait_with_stop(schedule.auto_update_tick, &stop, schedule.shutdown_tick).await;
668 if stop.load(Ordering::SeqCst) {
669 break;
670 }
671 elapsed += schedule.auto_update_tick;
672 let snapshot = cfg.lock().clone();
673 if elapsed < Duration::from_secs(snapshot.auto_update_interval_secs) {
674 continue;
675 }
676 elapsed = Duration::from_secs(0);
677 let busy_now = busy.load(Ordering::SeqCst);
678 let decision = auto_update_tick(&snapshot, busy_now, &logs).await;
679 if matches!(decision, AutoUpdateDecision::Updated) {
680 stop.store(true, Ordering::SeqCst);
681 update::restart_self();
682 }
683 }
684 })
685}
686
687pub fn prompt_for(task: &Task) -> String {
691 match task {
692 Task::Image(p) => p.prompt.clone(),
693 Task::Llm(p) => p
694 .messages
695 .last()
696 .map(|m| m.content.clone())
697 .unwrap_or_default(),
698 Task::AudioStt(p) => p.input_url.clone(),
699 Task::AudioTts(p) => p.text.clone(),
700 Task::Video(p) => p.prompt.clone(),
701 }
702}
703
704pub fn is_unsupported_kind(e: &anyhow::Error) -> bool {
705 e.to_string().contains("cannot serve")
706}
707
708pub fn build_capabilities(cfg: &Config, engine: &dyn Engine) -> WorkerCapabilities {
713 build_capabilities_with(cfg, engine, true)
714}
715
716pub fn build_capabilities_with(
721 cfg: &Config,
722 engine: &dyn Engine,
723 auto_enabled: bool,
724) -> WorkerCapabilities {
725 let vram = sys::detect_vram_gb().unwrap_or(0.0);
726 let caps = engine.capabilities();
727 let supported_models_per_kind = caps.supported_models_per_kind.clone();
728 let task_kinds = caps.kinds();
729 let supported_models = {
733 let mut all = caps.flat_models();
734 all.sort();
735 all.dedup();
736 all
737 };
738
739 WorkerCapabilities {
740 machine_name: sys::machine_name(),
741 username: sys::username(),
742 agent_version: AGENT_VERSION.to_string(),
743 engine: engine.name().to_string(),
744 vram_total_gb: vram,
745 vram_threshold_gb: cfg.vram_threshold_gb,
746 auto_enabled,
747 auto_start: cfg.auto_start,
748 supported_models,
749 task_kinds,
750 supported_models_per_kind,
751 }
752}
753
754pub fn summarize_capabilities(caps: &WorkerCapabilities) -> String {
764 let kinds = caps
765 .task_kinds
766 .iter()
767 .map(|k| k.as_str())
768 .collect::<Vec<_>>()
769 .join(", ");
770 format!(
771 "advertising engine={}, vram={:.1}/{:.1}GB threshold, auto_enabled={}, \
772 kinds=[{}], {} model(s)=[{}]",
773 caps.engine,
774 caps.vram_total_gb,
775 caps.vram_threshold_gb,
776 caps.auto_enabled,
777 kinds,
778 caps.supported_models.len(),
779 caps.supported_models.join(", "),
780 )
781}
782
783pub fn push_log(
784 logs: &Arc<Mutex<Vec<LogEntry>>>,
785 level: &str,
786 category: &str,
787 message: &str,
788 job_id: Option<String>,
789) {
790 push_log_with_observers(logs, None, level, category, message, job_id);
791}
792
793pub fn push_log_with_observers(
799 logs: &Arc<Mutex<Vec<LogEntry>>>,
800 observers: Option<&WorkerObservers>,
801 level: &str,
802 category: &str,
803 message: &str,
804 job_id: Option<String>,
805) {
806 let entry = LogEntry {
807 ts: Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true),
808 level: level.to_string(),
809 category: category.to_string(),
810 message: message.to_string(),
811 job_id,
812 };
813 let job_id = entry.job_id.as_deref();
818 if level == "error" {
819 tracing::error!(target: "studio_worker", job_id, "[{category}] {message}");
820 } else if level == "warn" {
821 tracing::warn!(target: "studio_worker", job_id, "[{category}] {message}");
822 } else {
823 info!(target: "studio_worker", job_id, "[{category}] {message}");
824 }
825 logs.lock().push(entry.clone());
826 if let Some(o) = observers {
827 let mut ring = o.recent_logs.lock();
828 ring.push_back(entry);
829 while ring.len() > RECENT_LOGS_CAP {
830 ring.pop_front();
831 }
832 }
833}
834
835#[cfg(test)]
836mod tests {
837 use super::*;
838 use crate::config::Config;
839 use crate::engine::SyntheticEngine;
840
841 #[test]
842 fn capabilities_advertises_all_synthetic_kinds() {
843 let cfg = Config::default();
844 let engine = SyntheticEngine::new();
845 let cap = build_capabilities(&cfg, &engine);
846 assert_eq!(cap.engine, "synthetic");
847 assert_eq!(cap.task_kinds.len(), TaskKind::ALL.len());
848 assert!(cap.auto_enabled, "default capability snapshot is unpaused");
849 for kind in TaskKind::ALL {
850 assert!(cap.supported_models_per_kind.contains_key(&kind));
851 }
852 }
853
854 #[test]
855 fn capabilities_with_paused_flag_drives_auto_enabled() {
856 let cfg = Config::default();
857 let engine = SyntheticEngine::new();
858 let paused_caps = build_capabilities_with(&cfg, &engine, false);
859 assert!(!paused_caps.auto_enabled);
860 }
861
862 #[test]
863 fn summarize_capabilities_lists_engine_kinds_models_vram_and_pause_state() {
864 let cfg = Config {
865 vram_threshold_gb: 6.0,
866 ..Config::default()
867 };
868 let engine = SyntheticEngine::new();
869 let caps = build_capabilities_with(&cfg, &engine, true);
870 let summary = summarize_capabilities(&caps);
871 assert!(summary.contains("engine=synthetic"), "got: {summary}");
873 for kind in &caps.task_kinds {
874 assert!(
875 summary.contains(kind.as_str()),
876 "missing kind {} in: {summary}",
877 kind.as_str()
878 );
879 }
880 assert!(
882 summary.contains(&format!("{} model(s)", caps.supported_models.len())),
883 "missing model count in: {summary}"
884 );
885 assert!(
886 summary.contains("synthetic"),
887 "missing model id in: {summary}"
888 );
889 assert!(
891 summary.contains("6.0"),
892 "missing vram threshold in: {summary}"
893 );
894 assert!(summary.contains("auto_enabled=true"), "got: {summary}");
895 }
896
897 #[test]
898 fn summarize_capabilities_reflects_paused_state() {
899 let cfg = Config::default();
900 let engine = SyntheticEngine::new();
901 let caps = build_capabilities_with(&cfg, &engine, false);
902 assert!(
903 summarize_capabilities(&caps).contains("auto_enabled=false"),
904 "paused worker must advertise auto_enabled=false"
905 );
906 }
907
908 #[test]
909 fn prompt_for_extracts_per_kind() {
910 let image = Task::Image(ImageParams {
911 prompt: "a stone golem".into(),
912 ..Default::default()
913 });
914 assert_eq!(prompt_for(&image), "a stone golem");
915
916 let llm = Task::Llm(LlmParams {
917 messages: vec![
918 ChatMessage {
919 role: "system".into(),
920 content: "be helpful".into(),
921 },
922 ChatMessage {
923 role: "user".into(),
924 content: "hi".into(),
925 },
926 ],
927 max_tokens: 32,
928 temperature: 0.5,
929 ..Default::default()
930 });
931 assert_eq!(prompt_for(&llm), "hi");
932
933 let llm_empty = Task::Llm(LlmParams {
934 messages: vec![],
935 ..Default::default()
936 });
937 assert_eq!(prompt_for(&llm_empty), "");
938
939 let stt = Task::AudioStt(AudioSttParams {
940 input_url: "https://example.com/clip.wav".into(),
941 ..Default::default()
942 });
943 assert_eq!(prompt_for(&stt), "https://example.com/clip.wav");
944
945 let tts = Task::AudioTts(AudioTtsParams {
946 text: "hi there".into(),
947 voice: "v".into(),
948 ext: "wav".into(),
949 ..Default::default()
950 });
951 assert_eq!(prompt_for(&tts), "hi there");
952
953 let video = Task::Video(VideoParams {
954 prompt: "a tiny dragon".into(),
955 seconds: 1.0,
956 width: 256,
957 height: 256,
958 ext: "mp4".into(),
959 ..Default::default()
960 });
961 assert_eq!(prompt_for(&video), "a tiny dragon");
962 }
963
964 #[test]
965 fn is_unsupported_kind_matches_engine_message() {
966 let err = anyhow!("multi engine cannot serve llm tasks");
967 assert!(is_unsupported_kind(&err));
968 let other = anyhow!("network timeout");
969 assert!(!is_unsupported_kind(&other));
970 }
971
972 #[test]
973 fn format_status_includes_every_field() {
974 let cfg = Config::default();
975 let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
976 assert!(out.contains("config path:"));
977 assert!(out.contains("api_base_url:"));
978 assert!(out.contains("registration:"));
979 assert!(out.contains("not registered"));
980 assert!(out.contains("models_root:"));
981 assert!(out.contains("auto_update:"));
982 assert!(out.contains("update_interval:"));
983 }
984
985 #[test]
986 fn format_status_shows_worker_id_when_registered() {
987 let cfg = Config {
988 worker_id: Some("w-abc".into()),
989 auth_token: Some("tok".into()),
990 ..Config::default()
991 };
992 let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
993 assert!(out.contains("w-abc"));
994 assert!(out.contains("approved"));
995 }
996
997 #[test]
998 fn format_status_shows_pending_request_id() {
999 let cfg = Config {
1000 registration_request_id: Some("rr-7".into()),
1001 ..Config::default()
1002 };
1003 let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
1004 assert!(out.contains("pending operator approval"));
1005 assert!(out.contains("rr-7"));
1006 }
1007
1008 #[test]
1009 fn format_check_outcome_handles_both_branches() {
1010 let up = update::CheckOutcome::UpToDate {
1011 current: semver::Version::new(1, 2, 3),
1012 };
1013 assert!(format_check_outcome(&up).contains("up to date"));
1014 let newer = update::CheckOutcome::NewerAvailable {
1015 current: semver::Version::new(1, 2, 3),
1016 latest: semver::Version::new(1, 3, 0),
1017 };
1018 let s = format_check_outcome(&newer);
1019 assert!(s.contains("1.2.3 -> 1.3.0"));
1020 }
1021
1022 #[test]
1023 fn push_log_appends_an_entry() {
1024 let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
1025 push_log(&logs, "info", "test", "hi", None);
1026 push_log(&logs, "warn", "test", "wat", Some("j-1".into()));
1027 push_log(&logs, "error", "test", "boom", None);
1028 let v = logs.lock();
1029 assert_eq!(v.len(), 3);
1030 assert_eq!(v[0].level, "info");
1031 assert_eq!(v[1].level, "warn");
1032 assert_eq!(v[1].job_id.as_deref(), Some("j-1"));
1033 assert_eq!(v[2].level, "error");
1034 }
1035
1036 #[test]
1037 fn push_log_emits_job_id_as_a_structured_tracing_field() {
1038 use crate::test_support::capture;
1043 let logs = capture(|| {
1044 let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
1045 push_log(
1046 &logs,
1047 "info",
1048 "ws",
1049 "binary upload ok",
1050 Some("job-42".into()),
1051 );
1052 });
1053 assert!(
1054 logs.contains("job_id=\"job-42\""),
1055 "expected structured job_id field, got: {logs}"
1056 );
1057 assert!(
1058 logs.contains("[ws] binary upload ok"),
1059 "expected the human-readable message to survive, got: {logs}"
1060 );
1061 }
1062
1063 #[test]
1064 fn push_log_omits_job_id_field_when_absent() {
1065 use crate::test_support::capture;
1068 let logs = capture(|| {
1069 let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
1070 push_log(&logs, "info", "auto-update", "up to date", None);
1071 });
1072 assert!(
1073 !logs.contains("job_id"),
1074 "expected no job_id field for a jobless log, got: {logs}"
1075 );
1076 }
1077
1078 #[test]
1081 fn request_shutdown_sets_the_stop_flag() {
1082 let stop = AtomicBool::new(false);
1083 request_shutdown(&stop, "SIGTERM");
1084 assert!(stop.load(Ordering::SeqCst));
1085 }
1086
1087 #[test]
1088 fn request_shutdown_reconfirms_when_already_stopping() {
1089 let stop = AtomicBool::new(true);
1092 request_shutdown(&stop, "SIGINT");
1093 assert!(stop.load(Ordering::SeqCst));
1094 }
1095
1096 #[test]
1097 fn request_shutdown_emits_a_named_shutdown_breadcrumb() {
1098 use crate::test_support::capture;
1099 let logs = capture(|| {
1100 let stop = AtomicBool::new(false);
1101 request_shutdown(&stop, "SIGTERM");
1102 });
1103 assert!(logs.contains("INFO"), "expected INFO event, got: {logs}");
1104 assert!(
1105 logs.contains("studio_worker::runtime"),
1106 "expected runtime target, got: {logs}"
1107 );
1108 assert!(
1109 logs.contains("op=\"shutdown\""),
1110 "expected op field, got: {logs}"
1111 );
1112 assert!(
1113 logs.contains("signal=\"SIGTERM\""),
1114 "expected signal field, got: {logs}"
1115 );
1116 }
1117
1118 #[tokio::test]
1119 async fn auto_update_tick_disabled_when_flag_off() {
1120 let cfg = Config {
1121 auto_update_enabled: false,
1122 ..Config::default()
1123 };
1124 let logs = Arc::new(Mutex::new(Vec::new()));
1125 let decision = auto_update_tick(&cfg, false, &logs).await;
1126 assert_eq!(decision, AutoUpdateDecision::Disabled);
1127 }
1128
1129 #[tokio::test]
1130 async fn auto_update_tick_skipped_when_busy() {
1131 let cfg = Config {
1132 auto_update_enabled: true,
1133 ..Config::default()
1134 };
1135 let logs = Arc::new(Mutex::new(Vec::new()));
1136 let decision = auto_update_tick(&cfg, true, &logs).await;
1137 assert_eq!(decision, AutoUpdateDecision::SkippedBusy);
1138 let entries = logs.lock();
1139 assert!(entries.iter().any(|e| e.message.contains("busy on a job")));
1140 }
1141
1142 #[tokio::test]
1143 async fn wait_with_stop_short_circuits_when_already_stopped() {
1144 let stop = Arc::new(AtomicBool::new(true));
1145 let start = std::time::Instant::now();
1146 wait_with_stop(Duration::from_secs(60), &stop, Duration::from_millis(10)).await;
1147 assert!(
1148 start.elapsed() < Duration::from_millis(100),
1149 "an already-set stop must return without sleeping the full duration"
1150 );
1151 }
1152
1153 #[tokio::test]
1154 async fn auto_updater_stops_promptly_during_idle_wait() {
1155 let cfg = crate::config::shared(Config {
1161 auto_update_enabled: false,
1162 ..Config::default()
1163 });
1164 let stop = Arc::new(AtomicBool::new(false));
1165 let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
1166 let busy = Arc::new(AtomicBool::new(false));
1167 let schedule = LoopSchedule {
1168 ws_session: crate::ws::session::SessionSchedule::fast_for_tests(),
1169 auto_update_tick: Duration::from_secs(3600),
1170 shutdown_tick: Duration::from_millis(1),
1171 };
1172 let handle = spawn_auto_updater(cfg, stop.clone(), logs, busy, schedule);
1173 tokio::time::sleep(Duration::from_millis(10)).await;
1175 stop.store(true, Ordering::SeqCst);
1176 tokio::time::timeout(Duration::from_millis(250), handle)
1177 .await
1178 .expect("auto-updater did not observe stop promptly")
1179 .expect("auto-updater task panicked");
1180 }
1181}