use std::fs;
use std::path::PathBuf;
use std::thread;
use std::time::{Duration, Instant};
use netsky_core::consts::{AGENT0_NAME, CLONE_PREFIX, MCP_CHANNEL_DIR_PREFIX};
use netsky_core::envelope::{Envelope, write_envelope};
use netsky_core::paths::{assert_no_symlink_under, home};
use netsky_db::SessionEvent;
use netsky_sh::tmux;
use crate::observability;
/// How long the graceful path waits for agents to close or ack before force-killing them.
const GRACEFUL_TIMEOUT: Duration = Duration::from_millis(60_000);
/// Pause between successive liveness/ack polls while waiting.
const GRACEFUL_POLL: Duration = Duration::from_millis(1_000);
/// `kind` stamped on the shutdown request envelope sent to each agent.
const SHUTDOWN_KIND: &str = "shutdown";
/// `kind` an agent's reply carries when it is a typed shutdown acknowledgement.
const SHUTDOWN_ACK_KIND: &str = "shutdown_ack";
/// Entry point for `netsky down`.
///
/// Wires the real tmux-backed sessions and the filesystem channel rooted at
/// `home()` into [`run_with`], using the default graceful timeout and poll
/// interval. When `force` is true, sessions are killed without a graceful
/// shutdown request.
pub fn run(force: bool) -> netsky_core::Result<()> {
    run_with(
        &RealSessions,
        &FsShutdownChannel::new(home()),
        force,
        GRACEFUL_TIMEOUT,
        GRACEFUL_POLL,
    )
}
fn run_with<S, C>(
sessions: &S,
channel: &C,
force: bool,
timeout: Duration,
poll: Duration,
) -> netsky_core::Result<()>
where
S: SessionOps,
C: ShutdownChannel,
{
let targets = filter_agent_sessions(sessions.list_sessions());
if targets.is_empty() {
observability::record_session(AGENT0_NAME, 0, SessionEvent::Down);
println!("[netsky down] no agent sessions running");
return Ok(());
}
if force {
force_kill(sessions, &targets)?;
observability::record_session(AGENT0_NAME, 0, SessionEvent::Down);
return Ok(());
}
let requests = send_shutdown_requests(channel, &targets)?;
let outcomes = wait_for_graceful_closes(sessions, channel, &requests, timeout, poll);
for outcome in outcomes {
let name = outcome.agent;
match outcome.outcome {
ShutdownOutcome::Closed => {
println!("[netsky down] '{name}' closed gracefully");
}
ShutdownOutcome::Acked => {
println!("[netsky down] '{name}' acked shutdown; killing remaining tmux session");
sessions.kill_session(&name)?;
println!("[netsky down] killed '{name}' after shutdown ack");
}
ShutdownOutcome::TimedOut => {
println!(
"[netsky down] '{name}' did not close within {}s; forcing kill",
timeout.as_secs()
);
sessions.kill_session(&name)?;
println!("[netsky down] killed '{name}' after graceful timeout");
}
}
}
observability::record_session(AGENT0_NAME, 0, SessionEvent::Down);
Ok(())
}
/// Sends one graceful-shutdown request to each of `targets`, in order.
///
/// Each request id has the form `shutdown-<agent>-<utc nanos>`. The nanos
/// fall back to 0 when chrono cannot represent the current time as i64
/// nanoseconds. The returned requests let the caller match acks against ids.
///
/// # Errors
/// Stops at, and returns, the first send failure. Requests already sent
/// before that point are not retracted.
fn send_shutdown_requests<C: ShutdownChannel>(
    channel: &C,
    targets: &[String],
) -> netsky_core::Result<Vec<ShutdownRequest>> {
    let mut requests = Vec::with_capacity(targets.len());
    for name in targets {
        let nanos = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
        let request_id = format!("shutdown-{name}-{}", nanos);
        channel.send_shutdown(name, &request_id)?;
        println!("[netsky down] requested graceful shutdown for '{name}'");
        requests.push(ShutdownRequest {
            agent: name.clone(),
            request_id,
        });
    }
    Ok(requests)
}
/// Kills every session in `targets` immediately, in order, without any
/// graceful shutdown request.
///
/// A session can exit between being listed and being killed. A failed kill is
/// therefore tolerated when `has_session` reports the session is already gone.
/// This keeps the remaining targets from being skipped.
///
/// # Errors
/// Returns the first kill error for a session that is still alive. Targets
/// after it are not attempted.
fn force_kill<S: SessionOps>(sessions: &S, targets: &[String]) -> netsky_core::Result<()> {
    for name in targets {
        match sessions.kill_session(name) {
            Ok(()) => println!("[netsky down] killed '{name}'"),
            // Only re-check liveness on failure, so successful kills cost one call.
            Err(_) if !sessions.has_session(name) => {
                println!("[netsky down] '{name}' already exited");
            }
            Err(err) => return Err(err),
        }
    }
    Ok(())
}
/// Polls each request's agent until it closes its session, acks, or the
/// deadline (`timeout` from now) passes.
///
/// On each pass, an agent whose session is gone becomes `Closed`. An agent
/// that is still alive but has acked its request becomes `Acked`. Agents
/// already resolved are not re-checked. When the deadline passes, every
/// unresolved agent becomes `TimedOut`.
///
/// Sleeps between passes are capped at the time remaining, so the wait never
/// overshoots `timeout` by a whole `poll` interval. The result has one entry
/// per request, in request order.
fn wait_for_graceful_closes<S, C>(
    sessions: &S,
    channel: &C,
    requests: &[ShutdownRequest],
    timeout: Duration,
    poll: Duration,
) -> Vec<AgentShutdownOutcome>
where
    S: SessionOps,
    C: ShutdownChannel,
{
    let deadline = Instant::now() + timeout;
    let mut outcomes: Vec<Option<ShutdownOutcome>> = vec![None; requests.len()];
    loop {
        for (slot, request) in outcomes.iter_mut().zip(requests) {
            if slot.is_some() {
                continue;
            }
            if !sessions.has_session(&request.agent) {
                *slot = Some(ShutdownOutcome::Closed);
            } else if channel.has_shutdown_ack(&request.agent, &request.request_id) {
                *slot = Some(ShutdownOutcome::Acked);
            }
        }
        if outcomes.iter().all(Option::is_some) {
            break;
        }
        let now = Instant::now();
        if now >= deadline {
            for slot in outcomes.iter_mut().filter(|slot| slot.is_none()) {
                *slot = Some(ShutdownOutcome::TimedOut);
            }
            break;
        }
        // Clamp the nap to the remaining budget so the last poll lands at the deadline.
        thread::sleep(poll.min(deadline.saturating_duration_since(now)));
    }
    requests
        .iter()
        .zip(outcomes)
        .map(|(request, outcome)| AgentShutdownOutcome {
            agent: request.agent.clone(),
            // Invariant: the loop only exits once every slot is Some.
            outcome: outcome.expect("shutdown outcome must be set"),
        })
        .collect()
}
/// A graceful-shutdown request that was delivered to one agent.
#[derive(Clone, Debug, Eq, PartialEq)]
struct ShutdownRequest {
    /// Session/agent name the request was sent to.
    agent: String,
    /// Envelope id of the request; acks are matched against it.
    request_id: String,
}
/// How the graceful wait resolved for one agent.
#[derive(Clone, Debug, Eq, PartialEq)]
struct AgentShutdownOutcome {
    /// Session/agent name.
    agent: String,
    /// What was observed for that agent during the wait.
    outcome: ShutdownOutcome,
}

