keri_core/database/redb/
mod.rs

1pub mod escrow_database;
2#[cfg(feature = "query")]
3pub(crate) mod ksn_log;
4pub mod loging;
5pub(crate) mod rkyv_adapter;
6
/// KEL index table: `(identifier, sn) -> event digest`.
///
/// The `KELS` table links an identifier (in its string form) and an event
/// sequence number to the digest of that event. The value is the byte form
/// produced by `rkyv_adapter::serialize_said`. The event body itself is not
/// kept here; the digest references the actual event stored in the `EVENTS`
/// table.
const KELS: TableDefinition<(&str, u64), &[u8]> = TableDefinition::new("kels");
11
/// Key state table: `identifier -> key state`.
///
/// The `KEY_STATES` table stores the current state of each identifier as
/// rkyv-serialized bytes. The entry is rewritten every time an event is
/// applied to the identifier (see `update_key_state`).
const KEY_STATES: TableDefinition<&str, &[u8]> = TableDefinition::new("key_states");
16
17use std::{path::Path, sync::Arc, u64};
18
19#[cfg(feature = "query")]
20use crate::query::reply_event::SignedReply;
21#[cfg(feature = "query")]
22use ksn_log::AcceptedKsn;
23use loging::LogDatabase;
24use redb::{Database, ReadableTable, TableDefinition};
25use said::{sad::SerializationFormats, SelfAddressingIdentifier};
26
27use crate::{
28    event::{receipt::Receipt, KeyEvent},
29    event_message::{
30        msg::KeriEvent,
31        signature::{Nontransferable, Transferable},
32        signed_event_message::{
33            SignedEventMessage, SignedNontransferableReceipt, SignedTransferableReceipt,
34        },
35    },
36    prefix::IdentifierPrefix,
37    state::IdentifierState,
38};
39use cesrox::primitives::CesrPrimitive;
40
41use super::{timestamped, EventDatabase, LogDatabase as LogDatabaseTrait, QueryParameters};
42
/// Errors returned by the redb-backed database layer.
///
/// Variants marked with `#[from]` are created automatically by the `?`
/// operator from the corresponding `redb`/`rkyv` error type.
#[derive(Debug, thiserror::Error)]
pub enum RedbError {
    /// The database could not be created or opened.
    #[error("Failed to create database. Reason: {0}")]
    DatabaseCreationFiled(#[from] redb::DatabaseError),
    /// A read or write transaction could not be started.
    #[error("Failed to save to database. Reason: {0}")]
    TransactionFiled(#[from] redb::TransactionError),
    /// A write transaction could not be committed.
    #[error("Failed to save to database. Reason: {0}")]
    CommitFiled(#[from] redb::CommitError),
    /// A table could not be opened.
    #[error("Table opening error. Reason: {0}")]
    TableError(#[from] redb::TableError),
    /// Storage-level failure. Because of `#[from]`, any `redb::StorageError`
    /// propagated with `?` ends up here, not only errors raised by inserts.
    #[error("Saving element error. Reason: {0}")]
    InsertingError(#[from] redb::StorageError),
    /// Generic redb failure while reading. Has no `#[from]`, so it must be
    /// constructed explicitly.
    #[error("Retrieving element error. Reason: {0}")]
    RetrievingError(redb::Error),
    /// A stored value has an unexpected format.
    #[error("Value format error")]
    WrongValue,
    /// A stored key has an unexpected format; see [`KeyError`] for the detail.
    #[error("Key format error")]
    WrongKey(#[from] KeyError),
    /// No event is stored under the given digest.
    #[error("No event for digest {0} found")]
    NotFound(SelfAddressingIdentifier),
    /// The provided event carries no digest.
    #[error("No digest in provided event")]
    MissingDigest,
    /// Serialization or deserialization with rkyv failed.
    #[error("Rkyv error: {0}")]
    Rkyv(#[from] rkyv::rancor::Error),
    /// The event with the given digest was already saved.
    #[error("Already saved: {0}")]
    AlreadySaved(SelfAddressingIdentifier),
}
70
/// Details of a malformed database key, wrapped by [`RedbError::WrongKey`].
///
/// NOTE(review): the variants are not constructed in this file; presumably
/// they are used by the sibling modules that parse composite keys.
#[derive(Debug, thiserror::Error)]
pub enum KeyError {
    /// The SAID (digest) part of a key could not be parsed.
    #[error("Can't parse said in key")]
    UnparsableSaid,
    /// The index part of a key could not be parsed.
    #[error("Can't parse index in key")]
    UnparsableIndex,
}
78
/// Represents the mode for executing database transactions.
///
/// Passed to [`execute_in_transaction`] so that several write operations can
/// either share one transaction (and commit together) or each run in their own.
pub enum WriteTxnMode<'a> {
    /// Initiates a new transaction that is committed after operations are executed.
    CreateNew,
    /// Utilizes an already active transaction for operations.
    /// The owner of the transaction is responsible for committing it.
    UseExisting(&'a redb::WriteTransaction),
}
/// Event database backed by a single redb [`Database`].
///
/// Holds the KEL index and key-state tables directly and delegates event and
/// receipt storage to [`LogDatabase`].
pub struct RedbDatabase {
    /// Shared handle to the underlying redb database.
    pub(crate) db: Arc<Database>,
    /// Log of events and receipts, created from the same database handle.
    pub(crate) log_db: Arc<LogDatabase>,
    /// Storage of accepted replies (only with the `query` feature).
    #[cfg(feature = "query")]
    accepted_rpy: Arc<AcceptedKsn>,
}
92
93impl RedbDatabase {
94    pub fn new(db_path: &Path) -> Result<Self, RedbError> {
95        let db = Arc::new(Database::create(db_path)?);
96        let log_db = Arc::new(LogDatabase::new(db.clone())?);
97        // Create tables
98        let write_txn = db.begin_write()?;
99        {
100            write_txn.open_table(KELS)?;
101            write_txn.open_table(KEY_STATES)?;
102        }
103        write_txn.commit()?;
104        Ok(Self {
105            db: db.clone(),
106            log_db,
107            #[cfg(feature = "query")]
108            accepted_rpy: Arc::new(AcceptedKsn::new(db.clone())?),
109        })
110    }
111}
112
113impl EventDatabase for RedbDatabase {
114    type Error = RedbError;
115    type LogDatabaseType = LogDatabase;
116
117    fn get_log_db(&self) -> Arc<Self::LogDatabaseType> {
118        self.log_db.clone()
119    }
120
121    fn add_kel_finalized_event(
122        &self,
123        signed_event: SignedEventMessage,
124        _id: &IdentifierPrefix,
125    ) -> Result<(), RedbError> {
126        let write_txn = self.db.begin_write()?;
127        let txn_mode = WriteTxnMode::UseExisting(&write_txn);
128
129        self.update_key_state(&txn_mode, &signed_event.event_message)?;
130        self.log_db.log_event(&txn_mode, &signed_event)?;
131
132        self.save_to_kel(&txn_mode, &signed_event.event_message)?;
133
134        write_txn.commit()?;
135        Ok(())
136    }
137
138    fn add_receipt_t(
139        &self,
140        receipt: SignedTransferableReceipt,
141        _id: &IdentifierPrefix,
142    ) -> Result<(), RedbError> {
143        let digest = receipt.body.receipted_event_digest;
144        let transferable = Transferable::Seal(receipt.validator_seal, receipt.signatures);
145        self.log_db.insert_trans_receipt(&digest, &[transferable])
146    }
147
148    fn add_receipt_nt(
149        &self,
150        receipt: SignedNontransferableReceipt,
151        _id: &IdentifierPrefix,
152    ) -> Result<(), RedbError> {
153        let receipted_event_digest = receipt.body.receipted_event_digest;
154        let receipts = receipt.signatures;
155        self.log_db.insert_nontrans_receipt(
156            &WriteTxnMode::CreateNew,
157            &receipted_event_digest,
158            &receipts,
159        )
160    }
161
162    fn get_key_state(&self, id: &IdentifierPrefix) -> Option<IdentifierState> {
163        let read_txn = self.db.begin_read().unwrap();
164        let table = read_txn.open_table(KEY_STATES).unwrap();
165        let key = id.to_str();
166        if let Some(key_state) = table.get(key.as_str()).unwrap() {
167            let bytes = key_state.value();
168            Some(rkyv_adapter::deserialize_identifier_state(bytes).unwrap())
169        } else {
170            None
171        }
172    }
173
174    fn get_kel_finalized_events(
175        &self,
176        params: super::QueryParameters,
177    ) -> Option<impl DoubleEndedIterator<Item = super::timestamped::TimestampedSignedEventMessage>>
178    {
179        let out = match params {
180            QueryParameters::BySn { id, sn } => self
181                .get_kel(&id, sn, 1)
182                .map(|el| Some(el.into_iter()))
183                .unwrap(),
184            QueryParameters::Range { id, start, limit } => self
185                .get_kel(&id, start, limit)
186                .map(|el| Some(el.into_iter()))
187                .unwrap(),
188            QueryParameters::All { id } => self.get_full_kel(id).map(|kel| kel.into_iter()),
189        };
190        out
191    }
192
193    fn get_receipts_t(
194        &self,
195        params: super::QueryParameters,
196    ) -> Option<impl DoubleEndedIterator<Item = Transferable>> {
197        match params {
198            QueryParameters::BySn { id, sn } => {
199                if let Ok(Some(said)) = self.get_event_digest(&id, sn) {
200                    let receipts = self.log_db.get_trans_receipts(&said).ok()?;
201                    Some(receipts.collect::<Vec<_>>().into_iter())
202                } else {
203                    None
204                }
205            }
206            QueryParameters::Range {
207                id: _,
208                start: _,
209                limit: _,
210            } => todo!(),
211            QueryParameters::All { id: _ } => todo!(),
212        }
213    }
214
215    fn get_receipts_nt(
216        &self,
217        params: super::QueryParameters,
218    ) -> Option<impl DoubleEndedIterator<Item = SignedNontransferableReceipt>> {
219        match params {
220            QueryParameters::BySn { id, sn } => self
221                .get_nontrans_receipts_range(&id.to_str(), sn, 1)
222                .ok()
223                .map(|e| e.into_iter()),
224            QueryParameters::Range { id, start, limit } => self
225                .get_nontrans_receipts_range(&id.to_str(), start, limit)
226                .ok()
227                .map(|e| e.into_iter()),
228            QueryParameters::All { id } => self
229                .get_nontrans_receipts_range(&id.to_str(), 0, u64::MAX)
230                .ok()
231                .map(|e| e.into_iter()),
232        }
233    }
234
235    fn accept_to_kel(&self, event: &KeriEvent<KeyEvent>) -> Result<(), RedbError> {
236        let txn_mode = WriteTxnMode::CreateNew;
237        self.save_to_kel(&txn_mode, event)?;
238        self.update_key_state(&txn_mode, event)?;
239
240        Ok(())
241    }
242
243    #[cfg(feature = "query")]
244    fn save_reply(&self, reply: SignedReply) -> Result<(), Self::Error> {
245        self.accepted_rpy.insert(reply)
246    }
247
248    #[cfg(feature = "query")]
249    fn get_reply(&self, id: &IdentifierPrefix, from_who: &IdentifierPrefix) -> Option<SignedReply> {
250        self.accepted_rpy.get(id, from_who).unwrap()
251    }
252}
253
254impl RedbDatabase {
255    /// Saves KEL event of given identifier. Key is identifier and sn of event, and value is event digest.
256    fn save_to_kel(
257        &self,
258        txn_mode: &WriteTxnMode,
259        event: &KeriEvent<KeyEvent>,
260    ) -> Result<(), RedbError> {
261        let digest = event.digest().map_err(|_e| RedbError::MissingDigest)?;
262
263        execute_in_transaction(self.db.clone(), txn_mode, |write_txn| {
264            let mut table = write_txn.open_table(KELS)?;
265            let id = event.data.prefix.to_str();
266            let sn = event.data.sn;
267            let serialized_said = rkyv_adapter::serialize_said(&digest)?;
268            table.insert((id.as_str(), sn), &serialized_said.as_slice())?;
269            Ok(())
270        })
271    }
272
273    fn update_key_state(
274        &self,
275        txn_mode: &WriteTxnMode,
276        event: &KeriEvent<KeyEvent>,
277    ) -> Result<(), RedbError> {
278        execute_in_transaction(self.db.clone(), txn_mode, |write_txn| {
279            let mut table = write_txn.open_table(KEY_STATES)?;
280            let key = event.data.prefix.to_str();
281
282            let key_state = if let Some(key_state) = table.get(key.as_str())? {
283                let bytes = key_state.value();
284                rkyv_adapter::deserialize_identifier_state(bytes).unwrap()
285            } else {
286                IdentifierState::default()
287            };
288
289            let key_state = key_state
290                .apply(event)
291                .map_err(|_e| RedbError::AlreadySaved(event.digest().unwrap()))?;
292            let value = rkyv::to_bytes::<rkyv::rancor::Error>(&key_state)?;
293            table.insert(key.as_str(), value.as_ref())?;
294
295            Ok(())
296        })
297    }
298
299    fn get_event_digest(
300        &self,
301        identifier: &IdentifierPrefix,
302        sn: u64,
303    ) -> Result<Option<SelfAddressingIdentifier>, RedbError> {
304        Ok({
305            let read_txn = self.db.begin_read().unwrap();
306            let table = read_txn.open_table(KELS)?;
307            table
308                .get((identifier.to_str().as_str(), sn))?
309                .map(|value| -> Result<SelfAddressingIdentifier, RedbError> {
310                    let digest: SelfAddressingIdentifier =
311                        rkyv_adapter::deserialize_said(value.value())?;
312                    Ok(digest)
313                })
314                .transpose()?
315        })
316    }
317
318    fn get_nontrans_receipts_range(
319        &self,
320        id: &str,
321        start: u64,
322        limit: u64,
323    ) -> Result<Vec<SignedNontransferableReceipt>, RedbError> {
324        let corresponding_digests = {
325            let read_txn = self.db.begin_read()?;
326            let table = read_txn.open_table(KELS)?;
327            table.range((id, start)..(id, start + limit))
328        }?;
329        let out = corresponding_digests
330            .map(|digest| match digest {
331                Ok((_, value)) => {
332                    let said = rkyv_adapter::deserialize_said(value.value()).unwrap();
333                    let nontrans = self
334                        .log_db
335                        .get_nontrans_couplets_by_key(value.value())
336                        .unwrap()
337                        .map(|vec| vec.collect())
338                        .unwrap_or_default();
339                    let identifier = id.parse().unwrap();
340                    let rct = Receipt::new(SerializationFormats::JSON, said, identifier, start);
341                    SignedNontransferableReceipt {
342                        body: rct,
343                        signatures: nontrans,
344                    }
345                }
346                Err(_) => todo!(),
347            })
348            .collect();
349        Ok(out)
350    }
351
352    fn get_all_nontrans_receipts_couplets(
353        &self,
354        id: &str,
355    ) -> Result<Box<dyn DoubleEndedIterator<Item = Nontransferable>>, RedbError> {
356        let corresponding_digests = {
357            let read_txn = self.db.begin_read()?;
358            let table = read_txn.open_table(KELS)?;
359            table.range((id, 0)..(id, u64::MAX))
360        }?;
361        let out = corresponding_digests
362            .map(|digest| match digest {
363                Ok((_, value)) => self
364                    .log_db
365                    .get_nontrans_couplets_by_key(value.value())
366                    .unwrap()
367                    .unwrap(),
368                Err(_) => todo!(),
369            })
370            .flatten();
371
372        Ok(Box::new(out.collect::<Vec<_>>().into_iter()))
373    }
374
375    fn get_kel<'a>(
376        &'a self,
377        id: &IdentifierPrefix,
378        from: u64,
379        limit: u64,
380    ) -> Result<Vec<timestamped::Timestamped<SignedEventMessage>>, RedbError> {
381        let digests = {
382            let read_txn = self.db.begin_read()?;
383            let table = read_txn.open_table(KELS)?;
384            table.range((id.to_str().as_str(), from)..(id.to_str().as_str(), from + limit))?
385        };
386
387        digests
388            .filter_map(|entry| {
389                let (_, value) = entry.unwrap();
390                self.log_db
391                    .get_signed_event_by_serialized_key(value.value())
392                    .transpose()
393            })
394            .collect()
395    }
396
397    fn get_full_kel<'a>(
398        &'a self,
399        id: &IdentifierPrefix,
400    ) -> Option<Vec<timestamped::Timestamped<SignedEventMessage>>> {
401        let digests = {
402            let read_txn = self.db.begin_read().unwrap();
403            let table = read_txn.open_table(KELS);
404            match table {
405                Ok(table) => table
406                    .range((id.to_str().as_str(), 0)..(id.to_str().as_str(), u64::MAX))
407                    .unwrap(),
408                Err(_e) => return None,
409            }
410        };
411
412        let kel = digests
413            .map(|entry| {
414                let (_key, value) = entry.unwrap();
415                self.log_db
416                    .get_signed_event_by_serialized_key(value.value())
417                    .unwrap()
418                    .unwrap()
419            })
420            .collect::<Vec<_>>();
421        if kel.is_empty() {
422            None
423        } else {
424            Some(kel)
425        }
426    }
427}
428
429/// Executes a given operation within a transaction context.
430/// Uses an existing transaction if `WriteTxnMode::UseExisting` is specified.
431/// Creates and commits a new transaction if `WriteTxnMode::CreateNew` is specified.
432pub fn execute_in_transaction<F>(
433    db: Arc<Database>,
434    txn_mode: &WriteTxnMode,
435    operation: F,
436) -> Result<(), RedbError>
437where
438    F: FnOnce(&redb::WriteTransaction) -> Result<(), RedbError>,
439{
440    match *txn_mode {
441        WriteTxnMode::UseExisting(existing_txn) => {
442            operation(existing_txn)?;
443        }
444        WriteTxnMode::CreateNew => {
445            let txn = db.begin_write()?;
446            operation(&txn)?;
447            txn.commit()?;
448        }
449    };
450
451    Ok(())
452}
453
/// Stores a three-event KEL (icp, rot, ixn) of one identifier plus the
/// inception of a second identifier, then checks that single events, ranges
/// and the resulting key state are read back as expected.
#[test]
fn test_retrieve_kel() -> Result<(), RedbError> {
    use crate::actor::parse_event_stream;
    use crate::event_message::signed_event_message::{Message, Notice};
    use crate::event_message::EventTypeTag;
    use tempfile::NamedTempFile;
    // Create test db path.
    let file_path = NamedTempFile::new().unwrap();

    let db = RedbDatabase::new(file_path.path()).unwrap();

    // Signed fixtures: each is a JSON event followed by its attached signatures.
    let icp_raw: &[u8] = br#"{"v":"KERI10JSON0001e7_","t":"icp","d":"EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen","i":"EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen","s":"0","kt":"2","k":["DErocgXD2RGSyvn3MObcx59jeOsEQhv2TqHirVkzrp0Q","DFXLiTjiRdSBPLL6hLa0rskIxk3dh4XwJLfctkJFLRSS","DE9YgIQVgpLwocTVrG8tidKScsQSMWwLWywNC48fhq4f"],"nt":"2","n":["EDJk5EEpC4-tQ7YDwBiKbpaZahh1QCyQOnZRF7p2i8k8","EAXfDjKvUFRj-IEB_o4y-Y_qeJAjYfZtOMD9e7vHNFss","EN8l6yJC2PxribTN0xfri6bLz34Qvj-x3cNwcV3DvT2m"],"bt":"0","b":[],"c":[],"a":[]}-AADAAD4SyJSYlsQG22MGXzRGz2PTMqpkgOyUfq7cS99sC2BCWwdVmEMKiTEeWe5kv-l_d9auxdadQuArLtAGEArW8wEABD0z_vQmFImZXfdR-0lclcpZFfkJJJNXDcUNrf7a-mGsxNLprJo-LROwDkH5m7tVrb-a1jcor2dHD9Jez-r4bQIACBFeU05ywfZycLdR0FxCvAR9BfV9im8tWe1DglezqJLf-vHRQSChY1KafbYNc96hYYpbuN90WzuCRMgV8KgRsEC"#;
    let rot_raw: &[u8] = br#"{"v":"KERI10JSON00021c_","t":"rot","d":"EHjzZj4i_-RpTN2Yh-NocajFROJ_GkBtlByhRykqiXgz","i":"EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen","s":"1","p":"EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen","kt":"2","k":["DCjxOXniUc5EUzDqERlXdptfKPHy6jNo_ZGsS4Vd8fAE","DNZHARO4dCJlluv0qezEMRmErIWWc-lzOzolBOQ15tHV","DOCQ4KN1jUlKbfjRteDYt9fxgpq1NK9_MqO5IA7shpED"],"nt":"2","n":["EN8l6yJC2PxribTN0xfri6bLz34Qvj-x3cNwcV3DvT2m","EATiZAHl0kzKID6faaQP2O7zB3Hj7eH3bE-vgKVAtsyU","EG6e7dJhh78ZqeIZ-eMbe-OB3TwFMPmrSsh9k75XIjLP"],"bt":"0","br":[],"ba":[],"a":[]}-AADAAAqV6xpsAAEB_FJP5UdYO5qiJphz8cqXbTjB9SRy8V0wIim-lgafF4o-b7TW0spZtzx2RXUfZLQQCIKZsw99k8AABBP8nfF3t6bf4z7eNoBgUJR-hdhw7wnlljMZkeY5j2KFRI_s8wqtcOFx1A913xarGJlO6UfrqFWo53e9zcD8egIACB8DKLMZcCGICuk98RCEVuS0GsqVngi1d-7gAX0jid42qUcR3aiYDMp2wJhqJn-iHJVvtB-LK7TRTggBtMDjuwB"#;
    let ixn_raw: &[u8] = br#"{"v":"KERI10JSON0000cb_","t":"ixn","d":"EL6Dpm72KXayaUHYvVHlhPplg69fBvRt1P3YzuOGVpmz","i":"EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen","s":"2","p":"EHjzZj4i_-RpTN2Yh-NocajFROJ_GkBtlByhRykqiXgz","a":[]}-AADAABgep0kbpgl91vvcXziJ7tHY1WVTAcUJyYCBNqTcNuK9AfzLHfKHhJeSC67wFRU845qjLSAC-XwWaqWgyAgw_8MABD5wTnqqJcnLWMA7NZ1vLOTzDspInJrly7O4Kt6Jwzue9z2TXkDXi1jr69JeKbzUQ6c2Ka1qPXAst0JzrOiyuAPACAcLHnOz1Owtgq8mcR_-PpAr91zOTK_Zj9r0V-9P47vzGsYwAxcVshclfhCMhu73aZuZbvQhy9Rxcj-qRz96cIL"#;
    // Inception of a different identifier; it must not show up in the first identifier's KEL.
    let second_icp_raw = br#"{"v":"KERI10JSON000159_","t":"icp","d":"EFb-WY7Ie1WPEgsioZz1CyzwnuCg-C9k2QCNpcUfM5Jf","i":"EFb-WY7Ie1WPEgsioZz1CyzwnuCg-C9k2QCNpcUfM5Jf","s":"0","kt":"1","k":["DIwDbi2Sr1kLZFpsX0Od6Y8ariGVLLjZXxBC5bXEI85e"],"nt":"1","n":["ELhmgZ5JFc-ACs9TJxHMxtcKzQxKXLhlAmUT_sKf1-l7"],"bt":"0","b":["DM73ulUG2_DJyA27DfxBXT5SJ5U3A3c2oeG8Z4bUOgyL"],"c":[],"a":[]}-AABAAAPGpCUdR6EfVWROUjpuTsxg5BIcMnfi7PDciv8VuY9NqZ0ioRoaHxMZue_5ALys86sX4aQzKqm_bID3ZBwlMUP"#;

    let first_id: IdentifierPrefix = "EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen"
        .parse()
        .unwrap();

    // Parse every fixture and store it as a finalized KEL event.
    for event in [icp_raw, rot_raw, ixn_raw, second_icp_raw] {
        let evs = parse_event_stream(event).unwrap();
        let ev = evs.first().unwrap();
        match ev {
            Message::Notice(Notice::Event(event)) => {
                db.add_kel_finalized_event(event.clone(), &event.event_message.data.get_prefix())
                    .unwrap();
            }
            _ => unreachable!(),
        }
    }

    // Retrieve a single event: sn 0 of the first identifier.
    let first_event = &db.get_kel(&first_id, 0, 1)?[0].signed_event_message;

    let expected_event = &icp_raw[..487]; // icp event without signatures
    assert_eq!(first_event.event_message.encode().unwrap(), expected_event);

    let sigs_from_db = &first_event.signatures;
    assert_eq!(sigs_from_db.len(), 3);

    // Retrieve KEL in range: two events starting at sn 1 (rot, ixn).
    let mut part_of_kel_events = db.get_kel(&first_id, 1, 2)?.into_iter();

    let rot = part_of_kel_events.next().unwrap();
    assert_eq!(
        rot.signed_event_message.event_message.event_type,
        EventTypeTag::Rot
    );
    assert_eq!(
        rot.signed_event_message.event_message.digest,
        Some(
            "EHjzZj4i_-RpTN2Yh-NocajFROJ_GkBtlByhRykqiXgz"
                .parse::<SelfAddressingIdentifier>()
                .unwrap()
                .into()
        )
    );
    assert_eq!(rot.signed_event_message.signatures.len(), 3);

    let ixn = part_of_kel_events.next().unwrap();
    assert_eq!(
        ixn.signed_event_message.event_message.event_type,
        EventTypeTag::Ixn
    );
    assert_eq!(
        ixn.signed_event_message.event_message.digest,
        Some(
            "EL6Dpm72KXayaUHYvVHlhPplg69fBvRt1P3YzuOGVpmz"
                .parse::<SelfAddressingIdentifier>()
                .unwrap()
                .into()
        )
    );
    assert_eq!(ixn.signed_event_message.signatures.len(), 3);

    // The range is exhausted after the two requested events.
    assert_eq!(part_of_kel_events.next(), None);

    // Retrieve KEL in range: two events starting at sn 0 (icp, rot).
    let mut part_of_kel_events = db.get_kel(&first_id, 0, 2)?.into_iter();
    let icp = part_of_kel_events.next().unwrap();
    assert_eq!(
        icp.signed_event_message.event_message.event_type,
        EventTypeTag::Icp
    );
    assert_eq!(
        icp.signed_event_message.event_message.digest,
        Some(
            "EBfxc4RiVY6saIFmUfEtETs1FcqmktZW88UkbnOg0Qen"
                .parse::<SelfAddressingIdentifier>()
                .unwrap()
                .into()
        )
    );
    assert_eq!(icp.signed_event_message.signatures.len(), 3);

    let rot = part_of_kel_events.next().unwrap();
    assert_eq!(
        rot.signed_event_message.event_message.event_type,
        EventTypeTag::Rot
    );
    assert_eq!(
        rot.signed_event_message.event_message.digest,
        Some(
            "EHjzZj4i_-RpTN2Yh-NocajFROJ_GkBtlByhRykqiXgz"
                .parse::<SelfAddressingIdentifier>()
                .unwrap()
                .into()
        )
    );
    assert_eq!(rot.signed_event_message.signatures.len(), 3);

    // The limit of 2 excludes the ixn event at sn 2.
    assert_eq!(part_of_kel_events.next(), None);

    // The key state reflects the last stored event of the first identifier (ixn, sn 2).
    let key_state = db.get_key_state(&first_id).unwrap();
    assert_eq!(key_state.sn, 2);
    assert_eq!(
        key_state.last_event_digest,
        "EL6Dpm72KXayaUHYvVHlhPplg69fBvRt1P3YzuOGVpmz"
            .parse::<SelfAddressingIdentifier>()
            .unwrap()
            .into()
    );
    Ok(())
}