1use std::fs;
2use std::time::Duration;
3
4use chrono::{DateTime, SecondsFormat, Utc};
5use rusqlite::{Connection, OptionalExtension, params};
6use serde::{Deserialize, Serialize};
7use thiserror::Error;
8
/// Result alias used throughout this module; the error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
10
/// Schema version that [`Db::migrate`] writes to the `schema_version` table.
///
/// A database reporting a larger value is rejected with
/// [`Error::FutureSchemaVersion`].
pub const SCHEMA_VERSION: i64 = 1;
12
/// Errors returned by the metadata database layer.
#[derive(Debug, Error)]
pub enum Error {
    /// `dirs::home_dir()` returned `None`, so the database path could not be built.
    #[error("home directory not found")]
    HomeDirMissing,
    /// The version stored in the database is greater than [`SCHEMA_VERSION`].
    #[error("schema version {found} is newer than supported {supported}")]
    FutureSchemaVersion { found: i64, supported: i64 },
    /// An error from `rusqlite`, forwarded unchanged (converted via `From`).
    #[error(transparent)]
    Sqlite(#[from] rusqlite::Error),
    /// An I/O error, forwarded unchanged (converted via `From`); `Db::open`
    /// produces it when the database directory cannot be created.
    #[error(transparent)]
    Io(#[from] std::io::Error),
}
24
/// Direction of a recorded message.
///
/// `Db::record_message` writes the value to the `messages.direction` column
/// using the lowercase label returned by `Direction::as_str`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Direction {
    /// Persisted as `"inbound"`.
    Inbound,
    /// Persisted as `"outbound"`.
    Outbound,
}
30
31impl Direction {
32 fn as_str(self) -> &'static str {
33 match self {
34 Direction::Inbound => "inbound",
35 Direction::Outbound => "outbound",
36 }
37 }
38}
39
/// Kind of event recorded for an agent session.
///
/// `Db::record_session` writes the value to the `sessions.event` column using
/// the lowercase label returned by `SessionEvent::as_str`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SessionEvent {
    /// Persisted as `"up"`.
    Up,
    /// Persisted as `"down"`.
    Down,
    /// Persisted as `"note"`.
    Note,
}
46
47impl SessionEvent {
48 fn as_str(self) -> &'static str {
49 match self {
50 SessionEvent::Up => "up",
51 SessionEvent::Down => "down",
52 SessionEvent::Note => "note",
53 }
54 }
55}
56
57pub struct Db {
58 conn: Connection,
59}
60
61pub struct MessageRecord<'a> {
62 pub ts_utc: DateTime<Utc>,
63 pub source: &'a str,
64 pub direction: Direction,
65 pub chat_id: Option<&'a str>,
66 pub from_agent: Option<&'a str>,
67 pub to_agent: Option<&'a str>,
68 pub body: Option<&'a str>,
69 pub raw_json: Option<&'a str>,
70}
71
72impl Db {
73 pub fn open() -> Result<Self> {
74 let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
75 let dir = home.join(".netsky");
76 fs::create_dir_all(&dir)?;
77 let path = dir.join("meta.db");
78 let conn = Connection::open(path)?;
79 conn.busy_timeout(Duration::from_secs(5))?;
80 Ok(Self { conn })
81 }
82
83 #[cfg(test)]
84 pub(crate) fn open_in_memory() -> Result<Self> {
85 let conn = Connection::open_in_memory()?;
86 conn.busy_timeout(Duration::from_secs(5))?;
87 Ok(Self { conn })
88 }
89
90 pub fn migrate(&self) -> Result<()> {
91 self.conn.execute_batch(SCHEMA_VERSION_SQL)?;
92 let current = self.schema_version()?;
93 if current > SCHEMA_VERSION {
94 return Err(Error::FutureSchemaVersion {
95 found: current,
96 supported: SCHEMA_VERSION,
97 });
98 }
99
100 self.conn.execute_batch(MIGRATE_V1_SQL)?;
101 self.conn.execute(
102 "UPDATE schema_version SET version = ?1 WHERE id = 1",
103 params![SCHEMA_VERSION],
104 )?;
105 Ok(())
106 }
107
108 pub fn record_message(&self, record: MessageRecord<'_>) -> Result<i64> {
109 self.conn.execute(
110 "INSERT INTO messages \
111 (ts_utc, source, direction, chat_id, from_agent, to_agent, body, raw_json) \
112 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
113 params![
114 ts(record.ts_utc),
115 record.source,
116 record.direction.as_str(),
117 record.chat_id,
118 record.from_agent,
119 record.to_agent,
120 record.body,
121 record.raw_json,
122 ],
123 )?;
124 Ok(self.conn.last_insert_rowid())
125 }
126
127 pub fn record_cli(
128 &self,
129 ts_utc: DateTime<Utc>,
130 bin: &str,
131 argv_json: &str,
132 exit_code: Option<i64>,
133 duration_ms: Option<i64>,
134 host: &str,
135 ) -> Result<i64> {
136 self.conn.execute(
137 "INSERT INTO cli_invocations \
138 (ts_utc, bin, argv_json, exit_code, duration_ms, host) \
139 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
140 params![ts(ts_utc), bin, argv_json, exit_code, duration_ms, host],
141 )?;
142 Ok(self.conn.last_insert_rowid())
143 }
144
145 pub fn record_crash(
146 &self,
147 ts_utc: DateTime<Utc>,
148 kind: &str,
149 agent: &str,
150 detail_json: &str,
151 ) -> Result<i64> {
152 self.conn.execute(
153 "INSERT INTO crashes (ts_utc, kind, agent, detail_json) VALUES (?1, ?2, ?3, ?4)",
154 params![ts(ts_utc), kind, agent, detail_json],
155 )?;
156 Ok(self.conn.last_insert_rowid())
157 }
158
159 pub fn record_tick(
160 &self,
161 ts_utc: DateTime<Utc>,
162 source: &str,
163 detail_json: &str,
164 ) -> Result<i64> {
165 self.conn.execute(
166 "INSERT INTO ticks (ts_utc, source, detail_json) VALUES (?1, ?2, ?3)",
167 params![ts(ts_utc), source, detail_json],
168 )?;
169 Ok(self.conn.last_insert_rowid())
170 }
171
172 pub fn record_workspace(
173 &self,
174 ts_utc_created: DateTime<Utc>,
175 name: &str,
176 branch: &str,
177 ts_utc_deleted: Option<DateTime<Utc>>,
178 verdict: Option<&str>,
179 ) -> Result<i64> {
180 self.conn.execute(
181 "INSERT INTO workspaces \
182 (ts_utc_created, name, branch, ts_utc_deleted, verdict) \
183 VALUES (?1, ?2, ?3, ?4, ?5)",
184 params![
185 ts(ts_utc_created),
186 name,
187 branch,
188 ts_utc_deleted.map(ts),
189 verdict,
190 ],
191 )?;
192 Ok(self.conn.last_insert_rowid())
193 }
194
195 pub fn record_session(
196 &self,
197 ts_utc: DateTime<Utc>,
198 agent: &str,
199 session_num: i64,
200 event: SessionEvent,
201 ) -> Result<i64> {
202 self.conn.execute(
203 "INSERT INTO sessions (ts_utc, agent, session_num, event) VALUES (?1, ?2, ?3, ?4)",
204 params![ts(ts_utc), agent, session_num, event.as_str()],
205 )?;
206 Ok(self.conn.last_insert_rowid())
207 }
208
209 fn schema_version(&self) -> Result<i64> {
210 Ok(self
211 .conn
212 .query_row(
213 "SELECT version FROM schema_version WHERE id = 1",
214 [],
215 |row| row.get(0),
216 )
217 .optional()?
218 .unwrap_or(0))
219 }
220}
221
222fn ts(ts_utc: DateTime<Utc>) -> String {
223 ts_utc.to_rfc3339_opts(SecondsFormat::Millis, true)
224}
225
/// Bootstrap SQL executed first by `Db::migrate`.
///
/// Creates the `schema_version` table when it is missing (its
/// `CHECK (id = 1)` constraint only admits a row with id 1) and seeds that row
/// with version 0 unless it already exists.
const SCHEMA_VERSION_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS schema_version (
 id INTEGER PRIMARY KEY CHECK (id = 1),
 version INTEGER NOT NULL
);
INSERT OR IGNORE INTO schema_version (id, version) VALUES (1, 0);
"#;
233
/// Version 1 schema, executed by `Db::migrate`.
///
/// Creates the `messages`, `cli_invocations`, `crashes`, `ticks`,
/// `workspaces` and `sessions` tables together with their indexes. Every
/// statement uses `IF NOT EXISTS`, so running the batch against a database
/// that already has these objects leaves them in place.
const MIGRATE_V1_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS messages (
 id INTEGER PRIMARY KEY,
 ts_utc TEXT NOT NULL,
 source TEXT NOT NULL,
 direction TEXT NOT NULL,
 chat_id TEXT,
 from_agent TEXT,
 to_agent TEXT,
 body TEXT,
 raw_json TEXT
);
CREATE INDEX IF NOT EXISTS idx_messages_ts_utc ON messages(ts_utc);
CREATE INDEX IF NOT EXISTS idx_messages_source_ts_utc ON messages(source, ts_utc);

