agent_procs/daemon/
server.rs1use crate::daemon::wait_engine;
2use crate::protocol::{Request, Response};
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
7use tokio::net::UnixListener;
8use tokio::sync::broadcast;
9use tokio::sync::Mutex;
10
11use super::process_manager::ProcessManager;
12
13pub struct DaemonState {
14 pub process_manager: ProcessManager,
15}
16
17pub async fn run(session: &str, socket_path: &Path) {
18 let state = Arc::new(Mutex::new(DaemonState {
19 process_manager: ProcessManager::new(session),
20 }));
21
22 let listener = UnixListener::bind(socket_path).expect("failed to bind socket");
23
24 loop {
25 let (stream, _) = match listener.accept().await {
26 Ok(conn) => conn,
27 Err(_) => break,
28 };
29
30 let state = Arc::clone(&state);
31 tokio::spawn(async move {
32 let (reader, writer) = stream.into_split();
33 let writer = Arc::new(Mutex::new(writer));
34 let mut lines = BufReader::new(reader).lines();
35
36 while let Ok(Some(line)) = lines.next_line().await {
37 let request: Request = match serde_json::from_str(&line) {
38 Ok(r) => r,
39 Err(e) => {
40 let resp = Response::Error {
41 code: 1,
42 message: format!("invalid request: {}", e),
43 };
44 let _ = send_response(&writer, &resp).await;
45 continue;
46 }
47 };
48
49 if let Request::Logs {
51 follow: true,
52 ref target,
53 all,
54 timeout_secs,
55 lines,
56 ..
57 } = request
58 {
59 let output_rx = state.lock().await.process_manager.output_tx.subscribe();
60 let max_lines = lines;
61 let target_filter = target.clone();
62 let show_all = all;
63
64 handle_follow_stream(
65 &writer,
66 output_rx,
67 target_filter,
68 show_all,
69 timeout_secs,
70 max_lines,
71 )
72 .await;
73 continue; }
75
76 let is_shutdown = matches!(request, Request::Shutdown);
77
78 let response = handle_request(&state, request).await;
79 let _ = send_response(&writer, &response).await;
80
81 if is_shutdown {
82 tokio::time::sleep(Duration::from_millis(100)).await;
83 std::process::exit(0);
84 }
85 }
86 });
87 }
88}
89
90async fn handle_follow_stream(
91 writer: &Arc<Mutex<tokio::net::unix::OwnedWriteHalf>>,
92 mut output_rx: broadcast::Receiver<super::log_writer::OutputLine>,
93 target: Option<String>,
94 all: bool,
95 timeout_secs: Option<u64>,
96 max_lines: Option<usize>,
97) {
98 let mut line_count: usize = 0;
99
100 let stream_loop = async {
101 loop {
102 match output_rx.recv().await {
103 Ok(output_line) => {
104 if !all {
105 if let Some(ref t) = target {
106 if output_line.process != *t {
107 continue;
108 }
109 }
110 }
111
112 let resp = Response::LogLine {
113 process: output_line.process,
114 stream: output_line.stream,
115 line: output_line.line,
116 };
117 if send_response(writer, &resp).await.is_err() {
118 return;
119 }
120
121 line_count += 1;
122 if let Some(max) = max_lines {
123 if line_count >= max {
124 return;
125 }
126 }
127 }
128 Err(broadcast::error::RecvError::Lagged(_)) => continue,
129 Err(broadcast::error::RecvError::Closed) => return,
130 }
131 }
132 };
133
134 match timeout_secs {
136 Some(secs) => {
137 let _ = tokio::time::timeout(Duration::from_secs(secs), stream_loop).await;
138 }
139 None => {
140 stream_loop.await;
141 }
142 }
143
144 let _ = send_response(writer, &Response::LogEnd).await;
145}
146
147async fn handle_request(state: &Arc<Mutex<DaemonState>>, request: Request) -> Response {
148 match request {
149 Request::Run {
150 command,
151 name,
152 cwd,
153 env,
154 } => {
155 state
156 .lock()
157 .await
158 .process_manager
159 .spawn_process(&command, name, cwd.as_deref(), env.as_ref())
160 .await
161 }
162 Request::Stop { target } => {
163 state
164 .lock()
165 .await
166 .process_manager
167 .stop_process(&target)
168 .await
169 }
170 Request::StopAll => state.lock().await.process_manager.stop_all().await,
171 Request::Restart { target } => {
172 state
173 .lock()
174 .await
175 .process_manager
176 .restart_process(&target)
177 .await
178 }
179 Request::Status => state.lock().await.process_manager.status(),
180 Request::Wait {
181 target,
182 until,
183 regex,
184 exit,
185 timeout_secs,
186 } => {
187 {
189 let s = state.lock().await;
190 if !s.process_manager.has_process(&target) {
191 return Response::Error {
192 code: 2,
193 message: format!("process not found: {}", target),
194 };
195 }
196 }
197 let output_rx = state.lock().await.process_manager.output_tx.subscribe();
199 let timeout = Duration::from_secs(timeout_secs.unwrap_or(30));
200 let state_clone = Arc::clone(state);
201 let target_clone = target.clone();
202 wait_engine::wait_for(
203 output_rx,
204 &target,
205 until.as_deref(),
206 regex,
207 exit,
208 timeout,
209 move || {
210 let state = state_clone.clone();
213 let target = target_clone.clone();
214 let result = match state.try_lock() {
216 Ok(mut s) => s.process_manager.is_process_exited(&target),
217 Err(_) => None,
218 };
219 result
220 },
221 )
222 .await
223 }
224 Request::Logs { follow: false, .. } => {
225 Response::Error {
227 code: 1,
228 message: "non-follow logs are read directly from disk by CLI".into(),
229 }
230 }
231 Request::Logs { follow: true, .. } => {
232 Response::Error {
234 code: 1,
235 message: "follow requests handled in connection loop".into(),
236 }
237 }
238 Request::Shutdown => {
239 state.lock().await.process_manager.stop_all().await;
240 Response::Ok {
241 message: "daemon shutting down".into(),
242 }
243 }
244 }
245}
246
247async fn send_response(
248 writer: &Arc<Mutex<tokio::net::unix::OwnedWriteHalf>>,
249 response: &Response,
250) -> std::io::Result<()> {
251 let mut w = writer.lock().await;
252 let mut json = serde_json::to_string(response).unwrap();
253 json.push('\n');
254 w.write_all(json.as_bytes()).await?;
255 w.flush().await
256}