use std::fs;
use std::io::Read;
use std::path::Path;
use std::sync::Arc;
use chrono::{DateTime, SecondsFormat, Utc};
use datafusion::arrow::array::{BooleanArray, Int64Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::datasource::MemTable;
use datafusion::prelude::SessionContext;
use redb::{Database, ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use thiserror::Error;
/// Crate-wide result alias over the local [`Error`] type.
pub type Result<T> = std::result::Result<T, Error>;
/// On-disk schema version; [`Db::migrate`] refuses to open newer versions.
pub const SCHEMA_VERSION: i64 = 2;
// Key/value metadata table (currently holds only "schema_version").
const META: TableDefinition<&str, i64> = TableDefinition::new("meta");
// Per-table auto-increment counters, keyed by table name (see `Db::insert_json`).
const IDS: TableDefinition<&str, i64> = TableDefinition::new("ids");
// Event tables below all map: row id -> JSON-serialized row struct bytes.
const MESSAGES: TableDefinition<i64, &[u8]> = TableDefinition::new("messages");
const CLI_INVOCATIONS: TableDefinition<i64, &[u8]> = TableDefinition::new("cli_invocations");
const CRASHES: TableDefinition<i64, &[u8]> = TableDefinition::new("crashes");
const TICKS: TableDefinition<i64, &[u8]> = TableDefinition::new("ticks");
const WORKSPACES: TableDefinition<i64, &[u8]> = TableDefinition::new("workspaces");
const SESSIONS: TableDefinition<i64, &[u8]> = TableDefinition::new("sessions");
const CLONE_DISPATCHES: TableDefinition<i64, &[u8]> = TableDefinition::new("clone_dispatches");
const HARVEST_EVENTS: TableDefinition<i64, &[u8]> = TableDefinition::new("harvest_events");
const COMMUNICATION_EVENTS: TableDefinition<i64, &[u8]> =
    TableDefinition::new("communication_events");
const MCP_TOOL_CALLS: TableDefinition<i64, &[u8]> = TableDefinition::new("mcp_tool_calls");
const GIT_OPERATIONS: TableDefinition<i64, &[u8]> = TableDefinition::new("git_operations");
const OWNER_DIRECTIVES: TableDefinition<i64, &[u8]> = TableDefinition::new("owner_directives");
const TOKEN_USAGE: TableDefinition<i64, &[u8]> = TableDefinition::new("token_usage");
const WATCHDOG_EVENTS: TableDefinition<i64, &[u8]> = TableDefinition::new("watchdog_events");
/// All failure modes surfaced by this module.
///
/// Most variants are transparent wrappers so `?` can propagate the underlying
/// library errors unchanged; only the first two carry module-specific meaning.
#[derive(Debug, Error)]
pub enum Error {
    /// `dirs::home_dir()` returned `None`, so the default DB path can't be built.
    #[error("home directory not found")]
    HomeDirMissing,
    /// The stored schema version is newer than this binary supports (see `migrate`).
    #[error("schema version {found} is newer than supported {supported}")]
    FutureSchemaVersion { found: i64, supported: i64 },
    /// Boxed (not `#[from]`) — converted via the manual `From` impl below.
    #[error(transparent)]
    Redb(Box<redb::Error>),
    #[error(transparent)]
    DataFusion(#[from] datafusion::error::DataFusionError),
    #[error(transparent)]
    Arrow(#[from] datafusion::arrow::error::ArrowError),
    #[error(transparent)]
    Serde(#[from] serde_json::Error),
    #[error(transparent)]
    Io(#[from] std::io::Error),
}
// Manual conversion so redb errors land in the boxed variant; presumably boxed
// to keep `Error` (and every `Result` carrying it) small — confirm against
// `redb::Error`'s size if this ever changes.
impl From<redb::Error> for Error {
    fn from(error: redb::Error) -> Self {
        Self::Redb(Box::new(error))
    }
}
/// Message flow direction relative to this process; stored as the lowercase
/// strings produced by [`Direction::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum Direction {
    Inbound,
    Outbound,
}
impl Direction {
fn as_str(self) -> &'static str {
match self {
Direction::Inbound => "inbound",
Direction::Outbound => "outbound",
}
}
}
/// Lifecycle event for an agent session; stored as the lowercase strings
/// produced by [`SessionEvent::as_str`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SessionEvent {
    Up,
    Down,
    Note,
}
impl SessionEvent {
fn as_str(self) -> &'static str {
match self {
SessionEvent::Up => "up",
SessionEvent::Down => "down",
SessionEvent::Note => "note",
}
}
}
/// Handle to the redb-backed event store. All rows are JSON blobs keyed by a
/// per-table auto-increment id; see `insert_json` and the `*_batch` builders.
pub struct Db {
    db: Database,
}
/// Borrowed input for [`Db::record_message`]; persisted as a `MessageRow`.
pub struct MessageRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub direction: Direction,
    pub chat_id: Option<&'a str>,
    pub from_agent: Option<&'a str>,
    pub to_agent: Option<&'a str>,
    pub body: Option<&'a str>,
    pub raw_json: Option<&'a str>,
}
/// Borrowed input for [`Db::record_clone_dispatch`]. `brief` is truncated to
/// 16 KiB on insert.
pub struct CloneDispatchRecord<'a> {
    pub ts_utc_start: DateTime<Utc>,
    pub ts_utc_end: Option<DateTime<Utc>>,
    pub agent_id: &'a str,
    pub runtime: Option<&'a str>,
    pub brief_path: Option<&'a str>,
    pub brief: Option<&'a str>,
    pub workspace: Option<&'a str>,
    pub branch: Option<&'a str>,
    pub status: Option<&'a str>,
    pub exit_code: Option<i64>,
    pub detail_json: Option<&'a str>,
}
/// Borrowed input for [`Db::record_harvest_event`].
pub struct HarvestEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source_branch: &'a str,
    pub target_branch: &'a str,
    pub commit_sha: Option<&'a str>,
    pub status: &'a str,
    pub conflicts: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}
/// Borrowed input for [`Db::record_communication_event`]. `body` is truncated
/// to 4 KiB on insert.
pub struct CommunicationEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub tool: Option<&'a str>,
    pub direction: Direction,
    pub chat_id: Option<&'a str>,
    pub message_id: Option<&'a str>,
    pub handle: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub body: Option<&'a str>,
    pub status: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}
/// Borrowed input for [`Db::record_mcp_tool_call`]. `error` is truncated to
/// 2 KiB and the JSON payloads to 8 KiB on insert.
pub struct McpToolCallRecord<'a> {
    pub ts_utc_start: DateTime<Utc>,
    pub ts_utc_end: Option<DateTime<Utc>>,
    pub source: &'a str,
    pub tool: &'a str,
    pub agent: Option<&'a str>,
    pub duration_ms: Option<i64>,
    pub success: bool,
    pub error: Option<&'a str>,
    pub timeout_race: bool,
    pub request_json: Option<&'a str>,
    pub response_json: Option<&'a str>,
}
/// Borrowed input for [`Db::record_git_operation`].
pub struct GitOperationRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub operation: &'a str,
    pub repo: &'a str,
    pub branch: Option<&'a str>,
    pub remote: Option<&'a str>,
    pub from_sha: Option<&'a str>,
    pub to_sha: Option<&'a str>,
    pub status: &'a str,
    pub detail_json: Option<&'a str>,
}
/// Borrowed input for [`Db::record_owner_directive`]. `raw_text` is truncated
/// to 16 KiB on insert.
pub struct OwnerDirectiveRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub source: &'a str,
    pub chat_id: Option<&'a str>,
    pub raw_text: &'a str,
    pub resolved_action: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub status: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}
