1use std::collections::HashSet;
7use std::fs;
8use std::path::Path;
9use std::path::PathBuf;
10use std::sync::Arc;
11use std::time::Duration;
12
13use chrono::{DateTime, SecondsFormat, Utc};
14use datafusion::arrow::array::{BooleanArray, Int64Array, StringArray};
15use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
16use datafusion::arrow::record_batch::RecordBatch;
17use datafusion::arrow::util::pretty::pretty_format_batches;
18use datafusion::datasource::MemTable;
19use datafusion::prelude::SessionContext;
20use serde::{Deserialize, Serialize, de::DeserializeOwned};
21use thiserror::Error;
22use tokio::runtime::Runtime;
23use turso::{Builder, Connection, Value, params_from_iter};
24
/// Crate-wide result type; all fallible APIs in this module return this.
pub type Result<T> = std::result::Result<T, Error>;

// Re-exported so downstream crates can build/consume Arrow batches without
// depending on datafusion directly.
pub use datafusion::arrow::array as arrow_array;
pub use datafusion::arrow::record_batch::RecordBatch as ArrowRecordBatch;
30
/// Schema version stamped into the `meta` table and `PRAGMA user_version` by
/// `migrate`; opening a database newer than this fails with
/// `Error::FutureSchemaVersion`.
pub const SCHEMA_VERSION: i64 = 7;

/// Every JSON-row table created by `migrate`. Built from the per-table name
/// constants below so each physical table name has a single source of truth
/// (previously the names were duplicated as string literals here).
const TABLE_NAMES: [&str; 17] = [
    MESSAGES,
    CLI_INVOCATIONS,
    CRASHES,
    TICKS,
    WORKSPACES,
    SESSIONS,
    CLONE_DISPATCHES,
    HARVEST_EVENTS,
    COMMUNICATION_EVENTS,
    MCP_TOOL_CALLS,
    GIT_OPERATIONS,
    OWNER_DIRECTIVES,
    TOKEN_USAGE,
    WATCHDOG_EVENTS,
    TASKS,
    SOURCE_ERRORS,
    IROH_EVENTS,
];

// Physical table names. Note TASKS is stored as "netsky_tasks" even though it
// is exposed to DataFusion under the alias "tasks" (see DATAFUSION_TABLES).
const MESSAGES: &str = "messages";
const CLI_INVOCATIONS: &str = "cli_invocations";
const CRASHES: &str = "crashes";
const TICKS: &str = "ticks";
const WORKSPACES: &str = "workspaces";
const SESSIONS: &str = "sessions";
const CLONE_DISPATCHES: &str = "clone_dispatches";
const HARVEST_EVENTS: &str = "harvest_events";
const COMMUNICATION_EVENTS: &str = "communication_events";
const MCP_TOOL_CALLS: &str = "mcp_tool_calls";
const GIT_OPERATIONS: &str = "git_operations";
const OWNER_DIRECTIVES: &str = "owner_directives";
const TOKEN_USAGE: &str = "token_usage";
const WATCHDOG_EVENTS: &str = "watchdog_events";
const TASKS: &str = "netsky_tasks";
const SOURCE_ERRORS: &str = "source_errors";
const IROH_EVENTS: &str = "iroh_events";

// Unique index enforcing ingest idempotency for token-usage rows imported
// from the "claude-sessions" source (keyed on session/path/line).
const TOKEN_USAGE_CLAUDE_SESSIONS_UNIQUE_INDEX: &str =
    "idx_token_usage_claude_sessions_session_path_line";
// Partial-index predicate: only claude-sessions rows that carry both a source
// path and a line number participate in the uniqueness constraint.
const TOKEN_USAGE_CLAUDE_SESSIONS_FILTER: &str = "json_extract(json_extract(row_json, '$.detail_json'), '$.source') = 'claude-sessions' \
    AND json_extract(json_extract(row_json, '$.detail_json'), '$.path') IS NOT NULL \
    AND json_extract(json_extract(row_json, '$.detail_json'), '$.line') IS NOT NULL";
// Indexed key expressions for the partial unique index above.
const TOKEN_USAGE_CLAUDE_SESSIONS_KEY: &str = "COALESCE(json_extract(row_json, '$.session_id'), ''), \
    json_extract(json_extract(row_json, '$.detail_json'), '$.path'), \
    json_extract(json_extract(row_json, '$.detail_json'), '$.line')";

/// `(datafusion_alias, physical_table)` pairs registered with the SQL engine.
/// Aliases match the physical name except "tasks", which maps to
/// "netsky_tasks".
const DATAFUSION_TABLES: [(&str, &str); 17] = [
    ("messages", MESSAGES),
    ("cli_invocations", CLI_INVOCATIONS),
    ("crashes", CRASHES),
    ("ticks", TICKS),
    ("workspaces", WORKSPACES),
    ("sessions", SESSIONS),
    ("clone_dispatches", CLONE_DISPATCHES),
    ("harvest_events", HARVEST_EVENTS),
    ("communication_events", COMMUNICATION_EVENTS),
    ("mcp_tool_calls", MCP_TOOL_CALLS),
    ("git_operations", GIT_OPERATIONS),
    ("owner_directives", OWNER_DIRECTIVES),
    ("token_usage", TOKEN_USAGE),
    ("watchdog_events", WATCHDOG_EVENTS),
    ("tasks", TASKS),
    ("source_errors", SOURCE_ERRORS),
    ("iroh_events", IROH_EVENTS),
];

const CLONE_LIFETIMES_VIEW: &str = "clone_lifetimes";

/// `(table, json column)` pairs that get an expression index on the row's
/// timestamp-ish column (see `migrate`). Column names differ per table.
const INDEXED_TIME_COLUMNS: &[(&str, &str)] = &[
    (MESSAGES, "ts_utc"),
    (CLI_INVOCATIONS, "ts_utc"),
    (CRASHES, "ts_utc"),
    (TICKS, "ts_utc"),
    (WORKSPACES, "ts_utc_created"),
    (SESSIONS, "ts_utc"),
    (CLONE_DISPATCHES, "ts_utc_start"),
    (HARVEST_EVENTS, "ts_utc"),
    (COMMUNICATION_EVENTS, "ts_utc"),
    (MCP_TOOL_CALLS, "ts_utc_start"),
    (GIT_OPERATIONS, "ts_utc"),
    (OWNER_DIRECTIVES, "ts_utc"),
    (TOKEN_USAGE, "ts_utc"),
    (WATCHDOG_EVENTS, "ts_utc"),
    (TASKS, "created_at"),
    (SOURCE_ERRORS, "ts_utc"),
    (IROH_EVENTS, "ts_utc"),
];

/// `(table, json column)` pairs indexed for equality lookups.
const INDEXED_EQ_COLUMNS: &[(&str, &str)] = &[
    (TASKS, "status"),
    (TASKS, "priority"),
    (MESSAGES, "source"),
    (SESSIONS, "event"),
    (IROH_EVENTS, "event_type"),
];
141
/// All failure modes surfaced by this crate's [`Result`].
#[derive(Debug, Error)]
pub enum Error {
    /// `dirs::home_dir()` returned `None`, so the default db path is unknown.
    #[error("home directory not found")]
    HomeDirMissing,
    /// The on-disk schema was written by a newer build than this one supports.
    #[error("schema version {found} is newer than supported {supported}")]
    FutureSchemaVersion { found: i64, supported: i64 },
    /// A read-only open was attempted before any writer created the file.
    #[error("meta.db has not been initialized at {0}; a writer must run first.")]
    NotInitialized(PathBuf),
    /// Sanity check failed: a connection requested read-only accepted writes.
    #[error("read-only connection was writable at {0}")]
    ReadOnlyNotEnforced(PathBuf),
    #[error(transparent)]
    Turso(#[from] turso::Error),
    #[error(transparent)]
    DataFusion(#[from] datafusion::error::DataFusionError),
    #[error(transparent)]
    Arrow(#[from] datafusion::arrow::error::ArrowError),
    #[error(transparent)]
    Serde(#[from] serde_json::Error),
    #[error(transparent)]
    Io(#[from] std::io::Error),
    #[error(transparent)]
    Chrono(#[from] chrono::ParseError),
    /// Database stayed locked through the whole retry budget.
    #[error(
        "netsky-db locked after {retries} retries (total wait {total_wait_ms}ms).\n  hint: a clone is writing heavily; retry or check `lsof` for holders."
    )]
    Locked {
        retries: u32,
        total_wait_ms: u128,
        detail: String,
    },
    /// Generic lookup miss; the payload names what was missing.
    #[error("{0} not found")]
    NotFound(String),
}
175
176#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
177pub enum Direction {
178 Inbound,
179 Outbound,
180}
181
182impl Direction {
183 fn as_str(self) -> &'static str {
184 match self {
185 Direction::Inbound => "inbound",
186 Direction::Outbound => "outbound",
187 }
188 }
189}
190
191#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
192pub enum SessionEvent {
193 Up,
194 Down,
195 Note,
196}
197
198impl SessionEvent {
199 fn as_str(self) -> &'static str {
200 match self {
201 SessionEvent::Up => "up",
202 SessionEvent::Down => "down",
203 SessionEvent::Note => "note",
204 }
205 }
206}
207
208#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
212pub enum EventStatus {
213 Pending,
214 Delivered,
215 Failed,
216}
217
218impl EventStatus {
219 pub fn as_str(self) -> &'static str {
220 match self {
221 EventStatus::Pending => "pending",
222 EventStatus::Delivered => "delivered",
223 EventStatus::Failed => "failed",
224 }
225 }
226}
227
/// One row of the `source_cursors` table: the last ingested position for an
/// external source (cursor semantics are source-specific).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SourceCursorRow {
    pub source: String,
    pub cursor_value: String,
    pub updated_at: String,
}

/// One row of the `events` outbox table; `delivery_status` holds an
/// [`EventStatus`] string and `reason` explains failures.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EventRow {
    pub id: i64,
    pub source: String,
    pub ts_utc: String,
    pub payload_json: String,
    pub delivery_status: String,
    pub reason: Option<String>,
}
246
247#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
252pub enum SourceErrorClass {
253 Timeout,
254 AuthFailure,
255 RateLimit,
256 NetworkError,
257 ProtocolError,
258 NotFound,
259 PermissionDenied,
260 Unknown,
261}
262
263impl SourceErrorClass {
264 pub fn as_str(self) -> &'static str {
265 match self {
266 SourceErrorClass::Timeout => "timeout",
267 SourceErrorClass::AuthFailure => "auth_failure",
268 SourceErrorClass::RateLimit => "rate_limit",
269 SourceErrorClass::NetworkError => "network_error",
270 SourceErrorClass::ProtocolError => "protocol_error",
271 SourceErrorClass::NotFound => "not_found",
272 SourceErrorClass::PermissionDenied => "permission_denied",
273 SourceErrorClass::Unknown => "unknown",
274 }
275 }
276}
277
278#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
281pub enum IrohEventType {
282 Connect,
283 Evict,
284 Reconnect,
285 HandshakeRefused,
286}
287
288impl IrohEventType {
289 pub fn as_str(self) -> &'static str {
290 match self {
291 IrohEventType::Connect => "connect",
292 IrohEventType::Evict => "evict",
293 IrohEventType::Reconnect => "reconnect",
294 IrohEventType::HandshakeRefused => "handshake_refused",
295 }
296 }
297}
298
299pub fn hash_peer_id(raw_node_id: &str) -> String {
304 use sha2::{Digest, Sha256};
305 let mut hasher = Sha256::new();
306 hasher.update(raw_node_id.as_bytes());
307 let digest = hasher.finalize();
308 hex_encode(&digest[..8])
309}
310
/// Lowercase hex encoding of `bytes` — two characters per byte, no prefix.
fn hex_encode(bytes: &[u8]) -> String {
    const HEX: &[u8; 16] = b"0123456789abcdef";
    bytes
        .iter()
        .flat_map(|&b| {
            [
                HEX[usize::from(b >> 4)] as char,
                HEX[usize::from(b & 0x0f)] as char,
            ]
        })
        .collect()
}
320
/// Handle to the netsky metadata database. Holds only the file path; actual
/// connections are opened per-operation (see the `with_conn` call sites).
pub struct Db {
    path: PathBuf,
}
324
/// Input for [`Db::record_message`]; borrowed fields are copied on insert.
pub struct MessageRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub direction: Direction,
    pub chat_id: Option<&'a str>,
    pub from_agent: Option<&'a str>,
    pub to_agent: Option<&'a str>,
    pub body: Option<&'a str>,
    pub raw_json: Option<&'a str>,
}

/// Input for [`Db::record_clone_dispatch`]. `brief` is truncated to 16 KiB
/// at insert time.
pub struct CloneDispatchRecord<'a> {
    pub ts_utc_start: DateTime<Utc>,
    pub ts_utc_end: Option<DateTime<Utc>>,
    pub agent_id: &'a str,
    pub runtime: Option<&'a str>,
    pub brief_path: Option<&'a str>,
    pub brief: Option<&'a str>,
    pub workspace: Option<&'a str>,
    pub branch: Option<&'a str>,
    pub status: Option<&'a str>,
    pub exit_code: Option<i64>,
    pub detail_json: Option<&'a str>,
}

/// Input for [`Db::record_harvest_event`].
pub struct HarvestEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source_branch: &'a str,
    pub target_branch: &'a str,
    pub commit_sha: Option<&'a str>,
    pub status: &'a str,
    pub conflicts: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}

/// Input for [`Db::record_communication_event`]. `body` is truncated to
/// 4 KiB at insert time.
pub struct CommunicationEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub tool: Option<&'a str>,
    pub direction: Direction,
    pub chat_id: Option<&'a str>,
    pub message_id: Option<&'a str>,
    pub handle: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub body: Option<&'a str>,
    pub status: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}

/// Public, parsed form of a stored communication event — unlike the raw row,
/// `ts_utc` is a real [`DateTime<Utc>`] (see the `TryFrom` impl below).
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct CommunicationEvent {
    pub id: i64,
    pub ts_utc: DateTime<Utc>,
    pub source: String,
    pub tool: Option<String>,
    pub direction: String,
    pub chat_id: Option<String>,
    pub message_id: Option<String>,
    pub handle: Option<String>,
    pub agent: Option<String>,
    pub body: Option<String>,
    pub status: Option<String>,
    pub detail_json: Option<String>,
}

/// Input for [`Db::record_mcp_tool_call`]. `error` is truncated to 2 KiB and
/// the request/response JSON to 8 KiB each at insert time.
pub struct McpToolCallRecord<'a> {
    pub ts_utc_start: DateTime<Utc>,
    pub ts_utc_end: Option<DateTime<Utc>>,
    pub source: &'a str,
    pub tool: &'a str,
    pub agent: Option<&'a str>,
    pub duration_ms: Option<i64>,
    pub success: bool,
    pub error: Option<&'a str>,
    pub timeout_race: bool,
    pub request_json: Option<&'a str>,
    pub response_json: Option<&'a str>,
}

/// Input for [`Db::record_git_operation`].
pub struct GitOperationRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub operation: &'a str,
    pub repo: &'a str,
    pub branch: Option<&'a str>,
    pub remote: Option<&'a str>,
    pub from_sha: Option<&'a str>,
    pub to_sha: Option<&'a str>,
    pub status: &'a str,
    pub detail_json: Option<&'a str>,
}

/// Input for [`Db::record_owner_directive`]. `raw_text` is truncated to
/// 16 KiB at insert time.
pub struct OwnerDirectiveRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub chat_id: Option<&'a str>,
    pub raw_text: &'a str,
    pub resolved_action: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub status: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}

/// Input for [`Db::record_token_usage`] / [`Db::record_token_usage_batch`].
pub struct TokenUsageRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub session_id: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub runtime: Option<&'a str>,
    pub model: Option<&'a str>,
    pub input_tokens: Option<i64>,
    pub output_tokens: Option<i64>,
    pub cached_input_tokens: Option<i64>,
    // Cost in millionths of a USD (integer to avoid float rounding).
    pub cost_usd_micros: Option<i64>,
    pub detail_json: Option<&'a str>,
}

/// Input for [`Db::record_source_error`]. `count` is clamped to >= 1 at
/// insert time.
pub struct SourceErrorRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub error_class: SourceErrorClass,
    pub count: i64,
    pub detail_json: Option<&'a str>,
}

/// Input for [`Db::record_iroh_event`]. `peer_id_hash` should come from
/// [`hash_peer_id`] so raw node ids never hit the database.
pub struct IrohEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub event_type: IrohEventType,
    pub peer_id_hash: &'a str,
    pub peer_label: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}

/// Input for [`Db::record_watchdog_event`].
pub struct WatchdogEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub event: &'a str,
    pub agent: Option<&'a str>,
    pub severity: Option<&'a str>,
    pub status: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}

