Skip to main content

agent_procs/cli/
mod.rs

1pub mod down;
2pub mod logs;
3pub mod restart;
4pub mod run;
5pub mod session_cmd;
6pub mod status;
7pub mod stop;
8pub mod up;
9pub mod wait;
10
11use crate::paths;
12use crate::protocol::{Request, Response, Stream as ProtoStream};
13use crate::session;
14use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
15use tokio::net::UnixStream;
16
17pub async fn connect(session: &str, auto_spawn: bool) -> Result<UnixStream, String> {
18    let socket = paths::socket_path(session);
19    let pid = paths::pid_path(session);
20
21    if !session::is_daemon_alive(&pid) {
22        if auto_spawn {
23            crate::daemon::spawn::spawn_daemon(session)
24                .map_err(|e| format!("failed to spawn daemon: {}", e))?;
25        } else {
26            return Err("no daemon running for this session".into());
27        }
28    }
29
30    UnixStream::connect(&socket)
31        .await
32        .map_err(|e| format!("failed to connect to daemon: {}", e))
33}
34
35pub async fn request(session: &str, req: &Request, auto_spawn: bool) -> Result<Response, String> {
36    let stream = connect(session, auto_spawn).await?;
37    let (reader, mut writer) = stream.into_split();
38
39    let mut json = serde_json::to_string(req).map_err(|e| format!("serialize error: {}", e))?;
40    json.push('\n');
41    writer
42        .write_all(json.as_bytes())
43        .await
44        .map_err(|e| format!("write error: {}", e))?;
45    writer
46        .flush()
47        .await
48        .map_err(|e| format!("flush error: {}", e))?;
49
50    let mut lines = BufReader::new(reader);
51    let mut line = String::new();
52    lines
53        .read_line(&mut line)
54        .await
55        .map_err(|e| format!("read error: {}", e))?;
56
57    serde_json::from_str(&line).map_err(|e| format!("parse error: {}", e))
58}
59
60/// Send a request and read streaming responses until LogEnd or error.
61/// Calls `on_line` for each LogLine received. Returns the terminal response.
62pub async fn stream_responses(
63    session: &str,
64    req: &Request,
65    auto_spawn: bool,
66    mut on_line: impl FnMut(&str, ProtoStream, &str),
67) -> Result<Response, String> {
68    let stream = connect(session, auto_spawn).await?;
69    let (reader, mut writer) = stream.into_split();
70
71    let mut json = serde_json::to_string(req).map_err(|e| format!("serialize error: {}", e))?;
72    json.push('\n');
73    writer
74        .write_all(json.as_bytes())
75        .await
76        .map_err(|e| format!("write error: {}", e))?;
77    writer
78        .flush()
79        .await
80        .map_err(|e| format!("flush error: {}", e))?;
81
82    let mut lines = BufReader::new(reader);
83    loop {
84        let mut line = String::new();
85        let n = lines
86            .read_line(&mut line)
87            .await
88            .map_err(|e| format!("read error: {}", e))?;
89        if n == 0 {
90            return Ok(Response::LogEnd);
91        } // EOF
92
93        let resp: Response =
94            serde_json::from_str(&line).map_err(|e| format!("parse error: {}", e))?;
95        match resp {
96            Response::LogLine {
97                ref process,
98                stream,
99                ref line,
100            } => {
101                on_line(process, stream, line);
102            }
103            Response::LogEnd | Response::Error { .. } => return Ok(resp),
104            other => return Ok(other), // unexpected
105        }
106    }
107}