keri_core/processor/
mod.rs

1use std::sync::Arc;
2
3pub mod basic_processor;
4pub mod escrow;
5#[cfg(test)]
6mod escrow_tests;
7pub mod event_storage;
8pub mod notification;
9#[cfg(test)]
10mod processor_tests;
11
12pub mod validator;
13
14use said::version::format::SerializationFormats;
15
16use self::{
17    notification::{JustNotification, Notification, NotificationBus, Notifier},
18    validator::EventValidator,
19};
20#[cfg(feature = "query")]
21use crate::query::reply_event::{ReplyRoute, SignedReply};
22use crate::{
23    database::{
24        sled::SledEventDatabase, timestamped::TimestampedSignedEventMessage, EventDatabase,
25    },
26    error::Error,
27    event::receipt::Receipt,
28    event_message::signed_event_message::{
29        Notice, SignedEventMessage, SignedNontransferableReceipt,
30    },
31    prefix::IdentifierPrefix,
32    state::IdentifierState,
33};
34
35pub trait Processor {
36    type Database: EventDatabase;
37    fn process_notice(&self, notice: &Notice) -> Result<(), Error>;
38
39    #[cfg(feature = "query")]
40    fn process_op_reply(&self, reply: &SignedReply) -> Result<(), Error>;
41
42    fn register_observer(
43        &mut self,
44        observer: Arc<dyn Notifier + Send + Sync>,
45        notifications: &[JustNotification],
46    ) -> Result<(), Error>;
47
48    fn process(
49        &self,
50        msg: &crate::event_message::signed_event_message::Message,
51    ) -> Result<(), Error> {
52        use crate::event_message::signed_event_message::Message;
53        #[cfg(feature = "query")]
54        use crate::event_message::signed_event_message::Op;
55        match msg {
56            Message::Notice(notice) => self.process_notice(notice),
57            #[cfg(any(feature = "query", feature = "oobi"))]
58            Message::Op(op) => match op {
59                #[cfg(feature = "query")]
60                Op::Query(_query) => panic!("processor can't handle query op"),
61                #[cfg(feature = "oobi")]
62                Op::Reply(reply) => self.process_op_reply(reply),
63                _ => todo!(),
64            },
65        }
66    }
67}
68
69pub struct EventProcessor<D: EventDatabase> {
70    events_db: Arc<D>,
71    db: Arc<SledEventDatabase>,
72    validator: EventValidator<D>,
73    publisher: NotificationBus,
74}
75
76impl<D: EventDatabase> EventProcessor<D> {
77    pub fn new(db: Arc<SledEventDatabase>, publisher: NotificationBus, events_db: Arc<D>) -> Self {
78        let validator = EventValidator::new(db.clone(), events_db.clone());
79        Self {
80            events_db,
81            db,
82            validator,
83            publisher,
84        }
85    }
86
87    pub fn register_observer(
88        &mut self,
89        observer: Arc<dyn Notifier + Send + Sync>,
90        notifications: Vec<JustNotification>,
91    ) -> Result<(), Error> {
92        self.publisher.register_observer(observer, notifications);
93        Ok(())
94    }
95
96    #[cfg(feature = "query")]
97    pub fn process_op_reply(&self, rpy: &SignedReply) -> Result<(), Error> {
98        use crate::processor::validator::MoreInfoError;
99
100        use self::validator::VerificationError;
101
102        match rpy.reply.get_route() {
103            ReplyRoute::Ksn(_, _) => match self.validator.process_signed_ksn_reply(rpy) {
104                Ok(_) => {
105                    self.db
106                        .update_accepted_reply(rpy.clone(), &rpy.reply.get_prefix())?;
107                }
108                Err(Error::VerificationError(VerificationError::MoreInfo(
109                    MoreInfoError::EventNotFound(_),
110                ))) => {
111                    self.publisher
112                        .notify(&Notification::KsnOutOfOrder(rpy.clone()))?;
113                }
114                Err(Error::EventOutOfOrderError) => {
115                    self.publisher
116                        .notify(&Notification::KsnOutOfOrder(rpy.clone()))?;
117                }
118                Err(anything) => return Err(anything),
119            },
120            _ => {}
121        }
122        Ok(())
123    }
124
125    pub fn process_notice<F>(&self, notice: &Notice, processing_strategy: F) -> Result<(), Error>
126    where
127        F: Fn(
128            Arc<D>,
129            Arc<SledEventDatabase>,
130            &NotificationBus,
131            SignedEventMessage,
132        ) -> Result<(), Error>,
133    {
134        match notice {
135            Notice::Event(signed_event) => {
136                processing_strategy(
137                    self.events_db.clone(),
138                    self.db.clone(),
139                    &self.publisher,
140                    signed_event.clone(),
141                )?;
142                // check if receipts are attached
143                if let Some(witness_receipts) = &signed_event.witness_receipts {
144                    // Create and process witness receipts
145                    let id = signed_event.event_message.data.get_prefix();
146                    let receipt = Receipt::new(
147                        SerializationFormats::JSON,
148                        signed_event.event_message.digest()?,
149                        id,
150                        signed_event.event_message.data.get_sn(),
151                    );
152                    let signed_receipt =
153                        SignedNontransferableReceipt::new(&receipt, witness_receipts.clone());
154                    self.process_notice(
155                        &Notice::NontransferableRct(signed_receipt),
156                        processing_strategy,
157                    )
158                } else {
159                    Ok(())
160                }
161            }
162            Notice::NontransferableRct(rct) => {
163                let id = &rct.body.prefix;
164                match self.validator.validate_witness_receipt(rct) {
165                    Ok(_) => {
166                        self.events_db
167                            .add_receipt_nt(rct.to_owned(), id)
168                            .map_err(|_| Error::DbError)?;
169                        self.publisher.notify(&Notification::ReceiptAccepted)
170                    }
171                    Err(Error::MissingEvent) => self
172                        .publisher
173                        .notify(&Notification::ReceiptOutOfOrder(rct.clone())),
174                    Err(e) => Err(e),
175                }
176            }
177            Notice::TransferableRct(vrc) => match self.validator.validate_validator_receipt(vrc) {
178                Ok(_) => {
179                    self.events_db
180                        .add_receipt_t(vrc.clone(), &vrc.body.prefix)
181                        .map_err(|_| Error::DbError)?;
182                    self.publisher.notify(&Notification::ReceiptAccepted)
183                }
184                Err(Error::MissingEvent) | Err(Error::EventOutOfOrderError) => self
185                    .publisher
186                    .notify(&Notification::TransReceiptOutOfOrder(vrc.clone())),
187                Err(e) => Err(e),
188            },
189        }
190    }
191}
192
193/// Compute State for Prefix
194///
195/// Returns the current State associated with
196/// the given Prefix
197pub fn compute_state<D: EventDatabase>(
198    db: Arc<D>,
199    id: &IdentifierPrefix,
200) -> Option<IdentifierState> {
201    if let Some(events) = db.get_kel_finalized_events(crate::database::QueryParameters::All { id })
202    {
203        // start with empty state
204        let mut state = IdentifierState::default();
205        // we sort here to get inception first
206        let mut sorted_events = events.collect::<Vec<TimestampedSignedEventMessage>>();
207        // TODO why identifier is in database if there are no events for it?
208        if sorted_events.is_empty() {
209            return None;
210        };
211        sorted_events.sort();
212        for event in sorted_events {
213            state = match state.clone().apply(&event.signed_event_message) {
214                Ok(s) => s,
215                // will happen when a recovery has overridden some part of the KEL,
216                Err(e) => match e {
217                    // skip out of order and partially signed events
218                    Error::EventOutOfOrderError | Error::NotEnoughSigsError => continue,
219                    // stop processing here
220                    _ => break,
221                },
222            };
223        }
224        Some(state)
225    } else {
226        // no inception event, no state
227        None
228    }
229}