// agent_procs/daemon/wait_engine.rs
use crate::daemon::log_writer::OutputLine;
use crate::protocol::Response;
use std::time::Duration;
use tokio::sync::broadcast;

6pub async fn wait_for(
9 mut output_rx: broadcast::Receiver<OutputLine>,
10 target: &str,
11 pattern: Option<&str>,
12 use_regex: bool,
13 wait_exit: bool,
14 timeout: Duration,
15 mut check_exit: impl FnMut() -> Option<Option<i32>>,
17) -> Response {
18 let compiled_regex = if use_regex {
19 pattern.and_then(|p| regex::Regex::new(p).ok())
20 } else {
21 None
22 };
23
24 if wait_exit {
26 if let Some(Some(code)) = check_exit() {
27 return Response::WaitExited { exit_code: code };
28 }
29 }
30
31 tokio::select! {
32 result = async {
33 loop {
34 match output_rx.recv().await {
35 Ok(line) => {
36 if line.process != target { continue; }
37 if let Some(pat) = pattern {
38 let matched = if let Some(ref re) = compiled_regex {
39 re.is_match(&line.line)
40 } else {
41 line.line.contains(pat)
42 };
43 if matched {
44 return Response::WaitMatch { line: line.line };
45 }
46 }
47 if wait_exit {
49 if let Some(Some(code)) = check_exit() {
50 return Response::WaitExited { exit_code: code };
51 }
52 }
53 }
54 Err(broadcast::error::RecvError::Lagged(_)) => continue,
55 Err(broadcast::error::RecvError::Closed) => {
56 if wait_exit {
57 if let Some(Some(code)) = check_exit() {
59 return Response::WaitExited { exit_code: code };
60 }
61 }
62 return Response::Error { code: 1, message: "output channel closed".into() };
63 }
64 }
65 }
66 } => result,
67 _ = tokio::time::sleep(timeout) => Response::WaitTimeout,
68 }
69}