keri_core/database/
sled.rs

1use std::{
2    path::{Path, PathBuf},
3    sync::Arc,
4};
5
6use serde::{Deserialize, Serialize};
7
8#[cfg(feature = "mailbox")]
9use super::mailbox::MailboxData;
10use super::tables::{SledEventTree, SledEventTreeVec};
11
12#[cfg(feature = "query")]
13use crate::query::reply_event::SignedReply;
14use crate::{
15    event::KeyEvent,
16    event_message::{
17        msg::KeriEvent,
18        signed_event_message::{SignedEventMessage, SignedNontransferableReceipt},
19        TimestampedEventMessage,
20    },
21    prefix::IdentifierPrefix,
22};
23
24use super::timestamped::TimestampedSignedEventMessage;
25
26pub struct SledEventDatabase {
27    db: Arc<sled::Db>,
28    // // "iids" tree
29    // this thing is expensive, but everything else is cheeeeeep
30    identifiers: SledEventTree<IdentifierPrefix>,
31    #[cfg(feature = "query")]
32    accepted_rpy: SledEventTreeVec<SignedReply>,
33
34    // "ldes" tree
35    likely_duplicious_events: SledEventTreeVec<TimestampedEventMessage>,
36    // "dels" tree
37    duplicitous_events: SledEventTreeVec<TimestampedSignedEventMessage>,
38
39    #[cfg(feature = "query")]
40    escrowed_replys: SledEventTreeVec<SignedReply>,
41
42    #[cfg(feature = "mailbox")]
43    mailbox: MailboxData,
44}
45
46// TODO: remove all the `.ok()`s
47impl SledEventDatabase {
48    pub fn new(path: impl AsRef<Path>) -> Result<Self, DbError> {
49        let mut events_path = PathBuf::new();
50        events_path.push(path);
51        let mut escrow_path = events_path.clone();
52
53        events_path.push("events");
54        escrow_path.push("escrow");
55
56        let db = Arc::new(sled::open(events_path.as_path())?);
57
58        Ok(Self {
59            identifiers: SledEventTree::new(db.open_tree(b"iids")?),
60            likely_duplicious_events: SledEventTreeVec::new(db.open_tree(b"ldes")?),
61            duplicitous_events: SledEventTreeVec::new(db.open_tree(b"dels")?),
62            #[cfg(feature = "query")]
63            accepted_rpy: SledEventTreeVec::new(db.open_tree(b"knas")?),
64            #[cfg(feature = "mailbox")]
65            mailbox: MailboxData::new(db.clone())?,
66
67            #[cfg(feature = "query")]
68            escrowed_replys: SledEventTreeVec::new(db.open_tree(b"knes")?),
69            db,
70        })
71    }
72
73    pub fn add_likely_duplicious_event(
74        &self,
75        event: KeriEvent<KeyEvent>,
76        id: &IdentifierPrefix,
77    ) -> Result<(), DbError> {
78        self.likely_duplicious_events
79            .push(self.identifiers.designated_key(id)?, event.into())?;
80        self.db.flush()?;
81        Ok(())
82    }
83
84    pub fn get_likely_duplicitous_events(
85        &self,
86        id: &IdentifierPrefix,
87    ) -> Option<impl DoubleEndedIterator<Item = TimestampedEventMessage>> {
88        self.likely_duplicious_events
89            .iter_values(self.identifiers.designated_key(id).ok()?)
90    }
91
92    pub fn add_duplicious_event(
93        &self,
94        event: SignedEventMessage,
95        id: &IdentifierPrefix,
96    ) -> Result<(), DbError> {
97        self.duplicitous_events
98            .push(self.identifiers.designated_key(id)?, event.into())
99    }
100
101    pub fn get_duplicious_events(
102        &self,
103        id: &IdentifierPrefix,
104    ) -> Option<impl DoubleEndedIterator<Item = TimestampedSignedEventMessage>> {
105        self.duplicitous_events
106            .iter_values(self.identifiers.designated_key(id).ok()?)
107    }
108
109    #[cfg(feature = "query")]
110    pub fn update_accepted_reply(
111        &self,
112        rpy: SignedReply,
113        id: &IdentifierPrefix,
114    ) -> Result<(), DbError> {
115        use crate::query::reply_event::ReplyRoute;
116
117        match self
118            .accepted_rpy
119            .iter_values(self.identifiers.designated_key(id)?)
120        {
121            Some(rpys) => {
122                let filtered = rpys
123                    .filter(|s| match (s.reply.get_route(), rpy.reply.get_route()) {
124                        (ReplyRoute::Ksn(id1, _), ReplyRoute::Ksn(id2, _)) => id1 != id2,
125                        _ => true,
126                    })
127                    .chain(Some(rpy.clone()).into_iter())
128                    .collect();
129                self.accepted_rpy
130                    .put(self.identifiers.designated_key(id)?, filtered)
131            }
132            None => self
133                .accepted_rpy
134                .push(self.identifiers.designated_key(id)?, rpy),
135        }?;
136        self.db.flush()?;
137        Ok(())
138    }
139
140    #[cfg(feature = "query")]
141    pub fn get_accepted_replys(
142        &self,
143        id: &IdentifierPrefix,
144    ) -> Option<impl DoubleEndedIterator<Item = SignedReply>> {
145        self.accepted_rpy
146            .iter_values(self.identifiers.designated_key(id).ok()?)
147    }
148
149    #[cfg(feature = "query")]
150    pub fn remove_accepted_reply(
151        &self,
152        id: &IdentifierPrefix,
153        rpy: SignedReply,
154    ) -> Result<(), DbError> {
155        self.accepted_rpy
156            .remove(self.identifiers.designated_key(id)?, &rpy)?;
157        self.db.flush()?;
158        Ok(())
159    }
160
161    #[cfg(feature = "query")]
162    pub fn add_escrowed_reply(
163        &self,
164        rpy: SignedReply,
165        id: &IdentifierPrefix,
166    ) -> Result<(), DbError> {
167        self.escrowed_replys
168            .push(self.identifiers.designated_key(id)?, rpy)?;
169        self.db.flush()?;
170        Ok(())
171    }
172
173    #[cfg(feature = "query")]
174    pub fn get_escrowed_replys(
175        &self,
176        id: &IdentifierPrefix,
177    ) -> Option<impl DoubleEndedIterator<Item = SignedReply>> {
178        self.escrowed_replys
179            .iter_values(self.identifiers.designated_key(id).ok()?)
180    }
181
182    #[cfg(feature = "query")]
183    pub fn remove_escrowed_reply(
184        &self,
185        id: &IdentifierPrefix,
186        rpy: &SignedReply,
187    ) -> Result<(), DbError> {
188        self.escrowed_replys
189            .remove(self.identifiers.designated_key(id)?, rpy)?;
190        self.db.flush()?;
191        Ok(())
192    }
193
194    #[cfg(feature = "query")]
195    pub fn get_all_escrowed_replys(&self) -> Option<impl DoubleEndedIterator<Item = SignedReply>> {
196        self.escrowed_replys.get_all()
197    }
198
199    #[cfg(feature = "mailbox")]
200    pub fn add_mailbox_receipt(
201        &self,
202        receipt: SignedNontransferableReceipt,
203        id: &IdentifierPrefix,
204    ) -> Result<(), DbError> {
205        self.mailbox
206            .add_mailbox_receipt(self.identifiers.designated_key(id)?, receipt)?;
207        self.db.flush()?;
208        Ok(())
209    }
210
211    #[cfg(feature = "mailbox")]
212    pub fn get_mailbox_receipts(
213        &self,
214        id: &IdentifierPrefix,
215    ) -> Option<impl DoubleEndedIterator<Item = SignedNontransferableReceipt>> {
216        self.mailbox
217            .get_mailbox_receipts(self.identifiers.designated_key(id).ok()?)
218    }
219
220    #[cfg(feature = "mailbox")]
221    pub fn add_mailbox_reply(
222        &self,
223        reply: SignedEventMessage,
224        id: &IdentifierPrefix,
225    ) -> Result<(), DbError> {
226        self.mailbox
227            .add_mailbox_reply(self.identifiers.designated_key(id)?, reply)?;
228        self.db.flush()?;
229        Ok(())
230    }
231
232    #[cfg(feature = "mailbox")]
233    pub fn get_mailbox_replies(
234        &self,
235        id: &IdentifierPrefix,
236    ) -> Option<impl DoubleEndedIterator<Item = SignedEventMessage>> {
237        self.mailbox
238            .get_mailbox_replies(self.identifiers.designated_key(id).ok()?)
239    }
240
241    #[cfg(feature = "mailbox")]
242    pub fn add_mailbox_multisig(
243        &self,
244        event: SignedEventMessage,
245        target_id: &IdentifierPrefix,
246    ) -> Result<(), DbError> {
247        self.mailbox
248            .add_mailbox_multisig(self.identifiers.designated_key(target_id)?, event)?;
249        self.db.flush()?;
250        Ok(())
251    }
252
253    #[cfg(feature = "mailbox")]
254    pub fn get_mailbox_multisig(
255        &self,
256        id: &IdentifierPrefix,
257    ) -> Option<impl DoubleEndedIterator<Item = TimestampedSignedEventMessage>> {
258        self.mailbox
259            .get_mailbox_multisig(self.identifiers.designated_key(id).ok()?)
260    }
261
262    #[cfg(feature = "mailbox")]
263    pub fn add_mailbox_delegate(
264        &self,
265        event: SignedEventMessage,
266        target_id: &IdentifierPrefix,
267    ) -> Result<(), DbError> {
268        self.mailbox
269            .add_mailbox_delegate(self.identifiers.designated_key(target_id)?, event)?;
270        self.db.flush()?;
271        Ok(())
272    }
273
274    #[cfg(feature = "mailbox")]
275    pub fn get_mailbox_delegate(
276        &self,
277        id: &IdentifierPrefix,
278    ) -> Option<impl DoubleEndedIterator<Item = TimestampedSignedEventMessage>> {
279        self.mailbox
280            .get_mailbox_delegate(self.identifiers.designated_key(id).ok()?)
281    }
282}
283
284#[derive(Debug, thiserror::Error, Serialize, Deserialize)]
285pub enum DbError {
286    // TODO: more variants
287    #[error("sled error")]
288    Sled,
289    #[error("serde error")]
290    Serde,
291}
292
293impl From<sled::Error> for DbError {
294    fn from(_: sled::Error) -> Self {
295        DbError::Sled
296    }
297}
298
299impl From<serde_cbor::Error> for DbError {
300    fn from(_: serde_cbor::Error) -> Self {
301        DbError::Serde
302    }
303}