/// Borrowed input for [`Db::record_token_usage`].
pub struct TokenUsageRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub session_id: Option<&'a str>,
    pub agent: Option<&'a str>,
    pub model: Option<&'a str>,
    pub input_tokens: Option<i64>,
    pub output_tokens: Option<i64>,
    pub cached_input_tokens: Option<i64>,
    pub cost_usd_micros: Option<i64>,
    pub detail_json: Option<&'a str>,
}
/// Borrowed input for [`Db::record_watchdog_event`].
pub struct WatchdogEventRecord<'a> {
    pub ts_utc: DateTime<Utc>,
    pub event: &'a str,
    pub agent: Option<&'a str>,
    pub severity: Option<&'a str>,
    pub status: Option<&'a str>,
    pub detail_json: Option<&'a str>,
}
// -- Persisted row shapes ---------------------------------------------------
// Each `*Row` below is the exact JSON document stored as the redb value for
// its table (see `Db::insert_json` / `rows`), and mirrors the Arrow schema
// built by the corresponding `*_batch` function. Timestamps are RFC 3339
// strings produced by `ts`.

/// Stored form of a `MessageRecord` (table "messages").
#[derive(Debug, Serialize, Deserialize)]
struct MessageRow {
    id: i64,
    ts_utc: String,
    source: String,
    direction: String,
    chat_id: Option<String>,
    from_agent: Option<String>,
    to_agent: Option<String>,
    body: Option<String>,
    raw_json: Option<String>,
}
/// Stored row for table "cli_invocations".
#[derive(Debug, Serialize, Deserialize)]
struct CliRow {
    id: i64,
    ts_utc: String,
    bin: String,
    argv_json: String,
    exit_code: Option<i64>,
    duration_ms: Option<i64>,
    host: String,
}
/// Stored row for table "crashes".
#[derive(Debug, Serialize, Deserialize)]
struct CrashRow {
    id: i64,
    ts_utc: String,
    kind: String,
    agent: String,
    detail_json: String,
}
/// Stored row for table "ticks".
#[derive(Debug, Serialize, Deserialize)]
struct TickRow {
    id: i64,
    ts_utc: String,
    source: String,
    detail_json: String,
}
/// Stored row for table "workspaces".
#[derive(Debug, Serialize, Deserialize)]
struct WorkspaceRow {
    id: i64,
    ts_utc_created: String,
    name: String,
    branch: String,
    ts_utc_deleted: Option<String>,
    verdict: Option<String>,
}
/// Stored row for table "sessions".
#[derive(Debug, Serialize, Deserialize)]
struct SessionRow {
    id: i64,
    ts_utc: String,
    agent: String,
    session_num: i64,
    event: String,
}
/// Stored form of a `CloneDispatchRecord` (table "clone_dispatches").
#[derive(Debug, Serialize, Deserialize)]
struct CloneDispatchRow {
    id: i64,
    ts_utc_start: String,
    ts_utc_end: Option<String>,
    agent_id: String,
    runtime: Option<String>,
    brief_path: Option<String>,
    brief: Option<String>,
    workspace: Option<String>,
    branch: Option<String>,
    status: Option<String>,
    exit_code: Option<i64>,
    detail_json: Option<String>,
}
/// Stored form of a `HarvestEventRecord` (table "harvest_events").
#[derive(Debug, Serialize, Deserialize)]
struct HarvestEventRow {
    id: i64,
    ts_utc: String,
    source_branch: String,
    target_branch: String,
    commit_sha: Option<String>,
    status: String,
    conflicts: Option<String>,
    detail_json: Option<String>,
}
/// Stored form of a `CommunicationEventRecord` (table "communication_events").
#[derive(Debug, Serialize, Deserialize)]
struct CommunicationEventRow {
    id: i64,
    ts_utc: String,
    source: String,
    tool: Option<String>,
    direction: String,
    chat_id: Option<String>,
    message_id: Option<String>,
    handle: Option<String>,
    agent: Option<String>,
    body: Option<String>,
    status: Option<String>,
    detail_json: Option<String>,
}
/// Stored form of a `McpToolCallRecord` (table "mcp_tool_calls").
#[derive(Debug, Serialize, Deserialize)]
struct McpToolCallRow {
    id: i64,
    ts_utc_start: String,
    ts_utc_end: Option<String>,
    source: String,
    tool: String,
    agent: Option<String>,
    duration_ms: Option<i64>,
    success: bool,
    error: Option<String>,
    timeout_race: bool,
    request_json: Option<String>,
    response_json: Option<String>,
}
/// Stored form of a `GitOperationRecord` (table "git_operations").
#[derive(Debug, Serialize, Deserialize)]
struct GitOperationRow {
    id: i64,
    ts_utc: String,
    operation: String,
    repo: String,
    branch: Option<String>,
    remote: Option<String>,
    from_sha: Option<String>,
    to_sha: Option<String>,
    status: String,
    detail_json: Option<String>,
}
/// Stored form of an `OwnerDirectiveRecord` (table "owner_directives").
#[derive(Debug, Serialize, Deserialize)]
struct OwnerDirectiveRow {
    id: i64,
    ts_utc: String,
    source: String,
    chat_id: Option<String>,
    raw_text: String,
    resolved_action: Option<String>,
    agent: Option<String>,
    status: Option<String>,
    detail_json: Option<String>,
}
/// Stored form of a `TokenUsageRecord` (table "token_usage").
#[derive(Debug, Serialize, Deserialize)]
struct TokenUsageRow {
    id: i64,
    ts_utc: String,
    session_id: Option<String>,
    agent: Option<String>,
    model: Option<String>,
    input_tokens: Option<i64>,
    output_tokens: Option<i64>,
    cached_input_tokens: Option<i64>,
    cost_usd_micros: Option<i64>,
    detail_json: Option<String>,
}
/// Stored form of a `WatchdogEventRecord` (table "watchdog_events").
#[derive(Debug, Serialize, Deserialize)]
struct WatchdogEventRow {
    id: i64,
    ts_utc: String,
    event: String,
    agent: Option<String>,
    severity: Option<String>,
    status: Option<String>,
    detail_json: Option<String>,
}
impl Db {
    /// Open (or create) the default database at `~/.netsky/meta.db`,
    /// creating the directory if needed.
    pub fn open() -> Result<Self> {
        let home = dirs::home_dir().ok_or(Error::HomeDirMissing)?;
        let dir = home.join(".netsky");
        fs::create_dir_all(&dir)?;
        Self::open_path(dir.join("meta.db"))
    }
    /// Open (or create) a database at an explicit path. A legacy SQLite file
    /// at the same path is handled first by `archive_legacy_sqlite` (defined
    /// elsewhere in this file).
    pub fn open_path(path: impl AsRef<Path>) -> Result<Self> {
        let path = path.as_ref();
        archive_legacy_sqlite(path)?;
        let db = if path.exists() {
            Database::open(path).map_err(redb::Error::from)?
        } else {
            Database::create(path).map_err(redb::Error::from)?
        };
        Ok(Self { db })
    }
    /// Test-only helper backing the database with a fresh temp file.
    ///
    /// NOTE(review): `file` is dropped when this function returns, which
    /// deletes the path while `Database` still holds its open handle. That
    /// works on Unix (unlinked-but-open file) but is fragile elsewhere —
    /// confirm if tests ever run on Windows.
    #[cfg(test)]
    pub(crate) fn open_in_memory() -> Result<Self> {
        let file = tempfile::NamedTempFile::new()?;
        let db = Database::create(file.path()).map_err(redb::Error::from)?;
        Ok(Self { db })
    }
    /// Ensure all tables exist and stamp the schema version.
    ///
    /// Refuses to touch a database written by a newer binary
    /// (`FutureSchemaVersion`); the early return drops the uncommitted
    /// write transaction, leaving the file unchanged.
    pub fn migrate(&self) -> Result<()> {
        let write = self.db.begin_write().map_err(redb::Error::from)?;
        {
            let mut meta = write.open_table(META).map_err(redb::Error::from)?;
            // Missing key means a brand-new database: treat as version 0.
            let current = meta
                .get("schema_version")
                .map_err(redb::Error::from)?
                .map(|value| value.value())
                .unwrap_or(0);
            if current > SCHEMA_VERSION {
                return Err(Error::FutureSchemaVersion {
                    found: current,
                    supported: SCHEMA_VERSION,
                });
            }
            meta.insert("schema_version", SCHEMA_VERSION)
                .map_err(redb::Error::from)?;
        }
        {
            // Opening a table inside a write transaction creates it if absent.
            write.open_table(IDS).map_err(redb::Error::from)?;
            write.open_table(MESSAGES).map_err(redb::Error::from)?;
            write
                .open_table(CLI_INVOCATIONS)
                .map_err(redb::Error::from)?;
            write.open_table(CRASHES).map_err(redb::Error::from)?;
            write.open_table(TICKS).map_err(redb::Error::from)?;
            write.open_table(WORKSPACES).map_err(redb::Error::from)?;
            write.open_table(SESSIONS).map_err(redb::Error::from)?;
            write
                .open_table(CLONE_DISPATCHES)
                .map_err(redb::Error::from)?;
            write
                .open_table(HARVEST_EVENTS)
                .map_err(redb::Error::from)?;
            write
                .open_table(COMMUNICATION_EVENTS)
                .map_err(redb::Error::from)?;
            write
                .open_table(MCP_TOOL_CALLS)
                .map_err(redb::Error::from)?;
            write
                .open_table(GIT_OPERATIONS)
                .map_err(redb::Error::from)?;
            write
                .open_table(OWNER_DIRECTIVES)
                .map_err(redb::Error::from)?;
            write.open_table(TOKEN_USAGE).map_err(redb::Error::from)?;
            write
                .open_table(WATCHDOG_EVENTS)
                .map_err(redb::Error::from)?;
        }
        write.commit().map_err(redb::Error::from)?;
        Ok(())
    }
    /// Insert one message row; returns its new id.
    pub fn record_message(&self, record: MessageRecord<'_>) -> Result<i64> {
        self.insert_json("messages", MESSAGES, |id| MessageRow {
            id,
            ts_utc: ts(record.ts_utc),
            source: record.source.to_string(),
            direction: record.direction.as_str().to_string(),
            chat_id: record.chat_id.map(str::to_string),
            from_agent: record.from_agent.map(str::to_string),
            to_agent: record.to_agent.map(str::to_string),
            body: record.body.map(str::to_string),
            raw_json: record.raw_json.map(str::to_string),
        })
    }
    /// Insert one CLI-invocation row; returns its new id.
    pub fn record_cli(
        &self,
        ts_utc: DateTime<Utc>,
        bin: &str,
        argv_json: &str,
        exit_code: Option<i64>,
        duration_ms: Option<i64>,
        host: &str,
    ) -> Result<i64> {
        self.insert_json("cli_invocations", CLI_INVOCATIONS, |id| CliRow {
            id,
            ts_utc: ts(ts_utc),
            bin: bin.to_string(),
            argv_json: argv_json.to_string(),
            exit_code,
            duration_ms,
            host: host.to_string(),
        })
    }
    /// Insert one crash row; returns its new id.
    pub fn record_crash(
        &self,
        ts_utc: DateTime<Utc>,
        kind: &str,
        agent: &str,
        detail_json: &str,
    ) -> Result<i64> {
        self.insert_json("crashes", CRASHES, |id| CrashRow {
            id,
            ts_utc: ts(ts_utc),
            kind: kind.to_string(),
            agent: agent.to_string(),
            detail_json: detail_json.to_string(),
        })
    }
    /// Insert one tick row; returns its new id.
    pub fn record_tick(
        &self,
        ts_utc: DateTime<Utc>,
        source: &str,
        detail_json: &str,
    ) -> Result<i64> {
        self.insert_json("ticks", TICKS, |id| TickRow {
            id,
            ts_utc: ts(ts_utc),
            source: source.to_string(),
            detail_json: detail_json.to_string(),
        })
    }
    /// Insert one workspace row; returns its new id.
    pub fn record_workspace(
        &self,
        ts_utc_created: DateTime<Utc>,
        name: &str,
        branch: &str,
        ts_utc_deleted: Option<DateTime<Utc>>,
        verdict: Option<&str>,
    ) -> Result<i64> {
        self.insert_json("workspaces", WORKSPACES, |id| WorkspaceRow {
            id,
            ts_utc_created: ts(ts_utc_created),
            name: name.to_string(),
            branch: branch.to_string(),
            ts_utc_deleted: ts_utc_deleted.map(ts),
            verdict: verdict.map(str::to_string),
        })
    }
    /// Insert one session-lifecycle row; returns its new id.
    pub fn record_session(
        &self,
        ts_utc: DateTime<Utc>,
        agent: &str,
        session_num: i64,
        event: SessionEvent,
    ) -> Result<i64> {
        self.insert_json("sessions", SESSIONS, |id| SessionRow {
            id,
            ts_utc: ts(ts_utc),
            agent: agent.to_string(),
            session_num,
            event: event.as_str().to_string(),
        })
    }
    /// Insert one clone-dispatch row (brief capped at 16 KiB); returns its new id.
    pub fn record_clone_dispatch(&self, record: CloneDispatchRecord<'_>) -> Result<i64> {
        self.insert_json("clone_dispatches", CLONE_DISPATCHES, |id| {
            CloneDispatchRow {
                id,
                ts_utc_start: ts(record.ts_utc_start),
                ts_utc_end: record.ts_utc_end.map(ts),
                agent_id: record.agent_id.to_string(),
                runtime: record.runtime.map(str::to_string),
                brief_path: record.brief_path.map(str::to_string),
                brief: truncate_opt(record.brief, 16_384),
                workspace: record.workspace.map(str::to_string),
                branch: record.branch.map(str::to_string),
                status: record.status.map(str::to_string),
                exit_code: record.exit_code,
                detail_json: record.detail_json.map(str::to_string),
            }
        })
    }
    /// Insert one harvest-event row; returns its new id.
    pub fn record_harvest_event(&self, record: HarvestEventRecord<'_>) -> Result<i64> {
        self.insert_json("harvest_events", HARVEST_EVENTS, |id| HarvestEventRow {
            id,
            ts_utc: ts(record.ts_utc),
            source_branch: record.source_branch.to_string(),
            target_branch: record.target_branch.to_string(),
            commit_sha: record.commit_sha.map(str::to_string),
            status: record.status.to_string(),
            conflicts: record.conflicts.map(str::to_string),
            detail_json: record.detail_json.map(str::to_string),
        })
    }
    /// Insert one communication-event row (body capped at 4 KiB); returns its new id.
    pub fn record_communication_event(&self, record: CommunicationEventRecord<'_>) -> Result<i64> {
        self.insert_json("communication_events", COMMUNICATION_EVENTS, |id| {
            CommunicationEventRow {
                id,
                ts_utc: ts(record.ts_utc),
                source: record.source.to_string(),
                tool: record.tool.map(str::to_string),
                direction: record.direction.as_str().to_string(),
                chat_id: record.chat_id.map(str::to_string),
                message_id: record.message_id.map(str::to_string),
                handle: record.handle.map(str::to_string),
                agent: record.agent.map(str::to_string),
                body: truncate_opt(record.body, 4096),
                status: record.status.map(str::to_string),
                detail_json: record.detail_json.map(str::to_string),
            }
        })
    }
    /// Insert one MCP tool-call row (error capped at 2 KiB, payloads at 8 KiB);
    /// returns its new id.
    pub fn record_mcp_tool_call(&self, record: McpToolCallRecord<'_>) -> Result<i64> {
        self.insert_json("mcp_tool_calls", MCP_TOOL_CALLS, |id| McpToolCallRow {
            id,
            ts_utc_start: ts(record.ts_utc_start),
            ts_utc_end: record.ts_utc_end.map(ts),
            source: record.source.to_string(),
            tool: record.tool.to_string(),
            agent: record.agent.map(str::to_string),
            duration_ms: record.duration_ms,
            success: record.success,
            error: truncate_opt(record.error, 2048),
            timeout_race: record.timeout_race,
            request_json: truncate_opt(record.request_json, 8192),
            response_json: truncate_opt(record.response_json, 8192),
        })
    }
    /// Insert one git-operation row; returns its new id.
    pub fn record_git_operation(&self, record: GitOperationRecord<'_>) -> Result<i64> {
        self.insert_json("git_operations", GIT_OPERATIONS, |id| GitOperationRow {
            id,
            ts_utc: ts(record.ts_utc),
            operation: record.operation.to_string(),
            repo: record.repo.to_string(),
            branch: record.branch.map(str::to_string),
            remote: record.remote.map(str::to_string),
            from_sha: record.from_sha.map(str::to_string),
            to_sha: record.to_sha.map(str::to_string),
            status: record.status.to_string(),
            detail_json: record.detail_json.map(str::to_string),
        })
    }
    /// Insert one owner-directive row (raw text capped at 16 KiB); returns its new id.
    pub fn record_owner_directive(&self, record: OwnerDirectiveRecord<'_>) -> Result<i64> {
        self.insert_json("owner_directives", OWNER_DIRECTIVES, |id| {
            OwnerDirectiveRow {
                id,
                ts_utc: ts(record.ts_utc),
                source: record.source.to_string(),
                chat_id: record.chat_id.map(str::to_string),
                raw_text: truncate(record.raw_text, 16_384),
                resolved_action: record.resolved_action.map(str::to_string),
                agent: record.agent.map(str::to_string),
                status: record.status.map(str::to_string),
                detail_json: record.detail_json.map(str::to_string),
            }
        })
    }
    /// Insert one token-usage row; returns its new id.
    pub fn record_token_usage(&self, record: TokenUsageRecord<'_>) -> Result<i64> {
        self.insert_json("token_usage", TOKEN_USAGE, |id| TokenUsageRow {
            id,
            ts_utc: ts(record.ts_utc),
            session_id: record.session_id.map(str::to_string),
            agent: record.agent.map(str::to_string),
            model: record.model.map(str::to_string),
            input_tokens: record.input_tokens,
            output_tokens: record.output_tokens,
            cached_input_tokens: record.cached_input_tokens,
            cost_usd_micros: record.cost_usd_micros,
            detail_json: record.detail_json.map(str::to_string),
        })
    }
    /// Insert one watchdog-event row; returns its new id.
    pub fn record_watchdog_event(&self, record: WatchdogEventRecord<'_>) -> Result<i64> {
        self.insert_json("watchdog_events", WATCHDOG_EVENTS, |id| WatchdogEventRow {
            id,
            ts_utc: ts(record.ts_utc),
            event: record.event.to_string(),
            agent: record.agent.map(str::to_string),
            severity: record.severity.map(str::to_string),
            status: record.status.map(str::to_string),
            detail_json: record.detail_json.map(str::to_string),
        })
    }
    /// Run an arbitrary SQL query over an in-memory snapshot of every table
    /// and return the pretty-printed result.
    ///
    /// The snapshot is loaded eagerly, so results reflect the database at call
    /// time only. NOTE(review): a fresh tokio runtime is built per call —
    /// acceptable for an interactive/CLI path, costly if ever called in a loop.
    pub fn query(&self, sql: &str) -> Result<String> {
        let batches = self.snapshot()?;
        let runtime = tokio::runtime::Runtime::new()?;
        runtime.block_on(async move {
            let ctx = SessionContext::new();
            // Each table is registered under the same name it has on disk so
            // SQL can reference e.g. `SELECT * FROM messages`.
            register(&ctx, "messages", messages_batch(batches.messages)?)?;
            register(&ctx, "cli_invocations", cli_batch(batches.cli_invocations)?)?;
            register(&ctx, "crashes", crashes_batch(batches.crashes)?)?;
            register(&ctx, "ticks", ticks_batch(batches.ticks)?)?;
            register(&ctx, "workspaces", workspaces_batch(batches.workspaces)?)?;
            register(&ctx, "sessions", sessions_batch(batches.sessions)?)?;
            register(
                &ctx,
                "clone_dispatches",
                clone_dispatches_batch(batches.clone_dispatches)?,
            )?;
            register(
                &ctx,
                "harvest_events",
                harvest_events_batch(batches.harvest_events)?,
            )?;
            register(
                &ctx,
                "communication_events",
                communication_events_batch(batches.communication_events)?,
            )?;
            register(
                &ctx,
                "mcp_tool_calls",
                mcp_tool_calls_batch(batches.mcp_tool_calls)?,
            )?;
            register(
                &ctx,
                "git_operations",
                git_operations_batch(batches.git_operations)?,
            )?;
            register(
                &ctx,
                "owner_directives",
                owner_directives_batch(batches.owner_directives)?,
            )?;
            register(&ctx, "token_usage", token_usage_batch(batches.token_usage)?)?;
            register(
                &ctx,
                "watchdog_events",
                watchdog_events_batch(batches.watchdog_events)?,
            )?;
            let df = ctx.sql(sql).await?;
            let rows = df.collect().await?;
            Ok(pretty_format_batches(&rows)?.to_string())
        })
    }
    /// Read the stored schema version (0 if the meta key is absent).
    pub fn schema_version(&self) -> Result<i64> {
        let read = self.db.begin_read().map_err(redb::Error::from)?;
        let meta = read.open_table(META).map_err(redb::Error::from)?;
        Ok(meta
            .get("schema_version")
            .map_err(redb::Error::from)?
            .map(|value| value.value())
            .unwrap_or(0))
    }
    /// Shared insert path: allocate the next id from IDS, build the row via
    /// `row(id)`, JSON-serialize it, and store it — all in one write
    /// transaction, so the id bump and the row commit atomically.
    fn insert_json<T, F>(
        &self,
        name: &'static str,
        table_def: TableDefinition<'static, i64, &[u8]>,
        row: F,
    ) -> Result<i64>
    where
        T: Serialize,
        F: FnOnce(i64) -> T,
    {
        let write = self.db.begin_write().map_err(redb::Error::from)?;
        let id = {
            let mut ids = write.open_table(IDS).map_err(redb::Error::from)?;
            // Ids start at 1; a missing counter means the table is empty.
            let id = ids
                .get(name)
                .map_err(redb::Error::from)?
                .map(|value| value.value())
                .unwrap_or(0)
                + 1;
            ids.insert(name, id).map_err(redb::Error::from)?;
            id
        };
        {
            let bytes = serde_json::to_vec(&row(id))?;
            let mut table = write.open_table(table_def).map_err(redb::Error::from)?;
            table
                .insert(id, bytes.as_slice())
                .map_err(redb::Error::from)?;
        }
        write.commit().map_err(redb::Error::from)?;
        Ok(id)
    }
    /// Deserialize every table into memory under a single read transaction,
    /// giving `query` a consistent point-in-time view.
    fn snapshot(&self) -> Result<Snapshot> {
        let read = self.db.begin_read().map_err(redb::Error::from)?;
        Ok(Snapshot {
            messages: rows(&read, MESSAGES)?,
            cli_invocations: rows(&read, CLI_INVOCATIONS)?,
            crashes: rows(&read, CRASHES)?,
            ticks: rows(&read, TICKS)?,
            workspaces: rows(&read, WORKSPACES)?,
            sessions: rows(&read, SESSIONS)?,
            clone_dispatches: rows(&read, CLONE_DISPATCHES)?,
            harvest_events: rows(&read, HARVEST_EVENTS)?,
            communication_events: rows(&read, COMMUNICATION_EVENTS)?,
            mcp_tool_calls: rows(&read, MCP_TOOL_CALLS)?,
            git_operations: rows(&read, GIT_OPERATIONS)?,
            owner_directives: rows(&read, OWNER_DIRECTIVES)?,
            token_usage: rows(&read, TOKEN_USAGE)?,
            watchdog_events: rows(&read, WATCHDOG_EVENTS)?,
        })
    }
}
/// Fully-materialized copy of every table, taken under one read transaction
/// by [`Db::snapshot`] and consumed by the `*_batch` builders in `Db::query`.
struct Snapshot {
    messages: Vec<MessageRow>,
    cli_invocations: Vec<CliRow>,
    crashes: Vec<CrashRow>,
    ticks: Vec<TickRow>,
    workspaces: Vec<WorkspaceRow>,
    sessions: Vec<SessionRow>,
    clone_dispatches: Vec<CloneDispatchRow>,
    harvest_events: Vec<HarvestEventRow>,
    communication_events: Vec<CommunicationEventRow>,
    mcp_tool_calls: Vec<McpToolCallRow>,
    git_operations: Vec<GitOperationRow>,
    owner_directives: Vec<OwnerDirectiveRow>,
    token_usage: Vec<TokenUsageRow>,
    watchdog_events: Vec<WatchdogEventRow>,
}
fn rows<T>(read: &redb::ReadTransaction, table_def: TableDefinition<i64, &[u8]>) -> Result<Vec<T>>
where
T: DeserializeOwned,
{
let table = read.open_table(table_def).map_err(redb::Error::from)?;
table
.iter()
.map_err(redb::Error::from)?
.map(|entry| {
let (_, value) = entry.map_err(redb::Error::from)?;
Ok(serde_json::from_slice(value.value())?)
})
.collect()
}
/// Expose a single `RecordBatch` to DataFusion as an in-memory table named
/// `name`, so SQL in `Db::query` can select from it.
fn register(ctx: &SessionContext, name: &str, batch: RecordBatch) -> Result<()> {
    let schema = batch.schema();
    let partitions = vec![vec![batch]];
    let table = MemTable::try_new(schema, partitions)?;
    ctx.register_table(name, Arc::new(table))?;
    Ok(())
}
/// Build the Arrow batch for the `messages` table (columns mirror `MessageRow`).
fn messages_batch(rows: Vec<MessageRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source", DataType::Utf8, false),
            ("direction", DataType::Utf8, false),
            ("chat_id", DataType::Utf8, true),
            ("from_agent", DataType::Utf8, true),
            ("to_agent", DataType::Utf8, true),
            ("body", DataType::Utf8, true),
            ("raw_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            string(rows.iter().map(|r| Some(r.direction.as_str()))),
            opt_string(rows.iter().map(|r| r.chat_id.as_deref())),
            opt_string(rows.iter().map(|r| r.from_agent.as_deref())),
            opt_string(rows.iter().map(|r| r.to_agent.as_deref())),
            opt_string(rows.iter().map(|r| r.body.as_deref())),
            opt_string(rows.iter().map(|r| r.raw_json.as_deref())),
        ],
    )?)
}
/// Build the Arrow batch for the `cli_invocations` table (columns mirror `CliRow`).
fn cli_batch(rows: Vec<CliRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("bin", DataType::Utf8, false),
            ("argv_json", DataType::Utf8, false),
            ("exit_code", DataType::Int64, true),
            ("duration_ms", DataType::Int64, true),
            ("host", DataType::Utf8, false),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.bin.as_str()))),
            string(rows.iter().map(|r| Some(r.argv_json.as_str()))),
            int64_opt(rows.iter().map(|r| r.exit_code)),
            int64_opt(rows.iter().map(|r| r.duration_ms)),
            string(rows.iter().map(|r| Some(r.host.as_str()))),
        ],
    )?)
}
/// Build the Arrow batch for the `crashes` table via the shared
/// (id, ts, two strings, detail) helper `simple_event_batch`.
fn crashes_batch(rows: Vec<CrashRow>) -> Result<RecordBatch> {
    simple_event_batch(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("kind", DataType::Utf8, false),
            ("agent", DataType::Utf8, false),
            ("detail_json", DataType::Utf8, false),
        ]),
        rows.iter()
            .map(|r| (r.id, &r.ts_utc, &r.kind, &r.agent, &r.detail_json)),
    )
}
/// Build the Arrow batch for the `ticks` table (columns mirror `TickRow`).
fn ticks_batch(rows: Vec<TickRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source", DataType::Utf8, false),
            ("detail_json", DataType::Utf8, false),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            string(rows.iter().map(|r| Some(r.detail_json.as_str()))),
        ],
    )?)
}
/// Build the Arrow batch for the `workspaces` table (columns mirror `WorkspaceRow`).
fn workspaces_batch(rows: Vec<WorkspaceRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc_created", DataType::Utf8, false),
            ("name", DataType::Utf8, false),
            ("branch", DataType::Utf8, false),
            ("ts_utc_deleted", DataType::Utf8, true),
            ("verdict", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc_created.as_str()))),
            string(rows.iter().map(|r| Some(r.name.as_str()))),
            string(rows.iter().map(|r| Some(r.branch.as_str()))),
            opt_string(rows.iter().map(|r| r.ts_utc_deleted.as_deref())),
            opt_string(rows.iter().map(|r| r.verdict.as_deref())),
        ],
    )?)
}
/// Build the Arrow batch for the `sessions` table (columns mirror `SessionRow`).
fn sessions_batch(rows: Vec<SessionRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("agent", DataType::Utf8, false),
            ("session_num", DataType::Int64, false),
            ("event", DataType::Utf8, false),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.agent.as_str()))),
            int64(rows.iter().map(|r| r.session_num)),
            string(rows.iter().map(|r| Some(r.event.as_str()))),
        ],
    )?)
}
/// Build the Arrow batch for the `clone_dispatches` table
/// (columns mirror `CloneDispatchRow`).
fn clone_dispatches_batch(rows: Vec<CloneDispatchRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc_start", DataType::Utf8, false),
            ("ts_utc_end", DataType::Utf8, true),
            ("agent_id", DataType::Utf8, false),
            ("runtime", DataType::Utf8, true),
            ("brief_path", DataType::Utf8, true),
            ("brief", DataType::Utf8, true),
            ("workspace", DataType::Utf8, true),
            ("branch", DataType::Utf8, true),
            ("status", DataType::Utf8, true),
            ("exit_code", DataType::Int64, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc_start.as_str()))),
            opt_string(rows.iter().map(|r| r.ts_utc_end.as_deref())),
            string(rows.iter().map(|r| Some(r.agent_id.as_str()))),
            opt_string(rows.iter().map(|r| r.runtime.as_deref())),
            opt_string(rows.iter().map(|r| r.brief_path.as_deref())),
            opt_string(rows.iter().map(|r| r.brief.as_deref())),
            opt_string(rows.iter().map(|r| r.workspace.as_deref())),
            opt_string(rows.iter().map(|r| r.branch.as_deref())),
            opt_string(rows.iter().map(|r| r.status.as_deref())),
            int64_opt(rows.iter().map(|r| r.exit_code)),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
