1use std::collections::HashMap;
28use std::sync::Arc;
29
30use tokio::io::AsyncReadExt as _;
31use tokio::process::Command;
32use tokio::sync::broadcast;
33use tokio::task::JoinHandle;
34use tokio_util::sync::CancellationToken;
35
36use crate::diagnostics_extractor::DiagnosticsExtractor;
37use crate::log_matcher::{MatcherEngine, MatcherRegistry};
38use crate::log_store::LogStore;
39use crate::operation::Operation;
40use crate::task_registry::{TaskId, TaskStatus};
41use crate::vt_parser::{StyledLine, TerminalParser};
42
/// Identifies which of a child process's output pipes a piece of output
/// was read from.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputStream {
    /// The child's standard output pipe.
    Stdout,
    /// The child's standard error pipe.
    Stderr,
}
53
54#[derive(Clone, Debug)]
59pub struct TaskOutputEvent {
60 pub task_id: TaskId,
61 pub stream: OutputStream,
62 pub chunk: Vec<u8>,
64}
65
66#[derive(Clone, Debug)]
69pub struct ParsedOutputEvent {
70 pub task_id: TaskId,
71 pub stream: OutputStream,
72 pub lines: Vec<StyledLine>,
74}
75
76#[derive(Clone, Debug)]
78pub enum TaskEvent {
79 Started(TaskId),
81 ParsedOutput(ParsedOutputEvent),
83 Finished(TaskId, TaskStatus),
85}
86
87pub struct TaskExecutor {
97 event_tx: broadcast::Sender<TaskEvent>,
98 handles: HashMap<TaskId, JoinHandle<()>>,
100 matcher_registry: std::sync::Arc<MatcherRegistry>,
102}
103
104impl TaskExecutor {
105 const BROADCAST_CAPACITY: usize = 256;
108
109 pub fn new() -> Self {
110 let (event_tx, _) = broadcast::channel(Self::BROADCAST_CAPACITY);
111 Self {
112 event_tx,
113 handles: HashMap::new(),
114 matcher_registry: std::sync::Arc::new(MatcherRegistry::new()),
115 }
116 }
117
118 pub fn with_matcher_registry(matcher_registry: std::sync::Arc<MatcherRegistry>) -> Self {
122 let (event_tx, _) = broadcast::channel(Self::BROADCAST_CAPACITY);
123 Self {
124 event_tx,
125 handles: HashMap::new(),
126 matcher_registry,
127 }
128 }
129
130 pub fn subscribe(&self) -> broadcast::Receiver<TaskEvent> {
135 self.event_tx.subscribe()
136 }
137
138 #[allow(clippy::too_many_arguments)]
163 pub fn spawn(
164 &mut self,
165 task_id: TaskId,
166 command: String,
167 cancellation_token: CancellationToken,
168 marker: String,
169 log_store: Option<LogStore>,
170 op_tx: &tokio::sync::mpsc::UnboundedSender<Vec<Operation>>,
171 ) {
172 if let Some(old) = self.handles.remove(&task_id) {
175 old.abort();
176 }
177
178 let event_tx = self.event_tx.clone();
179 let op_tx = op_tx.clone();
180
181 let log_rx = log_store.as_ref().map(|_| event_tx.subscribe());
185
186 let source = marker
189 .strip_prefix("task:")
190 .and_then(|s| s.split(':').next())
191 .unwrap_or("task")
192 .to_string();
193 let extractor = Arc::new(DiagnosticsExtractor::new(marker.clone(), source));
194
195 let matchers = self.matcher_registry.iter_active();
199 let stdout_engine = if matchers.is_empty() {
200 None
201 } else {
202 Some(MatcherEngine::new(matchers.clone(), marker.clone()))
203 };
204 let stderr_engine = if matchers.is_empty() {
205 None
206 } else {
207 Some(MatcherEngine::new(matchers, marker))
208 };
209
210 let handle = tokio::spawn(async move {
211 if let (Some(ls), Some(mut rx)) = (log_store, log_rx) {
214 tokio::spawn(async move {
215 let mut writer = match ls.open_writer(task_id).await {
216 Ok(w) => w,
217 Err(e) => {
218 log::warn!(
219 "log_store: failed to open writer for task {}: {e}",
220 task_id.0
221 );
222 return;
223 }
224 };
225
226 let mut finished_normally = false;
227 loop {
228 match rx.recv().await {
229 Ok(TaskEvent::ParsedOutput(ev)) if ev.task_id == task_id => {
230 for line in &ev.lines {
231 if let Err(e) = writer.append_line(&line.text).await {
232 log::warn!("log_store: write error: {e}");
233 }
234 }
235 }
236 Ok(TaskEvent::Finished(id, _)) if id == task_id => {
237 finished_normally = true;
238 break;
239 }
240 Err(broadcast::error::RecvError::Closed) => break,
241 Err(broadcast::error::RecvError::Lagged(n)) => {
242 log::warn!("log_store: receiver lagged by {n} messages");
243 }
244 _ => {}
245 }
246 }
247
248 if finished_normally {
249 if let Err(e) = writer.close_and_compress(&ls).await {
250 log::warn!(
251 "log_store: compress error for task {}: {e}",
252 task_id.0
253 );
254 let _ = ls.log_path(task_id); }
257 } else {
258 if let Err(e) = writer.close().await {
260 log::warn!(
261 "log_store: close error for task {}: {e}",
262 task_id.0
263 );
264 }
265 }
266 });
267 }
268
269 let _ = event_tx.send(TaskEvent::Started(task_id));
271 let _ = op_tx.send(vec![Operation::TaskStarted(task_id)]);
272
273 let child_result = build_command(&command)
275 .stdout(std::process::Stdio::piped())
276 .stderr(std::process::Stdio::piped())
277 .kill_on_drop(true)
280 .spawn();
281
282 let mut child = match child_result {
283 Ok(c) => c,
284 Err(_) => {
285 let _ = event_tx.send(TaskEvent::Finished(task_id, TaskStatus::Error));
286 let _ = op_tx.send(vec![Operation::TaskFinished {
287 id: task_id,
288 status: TaskStatus::Error,
289 }]);
290 return;
291 }
292 };
293
294 let stdout = child.stdout.take().expect("stdout was piped");
296 let stderr = child.stderr.take().expect("stderr was piped");
297
298 let stdout_tx = event_tx.clone();
301 let stdout_op_tx = op_tx.clone();
302 let stdout_extractor = Arc::clone(&extractor);
303 let stdout_task = tokio::spawn(async move {
304 stream_output_parsed(
305 task_id,
306 OutputStream::Stdout,
307 stdout,
308 TerminalParser::new(),
309 &stdout_extractor,
310 stdout_engine,
311 &stdout_tx,
312 &stdout_op_tx,
313 )
314 .await;
315 });
316
317 let stderr_tx = event_tx.clone();
318 let stderr_op_tx = op_tx.clone();
319 let stderr_extractor = Arc::clone(&extractor);
320 let stderr_task = tokio::spawn(async move {
321 stream_output_parsed(
322 task_id,
323 OutputStream::Stderr,
324 stderr,
325 TerminalParser::new(),
326 &stderr_extractor,
327 stderr_engine,
328 &stderr_tx,
329 &stderr_op_tx,
330 )
331 .await;
332 });
333
334 let final_status = tokio::select! {
336 _ = cancellation_token.cancelled() => {
337 let _ = child.kill().await;
342 stdout_task.abort();
343 stderr_task.abort();
344 TaskStatus::Cancelled
345 }
346 result = child.wait() => {
347 let _ = tokio::join!(stdout_task, stderr_task);
349 match result {
350 Ok(exit) if exit.success() => TaskStatus::Success,
351 _ => TaskStatus::Error,
352 }
353 }
354 };
355
356 let _ = event_tx.send(TaskEvent::Finished(task_id, final_status.clone()));
357 let _ = op_tx.send(vec![Operation::TaskFinished {
358 id: task_id,
359 status: final_status,
360 }]);
361 });
362
363 self.handles.insert(task_id, handle);
364 }
365
366 pub fn abort(&mut self, task_id: TaskId) {
371 if let Some(handle) = self.handles.remove(&task_id) {
372 handle.abort();
373 }
374 }
375}
376
377impl Default for TaskExecutor {
378 fn default() -> Self {
379 Self::new()
380 }
381}
382
383#[allow(clippy::too_many_arguments)]
405async fn stream_output_parsed(
406 task_id: TaskId,
407 stream: OutputStream,
408 mut reader: impl tokio::io::AsyncRead + Unpin,
409 mut parser: TerminalParser,
410 extractor: &DiagnosticsExtractor,
411 mut engine: Option<MatcherEngine>,
412 tx: &broadcast::Sender<TaskEvent>,
413 op_tx: &tokio::sync::mpsc::UnboundedSender<Vec<Operation>>,
414) {
415 let mut buf = vec![0u8; 4096];
416 loop {
417 match reader.read(&mut buf).await {
418 Ok(0) | Err(_) => break,
419 Ok(n) => {
420 let lines = parser.push(&buf[..n]);
421 broadcast_and_extract(task_id, &stream, &lines, extractor, engine.as_mut(), tx, op_tx);
422 }
423 }
424 }
425 if let Some(line) = parser.flush() {
427 broadcast_and_extract(task_id, &stream, &[line], extractor, engine.as_mut(), tx, op_tx);
428 }
429 if let Some(eng) = engine.as_mut() {
431 let issues = eng.flush();
432 send_issues(issues, op_tx);
433 }
434}
435
436fn broadcast_and_extract(
439 task_id: TaskId,
440 stream: &OutputStream,
441 lines: &[StyledLine],
442 extractor: &DiagnosticsExtractor,
443 engine: Option<&mut MatcherEngine>,
444 tx: &broadcast::Sender<TaskEvent>,
445 op_tx: &tokio::sync::mpsc::UnboundedSender<Vec<Operation>>,
446) {
447 if lines.is_empty() {
448 return;
449 }
450
451 let _ = tx.send(TaskEvent::ParsedOutput(ParsedOutputEvent {
453 task_id,
454 stream: stream.clone(),
455 lines: lines.to_vec(),
456 }));
457
458 for line in lines {
460 let issues = extractor.extract_from_line(line);
461 if !issues.is_empty() {
462 let ops: Vec<Operation> =
463 issues.into_iter().map(|i| Operation::AddIssue { issue: i }).collect();
464 let _ = op_tx.send(ops);
465 }
466 }
467
468 if let Some(eng) = engine {
470 for line in lines {
471 let issues = eng.process_line(&line.text);
472 send_issues(issues, op_tx);
473 }
474 }
475}
476
477fn send_issues(
479 issues: Vec<crate::issue_registry::NewIssue>,
480 op_tx: &tokio::sync::mpsc::UnboundedSender<Vec<Operation>>,
481) {
482 if !issues.is_empty() {
483 let ops: Vec<Operation> =
484 issues.into_iter().map(|i| Operation::AddIssue { issue: i }).collect();
485 let _ = op_tx.send(ops);
486 }
487}
488
489fn build_command(command: &str) -> Command {
492 #[cfg(unix)]
493 {
494 let mut cmd = Command::new("sh");
495 cmd.arg("-c").arg(command);
496 cmd
497 }
498 #[cfg(windows)]
499 {
500 let mut cmd = Command::new("cmd");
501 cmd.arg("/C").arg(command);
502 cmd
503 }
504}
505
#[cfg(test)]
mod tests {
    use super::*;
    use crate::task_registry::{TaskKey, TaskQueueId, TaskRegistry, TaskTrigger};
    use std::time::Duration;
    use tokio::sync::mpsc::unbounded_channel;

