Skip to main content

steamroom_cli/daemon/
server.rs

1//! Daemon-side state, worker loop, and connection task. Decoupled from
2//! socket I/O so the queue and dispatch logic can be unit-tested with
3//! plain method calls.
4
5use std::collections::VecDeque;
6use std::sync::Arc;
7use std::sync::atomic::AtomicU64;
8use std::sync::atomic::Ordering;
9use tokio::sync::Mutex;
10use tokio::sync::broadcast;
11use tokio_util::sync::CancellationToken;
12
/// How long a graceful `daemon stop` waits for the active job before
/// signalling the job's cancel token. Matches the spec's documented
/// behavior so a runaway job can't pin the daemon indefinitely.
///
/// The non-forced `Request::Stop` handler sleeps for this duration in a
/// spawned task and then cancels whichever job is active at that moment,
/// unless `shutdown` has already been signalled.
const GRACEFUL_STOP_GRACE: std::time::Duration = std::time::Duration::from_secs(30);
17
18use crate::daemon::proto::Event;
19use crate::daemon::proto::JobId;
20use crate::daemon::proto::JobKind;
21use crate::daemon::proto::JobRecord;
22use crate::daemon::proto::LogLevel;
23use crate::daemon::proto::ProgressUpdate;
24use crate::daemon::proto::Request;
25use crate::daemon::proto::StatusSnapshot;
26
/// One unit of work waiting to run. The `request` carries the parameters;
/// `priority` is also reflected here so the queue can rebalance on
/// `TogglePriority` without re-reading the Request.
pub struct QueuedJob {
    /// Identifier used by `cancel` / `toggle_priority` to locate this entry.
    pub job_id: JobId,
    /// Job category; copied into the `JobRecord` and the `JobStarted` event.
    pub kind: JobKind,
    /// The original submission; moved into `dispatch` when the job runs.
    pub request: Request,
    /// Priority entries are kept ahead of all non-priority entries in the queue.
    pub priority: bool,
    /// Submission timestamp, copied verbatim into `JobRecord::submitted_at`.
    /// NOTE(review): presumably Unix seconds like `started_at` -- confirm at the call site.
    pub submitted_at: u64,
    /// Cancellation handle; cloned into `RunningJob` and handed to the
    /// command when the job starts.
    pub cancel: CancellationToken,
    /// Summary string copied into the `JobRecord` and the `JobStarted` event.
    pub args_summary: String,
}
39
/// The job the worker is currently executing, stored in
/// `DaemonState::active` for the duration of the run.
pub struct RunningJob {
    /// Status record reported in snapshots while the job is active.
    pub record: JobRecord,
    /// Token that `cancel` and the stop handlers trigger to abort the job.
    pub cancel: CancellationToken,
}
44
/// Shared daemon state. Constructed once by `DaemonState::new` and handed
/// out as an `Arc` to the worker loop, the replay collector, and every
/// connection task.
pub struct DaemonState {
    /// Jobs waiting to run. Priority entries sit ahead of non-priority
    /// ones; the worker pops from the front.
    pub queue: Mutex<VecDeque<QueuedJob>>,
    /// The job currently being executed by `worker_loop`, `None` when idle.
    pub active: Mutex<Option<RunningJob>>,
    /// Ring of the most recently finished job records (capacity set in `new`).
    pub recent: Mutex<RingBuffer<JobRecord>>,
    /// Per-job event history for `attach <id>` to a finished job. The
    /// collector task (spawned in `serve_resumed`) populates this from
    /// the broadcast channel; the Attach handler dumps it before
    /// EndOfStream. Bounded per-job and across jobs so a runaway
    /// process can't OOM the daemon.
    pub replay: Mutex<ReplayBuffer>,
    /// Broadcast sender every `Event` is published on; stream clients and
    /// the replay collector each hold their own receiver.
    pub events: broadcast::Sender<Event>,
    /// Counter backing `allocate_job_id`; starts at 1.
    pub next_job_id: AtomicU64,
    /// Set when Stop is received. The accept loop and worker_loop check
    /// this to refuse new jobs and exit when idle. Active connections
    /// keep streaming until their job completes.
    pub accepting: CancellationToken,
    /// Set for a hard stop (force=true) or when the queue drains after a
    /// graceful stop. Signals all subscribers and the accept loop to exit.
    pub shutdown: CancellationToken,
    /// Daemon start timestamp, reported as `daemon_started_at` in snapshots.
    pub started_at: u64,
    /// Daemon process id, reported in snapshots.
    pub daemon_pid: u32,
    /// Account name reported in snapshots, if any.
    pub account: Option<String>,
    /// Wakes the worker when a job is enqueued or a Stop request arrives.
    pub queue_notify: tokio::sync::Notify,
}
69
70impl DaemonState {
71    pub fn new(account: Option<String>, daemon_pid: u32, started_at: u64) -> Arc<Self> {
72        // Channel depth: enough to buffer one chunky download's worth of
73        // progress events between worker emit and subscriber drain. Lag
74        // is tolerated; clients reading slower than the worker writes
75        // see `broadcast::error::RecvError::Lagged` and skip ahead.
76        let (events, _) = broadcast::channel(512);
77        Arc::new(Self {
78            queue: Mutex::new(VecDeque::new()),
79            active: Mutex::new(None),
80            recent: Mutex::new(RingBuffer::new(32)),
81            replay: Mutex::new(ReplayBuffer::new(32, 200)),
82            events,
83            next_job_id: AtomicU64::new(1),
84            accepting: CancellationToken::new(),
85            shutdown: CancellationToken::new(),
86            started_at,
87            daemon_pid,
88            account,
89            queue_notify: tokio::sync::Notify::new(),
90        })
91    }
92
93    pub fn allocate_job_id(&self) -> JobId {
94        JobId(self.next_job_id.fetch_add(1, Ordering::Relaxed))
95    }
96
97    /// Insert a job. Priority items sit at the boundary between existing
98    /// priority items and non-priority items; non-priority items append.
99    /// Returns the queue position the caller should report: 0 if it will
100    /// run next (queue empty and no active), 1+ otherwise. The "active"
101    /// job is NOT counted because it is already running.
102    pub async fn enqueue(&self, job: QueuedJob) -> u32 {
103        let mut q = self.queue.lock().await;
104        let position = if job.priority {
105            let boundary = q.iter().take_while(|j| j.priority).count();
106            q.insert(boundary, job);
107            boundary as u32
108        } else {
109            let pos = q.len() as u32;
110            q.push_back(job);
111            pos
112        };
113        self.queue_notify.notify_one();
114        let snap = self.snapshot_inner(&q, None).await;
115        let _ = self.events.send(Event::QueueChanged { snapshot: snap });
116        position
117    }
118
119    pub async fn toggle_priority(&self, job_id: JobId) -> Result<(), JobNotFound> {
120        let mut q = self.queue.lock().await;
121        let Some(idx) = q.iter().position(|j| j.job_id == job_id) else {
122            return Err(JobNotFound);
123        };
124        let mut job = q.remove(idx).expect("index just found");
125        job.priority = !job.priority;
126        if job.priority {
127            let boundary = q.iter().take_while(|j| j.priority).count();
128            q.insert(boundary, job);
129        } else {
130            q.push_back(job);
131        }
132        let snap = self.snapshot_inner(&q, None).await;
133        let _ = self.events.send(Event::QueueChanged { snapshot: snap });
134        Ok(())
135    }
136
137    pub async fn cancel(&self, job_id: JobId) -> Result<(), JobNotFound> {
138        // Pull the active job's cancel token under a scoped guard so the
139        // active lock is released before we take the queue lock.
140        let active_token = {
141            let guard = self.active.lock().await;
142            guard
143                .as_ref()
144                .and_then(|r| (r.record.job_id == job_id).then(|| r.cancel.clone()))
145        };
146        if let Some(token) = active_token {
147            token.cancel();
148            return Ok(());
149        }
150        // Not active; try queued.
151        let mut q = self.queue.lock().await;
152        let Some(idx) = q.iter().position(|j| j.job_id == job_id) else {
153            return Err(JobNotFound);
154        };
155        let removed = q.remove(idx).expect("index just found");
156        let _ = self.events.send(Event::JobFinished {
157            job_id: removed.job_id,
158            exit_code: 130,
159        });
160        let snap = self.snapshot_inner(&q, None).await;
161        let _ = self.events.send(Event::QueueChanged { snapshot: snap });
162        Ok(())
163    }
164
165    async fn snapshot_inner(
166        &self,
167        queue: &VecDeque<QueuedJob>,
168        active_override: Option<&RunningJob>,
169    ) -> StatusSnapshot {
170        let active = match active_override {
171            Some(r) => Some(r.record.clone()),
172            None => self.active.lock().await.as_ref().map(|r| r.record.clone()),
173        };
174        StatusSnapshot {
175            daemon_pid: self.daemon_pid,
176            daemon_started_at: self.started_at,
177            account: self.account.clone(),
178            active,
179            queue: queue.iter().map(job_record_for_queued).collect(),
180            recent: self.recent.lock().await.iter().cloned().collect(),
181        }
182    }
183
184    pub async fn snapshot(&self) -> StatusSnapshot {
185        let q = self.queue.lock().await;
186        self.snapshot_inner(&q, None).await
187    }
188
189    /// Snapshot + broadcast on the events channel. Called whenever the
190    /// queue, active job, or recent ring changes so Subscribe stream
191    /// clients (the TUI, daemon status) refresh without re-polling.
192    pub async fn broadcast_snapshot(&self) {
193        let snap = self.snapshot().await;
194        let _ = self.events.send(Event::QueueChanged { snapshot: snap });
195    }
196
197    /// Look up a recently-finished job by id. Used by `stream_events` to
198    /// recover the terminal exit code after a broadcast lag.
199    pub async fn recent_exit_code(&self, job_id: JobId) -> Option<i32> {
200        self.recent
201            .lock()
202            .await
203            .iter()
204            .find(|r| r.job_id == job_id)
205            .and_then(|r| r.exit_code)
206    }
207}
208
209fn job_record_for_queued(j: &QueuedJob) -> JobRecord {
210    JobRecord {
211        job_id: j.job_id,
212        kind: j.kind,
213        args_summary: j.args_summary.clone(),
214        priority: j.priority,
215        submitted_at: j.submitted_at,
216        started_at: None,
217        finished_at: None,
218        exit_code: None,
219        progress: None,
220    }
221}
222
/// Error returned by `DaemonState::cancel` and
/// `DaemonState::toggle_priority` when no matching job exists.
#[derive(Debug)]
pub struct JobNotFound;
225
/// Fixed-capacity FIFO ring: once `cap` items are stored, each push
/// evicts the oldest item first.
pub struct RingBuffer<T> {
    /// Maximum number of items retained.
    cap: usize,
    /// Stored items, oldest at the front.
    items: VecDeque<T>,
}

