// netsky_db/lib.rs
1//! Pure Rust netsky observability database.
2//!
3//! OLTP writes use turso by default. OLAP reads snapshot JSON rows into Arrow
4//! RecordBatches and execute SQL with DataFusion.
5
6use std::collections::HashSet;
7use std::fs;
8use std::path::Path;
9use std::path::PathBuf;
10use std::sync::Arc;
11use std::time::Duration;
12
13use chrono::{DateTime, SecondsFormat, Utc};
14use datafusion::arrow::array::{BooleanArray, Int64Array, StringArray};
15use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
16use datafusion::arrow::record_batch::RecordBatch;
17use datafusion::arrow::util::pretty::pretty_format_batches;
18use datafusion::datasource::MemTable;
19use datafusion::prelude::SessionContext;
20use serde::{Deserialize, Serialize, de::DeserializeOwned};
21use thiserror::Error;
22use tokio::runtime::Runtime;
23use turso::{Builder, Connection, Value, params_from_iter};
24
/// Crate-wide result alias; every fallible API returns [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

/// Re-exports for callers that consume `query_batches` results.
pub use datafusion::arrow::array as arrow_array;
pub use datafusion::arrow::record_batch::RecordBatch as ArrowRecordBatch;

/// Current on-disk schema version; `migrate` refuses files written by a
/// newer build (see `Error::FutureSchemaVersion`).
pub const SCHEMA_VERSION: i64 = 6;
32
/// Storage tables created by `migrate`. Each has the same two-column
/// shape: an integer primary key plus a `row_json` TEXT payload.
const TABLE_NAMES: [&str; 17] = [
    "messages",
    "cli_invocations",
    "crashes",
    "ticks",
    "workspaces",
    "sessions",
    "clone_dispatches",
    "harvest_events",
    "communication_events",
    "mcp_tool_calls",
    "git_operations",
    "owner_directives",
    "token_usage",
    "watchdog_events",
    "netsky_tasks",
    "source_errors",
    "iroh_events",
];
52
// Storage-table name constants, mirroring `TABLE_NAMES` entry for entry so
// call sites reference tables by identifier instead of string literal.
const MESSAGES: &str = "messages";
const CLI_INVOCATIONS: &str = "cli_invocations";
const CRASHES: &str = "crashes";
const TICKS: &str = "ticks";
const WORKSPACES: &str = "workspaces";
const SESSIONS: &str = "sessions";
const CLONE_DISPATCHES: &str = "clone_dispatches";
const HARVEST_EVENTS: &str = "harvest_events";
const COMMUNICATION_EVENTS: &str = "communication_events";
const MCP_TOOL_CALLS: &str = "mcp_tool_calls";
const GIT_OPERATIONS: &str = "git_operations";
const OWNER_DIRECTIVES: &str = "owner_directives";
const TOKEN_USAGE: &str = "token_usage";
const WATCHDOG_EVENTS: &str = "watchdog_events";
// Storage name is prefixed to avoid clashing with a generic "tasks" table;
// DataFusion still exposes it as plain `tasks` (see DATAFUSION_TABLES).
const TASKS: &str = "netsky_tasks";
const SOURCE_ERRORS: &str = "source_errors";
const IROH_EVENTS: &str = "iroh_events";
70
/// Names visible in DataFusion queries, paired with their Turso storage tables.
/// Pairs are `(datafusion_name, storage_table)`. The only divergence is
/// `tasks` vs `netsky_tasks`; everything else matches.
const DATAFUSION_TABLES: [(&str, &str); 17] = [
    ("messages", MESSAGES),
    ("cli_invocations", CLI_INVOCATIONS),
    ("crashes", CRASHES),
    ("ticks", TICKS),
    ("workspaces", WORKSPACES),
    ("sessions", SESSIONS),
    ("clone_dispatches", CLONE_DISPATCHES),
    ("harvest_events", HARVEST_EVENTS),
    ("communication_events", COMMUNICATION_EVENTS),
    ("mcp_tool_calls", MCP_TOOL_CALLS),
    ("git_operations", GIT_OPERATIONS),
    ("owner_directives", OWNER_DIRECTIVES),
    ("token_usage", TOKEN_USAGE),
    ("watchdog_events", WATCHDOG_EVENTS),
    ("tasks", TASKS),
    ("source_errors", SOURCE_ERRORS),
    ("iroh_events", IROH_EVENTS),
];

/// DataFusion view that depends on `clone_dispatches` rows.
const CLONE_LIFETIMES_VIEW: &str = "clone_lifetimes";
95
/// (storage_table, json_key) pairs indexed as range targets. Every
/// analytics query in `netsky-cli/src/cmd/analytics.rs` is time-bounded,
/// and `list_tasks` sorts by `created_at` when we push its ORDER BY
/// down. Without these indexes, a time-range pushdown still has to
/// scan every row. Expression indexes on `json_extract(row_json,
/// '$.<key>')` let SQLite serve `WHERE json_extract(...) >= 'X' AND
/// json_extract(...) < 'Y'` as an index range scan.
///
/// `migrate` names each index `idx_<table>_<column>`; keys here must match
/// the serde field names of the corresponding row structs below.
const INDEXED_TIME_COLUMNS: &[(&str, &str)] = &[
    (MESSAGES, "ts_utc"),
    (CLI_INVOCATIONS, "ts_utc"),
    (CRASHES, "ts_utc"),
    (TICKS, "ts_utc"),
    (WORKSPACES, "ts_utc_created"),
    (SESSIONS, "ts_utc"),
    (CLONE_DISPATCHES, "ts_utc_start"),
    (HARVEST_EVENTS, "ts_utc"),
    (COMMUNICATION_EVENTS, "ts_utc"),
    (MCP_TOOL_CALLS, "ts_utc_start"),
    (GIT_OPERATIONS, "ts_utc"),
    (OWNER_DIRECTIVES, "ts_utc"),
    (TOKEN_USAGE, "ts_utc"),
    (WATCHDOG_EVENTS, "ts_utc"),
    (TASKS, "created_at"),
    (SOURCE_ERRORS, "ts_utc"),
    (IROH_EVENTS, "ts_utc"),
];

/// (storage_table, json_key) pairs used as equality-lookup filters. These
/// back the `list_tasks` push-down and the `iroh_summary` /
/// `messages_summary` group-bys. Indexed identically to the time columns.
const INDEXED_EQ_COLUMNS: &[(&str, &str)] = &[
    (TASKS, "status"),
    (TASKS, "priority"),
    (MESSAGES, "source"),
    (SESSIONS, "event"),
    (IROH_EVENTS, "event_type"),
];
133
/// Unified crate error. Engine (turso), DataFusion/Arrow, serde_json, I/O,
/// and chrono parse failures are wrapped transparently via `#[from]`; the
/// remaining variants are domain conditions raised by this crate itself.
#[derive(Debug, Error)]
pub enum Error {
    #[error("home directory not found")]
    HomeDirMissing,
    #[error("schema version {found} is newer than supported {supported}")]
    FutureSchemaVersion { found: i64, supported: i64 },
    #[error("meta.db has not been initialized at {0}; a writer must run first.")]
    NotInitialized(PathBuf),
    #[error("read-only connection was writable at {0}")]
    ReadOnlyNotEnforced(PathBuf),
    #[error(transparent)]
    Turso(#[from] turso::Error),
    #[error(transparent)]
    DataFusion(#[from] datafusion::error::DataFusionError),
    #[error(transparent)]
    Arrow(#[from] datafusion::arrow::error::ArrowError),
    #[error(transparent)]
    Serde(#[from] serde_json::Error),
    #[error(transparent)]
    Io(#[from] std::io::Error),
    #[error(transparent)]
    Chrono(#[from] chrono::ParseError),
    #[error("{0} not found")]
    NotFound(String),
}
159
160#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
161pub enum Direction {
162    Inbound,
163    Outbound,
164}
165
166impl Direction {
167    fn as_str(self) -> &'static str {
168        match self {
169            Direction::Inbound => "inbound",
170            Direction::Outbound => "outbound",
171        }
172    }
173}
174
175#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
176pub enum SessionEvent {
177    Up,
178    Down,
179    Note,
180}
181
182impl SessionEvent {
183    fn as_str(self) -> &'static str {
184        match self {
185            SessionEvent::Up => "up",
186            SessionEvent::Down => "down",
187            SessionEvent::Note => "note",
188        }
189    }
190}
191
192/// Delivery lifecycle for an event. A row is first inserted with
193/// `Pending` before the delivery adapter runs; once the adapter returns,
194/// the row is updated to `Delivered` or `Failed` with a reason.
195#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
196pub enum EventStatus {
197    Pending,
198    Delivered,
199    Failed,
200}
201
202impl EventStatus {
203    pub fn as_str(self) -> &'static str {
204        match self {
205            EventStatus::Pending => "pending",
206            EventStatus::Delivered => "delivered",
207            EventStatus::Failed => "failed",
208        }
209    }
210}
211
/// One row from `source_cursors`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SourceCursorRow {
    pub source: String,
    // Presumably an opaque, source-defined resume token — confirm at
    // writer call sites.
    pub cursor_value: String,
    pub updated_at: String,
}

/// One row from `events`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EventRow {
    pub id: i64,
    pub source: String,
    pub ts_utc: String,
    pub payload_json: String,
    // One of the EventStatus labels: "pending", "delivered", "failed".
    pub delivery_status: String,
    // Failure detail recorded alongside the final delivery status.
    pub reason: Option<String>,
}
230
231/// Bounded error class for `source_errors`. Prevents cardinality blowup
232/// from free-form strings (stack traces, paths, or human text) making it
233/// into the analytics surface. Classification happens at the call site;
234/// anything that doesn't map cleanly goes to `Unknown`.
235#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
236pub enum SourceErrorClass {
237    Timeout,
238    AuthFailure,
239    RateLimit,
240    NetworkError,
241    ProtocolError,
242    NotFound,
243    PermissionDenied,
244    Unknown,
245}
246
247impl SourceErrorClass {
248    pub fn as_str(self) -> &'static str {
249        match self {
250            SourceErrorClass::Timeout => "timeout",
251            SourceErrorClass::AuthFailure => "auth_failure",
252            SourceErrorClass::RateLimit => "rate_limit",
253            SourceErrorClass::NetworkError => "network_error",
254            SourceErrorClass::ProtocolError => "protocol_error",
255            SourceErrorClass::NotFound => "not_found",
256            SourceErrorClass::PermissionDenied => "permission_denied",
257            SourceErrorClass::Unknown => "unknown",
258        }
259    }
260}
261
262/// Bounded event type for `iroh_events`. Analytics show connect counts,
263/// eviction pressure, and reconnect churn per peer-hash per day.
264#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
265pub enum IrohEventType {
266    Connect,
267    Evict,
268    Reconnect,
269    HandshakeRefused,
270}
271
272impl IrohEventType {
273    pub fn as_str(self) -> &'static str {
274        match self {
275            IrohEventType::Connect => "connect",
276            IrohEventType::Evict => "evict",
277            IrohEventType::Reconnect => "reconnect",
278            IrohEventType::HandshakeRefused => "handshake_refused",
279        }
280    }
281}
282
283/// SHA256-truncated, URL-safe hex hash of a raw iroh NodeId. Stable
284/// across a paired peer's identity lifetime; does NOT expose the raw
285/// ed25519 public key. The published analytics surface reads these
286/// hashes, never the raw ID.
287pub fn hash_peer_id(raw_node_id: &str) -> String {
288    use sha2::{Digest, Sha256};
289    let mut hasher = Sha256::new();
290    hasher.update(raw_node_id.as_bytes());
291    let digest = hasher.finalize();
292    hex_encode(&digest[..8])
293}
294
/// Lowercase hexadecimal encoding of `bytes`, two characters per byte.
fn hex_encode(bytes: &[u8]) -> String {
    const HEX: &[u8; 16] = b"0123456789abcdef";
    bytes
        .iter()
        .flat_map(|b| [HEX[usize::from(b >> 4)], HEX[usize::from(b & 0x0f)]])
        .map(char::from)
        .collect()
}
304
/// Handle to the observability database. Holds only the filesystem path;
/// connections are established per call by the `with_conn` /
/// `with_read_only_conn` helpers (defined elsewhere in this file).
pub struct Db {
    path: PathBuf,
}
308
/// Borrowed input for `Db::record_message`; owned copies are made at
/// insert time.
pub struct MessageRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub direction: Direction,
    pub chat_id: Option<&'a str>,
    pub from_agent: Option<&'a str>,
    pub to_agent: Option<&'a str>,
    pub body: Option<&'a str>,
    pub raw_json: Option<&'a str>,
}

/// Borrowed input for `Db::record_clone_dispatch`. `brief` is truncated
/// to 16 KiB at insert time.
pub struct CloneDispatchRecord<'a> {
    pub ts_utc_start: DateTime<Utc>,
    // None while the dispatch is still running.
    pub ts_utc_end: Option<DateTime<Utc>>,
    pub agent_id: &'a str,
    pub runtime: Option<&'a str>,
    pub brief_path: Option<&'a str>,
    pub brief: Option<&'a str>,
    pub workspace: Option<&'a str>,
    pub branch: Option<&'a str>,
    pub status: Option<&'a str>,
    pub exit_code: Option<i64>,
    pub detail_json: Option<&'a str>,
}
333
/// Borrowed input for `Db::record_harvest_event`.
pub struct HarvestEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source_branch: &'a str,
    pub target_branch: &'a str,
    pub commit_sha: Option<&'a str>,
    pub status: &'a str,
    pub conflicts: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}

/// Borrowed input for `Db::record_communication_event`. `body` is
/// truncated to 4096 characters at insert time.
pub struct CommunicationEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub tool: Option<&'a str>,
    pub direction: Direction,
    pub chat_id: Option<&'a str>,
    pub message_id: Option<&'a str>,
    pub handle: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub body: Option<&'a str>,
    pub status: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}

/// Owned, timestamp-parsed form of a communication event, returned by
/// `Db::list_communication_events`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct CommunicationEvent {
    pub id: i64,
    pub ts_utc: DateTime<Utc>,
    pub source: String,
    pub tool: Option<String>,
    pub direction: String,
    pub chat_id: Option<String>,
    pub message_id: Option<String>,
    pub handle: Option<String>,
    pub agent: Option<String>,
    pub body: Option<String>,
    pub status: Option<String>,
    pub detail_json: Option<String>,
}

/// Borrowed input for `Db::record_mcp_tool_call`. At insert time `error`
/// is truncated to 2048 characters and the request/response JSON to 8192
/// each.
pub struct McpToolCallRecord<'a> {
    pub ts_utc_start: DateTime<Utc>,
    pub ts_utc_end: Option<DateTime<Utc>>,
    pub source: &'a str,
    pub tool: &'a str,
    pub agent: Option<&'a str>,
    pub duration_ms: Option<i64>,
    pub success: bool,
    pub error: Option<&'a str>,
    pub timeout_race: bool,
    pub request_json: Option<&'a str>,
    pub response_json: Option<&'a str>,
}
387
/// Borrowed input for `Db::record_git_operation` (writer not shown in
/// this chunk).
pub struct GitOperationRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub operation: &'a str,
    pub repo: &'a str,
    pub branch: Option<&'a str>,
    pub remote: Option<&'a str>,
    pub from_sha: Option<&'a str>,
    pub to_sha: Option<&'a str>,
    pub status: &'a str,
    pub detail_json: Option<&'a str>,
}

/// Borrowed input for recording an owner directive.
pub struct OwnerDirectiveRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub chat_id: Option<&'a str>,
    pub raw_text: &'a str,
    pub resolved_action: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub status: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}

/// Borrowed input for recording token usage.
pub struct TokenUsageRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub session_id: Option<&'a str>,
    pub agent: Option<&'a str>,
    /// Runtime that emitted the usage: "claude", "codex", "copilot", etc.
    /// Distinct from `model` so analytics can slice by process runtime
    /// independent of which model the runtime happened to invoke.
    pub runtime: Option<&'a str>,
    pub model: Option<&'a str>,
    pub input_tokens: Option<i64>,
    pub output_tokens: Option<i64>,
    pub cached_input_tokens: Option<i64>,
    // Cost in millionths of a USD to keep the column integral.
    pub cost_usd_micros: Option<i64>,
    pub detail_json: Option<&'a str>,
}

