use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::thread;
use std::time::{Duration, Instant};
use chrono::{DateTime, Utc};
use rusqlite::{Connection, OptionalExtension, params};
use serde::Deserialize;
use sha2::{Digest, Sha256};
use netsky_core::agent::AgentId;
use netsky_core::envelope::Envelope;
use netsky_core::paths::{agent_pane_hash_file, home, state_dir};
use netsky_db::{CloneDispatchRecord, Db};
use netsky_sh::tmux;
use crate::cli::{AgentType, CloneCommand};
use crate::cmd::agent::{self, RuntimeKind};
use crate::cmd::channel::{self, SendEnvelopeOptions};
use crate::cmd::workspace;
/// Agent whose channel `clone wait` scans for replies; also recorded as the
/// reply target in the detail JSON of every dispatched brief.
const DEFAULT_REPLY_TARGET: &str = "agent0";
/// Sender name stamped on briefs dispatched through `clone brief`.
const DEFAULT_BRIEF_FROM: &str = "agent0";
/// Pause, in milliseconds, between a codex pane settling and the brief being
/// delivered into its tmux session.
const CODEX_BRIEF_SETTLE_MS: u64 = 750;
/// Upper bound, in seconds, on waiting for a codex pane to become ready.
const CODEX_BRIEF_TIMEOUT_S: u64 = 10;
/// Number of consecutive unchanged "ready" pane captures required before the
/// codex pane counts as settled.
const CODEX_BRIEF_IDLE_POLLS: u8 = 8;
/// Polling interval, in milliseconds, used by both `clone wait` and the codex
/// readiness check.
const WAIT_POLL_MS: u64 = 250;
/// Entry point for the `clone` subcommands.
///
/// Resolves the on-disk locations once, then dispatches on the parsed
/// command. Every arm prints its result to stdout; any error from the
/// underlying operation is propagated to the caller.
pub fn run(cmd: CloneCommand) -> netsky_core::Result<()> {
    let paths = ClonePaths::for_home(&home());
    match cmd {
        CloneCommand::Brief {
            path: brief_path,
            agent_type,
            workspace: workspace_name,
            agent: requested_agent,
        } => {
            let request = BriefRequest {
                brief_path,
                workspace: workspace_name,
                requested_agent,
                agent_type,
                from_agent: DEFAULT_BRIEF_FROM.to_string(),
                reply_target: DEFAULT_REPLY_TARGET.to_string(),
            };
            let outcome = run_brief_with(
                &paths,
                request,
                |task| workspace::clone_workspace(task).map(|_| ()),
                |n, kind| {
                    agent::spawn_resident(n, false, kind, false)?;
                    Ok(())
                },
            )?;
            println!("{}", outcome.agent);
        }
        CloneCommand::Status { n } => {
            let name = agent_name(n);
            let status = clone_status(&paths, &name)?;
            print!("{}", render_status(&status));
        }
        CloneCommand::Wait { n, timeout } => {
            let name = agent_name(n);
            let reply = wait_for_reply(
                &paths,
                &name,
                DEFAULT_REPLY_TARGET,
                Duration::from_secs(timeout),
                WAIT_POLL_MS,
                tmux::session_is_alive,
            )?;
            println!("{} {} {}", reply.status, reply.agent, reply.path.display());
        }
        CloneCommand::Kill { n } => {
            let name = agent_name(n);
            tmux::kill_session(&name)?;
            println!("{name}");
        }
        CloneCommand::Ls => {
            let rows = clone_ls(&paths)?;
            print!("{}", render_ls(&rows));
        }
    }
    Ok(())
}
/// Filesystem locations the clone subcommands read from and write to.
#[derive(Debug, Clone)]
struct ClonePaths {
/// Root directory under which each agent has its own channel directory.
channel_root: PathBuf,
/// SQLite database queried for clone dispatch and communication event rows.
meta_db_path: PathBuf,
/// Directory consulted for per-agent pane-hash files.
state_dir: PathBuf,
}
impl ClonePaths {
    /// Builds the path set used outside of tests.
    ///
    /// Only `meta_db_path` is derived from `home_dir`
    /// (`<home_dir>/.netsky/meta.db`); the channel root and state directory
    /// come from `channel::channel_root()` and `state_dir()` respectively.
    fn for_home(home_dir: &Path) -> Self {
        let channel_root = channel::channel_root();
        let meta_db_path = home_dir.join(".netsky").join("meta.db");
        let state_root = state_dir();
        Self {
            channel_root,
            meta_db_path,
            state_dir: state_root,
        }
    }
}
/// Everything `run_brief_with` needs to dispatch one brief.
#[derive(Debug, Clone)]
struct BriefRequest {
/// File whose contents become the body of the brief envelope.
brief_path: PathBuf,
/// Workspace name handed to the workspace callback and stored with the dispatch.
workspace: String,
/// Explicit agent number; when `None` the lowest free number is allocated.
requested_agent: Option<u32>,
/// Runtime to spawn the clone with.
agent_type: AgentType,
/// Sender name written on the brief envelope.
from_agent: String,
/// Agent name recorded in the dispatch detail JSON as the reply target.
reply_target: String,
}
/// Outcome of a successful `run_brief_with` call.
#[derive(Debug, Clone)]
struct BriefResult {
/// Name of the agent the brief was dispatched to.
agent: String,
}
/// Dispatches a brief to a clone agent and records the dispatch.
///
/// Steps, in order: read the brief file, call `ensure_workspace` for the
/// requested workspace, refuse if an explicitly requested agent already has a
/// tmux session, pick an agent number, call `spawn_agent`, send the brief as a
/// `brief` envelope on the agent's channel, for codex agents try to push the
/// brief into the tmux pane once it settles, and finally write a `brief-sent`
/// row to the meta database.
///
/// `ensure_workspace` and `spawn_agent` are injected so callers (including the
/// tests in this file) can substitute their own behaviour.
///
/// Returns the name of the agent that received the brief. Errors from reading
/// the brief, either callback, sending the envelope, or recording the dispatch
/// are propagated; codex auto-delivery problems are only logged to stderr.
fn run_brief_with<EnsureWorkspace, SpawnAgent>(
paths: &ClonePaths,
request: BriefRequest,
ensure_workspace: EnsureWorkspace,
spawn_agent: SpawnAgent,
) -> netsky_core::Result<BriefResult>
where
EnsureWorkspace: Fn(&str) -> netsky_core::Result<()>,
SpawnAgent: Fn(u32, RuntimeKind) -> netsky_core::Result<()>,
{
// Read the brief before touching the workspace or tmux so a bad path fails
// without side effects.
let brief = fs::read_to_string(&request.brief_path)?;
ensure_workspace(&request.workspace)?;
// An explicitly requested agent must not already have a tmux session.
if let Some(n) = request.requested_agent {
let requested = agent_name(n);
if tmux::has_session(&requested) {
netsky_core::bail!(
"clone brief: agent '{requested}' is already up; use a different --agent or kill the existing session first"
);
}
}
// Without an explicit number, take the lowest positive number not used by a
// current tmux session.
// NOTE(review): nothing here reserves the number between this check and the
// spawn below, so two concurrent invocations could pick the same one — confirm
// whether callers serialize `clone brief`.
let agent_num = request
.requested_agent
.unwrap_or_else(|| allocate_agent_number(&tmux::list_sessions()));
let agent = agent_name(agent_num);
let runtime = runtime_kind(request.agent_type);
spawn_agent(agent_num, runtime)?;
let ts_utc = Utc::now();
// The dispatch id embeds the start time in nanoseconds (0 when chrono cannot
// represent it). The `.0` precision has no effect on an integer argument.
let dispatch_id = format!(
"clone-{agent}-{:.0}",
ts_utc.timestamp_nanos_opt().unwrap_or(0)
);
// `clone wait` later matches replies against this thread name.
let thread = format!("clone:{dispatch_id}");
let send = channel::send_envelope_with_at(
&paths.channel_root,
&agent,
&brief,
SendEnvelopeOptions {
from_override: Some(&request.from_agent),
kind: Some("brief"),
thread: Some(&thread),
in_reply_to: None,
requires_ack: Some(true),
},
)?;
// Codex only: wait for the pane to look idle, pause briefly, then try to
// deliver the envelope straight into the tmux session. Every failure mode
// here is non-fatal; the message is only logged.
if matches!(request.agent_type, AgentType::Codex) {
if wait_for_codex_brief_target(
&agent,
Duration::from_secs(CODEX_BRIEF_TIMEOUT_S),
Duration::from_millis(WAIT_POLL_MS),
) {
thread::sleep(Duration::from_millis(CODEX_BRIEF_SETTLE_MS));
match channel::deliver_path_to_tmux(&paths.channel_root, &agent, &agent, &send.path) {
Ok(true) => {}
Ok(false) => {
eprintln!(
"[clone brief] {agent}: codex auto-delivery skipped; brief stays on the bus"
);
}
Err(err) => {
eprintln!(
"[clone brief] {agent}: codex auto-delivery failed: {err}; brief stays on the bus"
);
}
}
} else {
eprintln!("[clone brief] {agent}: codex pane never settled; brief stays on the bus");
}
}
// Identifiers that `clone wait` needs to correlate a reply with this dispatch.
let detail_json = serde_json::json!({
"dispatch_id": dispatch_id,
"dispatch_message_id": send.path.file_name().and_then(|name| name.to_str()),
"dispatch_envelope_id": send.envelope_id,
"dispatch_thread": thread,
"reply_target": request.reply_target,
})
.to_string();
// The row is written only after the agent was spawned and the envelope sent.
record_clone_dispatch_at(
&paths.meta_db_path,
CloneDispatchRecord {
ts_utc_start: ts_utc,
ts_utc_end: None,
agent_id: &agent,
runtime: Some(runtime_label(request.agent_type)),
brief_path: Some(&request.brief_path.display().to_string()),
brief: None,
workspace: Some(&request.workspace),
branch: None,
status: Some("brief-sent"),
exit_code: None,
detail_json: Some(&detail_json),
},
)?;
Ok(BriefResult { agent })
}
/// Fields read back from the most recent `clone_dispatches` row for an agent.
#[derive(Debug, Clone, PartialEq, Eq)]
struct DispatchRow {
/// Dispatch start time, parsed from the row's RFC 3339 `ts_utc_start`.
ts_utc_start: DateTime<Utc>,
/// Stored `runtime` value, if any.
runtime: Option<String>,
/// Stored `brief_path` value, if any.
brief_path: Option<String>,
/// Stored `workspace` value, if any.
workspace: Option<String>,
/// Stored `status` value, if any.
status: Option<String>,
/// Stored `detail_json` value, if any; parsed by `dispatch_detail`.
detail_json: Option<String>,
}
/// Fields read back from the most recent `communication_events` row that
/// mentions an agent.
#[derive(Debug, Clone, PartialEq, Eq)]
struct MessageRow {
/// Event time, parsed from the row's RFC 3339 `ts_utc`.
ts_utc: DateTime<Utc>,
/// Stored `tool` value, if any.
tool: Option<String>,
/// Stored `direction` value, if any.
direction: Option<String>,
/// Stored `chat_id` value, if any.
chat_id: Option<String>,
/// Stored `message_id` value, if any.
message_id: Option<String>,
/// Stored `status` value, if any.
status: Option<String>,
}
/// Coarse activity summary for an agent's tmux pane.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PaneState {
/// One of `"unknown"`, `"stable"`, or `"changing"`, as produced by `pane_state`.
label: String,
/// Best-known time of the last pane change, when one is available.
last_activity_utc: Option<DateTime<Utc>>,
}
/// Snapshot printed by `clone status`, assembled by `clone_status`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CloneStatus {
/// Agent name the status was requested for.
agent: String,
/// Result of `tmux::session_is_alive` for the agent.
alive: bool,
/// `runtime` of the latest dispatch row, if any.
runtime: Option<String>,
/// `status` of the latest dispatch row, if any.
dispatch_status: Option<String>,
/// `brief_path` of the latest dispatch row, if any.
brief_path: Option<String>,
/// `workspace` of the latest dispatch row, if any.
workspace: Option<String>,
/// Number of files in the agent's `inbox` channel directory.
inbox_count: usize,
/// Number of files in the agent's `claimed` channel directory.
claimed_count: usize,
/// Number of files in the agent's `delivered` channel directory.
delivered_count: usize,
/// Number of files in the agent's `poison` channel directory.
poison_count: usize,
/// Latest communication event mentioning the agent, if any.
last_message: Option<MessageRow>,
/// Pane activity summary.
pane_state: PaneState,
}
fn clone_status(paths: &ClonePaths, agent: &str) -> netsky_core::Result<CloneStatus> {
let dispatch = latest_dispatch(&paths.meta_db_path, agent)?;
let counts = channel_counts(&paths.channel_root, agent)?;
let pane_state = pane_state(agent, &paths.state_dir);
Ok(CloneStatus {
agent: agent.to_string(),
alive: tmux::session_is_alive(agent),
runtime: dispatch.as_ref().and_then(|row| row.runtime.clone()),
dispatch_status: dispatch.as_ref().and_then(|row| row.status.clone()),
brief_path: dispatch.as_ref().and_then(|row| row.brief_path.clone()),
workspace: dispatch.as_ref().and_then(|row| row.workspace.clone()),
inbox_count: counts.inbox_count,
claimed_count: counts.claimed_count,
delivered_count: counts.delivered_count,
poison_count: counts.poison_count,
last_message: latest_message(&paths.meta_db_path, agent)?,
pane_state,
})
}
/// Renders a `CloneStatus` as `key: value` lines, one per field.
///
/// Missing optional values are shown as `-`. The last message is flattened to
/// a single space-separated line.
fn render_status(status: &CloneStatus) -> String {
    /// Placeholder substitution for absent optional text.
    fn dash(value: Option<&str>) -> &str {
        value.unwrap_or("-")
    }

    let last_message = match &status.last_message {
        Some(row) => format!(
            "{} {} {} {} {} {}",
            row.ts_utc.to_rfc3339(),
            dash(row.tool.as_deref()),
            dash(row.direction.as_deref()),
            dash(row.chat_id.as_deref()),
            dash(row.message_id.as_deref()),
            dash(row.status.as_deref()),
        ),
        None => "-".to_string(),
    };
    let last_activity = match status.pane_state.last_activity_utc {
        Some(ts) => ts.to_rfc3339(),
        None => "-".to_string(),
    };

    format!(
        "agent: {}\nalive: {}\nruntime: {}\ndispatch_status: {}\nbrief_path: {}\nworkspace: {}\ninbox: {}\nclaimed: {}\ndelivered: {}\npoison: {}\nlast_message: {}\npane_state: {}\nlast_tmux_activity_utc: {}\n",
        status.agent,
        status.alive,
        dash(status.runtime.as_deref()),
        dash(status.dispatch_status.as_deref()),
        dash(status.brief_path.as_deref()),
        dash(status.workspace.as_deref()),
        status.inbox_count,
        status.claimed_count,
        status.delivered_count,
        status.poison_count,
        last_message,
        status.pane_state.label,
        last_activity
    )
}
/// Successful outcome of `wait_for_reply`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct WaitResult {
/// Agent whose reply was awaited.
agent: String,
/// Outcome label; `wait_for_reply` sets it to `"done"` when a reply is found.
status: String,
/// Path of the reply envelope that was found.
path: PathBuf,
}
/// Polls `reply_target`'s channel until `agent` replies to its latest dispatch.
///
/// The latest dispatch row for `agent` supplies the earliest acceptable
/// timestamp and, when recorded, the thread name a reply must carry.
///
/// Each iteration checks for a reply first, then the deadline, then whether
/// the agent's session is still alive (via `session_alive`), and finally
/// sleeps for `poll_ms` milliseconds (at least 1).
///
/// # Errors
/// Fails when no dispatch is on record, when the stored detail JSON cannot be
/// parsed, with `status=timeout` once `timeout` has elapsed, and with
/// `status=agent-exited` when the session is gone.
fn wait_for_reply<F>(
    paths: &ClonePaths,
    agent: &str,
    reply_target: &str,
    timeout: Duration,
    poll_ms: u64,
    session_alive: F,
) -> netsky_core::Result<WaitResult>
where
    F: Fn(&str) -> bool,
{
    let dispatch = latest_dispatch(&paths.meta_db_path, agent)?
        .ok_or_else(|| netsky_core::Error::msg(format!("no dispatch found for {agent}")))?;
    let detail = dispatch_detail(dispatch.detail_json.as_deref())?;
    let expected_thread = detail.dispatch_thread.as_deref();
    let pause = Duration::from_millis(poll_ms.max(1));
    let give_up_at = Instant::now() + timeout;

    loop {
        let found = find_reply_envelope(
            &paths.channel_root,
            reply_target,
            agent,
            dispatch.ts_utc_start,
            expected_thread,
        )?;
        if let Some(path) = found {
            return Ok(WaitResult {
                agent: agent.to_string(),
                status: "done".to_string(),
                path,
            });
        }
        if Instant::now() >= give_up_at {
            netsky_core::bail!("status=timeout agent={agent}");
        }
        if !session_alive(agent) {
            netsky_core::bail!("status=agent-exited agent={agent}");
        }
        thread::sleep(pause);
    }
}
/// The part of a dispatch row's detail JSON that `clone wait` needs.
#[derive(Debug, Default, Clone, Deserialize)]
struct DispatchDetail {
/// Thread name recorded at dispatch time; replies are matched against it.
dispatch_thread: Option<String>,
}
/// Parses a dispatch row's detail JSON.
///
/// A missing value yields the default (no thread recorded); a present but
/// malformed value is an error.
fn dispatch_detail(detail_json: Option<&str>) -> netsky_core::Result<DispatchDetail> {
    let Some(raw) = detail_json else {
        return Ok(DispatchDetail::default());
    };
    Ok(serde_json::from_str(raw)?)
}
/// Looks for an envelope from `from_agent` addressed to `reply_target`.
///
/// Scans `<channel_root>/<reply_target>/{inbox,claimed,delivered}` in that
/// order, visiting each directory's entries in sorted path order, and returns
/// the first file that parses as an `Envelope` and satisfies all of:
/// sender equals `from_agent`, recipient equals `reply_target`, timestamp is
/// valid RFC 3339 and not earlier than `min_ts`, and (when `thread_name` is
/// given) the thread matches. Unreadable directories, unreadable files, and
/// unparsable files are skipped.
fn find_reply_envelope(
    channel_root: &Path,
    reply_target: &str,
    from_agent: &str,
    min_ts: DateTime<Utc>,
    thread_name: Option<&str>,
) -> netsky_core::Result<Option<PathBuf>> {
    let is_reply = |env: &Envelope| -> bool {
        if env.from != from_agent || env.to.as_deref() != Some(reply_target) {
            return false;
        }
        let Ok(sent_at) = DateTime::parse_from_rfc3339(&env.ts) else {
            return false;
        };
        if sent_at.with_timezone(&Utc) < min_ts {
            return false;
        }
        // A thread filter only applies when the dispatch recorded one.
        thread_name.is_none() || env.thread.as_deref() == thread_name
    };

    let target_dir = channel_root.join(reply_target);
    for bucket in ["inbox", "claimed", "delivered"] {
        let Ok(listing) = fs::read_dir(target_dir.join(bucket)) else {
            continue;
        };
        let mut candidates: Vec<PathBuf> = listing.flatten().map(|entry| entry.path()).collect();
        candidates.sort();

        let hit = candidates.into_iter().find(|candidate| {
            let Ok(raw) = fs::read_to_string(candidate) else {
                return false;
            };
            match serde_json::from_str::<Envelope>(&raw) {
                Ok(env) => is_reply(&env),
                Err(_) => false,
            }
        });
        if hit.is_some() {
            return Ok(hit);
        }
    }
    Ok(None)
}
/// File counts for the four channel directories of one agent.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ChannelCounts {
/// Regular files in `inbox`.
inbox_count: usize,
/// Regular files in `claimed`.
claimed_count: usize,
/// Regular files in `delivered`.
delivered_count: usize,
/// Regular files in `poison`.
poison_count: usize,
}
/// Counts the files in each of `agent`'s channel directories.
///
/// Directories are visited in the order inbox, claimed, delivered, poison;
/// the first I/O error (other than a missing directory) aborts the count.
fn channel_counts(channel_root: &Path, agent: &str) -> netsky_core::Result<ChannelCounts> {
    let agent_dir = channel_root.join(agent);
    let tally = |bucket: &str| count_files(&agent_dir.join(bucket));

    let inbox_count = tally("inbox")?;
    let claimed_count = tally("claimed")?;
    let delivered_count = tally("delivered")?;
    let poison_count = tally("poison")?;

    Ok(ChannelCounts {
        inbox_count,
        claimed_count,
        delivered_count,
        poison_count,
    })
}
/// Counts the regular files directly inside `path`.
///
/// A missing directory counts as zero; any other error opening the directory
/// is returned. Entries that cannot be read, or whose type cannot be
/// determined, are not counted.
fn count_files(path: &Path) -> netsky_core::Result<usize> {
    let listing = match fs::read_dir(path) {
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(0),
        Err(err) => return Err(err.into()),
        Ok(listing) => listing,
    };

    let mut total = 0usize;
    for entry in listing.flatten() {
        if matches!(entry.file_type(), Ok(kind) if kind.is_file()) {
            total += 1;
        }
    }
    Ok(total)
}
fn pane_state(agent: &str, state_root: &Path) -> PaneState {
let path = if state_root == state_dir() {
agent_pane_hash_file(agent)
} else {
state_root.join(format!("{agent}-pane-hash"))
};
let Ok(saved) = fs::read_to_string(&path) else {
return PaneState {
label: "unknown".to_string(),
last_activity_utc: None,
};
};
let mut lines = saved.lines();
let Some(saved_hash) = lines.next() else {
return PaneState {
label: "unknown".to_string(),
last_activity_utc: None,
};
};
let last_ts = lines
.next()
.and_then(|line| line.parse::<u64>().ok())
.and_then(epoch_to_utc);
let Ok(pane) = tmux::capture_pane(agent, None) else {
return PaneState {
label: "unknown".to_string(),
last_activity_utc: last_ts,
};
};
let current_hash = sha256_hex(&pane);
let label = if current_hash == saved_hash {
"stable"
} else {
"changing"
};
let last_activity_utc = if label == "changing" {
Some(Utc::now())
} else {
last_ts
};
PaneState {
label: label.to_string(),
last_activity_utc,
}
}
/// One line of `clone ls` output.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CloneListRow {
/// tmux session name of the clone.
agent: String,
/// Time since the session was created, when the creation time is known.
age: Option<Duration>,
/// Timestamp of the latest communication event mentioning the clone, if any.
last_envelope_utc: Option<DateTime<Utc>>,
}
fn clone_ls(paths: &ClonePaths) -> netsky_core::Result<Vec<CloneListRow>> {
let mut rows = Vec::new();
for session in list_clone_sessions()? {
rows.push(CloneListRow {
last_envelope_utc: latest_message(&paths.meta_db_path, &session.name)?
.map(|row| row.ts_utc),
agent: session.name,
age: session
.created_at
.and_then(|ts| Utc::now().signed_duration_since(ts).to_std().ok()),
});
}
Ok(rows)
}
fn render_ls(rows: &[CloneListRow]) -> String {
if rows.is_empty() {
return "no active clones\n".to_string();
}
let mut out = String::from("agent\tage\tlast_envelope_utc\n");
for row in rows {
out.push_str(&format!(
"{}\t{}\t{}\n",
row.agent,
row.age
.map(format_duration)
.unwrap_or_else(|| "-".to_string()),
row.last_envelope_utc
.map(|ts| ts.to_rfc3339())
.unwrap_or_else(|| "-".to_string())
));
}
out
}
/// Waits until `agent`'s codex pane looks ready and has stopped changing.
///
/// The pane is captured every `poll` until `timeout` elapses. A capture
/// counts towards stability only when `codex_brief_target_ready` accepts it
/// and it is identical to the previous ready capture; a non-ready capture
/// resets the streak, while a failed capture leaves it untouched. Returns
/// `true` once `CODEX_BRIEF_IDLE_POLLS` consecutive identical ready captures
/// have followed the first one, `false` on timeout.
fn wait_for_codex_brief_target(agent: &str, timeout: Duration, poll: Duration) -> bool {
    let give_up_at = Instant::now() + timeout;
    let mut previous = String::new();
    let mut unchanged: u8 = 0;

    loop {
        if Instant::now() >= give_up_at {
            return false;
        }
        match tmux::capture_pane(agent, None) {
            Ok(snapshot) if codex_brief_target_ready(&snapshot) => {
                if snapshot == previous {
                    unchanged = unchanged.saturating_add(1);
                } else {
                    previous = snapshot;
                    unchanged = 0;
                }
                if unchanged >= CODEX_BRIEF_IDLE_POLLS {
                    return true;
                }
            }
            Ok(_) => {
                previous.clear();
                unchanged = 0;
            }
            Err(_) => {}
        }
        thread::sleep(poll);
    }
}
/// A pane is ready for a brief when it shows a prompt marker and no busy marker.
fn codex_brief_target_ready(pane: &str) -> bool {
    if codex_pane_is_busy(pane) {
        return false;
    }
    codex_pane_has_prompt_marker(pane)
}
/// Reports whether the pane text contains any of the substrings treated as a
/// sign of the codex prompt: `/up`, `gpt-5.4`, or `›`.
fn codex_pane_has_prompt_marker(pane: &str) -> bool {
    ["/up", "gpt-5.4", "›"]
        .iter()
        .any(|marker| pane.contains(marker))
}
/// Reports whether the pane text contains any substring treated as a sign
/// that codex is still working. Matching is case-sensitive.
fn codex_pane_is_busy(pane: &str) -> bool {
    for marker in [
        "Working",
        "Thinking",
        "Esc to interrupt",
        "Ctrl+C to stop",
        "Running",
        "Analyzing",
    ] {
        if pane.contains(marker) {
            return true;
        }
    }
    false
}
/// One tmux session as reported by `tmux list-sessions`.
#[derive(Debug, Clone)]
struct SessionRow {
/// Session name.
name: String,
/// Session creation time, when the reported epoch value could be parsed.
created_at: Option<DateTime<Utc>>,
}
/// Lists the tmux sessions whose names look like clone agents, sorted by name.
///
/// Runs `tmux list-sessions` with a `name<TAB>created` format. Fails when the
/// command cannot be run or exits unsuccessfully.
fn list_clone_sessions() -> netsky_core::Result<Vec<SessionRow>> {
    let listing = Command::new("tmux")
        .args(["list-sessions", "-F", "#{session_name}\t#{session_created}"])
        .output()?;
    if !listing.status.success() {
        netsky_core::bail!("tmux list-sessions failed");
    }

    let text = String::from_utf8_lossy(&listing.stdout);
    let mut clones: Vec<SessionRow> = parse_session_rows(&text)
        .into_iter()
        .filter(|row| is_clone_agent_name(&row.name))
        .collect();
    clones.sort_by(|left, right| left.name.cmp(&right.name));
    Ok(clones)
}
/// Parses `name<TAB>created_epoch` lines into session rows.
///
/// Lines without a tab are dropped. A creation field that is not a valid
/// unsigned integer leaves `created_at` empty rather than dropping the row.
fn parse_session_rows(stdout: &str) -> Vec<SessionRow> {
    let mut rows = Vec::new();
    for line in stdout.lines() {
        let Some((name, created)) = line.split_once('\t') else {
            continue;
        };
        let created_at = created.parse::<u64>().ok().and_then(epoch_to_utc);
        rows.push(SessionRow {
            name: name.to_string(),
            created_at,
        });
    }
    rows
}
/// Fetches the most recent `clone_dispatches` row recorded for `agent`.
///
/// "Most recent" means highest row id. Returns `Ok(None)` when no row
/// matches; query failures and an unparsable start timestamp are errors.
fn latest_dispatch(db_path: &Path, agent: &str) -> netsky_core::Result<Option<DispatchRow>> {
    const QUERY: &str = "SELECT \
        json_extract(row_json, '$.ts_utc_start'), \
        json_extract(row_json, '$.runtime'), \
        json_extract(row_json, '$.brief_path'), \
        json_extract(row_json, '$.workspace'), \
        json_extract(row_json, '$.status'), \
        json_extract(row_json, '$.detail_json') \
        FROM clone_dispatches \
        WHERE json_extract(row_json, '$.agent_id') = ?1 \
        ORDER BY id DESC \
        LIMIT 1";

    with_db_conn(db_path, |conn| {
        let found = conn.query_row(QUERY, params![agent], |row| {
            let started: String = row.get(0)?;
            let ts_utc_start = parse_utc(&started)?;
            Ok(DispatchRow {
                ts_utc_start,
                runtime: row.get(1)?,
                brief_path: row.get(2)?,
                workspace: row.get(3)?,
                status: row.get(4)?,
                detail_json: row.get(5)?,
            })
        });
        found.optional()
    })
}
/// Fetches the most recent `communication_events` row that names `agent`
/// either as its `agent` or as its `chat_id`.
///
/// "Most recent" means highest row id. Returns `Ok(None)` when no row
/// matches; query failures and an unparsable timestamp are errors.
fn latest_message(db_path: &Path, agent: &str) -> netsky_core::Result<Option<MessageRow>> {
    const QUERY: &str = "SELECT \
        json_extract(row_json, '$.ts_utc'), \
        json_extract(row_json, '$.tool'), \
        json_extract(row_json, '$.direction'), \
        json_extract(row_json, '$.chat_id'), \
        json_extract(row_json, '$.message_id'), \
        json_extract(row_json, '$.status') \
        FROM communication_events \
        WHERE json_extract(row_json, '$.agent') = ?1 \
        OR json_extract(row_json, '$.chat_id') = ?1 \
        ORDER BY id DESC \
        LIMIT 1";

    with_db_conn(db_path, |conn| {
        let found = conn.query_row(QUERY, params![agent], |row| {
            let stamped: String = row.get(0)?;
            let ts_utc = parse_utc(&stamped)?;
            Ok(MessageRow {
                ts_utc,
                tool: row.get(1)?,
                direction: row.get(2)?,
                chat_id: row.get(3)?,
                message_id: row.get(4)?,
                status: row.get(5)?,
            })
        });
        found.optional()
    })
}
/// Opens the database at `db_path` and runs the read-only lookup `f` on it.
///
/// Returns `Ok(None)` when there is nothing to read: either the database
/// file does not exist yet, or the open failed with a non-SQLite error.
/// SQLite open errors and errors from `f` are returned as messages.
fn with_db_conn<T, F>(db_path: &Path, f: F) -> netsky_core::Result<Option<T>>
where
    F: FnOnce(&Connection) -> rusqlite::Result<Option<T>>,
{
    // `Connection::open` creates the file when it is missing. Without this
    // guard a status lookup on a fresh install would leave an empty database
    // behind and then fail with "no such table" instead of reporting "no
    // data". Only a definite "does not exist" short-circuits; if existence
    // cannot be determined, fall through so the open reports the real error.
    if matches!(db_path.try_exists(), Ok(false)) {
        return Ok(None);
    }
    let conn = match Connection::open(db_path) {
        Ok(conn) => conn,
        // Errors that did not come from SQLite itself are treated as "no data".
        Err(err) if err.sqlite_error_code().is_none() => return Ok(None),
        Err(err) => return Err(netsky_core::Error::msg(err.to_string())),
    };
    f(&conn).map_err(|err| netsky_core::Error::msg(err.to_string()))
}
/// Writes one clone dispatch record to the database at `db_path`.
///
/// Creates the parent directory if needed, opens the database, runs
/// migrations, and inserts the record. Each database step goes through
/// `db_diag::with_lock_retry`, and its error is wrapped by the matching
/// `db_diag` helper.
fn record_clone_dispatch_at(
    db_path: &Path,
    record: CloneDispatchRecord<'_>,
) -> netsky_core::Result<()> {
    if let Some(dir) = db_path.parent() {
        fs::create_dir_all(dir)?;
    }

    let opened = super::db_diag::with_lock_retry(|| Db::open_path(db_path));
    let db = opened.map_err(super::db_diag::wrap_open_retry_error)?;

    let migrated = super::db_diag::with_lock_retry(|| db.migrate());
    migrated.map_err(super::db_diag::wrap_open_retry_error)?;

    // The record is rebuilt inside the closure so that each retry attempt
    // hands the database a fresh value.
    let CloneDispatchRecord {
        ts_utc_start,
        ts_utc_end,
        agent_id,
        runtime,
        brief_path,
        brief,
        workspace,
        branch,
        status,
        exit_code,
        detail_json,
    } = record;
    let written = super::db_diag::with_lock_retry(|| {
        db.record_clone_dispatch(CloneDispatchRecord {
            ts_utc_start,
            ts_utc_end,
            agent_id,
            runtime,
            brief_path,
            brief,
            workspace,
            branch,
            status,
            exit_code,
            detail_json,
        })
    });
    written.map_err(super::db_diag::wrap_retry_error)?;
    Ok(())
}
/// Picks the lowest positive agent number not used by any listed session.
///
/// Only names of the form `agent<N>` with `N` parsing as a `u32` greater than
/// zero count as used; everything else is ignored.
fn allocate_agent_number(sessions: &[String]) -> u32 {
    let mut taken: Vec<u32> = sessions
        .iter()
        .filter_map(|name| {
            let number = name.strip_prefix("agent")?.parse::<u32>().ok()?;
            if number > 0 {
                Some(number)
            } else {
                None
            }
        })
        .collect();
    taken.sort_unstable();

    // Walk the sorted numbers; the candidate advances only while it is occupied.
    taken.into_iter().fold(1u32, |candidate, number| {
        if number == candidate {
            candidate + 1
        } else {
            candidate
        }
    })
}
/// Returns the name of clone agent number `n`, as defined by `AgentId`.
fn agent_name(n: u32) -> String {
    let id = AgentId::Clone(n);
    id.name()
}
/// Maps the CLI agent type onto the runtime kind used when spawning.
fn runtime_kind(agent_type: AgentType) -> RuntimeKind {
    // Kept as an exhaustive match so a new `AgentType` variant fails to compile here.
    match agent_type {
        AgentType::Codex => RuntimeKind::Codex,
        AgentType::Claude => RuntimeKind::Claude,
    }
}
/// Returns the lowercase runtime name stored with a dispatch record.
fn runtime_label(agent_type: AgentType) -> &'static str {
    // Kept as an exhaustive match so a new `AgentType` variant fails to compile here.
    match agent_type {
        AgentType::Codex => "codex",
        AgentType::Claude => "claude",
    }
}
fn parse_utc(raw: &str) -> rusqlite::Result<DateTime<Utc>> {
DateTime::parse_from_rfc3339(raw)
.map(|ts| ts.with_timezone(&Utc))
.map_err(|err| {
rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(err))
})
}
/// Converts whole seconds since the Unix epoch into a UTC timestamp.
///
/// Returns `None` when `epoch` cannot be represented: either it does not fit
/// in an `i64`, or chrono rejects it as out of range.
fn epoch_to_utc(epoch: u64) -> Option<DateTime<Utc>> {
    // A plain `as i64` cast would wrap values above `i64::MAX` into negative
    // numbers (for example `u64::MAX` becomes `-1`) and yield a bogus
    // pre-1970 timestamp, so reject them up front.
    if epoch > i64::MAX as u64 {
        return None;
    }
    DateTime::<Utc>::from_timestamp(epoch as i64, 0)
}
/// Formats a duration as a single coarse unit.
///
/// Seconds below 90 s, whole minutes below 90 min, whole hours below one
/// day, whole days otherwise. Values are truncated, not rounded.
fn format_duration(duration: Duration) -> String {
    match duration.as_secs() {
        secs if secs < 90 => format!("{secs}s"),
        secs if secs < 5400 => format!("{}m", secs / 60),
        secs if secs < 86_400 => format!("{}h", secs / 3600),
        secs => format!("{}d", secs / 86_400),
    }
}
/// Returns the SHA-256 digest of `input`'s UTF-8 bytes as lowercase hex.
fn sha256_hex(input: &str) -> String {
    let mut state = Sha256::new();
    state.update(input.as_bytes());
    let digest = state.finalize();
    format!("{digest:x}")
}
/// Reports whether `name` is `agent` followed by one or more ASCII digits.
fn is_clone_agent_name(name: &str) -> bool {
    match name.strip_prefix("agent") {
        Some(digits) => !digits.is_empty() && digits.bytes().all(|byte| byte.is_ascii_digit()),
        None => false,
    }
}
// Unit and tmux-backed integration tests for the clone subcommands.
// Tests that create tmux sessions are serialized under the `tmux` key.
#[cfg(test)]
mod tests {
use super::*;
use std::process::{Command, Stdio};
use std::sync::mpsc;
use std::time::SystemTime;
use netsky_core::envelope::write_envelope;
use netsky_sh::tmux::{self, TEST_SESSION_PREFIX};
use serial_test::serial;
use tempfile::tempdir;
// Builds a path set rooted entirely inside `root`, so tests never touch the
// real channel, database, or state locations.
fn test_paths(root: &Path) -> ClonePaths {
ClonePaths {
channel_root: root.join(".claude").join("channels").join("agent"),
meta_db_path: root.join(".netsky").join("meta.db"),
state_dir: root.join(".netsky").join("state"),
}
}
// Tries up to 25 times, 20 ms apart, to kill the named tmux session; stops as
// soon as the session is gone. Failures of the kill command are ignored.
fn best_effort_kill_session(name: &str) {
for _ in 0..25 {
let _ = Command::new("tmux")
.args(["kill-session", "-t", name])
.stdout(Stdio::null())
.stderr(Stdio::null())
.status();
if !tmux::has_session(name) {
return;
}
thread::sleep(Duration::from_millis(20));
}
}
// Polls up to 25 times, 20 ms apart, until the session's presence equals
// `present`; returns whether that state was reached.
fn wait_for_session(name: &str, present: bool) -> bool {
for _ in 0..25 {
if tmux::has_session(name) == present {
return true;
}
thread::sleep(Duration::from_millis(20));
}
false
}
// Guard that kills the named tmux session both when it is created (clearing
// any leftover from an earlier run) and when it is dropped.
struct TmuxSessionGuard {
name: String,
}
impl TmuxSessionGuard {
fn new(name: impl Into<String>) -> Self {
let name = name.into();
best_effort_kill_session(&name);
Self { name }
}
}
impl Drop for TmuxSessionGuard {
fn drop(&mut self) {
best_effort_kill_session(&self.name);
}
}
// Two well-formed lines yield two rows, and the epoch column is parsed.
#[test]
fn parse_session_rows_keeps_created_epoch() {
let rows = parse_session_rows("agent7\t1713500000\nagent8\t1713500300\n");
assert_eq!(rows.len(), 2);
assert_eq!(rows[0].name, "agent7");
assert!(rows[0].created_at.is_some());
}
// `clone_status` combines channel file counts, the latest dispatch row, and
// the latest communication event.
#[test]
fn status_shape_synthesizes_counts_and_message() -> netsky_core::Result<()> {
let dir = tempdir()?;
let paths = test_paths(dir.path());
fs::create_dir_all(paths.channel_root.join("agent7").join("inbox"))?;
fs::create_dir_all(paths.channel_root.join("agent7").join("claimed"))?;
fs::create_dir_all(paths.channel_root.join("agent7").join("delivered"))?;
fs::create_dir_all(paths.channel_root.join("agent7").join("poison"))?;
// One file in inbox and one in delivered; claimed and poison stay empty.
fs::write(
paths
.channel_root
.join("agent7")
.join("inbox")
.join("a.json"),
"{}",
)?;
fs::write(
paths
.channel_root
.join("agent7")
.join("delivered")
.join("b.json"),
"{}",
)?;
record_clone_dispatch_at(
&paths.meta_db_path,
CloneDispatchRecord {
ts_utc_start: Utc::now(),
ts_utc_end: None,
agent_id: "agent7",
runtime: Some("codex"),
brief_path: Some("/tmp/brief.md"),
brief: None,
workspace: Some("demo"),
branch: None,
status: Some("brief-sent"),
exit_code: None,
detail_json: Some("{\"dispatch_thread\":\"clone:x\"}"),
},
)?;
let db = Db::open_path(&paths.meta_db_path)
.map_err(|err| netsky_core::Error::msg(err.to_string()))?;
// The event names agent7 only through `chat_id`, which the lookup also matches.
db.record_communication_event(netsky_db::CommunicationEventRecord {
ts_utc: Utc::now(),
source: "agent",
tool: Some("channel send"),
direction: netsky_db::Direction::Outbound,
chat_id: Some("agent7"),
message_id: Some("msg-1"),
handle: None,
agent: Some("agent0"),
body: Some("brief"),
status: Some("sent"),
detail_json: None,
})
.map_err(|err| netsky_core::Error::msg(err.to_string()))?;
let status = clone_status(&paths, "agent7")?;
assert_eq!(status.runtime.as_deref(), Some("codex"));
assert_eq!(status.dispatch_status.as_deref(), Some("brief-sent"));
assert_eq!(status.inbox_count, 1);
assert_eq!(status.delivered_count, 1);
assert_eq!(
status
.last_message
.as_ref()
.and_then(|row| row.message_id.as_deref()),
Some("msg-1")
);
Ok(())
}
// With a dispatch on record but no reply envelope, waiting ends in a timeout.
#[test]
fn wait_times_out_without_reply() -> netsky_core::Result<()> {
let dir = tempdir()?;
let paths = test_paths(dir.path());
record_clone_dispatch_at(
&paths.meta_db_path,
CloneDispatchRecord {
ts_utc_start: Utc::now(),
ts_utc_end: None,
agent_id: "agent7",
runtime: Some("codex"),
brief_path: Some("/tmp/brief.md"),
brief: None,
workspace: Some("demo"),
branch: None,
status: Some("brief-sent"),
exit_code: None,
detail_json: Some("{\"dispatch_thread\":\"clone:x\"}"),
},
)?;
// The liveness callback always reports alive so only the deadline can end the wait.
let err = wait_for_reply(
&paths,
"agent7",
"agent0",
Duration::from_millis(50),
10,
|_| true,
)
.unwrap_err();
assert!(format!("{err}").contains("status=timeout"));
Ok(())
}
// Readiness needs a prompt marker and the absence of every busy marker.
#[test]
fn codex_brief_target_ready_requires_prompt_and_no_busy_marker() {
assert!(codex_brief_target_ready("gpt-5.4\n›\nReady for the brief."));
assert!(codex_brief_target_ready("/up\nReady for the brief."));
assert!(!codex_brief_target_ready("Working...\ngpt-5.4\n›"));
assert!(!codex_brief_target_ready("Thinking...\n›"));
assert!(!codex_brief_target_ready("Ready for the brief."));
}
// Appends the current time in nanoseconds to the test session prefix and
// `base`, so separate runs get distinct session names.
fn unique_test_session_name(base: &str) -> String {
let nanos = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map(|duration| duration.as_nanos())
.unwrap_or(0);
format!("{TEST_SESSION_PREFIX}{base}-{nanos}")
}
// Shared body of the roundtrip tests: dispatch a brief with stubbed workspace
// and spawn callbacks, let a helper thread write the reply envelope, and check
// that `wait_for_reply` finds it.
fn run_roundtrip_with_mock_clone(test_agent: String) -> netsky_core::Result<()> {
let dir = tempdir()?;
let paths = test_paths(dir.path());
let brief_path = dir.path().join("brief.md");
fs::write(&brief_path, "brief body")?;
let reply_target = "agent0".to_string();
// Kills any session with this name now and again when the function returns.
let _test_agent_guard = TmuxSessionGuard::new(test_agent.clone());
let (tx, rx) = mpsc::channel::<()>();
let session_name = test_agent.clone();
let reply_target_for_thread = reply_target.clone();
let root = dir.path().to_path_buf();
// The helper thread blocks until signalled, then writes a "done" envelope
// from the test session into the reply target's inbox on thread `clone:mock`.
let handle = thread::spawn(move || -> netsky_core::Result<()> {
rx.recv()
.map_err(|err| netsky_core::Error::msg(err.to_string()))?;
let inbox = root
.join(".claude")
.join("channels")
.join("agent")
.join(&reply_target_for_thread)
.join("inbox");
fs::create_dir_all(&inbox)?;
let mut env = Envelope::new(
session_name.clone(),
"done",
DateTime::<Utc>::from(SystemTime::now()).to_rfc3339(),
);
env.to = Some(reply_target_for_thread.clone());
env.thread = Some("clone:mock".to_string());
write_envelope(&inbox, &env)?;
Ok(())
});
// The brief itself is dispatched to agent number 7, while the spawn callback
// starts the separately named test session.
let result = run_brief_with(
&paths,
BriefRequest {
brief_path,
workspace: "demo".to_string(),
requested_agent: Some(7),
agent_type: AgentType::Codex,
from_agent: reply_target.clone(),
reply_target: reply_target.clone(),
},
|_| Ok(()),
|_, _| {
tmux::new_test_session_detached(&test_agent, "sleep 5", None, &[])?;
Ok(())
},
)?;
assert_eq!(result.agent, "agent7");
let db = Db::open_path(&paths.meta_db_path)
.map_err(|err| netsky_core::Error::msg(err.to_string()))?;
// A second dispatch row, keyed by the test session name and using the
// `clone:mock` thread, is what `wait_for_reply` looks up below.
db.record_clone_dispatch(CloneDispatchRecord {
ts_utc_start: Utc::now(),
ts_utc_end: None,
agent_id: &test_agent,
runtime: Some("codex"),
brief_path: Some("/tmp/brief.md"),
brief: None,
workspace: Some("demo"),
branch: None,
status: Some("brief-sent"),
exit_code: None,
detail_json: Some("{\"dispatch_thread\":\"clone:mock\"}"),
})
.map_err(|err| netsky_core::Error::msg(err.to_string()))?;
// Release the helper thread only after the dispatch row exists, so the
// reply's timestamp is not earlier than the dispatch start.
tx.send(())
.map_err(|err| netsky_core::Error::msg(err.to_string()))?;
let wait = wait_for_reply(
&paths,
&test_agent,
&reply_target,
Duration::from_secs(2),
25,
|_| true,
)?;
assert_eq!(wait.status, "done");
handle
.join()
.map_err(|_| netsky_core::Error::msg("reply thread panicked"))??;
Ok(())
}
// Roundtrip with a session name that is unique to this run.
#[test]
#[serial(tmux)]
fn brief_dispatch_and_wait_roundtrip_with_mock_clone() -> netsky_core::Result<()> {
run_roundtrip_with_mock_clone(unique_test_session_name("agent7"))
}
// Roundtrip when a session with the fixed test name already exists: the
// leftover must not break the run, and no session may remain afterwards.
#[test]
#[serial(tmux)]
fn brief_dispatch_roundtrip_cleans_up_leaked_test_session() -> netsky_core::Result<()> {
let leaked = format!("{TEST_SESSION_PREFIX}agent7");
best_effort_kill_session(&leaked);
// Seed the "leaked" session by hand.
let status = Command::new("tmux")
.args(["new-session", "-d", "-s", &leaked, "sleep 60"])
.status()?;
if !status.success() {
return Err(netsky_core::Error::msg(format!(
"failed to seed leaked tmux session '{leaked}'"
)));
}
if !wait_for_session(&leaked, true) {
return Err(netsky_core::Error::msg(format!(
"timed out waiting for leaked tmux session '{leaked}'"
)));
}
run_roundtrip_with_mock_clone(leaked.clone())?;
assert!(!tmux::has_session(&leaked));
Ok(())
}
}