Skip to main content

agent_procs/cli/
mod.rs

1//! CLI client: connecting to the daemon and sending requests.
2//!
3//! Each subcommand module (`run`, `stop`, `status`, …) builds a
4//! [`Request`](crate::protocol::Request), sends it via [`request`], and
5//! interprets the [`Response`](crate::protocol::Response).
6//!
7//! [`connect`] handles auto-spawning the daemon on first use.
8//! [`stream_responses`] supports streaming commands like `logs --follow`.
9
10pub mod down;
11pub mod logs;
12pub mod restart;
13pub mod run;
14pub mod session_cmd;
15pub mod status;
16pub mod stop;
17pub mod up;
18pub mod wait;
19
20use crate::error::ClientError;
21use crate::paths;
22use crate::protocol::{Request, Response, Stream as ProtoStream};
23use crate::session;
24use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
25use tokio::net::UnixStream;
26
27pub async fn connect(session: &str, auto_spawn: bool) -> Result<UnixStream, ClientError> {
28    let socket = paths::socket_path(session);
29    let pid = paths::pid_path(session);
30
31    if !session::is_daemon_alive(&pid) {
32        if auto_spawn {
33            crate::daemon::spawn::spawn_daemon(session).map_err(ClientError::SpawnFailed)?;
34        } else {
35            return Err(ClientError::NoDaemon);
36        }
37    }
38
39    UnixStream::connect(&socket)
40        .await
41        .map_err(ClientError::ConnectionFailed)
42}
43
44/// Serialize a request and write it to the socket.
45async fn send_request(
46    writer: &mut tokio::net::unix::OwnedWriteHalf,
47    req: &Request,
48) -> Result<(), ClientError> {
49    let mut json = serde_json::to_string(req).map_err(ClientError::Serialize)?;
50    json.push('\n');
51    writer
52        .write_all(json.as_bytes())
53        .await
54        .map_err(ClientError::Write)?;
55    writer.flush().await.map_err(ClientError::Flush)
56}
57
58pub async fn request(
59    session: &str,
60    req: &Request,
61    auto_spawn: bool,
62) -> Result<Response, ClientError> {
63    let stream = connect(session, auto_spawn).await?;
64    let (reader, mut writer) = stream.into_split();
65
66    send_request(&mut writer, req).await?;
67
68    let mut lines = BufReader::new(reader);
69    let mut line = String::new();
70    lines
71        .read_line(&mut line)
72        .await
73        .map_err(ClientError::Read)?;
74
75    serde_json::from_str(&line).map_err(ClientError::ParseResponse)
76}
77
78/// Send an `EnableProxy` request to the daemon. Returns `Some(exit_code)` on error, None on success.
79pub async fn enable_proxy(session: &str, proxy_port: Option<u16>) -> Option<i32> {
80    let req = Request::EnableProxy { proxy_port };
81    match request(session, &req, true).await {
82        Ok(Response::Ok { message }) => {
83            eprintln!("{}", message);
84            None
85        }
86        Ok(Response::Error { code, message }) => {
87            eprintln!("error enabling proxy: {}", message);
88            Some(code.exit_code())
89        }
90        Err(e) => {
91            eprintln!("error enabling proxy: {}", e);
92            Some(1)
93        }
94        _ => None,
95    }
96}
97
98/// Send a request and dispatch the response through a user-supplied callback.
99///
100/// Handles the `Error` branch centrally (prints the message and returns the
101/// exit code).  `on_success` receives any other response and returns
102/// `Some(exit_code)` to finish or `None` for an unexpected-response fallback.
103pub async fn request_and_handle<F>(
104    session: &str,
105    req: &Request,
106    auto_spawn: bool,
107    on_success: F,
108) -> i32
109where
110    F: FnOnce(Response) -> Option<i32>,
111{
112    match request(session, req, auto_spawn).await {
113        Ok(Response::Error { code, message }) => {
114            eprintln!("error: {}", message);
115            code.exit_code()
116        }
117        Ok(resp) => on_success(resp).unwrap_or_else(|| {
118            eprintln!("unexpected response");
119            1
120        }),
121        Err(e) => {
122            eprintln!("error: {}", e);
123            1
124        }
125    }
126}
127
128/// Send a request and read streaming responses until `LogEnd` or error.
129/// Calls `on_line` for each `LogLine` received. Returns the terminal response.
130pub async fn stream_responses(
131    session: &str,
132    req: &Request,
133    auto_spawn: bool,
134    mut on_line: impl FnMut(&str, ProtoStream, &str),
135) -> Result<Response, ClientError> {
136    let stream = connect(session, auto_spawn).await?;
137    let (reader, mut writer) = stream.into_split();
138
139    send_request(&mut writer, req).await?;
140
141    let mut lines = BufReader::new(reader);
142    loop {
143        let mut line = String::new();
144        let n = lines
145            .read_line(&mut line)
146            .await
147            .map_err(ClientError::Read)?;
148        if n == 0 {
149            return Ok(Response::LogEnd);
150        } // EOF
151
152        let resp: Response = serde_json::from_str(&line).map_err(ClientError::ParseResponse)?;
153        match resp {
154            Response::LogLine {
155                ref process,
156                stream,
157                ref line,
158            } => {
159                on_line(process, stream, line);
160            }
161            Response::LogEnd | Response::Error { .. } => return Ok(resp),
162            other => return Ok(other), // unexpected
163        }
164    }
165}