use kanade_shared::ipc::envelope::{RpcRequest, RpcResponse, decode_params};
use kanade_shared::ipc::error::{ErrorKind, RpcError};
use kanade_shared::ipc::handshake::HandshakeParams;
use kanade_shared::ipc::method;
use kanade_shared::ipc::state::{
StateSnapshotParams, StateSubscribeParams, StateUnsubscribeParams,
};
use kanade_shared::ipc::system::{LogTailParams, PingParams, VersionParams};
use tracing::warn;
use super::connection::ConnectionState;
use super::handlers;
/// Runs one request through [`dispatch_inner`] and packages the outcome as an
/// [`RpcResponse`] that carries the request's own id.
///
/// A successful dispatch becomes an `Ok` payload stamped with
/// `JSONRPC_VERSION`; a failed dispatch is converted with `RpcResponse::err`.
pub async fn dispatch_request(conn: &mut ConnectionState, req: RpcRequest) -> RpcResponse {
    let outcome = dispatch_inner(conn, &req).await;

    // The borrow of `req` ended with the await above, so the id can be moved
    // out and reused on whichever path we take.
    let request_id = req.id;

    let value = match outcome {
        Ok(value) => value,
        Err(err) => return RpcResponse::err(request_id, err),
    };

    RpcResponse {
        jsonrpc: kanade_shared::ipc::envelope::JSONRPC_VERSION.to_string(),
        id: Some(request_id),
        payload: kanade_shared::ipc::envelope::RpcResponsePayload::Ok { result: value },
    }
}
/// Routes one request to the handler for its method and returns the handler's
/// result encoded as a JSON value.
///
/// Until `conn.handshake_complete()` reports true, every method other than
/// `method::SYSTEM_HANDSHAKE` is refused.
///
/// Every arm decodes `req.params` through the shared `decode_params` helper so
/// that parameter handling is uniform across methods.
///
/// # Errors
///
/// - `ErrorKind::InvalidRequest` when a non-handshake method arrives before
///   the handshake has completed.
/// - `ErrorKind::InvalidParams` when the params do not decode into the
///   method's parameter type.
/// - `ErrorKind::InternalError` when a handler's result cannot be serialized.
/// - `ErrorKind::MethodNotFound` for any method without an arm below.
/// - Errors returned by the handlers themselves are propagated with `?`.
async fn dispatch_inner(
    conn: &mut ConnectionState,
    req: &RpcRequest,
) -> std::result::Result<serde_json::Value, RpcError> {
    // Gate: only the handshake itself may run on a connection that has not
    // completed its handshake yet.
    if !conn.handshake_complete() && req.method != method::SYSTEM_HANDSHAKE {
        return Err(RpcError::new(
            ErrorKind::InvalidRequest,
            format!(
                "method '{}' requires handshake; call system.handshake first",
                req.method
            ),
        ));
    }

    // `req` is only borrowed here, so each arm clones the params to hand an
    // owned value to the decoder.
    match req.method.as_str() {
        method::SYSTEM_HANDSHAKE => {
            let params: HandshakeParams =
                decode_params(req.params.clone()).map_err(invalid_params)?;
            let result = handlers::system::handle_handshake(conn, params)?;
            serde_json::to_value(&result).map_err(internal)
        }
        method::SYSTEM_PING => {
            let params: PingParams = decode_params(req.params.clone()).map_err(invalid_params)?;
            let result = handlers::system::handle_ping(conn, params)?;
            serde_json::to_value(&result).map_err(internal)
        }
        method::SYSTEM_VERSION => {
            let params: VersionParams =
                decode_params(req.params.clone()).map_err(invalid_params)?;
            let result = handlers::system::handle_version(conn, params)?;
            serde_json::to_value(&result).map_err(internal)
        }
        method::SYSTEM_LOG_TAIL => {
            let params: LogTailParams =
                decode_params(req.params.clone()).map_err(invalid_params)?;
            // The only handler in this table that is awaited.
            let result = handlers::system::handle_log_tail(conn, params).await?;
            serde_json::to_value(&result).map_err(internal)
        }
        method::STATE_SNAPSHOT => {
            let params: StateSnapshotParams =
                decode_params(req.params.clone()).map_err(invalid_params)?;
            let result = handlers::state::handle_state_snapshot(conn, params)?;
            serde_json::to_value(&result).map_err(internal)
        }
        method::STATE_SUBSCRIBE => {
            let params: StateSubscribeParams =
                decode_params(req.params.clone()).map_err(invalid_params)?;
            let result = handlers::state::handle_state_subscribe(conn, params)?;
            serde_json::to_value(&result).map_err(internal)
        }
        method::STATE_UNSUBSCRIBE => {
            let params: StateUnsubscribeParams =
                decode_params(req.params.clone()).map_err(invalid_params)?;
            handlers::state::handle_state_unsubscribe(conn, params)?;
            // The handler's success value is not forwarded; reply with JSON null.
            Ok(serde_json::Value::Null)
        }
        unknown => {
            warn!(
                method = %unknown,
                pc_id = %conn.pc_id,
                user = %conn.peer.user,
                "KLP method not yet implemented in this PR",
            );
            Err(RpcError::new(
                ErrorKind::MethodNotFound,
                format!("method '{unknown}' is not implemented in this agent build"),
            ))
        }
    }
}
/// Converts a serde_json failure into an `ErrorKind::InvalidParams` RPC error
/// whose detail is the serde error's display text.
fn invalid_params(e: serde_json::Error) -> RpcError {
    let detail = e.to_string();
    RpcError::new(ErrorKind::InvalidParams, detail)
}
/// Converts a serde_json failure into an `ErrorKind::InternalError` RPC error
/// whose detail is the serde error's display text.
fn internal(e: serde_json::Error) -> RpcError {
    let detail = e.to_string();
    RpcError::new(ErrorKind::InternalError, detail)
}
// Dispatcher tests: each one drives `dispatch_request` against a freshly built
// in-memory `ConnectionState`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::klp::auth::PeerCredentials;
    use kanade_shared::ipc::envelope::RpcResponsePayload;
    use kanade_shared::ipc::handshake::PROTOCOL_V1;
    use kanade_shared::ipc::state::StateSnapshot;
    use kanade_shared::wire::EffectiveConfig;
    use std::path::PathBuf;
    use tokio::sync::{mpsc, watch};

    /// PC id shared by the snapshot and the connection fixtures.
    const TEST_PC_ID: &str = "PC1234";
    /// Version string used wherever the fixtures need an agent/target version.
    const TEST_VERSION: &str = "0.41.0";

    /// Snapshot value used to seed the state watch channel.
    fn dummy_snapshot() -> StateSnapshot {
        StateSnapshot {
            pc_id: TEST_PC_ID.into(),
            online: true,
            vpn: "unknown".into(),
            checks: vec![],
            agent_version: TEST_VERSION.into(),
            target_version: TEST_VERSION.into(),
        }
    }

    /// Builds a connection that has not performed a handshake yet.
    fn fresh_conn() -> ConnectionState {
        // The channel halves we do not hand over are bound to underscore-prefixed
        // names, so they stay alive until this function returns.
        let (_cfg_tx, cfg_rx) = watch::channel(EffectiveConfig::builtin_defaults());
        let (_state_tx, state_rx) = watch::channel(dummy_snapshot());
        let (push_tx, _push_rx) = mpsc::channel(8);

        let peer = PeerCredentials {
            user: "DOMAIN\\alice".into(),
            session_id: 2,
        };
        let log_path = PathBuf::from("agent.log");

        ConnectionState::new(
            peer,
            TEST_PC_ID.into(),
            TEST_VERSION.into(),
            cfg_rx,
            state_rx,
            log_path,
            push_tx,
        )
    }

    /// A well-formed handshake request with id `h1`.
    fn handshake_req() -> RpcRequest {
        let params = HandshakeParams {
            client: "kanade-client".into(),
            client_version: "0.1.0".into(),
            protocol: vec![PROTOCOL_V1],
            features: vec![],
        };
        RpcRequest::new("h1", method::SYSTEM_HANDSHAKE, &params).unwrap()
    }

    /// Assembles a request field by field, without going through `RpcRequest::new`.
    fn bare_request(
        id: &'static str,
        method_name: &str,
        params: serde_json::Value,
    ) -> RpcRequest {
        RpcRequest {
            jsonrpc: kanade_shared::ipc::envelope::JSONRPC_VERSION.to_string(),
            id: id.into(),
            method: method_name.to_string(),
            params,
        }
    }

    #[tokio::test]
    async fn pre_handshake_ping_returns_invalid_request() {
        let mut conn = fresh_conn();
        let ping = bare_request("p1", method::SYSTEM_PING, serde_json::Value::Null);

        let resp = dispatch_request(&mut conn, ping).await;

        let error = match resp.payload {
            RpcResponsePayload::Err { error } => error,
            other => panic!("expected Err, got {other:?}"),
        };
        let data = error.data.expect("data populated");
        assert_eq!(data.kind, ErrorKind::InvalidRequest);
        assert!(data.detail.contains("requires handshake"));
    }

    #[tokio::test]
    async fn handshake_then_ping_succeeds() {
        let mut conn = fresh_conn();

        // Step 1: the handshake must succeed and flip the connection state.
        let h_resp = dispatch_request(&mut conn, handshake_req()).await;
        assert!(
            matches!(h_resp.payload, RpcResponsePayload::Ok { .. }),
            "handshake should succeed: {:?}",
            h_resp.payload,
        );
        assert!(conn.handshake_complete());

        // Step 2: a ping with null params is now accepted.
        let ping = bare_request("p1", method::SYSTEM_PING, serde_json::Value::Null);
        let p_resp = dispatch_request(&mut conn, ping).await;

        let result = match p_resp.payload {
            RpcResponsePayload::Ok { result } => result,
            other => panic!("expected Ok, got {other:?}"),
        };
        assert!(result.get("agent_time").is_some(), "wire: {result}");
    }

    #[tokio::test]
    async fn unknown_method_returns_method_not_found() {
        let mut conn = fresh_conn();
        let _ = dispatch_request(&mut conn, handshake_req()).await;

        let req = bare_request("u1", "jobs.list", serde_json::Value::Null);
        let resp = dispatch_request(&mut conn, req).await;

        let error = match resp.payload {
            RpcResponsePayload::Err { error } => error,
            other => panic!("expected Err, got {other:?}"),
        };
        let data = error.data.expect("data populated");
        assert_eq!(data.kind, ErrorKind::MethodNotFound);
    }

    #[tokio::test]
    async fn handshake_with_wrong_params_returns_invalid_params() {
        let mut conn = fresh_conn();
        let req = bare_request(
            "h1",
            method::SYSTEM_HANDSHAKE,
            serde_json::json!(["wrong", "shape"]),
        );

        let resp = dispatch_request(&mut conn, req).await;

        let error = match resp.payload {
            RpcResponsePayload::Err { error } => error,
            other => panic!("expected Err, got {other:?}"),
        };
        let data = error.data.expect("data populated");
        assert_eq!(data.kind, ErrorKind::InvalidParams);
    }

    #[tokio::test]
    async fn response_id_correlates_to_request_id() {
        let wanted_id = "correlation-test-123";
        let mut conn = fresh_conn();

        let mut req = handshake_req();
        req.id = wanted_id.into();

        let resp = dispatch_request(&mut conn, req).await;
        assert_eq!(resp.id.as_deref(), Some(wanted_id));
    }
}