/// Build the Arrow batch for the `harvest_events` table
/// (columns mirror `HarvestEventRow`).
fn harvest_events_batch(rows: Vec<HarvestEventRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source_branch", DataType::Utf8, false),
            ("target_branch", DataType::Utf8, false),
            ("commit_sha", DataType::Utf8, true),
            ("status", DataType::Utf8, false),
            ("conflicts", DataType::Utf8, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source_branch.as_str()))),
            string(rows.iter().map(|r| Some(r.target_branch.as_str()))),
            opt_string(rows.iter().map(|r| r.commit_sha.as_deref())),
            string(rows.iter().map(|r| Some(r.status.as_str()))),
            opt_string(rows.iter().map(|r| r.conflicts.as_deref())),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
/// Build the Arrow batch for the `communication_events` table
/// (columns mirror `CommunicationEventRow`).
fn communication_events_batch(rows: Vec<CommunicationEventRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source", DataType::Utf8, false),
            ("tool", DataType::Utf8, true),
            ("direction", DataType::Utf8, false),
            ("chat_id", DataType::Utf8, true),
            ("message_id", DataType::Utf8, true),
            ("handle", DataType::Utf8, true),
            ("agent", DataType::Utf8, true),
            ("body", DataType::Utf8, true),
            ("status", DataType::Utf8, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            opt_string(rows.iter().map(|r| r.tool.as_deref())),
            string(rows.iter().map(|r| Some(r.direction.as_str()))),
            opt_string(rows.iter().map(|r| r.chat_id.as_deref())),
            opt_string(rows.iter().map(|r| r.message_id.as_deref())),
            opt_string(rows.iter().map(|r| r.handle.as_deref())),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            opt_string(rows.iter().map(|r| r.body.as_deref())),
            opt_string(rows.iter().map(|r| r.status.as_deref())),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
/// Build the Arrow batch for the `mcp_tool_calls` table
/// (columns mirror `McpToolCallRow`).
fn mcp_tool_calls_batch(rows: Vec<McpToolCallRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc_start", DataType::Utf8, false),
            ("ts_utc_end", DataType::Utf8, true),
            ("source", DataType::Utf8, false),
            ("tool", DataType::Utf8, false),
            ("agent", DataType::Utf8, true),
            ("duration_ms", DataType::Int64, true),
            ("success", DataType::Boolean, false),
            ("error", DataType::Utf8, true),
            ("timeout_race", DataType::Boolean, false),
            ("request_json", DataType::Utf8, true),
            ("response_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc_start.as_str()))),
            opt_string(rows.iter().map(|r| r.ts_utc_end.as_deref())),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            string(rows.iter().map(|r| Some(r.tool.as_str()))),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            int64_opt(rows.iter().map(|r| r.duration_ms)),
            bools(rows.iter().map(|r| r.success)),
            opt_string(rows.iter().map(|r| r.error.as_deref())),
            bools(rows.iter().map(|r| r.timeout_race)),
            opt_string(rows.iter().map(|r| r.request_json.as_deref())),
            opt_string(rows.iter().map(|r| r.response_json.as_deref())),
        ],
    )?)
}
/// Build the Arrow batch for the `git_operations` table
/// (columns mirror `GitOperationRow`).
fn git_operations_batch(rows: Vec<GitOperationRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("operation", DataType::Utf8, false),
            ("repo", DataType::Utf8, false),
            ("branch", DataType::Utf8, true),
            ("remote", DataType::Utf8, true),
            ("from_sha", DataType::Utf8, true),
            ("to_sha", DataType::Utf8, true),
            ("status", DataType::Utf8, false),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.operation.as_str()))),
            string(rows.iter().map(|r| Some(r.repo.as_str()))),
            opt_string(rows.iter().map(|r| r.branch.as_deref())),
            opt_string(rows.iter().map(|r| r.remote.as_deref())),
            opt_string(rows.iter().map(|r| r.from_sha.as_deref())),
            opt_string(rows.iter().map(|r| r.to_sha.as_deref())),
            string(rows.iter().map(|r| Some(r.status.as_str()))),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
