Skip to main content

kaizen/store/
outbox_redb.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! redb sync outbox: append + drain queue, one writer, many readers.
3
4use anyhow::Result;
5use redb::{Database, ReadableDatabase, ReadableTable, ReadableTableMetadata, TableDefinition};
6use std::path::Path;
7
8const ROWS: TableDefinition<u64, &[u8]> = TableDefinition::new("rows");
9const META: TableDefinition<&str, u64> = TableDefinition::new("meta");
10
11#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
12struct Row {
13    owner_id: String,
14    kind: String,
15    payload: String,
16}
17
18pub struct Outbox {
19    db: Database,
20}
21
22impl Outbox {
23    pub fn open(root: &Path) -> Result<Self> {
24        let dir = root.join("hot");
25        std::fs::create_dir_all(&dir)?;
26        let db = Database::create(dir.join("outbox.redb"))?;
27        let tx = db.begin_write()?;
28        tx.open_table(ROWS)?;
29        tx.open_table(META)?;
30        tx.commit()?;
31        Ok(Self { db })
32    }
33
34    pub fn append(&self, owner_id: &str, kind: &str, payload: &str) -> Result<u64> {
35        let tx = self.db.begin_write()?;
36        let id = next_id(&tx)?;
37        let row = serde_json::to_vec(&Row {
38            owner_id: owner_id.into(),
39            kind: kind.into(),
40            payload: payload.into(),
41        })?;
42        tx.open_table(ROWS)?.insert(id, row.as_slice())?;
43        tx.commit()?;
44        Ok(id)
45    }
46
47    pub fn list_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
48        let tx = self.db.begin_read()?;
49        let table = tx.open_table(ROWS)?;
50        let mut out = Vec::new();
51        for row in table.iter()? {
52            let (id, bytes) = row?;
53            let q: Row = serde_json::from_slice(bytes.value())?;
54            out.push((id.value() as i64, q.kind, q.payload));
55            if out.len() >= limit {
56                break;
57            }
58        }
59        Ok(out)
60    }
61
62    pub fn delete_ids(&self, ids: &[i64]) -> Result<()> {
63        let tx = self.db.begin_write()?;
64        {
65            let mut table = tx.open_table(ROWS)?;
66            for id in ids {
67                table.remove(*id as u64)?;
68            }
69        }
70        tx.commit()?;
71        Ok(())
72    }
73
74    pub fn replace(&self, owner_id: &str, kind: &str, payloads: &[String]) -> Result<()> {
75        let tx = self.db.begin_write()?;
76        let mut delete = Vec::new();
77        {
78            let table = tx.open_table(ROWS)?;
79            for row in table.iter()? {
80                let (id, bytes) = row?;
81                let q: Row = serde_json::from_slice(bytes.value())?;
82                if q.owner_id == owner_id && q.kind == kind {
83                    delete.push(id.value());
84                }
85            }
86        }
87        let mut next = next_id_value(&tx)?;
88        {
89            let mut table = tx.open_table(ROWS)?;
90            for id in delete {
91                table.remove(id)?;
92            }
93            for payload in payloads {
94                let row = serde_json::to_vec(&Row {
95                    owner_id: owner_id.into(),
96                    kind: kind.into(),
97                    payload: payload.clone(),
98                })?;
99                table.insert(next, row.as_slice())?;
100                next += 1;
101            }
102        }
103        tx.open_table(META)?.insert("next_id", next)?;
104        tx.commit()?;
105        Ok(())
106    }
107
108    pub fn pending_count(&self) -> Result<u64> {
109        Ok(self.db.begin_read()?.open_table(ROWS)?.len()?)
110    }
111}
112
113fn next_id(tx: &redb::WriteTransaction) -> Result<u64> {
114    let id = next_id_value(tx)?;
115    tx.open_table(META)?.insert("next_id", id + 1)?;
116    Ok(id)
117}
118
119fn next_id_value(tx: &redb::WriteTransaction) -> Result<u64> {
120    Ok(tx
121        .open_table(META)?
122        .get("next_id")?
123        .map(|v| v.value())
124        .unwrap_or(1))
125}