1use std::path::Path;
37use std::sync::Arc;
38
39use redb::{Database, ReadableTable, TableDefinition};
40
41use crate::backend::StorageBackend;
42use crate::codec::{Codec, JsonCodec};
43use crate::error::StorageError;
44use crate::memory::{
45 append_log_storage, kv_storage, snapshot_storage, AppendLogStorage, AppendLogStorageOptions,
46 KvStorage, KvStorageOptions, SnapshotStorage, SnapshotStorageOptions,
47};
48
49use serde::{de::DeserializeOwned, Serialize};
50
51const TABLE: TableDefinition<'_, &str, &[u8]> = TableDefinition::new("graphrefly");
54
55fn map_err(e: impl std::error::Error + Send + Sync + 'static) -> StorageError {
57 StorageError::BackendError {
58 message: e.to_string(),
59 source: Some(Box::new(e)),
60 }
61}
62
63pub struct RedbBackend {
79 db: Database,
80 name: String,
81}
82
83impl std::fmt::Debug for RedbBackend {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 f.debug_struct("RedbBackend")
87 .field("name", &self.name)
88 .finish_non_exhaustive()
89 }
90}
91
92impl RedbBackend {
93 pub fn new(path: impl AsRef<Path>) -> Result<Self, StorageError> {
100 let path = path.as_ref();
101 let db = Database::create(path).map_err(map_err)?;
102 let name = format!("redb:{}", path.display());
103 Ok(Self { db, name })
104 }
105
106 #[must_use]
108 pub fn name_str(&self) -> &str {
109 &self.name
110 }
111}
112
113impl StorageBackend for RedbBackend {
114 fn name(&self) -> &str {
115 &self.name
116 }
117
118 fn read(&self, key: &str) -> Result<Option<Vec<u8>>, StorageError> {
119 let txn = self.db.begin_read().map_err(map_err)?;
120 let table = match txn.open_table(TABLE) {
121 Ok(t) => t,
122 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(None),
124 Err(e) => return Err(map_err(e)),
125 };
126 match table.get(key).map_err(map_err)? {
127 Some(guard) => Ok(Some(guard.value().to_vec())),
128 None => Ok(None),
129 }
130 }
131
132 fn write(&self, key: &str, bytes: &[u8]) -> Result<(), StorageError> {
133 let txn = self.db.begin_write().map_err(map_err)?;
134 {
135 let mut table = txn.open_table(TABLE).map_err(map_err)?;
136 table.insert(key, bytes).map_err(map_err)?;
137 }
138 txn.commit().map_err(map_err)?;
139 Ok(())
140 }
141
142 fn delete(&self, key: &str) -> Result<(), StorageError> {
143 let txn = self.db.begin_write().map_err(map_err)?;
144 {
145 let mut table = match txn.open_table(TABLE) {
146 Ok(t) => t,
147 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(()),
149 Err(e) => return Err(map_err(e)),
150 };
151 let _ = table.remove(key).map_err(map_err)?;
153 }
154 txn.commit().map_err(map_err)?;
155 Ok(())
156 }
157
158 fn list(&self, prefix: &str) -> Result<Vec<String>, StorageError> {
159 let txn = self.db.begin_read().map_err(map_err)?;
160 let table = match txn.open_table(TABLE) {
161 Ok(t) => t,
162 Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
163 Err(e) => return Err(map_err(e)),
164 };
165
166 let mut keys = Vec::new();
167 if prefix.is_empty() {
168 let iter = table.iter().map_err(map_err)?;
170 for entry in iter {
171 let entry = entry.map_err(map_err)?;
172 keys.push(entry.0.value().to_string());
173 }
174 } else {
175 let iter = table.range(prefix..).map_err(map_err)?;
178 for entry in iter {
179 let entry = entry.map_err(map_err)?;
180 let k = entry.0.value();
181 if !k.starts_with(prefix) {
182 break;
183 }
184 keys.push(k.to_string());
185 }
186 }
187 Ok(keys)
188 }
189
190 fn flush(&self) -> Result<(), StorageError> {
191 Ok(())
193 }
194}
195
196pub fn redb_backend(path: impl AsRef<Path>) -> Result<Arc<RedbBackend>, StorageError> {
208 Ok(Arc::new(RedbBackend::new(path)?))
209}
210
211pub fn redb_snapshot<T, C>(
223 path: impl AsRef<Path>,
224 opts: SnapshotStorageOptions<T, C>,
225) -> Result<SnapshotStorage<RedbBackend, T, C>, StorageError>
226where
227 T: Send + Sync + 'static,
228 C: Codec<T>,
229{
230 Ok(snapshot_storage(redb_backend(path)?, opts))
231}
232
233pub fn redb_snapshot_default<T>(
239 path: impl AsRef<Path>,
240) -> Result<SnapshotStorage<RedbBackend, T, JsonCodec>, StorageError>
241where
242 T: Serialize + DeserializeOwned + Send + Sync + 'static,
243{
244 redb_snapshot(path, SnapshotStorageOptions::default())
245}
246
247pub fn redb_append_log<T, C>(
257 path: impl AsRef<Path>,
258 opts: AppendLogStorageOptions<T, C>,
259) -> Result<AppendLogStorage<RedbBackend, T, C>, StorageError>
260where
261 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
262 C: Codec<Vec<T>>,
263{
264 Ok(append_log_storage(redb_backend(path)?, opts))
265}
266
267pub fn redb_append_log_default<T>(
273 path: impl AsRef<Path>,
274) -> Result<AppendLogStorage<RedbBackend, T, JsonCodec>, StorageError>
275where
276 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
277{
278 redb_append_log(path, AppendLogStorageOptions::default())
279}
280
281pub fn redb_kv<T, C>(
291 path: impl AsRef<Path>,
292 opts: KvStorageOptions<T, C>,
293) -> Result<KvStorage<RedbBackend, T, C>, StorageError>
294where
295 T: Send + Sync + 'static,
296 C: Codec<T>,
297{
298 Ok(kv_storage(redb_backend(path)?, opts))
299}
300
301pub fn redb_kv_default<T>(
307 path: impl AsRef<Path>,
308) -> Result<KvStorage<RedbBackend, T, JsonCodec>, StorageError>
309where
310 T: Serialize + DeserializeOwned + Send + Sync + 'static,
311{
312 redb_kv(path, KvStorageOptions::default())
313}
314
315#[cfg(test)]
316mod tests {
317 use super::*;
318 use tempfile::TempDir;
319
320 fn tmp_db() -> (TempDir, Arc<RedbBackend>) {
321 let dir = TempDir::new().unwrap();
322 let path = dir.path().join("test.redb");
323 let backend = redb_backend(&path).unwrap();
324 (dir, backend)
325 }
326
327 #[test]
328 fn read_write_round_trip() {
329 let (_dir, b) = tmp_db();
330 b.write("k1", b"hello").unwrap();
331 assert_eq!(b.read("k1").unwrap(), Some(b"hello".to_vec()));
332 }
333
334 #[test]
335 fn read_miss_returns_none() {
336 let (_dir, b) = tmp_db();
337 assert!(b.read("nope").unwrap().is_none());
338 }
339
340 #[test]
341 fn read_from_empty_database_returns_none() {
342 let (_dir, b) = tmp_db();
343 assert!(b.read("anything").unwrap().is_none());
345 }
346
347 #[test]
348 fn delete_removes_key() {
349 let (_dir, b) = tmp_db();
350 b.write("k", b"v").unwrap();
351 b.delete("k").unwrap();
352 assert!(b.read("k").unwrap().is_none());
353 }
354
355 #[test]
356 fn delete_nonexistent_key_is_ok() {
357 let (_dir, b) = tmp_db();
358 b.delete("nope").unwrap();
360 b.write("other", b"v").unwrap();
362 b.delete("nope").unwrap();
363 }
364
365 #[test]
366 fn list_lex_asc() {
367 let (_dir, b) = tmp_db();
368 b.write("g/10", b"a").unwrap();
369 b.write("g/02", b"b").unwrap();
370 b.write("g/01", b"c").unwrap();
371 b.write("other", b"d").unwrap();
372 let keys = b.list("g/").unwrap();
373 assert_eq!(keys, vec!["g/01", "g/02", "g/10"]);
374 }
375
376 #[test]
377 fn list_empty_prefix_returns_all_sorted() {
378 let (_dir, b) = tmp_db();
379 b.write("b", b"y").unwrap();
380 b.write("a", b"x").unwrap();
381 let keys = b.list("").unwrap();
382 assert_eq!(keys, vec!["a", "b"]);
383 }
384
385 #[test]
386 fn list_empty_database_returns_empty() {
387 let (_dir, b) = tmp_db();
388 let keys = b.list("g/").unwrap();
389 assert!(keys.is_empty());
390 }
391
392 #[test]
393 fn write_overwrites_existing_key() {
394 let (_dir, b) = tmp_db();
395 b.write("k", b"v1").unwrap();
396 b.write("k", b"v2").unwrap();
397 assert_eq!(b.read("k").unwrap(), Some(b"v2".to_vec()));
398 }
399
400 #[test]
401 fn name_includes_path() {
402 let dir = TempDir::new().unwrap();
403 let path = dir.path().join("test.redb");
404 let b = RedbBackend::new(&path).unwrap();
405 assert!(b.name().starts_with("redb:"));
406 assert!(b.name().contains("test.redb"));
407 }
408
409 #[test]
410 fn flush_is_noop() {
411 let (_dir, b) = tmp_db();
412 b.write("k", b"v").unwrap();
413 b.flush().unwrap();
414 assert_eq!(b.read("k").unwrap(), Some(b"v".to_vec()));
415 }
416
417 #[test]
418 fn data_survives_reopen() {
419 let dir = TempDir::new().unwrap();
420 let path = dir.path().join("persist.redb");
421 {
422 let b = RedbBackend::new(&path).unwrap();
423 b.write("k1", b"durable").unwrap();
424 }
425 let b2 = RedbBackend::new(&path).unwrap();
427 assert_eq!(b2.read("k1").unwrap(), Some(b"durable".to_vec()));
428 }
429
430 #[test]
431 fn concurrent_readers_and_writer() {
432 let (_dir, b) = tmp_db();
433 b.write("k", b"v1").unwrap();
434
435 let v = b.read("k").unwrap();
437 assert_eq!(v, Some(b"v1".to_vec()));
438 b.write("k", b"v2").unwrap();
439 let v2 = b.read("k").unwrap();
440 assert_eq!(v2, Some(b"v2".to_vec()));
441 }
442
443 #[test]
444 fn arc_factory_shares_backend() {
445 let dir = TempDir::new().unwrap();
446 let path = dir.path().join("shared.redb");
447 let b1 = redb_backend(&path).unwrap();
448 let b2 = Arc::clone(&b1);
450 b1.write("k", b"from_b1").unwrap();
451 assert_eq!(b2.read("k").unwrap(), Some(b"from_b1".to_vec()));
452 }
453}