use netsky_core::retry::{LockRetryError, LockRetryPolicy};
use netsky_db::Db;
use netsky_db::arrow_array::Array;
const META_DB_HINT: &str = "another netsky process holds meta.db. \
Run `netsky doctor` to see what's alive, or wait for it to release. \
Read paths (`netsky query`, `events`, `analytics`) use a snapshot \
and don't conflict — only writers (ingest, task, channel) lock.";
const CORRUPT_DB_HINT: &str = "meta.db looks corrupt or non-turso. \
Check `~/.netsky/backups/` for the most recent rolling snapshot, \
or rotate to a fresh db with `mv ~/.netsky/meta.db{,.broken}` \
(loses observability history; netsky will recreate on next write).";
pub fn wrap_open_error<E: std::fmt::Display>(err: E) -> netsky_core::Error {
let msg = err.to_string();
if msg.starts_with("netsky-db locked after ") {
netsky_core::Error::Message(msg)
} else if netsky_core::retry::is_lock_error_message(&msg) {
netsky_core::Error::Message(format!("open netsky-db: {msg}\n hint: {META_DB_HINT}"))
} else if msg.contains("invalid page size") || msg.contains("not a database") {
netsky_core::Error::Message(format!("open netsky-db: {msg}\n hint: {CORRUPT_DB_HINT}"))
} else {
netsky_core::Error::Message(format!("open netsky-db: {msg}"))
}
}
pub fn wrap_query_error<E: std::fmt::Display>(db: &Db, err: E) -> netsky_core::Error {
let msg = err.to_string();
if let Some(missing) = parse_missing_table(&msg) {
let available = list_tables(db);
return netsky_core::Error::Message(format!(
"query netsky-db: table '{missing}' not found\n available: {available}"
));
}
netsky_core::Error::Message(format!("query netsky-db: {msg}"))
}
pub fn with_lock_retry<T, E, F>(op: F) -> std::result::Result<T, LockRetryError<E>>
where
E: std::fmt::Display,
F: FnMut() -> std::result::Result<T, E>,
{
netsky_core::retry::on_lock(op, LockRetryPolicy::db_default())
}
pub fn wrap_retry_error<E: std::fmt::Display>(err: LockRetryError<E>) -> netsky_core::Error {
match err {
LockRetryError::Operation(err) => netsky_core::Error::Message(err.to_string()),
LockRetryError::Exhausted {
retries,
total_wait,
..
} => lock_exhausted_error(retries, total_wait),
}
}
pub fn wrap_open_retry_error<E: std::fmt::Display>(err: LockRetryError<E>) -> netsky_core::Error {
match err {
LockRetryError::Operation(err) => wrap_open_error(err),
LockRetryError::Exhausted {
retries,
total_wait,
..
} => lock_exhausted_error(retries, total_wait),
}
}
pub fn wrap_query_retry_error<E: std::fmt::Display>(
db: &Db,
err: LockRetryError<E>,
) -> netsky_core::Error {
match err {
LockRetryError::Operation(err) => wrap_query_error(db, err),
LockRetryError::Exhausted {
retries,
total_wait,
..
} => lock_exhausted_error(retries, total_wait),
}
}
fn lock_exhausted_error(retries: u32, total_wait: std::time::Duration) -> netsky_core::Error {
netsky_core::Error::Message(format!(
"netsky-db locked after {retries} retries (total wait {}ms).\n hint: a clone is writing heavily; retry or check `lsof` for holders.",
total_wait.as_millis()
))
}
fn parse_missing_table(msg: &str) -> Option<&str> {
let lower = msg.to_lowercase();
let needle = if let Some(i) = lower.find("table '") {
i + "table '".len()
} else {
return None;
};
let rest = &msg[needle..];
rest.find('\'').map(|end| &rest[..end])
}
fn list_tables(db: &Db) -> String {
let probe = db.query_batches(
"SELECT table_name FROM information_schema.tables \
WHERE table_schema = 'public' ORDER BY table_name",
);
let mut names: Vec<String> = Vec::new();
if let Ok(batches) = probe {
for batch in batches {
if let Some(col) = batch
.column(0)
.as_any()
.downcast_ref::<netsky_db::arrow_array::StringArray>()
{
for i in 0..col.len() {
if !col.is_null(i) {
names.push(col.value(i).to_string());
}
}
}
}
}
if names.is_empty() {
return "messages, sessions, crashes, ticks, workspaces, clone_dispatches, \
clone_lifetimes, token_usage, iroh_events, communication_events, \
mcp_calls (canonical list — live probe unavailable)"
.to_string();
}
names.join(", ")
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn wrap_open_error_routes_lock_to_hint() {
let err = wrap_open_error("Database already open. Cannot acquire lock.");
let s = err.to_string();
assert!(s.contains("hint:"));
assert!(s.contains("netsky doctor"));
}
#[test]
fn wrap_open_error_routes_corrupt_to_recovery() {
let err = wrap_open_error("invalid page size in database header: 33280");
let s = err.to_string();
assert!(s.contains("hint:"));
assert!(s.contains("backups"));
}
#[test]
fn wrap_open_error_passes_unknown_through() {
let err = wrap_open_error("disk full");
let s = err.to_string();
assert!(s.starts_with("open netsky-db: disk full"));
assert!(!s.contains("hint:"));
}
#[test]
fn wrap_open_error_handles_failed_locking_file_shape() {
let err = wrap_open_error(
"Locking error: Failed locking file. File is locked by another process",
);
let s = err.to_string();
assert!(s.contains("open netsky-db:"));
assert!(s.contains("netsky doctor"));
}
#[test]
fn wrap_open_error_passes_lock_exhaustion_through() {
let err = wrap_open_error(
"netsky-db locked after 5 retries (total wait 1234ms).\n hint: a clone is writing heavily; retry or check `lsof` for holders.",
);
assert!(
err.to_string()
.starts_with("netsky-db locked after 5 retries")
);
}
#[test]
fn parse_missing_table_handles_both_shapes() {
assert_eq!(parse_missing_table("table 'foo' not found"), Some("foo"));
assert_eq!(
parse_missing_table("Table 'source_errors' does not exist"),
Some("source_errors")
);
assert_eq!(parse_missing_table("syntax error"), None);
}
}