1use std::collections::HashSet;
7use std::fs;
8use std::path::Path;
9use std::path::PathBuf;
10use std::sync::Arc;
11use std::time::Duration;
12
13use chrono::{DateTime, SecondsFormat, Utc};
14use datafusion::arrow::array::{BooleanArray, Int64Array, StringArray};
15use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
16use datafusion::arrow::record_batch::RecordBatch;
17use datafusion::arrow::util::pretty::pretty_format_batches;
18use datafusion::datasource::MemTable;
19use datafusion::prelude::SessionContext;
20use serde::{Deserialize, Serialize, de::DeserializeOwned};
21use thiserror::Error;
22use tokio::runtime::Runtime;
23use turso::{Builder, Connection, Value, params_from_iter};
24
25pub type Result<T> = std::result::Result<T, Error>;
26
27pub use datafusion::arrow::array as arrow_array;
29pub use datafusion::arrow::record_batch::RecordBatch as ArrowRecordBatch;
30
/// Schema version `migrate` stamps into `meta.key = 'schema_version'` and `PRAGMA user_version`.
pub const SCHEMA_VERSION: i64 = 6;
32
/// Every event table managed by `migrate`. Each is created with the same
/// shape — `(id INTEGER PRIMARY KEY, row_json TEXT NOT NULL)` — and queried
/// through `json_extract` expression indexes. Keep in sync with the
/// per-table constants below.
const TABLE_NAMES: [&str; 17] = [
    "messages",
    "cli_invocations",
    "crashes",
    "ticks",
    "workspaces",
    "sessions",
    "clone_dispatches",
    "harvest_events",
    "communication_events",
    "mcp_tool_calls",
    "git_operations",
    "owner_directives",
    "token_usage",
    "watchdog_events",
    "netsky_tasks",
    "source_errors",
    "iroh_events",
];
52
// Physical table names, one per entry of `TABLE_NAMES` above.
const MESSAGES: &str = "messages";
const CLI_INVOCATIONS: &str = "cli_invocations";
const CRASHES: &str = "crashes";
const TICKS: &str = "ticks";
const WORKSPACES: &str = "workspaces";
const SESSIONS: &str = "sessions";
const CLONE_DISPATCHES: &str = "clone_dispatches";
const HARVEST_EVENTS: &str = "harvest_events";
const COMMUNICATION_EVENTS: &str = "communication_events";
const MCP_TOOL_CALLS: &str = "mcp_tool_calls";
const GIT_OPERATIONS: &str = "git_operations";
const OWNER_DIRECTIVES: &str = "owner_directives";
const TOKEN_USAGE: &str = "token_usage";
const WATCHDOG_EVENTS: &str = "watchdog_events";
// Logical "tasks" lives in a prefixed physical table.
const TASKS: &str = "netsky_tasks";
const SOURCE_ERRORS: &str = "source_errors";
const IROH_EVENTS: &str = "iroh_events";
70
/// (DataFusion-visible name, physical sqlite table) pairs registered for SQL
/// querying. All pairs are identity mappings except `tasks`, which aliases
/// the physical `netsky_tasks` table.
const DATAFUSION_TABLES: [(&str, &str); 17] = [
    ("messages", MESSAGES),
    ("cli_invocations", CLI_INVOCATIONS),
    ("crashes", CRASHES),
    ("ticks", TICKS),
    ("workspaces", WORKSPACES),
    ("sessions", SESSIONS),
    ("clone_dispatches", CLONE_DISPATCHES),
    ("harvest_events", HARVEST_EVENTS),
    ("communication_events", COMMUNICATION_EVENTS),
    ("mcp_tool_calls", MCP_TOOL_CALLS),
    ("git_operations", GIT_OPERATIONS),
    ("owner_directives", OWNER_DIRECTIVES),
    ("token_usage", TOKEN_USAGE),
    ("watchdog_events", WATCHDOG_EVENTS),
    ("tasks", TASKS),
    ("source_errors", SOURCE_ERRORS),
    ("iroh_events", IROH_EVENTS),
];
92
// Name of a derived view over clone dispatches; presumably registered with
// DataFusion elsewhere in this file — definition not visible here.
const CLONE_LIFETIMES_VIEW: &str = "clone_lifetimes";
95
/// (table, JSON field) pairs that get a `json_extract` expression index for
/// time-range scans. Field names must match the corresponding `*Row` struct
/// fields, since rows are stored as serialized JSON.
const INDEXED_TIME_COLUMNS: &[(&str, &str)] = &[
    (MESSAGES, "ts_utc"),
    (CLI_INVOCATIONS, "ts_utc"),
    (CRASHES, "ts_utc"),
    (TICKS, "ts_utc"),
    (WORKSPACES, "ts_utc_created"),
    (SESSIONS, "ts_utc"),
    (CLONE_DISPATCHES, "ts_utc_start"),
    (HARVEST_EVENTS, "ts_utc"),
    (COMMUNICATION_EVENTS, "ts_utc"),
    (MCP_TOOL_CALLS, "ts_utc_start"),
    (GIT_OPERATIONS, "ts_utc"),
    (OWNER_DIRECTIVES, "ts_utc"),
    (TOKEN_USAGE, "ts_utc"),
    (WATCHDOG_EVENTS, "ts_utc"),
    (TASKS, "created_at"),
    (SOURCE_ERRORS, "ts_utc"),
    (IROH_EVENTS, "ts_utc"),
];
122
/// (table, JSON field) pairs that get a `json_extract` expression index for
/// equality filters (e.g. task status/priority lookups).
const INDEXED_EQ_COLUMNS: &[(&str, &str)] = &[
    (TASKS, "status"),
    (TASKS, "priority"),
    (MESSAGES, "source"),
    (SESSIONS, "event"),
    (IROH_EVENTS, "event_type"),
];
133
/// Unified error type for this crate. Most variants are transparent wrappers
/// over dependency errors so `?` propagates cleanly throughout.
#[derive(Debug, Error)]
pub enum Error {
    #[error("home directory not found")]
    HomeDirMissing,
    // The database was written by a newer build; refuse to touch it.
    #[error("schema version {found} is newer than supported {supported}")]
    FutureSchemaVersion { found: i64, supported: i64 },
    // Read-only opens require a writer to have created/migrated the file.
    #[error("meta.db has not been initialized at {0}; a writer must run first.")]
    NotInitialized(PathBuf),
    #[error("read-only connection was writable at {0}")]
    ReadOnlyNotEnforced(PathBuf),
    #[error(transparent)]
    Turso(#[from] turso::Error),
    #[error(transparent)]
    DataFusion(#[from] datafusion::error::DataFusionError),
    #[error(transparent)]
    Arrow(#[from] datafusion::arrow::error::ArrowError),
    #[error(transparent)]
    Serde(#[from] serde_json::Error),
    #[error(transparent)]
    Io(#[from] std::io::Error),
    #[error(transparent)]
    Chrono(#[from] chrono::ParseError),
    #[error("{0} not found")]
    NotFound(String),
}
159
160#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
161pub enum Direction {
162 Inbound,
163 Outbound,
164}
165
166impl Direction {
167 fn as_str(self) -> &'static str {
168 match self {
169 Direction::Inbound => "inbound",
170 Direction::Outbound => "outbound",
171 }
172 }
173}
174
175#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
176pub enum SessionEvent {
177 Up,
178 Down,
179 Note,
180}
181
182impl SessionEvent {
183 fn as_str(self) -> &'static str {
184 match self {
185 SessionEvent::Up => "up",
186 SessionEvent::Down => "down",
187 SessionEvent::Note => "note",
188 }
189 }
190}
191
192#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
196pub enum EventStatus {
197 Pending,
198 Delivered,
199 Failed,
200}
201
202impl EventStatus {
203 pub fn as_str(self) -> &'static str {
204 match self {
205 EventStatus::Pending => "pending",
206 EventStatus::Delivered => "delivered",
207 EventStatus::Failed => "failed",
208 }
209 }
210}
211
/// One row of the `source_cursors` table: the last-seen position for an
/// ingestion source, as an opaque string.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SourceCursorRow {
    pub source: String,
    pub cursor_value: String,
    pub updated_at: String,
}
219
/// One row of the `events` outbox table. `delivery_status` holds an
/// [`EventStatus`] label; `reason` explains failures.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EventRow {
    pub id: i64,
    pub source: String,
    pub ts_utc: String,
    pub payload_json: String,
    pub delivery_status: String,
    pub reason: Option<String>,
}
230
231#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
236pub enum SourceErrorClass {
237 Timeout,
238 AuthFailure,
239 RateLimit,
240 NetworkError,
241 ProtocolError,
242 NotFound,
243 PermissionDenied,
244 Unknown,
245}
246
247impl SourceErrorClass {
248 pub fn as_str(self) -> &'static str {
249 match self {
250 SourceErrorClass::Timeout => "timeout",
251 SourceErrorClass::AuthFailure => "auth_failure",
252 SourceErrorClass::RateLimit => "rate_limit",
253 SourceErrorClass::NetworkError => "network_error",
254 SourceErrorClass::ProtocolError => "protocol_error",
255 SourceErrorClass::NotFound => "not_found",
256 SourceErrorClass::PermissionDenied => "permission_denied",
257 SourceErrorClass::Unknown => "unknown",
258 }
259 }
260}
261
262#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
265pub enum IrohEventType {
266 Connect,
267 Evict,
268 Reconnect,
269 HandshakeRefused,
270}
271
272impl IrohEventType {
273 pub fn as_str(self) -> &'static str {
274 match self {
275 IrohEventType::Connect => "connect",
276 IrohEventType::Evict => "evict",
277 IrohEventType::Reconnect => "reconnect",
278 IrohEventType::HandshakeRefused => "handshake_refused",
279 }
280 }
281}
282
283pub fn hash_peer_id(raw_node_id: &str) -> String {
288 use sha2::{Digest, Sha256};
289 let mut hasher = Sha256::new();
290 hasher.update(raw_node_id.as_bytes());
291 let digest = hasher.finalize();
292 hex_encode(&digest[..8])
293}
294
/// Lowercase hex encoding of `bytes`, two characters per byte.
fn hex_encode(bytes: &[u8]) -> String {
    use std::fmt::Write;
    let mut out = String::with_capacity(bytes.len() * 2);
    for byte in bytes {
        // Writing into a String is infallible.
        let _ = write!(out, "{byte:02x}");
    }
    out
}
304
/// Handle to the meta database. Holds only the file path; connections are
/// opened per operation (see the `with_conn` / `with_read_only_conn` helpers
/// used throughout `impl Db`).
pub struct Db {
    path: PathBuf,
}
308
/// Borrowed input for [`Db::record_message`]; owned copies are made on insert.
pub struct MessageRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub direction: Direction,
    pub chat_id: Option<&'a str>,
    pub from_agent: Option<&'a str>,
    pub to_agent: Option<&'a str>,
    pub body: Option<&'a str>,
    pub raw_json: Option<&'a str>,
}
319
/// Borrowed input for [`Db::record_clone_dispatch`]. `brief` is truncated to
/// 16 KiB at insert time.
pub struct CloneDispatchRecord<'a> {
    pub ts_utc_start: DateTime<Utc>,
    // None while the dispatch is still running.
    pub ts_utc_end: Option<DateTime<Utc>>,
    pub agent_id: &'a str,
    pub runtime: Option<&'a str>,
    pub brief_path: Option<&'a str>,
    pub brief: Option<&'a str>,
    pub workspace: Option<&'a str>,
    pub branch: Option<&'a str>,
    pub status: Option<&'a str>,
    pub exit_code: Option<i64>,
    pub detail_json: Option<&'a str>,
}
333
/// Borrowed input for [`Db::record_harvest_event`] (branch merge/harvest results).
pub struct HarvestEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source_branch: &'a str,
    pub target_branch: &'a str,
    pub commit_sha: Option<&'a str>,
    pub status: &'a str,
    pub conflicts: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}
343
/// Borrowed input for [`Db::record_communication_event`]. `body` is truncated
/// to 4 KiB at insert time.
pub struct CommunicationEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub tool: Option<&'a str>,
    pub direction: Direction,
    pub chat_id: Option<&'a str>,
    pub message_id: Option<&'a str>,
    pub handle: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub body: Option<&'a str>,
    pub status: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}
357
/// Owned, parsed communication event returned by
/// [`Db::list_communication_events`]; `ts_utc` is parsed into a real timestamp.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct CommunicationEvent {
    pub id: i64,
    pub ts_utc: DateTime<Utc>,
    pub source: String,
    pub tool: Option<String>,
    pub direction: String,
    pub chat_id: Option<String>,
    pub message_id: Option<String>,
    pub handle: Option<String>,
    pub agent: Option<String>,
    pub body: Option<String>,
    pub status: Option<String>,
    pub detail_json: Option<String>,
}
373
/// Borrowed input for [`Db::record_mcp_tool_call`]. `error` is truncated to
/// 2 KiB and the request/response JSON to 8 KiB each at insert time.
pub struct McpToolCallRecord<'a> {
    pub ts_utc_start: DateTime<Utc>,
    pub ts_utc_end: Option<DateTime<Utc>>,
    pub source: &'a str,
    pub tool: &'a str,
    pub agent: Option<&'a str>,
    pub duration_ms: Option<i64>,
    pub success: bool,
    pub error: Option<&'a str>,
    // True when the call lost a race against its timeout.
    pub timeout_race: bool,
    pub request_json: Option<&'a str>,
    pub response_json: Option<&'a str>,
}
387
/// Borrowed input for [`Db::record_git_operation`].
pub struct GitOperationRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub operation: &'a str,
    pub repo: &'a str,
    pub branch: Option<&'a str>,
    pub remote: Option<&'a str>,
    pub from_sha: Option<&'a str>,
    pub to_sha: Option<&'a str>,
    pub status: &'a str,
    pub detail_json: Option<&'a str>,
}
399
/// Borrowed input for [`Db::record_owner_directive`]. `raw_text` is truncated
/// to 16 KiB at insert time.
pub struct OwnerDirectiveRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub chat_id: Option<&'a str>,
    pub raw_text: &'a str,
    pub resolved_action: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub status: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}
410
/// Borrowed input for [`Db::record_token_usage`] /
/// [`Db::record_token_usage_batch`].
pub struct TokenUsageRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub session_id: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub runtime: Option<&'a str>,
    pub model: Option<&'a str>,
    pub input_tokens: Option<i64>,
    pub output_tokens: Option<i64>,
    pub cached_input_tokens: Option<i64>,
    // Cost in millionths of a USD, kept as an integer to avoid float drift.
    pub cost_usd_micros: Option<i64>,
    pub detail_json: Option<&'a str>,
}
426
/// Borrowed input for [`Db::record_source_error`]. `count` lets callers
/// aggregate repeated identical failures; values below 1 are clamped to 1
/// at insert time.
pub struct SourceErrorRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub error_class: SourceErrorClass,
    pub count: i64,
    pub detail_json: Option<&'a str>,
}
437
/// Borrowed input for [`Db::record_iroh_event`]. `peer_id_hash` should be the
/// output of [`hash_peer_id`] — never the raw node id.
pub struct IrohEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub event_type: IrohEventType,
    pub peer_id_hash: &'a str,
    pub peer_label: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}
450
/// Borrowed input for [`Db::record_watchdog_event`].
pub struct WatchdogEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub event: &'a str,
    pub agent: Option<&'a str>,
    pub severity: Option<&'a str>,
    pub status: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}
