// netsky_db/lib.rs

//! Pure Rust netsky observability database.
//!
//! OLTP writes use turso by default. OLAP reads snapshot JSON rows into Arrow
//! RecordBatches and execute SQL with DataFusion.

6use std::collections::HashSet;
7use std::fs;
8use std::path::Path;
9use std::path::PathBuf;
10use std::sync::Arc;
11use std::time::Duration;
12
13use chrono::{DateTime, SecondsFormat, Utc};
14use datafusion::arrow::array::{BooleanArray, Int64Array, StringArray};
15use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
16use datafusion::arrow::record_batch::RecordBatch;
17use datafusion::arrow::util::pretty::pretty_format_batches;
18use datafusion::datasource::MemTable;
19use datafusion::prelude::SessionContext;
20use serde::{Deserialize, Serialize, de::DeserializeOwned};
21use thiserror::Error;
22use tokio::runtime::Runtime;
23use turso::{Builder, Connection, Value, params_from_iter};
24
/// Crate-wide result alias over this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

/// Re-exports for callers that consume `query_batches` results.
pub use datafusion::arrow::array as arrow_array;
pub use datafusion::arrow::record_batch::RecordBatch as ArrowRecordBatch;

/// Current on-disk schema version. `migrate` refuses databases whose
/// recorded version is newer than this (see `Error::FutureSchemaVersion`).
pub const SCHEMA_VERSION: i64 = 7;
32
/// Turso storage tables created by `migrate`. Each is a two-column table
/// `(id INTEGER PRIMARY KEY, row_json TEXT NOT NULL)` holding one JSON
/// document per row (see the CREATE TABLE loop in `migrate`).
const TABLE_NAMES: [&str; 17] = [
    "messages",
    "cli_invocations",
    "crashes",
    "ticks",
    "workspaces",
    "sessions",
    "clone_dispatches",
    "harvest_events",
    "communication_events",
    "mcp_tool_calls",
    "git_operations",
    "owner_directives",
    "token_usage",
    "watchdog_events",
    "netsky_tasks",
    "source_errors",
    "iroh_events",
];
52
// Canonical storage-table names. `TASKS` ("netsky_tasks") is the one name
// that diverges from its DataFusion-visible alias ("tasks"); see
// `DATAFUSION_TABLES`.
const MESSAGES: &str = "messages";
const CLI_INVOCATIONS: &str = "cli_invocations";
const CRASHES: &str = "crashes";
const TICKS: &str = "ticks";
const WORKSPACES: &str = "workspaces";
const SESSIONS: &str = "sessions";
const CLONE_DISPATCHES: &str = "clone_dispatches";
const HARVEST_EVENTS: &str = "harvest_events";
const COMMUNICATION_EVENTS: &str = "communication_events";
const MCP_TOOL_CALLS: &str = "mcp_tool_calls";
const GIT_OPERATIONS: &str = "git_operations";
const OWNER_DIRECTIVES: &str = "owner_directives";
const TOKEN_USAGE: &str = "token_usage";
const WATCHDOG_EVENTS: &str = "watchdog_events";
const TASKS: &str = "netsky_tasks";
const SOURCE_ERRORS: &str = "source_errors";
const IROH_EVENTS: &str = "iroh_events";
/// Index name for claude-sessions token-usage idempotency.
/// NOTE(review): consumed outside this chunk (presumably by
/// `migrate_token_usage_ingest_idempotency`) — confirm.
const TOKEN_USAGE_CLAUDE_SESSIONS_UNIQUE_INDEX: &str =
    "idx_token_usage_claude_sessions_session_path_line";
/// SQL predicate selecting claude-sessions token-usage rows that carry a
/// (path, line) position inside their nested `detail_json`.
const TOKEN_USAGE_CLAUDE_SESSIONS_FILTER: &str = "json_extract(json_extract(row_json, '$.detail_json'), '$.source') = 'claude-sessions' \
     AND json_extract(json_extract(row_json, '$.detail_json'), '$.path') IS NOT NULL \
     AND json_extract(json_extract(row_json, '$.detail_json'), '$.line') IS NOT NULL";
/// SQL key expression (session_id, path, line) identifying one ingested
/// claude-sessions usage row; session_id coalesces to '' when absent.
const TOKEN_USAGE_CLAUDE_SESSIONS_KEY: &str = "COALESCE(json_extract(row_json, '$.session_id'), ''), \
     json_extract(json_extract(row_json, '$.detail_json'), '$.path'), \
     json_extract(json_extract(row_json, '$.detail_json'), '$.line')";
78
/// Names visible in DataFusion queries, paired with their Turso storage tables.
/// The only divergence is `tasks` vs `netsky_tasks`; everything else matches.
const DATAFUSION_TABLES: [(&str, &str); 17] = [
    ("messages", MESSAGES),
    ("cli_invocations", CLI_INVOCATIONS),
    ("crashes", CRASHES),
    ("ticks", TICKS),
    ("workspaces", WORKSPACES),
    ("sessions", SESSIONS),
    ("clone_dispatches", CLONE_DISPATCHES),
    ("harvest_events", HARVEST_EVENTS),
    ("communication_events", COMMUNICATION_EVENTS),
    ("mcp_tool_calls", MCP_TOOL_CALLS),
    ("git_operations", GIT_OPERATIONS),
    ("owner_directives", OWNER_DIRECTIVES),
    ("token_usage", TOKEN_USAGE),
    ("watchdog_events", WATCHDOG_EVENTS),
    ("tasks", TASKS),
    ("source_errors", SOURCE_ERRORS),
    ("iroh_events", IROH_EVENTS),
];

/// DataFusion view that depends on `clone_dispatches` rows.
/// NOTE(review): the view's definition is registered outside this chunk.
const CLONE_LIFETIMES_VIEW: &str = "clone_lifetimes";
103
/// (storage_table, json_key) pairs indexed as range targets. Every
/// analytics query in `netsky-cli/src/cmd/analytics.rs` is time-bounded,
/// and `list_tasks` sorts by `created_at` when we push its ORDER BY
/// down. Without these indexes, a time-range pushdown still has to
/// scan every row. Expression indexes on `json_extract(row_json,
/// '$.<key>')` let SQLite serve `WHERE json_extract(...) >= 'X' AND
/// json_extract(...) < 'Y'` as an index range scan.
///
/// Index names are derived as `idx_{table}_{column}` in `migrate`.
const INDEXED_TIME_COLUMNS: &[(&str, &str)] = &[
    (MESSAGES, "ts_utc"),
    (CLI_INVOCATIONS, "ts_utc"),
    (CRASHES, "ts_utc"),
    (TICKS, "ts_utc"),
    (WORKSPACES, "ts_utc_created"),
    (SESSIONS, "ts_utc"),
    (CLONE_DISPATCHES, "ts_utc_start"),
    (HARVEST_EVENTS, "ts_utc"),
    (COMMUNICATION_EVENTS, "ts_utc"),
    (MCP_TOOL_CALLS, "ts_utc_start"),
    (GIT_OPERATIONS, "ts_utc"),
    (OWNER_DIRECTIVES, "ts_utc"),
    (TOKEN_USAGE, "ts_utc"),
    (WATCHDOG_EVENTS, "ts_utc"),
    (TASKS, "created_at"),
    (SOURCE_ERRORS, "ts_utc"),
    (IROH_EVENTS, "ts_utc"),
];
130
/// (storage_table, json_key) pairs used as equality-lookup filters. These
/// back the `list_tasks` push-down and the `iroh_summary` /
/// `messages_summary` group-bys. Created in `migrate` with the same
/// `idx_{table}_{column}` naming scheme as the time-range indexes.
const INDEXED_EQ_COLUMNS: &[(&str, &str)] = &[
    (TASKS, "status"),
    (TASKS, "priority"),
    (MESSAGES, "source"),
    (SESSIONS, "event"),
    (IROH_EVENTS, "event_type"),
];
141
/// Unified error type for the crate. `#[from]` conversions funnel turso,
/// DataFusion, Arrow, serde_json, std I/O, and chrono failures into one
/// enum so `?` works across every layer.
#[derive(Debug, Error)]
pub enum Error {
    #[error("home directory not found")]
    HomeDirMissing,
    #[error("schema version {found} is newer than supported {supported}")]
    FutureSchemaVersion { found: i64, supported: i64 },
    #[error("meta.db has not been initialized at {0}; a writer must run first.")]
    NotInitialized(PathBuf),
    #[error("read-only connection was writable at {0}")]
    ReadOnlyNotEnforced(PathBuf),
    #[error(transparent)]
    Turso(#[from] turso::Error),
    #[error(transparent)]
    DataFusion(#[from] datafusion::error::DataFusionError),
    #[error(transparent)]
    Arrow(#[from] datafusion::arrow::error::ArrowError),
    #[error(transparent)]
    Serde(#[from] serde_json::Error),
    #[error(transparent)]
    Io(#[from] std::io::Error),
    #[error(transparent)]
    Chrono(#[from] chrono::ParseError),
    #[error(
        "netsky-db locked after {retries} retries (total wait {total_wait_ms}ms).\n  hint: a clone is writing heavily; retry or check `lsof` for holders."
    )]
    Locked {
        retries: u32,
        total_wait_ms: u128,
        // Underlying driver message; intentionally not part of the
        // Display output above.
        detail: String,
    },
    #[error("{0} not found")]
    NotFound(String),
}
175
/// Direction of a message or communication event.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Direction {
    Inbound,
    Outbound,
}

impl Direction {
    /// Lowercase storage form written into row JSON
    /// ("inbound" / "outbound").
    fn as_str(self) -> &'static str {
        match self {
            Direction::Inbound => "inbound",
            Direction::Outbound => "outbound",
        }
    }
}
190
/// Kind of session lifecycle row recorded by `Db::record_session`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SessionEvent {
    Up,
    Down,
    Note,
}

impl SessionEvent {
    /// Lowercase storage form written into row JSON
    /// ("up" / "down" / "note").
    fn as_str(self) -> &'static str {
        match self {
            SessionEvent::Up => "up",
            SessionEvent::Down => "down",
            SessionEvent::Note => "note",
        }
    }
}
207
/// Delivery lifecycle for an event. A row is first inserted with
/// `Pending` before the delivery adapter runs; once the adapter returns,
/// the row is updated to `Delivered` or `Failed` with a reason.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum EventStatus {
    Pending,
    Delivered,
    Failed,
}

impl EventStatus {
    /// Lowercase storage form persisted in the `events.delivery_status`
    /// column ("pending" / "delivered" / "failed").
    pub fn as_str(self) -> &'static str {
        match self {
            EventStatus::Pending => "pending",
            EventStatus::Delivered => "delivered",
            EventStatus::Failed => "failed",
        }
    }
}
227
/// One row from `source_cursors` (structured table; one cursor per
/// `source`, `source` is the primary key).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SourceCursorRow {
    pub source: String,
    pub cursor_value: String,
    pub updated_at: String,
}

/// One row from `events` (structured delivery-tracking table; see the
/// CREATE TABLE in `migrate` and `EventStatus` for `delivery_status`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EventRow {
    pub id: i64,
    pub source: String,
    pub ts_utc: String,
    pub payload_json: String,
    pub delivery_status: String,
    // Failure reason; nullable in the schema (`reason TEXT`).
    pub reason: Option<String>,
}
246
/// Bounded error class for `source_errors`. Prevents cardinality blowup
/// from free-form strings (stack traces, paths, or human text) making it
/// into the analytics surface. Classification happens at the call site;
/// anything that doesn't map cleanly goes to `Unknown`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SourceErrorClass {
    Timeout,
    AuthFailure,
    RateLimit,
    NetworkError,
    ProtocolError,
    NotFound,
    PermissionDenied,
    Unknown,
}

impl SourceErrorClass {
    /// snake_case storage form written into `source_errors` row JSON.
    pub fn as_str(self) -> &'static str {
        match self {
            SourceErrorClass::Timeout => "timeout",
            SourceErrorClass::AuthFailure => "auth_failure",
            SourceErrorClass::RateLimit => "rate_limit",
            SourceErrorClass::NetworkError => "network_error",
            SourceErrorClass::ProtocolError => "protocol_error",
            SourceErrorClass::NotFound => "not_found",
            SourceErrorClass::PermissionDenied => "permission_denied",
            SourceErrorClass::Unknown => "unknown",
        }
    }
}
277
/// Bounded event type for `iroh_events`. Analytics show connect counts,
/// eviction pressure, and reconnect churn per peer-hash per day.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum IrohEventType {
    Connect,
    Evict,
    Reconnect,
    HandshakeRefused,
}

impl IrohEventType {
    /// snake_case storage form written into `iroh_events` row JSON; also
    /// the value matched by the `event_type` equality index.
    pub fn as_str(self) -> &'static str {
        match self {
            IrohEventType::Connect => "connect",
            IrohEventType::Evict => "evict",
            IrohEventType::Reconnect => "reconnect",
            IrohEventType::HandshakeRefused => "handshake_refused",
        }
    }
}
298
299/// SHA256-truncated, URL-safe hex hash of a raw iroh NodeId. Stable
300/// across a paired peer's identity lifetime; does NOT expose the raw
301/// ed25519 public key. The published analytics surface reads these
302/// hashes, never the raw ID.
303pub fn hash_peer_id(raw_node_id: &str) -> String {
304    use sha2::{Digest, Sha256};
305    let mut hasher = Sha256::new();
306    hasher.update(raw_node_id.as_bytes());
307    let digest = hasher.finalize();
308    hex_encode(&digest[..8])
309}
310
/// Lowercase hex rendering of `bytes`, two digits per input byte.
fn hex_encode(bytes: &[u8]) -> String {
    use std::fmt::Write;
    let mut rendered = String::with_capacity(bytes.len() * 2);
    for byte in bytes {
        // Writing into a String is infallible; ignore the fmt::Result.
        let _ = write!(rendered, "{byte:02x}");
    }
    rendered
}
320
/// Handle to the netsky database. Holds only the file path; connections
/// are managed by `with_conn` / `with_read_only_conn` (defined elsewhere
/// in this file), which each method invokes per operation.
pub struct Db {
    path: PathBuf,
}
324
/// Borrowed insert payload for `Db::record_message`.
pub struct MessageRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub direction: Direction,
    pub chat_id: Option<&'a str>,
    pub from_agent: Option<&'a str>,
    pub to_agent: Option<&'a str>,
    pub body: Option<&'a str>,
    pub raw_json: Option<&'a str>,
}

/// Borrowed insert payload for `Db::record_clone_dispatch`. `brief` is
/// passed through `truncate_opt` with a 16_384 cap at write time.
pub struct CloneDispatchRecord<'a> {
    pub ts_utc_start: DateTime<Utc>,
    pub ts_utc_end: Option<DateTime<Utc>>,
    pub agent_id: &'a str,
    pub runtime: Option<&'a str>,
    pub brief_path: Option<&'a str>,
    pub brief: Option<&'a str>,
    pub workspace: Option<&'a str>,
    pub branch: Option<&'a str>,
    pub status: Option<&'a str>,
    pub exit_code: Option<i64>,
    pub detail_json: Option<&'a str>,
}
349
/// Borrowed insert payload for `Db::record_harvest_event`.
pub struct HarvestEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source_branch: &'a str,
    pub target_branch: &'a str,
    pub commit_sha: Option<&'a str>,
    pub status: &'a str,
    pub conflicts: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}

/// Borrowed insert payload for `Db::record_communication_event`. `body`
/// is passed through `truncate_opt` with a 4096 cap at write time.
pub struct CommunicationEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub tool: Option<&'a str>,
    pub direction: Direction,
    pub chat_id: Option<&'a str>,
    pub message_id: Option<&'a str>,
    pub handle: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub body: Option<&'a str>,
    pub status: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}

/// Owned, parsed communication event returned to callers; built from
/// `CommunicationEventRow` via `TryFrom`, which parses `ts_utc` into a
/// `DateTime<Utc>`.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct CommunicationEvent {
    pub id: i64,
    pub ts_utc: DateTime<Utc>,
    pub source: String,
    pub tool: Option<String>,
    pub direction: String,
    pub chat_id: Option<String>,
    pub message_id: Option<String>,
    pub handle: Option<String>,
    pub agent: Option<String>,
    pub body: Option<String>,
    pub status: Option<String>,
    pub detail_json: Option<String>,
}
389
/// Borrowed insert payload for an MCP tool-call row.
pub struct McpToolCallRecord<'a> {
    pub ts_utc_start: DateTime<Utc>,
    pub ts_utc_end: Option<DateTime<Utc>>,
    pub source: &'a str,
    pub tool: &'a str,
    pub agent: Option<&'a str>,
    pub duration_ms: Option<i64>,
    pub success: bool,
    pub error: Option<&'a str>,
    // True when the call lost a race against a timeout.
    // NOTE(review): semantics inferred from the name — confirm at the
    // write site.
    pub timeout_race: bool,
    pub request_json: Option<&'a str>,
    pub response_json: Option<&'a str>,
}

/// Borrowed insert payload for a git-operation row.
pub struct GitOperationRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub operation: &'a str,
    pub repo: &'a str,
    pub branch: Option<&'a str>,
    pub remote: Option<&'a str>,
    pub from_sha: Option<&'a str>,
    pub to_sha: Option<&'a str>,
    pub status: &'a str,
    pub detail_json: Option<&'a str>,
}

/// Borrowed insert payload for an owner-directive row.
pub struct OwnerDirectiveRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub chat_id: Option<&'a str>,
    pub raw_text: &'a str,
    pub resolved_action: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub status: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}
426
/// Borrowed insert payload for a token-usage row.
pub struct TokenUsageRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub session_id: Option<&'a str>,
    pub agent: Option<&'a str>,
    /// Runtime that emitted the usage: "claude", "codex", "copilot", etc.
    /// Distinct from `model` so analytics can slice by process runtime
    /// independent of which model the runtime happened to invoke.
    pub runtime: Option<&'a str>,
    pub model: Option<&'a str>,
    pub input_tokens: Option<i64>,
    pub output_tokens: Option<i64>,
    pub cached_input_tokens: Option<i64>,
    // Cost in millionths of a USD, kept integral to avoid float drift.
    pub cost_usd_micros: Option<i64>,
    pub detail_json: Option<&'a str>,
}

/// Borrowed insert payload for a `source_errors` row.
pub struct SourceErrorRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub error_class: SourceErrorClass,
    /// Count for pre-aggregated writes. Defaults to 1 for per-event
    /// writes. A future in-process bucketing layer can flush rolled-up
    /// counts without changing the schema.
    pub count: i64,
    pub detail_json: Option<&'a str>,
}

