keri_core/database/redb/
loging.rs

1/// Events store. (event digest) -> key event
2/// The `EVENTS` table directly stores the event data, which other tables reference
3/// by its digest.
4const EVENTS: TableDefinition<&[u8], &[u8]> = TableDefinition::new("events");
5
6/// Signatures storage. (event digest) -> signature
7/// The `SIGS` table links event digest to one or more
8/// signatures.
9const SIGS: MultimapTableDefinition<&[u8], &[u8]> = MultimapTableDefinition::new("signatures");
10
11/// Nontransferable receipts storage. (event digest) -> signature couplet (one or more)
12const NONTRANS_RCTS: MultimapTableDefinition<&[u8], &[u8]> =
13    MultimapTableDefinition::new("nontrans_receipts");
14
15/// Nontransferable receipts storage. (event digest) -> transferable receipt (one or more)
16const TRANS_RCTS: MultimapTableDefinition<&[u8], &[u8]> =
17    MultimapTableDefinition::new("trans_receipts");
18
19/// Delegating Event Seals (event digest) -> seal
20const SEALS: TableDefinition<&[u8], &[u8]> = TableDefinition::new("seals");
21
22use std::sync::Arc;
23
24use redb::{Database, MultimapTableDefinition, TableDefinition};
25use rkyv::{
26    api::high::HighSerializer,
27    rancor::{self, Failure},
28    ser::allocator::ArenaHandle,
29    util::AlignedVec,
30};
31use said::SelfAddressingIdentifier;
32
33use crate::{
34    database::timestamped::TimestampedSignedEventMessage,
35    event::{sections::seal::SourceSeal, KeyEvent},
36    event_message::{
37        msg::KeriEvent,
38        signature::{Nontransferable, Transferable},
39        signed_event_message::{SignedEventMessage, SignedNontransferableReceipt},
40    },
41    prefix::IndexedSignature,
42};
43
44use crate::database::LogDatabase as LogDatabaseTrait;
45
46use super::{
47    execute_in_transaction,
48    rkyv_adapter::{self, deserialize_indexed_signatures, deserialize_source_seal},
49    RedbError, WriteTxnMode,
50};
51
52/// Stores all incoming signed events and enables retrieval by event digest.  
53/// Events are split into separate tables for events, signatures, and receipts,  
54/// with the digest serving as the key in each table.
55pub struct LogDatabase {
56    db: Arc<Database>,
57}
58
59impl<'db> LogDatabaseTrait<'db> for LogDatabase {
60    type DatabaseType = Database;
61    type Error = RedbError;
62    type TransactionType = WriteTxnMode<'db>;
63
64    fn new(db: Arc<Database>) -> Result<Self, RedbError> {
65        // Create tables
66        let write_txn = db.begin_write()?;
67        {
68            write_txn.open_table(EVENTS)?;
69            write_txn.open_multimap_table(SIGS)?;
70            write_txn.open_multimap_table(TRANS_RCTS)?;
71            write_txn.open_multimap_table(NONTRANS_RCTS)?;
72            write_txn.open_table(SEALS)?;
73        }
74        write_txn.commit()?;
75        Ok(Self { db })
76    }
77
78    fn log_event(
79        &self,
80        txn_mode: &WriteTxnMode,
81        signed_event: &SignedEventMessage,
82    ) -> Result<(), RedbError> {
83        self.insert_key_event(&txn_mode, &signed_event.event_message)?;
84        let digest = signed_event
85            .event_message
86            .digest()
87            .map_err(|_e| RedbError::MissingDigest)?;
88
89        self.insert_indexed_signatures(&txn_mode, &digest, &signed_event.signatures)?;
90        if let Some(wits) = &signed_event.witness_receipts {
91            self.insert_nontrans_receipt(&txn_mode, &digest, &wits)?;
92        };
93
94        if let Some(delegator_seal) = &signed_event.delegator_seal {
95            self.insert_source_seal(&txn_mode, &digest, delegator_seal)?;
96        }
97        Ok(())
98    }
99
100    fn log_event_with_new_transaction(
101        &self,
102        signed_event: &SignedEventMessage,
103    ) -> Result<(), RedbError> {
104        self.log_event(&WriteTxnMode::CreateNew, signed_event)
105    }
106
107    fn log_receipt(
108        &self,
109        txn_mode: &WriteTxnMode,
110        signed_receipt: &SignedNontransferableReceipt,
111    ) -> Result<(), RedbError> {
112        let digest = &signed_receipt.body.receipted_event_digest;
113
114        self.insert_nontrans_receipt(&txn_mode, digest, &signed_receipt.signatures)?;
115        Ok(())
116    }
117
118    fn log_receipt_with_new_transaction(
119        &self,
120        signed_receipt: &SignedNontransferableReceipt,
121    ) -> Result<(), RedbError> {
122        self.log_receipt(&WriteTxnMode::CreateNew, signed_receipt)
123    }
124
125    fn get_signed_event(
126        &self,
127        said: &SelfAddressingIdentifier,
128    ) -> Result<Option<TimestampedSignedEventMessage>, RedbError> {
129        let key = rkyv_adapter::serialize_said(said)?;
130        self.get_signed_event_by_serialized_key(key.as_slice())
131    }
132
133    fn get_event(
134        &self,
135        said: &SelfAddressingIdentifier,
136    ) -> Result<Option<KeriEvent<KeyEvent>>, RedbError> {
137        let key = rkyv_adapter::serialize_said(&said).unwrap();
138        self.get_event_by_serialized_key(&key.as_slice())
139    }
140
141    fn get_signatures(
142        &self,
143        said: &SelfAddressingIdentifier,
144    ) -> Result<Option<impl Iterator<Item = IndexedSignature>>, RedbError> {
145        let key = rkyv_adapter::serialize_said(&said).unwrap();
146        self.get_signatures_by_serialized_key(&key.as_slice())
147    }
148
149    fn get_nontrans_couplets(
150        &self,
151        said: &SelfAddressingIdentifier,
152    ) -> Result<Option<impl Iterator<Item = Nontransferable>>, RedbError> {
153        let said = rkyv_adapter::serialize_said(said)?;
154        self.get_nontrans_couplets_by_key(&said)
155    }
156
157    fn get_trans_receipts(
158        &self,
159        said: &SelfAddressingIdentifier,
160    ) -> Result<impl DoubleEndedIterator<Item = Transferable>, RedbError> {
161        let key = rkyv_adapter::serialize_said(said)?;
162        self.get_trans_receipts_by_serialized_key(key.as_slice())
163    }
164
165    fn remove_nontrans_receipt(
166        &self,
167        txn_mode: &WriteTxnMode,
168        said: &SelfAddressingIdentifier,
169        nontrans: impl IntoIterator<Item = Nontransferable>,
170    ) -> Result<(), RedbError> {
171        let serialized_said = rkyv_adapter::serialize_said(said)?;
172        execute_in_transaction(self.db.clone(), txn_mode, |write_txn| {
173            let mut table = write_txn.open_multimap_table(NONTRANS_RCTS)?;
174
175            for value in nontrans {
176                let value = rkyv::to_bytes::<rancor::Error>(&value)?;
177                table.remove(serialized_said.as_slice(), value.as_slice())?;
178            }
179            Ok(())
180        })
181    }
182
183    fn remove_nontrans_receipt_with_new_transaction(
184        &self,
185        said: &SelfAddressingIdentifier,
186        nontrans: impl IntoIterator<Item = Nontransferable>,
187    ) -> Result<(), RedbError> {
188        self.remove_nontrans_receipt(&WriteTxnMode::CreateNew, said, nontrans)
189    }
190}
191
192impl LogDatabase {
193    pub(super) fn get_signed_event_by_serialized_key(
194        &self,
195        key: &[u8],
196    ) -> Result<Option<TimestampedSignedEventMessage>, RedbError> {
197        let signatures = self
198            .get_signatures_by_serialized_key(&key)
199            .unwrap()
200            .unwrap()
201            .collect();
202        let source_seal = self.get_delegator_seal_by_serialized_key(key)?;
203
204        let event = self.get_event_by_serialized_key(&key)?;
205        Ok(event.map(|ev| {
206            let receipts = self
207                .get_nontrans_couplets_by_key(key)
208                .unwrap()
209                .map(|vec| vec.collect());
210            TimestampedSignedEventMessage::new(SignedEventMessage::new(
211                &ev,
212                signatures,
213                receipts,
214                source_seal,
215            ))
216        }))
217    }
218
219    /// Saves provided event into key event table. Key is it's digest and value is event.
220    fn insert_key_event(
221        &self,
222        txn_mode: &WriteTxnMode,
223        event: &KeriEvent<KeyEvent>,
224    ) -> Result<(), RedbError> {
225        let digest = event.digest().map_err(|_e| RedbError::MissingDigest)?;
226        let value = rkyv::to_bytes::<rkyv::rancor::Error>(event)?;
227
228        execute_in_transaction(self.db.clone(), txn_mode, |write_txn| {
229            let mut table = write_txn.open_table(EVENTS)?;
230            let key = rkyv_adapter::serialize_said(&digest)?;
231            table.insert(key.as_slice(), &value.as_ref())?;
232            Ok(())
233        })
234    }
235
236    fn insert_with_digest_key<
237        V: for<'a> rkyv::Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rkyv::rancor::Error>>,
238    >(
239        &self,
240        txn_mode: &WriteTxnMode,
241        table: MultimapTableDefinition<&[u8], &[u8]>,
242        said: &SelfAddressingIdentifier,
243        values: &[V],
244    ) -> Result<(), RedbError> {
245        let serialized_said = rkyv_adapter::serialize_said(said)?;
246        execute_in_transaction(self.db.clone(), txn_mode, |write_txn| {
247            let mut table = write_txn.open_multimap_table(table)?;
248
249            for value in values {
250                let sig = rkyv::to_bytes(value)?;
251                table.insert(serialized_said.as_slice(), sig.as_slice())?;
252            }
253            Ok(())
254        })
255    }
256
257    pub(super) fn insert_nontrans_receipt(
258        &self,
259        txn_mode: &WriteTxnMode,
260        said: &SelfAddressingIdentifier,
261        nontrans: &[Nontransferable],
262    ) -> Result<(), RedbError> {
263        self.insert_with_digest_key(txn_mode, NONTRANS_RCTS, said, nontrans)
264    }
265
266    pub(super) fn insert_source_seal(
267        &self,
268        txn_mode: &WriteTxnMode,
269        said: &SelfAddressingIdentifier,
270        seal: &SourceSeal,
271    ) -> Result<(), RedbError> {
272        let serialized_said = rkyv_adapter::serialize_said(said)?;
273        execute_in_transaction(self.db.clone(), txn_mode, |write_txn| {
274            let mut table = write_txn.open_table(SEALS)?;
275
276            let seal = rkyv::to_bytes::<rkyv::rancor::Error>(seal)?;
277            table.insert(serialized_said.as_slice(), seal.as_ref())?;
278            Ok(())
279        })
280    }
281
282    pub(super) fn insert_trans_receipt(
283        &self,
284        said: &SelfAddressingIdentifier,
285        trans: &[Transferable],
286    ) -> Result<(), RedbError> {
287        self.insert_with_digest_key(&WriteTxnMode::CreateNew, TRANS_RCTS, &said, trans)
288    }
289
290    pub(super) fn insert_indexed_signatures(
291        &self,
292        txn_mode: &WriteTxnMode,
293        said: &SelfAddressingIdentifier,
294        signatures: &[IndexedSignature],
295    ) -> Result<(), RedbError> {
296        self.insert_with_digest_key(txn_mode, SIGS, said, signatures)
297    }
298
299    pub(super) fn get_nontrans_couplets_by_key(
300        &self,
301        key: &[u8],
302    ) -> Result<Option<impl Iterator<Item = Nontransferable>>, RedbError> {
303        let from_db_iterator = {
304            let read_txn = self.db.begin_read()?;
305            let table = read_txn.open_multimap_table(NONTRANS_RCTS)?;
306            table.get(key)
307        }?;
308        let nontrans = from_db_iterator
309            .map(|sig| match sig {
310                Ok(sig) => Ok(rkyv_adapter::deserialize_nontransferable(sig.value()).unwrap()),
311                Err(e) => Err(RedbError::from(e)),
312            })
313            .collect::<Result<Vec<_>, _>>();
314        nontrans.map(|el| {
315            if el.is_empty() {
316                None
317            } else {
318                Some(el.into_iter())
319            }
320        })
321    }
322
323    fn get_trans_receipts_by_serialized_key(
324        &self,
325        key: &[u8],
326    ) -> Result<impl DoubleEndedIterator<Item = Transferable>, RedbError> {
327        let from_db_iterator = {
328            let read_txn = self.db.begin_read()?;
329            let table = read_txn.open_multimap_table(TRANS_RCTS)?;
330            table.get(key)
331        }?;
332        Ok(from_db_iterator.map(|sig| match sig {
333            Ok(sig) => rkyv_adapter::deserialize_transferable(sig.value()).unwrap(),
334            Err(_) => todo!(),
335        }))
336    }
337
338    fn get_event_by_serialized_key(
339        &self,
340        said_arch: &[u8],
341    ) -> Result<Option<KeriEvent<KeyEvent>>, RedbError> {
342        let read_txn = self.db.begin_read()?;
343        let table = read_txn.open_table(EVENTS)?;
344
345        if let Some(event) = table.get(said_arch)? {
346            let bytes = event.value().to_vec();
347            let deser: KeriEvent<KeyEvent> = rkyv::from_bytes::<_, Failure>(&bytes).unwrap();
348            Ok(Some(deser))
349        } else {
350            Ok(None)
351        }
352    }
353
354    fn get_signatures_by_serialized_key(
355        &self,
356        key: &[u8],
357    ) -> Result<Option<impl Iterator<Item = IndexedSignature>>, RedbError> {
358        let from_db_iterator = {
359            let read_txn = self.db.begin_read()?;
360            let table: redb::ReadOnlyMultimapTable<&[u8], &[u8]> =
361                read_txn.open_multimap_table(SIGS)?;
362            table.get(key)
363        }?;
364        Ok(Some(from_db_iterator.map(|sig| match sig {
365            Ok(sig) => deserialize_indexed_signatures(sig.value()).unwrap(),
366            Err(_) => todo!(),
367        })))
368    }
369
370    fn get_delegator_seal_by_serialized_key(
371        &self,
372        key: &[u8],
373    ) -> Result<Option<SourceSeal>, RedbError> {
374        let maybe_seal = {
375            let read_txn = self.db.begin_read()?;
376            let table = read_txn.open_table(SEALS)?;
377            table.get(key)
378        }?;
379        Ok(maybe_seal.map(|seal| deserialize_source_seal(seal.value()).unwrap()))
380    }
381}
382
/// Logs four signed events and checks that an event and its signatures can be
/// read back by event digest.
#[test]
fn test_retrieve_by_digest() {
    use crate::actor::parse_event_stream;
    use crate::database::LogDatabase as _;
    use crate::event_message::signed_event_message::{Message, Notice};
    use tempfile::NamedTempFile;

    // Back the database with a temporary file.
    let db_file = NamedTempFile::new().unwrap();
    let db = Arc::new(Database::create(db_file.path()).unwrap());
    let log = LogDatabase::new(Arc::clone(&db)).unwrap();

    let icp_raw: &[u8] = br#"{"v":"KERI10JSON0001e7_","t":"icp","d":"EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen","i":"EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen","s":"0","kt":"2","k":["DErocgXD2RGSyvn3MObcx59jeOsEQhv2TqHirVkzrp0Q","DFXLiTjiRdSBPLL6hLa0rskIxk3dh4XwJLfctkJFLRSS","DE9YgIQVgpLwocTVrG8tidKScsQSMWwLWywNC48fhq4f"],"nt":"2","n":["EDJk5EEpC4-tQ7YDwBiKbpaZahh1QCyQOnZRF7p2i8k8","EAXfDjKvUFRj-IEB_o4y-Y_qeJAjYfZtOMD9e7vHNFss","EN8l6yJC2PxribTN0xfri6bLz34Qvj-x3cNwcV3DvT2m"],"bt":"0","b":[],"c":[],"a":[]}-AADAAD4SyJSYlsQG22MGXzRGz2PTMqpkgOyUfq7cS99sC2BCWwdVmEMKiTEeWe5kv-l_d9auxdadQuArLtAGEArW8wEABD0z_vQmFImZXfdR-0lclcpZFfkJJJNXDcUNrf7a-mGsxNLprJo-LROwDkH5m7tVrb-a1jcor2dHD9Jez-r4bQIACBFeU05ywfZycLdR0FxCvAR9BfV9im8tWe1DglezqJLf-vHRQSChY1KafbYNc96hYYpbuN90WzuCRMgV8KgRsEC"#;
    let rot_raw: &[u8] = br#"{"v":"KERI10JSON00021c_","t":"rot","d":"EHjzZj4i_-RpTN2Yh-NocajFROJ_GkBtlByhRykqiXgz","i":"EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen","s":"1","p":"EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen","kt":"2","k":["DCjxOXniUc5EUzDqERlXdptfKPHy6jNo_ZGsS4Vd8fAE","DNZHARO4dCJlluv0qezEMRmErIWWc-lzOzolBOQ15tHV","DOCQ4KN1jUlKbfjRteDYt9fxgpq1NK9_MqO5IA7shpED"],"nt":"2","n":["EN8l6yJC2PxribTN0xfri6bLz34Qvj-x3cNwcV3DvT2m","EATiZAHl0kzKID6faaQP2O7zB3Hj7eH3bE-vgKVAtsyU","EG6e7dJhh78ZqeIZ-eMbe-OB3TwFMPmrSsh9k75XIjLP"],"bt":"0","br":[],"ba":[],"a":[]}-AADAAAqV6xpsAAEB_FJP5UdYO5qiJphz8cqXbTjB9SRy8V0wIim-lgafF4o-b7TW0spZtzx2RXUfZLQQCIKZsw99k8AABBP8nfF3t6bf4z7eNoBgUJR-hdhw7wnlljMZkeY5j2KFRI_s8wqtcOFx1A913xarGJlO6UfrqFWo53e9zcD8egIACB8DKLMZcCGICuk98RCEVuS0GsqVngi1d-7gAX0jid42qUcR3aiYDMp2wJhqJn-iHJVvtB-LK7TRTggBtMDjuwB"#;
    let ixn_raw: &[u8] = br#"{"v":"KERI10JSON0000cb_","t":"ixn","d":"EL6Dpm72KXayaUHYvVHlhPplg69fBvRt1P3YzuOGVpmz","i":"EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen","s":"2","p":"EHjzZj4i_-RpTN2Yh-NocajFROJ_GkBtlByhRykqiXgz","a":[]}-AADAABgep0kbpgl91vvcXziJ7tHY1WVTAcUJyYCBNqTcNuK9AfzLHfKHhJeSC67wFRU845qjLSAC-XwWaqWgyAgw_8MABD5wTnqqJcnLWMA7NZ1vLOTzDspInJrly7O4Kt6Jwzue9z2TXkDXi1jr69JeKbzUQ6c2Ka1qPXAst0JzrOiyuAPACAcLHnOz1Owtgq8mcR_-PpAr91zOTK_Zj9r0V-9P47vzGsYwAxcVshclfhCMhu73aZuZbvQhy9Rxcj-qRz96cIL"#;
    let second_icp_raw = br#"{"v":"KERI10JSON000159_","t":"icp","d":"EFb-WY7Ie1WPEgsioZz1CyzwnuCg-C9k2QCNpcUfM5Jf","i":"EFb-WY7Ie1WPEgsioZz1CyzwnuCg-C9k2QCNpcUfM5Jf","s":"0","kt":"1","k":["DIwDbi2Sr1kLZFpsX0Od6Y8ariGVLLjZXxBC5bXEI85e"],"nt":"1","n":["ELhmgZ5JFc-ACs9TJxHMxtcKzQxKXLhlAmUT_sKf1-l7"],"bt":"0","b":["DM73ulUG2_DJyA27DfxBXT5SJ5U3A3c2oeG8Z4bUOgyL"],"c":[],"a":[]}-AABAAAPGpCUdR6EfVWROUjpuTsxg5BIcMnfi7PDciv8VuY9NqZ0ioRoaHxMZue_5ALys86sX4aQzKqm_bID3ZBwlMUP"#;

    // Log every fixture; each one must parse to a signed key event.
    for raw in [icp_raw, rot_raw, ixn_raw, second_icp_raw] {
        let parsed = parse_event_stream(raw).unwrap();
        match parsed.first().unwrap() {
            Message::Notice(Notice::Event(signed_event)) => {
                log.log_event(&WriteTxnMode::CreateNew, signed_event)
                    .unwrap();
            }
            _ => unreachable!(),
        }
    }

    // The first inception event is found by its digest, together with its three signatures.
    let first_icp_said: SelfAddressingIdentifier = "EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen"
        .parse()
        .unwrap();

    let stored_event = log.get_event(&first_icp_said).unwrap().unwrap();
    let expected_event = &icp_raw[..487]; // icp event without signatures
    assert_eq!(stored_event.encode().unwrap(), expected_event);

    let stored_sigs = log.get_signatures(&first_icp_said).unwrap().unwrap();
    assert_eq!(stored_sigs.count(), 3);

    // The second inception event carries a single signature.
    let second_icp_said: SelfAddressingIdentifier = "EFb-WY7Ie1WPEgsioZz1CyzwnuCg-C9k2QCNpcUfM5Jf"
        .parse()
        .unwrap();
    // Warning: order of retrieved signatures isn't the same as insertion order
    let stored_sigs = log.get_signatures(&second_icp_said).unwrap().unwrap();
    assert_eq!(stored_sigs.count(), 1);
}
432
/// Stores four nontransferable receipts (two per receipted event) and checks
/// that both receipts of each event can be read back by event digest.
#[test]
fn test_retrieve_receipts() {
    use crate::actor::parse_event_stream;
    use crate::database::EventDatabase;
    use crate::event_message::signed_event_message::{Message, Notice};
    use crate::prefix::IdentifierPrefix;
    use tempfile::NamedTempFile;

    // Back the database with a temporary file.
    let db_file = NamedTempFile::new().unwrap();
    let db = super::RedbDatabase::new(db_file.path()).unwrap();

    let receipt0_0 = br#"{"v":"KERI10JSON000091_","t":"rct","d":"EJufgwH347N2kobmes1IQw_1pfMipEFFy0RwinZTtah9","i":"EJufgwH347N2kobmes1IQw_1pfMipEFFy0RwinZTtah9","s":"0"}-CABBN_PYSns7oFNixSohVW4raBwMV6iYeh0PEZ_bR-38Xev0BDbyebqZQKwn7TqU92Vtw8n2wy5FptP42F1HEmCc9nQLzbXrXuA9SMl9nCZ-vi2bdaeT3aqInXGFAW70QPzM4kJ"#;
    let receipt0_1 = br#"{"v":"KERI10JSON000091_","t":"rct","d":"EJufgwH347N2kobmes1IQw_1pfMipEFFy0RwinZTtah9","i":"EJufgwH347N2kobmes1IQw_1pfMipEFFy0RwinZTtah9","s":"0"}-CABBHndk6cXPCnghFqKt_0SikY1P9z_nIUrHq_SeHgLQCui0BBqAOBXFKVivgf0jh2ySWX1VshnkUYK3ev_L--sPB_onF7w2WhiK2AB7mf4IIuaSQCLumsr2sV77S6U5VMx0CAD"#;

    let receipt1_0 = br#"{"v":"KERI10JSON000091_","t":"rct","d":"EBgRuemKRwpDnemmrA9bbWyp0Ar4BHVv4ZjIv8mBGJxj","i":"EPNYUP688XxtHUfxeHlqxqSduMHmWrpjRzlUCKPtvB7t","s":"2"}-CABBDg1zxxf8u4Hx5IPraZzmStfSCZFZbDzMHjqVcFW5OfP0BCjSM8pvkBQNcx3yN1fPMfaOqGllSBsYX9bijFWQV_d9PxJI1dvxt5lW4xAf9SGWb28Nzt3J0MOsO69aYMy0XMD"#;
    let receipt1_1 = br#"{"v":"KERI10JSON000091_","t":"rct","d":"EBgRuemKRwpDnemmrA9bbWyp0Ar4BHVv4ZjIv8mBGJxj","i":"EPNYUP688XxtHUfxeHlqxqSduMHmWrpjRzlUCKPtvB7t","s":"2"}-CABBJq7UABlttINuWJh1Xl2lkqZG4NTdUdqnbFJDa6ZyxCC0BBRh0mufCacSimM85yV0kcFnx6U76XR5vibN4biUtzmrjl_s2yvVUDBu3vZvRwH-wy6FgU02WydaFmmeysG_pAN"#;

    let first_id: IdentifierPrefix = "EJufgwH347N2kobmes1IQw_1pfMipEFFy0RwinZTtah9"
        .parse()
        .unwrap();

    // Store every fixture; each one must parse to a nontransferable receipt.
    for raw in [receipt0_0, receipt0_1, receipt1_0, receipt1_1] {
        let parsed = parse_event_stream(raw).unwrap();
        match parsed.first().unwrap() {
            Message::Notice(Notice::NontransferableRct(receipt)) => {
                db.add_receipt_nt(receipt.clone(), &first_id).unwrap();
            }
            _ => unreachable!(),
        }
    }

    // Both receipts of the first receipted event are returned.
    let first_receipted_digest: SelfAddressingIdentifier =
        "EJufgwH347N2kobmes1IQw_1pfMipEFFy0RwinZTtah9"
            .parse()
            .unwrap();
    let first_receipts = db
        .log_db
        .get_nontrans_couplets(&first_receipted_digest)
        .unwrap();
    assert_eq!(first_receipts.unwrap().count(), 2);

    // Both receipts of the second receipted event are returned.
    let second_receipted_digest: SelfAddressingIdentifier =
        "EBgRuemKRwpDnemmrA9bbWyp0Ar4BHVv4ZjIv8mBGJxj"
            .parse()
            .unwrap();
    let second_receipts = db
        .log_db
        .get_nontrans_couplets(&second_receipted_digest)
        .unwrap();
    assert_eq!(second_receipts.unwrap().count(), 2);
}