keri_core/processor/
basic_processor.rs1use std::sync::Arc;
2
3use super::{
4 notification::{JustNotification, Notification, NotificationBus, Notifier},
5 validator::EventValidator,
6 EventProcessor, Processor,
7};
8#[cfg(feature = "query")]
9use crate::query::reply_event::SignedReply;
10use crate::{
11 database::EventDatabase,
12 error::Error,
13 event_message::signed_event_message::{Notice, SignedEventMessage},
14};
15
16pub struct BasicProcessor<D: EventDatabase>(EventProcessor<D>);
17
18impl<D: EventDatabase + 'static> Processor for BasicProcessor<D> {
19 type Database = D;
20 fn register_observer(
21 &mut self,
22 observer: Arc<dyn Notifier + Send + Sync>,
23 notification: &[JustNotification],
24 ) -> Result<(), Error> {
25 self.0.register_observer(observer, notification.to_vec())
26 }
27
28 fn process_notice(&self, notice: &Notice) -> Result<(), Error> {
29 self.0
30 .process_notice(notice, BasicProcessor::basic_processing_strategy)?;
31 Ok(())
32 }
33
34 #[cfg(feature = "query")]
35 fn process_op_reply(&self, reply: &SignedReply) -> Result<(), Error> {
36 self.0.process_op_reply(reply)?;
37 Ok(())
38 }
39}
40
41impl<D: EventDatabase + 'static> BasicProcessor<D> {
42 pub fn new(db: Arc<D>, notification_bus: Option<NotificationBus>) -> Self {
43 let processor = EventProcessor::new(notification_bus.unwrap_or_default(), db.clone());
44 Self(processor)
45 }
46
47 fn basic_processing_strategy(
48 events_db: Arc<D>,
49 publisher: &NotificationBus,
50 signed_event: SignedEventMessage,
51 ) -> Result<(), Error> {
52 let id = &signed_event.event_message.data.get_prefix();
53 let validator = EventValidator::new(events_db.clone());
54 match validator.validate_event(&signed_event) {
55 Ok(_) => {
56 events_db
57 .add_kel_finalized_event(signed_event.clone(), id)
58 .map_err(|_e| Error::DbError)?;
59 publisher.notify(&Notification::KeyEventAdded(signed_event))
60 }
61 Err(Error::EventOutOfOrderError) => {
62 publisher.notify(&Notification::OutOfOrder(signed_event))
63 }
64 Err(Error::NotEnoughReceiptsError) => {
65 publisher.notify(&Notification::PartiallyWitnessed(signed_event))
66 }
67 Err(Error::NotEnoughSigsError) => {
68 publisher.notify(&Notification::PartiallySigned(signed_event))
69 }
70 Err(Error::EventDuplicateError) => {
71 publisher.notify(&Notification::DupliciousEvent(signed_event))
72 }
73 Err(Error::MissingDelegatingEventError | Error::MissingDelegatorSealError(_)) => {
74 publisher.notify(&Notification::MissingDelegatingEvent(signed_event))
75 }
76 Err(e) => Err(e),
77 }
78 }
79}