use chrono::Utc;
use netsky_core::consts::{AGENT0_NAME, AGENTINFINITY_NAME, ENV_AGENT_N, LOOP_FROM, LOOP_KIND};
use netsky_core::envelope::valid_agent_id;
use netsky_core::loops::{self, LoopEntry, LoopMode};
use netsky_core::paths::{loop_file_path, loops_dir};
use serde::Serialize;
use serde_json::{Value, json};
use crate::cli::LoopCommand;
use crate::cmd::channel::{self, SendEnvelopeOptions};
pub fn run(sub: LoopCommand) -> netsky_core::Result<()> {
match sub {
LoopCommand::Create {
interval_or_prompt,
prompt,
json,
} => create(&interval_or_prompt, prompt.as_deref(), json),
LoopCommand::List { json } => list(json),
LoopCommand::Delete { id, json } => delete(&id, json),
LoopCommand::Tick { json } => tick(json),
LoopCommand::Reschedule {
id,
delay_secs,
json,
} => reschedule(&id, delay_secs, json),
}
}
fn envelope(summary: &str, data: Value) -> Value {
json!({
"command": "loop",
"status": "green",
"summary": summary,
"generated_at": Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(),
"data": data,
})
}
fn emit(env: &Value) -> netsky_core::Result<()> {
println!("{}", serde_json::to_string_pretty(env)?);
Ok(())
}
fn create(
interval_or_prompt: &str,
prompt: Option<&str>,
json_out: bool,
) -> netsky_core::Result<()> {
let now = Utc::now();
let id = loops::next_id()?;
let agent = default_target();
let entry = match prompt {
Some(prompt) => LoopEntry::new_fixed(
id.clone(),
agent.clone(),
prompt.to_string(),
parse_interval_secs(interval_or_prompt)?,
now,
)?,
None => LoopEntry::new_dynamic(
id.clone(),
agent.clone(),
interval_or_prompt.to_string(),
now,
)?,
};
let path = loops::save(&entry)?;
if json_out {
return emit(&envelope(
&format!("created {}", entry.id),
json!({
"entry": entry,
"path": path.display().to_string(),
}),
));
}
println!(
"[netsky loop create] {} mode={} agent={} next={} ({})",
entry.id,
mode_name(entry.mode),
entry.agent,
entry.next_fire_utc,
path.display(),
);
Ok(())
}
fn list(json_out: bool) -> netsky_core::Result<()> {
let mut entries = loops::load_all()?;
entries.sort_by(|a, b| a.next_fire_utc.cmp(&b.next_fire_utc).then(a.id.cmp(&b.id)));
let rows: Vec<_> = entries.iter().map(ListRow::from).collect();
if json_out {
return emit(&envelope(
&format!(
"{} loop{}",
rows.len(),
if rows.len() == 1 { "" } else { "s" }
),
json!({
"entries": rows,
"count": rows.len(),
}),
));
}
if rows.is_empty() {
println!("(no loops)");
return Ok(());
}
for row in rows {
let last = row.last_fire_utc.as_deref().unwrap_or("-");
let interval = row
.interval_secs
.map(|secs| secs.to_string())
.unwrap_or_else(|| "-".to_string());
println!(
"{}\tmode={}\tagent={}\tinterval_secs={}\tnext={}\tlast={}\tprompt={}",
row.id, row.mode, row.agent, interval, row.next_fire_utc, last, row.prompt
);
}
Ok(())
}
fn delete(id: &str, json_out: bool) -> netsky_core::Result<()> {
if !loops::delete(id)? {
netsky_core::bail!("loop entry {id:?} not found");
}
if json_out {
return emit(&envelope(
&format!("deleted {id}"),
json!({ "id": id, "removed": 1 }),
));
}
println!("[netsky loop delete] {id}");
Ok(())
}
fn tick(json_out: bool) -> netsky_core::Result<()> {
tick_at(&loops_dir(), &channel::channel_root(), Utc::now(), json_out)
}
fn tick_at(
loops_root: &std::path::Path,
channel_root: &std::path::Path,
now: chrono::DateTime<Utc>,
json_out: bool,
) -> netsky_core::Result<()> {
let mut dispatched = 0usize;
let mut entries = loops::load_all_from(loops_root)?;
for entry in &mut entries {
if !entry.is_due(now)? {
continue;
}
channel::send_envelope_at(
channel_root,
&entry.agent,
&entry.prompt,
SendEnvelopeOptions {
from_override: Some(LOOP_FROM),
kind: Some(LOOP_KIND),
thread: Some(&entry.id),
in_reply_to: None,
requires_ack: None,
},
)?;
entry.mark_fired(now)?;
loops::save_to(&loop_file_path(&entry.id), entry)?;
dispatched += 1;
}
if json_out {
return emit(&envelope(
&format!(
"dispatched {dispatched} due loop{}",
if dispatched == 1 { "" } else { "s" }
),
json!({ "dispatched": dispatched }),
));
}
println!(
"[netsky loop tick] dispatched {dispatched} due loop{}",
if dispatched == 1 { "" } else { "s" }
);
Ok(())
}
fn reschedule(id: &str, delay_secs: u64, json_out: bool) -> netsky_core::Result<()> {
let now = Utc::now();
let mut entry = loops::load(id)?;
if entry.mode != LoopMode::Dynamic {
netsky_core::bail!("loop entry {id} is fixed; only dynamic loops can be rescheduled");
}
entry.reschedule(now, delay_secs)?;
let path = loops::save(&entry)?;
if json_out {
return emit(&envelope(
&format!("rescheduled {id}"),
json!({
"entry": entry,
"path": path.display().to_string(),
}),
));
}
println!(
"[netsky loop reschedule] {id} next={} ({})",
entry.next_fire_utc,
path.display(),
);
Ok(())
}
fn default_target() -> String {
let agent = std::env::var(ENV_AGENT_N)
.ok()
.filter(|value| !value.trim().is_empty())
.map(|value| {
if value == "infinity" {
AGENTINFINITY_NAME.to_string()
} else {
format!("agent{value}")
}
})
.unwrap_or_else(|| AGENT0_NAME.to_string());
if valid_agent_id(&agent) {
agent
} else {
AGENT0_NAME.to_string()
}
}
fn parse_interval_secs(raw: &str) -> netsky_core::Result<u64> {
let spec = raw.trim().strip_prefix("every ").unwrap_or(raw.trim());
if spec.is_empty() {
netsky_core::bail!("interval must not be empty");
}
let cut = spec
.find(|ch: char| !ch.is_ascii_digit())
.ok_or_else(|| netsky_core::anyhow!("interval {raw:?} missing unit suffix"))?;
let (digits, unit) = spec.split_at(cut);
let n: u64 = digits
.parse()
.map_err(|e| netsky_core::anyhow!("invalid interval {raw:?}: {e}"))?;
if n == 0 {
netsky_core::bail!("interval must be > 0");
}
let secs = match unit.trim() {
"s" => n,
"m" => n * 60,
"h" => n * 60 * 60,
"d" => n * 60 * 60 * 24,
_ => netsky_core::bail!("unsupported interval unit in {raw:?}; use s, m, h, or d"),
};
Ok(secs)
}
fn mode_name(mode: LoopMode) -> &'static str {
match mode {
LoopMode::Fixed => "fixed",
LoopMode::Dynamic => "dynamic",
}
}
#[derive(Debug, Serialize)]
struct ListRow {
id: String,
agent: String,
mode: &'static str,
interval_secs: Option<u64>,
prompt: String,
next_fire_utc: String,
last_fire_utc: Option<String>,
}
impl From<&LoopEntry> for ListRow {
fn from(entry: &LoopEntry) -> Self {
Self {
id: entry.id.clone(),
agent: entry.agent.clone(),
mode: mode_name(entry.mode),
interval_secs: entry.interval_secs,
prompt: entry.prompt.clone(),
next_fire_utc: entry.next_fire_utc.clone(),
last_fire_utc: entry.last_fire_utc.clone(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::TimeZone;
use std::fs;
use tempfile::tempdir;
#[test]
fn interval_parser_accepts_skill_forms() {
assert_eq!(parse_interval_secs("5m").unwrap(), 300);
assert_eq!(parse_interval_secs("2h").unwrap(), 7200);
assert_eq!(parse_interval_secs("every 20m").unwrap(), 1200);
}
#[test]
fn tick_dispatches_fixed_loop_and_advances_next_fire() {
let dir = tempdir().unwrap();
unsafe {
std::env::set_var("HOME", dir.path());
}
let now = Utc.with_ymd_and_hms(2026, 4, 19, 16, 24, 0).unwrap();
let mut entry = LoopEntry::new_fixed(
"loop-00000001".to_string(),
"agent0".to_string(),
"/notes".to_string(),
60,
now,
)
.unwrap();
entry.next_fire_utc = "2026-04-19T16:23:00Z".to_string();
loops::save_to(&loop_file_path(&entry.id), &entry).unwrap();
tick_at(
&loops_dir(),
&dir.path().join(".claude/channels/agent"),
now,
false,
)
.unwrap();
let inbox = dir.path().join(".claude/channels/agent/agent0/inbox");
let files: Vec<_> = fs::read_dir(&inbox).unwrap().flatten().collect();
assert_eq!(files.len(), 1);
let raw = fs::read_to_string(files[0].path()).unwrap();
let env: netsky_core::envelope::Envelope = serde_json::from_str(&raw).unwrap();
assert_eq!(env.kind.as_deref(), Some(LOOP_KIND));
assert_eq!(env.thread.as_deref(), Some("loop-00000001"));
let updated = loops::load("loop-00000001").unwrap();
assert_eq!(
updated.last_fire_utc.as_deref(),
Some("2026-04-19T16:24:00+00:00")
);
assert_eq!(updated.next_fire_utc, "2026-04-19T16:25:00+00:00");
}
}