/// Input for creating a row in the tasks table ("netsky_tasks").
pub struct TaskRecord<'a> {
    pub title: &'a str,
    pub body: Option<&'a str>,
    pub status: &'a str,
    pub priority: Option<&'a str>,
    pub labels: Option<&'a str>,
    pub source: Option<&'a str>,
    pub source_ref: Option<&'a str>,
    pub closed_at: Option<&'a str>,
    pub closed_reason: Option<&'a str>,
    pub closed_evidence: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub sync_calendar: bool,
    pub calendar_task_id: Option<&'a str>,
}

/// Partial update for an existing task; `None` fields are left untouched.
pub struct TaskUpdate<'a> {
    pub id: i64,
    pub status: Option<&'a str>,
    pub priority: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub closed_reason: Option<&'a str>,
    pub closed_evidence: Option<&'a str>,
    pub sync_calendar: Option<bool>,
    pub calendar_task_id: Option<&'a str>,
}
502
// The *Row structs below are the serialized JSON shape stored in each
// table's `row_json` column. Timestamps are RFC 3339 strings.

/// Stored shape of a `messages` row.
#[derive(Debug, Serialize, Deserialize)]
struct MessageRow {
    id: i64,
    ts_utc: String,
    source: String,
    direction: String,
    chat_id: Option<String>,
    from_agent: Option<String>,
    to_agent: Option<String>,
    body: Option<String>,
    raw_json: Option<String>,
}

/// Stored shape of a `cli_invocations` row.
#[derive(Debug, Serialize, Deserialize)]
struct CliRow {
    id: i64,
    ts_utc: String,
    bin: String,
    argv_json: String,
    exit_code: Option<i64>,
    duration_ms: Option<i64>,
    host: String,
}

/// Stored shape of a `crashes` row.
#[derive(Debug, Serialize, Deserialize)]
struct CrashRow {
    id: i64,
    ts_utc: String,
    kind: String,
    agent: String,
    detail_json: String,
}

/// Stored shape of a `ticks` row.
#[derive(Debug, Serialize, Deserialize)]
struct TickRow {
    id: i64,
    ts_utc: String,
    source: String,
    detail_json: String,
}

/// Stored shape of a `workspaces` row.
#[derive(Debug, Serialize, Deserialize)]
struct WorkspaceRow {
    id: i64,
    ts_utc_created: String,
    name: String,
    branch: String,
    ts_utc_deleted: Option<String>,
    verdict: Option<String>,
}

/// Stored shape of a `sessions` row; `event` holds a [`SessionEvent`] string.
#[derive(Debug, Serialize, Deserialize)]
struct SessionRow {
    id: i64,
    ts_utc: String,
    agent: String,
    session_num: i64,
    event: String,
}

/// Stored shape of a `clone_dispatches` row.
#[derive(Debug, Serialize, Deserialize)]
struct CloneDispatchRow {
    id: i64,
    ts_utc_start: String,
    ts_utc_end: Option<String>,
    agent_id: String,
    runtime: Option<String>,
    brief_path: Option<String>,
    brief: Option<String>,
    workspace: Option<String>,
    branch: Option<String>,
    status: Option<String>,
    exit_code: Option<i64>,
    detail_json: Option<String>,
}

/// Stored shape of a `harvest_events` row.
#[derive(Debug, Serialize, Deserialize)]
struct HarvestEventRow {
    id: i64,
    ts_utc: String,
    source_branch: String,
    target_branch: String,
    commit_sha: Option<String>,
    status: String,
    conflicts: Option<String>,
    detail_json: Option<String>,
}

/// Stored shape of a `communication_events` row.
#[derive(Debug, Serialize, Deserialize)]
struct CommunicationEventRow {
    id: i64,
    ts_utc: String,
    source: String,
    tool: Option<String>,
    direction: String,
    chat_id: Option<String>,
    message_id: Option<String>,
    handle: Option<String>,
    agent: Option<String>,
    body: Option<String>,
    status: Option<String>,
    detail_json: Option<String>,
}
606
607impl TryFrom<CommunicationEventRow> for CommunicationEvent {
608 type Error = Error;
609
610 fn try_from(row: CommunicationEventRow) -> Result<Self> {
611 Ok(Self {
612 id: row.id,
613 ts_utc: parse_ts(&row.ts_utc)?,
614 source: row.source,
615 tool: row.tool,
616 direction: row.direction,
617 chat_id: row.chat_id,
618 message_id: row.message_id,
619 handle: row.handle,
620 agent: row.agent,
621 body: row.body,
622 status: row.status,
623 detail_json: row.detail_json,
624 })
625 }
626}
627
/// Stored shape of an `mcp_tool_calls` row.
#[derive(Debug, Serialize, Deserialize)]
struct McpToolCallRow {
    id: i64,
    ts_utc_start: String,
    ts_utc_end: Option<String>,
    source: String,
    tool: String,
    agent: Option<String>,
    duration_ms: Option<i64>,
    success: bool,
    error: Option<String>,
    timeout_race: bool,
    request_json: Option<String>,
    response_json: Option<String>,
}

/// Stored shape of a `git_operations` row.
#[derive(Debug, Serialize, Deserialize)]
struct GitOperationRow {
    id: i64,
    ts_utc: String,
    operation: String,
    repo: String,
    branch: Option<String>,
    remote: Option<String>,
    from_sha: Option<String>,
    to_sha: Option<String>,
    status: String,
    detail_json: Option<String>,
}

/// Stored shape of an `owner_directives` row.
#[derive(Debug, Serialize, Deserialize)]
struct OwnerDirectiveRow {
    id: i64,
    ts_utc: String,
    source: String,
    chat_id: Option<String>,
    raw_text: String,
    resolved_action: Option<String>,
    agent: Option<String>,
    status: Option<String>,
    detail_json: Option<String>,
}

/// Stored shape of a `token_usage` row.
#[derive(Debug, Serialize, Deserialize)]
struct TokenUsageRow {
    id: i64,
    ts_utc: String,
    session_id: Option<String>,
    agent: Option<String>,
    // NOTE(review): `default` presumably tolerates rows written before the
    // field existed — confirm against schema history.
    #[serde(default)]
    runtime: Option<String>,
    model: Option<String>,
    input_tokens: Option<i64>,
    output_tokens: Option<i64>,
    cached_input_tokens: Option<i64>,
    cost_usd_micros: Option<i64>,
    detail_json: Option<String>,
}

/// Stored shape of a `source_errors` row; `error_class` holds a
/// [`SourceErrorClass`] string.
#[derive(Debug, Serialize, Deserialize)]
struct SourceErrorRow {
    id: i64,
    ts_utc: String,
    source: String,
    error_class: String,
    count: i64,
    detail_json: Option<String>,
}

/// Stored shape of an `iroh_events` row; `event_type` holds an
/// [`IrohEventType`] string.
#[derive(Debug, Serialize, Deserialize)]
struct IrohEventRow {
    id: i64,
    ts_utc: String,
    event_type: String,
    peer_id_hash: String,
    peer_label: Option<String>,
    detail_json: Option<String>,
}

/// Stored shape of a `watchdog_events` row.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WatchdogEventRow {
    id: i64,
    ts_utc: String,
    event: String,
    agent: Option<String>,
    severity: Option<String>,
    status: Option<String>,
    detail_json: Option<String>,
}

