1use std::collections::VecDeque;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8use std::sync::atomic::Ordering;
9use tokio::sync::Mutex;
10use tokio::sync::broadcast;
11use tokio_util::sync::CancellationToken;
12
/// How long a graceful (non-forced) stop waits before the still-running
/// active job is cancelled anyway.
const GRACEFUL_STOP_GRACE: std::time::Duration = std::time::Duration::from_secs(30);
17
18use crate::daemon::proto::Event;
19use crate::daemon::proto::JobId;
20use crate::daemon::proto::JobKind;
21use crate::daemon::proto::JobRecord;
22use crate::daemon::proto::LogLevel;
23use crate::daemon::proto::ProgressUpdate;
24use crate::daemon::proto::Request;
25use crate::daemon::proto::StatusSnapshot;
26
/// A job that has been accepted into the queue but has not started running.
pub struct QueuedJob {
    /// Unique identifier of the job.
    pub job_id: JobId,
    /// Kind of work the job performs.
    pub kind: JobKind,
    /// The request that created the job; handed to the dispatcher when the
    /// job runs.
    pub request: Request,
    /// Priority jobs are queued ahead of every non-priority job.
    pub priority: bool,
    /// Submission timestamp (the connection handler stores Unix seconds).
    pub submitted_at: u64,
    /// Token used to request cancellation of the job once it is running.
    pub cancel: CancellationToken,
    /// Short human-readable description of the request arguments.
    pub args_summary: String,
}
39
/// The job currently being executed by the worker.
pub struct RunningJob {
    /// Status record of the job as it is reported in snapshots.
    pub record: JobRecord,
    /// Cancelling this token asks the running job to stop.
    pub cancel: CancellationToken,
}
44
/// State shared between the worker, the replay collector, and every client
/// connection.
pub struct DaemonState {
    /// Jobs waiting to run; priority jobs sit in front of non-priority ones.
    pub queue: Mutex<VecDeque<QueuedJob>>,
    /// The job the worker is currently executing, if any.
    pub active: Mutex<Option<RunningJob>>,
    /// Bounded history of finished job records, oldest first.
    pub recent: Mutex<RingBuffer<JobRecord>>,
    /// Bounded per-job event history, replayed to clients that attach late.
    pub replay: Mutex<ReplayBuffer>,
    /// Broadcast channel carrying every daemon event.
    pub events: broadcast::Sender<Event>,
    /// Source of fresh job ids; starts at 1.
    pub next_job_id: AtomicU64,
    /// Cancelled when a stop is requested; once cancelled, the worker exits
    /// as soon as the queue is empty.
    pub accepting: CancellationToken,
    /// Cancelled to end the worker's wait and every open event stream.
    pub shutdown: CancellationToken,
    /// Daemon start timestamp, reported in status snapshots.
    pub started_at: u64,
    /// Process id of the daemon, reported in status snapshots.
    pub daemon_pid: u32,
    /// Account name reported in status snapshots, if any.
    pub account: Option<String>,
    /// Wakes the worker when a job is enqueued or a stop is requested.
    pub queue_notify: tokio::sync::Notify,
}
69
70impl DaemonState {
71 pub fn new(account: Option<String>, daemon_pid: u32, started_at: u64) -> Arc<Self> {
72 let (events, _) = broadcast::channel(512);
77 Arc::new(Self {
78 queue: Mutex::new(VecDeque::new()),
79 active: Mutex::new(None),
80 recent: Mutex::new(RingBuffer::new(32)),
81 replay: Mutex::new(ReplayBuffer::new(32, 200)),
82 events,
83 next_job_id: AtomicU64::new(1),
84 accepting: CancellationToken::new(),
85 shutdown: CancellationToken::new(),
86 started_at,
87 daemon_pid,
88 account,
89 queue_notify: tokio::sync::Notify::new(),
90 })
91 }
92
93 pub fn allocate_job_id(&self) -> JobId {
94 JobId(self.next_job_id.fetch_add(1, Ordering::Relaxed))
95 }
96
97 pub async fn enqueue(&self, job: QueuedJob) -> u32 {
103 let mut q = self.queue.lock().await;
104 let position = if job.priority {
105 let boundary = q.iter().take_while(|j| j.priority).count();
106 q.insert(boundary, job);
107 boundary as u32
108 } else {
109 let pos = q.len() as u32;
110 q.push_back(job);
111 pos
112 };
113 self.queue_notify.notify_one();
114 let snap = self.snapshot_inner(&q, None).await;
115 let _ = self.events.send(Event::QueueChanged { snapshot: snap });
116 position
117 }
118
119 pub async fn toggle_priority(&self, job_id: JobId) -> Result<(), JobNotFound> {
120 let mut q = self.queue.lock().await;
121 let Some(idx) = q.iter().position(|j| j.job_id == job_id) else {
122 return Err(JobNotFound);
123 };
124 let mut job = q.remove(idx).expect("index just found");
125 job.priority = !job.priority;
126 if job.priority {
127 let boundary = q.iter().take_while(|j| j.priority).count();
128 q.insert(boundary, job);
129 } else {
130 q.push_back(job);
131 }
132 let snap = self.snapshot_inner(&q, None).await;
133 let _ = self.events.send(Event::QueueChanged { snapshot: snap });
134 Ok(())
135 }
136
137 pub async fn cancel(&self, job_id: JobId) -> Result<(), JobNotFound> {
138 let active_token = {
141 let guard = self.active.lock().await;
142 guard
143 .as_ref()
144 .and_then(|r| (r.record.job_id == job_id).then(|| r.cancel.clone()))
145 };
146 if let Some(token) = active_token {
147 token.cancel();
148 return Ok(());
149 }
150 let mut q = self.queue.lock().await;
152 let Some(idx) = q.iter().position(|j| j.job_id == job_id) else {
153 return Err(JobNotFound);
154 };
155 let removed = q.remove(idx).expect("index just found");
156 let _ = self.events.send(Event::JobFinished {
157 job_id: removed.job_id,
158 exit_code: 130,
159 });
160 let snap = self.snapshot_inner(&q, None).await;
161 let _ = self.events.send(Event::QueueChanged { snapshot: snap });
162 Ok(())
163 }
164
165 async fn snapshot_inner(
166 &self,
167 queue: &VecDeque<QueuedJob>,
168 active_override: Option<&RunningJob>,
169 ) -> StatusSnapshot {
170 let active = match active_override {
171 Some(r) => Some(r.record.clone()),
172 None => self.active.lock().await.as_ref().map(|r| r.record.clone()),
173 };
174 StatusSnapshot {
175 daemon_pid: self.daemon_pid,
176 daemon_started_at: self.started_at,
177 account: self.account.clone(),
178 active,
179 queue: queue.iter().map(job_record_for_queued).collect(),
180 recent: self.recent.lock().await.iter().cloned().collect(),
181 }
182 }
183
184 pub async fn snapshot(&self) -> StatusSnapshot {
185 let q = self.queue.lock().await;
186 self.snapshot_inner(&q, None).await
187 }
188
189 pub async fn broadcast_snapshot(&self) {
193 let snap = self.snapshot().await;
194 let _ = self.events.send(Event::QueueChanged { snapshot: snap });
195 }
196
197 pub async fn recent_exit_code(&self, job_id: JobId) -> Option<i32> {
200 self.recent
201 .lock()
202 .await
203 .iter()
204 .find(|r| r.job_id == job_id)
205 .and_then(|r| r.exit_code)
206 }
207}
208
209fn job_record_for_queued(j: &QueuedJob) -> JobRecord {
210 JobRecord {
211 job_id: j.job_id,
212 kind: j.kind,
213 args_summary: j.args_summary.clone(),
214 priority: j.priority,
215 submitted_at: j.submitted_at,
216 started_at: None,
217 finished_at: None,
218 exit_code: None,
219 progress: None,
220 }
221}
222
/// Error returned when a job id matches neither the active job nor any
/// queued job.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct JobNotFound;

