Skip to main content

agent_procs/daemon/
wait_engine.rs

1use crate::daemon::log_writer::OutputLine;
2use crate::protocol::Response;
3use std::time::Duration;
4use tokio::sync::broadcast;
5
6/// Wait for a condition on a process's output.
7/// Returns a Response indicating match, exit, or timeout.
8pub async fn wait_for(
9    mut output_rx: broadcast::Receiver<OutputLine>,
10    target: &str,
11    pattern: Option<&str>,
12    use_regex: bool,
13    wait_exit: bool,
14    timeout: Duration,
15    // Closure to check if process has already exited
16    mut check_exit: impl FnMut() -> Option<Option<i32>>,
17) -> Response {
18    let compiled_regex = if use_regex {
19        match pattern {
20            Some(p) => match regex::Regex::new(p) {
21                Ok(re) => Some(re),
22                Err(e) => {
23                    return Response::Error {
24                        code: 1,
25                        message: format!("invalid regex: {}", e),
26                    };
27                }
28            },
29            None => None,
30        }
31    } else {
32        None
33    };
34
35    // Check if already exited before we start waiting
36    if wait_exit {
37        if let Some(exit_code) = check_exit() {
38            return Response::WaitExited { exit_code };
39        }
40    }
41
42    tokio::select! {
43        result = async {
44            loop {
45                match output_rx.recv().await {
46                    Ok(line) => {
47                        if line.process != target { continue; }
48                        if let Some(pat) = pattern {
49                            let matched = if let Some(ref re) = compiled_regex {
50                                re.is_match(&line.line)
51                            } else {
52                                line.line.contains(pat)
53                            };
54                            if matched {
55                                return Response::WaitMatch { line: line.line };
56                            }
57                        }
58                        // After each line, check if process exited (for --exit mode)
59                        if wait_exit {
60                            if let Some(exit_code) = check_exit() {
61                                return Response::WaitExited { exit_code };
62                            }
63                        }
64                    }
65                    Err(broadcast::error::RecvError::Lagged(_)) => {},
66                    Err(broadcast::error::RecvError::Closed) => {
67                        if wait_exit {
68                            // Channel closed — process likely exited
69                            if let Some(exit_code) = check_exit() {
70                                return Response::WaitExited { exit_code };
71                            }
72                        }
73                        return Response::Error { code: 1, message: "output channel closed".into() };
74                    }
75                }
76            }
77        } => result,
78        () = tokio::time::sleep(timeout) => Response::WaitTimeout,
79    }
80}