Skip to main content

agent_procs/cli/
mod.rs

1//! CLI client: connecting to the daemon and sending requests.
2//!
3//! Each subcommand module (`run`, `stop`, `status`, …) builds a
4//! [`Request`](crate::protocol::Request), sends it via [`request`], and
5//! interprets the [`Response`](crate::protocol::Response).
6//!
7//! [`connect`] handles auto-spawning the daemon on first use.
8//! [`stream_responses`] supports streaming commands like `logs --follow`.
9
10pub mod down;
11pub mod logs;
12pub mod restart;
13pub mod run;
14pub mod session_cmd;
15pub mod status;
16pub mod stop;
17pub mod up;
18pub mod wait;
19
20use crate::error::ClientError;
21use crate::paths;
22use crate::protocol::{Request, Response, Stream as ProtoStream};
23use crate::session;
24use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
25use tokio::net::UnixStream;
26
27pub async fn connect(session: &str, auto_spawn: bool) -> Result<UnixStream, ClientError> {
28    let socket = paths::socket_path(session);
29    let pid = paths::pid_path(session);
30
31    if !session::is_daemon_alive(&pid) {
32        if auto_spawn {
33            crate::daemon::spawn::spawn_daemon(session).map_err(ClientError::SpawnFailed)?;
34        } else {
35            return Err(ClientError::NoDaemon);
36        }
37    }
38
39    UnixStream::connect(&socket)
40        .await
41        .map_err(ClientError::ConnectionFailed)
42}
43
44/// Serialize a request and write it to the socket.
45async fn send_request(
46    writer: &mut tokio::net::unix::OwnedWriteHalf,
47    req: &Request,
48) -> Result<(), ClientError> {
49    let mut json = serde_json::to_string(req).map_err(ClientError::Serialize)?;
50    json.push('\n');
51    writer
52        .write_all(json.as_bytes())
53        .await
54        .map_err(ClientError::Write)?;
55    writer.flush().await.map_err(ClientError::Flush)
56}
57
58pub async fn request(
59    session: &str,
60    req: &Request,
61    auto_spawn: bool,
62) -> Result<Response, ClientError> {
63    let stream = connect(session, auto_spawn).await?;
64    let (reader, mut writer) = stream.into_split();
65
66    send_request(&mut writer, req).await?;
67
68    let mut lines = BufReader::new(reader);
69    let mut line = String::new();
70    lines
71        .read_line(&mut line)
72        .await
73        .map_err(ClientError::Read)?;
74
75    serde_json::from_str(&line).map_err(ClientError::ParseResponse)
76}
77
78/// Send an `EnableProxy` request to the daemon. Returns `Some(exit_code)` on error, None on success.
79pub async fn enable_proxy(session: &str, proxy_port: Option<u16>) -> Option<i32> {
80    let req = Request::EnableProxy { proxy_port };
81    match request(session, &req, true).await {
82        Ok(Response::Ok { message }) => {
83            eprintln!("{}", message);
84            None
85        }
86        Ok(Response::Error { code, message }) => {
87            eprintln!("error enabling proxy: {}", message);
88            Some(code)
89        }
90        Err(e) => {
91            eprintln!("error enabling proxy: {}", e);
92            Some(1)
93        }
94        _ => None,
95    }
96}
97
98/// Send a request and read streaming responses until `LogEnd` or error.
99/// Calls `on_line` for each `LogLine` received. Returns the terminal response.
100pub async fn stream_responses(
101    session: &str,
102    req: &Request,
103    auto_spawn: bool,
104    mut on_line: impl FnMut(&str, ProtoStream, &str),
105) -> Result<Response, ClientError> {
106    let stream = connect(session, auto_spawn).await?;
107    let (reader, mut writer) = stream.into_split();
108
109    send_request(&mut writer, req).await?;
110
111    let mut lines = BufReader::new(reader);
112    loop {
113        let mut line = String::new();
114        let n = lines
115            .read_line(&mut line)
116            .await
117            .map_err(ClientError::Read)?;
118        if n == 0 {
119            return Ok(Response::LogEnd);
120        } // EOF
121
122        let resp: Response = serde_json::from_str(&line).map_err(ClientError::ParseResponse)?;
123        match resp {
124            Response::LogLine {
125                ref process,
126                stream,
127                ref line,
128            } => {
129                on_line(process, stream, line);
130            }
131            Response::LogEnd | Response::Error { .. } => return Ok(resp),
132            other => return Ok(other), // unexpected
133        }
134    }
135}