Skip to main content

agent_procs/daemon/
server.rs

1use crate::daemon::wait_engine;
2use crate::protocol::{Request, Response};
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
7use tokio::net::UnixListener;
8use tokio::sync::broadcast;
9use tokio::sync::Mutex;
10
11use super::process_manager::ProcessManager;
12
/// Shared mutable daemon state. Always accessed through an async mutex
/// (`Arc<Mutex<DaemonState>>`) held briefly per request — see `run` and
/// `handle_request`.
pub struct DaemonState {
    // Owns and supervises the child processes for this daemon session.
    pub process_manager: ProcessManager,
}
16
17pub async fn run(session: &str, socket_path: &Path) {
18    let state = Arc::new(Mutex::new(DaemonState {
19        process_manager: ProcessManager::new(session),
20    }));
21
22    let listener = match UnixListener::bind(socket_path) {
23        Ok(l) => l,
24        Err(e) => {
25            eprintln!("fatal: failed to bind socket {:?}: {}", socket_path, e);
26            return;
27        }
28    };
29
30    // Shutdown signal: set to true when a Shutdown request is handled
31    let shutdown = Arc::new(tokio::sync::Notify::new());
32
33    loop {
34        let (stream, _) = tokio::select! {
35            result = listener.accept() => match result {
36                Ok(conn) => conn,
37                Err(e) => {
38                    eprintln!("warning: accept error: {}", e);
39                    continue;
40                }
41            },
42            _ = shutdown.notified() => break,
43        };
44
45        let state = Arc::clone(&state);
46        let shutdown = Arc::clone(&shutdown);
47        tokio::spawn(async move {
48            let (reader, writer) = stream.into_split();
49            let writer = Arc::new(Mutex::new(writer));
50            let mut lines = BufReader::new(reader).lines();
51
52            while let Ok(Some(line)) = lines.next_line().await {
53                let request: Request = match serde_json::from_str(&line) {
54                    Ok(r) => r,
55                    Err(e) => {
56                        let resp = Response::Error {
57                            code: 1,
58                            message: format!("invalid request: {}", e),
59                        };
60                        let _ = send_response(&writer, &resp).await;
61                        continue;
62                    }
63                };
64
65                // Handle follow requests with streaming (before handle_request)
66                if let Request::Logs {
67                    follow: true,
68                    ref target,
69                    all,
70                    timeout_secs,
71                    lines,
72                    ..
73                } = request
74                {
75                    let output_rx = state.lock().await.process_manager.output_tx.subscribe();
76                    let max_lines = lines;
77                    let target_filter = target.clone();
78                    let show_all = all;
79
80                    handle_follow_stream(
81                        &writer,
82                        output_rx,
83                        target_filter,
84                        show_all,
85                        timeout_secs,
86                        max_lines,
87                    )
88                    .await;
89                    continue; // Don't call handle_request
90                }
91
92                let is_shutdown = matches!(request, Request::Shutdown);
93
94                let response = handle_request(&state, request).await;
95                let _ = send_response(&writer, &response).await;
96
97                if is_shutdown {
98                    shutdown.notify_one();
99                    return;
100                }
101            }
102        });
103    }
104}
105
106async fn handle_follow_stream(
107    writer: &Arc<Mutex<tokio::net::unix::OwnedWriteHalf>>,
108    mut output_rx: broadcast::Receiver<super::log_writer::OutputLine>,
109    target: Option<String>,
110    all: bool,
111    timeout_secs: Option<u64>,
112    max_lines: Option<usize>,
113) {
114    let mut line_count: usize = 0;
115
116    let stream_loop = async {
117        loop {
118            match output_rx.recv().await {
119                Ok(output_line) => {
120                    if !all {
121                        if let Some(ref t) = target {
122                            if output_line.process != *t {
123                                continue;
124                            }
125                        }
126                    }
127
128                    let resp = Response::LogLine {
129                        process: output_line.process,
130                        stream: output_line.stream,
131                        line: output_line.line,
132                    };
133                    if send_response(writer, &resp).await.is_err() {
134                        return;
135                    }
136
137                    line_count += 1;
138                    if let Some(max) = max_lines {
139                        if line_count >= max {
140                            return;
141                        }
142                    }
143                }
144                Err(broadcast::error::RecvError::Lagged(_)) => continue,
145                Err(broadcast::error::RecvError::Closed) => return,
146            }
147        }
148    };
149
150    // Apply timeout only if specified; otherwise stream indefinitely
151    match timeout_secs {
152        Some(secs) => {
153            let _ = tokio::time::timeout(Duration::from_secs(secs), stream_loop).await;
154        }
155        None => {
156            stream_loop.await;
157        }
158    }
159
160    let _ = send_response(writer, &Response::LogEnd).await;
161}
162
163async fn handle_request(state: &Arc<Mutex<DaemonState>>, request: Request) -> Response {
164    match request {
165        Request::Run {
166            command,
167            name,
168            cwd,
169            env,
170        } => {
171            state
172                .lock()
173                .await
174                .process_manager
175                .spawn_process(&command, name, cwd.as_deref(), env.as_ref())
176                .await
177        }
178        Request::Stop { target } => {
179            state
180                .lock()
181                .await
182                .process_manager
183                .stop_process(&target)
184                .await
185        }
186        Request::StopAll => state.lock().await.process_manager.stop_all().await,
187        Request::Restart { target } => {
188            state
189                .lock()
190                .await
191                .process_manager
192                .restart_process(&target)
193                .await
194        }
195        Request::Status => state.lock().await.process_manager.status(),
196        Request::Wait {
197            target,
198            until,
199            regex,
200            exit,
201            timeout_secs,
202        } => {
203            // Check process exists
204            {
205                let s = state.lock().await;
206                if !s.process_manager.has_process(&target) {
207                    return Response::Error {
208                        code: 2,
209                        message: format!("process not found: {}", target),
210                    };
211                }
212            }
213            // Subscribe to output and delegate to wait engine
214            let output_rx = state.lock().await.process_manager.output_tx.subscribe();
215            let timeout = Duration::from_secs(timeout_secs.unwrap_or(30));
216            let state_clone = Arc::clone(state);
217            let target_clone = target.clone();
218            wait_engine::wait_for(
219                output_rx,
220                &target,
221                until.as_deref(),
222                regex,
223                exit,
224                timeout,
225                move || {
226                    // This is called synchronously from the wait loop
227                    // We can't hold the lock across the whole wait, so we check briefly
228                    let state = state_clone.clone();
229                    let target = target_clone.clone();
230                    // Use try_lock to avoid deadlock
231                    let result = match state.try_lock() {
232                        Ok(mut s) => s.process_manager.is_process_exited(&target),
233                        Err(_) => None,
234                    };
235                    result
236                },
237            )
238            .await
239        }
240        Request::Logs { follow: false, .. } => {
241            // Non-follow logs are read directly from files by the CLI — no daemon involvement needed
242            Response::Error {
243                code: 1,
244                message: "non-follow logs are read directly from disk by CLI".into(),
245            }
246        }
247        Request::Logs { follow: true, .. } => {
248            // Handled separately in connection loop (needs streaming)
249            Response::Error {
250                code: 1,
251                message: "follow requests handled in connection loop".into(),
252            }
253        }
254        Request::Shutdown => {
255            state.lock().await.process_manager.stop_all().await;
256            Response::Ok {
257                message: "daemon shutting down".into(),
258            }
259        }
260    }
261}
262
263async fn send_response(
264    writer: &Arc<Mutex<tokio::net::unix::OwnedWriteHalf>>,
265    response: &Response,
266) -> std::io::Result<()> {
267    let mut w = writer.lock().await;
268    let mut json = serde_json::to_string(response)
269        .expect("Response serialization should never fail for well-typed enums");
270    json.push('\n');
271    w.write_all(json.as_bytes()).await?;
272    w.flush().await
273}