synwire_daemon/
lifecycle.rs1use std::path::Path;
15use std::sync::Arc;
16use std::sync::atomic::{AtomicUsize, Ordering};
17
18use tokio::sync::Notify;
19use tracing::{error, info, warn};
20
/// Idle grace period: how long the daemon waits with zero connected clients
/// before shutting itself down (300 seconds, i.e. five minutes).
pub const GRACE_PERIOD: std::time::Duration = std::time::Duration::from_secs(300);
24
25pub async fn run_daemon(sock_path: &Path, pid_path: &Path) -> i32 {
33 remove_stale_socket(sock_path);
35
36 let Some(listener) = bind_listener(sock_path) else {
37 cleanup(pid_path, sock_path);
38 return 1;
39 };
40
41 info!(path = %sock_path.display(), "listening on Unix domain socket");
42
43 let client_count = Arc::new(AtomicUsize::new(0));
44 let client_changed = Arc::new(Notify::new());
45
46 let accept_clients = client_count.clone();
48 let accept_notify = client_changed.clone();
49 let accept_handle = tokio::spawn(async move {
50 accept_loop(listener, accept_clients, accept_notify).await;
51 });
52
53 let reason = shutdown_monitor(client_count, client_changed).await;
55 info!(reason = %reason, "initiating shutdown");
56
57 accept_handle.abort();
59 let _ = accept_handle.await;
60
61 0
62}
63
64#[cfg(unix)]
68async fn accept_loop(
69 listener: tokio::net::UnixListener,
70 client_count: Arc<AtomicUsize>,
71 client_changed: Arc<Notify>,
72) {
73 loop {
74 let (stream, _addr) = match listener.accept().await {
75 Ok(pair) => pair,
76 Err(e) => {
77 warn!("Failed to accept connection: {e}");
78 continue;
79 }
80 };
81
82 let count = client_count.clone();
83 let notify = client_changed.clone();
84
85 let prev = count.fetch_add(1, Ordering::SeqCst);
86 info!(clients = prev + 1, "client connected");
87 notify.notify_waiters();
88
89 let _handle: tokio::task::JoinHandle<()> = tokio::spawn(async move {
92 handle_client(stream).await;
93
94 let prev = count.fetch_sub(1, Ordering::SeqCst);
95 info!(clients = prev - 1, "client disconnected");
96 notify.notify_waiters();
97 });
98 }
99}
100
101#[cfg(unix)]
104async fn handle_client(stream: tokio::net::UnixStream) {
105 let _ = stream.readable().await;
107 let mut buf = [0u8; 1024];
109 loop {
110 match stream.try_read(&mut buf) {
111 Ok(0) | Err(_) => break,
113 Ok(_) => {
114 let _ = stream.readable().await;
116 }
117 }
118 }
119}
120
/// Stand-in accept loop for targets without Unix domain sockets.
///
/// There is no listener to accept from, so this future never completes; all
/// arguments are unused.
#[cfg(not(unix))]
async fn accept_loop(_listener: (), _client_count: Arc<AtomicUsize>, _client_changed: Arc<Notify>) {
    let never = std::future::pending::<()>();
    never.await;
}
127
128async fn shutdown_monitor(
135 client_count: Arc<AtomicUsize>,
136 client_changed: Arc<Notify>,
137) -> &'static str {
138 loop {
140 let clients = client_count.load(Ordering::SeqCst);
141
142 if clients > 0 {
143 tokio::select! {
146 () = shutdown_signal() => return "received shutdown signal",
147 () = client_changed.notified() => {}
148 }
149 } else {
150 info!(
152 grace_secs = GRACE_PERIOD.as_secs(),
153 "no active clients, grace period started"
154 );
155
156 tokio::select! {
157 () = shutdown_signal() => return "received shutdown signal",
158 () = tokio::time::sleep(GRACE_PERIOD) => {
159 if client_count.load(Ordering::SeqCst) == 0 {
162 return "grace period expired with no clients";
163 }
164 }
166 () = client_changed.notified() => {}
167 }
168 }
169 }
170}
171
172async fn shutdown_signal() {
174 #[cfg(unix)]
175 {
176 use tokio::signal::unix::{SignalKind, signal};
177 let mut sigterm = signal(SignalKind::terminate()).unwrap_or_else(|e| {
178 error!("Failed to register SIGTERM handler: {e}");
180 std::process::exit(1);
181 });
182 let mut sigint = signal(SignalKind::interrupt()).unwrap_or_else(|e| {
183 error!("Failed to register SIGINT handler: {e}");
184 std::process::exit(1);
185 });
186 tokio::select! {
187 () = async { let _ = sigterm.recv().await; } => {}
188 () = async { let _ = sigint.recv().await; } => {}
189 }
190 }
191
192 #[cfg(not(unix))]
193 {
194 let _ = tokio::signal::ctrl_c().await;
195 }
196}
197
198pub fn check_existing_daemon(pid_path: &Path) -> Result<(), String> {
205 let contents = match std::fs::read_to_string(pid_path) {
206 Ok(c) => c,
207 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
208 Err(e) => {
209 return Err(format!(
210 "Unable to read PID file {}: {e}",
211 pid_path.display()
212 ));
213 }
214 };
215
216 let Ok(pid) = contents.trim().parse::<u32>() else {
217 warn!(
218 path = %pid_path.display(),
219 contents = %contents.trim(),
220 "Removing PID file with invalid contents"
221 );
222 let _ = std::fs::remove_file(pid_path);
223 return Ok(());
224 };
225
226 if is_process_alive(pid) {
227 return Err(format!(
228 "Another synwire-daemon is already running (PID {pid})"
229 ));
230 }
231
232 warn!(pid, "Removing stale PID file for dead process");
234 let _ = std::fs::remove_file(pid_path);
235 Ok(())
236}
237
/// Writes the current process ID, followed by a newline, to `pid_path`.
///
/// The file is created if missing and its previous contents are replaced.
///
/// # Errors
///
/// Returns the underlying I/O error if the file cannot be written.
pub fn write_pid_file(pid_path: &Path) -> std::io::Result<()> {
    let mut record = std::process::id().to_string();
    record.push('\n');
    std::fs::write(pid_path, record)
}
242
243#[cfg(unix)]
245fn is_process_alive(pid: u32) -> bool {
246 nix::sys::signal::kill(
248 nix::unistd::Pid::from_raw(i32::try_from(pid).unwrap_or(0)),
249 None,
250 )
251 .is_ok()
252}
253
/// Fallback liveness check for non-Unix targets.
///
/// No probe is performed on these platforms, so every PID is reported as
/// alive.
#[cfg(not(unix))]
fn is_process_alive(_pid: u32) -> bool {
    true
}
261
262fn remove_stale_socket(sock_path: &Path) {
266 if sock_path.exists() {
267 warn!(path = %sock_path.display(), "Removing stale daemon socket");
268 let _ = std::fs::remove_file(sock_path);
269 }
270}
271
272#[cfg(unix)]
274fn bind_listener(sock_path: &Path) -> Option<tokio::net::UnixListener> {
275 match tokio::net::UnixListener::bind(sock_path) {
276 Ok(l) => {
277 use std::os::unix::fs::PermissionsExt;
279 let perms = std::fs::Permissions::from_mode(0o600);
280 if let Err(e) = std::fs::set_permissions(sock_path, perms) {
281 warn!(
282 path = %sock_path.display(),
283 "Failed to restrict socket permissions: {e}"
284 );
285 }
286 Some(l)
287 }
288 Err(e) => {
289 error!(path = %sock_path.display(), "Failed to bind Unix socket: {e}");
290 None
291 }
292 }
293}
294
/// Stand-in for listener creation on targets without Unix domain sockets.
///
/// Logs a warning naming `sock_path` and always returns `Some(())`, so the
/// daemon runs without an IPC listener.
#[cfg(not(unix))]
fn bind_listener(sock_path: &Path) -> Option<()> {
    let shown = sock_path.display();
    warn!(
        path = %shown,
        "Unix domain sockets are not supported on this platform; running without IPC listener"
    );
    Some(())
}
304
305pub fn cleanup(pid_path: &Path, sock_path: &Path) {
309 if let Err(e) = std::fs::remove_file(pid_path)
310 && e.kind() != std::io::ErrorKind::NotFound
311 {
312 warn!(path = %pid_path.display(), "Failed to remove PID file: {e}");
313 }
314 if let Err(e) = std::fs::remove_file(sock_path)
315 && e.kind() != std::io::ErrorKind::NotFound
316 {
317 warn!(path = %sock_path.display(), "Failed to remove socket: {e}");
318 }
319}