Skip to main content

contextdb_engine/
persistence.rs

1use crate::composite_store::ChangeLogEntry;
2use crate::sync_types::DdlChange;
3use contextdb_core::{AdjEntry, Error, Lsn, Result, TableMeta, VectorEntry, VersionedRow};
4use contextdb_tx::WriteSet;
5use redb::{ReadableDatabase, ReadableTable, TableDefinition};
6use std::collections::HashMap;
7use std::path::Path;
8
// redb table definitions used by `RedbPersistence`. All values are byte blobs
// produced by `RedbPersistence::encode` (bincode).

/// Table metadata, keyed by `"table:{name}"` (see `meta_key`).
const META_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("meta");
/// Configuration values, keyed by the caller-supplied string key.
const CONFIG_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("config");
/// Change-log entries, keyed by the string built in `change_log_key`
/// (zero-padded LSN, `:`, zero-padded entry index).
const CHANGE_LOG_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("change_log");
/// DDL changes, keyed by the zero-padded LSN string built in `ddl_log_key`.
const DDL_LOG_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("ddl_log");
/// Graph edges keyed by `source ‖ target ‖ edge_type` bytes (see `graph_fwd_key_parts`).
const GRAPH_FWD_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("graph_fwd");
/// The same edges keyed by `target ‖ source ‖ edge_type` bytes (see `graph_rev_key_parts`).
const GRAPH_REV_TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("graph_rev");
/// Vector entries, keyed by the numeric row id (`row_id.0`).
const VECTORS_TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("vectors");
16
/// Persistence backend that stores engine state in a redb database file.
///
/// Only the file path is kept; no database handle is held between calls.
/// Each operation reopens the file through `with_db`. Ownership of the file
/// is signalled by a PID lock file next to the database (see
/// `acquire_pid_lock`), which is removed on `close` and on drop.
pub struct RedbPersistence {
    /// Location of the redb database file; the lock file path is derived from it.
    path: std::path::PathBuf,
}
20
21impl RedbPersistence {
22    pub fn create(path: &Path) -> Result<Self> {
23        let _db = redb::Database::create(path).map_err(Self::storage_error)?;
24        Self::acquire_pid_lock(path)?;
25        Ok(Self {
26            path: path.to_path_buf(),
27        })
28    }
29
30    pub fn open(path: &Path) -> Result<Self> {
31        Self::acquire_pid_lock(path)?;
32        let _db = redb::Database::open(path).map_err(Self::storage_error)?;
33        Ok(Self {
34            path: path.to_path_buf(),
35        })
36    }
37
38    pub fn close(&self) {
39        Self::release_pid_lock(&self.path);
40    }
41
42    pub fn path(&self) -> &Path {
43        &self.path
44    }
45
46    /// PID-based advisory lock. Writes current PID to a .lock file.
47    /// On open, checks if the .lock file exists and if the PID in it is still alive.
48    fn acquire_pid_lock(path: &Path) -> Result<()> {
49        let lock_path = path.with_extension("lock");
50        if lock_path.exists()
51            && let Ok(contents) = std::fs::read_to_string(&lock_path)
52            && let Ok(pid) = contents.trim().parse::<u32>()
53        {
54            let proc_path = format!("/proc/{}", pid);
55            if std::path::Path::new(&proc_path).exists() && pid != std::process::id() {
56                return Err(Error::Other(
57                    "database is locked (another process may have it open)".to_string(),
58                ));
59            }
60            // Either PID not running (stale lock) or same process — overwrite
61        }
62        std::fs::write(&lock_path, std::process::id().to_string()).map_err(Self::storage_error)?;
63        Ok(())
64    }
65
66    fn release_pid_lock(path: &Path) {
67        let lock_path = path.with_extension("lock");
68        let _ = std::fs::remove_file(&lock_path);
69    }
70
71    pub fn flush_data(&self, ws: &WriteSet) -> Result<()> {
72        self.flush_data_with_logs(ws, &[])
73    }
74
75    pub fn flush_data_with_logs(&self, ws: &WriteSet, change_log: &[ChangeLogEntry]) -> Result<()> {
76        self.with_db(|db| {
77            let write_txn = db.begin_write().map_err(Self::storage_error)?;
78
79            for (table, row) in &ws.relational_inserts {
80                let table_name = Self::rel_table_name(table);
81                let table_def: TableDefinition<u64, &[u8]> =
82                    TableDefinition::new(table_name.as_str());
83                let mut redb_table = write_txn
84                    .open_table(table_def)
85                    .map_err(Self::storage_error)?;
86                let encoded = Self::encode(row)?;
87                redb_table
88                    .insert(row.row_id.0, encoded.as_slice())
89                    .map_err(Self::storage_error)?;
90            }
91
92            for (table, row_id, deleted_tx) in &ws.relational_deletes {
93                let table_name = Self::rel_table_name(table);
94                let table_def: TableDefinition<u64, &[u8]> =
95                    TableDefinition::new(table_name.as_str());
96                let mut redb_table = write_txn
97                    .open_table(table_def)
98                    .map_err(Self::storage_error)?;
99                let bytes = {
100                    let existing = redb_table
101                        .get(row_id.0)
102                        .map_err(Self::storage_error)?
103                        .ok_or_else(|| Error::NotFound(format!("row {row_id} in table {table}")))?;
104                    let bytes: &[u8] = existing.value();
105                    bytes.to_vec()
106                };
107                let mut row: VersionedRow = Self::decode(&bytes)?;
108                row.deleted_tx = Some(*deleted_tx);
109                let encoded = Self::encode(&row)?;
110                redb_table
111                    .insert(row_id.0, encoded.as_slice())
112                    .map_err(Self::storage_error)?;
113            }
114
115            {
116                let mut fwd_table = write_txn
117                    .open_table(GRAPH_FWD_TABLE)
118                    .map_err(Self::storage_error)?;
119                let mut rev_table = write_txn
120                    .open_table(GRAPH_REV_TABLE)
121                    .map_err(Self::storage_error)?;
122
123                for entry in &ws.adj_inserts {
124                    let encoded = Self::encode(entry)?;
125                    let fwd_key = Self::graph_fwd_key(entry);
126                    let rev_key = Self::graph_rev_key(entry);
127                    fwd_table
128                        .insert(fwd_key.as_slice(), encoded.as_slice())
129                        .map_err(Self::storage_error)?;
130                    rev_table
131                        .insert(rev_key.as_slice(), encoded.as_slice())
132                        .map_err(Self::storage_error)?;
133                }
134
135                for (source, edge_type, target, deleted_tx) in &ws.adj_deletes {
136                    let fwd_key = Self::graph_fwd_key_parts(source, target, edge_type);
137                    let rev_key = Self::graph_rev_key_parts(source, target, edge_type);
138
139                    let bytes = {
140                        let fwd_existing = fwd_table
141                            .get(fwd_key.as_slice())
142                            .map_err(Self::storage_error)?
143                            .ok_or_else(|| {
144                                Error::NotFound(format!(
145                                    "edge {source} -[{edge_type}]-> {target} in graph_fwd"
146                                ))
147                            })?;
148                        let bytes: &[u8] = fwd_existing.value();
149                        bytes.to_vec()
150                    };
151                    let mut edge: AdjEntry = Self::decode(&bytes)?;
152                    edge.deleted_tx = Some(*deleted_tx);
153                    let encoded = Self::encode(&edge)?;
154
155                    fwd_table
156                        .insert(fwd_key.as_slice(), encoded.as_slice())
157                        .map_err(Self::storage_error)?;
158                    rev_table
159                        .insert(rev_key.as_slice(), encoded.as_slice())
160                        .map_err(Self::storage_error)?;
161                }
162            }
163
164            {
165                let mut vectors_table = write_txn
166                    .open_table(VECTORS_TABLE)
167                    .map_err(Self::storage_error)?;
168
169                for entry in &ws.vector_inserts {
170                    let encoded = Self::encode(entry)?;
171                    vectors_table
172                        .insert(entry.row_id.0, encoded.as_slice())
173                        .map_err(Self::storage_error)?;
174                }
175
176                for (row_id, deleted_tx) in &ws.vector_deletes {
177                    let bytes = {
178                        let existing = vectors_table
179                            .get(row_id.0)
180                            .map_err(Self::storage_error)?
181                            .ok_or_else(|| Error::NotFound(format!("vector row {row_id}")))?;
182                        let bytes: &[u8] = existing.value();
183                        bytes.to_vec()
184                    };
185                    let mut entry: VectorEntry = Self::decode(&bytes)?;
186                    entry.deleted_tx = Some(*deleted_tx);
187                    let encoded = Self::encode(&entry)?;
188                    vectors_table
189                        .insert(row_id.0, encoded.as_slice())
190                        .map_err(Self::storage_error)?;
191                }
192            }
193
194            if !change_log.is_empty() {
195                let mut table = write_txn
196                    .open_table(CHANGE_LOG_TABLE)
197                    .map_err(Self::storage_error)?;
198                let lsn = ws.commit_lsn.unwrap_or(Lsn(0));
199                for (index, entry) in change_log.iter().enumerate() {
200                    let key = Self::change_log_key(lsn, index);
201                    let encoded = Self::encode(entry)?;
202                    table
203                        .insert(key.as_str(), encoded.as_slice())
204                        .map_err(Self::storage_error)?;
205                }
206            }
207
208            write_txn.commit().map_err(Self::storage_error)?;
209            Ok(())
210        })
211    }
212
213    pub fn flush_table_meta(&self, name: &str, meta: &TableMeta) -> Result<()> {
214        self.with_db(|db| {
215            let write_txn = db.begin_write().map_err(Self::storage_error)?;
216            {
217                let mut meta_table = write_txn
218                    .open_table(META_TABLE)
219                    .map_err(Self::storage_error)?;
220                let key = Self::meta_key(name);
221                let encoded = Self::encode(meta)?;
222                meta_table
223                    .insert(key.as_str(), encoded.as_slice())
224                    .map_err(Self::storage_error)?;
225            }
226            write_txn.commit().map_err(Self::storage_error)?;
227            Ok(())
228        })
229    }
230
231    pub fn remove_table_meta(&self, name: &str) -> Result<()> {
232        self.with_db(|db| {
233            let write_txn = db.begin_write().map_err(Self::storage_error)?;
234            {
235                let mut meta_table = write_txn
236                    .open_table(META_TABLE)
237                    .map_err(Self::storage_error)?;
238                let key = Self::meta_key(name);
239                meta_table
240                    .remove(key.as_str())
241                    .map_err(Self::storage_error)?;
242            }
243            write_txn.commit().map_err(Self::storage_error)?;
244            Ok(())
245        })
246    }
247
248    pub fn flush_config_value<T: serde::Serialize>(&self, key: &str, value: &T) -> Result<()> {
249        self.with_db(|db| {
250            let write_txn = db.begin_write().map_err(Self::storage_error)?;
251            {
252                let mut config_table = write_txn
253                    .open_table(CONFIG_TABLE)
254                    .map_err(Self::storage_error)?;
255                let encoded = Self::encode(value)?;
256                config_table
257                    .insert(key, encoded.as_slice())
258                    .map_err(Self::storage_error)?;
259            }
260            write_txn.commit().map_err(Self::storage_error)?;
261            Ok(())
262        })
263    }
264
265    pub fn remove_config_value(&self, key: &str) -> Result<()> {
266        self.with_db(|db| {
267            let write_txn = db.begin_write().map_err(Self::storage_error)?;
268            {
269                let mut config_table = write_txn
270                    .open_table(CONFIG_TABLE)
271                    .map_err(Self::storage_error)?;
272                config_table.remove(key).map_err(Self::storage_error)?;
273            }
274            write_txn.commit().map_err(Self::storage_error)?;
275            Ok(())
276        })
277    }
278
279    pub fn append_change_log(&self, lsn: Lsn, entries: &[ChangeLogEntry]) -> Result<()> {
280        if entries.is_empty() {
281            return Ok(());
282        }
283        self.with_db(|db| {
284            let write_txn = db.begin_write().map_err(Self::storage_error)?;
285            {
286                let mut table = write_txn
287                    .open_table(CHANGE_LOG_TABLE)
288                    .map_err(Self::storage_error)?;
289                for (index, entry) in entries.iter().enumerate() {
290                    let key = Self::change_log_key(lsn, index);
291                    let encoded = Self::encode(entry)?;
292                    table
293                        .insert(key.as_str(), encoded.as_slice())
294                        .map_err(Self::storage_error)?;
295                }
296            }
297            write_txn.commit().map_err(Self::storage_error)?;
298            Ok(())
299        })
300    }
301
302    pub fn append_ddl_log(&self, lsn: Lsn, change: &DdlChange) -> Result<()> {
303        self.with_db(|db| {
304            let write_txn = db.begin_write().map_err(Self::storage_error)?;
305            {
306                let mut table = write_txn
307                    .open_table(DDL_LOG_TABLE)
308                    .map_err(Self::storage_error)?;
309                let key = Self::ddl_log_key(lsn);
310                let encoded = Self::encode(change)?;
311                table
312                    .insert(key.as_str(), encoded.as_slice())
313                    .map_err(Self::storage_error)?;
314            }
315            write_txn.commit().map_err(Self::storage_error)?;
316            Ok(())
317        })
318    }
319
320    pub fn remove_table_data(&self, name: &str) -> Result<()> {
321        self.with_db(|db| {
322            let write_txn = db.begin_write().map_err(Self::storage_error)?;
323            let table_name = Self::rel_table_name(name);
324            let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new(table_name.as_str());
325            let _ = write_txn
326                .delete_table(table_def)
327                .map_err(Self::storage_error)?;
328            write_txn.commit().map_err(Self::storage_error)?;
329            Ok(())
330        })
331    }
332
333    pub fn rewrite_table_rows(&self, name: &str, rows: &[VersionedRow]) -> Result<()> {
334        self.with_db(|db| {
335            let write_txn = db.begin_write().map_err(Self::storage_error)?;
336            {
337                let table_name = Self::rel_table_name(name);
338                let table_def: TableDefinition<u64, &[u8]> =
339                    TableDefinition::new(table_name.as_str());
340                let _ = write_txn.delete_table(table_def);
341                let mut redb_table = write_txn
342                    .open_table(table_def)
343                    .map_err(Self::storage_error)?;
344                for row in rows {
345                    let encoded = Self::encode(row)?;
346                    redb_table
347                        .insert(row.row_id.0, encoded.as_slice())
348                        .map_err(Self::storage_error)?;
349                }
350            }
351            write_txn.commit().map_err(Self::storage_error)?;
352            Ok(())
353        })
354    }
355
356    pub fn rewrite_vectors(&self, vectors: &[VectorEntry]) -> Result<()> {
357        self.with_db(|db| {
358            let write_txn = db.begin_write().map_err(Self::storage_error)?;
359            let _ = write_txn.delete_table(VECTORS_TABLE);
360            {
361                let mut table = write_txn
362                    .open_table(VECTORS_TABLE)
363                    .map_err(Self::storage_error)?;
364                for entry in vectors {
365                    let encoded = Self::encode(entry)?;
366                    table
367                        .insert(entry.row_id.0, encoded.as_slice())
368                        .map_err(Self::storage_error)?;
369                }
370            }
371            write_txn.commit().map_err(Self::storage_error)?;
372            Ok(())
373        })
374    }
375
376    pub fn rewrite_graph_edges(&self, edges: &[AdjEntry]) -> Result<()> {
377        self.with_db(|db| {
378            let write_txn = db.begin_write().map_err(Self::storage_error)?;
379            let _ = write_txn.delete_table(GRAPH_FWD_TABLE);
380            let _ = write_txn.delete_table(GRAPH_REV_TABLE);
381            {
382                let mut fwd_table = write_txn
383                    .open_table(GRAPH_FWD_TABLE)
384                    .map_err(Self::storage_error)?;
385                let mut rev_table = write_txn
386                    .open_table(GRAPH_REV_TABLE)
387                    .map_err(Self::storage_error)?;
388
389                for entry in edges {
390                    let encoded = Self::encode(entry)?;
391                    let fwd_key = Self::graph_fwd_key(entry);
392                    let rev_key = Self::graph_rev_key(entry);
393                    fwd_table
394                        .insert(fwd_key.as_slice(), encoded.as_slice())
395                        .map_err(Self::storage_error)?;
396                    rev_table
397                        .insert(rev_key.as_slice(), encoded.as_slice())
398                        .map_err(Self::storage_error)?;
399                }
400            }
401            write_txn.commit().map_err(Self::storage_error)?;
402            Ok(())
403        })
404    }
405
406    pub fn load_all_table_meta(&self) -> Result<HashMap<String, TableMeta>> {
407        self.with_db(|db| {
408            let read_txn = db.begin_read().map_err(Self::storage_error)?;
409            let meta_table = match read_txn.open_table(META_TABLE) {
410                Ok(table) => table,
411                Err(redb::TableError::TableDoesNotExist(_)) => return Ok(HashMap::new()),
412                Err(err) => return Err(Self::storage_error(err)),
413            };
414
415            let mut tables = HashMap::new();
416            for entry in meta_table.iter().map_err(Self::storage_error)? {
417                let (key, value) = entry.map_err(Self::storage_error)?;
418                let key = key.value();
419                if let Some(name) = key.strip_prefix("table:") {
420                    tables.insert(name.to_string(), Self::decode(value.value())?);
421                }
422            }
423            Ok(tables)
424        })
425    }
426
427    pub fn load_config_value<T: serde::de::DeserializeOwned>(
428        &self,
429        key: &str,
430    ) -> Result<Option<T>> {
431        self.with_db(|db| {
432            let read_txn = db.begin_read().map_err(Self::storage_error)?;
433            let config_table = match read_txn.open_table(CONFIG_TABLE) {
434                Ok(table) => table,
435                Err(redb::TableError::TableDoesNotExist(_)) => return Ok(None),
436                Err(err) => return Err(Self::storage_error(err)),
437            };
438            let value = match config_table.get(key).map_err(Self::storage_error)? {
439                Some(value) => Some(Self::decode(value.value())?),
440                None => None,
441            };
442            Ok(value)
443        })
444    }
445
446    pub fn load_relational_table(&self, name: &str) -> Result<Vec<VersionedRow>> {
447        self.with_db(|db| {
448            let read_txn = db.begin_read().map_err(Self::storage_error)?;
449            let table_name = Self::rel_table_name(name);
450            let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new(table_name.as_str());
451            let table = match read_txn.open_table(table_def) {
452                Ok(table) => table,
453                Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
454                Err(err) => return Err(Self::storage_error(err)),
455            };
456
457            let mut rows = Vec::new();
458            for entry in table.iter().map_err(Self::storage_error)? {
459                let (_, value) = entry.map_err(Self::storage_error)?;
460                rows.push(Self::decode(value.value())?);
461            }
462            Ok(rows)
463        })
464    }
465
466    pub fn load_all_tables(&self) -> Result<HashMap<String, Vec<VersionedRow>>> {
467        let mut all_tables = HashMap::new();
468        for name in self.load_all_table_meta()?.into_keys() {
469            let rows = self.load_relational_table(&name)?;
470            all_tables.insert(name, rows);
471        }
472        Ok(all_tables)
473    }
474
475    pub fn load_forward_edges(&self) -> Result<Vec<AdjEntry>> {
476        self.load_graph_table(GRAPH_FWD_TABLE)
477    }
478
479    pub fn load_reverse_edges(&self) -> Result<Vec<AdjEntry>> {
480        self.load_graph_table(GRAPH_REV_TABLE)
481    }
482
483    pub fn load_vectors(&self) -> Result<Vec<VectorEntry>> {
484        self.with_db(|db| {
485            let read_txn = db.begin_read().map_err(Self::storage_error)?;
486            let table = match read_txn.open_table(VECTORS_TABLE) {
487                Ok(table) => table,
488                Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
489                Err(err) => return Err(Self::storage_error(err)),
490            };
491
492            let mut vectors = Vec::new();
493            for entry in table.iter().map_err(Self::storage_error)? {
494                let (_, value) = entry.map_err(Self::storage_error)?;
495                vectors.push(Self::decode(value.value())?);
496            }
497            Ok(vectors)
498        })
499    }
500
501    pub fn load_change_log(&self) -> Result<Vec<ChangeLogEntry>> {
502        self.with_db(|db| {
503            let read_txn = db.begin_read().map_err(Self::storage_error)?;
504            let table = match read_txn.open_table(CHANGE_LOG_TABLE) {
505                Ok(table) => table,
506                Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
507                Err(err) => return Err(Self::storage_error(err)),
508            };
509
510            let mut entries = Vec::new();
511            for entry in table.iter().map_err(Self::storage_error)? {
512                let (_, value) = entry.map_err(Self::storage_error)?;
513                entries.push(Self::decode(value.value())?);
514            }
515            Ok(entries)
516        })
517    }
518
519    pub fn load_ddl_log(&self) -> Result<Vec<(Lsn, DdlChange)>> {
520        self.with_db(|db| {
521            let read_txn = db.begin_read().map_err(Self::storage_error)?;
522            let table = match read_txn.open_table(DDL_LOG_TABLE) {
523                Ok(table) => table,
524                Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
525                Err(err) => return Err(Self::storage_error(err)),
526            };
527
528            let mut entries = Vec::new();
529            for entry in table.iter().map_err(Self::storage_error)? {
530                let (key, value) = entry.map_err(Self::storage_error)?;
531                let lsn = key
532                    .value()
533                    .parse::<u64>()
534                    .map_err(|err| Error::Other(format!("invalid ddl log key: {err}")))?;
535                entries.push((Lsn(lsn), Self::decode(value.value())?));
536            }
537            Ok(entries)
538        })
539    }
540
541    fn load_graph_table(&self, definition: TableDefinition<&[u8], &[u8]>) -> Result<Vec<AdjEntry>> {
542        self.with_db(|db| {
543            let read_txn = db.begin_read().map_err(Self::storage_error)?;
544            let table = match read_txn.open_table(definition) {
545                Ok(table) => table,
546                Err(redb::TableError::TableDoesNotExist(_)) => return Ok(Vec::new()),
547                Err(err) => return Err(Self::storage_error(err)),
548            };
549
550            let mut entries = Vec::new();
551            for entry in table.iter().map_err(Self::storage_error)? {
552                let (_, value) = entry.map_err(Self::storage_error)?;
553                entries.push(Self::decode(value.value())?);
554            }
555            Ok(entries)
556        })
557    }
558
559    fn rel_table_name(name: &str) -> String {
560        format!("rel_{name}")
561    }
562
563    fn meta_key(name: &str) -> String {
564        format!("table:{name}")
565    }
566
567    fn change_log_key(lsn: Lsn, index: usize) -> String {
568        format!("{:020}:{index:06}", lsn.0)
569    }
570
571    fn ddl_log_key(lsn: Lsn) -> String {
572        format!("{:020}", lsn.0)
573    }
574
575    fn graph_fwd_key(entry: &AdjEntry) -> Vec<u8> {
576        Self::graph_fwd_key_parts(&entry.source, &entry.target, &entry.edge_type)
577    }
578
579    fn graph_rev_key(entry: &AdjEntry) -> Vec<u8> {
580        Self::graph_rev_key_parts(&entry.source, &entry.target, &entry.edge_type)
581    }
582
583    fn graph_fwd_key_parts(source: &uuid::Uuid, target: &uuid::Uuid, edge_type: &str) -> Vec<u8> {
584        let mut key = Vec::with_capacity(32 + edge_type.len());
585        key.extend_from_slice(source.as_bytes());
586        key.extend_from_slice(target.as_bytes());
587        key.extend_from_slice(edge_type.as_bytes());
588        key
589    }
590
591    fn graph_rev_key_parts(source: &uuid::Uuid, target: &uuid::Uuid, edge_type: &str) -> Vec<u8> {
592        let mut key = Vec::with_capacity(32 + edge_type.len());
593        key.extend_from_slice(target.as_bytes());
594        key.extend_from_slice(source.as_bytes());
595        key.extend_from_slice(edge_type.as_bytes());
596        key
597    }
598
599    fn encode<T: serde::Serialize>(value: &T) -> Result<Vec<u8>> {
600        bincode::serde::encode_to_vec(value, bincode::config::standard())
601            .map_err(|err| Error::Other(format!("bincode encode error: {err}")))
602    }
603
604    fn decode<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Result<T> {
605        let (value, _) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())
606            .map_err(|err| Error::Other(format!("bincode decode error: {err}")))?;
607        Ok(value)
608    }
609
610    fn storage_error(err: impl std::fmt::Display) -> Error {
611        let msg = err.to_string();
612        if msg.contains("lock") || msg.contains("already open") {
613            Error::Other(format!(
614                "database is locked (another process may have it open): {msg}"
615            ))
616        } else {
617            Error::Other(format!("redb error: {msg}"))
618        }
619    }
620
621    fn with_db<T>(&self, f: impl FnOnce(&redb::Database) -> Result<T>) -> Result<T> {
622        let db = redb::Database::open(&self.path).map_err(Self::storage_error)?;
623        f(&db)
624    }
625}
626
627impl Drop for RedbPersistence {
628    fn drop(&mut self) {
629        Self::release_pid_lock(&self.path);
630    }
631}