/// Borrowed input for recording a classified source error.
pub struct SourceErrorRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub error_class: SourceErrorClass,
    /// Count for pre-aggregated writes. Defaults to 1 for per-event
    /// writes. A future in-process bucketing layer can flush rolled-up
    /// counts without changing the schema.
    pub count: i64,
    pub detail_json: Option<&'a str>,
}
437
/// Borrowed input for recording an iroh networking event.
pub struct IrohEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub event_type: IrohEventType,
    /// SHA256-truncated hash of the raw NodeId. Call `hash_peer_id` at
    /// the hook site; never pass the raw ed25519 ID here.
    pub peer_id_hash: &'a str,
    /// Owner-defined label from peers.toml, if the peer was resolved
    /// before the event fired. `None` for `HandshakeRefused` where the
    /// label lookup failed.
    pub peer_label: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}

/// Borrowed input for recording a watchdog event.
pub struct WatchdogEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub event: &'a str,
    pub agent: Option<&'a str>,
    pub severity: Option<&'a str>,
    pub status: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}

/// Borrowed input for creating a `netsky_tasks` row. Timestamps are not
/// supplied here — presumably assigned by the insert path; confirm
/// against the task writer.
pub struct TaskRecord<'a> {
    pub title: &'a str,
    pub body: Option<&'a str>,
    pub status: &'a str,
    pub priority: Option<&'a str>,
    pub labels: Option<&'a str>,
    pub source: Option<&'a str>,
    pub source_ref: Option<&'a str>,
    pub closed_at: Option<&'a str>,
    pub closed_reason: Option<&'a str>,
    pub closed_evidence: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub sync_calendar: bool,
    pub calendar_task_id: Option<&'a str>,
}

/// Patch-style update for an existing task; only `id` is required.
/// NOTE(review): `None` presumably means "leave unchanged" — confirm
/// against the update implementation (not in this chunk).
pub struct TaskUpdate<'a> {
    pub id: i64,
    pub status: Option<&'a str>,
    pub priority: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub closed_reason: Option<&'a str>,
    pub closed_evidence: Option<&'a str>,
    pub sync_calendar: Option<bool>,
    pub calendar_task_id: Option<&'a str>,
}
486
// Private serde mirrors: each struct below is the JSON shape stored in its
// table's `row_json` column. Field names double as the `json_extract` keys
// used by the expression indexes created in `migrate`.

/// Stored shape of one `messages` row.
#[derive(Debug, Serialize, Deserialize)]
struct MessageRow {
    id: i64,
    ts_utc: String,
    source: String,
    direction: String,
    chat_id: Option<String>,
    from_agent: Option<String>,
    to_agent: Option<String>,
    body: Option<String>,
    raw_json: Option<String>,
}

/// Stored shape of one `cli_invocations` row.
#[derive(Debug, Serialize, Deserialize)]
struct CliRow {
    id: i64,
    ts_utc: String,
    bin: String,
    argv_json: String,
    exit_code: Option<i64>,
    duration_ms: Option<i64>,
    host: String,
}

/// Stored shape of one `crashes` row.
#[derive(Debug, Serialize, Deserialize)]
struct CrashRow {
    id: i64,
    ts_utc: String,
    kind: String,
    agent: String,
    detail_json: String,
}

/// Stored shape of one `ticks` row.
#[derive(Debug, Serialize, Deserialize)]
struct TickRow {
    id: i64,
    ts_utc: String,
    source: String,
    detail_json: String,
}

/// Stored shape of one `workspaces` row.
#[derive(Debug, Serialize, Deserialize)]
struct WorkspaceRow {
    id: i64,
    ts_utc_created: String,
    name: String,
    branch: String,
    // None while the workspace still exists.
    ts_utc_deleted: Option<String>,
    verdict: Option<String>,
}

/// Stored shape of one `sessions` row.
#[derive(Debug, Serialize, Deserialize)]
struct SessionRow {
    id: i64,
    ts_utc: String,
    agent: String,
    session_num: i64,
    // One of the SessionEvent labels: "up", "down", "note".
    event: String,
}

/// Stored shape of one `clone_dispatches` row.
#[derive(Debug, Serialize, Deserialize)]
struct CloneDispatchRow {
    id: i64,
    ts_utc_start: String,
    ts_utc_end: Option<String>,
    agent_id: String,
    runtime: Option<String>,
    brief_path: Option<String>,
    brief: Option<String>,
    workspace: Option<String>,
    branch: Option<String>,
    status: Option<String>,
    exit_code: Option<i64>,
    detail_json: Option<String>,
}

/// Stored shape of one `harvest_events` row.
#[derive(Debug, Serialize, Deserialize)]
struct HarvestEventRow {
    id: i64,
    ts_utc: String,
    source_branch: String,
    target_branch: String,
    commit_sha: Option<String>,
    status: String,
    conflicts: Option<String>,
    detail_json: Option<String>,
}

/// Stored shape of one `communication_events` row.
#[derive(Debug, Serialize, Deserialize)]
struct CommunicationEventRow {
    id: i64,
    ts_utc: String,
    source: String,
    tool: Option<String>,
    direction: String,
    chat_id: Option<String>,
    message_id: Option<String>,
    handle: Option<String>,
    agent: Option<String>,
    body: Option<String>,
    status: Option<String>,
    detail_json: Option<String>,
}
590
591impl TryFrom<CommunicationEventRow> for CommunicationEvent {
592    type Error = Error;
593
594    fn try_from(row: CommunicationEventRow) -> Result<Self> {
595        Ok(Self {
596            id: row.id,
597            ts_utc: parse_ts(&row.ts_utc)?,
598            source: row.source,
599            tool: row.tool,
600            direction: row.direction,
601            chat_id: row.chat_id,
602            message_id: row.message_id,
603            handle: row.handle,
604            agent: row.agent,
605            body: row.body,
606            status: row.status,
607            detail_json: row.detail_json,
608        })
609    }
610}
611
/// Stored shape of one `mcp_tool_calls` row.
#[derive(Debug, Serialize, Deserialize)]
struct McpToolCallRow {
    id: i64,
    ts_utc_start: String,
    ts_utc_end: Option<String>,
    source: String,
    tool: String,
    agent: Option<String>,
    duration_ms: Option<i64>,
    success: bool,
    error: Option<String>,
    timeout_race: bool,
    request_json: Option<String>,
    response_json: Option<String>,
}

/// Stored shape of one `git_operations` row.
#[derive(Debug, Serialize, Deserialize)]
struct GitOperationRow {
    id: i64,
    ts_utc: String,
    operation: String,
    repo: String,
    branch: Option<String>,
    remote: Option<String>,
    from_sha: Option<String>,
    to_sha: Option<String>,
    status: String,
    detail_json: Option<String>,
}

/// Stored shape of one `owner_directives` row.
#[derive(Debug, Serialize, Deserialize)]
struct OwnerDirectiveRow {
    id: i64,
    ts_utc: String,
    source: String,
    chat_id: Option<String>,
    raw_text: String,
    resolved_action: Option<String>,
    agent: Option<String>,
    status: Option<String>,
    detail_json: Option<String>,
}

/// Stored shape of one `token_usage` row.
#[derive(Debug, Serialize, Deserialize)]
struct TokenUsageRow {
    id: i64,
    ts_utc: String,
    session_id: Option<String>,
    agent: Option<String>,
    // serde default keeps rows written before `runtime` existed readable.
    #[serde(default)]
    runtime: Option<String>,
    model: Option<String>,
    input_tokens: Option<i64>,
    output_tokens: Option<i64>,
    cached_input_tokens: Option<i64>,
    cost_usd_micros: Option<i64>,
    detail_json: Option<String>,
}

/// Stored shape of one `source_errors` row.
#[derive(Debug, Serialize, Deserialize)]
struct SourceErrorRow {
    id: i64,
    ts_utc: String,
    source: String,
    // One of the SourceErrorClass labels.
    error_class: String,
    count: i64,
    detail_json: Option<String>,
}

/// Stored shape of one `iroh_events` row.
#[derive(Debug, Serialize, Deserialize)]
struct IrohEventRow {
    id: i64,
    ts_utc: String,
    // One of the IrohEventType labels.
    event_type: String,
    peer_id_hash: String,
    peer_label: Option<String>,
    detail_json: Option<String>,
}

/// Stored shape of one `watchdog_events` row.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WatchdogEventRow {
    id: i64,
    ts_utc: String,
    event: String,
    agent: Option<String>,
    severity: Option<String>,
    status: Option<String>,
    detail_json: Option<String>,
}

