sidedns_core/ipc/
client.rs1use interprocess::local_socket::{GenericFilePath, ToFsName, tokio::prelude::*};
2use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
3use tokio::sync::mpsc;
4
5use super::IpcError;
6use super::message::{DnsEvent, IpcRequest, IpcResponse};
7use anyhow::Result;
8
9pub struct IpcClient {
29 socket_path: String,
30}
31
32impl Default for IpcClient {
33 fn default() -> Self {
34 Self {
35 socket_path: crate::IPC_SOCKET_PATH.to_string(),
36 }
37 }
38}
39
40impl IpcClient {
41 pub fn with_path(path: impl Into<String>) -> Self {
46 Self {
47 socket_path: path.into(),
48 }
49 }
50
51 pub async fn is_running(&self) -> bool {
55 let Ok(name) = self.socket_path.as_str().to_fs_name::<GenericFilePath>() else {
56 return false;
57 };
58 LocalSocketStream::connect(name).await.is_ok()
59 }
60
61 pub async fn send(&self, request: IpcRequest) -> Result<IpcResponse> {
71 let name = self.socket_path.as_str().to_fs_name::<GenericFilePath>()?;
72 let conn = LocalSocketStream::connect(name)
73 .await
74 .map_err(|e| anyhow::anyhow!("Failed to connect to IPC server. Is it running?\n{e}"))?;
75
76 let (reader, mut writer) = tokio::io::split(conn);
77
78 let mut payload = serde_json::to_string(&request)?;
79 payload.push('\n');
80 writer.write_all(payload.as_bytes()).await?;
81 writer.flush().await?;
82
83 let mut lines = BufReader::new(reader).lines();
84 let line = lines.next_line().await?.ok_or(IpcError::ConnectionClosed)?;
85
86 Ok(serde_json::from_str(&line)?)
87 }
88
89 pub async fn subscribe(&self) -> Result<mpsc::Receiver<DnsEvent>> {
102 let name = self.socket_path.as_str().to_fs_name::<GenericFilePath>()?;
103 let conn = LocalSocketStream::connect(name)
104 .await
105 .map_err(|e| anyhow::anyhow!("Failed to connect to IPC server. Is it running?\n{e}"))?;
106
107 let (reader, mut writer) = tokio::io::split(conn);
108
109 let mut payload = serde_json::to_string(&IpcRequest::Subscribe)?;
110 payload.push('\n');
111 writer.write_all(payload.as_bytes()).await?;
112 writer.flush().await?;
113
114 let (tx, rx) = mpsc::channel(32);
115
116 tokio::spawn(async move {
117 let mut lines = BufReader::new(reader).lines();
118
119 while let Ok(Some(line)) = lines.next_line().await {
120 let Ok(response) = serde_json::from_str::<IpcResponse>(&line) else {
121 continue;
122 };
123
124 if let IpcResponse::Event(event) = response
125 && tx.send(event).await.is_err()
126 {
127 break;
128 }
129 }
130 });
131
132 Ok(rx)
133 }
134}