agent_procs/daemon/
wait_engine.rs1use crate::daemon::actor::PmHandle;
2use crate::daemon::log_writer::OutputLine;
3use crate::protocol::{ErrorCode, Response};
4use std::time::Duration;
5use tokio::sync::broadcast;
6
7pub async fn wait_for(
10 mut output_rx: broadcast::Receiver<OutputLine>,
11 target: &str,
12 pattern: Option<&str>,
13 use_regex: bool,
14 wait_exit: bool,
15 timeout: Duration,
16 handle: PmHandle,
17) -> Response {
18 let compiled_regex = if use_regex {
19 match pattern {
20 Some(p) => match regex::Regex::new(p) {
21 Ok(re) => Some(re),
22 Err(e) => {
23 return Response::Error {
24 code: ErrorCode::General,
25 message: format!("invalid regex: {}", e),
26 };
27 }
28 },
29 None => None,
30 }
31 } else {
32 None
33 };
34
35 if wait_exit {
37 if let Some(exit_code) = handle.is_process_exited(target).await {
38 return Response::WaitExited { exit_code };
39 }
40 }
41
42 tokio::select! {
43 result = async {
44 loop {
45 match output_rx.recv().await {
46 Ok(line) => {
47 if line.process != target { continue; }
48 if let Some(pat) = pattern {
49 let matched = if let Some(ref re) = compiled_regex {
50 re.is_match(&line.line)
51 } else {
52 line.line.contains(pat)
53 };
54 if matched {
55 return Response::WaitMatch { line: line.line };
56 }
57 }
58 if wait_exit {
60 if let Some(exit_code) = handle.is_process_exited(target).await {
61 return Response::WaitExited { exit_code };
62 }
63 }
64 }
65 Err(broadcast::error::RecvError::Lagged(_)) => {},
66 Err(broadcast::error::RecvError::Closed) => {
67 if wait_exit {
68 if let Some(exit_code) = handle.is_process_exited(target).await {
70 return Response::WaitExited { exit_code };
71 }
72 }
73 return Response::Error { code: ErrorCode::General, message: "output channel closed".into() };
74 }
75 }
76 }
77 } => result,
78 () = tokio::time::sleep(timeout) => Response::WaitTimeout,
79 }
80}