teliox 0.17.13

Library for the Transaction Event Log
Documentation
use std::{sync::Arc, time::Duration};

use keri_core::{
    database::redb::{RedbDatabase, WriteTxnMode},
    processor::{
        event_storage::EventStorage,
        notification::{Notification, NotificationBus, Notifier},
    },
};
use said::SelfAddressingIdentifier;

use crate::{
    database::{
        digest_key_database::DigestKeyDatabase, EscrowDatabase, TelEventDatabase, TelLogDatabase,
    },
    error::Error,
    event::Event,
    processor::{
        notification::{TelNotification, TelNotificationBus, TelNotifier},
        storage::TelEventStorage,
        validator::TelEventValidator,
    },
};

pub struct MissingIssuerEscrow<D: TelEventDatabase> {
    kel_reference: Arc<EventStorage<RedbDatabase>>,
    tel_reference: Arc<TelEventStorage<D>>,
    publisher: TelNotificationBus,
    escrowed_missing_issuer: DigestKeyDatabase,
}

impl<D: TelEventDatabase> MissingIssuerEscrow<D> {
    pub fn new(
        db: Arc<D>,
        escrow_db: &EscrowDatabase,
        duration: Duration,
        kel_reference: Arc<EventStorage<RedbDatabase>>,
        bus: TelNotificationBus,
    ) -> Self {
        let escrow = DigestKeyDatabase::new(escrow_db.0.clone(), "missing_issuer_escrow");

        let tel_event_storage = Arc::new(TelEventStorage::new(db.clone()));
        Self {
            tel_reference: tel_event_storage,
            escrowed_missing_issuer: escrow,
            kel_reference,
            publisher: bus,
        }
    }
}
impl<D: TelEventDatabase + TelLogDatabase> Notifier for MissingIssuerEscrow<D> {
    fn notify(
        &self,
        notification: &Notification,
        _bus: &NotificationBus,
    ) -> Result<(), keri_core::error::Error> {
        match notification {
            Notification::KeyEventAdded(ev_message) => {
                let digest = ev_message.event_message.digest()?;

                self.process_missing_issuer_escrow(&digest).unwrap();
            }
            _ => {
                return Err(keri_core::error::Error::SemanticError(
                    "Wrong notification".into(),
                ))
            }
        }

        Ok(())
    }
}

impl<D: TelEventDatabase + TelLogDatabase> TelNotifier for MissingIssuerEscrow<D> {
    fn notify(
        &self,
        notification: &TelNotification,
        _bus: &TelNotificationBus,
    ) -> Result<(), Error> {
        match notification {
            TelNotification::MissingIssuer(event) => {
                let tel_event_digest = event.event.get_digest()?;
                self.tel_reference
                    .db
                    .log_event(&event, &WriteTxnMode::CreateNew)?;
                let missing_event_digest = event.seal.seal.digest.clone().to_string();
                self.escrowed_missing_issuer
                    .insert(&missing_event_digest.as_str(), &tel_event_digest)
                    .map_err(|e| Error::EscrowDatabaseError(e.to_string()))
            }
            _ => return Err(Error::Generic("Wrong notification".into())),
        }
    }
}

