use std::collections::HashSet;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use chrono::{DateTime, SecondsFormat, Utc};
use datafusion::arrow::array::{BooleanArray, Int64Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::datasource::MemTable;
use datafusion::prelude::SessionContext;
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use thiserror::Error;
use tokio::runtime::Runtime;
use turso::{Builder, Connection, Value, params_from_iter};
pub type Result<T> = std::result::Result<T, Error>;
pub use datafusion::arrow::array as arrow_array;
pub use datafusion::arrow::record_batch::RecordBatch as ArrowRecordBatch;
pub const SCHEMA_VERSION: i64 = 6;
const TABLE_NAMES: [&str; 17] = [
"messages",
"cli_invocations",
"crashes",
"ticks",
"workspaces",
"sessions",
"clone_dispatches",
"harvest_events",
"communication_events",
"mcp_tool_calls",
"git_operations",
"owner_directives",
"token_usage",
"watchdog_events",
"netsky_tasks",
"source_errors",
"iroh_events",
];
const MESSAGES: &str = "messages";
const CLI_INVOCATIONS: &str = "cli_invocations";
const CRASHES: &str = "crashes";
const TICKS: &str = "ticks";
const WORKSPACES: &str = "workspaces";
const SESSIONS: &str = "sessions";
const CLONE_DISPATCHES: &str = "clone_dispatches";
const HARVEST_EVENTS: &str = "harvest_events";
const COMMUNICATION_EVENTS: &str = "communication_events";
const MCP_TOOL_CALLS: &str = "mcp_tool_calls";
const GIT_OPERATIONS: &str = "git_operations";
const OWNER_DIRECTIVES: &str = "owner_directives";
const TOKEN_USAGE: &str = "token_usage";
const WATCHDOG_EVENTS: &str = "watchdog_events";
const TASKS: &str = "netsky_tasks";
const SOURCE_ERRORS: &str = "source_errors";
const IROH_EVENTS: &str = "iroh_events";
const DATAFUSION_TABLES: [(&str, &str); 17] = [
("messages", MESSAGES),
("cli_invocations", CLI_INVOCATIONS),
("crashes", CRASHES),
("ticks", TICKS),
("workspaces", WORKSPACES),
("sessions", SESSIONS),
("clone_dispatches", CLONE_DISPATCHES),
("harvest_events", HARVEST_EVENTS),
("communication_events", COMMUNICATION_EVENTS),
("mcp_tool_calls", MCP_TOOL_CALLS),
("git_operations", GIT_OPERATIONS),
("owner_directives", OWNER_DIRECTIVES),
("token_usage", TOKEN_USAGE),
("watchdog_events", WATCHDOG_EVENTS),
("tasks", TASKS),
("source_errors", SOURCE_ERRORS),
("iroh_events", IROH_EVENTS),
];
const CLONE_LIFETIMES_VIEW: &str = "clone_lifetimes";
const INDEXED_TIME_COLUMNS: &[(&str, &str)] = &[
(MESSAGES, "ts_utc"),
(CLI_INVOCATIONS, "ts_utc"),
(CRASHES, "ts_utc"),
(TICKS, "ts_utc"),
(WORKSPACES, "ts_utc_created"),
(SESSIONS, "ts_utc"),
(CLONE_DISPATCHES, "ts_utc_start"),
(HARVEST_EVENTS, "ts_utc"),
(COMMUNICATION_EVENTS, "ts_utc"),
(MCP_TOOL_CALLS, "ts_utc_start"),
(GIT_OPERATIONS, "ts_utc"),
(OWNER_DIRECTIVES, "ts_utc"),
(TOKEN_USAGE, "ts_utc"),
(WATCHDOG_EVENTS, "ts_utc"),
(TASKS, "created_at"),
(SOURCE_ERRORS, "ts_utc"),
(IROH_EVENTS, "ts_utc"),
];
const INDEXED_EQ_COLUMNS: &[(&str, &str)] = &[
(TASKS, "status"),
(TASKS, "priority"),
(MESSAGES, "source"),
(SESSIONS, "event"),
(IROH_EVENTS, "event_type"),
];
/// Unified error type for every meta.db operation in this module.
#[derive(Debug, Error)]
pub enum Error {
/// `dirs::home_dir()` returned `None`; we cannot locate `~/.netsky`.
#[error("home directory not found")]
HomeDirMissing,
/// The on-disk schema was written by a newer binary; refuse to touch it.
#[error("schema version {found} is newer than supported {supported}")]
FutureSchemaVersion { found: i64, supported: i64 },
/// A read-only open was attempted before any writer created the database.
#[error("meta.db has not been initialized at {0}; a writer must run first.")]
NotInitialized(PathBuf),
/// Sanity check failed: a connection requested read-only accepted writes.
#[error("read-only connection was writable at {0}")]
ReadOnlyNotEnforced(PathBuf),
#[error(transparent)]
Turso(#[from] turso::Error),
#[error(transparent)]
DataFusion(#[from] datafusion::error::DataFusionError),
#[error(transparent)]
Arrow(#[from] datafusion::arrow::error::ArrowError),
#[error(transparent)]
Serde(#[from] serde_json::Error),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
Chrono(#[from] chrono::ParseError),
/// Generic "expected a row/value and it was absent" error; payload names it.
#[error("{0} not found")]
NotFound(String),
}
/// Message flow relative to this node.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Direction {
    Inbound,
    Outbound,
}

impl Direction {
    /// Lowercase label persisted in the `direction` column.
    fn as_str(self) -> &'static str {
        match self {
            Self::Inbound => "inbound",
            Self::Outbound => "outbound",
        }
    }
}
/// Lifecycle marker recorded against an agent session.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SessionEvent {
    Up,
    Down,
    Note,
}

impl SessionEvent {
    /// Lowercase label persisted in the `event` column.
    fn as_str(self) -> &'static str {
        match self {
            Self::Up => "up",
            Self::Down => "down",
            Self::Note => "note",
        }
    }
}
/// Delivery state of a row in the `events` outbox table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum EventStatus {
    Pending,
    Delivered,
    Failed,
}

impl EventStatus {
    /// Lowercase label persisted in the `delivery_status` column.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Pending => "pending",
            Self::Delivered => "delivered",
            Self::Failed => "failed",
        }
    }
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SourceCursorRow {
pub source: String,
pub cursor_value: String,
pub updated_at: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EventRow {
pub id: i64,
pub source: String,
pub ts_utc: String,
pub payload_json: String,
pub delivery_status: String,
pub reason: Option<String>,
}
/// Coarse classification of a data-source failure, used for aggregation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SourceErrorClass {
    Timeout,
    AuthFailure,
    RateLimit,
    NetworkError,
    ProtocolError,
    NotFound,
    PermissionDenied,
    Unknown,
}

impl SourceErrorClass {
    /// Snake-case label persisted in the `error_class` column.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Timeout => "timeout",
            Self::AuthFailure => "auth_failure",
            Self::RateLimit => "rate_limit",
            Self::NetworkError => "network_error",
            Self::ProtocolError => "protocol_error",
            Self::NotFound => "not_found",
            Self::PermissionDenied => "permission_denied",
            Self::Unknown => "unknown",
        }
    }
}
/// Peer-connection lifecycle event recorded in `iroh_events`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum IrohEventType {
    Connect,
    Evict,
    Reconnect,
    HandshakeRefused,
}