/// Stored shape of one `netsky_tasks` row. Public because task queries
/// return it directly.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskRow {
    pub id: i64,
    pub created_at: String,
    pub updated_at: String,
    pub title: String,
    pub body: Option<String>,
    pub status: String,
    pub priority: Option<String>,
    pub labels: Option<String>,
    pub source: Option<String>,
    pub source_ref: Option<String>,
    pub closed_at: Option<String>,
    pub closed_reason: Option<String>,
    pub closed_evidence: Option<String>,
    pub agent: Option<String>,
    // serde defaults keep rows written before these fields existed readable.
    #[serde(default)]
    pub sync_calendar: bool,
    #[serde(default)]
    pub calendar_task_id: Option<String>,
}
723
724impl Db {
725    pub fn open() -> Result<Self> {
726        let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
727        let dir = home.join(".netsky");
728        fs::create_dir_all(&dir)?;
729        Self::open_path(dir.join("meta.db"))
730    }
731
732    pub fn open_read_only() -> Result<Self> {
733        let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
734        Self::open_read_only_path(home.join(".netsky").join("meta.db"))
735    }
736
737    pub fn open_path(path: impl AsRef<Path>) -> Result<Self> {
738        let path = path.as_ref().to_path_buf();
739        if path != Path::new(":memory:")
740            && let Some(parent) = path.parent()
741        {
742            fs::create_dir_all(parent)?;
743        }
744        let db = Self { path };
745        db.with_conn(|_| async { Ok(()) })?;
746        Ok(db)
747    }
748
749    pub fn open_read_only_path(path: impl AsRef<Path>) -> Result<Self> {
750        let path = path.as_ref().to_path_buf();
751        if path != Path::new(":memory:") && !path.exists() {
752            return Err(Error::NotInitialized(path));
753        }
754        let db = Self { path };
755        db.with_read_only_conn(|_| async { Ok(()) })?;
756        Ok(db)
757    }
758
    // Test-only constructor. NOTE(review): despite the name, this opens a
    // unique temp *file* (pid + nanosecond suffix), not ":memory:" —
    // presumably so repeated connections observe the same data; confirm
    // before relying on true in-memory semantics.
    #[cfg(test)]
    pub(crate) fn open_in_memory() -> Result<Self> {
        let path = std::env::temp_dir().join(format!(
            "netsky-db-test-{}-{}.db",
            std::process::id(),
            Utc::now().timestamp_nanos_opt().unwrap_or_default()
        ));
        Self::open_path(path)
    }
768
    /// Creates or upgrades the schema to `SCHEMA_VERSION`. All DDL uses
    /// `IF NOT EXISTS`, so re-running is safe. Fails with
    /// `FutureSchemaVersion` when the file was written by a newer build.
    pub fn migrate(&self) -> Result<()> {
        self.with_conn(|conn| async move {
            // Refuse to touch a database written by a newer schema.
            let current = schema_version(&conn).await?;
            if current > SCHEMA_VERSION {
                return Err(Error::FutureSchemaVersion {
                    found: current,
                    supported: SCHEMA_VERSION,
                });
            }
            // Record the supported version in the meta table (upsert).
            conn.execute(
                "INSERT INTO meta (key, value) VALUES (?1, ?2) \
                 ON CONFLICT(key) DO UPDATE SET value = excluded.value",
                params_from_iter([Value::from("schema_version"), Value::from(SCHEMA_VERSION)]),
            )
            .await?;
            // One uniform (id, row_json) table per logical dataset.
            for table in TABLE_NAMES {
                let sql = format!(
                    "CREATE TABLE IF NOT EXISTS {table} \
                     (id INTEGER PRIMARY KEY, row_json TEXT NOT NULL)"
                );
                conn.execute(&sql, ()).await?;
            }
            // Expression indexes so time-range pushdowns become index
            // range scans instead of full-table JSON scans.
            for &(table, column) in INDEXED_TIME_COLUMNS {
                let sql = format!(
                    "CREATE INDEX IF NOT EXISTS idx_{table}_{column} \
                     ON {table}(json_extract(row_json, '$.{column}'))"
                );
                conn.execute(&sql, ()).await?;
            }
            // Same shape for equality-lookup columns (group-bys, filters).
            for &(table, column) in INDEXED_EQ_COLUMNS {
                let sql = format!(
                    "CREATE INDEX IF NOT EXISTS idx_{table}_{column} \
                     ON {table}(json_extract(row_json, '$.{column}'))"
                );
                conn.execute(&sql, ()).await?;
            }
            // v6: structured tables for source-cursor durability and
            // event delivery status. Columns are real (not row_json)
            // because both tables have a small fixed shape and the CLI
            // needs to read individual fields without a JSON parse.
            conn.execute(
                "CREATE TABLE IF NOT EXISTS source_cursors ( \
                     source TEXT PRIMARY KEY, \
                     cursor_value TEXT NOT NULL, \
                     updated_at TEXT NOT NULL \
                 )",
                (),
            )
            .await?;
            conn.execute(
                "CREATE TABLE IF NOT EXISTS events ( \
                     id INTEGER PRIMARY KEY AUTOINCREMENT, \
                     source TEXT NOT NULL, \
                     ts_utc TEXT NOT NULL, \
                     payload_json TEXT NOT NULL, \
                     delivery_status TEXT NOT NULL, \
                     reason TEXT \
                 )",
                (),
            )
            .await?;
            // Mirror the version into PRAGMA user_version as well.
            conn.execute(&format!("PRAGMA user_version = {SCHEMA_VERSION}"), ())
                .await?;
            Ok(())
        })
    }
835
836    pub fn record_message(&self, record: MessageRecord<'_>) -> Result<i64> {
837        self.insert_json("messages", MESSAGES, |id| MessageRow {
838            id,
839            ts_utc: ts(record.ts_utc),
840            source: record.source.to_string(),
841            direction: record.direction.as_str().to_string(),
842            chat_id: record.chat_id.map(str::to_string),
843            from_agent: record.from_agent.map(str::to_string),
844            to_agent: record.to_agent.map(str::to_string),
845            body: record.body.map(str::to_string),
846            raw_json: record.raw_json.map(str::to_string),
847        })
848    }
849
850    pub fn record_cli(
851        &self,
852        ts_utc: DateTime<Utc>,
853        bin: &str,
854        argv_json: &str,
855        exit_code: Option<i64>,
856        duration_ms: Option<i64>,
857        host: &str,
858    ) -> Result<i64> {
859        self.insert_json("cli_invocations", CLI_INVOCATIONS, |id| CliRow {
860            id,
861            ts_utc: ts(ts_utc),
862            bin: bin.to_string(),
863            argv_json: argv_json.to_string(),
864            exit_code,
865            duration_ms,
866            host: host.to_string(),
867        })
868    }
869
870    pub fn record_crash(
871        &self,
872        ts_utc: DateTime<Utc>,
873        kind: &str,
874        agent: &str,
875        detail_json: &str,
876    ) -> Result<i64> {
877        self.insert_json("crashes", CRASHES, |id| CrashRow {
878            id,
879            ts_utc: ts(ts_utc),
880            kind: kind.to_string(),
881            agent: agent.to_string(),
882            detail_json: detail_json.to_string(),
883        })
884    }
885
886    pub fn record_tick(
887        &self,
888        ts_utc: DateTime<Utc>,
889        source: &str,
890        detail_json: &str,
891    ) -> Result<i64> {
892        self.insert_json("ticks", TICKS, |id| TickRow {
893            id,
894            ts_utc: ts(ts_utc),
895            source: source.to_string(),
896            detail_json: detail_json.to_string(),
897        })
898    }
899
900    pub fn record_workspace(
901        &self,
902        ts_utc_created: DateTime<Utc>,
903        name: &str,
904        branch: &str,
905        ts_utc_deleted: Option<DateTime<Utc>>,
906        verdict: Option<&str>,
907    ) -> Result<i64> {
908        self.insert_json("workspaces", WORKSPACES, |id| WorkspaceRow {
909            id,
910            ts_utc_created: ts(ts_utc_created),
911            name: name.to_string(),
912            branch: branch.to_string(),
913            ts_utc_deleted: ts_utc_deleted.map(ts),
914            verdict: verdict.map(str::to_string),
915        })
916    }
917
918    pub fn record_session(
919        &self,
920        ts_utc: DateTime<Utc>,
921        agent: &str,
922        session_num: i64,
923        event: SessionEvent,
924    ) -> Result<i64> {
925        self.insert_json("sessions", SESSIONS, |id| SessionRow {
926            id,
927            ts_utc: ts(ts_utc),
928            agent: agent.to_string(),
929            session_num,
930            event: event.as_str().to_string(),
931        })
932    }
933
934    pub fn record_clone_dispatch(&self, record: CloneDispatchRecord<'_>) -> Result<i64> {
935        self.insert_json("clone_dispatches", CLONE_DISPATCHES, |id| {
936            CloneDispatchRow {
937                id,
938                ts_utc_start: ts(record.ts_utc_start),
939                ts_utc_end: record.ts_utc_end.map(ts),
940                agent_id: record.agent_id.to_string(),
941                runtime: record.runtime.map(str::to_string),
942                brief_path: record.brief_path.map(str::to_string),
943                brief: truncate_opt(record.brief, 16_384),
944                workspace: record.workspace.map(str::to_string),
945                branch: record.branch.map(str::to_string),
946                status: record.status.map(str::to_string),
947                exit_code: record.exit_code,
948                detail_json: record.detail_json.map(str::to_string),
949            }
950        })
951    }
952
953    pub fn record_harvest_event(&self, record: HarvestEventRecord<'_>) -> Result<i64> {
954        self.insert_json("harvest_events", HARVEST_EVENTS, |id| HarvestEventRow {
955            id,
956            ts_utc: ts(record.ts_utc),
957            source_branch: record.source_branch.to_string(),
958            target_branch: record.target_branch.to_string(),
959            commit_sha: record.commit_sha.map(str::to_string),
960            status: record.status.to_string(),
961            conflicts: record.conflicts.map(str::to_string),
962            detail_json: record.detail_json.map(str::to_string),
963        })
964    }
965
966    pub fn record_communication_event(&self, record: CommunicationEventRecord<'_>) -> Result<i64> {
967        self.insert_json("communication_events", COMMUNICATION_EVENTS, |id| {
968            CommunicationEventRow {
969                id,
970                ts_utc: ts(record.ts_utc),
971                source: record.source.to_string(),
972                tool: record.tool.map(str::to_string),
973                direction: record.direction.as_str().to_string(),
974                chat_id: record.chat_id.map(str::to_string),
975                message_id: record.message_id.map(str::to_string),
976                handle: record.handle.map(str::to_string),
977                agent: record.agent.map(str::to_string),
978                body: truncate_opt(record.body, 4096),
979                status: record.status.map(str::to_string),
980                detail_json: record.detail_json.map(str::to_string),
981            }
982        })
983    }
984
985    pub fn list_communication_events(&self) -> Result<Vec<CommunicationEvent>> {
986        let rows: Vec<CommunicationEventRow> = self
987            .with_read_only_conn(|conn| async move { rows(&conn, COMMUNICATION_EVENTS).await })?;
988        rows.into_iter().map(CommunicationEvent::try_from).collect()
989    }
990
991    pub fn record_mcp_tool_call(&self, record: McpToolCallRecord<'_>) -> Result<i64> {
992        self.insert_json("mcp_tool_calls", MCP_TOOL_CALLS, |id| McpToolCallRow {
993            id,
994            ts_utc_start: ts(record.ts_utc_start),
995            ts_utc_end: record.ts_utc_end.map(ts),
996            source: record.source.to_string(),
997            tool: record.tool.to_string(),
998            agent: record.agent.map(str::to_string),
999            duration_ms: record.duration_ms,
1000            success: record.success,
1001            error: truncate_opt(record.error, 2048),
1002            timeout_race: record.timeout_race,
1003            request_json: truncate_opt(record.request_json, 8192),
1004            response_json: truncate_opt(record.response_json, 8192),
1005        })
1006    }
1007
1008    pub fn record_git_operation(&self, record: GitOperationRecord<'_>) -> Result<i64> {
1009        self.insert_json("git_operations", GIT_OPERATIONS, |id| GitOperationRow {
1010            id,
1011            ts_utc: ts(record.ts_utc),
1012            operation: record.operation.to_string(),
1013            repo: record.repo.to_string(),
1014            branch: record.branch.map(str::to_string),
1015            remote: record.remote.map(str::to_string),
1016            from_sha: record.from_sha.map(str::to_string),
1017            to_sha: record.to_sha.map(str::to_string),
1018            status: record.status.to_string(),
1019            detail_json: record.detail_json.map(str::to_string),
1020        })
1021    }
1022
1023    pub fn record_owner_directive(&self, record: OwnerDirectiveRecord<'_>) -> Result<i64> {
1024        self.insert_json("owner_directives", OWNER_DIRECTIVES, |id| {
1025            OwnerDirectiveRow {
1026                id,
1027                ts_utc: ts(record.ts_utc),
1028                source: record.source.to_string(),
1029                chat_id: record.chat_id.map(str::to_string),
1030                raw_text: truncate(record.raw_text, 16_384),
1031                resolved_action: record.resolved_action.map(str::to_string),
1032                agent: record.agent.map(str::to_string),
1033                status: record.status.map(str::to_string),
1034                detail_json: record.detail_json.map(str::to_string),
1035            }
1036        })
1037    }
1038
1039    pub fn record_token_usage(&self, record: TokenUsageRecord<'_>) -> Result<i64> {
1040        self.insert_json("token_usage", TOKEN_USAGE, |id| TokenUsageRow {
1041            id,
1042            ts_utc: ts(record.ts_utc),
1043            session_id: record.session_id.map(str::to_string),
1044            agent: record.agent.map(str::to_string),
1045            runtime: record.runtime.map(str::to_string),
1046            model: record.model.map(str::to_string),
1047            input_tokens: record.input_tokens,
1048            output_tokens: record.output_tokens,
1049            cached_input_tokens: record.cached_input_tokens,
1050            cost_usd_micros: record.cost_usd_micros,
1051            detail_json: record.detail_json.map(str::to_string),
1052        })
1053    }
1054
1055    /// Batch-insert token usage rows. One turso connection, one transaction,
1056    /// one bulk id allocation. Use this from ingest paths that emit
1057    /// thousands of rows; per-row [`record_token_usage`] opens a fresh
1058    /// turso connection per call which is dominated by setup cost.
1059    ///
1060    /// Returns the number of rows written. Empty input is a no-op.
1061    pub fn record_token_usage_batch<'a, I>(&self, records: I) -> Result<usize>
1062    where
1063        I: IntoIterator<Item = TokenUsageRecord<'a>>,
1064    {
1065        let raw: Vec<TokenUsageRecord<'_>> = records.into_iter().collect();
1066        if raw.is_empty() {
1067            return Ok(0);
1068        }
1069        let count = raw.len();
1070        let start_id = self.bulk_alloc_ids(TOKEN_USAGE, count as i64)?;
1071        let mut payloads: Vec<(i64, String)> = Vec::with_capacity(count);
1072        let mut owned_rows: Vec<TokenUsageRow> = Vec::with_capacity(count);
1073        for (offset, record) in raw.into_iter().enumerate() {
1074            let id = start_id + offset as i64;
1075            let row = TokenUsageRow {
1076                id,
1077                ts_utc: ts(record.ts_utc),
1078                session_id: record.session_id.map(str::to_string),
1079                agent: record.agent.map(str::to_string),
1080                runtime: record.runtime.map(str::to_string),
1081                model: record.model.map(str::to_string),
1082                input_tokens: record.input_tokens,
1083                output_tokens: record.output_tokens,
1084                cached_input_tokens: record.cached_input_tokens,
1085                cost_usd_micros: record.cost_usd_micros,
1086                detail_json: record.detail_json.map(str::to_string),
1087            };
1088            payloads.push((id, serde_json::to_string(&row)?));
1089            owned_rows.push(row);
1090        }
1091        let table = TOKEN_USAGE;
1092        let payloads_for_write = payloads.clone();
1093        let write = self.with_conn(move |conn| async move {
1094            let sql = format!("INSERT INTO {table} (id, row_json) VALUES (?1, ?2)");
1095            conn.execute("BEGIN", ()).await?;
1096            for (id, json) in payloads_for_write {
1097                if let Err(error) = conn
1098                    .execute(&sql, params_from_iter([Value::from(id), Value::from(json)]))
1099                    .await
1100                {
1101                    let _ = conn.execute("ROLLBACK", ()).await;
1102                    return Err(error.into());
1103                }
1104            }
1105            conn.execute("COMMIT", ()).await?;
1106            Ok(())
1107        });
1108        if let Err(error) = write {
1109            for row in &owned_rows {
1110                spool_write_error(table, row.id, row, &error)?;
1111            }
1112            return Err(error);
1113        }
1114        Ok(count)
1115    }
1116
    /// Reserve `count` consecutive ids for `table` in a single round-trip.
    /// Returns the first id of the reserved range; ids span `[start,
    /// start + count)`. On failure, falls back to a per-row id sequence
    /// derived from the wall clock so the caller can still spool rows.
    fn bulk_alloc_ids(&self, name: &'static str, count: i64) -> Result<i64> {
        // Nothing to reserve; 0 is never handed out by the counter path.
        if count <= 0 {
            return Ok(0);
        }
        let result = self.with_conn(move |conn| async move {
            // Upsert bumps the per-table counter by `count` and returns the
            // new high-water mark in one statement.
            let mut rows = conn
                .query(
                    "INSERT INTO ids (name, id) VALUES (?1, ?2) \
                     ON CONFLICT(name) DO UPDATE SET id = id + ?2 \
                     RETURNING id",
                    params_from_iter([Value::from(name), Value::from(count)]),
                )
                .await?;
            if let Some(row) = rows.next().await? {
                let last = integer_value(&row.get_value(0)?)?;
                // RETURNING yields the LAST id of the range; convert to the first.
                return Ok(last - count + 1);
            }
            Err(Error::NotFound(format!("bulk id allocation for {name}")))
        });
        match result {
            Ok(start) => Ok(start),
            Err(error) => {
                // Allocation failed (e.g. turso unreachable): derive a start id
                // from the wall clock and record the failure in the spool.
                // NOTE(review): a microsecond-derived range could in principle
                // collide with a later counter-based range — appears to be an
                // accepted trade-off for best-effort spooling; confirm.
                let fallback = Utc::now().timestamp_micros();
                spool_error_json(serde_json::json!({
                    "ts_utc": ts(Utc::now()),
                    "table": name,
                    "id": fallback,
                    "count": count,
                    "error": error.to_string(),
                    "record": {"kind": "bulk_id_allocation"}
                }))?;
                Ok(fallback)
            }
        }
    }
1156
1157    pub fn record_source_error(&self, record: SourceErrorRecord<'_>) -> Result<i64> {
1158        self.insert_json("source_errors", SOURCE_ERRORS, |id| SourceErrorRow {
1159            id,
1160            ts_utc: ts(record.ts_utc),
1161            source: record.source.to_string(),
1162            error_class: record.error_class.as_str().to_string(),
1163            count: record.count.max(1),
1164            detail_json: record.detail_json.map(str::to_string),
1165        })
1166    }
1167
1168    pub fn record_iroh_event(&self, record: IrohEventRecord<'_>) -> Result<i64> {
1169        self.insert_json("iroh_events", IROH_EVENTS, |id| IrohEventRow {
1170            id,
1171            ts_utc: ts(record.ts_utc),
1172            event_type: record.event_type.as_str().to_string(),
1173            peer_id_hash: record.peer_id_hash.to_string(),
1174            peer_label: record.peer_label.map(str::to_string),
1175            detail_json: record.detail_json.map(str::to_string),
1176        })
1177    }
1178
1179    pub fn record_watchdog_event(&self, record: WatchdogEventRecord<'_>) -> Result<i64> {
1180        self.insert_json("watchdog_events", WATCHDOG_EVENTS, |id| WatchdogEventRow {
1181            id,
1182            ts_utc: ts(record.ts_utc),
1183            event: record.event.to_string(),
1184            agent: record.agent.map(str::to_string),
1185            severity: record.severity.map(str::to_string),
1186            status: record.status.map(str::to_string),
1187            detail_json: record.detail_json.map(str::to_string),
1188        })
1189    }
1190
1191    /// Read the cursor recorded for this source. `None` means no cursor
1192    /// has been written yet (first-run).
1193    pub fn read_source_cursor(&self, source: &str) -> Result<Option<String>> {
1194        let source = source.to_string();
1195        self.with_conn(move |conn| async move {
1196            let mut rows = conn
1197                .query(
1198                    "SELECT cursor_value FROM source_cursors WHERE source = ?1",
1199                    params_from_iter([Value::from(source)]),
1200                )
1201                .await?;
1202            let Some(row) = rows.next().await? else {
1203                return Ok(None);
1204            };
1205            Ok(Some(text_value(&row.get_value(0)?)?))
1206        })
1207    }
1208
1209    /// Write (upsert) the cursor for this source. `updated_at` is set
1210    /// to the current UTC instant.
1211    pub fn update_source_cursor(&self, source: &str, value: &str) -> Result<()> {
1212        let source = source.to_string();
1213        let value = value.to_string();
1214        let now = ts(Utc::now());
1215        self.with_conn(move |conn| async move {
1216            conn.execute(
1217                "INSERT INTO source_cursors (source, cursor_value, updated_at) \
1218                 VALUES (?1, ?2, ?3) \
1219                 ON CONFLICT(source) DO UPDATE SET \
1220                     cursor_value = excluded.cursor_value, \
1221                     updated_at = excluded.updated_at",
1222                params_from_iter([Value::from(source), Value::from(value), Value::from(now)]),
1223            )
1224            .await?;
1225            Ok(())
1226        })
1227    }
1228
1229    /// Delete the cursor row for this source. Idempotent; returns
1230    /// whether a row was removed.
1231    pub fn reset_source_cursor(&self, source: &str) -> Result<bool> {
1232        let source = source.to_string();
1233        self.with_conn(move |conn| async move {
1234            let mut rows = conn
1235                .query(
1236                    "DELETE FROM source_cursors WHERE source = ?1 RETURNING source",
1237                    params_from_iter([Value::from(source)]),
1238                )
1239                .await?;
1240            Ok(rows.next().await?.is_some())
1241        })
1242    }
1243
1244    pub fn list_source_cursors(&self) -> Result<Vec<SourceCursorRow>> {
1245        self.with_read_only_conn(|conn| async move {
1246            let mut rows = conn
1247                .query(
1248                    "SELECT source, cursor_value, updated_at FROM source_cursors ORDER BY source",
1249                    (),
1250                )
1251                .await?;
1252            let mut out = Vec::new();
1253            while let Some(row) = rows.next().await? {
1254                out.push(SourceCursorRow {
1255                    source: text_value(&row.get_value(0)?)?,
1256                    cursor_value: text_value(&row.get_value(1)?)?,
1257                    updated_at: text_value(&row.get_value(2)?)?,
1258                });
1259            }
1260            Ok(out)
1261        })
1262    }
1263
1264    /// Insert an event row with `delivery_status = 'pending'`. Returns
1265    /// the allocated id so the caller can update the status after the
1266    /// delivery adapter runs.
1267    pub fn insert_event(
1268        &self,
1269        source: &str,
1270        ts_utc: DateTime<Utc>,
1271        payload_json: &str,
1272    ) -> Result<i64> {
1273        let source = source.to_string();
1274        let ts_str = ts(ts_utc);
1275        let payload = payload_json.to_string();
1276        self.with_conn(move |conn| async move {
1277            let mut rows = conn
1278                .query(
1279                    "INSERT INTO events (source, ts_utc, payload_json, delivery_status) \
1280                     VALUES (?1, ?2, ?3, 'pending') RETURNING id",
1281                    params_from_iter([
1282                        Value::from(source),
1283                        Value::from(ts_str),
1284                        Value::from(payload),
1285                    ]),
1286                )
1287                .await?;
1288            let Some(row) = rows.next().await? else {
1289                return Err(Error::NotFound("event id after insert".into()));
1290            };
1291            integer_value(&row.get_value(0)?)
1292        })
1293    }
1294
1295    /// Transition an event to `delivered` or `failed`. `reason` is the
1296    /// failure text for `failed`; ignored (but recorded) for `delivered`.
1297    pub fn update_event_delivery(
1298        &self,
1299        id: i64,
1300        status: EventStatus,
1301        reason: Option<&str>,
1302    ) -> Result<()> {
1303        let reason_value = reason.map(str::to_string);
1304        let status_str = status.as_str().to_string();
1305        self.with_conn(move |conn| async move {
1306            conn.execute(
1307                "UPDATE events SET delivery_status = ?1, reason = ?2 WHERE id = ?3",
1308                params_from_iter([
1309                    Value::from(status_str),
1310                    match reason_value {
1311                        Some(r) => Value::from(r),
1312                        None => Value::Null,
1313                    },
1314                    Value::from(id),
1315                ]),
1316            )
1317            .await?;
1318            Ok(())
1319        })
1320    }
1321
1322    /// Tail the most recent events for a source (newest first).
1323    pub fn tail_events(&self, source: &str, limit: i64) -> Result<Vec<EventRow>> {
1324        let source = source.to_string();
1325        self.with_read_only_conn(move |conn| async move {
1326            let mut rows = conn
1327                .query(
1328                    "SELECT id, source, ts_utc, payload_json, delivery_status, reason \
1329                     FROM events WHERE source = ?1 ORDER BY id DESC LIMIT ?2",
1330                    params_from_iter([Value::from(source), Value::from(limit)]),
1331                )
1332                .await?;
1333            let mut out = Vec::new();
1334            while let Some(row) = rows.next().await? {
1335                let reason = match row.get_value(5)? {
1336                    Value::Null => None,
1337                    other => Some(
1338                        other
1339                            .as_text()
1340                            .cloned()
1341                            .ok_or_else(|| Error::NotFound("event reason".into()))?,
1342                    ),
1343                };
1344                out.push(EventRow {
1345                    id: integer_value(&row.get_value(0)?)?,
1346                    source: text_value(&row.get_value(1)?)?,
1347                    ts_utc: text_value(&row.get_value(2)?)?,
1348                    payload_json: text_value(&row.get_value(3)?)?,
1349                    delivery_status: text_value(&row.get_value(4)?)?,
1350                    reason,
1351                });
1352            }
1353            Ok(out)
1354        })
1355    }
1356
1357    pub fn record_task(&self, record: TaskRecord<'_>) -> Result<i64> {
1358        let now = ts(Utc::now());
1359        self.insert_json("netsky_tasks", TASKS, |id| TaskRow {
1360            id,
1361            created_at: now.clone(),
1362            updated_at: now.clone(),
1363            title: record.title.to_string(),
1364            body: record.body.map(str::to_string),
1365            status: record.status.to_string(),
1366            priority: record.priority.map(str::to_string),
1367            labels: record.labels.map(str::to_string),
1368            source: record.source.map(str::to_string),
1369            source_ref: record.source_ref.map(str::to_string),
1370            closed_at: record.closed_at.map(|value| {
1371                if value.is_empty() {
1372                    now.clone()
1373                } else {
1374                    value.to_string()
1375                }
1376            }),
1377            closed_reason: record.closed_reason.map(str::to_string),
1378            closed_evidence: record.closed_evidence.map(str::to_string),
1379            agent: record.agent.map(str::to_string),
1380            sync_calendar: record.sync_calendar,
1381            calendar_task_id: record.calendar_task_id.map(str::to_string),
1382        })
1383    }
1384
    /// Read-modify-write update of a task row: load the current row, apply
    /// only the fields present in `update`, bump `updated_at`, and write the
    /// full JSON row back. If the turso write fails the row is spooled via
    /// `spool_write_error` and the locally-updated row is still returned as
    /// `Ok`; the call errors only when the task is missing or spooling fails.
    pub fn update_task(&self, update: TaskUpdate<'_>) -> Result<TaskRow> {
        let mut row = self
            .get_task(update.id)?
            .ok_or_else(|| Error::NotFound(format!("task {}", update.id)))?;
        let now = ts(Utc::now());
        if let Some(status) = update.status {
            row.status = status.to_string();
            // Closing stamps `closed_at` once; re-closing keeps the original stamp.
            if status == "closed" && row.closed_at.is_none() {
                row.closed_at = Some(now.clone());
            }
        }
        if let Some(priority) = update.priority {
            row.priority = Some(priority.to_string());
        }
        if let Some(agent) = update.agent {
            row.agent = Some(agent.to_string());
        }
        if let Some(reason) = update.closed_reason {
            row.closed_reason = Some(reason.to_string());
        }
        if let Some(evidence) = update.closed_evidence {
            row.closed_evidence = Some(evidence.to_string());
        }
        if let Some(sync_calendar) = update.sync_calendar {
            row.sync_calendar = sync_calendar;
        }
        if let Some(calendar_task_id) = update.calendar_task_id {
            row.calendar_task_id = Some(calendar_task_id.to_string());
        }
        row.updated_at = now;
        let row_json = serde_json::to_string(&row)?;
        let id = update.id;
        let write = self.with_conn(move |conn| async move {
            conn.execute(
                "UPDATE netsky_tasks SET row_json = ?1 WHERE id = ?2",
                params_from_iter([Value::from(row_json), Value::from(id)]),
            )
            .await?;
            Ok(())
        });
        // Write failure is not fatal: the row is spooled and returned anyway.
        if let Err(error) = write {
            spool_write_error("netsky_tasks", update.id, &row, &error)?;
        }
        Ok(row)
    }
1430
1431    pub fn get_task(&self, id: i64) -> Result<Option<TaskRow>> {
1432        self.with_conn(move |conn| async move {
1433            let mut rows = conn
1434                .query(
1435                    "SELECT row_json FROM netsky_tasks WHERE id = ?1",
1436                    params_from_iter([Value::from(id)]),
1437                )
1438                .await?;
1439            let Some(row) = rows.next().await? else {
1440                return Ok(None);
1441            };
1442            let json = text_value(&row.get_value(0)?)?;
1443            Ok(Some(serde_json::from_str(&json)?))
1444        })
1445    }
1446
1447    pub fn list_tasks(&self, status: Option<&str>, priority: Option<&str>) -> Result<Vec<TaskRow>> {
1448        let status = status.map(str::to_string);
1449        let priority = priority.map(str::to_string);
1450        self.with_conn(move |conn| async move {
1451            let status_val = status.map(Value::from).unwrap_or(Value::Null);
1452            let priority_val = priority.map(Value::from).unwrap_or(Value::Null);
1453            let mut rows = conn
1454                .query(
1455                    "SELECT row_json FROM netsky_tasks \
1456                     WHERE (?1 IS NULL OR json_extract(row_json, '$.status') = ?1) \
1457                       AND (?2 IS NULL OR json_extract(row_json, '$.priority') = ?2) \
1458                     ORDER BY json_extract(row_json, '$.status'), \
1459                              json_extract(row_json, '$.priority'), \
1460                              id",
1461                    params_from_iter([status_val, priority_val]),
1462                )
1463                .await?;
1464            let mut out = Vec::new();
1465            while let Some(row) = rows.next().await? {
1466                let json = text_value(&row.get_value(0)?)?;
1467                out.push(serde_json::from_str(&json)?);
1468            }
1469            Ok(out)
1470        })
1471    }
1472
1473    pub fn query(&self, sql: &str) -> Result<String> {
1474        let batches = self.query_batches(sql)?;
1475        Ok(pretty_format_batches(&batches)?.to_string())
1476    }
1477
1478    pub fn query_batches(&self, sql: &str) -> Result<Vec<RecordBatch>> {
1479        let scope = referenced_storage_tables(sql);
1480        let snapshot = self.snapshot_scoped(scope.as_ref())?;
1481        let runtime = tokio::runtime::Runtime::new()?;
1482        runtime.block_on(async move {
1483            let ctx = register_snapshot(snapshot).await?;
1484            let df = ctx.sql(sql).await?;
1485            Ok(df.collect().await?)
1486        })
1487    }
1488
    /// Schema version currently reported for this database, via the
    /// module-level `schema_version` helper on a fresh connection.
    /// Compare against [`SCHEMA_VERSION`] for the version this build expects.
    pub fn schema_version(&self) -> Result<i64> {
        self.with_conn(|conn| async move { schema_version(&conn).await })
    }
1492
    /// Core single-row write path: allocate the next id for `name`, build the
    /// row via `row(id)`, serialize it as JSON, and insert it into `table`.
    ///
    /// If the turso write fails, the row is spooled to disk via
    /// `spool_write_error` and the allocated id is still returned as `Ok` —
    /// callers observe an error only when spooling itself fails.
    fn insert_json<T, F>(&self, name: &'static str, table: &'static str, row: F) -> Result<i64>
    where
        T: Serialize,
        F: FnOnce(i64) -> T,
    {
        // Id allocation has its own wall-clock fallback (see `next_id`),
        // so a usable id exists even when turso is unreachable.
        let id = self.next_id(name)?;
        let row = row(id);
        let row_json = serde_json::to_string(&row)?;
        let write = self.with_conn(move |conn| async move {
            let sql = format!("INSERT INTO {table} (id, row_json) VALUES (?1, ?2)");
            conn.execute(
                &sql,
                params_from_iter([Value::from(id), Value::from(row_json)]),
            )
            .await?;
            Ok(())
        });
        if let Err(error) = write {
            // Best-effort durability: park the row on disk instead of losing it.
            spool_write_error(table, id, &row, &error)?;
        }
        Ok(id)
    }
1515
1516    /// Snapshot only the storage tables in `scope`; unreferenced tables
1517    /// return empty `Vec`s and their DataFusion registrations become empty
1518    /// MemTables. `None` loads every table (back-compat for callers without
1519    /// a parsed SQL scope).
1520    fn snapshot_scoped(&self, scope: Option<&HashSet<&'static str>>) -> Result<Snapshot> {
1521        let owned: Option<HashSet<&'static str>> = scope.cloned();
1522        self.with_read_only_conn(move |conn| async move {
1523            let wants = |t: &'static str| owned.as_ref().is_none_or(|s| s.contains(t));
1524            Ok(Snapshot {
1525                messages: if wants(MESSAGES) {
1526                    rows(&conn, MESSAGES).await?
1527                } else {
1528                    Vec::new()
1529                },
1530                cli_invocations: if wants(CLI_INVOCATIONS) {
1531                    rows(&conn, CLI_INVOCATIONS).await?
1532                } else {
1533                    Vec::new()
1534                },
1535                crashes: if wants(CRASHES) {
1536                    rows(&conn, CRASHES).await?
1537                } else {
1538                    Vec::new()
1539                },
1540                ticks: if wants(TICKS) {
1541                    rows(&conn, TICKS).await?
1542                } else {
1543                    Vec::new()
1544                },
1545                workspaces: if wants(WORKSPACES) {
1546                    rows(&conn, WORKSPACES).await?
1547                } else {
1548                    Vec::new()
1549                },
1550                sessions: if wants(SESSIONS) {
1551                    rows(&conn, SESSIONS).await?
1552                } else {
1553                    Vec::new()
1554                },
1555                clone_dispatches: if wants(CLONE_DISPATCHES) {
1556                    rows(&conn, CLONE_DISPATCHES).await?
1557                } else {
1558                    Vec::new()
1559                },
1560                harvest_events: if wants(HARVEST_EVENTS) {
1561                    rows(&conn, HARVEST_EVENTS).await?
1562                } else {
1563                    Vec::new()
1564                },
1565                communication_events: if wants(COMMUNICATION_EVENTS) {
1566                    rows(&conn, COMMUNICATION_EVENTS).await?
1567                } else {
1568                    Vec::new()
1569                },
1570                mcp_tool_calls: if wants(MCP_TOOL_CALLS) {
1571                    rows(&conn, MCP_TOOL_CALLS).await?
1572                } else {
1573                    Vec::new()
1574                },
1575                git_operations: if wants(GIT_OPERATIONS) {
1576                    rows(&conn, GIT_OPERATIONS).await?
1577                } else {
1578                    Vec::new()
1579                },
1580                owner_directives: if wants(OWNER_DIRECTIVES) {
1581                    rows(&conn, OWNER_DIRECTIVES).await?
1582                } else {
1583                    Vec::new()
1584                },
1585                token_usage: if wants(TOKEN_USAGE) {
1586                    rows(&conn, TOKEN_USAGE).await?
1587                } else {
1588                    Vec::new()
1589                },
1590                watchdog_events: if wants(WATCHDOG_EVENTS) {
1591                    rows(&conn, WATCHDOG_EVENTS).await?
1592                } else {
1593                    Vec::new()
1594                },
1595                tasks: if wants(TASKS) {
1596                    rows(&conn, TASKS).await?
1597                } else {
1598                    Vec::new()
1599                },
1600                source_errors: if wants(SOURCE_ERRORS) {
1601                    rows(&conn, SOURCE_ERRORS).await?
1602                } else {
1603                    Vec::new()
1604                },
1605                iroh_events: if wants(IROH_EVENTS) {
1606                    rows(&conn, IROH_EVENTS).await?
1607                } else {
1608                    Vec::new()
1609                },
1610            })
1611        })
1612    }
1613
    /// Allocate the next id for `name` via an atomic upsert on the `ids`
    /// counter table. If allocation fails (e.g. turso unreachable), falls
    /// back to a wall-clock-derived id and records the failure in the spool,
    /// so callers can still write/spool their row.
    fn next_id(&self, name: &'static str) -> Result<i64> {
        let result = self.with_conn(move |conn| async move {
            // Upsert bumps the per-table counter by one and returns the new value.
            let mut rows = conn
                .query(
                    "INSERT INTO ids (name, id) VALUES (?1, 1) \
                     ON CONFLICT(name) DO UPDATE SET id = id + 1 \
                     RETURNING id",
                    params_from_iter([Value::from(name)]),
                )
                .await?;
            if let Some(row) = rows.next().await? {
                return integer_value(&row.get_value(0)?);
            }
            Err(Error::NotFound(format!("id allocation for {name}")))
        });
        match result {
            Ok(id) => Ok(id),
            Err(error) => {
                // NOTE(review): same caveat as `bulk_alloc_ids` — a
                // microsecond-derived id could collide with a later
                // counter-based id; presumably acceptable for spooled rows.
                let fallback = Utc::now().timestamp_micros();
                spool_error_json(serde_json::json!({
                    "ts_utc": ts(Utc::now()),
                    "table": name,
                    "id": fallback,
                    "error": error.to_string(),
                    "record": {"kind": "id_allocation"}
                }))?;
                Ok(fallback)
            }
        }
    }
1644
    /// Run `f` with a fresh write-capable connection to the local turso file,
    /// retrying locked opens with a short linear backoff. The attempt budget
    /// is smaller when a tokio runtime is already active (3 vs 10 retries) —
    /// presumably to bound blocking inside async contexts; confirm.
    fn with_conn<T, F, Fut>(&self, f: F) -> Result<T>
    where
        T: Send + 'static,
        F: FnOnce(Connection) -> Fut + Send + 'static,
        Fut: std::future::Future<Output = Result<T>> + Send + 'static,
    {
        let path = self.path.clone();
        let max_attempts = if tokio::runtime::Handle::try_current().is_ok() {
            3
        } else {
            10
        };
        let task = async move {
            let path = path.to_string_lossy().to_string();
            let mut last_error = None;
            for attempt in 0..=max_attempts {
                match Builder::new_local(&path).build().await {
                    Ok(db) => {
                        // Fresh connection per call; schema setup happens here.
                        let conn = db.connect()?;
                        configure_conn(&conn).await?;
                        return f(conn).await;
                    }
                    // Only locking errors are retried, and only while attempts remain.
                    Err(error) if is_locking_error(&error) && attempt < max_attempts => {
                        last_error = Some(error);
                        tokio::time::sleep(Duration::from_millis(10 + attempt * 2)).await;
                    }
                    Err(error) => return Err(error.into()),
                }
            }
            // Every loop arm returns except the retry arm, whose guard fails on
            // the final attempt — so this tail is effectively unreachable; it
            // exists to satisfy the type checker and to surface the last error.
            Err(last_error
                .map(Error::from)
                .unwrap_or_else(|| Error::NotFound("turso connection".to_string())))
        };
        block_on_turso(task)
    }
1680
1681    fn with_read_only_conn<T, F, Fut>(&self, f: F) -> Result<T>
1682    where
1683        T: Send + 'static,
1684        F: FnOnce(Connection) -> Fut + Send + 'static,
1685        Fut: std::future::Future<Output = Result<T>> + Send + 'static,
1686    {
1687        let path = self.path.clone();
1688        let task = async move {
1689            let path_str = path.to_string_lossy().to_string();
1690            let db = Builder::new_local(&path_str).build().await?;
1691            let conn = db.connect()?;
1692            configure_read_only_conn(&conn, &path).await?;
1693            f(conn).await
1694        };
1695        block_on_turso(task)
1696    }
1697
1698    #[cfg(test)]
1699    fn read_only_write_probe_fails(&self) -> Result<bool> {
1700        self.with_read_only_conn(|conn| async move {
1701            Ok(conn
1702                .execute(
1703                    "INSERT INTO meta (key, value) VALUES ('read_only_probe', 1)",
1704                    (),
1705                )
1706                .await
1707                .is_err())
1708        })
1709    }
1710}
1711
/// One-shot, in-memory copy of every storage table's rows, decoded from
/// their JSON payloads. `register_snapshot` exposes each field to
/// DataFusion as an Arrow-backed table under the field's name.
struct Snapshot {
    messages: Vec<MessageRow>,
    cli_invocations: Vec<CliRow>,
    crashes: Vec<CrashRow>,
    ticks: Vec<TickRow>,
    workspaces: Vec<WorkspaceRow>,
    sessions: Vec<SessionRow>,
    clone_dispatches: Vec<CloneDispatchRow>,
    harvest_events: Vec<HarvestEventRow>,
    communication_events: Vec<CommunicationEventRow>,
    mcp_tool_calls: Vec<McpToolCallRow>,
    git_operations: Vec<GitOperationRow>,
    owner_directives: Vec<OwnerDirectiveRow>,
    token_usage: Vec<TokenUsageRow>,
    watchdog_events: Vec<WatchdogEventRow>,
    // Queried as the DataFusion table "tasks" (see register_snapshot).
    tasks: Vec<TaskRow>,
    source_errors: Vec<SourceErrorRow>,
    iroh_events: Vec<IrohEventRow>,
}
1731
/// Prepares a read-write connection: sets the busy timeout and, when the
/// schema version does not yet match this build, switches to WAL mode and
/// creates the bootstrap `meta`/`ids` tables.
///
/// NOTE(review): nothing here bumps `user_version` to `SCHEMA_VERSION`;
/// presumably a migration path elsewhere does — confirm.
async fn configure_conn(conn: &Connection) -> Result<()> {
    conn.busy_timeout(Duration::from_secs(10))?;
    // Fast path: an up-to-date database needs no PRAGMA/DDL round-trips.
    if user_version(conn).await? == SCHEMA_VERSION {
        return Ok(());
    }
    // journal_mode is a query-style PRAGMA: drain its result row(s) so the
    // statement fully completes before continuing.
    let mut wal = conn.query("PRAGMA journal_mode=WAL", ()).await?;
    while wal.next().await?.is_some() {}
    conn.execute_batch(
        "PRAGMA synchronous=NORMAL; \
         CREATE TABLE IF NOT EXISTS meta (key TEXT PRIMARY KEY, value INTEGER NOT NULL); \
         CREATE TABLE IF NOT EXISTS ids (name TEXT PRIMARY KEY, id INTEGER NOT NULL);",
    )
    .await?;
    Ok(())
}
1747
1748async fn configure_read_only_conn(conn: &Connection, path: &Path) -> Result<()> {
1749    conn.busy_timeout(Duration::from_secs(10))?;
1750    conn.execute("PRAGMA query_only = 1", ()).await?;
1751    if query_only(conn).await? != 1 {
1752        return Err(Error::ReadOnlyNotEnforced(path.to_path_buf()));
1753    }
1754    ensure_initialized(conn, path).await
1755}
1756
1757async fn ensure_initialized(conn: &Connection, path: &Path) -> Result<()> {
1758    if !table_exists(conn, "meta").await? {
1759        return Err(Error::NotInitialized(path.to_path_buf()));
1760    }
1761    let current = schema_version(conn).await?;
1762    if current == 0 {
1763        return Err(Error::NotInitialized(path.to_path_buf()));
1764    }
1765    if current > SCHEMA_VERSION {
1766        return Err(Error::FutureSchemaVersion {
1767            found: current,
1768            supported: SCHEMA_VERSION,
1769        });
1770    }
1771    Ok(())
1772}
1773
1774async fn table_exists(conn: &Connection, table: &str) -> Result<bool> {
1775    let mut rows = conn
1776        .query(
1777            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1",
1778            params_from_iter([Value::from(table)]),
1779        )
1780        .await?;
1781    Ok(rows.next().await?.is_some())
1782}
1783
1784async fn query_only(conn: &Connection) -> Result<i64> {
1785    let mut rows = conn.query("PRAGMA query_only", ()).await?;
1786    let Some(row) = rows.next().await? else {
1787        return Ok(0);
1788    };
1789    integer_value(&row.get_value(0)?)
1790}
1791
1792async fn user_version(conn: &Connection) -> Result<i64> {
1793    let mut rows = conn.query("PRAGMA user_version", ()).await?;
1794    let Some(row) = rows.next().await? else {
1795        return Ok(0);
1796    };
1797    integer_value(&row.get_value(0)?)
1798}
1799
1800async fn schema_version(conn: &Connection) -> Result<i64> {
1801    let mut rows = conn
1802        .query(
1803            "SELECT value FROM meta WHERE key = ?1",
1804            params_from_iter([Value::from("schema_version")]),
1805        )
1806        .await?;
1807    let Some(row) = rows.next().await? else {
1808        return Ok(0);
1809    };
1810    integer_value(&row.get_value(0)?)
1811}
1812
1813async fn rows<T>(conn: &Connection, table: &str) -> Result<Vec<T>>
1814where
1815    T: DeserializeOwned,
1816{
1817    let sql = format!("SELECT row_json FROM {table} ORDER BY id");
1818    let mut out = Vec::new();
1819    let mut rows = conn.query(&sql, ()).await?;
1820    while let Some(row) = rows.next().await? {
1821        let json = text_value(&row.get_value(0)?)?;
1822        out.push(serde_json::from_str(&json)?);
1823    }
1824    Ok(out)
1825}
1826
1827fn integer_value(value: &Value) -> Result<i64> {
1828    value
1829        .as_integer()
1830        .copied()
1831        .ok_or_else(|| Error::NotFound("integer value".to_string()))
1832}
1833
1834fn text_value(value: &Value) -> Result<String> {
1835    value
1836        .as_text()
1837        .cloned()
1838        .ok_or_else(|| Error::NotFound("text value".to_string()))
1839}
1840
/// Drives a turso future to completion from synchronous code.
///
/// Calling `Runtime::block_on` from inside an existing tokio runtime
/// panics, so in that case the future is shipped to a throwaway OS thread
/// that builds its own runtime; otherwise a runtime is built inline.
fn block_on_turso<T, Fut>(future: Fut) -> Result<T>
where
    T: Send + 'static,
    Fut: std::future::Future<Output = Result<T>> + Send + 'static,
{
    if tokio::runtime::Handle::try_current().is_ok() {
        // A panicking worker surfaces as a generic NotFound error instead
        // of propagating the panic into the caller.
        std::thread::spawn(move || Runtime::new()?.block_on(future))
            .join()
            .unwrap_or_else(|_| Err(Error::NotFound("turso worker thread".to_string())))
    } else {
        Runtime::new()?.block_on(future)
    }
}
1854
1855fn is_locking_error(error: &turso::Error) -> bool {
1856    let message = error.to_string();
1857    message.contains("Locking error") || message.contains("locked")
1858}
1859
1860fn spool_write_error<T: Serialize>(table: &str, id: i64, record: &T, error: &Error) -> Result<()> {
1861    spool_error_json(serde_json::json!({
1862        "ts_utc": ts(Utc::now()),
1863        "table": table,
1864        "id": id,
1865        "error": error.to_string(),
1866        "record": record,
1867    }))
1868}
1869
1870fn spool_error_json(value: serde_json::Value) -> Result<()> {
1871    let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
1872    let dir = home.join(".netsky").join("logs");
1873    fs::create_dir_all(&dir)?;
1874    let path = dir.join(format!(
1875        "meta-db-errors-{}.jsonl",
1876        Utc::now().format("%Y-%m-%d")
1877    ));
1878    if let Err(spool_error) = netsky_core::jsonl::append_json_line(&path, &value) {
1879        write_spool_failure_marker(&value, &spool_error)?;
1880        return Err(Error::Io(spool_error));
1881    }
1882    Ok(())
1883}
1884
1885fn write_spool_failure_marker(
1886    value: &serde_json::Value,
1887    spool_error: &std::io::Error,
1888) -> Result<()> {
1889    let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
1890    let dir = home.join(".netsky").join("state");
1891    fs::create_dir_all(&dir)?;
1892    let path = dir.join("meta-db-spool-failed");
1893    let body = serde_json::json!({
1894        "ts_utc": ts(Utc::now()),
1895        "spool_error": spool_error.to_string(),
1896        "record": value,
1897    });
1898    fs::write(path, serde_json::to_vec_pretty(&body)?)?;
1899    Ok(())
1900}
1901
1902/// Scope the snapshot loader to storage tables the SQL actually references.
1903/// Conservative: returns `None` (load everything) when no known name
1904/// appears, so ad-hoc SQL still works. False positives only waste a
1905/// `json_extract` name check; false negatives produce empty tables, so
1906/// the scan deliberately over-includes.
1907fn referenced_storage_tables(sql: &str) -> Option<HashSet<&'static str>> {
1908    let lower = sql.to_ascii_lowercase();
1909    let mut out: HashSet<&'static str> = HashSet::new();
1910    for &(df_name, storage) in &DATAFUSION_TABLES {
1911        if contains_word(&lower, df_name) {
1912            out.insert(storage);
1913        }
1914    }
1915    if contains_word(&lower, CLONE_LIFETIMES_VIEW) {
1916        out.insert(CLONE_DISPATCHES);
1917    }
1918    if out.is_empty() { None } else { Some(out) }
1919}
1920
/// Word-boundary containment check: `needle` must be surrounded by
/// non-identifier chars (alphanumeric or `_`) on both sides, so
/// `SELECT tasks` matches but `netsky_tasks` does not match the search
/// for `tasks`.
///
/// The scan works on raw bytes: the previous implementation re-sliced the
/// haystack at `start + 1`, which panics when that byte index is not a
/// UTF-8 char boundary (possible for needles starting with a multi-byte
/// char). Byte-wise comparison has identical results for the ASCII table
/// names this is used with and can never panic.
fn contains_word(haystack: &str, needle: &str) -> bool {
    let hb = haystack.as_bytes();
    let nb = needle.as_bytes();
    if nb.is_empty() || nb.len() > hb.len() {
        return false;
    }
    // Identifier bytes are ASCII, so a byte-level boundary test is exact.
    let is_ident = |b: u8| b.is_ascii_alphanumeric() || b == b'_';
    // Check every candidate start (including overlapping ones), so a match
    // rejected on its boundaries cannot hide a later valid match.
    for start in 0..=hb.len() - nb.len() {
        if &hb[start..start + nb.len()] != nb {
            continue;
        }
        let end = start + nb.len();
        let before_ok = start == 0 || !is_ident(hb[start - 1]);
        let after_ok = end == hb.len() || !is_ident(hb[end]);
        if before_ok && after_ok {
            return true;
        }
    }
    false
}
1946
/// True for bytes that may appear inside a SQL identifier.
fn is_ident_byte(b: u8) -> bool {
    matches!(b, b'0'..=b'9' | b'A'..=b'Z' | b'a'..=b'z' | b'_')
}
1950
1951fn register(ctx: &SessionContext, name: &str, batch: RecordBatch) -> Result<()> {
1952    let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
1953    ctx.register_table(name, Arc::new(table))?;
1954    Ok(())
1955}
1956
1957async fn register_snapshot(snapshot: Snapshot) -> Result<SessionContext> {
1958    let ctx = SessionContext::new();
1959    register(&ctx, "messages", messages_batch(snapshot.messages)?)?;
1960    register(
1961        &ctx,
1962        "cli_invocations",
1963        cli_batch(snapshot.cli_invocations)?,
1964    )?;
1965    register(&ctx, "crashes", crashes_batch(snapshot.crashes)?)?;
1966    register(&ctx, "ticks", ticks_batch(snapshot.ticks)?)?;
1967    register(&ctx, "workspaces", workspaces_batch(snapshot.workspaces)?)?;
1968    register(&ctx, "sessions", sessions_batch(snapshot.sessions)?)?;
1969    register(
1970        &ctx,
1971        "clone_dispatches",
1972        clone_dispatches_batch(snapshot.clone_dispatches)?,
1973    )?;
1974    register(
1975        &ctx,
1976        "harvest_events",
1977        harvest_events_batch(snapshot.harvest_events)?,
1978    )?;
1979    register(
1980        &ctx,
1981        "communication_events",
1982        communication_events_batch(snapshot.communication_events)?,
1983    )?;
1984    register(
1985        &ctx,
1986        "mcp_tool_calls",
1987        mcp_tool_calls_batch(snapshot.mcp_tool_calls)?,
1988    )?;
1989    register(
1990        &ctx,
1991        "git_operations",
1992        git_operations_batch(snapshot.git_operations)?,
1993    )?;
1994    register(
1995        &ctx,
1996        "owner_directives",
1997        owner_directives_batch(snapshot.owner_directives)?,
1998    )?;
1999    register(
2000        &ctx,
2001        "token_usage",
2002        token_usage_batch(snapshot.token_usage)?,
2003    )?;
2004    register(
2005        &ctx,
2006        "watchdog_events",
2007        watchdog_events_batch(snapshot.watchdog_events)?,
2008    )?;
2009    register(&ctx, "tasks", tasks_batch(snapshot.tasks)?)?;
2010    register(
2011        &ctx,
2012        "source_errors",
2013        source_errors_batch(snapshot.source_errors)?,
2014    )?;
2015    register(
2016        &ctx,
2017        "iroh_events",
2018        iroh_events_batch(snapshot.iroh_events)?,
2019    )?;
2020    ctx.sql(
2021        "CREATE VIEW clone_lifetimes AS \
2022         SELECT id, agent_id, runtime, workspace, branch, status, \
2023                ts_utc_start, ts_utc_end \
2024         FROM clone_dispatches \
2025         WHERE ts_utc_end IS NOT NULL",
2026    )
2027    .await?;
2028    Ok(ctx)
2029}
2030
/// Arrow batch for the `messages` table. Column arrays are positional:
/// their order must stay aligned, index for index, with the schema field
/// list (nullable fields use `opt_string`).
fn messages_batch(rows: Vec<MessageRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source", DataType::Utf8, false),
            ("direction", DataType::Utf8, false),
            ("chat_id", DataType::Utf8, true),
            ("from_agent", DataType::Utf8, true),
            ("to_agent", DataType::Utf8, true),
            ("body", DataType::Utf8, true),
            ("raw_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            string(rows.iter().map(|r| Some(r.direction.as_str()))),
            opt_string(rows.iter().map(|r| r.chat_id.as_deref())),
            opt_string(rows.iter().map(|r| r.from_agent.as_deref())),
            opt_string(rows.iter().map(|r| r.to_agent.as_deref())),
            opt_string(rows.iter().map(|r| r.body.as_deref())),
            opt_string(rows.iter().map(|r| r.raw_json.as_deref())),
        ],
    )?)
}
2057
/// Arrow batch for the `cli_invocations` table. Column arrays are
/// positional: their order must stay aligned with the schema field list
/// (nullable ints use `int64_opt`).
fn cli_batch(rows: Vec<CliRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("bin", DataType::Utf8, false),
            ("argv_json", DataType::Utf8, false),
            ("exit_code", DataType::Int64, true),
            ("duration_ms", DataType::Int64, true),
            ("host", DataType::Utf8, false),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.bin.as_str()))),
            string(rows.iter().map(|r| Some(r.argv_json.as_str()))),
            int64_opt(rows.iter().map(|r| r.exit_code)),
            int64_opt(rows.iter().map(|r| r.duration_ms)),
            string(rows.iter().map(|r| Some(r.host.as_str()))),
        ],
    )?)
}
2080
2081fn crashes_batch(rows: Vec<CrashRow>) -> Result<RecordBatch> {
2082    simple_event_batch(
2083        schema(&[
2084            ("id", DataType::Int64, false),
2085            ("ts_utc", DataType::Utf8, false),
2086            ("kind", DataType::Utf8, false),
2087            ("agent", DataType::Utf8, false),
2088            ("detail_json", DataType::Utf8, false),
2089        ]),
2090        rows.iter()
2091            .map(|r| (r.id, &r.ts_utc, &r.kind, &r.agent, &r.detail_json)),
2092    )
2093}
2094
/// Arrow batch for the `ticks` table. Column arrays are positional: their
/// order must stay aligned with the schema field list.
fn ticks_batch(rows: Vec<TickRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source", DataType::Utf8, false),
            ("detail_json", DataType::Utf8, false),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            string(rows.iter().map(|r| Some(r.detail_json.as_str()))),
        ],
    )?)
}
2111
/// Arrow batch for the `workspaces` table. Column arrays are positional:
/// their order must stay aligned with the schema field list.
fn workspaces_batch(rows: Vec<WorkspaceRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc_created", DataType::Utf8, false),
            ("name", DataType::Utf8, false),
            ("branch", DataType::Utf8, false),
            ("ts_utc_deleted", DataType::Utf8, true),
            ("verdict", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc_created.as_str()))),
            string(rows.iter().map(|r| Some(r.name.as_str()))),
            string(rows.iter().map(|r| Some(r.branch.as_str()))),
            opt_string(rows.iter().map(|r| r.ts_utc_deleted.as_deref())),
            opt_string(rows.iter().map(|r| r.verdict.as_deref())),
        ],
    )?)
}
2132
/// Arrow batch for the `sessions` table. Column arrays are positional:
/// their order must stay aligned with the schema field list.
fn sessions_batch(rows: Vec<SessionRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("agent", DataType::Utf8, false),
            ("session_num", DataType::Int64, false),
            ("event", DataType::Utf8, false),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.agent.as_str()))),
            int64(rows.iter().map(|r| r.session_num)),
            string(rows.iter().map(|r| Some(r.event.as_str()))),
        ],
    )?)
}
2151
/// Arrow batch for the `clone_dispatches` table (also backs the
/// `clone_lifetimes` view). Column arrays are positional: their order
/// must stay aligned with the schema field list.
fn clone_dispatches_batch(rows: Vec<CloneDispatchRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc_start", DataType::Utf8, false),
            ("ts_utc_end", DataType::Utf8, true),
            ("agent_id", DataType::Utf8, false),
            ("runtime", DataType::Utf8, true),
            ("brief_path", DataType::Utf8, true),
            ("brief", DataType::Utf8, true),
            ("workspace", DataType::Utf8, true),
            ("branch", DataType::Utf8, true),
            ("status", DataType::Utf8, true),
            ("exit_code", DataType::Int64, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc_start.as_str()))),
            opt_string(rows.iter().map(|r| r.ts_utc_end.as_deref())),
            string(rows.iter().map(|r| Some(r.agent_id.as_str()))),
            opt_string(rows.iter().map(|r| r.runtime.as_deref())),
            opt_string(rows.iter().map(|r| r.brief_path.as_deref())),
            opt_string(rows.iter().map(|r| r.brief.as_deref())),
            opt_string(rows.iter().map(|r| r.workspace.as_deref())),
            opt_string(rows.iter().map(|r| r.branch.as_deref())),
            opt_string(rows.iter().map(|r| r.status.as_deref())),
            int64_opt(rows.iter().map(|r| r.exit_code)),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