459
/// Borrowed input for creating a row in `netsky_tasks`.
pub struct TaskRecord<'a> {
    pub title: &'a str,
    pub body: Option<&'a str>,
    pub status: &'a str,
    pub priority: Option<&'a str>,
    pub labels: Option<&'a str>,
    pub source: Option<&'a str>,
    pub source_ref: Option<&'a str>,
    pub closed_at: Option<&'a str>,
    pub closed_reason: Option<&'a str>,
    pub closed_evidence: Option<&'a str>,
    pub agent: Option<&'a str>,
    // Whether this task should be mirrored to an external calendar.
    pub sync_calendar: bool,
    pub calendar_task_id: Option<&'a str>,
}
475
/// Partial update for an existing task; `None` fields are left untouched.
pub struct TaskUpdate<'a> {
    pub id: i64,
    pub status: Option<&'a str>,
    pub priority: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub closed_reason: Option<&'a str>,
    pub closed_evidence: Option<&'a str>,
    pub sync_calendar: Option<bool>,
    pub calendar_task_id: Option<&'a str>,
}
486
/// JSON shape persisted as `row_json` in the `messages` table.
#[derive(Debug, Serialize, Deserialize)]
struct MessageRow {
    id: i64,
    ts_utc: String,
    source: String,
    direction: String,
    chat_id: Option<String>,
    from_agent: Option<String>,
    to_agent: Option<String>,
    body: Option<String>,
    raw_json: Option<String>,
}
499
/// JSON shape persisted as `row_json` in the `cli_invocations` table.
#[derive(Debug, Serialize, Deserialize)]
struct CliRow {
    id: i64,
    ts_utc: String,
    bin: String,
    argv_json: String,
    exit_code: Option<i64>,
    duration_ms: Option<i64>,
    host: String,
}
510
/// JSON shape persisted as `row_json` in the `crashes` table.
#[derive(Debug, Serialize, Deserialize)]
struct CrashRow {
    id: i64,
    ts_utc: String,
    kind: String,
    agent: String,
    detail_json: String,
}
519
/// JSON shape persisted as `row_json` in the `ticks` table.
#[derive(Debug, Serialize, Deserialize)]
struct TickRow {
    id: i64,
    ts_utc: String,
    source: String,
    detail_json: String,
}
527
/// JSON shape persisted as `row_json` in the `workspaces` table.
#[derive(Debug, Serialize, Deserialize)]
struct WorkspaceRow {
    id: i64,
    ts_utc_created: String,
    name: String,
    branch: String,
    // None until the workspace is torn down.
    ts_utc_deleted: Option<String>,
    verdict: Option<String>,
}
537
/// JSON shape persisted as `row_json` in the `sessions` table.
#[derive(Debug, Serialize, Deserialize)]
struct SessionRow {
    id: i64,
    ts_utc: String,
    agent: String,
    session_num: i64,
    // A `SessionEvent` label: "up", "down", or "note".
    event: String,
}
546
/// JSON shape persisted as `row_json` in the `clone_dispatches` table.
#[derive(Debug, Serialize, Deserialize)]
struct CloneDispatchRow {
    id: i64,
    ts_utc_start: String,
    ts_utc_end: Option<String>,
    agent_id: String,
    runtime: Option<String>,
    brief_path: Option<String>,
    brief: Option<String>,
    workspace: Option<String>,
    branch: Option<String>,
    status: Option<String>,
    exit_code: Option<i64>,
    detail_json: Option<String>,
}
562
/// JSON shape persisted as `row_json` in the `harvest_events` table.
#[derive(Debug, Serialize, Deserialize)]
struct HarvestEventRow {
    id: i64,
    ts_utc: String,
    source_branch: String,
    target_branch: String,
    commit_sha: Option<String>,
    status: String,
    conflicts: Option<String>,
    detail_json: Option<String>,
}
574
/// JSON shape persisted as `row_json` in the `communication_events` table.
/// Converted to the public [`CommunicationEvent`] via `TryFrom`.
#[derive(Debug, Serialize, Deserialize)]
struct CommunicationEventRow {
    id: i64,
    ts_utc: String,
    source: String,
    tool: Option<String>,
    direction: String,
    chat_id: Option<String>,
    message_id: Option<String>,
    handle: Option<String>,
    agent: Option<String>,
    body: Option<String>,
    status: Option<String>,
    detail_json: Option<String>,
}
590
591impl TryFrom<CommunicationEventRow> for CommunicationEvent {
592 type Error = Error;
593
594 fn try_from(row: CommunicationEventRow) -> Result<Self> {
595 Ok(Self {
596 id: row.id,
597 ts_utc: parse_ts(&row.ts_utc)?,
598 source: row.source,
599 tool: row.tool,
600 direction: row.direction,
601 chat_id: row.chat_id,
602 message_id: row.message_id,
603 handle: row.handle,
604 agent: row.agent,
605 body: row.body,
606 status: row.status,
607 detail_json: row.detail_json,
608 })
609 }
610}
611
/// JSON shape persisted as `row_json` in the `mcp_tool_calls` table.
#[derive(Debug, Serialize, Deserialize)]
struct McpToolCallRow {
    id: i64,
    ts_utc_start: String,
    ts_utc_end: Option<String>,
    source: String,
    tool: String,
    agent: Option<String>,
    duration_ms: Option<i64>,
    success: bool,
    error: Option<String>,
    timeout_race: bool,
    request_json: Option<String>,
    response_json: Option<String>,
}
627
/// JSON shape persisted as `row_json` in the `git_operations` table.
#[derive(Debug, Serialize, Deserialize)]
struct GitOperationRow {
    id: i64,
    ts_utc: String,
    operation: String,
    repo: String,
    branch: Option<String>,
    remote: Option<String>,
    from_sha: Option<String>,
    to_sha: Option<String>,
    status: String,
    detail_json: Option<String>,
}
641
/// JSON shape persisted as `row_json` in the `owner_directives` table.
#[derive(Debug, Serialize, Deserialize)]
struct OwnerDirectiveRow {
    id: i64,
    ts_utc: String,
    source: String,
    chat_id: Option<String>,
    raw_text: String,
    resolved_action: Option<String>,
    agent: Option<String>,
    status: Option<String>,
    detail_json: Option<String>,
}
654
/// JSON shape persisted as `row_json` in the `token_usage` table.
#[derive(Debug, Serialize, Deserialize)]
struct TokenUsageRow {
    id: i64,
    ts_utc: String,
    session_id: Option<String>,
    agent: Option<String>,
    // `default` keeps rows written before this field existed deserializable.
    #[serde(default)]
    runtime: Option<String>,
    model: Option<String>,
    input_tokens: Option<i64>,
    output_tokens: Option<i64>,
    cached_input_tokens: Option<i64>,
    cost_usd_micros: Option<i64>,
    detail_json: Option<String>,
}
670
/// JSON shape persisted as `row_json` in the `source_errors` table.
#[derive(Debug, Serialize, Deserialize)]
struct SourceErrorRow {
    id: i64,
    ts_utc: String,
    source: String,
    // A `SourceErrorClass` snake_case label.
    error_class: String,
    count: i64,
    detail_json: Option<String>,
}
680
/// JSON shape persisted as `row_json` in the `iroh_events` table.
#[derive(Debug, Serialize, Deserialize)]
struct IrohEventRow {
    id: i64,
    ts_utc: String,
    // An `IrohEventType` snake_case label.
    event_type: String,
    // Hashed peer id (see `hash_peer_id`), never the raw node id.
    peer_id_hash: String,
    peer_label: Option<String>,
    detail_json: Option<String>,
}
690
/// JSON shape persisted as `row_json` in the `watchdog_events` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WatchdogEventRow {
    id: i64,
    ts_utc: String,
    event: String,
    agent: Option<String>,
    severity: Option<String>,
    status: Option<String>,
    detail_json: Option<String>,
}
701
/// Public row type for the `netsky_tasks` table, serialized whole as `row_json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskRow {
    pub id: i64,
    pub created_at: String,
    pub updated_at: String,
    pub title: String,
    pub body: Option<String>,
    pub status: String,
    pub priority: Option<String>,
    pub labels: Option<String>,
    pub source: Option<String>,
    pub source_ref: Option<String>,
    pub closed_at: Option<String>,
    pub closed_reason: Option<String>,
    pub closed_evidence: Option<String>,
    pub agent: Option<String>,
    // `default` keeps rows written before these fields existed deserializable.
    #[serde(default)]
    pub sync_calendar: bool,
    #[serde(default)]
    pub calendar_task_id: Option<String>,
}
723
724impl Db {
725 pub fn open() -> Result<Self> {
726 let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
727 let dir = home.join(".netsky");
728 fs::create_dir_all(&dir)?;
729 Self::open_path(dir.join("meta.db"))
730 }
731
732 pub fn open_read_only() -> Result<Self> {
733 let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
734 Self::open_read_only_path(home.join(".netsky").join("meta.db"))
735 }
736
737 pub fn open_path(path: impl AsRef<Path>) -> Result<Self> {
738 let path = path.as_ref().to_path_buf();
739 if path != Path::new(":memory:")
740 && let Some(parent) = path.parent()
741 {
742 fs::create_dir_all(parent)?;
743 }
744 let db = Self { path };
745 db.with_conn(|_| async { Ok(()) })?;
746 Ok(db)
747 }
748
749 pub fn open_read_only_path(path: impl AsRef<Path>) -> Result<Self> {
750 let path = path.as_ref().to_path_buf();
751 if path != Path::new(":memory:") && !path.exists() {
752 return Err(Error::NotInitialized(path));
753 }
754 let db = Self { path };
755 db.with_read_only_conn(|_| async { Ok(()) })?;
756 Ok(db)
757 }
758
    /// Open a throwaway database for tests.
    ///
    /// NOTE(review): despite the name this is NOT sqlite `:memory:` — it
    /// creates a uniquely named file (pid + nanosecond timestamp) in the OS
    /// temp dir and never deletes it, so test runs leave files behind.
    #[cfg(test)]
    pub(crate) fn open_in_memory() -> Result<Self> {
        let path = std::env::temp_dir().join(format!(
            "netsky-db-test-{}-{}.db",
            std::process::id(),
            Utc::now().timestamp_nanos_opt().unwrap_or_default()
        ));
        Self::open_path(path)
    }
768
769 pub fn migrate(&self) -> Result<()> {
770 self.with_conn(|conn| async move {
771 let current = schema_version(&conn).await?;
772 if current > SCHEMA_VERSION {
773 return Err(Error::FutureSchemaVersion {
774 found: current,
775 supported: SCHEMA_VERSION,
776 });
777 }
778 conn.execute(
779 "INSERT INTO meta (key, value) VALUES (?1, ?2) \
780 ON CONFLICT(key) DO UPDATE SET value = excluded.value",
781 params_from_iter([Value::from("schema_version"), Value::from(SCHEMA_VERSION)]),
782 )
783 .await?;
784 for table in TABLE_NAMES {
785 let sql = format!(
786 "CREATE TABLE IF NOT EXISTS {table} \
787 (id INTEGER PRIMARY KEY, row_json TEXT NOT NULL)"
788 );
789 conn.execute(&sql, ()).await?;
790 }
791 for &(table, column) in INDEXED_TIME_COLUMNS {
792 let sql = format!(
793 "CREATE INDEX IF NOT EXISTS idx_{table}_{column} \
794 ON {table}(json_extract(row_json, '$.{column}'))"
795 );
796 conn.execute(&sql, ()).await?;
797 }
798 for &(table, column) in INDEXED_EQ_COLUMNS {
799 let sql = format!(
800 "CREATE INDEX IF NOT EXISTS idx_{table}_{column} \
801 ON {table}(json_extract(row_json, '$.{column}'))"
802 );
803 conn.execute(&sql, ()).await?;
804 }
805 conn.execute(
810 "CREATE TABLE IF NOT EXISTS source_cursors ( \
811 source TEXT PRIMARY KEY, \
812 cursor_value TEXT NOT NULL, \
813 updated_at TEXT NOT NULL \
814 )",
815 (),
816 )
817 .await?;
818 conn.execute(
819 "CREATE TABLE IF NOT EXISTS events ( \
820 id INTEGER PRIMARY KEY AUTOINCREMENT, \
821 source TEXT NOT NULL, \
822 ts_utc TEXT NOT NULL, \
823 payload_json TEXT NOT NULL, \
824 delivery_status TEXT NOT NULL, \
825 reason TEXT \
826 )",
827 (),
828 )
829 .await?;
830 conn.execute(&format!("PRAGMA user_version = {SCHEMA_VERSION}"), ())
831 .await?;
832 Ok(())
833 })
834 }
835
836 pub fn record_message(&self, record: MessageRecord<'_>) -> Result<i64> {
837 self.insert_json("messages", MESSAGES, |id| MessageRow {
838 id,
839 ts_utc: ts(record.ts_utc),
840 source: record.source.to_string(),
841 direction: record.direction.as_str().to_string(),
842 chat_id: record.chat_id.map(str::to_string),
843 from_agent: record.from_agent.map(str::to_string),
844 to_agent: record.to_agent.map(str::to_string),
845 body: record.body.map(str::to_string),
846 raw_json: record.raw_json.map(str::to_string),
847 })
848 }
849
850 pub fn record_cli(
851 &self,
852 ts_utc: DateTime<Utc>,
853 bin: &str,
854 argv_json: &str,
855 exit_code: Option<i64>,
856 duration_ms: Option<i64>,
857 host: &str,
858 ) -> Result<i64> {
859 self.insert_json("cli_invocations", CLI_INVOCATIONS, |id| CliRow {
860 id,
861 ts_utc: ts(ts_utc),
862 bin: bin.to_string(),
863 argv_json: argv_json.to_string(),
864 exit_code,
865 duration_ms,
866 host: host.to_string(),
867 })
868 }
869
870 pub fn record_crash(
871 &self,
872 ts_utc: DateTime<Utc>,
873 kind: &str,
874 agent: &str,
875 detail_json: &str,
876 ) -> Result<i64> {
877 self.insert_json("crashes", CRASHES, |id| CrashRow {
878 id,
879 ts_utc: ts(ts_utc),
880 kind: kind.to_string(),
881 agent: agent.to_string(),
882 detail_json: detail_json.to_string(),
883 })
884 }
885
886 pub fn record_tick(
887 &self,
888 ts_utc: DateTime<Utc>,
889 source: &str,
890 detail_json: &str,
891 ) -> Result<i64> {
892 self.insert_json("ticks", TICKS, |id| TickRow {
893 id,
894 ts_utc: ts(ts_utc),
895 source: source.to_string(),
896 detail_json: detail_json.to_string(),
897 })
898 }
899
900 pub fn record_workspace(
901 &self,
902 ts_utc_created: DateTime<Utc>,
903 name: &str,
904 branch: &str,
905 ts_utc_deleted: Option<DateTime<Utc>>,
906 verdict: Option<&str>,
907 ) -> Result<i64> {
908 self.insert_json("workspaces", WORKSPACES, |id| WorkspaceRow {
909 id,
910 ts_utc_created: ts(ts_utc_created),
911 name: name.to_string(),
912 branch: branch.to_string(),
913 ts_utc_deleted: ts_utc_deleted.map(ts),
914 verdict: verdict.map(str::to_string),
915 })
916 }
917
918 pub fn record_session(
919 &self,
920 ts_utc: DateTime<Utc>,
921 agent: &str,
922 session_num: i64,
923 event: SessionEvent,
924 ) -> Result<i64> {
925 self.insert_json("sessions", SESSIONS, |id| SessionRow {
926 id,
927 ts_utc: ts(ts_utc),
928 agent: agent.to_string(),
929 session_num,
930 event: event.as_str().to_string(),
931 })
932 }
933
934 pub fn record_clone_dispatch(&self, record: CloneDispatchRecord<'_>) -> Result<i64> {
935 self.insert_json("clone_dispatches", CLONE_DISPATCHES, |id| {
936 CloneDispatchRow {
937 id,
938 ts_utc_start: ts(record.ts_utc_start),
939 ts_utc_end: record.ts_utc_end.map(ts),
940 agent_id: record.agent_id.to_string(),
941 runtime: record.runtime.map(str::to_string),
942 brief_path: record.brief_path.map(str::to_string),
943 brief: truncate_opt(record.brief, 16_384),
944 workspace: record.workspace.map(str::to_string),
945 branch: record.branch.map(str::to_string),
946 status: record.status.map(str::to_string),
947 exit_code: record.exit_code,
948 detail_json: record.detail_json.map(str::to_string),
949 }
950 })
951 }
952
953 pub fn record_harvest_event(&self, record: HarvestEventRecord<'_>) -> Result<i64> {
954 self.insert_json("harvest_events", HARVEST_EVENTS, |id| HarvestEventRow {
955 id,
956 ts_utc: ts(record.ts_utc),
957 source_branch: record.source_branch.to_string(),
958 target_branch: record.target_branch.to_string(),
959 commit_sha: record.commit_sha.map(str::to_string),
960 status: record.status.to_string(),
961 conflicts: record.conflicts.map(str::to_string),
962 detail_json: record.detail_json.map(str::to_string),
963 })
964 }
965
966 pub fn record_communication_event(&self, record: CommunicationEventRecord<'_>) -> Result<i64> {
967 self.insert_json("communication_events", COMMUNICATION_EVENTS, |id| {
968 CommunicationEventRow {
969 id,
970 ts_utc: ts(record.ts_utc),
971 source: record.source.to_string(),
972 tool: record.tool.map(str::to_string),
973 direction: record.direction.as_str().to_string(),
974 chat_id: record.chat_id.map(str::to_string),
975 message_id: record.message_id.map(str::to_string),
976 handle: record.handle.map(str::to_string),
977 agent: record.agent.map(str::to_string),
978 body: truncate_opt(record.body, 4096),
979 status: record.status.map(str::to_string),
980 detail_json: record.detail_json.map(str::to_string),
981 }
982 })
983 }
984
985 pub fn list_communication_events(&self) -> Result<Vec<CommunicationEvent>> {
986 let rows: Vec<CommunicationEventRow> = self
987 .with_read_only_conn(|conn| async move { rows(&conn, COMMUNICATION_EVENTS).await })?;
988 rows.into_iter().map(CommunicationEvent::try_from).collect()
989 }
990
991 pub fn record_mcp_tool_call(&self, record: McpToolCallRecord<'_>) -> Result<i64> {
992 self.insert_json("mcp_tool_calls", MCP_TOOL_CALLS, |id| McpToolCallRow {
993 id,
994 ts_utc_start: ts(record.ts_utc_start),
995 ts_utc_end: record.ts_utc_end.map(ts),
996 source: record.source.to_string(),
997 tool: record.tool.to_string(),
998 agent: record.agent.map(str::to_string),
999 duration_ms: record.duration_ms,
1000 success: record.success,
1001 error: truncate_opt(record.error, 2048),
1002 timeout_race: record.timeout_race,
1003 request_json: truncate_opt(record.request_json, 8192),
1004 response_json: truncate_opt(record.response_json, 8192),
1005 })
1006 }
1007
1008 pub fn record_git_operation(&self, record: GitOperationRecord<'_>) -> Result<i64> {
1009 self.insert_json("git_operations", GIT_OPERATIONS, |id| GitOperationRow {
1010 id,
1011 ts_utc: ts(record.ts_utc),
1012 operation: record.operation.to_string(),
1013 repo: record.repo.to_string(),
1014 branch: record.branch.map(str::to_string),
1015 remote: record.remote.map(str::to_string),
1016 from_sha: record.from_sha.map(str::to_string),
1017 to_sha: record.to_sha.map(str::to_string),
1018 status: record.status.to_string(),
1019 detail_json: record.detail_json.map(str::to_string),
1020 })
1021 }
1022
1023 pub fn record_owner_directive(&self, record: OwnerDirectiveRecord<'_>) -> Result<i64> {
1024 self.insert_json("owner_directives", OWNER_DIRECTIVES, |id| {
1025 OwnerDirectiveRow {
1026 id,
1027 ts_utc: ts(record.ts_utc),
1028 source: record.source.to_string(),
1029 chat_id: record.chat_id.map(str::to_string),
1030 raw_text: truncate(record.raw_text, 16_384),
1031 resolved_action: record.resolved_action.map(str::to_string),
1032 agent: record.agent.map(str::to_string),
1033 status: record.status.map(str::to_string),
1034 detail_json: record.detail_json.map(str::to_string),
1035 }
1036 })
1037 }
1038
1039 pub fn record_token_usage(&self, record: TokenUsageRecord<'_>) -> Result<i64> {
1040 self.insert_json("token_usage", TOKEN_USAGE, |id| TokenUsageRow {
1041 id,
1042 ts_utc: ts(record.ts_utc),
1043 session_id: record.session_id.map(str::to_string),
1044 agent: record.agent.map(str::to_string),
1045 runtime: record.runtime.map(str::to_string),
1046 model: record.model.map(str::to_string),
1047 input_tokens: record.input_tokens,
1048 output_tokens: record.output_tokens,
1049 cached_input_tokens: record.cached_input_tokens,
1050 cost_usd_micros: record.cost_usd_micros,
1051 detail_json: record.detail_json.map(str::to_string),
1052 })
1053 }
1054
1055 pub fn record_token_usage_batch<'a, I>(&self, records: I) -> Result<usize>
1062 where
1063 I: IntoIterator<Item = TokenUsageRecord<'a>>,
1064 {
1065 let raw: Vec<TokenUsageRecord<'_>> = records.into_iter().collect();
1066 if raw.is_empty() {
1067 return Ok(0);
1068 }
1069 let count = raw.len();
1070 let start_id = self.bulk_alloc_ids(TOKEN_USAGE, count as i64)?;
1071 let mut payloads: Vec<(i64, String)> = Vec::with_capacity(count);
1072 let mut owned_rows: Vec<TokenUsageRow> = Vec::with_capacity(count);
1073 for (offset, record) in raw.into_iter().enumerate() {
1074 let id = start_id + offset as i64;
1075 let row = TokenUsageRow {
1076 id,
1077 ts_utc: ts(record.ts_utc),
1078 session_id: record.session_id.map(str::to_string),
1079 agent: record.agent.map(str::to_string),
1080 runtime: record.runtime.map(str::to_string),
1081 model: record.model.map(str::to_string),
1082 input_tokens: record.input_tokens,
1083 output_tokens: record.output_tokens,
1084 cached_input_tokens: record.cached_input_tokens,
1085 cost_usd_micros: record.cost_usd_micros,
1086 detail_json: record.detail_json.map(str::to_string),
1087 };
1088 payloads.push((id, serde_json::to_string(&row)?));
1089 owned_rows.push(row);
1090 }
1091 let table = TOKEN_USAGE;
1092 let payloads_for_write = payloads.clone();
1093 let write = self.with_conn(move |conn| async move {
1094 let sql = format!("INSERT INTO {table} (id, row_json) VALUES (?1, ?2)");
1095 conn.execute("BEGIN", ()).await?;
1096 for (id, json) in payloads_for_write {
1097 if let Err(error) = conn
1098 .execute(&sql, params_from_iter([Value::from(id), Value::from(json)]))
1099 .await
1100 {
1101 let _ = conn.execute("ROLLBACK", ()).await;
1102 return Err(error.into());
1103 }
1104 }
1105 conn.execute("COMMIT", ()).await?;
1106 Ok(())
1107 });
1108 if let Err(error) = write {
1109 for row in &owned_rows {
1110 spool_write_error(table, row.id, row, &error)?;
1111 }
1112 return Err(error);
1113 }
1114 Ok(count)
1115 }
1116
    /// Atomically reserve `count` consecutive ids for table `name`, returning
    /// the first id of the reserved range.
    ///
    /// Implemented as an upsert on the `ids` counter table; `RETURNING id`
    /// yields the new high-water mark, so the range start is
    /// `last - count + 1`. If the database write fails, falls back to a
    /// timestamp-derived base id and spools the failure so callers can keep
    /// writing; fallback ids are only unique at microsecond granularity and
    /// may collide with counter-allocated ids.
    ///
    /// NOTE(review): `count <= 0` returns `Ok(0)`, which callers must not
    /// treat as a real id; the `ids` table is assumed to be created
    /// elsewhere — confirm.
    fn bulk_alloc_ids(&self, name: &'static str, count: i64) -> Result<i64> {
        if count <= 0 {
            return Ok(0);
        }
        let result = self.with_conn(move |conn| async move {
            let mut rows = conn
                .query(
                    "INSERT INTO ids (name, id) VALUES (?1, ?2) \
                     ON CONFLICT(name) DO UPDATE SET id = id + ?2 \
                     RETURNING id",
                    params_from_iter([Value::from(name), Value::from(count)]),
                )
                .await?;
            if let Some(row) = rows.next().await? {
                let last = integer_value(&row.get_value(0)?)?;
                // `last` is the new high-water mark; range is [last-count+1, last].
                return Ok(last - count + 1);
            }
            Err(Error::NotFound(format!("bulk id allocation for {name}")))
        });
        match result {
            Ok(start) => Ok(start),
            Err(error) => {
                // Degraded mode: derive a base id from wall-clock micros and
                // record the allocation failure for later inspection.
                let fallback = Utc::now().timestamp_micros();
                spool_error_json(serde_json::json!({
                    "ts_utc": ts(Utc::now()),
                    "table": name,
                    "id": fallback,
                    "count": count,
                    "error": error.to_string(),
                    "record": {"kind": "bulk_id_allocation"}
                }))?;
                Ok(fallback)
            }
        }
    }
1156
1157 pub fn record_source_error(&self, record: SourceErrorRecord<'_>) -> Result<i64> {
1158 self.insert_json("source_errors", SOURCE_ERRORS, |id| SourceErrorRow {
1159 id,
1160 ts_utc: ts(record.ts_utc),
1161 source: record.source.to_string(),
1162 error_class: record.error_class.as_str().to_string(),
1163 count: record.count.max(1),
1164 detail_json: record.detail_json.map(str::to_string),
1165 })
1166 }
1167
1168 pub fn record_iroh_event(&self, record: IrohEventRecord<'_>) -> Result<i64> {
1169 self.insert_json("iroh_events", IROH_EVENTS, |id| IrohEventRow {
1170 id,
1171 ts_utc: ts(record.ts_utc),
1172 event_type: record.event_type.as_str().to_string(),
1173 peer_id_hash: record.peer_id_hash.to_string(),
1174 peer_label: record.peer_label.map(str::to_string),
1175 detail_json: record.detail_json.map(str::to_string),
1176 })
1177 }
1178
1179 pub fn record_watchdog_event(&self, record: WatchdogEventRecord<'_>) -> Result<i64> {
1180 self.insert_json("watchdog_events", WATCHDOG_EVENTS, |id| WatchdogEventRow {
1181 id,
1182 ts_utc: ts(record.ts_utc),
1183 event: record.event.to_string(),
1184 agent: record.agent.map(str::to_string),
1185 severity: record.severity.map(str::to_string),
1186 status: record.status.map(str::to_string),
1187 detail_json: record.detail_json.map(str::to_string),
1188 })
1189 }
1190
1191 pub fn read_source_cursor(&self, source: &str) -> Result<Option<String>> {
1194 let source = source.to_string();
1195 self.with_conn(move |conn| async move {
1196 let mut rows = conn
1197 .query(
1198 "SELECT cursor_value FROM source_cursors WHERE source = ?1",
1199 params_from_iter([Value::from(source)]),
1200 )
1201 .await?;
1202 let Some(row) = rows.next().await? else {
1203 return Ok(None);
1204 };
1205 Ok(Some(text_value(&row.get_value(0)?)?))
1206 })
1207 }
1208
1209 pub fn update_source_cursor(&self, source: &str, value: &str) -> Result<()> {
1212 let source = source.to_string();
1213 let value = value.to_string();
1214 let now = ts(Utc::now());
1215 self.with_conn(move |conn| async move {
1216 conn.execute(
1217 "INSERT INTO source_cursors (source, cursor_value, updated_at) \
1218 VALUES (?1, ?2, ?3) \
1219 ON CONFLICT(source) DO UPDATE SET \
1220 cursor_value = excluded.cursor_value, \
1221 updated_at = excluded.updated_at",
1222 params_from_iter([Value::from(source), Value::from(value), Value::from(now)]),
1223 )
1224 .await?;
1225 Ok(())
1226 })
1227 }
1228
1229 pub fn reset_source_cursor(&self, source: &str) -> Result<bool> {
1232 let source = source.to_string();
1233 self.with_conn(move |conn| async move {
1234 let mut rows = conn
1235 .query(
1236 "DELETE FROM source_cursors WHERE source = ?1 RETURNING source",
1237 params_from_iter([Value::from(source)]),
1238 )
1239 .await?;
1240 Ok(rows.next().await?.is_some())
1241 })
1242 }
1243
1244 pub fn list_source_cursors(&self) -> Result<Vec<SourceCursorRow>> {
1245 self.with_read_only_conn(|conn| async move {
1246 let mut rows = conn
1247 .query(
1248 "SELECT source, cursor_value, updated_at FROM source_cursors ORDER BY source",
1249 (),
1250 )
1251 .await?;
1252 let mut out = Vec::new();
1253 while let Some(row) = rows.next().await? {
1254 out.push(SourceCursorRow {
1255 source: text_value(&row.get_value(0)?)?,
1256 cursor_value: text_value(&row.get_value(1)?)?,
1257 updated_at: text_value(&row.get_value(2)?)?,
1258 });
1259 }
1260 Ok(out)
1261 })
1262 }
1263
1264 pub fn insert_event(
1268 &self,
1269 source: &str,
1270 ts_utc: DateTime<Utc>,
1271 payload_json: &str,
1272 ) -> Result<i64> {
1273 let source = source.to_string();
1274 let ts_str = ts(ts_utc);
1275 let payload = payload_json.to_string();
1276 self.with_conn(move |conn| async move {
1277 let mut rows = conn
1278 .query(
1279 "INSERT INTO events (source, ts_utc, payload_json, delivery_status) \
1280 VALUES (?1, ?2, ?3, 'pending') RETURNING id",
1281 params_from_iter([
1282 Value::from(source),
1283 Value::from(ts_str),
1284 Value::from(payload),
1285 ]),
1286 )
1287 .await?;
1288 let Some(row) = rows.next().await? else {
1289 return Err(Error::NotFound("event id after insert".into()));
1290 };
1291 integer_value(&row.get_value(0)?)
1292 })
1293 }
1294
1295 pub fn update_event_delivery(
1298 &self,
1299 id: i64,
1300 status: EventStatus,
1301 reason: Option<&str>,
1302 ) -> Result<()> {
1303 let reason_value = reason.map(str::to_string);
1304 let status_str = status.as_str().to_string();
1305 self.with_conn(move |conn| async move {
1306 conn.execute(
1307 "UPDATE events SET delivery_status = ?1, reason = ?2 WHERE id = ?3",
1308 params_from_iter([
1309 Value::from(status_str),
1310 match reason_value {
1311 Some(r) => Value::from(r),
1312 None => Value::Null,
1313 },
1314 Value::from(id),
1315 ]),
1316 )
1317 .await?;
1318 Ok(())
1319 })
1320 }
1321
1322 pub fn tail_events(&self, source: &str, limit: i64) -> Result<Vec<EventRow>> {
1324 let source = source.to_string();
1325 self.with_read_only_conn(move |conn| async move {
1326 let mut rows = conn
1327 .query(
1328 "SELECT id, source, ts_utc, payload_json, delivery_status, reason \
1329 FROM events WHERE source = ?1 ORDER BY id DESC LIMIT ?2",
1330 params_from_iter([Value::from(source), Value::from(limit)]),
1331 )
1332 .await?;
1333 let mut out = Vec::new();
1334 while let Some(row) = rows.next().await? {
1335 let reason = match row.get_value(5)? {
1336 Value::Null => None,
1337 other => Some(
1338 other
1339 .as_text()
1340 .cloned()
1341 .ok_or_else(|| Error::NotFound("event reason".into()))?,
1342 ),
1343 };
1344 out.push(EventRow {
1345 id: integer_value(&row.get_value(0)?)?,
1346 source: text_value(&row.get_value(1)?)?,
1347 ts_utc: text_value(&row.get_value(2)?)?,
1348 payload_json: text_value(&row.get_value(3)?)?,
1349 delivery_status: text_value(&row.get_value(4)?)?,
1350 reason,
1351 });
1352 }
1353 Ok(out)
1354 })
1355 }
1356
1357 pub fn record_task(&self, record: TaskRecord<'_>) -> Result<i64> {
1358 let now = ts(Utc::now());
1359 self.insert_json("netsky_tasks", TASKS, |id| TaskRow {
1360 id,
1361 created_at: now.clone(),
1362 updated_at: now.clone(),
1363 title: record.title.to_string(),
1364 body: record.body.map(str::to_string),
1365 status: record.status.to_string(),
1366 priority: record.priority.map(str::to_string),
1367 labels: record.labels.map(str::to_string),
1368 source: record.source.map(str::to_string),
1369 source_ref: record.source_ref.map(str::to_string),
1370 closed_at: record.closed_at.map(|value| {
1371 if value.is_empty() {
1372 now.clone()
1373 } else {
1374 value.to_string()
1375 }
1376 }),
1377 closed_reason: record.closed_reason.map(str::to_string),
1378 closed_evidence: record.closed_evidence.map(str::to_string),
1379 agent: record.agent.map(str::to_string),
1380 sync_calendar: record.sync_calendar,
1381 calendar_task_id: record.calendar_task_id.map(str::to_string),
1382 })
1383 }
1384
1385 pub fn update_task(&self, update: TaskUpdate<'_>) -> Result<TaskRow> {
1386 let mut row = self
1387 .get_task(update.id)?
1388 .ok_or_else(|| Error::NotFound(format!("task {}", update.id)))?;
1389 let now = ts(Utc::now());
1390 if let Some(status) = update.status {
1391 row.status = status.to_string();
1392 if status == "closed" && row.closed_at.is_none() {
1393 row.closed_at = Some(now.clone());
1394 }
1395 }
1396 if let Some(priority) = update.priority {
1397 row.priority = Some(priority.to_string());
1398 }
1399 if let Some(agent) = update.agent {
1400 row.agent = Some(agent.to_string());
1401 }
1402 if let Some(reason) = update.closed_reason {
1403 row.closed_reason = Some(reason.to_string());
1404 }
1405 if let Some(evidence) = update.closed_evidence {
1406 row.closed_evidence = Some(evidence.to_string());
1407 }
1408 if let Some(sync_calendar) = update.sync_calendar {
1409 row.sync_calendar = sync_calendar;
1410 }
1411 if let Some(calendar_task_id) = update.calendar_task_id {
1412 row.calendar_task_id = Some(calendar_task_id.to_string());
1413 }
1414 row.updated_at = now;
1415 let row_json = serde_json::to_string(&row)?;
1416 let id = update.id;
1417 let write = self.with_conn(move |conn| async move {
1418 conn.execute(
1419 "UPDATE netsky_tasks SET row_json = ?1 WHERE id = ?2",
1420 params_from_iter([Value::from(row_json), Value::from(id)]),
1421 )
1422 .await?;
1423 Ok(())
1424 });
1425 if let Err(error) = write {
1426 spool_write_error("netsky_tasks", update.id, &row, &error)?;
1427 }
1428 Ok(row)
1429 }
1430
1431 pub fn get_task(&self, id: i64) -> Result<Option<TaskRow>> {
1432 self.with_conn(move |conn| async move {
1433 let mut rows = conn
1434 .query(
1435 "SELECT row_json FROM netsky_tasks WHERE id = ?1",
1436 params_from_iter([Value::from(id)]),
1437 )
1438 .await?;
1439 let Some(row) = rows.next().await? else {
1440 return Ok(None);
1441 };
1442 let json = text_value(&row.get_value(0)?)?;
1443 Ok(Some(serde_json::from_str(&json)?))
1444 })
1445 }
1446
1447 pub fn list_tasks(&self, status: Option<&str>, priority: Option<&str>) -> Result<Vec<TaskRow>> {
1448 let status = status.map(str::to_string);
1449 let priority = priority.map(str::to_string);
1450 self.with_conn(move |conn| async move {
1451 let status_val = status.map(Value::from).unwrap_or(Value::Null);
1452 let priority_val = priority.map(Value::from).unwrap_or(Value::Null);
1453 let mut rows = conn
1454 .query(
1455 "SELECT row_json FROM netsky_tasks \
1456 WHERE (?1 IS NULL OR json_extract(row_json, '$.status') = ?1) \
1457 AND (?2 IS NULL OR json_extract(row_json, '$.priority') = ?2) \
1458 ORDER BY json_extract(row_json, '$.status'), \
1459 json_extract(row_json, '$.priority'), \
1460 id",
1461 params_from_iter([status_val, priority_val]),
1462 )
1463 .await?;
1464 let mut out = Vec::new();
1465 while let Some(row) = rows.next().await? {
1466 let json = text_value(&row.get_value(0)?)?;
1467 out.push(serde_json::from_str(&json)?);
1468 }
1469 Ok(out)
1470 })
1471 }
1472
1473 pub fn query(&self, sql: &str) -> Result<String> {
1474 let batches = self.query_batches(sql)?;
1475 Ok(pretty_format_batches(&batches)?.to_string())
1476 }
1477
1478 pub fn query_batches(&self, sql: &str) -> Result<Vec<RecordBatch>> {
1479 let scope = referenced_storage_tables(sql);
1480 let snapshot = self.snapshot_scoped(scope.as_ref())?;
1481 let runtime = tokio::runtime::Runtime::new()?;
1482 runtime.block_on(async move {
1483 let ctx = register_snapshot(snapshot).await?;
1484 let df = ctx.sql(sql).await?;
1485 Ok(df.collect().await?)
1486 })
1487 }
1488
1489 pub fn schema_version(&self) -> Result<i64> {
1490 self.with_conn(|conn| async move { schema_version(&conn).await })
1491 }
1492
1493 fn insert_json<T, F>(&self, name: &'static str, table: &'static str, row: F) -> Result<i64>
1494 where
1495 T: Serialize,
1496 F: FnOnce(i64) -> T,
1497 {
1498 let id = self.next_id(name)?;
1499 let row = row(id);
1500 let row_json = serde_json::to_string(&row)?;
1501 let write = self.with_conn(move |conn| async move {
1502 let sql = format!("INSERT INTO {table} (id, row_json) VALUES (?1, ?2)");
1503 conn.execute(
1504 &sql,
1505 params_from_iter([Value::from(id), Value::from(row_json)]),
1506 )
1507 .await?;
1508 Ok(())
1509 });
1510 if let Err(error) = write {
1511 spool_write_error(table, id, &row, &error)?;
1512 }
1513 Ok(id)
1514 }
1515
1516 fn snapshot_scoped(&self, scope: Option<&HashSet<&'static str>>) -> Result<Snapshot> {
1521 let owned: Option<HashSet<&'static str>> = scope.cloned();
1522 self.with_read_only_conn(move |conn| async move {
1523 let wants = |t: &'static str| owned.as_ref().is_none_or(|s| s.contains(t));
1524 Ok(Snapshot {
1525 messages: if wants(MESSAGES) {
1526 rows(&conn, MESSAGES).await?
1527 } else {
1528 Vec::new()
1529 },
1530 cli_invocations: if wants(CLI_INVOCATIONS) {
1531 rows(&conn, CLI_INVOCATIONS).await?
1532 } else {
1533 Vec::new()
1534 },
1535 crashes: if wants(CRASHES) {
1536 rows(&conn, CRASHES).await?
1537 } else {
1538 Vec::new()
1539 },
1540 ticks: if wants(TICKS) {
1541 rows(&conn, TICKS).await?
1542 } else {
1543 Vec::new()
1544 },
1545 workspaces: if wants(WORKSPACES) {
1546 rows(&conn, WORKSPACES).await?
1547 } else {
1548 Vec::new()
1549 },
1550 sessions: if wants(SESSIONS) {
1551 rows(&conn, SESSIONS).await?
1552 } else {
1553 Vec::new()
1554 },
1555 clone_dispatches: if wants(CLONE_DISPATCHES) {
1556 rows(&conn, CLONE_DISPATCHES).await?
1557 } else {
1558 Vec::new()
1559 },
1560 harvest_events: if wants(HARVEST_EVENTS) {
1561 rows(&conn, HARVEST_EVENTS).await?
1562 } else {
1563 Vec::new()
1564 },
1565 communication_events: if wants(COMMUNICATION_EVENTS) {
1566 rows(&conn, COMMUNICATION_EVENTS).await?
1567 } else {
1568 Vec::new()
1569 },
1570 mcp_tool_calls: if wants(MCP_TOOL_CALLS) {
1571 rows(&conn, MCP_TOOL_CALLS).await?
1572 } else {
1573 Vec::new()
1574 },
1575 git_operations: if wants(GIT_OPERATIONS) {
1576 rows(&conn, GIT_OPERATIONS).await?
1577 } else {
1578 Vec::new()
1579 },
1580 owner_directives: if wants(OWNER_DIRECTIVES) {
1581 rows(&conn, OWNER_DIRECTIVES).await?
1582 } else {
1583 Vec::new()
1584 },
1585 token_usage: if wants(TOKEN_USAGE) {
1586 rows(&conn, TOKEN_USAGE).await?
1587 } else {
1588 Vec::new()
1589 },
1590 watchdog_events: if wants(WATCHDOG_EVENTS) {
1591 rows(&conn, WATCHDOG_EVENTS).await?
1592 } else {
1593 Vec::new()
1594 },
1595 tasks: if wants(TASKS) {
1596 rows(&conn, TASKS).await?
1597 } else {
1598 Vec::new()
1599 },
1600 source_errors: if wants(SOURCE_ERRORS) {
1601 rows(&conn, SOURCE_ERRORS).await?
1602 } else {
1603 Vec::new()
1604 },
1605 iroh_events: if wants(IROH_EVENTS) {
1606 rows(&conn, IROH_EVENTS).await?
1607 } else {
1608 Vec::new()
1609 },
1610 })
1611 })
1612 }
1613
1614 fn next_id(&self, name: &'static str) -> Result<i64> {
1615 let result = self.with_conn(move |conn| async move {
1616 let mut rows = conn
1617 .query(
1618 "INSERT INTO ids (name, id) VALUES (?1, 1) \
1619 ON CONFLICT(name) DO UPDATE SET id = id + 1 \
1620 RETURNING id",
1621 params_from_iter([Value::from(name)]),
1622 )
1623 .await?;
1624 if let Some(row) = rows.next().await? {
1625 return integer_value(&row.get_value(0)?);
1626 }
1627 Err(Error::NotFound(format!("id allocation for {name}")))
1628 });
1629 match result {
1630 Ok(id) => Ok(id),
1631 Err(error) => {
1632 let fallback = Utc::now().timestamp_micros();
1633 spool_error_json(serde_json::json!({
1634 "ts_utc": ts(Utc::now()),
1635 "table": name,
1636 "id": fallback,
1637 "error": error.to_string(),
1638 "record": {"kind": "id_allocation"}
1639 }))?;
1640 Ok(fallback)
1641 }
1642 }
1643 }
1644
    /// Open a fresh connection to the database file and run `f` on it,
    /// retrying the connection build on transient locking errors.
    ///
    /// The future is driven by `block_on_turso`, so `f` and its future must
    /// be `Send + 'static`.
    fn with_conn<T, F, Fut>(&self, f: F) -> Result<T>
    where
        T: Send + 'static,
        F: FnOnce(Connection) -> Fut + Send + 'static,
        Fut: std::future::Future<Output = Result<T>> + Send + 'static,
    {
        let path = self.path.clone();
        // Fewer retries when already inside a tokio runtime (where the work
        // runs on a helper thread). NOTE(review): presumably to fail fast for
        // async callers rather than stall them — confirm.
        let max_attempts = if tokio::runtime::Handle::try_current().is_ok() {
            3
        } else {
            10
        };
        let task = async move {
            let path = path.to_string_lossy().to_string();
            let mut last_error = None;
            // Inclusive range: max_attempts + 1 total tries.
            for attempt in 0..=max_attempts {
                match Builder::new_local(&path).build().await {
                    Ok(db) => {
                        let conn = db.connect()?;
                        configure_conn(&conn).await?;
                        return f(conn).await;
                    }
                    // Transient lock contention: back off briefly and retry.
                    Err(error) if is_locking_error(&error) && attempt < max_attempts => {
                        last_error = Some(error);
                        tokio::time::sleep(Duration::from_millis(10 + attempt * 2)).await;
                    }
                    // Any other build error is terminal.
                    Err(error) => return Err(error.into()),
                }
            }
            // Retries exhausted: surface the last locking error we saw.
            Err(last_error
                .map(Error::from)
                .unwrap_or_else(|| Error::NotFound("turso connection".to_string())))
        };
        block_on_turso(task)
    }
1680
1681 fn with_read_only_conn<T, F, Fut>(&self, f: F) -> Result<T>
1682 where
1683 T: Send + 'static,
1684 F: FnOnce(Connection) -> Fut + Send + 'static,
1685 Fut: std::future::Future<Output = Result<T>> + Send + 'static,
1686 {
1687 let path = self.path.clone();
1688 let task = async move {
1689 let path_str = path.to_string_lossy().to_string();
1690 let db = Builder::new_local(&path_str).build().await?;
1691 let conn = db.connect()?;
1692 configure_read_only_conn(&conn, &path).await?;
1693 f(conn).await
1694 };
1695 block_on_turso(task)
1696 }
1697
1698 #[cfg(test)]
1699 fn read_only_write_probe_fails(&self) -> Result<bool> {
1700 self.with_read_only_conn(|conn| async move {
1701 Ok(conn
1702 .execute(
1703 "INSERT INTO meta (key, value) VALUES ('read_only_probe', 1)",
1704 (),
1705 )
1706 .await
1707 .is_err())
1708 })
1709 }
1710}
1711
/// An in-memory copy of the storage tables, deserialized from each table's
/// `row_json` column. Tables outside the requested scope are left as empty
/// vectors (see `snapshot_scoped`).
struct Snapshot {
    messages: Vec<MessageRow>,
    cli_invocations: Vec<CliRow>,
    crashes: Vec<CrashRow>,
    ticks: Vec<TickRow>,
    workspaces: Vec<WorkspaceRow>,
    sessions: Vec<SessionRow>,
    clone_dispatches: Vec<CloneDispatchRow>,
    harvest_events: Vec<HarvestEventRow>,
    communication_events: Vec<CommunicationEventRow>,
    mcp_tool_calls: Vec<McpToolCallRow>,
    git_operations: Vec<GitOperationRow>,
    owner_directives: Vec<OwnerDirectiveRow>,
    token_usage: Vec<TokenUsageRow>,
    watchdog_events: Vec<WatchdogEventRow>,
    // Backed by the `netsky_tasks` storage table; exposed to DataFusion as `tasks`.
    tasks: Vec<TaskRow>,
    source_errors: Vec<SourceErrorRow>,
    iroh_events: Vec<IrohEventRow>,
}
1731
/// Prepare a writable connection: set the busy timeout and, when the schema
/// is not already at `SCHEMA_VERSION`, enable WAL mode and create the
/// bootstrap `meta`/`ids` tables.
// NOTE(review): only the bootstrap tables are created here; the remaining
// tables/migrations are presumably handled elsewhere — confirm.
async fn configure_conn(conn: &Connection) -> Result<()> {
    conn.busy_timeout(Duration::from_secs(10))?;
    // Fast path: schema already current, nothing to (re)create.
    if user_version(conn).await? == SCHEMA_VERSION {
        return Ok(());
    }
    // The journal_mode pragma returns a result row; drain it so the
    // statement completes before the next one runs.
    let mut wal = conn.query("PRAGMA journal_mode=WAL", ()).await?;
    while wal.next().await?.is_some() {}
    conn.execute_batch(
        "PRAGMA synchronous=NORMAL; \
         CREATE TABLE IF NOT EXISTS meta (key TEXT PRIMARY KEY, value INTEGER NOT NULL); \
         CREATE TABLE IF NOT EXISTS ids (name TEXT PRIMARY KEY, id INTEGER NOT NULL);",
    )
    .await?;
    Ok(())
}
1747
1748async fn configure_read_only_conn(conn: &Connection, path: &Path) -> Result<()> {
1749 conn.busy_timeout(Duration::from_secs(10))?;
1750 conn.execute("PRAGMA query_only = 1", ()).await?;
1751 if query_only(conn).await? != 1 {
1752 return Err(Error::ReadOnlyNotEnforced(path.to_path_buf()));
1753 }
1754 ensure_initialized(conn, path).await
1755}
1756
1757async fn ensure_initialized(conn: &Connection, path: &Path) -> Result<()> {
1758 if !table_exists(conn, "meta").await? {
1759 return Err(Error::NotInitialized(path.to_path_buf()));
1760 }
1761 let current = schema_version(conn).await?;
1762 if current == 0 {
1763 return Err(Error::NotInitialized(path.to_path_buf()));
1764 }
1765 if current > SCHEMA_VERSION {
1766 return Err(Error::FutureSchemaVersion {
1767 found: current,
1768 supported: SCHEMA_VERSION,
1769 });
1770 }
1771 Ok(())
1772}
1773
1774async fn table_exists(conn: &Connection, table: &str) -> Result<bool> {
1775 let mut rows = conn
1776 .query(
1777 "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1",
1778 params_from_iter([Value::from(table)]),
1779 )
1780 .await?;
1781 Ok(rows.next().await?.is_some())
1782}
1783
1784async fn query_only(conn: &Connection) -> Result<i64> {
1785 let mut rows = conn.query("PRAGMA query_only", ()).await?;
1786 let Some(row) = rows.next().await? else {
1787 return Ok(0);
1788 };
1789 integer_value(&row.get_value(0)?)
1790}
1791
1792async fn user_version(conn: &Connection) -> Result<i64> {
1793 let mut rows = conn.query("PRAGMA user_version", ()).await?;
1794 let Some(row) = rows.next().await? else {
1795 return Ok(0);
1796 };
1797 integer_value(&row.get_value(0)?)
1798}
1799
1800async fn schema_version(conn: &Connection) -> Result<i64> {
1801 let mut rows = conn
1802 .query(
1803 "SELECT value FROM meta WHERE key = ?1",
1804 params_from_iter([Value::from("schema_version")]),
1805 )
1806 .await?;
1807 let Some(row) = rows.next().await? else {
1808 return Ok(0);
1809 };
1810 integer_value(&row.get_value(0)?)
1811}
1812
1813async fn rows<T>(conn: &Connection, table: &str) -> Result<Vec<T>>
1814where
1815 T: DeserializeOwned,
1816{
1817 let sql = format!("SELECT row_json FROM {table} ORDER BY id");
1818 let mut out = Vec::new();
1819 let mut rows = conn.query(&sql, ()).await?;
1820 while let Some(row) = rows.next().await? {
1821 let json = text_value(&row.get_value(0)?)?;
1822 out.push(serde_json::from_str(&json)?);
1823 }
1824 Ok(out)
1825}
1826
1827fn integer_value(value: &Value) -> Result<i64> {
1828 value
1829 .as_integer()
1830 .copied()
1831 .ok_or_else(|| Error::NotFound("integer value".to_string()))
1832}
1833
1834fn text_value(value: &Value) -> Result<String> {
1835 value
1836 .as_text()
1837 .cloned()
1838 .ok_or_else(|| Error::NotFound("text value".to_string()))
1839}
1840
/// Drive a turso future to completion from synchronous code.
///
/// Calling `Runtime::block_on` from inside an existing tokio runtime panics,
/// so in that case the future is shipped to a dedicated thread with its own
/// runtime; otherwise a runtime is created and used inline. A panicked worker
/// thread is reported as `NotFound("turso worker thread")` rather than
/// re-raising the panic in the caller.
fn block_on_turso<T, Fut>(future: Fut) -> Result<T>
where
    T: Send + 'static,
    Fut: std::future::Future<Output = Result<T>> + Send + 'static,
{
    if tokio::runtime::Handle::try_current().is_ok() {
        std::thread::spawn(move || Runtime::new()?.block_on(future))
            .join()
            .unwrap_or_else(|_| Err(Error::NotFound("turso worker thread".to_string())))
    } else {
        Runtime::new()?.block_on(future)
    }
}
1854
1855fn is_locking_error(error: &turso::Error) -> bool {
1856 let message = error.to_string();
1857 message.contains("Locking error") || message.contains("locked")
1858}
1859
1860fn spool_write_error<T: Serialize>(table: &str, id: i64, record: &T, error: &Error) -> Result<()> {
1861 spool_error_json(serde_json::json!({
1862 "ts_utc": ts(Utc::now()),
1863 "table": table,
1864 "id": id,
1865 "error": error.to_string(),
1866 "record": record,
1867 }))
1868}
1869
1870fn spool_error_json(value: serde_json::Value) -> Result<()> {
1871 let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
1872 let dir = home.join(".netsky").join("logs");
1873 fs::create_dir_all(&dir)?;
1874 let path = dir.join(format!(
1875 "meta-db-errors-{}.jsonl",
1876 Utc::now().format("%Y-%m-%d")
1877 ));
1878 if let Err(spool_error) = netsky_core::jsonl::append_json_line(&path, &value) {
1879 write_spool_failure_marker(&value, &spool_error)?;
1880 return Err(Error::Io(spool_error));
1881 }
1882 Ok(())
1883}
1884
1885fn write_spool_failure_marker(
1886 value: &serde_json::Value,
1887 spool_error: &std::io::Error,
1888) -> Result<()> {
1889 let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
1890 let dir = home.join(".netsky").join("state");
1891 fs::create_dir_all(&dir)?;
1892 let path = dir.join("meta-db-spool-failed");
1893 let body = serde_json::json!({
1894 "ts_utc": ts(Utc::now()),
1895 "spool_error": spool_error.to_string(),
1896 "record": value,
1897 });
1898 fs::write(path, serde_json::to_vec_pretty(&body)?)?;
1899 Ok(())
1900}
1901
1902fn referenced_storage_tables(sql: &str) -> Option<HashSet<&'static str>> {
1908 let lower = sql.to_ascii_lowercase();
1909 let mut out: HashSet<&'static str> = HashSet::new();
1910 for &(df_name, storage) in &DATAFUSION_TABLES {
1911 if contains_word(&lower, df_name) {
1912 out.insert(storage);
1913 }
1914 }
1915 if contains_word(&lower, CLONE_LIFETIMES_VIEW) {
1916 out.insert(CLONE_DISPATCHES);
1917 }
1918 if out.is_empty() { None } else { Some(out) }
1919}
1920
/// True when `needle` occurs in `haystack` as a whole identifier-delimited
/// word — i.e. not as a fragment of a longer identifier such as
/// `messages_old` containing `messages`.
fn contains_word(haystack: &str, needle: &str) -> bool {
    if needle.is_empty() || needle.len() > haystack.len() {
        return false;
    }
    let bytes = haystack.as_bytes();
    let mut from = 0;
    // Walk every occurrence until one has non-identifier bytes on both sides.
    while let Some(rel) = haystack[from..].find(needle) {
        let begin = from + rel;
        let finish = begin + needle.len();
        let left_clear = begin == 0 || !is_ident_byte(bytes[begin - 1]);
        let right_clear = finish == bytes.len() || !is_ident_byte(bytes[finish]);
        if left_clear && right_clear {
            return true;
        }
        from = begin + 1;
    }
    false
}