/// Stored (and public) shape of a row in the tasks table ("netsky_tasks").
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskRow {
    pub id: i64,
    pub created_at: String,
    pub updated_at: String,
    pub title: String,
    pub body: Option<String>,
    pub status: String,
    pub priority: Option<String>,
    pub labels: Option<String>,
    pub source: Option<String>,
    pub source_ref: Option<String>,
    pub closed_at: Option<String>,
    pub closed_reason: Option<String>,
    pub closed_evidence: Option<String>,
    pub agent: Option<String>,
    // NOTE(review): defaults presumably keep old rows (without calendar
    // fields) deserializable — confirm against schema history.
    #[serde(default)]
    pub sync_calendar: bool,
    #[serde(default)]
    pub calendar_task_id: Option<String>,
}
739
740impl Db {
741 pub fn open() -> Result<Self> {
742 let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
743 let dir = home.join(".netsky");
744 fs::create_dir_all(&dir)?;
745 Self::open_path(dir.join("meta.db"))
746 }
747
748 pub fn open_read_only() -> Result<Self> {
749 let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
750 Self::open_read_only_path(home.join(".netsky").join("meta.db"))
751 }
752
753 pub fn open_path(path: impl AsRef<Path>) -> Result<Self> {
754 let path = path.as_ref().to_path_buf();
755 if path != Path::new(":memory:")
756 && let Some(parent) = path.parent()
757 {
758 fs::create_dir_all(parent)?;
759 }
760 let db = Self { path };
761 db.with_conn(|_| async { Ok(()) })?;
762 Ok(db)
763 }
764
765 pub fn open_read_only_path(path: impl AsRef<Path>) -> Result<Self> {
766 let path = path.as_ref().to_path_buf();
767 if path != Path::new(":memory:") && !path.exists() {
768 return Err(Error::NotInitialized(path));
769 }
770 let db = Self { path };
771 db.with_read_only_conn(|_| async { Ok(()) })?;
772 Ok(db)
773 }
774
775 #[cfg(test)]
776 pub(crate) fn open_in_memory() -> Result<Self> {
777 let path = std::env::temp_dir().join(format!(
778 "netsky-db-test-{}-{}.db",
779 std::process::id(),
780 Utc::now().timestamp_nanos_opt().unwrap_or_default()
781 ));
782 Self::open_path(path)
783 }
784
    /// Create or upgrade the on-disk schema to [`SCHEMA_VERSION`].
    ///
    /// Idempotent: every DDL statement uses `IF NOT EXISTS` / upsert
    /// semantics. Refuses to touch a database stamped with a newer version
    /// (`Error::FutureSchemaVersion`).
    pub fn migrate(&self) -> Result<()> {
        let path = self.path.clone();
        self.with_conn(move |conn| async move {
            let current = schema_version(&conn).await?;
            if current > SCHEMA_VERSION {
                return Err(Error::FutureSchemaVersion {
                    found: current,
                    supported: SCHEMA_VERSION,
                });
            }
            // Stamp the supported version into the meta key/value table.
            conn.execute(
                "INSERT INTO meta (key, value) VALUES (?1, ?2) \
                 ON CONFLICT(key) DO UPDATE SET value = excluded.value",
                params_from_iter([Value::from("schema_version"), Value::from(SCHEMA_VERSION)]),
            )
            .await?;
            // Every data table is a two-column JSON-row table.
            for table in TABLE_NAMES {
                let sql = format!(
                    "CREATE TABLE IF NOT EXISTS {table} \
                     (id INTEGER PRIMARY KEY, row_json TEXT NOT NULL)"
                );
                conn.execute(&sql, ()).await?;
            }
            // Expression indexes over json_extract for time-range scans...
            for &(table, column) in INDEXED_TIME_COLUMNS {
                let sql = format!(
                    "CREATE INDEX IF NOT EXISTS idx_{table}_{column} \
                     ON {table}(json_extract(row_json, '$.{column}'))"
                );
                conn.execute(&sql, ()).await?;
            }
            // ...and for equality lookups.
            for &(table, column) in INDEXED_EQ_COLUMNS {
                let sql = format!(
                    "CREATE INDEX IF NOT EXISTS idx_{table}_{column} \
                     ON {table}(json_extract(row_json, '$.{column}'))"
                );
                conn.execute(&sql, ()).await?;
            }
            // Per-source ingest cursor (plain columns, not JSON rows).
            conn.execute(
                "CREATE TABLE IF NOT EXISTS source_cursors ( \
                 source TEXT PRIMARY KEY, \
                 cursor_value TEXT NOT NULL, \
                 updated_at TEXT NOT NULL \
                 )",
                (),
            )
            .await?;
            // Outbox table; `delivery_status` holds an EventStatus string.
            conn.execute(
                "CREATE TABLE IF NOT EXISTS events ( \
                 id INTEGER PRIMARY KEY AUTOINCREMENT, \
                 source TEXT NOT NULL, \
                 ts_utc TEXT NOT NULL, \
                 payload_json TEXT NOT NULL, \
                 delivery_status TEXT NOT NULL, \
                 reason TEXT \
                 )",
                (),
            )
            .await?;
            // Version-aware fixup for token-usage ingest idempotency
            // (defined elsewhere in this file).
            migrate_token_usage_ingest_idempotency(&path, &conn, current).await?;
            // Stamp user_version last so a partial migration re-runs.
            conn.execute(&format!("PRAGMA user_version = {SCHEMA_VERSION}"), ())
                .await?;
            Ok(())
        })
    }
853
854 pub fn record_message(&self, record: MessageRecord<'_>) -> Result<i64> {
855 self.insert_json("messages", MESSAGES, |id| MessageRow {
856 id,
857 ts_utc: ts(record.ts_utc),
858 source: record.source.to_string(),
859 direction: record.direction.as_str().to_string(),
860 chat_id: record.chat_id.map(str::to_string),
861 from_agent: record.from_agent.map(str::to_string),
862 to_agent: record.to_agent.map(str::to_string),
863 body: record.body.map(str::to_string),
864 raw_json: record.raw_json.map(str::to_string),
865 })
866 }
867
868 pub fn record_cli(
869 &self,
870 ts_utc: DateTime<Utc>,
871 bin: &str,
872 argv_json: &str,
873 exit_code: Option<i64>,
874 duration_ms: Option<i64>,
875 host: &str,
876 ) -> Result<i64> {
877 self.insert_json("cli_invocations", CLI_INVOCATIONS, |id| CliRow {
878 id,
879 ts_utc: ts(ts_utc),
880 bin: bin.to_string(),
881 argv_json: argv_json.to_string(),
882 exit_code,
883 duration_ms,
884 host: host.to_string(),
885 })
886 }
887
888 pub fn record_crash(
889 &self,
890 ts_utc: DateTime<Utc>,
891 kind: &str,
892 agent: &str,
893 detail_json: &str,
894 ) -> Result<i64> {
895 self.insert_json("crashes", CRASHES, |id| CrashRow {
896 id,
897 ts_utc: ts(ts_utc),
898 kind: kind.to_string(),
899 agent: agent.to_string(),
900 detail_json: detail_json.to_string(),
901 })
902 }
903
904 pub fn record_tick(
905 &self,
906 ts_utc: DateTime<Utc>,
907 source: &str,
908 detail_json: &str,
909 ) -> Result<i64> {
910 self.insert_json("ticks", TICKS, |id| TickRow {
911 id,
912 ts_utc: ts(ts_utc),
913 source: source.to_string(),
914 detail_json: detail_json.to_string(),
915 })
916 }
917
918 pub fn record_workspace(
919 &self,
920 ts_utc_created: DateTime<Utc>,
921 name: &str,
922 branch: &str,
923 ts_utc_deleted: Option<DateTime<Utc>>,
924 verdict: Option<&str>,
925 ) -> Result<i64> {
926 self.insert_json("workspaces", WORKSPACES, |id| WorkspaceRow {
927 id,
928 ts_utc_created: ts(ts_utc_created),
929 name: name.to_string(),
930 branch: branch.to_string(),
931 ts_utc_deleted: ts_utc_deleted.map(ts),
932 verdict: verdict.map(str::to_string),
933 })
934 }
935
936 pub fn record_session(
937 &self,
938 ts_utc: DateTime<Utc>,
939 agent: &str,
940 session_num: i64,
941 event: SessionEvent,
942 ) -> Result<i64> {
943 self.insert_json("sessions", SESSIONS, |id| SessionRow {
944 id,
945 ts_utc: ts(ts_utc),
946 agent: agent.to_string(),
947 session_num,
948 event: event.as_str().to_string(),
949 })
950 }
951
952 pub fn record_clone_dispatch(&self, record: CloneDispatchRecord<'_>) -> Result<i64> {
953 self.insert_json("clone_dispatches", CLONE_DISPATCHES, |id| {
954 CloneDispatchRow {
955 id,
956 ts_utc_start: ts(record.ts_utc_start),
957 ts_utc_end: record.ts_utc_end.map(ts),
958 agent_id: record.agent_id.to_string(),
959 runtime: record.runtime.map(str::to_string),
960 brief_path: record.brief_path.map(str::to_string),
961 brief: truncate_opt(record.brief, 16_384),
962 workspace: record.workspace.map(str::to_string),
963 branch: record.branch.map(str::to_string),
964 status: record.status.map(str::to_string),
965 exit_code: record.exit_code,
966 detail_json: record.detail_json.map(str::to_string),
967 }
968 })
969 }
970
971 pub fn record_harvest_event(&self, record: HarvestEventRecord<'_>) -> Result<i64> {
972 self.insert_json("harvest_events", HARVEST_EVENTS, |id| HarvestEventRow {
973 id,
974 ts_utc: ts(record.ts_utc),
975 source_branch: record.source_branch.to_string(),
976 target_branch: record.target_branch.to_string(),
977 commit_sha: record.commit_sha.map(str::to_string),
978 status: record.status.to_string(),
979 conflicts: record.conflicts.map(str::to_string),
980 detail_json: record.detail_json.map(str::to_string),
981 })
982 }
983
984 pub fn record_communication_event(&self, record: CommunicationEventRecord<'_>) -> Result<i64> {
985 self.insert_json("communication_events", COMMUNICATION_EVENTS, |id| {
986 CommunicationEventRow {
987 id,
988 ts_utc: ts(record.ts_utc),
989 source: record.source.to_string(),
990 tool: record.tool.map(str::to_string),
991 direction: record.direction.as_str().to_string(),
992 chat_id: record.chat_id.map(str::to_string),
993 message_id: record.message_id.map(str::to_string),
994 handle: record.handle.map(str::to_string),
995 agent: record.agent.map(str::to_string),
996 body: truncate_opt(record.body, 4096),
997 status: record.status.map(str::to_string),
998 detail_json: record.detail_json.map(str::to_string),
999 }
1000 })
1001 }
1002
1003 pub fn list_communication_events(&self) -> Result<Vec<CommunicationEvent>> {
1004 let rows: Vec<CommunicationEventRow> = self
1005 .with_read_only_conn(|conn| async move { rows(&conn, COMMUNICATION_EVENTS).await })?;
1006 rows.into_iter().map(CommunicationEvent::try_from).collect()
1007 }
1008
1009 pub fn record_mcp_tool_call(&self, record: McpToolCallRecord<'_>) -> Result<i64> {
1010 self.insert_json("mcp_tool_calls", MCP_TOOL_CALLS, |id| McpToolCallRow {
1011 id,
1012 ts_utc_start: ts(record.ts_utc_start),
1013 ts_utc_end: record.ts_utc_end.map(ts),
1014 source: record.source.to_string(),
1015 tool: record.tool.to_string(),
1016 agent: record.agent.map(str::to_string),
1017 duration_ms: record.duration_ms,
1018 success: record.success,
1019 error: truncate_opt(record.error, 2048),
1020 timeout_race: record.timeout_race,
1021 request_json: truncate_opt(record.request_json, 8192),
1022 response_json: truncate_opt(record.response_json, 8192),
1023 })
1024 }
1025
1026 pub fn record_git_operation(&self, record: GitOperationRecord<'_>) -> Result<i64> {
1027 self.insert_json("git_operations", GIT_OPERATIONS, |id| GitOperationRow {
1028 id,
1029 ts_utc: ts(record.ts_utc),
1030 operation: record.operation.to_string(),
1031 repo: record.repo.to_string(),
1032 branch: record.branch.map(str::to_string),
1033 remote: record.remote.map(str::to_string),
1034 from_sha: record.from_sha.map(str::to_string),
1035 to_sha: record.to_sha.map(str::to_string),
1036 status: record.status.to_string(),
1037 detail_json: record.detail_json.map(str::to_string),
1038 })
1039 }
1040
1041 pub fn record_owner_directive(&self, record: OwnerDirectiveRecord<'_>) -> Result<i64> {
1042 self.insert_json("owner_directives", OWNER_DIRECTIVES, |id| {
1043 OwnerDirectiveRow {
1044 id,
1045 ts_utc: ts(record.ts_utc),
1046 source: record.source.to_string(),
1047 chat_id: record.chat_id.map(str::to_string),
1048 raw_text: truncate(record.raw_text, 16_384),
1049 resolved_action: record.resolved_action.map(str::to_string),
1050 agent: record.agent.map(str::to_string),
1051 status: record.status.map(str::to_string),
1052 detail_json: record.detail_json.map(str::to_string),
1053 }
1054 })
1055 }
1056
1057 pub fn record_token_usage(&self, record: TokenUsageRecord<'_>) -> Result<i64> {
1058 self.insert_json("token_usage", TOKEN_USAGE, |id| TokenUsageRow {
1059 id,
1060 ts_utc: ts(record.ts_utc),
1061 session_id: record.session_id.map(str::to_string),
1062 agent: record.agent.map(str::to_string),
1063 runtime: record.runtime.map(str::to_string),
1064 model: record.model.map(str::to_string),
1065 input_tokens: record.input_tokens,
1066 output_tokens: record.output_tokens,
1067 cached_input_tokens: record.cached_input_tokens,
1068 cost_usd_micros: record.cost_usd_micros,
1069 detail_json: record.detail_json.map(str::to_string),
1070 })
1071 }
1072
    /// Insert many token-usage rows in one transaction.
    ///
    /// Row ids are pre-allocated in bulk so rows can be serialized with their
    /// final ids before any write. `ON CONFLICT DO NOTHING` skips ids that
    /// already exist. On write failure every row is spooled to disk before
    /// the error is returned. Returns the number of rows actually inserted
    /// (which may be less than the input length when duplicates are skipped).
    pub fn record_token_usage_batch<'a, I>(&self, records: I) -> Result<usize>
    where
        I: IntoIterator<Item = TokenUsageRecord<'a>>,
    {
        let raw: Vec<TokenUsageRecord<'_>> = records.into_iter().collect();
        if raw.is_empty() {
            return Ok(0);
        }
        let count = raw.len();
        // Reserve a contiguous id range [start_id, start_id + count).
        let start_id = self.bulk_alloc_ids(TOKEN_USAGE, count as i64)?;
        let mut payloads: Vec<(i64, String)> = Vec::with_capacity(count);
        // Kept separately so the rows can be spooled if the write fails.
        let mut owned_rows: Vec<TokenUsageRow> = Vec::with_capacity(count);
        for (offset, record) in raw.into_iter().enumerate() {
            let id = start_id + offset as i64;
            let row = TokenUsageRow {
                id,
                ts_utc: ts(record.ts_utc),
                session_id: record.session_id.map(str::to_string),
                agent: record.agent.map(str::to_string),
                runtime: record.runtime.map(str::to_string),
                model: record.model.map(str::to_string),
                input_tokens: record.input_tokens,
                output_tokens: record.output_tokens,
                cached_input_tokens: record.cached_input_tokens,
                cost_usd_micros: record.cost_usd_micros,
                detail_json: record.detail_json.map(str::to_string),
            };
            payloads.push((id, serde_json::to_string(&row)?));
            owned_rows.push(row);
        }
        let table = TOKEN_USAGE;
        let payloads_for_write = payloads.clone();
        // Whole-transaction retry: each attempt re-clones the payloads and
        // replays BEGIN .. COMMIT from scratch.
        let write = retry_db_lock(|| {
            let payloads_for_write = payloads_for_write.clone();
            self.with_conn(move |conn| async move {
                let sql = format!(
                    "INSERT INTO {table} (id, row_json) VALUES (?1, ?2) ON CONFLICT DO NOTHING"
                );
                conn.execute("BEGIN", ()).await?;
                let mut inserted = 0usize;
                for (id, json) in payloads_for_write {
                    match conn
                        .execute(&sql, params_from_iter([Value::from(id), Value::from(json)]))
                        .await
                    {
                        // `changed` is 0 when the id already existed.
                        Ok(changed) => inserted += changed as usize,
                        Err(error) => {
                            // Best-effort rollback; the original error wins.
                            let _ = conn.execute("ROLLBACK", ()).await;
                            return Err(error.into());
                        }
                    }
                }
                conn.execute("COMMIT", ()).await?;
                Ok(inserted)
            })
        });
        let inserted = match write {
            Ok(inserted) => inserted,
            Err(error) => {
                // Persist every row to the spool so data is not lost, then
                // propagate the write error to the caller.
                for row in &owned_rows {
                    spool_write_error(table, row.id, row, &error)?;
                }
                return Err(error);
            }
        };
        if inserted < count {
            // Some ids were skipped as duplicates; pull the id counter back
            // to the actual table maximum so the reserved-but-unused range
            // is not leaked.
            self.with_conn(move |conn| async move {
                let sql =
                    "UPDATE ids SET id = MAX(COALESCE((SELECT MAX(id) FROM token_usage), 0), 0) \
                     WHERE name = ?1";
                conn.execute(sql, params_from_iter([Value::from(TOKEN_USAGE)]))
                    .await?;
                Ok(())
            })?;
        }
        Ok(inserted)
    }
1156
    /// Reserve `count` consecutive ids for `name`, returning the first id.
    ///
    /// Uses an UPSERT on the `ids` counter table; `RETURNING id` yields the
    /// last id of the reserved range, so the range starts at
    /// `last - count + 1`. If allocation fails, falls back to a
    /// timestamp-derived id and spools a diagnostic record rather than
    /// failing the caller.
    fn bulk_alloc_ids(&self, name: &'static str, count: i64) -> Result<i64> {
        if count <= 0 {
            return Ok(0);
        }
        let result = self.with_conn(move |conn| async move {
            let mut rows = conn
                .query(
                    "INSERT INTO ids (name, id) VALUES (?1, ?2) \
                     ON CONFLICT(name) DO UPDATE SET id = id + ?2 \
                     RETURNING id",
                    params_from_iter([Value::from(name), Value::from(count)]),
                )
                .await?;
            if let Some(row) = rows.next().await? {
                let last = integer_value(&row.get_value(0)?)?;
                return Ok(last - count + 1);
            }
            Err(Error::NotFound(format!("bulk id allocation for {name}")))
        });
        match result {
            Ok(start) => Ok(start),
            Err(error) => {
                // Allocation failed: synthesize a (likely unique) id from the
                // microsecond timestamp and record the failure for later audit.
                let fallback = Utc::now().timestamp_micros();
                spool_error_json(serde_json::json!({
                    "ts_utc": ts(Utc::now()),
                    "table": name,
                    "id": fallback,
                    "count": count,
                    "error": error.to_string(),
                    "record": {"kind": "bulk_id_allocation"}
                }))?;
                Ok(fallback)
            }
        }
    }
1196
1197 pub fn record_source_error(&self, record: SourceErrorRecord<'_>) -> Result<i64> {
1198 self.insert_json("source_errors", SOURCE_ERRORS, |id| SourceErrorRow {
1199 id,
1200 ts_utc: ts(record.ts_utc),
1201 source: record.source.to_string(),
1202 error_class: record.error_class.as_str().to_string(),
1203 count: record.count.max(1),
1204 detail_json: record.detail_json.map(str::to_string),
1205 })
1206 }
1207
1208 pub fn record_iroh_event(&self, record: IrohEventRecord<'_>) -> Result<i64> {
1209 self.insert_json("iroh_events", IROH_EVENTS, |id| IrohEventRow {
1210 id,
1211 ts_utc: ts(record.ts_utc),
1212 event_type: record.event_type.as_str().to_string(),
1213 peer_id_hash: record.peer_id_hash.to_string(),
1214 peer_label: record.peer_label.map(str::to_string),
1215 detail_json: record.detail_json.map(str::to_string),
1216 })
1217 }
1218
1219 pub fn record_watchdog_event(&self, record: WatchdogEventRecord<'_>) -> Result<i64> {
1220 self.insert_json("watchdog_events", WATCHDOG_EVENTS, |id| WatchdogEventRow {
1221 id,
1222 ts_utc: ts(record.ts_utc),
1223 event: record.event.to_string(),
1224 agent: record.agent.map(str::to_string),
1225 severity: record.severity.map(str::to_string),
1226 status: record.status.map(str::to_string),
1227 detail_json: record.detail_json.map(str::to_string),
1228 })
1229 }
1230
1231 pub fn read_source_cursor(&self, source: &str) -> Result<Option<String>> {
1234 let source = source.to_string();
1235 self.with_conn(move |conn| async move {
1236 let mut rows = conn
1237 .query(
1238 "SELECT cursor_value FROM source_cursors WHERE source = ?1",
1239 params_from_iter([Value::from(source)]),
1240 )
1241 .await?;
1242 let Some(row) = rows.next().await? else {
1243 return Ok(None);
1244 };
1245 Ok(Some(text_value(&row.get_value(0)?)?))
1246 })
1247 }
1248
1249 pub fn update_source_cursor(&self, source: &str, value: &str) -> Result<()> {
1252 let source = source.to_string();
1253 let value = value.to_string();
1254 let now = ts(Utc::now());
1255 retry_db_lock(|| {
1256 let source = source.clone();
1257 let value = value.clone();
1258 let now = now.clone();
1259 self.with_conn(move |conn| async move {
1260 conn.execute(
1261 "INSERT INTO source_cursors (source, cursor_value, updated_at) \
1262 VALUES (?1, ?2, ?3) \
1263 ON CONFLICT(source) DO UPDATE SET \
1264 cursor_value = excluded.cursor_value, \
1265 updated_at = excluded.updated_at",
1266 params_from_iter([Value::from(source), Value::from(value), Value::from(now)]),
1267 )
1268 .await?;
1269 Ok(())
1270 })
1271 })
1272 }
1273
1274 pub fn reset_source_cursor(&self, source: &str) -> Result<bool> {
1277 let source = source.to_string();
1278 retry_db_lock(|| {
1279 let source = source.clone();
1280 self.with_conn(move |conn| async move {
1281 let mut rows = conn
1282 .query(
1283 "DELETE FROM source_cursors WHERE source = ?1 RETURNING source",
1284 params_from_iter([Value::from(source)]),
1285 )
1286 .await?;
1287 Ok(rows.next().await?.is_some())
1288 })
1289 })
1290 }
1291
1292 pub fn list_source_cursors(&self) -> Result<Vec<SourceCursorRow>> {
1293 self.with_read_only_conn(|conn| async move {
1294 let mut rows = conn
1295 .query(
1296 "SELECT source, cursor_value, updated_at FROM source_cursors ORDER BY source",
1297 (),
1298 )
1299 .await?;
1300 let mut out = Vec::new();
1301 while let Some(row) = rows.next().await? {
1302 out.push(SourceCursorRow {
1303 source: text_value(&row.get_value(0)?)?,
1304 cursor_value: text_value(&row.get_value(1)?)?,
1305 updated_at: text_value(&row.get_value(2)?)?,
1306 });
1307 }
1308 Ok(out)
1309 })
1310 }
1311
1312 pub fn insert_event(
1316 &self,
1317 source: &str,
1318 ts_utc: DateTime<Utc>,
1319 payload_json: &str,
1320 ) -> Result<i64> {
1321 let source = source.to_string();
1322 let ts_str = ts(ts_utc);
1323 let payload = payload_json.to_string();
1324 retry_db_lock(|| {
1325 let source = source.clone();
1326 let ts_str = ts_str.clone();
1327 let payload = payload.clone();
1328 self.with_conn(move |conn| async move {
1329 let mut rows = conn
1330 .query(
1331 "INSERT INTO events (source, ts_utc, payload_json, delivery_status) \
1332 VALUES (?1, ?2, ?3, 'pending') RETURNING id",
1333 params_from_iter([
1334 Value::from(source),
1335 Value::from(ts_str),
1336 Value::from(payload),
1337 ]),
1338 )
1339 .await?;
1340 let Some(row) = rows.next().await? else {
1341 return Err(Error::NotFound("event id after insert".into()));
1342 };
1343 integer_value(&row.get_value(0)?)
1344 })
1345 })
1346 }
1347
1348 pub fn update_event_delivery(
1351 &self,
1352 id: i64,
1353 status: EventStatus,
1354 reason: Option<&str>,
1355 ) -> Result<()> {
1356 let reason_value = reason.map(str::to_string);
1357 let status_str = status.as_str().to_string();
1358 retry_db_lock(|| {
1359 let status_str = status_str.clone();
1360 let reason_value = reason_value.clone();
1361 self.with_conn(move |conn| async move {
1362 conn.execute(
1363 "UPDATE events SET delivery_status = ?1, reason = ?2 WHERE id = ?3",
1364 params_from_iter([
1365 Value::from(status_str),
1366 match reason_value {
1367 Some(r) => Value::from(r),
1368 None => Value::Null,
1369 },
1370 Value::from(id),
1371 ]),
1372 )
1373 .await?;
1374 Ok(())
1375 })
1376 })
1377 }
1378
1379 pub fn tail_events(&self, source: &str, limit: i64) -> Result<Vec<EventRow>> {
1381 let source = source.to_string();
1382 self.with_read_only_conn(move |conn| async move {
1383 let mut rows = conn
1384 .query(
1385 "SELECT id, source, ts_utc, payload_json, delivery_status, reason \
1386 FROM events WHERE source = ?1 ORDER BY id DESC LIMIT ?2",
1387 params_from_iter([Value::from(source), Value::from(limit)]),
1388 )
1389 .await?;
1390 let mut out = Vec::new();
1391 while let Some(row) = rows.next().await? {
1392 let reason = match row.get_value(5)? {
1393 Value::Null => None,
1394 other => Some(
1395 other
1396 .as_text()
1397 .cloned()
1398 .ok_or_else(|| Error::NotFound("event reason".into()))?,
1399 ),
1400 };
1401 out.push(EventRow {
1402 id: integer_value(&row.get_value(0)?)?,
1403 source: text_value(&row.get_value(1)?)?,
1404 ts_utc: text_value(&row.get_value(2)?)?,
1405 payload_json: text_value(&row.get_value(3)?)?,
1406 delivery_status: text_value(&row.get_value(4)?)?,
1407 reason,
1408 });
1409 }
1410 Ok(out)
1411 })
1412 }
1413
1414 pub fn record_task(&self, record: TaskRecord<'_>) -> Result<i64> {
1415 let now = ts(Utc::now());
1416 self.insert_json("netsky_tasks", TASKS, |id| TaskRow {
1417 id,
1418 created_at: now.clone(),
1419 updated_at: now.clone(),
1420 title: record.title.to_string(),
1421 body: record.body.map(str::to_string),
1422 status: record.status.to_string(),
1423 priority: record.priority.map(str::to_string),
1424 labels: record.labels.map(str::to_string),
1425 source: record.source.map(str::to_string),
1426 source_ref: record.source_ref.map(str::to_string),
1427 closed_at: record.closed_at.map(|value| {
1428 if value.is_empty() {
1429 now.clone()
1430 } else {
1431 value.to_string()
1432 }
1433 }),
1434 closed_reason: record.closed_reason.map(str::to_string),
1435 closed_evidence: record.closed_evidence.map(str::to_string),
1436 agent: record.agent.map(str::to_string),
1437 sync_calendar: record.sync_calendar,
1438 calendar_task_id: record.calendar_task_id.map(str::to_string),
1439 })
1440 }
1441
    /// Apply a partial update to an existing task and return the merged row.
    ///
    /// Only fields present in `update` are changed; `updated_at` is always
    /// refreshed. The first transition to "closed" stamps `closed_at` once and
    /// never overwrites it. A failed write is spooled to the JSONL error log
    /// and the merged row is still returned (optimistic-write policy, same as
    /// `insert_json`).
    pub fn update_task(&self, update: TaskUpdate<'_>) -> Result<TaskRow> {
        let mut row = self
            .get_task(update.id)?
            .ok_or_else(|| Error::NotFound(format!("task {}", update.id)))?;
        let now = ts(Utc::now());
        if let Some(status) = update.status {
            row.status = status.to_string();
            // Record the close time only on the first close.
            if status == "closed" && row.closed_at.is_none() {
                row.closed_at = Some(now.clone());
            }
        }
        if let Some(priority) = update.priority {
            row.priority = Some(priority.to_string());
        }
        if let Some(agent) = update.agent {
            row.agent = Some(agent.to_string());
        }
        if let Some(reason) = update.closed_reason {
            row.closed_reason = Some(reason.to_string());
        }
        if let Some(evidence) = update.closed_evidence {
            row.closed_evidence = Some(evidence.to_string());
        }
        if let Some(sync_calendar) = update.sync_calendar {
            row.sync_calendar = sync_calendar;
        }
        if let Some(calendar_task_id) = update.calendar_task_id {
            row.calendar_task_id = Some(calendar_task_id.to_string());
        }
        row.updated_at = now;
        let row_json = serde_json::to_string(&row)?;
        let id = update.id;
        let write = retry_db_lock(|| {
            let row_json = row_json.clone();
            self.with_conn(move |conn| async move {
                conn.execute(
                    "UPDATE netsky_tasks SET row_json = ?1 WHERE id = ?2",
                    params_from_iter([Value::from(row_json), Value::from(id)]),
                )
                .await?;
                Ok(())
            })
        });
        // Spool on failure but still hand the merged row back to the caller.
        if let Err(error) = write {
            spool_write_error("netsky_tasks", update.id, &row, &error)?;
        }
        Ok(row)
    }
1490
1491 pub fn get_task(&self, id: i64) -> Result<Option<TaskRow>> {
1492 self.with_conn(move |conn| async move {
1493 let mut rows = conn
1494 .query(
1495 "SELECT row_json FROM netsky_tasks WHERE id = ?1",
1496 params_from_iter([Value::from(id)]),
1497 )
1498 .await?;
1499 let Some(row) = rows.next().await? else {
1500 return Ok(None);
1501 };
1502 let json = text_value(&row.get_value(0)?)?;
1503 Ok(Some(serde_json::from_str(&json)?))
1504 })
1505 }
1506
1507 pub fn list_tasks(&self, status: Option<&str>, priority: Option<&str>) -> Result<Vec<TaskRow>> {
1508 let status = status.map(str::to_string);
1509 let priority = priority.map(str::to_string);
1510 self.with_conn(move |conn| async move {
1511 let status_val = status.map(Value::from).unwrap_or(Value::Null);
1512 let priority_val = priority.map(Value::from).unwrap_or(Value::Null);
1513 let mut rows = conn
1514 .query(
1515 "SELECT row_json FROM netsky_tasks \
1516 WHERE (?1 IS NULL OR json_extract(row_json, '$.status') = ?1) \
1517 AND (?2 IS NULL OR json_extract(row_json, '$.priority') = ?2) \
1518 ORDER BY json_extract(row_json, '$.status'), \
1519 json_extract(row_json, '$.priority'), \
1520 id",
1521 params_from_iter([status_val, priority_val]),
1522 )
1523 .await?;
1524 let mut out = Vec::new();
1525 while let Some(row) = rows.next().await? {
1526 let json = text_value(&row.get_value(0)?)?;
1527 out.push(serde_json::from_str(&json)?);
1528 }
1529 Ok(out)
1530 })
1531 }
1532
1533 pub fn query(&self, sql: &str) -> Result<String> {
1534 let batches = self.query_batches(sql)?;
1535 Ok(pretty_format_batches(&batches)?.to_string())
1536 }
1537
1538 pub fn query_batches(&self, sql: &str) -> Result<Vec<RecordBatch>> {
1539 let scope = referenced_storage_tables(sql);
1540 let snapshot = self.snapshot_scoped(scope.as_ref())?;
1541 let runtime = tokio::runtime::Runtime::new()?;
1542 runtime.block_on(async move {
1543 let ctx = register_snapshot(snapshot).await?;
1544 let df = ctx.sql(sql).await?;
1545 Ok(df.collect().await?)
1546 })
1547 }
1548
    /// Read the schema version recorded in the `meta` table (0 when unset).
    pub fn schema_version(&self) -> Result<i64> {
        self.with_conn(|conn| async move { schema_version(&conn).await })
    }
1552
1553 fn insert_json<T, F>(&self, name: &'static str, table: &'static str, row: F) -> Result<i64>
1554 where
1555 T: Serialize,
1556 F: FnOnce(i64) -> T,
1557 {
1558 let id = self.next_id(name)?;
1559 let row = row(id);
1560 let row_json = serde_json::to_string(&row)?;
1561 let write = retry_db_lock(|| {
1562 let row_json = row_json.clone();
1563 self.with_conn({
1564 let table = table.to_string();
1565 move |conn| async move {
1566 let sql = format!("INSERT INTO {table} (id, row_json) VALUES (?1, ?2)");
1567 conn.execute(
1568 &sql,
1569 params_from_iter([Value::from(id), Value::from(row_json)]),
1570 )
1571 .await?;
1572 Ok(())
1573 }
1574 })
1575 });
1576 if let Err(error) = write {
1577 spool_write_error(table, id, &row, &error)?;
1578 }
1579 Ok(id)
1580 }
1581
    /// Load a `Snapshot` of the storage tables, restricted to `scope`.
    ///
    /// `scope == None` means "load everything"; otherwise only tables named in
    /// the set are read and the rest stay empty, which keeps ad-hoc queries
    /// from deserializing the whole database.
    fn snapshot_scoped(&self, scope: Option<&HashSet<&'static str>>) -> Result<Snapshot> {
        // Clone the scope so it can move into the 'static async closure.
        let owned: Option<HashSet<&'static str>> = scope.cloned();
        self.with_read_only_conn(move |conn| async move {
            // No scope => every table is wanted.
            let wants = |t: &'static str| owned.as_ref().is_none_or(|s| s.contains(t));
            Ok(Snapshot {
                messages: if wants(MESSAGES) {
                    rows(&conn, MESSAGES).await?
                } else {
                    Vec::new()
                },
                cli_invocations: if wants(CLI_INVOCATIONS) {
                    rows(&conn, CLI_INVOCATIONS).await?
                } else {
                    Vec::new()
                },
                crashes: if wants(CRASHES) {
                    rows(&conn, CRASHES).await?
                } else {
                    Vec::new()
                },
                ticks: if wants(TICKS) {
                    rows(&conn, TICKS).await?
                } else {
                    Vec::new()
                },
                workspaces: if wants(WORKSPACES) {
                    rows(&conn, WORKSPACES).await?
                } else {
                    Vec::new()
                },
                sessions: if wants(SESSIONS) {
                    rows(&conn, SESSIONS).await?
                } else {
                    Vec::new()
                },
                clone_dispatches: if wants(CLONE_DISPATCHES) {
                    rows(&conn, CLONE_DISPATCHES).await?
                } else {
                    Vec::new()
                },
                harvest_events: if wants(HARVEST_EVENTS) {
                    rows(&conn, HARVEST_EVENTS).await?
                } else {
                    Vec::new()
                },
                communication_events: if wants(COMMUNICATION_EVENTS) {
                    rows(&conn, COMMUNICATION_EVENTS).await?
                } else {
                    Vec::new()
                },
                mcp_tool_calls: if wants(MCP_TOOL_CALLS) {
                    rows(&conn, MCP_TOOL_CALLS).await?
                } else {
                    Vec::new()
                },
                git_operations: if wants(GIT_OPERATIONS) {
                    rows(&conn, GIT_OPERATIONS).await?
                } else {
                    Vec::new()
                },
                owner_directives: if wants(OWNER_DIRECTIVES) {
                    rows(&conn, OWNER_DIRECTIVES).await?
                } else {
                    Vec::new()
                },
                token_usage: if wants(TOKEN_USAGE) {
                    rows(&conn, TOKEN_USAGE).await?
                } else {
                    Vec::new()
                },
                watchdog_events: if wants(WATCHDOG_EVENTS) {
                    rows(&conn, WATCHDOG_EVENTS).await?
                } else {
                    Vec::new()
                },
                tasks: if wants(TASKS) {
                    rows(&conn, TASKS).await?
                } else {
                    Vec::new()
                },
                source_errors: if wants(SOURCE_ERRORS) {
                    rows(&conn, SOURCE_ERRORS).await?
                } else {
                    Vec::new()
                },
                iroh_events: if wants(IROH_EVENTS) {
                    rows(&conn, IROH_EVENTS).await?
                } else {
                    Vec::new()
                },
            })
        })
    }
1679
    /// Allocate the next id for `name` from the `ids` counter table.
    ///
    /// Falls back to a microsecond-timestamp id (and spools a diagnostic
    /// record) if the database cannot be reached, so callers never fail on
    /// id allocation alone.
    fn next_id(&self, name: &'static str) -> Result<i64> {
        let result = self.with_conn(move |conn| async move {
            let mut rows = conn
                .query(
                    "INSERT INTO ids (name, id) VALUES (?1, 1) \
                     ON CONFLICT(name) DO UPDATE SET id = id + 1 \
                     RETURNING id",
                    params_from_iter([Value::from(name)]),
                )
                .await?;
            if let Some(row) = rows.next().await? {
                return integer_value(&row.get_value(0)?);
            }
            Err(Error::NotFound(format!("id allocation for {name}")))
        });
        match result {
            Ok(id) => Ok(id),
            Err(error) => {
                // Timestamp-derived fallback id; the failure is spooled for audit.
                let fallback = Utc::now().timestamp_micros();
                spool_error_json(serde_json::json!({
                    "ts_utc": ts(Utc::now()),
                    "table": name,
                    "id": fallback,
                    "error": error.to_string(),
                    "record": {"kind": "id_allocation"}
                }))?;
                Ok(fallback)
            }
        }
    }
1710
    /// Open a fresh read-write connection, configure it, and run `f` on it.
    ///
    /// Connection establishment is retried on lock errors with a short linear
    /// backoff. The retry budget is smaller when already inside a tokio
    /// runtime, where `block_on_turso` must spawn a helper thread per call.
    fn with_conn<T, F, Fut>(&self, f: F) -> Result<T>
    where
        T: Send + 'static,
        F: FnOnce(Connection) -> Fut + Send + 'static,
        Fut: std::future::Future<Output = Result<T>> + Send + 'static,
    {
        let path = self.path.clone();
        let max_attempts = if tokio::runtime::Handle::try_current().is_ok() {
            3
        } else {
            10
        };
        let task = async move {
            let path = path.to_string_lossy().to_string();
            let mut last_error = None;
            for attempt in 0..=max_attempts {
                match Builder::new_local(&path).build().await {
                    Ok(db) => {
                        let conn = db.connect()?;
                        configure_conn(&conn).await?;
                        return f(conn).await;
                    }
                    // Lock error before the final attempt: back off and retry.
                    Err(error) if is_locking_error(&error) && attempt < max_attempts => {
                        last_error = Some(error);
                        tokio::time::sleep(Duration::from_millis(10 + attempt * 2)).await;
                    }
                    Err(error) => return Err(error.into()),
                }
            }
            // NOTE(review): the final iteration always returns from one of the
            // match arms above, so this tail is effectively unreachable; it is
            // kept as a defensive fallback.
            Err(last_error
                .map(Error::from)
                .unwrap_or_else(|| Error::NotFound("turso connection".to_string())))
        };
        block_on_turso(task)
    }
1746
1747 fn with_read_only_conn<T, F, Fut>(&self, f: F) -> Result<T>
1748 where
1749 T: Send + 'static,
1750 F: FnOnce(Connection) -> Fut + Send + 'static,
1751 Fut: std::future::Future<Output = Result<T>> + Send + 'static,
1752 {
1753 let path = self.path.clone();
1754 let task = async move {
1755 let path_str = path.to_string_lossy().to_string();
1756 let db = Builder::new_local(&path_str).build().await?;
1757 let conn = db.connect()?;
1758 configure_read_only_conn(&conn, &path).await?;
1759 f(conn).await
1760 };
1761 block_on_turso(task)
1762 }
1763
1764 #[cfg(test)]
1765 fn read_only_write_probe_fails(&self) -> Result<bool> {
1766 self.with_read_only_conn(|conn| async move {
1767 Ok(conn
1768 .execute(
1769 "INSERT INTO meta (key, value) VALUES ('read_only_probe', 1)",
1770 (),
1771 )
1772 .await
1773 .is_err())
1774 })
1775 }
1776}
1777
/// In-memory copy of every storage table, used to build the DataFusion
/// tables for ad-hoc SQL. Tables outside a query's scope are left as empty
/// vectors rather than being read from disk.
struct Snapshot {
    messages: Vec<MessageRow>,
    cli_invocations: Vec<CliRow>,
    crashes: Vec<CrashRow>,
    ticks: Vec<TickRow>,
    workspaces: Vec<WorkspaceRow>,
    sessions: Vec<SessionRow>,
    clone_dispatches: Vec<CloneDispatchRow>,
    harvest_events: Vec<HarvestEventRow>,
    communication_events: Vec<CommunicationEventRow>,
    mcp_tool_calls: Vec<McpToolCallRow>,
    git_operations: Vec<GitOperationRow>,
    owner_directives: Vec<OwnerDirectiveRow>,
    token_usage: Vec<TokenUsageRow>,
    watchdog_events: Vec<WatchdogEventRow>,
    tasks: Vec<TaskRow>,
    source_errors: Vec<SourceErrorRow>,
    iroh_events: Vec<IrohEventRow>,
}
1797
/// Prepare a read-write connection: busy timeout, WAL mode, and bootstrap
/// tables. The pragma/DDL work is skipped once the schema is already at
/// `SCHEMA_VERSION`.
async fn configure_conn(conn: &Connection) -> Result<()> {
    conn.busy_timeout(Duration::from_secs(10))?;
    if user_version(conn).await? == SCHEMA_VERSION {
        return Ok(());
    }
    // `PRAGMA journal_mode` returns a result row that must be drained.
    let mut wal = conn.query("PRAGMA journal_mode=WAL", ()).await?;
    while wal.next().await?.is_some() {}
    conn.execute_batch(
        "PRAGMA synchronous=NORMAL; \
         CREATE TABLE IF NOT EXISTS meta (key TEXT PRIMARY KEY, value INTEGER NOT NULL); \
         CREATE TABLE IF NOT EXISTS ids (name TEXT PRIMARY KEY, id INTEGER NOT NULL);",
    )
    .await?;
    Ok(())
}
1813
1814async fn configure_read_only_conn(conn: &Connection, path: &Path) -> Result<()> {
1815 conn.busy_timeout(Duration::from_secs(10))?;
1816 conn.execute("PRAGMA query_only = 1", ()).await?;
1817 if query_only(conn).await? != 1 {
1818 return Err(Error::ReadOnlyNotEnforced(path.to_path_buf()));
1819 }
1820 ensure_initialized(conn, path).await
1821}
1822
1823async fn ensure_initialized(conn: &Connection, path: &Path) -> Result<()> {
1824 if !table_exists(conn, "meta").await? {
1825 return Err(Error::NotInitialized(path.to_path_buf()));
1826 }
1827 let current = schema_version(conn).await?;
1828 if current == 0 {
1829 return Err(Error::NotInitialized(path.to_path_buf()));
1830 }
1831 if current > SCHEMA_VERSION {
1832 return Err(Error::FutureSchemaVersion {
1833 found: current,
1834 supported: SCHEMA_VERSION,
1835 });
1836 }
1837 Ok(())
1838}
1839
1840async fn table_exists(conn: &Connection, table: &str) -> Result<bool> {
1841 let mut rows = conn
1842 .query(
1843 "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1",
1844 params_from_iter([Value::from(table)]),
1845 )
1846 .await?;
1847 Ok(rows.next().await?.is_some())
1848}
1849
1850async fn query_only(conn: &Connection) -> Result<i64> {
1851 let mut rows = conn.query("PRAGMA query_only", ()).await?;
1852 let Some(row) = rows.next().await? else {
1853 return Ok(0);
1854 };
1855 integer_value(&row.get_value(0)?)
1856}
1857
1858async fn user_version(conn: &Connection) -> Result<i64> {
1859 let mut rows = conn.query("PRAGMA user_version", ()).await?;
1860 let Some(row) = rows.next().await? else {
1861 return Ok(0);
1862 };
1863 integer_value(&row.get_value(0)?)
1864}
1865
1866async fn schema_version(conn: &Connection) -> Result<i64> {
1867 let mut rows = conn
1868 .query(
1869 "SELECT value FROM meta WHERE key = ?1",
1870 params_from_iter([Value::from("schema_version")]),
1871 )
1872 .await?;
1873 let Some(row) = rows.next().await? else {
1874 return Ok(0);
1875 };
1876 integer_value(&row.get_value(0)?)
1877}
1878
/// Schema v7 migration step: make token-usage ingestion idempotent.
///
/// Takes a one-time file backup when upgrading from a pre-v7 schema, then
/// removes duplicate claude-session rows and installs the unique index that
/// blocks re-ingestion. Dedupe and index creation are idempotent, so they
/// run unconditionally.
async fn migrate_token_usage_ingest_idempotency(
    path: &Path,
    conn: &Connection,
    current: i64,
) -> Result<()> {
    if current < 7 {
        backup_db_before_token_usage_dedupe(path)?;
    }
    dedupe_token_usage_claude_sessions_rows(conn).await?;
    ensure_token_usage_claude_sessions_unique_index(conn).await
}
1890
1891fn backup_db_before_token_usage_dedupe(path: &Path) -> Result<()> {
1892 if path == Path::new(":memory:") || !path.exists() {
1893 return Ok(());
1894 }
1895 let backup = path.with_extension("db.bak.v7-token-usage-dedupe");
1896 if backup.exists() {
1897 return Ok(());
1898 }
1899 fs::copy(path, backup)?;
1900 Ok(())
1901}
1902
/// Delete duplicate claude-session token-usage rows, keeping the lowest id.
///
/// Rows are considered duplicates when they share the same
/// (session_id, detail_json.path, detail_json.line) key; `dup` selects the
/// MIN(id) to keep per duplicated key, and every other matching row is the
/// victim. The filter constants restrict both sides to claude-session rows.
async fn dedupe_token_usage_claude_sessions_rows(conn: &Connection) -> Result<()> {
    let sql = format!(
        "DELETE FROM token_usage \
         WHERE id IN ( \
             SELECT victim.id \
             FROM token_usage AS victim \
             JOIN ( \
                 SELECT COALESCE(json_extract(row_json, '$.session_id'), '') AS session_id_key, \
                        json_extract(json_extract(row_json, '$.detail_json'), '$.path') AS path_key, \
                        json_extract(json_extract(row_json, '$.detail_json'), '$.line') AS line_key, \
                        MIN(id) AS keep_id \
                 FROM token_usage \
                 WHERE {filter} \
                 GROUP BY 1, 2, 3 \
                 HAVING COUNT(*) > 1 \
             ) AS dup \
             ON COALESCE(json_extract(victim.row_json, '$.session_id'), '') = dup.session_id_key \
             AND json_extract(json_extract(victim.row_json, '$.detail_json'), '$.path') = dup.path_key \
             AND json_extract(json_extract(victim.row_json, '$.detail_json'), '$.line') = dup.line_key \
             WHERE {victim_filter} \
             AND victim.id <> dup.keep_id \
         )",
        filter = TOKEN_USAGE_CLAUDE_SESSIONS_FILTER,
        victim_filter = qualify_token_usage_filter("victim"),
    );
    conn.execute(&sql, ()).await?;
    Ok(())
}
1931
/// Create (if missing) the partial unique index that makes claude-session
/// token-usage ingestion idempotent; the key and filter come from the shared
/// constants so they stay in sync with the dedupe query.
async fn ensure_token_usage_claude_sessions_unique_index(conn: &Connection) -> Result<()> {
    let sql = format!(
        "CREATE UNIQUE INDEX IF NOT EXISTS {TOKEN_USAGE_CLAUDE_SESSIONS_UNIQUE_INDEX} \
         ON token_usage ({TOKEN_USAGE_CLAUDE_SESSIONS_KEY}) \
         WHERE {TOKEN_USAGE_CLAUDE_SESSIONS_FILTER}"
    );
    conn.execute(&sql, ()).await?;
    Ok(())
}
1941
1942fn qualify_token_usage_filter(alias: &str) -> String {
1943 TOKEN_USAGE_CLAUDE_SESSIONS_FILTER.replace("row_json", &format!("{alias}.row_json"))
1944}
1945
1946async fn rows<T>(conn: &Connection, table: &str) -> Result<Vec<T>>
1947where
1948 T: DeserializeOwned,
1949{
1950 let sql = format!("SELECT row_json FROM {table} ORDER BY id");
1951 let mut out = Vec::new();
1952 let mut rows = conn.query(&sql, ()).await?;
1953 while let Some(row) = rows.next().await? {
1954 let json = text_value(&row.get_value(0)?)?;
1955 out.push(serde_json::from_str(&json)?);
1956 }
1957 Ok(out)
1958}
1959
1960fn integer_value(value: &Value) -> Result<i64> {
1961 value
1962 .as_integer()
1963 .copied()
1964 .ok_or_else(|| Error::NotFound("integer value".to_string()))
1965}
1966
1967fn text_value(value: &Value) -> Result<String> {
1968 value
1969 .as_text()
1970 .cloned()
1971 .ok_or_else(|| Error::NotFound("text value".to_string()))
1972}
1973
/// Drive a turso future to completion from synchronous code.
///
/// When already inside a tokio runtime (where `block_on` would panic), the
/// future runs on a dedicated thread with its own runtime. A panic on that
/// worker thread surfaces as a generic `NotFound` error instead of
/// propagating the panic into the caller.
fn block_on_turso<T, Fut>(future: Fut) -> Result<T>
where
    T: Send + 'static,
    Fut: std::future::Future<Output = Result<T>> + Send + 'static,
{
    if tokio::runtime::Handle::try_current().is_ok() {
        std::thread::spawn(move || Runtime::new()?.block_on(future))
            .join()
            .unwrap_or_else(|_| Err(Error::NotFound("turso worker thread".to_string())))
    } else {
        Runtime::new()?.block_on(future)
    }
}
1987
1988fn is_locking_error(error: &turso::Error) -> bool {
1989 let message = error.to_string();
1990 netsky_core::retry::is_lock_error_message(&message)
1991}
1992
/// Run `op`, retrying on database-lock errors per the shared retry policy.
///
/// Non-lock operation errors pass through unchanged; exhausting the retry
/// budget is converted into `Error::Locked` carrying the retry count and
/// cumulative wait time for diagnostics.
fn retry_db_lock<T, F>(op: F) -> Result<T>
where
    F: FnMut() -> Result<T>,
{
    match netsky_core::retry::on_lock(op, netsky_core::retry::LockRetryPolicy::db_default()) {
        Ok(value) => Ok(value),
        Err(netsky_core::retry::LockRetryError::Operation(error)) => Err(error),
        Err(netsky_core::retry::LockRetryError::Exhausted {
            source,
            retries,
            total_wait,
        }) => Err(Error::Locked {
            retries,
            total_wait_ms: total_wait.as_millis(),
            detail: source.to_string(),
        }),
    }
}
2011
2012fn spool_write_error<T: Serialize>(table: &str, id: i64, record: &T, error: &Error) -> Result<()> {
2013 spool_error_json(serde_json::json!({
2014 "ts_utc": ts(Utc::now()),
2015 "table": table,
2016 "id": id,
2017 "error": error.to_string(),
2018 "record": record,
2019 }))
2020}
2021
2022fn spool_error_json(value: serde_json::Value) -> Result<()> {
2023 let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
2024 let dir = home.join(".netsky").join("logs");
2025 fs::create_dir_all(&dir)?;
2026 let path = dir.join(format!(
2027 "meta-db-errors-{}.jsonl",
2028 Utc::now().format("%Y-%m-%d")
2029 ));
2030 if let Err(spool_error) = netsky_core::jsonl::append_json_line(&path, &value) {
2031 write_spool_failure_marker(&value, &spool_error)?;
2032 return Err(Error::Io(spool_error));
2033 }
2034 Ok(())
2035}
2036
2037fn write_spool_failure_marker(
2038 value: &serde_json::Value,
2039 spool_error: &std::io::Error,
2040) -> Result<()> {
2041 let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
2042 let dir = home.join(".netsky").join("state");
2043 fs::create_dir_all(&dir)?;
2044 let path = dir.join("meta-db-spool-failed");
2045 let body = serde_json::json!({
2046 "ts_utc": ts(Utc::now()),
2047 "spool_error": spool_error.to_string(),
2048 "record": value,
2049 });
2050 fs::write(path, serde_json::to_vec_pretty(&body)?)?;
2051 Ok(())
2052}
2053
2054fn referenced_storage_tables(sql: &str) -> Option<HashSet<&'static str>> {
2060 let lower = sql.to_ascii_lowercase();
2061 let mut out: HashSet<&'static str> = HashSet::new();
2062 for &(df_name, storage) in &DATAFUSION_TABLES {
2063 if contains_word(&lower, df_name) {
2064 out.insert(storage);
2065 }
2066 }
2067 if contains_word(&lower, CLONE_LIFETIMES_VIEW) {
2068 out.insert(CLONE_DISPATCHES);
2069 }
2070 if out.is_empty() { None } else { Some(out) }
2071}
2072
/// Identifier-aware substring search: `needle` counts as present only when it
/// is not embedded inside a longer identifier on either side. Overlapping
/// occurrences are all considered (scan advances one byte past each hit).
fn contains_word(haystack: &str, needle: &str) -> bool {
    if needle.is_empty() || needle.len() > haystack.len() {
        return false;
    }
    let bytes = haystack.as_bytes();
    let width = needle.len();
    let mut from = 0;
    while from + width <= bytes.len() {
        let Some(offset) = haystack[from..].find(needle) else {
            return false;
        };
        let begin = from + offset;
        let finish = begin + width;
        let left_free = begin == 0 || !is_ident_byte(bytes[begin - 1]);
        let right_free = finish == bytes.len() || !is_ident_byte(bytes[finish]);
        if left_free && right_free {
            return true;
        }
        from = begin + 1;
    }
    false
}