/// Borrowed insert payload for an `iroh_events` row.
pub struct IrohEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub event_type: IrohEventType,
    /// SHA256-truncated hash of the raw NodeId. Call `hash_peer_id` at
    /// the hook site; never pass the raw ed25519 ID here.
    pub peer_id_hash: &'a str,
    /// Owner-defined label from peers.toml, if the peer was resolved
    /// before the event fired. `None` for `HandshakeRefused` where the
    /// label lookup failed.
    pub peer_label: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}
466
/// Borrowed insert payload for a watchdog-event row.
pub struct WatchdogEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub event: &'a str,
    pub agent: Option<&'a str>,
    pub severity: Option<&'a str>,
    pub status: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}

/// Borrowed insert payload for a new task row (`netsky_tasks` storage
/// table; surfaced to DataFusion as `tasks`).
pub struct TaskRecord<'a> {
    pub title: &'a str,
    pub body: Option<&'a str>,
    pub status: &'a str,
    pub priority: Option<&'a str>,
    pub labels: Option<&'a str>,
    pub source: Option<&'a str>,
    pub source_ref: Option<&'a str>,
    pub closed_at: Option<&'a str>,
    pub closed_reason: Option<&'a str>,
    pub closed_evidence: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub sync_calendar: bool,
    pub calendar_task_id: Option<&'a str>,
}

/// Partial update for an existing task: `id` selects the row, every
/// `Some(_)` field overwrites, every `None` field is left untouched.
/// NOTE(review): merge semantics inferred from the Option-per-field
/// shape — confirm against the update implementation.
pub struct TaskUpdate<'a> {
    pub id: i64,
    pub status: Option<&'a str>,
    pub priority: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub closed_reason: Option<&'a str>,
    pub closed_evidence: Option<&'a str>,
    pub sync_calendar: Option<bool>,
    pub calendar_task_id: Option<&'a str>,
}
502
/// Serialized shape of one `messages` row (stored as `row_json`).
#[derive(Debug, Serialize, Deserialize)]
struct MessageRow {
    id: i64,
    ts_utc: String,
    source: String,
    direction: String,
    chat_id: Option<String>,
    from_agent: Option<String>,
    to_agent: Option<String>,
    body: Option<String>,
    raw_json: Option<String>,
}

/// Serialized shape of one `cli_invocations` row.
#[derive(Debug, Serialize, Deserialize)]
struct CliRow {
    id: i64,
    ts_utc: String,
    bin: String,
    argv_json: String,
    exit_code: Option<i64>,
    duration_ms: Option<i64>,
    host: String,
}

/// Serialized shape of one `crashes` row.
#[derive(Debug, Serialize, Deserialize)]
struct CrashRow {
    id: i64,
    ts_utc: String,
    kind: String,
    agent: String,
    detail_json: String,
}

/// Serialized shape of one `ticks` row.
#[derive(Debug, Serialize, Deserialize)]
struct TickRow {
    id: i64,
    ts_utc: String,
    source: String,
    detail_json: String,
}

/// Serialized shape of one `workspaces` row.
#[derive(Debug, Serialize, Deserialize)]
struct WorkspaceRow {
    id: i64,
    ts_utc_created: String,
    name: String,
    branch: String,
    ts_utc_deleted: Option<String>,
    verdict: Option<String>,
}

/// Serialized shape of one `sessions` row.
#[derive(Debug, Serialize, Deserialize)]
struct SessionRow {
    id: i64,
    ts_utc: String,
    agent: String,
    session_num: i64,
    event: String,
}
562
/// Serialized shape of one `clone_dispatches` row.
#[derive(Debug, Serialize, Deserialize)]
struct CloneDispatchRow {
    id: i64,
    ts_utc_start: String,
    ts_utc_end: Option<String>,
    agent_id: String,
    runtime: Option<String>,
    brief_path: Option<String>,
    // Truncated at write time (see record_clone_dispatch).
    brief: Option<String>,
    workspace: Option<String>,
    branch: Option<String>,
    status: Option<String>,
    exit_code: Option<i64>,
    detail_json: Option<String>,
}

/// Serialized shape of one `harvest_events` row.
#[derive(Debug, Serialize, Deserialize)]
struct HarvestEventRow {
    id: i64,
    ts_utc: String,
    source_branch: String,
    target_branch: String,
    commit_sha: Option<String>,
    status: String,
    conflicts: Option<String>,
    detail_json: Option<String>,
}

/// Serialized shape of one `communication_events` row; converted to the
/// public `CommunicationEvent` via `TryFrom`.
#[derive(Debug, Serialize, Deserialize)]
struct CommunicationEventRow {
    id: i64,
    ts_utc: String,
    source: String,
    tool: Option<String>,
    direction: String,
    chat_id: Option<String>,
    message_id: Option<String>,
    handle: Option<String>,
    agent: Option<String>,
    // Truncated at write time (see record_communication_event).
    body: Option<String>,
    status: Option<String>,
    detail_json: Option<String>,
}
606
607impl TryFrom<CommunicationEventRow> for CommunicationEvent {
608    type Error = Error;
609
610    fn try_from(row: CommunicationEventRow) -> Result<Self> {
611        Ok(Self {
612            id: row.id,
613            ts_utc: parse_ts(&row.ts_utc)?,
614            source: row.source,
615            tool: row.tool,
616            direction: row.direction,
617            chat_id: row.chat_id,
618            message_id: row.message_id,
619            handle: row.handle,
620            agent: row.agent,
621            body: row.body,
622            status: row.status,
623            detail_json: row.detail_json,
624        })
625    }
626}
627
/// Serialized shape of one `mcp_tool_calls` row.
#[derive(Debug, Serialize, Deserialize)]
struct McpToolCallRow {
    id: i64,
    ts_utc_start: String,
    ts_utc_end: Option<String>,
    source: String,
    tool: String,
    agent: Option<String>,
    duration_ms: Option<i64>,
    success: bool,
    error: Option<String>,
    timeout_race: bool,
    request_json: Option<String>,
    response_json: Option<String>,
}

/// Serialized shape of one `git_operations` row.
#[derive(Debug, Serialize, Deserialize)]
struct GitOperationRow {
    id: i64,
    ts_utc: String,
    operation: String,
    repo: String,
    branch: Option<String>,
    remote: Option<String>,
    from_sha: Option<String>,
    to_sha: Option<String>,
    status: String,
    detail_json: Option<String>,
}

/// Serialized shape of one `owner_directives` row.
#[derive(Debug, Serialize, Deserialize)]
struct OwnerDirectiveRow {
    id: i64,
    ts_utc: String,
    source: String,
    chat_id: Option<String>,
    raw_text: String,
    resolved_action: Option<String>,
    agent: Option<String>,
    status: Option<String>,
    detail_json: Option<String>,
}

/// Serialized shape of one `token_usage` row.
#[derive(Debug, Serialize, Deserialize)]
struct TokenUsageRow {
    id: i64,
    ts_utc: String,
    session_id: Option<String>,
    agent: Option<String>,
    // serde(default) tolerates stored rows that predate this key;
    // they deserialize as None.
    #[serde(default)]
    runtime: Option<String>,
    model: Option<String>,
    input_tokens: Option<i64>,
    output_tokens: Option<i64>,
    cached_input_tokens: Option<i64>,
    cost_usd_micros: Option<i64>,
    detail_json: Option<String>,
}

/// Serialized shape of one `source_errors` row; `error_class` is the
/// bounded `SourceErrorClass::as_str` form.
#[derive(Debug, Serialize, Deserialize)]
struct SourceErrorRow {
    id: i64,
    ts_utc: String,
    source: String,
    error_class: String,
    count: i64,
    detail_json: Option<String>,
}
696
/// Serialized shape of one `iroh_events` row; `event_type` is the
/// bounded `IrohEventType::as_str` form, `peer_id_hash` comes from
/// `hash_peer_id`.
#[derive(Debug, Serialize, Deserialize)]
struct IrohEventRow {
    id: i64,
    ts_utc: String,
    event_type: String,
    peer_id_hash: String,
    peer_label: Option<String>,
    detail_json: Option<String>,
}

/// Serialized shape of one `watchdog_events` row.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WatchdogEventRow {
    id: i64,
    ts_utc: String,
    event: String,
    agent: Option<String>,
    severity: Option<String>,
    status: Option<String>,
    detail_json: Option<String>,
}