impl IrohEventType {
    /// Snake-case label persisted in the `event_type` column.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Connect => "connect",
            Self::Evict => "evict",
            Self::Reconnect => "reconnect",
            Self::HandshakeRefused => "handshake_refused",
        }
    }
}
/// Derive a short, stable pseudonym for a raw iroh node id: the first 8 bytes
/// of SHA-256(raw_node_id), lowercase-hex encoded (16 characters).
pub fn hash_peer_id(raw_node_id: &str) -> String {
    use sha2::{Digest, Sha256};
    // One-shot digest is equivalent to new() + update() + finalize().
    let digest = Sha256::digest(raw_node_id.as_bytes());
    hex_encode(&digest[..8])
}
/// Encode a byte slice as lowercase hexadecimal, two characters per byte.
fn hex_encode(bytes: &[u8]) -> String {
    use std::fmt::Write;
    let mut out = String::with_capacity(bytes.len() * 2);
    for byte in bytes {
        // Writing into a String is infallible; the Result is formal only.
        let _ = write!(out, "{byte:02x}");
    }
    out
}
/// Handle to the meta.db database. Only the path is stored; actual
/// connections are presumably opened per call via `with_conn` /
/// `with_read_only_conn` (helpers defined outside this view — confirm).
pub struct Db {
path: PathBuf,
}
pub struct MessageRecord<'a> {
pub ts_utc: DateTime<Utc>,
pub source: &'a str,
pub direction: Direction,
pub chat_id: Option<&'a str>,
pub from_agent: Option<&'a str>,
pub to_agent: Option<&'a str>,
pub body: Option<&'a str>,
pub raw_json: Option<&'a str>,
}
pub struct CloneDispatchRecord<'a> {
pub ts_utc_start: DateTime<Utc>,
pub ts_utc_end: Option<DateTime<Utc>>,
pub agent_id: &'a str,
pub runtime: Option<&'a str>,
pub brief_path: Option<&'a str>,
pub brief: Option<&'a str>,
pub workspace: Option<&'a str>,
pub branch: Option<&'a str>,
pub status: Option<&'a str>,
pub exit_code: Option<i64>,
pub detail_json: Option<&'a str>,
}
pub struct HarvestEventRecord<'a> {
pub ts_utc: DateTime<Utc>,
pub source_branch: &'a str,
pub target_branch: &'a str,
pub commit_sha: Option<&'a str>,
pub status: &'a str,
pub conflicts: Option<&'a str>,
pub detail_json: Option<&'a str>,
}
pub struct CommunicationEventRecord<'a> {
pub ts_utc: DateTime<Utc>,
pub source: &'a str,
pub tool: Option<&'a str>,
pub direction: Direction,
pub chat_id: Option<&'a str>,
pub message_id: Option<&'a str>,
pub handle: Option<&'a str>,
pub agent: Option<&'a str>,
pub body: Option<&'a str>,
pub status: Option<&'a str>,
pub detail_json: Option<&'a str>,
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct CommunicationEvent {
pub id: i64,
pub ts_utc: DateTime<Utc>,
pub source: String,
pub tool: Option<String>,
pub direction: String,
pub chat_id: Option<String>,
pub message_id: Option<String>,
pub handle: Option<String>,
pub agent: Option<String>,
pub body: Option<String>,
pub status: Option<String>,
pub detail_json: Option<String>,
}
pub struct McpToolCallRecord<'a> {
pub ts_utc_start: DateTime<Utc>,
pub ts_utc_end: Option<DateTime<Utc>>,
pub source: &'a str,
pub tool: &'a str,
pub agent: Option<&'a str>,
pub duration_ms: Option<i64>,
pub success: bool,
pub error: Option<&'a str>,
pub timeout_race: bool,
pub request_json: Option<&'a str>,
pub response_json: Option<&'a str>,
}
pub struct GitOperationRecord<'a> {
pub ts_utc: DateTime<Utc>,
pub operation: &'a str,
pub repo: &'a str,
pub branch: Option<&'a str>,
pub remote: Option<&'a str>,
pub from_sha: Option<&'a str>,
pub to_sha: Option<&'a str>,
pub status: &'a str,
pub detail_json: Option<&'a str>,
}
pub struct OwnerDirectiveRecord<'a> {
pub ts_utc: DateTime<Utc>,
pub source: &'a str,
pub chat_id: Option<&'a str>,
pub raw_text: &'a str,
pub resolved_action: Option<&'a str>,
pub agent: Option<&'a str>,
pub status: Option<&'a str>,
pub detail_json: Option<&'a str>,
}
pub struct TokenUsageRecord<'a> {
pub ts_utc: DateTime<Utc>,
pub session_id: Option<&'a str>,
pub agent: Option<&'a str>,
pub runtime: Option<&'a str>,
pub model: Option<&'a str>,
pub input_tokens: Option<i64>,
pub output_tokens: Option<i64>,
pub cached_input_tokens: Option<i64>,
pub cost_usd_micros: Option<i64>,
pub detail_json: Option<&'a str>,
}
pub struct SourceErrorRecord<'a> {
pub ts_utc: DateTime<Utc>,
pub source: &'a str,
pub error_class: SourceErrorClass,
pub count: i64,
pub detail_json: Option<&'a str>,
}
pub struct IrohEventRecord<'a> {
pub ts_utc: DateTime<Utc>,
pub event_type: IrohEventType,
pub peer_id_hash: &'a str,
pub peer_label: Option<&'a str>,
pub detail_json: Option<&'a str>,
}
pub struct WatchdogEventRecord<'a> {
pub ts_utc: DateTime<Utc>,
pub event: &'a str,
pub agent: Option<&'a str>,
pub severity: Option<&'a str>,
pub status: Option<&'a str>,
pub detail_json: Option<&'a str>,
}
pub struct TaskRecord<'a> {
pub title: &'a str,
pub body: Option<&'a str>,
pub status: &'a str,
pub priority: Option<&'a str>,
pub labels: Option<&'a str>,
pub source: Option<&'a str>,
pub source_ref: Option<&'a str>,
pub closed_at: Option<&'a str>,
pub closed_reason: Option<&'a str>,
pub closed_evidence: Option<&'a str>,
pub agent: Option<&'a str>,
pub sync_calendar: bool,
pub calendar_task_id: Option<&'a str>,
}
pub struct TaskUpdate<'a> {
pub id: i64,
pub status: Option<&'a str>,
pub priority: Option<&'a str>,
pub agent: Option<&'a str>,
pub closed_reason: Option<&'a str>,
pub closed_evidence: Option<&'a str>,
pub sync_calendar: Option<bool>,
pub calendar_task_id: Option<&'a str>,
}
#[derive(Debug, Serialize, Deserialize)]
struct MessageRow {
id: i64,
ts_utc: String,
source: String,
direction: String,
chat_id: Option<String>,
from_agent: Option<String>,
to_agent: Option<String>,
body: Option<String>,
raw_json: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
struct CliRow {
id: i64,
ts_utc: String,
bin: String,
argv_json: String,
exit_code: Option<i64>,
duration_ms: Option<i64>,
host: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct CrashRow {
id: i64,
ts_utc: String,
kind: String,
agent: String,
detail_json: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct TickRow {
id: i64,
ts_utc: String,
source: String,
detail_json: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct WorkspaceRow {
id: i64,
ts_utc_created: String,
name: String,
branch: String,
ts_utc_deleted: Option<String>,
verdict: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
struct SessionRow {
id: i64,
ts_utc: String,
agent: String,
session_num: i64,
event: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct CloneDispatchRow {
id: i64,
ts_utc_start: String,
ts_utc_end: Option<String>,
agent_id: String,
runtime: Option<String>,
brief_path: Option<String>,
brief: Option<String>,
workspace: Option<String>,
branch: Option<String>,
status: Option<String>,
exit_code: Option<i64>,
detail_json: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
struct HarvestEventRow {
id: i64,
ts_utc: String,
source_branch: String,
target_branch: String,
commit_sha: Option<String>,
status: String,
conflicts: Option<String>,
detail_json: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
struct CommunicationEventRow {
id: i64,
ts_utc: String,
source: String,
tool: Option<String>,
direction: String,
chat_id: Option<String>,
message_id: Option<String>,
handle: Option<String>,
agent: Option<String>,
body: Option<String>,
status: Option<String>,
detail_json: Option<String>,
}
impl TryFrom<CommunicationEventRow> for CommunicationEvent {
type Error = Error;
fn try_from(row: CommunicationEventRow) -> Result<Self> {
Ok(Self {
id: row.id,
ts_utc: parse_ts(&row.ts_utc)?,
source: row.source,
tool: row.tool,
direction: row.direction,
chat_id: row.chat_id,
message_id: row.message_id,
handle: row.handle,
agent: row.agent,
body: row.body,
status: row.status,
detail_json: row.detail_json,
})
}
}
#[derive(Debug, Serialize, Deserialize)]
struct McpToolCallRow {
id: i64,
ts_utc_start: String,
ts_utc_end: Option<String>,
source: String,
tool: String,
agent: Option<String>,
duration_ms: Option<i64>,
success: bool,
error: Option<String>,
timeout_race: bool,
request_json: Option<String>,
response_json: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
struct GitOperationRow {
id: i64,
ts_utc: String,
operation: String,
repo: String,
branch: Option<String>,
remote: Option<String>,
from_sha: Option<String>,
to_sha: Option<String>,
status: String,
detail_json: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
struct OwnerDirectiveRow {
id: i64,
ts_utc: String,
source: String,
chat_id: Option<String>,
raw_text: String,
resolved_action: Option<String>,
agent: Option<String>,
status: Option<String>,
detail_json: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
struct TokenUsageRow {
id: i64,
ts_utc: String,
session_id: Option<String>,
agent: Option<String>,
#[serde(default)]
runtime: Option<String>,
model: Option<String>,
input_tokens: Option<i64>,
output_tokens: Option<i64>,
cached_input_tokens: Option<i64>,
cost_usd_micros: Option<i64>,
detail_json: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
struct SourceErrorRow {
id: i64,
ts_utc: String,
source: String,
error_class: String,
count: i64,
detail_json: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
struct IrohEventRow {
id: i64,
ts_utc: String,
event_type: String,
peer_id_hash: String,
peer_label: Option<String>,
detail_json: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WatchdogEventRow {
id: i64,
ts_utc: String,
event: String,
agent: Option<String>,
severity: Option<String>,
status: Option<String>,
detail_json: Option<String>,
}
/// Public shape of one `netsky_tasks` row, stored as JSON in `row_json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskRow {
pub id: i64,
pub created_at: String,
pub updated_at: String,
pub title: String,
pub body: Option<String>,
pub status: String,
pub priority: Option<String>,
pub labels: Option<String>,
pub source: Option<String>,
pub source_ref: Option<String>,
pub closed_at: Option<String>,
pub closed_reason: Option<String>,
pub closed_evidence: Option<String>,
pub agent: Option<String>,
// `#[serde(default)]` keeps rows written before these fields existed
// deserializable: missing keys become false / None instead of erroring.
#[serde(default)]
pub sync_calendar: bool,
#[serde(default)]
pub calendar_task_id: Option<String>,
}
impl Db {
pub fn open() -> Result<Self> {
// Writable handle at ~/.netsky/meta.db; creates the directory if missing.
let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
let dir = home.join(".netsky");
fs::create_dir_all(&dir)?;
Self::open_path(dir.join("meta.db"))
}
pub fn open_read_only() -> Result<Self> {
// Read-only handle at ~/.netsky/meta.db; fails if a writer never ran.
let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
Self::open_read_only_path(home.join(".netsky").join("meta.db"))
}
pub fn open_path(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().to_path_buf();
// let-chain: only create a parent directory for real files — the special
// ":memory:" path has no meaningful parent on disk.
if path != Path::new(":memory:")
&& let Some(parent) = path.parent()
{
fs::create_dir_all(parent)?;
}
let db = Self { path };
// Probe with a no-op connection so open failures surface here, not later.
db.with_conn(|_| async { Ok(()) })?;
Ok(db)
}
pub fn open_read_only_path(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().to_path_buf();
// A read-only open of a nonexistent file would create an empty database;
// require a writer to have initialized it first.
if path != Path::new(":memory:") && !path.exists() {
return Err(Error::NotInitialized(path));
}
let db = Self { path };
// Probe with a no-op read-only connection so failures surface at open time.
db.with_read_only_conn(|_| async { Ok(()) })?;
Ok(db)
}
#[cfg(test)]
pub(crate) fn open_in_memory() -> Result<Self> {
let path = std::env::temp_dir().join(format!(
"netsky-db-test-{}-{}.db",
std::process::id(),
Utc::now().timestamp_nanos_opt().unwrap_or_default()
));
Self::open_path(path)
}
pub fn migrate(&self) -> Result<()> {
// Idempotent schema migration: every statement is CREATE ... IF NOT EXISTS
// or an upsert, so re-running on an up-to-date database is a no-op.
self.with_conn(|conn| async move {
let current = schema_version(&conn).await?;
// Refuse to touch a database stamped by a newer binary.
if current > SCHEMA_VERSION {
return Err(Error::FutureSchemaVersion {
found: current,
supported: SCHEMA_VERSION,
});
}
// Record the supported version in the `meta` key/value table.
// NOTE(review): `meta` is not created in this function — confirm it is
// created elsewhere (e.g. inside with_conn/schema_version) before this runs.
conn.execute(
"INSERT INTO meta (key, value) VALUES (?1, ?2) \
ON CONFLICT(key) DO UPDATE SET value = excluded.value",
params_from_iter([Value::from("schema_version"), Value::from(SCHEMA_VERSION)]),
)
.await?;
// Every event table shares one shape: integer id + JSON document.
for table in TABLE_NAMES {
let sql = format!(
"CREATE TABLE IF NOT EXISTS {table} \
(id INTEGER PRIMARY KEY, row_json TEXT NOT NULL)"
);
conn.execute(&sql, ()).await?;
}
// Expression indexes over json_extract() so time-range queries on the
// JSON documents can use an index instead of scanning row_json.
for &(table, column) in INDEXED_TIME_COLUMNS {
let sql = format!(
"CREATE INDEX IF NOT EXISTS idx_{table}_{column} \
ON {table}(json_extract(row_json, '$.{column}'))"
);
conn.execute(&sql, ()).await?;
}
// Same technique for the equality-filtered columns (status, source, ...).
for &(table, column) in INDEXED_EQ_COLUMNS {
let sql = format!(
"CREATE INDEX IF NOT EXISTS idx_{table}_{column} \
ON {table}(json_extract(row_json, '$.{column}'))"
);
conn.execute(&sql, ()).await?;
}
// Per-source ingestion cursors (one row per source).
conn.execute(
"CREATE TABLE IF NOT EXISTS source_cursors ( \
source TEXT PRIMARY KEY, \
cursor_value TEXT NOT NULL, \
updated_at TEXT NOT NULL \
)",
(),
)
.await?;
// Outbox-style events table with its own autoincrement id.
conn.execute(
"CREATE TABLE IF NOT EXISTS events ( \
id INTEGER PRIMARY KEY AUTOINCREMENT, \
source TEXT NOT NULL, \
ts_utc TEXT NOT NULL, \
payload_json TEXT NOT NULL, \
delivery_status TEXT NOT NULL, \
reason TEXT \
)",
(),
)
.await?;
// Stamp the file-level schema version last, after all DDL succeeded.
conn.execute(&format!("PRAGMA user_version = {SCHEMA_VERSION}"), ())
.await?;
Ok(())
})
}
pub fn record_message(&self, record: MessageRecord<'_>) -> Result<i64> {
self.insert_json("messages", MESSAGES, |id| MessageRow {
id,
ts_utc: ts(record.ts_utc),
source: record.source.to_string(),
direction: record.direction.as_str().to_string(),
chat_id: record.chat_id.map(str::to_string),
from_agent: record.from_agent.map(str::to_string),
to_agent: record.to_agent.map(str::to_string),
body: record.body.map(str::to_string),
raw_json: record.raw_json.map(str::to_string),
})
}
pub fn record_cli(
&self,
ts_utc: DateTime<Utc>,
bin: &str,
argv_json: &str,
exit_code: Option<i64>,
duration_ms: Option<i64>,
host: &str,
) -> Result<i64> {
self.insert_json("cli_invocations", CLI_INVOCATIONS, |id| CliRow {
id,
ts_utc: ts(ts_utc),
bin: bin.to_string(),
argv_json: argv_json.to_string(),
exit_code,
duration_ms,
host: host.to_string(),
})
}
pub fn record_crash(
&self,
ts_utc: DateTime<Utc>,
kind: &str,
agent: &str,
detail_json: &str,
) -> Result<i64> {
self.insert_json("crashes", CRASHES, |id| CrashRow {
id,
ts_utc: ts(ts_utc),
kind: kind.to_string(),
agent: agent.to_string(),
detail_json: detail_json.to_string(),
})
}
pub fn record_tick(
&self,
ts_utc: DateTime<Utc>,
source: &str,
detail_json: &str,
) -> Result<i64> {
self.insert_json("ticks", TICKS, |id| TickRow {
id,
ts_utc: ts(ts_utc),
source: source.to_string(),
detail_json: detail_json.to_string(),
})
}
pub fn record_workspace(
&self,
ts_utc_created: DateTime<Utc>,
name: &str,
branch: &str,
ts_utc_deleted: Option<DateTime<Utc>>,
verdict: Option<&str>,
) -> Result<i64> {
self.insert_json("workspaces", WORKSPACES, |id| WorkspaceRow {
id,
ts_utc_created: ts(ts_utc_created),
name: name.to_string(),
branch: branch.to_string(),
ts_utc_deleted: ts_utc_deleted.map(ts),
verdict: verdict.map(str::to_string),
})
}
pub fn record_session(
&self,
ts_utc: DateTime<Utc>,
agent: &str,
session_num: i64,
event: SessionEvent,
) -> Result<i64> {
self.insert_json("sessions", SESSIONS, |id| SessionRow {
id,
ts_utc: ts(ts_utc),
agent: agent.to_string(),
session_num,
event: event.as_str().to_string(),
})
}
pub fn record_clone_dispatch(&self, record: CloneDispatchRecord<'_>) -> Result<i64> {
self.insert_json("clone_dispatches", CLONE_DISPATCHES, |id| {
CloneDispatchRow {
id,
ts_utc_start: ts(record.ts_utc_start),
ts_utc_end: record.ts_utc_end.map(ts),
agent_id: record.agent_id.to_string(),
runtime: record.runtime.map(str::to_string),
brief_path: record.brief_path.map(str::to_string),
brief: truncate_opt(record.brief, 16_384),
workspace: record.workspace.map(str::to_string),
branch: record.branch.map(str::to_string),
status: record.status.map(str::to_string),
exit_code: record.exit_code,
detail_json: record.detail_json.map(str::to_string),
}
})
}
pub fn record_harvest_event(&self, record: HarvestEventRecord<'_>) -> Result<i64> {
self.insert_json("harvest_events", HARVEST_EVENTS, |id| HarvestEventRow {
id,
ts_utc: ts(record.ts_utc),
source_branch: record.source_branch.to_string(),
target_branch: record.target_branch.to_string(),
commit_sha: record.commit_sha.map(str::to_string),
status: record.status.to_string(),
conflicts: record.conflicts.map(str::to_string),
detail_json: record.detail_json.map(str::to_string),
})
}
pub fn record_communication_event(&self, record: CommunicationEventRecord<'_>) -> Result<i64> {
self.insert_json("communication_events", COMMUNICATION_EVENTS, |id| {
CommunicationEventRow {
id,
ts_utc: ts(record.ts_utc),
source: record.source.to_string(),
tool: record.tool.map(str::to_string),
direction: record.direction.as_str().to_string(),
chat_id: record.chat_id.map(str::to_string),
message_id: record.message_id.map(str::to_string),
handle: record.handle.map(str::to_string),
agent: record.agent.map(str::to_string),
body: truncate_opt(record.body, 4096),
status: record.status.map(str::to_string),
detail_json: record.detail_json.map(str::to_string),
}
})
}
pub fn list_communication_events(&self) -> Result<Vec<CommunicationEvent>> {
let rows: Vec<CommunicationEventRow> = self
.with_read_only_conn(|conn| async move { rows(&conn, COMMUNICATION_EVENTS).await })?;
rows.into_iter().map(CommunicationEvent::try_from).collect()
}
pub fn record_mcp_tool_call(&self, record: McpToolCallRecord<'_>) -> Result<i64> {
self.insert_json("mcp_tool_calls", MCP_TOOL_CALLS, |id| McpToolCallRow {
id,
ts_utc_start: ts(record.ts_utc_start),
ts_utc_end: record.ts_utc_end.map(ts),
source: record.source.to_string(),
tool: record.tool.to_string(),
agent: record.agent.map(str::to_string),
duration_ms: record.duration_ms,
success: record.success,
error: truncate_opt(record.error, 2048),
timeout_race: record.timeout_race,
request_json: truncate_opt(record.request_json, 8192),
response_json: truncate_opt(record.response_json, 8192),
})
}
pub fn record_git_operation(&self, record: GitOperationRecord<'_>) -> Result<i64> {
self.insert_json("git_operations", GIT_OPERATIONS, |id| GitOperationRow {
id,
ts_utc: ts(record.ts_utc),
operation: record.operation.to_string(),
repo: record.repo.to_string(),
branch: record.branch.map(str::to_string),
remote: record.remote.map(str::to_string),
from_sha: record.from_sha.map(str::to_string),
to_sha: record.to_sha.map(str::to_string),
status: record.status.to_string(),
detail_json: record.detail_json.map(str::to_string),
})
}
pub fn record_owner_directive(&self, record: OwnerDirectiveRecord<'_>) -> Result<i64> {
self.insert_json("owner_directives", OWNER_DIRECTIVES, |id| {
OwnerDirectiveRow {
id,
ts_utc: ts(record.ts_utc),
source: record.source.to_string(),
chat_id: record.chat_id.map(str::to_string),
raw_text: truncate(record.raw_text, 16_384),
resolved_action: record.resolved_action.map(str::to_string),
agent: record.agent.map(str::to_string),
status: record.status.map(str::to_string),
detail_json: record.detail_json.map(str::to_string),
}
})
}
pub fn record_token_usage(&self, record: TokenUsageRecord<'_>) -> Result<i64> {
self.insert_json("token_usage", TOKEN_USAGE, |id| TokenUsageRow {
id,
ts_utc: ts(record.ts_utc),
session_id: record.session_id.map(str::to_string),
agent: record.agent.map(str::to_string),
runtime: record.runtime.map(str::to_string),
model: record.model.map(str::to_string),
input_tokens: record.input_tokens,
output_tokens: record.output_tokens,
cached_input_tokens: record.cached_input_tokens,
cost_usd_micros: record.cost_usd_micros,
detail_json: record.detail_json.map(str::to_string),
})
}
/// Insert a batch of token-usage rows in one transaction.
///
/// Row ids are pre-allocated via `bulk_alloc_ids`. If the transactional
/// write fails, it is rolled back (best-effort) and every row is spooled
/// via `spool_write_error` before the error is returned.
///
/// Returns the number of rows written (0 for an empty batch).
pub fn record_token_usage_batch<'a, I>(&self, records: I) -> Result<usize>
where
    I: IntoIterator<Item = TokenUsageRecord<'a>>,
{
    let raw: Vec<TokenUsageRecord<'_>> = records.into_iter().collect();
    if raw.is_empty() {
        return Ok(0);
    }
    let count = raw.len();
    let start_id = self.bulk_alloc_ids(TOKEN_USAGE, count as i64)?;
    let mut payloads: Vec<(i64, String)> = Vec::with_capacity(count);
    let mut owned_rows: Vec<TokenUsageRow> = Vec::with_capacity(count);
    for (offset, record) in raw.into_iter().enumerate() {
        let id = start_id + offset as i64;
        let row = TokenUsageRow {
            id,
            ts_utc: ts(record.ts_utc),
            session_id: record.session_id.map(str::to_string),
            agent: record.agent.map(str::to_string),
            runtime: record.runtime.map(str::to_string),
            model: record.model.map(str::to_string),
            input_tokens: record.input_tokens,
            output_tokens: record.output_tokens,
            cached_input_tokens: record.cached_input_tokens,
            cost_usd_micros: record.cost_usd_micros,
            detail_json: record.detail_json.map(str::to_string),
        };
        payloads.push((id, serde_json::to_string(&row)?));
        owned_rows.push(row);
    }
    let table = TOKEN_USAGE;
    // Move `payloads` straight into the writer closure; `owned_rows` is kept
    // on this side solely for spooling on failure. (Previously `payloads` was
    // cloned in full and the original left unused.)
    let write = self.with_conn(move |conn| async move {
        let sql = format!("INSERT INTO {table} (id, row_json) VALUES (?1, ?2)");
        conn.execute("BEGIN", ()).await?;
        for (id, json) in payloads {
            if let Err(error) = conn
                .execute(&sql, params_from_iter([Value::from(id), Value::from(json)]))
                .await
            {
                // Best-effort rollback; the original write error is what we report.
                let _ = conn.execute("ROLLBACK", ()).await;
                return Err(error.into());
            }
        }
        conn.execute("COMMIT", ()).await?;
        Ok(())
    });
    if let Err(error) = write {
        for row in &owned_rows {
            spool_write_error(table, row.id, row, &error)?;
        }
        return Err(error);
    }
    Ok(count)
}
// Atomically reserve `count` consecutive ids for `name` and return the first.
// The `ids` table keeps a per-table high-water mark; the upsert bumps it by
// `count` and RETURNING yields the new last id, so the range is
// [last - count + 1, last].
// NOTE(review): `migrate` (as visible in this file) does not create an `ids`
// table — confirm it is created elsewhere, otherwise this always falls into
// the timestamp fallback below.
fn bulk_alloc_ids(&self, name: &'static str, count: i64) -> Result<i64> {
if count <= 0 {
return Ok(0);
}
let result = self.with_conn(move |conn| async move {
let mut rows = conn
.query(
"INSERT INTO ids (name, id) VALUES (?1, ?2) \
ON CONFLICT(name) DO UPDATE SET id = id + ?2 \
RETURNING id",
params_from_iter([Value::from(name), Value::from(count)]),
)
.await?;
if let Some(row) = rows.next().await? {
let last = integer_value(&row.get_value(0)?)?;
return Ok(last - count + 1);
}
Err(Error::NotFound(format!("bulk id allocation for {name}")))
});
match result {
Ok(start) => Ok(start),
Err(error) => {
// Allocation failed: fall back to a microsecond-timestamp-based start
// id (unlikely to collide with sequential ids) and journal the failure
// to the spool so the incident is not silently lost.
let fallback = Utc::now().timestamp_micros();
spool_error_json(serde_json::json!({
"ts_utc": ts(Utc::now()),
"table": name,
"id": fallback,
"count": count,
"error": error.to_string(),
"record": {"kind": "bulk_id_allocation"}
}))?;
Ok(fallback)
}
}
}
pub fn record_source_error(&self, record: SourceErrorRecord<'_>) -> Result<i64> {
self.insert_json("source_errors", SOURCE_ERRORS, |id| SourceErrorRow {
id,
ts_utc: ts(record.ts_utc),
source: record.source.to_string(),
error_class: record.error_class.as_str().to_string(),
count: record.count.max(1),
detail_json: record.detail_json.map(str::to_string),
})
}
pub fn record_iroh_event(&self, record: IrohEventRecord<'_>) -> Result<i64> {
self.insert_json("iroh_events", IROH_EVENTS, |id| IrohEventRow {
id,
ts_utc: ts(record.ts_utc),
event_type: record.event_type.as_str().to_string(),
peer_id_hash: record.peer_id_hash.to_string(),
peer_label: record.peer_label.map(str::to_string),
detail_json: record.detail_json.map(str::to_string),
})
}
pub fn record_watchdog_event(&self, record: WatchdogEventRecord<'_>) -> Result<i64> {
self.insert_json("watchdog_events", WATCHDOG_EVENTS, |id| WatchdogEventRow {
id,
ts_utc: ts(record.ts_utc),
event: record.event.to_string(),
agent: record.agent.map(str::to_string),
severity: record.severity.map(str::to_string),
status: record.status.map(str::to_string),
detail_json: record.detail_json.map(str::to_string),
})
}
pub fn read_source_cursor(&self, source: &str) -> Result<Option<String>> {
let source = source.to_string();
self.with_conn(move |conn| async move {
let mut rows = conn
.query(
"SELECT cursor_value FROM source_cursors WHERE source = ?1",
params_from_iter([Value::from(source)]),
)
.await?;
let Some(row) = rows.next().await? else {
return Ok(None);
};
Ok(Some(text_value(&row.get_value(0)?)?))
})
}
pub fn update_source_cursor(&self, source: &str, value: &str) -> Result<()> {
let source = source.to_string();
let value = value.to_string();
let now = ts(Utc::now());
self.with_conn(move |conn| async move {
conn.execute(
"INSERT INTO source_cursors (source, cursor_value, updated_at) \
VALUES (?1, ?2, ?3) \
ON CONFLICT(source) DO UPDATE SET \
cursor_value = excluded.cursor_value, \
updated_at = excluded.updated_at",
params_from_iter([Value::from(source), Value::from(value), Value::from(now)]),
)
.await?;
Ok(())
})
}
pub fn reset_source_cursor(&self, source: &str) -> Result<bool> {
let source = source.to_string();
self.with_conn(move |conn| async move {
let mut rows = conn
.query(
"DELETE FROM source_cursors WHERE source = ?1 RETURNING source",
params_from_iter([Value::from(source)]),
)
.await?;
Ok(rows.next().await?.is_some())
})
}
pub fn list_source_cursors(&self) -> Result<Vec<SourceCursorRow>> {
self.with_read_only_conn(|conn| async move {
let mut rows = conn
.query(
"SELECT source, cursor_value, updated_at FROM source_cursors ORDER BY source",
(),
)
.await?;
let mut out = Vec::new();
while let Some(row) = rows.next().await? {
out.push(SourceCursorRow {
source: text_value(&row.get_value(0)?)?,
cursor_value: text_value(&row.get_value(1)?)?,
updated_at: text_value(&row.get_value(2)?)?,
});
}
Ok(out)
})
}
pub fn insert_event(
&self,
source: &str,
ts_utc: DateTime<Utc>,
payload_json: &str,
) -> Result<i64> {
let source = source.to_string();
let ts_str = ts(ts_utc);
let payload = payload_json.to_string();
self.with_conn(move |conn| async move {
let mut rows = conn
.query(
"INSERT INTO events (source, ts_utc, payload_json, delivery_status) \
VALUES (?1, ?2, ?3, 'pending') RETURNING id",
params_from_iter([
Value::from(source),
Value::from(ts_str),
Value::from(payload),
]),
)
.await?;
let Some(row) = rows.next().await? else {
return Err(Error::NotFound("event id after insert".into()));
};
integer_value(&row.get_value(0)?)
})
}
/// Update the delivery status (and optional failure reason) of one outbox event.
pub fn update_event_delivery(
    &self,
    id: i64,
    status: EventStatus,
    reason: Option<&str>,
) -> Result<()> {
    let status_str = status.as_str().to_string();
    let reason_value = reason.map(str::to_string);
    self.with_conn(move |conn| async move {
        // Absent reason maps to SQL NULL.
        let reason_param = reason_value.map_or(Value::Null, Value::from);
        conn.execute(
            "UPDATE events SET delivery_status = ?1, reason = ?2 WHERE id = ?3",
            params_from_iter([Value::from(status_str), reason_param, Value::from(id)]),
        )
        .await?;
        Ok(())
    })
}
pub fn tail_events(&self, source: &str, limit: i64) -> Result<Vec<EventRow>> {
let source = source.to_string();
self.with_read_only_conn(move |conn| async move {
let mut rows = conn
.query(
"SELECT id, source, ts_utc, payload_json, delivery_status, reason \
FROM events WHERE source = ?1 ORDER BY id DESC LIMIT ?2",
params_from_iter([Value::from(source), Value::from(limit)]),
)
.await?;
let mut out = Vec::new();
while let Some(row) = rows.next().await? {
let reason = match row.get_value(5)? {
Value::Null => None,
other => Some(
other
.as_text()
.cloned()
.ok_or_else(|| Error::NotFound("event reason".into()))?,
),
};
out.push(EventRow {
id: integer_value(&row.get_value(0)?)?,
source: text_value(&row.get_value(1)?)?,
ts_utc: text_value(&row.get_value(2)?)?,
payload_json: text_value(&row.get_value(3)?)?,
delivery_status: text_value(&row.get_value(4)?)?,
reason,
});
}
Ok(out)
})
}
/// Inserts a new task row and returns its freshly allocated id.
///
/// `created_at` and `updated_at` are both stamped with the current time.
pub fn record_task(&self, record: TaskRecord<'_>) -> Result<i64> {
    let now = ts(Utc::now());
    self.insert_json("netsky_tasks", TASKS, |id| TaskRow {
        id,
        created_at: now.clone(),
        updated_at: now.clone(),
        title: record.title.to_string(),
        body: record.body.map(str::to_string),
        status: record.status.to_string(),
        priority: record.priority.map(str::to_string),
        labels: record.labels.map(str::to_string),
        source: record.source.map(str::to_string),
        source_ref: record.source_ref.map(str::to_string),
        // An explicitly empty closed_at string means "closed now"; any other
        // value is stored verbatim.
        closed_at: record.closed_at.map(|value| {
            if value.is_empty() {
                now.clone()
            } else {
                value.to_string()
            }
        }),
        closed_reason: record.closed_reason.map(str::to_string),
        closed_evidence: record.closed_evidence.map(str::to_string),
        agent: record.agent.map(str::to_string),
        sync_calendar: record.sync_calendar,
        calendar_task_id: record.calendar_task_id.map(str::to_string),
    })
}
/// Applies the non-`None` fields of `update` to an existing task and
/// persists the merged row as JSON.
///
/// Returns the merged row. When the UPDATE fails, the row is spooled to the
/// error log instead and the merged row is still returned, so callers see
/// the intended state even while the store is temporarily unwritable.
pub fn update_task(&self, update: TaskUpdate<'_>) -> Result<TaskRow> {
    let mut row = self
        .get_task(update.id)?
        .ok_or_else(|| Error::NotFound(format!("task {}", update.id)))?;
    let now = ts(Utc::now());
    if let Some(status) = update.status {
        row.status = status.to_string();
        // Only the first transition into "closed" stamps closed_at; later
        // updates preserve the original close time.
        if status == "closed" && row.closed_at.is_none() {
            row.closed_at = Some(now.clone());
        }
    }
    if let Some(priority) = update.priority {
        row.priority = Some(priority.to_string());
    }
    if let Some(agent) = update.agent {
        row.agent = Some(agent.to_string());
    }
    if let Some(reason) = update.closed_reason {
        row.closed_reason = Some(reason.to_string());
    }
    if let Some(evidence) = update.closed_evidence {
        row.closed_evidence = Some(evidence.to_string());
    }
    if let Some(sync_calendar) = update.sync_calendar {
        row.sync_calendar = sync_calendar;
    }
    if let Some(calendar_task_id) = update.calendar_task_id {
        row.calendar_task_id = Some(calendar_task_id.to_string());
    }
    row.updated_at = now;
    let row_json = serde_json::to_string(&row)?;
    let id = update.id;
    let write = self.with_conn(move |conn| async move {
        conn.execute(
            "UPDATE netsky_tasks SET row_json = ?1 WHERE id = ?2",
            params_from_iter([Value::from(row_json), Value::from(id)]),
        )
        .await?;
        Ok(())
    });
    // Best-effort durability: a failed write is recorded for later replay
    // rather than failing the whole update.
    if let Err(error) = write {
        spool_write_error("netsky_tasks", update.id, &row, &error)?;
    }
    Ok(row)
}
/// Loads a single task row by id, returning `None` when no such task exists.
pub fn get_task(&self, id: i64) -> Result<Option<TaskRow>> {
    self.with_conn(move |conn| async move {
        let mut rows = conn
            .query(
                "SELECT row_json FROM netsky_tasks WHERE id = ?1",
                params_from_iter([Value::from(id)]),
            )
            .await?;
        match rows.next().await? {
            Some(row) => {
                let json = text_value(&row.get_value(0)?)?;
                Ok(Some(serde_json::from_str(&json)?))
            }
            None => Ok(None),
        }
    })
}
/// Lists tasks, optionally filtered by status and/or priority.
///
/// A `None` filter matches every row (expressed in SQL as `?n IS NULL OR ...`
/// so one statement covers all four filter combinations). Results are ordered
/// by status, then priority, then id.
pub fn list_tasks(&self, status: Option<&str>, priority: Option<&str>) -> Result<Vec<TaskRow>> {
    let status = status.map(str::to_string);
    let priority = priority.map(str::to_string);
    self.with_conn(move |conn| async move {
        let status_val = status.map(Value::from).unwrap_or(Value::Null);
        let priority_val = priority.map(Value::from).unwrap_or(Value::Null);
        let mut rows = conn
            .query(
                "SELECT row_json FROM netsky_tasks \
                 WHERE (?1 IS NULL OR json_extract(row_json, '$.status') = ?1) \
                 AND (?2 IS NULL OR json_extract(row_json, '$.priority') = ?2) \
                 ORDER BY json_extract(row_json, '$.status'), \
                 json_extract(row_json, '$.priority'), \
                 id",
                params_from_iter([status_val, priority_val]),
            )
            .await?;
        let mut out = Vec::new();
        while let Some(row) = rows.next().await? {
            let json = text_value(&row.get_value(0)?)?;
            out.push(serde_json::from_str(&json)?);
        }
        Ok(out)
    })
}
/// Runs `sql` against the DataFusion view of the store and renders the
/// resulting batches as a pretty-printed table.
pub fn query(&self, sql: &str) -> Result<String> {
    let batches = self.query_batches(sql)?;
    let rendered = pretty_format_batches(&batches)?;
    Ok(rendered.to_string())
}
pub fn query_batches(&self, sql: &str) -> Result<Vec<RecordBatch>> {
let scope = referenced_storage_tables(sql);
let snapshot = self.snapshot_scoped(scope.as_ref())?;
let runtime = tokio::runtime::Runtime::new()?;
runtime.block_on(async move {
let ctx = register_snapshot(snapshot).await?;
let df = ctx.sql(sql).await?;
Ok(df.collect().await?)
})
}
/// Reads the schema version recorded in the `meta` table (0 when unset).
pub fn schema_version(&self) -> Result<i64> {
    self.with_conn(|conn| async move { schema_version(&conn).await })
}
/// Allocates an id, builds the row via `row(id)`, serializes it to JSON and
/// inserts it into `table`.
///
/// A failed INSERT is not fatal: the serialized row is spooled to the error
/// log and the allocated id is returned anyway (best-effort durability).
fn insert_json<T, F>(&self, name: &'static str, table: &'static str, row: F) -> Result<i64>
where
    T: Serialize,
    F: FnOnce(i64) -> T,
{
    let id = self.next_id(name)?;
    let row = row(id);
    let row_json = serde_json::to_string(&row)?;
    let write = self.with_conn(move |conn| async move {
        // `table` is a compile-time constant, so formatting it into the SQL
        // is not an injection risk.
        let sql = format!("INSERT INTO {table} (id, row_json) VALUES (?1, ?2)");
        conn.execute(
            &sql,
            params_from_iter([Value::from(id), Value::from(row_json)]),
        )
        .await?;
        Ok(())
    });
    if let Err(error) = write {
        spool_write_error(table, id, &row, &error)?;
    }
    Ok(id)
}
fn snapshot_scoped(&self, scope: Option<&HashSet<&'static str>>) -> Result<Snapshot> {
let owned: Option<HashSet<&'static str>> = scope.cloned();
self.with_read_only_conn(move |conn| async move {
let wants = |t: &'static str| owned.as_ref().is_none_or(|s| s.contains(t));
Ok(Snapshot {
messages: if wants(MESSAGES) {
rows(&conn, MESSAGES).await?
} else {
Vec::new()
},
cli_invocations: if wants(CLI_INVOCATIONS) {
rows(&conn, CLI_INVOCATIONS).await?
} else {
Vec::new()
},
crashes: if wants(CRASHES) {
rows(&conn, CRASHES).await?
} else {
Vec::new()
},
ticks: if wants(TICKS) {
rows(&conn, TICKS).await?
} else {
Vec::new()
},
workspaces: if wants(WORKSPACES) {
rows(&conn, WORKSPACES).await?
} else {
Vec::new()
},
sessions: if wants(SESSIONS) {
rows(&conn, SESSIONS).await?
} else {
Vec::new()
},
clone_dispatches: if wants(CLONE_DISPATCHES) {
rows(&conn, CLONE_DISPATCHES).await?
} else {
Vec::new()
},
harvest_events: if wants(HARVEST_EVENTS) {
rows(&conn, HARVEST_EVENTS).await?
} else {
Vec::new()
},
communication_events: if wants(COMMUNICATION_EVENTS) {
rows(&conn, COMMUNICATION_EVENTS).await?
} else {
Vec::new()
},
mcp_tool_calls: if wants(MCP_TOOL_CALLS) {
rows(&conn, MCP_TOOL_CALLS).await?
} else {
Vec::new()
},
git_operations: if wants(GIT_OPERATIONS) {
rows(&conn, GIT_OPERATIONS).await?
} else {
Vec::new()
},
owner_directives: if wants(OWNER_DIRECTIVES) {
rows(&conn, OWNER_DIRECTIVES).await?
} else {
Vec::new()
},
token_usage: if wants(TOKEN_USAGE) {
rows(&conn, TOKEN_USAGE).await?
} else {
Vec::new()
},
watchdog_events: if wants(WATCHDOG_EVENTS) {
rows(&conn, WATCHDOG_EVENTS).await?
} else {
Vec::new()
},
tasks: if wants(TASKS) {
rows(&conn, TASKS).await?
} else {
Vec::new()
},
source_errors: if wants(SOURCE_ERRORS) {
rows(&conn, SOURCE_ERRORS).await?
} else {
Vec::new()
},
iroh_events: if wants(IROH_EVENTS) {
rows(&conn, IROH_EVENTS).await?
} else {
Vec::new()
},
})
})
}
/// Allocates the next monotonically increasing id for `name` via an upsert
/// on the `ids` counter table.
///
/// If allocation fails (store locked/unwritable), falls back to a
/// microsecond-timestamp id so writers can still make progress, and records
/// the incident in the error spool.
fn next_id(&self, name: &'static str) -> Result<i64> {
    let result = self.with_conn(move |conn| async move {
        let mut rows = conn
            .query(
                "INSERT INTO ids (name, id) VALUES (?1, 1) \
                 ON CONFLICT(name) DO UPDATE SET id = id + 1 \
                 RETURNING id",
                params_from_iter([Value::from(name)]),
            )
            .await?;
        if let Some(row) = rows.next().await? {
            return integer_value(&row.get_value(0)?);
        }
        Err(Error::NotFound(format!("id allocation for {name}")))
    });
    match result {
        Ok(id) => Ok(id),
        Err(error) => {
            // Timestamp ids are far larger than counter ids, so collisions
            // with normally allocated ids are not expected.
            let fallback = Utc::now().timestamp_micros();
            spool_error_json(serde_json::json!({
                "ts_utc": ts(Utc::now()),
                "table": name,
                "id": fallback,
                "error": error.to_string(),
                "record": {"kind": "id_allocation"}
            }))?;
            Ok(fallback)
        }
    }
}
/// Opens a fresh writable connection, applies per-connection setup, runs `f`
/// on it, and blocks until the future completes.
///
/// Opening is retried on lock contention with a short linear backoff.
/// NOTE(review): fewer attempts are made when already inside a tokio runtime
/// -- presumably to bound how long the dedicated worker thread spawned by
/// `block_on_turso` is held; confirm the intent.
fn with_conn<T, F, Fut>(&self, f: F) -> Result<T>
where
    T: Send + 'static,
    F: FnOnce(Connection) -> Fut + Send + 'static,
    Fut: std::future::Future<Output = Result<T>> + Send + 'static,
{
    let path = self.path.clone();
    let max_attempts = if tokio::runtime::Handle::try_current().is_ok() {
        3
    } else {
        10
    };
    let task = async move {
        let path = path.to_string_lossy().to_string();
        let mut last_error = None;
        for attempt in 0..=max_attempts {
            match Builder::new_local(&path).build().await {
                Ok(db) => {
                    let conn = db.connect()?;
                    configure_conn(&conn).await?;
                    return f(conn).await;
                }
                // Lock contention: back off (10ms, 12ms, 14ms, ...) and retry.
                Err(error) if is_locking_error(&error) && attempt < max_attempts => {
                    last_error = Some(error);
                    tokio::time::sleep(Duration::from_millis(10 + attempt * 2)).await;
                }
                // Any other failure is immediately fatal.
                Err(error) => return Err(error.into()),
            }
        }
        Err(last_error
            .map(Error::from)
            .unwrap_or_else(|| Error::NotFound("turso connection".to_string())))
    };
    block_on_turso(task)
}
/// Opens a read-only connection (query_only enforced, schema verified) and
/// runs `f` on it, blocking until the future completes.
///
/// Unlike `with_conn`, no retry loop is applied here.
fn with_read_only_conn<T, F, Fut>(&self, f: F) -> Result<T>
where
    T: Send + 'static,
    F: FnOnce(Connection) -> Fut + Send + 'static,
    Fut: std::future::Future<Output = Result<T>> + Send + 'static,
{
    let path = self.path.clone();
    let task = async move {
        let path_str = path.to_string_lossy().to_string();
        let db = Builder::new_local(&path_str).build().await?;
        let conn = db.connect()?;
        configure_read_only_conn(&conn, &path).await?;
        f(conn).await
    };
    block_on_turso(task)
}
/// Test helper: returns true when a write through the read-only connection
/// is rejected, i.e. `PRAGMA query_only` is actually enforced.
#[cfg(test)]
fn read_only_write_probe_fails(&self) -> Result<bool> {
    self.with_read_only_conn(|conn| async move {
        Ok(conn
            .execute(
                "INSERT INTO meta (key, value) VALUES ('read_only_probe', 1)",
                (),
            )
            .await
            .is_err())
    })
}
}
/// In-memory copy of the storage tables, used to build the DataFusion
/// tables for ad-hoc SQL. Fields for tables outside a query's scope are
/// left empty (see `snapshot_scoped`).
struct Snapshot {
    messages: Vec<MessageRow>,
    cli_invocations: Vec<CliRow>,
    crashes: Vec<CrashRow>,
    ticks: Vec<TickRow>,
    workspaces: Vec<WorkspaceRow>,
    sessions: Vec<SessionRow>,
    clone_dispatches: Vec<CloneDispatchRow>,
    harvest_events: Vec<HarvestEventRow>,
    communication_events: Vec<CommunicationEventRow>,
    mcp_tool_calls: Vec<McpToolCallRow>,
    git_operations: Vec<GitOperationRow>,
    owner_directives: Vec<OwnerDirectiveRow>,
    token_usage: Vec<TokenUsageRow>,
    watchdog_events: Vec<WatchdogEventRow>,
    tasks: Vec<TaskRow>,
    source_errors: Vec<SourceErrorRow>,
    iroh_events: Vec<IrohEventRow>,
}
/// Per-connection setup for writable connections: busy timeout, WAL journal
/// mode, and the bootstrap `meta`/`ids` tables.
///
/// NOTE(review): setup is skipped when `PRAGMA user_version` already equals
/// SCHEMA_VERSION, while `schema_version()` reads the meta table -- presumably
/// the migrate path (outside this section) keeps the two in sync; confirm.
async fn configure_conn(conn: &Connection) -> Result<()> {
    conn.busy_timeout(Duration::from_secs(10))?;
    if user_version(conn).await? == SCHEMA_VERSION {
        return Ok(());
    }
    // journal_mode returns a result row; drain it so the statement finishes.
    let mut wal = conn.query("PRAGMA journal_mode=WAL", ()).await?;
    while wal.next().await?.is_some() {}
    conn.execute_batch(
        "PRAGMA synchronous=NORMAL; \
         CREATE TABLE IF NOT EXISTS meta (key TEXT PRIMARY KEY, value INTEGER NOT NULL); \
         CREATE TABLE IF NOT EXISTS ids (name TEXT PRIMARY KEY, id INTEGER NOT NULL);",
    )
    .await?;
    Ok(())
}
/// Per-connection setup for read-only connections: sets `query_only`,
/// verifies the pragma actually took effect, then checks the database is
/// initialized at a supported schema version.
async fn configure_read_only_conn(conn: &Connection, path: &Path) -> Result<()> {
    conn.busy_timeout(Duration::from_secs(10))?;
    conn.execute("PRAGMA query_only = 1", ()).await?;
    // Do not trust the pragma silently succeeding -- read it back.
    if query_only(conn).await? != 1 {
        return Err(Error::ReadOnlyNotEnforced(path.to_path_buf()));
    }
    ensure_initialized(conn, path).await
}
async fn ensure_initialized(conn: &Connection, path: &Path) -> Result<()> {
if !table_exists(conn, "meta").await? {
return Err(Error::NotInitialized(path.to_path_buf()));
}
let current = schema_version(conn).await?;
if current == 0 {
return Err(Error::NotInitialized(path.to_path_buf()));
}
if current > SCHEMA_VERSION {
return Err(Error::FutureSchemaVersion {
found: current,
supported: SCHEMA_VERSION,
});
}
Ok(())
}
/// Returns whether a table named `table` exists in the connected database.
async fn table_exists(conn: &Connection, table: &str) -> Result<bool> {
    let sql = "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1";
    let mut rows = conn
        .query(sql, params_from_iter([Value::from(table)]))
        .await?;
    let found = rows.next().await?.is_some();
    Ok(found)
}
/// Reads back `PRAGMA query_only`, defaulting to 0 when no row is returned.
async fn query_only(conn: &Connection) -> Result<i64> {
    let mut rows = conn.query("PRAGMA query_only", ()).await?;
    match rows.next().await? {
        Some(row) => integer_value(&row.get_value(0)?),
        None => Ok(0),
    }
}
/// Reads `PRAGMA user_version`, defaulting to 0 when no row is returned.
async fn user_version(conn: &Connection) -> Result<i64> {
    let mut rows = conn.query("PRAGMA user_version", ()).await?;
    match rows.next().await? {
        Some(row) => integer_value(&row.get_value(0)?),
        None => Ok(0),
    }
}
/// Reads the schema version stored in the `meta` table; 0 when absent.
async fn schema_version(conn: &Connection) -> Result<i64> {
    let params = params_from_iter([Value::from("schema_version")]);
    let mut rows = conn
        .query("SELECT value FROM meta WHERE key = ?1", params)
        .await?;
    match rows.next().await? {
        Some(row) => integer_value(&row.get_value(0)?),
        None => Ok(0),
    }
}
/// Reads every row of `table` (one JSON document per row) ordered by id and
/// deserializes each into `T`.
///
/// `table` is always one of the compile-time table-name constants, so the
/// formatted SQL is not an injection risk.
async fn rows<T>(conn: &Connection, table: &str) -> Result<Vec<T>>
where
    T: DeserializeOwned,
{
    let sql = format!("SELECT row_json FROM {table} ORDER BY id");
    let mut out = Vec::new();
    let mut rows = conn.query(&sql, ()).await?;
    while let Some(row) = rows.next().await? {
        let json = text_value(&row.get_value(0)?)?;
        out.push(serde_json::from_str(&json)?);
    }
    Ok(out)
}
fn integer_value(value: &Value) -> Result<i64> {
value
.as_integer()
.copied()
.ok_or_else(|| Error::NotFound("integer value".to_string()))
}
fn text_value(value: &Value) -> Result<String> {
value
.as_text()
.cloned()
.ok_or_else(|| Error::NotFound("text value".to_string()))
}
/// Drives a turso future to completion from synchronous code.
///
/// `block_on` inside an existing tokio runtime panics, so in that case the
/// future is shipped to a throwaway thread with its own runtime; otherwise a
/// runtime is built and entered inline. A panicked worker thread is mapped
/// to an error rather than propagated.
fn block_on_turso<T, Fut>(future: Fut) -> Result<T>
where
    T: Send + 'static,
    Fut: std::future::Future<Output = Result<T>> + Send + 'static,
{
    if tokio::runtime::Handle::try_current().is_ok() {
        std::thread::spawn(move || Runtime::new()?.block_on(future))
            .join()
            .unwrap_or_else(|_| Err(Error::NotFound("turso worker thread".to_string())))
    } else {
        Runtime::new()?.block_on(future)
    }
}
/// Heuristically classifies a turso error as lock contention by inspecting
/// its rendered message.
fn is_locking_error(error: &turso::Error) -> bool {
    let rendered = error.to_string();
    ["Locking error", "locked"]
        .iter()
        .any(|needle| rendered.contains(needle))
}
/// Records a failed table write (table, id, serialized record, error text)
/// in the JSONL error spool for later replay or inspection.
fn spool_write_error<T: Serialize>(table: &str, id: i64, record: &T, error: &Error) -> Result<()> {
    spool_error_json(serde_json::json!({
        "ts_utc": ts(Utc::now()),
        "table": table,
        "id": id,
        "error": error.to_string(),
        "record": record,
    }))
}
/// Appends one record to the daily JSONL error spool under ~/.netsky/logs.
///
/// If even the spool append fails, a marker file is written (so the loss is
/// detectable) and the spool I/O error is propagated.
fn spool_error_json(value: serde_json::Value) -> Result<()> {
    let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
    let dir = home.join(".netsky").join("logs");
    fs::create_dir_all(&dir)?;
    // One spool file per UTC day.
    let path = dir.join(format!(
        "meta-db-errors-{}.jsonl",
        Utc::now().format("%Y-%m-%d")
    ));
    if let Err(spool_error) = netsky_core::jsonl::append_json_line(&path, &value) {
        write_spool_failure_marker(&value, &spool_error)?;
        return Err(Error::Io(spool_error));
    }
    Ok(())
}
/// Last-resort breadcrumb when the error spool itself cannot be written:
/// drops a single marker file under ~/.netsky/state containing the spool
/// error and the record that was lost. Overwrites any previous marker.
fn write_spool_failure_marker(
    value: &serde_json::Value,
    spool_error: &std::io::Error,
) -> Result<()> {
    let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
    let dir = home.join(".netsky").join("state");
    fs::create_dir_all(&dir)?;
    let path = dir.join("meta-db-spool-failed");
    let body = serde_json::json!({
        "ts_utc": ts(Utc::now()),
        "spool_error": spool_error.to_string(),
        "record": value,
    });
    fs::write(path, serde_json::to_vec_pretty(&body)?)?;
    Ok(())
}
/// Maps table names mentioned in `sql` to the storage tables backing them.
///
/// Returns `None` when no known name appears, which callers interpret as
/// "load everything".
fn referenced_storage_tables(sql: &str) -> Option<HashSet<&'static str>> {
    let lower = sql.to_ascii_lowercase();
    let mut scope: HashSet<&'static str> = DATAFUSION_TABLES
        .iter()
        .copied()
        .filter(|&(df_name, _)| contains_word(&lower, df_name))
        .map(|(_, storage)| storage)
        .collect();
    // The clone_lifetimes view is defined over clone_dispatches, so queries
    // against the view need that backing table in scope.
    if contains_word(&lower, CLONE_LIFETIMES_VIEW) {
        scope.insert(CLONE_DISPATCHES);
    }
    if scope.is_empty() { None } else { Some(scope) }
}
/// Whole-word substring search: true when `needle` occurs in `haystack` with
/// no identifier byte (ASCII alphanumeric or `_`) immediately adjacent on
/// either side. Used to spot table names inside SQL text.
fn contains_word(haystack: &str, needle: &str) -> bool {
    if needle.is_empty() || needle.len() > haystack.len() {
        return false;
    }
    let bytes = haystack.as_bytes();
    let is_ident = |b: u8| b.is_ascii_alphanumeric() || b == b'_';
    let mut from = 0;
    // Advance one byte past each rejected hit so overlapping candidates are
    // still considered.
    while let Some(offset) = haystack[from..].find(needle) {
        let start = from + offset;
        let end = start + needle.len();
        let boundary_before = start == 0 || !is_ident(bytes[start - 1]);
        let boundary_after = end == bytes.len() || !is_ident(bytes[end]);
        if boundary_before && boundary_after {
            return true;
        }
        from = start + 1;
    }
    false
}
/// True for bytes that can appear in a SQL identifier: ASCII letters,
/// digits, and underscore.
fn is_ident_byte(b: u8) -> bool {
    matches!(b, b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' | b'_')
}
/// Registers `batch` with the session context as a single-partition
/// in-memory table called `name`.
fn register(ctx: &SessionContext, name: &str, batch: RecordBatch) -> Result<()> {
    let schema = batch.schema();
    let table = MemTable::try_new(schema, vec![vec![batch]])?;
    ctx.register_table(name, Arc::new(table))?;
    Ok(())
}
/// Builds a DataFusion session exposing one in-memory table per snapshot
/// field, plus the derived `clone_lifetimes` view over finished dispatches.
async fn register_snapshot(snapshot: Snapshot) -> Result<SessionContext> {
    let ctx = SessionContext::new();
    register(&ctx, "messages", messages_batch(snapshot.messages)?)?;
    register(&ctx, "cli_invocations", cli_batch(snapshot.cli_invocations)?)?;
    register(&ctx, "crashes", crashes_batch(snapshot.crashes)?)?;
    register(&ctx, "ticks", ticks_batch(snapshot.ticks)?)?;
    register(&ctx, "workspaces", workspaces_batch(snapshot.workspaces)?)?;
    register(&ctx, "sessions", sessions_batch(snapshot.sessions)?)?;
    register(&ctx, "clone_dispatches", clone_dispatches_batch(snapshot.clone_dispatches)?)?;
    register(&ctx, "harvest_events", harvest_events_batch(snapshot.harvest_events)?)?;
    register(&ctx, "communication_events", communication_events_batch(snapshot.communication_events)?)?;
    register(&ctx, "mcp_tool_calls", mcp_tool_calls_batch(snapshot.mcp_tool_calls)?)?;
    register(&ctx, "git_operations", git_operations_batch(snapshot.git_operations)?)?;
    register(&ctx, "owner_directives", owner_directives_batch(snapshot.owner_directives)?)?;
    register(&ctx, "token_usage", token_usage_batch(snapshot.token_usage)?)?;
    register(&ctx, "watchdog_events", watchdog_events_batch(snapshot.watchdog_events)?)?;
    register(&ctx, "tasks", tasks_batch(snapshot.tasks)?)?;
    register(&ctx, "source_errors", source_errors_batch(snapshot.source_errors)?)?;
    register(&ctx, "iroh_events", iroh_events_batch(snapshot.iroh_events)?)?;
    // Derived view: only clone dispatches that have finished.
    ctx.sql(
        "CREATE VIEW clone_lifetimes AS \
         SELECT id, agent_id, runtime, workspace, branch, status, \
         ts_utc_start, ts_utc_end \
         FROM clone_dispatches \
         WHERE ts_utc_end IS NOT NULL",
    )
    .await?;
    Ok(ctx)
}
/// Builds the Arrow batch backing the `messages` table. The column vector
/// must stay positionally aligned with the schema field list.
fn messages_batch(rows: Vec<MessageRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source", DataType::Utf8, false),
            ("direction", DataType::Utf8, false),
            ("chat_id", DataType::Utf8, true),
            ("from_agent", DataType::Utf8, true),
            ("to_agent", DataType::Utf8, true),
            ("body", DataType::Utf8, true),
            ("raw_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            string(rows.iter().map(|r| Some(r.direction.as_str()))),
            opt_string(rows.iter().map(|r| r.chat_id.as_deref())),
            opt_string(rows.iter().map(|r| r.from_agent.as_deref())),
            opt_string(rows.iter().map(|r| r.to_agent.as_deref())),
            opt_string(rows.iter().map(|r| r.body.as_deref())),
            opt_string(rows.iter().map(|r| r.raw_json.as_deref())),
        ],
    )?)
}
/// Builds the Arrow batch backing the `cli_invocations` table. The column
/// vector must stay positionally aligned with the schema field list.
fn cli_batch(rows: Vec<CliRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("bin", DataType::Utf8, false),
            ("argv_json", DataType::Utf8, false),
            ("exit_code", DataType::Int64, true),
            ("duration_ms", DataType::Int64, true),
            ("host", DataType::Utf8, false),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.bin.as_str()))),
            string(rows.iter().map(|r| Some(r.argv_json.as_str()))),
            int64_opt(rows.iter().map(|r| r.exit_code)),
            int64_opt(rows.iter().map(|r| r.duration_ms)),
            string(rows.iter().map(|r| Some(r.host.as_str()))),
        ],
    )?)
}
/// Builds the Arrow batch backing the `crashes` table via the shared
/// (id, ts_utc, two text keys, detail_json) builder.
fn crashes_batch(rows: Vec<CrashRow>) -> Result<RecordBatch> {
    simple_event_batch(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("kind", DataType::Utf8, false),
            ("agent", DataType::Utf8, false),
            ("detail_json", DataType::Utf8, false),
        ]),
        // Tuple order must match the field order above.
        rows.iter()
            .map(|r| (r.id, &r.ts_utc, &r.kind, &r.agent, &r.detail_json)),
    )
}
/// Builds the Arrow batch backing the `ticks` table. The column vector must
/// stay positionally aligned with the schema field list.
fn ticks_batch(rows: Vec<TickRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source", DataType::Utf8, false),
            ("detail_json", DataType::Utf8, false),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            string(rows.iter().map(|r| Some(r.detail_json.as_str()))),
        ],
    )?)
}
/// Builds the Arrow batch backing the `workspaces` table. The column vector
/// must stay positionally aligned with the schema field list.
fn workspaces_batch(rows: Vec<WorkspaceRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc_created", DataType::Utf8, false),
            ("name", DataType::Utf8, false),
            ("branch", DataType::Utf8, false),
            ("ts_utc_deleted", DataType::Utf8, true),
            ("verdict", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc_created.as_str()))),
            string(rows.iter().map(|r| Some(r.name.as_str()))),
            string(rows.iter().map(|r| Some(r.branch.as_str()))),
            opt_string(rows.iter().map(|r| r.ts_utc_deleted.as_deref())),
            opt_string(rows.iter().map(|r| r.verdict.as_deref())),
        ],
    )?)
}
/// Builds the Arrow batch backing the `sessions` table. The column vector
/// must stay positionally aligned with the schema field list.
fn sessions_batch(rows: Vec<SessionRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("agent", DataType::Utf8, false),
            ("session_num", DataType::Int64, false),
            ("event", DataType::Utf8, false),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.agent.as_str()))),
            int64(rows.iter().map(|r| r.session_num)),
            string(rows.iter().map(|r| Some(r.event.as_str()))),
        ],
    )?)
}
/// Builds the Arrow batch backing the `clone_dispatches` table (also the
/// base of the `clone_lifetimes` view). Columns must stay positionally
/// aligned with the schema fields.
fn clone_dispatches_batch(rows: Vec<CloneDispatchRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc_start", DataType::Utf8, false),
            ("ts_utc_end", DataType::Utf8, true),
            ("agent_id", DataType::Utf8, false),
            ("runtime", DataType::Utf8, true),
            ("brief_path", DataType::Utf8, true),
            ("brief", DataType::Utf8, true),
            ("workspace", DataType::Utf8, true),
            ("branch", DataType::Utf8, true),
            ("status", DataType::Utf8, true),
            ("exit_code", DataType::Int64, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc_start.as_str()))),
            opt_string(rows.iter().map(|r| r.ts_utc_end.as_deref())),
            string(rows.iter().map(|r| Some(r.agent_id.as_str()))),
            opt_string(rows.iter().map(|r| r.runtime.as_deref())),
            opt_string(rows.iter().map(|r| r.brief_path.as_deref())),
            opt_string(rows.iter().map(|r| r.brief.as_deref())),
            opt_string(rows.iter().map(|r| r.workspace.as_deref())),
            opt_string(rows.iter().map(|r| r.branch.as_deref())),
            opt_string(rows.iter().map(|r| r.status.as_deref())),
            int64_opt(rows.iter().map(|r| r.exit_code)),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
