#![cfg(target_os = "windows")]
use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio::sync::mpsc;
use tracing::{info, warn};
use crate::env_gate::set_console_idle;
use crate::process_as_user::{active_console_session, read_lines, spawn_session_agent_child};
/// How long the supervisor waits between checks of the active console session.
const POLL_INTERVAL: Duration = Duration::from_secs(3);
/// Initial respawn delay after the session-agent ends unexpectedly; the delay
/// is reset to this value after a clean stop.
const BACKOFF_MIN: Duration = Duration::from_secs(2);
/// Upper bound for the respawn delay, which doubles after each unexpected end.
const BACKOFF_MAX: Duration = Duration::from_secs(60);
/// Keeps a session-agent running for whichever console session is active.
///
/// Runs forever. While no console session is active the published idle value
/// is cleared and the session is polled again after `POLL_INTERVAL`. When a
/// session is active, `pump_one` is awaited for it; afterwards the idle value
/// is cleared again. A clean stop resets the respawn delay to `BACKOFF_MIN`;
/// any other outcome logs a warning, sleeps for the current delay, and then
/// doubles the delay up to `BACKOFF_MAX`.
///
/// `exe` is the executable path handed to `pump_one` for spawning the child.
pub async fn run(exe: PathBuf) {
    let mut respawn_delay = BACKOFF_MIN;
    loop {
        // Guard: nothing to supervise while nobody is on the console.
        let Some(session) = active_console_session() else {
            set_console_idle(None);
            tokio::time::sleep(POLL_INTERVAL).await;
            continue;
        };

        let ended_cleanly = pump_one(&exe, session).await;
        // The child is gone either way, so its last idle reading is stale.
        set_console_idle(None);

        if ended_cleanly {
            respawn_delay = BACKOFF_MIN;
            continue;
        }

        warn!(
            target: "kanade_agent::session_supervisor",
            backoff_s = respawn_delay.as_secs(),
            "session-agent ended unexpectedly — backing off before respawn",
        );
        tokio::time::sleep(respawn_delay).await;
        respawn_delay = std::cmp::min(respawn_delay * 2, BACKOFF_MAX);
    }
}
async fn pump_one(exe: &Path, session: u32) -> bool {
let exe_owned = exe.to_path_buf();
let mut child = match tokio::task::spawn_blocking(move || spawn_session_agent_child(&exe_owned))
.await
{
Ok(Ok(c)) => c,
Ok(Err(e)) => {
warn!(target: "kanade_agent::session_supervisor", error = %e, "spawn session-agent failed");
return false;
}
Err(e) => {
warn!(target: "kanade_agent::session_supervisor", error = %e, "spawn join failed");
return false;
}
};
info!(target: "kanade_agent::session_supervisor", session, "session-agent started");
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
let Some(stdout) = child.take_stdout() else {
return false;
};
let reader = tokio::task::spawn_blocking(move || {
read_lines(stdout, |line| {
let _ = tx.send(line.to_string());
});
});
let mut ended_cleanly = false;
loop {
tokio::select! {
line = rx.recv() => match line {
Some(l) => set_console_idle(parse_idle_ms(&l).map(Duration::from_millis)),
None => break, },
_ = tokio::time::sleep(POLL_INTERVAL) => {
if active_console_session() != Some(session) {
info!(
target: "kanade_agent::session_supervisor",
"console session changed/ended — stopping session-agent",
);
ended_cleanly = true;
break;
}
}
}
}
child.terminate();
let _ = reader.await;
ended_cleanly
}
/// Extracts the `idle_ms` field from one JSON line.
///
/// The line is trimmed and parsed as JSON. Returns `None` when it is not valid
/// JSON, has no `idle_ms` key, or the value is not representable as a `u64`.
fn parse_idle_ms(line: &str) -> Option<u64> {
    serde_json::from_str::<serde_json::Value>(line.trim())
        .ok()
        .and_then(|doc| doc.get("idle_ms").and_then(serde_json::Value::as_u64))
}
#[cfg(test)]
mod tests {
    use super::parse_idle_ms;

    /// Well-formed `idle_ms` payloads are extracted; anything else yields `None`.
    #[test]
    fn parses_idle_ms_and_ignores_junk() {
        let cases: [(&str, Option<u64>); 6] = [
            (r#"{"idle_ms":2500}"#, Some(2500)),
            (r#" {"idle_ms":0} "#, Some(0)),
            (r#"{"idle_ms":null}"#, None),
            ("not json", None),
            ("", None),
            (r#"{"other":1}"#, None),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_idle_ms(input), expected, "input: {input:?}");
        }
    }
}