agent_procs/daemon/
wait_engine.rs1use crate::daemon::actor::PmHandle;
2use crate::daemon::log_writer::OutputLine;
3use crate::protocol::{ErrorCode, Response};
4use std::time::Duration;
5use tokio::sync::broadcast;
6use tokio::time::{self, MissedTickBehavior};
7
/// Interval at which `wait_for` re-checks whether the target process has exited
/// while it is also listening for output lines.
const EXIT_POLL_INTERVAL: Duration = Duration::from_millis(100);
9
10pub async fn wait_for(
13 mut output_rx: broadcast::Receiver<OutputLine>,
14 target: &str,
15 pattern: Option<&str>,
16 use_regex: bool,
17 wait_exit: bool,
18 timeout: Duration,
19 handle: PmHandle,
20) -> Response {
21 let compiled_regex = if use_regex {
22 match pattern {
23 Some(p) => match regex::Regex::new(p) {
24 Ok(re) => Some(re),
25 Err(e) => {
26 return Response::Error {
27 code: ErrorCode::General,
28 message: format!("invalid regex: {}", e),
29 };
30 }
31 },
32 None => None,
33 }
34 } else {
35 None
36 };
37
38 if wait_exit && let Some(exit_code) = handle.is_process_exited(target).await {
40 return Response::WaitExited { exit_code };
41 }
42
43 let mut exit_poll = time::interval(EXIT_POLL_INTERVAL);
44 exit_poll.set_missed_tick_behavior(MissedTickBehavior::Delay);
45 exit_poll.tick().await;
46
47 tokio::select! {
48 result = async {
49 loop {
50 tokio::select! {
51 recv = output_rx.recv() => match recv {
52 Ok(line) => {
53 if line.process != target { continue; }
54 if let Some(pat) = pattern {
55 let matched = if let Some(ref re) = compiled_regex {
56 re.is_match(&line.line)
57 } else {
58 line.line.contains(pat)
59 };
60 if matched {
61 return Response::WaitMatch { line: line.line };
62 }
63 }
64 }
65 Err(broadcast::error::RecvError::Lagged(_)) => {},
66 Err(broadcast::error::RecvError::Closed) => {
67 if wait_exit
68 && let Some(exit_code) = handle.is_process_exited(target).await
69 {
70 return Response::WaitExited { exit_code };
71 }
72 return Response::Error {
73 code: ErrorCode::General,
74 message: "output channel closed".into(),
75 };
76 }
77 },
78 _ = exit_poll.tick(), if wait_exit => {
79 if let Some(exit_code) = handle.is_process_exited(target).await {
80 return Response::WaitExited { exit_code };
81 }
82 }
83 }
84 }
85 } => result,
86 () = tokio::time::sleep(timeout) => Response::WaitTimeout,
87 }
88}