keri_core/database/redb/
escrow_database.rs

1use std::{
2    sync::Arc,
3    time::{SystemTime, UNIX_EPOCH},
4};
5
6use redb::{Database, MultimapTableDefinition, TableDefinition};
7use said::SelfAddressingIdentifier;
8
9use crate::{
10    database::{EscrowCreator, EscrowDatabase, LogDatabase as _, SequencedEventDatabase},
11    event::KeyEvent,
12    event_message::{msg::KeriEvent, signed_event_message::SignedEventMessage},
13    prefix::IdentifierPrefix,
14};
15
16use super::{rkyv_adapter, LogDatabase, RedbDatabase, RedbError};
17
18impl EscrowCreator for RedbDatabase {
19    type EscrowDatabaseType = SnKeyEscrow;
20
21    fn create_escrow_db(&self, table_name: &'static str) -> Self::EscrowDatabaseType {
22        SnKeyEscrow::new(
23            Arc::new(SnKeyDatabase::new(self.db.clone(), table_name).unwrap()),
24            self.log_db.clone(),
25        )
26    }
27}
28
29pub struct SnKeyEscrow {
30    escrow: Arc<
31        dyn SequencedEventDatabase<
32            DatabaseType = redb::Database,
33            Error = RedbError,
34            DigestIter = Box<dyn Iterator<Item = said::SelfAddressingIdentifier>>,
35        >,
36    >,
37    log: Arc<LogDatabase>,
38}
39
40impl crate::database::EscrowDatabase for SnKeyEscrow {
41    type EscrowDatabaseType = redb::Database;
42    type LogDatabaseType = LogDatabase;
43    type Error = RedbError;
44    type EventIter = Box<dyn Iterator<Item = SignedEventMessage> + Send>;
45
46    fn new(
47        escrow: Arc<
48            dyn SequencedEventDatabase<
49                DatabaseType = Self::EscrowDatabaseType,
50                Error = Self::Error,
51                DigestIter = Box<dyn Iterator<Item = said::SelfAddressingIdentifier>>,
52            >,
53        >,
54        log: Arc<LogDatabase>,
55    ) -> Self
56    where
57        Self: Sized,
58    {
59        Self { escrow, log }
60    }
61
62    fn save_digest(
63        &self,
64        id: &IdentifierPrefix,
65        sn: u64,
66        event_digest: &SelfAddressingIdentifier,
67    ) -> Result<(), RedbError> {
68        self.escrow.insert(id, sn, event_digest)?;
69
70        Ok(())
71    }
72
73    fn insert(&self, event: &SignedEventMessage) -> Result<(), RedbError> {
74        self.log
75            .log_event(&crate::database::redb::WriteTxnMode::CreateNew, &event)?;
76        let said = event.event_message.digest().unwrap();
77        let id = event.event_message.data.get_prefix();
78        let sn = event.event_message.data.sn;
79        self.escrow.insert(&id, sn, &said)?;
80
81        Ok(())
82    }
83
84    fn insert_key_value(
85        &self,
86        id: &IdentifierPrefix,
87        sn: u64,
88        event: &SignedEventMessage,
89    ) -> Result<(), RedbError> {
90        self.log
91            .log_event(&crate::database::redb::WriteTxnMode::CreateNew, &event)?;
92        let said = event.event_message.digest().unwrap();
93
94        self.escrow.insert(&id, sn, &said)?;
95
96        Ok(())
97    }
98
99    fn get(&self, identifier: &IdentifierPrefix, sn: u64) -> Result<Self::EventIter, Self::Error> {
100        let saids = self.escrow.get(identifier, sn)?;
101        let saids_vec: Vec<_> = saids.collect();
102
103        let log = Arc::clone(&self.log);
104
105        let events = saids_vec.into_iter().filter_map(move |said| {
106            log.get_signed_event(&said)
107                .ok()
108                .flatten()
109                .map(|el| el.signed_event_message)
110        });
111
112        Ok(Box::new(events))
113    }
114
115    fn get_from_sn(
116        &self,
117        identifier: &IdentifierPrefix,
118        sn: u64,
119    ) -> Result<Self::EventIter, Self::Error> {
120        let saids = self.escrow.get_greater_than(identifier, sn)?;
121        let saids_vec: Vec<_> = saids.collect();
122
123        let log = Arc::clone(&self.log);
124
125        let events = saids_vec.into_iter().filter_map(move |said| {
126            log.get_signed_event(&said)
127                .ok()
128                .flatten()
129                .map(|el| el.signed_event_message)
130        });
131
132        Ok(Box::new(events))
133    }
134
135    fn remove(&self, event: &KeriEvent<KeyEvent>) {
136        let said = event.digest().unwrap();
137        let id = event.data.get_prefix();
138        let sn = event.data.sn;
139        self.escrow.remove(&id, sn, &said).unwrap();
140    }
141
142    fn contains(
143        &self,
144        id: &IdentifierPrefix,
145        sn: u64,
146        digest: &SelfAddressingIdentifier,
147    ) -> Result<bool, RedbError> {
148        Ok(self
149            .escrow
150            .get(id, sn)?
151            .find(|said| said == digest)
152            .is_some())
153    }
154}
155
156/// Storage for digests of escrowed events.
157/// The digest of an escrowed event can be used to retrieve the full event from the `LogDatabase`.  
158/// The storage is indexed by a tuple of (identifier, sn), with the value being the event's digest.
159pub struct SnKeyDatabase {
160    db: Arc<Database>,
161    /// Escrowed events. (identifier, sn) -> event digest
162    /// Table links an identifier and sequence number to the digest of an event,
163    /// referencing the actual event stored in the `EVENTS` table in EventDatabase.
164    sn_key_table: MultimapTableDefinition<'static, (&'static str, u64), &'static [u8]>,
165    /// Timestamps. digest -> timestamp
166    /// Table links digest of an event witch time when an event was saved in the database.
167    dts_table: TableDefinition<'static, &'static [u8], u64>,
168}
169
170impl SequencedEventDatabase for SnKeyDatabase {
171    type DatabaseType = redb::Database;
172    type Error = RedbError;
173    type DigestIter = Box<dyn Iterator<Item = SelfAddressingIdentifier>>;
174
175    fn new(db: Arc<Self::DatabaseType>, table_name: &'static str) -> Result<Self, RedbError> {
176        // Create tables
177        let pse = MultimapTableDefinition::new(table_name);
178        let dts = TableDefinition::new("timestamps_escrow");
179
180        let write_txn = db.begin_write()?;
181        {
182            write_txn.open_multimap_table(pse)?;
183            write_txn.open_table(dts)?;
184        }
185        write_txn.commit()?;
186        Ok(Self {
187            db,
188            sn_key_table: pse,
189            dts_table: dts,
190        })
191    }
192
193    fn insert(
194        &self,
195        identifier: &IdentifierPrefix,
196        sn: u64,
197        digest: &SelfAddressingIdentifier,
198    ) -> Result<(), RedbError> {
199        let write_txn = self.db.begin_write()?;
200        {
201            let mut table = (&write_txn).open_multimap_table(self.sn_key_table)?;
202            let value = rkyv_adapter::serialize_said(&digest)?;
203            table.insert((identifier.to_string().as_str(), sn), value.as_ref())?;
204
205            let mut table = (&write_txn).open_table(self.dts_table)?;
206            let value = get_current_timestamp();
207            let key = rkyv_adapter::serialize_said(&digest)?;
208            table.insert(key.as_slice(), &value)?;
209        }
210        write_txn.commit()?;
211        Ok(())
212    }
213
214    fn get(&self, identifier: &IdentifierPrefix, sn: u64) -> Result<Self::DigestIter, RedbError> {
215        let read_txn = self.db.begin_read()?;
216        let table = read_txn.open_multimap_table(self.sn_key_table)?;
217        let value = table.get((identifier.to_string().as_str(), sn))?;
218        let out = value.filter_map(|value| match value {
219            Ok(value) => {
220                let said = rkyv_adapter::deserialize_said(value.value()).unwrap();
221                Some(said)
222            }
223            _ => None,
224        });
225        Ok(Box::new(out))
226    }
227
228    fn get_greater_than(
229        &self,
230        identifier: &IdentifierPrefix,
231        sn: u64,
232    ) -> Result<Self::DigestIter, RedbError> {
233        let read_txn = self.db.begin_read()?;
234        let table = read_txn.open_multimap_table(self.sn_key_table)?;
235        let lower_bound = identifier.to_string();
236        let upper_bound = {
237            let mut bytes = lower_bound.as_bytes().to_vec();
238            if let Some(last) = bytes.last_mut() {
239                *last += 1; // Increment the last byte to get the next lexicographic string
240            };
241            String::from_utf8(bytes).unwrap()
242        };
243        let out = table
244            .range((lower_bound.as_str(), sn)..(upper_bound.as_str(), 0))?
245            .filter_map(|range| match range {
246                Ok((_key, value)) => Some(value.filter_map(|value| match value {
247                    Ok(value) => {
248                        let said = rkyv_adapter::deserialize_said(value.value()).unwrap();
249                        Some(said)
250                    }
251                    Err(_) => None,
252                })),
253                _ => None,
254            })
255            .flatten();
256
257        Ok(Box::new(out))
258    }
259
260    fn remove(
261        &self,
262        identifier: &IdentifierPrefix,
263        sn: u64,
264        said: &SelfAddressingIdentifier,
265    ) -> Result<(), RedbError> {
266        let write_txn = self.db.begin_write()?;
267        {
268            let mut table = write_txn.open_multimap_table(self.sn_key_table)?;
269            let said = rkyv_adapter::serialize_said(said).unwrap();
270            table.remove((identifier.to_string().as_str(), sn), said.as_slice())?;
271
272            let mut table = write_txn.open_table(self.dts_table)?;
273            table.remove(said.as_slice())?;
274        }
275
276        write_txn.commit()?;
277        Ok(())
278    }
279}
280
/// Returns the current system time as whole seconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
pub(crate) fn get_current_timestamp() -> u64 {
    let now = SystemTime::now();
    let since_epoch = now
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");

    since_epoch.as_secs()
}