    /// Upper bound on the runtime of a single test.
    const TEST_TIMEOUT: Duration = Duration::from_secs(15);

    /// Key used by tests that only ever schedule one task.
    fn dummy_key() -> TaskKey {
        TaskKey {
            queue: TaskQueueId("test".into()),
            target: "t".into(),
        }
    }

    /// Distinct key per `n`, for tests that schedule several tasks.
    fn key_n(n: u64) -> TaskKey {
        TaskKey {
            queue: TaskQueueId("test".into()),
            target: format!("t{n}"),
        }
    }

    /// Registers `command` in `reg` and returns what `TaskExecutor::spawn`
    /// needs: the task id, its cancellation token and the command string.
    fn schedule(
        reg: &mut TaskRegistry,
        key: TaskKey,
        command: &str,
    ) -> (TaskId, CancellationToken, String) {
        let id = reg.schedule_task(key, TaskTrigger::Manual, command.to_string());
        let token = reg.get(id).unwrap().cancellation_token.clone();
        (id, token, command.to_string())
    }

    /// Runs `body`, failing the test if it exceeds [`TEST_TIMEOUT`].
    async fn run_with_timeout(body: impl std::future::Future<Output = ()>) {
        tokio::time::timeout(TEST_TIMEOUT, body)
            .await
            .expect("test timed out");
    }

