Skip to main content

agent_procs/daemon/
wait_engine.rs

1use crate::daemon::actor::PmHandle;
2use crate::daemon::log_writer::OutputLine;
3use crate::protocol::{ErrorCode, Response};
4use std::time::Duration;
5use tokio::sync::broadcast;
6
7/// Wait for a condition on a process's output.
8/// Returns a Response indicating match, exit, or timeout.
9pub async fn wait_for(
10    mut output_rx: broadcast::Receiver<OutputLine>,
11    target: &str,
12    pattern: Option<&str>,
13    use_regex: bool,
14    wait_exit: bool,
15    timeout: Duration,
16    handle: PmHandle,
17) -> Response {
18    let compiled_regex = if use_regex {
19        match pattern {
20            Some(p) => match regex::Regex::new(p) {
21                Ok(re) => Some(re),
22                Err(e) => {
23                    return Response::Error {
24                        code: ErrorCode::General,
25                        message: format!("invalid regex: {}", e),
26                    };
27                }
28            },
29            None => None,
30        }
31    } else {
32        None
33    };
34
35    // Check if already exited before we start waiting
36    if wait_exit && let Some(exit_code) = handle.is_process_exited(target).await {
37        return Response::WaitExited { exit_code };
38    }
39
40    tokio::select! {
41        result = async {
42            loop {
43                match output_rx.recv().await {
44                    Ok(line) => {
45                        if line.process != target { continue; }
46                        if let Some(pat) = pattern {
47                            let matched = if let Some(ref re) = compiled_regex {
48                                re.is_match(&line.line)
49                            } else {
50                                line.line.contains(pat)
51                            };
52                            if matched {
53                                return Response::WaitMatch { line: line.line };
54                            }
55                        }
56                        // After each line, check if process exited (for --exit mode)
57                        if wait_exit
58                            && let Some(exit_code) = handle.is_process_exited(target).await
59                        {
60                            return Response::WaitExited { exit_code };
61                        }
62                    }
63                    Err(broadcast::error::RecvError::Lagged(_)) => {},
64                    Err(broadcast::error::RecvError::Closed) => {
65                        if wait_exit {
66                            // Channel closed — process likely exited
67                            if let Some(exit_code) = handle.is_process_exited(target).await {
68                                return Response::WaitExited { exit_code };
69                            }
70                        }
71                        return Response::Error { code: ErrorCode::General, message: "output channel closed".into() };
72                    }
73                }
74            }
75        } => result,
76        () = tokio::time::sleep(timeout) => Response::WaitTimeout,
77    }
78}