1use interprocess::local_socket::tokio::Stream;
6use interprocess::local_socket::traits::tokio::Stream as _;
7
8use crate::cli::Cli;
9use crate::cli::DaemonSub;
10use crate::cli::OutputFormat as CliOutputFormat;
11use crate::daemon::framing::read_frame;
12use crate::daemon::framing::write_frame;
13use crate::daemon::ipc::socket_name;
14use crate::daemon::proto::Event;
15use crate::daemon::proto::Frame;
16use crate::daemon::proto::JobId;
17use crate::daemon::proto::JobRecord;
18use crate::daemon::proto::LogLevel;
19use crate::daemon::proto::Request;
20use crate::daemon::proto::Response;
21use crate::daemon::proto::StatusSnapshot;
22use crate::errors::CliError;
23
24pub async fn connect() -> Result<Stream, CliError> {
25 let name = socket_name()?;
26 Stream::connect(name).await.map_err(|e| match e.kind() {
27 std::io::ErrorKind::NotFound | std::io::ErrorKind::ConnectionRefused => {
28 CliError::NoDaemonRunning
29 }
30 _ => CliError::Io(e),
31 })
32}
33
34pub async fn dispatch_use_daemon(cli: Cli) -> Result<(), CliError> {
35 let detach = cli.detach;
36 let quiet = cli.quiet;
37 let no_progress = cli.no_progress;
38 let request = cli.into_rpc_request()?;
39 let mut stream = match connect().await {
40 Ok(s) => s,
41 Err(CliError::NoDaemonRunning) => {
42 eprintln!("no daemon running; starting one in lazy auth mode...");
43 auto_spawn_daemon().await?;
44 connect().await?
45 }
46 Err(e) => return Err(e),
47 };
48 write_frame(&mut stream, &Frame::Request(request)).await?;
49
50 let resp = read_frame(&mut stream).await?;
51 let (job_id, position) = match resp {
52 Frame::Response(Response::JobAccepted { job_id, position }) => (job_id, position),
53 Frame::Response(Response::Error { kind, message }) => {
54 return Err(CliError::DaemonError(format!("{kind:?}: {message}")));
55 }
56 other => {
57 return Err(CliError::MalformedFrame(format!(
58 "expected JobAccepted, got {other:?}"
59 )));
60 }
61 };
62
63 if detach {
64 println!("job {} queued (position {})", job_id, position);
65 return Ok(());
66 }
67
68 attach_loop(&mut stream, job_id, quiet, no_progress).await
69}
70
71struct AttachRenderer {
77 bar: Option<indicatif::ProgressBar>,
78 quiet: bool,
79 no_progress: bool,
80}
81
82impl AttachRenderer {
83 fn new(quiet: bool, no_progress: bool) -> Self {
84 Self {
85 bar: None,
86 quiet,
87 no_progress,
88 }
89 }
90
91 fn handle(&mut self, ev: Event) {
92 match ev {
93 Event::Stdout { line, .. } => {
94 if !self.quiet {
95 println!("{line}");
96 }
97 }
98 Event::Log {
99 level,
100 target,
101 message,
102 ..
103 } => emit_log(level, &target, &message),
104 Event::Progress { update, .. } => {
105 if self.no_progress {
106 return;
107 }
108 let bar = self.bar.get_or_insert_with(|| {
109 let pb = indicatif::ProgressBar::new(update.bytes_total);
110 pb.set_style(
111 indicatif::ProgressStyle::default_bar()
112 .template("{spinner:.green} [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})")
113 .unwrap()
114 .progress_chars("=> "),
115 );
116 pb
117 });
118 bar.set_length(update.bytes_total);
119 bar.set_position(update.bytes_done);
120 }
121 Event::JobFinished { .. } => {
122 if let Some(bar) = self.bar.take() {
123 bar.finish_and_clear();
124 }
125 }
126 Event::JobStarted { .. } | Event::QueueChanged { .. } => {}
127 }
128 }
129}
130
131pub async fn attach_loop(
135 stream: &mut Stream,
136 job_id: JobId,
137 quiet: bool,
138 no_progress: bool,
139) -> Result<(), CliError> {
140 let mut renderer = AttachRenderer::new(quiet, no_progress);
141 let ctrl_c = tokio::signal::ctrl_c();
142 tokio::pin!(ctrl_c);
143
144 loop {
145 let frame_fut = read_frame(stream);
146 tokio::pin!(frame_fut);
147 tokio::select! {
148 _ = &mut ctrl_c => {
149 eprintln!("detached -- reattach with `steamroom daemon attach {}`", job_id.0);
150 return Ok(());
151 }
152 r = &mut frame_fut => match r {
153 Ok(Frame::Event(ev)) => renderer.handle(ev),
154 Ok(Frame::EndOfStream { exit_code }) => {
155 if exit_code != 0 {
156 std::process::exit(exit_code);
157 }
158 return Ok(());
159 }
160 Ok(other) => {
161 return Err(CliError::MalformedFrame(format!("unexpected frame: {other:?}")));
162 }
163 Err(CliError::SocketClosed) => return Ok(()),
164 Err(e) => return Err(e),
165 }
166 }
167 }
168}
169
170fn emit_log(level: LogLevel, target: &str, message: &str) {
171 match level {
172 LogLevel::Error => tracing::error!(target: "daemon", "{target}: {message}"),
173 LogLevel::Warn => tracing::warn!(target: "daemon", "{target}: {message}"),
174 LogLevel::Info => tracing::info!(target: "daemon", "{target}: {message}"),
175 LogLevel::Debug => tracing::debug!(target: "daemon", "{target}: {message}"),
176 LogLevel::Trace => tracing::trace!(target: "daemon", "{target}: {message}"),
177 }
178}
179
180pub async fn run_daemon_subcommand(
183 sub: DaemonSub,
184 quiet: bool,
185 no_progress: bool,
186) -> Result<(), CliError> {
187 match sub {
188 DaemonSub::Start => unreachable!("daemon start is dispatched in main() pre-runtime"),
192 DaemonSub::Info => {
193 crate::daemon::lifecycle::render_daemon_info();
194 Ok(())
195 }
196 DaemonSub::Stop { force } => stop_daemon(force).await,
197 DaemonSub::Attach { job_id } => attach_existing(JobId(job_id), quiet, no_progress).await,
198 DaemonSub::Status { text, format } => {
199 #[cfg(feature = "tui")]
204 {
205 if text || format.is_some() {
206 print_status_once(format).await
207 } else {
208 crate::daemon::tui::run_tui().await
209 }
210 }
211 #[cfg(not(feature = "tui"))]
212 {
213 let _ = text; print_status_once(format).await
215 }
216 }
217 }
218}
219
220async fn stop_daemon(force: bool) -> Result<(), CliError> {
221 let mut stream = connect().await?;
222 write_frame(&mut stream, &Frame::Request(Request::Stop { force })).await?;
223 let resp = read_frame(&mut stream).await?;
224 match resp {
225 Frame::Response(Response::Stopping) => {
226 if force {
227 println!("stopping daemon (cancelling active job)");
228 } else {
229 println!("stopping daemon (active job will finish)");
230 }
231 Ok(())
232 }
233 Frame::Response(Response::Error { kind, message }) => {
234 Err(CliError::DaemonError(format!("{kind:?}: {message}")))
235 }
236 other => Err(CliError::MalformedFrame(format!(
237 "expected Stopping, got {other:?}"
238 ))),
239 }
240}
241
242async fn attach_existing(job_id: JobId, quiet: bool, no_progress: bool) -> Result<(), CliError> {
243 let mut stream = connect().await?;
244 write_frame(&mut stream, &Frame::Request(Request::Attach { job_id })).await?;
245 attach_loop(&mut stream, job_id, quiet, no_progress).await
246}
247
248async fn print_status_once(format: Option<CliOutputFormat>) -> Result<(), CliError> {
249 let mut stream = connect().await?;
250 write_frame(&mut stream, &Frame::Request(Request::Status)).await?;
251 let resp = read_frame(&mut stream).await?;
252 let snap = match resp {
253 Frame::Response(Response::Status(s)) => s,
254 Frame::Response(Response::Error { kind, message }) => {
255 return Err(CliError::DaemonError(format!("{kind:?}: {message}")));
256 }
257 other => {
258 return Err(CliError::MalformedFrame(format!(
259 "expected Status, got {other:?}"
260 )));
261 }
262 };
263 match format {
264 Some(CliOutputFormat::Json) => print_status_json(&snap),
265 _ => print_status_table(&snap),
266 }
267 Ok(())
268}
269
270fn print_status_json(snap: &StatusSnapshot) {
271 let json = serde_json::json!({
272 "daemon_pid": snap.daemon_pid,
273 "daemon_started_at": snap.daemon_started_at,
274 "account": snap.account,
275 "active": snap.active.as_ref().map(record_to_json),
276 "queue": snap.queue.iter().map(record_to_json).collect::<Vec<_>>(),
277 "recent": snap.recent.iter().map(record_to_json).collect::<Vec<_>>(),
278 });
279 println!(
280 "{}",
281 serde_json::to_string_pretty(&json).expect("snapshot is JSON-clean")
282 );
283}
284
285fn record_to_json(r: &JobRecord) -> serde_json::Value {
286 serde_json::json!({
287 "job_id": r.job_id.0,
288 "kind": format!("{:?}", r.kind),
289 "args_summary": r.args_summary,
290 "priority": r.priority,
291 "submitted_at": r.submitted_at,
292 "started_at": r.started_at,
293 "finished_at": r.finished_at,
294 "exit_code": r.exit_code,
295 })
296}
297
298fn print_status_table(snap: &StatusSnapshot) {
299 println!("daemon pid : {}", snap.daemon_pid);
300 println!(
301 "account : {}",
302 snap.account.as_deref().unwrap_or("(none)")
303 );
304 println!();
305 match &snap.active {
306 Some(active) => {
307 println!("Active:");
308 println!(
309 " {} ({:?}) {}",
310 active.job_id, active.kind, active.args_summary
311 );
312 }
313 None => println!("Active: (idle)"),
314 }
315 if !snap.queue.is_empty() {
316 println!();
317 println!("Queue:");
318 for j in &snap.queue {
319 let mark = if j.priority { "*" } else { " " };
320 println!(" {} {} ({:?}) {}", mark, j.job_id, j.kind, j.args_summary);
321 }
322 }
323 if !snap.recent.is_empty() {
324 println!();
325 println!("Recent:");
326 for j in &snap.recent {
327 let ec = j.exit_code.map(|c| format!("exit {c}")).unwrap_or_default();
328 println!(" {} ({:?}) {} {}", j.job_id, j.kind, j.args_summary, ec);
329 }
330 }
331}
332
333async fn auto_spawn_daemon() -> Result<(), CliError> {
338 use std::process::Stdio;
339 let exe = std::env::current_exe().map_err(CliError::Io)?;
340 let status = tokio::process::Command::new(exe)
341 .args(["daemon", "start"])
342 .stdin(Stdio::null())
343 .stdout(Stdio::null())
348 .stderr(Stdio::null())
349 .status()
350 .await
351 .map_err(CliError::Io)?;
352 if !status.success() {
353 return Err(CliError::DaemonError(format!(
354 "auto-spawn `daemon start` exited with status {status}"
355 )));
356 }
357 Ok(())
358}