    /// Consumes events until `Started(id)` is seen; receive errors are
    /// skipped.
    async fn wait_for_started(rx: &mut broadcast::Receiver<TaskEvent>, id: TaskId) {
        loop {
            match rx.recv().await {
                Ok(TaskEvent::Started(tid)) if tid == id => return,
                _ => {}
            }
        }
    }

    /// Counts `ParsedOutput` events (of any task) until `Finished(id, _)`.
    async fn count_output_until_finished(
        mut rx: broadcast::Receiver<TaskEvent>,
        id: TaskId,
    ) -> usize {
        let mut chunks = 0usize;
        loop {
            match rx.recv().await.unwrap() {
                TaskEvent::ParsedOutput(_) => chunks += 1,
                TaskEvent::Finished(tid, _) if tid == id => return chunks,
                _ => {}
            }
        }
    }

    /// Shell snippets for Unix.
    #[cfg(unix)]
    mod cmds {
        pub const ECHO_STDOUT: &str = "echo hello";
        pub const ECHO_STDERR: &str = "echo err >&2";
        pub const SLEEP_LONG: &str = "sleep 60";
        pub const NO_SUCH: &str = "/no/such/binary/xyz_oo_test";
    }

    /// Shell snippets for Windows (`cmd`).
    #[cfg(windows)]
    mod cmds {
        pub const ECHO_STDOUT: &str = "echo hello";
        pub const ECHO_STDERR: &str = "echo err>&2";
        pub const SLEEP_LONG: &str = "ping -n 120 127.0.0.1";
        pub const NO_SUCH: &str = "no_such_binary_xyz_oo_test";
    }

