use std::fs;
use std::path::{Path, PathBuf};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use netsky_core::consts::{
AGENT0_NAME, CLONE_PREFIX, ENV_HANDOFF_KEEP, HANDOFF_FROM, HANDOFF_KEEP_DEFAULT,
RESTART_AGENT0_TOS_WAIT_S, RESTART_AGENT0_UP_WAIT_S, RESTART_STATUS_KEEP,
RESTART_TEARDOWN_SETTLE_MS,
};
use netsky_core::paths::{agent0_inbox_dir, handoff_archive_dir, restart_status_dir};
use netsky_sh::tmux;
/// Entry point for the restart command.
///
/// `n` is the constellation size handed to the spawn step. `handoff_path`, if
/// given, names a file whose contents are delivered to agent0 once it is up.
/// A status record is created before any work starts. If the restart fails,
/// an `errored` phase carrying the error text is written before the error is
/// returned to the caller.
pub fn run(n: u32, handoff_path: Option<&str>) -> netsky_core::Result<()> {
    let status = StatusWriter::new();
    let handoff = handoff_path.map(Path::new);
    run_impl(n, handoff, &status).inspect_err(|err| status.write_errored(&err.to_string()))
}
/// Perform the restart sequence. The steps are:
/// 1. Check dependencies.
/// 2. Kill existing agent sessions and sleep `RESTART_TEARDOWN_SETTLE_MS`.
/// 3. Spawn the constellation.
/// 4. Wait for agent0 to finish `/up`.
/// 5. Deliver the handoff, if any.
/// 6. Wait (best-effort) for clone sessions `1..=n`.
///
/// Milestone phases (`spawned`, `up-detected`) are written to `status` as
/// they are reached. The terminal `errored` phase is deliberately NOT
/// written here: the caller (`run`) writes it for every `Err` this function
/// returns, so writing it here as well only duplicated the record.
///
/// # Errors
/// Propagates failures from the dependency check, the spawn step and handoff
/// delivery. Returns an "agent0 step" error if agent0 does not finish `/up`
/// within `RESTART_AGENT0_UP_WAIT_S` seconds.
fn run_impl(n: u32, handoff: Option<&Path>, status: &StatusWriter) -> netsky_core::Result<()> {
    netsky_core::spawn::require_deps()?;
    teardown_agents();
    // Settle delay between tearing sessions down and respawning them.
    thread::sleep(Duration::from_millis(RESTART_TEARDOWN_SETTLE_MS));
    println!("[netsky-restart] spawning constellation (N={n})");
    crate::cmd::up::run(n, crate::cli::AgentType::Claude)?;
    status.write_spawned();
    if !wait_session_up(AGENT0_NAME) {
        // `run` records this error in the status file on the way out.
        return Err(agent0_up_failure_error(RESTART_AGENT0_UP_WAIT_S));
    }
    status.write_up_detected();
    match handoff {
        // `/dev/null` is treated the same as "no handoff".
        Some(p) if p != Path::new("/dev/null") => {
            deliver_handoff(p)?;
        }
        _ => println!("[netsky-restart] no handoff (file missing or empty)"),
    }
    // Clones are waited on best-effort; a clone that never comes up only
    // produces a warning from `wait_session_up`.
    for i in 1..=n {
        let _ = wait_session_up(&format!("{CLONE_PREFIX}{i}"));
    }
    println!("[netsky-restart] complete");
    Ok(())
}
fn agent0_up_failure_error(wait_s: u64) -> netsky_core::Error {
netsky_core::Error::Message(format!(
"restart failed at agent0 step: agent0 did not finish /up within {wait_s}s"
))
}
/// Kill every tmux session whose name passes `is_agent_session`.
///
/// Each kill is logged. Kill failures are ignored.
fn teardown_agents() {
    let doomed = tmux::list_sessions()
        .into_iter()
        .filter(|name| is_agent_session(name));
    doomed.for_each(|name| {
        println!("[netsky-restart] killing session {name}");
        let _ = tmux::kill_session(&name);
    });
}
/// Return `true` if `name` is `CLONE_PREFIX` followed by one or more ASCII
/// digits (e.g. `agent0`, `agent42`).
///
/// Any other suffix, or no suffix at all, is rejected.
fn is_agent_session(name: &str) -> bool {
    name.strip_prefix(CLONE_PREFIX).is_some_and(|suffix| {
        // Non-ASCII chars encode to bytes >= 0x80, so a byte-wise check
        // accepts exactly the same strings as a char-wise one.
        !suffix.is_empty() && suffix.bytes().all(|b| b.is_ascii_digit())
    })
}
/// Wait for tmux session `sess` to finish `/up`.
///
/// Returns `false` immediately, with a warning, if the session is not alive.
/// Otherwise it tries to dismiss a TOS prompt, bounded by
/// `RESTART_AGENT0_TOS_WAIT_S`. It then captures the pane roughly once per
/// second until `contains_session_marker` matches or
/// `RESTART_AGENT0_UP_WAIT_S` seconds of wall-clock time have elapsed.
/// Returns whether the marker was seen. The same agent0 budgets are applied
/// to clone sessions.
fn wait_session_up(sess: &str) -> bool {
    use std::time::Instant;

    if !tmux::session_is_alive(sess) {
        println!("[netsky-restart] warn: session {sess} did not spawn; skipping");
        return false;
    }
    if netsky_core::spawn::dismiss_tos(sess, Duration::from_secs(RESTART_AGENT0_TOS_WAIT_S)) {
        println!("[netsky-restart] {sess}: TOS dismissed");
    }
    // Bound the wait by elapsed time rather than by a poll count. Each
    // `capture_pane` call takes time of its own, so counting iterations let
    // the real wait overshoot the budget reported in the warning. Checking
    // the deadline before sleeping also skips a useless sleep after the
    // final failed poll.
    let deadline = Instant::now() + Duration::from_secs(RESTART_AGENT0_UP_WAIT_S);
    loop {
        let up = tmux::capture_pane(sess, None).is_ok_and(|out| contains_session_marker(&out));
        if up {
            println!("[netsky-restart] {sess}: /up complete");
            return true;
        }
        if Instant::now() >= deadline {
            break;
        }
        thread::sleep(Duration::from_secs(1));
    }
    println!(
        "[netsky-restart] warn: {sess} did not finish /up within {RESTART_AGENT0_UP_WAIT_S}s"
    );
    false
}
/// Return `true` if any line of `pane` contains `"session "` immediately
/// followed by an ASCII digit (e.g. `"agent0 session 3 starting"`).
/// `wait_session_up` treats such a line as the sign that `/up` finished.
///
/// Every occurrence of `"session "` on a line is checked, not just the first.
/// A line such as `"tmux session agent0; session 3 started"` therefore still
/// matches.
fn contains_session_marker(pane: &str) -> bool {
    const NEEDLE: &str = "session ";
    pane.lines().any(|line| {
        line.match_indices(NEEDLE).any(|(at, _)| {
            // NEEDLE is ASCII, so `at + NEEDLE.len()` is a char boundary.
            line[at + NEEDLE.len()..].starts_with(|c: char| c.is_ascii_digit())
        })
    })
}
/// Deliver the handoff at `file` using the real agent0 inbox and handoff
/// archive directories.
///
/// All work is done by `deliver_handoff_core`.
fn deliver_handoff(file: &Path) -> netsky_core::Result<()> {
    let inbox = agent0_inbox_dir();
    let archive = handoff_archive_dir();
    deliver_handoff_core(file, &inbox, &archive)
}
/// Deliver the handoff text in `source` to `inbox`, mirror it into
/// `archive`, then remove `source`.
///
/// The text is wrapped in a JSON envelope with keys `from`, `text` and `ts`.
/// The file is named `<UTC-compact>-<pid>-from-<HANDOFF_FROM>.json`. It is
/// first written to a hidden `.<name>.tmp` file and then renamed into place,
/// so the final name never refers to a partially written envelope (assuming
/// rename is atomic on this filesystem).
///
/// An unreadable or empty `source` is a logged no-op that returns `Ok(())`.
/// Failure to remove `source` after delivery is only logged.
///
/// # Errors
/// Propagates errors from:
/// - directory creation,
/// - JSON serialization,
/// - the envelope write/rename (the temporary file is removed first),
/// - the archive copy,
/// - archive pruning.
fn deliver_handoff_core(source: &Path, inbox: &Path, archive: &Path) -> netsky_core::Result<()> {
    let text = match fs::read_to_string(source) {
        Ok(t) => t,
        Err(e) => {
            println!(
                "[netsky-restart] warn: handoff read failed for {}: {e}",
                source.display()
            );
            return Ok(());
        }
    };
    if text.is_empty() {
        println!("[netsky-restart] no handoff (file missing or empty)");
        return Ok(());
    }
    fs::create_dir_all(inbox)?;
    fs::create_dir_all(archive)?;
    // Read the clock once so the envelope's `ts` and the filename stamp
    // always describe the same instant. Two separate `now()` calls could
    // straddle a second boundary.
    let now = chrono::Utc::now();
    let ts_iso = now.format("%Y-%m-%dT%H:%M:%SZ").to_string();
    let ts_compact = now.format("%Y%m%dT%H%M%SZ");
    let pid = std::process::id();
    let name = format!("{ts_compact}-{pid}-from-{HANDOFF_FROM}.json");
    let envelope = serde_json::json!({
        "from": HANDOFF_FROM,
        "text": text,
        "ts": ts_iso,
    });
    let body = serde_json::to_string(&envelope)?;
    let tmp = inbox.join(format!(".{name}.tmp"));
    let final_ = inbox.join(&name);
    let archive_path = archive.join(&name);
    if let Err(e) = fs::write(&tmp, &body).and_then(|()| fs::rename(&tmp, &final_)) {
        // Don't leave a half-written or orphaned temp file in the inbox.
        let _ = fs::remove_file(&tmp);
        return Err(e.into());
    }
    fs::copy(&final_, &archive_path)?;
    println!(
        "[netsky-restart] handoff delivered to {name} (archive: {})",
        archive_path.display()
    );
    match fs::remove_file(source) {
        Ok(()) => println!(
            "[netsky-restart] handoff source removed: {}",
            source.display()
        ),
        Err(e) => println!(
            "[netsky-restart] warn: handoff source rm failed for {}: {e}",
            source.display()
        ),
    }
    prune_archive(archive)?;
    Ok(())
}
/// Delete all but the newest `*.json` files in the handoff archive `dir`.
///
/// The number kept comes from the `ENV_HANDOFF_KEEP` environment variable
/// when it parses as a `usize`, otherwise `HANDOFF_KEEP_DEFAULT`. "Newest"
/// means most recent modification time; if `modified()` is unavailable the
/// entry counts as the Unix epoch. Entries with equal mtimes are ordered by
/// path. For names produced by `deliver_handoff_core`, which start with a
/// compact UTC timestamp, path order follows delivery time. Individual
/// deletion failures are ignored.
///
/// # Errors
/// Returns an error if `dir` cannot be listed or an entry's metadata cannot
/// be read.
fn prune_archive(dir: &Path) -> netsky_core::Result<()> {
    let keep = std::env::var(ENV_HANDOFF_KEEP)
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or(HANDOFF_KEEP_DEFAULT);
    let mut entries: Vec<(SystemTime, PathBuf)> = Vec::new();
    for e in fs::read_dir(dir)? {
        let e = e?;
        let p = e.path();
        if p.extension().and_then(|s| s.to_str()) != Some("json") {
            continue;
        }
        let mtime = e.metadata()?.modified().unwrap_or(UNIX_EPOCH);
        entries.push((mtime, p));
    }
    // Newest first. Comparing the whole (mtime, path) tuple breaks mtime ties
    // deterministically. With an mtime-only sort, which of two same-second
    // files survived depended on `read_dir` order, which is unspecified.
    entries.sort_unstable_by(|a, b| b.cmp(a));
    for (_, p) in entries.into_iter().skip(keep) {
        let _ = fs::remove_file(p);
    }
    Ok(())
}
/// Writes the JSON status record for one restart run.
///
/// Each instance owns one file, `<restart_status_dir>/<UTC-compact>-<pid>.json`.
/// The file is rewritten on every phase change, via a `.json.tmp` sibling
/// and a rename. All filesystem failures are ignored rather than propagated,
/// so status reporting never turns into a restart error.
#[derive(Debug)]
struct StatusWriter {
    pid: u32,
    started_at: String,
    path: PathBuf,
}