/// True for bytes that may appear inside a SQL identifier.
fn is_ident_byte(b: u8) -> bool {
    b.is_ascii_alphanumeric() || b == b'_'
}
2102
2103fn register(ctx: &SessionContext, name: &str, batch: RecordBatch) -> Result<()> {
2104 let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
2105 ctx.register_table(name, Arc::new(table))?;
2106 Ok(())
2107}
2108
/// Build a DataFusion session with every snapshot table registered as an
/// in-memory table, plus the derived `clone_lifetimes` view over finished
/// clone dispatches.
async fn register_snapshot(snapshot: Snapshot) -> Result<SessionContext> {
    let ctx = SessionContext::new();
    register(&ctx, "messages", messages_batch(snapshot.messages)?)?;
    register(
        &ctx,
        "cli_invocations",
        cli_batch(snapshot.cli_invocations)?,
    )?;
    register(&ctx, "crashes", crashes_batch(snapshot.crashes)?)?;
    register(&ctx, "ticks", ticks_batch(snapshot.ticks)?)?;
    register(&ctx, "workspaces", workspaces_batch(snapshot.workspaces)?)?;
    register(&ctx, "sessions", sessions_batch(snapshot.sessions)?)?;
    register(
        &ctx,
        "clone_dispatches",
        clone_dispatches_batch(snapshot.clone_dispatches)?,
    )?;
    register(
        &ctx,
        "harvest_events",
        harvest_events_batch(snapshot.harvest_events)?,
    )?;
    register(
        &ctx,
        "communication_events",
        communication_events_batch(snapshot.communication_events)?,
    )?;
    register(
        &ctx,
        "mcp_tool_calls",
        mcp_tool_calls_batch(snapshot.mcp_tool_calls)?,
    )?;
    register(
        &ctx,
        "git_operations",
        git_operations_batch(snapshot.git_operations)?,
    )?;
    register(
        &ctx,
        "owner_directives",
        owner_directives_batch(snapshot.owner_directives)?,
    )?;
    register(
        &ctx,
        "token_usage",
        token_usage_batch(snapshot.token_usage)?,
    )?;
    register(
        &ctx,
        "watchdog_events",
        watchdog_events_batch(snapshot.watchdog_events)?,
    )?;
    register(&ctx, "tasks", tasks_batch(snapshot.tasks)?)?;
    register(
        &ctx,
        "source_errors",
        source_errors_batch(snapshot.source_errors)?,
    )?;
    register(
        &ctx,
        "iroh_events",
        iroh_events_batch(snapshot.iroh_events)?,
    )?;
    // Convenience view: only dispatches whose lifetime has ended.
    ctx.sql(
        "CREATE VIEW clone_lifetimes AS \
         SELECT id, agent_id, runtime, workspace, branch, status, \
         ts_utc_start, ts_utc_end \
         FROM clone_dispatches \
         WHERE ts_utc_end IS NOT NULL",
    )
    .await?;
    Ok(ctx)
}
2182
2183fn messages_batch(rows: Vec<MessageRow>) -> Result<RecordBatch> {
2184 Ok(RecordBatch::try_new(
2185 schema(&[
2186 ("id", DataType::Int64, false),
2187 ("ts_utc", DataType::Utf8, false),
2188 ("source", DataType::Utf8, false),
2189 ("direction", DataType::Utf8, false),
2190 ("chat_id", DataType::Utf8, true),
2191 ("from_agent", DataType::Utf8, true),
2192 ("to_agent", DataType::Utf8, true),
2193 ("body", DataType::Utf8, true),
2194 ("raw_json", DataType::Utf8, true),
2195 ]),
2196 vec![
2197 ids(&rows, |r| r.id),
2198 string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2199 string(rows.iter().map(|r| Some(r.source.as_str()))),
2200 string(rows.iter().map(|r| Some(r.direction.as_str()))),
2201 opt_string(rows.iter().map(|r| r.chat_id.as_deref())),
2202 opt_string(rows.iter().map(|r| r.from_agent.as_deref())),
2203 opt_string(rows.iter().map(|r| r.to_agent.as_deref())),
2204 opt_string(rows.iter().map(|r| r.body.as_deref())),
2205 opt_string(rows.iter().map(|r| r.raw_json.as_deref())),
2206 ],
2207 )?)
2208}
2209
/// Builds the Arrow `RecordBatch` for the `cli_invocations` table.
/// Column arrays are positional and must match the schema field order.
fn cli_batch(rows: Vec<CliRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("bin", DataType::Utf8, false),
            ("argv_json", DataType::Utf8, false),
            ("exit_code", DataType::Int64, true),
            ("duration_ms", DataType::Int64, true),
            ("host", DataType::Utf8, false),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.bin.as_str()))),
            string(rows.iter().map(|r| Some(r.argv_json.as_str()))),
            int64_opt(rows.iter().map(|r| r.exit_code)),
            int64_opt(rows.iter().map(|r| r.duration_ms)),
            string(rows.iter().map(|r| Some(r.host.as_str()))),
        ],
    )?)
}
2232
/// Builds the Arrow `RecordBatch` for the `crashes` table via the shared
/// five-column (id, ts_utc, kind, agent, detail_json) event-batch helper.
fn crashes_batch(rows: Vec<CrashRow>) -> Result<RecordBatch> {
    simple_event_batch(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("kind", DataType::Utf8, false),
            ("agent", DataType::Utf8, false),
            ("detail_json", DataType::Utf8, false),
        ]),
        rows.iter()
            .map(|r| (r.id, &r.ts_utc, &r.kind, &r.agent, &r.detail_json)),
    )
}
2246
/// Builds the Arrow `RecordBatch` for the `ticks` table.
/// Column arrays are positional and must match the schema field order.
fn ticks_batch(rows: Vec<TickRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source", DataType::Utf8, false),
            ("detail_json", DataType::Utf8, false),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            string(rows.iter().map(|r| Some(r.detail_json.as_str()))),
        ],
    )?)
}
2263
/// Builds the Arrow `RecordBatch` for the `workspaces` table; deletion
/// timestamp and verdict are nullable. Column order must match the schema.
fn workspaces_batch(rows: Vec<WorkspaceRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc_created", DataType::Utf8, false),
            ("name", DataType::Utf8, false),
            ("branch", DataType::Utf8, false),
            ("ts_utc_deleted", DataType::Utf8, true),
            ("verdict", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc_created.as_str()))),
            string(rows.iter().map(|r| Some(r.name.as_str()))),
            string(rows.iter().map(|r| Some(r.branch.as_str()))),
            opt_string(rows.iter().map(|r| r.ts_utc_deleted.as_deref())),
            opt_string(rows.iter().map(|r| r.verdict.as_deref())),
        ],
    )?)
}
2284
/// Builds the Arrow `RecordBatch` for the `sessions` table.
/// Column arrays are positional and must match the schema field order.
fn sessions_batch(rows: Vec<SessionRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("agent", DataType::Utf8, false),
            ("session_num", DataType::Int64, false),
            ("event", DataType::Utf8, false),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.agent.as_str()))),
            int64(rows.iter().map(|r| r.session_num)),
            string(rows.iter().map(|r| Some(r.event.as_str()))),
        ],
    )?)
}
2303
/// Builds the Arrow `RecordBatch` for the `clone_dispatches` table (also the
/// backing data for the `clone_lifetimes` view registered elsewhere in this
/// file). Most columns are nullable; only id, ts_utc_start and agent_id are
/// required. Column order must match the schema.
fn clone_dispatches_batch(rows: Vec<CloneDispatchRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc_start", DataType::Utf8, false),
            ("ts_utc_end", DataType::Utf8, true),
            ("agent_id", DataType::Utf8, false),
            ("runtime", DataType::Utf8, true),
            ("brief_path", DataType::Utf8, true),
            ("brief", DataType::Utf8, true),
            ("workspace", DataType::Utf8, true),
            ("branch", DataType::Utf8, true),
            ("status", DataType::Utf8, true),
            ("exit_code", DataType::Int64, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc_start.as_str()))),
            opt_string(rows.iter().map(|r| r.ts_utc_end.as_deref())),
            string(rows.iter().map(|r| Some(r.agent_id.as_str()))),
            opt_string(rows.iter().map(|r| r.runtime.as_deref())),
            opt_string(rows.iter().map(|r| r.brief_path.as_deref())),
            opt_string(rows.iter().map(|r| r.brief.as_deref())),
            opt_string(rows.iter().map(|r| r.workspace.as_deref())),
            opt_string(rows.iter().map(|r| r.branch.as_deref())),
            opt_string(rows.iter().map(|r| r.status.as_deref())),
            int64_opt(rows.iter().map(|r| r.exit_code)),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
