Skip to main content

agent_procs/daemon/
wait_engine.rs

1use crate::daemon::log_writer::OutputLine;
2use crate::protocol::Response;
3use std::time::Duration;
4use tokio::sync::broadcast;
5
6/// Wait for a condition on a process's output.
7/// Returns a Response indicating match, exit, or timeout.
8pub async fn wait_for(
9    mut output_rx: broadcast::Receiver<OutputLine>,
10    target: &str,
11    pattern: Option<&str>,
12    use_regex: bool,
13    wait_exit: bool,
14    timeout: Duration,
15    // Closure to check if process has already exited
16    mut check_exit: impl FnMut() -> Option<Option<i32>>,
17) -> Response {
18    let compiled_regex = if use_regex {
19        pattern.and_then(|p| regex::Regex::new(p).ok())
20    } else {
21        None
22    };
23
24    // Check if already exited before we start waiting
25    if wait_exit {
26        if let Some(Some(code)) = check_exit() {
27            return Response::WaitExited { exit_code: code };
28        }
29    }
30
31    tokio::select! {
32        result = async {
33            loop {
34                match output_rx.recv().await {
35                    Ok(line) => {
36                        if line.process != target { continue; }
37                        if let Some(pat) = pattern {
38                            let matched = if let Some(ref re) = compiled_regex {
39                                re.is_match(&line.line)
40                            } else {
41                                line.line.contains(pat)
42                            };
43                            if matched {
44                                return Response::WaitMatch { line: line.line };
45                            }
46                        }
47                        // After each line, check if process exited (for --exit mode)
48                        if wait_exit {
49                            if let Some(Some(code)) = check_exit() {
50                                return Response::WaitExited { exit_code: code };
51                            }
52                        }
53                    }
54                    Err(broadcast::error::RecvError::Lagged(_)) => continue,
55                    Err(broadcast::error::RecvError::Closed) => {
56                        if wait_exit {
57                            // Channel closed — process likely exited
58                            if let Some(Some(code)) = check_exit() {
59                                return Response::WaitExited { exit_code: code };
60                            }
61                        }
62                        return Response::Error { code: 1, message: "output channel closed".into() };
63                    }
64                }
65            }
66        } => result,
67        _ = tokio::time::sleep(timeout) => Response::WaitTimeout,
68    }
69}