1use std::path::{Path, PathBuf};
4use std::process::Stdio;
5
6use anyhow::{Context, Result, anyhow};
7use cargo_metadata::diagnostic::DiagnosticLevel;
8use cargo_metadata::{CompilerMessage, Message};
9use time::OffsetDateTime;
10use tokio::io::{AsyncBufReadExt, BufReader};
11use tokio::process::Command;
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14use tokio::time::{Duration, sleep};
15use tokio_util::sync::CancellationToken;
16use tracing::{debug, info, warn};
17
18use cargowatch_core::{
19 ArtifactRecord, DetectedProcessClass, DiagnosticRecord, LogEntry, OutputStream, SessionEvent,
20 SessionFinished, SessionInfo, SessionMode, SessionStatus, SummaryCounts,
21 new_managed_session_id,
22};
23
/// Unbounded channel sender through which managed sessions publish their [`SessionEvent`]s.
pub type EventSender = mpsc::UnboundedSender<SessionEvent>;
26
/// Describes a command that should be launched and supervised as a managed session.
#[derive(Debug, Clone)]
pub struct ManagedRunRequest {
    /// Program followed by its arguments; spawning is rejected when this is empty.
    pub command: Vec<String>,
    /// Working directory the child process is started in.
    pub cwd: PathBuf,
    /// Workspace root copied into the session info, when known.
    pub workspace_root: Option<PathBuf>,
    /// Display title for the session; when `None`, the space-joined command is used instead.
    pub title: Option<String>,
}
39
40impl ManagedRunRequest {
41 pub fn new(command: Vec<String>, cwd: PathBuf, workspace_root: Option<PathBuf>) -> Self {
43 Self {
44 title: command.first().cloned(),
45 command,
46 cwd,
47 workspace_root,
48 }
49 }
50}
51
/// Handle to a spawned managed session: identifies it, can cancel it, and can await its result.
pub struct ManagedSessionHandle {
    /// Identifier assigned to the session when it was spawned.
    session_id: String,
    /// Token shared with the session task; cancelling it asks the task to stop the child.
    cancellation: CancellationToken,
    /// Background task driving the session; resolves to the final session record.
    task: JoinHandle<Result<SessionFinished>>,
}
58
59impl ManagedSessionHandle {
60 pub fn session_id(&self) -> &str {
62 &self.session_id
63 }
64
65 pub fn cancel(&self) {
67 self.cancellation.cancel();
68 }
69
70 pub async fn wait(self) -> Result<SessionFinished> {
72 self.task.await.context("managed session task failed")?
73 }
74}
75
/// One line of child output, tagged with the stream it was read from.
#[derive(Debug)]
struct RawLine {
    /// Stream (stdout or stderr of the child) the line came from.
    stream: OutputStream,
    /// Line content as delivered by the reader task.
    line: String,
}
81
82pub fn spawn_managed_session(
84 request: ManagedRunRequest,
85 sender: EventSender,
86) -> Result<ManagedSessionHandle> {
87 if request.command.is_empty() {
88 return Err(anyhow!("managed run requires a command after `--`"));
89 }
90
91 let session_id = new_managed_session_id();
92 let cancellation = CancellationToken::new();
93 let task = tokio::spawn(run_managed_session(
94 session_id.clone(),
95 request,
96 sender,
97 cancellation.clone(),
98 ));
99
100 Ok(ManagedSessionHandle {
101 session_id,
102 cancellation,
103 task,
104 })
105}
106
107async fn run_managed_session(
108 session_id: String,
109 request: ManagedRunRequest,
110 sender: EventSender,
111 cancellation: CancellationToken,
112) -> Result<SessionFinished> {
113 let mut summary = SummaryCounts::default();
114 let started_at = OffsetDateTime::now_utc();
115 let (command, parse_cargo_json) = prepare_command(&request.command);
116 let title = request
117 .title
118 .clone()
119 .unwrap_or_else(|| request.command.join(" "));
120 let classification = classify_command(&request.command);
121 let session_info = SessionInfo {
122 session_id: session_id.clone(),
123 mode: SessionMode::Managed,
124 title,
125 command: command.clone(),
126 cwd: request.cwd.clone(),
127 workspace_root: request.workspace_root.clone(),
128 started_at,
129 status: SessionStatus::Running,
130 external_pid: None,
131 classification,
132 };
133 let _ = sender.send(SessionEvent::SessionStarted(session_info));
134
135 info!(session_id, ?command, cwd = %request.cwd.display(), "starting managed session");
136
137 let program = command
138 .first()
139 .cloned()
140 .ok_or_else(|| anyhow!("missing program"))?;
141 let mut child = Command::new(program);
142 child
143 .args(command.iter().skip(1))
144 .current_dir(&request.cwd)
145 .stdout(Stdio::piped())
146 .stderr(Stdio::piped())
147 .stdin(Stdio::null())
148 .kill_on_drop(false);
149
150 let mut child = child.spawn().context("failed to spawn managed command")?;
151 let pid = child.id();
152 if let Some(pid) = pid {
153 debug!(session_id, pid, "managed session spawned");
154 }
155
156 let stdout = child.stdout.take().context("missing stdout pipe")?;
157 let stderr = child.stderr.take().context("missing stderr pipe")?;
158 let (line_tx, mut line_rx) = mpsc::unbounded_channel::<RawLine>();
159 let stdout_task = tokio::spawn(read_lines(stdout, OutputStream::Stdout, line_tx.clone()));
160 let stderr_task = tokio::spawn(read_lines(stderr, OutputStream::Stderr, line_tx));
161 let mut sequence = 0_u64;
162 let mut cancelled = false;
163 let mut streams_open = 2_u8;
164
165 loop {
166 tokio::select! {
167 _ = cancellation.cancelled(), if !cancelled => {
168 cancelled = true;
169 let cancel_line = LogEntry {
170 sequence,
171 timestamp: OffsetDateTime::now_utc(),
172 stream: OutputStream::System,
173 text: "Cancellation requested. Stopping managed process...".to_string(),
174 raw: None,
175 severity: Some(cargowatch_core::event::Severity::Warning),
176 };
177 summary.observe(cargowatch_core::event::Severity::Warning);
178 let _ = sender.send(SessionEvent::OutputLine { session_id: session_id.clone(), entry: cancel_line });
179 if let Err(error) = child.start_kill() {
180 warn!(session_id, %error, "failed to terminate managed process");
181 }
182 }
183 maybe_line = line_rx.recv(), if streams_open > 0 => {
184 match maybe_line {
185 Some(line) => {
186 sequence += 1;
187 process_line(
188 &session_id,
189 parse_cargo_json && line.stream == OutputStream::Stdout,
190 line,
191 sequence,
192 &sender,
193 &mut summary,
194 );
195 }
196 None => {
197 streams_open = 0;
198 }
199 }
200 }
201 _ = sleep(Duration::from_millis(50)) => {
202 if let Some(status) = child.try_wait().context("managed child wait failed")? {
203 let finished_at = OffsetDateTime::now_utc();
204 let duration_ms =
205 i64::try_from((finished_at - started_at).whole_milliseconds()).unwrap_or(i64::MAX);
206 let final_status = if cancelled {
207 SessionStatus::Cancelled
208 } else if status.success() {
209 SessionStatus::Succeeded
210 } else {
211 SessionStatus::Failed
212 };
213 let finish = SessionFinished {
214 session_id: session_id.clone(),
215 finished_at,
216 status: final_status,
217 exit_code: status.code(),
218 duration_ms,
219 summary,
220 };
221 let _ = sender.send(SessionEvent::SessionFinished(finish.clone()));
222 let _ = stdout_task.await;
223 let _ = stderr_task.await;
224 info!(session_id, ?final_status, exit_code = ?finish.exit_code, duration_ms, "managed session finished");
225 return Ok(finish);
226 }
227 }
228 }
229 }
230}
231
232fn prepare_command(command: &[String]) -> (Vec<String>, bool) {
233 if !is_cargo_command(command) {
234 return (command.to_vec(), false);
235 }
236
237 if command
238 .iter()
239 .any(|part| part.starts_with("--message-format"))
240 {
241 return (command.to_vec(), true);
242 }
243
244 let mut prepared = command.to_vec();
245 prepared.push("--message-format=json-diagnostic-rendered-ansi".to_string());
246 (prepared, true)
247}
248
/// Reports whether the program of `command` (its first element) is cargo,
/// judged by the file stem of the path so directories and extensions are ignored.
fn is_cargo_command(command: &[String]) -> bool {
    let Some(program) = command.first() else {
        return false;
    };
    let stem = Path::new(program).file_stem().and_then(|stem| stem.to_str());
    matches!(stem, Some("cargo"))
}
255
256async fn read_lines<R>(reader: R, stream: OutputStream, sender: mpsc::UnboundedSender<RawLine>)
257where
258 R: tokio::io::AsyncRead + Unpin,
259{
260 let mut lines = BufReader::new(reader).lines();
261 loop {
262 match lines.next_line().await {
263 Ok(Some(line)) => {
264 if sender.send(RawLine { stream, line }).is_err() {
265 break;
266 }
267 }
268 Ok(None) => break,
269 Err(error) => {
270 warn!(?stream, %error, "failed to read child output");
271 break;
272 }
273 }
274 }
275}
276
277fn process_line(
278 session_id: &str,
279 try_parse_cargo_json: bool,
280 raw_line: RawLine,
281 sequence: u64,
282 sender: &EventSender,
283 summary: &mut SummaryCounts,
284) {
285 if try_parse_cargo_json
286 && let Some(events) = parse_cargo_json_line(session_id, &raw_line.line, sequence)
287 {
288 for event in events {
289 if let SessionEvent::OutputLine { entry, .. } = &event
290 && let Some(severity) = entry.severity
291 {
292 summary.observe(severity);
293 }
294 if let SessionEvent::Diagnostic { diagnostic, .. } = &event {
295 summary.observe(diagnostic.severity);
296 }
297 let _ = sender.send(event);
298 }
299 return;
300 }
301
302 let severity = infer_severity(&raw_line.line, raw_line.stream);
303 if let Some(severity) = severity {
304 summary.observe(severity);
305 }
306 let entry = LogEntry {
307 sequence,
308 timestamp: OffsetDateTime::now_utc(),
309 stream: raw_line.stream,
310 text: raw_line.line,
311 raw: None,
312 severity,
313 };
314 let _ = sender.send(SessionEvent::OutputLine {
315 session_id: session_id.to_string(),
316 entry,
317 });
318}
319
320fn parse_cargo_json_line(session_id: &str, line: &str, sequence: u64) -> Option<Vec<SessionEvent>> {
321 let message = serde_json::from_str::<Message>(line).ok()?;
322 let timestamp = OffsetDateTime::now_utc();
323
324 match message {
325 Message::CompilerMessage(message) => Some(handle_compiler_message(
326 session_id, message, sequence, timestamp, line,
327 )),
328 Message::CompilerArtifact(artifact) => {
329 let target_name = Some(artifact.target.name.clone());
330 let package_id = Some(artifact.package_id.to_string());
331 let text = format!(
332 "{} {}",
333 if artifact.fresh { "fresh" } else { "built" },
334 artifact.target.name
335 );
336 Some(vec![
337 SessionEvent::ArtifactBuilt {
338 session_id: session_id.to_string(),
339 artifact: ArtifactRecord {
340 sequence,
341 timestamp,
342 package_id,
343 target: target_name.clone(),
344 filenames: artifact.filenames.into_iter().map(Into::into).collect(),
345 executable: artifact.executable.map(Into::into),
346 fresh: artifact.fresh,
347 },
348 },
349 SessionEvent::OutputLine {
350 session_id: session_id.to_string(),
351 entry: LogEntry {
352 sequence,
353 timestamp,
354 stream: OutputStream::Stdout,
355 text,
356 raw: Some(line.to_string()),
357 severity: Some(cargowatch_core::event::Severity::Info),
358 },
359 },
360 ])
361 }
362 Message::BuildScriptExecuted(script) => Some(vec![SessionEvent::OutputLine {
363 session_id: session_id.to_string(),
364 entry: LogEntry {
365 sequence,
366 timestamp,
367 stream: OutputStream::Stdout,
368 text: format!("build script executed for {}", script.package_id),
369 raw: Some(line.to_string()),
370 severity: Some(cargowatch_core::event::Severity::Info),
371 },
372 }]),
373 Message::BuildFinished(finished) => Some(vec![SessionEvent::OutputLine {
374 session_id: session_id.to_string(),
375 entry: LogEntry {
376 sequence,
377 timestamp,
378 stream: OutputStream::System,
379 text: if finished.success {
380 "Build finished successfully".to_string()
381 } else {
382 "Build finished with failures".to_string()
383 },
384 raw: Some(line.to_string()),
385 severity: Some(if finished.success {
386 cargowatch_core::event::Severity::Success
387 } else {
388 cargowatch_core::event::Severity::Error
389 }),
390 },
391 }]),
392 Message::TextLine(text) => Some(vec![SessionEvent::OutputLine {
393 session_id: session_id.to_string(),
394 entry: LogEntry {
395 sequence,
396 timestamp,
397 stream: OutputStream::Stdout,
398 text,
399 raw: Some(line.to_string()),
400 severity: Some(cargowatch_core::event::Severity::Info),
401 },
402 }]),
403 _ => None,
404 }
405}
406
407fn handle_compiler_message(
408 session_id: &str,
409 message: CompilerMessage,
410 sequence: u64,
411 timestamp: OffsetDateTime,
412 raw: &str,
413) -> Vec<SessionEvent> {
414 let severity = match message.message.level {
415 DiagnosticLevel::Ice | DiagnosticLevel::Error | DiagnosticLevel::FailureNote => {
416 cargowatch_core::event::Severity::Error
417 }
418 DiagnosticLevel::Warning => cargowatch_core::event::Severity::Warning,
419 DiagnosticLevel::Note => cargowatch_core::event::Severity::Note,
420 DiagnosticLevel::Help => cargowatch_core::event::Severity::Help,
421 _ => cargowatch_core::event::Severity::Info,
422 };
423 let primary_span = message.message.spans.iter().find(|span| span.is_primary);
424 let rendered = message
425 .message
426 .rendered
427 .clone()
428 .unwrap_or_else(|| message.message.message.clone());
429 let diagnostic = DiagnosticRecord {
430 id: format!(
431 "{}:{}:{}",
432 message.package_id,
433 message
434 .message
435 .code
436 .as_ref()
437 .map(|code| code.code.as_str())
438 .unwrap_or("diagnostic"),
439 sequence
440 ),
441 timestamp,
442 severity,
443 message: message.message.message.clone(),
444 rendered: Some(rendered.clone()),
445 code: message.message.code.as_ref().map(|code| code.code.clone()),
446 file: primary_span.map(|span| PathBuf::from(&span.file_name)),
447 line: primary_span.and_then(|span| u32::try_from(span.line_start).ok()),
448 column: primary_span.and_then(|span| u32::try_from(span.column_start).ok()),
449 target: Some(message.target.name.clone()),
450 package_id: Some(message.package_id.to_string()),
451 };
452 vec![
453 SessionEvent::Diagnostic {
454 session_id: session_id.to_string(),
455 diagnostic,
456 },
457 SessionEvent::OutputLine {
458 session_id: session_id.to_string(),
459 entry: LogEntry {
460 sequence,
461 timestamp,
462 stream: OutputStream::Stdout,
463 text: rendered,
464 raw: Some(raw.to_string()),
465 severity: Some(severity),
466 },
467 },
468 ]
469}
470
471fn infer_severity(line: &str, stream: OutputStream) -> Option<cargowatch_core::event::Severity> {
472 let lower = line.to_ascii_lowercase();
473 if lower.contains("error[") || lower.starts_with("error:") {
474 return Some(cargowatch_core::event::Severity::Error);
475 }
476 if lower.starts_with("warning:") {
477 return Some(cargowatch_core::event::Severity::Warning);
478 }
479 if lower.starts_with("note:") {
480 return Some(cargowatch_core::event::Severity::Note);
481 }
482 if lower.starts_with("help:") {
483 return Some(cargowatch_core::event::Severity::Help);
484 }
485 match stream {
486 OutputStream::Stdout | OutputStream::Stderr | OutputStream::System => {
487 Some(cargowatch_core::event::Severity::Info)
488 }
489 }
490}
491
492fn classify_command(command: &[String]) -> Option<DetectedProcessClass> {
493 if command.is_empty() {
494 return None;
495 }
496 let first = Path::new(&command[0])
497 .file_stem()
498 .and_then(|stem| stem.to_str())
499 .unwrap_or_default();
500 match first {
501 "cargo" => {
502 let subcommand = command.iter().skip(1).find(|part| !part.starts_with('-'))?;
503 match subcommand.as_str() {
504 "build" => Some(DetectedProcessClass::CargoBuild),
505 "check" => Some(DetectedProcessClass::CargoCheck),
506 "test" => Some(DetectedProcessClass::CargoTest),
507 "clippy" => Some(DetectedProcessClass::CargoClippy),
508 "doc" => Some(DetectedProcessClass::CargoDoc),
509 _ => Some(DetectedProcessClass::UnknownRustProcess),
510 }
511 }
512 "rustc" => Some(DetectedProcessClass::RustcCompile),
513 "rustdoc" => Some(DetectedProcessClass::Rustdoc),
514 "clippy-driver" => Some(DetectedProcessClass::CargoClippy),
515 _ => Some(DetectedProcessClass::UnknownRustProcess),
516 }
517}
518
#[cfg(test)]
mod tests {
    use std::fs;

    use proptest::prelude::*;

    use super::*;

    /// A recorded cargo compiler-message line must yield exactly two events:
    /// the diagnostic first, then its log line.
    #[test]
    fn cargo_json_fixture_produces_diagnostic_and_log() {
        let fixture = fs::read_to_string("tests/fixtures/compiler-message.json")
            .expect("fixture should load");
        let events = parse_cargo_json_line("session-1", &fixture, 7).expect("json message");

        assert_eq!(events.len(), 2);
        assert!(matches!(events[0], SessionEvent::Diagnostic { .. }));
        assert!(matches!(events[1], SessionEvent::OutputLine { .. }));
    }

    proptest! {
        // Property: input that is not valid JSON at all is never accepted as a cargo
        // message. Inputs that happen to be valid JSON are discarded by `prop_assume!`.
        #[test]
        fn non_json_lines_are_never_parsed_as_cargo_messages(input in "\\PC*") {
            prop_assume!(serde_json::from_str::<serde_json::Value>(&input).is_err());
            prop_assert!(parse_cargo_json_line("session", &input, 1).is_none());
        }
    }
}