keri_core/processor/
basic_processor.rs

1use std::sync::Arc;
2
3use super::{
4    notification::{JustNotification, Notification, NotificationBus, Notifier},
5    validator::EventValidator,
6    EventProcessor, Processor,
7};
8#[cfg(feature = "query")]
9use crate::query::reply_event::SignedReply;
10use crate::{
11    database::EventDatabase,
12    error::Error,
13    event_message::signed_event_message::{Notice, SignedEventMessage},
14};
15
16pub struct BasicProcessor<D: EventDatabase>(EventProcessor<D>);
17
18impl<D: EventDatabase + 'static> Processor for BasicProcessor<D> {
19    type Database = D;
20    fn register_observer(
21        &mut self,
22        observer: Arc<dyn Notifier + Send + Sync>,
23        notification: &[JustNotification],
24    ) -> Result<(), Error> {
25        self.0.register_observer(observer, notification.to_vec())
26    }
27
28    fn process_notice(&self, notice: &Notice) -> Result<(), Error> {
29        self.0
30            .process_notice(notice, BasicProcessor::basic_processing_strategy)?;
31        Ok(())
32    }
33
34    #[cfg(feature = "query")]
35    fn process_op_reply(&self, reply: &SignedReply) -> Result<(), Error> {
36        self.0.process_op_reply(reply)?;
37        Ok(())
38    }
39}
40
41impl<D: EventDatabase + 'static> BasicProcessor<D> {
42    pub fn new(db: Arc<D>, notification_bus: Option<NotificationBus>) -> Self {
43        let processor = EventProcessor::new(notification_bus.unwrap_or_default(), db.clone());
44        Self(processor)
45    }
46
47    fn basic_processing_strategy(
48        events_db: Arc<D>,
49        publisher: &NotificationBus,
50        signed_event: SignedEventMessage,
51    ) -> Result<(), Error> {
52        let id = &signed_event.event_message.data.get_prefix();
53        let validator = EventValidator::new(events_db.clone());
54        match validator.validate_event(&signed_event) {
55            Ok(_) => {
56                events_db
57                    .add_kel_finalized_event(signed_event.clone(), id)
58                    .map_err(|_e| Error::DbError)?;
59                publisher.notify(&Notification::KeyEventAdded(signed_event))
60            }
61            Err(Error::EventOutOfOrderError) => {
62                publisher.notify(&Notification::OutOfOrder(signed_event))
63            }
64            Err(Error::NotEnoughReceiptsError) => {
65                publisher.notify(&Notification::PartiallyWitnessed(signed_event))
66            }
67            Err(Error::NotEnoughSigsError) => {
68                publisher.notify(&Notification::PartiallySigned(signed_event))
69            }
70            Err(Error::EventDuplicateError) => {
71                publisher.notify(&Notification::DupliciousEvent(signed_event))
72            }
73            Err(Error::MissingDelegatingEventError | Error::MissingDelegatorSealError(_)) => {
74                publisher.notify(&Notification::MissingDelegatingEvent(signed_event))
75            }
76            Err(e) => Err(e),
77        }
78    }
79}