2336
/// Builds the Arrow `RecordBatch` for the `harvest_events` table.
/// Column arrays are positional and must match the schema field order.
fn harvest_events_batch(rows: Vec<HarvestEventRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source_branch", DataType::Utf8, false),
            ("target_branch", DataType::Utf8, false),
            ("commit_sha", DataType::Utf8, true),
            ("status", DataType::Utf8, false),
            ("conflicts", DataType::Utf8, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source_branch.as_str()))),
            string(rows.iter().map(|r| Some(r.target_branch.as_str()))),
            opt_string(rows.iter().map(|r| r.commit_sha.as_deref())),
            string(rows.iter().map(|r| Some(r.status.as_str()))),
            opt_string(rows.iter().map(|r| r.conflicts.as_deref())),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
2361
/// Builds the Arrow `RecordBatch` for the `communication_events` table; only
/// id, ts_utc, source and direction are required. Column order must match the
/// schema.
fn communication_events_batch(rows: Vec<CommunicationEventRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source", DataType::Utf8, false),
            ("tool", DataType::Utf8, true),
            ("direction", DataType::Utf8, false),
            ("chat_id", DataType::Utf8, true),
            ("message_id", DataType::Utf8, true),
            ("handle", DataType::Utf8, true),
            ("agent", DataType::Utf8, true),
            ("body", DataType::Utf8, true),
            ("status", DataType::Utf8, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            opt_string(rows.iter().map(|r| r.tool.as_deref())),
            string(rows.iter().map(|r| Some(r.direction.as_str()))),
            opt_string(rows.iter().map(|r| r.chat_id.as_deref())),
            opt_string(rows.iter().map(|r| r.message_id.as_deref())),
            opt_string(rows.iter().map(|r| r.handle.as_deref())),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            opt_string(rows.iter().map(|r| r.body.as_deref())),
            opt_string(rows.iter().map(|r| r.status.as_deref())),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
2394
/// Builds the Arrow `RecordBatch` for the `mcp_tool_calls` table; `success`
/// and `timeout_race` are non-nullable booleans. Column order must match the
/// schema.
fn mcp_tool_calls_batch(rows: Vec<McpToolCallRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc_start", DataType::Utf8, false),
            ("ts_utc_end", DataType::Utf8, true),
            ("source", DataType::Utf8, false),
            ("tool", DataType::Utf8, false),
            ("agent", DataType::Utf8, true),
            ("duration_ms", DataType::Int64, true),
            ("success", DataType::Boolean, false),
            ("error", DataType::Utf8, true),
            ("timeout_race", DataType::Boolean, false),
            ("request_json", DataType::Utf8, true),
            ("response_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc_start.as_str()))),
            opt_string(rows.iter().map(|r| r.ts_utc_end.as_deref())),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            string(rows.iter().map(|r| Some(r.tool.as_str()))),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            int64_opt(rows.iter().map(|r| r.duration_ms)),
            bools(rows.iter().map(|r| r.success)),
            opt_string(rows.iter().map(|r| r.error.as_deref())),
            bools(rows.iter().map(|r| r.timeout_race)),
            opt_string(rows.iter().map(|r| r.request_json.as_deref())),
            opt_string(rows.iter().map(|r| r.response_json.as_deref())),
        ],
    )?)
}
2427
/// Builds the Arrow `RecordBatch` for the `git_operations` table.
/// Column arrays are positional and must match the schema field order.
fn git_operations_batch(rows: Vec<GitOperationRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("operation", DataType::Utf8, false),
            ("repo", DataType::Utf8, false),
            ("branch", DataType::Utf8, true),
            ("remote", DataType::Utf8, true),
            ("from_sha", DataType::Utf8, true),
            ("to_sha", DataType::Utf8, true),
            ("status", DataType::Utf8, false),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.operation.as_str()))),
            string(rows.iter().map(|r| Some(r.repo.as_str()))),
            opt_string(rows.iter().map(|r| r.branch.as_deref())),
            opt_string(rows.iter().map(|r| r.remote.as_deref())),
            opt_string(rows.iter().map(|r| r.from_sha.as_deref())),
            opt_string(rows.iter().map(|r| r.to_sha.as_deref())),
            string(rows.iter().map(|r| Some(r.status.as_str()))),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
