Skip to main content

sidedns_core/ipc/
client.rs

1use interprocess::local_socket::{GenericFilePath, ToFsName, tokio::prelude::*};
2use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
3use tokio::sync::mpsc;
4
5use super::IpcError;
6use super::message::{DnsEvent, IpcRequest, IpcResponse};
7use anyhow::Result;
8
/// Client for communicating with a running [`super::IpcServer`].
///
/// Uses Unix domain sockets on Linux/macOS and named pipes on Windows.
/// Every method opens its own fresh connection; nothing is pooled or reused
/// between calls. `is_running` and `send` release the connection before they
/// return, while `subscribe` hands its connection to a background task that
/// keeps reading events after the call has returned.
///
/// The client only stores the socket path, so it is cheap to clone.
///
/// # Examples
///
/// ```no_run
/// # use sidedns_core::{IpcClient, IpcRequest};
/// # #[tokio::main] async fn main() -> anyhow::Result<()> {
/// let client = IpcClient::default();
///
/// if client.is_running().await {
///     let response = client.send(IpcRequest::List).await?;
///     println!("{response:?}");
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct IpcClient {
    /// Filesystem-style name of the daemon's local socket.
    socket_path: String,
}
31
32impl Default for IpcClient {
33    fn default() -> Self {
34        Self {
35            socket_path: crate::IPC_SOCKET_PATH.to_string(),
36        }
37    }
38}
39
40impl IpcClient {
41    /// Create a client targeting a custom socket path.
42    ///
43    /// Prefer [`IpcClient::default`] in production code.
44    /// This method exists primarily for test isolation.
45    pub fn with_path(path: impl Into<String>) -> Self {
46        Self {
47            socket_path: path.into(),
48        }
49    }
50
51    /// Return `true` if a daemon is reachable on the configured socket.
52    ///
53    /// This is a lightweight connection probe — no request is sent.
54    pub async fn is_running(&self) -> bool {
55        let Ok(name) = self.socket_path.as_str().to_fs_name::<GenericFilePath>() else {
56            return false;
57        };
58        LocalSocketStream::connect(name).await.is_ok()
59    }
60
61    /// Send a single request to the daemon and await its response.
62    ///
63    /// Opens a connection, writes the request as a newline-terminated JSON
64    /// object, reads one response line, then closes the connection.
65    ///
66    /// # Errors
67    ///
68    /// Returns [`IpcError`] if the connection fails, the daemon is not running,
69    /// or serialization of the request or response fails.
70    pub async fn send(&self, request: IpcRequest) -> Result<IpcResponse> {
71        let name = self.socket_path.as_str().to_fs_name::<GenericFilePath>()?;
72        let conn = LocalSocketStream::connect(name)
73            .await
74            .map_err(|e| anyhow::anyhow!("Failed to connect to IPC server. Is it running?\n{e}"))?;
75
76        let (reader, mut writer) = tokio::io::split(conn);
77
78        let mut payload = serde_json::to_string(&request)?;
79        payload.push('\n');
80        writer.write_all(payload.as_bytes()).await?;
81        writer.flush().await?;
82
83        let mut lines = BufReader::new(reader).lines();
84        let line = lines.next_line().await?.ok_or(IpcError::ConnectionClosed)?;
85
86        Ok(serde_json::from_str(&line)?)
87    }
88
89    /// Subscribe to daemon events and receive them through an async channel.
90    ///
91    /// Connects to the daemon, sends a [`IpcRequest::Subscribe`] command, and
92    /// immediately returns a [`mpsc::Receiver`] that yields [`DnsEvent`] values
93    /// as they arrive. The background task exits when the connection is closed.
94    ///
95    /// Dropping the returned receiver causes the background task to exit on the
96    /// next event delivery attempt.
97    ///
98    /// # Errors
99    ///
100    /// Returns [`IpcError`] if the initial connection or subscribe handshake fails.
101    pub async fn subscribe(&self) -> Result<mpsc::Receiver<DnsEvent>> {
102        let name = self.socket_path.as_str().to_fs_name::<GenericFilePath>()?;
103        let conn = LocalSocketStream::connect(name)
104            .await
105            .map_err(|e| anyhow::anyhow!("Failed to connect to IPC server. Is it running?\n{e}"))?;
106
107        let (reader, mut writer) = tokio::io::split(conn);
108
109        let mut payload = serde_json::to_string(&IpcRequest::Subscribe)?;
110        payload.push('\n');
111        writer.write_all(payload.as_bytes()).await?;
112        writer.flush().await?;
113
114        let (tx, rx) = mpsc::channel(32);
115
116        tokio::spawn(async move {
117            let mut lines = BufReader::new(reader).lines();
118
119            while let Ok(Some(line)) = lines.next_line().await {
120                let Ok(response) = serde_json::from_str::<IpcResponse>(&line) else {
121                    continue;
122                };
123
124                if let IpcResponse::Event(event) = response
125                    && tx.send(event).await.is_err()
126                {
127                    break;
128                }
129            }
130        });
131
132        Ok(rx)
133    }
134}