/// Identifier characters: underscore plus ASCII alphanumerics.
fn is_ident_byte(b: u8) -> bool {
    b == b'_' || b.is_ascii_alphanumeric()
}
1950
1951fn register(ctx: &SessionContext, name: &str, batch: RecordBatch) -> Result<()> {
1952 let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
1953 ctx.register_table(name, Arc::new(table))?;
1954 Ok(())
1955}
1956
1957async fn register_snapshot(snapshot: Snapshot) -> Result<SessionContext> {
1958 let ctx = SessionContext::new();
1959 register(&ctx, "messages", messages_batch(snapshot.messages)?)?;
1960 register(
1961 &ctx,
1962 "cli_invocations",
1963 cli_batch(snapshot.cli_invocations)?,
1964 )?;
1965 register(&ctx, "crashes", crashes_batch(snapshot.crashes)?)?;
1966 register(&ctx, "ticks", ticks_batch(snapshot.ticks)?)?;
1967 register(&ctx, "workspaces", workspaces_batch(snapshot.workspaces)?)?;
1968 register(&ctx, "sessions", sessions_batch(snapshot.sessions)?)?;
1969 register(
1970 &ctx,
1971 "clone_dispatches",
1972 clone_dispatches_batch(snapshot.clone_dispatches)?,
1973 )?;
1974 register(
1975 &ctx,
1976 "harvest_events",
1977 harvest_events_batch(snapshot.harvest_events)?,
1978 )?;
1979 register(
1980 &ctx,
1981 "communication_events",
1982 communication_events_batch(snapshot.communication_events)?,
1983 )?;
1984 register(
1985 &ctx,
1986 "mcp_tool_calls",
1987 mcp_tool_calls_batch(snapshot.mcp_tool_calls)?,
1988 )?;
1989 register(
1990 &ctx,
1991 "git_operations",
1992 git_operations_batch(snapshot.git_operations)?,
1993 )?;
1994 register(
1995 &ctx,
1996 "owner_directives",
1997 owner_directives_batch(snapshot.owner_directives)?,
1998 )?;
1999 register(
2000 &ctx,
2001 "token_usage",
2002 token_usage_batch(snapshot.token_usage)?,
2003 )?;
2004 register(
2005 &ctx,
2006 "watchdog_events",
2007 watchdog_events_batch(snapshot.watchdog_events)?,
2008 )?;
2009 register(&ctx, "tasks", tasks_batch(snapshot.tasks)?)?;
2010 register(
2011 &ctx,
2012 "source_errors",
2013 source_errors_batch(snapshot.source_errors)?,
2014 )?;
2015 register(
2016 &ctx,
2017 "iroh_events",
2018 iroh_events_batch(snapshot.iroh_events)?,
2019 )?;
2020 ctx.sql(
2021 "CREATE VIEW clone_lifetimes AS \
2022 SELECT id, agent_id, runtime, workspace, branch, status, \
2023 ts_utc_start, ts_utc_end \
2024 FROM clone_dispatches \
2025 WHERE ts_utc_end IS NOT NULL",
2026 )
2027 .await?;
2028 Ok(ctx)
2029}
2030
2031fn messages_batch(rows: Vec<MessageRow>) -> Result<RecordBatch> {
2032 Ok(RecordBatch::try_new(
2033 schema(&[
2034 ("id", DataType::Int64, false),
2035 ("ts_utc", DataType::Utf8, false),
2036 ("source", DataType::Utf8, false),
2037 ("direction", DataType::Utf8, false),
2038 ("chat_id", DataType::Utf8, true),
2039 ("from_agent", DataType::Utf8, true),
2040 ("to_agent", DataType::Utf8, true),
2041 ("body", DataType::Utf8, true),
2042 ("raw_json", DataType::Utf8, true),
2043 ]),
2044 vec![
2045 ids(&rows, |r| r.id),
2046 string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2047 string(rows.iter().map(|r| Some(r.source.as_str()))),
2048 string(rows.iter().map(|r| Some(r.direction.as_str()))),
2049 opt_string(rows.iter().map(|r| r.chat_id.as_deref())),
2050 opt_string(rows.iter().map(|r| r.from_agent.as_deref())),
2051 opt_string(rows.iter().map(|r| r.to_agent.as_deref())),
2052 opt_string(rows.iter().map(|r| r.body.as_deref())),
2053 opt_string(rows.iter().map(|r| r.raw_json.as_deref())),
2054 ],
2055 )?)
2056}
2057
2058fn cli_batch(rows: Vec<CliRow>) -> Result<RecordBatch> {
2059 Ok(RecordBatch::try_new(
2060 schema(&[
2061 ("id", DataType::Int64, false),
2062 ("ts_utc", DataType::Utf8, false),
2063 ("bin", DataType::Utf8, false),
2064 ("argv_json", DataType::Utf8, false),
2065 ("exit_code", DataType::Int64, true),
2066 ("duration_ms", DataType::Int64, true),
2067 ("host", DataType::Utf8, false),
2068 ]),
2069 vec![
2070 ids(&rows, |r| r.id),
2071 string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2072 string(rows.iter().map(|r| Some(r.bin.as_str()))),
2073 string(rows.iter().map(|r| Some(r.argv_json.as_str()))),
2074 int64_opt(rows.iter().map(|r| r.exit_code)),
2075 int64_opt(rows.iter().map(|r| r.duration_ms)),
2076 string(rows.iter().map(|r| Some(r.host.as_str()))),
2077 ],
2078 )?)
2079}
2080
2081fn crashes_batch(rows: Vec<CrashRow>) -> Result<RecordBatch> {
2082 simple_event_batch(
2083 schema(&[
2084 ("id", DataType::Int64, false),
2085 ("ts_utc", DataType::Utf8, false),
2086 ("kind", DataType::Utf8, false),
2087 ("agent", DataType::Utf8, false),
2088 ("detail_json", DataType::Utf8, false),
2089 ]),
2090 rows.iter()
2091 .map(|r| (r.id, &r.ts_utc, &r.kind, &r.agent, &r.detail_json)),
2092 )
2093}
2094
2095fn ticks_batch(rows: Vec<TickRow>) -> Result<RecordBatch> {
2096 Ok(RecordBatch::try_new(
2097 schema(&[
2098 ("id", DataType::Int64, false),
2099 ("ts_utc", DataType::Utf8, false),
2100 ("source", DataType::Utf8, false),
2101 ("detail_json", DataType::Utf8, false),
2102 ]),
2103 vec![
2104 ids(&rows, |r| r.id),
2105 string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2106 string(rows.iter().map(|r| Some(r.source.as_str()))),
2107 string(rows.iter().map(|r| Some(r.detail_json.as_str()))),
2108 ],
2109 )?)
2110}
2111
2112fn workspaces_batch(rows: Vec<WorkspaceRow>) -> Result<RecordBatch> {
2113 Ok(RecordBatch::try_new(
2114 schema(&[
2115 ("id", DataType::Int64, false),
2116 ("ts_utc_created", DataType::Utf8, false),
2117 ("name", DataType::Utf8, false),
2118 ("branch", DataType::Utf8, false),
2119 ("ts_utc_deleted", DataType::Utf8, true),
2120 ("verdict", DataType::Utf8, true),
2121 ]),
2122 vec![
2123 ids(&rows, |r| r.id),
2124 string(rows.iter().map(|r| Some(r.ts_utc_created.as_str()))),
2125 string(rows.iter().map(|r| Some(r.name.as_str()))),
2126 string(rows.iter().map(|r| Some(r.branch.as_str()))),
2127 opt_string(rows.iter().map(|r| r.ts_utc_deleted.as_deref())),
2128 opt_string(rows.iter().map(|r| r.verdict.as_deref())),
2129 ],
2130 )?)
2131}
2132
2133fn sessions_batch(rows: Vec<SessionRow>) -> Result<RecordBatch> {
2134 Ok(RecordBatch::try_new(
2135 schema(&[
2136 ("id", DataType::Int64, false),
2137 ("ts_utc", DataType::Utf8, false),
2138 ("agent", DataType::Utf8, false),
2139 ("session_num", DataType::Int64, false),
2140 ("event", DataType::Utf8, false),
2141 ]),
2142 vec![
2143 ids(&rows, |r| r.id),
2144 string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2145 string(rows.iter().map(|r| Some(r.agent.as_str()))),
2146 int64(rows.iter().map(|r| r.session_num)),
2147 string(rows.iter().map(|r| Some(r.event.as_str()))),
2148 ],
2149 )?)
2150}
2151
2152fn clone_dispatches_batch(rows: Vec<CloneDispatchRow>) -> Result<RecordBatch> {
2153 Ok(RecordBatch::try_new(
2154 schema(&[
2155 ("id", DataType::Int64, false),
2156 ("ts_utc_start", DataType::Utf8, false),
2157 ("ts_utc_end", DataType::Utf8, true),
2158 ("agent_id", DataType::Utf8, false),
2159 ("runtime", DataType::Utf8, true),
2160 ("brief_path", DataType::Utf8, true),
2161 ("brief", DataType::Utf8, true),
2162 ("workspace", DataType::Utf8, true),
2163 ("branch", DataType::Utf8, true),
2164 ("status", DataType::Utf8, true),
2165 ("exit_code", DataType::Int64, true),
2166 ("detail_json", DataType::Utf8, true),
2167 ]),
2168 vec![
2169 ids(&rows, |r| r.id),
2170 string(rows.iter().map(|r| Some(r.ts_utc_start.as_str()))),
2171 opt_string(rows.iter().map(|r| r.ts_utc_end.as_deref())),
2172 string(rows.iter().map(|r| Some(r.agent_id.as_str()))),
2173 opt_string(rows.iter().map(|r| r.runtime.as_deref())),
2174 opt_string(rows.iter().map(|r| r.brief_path.as_deref())),
2175 opt_string(rows.iter().map(|r| r.brief.as_deref())),
2176 opt_string(rows.iter().map(|r| r.workspace.as_deref())),
2177 opt_string(rows.iter().map(|r| r.branch.as_deref())),
2178 opt_string(rows.iter().map(|r| r.status.as_deref())),
2179 int64_opt(rows.iter().map(|r| r.exit_code)),
2180 opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
2181 ],
2182 )?)
2183}
2184
2185fn harvest_events_batch(rows: Vec<HarvestEventRow>) -> Result<RecordBatch> {
2186 Ok(RecordBatch::try_new(
2187 schema(&[
2188 ("id", DataType::Int64, false),
2189 ("ts_utc", DataType::Utf8, false),
2190 ("source_branch", DataType::Utf8, false),
2191 ("target_branch", DataType::Utf8, false),
2192 ("commit_sha", DataType::Utf8, true),
2193 ("status", DataType::Utf8, false),
2194 ("conflicts", DataType::Utf8, true),
2195 ("detail_json", DataType::Utf8, true),
2196 ]),
2197 vec![
2198 ids(&rows, |r| r.id),
2199 string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2200 string(rows.iter().map(|r| Some(r.source_branch.as_str()))),
2201 string(rows.iter().map(|r| Some(r.target_branch.as_str()))),
2202 opt_string(rows.iter().map(|r| r.commit_sha.as_deref())),
2203 string(rows.iter().map(|r| Some(r.status.as_str()))),
2204 opt_string(rows.iter().map(|r| r.conflicts.as_deref())),
2205 opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
2206 ],
2207 )?)
2208}
2209
2210fn communication_events_batch(rows: Vec<CommunicationEventRow>) -> Result<RecordBatch> {
2211 Ok(RecordBatch::try_new(
2212 schema(&[
2213 ("id", DataType::Int64, false),
2214 ("ts_utc", DataType::Utf8, false),
2215 ("source", DataType::Utf8, false),
2216 ("tool", DataType::Utf8, true),
2217 ("direction", DataType::Utf8, false),
2218 ("chat_id", DataType::Utf8, true),
2219 ("message_id", DataType::Utf8, true),
2220 ("handle", DataType::Utf8, true),
2221 ("agent", DataType::Utf8, true),
2222 ("body", DataType::Utf8, true),
2223 ("status", DataType::Utf8, true),
2224 ("detail_json", DataType::Utf8, true),
2225 ]),
2226 vec![
2227 ids(&rows, |r| r.id),
2228 string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2229 string(rows.iter().map(|r| Some(r.source.as_str()))),
2230 opt_string(rows.iter().map(|r| r.tool.as_deref())),
2231 string(rows.iter().map(|r| Some(r.direction.as_str()))),
2232 opt_string(rows.iter().map(|r| r.chat_id.as_deref())),
2233 opt_string(rows.iter().map(|r| r.message_id.as_deref())),
2234 opt_string(rows.iter().map(|r| r.handle.as_deref())),
2235 opt_string(rows.iter().map(|r| r.agent.as_deref())),
2236 opt_string(rows.iter().map(|r| r.body.as_deref())),
2237 opt_string(rows.iter().map(|r| r.status.as_deref())),
2238 opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
2239 ],
2240 )?)
2241}
2242
2243fn mcp_tool_calls_batch(rows: Vec<McpToolCallRow>) -> Result<RecordBatch> {
2244 Ok(RecordBatch::try_new(
2245 schema(&[
2246 ("id", DataType::Int64, false),
2247 ("ts_utc_start", DataType::Utf8, false),
2248 ("ts_utc_end", DataType::Utf8, true),
2249 ("source", DataType::Utf8, false),
2250 ("tool", DataType::Utf8, false),
2251 ("agent", DataType::Utf8, true),
2252 ("duration_ms", DataType::Int64, true),
2253 ("success", DataType::Boolean, false),
2254 ("error", DataType::Utf8, true),
2255 ("timeout_race", DataType::Boolean, false),
2256 ("request_json", DataType::Utf8, true),
2257 ("response_json", DataType::Utf8, true),
2258 ]),
2259 vec![
2260 ids(&rows, |r| r.id),
2261 string(rows.iter().map(|r| Some(r.ts_utc_start.as_str()))),
2262 opt_string(rows.iter().map(|r| r.ts_utc_end.as_deref())),
2263 string(rows.iter().map(|r| Some(r.source.as_str()))),
2264 string(rows.iter().map(|r| Some(r.tool.as_str()))),
2265 opt_string(rows.iter().map(|r| r.agent.as_deref())),
2266 int64_opt(rows.iter().map(|r| r.duration_ms)),
2267 bools(rows.iter().map(|r| r.success)),
2268 opt_string(rows.iter().map(|r| r.error.as_deref())),
2269 bools(rows.iter().map(|r| r.timeout_race)),
2270 opt_string(rows.iter().map(|r| r.request_json.as_deref())),
2271 opt_string(rows.iter().map(|r| r.response_json.as_deref())),
2272 ],
2273 )?)
2274}
2275
2276fn git_operations_batch(rows: Vec<GitOperationRow>) -> Result<RecordBatch> {
2277 Ok(RecordBatch::try_new(
2278 schema(&[
2279 ("id", DataType::Int64, false),
2280 ("ts_utc", DataType::Utf8, false),
2281 ("operation", DataType::Utf8, false),
2282 ("repo", DataType::Utf8, false),
2283 ("branch", DataType::Utf8, true),
2284 ("remote", DataType::Utf8, true),
2285 ("from_sha", DataType::Utf8, true),
2286 ("to_sha", DataType::Utf8, true),
2287 ("status", DataType::Utf8, false),
2288 ("detail_json", DataType::Utf8, true),
2289 ]),
2290 vec![
2291 ids(&rows, |r| r.id),
2292 string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2293 string(rows.iter().map(|r| Some(r.operation.as_str()))),
2294 string(rows.iter().map(|r| Some(r.repo.as_str()))),
2295 opt_string(rows.iter().map(|r| r.branch.as_deref())),
2296 opt_string(rows.iter().map(|r| r.remote.as_deref())),
2297 opt_string(rows.iter().map(|r| r.from_sha.as_deref())),
2298 opt_string(rows.iter().map(|r| r.to_sha.as_deref())),
2299 string(rows.iter().map(|r| Some(r.status.as_str()))),
2300 opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
2301 ],
2302 )?)
2303}
2304
2305fn owner_directives_batch(rows: Vec<OwnerDirectiveRow>) -> Result<RecordBatch> {
2306 Ok(RecordBatch::try_new(
2307 schema(&[
2308 ("id", DataType::Int64, false),
2309 ("ts_utc", DataType::Utf8, false),
2310 ("source", DataType::Utf8, false),
2311 ("chat_id", DataType::Utf8, true),
2312 ("raw_text", DataType::Utf8, false),
2313 ("resolved_action", DataType::Utf8, true),
2314 ("agent", DataType::Utf8, true),
2315 ("status", DataType::Utf8, true),
2316 ("detail_json", DataType::Utf8, true),
2317 ]),
2318 vec![
2319 ids(&rows, |r| r.id),
2320 string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2321 string(rows.iter().map(|r| Some(r.source.as_str()))),
2322 opt_string(rows.iter().map(|r| r.chat_id.as_deref())),
2323 string(rows.iter().map(|r| Some(r.raw_text.as_str()))),
2324 opt_string(rows.iter().map(|r| r.resolved_action.as_deref())),
2325 opt_string(rows.iter().map(|r| r.agent.as_deref())),
2326 opt_string(rows.iter().map(|r| r.status.as_deref())),
2327 opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
2328 ],
2329 )?)
2330}
2331
2332fn token_usage_batch(rows: Vec<TokenUsageRow>) -> Result<RecordBatch> {
2333 Ok(RecordBatch::try_new(
2334 schema(&[
2335 ("id", DataType::Int64, false),
2336 ("ts_utc", DataType::Utf8, false),
2337 ("session_id", DataType::Utf8, true),
2338 ("agent", DataType::Utf8, true),
2339 ("runtime", DataType::Utf8, true),
2340 ("model", DataType::Utf8, true),
2341 ("input_tokens", DataType::Int64, true),
2342 ("output_tokens", DataType::Int64, true),
2343 ("cached_input_tokens", DataType::Int64, true),
2344 ("cost_usd_micros", DataType::Int64, true),
2345 ("detail_json", DataType::Utf8, true),
2346 ]),
2347 vec![
2348 ids(&rows, |r| r.id),
2349 string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2350 opt_string(rows.iter().map(|r| r.session_id.as_deref())),
2351 opt_string(rows.iter().map(|r| r.agent.as_deref())),
2352 opt_string(rows.iter().map(|r| r.runtime.as_deref())),
2353 opt_string(rows.iter().map(|r| r.model.as_deref())),
2354 int64_opt(rows.iter().map(|r| r.input_tokens)),
2355 int64_opt(rows.iter().map(|r| r.output_tokens)),
2356 int64_opt(rows.iter().map(|r| r.cached_input_tokens)),
2357 int64_opt(rows.iter().map(|r| r.cost_usd_micros)),
2358 opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
2359 ],
2360 )?)
2361}
2362
2363fn source_errors_batch(rows: Vec<SourceErrorRow>) -> Result<RecordBatch> {
2364 Ok(RecordBatch::try_new(
2365 schema(&[
2366 ("id", DataType::Int64, false),
2367 ("ts_utc", DataType::Utf8, false),
2368 ("source", DataType::Utf8, false),
2369 ("error_class", DataType::Utf8, false),
2370 ("count", DataType::Int64, false),
2371 ("detail_json", DataType::Utf8, true),
2372 ]),
2373 vec![
2374 ids(&rows, |r| r.id),
2375 string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2376 string(rows.iter().map(|r| Some(r.source.as_str()))),
2377 string(rows.iter().map(|r| Some(r.error_class.as_str()))),
2378 int64(rows.iter().map(|r| r.count)),
2379 opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
2380 ],
2381 )?)
2382}
2383
2384fn iroh_events_batch(rows: Vec<IrohEventRow>) -> Result<RecordBatch> {
2385 Ok(RecordBatch::try_new(
2386 schema(&[
2387 ("id", DataType::Int64, false),
2388 ("ts_utc", DataType::Utf8, false),
2389 ("event_type", DataType::Utf8, false),
2390 ("peer_id_hash", DataType::Utf8, false),
2391 ("peer_label", DataType::Utf8, true),
2392 ("detail_json", DataType::Utf8, true),
2393 ]),
2394 vec![
2395 ids(&rows, |r| r.id),
2396 string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2397 string(rows.iter().map(|r| Some(r.event_type.as_str()))),
2398 string(rows.iter().map(|r| Some(r.peer_id_hash.as_str()))),
2399 opt_string(rows.iter().map(|r| r.peer_label.as_deref())),
2400 opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
2401 ],
2402 )?)
2403}
2404
2405fn watchdog_events_batch(rows: Vec<WatchdogEventRow>) -> Result<RecordBatch> {
2406 Ok(RecordBatch::try_new(
2407 schema(&[
2408 ("id", DataType::Int64, false),
2409 ("ts_utc", DataType::Utf8, false),
2410 ("event", DataType::Utf8, false),
2411 ("agent", DataType::Utf8, true),
2412 ("severity", DataType::Utf8, true),
2413 ("status", DataType::Utf8, true),
2414 ("detail_json", DataType::Utf8, true),
2415 ]),
2416 vec![
2417 ids(&rows, |r| r.id),
2418 string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2419 string(rows.iter().map(|r| Some(r.event.as_str()))),
2420 opt_string(rows.iter().map(|r| r.agent.as_deref())),
2421 opt_string(rows.iter().map(|r| r.severity.as_deref())),
2422 opt_string(rows.iter().map(|r| r.status.as_deref())),
2423 opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
2424 ],
2425 )?)
2426}
2427
2428fn tasks_batch(rows: Vec<TaskRow>) -> Result<RecordBatch> {
2429 Ok(RecordBatch::try_new(
2430 schema(&[
2431 ("id", DataType::Int64, false),
2432 ("created_at", DataType::Utf8, false),
2433 ("updated_at", DataType::Utf8, false),
2434 ("title", DataType::Utf8, false),
2435 ("body", DataType::Utf8, true),
2436 ("status", DataType::Utf8, false),
2437 ("priority", DataType::Utf8, true),
2438 ("labels", DataType::Utf8, true),
2439 ("source", DataType::Utf8, true),
2440 ("source_ref", DataType::Utf8, true),
2441 ("closed_at", DataType::Utf8, true),
2442 ("closed_reason", DataType::Utf8, true),
2443 ("closed_evidence", DataType::Utf8, true),
2444 ("agent", DataType::Utf8, true),
2445 ("sync_calendar", DataType::Boolean, false),
2446 ("calendar_task_id", DataType::Utf8, true),
2447 ]),
2448 vec![
2449 ids(&rows, |r| r.id),
2450 string(rows.iter().map(|r| Some(r.created_at.as_str()))),
2451 string(rows.iter().map(|r| Some(r.updated_at.as_str()))),
2452 string(rows.iter().map(|r| Some(r.title.as_str()))),
2453 opt_string(rows.iter().map(|r| r.body.as_deref())),
2454 string(rows.iter().map(|r| Some(r.status.as_str()))),
2455 opt_string(rows.iter().map(|r| r.priority.as_deref())),
2456 opt_string(rows.iter().map(|r| r.labels.as_deref())),
2457 opt_string(rows.iter().map(|r| r.source.as_deref())),
2458 opt_string(rows.iter().map(|r| r.source_ref.as_deref())),
2459 opt_string(rows.iter().map(|r| r.closed_at.as_deref())),
2460 opt_string(rows.iter().map(|r| r.closed_reason.as_deref())),
2461 opt_string(rows.iter().map(|r| r.closed_evidence.as_deref())),
2462 opt_string(rows.iter().map(|r| r.agent.as_deref())),
2463 bools(rows.iter().map(|r| r.sync_calendar)),
2464 opt_string(rows.iter().map(|r| r.calendar_task_id.as_deref())),
2465 ],
2466 )?)
2467}
2468
2469fn simple_event_batch<'a>(
2470 schema: SchemaRef,
2471 rows: impl Iterator<Item = (i64, &'a String, &'a String, &'a String, &'a String)>,
2472) -> Result<RecordBatch> {
2473 let rows = rows.collect::<Vec<_>>();
2474 Ok(RecordBatch::try_new(
2475 schema,
2476 vec![
2477 Arc::new(Int64Array::from(
2478 rows.iter().map(|r| r.0).collect::<Vec<_>>(),
2479 )),
2480 string(rows.iter().map(|r| Some(r.1.as_str()))),
2481 string(rows.iter().map(|r| Some(r.2.as_str()))),
2482 string(rows.iter().map(|r| Some(r.3.as_str()))),
2483 string(rows.iter().map(|r| Some(r.4.as_str()))),
2484 ],
2485 )?)
2486}
2487
2488fn schema(fields: &[(&str, DataType, bool)]) -> SchemaRef {
2489 Arc::new(Schema::new(
2490 fields
2491 .iter()
2492 .map(|(name, ty, nullable)| Field::new(*name, ty.clone(), *nullable))
2493 .collect::<Vec<_>>(),
2494 ))
2495}
2496
2497fn ids<T>(rows: &[T], id: impl Fn(&T) -> i64) -> Arc<Int64Array> {
2498 int64(rows.iter().map(id))
2499}
2500
2501fn int64(values: impl Iterator<Item = i64>) -> Arc<Int64Array> {
2502 Arc::new(Int64Array::from(values.collect::<Vec<_>>()))
2503}
2504
2505fn int64_opt(values: impl Iterator<Item = Option<i64>>) -> Arc<Int64Array> {
2506 Arc::new(Int64Array::from(values.collect::<Vec<_>>()))
2507}
2508
2509fn bools(values: impl Iterator<Item = bool>) -> Arc<BooleanArray> {
2510 Arc::new(BooleanArray::from(values.collect::<Vec<_>>()))
2511}
2512
2513fn string<'a>(values: impl Iterator<Item = Option<&'a str>>) -> Arc<StringArray> {
2514 Arc::new(StringArray::from(values.collect::<Vec<_>>()))
2515}
2516
2517fn opt_string<'a>(values: impl Iterator<Item = Option<&'a str>>) -> Arc<StringArray> {
2518 string(values)
2519}
2520
/// Formats a UTC timestamp in the canonical stored form: RFC 3339 with
/// millisecond precision and a trailing `Z`, e.g. `2026-04-15T23:00:00.000Z`.
fn ts(ts_utc: DateTime<Utc>) -> String {
    ts_utc.to_rfc3339_opts(SecondsFormat::Millis, true)
}
2524
2525fn parse_ts(ts_utc: &str) -> Result<DateTime<Utc>> {
2526 Ok(DateTime::parse_from_rfc3339(ts_utc)?.with_timezone(&Utc))
2527}
2528
/// Returns at most `max_chars` characters of `value`.
/// Truncation is by Unicode scalar value, not bytes, so multi-byte
/// characters are never split.
fn truncate(value: &str, max_chars: usize) -> String {
    match value.char_indices().nth(max_chars) {
        // `cut` is the byte offset of the first char past the limit.
        Some((cut, _)) => value[..cut].to_string(),
        None => value.to_string(),
    }
}
2532
/// Char-based truncation for an optional string; `None` passes through.
fn truncate_opt(value: Option<&str>, max_chars: usize) -> Option<String> {
    match value {
        Some(s) => Some(s.chars().take(max_chars).collect()),
        None => None,
    }
}
2536
2537#[cfg(test)]
2538mod tests {
2539 use super::*;
2540
2541 fn db() -> Result<Db> {
2542 let db = Db::open_in_memory()?;
2543 db.migrate()?;
2544 Ok(db)
2545 }
2546
    /// Deterministic timestamp shared by all tests; stored rows render it as
    /// "2026-04-15T23:00:00.000Z" (see `message_round_trip`).
    fn fixed_ts() -> DateTime<Utc> {
        chrono::DateTime::parse_from_rfc3339("2026-04-15T23:00:00Z")
            .unwrap()
            .with_timezone(&Utc)
    }
2552
    #[test]
    // A single-table query yields a scope containing exactly that table.
    fn referenced_tables_extracts_scope() {
        let scope =
            referenced_storage_tables("SELECT COUNT(*) FROM messages WHERE ts_utc >= '2026-04-18'")
                .expect("single-table scope");
        assert!(scope.contains(MESSAGES));
        assert!(!scope.contains(CRASHES));
        assert_eq!(scope.len(), 1);
    }
2562
    #[test]
    // Querying the `clone_lifetimes` view resolves to its backing storage
    // table, `clone_dispatches`.
    fn referenced_tables_handles_alias_and_view() {
        let scope = referenced_storage_tables(
            "SELECT ts_utc_start FROM clone_lifetimes WHERE status IS NOT NULL",
        )
        .expect("view resolves to backing table");
        assert!(scope.contains(CLONE_DISPATCHES));
    }
2571
    #[test]
    // The DataFusion-facing name `tasks` maps to the storage table
    // `netsky_tasks`; the scope must contain only the storage name.
    fn referenced_tables_maps_datafusion_name_to_storage() {
        let scope =
            referenced_storage_tables("SELECT title FROM tasks").expect("tasks -> netsky_tasks");
        assert!(scope.contains(TASKS));
        assert!(!scope.contains("tasks"));
    }
2579
    #[test]
    // SQL referencing no known table produces no scope at all.
    fn referenced_tables_unknown_sql_returns_none() {
        assert!(referenced_storage_tables("SELECT 1").is_none());
    }
2584
    #[test]
    // A two-table JOIN contributes both tables to the scope.
    fn referenced_tables_join_captures_both() {
        let scope = referenced_storage_tables(
            "SELECT a.id FROM messages a JOIN crashes b ON a.ts_utc = b.ts_utc",
        )
        .expect("two tables");
        assert!(scope.contains(MESSAGES));
        assert!(scope.contains(CRASHES));
    }
2594
    #[test]
    // Verifies list_tasks filtering (by status and/or priority) and its
    // ordering contract: status, then priority, then id.
    fn list_tasks_pushes_filter_and_order_into_sql() {
        let db = db()?;
        // Minimal task factory: only title/status/priority vary per case.
        let mk = |title: &'static str, status: &'static str, priority: &'static str| {
            db.record_task(TaskRecord {
                title,
                body: None,
                status,
                priority: Some(priority),
                labels: None,
                source: None,
                source_ref: None,
                closed_at: None,
                closed_reason: None,
                closed_evidence: None,
                agent: None,
                sync_calendar: false,
                calendar_task_id: None,
            })
        };
        mk("c", "open", "p2")?;
        mk("a", "open", "p1")?;
        mk("b", "in_progress", "p1")?;
        mk("d", "closed", "p3")?;

        let all = db.list_tasks(None, None)?;
        let order: Vec<_> = all.iter().map(|r| r.title.as_str()).collect();
        assert_eq!(
            order,
            vec!["d", "b", "a", "c"],
            "status then priority then id"
        );

        // Single-dimension filters.
        let open_only = db.list_tasks(Some("open"), None)?;
        assert_eq!(open_only.len(), 2);
        assert!(open_only.iter().all(|r| r.status == "open"));

        let p1 = db.list_tasks(None, Some("p1"))?;
        assert_eq!(p1.len(), 2);
        assert!(p1.iter().all(|r| r.priority.as_deref() == Some("p1")));

        // Combined filter narrows to exactly one row.
        let open_p1 = db.list_tasks(Some("open"), Some("p1"))?;
        assert_eq!(open_p1.len(), 1);
        assert_eq!(open_p1[0].title, "a");
        Ok(())
    }
