use std::fs;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::thread;
use std::time::{Duration, Instant};
use netsky_core::consts::{AGENTINFINITY_NAME, ENV_AGENT_N};
use netsky_core::envelope::Envelope;
use netsky_core::restart_handshake::{RestartAck, RestartConfirm, RestartMessage, RestartRequest};
use crate::cmd::channel::{self, SendEnvelopeOptions};
const RESTART_CLAIMED_DIR: &str = "restart-claimed";
const RESTART_DELIVERED_DIR: &str = "restart-delivered";
pub fn run(handoff: &str, timeout_s: u64) -> netsky_core::Result<()> {
let from = agent_from_env()?;
let root = channel::channel_root();
request_at(&root, &from, handoff, Duration::from_secs(timeout_s))
}
pub(crate) fn request_at(
root: &Path,
from: &str,
handoff: &str,
timeout: Duration,
) -> netsky_core::Result<()> {
send_restart_message(
root,
AGENTINFINITY_NAME,
from,
RestartMessage::Request(RestartRequest {
handoff_path: handoff.to_string(),
}),
)?;
let deadline = Instant::now() + timeout;
let ack = wait_for_restart_message(
root,
from,
deadline,
Duration::from_millis(100),
|env, msg| {
env.from == AGENTINFINITY_NAME
&& matches!(
msg,
RestartMessage::Ack(RestartAck {
handoff_path,
verified: true,
}) if handoff_path == handoff
)
},
)?;
let Some((_, RestartMessage::Ack(ack))) = ack else {
netsky_core::bail!(
"restart request timed out after {}s waiting for agentinfinity ack",
timeout.as_secs()
);
};
send_restart_message(
root,
AGENTINFINITY_NAME,
from,
RestartMessage::Confirm(RestartConfirm {
handoff_path: ack.handoff_path,
}),
)?;
println!(
"[netsky restart request] {} -> {} confirmed {}",
from, AGENTINFINITY_NAME, handoff
);
Ok(())
}
pub(crate) fn send_restart_message(
root: &Path,
target: &str,
from: &str,
msg: RestartMessage,
) -> netsky_core::Result<()> {
let text = msg.to_text()?;
channel::send_envelope_at(
root,
target,
&text,
SendEnvelopeOptions {
from_override: Some(from),
kind: Some(msg.kind()),
thread: Some("restart"),
in_reply_to: None,
requires_ack: None,
},
)
}
pub(crate) fn wait_for_restart_message<F>(
root: &Path,
agent: &str,
deadline: Instant,
poll: Duration,
mut pred: F,
) -> netsky_core::Result<Option<(Envelope, RestartMessage)>>
where
F: FnMut(&Envelope, &RestartMessage) -> bool,
{
while Instant::now() < deadline {
if let Some(found) = take_matching_restart_message(root, agent, &mut pred)? {
return Ok(Some(found));
}
thread::sleep(poll);
}
Ok(None)
}
pub(crate) fn take_matching_restart_message<F>(
root: &Path,
agent: &str,
pred: &mut F,
) -> netsky_core::Result<Option<(Envelope, RestartMessage)>>
where
F: FnMut(&Envelope, &RestartMessage) -> bool,
{
let inbox = inbox_dir(root, agent);
let mut paths = pending_json_files(&inbox)?;
paths.sort();
for path in paths {
let Ok(raw) = fs::read_to_string(&path) else {
continue;
};
let Ok(env) = serde_json::from_str::<Envelope>(&raw) else {
continue;
};
if env.to.as_deref() != Some(agent) {
continue;
}
let Ok(Some(msg)) = RestartMessage::from_envelope(&env) else {
continue;
};
if !pred(&env, &msg) {
continue;
}
let Some(name) = path.file_name() else {
continue;
};
let claimed = claimed_dir(root, agent).join(name);
let delivered = delivered_dir(root, agent).join(name);
fs::create_dir_all(claimed_dir(root, agent))?;
fs::create_dir_all(delivered_dir(root, agent))?;
match fs::rename(&path, &claimed) {
Ok(()) => {}
Err(e) if e.kind() == ErrorKind::NotFound => continue,
Err(e) => return Err(e.into()),
}
fs::rename(&claimed, &delivered)?;
return Ok(Some((env, msg)));
}
Ok(None)
}
fn pending_json_files(dir: &Path) -> std::io::Result<Vec<PathBuf>> {
let rd = match fs::read_dir(dir) {
Ok(rd) => rd,
Err(e) if e.kind() == ErrorKind::NotFound => return Ok(Vec::new()),
Err(e) => return Err(e),
};
Ok(rd
.flatten()
.map(|entry| entry.path())
.filter(|path| {
path.extension().map(|ext| ext == "json").unwrap_or(false)
&& !path
.file_name()
.map(|name| name.to_string_lossy().starts_with('.'))
.unwrap_or(true)
})
.collect())
}
fn agent_dir(root: &Path, agent: &str) -> PathBuf {
root.join(agent)
}
fn inbox_dir(root: &Path, agent: &str) -> PathBuf {
agent_dir(root, agent).join("inbox")
}
fn claimed_dir(root: &Path, agent: &str) -> PathBuf {
agent_dir(root, agent).join(RESTART_CLAIMED_DIR)
}
fn delivered_dir(root: &Path, agent: &str) -> PathBuf {
agent_dir(root, agent).join(RESTART_DELIVERED_DIR)
}
fn agent_from_env() -> netsky_core::Result<String> {
match std::env::var(ENV_AGENT_N) {
Ok(n) if !n.is_empty() => Ok(format!("agent{n}")),
_ => netsky_core::bail!(
"restart request requires {ENV_AGENT_N}; run inside agent0 or pass through the CLI environment"
),
}
}
#[cfg(test)]
mod tests {
use super::*;
use netsky_core::restart_handshake::RESTART_KIND_REQUEST;
#[test]
fn take_matching_restart_message_leaves_non_restart_envelopes_alone() {
let dir = tempfile::tempdir().unwrap();
let root = dir.path();
let inbox = inbox_dir(root, "agentinfinity");
fs::create_dir_all(&inbox).unwrap();
let env = Envelope::new("agent0", "hello", "2026-04-19T20:44:47Z");
netsky_core::envelope::write_envelope(&inbox, &env).unwrap();
let found = take_matching_restart_message(root, "agentinfinity", &mut |_, _| true).unwrap();
assert!(found.is_none());
assert_eq!(pending_json_files(&inbox).unwrap().len(), 1);
}
#[test]
fn take_matching_restart_message_claims_and_archives_match() {
let dir = tempfile::tempdir().unwrap();
let root = dir.path();
let inbox = inbox_dir(root, "agentinfinity");
fs::create_dir_all(&inbox).unwrap();
let mut env = Envelope::new(
"agent0",
r#"{"handoff_path":"/tmp/handoff.txt"}"#,
"2026-04-19T20:44:47Z",
);
env.to = Some("agentinfinity".to_string());
env.kind = Some(RESTART_KIND_REQUEST.to_string());
let path = netsky_core::envelope::write_envelope(&inbox, &env).unwrap();
let found = take_matching_restart_message(root, "agentinfinity", &mut |_, _| true).unwrap();
assert!(found.is_some());
assert!(!path.exists());
assert_eq!(
pending_json_files(&delivered_dir(root, "agentinfinity"))
.unwrap()
.len(),
1
);
}
}