balius_runtime/kv/
redb.rs1use std::{path::Path, sync::Arc};
3
4use crate::wit::balius::app::kv as wit;
5use redb::{Database, Durability, ReadableTable, TableDefinition};
6use tracing::warn;
7use wit::{KvError, Payload};
8
9use super::KvProvider;
10use crate::Error;
11
12#[derive(Clone)]
13pub struct RedbKv {
14 db: Arc<Database>,
15}
16
17impl RedbKv {
18 pub const DEF: TableDefinition<'static, String, Vec<u8>> = TableDefinition::new("kv");
19 pub fn try_new(path: impl AsRef<Path>, cache_size: Option<usize>) -> Result<Self, Error> {
20 let db = Database::builder()
21 .set_repair_callback(|x| warn!(progress = x.progress() * 100f64, "db is repairing"))
22 .set_cache_size(1024 * 1024 * cache_size.unwrap_or(10_000))
23 .create(path)
24 .map_err(|err| Error::KvError(err.to_string()))?;
25
26 let mut wx = db
27 .begin_write()
28 .map_err(|err| Error::KvError(err.to_string()))?;
29 wx.set_durability(Durability::Immediate);
30 wx.open_table(Self::DEF)
31 .map_err(|err| Error::KvError(err.to_string()))?;
32 wx.commit().map_err(|err| Error::KvError(err.to_string()))?;
33
34 Ok(Self { db: Arc::new(db) })
35 }
36
37 pub fn key_for_worker(worker_id: &str, key: &str) -> String {
38 format!("{worker_id}-{key}")
39 }
40
41 pub fn into_ephemeral(&mut self) -> Result<Self, Error> {
42 let new_db = redb::Database::builder()
43 .create_with_backend(redb::backends::InMemoryBackend::new())
44 .map_err(|e| Error::KvError(e.to_string()))?;
45
46 let rx = self.db.begin_read()?;
47 let wx = new_db.begin_write()?;
48
49 {
50 if let Ok(source) = rx.open_table(Self::DEF) {
51 let mut target = wx
52 .open_table(Self::DEF)
53 .map_err(|e| Error::KvError(e.to_string()))?;
54
55 for entry in source.iter().map_err(|e| Error::KvError(e.to_string()))? {
56 let (k, v) = entry.map_err(|e| Error::KvError(e.to_string()))?;
57 target
58 .insert(k.value(), v.value())
59 .map_err(|e| Error::KvError(e.to_string()))?;
60 }
61 };
62 }
63
64 wx.commit().map_err(|e| Error::KvError(e.to_string()))?;
65
66 let new = Self {
67 db: Arc::new(new_db),
68 };
69
70 Ok(new)
71 }
72}
73
74#[async_trait::async_trait]
75impl KvProvider for RedbKv {
76 async fn get_value(&mut self, worker_id: &str, key: String) -> Result<Payload, KvError> {
77 let rx = self
78 .db
79 .begin_read()
80 .map_err(|err| KvError::Internal(err.to_string()))?;
81
82 let table = rx
83 .open_table(Self::DEF)
84 .map_err(|err| KvError::Internal(err.to_string()))?;
85 match table
86 .get(Self::key_for_worker(worker_id, &key))
87 .map_err(|err| KvError::Internal(err.to_string()))?
88 {
89 Some(value) => Ok(value.value()),
90 None => Err(KvError::NotFound(key)),
91 }
92 }
93
94 async fn set_value(
95 &mut self,
96 worker_id: &str,
97 key: String,
98 value: Payload,
99 ) -> Result<(), KvError> {
100 let wx = self
101 .db
102 .begin_write()
103 .map_err(|err| KvError::Internal(err.to_string()))?;
104
105 {
106 let mut table = wx
107 .open_table(Self::DEF)
108 .map_err(|err| KvError::Internal(err.to_string()))?;
109
110 table
111 .insert(Self::key_for_worker(worker_id, &key), value)
112 .map_err(|err| KvError::Internal(err.to_string()))?;
113 }
114
115 wx.commit()
116 .map_err(|err| KvError::Internal(err.to_string()))?;
117
118 Ok(())
119 }
120
121 async fn list_values(
122 &mut self,
123 worker_id: &str,
124 prefix: String,
125 ) -> Result<Vec<String>, KvError> {
126 let rx = self
127 .db
128 .begin_read()
129 .map_err(|err| KvError::Internal(err.to_string()))?;
130
131 let table = rx
132 .open_table(Self::DEF)
133 .map_err(|err| KvError::Internal(err.to_string()))?;
134
135 let mut result = vec![];
136 let range = table
137 .range(Self::key_for_worker(worker_id, &prefix)..)
138 .map_err(|err| KvError::Internal(err.to_string()))?;
139
140 for item in range {
141 let (k, _) = item.unwrap();
142 if k.value()
143 .starts_with(&Self::key_for_worker(worker_id, &prefix))
144 {
145 result.push(k.value());
146 } else {
147 break;
148 }
149 }
150 Ok(result)
151 }
152}