Skip to main content

agent_procs/cli/
mod.rs

1pub mod run;
2pub mod stop;
3pub mod restart;
4pub mod status;
5pub mod logs;
6pub mod wait;
7pub mod up;
8pub mod down;
9pub mod session_cmd;
10
11use crate::protocol::{Request, Response, Stream as ProtoStream};
12use crate::paths;
13use crate::session;
14use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
15use tokio::net::UnixStream;
16
17pub async fn connect(session: &str, auto_spawn: bool) -> Result<UnixStream, String> {
18    let socket = paths::socket_path(session);
19    let pid = paths::pid_path(session);
20
21    if !session::is_daemon_alive(&pid) {
22        if auto_spawn {
23            crate::daemon::spawn::spawn_daemon(session)
24                .map_err(|e| format!("failed to spawn daemon: {}", e))?;
25        } else {
26            return Err("no daemon running for this session".into());
27        }
28    }
29
30    UnixStream::connect(&socket)
31        .await
32        .map_err(|e| format!("failed to connect to daemon: {}", e))
33}
34
35pub async fn request(session: &str, req: &Request, auto_spawn: bool) -> Result<Response, String> {
36    let stream = connect(session, auto_spawn).await?;
37    let (reader, mut writer) = stream.into_split();
38
39    let mut json = serde_json::to_string(req).unwrap();
40    json.push('\n');
41    writer.write_all(json.as_bytes()).await.map_err(|e| format!("write error: {}", e))?;
42    writer.flush().await.map_err(|e| format!("flush error: {}", e))?;
43
44    let mut lines = BufReader::new(reader);
45    let mut line = String::new();
46    lines.read_line(&mut line).await.map_err(|e| format!("read error: {}", e))?;
47
48    serde_json::from_str(&line).map_err(|e| format!("parse error: {}", e))
49}
50
51/// Send a request and read streaming responses until LogEnd or error.
52/// Calls `on_line` for each LogLine received. Returns the terminal response.
53pub async fn stream_responses(
54    session: &str,
55    req: &Request,
56    auto_spawn: bool,
57    mut on_line: impl FnMut(&str, ProtoStream, &str),
58) -> Result<Response, String> {
59    let stream = connect(session, auto_spawn).await?;
60    let (reader, mut writer) = stream.into_split();
61
62    let mut json = serde_json::to_string(req).unwrap();
63    json.push('\n');
64    writer.write_all(json.as_bytes()).await.map_err(|e| format!("write error: {}", e))?;
65    writer.flush().await.map_err(|e| format!("flush error: {}", e))?;
66
67    let mut lines = BufReader::new(reader);
68    loop {
69        let mut line = String::new();
70        let n = lines.read_line(&mut line).await.map_err(|e| format!("read error: {}", e))?;
71        if n == 0 { return Ok(Response::LogEnd); } // EOF
72
73        let resp: Response = serde_json::from_str(&line).map_err(|e| format!("parse error: {}", e))?;
74        match resp {
75            Response::LogLine { ref process, stream, ref line } => {
76                on_line(process, stream, line);
77            }
78            Response::LogEnd | Response::Error { .. } => return Ok(resp),
79            other => return Ok(other), // unexpected
80        }
81    }
82}