use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_util::sync::CancellationToken;
use koi_mdns::MdnsCore;
use super::dispatch;
const SESSION_GRACE: Duration = Duration::from_secs(30);
pub async fn start(
core: Arc<MdnsCore>,
path: std::path::PathBuf,
cancel: CancellationToken,
) -> anyhow::Result<()> {
#[cfg(unix)]
{
start_unix(core, path, cancel).await
}
#[cfg(windows)]
{
start_windows(core, path, cancel).await
}
}
#[cfg(unix)]
async fn start_unix(
core: Arc<MdnsCore>,
path: std::path::PathBuf,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let _ = std::fs::remove_file(&path);
let listener = tokio::net::UnixListener::bind(&path)?;
tracing::info!(path = %path.display(), "IPC adapter listening (Unix socket)");
loop {
tokio::select! {
result = listener.accept() => {
let (stream, _addr) = result?;
let core = core.clone();
tokio::spawn(async move {
if let Err(e) = handle_connection(core, stream).await {
tracing::warn!(error = %e, "IPC connection error");
}
});
}
_ = cancel.cancelled() => break,
}
}
tracing::debug!("IPC adapter stopped (Unix)");
Ok(())
}
#[cfg(unix)]
async fn handle_connection(
core: Arc<MdnsCore>,
stream: tokio::net::UnixStream,
) -> anyhow::Result<()> {
let session_id = dispatch::new_session_id();
let (reader, mut writer) = stream.into_split();
let reader = BufReader::new(reader);
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await? {
let line = line.trim().to_string();
if line.is_empty() {
continue;
}
dispatch::handle_line(&core, &session_id, &line, SESSION_GRACE, &mut writer).await?;
}
core.session_disconnected(&session_id);
Ok(())
}
#[cfg(windows)]
async fn start_windows(
core: Arc<MdnsCore>,
path: std::path::PathBuf,
cancel: CancellationToken,
) -> anyhow::Result<()> {
use tokio::net::windows::named_pipe::ServerOptions;
let pipe_name = path.to_string_lossy();
tracing::info!(pipe = %pipe_name, "IPC adapter listening (Named Pipe)");
loop {
let server = ServerOptions::new()
.first_pipe_instance(false)
.create(pipe_name.as_ref())?;
tokio::select! {
result = server.connect() => {
result?;
let core = core.clone();
tokio::spawn(async move {
if let Err(e) = handle_pipe_connection(core, server).await {
tracing::warn!(error = %e, "IPC pipe connection error");
}
});
}
_ = cancel.cancelled() => break,
}
}
tracing::debug!("IPC adapter stopped (Named Pipe)");
Ok(())
}
#[cfg(windows)]
async fn handle_pipe_connection(
core: Arc<MdnsCore>,
pipe: tokio::net::windows::named_pipe::NamedPipeServer,
) -> anyhow::Result<()> {
let session_id = dispatch::new_session_id();
let (reader, mut writer) = tokio::io::split(pipe);
let reader = BufReader::new(reader);
let mut lines = reader.lines();
while let Some(line) = lines.next_line().await? {
let line = line.trim().to_string();
if line.is_empty() {
continue;
}
dispatch::handle_line(&core, &session_id, &line, SESSION_GRACE, &mut writer).await?;
}
core.session_disconnected(&session_id);
Ok(())
}