2641
    #[test]
    // After migration, every (table, column) pair declared in
    // INDEXED_TIME_COLUMNS and INDEXED_EQ_COLUMNS must have an index named
    // `idx_{table}_{column}` in sqlite_master.
    fn migrate_installs_expression_indexes_for_timed_tables() -> Result<()> {
        let db = db()?;
        // Collect all index names straight from sqlite_master.
        let names = db.with_conn(|conn| async move {
            let mut rows = conn
                .query("SELECT name FROM sqlite_master WHERE type = 'index'", ())
                .await?;
            let mut out = Vec::new();
            while let Some(row) = rows.next().await? {
                out.push(text_value(&row.get_value(0)?)?);
            }
            Ok(out)
        })?;
        for (table, column) in INDEXED_TIME_COLUMNS {
            let expected = format!("idx_{table}_{column}");
            assert!(
                names.iter().any(|n| n == &expected),
                "missing index {expected} in {names:?}"
            );
        }
        for (table, column) in INDEXED_EQ_COLUMNS {
            let expected = format!("idx_{table}_{column}");
            assert!(
                names.iter().any(|n| n == &expected),
                "missing index {expected} in {names:?}"
            );
        }
        Ok(())
    }
2671
    #[test]
    // `contains_word` must match whole identifiers only: "tasks" inside
    // "netsky_tasks" or "class" inside "classes" is not a match.
    fn contains_word_respects_identifier_boundary() {
        assert!(!contains_word("select from netsky_tasks", "tasks"));
        assert!(contains_word("select from tasks where", "tasks"));
        assert!(contains_word("FROM tasks;", "tasks"));
        assert!(!contains_word("classes", "class"));
    }