/// Build the Arrow batch for the `owner_directives` table
/// (columns mirror `OwnerDirectiveRow`).
fn owner_directives_batch(rows: Vec<OwnerDirectiveRow>) -> Result<RecordBatch> {
    Ok(RecordBatch::try_new(
        schema(&[
            ("id", DataType::Int64, false),
            ("ts_utc", DataType::Utf8, false),
            ("source", DataType::Utf8, false),
            ("chat_id", DataType::Utf8, true),
            ("raw_text", DataType::Utf8, false),
            ("resolved_action", DataType::Utf8, true),
            ("agent", DataType::Utf8, true),
            ("status", DataType::Utf8, true),
            ("detail_json", DataType::Utf8, true),
        ]),
        vec![
            ids(&rows, |r| r.id),
            string(rows.iter().map(|r| Some(r.ts_utc.as_str()))),
            string(rows.iter().map(|r| Some(r.source.as_str()))),
            opt_string(rows.iter().map(|r| r.chat_id.as_deref())),
            string(rows.iter().map(|r| Some(r.raw_text.as_str()))),
            opt_string(rows.iter().map(|r| r.resolved_action.as_deref())),
            opt_string(rows.iter().map(|r| r.agent.as_deref())),
            opt_string(rows.iter().map(|r| r.status.as_deref())),
            opt_string(rows.iter().map(|r| r.detail_json.as_deref())),
        ],
    )?)
}
/// Converts `token_usage` rows into an Arrow record batch.
///
/// Token counters and `cost_usd_micros` are nullable Int64 columns; all
/// string columns other than `ts_utc` are nullable.
fn token_usage_batch(rows: Vec<TokenUsageRow>) -> Result<RecordBatch> {
    let table_schema = schema(&[
        ("id", DataType::Int64, false),
        ("ts_utc", DataType::Utf8, false),
        ("session_id", DataType::Utf8, true),
        ("agent", DataType::Utf8, true),
        ("model", DataType::Utf8, true),
        ("input_tokens", DataType::Int64, true),
        ("output_tokens", DataType::Int64, true),
        ("cached_input_tokens", DataType::Int64, true),
        ("cost_usd_micros", DataType::Int64, true),
        ("detail_json", DataType::Utf8, true),
    ]);
    let columns = vec![
        ids(&rows, |row| row.id),
        string(rows.iter().map(|row| Some(row.ts_utc.as_str()))),
        opt_string(rows.iter().map(|row| row.session_id.as_deref())),
        opt_string(rows.iter().map(|row| row.agent.as_deref())),
        opt_string(rows.iter().map(|row| row.model.as_deref())),
        int64_opt(rows.iter().map(|row| row.input_tokens)),
        int64_opt(rows.iter().map(|row| row.output_tokens)),
        int64_opt(rows.iter().map(|row| row.cached_input_tokens)),
        int64_opt(rows.iter().map(|row| row.cost_usd_micros)),
        opt_string(rows.iter().map(|row| row.detail_json.as_deref())),
    ];
    let batch = RecordBatch::try_new(table_schema, columns)?;
    Ok(batch)
}
/// Converts `watchdog_events` rows into an Arrow record batch.
///
/// `id`, `ts_utc`, and `event` are required; the remaining columns are
/// nullable.
fn watchdog_events_batch(rows: Vec<WatchdogEventRow>) -> Result<RecordBatch> {
    let table_schema = schema(&[
        ("id", DataType::Int64, false),
        ("ts_utc", DataType::Utf8, false),
        ("event", DataType::Utf8, false),
        ("agent", DataType::Utf8, true),
        ("severity", DataType::Utf8, true),
        ("status", DataType::Utf8, true),
        ("detail_json", DataType::Utf8, true),
    ]);
    let columns = vec![
        ids(&rows, |row| row.id),
        string(rows.iter().map(|row| Some(row.ts_utc.as_str()))),
        string(rows.iter().map(|row| Some(row.event.as_str()))),
        opt_string(rows.iter().map(|row| row.agent.as_deref())),
        opt_string(rows.iter().map(|row| row.severity.as_deref())),
        opt_string(rows.iter().map(|row| row.status.as_deref())),
        opt_string(rows.iter().map(|row| row.detail_json.as_deref())),
    ];
    let batch = RecordBatch::try_new(table_schema, columns)?;
    Ok(batch)
}
fn simple_event_batch<'a>(
schema: SchemaRef,
rows: impl Iterator<Item = (i64, &'a String, &'a String, &'a String, &'a String)>,
) -> Result<RecordBatch> {
let rows = rows.collect::<Vec<_>>();
Ok(RecordBatch::try_new(
schema,
vec![
Arc::new(Int64Array::from(
rows.iter().map(|r| r.0).collect::<Vec<_>>(),
)),
string(rows.iter().map(|r| Some(r.1.as_str()))),
string(rows.iter().map(|r| Some(r.2.as_str()))),
string(rows.iter().map(|r| Some(r.3.as_str()))),
string(rows.iter().map(|r| Some(r.4.as_str()))),
],
)?)
}
/// Builds an Arrow schema from `(name, data type, nullable)` triples.
fn schema(fields: &[(&str, DataType, bool)]) -> SchemaRef {
    let mut built = Vec::with_capacity(fields.len());
    for (name, ty, nullable) in fields {
        built.push(Field::new(*name, ty.clone(), *nullable));
    }
    Arc::new(Schema::new(built))
}
fn ids<T>(rows: &[T], id: impl Fn(&T) -> i64) -> Arc<Int64Array> {
int64(rows.iter().map(id))
}
/// Collects an iterator of `i64` into a non-null Int64 Arrow column.
fn int64(values: impl Iterator<Item = i64>) -> Arc<Int64Array> {
    let collected: Vec<i64> = values.collect();
    Arc::new(Int64Array::from(collected))
}
/// Collects an iterator of `Option<i64>` into a nullable Int64 Arrow column
/// (`None` becomes a null slot).
fn int64_opt(values: impl Iterator<Item = Option<i64>>) -> Arc<Int64Array> {
    let collected: Vec<Option<i64>> = values.collect();
    Arc::new(Int64Array::from(collected))
}
/// Collects an iterator of `bool` into a non-null Boolean Arrow column.
fn bools(values: impl Iterator<Item = bool>) -> Arc<BooleanArray> {
    let collected: Vec<bool> = values.collect();
    Arc::new(BooleanArray::from(collected))
}
/// Collects an iterator of optional string slices into an Arrow string
/// column; `None` items become null slots.
fn string<'a>(values: impl Iterator<Item = Option<&'a str>>) -> Arc<StringArray> {
    let collected: Vec<Option<&str>> = values.collect();
    Arc::new(StringArray::from(collected))
}
fn opt_string<'a>(values: impl Iterator<Item = Option<&'a str>>) -> Arc<StringArray> {
string(values)
}
/// Renders a UTC timestamp as RFC 3339 with millisecond precision and a
/// `Z` suffix, e.g. `2026-04-15T23:00:00.000Z` (the format the tests and
/// stored `ts_utc` columns rely on).
fn ts(ts_utc: DateTime<Utc>) -> String {
    ts_utc.to_rfc3339_opts(SecondsFormat::Millis, true)
}
/// Returns at most the first `max_chars` characters of `value` as an owned
/// string. Counts Unicode scalar values, not bytes, so multi-byte
/// characters are never split.
fn truncate(value: &str, max_chars: usize) -> String {
    match value.char_indices().nth(max_chars) {
        // `idx` is the byte offset of the first character past the limit,
        // so slicing up to it is guaranteed to land on a char boundary.
        Some((idx, _)) => value[..idx].to_string(),
        None => value.to_string(),
    }
}
/// Option-aware wrapper over [`truncate`]: `None` stays `None`.
fn truncate_opt(value: Option<&str>, max_chars: usize) -> Option<String> {
    match value {
        Some(s) => Some(truncate(s, max_chars)),
        None => None,
    }
}
/// Returns `true` when `path` points at a file that begins with the SQLite 3
/// magic header (`"SQLite format 3\0"`).
///
/// A missing file, or one shorter than the 16-byte header, is reported as
/// `Ok(false)` rather than an error.
fn is_legacy_sqlite(path: &Path) -> Result<bool> {
    // Stat once instead of `exists()` followed by `metadata()?`: the old
    // two-call form stat'd twice and a file removed between the calls would
    // turn a benign "not there" into an error.
    let len = match fs::metadata(path) {
        Ok(meta) => meta.len(),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
        Err(e) => return Err(e.into()),
    };
    if len < 16 {
        return Ok(false);
    }
    let mut header = [0_u8; 16];
    fs::File::open(path)?.read_exact(&mut header)?;
    Ok(header.starts_with(b"SQLite format 3\0"))
}
/// If `path` holds a v1 SQLite database, renames it out of the way so a
/// redb database can be created at the same path; no-op otherwise.
fn archive_legacy_sqlite(path: &Path) -> Result<()> {
    if is_legacy_sqlite(path)? {
        // e.g. `meta.db` -> `meta.sqlite-v1-20260415230000.bak`
        let stamp = Utc::now().format("%Y%m%d%H%M%S");
        let backup = path.with_extension(format!("sqlite-v1-{stamp}.bak"));
        fs::rename(path, backup)?;
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    // Opens a fresh in-memory database with migrations applied; every test
    // starts from this empty, fully-migrated state.
    fn db() -> Result<Db> {
        let db = Db::open_in_memory()?;
        db.migrate()?;
        Ok(db)
    }
    // Deterministic timestamp so tests can assert the exact rendered value
    // ("2026-04-15T23:00:00.000Z" once formatted by `ts`).
    fn fixed_ts() -> DateTime<Utc> {
        chrono::DateTime::parse_from_rfc3339("2026-04-15T23:00:00Z")
            .unwrap()
            .with_timezone(&Utc)
    }
    // Insert one message, then read it back through the SQL query path and
    // check each projected column appears in the formatted output.
    #[test]
    fn message_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_message(MessageRecord {
            ts_utc: fixed_ts(),
            source: "imessage",
            direction: Direction::Inbound,
            chat_id: Some("chat-1"),
            from_agent: Some("agent9"),
            to_agent: None,
            body: Some("hello"),
            raw_json: Some("{\"chat_id\":\"chat-1\"}"),
        })?;
        let out = db.query("SELECT id, ts_utc, source, direction, chat_id, body FROM messages")?;
        // First row in an empty table gets id 1.
        assert_eq!(id, 1);
        assert!(out.contains("2026-04-15T23:00:00.000Z"));
        assert!(out.contains("imessage"));
        assert!(out.contains("inbound"));
        assert!(out.contains("chat-1"));
        assert!(out.contains("hello"));
        Ok(())
    }
    // CLI invocation insert + query round trip, including optional exit
    // code and duration.
    #[test]
    fn cli_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_cli(
            fixed_ts(),
            "netsky",
            "[\"db\", \"migrate\"]",
            Some(0),
            Some(12),
            "host-a",
        )?;
        let out = db.query("SELECT bin, exit_code, duration_ms, host FROM cli_invocations")?;
        assert_eq!(id, 1);
        assert!(out.contains("netsky"));
        assert!(out.contains("host-a"));
        assert!(out.contains("12"));
        Ok(())
    }
    // Crash record round trip; detail_json payload survives storage.
    #[test]
    fn crash_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_crash(fixed_ts(), "panic", "agent8", "{\"reason\":\"wedged\"}")?;
        let out = db.query("SELECT kind, agent, detail_json FROM crashes")?;
        assert_eq!(id, 1);
        assert!(out.contains("panic"));
        assert!(out.contains("agent8"));
        assert!(out.contains("wedged"));
        Ok(())
    }
    // Tick record round trip.
    #[test]
    fn tick_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_tick(fixed_ts(), "ticker", "{\"beat\":1}")?;
        let out = db.query("SELECT source, detail_json FROM ticks")?;
        assert_eq!(id, 1);
        assert!(out.contains("ticker"));
        assert!(out.contains("beat"));
        Ok(())
    }
    // Workspace round trip with an optional deletion timestamp and verdict.
    #[test]
    fn workspace_round_trip() -> Result<()> {
        let db = db()?;
        let created = fixed_ts();
        let deleted = created + chrono::Duration::minutes(5);
        let id = db.record_workspace(
            created,
            "session8",
            "feat/netsky-db-v0",
            Some(deleted),
            Some("kept"),
        )?;
        let out = db.query("SELECT name, branch, verdict FROM workspaces")?;
        assert_eq!(id, 1);
        assert!(out.contains("session8"));
        assert!(out.contains("feat/netsky-db-v0"));
        assert!(out.contains("kept"));
        Ok(())
    }
    // Session event round trip; the enum variant renders lowercase ("note").
    #[test]
    fn session_round_trip() -> Result<()> {
        let db = db()?;
        let id = db.record_session(fixed_ts(), "agent8", 8, SessionEvent::Note)?;
        let out = db.query("SELECT agent, session_num, event FROM sessions")?;
        assert_eq!(id, 1);
        assert!(out.contains("agent8"));
        assert!(out.contains("note"));
        Ok(())
    }
    // A pre-existing v1 SQLite file at the db path must be renamed to a
    // `meta.sqlite-v1-<timestamp>.bak` sibling before redb creates its own
    // file there (see `archive_legacy_sqlite`).
    #[test]
    fn legacy_sqlite_file_is_archived_before_redb_create() -> Result<()> {
        let dir = tempfile::tempdir()?;
        let path = dir.path().join("meta.db");
        fs::write(&path, b"SQLite format 3\0legacy")?;
        let db = Db::open_path(&path)?;
        db.migrate()?;
        // The path now holds a redb file, not the SQLite header.
        assert!(!is_legacy_sqlite(&path)?);
        let archived = fs::read_dir(dir.path())?
            .filter_map(std::result::Result::ok)
            .any(|entry| {
                entry
                    .file_name()
                    .to_string_lossy()
                    .starts_with("meta.sqlite-v1-")
            });
        assert!(archived);
        Ok(())
    }
    // Inserts one row into each schema-v2 table, then verifies every table
    // reports a count of 1 through the query path.
    #[test]
    fn v2_tables_round_trip() -> Result<()> {
        let db = db()?;
        db.record_clone_dispatch(CloneDispatchRecord {
            ts_utc_start: fixed_ts(),
            ts_utc_end: Some(fixed_ts()),
            agent_id: "agent3",
            runtime: Some("codex"),
            brief_path: Some("briefs/foo.md"),
            brief: Some("do the thing"),
            workspace: Some("workspaces/foo/repo"),
            branch: Some("feat/foo"),
            status: Some("done"),
            exit_code: Some(0),
            detail_json: Some("{\"ok\":true}"),
        })?;
        db.record_harvest_event(HarvestEventRecord {
            ts_utc: fixed_ts(),
            source_branch: "feat/foo",
            target_branch: "main",
            commit_sha: Some("abc123"),
            status: "clean",
            conflicts: None,
            detail_json: None,
        })?;
        db.record_communication_event(CommunicationEventRecord {
            ts_utc: fixed_ts(),
            source: "imessage",
            tool: Some("reply"),
            direction: Direction::Outbound,
            chat_id: Some("chat-1"),
            message_id: Some("msg-1"),
            handle: Some("+15551234567"),
            agent: Some("agent0"),
            body: Some("sent"),
            status: Some("ok"),
            detail_json: None,
        })?;
        db.record_mcp_tool_call(McpToolCallRecord {
            ts_utc_start: fixed_ts(),
            ts_utc_end: Some(fixed_ts()),
            source: "drive",
            tool: "list_files",
            agent: Some("agent0"),
            duration_ms: Some(22),
            success: true,
            error: None,
            timeout_race: false,
            request_json: Some("{}"),
            response_json: Some("[]"),
        })?;
        db.record_git_operation(GitOperationRecord {
            ts_utc: fixed_ts(),
            operation: "push",
            repo: "netsky",
            branch: Some("feat/foo"),
            remote: Some("origin"),
            from_sha: Some("abc"),
            to_sha: Some("def"),
            status: "ok",
            detail_json: None,
        })?;
        db.record_owner_directive(OwnerDirectiveRecord {
            ts_utc: fixed_ts(),
            source: "imessage",
            chat_id: Some("chat-1"),
            raw_text: "ship it",
            resolved_action: Some("dispatch"),
            agent: Some("agent0"),
            status: Some("accepted"),
            detail_json: None,
        })?;
        db.record_token_usage(TokenUsageRecord {
            ts_utc: fixed_ts(),
            session_id: Some("session-1"),
            agent: Some("agent4"),
            model: Some("gpt-5.4"),
            input_tokens: Some(100),
            output_tokens: Some(25),
            cached_input_tokens: Some(10),
            cost_usd_micros: Some(1234),
            detail_json: None,
        })?;
        db.record_watchdog_event(WatchdogEventRecord {
            ts_utc: fixed_ts(),
            event: "respawn",
            agent: Some("agent0"),
            severity: Some("warn"),
            status: Some("ok"),
            detail_json: Some("{}"),
        })?;
        for table in [
            "clone_dispatches",
            "harvest_events",
            "communication_events",
            "mcp_tool_calls",
            "git_operations",
            "owner_directives",
            "token_usage",
            "watchdog_events",
        ] {
            let out = db.query(&format!("SELECT COUNT(*) AS count FROM {table}"))?;
            assert!(out.contains('1'), "{table}: {out}");
        }
        Ok(())
    }
    // A UNION ALL across every table proves each one is registered with the
    // DataFusion session context — an unregistered table would make the
    // whole query fail, not just return no rows.
    #[test]
    fn query_registers_all_tables() -> Result<()> {
        let db = db()?;
        let out = db.query(
            "SELECT COUNT(*) FROM messages \
             UNION ALL SELECT COUNT(*) FROM cli_invocations \
             UNION ALL SELECT COUNT(*) FROM crashes \
             UNION ALL SELECT COUNT(*) FROM ticks \
             UNION ALL SELECT COUNT(*) FROM workspaces \
             UNION ALL SELECT COUNT(*) FROM sessions \
             UNION ALL SELECT COUNT(*) FROM clone_dispatches \
             UNION ALL SELECT COUNT(*) FROM harvest_events \
             UNION ALL SELECT COUNT(*) FROM communication_events \
             UNION ALL SELECT COUNT(*) FROM mcp_tool_calls \
             UNION ALL SELECT COUNT(*) FROM git_operations \
             UNION ALL SELECT COUNT(*) FROM owner_directives \
             UNION ALL SELECT COUNT(*) FROM token_usage \
             UNION ALL SELECT COUNT(*) FROM watchdog_events",
        )?;
        assert!(out.contains('0'));
        Ok(())
    }
    // Migration should record the current SCHEMA_VERSION (2) in meta.
    #[test]
    fn migrate_sets_schema_version_2() -> Result<()> {
        let db = db()?;
        assert_eq!(db.schema_version()?, 2);
        Ok(())
    }
}