    /// A command writing to stdout yields stdout events, finishes with
    /// `Success`, and reports start/finish on the operation channel.
    #[tokio::test]
    async fn task_emits_stdout() {
        run_with_timeout(async {
            let mut reg = TaskRegistry::new();
            let mut exec = TaskExecutor::new();
            let mut events = exec.subscribe();
            let (op_tx, mut op_rx) = unbounded_channel::<Vec<Operation>>();

            let (id, token, cmd) = schedule(&mut reg, dummy_key(), cmds::ECHO_STDOUT);
            exec.spawn(id, cmd, token, "task:test:t".into(), None, &op_tx);

            let mut got_output = false;
            let status = loop {
                match events.recv().await.unwrap() {
                    TaskEvent::ParsedOutput(ev)
                        if ev.task_id == id && ev.stream == OutputStream::Stdout =>
                    {
                        got_output = true;
                    }
                    TaskEvent::Finished(tid, status) if tid == id => break status,
                    _ => {}
                }
            };
            assert_eq!(status, TaskStatus::Success);
            assert!(got_output, "expected at least one stdout output event");

            let first: Vec<Operation> = op_rx.try_recv().unwrap();
            assert!(first
                .iter()
                .any(|o| matches!(o, Operation::TaskStarted(i) if *i == id)));
            let second: Vec<Operation> = op_rx.recv().await.unwrap();
            assert!(second.iter().any(|o| matches!(
                o,
                Operation::TaskFinished { id: i, status: TaskStatus::Success } if *i == id
            )));
        })
        .await;
    }

    /// A command writing to stderr yields stderr events.
    #[tokio::test]
    async fn task_emits_stderr() {
        run_with_timeout(async {
            let mut reg = TaskRegistry::new();
            let mut exec = TaskExecutor::new();
            let mut events = exec.subscribe();
            let (op_tx, _op_rx) = unbounded_channel::<Vec<Operation>>();

            let (id, token, cmd) = schedule(&mut reg, dummy_key(), cmds::ECHO_STDERR);
            exec.spawn(id, cmd, token, "task:test:t".into(), None, &op_tx);

            let mut got_stderr = false;
            loop {
                match events.recv().await.unwrap() {
                    TaskEvent::ParsedOutput(ev)
                        if ev.task_id == id && ev.stream == OutputStream::Stderr =>
                    {
                        got_stderr = true;
                    }
                    TaskEvent::Finished(tid, _) if tid == id => break,
                    _ => {}
                }
            }
            assert!(got_stderr, "expected at least one stderr output event");
        })
        .await;
    }

