keri_core/processor/
mod.rs1use std::sync::Arc;
2
3pub mod basic_processor;
4pub mod escrow;
5#[cfg(test)]
6mod escrow_tests;
7pub mod event_storage;
8pub mod notification;
9#[cfg(test)]
10mod processor_tests;
11
12pub mod validator;
13
14use said::version::format::SerializationFormats;
15
16use self::{
17 notification::{JustNotification, Notification, NotificationBus, Notifier},
18 validator::EventValidator,
19};
20#[cfg(feature = "query")]
21use crate::query::reply_event::{ReplyRoute, SignedReply};
22use crate::{
23 database::{timestamped::TimestampedSignedEventMessage, EventDatabase},
24 error::Error,
25 event::receipt::Receipt,
26 event_message::signed_event_message::{
27 Notice, SignedEventMessage, SignedNontransferableReceipt,
28 },
29 prefix::IdentifierPrefix,
30 state::IdentifierState,
31};
32
33pub trait Processor {
34 type Database: EventDatabase + 'static;
35 fn process_notice(&self, notice: &Notice) -> Result<(), Error>;
36
37 #[cfg(feature = "query")]
38 fn process_op_reply(&self, reply: &SignedReply) -> Result<(), Error>;
39
40 fn register_observer(
41 &mut self,
42 observer: Arc<dyn Notifier + Send + Sync>,
43 notifications: &[JustNotification],
44 ) -> Result<(), Error>;
45
46 fn process(
47 &self,
48 msg: &crate::event_message::signed_event_message::Message,
49 ) -> Result<(), Error> {
50 use crate::event_message::signed_event_message::Message;
51 #[cfg(feature = "query")]
52 use crate::event_message::signed_event_message::Op;
53 match msg {
54 Message::Notice(notice) => self.process_notice(notice),
55 #[cfg(any(feature = "query", feature = "oobi"))]
56 Message::Op(op) => match op {
57 #[cfg(feature = "query")]
58 Op::Query(_query) => panic!("processor can't handle query op"),
59 #[cfg(feature = "query")]
60 Op::Reply(reply) => self.process_op_reply(reply),
61 _ => todo!(),
62 },
63 }
64 }
65}
66
67pub struct EventProcessor<D: EventDatabase> {
68 events_db: Arc<D>,
69 validator: EventValidator<D>,
70 publisher: NotificationBus,
71}
72
73impl<D: EventDatabase + 'static> EventProcessor<D> {
85 pub fn new(publisher: NotificationBus, events_db: Arc<D>) -> Self {
86 let validator = EventValidator::new(events_db.clone());
87 Self {
88 events_db,
89 validator,
90 publisher,
91 }
92 }
93
94 pub fn register_observer(
95 &mut self,
96 observer: Arc<dyn Notifier + Send + Sync>,
97 notifications: Vec<JustNotification>,
98 ) -> Result<(), Error> {
99 self.publisher.register_observer(observer, notifications);
100 Ok(())
101 }
102
103 #[cfg(feature = "query")]
104 pub fn process_op_reply(&self, rpy: &SignedReply) -> Result<(), Error> {
105 use crate::processor::validator::MoreInfoError;
106
107 use self::validator::VerificationError;
108
109 match rpy.reply.get_route() {
110 ReplyRoute::Ksn(_, _) => match self.validator.process_signed_ksn_reply(rpy) {
111 Ok(_) => {
112 self.events_db
113 .save_reply(rpy.clone())
114 .map_err(|_e| Error::DbError)?;
115 }
116 Err(Error::VerificationError(VerificationError::MoreInfo(
117 MoreInfoError::EventNotFound(_),
118 ))) => {
119 self.publisher
120 .notify(&Notification::KsnOutOfOrder(rpy.clone()))?;
121 }
122 Err(Error::EventOutOfOrderError) => {
123 self.publisher
124 .notify(&Notification::KsnOutOfOrder(rpy.clone()))?;
125 }
126 Err(anything) => return Err(anything),
127 },
128 _ => {}
129 }
130 Ok(())
131 }
132
133 pub fn process_notice<F>(&self, notice: &Notice, processing_strategy: F) -> Result<(), Error>
134 where
135 F: Fn(
136 Arc<D>,
137 &NotificationBus,
139 SignedEventMessage,
140 ) -> Result<(), Error>,
141 {
142 match notice {
143 Notice::Event(signed_event) => {
144 processing_strategy(
145 self.events_db.clone(),
146 &self.publisher,
148 signed_event.clone(),
149 )?;
150 if let Some(witness_receipts) = &signed_event.witness_receipts {
152 let id = signed_event.event_message.data.get_prefix();
154 let receipt = Receipt::new(
155 SerializationFormats::JSON,
156 signed_event.event_message.digest()?,
157 id,
158 signed_event.event_message.data.get_sn(),
159 );
160 let signed_receipt =
161 SignedNontransferableReceipt::new(&receipt, witness_receipts.clone());
162 self.process_notice(
163 &Notice::NontransferableRct(signed_receipt),
164 processing_strategy,
165 )
166 } else {
167 Ok(())
168 }
169 }
170 Notice::NontransferableRct(rct) => {
171 let id = &rct.body.prefix;
172 match self.validator.validate_witness_receipt(rct) {
173 Ok(_) => {
174 self.events_db
175 .add_receipt_nt(rct.to_owned(), id)
176 .map_err(|_| Error::DbError)?;
177 self.publisher.notify(&Notification::ReceiptAccepted)
178 }
179 Err(Error::MissingEvent) => self
180 .publisher
181 .notify(&Notification::ReceiptOutOfOrder(rct.clone())),
182 Err(e) => Err(e),
183 }
184 }
185 Notice::TransferableRct(vrc) => match self.validator.validate_validator_receipt(vrc) {
186 Ok(_) => {
187 self.events_db
188 .add_receipt_t(vrc.clone(), &vrc.body.prefix)
189 .map_err(|_| Error::DbError)?;
190 self.publisher.notify(&Notification::ReceiptAccepted)
191 }
192 Err(Error::MissingEvent) | Err(Error::EventOutOfOrderError) => self
193 .publisher
194 .notify(&Notification::TransReceiptOutOfOrder(vrc.clone())),
195 Err(e) => Err(e),
196 },
197 }
198 }
199}
200
201pub fn compute_state<D: EventDatabase>(
206 db: Arc<D>,
207 id: &IdentifierPrefix,
208) -> Option<IdentifierState> {
209 if let Some(events) = db.get_kel_finalized_events(crate::database::QueryParameters::All { id })
210 {
211 let mut state = IdentifierState::default();
213 let mut sorted_events = events.collect::<Vec<TimestampedSignedEventMessage>>();
215 if sorted_events.is_empty() {
217 return None;
218 };
219 sorted_events.sort();
220 for event in sorted_events {
221 state = match state.clone().apply(&event.signed_event_message) {
222 Ok(s) => s,
223 Err(e) => match e {
225 Error::EventOutOfOrderError | Error::NotEnoughSigsError => continue,
227 _ => break,
229 },
230 };
231 }
232 Some(state)
233 } else {
234 None
236 }
237}