1use crate::{
9 config::{self, Config, SharedConfig},
10 engine::Engine,
11 sys,
12 types::*,
13 update, AGENT_VERSION,
14};
15use anyhow::{anyhow, Result};
16use chrono::{DateTime, SecondsFormat, Utc};
17use parking_lot::Mutex;
18use std::{
19 collections::VecDeque,
20 sync::{
21 atomic::{AtomicBool, Ordering},
22 Arc,
23 },
24 time::Duration,
25};
26use tracing::{info, warn};
27
/// `tracing` target passed to the structured lifecycle events in this module
/// (startup, shutdown, `set_threshold`).
const TRACE_TARGET: &str = "studio_worker::runtime";

/// Maximum number of entries [`record_recent_job`] keeps in the recent-jobs ring.
pub const RECENT_JOBS_CAP: usize = 50;

/// Maximum number of entries [`push_log_with_observers`] keeps in the
/// `recent_logs` ring.
pub const RECENT_LOGS_CAP: usize = 1000;

/// Number of characters [`truncate_prompt`] keeps before appending an ellipsis.
pub const PROMPT_PREVIEW_CHARS: usize = 200;

/// Upper bound on the log ship queue, enforced by [`push_log_with_observers`]
/// and [`restore_unshipped`]; the oldest entries are dropped beyond it.
pub const LOG_SHIP_QUEUE_CAP: usize = 5_000;
53
/// Details of the current job, as stored in [`WorkerObservers::current_job`].
#[derive(Debug, Clone)]
pub struct CurrentJob {
    /// Identifier of the job.
    pub job_id: String,
    /// Task kind of the job.
    pub kind: TaskKind,
    /// Name of the model the job uses.
    pub model: String,
    /// Prompt text for the job.
    // NOTE(review): presumably already clipped with `truncate_prompt` — confirm at the call site.
    pub prompt: String,
    /// UTC timestamp at which the job started.
    pub started_at: DateTime<Utc>,
}
64
/// Final result of a job, recorded on [`RecentJob::outcome`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobOutcome {
    /// The job finished successfully.
    Completed,
    /// The job failed; `reason` carries a textual explanation.
    Failed { reason: String },
}
72
/// One finished job, as kept in the [`WorkerObservers::recent_jobs`] ring by
/// [`record_recent_job`].
#[derive(Debug, Clone)]
pub struct RecentJob {
    /// Identifier of the job.
    pub job_id: String,
    /// Task kind of the job.
    pub kind: TaskKind,
    /// Name of the model the job used.
    pub model: String,
    /// Prompt text for the job.
    pub prompt: String,
    /// Whether the job completed or failed.
    pub outcome: JobOutcome,
    /// UTC timestamp at which the job started.
    pub started_at: DateTime<Utc>,
    /// UTC timestamp at which the job finished.
    pub finished_at: DateTime<Utc>,
}
84
/// Result of a single heartbeat attempt, carried by [`HeartbeatStatus`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HeartbeatOutcome {
    /// The attempt succeeded.
    Ok,
    /// The attempt failed; `reason` carries a textual explanation.
    Err { reason: String },
}
91
/// Timestamp and outcome of a heartbeat attempt, as stored in
/// [`WorkerObservers::last_heartbeat`].
#[derive(Debug, Clone)]
pub struct HeartbeatStatus {
    /// UTC timestamp of the attempt.
    pub last_attempt_at: DateTime<Utc>,
    /// Whether that attempt succeeded or failed.
    pub outcome: HeartbeatOutcome,
}
97
/// Bundle of shared observation state.
///
/// Every field is an `Arc<Mutex<..>>`, so cloning the struct yields handles to
/// the same underlying data. `Default` starts with no current job, no
/// heartbeat, and empty rings.
#[derive(Clone, Default)]
pub struct WorkerObservers {
    /// The current job, or `None` when there is none.
    pub current_job: Arc<Mutex<Option<CurrentJob>>>,
    /// Ring of finished jobs, newest at the front (see [`record_recent_job`]).
    pub recent_jobs: Arc<Mutex<VecDeque<RecentJob>>>,
    /// Status of the most recent heartbeat attempt, if any.
    pub last_heartbeat: Arc<Mutex<Option<HeartbeatStatus>>>,
    /// Ring of recent log entries, newest at the back
    /// (see [`push_log_with_observers`]).
    pub recent_logs: Arc<Mutex<VecDeque<LogEntry>>>,
}
113
114pub fn truncate_prompt(s: &str) -> String {
115 if s.chars().count() <= PROMPT_PREVIEW_CHARS {
116 return s.to_string();
117 }
118 let mut out: String = s.chars().take(PROMPT_PREVIEW_CHARS).collect();
119 out.push('…');
120 out
121}
122
123pub fn record_recent_job(observers: &WorkerObservers, entry: RecentJob) {
124 let mut ring = observers.recent_jobs.lock();
125 ring.push_front(entry);
126 while ring.len() > RECENT_JOBS_CAP {
127 ring.pop_back();
128 }
129}
130
131#[doc(hidden)]
135pub fn push_recent_job_for_tests(observers: &WorkerObservers, job_id: &str) {
136 let now = Utc::now();
137 record_recent_job(
138 observers,
139 RecentJob {
140 job_id: job_id.to_string(),
141 kind: TaskKind::Image,
142 model: "synthetic".into(),
143 prompt: String::new(),
144 outcome: JobOutcome::Completed,
145 started_at: now,
146 finished_at: now,
147 },
148 );
149}
150
/// Default [`LoopSchedule::auto_update_tick`]: how long the auto-updater waits
/// per loop iteration.
pub const AUTO_UPDATE_TICK: Duration = Duration::from_secs(60);
/// Default [`LoopSchedule::shutdown_tick`]: how often the stop flag is polled
/// while the auto-updater waits.
pub const AUTO_UPDATE_SHUTDOWN_TICK: Duration = Duration::from_millis(250);
/// Heartbeat interval.
// NOTE(review): not referenced in this part of the file — confirm where it is consumed.
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
162
/// Timing knobs for the loops started by [`run_loops`].
#[derive(Debug, Clone, Copy)]
pub struct LoopSchedule {
    /// Schedule handed to `crate::ws::session::spawn_ws_session`.
    pub ws_session: crate::ws::session::SessionSchedule,
    /// Length of one auto-updater wait; also the amount added to the
    /// updater's elapsed-time counter after each wait.
    pub auto_update_tick: Duration,
    /// Granularity at which the stop flag is polled during that wait.
    pub shutdown_tick: Duration,
}
173
174impl Default for LoopSchedule {
175 fn default() -> Self {
176 Self {
177 ws_session: crate::ws::session::SessionSchedule::default(),
178 auto_update_tick: AUTO_UPDATE_TICK,
179 shutdown_tick: AUTO_UPDATE_SHUTDOWN_TICK,
180 }
181 }
182}
183
184impl LoopSchedule {
185 pub fn fast_for_tests() -> Self {
188 Self {
189 ws_session: crate::ws::session::SessionSchedule::fast_for_tests(),
190 auto_update_tick: Duration::from_millis(1),
191 shutdown_tick: Duration::from_millis(1),
192 }
193 }
194}
195
/// Options accepted by [`register`].
#[derive(Debug, Clone, Default)]
pub struct RegisterArgs {
    /// When set, replaces `api_base_url` in the saved config.
    pub api_base_url: Option<String>,
    /// When true, clears the locally stored registration state (worker id,
    /// auth token, registration request id/secret, install id).
    pub reset: bool,
}
206
207pub async fn register(config_path: Option<&str>, args: RegisterArgs) -> Result<()> {
211 let (mut cfg, path) = config::load(config_path)?;
212
213 if args.reset {
214 cfg.worker_id = None;
215 cfg.auth_token = None;
216 cfg.registration_request_id = None;
217 cfg.registration_secret = None;
218 cfg.install_id = None;
219 }
220 if let Some(url) = args.api_base_url {
221 cfg.api_base_url = url;
222 }
223
224 config::save(&cfg, &path)?;
225 if args.reset {
226 info!(
227 config_path = %path.display(),
228 "local registration state cleared; next launch will auto-register"
229 );
230 println!(
231 "local registration state cleared; run `studio-worker run` or \
232 `studio-worker ui` to auto-register"
233 );
234 } else {
235 info!(
236 config_path = %path.display(),
237 "register flags persisted; next launch will auto-register"
238 );
239 println!(
240 "saved; run `studio-worker run` or `studio-worker ui` to auto-register against {}",
241 cfg.api_base_url
242 );
243 }
244 Ok(())
245}
246
247pub async fn status(config_path: Option<&str>) -> Result<()> {
248 let (cfg, path) = config::load(config_path)?;
249 println!("{}", format_status(&cfg, &path));
250 Ok(())
251}
252
253pub fn format_status(cfg: &Config, path: &std::path::Path) -> String {
254 let mut out = String::new();
255 use std::fmt::Write as _;
256 let _ = writeln!(out, "config path: {}", path.display());
257 let _ = writeln!(out, "api_base_url: {}", cfg.api_base_url);
258 let registration_line = if cfg.worker_id.is_some() && cfg.auth_token.is_some() {
259 format!("approved as {}", cfg.worker_id.as_deref().unwrap_or(""))
260 } else if let Some(rid) = cfg.registration_request_id.as_deref() {
261 format!("pending operator approval (request {rid})")
262 } else {
263 "not registered (will auto-register on next launch)".into()
264 };
265 let _ = writeln!(out, "registration: {registration_line}");
266 let _ = writeln!(out, "vram_threshold_gb: {}", cfg.vram_threshold_gb);
267 let _ = writeln!(out, "auto_start: {}", cfg.auto_start);
268 let _ = writeln!(out, "models_root: {}", cfg.models_root.display());
269 let _ = writeln!(out, "auto_update: {}", cfg.auto_update_enabled);
270 let _ = writeln!(
271 out,
272 "update_interval: {}s",
273 cfg.auto_update_interval_secs
274 );
275 out
276}
277
278pub fn set_threshold(config_path: Option<&str>, gb: f32) -> Result<()> {
279 if gb < 0.0 {
280 return Err(anyhow!("threshold must be >= 0"));
281 }
282 let (mut cfg, path) = config::load(config_path)?;
283 cfg.vram_threshold_gb = gb;
284 config::save(&cfg, &path)?;
285 info!(
286 target: TRACE_TARGET,
287 op = "set_threshold",
288 vram_threshold_gb = gb,
289 config_path = path.display().to_string(),
290 "VRAM threshold persisted"
291 );
292 println!("vram_threshold_gb = {gb}");
293 Ok(())
294}
295
/// Emits one structured `info` event (`op = "startup"`) summarizing the agent
/// version, config path, and key config values.
///
/// `worker_id` is logged as `"(unregistered)"` when the config has none.
pub fn log_startup_banner(cfg: &Config, path: &std::path::Path) {
    info!(
        target: TRACE_TARGET,
        op = "startup",
        version = AGENT_VERSION,
        config_path = path.display().to_string(),
        api_base_url = cfg.api_base_url.as_str(),
        vram_threshold_gb = cfg.vram_threshold_gb,
        auto_start = cfg.auto_start,
        auto_update_enabled = cfg.auto_update_enabled,
        auto_update_interval_secs = cfg.auto_update_interval_secs,
        models_root = cfg.models_root.display().to_string(),
        worker_id = cfg.worker_id.as_deref().unwrap_or("(unregistered)"),
        "studio-worker booting"
    );
}
316
317pub fn show_config(config_path: Option<&str>) -> Result<()> {
318 let (cfg, path) = config::load(config_path)?;
319 println!("# {}", path.display());
320 print!("{}", toml::to_string_pretty(&cfg)?);
321 Ok(())
322}
323
324pub async fn check_update(config_path: Option<&str>) -> Result<()> {
325 let (cfg, _) = config::load(config_path)?;
326 let current = semver::Version::parse(AGENT_VERSION)
327 .map_err(|e| anyhow!("invalid current version {AGENT_VERSION}: {e}"))?;
328 let outcome = tokio::task::spawn_blocking(move || {
329 update::check(&cfg.auto_update_feed, ¤t, cfg.auto_update_prerelease)
330 })
331 .await??;
332 println!("{}", format_check_outcome(&outcome));
333 Ok(())
334}
335
336pub fn format_check_outcome(outcome: &update::CheckOutcome) -> String {
337 match outcome {
338 update::CheckOutcome::UpToDate { current } => format!("up to date: {current}"),
339 update::CheckOutcome::NewerAvailable { current, latest } => {
340 format!("update available: {current} -> {latest}")
341 }
342 }
343}
344
345pub async fn run(config_path: Option<&str>) -> Result<()> {
350 let (cfg, path) = config::load(config_path)?;
351 log_startup_banner(&cfg, &path);
352
353 let cfg = config::shared(cfg);
354 let stop = Arc::new(AtomicBool::new(false));
355 let busy = Arc::new(AtomicBool::new(false));
356 let paused = Arc::new(AtomicBool::new(false));
359 let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
360 let observers = WorkerObservers::default();
361 let registration = crate::auto_register::shared_initial();
362
363 let stop_clone = stop.clone();
364 tokio::spawn(async move {
365 let signal = wait_for_shutdown_signal().await;
366 request_shutdown(&stop_clone, signal);
367 });
368
369 if ensure_registered(&cfg, &path, ®istration, &stop).await? == RegistrationGate::Stopped {
375 info!(
376 target: TRACE_TARGET,
377 op = "shutdown",
378 "stopped before registration completed; exiting cleanly"
379 );
380 return Ok(());
381 }
382
383 run_loops(
384 cfg,
385 stop,
386 logs,
387 busy,
388 paused,
389 observers,
390 LoopSchedule::default(),
391 )
392 .await
393}
394
395pub fn request_shutdown(stop: &AtomicBool, signal: &str) {
401 let already_stopping = stop.swap(true, Ordering::SeqCst);
402 info!(
403 target: TRACE_TARGET,
404 op = "shutdown",
405 signal,
406 already_stopping,
407 "shutdown signal received; stopping worker gracefully"
408 );
409}
410
/// Resolves when the process receives a shutdown signal and returns a label
/// for it.
///
/// * Unix: waits for Ctrl-C (`"SIGINT"`) or SIGTERM (`"SIGTERM"`), whichever
///   comes first. If the SIGTERM handler cannot be installed, a warning is
///   logged and only Ctrl-C is awaited.
/// * Other platforms: waits for Ctrl-C and returns `"ctrl-c"`.
#[cfg_attr(coverage_nightly, coverage(off))]
async fn wait_for_shutdown_signal() -> &'static str {
    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};
        let mut sigterm = match signal(SignalKind::terminate()) {
            Ok(s) => s,
            Err(e) => {
                warn!(
                    target: TRACE_TARGET,
                    op = "shutdown",
                    error = %e,
                    "could not install SIGTERM handler; falling back to Ctrl-C only"
                );
                // The result of `ctrl_c()` is deliberately ignored: either way
                // the caller is told to shut down.
                let _ = tokio::signal::ctrl_c().await;
                return "SIGINT";
            }
        };
        tokio::select! {
            _ = tokio::signal::ctrl_c() => "SIGINT",
            _ = sigterm.recv() => "SIGTERM",
        }
    }
    #[cfg(not(unix))]
    {
        let _ = tokio::signal::ctrl_c().await;
        "ctrl-c"
    }
}
455
/// Result of [`ensure_registered`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegistrationGate {
    /// The config holds both a worker id and an auth token, or the
    /// registration tick reported approval.
    Ready,
    /// The stop flag was set before registration completed.
    Stopped,
}
476
477pub async fn ensure_registered(
482 cfg: &SharedConfig,
483 path: &std::path::Path,
484 registration: &crate::auto_register::SharedRegistration,
485 stop: &Arc<AtomicBool>,
486) -> Result<RegistrationGate> {
487 use std::time::Duration;
488 loop {
489 if stop.load(Ordering::SeqCst) {
490 return Ok(RegistrationGate::Stopped);
491 }
492 {
493 let snap = cfg.lock();
494 if snap.worker_id.is_some() && snap.auth_token.is_some() {
495 return Ok(RegistrationGate::Ready);
496 }
497 }
498 let state = crate::auto_register::tick(cfg, path, registration).await;
499 match state {
500 crate::auto_register::RegistrationState::Approved => {
501 return Ok(RegistrationGate::Ready)
502 }
503 crate::auto_register::RegistrationState::Rejected { reason } => {
504 return Err(anyhow!(
505 "registration rejected by the studio operator: {reason}. \
506 Run `studio-worker register --reset` to clear local state \
507 and submit a fresh request."
508 ));
509 }
510 _ => {}
511 }
512 for _ in 0..30 {
514 if stop.load(Ordering::SeqCst) {
515 return Ok(RegistrationGate::Stopped);
516 }
517 tokio::time::sleep(Duration::from_secs(1)).await;
518 }
519 }
520}
521
522pub async fn run_loops(
530 cfg: SharedConfig,
531 stop: Arc<AtomicBool>,
532 logs: Arc<Mutex<Vec<LogEntry>>>,
533 busy: Arc<AtomicBool>,
534 paused: Arc<AtomicBool>,
535 observers: WorkerObservers,
536 schedule: LoopSchedule,
537) -> Result<()> {
538 let session = crate::ws::session::spawn_ws_session(
539 cfg.clone(),
540 stop.clone(),
541 logs.clone(),
542 busy.clone(),
543 paused.clone(),
544 observers.clone(),
545 schedule.ws_session,
546 );
547 let auto_updater = spawn_auto_updater(
548 cfg.clone(),
549 stop.clone(),
550 logs.clone(),
551 busy.clone(),
552 schedule,
553 );
554 let (session_result, _) = tokio::join!(session, auto_updater);
555 session_result
556}
557
/// What a single [`auto_update_tick`] decided or did.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AutoUpdateDecision {
    /// Auto-update is turned off in the config; nothing was checked.
    Disabled,
    /// The worker was busy; the check was skipped.
    SkippedBusy,
    /// The feed reported no newer version.
    UpToDate,
    /// The check failed, or the blocking task could not be joined; carries
    /// the error text.
    CheckError(String),
    /// A newer version was applied.
    Updated,
    /// A newer version was found but applying it failed; carries the error text.
    UpdateError(String),
}
583
584pub async fn auto_update_tick(
585 cfg: &Config,
586 busy: bool,
587 logs: &Arc<Mutex<Vec<LogEntry>>>,
588) -> AutoUpdateDecision {
589 if !cfg.auto_update_enabled {
590 return AutoUpdateDecision::Disabled;
591 }
592 if busy {
593 push_log(
594 logs,
595 "info",
596 "auto-update",
597 "skipping check: worker is busy on a job",
598 None,
599 );
600 return AutoUpdateDecision::SkippedBusy;
601 }
602 let feed = cfg.auto_update_feed.clone();
603 let prerelease = cfg.auto_update_prerelease;
604 let logs_for_task = logs.clone();
605 let outcome = tokio::task::spawn_blocking(move || -> Result<AutoUpdateDecision> {
606 let current = semver::Version::parse(AGENT_VERSION)
607 .map_err(|e| anyhow!("invalid AGENT_VERSION {AGENT_VERSION}: {e}"))?;
608 match update::check(&feed, ¤t, prerelease) {
609 Ok(update::CheckOutcome::UpToDate { current }) => {
610 push_log(
611 &logs_for_task,
612 "info",
613 "auto-update",
614 &format!("up to date at {current}"),
615 None,
616 );
617 Ok(AutoUpdateDecision::UpToDate)
618 }
619 Ok(update::CheckOutcome::NewerAvailable { current, latest }) => {
620 push_log(
621 &logs_for_task,
622 "info",
623 "auto-update",
624 &format!("update available {current} -> {latest}; applying"),
625 None,
626 );
627 match update::apply(&feed, &latest) {
628 Ok(()) => {
629 push_log(
630 &logs_for_task,
631 "info",
632 "auto-update",
633 "binary replaced; restart pending",
634 None,
635 );
636 Ok(AutoUpdateDecision::Updated)
637 }
638 Err(e) => {
639 push_log(
640 &logs_for_task,
641 "error",
642 "auto-update",
643 &format!("update failed: {e}"),
644 None,
645 );
646 Ok(AutoUpdateDecision::UpdateError(e.to_string()))
647 }
648 }
649 }
650 Err(e) => {
651 push_log(
652 &logs_for_task,
653 "warn",
654 "auto-update",
655 &format!("check failed: {e}"),
656 None,
657 );
658 Ok(AutoUpdateDecision::CheckError(e.to_string()))
659 }
660 }
661 })
662 .await;
663 match outcome {
664 Ok(Ok(decision)) => decision,
665 Ok(Err(e)) => AutoUpdateDecision::CheckError(e.to_string()),
666 Err(e) => AutoUpdateDecision::CheckError(e.to_string()),
667 }
668}
669
670pub(crate) async fn wait_with_stop(total: Duration, stop: &Arc<AtomicBool>, tick: Duration) {
685 let mut elapsed = Duration::ZERO;
686 while elapsed < total {
687 if stop.load(Ordering::SeqCst) {
688 return;
689 }
690 let next = tick.min(total - elapsed);
691 tokio::time::sleep(next).await;
692 elapsed += next;
693 }
694}
695
696pub fn spawn_auto_updater(
697 cfg: SharedConfig,
698 stop: Arc<AtomicBool>,
699 logs: Arc<Mutex<Vec<LogEntry>>>,
700 busy: Arc<AtomicBool>,
701 schedule: LoopSchedule,
702) -> tokio::task::JoinHandle<()> {
703 tokio::spawn(async move {
704 let mut elapsed = Duration::from_secs(0);
705 while !stop.load(Ordering::SeqCst) {
706 wait_with_stop(schedule.auto_update_tick, &stop, schedule.shutdown_tick).await;
711 if stop.load(Ordering::SeqCst) {
712 break;
713 }
714 elapsed += schedule.auto_update_tick;
715 let snapshot = cfg.lock().clone();
716 if elapsed < Duration::from_secs(snapshot.auto_update_interval_secs) {
717 continue;
718 }
719 elapsed = Duration::from_secs(0);
720 let busy_now = busy.load(Ordering::SeqCst);
721 let decision = auto_update_tick(&snapshot, busy_now, &logs).await;
722 if matches!(decision, AutoUpdateDecision::Updated) {
723 stop.store(true, Ordering::SeqCst);
724 update::restart_self();
725 }
726 }
727 })
728}
729
730pub fn prompt_for(task: &Task) -> String {
734 match task {
735 Task::Image(p) => p.prompt.clone(),
736 Task::Llm(p) => p
737 .messages
738 .last()
739 .map(|m| m.content.clone())
740 .unwrap_or_default(),
741 Task::AudioStt(p) => p.input_url.clone(),
742 Task::AudioTts(p) => p.text.clone(),
743 Task::Video(p) => p.prompt.clone(),
744 }
745}
746
747pub fn is_unsupported_kind(e: &anyhow::Error) -> bool {
748 e.chain().any(|cause| {
752 cause
753 .downcast_ref::<crate::engine::UnsupportedTask>()
754 .is_some()
755 }) || e.to_string().contains("cannot serve")
756}
757
758pub fn build_capabilities(cfg: &Config, engine: &dyn Engine) -> WorkerCapabilities {
763 build_capabilities_with(cfg, engine, true)
764}
765
766pub fn build_capabilities_with(
771 cfg: &Config,
772 engine: &dyn Engine,
773 auto_enabled: bool,
774) -> WorkerCapabilities {
775 let vram = sys::detect_vram_gb().unwrap_or(0.0);
776 let caps = engine.capabilities();
777 let supported_models_per_kind = caps.supported_models_per_kind.clone();
778 let task_kinds = caps.kinds();
779 let supported_models = {
783 let mut all = caps.flat_models();
784 all.sort();
785 all.dedup();
786 all
787 };
788
789 WorkerCapabilities {
790 machine_name: sys::machine_name(),
791 username: sys::username(),
792 agent_version: AGENT_VERSION.to_string(),
793 engine: engine.name().to_string(),
794 vram_total_gb: vram,
795 vram_threshold_gb: cfg.vram_threshold_gb,
796 auto_enabled,
797 auto_start: cfg.auto_start,
798 supported_models,
799 task_kinds,
800 supported_models_per_kind,
801 }
802}
803
804pub fn summarize_capabilities(caps: &WorkerCapabilities) -> String {
814 let kinds = caps
815 .task_kinds
816 .iter()
817 .map(|k| k.as_str())
818 .collect::<Vec<_>>()
819 .join(", ");
820 format!(
821 "advertising engine={}, vram={:.1}/{:.1}GB threshold, auto_enabled={}, \
822 kinds=[{}], {} model(s)=[{}]",
823 caps.engine,
824 caps.vram_total_gb,
825 caps.vram_threshold_gb,
826 caps.auto_enabled,
827 kinds,
828 caps.supported_models.len(),
829 caps.supported_models.join(", "),
830 )
831}
832
833pub fn vram_threshold_warning(caps: &WorkerCapabilities) -> Option<String> {
853 if caps.vram_total_gb > 0.0 && caps.vram_threshold_gb > caps.vram_total_gb {
854 Some(format!(
855 "configured VRAM threshold {:.1}GB exceeds detected GPU VRAM {:.1}GB; \
856 the studio may offer jobs larger than this card can fit and they will \
857 OOM on load — lower vram_threshold_gb to at or below {:.1}GB",
858 caps.vram_threshold_gb, caps.vram_total_gb, caps.vram_total_gb
859 ))
860 } else {
861 None
862 }
863}
864
865pub fn push_log(
866 logs: &Arc<Mutex<Vec<LogEntry>>>,
867 level: &str,
868 category: &str,
869 message: &str,
870 job_id: Option<String>,
871) {
872 push_log_with_observers(logs, None, level, category, message, job_id);
873}
874
875pub fn push_log_with_observers(
881 logs: &Arc<Mutex<Vec<LogEntry>>>,
882 observers: Option<&WorkerObservers>,
883 level: &str,
884 category: &str,
885 message: &str,
886 job_id: Option<String>,
887) {
888 let entry = LogEntry {
889 ts: Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true),
890 level: level.to_string(),
891 category: category.to_string(),
892 message: message.to_string(),
893 job_id,
894 };
895 let job_id = entry.job_id.as_deref();
900 if level == "error" {
901 tracing::error!(target: "studio_worker", job_id, "[{category}] {message}");
902 } else if level == "warn" {
903 tracing::warn!(target: "studio_worker", job_id, "[{category}] {message}");
904 } else {
905 info!(target: "studio_worker", job_id, "[{category}] {message}");
906 }
907 {
908 let mut queue = logs.lock();
909 if queue.len() >= LOG_SHIP_QUEUE_CAP {
910 let overflow = queue.len() + 2 - LOG_SHIP_QUEUE_CAP;
912 queue.drain(0..overflow);
913 queue.push(LogEntry {
914 ts: Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true),
915 level: "warn".to_string(),
916 category: "logs".to_string(),
917 message: format!(
918 "ship queue full ({LOG_SHIP_QUEUE_CAP} entries); dropped {overflow} oldest"
919 ),
920 job_id: None,
921 });
922 }
923 queue.push(entry.clone());
924 }
925 if let Some(o) = observers {
926 let mut ring = o.recent_logs.lock();
927 ring.push_back(entry);
928 while ring.len() > RECENT_LOGS_CAP {
929 ring.pop_front();
930 }
931 }
932}
933
934pub fn restore_unshipped(logs: &Arc<Mutex<Vec<LogEntry>>>, mut batch: Vec<LogEntry>) {
940 let mut queue = logs.lock();
941 batch.append(&mut queue);
942 *queue = batch;
943 if queue.len() > LOG_SHIP_QUEUE_CAP {
944 let overflow = queue.len() - LOG_SHIP_QUEUE_CAP;
945 queue.drain(0..overflow);
946 }
947}
948
949#[cfg(test)]
950mod tests {
951 use super::*;
952 use crate::config::Config;
953 use crate::engine::SyntheticEngine;
954
955 #[test]
956 fn is_unsupported_kind_detects_typed_unsupported_task() {
957 let err: anyhow::Error =
958 crate::engine::UnsupportedTask::new("synthetic", TaskKind::Llm).into();
959 assert!(is_unsupported_kind(&err));
960 assert!(err.to_string().contains("cannot serve llm"));
962 }
963
964 #[test]
965 fn is_unsupported_kind_survives_context_wrapping() {
966 let err = anyhow::Error::from(crate::engine::UnsupportedTask::new(
970 "sdcpp",
971 TaskKind::AudioTts,
972 ))
973 .context("dispatching job j-1");
974 assert!(is_unsupported_kind(&err));
975 }
976
977 fn entry(message: &str) -> LogEntry {
978 LogEntry {
979 ts: Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true),
980 level: "info".into(),
981 category: "test".into(),
982 message: message.into(),
983 job_id: None,
984 }
985 }
986
987 #[test]
988 fn restore_unshipped_requeues_batch_ahead_of_newer_entries() {
989 let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(vec![entry("newer")]));
993 restore_unshipped(&logs, vec![entry("batch-1"), entry("batch-2")]);
994 let queue = logs.lock();
995 let order: Vec<&str> = queue.iter().map(|e| e.message.as_str()).collect();
996 assert_eq!(order, vec!["batch-1", "batch-2", "newer"]);
997 }
998
999 #[test]
1000 fn restore_unshipped_respects_the_queue_cap() {
1001 let logs: Arc<Mutex<Vec<LogEntry>>> =
1004 Arc::new(Mutex::new(vec![entry("newest"); LOG_SHIP_QUEUE_CAP]));
1005 restore_unshipped(&logs, vec![entry("old-batch"); 100]);
1006 let queue = logs.lock();
1007 assert_eq!(queue.len(), LOG_SHIP_QUEUE_CAP);
1008 assert_eq!(
1009 queue.last().map(|e| e.message.as_str()),
1010 Some("newest"),
1011 "newest entries must survive the cap"
1012 );
1013 }
1014
1015 #[test]
1016 fn ship_queue_is_bounded_and_records_dropped_entries() {
1017 let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
1021 for i in 0..(LOG_SHIP_QUEUE_CAP + 100) {
1022 push_log_with_observers(&logs, None, "info", "test", &format!("entry {i}"), None);
1023 }
1024 let queue = logs.lock();
1025 assert!(
1026 queue.len() <= LOG_SHIP_QUEUE_CAP,
1027 "ship queue exceeded its cap: {}",
1028 queue.len()
1029 );
1030 assert_eq!(
1032 queue.last().map(|e| e.message.as_str()),
1033 Some(format!("entry {}", LOG_SHIP_QUEUE_CAP + 99).as_str())
1034 );
1035 assert!(
1037 queue
1038 .iter()
1039 .any(|e| e.level == "warn" && e.message.contains("dropped")),
1040 "overflow must leave a visible drop marker"
1041 );
1042 }
1043
1044 #[test]
1045 fn recent_logs_ring_is_bounded_at_recent_logs_cap() {
1046 let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
1054 let observers = WorkerObservers::default();
1055 let overflow = 25;
1056 for i in 0..(RECENT_LOGS_CAP + overflow) {
1057 push_log_with_observers(
1058 &logs,
1059 Some(&observers),
1060 "info",
1061 "test",
1062 &format!("entry {i}"),
1063 None,
1064 );
1065 }
1066 let ring = observers.recent_logs.lock();
1067 assert_eq!(
1068 ring.len(),
1069 RECENT_LOGS_CAP,
1070 "the recent-logs ring must cap at RECENT_LOGS_CAP"
1071 );
1072 assert_eq!(
1075 ring.back().map(|e| e.message.as_str()),
1076 Some(format!("entry {}", RECENT_LOGS_CAP + overflow - 1).as_str()),
1077 "the newest entry must survive at the back of the ring"
1078 );
1079 assert_eq!(
1080 ring.front().map(|e| e.message.as_str()),
1081 Some(format!("entry {overflow}").as_str()),
1082 "the oldest surviving entry must be entry #overflow (older evicted)"
1083 );
1084 }
1085
1086 #[test]
1087 fn capabilities_advertises_all_synthetic_kinds() {
1088 let cfg = Config::default();
1089 let engine = SyntheticEngine::new();
1090 let cap = build_capabilities(&cfg, &engine);
1091 assert_eq!(cap.engine, "synthetic");
1092 assert_eq!(cap.task_kinds.len(), TaskKind::ALL.len());
1093 assert!(cap.auto_enabled, "default capability snapshot is unpaused");
1094 for kind in TaskKind::ALL {
1095 assert!(cap.supported_models_per_kind.contains_key(&kind));
1096 }
1097 }
1098
1099 #[test]
1100 fn capabilities_with_paused_flag_drives_auto_enabled() {
1101 let cfg = Config::default();
1102 let engine = SyntheticEngine::new();
1103 let paused_caps = build_capabilities_with(&cfg, &engine, false);
1104 assert!(!paused_caps.auto_enabled);
1105 }
1106
1107 #[test]
1108 fn summarize_capabilities_lists_engine_kinds_models_vram_and_pause_state() {
1109 let cfg = Config {
1110 vram_threshold_gb: 6.0,
1111 ..Config::default()
1112 };
1113 let engine = SyntheticEngine::new();
1114 let caps = build_capabilities_with(&cfg, &engine, true);
1115 let summary = summarize_capabilities(&caps);
1116 assert!(summary.contains("engine=synthetic"), "got: {summary}");
1118 for kind in &caps.task_kinds {
1119 assert!(
1120 summary.contains(kind.as_str()),
1121 "missing kind {} in: {summary}",
1122 kind.as_str()
1123 );
1124 }
1125 assert!(
1127 summary.contains(&format!("{} model(s)", caps.supported_models.len())),
1128 "missing model count in: {summary}"
1129 );
1130 assert!(
1131 summary.contains("synthetic"),
1132 "missing model id in: {summary}"
1133 );
1134 assert!(
1136 summary.contains("6.0"),
1137 "missing vram threshold in: {summary}"
1138 );
1139 assert!(summary.contains("auto_enabled=true"), "got: {summary}");
1140 }
1141
1142 #[test]
1143 fn summarize_capabilities_reflects_paused_state() {
1144 let cfg = Config::default();
1145 let engine = SyntheticEngine::new();
1146 let caps = build_capabilities_with(&cfg, &engine, false);
1147 assert!(
1148 summarize_capabilities(&caps).contains("auto_enabled=false"),
1149 "paused worker must advertise auto_enabled=false"
1150 );
1151 }
1152
1153 fn caps_with_vram(total_gb: f32, threshold_gb: f32) -> WorkerCapabilities {
1157 let mut caps = build_capabilities_with(&Config::default(), &SyntheticEngine::new(), true);
1158 caps.vram_total_gb = total_gb;
1159 caps.vram_threshold_gb = threshold_gb;
1160 caps
1161 }
1162
1163 #[test]
1164 fn vram_threshold_warning_flags_threshold_above_detected_vram() {
1165 let warning = vram_threshold_warning(&caps_with_vram(8.0, 12.0))
1170 .expect("threshold above detected VRAM must warn");
1171 assert!(warning.contains("12.0"), "missing threshold in: {warning}");
1172 assert!(
1173 warning.contains("8.0"),
1174 "missing detected VRAM in: {warning}"
1175 );
1176 assert!(
1177 warning.contains("vram_threshold_gb"),
1178 "must name the config key to change: {warning}"
1179 );
1180 }
1181
1182 #[test]
1183 fn vram_threshold_warning_silent_when_threshold_within_detected_vram() {
1184 assert!(vram_threshold_warning(&caps_with_vram(24.0, 12.0)).is_none());
1186 }
1187
1188 #[test]
1189 fn vram_threshold_warning_silent_when_threshold_equals_detected() {
1190 assert!(vram_threshold_warning(&caps_with_vram(12.0, 12.0)).is_none());
1193 }
1194
1195 #[test]
1196 fn vram_threshold_warning_silent_when_vram_undetected() {
1197 assert!(vram_threshold_warning(&caps_with_vram(0.0, 12.0)).is_none());
1202 }
1203
1204 #[test]
1205 fn prompt_for_extracts_per_kind() {
1206 let image = Task::Image(ImageParams {
1207 prompt: "a stone golem".into(),
1208 ..Default::default()
1209 });
1210 assert_eq!(prompt_for(&image), "a stone golem");
1211
1212 let llm = Task::Llm(LlmParams {
1213 messages: vec![
1214 ChatMessage {
1215 role: "system".into(),
1216 content: "be helpful".into(),
1217 },
1218 ChatMessage {
1219 role: "user".into(),
1220 content: "hi".into(),
1221 },
1222 ],
1223 max_tokens: 32,
1224 temperature: 0.5,
1225 ..Default::default()
1226 });
1227 assert_eq!(prompt_for(&llm), "hi");
1228
1229 let llm_empty = Task::Llm(LlmParams {
1230 messages: vec![],
1231 ..Default::default()
1232 });
1233 assert_eq!(prompt_for(&llm_empty), "");
1234
1235 let stt = Task::AudioStt(AudioSttParams {
1236 input_url: "https://example.com/clip.wav".into(),
1237 ..Default::default()
1238 });
1239 assert_eq!(prompt_for(&stt), "https://example.com/clip.wav");
1240
1241 let tts = Task::AudioTts(AudioTtsParams {
1242 text: "hi there".into(),
1243 voice: "v".into(),
1244 ext: "wav".into(),
1245 ..Default::default()
1246 });
1247 assert_eq!(prompt_for(&tts), "hi there");
1248
1249 let video = Task::Video(VideoParams {
1250 prompt: "a tiny dragon".into(),
1251 seconds: 1.0,
1252 width: 256,
1253 height: 256,
1254 ext: "mp4".into(),
1255 ..Default::default()
1256 });
1257 assert_eq!(prompt_for(&video), "a tiny dragon");
1258 }
1259
1260 #[test]
1261 fn truncate_prompt_passes_short_through_and_clips_long_prompts() {
1262 let short = "a stone golem";
1264 assert_eq!(truncate_prompt(short), short);
1265
1266 let exactly = "x".repeat(PROMPT_PREVIEW_CHARS);
1268 assert_eq!(
1269 truncate_prompt(&exactly),
1270 exactly,
1271 "a prompt exactly at the cap must not be clipped"
1272 );
1273
1274 let over = "y".repeat(PROMPT_PREVIEW_CHARS + 1);
1277 let clipped = truncate_prompt(&over);
1278 assert_eq!(
1279 clipped.chars().count(),
1280 PROMPT_PREVIEW_CHARS + 1,
1281 "clipped preview is the cap plus one ellipsis char"
1282 );
1283 assert!(
1284 clipped.ends_with('\u{2026}'),
1285 "a clipped preview ends with an ellipsis"
1286 );
1287 assert_eq!(
1288 clipped
1289 .chars()
1290 .take(PROMPT_PREVIEW_CHARS)
1291 .collect::<String>(),
1292 "y".repeat(PROMPT_PREVIEW_CHARS),
1293 "the kept prefix is the first PROMPT_PREVIEW_CHARS chars"
1294 );
1295 }
1296
1297 #[test]
1298 fn truncate_prompt_clips_on_char_boundaries_for_multibyte_text() {
1299 let multibyte = "\u{3042}".repeat(PROMPT_PREVIEW_CHARS + 1);
1304 let clipped = truncate_prompt(&multibyte);
1305 assert_eq!(clipped.chars().count(), PROMPT_PREVIEW_CHARS + 1);
1306 assert!(clipped.ends_with('\u{2026}'));
1307 assert_eq!(
1308 clipped.chars().filter(|c| *c == '\u{3042}').count(),
1309 PROMPT_PREVIEW_CHARS,
1310 "exactly PROMPT_PREVIEW_CHARS multibyte chars survive the clip"
1311 );
1312 }
1313
1314 #[test]
1315 fn is_unsupported_kind_matches_engine_message() {
1316 let err = anyhow!("multi engine cannot serve llm tasks");
1317 assert!(is_unsupported_kind(&err));
1318 let other = anyhow!("network timeout");
1319 assert!(!is_unsupported_kind(&other));
1320 }
1321
1322 #[test]
1323 fn format_status_includes_every_field() {
1324 let cfg = Config::default();
1325 let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
1326 assert!(out.contains("config path:"));
1327 assert!(out.contains("api_base_url:"));
1328 assert!(out.contains("registration:"));
1329 assert!(out.contains("not registered"));
1330 assert!(out.contains("models_root:"));
1331 assert!(out.contains("auto_update:"));
1332 assert!(out.contains("update_interval:"));
1333 }
1334
1335 #[test]
1336 fn format_status_shows_worker_id_when_registered() {
1337 let cfg = Config {
1338 worker_id: Some("w-abc".into()),
1339 auth_token: Some("tok".into()),
1340 ..Config::default()
1341 };
1342 let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
1343 assert!(out.contains("w-abc"));
1344 assert!(out.contains("approved"));
1345 }
1346
1347 #[test]
1348 fn format_status_shows_pending_request_id() {
1349 let cfg = Config {
1350 registration_request_id: Some("rr-7".into()),
1351 ..Config::default()
1352 };
1353 let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
1354 assert!(out.contains("pending operator approval"));
1355 assert!(out.contains("rr-7"));
1356 }
1357
1358 #[test]
1359 fn format_check_outcome_handles_both_branches() {
1360 let up = update::CheckOutcome::UpToDate {
1361 current: semver::Version::new(1, 2, 3),
1362 };
1363 assert!(format_check_outcome(&up).contains("up to date"));
1364 let newer = update::CheckOutcome::NewerAvailable {
1365 current: semver::Version::new(1, 2, 3),
1366 latest: semver::Version::new(1, 3, 0),
1367 };
1368 let s = format_check_outcome(&newer);
1369 assert!(s.contains("1.2.3 -> 1.3.0"));
1370 }
1371
/// Three `push_log` calls must leave three entries in the buffer, in call
/// order, with the level and optional job id preserved.
#[test]
fn push_log_appends_an_entry() {
    let logs = Arc::new(Mutex::new(Vec::<LogEntry>::new()));

    push_log(&logs, "info", "test", "hi", None);
    push_log(&logs, "warn", "test", "wat", Some("j-1".into()));
    push_log(&logs, "error", "test", "boom", None);

    let buffer = logs.lock();
    assert_eq!(buffer.len(), 3);

    // The length check above guarantees these three indices exist.
    let (first, second, third) = (&buffer[0], &buffer[1], &buffer[2]);
    assert_eq!(first.level, "info");
    assert_eq!(second.level, "warn");
    assert_eq!(second.job_id.as_deref(), Some("j-1"));
    assert_eq!(third.level, "error");
}
1385
/// When a job id is supplied, the captured tracing output must contain it
/// as a `job_id="..."` field while still carrying the "[source] message"
/// text.
#[test]
fn push_log_emits_job_id_as_a_structured_tracing_field() {
    let logs = crate::test_support::capture(|| {
        // Distinct name so the in-memory buffer does not shadow the
        // captured output bound outside the closure.
        let sink = Arc::new(Mutex::new(Vec::<LogEntry>::new()));
        push_log(&sink, "info", "ws", "binary upload ok", Some("job-42".into()));
    });

    let has_job_field = logs.contains("job_id=\"job-42\"");
    let has_message = logs.contains("[ws] binary upload ok");

    assert!(
        has_job_field,
        "expected structured job_id field, got: {logs}"
    );
    assert!(
        has_message,
        "expected the human-readable message to survive, got: {logs}"
    );
}
1412
/// Without a job id, the captured tracing output must not mention
/// `job_id` at all.
#[test]
fn push_log_omits_job_id_field_when_absent() {
    let logs = crate::test_support::capture(|| {
        let sink = Arc::new(Mutex::new(Vec::<LogEntry>::new()));
        push_log(&sink, "info", "auto-update", "up to date", None);
    });

    let mentions_job_id = logs.contains("job_id");
    assert!(
        !mentions_job_id,
        "expected no job_id field for a jobless log, got: {logs}"
    );
}
1427
/// `request_shutdown` must flip a clear stop flag to `true`.
#[test]
fn request_shutdown_sets_the_stop_flag() {
    let flag = AtomicBool::new(false);

    request_shutdown(&flag, "SIGTERM");

    let stopped = flag.load(Ordering::SeqCst);
    assert!(stopped);
}
1436
/// Calling `request_shutdown` on a flag that is already set must leave it
/// set.
#[test]
fn request_shutdown_reconfirms_when_already_stopping() {
    let flag = AtomicBool::new(true);

    request_shutdown(&flag, "SIGINT");

    let still_stopped = flag.load(Ordering::SeqCst);
    assert!(still_stopped);
}
1445
/// `request_shutdown` must emit an INFO event on the runtime target that
/// carries `op="shutdown"` and the name of the triggering signal.
#[test]
fn request_shutdown_emits_a_named_shutdown_breadcrumb() {
    let logs = crate::test_support::capture(|| {
        let flag = AtomicBool::new(false);
        request_shutdown(&flag, "SIGTERM");
    });

    let is_info = logs.contains("INFO");
    let on_runtime_target = logs.contains("studio_worker::runtime");
    let has_op = logs.contains("op=\"shutdown\"");
    let has_signal = logs.contains("signal=\"SIGTERM\"");

    assert!(is_info, "expected INFO event, got: {logs}");
    assert!(on_runtime_target, "expected runtime target, got: {logs}");
    assert!(has_op, "expected op field, got: {logs}");
    assert!(has_signal, "expected signal field, got: {logs}");
}
1467
1468 #[tokio::test]
1469 async fn auto_update_tick_disabled_when_flag_off() {
1470 let cfg = Config {
1471 auto_update_enabled: false,
1472 ..Config::default()
1473 };
1474 let logs = Arc::new(Mutex::new(Vec::new()));
1475 let decision = auto_update_tick(&cfg, false, &logs).await;
1476 assert_eq!(decision, AutoUpdateDecision::Disabled);
1477 }
1478
1479 #[tokio::test]
1480 async fn auto_update_tick_skipped_when_busy() {
1481 let cfg = Config {
1482 auto_update_enabled: true,
1483 ..Config::default()
1484 };
1485 let logs = Arc::new(Mutex::new(Vec::new()));
1486 let decision = auto_update_tick(&cfg, true, &logs).await;
1487 assert_eq!(decision, AutoUpdateDecision::SkippedBusy);
1488 let entries = logs.lock();
1489 assert!(entries.iter().any(|e| e.message.contains("busy on a job")));
1490 }
1491
1492 #[tokio::test]
1493 async fn wait_with_stop_short_circuits_when_already_stopped() {
1494 let stop = Arc::new(AtomicBool::new(true));
1495 let start = std::time::Instant::now();
1496 wait_with_stop(Duration::from_secs(60), &stop, Duration::from_millis(10)).await;
1497 assert!(
1498 start.elapsed() < Duration::from_millis(100),
1499 "an already-set stop must return without sleeping the full duration"
1500 );
1501 }
1502
1503 #[tokio::test]
1504 async fn auto_updater_stops_promptly_during_idle_wait() {
1505 let cfg = crate::config::shared(Config {
1511 auto_update_enabled: false,
1512 ..Config::default()
1513 });
1514 let stop = Arc::new(AtomicBool::new(false));
1515 let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
1516 let busy = Arc::new(AtomicBool::new(false));
1517 let schedule = LoopSchedule {
1518 ws_session: crate::ws::session::SessionSchedule::fast_for_tests(),
1519 auto_update_tick: Duration::from_secs(3600),
1520 shutdown_tick: Duration::from_millis(1),
1521 };
1522 let handle = spawn_auto_updater(cfg, stop.clone(), logs, busy, schedule);
1523 tokio::time::sleep(Duration::from_millis(10)).await;
1525 stop.store(true, Ordering::SeqCst);
1526 tokio::time::timeout(Duration::from_millis(250), handle)
1527 .await
1528 .expect("auto-updater did not observe stop promptly")
1529 .expect("auto-updater task panicked");
1530 }
1531}