/// Builds the Arrow batch backing the `harvest_events` table. The column
/// vector must stay positionally aligned with the schema field list.
fn harvest_events_batch(rows: Vec<HarvestEventRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source_branch", DataType::Utf8, false),
            ("target_branch", DataType::Utf8, false),
            ("commit_sha", DataType::Utf8, true),
            ("status", DataType::Utf8, false),
            ("conflicts", DataType::Utf8, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source_branch.as_str()))),
            string(rows.iter().map(|r| Some(r.target_branch.as_str()))),
            opt_string(rows.iter().map(|r| r.commit_sha.as_deref())),
            string(rows.iter().map(|r| Some(r.status.as_str()))),
            opt_string(rows.iter().map(|r| r.conflicts.as_deref())),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
/// Builds the Arrow batch backing the `communication_events` table. The
/// column vector must stay positionally aligned with the schema field list.
fn communication_events_batch(rows: Vec<CommunicationEventRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source", DataType::Utf8, false),
            ("tool", DataType::Utf8, true),
            ("direction", DataType::Utf8, false),
            ("chat_id", DataType::Utf8, true),
            ("message_id", DataType::Utf8, true),
            ("handle", DataType::Utf8, true),
            ("agent", DataType::Utf8, true),
            ("body", DataType::Utf8, true),
            ("status", DataType::Utf8, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            opt_string(rows.iter().map(|r| r.tool.as_deref())),
            string(rows.iter().map(|r| Some(r.direction.as_str()))),
            opt_string(rows.iter().map(|r| r.chat_id.as_deref())),
            opt_string(rows.iter().map(|r| r.message_id.as_deref())),
            opt_string(rows.iter().map(|r| r.handle.as_deref())),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            opt_string(rows.iter().map(|r| r.body.as_deref())),
            opt_string(rows.iter().map(|r| r.status.as_deref())),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
/// Builds the Arrow batch backing the `mcp_tool_calls` table. The column
/// vector must stay positionally aligned with the schema field list.
fn mcp_tool_calls_batch(rows: Vec<McpToolCallRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc_start", DataType::Utf8, false),
            ("ts_utc_end", DataType::Utf8, true),
            ("source", DataType::Utf8, false),
            ("tool", DataType::Utf8, false),
            ("agent", DataType::Utf8, true),
            ("duration_ms", DataType::Int64, true),
            ("success", DataType::Boolean, false),
            ("error", DataType::Utf8, true),
            ("timeout_race", DataType::Boolean, false),
            ("request_json", DataType::Utf8, true),
            ("response_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc_start.as_str()))),
            opt_string(rows.iter().map(|r| r.ts_utc_end.as_deref())),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            string(rows.iter().map(|r| Some(r.tool.as_str()))),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            int64_opt(rows.iter().map(|r| r.duration_ms)),
            bools(rows.iter().map(|r| r.success)),
            opt_string(rows.iter().map(|r| r.error.as_deref())),
            bools(rows.iter().map(|r| r.timeout_race)),
            opt_string(rows.iter().map(|r| r.request_json.as_deref())),
            opt_string(rows.iter().map(|r| r.response_json.as_deref())),
        ],
    )?)
}
/// Builds the Arrow batch backing the `git_operations` table. The column
/// vector must stay positionally aligned with the schema field list.
fn git_operations_batch(rows: Vec<GitOperationRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("operation", DataType::Utf8, false),
            ("repo", DataType::Utf8, false),
            ("branch", DataType::Utf8, true),
            ("remote", DataType::Utf8, true),
            ("from_sha", DataType::Utf8, true),
            ("to_sha", DataType::Utf8, true),
            ("status", DataType::Utf8, false),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.operation.as_str()))),
            string(rows.iter().map(|r| Some(r.repo.as_str()))),
            opt_string(rows.iter().map(|r| r.branch.as_deref())),
            opt_string(rows.iter().map(|r| r.remote.as_deref())),
            opt_string(rows.iter().map(|r| r.from_sha.as_deref())),
            opt_string(rows.iter().map(|r| r.to_sha.as_deref())),
            string(rows.iter().map(|r| Some(r.status.as_str()))),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