impl<D: TelEventDatabase + TelLogDatabase> MissingIssuerEscrow<D> {
    /// Reprocess escrowed events that need issuer event of given digest for acceptance.
    pub fn process_missing_issuer_escrow(
        &self,
        said: &SelfAddressingIdentifier,
    ) -> Result<(), Error> {
        if let Ok(esc) = self.escrowed_missing_issuer.get(&said.to_string().as_str()) {
            for digest in esc {
                let event = self.tel_reference.db.get(&digest)?.unwrap();
                let kel_event_digest = event.event.get_digest()?;
                let validator =
                    TelEventValidator::new(self.tel_reference.clone(), self.kel_reference.clone());
                let result = match &event.event {
                    Event::Management(man) => validator.validate_management(&man, &event.seal),
                    Event::Vc(vc) => validator.validate_vc(&vc, &event.seal),
                };
                match result {
                    Ok(_) => {
                        // remove from escrow
                        self.escrowed_missing_issuer
                            .remove(said, &event.event.get_digest()?)
                            .map_err(|e| Error::EscrowDatabaseError(e.to_string()))?;
                        // accept tel event
                        self.tel_reference.add_event(event.clone())?;

                        self.publisher
                            .notify(&TelNotification::TelEventAdded(event))?;
                    }
                    Err(Error::MissingSealError) => {
                        // remove from escrow
                        self.escrowed_missing_issuer
                            .remove(said, &kel_event_digest)
                            .unwrap();
                    }
                    Err(Error::OutOfOrderError) => {
                        self.escrowed_missing_issuer
                            .remove(said, &kel_event_digest)
                            .unwrap();
                        self.publisher.notify(&TelNotification::OutOfOrder(event))?;
                    }
                    Err(Error::MissingRegistryError) => {
                        self.escrowed_missing_issuer
                            .remove(said, &kel_event_digest)
                            .unwrap();
                        self.publisher
                            .notify(&TelNotification::MissingRegistry(event))?;
                    }
                    Err(_e) => (), // keep in escrow,
                }
            }
        };

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use keri_core::{
        actor::parse_event_stream,
        database::redb::RedbDatabase,
        prefix::IdentifierPrefix,
        processor::{
            basic_processor::BasicProcessor, event_storage::EventStorage,
            notification::JustNotification, Processor,
        },
    };
    use redb::Database;

    use crate::{
        database::{redb::RedbTelDatabase, EscrowDatabase, TelEventDatabase},
        error::Error,
        event::{manager_event, verifiable_event::VerifiableEvent},
        processor::{
            escrow::missing_issuer::MissingIssuerEscrow,
            notification::{TelNotificationBus, TelNotificationKind},
            TelEventProcessor, TelEventStorage,
        },
        seal::EventSourceSeal,
        tel::event_generator,
    };

    #[test]
    pub fn test_missing_issuer_escrow() -> Result<(), Error> {
        use tempfile::Builder;

        // Setup issuer key event log. Without ixn events tel event's can't be validated.
        let keri_root = Builder::new().prefix("test-db").tempfile().unwrap();
        let keri_db = Arc::new(RedbDatabase::new(keri_root.path()).unwrap());
        let mut keri_processor = BasicProcessor::new(keri_db.clone(), None);
        let keri_storage = Arc::new(EventStorage::new(keri_db.clone()));

        let issuer_kel = r#"{"v":"KERI10JSON00012b_","t":"icp","d":"EETk5xW-rl2TgHTTXr8m5kGXiC30m3gMgsYcBAjOE9eI","i":"EETk5xW-rl2TgHTTXr8m5kGXiC30m3gMgsYcBAjOE9eI","s":"0","kt":"1","k":["DHdoiqT1iac2HI6-HfCYcc01Piz2FTTPvZDFt6vADioD"],"nt":"1","n":["EH8IzIWeQFiUr3rr2dh8xAiW9Akwl6EooDt8iduQYyq_"],"bt":"0","b":[],"c":[],"a":[]}-AABAABvFFeXb9uW2G16o3C9xJZvY3a_utMPxd4NIUcGWRTqykMO1NzKwjsA_AQrOEwgO5jselWHREcK6vcAxRfv6-QC{"v":"KERI10JSON00013a_","t":"ixn","d":"EMOzEVoFjbkS3ZS5JtmJO4LeZ4gydbr8iXNrEQAt1OR2","i":"EETk5xW-rl2TgHTTXr8m5kGXiC30m3gMgsYcBAjOE9eI","s":"1","p":"EETk5xW-rl2TgHTTXr8m5kGXiC30m3gMgsYcBAjOE9eI","a":[{"i":"EF3TVac5quxrbLGLKAHF21laISjMgjYQAIg3OsTen969","s":"0","d":"ENIKpuUkjM-1K2Sv_TZwF_k8FTVkefAgy8sIpiFp0uWh"}]}-AABAACvrSS_EZUMKQ6Ax8FaB_Sf99O0y6MmfoRDBKMphVWWtuCOlFQm6N0XrTwtYxO3pO0AEZkJ1vzu52-RDK-w3YAN{"v":"KERI10JSON00013a_","t":"ixn","d":"EDvnfU2yMZUXEy9D_22YOkeSZOq6YG9zfItawvx3GR_6","i":"EETk5xW-rl2TgHTTXr8m5kGXiC30m3gMgsYcBAjOE9eI","s":"2","p":"EMOzEVoFjbkS3ZS5JtmJO4LeZ4gydbr8iXNrEQAt1OR2","a":[{"i":"EC8Oej-3HAUpBY_kxzBK3B-0RV9j4dXw1H0NRKxJg7g-","s":"0","d":"EDBM1ys50vEJxRzvBjTOrmOhokELjVtozXy3ZbJ8-KFk"}]}-AABAAABtEQ7SoGt2IcZBMX0GaEaMqGdMsrGpj1fABDKgE5dA7s7AGXTkWrZjzA4GXkGXuOspi6upqBhpxr6d5ySeKQH"#;

        let kel = parse_event_stream(issuer_kel.as_bytes()).unwrap();
        let issuer_icp = kel[0].clone();
        let issuer_vcp_ixn = kel[1].clone();

        // Incept identifier
        keri_processor.process(&issuer_icp)?;

        // Initiate tel and it's escrows
        let tel_root = Builder::new().prefix("test-db").tempfile().unwrap();
        let tel_escrow_root = Builder::new().prefix("test-db").tempfile().unwrap();

        let db = EscrowDatabase::new(tel_escrow_root.path()).unwrap();
        let tel_events_db = Arc::new(RedbTelDatabase::new(&tel_root.path()).unwrap());

        let tel_storage = Arc::new(TelEventStorage::new(tel_events_db.clone()));
        let tel_bus = TelNotificationBus::new();

        let missing_issuer_escrow = Arc::new(MissingIssuerEscrow::new(
            tel_events_db,
            &db,
            Duration::from_secs(100),
            keri_storage.clone(),
            tel_bus.clone(),
        ));

        tel_bus.register_observer(
            missing_issuer_escrow.clone(),
            vec![TelNotificationKind::MissingIssuer],
        )?;

        keri_processor.register_observer(
            missing_issuer_escrow.clone(),
            &vec![JustNotification::KeyEventAdded],
        )?;

        let processor = TelEventProcessor::new(keri_storage, tel_storage.clone(), Some(tel_bus)); // TelEventProcessor{database: TelEventDatabase::new(db, db_escrow)};

        let issuer_prefix: IdentifierPrefix = "EETk5xW-rl2TgHTTXr8m5kGXiC30m3gMgsYcBAjOE9eI"
            .parse()
            .unwrap();
        let dummy_source_seal = EventSourceSeal {
            sn: 1,
            digest: "EMOzEVoFjbkS3ZS5JtmJO4LeZ4gydbr8iXNrEQAt1OR2"
                .parse()
                .unwrap(),
        };

        let vcp = event_generator::make_inception_event(
            issuer_prefix,
            vec![manager_event::Config::NoBackers],
            0,
            vec![],
            None,
            None,
        )?;

        let management_tel_prefix = vcp.get_prefix();

        // before applying vcp to management tel, insert anchor event seal with proper ixn event data.
        let verifiable_vcp = VerifiableEvent::new(vcp.clone(), dummy_source_seal.clone().into());
        processor.process(verifiable_vcp.clone())?;

        // Check management state. Vcp event should't be accepted, because of
        // missing issuer event. It should be in missing issuer escrow.
        let st = tel_storage.compute_management_tel_state(&management_tel_prefix)?;
        assert_eq!(st, None);

        // check if vcp event is in db.
        let man_event_from_db =
            tel_storage.get_management_event_at_sn(&management_tel_prefix, 0)?;
        assert!(man_event_from_db.is_none());

        // Process missing ixn in issuer's kel. Now escrowed vcp event should be
        // accepted.
        keri_processor.process(&issuer_vcp_ixn)?;

        let management_state = tel_storage
            .compute_management_tel_state(&management_tel_prefix)?
            .unwrap();
        assert_eq!(management_state.sn, 0);

        // check if vcp event is in db.
        let man_event_from_db =
            tel_storage.get_management_event_at_sn(&management_tel_prefix, 0)?;
        assert!(man_event_from_db.is_some());
        assert_eq!(
            man_event_from_db.unwrap().serialize().unwrap(),
            verifiable_vcp.serialize().unwrap()
        );

        Ok(())
    }
}