1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use std::sync::Arc;

use super::{
    notification::{JustNotification, Notification, NotificationBus, Notifier},
    validator::EventValidator,
    EventProcessor, Processor,
};
#[cfg(feature = "query")]
use crate::query::reply_event::SignedReply;
use crate::{
    database::SledEventDatabase,
    error::Error,
    event_message::signed_event_message::{Notice, SignedEventMessage},
};

pub struct BasicProcessor(EventProcessor);

impl Processor for BasicProcessor {
    fn register_observer(
        &mut self,
        observer: Arc<dyn Notifier + Send + Sync>,
        notification: &[JustNotification],
    ) -> Result<(), Error> {
        self.0.register_observer(observer, notification.to_vec())
    }

    fn process_notice(&self, notice: &Notice) -> Result<(), Error> {
        self.0
            .process_notice(notice, BasicProcessor::basic_processing_strategy)?;
        Ok(())
    }

    #[cfg(feature = "query")]
    fn process_op_reply(&self, reply: &SignedReply) -> Result<(), Error> {
        self.0.process_op_reply(reply)?;
        Ok(())
    }
}

impl BasicProcessor {
    pub fn new(db: Arc<SledEventDatabase>, notification_bus: Option<NotificationBus>) -> Self {
        let processor = EventProcessor::new(db.clone(), notification_bus.unwrap_or_default());
        Self(processor)
    }

    fn basic_processing_strategy(
        db: Arc<SledEventDatabase>,
        publisher: &NotificationBus,
        signed_event: SignedEventMessage,
    ) -> Result<(), Error> {
        let id = &signed_event.event_message.data.get_prefix();
        let validator = EventValidator::new(db.clone());
        match validator.validate_event(&signed_event) {
            Ok(_) => {
                db.add_kel_finalized_event(signed_event.clone(), id)?;
                publisher.notify(&Notification::KeyEventAdded(signed_event))
            }
            Err(Error::EventOutOfOrderError) => {
                publisher.notify(&Notification::OutOfOrder(signed_event))
            }
            Err(Error::NotEnoughReceiptsError) => {
                publisher.notify(&Notification::PartiallyWitnessed(signed_event))
            }
            Err(Error::NotEnoughSigsError) => {
                publisher.notify(&Notification::PartiallySigned(signed_event))
            }
            Err(Error::EventDuplicateError) => {
                db.add_duplicious_event(signed_event.clone(), id)?;
                publisher.notify(&Notification::DupliciousEvent(signed_event))
            }
            Err(Error::MissingDelegatingEventError | Error::MissingDelegatorSealError(_)) => {
                publisher.notify(&Notification::MissingDelegatingEvent(signed_event))
            }
            Err(e) => Err(e),
        }
    }
}