use std::fs;
use std::time::Duration;
use chrono::{DateTime, SecondsFormat, Utc};
use rusqlite::{Connection, OptionalExtension, params};
use serde::{Deserialize, Serialize};
use thiserror::Error;
/// Result alias used throughout this module, with the error type fixed to [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Schema version this build supports.
///
/// [`Db::migrate`] writes this value into the `schema_version` table and
/// refuses to touch a database whose stored version is higher.
pub const SCHEMA_VERSION: i64 = 1;
/// Errors produced by the database layer.
#[derive(Debug, Error)]
pub enum Error {
/// `dirs::home_dir()` returned nothing, so [`Db::open`] cannot locate the
/// database directory.
#[error("home directory not found")]
HomeDirMissing,
/// The database's stored schema version (`found`) is greater than
/// [`SCHEMA_VERSION`] (`supported`); returned by [`Db::migrate`].
#[error("schema version {found} is newer than supported {supported}")]
FutureSchemaVersion { found: i64, supported: i64 },
/// Any error reported by `rusqlite`, converted automatically via `From`.
#[error(transparent)]
Sqlite(#[from] rusqlite::Error),
/// A filesystem error, e.g. from creating the database directory in
/// [`Db::open`]; converted automatically via `From`.
#[error(transparent)]
Io(#[from] std::io::Error),
}
/// Direction of a message; stored in the `messages.direction` column as the
/// lowercase string produced by `as_str`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Direction {
/// Stored as `"inbound"`.
Inbound,
/// Stored as `"outbound"`.
Outbound,
}
impl Direction {
fn as_str(self) -> &'static str {
match self {
Direction::Inbound => "inbound",
Direction::Outbound => "outbound",
}
}
}
/// Kind of session event; stored in the `sessions.event` column as the
/// lowercase string produced by `as_str`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SessionEvent {
/// Stored as `"up"`.
Up,
/// Stored as `"down"`.
Down,
/// Stored as `"note"`.
Note,
}
impl SessionEvent {
fn as_str(self) -> &'static str {
match self {
SessionEvent::Up => "up",
SessionEvent::Down => "down",
SessionEvent::Note => "note",
}
}
}
/// Handle to the metadata database, wrapping a single SQLite connection.
///
/// Opening the handle does not create any tables; call [`Db::migrate`]
/// before using the `record_*` methods.
pub struct Db {
// The one connection every method on this handle goes through.
conn: Connection,
}
/// Borrowed description of one row for the `messages` table, consumed by
/// [`Db::record_message`].
pub struct MessageRecord<'a> {
/// Message timestamp; written as an RFC 3339 UTC string with millisecond precision.
pub ts_utc: DateTime<Utc>,
/// Value for the `source` column (required).
pub source: &'a str,
/// Message direction; written as `"inbound"` or `"outbound"`.
pub direction: Direction,
/// Value for the nullable `chat_id` column.
pub chat_id: Option<&'a str>,
/// Value for the nullable `from_agent` column.
pub from_agent: Option<&'a str>,
/// Value for the nullable `to_agent` column.
pub to_agent: Option<&'a str>,
/// Value for the nullable `body` column.
pub body: Option<&'a str>,
/// Value for the nullable `raw_json` column; stored as given, not parsed here.
pub raw_json: Option<&'a str>,
}
impl Db {
pub fn open() -> Result<Self> {
let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
let dir = home.join(".netsky");
fs::create_dir_all(&dir)?;
let path = dir.join("meta.db");
let conn = Connection::open(path)?;
conn.busy_timeout(Duration::from_secs(5))?;
Ok(Self { conn })
}
/// Opens a private in-memory database with the same five-second busy
/// timeout as [`Db::open`]. Test-only.
#[cfg(test)]
pub(crate) fn open_in_memory() -> Result<Self> {
    let db = Self {
        conn: Connection::open_in_memory()?,
    };
    db.conn.busy_timeout(Duration::from_secs(5))?;
    Ok(db)
}
pub fn migrate(&self) -> Result<()> {
self.conn.execute_batch(SCHEMA_VERSION_SQL)?;
let current = self.schema_version()?;
if current > SCHEMA_VERSION {
return Err(Error::FutureSchemaVersion {
found: current,
supported: SCHEMA_VERSION,
});
}
self.conn.execute_batch(MIGRATE_V1_SQL)?;
self.conn.execute(
"UPDATE schema_version SET version = ?1 WHERE id = 1",
params![SCHEMA_VERSION],
)?;
Ok(())
}
pub fn record_message(&self, record: MessageRecord<'_>) -> Result<i64> {
self.conn.execute(
"INSERT INTO messages \
(ts_utc, source, direction, chat_id, from_agent, to_agent, body, raw_json) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
params![
ts(record.ts_utc),
record.source,
record.direction.as_str(),
record.chat_id,
record.from_agent,
record.to_agent,
record.body,
record.raw_json,
],
)?;
Ok(self.conn.last_insert_rowid())
}
pub fn record_cli(
&self,
ts_utc: DateTime<Utc>,
bin: &str,
argv_json: &str,
exit_code: Option<i64>,
duration_ms: Option<i64>,
host: &str,
) -> Result<i64> {
self.conn.execute(
"INSERT INTO cli_invocations \
(ts_utc, bin, argv_json, exit_code, duration_ms, host) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![ts(ts_utc), bin, argv_json, exit_code, duration_ms, host],
)?;
Ok(self.conn.last_insert_rowid())
}
pub fn record_crash(
&self,
ts_utc: DateTime<Utc>,
kind: &str,
agent: &str,
detail_json: &str,
) -> Result<i64> {
self.conn.execute(
"INSERT INTO crashes (ts_utc, kind, agent, detail_json) VALUES (?1, ?2, ?3, ?4)",
params![ts(ts_utc), kind, agent, detail_json],
)?;
Ok(self.conn.last_insert_rowid())
}
pub fn record_tick(
&self,
ts_utc: DateTime<Utc>,
source: &str,
detail_json: &str,
) -> Result<i64> {
self.conn.execute(
"INSERT INTO ticks (ts_utc, source, detail_json) VALUES (?1, ?2, ?3)",
params![ts(ts_utc), source, detail_json],
)?;
Ok(self.conn.last_insert_rowid())
}
pub fn record_workspace(
&self,
ts_utc_created: DateTime<Utc>,
name: &str,
branch: &str,
ts_utc_deleted: Option<DateTime<Utc>>,
verdict: Option<&str>,
) -> Result<i64> {
self.conn.execute(
"INSERT INTO workspaces \
(ts_utc_created, name, branch, ts_utc_deleted, verdict) \
VALUES (?1, ?2, ?3, ?4, ?5)",
params![
ts(ts_utc_created),
name,
branch,
ts_utc_deleted.map(ts),
verdict,
],
)?;
Ok(self.conn.last_insert_rowid())
}
pub fn record_session(
&self,
ts_utc: DateTime<Utc>,
agent: &str,
session_num: i64,
event: SessionEvent,
) -> Result<i64> {
self.conn.execute(
"INSERT INTO sessions (ts_utc, agent, session_num, event) VALUES (?1, ?2, ?3, ?4)",
params![ts(ts_utc), agent, session_num, event.as_str()],
)?;
Ok(self.conn.last_insert_rowid())
}
/// Reads the stored schema version from the `schema_version` row with `id = 1`.
///
/// Returns `0` when that row does not exist.
///
/// # Errors
///
/// Returns [`Error::Sqlite`] for any query failure other than "no rows".
fn schema_version(&self) -> Result<i64> {
    let stored: Option<i64> = self
        .conn
        .query_row(
            "SELECT version FROM schema_version WHERE id = 1",
            [],
            |row| row.get(0),
        )
        .optional()?;
    Ok(stored.unwrap_or(0))
}
}
/// Formats a UTC timestamp as RFC 3339 with millisecond precision and a `Z`
/// suffix, e.g. `2026-04-15T23:00:00.000Z`.
fn ts(ts_utc: DateTime<Utc>) -> String {
    let precision = SecondsFormat::Millis;
    let use_z_suffix = true;
    ts_utc.to_rfc3339_opts(precision, use_z_suffix)
}
/// Bootstraps the `schema_version` bookkeeping table.
///
/// The `CHECK (id = 1)` constraint limits the table to a single row, and
/// `INSERT OR IGNORE` seeds that row with version 0 only when it is absent,
/// so running this batch repeatedly never overwrites a stored version.
const SCHEMA_VERSION_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS schema_version (
id INTEGER PRIMARY KEY CHECK (id = 1),
version INTEGER NOT NULL
);
INSERT OR IGNORE INTO schema_version (id, version) VALUES (1, 0);
"#;
/// Version 1 schema: the `messages`, `cli_invocations`, `crashes`, `ticks`,
/// `workspaces` and `sessions` tables plus their timestamp-based indexes.
///
/// Every statement uses `IF NOT EXISTS`, so re-running the batch against a
/// database that already has these objects changes nothing. Timestamps are
/// `TEXT` columns; the `record_*` methods fill them with RFC 3339 strings.
const MIGRATE_V1_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY,
ts_utc TEXT NOT NULL,
source TEXT NOT NULL,
direction TEXT NOT NULL,
chat_id TEXT,
from_agent TEXT,
to_agent TEXT,
body TEXT,
raw_json TEXT
);
CREATE INDEX IF NOT EXISTS idx_messages_ts_utc ON messages(ts_utc);
CREATE INDEX IF NOT EXISTS idx_messages_source_ts_utc ON messages(source, ts_utc);
CREATE TABLE IF NOT EXISTS cli_invocations (
id INTEGER PRIMARY KEY,
ts_utc TEXT NOT NULL,
bin TEXT NOT NULL,
argv_json TEXT NOT NULL,
exit_code INTEGER,
duration_ms INTEGER,
host TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_cli_invocations_ts_utc ON cli_invocations(ts_utc);
CREATE TABLE IF NOT EXISTS crashes (
id INTEGER PRIMARY KEY,
ts_utc TEXT NOT NULL,
kind TEXT NOT NULL,
agent TEXT NOT NULL,
detail_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_crashes_ts_utc ON crashes(ts_utc);
CREATE INDEX IF NOT EXISTS idx_crashes_agent_ts_utc ON crashes(agent, ts_utc);
CREATE TABLE IF NOT EXISTS ticks (
id INTEGER PRIMARY KEY,
ts_utc TEXT NOT NULL,
source TEXT NOT NULL,
detail_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_ticks_ts_utc ON ticks(ts_utc);
CREATE TABLE IF NOT EXISTS workspaces (
id INTEGER PRIMARY KEY,
ts_utc_created TEXT NOT NULL,
name TEXT NOT NULL,
branch TEXT NOT NULL,
ts_utc_deleted TEXT,
verdict TEXT
);
CREATE INDEX IF NOT EXISTS idx_workspaces_ts_utc_created ON workspaces(ts_utc_created);
CREATE INDEX IF NOT EXISTS idx_workspaces_ts_utc_deleted ON workspaces(ts_utc_deleted);
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY,
ts_utc TEXT NOT NULL,
agent TEXT NOT NULL,
session_num INTEGER NOT NULL,
event TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_sessions_ts_utc ON sessions(ts_utc);
CREATE INDEX IF NOT EXISTS idx_sessions_agent_ts_utc ON sessions(agent, ts_utc);
"#;
#[cfg(test)]
mod tests {
use super::*;
/// Builds a fresh in-memory database with the schema already migrated.
fn db() -> Result<Db> {
    Db::open_in_memory().and_then(|fresh| fresh.migrate().map(|()| fresh))
}
/// Writes a message and reads every column back, including the ones left as
/// `None`, so a swapped or dropped parameter binding in `record_message`
/// cannot go unnoticed.
#[test]
fn message_round_trip() -> Result<()> {
    // (id, ts_utc, source, direction, chat_id, from_agent, to_agent, body, raw_json)
    type MessageRow = (
        i64,
        String,
        String,
        String,
        Option<String>,
        Option<String>,
        Option<String>,
        Option<String>,
        Option<String>,
    );

    let db = db()?;
    let when = chrono::DateTime::parse_from_rfc3339("2026-04-15T23:00:00Z")
        .unwrap()
        .with_timezone(&Utc);
    let id = db.record_message(MessageRecord {
        ts_utc: when,
        source: "imessage",
        direction: Direction::Inbound,
        chat_id: Some("chat-1"),
        from_agent: Some("agent9"),
        to_agent: None,
        body: Some("hello"),
        raw_json: Some("{\"chat_id\":\"chat-1\"}"),
    })?;

    let row: MessageRow = db.conn.query_row(
        "SELECT id, ts_utc, source, direction, chat_id, from_agent, to_agent, body, raw_json \
         FROM messages WHERE id = ?1",
        params![id],
        |row| {
            Ok((
                row.get(0)?,
                row.get(1)?,
                row.get(2)?,
                row.get(3)?,
                row.get(4)?,
                row.get(5)?,
                row.get(6)?,
                row.get(7)?,
                row.get(8)?,
            ))
        },
    )?;

    assert_eq!(row.0, id);
    assert_eq!(row.1, "2026-04-15T23:00:00.000Z");
    assert_eq!(row.2, "imessage");
    assert_eq!(row.3, "inbound");
    assert_eq!(row.4.as_deref(), Some("chat-1"));
    assert_eq!(row.5.as_deref(), Some("agent9"));
    // `None` must come back as NULL, not as an empty string.
    assert_eq!(row.6, None);
    assert_eq!(row.7.as_deref(), Some("hello"));
    assert_eq!(row.8.as_deref(), Some("{\"chat_id\":\"chat-1\"}"));
    Ok(())
}
/// A CLI invocation written with `record_cli` can be read back unchanged.
#[test]
fn cli_round_trip() -> Result<()> {
    let db = db()?;
    let id = db.record_cli(
        Utc::now(),
        "netsky",
        "[\"db\", \"migrate\"]",
        Some(0),
        Some(12),
        "host-a",
    )?;

    let (stamp, bin, exit_code, duration_ms, host): (String, String, i64, i64, String) =
        db.conn.query_row(
            "SELECT ts_utc, bin, exit_code, duration_ms, host FROM cli_invocations WHERE id = ?1",
            params![id],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?, r.get(4)?)),
        )?;

    assert_eq!(bin, "netsky");
    assert_eq!(exit_code, 0);
    assert_eq!(duration_ms, 12);
    assert_eq!(host, "host-a");
    // Only the shape of the timestamp is checked because it comes from the clock.
    assert!(stamp.contains('T'));
    Ok(())
}
/// A crash written with `record_crash` can be read back unchanged.
#[test]
fn crash_round_trip() -> Result<()> {
    let db = db()?;
    let id = db.record_crash(Utc::now(), "panic", "agent8", "{\"reason\":\"wedged\"}")?;

    let (stamp, kind, agent, detail): (String, String, String, String) = db.conn.query_row(
        "SELECT ts_utc, kind, agent, detail_json FROM crashes WHERE id = ?1",
        params![id],
        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
    )?;

    assert_eq!(kind, "panic");
    assert_eq!(agent, "agent8");
    assert_eq!(detail, "{\"reason\":\"wedged\"}");
    // Only the shape of the timestamp is checked because it comes from the clock.
    assert!(stamp.contains('T'));
    Ok(())
}
/// A tick written with `record_tick` can be read back unchanged.
#[test]
fn tick_round_trip() -> Result<()> {
    let db = db()?;
    let id = db.record_tick(Utc::now(), "ticker", "{\"beat\":1}")?;

    let (stamp, source, detail): (String, String, String) = db.conn.query_row(
        "SELECT ts_utc, source, detail_json FROM ticks WHERE id = ?1",
        params![id],
        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
    )?;

    assert_eq!(source, "ticker");
    assert_eq!(detail, "{\"beat\":1}");
    // Only the shape of the timestamp is checked because it comes from the clock.
    assert!(stamp.contains('T'));
    Ok(())
}
/// A workspace written with `record_workspace` can be read back, including
/// its optional deletion timestamp and verdict.
#[test]
fn workspace_round_trip() -> Result<()> {
    let db = db()?;
    let created_at = Utc::now();
    let deleted_at = created_at + chrono::Duration::minutes(5);
    let id = db.record_workspace(
        created_at,
        "session8",
        "feat/netsky-db-v0",
        Some(deleted_at),
        Some("kept"),
    )?;

    let (created, name, branch, deleted, verdict): (
        String,
        String,
        String,
        Option<String>,
        Option<String>,
    ) = db.conn.query_row(
        "SELECT ts_utc_created, name, branch, ts_utc_deleted, verdict FROM workspaces WHERE id = ?1",
        params![id],
        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?, r.get(4)?)),
    )?;

    assert_eq!(name, "session8");
    assert_eq!(branch, "feat/netsky-db-v0");
    assert_eq!(verdict.as_deref(), Some("kept"));
    assert!(created.contains('T'));
    // A missing deletion timestamp fails here, just like a malformed one.
    assert!(matches!(deleted.as_deref(), Some(stamp) if stamp.contains('T')));
    Ok(())
}
/// A session event written with `record_session` can be read back, with the
/// event stored as its lowercase label.
#[test]
fn session_round_trip() -> Result<()> {
    let db = db()?;
    let id = db.record_session(Utc::now(), "agent8", 8, SessionEvent::Note)?;

    let (stamp, agent, session_num, event): (String, String, i64, String) = db.conn.query_row(
        "SELECT ts_utc, agent, session_num, event FROM sessions WHERE id = ?1",
        params![id],
        |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
    )?;

    assert_eq!(agent, "agent8");
    assert_eq!(session_num, 8);
    assert_eq!(event, "note");
    // Only the shape of the timestamp is checked because it comes from the clock.
    assert!(stamp.contains('T'));
    Ok(())
}
}