#[cfg(windows)]
use std::os::windows::process::CommandExt;
use std::path::Path;
use anyhow::{Context, Result};
use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
#[allow(unused_imports)]
use serde::{Serialize, de::DeserializeOwned};
use tracing::debug;
use rsclaw_platform::MemoryTier;
pub const LEGACY_REDB_UPGRADE_HELPER_ENV: &str = "RSCLAW_INTERNAL_REDB_LEGACY_UPGRADE";
const SESSION_META: TableDefinition<&str, &str> = TableDefinition::new("session_meta");
const MESSAGES: TableDefinition<&str, &str> = TableDefinition::new("messages");
const PAIRING: TableDefinition<&str, &str> = TableDefinition::new("pairing");
const KV: TableDefinition<&str, &str> = TableDefinition::new("kv");
const SESSION_ALIASES: TableDefinition<&str, &str> = TableDefinition::new("session_aliases");
const TASK_QUEUE: TableDefinition<&str, &str> = TableDefinition::new("task_queue");
const EXTERNAL_JOBS: TableDefinition<&str, &str> = TableDefinition::new("external_jobs");
const IDEM_KEYS: TableDefinition<&str, i64> = TableDefinition::new("idem_keys");
const COMPUTER_PERMISSIONS: TableDefinition<&str, &str> =
TableDefinition::new("computer_permissions");
const CRON_JOBS: TableDefinition<&str, &str> = TableDefinition::new("cron_jobs");
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct ArchiveStat {
pub total_messages: u64,
pub oldest_seq: Option<u64>,
pub newest_seq: Option<u64>,
pub generations: Vec<u32>,
}
pub struct RedbStore {
db: Database,
}
impl std::fmt::Debug for RedbStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RedbStore").finish_non_exhaustive()
}
}
pub fn upgrade_legacy_if_needed(path: &Path) -> Result<()> {
if !path.exists() {
return Ok(());
}
match Database::open(path) {
Ok(_db) => Ok(()),
Err(redb::DatabaseError::UpgradeRequired(legacy_version)) => {
backup_and_upgrade(path, legacy_version)
}
Err(_) => Ok(()),
}
}
const LOCK_RETRY_WINDOW: std::time::Duration = std::time::Duration::from_secs(10);
const LOCK_RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_millis(150);
pub fn create_with_lock_retry(
builder: &redb::Builder,
path: &Path,
) -> std::result::Result<Database, redb::DatabaseError> {
let deadline = std::time::Instant::now() + LOCK_RETRY_WINDOW;
let mut warned = false;
loop {
match builder.create(path) {
Err(redb::DatabaseError::DatabaseAlreadyOpen)
if std::time::Instant::now() < deadline =>
{
if !warned {
tracing::warn!(
path = %path.display(),
wait_secs = LOCK_RETRY_WINDOW.as_secs(),
"redb locked by another process (likely a restarting gateway handing off); retrying"
);
warned = true;
}
std::thread::sleep(LOCK_RETRY_INTERVAL);
}
other => return other,
}
}
}
#[cfg(test)]
fn panic_payload_to_string(payload: &(dyn std::any::Any + Send)) -> String {
if let Some(s) = payload.downcast_ref::<&str>() {
(*s).to_owned()
} else if let Some(s) = payload.downcast_ref::<String>() {
s.clone()
} else {
"non-string panic payload".to_owned()
}
}
#[cfg(test)]
fn run_legacy_redb_upgrade_safely<F>(upgrade: F) -> Result<()>
where
F: FnOnce() -> Result<()> + std::panic::UnwindSafe,
{
match std::panic::catch_unwind(upgrade) {
Ok(result) => result,
Err(payload) => anyhow::bail!(
"legacy redb upgrade panicked: {}",
panic_payload_to_string(payload.as_ref())
),
}
}
pub fn run_legacy_redb_upgrade_helper(path: &Path) -> Result<()> {
let mut legacy = redb_legacy::Database::open(path)
.with_context(|| format!("legacy open of {} for upgrade", path.display()))?;
let did_upgrade = legacy
.upgrade()
.with_context(|| format!("v2→v3 upgrade of {}", path.display()))?;
tracing::info!(path = %path.display(), did_upgrade, "redb file format upgrade complete");
Ok(())
}
fn backup_and_upgrade(path: &Path, legacy_version: u8) -> Result<()> {
let backup = path.with_extension("redb.v2.bak");
if backup.exists() {
tracing::info!(
backup = %backup.display(),
"pre-upgrade backup already exists; not overwriting"
);
} else {
std::fs::copy(path, &backup)
.with_context(|| format!("write backup to {}", backup.display()))?;
tracing::info!(
backup = %backup.display(),
original = %path.display(),
"wrote pre-upgrade backup",
);
}
tracing::info!(
path = %path.display(),
from_version = legacy_version,
"upgrading redb file format to v3 (one-time)",
);
match run_legacy_redb_upgrade_child(path) {
Ok(()) => {}
Err(e) => tracing::warn!(path = %path.display(), error = %e,
"legacy redb upgrade failed; leaving file for caller's open to handle"),
}
Ok(())
}
fn run_legacy_redb_upgrade_child(path: &Path) -> Result<()> {
if cfg!(test) {
return run_legacy_redb_upgrade_helper(path);
}
let exe = std::env::current_exe().context("resolve current executable for redb upgrade")?;
let mut cmd = std::process::Command::new(exe);
cmd.env(LEGACY_REDB_UPGRADE_HELPER_ENV, path)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null());
#[cfg(windows)]
{
cmd.creation_flags(0x08000000);
}
let status = cmd.status().context("spawn redb upgrade helper")?;
if status.success() {
Ok(())
} else {
anyhow::bail!("redb upgrade helper exited with {status}");
}
}
impl RedbStore {
pub fn open(path: &Path, tier: MemoryTier) -> Result<Self> {
let cache_bytes: usize = match tier {
MemoryTier::Low => 16 * 1024 * 1024, MemoryTier::Standard => 32 * 1024 * 1024, MemoryTier::High => 64 * 1024 * 1024, };
upgrade_legacy_if_needed(path)?;
let mut builder = Database::builder();
builder.set_cache_size(cache_bytes);
let db = create_with_lock_retry(&builder, path)
.with_context(|| format!("open redb at {}", path.display()))?;
let write = db.begin_write().context("begin write (init tables)")?;
{
write
.open_table(SESSION_META)
.context("init SESSION_META")?;
write.open_table(MESSAGES).context("init MESSAGES")?;
write.open_table(PAIRING).context("init PAIRING")?;
write.open_table(KV).context("init KV")?;
write
.open_table(SESSION_ALIASES)
.context("init SESSION_ALIASES")?;
write.open_table(TASK_QUEUE).context("init TASK_QUEUE")?;
write
.open_table(EXTERNAL_JOBS)
.context("init EXTERNAL_JOBS")?;
write.open_table(IDEM_KEYS).context("init IDEM_KEYS")?;
write
.open_table(COMPUTER_PERMISSIONS)
.context("init COMPUTER_PERMISSIONS")?;
}
write.commit().context("commit init")?;
debug!(path = %path.display(), cache_mb = cache_bytes / (1024*1024), "redb opened");
Ok(Self { db })
}
pub fn get_session_meta(&self, session_key: &str) -> Result<Option<SessionMeta>> {
let read = self.db.begin_read()?;
let table = read.open_table(SESSION_META)?;
match table.get(session_key)? {
Some(guard) => {
let v: SessionMeta = serde_json::from_str(guard.value())?;
Ok(Some(v))
}
None => Ok(None),
}
}
pub fn put_session_meta(&self, session_key: &str, meta: &SessionMeta) -> Result<()> {
let json = serde_json::to_string(meta)?;
let write = self.db.begin_write()?;
{
let mut table = write.open_table(SESSION_META)?;
table.insert(session_key, json.as_str())?;
}
write.commit()?;
Ok(())
}
pub fn delete_session(&self, session_key: &str) -> Result<()> {
let write = self.db.begin_write()?;
{
let mut meta = write.open_table(SESSION_META)?;
meta.remove(session_key)?;
let mut msgs = write.open_table(MESSAGES)?;
let prefix = format!("{session_key}:");
let keys: Vec<String> = msgs
.range(prefix.as_str()..)?
.take_while(|r| {
r.as_ref()
.map(|(k, _)| k.value().starts_with(&prefix))
.unwrap_or(false)
})
.filter_map(|r| r.ok())
.map(|(k, _)| k.value().to_owned())
.collect();
for key in &keys {
msgs.remove(key.as_str())?;
}
}
write.commit()?;
Ok(())
}
pub fn list_sessions(&self) -> Result<Vec<String>> {
let read = self.db.begin_read()?;
let table = read.open_table(SESSION_META)?;
let keys = table
.range::<&str>(..)?
.filter_map(|r| r.ok())
.map(|(k, _)| k.value().to_owned())
.collect();
Ok(keys)
}
pub fn new_generation(&self, session_key: &str) -> Result<u32> {
let meta_opt = self.get_session_meta(session_key)?;
let mut meta = meta_opt.unwrap_or_else(|| SessionMeta {
session_key: session_key.to_owned(),
message_count: 0,
last_active: chrono::Utc::now().timestamp(),
created_at: chrono::Utc::now().timestamp(),
generation: 1,
});
meta.generation += 1;
meta.message_count = 0;
meta.last_active = chrono::Utc::now().timestamp();
let write = self.db.begin_write()?;
{
let mut msgs = write.open_table(MESSAGES)?;
let prefix = format!("{session_key}:");
let keys: Vec<String> = msgs
.range(prefix.as_str()..)?
.take_while(|r| {
r.as_ref()
.map(|(k, _)| k.value().starts_with(&prefix))
.unwrap_or(false)
})
.filter_map(|r| r.ok())
.map(|(k, _)| k.value().to_owned())
.collect();
for key in &keys {
msgs.remove(key.as_str())?;
}
let meta_json = serde_json::to_string(&meta)?;
let mut metas = write.open_table(SESSION_META)?;
metas.insert(session_key, meta_json.as_str())?;
}
write.commit()?;
Ok(meta.generation)
}
pub fn append_message(&self, session_key: &str, message: &serde_json::Value) -> Result<u64> {
let meta_opt = self.get_session_meta(session_key)?;
let mut meta = meta_opt.unwrap_or_else(|| SessionMeta {
session_key: session_key.to_owned(),
message_count: 0,
last_active: chrono::Utc::now().timestamp(),
created_at: chrono::Utc::now().timestamp(),
generation: 1,
});
let seq = meta.message_count;
meta.message_count += 1;
meta.last_active = chrono::Utc::now().timestamp();
let msg_key = format!("{session_key}:{seq:016}");
let generation = meta.generation;
let archive_key = format!("archive:{session_key}:gen{generation}:{seq:016}");
let msg_json = serde_json::to_string(message)?;
let write = self.db.begin_write()?;
{
let mut msgs = write.open_table(MESSAGES)?;
msgs.insert(msg_key.as_str(), msg_json.as_str())?;
msgs.insert(archive_key.as_str(), msg_json.as_str())?;
let meta_json = serde_json::to_string(&meta)?;
let mut metas = write.open_table(SESSION_META)?;
metas.insert(session_key, meta_json.as_str())?;
}
write.commit()?;
Ok(seq)
}
pub fn load_messages(&self, session_key: &str) -> Result<Vec<serde_json::Value>> {
let read = self.db.begin_read()?;
let table = read.open_table(MESSAGES)?;
let prefix = format!("{session_key}:");
let messages: Vec<(String, serde_json::Value)> = table
.range(prefix.as_str()..)?
.take_while(|r| {
r.as_ref()
.map(|(k, _)| k.value().starts_with(&prefix))
.unwrap_or(false)
})
.filter_map(|r| r.ok())
.filter_map(|(k, v)| {
let val: serde_json::Value = serde_json::from_str(v.value()).ok()?;
Some((k.value().to_owned(), val))
})
.collect();
if messages.is_empty() {
return Ok(vec![]);
}
let archive_prefix = format!("archive:{session_key}:");
let has_archive = table
.range(archive_prefix.as_str()..)?
.next()
.is_some_and(|r| {
r.as_ref()
.map(|(k, _)| k.value().starts_with(&archive_prefix))
.unwrap_or(false)
});
if !has_archive {
drop(table);
drop(read);
if let Ok(write) = self.db.begin_write() {
if let Ok(mut msgs_table) = write.open_table(MESSAGES) {
for (key, val) in &messages {
let suffix = key.strip_prefix(&format!("{session_key}:")).unwrap_or("0");
let archive_key = format!("archive:{session_key}:gen1:{suffix}");
let json_str = serde_json::to_string(val).unwrap_or_default();
if let Err(e) = msgs_table.insert(archive_key.as_str(), json_str.as_str()) {
tracing::error!(error = %e, key = %archive_key, "failed to insert archive entry");
}
}
}
if let Err(e) = write.commit() {
tracing::error!(error = %e, "failed to commit archive backfill transaction");
}
debug!(
"backfilled {} archive entries for session {session_key}",
messages.len()
);
}
}
Ok(messages.into_iter().map(|(_, v)| v).collect())
}
pub fn archive_load(
&self,
session_key: &str,
generation: Option<u32>,
) -> Result<Vec<(u64, u32, serde_json::Value)>> {
let read = self.db.begin_read()?;
let table = read.open_table(MESSAGES)?;
let prefix = match generation {
Some(g) => format!("archive:{session_key}:gen{g}:"),
None => format!("archive:{session_key}:"),
};
let mut out = Vec::new();
for entry in table.range(prefix.as_str()..)? {
let (k, v) = entry?;
let key = k.value();
if !key.starts_with(&prefix) {
break;
}
let after = match key.strip_prefix(&format!("archive:{session_key}:gen")) {
Some(s) => s,
None => continue,
};
let (gen_str, seq_str) = match after.split_once(':') {
Some(pair) => pair,
None => continue,
};
let Ok(generation) = gen_str.parse::<u32>() else {
continue;
};
let Ok(seq) = seq_str.parse::<u64>() else {
continue;
};
let Ok(msg) = serde_json::from_str::<serde_json::Value>(v.value()) else {
continue;
};
out.push((seq, generation, msg));
}
out.sort_by(|a, b| a.1.cmp(&b.1).then(a.0.cmp(&b.0)));
Ok(out)
}
pub fn archive_stat(&self, session_key: &str) -> Result<ArchiveStat> {
let rows = self.archive_load(session_key, None)?;
if rows.is_empty() {
return Ok(ArchiveStat::default());
}
let mut generations: Vec<u32> = rows.iter().map(|(_, g, _)| *g).collect();
generations.sort_unstable();
generations.dedup();
let oldest_seq = rows.iter().map(|(s, _, _)| *s).min();
let newest_seq = rows.iter().map(|(s, _, _)| *s).max();
Ok(ArchiveStat {
total_messages: rows.len() as u64,
oldest_seq,
newest_seq,
generations,
})
}
pub fn get_pairing(&self, channel: &str, peer_id: &str) -> Result<Option<PairingState>> {
let key = format!("{channel}:{peer_id}");
let read = self.db.begin_read()?;
let table = read.open_table(PAIRING)?;
match table.get(key.as_str())? {
Some(g) => Ok(Some(serde_json::from_str(g.value())?)),
None => Ok(None),
}
}
pub fn put_pairing(&self, channel: &str, peer_id: &str, state: &PairingState) -> Result<()> {
let key = format!("{channel}:{peer_id}");
let json = serde_json::to_string(state)?;
let write = self.db.begin_write()?;
{
let mut table = write.open_table(PAIRING)?;
table.insert(key.as_str(), json.as_str())?;
}
write.commit()?;
Ok(())
}
pub fn list_pairings(&self, channel: &str) -> Result<Vec<String>> {
let prefix = format!("{channel}:");
let read = self.db.begin_read()?;
let table = read.open_table(PAIRING)?;
let mut peers = Vec::new();
for entry in table.iter()? {
let (key, val) = entry?;
let k = key.value();
if k.starts_with(&prefix) {
if let Ok(state) = serde_json::from_str::<PairingState>(val.value()) {
if matches!(state, PairingState::Approved) {
peers.push(k[prefix.len()..].to_owned());
}
}
}
}
Ok(peers)
}
pub fn delete_pairing(&self, channel: &str, peer_id: &str) -> Result<()> {
let key = format!("{channel}:{peer_id}");
let write = self.db.begin_write()?;
{
let mut table = write.open_table(PAIRING)?;
table.remove(key.as_str())?;
}
write.commit()?;
Ok(())
}
pub fn kv_get(&self, key: &str) -> Result<Option<String>> {
let read = self.db.begin_read()?;
let table = read.open_table(KV)?;
Ok(table.get(key)?.map(|g| g.value().to_owned()))
}
pub fn kv_set(&self, key: &str, value: &str) -> Result<()> {
let write = self.db.begin_write()?;
{
let mut table = write.open_table(KV)?;
table.insert(key, value)?;
}
write.commit()?;
Ok(())
}
pub fn kv_delete(&self, key: &str) -> Result<()> {
let write = self.db.begin_write()?;
{
let mut table = write.open_table(KV)?;
table.remove(key)?;
}
write.commit()?;
Ok(())
}
pub fn resolve_session_alias(&self, alias_key: &str) -> Result<Option<String>> {
let read = self.db.begin_read()?;
let table = read.open_table(SESSION_ALIASES)?;
Ok(table.get(alias_key)?.map(|g| g.value().to_owned()))
}
pub fn put_session_alias(&self, alias_key: &str, canonical_key: &str) -> Result<()> {
let write = self.db.begin_write()?;
{
let mut table = write.open_table(SESSION_ALIASES)?;
table.insert(alias_key, canonical_key)?;
}
write.commit()?;
Ok(())
}
pub fn put_session_aliases(&self, aliases: &[(&str, &str)]) -> Result<()> {
if aliases.is_empty() {
return Ok(());
}
let write = self.db.begin_write()?;
{
let mut table = write.open_table(SESSION_ALIASES)?;
for (alias_key, canonical_key) in aliases {
table.insert(*alias_key, *canonical_key)?;
}
}
write.commit()?;
Ok(())
}
pub fn load_all_aliases(&self) -> Result<std::collections::HashMap<String, String>> {
let read = self.db.begin_read()?;
let table = read.open_table(SESSION_ALIASES)?;
let mut map = std::collections::HashMap::new();
for entry in table.iter()? {
let (k, v) = entry?;
map.insert(k.value().to_owned(), v.value().to_owned());
}
Ok(map)
}
pub fn enqueue_task(&self, task: &rsclaw_types::QueuedTask) -> Result<()> {
let json = serde_json::to_string(task)?;
let write = self.db.begin_write()?;
{
let mut table = write.open_table(TASK_QUEUE)?;
table.insert(task.id.as_str(), json.as_str())?;
}
write.commit()?;
Ok(())
}
pub fn dequeue_task(&self) -> Result<Option<rsclaw_types::QueuedTask>> {
use rsclaw_types::TaskStatus;
let write = self.db.begin_write()?;
let result = {
let mut table = write.open_table(TASK_QUEUE)?;
let mut best: Option<rsclaw_types::QueuedTask> = None;
for entry in table.iter()? {
let (_k, v) = entry?;
let task: rsclaw_types::QueuedTask = serde_json::from_str(v.value())?;
if task.status != TaskStatus::Pending {
continue;
}
let dominated = match &best {
None => true,
Some(b) => (task.priority, task.created_at) < (b.priority, b.created_at),
};
if dominated {
best = Some(task);
}
}
if let Some(mut task) = best {
task.status = TaskStatus::Running;
task.updated_at = chrono::Utc::now().timestamp();
let json = serde_json::to_string(&task)?;
table.insert(task.id.as_str(), json.as_str())?;
Some(task)
} else {
None
}
};
write.commit()?;
Ok(result)
}
pub fn requeue_running_tasks(&self) -> Result<usize> {
use rsclaw_types::TaskStatus;
let write = self.db.begin_write()?;
let count = {
let mut table = write.open_table(TASK_QUEUE)?;
let mut to_revive = Vec::new();
for entry in table.iter()? {
let (_k, v) = entry?;
let task: rsclaw_types::QueuedTask = serde_json::from_str(v.value())?;
if task.status == TaskStatus::Running {
to_revive.push(task);
}
}
let count = to_revive.len();
for mut task in to_revive {
task.status = TaskStatus::Pending;
task.updated_at = chrono::Utc::now().timestamp();
let json = serde_json::to_string(&task)?;
table.insert(task.id.as_str(), json.as_str())?;
}
count
};
write.commit()?;
Ok(count)
}
pub fn update_task_turn(&self, task_id: &str, turn: u32) -> Result<()> {
let write = self.db.begin_write()?;
{
let mut table = write.open_table(TASK_QUEUE)?;
let guard = table
.get(task_id)?
.ok_or_else(|| anyhow::anyhow!("task not found: {task_id}"))?;
let mut task: rsclaw_types::QueuedTask =
serde_json::from_str(guard.value())?;
drop(guard);
task.turns = turn;
task.updated_at = chrono::Utc::now().timestamp();
let json = serde_json::to_string(&task)?;
table.insert(task_id, json.as_str())?;
}
write.commit()?;
Ok(())
}
pub fn update_task_last_reply(&self, task_id: &str, text: &str) -> Result<()> {
let write = self.db.begin_write()?;
{
let mut table = write.open_table(TASK_QUEUE)?;
let guard = table
.get(task_id)?
.ok_or_else(|| anyhow::anyhow!("task not found: {task_id}"))?;
let mut task: rsclaw_types::QueuedTask =
serde_json::from_str(guard.value())?;
drop(guard);
task.last_reply = Some(text.to_owned());
task.updated_at = chrono::Utc::now().timestamp();
let json = serde_json::to_string(&task)?;
table.insert(task_id, json.as_str())?;
}
write.commit()?;
Ok(())
}
pub fn mark_task_notified(&self, task_id: &str) -> Result<()> {
let write = self.db.begin_write()?;
{
let mut table = write.open_table(TASK_QUEUE)?;
let guard = table
.get(task_id)?
.ok_or_else(|| anyhow::anyhow!("task not found: {task_id}"))?;
let mut task: rsclaw_types::QueuedTask =
serde_json::from_str(guard.value())?;
drop(guard);
task.notified = true;
task.updated_at = chrono::Utc::now().timestamp();
let json = serde_json::to_string(&task)?;
table.insert(task_id, json.as_str())?;
}
write.commit()?;
Ok(())
}
pub fn update_task_status(
&self,
task_id: &str,
status: rsclaw_types::TaskStatus,
) -> Result<()> {
let write = self.db.begin_write()?;
{
let mut table = write.open_table(TASK_QUEUE)?;
let guard = table
.get(task_id)?
.ok_or_else(|| anyhow::anyhow!("task not found: {task_id}"))?;
let mut task: rsclaw_types::QueuedTask =
serde_json::from_str(guard.value())?;
drop(guard);
task.status = status;
task.updated_at = chrono::Utc::now().timestamp();
let json = serde_json::to_string(&task)?;
table.insert(task_id, json.as_str())?;
}
write.commit()?;
Ok(())
}
pub fn fail_task(
&self,
task_id: &str,
max_retries: u32,
) -> Result<rsclaw_types::TaskStatus> {
use rsclaw_types::TaskStatus;
let write = self.db.begin_write()?;
let status = {
let mut table = write.open_table(TASK_QUEUE)?;
let guard = table
.get(task_id)?
.ok_or_else(|| anyhow::anyhow!("task not found: {task_id}"))?;
let mut task: rsclaw_types::QueuedTask =
serde_json::from_str(guard.value())?;
drop(guard);
task.retries += 1;
task.updated_at = chrono::Utc::now().timestamp();
if task.retries >= max_retries {
task.status = TaskStatus::Dead;
} else {
task.status = TaskStatus::Failed;
}
let new_status = task.status;
let json = serde_json::to_string(&task)?;
table.insert(task_id, json.as_str())?;
new_status
};
write.commit()?;
Ok(status)
}
pub fn get_task(
&self,
task_id: &str,
) -> Result<Option<rsclaw_types::QueuedTask>> {
let read = self.db.begin_read()?;
let table = read.open_table(TASK_QUEUE)?;
match table.get(task_id)? {
Some(guard) => {
let task = serde_json::from_str(guard.value())?;
Ok(Some(task))
}
None => Ok(None),
}
}
pub fn list_tasks(
&self,
status: Option<rsclaw_types::TaskStatus>,
) -> Result<Vec<rsclaw_types::QueuedTask>> {
let read = self.db.begin_read()?;
let table = read.open_table(TASK_QUEUE)?;
let mut tasks = Vec::new();
for entry in table.iter()? {
let (_k, v) = entry?;
let task: rsclaw_types::QueuedTask = serde_json::from_str(v.value())?;
if let Some(ref s) = status {
if task.status != *s {
continue;
}
}
tasks.push(task);
}
Ok(tasks)
}
pub fn cleanup_expired_tasks(&self) -> Result<usize> {
let write = self.db.begin_write()?;
let count = {
let mut table = write.open_table(TASK_QUEUE)?;
let mut expired_ids = Vec::new();
for entry in table.iter()? {
let (_k, v) = entry?;
let task: rsclaw_types::QueuedTask = serde_json::from_str(v.value())?;
if task.is_expired() {
expired_ids.push(task.id);
}
}
let count = expired_ids.len();
for id in &expired_ids {
table.remove(id.as_str())?;
}
count
};
write.commit()?;
Ok(count)
}
pub fn has_duplicate(&self, session_key: &str, content_hash: &str) -> Result<bool> {
use rsclaw_types::TaskStatus;
let read = self.db.begin_read()?;
let table = read.open_table(TASK_QUEUE)?;
for entry in table.iter()? {
let (_k, v) = entry?;
let task: rsclaw_types::QueuedTask = serde_json::from_str(v.value())?;
if task.session_key == session_key
&& task.content_hash == content_hash
&& task.status == TaskStatus::Pending
{
return Ok(true);
}
}
Ok(false)
}
pub fn merge_into_pending(
&self,
session_key: &str,
message: &rsclaw_types::QueuedMessage,
) -> Result<bool> {
use rsclaw_types::TaskStatus;
let write = self.db.begin_write()?;
let merged = {
let mut table = write.open_table(TASK_QUEUE)?;
let mut target_id: Option<String> = None;
for entry in table.iter()? {
let (_k, v) = entry?;
let task: rsclaw_types::QueuedTask = serde_json::from_str(v.value())?;
if task.session_key == session_key && task.status == TaskStatus::Pending {
target_id = Some(task.id);
break;
}
}
if let Some(id) = target_id {
let guard = table
.get(id.as_str())?
.ok_or_else(|| anyhow::anyhow!("task disappeared: {id}"))?;
let mut task: rsclaw_types::QueuedTask =
serde_json::from_str(guard.value())?;
drop(guard);
task.messages.push(message.clone());
task.updated_at = chrono::Utc::now().timestamp();
let json = serde_json::to_string(&task)?;
table.insert(id.as_str(), json.as_str())?;
true
} else {
false
}
};
write.commit()?;
Ok(merged)
}
pub fn is_idem_delivered(&self, key: &str) -> Result<bool> {
let read = self.db.begin_read()?;
let table = read.open_table(IDEM_KEYS)?;
Ok(table.get(key)?.is_some())
}
pub fn mark_idem_delivered(&self, key: &str) -> Result<()> {
let now = chrono::Utc::now().timestamp();
let write = self.db.begin_write()?;
{
let mut table = write.open_table(IDEM_KEYS)?;
table.insert(key, now)?;
}
write.commit()?;
Ok(())
}
pub fn cleanup_idem_keys(&self, retention_secs: i64) -> Result<usize> {
let cutoff = chrono::Utc::now().timestamp() - retention_secs;
let write = self.db.begin_write()?;
let count = {
let mut table = write.open_table(IDEM_KEYS)?;
let mut victims = Vec::new();
for entry in table.iter()? {
let (k, v) = entry?;
if v.value() < cutoff {
victims.push(k.value().to_owned());
}
}
let count = victims.len();
for key in &victims {
table.remove(key.as_str())?;
}
count
};
write.commit()?;
Ok(count)
}
pub fn permission_get(&self, key: &str) -> Result<Option<String>> {
let read = self.db.begin_read()?;
let table = read.open_table(COMPUTER_PERMISSIONS)?;
Ok(table.get(key)?.map(|g| g.value().to_owned()))
}
pub fn permission_put(&self, key: &str, value: &str) -> Result<()> {
let write = self.db.begin_write()?;
{
let mut table = write.open_table(COMPUTER_PERMISSIONS)?;
table.insert(key, value)?;
}
write.commit()?;
Ok(())
}
pub fn permission_delete(&self, key: &str) -> Result<()> {
let write = self.db.begin_write()?;
{
let mut table = write.open_table(COMPUTER_PERMISSIONS)?;
table.remove(key)?;
}
write.commit()?;
Ok(())
}
pub fn permission_list_all(&self) -> Result<Vec<(String, String)>> {
let read = self.db.begin_read()?;
let table = read.open_table(COMPUTER_PERMISSIONS)?;
let mut out = Vec::new();
for entry in table.iter()? {
let (k, v) = entry?;
out.push((k.value().to_owned(), v.value().to_owned()));
}
Ok(out)
}
pub fn cron_get(&self, id: &str) -> Result<Option<String>> {
let read = self.db.begin_read()?;
let table = match read.open_table(CRON_JOBS) {
Ok(t) => t,
Err(redb::TableError::TableDoesNotExist(_)) => return Ok(None),
Err(e) => return Err(e.into()),
};
Ok(table.get(id)?.map(|g| g.value().to_owned()))
}
pub fn cron_list(&self) -> Result<Vec<(String, String)>> {
let read = self.db.begin_read()?;
let table = match read.open_table(CRON_JOBS) {
Ok(t) => t,
Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
Err(e) => return Err(e.into()),
};
let mut out = Vec::new();
for entry in table.iter()? {
let (k, v) = entry?;
out.push((k.value().to_owned(), v.value().to_owned()));
}
Ok(out)
}
pub fn cron_put(&self, id: &str, value: &str) -> Result<()> {
let write = self.db.begin_write()?;
{
let mut table = write.open_table(CRON_JOBS)?;
table.insert(id, value)?;
}
write.commit()?;
Ok(())
}
pub fn cron_delete(&self, id: &str) -> Result<()> {
let write = self.db.begin_write()?;
{
let mut table = write.open_table(CRON_JOBS)?;
table.remove(id)?;
}
write.commit()?;
Ok(())
}
pub fn cron_bulk_replace(&self, entries: &[(String, String)]) -> Result<()> {
let write = self.db.begin_write()?;
{
let mut table = write.open_table(CRON_JOBS)?;
let existing: Vec<String> = table
.iter()?
.filter_map(|e| e.ok().map(|(k, _)| k.value().to_owned()))
.collect();
for k in existing {
table.remove(k.as_str())?;
}
for (k, v) in entries {
table.insert(k.as_str(), v.as_str())?;
}
}
write.commit()?;
Ok(())
}
pub fn enqueue_external_job(
&self,
job: &rsclaw_types::ExternalJob,
) -> Result<()> {
let json = serde_json::to_string(job)?;
let write = self.db.begin_write()?;
{
let mut table = write.open_table(EXTERNAL_JOBS)?;
table.insert(job.id.as_str(), json.as_str())?;
}
write.commit()?;
Ok(())
}
pub fn update_external_job(
&self,
job: &rsclaw_types::ExternalJob,
) -> Result<()> {
let json = serde_json::to_string(job)?;
let write = self.db.begin_write()?;
{
let mut table = write.open_table(EXTERNAL_JOBS)?;
if table.get(job.id.as_str())?.is_none() {
anyhow::bail!("external job not found: {}", job.id);
}
table.insert(job.id.as_str(), json.as_str())?;
}
write.commit()?;
Ok(())
}
pub fn get_external_job(
&self,
job_id: &str,
) -> Result<Option<rsclaw_types::ExternalJob>> {
let read = self.db.begin_read()?;
let table = read.open_table(EXTERNAL_JOBS)?;
match table.get(job_id)? {
Some(guard) => {
let job = serde_json::from_str(guard.value())?;
Ok(Some(job))
}
None => Ok(None),
}
}
pub fn due_external_jobs(
&self,
now: i64,
) -> Result<Vec<rsclaw_types::ExternalJob>> {
let read = self.db.begin_read()?;
let table = read.open_table(EXTERNAL_JOBS)?;
let mut due = Vec::new();
for entry in table.iter()? {
let (_k, v) = entry?;
let job: rsclaw_types::ExternalJob = serde_json::from_str(v.value())?;
if job.next_poll_at > now {
continue;
}
let needs_action = matches!(
job.status,
rsclaw_types::ExternalJobStatus::Pending
| rsclaw_types::ExternalJobStatus::Polling
) || job.needs_delivery();
if needs_action
&& job.delivery_attempts
< rsclaw_types::ExternalJob::MAX_DELIVERY_ATTEMPTS
{
due.push(job);
}
}
Ok(due)
}
pub fn delete_external_job(&self, job_id: &str) -> Result<()> {
let write = self.db.begin_write()?;
{
let mut table = write.open_table(EXTERNAL_JOBS)?;
table.remove(job_id)?;
}
write.commit()?;
Ok(())
}
pub fn cleanup_finished_external_jobs(&self, retention_secs: i64) -> Result<usize> {
use rsclaw_types::{ExternalJob, ExternalJobStatus};
let cutoff = chrono::Utc::now().timestamp() - retention_secs;
let write = self.db.begin_write()?;
let count = {
let mut table = write.open_table(EXTERNAL_JOBS)?;
let mut victims = Vec::new();
for entry in table.iter()? {
let (_k, v) = entry?;
let job: ExternalJob = serde_json::from_str(v.value())?;
let terminal = matches!(
job.status,
ExternalJobStatus::Done
| ExternalJobStatus::Failed
| ExternalJobStatus::TimedOut
);
let delivery_settled = job.delivered_at.is_some()
|| job.delivery_attempts >= ExternalJob::MAX_DELIVERY_ATTEMPTS;
if terminal && delivery_settled && job.submitted_at < cutoff {
victims.push(job.id);
}
}
let count = victims.len();
for id in &victims {
table.remove(id.as_str())?;
}
count
};
write.commit()?;
Ok(count)
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionMeta {
pub session_key: String,
pub message_count: u64,
pub last_active: i64, pub created_at: i64,
#[serde(default = "default_generation")]
pub generation: u32,
}
fn default_generation() -> u32 {
1
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum PairingState {
Approved,
Pending { code: String, expires_at: i64 },
}
#[cfg(test)]
mod tests {
use super::*;
const T_A: redb_legacy::TableDefinition<&str, &str> =
redb_legacy::TableDefinition::new("_upgrade_test_a");
const T_B: redb_legacy::TableDefinition<&str, u64> =
redb_legacy::TableDefinition::new("_upgrade_test_b");
const T_A_V3: TableDefinition<&str, &str> = TableDefinition::new("_upgrade_test_a");
const T_B_V3: TableDefinition<&str, u64> = TableDefinition::new("_upgrade_test_b");
fn write_v2_fixture(path: &std::path::Path) -> Vec<u8> {
let db = redb_legacy::Database::create(path).expect("create v2");
let txn = db.begin_write().expect("begin write");
{
let mut a = txn.open_table(T_A).expect("open T_A");
a.insert("hello", "world").expect("insert a1");
a.insert("foo", "bar").expect("insert a2");
let mut b = txn.open_table(T_B).expect("open T_B");
b.insert("count", 42u64).expect("insert b1");
b.insert("year", 2026u64).expect("insert b2");
}
txn.commit().expect("commit");
drop(db); std::fs::read(path).expect("read v2 bytes")
}
#[test]
fn upgrades_v2_database_to_v3_preserving_data_and_writes_backup() {
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("legacy.redb");
let original_bytes = write_v2_fixture(&path);
match Database::open(&path) {
Err(redb::DatabaseError::UpgradeRequired(_)) => {}
other => panic!("expected UpgradeRequired before upgrade, got {other:?}"),
}
upgrade_legacy_if_needed(&path).expect("upgrade");
let backup = path.with_extension("redb.v2.bak");
assert!(backup.exists(), "backup file should exist at {backup:?}");
let backup_bytes = std::fs::read(&backup).expect("read backup");
assert_eq!(
backup_bytes, original_bytes,
"backup must be byte-identical to the original v2 file"
);
{
let db = Database::open(&path).expect("open after upgrade");
let read = db.begin_read().expect("begin read");
let a = read.open_table(T_A_V3).expect("open T_A v3");
assert_eq!(a.get("hello").unwrap().unwrap().value(), "world");
assert_eq!(a.get("foo").unwrap().unwrap().value(), "bar");
let b = read.open_table(T_B_V3).expect("open T_B v3");
assert_eq!(b.get("count").unwrap().unwrap().value(), 42u64);
assert_eq!(b.get("year").unwrap().unwrap().value(), 2026u64);
}
upgrade_legacy_if_needed(&path).expect("second upgrade no-op");
}
#[test]
fn legacy_upgrade_runner_converts_panic_to_error() {
let result = run_legacy_redb_upgrade_safely(|| -> Result<()> {
panic!("legacy redb panic");
});
let err = result.expect_err("panic should become error");
let msg = format!("{err:#}");
assert!(msg.contains("legacy redb upgrade panicked"), "{msg}");
assert!(msg.contains("legacy redb panic"), "{msg}");
}
#[test]
fn upgrade_does_not_overwrite_existing_backup() {
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("legacy.redb");
write_v2_fixture(&path);
let backup = path.with_extension("redb.v2.bak");
std::fs::write(&backup, b"sentinel-do-not-overwrite").expect("seed backup");
upgrade_legacy_if_needed(&path).expect("upgrade");
assert_eq!(
std::fs::read(&backup).expect("read backup"),
b"sentinel-do-not-overwrite",
"existing backup must be preserved verbatim"
);
}
#[test]
fn corrupted_file_is_not_mistaken_for_legacy_v2() {
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("garbage.redb");
std::fs::write(&path, b"this is not a redb file at all").expect("seed garbage");
upgrade_legacy_if_needed(&path).expect("noop on garbage");
assert!(
!path.with_extension("redb.v2.bak").exists(),
"no backup should be written for non-legacy files",
);
assert!(Database::create(&path).is_err());
}
#[test]
fn upgrade_helper_noop_on_missing_file() {
let dir = tempfile::tempdir().expect("tempdir");
upgrade_legacy_if_needed(&dir.path().join("does-not-exist.redb")).expect("noop");
}
fn open_tmp() -> (RedbStore, tempfile::TempDir) {
let dir = tempfile::tempdir().expect("tempdir");
let store =
RedbStore::open(&dir.path().join("test.redb"), MemoryTier::Low).expect("open redb");
(store, dir)
}
#[test]
fn create_with_lock_retry_waits_for_lock_release() {
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("lock.redb");
let first = Database::create(&path).expect("first open");
let released = Arc::new(AtomicBool::new(false));
let released_writer = Arc::clone(&released);
let holder = std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(300));
released_writer.store(true, Ordering::SeqCst);
drop(first); });
assert!(
matches!(
Database::create(&path),
Err(redb::DatabaseError::DatabaseAlreadyOpen)
),
"lock should be held while the holder thread is alive"
);
let db = create_with_lock_retry(&Database::builder(), &path)
.expect("retry should succeed once the lock is released");
assert!(
released.load(Ordering::SeqCst),
"retry must only succeed after the holder released the lock"
);
drop(db);
holder.join().expect("holder thread");
}
#[test]
fn create_with_lock_retry_succeeds_immediately_when_unlocked() {
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("fresh.redb");
let db =
create_with_lock_retry(&Database::builder(), &path).expect("open uncontended redb");
drop(db);
}
#[test]
fn session_meta_round_trip() {
let (store, _dir) = open_tmp();
let meta = SessionMeta {
session_key: "agent:main:telegram:direct:u1".to_owned(),
message_count: 5,
last_active: 1_700_000_000,
created_at: 1_699_000_000,
generation: 1,
};
store
.put_session_meta(&meta.session_key, &meta)
.expect("put");
let got = store.get_session_meta(&meta.session_key).expect("get");
assert!(got.is_some());
assert_eq!(got.unwrap().message_count, 5);
}
#[test]
fn append_and_load_messages() {
let (store, _dir) = open_tmp();
let sk = "agent:main:cli:direct:user";
let msg1 = serde_json::json!({"role": "user", "content": "hello"});
let msg2 = serde_json::json!({"role": "assistant", "content": "hi there"});
store.append_message(sk, &msg1).expect("append 1");
store.append_message(sk, &msg2).expect("append 2");
let msgs = store.load_messages(sk).expect("load");
assert_eq!(msgs.len(), 2);
assert_eq!(msgs[0]["role"], "user");
assert_eq!(msgs[1]["role"], "assistant");
}
#[test]
fn archive_load_returns_every_appended_message() {
let (store, _dir) = open_tmp();
let sk = "sess:archive_test";
for i in 1..=6 {
store
.append_message(
sk,
&serde_json::json!({ "role": "user", "content": format!("msg {i}") }),
)
.expect("append");
}
let rows = store.archive_load(sk, None).expect("archive_load");
assert_eq!(rows.len(), 6, "archive should keep every message");
assert_eq!(rows[0].0, 0);
assert_eq!(rows[0].1, 1, "generation should be 1");
assert_eq!(rows[0].2["content"], "msg 1");
assert_eq!(rows[5].2["content"], "msg 6");
}
#[test]
fn archive_load_orders_cross_generation_numerically() {
let (store, _dir) = open_tmp();
let sk = "sess:gen_order";
for _ in 0..11 {
store
.append_message(sk, &serde_json::json!({ "role": "user", "content": "msg" }))
.expect("append");
store.new_generation(sk).expect("new_generation");
}
store
.append_message(
sk,
&serde_json::json!({ "role": "user", "content": "final" }),
)
.expect("append");
let rows = store.archive_load(sk, None).expect("archive_load");
for win in rows.windows(2) {
assert!(
win[0].1 <= win[1].1,
"generations out of order: {} then {}",
win[0].1,
win[1].1,
);
}
assert_eq!(rows.first().map(|r| r.1), Some(1));
assert_eq!(rows.last().map(|r| r.1), Some(12));
}
#[test]
fn archive_stat_summarises_totals() {
let (store, _dir) = open_tmp();
let sk = "sess:stat_test";
for i in 0..3 {
store
.append_message(sk, &serde_json::json!({"i": i}))
.expect("append");
}
let stat = store.archive_stat(sk).expect("archive_stat");
assert_eq!(stat.total_messages, 3);
assert_eq!(stat.oldest_seq, Some(0));
assert_eq!(stat.newest_seq, Some(2));
assert_eq!(stat.generations, vec![1]);
}
#[test]
fn archive_survives_session_delete() {
let (store, _dir) = open_tmp();
let sk = "sess:permanence";
store
.append_message(
sk,
&serde_json::json!({"role": "user", "content": "remember me"}),
)
.expect("append");
assert_eq!(store.archive_stat(sk).unwrap().total_messages, 1);
}
#[test]
fn delete_session_removes_messages() {
let (store, _dir) = open_tmp();
let sk = "agent:main:cli:direct:del_user";
store
.append_message(sk, &serde_json::json!({"role": "user", "content": "x"}))
.expect("append");
store.delete_session(sk).expect("delete");
let msgs = store.load_messages(sk).expect("load after delete");
assert!(msgs.is_empty());
assert!(store.get_session_meta(sk).expect("meta").is_none());
}
#[test]
fn kv_set_get_delete() {
let (store, _dir) = open_tmp();
store.kv_set("my_key", "my_value").expect("set");
assert_eq!(
store.kv_get("my_key").expect("get").as_deref(),
Some("my_value")
);
store.kv_delete("my_key").expect("delete");
assert!(store.kv_get("my_key").expect("get after delete").is_none());
}
#[test]
fn list_sessions() {
let (store, _dir) = open_tmp();
let keys = ["sess:a", "sess:b", "sess:c"];
for k in &keys {
store
.append_message(k, &serde_json::json!({}))
.expect("append");
}
let listed = store.list_sessions().expect("list");
for k in &keys {
assert!(listed.contains(&k.to_string()), "missing {k}");
}
}
}