kaizen/store/
outbox_redb.rs1use anyhow::Result;
5use redb::{Database, ReadableDatabase, ReadableTable, ReadableTableMetadata, TableDefinition};
6use std::path::Path;
7
8const ROWS: TableDefinition<u64, &[u8]> = TableDefinition::new("rows");
9const META: TableDefinition<&str, u64> = TableDefinition::new("meta");
10
11#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
12struct Row {
13 owner_id: String,
14 kind: String,
15 payload: String,
16}
17
18pub struct Outbox {
19 db: Database,
20}
21
22impl Outbox {
23 pub fn open(root: &Path) -> Result<Self> {
24 let dir = root.join("hot");
25 std::fs::create_dir_all(&dir)?;
26 let db = Database::create(dir.join("outbox.redb"))?;
27 let tx = db.begin_write()?;
28 tx.open_table(ROWS)?;
29 tx.open_table(META)?;
30 tx.commit()?;
31 Ok(Self { db })
32 }
33
34 pub fn append(&self, owner_id: &str, kind: &str, payload: &str) -> Result<u64> {
35 let tx = self.db.begin_write()?;
36 let id = next_id(&tx)?;
37 let row = serde_json::to_vec(&Row {
38 owner_id: owner_id.into(),
39 kind: kind.into(),
40 payload: payload.into(),
41 })?;
42 tx.open_table(ROWS)?.insert(id, row.as_slice())?;
43 tx.commit()?;
44 Ok(id)
45 }
46
47 pub fn list_pending(&self, limit: usize) -> Result<Vec<(i64, String, String)>> {
48 let tx = self.db.begin_read()?;
49 let table = tx.open_table(ROWS)?;
50 let mut out = Vec::new();
51 for row in table.iter()? {
52 let (id, bytes) = row?;
53 let q: Row = serde_json::from_slice(bytes.value())?;
54 out.push((id.value() as i64, q.kind, q.payload));
55 if out.len() >= limit {
56 break;
57 }
58 }
59 Ok(out)
60 }
61
62 pub fn delete_ids(&self, ids: &[i64]) -> Result<()> {
63 let tx = self.db.begin_write()?;
64 {
65 let mut table = tx.open_table(ROWS)?;
66 for id in ids {
67 table.remove(*id as u64)?;
68 }
69 }
70 tx.commit()?;
71 Ok(())
72 }
73
74 pub fn replace(&self, owner_id: &str, kind: &str, payloads: &[String]) -> Result<()> {
75 let tx = self.db.begin_write()?;
76 let mut delete = Vec::new();
77 {
78 let table = tx.open_table(ROWS)?;
79 for row in table.iter()? {
80 let (id, bytes) = row?;
81 let q: Row = serde_json::from_slice(bytes.value())?;
82 if q.owner_id == owner_id && q.kind == kind {
83 delete.push(id.value());
84 }
85 }
86 }
87 let mut next = next_id_value(&tx)?;
88 {
89 let mut table = tx.open_table(ROWS)?;
90 for id in delete {
91 table.remove(id)?;
92 }
93 for payload in payloads {
94 let row = serde_json::to_vec(&Row {
95 owner_id: owner_id.into(),
96 kind: kind.into(),
97 payload: payload.clone(),
98 })?;
99 table.insert(next, row.as_slice())?;
100 next += 1;
101 }
102 }
103 tx.open_table(META)?.insert("next_id", next)?;
104 tx.commit()?;
105 Ok(())
106 }
107
108 pub fn pending_count(&self) -> Result<u64> {
109 Ok(self.db.begin_read()?.open_table(ROWS)?.len()?)
110 }
111}
112
113fn next_id(tx: &redb::WriteTransaction) -> Result<u64> {
114 let id = next_id_value(tx)?;
115 tx.open_table(META)?.insert("next_id", id + 1)?;
116 Ok(id)
117}
118
119fn next_id_value(tx: &redb::WriteTransaction) -> Result<u64> {
120 Ok(tx
121 .open_table(META)?
122 .get("next_id")?
123 .map(|v| v.value())
124 .unwrap_or(1))
125}