impl<T> RingBuffer<T> {
    /// Create an empty ring that retains at most `cap` items.
    pub fn new(cap: usize) -> Self {
        Self {
            cap,
            items: VecDeque::with_capacity(cap),
        }
    }

    /// Append `v`, evicting the oldest item(s) so the ring never holds
    /// more than `cap` items. A zero-capacity ring stores nothing.
    pub fn push(&mut self, v: T) {
        if self.cap == 0 {
            // An equality check against `cap` would match only on the very
            // first push and then let the buffer grow without bound.
            return;
        }
        // `>=` (rather than `==`) keeps the bound even if the length ever
        // exceeded the capacity.
        while self.items.len() >= self.cap {
            self.items.pop_front();
        }
        self.items.push_back(v);
    }

    /// Iterate over the retained items from oldest to newest.
    pub fn iter(&self) -> impl Iterator<Item = &T> {
        self.items.iter()
    }
}
248
249/// Per-job ring of `Event`s used to replay history when `daemon attach`
250/// targets a finished or in-flight job. Bounded both in jobs tracked
251/// (oldest job dropped when full) and in events per job (oldest event
252/// per job dropped when full).
253pub struct ReplayBuffer {
254    entries: VecDeque<(JobId, VecDeque<Event>)>,
255    cap_jobs: usize,
256    cap_per_job: usize,
257}
258
259impl ReplayBuffer {
260    pub fn new(cap_jobs: usize, cap_per_job: usize) -> Self {
261        Self {
262            entries: VecDeque::with_capacity(cap_jobs),
263            cap_jobs,
264            cap_per_job,
265        }
266    }
267
268    /// Begin a new job's history. Drops the oldest tracked job if at
269    /// capacity. Idempotent for repeated JobStarted events on the same
270    /// id (which shouldn't happen but we tolerate it).
271    pub fn start_job(&mut self, job_id: JobId) {
272        if self.entries.iter().any(|(id, _)| *id == job_id) {
273            return;
274        }
275        if self.entries.len() >= self.cap_jobs {
276            self.entries.pop_front();
277        }
278        self.entries
279            .push_back((job_id, VecDeque::with_capacity(64)));
280    }
281
282    /// Append an event to the named job's ring. Silently ignored if
283    /// the job's entry has been evicted.
284    pub fn append(&mut self, job_id: JobId, ev: Event) {
285        if let Some((_, evs)) = self.entries.iter_mut().rev().find(|(id, _)| *id == job_id) {
286            if evs.len() >= self.cap_per_job {
287                evs.pop_front();
288            }
289            evs.push_back(ev);
290        }
291    }
292
293    pub fn events_for(&self, job_id: JobId) -> Option<Vec<Event>> {
294        self.entries
295            .iter()
296            .rev()
297            .find(|(id, _)| *id == job_id)
298            .map(|(_, evs)| evs.iter().cloned().collect())
299    }
300}
301
302/// Long-running task: subscribes to the broadcast channel and routes
303/// every per-job event into the replay buffer. Spawn via
304/// `spawn_replay_collector` so the subscribe happens synchronously and
305/// no events are missed before the task is scheduled.
306async fn replay_collector_loop(state: Arc<DaemonState>, mut rx: broadcast::Receiver<Event>) {
307    loop {
308        match rx.recv().await {
309            Ok(ev) => {
310                if let Some(job_id) = ev.job_id() {
311                    let mut replay = state.replay.lock().await;
312                    if matches!(&ev, Event::JobStarted { .. }) {
313                        replay.start_job(job_id);
314                    }
315                    replay.append(job_id, ev);
316                }
317            }
318            Err(broadcast::error::RecvError::Lagged(_)) => continue,
319            Err(broadcast::error::RecvError::Closed) => return,
320        }
321    }
322}
323
324/// Spawn the replay collector. The broadcast subscription is created
325/// synchronously before the task is spawned, so events emitted between
326/// the call to this function and the spawned task's first poll are not
327/// lost.
328pub fn spawn_replay_collector(state: Arc<DaemonState>) -> tokio::task::JoinHandle<()> {
329    let rx = state.events.subscribe();
330    tokio::spawn(replay_collector_loop(state, rx))
331}
332
333use crate::sink::JobSink;
334use steamroom::client::LoggedIn;
335use steamroom::client::SteamClient;
336
337/// Daemon-side JobSink that translates every call into an Event and
338/// broadcasts it. Cheap to construct per job.
339pub struct BroadcastSink {
340    pub job_id: JobId,
341    pub events: broadcast::Sender<Event>,
342}
343
344impl JobSink for BroadcastSink {
345    fn stdout_line(&self, line: &str) {
346        let _ = self.events.send(Event::Stdout {
347            job_id: self.job_id,
348            line: line.to_string(),
349        });
350    }
351    fn progress(&self, update: ProgressUpdate) {
352        let _ = self.events.send(Event::Progress {
353            job_id: self.job_id,
354            update,
355        });
356    }
357}
358
359async fn wait_for_next_job(state: &DaemonState) -> Option<QueuedJob> {
360    loop {
361        if state.shutdown.is_cancelled() {
362            return None;
363        }
364        {
365            let mut q = state.queue.lock().await;
366            if let Some(job) = q.pop_front() {
367                return Some(job);
368            }
369            if state.accepting.is_cancelled() {
370                // Graceful Stop: queue empty, no more work coming.
371                return None;
372            }
373        }
374        tokio::select! {
375            _ = state.queue_notify.notified() => {}
376            _ = state.shutdown.cancelled() => return None,
377        }
378    }
379}
380
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to 0 if the system clock reads earlier than the epoch;
/// that is acceptable because these timestamps are advisory, for display.
fn unix_now() -> u64 {
    use std::time::SystemTime;
    use std::time::UNIX_EPOCH;

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
388
389/// True if the error suggests the underlying CM connection is gone
390/// rather than the request being malformed. Used by the worker to drop
391/// the SteamClient and force a fresh login on the next job.
392fn is_disconnected(err: &crate::errors::CliError) -> bool {
393    use crate::errors::CliError;
394    use steamroom::error::ConnectionError;
395    use steamroom::error::Error as SteamError;
396
397    if matches!(
398        err,
399        CliError::Steam(SteamError::Connection(
400            ConnectionError::Disconnected
401                | ConnectionError::EncryptionFailed
402                | ConnectionError::DnsResolutionFailed,
403        ))
404    ) {
405        return true;
406    }
407
408    // A dropped CM/CDN socket (e.g. Steam closing an idle connection) surfaces
409    // as an io::Error wrapped at varying depths: CliError::Io, SteamError::Io,
410    // or ConnectionError::Io. Matching one layer misses the others, so walk the
411    // whole source chain and treat any connection-flavored io::Error as a
412    // disconnect. Without this the worker keeps reusing the dead client and
413    // every subsequent job fails instead of reauthenticating.
414    let mut source: Option<&(dyn std::error::Error + 'static)> = Some(err);
415    while let Some(e) = source {
416        if let Some(io) = e.downcast_ref::<std::io::Error>()
417            && matches!(
418                io.kind(),
419                std::io::ErrorKind::ConnectionReset
420                    | std::io::ErrorKind::BrokenPipe
421                    | std::io::ErrorKind::UnexpectedEof
422                    | std::io::ErrorKind::NotConnected
423                    | std::io::ErrorKind::ConnectionAborted
424            )
425        {
426            return true;
427        }
428        source = e.source();
429    }
430    false
431}
432
433/// Lazily authenticate when the worker needs a client. `preferred_user`
434/// is the username the daemon was launched with (Some if `daemon start
435/// --username foo`; None for fully-lazy daemons). Falls back to the
436/// user's saved-token detection and finally anonymous, mirroring direct
437/// mode's `connect_and_login`.
438async fn lazy_login(
439    preferred_user: Option<&str>,
440) -> Result<SteamClient<LoggedIn>, crate::errors::CliError> {
441    use crate::cli::AuthOptions;
442    let auth = AuthOptions {
443        username: preferred_user.map(|s| s.to_string()),
444        password: None,
445        qr: false,
446        use_steam_token: false,
447        remember_password: false,
448        device_name: None,
449    };
450    crate::commands::shared::connect_and_login(&auth, None).await
451}
452
453/// Single-job worker loop. Holds the SteamClient lazily: it is acquired
454/// on the first job that needs it and recreated after a connection-loss
455/// error so subsequent jobs reauthenticate transparently. `preferred_user`
456/// is forwarded into the lazy login flow.
457pub async fn worker_loop(
458    state: Arc<DaemonState>,
459    initial_client: Option<SteamClient<LoggedIn>>,
460    preferred_user: Option<String>,
461) {
462    let mut client = initial_client;
463    while let Some(job) = wait_for_next_job(&state).await {
464        let started_at = unix_now();
465        let sink = BroadcastSink {
466            job_id: job.job_id,
467            events: state.events.clone(),
468        };
469        let record = JobRecord {
470            job_id: job.job_id,
471            kind: job.kind,
472            args_summary: job.args_summary.clone(),
473            priority: job.priority,
474            submitted_at: job.submitted_at,
475            started_at: Some(started_at),
476            finished_at: None,
477            exit_code: None,
478            progress: None,
479        };
480        {
481            let mut active = state.active.lock().await;
482            *active = Some(RunningJob {
483                record: record.clone(),
484                cancel: job.cancel.clone(),
485            });
486        }
487        let _ = state.events.send(Event::JobStarted {
488            job_id: job.job_id,
489            kind: job.kind,
490            args_summary: job.args_summary.clone(),
491        });
492        // The job has moved from the queue to active. Subscribe streams
493        // (TUI, status monitors) need a fresh snapshot to drop the queue
494        // entry; QueueChanged is the only event they treat as
495        // queue/active state-of-the-world.
496        state.broadcast_snapshot().await;
497
498        let sink: Arc<dyn JobSink> = Arc::new(sink);
499        // Ensure we have a client. For local_info this is wasted work
500        // since the dispatch arm ignores the client, but the cost is
501        // bounded by lazy_login's network round-trip and only fires
502        // once per disconnect/restart.
503        let active_client = match &client {
504            Some(c) => c.clone(),
505            None => match lazy_login(preferred_user.as_deref()).await {
506                Ok(c) => {
507                    client = Some(c.clone());
508                    c
509                }
510                Err(e) => {
511                    let _ = state.events.send(Event::Log {
512                        job_id: Some(job.job_id),
513                        level: LogLevel::Error,
514                        target: "daemon::worker".into(),
515                        message: format!("login failed: {e}"),
516                    });
517                    let _ = state.events.send(Event::JobFinished {
518                        job_id: job.job_id,
519                        exit_code: 1,
520                    });
521                    let mut active = state.active.lock().await;
522                    *active = None;
523                    let mut finished = record;
524                    finished.finished_at = Some(unix_now());
525                    finished.exit_code = Some(1);
526                    state.recent.lock().await.push(finished);
527                    state.broadcast_snapshot().await;
528                    continue;
529                }
530            },
531        };
532        use futures::future::FutureExt;
533        use tracing::Instrument;
534        let dispatch_fut = dispatch(job.request, active_client, sink.clone(), job.cancel.clone())
535            .instrument(tracing::info_span!("job", job_id = job.job_id.0));
536        let dispatch_result = match std::panic::AssertUnwindSafe(dispatch_fut)
537            .catch_unwind()
538            .await
539        {
540            Ok(res) => res,
541            Err(payload) => {
542                let msg = if let Some(s) = payload.downcast_ref::<&str>() {
543                    s.to_string()
544                } else if let Some(s) = payload.downcast_ref::<String>() {
545                    s.clone()
546                } else {
547                    "(non-string panic payload)".into()
548                };
549                let _ = state.events.send(Event::Log {
550                    job_id: Some(job.job_id),
551                    level: LogLevel::Error,
552                    target: "daemon::worker".into(),
553                    message: format!("job panicked: {msg}"),
554                });
555                Err(crate::errors::CliError::Io(std::io::Error::other(
556                    "job panicked",
557                )))
558            }
559        };
560        let exit_code = match dispatch_result {
561            Ok(()) => 0,
562            Err(crate::errors::CliError::Cancelled) => 130,
563            Err(e) => {
564                if is_disconnected(&e) {
565                    // CM connection looks dead. Drop the client; the next
566                    // job will trigger a lazy reauth.
567                    client = None;
568                    let _ = state.events.send(Event::Log {
569                        job_id: None,
570                        level: LogLevel::Warn,
571                        target: "daemon::worker".into(),
572                        message: "Steam connection lost; will reauthenticate on next job".into(),
573                    });
574                }
575                let _ = state.events.send(Event::Log {
576                    job_id: Some(job.job_id),
577                    level: LogLevel::Error,
578                    target: "daemon::worker".into(),
579                    message: format!("{e}"),
580                });
581                1
582            }
583        };
584
585        {
586            let mut active = state.active.lock().await;
587            *active = None;
588        }
589        let mut finished = record;
590        finished.finished_at = Some(unix_now());
591        finished.exit_code = Some(exit_code);
592        state.recent.lock().await.push(finished);
593        let _ = state.events.send(Event::JobFinished {
594            job_id: job.job_id,
595            exit_code,
596        });
597        // active cleared, recent grew -- refresh subscribed clients.
598        state.broadcast_snapshot().await;
599    }
600    // If we got here because accepting was cancelled and the queue is empty,
601    // signal full shutdown so the accept loop and any Subscribe streams wind down.
602    if state.accepting.is_cancelled() {
603        state.shutdown.cancel();
604    }
605}
606
607async fn dispatch(
608    req: Request,
609    client: SteamClient<LoggedIn>,
610    sink: Arc<dyn JobSink>,
611    cancel: CancellationToken,
612) -> Result<(), crate::errors::CliError> {
613    use crate::commands;
614    match req {
615        Request::Download { args, .. } => {
616            // show_progress=false: progress flows via sink.progress, not indicatif.
617            commands::download::run_download(args.into(), client, sink, cancel, false).await
618        }
619        Request::Info { args, .. } => {
620            commands::info::run_info(args.into(), client, sink, cancel).await
621        }
622        Request::Files { args, .. } => {
623            commands::files::run_files(args.into(), Some(client), sink, cancel).await
624        }
625        Request::Manifests { args, .. } => {
626            commands::manifests::run_manifests(args.into(), client, sink, cancel).await
627        }
628        Request::Diff { args, .. } => {
629            commands::diff::run_diff(args.into(), client, sink, cancel).await
630        }
631        Request::Packages { args, .. } => {
632            commands::packages::run_packages(args.into(), client, sink, cancel).await
633        }
634        Request::SaveManifest { args, .. } => {
635            commands::save_manifest::run_save_manifest(args.into(), client, sink, cancel).await
636        }
637        Request::Workshop { args, .. } => {
638            // show_progress=false: progress flows via sink.progress, not indicatif.
639            commands::workshop::run_workshop(args.into(), client, sink, cancel, false).await
640        }
641        Request::LocalInfo { args, .. } => {
642            commands::local_info::run_local_info(args.into(), sink, cancel).await
643        }
644        Request::Status
645        | Request::Subscribe
646        | Request::Attach { .. }
647        | Request::Cancel { .. }
648        | Request::TogglePriority { .. }
649        | Request::Stop { .. } => {
650            // Control variants are handled by handle_connection (T14), not dispatched as jobs.
651            unreachable!("control variants do not produce jobs");
652        }
653    }
654}
655
656use tokio::io::AsyncRead;
657use tokio::io::AsyncWrite;
658
659use crate::daemon::framing::read_frame;
660use crate::daemon::framing::write_frame;
661use crate::daemon::proto::ErrorKind;
662use crate::daemon::proto::Frame;
663use crate::daemon::proto::Response;
664
665/// Handle a single client connection. Reads exactly one Request, then
666/// either replies with a single Response and closes (control RPCs), or
667/// streams Events filtered by job id (job submissions, Subscribe, Attach)
668/// until terminated by JobFinished or a shutdown signal.
669pub async fn handle_connection<S>(state: Arc<DaemonState>, mut stream: S)
670where
671    S: AsyncRead + AsyncWrite + Unpin + Send,
672{
673    let req = match read_frame(&mut stream).await {
674        Ok(Frame::Request(r)) => r,
675        Ok(other) => {
676            let _ = write_frame(
677                &mut stream,
678                &Frame::Response(Response::Error {
679                    kind: ErrorKind::InvalidRequest,
680                    message: format!("expected Request, got {other:?}"),
681                }),
682            )
683            .await;
684            return;
685        }
686        Err(e) => {
687            let _ = write_frame(
688                &mut stream,
689                &Frame::Response(Response::Error {
690                    kind: ErrorKind::InvalidRequest,
691                    message: e.to_string(),
692                }),
693            )
694            .await;
695            return;
696        }
697    };
698
699    match req {
700        Request::Status => {
701            let snap = state.snapshot().await;
702            let _ = write_frame(&mut stream, &Frame::Response(Response::Status(snap))).await;
703        }
704        Request::Stop { force } => {
705            state.accepting.cancel(); // refuse new submissions
706            state.queue_notify.notify_one(); // unstick worker if idle
707            if force {
708                // Hard stop: cancel active job and tell all subscribers.
709                if let Some(running) = state.active.lock().await.as_ref() {
710                    running.cancel.cancel();
711                }
712                state.shutdown.cancel();
713            } else {
714                // Graceful: do NOT signal `shutdown` here. The
715                // worker_loop will set shutdown once active is None AND
716                // queue is empty. Spawn a grace-timer task that cancels
717                // the active job (whichever job is active at that
718                // moment) after 30s so a 6-hour download can't pin the
719                // daemon forever.
720                let state = state.clone();
721                tokio::spawn(async move {
722                    tokio::time::sleep(GRACEFUL_STOP_GRACE).await;
723                    if state.shutdown.is_cancelled() {
724                        // Already fully shut down (queue drained); nothing to do.
725                        return;
726                    }
727                    if let Some(running) = state.active.lock().await.as_ref() {
728                        let _ = state.events.send(Event::Log {
729                            job_id: Some(running.record.job_id),
730                            level: LogLevel::Warn,
731                            target: "daemon::stop".into(),
732                            message: format!(
733                                "graceful stop grace ({}s) elapsed; cancelling active job",
734                                GRACEFUL_STOP_GRACE.as_secs()
735                            ),
736                        });
737                        running.cancel.cancel();
738                    }
739                });
740            }
741            let _ = write_frame(&mut stream, &Frame::Response(Response::Stopping)).await;
742        }
743        Request::Cancel { job_id } => {
744            let resp = match state.cancel(job_id).await {
745                Ok(()) => Response::Ack,
746                Err(_) => Response::Error {
747                    kind: ErrorKind::JobNotFound,
748                    message: format!("{job_id}"),
749                },
750            };
751            let _ = write_frame(&mut stream, &Frame::Response(resp)).await;
752        }
753        Request::TogglePriority { job_id } => {
754            let resp = match state.toggle_priority(job_id).await {
755                Ok(()) => Response::Ack,
756                Err(_) => Response::Error {
757                    kind: ErrorKind::JobNotFound,
758                    message: format!("{job_id}"),
759                },
760            };
761            let _ = write_frame(&mut stream, &Frame::Response(resp)).await;
762        }
763        Request::Subscribe => {
764            let rx = state.events.subscribe();
765            stream_events(state.clone(), &mut stream, None, rx).await;
766        }
767        Request::Attach { job_id } => {
768            let active_match = state
769                .active
770                .lock()
771                .await
772                .as_ref()
773                .map(|r| r.record.job_id == job_id)
774                .unwrap_or(false);
775            let queued = state.queue.lock().await.iter().any(|j| j.job_id == job_id);
776
777            if !active_match && !queued {
778                // Either finished or never existed. If finished, replay
779                // the buffered events before EndOfStream so the user
780                // sees stdout/log/progress, not just an exit code.
781                if let Some(exit_code) = state.recent_exit_code(job_id).await {
782                    let events = state.replay.lock().await.events_for(job_id);
783                    if let Some(events) = events {
784                        for ev in events {
785                            if write_frame(&mut stream, &Frame::Event(ev)).await.is_err() {
786                                return;
787                            }
788                        }
789                    }
790                    let _ = write_frame(&mut stream, &Frame::EndOfStream { exit_code }).await;
791                } else {
792                    let _ = write_frame(
793                        &mut stream,
794                        &Frame::Response(Response::Error {
795                            kind: ErrorKind::JobNotFound,
796                            message: format!("{job_id}"),
797                        }),
798                    )
799                    .await;
800                }
801                return;
802            }
803            // In-flight: replay everything we have so the user sees what
804            // happened before they attached, then continue live.
805            let replayed = state.replay.lock().await.events_for(job_id);
806            if let Some(events) = replayed {
807                for ev in events {
808                    if write_frame(&mut stream, &Frame::Event(ev)).await.is_err() {
809                        return;
810                    }
811                }
812            }
813            let rx = state.events.subscribe();
814            stream_events(state.clone(), &mut stream, Some(job_id), rx).await;
815        }
816        // Job submissions.
817        other => {
818            let priority = matches!(
819                &other,
820                Request::Download { priority: true, .. }
821                    | Request::Info { priority: true, .. }
822                    | Request::Files { priority: true, .. }
823                    | Request::Manifests { priority: true, .. }
824                    | Request::Diff { priority: true, .. }
825                    | Request::Packages { priority: true, .. }
826                    | Request::SaveManifest { priority: true, .. }
827                    | Request::Workshop { priority: true, .. }
828                    | Request::LocalInfo { priority: true, .. }
829            );
830            let kind = job_kind_of(&other);
831            let args_summary = summarize(&other);
832            let job_id = state.allocate_job_id();
833            let cancel = CancellationToken::new();
834            let job = QueuedJob {
835                job_id,
836                kind,
837                request: other,
838                priority,
839                submitted_at: unix_now(),
840                cancel,
841                args_summary,
842            };
843            // Subscribe BEFORE enqueue so we don't miss JobStarted.
844            let rx = state.events.subscribe();
845            let position = state.enqueue(job).await;
846            let _ = write_frame(
847                &mut stream,
848                &Frame::Response(Response::JobAccepted { job_id, position }),
849            )
850            .await;
851            stream_events(state.clone(), &mut stream, Some(job_id), rx).await;
852        }
853    }
854}
855
856async fn stream_events<S>(
857    state: Arc<DaemonState>,
858    stream: &mut S,
859    filter: Option<JobId>,
860    mut rx: tokio::sync::broadcast::Receiver<Event>,
861) where
862    S: AsyncWrite + Unpin,
863{
864    loop {
865        tokio::select! {
866            _ = state.shutdown.cancelled() => {
867                let _ = write_frame(stream, &Frame::EndOfStream { exit_code: 130 }).await;
868                return;
869            }
870            ev = rx.recv() => match ev {
871                Ok(ev) => {
872                    if let Some(want) = filter
873                        && ev.job_id() != Some(want)
874                    {
875                        continue;
876                    }
877                    // A JobFinished terminates a single-job attach (filter
878                    // = Some(id)) but is just another broadcast to a global
879                    // Subscribe (filter = None) -- the TUI / monitor stays
880                    // up for the next job.
881                    let terminal_exit = match (&ev, filter) {
882                        (Event::JobFinished { exit_code, .. }, Some(_)) => Some(*exit_code),
883                        _ => None,
884                    };
885                    if write_frame(stream, &Frame::Event(ev)).await.is_err() {
886                        return; // Client dropped; that is fine.
887                    }
888                    if let Some(exit_code) = terminal_exit {
889                        let _ = write_frame(stream, &Frame::EndOfStream { exit_code }).await;
890                        return;
891                    }
892                }
893                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
894                    // If the filtered job already finished while we were lagging,
895                    // recover its exit code from `recent` and write the terminal
896                    // frame. Otherwise resume reading and hope the buffer recovers.
897                    if let Some(want) = filter
898                        && let Some(exit_code) = state.recent_exit_code(want).await
899                    {
900                        let _ = write_frame(stream, &Frame::EndOfStream { exit_code }).await;
901                        return;
902                    }
903                    continue;
904                }
905                Err(tokio::sync::broadcast::error::RecvError::Closed) => return,
906            }
907        }
908    }
909}
910
911fn job_kind_of(r: &Request) -> JobKind {
912    match r {
913        Request::Download { .. } => JobKind::Download,
914        Request::Info { .. } => JobKind::Info,
915        Request::Files { .. } => JobKind::Files,
916        Request::Manifests { .. } => JobKind::Manifests,
917        Request::Diff { .. } => JobKind::Diff,
918        Request::Packages { .. } => JobKind::Packages,
919        Request::SaveManifest { .. } => JobKind::SaveManifest,
920        Request::Workshop { .. } => JobKind::Workshop,
921        Request::LocalInfo { .. } => JobKind::LocalInfo,
922        _ => unreachable!("control variants do not produce jobs"),
923    }
924}
925
926fn summarize(r: &Request) -> String {
927    match r {
928        Request::Download { args, .. } => {
929            format!("download app={} depot={:?}", args.app, args.depot)
930        }
931        Request::Info { args, .. } => format!("info app={}", args.app),
932        Request::Files { args, .. } => format!("files app={:?}", args.app),
933        Request::Manifests { args, .. } => format!("manifests app={}", args.app),
934        Request::Diff { args, .. } => format!(
935            "diff depot={} from={} to={}",
936            args.depot, args.from, args.to
937        ),
938        Request::Packages { args, .. } => format!("packages count={}", args.packages.len()),
939        Request::SaveManifest { args, .. } => {
940            format!("save-manifest app={} depot={}", args.app, args.depot)
941        }
942        Request::Workshop { args, .. } => format!("workshop item={}", args.item),
943        Request::LocalInfo { .. } => "local-info".to_string(),
944        _ => "(control)".to_string(),
945    }
946}
947
#[cfg(test)]
mod tests {
    use super::*;
    use crate::daemon::proto::InfoParams;
    use crate::daemon::proto::OutputFormat;
    use tokio::io::duplex;

    /// Shorthand for a `Stdout` event on `job_id` carrying `line`.
    fn ev_stdout(job_id: JobId, line: &str) -> Event {
        Event::Stdout {
            line: line.into(),
            job_id,
        }
    }

    /// Builds a queued `Info` job with a freshly allocated id and the
    /// requested priority flag set on both the job and its request.
    fn fake_queued(state: &DaemonState, priority: bool) -> QueuedJob {
        let args = InfoParams {
            app: 1,
            format: Some(OutputFormat::Plain),
            os: None,
            show_all: false,
        };
        QueuedJob {
            job_id: state.allocate_job_id(),
            kind: JobKind::Info,
            request: Request::Info { args, priority },
            priority,
            submitted_at: 0,
            cancel: CancellationToken::new(),
            args_summary: "fake".into(),
        }
    }

    #[test]
    fn replay_buffer_evicts_oldest_job_when_full() {
        // Room for two jobs; starting a third pushes JobId(1) out.
        let mut buffer = ReplayBuffer::new(2, 10);
        for id in 1..=3 {
            buffer.start_job(JobId(id));
        }
        assert!(buffer.events_for(JobId(1)).is_none());
        assert!(buffer.events_for(JobId(2)).is_some());
        assert!(buffer.events_for(JobId(3)).is_some());
    }

    #[test]
    fn replay_buffer_caps_events_per_job() {
        let job = JobId(1);
        let mut buffer = ReplayBuffer::new(4, 3);
        buffer.start_job(job);
        for i in 0..5 {
            let line = format!("line {i}");
            buffer.append(job, ev_stdout(job, &line));
        }
        let events = buffer.events_for(job).expect("present");
        assert_eq!(events.len(), 3);
        // Lines 0 and 1 fell off the front, so the survivor at the head
        // is line 2.
        let Event::Stdout { line, .. } = &events[0] else {
            panic!("expected Stdout");
        };
        assert_eq!(line, "line 2");
    }

    #[tokio::test]
    async fn enqueue_returns_position_zero_for_empty_queue() {
        let s = DaemonState::new(None, 1, 0);
        let job = fake_queued(&s, false);
        assert_eq!(s.enqueue(job).await, 0);
    }

    #[tokio::test]
    async fn priority_jumps_non_priority() {
        let s = DaemonState::new(None, 1, 0);
        // Two ordinary jobs first, then one priority job.
        for _ in 0..2 {
            let _ = s.enqueue(fake_queued(&s, false)).await;
        }
        let prio_pos = s.enqueue(fake_queued(&s, true)).await;
        assert_eq!(prio_pos, 0, "first priority should land at the head");

        let snap = s.snapshot().await;
        let kinds: Vec<bool> = snap.queue.iter().map(|j| j.priority).collect();
        assert_eq!(kinds, vec![true, false, false]);
    }

    #[tokio::test]
    async fn cancel_queued_removes_and_emits_finished() {
        let s = DaemonState::new(None, 1, 0);
        // Subscribe before enqueueing so no event can be missed.
        let mut rx = s.events.subscribe();
        let _ = s.enqueue(fake_queued(&s, false)).await;
        let target = s.snapshot().await.queue[0].job_id;
        s.cancel(target).await.expect("ok");

        // Drain whatever was broadcast and look for the finish event.
        let mut saw_finished = false;
        loop {
            let Ok(ev) = rx.try_recv() else { break };
            let Event::JobFinished { job_id, exit_code } = ev else {
                continue;
            };
            assert_eq!(job_id, target);
            assert_eq!(exit_code, 130);
            saw_finished = true;
        }
        assert!(saw_finished, "expected JobFinished after cancel");
    }

    #[tokio::test]
    async fn toggle_priority_moves_across_boundary() {
        let s = DaemonState::new(None, 1, 0);
        let _ = s.enqueue(fake_queued(&s, true)).await;
        let _ = s.enqueue(fake_queued(&s, false)).await;

        // Flip the second (non-priority) entry.
        let before = s.snapshot().await;
        let target = before.queue[1].job_id;
        s.toggle_priority(target).await.expect("ok");

        let after = s.snapshot().await;
        let kinds: Vec<bool> = after.queue.iter().map(|j| j.priority).collect();
        assert_eq!(kinds, vec![true, true]);
    }

    #[tokio::test]
    async fn status_request_round_trips() {
        let s = DaemonState::new(Some("acct".into()), 42, 1000);
        let (mut client, server) = duplex(64 * 1024);

        // Serve exactly one connection on the in-memory pipe.
        let server_state = s.clone();
        let server_task = tokio::spawn(async move {
            handle_connection(server_state, server).await;
        });

        let request = Frame::Request(Request::Status);
        crate::daemon::framing::write_frame(&mut client, &request)
            .await
            .unwrap();
        let reply = crate::daemon::framing::read_frame(&mut client)
            .await
            .unwrap();

        let snap = match reply {
            Frame::Response(Response::Status(snap)) => snap,
            other => panic!("wrong: {other:?}"),
        };
        assert_eq!(snap.daemon_pid, 42);
        assert_eq!(snap.account.as_deref(), Some("acct"));

        server_task.await.unwrap();
    }

    #[tokio::test]
    async fn cancel_active_job_releases_active_lock_promptly() {
        // Install a fake running job directly into the active slot.
        let s = DaemonState::new(None, 1, 0);
        let job = JobId(99);
        let active_cancel = CancellationToken::new();
        let running = RunningJob {
            record: JobRecord {
                job_id: job,
                kind: JobKind::Info,
                args_summary: "fake".into(),
                priority: false,
                submitted_at: 0,
                started_at: Some(0),
                finished_at: None,
                exit_code: None,
                progress: None,
            },
            cancel: active_cancel.clone(),
        };
        *s.active.lock().await = Some(running);

        // The call must finish well inside the limit; a timeout here
        // would point at a lock being held for too long.
        let limit = std::time::Duration::from_millis(100);
        let res = tokio::time::timeout(limit, s.cancel(job)).await;
        assert!(res.is_ok(), "cancel timed out -- deadlock?");
        assert!(active_cancel.is_cancelled());
    }
}