Skip to main content

netsky_db/
lib.rs

1use std::fs;
2use std::time::Duration;
3
4use chrono::{DateTime, SecondsFormat, Utc};
5use rusqlite::{Connection, OptionalExtension, params};
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
/// Crate-wide result alias whose error type is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
10
/// Schema version supported by this build.
///
/// [`Db::migrate`] writes this value into the `schema_version` table and
/// rejects databases that report a higher version.
pub const SCHEMA_VERSION: i64 = 1;
12
/// Errors returned by this crate's database operations.
#[derive(Debug, Error)]
pub enum Error {
    /// No home directory could be determined, so the database path cannot be
    /// derived (see [`Db::open`]).
    #[error("home directory not found")]
    HomeDirMissing,
    /// The database reports schema version `found`, which is greater than the
    /// `supported` version ([`SCHEMA_VERSION`]) of this build.
    #[error("schema version {found} is newer than supported {supported}")]
    FutureSchemaVersion { found: i64, supported: i64 },
    /// An error from `rusqlite`, forwarded with its own message and source.
    #[error(transparent)]
    Sqlite(#[from] rusqlite::Error),
    /// A standard-library I/O error, forwarded with its own message and source.
    #[error(transparent)]
    Io(#[from] std::io::Error),
}
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
26pub enum Direction {
27    Inbound,
28    Outbound,
29}
30
31impl Direction {
32    fn as_str(self) -> &'static str {
33        match self {
34            Direction::Inbound => "inbound",
35            Direction::Outbound => "outbound",
36        }
37    }
38}
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
41pub enum SessionEvent {
42    Up,
43    Down,
44    Note,
45}
46
47impl SessionEvent {
48    fn as_str(self) -> &'static str {
49        match self {
50            SessionEvent::Up => "up",
51            SessionEvent::Down => "down",
52            SessionEvent::Note => "note",
53        }
54    }
55}
56
57pub struct Db {
58    conn: Connection,
59}
60
61pub struct MessageRecord<'a> {
62    pub ts_utc: DateTime<Utc>,
63    pub source: &'a str,
64    pub direction: Direction,
65    pub chat_id: Option<&'a str>,
66    pub from_agent: Option<&'a str>,
67    pub to_agent: Option<&'a str>,
68    pub body: Option<&'a str>,
69    pub raw_json: Option<&'a str>,
70}
71
72impl Db {
73    pub fn open() -> Result<Self> {
74        let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
75        let dir = home.join(".netsky");
76        fs::create_dir_all(&dir)?;
77        let path = dir.join("meta.db");
78        let conn = Connection::open(path)?;
79        conn.busy_timeout(Duration::from_secs(5))?;
80        Ok(Self { conn })
81    }
82
83    #[cfg(test)]
84    pub(crate) fn open_in_memory() -> Result<Self> {
85        let conn = Connection::open_in_memory()?;
86        conn.busy_timeout(Duration::from_secs(5))?;
87        Ok(Self { conn })
88    }
89
90    pub fn migrate(&self) -> Result<()> {
91        self.conn.execute_batch(SCHEMA_VERSION_SQL)?;
92        let current = self.schema_version()?;
93        if current > SCHEMA_VERSION {
94            return Err(Error::FutureSchemaVersion {
95                found: current,
96                supported: SCHEMA_VERSION,
97            });
98        }
99
100        self.conn.execute_batch(MIGRATE_V1_SQL)?;
101        self.conn.execute(
102            "UPDATE schema_version SET version = ?1 WHERE id = 1",
103            params![SCHEMA_VERSION],
104        )?;
105        Ok(())
106    }
107
108    pub fn record_message(&self, record: MessageRecord<'_>) -> Result<i64> {
109        self.conn.execute(
110            "INSERT INTO messages \
111             (ts_utc, source, direction, chat_id, from_agent, to_agent, body, raw_json) \
112             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
113            params![
114                ts(record.ts_utc),
115                record.source,
116                record.direction.as_str(),
117                record.chat_id,
118                record.from_agent,
119                record.to_agent,
120                record.body,
121                record.raw_json,
122            ],
123        )?;
124        Ok(self.conn.last_insert_rowid())
125    }
126
127    pub fn record_cli(
128        &self,
129        ts_utc: DateTime<Utc>,
130        bin: &str,
131        argv_json: &str,
132        exit_code: Option<i64>,
133        duration_ms: Option<i64>,
134        host: &str,
135    ) -> Result<i64> {
136        self.conn.execute(
137            "INSERT INTO cli_invocations \
138             (ts_utc, bin, argv_json, exit_code, duration_ms, host) \
139             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
140            params![ts(ts_utc), bin, argv_json, exit_code, duration_ms, host],
141        )?;
142        Ok(self.conn.last_insert_rowid())
143    }
144
145    pub fn record_crash(
146        &self,
147        ts_utc: DateTime<Utc>,
148        kind: &str,
149        agent: &str,
150        detail_json: &str,
151    ) -> Result<i64> {
152        self.conn.execute(
153            "INSERT INTO crashes (ts_utc, kind, agent, detail_json) VALUES (?1, ?2, ?3, ?4)",
154            params![ts(ts_utc), kind, agent, detail_json],
155        )?;
156        Ok(self.conn.last_insert_rowid())
157    }
158
159    pub fn record_tick(
160        &self,
161        ts_utc: DateTime<Utc>,
162        source: &str,
163        detail_json: &str,
164    ) -> Result<i64> {
165        self.conn.execute(
166            "INSERT INTO ticks (ts_utc, source, detail_json) VALUES (?1, ?2, ?3)",
167            params![ts(ts_utc), source, detail_json],
168        )?;
169        Ok(self.conn.last_insert_rowid())
170    }
171
172    pub fn record_workspace(
173        &self,
174        ts_utc_created: DateTime<Utc>,
175        name: &str,
176        branch: &str,
177        ts_utc_deleted: Option<DateTime<Utc>>,
178        verdict: Option<&str>,
179    ) -> Result<i64> {
180        self.conn.execute(
181            "INSERT INTO workspaces \
182             (ts_utc_created, name, branch, ts_utc_deleted, verdict) \
183             VALUES (?1, ?2, ?3, ?4, ?5)",
184            params![
185                ts(ts_utc_created),
186                name,
187                branch,
188                ts_utc_deleted.map(ts),
189                verdict,
190            ],
191        )?;
192        Ok(self.conn.last_insert_rowid())
193    }
194
195    pub fn record_session(
196        &self,
197        ts_utc: DateTime<Utc>,
198        agent: &str,
199        session_num: i64,
200        event: SessionEvent,
201    ) -> Result<i64> {
202        self.conn.execute(
203            "INSERT INTO sessions (ts_utc, agent, session_num, event) VALUES (?1, ?2, ?3, ?4)",
204            params![ts(ts_utc), agent, session_num, event.as_str()],
205        )?;
206        Ok(self.conn.last_insert_rowid())
207    }
208
209    fn schema_version(&self) -> Result<i64> {
210        Ok(self
211            .conn
212            .query_row(
213                "SELECT version FROM schema_version WHERE id = 1",
214                [],
215                |row| row.get(0),
216            )
217            .optional()?
218            .unwrap_or(0))
219    }
220}
221
222fn ts(ts_utc: DateTime<Utc>) -> String {
223    ts_utc.to_rfc3339_opts(SecondsFormat::Millis, true)
224}
225
/// Bootstrap SQL for version tracking.
///
/// Creates the `schema_version` table, which the `CHECK (id = 1)` constraint
/// limits to a single row, and seeds that row with version 0 if it is absent.
/// Both statements are safe to re-run.
const SCHEMA_VERSION_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS schema_version (
    id INTEGER PRIMARY KEY CHECK (id = 1),
    version INTEGER NOT NULL
);
INSERT OR IGNORE INTO schema_version (id, version) VALUES (1, 0);
"#;
233
/// Version 1 schema.
///
/// Creates the `messages`, `cli_invocations`, `crashes`, `ticks`,
/// `workspaces` and `sessions` tables together with their timestamp indexes.
/// Every statement uses `IF NOT EXISTS`, so the batch can be executed again
/// against a database that already has some or all of these objects.
const MIGRATE_V1_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS messages (
    id INTEGER PRIMARY KEY,
    ts_utc TEXT NOT NULL,
    source TEXT NOT NULL,
    direction TEXT NOT NULL,
    chat_id TEXT,
    from_agent TEXT,
    to_agent TEXT,
    body TEXT,
    raw_json TEXT
);
CREATE INDEX IF NOT EXISTS idx_messages_ts_utc ON messages(ts_utc);
CREATE INDEX IF NOT EXISTS idx_messages_source_ts_utc ON messages(source, ts_utc);