/// Serialized shape of one `netsky_tasks` row. Public, unlike the other
/// row structs, because task rows are returned to callers directly.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskRow {
    pub id: i64,
    pub created_at: String,
    pub updated_at: String,
    pub title: String,
    pub body: Option<String>,
    pub status: String,
    pub priority: Option<String>,
    pub labels: Option<String>,
    pub source: Option<String>,
    pub source_ref: Option<String>,
    pub closed_at: Option<String>,
    pub closed_reason: Option<String>,
    pub closed_evidence: Option<String>,
    pub agent: Option<String>,
    // serde(default) tolerates stored rows that predate these keys.
    #[serde(default)]
    pub sync_calendar: bool,
    #[serde(default)]
    pub calendar_task_id: Option<String>,
}
739
740impl Db {
741    pub fn open() -> Result<Self> {
742        let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
743        let dir = home.join(".netsky");
744        fs::create_dir_all(&dir)?;
745        Self::open_path(dir.join("meta.db"))
746    }
747
748    pub fn open_read_only() -> Result<Self> {
749        let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
750        Self::open_read_only_path(home.join(".netsky").join("meta.db"))
751    }
752
753    pub fn open_path(path: impl AsRef<Path>) -> Result<Self> {
754        let path = path.as_ref().to_path_buf();
755        if path != Path::new(":memory:")
756            && let Some(parent) = path.parent()
757        {
758            fs::create_dir_all(parent)?;
759        }
760        let db = Self { path };
761        db.with_conn(|_| async { Ok(()) })?;
762        Ok(db)
763    }
764
765    pub fn open_read_only_path(path: impl AsRef<Path>) -> Result<Self> {
766        let path = path.as_ref().to_path_buf();
767        if path != Path::new(":memory:") && !path.exists() {
768            return Err(Error::NotInitialized(path));
769        }
770        let db = Self { path };
771        db.with_read_only_conn(|_| async { Ok(()) })?;
772        Ok(db)
773    }
774
775    #[cfg(test)]
776    pub(crate) fn open_in_memory() -> Result<Self> {
777        let path = std::env::temp_dir().join(format!(
778            "netsky-db-test-{}-{}.db",
779            std::process::id(),
780            Utc::now().timestamp_nanos_opt().unwrap_or_default()
781        ));
782        Self::open_path(path)
783    }
784
    /// Create or upgrade the on-disk schema to `SCHEMA_VERSION`.
    ///
    /// Idempotent: every DDL statement is `IF NOT EXISTS` and the version
    /// write is an upsert, so re-running against a current database is a
    /// no-op. Returns `Error::FutureSchemaVersion` when the file was
    /// written by a newer build than this one.
    pub fn migrate(&self) -> Result<()> {
        let path = self.path.clone();
        self.with_conn(move |conn| async move {
            // Refuse to touch a database from a future schema version.
            let current = schema_version(&conn).await?;
            if current > SCHEMA_VERSION {
                return Err(Error::FutureSchemaVersion {
                    found: current,
                    supported: SCHEMA_VERSION,
                });
            }
            // Record the new version up front.
            // NOTE(review): assumes the `meta` table already exists —
            // presumably created inside `with_conn`/`schema_version`,
            // which are not visible here; confirm.
            conn.execute(
                "INSERT INTO meta (key, value) VALUES (?1, ?2) \
                 ON CONFLICT(key) DO UPDATE SET value = excluded.value",
                params_from_iter([Value::from("schema_version"), Value::from(SCHEMA_VERSION)]),
            )
            .await?;
            // One JSON-document table per logical table.
            for table in TABLE_NAMES {
                let sql = format!(
                    "CREATE TABLE IF NOT EXISTS {table} \
                     (id INTEGER PRIMARY KEY, row_json TEXT NOT NULL)"
                );
                conn.execute(&sql, ()).await?;
            }
            // Expression indexes so time-range pushdowns become index
            // range scans instead of full-table scans.
            for &(table, column) in INDEXED_TIME_COLUMNS {
                let sql = format!(
                    "CREATE INDEX IF NOT EXISTS idx_{table}_{column} \
                     ON {table}(json_extract(row_json, '$.{column}'))"
                );
                conn.execute(&sql, ()).await?;
            }
            // Expression indexes backing equality filters / group-bys.
            for &(table, column) in INDEXED_EQ_COLUMNS {
                let sql = format!(
                    "CREATE INDEX IF NOT EXISTS idx_{table}_{column} \
                     ON {table}(json_extract(row_json, '$.{column}'))"
                );
                conn.execute(&sql, ()).await?;
            }
            // v6: structured tables for source-cursor durability and
            // event delivery status. Columns are real (not row_json)
            // because both tables have a small fixed shape and the CLI
            // needs to read individual fields without a JSON parse.
            conn.execute(
                "CREATE TABLE IF NOT EXISTS source_cursors ( \
                     source TEXT PRIMARY KEY, \
                     cursor_value TEXT NOT NULL, \
                     updated_at TEXT NOT NULL \
                 )",
                (),
            )
            .await?;
            conn.execute(
                "CREATE TABLE IF NOT EXISTS events ( \
                     id INTEGER PRIMARY KEY AUTOINCREMENT, \
                     source TEXT NOT NULL, \
                     ts_utc TEXT NOT NULL, \
                     payload_json TEXT NOT NULL, \
                     delivery_status TEXT NOT NULL, \
                     reason TEXT \
                 )",
                (),
            )
            .await?;
            // v7: token-usage ingest idempotency (defined elsewhere in
            // this file); receives the pre-migration version so it can
            // decide what work is needed.
            migrate_token_usage_ingest_idempotency(&path, &conn, current).await?;
            // Stamp the SQLite-native version marker last, after all DDL
            // has succeeded.
            conn.execute(&format!("PRAGMA user_version = {SCHEMA_VERSION}"), ())
                .await?;
            Ok(())
        })
    }
853
854    pub fn record_message(&self, record: MessageRecord<'_>) -> Result<i64> {
855        self.insert_json("messages", MESSAGES, |id| MessageRow {
856            id,
857            ts_utc: ts(record.ts_utc),
858            source: record.source.to_string(),
859            direction: record.direction.as_str().to_string(),
860            chat_id: record.chat_id.map(str::to_string),
861            from_agent: record.from_agent.map(str::to_string),
862            to_agent: record.to_agent.map(str::to_string),
863            body: record.body.map(str::to_string),
864            raw_json: record.raw_json.map(str::to_string),
865        })
866    }
867
868    pub fn record_cli(
869        &self,
870        ts_utc: DateTime<Utc>,
871        bin: &str,
872        argv_json: &str,
873        exit_code: Option<i64>,
874        duration_ms: Option<i64>,
875        host: &str,
876    ) -> Result<i64> {
877        self.insert_json("cli_invocations", CLI_INVOCATIONS, |id| CliRow {
878            id,
879            ts_utc: ts(ts_utc),
880            bin: bin.to_string(),
881            argv_json: argv_json.to_string(),
882            exit_code,
883            duration_ms,
884            host: host.to_string(),
885        })
886    }
887
888    pub fn record_crash(
889        &self,
890        ts_utc: DateTime<Utc>,
891        kind: &str,
892        agent: &str,
893        detail_json: &str,
894    ) -> Result<i64> {
895        self.insert_json("crashes", CRASHES, |id| CrashRow {
896            id,
897            ts_utc: ts(ts_utc),
898            kind: kind.to_string(),
899            agent: agent.to_string(),
900            detail_json: detail_json.to_string(),
901        })
902    }
903
904    pub fn record_tick(
905        &self,
906        ts_utc: DateTime<Utc>,
907        source: &str,
908        detail_json: &str,
909    ) -> Result<i64> {
910        self.insert_json("ticks", TICKS, |id| TickRow {
911            id,
912            ts_utc: ts(ts_utc),
913            source: source.to_string(),
914            detail_json: detail_json.to_string(),
915        })
916    }
917
918    pub fn record_workspace(
919        &self,
920        ts_utc_created: DateTime<Utc>,
921        name: &str,
922        branch: &str,
923        ts_utc_deleted: Option<DateTime<Utc>>,
924        verdict: Option<&str>,
925    ) -> Result<i64> {
926        self.insert_json("workspaces", WORKSPACES, |id| WorkspaceRow {
927            id,
928            ts_utc_created: ts(ts_utc_created),
929            name: name.to_string(),
930            branch: branch.to_string(),
931            ts_utc_deleted: ts_utc_deleted.map(ts),
932            verdict: verdict.map(str::to_string),
933        })
934    }
935
936    pub fn record_session(
937        &self,
938        ts_utc: DateTime<Utc>,
939        agent: &str,
940        session_num: i64,
941        event: SessionEvent,
942    ) -> Result<i64> {
943        self.insert_json("sessions", SESSIONS, |id| SessionRow {
944            id,
945            ts_utc: ts(ts_utc),
946            agent: agent.to_string(),
947            session_num,
948            event: event.as_str().to_string(),
949        })
950    }
951
952    pub fn record_clone_dispatch(&self, record: CloneDispatchRecord<'_>) -> Result<i64> {
953        self.insert_json("clone_dispatches", CLONE_DISPATCHES, |id| {
954            CloneDispatchRow {
955                id,
956                ts_utc_start: ts(record.ts_utc_start),
957                ts_utc_end: record.ts_utc_end.map(ts),
958                agent_id: record.agent_id.to_string(),
959                runtime: record.runtime.map(str::to_string),
960                brief_path: record.brief_path.map(str::to_string),
961                brief: truncate_opt(record.brief, 16_384),
962                workspace: record.workspace.map(str::to_string),
963                branch: record.branch.map(str::to_string),
964                status: record.status.map(str::to_string),
965                exit_code: record.exit_code,
966                detail_json: record.detail_json.map(str::to_string),
967            }
968        })
969    }
970
971    pub fn record_harvest_event(&self, record: HarvestEventRecord<'_>) -> Result<i64> {
972        self.insert_json("harvest_events", HARVEST_EVENTS, |id| HarvestEventRow {
973            id,
974            ts_utc: ts(record.ts_utc),
975            source_branch: record.source_branch.to_string(),
976            target_branch: record.target_branch.to_string(),
977            commit_sha: record.commit_sha.map(str::to_string),
978            status: record.status.to_string(),
979            conflicts: record.conflicts.map(str::to_string),
980            detail_json: record.detail_json.map(str::to_string),
981        })
982    }
983
984    pub fn record_communication_event(&self, record: CommunicationEventRecord<'_>) -> Result<i64> {
985        self.insert_json("communication_events", COMMUNICATION_EVENTS, |id| {
986            CommunicationEventRow {
987                id,
988                ts_utc: ts(record.ts_utc),
989                source: record.source.to_string(),
990                tool: record.tool.map(str::to_string),
991                direction: record.direction.as_str().to_string(),
992                chat_id: record.chat_id.map(str::to_string),
993                message_id: record.message_id.map(str::to_string),
994                handle: record.handle.map(str::to_string),
995                agent: record.agent.map(str::to_string),
996                body: truncate_opt(record.body, 4096),
997                status: record.status.map(str::to_string),
998                detail_json: record.detail_json.map(str::to_string),
999            }
1000        })
1001    }
1002
1003    pub fn list_communication_events(&self) -> Result<Vec<CommunicationEvent>> {
1004        let rows: Vec<CommunicationEventRow> = self
1005            .with_read_only_conn(|conn| async move { rows(&conn, COMMUNICATION_EVENTS).await })?;
1006        rows.into_iter().map(CommunicationEvent::try_from).collect()
1007    }
1008
1009    pub fn record_mcp_tool_call(&self, record: McpToolCallRecord<'_>) -> Result<i64> {
1010        self.insert_json("mcp_tool_calls", MCP_TOOL_CALLS, |id| McpToolCallRow {
1011            id,
1012            ts_utc_start: ts(record.ts_utc_start),
1013            ts_utc_end: record.ts_utc_end.map(ts),
1014            source: record.source.to_string(),
1015            tool: record.tool.to_string(),
1016            agent: record.agent.map(str::to_string),
1017            duration_ms: record.duration_ms,
1018            success: record.success,
1019            error: truncate_opt(record.error, 2048),
1020            timeout_race: record.timeout_race,
1021            request_json: truncate_opt(record.request_json, 8192),
1022            response_json: truncate_opt(record.response_json, 8192),
1023        })
1024    }
1025
1026    pub fn record_git_operation(&self, record: GitOperationRecord<'_>) -> Result<i64> {
1027        self.insert_json("git_operations", GIT_OPERATIONS, |id| GitOperationRow {
1028            id,
1029            ts_utc: ts(record.ts_utc),
1030            operation: record.operation.to_string(),
1031            repo: record.repo.to_string(),
1032            branch: record.branch.map(str::to_string),
1033            remote: record.remote.map(str::to_string),
1034            from_sha: record.from_sha.map(str::to_string),
1035            to_sha: record.to_sha.map(str::to_string),
1036            status: record.status.to_string(),
1037            detail_json: record.detail_json.map(str::to_string),
1038        })
1039    }
1040
1041    pub fn record_owner_directive(&self, record: OwnerDirectiveRecord<'_>) -> Result<i64> {
1042        self.insert_json("owner_directives", OWNER_DIRECTIVES, |id| {
1043            OwnerDirectiveRow {
1044                id,
1045                ts_utc: ts(record.ts_utc),
1046                source: record.source.to_string(),
1047                chat_id: record.chat_id.map(str::to_string),
1048                raw_text: truncate(record.raw_text, 16_384),
1049                resolved_action: record.resolved_action.map(str::to_string),
1050                agent: record.agent.map(str::to_string),
1051                status: record.status.map(str::to_string),
1052                detail_json: record.detail_json.map(str::to_string),
1053            }
1054        })
1055    }
1056
1057    pub fn record_token_usage(&self, record: TokenUsageRecord<'_>) -> Result<i64> {
1058        self.insert_json("token_usage", TOKEN_USAGE, |id| TokenUsageRow {
1059            id,
1060            ts_utc: ts(record.ts_utc),
1061            session_id: record.session_id.map(str::to_string),
1062            agent: record.agent.map(str::to_string),
1063            runtime: record.runtime.map(str::to_string),
1064            model: record.model.map(str::to_string),
1065            input_tokens: record.input_tokens,
1066            output_tokens: record.output_tokens,
1067            cached_input_tokens: record.cached_input_tokens,
1068            cost_usd_micros: record.cost_usd_micros,
1069            detail_json: record.detail_json.map(str::to_string),
1070        })
1071    }
1072
1073    /// Batch-insert token usage rows. One turso connection, one transaction,
1074    /// one bulk id allocation. Use this from ingest paths that emit
1075    /// thousands of rows; per-row [`record_token_usage`] opens a fresh
1076    /// turso connection per call which is dominated by setup cost.
1077    ///
1078    /// Returns the number of rows written. Empty input is a no-op.
1079    pub fn record_token_usage_batch<'a, I>(&self, records: I) -> Result<usize>
1080    where
1081        I: IntoIterator<Item = TokenUsageRecord<'a>>,
1082    {
1083        let raw: Vec<TokenUsageRecord<'_>> = records.into_iter().collect();
1084        if raw.is_empty() {
1085            return Ok(0);
1086        }
1087        let count = raw.len();
1088        let start_id = self.bulk_alloc_ids(TOKEN_USAGE, count as i64)?;
1089        let mut payloads: Vec<(i64, String)> = Vec::with_capacity(count);
1090        let mut owned_rows: Vec<TokenUsageRow> = Vec::with_capacity(count);
1091        for (offset, record) in raw.into_iter().enumerate() {
1092            let id = start_id + offset as i64;
1093            let row = TokenUsageRow {
1094                id,
1095                ts_utc: ts(record.ts_utc),
1096                session_id: record.session_id.map(str::to_string),
1097                agent: record.agent.map(str::to_string),
1098                runtime: record.runtime.map(str::to_string),
1099                model: record.model.map(str::to_string),
1100                input_tokens: record.input_tokens,
1101                output_tokens: record.output_tokens,
1102                cached_input_tokens: record.cached_input_tokens,
1103                cost_usd_micros: record.cost_usd_micros,
1104                detail_json: record.detail_json.map(str::to_string),
1105            };
1106            payloads.push((id, serde_json::to_string(&row)?));
1107            owned_rows.push(row);
1108        }
1109        let table = TOKEN_USAGE;
1110        let payloads_for_write = payloads.clone();
1111        let write = retry_db_lock(|| {
1112            let payloads_for_write = payloads_for_write.clone();
1113            self.with_conn(move |conn| async move {
1114                let sql = format!(
1115                    "INSERT INTO {table} (id, row_json) VALUES (?1, ?2) ON CONFLICT DO NOTHING"
1116                );
1117                conn.execute("BEGIN", ()).await?;
1118                let mut inserted = 0usize;
1119                for (id, json) in payloads_for_write {
1120                    match conn
1121                        .execute(&sql, params_from_iter([Value::from(id), Value::from(json)]))
1122                        .await
1123                    {
1124                        Ok(changed) => inserted += changed as usize,
1125                        Err(error) => {
1126                            let _ = conn.execute("ROLLBACK", ()).await;
1127                            return Err(error.into());
1128                        }
1129                    }
1130                }
1131                conn.execute("COMMIT", ()).await?;
1132                Ok(inserted)
1133            })
1134        });
1135        let inserted = match write {
1136            Ok(inserted) => inserted,
1137            Err(error) => {
1138                for row in &owned_rows {
1139                    spool_write_error(table, row.id, row, &error)?;
1140                }
1141                return Err(error);
1142            }
1143        };
1144        if inserted < count {
1145            self.with_conn(move |conn| async move {
1146                let sql =
1147                    "UPDATE ids SET id = MAX(COALESCE((SELECT MAX(id) FROM token_usage), 0), 0) \
1148                     WHERE name = ?1";
1149                conn.execute(sql, params_from_iter([Value::from(TOKEN_USAGE)]))
1150                    .await?;
1151                Ok(())
1152            })?;
1153        }
1154        Ok(inserted)
1155    }
1156
1157    /// Reserve `count` consecutive ids for `table` in a single round-trip.
1158    /// Returns the first id of the reserved range; ids span `[start,
1159    /// start + count)`. On failure, falls back to a per-row id sequence
1160    /// derived from the wall clock so the caller can still spool rows.
1161    fn bulk_alloc_ids(&self, name: &'static str, count: i64) -> Result<i64> {
1162        if count <= 0 {
1163            return Ok(0);
1164        }
1165        let result = self.with_conn(move |conn| async move {
1166            let mut rows = conn
1167                .query(
1168                    "INSERT INTO ids (name, id) VALUES (?1, ?2) \
1169                     ON CONFLICT(name) DO UPDATE SET id = id + ?2 \
1170                     RETURNING id",
1171                    params_from_iter([Value::from(name), Value::from(count)]),
1172                )
1173                .await?;
1174            if let Some(row) = rows.next().await? {
1175                let last = integer_value(&row.get_value(0)?)?;
1176                return Ok(last - count + 1);
1177            }
1178            Err(Error::NotFound(format!("bulk id allocation for {name}")))
1179        });
1180        match result {
1181            Ok(start) => Ok(start),
1182            Err(error) => {
1183                let fallback = Utc::now().timestamp_micros();
1184                spool_error_json(serde_json::json!({
1185                    "ts_utc": ts(Utc::now()),
1186                    "table": name,
1187                    "id": fallback,
1188                    "count": count,
1189                    "error": error.to_string(),
1190                    "record": {"kind": "bulk_id_allocation"}
1191                }))?;
1192                Ok(fallback)
1193            }
1194        }
1195    }
1196
1197    pub fn record_source_error(&self, record: SourceErrorRecord<'_>) -> Result<i64> {
1198        self.insert_json("source_errors", SOURCE_ERRORS, |id| SourceErrorRow {
1199            id,
1200            ts_utc: ts(record.ts_utc),
1201            source: record.source.to_string(),
1202            error_class: record.error_class.as_str().to_string(),
1203            count: record.count.max(1),
1204            detail_json: record.detail_json.map(str::to_string),
1205        })
1206    }
1207
1208    pub fn record_iroh_event(&self, record: IrohEventRecord<'_>) -> Result<i64> {
1209        self.insert_json("iroh_events", IROH_EVENTS, |id| IrohEventRow {
1210            id,
1211            ts_utc: ts(record.ts_utc),
1212            event_type: record.event_type.as_str().to_string(),
1213            peer_id_hash: record.peer_id_hash.to_string(),
1214            peer_label: record.peer_label.map(str::to_string),
1215            detail_json: record.detail_json.map(str::to_string),
1216        })
1217    }
1218
1219    pub fn record_watchdog_event(&self, record: WatchdogEventRecord<'_>) -> Result<i64> {
1220        self.insert_json("watchdog_events", WATCHDOG_EVENTS, |id| WatchdogEventRow {
1221            id,
1222            ts_utc: ts(record.ts_utc),
1223            event: record.event.to_string(),
1224            agent: record.agent.map(str::to_string),
1225            severity: record.severity.map(str::to_string),
1226            status: record.status.map(str::to_string),
1227            detail_json: record.detail_json.map(str::to_string),
1228        })
1229    }
1230
1231    /// Read the cursor recorded for this source. `None` means no cursor
1232    /// has been written yet (first-run).
1233    pub fn read_source_cursor(&self, source: &str) -> Result<Option<String>> {
1234        let source = source.to_string();
1235        self.with_conn(move |conn| async move {
1236            let mut rows = conn
1237                .query(
1238                    "SELECT cursor_value FROM source_cursors WHERE source = ?1",
1239                    params_from_iter([Value::from(source)]),
1240                )
1241                .await?;
1242            let Some(row) = rows.next().await? else {
1243                return Ok(None);
1244            };
1245            Ok(Some(text_value(&row.get_value(0)?)?))
1246        })
1247    }
1248
1249    /// Write (upsert) the cursor for this source. `updated_at` is set
1250    /// to the current UTC instant.
1251    pub fn update_source_cursor(&self, source: &str, value: &str) -> Result<()> {
1252        let source = source.to_string();
1253        let value = value.to_string();
1254        let now = ts(Utc::now());
1255        retry_db_lock(|| {
1256            let source = source.clone();
1257            let value = value.clone();
1258            let now = now.clone();
1259            self.with_conn(move |conn| async move {
1260                conn.execute(
1261                    "INSERT INTO source_cursors (source, cursor_value, updated_at) \
1262                     VALUES (?1, ?2, ?3) \
1263                     ON CONFLICT(source) DO UPDATE SET \
1264                         cursor_value = excluded.cursor_value, \
1265                         updated_at = excluded.updated_at",
1266                    params_from_iter([Value::from(source), Value::from(value), Value::from(now)]),
1267                )
1268                .await?;
1269                Ok(())
1270            })
1271        })
1272    }
1273
1274    /// Delete the cursor row for this source. Idempotent; returns
1275    /// whether a row was removed.
1276    pub fn reset_source_cursor(&self, source: &str) -> Result<bool> {
1277        let source = source.to_string();
1278        retry_db_lock(|| {
1279            let source = source.clone();
1280            self.with_conn(move |conn| async move {
1281                let mut rows = conn
1282                    .query(
1283                        "DELETE FROM source_cursors WHERE source = ?1 RETURNING source",
1284                        params_from_iter([Value::from(source)]),
1285                    )
1286                    .await?;
1287                Ok(rows.next().await?.is_some())
1288            })
1289        })
1290    }
1291
1292    pub fn list_source_cursors(&self) -> Result<Vec<SourceCursorRow>> {
1293        self.with_read_only_conn(|conn| async move {
1294            let mut rows = conn
1295                .query(
1296                    "SELECT source, cursor_value, updated_at FROM source_cursors ORDER BY source",
1297                    (),
1298                )
1299                .await?;
1300            let mut out = Vec::new();
1301            while let Some(row) = rows.next().await? {
1302                out.push(SourceCursorRow {
1303                    source: text_value(&row.get_value(0)?)?,
1304                    cursor_value: text_value(&row.get_value(1)?)?,
1305                    updated_at: text_value(&row.get_value(2)?)?,
1306                });
1307            }
1308            Ok(out)
1309        })
1310    }
1311
1312    /// Insert an event row with `delivery_status = 'pending'`. Returns
1313    /// the allocated id so the caller can update the status after the
1314    /// delivery adapter runs.
1315    pub fn insert_event(
1316        &self,
1317        source: &str,
1318        ts_utc: DateTime<Utc>,
1319        payload_json: &str,
1320    ) -> Result<i64> {
1321        let source = source.to_string();
1322        let ts_str = ts(ts_utc);
1323        let payload = payload_json.to_string();
1324        retry_db_lock(|| {
1325            let source = source.clone();
1326            let ts_str = ts_str.clone();
1327            let payload = payload.clone();
1328            self.with_conn(move |conn| async move {
1329                let mut rows = conn
1330                    .query(
1331                        "INSERT INTO events (source, ts_utc, payload_json, delivery_status) \
1332                         VALUES (?1, ?2, ?3, 'pending') RETURNING id",
1333                        params_from_iter([
1334                            Value::from(source),
1335                            Value::from(ts_str),
1336                            Value::from(payload),
1337                        ]),
1338                    )
1339                    .await?;
1340                let Some(row) = rows.next().await? else {
1341                    return Err(Error::NotFound("event id after insert".into()));
1342                };
1343                integer_value(&row.get_value(0)?)
1344            })
1345        })
1346    }
1347
1348    /// Transition an event to `delivered` or `failed`. `reason` is the
1349    /// failure text for `failed`; ignored (but recorded) for `delivered`.
1350    pub fn update_event_delivery(
1351        &self,
1352        id: i64,
1353        status: EventStatus,
1354        reason: Option<&str>,
1355    ) -> Result<()> {
1356        let reason_value = reason.map(str::to_string);
1357        let status_str = status.as_str().to_string();
1358        retry_db_lock(|| {
1359            let status_str = status_str.clone();
1360            let reason_value = reason_value.clone();
1361            self.with_conn(move |conn| async move {
1362                conn.execute(
1363                    "UPDATE events SET delivery_status = ?1, reason = ?2 WHERE id = ?3",
1364                    params_from_iter([
1365                        Value::from(status_str),
1366                        match reason_value {
1367                            Some(r) => Value::from(r),
1368                            None => Value::Null,
1369                        },
1370                        Value::from(id),
1371                    ]),
1372                )
1373                .await?;
1374                Ok(())
1375            })
1376        })
1377    }
1378
1379    /// Tail the most recent events for a source (newest first).
1380    pub fn tail_events(&self, source: &str, limit: i64) -> Result<Vec<EventRow>> {
1381        let source = source.to_string();
1382        self.with_read_only_conn(move |conn| async move {
1383            let mut rows = conn
1384                .query(
1385                    "SELECT id, source, ts_utc, payload_json, delivery_status, reason \
1386                     FROM events WHERE source = ?1 ORDER BY id DESC LIMIT ?2",
1387                    params_from_iter([Value::from(source), Value::from(limit)]),
1388                )
1389                .await?;
1390            let mut out = Vec::new();
1391            while let Some(row) = rows.next().await? {
1392                let reason = match row.get_value(5)? {
1393                    Value::Null => None,
1394                    other => Some(
1395                        other
1396                            .as_text()
1397                            .cloned()
1398                            .ok_or_else(|| Error::NotFound("event reason".into()))?,
1399                    ),
1400                };
1401                out.push(EventRow {
1402                    id: integer_value(&row.get_value(0)?)?,
1403                    source: text_value(&row.get_value(1)?)?,
1404                    ts_utc: text_value(&row.get_value(2)?)?,
1405                    payload_json: text_value(&row.get_value(3)?)?,
1406                    delivery_status: text_value(&row.get_value(4)?)?,
1407                    reason,
1408                });
1409            }
1410            Ok(out)
1411        })
1412    }
1413
1414    pub fn record_task(&self, record: TaskRecord<'_>) -> Result<i64> {
1415        let now = ts(Utc::now());
1416        self.insert_json("netsky_tasks", TASKS, |id| TaskRow {
1417            id,
1418            created_at: now.clone(),
1419            updated_at: now.clone(),
1420            title: record.title.to_string(),
1421            body: record.body.map(str::to_string),
1422            status: record.status.to_string(),
1423            priority: record.priority.map(str::to_string),
1424            labels: record.labels.map(str::to_string),
1425            source: record.source.map(str::to_string),
1426            source_ref: record.source_ref.map(str::to_string),
1427            closed_at: record.closed_at.map(|value| {
1428                if value.is_empty() {
1429                    now.clone()
1430                } else {
1431                    value.to_string()
1432                }
1433            }),
1434            closed_reason: record.closed_reason.map(str::to_string),
1435            closed_evidence: record.closed_evidence.map(str::to_string),
1436            agent: record.agent.map(str::to_string),
1437            sync_calendar: record.sync_calendar,
1438            calendar_task_id: record.calendar_task_id.map(str::to_string),
1439        })
1440    }
1441
1442    pub fn update_task(&self, update: TaskUpdate<'_>) -> Result<TaskRow> {
1443        let mut row = self
1444            .get_task(update.id)?
1445            .ok_or_else(|| Error::NotFound(format!("task {}", update.id)))?;
1446        let now = ts(Utc::now());
1447        if let Some(status) = update.status {
1448            row.status = status.to_string();
1449            if status == "closed" && row.closed_at.is_none() {
1450                row.closed_at = Some(now.clone());
1451            }
1452        }
1453        if let Some(priority) = update.priority {
1454            row.priority = Some(priority.to_string());
1455        }
1456        if let Some(agent) = update.agent {
1457            row.agent = Some(agent.to_string());
1458        }
1459        if let Some(reason) = update.closed_reason {
1460            row.closed_reason = Some(reason.to_string());
1461        }
1462        if let Some(evidence) = update.closed_evidence {
1463            row.closed_evidence = Some(evidence.to_string());
1464        }
1465        if let Some(sync_calendar) = update.sync_calendar {
1466            row.sync_calendar = sync_calendar;
1467        }
1468        if let Some(calendar_task_id) = update.calendar_task_id {
1469            row.calendar_task_id = Some(calendar_task_id.to_string());
1470        }
1471        row.updated_at = now;
1472        let row_json = serde_json::to_string(&row)?;
1473        let id = update.id;
1474        let write = retry_db_lock(|| {
1475            let row_json = row_json.clone();
1476            self.with_conn(move |conn| async move {
1477                conn.execute(
1478                    "UPDATE netsky_tasks SET row_json = ?1 WHERE id = ?2",
1479                    params_from_iter([Value::from(row_json), Value::from(id)]),
1480                )
1481                .await?;
1482                Ok(())
1483            })
1484        });
1485        if let Err(error) = write {
1486            spool_write_error("netsky_tasks", update.id, &row, &error)?;
1487        }
1488        Ok(row)
1489    }
1490
1491    pub fn get_task(&self, id: i64) -> Result<Option<TaskRow>> {
1492        self.with_conn(move |conn| async move {
1493            let mut rows = conn
1494                .query(
1495                    "SELECT row_json FROM netsky_tasks WHERE id = ?1",
1496                    params_from_iter([Value::from(id)]),
1497                )
1498                .await?;
1499            let Some(row) = rows.next().await? else {
1500                return Ok(None);
1501            };
1502            let json = text_value(&row.get_value(0)?)?;
1503            Ok(Some(serde_json::from_str(&json)?))
1504        })
1505    }
1506
1507    pub fn list_tasks(&self, status: Option<&str>, priority: Option<&str>) -> Result<Vec<TaskRow>> {
1508        let status = status.map(str::to_string);
1509        let priority = priority.map(str::to_string);
1510        self.with_conn(move |conn| async move {
1511            let status_val = status.map(Value::from).unwrap_or(Value::Null);
1512            let priority_val = priority.map(Value::from).unwrap_or(Value::Null);
1513            let mut rows = conn
1514                .query(
1515                    "SELECT row_json FROM netsky_tasks \
1516                     WHERE (?1 IS NULL OR json_extract(row_json, '$.status') = ?1) \
1517                       AND (?2 IS NULL OR json_extract(row_json, '$.priority') = ?2) \
1518                     ORDER BY json_extract(row_json, '$.status'), \
1519                              json_extract(row_json, '$.priority'), \
1520                              id",
1521                    params_from_iter([status_val, priority_val]),
1522                )
1523                .await?;
1524            let mut out = Vec::new();
1525            while let Some(row) = rows.next().await? {
1526                let json = text_value(&row.get_value(0)?)?;
1527                out.push(serde_json::from_str(&json)?);
1528            }
1529            Ok(out)
1530        })
1531    }
1532
1533    pub fn query(&self, sql: &str) -> Result<String> {
1534        let batches = self.query_batches(sql)?;
1535        Ok(pretty_format_batches(&batches)?.to_string())
1536    }
1537
1538    pub fn query_batches(&self, sql: &str) -> Result<Vec<RecordBatch>> {
1539        let scope = referenced_storage_tables(sql);
1540        let snapshot = self.snapshot_scoped(scope.as_ref())?;
1541        let runtime = tokio::runtime::Runtime::new()?;
1542        runtime.block_on(async move {
1543            let ctx = register_snapshot(snapshot).await?;
1544            let df = ctx.sql(sql).await?;
1545            Ok(df.collect().await?)
1546        })
1547    }
1548
1549    pub fn schema_version(&self) -> Result<i64> {
1550        self.with_conn(|conn| async move { schema_version(&conn).await })
1551    }
1552
1553    fn insert_json<T, F>(&self, name: &'static str, table: &'static str, row: F) -> Result<i64>
1554    where
1555        T: Serialize,
1556        F: FnOnce(i64) -> T,
1557    {
1558        let id = self.next_id(name)?;
1559        let row = row(id);
1560        let row_json = serde_json::to_string(&row)?;
1561        let write = retry_db_lock(|| {
1562            let row_json = row_json.clone();
1563            self.with_conn({
1564                let table = table.to_string();
1565                move |conn| async move {
1566                    let sql = format!("INSERT INTO {table} (id, row_json) VALUES (?1, ?2)");
1567                    conn.execute(
1568                        &sql,
1569                        params_from_iter([Value::from(id), Value::from(row_json)]),
1570                    )
1571                    .await?;
1572                    Ok(())
1573                }
1574            })
1575        });
1576        if let Err(error) = write {
1577            spool_write_error(table, id, &row, &error)?;
1578        }
1579        Ok(id)
1580    }
1581
1582    /// Snapshot only the storage tables in `scope`; unreferenced tables
1583    /// return empty `Vec`s and their DataFusion registrations become empty
1584    /// MemTables. `None` loads every table (back-compat for callers without
1585    /// a parsed SQL scope).
1586    fn snapshot_scoped(&self, scope: Option<&HashSet<&'static str>>) -> Result<Snapshot> {
1587        let owned: Option<HashSet<&'static str>> = scope.cloned();
1588        self.with_read_only_conn(move |conn| async move {
1589            let wants = |t: &'static str| owned.as_ref().is_none_or(|s| s.contains(t));
1590            Ok(Snapshot {
1591                messages: if wants(MESSAGES) {
1592                    rows(&conn, MESSAGES).await?
1593                } else {
1594                    Vec::new()
1595                },
1596                cli_invocations: if wants(CLI_INVOCATIONS) {
1597                    rows(&conn, CLI_INVOCATIONS).await?
1598                } else {
1599                    Vec::new()
1600                },
1601                crashes: if wants(CRASHES) {
1602                    rows(&conn, CRASHES).await?
1603                } else {
1604                    Vec::new()
1605                },
1606                ticks: if wants(TICKS) {
1607                    rows(&conn, TICKS).await?
1608                } else {
1609                    Vec::new()
1610                },
1611                workspaces: if wants(WORKSPACES) {
1612                    rows(&conn, WORKSPACES).await?
1613                } else {
1614                    Vec::new()
1615                },
1616                sessions: if wants(SESSIONS) {
1617                    rows(&conn, SESSIONS).await?
1618                } else {
1619                    Vec::new()
1620                },
1621                clone_dispatches: if wants(CLONE_DISPATCHES) {
1622                    rows(&conn, CLONE_DISPATCHES).await?
1623                } else {
1624                    Vec::new()
1625                },
1626                harvest_events: if wants(HARVEST_EVENTS) {
1627                    rows(&conn, HARVEST_EVENTS).await?
1628                } else {
1629                    Vec::new()
1630                },
1631                communication_events: if wants(COMMUNICATION_EVENTS) {
1632                    rows(&conn, COMMUNICATION_EVENTS).await?
1633                } else {
1634                    Vec::new()
1635                },
1636                mcp_tool_calls: if wants(MCP_TOOL_CALLS) {
1637                    rows(&conn, MCP_TOOL_CALLS).await?
1638                } else {
1639                    Vec::new()
1640                },
1641                git_operations: if wants(GIT_OPERATIONS) {
1642                    rows(&conn, GIT_OPERATIONS).await?
1643                } else {
1644                    Vec::new()
1645                },
1646                owner_directives: if wants(OWNER_DIRECTIVES) {
1647                    rows(&conn, OWNER_DIRECTIVES).await?
1648                } else {
1649                    Vec::new()
1650                },
1651                token_usage: if wants(TOKEN_USAGE) {
1652                    rows(&conn, TOKEN_USAGE).await?
1653                } else {
1654                    Vec::new()
1655                },
1656                watchdog_events: if wants(WATCHDOG_EVENTS) {
1657                    rows(&conn, WATCHDOG_EVENTS).await?
1658                } else {
1659                    Vec::new()
1660                },
1661                tasks: if wants(TASKS) {
1662                    rows(&conn, TASKS).await?
1663                } else {
1664                    Vec::new()
1665                },
1666                source_errors: if wants(SOURCE_ERRORS) {
1667                    rows(&conn, SOURCE_ERRORS).await?
1668                } else {
1669                    Vec::new()
1670                },
1671                iroh_events: if wants(IROH_EVENTS) {
1672                    rows(&conn, IROH_EVENTS).await?
1673                } else {
1674                    Vec::new()
1675                },
1676            })
1677        })
1678    }
1679
1680    fn next_id(&self, name: &'static str) -> Result<i64> {
1681        let result = self.with_conn(move |conn| async move {
1682            let mut rows = conn
1683                .query(
1684                    "INSERT INTO ids (name, id) VALUES (?1, 1) \
1685                     ON CONFLICT(name) DO UPDATE SET id = id + 1 \
1686                     RETURNING id",
1687                    params_from_iter([Value::from(name)]),
1688                )
1689                .await?;
1690            if let Some(row) = rows.next().await? {
1691                return integer_value(&row.get_value(0)?);
1692            }
1693            Err(Error::NotFound(format!("id allocation for {name}")))
1694        });
1695        match result {
1696            Ok(id) => Ok(id),
1697            Err(error) => {
1698                let fallback = Utc::now().timestamp_micros();
1699                spool_error_json(serde_json::json!({
1700                    "ts_utc": ts(Utc::now()),
1701                    "table": name,
1702                    "id": fallback,
1703                    "error": error.to_string(),
1704                    "record": {"kind": "id_allocation"}
1705                }))?;
1706                Ok(fallback)
1707            }
1708        }
1709    }
1710
    /// Opens a fresh turso connection to `self.path`, applies the standard
    /// connection setup, and runs `f` against it to completion.
    ///
    /// The initial open is retried on lock contention with a linearly
    /// growing backoff. Fewer attempts are made when already inside a
    /// tokio runtime (3 vs 10) — presumably to bound latency for async
    /// callers; confirm before changing.
    fn with_conn<T, F, Fut>(&self, f: F) -> Result<T>
    where
        T: Send + 'static,
        F: FnOnce(Connection) -> Fut + Send + 'static,
        Fut: std::future::Future<Output = Result<T>> + Send + 'static,
    {
        let path = self.path.clone();
        let max_attempts = if tokio::runtime::Handle::try_current().is_ok() {
            3
        } else {
            10
        };
        let task = async move {
            let path = path.to_string_lossy().to_string();
            let mut last_error = None;
            // `0..=max_attempts` means up to max_attempts + 1 opens total.
            for attempt in 0..=max_attempts {
                match Builder::new_local(&path).build().await {
                    Ok(db) => {
                        let conn = db.connect()?;
                        configure_conn(&conn).await?;
                        return f(conn).await;
                    }
                    // Only lock contention is retried; the final attempt
                    // falls through to the unconditional Err arm below.
                    Err(error) if is_locking_error(&error) && attempt < max_attempts => {
                        last_error = Some(error);
                        tokio::time::sleep(Duration::from_millis(10 + attempt * 2)).await;
                    }
                    Err(error) => return Err(error.into()),
                }
            }
            // Defensive: every loop iteration returns, so this is only a
            // safety net in case the control flow above changes.
            Err(last_error
                .map(Error::from)
                .unwrap_or_else(|| Error::NotFound("turso connection".to_string())))
        };
        block_on_turso(task)
    }
1746
1747    fn with_read_only_conn<T, F, Fut>(&self, f: F) -> Result<T>
1748    where
1749        T: Send + 'static,
1750        F: FnOnce(Connection) -> Fut + Send + 'static,
1751        Fut: std::future::Future<Output = Result<T>> + Send + 'static,
1752    {
1753        let path = self.path.clone();
1754        let task = async move {
1755            let path_str = path.to_string_lossy().to_string();
1756            let db = Builder::new_local(&path_str).build().await?;
1757            let conn = db.connect()?;
1758            configure_read_only_conn(&conn, &path).await?;
1759            f(conn).await
1760        };
1761        block_on_turso(task)
1762    }
1763
1764    #[cfg(test)]
1765    fn read_only_write_probe_fails(&self) -> Result<bool> {
1766        self.with_read_only_conn(|conn| async move {
1767            Ok(conn
1768                .execute(
1769                    "INSERT INTO meta (key, value) VALUES ('read_only_probe', 1)",
1770                    (),
1771                )
1772                .await
1773                .is_err())
1774        })
1775    }
1776}
1777
/// Typed, fully materialized copy of the OLTP tables' JSON rows.
///
/// Consumed by `register_snapshot`, which converts each vector into an
/// Arrow `RecordBatch` and registers it with a DataFusion context.
/// Tables not referenced by the query may be left empty by the loader.
struct Snapshot {
    messages: Vec<MessageRow>,
    cli_invocations: Vec<CliRow>,
    crashes: Vec<CrashRow>,
    ticks: Vec<TickRow>,
    workspaces: Vec<WorkspaceRow>,
    sessions: Vec<SessionRow>,
    clone_dispatches: Vec<CloneDispatchRow>,
    harvest_events: Vec<HarvestEventRow>,
    communication_events: Vec<CommunicationEventRow>,
    mcp_tool_calls: Vec<McpToolCallRow>,
    git_operations: Vec<GitOperationRow>,
    owner_directives: Vec<OwnerDirectiveRow>,
    token_usage: Vec<TokenUsageRow>,
    watchdog_events: Vec<WatchdogEventRow>,
    // NOTE(review): exposed to SQL as `tasks`; presumably backed by the
    // `netsky_tasks` storage table in TABLE_NAMES — confirm.
    tasks: Vec<TaskRow>,
    source_errors: Vec<SourceErrorRow>,
    iroh_events: Vec<IrohEventRow>,
}
1797
1798async fn configure_conn(conn: &Connection) -> Result<()> {
1799    conn.busy_timeout(Duration::from_secs(10))?;
1800    if user_version(conn).await? == SCHEMA_VERSION {
1801        return Ok(());
1802    }
1803    let mut wal = conn.query("PRAGMA journal_mode=WAL", ()).await?;
1804    while wal.next().await?.is_some() {}
1805    conn.execute_batch(
1806        "PRAGMA synchronous=NORMAL; \
1807         CREATE TABLE IF NOT EXISTS meta (key TEXT PRIMARY KEY, value INTEGER NOT NULL); \
1808         CREATE TABLE IF NOT EXISTS ids (name TEXT PRIMARY KEY, id INTEGER NOT NULL);",
1809    )
1810    .await?;
1811    Ok(())
1812}
1813
1814async fn configure_read_only_conn(conn: &Connection, path: &Path) -> Result<()> {
1815    conn.busy_timeout(Duration::from_secs(10))?;
1816    conn.execute("PRAGMA query_only = 1", ()).await?;
1817    if query_only(conn).await? != 1 {
1818        return Err(Error::ReadOnlyNotEnforced(path.to_path_buf()));
1819    }
1820    ensure_initialized(conn, path).await
1821}
1822
1823async fn ensure_initialized(conn: &Connection, path: &Path) -> Result<()> {
1824    if !table_exists(conn, "meta").await? {
1825        return Err(Error::NotInitialized(path.to_path_buf()));
1826    }
1827    let current = schema_version(conn).await?;
1828    if current == 0 {
1829        return Err(Error::NotInitialized(path.to_path_buf()));
1830    }
1831    if current > SCHEMA_VERSION {
1832        return Err(Error::FutureSchemaVersion {
1833            found: current,
1834            supported: SCHEMA_VERSION,
1835        });
1836    }
1837    Ok(())
1838}
1839
1840async fn table_exists(conn: &Connection, table: &str) -> Result<bool> {
1841    let mut rows = conn
1842        .query(
1843            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1",
1844            params_from_iter([Value::from(table)]),
1845        )
1846        .await?;
1847    Ok(rows.next().await?.is_some())
1848}
1849
1850async fn query_only(conn: &Connection) -> Result<i64> {
1851    let mut rows = conn.query("PRAGMA query_only", ()).await?;
1852    let Some(row) = rows.next().await? else {
1853        return Ok(0);
1854    };
1855    integer_value(&row.get_value(0)?)
1856}
1857
1858async fn user_version(conn: &Connection) -> Result<i64> {
1859    let mut rows = conn.query("PRAGMA user_version", ()).await?;
1860    let Some(row) = rows.next().await? else {
1861        return Ok(0);
1862    };
1863    integer_value(&row.get_value(0)?)
1864}
1865
1866async fn schema_version(conn: &Connection) -> Result<i64> {
1867    let mut rows = conn
1868        .query(
1869            "SELECT value FROM meta WHERE key = ?1",
1870            params_from_iter([Value::from("schema_version")]),
1871        )
1872        .await?;
1873    let Some(row) = rows.next().await? else {
1874        return Ok(0);
1875    };
1876    integer_value(&row.get_value(0)?)
1877}
1878
/// Migration for token_usage ingest idempotency: removes duplicate
/// claude-session rows and installs the unique index that prevents new
/// duplicates.
///
/// The dedupe DELETE and `CREATE UNIQUE INDEX IF NOT EXISTS` run on every
/// call; only the one-time file backup is gated on the pre-migration
/// schema version.
async fn migrate_token_usage_ingest_idempotency(
    path: &Path,
    conn: &Connection,
    current: i64,
) -> Result<()> {
    // Back up the database file once before the first destructive dedupe.
    if current < 7 {
        backup_db_before_token_usage_dedupe(path)?;
    }
    dedupe_token_usage_claude_sessions_rows(conn).await?;
    ensure_token_usage_claude_sessions_unique_index(conn).await
}
1890
1891fn backup_db_before_token_usage_dedupe(path: &Path) -> Result<()> {
1892    if path == Path::new(":memory:") || !path.exists() {
1893        return Ok(());
1894    }
1895    let backup = path.with_extension("db.bak.v7-token-usage-dedupe");
1896    if backup.exists() {
1897        return Ok(());
1898    }
1899    fs::copy(path, backup)?;
1900    Ok(())
1901}
1902
/// Deletes duplicate claude-session token_usage rows, keeping the row
/// with the lowest id per (session_id, detail path, detail line) key.
///
/// NOTE(review): groups whose detail path or line is NULL never re-join
/// via the `=` comparisons below (SQL NULL never equals NULL), so
/// NULL-keyed duplicates are left in place — confirm this is intended.
async fn dedupe_token_usage_claude_sessions_rows(conn: &Connection) -> Result<()> {
    let sql = format!(
        "DELETE FROM token_usage \
         WHERE id IN ( \
             SELECT victim.id \
             FROM token_usage AS victim \
             JOIN ( \
                 SELECT COALESCE(json_extract(row_json, '$.session_id'), '') AS session_id_key, \
                        json_extract(json_extract(row_json, '$.detail_json'), '$.path') AS path_key, \
                        json_extract(json_extract(row_json, '$.detail_json'), '$.line') AS line_key, \
                        MIN(id) AS keep_id \
                 FROM token_usage \
                 WHERE {filter} \
                 GROUP BY 1, 2, 3 \
                 HAVING COUNT(*) > 1 \
             ) AS dup \
               ON COALESCE(json_extract(victim.row_json, '$.session_id'), '') = dup.session_id_key \
              AND json_extract(json_extract(victim.row_json, '$.detail_json'), '$.path') = dup.path_key \
              AND json_extract(json_extract(victim.row_json, '$.detail_json'), '$.line') = dup.line_key \
             WHERE {victim_filter} \
               AND victim.id <> dup.keep_id \
         )",
        filter = TOKEN_USAGE_CLAUDE_SESSIONS_FILTER,
        victim_filter = qualify_token_usage_filter("victim"),
    );
    conn.execute(&sql, ()).await?;
    Ok(())
}
1931
1932async fn ensure_token_usage_claude_sessions_unique_index(conn: &Connection) -> Result<()> {
1933    let sql = format!(
1934        "CREATE UNIQUE INDEX IF NOT EXISTS {TOKEN_USAGE_CLAUDE_SESSIONS_UNIQUE_INDEX} \
1935         ON token_usage ({TOKEN_USAGE_CLAUDE_SESSIONS_KEY}) \
1936         WHERE {TOKEN_USAGE_CLAUDE_SESSIONS_FILTER}"
1937    );
1938    conn.execute(&sql, ()).await?;
1939    Ok(())
1940}
1941
1942fn qualify_token_usage_filter(alias: &str) -> String {
1943    TOKEN_USAGE_CLAUDE_SESSIONS_FILTER.replace("row_json", &format!("{alias}.row_json"))
1944}
1945
1946async fn rows<T>(conn: &Connection, table: &str) -> Result<Vec<T>>
1947where
1948    T: DeserializeOwned,
1949{
1950    let sql = format!("SELECT row_json FROM {table} ORDER BY id");
1951    let mut out = Vec::new();
1952    let mut rows = conn.query(&sql, ()).await?;
1953    while let Some(row) = rows.next().await? {
1954        let json = text_value(&row.get_value(0)?)?;
1955        out.push(serde_json::from_str(&json)?);
1956    }
1957    Ok(out)
1958}
1959
1960fn integer_value(value: &Value) -> Result<i64> {
1961    value
1962        .as_integer()
1963        .copied()
1964        .ok_or_else(|| Error::NotFound("integer value".to_string()))
1965}
1966
1967fn text_value(value: &Value) -> Result<String> {
1968    value
1969        .as_text()
1970        .cloned()
1971        .ok_or_else(|| Error::NotFound("text value".to_string()))
1972}
1973
/// Drives a turso future to completion from synchronous code.
///
/// `Runtime::block_on` cannot be nested inside an existing tokio runtime,
/// so when one is already current the future is shipped to a dedicated
/// worker thread that builds its own single-use `Runtime`. A panicking
/// worker is reported as `Error::NotFound("turso worker thread")` rather
/// than propagating the panic to the caller.
fn block_on_turso<T, Fut>(future: Fut) -> Result<T>
where
    T: Send + 'static,
    Fut: std::future::Future<Output = Result<T>> + Send + 'static,
{
    if tokio::runtime::Handle::try_current().is_ok() {
        // `?` inside the closure converts a runtime-build error into Err.
        std::thread::spawn(move || Runtime::new()?.block_on(future))
            .join()
            .unwrap_or_else(|_| Err(Error::NotFound("turso worker thread".to_string())))
    } else {
        Runtime::new()?.block_on(future)
    }
}
1987
1988fn is_locking_error(error: &turso::Error) -> bool {
1989    let message = error.to_string();
1990    netsky_core::retry::is_lock_error_message(&message)
1991}
1992
1993fn retry_db_lock<T, F>(op: F) -> Result<T>
1994where
1995    F: FnMut() -> Result<T>,
1996{
1997    match netsky_core::retry::on_lock(op, netsky_core::retry::LockRetryPolicy::db_default()) {
1998        Ok(value) => Ok(value),
1999        Err(netsky_core::retry::LockRetryError::Operation(error)) => Err(error),
2000        Err(netsky_core::retry::LockRetryError::Exhausted {
2001            source,
2002            retries,
2003            total_wait,
2004        }) => Err(Error::Locked {
2005            retries,
2006            total_wait_ms: total_wait.as_millis(),
2007            detail: source.to_string(),
2008        }),
2009    }
2010}
2011
2012fn spool_write_error<T: Serialize>(table: &str, id: i64, record: &T, error: &Error) -> Result<()> {
2013    spool_error_json(serde_json::json!({
2014        "ts_utc": ts(Utc::now()),
2015        "table": table,
2016        "id": id,
2017        "error": error.to_string(),
2018        "record": record,
2019    }))
2020}
2021
2022fn spool_error_json(value: serde_json::Value) -> Result<()> {
2023    let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
2024    let dir = home.join(".netsky").join("logs");
2025    fs::create_dir_all(&dir)?;
2026    let path = dir.join(format!(
2027        "meta-db-errors-{}.jsonl",
2028        Utc::now().format("%Y-%m-%d")
2029    ));
2030    if let Err(spool_error) = netsky_core::jsonl::append_json_line(&path, &value) {
2031        write_spool_failure_marker(&value, &spool_error)?;
2032        return Err(Error::Io(spool_error));
2033    }
2034    Ok(())
2035}
2036
2037fn write_spool_failure_marker(
2038    value: &serde_json::Value,
2039    spool_error: &std::io::Error,
2040) -> Result<()> {
2041    let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
2042    let dir = home.join(".netsky").join("state");
2043    fs::create_dir_all(&dir)?;
2044    let path = dir.join("meta-db-spool-failed");
2045    let body = serde_json::json!({
2046        "ts_utc": ts(Utc::now()),
2047        "spool_error": spool_error.to_string(),
2048        "record": value,
2049    });
2050    fs::write(path, serde_json::to_vec_pretty(&body)?)?;
2051    Ok(())
2052}
2053
2054/// Scope the snapshot loader to storage tables the SQL actually references.
2055/// Conservative: returns `None` (load everything) when no known name
2056/// appears, so ad-hoc SQL still works. False positives only waste a
2057/// `json_extract` name check; false negatives produce empty tables, so
2058/// the scan deliberately over-includes.
2059fn referenced_storage_tables(sql: &str) -> Option<HashSet<&'static str>> {
2060    let lower = sql.to_ascii_lowercase();
2061    let mut out: HashSet<&'static str> = HashSet::new();
2062    for &(df_name, storage) in &DATAFUSION_TABLES {
2063        if contains_word(&lower, df_name) {
2064            out.insert(storage);
2065        }
2066    }
2067    if contains_word(&lower, CLONE_LIFETIMES_VIEW) {
2068        out.insert(CLONE_DISPATCHES);
2069    }
2070    if out.is_empty() { None } else { Some(out) }
2071}
2072
/// Word-boundary containment check: `needle` must be surrounded by
/// non-identifier chars (alphanumeric or `_`) on both sides, so
/// `SELECT tasks` matches but `netsky_tasks` does not match the search
/// for `tasks`.
///
/// Operates purely on byte windows so a haystack containing multi-byte
/// UTF-8 (e.g. unicode inside SQL string literals) can never trigger a
/// char-boundary slice panic. (The previous `str::find` loop re-sliced
/// at `start + 1`, which panics when a rejected candidate begins with a
/// multi-byte character.)
fn contains_word(haystack: &str, needle: &str) -> bool {
    let hb = haystack.as_bytes();
    let nb = needle.as_bytes();
    if nb.is_empty() || nb.len() > hb.len() {
        return false;
    }
    // Byte equality on UTF-8 is exact string equality, and neighbor bytes
    // are safe to classify: UTF-8 continuation bytes are never identifier
    // bytes, so they count as boundaries just like punctuation.
    hb.windows(nb.len()).enumerate().any(|(start, window)| {
        if window != nb {
            return false;
        }
        let end = start + nb.len();
        let before_ok = start == 0 || !is_ident_byte(hb[start - 1]);
        let after_ok = end == hb.len() || !is_ident_byte(hb[end]);
        before_ok && after_ok
    })
}

