use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{Context, Result};
use kanade_shared::ipc::envelope::{RpcMessage, RpcResponse};
use kanade_shared::ipc::error::{ErrorKind, RpcError};
use kanade_shared::ipc::state::StateSnapshot;
use kanade_shared::wire::EffectiveConfig;
use tokio::io::{ReadHalf, WriteHalf};
use tokio::net::windows::named_pipe::{NamedPipeServer, ServerOptions};
use tokio::sync::{mpsc, watch};
use tracing::{debug, info, warn};
use crate::klp::auth::resolve_peer;
use crate::klp::connection::ConnectionState;
use crate::klp::dispatcher::dispatch_request;
use crate::klp::framing::{read_frame, write_frame};
use crate::klp::security::PipeSecurity;
pub const PIPE_NAME: &str = r"\\.\pipe\kanade-agent";
const PUSH_CHANNEL_CAPACITY: usize = 64;
#[derive(Clone)]
pub struct ListenerContext {
pub pc_id: Arc<str>,
pub agent_version: Arc<str>,
pub config_rx: watch::Receiver<EffectiveConfig>,
pub state_rx: watch::Receiver<StateSnapshot>,
pub log_path: PathBuf,
}
pub fn spawn(ctx: ListenerContext) -> tokio::task::JoinHandle<Result<()>> {
tokio::spawn(run(ctx))
}
async fn run(ctx: ListenerContext) -> Result<()> {
let security = PipeSecurity::new().context("build KLP pipe SECURITY_DESCRIPTOR")?;
let mut server = unsafe {
ServerOptions::new()
.first_pipe_instance(true)
.create_with_security_attributes_raw(PIPE_NAME, security.as_ptr())
}
.with_context(|| format!("create Named Pipe {PIPE_NAME}"))?;
info!(
pipe = PIPE_NAME,
sd = "Authenticated Users RW, deny Anonymous",
"KLP listener ready",
);
loop {
if let Err(e) = server.connect().await {
warn!(error = %e, "KLP server.connect() failed; reseating listener");
server = create_with_retry(&security).await;
continue;
}
let next = create_with_retry(&security).await;
let connected = std::mem::replace(&mut server, next);
let task_ctx = ctx.clone();
tokio::spawn(async move {
if let Err(e) = handle_connection(connected, task_ctx).await {
warn!(error = %e, "KLP connection task failed");
}
});
}
}
async fn create_with_retry(security: &PipeSecurity) -> NamedPipeServer {
let mut delay_ms: u64 = 200;
loop {
let result = unsafe {
ServerOptions::new().create_with_security_attributes_raw(PIPE_NAME, security.as_ptr())
};
match result {
Ok(server) => return server,
Err(e) => {
warn!(
error = %e,
delay_ms,
pipe = PIPE_NAME,
"KLP create() failed; backing off and retrying",
);
tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
delay_ms = (delay_ms * 2).min(30_000);
}
}
}
}
async fn handle_connection(pipe: NamedPipeServer, ctx: ListenerContext) -> Result<()> {
let peer = match resolve_peer(&pipe) {
Ok(p) => p,
Err(e) => {
warn!(error = %e, "KLP peer auth failed; closing connection");
return Ok(());
}
};
debug!(
user = %peer.user,
session_id = peer.session_id,
"KLP peer connected",
);
let (reader, writer) = tokio::io::split(pipe);
let (push_tx, push_rx) = mpsc::channel::<Vec<u8>>(PUSH_CHANNEL_CAPACITY);
let writer_log_pc = ctx.pc_id.to_string();
let writer_handle = tokio::spawn(writer_task(writer, push_rx, writer_log_pc));
let mut conn = ConnectionState::new(
peer,
ctx.pc_id.to_string(),
ctx.agent_version.to_string(),
ctx.config_rx.clone(),
ctx.state_rx.clone(),
ctx.log_path.clone(),
push_tx.clone(),
);
let read_loop_result = run_read_loop(reader, &mut conn, &push_tx).await;
drop(push_tx);
drop(conn);
let _ = writer_handle.await;
read_loop_result
}
async fn run_read_loop(
mut reader: ReadHalf<NamedPipeServer>,
conn: &mut ConnectionState,
push_tx: &mpsc::Sender<Vec<u8>>,
) -> Result<()> {
loop {
let frame = match read_frame(&mut reader).await {
Ok(f) => f,
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
debug!(user = %conn.peer.user, "KLP client disconnected (EOF)");
return Ok(());
}
Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
warn!(error = %e, "KLP oversize frame; closing connection");
let _ =
push_anonymous_error(push_tx, ErrorKind::PayloadTooLarge, &e.to_string()).await;
return Ok(());
}
Err(e) => {
debug!(
error = %e,
user = %conn.peer.user,
"KLP connection torn down by I/O error",
);
return Ok(());
}
};
let msg: RpcMessage = match serde_json::from_slice(&frame) {
Ok(m) => m,
Err(e) => {
warn!(error = %e, "KLP JSON parse error");
let _ = push_anonymous_error(push_tx, ErrorKind::ParseError, &e.to_string()).await;
continue;
}
};
match msg {
RpcMessage::Request(req) => {
let resp = dispatch_request(conn, req).await;
let body = serde_json::to_vec(&resp).context("encode RpcResponse")?;
if push_tx.send(body).await.is_err() {
debug!(user = %conn.peer.user, "KLP push channel closed, exiting read loop");
return Ok(());
}
}
RpcMessage::Notification(notif) => {
debug!(method = %notif.method, "KLP notification received (no response)");
}
RpcMessage::Response(resp) => {
debug!(id = ?resp.id, "KLP unexpected client → agent response, ignoring");
}
}
}
}
async fn writer_task(
mut writer: WriteHalf<NamedPipeServer>,
mut push_rx: mpsc::Receiver<Vec<u8>>,
pc_id_for_log: String,
) {
while let Some(body) = push_rx.recv().await {
if let Err(e) = write_frame(&mut writer, &body).await {
warn!(
error = %e,
pc_id = %pc_id_for_log,
"KLP writer: pipe broken, exiting",
);
return;
}
}
debug!(pc_id = %pc_id_for_log, "KLP writer: push channel closed, exiting");
}
async fn push_anonymous_error(
push_tx: &mpsc::Sender<Vec<u8>>,
kind: ErrorKind,
detail: &str,
) -> Result<()> {
let err = RpcError::new(kind, detail);
let resp = RpcResponse::err_anonymous(err);
let body = serde_json::to_vec(&resp).context("encode anonymous error response")?;
push_tx
.send(body)
.await
.map_err(|_| anyhow::anyhow!("KLP push channel closed"))?;
Ok(())
}