2679
    #[test]
    // Write one message, read it back through the SQL query path; checks the
    // millisecond-Z timestamp rendering and each stored column value.
    fn message_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_message(MessageRecord {
            ts_utc: fixed_ts(),
            source: "imessage",
            direction: Direction::Inbound,
            chat_id: Some("chat-1"),
            from_agent: Some("agent9"),
            to_agent: None,
            body: Some("hello"),
            raw_json: Some("{\"chat_id\":\"chat-1\"}"),
        })?;
        let out = db.query("SELECT id, ts_utc, source, direction, chat_id, body FROM messages")?;
        // First insert into an empty table gets rowid 1.
        assert_eq!(id, 1);
        assert!(out.contains("2026-04-15T23:00:00.000Z"));
        assert!(out.contains("imessage"));
        assert!(out.contains("inbound"));
        assert!(out.contains("chat-1"));
        assert!(out.contains("hello"));
        Ok(())
    }
2702
    #[test]
    // Records one CLI invocation (bin, args JSON, exit code, duration, host)
    // and verifies the stored columns via SQL.
    fn cli_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_cli(
            fixed_ts(),
            "netsky",
            "[\"db\", \"migrate\"]",
            Some(0),
            Some(12),
            "host-a",
        )?;
        let out = db.query("SELECT bin, exit_code, duration_ms, host FROM cli_invocations")?;
        assert_eq!(id, 1);
        assert!(out.contains("netsky"));
        assert!(out.contains("host-a"));
        assert!(out.contains("12"));
        Ok(())
    }
