use std::path::Path;
use anyhow::{Context, Result};
use redb::{Database, ReadableTable, TableDefinition};
#[allow(unused_imports)]
use serde::{Serialize, de::DeserializeOwned};
use tracing::debug;
use crate::MemoryTier;
// Table layout: every table maps string keys to JSON-encoded string values,
// except IDEM_KEYS, which maps an idempotency key to the unix timestamp (i64)
// at which it was recorded.
// Per-session bookkeeping (SessionMeta as JSON), keyed by session key.
const SESSION_META: TableDefinition<&str, &str> = TableDefinition::new("session_meta");
// Live messages keyed "{session}:{seq:016}", plus archived copies keyed
// "archive:{session}:gen{g}:{seq:016}" (see append_message / new_generation).
const MESSAGES: TableDefinition<&str, &str> = TableDefinition::new("messages");
// PairingState as JSON, keyed "{channel}:{peer_id}".
const PAIRING: TableDefinition<&str, &str> = TableDefinition::new("pairing");
// Free-form string key/value storage.
const KV: TableDefinition<&str, &str> = TableDefinition::new("kv");
// Alias key -> canonical session key.
const SESSION_ALIASES: TableDefinition<&str, &str> = TableDefinition::new("session_aliases");
// QueuedTask as JSON, keyed by task id.
const TASK_QUEUE: TableDefinition<&str, &str> = TableDefinition::new("task_queue");
// ExternalJob as JSON, keyed by job id.
const EXTERNAL_JOBS: TableDefinition<&str, &str> = TableDefinition::new("external_jobs");
// Idempotency key -> unix timestamp when it was marked delivered.
const IDEM_KEYS: TableDefinition<&str, i64> = TableDefinition::new("idem_keys");
/// Embedded persistent store wrapping a single `redb::Database` file.
/// All access goes through short-lived read/write transactions.
pub struct RedbStore {
db: Database,
}
// Manual Debug impl printing an opaque placeholder — presumably because
// redb::Database does not implement Debug, which would block a derive
// (TODO confirm against the redb version in use).
impl std::fmt::Debug for RedbStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RedbStore").finish_non_exhaustive()
}
}
impl RedbStore {
/// Open (or create) the redb database at `path`, sizing the page cache by
/// memory tier, and eagerly create every table so later read transactions
/// never observe a missing table on a fresh database.
pub fn open(path: &Path, tier: MemoryTier) -> Result<Self> {
    // Cache budget scales with the configured memory tier.
    let cache_bytes: usize = match tier {
        MemoryTier::Low => 16 * 1024 * 1024,
        MemoryTier::Standard => 32 * 1024 * 1024,
        MemoryTier::High => 64 * 1024 * 1024,
    };
    let db = Database::builder()
        .set_cache_size(cache_bytes)
        .create(path)
        .with_context(|| format!("open redb at {}", path.display()))?;
    let txn = db.begin_write().context("begin write (init tables)")?;
    {
        // Opening a table inside a write transaction creates it if absent.
        txn.open_table(SESSION_META).context("init SESSION_META")?;
        txn.open_table(MESSAGES).context("init MESSAGES")?;
        txn.open_table(PAIRING).context("init PAIRING")?;
        txn.open_table(KV).context("init KV")?;
        txn.open_table(SESSION_ALIASES).context("init SESSION_ALIASES")?;
        txn.open_table(TASK_QUEUE).context("init TASK_QUEUE")?;
        txn.open_table(EXTERNAL_JOBS).context("init EXTERNAL_JOBS")?;
        txn.open_table(IDEM_KEYS).context("init IDEM_KEYS")?;
    }
    txn.commit().context("commit init")?;
    debug!(path = %path.display(), cache_mb = cache_bytes / (1024 * 1024), "redb opened");
    Ok(Self { db })
}
pub fn get_session_meta(&self, session_key: &str) -> Result<Option<SessionMeta>> {
let read = self.db.begin_read()?;
let table = read.open_table(SESSION_META)?;
match table.get(session_key)? {
Some(guard) => {
let v: SessionMeta = serde_json::from_str(guard.value())?;
Ok(Some(v))
}
None => Ok(None),
}
}
/// Serialize `meta` as JSON and upsert it under `session_key`.
pub fn put_session_meta(&self, session_key: &str, meta: &SessionMeta) -> Result<()> {
    let payload = serde_json::to_string(meta)?;
    let txn = self.db.begin_write()?;
    {
        let mut table = txn.open_table(SESSION_META)?;
        table.insert(session_key, payload.as_str())?;
    }
    txn.commit()?;
    Ok(())
}
/// Remove a session's metadata and all of its live messages in one
/// transaction. Keys under the "archive:" prefix are deliberately left in
/// place, so archived generations survive deletion of the live session.
pub fn delete_session(&self, session_key: &str) -> Result<()> {
    let txn = self.db.begin_write()?;
    {
        txn.open_table(SESSION_META)?.remove(session_key)?;
        let mut msgs = txn.open_table(MESSAGES)?;
        let prefix = format!("{session_key}:");
        // Collect matching keys first: the range iterator borrows the table,
        // so removal has to happen in a second pass.
        let mut doomed: Vec<String> = Vec::new();
        for entry in msgs.range(prefix.as_str()..)? {
            // Mirror the original scan semantics: stop (silently) at the
            // first iteration error or the first key past the prefix.
            let Ok((k, _)) = entry else { break };
            if !k.value().starts_with(&prefix) {
                break;
            }
            doomed.push(k.value().to_owned());
        }
        for key in &doomed {
            msgs.remove(key.as_str())?;
        }
    }
    txn.commit()?;
    Ok(())
}
/// Return every session key that has stored metadata. Entries that fail to
/// read are skipped, matching the original scan behavior.
pub fn list_sessions(&self) -> Result<Vec<String>> {
    let txn = self.db.begin_read()?;
    let table = txn.open_table(SESSION_META)?;
    let mut keys = Vec::new();
    for entry in table.range::<&str>(..)? {
        if let Ok((k, _)) = entry {
            keys.push(k.value().to_owned());
        }
    }
    Ok(keys)
}
/// Clear a session's live messages and bump its generation counter,
/// returning the new generation number. Archived copies (keys prefixed
/// "archive:") are not touched, so prior generations remain readable.
///
/// NOTE(review): for a session with no existing meta this builds a default
/// at generation 1 and then increments, so a brand-new session comes out at
/// generation 2 — confirm that is intended.
///
/// NOTE(review): the meta is read in a separate read transaction before the
/// write transaction starts, so a concurrent writer could race this
/// read-modify-write; callers presumably serialize access — TODO confirm.
pub fn new_generation(&self, session_key: &str) -> Result<u32> {
let meta_opt = self.get_session_meta(session_key)?;
let mut meta = meta_opt.unwrap_or_else(|| SessionMeta {
session_key: session_key.to_owned(),
message_count: 0,
last_active: chrono::Utc::now().timestamp(),
created_at: chrono::Utc::now().timestamp(),
generation: 1,
});
// Reset the live counters for the new generation.
meta.generation += 1;
meta.message_count = 0;
meta.last_active = chrono::Utc::now().timestamp();
let write = self.db.begin_write()?;
{
let mut msgs = write.open_table(MESSAGES)?;
// Live message keys are "{session_key}:{seq}"; range-scan from the prefix
// and stop at the first key (or iteration error) past it.
let prefix = format!("{session_key}:");
let keys: Vec<String> = msgs
.range(prefix.as_str()..)?
.take_while(|r| {
r.as_ref()
.map(|(k, _)| k.value().starts_with(&prefix))
.unwrap_or(false)
})
.filter_map(|r| r.ok())
.map(|(k, _)| k.value().to_owned())
.collect();
// Two passes: the iterator borrows the table, so removal happens after.
for key in &keys {
msgs.remove(key.as_str())?;
}
let meta_json = serde_json::to_string(&meta)?;
let mut metas = write.open_table(SESSION_META)?;
metas.insert(session_key, meta_json.as_str())?;
}
write.commit()?;
Ok(meta.generation)
}
/// Append a message to a session, returning its sequence number.
///
/// Each message is written twice in one transaction: once under the live
/// key "{session_key}:{seq:016}" (zero-padded so lexicographic order equals
/// numeric order) and once under an archive key tagged with the current
/// generation, so generation resets do not lose history.
///
/// NOTE(review): like new_generation, the meta is read in a separate read
/// transaction before the write begins; two concurrent appends could both
/// observe the same seq — callers presumably serialize access per session,
/// TODO confirm.
pub fn append_message(&self, session_key: &str, message: &serde_json::Value) -> Result<u64> {
let meta_opt = self.get_session_meta(session_key)?;
let mut meta = meta_opt.unwrap_or_else(|| SessionMeta {
session_key: session_key.to_owned(),
message_count: 0,
last_active: chrono::Utc::now().timestamp(),
created_at: chrono::Utc::now().timestamp(),
generation: 1,
});
// Sequence number is the pre-increment message count.
let seq = meta.message_count;
meta.message_count += 1;
meta.last_active = chrono::Utc::now().timestamp();
let msg_key = format!("{session_key}:{seq:016}");
let generation = meta.generation;
let archive_key = format!("archive:{session_key}:gen{generation}:{seq:016}");
let msg_json = serde_json::to_string(message)?;
let write = self.db.begin_write()?;
{
let mut msgs = write.open_table(MESSAGES)?;
// Live copy and archive copy share the same JSON payload.
msgs.insert(msg_key.as_str(), msg_json.as_str())?;
msgs.insert(archive_key.as_str(), msg_json.as_str())?;
let meta_json = serde_json::to_string(&meta)?;
let mut metas = write.open_table(SESSION_META)?;
metas.insert(session_key, meta_json.as_str())?;
}
write.commit()?;
Ok(seq)
}
pub fn load_messages(&self, session_key: &str) -> Result<Vec<serde_json::Value>> {
let read = self.db.begin_read()?;
let table = read.open_table(MESSAGES)?;
let prefix = format!("{session_key}:");
let messages: Vec<(String, serde_json::Value)> = table
.range(prefix.as_str()..)?
.take_while(|r| {
r.as_ref()
.map(|(k, _)| k.value().starts_with(&prefix))
.unwrap_or(false)
})
.filter_map(|r| r.ok())
.filter_map(|(k, v)| {
let val: serde_json::Value = serde_json::from_str(v.value()).ok()?;
Some((k.value().to_owned(), val))
})
.collect();
if messages.is_empty() {
return Ok(vec![]);
}
let archive_prefix = format!("archive:{session_key}:");
let has_archive = table
.range(archive_prefix.as_str()..)?
.next()
.is_some_and(|r| {
r.as_ref()
.map(|(k, _)| k.value().starts_with(&archive_prefix))
.unwrap_or(false)
});
if !has_archive {
drop(table);
drop(read);
if let Ok(write) = self.db.begin_write() {
if let Ok(mut msgs_table) = write.open_table(MESSAGES) {
for (key, val) in &messages {
let suffix = key.strip_prefix(&format!("{session_key}:")).unwrap_or("0");
let archive_key = format!("archive:{session_key}:gen1:{suffix}");
let json_str = serde_json::to_string(val).unwrap_or_default();
if let Err(e) = msgs_table.insert(archive_key.as_str(), json_str.as_str()) {
tracing::error!(error = %e, key = %archive_key, "failed to insert archive entry");
}
}
}
if let Err(e) = write.commit() {
tracing::error!(error = %e, "failed to commit archive backfill transaction");
}
debug!("backfilled {} archive entries for session {session_key}", messages.len());
}
}
Ok(messages.into_iter().map(|(_, v)| v).collect())
}
pub fn get_pairing(&self, channel: &str, peer_id: &str) -> Result<Option<PairingState>> {
let key = format!("{channel}:{peer_id}");
let read = self.db.begin_read()?;
let table = read.open_table(PAIRING)?;
match table.get(key.as_str())? {
Some(g) => Ok(Some(serde_json::from_str(g.value())?)),
None => Ok(None),
}
}
/// Upsert the pairing state stored under "{channel}:{peer_id}".
pub fn put_pairing(&self, channel: &str, peer_id: &str, state: &PairingState) -> Result<()> {
    let key = format!("{channel}:{peer_id}");
    let payload = serde_json::to_string(state)?;
    let txn = self.db.begin_write()?;
    {
        let mut table = txn.open_table(PAIRING)?;
        table.insert(key.as_str(), payload.as_str())?;
    }
    txn.commit()?;
    Ok(())
}
/// List the peer ids on `channel` whose stored state is `Approved`.
///
/// Keys are "{channel}:{peer_id}", so this now uses a bounded prefix range
/// scan — consistent with the other prefix lookups in this store — instead
/// of walking the entire pairing table. Entries that fail to deserialize
/// are skipped, and iteration errors propagate, exactly as before.
pub fn list_pairings(&self, channel: &str) -> Result<Vec<String>> {
    let prefix = format!("{channel}:");
    let read = self.db.begin_read()?;
    let table = read.open_table(PAIRING)?;
    let mut peers = Vec::new();
    for entry in table.range(prefix.as_str()..)? {
        let (key, val) = entry?;
        let k = key.value();
        // Keys sharing the prefix are contiguous; stop at the first miss.
        if !k.starts_with(&prefix) {
            break;
        }
        if let Ok(PairingState::Approved) = serde_json::from_str::<PairingState>(val.value()) {
            peers.push(k[prefix.len()..].to_owned());
        }
    }
    Ok(peers)
}
/// Remove the pairing record for "{channel}:{peer_id}" (no-op if absent).
pub fn delete_pairing(&self, channel: &str, peer_id: &str) -> Result<()> {
    let key = format!("{channel}:{peer_id}");
    let txn = self.db.begin_write()?;
    {
        let mut table = txn.open_table(PAIRING)?;
        table.remove(key.as_str())?;
    }
    txn.commit()?;
    Ok(())
}
/// Read a value from the generic KV table; `None` if the key is absent.
pub fn kv_get(&self, key: &str) -> Result<Option<String>> {
    let txn = self.db.begin_read()?;
    let table = txn.open_table(KV)?;
    match table.get(key)? {
        Some(guard) => Ok(Some(guard.value().to_owned())),
        None => Ok(None),
    }
}
/// Upsert a value in the generic KV table.
pub fn kv_set(&self, key: &str, value: &str) -> Result<()> {
    let txn = self.db.begin_write()?;
    {
        let mut table = txn.open_table(KV)?;
        table.insert(key, value)?;
    }
    txn.commit()?;
    Ok(())
}
/// Remove a key from the generic KV table (no-op if absent).
pub fn kv_delete(&self, key: &str) -> Result<()> {
    let txn = self.db.begin_write()?;
    {
        let mut table = txn.open_table(KV)?;
        table.remove(key)?;
    }
    txn.commit()?;
    Ok(())
}
/// Resolve an alias key to its canonical session key, if one is mapped.
pub fn resolve_session_alias(&self, alias_key: &str) -> Result<Option<String>> {
    let txn = self.db.begin_read()?;
    let table = txn.open_table(SESSION_ALIASES)?;
    let canonical = table.get(alias_key)?.map(|g| g.value().to_owned());
    Ok(canonical)
}
/// Map `alias_key` to `canonical_key` (upsert).
pub fn put_session_alias(&self, alias_key: &str, canonical_key: &str) -> Result<()> {
    let txn = self.db.begin_write()?;
    {
        let mut table = txn.open_table(SESSION_ALIASES)?;
        table.insert(alias_key, canonical_key)?;
    }
    txn.commit()?;
    Ok(())
}
/// Store a batch of alias → canonical mappings in a single transaction.
/// An empty batch is a no-op that opens no transaction.
pub fn put_session_aliases(&self, aliases: &[(&str, &str)]) -> Result<()> {
    if aliases.is_empty() {
        return Ok(());
    }
    let txn = self.db.begin_write()?;
    {
        let mut table = txn.open_table(SESSION_ALIASES)?;
        for &(alias, canonical) in aliases {
            table.insert(alias, canonical)?;
        }
    }
    txn.commit()?;
    Ok(())
}
/// Load the entire alias table into a map of alias → canonical session key.
pub fn load_all_aliases(&self) -> Result<std::collections::HashMap<String, String>> {
    let txn = self.db.begin_read()?;
    let table = txn.open_table(SESSION_ALIASES)?;
    let mut out = std::collections::HashMap::new();
    for entry in table.iter()? {
        let (alias, canonical) = entry?;
        out.insert(alias.value().to_owned(), canonical.value().to_owned());
    }
    Ok(out)
}
/// Persist a queued task as JSON, keyed by its id (upsert).
pub fn enqueue_task(&self, task: &crate::gateway::task_queue::QueuedTask) -> Result<()> {
    let payload = serde_json::to_string(task)?;
    let txn = self.db.begin_write()?;
    {
        let mut table = txn.open_table(TASK_QUEUE)?;
        table.insert(task.id.as_str(), payload.as_str())?;
    }
    txn.commit()?;
    Ok(())
}
/// Claim the next `Pending` task — lowest `(priority, created_at)` wins —
/// marking it `Running` in the same transaction. Returns `None` when no
/// pending task exists.
pub fn dequeue_task(&self) -> Result<Option<crate::gateway::task_queue::QueuedTask>> {
    use crate::gateway::task_queue::TaskStatus;
    let txn = self.db.begin_write()?;
    let claimed = {
        let mut table = txn.open_table(TASK_QUEUE)?;
        // Single pass to find the best pending candidate.
        let mut candidate: Option<crate::gateway::task_queue::QueuedTask> = None;
        for entry in table.iter()? {
            let (_key, value) = entry?;
            let task: crate::gateway::task_queue::QueuedTask =
                serde_json::from_str(value.value())?;
            if task.status != TaskStatus::Pending {
                continue;
            }
            let wins = candidate
                .as_ref()
                .map_or(true, |c| (task.priority, task.created_at) < (c.priority, c.created_at));
            if wins {
                candidate = Some(task);
            }
        }
        match candidate {
            Some(mut task) => {
                // Claim it: persist the Running status before returning.
                task.status = TaskStatus::Running;
                task.updated_at = chrono::Utc::now().timestamp();
                let payload = serde_json::to_string(&task)?;
                table.insert(task.id.as_str(), payload.as_str())?;
                Some(task)
            }
            None => None,
        }
    };
    txn.commit()?;
    Ok(claimed)
}
/// Flip every `Running` task back to `Pending` and return how many were
/// revived. Useful when previously-claimed tasks need to be re-dispatched
/// (e.g. after an unclean shutdown — presumed, confirm with callers).
pub fn requeue_running_tasks(&self) -> Result<usize> {
    use crate::gateway::task_queue::TaskStatus;
    let txn = self.db.begin_write()?;
    let revived = {
        let mut table = txn.open_table(TASK_QUEUE)?;
        // Two passes: iteration borrows the table, so collect first.
        let mut stuck = Vec::new();
        for entry in table.iter()? {
            let (_key, value) = entry?;
            let task: crate::gateway::task_queue::QueuedTask =
                serde_json::from_str(value.value())?;
            if task.status == TaskStatus::Running {
                stuck.push(task);
            }
        }
        let revived = stuck.len();
        for mut task in stuck {
            task.status = TaskStatus::Pending;
            task.updated_at = chrono::Utc::now().timestamp();
            let payload = serde_json::to_string(&task)?;
            table.insert(task.id.as_str(), payload.as_str())?;
        }
        revived
    };
    txn.commit()?;
    Ok(revived)
}
pub fn update_task_turn(&self, task_id: &str, turn: u32) -> Result<()> {
let write = self.db.begin_write()?;
{
let mut table = write.open_table(TASK_QUEUE)?;
let guard = table
.get(task_id)?
.ok_or_else(|| anyhow::anyhow!("task not found: {task_id}"))?;
let mut task: crate::gateway::task_queue::QueuedTask =
serde_json::from_str(guard.value())?;
drop(guard);
task.turns = turn;
task.updated_at = chrono::Utc::now().timestamp();
let json = serde_json::to_string(&task)?;
table.insert(task_id, json.as_str())?;
}
write.commit()?;
Ok(())
}
pub fn update_task_last_reply(&self, task_id: &str, text: &str) -> Result<()> {
let write = self.db.begin_write()?;
{
let mut table = write.open_table(TASK_QUEUE)?;
let guard = table
.get(task_id)?
.ok_or_else(|| anyhow::anyhow!("task not found: {task_id}"))?;
let mut task: crate::gateway::task_queue::QueuedTask =
serde_json::from_str(guard.value())?;
drop(guard);
task.last_reply = Some(text.to_owned());
task.updated_at = chrono::Utc::now().timestamp();
let json = serde_json::to_string(&task)?;
table.insert(task_id, json.as_str())?;
}
write.commit()?;
Ok(())
}
pub fn mark_task_notified(&self, task_id: &str) -> Result<()> {
let write = self.db.begin_write()?;
{
let mut table = write.open_table(TASK_QUEUE)?;
let guard = table
.get(task_id)?
.ok_or_else(|| anyhow::anyhow!("task not found: {task_id}"))?;
let mut task: crate::gateway::task_queue::QueuedTask =
serde_json::from_str(guard.value())?;
drop(guard);
task.notified = true;
task.updated_at = chrono::Utc::now().timestamp();
let json = serde_json::to_string(&task)?;
table.insert(task_id, json.as_str())?;
}
write.commit()?;
Ok(())
}
pub fn update_task_status(
&self,
task_id: &str,
status: crate::gateway::task_queue::TaskStatus,
) -> Result<()> {
let write = self.db.begin_write()?;
{
let mut table = write.open_table(TASK_QUEUE)?;
let guard = table
.get(task_id)?
.ok_or_else(|| anyhow::anyhow!("task not found: {task_id}"))?;
let mut task: crate::gateway::task_queue::QueuedTask =
serde_json::from_str(guard.value())?;
drop(guard);
task.status = status;
task.updated_at = chrono::Utc::now().timestamp();
let json = serde_json::to_string(&task)?;
table.insert(task_id, json.as_str())?;
}
write.commit()?;
Ok(())
}
pub fn fail_task(
&self,
task_id: &str,
max_retries: u32,
) -> Result<crate::gateway::task_queue::TaskStatus> {
use crate::gateway::task_queue::TaskStatus;
let write = self.db.begin_write()?;
let status = {
let mut table = write.open_table(TASK_QUEUE)?;
let guard = table
.get(task_id)?
.ok_or_else(|| anyhow::anyhow!("task not found: {task_id}"))?;
let mut task: crate::gateway::task_queue::QueuedTask =
serde_json::from_str(guard.value())?;
drop(guard);
task.retries += 1;
task.updated_at = chrono::Utc::now().timestamp();
if task.retries >= max_retries {
task.status = TaskStatus::Dead;
} else {
task.status = TaskStatus::Failed;
}
let new_status = task.status;
let json = serde_json::to_string(&task)?;
table.insert(task_id, json.as_str())?;
new_status
};
write.commit()?;
Ok(status)
}
pub fn get_task(
&self,
task_id: &str,
) -> Result<Option<crate::gateway::task_queue::QueuedTask>> {
let read = self.db.begin_read()?;
let table = read.open_table(TASK_QUEUE)?;
match table.get(task_id)? {
Some(guard) => {
let task = serde_json::from_str(guard.value())?;
Ok(Some(task))
}
None => Ok(None),
}
}
/// List all queued tasks, optionally filtered to a single status.
pub fn list_tasks(
    &self,
    status: Option<crate::gateway::task_queue::TaskStatus>,
) -> Result<Vec<crate::gateway::task_queue::QueuedTask>> {
    let txn = self.db.begin_read()?;
    let table = txn.open_table(TASK_QUEUE)?;
    let mut out = Vec::new();
    for entry in table.iter()? {
        let (_key, value) = entry?;
        let task: crate::gateway::task_queue::QueuedTask =
            serde_json::from_str(value.value())?;
        let keep = match &status {
            Some(wanted) => task.status == *wanted,
            None => true,
        };
        if keep {
            out.push(task);
        }
    }
    Ok(out)
}
/// Delete every task whose `is_expired()` reports true, returning the
/// number of tasks removed.
pub fn cleanup_expired_tasks(&self) -> Result<usize> {
    let txn = self.db.begin_write()?;
    let removed = {
        let mut table = txn.open_table(TASK_QUEUE)?;
        // Collect ids first; the iterator borrows the table.
        let mut expired = Vec::new();
        for entry in table.iter()? {
            let (_key, value) = entry?;
            let task: crate::gateway::task_queue::QueuedTask =
                serde_json::from_str(value.value())?;
            if task.is_expired() {
                expired.push(task.id);
            }
        }
        let removed = expired.len();
        for id in &expired {
            table.remove(id.as_str())?;
        }
        removed
    };
    txn.commit()?;
    Ok(removed)
}
/// True if a `Pending` task with the same session key and content hash is
/// already queued.
pub fn has_duplicate(&self, session_key: &str, content_hash: &str) -> Result<bool> {
    use crate::gateway::task_queue::TaskStatus;
    let txn = self.db.begin_read()?;
    let table = txn.open_table(TASK_QUEUE)?;
    for entry in table.iter()? {
        let (_key, value) = entry?;
        let task: crate::gateway::task_queue::QueuedTask =
            serde_json::from_str(value.value())?;
        let same = task.status == TaskStatus::Pending
            && task.session_key == session_key
            && task.content_hash == content_hash;
        if same {
            return Ok(true);
        }
    }
    Ok(false)
}
/// Append `message` onto the first `Pending` task for `session_key`, if
/// one exists. Returns `true` when a merge happened, `false` when no
/// pending task was found.
///
/// Fix: the previous version found the task, discarded the deserialized
/// value, then re-fetched it by id inside the same transaction (with an
/// unreachable "task disappeared" error branch). We now keep the task from
/// the scan and mutate it directly.
pub fn merge_into_pending(
    &self,
    session_key: &str,
    message: &crate::gateway::task_queue::QueuedMessage,
) -> Result<bool> {
    use crate::gateway::task_queue::TaskStatus;
    let write = self.db.begin_write()?;
    let merged = {
        let mut table = write.open_table(TASK_QUEUE)?;
        // First pending task for this session; the deserialized value is
        // owned, so it survives the end of the iterator borrow.
        let mut target: Option<crate::gateway::task_queue::QueuedTask> = None;
        for entry in table.iter()? {
            let (_k, v) = entry?;
            let task: crate::gateway::task_queue::QueuedTask =
                serde_json::from_str(v.value())?;
            if task.session_key == session_key && task.status == TaskStatus::Pending {
                target = Some(task);
                break;
            }
        }
        if let Some(mut task) = target {
            task.messages.push(message.clone());
            task.updated_at = chrono::Utc::now().timestamp();
            let json = serde_json::to_string(&task)?;
            table.insert(task.id.as_str(), json.as_str())?;
            true
        } else {
            false
        }
    };
    write.commit()?;
    Ok(merged)
}
/// Whether an idempotency key has already been recorded as delivered.
pub fn is_idem_delivered(&self, key: &str) -> Result<bool> {
    let txn = self.db.begin_read()?;
    let table = txn.open_table(IDEM_KEYS)?;
    let found = table.get(key)?.is_some();
    Ok(found)
}
/// Record an idempotency key, stamped with the current unix timestamp so
/// `cleanup_idem_keys` can expire it later.
pub fn mark_idem_delivered(&self, key: &str) -> Result<()> {
    let stamp = chrono::Utc::now().timestamp();
    let txn = self.db.begin_write()?;
    {
        let mut table = txn.open_table(IDEM_KEYS)?;
        table.insert(key, stamp)?;
    }
    txn.commit()?;
    Ok(())
}
/// Drop idempotency keys recorded more than `retention_secs` ago,
/// returning the number removed.
pub fn cleanup_idem_keys(&self, retention_secs: i64) -> Result<usize> {
    let cutoff = chrono::Utc::now().timestamp() - retention_secs;
    let txn = self.db.begin_write()?;
    let removed = {
        let mut table = txn.open_table(IDEM_KEYS)?;
        // Collect stale keys first; the iterator borrows the table.
        let mut stale = Vec::new();
        for entry in table.iter()? {
            let (key, stamp) = entry?;
            if stamp.value() < cutoff {
                stale.push(key.value().to_owned());
            }
        }
        let removed = stale.len();
        for key in &stale {
            table.remove(key.as_str())?;
        }
        removed
    };
    txn.commit()?;
    Ok(removed)
}
/// Persist an external job as JSON, keyed by its id (upsert).
pub fn enqueue_external_job(
    &self,
    job: &crate::gateway::external_jobs::ExternalJob,
) -> Result<()> {
    let payload = serde_json::to_string(job)?;
    let txn = self.db.begin_write()?;
    {
        let mut table = txn.open_table(EXTERNAL_JOBS)?;
        table.insert(job.id.as_str(), payload.as_str())?;
    }
    txn.commit()?;
    Ok(())
}
/// Overwrite an existing external job; errors if the id is unknown
/// (unlike `enqueue_external_job`, this never creates a record).
pub fn update_external_job(
    &self,
    job: &crate::gateway::external_jobs::ExternalJob,
) -> Result<()> {
    let payload = serde_json::to_string(job)?;
    let txn = self.db.begin_write()?;
    {
        let mut table = txn.open_table(EXTERNAL_JOBS)?;
        let exists = table.get(job.id.as_str())?.is_some();
        if !exists {
            anyhow::bail!("external job not found: {}", job.id);
        }
        table.insert(job.id.as_str(), payload.as_str())?;
    }
    txn.commit()?;
    Ok(())
}
pub fn get_external_job(
&self,
job_id: &str,
) -> Result<Option<crate::gateway::external_jobs::ExternalJob>> {
let read = self.db.begin_read()?;
let table = read.open_table(EXTERNAL_JOBS)?;
match table.get(job_id)? {
Some(guard) => {
let job = serde_json::from_str(guard.value())?;
Ok(Some(job))
}
None => Ok(None),
}
}
/// External jobs whose `next_poll_at` has arrived and that still need
/// work: status is Pending/Polling, or the job reports an undelivered
/// result — in either case only while delivery attempts remain under the
/// cap.
pub fn due_external_jobs(
    &self,
    now: i64,
) -> Result<Vec<crate::gateway::external_jobs::ExternalJob>> {
    use crate::gateway::external_jobs::{ExternalJob, ExternalJobStatus};
    let txn = self.db.begin_read()?;
    let table = txn.open_table(EXTERNAL_JOBS)?;
    let mut due = Vec::new();
    for entry in table.iter()? {
        let (_key, value) = entry?;
        let job: ExternalJob = serde_json::from_str(value.value())?;
        // Not yet time to poll this job.
        if job.next_poll_at > now {
            continue;
        }
        let active = matches!(
            job.status,
            ExternalJobStatus::Pending | ExternalJobStatus::Polling
        );
        let wants_work = active || job.needs_delivery();
        let under_cap = job.delivery_attempts < ExternalJob::MAX_DELIVERY_ATTEMPTS;
        if wants_work && under_cap {
            due.push(job);
        }
    }
    Ok(due)
}
/// Remove an external job by id (no-op if absent).
pub fn delete_external_job(&self, job_id: &str) -> Result<()> {
    let txn = self.db.begin_write()?;
    {
        let mut table = txn.open_table(EXTERNAL_JOBS)?;
        table.remove(job_id)?;
    }
    txn.commit()?;
    Ok(())
}
/// Remove terminal external jobs (Done / Failed / TimedOut) whose delivery
/// is settled and that were submitted before the retention cutoff.
/// Returns the number of jobs removed.
pub fn cleanup_finished_external_jobs(&self, retention_secs: i64) -> Result<usize> {
    use crate::gateway::external_jobs::{ExternalJob, ExternalJobStatus};
    let cutoff = chrono::Utc::now().timestamp() - retention_secs;
    let txn = self.db.begin_write()?;
    let removed = {
        let mut table = txn.open_table(EXTERNAL_JOBS)?;
        let mut victims = Vec::new();
        for entry in table.iter()? {
            let (_key, value) = entry?;
            let job: ExternalJob = serde_json::from_str(value.value())?;
            let terminal = matches!(
                job.status,
                ExternalJobStatus::Done | ExternalJobStatus::Failed | ExternalJobStatus::TimedOut
            );
            // Delivery counts as settled once the result reached its target
            // or we gave up after the maximum number of attempts.
            let settled = job.delivered_at.is_some()
                || job.delivery_attempts >= ExternalJob::MAX_DELIVERY_ATTEMPTS;
            if terminal && settled && job.submitted_at < cutoff {
                victims.push(job.id);
            }
        }
        let removed = victims.len();
        for id in &victims {
            table.remove(id.as_str())?;
        }
        removed
    };
    txn.commit()?;
    Ok(removed)
}
}
/// Per-session bookkeeping persisted as JSON in the `session_meta` table.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SessionMeta {
    /// Canonical session key (e.g. "agent:main:telegram:direct:u1").
    pub session_key: String,
    /// Messages appended in the current generation; reset by `new_generation`.
    pub message_count: u64,
    /// Unix timestamp of the most recent append or generation reset.
    pub last_active: i64,
    /// Unix timestamp when the session meta was first created.
    pub created_at: i64,
    /// Conversation generation, bumped by `new_generation`; metas written
    /// before this field existed deserialize as generation 1.
    #[serde(default = "default_generation")]
    pub generation: u32,
}
// Serde `default` for `SessionMeta::generation`: legacy metas that predate
// the field deserialize as generation 1.
fn default_generation() -> u32 {
1
}
/// Pairing lifecycle for a (channel, peer) pair, stored as JSON in the
/// `pairing` table under the key "{channel}:{peer_id}".
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum PairingState {
/// Fully paired; `list_pairings` reports only peers in this state.
Approved,
/// Awaiting confirmation with a one-time `code`; `expires_at` is a unix
/// timestamp (expiry enforcement is not visible in this file — presumably
/// handled by the caller, TODO confirm).
Pending { code: String, expires_at: i64 },
}
#[cfg(test)]
mod tests {
use super::*;
// Open a throwaway store in a temp directory; the TempDir must be kept
// alive by the caller or the backing file is deleted.
fn open_tmp() -> (RedbStore, tempfile::TempDir) {
let dir = tempfile::tempdir().expect("tempdir");
let store =
RedbStore::open(&dir.path().join("test.redb"), MemoryTier::Low).expect("open redb");
(store, dir)
}
// SessionMeta survives a put/get round trip through JSON.
#[test]
fn session_meta_round_trip() {
let (store, _dir) = open_tmp();
let meta = SessionMeta {
session_key: "agent:main:telegram:direct:u1".to_owned(),
message_count: 5,
last_active: 1_700_000_000,
created_at: 1_699_000_000,
generation: 1,
};
store
.put_session_meta(&meta.session_key, &meta)
.expect("put");
let got = store.get_session_meta(&meta.session_key).expect("get");
assert!(got.is_some());
assert_eq!(got.unwrap().message_count, 5);
}
// Appended messages come back in insertion (sequence) order.
#[test]
fn append_and_load_messages() {
let (store, _dir) = open_tmp();
let sk = "agent:main:cli:direct:user";
let msg1 = serde_json::json!({"role": "user", "content": "hello"});
let msg2 = serde_json::json!({"role": "assistant", "content": "hi there"});
store.append_message(sk, &msg1).expect("append 1");
store.append_message(sk, &msg2).expect("append 2");
let msgs = store.load_messages(sk).expect("load");
assert_eq!(msgs.len(), 2);
assert_eq!(msgs[0]["role"], "user");
assert_eq!(msgs[1]["role"], "assistant");
}
// delete_session wipes both the live messages and the meta record.
#[test]
fn delete_session_removes_messages() {
let (store, _dir) = open_tmp();
let sk = "agent:main:cli:direct:del_user";
store
.append_message(sk, &serde_json::json!({"role": "user", "content": "x"}))
.expect("append");
store.delete_session(sk).expect("delete");
let msgs = store.load_messages(sk).expect("load after delete");
assert!(msgs.is_empty());
assert!(store.get_session_meta(sk).expect("meta").is_none());
}
// Basic KV round trip plus deletion.
#[test]
fn kv_set_get_delete() {
let (store, _dir) = open_tmp();
store.kv_set("my_key", "my_value").expect("set");
assert_eq!(
store.kv_get("my_key").expect("get").as_deref(),
Some("my_value")
);
store.kv_delete("my_key").expect("delete");
assert!(store.kv_get("my_key").expect("get after delete").is_none());
}
// Appending a message creates session meta, which list_sessions reports.
#[test]
fn list_sessions() {
let (store, _dir) = open_tmp();
let keys = ["sess:a", "sess:b", "sess:c"];
for k in &keys {
store
.append_message(k, &serde_json::json!({}))
.expect("append");
}
let listed = store.list_sessions().expect("list");
for k in &keys {
assert!(listed.contains(&k.to_string()), "missing {k}");
}
}
}