Skip to main content

agent_procs/daemon/
server.rs

1use crate::daemon::wait_engine;
2use crate::protocol::{Request, Response};
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
7use tokio::net::UnixListener;
8use tokio::sync::broadcast;
9use tokio::sync::Mutex;
10
11use super::process_manager::ProcessManager;
12
13pub struct DaemonState {
14    pub process_manager: ProcessManager,
15}
16
17pub async fn run(session: &str, socket_path: &Path) {
18    let state = Arc::new(Mutex::new(DaemonState {
19        process_manager: ProcessManager::new(session),
20    }));
21
22    let listener = UnixListener::bind(socket_path).expect("failed to bind socket");
23
24    loop {
25        let (stream, _) = match listener.accept().await {
26            Ok(conn) => conn,
27            Err(_) => break,
28        };
29
30        let state = Arc::clone(&state);
31        tokio::spawn(async move {
32            let (reader, writer) = stream.into_split();
33            let writer = Arc::new(Mutex::new(writer));
34            let mut lines = BufReader::new(reader).lines();
35
36            while let Ok(Some(line)) = lines.next_line().await {
37                let request: Request = match serde_json::from_str(&line) {
38                    Ok(r) => r,
39                    Err(e) => {
40                        let resp = Response::Error { code: 1, message: format!("invalid request: {}", e) };
41                        let _ = send_response(&writer, &resp).await;
42                        continue;
43                    }
44                };
45
46                // Handle follow requests with streaming (before handle_request)
47                if let Request::Logs { follow: true, ref target, all, timeout_secs, lines, .. } = request {
48                    let output_rx = state.lock().await.process_manager.output_tx.subscribe();
49                    let max_lines = lines;
50                    let target_filter = target.clone();
51                    let show_all = all;
52
53                    handle_follow_stream(
54                        &writer, output_rx, target_filter, show_all, timeout_secs, max_lines
55                    ).await;
56                    continue; // Don't call handle_request
57                }
58
59                let is_shutdown = matches!(request, Request::Shutdown);
60
61                let response = handle_request(&state, request).await;
62                let _ = send_response(&writer, &response).await;
63
64                if is_shutdown {
65                    tokio::time::sleep(Duration::from_millis(100)).await;
66                    std::process::exit(0);
67                }
68            }
69        });
70    }
71}
72
73async fn handle_follow_stream(
74    writer: &Arc<Mutex<tokio::net::unix::OwnedWriteHalf>>,
75    mut output_rx: broadcast::Receiver<super::log_writer::OutputLine>,
76    target: Option<String>,
77    all: bool,
78    timeout_secs: Option<u64>,
79    max_lines: Option<usize>,
80) {
81    let mut line_count: usize = 0;
82
83    let stream_loop = async {
84        loop {
85            match output_rx.recv().await {
86                Ok(output_line) => {
87                    if !all {
88                        if let Some(ref t) = target {
89                            if output_line.process != *t { continue; }
90                        }
91                    }
92
93                    let resp = Response::LogLine {
94                        process: output_line.process,
95                        stream: output_line.stream,
96                        line: output_line.line,
97                    };
98                    if send_response(writer, &resp).await.is_err() {
99                        return;
100                    }
101
102                    line_count += 1;
103                    if let Some(max) = max_lines {
104                        if line_count >= max { return; }
105                    }
106                }
107                Err(broadcast::error::RecvError::Lagged(_)) => continue,
108                Err(broadcast::error::RecvError::Closed) => return,
109            }
110        }
111    };
112
113    // Apply timeout only if specified; otherwise stream indefinitely
114    match timeout_secs {
115        Some(secs) => { let _ = tokio::time::timeout(Duration::from_secs(secs), stream_loop).await; }
116        None => { stream_loop.await; }
117    }
118
119    let _ = send_response(writer, &Response::LogEnd).await;
120}
121
122async fn handle_request(state: &Arc<Mutex<DaemonState>>, request: Request) -> Response {
123    match request {
124        Request::Run { command, name, cwd, env } => {
125            state.lock().await.process_manager.spawn_process(&command, name, cwd.as_deref(), env.as_ref()).await
126        }
127        Request::Stop { target } => {
128            state.lock().await.process_manager.stop_process(&target).await
129        }
130        Request::StopAll => {
131            state.lock().await.process_manager.stop_all().await
132        }
133        Request::Restart { target } => {
134            state.lock().await.process_manager.restart_process(&target).await
135        }
136        Request::Status => {
137            state.lock().await.process_manager.status()
138        }
139        Request::Wait { target, until, regex, exit, timeout_secs } => {
140            // Check process exists
141            {
142                let s = state.lock().await;
143                if !s.process_manager.has_process(&target) {
144                    return Response::Error { code: 2, message: format!("process not found: {}", target) };
145                }
146            }
147            // Subscribe to output and delegate to wait engine
148            let output_rx = state.lock().await.process_manager.output_tx.subscribe();
149            let timeout = Duration::from_secs(timeout_secs.unwrap_or(30));
150            let state_clone = Arc::clone(state);
151            let target_clone = target.clone();
152            wait_engine::wait_for(
153                output_rx,
154                &target,
155                until.as_deref(),
156                regex,
157                exit,
158                timeout,
159                move || {
160                    // This is called synchronously from the wait loop
161                    // We can't hold the lock across the whole wait, so we check briefly
162                    let state = state_clone.clone();
163                    let target = target_clone.clone();
164                    // Use try_lock to avoid deadlock
165                    let result = match state.try_lock() {
166                        Ok(mut s) => s.process_manager.is_process_exited(&target),
167                        Err(_) => None,
168                    };
169                    result
170                },
171            ).await
172        }
173        Request::Logs { follow: false, .. } => {
174            // Non-follow logs are read directly from files by the CLI — no daemon involvement needed
175            Response::Error { code: 1, message: "non-follow logs are read directly from disk by CLI".into() }
176        }
177        Request::Logs { follow: true, .. } => {
178            // Handled separately in connection loop (needs streaming)
179            Response::Error { code: 1, message: "follow requests handled in connection loop".into() }
180        }
181        Request::Shutdown => {
182            state.lock().await.process_manager.stop_all().await;
183            Response::Ok { message: "daemon shutting down".into() }
184        }
185    }
186}
187
188async fn send_response(writer: &Arc<Mutex<tokio::net::unix::OwnedWriteHalf>>, response: &Response) -> std::io::Result<()> {
189    let mut w = writer.lock().await;
190    let mut json = serde_json::to_string(response).unwrap();
191    json.push('\n');
192    w.write_all(json.as_bytes()).await?;
193    w.flush().await
194}