2721
    #[test]
    // Records one crash row and checks kind, agent, and detail JSON survive.
    fn crash_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_crash(fixed_ts(), "panic", "agent8", "{\"reason\":\"wedged\"}")?;
        let out = db.query("SELECT kind, agent, detail_json FROM crashes")?;
        assert_eq!(id, 1);
        assert!(out.contains("panic"));
        assert!(out.contains("agent8"));
        assert!(out.contains("wedged"));
        Ok(())
    }
2733
    #[test]
    // Records one tick and checks source and detail JSON survive.
    fn tick_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_tick(fixed_ts(), "ticker", "{\"beat\":1}")?;
        let out = db.query("SELECT source, detail_json FROM ticks")?;
        assert_eq!(id, 1);
        assert!(out.contains("ticker"));
        assert!(out.contains("beat"));
        Ok(())
    }
2744
    #[test]
    // Records a workspace with a deletion time and verdict; verifies the
    // name, branch, and verdict columns round-trip.
    fn workspace_round_trip() -> Result<()> {
        let db = db()?;
        let created = fixed_ts();
        let deleted = created + chrono::Duration::minutes(5);
        let id = db.record_workspace(
            created,
            "session8",
            "feat/netsky-db-v0",
            Some(deleted),
            Some("kept"),
        )?;
        let out = db.query("SELECT name, branch, verdict FROM workspaces")?;
        assert_eq!(id, 1);
        assert!(out.contains("session8"));
        assert!(out.contains("feat/netsky-db-v0"));
        assert!(out.contains("kept"));
        Ok(())
    }