/// Result of waiting on a single agent after its shutdown request.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum ShutdownOutcome {
    /// The session disappeared on its own; nothing left to kill.
    Closed,
    /// The agent acknowledged the request while its session was still alive.
    Acked,
    /// Neither close nor ack was observed before the deadline.
    TimedOut,
}
/// Session operations used by `netsky down`; implemented by [`RealSessions`]
/// for production and by a fake in this file's tests.
trait SessionOps {
    /// Names of all currently running sessions.
    fn list_sessions(&self) -> Vec<String>;

    /// Whether a session named `name` is currently running.
    fn has_session(&self, name: &str) -> bool;

    /// Terminates the session named `name`.
    fn kill_session(&self, name: &str) -> netsky_core::Result<()>;
}
/// [`SessionOps`] implementation that delegates every call to `netsky_sh::tmux`.
struct RealSessions;

impl SessionOps for RealSessions {
    fn list_sessions(&self) -> Vec<String> {
        tmux::list_sessions()
    }

    fn has_session(&self, name: &str) -> bool {
        tmux::has_session(name)
    }

    /// Converts the tmux-layer error into the crate-wide error type.
    fn kill_session(&self, name: &str) -> netsky_core::Result<()> {
        tmux::kill_session(name).map_err(Into::into)
    }
}
/// Transport for graceful-shutdown requests and their acknowledgements.
trait ShutdownChannel {
    /// Delivers a shutdown request identified by `request_id` to `target`.
    fn send_shutdown(&self, target: &str, request_id: &str) -> netsky_core::Result<()>;

