syspulse_core/ipc/
server.rs1use std::path::PathBuf;
2use std::sync::Arc;
3
4use tokio::io::AsyncWriteExt;
5use tracing::{error, info, warn};
6
7use crate::error::{Result, SyspulseError};
8use crate::ipc::protocol::{read_message, write_message, Request, Response};
9
10#[cfg(unix)]
11use interprocess::local_socket::GenericNamespaced as NameType;
12#[cfg(windows)]
13use interprocess::local_socket::GenericNamespaced as NameType;
14
15use interprocess::local_socket::{tokio::prelude::*, traits::tokio::Listener, ListenerOptions};
16
17pub struct IpcServer {
18 socket_path: PathBuf,
19}
20
21impl IpcServer {
22 pub fn new(socket_path: PathBuf) -> Self {
23 Self { socket_path }
24 }
25
26 pub async fn run<F, Fut>(
32 &self,
33 handler: Arc<F>,
34 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
35 ) -> Result<()>
36 where
37 F: Fn(Request) -> Fut + Send + Sync + 'static,
38 Fut: std::future::Future<Output = Response> + Send,
39 {
40 #[cfg(unix)]
42 {
43 if self.socket_path.exists() {
44 std::fs::remove_file(&self.socket_path).ok();
45 }
46 }
47
48 let name = self.socket_name()?;
49 let listener = ListenerOptions::new()
50 .name(name)
51 .create_tokio()
52 .map_err(|e| SyspulseError::Ipc(format!("Failed to create listener: {}", e)))?;
53
54 info!("IPC server listening on {:?}", self.socket_path);
55
56 loop {
57 tokio::select! {
58 accept_result = listener.accept() => {
59 match accept_result {
60 Ok(stream) => {
61 let handler = Arc::clone(&handler);
62 tokio::spawn(async move {
63 if let Err(e) = handle_connection(stream, handler).await {
64 warn!("IPC connection error: {}", e);
65 }
66 });
67 }
68 Err(e) => {
69 error!("Failed to accept IPC connection: {}", e);
70 }
71 }
72 }
73 _ = shutdown_rx.recv() => {
74 info!("IPC server shutting down");
75 break;
76 }
77 }
78 }
79
80 #[cfg(unix)]
82 {
83 std::fs::remove_file(&self.socket_path).ok();
84 }
85
86 Ok(())
87 }
88
89 fn socket_name(&self) -> Result<interprocess::local_socket::Name<'_>> {
90 #[cfg(unix)]
91 {
92 let path_str = self
93 .socket_path
94 .to_str()
95 .ok_or_else(|| SyspulseError::Ipc("Invalid socket path".into()))?;
96 path_str
97 .to_ns_name::<NameType>()
98 .map_err(|e| SyspulseError::Ipc(format!("Invalid socket name: {}", e)))
99 }
100 #[cfg(windows)]
101 {
102 let name_str = self
105 .socket_path
106 .file_name()
107 .and_then(|n| n.to_str())
108 .unwrap_or("syspulse");
109 name_str
110 .to_ns_name::<NameType>()
111 .map_err(|e| SyspulseError::Ipc(format!("Invalid pipe name: {}", e)))
112 }
113 }
114}
115
116async fn handle_connection<F, Fut>(
117 stream: impl tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
118 handler: Arc<F>,
119) -> Result<()>
120where
121 F: Fn(Request) -> Fut + Send + Sync + 'static,
122 Fut: std::future::Future<Output = Response> + Send,
123{
124 let (mut reader, mut writer) = tokio::io::split(stream);
125
126 loop {
128 let request: Option<Request> = read_message(&mut reader).await?;
129 let request = match request {
130 Some(r) => r,
131 None => break, };
133
134 let is_shutdown = matches!(request, Request::Shutdown);
135 let response = handler(request).await;
136 write_message(&mut writer, &response).await?;
137 writer.flush().await?;
138
139 if is_shutdown {
140 break;
141 }
142 }
143
144 Ok(())
145}