2764
    #[test]
    // Records one session event; SessionEvent::Note is stored as "note".
    fn session_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_session(fixed_ts(), "agent8", 8, SessionEvent::Note)?;
        let out = db.query("SELECT agent, session_num, event FROM sessions")?;
        assert_eq!(id, 1);
        assert!(out.contains("agent8"));
        assert!(out.contains("note"));
        Ok(())
    }
2775
    #[test]
    // Inserts one row into each of the eight schema-v2 tables, then asserts
    // via COUNT(*) that every table holds exactly one row. Also spot-checks
    // the communication_events typed reader.
    fn v2_tables_round_trip() -> Result<()> {
        let db = db()?;
        db.record_clone_dispatch(CloneDispatchRecord {
            ts_utc_start: fixed_ts(),
            ts_utc_end: Some(fixed_ts()),
            agent_id: "agent3",
            runtime: Some("codex"),
            brief_path: Some("briefs/foo.md"),
            brief: Some("do the thing"),
            workspace: Some("workspaces/foo/repo"),
            branch: Some("feat/foo"),
            status: Some("done"),
            exit_code: Some(0),
            detail_json: Some("{\"ok\":true}"),
        })?;
        db.record_harvest_event(HarvestEventRecord {
            ts_utc: fixed_ts(),
            source_branch: "feat/foo",
            target_branch: "main",
            commit_sha: Some("abc123"),
            status: "clean",
            conflicts: None,
            detail_json: None,
        })?;
        db.record_communication_event(CommunicationEventRecord {
            ts_utc: fixed_ts(),
            source: "imessage",
            tool: Some("reply"),
            direction: Direction::Outbound,
            chat_id: Some("chat-1"),
            message_id: Some("msg-1"),
            handle: Some("+15551234567"),
            agent: Some("agent0"),
            body: Some("sent"),
            status: Some("ok"),
            detail_json: None,
        })?;
        // Typed read-back path, in addition to the COUNT(*) sweep below.
        let comms = db.list_communication_events()?;
        assert_eq!(comms.len(), 1);
        assert_eq!(comms[0].source, "imessage");
        assert_eq!(comms[0].agent.as_deref(), Some("agent0"));
        db.record_mcp_tool_call(McpToolCallRecord {
            ts_utc_start: fixed_ts(),
            ts_utc_end: Some(fixed_ts()),
            source: "drive",
            tool: "list_files",
            agent: Some("agent0"),
            duration_ms: Some(22),
            success: true,
            error: None,
            timeout_race: false,
            request_json: Some("{}"),
            response_json: Some("[]"),
        })?;
        db.record_git_operation(GitOperationRecord {
            ts_utc: fixed_ts(),
            operation: "push",
            repo: "netsky",
            branch: Some("feat/foo"),
            remote: Some("origin"),
            from_sha: Some("abc"),
            to_sha: Some("def"),
            status: "ok",
            detail_json: None,
        })?;
        db.record_owner_directive(OwnerDirectiveRecord {
            ts_utc: fixed_ts(),
            source: "imessage",
            chat_id: Some("chat-1"),
            raw_text: "ship it",
            resolved_action: Some("dispatch"),
            agent: Some("agent0"),
            status: Some("accepted"),
            detail_json: None,
        })?;
        db.record_token_usage(TokenUsageRecord {
            ts_utc: fixed_ts(),
            session_id: Some("session-1"),
            agent: Some("agent4"),
            runtime: Some("codex"),
            model: Some("gpt-5.4"),
            input_tokens: Some(100),
            output_tokens: Some(25),
            cached_input_tokens: Some(10),
            cost_usd_micros: Some(1234),
            detail_json: None,
        })?;
        db.record_watchdog_event(WatchdogEventRecord {
            ts_utc: fixed_ts(),
            event: "respawn",
            agent: Some("agent0"),
            severity: Some("warn"),
            status: Some("ok"),
            detail_json: Some("{}"),
        })?;

        // One row was written to each table above; verify each count is 1.
        for table in [
            "clone_dispatches",
            "harvest_events",
            "communication_events",
            "mcp_tool_calls",
            "git_operations",
            "owner_directives",
            "token_usage",
            "watchdog_events",
        ] {
            let out = db.query(&format!("SELECT COUNT(*) AS count FROM {table}"))?;
            assert!(out.contains('1'), "{table}: {out}");
        }
        Ok(())
    }
2888
    #[test]
    // Batch-inserts 1500 token_usage rows, then checks the ids are
    // contiguous (a follow-up single insert lands at count + 1) and the data
    // actually arrived (sum of input_tokens).
    fn token_usage_batch_inserts_contiguous_ids() -> Result<()> {
        let db = db()?;
        let count = 1500;
        let records: Vec<_> = (0..count)
            .map(|i| TokenUsageRecord {
                ts_utc: fixed_ts(),
                session_id: Some("batch-session"),
                agent: Some("agent45"),
                runtime: Some("claude"),
                model: Some("claude-opus-4-7"),
                input_tokens: Some(i),
                output_tokens: Some(i * 2),
                cached_input_tokens: None,
                cost_usd_micros: None,
                detail_json: None,
            })
            .collect();
        let written = db.record_token_usage_batch(records)?;
        assert_eq!(written, count as usize);
        let out = db.query(
            "SELECT COUNT(*) AS c, MIN(id) AS lo, MAX(id) AS hi, \
             SUM(input_tokens) AS sum_in FROM token_usage",
        )?;
        assert!(out.contains(&count.to_string()), "{out}");
        // 0 + 1 + ... + 1499 = 1499 * 1500 / 2 = 1124250.
        assert!(out.contains("1124250"), "{out}");
        let written = db.record_token_usage_batch(vec![TokenUsageRecord {
            ts_utc: fixed_ts(),
            session_id: None,
            agent: None,
            runtime: None,
            model: None,
            input_tokens: Some(7),
            output_tokens: None,
            cached_input_tokens: None,
            cost_usd_micros: None,
            detail_json: None,
        }])?;
        assert_eq!(written, 1);
        // The next id after a batch of `count` rows must be count + 1.
        let out = db.query("SELECT MAX(id) AS hi FROM token_usage")?;
        assert!(out.contains(&(count + 1).to_string()), "{out}");
        Ok(())
    }
