keri-core 0.17.13

Core library for the Key Event Receipt Infrastructure
Documentation
use std::sync::Arc;

use said::SelfAddressingIdentifier;

use crate::{
    database::{
        redb::{
            escrow_database::SnKeyDatabase,
            ksn_log::{AcceptedKsn, KsnLogDatabase},
            RedbDatabase, RedbError,
        },
        EventDatabase, SequencedEventDatabase,
    },
    error::Error,
    prefix::IdentifierPrefix,
    processor::{
        notification::{Notification, NotificationBus, Notifier},
        validator::{EventValidator, MoreInfoError, VerificationError},
    },
    query::reply_event::{ReplyEvent, ReplyRoute, SignedReply},
};

#[derive(Clone)]
pub struct ReplyEscrow<D: EventDatabase> {
    events_db: Arc<D>,
    accepted_ksn: Arc<AcceptedKsn>,
    escrowed_reply: Arc<SnKeyReplyEscrow>,
}

impl ReplyEscrow<RedbDatabase> {
    pub fn new(events_db: Arc<RedbDatabase>) -> Self {
        let acc = Arc::new(AcceptedKsn::new(events_db.db.clone()).unwrap());
        let reply_esc_db = Arc::new(SnKeyReplyEscrow::new(
            Arc::new(SnKeyDatabase::new(events_db.db.clone(), "reply_escrow").unwrap()),
            acc.ksn_log.clone(),
        ));
        Self {
            events_db,
            accepted_ksn: acc,
            escrowed_reply: reply_esc_db,
        }
    }
}
impl Notifier for ReplyEscrow<RedbDatabase> {
    fn notify(&self, notification: &Notification, bus: &NotificationBus) -> Result<(), Error> {
        match notification {
            Notification::KsnOutOfOrder(rpy) => {
                match rpy.reply.get_route() {
                    ReplyRoute::Ksn(_id, _ksn) => {
                        self.escrowed_reply.insert(rpy)?;
                    }
                    _ => return Err(Error::SemanticError("Wrong event type".to_string())),
                };
                Ok(())
            }
            Notification::KeyEventAdded(ev) => {
                let id = ev.event_message.data.get_prefix();
                let sn = ev.event_message.data.sn;
                self.process_reply_escrow(bus, &id, sn)
            }
            _ => Ok(()),
        }
    }
}

impl ReplyEscrow<RedbDatabase> {
    pub fn process_reply_escrow(
        &self,
        _bus: &NotificationBus,
        id: &IdentifierPrefix,
        sn: u64,
    ) -> Result<(), Error> {
        use crate::query::QueryError;

        for sig_rep in self.escrowed_reply.get_from_sn(id, sn)? {
            let validator = EventValidator::new(self.events_db.clone());
            match validator.process_signed_ksn_reply(&sig_rep) {
                Ok(_) => {
                    self.escrowed_reply.remove(&sig_rep.reply);
                    self.accepted_ksn.insert(sig_rep.clone())?;
                }
                Err(Error::SignatureVerificationError)
                | Err(Error::QueryError(QueryError::StaleRpy)) => {
                    // remove from escrow
                    self.escrowed_reply.remove(&sig_rep.reply);
                }
                Err(Error::EventOutOfOrderError)
                | Err(Error::VerificationError(VerificationError::MoreInfo(
                    MoreInfoError::EventNotFound(_),
                ))) => (), // keep in escrow,
                Err(e) => return Err(e),
            };
        }
        // };
        Ok(())
    }

    pub fn get_all(&self, id: &IdentifierPrefix) -> Result<Vec<SignedReply>, Error> {
        Ok(self.escrowed_reply.get_from_sn(id, 0)?.collect())
    }
}

pub struct SnKeyReplyEscrow {
    escrow: Arc<SnKeyDatabase>,
    log: Arc<KsnLogDatabase>,
}

impl SnKeyReplyEscrow {
    pub(crate) fn new(escrow: Arc<SnKeyDatabase>, log: Arc<KsnLogDatabase>) -> Self {
        Self { escrow, log }
    }

