netsky 0.2.0

netsky CLI: the viable system launcher and subcommand dispatcher
Documentation
use std::fs;
use std::io::ErrorKind;
use std::path::{Path, PathBuf};
use std::thread;
use std::time::{Duration, Instant};

use netsky_core::consts::{AGENTINFINITY_NAME, ENV_AGENT_N};
use netsky_core::envelope::Envelope;
use netsky_core::restart_handshake::{RestartAck, RestartConfirm, RestartMessage, RestartRequest};

use crate::cmd::channel::{self, SendEnvelopeOptions};

const RESTART_CLAIMED_DIR: &str = "restart-claimed";
const RESTART_DELIVERED_DIR: &str = "restart-delivered";

pub fn run(handoff: &str, timeout_s: u64) -> netsky_core::Result<()> {
    let from = agent_from_env()?;
    let root = channel::channel_root();
    request_at(&root, &from, handoff, Duration::from_secs(timeout_s))
}

pub(crate) fn request_at(
    root: &Path,
    from: &str,
    handoff: &str,
    timeout: Duration,
) -> netsky_core::Result<()> {
    send_restart_message(
        root,
        AGENTINFINITY_NAME,
        from,
        RestartMessage::Request(RestartRequest {
            handoff_path: handoff.to_string(),
        }),
    )?;

    let deadline = Instant::now() + timeout;
    let ack = wait_for_restart_message(
        root,
        from,
        deadline,
        Duration::from_millis(100),
        |env, msg| {
            env.from == AGENTINFINITY_NAME
                && matches!(
                    msg,
                    RestartMessage::Ack(RestartAck {
                        handoff_path,
                        verified: true,
                    }) if handoff_path == handoff
                )
        },
    )?;

    let Some((_, RestartMessage::Ack(ack))) = ack else {
        netsky_core::bail!(
            "restart request timed out after {}s waiting for agentinfinity ack",
            timeout.as_secs()
        );
    };

    send_restart_message(
        root,
        AGENTINFINITY_NAME,
        from,
        RestartMessage::Confirm(RestartConfirm {
            handoff_path: ack.handoff_path,
        }),
    )?;
    println!(
        "[netsky restart request] {} -> {} confirmed {}",
        from, AGENTINFINITY_NAME, handoff
    );
    Ok(())
}

pub(crate) fn send_restart_message(
    root: &Path,
    target: &str,
    from: &str,
    msg: RestartMessage,
) -> netsky_core::Result<()> {
    let text = msg.to_text()?;
    channel::send_envelope_at(
        root,
        target,
        &text,
        SendEnvelopeOptions {
            from_override: Some(from),
            kind: Some(msg.kind()),
            thread: Some("restart"),
            in_reply_to: None,
            requires_ack: None,
        },
    )
}

pub(crate) fn wait_for_restart_message<F>(
    root: &Path,
    agent: &str,
    deadline: Instant,
    poll: Duration,
    mut pred: F,
) -> netsky_core::Result<Option<(Envelope, RestartMessage)>>
where
    F: FnMut(&Envelope, &RestartMessage) -> bool,
{
    while Instant::now() < deadline {
        if let Some(found) = take_matching_restart_message(root, agent, &mut pred)? {
            return Ok(Some(found));
        }
        thread::sleep(poll);
    }
    Ok(None)
}

