use std::sync::Arc;
use kanade_shared::ipc::handshake::HandshakeResult;
use kanade_shared::ipc::jobs::{
JobCategory, JobsExecuteResult, JobsKillResult, JobsListParams, JobsListResult,
};
use kanade_shared::ipc::state::StateSnapshot;
use kanade_shared::ipc::system::PingResult;
use tauri::{Emitter, State};
use tokio::sync::Mutex;
use tokio::sync::broadcast::error::RecvError;
use tracing::{info, warn};
use crate::klp_client::KlpClient;
// Tauri event names emitted through the app handle by this module.

/// Event carrying each notification received from the KLP client's
/// subscription (emitted by `spawn_notification_forwarder`).
const NOTIFICATION_EVENT: &str = "klp-notification";
/// Event emitted (with a unit payload) by `supervise_connection` right after
/// a freshly connected client has been stored in the shared slot.
const CONNECTED_EVENT: &str = "klp-connected";
/// Event emitted (with a unit payload) by `supervise_connection` right after
/// the shared slot has been cleared because the connection closed.
const DISCONNECTED_EVENT: &str = "klp-disconnected";
/// Application state registered with Tauri via `Builder::manage` in [`run`].
///
/// Command handlers read the current client out of it; the connection
/// supervisor is the only writer visible in this file.
pub struct AppState {
/// The currently connected KLP client, or `None` while no connection is
/// established (before the first successful connect and after a disconnect).
klp: Arc<Mutex<Option<KlpClient>>>,
}
/// Returns a clone of the currently connected KLP client.
///
/// # Errors
///
/// Returns `"agent not connected"` when the shared slot is empty.
async fn connected_client(state: &State<'_, AppState>) -> Result<KlpClient, String> {
    let slot = state.klp.lock().await;
    match &*slot {
        Some(client) => Ok(client.clone()),
        None => Err(String::from("agent not connected")),
    }
}
/// Tauri command: returns a copy of the handshake result held by the
/// connected KLP client.
///
/// # Errors
///
/// Returns an error string when no client is stored in the shared slot.
///
/// NOTE(review): the message mentions startup only, but the slot is also
/// empty after `supervise_connection` clears it on disconnect — confirm
/// whether the wording should be broadened.
#[tauri::command]
async fn get_handshake(state: State<'_, AppState>) -> Result<HandshakeResult, String> {
    let slot = state.klp.lock().await;
    let Some(client) = slot.as_ref() else {
        return Err("agent not connected (pipe unavailable on startup)".into());
    };
    Ok((*client.handshake()).clone())
}
/// Tauri command: pings the agent through the connected KLP client.
///
/// # Errors
///
/// Returns an error string when no client is connected, or the stringified
/// client error when the ping itself fails.
#[tauri::command]
async fn ping_agent(state: State<'_, AppState>) -> Result<PingResult, String> {
    let client = connected_client(&state).await?;
    let reply = client.ping().await;
    reply.map_err(|err| err.to_string())
}
/// Tauri command: requests a state snapshot through the connected KLP client.
///
/// # Errors
///
/// Returns an error string when no client is connected, or the stringified
/// client error when the request fails.
#[tauri::command]
async fn state_snapshot(state: State<'_, AppState>) -> Result<StateSnapshot, String> {
    let client = connected_client(&state).await?;
    let snapshot = client.snapshot().await;
    snapshot.map_err(|err| err.to_string())
}
/// Tauri command: lists jobs through the connected KLP client.
///
/// `category` is passed through unchanged as the `category` field of the
/// request parameters; `None` is forwarded as-is.
///
/// # Errors
///
/// Returns an error string when no client is connected, or the stringified
/// client error when the request fails.
#[tauri::command]
async fn jobs_list(
    state: State<'_, AppState>,
    category: Option<JobCategory>,
) -> Result<JobsListResult, String> {
    let client = connected_client(&state).await?;
    let params = JobsListParams { category };
    let listing = client.jobs_list(&params).await;
    listing.map_err(|err| err.to_string())
}
/// Tauri command: asks the agent to execute the job identified by `id`.
///
/// # Errors
///
/// Returns an error string when no client is connected, or the stringified
/// client error when the request fails.
#[tauri::command]
async fn jobs_execute(state: State<'_, AppState>, id: String) -> Result<JobsExecuteResult, String> {
    let client = connected_client(&state).await?;
    let outcome = client.jobs_execute(&id).await;
    outcome.map_err(|err| err.to_string())
}
/// Tauri command: asks the agent to kill the run identified by `run_id`.
///
/// # Errors
///
/// Returns an error string when no client is connected, or the stringified
/// client error when the request fails.
#[tauri::command]
async fn jobs_kill(state: State<'_, AppState>, run_id: String) -> Result<JobsKillResult, String> {
    let client = connected_client(&state).await?;
    let outcome = client.jobs_kill(&run_id).await;
    outcome.map_err(|err| err.to_string())
}
fn spawn_notification_forwarder(client: &KlpClient, handle: tauri::AppHandle) {
let mut rx = client.subscribe();
tauri::async_runtime::spawn(async move {
loop {
match rx.recv().await {
Ok(notif) => {
if let Err(e) = handle.emit(NOTIFICATION_EVENT, notif) {
warn!(error = %e, "klp notification emit failed");
}
}
Err(RecvError::Lagged(skipped)) => {
warn!(skipped, "klp notification forwarder lagged; dropped pushes");
}
Err(RecvError::Closed) => {
info!("klp notification forwarder: connection closed, exiting");
return;
}
}
}
});
}
async fn supervise_connection(slot: Arc<Mutex<Option<KlpClient>>>, handle: tauri::AppHandle) {
loop {
let client = loop {
match KlpClient::connect().await {
Ok(c) => break c,
Err(e) => {
warn!(error = %e, "KLP connect failed; retrying in 5s");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
};
info!(
agent_version = %client.handshake().agent_version,
"KLP client ready",
);
spawn_notification_forwarder(&client, handle.clone());
*slot.lock().await = Some(client.clone());
if let Err(e) = handle.emit(CONNECTED_EVENT, ()) {
warn!(error = %e, "klp-connected emit failed");
}
client.wait_closed().await;
warn!("KLP connection lost; reconnecting");
*slot.lock().await = None;
if let Err(e) = handle.emit(DISCONNECTED_EVENT, ()) {
warn!(error = %e, "klp-disconnected emit failed");
}
}
}
pub fn run() {
let state = AppState {
klp: Arc::new(Mutex::new(None)),
};
let klp_slot = state.klp.clone();
tauri::Builder::default()
.manage(state)
.invoke_handler(tauri::generate_handler![
get_handshake,
ping_agent,
state_snapshot,
jobs_list,
jobs_execute,
jobs_kill
])
.setup(move |app| {
tauri::async_runtime::spawn(supervise_connection(
klp_slot.clone(),
app.handle().clone(),
));
Ok(())
})
.run(tauri::generate_context!())
.expect("error while running kanade-client tauri application");
}