keri_core/processor/
mod.rs

1use std::sync::Arc;
2
3pub mod basic_processor;
4pub mod escrow;
5#[cfg(test)]
6mod escrow_tests;
7pub mod event_storage;
8pub mod notification;
9#[cfg(test)]
10mod processor_tests;
11
12pub mod validator;
13
14use said::version::format::SerializationFormats;
15
16use self::{
17    notification::{JustNotification, Notification, NotificationBus, Notifier},
18    validator::EventValidator,
19};
20#[cfg(feature = "query")]
21use crate::query::reply_event::{ReplyRoute, SignedReply};
22use crate::{
23    database::{timestamped::TimestampedSignedEventMessage, EventDatabase},
24    error::Error,
25    event::receipt::Receipt,
26    event_message::signed_event_message::{
27        Notice, SignedEventMessage, SignedNontransferableReceipt,
28    },
29    prefix::IdentifierPrefix,
30    state::IdentifierState,
31};
32
33pub trait Processor {
34    type Database: EventDatabase + 'static;
35    fn process_notice(&self, notice: &Notice) -> Result<(), Error>;
36
37    #[cfg(feature = "query")]
38    fn process_op_reply(&self, reply: &SignedReply) -> Result<(), Error>;
39
40    fn register_observer(
41        &mut self,
42        observer: Arc<dyn Notifier + Send + Sync>,
43        notifications: &[JustNotification],
44    ) -> Result<(), Error>;
45
46    fn process(
47        &self,
48        msg: &crate::event_message::signed_event_message::Message,
49    ) -> Result<(), Error> {
50        use crate::event_message::signed_event_message::Message;
51        #[cfg(feature = "query")]
52        use crate::event_message::signed_event_message::Op;
53        match msg {
54            Message::Notice(notice) => self.process_notice(notice),
55            #[cfg(any(feature = "query", feature = "oobi"))]
56            Message::Op(op) => match op {
57                #[cfg(feature = "query")]
58                Op::Query(_query) => panic!("processor can't handle query op"),
59                #[cfg(feature = "query")]
60                Op::Reply(reply) => self.process_op_reply(reply),
61                _ => todo!(),
62            },
63        }
64    }
65}
66
67pub struct EventProcessor<D: EventDatabase> {
68    events_db: Arc<D>,
69    validator: EventValidator<D>,
70    publisher: NotificationBus,
71}
72
/* Database-specific constructor kept for reference only; the generic
   `EventProcessor::new` below has the same body.
impl EventProcessor<RedbDatabase> {
    pub fn new(publisher: NotificationBus, events_db: Arc<RedbDatabase>) -> Self {
        let validator = EventValidator::new(events_db.clone());
        Self {
            events_db,
            validator,
            publisher,
        }
    }
} */
83
84impl<D: EventDatabase + 'static> EventProcessor<D> {
85    pub fn new(publisher: NotificationBus, events_db: Arc<D>) -> Self {
86        let validator = EventValidator::new(events_db.clone());
87        Self {
88            events_db,
89            validator,
90            publisher,
91        }
92    }
93
94    pub fn register_observer(
95        &mut self,
96        observer: Arc<dyn Notifier + Send + Sync>,
97        notifications: Vec<JustNotification>,
98    ) -> Result<(), Error> {
99        self.publisher.register_observer(observer, notifications);
100        Ok(())
101    }
102
103    #[cfg(feature = "query")]
104    pub fn process_op_reply(&self, rpy: &SignedReply) -> Result<(), Error> {
105        use crate::processor::validator::MoreInfoError;
106
107        use self::validator::VerificationError;
108
109        match rpy.reply.get_route() {
110            ReplyRoute::Ksn(_, _) => match self.validator.process_signed_ksn_reply(rpy) {
111                Ok(_) => {
112                    self.events_db
113                        .save_reply(rpy.clone())
114                        .map_err(|_e| Error::DbError)?;
115                }
116                Err(Error::VerificationError(VerificationError::MoreInfo(
117                    MoreInfoError::EventNotFound(_),
118                ))) => {
119                    self.publisher
120                        .notify(&Notification::KsnOutOfOrder(rpy.clone()))?;
121                }
122                Err(Error::EventOutOfOrderError) => {
123                    self.publisher
124                        .notify(&Notification::KsnOutOfOrder(rpy.clone()))?;
125                }
126                Err(anything) => return Err(anything),
127            },
128            _ => {}
129        }
130        Ok(())
131    }
132
133    pub fn process_notice<F>(&self, notice: &Notice, processing_strategy: F) -> Result<(), Error>
134    where
135        F: Fn(
136            Arc<D>,
137            // Arc<SledEventDatabase>,
138            &NotificationBus,
139            SignedEventMessage,
140        ) -> Result<(), Error>,
141    {
142        match notice {
143            Notice::Event(signed_event) => {
144                processing_strategy(
145                    self.events_db.clone(),
146                    // self.db.clone(),
147                    &self.publisher,
148                    signed_event.clone(),
149                )?;
150                // check if receipts are attached
151                if let Some(witness_receipts) = &signed_event.witness_receipts {
152                    // Create and process witness receipts
153                    let id = signed_event.event_message.data.get_prefix();
154                    let receipt = Receipt::new(
155                        SerializationFormats::JSON,
156                        signed_event.event_message.digest()?,
157                        id,
158                        signed_event.event_message.data.get_sn(),
159                    );
160                    let signed_receipt =
161                        SignedNontransferableReceipt::new(&receipt, witness_receipts.clone());
162                    self.process_notice(
163                        &Notice::NontransferableRct(signed_receipt),
164                        processing_strategy,
165                    )
166                } else {
167                    Ok(())
168                }
169            }
170            Notice::NontransferableRct(rct) => {
171                let id = &rct.body.prefix;
172                match self.validator.validate_witness_receipt(rct) {
173                    Ok(_) => {
174                        self.events_db
175                            .add_receipt_nt(rct.to_owned(), id)
176                            .map_err(|_| Error::DbError)?;
177                        self.publisher.notify(&Notification::ReceiptAccepted)
178                    }
179                    Err(Error::MissingEvent) => self
180                        .publisher
181                        .notify(&Notification::ReceiptOutOfOrder(rct.clone())),
182                    Err(e) => Err(e),
183                }
184            }
185            Notice::TransferableRct(vrc) => match self.validator.validate_validator_receipt(vrc) {
186                Ok(_) => {
187                    self.events_db
188                        .add_receipt_t(vrc.clone(), &vrc.body.prefix)
189                        .map_err(|_| Error::DbError)?;
190                    self.publisher.notify(&Notification::ReceiptAccepted)
191                }
192                Err(Error::MissingEvent) | Err(Error::EventOutOfOrderError) => self
193                    .publisher
194                    .notify(&Notification::TransReceiptOutOfOrder(vrc.clone())),
195                Err(e) => Err(e),
196            },
197        }
198    }
199}
200
201/// Compute State for Prefix
202///
203/// Returns the current State associated with
204/// the given Prefix
205pub fn compute_state<D: EventDatabase>(
206    db: Arc<D>,
207    id: &IdentifierPrefix,
208) -> Option<IdentifierState> {
209    if let Some(events) = db.get_kel_finalized_events(crate::database::QueryParameters::All { id })
210    {
211        // start with empty state
212        let mut state = IdentifierState::default();
213        // we sort here to get inception first
214        let mut sorted_events = events.collect::<Vec<TimestampedSignedEventMessage>>();
215        // TODO why identifier is in database if there are no events for it?
216        if sorted_events.is_empty() {
217            return None;
218        };
219        sorted_events.sort();
220        for event in sorted_events {
221            state = match state.clone().apply(&event.signed_event_message) {
222                Ok(s) => s,
223                // will happen when a recovery has overridden some part of the KEL,
224                Err(e) => match e {
225                    // skip out of order and partially signed events
226                    Error::EventOutOfOrderError | Error::NotEnoughSigsError => continue,
227                    // stop processing here
228                    _ => break,
229                },
230            };
231        }
232        Some(state)
233    } else {
234        // no inception event, no state
235        None
236    }
237}