2184
/// Arrow batch for the `harvest_events` table. Column arrays are
/// positional: their order must stay aligned with the schema field list.
fn harvest_events_batch(rows: Vec<HarvestEventRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source_branch", DataType::Utf8, false),
            ("target_branch", DataType::Utf8, false),
            ("commit_sha", DataType::Utf8, true),
            ("status", DataType::Utf8, false),
            ("conflicts", DataType::Utf8, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source_branch.as_str()))),
            string(rows.iter().map(|r| Some(r.target_branch.as_str()))),
            opt_string(rows.iter().map(|r| r.commit_sha.as_deref())),
            string(rows.iter().map(|r| Some(r.status.as_str()))),
            opt_string(rows.iter().map(|r| r.conflicts.as_deref())),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
2209
/// Arrow batch for the `communication_events` table. Column arrays are
/// positional: their order must stay aligned with the schema field list.
fn communication_events_batch(rows: Vec<CommunicationEventRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source", DataType::Utf8, false),
            ("tool", DataType::Utf8, true),
            ("direction", DataType::Utf8, false),
            ("chat_id", DataType::Utf8, true),
            ("message_id", DataType::Utf8, true),
            ("handle", DataType::Utf8, true),
            ("agent", DataType::Utf8, true),
            ("body", DataType::Utf8, true),
            ("status", DataType::Utf8, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            opt_string(rows.iter().map(|r| r.tool.as_deref())),
            string(rows.iter().map(|r| Some(r.direction.as_str()))),
            opt_string(rows.iter().map(|r| r.chat_id.as_deref())),
            opt_string(rows.iter().map(|r| r.message_id.as_deref())),
            opt_string(rows.iter().map(|r| r.handle.as_deref())),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            opt_string(rows.iter().map(|r| r.body.as_deref())),
            opt_string(rows.iter().map(|r| r.status.as_deref())),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
2242
/// Arrow batch for the `mcp_tool_calls` table. Column arrays are
/// positional: their order must stay aligned with the schema field list
/// (booleans use `bools`).
fn mcp_tool_calls_batch(rows: Vec<McpToolCallRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc_start", DataType::Utf8, false),
            ("ts_utc_end", DataType::Utf8, true),
            ("source", DataType::Utf8, false),
            ("tool", DataType::Utf8, false),
            ("agent", DataType::Utf8, true),
            ("duration_ms", DataType::Int64, true),
            ("success", DataType::Boolean, false),
            ("error", DataType::Utf8, true),
            ("timeout_race", DataType::Boolean, false),
            ("request_json", DataType::Utf8, true),
            ("response_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc_start.as_str()))),
            opt_string(rows.iter().map(|r| r.ts_utc_end.as_deref())),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            string(rows.iter().map(|r| Some(r.tool.as_str()))),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            int64_opt(rows.iter().map(|r| r.duration_ms)),
            bools(rows.iter().map(|r| r.success)),
            opt_string(rows.iter().map(|r| r.error.as_deref())),
            bools(rows.iter().map(|r| r.timeout_race)),
            opt_string(rows.iter().map(|r| r.request_json.as_deref())),
            opt_string(rows.iter().map(|r| r.response_json.as_deref())),
        ],
    )?)
}
2275
/// Arrow batch for the `git_operations` table. Column arrays are
/// positional: their order must stay aligned with the schema field list.
fn git_operations_batch(rows: Vec<GitOperationRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("operation", DataType::Utf8, false),
            ("repo", DataType::Utf8, false),
            ("branch", DataType::Utf8, true),
            ("remote", DataType::Utf8, true),
            ("from_sha", DataType::Utf8, true),
            ("to_sha", DataType::Utf8, true),
            ("status", DataType::Utf8, false),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.operation.as_str()))),
            string(rows.iter().map(|r| Some(r.repo.as_str()))),
            opt_string(rows.iter().map(|r| r.branch.as_deref())),
            opt_string(rows.iter().map(|r| r.remote.as_deref())),
            opt_string(rows.iter().map(|r| r.from_sha.as_deref())),
            opt_string(rows.iter().map(|r| r.to_sha.as_deref())),
            string(rows.iter().map(|r| Some(r.status.as_str()))),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
