Skip to main content

syspulse_core/ipc/
server.rs

1use std::path::PathBuf;
2use std::sync::Arc;
3
4use tokio::io::AsyncWriteExt;
5use tracing::{error, info, warn};
6
7use crate::error::{Result, SyspulseError};
8use crate::ipc::protocol::{read_message, write_message, Request, Response};
9
10#[cfg(unix)]
11use interprocess::local_socket::GenericNamespaced as NameType;
12#[cfg(windows)]
13use interprocess::local_socket::GenericNamespaced as NameType;
14
15use interprocess::local_socket::{tokio::prelude::*, traits::tokio::Listener, ListenerOptions};
16
/// Local-socket IPC server bound to a single socket path.
///
/// The value only stores the path; the listener itself is created when the
/// server is run.
#[derive(Debug)]
pub struct IpcServer {
    /// Path (Unix) or pipe path (Windows) the listener name is derived from.
    socket_path: PathBuf,
}
20
21impl IpcServer {
22    pub fn new(socket_path: PathBuf) -> Self {
23        Self { socket_path }
24    }
25
26    /// Run the IPC server, dispatching each request to the given handler.
27    ///
28    /// The handler receives a `Request` and returns a `Response`. The server
29    /// keeps running until the handler returns a response indicating shutdown
30    /// or `shutdown_rx` fires.
31    pub async fn run<F, Fut>(
32        &self,
33        handler: Arc<F>,
34        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
35    ) -> Result<()>
36    where
37        F: Fn(Request) -> Fut + Send + Sync + 'static,
38        Fut: std::future::Future<Output = Response> + Send,
39    {
40        // On Unix, remove stale socket file if it exists.
41        #[cfg(unix)]
42        {
43            if self.socket_path.exists() {
44                std::fs::remove_file(&self.socket_path).ok();
45            }
46        }
47
48        let name = self.socket_name()?;
49        let listener = ListenerOptions::new()
50            .name(name)
51            .create_tokio()
52            .map_err(|e| SyspulseError::Ipc(format!("Failed to create listener: {}", e)))?;
53
54        info!("IPC server listening on {:?}", self.socket_path);
55
56        loop {
57            tokio::select! {
58                accept_result = listener.accept() => {
59                    match accept_result {
60                        Ok(stream) => {
61                            let handler = Arc::clone(&handler);
62                            tokio::spawn(async move {
63                                if let Err(e) = handle_connection(stream, handler).await {
64                                    warn!("IPC connection error: {}", e);
65                                }
66                            });
67                        }
68                        Err(e) => {
69                            error!("Failed to accept IPC connection: {}", e);
70                        }
71                    }
72                }
73                _ = shutdown_rx.recv() => {
74                    info!("IPC server shutting down");
75                    break;
76                }
77            }
78        }
79
80        // Cleanup socket file on Unix
81        #[cfg(unix)]
82        {
83            std::fs::remove_file(&self.socket_path).ok();
84        }
85
86        Ok(())
87    }
88
89    fn socket_name(&self) -> Result<interprocess::local_socket::Name<'_>> {
90        #[cfg(unix)]
91        {
92            let path_str = self
93                .socket_path
94                .to_str()
95                .ok_or_else(|| SyspulseError::Ipc("Invalid socket path".into()))?;
96            path_str
97                .to_ns_name::<NameType>()
98                .map_err(|e| SyspulseError::Ipc(format!("Invalid socket name: {}", e)))
99        }
100        #[cfg(windows)]
101        {
102            // On Windows, use a named pipe. The socket_path is something like
103            // \\.\pipe\syspulse, but interprocess expects just the name part.
104            let name_str = self
105                .socket_path
106                .file_name()
107                .and_then(|n| n.to_str())
108                .unwrap_or("syspulse");
109            name_str
110                .to_ns_name::<NameType>()
111                .map_err(|e| SyspulseError::Ipc(format!("Invalid pipe name: {}", e)))
112        }
113    }
114}
115
116async fn handle_connection<F, Fut>(
117    stream: impl tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
118    handler: Arc<F>,
119) -> Result<()>
120where
121    F: Fn(Request) -> Fut + Send + Sync + 'static,
122    Fut: std::future::Future<Output = Response> + Send,
123{
124    let (mut reader, mut writer) = tokio::io::split(stream);
125
126    // Handle multiple requests per connection until the client disconnects.
127    loop {
128        let request: Option<Request> = read_message(&mut reader).await?;
129        let request = match request {
130            Some(r) => r,
131            None => break, // Client disconnected
132        };
133
134        let is_shutdown = matches!(request, Request::Shutdown);
135        let response = handler(request).await;
136        write_message(&mut writer, &response).await?;
137        writer.flush().await?;
138
139        if is_shutdown {
140            break;
141        }
142    }
143
144    Ok(())
145}