pub(crate) fn take_matching_restart_message<F>(
    root: &Path,
    agent: &str,
    pred: &mut F,
) -> netsky_core::Result<Option<(Envelope, RestartMessage)>>
where
    F: FnMut(&Envelope, &RestartMessage) -> bool,
{
    let inbox = inbox_dir(root, agent);
    let mut paths = pending_json_files(&inbox)?;
    paths.sort();
    for path in paths {
        let Ok(raw) = fs::read_to_string(&path) else {
            continue;
        };
        let Ok(env) = serde_json::from_str::<Envelope>(&raw) else {
            continue;
        };
        if env.to.as_deref() != Some(agent) {
            continue;
        }
        let Ok(Some(msg)) = RestartMessage::from_envelope(&env) else {
            continue;
        };
        if !pred(&env, &msg) {
            continue;
        }
        let Some(name) = path.file_name() else {
            continue;
        };
        let claimed = claimed_dir(root, agent).join(name);
        let delivered = delivered_dir(root, agent).join(name);
        fs::create_dir_all(claimed_dir(root, agent))?;
        fs::create_dir_all(delivered_dir(root, agent))?;
        match fs::rename(&path, &claimed) {
            Ok(()) => {}
            Err(e) if e.kind() == ErrorKind::NotFound => continue,
            Err(e) => return Err(e.into()),
        }
        fs::rename(&claimed, &delivered)?;
        return Ok(Some((env, msg)));
    }
    Ok(None)
}

fn pending_json_files(dir: &Path) -> std::io::Result<Vec<PathBuf>> {
    let rd = match fs::read_dir(dir) {
        Ok(rd) => rd,
        Err(e) if e.kind() == ErrorKind::NotFound => return Ok(Vec::new()),
        Err(e) => return Err(e),
    };
    Ok(rd
        .flatten()
        .map(|entry| entry.path())
        .filter(|path| {
            path.extension().map(|ext| ext == "json").unwrap_or(false)
                && !path
                    .file_name()
                    .map(|name| name.to_string_lossy().starts_with('.'))
                    .unwrap_or(true)
        })
        .collect())
}

fn agent_dir(root: &Path, agent: &str) -> PathBuf {
    root.join(agent)
}

fn inbox_dir(root: &Path, agent: &str) -> PathBuf {
    agent_dir(root, agent).join("inbox")
}

fn claimed_dir(root: &Path, agent: &str) -> PathBuf {
    agent_dir(root, agent).join(RESTART_CLAIMED_DIR)
}

fn delivered_dir(root: &Path, agent: &str) -> PathBuf {
    agent_dir(root, agent).join(RESTART_DELIVERED_DIR)
}

fn agent_from_env() -> netsky_core::Result<String> {
    match std::env::var(ENV_AGENT_N) {
        Ok(n) if !n.is_empty() => Ok(format!("agent{n}")),
        _ => netsky_core::bail!(
            "restart request requires {ENV_AGENT_N}; run inside agent0 or pass through the CLI environment"
        ),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use netsky_core::restart_handshake::RESTART_KIND_REQUEST;

    #[test]
    fn take_matching_restart_message_leaves_non_restart_envelopes_alone() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        let inbox = inbox_dir(root, "agentinfinity");
        fs::create_dir_all(&inbox).unwrap();
        let env = Envelope::new("agent0", "hello", "2026-04-19T20:44:47Z");
        netsky_core::envelope::write_envelope(&inbox, &env).unwrap();

        let found = take_matching_restart_message(root, "agentinfinity", &mut |_, _| true).unwrap();
        assert!(found.is_none());
        assert_eq!(pending_json_files(&inbox).unwrap().len(), 1);
    }

    #[test]
    fn take_matching_restart_message_claims_and_archives_match() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path();
        let inbox = inbox_dir(root, "agentinfinity");
        fs::create_dir_all(&inbox).unwrap();
        let mut env = Envelope::new(
            "agent0",
            r#"{"handoff_path":"/tmp/handoff.txt"}"#,
            "2026-04-19T20:44:47Z",
        );
        env.to = Some("agentinfinity".to_string());
        env.kind = Some(RESTART_KIND_REQUEST.to_string());
        let path = netsky_core::envelope::write_envelope(&inbox, &env).unwrap();

        let found = take_matching_restart_message(root, "agentinfinity", &mut |_, _| true).unwrap();
        assert!(found.is_some());
        assert!(!path.exists());
        assert_eq!(
            pending_json_files(&delivered_dir(root, "agentinfinity"))
                .unwrap()
                .len(),
            1
        );
    }
}