use interprocess::local_socket::{GenericFilePath, ToFsName, tokio::prelude::*};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::sync::mpsc;
use super::IpcError;
use super::message::{DnsEvent, IpcRequest, IpcResponse};
use anyhow::Result;
pub struct IpcClient {
socket_path: String,
}
impl Default for IpcClient {
fn default() -> Self {
Self {
socket_path: crate::IPC_SOCKET_PATH.to_string(),
}
}
}
impl IpcClient {
pub fn with_path(path: impl Into<String>) -> Self {
Self {
socket_path: path.into(),
}
}
pub async fn is_running(&self) -> bool {
let Ok(name) = self.socket_path.as_str().to_fs_name::<GenericFilePath>() else {
return false;
};
LocalSocketStream::connect(name).await.is_ok()
}
pub async fn send(&self, request: IpcRequest) -> Result<IpcResponse> {
let name = self.socket_path.as_str().to_fs_name::<GenericFilePath>()?;
let conn = LocalSocketStream::connect(name)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to IPC server. Is it running?\n{e}"))?;
let (reader, mut writer) = tokio::io::split(conn);
let mut payload = serde_json::to_string(&request)?;
payload.push('\n');
writer.write_all(payload.as_bytes()).await?;
writer.flush().await?;
let mut lines = BufReader::new(reader).lines();
let line = lines.next_line().await?.ok_or(IpcError::ConnectionClosed)?;
Ok(serde_json::from_str(&line)?)
}
pub async fn subscribe(&self) -> Result<mpsc::Receiver<DnsEvent>> {
let name = self.socket_path.as_str().to_fs_name::<GenericFilePath>()?;
let conn = LocalSocketStream::connect(name)
.await
.map_err(|e| anyhow::anyhow!("Failed to connect to IPC server. Is it running?\n{e}"))?;
let (reader, mut writer) = tokio::io::split(conn);
let mut payload = serde_json::to_string(&IpcRequest::Subscribe)?;
payload.push('\n');
writer.write_all(payload.as_bytes()).await?;
writer.flush().await?;
let (tx, rx) = mpsc::channel(32);
tokio::spawn(async move {
let mut lines = BufReader::new(reader).lines();
while let Ok(Some(line)) = lines.next_line().await {
let Ok(response) = serde_json::from_str::<IpcResponse>(&line) else {
continue;
};
if let IpcResponse::Event(event) = response
&& tx.send(event).await.is_err()
{
break;
}
}
});
Ok(rx)
}
}