1use std::sync::{
13 atomic::{AtomicBool, Ordering},
14 Arc,
15};
16use std::time::Duration;
17
18use anyhow::{anyhow, Result};
19use parking_lot::Mutex;
20use tokio::sync::mpsc;
21use tracing::{info, warn};
22
23use crate::config::SharedConfig;
24use crate::engine::Engine;
25use crate::http::ApiClient;
26use crate::runtime::{
27 is_unsupported_kind, prompt_for, push_log_with_observers, record_recent_job, truncate_prompt,
28 wait_with_stop, CurrentJob, JobOutcome, RecentJob, WorkerObservers,
29};
30use crate::types::{LogEntry, TaskResult};
31use crate::ws::client::{connect, WsClientError, WsResult, WsSender};
32use crate::ws::types::{HelloFrame, JobOfferClaim, WorkerInbound, WorkerOutbound};
33
/// `tracing` target attached to the structured events emitted by this module.
const TRACE_TARGET: &str = "studio_worker::ws::session";

/// Default for [`SessionSchedule::heartbeat`]: period of the heartbeat pump.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// Default for [`SessionSchedule::log_flush`]: period of the log shipper pump.
const LOG_FLUSH_INTERVAL: Duration = Duration::from_secs(1);
/// Default for [`SessionSchedule::shutdown_tick`]: how often the stop flag is polled.
const SHUTDOWN_TICK: Duration = Duration::from_millis(250);
/// Default for [`SessionSchedule::base_backoff_ms`]: reconnect delay after the first failure.
const BASE_BACKOFF_MS: u64 = 1_000;
/// Default for [`SessionSchedule::max_backoff_ms`]: upper bound on the reconnect delay.
const MAX_BACKOFF_MS: u64 = 30_000;
/// Reconnect cap used when the config leaves `ws_reconnect_attempts` unset.
const DEFAULT_RECONNECT_ATTEMPTS: u32 = 5;
/// Retry count handed to `ApiClient::complete_with_retry` for binary uploads.
const UPLOAD_RETRIES: u32 = 2;
/// Pause duration handed to `ApiClient::complete_with_retry` for binary uploads.
const UPLOAD_RETRY_PAUSE: Duration = Duration::from_secs(1);
/// Default for [`SessionSchedule::read_idle_timeout`]: the reader treats the
/// connection as dead when no frame arrives within this window.
const READ_IDLE_TIMEOUT: Duration = Duration::from_secs(20);
54
/// How a single websocket session ended, as reported to the reconnect
/// supervisor (`spawn_ws_session`).
#[derive(Debug)]
pub enum SessionOutcome {
    /// The stop flag was observed; the supervisor returns `Ok(())`.
    Stopped,
    /// The connection could not be opened or was lost; the supervisor backs
    /// off and reconnects.
    Disconnected,
    /// The server rejected the credentials (payload is the reason); the
    /// supervisor logs it and returns an error without retrying.
    AuthFailed(String),
    /// An unrecoverable condition (payload is the reason); the supervisor
    /// logs it and returns an error without retrying.
    Fatal(String),
}
68
/// Timing knobs for a websocket session; `Copy` so every pump task can take
/// its own value.
#[derive(Debug, Clone, Copy)]
pub struct SessionSchedule {
    /// Period between heartbeat frames.
    pub heartbeat: Duration,
    /// Period between attempts to ship buffered log entries.
    pub log_flush: Duration,
    /// How often the stop flag is polled (shutdown observer and stop-aware waits).
    pub shutdown_tick: Duration,
    /// Reconnect delay, in milliseconds, after the first failed attempt.
    pub base_backoff_ms: u64,
    /// Upper bound, in milliseconds, on the reconnect delay.
    pub max_backoff_ms: u64,
    /// Longest silence tolerated on the read side before the connection is
    /// treated as dead.
    pub read_idle_timeout: Duration,
}
80
81impl Default for SessionSchedule {
82 fn default() -> Self {
83 Self {
84 heartbeat: HEARTBEAT_INTERVAL,
85 log_flush: LOG_FLUSH_INTERVAL,
86 shutdown_tick: SHUTDOWN_TICK,
87 base_backoff_ms: BASE_BACKOFF_MS,
88 max_backoff_ms: MAX_BACKOFF_MS,
89 read_idle_timeout: READ_IDLE_TIMEOUT,
90 }
91 }
92}
93
94impl SessionSchedule {
95 pub fn fast_for_tests() -> Self {
96 Self {
97 heartbeat: Duration::from_millis(5),
98 log_flush: Duration::from_millis(5),
99 shutdown_tick: Duration::from_millis(5),
100 base_backoff_ms: 1,
101 max_backoff_ms: 10,
102 read_idle_timeout: Duration::from_secs(5),
105 }
106 }
107}
108
109#[cfg_attr(coverage_nightly, coverage(off))]
117pub async fn spawn_ws_session(
118 cfg: SharedConfig,
119 stop: Arc<AtomicBool>,
120 logs: Arc<Mutex<Vec<LogEntry>>>,
121 busy: Arc<AtomicBool>,
122 paused: Arc<AtomicBool>,
123 observers: WorkerObservers,
124 schedule: SessionSchedule,
125) -> Result<()> {
126 let max_attempts = {
127 let guard = cfg.lock();
128 guard
129 .ws_reconnect_attempts
130 .unwrap_or(DEFAULT_RECONNECT_ATTEMPTS)
131 };
132
133 let mut attempt: u32 = 0;
134 let mut waiting_for_creds_logged = false;
135 loop {
136 if stop.load(Ordering::SeqCst) {
137 return Ok(());
138 }
139 if !has_credentials(&cfg) {
145 if !waiting_for_creds_logged {
146 push_log_with_observers(
147 &logs,
148 Some(&observers),
149 "info",
150 "ws",
151 "waiting for operator approval before opening the session",
152 None,
153 );
154 waiting_for_creds_logged = true;
155 }
156 wait_with_stop(Duration::from_secs(1), &stop, schedule.shutdown_tick).await;
157 continue;
158 }
159 waiting_for_creds_logged = false;
160
161 let welcomed = AtomicBool::new(false);
162 match run_one_session(
163 &cfg, &stop, &logs, &busy, &paused, &observers, schedule, &welcomed,
164 )
165 .await
166 {
167 Ok(SessionOutcome::Stopped) => return Ok(()),
168 Ok(SessionOutcome::AuthFailed(reason)) => {
169 push_log_with_observers(
170 &logs,
171 Some(&observers),
172 "error",
173 "ws",
174 &format!("auth failed: {reason}. Re-register the worker."),
175 None,
176 );
177 return Err(anyhow!("ws auth failed: {reason}"));
178 }
179 Ok(SessionOutcome::Fatal(reason)) => {
180 push_log_with_observers(
181 &logs,
182 Some(&observers),
183 "error",
184 "ws",
185 &format!("fatal: {reason}"),
186 None,
187 );
188 return Err(anyhow!("ws fatal: {reason}"));
189 }
190 outcome @ (Ok(SessionOutcome::Disconnected) | Err(_)) => {
191 if welcomed.load(Ordering::SeqCst) {
195 attempt = 0;
196 }
197 attempt += 1;
198 if max_attempts > 0 && attempt > max_attempts {
199 push_log_with_observers(
200 &logs,
201 Some(&observers),
202 "error",
203 "ws",
204 &format!("giving up after {attempt} reconnect attempts"),
205 None,
206 );
207 return Err(anyhow!("ws reconnect cap reached"));
208 }
209 let backoff = backoff_for(attempt, schedule);
210 push_log_with_observers(
211 &logs,
212 Some(&observers),
213 "warn",
214 "ws",
215 &reconnect_breadcrumb(outcome.as_ref().err(), attempt, backoff),
216 None,
217 );
218 wait_with_stop(backoff, &stop, schedule.shutdown_tick).await;
219 }
220 }
221 }
222}
223
/// Result of waiting for the server's first reply after the hello frame.
enum WelcomeOutcome {
    /// The welcome frame arrived; the session may proceed.
    Welcomed,
    /// The server (or the transport) reported an auth failure; payload is the reason.
    AuthFailed(String),
    /// The server sent a non-auth error frame before welcoming; payload is its message.
    Fatal(String),
    /// The event stream ended or reported a disconnect/stop before any welcome.
    Disconnected,
}
234
235#[cfg_attr(coverage_nightly, coverage(off))]
241async fn wait_for_welcome(
242 event_rx: &mut mpsc::UnboundedReceiver<SessionEvent>,
243 logs: &Arc<Mutex<Vec<LogEntry>>>,
244 observers: &WorkerObservers,
245) -> WelcomeOutcome {
246 while let Some(event) = event_rx.recv().await {
247 match event {
248 SessionEvent::Frame(WorkerOutbound::Welcome {
249 worker_id: wid,
250 server_time,
251 }) => {
252 push_log_with_observers(
253 logs,
254 Some(observers),
255 "info",
256 "ws",
257 &welcome_breadcrumb(&wid, &server_time),
258 None,
259 );
260 return WelcomeOutcome::Welcomed;
261 }
262 SessionEvent::Frame(WorkerOutbound::Error { code, message }) => {
263 push_log_with_observers(
264 logs,
265 Some(observers),
266 "error",
267 "ws",
268 &format!("server error before welcome {code:?}: {message}"),
269 None,
270 );
271 return match code {
272 crate::ws::types::WorkerErrorCode::AuthFailed => {
273 WelcomeOutcome::AuthFailed(message)
274 }
275 _ => WelcomeOutcome::Fatal(message),
276 };
277 }
278 SessionEvent::Frame(other) => {
279 push_log_with_observers(
280 logs,
281 Some(observers),
282 "warn",
283 "ws",
284 &format!("server sent unexpected frame before welcome: {other:?}"),
285 None,
286 );
287 }
289 SessionEvent::Disconnected(WsClientError::AuthFailed { reason }) => {
290 return WelcomeOutcome::AuthFailed(reason);
291 }
292 SessionEvent::Disconnected(_) => return WelcomeOutcome::Disconnected,
293 SessionEvent::Stopped => return WelcomeOutcome::Disconnected,
294 }
295 }
296 WelcomeOutcome::Disconnected
297}
298
299fn has_credentials(cfg: &SharedConfig) -> bool {
303 let guard = cfg.lock();
304 guard
305 .worker_id
306 .as_deref()
307 .map(|s| !s.is_empty())
308 .unwrap_or(false)
309 && guard
310 .auth_token
311 .as_deref()
312 .map(|s| !s.is_empty())
313 .unwrap_or(false)
314}
315
/// Runs a single websocket session from connect to teardown.
///
/// Sequence: snapshot credentials, connect, build the engine and its
/// capabilities, send the hello frame, wait for the welcome, then start the
/// heartbeat / log-shipper / shutdown-observer tasks and dispatch incoming
/// frames until the session ends. `welcomed` is set to `true` as soon as the
/// welcome arrives so the caller can tell a working link from one that never
/// got that far.
///
/// # Errors
///
/// Returns `Err` when the engine cannot be built or the hello frame cannot be
/// sent. Connection and protocol failures are reported through the `Ok`
/// [`SessionOutcome`] variants instead.
#[cfg_attr(coverage_nightly, coverage(off))]
#[allow(clippy::too_many_arguments)]
async fn run_one_session(
    cfg: &SharedConfig,
    stop: &Arc<AtomicBool>,
    logs: &Arc<Mutex<Vec<LogEntry>>>,
    busy: &Arc<AtomicBool>,
    paused: &Arc<AtomicBool>,
    observers: &WorkerObservers,
    schedule: SessionSchedule,
    welcomed: &AtomicBool,
) -> Result<SessionOutcome> {
    // Snapshot the connection parameters so the config lock is released
    // before any await.
    let (api_base_url, worker_id, auth_token) = {
        let guard = cfg.lock();
        (
            guard.api_base_url.clone(),
            guard.worker_id.clone().unwrap_or_default(),
            guard.auth_token.clone().unwrap_or_default(),
        )
    };
    if worker_id.is_empty() || auth_token.is_empty() {
        return Ok(SessionOutcome::Fatal(
            "worker_id or auth_token missing; run register".to_string(),
        ));
    }

    push_log_with_observers(
        logs,
        Some(observers),
        "info",
        "ws",
        &format!("connecting to {api_base_url}"),
        None,
    );
    let client = match connect(&api_base_url, &worker_id, &auth_token).await {
        Ok(c) => c,
        Err(WsClientError::AuthFailed { reason }) => {
            return Ok(SessionOutcome::AuthFailed(reason));
        }
        Err(e) => {
            // Any other connect error is treated as transient; the caller
            // backs off and retries.
            push_log_with_observers(
                logs,
                Some(observers),
                "warn",
                "ws",
                &format!("connect failed: {e}"),
                None,
            );
            return Ok(SessionOutcome::Disconnected);
        }
    };
    let (sender, receiver) = client.split();

    // The engine is built per session; its capabilities are advertised in
    // the hello frame (the `!paused` flag is forwarded to the builder).
    let engine = crate::engine::build(&cfg.lock())?;
    let capabilities = crate::runtime::build_capabilities_with(
        &cfg.lock(),
        &*engine,
        !paused.load(Ordering::SeqCst),
    );
    push_log_with_observers(
        logs,
        Some(observers),
        "info",
        "ws",
        &crate::runtime::summarize_capabilities(&capabilities),
        None,
    );
    if let Some(warning) = crate::runtime::vram_threshold_warning(&capabilities) {
        push_log_with_observers(logs, Some(observers), "warn", "ws", &warning, None);
    }
    sender
        .send(&WorkerInbound::Hello(HelloFrame {
            auth_token: auth_token.clone(),
            capabilities: capabilities.clone(),
        }))
        .await
        .map_err(|e| anyhow!("hello send failed: {e}"))?;
    info!(target: TRACE_TARGET, worker_id = %worker_id, "hello sent");

    // Single channel through which the reader and the shutdown observer
    // feed the dispatch loop.
    let (event_tx, event_rx) = mpsc::unbounded_channel::<SessionEvent>();

    let reader = spawn_reader(receiver, event_tx.clone(), schedule.read_idle_timeout);

    // Only the reader is running at this point: the pumps and the shutdown
    // observer start after the server has welcomed the worker.
    let mut event_rx = event_rx;
    match wait_for_welcome(&mut event_rx, logs, observers).await {
        WelcomeOutcome::Welcomed => welcomed.store(true, Ordering::SeqCst),
        WelcomeOutcome::AuthFailed(reason) => {
            let _ = sender.close(1000, "auth failed").await;
            let _ = reader.await;
            return Ok(SessionOutcome::AuthFailed(reason));
        }
        WelcomeOutcome::Fatal(reason) => {
            let _ = sender.close(1000, "protocol violation").await;
            let _ = reader.await;
            return Ok(SessionOutcome::Fatal(reason));
        }
        WelcomeOutcome::Disconnected => {
            let _ = reader.await;
            return Ok(SessionOutcome::Disconnected);
        }
    }

    // Shared between the heartbeat pump and every job spawned by the
    // dispatch loop.
    let engine_arc: Arc<dyn Engine> = engine.into();
    let heartbeat = spawn_heartbeat_pump(
        cfg.clone(),
        engine_arc.clone(),
        sender.clone(),
        stop.clone(),
        paused.clone(),
        logs.clone(),
        observers.clone(),
        schedule,
    );

    let log_shipper = spawn_log_shipper_pump(sender.clone(), logs.clone(), stop.clone(), schedule);

    let shutdown_observer = spawn_shutdown_observer(stop.clone(), event_tx.clone(), schedule);
    // Drop the local sender so the channel closes once the reader and the
    // shutdown observer have both finished.
    drop(event_tx);

    let ctx = SessionContext {
        sender: sender.clone(),
        engine: engine_arc,
        logs: logs.clone(),
        busy: busy.clone(),
        paused: paused.clone(),
        observers: observers.clone(),
        api_base_url: api_base_url.clone(),
        worker_id: worker_id.clone(),
        auth_token: auth_token.clone(),
    };
    let outcome = run_dispatch_loop(ctx, event_rx).await;

    // Teardown: cancel every helper task, close the socket, then await the
    // handles so none of them outlives this session.
    reader.abort();
    heartbeat.abort();
    log_shipper.abort();
    shutdown_observer.abort();
    let _ = sender.close(1000, "session ended").await;
    let _ = reader.await;
    let _ = heartbeat.await;
    let _ = log_shipper.await;
    let _ = shutdown_observer.await;
    Ok(outcome)
}
495
/// Events delivered to the welcome wait and the dispatch loop over the
/// session's event channel.
#[derive(Debug)]
enum SessionEvent {
    /// A frame received from the server (sent by the reader task).
    Frame(WorkerOutbound),
    /// The stop flag was observed (sent by the shutdown observer).
    Stopped,
    /// The read side ended: closed stream, receive error, or idle timeout.
    Disconnected(WsClientError),
}
506
/// Everything the dispatch loop and its spawned job tasks need; cloned into
/// each job task by `handle_offer`.
#[derive(Clone)]
struct SessionContext {
    /// Write half of the websocket, used for accept/reject/fail/result frames.
    sender: WsSender,
    /// Engine that executes offered tasks.
    engine: Arc<dyn Engine>,
    /// Shared log buffer that the log shipper drains.
    logs: Arc<Mutex<Vec<LogEntry>>>,
    /// `true` while a job is reserved or running; at most one job at a time.
    busy: Arc<AtomicBool>,
    /// `true` while the worker is paused; offers are rejected while set.
    paused: Arc<AtomicBool>,
    /// Observer state updated alongside logs (current job, recent jobs).
    observers: WorkerObservers,
    /// Base URL used to build the `ApiClient` for binary result uploads.
    api_base_url: String,
    /// Worker identity passed to the upload call.
    worker_id: String,
    /// Credential passed to the upload call.
    auth_token: String,
}
523
524#[cfg_attr(coverage_nightly, coverage(off))]
525async fn run_dispatch_loop(
526 ctx: SessionContext,
527 mut event_rx: mpsc::UnboundedReceiver<SessionEvent>,
528) -> SessionOutcome {
529 while let Some(event) = event_rx.recv().await {
530 match event {
531 SessionEvent::Disconnected(WsClientError::AuthFailed { reason }) => {
532 return SessionOutcome::AuthFailed(reason);
533 }
534 SessionEvent::Disconnected(_) => return SessionOutcome::Disconnected,
535 SessionEvent::Stopped => return SessionOutcome::Stopped,
536 SessionEvent::Frame(frame) => match frame {
537 WorkerOutbound::Welcome {
538 worker_id: wid,
539 server_time,
540 } => {
541 push_log_with_observers(
542 &ctx.logs,
543 Some(&ctx.observers),
544 "info",
545 "ws",
546 &welcome_breadcrumb(&wid, &server_time),
547 None,
548 );
549 }
550 WorkerOutbound::Offer { claim } => {
551 handle_offer(&ctx, *claim);
552 }
553 WorkerOutbound::Error { code, message } => {
554 push_log_with_observers(
555 &ctx.logs,
556 Some(&ctx.observers),
557 "error",
558 "ws",
559 &format!("server error {code:?}: {message}"),
560 None,
561 );
562 return match code {
563 crate::ws::types::WorkerErrorCode::AuthFailed => {
564 SessionOutcome::AuthFailed(message)
565 }
566 _ => SessionOutcome::Fatal(message),
567 };
568 }
569 WorkerOutbound::CompleteAck { job_id } => {
570 push_log_with_observers(
571 &ctx.logs,
572 Some(&ctx.observers),
573 "info",
574 "ws",
575 &result_ack_breadcrumb("completion", &job_id),
576 Some(job_id),
577 );
578 }
579 WorkerOutbound::FailAck { job_id } => {
580 push_log_with_observers(
581 &ctx.logs,
582 Some(&ctx.observers),
583 "info",
584 "ws",
585 &result_ack_breadcrumb("failure", &job_id),
586 Some(job_id),
587 );
588 }
589 WorkerOutbound::HeartbeatAck => {
590 }
595 },
596 }
597 }
598 SessionOutcome::Disconnected
599}
600
601#[cfg_attr(coverage_nightly, coverage(off))]
602fn handle_offer(ctx: &SessionContext, claim: JobOfferClaim) {
603 let job_id = claim.job_id.clone();
604 push_log_with_observers(
605 &ctx.logs,
606 Some(&ctx.observers),
607 "info",
608 "ws",
609 &offer_received_breadcrumb(
610 &job_id,
611 &claim.game_id,
612 &claim.asset_name,
613 &claim.model,
614 claim.vram_gb_estimate,
615 ),
616 Some(job_id.clone()),
617 );
618 if ctx.paused.load(Ordering::SeqCst) {
622 push_log_with_observers(
623 &ctx.logs,
624 Some(&ctx.observers),
625 "info",
626 "ws",
627 &format!("rejecting offer {job_id}: worker is paused"),
628 Some(job_id.clone()),
629 );
630 spawn_reject_offer(
631 ctx.sender.clone(),
632 ctx.logs.clone(),
633 ctx.observers.clone(),
634 job_id,
635 "worker paused by operator",
636 crate::ws::types::RejectCode::Paused,
637 );
638 return;
639 }
640 if !try_reserve_worker(&ctx.busy) {
641 push_log_with_observers(
642 &ctx.logs,
643 Some(&ctx.observers),
644 "info",
645 "ws",
646 &format!("rejecting offer {job_id}: worker is already busy"),
647 Some(job_id.clone()),
648 );
649 spawn_reject_offer(
650 ctx.sender.clone(),
651 ctx.logs.clone(),
652 ctx.observers.clone(),
653 job_id,
654 "worker already has an in-flight job",
655 crate::ws::types::RejectCode::Busy,
656 );
657 return;
658 }
659 let job = claim.into_job_claim();
660 let task_kind = job.task.kind();
661 let full_prompt = prompt_for(&job.task);
669 let prompt_preview = truncate_prompt(&full_prompt);
670 let started_at = chrono::Utc::now();
671
672 let ctx = ctx.clone();
673 tokio::spawn(async move {
674 let accept_result = ctx
675 .sender
676 .send(&WorkerInbound::Accept {
677 job_id: job_id.clone(),
678 })
679 .await;
680 if let Some((level, message)) = offer_response_breadcrumb("accept", &job_id, &accept_result)
681 {
682 push_log_with_observers(
683 &ctx.logs,
684 Some(&ctx.observers),
685 level,
686 "ws",
687 &message,
688 Some(job_id.clone()),
689 );
690 }
691 if accept_result.is_err() {
692 ctx.busy.store(false, Ordering::SeqCst);
693 return;
694 }
695
696 *ctx.observers.current_job.lock() = Some(CurrentJob {
698 job_id: job_id.clone(),
699 kind: task_kind,
700 model: job.model.clone(),
701 prompt: prompt_preview.clone(),
702 started_at,
703 });
704
705 run_offered_job(
706 &ctx,
707 job,
708 started_at,
709 task_kind,
710 full_prompt,
711 prompt_preview,
712 )
713 .await;
714 ctx.busy.store(false, Ordering::SeqCst);
715 });
716}
717
/// Atomically flips `busy` from `false` to `true`.
///
/// Returns `true` when this call made the flip (the caller now owns the
/// reservation) and `false` when the flag was already set.
fn try_reserve_worker(busy: &AtomicBool) -> bool {
    let claimed = busy.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst);
    claimed.is_ok()
}
722
723fn spawn_reject_offer(
724 sender: WsSender,
725 logs: Arc<Mutex<Vec<LogEntry>>>,
726 observers: WorkerObservers,
727 job_id: String,
728 reason: &'static str,
729 code: crate::ws::types::RejectCode,
730) {
731 tokio::spawn(async move {
732 let result = sender
733 .send(&WorkerInbound::Reject {
734 job_id: job_id.clone(),
735 reason: reason.to_string(),
736 code: Some(code),
737 })
738 .await;
739 if let Some((level, message)) = offer_response_breadcrumb("reject", &job_id, &result) {
740 push_log_with_observers(&logs, Some(&observers), level, "ws", &message, Some(job_id));
741 }
742 });
743}
744
745#[cfg_attr(coverage_nightly, coverage(off))]
746async fn run_offered_job(
747 ctx: &SessionContext,
748 job: crate::types::JobClaim,
749 started_at: chrono::DateTime<chrono::Utc>,
750 task_kind: crate::types::TaskKind,
751 full_prompt: String,
752 prompt_preview: String,
753) {
754 let start = std::time::Instant::now();
755 let dispatch = tokio::task::spawn_blocking({
760 let model = job.model.clone();
761 let model_source = job.model_source.clone();
762 let task_for_engine = job.task.clone();
763 let engine = ctx.engine.clone();
764 move || -> Result<TaskResult> {
765 engine.dispatch_with_source(&model, task_for_engine, &model_source)
766 }
767 })
768 .await;
769
770 let job_id = job.job_id.clone();
771 let outcome = match dispatch {
775 Ok(Ok(result)) => {
776 push_log_with_observers(
777 &ctx.logs,
778 Some(&ctx.observers),
779 "info",
780 "ws",
781 &format!("{} dispatched in {:?}", task_kind.as_str(), start.elapsed()),
782 Some(job_id.clone()),
783 );
784 deliver_result(ctx, &job_id, result, &full_prompt).await
785 }
786 Ok(Err(e)) => {
787 warn!(target: TRACE_TARGET, error = %e, "engine dispatch failed");
788 push_log_with_observers(
789 &ctx.logs,
790 Some(&ctx.observers),
791 "error",
792 "ws",
793 &format!("dispatch failed: {e}"),
794 Some(job_id.clone()),
795 );
796 let fail_result = ctx
797 .sender
798 .send(&WorkerInbound::Fail {
799 job_id: job_id.clone(),
800 error: e.to_string(),
801 retryable: !is_unsupported_kind(&e),
802 })
803 .await;
804 record_fail_send(&fail_result, &job_id, &ctx.logs, &ctx.observers);
805 JobOutcome::Failed {
806 reason: e.to_string(),
807 }
808 }
809 Err(e) => {
810 push_log_with_observers(
811 &ctx.logs,
812 Some(&ctx.observers),
813 "error",
814 "ws",
815 &format!("dispatch task panic: {e}"),
816 Some(job_id.clone()),
817 );
818 let fail_result = ctx
819 .sender
820 .send(&WorkerInbound::Fail {
821 job_id: job_id.clone(),
822 error: e.to_string(),
823 retryable: true,
824 })
825 .await;
826 record_fail_send(&fail_result, &job_id, &ctx.logs, &ctx.observers);
827 JobOutcome::Failed {
828 reason: e.to_string(),
829 }
830 }
831 };
832
833 *ctx.observers.current_job.lock() = None;
836 record_recent_job(
837 &ctx.observers,
838 RecentJob {
839 job_id: job_id.clone(),
840 kind: task_kind,
841 model: job.model.clone(),
842 prompt: prompt_preview,
843 outcome,
844 started_at,
845 finished_at: chrono::Utc::now(),
846 },
847 );
848}
849
850#[cfg_attr(coverage_nightly, coverage(off))]
855async fn deliver_result(
856 ctx: &SessionContext,
857 job_id: &str,
858 result: TaskResult,
859 full_prompt: &str,
860) -> JobOutcome {
861 match result {
862 TaskResult::Image { bytes, ext }
863 | TaskResult::AudioTts { bytes, ext }
864 | TaskResult::Video { bytes, ext } => {
865 let upload_result = tokio::task::spawn_blocking({
866 let api_base_url = ctx.api_base_url.clone();
867 let job_id = job_id.to_string();
868 let auth_token = ctx.auth_token.clone();
869 let worker_id = ctx.worker_id.clone();
870 let prompt = full_prompt.to_string();
871 move || -> Result<()> {
872 let api = ApiClient::new(api_base_url)?;
873 api.complete_with_retry(
874 &worker_id,
875 &auth_token,
876 &job_id,
877 &ext,
878 &prompt,
879 bytes,
880 UPLOAD_RETRIES,
881 UPLOAD_RETRY_PAUSE,
882 )
883 }
884 })
885 .await;
886 let msg = match upload_result {
887 Ok(Ok(())) => None,
888 Ok(Err(e)) => Some(e.to_string()),
889 Err(e) => Some(format!("upload task panic: {e}")),
890 };
891 match msg {
892 Some(msg) => {
893 push_log_with_observers(
894 &ctx.logs,
895 Some(&ctx.observers),
896 "error",
897 "ws",
898 &msg,
899 Some(job_id.to_string()),
900 );
901 let fail_result = ctx
902 .sender
903 .send(&WorkerInbound::Fail {
904 job_id: job_id.to_string(),
905 error: msg.clone(),
906 retryable: true,
907 })
908 .await;
909 record_fail_send(&fail_result, job_id, &ctx.logs, &ctx.observers);
910 JobOutcome::Failed { reason: msg }
911 }
912 None => {
913 push_log_with_observers(
914 &ctx.logs,
915 Some(&ctx.observers),
916 "info",
917 "ws",
918 "binary upload ok",
919 Some(job_id.to_string()),
920 );
921 JobOutcome::Completed
936 }
937 }
938 }
939 TaskResult::Llm { json } | TaskResult::AudioStt { json } => {
940 match ctx
946 .sender
947 .send(&WorkerInbound::CompleteJson {
948 job_id: job_id.to_string(),
949 result: json,
950 prompt: Some(full_prompt.to_string()),
951 })
952 .await
953 {
954 Ok(()) => {
955 push_log_with_observers(
956 &ctx.logs,
957 Some(&ctx.observers),
958 "info",
959 "ws",
960 "json result sent",
961 Some(job_id.to_string()),
962 );
963 JobOutcome::Completed
964 }
965 Err(e) => {
966 let msg = format!("failed to send result: {e}");
967 push_log_with_observers(
968 &ctx.logs,
969 Some(&ctx.observers),
970 "error",
971 "ws",
972 &msg,
973 Some(job_id.to_string()),
974 );
975 JobOutcome::Failed { reason: msg }
976 }
977 }
978 }
979 }
980}
981
982#[cfg_attr(coverage_nightly, coverage(off))]
983fn spawn_reader(
984 mut receiver: crate::ws::client::WsReceiver,
985 event_tx: mpsc::UnboundedSender<SessionEvent>,
986 read_idle_timeout: Duration,
987) -> tokio::task::JoinHandle<()> {
988 tokio::spawn(async move {
989 loop {
990 match tokio::time::timeout(read_idle_timeout, receiver.recv()).await {
994 Ok(Ok(Some(frame))) => {
995 if event_tx.send(SessionEvent::Frame(frame)).is_err() {
996 break;
997 }
998 }
999 Ok(Ok(None)) => {
1000 let _ =
1001 event_tx.send(SessionEvent::Disconnected(WsClientError::ConnectionClosed));
1002 break;
1003 }
1004 Ok(Err(e)) => {
1005 let _ = event_tx.send(SessionEvent::Disconnected(e));
1006 break;
1007 }
1008 Err(_elapsed) => {
1009 let _ = event_tx.send(SessionEvent::Disconnected(WsClientError::Transport(
1010 format!(
1011 "no frames from server for {:?}; treating connection as dead",
1012 read_idle_timeout
1013 ),
1014 )));
1015 break;
1016 }
1017 }
1018 }
1019 })
1020}
1021
1022#[cfg_attr(coverage_nightly, coverage(off))]
1023#[allow(clippy::too_many_arguments)]
1026fn spawn_heartbeat_pump(
1027 cfg: SharedConfig,
1028 engine: Arc<dyn Engine>,
1029 sender: WsSender,
1030 stop: Arc<AtomicBool>,
1031 paused: Arc<AtomicBool>,
1032 logs: Arc<Mutex<Vec<LogEntry>>>,
1033 observers: WorkerObservers,
1034 schedule: SessionSchedule,
1035) -> tokio::task::JoinHandle<()> {
1036 tokio::spawn(async move {
1037 let mut interval = tokio::time::interval(schedule.heartbeat);
1038 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1039 let mut last_paused = paused.load(Ordering::SeqCst);
1043 loop {
1044 interval.tick().await;
1045 if stop.load(Ordering::SeqCst) {
1046 break;
1047 }
1048 let now_paused = paused.load(Ordering::SeqCst);
1053 if let Some(message) = pause_transition_breadcrumb(last_paused, now_paused) {
1054 push_log_with_observers(&logs, Some(&observers), "info", "ws", message, None);
1055 }
1056 last_paused = now_paused;
1057 let caps = crate::runtime::build_capabilities_with(&cfg.lock(), &*engine, !now_paused);
1061 let current_job_id = heartbeat_current_job_id(&observers);
1062 if let Err(e) = sender
1063 .send(&WorkerInbound::Heartbeat {
1064 capabilities: caps,
1065 current_job_id,
1066 })
1067 .await
1068 {
1069 warn!(target: TRACE_TARGET, error = %e, "heartbeat send failed");
1070 break;
1071 }
1072 }
1073 })
1074}
1075
1076fn heartbeat_current_job_id(observers: &WorkerObservers) -> Option<String> {
1077 observers
1078 .current_job
1079 .lock()
1080 .as_ref()
1081 .map(|job| job.job_id.clone())
1082}
1083
1084#[cfg_attr(coverage_nightly, coverage(off))]
1085fn spawn_log_shipper_pump(
1086 sender: WsSender,
1087 logs: Arc<Mutex<Vec<LogEntry>>>,
1088 stop: Arc<AtomicBool>,
1089 schedule: SessionSchedule,
1090) -> tokio::task::JoinHandle<()> {
1091 tokio::spawn(async move {
1092 let mut interval = tokio::time::interval(schedule.log_flush);
1093 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
1094 loop {
1095 interval.tick().await;
1096 if stop.load(Ordering::SeqCst) {
1097 break;
1098 }
1099 let batch = {
1100 let mut guard = logs.lock();
1101 if guard.is_empty() {
1102 continue;
1103 }
1104 std::mem::take(&mut *guard)
1105 };
1106 let frame = WorkerInbound::LogBatch { entries: batch };
1107 if let Err(e) = sender.send(&frame).await {
1108 warn!(target: TRACE_TARGET, error = %e, "log batch send failed; requeueing batch");
1109 if let WorkerInbound::LogBatch { entries } = frame {
1112 crate::runtime::restore_unshipped(&logs, entries);
1113 }
1114 break;
1115 }
1116 }
1117 })
1118}
1119
1120#[cfg_attr(coverage_nightly, coverage(off))]
1121fn spawn_shutdown_observer(
1122 stop: Arc<AtomicBool>,
1123 event_tx: mpsc::UnboundedSender<SessionEvent>,
1124 schedule: SessionSchedule,
1125) -> tokio::task::JoinHandle<()> {
1126 tokio::spawn(async move {
1127 loop {
1128 tokio::time::sleep(schedule.shutdown_tick).await;
1129 if stop.load(Ordering::SeqCst) {
1130 let _ = event_tx.send(SessionEvent::Stopped);
1131 break;
1132 }
1133 if event_tx.is_closed() {
1134 break;
1135 }
1136 }
1137 })
1138}
1139
1140fn backoff_for(attempt: u32, schedule: SessionSchedule) -> Duration {
1141 let factor = 2u64.saturating_pow(attempt.saturating_sub(1));
1142 let raw_ms = schedule.base_backoff_ms.saturating_mul(factor);
1143 Duration::from_millis(raw_ms.min(schedule.max_backoff_ms))
1144}
1145
1146fn reconnect_breadcrumb(error: Option<&anyhow::Error>, attempt: u32, backoff: Duration) -> String {
1161 let in_ms = backoff.as_millis();
1162 match error {
1163 Some(e) => format!("session error: {e:#}; reconnect attempt {attempt} in {in_ms}ms"),
1164 None => format!("disconnected; reconnect attempt {attempt} in {in_ms}ms"),
1165 }
1166}
1167
/// Builds the log line recorded when the server's welcome frame arrives,
/// naming the worker id and the server-reported time.
fn welcome_breadcrumb(worker_id: &str, server_time: &str) -> String {
    format!("server welcomed {worker_id} server_time={server_time}")
}
1182
/// Builds the log line recorded when a job offer arrives, listing the job,
/// game, asset, model, and the offer's VRAM estimate.
fn offer_received_breadcrumb(
    job_id: &str,
    game_id: &str,
    asset_name: &str,
    model: &str,
    vram_gb_estimate: f32,
) -> String {
    format!(
        "offer received {job_id} game={game_id} asset={asset_name} model={model} vram={vram_gb_estimate}"
    )
}
1204
/// Builds the log line recorded when the studio acknowledges a job result;
/// `outcome` is the word describing what was acknowledged.
fn result_ack_breadcrumb(outcome: &str, job_id: &str) -> String {
    format!("studio confirmed {outcome} of job {job_id}")
}
1224
1225fn offer_response_breadcrumb(
1240 label: &str,
1241 job_id: &str,
1242 result: &WsResult<()>,
1243) -> Option<(&'static str, String)> {
1244 match result {
1245 Ok(()) => None,
1246 Err(e) => Some((
1247 "error",
1248 format!("{label} send failed for offer {job_id}: {e}"),
1249 )),
1250 }
1251}
1252
1253fn fail_send_breadcrumb(job_id: &str, result: &WsResult<()>) -> Option<(&'static str, String)> {
1268 match result {
1269 Ok(()) => None,
1270 Err(e) => Some((
1271 "error",
1272 format!("failed to notify studio of job {job_id} failure: {e}"),
1273 )),
1274 }
1275}
1276
1277fn record_fail_send(
1285 result: &WsResult<()>,
1286 job_id: &str,
1287 logs: &Arc<Mutex<Vec<LogEntry>>>,
1288 observers: &WorkerObservers,
1289) {
1290 if let Some((level, message)) = fail_send_breadcrumb(job_id, result) {
1291 push_log_with_observers(
1292 logs,
1293 Some(observers),
1294 level,
1295 "ws",
1296 &message,
1297 Some(job_id.to_string()),
1298 );
1299 }
1300}
1301
/// Returns the log line for a change in the paused flag between two
/// observations, or `None` when the flag did not change.
fn pause_transition_breadcrumb(prev: bool, now: bool) -> Option<&'static str> {
    if prev == now {
        None
    } else if now {
        Some("claiming paused by operator; new offers are rejected until resumed")
    } else {
        Some("claiming resumed by operator; accepting new offers again")
    }
}
1322
1323#[cfg(test)]
1324mod tests {
1325 use super::*;
1326
1327 #[test]
1328 fn offer_response_breadcrumb_is_silent_on_success() {
1329 assert!(offer_response_breadcrumb("accept", "j-1", &Ok(())).is_none());
1333 assert!(offer_response_breadcrumb("reject", "j-2", &Ok(())).is_none());
1334 }
1335
1336 #[test]
1337 fn try_reserve_worker_only_allows_one_in_flight_job() {
1338 let busy = AtomicBool::new(false);
1339 assert!(try_reserve_worker(&busy));
1340 assert!(!try_reserve_worker(&busy));
1341 }
1342
1343 #[test]
1344 fn heartbeat_current_job_id_uses_actual_job_id() {
1345 let observers = WorkerObservers::default();
1346 assert_eq!(heartbeat_current_job_id(&observers), None);
1347 *observers.current_job.lock() = Some(CurrentJob {
1348 job_id: "job-42".into(),
1349 kind: crate::types::TaskKind::Image,
1350 model: "synthetic".into(),
1351 prompt: "prompt".into(),
1352 started_at: chrono::Utc::now(),
1353 });
1354 assert_eq!(
1355 heartbeat_current_job_id(&observers).as_deref(),
1356 Some("job-42")
1357 );
1358 }
1359
1360 #[test]
1361 fn offer_response_breadcrumb_reports_accept_send_failure() {
1362 let (level, msg) =
1363 offer_response_breadcrumb("accept", "j-1", &Err(WsClientError::ConnectionClosed))
1364 .expect("a failed accept send must surface a breadcrumb");
1365 assert_eq!(level, "error");
1366 assert!(msg.contains("accept send failed"), "got: {msg}");
1367 assert!(msg.contains("j-1"), "must name the job: {msg}");
1368 assert!(
1369 msg.contains("connection closed"),
1370 "must carry the cause: {msg}"
1371 );
1372 }
1373
1374 #[test]
1375 fn offer_response_breadcrumb_reports_reject_send_failure() {
1376 let (level, msg) = offer_response_breadcrumb(
1377 "reject",
1378 "j-9",
1379 &Err(WsClientError::Transport("sink gone".into())),
1380 )
1381 .expect("a failed reject send must surface a breadcrumb");
1382 assert_eq!(level, "error");
1383 assert!(msg.contains("reject send failed"), "got: {msg}");
1384 assert!(msg.contains("j-9"), "must name the job: {msg}");
1385 assert!(msg.contains("sink gone"), "must carry the cause: {msg}");
1386 }
1387
1388 #[test]
1389 fn fail_send_breadcrumb_is_silent_on_success() {
1390 assert!(fail_send_breadcrumb("j-1", &Ok(())).is_none());
1394 }
1395
1396 #[test]
1397 fn fail_send_breadcrumb_reports_send_failure() {
1398 let (level, msg) = fail_send_breadcrumb("j-7", &Err(WsClientError::ConnectionClosed))
1399 .expect("a dropped Fail send must surface a breadcrumb");
1400 assert_eq!(level, "error");
1401 assert!(msg.contains("j-7"), "must name the job: {msg}");
1402 assert!(
1403 msg.contains("connection closed"),
1404 "must carry the cause: {msg}"
1405 );
1406 }
1407
1408 #[test]
1409 fn fail_send_breadcrumb_carries_transport_cause() {
1410 let (level, msg) =
1411 fail_send_breadcrumb("j-3", &Err(WsClientError::Transport("sink gone".into())))
1412 .expect("a dropped Fail send must surface a breadcrumb");
1413 assert_eq!(level, "error");
1414 assert!(msg.contains("j-3"), "must name the job: {msg}");
1415 assert!(msg.contains("sink gone"), "must carry the cause: {msg}");
1416 }
1417
1418 #[test]
1419 fn backoff_grows_exponentially_until_cap() {
1420 let schedule = SessionSchedule {
1421 base_backoff_ms: 100,
1422 max_backoff_ms: 1_000,
1423 heartbeat: Duration::from_secs(1),
1424 log_flush: Duration::from_secs(1),
1425 shutdown_tick: Duration::from_secs(1),
1426 read_idle_timeout: Duration::from_secs(1),
1427 };
1428 assert_eq!(backoff_for(1, schedule), Duration::from_millis(100));
1429 assert_eq!(backoff_for(2, schedule), Duration::from_millis(200));
1430 assert_eq!(backoff_for(3, schedule), Duration::from_millis(400));
1431 assert_eq!(backoff_for(4, schedule), Duration::from_millis(800));
1432 assert_eq!(backoff_for(5, schedule), Duration::from_millis(1_000));
1434 assert_eq!(backoff_for(10, schedule), Duration::from_millis(1_000));
1435 }
1436
1437 #[test]
1438 fn reconnect_breadcrumb_keeps_legacy_wording_for_a_plain_disconnect() {
1439 let msg = reconnect_breadcrumb(None, 3, Duration::from_millis(800));
1443 assert_eq!(msg, "disconnected; reconnect attempt 3 in 800ms");
1444 }
1445
/// When an error is supplied, the reconnect breadcrumb must still name the
/// attempt and backoff, and additionally include the error's message.
#[test]
fn reconnect_breadcrumb_surfaces_the_underlying_error() {
    let cause = anyhow!("hello send failed: connection closed");
    let backoff = Duration::from_millis(400);

    let msg = reconnect_breadcrumb(Some(&cause), 2, backoff);

    let names_attempt = msg.contains("reconnect attempt 2 in 400ms");
    assert!(names_attempt, "must still name attempt + backoff: {msg}");
    let names_cause = msg.contains("hello send failed: connection closed");
    assert!(names_cause, "must carry the cause: {msg}");
}
1463
/// An error wrapped in context must show up in the breadcrumb with both the
/// outer context message and the innermost cause.
#[test]
fn reconnect_breadcrumb_includes_the_full_error_chain() {
    let root = anyhow!("driver missing");
    let chained = root.context("engine build failed");
    let backoff = Duration::from_millis(100);

    let msg = reconnect_breadcrumb(Some(&chained), 1, backoff);

    let has_context = msg.contains("engine build failed");
    assert!(has_context, "got: {msg}");
    let has_root = msg.contains("driver missing");
    assert!(has_root, "must include the root cause: {msg}");
}
1476
/// `has_credentials` must report `false` for a default config, for one with
/// only a worker id, and for one with only an auth token.
#[test]
fn has_credentials_false_when_either_missing() {
    // Case 1: an untouched default config.
    let neither = crate::config::Config::default();
    assert!(
        !has_credentials(&crate::config::shared(neither)),
        "both missing"
    );

    // Case 2: worker id set, everything else left at its default.
    let id_only = crate::config::Config {
        worker_id: Some("w-1".into()),
        ..crate::config::Config::default()
    };
    assert!(
        !has_credentials(&crate::config::shared(id_only)),
        "only worker_id"
    );

    // Case 3: auth token set, worker id explicitly cleared.
    let token_only = crate::config::Config {
        worker_id: None,
        auth_token: Some("tok".into()),
        ..crate::config::Config::default()
    };
    assert!(
        !has_credentials(&crate::config::shared(token_only)),
        "only auth_token"
    );
}
1490
/// A config carrying both a worker id and an auth token counts as having
/// credentials.
#[test]
fn has_credentials_true_when_both_present() {
    let complete = crate::config::shared(crate::config::Config {
        auth_token: Some("tok".into()),
        worker_id: Some("w-1".into()),
        ..crate::config::Config::default()
    });
    assert!(has_credentials(&complete));
}
1501
/// Fields that are present but empty must not be accepted as credentials.
#[test]
fn has_credentials_false_when_empty_strings() {
    let blank = crate::config::shared(crate::config::Config {
        auth_token: Some("".into()),
        worker_id: Some("".into()),
        ..crate::config::Config::default()
    });
    assert!(!has_credentials(&blank));
}
1512
/// When both arguments are equal there is no transition, so
/// `pause_transition_breadcrumb` must return `None`.
#[test]
fn pause_transition_breadcrumb_is_silent_when_unchanged() {
    let both_false = pause_transition_breadcrumb(false, false);
    let both_true = pause_transition_breadcrumb(true, true);

    assert!(both_false.is_none());
    assert!(both_true.is_none());
}
1520
/// A `false -> true` change must produce a "paused by operator" message and a
/// `true -> false` change a "resumed by operator" message.
#[test]
fn pause_transition_breadcrumb_reports_pause_and_resume() {
    let on_pause = pause_transition_breadcrumb(false, true);
    let on_resume = pause_transition_breadcrumb(true, false);

    let paused = on_pause.expect("a pause must be reported");
    let mentions_pause = paused.contains("paused by operator");
    assert!(mentions_pause, "expected a pause message, got: {paused}");

    let resumed = on_resume.expect("a resume must be reported");
    let mentions_resume = resumed.contains("resumed by operator");
    assert!(mentions_resume, "expected a resume message, got: {resumed}");
}
1537
/// The welcome breadcrumb must contain the "server welcomed <id>" wording and
/// a `server_time=` field holding the timestamp it was given.
#[test]
fn welcome_breadcrumb_surfaces_server_time() {
    let (worker, stamp) = ("worker-7", "2026-06-15T21:00:00Z");

    let line = welcome_breadcrumb(worker, stamp);

    let has_greeting = line.contains("server welcomed worker-7");
    assert!(
        has_greeting,
        "expected the legacy wording + worker id, got: {line}"
    );
    let has_server_time = line.contains("server_time=2026-06-15T21:00:00Z");
    assert!(has_server_time, "expected the server time, got: {line}");
}
1556
/// The offer breadcrumb must name the job and expose the game, asset, model
/// and vram values as `key=value` fragments.
#[test]
fn offer_received_breadcrumb_names_game_and_asset() {
    let job = "j-1";
    let game = "game-of-elements";
    let asset = "game-of-elements/creatures/aurora-fox";
    let model = "sd-cpp:flux";
    let vram = 12.5;

    let line = offer_received_breadcrumb(job, game, asset, model, vram);

    let has_job = line.contains("offer received j-1");
    assert!(has_job, "expected the job id, got: {line}");
    let has_game = line.contains("game=game-of-elements");
    assert!(has_game, "expected the game id, got: {line}");
    let has_asset = line.contains("asset=game-of-elements/creatures/aurora-fox");
    assert!(has_asset, "expected the asset name, got: {line}");
    let has_model = line.contains("model=sd-cpp:flux");
    assert!(has_model, "expected the model, got: {line}");
    let has_vram = line.contains("vram=12.5");
    assert!(has_vram, "expected the vram, got: {line}");
}
1589
/// The result-ack breadcrumb must read "studio confirmed <outcome> of job
/// <id>" for both the completion and the failure outcome.
#[test]
fn result_ack_breadcrumb_names_the_outcome_and_job() {
    let completed = result_ack_breadcrumb("completion", "j-1");
    let failed = result_ack_breadcrumb("failure", "j-2");

    assert_eq!(completed, "studio confirmed completion of job j-1");
    assert_eq!(failed, "studio confirmed failure of job j-2");
}
1610}