    /// Whether `target` has acknowledged the request identified by `request_id`.
    fn has_shutdown_ack(&self, target: &str, request_id: &str) -> bool;
}
/// Filesystem-backed [`ShutdownChannel`]: envelopes are files placed in
/// per-agent `inbox` directories under a shared channel root.
struct FsShutdownChannel {
    /// `<home>/<MCP_CHANNEL_DIR_PREFIX>`; agent `a`'s inbox is `<root>/a/inbox`.
    root: PathBuf,
}

impl FsShutdownChannel {
    /// Builds a channel rooted at `MCP_CHANNEL_DIR_PREFIX` inside `home_dir`.
    fn new(home_dir: PathBuf) -> Self {
        let root = home_dir.join(MCP_CHANNEL_DIR_PREFIX);
        Self { root }
    }

    /// Path of `agent`'s inbox directory (not created or checked here).
    fn inbox_dir(&self, agent: &str) -> PathBuf {
        let mut dir = self.root.join(agent);
        dir.push("inbox");
        dir
    }
}
impl ShutdownChannel for FsShutdownChannel {
    /// Writes a `shutdown` envelope from agent0 into `target`'s inbox.
    ///
    /// The envelope carries `request_id` as its id and asks for an ack.
    /// `assert_no_symlink_under` is checked between the channel root and the
    /// inbox first; presumably it rejects symlinked path components (see
    /// `netsky_core::paths`).
    fn send_shutdown(&self, target: &str, request_id: &str) -> netsky_core::Result<()> {
        let target_inbox = self.inbox_dir(target);
        assert_no_symlink_under(&self.root, &target_inbox)?;
        let sent_at = chrono::Utc::now().to_rfc3339();
        let mut envelope = Envelope::new(AGENT0_NAME, "shutdown: please /down", sent_at);
        envelope.id = Some(request_id.to_string());
        envelope.to = Some(target.to_string());
        envelope.kind = Some(SHUTDOWN_KIND.to_string());
        envelope.requires_ack = Some(true);
        write_envelope(&target_inbox, &envelope)?;
        Ok(())
    }

