sidedns_core/ipc/
server.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use interprocess::local_socket::{GenericFilePath, ListenerOptions, ToFsName, tokio::prelude::*};
5use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
6use tokio::sync::broadcast;
7use tokio_util::sync::CancellationToken;
8
9use super::message::{DnsEvent, IpcRequest, IpcResponse};
10use anyhow::Result;
11
12#[async_trait]
18pub trait IpcHandler: Send + Sync + 'static {
19 async fn handle(&self, request: IpcRequest) -> IpcResponse;
21
22 fn subscribe_events(&self) -> broadcast::Receiver<DnsEvent>;
27}
28
/// IPC server that accepts client connections on a local socket.
///
/// `Debug` is derived so the server can be logged or embedded in other
/// `Debug` types.
#[derive(Debug)]
pub struct IpcServer {
    /// Name of the local socket, interpreted as a filesystem path when the
    /// listener is created.
    socket_path: String,
}
36
37impl Default for IpcServer {
38 fn default() -> Self {
39 Self {
40 socket_path: crate::IPC_SOCKET_PATH.to_string(),
41 }
42 }
43}
44
45impl Drop for IpcServer {
46 fn drop(&mut self) {
47 #[cfg(not(windows))]
48 if std::path::Path::new(&self.socket_path).exists() {
49 if let Err(e) = std::fs::remove_file(&self.socket_path) {
50 tracing::error!("Failed to remove IpcServer socket file: {e}");
51 } else {
52 tracing::info!("IpcServer socket file removed");
53 }
54 }
55 }
56}
57
58impl IpcServer {
59 pub fn with_path(path: impl Into<String>) -> Self {
64 Self {
65 socket_path: path.into(),
66 }
67 }
68
69 #[tracing::instrument(skip(self, handler, token), name = "IPC Server")]
79 pub async fn serve<H: IpcHandler>(
80 &self,
81 handler: Arc<H>,
82 token: CancellationToken,
83 ) -> Result<()> {
84 #[cfg(not(windows))]
85 {
86 if let Err(e) = std::fs::remove_file(&self.socket_path)
87 && e.kind() != std::io::ErrorKind::NotFound
88 {
89 return Err(e.into());
90 }
91 }
92
93 let name = self.socket_path.as_str().to_fs_name::<GenericFilePath>()?;
94 let options = ListenerOptions::new().name(name);
95
96 #[cfg(target_os = "linux")]
97 let options = {
98 use interprocess::os::unix::local_socket::ListenerOptionsExt;
99 options.mode(0o666)
100 };
101
102 #[cfg(target_os = "macos")]
103 {
104 use std::os::unix::fs::PermissionsExt;
105 if let Ok(metadata) = std::fs::metadata(&self.socket_path) {
106 let mut perms = metadata.permissions();
107 perms.set_mode(0o666);
108 let _ = std::fs::set_permissions(&self.socket_path, perms);
109 }
110 }
111
112 #[cfg(windows)]
113 let options = {
114 use interprocess::os::windows::{
119 local_socket::ListenerOptionsExt, security_descriptor::SecurityDescriptor,
120 };
121 use widestring::u16cstr;
122 let sd =
123 SecurityDescriptor::deserialize(u16cstr!("D:(A;;GA;;;BA)(A;;GA;;;SY)(A;;GA;;;AU)"))
124 .map_err(std::io::Error::other)?;
125 options.security_descriptor(sd)
126 };
127
128 let listener = options.create_tokio()?;
129
130 tracing::info!("Started");
131
132 let result = self.accept_loop(&listener, handler, token).await;
133
134 tracing::info!("Stopped");
135
136 result
137 }
138
139 async fn accept_loop<H: IpcHandler>(
140 &self,
141 listener: &LocalSocketListener,
142 handler: Arc<H>,
143 token: CancellationToken,
144 ) -> Result<()> {
145 loop {
146 tokio::select! {
147 biased;
148 _ = token.cancelled() => {
149 tracing::info!("Shutdown requested, stopping...");
150 break;
151 },
152 result = listener.accept() => {
153 let conn = result?;
154 let handler = handler.clone();
155 let token = token.clone();
156
157 tokio::spawn(async move {
158 if let Err(e) = handle_connection(conn, handler, token).await {
159 tracing::error!("Connection error: {e}");
160 }
161 });
162 }
163 }
164 }
165
166 Ok(())
167 }
168}
169
170#[tracing::instrument(skip(conn, handler, token), name = "IPC Server")]
171async fn handle_connection<H: IpcHandler>(
172 conn: LocalSocketStream,
173 handler: Arc<H>,
174 token: CancellationToken,
175) -> Result<()> {
176 let (reader, mut writer) = tokio::io::split(conn);
177 let mut lines = BufReader::new(reader).lines();
178
179 let Some(line) = lines.next_line().await? else {
180 return Ok(());
181 };
182
183 let request: IpcRequest = serde_json::from_str(&line)?;
184
185 if matches!(request, IpcRequest::Subscribe) {
186 handle_subscribe(&mut writer, handler, token).await
187 } else {
188 let response = handler.handle(request).await;
189 write_response(&mut writer, &response).await
190 }
191}
192
193async fn handle_subscribe<W>(
194 writer: &mut W,
195 handler: Arc<impl IpcHandler>,
196 token: CancellationToken,
197) -> Result<()>
198where
199 W: AsyncWriteExt + Unpin,
200{
201 let mut rx = handler.subscribe_events();
202
203 loop {
204 tokio::select! {
205 biased;
206 _ = token.cancelled() => break,
207 result = rx.recv() => {
208 match result {
209 Ok(event) => {
210 write_response(writer, &IpcResponse::Event(event)).await?;
211 }
212 Err(broadcast::error::RecvError::Lagged(_)) => continue,
213 Err(broadcast::error::RecvError::Closed) => break,
214 }
215 }
216 }
217 }
218
219 Ok(())
220}
221
222async fn write_response<W>(writer: &mut W, response: &IpcResponse) -> Result<()>
223where
224 W: AsyncWriteExt + Unpin,
225{
226 let mut payload = serde_json::to_string(response)?;
227 payload.push('\n');
228 writer.write_all(payload.as_bytes()).await?;
229 writer.flush().await?;
230 Ok(())
231}