    pub fn save_digest(
        &self,
        id: &IdentifierPrefix,
        sn: u64,
        event_digest: &SelfAddressingIdentifier,
    ) -> Result<(), RedbError> {
        self.escrow.insert(id, sn, event_digest)?;

        Ok(())
    }

    pub fn insert(&self, event: &SignedReply) -> Result<(), RedbError> {
        self.log
            .log_reply(&crate::database::redb::WriteTxnMode::CreateNew, &event)?;
        let said = event.reply.digest().unwrap();
        let id = event.reply.get_prefix();
        let sn = match &event.reply.data.data {
            ReplyRoute::Ksn(_identifier_prefix, key_state_notice) => key_state_notice.state.sn,
            _ => todo!(),
        };
        self.escrow.insert(&id, sn, &said)?;

        Ok(())
    }

    pub fn get_from_sn<'a>(
        &'a self,
        identifier: &IdentifierPrefix,
        sn: u64,
    ) -> Result<impl Iterator<Item = SignedReply> + 'a, RedbError> {
        Ok(self
            .escrow
            .get_greater_than(identifier, sn)?
            .map(move |said| self.log.get_signed_reply(&said).unwrap().unwrap()))
    }

    pub fn remove(&self, event: &ReplyEvent) {
        let said = event.digest().unwrap();
        let id = event.get_prefix();
        let sn = match &event.data.data {
            ReplyRoute::Ksn(_identifier_prefix, key_state_notice) => key_state_notice.state.sn,
            _ => todo!(),
        };
        self.escrow.remove(&id, sn, &said).unwrap();
    }

    pub fn contains(
        &self,
        id: &IdentifierPrefix,
        sn: u64,
        digest: &SelfAddressingIdentifier,
    ) -> Result<bool, RedbError> {
        Ok(self
            .escrow
            .get(id, sn)?
            .find(|said| said == digest)
            .is_some())
    }
}

#[cfg(test)]
mod tests {
    use crate::{
        actor::prelude::{BasicProcessor, Message},
        database::redb::RedbDatabase,
        error::Error,
        event_message::signed_event_message::Op,
        prefix::IdentifierPrefix,
        processor::{escrow::reply_escrow::ReplyEscrow, notification::JustNotification, Processor},
    };
    use cesrox::{parse, parse_many};
    use std::{fs, sync::Arc};
    use tempfile::{Builder, NamedTempFile};