2456
/// Builds the Arrow `RecordBatch` for the `owner_directives` table.
/// Column arrays are positional and must match the schema field order.
fn owner_directives_batch(rows: Vec<OwnerDirectiveRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source", DataType::Utf8, false),
            ("chat_id", DataType::Utf8, true),
            ("raw_text", DataType::Utf8, false),
            ("resolved_action", DataType::Utf8, true),
            ("agent", DataType::Utf8, true),
            ("status", DataType::Utf8, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            opt_string(rows.iter().map(|r| r.chat_id.as_deref())),
            string(rows.iter().map(|r| Some(r.raw_text.as_str()))),
            opt_string(rows.iter().map(|r| r.resolved_action.as_deref())),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            opt_string(rows.iter().map(|r| r.status.as_deref())),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
2483
/// Builds the Arrow `RecordBatch` for the `token_usage` table; token counts
/// and cost (micro-USD) are nullable Int64 columns. Column order must match
/// the schema.
fn token_usage_batch(rows: Vec<TokenUsageRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("session_id", DataType::Utf8, true),
            ("agent", DataType::Utf8, true),
            ("runtime", DataType::Utf8, true),
            ("model", DataType::Utf8, true),
            ("input_tokens", DataType::Int64, true),
            ("output_tokens", DataType::Int64, true),
            ("cached_input_tokens", DataType::Int64, true),
            ("cost_usd_micros", DataType::Int64, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            opt_string(rows.iter().map(|r| r.session_id.as_deref())),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            opt_string(rows.iter().map(|r| r.runtime.as_deref())),
            opt_string(rows.iter().map(|r| r.model.as_deref())),
            int64_opt(rows.iter().map(|r| r.input_tokens)),
            int64_opt(rows.iter().map(|r| r.output_tokens)),
            int64_opt(rows.iter().map(|r| r.cached_input_tokens)),
            int64_opt(rows.iter().map(|r| r.cost_usd_micros)),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
2514
/// Builds the Arrow `RecordBatch` for the `source_errors` table.
/// Column arrays are positional and must match the schema field order.
fn source_errors_batch(rows: Vec<SourceErrorRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source", DataType::Utf8, false),
            ("error_class", DataType::Utf8, false),
            ("count", DataType::Int64, false),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            string(rows.iter().map(|r| Some(r.error_class.as_str()))),
            int64(rows.iter().map(|r| r.count)),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
2535
/// Builds the Arrow `RecordBatch` for the `iroh_events` table.
/// Column arrays are positional and must match the schema field order.
fn iroh_events_batch(rows: Vec<IrohEventRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("event_type", DataType::Utf8, false),
            ("peer_id_hash", DataType::Utf8, false),
            ("peer_label", DataType::Utf8, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.event_type.as_str()))),
            string(rows.iter().map(|r| Some(r.peer_id_hash.as_str()))),
            opt_string(rows.iter().map(|r| r.peer_label.as_deref())),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
2556
/// Builds the Arrow `RecordBatch` for the `watchdog_events` table.
/// Column arrays are positional and must match the schema field order.
fn watchdog_events_batch(rows: Vec<WatchdogEventRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("event", DataType::Utf8, false),
            ("agent", DataType::Utf8, true),
            ("severity", DataType::Utf8, true),
            ("status", DataType::Utf8, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.event.as_str()))),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            opt_string(rows.iter().map(|r| r.severity.as_deref())),
            opt_string(rows.iter().map(|r| r.status.as_deref())),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
2579
/// Builds the Arrow `RecordBatch` for the tasks table (registered under the
/// DataFusion name `tasks`, stored as `netsky_tasks`). `sync_calendar` is a
/// non-nullable boolean; most other columns are nullable. Column order must
/// match the schema.
fn tasks_batch(rows: Vec<TaskRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("created_at", DataType::Utf8, false),
            ("updated_at", DataType::Utf8, false),
            ("title", DataType::Utf8, false),
            ("body", DataType::Utf8, true),
            ("status", DataType::Utf8, false),
            ("priority", DataType::Utf8, true),
            ("labels", DataType::Utf8, true),
            ("source", DataType::Utf8, true),
            ("source_ref", DataType::Utf8, true),
            ("closed_at", DataType::Utf8, true),
            ("closed_reason", DataType::Utf8, true),
            ("closed_evidence", DataType::Utf8, true),
            ("agent", DataType::Utf8, true),
            ("sync_calendar", DataType::Boolean, false),
            ("calendar_task_id", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.created_at.as_str()))),
            string(rows.iter().map(|r| Some(r.updated_at.as_str()))),
            string(rows.iter().map(|r| Some(r.title.as_str()))),
            opt_string(rows.iter().map(|r| r.body.as_deref())),
            string(rows.iter().map(|r| Some(r.status.as_str()))),
            opt_string(rows.iter().map(|r| r.priority.as_deref())),
            opt_string(rows.iter().map(|r| r.labels.as_deref())),
            opt_string(rows.iter().map(|r| r.source.as_deref())),
            opt_string(rows.iter().map(|r| r.source_ref.as_deref())),
            opt_string(rows.iter().map(|r| r.closed_at.as_deref())),
            opt_string(rows.iter().map(|r| r.closed_reason.as_deref())),
            opt_string(rows.iter().map(|r| r.closed_evidence.as_deref())),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            bools(rows.iter().map(|r| r.sync_calendar)),
            opt_string(rows.iter().map(|r| r.calendar_task_id.as_deref())),
        ],
    )?)
}
2620
2621fn simple_event_batch<'a>(
2622 schema: SchemaRef,
2623 rows: impl Iterator<Item = (i64, &'a String, &'a String, &'a String, &'a String)>,
2624) -> Result<RecordBatch> {
2625 let rows = rows.collect::<Vec<_>>();
2626 Ok(RecordBatch::try_new(
2627 schema,
2628 vec![
2629 Arc::new(Int64Array::from(
2630 rows.iter().map(|r| r.0).collect::<Vec<_>>(),
2631 )),
2632 string(rows.iter().map(|r| Some(r.1.as_str()))),
2633 string(rows.iter().map(|r| Some(r.2.as_str()))),
2634 string(rows.iter().map(|r| Some(r.3.as_str()))),
2635 string(rows.iter().map(|r| Some(r.4.as_str()))),
2636 ],
2637 )?)
2638}
2639
2640fn schema(fields: &[(&str, DataType, bool)]) -> SchemaRef {
2641 Arc::new(Schema::new(
2642 fields
2643 .iter()
2644 .map(|(name, ty, nullable)| Field::new(*name, ty.clone(), *nullable))
2645 .collect::<Vec<_>>(),
2646 ))
2647}
2648
2649fn ids<T>(rows: &[T], id: impl Fn(&T) -> i64) -> Arc<Int64Array> {
2650 int64(rows.iter().map(id))
2651}
2652
2653fn int64(values: impl Iterator<Item = i64>) -> Arc<Int64Array> {
2654 Arc::new(Int64Array::from(values.collect::<Vec<_>>()))
2655}
2656
2657fn int64_opt(values: impl Iterator<Item = Option<i64>>) -> Arc<Int64Array> {
2658 Arc::new(Int64Array::from(values.collect::<Vec<_>>()))
2659}
2660
2661fn bools(values: impl Iterator<Item = bool>) -> Arc<BooleanArray> {
2662 Arc::new(BooleanArray::from(values.collect::<Vec<_>>()))
2663}
2664
2665fn string<'a>(values: impl Iterator<Item = Option<&'a str>>) -> Arc<StringArray> {
2666 Arc::new(StringArray::from(values.collect::<Vec<_>>()))
2667}
2668
2669fn opt_string<'a>(values: impl Iterator<Item = Option<&'a str>>) -> Arc<StringArray> {
2670 string(values)
2671}
2672
/// Formats a timestamp as RFC 3339 with millisecond precision and a `Z`
/// suffix (e.g. `2026-04-15T23:00:00.000Z`) — the canonical `ts_utc` string
/// form used across the tables in this module.
fn ts(ts_utc: DateTime<Utc>) -> String {
    ts_utc.to_rfc3339_opts(SecondsFormat::Millis, true)
}
2676
2677fn parse_ts(ts_utc: &str) -> Result<DateTime<Utc>> {
2678 Ok(DateTime::parse_from_rfc3339(ts_utc)?.with_timezone(&Utc))
2679}
2680
/// Returns at most the first `max_chars` characters of `value` as an owned
/// string. Counts Unicode scalar values, not bytes, so multi-byte characters
/// are never split.
fn truncate(value: &str, max_chars: usize) -> String {
    // The byte offset of the (max_chars + 1)-th character marks the cut; if
    // there is no such character the whole string already fits.
    match value.char_indices().nth(max_chars) {
        Some((cut, _)) => value[..cut].to_string(),
        None => value.to_string(),
    }
}
2684
/// Character-limited copy of an optional string: `None` stays `None`,
/// `Some(s)` is clamped to its first `max_chars` characters.
fn truncate_opt(value: Option<&str>, max_chars: usize) -> Option<String> {
    let s = value?;
    Some(s.chars().take(max_chars).collect())
}
2688
2689#[cfg(test)]
2690mod tests {
2691 use super::*;
2692
    // Shared fixture: fresh in-memory database with migrations applied.
    fn db() -> Result<Db> {
        let db = Db::open_in_memory()?;
        db.migrate()?;
        Ok(db)
    }
2698
2699 fn fixed_ts() -> DateTime<Utc> {
2700 chrono::DateTime::parse_from_rfc3339("2026-04-15T23:00:00Z")
2701 .unwrap()
2702 .with_timezone(&Utc)
2703 }
2704
    // A single-table SELECT resolves to exactly that one storage table.
    #[test]
    fn referenced_tables_extracts_scope() {
        let scope =
            referenced_storage_tables("SELECT COUNT(*) FROM messages WHERE ts_utc >= '2026-04-18'")
                .expect("single-table scope");
        assert!(scope.contains(MESSAGES));
        assert!(!scope.contains(CRASHES));
        assert_eq!(scope.len(), 1);
    }
2714
    // The `clone_lifetimes` view must resolve to its backing storage table.
    #[test]
    fn referenced_tables_handles_alias_and_view() {
        let scope = referenced_storage_tables(
            "SELECT ts_utc_start FROM clone_lifetimes WHERE status IS NOT NULL",
        )
        .expect("view resolves to backing table");
        assert!(scope.contains(CLONE_DISPATCHES));
    }
2723
    // DataFusion exposes `tasks`; the storage scope must report `netsky_tasks`.
    #[test]
    fn referenced_tables_maps_datafusion_name_to_storage() {
        let scope =
            referenced_storage_tables("SELECT title FROM tasks").expect("tasks -> netsky_tasks");
        assert!(scope.contains(TASKS));
        assert!(!scope.contains("tasks"));
    }
2731
    // SQL that references no known table yields no scope at all.
    #[test]
    fn referenced_tables_unknown_sql_returns_none() {
        assert!(referenced_storage_tables("SELECT 1").is_none());
    }
2736
    // A JOIN must surface every storage table it references.
    #[test]
    fn referenced_tables_join_captures_both() {
        let scope = referenced_storage_tables(
            "SELECT a.id FROM messages a JOIN crashes b ON a.ts_utc = b.ts_utc",
        )
        .expect("two tables");
        assert!(scope.contains(MESSAGES));
        assert!(scope.contains(CRASHES));
    }
2746
    // Exercises list_tasks filtering (status, priority, both) and its
    // ordering contract: status, then priority, then id.
    #[test]
    fn list_tasks_pushes_filter_and_order_into_sql() -> Result<()> {
        let db = db()?;
        // Minimal task factory: only title/status/priority vary per case.
        let mk = |title: &'static str, status: &'static str, priority: &'static str| {
            db.record_task(TaskRecord {
                title,
                body: None,
                status,
                priority: Some(priority),
                labels: None,
                source: None,
                source_ref: None,
                closed_at: None,
                closed_reason: None,
                closed_evidence: None,
                agent: None,
                sync_calendar: false,
                calendar_task_id: None,
            })
        };
        mk("c", "open", "p2")?;
        mk("a", "open", "p1")?;
        mk("b", "in_progress", "p1")?;
        mk("d", "closed", "p3")?;

        let all = db.list_tasks(None, None)?;
        let order: Vec<_> = all.iter().map(|r| r.title.as_str()).collect();
        assert_eq!(
            order,
            vec!["d", "b", "a", "c"],
            "status then priority then id"
        );

        let open_only = db.list_tasks(Some("open"), None)?;
        assert_eq!(open_only.len(), 2);
        assert!(open_only.iter().all(|r| r.status == "open"));

        let p1 = db.list_tasks(None, Some("p1"))?;
        assert_eq!(p1.len(), 2);
        assert!(p1.iter().all(|r| r.priority.as_deref() == Some("p1")));

        let open_p1 = db.list_tasks(Some("open"), Some("p1"))?;
        assert_eq!(open_p1.len(), 1);
        assert_eq!(open_p1[0].title, "a");
        Ok(())
    }
