agent_procs/daemon/
wait_engine.rs1use crate::daemon::log_writer::OutputLine;
2use crate::protocol::Response;
3use std::time::Duration;
4use tokio::sync::broadcast;
5
6pub async fn wait_for(
9 mut output_rx: broadcast::Receiver<OutputLine>,
10 target: &str,
11 pattern: Option<&str>,
12 use_regex: bool,
13 wait_exit: bool,
14 timeout: Duration,
15 mut check_exit: impl FnMut() -> Option<Option<i32>>,
17) -> Response {
18 let compiled_regex = if use_regex {
19 match pattern {
20 Some(p) => match regex::Regex::new(p) {
21 Ok(re) => Some(re),
22 Err(e) => {
23 return Response::Error {
24 code: 1,
25 message: format!("invalid regex: {}", e),
26 };
27 }
28 },
29 None => None,
30 }
31 } else {
32 None
33 };
34
35 if wait_exit {
37 if let Some(exit_code) = check_exit() {
38 return Response::WaitExited { exit_code };
39 }
40 }
41
42 tokio::select! {
43 result = async {
44 loop {
45 match output_rx.recv().await {
46 Ok(line) => {
47 if line.process != target { continue; }
48 if let Some(pat) = pattern {
49 let matched = if let Some(ref re) = compiled_regex {
50 re.is_match(&line.line)
51 } else {
52 line.line.contains(pat)
53 };
54 if matched {
55 return Response::WaitMatch { line: line.line };
56 }
57 }
58 if wait_exit {
60 if let Some(exit_code) = check_exit() {
61 return Response::WaitExited { exit_code };
62 }
63 }
64 }
65 Err(broadcast::error::RecvError::Lagged(_)) => {},
66 Err(broadcast::error::RecvError::Closed) => {
67 if wait_exit {
68 if let Some(exit_code) = check_exit() {
70 return Response::WaitExited { exit_code };
71 }
72 }
73 return Response::Error { code: 1, message: "output channel closed".into() };
74 }
75 }
76 }
77 } => result,
78 () = tokio::time::sleep(timeout) => Response::WaitTimeout,
79 }
80}