impl std::fmt::Display for JobNotFound {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("job not found")
    }
}

// Implementing `Error` lets callers propagate this with `?` into boxed or
// wrapped error types.
impl std::error::Error for JobNotFound {}
225
/// Fixed-capacity FIFO buffer that discards its oldest element to make room
/// for a new one.
pub struct RingBuffer<T> {
    /// Maximum number of elements retained.
    cap: usize,
    /// Stored elements, oldest at the front.
    items: VecDeque<T>,
}

impl<T> RingBuffer<T> {
    /// Creates an empty buffer that retains at most `cap` elements.
    pub fn new(cap: usize) -> Self {
        Self {
            cap,
            items: VecDeque::with_capacity(cap),
        }
    }

    /// Appends `v`, evicting the oldest element(s) if the buffer is full.
    ///
    /// A buffer created with capacity 0 stores nothing.
    pub fn push(&mut self, v: T) {
        // Without this guard a zero-capacity buffer would never evict and
        // would grow without bound.
        if self.cap == 0 {
            return;
        }
        while self.items.len() >= self.cap {
            self.items.pop_front();
        }
        self.items.push_back(v);
    }

    /// Iterates over the stored elements from oldest to newest.
    pub fn iter(&self) -> impl Iterator<Item = &T> {
        self.items.iter()
    }
}
248
249pub struct ReplayBuffer {
254 entries: VecDeque<(JobId, VecDeque<Event>)>,
255 cap_jobs: usize,
256 cap_per_job: usize,
257}
258
259impl ReplayBuffer {
260 pub fn new(cap_jobs: usize, cap_per_job: usize) -> Self {
261 Self {
262 entries: VecDeque::with_capacity(cap_jobs),
263 cap_jobs,
264 cap_per_job,
265 }
266 }
267
268 pub fn start_job(&mut self, job_id: JobId) {
272 if self.entries.iter().any(|(id, _)| *id == job_id) {
273 return;
274 }
275 if self.entries.len() >= self.cap_jobs {
276 self.entries.pop_front();
277 }
278 self.entries
279 .push_back((job_id, VecDeque::with_capacity(64)));
280 }
281
282 pub fn append(&mut self, job_id: JobId, ev: Event) {
285 if let Some((_, evs)) = self.entries.iter_mut().rev().find(|(id, _)| *id == job_id) {
286 if evs.len() >= self.cap_per_job {
287 evs.pop_front();
288 }
289 evs.push_back(ev);
290 }
291 }
292
293 pub fn events_for(&self, job_id: JobId) -> Option<Vec<Event>> {
294 self.entries
295 .iter()
296 .rev()
297 .find(|(id, _)| *id == job_id)
298 .map(|(_, evs)| evs.iter().cloned().collect())
299 }
300}
301
302async fn replay_collector_loop(state: Arc<DaemonState>, mut rx: broadcast::Receiver<Event>) {
307 loop {
308 match rx.recv().await {
309 Ok(ev) => {
310 if let Some(job_id) = ev.job_id() {
311 let mut replay = state.replay.lock().await;
312 if matches!(&ev, Event::JobStarted { .. }) {
313 replay.start_job(job_id);
314 }
315 replay.append(job_id, ev);
316 }
317 }
318 Err(broadcast::error::RecvError::Lagged(_)) => continue,
319 Err(broadcast::error::RecvError::Closed) => return,
320 }
321 }
322}
323
324pub fn spawn_replay_collector(state: Arc<DaemonState>) -> tokio::task::JoinHandle<()> {
329 let rx = state.events.subscribe();
330 tokio::spawn(replay_collector_loop(state, rx))
331}
332
333use crate::sink::JobSink;
334use steamroom::client::LoggedIn;
335use steamroom::client::SteamClient;
336
337pub struct BroadcastSink {
340 pub job_id: JobId,
341 pub events: broadcast::Sender<Event>,
342}
343
344impl JobSink for BroadcastSink {
345 fn stdout_line(&self, line: &str) {
346 let _ = self.events.send(Event::Stdout {
347 job_id: self.job_id,
348 line: line.to_string(),
349 });
350 }
351 fn progress(&self, update: ProgressUpdate) {
352 let _ = self.events.send(Event::Progress {
353 job_id: self.job_id,
354 update,
355 });
356 }
357}
358
359async fn wait_for_next_job(state: &DaemonState) -> Option<QueuedJob> {
360 loop {
361 if state.shutdown.is_cancelled() {
362 return None;
363 }
364 {
365 let mut q = state.queue.lock().await;
366 if let Some(job) = q.pop_front() {
367 return Some(job);
368 }
369 if state.accepting.is_cancelled() {
370 return None;
372 }
373 }
374 tokio::select! {
375 _ = state.queue_notify.notified() => {}
376 _ = state.shutdown.cancelled() => return None,
377 }
378 }
379}
380
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to 0 if the system clock reports a time before the epoch.
fn unix_now() -> u64 {
    let now = std::time::SystemTime::now();
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
388
389fn is_disconnected(err: &crate::errors::CliError) -> bool {
393 use crate::errors::CliError;
394 use steamroom::error::ConnectionError;
395 use steamroom::error::Error as SteamError;
396
397 if matches!(
398 err,
399 CliError::Steam(SteamError::Connection(
400 ConnectionError::Disconnected
401 | ConnectionError::EncryptionFailed
402 | ConnectionError::DnsResolutionFailed,
403 ))
404 ) {
405 return true;
406 }
407
408 let mut source: Option<&(dyn std::error::Error + 'static)> = Some(err);
415 while let Some(e) = source {
416 if let Some(io) = e.downcast_ref::<std::io::Error>()
417 && matches!(
418 io.kind(),
419 std::io::ErrorKind::ConnectionReset
420 | std::io::ErrorKind::BrokenPipe
421 | std::io::ErrorKind::UnexpectedEof
422 | std::io::ErrorKind::NotConnected
423 | std::io::ErrorKind::ConnectionAborted
424 )
425 {
426 return true;
427 }
428 source = e.source();
429 }
430 false
431}
432
433async fn lazy_login(
439 preferred_user: Option<&str>,
440) -> Result<SteamClient<LoggedIn>, crate::errors::CliError> {
441 use crate::cli::AuthOptions;
442 let auth = AuthOptions {
443 username: preferred_user.map(|s| s.to_string()),
444 password: None,
445 qr: false,
446 use_steam_token: false,
447 remember_password: false,
448 device_name: None,
449 };
450 crate::commands::shared::connect_and_login(&auth, None).await
451}
452
453pub async fn worker_loop(
458 state: Arc<DaemonState>,
459 initial_client: Option<SteamClient<LoggedIn>>,
460 preferred_user: Option<String>,
461) {
462 let mut client = initial_client;
463 while let Some(job) = wait_for_next_job(&state).await {
464 let started_at = unix_now();
465 let sink = BroadcastSink {
466 job_id: job.job_id,
467 events: state.events.clone(),
468 };
469 let record = JobRecord {
470 job_id: job.job_id,
471 kind: job.kind,
472 args_summary: job.args_summary.clone(),
473 priority: job.priority,
474 submitted_at: job.submitted_at,
475 started_at: Some(started_at),
476 finished_at: None,
477 exit_code: None,
478 progress: None,
479 };
480 {
481 let mut active = state.active.lock().await;
482 *active = Some(RunningJob {
483 record: record.clone(),
484 cancel: job.cancel.clone(),
485 });
486 }
487 let _ = state.events.send(Event::JobStarted {
488 job_id: job.job_id,
489 kind: job.kind,
490 args_summary: job.args_summary.clone(),
491 });
492 state.broadcast_snapshot().await;
497
498 let sink: Arc<dyn JobSink> = Arc::new(sink);
499 let active_client = match &client {
504 Some(c) => c.clone(),
505 None => match lazy_login(preferred_user.as_deref()).await {
506 Ok(c) => {
507 client = Some(c.clone());
508 c
509 }
510 Err(e) => {
511 let _ = state.events.send(Event::Log {
512 job_id: Some(job.job_id),
513 level: LogLevel::Error,
514 target: "daemon::worker".into(),
515 message: format!("login failed: {e}"),
516 });
517 let _ = state.events.send(Event::JobFinished {
518 job_id: job.job_id,
519 exit_code: 1,
520 });
521 let mut active = state.active.lock().await;
522 *active = None;
523 let mut finished = record;
524 finished.finished_at = Some(unix_now());
525 finished.exit_code = Some(1);
526 state.recent.lock().await.push(finished);
527 state.broadcast_snapshot().await;
528 continue;
529 }
530 },
531 };
532 use futures::future::FutureExt;
533 use tracing::Instrument;
534 let dispatch_fut = dispatch(job.request, active_client, sink.clone(), job.cancel.clone())
535 .instrument(tracing::info_span!("job", job_id = job.job_id.0));
536 let dispatch_result = match std::panic::AssertUnwindSafe(dispatch_fut)
537 .catch_unwind()
538 .await
539 {
540 Ok(res) => res,
541 Err(payload) => {
542 let msg = if let Some(s) = payload.downcast_ref::<&str>() {
543 s.to_string()
544 } else if let Some(s) = payload.downcast_ref::<String>() {
545 s.clone()
546 } else {
547 "(non-string panic payload)".into()
548 };
549 let _ = state.events.send(Event::Log {
550 job_id: Some(job.job_id),
551 level: LogLevel::Error,
552 target: "daemon::worker".into(),
553 message: format!("job panicked: {msg}"),
554 });
555 Err(crate::errors::CliError::Io(std::io::Error::other(
556 "job panicked",
557 )))
558 }
559 };
560 let exit_code = match dispatch_result {
561 Ok(()) => 0,
562 Err(crate::errors::CliError::Cancelled) => 130,
563 Err(e) => {
564 if is_disconnected(&e) {
565 client = None;
568 let _ = state.events.send(Event::Log {
569 job_id: None,
570 level: LogLevel::Warn,
571 target: "daemon::worker".into(),
572 message: "Steam connection lost; will reauthenticate on next job".into(),
573 });
574 }
575 let _ = state.events.send(Event::Log {
576 job_id: Some(job.job_id),
577 level: LogLevel::Error,
578 target: "daemon::worker".into(),
579 message: format!("{e}"),
580 });
581 1
582 }
583 };
584
585 {
586 let mut active = state.active.lock().await;
587 *active = None;
588 }
589 let mut finished = record;
590 finished.finished_at = Some(unix_now());
591 finished.exit_code = Some(exit_code);
592 state.recent.lock().await.push(finished);
593 let _ = state.events.send(Event::JobFinished {
594 job_id: job.job_id,
595 exit_code,
596 });
597 state.broadcast_snapshot().await;
599 }
600 if state.accepting.is_cancelled() {
603 state.shutdown.cancel();
604 }
605}
606
607async fn dispatch(
608 req: Request,
609 client: SteamClient<LoggedIn>,
610 sink: Arc<dyn JobSink>,
611 cancel: CancellationToken,
612) -> Result<(), crate::errors::CliError> {
613 use crate::commands;
614 match req {
615 Request::Download { args, .. } => {
616 commands::download::run_download(args.into(), client, sink, cancel, false).await
618 }
619 Request::Info { args, .. } => {
620 commands::info::run_info(args.into(), client, sink, cancel).await
621 }
622 Request::Files { args, .. } => {
623 commands::files::run_files(args.into(), Some(client), sink, cancel).await
624 }
625 Request::Manifests { args, .. } => {
626 commands::manifests::run_manifests(args.into(), client, sink, cancel).await
627 }
628 Request::Diff { args, .. } => {
629 commands::diff::run_diff(args.into(), client, sink, cancel).await
630 }
631 Request::Packages { args, .. } => {
632 commands::packages::run_packages(args.into(), client, sink, cancel).await
633 }
634 Request::SaveManifest { args, .. } => {
635 commands::save_manifest::run_save_manifest(args.into(), client, sink, cancel).await
636 }
637 Request::Workshop { args, .. } => {
638 commands::workshop::run_workshop(args.into(), client, sink, cancel, false).await
640 }
641 Request::LocalInfo { args, .. } => {
642 commands::local_info::run_local_info(args.into(), sink, cancel).await
643 }
644 Request::Status
645 | Request::Subscribe
646 | Request::Attach { .. }
647 | Request::Cancel { .. }
648 | Request::TogglePriority { .. }
649 | Request::Stop { .. } => {
650 unreachable!("control variants do not produce jobs");
652 }
653 }
654}
655
656use tokio::io::AsyncRead;
657use tokio::io::AsyncWrite;
658
659use crate::daemon::framing::read_frame;
660use crate::daemon::framing::write_frame;
661use crate::daemon::proto::ErrorKind;
662use crate::daemon::proto::Frame;
663use crate::daemon::proto::Response;
664
665pub async fn handle_connection<S>(state: Arc<DaemonState>, mut stream: S)
670where
671 S: AsyncRead + AsyncWrite + Unpin + Send,
672{
673 let req = match read_frame(&mut stream).await {
674 Ok(Frame::Request(r)) => r,
675 Ok(other) => {
676 let _ = write_frame(
677 &mut stream,
678 &Frame::Response(Response::Error {
679 kind: ErrorKind::InvalidRequest,
680 message: format!("expected Request, got {other:?}"),
681 }),
682 )
683 .await;
684 return;
685 }
686 Err(e) => {
687 let _ = write_frame(
688 &mut stream,
689 &Frame::Response(Response::Error {
690 kind: ErrorKind::InvalidRequest,
691 message: e.to_string(),
692 }),
693 )
694 .await;
695 return;
696 }
697 };
698
699 match req {
700 Request::Status => {
701 let snap = state.snapshot().await;
702 let _ = write_frame(&mut stream, &Frame::Response(Response::Status(snap))).await;
703 }
704 Request::Stop { force } => {
705 state.accepting.cancel(); state.queue_notify.notify_one(); if force {
708 if let Some(running) = state.active.lock().await.as_ref() {
710 running.cancel.cancel();
711 }
712 state.shutdown.cancel();
713 } else {
714 let state = state.clone();
721 tokio::spawn(async move {
722 tokio::time::sleep(GRACEFUL_STOP_GRACE).await;
723 if state.shutdown.is_cancelled() {
724 return;
726 }
727 if let Some(running) = state.active.lock().await.as_ref() {
728 let _ = state.events.send(Event::Log {
729 job_id: Some(running.record.job_id),
730 level: LogLevel::Warn,
731 target: "daemon::stop".into(),
732 message: format!(
733 "graceful stop grace ({}s) elapsed; cancelling active job",
734 GRACEFUL_STOP_GRACE.as_secs()
735 ),
736 });
737 running.cancel.cancel();
738 }
739 });
740 }
741 let _ = write_frame(&mut stream, &Frame::Response(Response::Stopping)).await;
742 }
743 Request::Cancel { job_id } => {
744 let resp = match state.cancel(job_id).await {
745 Ok(()) => Response::Ack,
746 Err(_) => Response::Error {
747 kind: ErrorKind::JobNotFound,
748 message: format!("{job_id}"),
749 },
750 };
751 let _ = write_frame(&mut stream, &Frame::Response(resp)).await;
752 }
753 Request::TogglePriority { job_id } => {
754 let resp = match state.toggle_priority(job_id).await {
755 Ok(()) => Response::Ack,
756 Err(_) => Response::Error {
757 kind: ErrorKind::JobNotFound,
758 message: format!("{job_id}"),
759 },
760 };
761 let _ = write_frame(&mut stream, &Frame::Response(resp)).await;
762 }
763 Request::Subscribe => {
764 let rx = state.events.subscribe();
765 stream_events(state.clone(), &mut stream, None, rx).await;
766 }
767 Request::Attach { job_id } => {
768 let active_match = state
769 .active
770 .lock()
771 .await
772 .as_ref()
773 .map(|r| r.record.job_id == job_id)
774 .unwrap_or(false);
775 let queued = state.queue.lock().await.iter().any(|j| j.job_id == job_id);
776
777 if !active_match && !queued {
778 if let Some(exit_code) = state.recent_exit_code(job_id).await {
782 let events = state.replay.lock().await.events_for(job_id);
783 if let Some(events) = events {
784 for ev in events {
785 if write_frame(&mut stream, &Frame::Event(ev)).await.is_err() {
786 return;
787 }
788 }
789 }
790 let _ = write_frame(&mut stream, &Frame::EndOfStream { exit_code }).await;
791 } else {
792 let _ = write_frame(
793 &mut stream,
794 &Frame::Response(Response::Error {
795 kind: ErrorKind::JobNotFound,
796 message: format!("{job_id}"),
797 }),
798 )
799 .await;
800 }
801 return;
802 }
803 let replayed = state.replay.lock().await.events_for(job_id);
806 if let Some(events) = replayed {
807 for ev in events {
808 if write_frame(&mut stream, &Frame::Event(ev)).await.is_err() {
809 return;
810 }
811 }
812 }
813 let rx = state.events.subscribe();
814 stream_events(state.clone(), &mut stream, Some(job_id), rx).await;
815 }
816 other => {
818 let priority = matches!(
819 &other,
820 Request::Download { priority: true, .. }
821 | Request::Info { priority: true, .. }
822 | Request::Files { priority: true, .. }
823 | Request::Manifests { priority: true, .. }
824 | Request::Diff { priority: true, .. }
825 | Request::Packages { priority: true, .. }
826 | Request::SaveManifest { priority: true, .. }
827 | Request::Workshop { priority: true, .. }
828 | Request::LocalInfo { priority: true, .. }
829 );
830 let kind = job_kind_of(&other);
831 let args_summary = summarize(&other);
832 let job_id = state.allocate_job_id();
833 let cancel = CancellationToken::new();
834 let job = QueuedJob {
835 job_id,
836 kind,
837 request: other,
838 priority,
839 submitted_at: unix_now(),
840 cancel,
841 args_summary,
842 };
843 let rx = state.events.subscribe();
845 let position = state.enqueue(job).await;
846 let _ = write_frame(
847 &mut stream,
848 &Frame::Response(Response::JobAccepted { job_id, position }),
849 )
850 .await;
851 stream_events(state.clone(), &mut stream, Some(job_id), rx).await;
852 }
853 }
854}
855
856async fn stream_events<S>(
857 state: Arc<DaemonState>,
858 stream: &mut S,
859 filter: Option<JobId>,
860 mut rx: tokio::sync::broadcast::Receiver<Event>,
861) where
862 S: AsyncWrite + Unpin,
863{
864 loop {
865 tokio::select! {
866 _ = state.shutdown.cancelled() => {
867 let _ = write_frame(stream, &Frame::EndOfStream { exit_code: 130 }).await;
868 return;
869 }
870 ev = rx.recv() => match ev {
871 Ok(ev) => {
872 if let Some(want) = filter
873 && ev.job_id() != Some(want)
874 {
875 continue;
876 }
877 let terminal_exit = match (&ev, filter) {
882 (Event::JobFinished { exit_code, .. }, Some(_)) => Some(*exit_code),
883 _ => None,
884 };
885 if write_frame(stream, &Frame::Event(ev)).await.is_err() {
886 return; }
888 if let Some(exit_code) = terminal_exit {
889 let _ = write_frame(stream, &Frame::EndOfStream { exit_code }).await;
890 return;
891 }
892 }
893 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
894 if let Some(want) = filter
898 && let Some(exit_code) = state.recent_exit_code(want).await
899 {
900 let _ = write_frame(stream, &Frame::EndOfStream { exit_code }).await;
901 return;
902 }
903 continue;
904 }
905 Err(tokio::sync::broadcast::error::RecvError::Closed) => return,
906 }
907 }
908 }
909}
910
911fn job_kind_of(r: &Request) -> JobKind {
912 match r {
913 Request::Download { .. } => JobKind::Download,
914 Request::Info { .. } => JobKind::Info,
915 Request::Files { .. } => JobKind::Files,
916 Request::Manifests { .. } => JobKind::Manifests,
917 Request::Diff { .. } => JobKind::Diff,
918 Request::Packages { .. } => JobKind::Packages,
919 Request::SaveManifest { .. } => JobKind::SaveManifest,
920 Request::Workshop { .. } => JobKind::Workshop,
921 Request::LocalInfo { .. } => JobKind::LocalInfo,
922 _ => unreachable!("control variants do not produce jobs"),
923 }
924}
925
926fn summarize(r: &Request) -> String {
927 match r {
928 Request::Download { args, .. } => {
929 format!("download app={} depot={:?}", args.app, args.depot)
930 }
931 Request::Info { args, .. } => format!("info app={}", args.app),
932 Request::Files { args, .. } => format!("files app={:?}", args.app),
933 Request::Manifests { args, .. } => format!("manifests app={}", args.app),
934 Request::Diff { args, .. } => format!(
935 "diff depot={} from={} to={}",
936 args.depot, args.from, args.to
937 ),
938 Request::Packages { args, .. } => format!("packages count={}", args.packages.len()),
939 Request::SaveManifest { args, .. } => {
940 format!("save-manifest app={} depot={}", args.app, args.depot)
941 }
942 Request::Workshop { args, .. } => format!("workshop item={}", args.item),
943 Request::LocalInfo { .. } => "local-info".to_string(),
944 _ => "(control)".to_string(),
945 }
946}
947
#[cfg(test)]
mod tests {
    use super::*;
    use crate::daemon::proto::InfoParams;
    use crate::daemon::proto::OutputFormat;
    use tokio::io::duplex;

    /// Builds a stdout event for `job_id` carrying `line`.
    fn ev_stdout(job_id: JobId, line: &str) -> Event {
        let line = line.to_owned();
        Event::Stdout { job_id, line }
    }

    /// Builds a minimal queued `Info` job with a freshly allocated id.
    fn fake_queued(state: &DaemonState, priority: bool) -> QueuedJob {
        let args = InfoParams {
            app: 1,
            format: Some(OutputFormat::Plain),
            os: None,
            show_all: false,
        };
        QueuedJob {
            job_id: state.allocate_job_id(),
            kind: JobKind::Info,
            request: Request::Info { args, priority },
            priority,
            submitted_at: 0,
            cancel: CancellationToken::new(),
            args_summary: "fake".into(),
        }
    }

    /// Collects the priority flag of every queued job, in queue order.
    async fn queue_priorities(state: &DaemonState) -> Vec<bool> {
        let snap = state.snapshot().await;
        snap.queue.iter().map(|j| j.priority).collect()
    }

    // Starting a third job in a two-job buffer drops the first one.
    #[test]
    fn replay_buffer_evicts_oldest_job_when_full() {
        let mut buffer = ReplayBuffer::new(2, 10);
        for id in 1..=3 {
            buffer.start_job(JobId(id));
        }
        assert!(buffer.events_for(JobId(1)).is_none());
        assert!(buffer.events_for(JobId(2)).is_some());
        assert!(buffer.events_for(JobId(3)).is_some());
    }

    // Five events into a three-event history keep only the last three.
    #[test]
    fn replay_buffer_caps_events_per_job() {
        let job = JobId(1);
        let mut buffer = ReplayBuffer::new(4, 3);
        buffer.start_job(job);
        for i in 0..5 {
            let text = format!("line {i}");
            buffer.append(job, ev_stdout(job, &text));
        }
        let kept = buffer.events_for(job).expect("present");
        assert_eq!(kept.len(), 3);
        let Some(Event::Stdout { line, .. }) = kept.first() else {
            panic!("expected Stdout");
        };
        assert_eq!(line, "line 2");
    }

    #[tokio::test]
    async fn enqueue_returns_position_zero_for_empty_queue() {
        let state = DaemonState::new(None, 1, 0);
        let job = fake_queued(&state, false);
        assert_eq!(state.enqueue(job).await, 0);
    }

    #[tokio::test]
    async fn priority_jumps_non_priority() {
        let state = DaemonState::new(None, 1, 0);
        for _ in 0..2 {
            let _ = state.enqueue(fake_queued(&state, false)).await;
        }
        let prio_pos = state.enqueue(fake_queued(&state, true)).await;
        assert_eq!(prio_pos, 0, "first priority should land at the head");

        assert_eq!(queue_priorities(&state).await, vec![true, false, false]);
    }

    #[tokio::test]
    async fn cancel_queued_removes_and_emits_finished() {
        let state = DaemonState::new(None, 1, 0);
        // Subscribe before any event is sent so nothing is missed.
        let mut rx = state.events.subscribe();
        let _ = state.enqueue(fake_queued(&state, false)).await;
        let target = state.snapshot().await.queue[0].job_id;
        state.cancel(target).await.expect("ok");

        let mut saw_finished = false;
        loop {
            match rx.try_recv() {
                Ok(Event::JobFinished { job_id, exit_code }) => {
                    assert_eq!(job_id, target);
                    assert_eq!(exit_code, 130);
                    saw_finished = true;
                }
                Ok(_) => {}
                Err(_) => break,
            }
        }
        assert!(saw_finished, "expected JobFinished after cancel");
    }

    #[tokio::test]
    async fn toggle_priority_moves_across_boundary() {
        let state = DaemonState::new(None, 1, 0);
        let _ = state.enqueue(fake_queued(&state, true)).await;
        let _ = state.enqueue(fake_queued(&state, false)).await;
        let target = state.snapshot().await.queue[1].job_id;
        state.toggle_priority(target).await.expect("ok");
        assert_eq!(queue_priorities(&state).await, vec![true, true]);
    }

    #[tokio::test]
    async fn status_request_round_trips() {
        let state = DaemonState::new(Some("acct".into()), 42, 1000);
        let (mut client, server) = duplex(64 * 1024);
        let server_task = tokio::spawn(handle_connection(state.clone(), server));

        let request = Frame::Request(Request::Status);
        crate::daemon::framing::write_frame(&mut client, &request)
            .await
            .unwrap();
        let reply = crate::daemon::framing::read_frame(&mut client)
            .await
            .unwrap();
        let Frame::Response(Response::Status(snap)) = reply else {
            panic!("wrong: {reply:?}");
        };
        assert_eq!(snap.daemon_pid, 42);
        assert_eq!(snap.account.as_deref(), Some("acct"));
        server_task.await.unwrap();
    }

    #[tokio::test]
    async fn cancel_active_job_releases_active_lock_promptly() {
        let state = DaemonState::new(None, 1, 0);
        let active_cancel = CancellationToken::new();
        let running = RunningJob {
            record: JobRecord {
                job_id: JobId(99),
                kind: JobKind::Info,
                args_summary: "fake".into(),
                priority: false,
                submitted_at: 0,
                started_at: Some(0),
                finished_at: None,
                exit_code: None,
                progress: None,
            },
            cancel: active_cancel.clone(),
        };
        *state.active.lock().await = Some(running);

        // If `cancel` kept the `active` lock while awaiting, this would hang.
        let limit = std::time::Duration::from_millis(100);
        let res = tokio::time::timeout(limit, state.cancel(JobId(99))).await;
        assert!(res.is_ok(), "cancel timed out -- deadlock?");
        assert!(active_cancel.is_cancelled());
    }
}