CREATE TABLE IF NOT EXISTS cli_invocations (
    id INTEGER PRIMARY KEY,
    ts_utc TEXT NOT NULL,
    bin TEXT NOT NULL,
    argv_json TEXT NOT NULL,
    exit_code INTEGER,
    duration_ms INTEGER,
    host TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_cli_invocations_ts_utc ON cli_invocations(ts_utc);

CREATE TABLE IF NOT EXISTS crashes (
    id INTEGER PRIMARY KEY,
    ts_utc TEXT NOT NULL,
    kind TEXT NOT NULL,
    agent TEXT NOT NULL,
    detail_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_crashes_ts_utc ON crashes(ts_utc);
CREATE INDEX IF NOT EXISTS idx_crashes_agent_ts_utc ON crashes(agent, ts_utc);

CREATE TABLE IF NOT EXISTS ticks (
    id INTEGER PRIMARY KEY,
    ts_utc TEXT NOT NULL,
    source TEXT NOT NULL,
    detail_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_ticks_ts_utc ON ticks(ts_utc);

CREATE TABLE IF NOT EXISTS workspaces (
    id INTEGER PRIMARY KEY,
    ts_utc_created TEXT NOT NULL,
    name TEXT NOT NULL,
    branch TEXT NOT NULL,
    ts_utc_deleted TEXT,
    verdict TEXT
);
CREATE INDEX IF NOT EXISTS idx_workspaces_ts_utc_created ON workspaces(ts_utc_created);
CREATE INDEX IF NOT EXISTS idx_workspaces_ts_utc_deleted ON workspaces(ts_utc_deleted);

CREATE TABLE IF NOT EXISTS sessions (
    id INTEGER PRIMARY KEY,
    ts_utc TEXT NOT NULL,
    agent TEXT NOT NULL,
    session_num INTEGER NOT NULL,
    event TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_sessions_ts_utc ON sessions(ts_utc);
CREATE INDEX IF NOT EXISTS idx_sessions_agent_ts_utc ON sessions(agent, ts_utc);
"#;
299
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a migrated in-memory database for a single test.
    fn db() -> Result<Db> {
        let db = Db::open_in_memory()?;
        db.migrate()?;
        Ok(db)
    }

    /// Running the migration again on an up-to-date database must succeed and
    /// leave the version stamp unchanged.
    #[test]
    fn migrate_is_idempotent() -> Result<()> {
        let db = db()?;
        assert_eq!(db.schema_version()?, SCHEMA_VERSION);
        db.migrate()?;
        assert_eq!(db.schema_version()?, SCHEMA_VERSION);
        Ok(())
    }

    /// A database stamped with a newer version must be rejected and left as is.
    #[test]
    fn migrate_rejects_future_schema() -> Result<()> {
        let db = db()?;
        let future = SCHEMA_VERSION + 1;
        db.conn.execute(
            "UPDATE schema_version SET version = ?1 WHERE id = 1",
            params![future],
        )?;
        match db.migrate() {
            Err(Error::FutureSchemaVersion { found, supported }) => {
                assert_eq!(found, future);
                assert_eq!(supported, SCHEMA_VERSION);
            }
            other => panic!("expected FutureSchemaVersion, got {other:?}"),
        }
        assert_eq!(db.schema_version()?, future);
        Ok(())
    }

    #[test]
    fn message_round_trip() -> Result<()> {
        let db = db()?;
        let ts = chrono::DateTime::parse_from_rfc3339("2026-04-15T23:00:00Z")
            .expect("literal is valid RFC 3339")
            .with_timezone(&Utc);
        let id = db.record_message(MessageRecord {
            ts_utc: ts,
            source: "imessage",
            direction: Direction::Inbound,
            chat_id: Some("chat-1"),
            from_agent: Some("agent9"),
            to_agent: None,
            body: Some("hello"),
            raw_json: Some("{\"chat_id\":\"chat-1\"}"),
        })?;
        let (row_id, stored_ts, source, direction, chat_id, body): (
            i64,
            String,
            String,
            String,
            String,
            Option<String>,
        ) = db.conn.query_row(
            "SELECT id, ts_utc, source, direction, chat_id, body FROM messages WHERE id = ?1",
            params![id],
            |row| {
                Ok((
                    row.get(0)?,
                    row.get(1)?,
                    row.get(2)?,
                    row.get(3)?,
                    row.get(4)?,
                    row.get(5)?,
                ))
            },
        )?;
        assert_eq!(row_id, id);
        assert_eq!(stored_ts, "2026-04-15T23:00:00.000Z");
        assert_eq!(source, "imessage");
        assert_eq!(direction, "inbound");
        assert_eq!(chat_id, "chat-1");
        assert_eq!(body.as_deref(), Some("hello"));
        Ok(())
    }

    #[test]
    fn cli_round_trip() -> Result<()> {
        let db = db()?;
        let ts = Utc::now();
        let id = db.record_cli(
            ts,
            "netsky",
            "[\"db\", \"migrate\"]",
            Some(0),
            Some(12),
            "host-a",
        )?;
        let (stored_ts, bin, exit_code, duration_ms, host): (String, String, i64, i64, String) =
            db.conn.query_row(
                "SELECT ts_utc, bin, exit_code, duration_ms, host FROM cli_invocations WHERE id = ?1",
                params![id],
                |row| {
                    Ok((
                        row.get(0)?,
                        row.get(1)?,
                        row.get(2)?,
                        row.get(3)?,
                        row.get(4)?,
                    ))
                },
            )?;
        assert_eq!(bin, "netsky");
        assert_eq!(exit_code, 0);
        assert_eq!(duration_ms, 12);
        assert_eq!(host, "host-a");
        assert!(stored_ts.contains('T'));
        Ok(())
    }

    #[test]
    fn crash_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_crash(Utc::now(), "panic", "agent8", "{\"reason\":\"wedged\"}")?;
        let (stored_ts, kind, agent, detail): (String, String, String, String) =
            db.conn.query_row(
                "SELECT ts_utc, kind, agent, detail_json FROM crashes WHERE id = ?1",
                params![id],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
            )?;
        assert_eq!(kind, "panic");
        assert_eq!(agent, "agent8");
        assert_eq!(detail, "{\"reason\":\"wedged\"}");
        assert!(stored_ts.contains('T'));
        Ok(())
    }

    #[test]
    fn tick_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_tick(Utc::now(), "ticker", "{\"beat\":1}")?;
        let (stored_ts, source, detail): (String, String, String) = db.conn.query_row(
            "SELECT ts_utc, source, detail_json FROM ticks WHERE id = ?1",
            params![id],
            |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
        )?;
        assert_eq!(source, "ticker");
        assert_eq!(detail, "{\"beat\":1}");
        assert!(stored_ts.contains('T'));
        Ok(())
    }

    #[test]
    fn workspace_round_trip() -> Result<()> {
        let db = db()?;
        let created = Utc::now();
        let deleted = created + chrono::Duration::minutes(5);
        let id = db.record_workspace(
            created,
            "session8",
            "feat/netsky-db-v0",
            Some(deleted),
            Some("kept"),
        )?;
        let (created_ts, name, branch, deleted_ts, verdict): (
            String,
            String,
            String,
            Option<String>,
            Option<String>,
        ) = db.conn.query_row(
            "SELECT ts_utc_created, name, branch, ts_utc_deleted, verdict FROM workspaces WHERE id = ?1",
            params![id],
            |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)),
        )?;
        assert_eq!(name, "session8");
        assert_eq!(branch, "feat/netsky-db-v0");
        assert_eq!(verdict.as_deref(), Some("kept"));
        assert!(created_ts.contains('T'));
        assert!(deleted_ts.as_deref().unwrap_or("").contains('T'));
        Ok(())
    }

    #[test]
    fn session_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_session(Utc::now(), "agent8", 8, SessionEvent::Note)?;
        let (stored_ts, agent, session_num, event): (String, String, i64, String) =
            db.conn.query_row(
                "SELECT ts_utc, agent, session_num, event FROM sessions WHERE id = ?1",
                params![id],
                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
            )?;
        assert_eq!(agent, "agent8");
        assert_eq!(session_num, 8);
        assert_eq!(event, "note");
        assert!(stored_ts.contains('T'));
        Ok(())
    }
}