use std::io;
use kanade_shared::ipc::error::{ErrorKind, RpcError};
use kanade_shared::ipc::handshake::{HandshakeParams, HandshakeResult, PROTOCOL_V1};
use kanade_shared::ipc::system::{
LogTailParams, LogTailResult, PingParams, PingResult, VersionParams, VersionResult,
};
use tracing::warn;
use super::super::connection::ConnectionState;
pub type HandlerResult<T> = std::result::Result<T, RpcError>;
const SUPPORTED_FEATURES: &[&str] = &[];
const LOG_TAIL_HARD_CAP: u32 = 1000;
pub fn handle_handshake(
conn: &mut ConnectionState,
params: HandshakeParams,
) -> HandlerResult<HandshakeResult> {
if params.protocol.is_empty() {
return Err(RpcError::new(
ErrorKind::InvalidParams,
"handshake.protocol must contain at least one version",
));
}
let agreed = params
.protocol
.iter()
.copied()
.filter(|&v| v == PROTOCOL_V1)
.max();
let Some(agreed) = agreed else {
return Err(RpcError::new(
ErrorKind::StaleProtocol,
format!(
"no overlap with client versions {:?} (agent supports {:?})",
params.protocol,
[PROTOCOL_V1],
),
));
};
conn.mark_handshake(agreed);
Ok(HandshakeResult {
protocol: agreed,
agent_version: conn.agent_version.clone(),
features: SUPPORTED_FEATURES.iter().map(|&s| s.to_string()).collect(),
session: conn.session(),
})
}
pub fn handle_ping(_conn: &ConnectionState, _params: PingParams) -> HandlerResult<PingResult> {
Ok(PingResult {
agent_time: chrono::Utc::now(),
})
}
pub fn handle_version(
conn: &ConnectionState,
_params: VersionParams,
) -> HandlerResult<VersionResult> {
let target = conn
.config_rx
.borrow()
.target_version
.as_deref()
.filter(|s| !s.is_empty())
.map(str::to_owned)
.unwrap_or_else(|| conn.agent_version.clone());
Ok(VersionResult {
agent_version: conn.agent_version.clone(),
target_agent_version: target,
target_client_version: None,
})
}
pub async fn handle_log_tail(
conn: &ConnectionState,
params: LogTailParams,
) -> HandlerResult<LogTailResult> {
let requested = params.lines.min(LOG_TAIL_HARD_CAP);
let truncated = params.lines > LOG_TAIL_HARD_CAP;
let active_path = match crate::logs::locate_active_file(&conn.log_path).await {
Ok(p) => p,
Err(e) => {
warn!(
error = %e,
template = %conn.log_path.display(),
"system.log_tail: log directory missing, returning empty result",
);
return Ok(LogTailResult {
lines: vec![],
truncated,
});
}
};
let body = match tokio::fs::read_to_string(&active_path).await {
Ok(s) => s,
Err(e) if e.kind() == io::ErrorKind::NotFound => {
warn!(
path = %active_path.display(),
"system.log_tail: log file not found, returning empty result",
);
return Ok(LogTailResult {
lines: vec![],
truncated,
});
}
Err(e) => {
return Err(RpcError::new(
ErrorKind::InternalError,
format!("read agent.log ({}): {e}", active_path.display()),
));
}
};
let all: Vec<&str> = body.lines().collect();
let take = (requested as usize).min(all.len());
let tail_start = all.len() - take;
let lines = all[tail_start..].iter().map(|s| s.to_string()).collect();
Ok(LogTailResult { lines, truncated })
}
#[cfg(test)]
mod tests {
use super::*;
use crate::klp::auth::PeerCredentials;
use kanade_shared::ipc::state::StateSnapshot;
use kanade_shared::wire::EffectiveConfig;
use std::path::PathBuf;
use tempfile::NamedTempFile;
use tokio::sync::{mpsc, watch};
fn dummy_snapshot() -> StateSnapshot {
StateSnapshot {
pc_id: "PC1234".into(),
online: true,
vpn: "unknown".into(),
checks: vec![],
agent_version: "0.40.0".into(),
target_version: "0.40.0".into(),
}
}
fn fresh_conn_with(cfg: EffectiveConfig, log_path: PathBuf) -> ConnectionState {
let (_cfg_tx, cfg_rx) = watch::channel(cfg);
let (_state_tx, state_rx) = watch::channel(dummy_snapshot());
let (push_tx, _push_rx) = mpsc::channel(8);
ConnectionState::new(
PeerCredentials {
user: "DOMAIN\\alice".into(),
session_id: 2,
},
"PC1234".into(),
"0.40.0".into(),
cfg_rx,
state_rx,
log_path,
push_tx,
)
}
fn fresh_conn() -> ConnectionState {
fresh_conn_with(
EffectiveConfig::builtin_defaults(),
PathBuf::from("agent.log"),
)
}
#[test]
fn handshake_v1_only_client_succeeds() {
let mut conn = fresh_conn();
let result = handle_handshake(
&mut conn,
HandshakeParams {
client: "kanade-client".into(),
client_version: "0.1.0".into(),
protocol: vec![PROTOCOL_V1],
features: vec![],
},
)
.expect("handshake should succeed");
assert_eq!(result.protocol, PROTOCOL_V1);
assert_eq!(result.agent_version, "0.40.0");
assert_eq!(result.session.user, "DOMAIN\\alice");
assert_eq!(result.session.pc_id, "PC1234");
assert!(conn.handshake_complete());
}
#[test]
fn handshake_picks_highest_mutual_version_from_multi_version_client() {
let mut conn = fresh_conn();
let result = handle_handshake(
&mut conn,
HandshakeParams {
client: "kanade-client".into(),
client_version: "0.2.0".into(),
protocol: vec![1, 2, 3],
features: vec![],
},
)
.expect("handshake should succeed via downshift");
assert_eq!(result.protocol, PROTOCOL_V1);
}
#[test]
fn handshake_rejects_empty_protocol_list() {
let mut conn = fresh_conn();
let err = handle_handshake(
&mut conn,
HandshakeParams {
client: "kanade-client".into(),
client_version: "0.1.0".into(),
protocol: vec![],
features: vec![],
},
)
.expect_err("empty protocol must fail");
let data = err.data.as_ref().expect("data populated");
assert_eq!(data.kind, ErrorKind::InvalidParams);
assert!(!conn.handshake_complete(), "conn must stay pre-handshake");
}
#[test]
fn handshake_rejects_when_no_version_overlap() {
let mut conn = fresh_conn();
let err = handle_handshake(
&mut conn,
HandshakeParams {
client: "kanade-client".into(),
client_version: "9.9.9".into(),
protocol: vec![2, 3],
features: vec![],
},
)
.expect_err("must fail with StaleProtocol");
let data = err.data.as_ref().expect("data populated");
assert_eq!(data.kind, ErrorKind::StaleProtocol);
assert!(!conn.handshake_complete());
}
#[test]
fn ping_returns_recent_agent_time() {
let conn = fresh_conn();
let before = chrono::Utc::now();
let result = handle_ping(&conn, PingParams::default()).unwrap();
let after = chrono::Utc::now();
assert!(
result.agent_time >= before && result.agent_time <= after,
"agent_time {} should be in [{before}, {after}]",
result.agent_time
);
}
#[test]
fn handshake_advertises_no_features_yet() {
let mut conn = fresh_conn();
let result = handle_handshake(
&mut conn,
HandshakeParams {
client: "kanade-client".into(),
client_version: "0.1.0".into(),
protocol: vec![PROTOCOL_V1],
features: vec!["push.notifications".into()],
},
)
.unwrap();
assert!(
result.features.is_empty(),
"no features advertised yet; got {:?}",
result.features,
);
}
#[test]
fn version_falls_back_to_running_when_no_target_set() {
let conn = fresh_conn();
let result = handle_version(&conn, VersionParams::default()).unwrap();
assert_eq!(result.agent_version, "0.40.0");
assert_eq!(result.target_agent_version, "0.40.0");
assert!(result.target_client_version.is_none());
}
#[test]
fn version_returns_distinct_target_when_supervisor_set_one() {
let mut cfg = EffectiveConfig::builtin_defaults();
cfg.target_version = Some("0.42.0".into());
let conn = fresh_conn_with(cfg, PathBuf::from("agent.log"));
let result = handle_version(&conn, VersionParams::default()).unwrap();
assert_eq!(result.agent_version, "0.40.0");
assert_eq!(result.target_agent_version, "0.42.0");
}
#[tokio::test]
async fn log_tail_returns_empty_when_file_missing() {
let tmpdir = tempfile::tempdir().unwrap();
let conn = fresh_conn_with(
EffectiveConfig::builtin_defaults(),
tmpdir.path().join("agent.log"),
);
let result = handle_log_tail(&conn, LogTailParams::default())
.await
.expect("missing file is not an error");
assert!(result.lines.is_empty());
assert!(!result.truncated);
}
#[tokio::test]
async fn log_tail_picks_rotated_file_matching_template() {
let tmpdir = tempfile::tempdir().unwrap();
let template = tmpdir.path().join("agent.log");
let active = tmpdir.path().join("agent.2026-05-24.log");
std::fs::write(&active, "first\nsecond\nthird\n").unwrap();
let conn = fresh_conn_with(EffectiveConfig::builtin_defaults(), template);
let result = handle_log_tail(&conn, LogTailParams { lines: 10 })
.await
.expect("locate_active_file should find the rotated file");
assert_eq!(result.lines, vec!["first", "second", "third"]);
}
#[tokio::test]
async fn log_tail_returns_all_lines_when_file_smaller_than_request() {
let f = NamedTempFile::new().unwrap();
std::fs::write(f.path(), "alpha\nbeta\ngamma\n").unwrap();
let conn = fresh_conn_with(EffectiveConfig::builtin_defaults(), f.path().to_path_buf());
let result = handle_log_tail(
&conn,
LogTailParams {
lines: 100, },
)
.await
.unwrap();
assert_eq!(result.lines, vec!["alpha", "beta", "gamma"]);
assert!(!result.truncated);
}
#[tokio::test]
async fn log_tail_returns_only_last_n_when_file_larger_than_request() {
let f = NamedTempFile::new().unwrap();
let body = (1..=20)
.map(|i| format!("line-{i:02}"))
.collect::<Vec<_>>()
.join("\n");
std::fs::write(f.path(), body).unwrap();
let conn = fresh_conn_with(EffectiveConfig::builtin_defaults(), f.path().to_path_buf());
let result = handle_log_tail(&conn, LogTailParams { lines: 5 })
.await
.unwrap();
assert_eq!(
result.lines,
vec!["line-16", "line-17", "line-18", "line-19", "line-20"]
);
assert!(!result.truncated);
}
#[tokio::test]
async fn log_tail_clamps_to_hard_cap_and_flags_truncated() {
let f = NamedTempFile::new().unwrap();
let body = (1..=(LOG_TAIL_HARD_CAP + 50))
.map(|i| format!("L{i}"))
.collect::<Vec<_>>()
.join("\n");
std::fs::write(f.path(), body).unwrap();
let conn = fresh_conn_with(EffectiveConfig::builtin_defaults(), f.path().to_path_buf());
let result = handle_log_tail(
&conn,
LogTailParams {
lines: LOG_TAIL_HARD_CAP + 100,
},
)
.await
.unwrap();
assert_eq!(result.lines.len(), LOG_TAIL_HARD_CAP as usize);
assert!(result.truncated, "asking past the cap must set truncated");
assert_eq!(
result.lines.last().unwrap(),
&format!("L{}", LOG_TAIL_HARD_CAP + 50)
);
}
#[tokio::test]
async fn log_tail_default_is_200_lines() {
let f = NamedTempFile::new().unwrap();
let body = (1..=500)
.map(|i| format!("L{i}"))
.collect::<Vec<_>>()
.join("\n");
std::fs::write(f.path(), body).unwrap();
let conn = fresh_conn_with(EffectiveConfig::builtin_defaults(), f.path().to_path_buf());
let result = handle_log_tail(&conn, LogTailParams::default())
.await
.unwrap();
assert_eq!(result.lines.len(), 200);
assert!(!result.truncated);
}
}