1pub mod down;
11pub mod logs;
12pub mod restart;
13pub mod run;
14pub mod session_cmd;
15pub mod status;
16pub mod stop;
17pub mod up;
18pub mod wait;
19
20use crate::error::ClientError;
21use crate::paths;
22use crate::protocol::{Request, Response, Stream as ProtoStream};
23use crate::session;
24use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
25use tokio::net::UnixStream;
26
27pub async fn connect(session: &str, auto_spawn: bool) -> Result<UnixStream, ClientError> {
28 let socket = paths::socket_path(session);
29 let pid = paths::pid_path(session);
30
31 if !session::is_daemon_alive(&pid) {
32 if auto_spawn {
33 crate::daemon::spawn::spawn_daemon(session).map_err(ClientError::SpawnFailed)?;
34 } else {
35 return Err(ClientError::NoDaemon);
36 }
37 }
38
39 UnixStream::connect(&socket)
40 .await
41 .map_err(ClientError::ConnectionFailed)
42}
43
44async fn send_request(
46 writer: &mut tokio::net::unix::OwnedWriteHalf,
47 req: &Request,
48) -> Result<(), ClientError> {
49 let mut json = serde_json::to_string(req).map_err(ClientError::Serialize)?;
50 json.push('\n');
51 writer
52 .write_all(json.as_bytes())
53 .await
54 .map_err(ClientError::Write)?;
55 writer.flush().await.map_err(ClientError::Flush)
56}
57
58pub async fn request(
59 session: &str,
60 req: &Request,
61 auto_spawn: bool,
62) -> Result<Response, ClientError> {
63 let stream = connect(session, auto_spawn).await?;
64 let (reader, mut writer) = stream.into_split();
65
66 send_request(&mut writer, req).await?;
67
68 let mut lines = BufReader::new(reader);
69 let mut line = String::new();
70 lines
71 .read_line(&mut line)
72 .await
73 .map_err(ClientError::Read)?;
74
75 serde_json::from_str(&line).map_err(ClientError::ParseResponse)
76}
77
78pub async fn enable_proxy(session: &str, proxy_port: Option<u16>) -> Option<i32> {
80 let req = Request::EnableProxy { proxy_port };
81 match request(session, &req, true).await {
82 Ok(Response::Ok { message }) => {
83 eprintln!("{}", message);
84 None
85 }
86 Ok(Response::Error { code, message }) => {
87 eprintln!("error enabling proxy: {}", message);
88 Some(code)
89 }
90 Err(e) => {
91 eprintln!("error enabling proxy: {}", e);
92 Some(1)
93 }
94 _ => None,
95 }
96}
97
98pub async fn stream_responses(
101 session: &str,
102 req: &Request,
103 auto_spawn: bool,
104 mut on_line: impl FnMut(&str, ProtoStream, &str),
105) -> Result<Response, ClientError> {
106 let stream = connect(session, auto_spawn).await?;
107 let (reader, mut writer) = stream.into_split();
108
109 send_request(&mut writer, req).await?;
110
111 let mut lines = BufReader::new(reader);
112 loop {
113 let mut line = String::new();
114 let n = lines
115 .read_line(&mut line)
116 .await
117 .map_err(ClientError::Read)?;
118 if n == 0 {
119 return Ok(Response::LogEnd);
120 } let resp: Response = serde_json::from_str(&line).map_err(ClientError::ParseResponse)?;
123 match resp {
124 Response::LogLine {
125 ref process,
126 stream,
127 ref line,
128 } => {
129 on_line(process, stream, line);
130 }
131 Response::LogEnd | Response::Error { .. } => return Ok(resp),
132 other => return Ok(other), }
134 }
135}