use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[allow(unused_imports)]
use tracing::{debug, info, warn};
/// A request sent by a client over the IPC channel.
///
/// `handle_ipc_connection` deserializes this from the JSON request body and
/// converts it into the [`InternalCommand`] variant of the same name. What
/// each command actually does is decided by the receiver of that internal
/// command, which is not part of this type.
///
/// NOTE(review): the derived `Debug` output includes `password` (and `code`)
/// in clear text, so avoid formatting whole values with `{:?}` into logs.
#[derive(Debug, Serialize, Deserialize)]
pub enum IpcCommand {
/// Request to enable a wormhole with the given parameters.
EnableWormhole {
/// Wormhole code supplied by the client.
code: String,
/// Optional password; `None` when the client did not supply one.
password: Option<String>,
/// Persistence flag forwarded unchanged to the command receiver.
/// Presumably controls whether the wormhole outlives a single use —
/// TODO confirm against the receiver.
persistent: bool,
},
/// Request to disable the wormhole.
DisableWormhole,
/// Request for a status report (see [`IpcResponse::Status`]).
GetStatus,
/// Request to shut the service down.
Shutdown,
}
/// In-process counterpart of [`IpcCommand`] that also carries a reply channel.
///
/// Every variant mirrors the `IpcCommand` variant of the same name and adds a
/// `tx` field: a one-shot sender on which the receiver of the command is
/// expected to deliver the [`IpcResponse`]. `handle_ipc_connection` awaits the
/// matching receiver and reports an error response to the client if `tx` is
/// dropped without a value being sent.
pub enum InternalCommand {
/// Internal form of [`IpcCommand::EnableWormhole`]; the first three fields
/// are copied over unchanged.
EnableWormhole {
code: String,
password: Option<String>,
persistent: bool,
/// Reply channel for this request.
tx: tokio::sync::oneshot::Sender<IpcResponse>,
},
/// Internal form of [`IpcCommand::DisableWormhole`].
DisableWormhole {
/// Reply channel for this request.
tx: tokio::sync::oneshot::Sender<IpcResponse>,
},
/// Internal form of [`IpcCommand::GetStatus`].
GetStatus {
/// Reply channel for this request.
tx: tokio::sync::oneshot::Sender<IpcResponse>,
},
/// Internal form of [`IpcCommand::Shutdown`].
Shutdown {
/// Reply channel for this request.
tx: tokio::sync::oneshot::Sender<IpcResponse>,
},
}
/// The reply written back to an IPC client.
///
/// `handle_ipc_connection` serializes exactly one of these to JSON per
/// connection.
#[derive(Debug, Serialize, Deserialize)]
pub enum IpcResponse {
/// The command succeeded and has no payload.
Ok,
/// The command failed; the string is a human-readable description.
Error(String),
/// Status report. The values are filled in by the command receiver, which
/// is not visible here; the descriptions below follow the field names.
Status {
/// Whether a wormhole is currently active.
wormhole_active: bool,
/// Code of the wormhole, if any.
wormhole_code: Option<String>,
/// Number of active sessions.
active_sessions: usize,
},
}
/// Errors produced by the IPC server and its connection handler.
#[derive(Debug, thiserror::Error)]
pub enum IpcError {
/// Binding the listening socket at `path` failed; `source` is the
/// underlying I/O error.
#[error("failed to bind IPC socket at {path}")]
// In the code visible here this variant is only constructed in the
// `cfg(unix)` branch of `IpcServer::run`, which is presumably why the
// dead-code lint is silenced for other targets.
#[allow(dead_code)]
BindFailed {
path: PathBuf,
#[source]
source: std::io::Error,
},
/// Any other I/O failure; created automatically from `std::io::Error` by `?`.
#[error("IPC I/O error")]
Io(#[from] std::io::Error),
/// JSON encoding or decoding of an IPC message failed; created
/// automatically from `serde_json::Error` by `?`.
#[error("IPC message serialization failed")]
Serialization(#[from] serde_json::Error),
}
/// Local IPC endpoint that accepts client connections and forwards their
/// commands to the rest of the service.
pub struct IpcServer {
/// Directory from which the endpoint is derived: the Unix socket file is
/// created inside it, and on Windows its hash becomes part of the pipe name.
#[allow(dead_code)]
state_dir: PathBuf,
/// Channel on which decoded commands are forwarded; cloned once per
/// accepted connection.
control_tx: tokio::sync::mpsc::Sender<InternalCommand>,
}
impl IpcServer {
pub fn new(state_dir: PathBuf, control_tx: tokio::sync::mpsc::Sender<InternalCommand>) -> Self {
Self {
state_dir,
control_tx,
}
}
fn socket_path(&self) -> PathBuf {
#[cfg(unix)]
{
self.state_dir.join("irosh.sock")
}
#[cfg(windows)]
{
use std::hash::{Hash, Hasher};
let mut hasher = std::collections::hash_map::DefaultHasher::new();
self.state_dir.hash(&mut hasher);
let hash = hasher.finish();
PathBuf::from(format!(r"\\.\pipe\irosh-service-{:x}", hash))
}
}
pub async fn run(self) -> std::result::Result<(), IpcError> {
let path = self.socket_path();
#[cfg(unix)]
{
if path.exists() {
let _ = std::fs::remove_file(&path);
}
let listener =
tokio::net::UnixListener::bind(&path).map_err(|e| IpcError::BindFailed {
path: path.clone(),
source: e,
})?;
info!("IPC listener active at {}", path.display());
loop {
match listener.accept().await {
Ok((mut stream, _)) => {
let tx = self.control_tx.clone();
tokio::spawn(async move {
if let Err(e) = handle_ipc_connection(&mut stream, tx).await {
debug!("IPC connection error: {}", e);
}
});
}
Err(e) => {
warn!("IPC accept error: {}", e);
}
}
}
}
#[cfg(windows)]
{
use tokio::net::windows::named_pipe::ServerOptions;
info!("IPC listener active at {}", path.display());
loop {
let mut server = ServerOptions::new()
.first_pipe_instance(true)
.create(&*path.to_string_lossy())?;
server.connect().await?;
let tx = self.control_tx.clone();
tokio::spawn(async move {
if let Err(e) = handle_ipc_connection(&mut server, tx).await {
debug!("IPC connection error: {}", e);
}
});
}
}
}
}
async fn handle_ipc_connection<S>(
stream: &mut S,
control_tx: tokio::sync::mpsc::Sender<InternalCommand>,
) -> std::result::Result<(), IpcError>
where
S: AsyncReadExt + AsyncWriteExt + Unpin,
{
let mut buf = Vec::new();
stream.take(1024 * 64).read_to_end(&mut buf).await?;
let command: IpcCommand = serde_json::from_slice(&buf)?;
debug!("Received IPC command: {:?}", command);
let (res_tx, res_rx) = tokio::sync::oneshot::channel();
let internal_cmd = match command {
IpcCommand::EnableWormhole {
code,
password,
persistent,
} => InternalCommand::EnableWormhole {
code,
password,
persistent,
tx: res_tx,
},
IpcCommand::DisableWormhole => InternalCommand::DisableWormhole { tx: res_tx },
IpcCommand::GetStatus => InternalCommand::GetStatus { tx: res_tx },
IpcCommand::Shutdown => InternalCommand::Shutdown { tx: res_tx },
};
let response = if control_tx.send(internal_cmd).await.is_ok() {
res_rx.await.unwrap_or(IpcResponse::Error(
"Server failed to provide a response".to_string(),
))
} else {
IpcResponse::Error("Server control channel closed".to_string())
};
let res_buf = serde_json::to_vec(&response)?;
stream.write_all(&res_buf).await?;
stream.flush().await?;
Ok(())
}