/// Builds the Arrow batch backing the `owner_directives` table. The column
/// vector must stay positionally aligned with the schema field list.
fn owner_directives_batch(rows: Vec<OwnerDirectiveRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source", DataType::Utf8, false),
            ("chat_id", DataType::Utf8, true),
            ("raw_text", DataType::Utf8, false),
            ("resolved_action", DataType::Utf8, true),
            ("agent", DataType::Utf8, true),
            ("status", DataType::Utf8, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            opt_string(rows.iter().map(|r| r.chat_id.as_deref())),
            string(rows.iter().map(|r| Some(r.raw_text.as_str()))),
            opt_string(rows.iter().map(|r| r.resolved_action.as_deref())),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            opt_string(rows.iter().map(|r| r.status.as_deref())),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
/// Builds the Arrow batch backing the `token_usage` table. The column
/// vector must stay positionally aligned with the schema field list.
fn token_usage_batch(rows: Vec<TokenUsageRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("session_id", DataType::Utf8, true),
            ("agent", DataType::Utf8, true),
            ("runtime", DataType::Utf8, true),
            ("model", DataType::Utf8, true),
            ("input_tokens", DataType::Int64, true),
            ("output_tokens", DataType::Int64, true),
            ("cached_input_tokens", DataType::Int64, true),
            ("cost_usd_micros", DataType::Int64, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            opt_string(rows.iter().map(|r| r.session_id.as_deref())),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            opt_string(rows.iter().map(|r| r.runtime.as_deref())),
            opt_string(rows.iter().map(|r| r.model.as_deref())),
            int64_opt(rows.iter().map(|r| r.input_tokens)),
            int64_opt(rows.iter().map(|r| r.output_tokens)),
            int64_opt(rows.iter().map(|r| r.cached_input_tokens)),
            int64_opt(rows.iter().map(|r| r.cost_usd_micros)),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
/// Builds the Arrow batch backing the `source_errors` table. The column
/// vector must stay positionally aligned with the schema field list.
fn source_errors_batch(rows: Vec<SourceErrorRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source", DataType::Utf8, false),
            ("error_class", DataType::Utf8, false),
            ("count", DataType::Int64, false),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            string(rows.iter().map(|r| Some(r.error_class.as_str()))),
            int64(rows.iter().map(|r| r.count)),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
/// Builds the Arrow batch backing the `iroh_events` table. The column
/// vector must stay positionally aligned with the schema field list.
fn iroh_events_batch(rows: Vec<IrohEventRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("event_type", DataType::Utf8, false),
            ("peer_id_hash", DataType::Utf8, false),
            ("peer_label", DataType::Utf8, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.event_type.as_str()))),
            string(rows.iter().map(|r| Some(r.peer_id_hash.as_str()))),
            opt_string(rows.iter().map(|r| r.peer_label.as_deref())),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
/// Builds the Arrow batch backing the `watchdog_events` table. The column
/// vector must stay positionally aligned with the schema field list.
fn watchdog_events_batch(rows: Vec<WatchdogEventRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("event", DataType::Utf8, false),
            ("agent", DataType::Utf8, true),
            ("severity", DataType::Utf8, true),
            ("status", DataType::Utf8, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.event.as_str()))),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            opt_string(rows.iter().map(|r| r.severity.as_deref())),
            opt_string(rows.iter().map(|r| r.status.as_deref())),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
/// Builds the Arrow batch backing the `tasks` DataFusion table (storage
/// table `netsky_tasks`). The column vector must stay positionally aligned
/// with the schema field list.
fn tasks_batch(rows: Vec<TaskRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("created_at", DataType::Utf8, false),
            ("updated_at", DataType::Utf8, false),
            ("title", DataType::Utf8, false),
            ("body", DataType::Utf8, true),
            ("status", DataType::Utf8, false),
            ("priority", DataType::Utf8, true),
            ("labels", DataType::Utf8, true),
            ("source", DataType::Utf8, true),
            ("source_ref", DataType::Utf8, true),
            ("closed_at", DataType::Utf8, true),
            ("closed_reason", DataType::Utf8, true),
            ("closed_evidence", DataType::Utf8, true),
            ("agent", DataType::Utf8, true),
            ("sync_calendar", DataType::Boolean, false),
            ("calendar_task_id", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.created_at.as_str()))),
            string(rows.iter().map(|r| Some(r.updated_at.as_str()))),
            string(rows.iter().map(|r| Some(r.title.as_str()))),
            opt_string(rows.iter().map(|r| r.body.as_deref())),
            string(rows.iter().map(|r| Some(r.status.as_str()))),
            opt_string(rows.iter().map(|r| r.priority.as_deref())),
            opt_string(rows.iter().map(|r| r.labels.as_deref())),
            opt_string(rows.iter().map(|r| r.source.as_deref())),
            opt_string(rows.iter().map(|r| r.source_ref.as_deref())),
            opt_string(rows.iter().map(|r| r.closed_at.as_deref())),
            opt_string(rows.iter().map(|r| r.closed_reason.as_deref())),
            opt_string(rows.iter().map(|r| r.closed_evidence.as_deref())),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            bools(rows.iter().map(|r| r.sync_calendar)),
            opt_string(rows.iter().map(|r| r.calendar_task_id.as_deref())),
        ],
    )?)
}
/// Shared builder for event tables shaped as an id plus four non-null text
/// columns. The tuple element order must match `schema`'s field order.
fn simple_event_batch<'a>(
    schema: SchemaRef,
    rows: impl Iterator<Item = (i64, &'a String, &'a String, &'a String, &'a String)>,
) -> Result<RecordBatch> {
    // Collected once so the same rows can feed all five column builders.
    let rows = rows.collect::<Vec<_>>();
    Ok(RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int64Array::from(
                rows.iter().map(|r| r.0).collect::<Vec<_>>(),
            )),
            string(rows.iter().map(|r| Some(r.1.as_str()))),
            string(rows.iter().map(|r| Some(r.2.as_str()))),
            string(rows.iter().map(|r| Some(r.3.as_str()))),
            string(rows.iter().map(|r| Some(r.4.as_str()))),
        ],
    )?)
}
fn schema(fields: &[(&str, DataType, bool)]) -> SchemaRef {
Arc::new(Schema::new(
fields
.iter()
.map(|(name, ty, nullable)| Field::new(*name, ty.clone(), *nullable))
.collect::<Vec<_>>(),
))
}
fn ids<T>(rows: &[T], id: impl Fn(&T) -> i64) -> Arc<Int64Array> {
int64(rows.iter().map(id))
}
/// Collects non-null `i64` values into an `Arc`'d `Int64Array`.
fn int64(values: impl Iterator<Item = i64>) -> Arc<Int64Array> {
    let collected: Vec<i64> = values.collect();
    Arc::new(Int64Array::from(collected))
}
/// Collects possibly-null `i64` values into an `Arc`'d `Int64Array`
/// (`None` becomes a null slot).
fn int64_opt(values: impl Iterator<Item = Option<i64>>) -> Arc<Int64Array> {
    let slots: Vec<Option<i64>> = values.collect();
    Arc::new(Int64Array::from(slots))
}
/// Collects non-null `bool` values into an `Arc`'d `BooleanArray`.
fn bools(values: impl Iterator<Item = bool>) -> Arc<BooleanArray> {
    let flags: Vec<bool> = values.collect();
    Arc::new(BooleanArray::from(flags))
}
/// Collects possibly-null string slices into an `Arc`'d `StringArray`
/// (`None` becomes a null slot).
fn string<'a>(values: impl Iterator<Item = Option<&'a str>>) -> Arc<StringArray> {
    let slots: Vec<Option<&str>> = values.collect();
    Arc::new(StringArray::from(slots))
}
fn opt_string<'a>(values: impl Iterator<Item = Option<&'a str>>) -> Arc<StringArray> {
string(values)
}
/// Formats a UTC timestamp as RFC 3339 with millisecond precision and a
/// trailing `Z` (the `true` argument selects `Z` over `+00:00`).
fn ts(ts_utc: DateTime<Utc>) -> String {
    let formatted = ts_utc.to_rfc3339_opts(SecondsFormat::Millis, true);
    formatted
}
/// Parses an RFC 3339 timestamp string and normalizes it to UTC.
///
/// # Errors
/// Returns the chrono parse error (via `?`) when `ts_utc` is not valid
/// RFC 3339.
fn parse_ts(ts_utc: &str) -> Result<DateTime<Utc>> {
    let parsed = DateTime::parse_from_rfc3339(ts_utc)?;
    Ok(parsed.with_timezone(&Utc))
}
/// Returns at most the first `max_chars` characters (Unicode scalar values)
/// of `value` as an owned `String`.
fn truncate(value: &str, max_chars: usize) -> String {
    // `nth(max_chars)` is the first character past the cap; slicing at its
    // byte offset keeps the string on a char boundary.
    match value.char_indices().nth(max_chars) {
        Some((idx, _)) => value[..idx].to_owned(),
        None => value.to_owned(),
    }
}
/// Character-caps an optional string to `max_chars` characters, preserving
/// `None`.
fn truncate_opt(value: Option<&str>, max_chars: usize) -> Option<String> {
    let s = value?;
    Some(s.chars().take(max_chars).collect())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a fresh in-memory database with the full schema applied.
    fn db() -> Result<Db> {
        let db = Db::open_in_memory()?;
        db.migrate()?;
        Ok(db)
    }

    /// Deterministic timestamp shared by the round-trip tests.
    fn fixed_ts() -> DateTime<Utc> {
        chrono::DateTime::parse_from_rfc3339("2026-04-15T23:00:00Z")
            .unwrap()
            .with_timezone(&Utc)
    }

    // A single-table SELECT yields a scope containing exactly that table.
    #[test]
    fn referenced_tables_extracts_scope() {
        let scope =
            referenced_storage_tables("SELECT COUNT(*) FROM messages WHERE ts_utc >= '2026-04-18'")
                .expect("single-table scope");
        assert!(scope.contains(MESSAGES));
        assert!(!scope.contains(CRASHES));
        assert_eq!(scope.len(), 1);
    }

    // A view name (clone_lifetimes) resolves to its backing storage table.
    #[test]
    fn referenced_tables_handles_alias_and_view() {
        let scope = referenced_storage_tables(
            "SELECT ts_utc_start FROM clone_lifetimes WHERE status IS NOT NULL",
        )
        .expect("view resolves to backing table");
        assert!(scope.contains(CLONE_DISPATCHES));
    }

    // The DataFusion-facing name `tasks` maps to the storage name
    // `netsky_tasks`, and the alias itself does not leak into the scope.
    #[test]
    fn referenced_tables_maps_datafusion_name_to_storage() {
        let scope =
            referenced_storage_tables("SELECT title FROM tasks").expect("tasks -> netsky_tasks");
        assert!(scope.contains(TASKS));
        assert!(!scope.contains("tasks"));
    }

    // SQL that references no known table produces no scope at all.
    #[test]
    fn referenced_tables_unknown_sql_returns_none() {
        assert!(referenced_storage_tables("SELECT 1").is_none());
    }

    // Both sides of a JOIN land in the extracted scope.
    #[test]
    fn referenced_tables_join_captures_both() {
        let scope = referenced_storage_tables(
            "SELECT a.id FROM messages a JOIN crashes b ON a.ts_utc = b.ts_utc",
        )
        .expect("two tables");
        assert!(scope.contains(MESSAGES));
        assert!(scope.contains(CRASHES));
    }

    // list_tasks applies status/priority filters and orders rows by
    // status, then priority, then id.
    #[test]
    fn list_tasks_pushes_filter_and_order_into_sql() -> Result<()> {
        let db = db()?;
        // Helper closure: insert a task with only the fields under test set.
        let mk = |title: &'static str, status: &'static str, priority: &'static str| {
            db.record_task(TaskRecord {
                title,
                body: None,
                status,
                priority: Some(priority),
                labels: None,
                source: None,
                source_ref: None,
                closed_at: None,
                closed_reason: None,
                closed_evidence: None,
                agent: None,
                sync_calendar: false,
                calendar_task_id: None,
            })
        };
        mk("c", "open", "p2")?;
        mk("a", "open", "p1")?;
        mk("b", "in_progress", "p1")?;
        mk("d", "closed", "p3")?;
        let all = db.list_tasks(None, None)?;
        let order: Vec<_> = all.iter().map(|r| r.title.as_str()).collect();
        assert_eq!(
            order,
            vec!["d", "b", "a", "c"],
            "status then priority then id"
        );
        let open_only = db.list_tasks(Some("open"), None)?;
        assert_eq!(open_only.len(), 2);
        assert!(open_only.iter().all(|r| r.status == "open"));
        let p1 = db.list_tasks(None, Some("p1"))?;
        assert_eq!(p1.len(), 2);
        assert!(p1.iter().all(|r| r.priority.as_deref() == Some("p1")));
        // Both filters combined must intersect, not union.
        let open_p1 = db.list_tasks(Some("open"), Some("p1"))?;
        assert_eq!(open_p1.len(), 1);
        assert_eq!(open_p1[0].title, "a");
        Ok(())
    }

    // migrate() installs an idx_{table}_{column} index for every entry in
    // INDEXED_TIME_COLUMNS and INDEXED_EQ_COLUMNS.
    #[test]
    fn migrate_installs_expression_indexes_for_timed_tables() -> Result<()> {
        let db = db()?;
        // Collect all index names straight from sqlite_master.
        let names = db.with_conn(|conn| async move {
            let mut rows = conn
                .query("SELECT name FROM sqlite_master WHERE type = 'index'", ())
                .await?;
            let mut out = Vec::new();
            while let Some(row) = rows.next().await? {
                out.push(text_value(&row.get_value(0)?)?);
            }
            Ok(out)
        })?;
        for (table, column) in INDEXED_TIME_COLUMNS {
            let expected = format!("idx_{table}_{column}");
            assert!(
                names.iter().any(|n| n == &expected),
                "missing index {expected} in {names:?}"
            );
        }
        for (table, column) in INDEXED_EQ_COLUMNS {
            let expected = format!("idx_{table}_{column}");
            assert!(
                names.iter().any(|n| n == &expected),
                "missing index {expected} in {names:?}"
            );
        }
        Ok(())
    }

    // contains_word only matches whole identifiers, not substrings
    // (e.g. "tasks" inside "netsky_tasks" must not match).
    #[test]
    fn contains_word_respects_identifier_boundary() {
        assert!(!contains_word("select from netsky_tasks", "tasks"));
        assert!(contains_word("select from tasks where", "tasks"));
        assert!(contains_word("FROM tasks;", "tasks"));
        assert!(!contains_word("classes", "class"));
    }

    // Insert a message, read it back through the SQL query surface.
    #[test]
    fn message_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_message(MessageRecord {
            ts_utc: fixed_ts(),
            source: "imessage",
            direction: Direction::Inbound,
            chat_id: Some("chat-1"),
            from_agent: Some("agent9"),
            to_agent: None,
            body: Some("hello"),
            raw_json: Some("{\"chat_id\":\"chat-1\"}"),
        })?;
        let out = db.query("SELECT id, ts_utc, source, direction, chat_id, body FROM messages")?;
        assert_eq!(id, 1);
        // Timestamp is stored in millisecond RFC 3339 form with a Z suffix.
        assert!(out.contains("2026-04-15T23:00:00.000Z"));
        assert!(out.contains("imessage"));
        assert!(out.contains("inbound"));
        assert!(out.contains("chat-1"));
        assert!(out.contains("hello"));
        Ok(())
    }

    // Insert a CLI invocation row, read it back.
    #[test]
    fn cli_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_cli(
            fixed_ts(),
            "netsky",
            "[\"db\", \"migrate\"]",
            Some(0),
            Some(12),
            "host-a",
        )?;
        let out = db.query("SELECT bin, exit_code, duration_ms, host FROM cli_invocations")?;
        assert_eq!(id, 1);
        assert!(out.contains("netsky"));
        assert!(out.contains("host-a"));
        assert!(out.contains("12"));
        Ok(())
    }

    // Insert a crash row, read it back.
    #[test]
    fn crash_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_crash(fixed_ts(), "panic", "agent8", "{\"reason\":\"wedged\"}")?;
        let out = db.query("SELECT kind, agent, detail_json FROM crashes")?;
        assert_eq!(id, 1);
        assert!(out.contains("panic"));
        assert!(out.contains("agent8"));
        assert!(out.contains("wedged"));
        Ok(())
    }

    // Insert a tick row, read it back.
    #[test]
    fn tick_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_tick(fixed_ts(), "ticker", "{\"beat\":1}")?;
        let out = db.query("SELECT source, detail_json FROM ticks")?;
        assert_eq!(id, 1);
        assert!(out.contains("ticker"));
        assert!(out.contains("beat"));
        Ok(())
    }

    // Insert a workspace row (with deletion time and verdict), read it back.
    #[test]
    fn workspace_round_trip() -> Result<()> {
        let db = db()?;
        let created = fixed_ts();
        let deleted = created + chrono::Duration::minutes(5);
        let id = db.record_workspace(
            created,
            "session8",
            "feat/netsky-db-v0",
            Some(deleted),
            Some("kept"),
        )?;
        let out = db.query("SELECT name, branch, verdict FROM workspaces")?;
        assert_eq!(id, 1);
        assert!(out.contains("session8"));
        assert!(out.contains("feat/netsky-db-v0"));
        assert!(out.contains("kept"));
        Ok(())
    }

    // Insert a session event, read it back; SessionEvent::Note serializes
    // as "note".
    #[test]
    fn session_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_session(fixed_ts(), "agent8", 8, SessionEvent::Note)?;
        let out = db.query("SELECT agent, session_num, event FROM sessions")?;
        assert_eq!(id, 1);
        assert!(out.contains("agent8"));
        assert!(out.contains("note"));
        Ok(())
    }

    // One insert into each v2 table, then verify every table reports one
    // row through the query surface.
    #[test]
    fn v2_tables_round_trip() -> Result<()> {
        let db = db()?;
        db.record_clone_dispatch(CloneDispatchRecord {
            ts_utc_start: fixed_ts(),
            ts_utc_end: Some(fixed_ts()),
            agent_id: "agent3",
            runtime: Some("codex"),
            brief_path: Some("briefs/foo.md"),
            brief: Some("do the thing"),
            workspace: Some("workspaces/foo/repo"),
            branch: Some("feat/foo"),
            status: Some("done"),
            exit_code: Some(0),
            detail_json: Some("{\"ok\":true}"),
        })?;
        db.record_harvest_event(HarvestEventRecord {
            ts_utc: fixed_ts(),
            source_branch: "feat/foo",
            target_branch: "main",
            commit_sha: Some("abc123"),
            status: "clean",
            conflicts: None,
            detail_json: None,
        })?;
        db.record_communication_event(CommunicationEventRecord {
            ts_utc: fixed_ts(),
            source: "imessage",
            tool: Some("reply"),
            direction: Direction::Outbound,
            chat_id: Some("chat-1"),
            message_id: Some("msg-1"),
            handle: Some("+15551234567"),
            agent: Some("agent0"),
            body: Some("sent"),
            status: Some("ok"),
            detail_json: None,
        })?;
        // Communication events also have a typed list accessor; check it.
        let comms = db.list_communication_events()?;
        assert_eq!(comms.len(), 1);
        assert_eq!(comms[0].source, "imessage");
        assert_eq!(comms[0].agent.as_deref(), Some("agent0"));
        db.record_mcp_tool_call(McpToolCallRecord {
            ts_utc_start: fixed_ts(),
            ts_utc_end: Some(fixed_ts()),
            source: "drive",
            tool: "list_files",
            agent: Some("agent0"),
            duration_ms: Some(22),
            success: true,
            error: None,
            timeout_race: false,
            request_json: Some("{}"),
            response_json: Some("[]"),
        })?;
        db.record_git_operation(GitOperationRecord {
            ts_utc: fixed_ts(),
            operation: "push",
            repo: "netsky",
            branch: Some("feat/foo"),
            remote: Some("origin"),
            from_sha: Some("abc"),
            to_sha: Some("def"),
            status: "ok",
            detail_json: None,
        })?;
        db.record_owner_directive(OwnerDirectiveRecord {
            ts_utc: fixed_ts(),
            source: "imessage",
            chat_id: Some("chat-1"),
            raw_text: "ship it",
            resolved_action: Some("dispatch"),
            agent: Some("agent0"),
            status: Some("accepted"),
            detail_json: None,
        })?;
        db.record_token_usage(TokenUsageRecord {
            ts_utc: fixed_ts(),
            session_id: Some("session-1"),
            agent: Some("agent4"),
            runtime: Some("codex"),
            model: Some("gpt-5.4"),
            input_tokens: Some(100),
            output_tokens: Some(25),
            cached_input_tokens: Some(10),
            cost_usd_micros: Some(1234),
            detail_json: None,
        })?;
        db.record_watchdog_event(WatchdogEventRecord {
            ts_utc: fixed_ts(),
            event: "respawn",
            agent: Some("agent0"),
            severity: Some("warn"),
            status: Some("ok"),
            detail_json: Some("{}"),
        })?;
        for table in [
            "clone_dispatches",
            "harvest_events",
            "communication_events",
            "mcp_tool_calls",
            "git_operations",
            "owner_directives",
            "token_usage",
            "watchdog_events",
        ] {
            let out = db.query(&format!("SELECT COUNT(*) AS count FROM {table}"))?;
            assert!(out.contains('1'), "{table}: {out}");
        }
        Ok(())
    }

    // Batch insert assigns contiguous ids and a later single-row batch
    // continues the sequence.
    #[test]
    fn token_usage_batch_inserts_contiguous_ids() -> Result<()> {
        let db = db()?;
        let count = 1500;
        let records: Vec<_> = (0..count)
            .map(|i| TokenUsageRecord {
                ts_utc: fixed_ts(),
                session_id: Some("batch-session"),
                agent: Some("agent45"),
                runtime: Some("claude"),
                model: Some("claude-opus-4-7"),
                input_tokens: Some(i),
                output_tokens: Some(i * 2),
                cached_input_tokens: None,
                cost_usd_micros: None,
                detail_json: None,
            })
            .collect();
        let written = db.record_token_usage_batch(records)?;
        assert_eq!(written, count as usize);
        let out = db.query(
            "SELECT COUNT(*) AS c, MIN(id) AS lo, MAX(id) AS hi, \
             SUM(input_tokens) AS sum_in FROM token_usage",
        )?;
        assert!(out.contains(&count.to_string()), "{out}");
        // 0 + 1 + ... + 1499 = 1499 * 1500 / 2 = 1124250.
        assert!(out.contains("1124250"), "{out}");
        let written = db.record_token_usage_batch(vec![TokenUsageRecord {
            ts_utc: fixed_ts(),
            session_id: None,
            agent: None,
            runtime: None,
            model: None,
            input_tokens: Some(7),
            output_tokens: None,
            cached_input_tokens: None,
            cost_usd_micros: None,
            detail_json: None,
        }])?;
        assert_eq!(written, 1);
        // The follow-up row must take id = count + 1, proving no gaps.
        let out = db.query("SELECT MAX(id) AS hi FROM token_usage")?;
        assert!(out.contains(&(count + 1).to_string()), "{out}");
        Ok(())
    }

    // Empty batch writes nothing and does not error.
    #[test]
    fn token_usage_batch_empty_is_noop() -> Result<()> {
        let db = db()?;
        let written = db.record_token_usage_batch(Vec::<TokenUsageRecord<'_>>::new())?;
        assert_eq!(written, 0);
        Ok(())
    }

    // Create a task, update several fields, then read it back via both the
    // typed API and the SQL `tasks` view.
    #[test]
    fn task_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_task(TaskRecord {
            title: "Add a task tracker",
            body: Some("Use turso and DataFusion."),
            status: "open",
            priority: Some("p1"),
            labels: Some("db,cli"),
            source: Some("test"),
            source_ref: Some("briefs/task-tracker.md"),
            closed_at: None,
            closed_reason: None,
            closed_evidence: None,
            agent: None,
            sync_calendar: false,
            calendar_task_id: None,
        })?;
        let row = db.update_task(TaskUpdate {
            id,
            status: Some("in_progress"),
            priority: None,
            agent: Some("agent3"),
            closed_reason: None,
            closed_evidence: None,
            sync_calendar: Some(true),
            calendar_task_id: Some("calendar-1"),
        })?;
        assert_eq!(row.id, 1);
        assert_eq!(row.status, "in_progress");
        assert_eq!(row.agent.as_deref(), Some("agent3"));
        assert!(row.sync_calendar);
        assert_eq!(row.calendar_task_id.as_deref(), Some("calendar-1"));
        let rows = db.list_tasks(Some("in_progress"), Some("p1"))?;
        assert_eq!(rows.len(), 1);
        let out = db.query(
            "SELECT title, status, priority, agent, sync_calendar, calendar_task_id FROM tasks",
        )?;
        assert!(out.contains("Add a task tracker"));
        assert!(out.contains("in_progress"));
        assert!(out.contains("agent3"));
        assert!(out.contains("calendar-1"));
        Ok(())
    }

    // A UNION ALL over every registered table must succeed, proving each
    // name is exposed to the query engine.
    #[test]
    fn query_registers_all_tables() -> Result<()> {
        let db = db()?;
        let out = db.query(
            "SELECT COUNT(*) FROM messages \
             UNION ALL SELECT COUNT(*) FROM cli_invocations \
             UNION ALL SELECT COUNT(*) FROM crashes \
             UNION ALL SELECT COUNT(*) FROM ticks \
             UNION ALL SELECT COUNT(*) FROM workspaces \
             UNION ALL SELECT COUNT(*) FROM sessions \
             UNION ALL SELECT COUNT(*) FROM clone_dispatches \
             UNION ALL SELECT COUNT(*) FROM harvest_events \
             UNION ALL SELECT COUNT(*) FROM communication_events \
             UNION ALL SELECT COUNT(*) FROM mcp_tool_calls \
             UNION ALL SELECT COUNT(*) FROM git_operations \
             UNION ALL SELECT COUNT(*) FROM owner_directives \
             UNION ALL SELECT COUNT(*) FROM token_usage \
             UNION ALL SELECT COUNT(*) FROM watchdog_events \
             UNION ALL SELECT COUNT(*) FROM tasks \
             UNION ALL SELECT COUNT(*) FROM source_errors \
             UNION ALL SELECT COUNT(*) FROM iroh_events",
        )?;
        assert!(out.contains('0'));
        Ok(())
    }

    // Read-only open must refuse a missing or unmigrated database (without
    // creating the file), then read data once a writer has migrated it.
    #[test]
    fn read_only_path_reports_uninitialized_then_reads_initialized_db() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let path = dir.path().join("meta.db");
        let missing = match Db::open_read_only_path(&path) {
            Ok(_) => panic!("missing database opened read-only"),
            Err(error) => error.to_string(),
        };
        assert_eq!(
            missing,
            format!(
                "meta.db has not been initialized at {}; a writer must run first.",
                path.display()
            )
        );
        // Probing read-only must not have created the file.
        assert!(!path.exists());
        let uninitialized = Db::open_path(&path)?;
        drop(uninitialized);
        // File now exists but has no schema: still refused.
        let no_schema = match Db::open_read_only_path(&path) {
            Ok(_) => panic!("uninitialized database opened read-only"),
            Err(error) => error.to_string(),
        };
        assert_eq!(
            no_schema,
            format!(
                "meta.db has not been initialized at {}; a writer must run first.",
                path.display()
            )
        );
        let writer = Db::open_path(&path)?;
        writer.migrate()?;
        writer.record_message(MessageRecord {
            ts_utc: fixed_ts(),
            source: "agent",
            direction: Direction::Outbound,
            chat_id: Some("agent0"),
            from_agent: Some("agent5"),
            to_agent: Some("agent0"),
            body: Some("checkpoint acked"),
            raw_json: None,
        })?;
        let reader = Db::open_read_only_path(&path)?;
        // Writes through the read-only handle must fail.
        assert!(reader.read_only_write_probe_fails()?);
        let out = reader.query("SELECT source, chat_id, body FROM messages")?;
        assert!(out.contains("agent"));
        assert!(out.contains("agent0"));
        assert!(out.contains("checkpoint acked"));
        Ok(())
    }

    // migrate() lands the meta-table schema_version at 6.
    #[test]
    fn migrate_sets_schema_version_6() -> Result<()> {
        let db = db()?;
        assert_eq!(db.schema_version()?, 6);
        Ok(())
    }

    // migrate() also mirrors the version into PRAGMA user_version.
    #[test]
    fn migrate_sets_sqlite_user_version_6() -> Result<()> {
        let db = db()?;
        assert_eq!(
            db.with_conn(|conn| async move { user_version(&conn).await })?,
            SCHEMA_VERSION
        );
        Ok(())
    }

    // hash_peer_id is deterministic, input-sensitive, and truncated to
    // 16 hex characters (8 bytes).
    #[test]
    fn hash_peer_id_is_stable_and_truncated() {
        let a = hash_peer_id("abc123");
        let b = hash_peer_id("abc123");
        let c = hash_peer_id("abc124");
        assert_eq!(a, b, "same input hashes the same");
        assert_ne!(a, c, "different input hashes differently");
        assert_eq!(a.len(), 16, "8 bytes hex-encoded = 16 chars");
        assert!(a.chars().all(|c| c.is_ascii_hexdigit()));
    }

    // Known SHA-256 prefixes pin the hash algorithm itself.
    #[test]
    fn hash_peer_id_matches_known_sha256_vectors() {
        assert_eq!(hash_peer_id("abc"), "ba7816bf8f01cfea");
        assert_eq!(hash_peer_id(""), "e3b0c44298fc1c14");
        assert_eq!(hash_peer_id("netsky0"), "66863bafcf1591d6");
    }

    // Insert source errors and aggregate them via SQL; error classes
    // serialize in snake_case.
    #[test]
    fn source_error_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_source_error(SourceErrorRecord {
            ts_utc: fixed_ts(),
            source: "email",
            error_class: SourceErrorClass::Timeout,
            count: 1,
            detail_json: Some("{\"endpoint\":\"gmail\"}"),
        })?;
        assert_eq!(id, 1);
        db.record_source_error(SourceErrorRecord {
            ts_utc: fixed_ts(),
            source: "iroh",
            error_class: SourceErrorClass::NetworkError,
            count: 3,
            detail_json: None,
        })?;
        let out = db.query(
            "SELECT source, error_class, SUM(count) AS n FROM source_errors \
             GROUP BY source, error_class ORDER BY source",
        )?;
        assert!(out.contains("email"));
        assert!(out.contains("timeout"));
        assert!(out.contains("network_error"));
        Ok(())
    }

    // A recorded count of 0 is stored as 1.
    #[test]
    fn source_error_count_floors_at_one() -> Result<()> {
        let db = db()?;
        db.record_source_error(SourceErrorRecord {
            ts_utc: fixed_ts(),
            source: "email",
            error_class: SourceErrorClass::Unknown,
            count: 0,
            detail_json: None,
        })?;
        let out = db.query("SELECT count FROM source_errors")?;
        assert!(out.contains('1'));
        Ok(())
    }

    // Cursor CRUD: read missing, upsert, list (ordered by source),
    // reset, and reset-of-missing returns false.
    #[test]
    fn source_cursor_round_trip() -> Result<()> {
        let db = db()?;
        assert_eq!(db.read_source_cursor("imessage")?, None);
        db.update_source_cursor("imessage", "42")?;
        assert_eq!(db.read_source_cursor("imessage")?.as_deref(), Some("42"));
        // A second update overwrites rather than duplicates.
        db.update_source_cursor("imessage", "99")?;
        assert_eq!(db.read_source_cursor("imessage")?.as_deref(), Some("99"));
        db.update_source_cursor("email", "abc")?;
        let all = db.list_source_cursors()?;
        assert_eq!(all.len(), 2);
        assert_eq!(all[0].source, "email");
        assert_eq!(all[0].cursor_value, "abc");
        assert_eq!(all[1].source, "imessage");
        assert_eq!(all[1].cursor_value, "99");
        assert!(db.reset_source_cursor("imessage")?);
        assert_eq!(db.read_source_cursor("imessage")?, None);
        // Resetting an already-absent cursor reports false.
        assert!(!db.reset_source_cursor("imessage")?);
        Ok(())
    }

    // Events get monotonically increasing ids; delivery transitions are
    // persisted; tail_events filters by source, newest first.
    #[test]
    fn event_insert_then_transition_round_trip() -> Result<()> {
        let db = db()?;
        let delivered = db.insert_event("imessage", fixed_ts(), r#"{"chat":"a"}"#)?;
        let failed = db.insert_event("imessage", fixed_ts(), r#"{"chat":"b"}"#)?;
        assert!(delivered > 0 && failed > delivered);
        db.update_event_delivery(delivered, EventStatus::Delivered, None)?;
        db.update_event_delivery(failed, EventStatus::Failed, Some("adapter timed out"))?;
        let rows = db.tail_events("imessage", 10)?;
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].id, failed);
        assert_eq!(rows[0].delivery_status, "failed");
        assert_eq!(rows[0].reason.as_deref(), Some("adapter timed out"));
        assert_eq!(rows[1].id, delivered);
        assert_eq!(rows[1].delivery_status, "delivered");
        assert_eq!(rows[1].reason, None);
        let other = db.insert_event("email", fixed_ts(), "{}")?;
        assert!(other > failed);
        // The email event must not appear in the imessage tail.
        let only_imessage = db.tail_events("imessage", 10)?;
        assert_eq!(only_imessage.len(), 2);
        Ok(())
    }

    // Iroh events store the hashed peer id only; the raw node id must
    // never appear in query output.
    #[test]
    fn iroh_event_round_trip() -> Result<()> {
        let db = db()?;
        let hash = hash_peer_id("raw_node_id_abc");
        let id = db.record_iroh_event(IrohEventRecord {
            ts_utc: fixed_ts(),
            event_type: IrohEventType::Connect,
            peer_id_hash: &hash,
            peer_label: Some("work"),
            detail_json: None,
        })?;
        assert_eq!(id, 1);
        db.record_iroh_event(IrohEventRecord {
            ts_utc: fixed_ts(),
            event_type: IrohEventType::HandshakeRefused,
            peer_id_hash: &hash_peer_id("unknown_peer"),
            peer_label: None,
            detail_json: Some("{\"reason\":\"not_in_allowlist\"}"),
        })?;
        let out =
            db.query("SELECT event_type, peer_id_hash, peer_label FROM iroh_events ORDER BY id")?;
        assert!(out.contains("connect"));
        assert!(out.contains("handshake_refused"));
        assert!(out.contains("work"));
        assert!(!out.contains("raw_node_id_abc"), "raw node id leaked");
        Ok(())
    }

    // token_usage exposes a runtime column usable in GROUP BY.
    #[test]
    fn token_usage_includes_runtime_column() -> Result<()> {
        let db = db()?;
        db.record_token_usage(TokenUsageRecord {
            ts_utc: fixed_ts(),
            session_id: Some("s1"),
            agent: Some("agent12"),
            runtime: Some("claude"),
            model: Some("claude-opus-4-7"),
            input_tokens: Some(1000),
            output_tokens: Some(500),
            cached_input_tokens: None,
            cost_usd_micros: Some(12_500),
            detail_json: None,
        })?;
        let out =
            db.query("SELECT runtime, SUM(input_tokens) AS ins FROM token_usage GROUP BY runtime")?;
        assert!(out.contains("claude"));
        assert!(out.contains("1000"));
        Ok(())
    }

    // The clone_lifetimes view shows only dispatches with an end
    // timestamp; in-flight dispatches are excluded.
    #[test]
    fn clone_lifetimes_view_reads_finished_dispatches() -> Result<()> {
        let db = db()?;
        db.record_clone_dispatch(CloneDispatchRecord {
            ts_utc_start: fixed_ts(),
            ts_utc_end: Some(fixed_ts() + chrono::Duration::minutes(7)),
            agent_id: "agent5",
            runtime: Some("claude"),
            brief_path: None,
            brief: None,
            workspace: Some("workspaces/foo/repo"),
            branch: Some("feat/foo"),
            status: Some("done"),
            exit_code: Some(0),
            detail_json: None,
        })?;
        db.record_clone_dispatch(CloneDispatchRecord {
            ts_utc_start: fixed_ts(),
            ts_utc_end: None,
            agent_id: "agent6",
            runtime: Some("codex"),
            brief_path: None,
            brief: None,
            workspace: None,
            branch: None,
            status: Some("in_flight"),
            exit_code: None,
            detail_json: None,
        })?;
        let out = db.query("SELECT agent_id, runtime FROM clone_lifetimes")?;
        assert!(out.contains("agent5"));
        assert!(
            !out.contains("agent6"),
            "in-flight dispatches stay excluded"
        );
        Ok(())
    }

    // Migrating a database pinned back to schema version 4 must preserve
    // existing rows and create the newer (empty) tables.
    #[test]
    fn migration_from_v4_preserves_existing_rows_and_adds_new_tables() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let path = dir.path().join("meta.db");
        {
            let db = Db::open_path(&path)?;
            db.migrate()?;
            // Rewind the recorded version (meta row and PRAGMA) to 4 to
            // simulate an older on-disk database.
            db.with_conn(|conn| async move {
                conn.execute("UPDATE meta SET value = 4 WHERE key = 'schema_version'", ())
                    .await?;
                conn.execute("PRAGMA user_version = 4", ()).await?;
                Ok(())
            })?;
            db.record_message(MessageRecord {
                ts_utc: fixed_ts(),
                source: "imessage",
                direction: Direction::Inbound,
                chat_id: Some("chat-1"),
                from_agent: None,
                to_agent: None,
                body: Some("pre-migration"),
                raw_json: None,
            })?;
        }
        let db = Db::open_path(&path)?;
        db.migrate()?;
        assert_eq!(db.schema_version()?, 6);
        let out = db.query("SELECT body FROM messages")?;
        assert!(
            out.contains("pre-migration"),
            "migration lost pre-existing row: {out}"
        );
        let se = db.query("SELECT COUNT(*) FROM source_errors")?;
        assert!(se.contains('0'));
        let ie = db.query("SELECT COUNT(*) FROM iroh_events")?;
        assert!(ie.contains('0'));
        assert!(db.list_source_cursors()?.is_empty());
        assert!(db.tail_events("imessage", 10)?.is_empty());
        Ok(())
    }
}