use crate::{
config::{self, Config, SharedConfig},
engine::Engine,
sys,
types::*,
update, AGENT_VERSION,
};
use anyhow::{anyhow, Result};
use chrono::{DateTime, SecondsFormat, Utc};
use parking_lot::Mutex;
use std::{
collections::VecDeque,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use tracing::{info, warn};
const TRACE_TARGET: &str = "studio_worker::runtime";
pub const RECENT_JOBS_CAP: usize = 50;
pub const RECENT_LOGS_CAP: usize = 1000;
pub const PROMPT_PREVIEW_CHARS: usize = 200;
#[derive(Debug, Clone)]
pub struct CurrentJob {
pub job_id: String,
pub kind: TaskKind,
pub model: String,
pub prompt: String,
pub started_at: DateTime<Utc>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobOutcome {
Completed,
Failed { reason: String },
}
#[derive(Debug, Clone)]
pub struct RecentJob {
pub job_id: String,
pub kind: TaskKind,
pub model: String,
pub prompt: String,
pub outcome: JobOutcome,
pub started_at: DateTime<Utc>,
pub finished_at: DateTime<Utc>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HeartbeatOutcome {
Ok,
Err { reason: String },
}
#[derive(Debug, Clone)]
pub struct HeartbeatStatus {
pub last_attempt_at: DateTime<Utc>,
pub outcome: HeartbeatOutcome,
}
#[derive(Clone, Default)]
pub struct WorkerObservers {
pub current_job: Arc<Mutex<Option<CurrentJob>>>,
pub recent_jobs: Arc<Mutex<VecDeque<RecentJob>>>,
pub last_heartbeat: Arc<Mutex<Option<HeartbeatStatus>>>,
pub recent_logs: Arc<Mutex<VecDeque<LogEntry>>>,
}
pub fn truncate_prompt(s: &str) -> String {
if s.chars().count() <= PROMPT_PREVIEW_CHARS {
return s.to_string();
}
let mut out: String = s.chars().take(PROMPT_PREVIEW_CHARS).collect();
out.push('…');
out
}
pub fn record_recent_job(observers: &WorkerObservers, entry: RecentJob) {
let mut ring = observers.recent_jobs.lock();
ring.push_front(entry);
while ring.len() > RECENT_JOBS_CAP {
ring.pop_back();
}
}
#[doc(hidden)]
pub fn push_recent_job_for_tests(observers: &WorkerObservers, job_id: &str) {
let now = Utc::now();
record_recent_job(
observers,
RecentJob {
job_id: job_id.to_string(),
kind: TaskKind::Image,
model: "synthetic".into(),
prompt: String::new(),
outcome: JobOutcome::Completed,
started_at: now,
finished_at: now,
},
);
}
pub const AUTO_UPDATE_TICK: Duration = Duration::from_secs(60);
pub const AUTO_UPDATE_SHUTDOWN_TICK: Duration = Duration::from_millis(250);
pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
#[derive(Debug, Clone, Copy)]
pub struct LoopSchedule {
pub ws_session: crate::ws::session::SessionSchedule,
pub auto_update_tick: Duration,
pub shutdown_tick: Duration,
}
impl Default for LoopSchedule {
fn default() -> Self {
Self {
ws_session: crate::ws::session::SessionSchedule::default(),
auto_update_tick: AUTO_UPDATE_TICK,
shutdown_tick: AUTO_UPDATE_SHUTDOWN_TICK,
}
}
}
impl LoopSchedule {
pub fn fast_for_tests() -> Self {
Self {
ws_session: crate::ws::session::SessionSchedule::fast_for_tests(),
auto_update_tick: Duration::from_millis(1),
shutdown_tick: Duration::from_millis(1),
}
}
}
#[derive(Debug, Clone, Default)]
pub struct RegisterArgs {
pub api_base_url: Option<String>,
pub reset: bool,
}
pub async fn register(config_path: Option<&str>, args: RegisterArgs) -> Result<()> {
let (mut cfg, path) = config::load(config_path)?;
if args.reset {
cfg.worker_id = None;
cfg.auth_token = None;
cfg.registration_request_id = None;
cfg.registration_secret = None;
cfg.install_id = None;
}
if let Some(url) = args.api_base_url {
cfg.api_base_url = url;
}
config::save(&cfg, &path)?;
if args.reset {
info!(
config_path = %path.display(),
"local registration state cleared; next launch will auto-register"
);
println!(
"local registration state cleared; run `studio-worker run` or \
`studio-worker ui` to auto-register"
);
} else {
info!(
config_path = %path.display(),
"register flags persisted; next launch will auto-register"
);
println!(
"saved; run `studio-worker run` or `studio-worker ui` to auto-register against {}",
cfg.api_base_url
);
}
Ok(())
}
pub async fn status(config_path: Option<&str>) -> Result<()> {
let (cfg, path) = config::load(config_path)?;
println!("{}", format_status(&cfg, &path));
Ok(())
}
pub fn format_status(cfg: &Config, path: &std::path::Path) -> String {
let mut out = String::new();
use std::fmt::Write as _;
let _ = writeln!(out, "config path: {}", path.display());
let _ = writeln!(out, "api_base_url: {}", cfg.api_base_url);
let registration_line = if cfg.worker_id.is_some() && cfg.auth_token.is_some() {
format!("approved as {}", cfg.worker_id.as_deref().unwrap_or(""))
} else if let Some(rid) = cfg.registration_request_id.as_deref() {
format!("pending operator approval (request {rid})")
} else {
"not registered (will auto-register on next launch)".into()
};
let _ = writeln!(out, "registration: {registration_line}");
let _ = writeln!(out, "vram_threshold_gb: {}", cfg.vram_threshold_gb);
let _ = writeln!(out, "auto_start: {}", cfg.auto_start);
let _ = writeln!(out, "models_root: {}", cfg.models_root.display());
let _ = writeln!(out, "auto_update: {}", cfg.auto_update_enabled);
let _ = writeln!(
out,
"update_interval: {}s",
cfg.auto_update_interval_secs
);
out
}
pub fn set_threshold(config_path: Option<&str>, gb: f32) -> Result<()> {
if gb < 0.0 {
return Err(anyhow!("threshold must be >= 0"));
}
let (mut cfg, path) = config::load(config_path)?;
cfg.vram_threshold_gb = gb;
config::save(&cfg, &path)?;
info!(
target: TRACE_TARGET,
op = "set_threshold",
vram_threshold_gb = gb,
config_path = path.display().to_string(),
"VRAM threshold persisted"
);
println!("vram_threshold_gb = {gb}");
Ok(())
}
pub fn log_startup_banner(cfg: &Config, path: &std::path::Path) {
info!(
target: TRACE_TARGET,
op = "startup",
version = AGENT_VERSION,
config_path = path.display().to_string(),
api_base_url = cfg.api_base_url.as_str(),
vram_threshold_gb = cfg.vram_threshold_gb,
auto_start = cfg.auto_start,
auto_update_enabled = cfg.auto_update_enabled,
auto_update_interval_secs = cfg.auto_update_interval_secs,
models_root = cfg.models_root.display().to_string(),
worker_id = cfg.worker_id.as_deref().unwrap_or("(unregistered)"),
"studio-worker booting"
);
}
pub fn show_config(config_path: Option<&str>) -> Result<()> {
let (cfg, path) = config::load(config_path)?;
println!("# {}", path.display());
print!("{}", toml::to_string_pretty(&cfg)?);
Ok(())
}
pub async fn check_update(config_path: Option<&str>) -> Result<()> {
let (cfg, _) = config::load(config_path)?;
let current = semver::Version::parse(AGENT_VERSION)
.map_err(|e| anyhow!("invalid current version {AGENT_VERSION}: {e}"))?;
let outcome = tokio::task::spawn_blocking(move || {
update::check(&cfg.auto_update_feed, ¤t, cfg.auto_update_prerelease)
})
.await??;
println!("{}", format_check_outcome(&outcome));
Ok(())
}
pub fn format_check_outcome(outcome: &update::CheckOutcome) -> String {
match outcome {
update::CheckOutcome::UpToDate { current } => format!("up to date: {current}"),
update::CheckOutcome::NewerAvailable { current, latest } => {
format!("update available: {current} -> {latest}")
}
}
}
pub async fn run(config_path: Option<&str>) -> Result<()> {
let (cfg, path) = config::load(config_path)?;
log_startup_banner(&cfg, &path);
let cfg = config::shared(cfg);
let stop = Arc::new(AtomicBool::new(false));
let busy = Arc::new(AtomicBool::new(false));
let paused = Arc::new(AtomicBool::new(false));
let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
let observers = WorkerObservers::default();
let registration = crate::auto_register::shared_initial();
let stop_clone = stop.clone();
tokio::spawn(async move {
let signal = wait_for_shutdown_signal().await;
request_shutdown(&stop_clone, signal);
});
ensure_registered(&cfg, &path, ®istration, &stop).await?;
run_loops(
cfg,
stop,
logs,
busy,
paused,
observers,
LoopSchedule::default(),
)
.await
}
pub fn request_shutdown(stop: &AtomicBool, signal: &str) {
let already_stopping = stop.swap(true, Ordering::SeqCst);
info!(
target: TRACE_TARGET,
op = "shutdown",
signal,
already_stopping,
"shutdown signal received; stopping worker gracefully"
);
}
#[cfg_attr(coverage_nightly, coverage(off))]
async fn wait_for_shutdown_signal() -> &'static str {
#[cfg(unix)]
{
use tokio::signal::unix::{signal, SignalKind};
let mut sigterm = match signal(SignalKind::terminate()) {
Ok(s) => s,
Err(e) => {
warn!(
target: TRACE_TARGET,
op = "shutdown",
error = %e,
"could not install SIGTERM handler; falling back to Ctrl-C only"
);
let _ = tokio::signal::ctrl_c().await;
return "SIGINT";
}
};
tokio::select! {
_ = tokio::signal::ctrl_c() => "SIGINT",
_ = sigterm.recv() => "SIGTERM",
}
}
#[cfg(not(unix))]
{
let _ = tokio::signal::ctrl_c().await;
"ctrl-c"
}
}
pub async fn ensure_registered(
cfg: &SharedConfig,
path: &std::path::Path,
registration: &crate::auto_register::SharedRegistration,
stop: &Arc<AtomicBool>,
) -> Result<()> {
use std::time::Duration;
loop {
if stop.load(Ordering::SeqCst) {
return Err(anyhow!("shutdown before registration completed"));
}
{
let snap = cfg.lock();
if snap.worker_id.is_some() && snap.auth_token.is_some() {
return Ok(());
}
}
let state = crate::auto_register::tick(cfg, path, registration).await;
match state {
crate::auto_register::RegistrationState::Approved => return Ok(()),
crate::auto_register::RegistrationState::Rejected { reason } => {
return Err(anyhow!(
"registration rejected by the studio operator: {reason}. \
Run `studio-worker register --reset` to clear local state \
and submit a fresh request."
));
}
_ => {}
}
for _ in 0..30 {
if stop.load(Ordering::SeqCst) {
return Err(anyhow!("shutdown during registration wait"));
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
pub async fn run_loops(
cfg: SharedConfig,
stop: Arc<AtomicBool>,
logs: Arc<Mutex<Vec<LogEntry>>>,
busy: Arc<AtomicBool>,
paused: Arc<AtomicBool>,
observers: WorkerObservers,
schedule: LoopSchedule,
) -> Result<()> {
let session = crate::ws::session::spawn_ws_session(
cfg.clone(),
stop.clone(),
logs.clone(),
busy.clone(),
paused.clone(),
observers.clone(),
schedule.ws_session,
);
let auto_updater = spawn_auto_updater(
cfg.clone(),
stop.clone(),
logs.clone(),
busy.clone(),
schedule,
);
let (session_result, _) = tokio::join!(session, auto_updater);
session_result
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AutoUpdateDecision {
Disabled,
SkippedBusy,
UpToDate,
CheckError(String),
Updated,
UpdateError(String),
}
pub async fn auto_update_tick(
cfg: &Config,
busy: bool,
logs: &Arc<Mutex<Vec<LogEntry>>>,
) -> AutoUpdateDecision {
if !cfg.auto_update_enabled {
return AutoUpdateDecision::Disabled;
}
if busy {
push_log(
logs,
"info",
"auto-update",
"skipping check: worker is busy on a job",
None,
);
return AutoUpdateDecision::SkippedBusy;
}
let feed = cfg.auto_update_feed.clone();
let prerelease = cfg.auto_update_prerelease;
let logs_for_task = logs.clone();
let outcome = tokio::task::spawn_blocking(move || -> Result<AutoUpdateDecision> {
let current = semver::Version::parse(AGENT_VERSION)
.map_err(|e| anyhow!("invalid AGENT_VERSION {AGENT_VERSION}: {e}"))?;
match update::check(&feed, ¤t, prerelease) {
Ok(update::CheckOutcome::UpToDate { current }) => {
push_log(
&logs_for_task,
"info",
"auto-update",
&format!("up to date at {current}"),
None,
);
Ok(AutoUpdateDecision::UpToDate)
}
Ok(update::CheckOutcome::NewerAvailable { current, latest }) => {
push_log(
&logs_for_task,
"info",
"auto-update",
&format!("update available {current} -> {latest}; applying"),
None,
);
match update::apply(&feed, &latest) {
Ok(()) => {
push_log(
&logs_for_task,
"info",
"auto-update",
"binary replaced; restart pending",
None,
);
Ok(AutoUpdateDecision::Updated)
}
Err(e) => {
push_log(
&logs_for_task,
"error",
"auto-update",
&format!("update failed: {e}"),
None,
);
Ok(AutoUpdateDecision::UpdateError(e.to_string()))
}
}
}
Err(e) => {
push_log(
&logs_for_task,
"warn",
"auto-update",
&format!("check failed: {e}"),
None,
);
Ok(AutoUpdateDecision::CheckError(e.to_string()))
}
}
})
.await;
match outcome {
Ok(Ok(decision)) => decision,
Ok(Err(e)) => AutoUpdateDecision::CheckError(e.to_string()),
Err(e) => AutoUpdateDecision::CheckError(e.to_string()),
}
}
pub(crate) async fn wait_with_stop(total: Duration, stop: &Arc<AtomicBool>, tick: Duration) {
let mut elapsed = Duration::ZERO;
while elapsed < total {
if stop.load(Ordering::SeqCst) {
return;
}
let next = tick.min(total - elapsed);
tokio::time::sleep(next).await;
elapsed += next;
}
}
pub fn spawn_auto_updater(
cfg: SharedConfig,
stop: Arc<AtomicBool>,
logs: Arc<Mutex<Vec<LogEntry>>>,
busy: Arc<AtomicBool>,
schedule: LoopSchedule,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut elapsed = Duration::from_secs(0);
while !stop.load(Ordering::SeqCst) {
wait_with_stop(schedule.auto_update_tick, &stop, schedule.shutdown_tick).await;
if stop.load(Ordering::SeqCst) {
break;
}
elapsed += schedule.auto_update_tick;
let snapshot = cfg.lock().clone();
if elapsed < Duration::from_secs(snapshot.auto_update_interval_secs) {
continue;
}
elapsed = Duration::from_secs(0);
let busy_now = busy.load(Ordering::SeqCst);
let decision = auto_update_tick(&snapshot, busy_now, &logs).await;
if matches!(decision, AutoUpdateDecision::Updated) {
stop.store(true, Ordering::SeqCst);
update::restart_self();
}
}
})
}
pub fn prompt_for(task: &Task) -> String {
match task {
Task::Image(p) => p.prompt.clone(),
Task::Llm(p) => p
.messages
.last()
.map(|m| m.content.clone())
.unwrap_or_default(),
Task::AudioStt(p) => p.input_url.clone(),
Task::AudioTts(p) => p.text.clone(),
Task::Video(p) => p.prompt.clone(),
}
}
pub fn is_unsupported_kind(e: &anyhow::Error) -> bool {
e.to_string().contains("cannot serve")
}
pub fn build_capabilities(cfg: &Config, engine: &dyn Engine) -> WorkerCapabilities {
build_capabilities_with(cfg, engine, true)
}
pub fn build_capabilities_with(
cfg: &Config,
engine: &dyn Engine,
auto_enabled: bool,
) -> WorkerCapabilities {
let vram = sys::detect_vram_gb().unwrap_or(0.0);
let caps = engine.capabilities();
let supported_models_per_kind = caps.supported_models_per_kind.clone();
let task_kinds = caps.kinds();
let supported_models = {
let mut all = caps.flat_models();
all.sort();
all.dedup();
all
};
WorkerCapabilities {
machine_name: sys::machine_name(),
username: sys::username(),
agent_version: AGENT_VERSION.to_string(),
engine: engine.name().to_string(),
vram_total_gb: vram,
vram_threshold_gb: cfg.vram_threshold_gb,
auto_enabled,
auto_start: cfg.auto_start,
supported_models,
task_kinds,
supported_models_per_kind,
}
}
pub fn summarize_capabilities(caps: &WorkerCapabilities) -> String {
let kinds = caps
.task_kinds
.iter()
.map(|k| k.as_str())
.collect::<Vec<_>>()
.join(", ");
format!(
"advertising engine={}, vram={:.1}/{:.1}GB threshold, auto_enabled={}, \
kinds=[{}], {} model(s)=[{}]",
caps.engine,
caps.vram_total_gb,
caps.vram_threshold_gb,
caps.auto_enabled,
kinds,
caps.supported_models.len(),
caps.supported_models.join(", "),
)
}
pub fn push_log(
logs: &Arc<Mutex<Vec<LogEntry>>>,
level: &str,
category: &str,
message: &str,
job_id: Option<String>,
) {
push_log_with_observers(logs, None, level, category, message, job_id);
}
pub fn push_log_with_observers(
logs: &Arc<Mutex<Vec<LogEntry>>>,
observers: Option<&WorkerObservers>,
level: &str,
category: &str,
message: &str,
job_id: Option<String>,
) {
let entry = LogEntry {
ts: Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true),
level: level.to_string(),
category: category.to_string(),
message: message.to_string(),
job_id,
};
let job_id = entry.job_id.as_deref();
if level == "error" {
tracing::error!(target: "studio_worker", job_id, "[{category}] {message}");
} else if level == "warn" {
tracing::warn!(target: "studio_worker", job_id, "[{category}] {message}");
} else {
info!(target: "studio_worker", job_id, "[{category}] {message}");
}
logs.lock().push(entry.clone());
if let Some(o) = observers {
let mut ring = o.recent_logs.lock();
ring.push_back(entry);
while ring.len() > RECENT_LOGS_CAP {
ring.pop_front();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::Config;
use crate::engine::SyntheticEngine;
#[test]
fn capabilities_advertises_all_synthetic_kinds() {
let cfg = Config::default();
let engine = SyntheticEngine::new();
let cap = build_capabilities(&cfg, &engine);
assert_eq!(cap.engine, "synthetic");
assert_eq!(cap.task_kinds.len(), TaskKind::ALL.len());
assert!(cap.auto_enabled, "default capability snapshot is unpaused");
for kind in TaskKind::ALL {
assert!(cap.supported_models_per_kind.contains_key(&kind));
}
}
#[test]
fn capabilities_with_paused_flag_drives_auto_enabled() {
let cfg = Config::default();
let engine = SyntheticEngine::new();
let paused_caps = build_capabilities_with(&cfg, &engine, false);
assert!(!paused_caps.auto_enabled);
}
#[test]
fn summarize_capabilities_lists_engine_kinds_models_vram_and_pause_state() {
let cfg = Config {
vram_threshold_gb: 6.0,
..Config::default()
};
let engine = SyntheticEngine::new();
let caps = build_capabilities_with(&cfg, &engine, true);
let summary = summarize_capabilities(&caps);
assert!(summary.contains("engine=synthetic"), "got: {summary}");
for kind in &caps.task_kinds {
assert!(
summary.contains(kind.as_str()),
"missing kind {} in: {summary}",
kind.as_str()
);
}
assert!(
summary.contains(&format!("{} model(s)", caps.supported_models.len())),
"missing model count in: {summary}"
);
assert!(
summary.contains("synthetic"),
"missing model id in: {summary}"
);
assert!(
summary.contains("6.0"),
"missing vram threshold in: {summary}"
);
assert!(summary.contains("auto_enabled=true"), "got: {summary}");
}
#[test]
fn summarize_capabilities_reflects_paused_state() {
let cfg = Config::default();
let engine = SyntheticEngine::new();
let caps = build_capabilities_with(&cfg, &engine, false);
assert!(
summarize_capabilities(&caps).contains("auto_enabled=false"),
"paused worker must advertise auto_enabled=false"
);
}
#[test]
fn prompt_for_extracts_per_kind() {
let image = Task::Image(ImageParams {
prompt: "a stone golem".into(),
..Default::default()
});
assert_eq!(prompt_for(&image), "a stone golem");
let llm = Task::Llm(LlmParams {
messages: vec![
ChatMessage {
role: "system".into(),
content: "be helpful".into(),
},
ChatMessage {
role: "user".into(),
content: "hi".into(),
},
],
max_tokens: 32,
temperature: 0.5,
..Default::default()
});
assert_eq!(prompt_for(&llm), "hi");
let llm_empty = Task::Llm(LlmParams {
messages: vec![],
..Default::default()
});
assert_eq!(prompt_for(&llm_empty), "");
let stt = Task::AudioStt(AudioSttParams {
input_url: "https://example.com/clip.wav".into(),
..Default::default()
});
assert_eq!(prompt_for(&stt), "https://example.com/clip.wav");
let tts = Task::AudioTts(AudioTtsParams {
text: "hi there".into(),
voice: "v".into(),
ext: "wav".into(),
..Default::default()
});
assert_eq!(prompt_for(&tts), "hi there");
let video = Task::Video(VideoParams {
prompt: "a tiny dragon".into(),
seconds: 1.0,
width: 256,
height: 256,
ext: "mp4".into(),
..Default::default()
});
assert_eq!(prompt_for(&video), "a tiny dragon");
}
#[test]
fn is_unsupported_kind_matches_engine_message() {
let err = anyhow!("multi engine cannot serve llm tasks");
assert!(is_unsupported_kind(&err));
let other = anyhow!("network timeout");
assert!(!is_unsupported_kind(&other));
}
#[test]
fn format_status_includes_every_field() {
let cfg = Config::default();
let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
assert!(out.contains("config path:"));
assert!(out.contains("api_base_url:"));
assert!(out.contains("registration:"));
assert!(out.contains("not registered"));
assert!(out.contains("models_root:"));
assert!(out.contains("auto_update:"));
assert!(out.contains("update_interval:"));
}
#[test]
fn format_status_shows_worker_id_when_registered() {
let cfg = Config {
worker_id: Some("w-abc".into()),
auth_token: Some("tok".into()),
..Config::default()
};
let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
assert!(out.contains("w-abc"));
assert!(out.contains("approved"));
}
#[test]
fn format_status_shows_pending_request_id() {
let cfg = Config {
registration_request_id: Some("rr-7".into()),
..Config::default()
};
let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
assert!(out.contains("pending operator approval"));
assert!(out.contains("rr-7"));
}
#[test]
fn format_check_outcome_handles_both_branches() {
let up = update::CheckOutcome::UpToDate {
current: semver::Version::new(1, 2, 3),
};
assert!(format_check_outcome(&up).contains("up to date"));
let newer = update::CheckOutcome::NewerAvailable {
current: semver::Version::new(1, 2, 3),
latest: semver::Version::new(1, 3, 0),
};
let s = format_check_outcome(&newer);
assert!(s.contains("1.2.3 -> 1.3.0"));
}
#[test]
fn push_log_appends_an_entry() {
let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
push_log(&logs, "info", "test", "hi", None);
push_log(&logs, "warn", "test", "wat", Some("j-1".into()));
push_log(&logs, "error", "test", "boom", None);
let v = logs.lock();
assert_eq!(v.len(), 3);
assert_eq!(v[0].level, "info");
assert_eq!(v[1].level, "warn");
assert_eq!(v[1].job_id.as_deref(), Some("j-1"));
assert_eq!(v[2].level, "error");
}
#[test]
fn push_log_emits_job_id_as_a_structured_tracing_field() {
use crate::test_support::capture;
let logs = capture(|| {
let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
push_log(
&logs,
"info",
"ws",
"binary upload ok",
Some("job-42".into()),
);
});
assert!(
logs.contains("job_id=\"job-42\""),
"expected structured job_id field, got: {logs}"
);
assert!(
logs.contains("[ws] binary upload ok"),
"expected the human-readable message to survive, got: {logs}"
);
}
#[test]
fn push_log_omits_job_id_field_when_absent() {
use crate::test_support::capture;
let logs = capture(|| {
let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
push_log(&logs, "info", "auto-update", "up to date", None);
});
assert!(
!logs.contains("job_id"),
"expected no job_id field for a jobless log, got: {logs}"
);
}
#[test]
fn request_shutdown_sets_the_stop_flag() {
let stop = AtomicBool::new(false);
request_shutdown(&stop, "SIGTERM");
assert!(stop.load(Ordering::SeqCst));
}
#[test]
fn request_shutdown_reconfirms_when_already_stopping() {
let stop = AtomicBool::new(true);
request_shutdown(&stop, "SIGINT");
assert!(stop.load(Ordering::SeqCst));
}
#[test]
fn request_shutdown_emits_a_named_shutdown_breadcrumb() {
use crate::test_support::capture;
let logs = capture(|| {
let stop = AtomicBool::new(false);
request_shutdown(&stop, "SIGTERM");
});
assert!(logs.contains("INFO"), "expected INFO event, got: {logs}");
assert!(
logs.contains("studio_worker::runtime"),
"expected runtime target, got: {logs}"
);
assert!(
logs.contains("op=\"shutdown\""),
"expected op field, got: {logs}"
);
assert!(
logs.contains("signal=\"SIGTERM\""),
"expected signal field, got: {logs}"
);
}
#[tokio::test]
async fn auto_update_tick_disabled_when_flag_off() {
let cfg = Config {
auto_update_enabled: false,
..Config::default()
};
let logs = Arc::new(Mutex::new(Vec::new()));
let decision = auto_update_tick(&cfg, false, &logs).await;
assert_eq!(decision, AutoUpdateDecision::Disabled);
}
#[tokio::test]
async fn auto_update_tick_skipped_when_busy() {
let cfg = Config {
auto_update_enabled: true,
..Config::default()
};
let logs = Arc::new(Mutex::new(Vec::new()));
let decision = auto_update_tick(&cfg, true, &logs).await;
assert_eq!(decision, AutoUpdateDecision::SkippedBusy);
let entries = logs.lock();
assert!(entries.iter().any(|e| e.message.contains("busy on a job")));
}
#[tokio::test]
async fn wait_with_stop_short_circuits_when_already_stopped() {
let stop = Arc::new(AtomicBool::new(true));
let start = std::time::Instant::now();
wait_with_stop(Duration::from_secs(60), &stop, Duration::from_millis(10)).await;
assert!(
start.elapsed() < Duration::from_millis(100),
"an already-set stop must return without sleeping the full duration"
);
}
#[tokio::test]
async fn auto_updater_stops_promptly_during_idle_wait() {
let cfg = crate::config::shared(Config {
auto_update_enabled: false,
..Config::default()
});
let stop = Arc::new(AtomicBool::new(false));
let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
let busy = Arc::new(AtomicBool::new(false));
let schedule = LoopSchedule {
ws_session: crate::ws::session::SessionSchedule::fast_for_tests(),
auto_update_tick: Duration::from_secs(3600),
shutdown_tick: Duration::from_millis(1),
};
let handle = spawn_auto_updater(cfg, stop.clone(), logs, busy, schedule);
tokio::time::sleep(Duration::from_millis(10)).await;
stop.store(true, Ordering::SeqCst);
tokio::time::timeout(Duration::from_millis(250), handle)
.await
.expect("auto-updater did not observe stop promptly")
.expect("auto-updater task panicked");
}
}