2934
    #[test]
    // An empty batch writes nothing and reports zero rows written.
    fn token_usage_batch_empty_is_noop() -> Result<()> {
        let db = db()?;
        let written = db.record_token_usage_batch(Vec::<TokenUsageRecord<'_>>::new())?;
        assert_eq!(written, 0);
        Ok(())
    }
2942
    #[test]
    // Creates a task, applies a partial update (status, agent, calendar
    // fields), and verifies both the typed update result and the SQL view.
    fn task_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_task(TaskRecord {
            title: "Add a task tracker",
            body: Some("Use turso and DataFusion."),
            status: "open",
            priority: Some("p1"),
            labels: Some("db,cli"),
            source: Some("test"),
            source_ref: Some("briefs/task-tracker.md"),
            closed_at: None,
            closed_reason: None,
            closed_evidence: None,
            agent: None,
            sync_calendar: false,
            calendar_task_id: None,
        })?;
        let row = db.update_task(TaskUpdate {
            id,
            status: Some("in_progress"),
            priority: None,
            agent: Some("agent3"),
            closed_reason: None,
            closed_evidence: None,
            sync_calendar: Some(true),
            calendar_task_id: Some("calendar-1"),
        })?;
        // update_task returns the post-update row; untouched fields (e.g.
        // priority) keep their original values.
        assert_eq!(row.id, 1);
        assert_eq!(row.status, "in_progress");
        assert_eq!(row.agent.as_deref(), Some("agent3"));
        assert!(row.sync_calendar);
        assert_eq!(row.calendar_task_id.as_deref(), Some("calendar-1"));
        let rows = db.list_tasks(Some("in_progress"), Some("p1"))?;
        assert_eq!(rows.len(), 1);
        let out = db.query(
            "SELECT title, status, priority, agent, sync_calendar, calendar_task_id FROM tasks",
        )?;
        assert!(out.contains("Add a task tracker"));
        assert!(out.contains("in_progress"));
        assert!(out.contains("agent3"));
        assert!(out.contains("calendar-1"));
        Ok(())
    }
2987
    #[test]
    // A UNION ALL across every registered table name (including the `tasks`
    // alias) must parse and run — proving each table is registered with the
    // query engine.
    fn query_registers_all_tables() -> Result<()> {
        let db = db()?;
        let out = db.query(
            "SELECT COUNT(*) FROM messages \
             UNION ALL SELECT COUNT(*) FROM cli_invocations \
             UNION ALL SELECT COUNT(*) FROM crashes \
             UNION ALL SELECT COUNT(*) FROM ticks \
             UNION ALL SELECT COUNT(*) FROM workspaces \
             UNION ALL SELECT COUNT(*) FROM sessions \
             UNION ALL SELECT COUNT(*) FROM clone_dispatches \
             UNION ALL SELECT COUNT(*) FROM harvest_events \
             UNION ALL SELECT COUNT(*) FROM communication_events \
             UNION ALL SELECT COUNT(*) FROM mcp_tool_calls \
             UNION ALL SELECT COUNT(*) FROM git_operations \
             UNION ALL SELECT COUNT(*) FROM owner_directives \
             UNION ALL SELECT COUNT(*) FROM token_usage \
             UNION ALL SELECT COUNT(*) FROM watchdog_events \
             UNION ALL SELECT COUNT(*) FROM tasks \
             UNION ALL SELECT COUNT(*) FROM source_errors \
             UNION ALL SELECT COUNT(*) FROM iroh_events",
        )?;
        assert!(out.contains('0'));
        Ok(())
    }
3013
    #[test]
    // Read-only open must fail with a clear "not initialized" message when
    // the file is missing OR present-but-unmigrated, must not create the
    // file as a side effect, and must succeed (without write access) once a
    // writer has migrated and populated it.
    fn read_only_path_reports_uninitialized_then_reads_initialized_db() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let path = dir.path().join("meta.db");

        // Case 1: file does not exist yet.
        let missing = match Db::open_read_only_path(&path) {
            Ok(_) => panic!("missing database opened read-only"),
            Err(error) => error.to_string(),
        };
        assert_eq!(
            missing,
            format!(
                "meta.db has not been initialized at {}; a writer must run first.",
                path.display()
            )
        );
        // The failed read-only open must not have created the file.
        assert!(!path.exists());

        // Case 2: file exists but migrate() was never run.
        let uninitialized = Db::open_path(&path)?;
        drop(uninitialized);
        let no_schema = match Db::open_read_only_path(&path) {
            Ok(_) => panic!("uninitialized database opened read-only"),
            Err(error) => error.to_string(),
        };
        assert_eq!(
            no_schema,
            format!(
                "meta.db has not been initialized at {}; a writer must run first.",
                path.display()
            )
        );

        // Case 3: a writer initializes and populates the database.
        let writer = Db::open_path(&path)?;
        writer.migrate()?;
        writer.record_message(MessageRecord {
            ts_utc: fixed_ts(),
            source: "agent",
            direction: Direction::Outbound,
            chat_id: Some("agent0"),
            from_agent: Some("agent5"),
            to_agent: Some("agent0"),
            body: Some("checkpoint acked"),
            raw_json: None,
        })?;

        // Now the read-only handle works, rejects writes, and sees the data.
        let reader = Db::open_read_only_path(&path)?;
        assert!(reader.read_only_write_probe_fails()?);
        let out = reader.query("SELECT source, chat_id, body FROM messages")?;
        assert!(out.contains("agent"));
        assert!(out.contains("agent0"));
        assert!(out.contains("checkpoint acked"));
        Ok(())
    }
3067
    #[test]
    // migrate() must leave the application-level schema version at 6.
    fn migrate_sets_schema_version_6() -> Result<()> {
        let db = db()?;
        assert_eq!(db.schema_version()?, 6);
        Ok(())
    }
3074
    #[test]
    // migrate() must also mirror SCHEMA_VERSION into SQLite's PRAGMA
    // user_version, read here via the raw connection.
    fn migrate_sets_sqlite_user_version_6() -> Result<()> {
        let db = db()?;
        assert_eq!(
            db.with_conn(|conn| async move { user_version(&conn).await })?,
            SCHEMA_VERSION
        );
        Ok(())
    }
3084
    #[test]
    // hash_peer_id is deterministic, input-sensitive, and emits exactly
    // 16 lowercase-hex chars (8 hashed bytes).
    fn hash_peer_id_is_stable_and_truncated() {
        let a = hash_peer_id("abc123");
        let b = hash_peer_id("abc123");
        let c = hash_peer_id("abc124");
        assert_eq!(a, b, "same input hashes the same");
        assert_ne!(a, c, "different input hashes differently");
        assert_eq!(a.len(), 16, "8 bytes hex-encoded = 16 chars");
        assert!(a.chars().all(|c| c.is_ascii_hexdigit()));
    }
3095
    #[test]
    // Pins the algorithm: each expected value is the first 8 bytes (hex) of
    // the SHA-256 digest of the input, so a hash-function change fails here.
    fn hash_peer_id_matches_known_sha256_vectors() {
        assert_eq!(hash_peer_id("abc"), "ba7816bf8f01cfea");
        assert_eq!(hash_peer_id(""), "e3b0c44298fc1c14");
        assert_eq!(hash_peer_id("netsky0"), "66863bafcf1591d6");
    }
3111
    #[test]
    // Records two source errors and verifies the snake_case error-class
    // names ("timeout", "network_error") via an aggregating query.
    fn source_error_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_source_error(SourceErrorRecord {
            ts_utc: fixed_ts(),
            source: "email",
            error_class: SourceErrorClass::Timeout,
            count: 1,
            detail_json: Some("{\"endpoint\":\"gmail\"}"),
        })?;
        assert_eq!(id, 1);
        db.record_source_error(SourceErrorRecord {
            ts_utc: fixed_ts(),
            source: "iroh",
            error_class: SourceErrorClass::NetworkError,
            count: 3,
            detail_json: None,
        })?;
        let out = db.query(
            "SELECT source, error_class, SUM(count) AS n FROM source_errors \
             GROUP BY source, error_class ORDER BY source",
        )?;
        assert!(out.contains("email"));
        assert!(out.contains("timeout"));
        assert!(out.contains("network_error"));
        Ok(())
    }
3139
    #[test]
    // A recorded count of 0 is clamped up to 1 on write.
    fn source_error_count_floors_at_one() -> Result<()> {
        let db = db()?;
        db.record_source_error(SourceErrorRecord {
            ts_utc: fixed_ts(),
            source: "email",
            error_class: SourceErrorClass::Unknown,
            count: 0,
            detail_json: None,
        })?;
        let out = db.query("SELECT count FROM source_errors")?;
        assert!(out.contains('1'));
        Ok(())
    }
3154
    #[test]
    // Cursor lifecycle: absent -> set -> overwritten -> listed (sorted by
    // source) -> reset (idempotent: second reset reports nothing removed).
    fn source_cursor_round_trip() -> Result<()> {
        let db = db()?;
        assert_eq!(db.read_source_cursor("imessage")?, None);
        db.update_source_cursor("imessage", "42")?;
        assert_eq!(db.read_source_cursor("imessage")?.as_deref(), Some("42"));
        // Updating an existing cursor overwrites rather than appends.
        db.update_source_cursor("imessage", "99")?;
        assert_eq!(db.read_source_cursor("imessage")?.as_deref(), Some("99"));
        db.update_source_cursor("email", "abc")?;
        let all = db.list_source_cursors()?;
        assert_eq!(all.len(), 2);
        assert_eq!(all[0].source, "email");
        assert_eq!(all[0].cursor_value, "abc");
        assert_eq!(all[1].source, "imessage");
        assert_eq!(all[1].cursor_value, "99");
        // reset returns true when a cursor was removed, false otherwise.
        assert!(db.reset_source_cursor("imessage")?);
        assert_eq!(db.read_source_cursor("imessage")?, None);
        assert!(!db.reset_source_cursor("imessage")?);
        Ok(())
    }
3178
    #[test]
    // Inserts two events, transitions one to delivered and one to failed
    // (with a reason), and checks tail_events returns newest-first and is
    // scoped to the requested source.
    fn event_insert_then_transition_round_trip() -> Result<()> {
        let db = db()?;
        let delivered = db.insert_event("imessage", fixed_ts(), r#"{"chat":"a"}"#)?;
        let failed = db.insert_event("imessage", fixed_ts(), r#"{"chat":"b"}"#)?;
        // Ids are positive and monotonically increasing.
        assert!(delivered > 0 && failed > delivered);
        db.update_event_delivery(delivered, EventStatus::Delivered, None)?;
        db.update_event_delivery(failed, EventStatus::Failed, Some("adapter timed out"))?;
        let rows = db.tail_events("imessage", 10)?;
        // Newest first: the later insert (`failed`) comes back at index 0.
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].id, failed);
        assert_eq!(rows[0].delivery_status, "failed");
        assert_eq!(rows[0].reason.as_deref(), Some("adapter timed out"));
        assert_eq!(rows[1].id, delivered);
        assert_eq!(rows[1].delivery_status, "delivered");
        assert_eq!(rows[1].reason, None);
        // A different source does not leak into the imessage tail.
        let other = db.insert_event("email", fixed_ts(), "{}")?;
        assert!(other > failed);
        let only_imessage = db.tail_events("imessage", 10)?;
        assert_eq!(only_imessage.len(), 2);
        Ok(())
    }
3203
3204 #[test]
3205 fn iroh_event_round_trip() -> Result<()> {
3206 let db = db()?;
3207 let hash = hash_peer_id("raw_node_id_abc");
3208 let id = db.record_iroh_event(IrohEventRecord {
3209 ts_utc: fixed_ts(),
3210 event_type: IrohEventType::Connect,
3211 peer_id_hash: &hash,
3212 peer_label: Some("work"),
3213 detail_json: None,
3214 })?;
3215 assert_eq!(id, 1);
3216 db.record_iroh_event(IrohEventRecord {
3217 ts_utc: fixed_ts(),
3218 event_type: IrohEventType::HandshakeRefused,
3219 peer_id_hash: &hash_peer_id("unknown_peer"),
3220 peer_label: None,
3221 detail_json: Some("{\"reason\":\"not_in_allowlist\"}"),
3222 })?;
3223 let out =
3224 db.query("SELECT event_type, peer_id_hash, peer_label FROM iroh_events ORDER BY id")?;
3225 assert!(out.contains("connect"));
3226 assert!(out.contains("handshake_refused"));
3227 assert!(out.contains("work"));
3228 assert!(!out.contains("raw_node_id_abc"), "raw node id leaked");
3229 Ok(())
3230 }
3231
3232 #[test]
3233 fn token_usage_includes_runtime_column() -> Result<()> {
3234 let db = db()?;
3235 db.record_token_usage(TokenUsageRecord {
3236 ts_utc: fixed_ts(),
3237 session_id: Some("s1"),
3238 agent: Some("agent12"),
3239 runtime: Some("claude"),
3240 model: Some("claude-opus-4-7"),
3241 input_tokens: Some(1000),
3242 output_tokens: Some(500),
3243 cached_input_tokens: None,
3244 cost_usd_micros: Some(12_500),
3245 detail_json: None,
3246 })?;
3247 let out =
3248 db.query("SELECT runtime, SUM(input_tokens) AS ins FROM token_usage GROUP BY runtime")?;
3249 assert!(out.contains("claude"));
3250 assert!(out.contains("1000"));
3251 Ok(())
3252 }
3253
3254 #[test]
3255 fn clone_lifetimes_view_reads_finished_dispatches() -> Result<()> {
3256 let db = db()?;
3257 db.record_clone_dispatch(CloneDispatchRecord {
3258 ts_utc_start: fixed_ts(),
3259 ts_utc_end: Some(fixed_ts() + chrono::Duration::minutes(7)),
3260 agent_id: "agent5",
3261 runtime: Some("claude"),
3262 brief_path: None,
3263 brief: None,
3264 workspace: Some("workspaces/foo/repo"),
3265 branch: Some("feat/foo"),
3266 status: Some("done"),
3267 exit_code: Some(0),
3268 detail_json: None,
3269 })?;
3270 db.record_clone_dispatch(CloneDispatchRecord {
3271 ts_utc_start: fixed_ts(),
3272 ts_utc_end: None,
3273 agent_id: "agent6",
3274 runtime: Some("codex"),
3275 brief_path: None,
3276 brief: None,
3277 workspace: None,
3278 branch: None,
3279 status: Some("in_flight"),
3280 exit_code: None,
3281 detail_json: None,
3282 })?;
3283 let out = db.query("SELECT agent_id, runtime FROM clone_lifetimes")?;
3284 assert!(out.contains("agent5"));
3285 assert!(
3286 !out.contains("agent6"),
3287 "in-flight dispatches stay excluded"
3288 );
3289 Ok(())
3290 }
3291
3292 #[test]
3293 fn migration_from_v4_preserves_existing_rows_and_adds_new_tables() -> Result<()> {
3294 let dir = tempfile::tempdir()?;
3295 let path = dir.path().join("meta.db");
3296
3297 {
3298 let db = Db::open_path(&path)?;
3299 db.migrate()?;
3300 db.with_conn(|conn| async move {
3306 conn.execute("UPDATE meta SET value = 4 WHERE key = 'schema_version'", ())
3307 .await?;
3308 conn.execute("PRAGMA user_version = 4", ()).await?;
3309 Ok(())
3310 })?;
3311 db.record_message(MessageRecord {
3312 ts_utc: fixed_ts(),
3313 source: "imessage",
3314 direction: Direction::Inbound,
3315 chat_id: Some("chat-1"),
3316 from_agent: None,
3317 to_agent: None,
3318 body: Some("pre-migration"),
3319 raw_json: None,
3320 })?;
3321 }
3322
3323 let db = Db::open_path(&path)?;
3324 db.migrate()?;
3325 assert_eq!(db.schema_version()?, 6);
3326
3327 let out = db.query("SELECT body FROM messages")?;
3328 assert!(
3329 out.contains("pre-migration"),
3330 "migration lost pre-existing row: {out}"
3331 );
3332 let se = db.query("SELECT COUNT(*) FROM source_errors")?;
3334 assert!(se.contains('0'));
3335 let ie = db.query("SELECT COUNT(*) FROM iroh_events")?;
3336 assert!(ie.contains('0'));
3337 assert!(db.list_source_cursors()?.is_empty());
3340 assert!(db.tail_events("imessage", 10)?.is_empty());
3341 Ok(())
3342 }
3343}