agent_procs/daemon/
server.rs1use crate::daemon::wait_engine;
2use crate::protocol::{Request, Response};
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
7use tokio::net::UnixListener;
8use tokio::sync::broadcast;
9use tokio::sync::Mutex;
10
11use super::process_manager::ProcessManager;
12
13pub struct DaemonState {
14 pub process_manager: ProcessManager,
15}
16
17pub async fn run(session: &str, socket_path: &Path) {
18 let state = Arc::new(Mutex::new(DaemonState {
19 process_manager: ProcessManager::new(session),
20 }));
21
22 let listener = UnixListener::bind(socket_path).expect("failed to bind socket");
23
24 loop {
25 let (stream, _) = match listener.accept().await {
26 Ok(conn) => conn,
27 Err(_) => break,
28 };
29
30 let state = Arc::clone(&state);
31 tokio::spawn(async move {
32 let (reader, writer) = stream.into_split();
33 let writer = Arc::new(Mutex::new(writer));
34 let mut lines = BufReader::new(reader).lines();
35
36 while let Ok(Some(line)) = lines.next_line().await {
37 let request: Request = match serde_json::from_str(&line) {
38 Ok(r) => r,
39 Err(e) => {
40 let resp = Response::Error { code: 1, message: format!("invalid request: {}", e) };
41 let _ = send_response(&writer, &resp).await;
42 continue;
43 }
44 };
45
46 if let Request::Logs { follow: true, ref target, all, timeout_secs, lines, .. } = request {
48 let output_rx = state.lock().await.process_manager.output_tx.subscribe();
49 let max_lines = lines;
50 let target_filter = target.clone();
51 let show_all = all;
52
53 handle_follow_stream(
54 &writer, output_rx, target_filter, show_all, timeout_secs, max_lines
55 ).await;
56 continue; }
58
59 let is_shutdown = matches!(request, Request::Shutdown);
60
61 let response = handle_request(&state, request).await;
62 let _ = send_response(&writer, &response).await;
63
64 if is_shutdown {
65 tokio::time::sleep(Duration::from_millis(100)).await;
66 std::process::exit(0);
67 }
68 }
69 });
70 }
71}
72
73async fn handle_follow_stream(
74 writer: &Arc<Mutex<tokio::net::unix::OwnedWriteHalf>>,
75 mut output_rx: broadcast::Receiver<super::log_writer::OutputLine>,
76 target: Option<String>,
77 all: bool,
78 timeout_secs: Option<u64>,
79 max_lines: Option<usize>,
80) {
81 let mut line_count: usize = 0;
82
83 let stream_loop = async {
84 loop {
85 match output_rx.recv().await {
86 Ok(output_line) => {
87 if !all {
88 if let Some(ref t) = target {
89 if output_line.process != *t { continue; }
90 }
91 }
92
93 let resp = Response::LogLine {
94 process: output_line.process,
95 stream: output_line.stream,
96 line: output_line.line,
97 };
98 if send_response(writer, &resp).await.is_err() {
99 return;
100 }
101
102 line_count += 1;
103 if let Some(max) = max_lines {
104 if line_count >= max { return; }
105 }
106 }
107 Err(broadcast::error::RecvError::Lagged(_)) => continue,
108 Err(broadcast::error::RecvError::Closed) => return,
109 }
110 }
111 };
112
113 match timeout_secs {
115 Some(secs) => { let _ = tokio::time::timeout(Duration::from_secs(secs), stream_loop).await; }
116 None => { stream_loop.await; }
117 }
118
119 let _ = send_response(writer, &Response::LogEnd).await;
120}
121
122async fn handle_request(state: &Arc<Mutex<DaemonState>>, request: Request) -> Response {
123 match request {
124 Request::Run { command, name, cwd, env } => {
125 state.lock().await.process_manager.spawn_process(&command, name, cwd.as_deref(), env.as_ref()).await
126 }
127 Request::Stop { target } => {
128 state.lock().await.process_manager.stop_process(&target).await
129 }
130 Request::StopAll => {
131 state.lock().await.process_manager.stop_all().await
132 }
133 Request::Restart { target } => {
134 state.lock().await.process_manager.restart_process(&target).await
135 }
136 Request::Status => {
137 state.lock().await.process_manager.status()
138 }
139 Request::Wait { target, until, regex, exit, timeout_secs } => {
140 {
142 let s = state.lock().await;
143 if !s.process_manager.has_process(&target) {
144 return Response::Error { code: 2, message: format!("process not found: {}", target) };
145 }
146 }
147 let output_rx = state.lock().await.process_manager.output_tx.subscribe();
149 let timeout = Duration::from_secs(timeout_secs.unwrap_or(30));
150 let state_clone = Arc::clone(state);
151 let target_clone = target.clone();
152 wait_engine::wait_for(
153 output_rx,
154 &target,
155 until.as_deref(),
156 regex,
157 exit,
158 timeout,
159 move || {
160 let state = state_clone.clone();
163 let target = target_clone.clone();
164 let result = match state.try_lock() {
166 Ok(mut s) => s.process_manager.is_process_exited(&target),
167 Err(_) => None,
168 };
169 result
170 },
171 ).await
172 }
173 Request::Logs { follow: false, .. } => {
174 Response::Error { code: 1, message: "non-follow logs are read directly from disk by CLI".into() }
176 }
177 Request::Logs { follow: true, .. } => {
178 Response::Error { code: 1, message: "follow requests handled in connection loop".into() }
180 }
181 Request::Shutdown => {
182 state.lock().await.process_manager.stop_all().await;
183 Response::Ok { message: "daemon shutting down".into() }
184 }
185 }
186}
187
188async fn send_response(writer: &Arc<Mutex<tokio::net::unix::OwnedWriteHalf>>, response: &Response) -> std::io::Result<()> {
189 let mut w = writer.lock().await;
190 let mut json = serde_json::to_string(response).unwrap();
191 json.push('\n');
192 w.write_all(json.as_bytes()).await?;
193 w.flush().await
194}