Skip to main content

steamroom_cli/daemon/
client.rs

//! Client side of `--use-daemon`: connect, submit, attach to the event
//! stream, and render events to stdout via the same formatting the direct
//! CLI uses.
4
5use interprocess::local_socket::tokio::Stream;
6use interprocess::local_socket::traits::tokio::Stream as _;
7
8use crate::cli::Cli;
9use crate::cli::DaemonSub;
10use crate::cli::OutputFormat as CliOutputFormat;
11use crate::daemon::framing::read_frame;
12use crate::daemon::framing::write_frame;
13use crate::daemon::ipc::socket_name;
14use crate::daemon::proto::Event;
15use crate::daemon::proto::Frame;
16use crate::daemon::proto::JobId;
17use crate::daemon::proto::JobRecord;
18use crate::daemon::proto::LogLevel;
19use crate::daemon::proto::Request;
20use crate::daemon::proto::Response;
21use crate::daemon::proto::StatusSnapshot;
22use crate::errors::CliError;
23
24pub async fn connect() -> Result<Stream, CliError> {
25    let name = socket_name()?;
26    Stream::connect(name).await.map_err(|e| match e.kind() {
27        std::io::ErrorKind::NotFound | std::io::ErrorKind::ConnectionRefused => {
28            CliError::NoDaemonRunning
29        }
30        _ => CliError::Io(e),
31    })
32}
33
34pub async fn dispatch_use_daemon(cli: Cli) -> Result<(), CliError> {
35    let detach = cli.detach;
36    let quiet = cli.quiet;
37    let no_progress = cli.no_progress;
38    let request = cli.into_rpc_request()?;
39    let mut stream = match connect().await {
40        Ok(s) => s,
41        Err(CliError::NoDaemonRunning) => {
42            eprintln!("no daemon running; starting one in lazy auth mode...");
43            auto_spawn_daemon().await?;
44            connect().await?
45        }
46        Err(e) => return Err(e),
47    };
48    write_frame(&mut stream, &Frame::Request(request)).await?;
49
50    let resp = read_frame(&mut stream).await?;
51    let (job_id, position) = match resp {
52        Frame::Response(Response::JobAccepted { job_id, position }) => (job_id, position),
53        Frame::Response(Response::Error { kind, message }) => {
54            return Err(CliError::DaemonError(format!("{kind:?}: {message}")));
55        }
56        other => {
57            return Err(CliError::MalformedFrame(format!(
58                "expected JobAccepted, got {other:?}"
59            )));
60        }
61    };
62
63    if detach {
64        println!("job {} queued (position {})", job_id, position);
65        return Ok(());
66    }
67
68    attach_loop(&mut stream, job_id, quiet, no_progress).await
69}
70
71/// Render attached events to the user's terminal. `quiet` suppresses
72/// `Event::Stdout` lines (matching direct-mode `--quiet`). `no_progress`
73/// suppresses the indicatif progress bar (matching direct-mode
74/// `--no-progress`). `Event::Log` events are always forwarded through
75/// `tracing` so the user's `--debug` / `--quiet` filter applies.
76struct AttachRenderer {
77    bar: Option<indicatif::ProgressBar>,
78    quiet: bool,
79    no_progress: bool,
80}
81
82impl AttachRenderer {
83    fn new(quiet: bool, no_progress: bool) -> Self {
84        Self {
85            bar: None,
86            quiet,
87            no_progress,
88        }
89    }
90
91    fn handle(&mut self, ev: Event) {
92        match ev {
93            Event::Stdout { line, .. } => {
94                if !self.quiet {
95                    println!("{line}");
96                }
97            }
98            Event::Log {
99                level,
100                target,
101                message,
102                ..
103            } => emit_log(level, &target, &message),
104            Event::Progress { update, .. } => {
105                if self.no_progress {
106                    return;
107                }
108                let bar = self.bar.get_or_insert_with(|| {
109                    let pb = indicatif::ProgressBar::new(update.bytes_total);
110                    pb.set_style(
111                        indicatif::ProgressStyle::default_bar()
112                            .template("{spinner:.green} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})")
113                            .unwrap()
114                            .progress_chars("=> "),
115                    );
116                    pb
117                });
118                bar.set_length(update.bytes_total);
119                bar.set_position(update.bytes_done);
120            }
121            Event::JobFinished { .. } => {
122                if let Some(bar) = self.bar.take() {
123                    bar.finish_and_clear();
124                }
125            }
126            Event::JobStarted { .. } | Event::QueueChanged { .. } => {}
127        }
128    }
129}
130
131/// Stream events from a connection and render them as the direct CLI
132/// would. Returns when EndOfStream arrives, when Ctrl-C detaches, or
133/// when the socket closes cleanly.
134pub async fn attach_loop(
135    stream: &mut Stream,
136    job_id: JobId,
137    quiet: bool,
138    no_progress: bool,
139) -> Result<(), CliError> {
140    let mut renderer = AttachRenderer::new(quiet, no_progress);
141    let ctrl_c = tokio::signal::ctrl_c();
142    tokio::pin!(ctrl_c);
143
144    loop {
145        let frame_fut = read_frame(stream);
146        tokio::pin!(frame_fut);
147        tokio::select! {
148            _ = &mut ctrl_c => {
149                eprintln!("detached -- reattach with `steamroom daemon attach {}`", job_id.0);
150                return Ok(());
151            }
152            r = &mut frame_fut => match r {
153                Ok(Frame::Event(ev)) => renderer.handle(ev),
154                Ok(Frame::EndOfStream { exit_code }) => {
155                    if exit_code != 0 {
156                        std::process::exit(exit_code);
157                    }
158                    return Ok(());
159                }
160                Ok(other) => {
161                    return Err(CliError::MalformedFrame(format!("unexpected frame: {other:?}")));
162                }
163                Err(CliError::SocketClosed) => return Ok(()),
164                Err(e) => return Err(e),
165            }
166        }
167    }
168}
169
170fn emit_log(level: LogLevel, target: &str, message: &str) {
171    match level {
172        LogLevel::Error => tracing::error!(target: "daemon", "{target}: {message}"),
173        LogLevel::Warn => tracing::warn!(target: "daemon", "{target}: {message}"),
174        LogLevel::Info => tracing::info!(target: "daemon", "{target}: {message}"),
175        LogLevel::Debug => tracing::debug!(target: "daemon", "{target}: {message}"),
176        LogLevel::Trace => tracing::trace!(target: "daemon", "{target}: {message}"),
177    }
178}
179
180// -- Subcommand handlers (T20) ----------------------------------------
181
182pub async fn run_daemon_subcommand(
183    sub: DaemonSub,
184    quiet: bool,
185    no_progress: bool,
186) -> Result<(), CliError> {
187    match sub {
188        // `Start` is intercepted in main() before the tokio runtime is built,
189        // because launching the daemon must fork after dropping the runtime.
190        // Reaching this arm means the dispatcher missed it.
191        DaemonSub::Start => unreachable!("daemon start is dispatched in main() pre-runtime"),
192        DaemonSub::Info => {
193            crate::daemon::lifecycle::render_daemon_info();
194            Ok(())
195        }
196        DaemonSub::Stop { force } => stop_daemon(force).await,
197        DaemonSub::Attach { job_id } => attach_existing(JobId(job_id), quiet, no_progress).await,
198        DaemonSub::Status { text, format } => {
199            // Any explicit format implies text mode -- the TUI doesn't
200            // render JSON / plain / table choices, only its own widgets.
201            // When built without the `tui` feature, fall back to a text
202            // snapshot regardless of which flags were passed.
203            #[cfg(feature = "tui")]
204            {
205                if text || format.is_some() {
206                    print_status_once(format).await
207                } else {
208                    crate::daemon::tui::run_tui().await
209                }
210            }
211            #[cfg(not(feature = "tui"))]
212            {
213                let _ = text; // both flags are honored as "print once" here
214                print_status_once(format).await
215            }
216        }
217    }
218}
219
220async fn stop_daemon(force: bool) -> Result<(), CliError> {
221    let mut stream = connect().await?;
222    write_frame(&mut stream, &Frame::Request(Request::Stop { force })).await?;
223    let resp = read_frame(&mut stream).await?;
224    match resp {
225        Frame::Response(Response::Stopping) => {
226            if force {
227                println!("stopping daemon (cancelling active job)");
228            } else {
229                println!("stopping daemon (active job will finish)");
230            }
231            Ok(())
232        }
233        Frame::Response(Response::Error { kind, message }) => {
234            Err(CliError::DaemonError(format!("{kind:?}: {message}")))
235        }
236        other => Err(CliError::MalformedFrame(format!(
237            "expected Stopping, got {other:?}"
238        ))),
239    }
240}
241
242async fn attach_existing(job_id: JobId, quiet: bool, no_progress: bool) -> Result<(), CliError> {
243    let mut stream = connect().await?;
244    write_frame(&mut stream, &Frame::Request(Request::Attach { job_id })).await?;
245    attach_loop(&mut stream, job_id, quiet, no_progress).await
246}
247
248async fn print_status_once(format: Option<CliOutputFormat>) -> Result<(), CliError> {
249    let mut stream = connect().await?;
250    write_frame(&mut stream, &Frame::Request(Request::Status)).await?;
251    let resp = read_frame(&mut stream).await?;
252    let snap = match resp {
253        Frame::Response(Response::Status(s)) => s,
254        Frame::Response(Response::Error { kind, message }) => {
255            return Err(CliError::DaemonError(format!("{kind:?}: {message}")));
256        }
257        other => {
258            return Err(CliError::MalformedFrame(format!(
259                "expected Status, got {other:?}"
260            )));
261        }
262    };
263    match format {
264        Some(CliOutputFormat::Json) => print_status_json(&snap),
265        _ => print_status_table(&snap),
266    }
267    Ok(())
268}
269
270fn print_status_json(snap: &StatusSnapshot) {
271    let json = serde_json::json!({
272        "daemon_pid": snap.daemon_pid,
273        "daemon_started_at": snap.daemon_started_at,
274        "account": snap.account,
275        "active": snap.active.as_ref().map(record_to_json),
276        "queue": snap.queue.iter().map(record_to_json).collect::<Vec<_>>(),
277        "recent": snap.recent.iter().map(record_to_json).collect::<Vec<_>>(),
278    });
279    println!(
280        "{}",
281        serde_json::to_string_pretty(&json).expect("snapshot is JSON-clean")
282    );
283}
284
285fn record_to_json(r: &JobRecord) -> serde_json::Value {
286    serde_json::json!({
287        "job_id": r.job_id.0,
288        "kind": format!("{:?}", r.kind),
289        "args_summary": r.args_summary,
290        "priority": r.priority,
291        "submitted_at": r.submitted_at,
292        "started_at": r.started_at,
293        "finished_at": r.finished_at,
294        "exit_code": r.exit_code,
295    })
296}
297
298fn print_status_table(snap: &StatusSnapshot) {
299    println!("daemon pid : {}", snap.daemon_pid);
300    println!(
301        "account    : {}",
302        snap.account.as_deref().unwrap_or("(none)")
303    );
304    println!();
305    match &snap.active {
306        Some(active) => {
307            println!("Active:");
308            println!(
309                "  {} ({:?}) {}",
310                active.job_id, active.kind, active.args_summary
311            );
312        }
313        None => println!("Active: (idle)"),
314    }
315    if !snap.queue.is_empty() {
316        println!();
317        println!("Queue:");
318        for j in &snap.queue {
319            let mark = if j.priority { "*" } else { " " };
320            println!("  {} {} ({:?}) {}", mark, j.job_id, j.kind, j.args_summary);
321        }
322    }
323    if !snap.recent.is_empty() {
324        println!();
325        println!("Recent:");
326        for j in &snap.recent {
327            let ec = j.exit_code.map(|c| format!("exit {c}")).unwrap_or_default();
328            println!("  {} ({:?}) {} {}", j.job_id, j.kind, j.args_summary, ec);
329        }
330    }
331}
332
333/// Auto-spawn a lazy-auth daemon when `--use-daemon` finds no running
334/// daemon. Runs `steamroom daemon start` as a subprocess; that process
335/// does its own fork+exec dance, probes the socket until bound, and
336/// exits 0. Returning Ok here guarantees the socket is up.
337async fn auto_spawn_daemon() -> Result<(), CliError> {
338    use std::process::Stdio;
339    let exe = std::env::current_exe().map_err(CliError::Io)?;
340    let status = tokio::process::Command::new(exe)
341        .args(["daemon", "start"])
342        .stdin(Stdio::null())
343        // The spawned `daemon start` would otherwise print its info
344        // banner to our stdout, which mixes with the user's --use-daemon
345        // output. Suppress both streams; the daemon log file has the
346        // same content.
347        .stdout(Stdio::null())
348        .stderr(Stdio::null())
349        .status()
350        .await
351        .map_err(CliError::Io)?;
352    if !status.success() {
353        return Err(CliError::DaemonError(format!(
354            "auto-spawn `daemon start` exited with status {status}"
355        )));
356    }
357    Ok(())
358}