1use std::{
2 sync::Arc,
3 time::{SystemTime, UNIX_EPOCH},
4};
5
6use redb::{Database, MultimapTableDefinition, TableDefinition};
7use said::SelfAddressingIdentifier;
8
9use crate::{
10 database::{EscrowCreator, EscrowDatabase, LogDatabase as _, SequencedEventDatabase},
11 event::KeyEvent,
12 event_message::{msg::KeriEvent, signed_event_message::SignedEventMessage},
13 prefix::IdentifierPrefix,
14};
15
16use super::{rkyv_adapter, LogDatabase, RedbDatabase, RedbError};
17
18impl EscrowCreator for RedbDatabase {
19 type EscrowDatabaseType = SnKeyEscrow;
20
21 fn create_escrow_db(&self, table_name: &'static str) -> Self::EscrowDatabaseType {
22 SnKeyEscrow::new(
23 Arc::new(SnKeyDatabase::new(self.db.clone(), table_name).unwrap()),
24 self.log_db.clone(),
25 )
26 }
27}
28
29pub struct SnKeyEscrow {
30 escrow: Arc<
31 dyn SequencedEventDatabase<
32 DatabaseType = redb::Database,
33 Error = RedbError,
34 DigestIter = Box<dyn Iterator<Item = said::SelfAddressingIdentifier>>,
35 >,
36 >,
37 log: Arc<LogDatabase>,
38}
39
40impl crate::database::EscrowDatabase for SnKeyEscrow {
41 type EscrowDatabaseType = redb::Database;
42 type LogDatabaseType = LogDatabase;
43 type Error = RedbError;
44 type EventIter = Box<dyn Iterator<Item = SignedEventMessage> + Send>;
45
46 fn new(
47 escrow: Arc<
48 dyn SequencedEventDatabase<
49 DatabaseType = Self::EscrowDatabaseType,
50 Error = Self::Error,
51 DigestIter = Box<dyn Iterator<Item = said::SelfAddressingIdentifier>>,
52 >,
53 >,
54 log: Arc<LogDatabase>,
55 ) -> Self
56 where
57 Self: Sized,
58 {
59 Self { escrow, log }
60 }
61
62 fn save_digest(
63 &self,
64 id: &IdentifierPrefix,
65 sn: u64,
66 event_digest: &SelfAddressingIdentifier,
67 ) -> Result<(), RedbError> {
68 self.escrow.insert(id, sn, event_digest)?;
69
70 Ok(())
71 }
72
73 fn insert(&self, event: &SignedEventMessage) -> Result<(), RedbError> {
74 self.log
75 .log_event(&crate::database::redb::WriteTxnMode::CreateNew, &event)?;
76 let said = event.event_message.digest().unwrap();
77 let id = event.event_message.data.get_prefix();
78 let sn = event.event_message.data.sn;
79 self.escrow.insert(&id, sn, &said)?;
80
81 Ok(())
82 }
83
84 fn insert_key_value(
85 &self,
86 id: &IdentifierPrefix,
87 sn: u64,
88 event: &SignedEventMessage,
89 ) -> Result<(), RedbError> {
90 self.log
91 .log_event(&crate::database::redb::WriteTxnMode::CreateNew, &event)?;
92 let said = event.event_message.digest().unwrap();
93
94 self.escrow.insert(&id, sn, &said)?;
95
96 Ok(())
97 }
98
99 fn get(&self, identifier: &IdentifierPrefix, sn: u64) -> Result<Self::EventIter, Self::Error> {
100 let saids = self.escrow.get(identifier, sn)?;
101 let saids_vec: Vec<_> = saids.collect();
102
103 let log = Arc::clone(&self.log);
104
105 let events = saids_vec.into_iter().filter_map(move |said| {
106 log.get_signed_event(&said)
107 .ok()
108 .flatten()
109 .map(|el| el.signed_event_message)
110 });
111
112 Ok(Box::new(events))
113 }
114
115 fn get_from_sn(
116 &self,
117 identifier: &IdentifierPrefix,
118 sn: u64,
119 ) -> Result<Self::EventIter, Self::Error> {
120 let saids = self.escrow.get_greater_than(identifier, sn)?;
121 let saids_vec: Vec<_> = saids.collect();
122
123 let log = Arc::clone(&self.log);
124
125 let events = saids_vec.into_iter().filter_map(move |said| {
126 log.get_signed_event(&said)
127 .ok()
128 .flatten()
129 .map(|el| el.signed_event_message)
130 });
131
132 Ok(Box::new(events))
133 }
134
135 fn remove(&self, event: &KeriEvent<KeyEvent>) {
136 let said = event.digest().unwrap();
137 let id = event.data.get_prefix();
138 let sn = event.data.sn;
139 self.escrow.remove(&id, sn, &said).unwrap();
140 }
141
142 fn contains(
143 &self,
144 id: &IdentifierPrefix,
145 sn: u64,
146 digest: &SelfAddressingIdentifier,
147 ) -> Result<bool, RedbError> {
148 Ok(self
149 .escrow
150 .get(id, sn)?
151 .find(|said| said == digest)
152 .is_some())
153 }
154}
155
156pub struct SnKeyDatabase {
160 db: Arc<Database>,
161 sn_key_table: MultimapTableDefinition<'static, (&'static str, u64), &'static [u8]>,
165 dts_table: TableDefinition<'static, &'static [u8], u64>,
168}
169
170impl SequencedEventDatabase for SnKeyDatabase {
171 type DatabaseType = redb::Database;
172 type Error = RedbError;
173 type DigestIter = Box<dyn Iterator<Item = SelfAddressingIdentifier>>;
174
175 fn new(db: Arc<Self::DatabaseType>, table_name: &'static str) -> Result<Self, RedbError> {
176 let pse = MultimapTableDefinition::new(table_name);
178 let dts = TableDefinition::new("timestamps_escrow");
179
180 let write_txn = db.begin_write()?;
181 {
182 write_txn.open_multimap_table(pse)?;
183 write_txn.open_table(dts)?;
184 }
185 write_txn.commit()?;
186 Ok(Self {
187 db,
188 sn_key_table: pse,
189 dts_table: dts,
190 })
191 }
192
193 fn insert(
194 &self,
195 identifier: &IdentifierPrefix,
196 sn: u64,
197 digest: &SelfAddressingIdentifier,
198 ) -> Result<(), RedbError> {
199 let write_txn = self.db.begin_write()?;
200 {
201 let mut table = (&write_txn).open_multimap_table(self.sn_key_table)?;
202 let value = rkyv_adapter::serialize_said(&digest)?;
203 table.insert((identifier.to_string().as_str(), sn), value.as_ref())?;
204
205 let mut table = (&write_txn).open_table(self.dts_table)?;
206 let value = get_current_timestamp();
207 let key = rkyv_adapter::serialize_said(&digest)?;
208 table.insert(key.as_slice(), &value)?;
209 }
210 write_txn.commit()?;
211 Ok(())
212 }
213
214 fn get(&self, identifier: &IdentifierPrefix, sn: u64) -> Result<Self::DigestIter, RedbError> {
215 let read_txn = self.db.begin_read()?;
216 let table = read_txn.open_multimap_table(self.sn_key_table)?;
217 let value = table.get((identifier.to_string().as_str(), sn))?;
218 let out = value.filter_map(|value| match value {
219 Ok(value) => {
220 let said = rkyv_adapter::deserialize_said(value.value()).unwrap();
221 Some(said)
222 }
223 _ => None,
224 });
225 Ok(Box::new(out))
226 }
227
228 fn get_greater_than(
229 &self,
230 identifier: &IdentifierPrefix,
231 sn: u64,
232 ) -> Result<Self::DigestIter, RedbError> {
233 let read_txn = self.db.begin_read()?;
234 let table = read_txn.open_multimap_table(self.sn_key_table)?;
235 let lower_bound = identifier.to_string();
236 let upper_bound = {
237 let mut bytes = lower_bound.as_bytes().to_vec();
238 if let Some(last) = bytes.last_mut() {
239 *last += 1; };
241 String::from_utf8(bytes).unwrap()
242 };
243 let out = table
244 .range((lower_bound.as_str(), sn)..(upper_bound.as_str(), 0))?
245 .filter_map(|range| match range {
246 Ok((_key, value)) => Some(value.filter_map(|value| match value {
247 Ok(value) => {
248 let said = rkyv_adapter::deserialize_said(value.value()).unwrap();
249 Some(said)
250 }
251 Err(_) => None,
252 })),
253 _ => None,
254 })
255 .flatten();
256
257 Ok(Box::new(out))
258 }
259
260 fn remove(
261 &self,
262 identifier: &IdentifierPrefix,
263 sn: u64,
264 said: &SelfAddressingIdentifier,
265 ) -> Result<(), RedbError> {
266 let write_txn = self.db.begin_write()?;
267 {
268 let mut table = write_txn.open_multimap_table(self.sn_key_table)?;
269 let said = rkyv_adapter::serialize_said(said).unwrap();
270 table.remove((identifier.to_string().as_str(), sn), said.as_slice())?;
271
272 let mut table = write_txn.open_table(self.dts_table)?;
273 table.remove(said.as_slice())?;
274 }
275
276 write_txn.commit()?;
277 Ok(())
278 }
279}
280
281pub(crate) fn get_current_timestamp() -> u64 {
282 SystemTime::now()
283 .duration_since(UNIX_EPOCH)
284 .expect("Time went backwards")
285 .as_secs()
286}