2304
/// Arrow batch for the `owner_directives` table. Column arrays are
/// positional: their order must stay aligned with the schema field list.
fn owner_directives_batch(rows: Vec<OwnerDirectiveRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source", DataType::Utf8, false),
            ("chat_id", DataType::Utf8, true),
            ("raw_text", DataType::Utf8, false),
            ("resolved_action", DataType::Utf8, true),
            ("agent", DataType::Utf8, true),
            ("status", DataType::Utf8, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            opt_string(rows.iter().map(|r| r.chat_id.as_deref())),
            string(rows.iter().map(|r| Some(r.raw_text.as_str()))),
            opt_string(rows.iter().map(|r| r.resolved_action.as_deref())),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            opt_string(rows.iter().map(|r| r.status.as_deref())),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
2331
/// Arrow batch for the `token_usage` table. Column arrays are positional:
/// their order must stay aligned with the schema field list.
fn token_usage_batch(rows: Vec<TokenUsageRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("session_id", DataType::Utf8, true),
            ("agent", DataType::Utf8, true),
            ("runtime", DataType::Utf8, true),
            ("model", DataType::Utf8, true),
            ("input_tokens", DataType::Int64, true),
            ("output_tokens", DataType::Int64, true),
            ("cached_input_tokens", DataType::Int64, true),
            ("cost_usd_micros", DataType::Int64, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            opt_string(rows.iter().map(|r| r.session_id.as_deref())),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            opt_string(rows.iter().map(|r| r.runtime.as_deref())),
            opt_string(rows.iter().map(|r| r.model.as_deref())),
            int64_opt(rows.iter().map(|r| r.input_tokens)),
            int64_opt(rows.iter().map(|r| r.output_tokens)),
            int64_opt(rows.iter().map(|r| r.cached_input_tokens)),
            int64_opt(rows.iter().map(|r| r.cost_usd_micros)),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
2362
/// Arrow batch for the `source_errors` table. Column arrays are
/// positional: their order must stay aligned with the schema field list.
fn source_errors_batch(rows: Vec<SourceErrorRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source", DataType::Utf8, false),
            ("error_class", DataType::Utf8, false),
            ("count", DataType::Int64, false),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            string(rows.iter().map(|r| Some(r.error_class.as_str()))),
            int64(rows.iter().map(|r| r.count)),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
2383
/// Arrow batch for the `iroh_events` table. Column arrays are positional:
/// their order must stay aligned with the schema field list.
fn iroh_events_batch(rows: Vec<IrohEventRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("event_type", DataType::Utf8, false),
            ("peer_id_hash", DataType::Utf8, false),
            ("peer_label", DataType::Utf8, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.event_type.as_str()))),
            string(rows.iter().map(|r| Some(r.peer_id_hash.as_str()))),
            opt_string(rows.iter().map(|r| r.peer_label.as_deref())),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
2404
/// Arrow batch for the `watchdog_events` table. Column arrays are
/// positional: their order must stay aligned with the schema field list.
fn watchdog_events_batch(rows: Vec<WatchdogEventRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("event", DataType::Utf8, false),
            ("agent", DataType::Utf8, true),
            ("severity", DataType::Utf8, true),
            ("status", DataType::Utf8, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.event.as_str()))),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            opt_string(rows.iter().map(|r| r.severity.as_deref())),
            opt_string(rows.iter().map(|r| r.status.as_deref())),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
2427
/// Arrow batch for the tasks rows (registered as the DataFusion table
/// "tasks"). Column arrays are positional: their order must stay aligned
/// with the schema field list.
fn tasks_batch(rows: Vec<TaskRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("created_at", DataType::Utf8, false),
            ("updated_at", DataType::Utf8, false),
            ("title", DataType::Utf8, false),
            ("body", DataType::Utf8, true),
            ("status", DataType::Utf8, false),
            ("priority", DataType::Utf8, true),
            ("labels", DataType::Utf8, true),
            ("source", DataType::Utf8, true),
            ("source_ref", DataType::Utf8, true),
            ("closed_at", DataType::Utf8, true),
            ("closed_reason", DataType::Utf8, true),
            ("closed_evidence", DataType::Utf8, true),
            ("agent", DataType::Utf8, true),
            ("sync_calendar", DataType::Boolean, false),
            ("calendar_task_id", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.created_at.as_str()))),
            string(rows.iter().map(|r| Some(r.updated_at.as_str()))),
            string(rows.iter().map(|r| Some(r.title.as_str()))),
            opt_string(rows.iter().map(|r| r.body.as_deref())),
            string(rows.iter().map(|r| Some(r.status.as_str()))),
            opt_string(rows.iter().map(|r| r.priority.as_deref())),
            opt_string(rows.iter().map(|r| r.labels.as_deref())),
            opt_string(rows.iter().map(|r| r.source.as_deref())),
            opt_string(rows.iter().map(|r| r.source_ref.as_deref())),
            opt_string(rows.iter().map(|r| r.closed_at.as_deref())),
            opt_string(rows.iter().map(|r| r.closed_reason.as_deref())),
            opt_string(rows.iter().map(|r| r.closed_evidence.as_deref())),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            bools(rows.iter().map(|r| r.sync_calendar)),
            opt_string(rows.iter().map(|r| r.calendar_task_id.as_deref())),
        ],
    )?)
}
2468
2469fn simple_event_batch<'a>(
2470    schema: SchemaRef,
2471    rows: impl Iterator<Item = (i64, &'a String, &'a String, &'a String, &'a String)>,
2472) -> Result<RecordBatch> {
2473    let rows = rows.collect::<Vec<_>>();
2474    Ok(RecordBatch::try_new(
2475        schema,
2476        vec![
2477            Arc::new(Int64Array::from(
2478                rows.iter().map(|r| r.0).collect::<Vec<_>>(),
2479            )),
2480            string(rows.iter().map(|r| Some(r.1.as_str()))),
2481            string(rows.iter().map(|r| Some(r.2.as_str()))),
2482            string(rows.iter().map(|r| Some(r.3.as_str()))),
2483            string(rows.iter().map(|r| Some(r.4.as_str()))),
2484        ],
2485    )?)
2486}
2487
2488fn schema(fields: &[(&str, DataType, bool)]) -> SchemaRef {
2489    Arc::new(Schema::new(
2490        fields
2491            .iter()
2492            .map(|(name, ty, nullable)| Field::new(*name, ty.clone(), *nullable))
2493            .collect::<Vec<_>>(),
2494    ))
2495}
2496
2497fn ids<T>(rows: &[T], id: impl Fn(&T) -> i64) -> Arc<Int64Array> {
2498    int64(rows.iter().map(id))
2499}
2500
2501fn int64(values: impl Iterator<Item = i64>) -> Arc<Int64Array> {
2502    Arc::new(Int64Array::from(values.collect::<Vec<_>>()))
2503}
2504
2505fn int64_opt(values: impl Iterator<Item = Option<i64>>) -> Arc<Int64Array> {
2506    Arc::new(Int64Array::from(values.collect::<Vec<_>>()))
2507}
2508
2509fn bools(values: impl Iterator<Item = bool>) -> Arc<BooleanArray> {
2510    Arc::new(BooleanArray::from(values.collect::<Vec<_>>()))
2511}
2512
2513fn string<'a>(values: impl Iterator<Item = Option<&'a str>>) -> Arc<StringArray> {
2514    Arc::new(StringArray::from(values.collect::<Vec<_>>()))
2515}
2516
2517fn opt_string<'a>(values: impl Iterator<Item = Option<&'a str>>) -> Arc<StringArray> {
2518    string(values)
2519}
2520
/// Renders a UTC timestamp as RFC 3339 with millisecond precision and a
/// trailing `Z` (second arg `true`), e.g. `2026-04-15T23:00:00.000Z`.
fn ts(ts_utc: DateTime<Utc>) -> String {
    ts_utc.to_rfc3339_opts(SecondsFormat::Millis, true)
}
2524
2525fn parse_ts(ts_utc: &str) -> Result<DateTime<Utc>> {
2526    Ok(DateTime::parse_from_rfc3339(ts_utc)?.with_timezone(&Utc))
2527}
2528
/// Returns at most the first `max_chars` Unicode scalar values of `value`.
///
/// Slices at the computed char boundary instead of collecting char-by-char,
/// so the common no-truncation case is a single copy of the original bytes
/// rather than a per-character rebuild. Behavior is unchanged: counting is
/// by `char`, never splitting a multi-byte sequence.
fn truncate(value: &str, max_chars: usize) -> String {
    match value.char_indices().nth(max_chars) {
        // `byte_idx` is the start of the (max_chars + 1)-th char, i.e. the
        // byte length of the first `max_chars` chars — always a boundary.
        Some((byte_idx, _)) => value[..byte_idx].to_string(),
        // Fewer than `max_chars` chars available: keep the whole string.
        None => value.to_string(),
    }
}
2532
2533fn truncate_opt(value: Option<&str>, max_chars: usize) -> Option<String> {
2534    value.map(|s| truncate(s, max_chars))
2535}
2536
2537#[cfg(test)]
2538mod tests {
2539    use super::*;
2540
    /// Fresh in-memory database with the full schema applied.
    fn db() -> Result<Db> {
        let db = Db::open_in_memory()?;
        db.migrate()?;
        Ok(db)
    }
2546
    /// Deterministic timestamp shared by tests so rendered output is stable.
    fn fixed_ts() -> DateTime<Utc> {
        chrono::DateTime::parse_from_rfc3339("2026-04-15T23:00:00Z")
            .unwrap()
            .with_timezone(&Utc)
    }
2552
    /// A single-table SELECT yields a scope containing exactly that table.
    #[test]
    fn referenced_tables_extracts_scope() {
        let scope =
            referenced_storage_tables("SELECT COUNT(*) FROM messages WHERE ts_utc >= '2026-04-18'")
                .expect("single-table scope");
        assert!(scope.contains(MESSAGES));
        assert!(!scope.contains(CRASHES));
        assert_eq!(scope.len(), 1);
    }
2562
    /// Querying the `clone_lifetimes` view resolves to its backing table.
    #[test]
    fn referenced_tables_handles_alias_and_view() {
        let scope = referenced_storage_tables(
            "SELECT ts_utc_start FROM clone_lifetimes WHERE status IS NOT NULL",
        )
        .expect("view resolves to backing table");
        assert!(scope.contains(CLONE_DISPATCHES));
    }
2571
    /// The DataFusion-facing name `tasks` maps to the storage table name.
    #[test]
    fn referenced_tables_maps_datafusion_name_to_storage() {
        let scope =
            referenced_storage_tables("SELECT title FROM tasks").expect("tasks -> netsky_tasks");
        assert!(scope.contains(TASKS));
        assert!(!scope.contains("tasks"));
    }
2579
    /// SQL touching no known table produces no scope at all.
    #[test]
    fn referenced_tables_unknown_sql_returns_none() {
        assert!(referenced_storage_tables("SELECT 1").is_none());
    }
2584
    /// A two-table join contributes both tables to the scope.
    #[test]
    fn referenced_tables_join_captures_both() {
        let scope = referenced_storage_tables(
            "SELECT a.id FROM messages a JOIN crashes b ON a.ts_utc = b.ts_utc",
        )
        .expect("two tables");
        assert!(scope.contains(MESSAGES));
        assert!(scope.contains(CRASHES));
    }
2594
    /// `list_tasks` filters by status/priority and orders by
    /// status, then priority, then id — verified against four fixtures.
    #[test]
    fn list_tasks_pushes_filter_and_order_into_sql() -> Result<()> {
        let db = db()?;
        let mk = |title: &'static str, status: &'static str, priority: &'static str| {
            db.record_task(TaskRecord {
                title,
                body: None,
                status,
                priority: Some(priority),
                labels: None,
                source: None,
                source_ref: None,
                closed_at: None,
                closed_reason: None,
                closed_evidence: None,
                agent: None,
                sync_calendar: false,
                calendar_task_id: None,
            })
        };
        mk("c", "open", "p2")?;
        mk("a", "open", "p1")?;
        mk("b", "in_progress", "p1")?;
        mk("d", "closed", "p3")?;

        let all = db.list_tasks(None, None)?;
        let order: Vec<_> = all.iter().map(|r| r.title.as_str()).collect();
        assert_eq!(
            order,
            vec!["d", "b", "a", "c"],
            "status then priority then id"
        );

        let open_only = db.list_tasks(Some("open"), None)?;
        assert_eq!(open_only.len(), 2);
        assert!(open_only.iter().all(|r| r.status == "open"));

        let p1 = db.list_tasks(None, Some("p1"))?;
        assert_eq!(p1.len(), 2);
        assert!(p1.iter().all(|r| r.priority.as_deref() == Some("p1")));

        let open_p1 = db.list_tasks(Some("open"), Some("p1"))?;
        assert_eq!(open_p1.len(), 1);
        assert_eq!(open_p1[0].title, "a");
        Ok(())
    }
