agent_procs/daemon/
wait_engine.rs1use crate::daemon::actor::PmHandle;
2use crate::daemon::log_writer::OutputLine;
3use crate::protocol::{ErrorCode, Response};
4use std::time::Duration;
5use tokio::sync::broadcast;
6
7pub async fn wait_for(
10 mut output_rx: broadcast::Receiver<OutputLine>,
11 target: &str,
12 pattern: Option<&str>,
13 use_regex: bool,
14 wait_exit: bool,
15 timeout: Duration,
16 handle: PmHandle,
17) -> Response {
18 let compiled_regex = if use_regex {
19 match pattern {
20 Some(p) => match regex::Regex::new(p) {
21 Ok(re) => Some(re),
22 Err(e) => {
23 return Response::Error {
24 code: ErrorCode::General,
25 message: format!("invalid regex: {}", e),
26 };
27 }
28 },
29 None => None,
30 }
31 } else {
32 None
33 };
34
35 if wait_exit && let Some(exit_code) = handle.is_process_exited(target).await {
37 return Response::WaitExited { exit_code };
38 }
39
40 tokio::select! {
41 result = async {
42 loop {
43 match output_rx.recv().await {
44 Ok(line) => {
45 if line.process != target { continue; }
46 if let Some(pat) = pattern {
47 let matched = if let Some(ref re) = compiled_regex {
48 re.is_match(&line.line)
49 } else {
50 line.line.contains(pat)
51 };
52 if matched {
53 return Response::WaitMatch { line: line.line };
54 }
55 }
56 if wait_exit
58 && let Some(exit_code) = handle.is_process_exited(target).await
59 {
60 return Response::WaitExited { exit_code };
61 }
62 }
63 Err(broadcast::error::RecvError::Lagged(_)) => {},
64 Err(broadcast::error::RecvError::Closed) => {
65 if wait_exit {
66 if let Some(exit_code) = handle.is_process_exited(target).await {
68 return Response::WaitExited { exit_code };
69 }
70 }
71 return Response::Error { code: ErrorCode::General, message: "output channel closed".into() };
72 }
73 }
74 }
75 } => result,
76 () = tokio::time::sleep(timeout) => Response::WaitTimeout,
77 }
78}