use std::fs;
use std::path::Path;
use std::time::{SystemTime, UNIX_EPOCH};
use chrono::{DateTime, Duration, Utc};
use netsky_core::consts::ESCALATE_FAILED_MARKER_PREFIX;
use netsky_core::paths::{home, logs_dir, state_dir};
use netsky_db::Db;
use serde_json::Value;
// Number of most-recent events kept by `collect_events` when the filter
// carries no explicit `limit`.
const DEFAULT_LIMIT: usize = 50;
// Maximum number of characters of event detail shown per line in the text
// output; also used to cap the body preview of escalate-failed markers.
const DETAIL_LIMIT: usize = 160;
/// One normalized event, regardless of which on-disk source produced it.
#[derive(Debug, Clone)]
pub struct Event {
/// When the event happened (UTC); events are sorted on this field.
ts_utc: DateTime<Utc>,
/// Event category, e.g. `"restart"`, `"channel-ack"`, `"communication"`,
/// or the `kind` string recorded in a watchdog JSONL record.
kind: String,
/// Agent the event is attributed to, when the source records one.
agent: Option<String>,
/// Source-specific payload, kept as raw JSON.
detail: Value,
/// Label of the reader that produced the event, e.g. `"watchdog-jsonl"`
/// or `"meta-db"`.
source: String,
}
/// Criteria applied to collected events; a `None` field imposes no constraint.
#[derive(Debug, Clone, Default)]
pub struct EventFilter {
/// Inclusive lower bound on the event timestamp.
since: Option<DateTime<Utc>>,
/// Inclusive upper bound on the event timestamp.
until: Option<DateTime<Utc>>,
/// Exact agent name the event must carry.
agent: Option<String>,
/// Exact event kind the event must carry.
kind: Option<String>,
/// Maximum number of (most recent) events to keep; `collect_events` falls
/// back to `DEFAULT_LIMIT` when this is `None`.
limit: Option<usize>,
}
pub fn run(
since: Option<&str>,
until: Option<&str>,
agent: Option<&str>,
kind: Option<&str>,
limit: Option<usize>,
json: bool,
) -> netsky_core::Result<()> {
let filter = EventFilter {
since: match since {
Some(s) => Some(Utc::now() - parse_duration(s)?),
None => None,
},
until: match until {
Some(s) => Some(parse_ts(s)?),
None => None,
},
agent: agent.map(str::to_string),
kind: kind.map(str::to_string),
limit,
};
let db_path = home().join(".netsky").join("meta.db");
let events = collect_events(&logs_dir(), &state_dir(), &db_path, &filter)?;
if json {
println!(
"{}",
serde_json::to_string_pretty(&envelope(&events, &filter))?
);
return Ok(());
}
if events.is_empty() {
println!("no events");
return Ok(());
}
for event in events {
println!(
"{} | {} | {} | {}",
event.ts_utc.format("%Y-%m-%dT%H:%M:%SZ"),
event.kind,
event.agent.as_deref().unwrap_or("-"),
truncate(&compact_detail(&event.detail)?, DETAIL_LIMIT)
);
}
Ok(())
}
/// Loads events from every known source, keeps those accepted by `filter`,
/// orders them by timestamp (oldest first) and returns at most the newest
/// `filter.limit` of them (`DEFAULT_LIMIT` when unset).
///
/// Sources, in load order: watchdog JSONL files and channel-ack JSONL files
/// under `logs_dir`, restart-status files and escalate-failed markers under
/// `state_dir`, and communication events from the database at `db_path`.
///
/// # Errors
///
/// Propagates the first error returned by any of the source readers.
pub fn collect_events(
    logs_dir: &Path,
    state_dir: &Path,
    db_path: &Path,
    filter: &EventFilter,
) -> netsky_core::Result<Vec<Event>> {
    let mut collected = read_jsonl_family(logs_dir, "watchdog-events-", ".jsonl", parse_watchdog)?;
    collected.extend(read_jsonl_family(
        logs_dir,
        "channel-acks-",
        ".jsonl",
        parse_channel_ack,
    )?);
    collected.extend(read_restart_status(state_dir)?);
    collected.extend(read_escalate_failed(state_dir)?);
    collected.extend(read_meta_db(db_path)?);

    collected.retain(|candidate| filter.matches(candidate));
    // Stable sort: events with equal timestamps keep their load order.
    collected.sort_by(|left, right| left.ts_utc.cmp(&right.ts_utc));

    // Drop the oldest entries so only the newest `cap` remain.
    let cap = filter.limit.unwrap_or(DEFAULT_LIMIT);
    let surplus = collected.len().saturating_sub(cap);
    if surplus > 0 {
        collected.drain(..surplus);
    }
    Ok(collected)
}
impl Event {
    /// Renders the event as a JSON object with the keys `ts_utc`, `kind`,
    /// `agent`, `detail` and `source`; the timestamp is formatted to whole
    /// seconds with a trailing `Z`.
    fn to_json(&self) -> Value {
        let Self {
            ts_utc,
            kind,
            agent,
            detail,
            source,
        } = self;
        let stamp = ts_utc.format("%Y-%m-%dT%H:%M:%SZ").to_string();
        serde_json::json!({
            "ts_utc": stamp,
            "kind": kind,
            "agent": agent,
            "detail": detail,
            "source": source,
        })
    }
}
fn envelope(events: &[Event], filter: &EventFilter) -> Value {
let items = events.iter().map(Event::to_json).collect::<Vec<_>>();
serde_json::json!({
"command": "events",
"status": "green",
"summary": format!("{} event(s)", items.len()),
"generated_at": Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string(),
"data": {
"filter": {
"since": filter.since.map(|t| t.format("%Y-%m-%dT%H:%M:%SZ").to_string()),
"until": filter.until.map(|t| t.format("%Y-%m-%dT%H:%M:%SZ").to_string()),
"agent": filter.agent,
"kind": filter.kind,
"limit": filter.limit,
},
"count": items.len(),
"events": items,
},
})
}
impl EventFilter {
    /// Returns `true` when `event` satisfies every constraint that is set:
    /// timestamp within `[since, until]` (both inclusive), and exact equality
    /// on `agent` and `kind`. Unset constraints always pass.
    fn matches(&self, event: &Event) -> bool {
        if self.since.is_some_and(|since| event.ts_utc < since) {
            return false;
        }
        if self.until.is_some_and(|until| event.ts_utc > until) {
            return false;
        }
        if let Some(wanted) = self.agent.as_deref() {
            // An event without an agent never matches an agent filter.
            if event.agent.as_deref() != Some(wanted) {
                return false;
            }
        }
        match self.kind.as_deref() {
            Some(wanted) => event.kind == wanted,
            None => true,
        }
    }
}
fn read_jsonl_family(
dir: &Path,
prefix: &str,
suffix: &str,
parse: fn(Value) -> netsky_core::Result<Event>,
) -> netsky_core::Result<Vec<Event>> {
let mut out = Vec::new();
let Ok(entries) = fs::read_dir(dir) else {
return Ok(out);
};
let mut paths = Vec::new();
for entry in entries.filter_map(|entry| entry.ok()) {
let path = entry.path();
let Some(name) = path.file_name().and_then(|name| name.to_str()) else {
continue;
};
if name.starts_with(prefix) && name.ends_with(suffix) {
paths.push(path);
}
}
paths.sort();
for path in paths {
for (idx, value) in netsky_core::jsonl::read_records(&path)?.enumerate() {
let value = value.map_err(|err| {
netsky_core::Error::Invalid(format!(
"{}:{}: {err}",
path.display(),
idx.saturating_add(1)
))
})?;
out.push(parse(value)?);
}
}
Ok(out)
}
/// Converts one watchdog JSONL record into an `Event`.
///
/// Requires string fields `ts` (RFC 3339) and `kind`; `detail` is optional
/// and defaults to JSON null. The agent is taken from `detail.agent` when
/// that is a string.
///
/// # Errors
///
/// Fails when `ts` or `kind` is missing or not a string, or when `ts` is not
/// a valid RFC 3339 timestamp.
fn parse_watchdog(raw: Value) -> netsky_core::Result<Event> {
    let stamp = required_str(&raw, "ts")?;
    let ts_utc = parse_ts(stamp)?;
    let kind = required_str(&raw, "kind")?.to_owned();
    let detail = match raw.get("detail") {
        Some(payload) => payload.clone(),
        None => Value::Null,
    };
    let agent = match detail.get("agent") {
        Some(Value::String(name)) => Some(name.clone()),
        _ => None,
    };
    Ok(Event {
        ts_utc,
        kind,
        agent,
        detail,
        source: String::from("watchdog-jsonl"),
    })
}
/// Converts one channel-ack JSONL record into a `channel-ack` event.
///
/// Requires a string field `ts_utc` (RFC 3339); the agent comes from the
/// top-level `agent` field when it is a string. The whole record becomes the
/// event detail.
///
/// # Errors
///
/// Fails when `ts_utc` is missing, not a string, or not valid RFC 3339.
fn parse_channel_ack(raw: Value) -> netsky_core::Result<Event> {
    let stamp = required_str(&raw, "ts_utc")?;
    let ts_utc = parse_ts(stamp)?;
    let agent = match raw.get("agent") {
        Some(Value::String(name)) => Some(name.clone()),
        _ => None,
    };
    Ok(Event {
        ts_utc,
        kind: String::from("channel-ack"),
        agent,
        detail: raw,
        source: String::from("channel-ack-jsonl"),
    })
}
fn read_restart_status(state_dir: &Path) -> netsky_core::Result<Vec<Event>> {
let dir = state_dir.join("restart-status");
let Ok(entries) = fs::read_dir(dir) else {
return Ok(Vec::new());
};
let mut out = Vec::new();
for entry in entries.filter_map(|entry| entry.ok()) {
let path = entry.path();
if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
continue;
}
let raw: Value = serde_json::from_str(&fs::read_to_string(&path)?)?;
let ts_utc = raw
.get("updated_at")
.or_else(|| raw.get("started_at"))
.and_then(Value::as_str)
.map(parse_ts)
.transpose()?
.unwrap_or(system_time_to_utc(
entry.metadata()?.modified().unwrap_or(UNIX_EPOCH),
));
out.push(Event {
ts_utc,
kind: "restart".to_string(),
agent: Some("agent0".to_string()),
detail: raw,
source: "restart-status".to_string(),
});
}
Ok(out)
}
fn read_escalate_failed(state_dir: &Path) -> netsky_core::Result<Vec<Event>> {
let Ok(entries) = fs::read_dir(state_dir) else {
return Ok(Vec::new());
};
let mut out = Vec::new();
for entry in entries.filter_map(|entry| entry.ok()) {
let path = entry.path();
let Some(name) = path.file_name().and_then(|name| name.to_str()) else {
continue;
};
if !name.starts_with(ESCALATE_FAILED_MARKER_PREFIX) {
continue;
}
let ts_utc = name
.strip_prefix(ESCALATE_FAILED_MARKER_PREFIX)
.and_then(parse_compact_ts)
.unwrap_or(system_time_to_utc(
entry.metadata()?.modified().unwrap_or(UNIX_EPOCH),
));
let body = fs::read_to_string(&path).unwrap_or_default();
out.push(Event {
ts_utc,
kind: "escalate-failed".to_string(),
agent: Some("agent0".to_string()),
detail: serde_json::json!({
"marker": name,
"body": truncate(&body, DETAIL_LIMIT),
}),
source: "state-marker".to_string(),
});
}
Ok(out)
}
fn read_meta_db(path: &Path) -> netsky_core::Result<Vec<Event>> {
if !path.exists() {
return Ok(Vec::new());
}
let Ok(db) = super::db_diag::with_lock_retry(|| Db::open_path(path)) else {
return Ok(Vec::new());
};
let Ok(rows) = super::db_diag::with_lock_retry(|| db.list_communication_events()) else {
return Ok(Vec::new());
};
Ok(rows
.into_iter()
.map(|row| Event {
ts_utc: row.ts_utc,
kind: "communication".to_string(),
agent: row.agent,
detail: serde_json::json!({
"id": row.id,
"source": row.source,
"tool": row.tool,
"direction": row.direction,
"message_id": row.message_id,
"status": row.status,
"body_preview": row.body.map(|body| truncate(&body, 80)),
"detail_json": row.detail_json.and_then(|json| serde_json::from_str::<Value>(&json).ok()),
}),
source: "meta-db".to_string(),
})
.collect())
}
fn required_str<'a>(value: &'a Value, key: &str) -> netsky_core::Result<&'a str> {
value
.get(key)
.and_then(Value::as_str)
.ok_or_else(|| netsky_core::Error::Invalid(format!("missing {key}")))
}
fn parse_ts(s: &str) -> netsky_core::Result<DateTime<Utc>> {
DateTime::parse_from_rfc3339(s)
.map(|ts| ts.with_timezone(&Utc))
.map_err(|err| netsky_core::Error::Invalid(format!("bad timestamp {s}: {err}")))
}
/// Parses a compact `YYYYMMDDTHHMMSSZ` timestamp, treating it as UTC.
/// Returns `None` when the input does not match that layout.
fn parse_compact_ts(s: &str) -> Option<DateTime<Utc>> {
    let naive = chrono::NaiveDateTime::parse_from_str(s, "%Y%m%dT%H%M%SZ").ok()?;
    Some(naive.and_utc())
}
fn parse_duration(s: &str) -> netsky_core::Result<Duration> {
let s = s.trim();
if s.is_empty() {
return Err(netsky_core::Error::Invalid(
"empty duration — expected `<N><unit>` (e.g. 10m, 2h, 1d)".to_string(),
));
}
let (digits, unit) = s.split_at(s.find(|ch: char| !ch.is_ascii_digit()).unwrap_or(s.len()));
if digits.is_empty() {
return Err(netsky_core::Error::Invalid(format!(
"bad duration '{s}' — expected `<N><unit>` with N an integer (e.g. 10m, 2h, 1d)"
)));
}
let n: i64 = digits.parse()?;
match unit {
"" | "s" | "sec" | "secs" => Ok(Duration::seconds(n)),
"m" | "min" | "mins" => Ok(Duration::minutes(n)),
"h" | "hr" | "hrs" => Ok(Duration::hours(n)),
"d" | "day" | "days" => Ok(Duration::days(n)),
_ => Err(netsky_core::Error::Invalid(format!(
"bad duration unit '{unit}' — expected one of m|min|mins, h|hr|hrs, d|day|days"
))),
}
}
fn system_time_to_utc(ts: SystemTime) -> DateTime<Utc> {
DateTime::<Utc>::from(ts)
}
/// Serializes `value` as single-line (non-pretty) JSON text.
///
/// # Errors
///
/// Propagates any serialization error from `serde_json`.
fn compact_detail(value: &Value) -> netsky_core::Result<String> {
    let rendered = serde_json::to_string(value)?;
    Ok(rendered)
}
/// Returns `value` with control characters replaced by spaces, shortened to
/// at most `max_chars` characters.
///
/// Text that fits is returned whole. Longer text is cut and ends in `...`,
/// with the ellipsis counted against the limit. When `max_chars` is below
/// three the ellipsis itself is shortened, so the result never exceeds
/// `max_chars` characters.
///
/// Lengths are counted in `char`s, not bytes, so multi-byte text is never
/// split inside a character.
fn truncate(value: &str, max_chars: usize) -> String {
    const ELLIPSIS: &str = "...";

    let mut cleaned = value
        .chars()
        .map(|ch| if ch.is_control() { ' ' } else { ch });

    // Take only as many characters as could ever be shown; the rest of a
    // large input is never cleaned or copied.
    let mut out: String = cleaned.by_ref().take(max_chars).collect();
    if cleaned.next().is_none() {
        // Nothing left over: the whole input fits within the limit.
        return out;
    }

    // Overflow: make room for the ellipsis (or as much of it as fits).
    let keep = max_chars.saturating_sub(ELLIPSIS.len());
    let cut = out
        .char_indices()
        .nth(keep)
        .map_or(out.len(), |(idx, _)| idx);
    out.truncate(cut);
    out.extend(ELLIPSIS.chars().take(max_chars - keep));
    out
}
// Unit tests for duration parsing and for `collect_events` against
// temporary on-disk fixtures.
#[cfg(test)]
mod tests {
use super::*;
use chrono::TimeZone;
use netsky_db::{CommunicationEventRecord, Direction};
use tempfile::tempdir;
// Minute, hour and day suffixes map onto the matching chrono durations.
#[test]
fn duration_accepts_watchdog_units() {
assert_eq!(parse_duration("10m").unwrap(), Duration::minutes(10));
assert_eq!(parse_duration("2h").unwrap(), Duration::hours(2));
assert_eq!(parse_duration("1d").unwrap(), Duration::days(1));
}
// One event per source, with timestamps chosen so the expected order is:
// db row (00:01), channel ack (00:02), watchdog (00:03),
// restart (updated_at 00:04:30), escalate marker (00:05, from its file name).
#[test]
fn collect_events_merges_sources_chronologically() {
let dir = tempdir().unwrap();
let logs = dir.path().join("logs");
let state = dir.path().join("state");
let restart = state.join("restart-status");
fs::create_dir_all(&logs).unwrap();
fs::create_dir_all(&restart).unwrap();
fs::write(
logs.join("watchdog-events-2026-04-17.jsonl"),
r#"{"ts":"2026-04-17T00:03:00Z","kind":"pane_transition","detail":{"agent":"agent5","transition":"progressed"}}"#,
)
.unwrap();
fs::write(
logs.join("channel-acks-2026-04-17.jsonl"),
r#"{"ts_utc":"2026-04-17T00:02:00Z","message_id":"m1","status":"started","note":null,"agent":"agent5"}"#,
)
.unwrap();
fs::write(
restart.join("r.json"),
r#"{"started_at":"2026-04-17T00:04:00Z","updated_at":"2026-04-17T00:04:30Z","phase":"errored","error":"boom"}"#,
)
.unwrap();
fs::write(
state.join("escalate-failed-20260417T000500Z"),
"escalate FAILED twice",
)
.unwrap();
let db_path = dir.path().join("meta.db");
let db = Db::open_path(&db_path).unwrap();
db.migrate().unwrap();
db.record_communication_event(CommunicationEventRecord {
ts_utc: Utc.with_ymd_and_hms(2026, 4, 17, 0, 1, 0).unwrap(),
source: "agent",
tool: Some("reply"),
direction: Direction::Inbound,
chat_id: None,
message_id: Some("m0"),
handle: None,
agent: Some("agent5"),
body: Some("hello"),
status: Some("received"),
detail_json: None,
})
.unwrap();
let filter = EventFilter::default();
let events = collect_events(&logs, &state, &db_path, &filter).unwrap();
assert_eq!(
events
.iter()
.map(|event| event.kind.as_str())
.collect::<Vec<_>>(),
vec![
"communication",
"channel-ack",
"pane_transition",
"restart",
"escalate-failed",
]
);
}
// None of the directories or the database exist: every source must be
// treated as empty rather than failing.
#[test]
fn filters_agent_kind_and_handles_empty_sources() {
let dir = tempdir().unwrap();
let filter = EventFilter {
agent: Some("agent5".to_string()),
kind: Some("channel-ack".to_string()),
..EventFilter::default()
};
let events = collect_events(
&dir.path().join("logs"),
&dir.path().join("state"),
&dir.path().join("meta.db"),
&filter,
)
.unwrap();
assert!(events.is_empty());
}
// The second record is cut off mid-object (as by an interrupted write);
// only the first, complete record is expected to surface.
// NOTE(review): this presumably relies on `netsky_core::jsonl::read_records`
// dropping a truncated final record — that behavior is not visible in this
// file; confirm there.
#[test]
fn collect_events_skips_malformed_trailing_jsonl_record() {
let dir = tempdir().unwrap();
let logs = dir.path().join("logs");
let state = dir.path().join("state");
fs::create_dir_all(&logs).unwrap();
fs::create_dir_all(&state).unwrap();
fs::write(
logs.join("channel-acks-2026-04-17.jsonl"),
concat!(
r#"{"ts_utc":"2026-04-17T00:02:00Z","message_id":"m1","status":"started","note":null,"agent":"agent5"}"#,
"\n",
r#"{"ts_utc":"2026-04-17T00:03:00Z","message_id":"m2""#
),
)
.unwrap();
let events = collect_events(
&logs,
&state,
&dir.path().join("missing-meta.db"),
&EventFilter::default(),
)
.unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].kind, "channel-ack");
assert_eq!(events[0].agent.as_deref(), Some("agent5"));
}
}