2641
    /// After migration, every expected `idx_{table}_{column}` index exists
    /// in `sqlite_master` for both the time and equality column lists.
    #[test]
    fn migrate_installs_expression_indexes_for_timed_tables() -> Result<()> {
        let db = db()?;
        let names = db.with_conn(|conn| async move {
            let mut rows = conn
                .query("SELECT name FROM sqlite_master WHERE type = 'index'", ())
                .await?;
            let mut out = Vec::new();
            while let Some(row) = rows.next().await? {
                out.push(text_value(&row.get_value(0)?)?);
            }
            Ok(out)
        })?;
        for (table, column) in INDEXED_TIME_COLUMNS {
            let expected = format!("idx_{table}_{column}");
            assert!(
                names.iter().any(|n| n == &expected),
                "missing index {expected} in {names:?}"
            );
        }
        for (table, column) in INDEXED_EQ_COLUMNS {
            let expected = format!("idx_{table}_{column}");
            assert!(
                names.iter().any(|n| n == &expected),
                "missing index {expected} in {names:?}"
            );
        }
        Ok(())
    }
2671
    /// `contains_word` matches whole identifiers only — `netsky_tasks` must
    /// not match `tasks`, and `classes` must not match `class`.
    #[test]
    fn contains_word_respects_identifier_boundary() {
        assert!(!contains_word("select from netsky_tasks", "tasks"));
        assert!(contains_word("select from tasks where", "tasks"));
        assert!(contains_word("FROM tasks;", "tasks"));
        assert!(!contains_word("classes", "class"));
    }