    #[test]
    pub fn test_reply_escrow() -> Result<(), Error> {
        // Create test db and event processor.
        let root = Builder::new().prefix("test-db").tempdir().unwrap();
        fs::create_dir_all(root.path()).unwrap();
        let events_db_path = NamedTempFile::new().unwrap();
        let events_db = Arc::new(RedbDatabase::new(events_db_path.path()).unwrap());
        let mut event_processor = BasicProcessor::new(events_db.clone(), None);
        let rpy_escrow = Arc::new(ReplyEscrow::new(events_db.clone()));
        event_processor.register_observer(
            rpy_escrow.clone(),
            &[
                JustNotification::KeyEventAdded,
                JustNotification::KsnOutOfOrder,
            ],
        )?;

        let identifier: IdentifierPrefix =
            "EA_SbBUZYwqLVlAAn14d6QUBQCSReJlZ755JqTgmRhXH".parse()?;

        let kel = r#"{"v":"KERI10JSON00012b_","t":"icp","d":"EA_SbBUZYwqLVlAAn14d6QUBQCSReJlZ755JqTgmRhXH","i":"EA_SbBUZYwqLVlAAn14d6QUBQCSReJlZ755JqTgmRhXH","s":"0","kt":"1","k":["DKiNnDmdOkcBjcAqL2FFhMZnSlPfNyGrJlCjJmX5b1nU"],"nt":"1","n":["EMP7Lg6BtehOYZt2RwOqXLNfMUiUllejAp8G_5EiANXR"],"bt":"0","b":[],"c":[],"a":[]}-VAn-AABAAArkDBeflIAo4kBsKnc754XHJvdLnf04iq-noTFEJkbv2MeIGZtx6lIfJPmRSEmFMUkFW4otRrMeBGQ0-nlhHEE-EAB0AAAAAAAAAAAAAAAAAAAAAAA1AAG2021-01-01T00c00c00d000000p00c00{"v":"KERI10JSON000160_","t":"rot","d":"EHZks1BQ_ieuzASY7VoZNIOgIfnlE-SZJzO3OP_Wf3zM","i":"EA_SbBUZYwqLVlAAn14d6QUBQCSReJlZ755JqTgmRhXH","s":"1","p":"EA_SbBUZYwqLVlAAn14d6QUBQCSReJlZ755JqTgmRhXH","kt":"1","k":["DMm-PHnlVVw-yQGqxxQFH3ynIGBrwkOCll9NJsszS4M1"],"nt":"1","n":["EGDpG5Ca3-vx-0O_rCXo44CG9VfjvDM8kXZlXt5TRGqq"],"bt":"0","br":[],"ba":[],"a":[]}-VAn-AABAAB49lUrFy86023zwry5pLz3_stNBPLU2Zoj2HO02W-J-fXvA9EL7BOpuVjEdhPHz1KbRWOKljI8yY3PZR3PyiMG-EAB0AAAAAAAAAAAAAAAAAAAAAAB1AAG2021-01-01T00c00c00d000000p00c00{"v":"KERI10JSON000160_","t":"rot","d":"ECg9CiC6qW-Y8DF-TByP0x4tG_OvPkAtKSZuZU8ZiXYT","i":"EA_SbBUZYwqLVlAAn14d6QUBQCSReJlZ755JqTgmRhXH","s":"2","p":"EHZks1BQ_ieuzASY7VoZNIOgIfnlE-SZJzO3OP_Wf3zM","kt":"1","k":["DMjdd0iohdRbFaFUeQuK_9eSSS1AcQVwZpJXg-QGFqZX"],"nt":"1","n":["ECDBhT8ht1Z2WFeC6C_7sCAPkduj3DDjEz2cxI_RSo-I"],"bt":"0","br":[],"ba":[],"a":[]}-VAn-AABAAAtK2_idK0_YfBrswOvbjBFWtjTZ5XRRU42HC7eoph_gi67BCeTaMBUBKyx5LZYnAG3GzOl5Xj-CXkvzSlwJ10K-EAB0AAAAAAAAAAAAAAAAAAAAAAC1AAG2021-01-01T00c00c00d000000p00c00{"v":"KERI10JSON000160_","t":"rot","d":"EHnhq9u8zdNJB38yaY3r7G73LrnJsPakgSjJFk6vSxUs","i":"EA_SbBUZYwqLVlAAn14d6QUBQCSReJlZ755JqTgmRhXH","s":"3","p":"ECg9CiC6qW-Y8DF-TByP0x4tG_OvPkAtKSZuZU8ZiXYT","kt":"1","k":["DK3AM_4Jg07liB5_5jkA3kiv2iSEYsOSDMzw-4oMxA29"],"nt":"1","n":["EGdk-oXzuVUatJYeIuai9wlUJ0ulVUTrb9w0LPPuuyB0"],"bt":"0","br":[],"ba":[],"a":[]}-VAn-AABAAAMxebsLh1V2NIHJl0diSC242MSg5TNSbtgjZPuf34adjV9rs6B73pWyt-TmRWMIY-me9-pg3eN0p4wQsyBWIEC-EAB0AAAAAAAAAAAAAAAAAAAAAAD1AAG2021-01-01T00c00c00d000000p00c00"#;

        let parsed = parse_many(kel.as_bytes()).unwrap().1;
        let kel_events = parsed.into_iter().map(|ev| Message::try_from(ev).unwrap());

        let rest_of_kel = r#"{"v":"KERI10JSON000160_","t":"rot","d":"EHVjHgDO5Gm7VJX1_0pfajNdsr5yMWxeu8jTBDFM3Hxx","i":"EA_SbBUZYwqLVlAAn14d6QUBQCSReJlZ755JqTgmRhXH","s":"4","p":"EHnhq9u8zdNJB38yaY3r7G73LrnJsPakgSjJFk6vSxUs","kt":"1","k":["DHCf_d5wepRUYrkwii7D9D_ix9m8YJ1u6c7Kf4vChB2-"],"nt":"1","n":["EDnPDsp4HhTExdfBa_ZKoW9wwVsO8SXQnDAikGP2wbcX"],"bt":"0","br":[],"ba":[],"a":[]}-VAn-AABAAAp185sbBdPawKcVHO_8mrMIYtAG5mKNlQWWvCIFVlIszQcge3FAEfYq4cw9Gh_tY82PBPLBPlgXYjLYnRxXQkD-EAB0AAAAAAAAAAAAAAAAAAAAAAE1AAG2021-01-01T00c00c00d000000p00c00{"v":"KERI10JSON000160_","t":"rot","d":"EFltRrVgHygpUoAIpyiYHUe0Zt8-lPQ20iNk2fA0CGnB","i":"EA_SbBUZYwqLVlAAn14d6QUBQCSReJlZ755JqTgmRhXH","s":"5","p":"EHVjHgDO5Gm7VJX1_0pfajNdsr5yMWxeu8jTBDFM3Hxx","kt":"1","k":["DMOFqHNTbaOrzXv6Hs6d08Nd_mLo7wiH-888vFGtFWmV"],"nt":"1","n":["ECYVu54u3AXOkOZjf2RZnbeRxbu10vBW85rZTgXKcoVH"],"bt":"0","br":[],"ba":[],"a":[]}-VAn-AABAAAERG7cm3_SLovALeBeadaIzCh55ul_Mj3mp4UNdzvmdbBbGMDTxrkEVHpM25BOROuhIRKWUdw6yVFubbKgl7wI-EAB0AAAAAAAAAAAAAAAAAAAAAAF1AAG2021-01-01T00c00c00d000000p00c00{"v":"KERI10JSON000160_","t":"rot","d":"EGs1eJFrFrcenQouDUk57AA5lYs8E-xOVcLPu0me-gyS","i":"EA_SbBUZYwqLVlAAn14d6QUBQCSReJlZ755JqTgmRhXH","s":"6","p":"EFltRrVgHygpUoAIpyiYHUe0Zt8-lPQ20iNk2fA0CGnB","kt":"1","k":["DIxsQo0QFWhYlX34UHDw39OG4nIr5tky9S5Jwi97o7-d"],"nt":"1","n":["EJyVykl3kJen1vWe9MkOKNQ6DN6h16hAcqU7h77MSkpn"],"bt":"0","br":[],"ba":[],"a":[]}-VAn-AABAADkk6omMn4qARRuu6EJlSqvkEcZlSRxF9wDMuJLf31k0U7XBgFaYMJA50V7yUI46GGbF7t3x72NZxiM0EhzeqMB-EAB0AAAAAAAAAAAAAAAAAAAAAAG1AAG2021-01-01T00c00c00d000000p00c00{"v":"KERI10JSON000160_","t":"rot","d":"ECfNpyS_AJmk_tbcQfbMQAcXUZHzv_3Hb6OAxp7EMy09","i":"EA_SbBUZYwqLVlAAn14d6QUBQCSReJlZ755JqTgmRhXH","s":"7","p":"EGs1eJFrFrcenQouDUk57AA5lYs8E-xOVcLPu0me-gyS","kt":"1","k":["DME53BRS5KnXzsfYqT8I2DCfl6nMOrlYBN_Fm3wIh9M0"],"nt":"1","n":["EKsCb2J8R1gnygI5QiYIJ67CJYrXx-uZ_iM0yITQYDV9"],"bt":"0","br":[],"ba":[],"a":[]}-VAn-AABAAB6IYTjz7nE_44ttVWLyNsIxjMOUs8mx1_L-XiD1Hu61W8efpvlf2cpZWrhmFuNtAxGTyFpfXL9dwjaC0cQTsUH-EAB0AAAAAAAAAAAAAAAAAAAAAAH1AAG2021-01-01T00c00c00d000000p00c00{"v":"KERI10JSON000160_","t":"rot","d":"EJ8T_WbwJfv7xYI6PforMwweAT0o7i7c6rRNhdkqOyQ7","i":"EA_SbBUZYwqLVlAAn14d6QUBQCSReJlZ755JqTgmRhXH","s":"8","p":"ECfNpyS_AJmk_tbcQfbMQAcXUZHzv_3Hb6OAxp7EMy09","kt":"1","k":["DDnnUcnyEHuGfhjUpW4APLcSCsseC4trSdUtGIcu5dk_"],"nt":"1","n":["EEH-Nd5uWnAjXzX1mwBsz6WZWbCbGZBZIuurBl2r8TPn"],"bt":"0","br":[],"ba":[],"a":[]}-VAn-AABAAC5QaZ2nCtxB9-RQ68LxbKABJ_QP7aFbAVnAPW4usBCiNbTL6DDzSI1Z3ykh6RPczk0HYfRW39kbtMWsIPHQ1MJ-EAB0AAAAAAAAAAAAAAAAAAAAAAI1AAG2021-01-01T00c00c00d000000p00c00"#;
        let parsed = parse_many(rest_of_kel.as_bytes()).unwrap().1;
        let rest_of_kel = parsed.into_iter().map(|ev| Message::try_from(ev).unwrap());

        let old_rpy = r#"{"v":"KERI10JSON00029d_","t":"rpy","d":"EJjB0S6SaAA1ymaO0cXVmv5kJagHVVUVpxD6q5_jrcgP","dt":"2021-01-01T00:00:00.000000+00:00","r":"/ksn/EA_SbBUZYwqLVlAAn14d6QUBQCSReJlZ755JqTgmRhXH","a":{"v":"KERI10JSON0001e2_","i":"EA_SbBUZYwqLVlAAn14d6QUBQCSReJlZ755JqTgmRhXH","s":"3","p":"ECg9CiC6qW-Y8DF-TByP0x4tG_OvPkAtKSZuZU8ZiXYT","d":"EHnhq9u8zdNJB38yaY3r7G73LrnJsPakgSjJFk6vSxUs","f":"3","dt":"2021-01-01T00:00:00.000000+00:00","et":"rot","kt":"1","k":["DK3AM_4Jg07liB5_5jkA3kiv2iSEYsOSDMzw-4oMxA29"],"nt":"1","n":["EGdk-oXzuVUatJYeIuai9wlUJ0ulVUTrb9w0LPPuuyB0"],"bt":"0","b":[],"c":[],"ee":{"s":"3","d":"EHnhq9u8zdNJB38yaY3r7G73LrnJsPakgSjJFk6vSxUs","br":[],"ba":[]},"di":""}}-VA0-FABEA_SbBUZYwqLVlAAn14d6QUBQCSReJlZ755JqTgmRhXH0AAAAAAAAAAAAAAAAAAAAAADEHnhq9u8zdNJB38yaY3r7G73LrnJsPakgSjJFk6vSxUs-AABAADfyYgxdTg4vvKcbCHaog79P3KVJJX_bYMZOuOobmLM9uWLmTVHFvFB36-hS062DfCsCyBF0tmODSlmVY-TksUC"#;
        let parsed = parse(old_rpy.as_bytes()).unwrap().1;
        let deserialized_old_rpy = Message::try_from(parsed).unwrap();

        let new_rpy = r#"{"v":"KERI10JSON00029d_","t":"rpy","d":"EJX7EebLoW8VTVvO3iPuFGzy38BU6OEEsRR9nFjzgeL6","dt":"2021-01-01T00:00:00.000000+00:00","r":"/ksn/EA_SbBUZYwqLVlAAn14d6QUBQCSReJlZ755JqTgmRhXH","a":{"v":"KERI10JSON0001e2_","i":"EA_SbBUZYwqLVlAAn14d6QUBQCSReJlZ755JqTgmRhXH","s":"8","p":"ECfNpyS_AJmk_tbcQfbMQAcXUZHzv_3Hb6OAxp7EMy09","d":"EJ8T_WbwJfv7xYI6PforMwweAT0o7i7c6rRNhdkqOyQ7","f":"8","dt":"2021-01-01T00:00:00.000000+00:00","et":"rot","kt":"1","k":["DDnnUcnyEHuGfhjUpW4APLcSCsseC4trSdUtGIcu5dk_"],"nt":"1","n":["EEH-Nd5uWnAjXzX1mwBsz6WZWbCbGZBZIuurBl2r8TPn"],"bt":"0","b":[],"c":[],"ee":{"s":"8","d":"EJ8T_WbwJfv7xYI6PforMwweAT0o7i7c6rRNhdkqOyQ7","br":[],"ba":[]},"di":""}}-VA0-FABEA_SbBUZYwqLVlAAn14d6QUBQCSReJlZ755JqTgmRhXH0AAAAAAAAAAAAAAAAAAAAAAIEJ8T_WbwJfv7xYI6PforMwweAT0o7i7c6rRNhdkqOyQ7-AABAABuq6TXP5ZHc62y8NxNnKJjyJ1b4Nc1Mfu4ZKzg_47kbRyBriC9k7vnuidpIOfjUE7tnseaY5p6Gyr5qULXJWEK"#;
        let parsed = parse(new_rpy.as_bytes()).unwrap().1;
        let deserialized_new_rpy = Message::try_from(parsed).unwrap();

        // Try to process out of order reply
        event_processor
            .process(&deserialized_old_rpy.clone())
            .unwrap();

        let escrow = rpy_escrow
            .escrowed_reply
            .get_from_sn(&identifier, 0)
            .unwrap();
        assert_eq!(escrow.collect::<Vec<_>>().len(), 1);

        let accepted_rpys = rpy_escrow.clone().accepted_ksn.get_all(&identifier)?;
        assert!(accepted_rpys.is_empty());

        // process kel events and update escrow
        // reply event should be unescrowed and save as accepted
        kel_events.for_each(|ev| {
            event_processor.process(&ev).unwrap();
        });

        let escrow = rpy_escrow.escrowed_reply.get_from_sn(&identifier, 0);
        assert_eq!(escrow.unwrap().collect::<Vec<_>>().len(), 0);

        let accepted_rpys = rpy_escrow.accepted_ksn.get_all(&identifier)?;
        assert_eq!(accepted_rpys.len(), 1);

        // Try to process new out of order reply
        // reply event should be escrowed, accepted reply shouldn't change
        event_processor.process(&deserialized_new_rpy.clone())?;
        let mut escrow = rpy_escrow
            .escrowed_reply
            .get_from_sn(&identifier, 0)
            .unwrap();
        assert_eq!(
            Message::Op(Op::Reply(escrow.next().unwrap())),
            deserialized_new_rpy
        );
        assert!(escrow.next().is_none());

        let accepted_rpys = rpy_escrow.accepted_ksn.get_all(&identifier)?;
        assert_eq!(accepted_rpys.len(), 1);
        assert_eq!(
            Message::Op(Op::Reply(accepted_rpys[0].clone())),
            deserialized_old_rpy
        );

        // process rest of kel and update escrow
        // reply event should be unescrowed and save as accepted
        rest_of_kel.for_each(|ev| {
            event_processor.process(&ev).unwrap();
        });

        let escrow = rpy_escrow.escrowed_reply.get_from_sn(&identifier, 0);
        assert_eq!(escrow.unwrap().collect::<Vec<_>>().len(), 0);

        let accepted_rpys = rpy_escrow.accepted_ksn.get_all(&identifier)?;

        assert_eq!(accepted_rpys.len(), 1);
        assert_eq!(
            Message::Op(Op::Reply(accepted_rpys[0].clone())),
            deserialized_new_rpy
        );

        Ok(())
    }
}