    /// Cancelling the token ends a long-running task with `Cancelled` on
    /// both channels.
    #[tokio::test]
    async fn cancellation_stops_task() {
        run_with_timeout(async {
            let mut reg = TaskRegistry::new();
            let mut exec = TaskExecutor::new();
            let mut events = exec.subscribe();
            let (op_tx, mut op_rx) = unbounded_channel::<Vec<Operation>>();

            let (id, token, cmd) = schedule(&mut reg, dummy_key(), cmds::SLEEP_LONG);
            exec.spawn(id, cmd, token.clone(), "task:test:t".into(), None, &op_tx);

            wait_for_started(&mut events, id).await;
            token.cancel();

            loop {
                match events.recv().await {
                    Ok(TaskEvent::Finished(tid, status)) if tid == id => {
                        assert_eq!(status, TaskStatus::Cancelled);
                        break;
                    }
                    _ => {}
                }
            }

            let mut found = false;
            while let Some(ops) = op_rx.recv().await {
                let cancelled = ops.iter().any(|o| {
                    matches!(
                        o,
                        Operation::TaskFinished { id: i, status: TaskStatus::Cancelled }
                            if *i == id
                    )
                });
                if cancelled {
                    found = true;
                    break;
                }
            }
            assert!(found, "expected TaskFinished(Cancelled) in op_tx");
        })
        .await;
    }

    /// Two receivers subscribed before the spawn see the same number of
    /// output events.
    #[tokio::test]
    async fn multiple_subscribers_receive_same_output() {
        run_with_timeout(async {
            let mut reg = TaskRegistry::new();
            let mut exec = TaskExecutor::new();
            let rx1 = exec.subscribe();
            let rx2 = exec.subscribe();
            let (op_tx, _op_rx) = unbounded_channel::<Vec<Operation>>();

            let (id, token, cmd) = schedule(&mut reg, dummy_key(), cmds::ECHO_STDOUT);
            exec.spawn(id, cmd, token, "task:test:t".into(), None, &op_tx);

            let (c1, c2) = tokio::join!(
                count_output_until_finished(rx1, id),
                count_output_until_finished(rx2, id)
            );
            assert_eq!(c1, c2, "both subscribers should see the same chunk count");
            assert!(c1 > 0, "expected at least one output chunk");
        })
        .await;
    }

    /// A command that cannot be run ends with `Error`.
    #[tokio::test]
    async fn spawn_failure_reports_error() {
        run_with_timeout(async {
            let mut reg = TaskRegistry::new();
            let mut exec = TaskExecutor::new();
            let mut events = exec.subscribe();
            let (op_tx, _op_rx) = unbounded_channel::<Vec<Operation>>();

            let (id, token, cmd) = schedule(&mut reg, dummy_key(), cmds::NO_SUCH);
            exec.spawn(id, cmd, token, "task:test:t".into(), None, &op_tx);

            let final_status = loop {
                match events.recv().await.unwrap() {
                    TaskEvent::Finished(tid, status) if tid == id => break status,
                    _ => {}
                }
            };
            assert!(
                matches!(final_status, TaskStatus::Error),
                "expected Error, got {final_status:?}"
            );
        })
        .await;
    }

    /// Cancelling one task and immediately starting another yields
    /// `Cancelled` for the first and `Success` for the second.
    #[tokio::test]
    async fn rapid_cancel_restart() {
        run_with_timeout(async {
            let mut reg = TaskRegistry::new();
            let mut exec = TaskExecutor::new();
            let mut events = exec.subscribe();
            let (op_tx, _op_rx) = unbounded_channel::<Vec<Operation>>();

            let (id1, token1, cmd1) = schedule(&mut reg, key_n(1), cmds::SLEEP_LONG);
            exec.spawn(id1, cmd1, token1.clone(), "task:test:t1".into(), None, &op_tx);

            wait_for_started(&mut events, id1).await;
            token1.cancel();

            let (id2, token2, cmd2) = schedule(&mut reg, key_n(2), cmds::ECHO_STDOUT);
            exec.spawn(id2, cmd2, token2, "task:test:t2".into(), None, &op_tx);

            let mut saw_cancel = false;
            let mut saw_success = false;
            while !(saw_cancel && saw_success) {
                match events.recv().await.unwrap() {
                    TaskEvent::Finished(tid, TaskStatus::Cancelled) if tid == id1 => {
                        saw_cancel = true;
                    }
                    TaskEvent::Finished(tid, TaskStatus::Success) if tid == id2 => {
                        saw_success = true;
                    }
                    _ => {}
                }
            }
            assert!(saw_cancel, "first task should be Cancelled");
            assert!(saw_success, "second task should succeed");
        })
        .await;
    }
}
792