2679
    /// Writing a message through the OLTP path and reading via the OLAP
    /// query surface round-trips id, timestamp rendering, and field values.
    #[test]
    fn message_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_message(MessageRecord {
            ts_utc: fixed_ts(),
            source: "imessage",
            direction: Direction::Inbound,
            chat_id: Some("chat-1"),
            from_agent: Some("agent9"),
            to_agent: None,
            body: Some("hello"),
            raw_json: Some("{\"chat_id\":\"chat-1\"}"),
        })?;
        let out = db.query("SELECT id, ts_utc, source, direction, chat_id, body FROM messages")?;
        assert_eq!(id, 1);
        // Millisecond RFC 3339 rendering with a trailing Z.
        assert!(out.contains("2026-04-15T23:00:00.000Z"));
        assert!(out.contains("imessage"));
        assert!(out.contains("inbound"));
        assert!(out.contains("chat-1"));
        assert!(out.contains("hello"));
        Ok(())
    }
2702
    /// CLI invocation rows round-trip bin, exit code, duration, and host.
    #[test]
    fn cli_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_cli(
            fixed_ts(),
            "netsky",
            "[\"db\", \"migrate\"]",
            Some(0),
            Some(12),
            "host-a",
        )?;
        let out = db.query("SELECT bin, exit_code, duration_ms, host FROM cli_invocations")?;
        assert_eq!(id, 1);
        assert!(out.contains("netsky"));
        assert!(out.contains("host-a"));
        assert!(out.contains("12"));
        Ok(())
    }
2721
    /// Crash rows round-trip kind, agent, and the raw detail JSON payload.
    #[test]
    fn crash_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_crash(fixed_ts(), "panic", "agent8", "{\"reason\":\"wedged\"}")?;
        let out = db.query("SELECT kind, agent, detail_json FROM crashes")?;
        assert_eq!(id, 1);
        assert!(out.contains("panic"));
        assert!(out.contains("agent8"));
        assert!(out.contains("wedged"));
        Ok(())
    }
2733
    /// Tick rows round-trip source and detail JSON.
    #[test]
    fn tick_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_tick(fixed_ts(), "ticker", "{\"beat\":1}")?;
        let out = db.query("SELECT source, detail_json FROM ticks")?;
        assert_eq!(id, 1);
        assert!(out.contains("ticker"));
        assert!(out.contains("beat"));
        Ok(())
    }
2744
    /// Workspace rows round-trip name, branch, and verdict, including an
    /// explicit deletion timestamp after the creation time.
    #[test]
    fn workspace_round_trip() -> Result<()> {
        let db = db()?;
        let created = fixed_ts();
        let deleted = created + chrono::Duration::minutes(5);
        let id = db.record_workspace(
            created,
            "session8",
            "feat/netsky-db-v0",
            Some(deleted),
            Some("kept"),
        )?;
        let out = db.query("SELECT name, branch, verdict FROM workspaces")?;
        assert_eq!(id, 1);
        assert!(out.contains("session8"));
        assert!(out.contains("feat/netsky-db-v0"));
        assert!(out.contains("kept"));
        Ok(())
    }
2764
    /// Session rows round-trip agent, session number, and the event enum's
    /// lowercase rendering.
    #[test]
    fn session_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_session(fixed_ts(), "agent8", 8, SessionEvent::Note)?;
        let out = db.query("SELECT agent, session_num, event FROM sessions")?;
        assert_eq!(id, 1);
        assert!(out.contains("agent8"));
        assert!(out.contains("note"));
        Ok(())
    }
2775
    /// Smoke test for every v2 table: insert one row into each, check that
    /// `list_communication_events` surfaces its row, and confirm each table
    /// reports a count of 1 via the query surface.
    #[test]
    fn v2_tables_round_trip() -> Result<()> {
        let db = db()?;
        db.record_clone_dispatch(CloneDispatchRecord {
            ts_utc_start: fixed_ts(),
            ts_utc_end: Some(fixed_ts()),
            agent_id: "agent3",
            runtime: Some("codex"),
            brief_path: Some("briefs/foo.md"),
            brief: Some("do the thing"),
            workspace: Some("workspaces/foo/repo"),
            branch: Some("feat/foo"),
            status: Some("done"),
            exit_code: Some(0),
            detail_json: Some("{\"ok\":true}"),
        })?;
        db.record_harvest_event(HarvestEventRecord {
            ts_utc: fixed_ts(),
            source_branch: "feat/foo",
            target_branch: "main",
            commit_sha: Some("abc123"),
            status: "clean",
            conflicts: None,
            detail_json: None,
        })?;
        db.record_communication_event(CommunicationEventRecord {
            ts_utc: fixed_ts(),
            source: "imessage",
            tool: Some("reply"),
            direction: Direction::Outbound,
            chat_id: Some("chat-1"),
            message_id: Some("msg-1"),
            handle: Some("+15551234567"),
            agent: Some("agent0"),
            body: Some("sent"),
            status: Some("ok"),
            detail_json: None,
        })?;
        let comms = db.list_communication_events()?;
        assert_eq!(comms.len(), 1);
        assert_eq!(comms[0].source, "imessage");
        assert_eq!(comms[0].agent.as_deref(), Some("agent0"));
        db.record_mcp_tool_call(McpToolCallRecord {
            ts_utc_start: fixed_ts(),
            ts_utc_end: Some(fixed_ts()),
            source: "drive",
            tool: "list_files",
            agent: Some("agent0"),
            duration_ms: Some(22),
            success: true,
            error: None,
            timeout_race: false,
            request_json: Some("{}"),
            response_json: Some("[]"),
        })?;
        db.record_git_operation(GitOperationRecord {
            ts_utc: fixed_ts(),
            operation: "push",
            repo: "netsky",
            branch: Some("feat/foo"),
            remote: Some("origin"),
            from_sha: Some("abc"),
            to_sha: Some("def"),
            status: "ok",
            detail_json: None,
        })?;
        db.record_owner_directive(OwnerDirectiveRecord {
            ts_utc: fixed_ts(),
            source: "imessage",
            chat_id: Some("chat-1"),
            raw_text: "ship it",
            resolved_action: Some("dispatch"),
            agent: Some("agent0"),
            status: Some("accepted"),
            detail_json: None,
        })?;
        db.record_token_usage(TokenUsageRecord {
            ts_utc: fixed_ts(),
            session_id: Some("session-1"),
            agent: Some("agent4"),
            runtime: Some("codex"),
            model: Some("gpt-5.4"),
            input_tokens: Some(100),
            output_tokens: Some(25),
            cached_input_tokens: Some(10),
            cost_usd_micros: Some(1234),
            detail_json: None,
        })?;
        db.record_watchdog_event(WatchdogEventRecord {
            ts_utc: fixed_ts(),
            event: "respawn",
            agent: Some("agent0"),
            severity: Some("warn"),
            status: Some("ok"),
            detail_json: Some("{}"),
        })?;

        // One row was written to each table above; every count must be 1.
        for table in [
            "clone_dispatches",
            "harvest_events",
            "communication_events",
            "mcp_tool_calls",
            "git_operations",
            "owner_directives",
            "token_usage",
            "watchdog_events",
        ] {
            let out = db.query(&format!("SELECT COUNT(*) AS count FROM {table}"))?;
            assert!(out.contains('1'), "{table}: {out}");
        }
        Ok(())
    }
2888
    /// A 1500-row batch insert writes every row, and a follow-up single-row
    /// batch continues the id sequence (MAX(id) = 1501).
    #[test]
    fn token_usage_batch_inserts_contiguous_ids() -> Result<()> {
        let db = db()?;
        let count = 1500;
        let records: Vec<_> = (0..count)
            .map(|i| TokenUsageRecord {
                ts_utc: fixed_ts(),
                session_id: Some("batch-session"),
                agent: Some("agent45"),
                runtime: Some("claude"),
                model: Some("claude-opus-4-7"),
                input_tokens: Some(i),
                output_tokens: Some(i * 2),
                cached_input_tokens: None,
                cost_usd_micros: None,
                detail_json: None,
            })
            .collect();
        let written = db.record_token_usage_batch(records)?;
        assert_eq!(written, count as usize);
        let out = db.query(
            "SELECT COUNT(*) AS c, MIN(id) AS lo, MAX(id) AS hi, \
             SUM(input_tokens) AS sum_in FROM token_usage",
        )?;
        assert!(out.contains(&count.to_string()), "{out}");
        // Sum of 0..1500 = 1124250
        assert!(out.contains("1124250"), "{out}");
        // A second batch should pick up where the first left off.
        let written = db.record_token_usage_batch(vec![TokenUsageRecord {
            ts_utc: fixed_ts(),
            session_id: None,
            agent: None,
            runtime: None,
            model: None,
            input_tokens: Some(7),
            output_tokens: None,
            cached_input_tokens: None,
            cost_usd_micros: None,
            detail_json: None,
        }])?;
        assert_eq!(written, 1);
        let out = db.query("SELECT MAX(id) AS hi FROM token_usage")?;
        assert!(out.contains(&(count + 1).to_string()), "{out}");
        Ok(())
    }