CREATE TABLE IF NOT EXISTS cli_invocations (
 id INTEGER PRIMARY KEY,
 ts_utc TEXT NOT NULL,
 bin TEXT NOT NULL,
 argv_json TEXT NOT NULL,
 exit_code INTEGER,
 duration_ms INTEGER,
 host TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_cli_invocations_ts_utc ON cli_invocations(ts_utc);

CREATE TABLE IF NOT EXISTS crashes (
 id INTEGER PRIMARY KEY,
 ts_utc TEXT NOT NULL,
 kind TEXT NOT NULL,
 agent TEXT NOT NULL,
 detail_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_crashes_ts_utc ON crashes(ts_utc);
CREATE INDEX IF NOT EXISTS idx_crashes_agent_ts_utc ON crashes(agent, ts_utc);

CREATE TABLE IF NOT EXISTS ticks (
 id INTEGER PRIMARY KEY,
 ts_utc TEXT NOT NULL,
 source TEXT NOT NULL,
 detail_json TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_ticks_ts_utc ON ticks(ts_utc);

CREATE TABLE IF NOT EXISTS workspaces (
 id INTEGER PRIMARY KEY,
 ts_utc_created TEXT NOT NULL,
 name TEXT NOT NULL,
 branch TEXT NOT NULL,
 ts_utc_deleted TEXT,
 verdict TEXT
);
CREATE INDEX IF NOT EXISTS idx_workspaces_ts_utc_created ON workspaces(ts_utc_created);
CREATE INDEX IF NOT EXISTS idx_workspaces_ts_utc_deleted ON workspaces(ts_utc_deleted);

CREATE TABLE IF NOT EXISTS sessions (
 id INTEGER PRIMARY KEY,
 ts_utc TEXT NOT NULL,
 agent TEXT NOT NULL,
 session_num INTEGER NOT NULL,
 event TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_sessions_ts_utc ON sessions(ts_utc);
CREATE INDEX IF NOT EXISTS idx_sessions_agent_ts_utc ON sessions(agent, ts_utc);
"#;
299
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a migrated in-memory database for a single test.
    fn db() -> Result<Db> {
        let fresh = Db::open_in_memory()?;
        fresh.migrate()?;
        Ok(fresh)
    }

    /// A message row comes back with the values that were inserted, and the
    /// timestamp is stored with millisecond precision and a `Z` suffix.
    #[test]
    fn message_round_trip() -> Result<()> {
        let db = db()?;
        let when = chrono::DateTime::parse_from_rfc3339("2026-04-15T23:00:00Z")
            .unwrap()
            .with_timezone(&Utc);
        let record = MessageRecord {
            ts_utc: when,
            source: "imessage",
            direction: Direction::Inbound,
            chat_id: Some("chat-1"),
            from_agent: Some("agent9"),
            to_agent: None,
            body: Some("hello"),
            raw_json: Some("{\"chat_id\":\"chat-1\"}"),
        };
        let id = db.record_message(record)?;

        let (row_id, ts_utc, source, direction, chat_id, body): (
            i64,
            String,
            String,
            String,
            String,
            Option<String>,
        ) = db.conn.query_row(
            "SELECT id, ts_utc, source, direction, chat_id, body FROM messages WHERE id = ?1",
            params![id],
            |r| {
                Ok((
                    r.get(0)?,
                    r.get(1)?,
                    r.get(2)?,
                    r.get(3)?,
                    r.get(4)?,
                    r.get(5)?,
                ))
            },
        )?;

        assert_eq!(row_id, id);
        assert_eq!(ts_utc, "2026-04-15T23:00:00.000Z");
        assert_eq!(source, "imessage");
        assert_eq!(direction, "inbound");
        assert_eq!(chat_id, "chat-1");
        assert_eq!(body.as_deref(), Some("hello"));
        Ok(())
    }

    /// A CLI invocation row comes back with the values that were inserted.
    #[test]
    fn cli_round_trip() -> Result<()> {
        let db = db()?;
        let now = Utc::now();
        let id = db.record_cli(
            now,
            "netsky",
            "[\"db\", \"migrate\"]",
            Some(0),
            Some(12),
            "host-a",
        )?;

        let (ts_utc, bin, exit_code, duration_ms, host): (String, String, i64, i64, String) =
            db.conn.query_row(
                "SELECT ts_utc, bin, exit_code, duration_ms, host FROM cli_invocations WHERE id = ?1",
                params![id],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?, r.get(4)?)),
            )?;

        assert_eq!(bin, "netsky");
        assert_eq!(exit_code, 0);
        assert_eq!(duration_ms, 12);
        assert_eq!(host, "host-a");
        assert!(ts_utc.contains('T'));
        Ok(())
    }

    /// A crash row comes back with the values that were inserted.
    #[test]
    fn crash_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_crash(Utc::now(), "panic", "agent8", "{\"reason\":\"wedged\"}")?;

        let (ts_utc, kind, agent, detail_json): (String, String, String, String) =
            db.conn.query_row(
                "SELECT ts_utc, kind, agent, detail_json FROM crashes WHERE id = ?1",
                params![id],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
            )?;

        assert_eq!(kind, "panic");
        assert_eq!(agent, "agent8");
        assert_eq!(detail_json, "{\"reason\":\"wedged\"}");
        assert!(ts_utc.contains('T'));
        Ok(())
    }

    /// A tick row comes back with the values that were inserted.
    #[test]
    fn tick_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_tick(Utc::now(), "ticker", "{\"beat\":1}")?;

        let (ts_utc, source, detail_json): (String, String, String) = db.conn.query_row(
            "SELECT ts_utc, source, detail_json FROM ticks WHERE id = ?1",
            params![id],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
        )?;

        assert_eq!(source, "ticker");
        assert_eq!(detail_json, "{\"beat\":1}");
        assert!(ts_utc.contains('T'));
        Ok(())
    }

    /// A workspace row comes back with the values that were inserted,
    /// including the optional deletion timestamp and verdict.
    #[test]
    fn workspace_round_trip() -> Result<()> {
        let db = db()?;
        let created = Utc::now();
        let deleted = created + chrono::Duration::minutes(5);
        let id = db.record_workspace(
            created,
            "session8",
            "feat/netsky-db-v0",
            Some(deleted),
            Some("kept"),
        )?;

        let (created_at, name, branch, deleted_at, verdict): (
            String,
            String,
            String,
            Option<String>,
            Option<String>,
        ) = db.conn.query_row(
            "SELECT ts_utc_created, name, branch, ts_utc_deleted, verdict FROM workspaces WHERE id = ?1",
            params![id],
            |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?, r.get(4)?)),
        )?;

        assert_eq!(name, "session8");
        assert_eq!(branch, "feat/netsky-db-v0");
        assert_eq!(verdict.as_deref(), Some("kept"));
        assert!(created_at.contains('T'));
        assert!(deleted_at.is_some_and(|stamp| stamp.contains('T')));
        Ok(())
    }

    /// A session row comes back with the values that were inserted.
    #[test]
    fn session_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_session(Utc::now(), "agent8", 8, SessionEvent::Note)?;

        let (ts_utc, agent, session_num, event): (String, String, i64, String) =
            db.conn.query_row(
                "SELECT ts_utc, agent, session_num, event FROM sessions WHERE id = ?1",
                params![id],
                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?, r.get(3)?)),
            )?;

        assert_eq!(agent, "agent8");
        assert_eq!(session_num, 8);
        assert_eq!(event, "note");
        assert!(ts_utc.contains('T'));
        Ok(())
    }
}