use std::cell::RefCell;
use std::ffi::OsString;
use std::fs;
use std::time::Duration;
use chrono::{DateTime, Utc};
use netsky_db::{
CloneDispatchRecord, CommunicationEventRecord, Db, GitOperationRecord, HarvestEventRecord,
IrohEventRecord, IrohEventType, McpToolCallRecord, OwnerDirectiveRecord, SessionEvent,
SourceErrorClass, SourceErrorRecord, TokenUsageRecord, WatchdogEventRecord,
};
// Per-thread meta-db handle. It starts out as `None`; `with_db` opens and
// migrates a `Db` the first time it is needed on a thread and stores it here
// so later calls on that thread reuse the same handle.
thread_local! {
static DB: RefCell<Option<Db>> = const { RefCell::new(None) };
}
/// Records one CLI invocation in the `cli_invocations` table.
///
/// `argv[0]` is stored as the binary name and the remaining arguments as a
/// JSON array (see `cli_args`). When `argv` is empty nothing is recorded.
/// If the database write fails, the same fields are handed to
/// `write_with_fallback` for spooling.
pub fn record_cli_invocation(argv: &[OsString], exit_code: Option<i64>, duration: Duration) {
    let Some((bin, argv_json)) = cli_args(argv) else {
        return;
    };
    // A duration that does not fit the target integer type becomes `None`.
    let duration_ms = duration.as_millis().try_into().ok();
    // Resolve the host once so the fallback record and the database row are
    // guaranteed to carry the same value.
    let host_name = host();
    write_with_fallback(
        "cli_invocations",
        serde_json::json!({
            "bin": bin,
            "argv_json": argv_json,
            "exit_code": exit_code,
            "duration_ms": duration_ms,
            "host": host_name,
        }),
        |db| {
            db.record_cli(
                Utc::now(),
                &bin,
                &argv_json,
                exit_code,
                duration_ms,
                &host_name,
            )
        },
    );
}
/// Records a session lifecycle event for `agent` in the `sessions` table.
///
/// If the database write fails, the same fields are handed to
/// `write_with_fallback` for spooling.
pub fn record_session(agent: &str, session_num: i64, event: SessionEvent) {
    // Read the clock once: the spooled fallback record and the database row
    // must describe the same instant, and the instant should be when the event
    // was reported rather than when the database finished opening.
    let ts = Utc::now();
    write_with_fallback(
        "sessions",
        serde_json::json!({
            "ts_utc": ts.to_rfc3339(),
            "agent": agent,
            "session_num": session_num,
            "event": session_event_label(event),
        }),
        |db| db.record_session(ts, agent, session_num, event),
    );
}
/// Records a tick from `source` in the `ticks` table, storing `detail` as its
/// serialized JSON text.
///
/// If the database write fails, the same fields are handed to
/// `write_with_fallback` for spooling.
pub fn record_tick(source: &str, detail: serde_json::Value) {
    let detail_json = detail.to_string();
    // Read the clock once so the spooled fallback record and the database row
    // carry an identical timestamp.
    let ts = Utc::now();
    write_with_fallback(
        "ticks",
        serde_json::json!({
            "ts_utc": ts.to_rfc3339(),
            "source": source,
            "detail_json": detail_json,
        }),
        |db| db.record_tick(ts, source, &detail_json),
    );
}
/// Persists a clone dispatch record via `Db::record_clone_dispatch`.
///
/// A JSON copy of the record is built up front so that, should the database
/// write fail, `write_with_fallback` can spool it instead.
pub fn record_clone_dispatch(record: CloneDispatchRecord<'_>) {
    let started = record.ts_utc_start.to_rfc3339();
    let finished = record.ts_utc_end.map(|ts| ts.to_rfc3339());
    let fallback = serde_json::json!({
        "ts_utc_start": started,
        "ts_utc_end": finished,
        "agent_id": record.agent_id,
        "runtime": record.runtime,
        "brief_path": record.brief_path,
        "brief": record.brief,
        "workspace": record.workspace,
        "branch": record.branch,
        "status": record.status,
        "exit_code": record.exit_code,
        "detail_json": record.detail_json,
    });
    write_with_fallback("clone_dispatches", fallback, |db| {
        db.record_clone_dispatch(record)
    });
}
/// Persists a harvest event via `Db::record_harvest_event`.
///
/// A JSON copy of the record is built up front so that, should the database
/// write fail, `write_with_fallback` can spool it instead.
pub fn record_harvest_event(record: HarvestEventRecord<'_>) {
    let fallback = serde_json::json!({
        "ts_utc": record.ts_utc.to_rfc3339(),
        "source_branch": record.source_branch,
        "target_branch": record.target_branch,
        "commit_sha": record.commit_sha,
        "status": record.status,
        "conflicts": record.conflicts,
        "detail_json": record.detail_json,
    });
    write_with_fallback("harvest_events", fallback, |db| {
        db.record_harvest_event(record)
    });
}
pub fn record_communication_event(record: CommunicationEventRecord<'_>) {
write_with_fallback(
"communication_events",
serde_json::json!({
"ts_utc": record.ts_utc.to_rfc3339(),
"source": record.source,
"tool": record.tool,
"direction": direction_label(record.direction),
"chat_id": record.chat_id,
"message_id": record.message_id,
"handle": record.handle,
"agent": record.agent,
"body": record.body,
"status": record.status,
"detail_json": record.detail_json,
}),
|db| db.record_communication_event(record),
);
}
/// Persists an MCP tool call via `Db::record_mcp_tool_call`.
///
/// A JSON copy of the record is built up front so that, should the database
/// write fail, `write_with_fallback` can spool it instead.
pub fn record_mcp_tool_call(record: McpToolCallRecord<'_>) {
    let started = record.ts_utc_start.to_rfc3339();
    let finished = record.ts_utc_end.map(|ts| ts.to_rfc3339());
    let fallback = serde_json::json!({
        "ts_utc_start": started,
        "ts_utc_end": finished,
        "source": record.source,
        "tool": record.tool,
        "agent": record.agent,
        "duration_ms": record.duration_ms,
        "success": record.success,
        "error": record.error,
        "timeout_race": record.timeout_race,
        "request_json": record.request_json,
        "response_json": record.response_json,
    });
    write_with_fallback("mcp_tool_calls", fallback, |db| {
        db.record_mcp_tool_call(record)
    });
}
/// Persists a git operation via `Db::record_git_operation`.
///
/// A JSON copy of the record is built up front so that, should the database
/// write fail, `write_with_fallback` can spool it instead.
pub fn record_git_operation(record: GitOperationRecord<'_>) {
    let fallback = serde_json::json!({
        "ts_utc": record.ts_utc.to_rfc3339(),
        "operation": record.operation,
        "repo": record.repo,
        "branch": record.branch,
        "remote": record.remote,
        "from_sha": record.from_sha,
        "to_sha": record.to_sha,
        "status": record.status,
        "detail_json": record.detail_json,
    });
    write_with_fallback("git_operations", fallback, |db| {
        db.record_git_operation(record)
    });
}
/// Persists an owner directive via `Db::record_owner_directive`.
///
/// A JSON copy of the record is built up front so that, should the database
/// write fail, `write_with_fallback` can spool it instead.
pub fn record_owner_directive(record: OwnerDirectiveRecord<'_>) {
    let fallback = serde_json::json!({
        "ts_utc": record.ts_utc.to_rfc3339(),
        "source": record.source,
        "chat_id": record.chat_id,
        "raw_text": record.raw_text,
        "resolved_action": record.resolved_action,
        "agent": record.agent,
        "status": record.status,
        "detail_json": record.detail_json,
    });
    write_with_fallback("owner_directives", fallback, |db| {
        db.record_owner_directive(record)
    });
}
/// Persists a token usage sample via `Db::record_token_usage`.
///
/// A JSON copy of the record is built up front so that, should the database
/// write fail, `write_with_fallback` can spool it instead.
pub fn record_token_usage(record: TokenUsageRecord<'_>) {
    let fallback = serde_json::json!({
        "ts_utc": record.ts_utc.to_rfc3339(),
        "session_id": record.session_id,
        "agent": record.agent,
        "runtime": record.runtime,
        "model": record.model,
        "input_tokens": record.input_tokens,
        "output_tokens": record.output_tokens,
        "cached_input_tokens": record.cached_input_tokens,
        "cost_usd_micros": record.cost_usd_micros,
        "detail_json": record.detail_json,
    });
    write_with_fallback("token_usage", fallback, |db| {
        db.record_token_usage(record)
    });
}
/// Persists a watchdog event via `Db::record_watchdog_event`.
///
/// A JSON copy of the record is built up front so that, should the database
/// write fail, `write_with_fallback` can spool it instead.
pub fn record_watchdog_event(record: WatchdogEventRecord<'_>) {
    let fallback = serde_json::json!({
        "ts_utc": record.ts_utc.to_rfc3339(),
        "event": record.event,
        "agent": record.agent,
        "severity": record.severity,
        "status": record.status,
        "detail_json": record.detail_json,
    });
    write_with_fallback("watchdog_events", fallback, |db| {
        db.record_watchdog_event(record)
    });
}
/// Persists a source error via `Db::record_source_error`.
///
/// A JSON copy of the record (with the error class rendered through its
/// `as_str`) is built up front so that, should the database write fail,
/// `write_with_fallback` can spool it instead.
pub fn record_source_error(record: SourceErrorRecord<'_>) {
    let class_label = record.error_class.as_str();
    let fallback = serde_json::json!({
        "ts_utc": record.ts_utc.to_rfc3339(),
        "source": record.source,
        "error_class": class_label,
        "count": record.count,
        "detail_json": record.detail_json,
    });
    write_with_fallback("source_errors", fallback, |db| {
        db.record_source_error(record)
    });
}
/// Persists an iroh event via `Db::record_iroh_event`.
///
/// A JSON copy of the record (with the event type rendered through its
/// `as_str`) is built up front so that, should the database write fail,
/// `write_with_fallback` can spool it instead.
pub fn record_iroh_event(record: IrohEventRecord<'_>) {
    let type_label = record.event_type.as_str();
    let fallback = serde_json::json!({
        "ts_utc": record.ts_utc.to_rfc3339(),
        "event_type": type_label,
        "peer_id_hash": record.peer_id_hash,
        "peer_label": record.peer_label,
        "detail_json": record.detail_json,
    });
    write_with_fallback("iroh_events", fallback, |db| {
        db.record_iroh_event(record)
    });
}
/// Convenience wrapper around `record_source_error` for a single occurrence:
/// stamps the record with the current UTC time and a `count` of 1.
pub fn record_source_err(source: &str, error_class: SourceErrorClass, detail: Option<&str>) {
    let occurrence = SourceErrorRecord {
        ts_utc: Utc::now(),
        source,
        error_class,
        count: 1,
        detail_json: detail,
    };
    record_source_error(occurrence);
}
/// Convenience wrapper around `record_iroh_event`.
///
/// The raw peer id is never stored directly: it is passed through
/// `netsky_db::hash_peer_id` and only the resulting hash goes into the
/// record, which is stamped with the current UTC time.
pub fn record_iroh(
    event_type: IrohEventType,
    raw_peer_id: &str,
    peer_label: Option<&str>,
    detail: Option<&str>,
) {
    let peer_id_hash = netsky_db::hash_peer_id(raw_peer_id);
    let event = IrohEventRecord {
        ts_utc: Utc::now(),
        event_type,
        peer_id_hash: &peer_id_hash,
        peer_label,
        detail_json: detail,
    };
    record_iroh_event(event);
}
/// Convenience wrapper around `record_watchdog_event`.
///
/// Serializes `detail` to JSON text and stamps the record with the current
/// UTC time before handing it on.
///
/// No `too_many_arguments` suppression is needed here: the function takes
/// five parameters, below the lint's default threshold.
pub fn record_watchdog(
    event: &str,
    agent: Option<&str>,
    severity: Option<&str>,
    status: Option<&str>,
    detail: serde_json::Value,
) {
    let detail_json = detail.to_string();
    record_watchdog_event(WatchdogEventRecord {
        ts_utc: Utc::now(),
        event,
        agent,
        severity,
        status,
        detail_json: Some(&detail_json),
    });
}
/// Convenience wrapper around `record_owner_directive`.
///
/// Serializes `detail` to JSON text and stamps the record with the current
/// UTC time before handing it on.
#[allow(clippy::too_many_arguments)]
pub fn record_directive(
    source: &str,
    chat_id: Option<&str>,
    raw_text: &str,
    resolved_action: Option<&str>,
    agent: Option<&str>,
    status: Option<&str>,
    detail: serde_json::Value,
) {
    let detail_text = detail.to_string();
    let directive = OwnerDirectiveRecord {
        ts_utc: Utc::now(),
        source,
        chat_id,
        raw_text,
        resolved_action,
        agent,
        status,
        detail_json: Some(detail_text.as_str()),
    };
    record_owner_directive(directive);
}
/// Convenience wrapper around `record_mcp_tool_call`.
///
/// `started` becomes the start timestamp and the current UTC time becomes
/// the end timestamp. `duration` is converted to whole milliseconds; a value
/// that does not fit the record's integer type is stored as `None`.
#[allow(clippy::too_many_arguments)]
pub fn record_mcp_call(
    source: &str,
    tool: &str,
    agent: Option<&str>,
    started: DateTime<Utc>,
    duration: Duration,
    success: bool,
    error: Option<&str>,
    timeout_race: bool,
    request_json: Option<&str>,
    response_json: Option<&str>,
) {
    let elapsed_ms = duration.as_millis().try_into().ok();
    let call = McpToolCallRecord {
        ts_utc_start: started,
        ts_utc_end: Some(Utc::now()),
        source,
        tool,
        agent,
        duration_ms: elapsed_ms,
        success,
        error,
        timeout_race,
        request_json,
        response_json,
    };
    record_mcp_tool_call(call);
}
/// Runs `f` against this thread's `Db` handle.
///
/// On first use the handle is opened and migrated, then cached in the `DB`
/// thread-local. If opening or migrating fails the error is returned and
/// nothing is cached, so the next call tries again.
fn with_db<F, T>(f: F) -> netsky_db::Result<T>
where
    F: FnOnce(&Db) -> netsky_db::Result<T>,
{
    DB.with(|slot| {
        // Finish the read-only check before taking the mutable borrow below.
        let needs_init = slot.borrow().is_none();
        if needs_init {
            let opened = Db::open()?;
            opened.migrate()?;
            *slot.borrow_mut() = Some(opened);
        }
        let guard = slot.borrow();
        match guard.as_ref() {
            Some(db) => f(db),
            None => unreachable!("db was initialized above"),
        }
    })
}
fn write_with_fallback<T, F>(table: &str, record: serde_json::Value, f: F)
where
F: FnOnce(&Db) -> netsky_db::Result<T>,
{
if let Err(error) = with_db(f)
&& let Err(spool_error) = spool_meta_db_error(table, &error, record)
{
eprintln!("meta-db fallback failed for {table}: {spool_error}");
}
}
/// Splits `argv` into the program name and a JSON array of the remaining
/// arguments.
///
/// Non-UTF-8 arguments are converted lossily. Returns `None` when `argv` is
/// empty or the arguments cannot be serialized.
fn cli_args(argv: &[OsString]) -> Option<(String, String)> {
    let (program, rest) = argv.split_first()?;
    let bin = program.to_string_lossy().into_owned();
    let mut args: Vec<String> = Vec::with_capacity(rest.len());
    for arg in rest {
        args.push(arg.to_string_lossy().into_owned());
    }
    serde_json::to_string(&args)
        .ok()
        .map(|argv_json| (bin, argv_json))
}
/// Best-effort host name used to attribute recorded rows.
///
/// Resolution order:
/// 1. the `HOSTNAME` environment variable,
/// 2. the `COMPUTERNAME` environment variable,
/// 3. the contents of `/proc/sys/kernel/hostname`, then `/etc/hostname`,
/// 4. the literal `"unknown"`.
///
/// Candidates that are unset, unreadable, not valid Unicode, or blank are
/// skipped. Environment values are returned as-is; file contents are trimmed
/// of surrounding whitespace.
fn host() -> String {
    // A blank variable carries no information, so fall through instead of
    // reporting an empty host.
    let from_env = ["HOSTNAME", "COMPUTERNAME"]
        .iter()
        .filter_map(|key| std::env::var(key).ok())
        .find(|value| !value.trim().is_empty());
    if let Some(name) = from_env {
        return name;
    }
    // `HOSTNAME` is commonly a shell variable that is not exported to child
    // processes, so also consult the usual Linux locations before giving up.
    ["/proc/sys/kernel/hostname", "/etc/hostname"]
        .iter()
        .filter_map(|path| fs::read_to_string(path).ok())
        .map(|raw| raw.trim().to_owned())
        .find(|name| !name.is_empty())
        .unwrap_or_else(|| "unknown".to_string())
}
/// Lowercase text label for a `netsky_db::Direction`, as written into the
/// fallback JSON record.
fn direction_label(direction: netsky_db::Direction) -> &'static str {
    use netsky_db::Direction::{Inbound, Outbound};

    match direction {
        Inbound => "inbound",
        Outbound => "outbound",
    }
}
fn session_event_label(event: SessionEvent) -> &'static str {
match event {
SessionEvent::Up => "up",
SessionEvent::Down => "down",
SessionEvent::Note => "note",
}
}
/// Appends a failed meta-db write to the daily error spool.
///
/// The entry (timestamp, table name, database error text and the original
/// record) is appended as one JSON line to
/// `~/.netsky/logs/meta-db-errors-YYYY-MM-DD.jsonl`, creating the directory
/// if needed. If the append itself fails, `write_spool_failure_marker` is
/// invoked with the entry and both errors.
///
/// # Errors
///
/// Returns an error when the home directory cannot be determined, the log
/// directory cannot be created, or the failure marker cannot be written.
fn spool_meta_db_error(
    table: &str,
    error: &netsky_db::Error,
    record: serde_json::Value,
) -> std::io::Result<()> {
    let home = dirs::home_dir().ok_or_else(|| {
        std::io::Error::new(std::io::ErrorKind::NotFound, "home directory not found")
    })?;
    let dir = home.join(".netsky").join("logs");
    fs::create_dir_all(&dir)?;
    // Read the clock once so the entry's `ts_utc` always agrees with the
    // date in the file name, even when the call straddles midnight UTC.
    let now = Utc::now();
    let value = serde_json::json!({
        "ts_utc": now.to_rfc3339(),
        "table": table,
        "error": error.to_string(),
        "record": record,
    });
    let path = dir.join(format!("meta-db-errors-{}.jsonl", now.format("%Y-%m-%d")));
    if let Err(spool_error) = netsky_core::jsonl::append_json_line(&path, &value) {
        write_spool_failure_marker(table, error, &spool_error, &value)?;
    }
    Ok(())
}
/// Writes a marker file noting that spooling a failed meta-db write also
/// failed.
///
/// The marker lives at `~/.netsky/state/meta-db-spool-failed` and holds a
/// pretty-printed JSON object with the current time, the table name, both
/// error messages and the record. Each call overwrites the previous marker.
///
/// # Errors
///
/// Returns an error when the home directory cannot be determined, the state
/// directory cannot be created, serialization fails, or the file cannot be
/// written.
fn write_spool_failure_marker(
    table: &str,
    db_error: &netsky_db::Error,
    spool_error: &std::io::Error,
    record: &serde_json::Value,
) -> std::io::Result<()> {
    let Some(home) = dirs::home_dir() else {
        return Err(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "home directory not found",
        ));
    };
    let state_dir = home.join(".netsky").join("state");
    fs::create_dir_all(&state_dir)?;
    let marker = serde_json::json!({
        "ts_utc": Utc::now().to_rfc3339(),
        "table": table,
        "db_error": db_error.to_string(),
        "spool_error": spool_error.to_string(),
        "record": record,
    });
    let bytes = serde_json::to_vec_pretty(&marker)?;
    fs::write(state_dir.join("meta-db-spool-failed"), bytes)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `cli_args` returns the program name separately and JSON-encodes only
    /// the arguments that follow it.
    #[test]
    fn cli_args_serializes_tail() {
        let argv: Vec<OsString> = ["netsky", "up", "2"]
            .iter()
            .map(|part| OsString::from(*part))
            .collect();
        let parsed = cli_args(&argv).expect("cli args");
        let expected = ("netsky".to_string(), r#"["up","2"]"#.to_string());
        assert_eq!(parsed, expected);
    }
}