1use crate::{
9 config::{self, Config, SharedConfig},
10 engine::Engine,
11 sys,
12 types::*,
13 update, AGENT_VERSION,
14};
15use anyhow::{anyhow, Result};
16use chrono::{DateTime, SecondsFormat, Utc};
17use parking_lot::Mutex;
18use std::{
19 collections::VecDeque,
20 sync::{
21 atomic::{AtomicBool, Ordering},
22 Arc,
23 },
24 time::Duration,
25};
26use tracing::info;
27
28const TRACE_TARGET: &str = "studio_worker::runtime";
31
32pub const RECENT_JOBS_CAP: usize = 50;
35
36pub const PROMPT_PREVIEW_CHARS: usize = 200;
40
41#[derive(Debug, Clone)]
44pub struct CurrentJob {
45 pub job_id: String,
46 pub kind: TaskKind,
47 pub model: String,
48 pub prompt: String,
49 pub started_at: DateTime<Utc>,
50}
51
52#[derive(Debug, Clone, PartialEq, Eq)]
55pub enum JobOutcome {
56 Completed,
57 Failed { reason: String },
58}
59
60#[derive(Debug, Clone)]
62pub struct RecentJob {
63 pub job_id: String,
64 pub kind: TaskKind,
65 pub model: String,
66 pub prompt: String,
67 pub outcome: JobOutcome,
68 pub started_at: DateTime<Utc>,
69 pub finished_at: DateTime<Utc>,
70}
71
72#[derive(Debug, Clone, PartialEq, Eq)]
74pub enum HeartbeatOutcome {
75 Ok,
76 Err { reason: String },
77}
78
79#[derive(Debug, Clone)]
80pub struct HeartbeatStatus {
81 pub last_attempt_at: DateTime<Utc>,
82 pub outcome: HeartbeatOutcome,
83}
84
85#[derive(Clone, Default)]
90pub struct WorkerObservers {
91 pub current_job: Arc<Mutex<Option<CurrentJob>>>,
92 pub recent_jobs: Arc<Mutex<VecDeque<RecentJob>>>,
93 pub last_heartbeat: Arc<Mutex<Option<HeartbeatStatus>>>,
94}
95
96pub fn truncate_prompt(s: &str) -> String {
97 if s.chars().count() <= PROMPT_PREVIEW_CHARS {
98 return s.to_string();
99 }
100 let mut out: String = s.chars().take(PROMPT_PREVIEW_CHARS).collect();
101 out.push('…');
102 out
103}
104
105pub fn record_recent_job(observers: &WorkerObservers, entry: RecentJob) {
106 let mut ring = observers.recent_jobs.lock();
107 ring.push_front(entry);
108 while ring.len() > RECENT_JOBS_CAP {
109 ring.pop_back();
110 }
111}
112
113#[doc(hidden)]
117pub fn push_recent_job_for_tests(observers: &WorkerObservers, job_id: &str) {
118 let now = Utc::now();
119 record_recent_job(
120 observers,
121 RecentJob {
122 job_id: job_id.to_string(),
123 kind: TaskKind::Image,
124 model: "synthetic".into(),
125 prompt: String::new(),
126 outcome: JobOutcome::Completed,
127 started_at: now,
128 finished_at: now,
129 },
130 );
131}
132
133pub const AUTO_UPDATE_TICK: Duration = Duration::from_secs(60);
134pub const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
138
139#[derive(Debug, Clone, Copy)]
142pub struct LoopSchedule {
143 pub auto_update_tick: Duration,
144}
145
146impl Default for LoopSchedule {
147 fn default() -> Self {
148 Self {
149 auto_update_tick: AUTO_UPDATE_TICK,
150 }
151 }
152}
153
154impl LoopSchedule {
155 pub fn fast_for_tests() -> Self {
158 Self {
159 auto_update_tick: Duration::from_millis(1),
160 }
161 }
162}
163
164#[derive(Debug, Clone, Default)]
170pub struct RegisterArgs {
171 pub api_base_url: Option<String>,
172 pub label: Option<String>,
173 pub reset: bool,
174}
175
176pub async fn register(config_path: Option<&str>, args: RegisterArgs) -> Result<()> {
180 let (mut cfg, path) = config::load(config_path)?;
181
182 if args.reset {
183 cfg.worker_id = None;
184 cfg.auth_token = None;
185 cfg.registration_request_id = None;
186 cfg.registration_secret = None;
187 cfg.install_id = None;
188 }
189 if let Some(url) = args.api_base_url {
190 cfg.api_base_url = url;
191 }
192 if let Some(label) = args.label {
193 cfg.label = if label.trim().is_empty() {
194 None
195 } else {
196 Some(label)
197 };
198 }
199
200 config::save(&cfg, &path)?;
201 if args.reset {
202 info!(
203 config_path = %path.display(),
204 "local registration state cleared; next launch will auto-register"
205 );
206 println!(
207 "local registration state cleared; run `studio-worker run` or \
208 `studio-worker ui` to auto-register"
209 );
210 } else {
211 info!(
212 config_path = %path.display(),
213 "register flags persisted; next launch will auto-register"
214 );
215 println!(
216 "saved; run `studio-worker run` or `studio-worker ui` to auto-register against {}",
217 cfg.api_base_url
218 );
219 }
220 Ok(())
221}
222
223pub async fn status(config_path: Option<&str>) -> Result<()> {
224 let (cfg, path) = config::load(config_path)?;
225 println!("{}", format_status(&cfg, &path));
226 Ok(())
227}
228
229pub fn format_status(cfg: &Config, path: &std::path::Path) -> String {
230 let mut out = String::new();
231 use std::fmt::Write as _;
232 let _ = writeln!(out, "config path: {}", path.display());
233 let _ = writeln!(out, "api_base_url: {}", cfg.api_base_url);
234 let registration_line = if cfg.worker_id.is_some() && cfg.auth_token.is_some() {
235 format!("approved as {}", cfg.worker_id.as_deref().unwrap_or(""))
236 } else if let Some(rid) = cfg.registration_request_id.as_deref() {
237 format!("pending operator approval (request {rid})")
238 } else {
239 "not registered (will auto-register on next launch)".into()
240 };
241 let _ = writeln!(out, "registration: {registration_line}");
242 if let Some(label) = cfg.label.as_deref() {
243 let _ = writeln!(out, "label: {label}");
244 }
245 let _ = writeln!(out, "engine: {}", cfg.engine);
246 let _ = writeln!(out, "vram_threshold_gb: {}", cfg.vram_threshold_gb);
247 let _ = writeln!(out, "auto_enabled: {}", cfg.auto_enabled);
248 let _ = writeln!(out, "auto_start: {}", cfg.auto_start);
249 let _ = writeln!(out, "auto_update: {}", cfg.auto_update_enabled);
250 let _ = writeln!(
251 out,
252 "update_interval: {}s",
253 cfg.auto_update_interval_secs
254 );
255 out
256}
257
258pub fn set_enabled(config_path: Option<&str>, enabled: bool) -> Result<()> {
259 let (mut cfg, path) = config::load(config_path)?;
260 cfg.auto_enabled = enabled;
261 config::save(&cfg, &path)?;
262 info!(
263 target: TRACE_TARGET,
264 op = "set_enabled",
265 auto_enabled = enabled,
266 config_path = path.display().to_string(),
267 "auto-claim flag persisted"
268 );
269 println!("auto_enabled = {enabled}");
270 Ok(())
271}
272
273pub fn set_threshold(config_path: Option<&str>, gb: f32) -> Result<()> {
274 if gb < 0.0 {
275 return Err(anyhow!("threshold must be >= 0"));
276 }
277 let (mut cfg, path) = config::load(config_path)?;
278 cfg.vram_threshold_gb = gb;
279 config::save(&cfg, &path)?;
280 info!(
281 target: TRACE_TARGET,
282 op = "set_threshold",
283 vram_threshold_gb = gb,
284 config_path = path.display().to_string(),
285 "VRAM threshold persisted"
286 );
287 println!("vram_threshold_gb = {gb}");
288 Ok(())
289}
290
291pub fn log_startup_banner(cfg: &Config, path: &std::path::Path) {
296 info!(
297 target: TRACE_TARGET,
298 op = "startup",
299 version = AGENT_VERSION,
300 config_path = path.display().to_string(),
301 api_base_url = cfg.api_base_url.as_str(),
302 engine = cfg.engine.as_str(),
303 vram_threshold_gb = cfg.vram_threshold_gb,
304 auto_enabled = cfg.auto_enabled,
305 auto_update_enabled = cfg.auto_update_enabled,
306 auto_update_interval_secs = cfg.auto_update_interval_secs,
307 worker_id = cfg.worker_id.as_deref().unwrap_or("(unregistered)"),
308 "studio-worker booting"
309 );
310}
311
312pub fn show_config(config_path: Option<&str>) -> Result<()> {
313 let (cfg, path) = config::load(config_path)?;
314 println!("# {}", path.display());
315 print!("{}", toml::to_string_pretty(&cfg)?);
316 Ok(())
317}
318
319pub async fn check_update(config_path: Option<&str>) -> Result<()> {
320 let (cfg, _) = config::load(config_path)?;
321 let current = semver::Version::parse(AGENT_VERSION)
322 .map_err(|e| anyhow!("invalid current version {AGENT_VERSION}: {e}"))?;
323 let outcome = tokio::task::spawn_blocking(move || {
324 update::check(&cfg.auto_update_feed, ¤t, cfg.auto_update_prerelease)
325 })
326 .await??;
327 println!("{}", format_check_outcome(&outcome));
328 Ok(())
329}
330
331pub fn format_check_outcome(outcome: &update::CheckOutcome) -> String {
332 match outcome {
333 update::CheckOutcome::UpToDate { current } => format!("up to date: {current}"),
334 update::CheckOutcome::NewerAvailable { current, latest } => {
335 format!("update available: {current} -> {latest}")
336 }
337 }
338}
339
340pub async fn run(config_path: Option<&str>) -> Result<()> {
345 let (cfg, path) = config::load(config_path)?;
346 log_startup_banner(&cfg, &path);
347
348 let cfg = config::shared(cfg);
349 let stop = Arc::new(AtomicBool::new(false));
350 let busy = Arc::new(AtomicBool::new(false));
351 let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
352 let observers = WorkerObservers::default();
353 let registration = crate::auto_register::shared_initial();
354
355 let stop_clone = stop.clone();
356 tokio::spawn(async move {
357 let _ = tokio::signal::ctrl_c().await;
358 stop_clone.store(true, Ordering::SeqCst);
359 });
360
361 ensure_registered(&cfg, &path, ®istration, &stop).await?;
364
365 run_loops(cfg, stop, logs, busy, observers, LoopSchedule::default()).await
366}
367
368pub async fn ensure_registered(
371 cfg: &SharedConfig,
372 path: &std::path::Path,
373 registration: &crate::auto_register::SharedRegistration,
374 stop: &Arc<AtomicBool>,
375) -> Result<()> {
376 use std::time::Duration;
377 loop {
378 if stop.load(Ordering::SeqCst) {
379 return Err(anyhow!("shutdown before registration completed"));
380 }
381 {
382 let snap = cfg.lock();
383 if snap.worker_id.is_some() && snap.auth_token.is_some() {
384 return Ok(());
385 }
386 }
387 let state = crate::auto_register::tick(cfg, path, registration).await;
388 match state {
389 crate::auto_register::RegistrationState::Approved => return Ok(()),
390 crate::auto_register::RegistrationState::Rejected { reason } => {
391 return Err(anyhow!(
392 "registration rejected by the studio operator: {reason}. \
393 Run `studio-worker register --reset` to clear local state \
394 and submit a fresh request."
395 ));
396 }
397 _ => {}
398 }
399 for _ in 0..30 {
401 if stop.load(Ordering::SeqCst) {
402 return Err(anyhow!("shutdown during registration wait"));
403 }
404 tokio::time::sleep(Duration::from_secs(1)).await;
405 }
406 }
407}
408
409pub async fn run_loops(
412 cfg: SharedConfig,
413 stop: Arc<AtomicBool>,
414 logs: Arc<Mutex<Vec<LogEntry>>>,
415 busy: Arc<AtomicBool>,
416 observers: WorkerObservers,
417 schedule: LoopSchedule,
418) -> Result<()> {
419 let session_schedule = crate::ws::session::SessionSchedule::default();
420 let session = crate::ws::session::spawn_ws_session(
421 cfg.clone(),
422 stop.clone(),
423 logs.clone(),
424 busy.clone(),
425 observers.clone(),
426 session_schedule,
427 );
428 let auto_updater = spawn_auto_updater(
429 cfg.clone(),
430 stop.clone(),
431 logs.clone(),
432 busy.clone(),
433 schedule,
434 );
435 let (session_result, _) = tokio::join!(session, auto_updater);
436 session_result
437}
438
439#[derive(Debug, Clone, PartialEq, Eq)]
450pub enum AutoUpdateDecision {
451 Disabled,
453 SkippedBusy,
455 UpToDate,
457 CheckError(String),
459 Updated,
461 UpdateError(String),
463}
464
465pub async fn auto_update_tick(
466 cfg: &Config,
467 busy: bool,
468 logs: &Arc<Mutex<Vec<LogEntry>>>,
469) -> AutoUpdateDecision {
470 if !cfg.auto_update_enabled {
471 return AutoUpdateDecision::Disabled;
472 }
473 if busy {
474 push_log(
475 logs,
476 "info",
477 "auto-update",
478 "skipping check: worker is busy on a job",
479 None,
480 );
481 return AutoUpdateDecision::SkippedBusy;
482 }
483 let feed = cfg.auto_update_feed.clone();
484 let prerelease = cfg.auto_update_prerelease;
485 let logs_for_task = logs.clone();
486 let outcome = tokio::task::spawn_blocking(move || -> Result<AutoUpdateDecision> {
487 let current = semver::Version::parse(AGENT_VERSION)
488 .map_err(|e| anyhow!("invalid AGENT_VERSION {AGENT_VERSION}: {e}"))?;
489 match update::check(&feed, ¤t, prerelease) {
490 Ok(update::CheckOutcome::UpToDate { current }) => {
491 push_log(
492 &logs_for_task,
493 "info",
494 "auto-update",
495 &format!("up to date at {current}"),
496 None,
497 );
498 Ok(AutoUpdateDecision::UpToDate)
499 }
500 Ok(update::CheckOutcome::NewerAvailable { current, latest }) => {
501 push_log(
502 &logs_for_task,
503 "info",
504 "auto-update",
505 &format!("update available {current} -> {latest}; applying"),
506 None,
507 );
508 match update::apply(&feed, &latest) {
509 Ok(()) => {
510 push_log(
511 &logs_for_task,
512 "info",
513 "auto-update",
514 "binary replaced; restart pending",
515 None,
516 );
517 Ok(AutoUpdateDecision::Updated)
518 }
519 Err(e) => {
520 push_log(
521 &logs_for_task,
522 "error",
523 "auto-update",
524 &format!("update failed: {e}"),
525 None,
526 );
527 Ok(AutoUpdateDecision::UpdateError(e.to_string()))
528 }
529 }
530 }
531 Err(e) => {
532 push_log(
533 &logs_for_task,
534 "warn",
535 "auto-update",
536 &format!("check failed: {e}"),
537 None,
538 );
539 Ok(AutoUpdateDecision::CheckError(e.to_string()))
540 }
541 }
542 })
543 .await;
544 match outcome {
545 Ok(Ok(decision)) => decision,
546 Ok(Err(e)) => AutoUpdateDecision::CheckError(e.to_string()),
547 Err(e) => AutoUpdateDecision::CheckError(e.to_string()),
548 }
549}
550
551pub fn spawn_auto_updater(
561 cfg: SharedConfig,
562 stop: Arc<AtomicBool>,
563 logs: Arc<Mutex<Vec<LogEntry>>>,
564 busy: Arc<AtomicBool>,
565 schedule: LoopSchedule,
566) -> tokio::task::JoinHandle<()> {
567 tokio::spawn(async move {
568 let mut elapsed = Duration::from_secs(0);
569 while !stop.load(Ordering::SeqCst) {
570 tokio::time::sleep(schedule.auto_update_tick).await;
571 elapsed += schedule.auto_update_tick;
572 let snapshot = cfg.lock().clone();
573 if elapsed < Duration::from_secs(snapshot.auto_update_interval_secs) {
574 continue;
575 }
576 elapsed = Duration::from_secs(0);
577 let busy_now = busy.load(Ordering::SeqCst);
578 let decision = auto_update_tick(&snapshot, busy_now, &logs).await;
579 if matches!(decision, AutoUpdateDecision::Updated) {
580 stop.store(true, Ordering::SeqCst);
581 update::restart_self();
582 }
583 }
584 })
585}
586
587pub fn prompt_for(task: &Task) -> String {
591 match task {
592 Task::Image(p) => p.prompt.clone(),
593 Task::Llm(p) => p
594 .messages
595 .last()
596 .map(|m| m.content.clone())
597 .unwrap_or_default(),
598 Task::AudioStt(p) => p.input_url.clone(),
599 Task::AudioTts(p) => p.text.clone(),
600 Task::Video(p) => p.prompt.clone(),
601 }
602}
603
604pub fn is_unsupported_kind(e: &anyhow::Error) -> bool {
605 e.to_string().contains("cannot serve")
606}
607
608pub fn build_capabilities(cfg: &Config, engine: &dyn Engine) -> WorkerCapabilities {
613 let vram = sys::detect_vram_gb().unwrap_or(0.0);
614 let caps = engine.capabilities();
615 let supported_models_per_kind = caps.supported_models_per_kind.clone();
616 let task_kinds = caps.kinds();
617 let supported_models = {
621 let mut all = caps.flat_models();
622 all.sort();
623 all.dedup();
624 all
625 };
626 let supported_models = if cfg.supported_models_override.is_empty() {
627 supported_models
628 } else {
629 cfg.supported_models_override.clone()
630 };
631
632 WorkerCapabilities {
633 machine_name: sys::machine_name(),
634 username: sys::username(),
635 agent_version: AGENT_VERSION.to_string(),
636 engine: cfg.engine.clone(),
637 vram_total_gb: vram,
638 vram_threshold_gb: cfg.vram_threshold_gb,
639 auto_enabled: cfg.auto_enabled,
640 auto_start: cfg.auto_start,
641 supported_models,
642 task_kinds,
643 supported_models_per_kind,
644 }
645}
646
647pub fn push_log(
648 logs: &Arc<Mutex<Vec<LogEntry>>>,
649 level: &str,
650 category: &str,
651 message: &str,
652 job_id: Option<String>,
653) {
654 let entry = LogEntry {
655 ts: Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true),
656 level: level.to_string(),
657 category: category.to_string(),
658 message: message.to_string(),
659 job_id,
660 };
661 if level == "error" {
662 tracing::error!(target: "studio_worker", "[{category}] {message}");
663 } else if level == "warn" {
664 tracing::warn!(target: "studio_worker", "[{category}] {message}");
665 } else {
666 info!(target: "studio_worker", "[{category}] {message}");
667 }
668 logs.lock().push(entry);
669}
670
671#[cfg(test)]
672mod tests {
673 use super::*;
674 use crate::config::Config;
675 use crate::engine::SyntheticEngine;
676
677 #[test]
678 fn capabilities_advertises_all_synthetic_kinds() {
679 let cfg = Config::default();
680 let engine = SyntheticEngine::new(vec![]);
681 let cap = build_capabilities(&cfg, &engine);
682 assert_eq!(cap.engine, "synthetic");
683 assert_eq!(cap.task_kinds.len(), TaskKind::ALL.len());
684 for kind in TaskKind::ALL {
685 assert!(cap.supported_models_per_kind.contains_key(&kind));
686 }
687 }
688
689 #[test]
690 fn capabilities_uses_override_for_legacy_flat_list() {
691 let cfg = Config {
692 supported_models_override: vec!["only-this".into()],
693 ..Config::default()
694 };
695 let engine = SyntheticEngine::new(vec![]);
696 let cap = build_capabilities(&cfg, &engine);
697 assert_eq!(cap.supported_models, vec!["only-this".to_string()]);
698 }
699
700 #[test]
701 fn prompt_for_extracts_per_kind() {
702 let image = Task::Image(ImageParams {
703 prompt: "a stone golem".into(),
704 width: 512,
705 height: 512,
706 steps: 20,
707 seed: None,
708 ext: "webp".into(),
709 });
710 assert_eq!(prompt_for(&image), "a stone golem");
711
712 let llm = Task::Llm(LlmParams {
713 messages: vec![
714 ChatMessage {
715 role: "system".into(),
716 content: "be helpful".into(),
717 },
718 ChatMessage {
719 role: "user".into(),
720 content: "hi".into(),
721 },
722 ],
723 max_tokens: 32,
724 temperature: 0.5,
725 });
726 assert_eq!(prompt_for(&llm), "hi");
727
728 let llm_empty = Task::Llm(LlmParams {
729 messages: vec![],
730 max_tokens: 1,
731 temperature: 0.0,
732 });
733 assert_eq!(prompt_for(&llm_empty), "");
734
735 let stt = Task::AudioStt(AudioSttParams {
736 input_url: "https://example.com/clip.wav".into(),
737 language: None,
738 });
739 assert_eq!(prompt_for(&stt), "https://example.com/clip.wav");
740
741 let tts = Task::AudioTts(AudioTtsParams {
742 text: "hi there".into(),
743 voice: "v".into(),
744 ext: "wav".into(),
745 });
746 assert_eq!(prompt_for(&tts), "hi there");
747
748 let video = Task::Video(VideoParams {
749 prompt: "a tiny dragon".into(),
750 seconds: 1.0,
751 width: 256,
752 height: 256,
753 ext: "mp4".into(),
754 });
755 assert_eq!(prompt_for(&video), "a tiny dragon");
756 }
757
758 #[test]
759 fn is_unsupported_kind_matches_engine_message() {
760 let err = anyhow!("gradio engine cannot serve llm tasks");
761 assert!(is_unsupported_kind(&err));
762 let other = anyhow!("network timeout");
763 assert!(!is_unsupported_kind(&other));
764 }
765
766 #[test]
767 fn format_status_includes_every_field() {
768 let cfg = Config::default();
769 let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
770 assert!(out.contains("config path:"));
771 assert!(out.contains("api_base_url:"));
772 assert!(out.contains("registration:"));
773 assert!(out.contains("not registered"));
774 assert!(out.contains("auto_update:"));
775 assert!(out.contains("update_interval:"));
776 }
777
778 #[test]
779 fn format_status_shows_worker_id_when_registered() {
780 let cfg = Config {
781 worker_id: Some("w-abc".into()),
782 auth_token: Some("tok".into()),
783 ..Config::default()
784 };
785 let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
786 assert!(out.contains("w-abc"));
787 assert!(out.contains("approved"));
788 }
789
790 #[test]
791 fn format_status_shows_pending_request_id() {
792 let cfg = Config {
793 registration_request_id: Some("rr-7".into()),
794 ..Config::default()
795 };
796 let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
797 assert!(out.contains("pending operator approval"));
798 assert!(out.contains("rr-7"));
799 }
800
801 #[test]
802 fn format_status_shows_label_when_set() {
803 let cfg = Config {
804 label: Some("alice's rig".into()),
805 ..Config::default()
806 };
807 let out = format_status(&cfg, std::path::Path::new("/tmp/x.toml"));
808 assert!(out.contains("alice's rig"));
809 }
810
811 #[test]
812 fn format_check_outcome_handles_both_branches() {
813 let up = update::CheckOutcome::UpToDate {
814 current: semver::Version::new(1, 2, 3),
815 };
816 assert!(format_check_outcome(&up).contains("up to date"));
817 let newer = update::CheckOutcome::NewerAvailable {
818 current: semver::Version::new(1, 2, 3),
819 latest: semver::Version::new(1, 3, 0),
820 };
821 let s = format_check_outcome(&newer);
822 assert!(s.contains("1.2.3 -> 1.3.0"));
823 }
824
825 #[test]
826 fn push_log_appends_an_entry() {
827 let logs: Arc<Mutex<Vec<LogEntry>>> = Arc::new(Mutex::new(Vec::new()));
828 push_log(&logs, "info", "test", "hi", None);
829 push_log(&logs, "warn", "test", "wat", Some("j-1".into()));
830 push_log(&logs, "error", "test", "boom", None);
831 let v = logs.lock();
832 assert_eq!(v.len(), 3);
833 assert_eq!(v[0].level, "info");
834 assert_eq!(v[1].level, "warn");
835 assert_eq!(v[1].job_id.as_deref(), Some("j-1"));
836 assert_eq!(v[2].level, "error");
837 }
838
839 #[tokio::test]
842 async fn auto_update_tick_disabled_when_flag_off() {
843 let cfg = Config {
844 auto_update_enabled: false,
845 ..Config::default()
846 };
847 let logs = Arc::new(Mutex::new(Vec::new()));
848 let decision = auto_update_tick(&cfg, false, &logs).await;
849 assert_eq!(decision, AutoUpdateDecision::Disabled);
850 }
851
852 #[tokio::test]
853 async fn auto_update_tick_skipped_when_busy() {
854 let cfg = Config {
855 auto_update_enabled: true,
856 ..Config::default()
857 };
858 let logs = Arc::new(Mutex::new(Vec::new()));
859 let decision = auto_update_tick(&cfg, true, &logs).await;
860 assert_eq!(decision, AutoUpdateDecision::SkippedBusy);
861 let entries = logs.lock();
862 assert!(entries.iter().any(|e| e.message.contains("busy on a job")));
863 }
864}