keri_core/database/redb/
mod.rs

1pub(crate) mod rkyv_adapter;
2
3/// Kel storage. (identifier, sn) -> event digest
4/// The `KELS` table links an identifier and sequence number to the digest of an event,
5/// referencing the actual event stored in the `EVENTS` table.
6const KELS: TableDefinition<(&str, u64), &[u8]> = TableDefinition::new("kels");
7
8/// Events store. (event digest) -> key event
9/// The `EVENTS` table directly stores the event data, which other tables reference
10/// by its digest.
11const EVENTS: TableDefinition<&[u8], &[u8]> = TableDefinition::new("events");
12
13/// Signatures storage. (identifier, sn) -> signature
14/// The `SIGS` table links an identifier and sequence number to one or more
15/// signatures.
16const SIGS: MultimapTableDefinition<(&str, u64), &[u8]> =
17    MultimapTableDefinition::new("signatures");
18
19/// Nontransferable receipts storage. (identifier, sn) -> signature couplet (one or more)
20const NONTRANS_RCTS: MultimapTableDefinition<(&str, u64), &[u8]> =
21    MultimapTableDefinition::new("nontrans_receipts");
22
23/// Nontransferable receipts storage. (identifier, sn) -> transferable receipt (one or more)
24const TRANS_RCTS: MultimapTableDefinition<(&str, u64), &[u8]> =
25    MultimapTableDefinition::new("trans_receipts");
26
27use std::{path::Path, u64};
28
29use redb::{Database, MultimapTableDefinition, TableDefinition};
30use rkyv::{
31    api::high::HighSerializer, rancor::Failure, ser::allocator::ArenaHandle, util::AlignedVec,
32};
33use rkyv_adapter::deserialize_indexed_signatures;
34use said::{sad::SerializationFormats, SelfAddressingIdentifier};
35
36use crate::{
37    event::{receipt::Receipt, KeyEvent},
38    event_message::{
39        msg::KeriEvent,
40        signature::{Nontransferable, Transferable},
41        signed_event_message::{
42            SignedEventMessage, SignedNontransferableReceipt, SignedTransferableReceipt,
43        },
44    },
45    prefix::{IdentifierPrefix, IndexedSignature},
46};
47use cesrox::primitives::CesrPrimitive;
48
49use self::timestamped::TimestampedSignedEventMessage;
50
51use super::{timestamped, EventDatabase, QueryParameters};
52
53#[derive(Debug, thiserror::Error)]
54pub enum RedbError {
55    #[error("Failed to create database. Reason: {0}")]
56    DatabaseCreationFiled(#[from] redb::DatabaseError),
57    #[error("Failed to save to database. Reason: {0}")]
58    TransactionFiled(#[from] redb::TransactionError),
59    #[error("Failed to save to database. Reason: {0}")]
60    CommitFiled(#[from] redb::CommitError),
61    #[error("Table opening error. Reason: {0}")]
62    TableError(#[from] redb::TableError),
63    #[error("Saving element error. Reason: {0}")]
64    InsertingError(#[from] redb::StorageError),
65    #[error("Retrieving element error. Reason: {0}")]
66    RetrievingError(redb::Error),
67    #[error("Value format error")]
68    WrongValue,
69    #[error("Key format error")]
70    WrongKey(#[from] KeyError),
71    #[error("No event for digest {0} found")]
72    NotFound(SelfAddressingIdentifier),
73    #[error("No digest in provided event")]
74    MissingDigest,
75    #[error("Rkyv error: {0}")]
76    Rkyv(#[from] rkyv::rancor::Error),
77}
78
79#[derive(Debug, thiserror::Error)]
80pub enum KeyError {
81    #[error("Can't parse said in key")]
82    UnparsableSaid,
83    #[error("Can't parse index in key")]
84    UnparsableIndex,
85}
86
87pub struct RedbDatabase {
88    db: Database,
89}
90
91impl RedbDatabase {
92    pub fn new(db_path: &Path) -> Result<Self, RedbError> {
93        let db = Database::create(db_path)?;
94        // Create tables
95        let write_txn = db.begin_write()?;
96        {
97            write_txn.open_table(EVENTS)?;
98            write_txn.open_table(KELS)?;
99            write_txn.open_multimap_table(SIGS)?;
100            write_txn.open_multimap_table(TRANS_RCTS)?;
101            write_txn.open_multimap_table(NONTRANS_RCTS)?;
102        }
103        write_txn.commit()?;
104        Ok(Self { db })
105    }
106}
107
108impl EventDatabase for RedbDatabase {
109    type Error = RedbError;
110    fn add_kel_finalized_event(
111        &self,
112        signed_event: SignedEventMessage,
113        _id: &IdentifierPrefix,
114    ) -> Result<(), RedbError> {
115        let event = &signed_event.event_message;
116        self.insert_key_event(event)?;
117        let id = &event.data.prefix;
118        let sn = event.data.sn;
119
120        self.insert_indexed_signatures(&id, sn, &signed_event.signatures)?;
121        if let Some(wits) = signed_event.witness_receipts {
122            self.insert_nontrans_receipt(&id.to_str(), sn, &wits)?;
123        };
124        self.save_to_kel(event)?;
125        Ok(())
126    }
127
128    fn add_receipt_t(
129        &self,
130        receipt: SignedTransferableReceipt,
131        _id: &IdentifierPrefix,
132    ) -> Result<(), RedbError> {
133        let sn = receipt.body.sn;
134        let id = receipt.body.prefix;
135        let transferable = Transferable::Seal(receipt.validator_seal, receipt.signatures);
136        self.insert_trans_receipt(&id.to_str(), sn, &[transferable])
137    }
138
139    fn add_receipt_nt(
140        &self,
141        receipt: SignedNontransferableReceipt,
142        _id: &IdentifierPrefix,
143    ) -> Result<(), RedbError> {
144        let sn = receipt.body.sn;
145        let id = receipt.body.prefix;
146        let receipts = receipt.signatures;
147        self.insert_nontrans_receipt(&id.to_str(), sn, &receipts)
148    }
149
150    fn get_kel_finalized_events(
151        &self,
152        params: super::QueryParameters,
153    ) -> Option<impl DoubleEndedIterator<Item = super::timestamped::TimestampedSignedEventMessage>>
154    {
155        match params {
156            QueryParameters::BySn { id, sn } => Some(self.get_kel(&id, sn, 1).into_iter()),
157            QueryParameters::Range { id, start, limit } => {
158                Some(self.get_kel(&id, start, limit).into_iter())
159            }
160            QueryParameters::All { id } => self.get_full_kel(id).map(|kel| kel.into_iter()),
161        }
162    }
163
164    fn get_receipts_t(
165        &self,
166        params: super::QueryParameters,
167    ) -> Option<impl DoubleEndedIterator<Item = Transferable>> {
168        match params {
169            QueryParameters::BySn { id, sn } => self.get_trans_receipts(&id.to_str(), sn).ok(),
170            QueryParameters::Range {
171                id: _,
172                start: _,
173                limit: _,
174            } => todo!(),
175            QueryParameters::All { id: _ } => todo!(),
176        }
177    }
178
179    fn get_receipts_nt(
180        &self,
181        params: super::QueryParameters,
182    ) -> Option<impl DoubleEndedIterator<Item = SignedNontransferableReceipt>> {
183        match params {
184            QueryParameters::BySn { id, sn } => self
185                .get_nontrans_receipts_range(&id.to_str(), sn, 1)
186                .ok()
187                .map(|e| e.into_iter()),
188            QueryParameters::Range { id, start, limit } => self
189                .get_nontrans_receipts_range(&id.to_str(), start, limit)
190                .ok()
191                .map(|e| e.into_iter()),
192            QueryParameters::All { id } => self
193                .get_nontrans_receipts_range(&id.to_str(), 0, u64::MAX)
194                .ok()
195                .map(|e| e.into_iter()),
196        }
197    }
198}
199
200impl RedbDatabase {
201    /// Saves provided event into key event table. Key is it's digest and value is event.
202    fn insert_key_event(&self, event: &KeriEvent<KeyEvent>) -> Result<(), RedbError> {
203        let digest = event.digest().map_err(|_e| RedbError::MissingDigest)?;
204        let value = rkyv::to_bytes::<rkyv::rancor::Error>(event)?;
205        let write_txn = self.db.begin_write()?;
206        {
207            let mut table = write_txn.open_table(EVENTS)?;
208            let key = rkyv_adapter::serialize_said(&digest)?;
209            table.insert(key.as_slice(), &value.as_ref())?;
210        }
211        write_txn.commit()?;
212
213        Ok(())
214    }
215
216    /// Saves KEL event of given identifier. Key is identifier and sn of event, and value is event digest.
217    fn save_to_kel(&self, event: &KeriEvent<KeyEvent>) -> Result<(), RedbError> {
218        let digest = event.digest().map_err(|_e| RedbError::MissingDigest)?;
219
220        let write_txn = self.db.begin_write()?;
221        {
222            let mut table = write_txn.open_table(KELS)?;
223            let id = event.data.prefix.to_str();
224            let sn = event.data.sn;
225            let serialized_said = rkyv_adapter::serialize_said(&digest)?;
226            table.insert((id.as_str(), sn), &serialized_said.as_slice())?;
227        }
228        write_txn.commit()?;
229
230        Ok(())
231    }
232
233    fn insert_with_sn_key<
234        V: for<'a> rkyv::Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rkyv::rancor::Error>>,
235    >(
236        &self,
237        table: MultimapTableDefinition<(&str, u64), &[u8]>,
238        id: &str,
239        sn: u64,
240        values: &[V],
241    ) -> Result<(), RedbError> {
242        let write_txn = self.db.begin_write()?;
243        {
244            let mut table = write_txn.open_multimap_table(table)?;
245
246            for value in values {
247                let sig = rkyv::to_bytes(value)?;
248                table.insert((id, sn), sig.as_slice())?;
249            }
250        }
251        write_txn.commit()?;
252
253        Ok(())
254    }
255
256    fn insert_nontrans_receipt(
257        &self,
258        id: &str,
259        sn: u64,
260        nontrans: &[Nontransferable],
261    ) -> Result<(), RedbError> {
262        self.insert_with_sn_key(NONTRANS_RCTS, id, sn, nontrans)
263    }
264
265    fn insert_trans_receipt(
266        &self,
267        id: &str,
268        sn: u64,
269        trans: &[Transferable],
270    ) -> Result<(), RedbError> {
271        self.insert_with_sn_key(TRANS_RCTS, id, sn, trans)
272    }
273
274    fn insert_indexed_signatures(
275        &self,
276        identifier: &IdentifierPrefix,
277        sn: u64,
278        signatures: &[IndexedSignature],
279    ) -> Result<(), RedbError> {
280        self.insert_with_sn_key(SIGS, &identifier.to_str(), sn, signatures)
281    }
282
283    fn get_nontrans_couplets(
284        &self,
285        id: &str,
286        sn: u64,
287    ) -> Result<impl Iterator<Item = Nontransferable>, RedbError> {
288        let from_db_iterator = {
289            let read_txn = self.db.begin_read()?;
290            let table = read_txn.open_multimap_table(NONTRANS_RCTS)?;
291            table.get((id, sn))
292        }?;
293        let nontrans = from_db_iterator
294            .map(|sig| match sig {
295                Ok(sig) => Ok(rkyv_adapter::deserialize_nontransferable(sig.value()).unwrap()),
296                Err(e) => Err(RedbError::from(e)),
297            })
298            .collect::<Result<Vec<_>, _>>();
299        nontrans.map(|el| el.into_iter())
300    }
301
302    fn get_nontrans_receipts_range(
303        &self,
304        id: &str,
305        start: u64,
306        limit: u64,
307    ) -> Result<Vec<SignedNontransferableReceipt>, RedbError> {
308        let from_db_iterator = {
309            let read_txn = self.db.begin_read()?;
310            let table = read_txn.open_multimap_table(NONTRANS_RCTS)?;
311            table.range((id, start)..(id, start + limit))
312        }?;
313        let out: Vec<SignedNontransferableReceipt> = from_db_iterator
314            .map(|sig| match sig {
315                Ok((key, value)) => {
316                    let (identifier, sn) = key.value();
317                    let id = identifier.parse().unwrap();
318                    let digest = self.get_event_digest(&id, sn).unwrap();
319                    let nontrans = value
320                        .map(|value| match value {
321                            Ok(element) => {
322                                rkyv_adapter::deserialize_nontransferable(element.value()).unwrap()
323                            }
324                            Err(_) => todo!(),
325                        })
326                        .collect::<Vec<_>>();
327                    let rct = Receipt::new(SerializationFormats::JSON, digest.unwrap(), id, sn);
328                    SignedNontransferableReceipt {
329                        body: rct,
330                        signatures: nontrans,
331                    }
332                }
333                Err(_) => todo!(),
334            })
335            .collect();
336        Ok(out)
337    }
338
339    fn get_all_nontrans_receipts_couplets(
340        &self,
341        id: &str,
342    ) -> Result<Box<dyn DoubleEndedIterator<Item = Nontransferable>>, RedbError> {
343        let from_db_iterator = {
344            let read_txn = self.db.begin_read()?;
345            let table = read_txn.open_multimap_table(NONTRANS_RCTS)?;
346            table.range((id, 0)..(id, u64::MAX))
347        }?;
348        let out = from_db_iterator
349            .map(|sig| match sig {
350                Ok((_key, value)) => value.map(|value| match value {
351                    Ok(element) => {
352                        rkyv_adapter::deserialize_nontransferable(element.value()).unwrap()
353                    }
354                    Err(_) => todo!(),
355                }),
356                Err(_) => todo!(),
357            })
358            .flatten();
359        Ok(Box::new(out))
360    }
361
362    fn get_trans_receipts(
363        &self,
364        id: &str,
365        sn: u64,
366    ) -> Result<impl DoubleEndedIterator<Item = Transferable>, RedbError> {
367        let from_db_iterator = {
368            let read_txn = self.db.begin_read()?;
369            let table = read_txn.open_multimap_table(TRANS_RCTS)?;
370            table.get((id, sn))
371        }?;
372        Ok(from_db_iterator.map(|sig| match sig {
373            Ok(sig) => rkyv_adapter::deserialize_transferable(sig.value()).unwrap(),
374            Err(_) => todo!(),
375        }))
376    }
377
378    fn get_event_digest(
379        &self,
380        identifier: &IdentifierPrefix,
381        sn: u64,
382    ) -> Result<Option<SelfAddressingIdentifier>, RedbError> {
383        Ok({
384            let read_txn = self.db.begin_read().unwrap();
385            let table = read_txn.open_table(KELS)?;
386            table
387                .get((identifier.to_str().as_str(), sn))?
388                .map(|value| -> Result<SelfAddressingIdentifier, RedbError> {
389                    let digest: SelfAddressingIdentifier =
390                        rkyv_adapter::deserialize_said(value.value())?;
391                    Ok(digest)
392                })
393                .transpose()?
394        })
395    }
396
397    fn get_event_by_digest(
398        &self,
399        said: &SelfAddressingIdentifier,
400    ) -> Result<Option<KeriEvent<KeyEvent>>, RedbError> {
401        let read_txn = self.db.begin_read()?;
402        let table = read_txn.open_table(EVENTS)?;
403
404        let key = rkyv_adapter::serialize_said(&said).unwrap();
405        if let Some(event) = table.get(key.as_slice())? {
406            let bytes = event.value().to_vec();
407            let deserialized: KeriEvent<KeyEvent> = rkyv::from_bytes::<_, Failure>(&bytes).unwrap();
408            Ok(Some(deserialized))
409        } else {
410            Ok(None)
411        }
412    }
413
414    fn get_event_by_serialized_key(
415        &self,
416        said_arch: &[u8],
417    ) -> Result<Option<KeriEvent<KeyEvent>>, RedbError> {
418        let read_txn = self.db.begin_read()?;
419        let table = read_txn.open_table(EVENTS)?;
420
421        if let Some(event) = table.get(said_arch)? {
422            let bytes = event.value().to_vec();
423            let deser: KeriEvent<KeyEvent> = rkyv::from_bytes::<_, Failure>(&bytes).unwrap();
424            Ok(Some(deser))
425        } else {
426            Ok(None)
427        }
428    }
429
430    fn get_signatures(
431        &self,
432        key: (&str, u64),
433    ) -> Result<Option<impl Iterator<Item = IndexedSignature>>, RedbError> {
434        let from_db_iterator = {
435            let read_txn = self.db.begin_read()?;
436            let table: redb::ReadOnlyMultimapTable<(&str, u64), &[u8]> =
437                read_txn.open_multimap_table(SIGS)?;
438            table.get(key)
439        }?;
440        Ok(Some(from_db_iterator.map(|sig| match sig {
441            Ok(sig) => deserialize_indexed_signatures(sig.value()).unwrap(),
442            Err(_) => todo!(),
443        })))
444    }
445
446    fn get_kel<'a>(
447        &'a self,
448        id: &IdentifierPrefix,
449        from: u64,
450        limit: u64,
451    ) -> Vec<timestamped::Timestamped<SignedEventMessage>> {
452        let digests = {
453            let read_txn = self.db.begin_read().unwrap();
454            let table = read_txn.open_table(KELS).unwrap();
455            table
456                .range((id.to_str().as_str(), from)..(id.to_str().as_str(), from + limit))
457                .unwrap()
458        };
459
460        digests
461            .map(|entry| {
462                let (key, value) = entry.unwrap();
463                let signatures = self.get_signatures(key.value()).unwrap().unwrap().collect();
464
465                let event = self
466                    .get_event_by_serialized_key(&value.value())
467                    .unwrap()
468                    .unwrap();
469                TimestampedSignedEventMessage::new(SignedEventMessage::new(
470                    &event, signatures, None, None,
471                ))
472            })
473            .collect()
474    }
475
476    fn get_full_kel<'a>(
477        &'a self,
478        id: &IdentifierPrefix,
479    ) -> Option<Vec<timestamped::Timestamped<SignedEventMessage>>> {
480        let digests = {
481            let read_txn = self.db.begin_read().unwrap();
482            let table = read_txn.open_table(KELS);
483            match table {
484                Ok(table) => table
485                    .range((id.to_str().as_str(), 0)..(id.to_str().as_str(), u64::MAX))
486                    .unwrap(),
487                Err(_e) => return None,
488            }
489        };
490
491        Some(
492            digests
493                .map(|entry| {
494                    let (key, value) = entry.unwrap();
495                    let signatures = self.get_signatures(key.value()).unwrap().unwrap().collect();
496
497                    let event = self
498                        .get_event_by_serialized_key(value.value())
499                        .unwrap()
500                        .unwrap();
501                    TimestampedSignedEventMessage::new(SignedEventMessage::new(
502                        &event, signatures, None, None,
503                    ))
504                })
505                .collect(),
506        )
507    }
508}
509
510#[test]
511fn test_retrieve_kel() {
512    use crate::actor::parse_event_stream;
513    use crate::event_message::signed_event_message::{Message, Notice};
514    use crate::event_message::EventTypeTag;
515    use tempfile::NamedTempFile;
516    // Create test db path.
517    let file_path = NamedTempFile::new().unwrap();
518
519    let db = RedbDatabase::new(file_path.path()).unwrap();
520
521    let icp_raw: &[u8] = br#"{"v":"KERI10JSON0001e7_","t":"icp","d":"EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen","i":"EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen","s":"0","kt":"2","k":["DErocgXD2RGSyvn3MObcx59jeOsEQhv2TqHirVkzrp0Q","DFXLiTjiRdSBPLL6hLa0rskIxk3dh4XwJLfctkJFLRSS","DE9YgIQVgpLwocTVrG8tidKScsQSMWwLWywNC48fhq4f"],"nt":"2","n":["EDJk5EEpC4-tQ7YDwBiKbpaZahh1QCyQOnZRF7p2i8k8","EAXfDjKvUFRj-IEB_o4y-Y_qeJAjYfZtOMD9e7vHNFss","EN8l6yJC2PxribTN0xfri6bLz34Qvj-x3cNwcV3DvT2m"],"bt":"0","b":[],"c":[],"a":[]}-AADAAD4SyJSYlsQG22MGXzRGz2PTMqpkgOyUfq7cS99sC2BCWwdVmEMKiTEeWe5kv-l_d9auxdadQuArLtAGEArW8wEABD0z_vQmFImZXfdR-0lclcpZFfkJJJNXDcUNrf7a-mGsxNLprJo-LROwDkH5m7tVrb-a1jcor2dHD9Jez-r4bQIACBFeU05ywfZycLdR0FxCvAR9BfV9im8tWe1DglezqJLf-vHRQSChY1KafbYNc96hYYpbuN90WzuCRMgV8KgRsEC"#;
522    let rot_raw: &[u8] = br#"{"v":"KERI10JSON00021c_","t":"rot","d":"EHjzZj4i_-RpTN2Yh-NocajFROJ_GkBtlByhRykqiXgz","i":"EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen","s":"1","p":"EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen","kt":"2","k":["DCjxOXniUc5EUzDqERlXdptfKPHy6jNo_ZGsS4Vd8fAE","DNZHARO4dCJlluv0qezEMRmErIWWc-lzOzolBOQ15tHV","DOCQ4KN1jUlKbfjRteDYt9fxgpq1NK9_MqO5IA7shpED"],"nt":"2","n":["EN8l6yJC2PxribTN0xfri6bLz34Qvj-x3cNwcV3DvT2m","EATiZAHl0kzKID6faaQP2O7zB3Hj7eH3bE-vgKVAtsyU","EG6e7dJhh78ZqeIZ-eMbe-OB3TwFMPmrSsh9k75XIjLP"],"bt":"0","br":[],"ba":[],"a":[]}-AADAAAqV6xpsAAEB_FJP5UdYO5qiJphz8cqXbTjB9SRy8V0wIim-lgafF4o-b7TW0spZtzx2RXUfZLQQCIKZsw99k8AABBP8nfF3t6bf4z7eNoBgUJR-hdhw7wnlljMZkeY5j2KFRI_s8wqtcOFx1A913xarGJlO6UfrqFWo53e9zcD8egIACB8DKLMZcCGICuk98RCEVuS0GsqVngi1d-7gAX0jid42qUcR3aiYDMp2wJhqJn-iHJVvtB-LK7TRTggBtMDjuwB"#;
523    let ixn_raw: &[u8] = br#"{"v":"KERI10JSON0000cb_","t":"ixn","d":"EL6Dpm72KXayaUHYvVHlhPplg69fBvRt1P3YzuOGVpmz","i":"EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen","s":"2","p":"EHjzZj4i_-RpTN2Yh-NocajFROJ_GkBtlByhRykqiXgz","a":[]}-AADAABgep0kbpgl91vvcXziJ7tHY1WVTAcUJyYCBNqTcNuK9AfzLHfKHhJeSC67wFRU845qjLSAC-XwWaqWgyAgw_8MABD5wTnqqJcnLWMA7NZ1vLOTzDspInJrly7O4Kt6Jwzue9z2TXkDXi1jr69JeKbzUQ6c2Ka1qPXAst0JzrOiyuAPACAcLHnOz1Owtgq8mcR_-PpAr91zOTK_Zj9r0V-9P47vzGsYwAxcVshclfhCMhu73aZuZbvQhy9Rxcj-qRz96cIL"#;
524    let second_icp_raw = br#"{"v":"KERI10JSON000159_","t":"icp","d":"EFb-WY7Ie1WPEgsioZz1CyzwnuCg-C9k2QCNpcUfM5Jf","i":"EFb-WY7Ie1WPEgsioZz1CyzwnuCg-C9k2QCNpcUfM5Jf","s":"0","kt":"1","k":["DIwDbi2Sr1kLZFpsX0Od6Y8ariGVLLjZXxBC5bXEI85e"],"nt":"1","n":["ELhmgZ5JFc-ACs9TJxHMxtcKzQxKXLhlAmUT_sKf1-l7"],"bt":"0","b":["DM73ulUG2_DJyA27DfxBXT5SJ5U3A3c2oeG8Z4bUOgyL"],"c":[],"a":[]}-AABAAAPGpCUdR6EfVWROUjpuTsxg5BIcMnfi7PDciv8VuY9NqZ0ioRoaHxMZue_5ALys86sX4aQzKqm_bID3ZBwlMUP"#;
525
526    let first_id: IdentifierPrefix = "EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen"
527        .parse()
528        .unwrap();
529    let second_id: IdentifierPrefix = "EFb-WY7Ie1WPEgsioZz1CyzwnuCg-C9k2QCNpcUfM5Jf"
530        .parse()
531        .unwrap();
532
533    for event in [icp_raw, rot_raw, ixn_raw, second_icp_raw] {
534        let evs = parse_event_stream(event).unwrap();
535        let ev = evs.first().unwrap();
536        match ev {
537            Message::Notice(Notice::Event(event)) => {
538                db.add_kel_finalized_event(event.clone(), &event.event_message.data.get_prefix())
539                    .unwrap();
540            }
541            _ => unreachable!(),
542        }
543    }
544
545    // Find event by digest
546    let ev_digest: SelfAddressingIdentifier = "EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen"
547        .parse()
548        .unwrap();
549    let events = db.get_event_by_digest(&ev_digest).unwrap().unwrap();
550    let expected_event = &icp_raw[..487]; // icp event without signatures
551    assert_eq!(events.encode().unwrap(), expected_event);
552
553    let sigs_from_db = db.get_signatures((&first_id.to_str(), 0)).unwrap().unwrap();
554    assert_eq!(sigs_from_db.count(), 3);
555
556    // Warning: order of retrieved signatures isn't the same as insertion order
557    let sigs_from_db = db
558        .get_signatures((&second_id.to_str(), 0))
559        .unwrap()
560        .unwrap();
561    assert_eq!(sigs_from_db.count(), 1);
562
563    // Retrieve KEL in range
564    let mut part_of_kel_events = db.get_kel(&first_id, 1, 2).into_iter();
565
566    let rot = part_of_kel_events.next().unwrap();
567    assert_eq!(
568        rot.signed_event_message.event_message.event_type,
569        EventTypeTag::Rot
570    );
571    assert_eq!(
572        rot.signed_event_message.event_message.digest,
573        Some(
574            "EHjzZj4i_-RpTN2Yh-NocajFROJ_GkBtlByhRykqiXgz"
575                .parse::<SelfAddressingIdentifier>()
576                .unwrap()
577                .into()
578        )
579    );
580    assert_eq!(rot.signed_event_message.signatures.len(), 3);
581
582    let ixn = part_of_kel_events.next().unwrap();
583    assert_eq!(
584        ixn.signed_event_message.event_message.event_type,
585        EventTypeTag::Ixn
586    );
587    assert_eq!(
588        ixn.signed_event_message.event_message.digest,
589        Some(
590            "EL6Dpm72KXayaUHYvVHlhPplg69fBvRt1P3YzuOGVpmz"
591                .parse::<SelfAddressingIdentifier>()
592                .unwrap()
593                .into()
594        )
595    );
596    assert_eq!(ixn.signed_event_message.signatures.len(), 3);
597
598    assert_eq!(part_of_kel_events.next(), None);
599
600    // Retrieve KEL in range
601    let mut part_of_kel_events = db.get_kel(&first_id, 0, 2).into_iter();
602    let icp = part_of_kel_events.next().unwrap();
603    assert_eq!(
604        icp.signed_event_message.event_message.event_type,
605        EventTypeTag::Icp
606    );
607    assert_eq!(
608        icp.signed_event_message.event_message.digest,
609        Some(
610            "EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen"
611                .parse::<SelfAddressingIdentifier>()
612                .unwrap()
613                .into()
614        )
615    );
616    assert_eq!(icp.signed_event_message.signatures.len(), 3);
617
618    let rot = part_of_kel_events.next().unwrap();
619    assert_eq!(
620        rot.signed_event_message.event_message.event_type,
621        EventTypeTag::Rot
622    );
623    assert_eq!(
624        rot.signed_event_message.event_message.digest,
625        Some(
626            "EHjzZj4i_-RpTN2Yh-NocajFROJ_GkBtlByhRykqiXgz"
627                .parse::<SelfAddressingIdentifier>()
628                .unwrap()
629                .into()
630        )
631    );
632    assert_eq!(rot.signed_event_message.signatures.len(), 3);
633
634    assert_eq!(part_of_kel_events.next(), None);
635}
636
637#[test]
638fn test_retrieve_receipts() {
639    use crate::actor::parse_event_stream;
640    use crate::event_message::signed_event_message::{Message, Notice};
641    use tempfile::NamedTempFile;
642    // Create test db path.
643    let file_path = NamedTempFile::new().unwrap();
644
645    let db = RedbDatabase::new(file_path.path()).unwrap();
646
647    let receipt0_0 = br#"{"v":"KERI10JSON000091_","t":"rct","d":"EJufgwH347N2kobmes1IQw_1pfMipEFFy0RwinZTtah9","i":"EJufgwH347N2kobmes1IQw_1pfMipEFFy0RwinZTtah9","s":"0"}-CABBN_PYSns7oFNixSohVW4raBwMV6iYeh0PEZ_bR-38Xev0BDbyebqZQKwn7TqU92Vtw8n2wy5FptP42F1HEmCc9nQLzbXrXuA9SMl9nCZ-vi2bdaeT3aqInXGFAW70QPzM4kJ"#;
648    let receipt0_1 = br#"{"v":"KERI10JSON000091_","t":"rct","d":"EJufgwH347N2kobmes1IQw_1pfMipEFFy0RwinZTtah9","i":"EJufgwH347N2kobmes1IQw_1pfMipEFFy0RwinZTtah9","s":"0"}-CABBHndk6cXPCnghFqKt_0SikY1P9z_nIUrHq_SeHgLQCui0BBqAOBXFKVivgf0jh2ySWX1VshnkUYK3ev_L--sPB_onF7w2WhiK2AB7mf4IIuaSQCLumsr2sV77S6U5VMx0CAD"#;
649
650    let receipt1_0 = br#"{"v":"KERI10JSON000091_","t":"rct","d":"EJufgwH347N2kobmes1IQw_1pfMipEFFy0RwinZTtah9","i":"EJufgwH347N2kobmes1IQw_1pfMipEFFy0RwinZTtah9","s":"1"}-CABBHndk6cXPCnghFqKt_0SikY1P9z_nIUrHq_SeHgLQCui0BBqAOBXFKVivgf0jh2ySWX1VshnkUYK3ev_L--sPB_onF7w2WhiK2AB7mf4IIuaSQCLumsr2sV77S6U5VMx0CAD"#;
651    let receipt1_1 = br#"{"v":"KERI10JSON000091_","t":"rct","d":"EJufgwH347N2kobmes1IQw_1pfMipEFFy0RwinZTtah9","i":"EJufgwH347N2kobmes1IQw_1pfMipEFFy0RwinZTtah9","s":"1"}-CABBN_PYSns7oFNixSohVW4raBwMV6iYeh0PEZ_bR-38Xev0BDbyebqZQKwn7TqU92Vtw8n2wy5FptP42F1HEmCc9nQLzbXrXuA9SMl9nCZ-vi2bdaeT3aqInXGFAW70QPzM4kJ"#;
652
653    let first_id: IdentifierPrefix = "EJufgwH347N2kobmes1IQw_1pfMipEFFy0RwinZTtah9"
654        .parse()
655        .unwrap();
656
657    for event in [receipt0_0, receipt0_1, receipt1_0, receipt1_1] {
658        let evs = parse_event_stream(event).unwrap();
659        let ev = evs.first().unwrap();
660        match ev {
661            Message::Notice(Notice::NontransferableRct(rct)) => {
662                db.add_receipt_nt(rct.clone(), &first_id).unwrap();
663            }
664            _ => unreachable!(),
665        }
666    }
667
668    let retrived_rcts = db.get_nontrans_couplets(&first_id.to_str(), 0).unwrap();
669    assert_eq!(retrived_rcts.count(), 2);
670
671    let all_retrived_rcts = db
672        .get_all_nontrans_receipts_couplets(&first_id.to_str())
673        .unwrap();
674    assert_eq!(all_retrived_rcts.count(), 4);
675}