1use std::sync::Arc;
2
3pub mod basic_processor;
4pub mod escrow;
5#[cfg(test)]
6mod escrow_tests;
7pub mod event_storage;
8pub mod notification;
9#[cfg(test)]
10mod processor_tests;
11
12pub mod validator;
13
14use said::version::format::SerializationFormats;
15
16use self::{
17 notification::{JustNotification, Notification, NotificationBus, Notifier},
18 validator::EventValidator,
19};
20#[cfg(feature = "query")]
21use crate::query::reply_event::{ReplyRoute, SignedReply};
22use crate::{
23 database::{
24 sled::SledEventDatabase, timestamped::TimestampedSignedEventMessage, EventDatabase,
25 },
26 error::Error,
27 event::receipt::Receipt,
28 event_message::signed_event_message::{
29 Notice, SignedEventMessage, SignedNontransferableReceipt,
30 },
31 prefix::IdentifierPrefix,
32 state::IdentifierState,
33};
34
35pub trait Processor {
36 type Database: EventDatabase;
37 fn process_notice(&self, notice: &Notice) -> Result<(), Error>;
38
39 #[cfg(feature = "query")]
40 fn process_op_reply(&self, reply: &SignedReply) -> Result<(), Error>;
41
42 fn register_observer(
43 &mut self,
44 observer: Arc<dyn Notifier + Send + Sync>,
45 notifications: &[JustNotification],
46 ) -> Result<(), Error>;
47
48 fn process(
49 &self,
50 msg: &crate::event_message::signed_event_message::Message,
51 ) -> Result<(), Error> {
52 use crate::event_message::signed_event_message::Message;
53 #[cfg(feature = "query")]
54 use crate::event_message::signed_event_message::Op;
55 match msg {
56 Message::Notice(notice) => self.process_notice(notice),
57 #[cfg(any(feature = "query", feature = "oobi"))]
58 Message::Op(op) => match op {
59 #[cfg(feature = "query")]
60 Op::Query(_query) => panic!("processor can't handle query op"),
61 #[cfg(feature = "oobi")]
62 Op::Reply(reply) => self.process_op_reply(reply),
63 _ => todo!(),
64 },
65 }
66 }
67}
68
69pub struct EventProcessor<D: EventDatabase> {
70 events_db: Arc<D>,
71 db: Arc<SledEventDatabase>,
72 validator: EventValidator<D>,
73 publisher: NotificationBus,
74}
75
76impl<D: EventDatabase> EventProcessor<D> {
77 pub fn new(db: Arc<SledEventDatabase>, publisher: NotificationBus, events_db: Arc<D>) -> Self {
78 let validator = EventValidator::new(db.clone(), events_db.clone());
79 Self {
80 events_db,
81 db,
82 validator,
83 publisher,
84 }
85 }
86
87 pub fn register_observer(
88 &mut self,
89 observer: Arc<dyn Notifier + Send + Sync>,
90 notifications: Vec<JustNotification>,
91 ) -> Result<(), Error> {
92 self.publisher.register_observer(observer, notifications);
93 Ok(())
94 }
95
96 #[cfg(feature = "query")]
97 pub fn process_op_reply(&self, rpy: &SignedReply) -> Result<(), Error> {
98 use crate::processor::validator::MoreInfoError;
99
100 use self::validator::VerificationError;
101
102 match rpy.reply.get_route() {
103 ReplyRoute::Ksn(_, _) => match self.validator.process_signed_ksn_reply(rpy) {
104 Ok(_) => {
105 self.db
106 .update_accepted_reply(rpy.clone(), &rpy.reply.get_prefix())?;
107 }
108 Err(Error::VerificationError(VerificationError::MoreInfo(
109 MoreInfoError::EventNotFound(_),
110 ))) => {
111 self.publisher
112 .notify(&Notification::KsnOutOfOrder(rpy.clone()))?;
113 }
114 Err(Error::EventOutOfOrderError) => {
115 self.publisher
116 .notify(&Notification::KsnOutOfOrder(rpy.clone()))?;
117 }
118 Err(anything) => return Err(anything),
119 },
120 _ => {}
121 }
122 Ok(())
123 }
124
125 pub fn process_notice<F>(&self, notice: &Notice, processing_strategy: F) -> Result<(), Error>
126 where
127 F: Fn(
128 Arc<D>,
129 Arc<SledEventDatabase>,
130 &NotificationBus,
131 SignedEventMessage,
132 ) -> Result<(), Error>,
133 {
134 match notice {
135 Notice::Event(signed_event) => {
136 processing_strategy(
137 self.events_db.clone(),
138 self.db.clone(),
139 &self.publisher,
140 signed_event.clone(),
141 )?;
142 if let Some(witness_receipts) = &signed_event.witness_receipts {
144 let id = signed_event.event_message.data.get_prefix();
146 let receipt = Receipt::new(
147 SerializationFormats::JSON,
148 signed_event.event_message.digest()?,
149 id,
150 signed_event.event_message.data.get_sn(),
151 );
152 let signed_receipt =
153 SignedNontransferableReceipt::new(&receipt, witness_receipts.clone());
154 self.process_notice(
155 &Notice::NontransferableRct(signed_receipt),
156 processing_strategy,
157 )
158 } else {
159 Ok(())
160 }
161 }
162 Notice::NontransferableRct(rct) => {
163 let id = &rct.body.prefix;
164 match self.validator.validate_witness_receipt(rct) {
165 Ok(_) => {
166 self.events_db
167 .add_receipt_nt(rct.to_owned(), id)
168 .map_err(|_| Error::DbError)?;
169 self.publisher.notify(&Notification::ReceiptAccepted)
170 }
171 Err(Error::MissingEvent) => self
172 .publisher
173 .notify(&Notification::ReceiptOutOfOrder(rct.clone())),
174 Err(e) => Err(e),
175 }
176 }
177 Notice::TransferableRct(vrc) => match self.validator.validate_validator_receipt(vrc) {
178 Ok(_) => {
179 self.events_db
180 .add_receipt_t(vrc.clone(), &vrc.body.prefix)
181 .map_err(|_| Error::DbError)?;
182 self.publisher.notify(&Notification::ReceiptAccepted)
183 }
184 Err(Error::MissingEvent) | Err(Error::EventOutOfOrderError) => self
185 .publisher
186 .notify(&Notification::TransReceiptOutOfOrder(vrc.clone())),
187 Err(e) => Err(e),
188 },
189 }
190 }
191}
192
193pub fn compute_state<D: EventDatabase>(
198 db: Arc<D>,
199 id: &IdentifierPrefix,
200) -> Option<IdentifierState> {
201 if let Some(events) = db.get_kel_finalized_events(crate::database::QueryParameters::All { id })
202 {
203 let mut state = IdentifierState::default();
205 let mut sorted_events = events.collect::<Vec<TimestampedSignedEventMessage>>();
207 if sorted_events.is_empty() {
209 return None;
210 };
211 sorted_events.sort();
212 for event in sorted_events {
213 state = match state.clone().apply(&event.signed_event_message) {
214 Ok(s) => s,
215 Err(e) => match e {
217 Error::EventOutOfOrderError | Error::NotEnoughSigsError => continue,
219 _ => break,
221 },
222 };
223 }
224 Some(state)
225 } else {
226 None
228 }
229}