2934
    /// An empty batch insert succeeds and reports zero rows written.
    #[test]
    fn token_usage_batch_empty_is_noop() -> Result<()> {
        let db = db()?;
        let written = db.record_token_usage_batch(Vec::<TokenUsageRecord<'_>>::new())?;
        assert_eq!(written, 0);
        Ok(())
    }
2942
    /// Create a task, update selected fields, then verify the update is
    /// visible through `list_tasks` and the SQL query surface.
    #[test]
    fn task_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_task(TaskRecord {
            title: "Add a task tracker",
            body: Some("Use turso and DataFusion."),
            status: "open",
            priority: Some("p1"),
            labels: Some("db,cli"),
            source: Some("test"),
            source_ref: Some("briefs/task-tracker.md"),
            closed_at: None,
            closed_reason: None,
            closed_evidence: None,
            agent: None,
            sync_calendar: false,
            calendar_task_id: None,
        })?;
        let row = db.update_task(TaskUpdate {
            id,
            status: Some("in_progress"),
            priority: None,
            agent: Some("agent3"),
            closed_reason: None,
            closed_evidence: None,
            sync_calendar: Some(true),
            calendar_task_id: Some("calendar-1"),
        })?;
        assert_eq!(row.id, 1);
        assert_eq!(row.status, "in_progress");
        assert_eq!(row.agent.as_deref(), Some("agent3"));
        assert!(row.sync_calendar);
        assert_eq!(row.calendar_task_id.as_deref(), Some("calendar-1"));
        let rows = db.list_tasks(Some("in_progress"), Some("p1"))?;
        assert_eq!(rows.len(), 1);
        let out = db.query(
            "SELECT title, status, priority, agent, sync_calendar, calendar_task_id FROM tasks",
        )?;
        assert!(out.contains("Add a task tracker"));
        assert!(out.contains("in_progress"));
        assert!(out.contains("agent3"));
        assert!(out.contains("calendar-1"));
        Ok(())
    }
2987
    /// A UNION ALL over every registered table name succeeds, proving each
    /// table is registered with the DataFusion session.
    #[test]
    fn query_registers_all_tables() -> Result<()> {
        let db = db()?;
        let out = db.query(
            "SELECT COUNT(*) FROM messages \
             UNION ALL SELECT COUNT(*) FROM cli_invocations \
             UNION ALL SELECT COUNT(*) FROM crashes \
             UNION ALL SELECT COUNT(*) FROM ticks \
             UNION ALL SELECT COUNT(*) FROM workspaces \
             UNION ALL SELECT COUNT(*) FROM sessions \
             UNION ALL SELECT COUNT(*) FROM clone_dispatches \
             UNION ALL SELECT COUNT(*) FROM harvest_events \
             UNION ALL SELECT COUNT(*) FROM communication_events \
             UNION ALL SELECT COUNT(*) FROM mcp_tool_calls \
             UNION ALL SELECT COUNT(*) FROM git_operations \
             UNION ALL SELECT COUNT(*) FROM owner_directives \
             UNION ALL SELECT COUNT(*) FROM token_usage \
             UNION ALL SELECT COUNT(*) FROM watchdog_events \
             UNION ALL SELECT COUNT(*) FROM tasks \
             UNION ALL SELECT COUNT(*) FROM source_errors \
             UNION ALL SELECT COUNT(*) FROM iroh_events",
        )?;
        assert!(out.contains('0'));
        Ok(())
    }
3013
    /// Read-only open fails with a clear message both for a missing file
    /// and for a file with no schema — and it must not create the file.
    /// Once a writer has migrated and written, the read-only handle can
    /// query but its write probe still fails.
    #[test]
    fn read_only_path_reports_uninitialized_then_reads_initialized_db() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let path = dir.path().join("meta.db");

        let missing = match Db::open_read_only_path(&path) {
            Ok(_) => panic!("missing database opened read-only"),
            Err(error) => error.to_string(),
        };
        assert_eq!(
            missing,
            format!(
                "meta.db has not been initialized at {}; a writer must run first.",
                path.display()
            )
        );
        // The failed read-only open must not have created the file.
        assert!(!path.exists());

        let uninitialized = Db::open_path(&path)?;
        drop(uninitialized);
        let no_schema = match Db::open_read_only_path(&path) {
            Ok(_) => panic!("uninitialized database opened read-only"),
            Err(error) => error.to_string(),
        };
        assert_eq!(
            no_schema,
            format!(
                "meta.db has not been initialized at {}; a writer must run first.",
                path.display()
            )
        );

        let writer = Db::open_path(&path)?;
        writer.migrate()?;
        writer.record_message(MessageRecord {
            ts_utc: fixed_ts(),
            source: "agent",
            direction: Direction::Outbound,
            chat_id: Some("agent0"),
            from_agent: Some("agent5"),
            to_agent: Some("agent0"),
            body: Some("checkpoint acked"),
            raw_json: None,
        })?;

        let reader = Db::open_read_only_path(&path)?;
        assert!(reader.read_only_write_probe_fails()?);
        let out = reader.query("SELECT source, chat_id, body FROM messages")?;
        assert!(out.contains("agent"));
        assert!(out.contains("agent0"));
        assert!(out.contains("checkpoint acked"));
        Ok(())
    }
3067
    /// Migration stamps the meta-table schema version to 6.
    #[test]
    fn migrate_sets_schema_version_6() -> Result<()> {
        let db = db()?;
        assert_eq!(db.schema_version()?, 6);
        Ok(())
    }
3074
    /// Migration also keeps SQLite's `PRAGMA user_version` in sync with
    /// `SCHEMA_VERSION`.
    #[test]
    fn migrate_sets_sqlite_user_version_6() -> Result<()> {
        let db = db()?;
        assert_eq!(
            db.with_conn(|conn| async move { user_version(&conn).await })?,
            SCHEMA_VERSION
        );
        Ok(())
    }
3084
    /// `hash_peer_id` is deterministic, input-sensitive, and yields a
    /// 16-char lowercase hex digest.
    #[test]
    fn hash_peer_id_is_stable_and_truncated() {
        let a = hash_peer_id("abc123");
        let b = hash_peer_id("abc123");
        let c = hash_peer_id("abc124");
        assert_eq!(a, b, "same input hashes the same");
        assert_ne!(a, c, "different input hashes differently");
        assert_eq!(a.len(), 16, "8 bytes hex-encoded = 16 chars");
        assert!(a.chars().all(|c| c.is_ascii_hexdigit()));
    }
3095
    /// Fixed-vector test against RFC 6234 SHA-256 outputs. A silent swap
    /// to a different digest (blake3, a different truncation width, a
    /// non-URL-safe hex alphabet) would invalidate every historical
    /// `iroh_events.peer_id_hash` and break cross-run joins on the
    /// analytics surface. Pinning the exact bytes means that drift
    /// blows this test, not a midnight query.
    ///
    /// Expected values are the first 8 bytes (16 hex chars) of each digest.
    #[test]
    fn hash_peer_id_matches_known_sha256_vectors() {
        // sha256("abc") = ba7816bf8f01cfea414140de5dae2223b00361a396177a9c...
        assert_eq!(hash_peer_id("abc"), "ba7816bf8f01cfea");
        // sha256("") = e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934c...
        assert_eq!(hash_peer_id(""), "e3b0c44298fc1c14");
        // sha256("netsky0") — representative "local peer" input
        assert_eq!(hash_peer_id("netsky0"), "66863bafcf1591d6");
    }
3111
    /// Source errors round-trip through insert and a GROUP BY query, with
    /// error classes rendered in snake_case.
    #[test]
    fn source_error_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_source_error(SourceErrorRecord {
            ts_utc: fixed_ts(),
            source: "email",
            error_class: SourceErrorClass::Timeout,
            count: 1,
            detail_json: Some("{\"endpoint\":\"gmail\"}"),
        })?;
        assert_eq!(id, 1);
        db.record_source_error(SourceErrorRecord {
            ts_utc: fixed_ts(),
            source: "iroh",
            error_class: SourceErrorClass::NetworkError,
            count: 3,
            detail_json: None,
        })?;
        let out = db.query(
            "SELECT source, error_class, SUM(count) AS n FROM source_errors \
             GROUP BY source, error_class ORDER BY source",
        )?;
        assert!(out.contains("email"));
        assert!(out.contains("timeout"));
        assert!(out.contains("network_error"));
        Ok(())
    }
3139
    /// A recorded count of 0 is clamped up to 1 on write.
    #[test]
    fn source_error_count_floors_at_one() -> Result<()> {
        let db = db()?;
        db.record_source_error(SourceErrorRecord {
            ts_utc: fixed_ts(),
            source: "email",
            error_class: SourceErrorClass::Unknown,
            count: 0,
            detail_json: None,
        })?;
        let out = db.query("SELECT count FROM source_errors")?;
        assert!(out.contains('1'));
        Ok(())
    }
3154
    /// Source cursors: read-before-write is None, writes upsert, sources
    /// are independent, listing is sorted by source, and reset deletes.
    #[test]
    fn source_cursor_round_trip() -> Result<()> {
        let db = db()?;
        assert_eq!(db.read_source_cursor("imessage")?, None);
        db.update_source_cursor("imessage", "42")?;
        assert_eq!(db.read_source_cursor("imessage")?.as_deref(), Some("42"));
        // Upsert semantics: second write replaces the value.
        db.update_source_cursor("imessage", "99")?;
        assert_eq!(db.read_source_cursor("imessage")?.as_deref(), Some("99"));
        // Sources are scoped independently.
        db.update_source_cursor("email", "abc")?;
        let all = db.list_source_cursors()?;
        assert_eq!(all.len(), 2);
        assert_eq!(all[0].source, "email");
        assert_eq!(all[0].cursor_value, "abc");
        assert_eq!(all[1].source, "imessage");
        assert_eq!(all[1].cursor_value, "99");
        // Reset drops the row.
        assert!(db.reset_source_cursor("imessage")?);
        assert_eq!(db.read_source_cursor("imessage")?, None);
        assert!(!db.reset_source_cursor("imessage")?);
        Ok(())
    }
3178
    /// Events get monotonically increasing ids, delivery-status updates
    /// stick (with optional reason), tailing returns newest first, and
    /// tailing one source never leaks another source's events.
    #[test]
    fn event_insert_then_transition_round_trip() -> Result<()> {
        let db = db()?;
        let delivered = db.insert_event("imessage", fixed_ts(), r#"{"chat":"a"}"#)?;
        let failed = db.insert_event("imessage", fixed_ts(), r#"{"chat":"b"}"#)?;
        assert!(delivered > 0 && failed > delivered);
        db.update_event_delivery(delivered, EventStatus::Delivered, None)?;
        db.update_event_delivery(failed, EventStatus::Failed, Some("adapter timed out"))?;
        // tail_events returns newest first and surfaces delivery status + reason.
        let rows = db.tail_events("imessage", 10)?;
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].id, failed);
        assert_eq!(rows[0].delivery_status, "failed");
        assert_eq!(rows[0].reason.as_deref(), Some("adapter timed out"));
        assert_eq!(rows[1].id, delivered);
        assert_eq!(rows[1].delivery_status, "delivered");
        assert_eq!(rows[1].reason, None);
        // Events from a different source are isolated.
        let other = db.insert_event("email", fixed_ts(), "{}")?;
        assert!(other > failed);
        let only_imessage = db.tail_events("imessage", 10)?;
        assert_eq!(only_imessage.len(), 2);
        Ok(())
    }
3203
    /// Iroh events store only hashed peer ids — the query output must never
    /// contain the raw node id string.
    #[test]
    fn iroh_event_round_trip() -> Result<()> {
        let db = db()?;
        let hash = hash_peer_id("raw_node_id_abc");
        let id = db.record_iroh_event(IrohEventRecord {
            ts_utc: fixed_ts(),
            event_type: IrohEventType::Connect,
            peer_id_hash: &hash,
            peer_label: Some("work"),
            detail_json: None,
        })?;
        assert_eq!(id, 1);
        db.record_iroh_event(IrohEventRecord {
            ts_utc: fixed_ts(),
            event_type: IrohEventType::HandshakeRefused,
            peer_id_hash: &hash_peer_id("unknown_peer"),
            peer_label: None,
            detail_json: Some("{\"reason\":\"not_in_allowlist\"}"),
        })?;
        let out =
            db.query("SELECT event_type, peer_id_hash, peer_label FROM iroh_events ORDER BY id")?;
        assert!(out.contains("connect"));
        assert!(out.contains("handshake_refused"));
        assert!(out.contains("work"));
        assert!(!out.contains("raw_node_id_abc"), "raw node id leaked");
        Ok(())
    }
3231
    /// The `runtime` column is persisted and usable as a GROUP BY key.
    #[test]
    fn token_usage_includes_runtime_column() -> Result<()> {
        let db = db()?;
        db.record_token_usage(TokenUsageRecord {
            ts_utc: fixed_ts(),
            session_id: Some("s1"),
            agent: Some("agent12"),
            runtime: Some("claude"),
            model: Some("claude-opus-4-7"),
            input_tokens: Some(1000),
            output_tokens: Some(500),
            cached_input_tokens: None,
            cost_usd_micros: Some(12_500),
            detail_json: None,
        })?;
        let out =
            db.query("SELECT runtime, SUM(input_tokens) AS ins FROM token_usage GROUP BY runtime")?;
        assert!(out.contains("claude"));
        assert!(out.contains("1000"));
        Ok(())
    }
3253
    /// The `clone_lifetimes` view includes only dispatches with an end
    /// timestamp; in-flight dispatches (no `ts_utc_end`) are excluded.
    #[test]
    fn clone_lifetimes_view_reads_finished_dispatches() -> Result<()> {
        let db = db()?;
        db.record_clone_dispatch(CloneDispatchRecord {
            ts_utc_start: fixed_ts(),
            ts_utc_end: Some(fixed_ts() + chrono::Duration::minutes(7)),
            agent_id: "agent5",
            runtime: Some("claude"),
            brief_path: None,
            brief: None,
            workspace: Some("workspaces/foo/repo"),
            branch: Some("feat/foo"),
            status: Some("done"),
            exit_code: Some(0),
            detail_json: None,
        })?;
        db.record_clone_dispatch(CloneDispatchRecord {
            ts_utc_start: fixed_ts(),
            ts_utc_end: None,
            agent_id: "agent6",
            runtime: Some("codex"),
            brief_path: None,
            brief: None,
            workspace: None,
            branch: None,
            status: Some("in_flight"),
            exit_code: None,
            detail_json: None,
        })?;
        let out = db.query("SELECT agent_id, runtime FROM clone_lifetimes")?;
        assert!(out.contains("agent5"));
        assert!(
            !out.contains("agent6"),
            "in-flight dispatches stay excluded"
        );
        Ok(())
    }
3291
3292    #[test]
3293    fn migration_from_v4_preserves_existing_rows_and_adds_new_tables() -> Result<()> {
3294        let dir = tempfile::tempdir()?;
3295        let path = dir.path().join("meta.db");
3296
3297        {
3298            let db = Db::open_path(&path)?;
3299            db.migrate()?;
3300            // Simulate an older binary by stamping down to v4 after
3301            // migration. The tables will still exist (they're all
3302            // created unconditionally), but the next migrate() call
3303            // must bump the version back to SCHEMA_VERSION without
3304            // losing data.
3305            db.with_conn(|conn| async move {
3306                conn.execute("UPDATE meta SET value = 4 WHERE key = 'schema_version'", ())
3307                    .await?;
3308                conn.execute("PRAGMA user_version = 4", ()).await?;
3309                Ok(())
3310            })?;
3311            db.record_message(MessageRecord {
3312                ts_utc: fixed_ts(),
3313                source: "imessage",
3314                direction: Direction::Inbound,
3315                chat_id: Some("chat-1"),
3316                from_agent: None,
3317                to_agent: None,
3318                body: Some("pre-migration"),
3319                raw_json: None,
3320            })?;
3321        }
3322
3323        let db = Db::open_path(&path)?;
3324        db.migrate()?;
3325        assert_eq!(db.schema_version()?, 6);
3326
3327        let out = db.query("SELECT body FROM messages")?;
3328        assert!(
3329            out.contains("pre-migration"),
3330            "migration lost pre-existing row: {out}"
3331        );
3332        // New tables are queryable after migrating up.
3333        let se = db.query("SELECT COUNT(*) FROM source_errors")?;
3334        assert!(se.contains('0'));
3335        let ie = db.query("SELECT COUNT(*) FROM iroh_events")?;
3336        assert!(ie.contains('0'));
3337        // v6 additions are structured (not row_json); assert they exist
3338        // and start empty by reading through the typed helpers.
3339        assert!(db.list_source_cursors()?.is_empty());
3340        assert!(db.tail_events("imessage", 10)?.is_empty());
3341        Ok(())
3342    }
3343}