impl StatusWriter {
    /// Create a writer for this process and best-effort create the status dir.
    fn new() -> Self {
        let pid = std::process::id();
        // One clock read so the filename stamp and `started_at` agree. Two
        // separate `now()` calls could straddle a second boundary.
        let now = chrono::Utc::now();
        let ts_compact = now.format("%Y%m%dT%H%M%SZ").to_string();
        let started_at = now.format("%Y-%m-%dT%H:%M:%SZ").to_string();
        let dir = restart_status_dir();
        let _ = fs::create_dir_all(&dir);
        let path = dir.join(format!("{ts_compact}-{pid}.json"));
        Self {
            pid,
            started_at,
            path,
        }
    }

    /// Record the `spawned` phase (no exit code yet).
    fn write_spawned(&self) {
        self.write_phase("spawned", None, None);
    }

    /// Record the `up-detected` phase with exit code 0.
    fn write_up_detected(&self) {
        self.write_phase("up-detected", Some(0), None);
    }

    /// Record the `errored` phase with exit code 1 and the error text.
    fn write_errored(&self, error: &str) {
        self.write_phase("errored", Some(1), Some(error));
    }

    /// Render and replace the status file, then prune old status files.
    fn write_phase(&self, phase: &str, exit_code: Option<i32>, error: Option<&str>) {
        let updated_at = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string();
        let Some(rendered) = render_status_body(
            self.pid,
            &self.started_at,
            phase,
            exit_code,
            error,
            &updated_at,
        ) else {
            return;
        };
        let tmp = self.path.with_extension("json.tmp");
        if fs::write(&tmp, &rendered).is_err() {
            // A failed write may still have created a partial tmp file.
            let _ = fs::remove_file(&tmp);
            return;
        }
        if fs::rename(&tmp, &self.path).is_err() {
            // Don't leave an orphaned tmp file next to the status records.
            let _ = fs::remove_file(&tmp);
        }
        let _ = prune_status_dir(&restart_status_dir());
    }
}
/// Serialize one status record to a compact JSON string.
///
/// The record carries `pid`, `started_at`, `phase`, `exit_code`, `error` and
/// `updated_at`. `None` options serialize as JSON `null`. Returns `None` if
/// serialization fails.
fn render_status_body(
    pid: u32,
    started_at: &str,
    phase: &str,
    exit_code: Option<i32>,
    error: Option<&str>,
    updated_at: &str,
) -> Option<String> {
    serde_json::to_string(&serde_json::json!({
        "pid": pid,
        "started_at": started_at,
        "phase": phase,
        "exit_code": exit_code,
        "error": error,
        "updated_at": updated_at,
    }))
    .ok()
}
/// Keep only the newest `RESTART_STATUS_KEEP` `*.json` files in `dir`.
///
/// "Newest" means most recent modification time; if `modified()` is
/// unavailable the entry counts as the Unix epoch. Entries with equal mtimes
/// are ordered by path. For names produced by `StatusWriter`
/// (`<UTC-compact>-<pid>.json`), path order follows creation time. Files
/// with any other extension (e.g. `.json.tmp`) are left alone. Individual
/// deletion failures are ignored.
///
/// # Errors
/// Returns an error if `dir` cannot be listed or an entry's metadata cannot
/// be read.
fn prune_status_dir(dir: &Path) -> netsky_core::Result<()> {
    let mut entries: Vec<(SystemTime, PathBuf)> = Vec::new();
    for e in fs::read_dir(dir)? {
        let e = e?;
        let p = e.path();
        if p.extension().and_then(|s| s.to_str()) != Some("json") {
            continue;
        }
        let mtime = e.metadata()?.modified().unwrap_or(UNIX_EPOCH);
        entries.push((mtime, p));
    }
    // Newest first. Comparing the whole (mtime, path) tuple breaks mtime ties
    // deterministically. With an mtime-only sort, which of two same-second
    // files survived depended on `read_dir` order, which is unspecified.
    entries.sort_unstable_by(|a, b| b.cmp(a));
    for (_, p) in entries.into_iter().skip(RESTART_STATUS_KEEP) {
        let _ = fs::remove_file(p);
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn agent_session_filter() {
        assert!(is_agent_session("agent0"));
        assert!(is_agent_session("agent42"));
        assert!(!is_agent_session("agentinfinity"));
        assert!(!is_agent_session("netsky-ticker"));
    }

    #[test]
    fn up_completion_probe() {
        assert!(contains_session_marker("agent0 session 3 starting at ..."));
        assert!(!contains_session_marker("no marker here"));
    }

    #[test]
    fn status_body_shape_spawned() {
        let rendered = render_status_body(
            1234,
            "2026-04-15T16:00:00Z",
            "spawned",
            None,
            None,
            "2026-04-15T16:00:05Z",
        )
        .expect("serialize");
        let v: serde_json::Value = serde_json::from_str(&rendered).expect("parse");
        assert_eq!(v["pid"], 1234);
        assert_eq!(v["phase"], "spawned");
        assert!(v["exit_code"].is_null());
        assert!(v["error"].is_null());
    }

    #[test]
    fn status_body_shape_errored_carries_error_text() {
        let rendered = render_status_body(
            9,
            "2026-04-15T16:00:00Z",
            "errored",
            Some(1),
            Some("tmux refused: command too long"),
            "2026-04-15T16:00:05Z",
        )
        .expect("serialize");
        let v: serde_json::Value = serde_json::from_str(&rendered).expect("parse");
        assert_eq!(v["phase"], "errored");
        assert_eq!(v["exit_code"], 1);
        assert_eq!(v["error"], "tmux refused: command too long");
    }

    #[test]
    fn prune_status_dir_keeps_only_most_recent() {
        let dir = tempdir().unwrap();
        let total = RESTART_STATUS_KEEP + 5;
        for i in 0..total {
            let p = dir.path().join(format!("{i:04}.json"));
            fs::write(&p, "{}").unwrap();
            // `touch -t CCYYMMDDhhmm`: file `i` gets minute `total - i`. A
            // LOWER index therefore means a LATER mtime; file 0000 is the
            // newest.
            let minute = total - i;
            let stamp = format!("2025010100{minute:02}");
            let touched = std::process::Command::new("touch")
                .args(["-t", &stamp, p.to_str().unwrap()])
                .status()
                .unwrap();
            assert!(
                touched.success(),
                "touch -t {stamp} failed for {}",
                p.display()
            );
        }
        prune_status_dir(dir.path()).unwrap();
        let mut remaining: Vec<String> = fs::read_dir(dir.path())
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| e.path().extension().and_then(|s| s.to_str()) == Some("json"))
            .map(|e| e.file_name().into_string().unwrap())
            .collect();
        remaining.sort();
        let expected: Vec<String> = (0..RESTART_STATUS_KEEP)
            .map(|i| format!("{i:04}.json"))
            .collect();
        assert_eq!(
            remaining, expected,
            "prune must keep exactly the newest files by mtime"
        );
    }

    #[test]
    fn prune_status_dir_ignores_non_json() {
        let dir = tempdir().unwrap();
        fs::write(dir.path().join("a.json"), "{}").unwrap();
        fs::write(dir.path().join("a.json.tmp"), "{}").unwrap();
        fs::write(dir.path().join("random.txt"), "x").unwrap();
        prune_status_dir(dir.path()).unwrap();
        assert!(dir.path().join("a.json").exists());
        assert!(dir.path().join("a.json.tmp").exists());
        assert!(dir.path().join("random.txt").exists());
    }

    #[test]
    fn deliver_handoff_removes_source_after_copy() {
        let dir = tempdir().unwrap();
        let source = dir.path().join("src.txt");
        let inbox = dir.path().join("inbox");
        let archive = dir.path().join("archive");
        fs::write(&source, "hello handoff").unwrap();
        deliver_handoff_core(&source, &inbox, &archive).unwrap();
        assert!(!source.exists(), "source draft must be reaped");
        let inbox_entries: Vec<_> = fs::read_dir(&inbox)
            .unwrap()
            .filter_map(|e| e.ok())
            .map(|e| e.file_name().into_string().unwrap())
            .filter(|n| !n.starts_with('.'))
            .collect();
        assert_eq!(inbox_entries.len(), 1, "exactly one envelope in inbox");
        let archive_entries: Vec<_> = fs::read_dir(&archive)
            .unwrap()
            .filter_map(|e| e.ok())
            .map(|e| e.file_name().into_string().unwrap())
            .collect();
        assert_eq!(archive_entries.len(), 1, "exactly one envelope in archive");
        assert_eq!(inbox_entries, archive_entries);
        let envelope_path = inbox.join(&inbox_entries[0]);
        let body = fs::read_to_string(&envelope_path).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&body).unwrap();
        assert_eq!(parsed["text"].as_str().unwrap(), "hello handoff");
        assert_eq!(parsed["from"].as_str().unwrap(), HANDOFF_FROM);
    }

    #[test]
    fn deliver_handoff_no_op_on_empty_source() {
        let dir = tempdir().unwrap();
        let source = dir.path().join("src.txt");
        let inbox = dir.path().join("inbox");
        let archive = dir.path().join("archive");
        fs::write(&source, "").unwrap();
        deliver_handoff_core(&source, &inbox, &archive).unwrap();
        assert!(source.exists(), "empty-source draft must not be reaped");
        assert!(!inbox.exists() || fs::read_dir(&inbox).unwrap().count() == 0);
        assert!(!archive.exists() || fs::read_dir(&archive).unwrap().count() == 0);
    }

    #[test]
    fn agent0_up_failure_error_names_the_timeout() {
        let err = agent0_up_failure_error(90);
        let msg = err.to_string();
        assert!(
            msg.contains("restart failed at agent0 step"),
            "error must name the failing step for the crashloop page: {msg}"
        );
        assert!(
            msg.contains("within 90s"),
            "error must include the actual timeout: {msg}"
        );
    }

    #[test]
    fn agent0_up_failure_error_is_message_variant() {
        let err = agent0_up_failure_error(5);
        assert!(
            matches!(err, netsky_core::Error::Message(_)),
            "agent0_up failure should surface as Error::Message until promoted"
        );
    }

    #[test]
    fn deliver_handoff_archive_mirrors_inbox_content() {
        let dir = tempdir().unwrap();
        let source = dir.path().join("src.txt");
        let inbox = dir.path().join("inbox");
        let archive = dir.path().join("archive");
        fs::write(&source, "payload-AAA").unwrap();
        deliver_handoff_core(&source, &inbox, &archive).unwrap();
        let inbox_files: Vec<_> = fs::read_dir(&inbox).unwrap().flatten().collect();
        let archive_files: Vec<_> = fs::read_dir(&archive).unwrap().flatten().collect();
        assert_eq!(inbox_files.len(), 1, "exactly one envelope in inbox");
        assert_eq!(archive_files.len(), 1, "exactly one record in archive");
        let i = fs::read_to_string(inbox_files[0].path()).unwrap();
        let a = fs::read_to_string(archive_files[0].path()).unwrap();
        assert_eq!(i, a, "archive must mirror inbox content byte-for-byte");
        assert!(
            i.contains("payload-AAA"),
            "envelope must include the source payload"
        );
    }

    #[test]
    fn deliver_handoff_leaves_no_tmp_in_inbox_after_success() {
        let dir = tempdir().unwrap();
        let source = dir.path().join("src.txt");
        let inbox = dir.path().join("inbox");
        let archive = dir.path().join("archive");
        fs::write(&source, "payload").unwrap();
        deliver_handoff_core(&source, &inbox, &archive).unwrap();
        let stray_tmps: Vec<_> = fs::read_dir(&inbox)
            .unwrap()
            .flatten()
            .filter(|e| {
                e.path()
                    .file_name()
                    .and_then(|n| n.to_str())
                    .is_some_and(|s| s.contains(".tmp"))
            })
            .collect();
        assert!(
            stray_tmps.is_empty(),
            "inbox must not contain .tmp debris after successful delivery: {stray_tmps:?}"
        );
    }
}