agent_procs/daemon/
server.rs1use crate::daemon::wait_engine;
2use crate::protocol::{Request, Response};
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
7use tokio::net::UnixListener;
8use tokio::sync::broadcast;
9use tokio::sync::Mutex;
10
11use super::process_manager::ProcessManager;
12
13pub struct DaemonState {
14 pub process_manager: ProcessManager,
15}
16
17pub async fn run(session: &str, socket_path: &Path) {
18 let state = Arc::new(Mutex::new(DaemonState {
19 process_manager: ProcessManager::new(session),
20 }));
21
22 let listener = match UnixListener::bind(socket_path) {
23 Ok(l) => l,
24 Err(e) => {
25 eprintln!("fatal: failed to bind socket {:?}: {}", socket_path, e);
26 return;
27 }
28 };
29
30 let shutdown = Arc::new(tokio::sync::Notify::new());
32
33 loop {
34 let (stream, _) = tokio::select! {
35 result = listener.accept() => match result {
36 Ok(conn) => conn,
37 Err(e) => {
38 eprintln!("warning: accept error: {}", e);
39 continue;
40 }
41 },
42 _ = shutdown.notified() => break,
43 };
44
45 let state = Arc::clone(&state);
46 let shutdown = Arc::clone(&shutdown);
47 tokio::spawn(async move {
48 let (reader, writer) = stream.into_split();
49 let writer = Arc::new(Mutex::new(writer));
50 let mut lines = BufReader::new(reader).lines();
51
52 while let Ok(Some(line)) = lines.next_line().await {
53 let request: Request = match serde_json::from_str(&line) {
54 Ok(r) => r,
55 Err(e) => {
56 let resp = Response::Error {
57 code: 1,
58 message: format!("invalid request: {}", e),
59 };
60 let _ = send_response(&writer, &resp).await;
61 continue;
62 }
63 };
64
65 if let Request::Logs {
67 follow: true,
68 ref target,
69 all,
70 timeout_secs,
71 lines,
72 ..
73 } = request
74 {
75 let output_rx = state.lock().await.process_manager.output_tx.subscribe();
76 let max_lines = lines;
77 let target_filter = target.clone();
78 let show_all = all;
79
80 handle_follow_stream(
81 &writer,
82 output_rx,
83 target_filter,
84 show_all,
85 timeout_secs,
86 max_lines,
87 )
88 .await;
89 continue; }
91
92 let is_shutdown = matches!(request, Request::Shutdown);
93
94 let response = handle_request(&state, request).await;
95 let _ = send_response(&writer, &response).await;
96
97 if is_shutdown {
98 shutdown.notify_one();
99 return;
100 }
101 }
102 });
103 }
104}
105
106async fn handle_follow_stream(
107 writer: &Arc<Mutex<tokio::net::unix::OwnedWriteHalf>>,
108 mut output_rx: broadcast::Receiver<super::log_writer::OutputLine>,
109 target: Option<String>,
110 all: bool,
111 timeout_secs: Option<u64>,
112 max_lines: Option<usize>,
113) {
114 let mut line_count: usize = 0;
115
116 let stream_loop = async {
117 loop {
118 match output_rx.recv().await {
119 Ok(output_line) => {
120 if !all {
121 if let Some(ref t) = target {
122 if output_line.process != *t {
123 continue;
124 }
125 }
126 }
127
128 let resp = Response::LogLine {
129 process: output_line.process,
130 stream: output_line.stream,
131 line: output_line.line,
132 };
133 if send_response(writer, &resp).await.is_err() {
134 return;
135 }
136
137 line_count += 1;
138 if let Some(max) = max_lines {
139 if line_count >= max {
140 return;
141 }
142 }
143 }
144 Err(broadcast::error::RecvError::Lagged(_)) => continue,
145 Err(broadcast::error::RecvError::Closed) => return,
146 }
147 }
148 };
149
150 match timeout_secs {
152 Some(secs) => {
153 let _ = tokio::time::timeout(Duration::from_secs(secs), stream_loop).await;
154 }
155 None => {
156 stream_loop.await;
157 }
158 }
159
160 let _ = send_response(writer, &Response::LogEnd).await;
161}
162
163async fn handle_request(state: &Arc<Mutex<DaemonState>>, request: Request) -> Response {
164 match request {
165 Request::Run {
166 command,
167 name,
168 cwd,
169 env,
170 } => {
171 state
172 .lock()
173 .await
174 .process_manager
175 .spawn_process(&command, name, cwd.as_deref(), env.as_ref())
176 .await
177 }
178 Request::Stop { target } => {
179 state
180 .lock()
181 .await
182 .process_manager
183 .stop_process(&target)
184 .await
185 }
186 Request::StopAll => state.lock().await.process_manager.stop_all().await,
187 Request::Restart { target } => {
188 state
189 .lock()
190 .await
191 .process_manager
192 .restart_process(&target)
193 .await
194 }
195 Request::Status => state.lock().await.process_manager.status(),
196 Request::Wait {
197 target,
198 until,
199 regex,
200 exit,
201 timeout_secs,
202 } => {
203 {
205 let s = state.lock().await;
206 if !s.process_manager.has_process(&target) {
207 return Response::Error {
208 code: 2,
209 message: format!("process not found: {}", target),
210 };
211 }
212 }
213 let output_rx = state.lock().await.process_manager.output_tx.subscribe();
215 let timeout = Duration::from_secs(timeout_secs.unwrap_or(30));
216 let state_clone = Arc::clone(state);
217 let target_clone = target.clone();
218 wait_engine::wait_for(
219 output_rx,
220 &target,
221 until.as_deref(),
222 regex,
223 exit,
224 timeout,
225 move || {
226 let state = state_clone.clone();
229 let target = target_clone.clone();
230 let result = match state.try_lock() {
232 Ok(mut s) => s.process_manager.is_process_exited(&target),
233 Err(_) => None,
234 };
235 result
236 },
237 )
238 .await
239 }
240 Request::Logs { follow: false, .. } => {
241 Response::Error {
243 code: 1,
244 message: "non-follow logs are read directly from disk by CLI".into(),
245 }
246 }
247 Request::Logs { follow: true, .. } => {
248 Response::Error {
250 code: 1,
251 message: "follow requests handled in connection loop".into(),
252 }
253 }
254 Request::Shutdown => {
255 state.lock().await.process_manager.stop_all().await;
256 Response::Ok {
257 message: "daemon shutting down".into(),
258 }
259 }
260 }
261}
262
263async fn send_response(
264 writer: &Arc<Mutex<tokio::net::unix::OwnedWriteHalf>>,
265 response: &Response,
266) -> std::io::Result<()> {
267 let mut w = writer.lock().await;
268 let mut json = serde_json::to_string(response)
269 .expect("Response serialization should never fail for well-typed enums");
270 json.push('\n');
271 w.write_all(json.as_bytes()).await?;
272 w.flush().await
273}