Skip to main content

cargowatch_runner/
lib.rs

1//! Managed build execution for CargoWatch.
2
3use std::path::{Path, PathBuf};
4use std::process::Stdio;
5
6use anyhow::{Context, Result, anyhow};
7use cargo_metadata::diagnostic::DiagnosticLevel;
8use cargo_metadata::{CompilerMessage, Message};
9use time::OffsetDateTime;
10use tokio::io::{AsyncBufReadExt, BufReader};
11use tokio::process::Command;
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14use tokio::time::{Duration, sleep};
15use tokio_util::sync::CancellationToken;
16use tracing::{debug, info, warn};
17
18use cargowatch_core::{
19    ArtifactRecord, DetectedProcessClass, DiagnosticRecord, LogEntry, OutputStream, SessionEvent,
20    SessionFinished, SessionInfo, SessionMode, SessionStatus, SummaryCounts,
21    new_managed_session_id,
22};
23
24/// Channel type used to publish live runtime events.
25pub type EventSender = mpsc::UnboundedSender<SessionEvent>;
26
27/// Description of a managed command execution request.
28#[derive(Debug, Clone)]
29pub struct ManagedRunRequest {
30    /// Command line to launch.
31    pub command: Vec<String>,
32    /// Working directory used for the subprocess.
33    pub cwd: PathBuf,
34    /// Best-effort workspace root used for session metadata.
35    pub workspace_root: Option<PathBuf>,
36    /// Optional display title override.
37    pub title: Option<String>,
38}
39
40impl ManagedRunRequest {
41    /// Build a new run request.
42    pub fn new(command: Vec<String>, cwd: PathBuf, workspace_root: Option<PathBuf>) -> Self {
43        Self {
44            title: command.first().cloned(),
45            command,
46            cwd,
47            workspace_root,
48        }
49    }
50}
51
52/// A handle to a running managed session.
53pub struct ManagedSessionHandle {
54    session_id: String,
55    cancellation: CancellationToken,
56    task: JoinHandle<Result<SessionFinished>>,
57}
58
59impl ManagedSessionHandle {
60    /// Return the session identifier.
61    pub fn session_id(&self) -> &str {
62        &self.session_id
63    }
64
65    /// Request cancellation of the underlying subprocess.
66    pub fn cancel(&self) {
67        self.cancellation.cancel();
68    }
69
70    /// Wait for the session to finish.
71    pub async fn wait(self) -> Result<SessionFinished> {
72        self.task.await.context("managed session task failed")?
73    }
74}
75
76#[derive(Debug)]
77struct RawLine {
78    stream: OutputStream,
79    line: String,
80}
81
82/// Spawn a managed cargo or rust-related command.
83pub fn spawn_managed_session(
84    request: ManagedRunRequest,
85    sender: EventSender,
86) -> Result<ManagedSessionHandle> {
87    if request.command.is_empty() {
88        return Err(anyhow!("managed run requires a command after `--`"));
89    }
90
91    let session_id = new_managed_session_id();
92    let cancellation = CancellationToken::new();
93    let task = tokio::spawn(run_managed_session(
94        session_id.clone(),
95        request,
96        sender,
97        cancellation.clone(),
98    ));
99
100    Ok(ManagedSessionHandle {
101        session_id,
102        cancellation,
103        task,
104    })
105}
106
107async fn run_managed_session(
108    session_id: String,
109    request: ManagedRunRequest,
110    sender: EventSender,
111    cancellation: CancellationToken,
112) -> Result<SessionFinished> {
113    let mut summary = SummaryCounts::default();
114    let started_at = OffsetDateTime::now_utc();
115    let (command, parse_cargo_json) = prepare_command(&request.command);
116    let title = request
117        .title
118        .clone()
119        .unwrap_or_else(|| request.command.join(" "));
120    let classification = classify_command(&request.command);
121    let session_info = SessionInfo {
122        session_id: session_id.clone(),
123        mode: SessionMode::Managed,
124        title,
125        command: command.clone(),
126        cwd: request.cwd.clone(),
127        workspace_root: request.workspace_root.clone(),
128        started_at,
129        status: SessionStatus::Running,
130        external_pid: None,
131        classification,
132    };
133    let _ = sender.send(SessionEvent::SessionStarted(session_info));
134
135    info!(session_id, ?command, cwd = %request.cwd.display(), "starting managed session");
136
137    let program = command
138        .first()
139        .cloned()
140        .ok_or_else(|| anyhow!("missing program"))?;
141    let mut child = Command::new(program);
142    child
143        .args(command.iter().skip(1))
144        .current_dir(&request.cwd)
145        .stdout(Stdio::piped())
146        .stderr(Stdio::piped())
147        .stdin(Stdio::null())
148        .kill_on_drop(false);
149
150    let mut child = child.spawn().context("failed to spawn managed command")?;
151    let pid = child.id();
152    if let Some(pid) = pid {
153        debug!(session_id, pid, "managed session spawned");
154    }
155
156    let stdout = child.stdout.take().context("missing stdout pipe")?;
157    let stderr = child.stderr.take().context("missing stderr pipe")?;
158    let (line_tx, mut line_rx) = mpsc::unbounded_channel::<RawLine>();
159    let stdout_task = tokio::spawn(read_lines(stdout, OutputStream::Stdout, line_tx.clone()));
160    let stderr_task = tokio::spawn(read_lines(stderr, OutputStream::Stderr, line_tx));
161    let mut sequence = 0_u64;
162    let mut cancelled = false;
163    let mut streams_open = 2_u8;
164
165    loop {
166        tokio::select! {
167            _ = cancellation.cancelled(), if !cancelled => {
168                cancelled = true;
169                let cancel_line = LogEntry {
170                    sequence,
171                    timestamp: OffsetDateTime::now_utc(),
172                    stream: OutputStream::System,
173                    text: "Cancellation requested. Stopping managed process...".to_string(),
174                    raw: None,
175                    severity: Some(cargowatch_core::event::Severity::Warning),
176                };
177                summary.observe(cargowatch_core::event::Severity::Warning);
178                let _ = sender.send(SessionEvent::OutputLine { session_id: session_id.clone(), entry: cancel_line });
179                if let Err(error) = child.start_kill() {
180                    warn!(session_id, %error, "failed to terminate managed process");
181                }
182            }
183            maybe_line = line_rx.recv(), if streams_open > 0 => {
184                match maybe_line {
185                    Some(line) => {
186                        sequence += 1;
187                        process_line(
188                            &session_id,
189                            parse_cargo_json && line.stream == OutputStream::Stdout,
190                            line,
191                            sequence,
192                            &sender,
193                            &mut summary,
194                        );
195                    }
196                    None => {
197                        streams_open = 0;
198                    }
199                }
200            }
201            _ = sleep(Duration::from_millis(50)) => {
202                if let Some(status) = child.try_wait().context("managed child wait failed")? {
203                let finished_at = OffsetDateTime::now_utc();
204                let duration_ms =
205                    i64::try_from((finished_at - started_at).whole_milliseconds()).unwrap_or(i64::MAX);
206                let final_status = if cancelled {
207                    SessionStatus::Cancelled
208                } else if status.success() {
209                    SessionStatus::Succeeded
210                } else {
211                    SessionStatus::Failed
212                };
213                let finish = SessionFinished {
214                    session_id: session_id.clone(),
215                    finished_at,
216                    status: final_status,
217                    exit_code: status.code(),
218                    duration_ms,
219                    summary,
220                };
221                let _ = sender.send(SessionEvent::SessionFinished(finish.clone()));
222                let _ = stdout_task.await;
223                let _ = stderr_task.await;
224                info!(session_id, ?final_status, exit_code = ?finish.exit_code, duration_ms, "managed session finished");
225                return Ok(finish);
226                }
227            }
228        }
229    }
230}
231
232fn prepare_command(command: &[String]) -> (Vec<String>, bool) {
233    if !is_cargo_command(command) {
234        return (command.to_vec(), false);
235    }
236
237    if command
238        .iter()
239        .any(|part| part.starts_with("--message-format"))
240    {
241        return (command.to_vec(), true);
242    }
243
244    let mut prepared = command.to_vec();
245    prepared.push("--message-format=json-diagnostic-rendered-ansi".to_string());
246    (prepared, true)
247}
248
249fn is_cargo_command(command: &[String]) -> bool {
250    command
251        .first()
252        .map(|part| Path::new(part).file_stem().and_then(|stem| stem.to_str()) == Some("cargo"))
253        .unwrap_or(false)
254}
255
256async fn read_lines<R>(reader: R, stream: OutputStream, sender: mpsc::UnboundedSender<RawLine>)
257where
258    R: tokio::io::AsyncRead + Unpin,
259{
260    let mut lines = BufReader::new(reader).lines();
261    loop {
262        match lines.next_line().await {
263            Ok(Some(line)) => {
264                if sender.send(RawLine { stream, line }).is_err() {
265                    break;
266                }
267            }
268            Ok(None) => break,
269            Err(error) => {
270                warn!(?stream, %error, "failed to read child output");
271                break;
272            }
273        }
274    }
275}
276
277fn process_line(
278    session_id: &str,
279    try_parse_cargo_json: bool,
280    raw_line: RawLine,
281    sequence: u64,
282    sender: &EventSender,
283    summary: &mut SummaryCounts,
284) {
285    if try_parse_cargo_json
286        && let Some(events) = parse_cargo_json_line(session_id, &raw_line.line, sequence)
287    {
288        for event in events {
289            if let SessionEvent::OutputLine { entry, .. } = &event
290                && let Some(severity) = entry.severity
291            {
292                summary.observe(severity);
293            }
294            if let SessionEvent::Diagnostic { diagnostic, .. } = &event {
295                summary.observe(diagnostic.severity);
296            }
297            let _ = sender.send(event);
298        }
299        return;
300    }
301
302    let severity = infer_severity(&raw_line.line, raw_line.stream);
303    if let Some(severity) = severity {
304        summary.observe(severity);
305    }
306    let entry = LogEntry {
307        sequence,
308        timestamp: OffsetDateTime::now_utc(),
309        stream: raw_line.stream,
310        text: raw_line.line,
311        raw: None,
312        severity,
313    };
314    let _ = sender.send(SessionEvent::OutputLine {
315        session_id: session_id.to_string(),
316        entry,
317    });
318}
319
320fn parse_cargo_json_line(session_id: &str, line: &str, sequence: u64) -> Option<Vec<SessionEvent>> {
321    let message = serde_json::from_str::<Message>(line).ok()?;
322    let timestamp = OffsetDateTime::now_utc();
323
324    match message {
325        Message::CompilerMessage(message) => Some(handle_compiler_message(
326            session_id, message, sequence, timestamp, line,
327        )),
328        Message::CompilerArtifact(artifact) => {
329            let target_name = Some(artifact.target.name.clone());
330            let package_id = Some(artifact.package_id.to_string());
331            let text = format!(
332                "{} {}",
333                if artifact.fresh { "fresh" } else { "built" },
334                artifact.target.name
335            );
336            Some(vec![
337                SessionEvent::ArtifactBuilt {
338                    session_id: session_id.to_string(),
339                    artifact: ArtifactRecord {
340                        sequence,
341                        timestamp,
342                        package_id,
343                        target: target_name.clone(),
344                        filenames: artifact.filenames.into_iter().map(Into::into).collect(),
345                        executable: artifact.executable.map(Into::into),
346                        fresh: artifact.fresh,
347                    },
348                },
349                SessionEvent::OutputLine {
350                    session_id: session_id.to_string(),
351                    entry: LogEntry {
352                        sequence,
353                        timestamp,
354                        stream: OutputStream::Stdout,
355                        text,
356                        raw: Some(line.to_string()),
357                        severity: Some(cargowatch_core::event::Severity::Info),
358                    },
359                },
360            ])
361        }
362        Message::BuildScriptExecuted(script) => Some(vec![SessionEvent::OutputLine {
363            session_id: session_id.to_string(),
364            entry: LogEntry {
365                sequence,
366                timestamp,
367                stream: OutputStream::Stdout,
368                text: format!("build script executed for {}", script.package_id),
369                raw: Some(line.to_string()),
370                severity: Some(cargowatch_core::event::Severity::Info),
371            },
372        }]),
373        Message::BuildFinished(finished) => Some(vec![SessionEvent::OutputLine {
374            session_id: session_id.to_string(),
375            entry: LogEntry {
376                sequence,
377                timestamp,
378                stream: OutputStream::System,
379                text: if finished.success {
380                    "Build finished successfully".to_string()
381                } else {
382                    "Build finished with failures".to_string()
383                },
384                raw: Some(line.to_string()),
385                severity: Some(if finished.success {
386                    cargowatch_core::event::Severity::Success
387                } else {
388                    cargowatch_core::event::Severity::Error
389                }),
390            },
391        }]),
392        Message::TextLine(text) => Some(vec![SessionEvent::OutputLine {
393            session_id: session_id.to_string(),
394            entry: LogEntry {
395                sequence,
396                timestamp,
397                stream: OutputStream::Stdout,
398                text,
399                raw: Some(line.to_string()),
400                severity: Some(cargowatch_core::event::Severity::Info),
401            },
402        }]),
403        _ => None,
404    }
405}
406
407fn handle_compiler_message(
408    session_id: &str,
409    message: CompilerMessage,
410    sequence: u64,
411    timestamp: OffsetDateTime,
412    raw: &str,
413) -> Vec<SessionEvent> {
414    let severity = match message.message.level {
415        DiagnosticLevel::Ice | DiagnosticLevel::Error | DiagnosticLevel::FailureNote => {
416            cargowatch_core::event::Severity::Error
417        }
418        DiagnosticLevel::Warning => cargowatch_core::event::Severity::Warning,
419        DiagnosticLevel::Note => cargowatch_core::event::Severity::Note,
420        DiagnosticLevel::Help => cargowatch_core::event::Severity::Help,
421        _ => cargowatch_core::event::Severity::Info,
422    };
423    let primary_span = message.message.spans.iter().find(|span| span.is_primary);
424    let rendered = message
425        .message
426        .rendered
427        .clone()
428        .unwrap_or_else(|| message.message.message.clone());
429    let diagnostic = DiagnosticRecord {
430        id: format!(
431            "{}:{}:{}",
432            message.package_id,
433            message
434                .message
435                .code
436                .as_ref()
437                .map(|code| code.code.as_str())
438                .unwrap_or("diagnostic"),
439            sequence
440        ),
441        timestamp,
442        severity,
443        message: message.message.message.clone(),
444        rendered: Some(rendered.clone()),
445        code: message.message.code.as_ref().map(|code| code.code.clone()),
446        file: primary_span.map(|span| PathBuf::from(&span.file_name)),
447        line: primary_span.and_then(|span| u32::try_from(span.line_start).ok()),
448        column: primary_span.and_then(|span| u32::try_from(span.column_start).ok()),
449        target: Some(message.target.name.clone()),
450        package_id: Some(message.package_id.to_string()),
451    };
452    vec![
453        SessionEvent::Diagnostic {
454            session_id: session_id.to_string(),
455            diagnostic,
456        },
457        SessionEvent::OutputLine {
458            session_id: session_id.to_string(),
459            entry: LogEntry {
460                sequence,
461                timestamp,
462                stream: OutputStream::Stdout,
463                text: rendered,
464                raw: Some(raw.to_string()),
465                severity: Some(severity),
466            },
467        },
468    ]
469}
470
471fn infer_severity(line: &str, stream: OutputStream) -> Option<cargowatch_core::event::Severity> {
472    let lower = line.to_ascii_lowercase();
473    if lower.contains("error[") || lower.starts_with("error:") {
474        return Some(cargowatch_core::event::Severity::Error);
475    }
476    if lower.starts_with("warning:") {
477        return Some(cargowatch_core::event::Severity::Warning);
478    }
479    if lower.starts_with("note:") {
480        return Some(cargowatch_core::event::Severity::Note);
481    }
482    if lower.starts_with("help:") {
483        return Some(cargowatch_core::event::Severity::Help);
484    }
485    match stream {
486        OutputStream::Stdout | OutputStream::Stderr | OutputStream::System => {
487            Some(cargowatch_core::event::Severity::Info)
488        }
489    }
490}
491
492fn classify_command(command: &[String]) -> Option<DetectedProcessClass> {
493    if command.is_empty() {
494        return None;
495    }
496    let first = Path::new(&command[0])
497        .file_stem()
498        .and_then(|stem| stem.to_str())
499        .unwrap_or_default();
500    match first {
501        "cargo" => {
502            let subcommand = command.iter().skip(1).find(|part| !part.starts_with('-'))?;
503            match subcommand.as_str() {
504                "build" => Some(DetectedProcessClass::CargoBuild),
505                "check" => Some(DetectedProcessClass::CargoCheck),
506                "test" => Some(DetectedProcessClass::CargoTest),
507                "clippy" => Some(DetectedProcessClass::CargoClippy),
508                "doc" => Some(DetectedProcessClass::CargoDoc),
509                _ => Some(DetectedProcessClass::UnknownRustProcess),
510            }
511        }
512        "rustc" => Some(DetectedProcessClass::RustcCompile),
513        "rustdoc" => Some(DetectedProcessClass::Rustdoc),
514        "clippy-driver" => Some(DetectedProcessClass::CargoClippy),
515        _ => Some(DetectedProcessClass::UnknownRustProcess),
516    }
517}
518
519#[cfg(test)]
520mod tests {
521    use std::fs;
522
523    use proptest::prelude::*;
524
525    use super::*;
526
527    #[test]
528    fn cargo_json_fixture_produces_diagnostic_and_log() {
529        let fixture = fs::read_to_string("tests/fixtures/compiler-message.json")
530            .expect("fixture should load");
531        let events = parse_cargo_json_line("session-1", &fixture, 7).expect("json message");
532
533        assert_eq!(events.len(), 2);
534        assert!(matches!(events[0], SessionEvent::Diagnostic { .. }));
535        assert!(matches!(events[1], SessionEvent::OutputLine { .. }));
536    }
537
538    proptest! {
539        #[test]
540        fn non_json_lines_are_never_parsed_as_cargo_messages(input in "\\PC*") {
541            prop_assume!(serde_json::from_str::<serde_json::Value>(&input).is_err());
542            prop_assert!(parse_cargo_json_line("session", &input, 1).is_none());
543        }
544    }
545}