2793
    // After migration, every expected idx_{table}_{column} index (time-based
    // and equality-based) must be present in sqlite_master.
    #[test]
    fn migrate_installs_expression_indexes_for_timed_tables() -> Result<()> {
        let db = db()?;
        let names = db.with_conn(|conn| async move {
            let mut rows = conn
                .query("SELECT name FROM sqlite_master WHERE type = 'index'", ())
                .await?;
            let mut out = Vec::new();
            while let Some(row) = rows.next().await? {
                out.push(text_value(&row.get_value(0)?)?);
            }
            Ok(out)
        })?;
        for (table, column) in INDEXED_TIME_COLUMNS {
            let expected = format!("idx_{table}_{column}");
            assert!(
                names.iter().any(|n| n == &expected),
                "missing index {expected} in {names:?}"
            );
        }
        for (table, column) in INDEXED_EQ_COLUMNS {
            let expected = format!("idx_{table}_{column}");
            assert!(
                names.iter().any(|n| n == &expected),
                "missing index {expected} in {names:?}"
            );
        }
        Ok(())
    }
2823
    // contains_word must match whole identifiers only — "tasks" inside
    // "netsky_tasks" or "class" inside "classes" must not count.
    #[test]
    fn contains_word_respects_identifier_boundary() {
        assert!(!contains_word("select from netsky_tasks", "tasks"));
        assert!(contains_word("select from tasks where", "tasks"));
        assert!(contains_word("FROM tasks;", "tasks"));
        assert!(!contains_word("classes", "class"));
    }
2831
    // record_message then SQL query: id starts at 1, and the timestamp is
    // stored in the millisecond-precision RFC 3339 form produced by `ts`.
    #[test]
    fn message_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_message(MessageRecord {
            ts_utc: fixed_ts(),
            source: "imessage",
            direction: Direction::Inbound,
            chat_id: Some("chat-1"),
            from_agent: Some("agent9"),
            to_agent: None,
            body: Some("hello"),
            raw_json: Some("{\"chat_id\":\"chat-1\"}"),
        })?;
        let out = db.query("SELECT id, ts_utc, source, direction, chat_id, body FROM messages")?;
        assert_eq!(id, 1);
        assert!(out.contains("2026-04-15T23:00:00.000Z"));
        assert!(out.contains("imessage"));
        assert!(out.contains("inbound"));
        assert!(out.contains("chat-1"));
        assert!(out.contains("hello"));
        Ok(())
    }
2854
    // record_cli round trip: inserted row is visible via SQL with id 1.
    #[test]
    fn cli_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_cli(
            fixed_ts(),
            "netsky",
            "[\"db\", \"migrate\"]",
            Some(0),
            Some(12),
            "host-a",
        )?;
        let out = db.query("SELECT bin, exit_code, duration_ms, host FROM cli_invocations")?;
        assert_eq!(id, 1);
        assert!(out.contains("netsky"));
        assert!(out.contains("host-a"));
        assert!(out.contains("12"));
        Ok(())
    }
2873
    // record_crash round trip: kind, agent and detail payload are queryable.
    #[test]
    fn crash_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_crash(fixed_ts(), "panic", "agent8", "{\"reason\":\"wedged\"}")?;
        let out = db.query("SELECT kind, agent, detail_json FROM crashes")?;
        assert_eq!(id, 1);
        assert!(out.contains("panic"));
        assert!(out.contains("agent8"));
        assert!(out.contains("wedged"));
        Ok(())
    }
2885
    // record_tick round trip: source and detail payload are queryable.
    #[test]
    fn tick_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_tick(fixed_ts(), "ticker", "{\"beat\":1}")?;
        let out = db.query("SELECT source, detail_json FROM ticks")?;
        assert_eq!(id, 1);
        assert!(out.contains("ticker"));
        assert!(out.contains("beat"));
        Ok(())
    }
2896
    // record_workspace round trip, including the optional deletion timestamp
    // and verdict columns.
    #[test]
    fn workspace_round_trip() -> Result<()> {
        let db = db()?;
        let created = fixed_ts();
        let deleted = created + chrono::Duration::minutes(5);
        let id = db.record_workspace(
            created,
            "session8",
            "feat/netsky-db-v0",
            Some(deleted),
            Some("kept"),
        )?;
        let out = db.query("SELECT name, branch, verdict FROM workspaces")?;
        assert_eq!(id, 1);
        assert!(out.contains("session8"));
        assert!(out.contains("feat/netsky-db-v0"));
        assert!(out.contains("kept"));
        Ok(())
    }
2916
    // record_session round trip: the SessionEvent enum is stored in its
    // lowercase string form ("note").
    #[test]
    fn session_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_session(fixed_ts(), "agent8", 8, SessionEvent::Note)?;
        let out = db.query("SELECT agent, session_num, event FROM sessions")?;
        assert_eq!(id, 1);
        assert!(out.contains("agent8"));
        assert!(out.contains("note"));
        Ok(())
    }
2927
    // Smoke test for the v2 event tables: writes one record into each of the
    // eight tables through its typed recorder, then confirms via SQL that
    // every table holds exactly one row.
    #[test]
    fn v2_tables_round_trip() -> Result<()> {
        let db = db()?;
        db.record_clone_dispatch(CloneDispatchRecord {
            ts_utc_start: fixed_ts(),
            ts_utc_end: Some(fixed_ts()),
            agent_id: "agent3",
            runtime: Some("codex"),
            brief_path: Some("briefs/foo.md"),
            brief: Some("do the thing"),
            workspace: Some("workspaces/foo/repo"),
            branch: Some("feat/foo"),
            status: Some("done"),
            exit_code: Some(0),
            detail_json: Some("{\"ok\":true}"),
        })?;
        db.record_harvest_event(HarvestEventRecord {
            ts_utc: fixed_ts(),
            source_branch: "feat/foo",
            target_branch: "main",
            commit_sha: Some("abc123"),
            status: "clean",
            conflicts: None,
            detail_json: None,
        })?;
        db.record_communication_event(CommunicationEventRecord {
            ts_utc: fixed_ts(),
            source: "imessage",
            tool: Some("reply"),
            direction: Direction::Outbound,
            chat_id: Some("chat-1"),
            message_id: Some("msg-1"),
            handle: Some("+15551234567"),
            agent: Some("agent0"),
            body: Some("sent"),
            status: Some("ok"),
            detail_json: None,
        })?;
        // The typed listing must surface the communication event just written.
        let comms = db.list_communication_events()?;
        assert_eq!(comms.len(), 1);
        assert_eq!(comms[0].source, "imessage");
        assert_eq!(comms[0].agent.as_deref(), Some("agent0"));
        db.record_mcp_tool_call(McpToolCallRecord {
            ts_utc_start: fixed_ts(),
            ts_utc_end: Some(fixed_ts()),
            source: "drive",
            tool: "list_files",
            agent: Some("agent0"),
            duration_ms: Some(22),
            success: true,
            error: None,
            timeout_race: false,
            request_json: Some("{}"),
            response_json: Some("[]"),
        })?;
        db.record_git_operation(GitOperationRecord {
            ts_utc: fixed_ts(),
            operation: "push",
            repo: "netsky",
            branch: Some("feat/foo"),
            remote: Some("origin"),
            from_sha: Some("abc"),
            to_sha: Some("def"),
            status: "ok",
            detail_json: None,
        })?;
        db.record_owner_directive(OwnerDirectiveRecord {
            ts_utc: fixed_ts(),
            source: "imessage",
            chat_id: Some("chat-1"),
            raw_text: "ship it",
            resolved_action: Some("dispatch"),
            agent: Some("agent0"),
            status: Some("accepted"),
            detail_json: None,
        })?;
        db.record_token_usage(TokenUsageRecord {
            ts_utc: fixed_ts(),
            session_id: Some("session-1"),
            agent: Some("agent4"),
            runtime: Some("codex"),
            model: Some("gpt-5.4"),
            input_tokens: Some(100),
            output_tokens: Some(25),
            cached_input_tokens: Some(10),
            cost_usd_micros: Some(1234),
            detail_json: None,
        })?;
        db.record_watchdog_event(WatchdogEventRecord {
            ts_utc: fixed_ts(),
            event: "respawn",
            agent: Some("agent0"),
            severity: Some("warn"),
            status: Some("ok"),
            detail_json: Some("{}"),
        })?;

        for table in [
            "clone_dispatches",
            "harvest_events",
            "communication_events",
            "mcp_tool_calls",
            "git_operations",
            "owner_directives",
            "token_usage",
            "watchdog_events",
        ] {
            let out = db.query(&format!("SELECT COUNT(*) AS count FROM {table}"))?;
            assert!(out.contains('1'), "{table}: {out}");
        }
        Ok(())
    }
3040
    // Batch insert of 1500 token-usage records: verifies the row count, the
    // summed payload, and that a follow-up single-record batch gets the next
    // contiguous id (MAX(id) is exactly one past the batch's MAX(id)).
    #[test]
    fn token_usage_batch_inserts_contiguous_ids() -> Result<()> {
        let db = db()?;
        let count = 1500;
        let records: Vec<_> = (0..count)
            .map(|i| TokenUsageRecord {
                ts_utc: fixed_ts(),
                session_id: Some("batch-session"),
                agent: Some("agent45"),
                runtime: Some("claude"),
                model: Some("claude-opus-4-7"),
                input_tokens: Some(i),
                output_tokens: Some(i * 2),
                cached_input_tokens: None,
                cost_usd_micros: None,
                detail_json: None,
            })
            .collect();
        let written = db.record_token_usage_batch(records)?;
        assert_eq!(written, count as usize);
        let out = db.query(
            "SELECT COUNT(*) AS c, MIN(id) AS lo, MAX(id) AS hi, \
                 SUM(input_tokens) AS sum_in \
             FROM token_usage \
             WHERE session_id = 'batch-session'",
        )?;
        assert!(out.contains(&count.to_string()), "{out}");
        // 1124250 == sum of 0..1500, the total of the input_tokens column.
        assert!(out.contains("1124250"), "{out}");
        let written = db.record_token_usage_batch(vec![TokenUsageRecord {
            ts_utc: fixed_ts(),
            session_id: None,
            agent: None,
            runtime: None,
            model: None,
            input_tokens: Some(7),
            output_tokens: None,
            cached_input_tokens: None,
            cost_usd_micros: None,
            detail_json: None,
        }])?;
        assert_eq!(written, 1);
        let out = db.query(
            "SELECT MAX(id) - MAX(CASE WHEN session_id = 'batch-session' THEN id END) AS delta \
             FROM token_usage",
        )?;
        assert!(out.contains('1'), "{out}");
        Ok(())
    }
3091
    // An empty batch writes nothing and reports zero rows written.
    #[test]
    fn token_usage_batch_empty_is_noop() -> Result<()> {
        let db = db()?;
        let written = db.record_token_usage_batch(Vec::<TokenUsageRecord<'_>>::new())?;
        assert_eq!(written, 0);
        Ok(())
    }
3099
    // record_task → update_task → list_tasks → SQL query: the update's new
    // status/agent/calendar fields must be reflected everywhere.
    #[test]
    fn task_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_task(TaskRecord {
            title: "Add a task tracker",
            body: Some("Use turso and DataFusion."),
            status: "open",
            priority: Some("p1"),
            labels: Some("db,cli"),
            source: Some("test"),
            source_ref: Some("briefs/task-tracker.md"),
            closed_at: None,
            closed_reason: None,
            closed_evidence: None,
            agent: None,
            sync_calendar: false,
            calendar_task_id: None,
        })?;
        let row = db.update_task(TaskUpdate {
            id,
            status: Some("in_progress"),
            priority: None,
            agent: Some("agent3"),
            closed_reason: None,
            closed_evidence: None,
            sync_calendar: Some(true),
            calendar_task_id: Some("calendar-1"),
        })?;
        assert_eq!(row.id, 1);
        assert_eq!(row.status, "in_progress");
        assert_eq!(row.agent.as_deref(), Some("agent3"));
        assert!(row.sync_calendar);
        assert_eq!(row.calendar_task_id.as_deref(), Some("calendar-1"));
        let rows = db.list_tasks(Some("in_progress"), Some("p1"))?;
        assert_eq!(rows.len(), 1);
        let out = db.query(
            "SELECT title, status, priority, agent, sync_calendar, calendar_task_id FROM tasks",
        )?;
        assert!(out.contains("Add a task tracker"));
        assert!(out.contains("in_progress"));
        assert!(out.contains("agent3"));
        assert!(out.contains("calendar-1"));
        Ok(())
    }
3144
    // One UNION ALL spanning every registered table name (including the
    // `tasks` DataFusion alias) — fails if any table was not registered.
    #[test]
    fn query_registers_all_tables() -> Result<()> {
        let db = db()?;
        let out = db.query(
            "SELECT COUNT(*) FROM messages \
             UNION ALL SELECT COUNT(*) FROM cli_invocations \
             UNION ALL SELECT COUNT(*) FROM crashes \
             UNION ALL SELECT COUNT(*) FROM ticks \
             UNION ALL SELECT COUNT(*) FROM workspaces \
             UNION ALL SELECT COUNT(*) FROM sessions \
             UNION ALL SELECT COUNT(*) FROM clone_dispatches \
             UNION ALL SELECT COUNT(*) FROM harvest_events \
             UNION ALL SELECT COUNT(*) FROM communication_events \
             UNION ALL SELECT COUNT(*) FROM mcp_tool_calls \
             UNION ALL SELECT COUNT(*) FROM git_operations \
             UNION ALL SELECT COUNT(*) FROM owner_directives \
             UNION ALL SELECT COUNT(*) FROM token_usage \
             UNION ALL SELECT COUNT(*) FROM watchdog_events \
             UNION ALL SELECT COUNT(*) FROM tasks \
             UNION ALL SELECT COUNT(*) FROM source_errors \
             UNION ALL SELECT COUNT(*) FROM iroh_events",
        )?;
        assert!(out.contains('0'));
        Ok(())
    }