/// True for bytes that can appear in a SQL identifier: ASCII
/// alphanumerics and underscore.
fn is_ident_byte(b: u8) -> bool {
    b.is_ascii_alphanumeric() || b == b'_'
}
2102
2103fn register(ctx: &SessionContext, name: &str, batch: RecordBatch) -> Result<()> {
2104    let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
2105    ctx.register_table(name, Arc::new(table))?;
2106    Ok(())
2107}
2108
2109async fn register_snapshot(snapshot: Snapshot) -> Result<SessionContext> {
2110    let ctx = SessionContext::new();
2111    register(&ctx, "messages", messages_batch(snapshot.messages)?)?;
2112    register(
2113        &ctx,
2114        "cli_invocations",
2115        cli_batch(snapshot.cli_invocations)?,
2116    )?;
2117    register(&ctx, "crashes", crashes_batch(snapshot.crashes)?)?;
2118    register(&ctx, "ticks", ticks_batch(snapshot.ticks)?)?;
2119    register(&ctx, "workspaces", workspaces_batch(snapshot.workspaces)?)?;
2120    register(&ctx, "sessions", sessions_batch(snapshot.sessions)?)?;
2121    register(
2122        &ctx,
2123        "clone_dispatches",
2124        clone_dispatches_batch(snapshot.clone_dispatches)?,
2125    )?;
2126    register(
2127        &ctx,
2128        "harvest_events",
2129        harvest_events_batch(snapshot.harvest_events)?,
2130    )?;
2131    register(
2132        &ctx,
2133        "communication_events",
2134        communication_events_batch(snapshot.communication_events)?,
2135    )?;
2136    register(
2137        &ctx,
2138        "mcp_tool_calls",
2139        mcp_tool_calls_batch(snapshot.mcp_tool_calls)?,
2140    )?;
2141    register(
2142        &ctx,
2143        "git_operations",
2144        git_operations_batch(snapshot.git_operations)?,
2145    )?;
2146    register(
2147        &ctx,
2148        "owner_directives",
2149        owner_directives_batch(snapshot.owner_directives)?,
2150    )?;
2151    register(
2152        &ctx,
2153        "token_usage",
2154        token_usage_batch(snapshot.token_usage)?,
2155    )?;
2156    register(
2157        &ctx,
2158        "watchdog_events",
2159        watchdog_events_batch(snapshot.watchdog_events)?,
2160    )?;
2161    register(&ctx, "tasks", tasks_batch(snapshot.tasks)?)?;
2162    register(
2163        &ctx,
2164        "source_errors",
2165        source_errors_batch(snapshot.source_errors)?,
2166    )?;
2167    register(
2168        &ctx,
2169        "iroh_events",
2170        iroh_events_batch(snapshot.iroh_events)?,
2171    )?;
2172    ctx.sql(
2173        "CREATE VIEW clone_lifetimes AS \
2174         SELECT id, agent_id, runtime, workspace, branch, status, \
2175                ts_utc_start, ts_utc_end \
2176         FROM clone_dispatches \
2177         WHERE ts_utc_end IS NOT NULL",
2178    )
2179    .await?;
2180    Ok(ctx)
2181}
2182
2183fn messages_batch(rows: Vec<MessageRow>) -> Result<RecordBatch> {
2184    Ok(RecordBatch::try_new(
2185        schema(&[
2186            ("id", DataType::Int64, false),
2187            ("ts_utc", DataType::Utf8, false),
2188            ("source", DataType::Utf8, false),
2189            ("direction", DataType::Utf8, false),
2190            ("chat_id", DataType::Utf8, true),
2191            ("from_agent", DataType::Utf8, true),
2192            ("to_agent", DataType::Utf8, true),
2193            ("body", DataType::Utf8, true),
2194            ("raw_json", DataType::Utf8, true),
2195        ]),
2196        vec![
2197            ids(&rows, |r| r.id),
2198            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2199            string(rows.iter().map(|r| Some(r.source.as_str()))),
2200            string(rows.iter().map(|r| Some(r.direction.as_str()))),
2201            opt_string(rows.iter().map(|r| r.chat_id.as_deref())),
2202            opt_string(rows.iter().map(|r| r.from_agent.as_deref())),
2203            opt_string(rows.iter().map(|r| r.to_agent.as_deref())),
2204            opt_string(rows.iter().map(|r| r.body.as_deref())),
2205            opt_string(rows.iter().map(|r| r.raw_json.as_deref())),
2206        ],
2207    )?)
2208}
2209
2210fn cli_batch(rows: Vec<CliRow>) -> Result<RecordBatch> {
2211    Ok(RecordBatch::try_new(
2212        schema(&[
2213            ("id", DataType::Int64, false),
2214            ("ts_utc", DataType::Utf8, false),
2215            ("bin", DataType::Utf8, false),
2216            ("argv_json", DataType::Utf8, false),
2217            ("exit_code", DataType::Int64, true),
2218            ("duration_ms", DataType::Int64, true),
2219            ("host", DataType::Utf8, false),
2220        ]),
2221        vec![
2222            ids(&rows, |r| r.id),
2223            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2224            string(rows.iter().map(|r| Some(r.bin.as_str()))),
2225            string(rows.iter().map(|r| Some(r.argv_json.as_str()))),
2226            int64_opt(rows.iter().map(|r| r.exit_code)),
2227            int64_opt(rows.iter().map(|r| r.duration_ms)),
2228            string(rows.iter().map(|r| Some(r.host.as_str()))),
2229        ],
2230    )?)
2231}
2232
2233fn crashes_batch(rows: Vec<CrashRow>) -> Result<RecordBatch> {
2234    simple_event_batch(
2235        schema(&[
2236            ("id", DataType::Int64, false),
2237            ("ts_utc", DataType::Utf8, false),
2238            ("kind", DataType::Utf8, false),
2239            ("agent", DataType::Utf8, false),
2240            ("detail_json", DataType::Utf8, false),
2241        ]),
2242        rows.iter()
2243            .map(|r| (r.id, &r.ts_utc, &r.kind, &r.agent, &r.detail_json)),
2244    )
2245}
2246
2247fn ticks_batch(rows: Vec<TickRow>) -> Result<RecordBatch> {
2248    Ok(RecordBatch::try_new(
2249        schema(&[
2250            ("id", DataType::Int64, false),
2251            ("ts_utc", DataType::Utf8, false),
2252            ("source", DataType::Utf8, false),
2253            ("detail_json", DataType::Utf8, false),
2254        ]),
2255        vec![
2256            ids(&rows, |r| r.id),
2257            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2258            string(rows.iter().map(|r| Some(r.source.as_str()))),
2259            string(rows.iter().map(|r| Some(r.detail_json.as_str()))),
2260        ],
2261    )?)
2262}
2263
2264fn workspaces_batch(rows: Vec<WorkspaceRow>) -> Result<RecordBatch> {
2265    Ok(RecordBatch::try_new(
2266        schema(&[
2267            ("id", DataType::Int64, false),
2268            ("ts_utc_created", DataType::Utf8, false),
2269            ("name", DataType::Utf8, false),
2270            ("branch", DataType::Utf8, false),
2271            ("ts_utc_deleted", DataType::Utf8, true),
2272            ("verdict", DataType::Utf8, true),
2273        ]),
2274        vec![
2275            ids(&rows, |r| r.id),
2276            string(rows.iter().map(|r| Some(r.ts_utc_created.as_str()))),
2277            string(rows.iter().map(|r| Some(r.name.as_str()))),
2278            string(rows.iter().map(|r| Some(r.branch.as_str()))),
2279            opt_string(rows.iter().map(|r| r.ts_utc_deleted.as_deref())),
2280            opt_string(rows.iter().map(|r| r.verdict.as_deref())),
2281        ],
2282    )?)
2283}
2284
2285fn sessions_batch(rows: Vec<SessionRow>) -> Result<RecordBatch> {
2286    Ok(RecordBatch::try_new(
2287        schema(&[
2288            ("id", DataType::Int64, false),
2289            ("ts_utc", DataType::Utf8, false),
2290            ("agent", DataType::Utf8, false),
2291            ("session_num", DataType::Int64, false),
2292            ("event", DataType::Utf8, false),
2293        ]),
2294        vec![
2295            ids(&rows, |r| r.id),
2296            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2297            string(rows.iter().map(|r| Some(r.agent.as_str()))),
2298            int64(rows.iter().map(|r| r.session_num)),
2299            string(rows.iter().map(|r| Some(r.event.as_str()))),
2300        ],
2301    )?)
2302}
2303
2304fn clone_dispatches_batch(rows: Vec<CloneDispatchRow>) -> Result<RecordBatch> {
2305    Ok(RecordBatch::try_new(
2306        schema(&[
2307            ("id", DataType::Int64, false),
2308            ("ts_utc_start", DataType::Utf8, false),
2309            ("ts_utc_end", DataType::Utf8, true),
2310            ("agent_id", DataType::Utf8, false),
2311            ("runtime", DataType::Utf8, true),
2312            ("brief_path", DataType::Utf8, true),
2313            ("brief", DataType::Utf8, true),
2314            ("workspace", DataType::Utf8, true),
2315            ("branch", DataType::Utf8, true),
2316            ("status", DataType::Utf8, true),
2317            ("exit_code", DataType::Int64, true),
2318            ("detail_json", DataType::Utf8, true),
2319        ]),
2320        vec![
2321            ids(&rows, |r| r.id),
2322            string(rows.iter().map(|r| Some(r.ts_utc_start.as_str()))),
2323            opt_string(rows.iter().map(|r| r.ts_utc_end.as_deref())),
2324            string(rows.iter().map(|r| Some(r.agent_id.as_str()))),
2325            opt_string(rows.iter().map(|r| r.runtime.as_deref())),
2326            opt_string(rows.iter().map(|r| r.brief_path.as_deref())),
2327            opt_string(rows.iter().map(|r| r.brief.as_deref())),
2328            opt_string(rows.iter().map(|r| r.workspace.as_deref())),
2329            opt_string(rows.iter().map(|r| r.branch.as_deref())),
2330            opt_string(rows.iter().map(|r| r.status.as_deref())),
2331            int64_opt(rows.iter().map(|r| r.exit_code)),
2332            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
2333        ],
2334    )?)
2335}
2336
2337fn harvest_events_batch(rows: Vec<HarvestEventRow>) -> Result<RecordBatch> {
2338    Ok(RecordBatch::try_new(
2339        schema(&[
2340            ("id", DataType::Int64, false),
2341            ("ts_utc", DataType::Utf8, false),
2342            ("source_branch", DataType::Utf8, false),
2343            ("target_branch", DataType::Utf8, false),
2344            ("commit_sha", DataType::Utf8, true),
2345            ("status", DataType::Utf8, false),
2346            ("conflicts", DataType::Utf8, true),
2347            ("detail_json", DataType::Utf8, true),
2348        ]),
2349        vec![
2350            ids(&rows, |r| r.id),
2351            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2352            string(rows.iter().map(|r| Some(r.source_branch.as_str()))),
2353            string(rows.iter().map(|r| Some(r.target_branch.as_str()))),
2354            opt_string(rows.iter().map(|r| r.commit_sha.as_deref())),
2355            string(rows.iter().map(|r| Some(r.status.as_str()))),
2356            opt_string(rows.iter().map(|r| r.conflicts.as_deref())),
2357            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
2358        ],
2359    )?)
2360}
2361
2362fn communication_events_batch(rows: Vec<CommunicationEventRow>) -> Result<RecordBatch> {
2363    Ok(RecordBatch::try_new(
2364        schema(&[
2365            ("id", DataType::Int64, false),
2366            ("ts_utc", DataType::Utf8, false),
2367            ("source", DataType::Utf8, false),
2368            ("tool", DataType::Utf8, true),
2369            ("direction", DataType::Utf8, false),
2370            ("chat_id", DataType::Utf8, true),
2371            ("message_id", DataType::Utf8, true),
2372            ("handle", DataType::Utf8, true),
2373            ("agent", DataType::Utf8, true),
2374            ("body", DataType::Utf8, true),
2375            ("status", DataType::Utf8, true),
2376            ("detail_json", DataType::Utf8, true),
2377        ]),
2378        vec![
2379            ids(&rows, |r| r.id),
2380            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2381            string(rows.iter().map(|r| Some(r.source.as_str()))),
2382            opt_string(rows.iter().map(|r| r.tool.as_deref())),
2383            string(rows.iter().map(|r| Some(r.direction.as_str()))),
2384            opt_string(rows.iter().map(|r| r.chat_id.as_deref())),
2385            opt_string(rows.iter().map(|r| r.message_id.as_deref())),
2386            opt_string(rows.iter().map(|r| r.handle.as_deref())),
2387            opt_string(rows.iter().map(|r| r.agent.as_deref())),
2388            opt_string(rows.iter().map(|r| r.body.as_deref())),
2389            opt_string(rows.iter().map(|r| r.status.as_deref())),
2390            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
2391        ],
2392    )?)
2393}
2394
2395fn mcp_tool_calls_batch(rows: Vec<McpToolCallRow>) -> Result<RecordBatch> {
2396    Ok(RecordBatch::try_new(
2397        schema(&[
2398            ("id", DataType::Int64, false),
2399            ("ts_utc_start", DataType::Utf8, false),
2400            ("ts_utc_end", DataType::Utf8, true),
2401            ("source", DataType::Utf8, false),
2402            ("tool", DataType::Utf8, false),
2403            ("agent", DataType::Utf8, true),
2404            ("duration_ms", DataType::Int64, true),
2405            ("success", DataType::Boolean, false),
2406            ("error", DataType::Utf8, true),
2407            ("timeout_race", DataType::Boolean, false),
2408            ("request_json", DataType::Utf8, true),
2409            ("response_json", DataType::Utf8, true),
2410        ]),
2411        vec![
2412            ids(&rows, |r| r.id),
2413            string(rows.iter().map(|r| Some(r.ts_utc_start.as_str()))),
2414            opt_string(rows.iter().map(|r| r.ts_utc_end.as_deref())),
2415            string(rows.iter().map(|r| Some(r.source.as_str()))),
2416            string(rows.iter().map(|r| Some(r.tool.as_str()))),
2417            opt_string(rows.iter().map(|r| r.agent.as_deref())),
2418            int64_opt(rows.iter().map(|r| r.duration_ms)),
2419            bools(rows.iter().map(|r| r.success)),
2420            opt_string(rows.iter().map(|r| r.error.as_deref())),
2421            bools(rows.iter().map(|r| r.timeout_race)),
2422            opt_string(rows.iter().map(|r| r.request_json.as_deref())),
2423            opt_string(rows.iter().map(|r| r.response_json.as_deref())),
2424        ],
2425    )?)
2426}
2427
2428fn git_operations_batch(rows: Vec<GitOperationRow>) -> Result<RecordBatch> {
2429    Ok(RecordBatch::try_new(
2430        schema(&[
2431            ("id", DataType::Int64, false),
2432            ("ts_utc", DataType::Utf8, false),
2433            ("operation", DataType::Utf8, false),
2434            ("repo", DataType::Utf8, false),
2435            ("branch", DataType::Utf8, true),
2436            ("remote", DataType::Utf8, true),
2437            ("from_sha", DataType::Utf8, true),
2438            ("to_sha", DataType::Utf8, true),
2439            ("status", DataType::Utf8, false),
2440            ("detail_json", DataType::Utf8, true),
2441        ]),
2442        vec![
2443            ids(&rows, |r| r.id),
2444            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2445            string(rows.iter().map(|r| Some(r.operation.as_str()))),
2446            string(rows.iter().map(|r| Some(r.repo.as_str()))),
2447            opt_string(rows.iter().map(|r| r.branch.as_deref())),
2448            opt_string(rows.iter().map(|r| r.remote.as_deref())),
2449            opt_string(rows.iter().map(|r| r.from_sha.as_deref())),
2450            opt_string(rows.iter().map(|r| r.to_sha.as_deref())),
2451            string(rows.iter().map(|r| Some(r.status.as_str()))),
2452            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
2453        ],
2454    )?)
2455}
2456
2457fn owner_directives_batch(rows: Vec<OwnerDirectiveRow>) -> Result<RecordBatch> {
2458    Ok(RecordBatch::try_new(
2459        schema(&[
2460            ("id", DataType::Int64, false),
2461            ("ts_utc", DataType::Utf8, false),
2462            ("source", DataType::Utf8, false),
2463            ("chat_id", DataType::Utf8, true),
2464            ("raw_text", DataType::Utf8, false),
2465            ("resolved_action", DataType::Utf8, true),
2466            ("agent", DataType::Utf8, true),
2467            ("status", DataType::Utf8, true),
2468            ("detail_json", DataType::Utf8, true),
2469        ]),
2470        vec![
2471            ids(&rows, |r| r.id),
2472            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2473            string(rows.iter().map(|r| Some(r.source.as_str()))),
2474            opt_string(rows.iter().map(|r| r.chat_id.as_deref())),
2475            string(rows.iter().map(|r| Some(r.raw_text.as_str()))),
2476            opt_string(rows.iter().map(|r| r.resolved_action.as_deref())),
2477            opt_string(rows.iter().map(|r| r.agent.as_deref())),
2478            opt_string(rows.iter().map(|r| r.status.as_deref())),
2479            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
2480        ],
2481    )?)
2482}
2483
2484fn token_usage_batch(rows: Vec<TokenUsageRow>) -> Result<RecordBatch> {
2485    Ok(RecordBatch::try_new(
2486        schema(&[
2487            ("id", DataType::Int64, false),
2488            ("ts_utc", DataType::Utf8, false),
2489            ("session_id", DataType::Utf8, true),
2490            ("agent", DataType::Utf8, true),
2491            ("runtime", DataType::Utf8, true),
2492            ("model", DataType::Utf8, true),
2493            ("input_tokens", DataType::Int64, true),
2494            ("output_tokens", DataType::Int64, true),
2495            ("cached_input_tokens", DataType::Int64, true),
2496            ("cost_usd_micros", DataType::Int64, true),
2497            ("detail_json", DataType::Utf8, true),
2498        ]),
2499        vec![
2500            ids(&rows, |r| r.id),
2501            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2502            opt_string(rows.iter().map(|r| r.session_id.as_deref())),
2503            opt_string(rows.iter().map(|r| r.agent.as_deref())),
2504            opt_string(rows.iter().map(|r| r.runtime.as_deref())),
2505            opt_string(rows.iter().map(|r| r.model.as_deref())),
2506            int64_opt(rows.iter().map(|r| r.input_tokens)),
2507            int64_opt(rows.iter().map(|r| r.output_tokens)),
2508            int64_opt(rows.iter().map(|r| r.cached_input_tokens)),
2509            int64_opt(rows.iter().map(|r| r.cost_usd_micros)),
2510            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
2511        ],
2512    )?)
2513}
2514
2515fn source_errors_batch(rows: Vec<SourceErrorRow>) -> Result<RecordBatch> {
2516    Ok(RecordBatch::try_new(
2517        schema(&[
2518            ("id", DataType::Int64, false),
2519            ("ts_utc", DataType::Utf8, false),
2520            ("source", DataType::Utf8, false),
2521            ("error_class", DataType::Utf8, false),
2522            ("count", DataType::Int64, false),
2523            ("detail_json", DataType::Utf8, true),
2524        ]),
2525        vec![
2526            ids(&rows, |r| r.id),
2527            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2528            string(rows.iter().map(|r| Some(r.source.as_str()))),
2529            string(rows.iter().map(|r| Some(r.error_class.as_str()))),
2530            int64(rows.iter().map(|r| r.count)),
2531            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
2532        ],
2533    )?)
2534}
2535
2536fn iroh_events_batch(rows: Vec<IrohEventRow>) -> Result<RecordBatch> {
2537    Ok(RecordBatch::try_new(
2538        schema(&[
2539            ("id", DataType::Int64, false),
2540            ("ts_utc", DataType::Utf8, false),
2541            ("event_type", DataType::Utf8, false),
2542            ("peer_id_hash", DataType::Utf8, false),
2543            ("peer_label", DataType::Utf8, true),
2544            ("detail_json", DataType::Utf8, true),
2545        ]),
2546        vec![
2547            ids(&rows, |r| r.id),
2548            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2549            string(rows.iter().map(|r| Some(r.event_type.as_str()))),
2550            string(rows.iter().map(|r| Some(r.peer_id_hash.as_str()))),
2551            opt_string(rows.iter().map(|r| r.peer_label.as_deref())),
2552            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
2553        ],
2554    )?)
2555}
2556
2557fn watchdog_events_batch(rows: Vec<WatchdogEventRow>) -> Result<RecordBatch> {
2558    Ok(RecordBatch::try_new(
2559        schema(&[
2560            ("id", DataType::Int64, false),
2561            ("ts_utc", DataType::Utf8, false),
2562            ("event", DataType::Utf8, false),
2563            ("agent", DataType::Utf8, true),
2564            ("severity", DataType::Utf8, true),
2565            ("status", DataType::Utf8, true),
2566            ("detail_json", DataType::Utf8, true),
2567        ]),
2568        vec![
2569            ids(&rows, |r| r.id),
2570            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
2571            string(rows.iter().map(|r| Some(r.event.as_str()))),
2572            opt_string(rows.iter().map(|r| r.agent.as_deref())),
2573            opt_string(rows.iter().map(|r| r.severity.as_deref())),
2574            opt_string(rows.iter().map(|r| r.status.as_deref())),
2575            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
2576        ],
2577    )?)
2578}
2579
2580fn tasks_batch(rows: Vec<TaskRow>) -> Result<RecordBatch> {
2581    Ok(RecordBatch::try_new(
2582        schema(&[
2583            ("id", DataType::Int64, false),
2584            ("created_at", DataType::Utf8, false),
2585            ("updated_at", DataType::Utf8, false),
2586            ("title", DataType::Utf8, false),
2587            ("body", DataType::Utf8, true),
2588            ("status", DataType::Utf8, false),
2589            ("priority", DataType::Utf8, true),
2590            ("labels", DataType::Utf8, true),
2591            ("source", DataType::Utf8, true),
2592            ("source_ref", DataType::Utf8, true),
2593            ("closed_at", DataType::Utf8, true),
2594            ("closed_reason", DataType::Utf8, true),
2595            ("closed_evidence", DataType::Utf8, true),
2596            ("agent", DataType::Utf8, true),
2597            ("sync_calendar", DataType::Boolean, false),
2598            ("calendar_task_id", DataType::Utf8, true),
2599        ]),
2600        vec![
2601            ids(&rows, |r| r.id),
2602            string(rows.iter().map(|r| Some(r.created_at.as_str()))),
2603            string(rows.iter().map(|r| Some(r.updated_at.as_str()))),
2604            string(rows.iter().map(|r| Some(r.title.as_str()))),
2605            opt_string(rows.iter().map(|r| r.body.as_deref())),
2606            string(rows.iter().map(|r| Some(r.status.as_str()))),
2607            opt_string(rows.iter().map(|r| r.priority.as_deref())),
2608            opt_string(rows.iter().map(|r| r.labels.as_deref())),
2609            opt_string(rows.iter().map(|r| r.source.as_deref())),
2610            opt_string(rows.iter().map(|r| r.source_ref.as_deref())),
2611            opt_string(rows.iter().map(|r| r.closed_at.as_deref())),
2612            opt_string(rows.iter().map(|r| r.closed_reason.as_deref())),
2613            opt_string(rows.iter().map(|r| r.closed_evidence.as_deref())),
2614            opt_string(rows.iter().map(|r| r.agent.as_deref())),
2615            bools(rows.iter().map(|r| r.sync_calendar)),
2616            opt_string(rows.iter().map(|r| r.calendar_task_id.as_deref())),
2617        ],
2618    )?)
2619}
2620
2621fn simple_event_batch<'a>(
2622    schema: SchemaRef,
2623    rows: impl Iterator<Item = (i64, &'a String, &'a String, &'a String, &'a String)>,
2624) -> Result<RecordBatch> {
2625    let rows = rows.collect::<Vec<_>>();
2626    Ok(RecordBatch::try_new(
2627        schema,
2628        vec![
2629            Arc::new(Int64Array::from(
2630                rows.iter().map(|r| r.0).collect::<Vec<_>>(),
2631            )),
2632            string(rows.iter().map(|r| Some(r.1.as_str()))),
2633            string(rows.iter().map(|r| Some(r.2.as_str()))),
2634            string(rows.iter().map(|r| Some(r.3.as_str()))),
2635            string(rows.iter().map(|r| Some(r.4.as_str()))),
2636        ],
2637    )?)
2638}
2639
2640fn schema(fields: &[(&str, DataType, bool)]) -> SchemaRef {
2641    Arc::new(Schema::new(
2642        fields
2643            .iter()
2644            .map(|(name, ty, nullable)| Field::new(*name, ty.clone(), *nullable))
2645            .collect::<Vec<_>>(),
2646    ))
2647}
2648
2649fn ids<T>(rows: &[T], id: impl Fn(&T) -> i64) -> Arc<Int64Array> {
2650    int64(rows.iter().map(id))
2651}
2652
2653fn int64(values: impl Iterator<Item = i64>) -> Arc<Int64Array> {
2654    Arc::new(Int64Array::from(values.collect::<Vec<_>>()))
2655}
2656
2657fn int64_opt(values: impl Iterator<Item = Option<i64>>) -> Arc<Int64Array> {
2658    Arc::new(Int64Array::from(values.collect::<Vec<_>>()))
2659}
2660
2661fn bools(values: impl Iterator<Item = bool>) -> Arc<BooleanArray> {
2662    Arc::new(BooleanArray::from(values.collect::<Vec<_>>()))
2663}
2664
2665fn string<'a>(values: impl Iterator<Item = Option<&'a str>>) -> Arc<StringArray> {
2666    Arc::new(StringArray::from(values.collect::<Vec<_>>()))
2667}
2668
2669fn opt_string<'a>(values: impl Iterator<Item = Option<&'a str>>) -> Arc<StringArray> {
2670    string(values)
2671}
2672
/// Canonical on-disk timestamp format: RFC 3339 with millisecond precision
/// and a trailing `Z` (e.g. `2026-04-15T23:00:00.000Z`); `parse_ts` is the
/// inverse. The `true` argument selects `Z` over a `+00:00` offset suffix.
fn ts(ts_utc: DateTime<Utc>) -> String {
    ts_utc.to_rfc3339_opts(SecondsFormat::Millis, true)
}
2676
2677fn parse_ts(ts_utc: &str) -> Result<DateTime<Utc>> {
2678    Ok(DateTime::parse_from_rfc3339(ts_utc)?.with_timezone(&Utc))
2679}
2680
/// Truncate `value` to at most `max_chars` Unicode scalar values.
///
/// Finds the byte offset of the first character past the limit and slices
/// there, copying the prefix in one shot instead of pushing characters one
/// at a time into a growing `String`. When the input is already short
/// enough, the whole string is copied (matching the previous behavior of
/// always returning an owned `String`).
fn truncate(value: &str, max_chars: usize) -> String {
    match value.char_indices().nth(max_chars) {
        // `idx` is the byte offset where character number `max_chars`
        // starts, so `..idx` is guaranteed to be a valid char boundary.
        Some((idx, _)) => value[..idx].to_string(),
        None => value.to_string(),
    }
}
2684
2685fn truncate_opt(value: Option<&str>, max_chars: usize) -> Option<String> {
2686    value.map(|s| truncate(s, max_chars))
2687}
2688
2689#[cfg(test)]
2690mod tests {
2691    use super::*;
2692
2693    fn db() -> Result<Db> {
2694        let db = Db::open_in_memory()?;
2695        db.migrate()?;
2696        Ok(db)
2697    }
2698
2699    fn fixed_ts() -> DateTime<Utc> {
2700        chrono::DateTime::parse_from_rfc3339("2026-04-15T23:00:00Z")
2701            .unwrap()
2702            .with_timezone(&Utc)
2703    }
2704
2705    #[test]
2706    fn referenced_tables_extracts_scope() {
2707        let scope =
2708            referenced_storage_tables("SELECT COUNT(*) FROM messages WHERE ts_utc >= '2026-04-18'")
2709                .expect("single-table scope");
2710        assert!(scope.contains(MESSAGES));
2711        assert!(!scope.contains(CRASHES));
2712        assert_eq!(scope.len(), 1);
2713    }
2714
2715    #[test]
2716    fn referenced_tables_handles_alias_and_view() {
2717        let scope = referenced_storage_tables(
2718            "SELECT ts_utc_start FROM clone_lifetimes WHERE status IS NOT NULL",
2719        )
2720        .expect("view resolves to backing table");
2721        assert!(scope.contains(CLONE_DISPATCHES));
2722    }
2723
2724    #[test]
2725    fn referenced_tables_maps_datafusion_name_to_storage() {
2726        let scope =
2727            referenced_storage_tables("SELECT title FROM tasks").expect("tasks -> netsky_tasks");
2728        assert!(scope.contains(TASKS));
2729        assert!(!scope.contains("tasks"));
2730    }
2731
2732    #[test]
2733    fn referenced_tables_unknown_sql_returns_none() {
2734        assert!(referenced_storage_tables("SELECT 1").is_none());
2735    }
2736
2737    #[test]
2738    fn referenced_tables_join_captures_both() {
2739        let scope = referenced_storage_tables(
2740            "SELECT a.id FROM messages a JOIN crashes b ON a.ts_utc = b.ts_utc",
2741        )
2742        .expect("two tables");
2743        assert!(scope.contains(MESSAGES));
2744        assert!(scope.contains(CRASHES));
2745    }
2746
2747    #[test]
2748    fn list_tasks_pushes_filter_and_order_into_sql() -> Result<()> {
2749        let db = db()?;
2750        let mk = |title: &'static str, status: &'static str, priority: &'static str| {
2751            db.record_task(TaskRecord {
2752                title,
2753                body: None,
2754                status,
2755                priority: Some(priority),
2756                labels: None,
2757                source: None,
2758                source_ref: None,
2759                closed_at: None,
2760                closed_reason: None,
2761                closed_evidence: None,
2762                agent: None,
2763                sync_calendar: false,
2764                calendar_task_id: None,
2765            })
2766        };
2767        mk("c", "open", "p2")?;
2768        mk("a", "open", "p1")?;
2769        mk("b", "in_progress", "p1")?;
2770        mk("d", "closed", "p3")?;
2771
2772        let all = db.list_tasks(None, None)?;
2773        let order: Vec<_> = all.iter().map(|r| r.title.as_str()).collect();
2774        assert_eq!(
2775            order,
2776            vec!["d", "b", "a", "c"],
2777            "status then priority then id"
2778        );
2779
2780        let open_only = db.list_tasks(Some("open"), None)?;
2781        assert_eq!(open_only.len(), 2);
2782        assert!(open_only.iter().all(|r| r.status == "open"));
2783
2784        let p1 = db.list_tasks(None, Some("p1"))?;
2785        assert_eq!(p1.len(), 2);
2786        assert!(p1.iter().all(|r| r.priority.as_deref() == Some("p1")));
2787
2788        let open_p1 = db.list_tasks(Some("open"), Some("p1"))?;
2789        assert_eq!(open_p1.len(), 1);
2790        assert_eq!(open_p1[0].title, "a");
2791        Ok(())
2792    }
2793
2794    #[test]
2795    fn migrate_installs_expression_indexes_for_timed_tables() -> Result<()> {
2796        let db = db()?;
2797        let names = db.with_conn(|conn| async move {
2798            let mut rows = conn
2799                .query("SELECT name FROM sqlite_master WHERE type = 'index'", ())
2800                .await?;
2801            let mut out = Vec::new();
2802            while let Some(row) = rows.next().await? {
2803                out.push(text_value(&row.get_value(0)?)?);
2804            }
2805            Ok(out)
2806        })?;
2807        for (table, column) in INDEXED_TIME_COLUMNS {
2808            let expected = format!("idx_{table}_{column}");
2809            assert!(
2810                names.iter().any(|n| n == &expected),
2811                "missing index {expected} in {names:?}"
2812            );
2813        }
2814        for (table, column) in INDEXED_EQ_COLUMNS {
2815            let expected = format!("idx_{table}_{column}");
2816            assert!(
2817                names.iter().any(|n| n == &expected),
2818                "missing index {expected} in {names:?}"
2819            );
2820        }
2821        Ok(())
2822    }
2823
2824    #[test]
2825    fn contains_word_respects_identifier_boundary() {
2826        assert!(!contains_word("select from netsky_tasks", "tasks"));
2827        assert!(contains_word("select from tasks where", "tasks"));
2828        assert!(contains_word("FROM tasks;", "tasks"));
2829        assert!(!contains_word("classes", "class"));
2830    }
2831
2832    #[test]
2833    fn message_round_trip() -> Result<()> {
2834        let db = db()?;
2835        let id = db.record_message(MessageRecord {
2836            ts_utc: fixed_ts(),
2837            source: "imessage",
2838            direction: Direction::Inbound,
2839            chat_id: Some("chat-1"),
2840            from_agent: Some("agent9"),
2841            to_agent: None,
2842            body: Some("hello"),
2843            raw_json: Some("{\"chat_id\":\"chat-1\"}"),
2844        })?;
2845        let out = db.query("SELECT id, ts_utc, source, direction, chat_id, body FROM messages")?;
2846        assert_eq!(id, 1);
2847        assert!(out.contains("2026-04-15T23:00:00.000Z"));
2848        assert!(out.contains("imessage"));
2849        assert!(out.contains("inbound"));
2850        assert!(out.contains("chat-1"));
2851        assert!(out.contains("hello"));
2852        Ok(())
2853    }
2854
2855    #[test]
2856    fn cli_round_trip() -> Result<()> {
2857        let db = db()?;
2858        let id = db.record_cli(
2859            fixed_ts(),
2860            "netsky",
2861            "[\"db\", \"migrate\"]",
2862            Some(0),
2863            Some(12),
2864            "host-a",
2865        )?;
2866        let out = db.query("SELECT bin, exit_code, duration_ms, host FROM cli_invocations")?;
2867        assert_eq!(id, 1);
2868        assert!(out.contains("netsky"));
2869        assert!(out.contains("host-a"));
2870        assert!(out.contains("12"));
2871        Ok(())
2872    }
2873
2874    #[test]
2875    fn crash_round_trip() -> Result<()> {
2876        let db = db()?;
2877        let id = db.record_crash(fixed_ts(), "panic", "agent8", "{\"reason\":\"wedged\"}")?;
2878        let out = db.query("SELECT kind, agent, detail_json FROM crashes")?;
2879        assert_eq!(id, 1);
2880        assert!(out.contains("panic"));
2881        assert!(out.contains("agent8"));
2882        assert!(out.contains("wedged"));
2883        Ok(())
2884    }
2885
2886    #[test]
2887    fn tick_round_trip() -> Result<()> {
2888        let db = db()?;
2889        let id = db.record_tick(fixed_ts(), "ticker", "{\"beat\":1}")?;
2890        let out = db.query("SELECT source, detail_json FROM ticks")?;
2891        assert_eq!(id, 1);
2892        assert!(out.contains("ticker"));
2893        assert!(out.contains("beat"));
2894        Ok(())
2895    }
2896
2897    #[test]
2898    fn workspace_round_trip() -> Result<()> {
2899        let db = db()?;
2900        let created = fixed_ts();
2901        let deleted = created + chrono::Duration::minutes(5);
2902        let id = db.record_workspace(
2903            created,
2904            "session8",
2905            "feat/netsky-db-v0",
2906            Some(deleted),
2907            Some("kept"),
2908        )?;
2909        let out = db.query("SELECT name, branch, verdict FROM workspaces")?;
2910        assert_eq!(id, 1);
2911        assert!(out.contains("session8"));
2912        assert!(out.contains("feat/netsky-db-v0"));
2913        assert!(out.contains("kept"));
2914        Ok(())
2915    }
2916
2917    #[test]
2918    fn session_round_trip() -> Result<()> {
2919        let db = db()?;
2920        let id = db.record_session(fixed_ts(), "agent8", 8, SessionEvent::Note)?;
2921        let out = db.query("SELECT agent, session_num, event FROM sessions")?;
2922        assert_eq!(id, 1);
2923        assert!(out.contains("agent8"));
2924        assert!(out.contains("note"));
2925        Ok(())
2926    }
2927
2928    #[test]
2929    fn v2_tables_round_trip() -> Result<()> {
2930        let db = db()?;
2931        db.record_clone_dispatch(CloneDispatchRecord {
2932            ts_utc_start: fixed_ts(),
2933            ts_utc_end: Some(fixed_ts()),
2934            agent_id: "agent3",
2935            runtime: Some("codex"),
2936            brief_path: Some("briefs/foo.md"),
2937            brief: Some("do the thing"),
2938            workspace: Some("workspaces/foo/repo"),
2939            branch: Some("feat/foo"),
2940            status: Some("done"),
2941            exit_code: Some(0),
2942            detail_json: Some("{\"ok\":true}"),
2943        })?;
2944        db.record_harvest_event(HarvestEventRecord {
2945            ts_utc: fixed_ts(),
2946            source_branch: "feat/foo",
2947            target_branch: "main",
2948            commit_sha: Some("abc123"),
2949            status: "clean",
2950            conflicts: None,
2951            detail_json: None,
2952        })?;
2953        db.record_communication_event(CommunicationEventRecord {
2954            ts_utc: fixed_ts(),
2955            source: "imessage",
2956            tool: Some("reply"),
2957            direction: Direction::Outbound,
2958            chat_id: Some("chat-1"),
2959            message_id: Some("msg-1"),
2960            handle: Some("+15551234567"),
2961            agent: Some("agent0"),
2962            body: Some("sent"),
2963            status: Some("ok"),
2964            detail_json: None,
2965        })?;
2966        let comms = db.list_communication_events()?;
2967        assert_eq!(comms.len(), 1);
2968        assert_eq!(comms[0].source, "imessage");
2969        assert_eq!(comms[0].agent.as_deref(), Some("agent0"));
2970        db.record_mcp_tool_call(McpToolCallRecord {
2971            ts_utc_start: fixed_ts(),
2972            ts_utc_end: Some(fixed_ts()),
2973            source: "drive",
2974            tool: "list_files",
2975            agent: Some("agent0"),
2976            duration_ms: Some(22),
2977            success: true,
2978            error: None,
2979            timeout_race: false,
2980            request_json: Some("{}"),
2981            response_json: Some("[]"),
2982        })?;
2983        db.record_git_operation(GitOperationRecord {
2984            ts_utc: fixed_ts(),
2985            operation: "push",
2986            repo: "netsky",
2987            branch: Some("feat/foo"),
2988            remote: Some("origin"),
2989            from_sha: Some("abc"),
2990            to_sha: Some("def"),
2991            status: "ok",
2992            detail_json: None,
2993        })?;
2994        db.record_owner_directive(OwnerDirectiveRecord {
2995            ts_utc: fixed_ts(),
2996            source: "imessage",
2997            chat_id: Some("chat-1"),
2998            raw_text: "ship it",
2999            resolved_action: Some("dispatch"),
3000            agent: Some("agent0"),
3001            status: Some("accepted"),
3002            detail_json: None,
3003        })?;
3004        db.record_token_usage(TokenUsageRecord {
3005            ts_utc: fixed_ts(),
3006            session_id: Some("session-1"),
3007            agent: Some("agent4"),
3008            runtime: Some("codex"),
3009            model: Some("gpt-5.4"),
3010            input_tokens: Some(100),
3011            output_tokens: Some(25),
3012            cached_input_tokens: Some(10),
3013            cost_usd_micros: Some(1234),
3014            detail_json: None,
3015        })?;
3016        db.record_watchdog_event(WatchdogEventRecord {
3017            ts_utc: fixed_ts(),
3018            event: "respawn",
3019            agent: Some("agent0"),
3020            severity: Some("warn"),
3021            status: Some("ok"),
3022            detail_json: Some("{}"),
3023        })?;
3024
3025        for table in [
3026            "clone_dispatches",
3027            "harvest_events",
3028            "communication_events",
3029            "mcp_tool_calls",
3030            "git_operations",
3031            "owner_directives",
3032            "token_usage",
3033            "watchdog_events",
3034        ] {
3035            let out = db.query(&format!("SELECT COUNT(*) AS count FROM {table}"))?;
3036            assert!(out.contains('1'), "{table}: {out}");
3037        }
3038        Ok(())
3039    }
3040
3041    #[test]
3042    fn token_usage_batch_inserts_contiguous_ids() -> Result<()> {
3043        let db = db()?;
3044        let count = 1500;
3045        let records: Vec<_> = (0..count)
3046            .map(|i| TokenUsageRecord {
3047                ts_utc: fixed_ts(),
3048                session_id: Some("batch-session"),
3049                agent: Some("agent45"),
3050                runtime: Some("claude"),
3051                model: Some("claude-opus-4-7"),
3052                input_tokens: Some(i),
3053                output_tokens: Some(i * 2),
3054                cached_input_tokens: None,
3055                cost_usd_micros: None,
3056                detail_json: None,
3057            })
3058            .collect();
3059        let written = db.record_token_usage_batch(records)?;
3060        assert_eq!(written, count as usize);
3061        let out = db.query(
3062            "SELECT COUNT(*) AS c, MIN(id) AS lo, MAX(id) AS hi, \
3063             SUM(input_tokens) AS sum_in \
3064             FROM token_usage \
3065             WHERE session_id = 'batch-session'",
3066        )?;
3067        assert!(out.contains(&count.to_string()), "{out}");
3068        // Sum of 0..1500 = 1124250
3069        assert!(out.contains("1124250"), "{out}");
3070        // A second batch should pick up where the first left off.
3071        let written = db.record_token_usage_batch(vec![TokenUsageRecord {
3072            ts_utc: fixed_ts(),
3073            session_id: None,
3074            agent: None,
3075            runtime: None,
3076            model: None,
3077            input_tokens: Some(7),
3078            output_tokens: None,
3079            cached_input_tokens: None,
3080            cost_usd_micros: None,
3081            detail_json: None,
3082        }])?;
3083        assert_eq!(written, 1);
3084        let out = db.query(
3085            "SELECT MAX(id) - MAX(CASE WHEN session_id = 'batch-session' THEN id END) AS delta \
3086             FROM token_usage",
3087        )?;
3088        assert!(out.contains('1'), "{out}");
3089        Ok(())
3090    }
3091
3092    #[test]
3093    fn token_usage_batch_empty_is_noop() -> Result<()> {
3094        let db = db()?;
3095        let written = db.record_token_usage_batch(Vec::<TokenUsageRecord<'_>>::new())?;
3096        assert_eq!(written, 0);
3097        Ok(())
3098    }
3099
3100    #[test]
3101    fn task_round_trip() -> Result<()> {
3102        let db = db()?;
3103        let id = db.record_task(TaskRecord {
3104            title: "Add a task tracker",
3105            body: Some("Use turso and DataFusion."),
3106            status: "open",
3107            priority: Some("p1"),
3108            labels: Some("db,cli"),
3109            source: Some("test"),
3110            source_ref: Some("briefs/task-tracker.md"),
3111            closed_at: None,
3112            closed_reason: None,
3113            closed_evidence: None,
3114            agent: None,
3115            sync_calendar: false,
3116            calendar_task_id: None,
3117        })?;
3118        let row = db.update_task(TaskUpdate {
3119            id,
3120            status: Some("in_progress"),
3121            priority: None,
3122            agent: Some("agent3"),
3123            closed_reason: None,
3124            closed_evidence: None,
3125            sync_calendar: Some(true),
3126            calendar_task_id: Some("calendar-1"),
3127        })?;
3128        assert_eq!(row.id, 1);
3129        assert_eq!(row.status, "in_progress");
3130        assert_eq!(row.agent.as_deref(), Some("agent3"));
3131        assert!(row.sync_calendar);
3132        assert_eq!(row.calendar_task_id.as_deref(), Some("calendar-1"));
3133        let rows = db.list_tasks(Some("in_progress"), Some("p1"))?;
3134        assert_eq!(rows.len(), 1);
3135        let out = db.query(
3136            "SELECT title, status, priority, agent, sync_calendar, calendar_task_id FROM tasks",
3137        )?;
3138        assert!(out.contains("Add a task tracker"));
3139        assert!(out.contains("in_progress"));
3140        assert!(out.contains("agent3"));
3141        assert!(out.contains("calendar-1"));
3142        Ok(())
3143    }
3144
3145    #[test]
3146    fn query_registers_all_tables() -> Result<()> {
3147        let db = db()?;
3148        let out = db.query(
3149            "SELECT COUNT(*) FROM messages \
3150             UNION ALL SELECT COUNT(*) FROM cli_invocations \
3151             UNION ALL SELECT COUNT(*) FROM crashes \
3152             UNION ALL SELECT COUNT(*) FROM ticks \
3153             UNION ALL SELECT COUNT(*) FROM workspaces \
3154             UNION ALL SELECT COUNT(*) FROM sessions \
3155             UNION ALL SELECT COUNT(*) FROM clone_dispatches \
3156             UNION ALL SELECT COUNT(*) FROM harvest_events \
3157             UNION ALL SELECT COUNT(*) FROM communication_events \
3158             UNION ALL SELECT COUNT(*) FROM mcp_tool_calls \
3159             UNION ALL SELECT COUNT(*) FROM git_operations \
3160             UNION ALL SELECT COUNT(*) FROM owner_directives \
3161             UNION ALL SELECT COUNT(*) FROM token_usage \
3162             UNION ALL SELECT COUNT(*) FROM watchdog_events \
3163             UNION ALL SELECT COUNT(*) FROM tasks \
3164             UNION ALL SELECT COUNT(*) FROM source_errors \
3165             UNION ALL SELECT COUNT(*) FROM iroh_events",
3166        )?;
3167        assert!(out.contains('0'));
3168        Ok(())
3169    }
3170
3171    #[test]
3172    fn read_only_path_reports_uninitialized_then_reads_initialized_db() -> Result<()> {
3173        let dir = tempfile::tempdir()?;
3174        let path = dir.path().join("meta.db");
3175
3176        let missing = match Db::open_read_only_path(&path) {
3177            Ok(_) => panic!("missing database opened read-only"),
3178            Err(error) => error.to_string(),
3179        };
3180        assert_eq!(
3181            missing,
3182            format!(
3183                "meta.db has not been initialized at {}; a writer must run first.",
3184                path.display()
3185            )
3186        );
3187        assert!(!path.exists());
3188
3189        let uninitialized = Db::open_path(&path)?;
3190        drop(uninitialized);
3191        let no_schema = match Db::open_read_only_path(&path) {
3192            Ok(_) => panic!("uninitialized database opened read-only"),
3193            Err(error) => error.to_string(),
3194        };
3195        assert_eq!(
3196            no_schema,
3197            format!(
3198                "meta.db has not been initialized at {}; a writer must run first.",
3199                path.display()
3200            )
3201        );
3202
3203        let writer = Db::open_path(&path)?;
3204        writer.migrate()?;
3205        writer.record_message(MessageRecord {
3206            ts_utc: fixed_ts(),
3207            source: "agent",
3208            direction: Direction::Outbound,
3209            chat_id: Some("agent0"),
3210            from_agent: Some("agent5"),
3211            to_agent: Some("agent0"),
3212            body: Some("checkpoint acked"),
3213            raw_json: None,
3214        })?;
3215
3216        let reader = Db::open_read_only_path(&path)?;
3217        assert!(reader.read_only_write_probe_fails()?);
3218        let out = reader.query("SELECT source, chat_id, body FROM messages")?;
3219        assert!(out.contains("agent"));
3220        assert!(out.contains("agent0"));
3221        assert!(out.contains("checkpoint acked"));
3222        Ok(())
3223    }
3224
3225    #[test]
3226    fn migrate_sets_schema_version_7() -> Result<()> {
3227        let db = db()?;
3228        assert_eq!(db.schema_version()?, 7);
3229        Ok(())
3230    }
3231
3232    #[test]
3233    fn migrate_sets_sqlite_user_version_7() -> Result<()> {
3234        let db = db()?;
3235        assert_eq!(
3236            db.with_conn(|conn| async move { user_version(&conn).await })?,
3237            SCHEMA_VERSION
3238        );
3239        Ok(())
3240    }
3241
3242    #[test]
3243    fn hash_peer_id_is_stable_and_truncated() {
3244        let a = hash_peer_id("abc123");
3245        let b = hash_peer_id("abc123");
3246        let c = hash_peer_id("abc124");
3247        assert_eq!(a, b, "same input hashes the same");
3248        assert_ne!(a, c, "different input hashes differently");
3249        assert_eq!(a.len(), 16, "8 bytes hex-encoded = 16 chars");
3250        assert!(a.chars().all(|c| c.is_ascii_hexdigit()));
3251    }
3252
3253    /// Fixed-vector test against RFC 6234 SHA-256 outputs. A silent swap
3254    /// to a different digest (blake3, a different truncation width, a
3255    /// non-URL-safe hex alphabet) would invalidate every historical
3256    /// `iroh_events.peer_id_hash` and break cross-run joins on the
3257    /// analytics surface. Pinning the exact bytes means that drift
3258    /// blows this test, not a midnight query.
3259    #[test]
3260    fn hash_peer_id_matches_known_sha256_vectors() {
3261        // sha256("abc") = ba7816bf8f01cfea414140de5dae2223b00361a396177a9c...
3262        assert_eq!(hash_peer_id("abc"), "ba7816bf8f01cfea");
3263        // sha256("") = e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934c...
3264        assert_eq!(hash_peer_id(""), "e3b0c44298fc1c14");
3265        // sha256("netsky0") — representative "local peer" input
3266        assert_eq!(hash_peer_id("netsky0"), "66863bafcf1591d6");
3267    }
3268
3269    #[test]
3270    fn source_error_round_trip() -> Result<()> {
3271        let db = db()?;
3272        let id = db.record_source_error(SourceErrorRecord {
3273            ts_utc: fixed_ts(),
3274            source: "email",
3275            error_class: SourceErrorClass::Timeout,
3276            count: 1,
3277            detail_json: Some("{\"endpoint\":\"gmail\"}"),
3278        })?;
3279        assert_eq!(id, 1);
3280        db.record_source_error(SourceErrorRecord {
3281            ts_utc: fixed_ts(),
3282            source: "iroh",
3283            error_class: SourceErrorClass::NetworkError,
3284            count: 3,
3285            detail_json: None,
3286        })?;
3287        let out = db.query(
3288            "SELECT source, error_class, SUM(count) AS n FROM source_errors \
3289             GROUP BY source, error_class ORDER BY source",
3290        )?;
3291        assert!(out.contains("email"));
3292        assert!(out.contains("timeout"));
3293        assert!(out.contains("network_error"));
3294        Ok(())
3295    }
3296
3297    #[test]
3298    fn source_error_count_floors_at_one() -> Result<()> {
3299        let db = db()?;
3300        db.record_source_error(SourceErrorRecord {
3301            ts_utc: fixed_ts(),
3302            source: "email",
3303            error_class: SourceErrorClass::Unknown,
3304            count: 0,
3305            detail_json: None,
3306        })?;
3307        let out = db.query("SELECT count FROM source_errors")?;
3308        assert!(out.contains('1'));
3309        Ok(())
3310    }
3311
3312    #[test]
3313    fn source_cursor_round_trip() -> Result<()> {
3314        let db = db()?;
3315        assert_eq!(db.read_source_cursor("imessage")?, None);
3316        db.update_source_cursor("imessage", "42")?;
3317        assert_eq!(db.read_source_cursor("imessage")?.as_deref(), Some("42"));
3318        // Upsert semantics: second write replaces the value.
3319        db.update_source_cursor("imessage", "99")?;
3320        assert_eq!(db.read_source_cursor("imessage")?.as_deref(), Some("99"));
3321        // Sources are scoped independently.
3322        db.update_source_cursor("email", "abc")?;
3323        let all = db.list_source_cursors()?;
3324        assert_eq!(all.len(), 2);
3325        assert_eq!(all[0].source, "email");
3326        assert_eq!(all[0].cursor_value, "abc");
3327        assert_eq!(all[1].source, "imessage");
3328        assert_eq!(all[1].cursor_value, "99");
3329        // Reset drops the row.
3330        assert!(db.reset_source_cursor("imessage")?);
3331        assert_eq!(db.read_source_cursor("imessage")?, None);
3332        assert!(!db.reset_source_cursor("imessage")?);
3333        Ok(())
3334    }
3335
3336    #[test]
3337    fn event_insert_then_transition_round_trip() -> Result<()> {
3338        let db = db()?;
3339        let delivered = db.insert_event("imessage", fixed_ts(), r#"{"chat":"a"}"#)?;
3340        let failed = db.insert_event("imessage", fixed_ts(), r#"{"chat":"b"}"#)?;
3341        assert!(delivered > 0 && failed > delivered);
3342        db.update_event_delivery(delivered, EventStatus::Delivered, None)?;
3343        db.update_event_delivery(failed, EventStatus::Failed, Some("adapter timed out"))?;
3344        // tail_events returns newest first and surfaces delivery status + reason.
3345        let rows = db.tail_events("imessage", 10)?;
3346        assert_eq!(rows.len(), 2);
3347        assert_eq!(rows[0].id, failed);
3348        assert_eq!(rows[0].delivery_status, "failed");
3349        assert_eq!(rows[0].reason.as_deref(), Some("adapter timed out"));
3350        assert_eq!(rows[1].id, delivered);
3351        assert_eq!(rows[1].delivery_status, "delivered");
3352        assert_eq!(rows[1].reason, None);
3353        // Events from a different source are isolated.
3354        let other = db.insert_event("email", fixed_ts(), "{}")?;
3355        assert!(other > failed);
3356        let only_imessage = db.tail_events("imessage", 10)?;
3357        assert_eq!(only_imessage.len(), 2);
3358        Ok(())
3359    }
3360
3361    #[test]
3362    fn iroh_event_round_trip() -> Result<()> {
3363        let db = db()?;
3364        let hash = hash_peer_id("raw_node_id_abc");
3365        let id = db.record_iroh_event(IrohEventRecord {
3366            ts_utc: fixed_ts(),
3367            event_type: IrohEventType::Connect,
3368            peer_id_hash: &hash,
3369            peer_label: Some("work"),
3370            detail_json: None,
3371        })?;
3372        assert_eq!(id, 1);
3373        db.record_iroh_event(IrohEventRecord {
3374            ts_utc: fixed_ts(),
3375            event_type: IrohEventType::HandshakeRefused,
3376            peer_id_hash: &hash_peer_id("unknown_peer"),
3377            peer_label: None,
3378            detail_json: Some("{\"reason\":\"not_in_allowlist\"}"),
3379        })?;
3380        let out =
3381            db.query("SELECT event_type, peer_id_hash, peer_label FROM iroh_events ORDER BY id")?;
3382        assert!(out.contains("connect"));
3383        assert!(out.contains("handshake_refused"));
3384        assert!(out.contains("work"));
3385        assert!(!out.contains("raw_node_id_abc"), "raw node id leaked");
3386        Ok(())
3387    }
3388
3389    #[test]
3390    fn token_usage_includes_runtime_column() -> Result<()> {
3391        let db = db()?;
3392        db.record_token_usage(TokenUsageRecord {
3393            ts_utc: fixed_ts(),
3394            session_id: Some("s1"),
3395            agent: Some("agent12"),
3396            runtime: Some("claude"),
3397            model: Some("claude-opus-4-7"),
3398            input_tokens: Some(1000),
3399            output_tokens: Some(500),
3400            cached_input_tokens: None,
3401            cost_usd_micros: Some(12_500),
3402            detail_json: None,
3403        })?;
3404        let out =
3405            db.query("SELECT runtime, SUM(input_tokens) AS ins FROM token_usage GROUP BY runtime")?;
3406        assert!(out.contains("claude"));
3407        assert!(out.contains("1000"));
3408        Ok(())
3409    }
3410
3411    #[test]
3412    fn token_usage_batch_ignores_duplicate_claude_session_ingest_rows() -> Result<()> {
3413        let db = db()?;
3414        let detail = r#"{"source":"claude-sessions","path":"/tmp/session.jsonl","line":7}"#;
3415        let records = vec![
3416            TokenUsageRecord {
3417                ts_utc: fixed_ts(),
3418                session_id: Some("session-1"),
3419                agent: Some("agent12"),
3420                runtime: Some("claude"),
3421                model: Some("claude-sonnet-4"),
3422                input_tokens: Some(10),
3423                output_tokens: Some(5),
3424                cached_input_tokens: None,
3425                cost_usd_micros: None,
3426                detail_json: Some(detail),
3427            },
3428            TokenUsageRecord {
3429                ts_utc: fixed_ts(),
3430                session_id: Some("session-1"),
3431                agent: Some("agent12"),
3432                runtime: Some("claude"),
3433                model: Some("claude-sonnet-4"),
3434                input_tokens: Some(10),
3435                output_tokens: Some(5),
3436                cached_input_tokens: None,
3437                cost_usd_micros: None,
3438                detail_json: Some(detail),
3439            },
3440        ];
3441        let written = db.record_token_usage_batch(records)?;
3442        assert_eq!(written, 1);
3443        let out = db.query("SELECT COUNT(*) AS c, MAX(id) AS hi FROM token_usage")?;
3444        assert!(out.contains('1'), "{out}");
3445        Ok(())
3446    }
3447
    /// End-to-end check of the v6 → v7 token_usage dedupe migration:
    /// hand-build a v6-shaped database containing two rows that differ
    /// only by id and timestamp but share the same claude-sessions
    /// ingest provenance (path + line in `detail_json`), then migrate
    /// and expect a single surviving row plus a pre-migration backup
    /// file next to the database.
    #[test]
    fn migration_from_v6_dedupes_existing_claude_session_rows() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let path = dir.path().join("meta.db");
        let db = Db::open_path(&path)?;

        // Simulate a v6 database by creating the minimal meta/ids/token_usage
        // layout by hand instead of running migrate() first.
        db.with_conn(|conn| async move {
            conn.execute_batch(
                "PRAGMA synchronous=NORMAL; \
                 CREATE TABLE IF NOT EXISTS meta (key TEXT PRIMARY KEY, value INTEGER NOT NULL); \
                 CREATE TABLE IF NOT EXISTS ids (name TEXT PRIMARY KEY, id INTEGER NOT NULL); \
                 CREATE TABLE IF NOT EXISTS token_usage (id INTEGER PRIMARY KEY, row_json TEXT NOT NULL);",
            )
            .await?;
            // Stamp both version stores at 6 so migrate() runs the v7 step.
            conn.execute(
                "INSERT INTO meta (key, value) VALUES ('schema_version', 6)",
                (),
            )
            .await?;
            conn.execute("PRAGMA user_version = 6", ()).await?;
            conn.execute("INSERT INTO ids (name, id) VALUES ('token_usage', 2)", ())
                .await?;
            // Row 1: claude-sessions ingest of /tmp/session.jsonl line 8.
            conn.execute(
                "INSERT INTO token_usage (id, row_json) VALUES (?1, ?2)",
                params_from_iter([
                    Value::from(1),
                    Value::from(
                        r#"{"id":1,"ts_utc":"2026-04-18T00:00:00Z","session_id":"s1","agent":"agent1","runtime":"claude","model":"claude-sonnet-4","input_tokens":10,"output_tokens":3,"cached_input_tokens":null,"cost_usd_micros":null,"detail_json":"{\"source\":\"claude-sessions\",\"path\":\"/tmp/session.jsonl\",\"line\":8}"}"#,
                    ),
                ]),
            )
            .await?;
            // Row 2: same provenance (same path + line), different id/ts —
            // the exact shape the dedupe migration must collapse.
            conn.execute(
                "INSERT INTO token_usage (id, row_json) VALUES (?1, ?2)",
                params_from_iter([
                    Value::from(2),
                    Value::from(
                        r#"{"id":2,"ts_utc":"2026-04-18T00:01:00Z","session_id":"s1","agent":"agent1","runtime":"claude","model":"claude-sonnet-4","input_tokens":10,"output_tokens":3,"cached_input_tokens":null,"cost_usd_micros":null,"detail_json":"{\"source\":\"claude-sessions\",\"path\":\"/tmp/session.jsonl\",\"line\":8}"}"#,
                    ),
                ]),
            )
            .await?;
            Ok(())
        })?;

        // Sanity: both rows are visible before migrating.
        let before = db.query("SELECT COUNT(*) AS c FROM source_errors")?;
        assert!(before.contains('2'), "{before}");

        db.migrate()?;

        // One row survives, and the migration left a backup of the old file.
        let after = db.query("SELECT COUNT(*) AS c, MAX(id) AS hi FROM token_usage")?;
        assert!(after.contains('1'), "{after}");
        assert!(path.with_extension("db.bak.v7-token-usage-dedupe").exists());
        Ok(())
    }
3503
3504    #[test]
3505    fn clone_lifetimes_view_reads_finished_dispatches() -> Result<()> {
3506        let db = db()?;
3507        db.record_clone_dispatch(CloneDispatchRecord {
3508            ts_utc_start: fixed_ts(),
3509            ts_utc_end: Some(fixed_ts() + chrono::Duration::minutes(7)),
3510            agent_id: "agent5",
3511            runtime: Some("claude"),
3512            brief_path: None,
3513            brief: None,
3514            workspace: Some("workspaces/foo/repo"),
3515            branch: Some("feat/foo"),
3516            status: Some("done"),
3517            exit_code: Some(0),
3518            detail_json: None,
3519        })?;
3520        db.record_clone_dispatch(CloneDispatchRecord {
3521            ts_utc_start: fixed_ts(),
3522            ts_utc_end: None,
3523            agent_id: "agent6",
3524            runtime: Some("codex"),
3525            brief_path: None,
3526            brief: None,
3527            workspace: None,
3528            branch: None,
3529            status: Some("in_flight"),
3530            exit_code: None,
3531            detail_json: None,
3532        })?;
3533        let out = db.query("SELECT agent_id, runtime FROM clone_lifetimes")?;
3534        assert!(out.contains("agent5"));
3535        assert!(
3536            !out.contains("agent6"),
3537            "in-flight dispatches stay excluded"
3538        );
3539        Ok(())
3540    }
3541
    /// Upgrading a v4-stamped database must preserve existing rows and
    /// leave all newer tables present and queryable. Uses a real on-disk
    /// file so the second `open_path` exercises the reopen path.
    #[test]
    fn migration_from_v4_preserves_existing_rows_and_adds_new_tables() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let path = dir.path().join("meta.db");

        // Scope the first handle so it is dropped before reopening below.
        {
            let db = Db::open_path(&path)?;
            db.migrate()?;
            // Simulate an older binary by stamping down to v4 after
            // migration. The tables will still exist (they're all
            // created unconditionally), but the next migrate() call
            // must bump the version back to SCHEMA_VERSION without
            // losing data.
            db.with_conn(|conn| async move {
                conn.execute("UPDATE meta SET value = 4 WHERE key = 'schema_version'", ())
                    .await?;
                conn.execute("PRAGMA user_version = 4", ()).await?;
                Ok(())
            })?;
            // Write a row while still stamped at v4 so we can prove the
            // upgrade does not drop it.
            db.record_message(MessageRecord {
                ts_utc: fixed_ts(),
                source: "imessage",
                direction: Direction::Inbound,
                chat_id: Some("chat-1"),
                from_agent: None,
                to_agent: None,
                body: Some("pre-migration"),
                raw_json: None,
            })?;
        }

        // Reopen and migrate v4 -> current.
        let db = Db::open_path(&path)?;
        db.migrate()?;
        assert_eq!(db.schema_version()?, 7);

        let out = db.query("SELECT body FROM messages")?;
        assert!(
            out.contains("pre-migration"),
            "migration lost pre-existing row: {out}"
        );
        // New tables are queryable after migrating up.
        let se = db.query("SELECT COUNT(*) FROM source_errors")?;
        assert!(se.contains('0'));
        let ie = db.query("SELECT COUNT(*) FROM iroh_events")?;
        assert!(ie.contains('0'));
        // v6 additions stay queryable after the v7 migration and start
        // empty when read through the typed helpers.
        assert!(db.list_source_cursors()?.is_empty());
        assert!(db.tail_events("imessage", 10)?.is_empty());
        Ok(())
    }
3593}