Skip to main content

agent_procs/daemon/
server.rs

1use crate::daemon::wait_engine;
2use crate::protocol::{Request, Response};
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
7use tokio::net::UnixListener;
8use tokio::sync::broadcast;
9use tokio::sync::Mutex;
10
11use super::process_manager::ProcessManager;
12
13pub struct DaemonState {
14    pub process_manager: ProcessManager,
15}
16
17pub async fn run(session: &str, socket_path: &Path) {
18    let state = Arc::new(Mutex::new(DaemonState {
19        process_manager: ProcessManager::new(session),
20    }));
21
22    let listener = UnixListener::bind(socket_path).expect("failed to bind socket");
23
24    loop {
25        let (stream, _) = match listener.accept().await {
26            Ok(conn) => conn,
27            Err(_) => break,
28        };
29
30        let state = Arc::clone(&state);
31        tokio::spawn(async move {
32            let (reader, writer) = stream.into_split();
33            let writer = Arc::new(Mutex::new(writer));
34            let mut lines = BufReader::new(reader).lines();
35
36            while let Ok(Some(line)) = lines.next_line().await {
37                let request: Request = match serde_json::from_str(&line) {
38                    Ok(r) => r,
39                    Err(e) => {
40                        let resp = Response::Error {
41                            code: 1,
42                            message: format!("invalid request: {}", e),
43                        };
44                        let _ = send_response(&writer, &resp).await;
45                        continue;
46                    }
47                };
48
49                // Handle follow requests with streaming (before handle_request)
50                if let Request::Logs {
51                    follow: true,
52                    ref target,
53                    all,
54                    timeout_secs,
55                    lines,
56                    ..
57                } = request
58                {
59                    let output_rx = state.lock().await.process_manager.output_tx.subscribe();
60                    let max_lines = lines;
61                    let target_filter = target.clone();
62                    let show_all = all;
63
64                    handle_follow_stream(
65                        &writer,
66                        output_rx,
67                        target_filter,
68                        show_all,
69                        timeout_secs,
70                        max_lines,
71                    )
72                    .await;
73                    continue; // Don't call handle_request
74                }
75
76                let is_shutdown = matches!(request, Request::Shutdown);
77
78                let response = handle_request(&state, request).await;
79                let _ = send_response(&writer, &response).await;
80
81                if is_shutdown {
82                    tokio::time::sleep(Duration::from_millis(100)).await;
83                    std::process::exit(0);
84                }
85            }
86        });
87    }
88}
89
90async fn handle_follow_stream(
91    writer: &Arc<Mutex<tokio::net::unix::OwnedWriteHalf>>,
92    mut output_rx: broadcast::Receiver<super::log_writer::OutputLine>,
93    target: Option<String>,
94    all: bool,
95    timeout_secs: Option<u64>,
96    max_lines: Option<usize>,
97) {
98    let mut line_count: usize = 0;
99
100    let stream_loop = async {
101        loop {
102            match output_rx.recv().await {
103                Ok(output_line) => {
104                    if !all {
105                        if let Some(ref t) = target {
106                            if output_line.process != *t {
107                                continue;
108                            }
109                        }
110                    }
111
112                    let resp = Response::LogLine {
113                        process: output_line.process,
114                        stream: output_line.stream,
115                        line: output_line.line,
116                    };
117                    if send_response(writer, &resp).await.is_err() {
118                        return;
119                    }
120
121                    line_count += 1;
122                    if let Some(max) = max_lines {
123                        if line_count >= max {
124                            return;
125                        }
126                    }
127                }
128                Err(broadcast::error::RecvError::Lagged(_)) => continue,
129                Err(broadcast::error::RecvError::Closed) => return,
130            }
131        }
132    };
133
134    // Apply timeout only if specified; otherwise stream indefinitely
135    match timeout_secs {
136        Some(secs) => {
137            let _ = tokio::time::timeout(Duration::from_secs(secs), stream_loop).await;
138        }
139        None => {
140            stream_loop.await;
141        }
142    }
143
144    let _ = send_response(writer, &Response::LogEnd).await;
145}
146
147async fn handle_request(state: &Arc<Mutex<DaemonState>>, request: Request) -> Response {
148    match request {
149        Request::Run {
150            command,
151            name,
152            cwd,
153            env,
154        } => {
155            state
156                .lock()
157                .await
158                .process_manager
159                .spawn_process(&command, name, cwd.as_deref(), env.as_ref())
160                .await
161        }
162        Request::Stop { target } => {
163            state
164                .lock()
165                .await
166                .process_manager
167                .stop_process(&target)
168                .await
169        }
170        Request::StopAll => state.lock().await.process_manager.stop_all().await,
171        Request::Restart { target } => {
172            state
173                .lock()
174                .await
175                .process_manager
176                .restart_process(&target)
177                .await
178        }
179        Request::Status => state.lock().await.process_manager.status(),
180        Request::Wait {
181            target,
182            until,
183            regex,
184            exit,
185            timeout_secs,
186        } => {
187            // Check process exists
188            {
189                let s = state.lock().await;
190                if !s.process_manager.has_process(&target) {
191                    return Response::Error {
192                        code: 2,
193                        message: format!("process not found: {}", target),
194                    };
195                }
196            }
197            // Subscribe to output and delegate to wait engine
198            let output_rx = state.lock().await.process_manager.output_tx.subscribe();
199            let timeout = Duration::from_secs(timeout_secs.unwrap_or(30));
200            let state_clone = Arc::clone(state);
201            let target_clone = target.clone();
202            wait_engine::wait_for(
203                output_rx,
204                &target,
205                until.as_deref(),
206                regex,
207                exit,
208                timeout,
209                move || {
210                    // This is called synchronously from the wait loop
211                    // We can't hold the lock across the whole wait, so we check briefly
212                    let state = state_clone.clone();
213                    let target = target_clone.clone();
214                    // Use try_lock to avoid deadlock
215                    let result = match state.try_lock() {
216                        Ok(mut s) => s.process_manager.is_process_exited(&target),
217                        Err(_) => None,
218                    };
219                    result
220                },
221            )
222            .await
223        }
224        Request::Logs { follow: false, .. } => {
225            // Non-follow logs are read directly from files by the CLI — no daemon involvement needed
226            Response::Error {
227                code: 1,
228                message: "non-follow logs are read directly from disk by CLI".into(),
229            }
230        }
231        Request::Logs { follow: true, .. } => {
232            // Handled separately in connection loop (needs streaming)
233            Response::Error {
234                code: 1,
235                message: "follow requests handled in connection loop".into(),
236            }
237        }
238        Request::Shutdown => {
239            state.lock().await.process_manager.stop_all().await;
240            Response::Ok {
241                message: "daemon shutting down".into(),
242            }
243        }
244    }
245}
246
247async fn send_response(
248    writer: &Arc<Mutex<tokio::net::unix::OwnedWriteHalf>>,
249    response: &Response,
250) -> std::io::Result<()> {
251    let mut w = writer.lock().await;
252    let mut json = serde_json::to_string(response).unwrap();
253    json.push('\n');
254    w.write_all(json.as_bytes()).await?;
255    w.flush().await
256}