3170
    // Read-only open semantics, in three stages: a missing file and an
    // empty (unmigrated) file both fail with the same "not initialized"
    // message, while a migrated file opens read-only and serves queries.
    #[test]
    fn read_only_path_reports_uninitialized_then_reads_initialized_db() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let path = dir.path().join("meta.db");

        let missing = match Db::open_read_only_path(&path) {
            Ok(_) => panic!("missing database opened read-only"),
            Err(error) => error.to_string(),
        };
        assert_eq!(
            missing,
            format!(
                "meta.db has not been initialized at {}; a writer must run first.",
                path.display()
            )
        );
        // The failed read-only open must not have created the file.
        assert!(!path.exists());

        let uninitialized = Db::open_path(&path)?;
        drop(uninitialized);
        let no_schema = match Db::open_read_only_path(&path) {
            Ok(_) => panic!("uninitialized database opened read-only"),
            Err(error) => error.to_string(),
        };
        assert_eq!(
            no_schema,
            format!(
                "meta.db has not been initialized at {}; a writer must run first.",
                path.display()
            )
        );

        let writer = Db::open_path(&path)?;
        writer.migrate()?;
        writer.record_message(MessageRecord {
            ts_utc: fixed_ts(),
            source: "agent",
            direction: Direction::Outbound,
            chat_id: Some("agent0"),
            from_agent: Some("agent5"),
            to_agent: Some("agent0"),
            body: Some("checkpoint acked"),
            raw_json: None,
        })?;

        let reader = Db::open_read_only_path(&path)?;
        assert!(reader.read_only_write_probe_fails()?);
        let out = reader.query("SELECT source, chat_id, body FROM messages")?;
        assert!(out.contains("agent"));
        assert!(out.contains("agent0"));
        assert!(out.contains("checkpoint acked"));
        Ok(())
    }
3224
3225 #[test]
3226 fn migrate_sets_schema_version_7() -> Result<()> {
3227 let db = db()?;
3228 assert_eq!(db.schema_version()?, 7);
3229 Ok(())
3230 }
3231
3232 #[test]
3233 fn migrate_sets_sqlite_user_version_7() -> Result<()> {
3234 let db = db()?;
3235 assert_eq!(
3236 db.with_conn(|conn| async move { user_version(&conn).await })?,
3237 SCHEMA_VERSION
3238 );
3239 Ok(())
3240 }
3241
3242 #[test]
3243 fn hash_peer_id_is_stable_and_truncated() {
3244 let a = hash_peer_id("abc123");
3245 let b = hash_peer_id("abc123");
3246 let c = hash_peer_id("abc124");
3247 assert_eq!(a, b, "same input hashes the same");
3248 assert_ne!(a, c, "different input hashes differently");
3249 assert_eq!(a.len(), 16, "8 bytes hex-encoded = 16 chars");
3250 assert!(a.chars().all(|c| c.is_ascii_hexdigit()));
3251 }
3252
3253 #[test]
3260 fn hash_peer_id_matches_known_sha256_vectors() {
3261 assert_eq!(hash_peer_id("abc"), "ba7816bf8f01cfea");
3263 assert_eq!(hash_peer_id(""), "e3b0c44298fc1c14");
3265 assert_eq!(hash_peer_id("netsky0"), "66863bafcf1591d6");
3267 }
3268
3269 #[test]
3270 fn source_error_round_trip() -> Result<()> {
3271 let db = db()?;
3272 let id = db.record_source_error(SourceErrorRecord {
3273 ts_utc: fixed_ts(),
3274 source: "email",
3275 error_class: SourceErrorClass::Timeout,
3276 count: 1,
3277 detail_json: Some("{\"endpoint\":\"gmail\"}"),
3278 })?;
3279 assert_eq!(id, 1);
3280 db.record_source_error(SourceErrorRecord {
3281 ts_utc: fixed_ts(),
3282 source: "iroh",
3283 error_class: SourceErrorClass::NetworkError,
3284 count: 3,
3285 detail_json: None,
3286 })?;
3287 let out = db.query(
3288 "SELECT source, error_class, SUM(count) AS n FROM source_errors \
3289 GROUP BY source, error_class ORDER BY source",
3290 )?;
3291 assert!(out.contains("email"));
3292 assert!(out.contains("timeout"));
3293 assert!(out.contains("network_error"));
3294 Ok(())
3295 }
3296
3297 #[test]
3298 fn source_error_count_floors_at_one() -> Result<()> {
3299 let db = db()?;
3300 db.record_source_error(SourceErrorRecord {
3301 ts_utc: fixed_ts(),
3302 source: "email",
3303 error_class: SourceErrorClass::Unknown,
3304 count: 0,
3305 detail_json: None,
3306 })?;
3307 let out = db.query("SELECT count FROM source_errors")?;
3308 assert!(out.contains('1'));
3309 Ok(())
3310 }
3311
3312 #[test]
3313 fn source_cursor_round_trip() -> Result<()> {
3314 let db = db()?;
3315 assert_eq!(db.read_source_cursor("imessage")?, None);
3316 db.update_source_cursor("imessage", "42")?;
3317 assert_eq!(db.read_source_cursor("imessage")?.as_deref(), Some("42"));
3318 db.update_source_cursor("imessage", "99")?;
3320 assert_eq!(db.read_source_cursor("imessage")?.as_deref(), Some("99"));
3321 db.update_source_cursor("email", "abc")?;
3323 let all = db.list_source_cursors()?;
3324 assert_eq!(all.len(), 2);
3325 assert_eq!(all[0].source, "email");
3326 assert_eq!(all[0].cursor_value, "abc");
3327 assert_eq!(all[1].source, "imessage");
3328 assert_eq!(all[1].cursor_value, "99");
3329 assert!(db.reset_source_cursor("imessage")?);
3331 assert_eq!(db.read_source_cursor("imessage")?, None);
3332 assert!(!db.reset_source_cursor("imessage")?);
3333 Ok(())
3334 }
3335
3336 #[test]
3337 fn event_insert_then_transition_round_trip() -> Result<()> {
3338 let db = db()?;
3339 let delivered = db.insert_event("imessage", fixed_ts(), r#"{"chat":"a"}"#)?;
3340 let failed = db.insert_event("imessage", fixed_ts(), r#"{"chat":"b"}"#)?;
3341 assert!(delivered > 0 && failed > delivered);
3342 db.update_event_delivery(delivered, EventStatus::Delivered, None)?;
3343 db.update_event_delivery(failed, EventStatus::Failed, Some("adapter timed out"))?;
3344 let rows = db.tail_events("imessage", 10)?;
3346 assert_eq!(rows.len(), 2);
3347 assert_eq!(rows[0].id, failed);
3348 assert_eq!(rows[0].delivery_status, "failed");
3349 assert_eq!(rows[0].reason.as_deref(), Some("adapter timed out"));
3350 assert_eq!(rows[1].id, delivered);
3351 assert_eq!(rows[1].delivery_status, "delivered");
3352 assert_eq!(rows[1].reason, None);
3353 let other = db.insert_event("email", fixed_ts(), "{}")?;
3355 assert!(other > failed);
3356 let only_imessage = db.tail_events("imessage", 10)?;
3357 assert_eq!(only_imessage.len(), 2);
3358 Ok(())
3359 }
3360
3361 #[test]
3362 fn iroh_event_round_trip() -> Result<()> {
3363 let db = db()?;
3364 let hash = hash_peer_id("raw_node_id_abc");
3365 let id = db.record_iroh_event(IrohEventRecord {
3366 ts_utc: fixed_ts(),
3367 event_type: IrohEventType::Connect,
3368 peer_id_hash: &hash,
3369 peer_label: Some("work"),
3370 detail_json: None,
3371 })?;
3372 assert_eq!(id, 1);
3373 db.record_iroh_event(IrohEventRecord {
3374 ts_utc: fixed_ts(),
3375 event_type: IrohEventType::HandshakeRefused,
3376 peer_id_hash: &hash_peer_id("unknown_peer"),
3377 peer_label: None,
3378 detail_json: Some("{\"reason\":\"not_in_allowlist\"}"),
3379 })?;
3380 let out =
3381 db.query("SELECT event_type, peer_id_hash, peer_label FROM iroh_events ORDER BY id")?;
3382 assert!(out.contains("connect"));
3383 assert!(out.contains("handshake_refused"));
3384 assert!(out.contains("work"));
3385 assert!(!out.contains("raw_node_id_abc"), "raw node id leaked");
3386 Ok(())
3387 }
3388
3389 #[test]
3390 fn token_usage_includes_runtime_column() -> Result<()> {
3391 let db = db()?;
3392 db.record_token_usage(TokenUsageRecord {
3393 ts_utc: fixed_ts(),
3394 session_id: Some("s1"),
3395 agent: Some("agent12"),
3396 runtime: Some("claude"),
3397 model: Some("claude-opus-4-7"),
3398 input_tokens: Some(1000),
3399 output_tokens: Some(500),
3400 cached_input_tokens: None,
3401 cost_usd_micros: Some(12_500),
3402 detail_json: None,
3403 })?;
3404 let out =
3405 db.query("SELECT runtime, SUM(input_tokens) AS ins FROM token_usage GROUP BY runtime")?;
3406 assert!(out.contains("claude"));
3407 assert!(out.contains("1000"));
3408 Ok(())
3409 }
3410
3411 #[test]
3412 fn token_usage_batch_ignores_duplicate_claude_session_ingest_rows() -> Result<()> {
3413 let db = db()?;
3414 let detail = r#"{"source":"claude-sessions","path":"/tmp/session.jsonl","line":7}"#;
3415 let records = vec![
3416 TokenUsageRecord {
3417 ts_utc: fixed_ts(),
3418 session_id: Some("session-1"),
3419 agent: Some("agent12"),
3420 runtime: Some("claude"),
3421 model: Some("claude-sonnet-4"),
3422 input_tokens: Some(10),
3423 output_tokens: Some(5),
3424 cached_input_tokens: None,
3425 cost_usd_micros: None,
3426 detail_json: Some(detail),
3427 },
3428 TokenUsageRecord {
3429 ts_utc: fixed_ts(),
3430 session_id: Some("session-1"),
3431 agent: Some("agent12"),
3432 runtime: Some("claude"),
3433 model: Some("claude-sonnet-4"),
3434 input_tokens: Some(10),
3435 output_tokens: Some(5),
3436 cached_input_tokens: None,
3437 cost_usd_micros: None,
3438 detail_json: Some(detail),
3439 },
3440 ];
3441 let written = db.record_token_usage_batch(records)?;
3442 assert_eq!(written, 1);
3443 let out = db.query("SELECT COUNT(*) AS c, MAX(id) AS hi FROM token_usage")?;
3444 assert!(out.contains('1'), "{out}");
3445 Ok(())
3446 }
3447
3448 #[test]
3449 fn migration_from_v6_dedupes_existing_claude_session_rows() -> Result<()> {
3450 let dir = tempfile::tempdir()?;
3451 let path = dir.path().join("meta.db");
3452 let db = Db::open_path(&path)?;
3453
3454 db.with_conn(|conn| async move {
3455 conn.execute_batch(
3456 "PRAGMA synchronous=NORMAL; \
3457 CREATE TABLE IF NOT EXISTS meta (key TEXT PRIMARY KEY, value INTEGER NOT NULL); \
3458 CREATE TABLE IF NOT EXISTS ids (name TEXT PRIMARY KEY, id INTEGER NOT NULL); \
3459 CREATE TABLE IF NOT EXISTS token_usage (id INTEGER PRIMARY KEY, row_json TEXT NOT NULL);",
3460 )
3461 .await?;
3462 conn.execute(
3463 "INSERT INTO meta (key, value) VALUES ('schema_version', 6)",
3464 (),
3465 )
3466 .await?;
3467 conn.execute("PRAGMA user_version = 6", ()).await?;
3468 conn.execute("INSERT INTO ids (name, id) VALUES ('token_usage', 2)", ())
3469 .await?;
3470 conn.execute(
3471 "INSERT INTO token_usage (id, row_json) VALUES (?1, ?2)",
3472 params_from_iter([
3473 Value::from(1),
3474 Value::from(
3475 r#"{"id":1,"ts_utc":"2026-04-18T00:00:00Z","session_id":"s1","agent":"agent1","runtime":"claude","model":"claude-sonnet-4","input_tokens":10,"output_tokens":3,"cached_input_tokens":null,"cost_usd_micros":null,"detail_json":"{\"source\":\"claude-sessions\",\"path\":\"/tmp/session.jsonl\",\"line\":8}"}"#,
3476 ),
3477 ]),
3478 )
3479 .await?;
3480 conn.execute(
3481 "INSERT INTO token_usage (id, row_json) VALUES (?1, ?2)",
3482 params_from_iter([
3483 Value::from(2),
3484 Value::from(
3485 r#"{"id":2,"ts_utc":"2026-04-18T00:01:00Z","session_id":"s1","agent":"agent1","runtime":"claude","model":"claude-sonnet-4","input_tokens":10,"output_tokens":3,"cached_input_tokens":null,"cost_usd_micros":null,"detail_json":"{\"source\":\"claude-sessions\",\"path\":\"/tmp/session.jsonl\",\"line\":8}"}"#,
3486 ),
3487 ]),
3488 )
3489 .await?;
3490 Ok(())
3491 })?;
3492
3493 let before = db.query("SELECT COUNT(*) AS c FROM token_usage")?;
3494 assert!(before.contains('2'), "{before}");
3495
3496 db.migrate()?;
3497
3498 let after = db.query("SELECT COUNT(*) AS c, MAX(id) AS hi FROM token_usage")?;
3499 assert!(after.contains('1'), "{after}");
3500 assert!(path.with_extension("db.bak.v7-token-usage-dedupe").exists());
3501 Ok(())
3502 }
3503
3504 #[test]
3505 fn clone_lifetimes_view_reads_finished_dispatches() -> Result<()> {
3506 let db = db()?;
3507 db.record_clone_dispatch(CloneDispatchRecord {
3508 ts_utc_start: fixed_ts(),
3509 ts_utc_end: Some(fixed_ts() + chrono::Duration::minutes(7)),
3510 agent_id: "agent5",
3511 runtime: Some("claude"),
3512 brief_path: None,
3513 brief: None,
3514 workspace: Some("workspaces/foo/repo"),
3515 branch: Some("feat/foo"),
3516 status: Some("done"),
3517 exit_code: Some(0),
3518 detail_json: None,
3519 })?;
3520 db.record_clone_dispatch(CloneDispatchRecord {
3521 ts_utc_start: fixed_ts(),
3522 ts_utc_end: None,
3523 agent_id: "agent6",
3524 runtime: Some("codex"),
3525 brief_path: None,
3526 brief: None,
3527 workspace: None,
3528 branch: None,
3529 status: Some("in_flight"),
3530 exit_code: None,
3531 detail_json: None,
3532 })?;
3533 let out = db.query("SELECT agent_id, runtime FROM clone_lifetimes")?;
3534 assert!(out.contains("agent5"));
3535 assert!(
3536 !out.contains("agent6"),
3537 "in-flight dispatches stay excluded"
3538 );
3539 Ok(())
3540 }
3541
3542 #[test]
3543 fn migration_from_v4_preserves_existing_rows_and_adds_new_tables() -> Result<()> {
3544 let dir = tempfile::tempdir()?;
3545 let path = dir.path().join("meta.db");
3546
3547 {
3548 let db = Db::open_path(&path)?;
3549 db.migrate()?;
3550 db.with_conn(|conn| async move {
3556 conn.execute("UPDATE meta SET value = 4 WHERE key = 'schema_version'", ())
3557 .await?;
3558 conn.execute("PRAGMA user_version = 4", ()).await?;
3559 Ok(())
3560 })?;
3561 db.record_message(MessageRecord {
3562 ts_utc: fixed_ts(),
3563 source: "imessage",
3564 direction: Direction::Inbound,
3565 chat_id: Some("chat-1"),
3566 from_agent: None,
3567 to_agent: None,
3568 body: Some("pre-migration"),
3569 raw_json: None,
3570 })?;
3571 }
3572
3573 let db = Db::open_path(&path)?;
3574 db.migrate()?;
3575 assert_eq!(db.schema_version()?, 7);
3576
3577 let out = db.query("SELECT body FROM messages")?;
3578 assert!(
3579 out.contains("pre-migration"),
3580 "migration lost pre-existing row: {out}"
3581 );
3582 let se = db.query("SELECT COUNT(*) FROM source_errors")?;
3584 assert!(se.contains('0'));
3585 let ie = db.query("SELECT COUNT(*) FROM iroh_events")?;
3586 assert!(ie.contains('0'));
3587 assert!(db.list_source_cursors()?.is_empty());
3590 assert!(db.tail_events("imessage", 10)?.is_empty());
3591 Ok(())
3592 }
3593}