    /// Scans agent0's inbox for an envelope that [`is_shutdown_ack`] accepts.
    ///
    /// Best effort: a missing inbox, unreadable files, and files that do not
    /// parse as an `Envelope` are all skipped rather than reported.
    fn has_shutdown_ack(&self, target: &str, request_id: &str) -> bool {
        let Ok(entries) = fs::read_dir(self.inbox_dir(AGENT0_NAME)) else {
            return false;
        };
        for entry in entries.flatten() {
            let Ok(raw) = fs::read_to_string(entry.path()) else {
                continue;
            };
            let Ok(env) = serde_json::from_str::<Envelope>(&raw) else {
                continue;
            };
            if is_shutdown_ack(&env, target, request_id) {
                return true;
            }
        }
        false
    }
}
/// Whether `env` is `target`'s acknowledgement of the request `request_id`.
///
/// The envelope must come from `target`. It then qualifies in one of two ways:
/// - a typed ack: `kind == SHUTDOWN_ACK_KIND` and `in_reply_to == request_id`;
/// - a lenient textual ack: text containing "shutdown ack" (ASCII
///   case-insensitive) whose `in_reply_to` or `id` equals `request_id`.
fn is_shutdown_ack(env: &Envelope, target: &str, request_id: &str) -> bool {
    if env.from != target {
        return false;
    }
    let replies_to_request = env.in_reply_to.as_deref() == Some(request_id);
    let typed_ack = env.kind.as_deref() == Some(SHUTDOWN_ACK_KIND);
    if typed_ack && replies_to_request {
        return true;
    }
    let mentions_ack = env.text.to_ascii_lowercase().contains("shutdown ack");
    mentions_ack && (replies_to_request || env.id.as_deref() == Some(request_id))
}
/// Keeps only the names that [`is_agent_session`] accepts, preserving order.
fn filter_agent_sessions(mut sessions: Vec<String>) -> Vec<String> {
    sessions.retain(|name| is_agent_session(name));
    sessions
}
/// Whether `name` is `CLONE_PREFIX` followed by one or more ASCII digits.
///
/// For example `agent0` and `agent42` match; `agent`, `agentinfinity`, and
/// unrelated sessions do not.
fn is_agent_session(name: &str) -> bool {
    match name.strip_prefix(CLONE_PREFIX) {
        Some(digits) if !digits.is_empty() => digits.bytes().all(|b| b.is_ascii_digit()),
        _ => false,
    }
}
#[cfg(test)]
mod tests {
use std::cell::{Cell, RefCell};
use std::collections::HashSet;
use std::rc::Rc;
use serial_test::serial;
use super::*;
#[derive(Default)]
struct FakeSessions {
listed: Vec<String>,
alive: RefCell<HashSet<String>>,
has_checks: Cell<usize>,
killed: RefCell<Vec<String>>,
events: Rc<RefCell<Vec<String>>>,
}
impl FakeSessions {
fn with(listed: &[&str]) -> Self {
Self::with_events(listed, Rc::new(RefCell::new(Vec::new())))
}
fn with_events(listed: &[&str], events: Rc<RefCell<Vec<String>>>) -> Self {
Self {
listed: listed.iter().map(|s| s.to_string()).collect(),
alive: RefCell::new(listed.iter().map(|s| s.to_string()).collect()),
has_checks: Cell::new(0),
killed: RefCell::new(Vec::new()),
events,
}
}
}
impl SessionOps for FakeSessions {
fn list_sessions(&self) -> Vec<String> {
self.listed.clone()
}
fn has_session(&self, name: &str) -> bool {
self.has_checks.set(self.has_checks.get() + 1);
self.alive.borrow().contains(name)
}
fn kill_session(&self, name: &str) -> netsky_core::Result<()> {
self.alive.borrow_mut().remove(name);
self.killed.borrow_mut().push(name.to_string());
self.events.borrow_mut().push(format!("kill {name}"));
Ok(())
}
}
#[derive(Default)]
struct FakeChannel {
sent: RefCell<Vec<(String, String)>>,
acked: RefCell<HashSet<String>>,
events: Rc<RefCell<Vec<String>>>,
}
impl FakeChannel {
fn with_events(events: Rc<RefCell<Vec<String>>>) -> Self {
Self {
events,
..Default::default()
}
}
}
impl ShutdownChannel for FakeChannel {
fn send_shutdown(&self, target: &str, request_id: &str) -> netsky_core::Result<()> {
self.sent
.borrow_mut()
.push((target.to_string(), request_id.to_string()));
self.events.borrow_mut().push(format!("send {target}"));
Ok(())
}
fn has_shutdown_ack(&self, target: &str, _request_id: &str) -> bool {
self.acked.borrow().contains(target)
}
}
#[test]
fn matches_agent_sessions_only() {
assert!(is_agent_session("agent0"));
assert!(is_agent_session("agent5"));
assert!(is_agent_session("agent42"));
assert!(!is_agent_session("agentinfinity"));
assert!(!is_agent_session("agent"));
assert!(!is_agent_session("agentfoo"));
assert!(!is_agent_session("foo"));
assert!(!is_agent_session("netsky-ticker"));
}
#[test]
fn default_targets_leave_agentinfinity_alive() {
let targets = filter_agent_sessions(vec![
"agent0".to_string(),
"agent1".to_string(),
"agentinfinity".to_string(),
"netsky-ticker".to_string(),
]);
assert_eq!(targets, vec!["agent0", "agent1"]);
}
#[test]
#[serial(meta_db)]
fn force_kills_immediately_without_shutdown_envelopes() {
let sessions = FakeSessions::with(&["agent0", "agent1"]);
let channel = FakeChannel::default();
run_with(
&sessions,
&channel,
true,
Duration::from_millis(0),
Duration::from_millis(0),
)
.unwrap();
assert_eq!(channel.sent.borrow().len(), 0);
assert_eq!(*sessions.killed.borrow(), vec!["agent0", "agent1"]);
assert_eq!(sessions.has_checks.get(), 0);
}
#[test]
#[serial(meta_db)]
fn default_sends_shutdown_envelopes_and_waits_before_force_kill() {
let events = Rc::new(RefCell::new(Vec::new()));
let sessions = FakeSessions::with_events(&["agent1", "agent2"], events.clone());
let channel = FakeChannel::with_events(events.clone());
run_with(
&sessions,
&channel,
false,
Duration::from_millis(0),
Duration::from_millis(0),
)
.unwrap();
assert_eq!(channel.sent.borrow().len(), 2);
assert_eq!(channel.sent.borrow()[0].0, "agent1");
assert_eq!(channel.sent.borrow()[1].0, "agent2");
assert_eq!(sessions.has_checks.get(), 2);
assert_eq!(*sessions.killed.borrow(), vec!["agent1", "agent2"]);
assert_eq!(
*events.borrow(),
vec!["send agent1", "send agent2", "kill agent1", "kill agent2"]
);
}
#[test]
#[serial(meta_db)]
fn default_kills_after_shutdown_ack() {
let sessions = FakeSessions::with(&["agent1"]);
let channel = FakeChannel::default();
channel.acked.borrow_mut().insert("agent1".to_string());
run_with(
&sessions,
&channel,
false,
Duration::from_secs(60),
Duration::from_millis(0),
)
.unwrap();
assert_eq!(channel.sent.borrow().len(), 1);
assert_eq!(*sessions.killed.borrow(), vec!["agent1"]);
}
#[test]
#[serial(meta_db)]
fn default_does_not_kill_session_that_closed_itself() {
let sessions = FakeSessions::with(&["agent1"]);
sessions.alive.borrow_mut().remove("agent1");
let channel = FakeChannel::default();
run_with(
&sessions,
&channel,
false,
Duration::from_secs(60),
Duration::from_millis(0),
)
.unwrap();
assert_eq!(channel.sent.borrow().len(), 1);
assert!(sessions.killed.borrow().is_empty());
}
#[test]
fn recognizes_shutdown_ack_envelope() {
let mut env = Envelope::new("agent3", "shutdown ack", "2026-04-17T00:00:00Z");
env.kind = Some(SHUTDOWN_ACK_KIND.to_string());
env.in_reply_to = Some("shutdown-agent3-1".to_string());
assert!(is_shutdown_ack(&env, "agent3", "shutdown-agent3-1"));
assert!(!is_shutdown_ack(&env, "agent4", "shutdown-agent3-1"));
assert!(!is_shutdown_ack(&env, "agent3", "other"));
}
}