Skip to main content

agent_procs/daemon/
wait_engine.rs

1use crate::daemon::actor::PmHandle;
2use crate::daemon::log_writer::OutputLine;
3use crate::protocol::{ErrorCode, Response};
4use std::time::Duration;
5use tokio::sync::broadcast;
6use tokio::time::{self, MissedTickBehavior};
7
8const EXIT_POLL_INTERVAL: Duration = Duration::from_millis(100);
9
10/// Wait for a condition on a process's output.
11/// Returns a Response indicating match, exit, or timeout.
12pub async fn wait_for(
13    mut output_rx: broadcast::Receiver<OutputLine>,
14    target: &str,
15    pattern: Option<&str>,
16    use_regex: bool,
17    wait_exit: bool,
18    timeout: Duration,
19    handle: PmHandle,
20) -> Response {
21    let compiled_regex = if use_regex {
22        match pattern {
23            Some(p) => match regex::Regex::new(p) {
24                Ok(re) => Some(re),
25                Err(e) => {
26                    return Response::Error {
27                        code: ErrorCode::General,
28                        message: format!("invalid regex: {}", e),
29                    };
30                }
31            },
32            None => None,
33        }
34    } else {
35        None
36    };
37
38    // Check if already exited before we start waiting
39    if wait_exit && let Some(exit_code) = handle.is_process_exited(target).await {
40        return Response::WaitExited { exit_code };
41    }
42
43    let mut exit_poll = time::interval(EXIT_POLL_INTERVAL);
44    exit_poll.set_missed_tick_behavior(MissedTickBehavior::Delay);
45    exit_poll.tick().await;
46
47    tokio::select! {
48        result = async {
49            loop {
50                tokio::select! {
51                    recv = output_rx.recv() => match recv {
52                        Ok(line) => {
53                            if line.process != target { continue; }
54                            if let Some(pat) = pattern {
55                                let matched = if let Some(ref re) = compiled_regex {
56                                    re.is_match(&line.line)
57                                } else {
58                                    line.line.contains(pat)
59                                };
60                                if matched {
61                                    return Response::WaitMatch { line: line.line };
62                                }
63                            }
64                        }
65                        Err(broadcast::error::RecvError::Lagged(_)) => {},
66                        Err(broadcast::error::RecvError::Closed) => {
67                            if wait_exit
68                                && let Some(exit_code) = handle.is_process_exited(target).await
69                            {
70                                return Response::WaitExited { exit_code };
71                            }
72                            return Response::Error {
73                                code: ErrorCode::General,
74                                message: "output channel closed".into(),
75                            };
76                        }
77                    },
78                    _ = exit_poll.tick(), if wait_exit => {
79                        if let Some(exit_code) = handle.is_process_exited(target).await {
80                            return Response::WaitExited { exit_code };
81                        }
82                    }
83                }
84            }
85        } => result,
86        () = tokio::time::sleep(timeout) => Response::WaitTimeout,
87    }
88}