keri_core/processor/
escrow.rs

1use std::{fmt::Debug, sync::Arc, time::Duration};
2
3use said::SelfAddressingIdentifier;
4
5use super::{
6    event_storage::EventStorage,
7    notification::{JustNotification, Notification, NotificationBus, Notifier},
8    validator::EventValidator,
9};
10use crate::{
11    database::{
12        escrow::{Escrow, EscrowDb},
13        sled::SledEventDatabase,
14        EventDatabase,
15    },
16    error::Error,
17    event::{
18        event_data::EventData,
19        sections::seal::{EventSeal, Seal, SourceSeal},
20        KeyEvent,
21    },
22    event_message::{
23        msg::KeriEvent,
24        signature::Nontransferable,
25        signed_event_message::{
26            SignedEventMessage, SignedNontransferableReceipt, SignedTransferableReceipt,
27        },
28    },
29    prefix::{BasicPrefix, IdentifierPrefix, SelfSigningPrefix},
30};
31#[cfg(feature = "query")]
32use crate::{
33    processor::validator::{MoreInfoError, VerificationError},
34    query::reply_event::ReplyRoute,
35};
36
/// Per-escrow durations used when building the default escrow set.
///
/// Each value is forwarded unchanged to the constructor of the matching
/// escrow in [`default_escrow_bus`].
#[derive(Debug, Clone)]
pub struct EscrowConfig {
    /// Duration handed to the out-of-order events escrow.
    pub out_of_order_timeout: Duration,
    /// Duration handed to the partially signed events escrow.
    pub partially_signed_timeout: Duration,
    /// Duration handed to the partially witnessed events escrow.
    pub partially_witnessed_timeout: Duration,
    /// Duration handed to the transferable receipts escrow.
    pub trans_receipt_timeout: Duration,
    /// Duration handed to the delegation escrow.
    pub delegation_timeout: Duration,
}
45
46impl Default for EscrowConfig {
47    fn default() -> Self {
48        Self {
49            out_of_order_timeout: Duration::from_secs(60),
50            partially_signed_timeout: Duration::from_secs(60),
51            partially_witnessed_timeout: Duration::from_secs(60),
52            trans_receipt_timeout: Duration::from_secs(60),
53            delegation_timeout: Duration::from_secs(60),
54        }
55    }
56}
57
58pub fn default_escrow_bus<D: EventDatabase + Send + Sync + 'static>(
59    event_db: Arc<D>,
60    sled_db: Arc<SledEventDatabase>,
61    escrow_db: Arc<EscrowDb>,
62    escrow_config: EscrowConfig,
63) -> (
64    NotificationBus,
65    (
66        Arc<OutOfOrderEscrow<D>>,
67        Arc<PartiallySignedEscrow<D>>,
68        Arc<PartiallyWitnessedEscrow<D>>,
69        Arc<DelegationEscrow<D>>,
70    ),
71) {
72    let mut bus = NotificationBus::new();
73
74    // Register out of order escrow, to save and reprocess out of order events
75    let ooo_escrow = Arc::new(OutOfOrderEscrow::new(
76        event_db.clone(),
77        sled_db.clone(),
78        escrow_db.clone(),
79        escrow_config.out_of_order_timeout,
80    ));
81    bus.register_observer(
82        ooo_escrow.clone(),
83        vec![
84            JustNotification::OutOfOrder,
85            JustNotification::KeyEventAdded,
86        ],
87    );
88
89    let ps_escrow = Arc::new(PartiallySignedEscrow::new(
90        event_db.clone(),
91        sled_db.clone(),
92        escrow_db.clone(),
93        escrow_config.partially_signed_timeout,
94    ));
95    bus.register_observer(ps_escrow.clone(), vec![JustNotification::PartiallySigned]);
96
97    let pw_escrow = Arc::new(PartiallyWitnessedEscrow::new(
98        event_db.clone(),
99        sled_db.clone(),
100        escrow_db.clone(),
101        escrow_config.partially_witnessed_timeout,
102    ));
103    bus.register_observer(
104        pw_escrow.clone(),
105        vec![
106            JustNotification::PartiallyWitnessed,
107            JustNotification::ReceiptOutOfOrder,
108        ],
109    );
110
111    bus.register_observer(
112        Arc::new(TransReceiptsEscrow::new(
113            event_db.clone(),
114            sled_db.clone(),
115            escrow_db.clone(),
116            escrow_config.trans_receipt_timeout,
117        )),
118        vec![
119            JustNotification::KeyEventAdded,
120            JustNotification::TransReceiptOutOfOrder,
121        ],
122    );
123
124    let delegation_escrow = Arc::new(DelegationEscrow::new(
125        event_db,
126        sled_db,
127        escrow_db,
128        escrow_config.delegation_timeout,
129    ));
130    bus.register_observer(
131        delegation_escrow.clone(),
132        vec![
133            JustNotification::MissingDelegatingEvent,
134            JustNotification::KeyEventAdded,
135        ],
136    );
137
138    (bus, (ooo_escrow, ps_escrow, pw_escrow, delegation_escrow))
139}
140
141pub struct OutOfOrderEscrow<D: EventDatabase> {
142    db: Arc<D>,
143    sled_db: Arc<SledEventDatabase>,
144    pub escrowed_out_of_order: Escrow<SignedEventMessage>,
145}
146
147impl<D: EventDatabase> OutOfOrderEscrow<D> {
148    pub fn new(
149        db: Arc<D>,
150        sled_db: Arc<SledEventDatabase>,
151        escrow_db: Arc<EscrowDb>,
152        duration: Duration,
153    ) -> Self {
154        let escrow = Escrow::new(b"ooes", duration, escrow_db);
155        Self {
156            db,
157            sled_db,
158            escrowed_out_of_order: escrow,
159        }
160    }
161
162    pub fn get_event_by_sn_and_digest(
163        &self,
164        sn: u64,
165        id: &IdentifierPrefix,
166        event_digest: &SelfAddressingIdentifier,
167    ) -> Option<SignedEventMessage> {
168        self.escrowed_out_of_order.get(id).and_then(|mut events| {
169            events.find(|event| {
170                event.event_message.data.sn == sn
171                    && &event.event_message.data.prefix == id
172                    && event.event_message.digest().ok().as_ref() == Some(event_digest)
173            })
174        })
175    }
176}
177impl<D: EventDatabase> Notifier for OutOfOrderEscrow<D> {
178    fn notify(&self, notification: &Notification, bus: &NotificationBus) -> Result<(), Error> {
179        match notification {
180            Notification::KeyEventAdded(ev_message) => {
181                let id = ev_message.event_message.data.get_prefix();
182                self.process_out_of_order_events(bus, &id)?;
183            }
184            Notification::OutOfOrder(signed_event) => {
185                // ignore events with no signatures
186                if !signed_event.signatures.is_empty() {
187                    let id = signed_event.event_message.data.get_prefix();
188                    self.escrowed_out_of_order.add(&id, signed_event.clone())?;
189                }
190            }
191            _ => return Err(Error::SemanticError("Wrong notification".into())),
192        }
193
194        Ok(())
195    }
196}
197
198impl<D: EventDatabase> OutOfOrderEscrow<D> {
199    pub fn process_out_of_order_events(
200        &self,
201        bus: &NotificationBus,
202        id: &IdentifierPrefix,
203    ) -> Result<(), Error> {
204        if let Some(esc) = self.escrowed_out_of_order.get(id) {
205            for event in esc {
206                let validator = EventValidator::new(self.sled_db.clone(), self.db.clone());
207                match validator.validate_event(&event) {
208                    Ok(_) => {
209                        // add to kel
210                        self.db
211                            .add_kel_finalized_event(event.clone(), id)
212                            .map_err(|_| Error::DbError)?;
213                        // remove from escrow
214                        self.escrowed_out_of_order.remove(id, &event)?;
215                        bus.notify(&Notification::KeyEventAdded(event))?;
216                        // stop processing the escrow if kel was updated. It needs to start again.
217                        break;
218                    }
219                    Err(Error::SignatureVerificationError) => {
220                        // remove from escrow
221                        self.escrowed_out_of_order.remove(id, &event)?;
222                    }
223                    Err(_e) => (), // keep in escrow,
224                }
225            }
226        };
227
228        Ok(())
229    }
230}
231
232pub struct PartiallySignedEscrow<D: EventDatabase> {
233    db: Arc<D>,
234    old_db: Arc<SledEventDatabase>,
235    pub escrowed_partially_signed: Escrow<SignedEventMessage>,
236}
237
238impl<D: EventDatabase> PartiallySignedEscrow<D> {
239    pub fn new(
240        db: Arc<D>,
241        sled_db: Arc<SledEventDatabase>,
242        escrow_db: Arc<EscrowDb>,
243        duration: Duration,
244    ) -> Self {
245        let escrow = Escrow::new(b"pses", duration, escrow_db);
246        Self {
247            db,
248            old_db: sled_db,
249            escrowed_partially_signed: escrow,
250        }
251    }
252}
253
254impl<D: EventDatabase> PartiallySignedEscrow<D> {
255    pub fn get_partially_signed_for_event(
256        &self,
257        event: KeriEvent<KeyEvent>,
258    ) -> Option<impl DoubleEndedIterator<Item = SignedEventMessage>> {
259        let id = event.data.get_prefix();
260        self.escrowed_partially_signed
261            .get(&id)
262            .map(|events| events.filter(move |ev| ev.event_message == event))
263    }
264
265    fn remove_partially_signed(&self, event: &KeriEvent<KeyEvent>) -> Result<(), Error> {
266        let id = event.data.get_prefix();
267        self.escrowed_partially_signed.get(&id).map(|events| {
268            events
269                .filter(|ev| &ev.event_message == event)
270                .try_for_each(|ev| self.escrowed_partially_signed.remove(&id, &ev))
271        });
272        Ok(())
273    }
274}
275impl<D: EventDatabase> Notifier for PartiallySignedEscrow<D> {
276    fn notify(&self, notification: &Notification, bus: &NotificationBus) -> Result<(), Error> {
277        match notification {
278            Notification::PartiallySigned(ev) => {
279                if ev.signatures.is_empty() {
280                    // ignore events with no signatures
281                    Ok(())
282                } else {
283                    self.process_partially_signed_events(bus, ev)
284                }
285            }
286            _ => Err(Error::SemanticError("Wrong notification".into())),
287        }
288    }
289}
290
291impl<D: EventDatabase> PartiallySignedEscrow<D> {
292    pub fn process_partially_signed_events(
293        &self,
294        bus: &NotificationBus,
295        signed_event: &SignedEventMessage,
296    ) -> Result<(), Error> {
297        let id = signed_event.event_message.data.get_prefix();
298        if let Some(esc) = self
299            .escrowed_partially_signed
300            .get(&id)
301            .map(|events| events.filter(|event| event.event_message == signed_event.event_message))
302        {
303            let mut signatures = esc.flat_map(|ev| ev.signatures).collect::<Vec<_>>();
304            let signatures_from_event = signed_event.signatures.clone();
305            let without_duplicates = signatures_from_event
306                .into_iter()
307                .filter(|sig| !signatures.contains(sig))
308                .collect::<Vec<_>>();
309
310            signatures.append(&mut without_duplicates.clone());
311
312            let new_event = SignedEventMessage {
313                signatures,
314                ..signed_event.to_owned()
315            };
316
317            let validator = EventValidator::new(self.old_db.clone(), self.db.clone());
318            match validator.validate_event(&new_event) {
319                Ok(_) => {
320                    // add to kel
321                    self.db
322                        .add_kel_finalized_event(new_event.clone(), &id)
323                        .unwrap_or_default();
324                    // remove from escrow
325                    self.remove_partially_signed(&new_event.event_message)?;
326                    bus.notify(&Notification::KeyEventAdded(new_event))?;
327                }
328                Err(Error::NotEnoughReceiptsError) => {
329                    // remove from escrow
330                    self.remove_partially_signed(&new_event.event_message)?;
331                    bus.notify(&Notification::PartiallyWitnessed(new_event))?;
332                }
333                Err(Error::MissingDelegatingEventError)
334                | Err(Error::MissingDelegatorSealError(_)) => {
335                    // remove from escrow
336                    self.remove_partially_signed(&new_event.event_message)?;
337                    bus.notify(&Notification::MissingDelegatingEvent(new_event))?;
338                }
339                Err(Error::SignatureVerificationError) => {
340                    // ignore
341                }
342                Err(Error::NotEnoughSigsError) => {
343                    //keep in escrow and save new partially signed event
344                    let to_add = SignedEventMessage {
345                        signatures: without_duplicates,
346                        ..signed_event.to_owned()
347                    };
348                    self.escrowed_partially_signed.add(&id, to_add)?;
349                }
350                Err(_e) => {
351                    // keep in escrow
352                }
353            }
354        } else {
355            self.escrowed_partially_signed
356                .add(&id, signed_event.clone())?;
357        };
358
359        Ok(())
360    }
361}
362
363/// Store partially witnessed events and nontransferable receipts of events that
364/// wasn't accepted into kel yet.
365pub struct PartiallyWitnessedEscrow<D: EventDatabase> {
366    db: Arc<D>,
367    old_db: Arc<SledEventDatabase>,
368    pub(crate) escrowed_partially_witnessed: Escrow<SignedEventMessage>,
369    pub(crate) escrowed_nontranferable_receipts: Escrow<SignedNontransferableReceipt>,
370}
371
372impl<D: EventDatabase> PartiallyWitnessedEscrow<D> {
373    pub fn new(
374        db: Arc<D>,
375        old_db: Arc<SledEventDatabase>,
376        escrow_db: Arc<EscrowDb>,
377        duration: Duration,
378    ) -> Self {
379        Self {
380            db,
381            old_db,
382            escrowed_partially_witnessed: Escrow::new(b"pwes", duration, escrow_db.clone()),
383            escrowed_nontranferable_receipts: Escrow::new(b"ures", duration, escrow_db.clone()),
384        }
385    }
386
387    /// Return escrowed partially witness events of given identifier, sn and
388    /// digest.
389    pub fn get_event_by_sn_and_digest(
390        &self,
391        sn: u64,
392        id: &IdentifierPrefix,
393        event_digest: &SelfAddressingIdentifier,
394    ) -> Option<SignedEventMessage> {
395        self.escrowed_partially_witnessed
396            .get(id)
397            .and_then(|mut events| {
398                events.find(|event| {
399                    event.event_message.data.sn == sn
400                        && &event.event_message.data.prefix == id
401                        && event.event_message.digest().ok().as_ref() == Some(event_digest)
402                })
403            })
404    }
405
406    fn get_escrowed_receipts(
407        &self,
408        id: &IdentifierPrefix,
409        sn: u64,
410        digest: &SelfAddressingIdentifier,
411    ) -> Option<Vec<SignedNontransferableReceipt>> {
412        self.escrowed_nontranferable_receipts.get(id).map(|r| {
413            r.filter(|rct| rct.body.sn == sn && &rct.body.receipted_event_digest == digest)
414                // TODO avoid collect
415                .collect()
416        })
417    }
418
419    pub fn get_partially_witnessed_events(&self) -> Vec<SignedEventMessage> {
420        match self.escrowed_partially_witnessed.get_all() {
421            Some(events) => events.collect(),
422            None => vec![],
423        }
424    }
425
426    /// Saves nontransferable receipt in escrow.
427    fn escrow_receipt(
428        &self,
429        receipt: SignedNontransferableReceipt,
430        bus: &NotificationBus,
431    ) -> Result<(), Error> {
432        if receipt.signatures.is_empty() {
433            // ignore events with no signatures
434            Ok(())
435        } else {
436            let id = &receipt.body.prefix;
437            self.escrowed_nontranferable_receipts
438                .add(id, receipt.clone())?;
439            bus.notify(&Notification::ReceiptEscrowed)
440        }
441    }
442
443    fn accept_receipts_for(&self, event: &SignedEventMessage) -> Result<(), Error> {
444        let id = event.event_message.data.get_prefix();
445        Ok(self
446            .get_escrowed_receipts(
447                &id,
448                event.event_message.data.get_sn(),
449                &event.event_message.digest()?,
450            )
451            .unwrap_or_default()
452            .into_iter()
453            .try_for_each(|receipt| {
454                self.escrowed_nontranferable_receipts
455                    .remove(&id, &receipt)
456                    .unwrap();
457                self.db.add_receipt_nt(receipt.clone(), &id)
458            })
459            .unwrap_or_default())
460    }
461
462    /// Returns receipt couplets of event
463    fn get_receipt_couplets(
464        &self,
465        rct: &SignedNontransferableReceipt,
466        witnesses: &[BasicPrefix],
467    ) -> Result<Vec<(BasicPrefix, SelfSigningPrefix)>, Error> {
468        let (mut indexed, mut couplets) = (vec![], vec![]);
469        rct.signatures.iter().for_each(|signature| match signature {
470            Nontransferable::Indexed(indexed_sigs) => indexed.append(&mut indexed_sigs.clone()),
471            Nontransferable::Couplet(couplets_sigs) => couplets.append(&mut couplets_sigs.clone()),
472        });
473
474        let indexes: Result<Vec<(BasicPrefix, SelfSigningPrefix)>, Error> = indexed
475            .iter()
476            .map(|inx| -> Result<_, _> {
477                Ok((
478                    witnesses
479                        .get(inx.index.current() as usize)
480                        .ok_or_else(|| Error::SemanticError("No matching witness prefix".into()))?
481                        .clone(),
482                    inx.signature.to_owned(),
483                ))
484            })
485            .collect();
486
487        Ok(couplets.into_iter().chain(indexes?).collect())
488    }
489
490    /// Verify escrowed receipts and remove those with wrong
491    /// signatures.
492    pub fn validate_receipt(
493        &self,
494        rct: &SignedNontransferableReceipt,
495        receipted_event: &SignedEventMessage,
496        witnesses: &[BasicPrefix],
497    ) -> Result<(), Error> {
498        // verify receipts signatuers
499        let serialized_event = receipted_event.event_message.encode()?;
500        self.get_receipt_couplets(rct, witnesses)?
501            .into_iter()
502            .try_for_each(|(witness, signature)| {
503                if witness.verify(&serialized_event, &signature)? {
504                    Ok(())
505                } else {
506                    Err(Error::SignatureVerificationError)
507                }
508            })
509            .map_err(|e| {
510                // remove from escrow if any signature is wrong
511                match self
512                    .escrowed_nontranferable_receipts
513                    .remove(&rct.body.prefix, rct)
514                {
515                    Ok(_) => e,
516                    Err(e) => e.into(),
517                }
518            })
519    }
520
521    pub fn validate_partialy_witnessed(
522        &self,
523        receipted_event: &SignedEventMessage,
524        additional_receipt: Option<SignedNontransferableReceipt>,
525    ) -> Result<(), Error> {
526        let storage = EventStorage::new(self.db.clone(), self.old_db.clone());
527        let id = receipted_event.event_message.data.get_prefix();
528        let sn = receipted_event.event_message.data.get_sn();
529        let digest = receipted_event.event_message.digest()?;
530        let new_state = storage
531            .get_state(&id)
532            .unwrap_or_default()
533            .apply(receipted_event)?;
534
535        // Verify additional receipt signature
536        if let Some(ref receipt) = additional_receipt {
537            let couplets =
538                self.get_receipt_couplets(receipt, &new_state.witness_config.witnesses)?;
539            couplets.iter().try_for_each(|(bp, sp)| {
540                bp.verify(&receipted_event.event_message.encode()?, sp)?
541                    .then_some(())
542                    .ok_or(Error::ReceiptVerificationError)
543            })?;
544        }
545        // Verify receipted event signatures
546        new_state
547            .current
548            .verify(
549                &receipted_event.event_message.encode()?,
550                &receipted_event.signatures,
551            )?
552            .then_some(())
553            .ok_or(Error::SignatureVerificationError)?;
554
555        // Verify signatures of all receipts and remove those with wrong signatures
556        let (couplets, indexed) = self
557            .get_escrowed_receipts(&id, sn, &digest)
558            .unwrap_or_default()
559            .into_iter()
560            .filter(|rct| {
561                let rr = self.validate_receipt(
562                    rct,
563                    receipted_event,
564                    &new_state.witness_config.witnesses,
565                );
566                rr.is_ok()
567            })
568            .chain(if let Some(rct) = additional_receipt {
569                vec![rct]
570            } else {
571                Vec::default()
572            })
573            .fold(
574                (vec![], vec![]),
575                |(mut all_couplets, mut all_indexed), snr| {
576                    snr.signatures.into_iter().for_each(|signature| {
577                        match signature {
578                            Nontransferable::Indexed(indexed_sigs) => {
579                                all_indexed.append(&mut indexed_sigs.clone())
580                            }
581                            Nontransferable::Couplet(couplets_sigs) => {
582                                all_couplets.append(&mut couplets_sigs.clone())
583                            }
584                        };
585                    });
586                    (all_couplets, all_indexed)
587                },
588            );
589        // check if there is enough of receipts
590        new_state
591            .witness_config
592            .enough_receipts(couplets, indexed)?
593            .then_some(())
594            .ok_or(Error::NotEnoughReceiptsError)
595    }
596}
597
598impl<D: EventDatabase> Notifier for PartiallyWitnessedEscrow<D> {
599    fn notify(&self, notification: &Notification, bus: &NotificationBus) -> Result<(), Error> {
600        match notification {
601            Notification::ReceiptOutOfOrder(ooo) => {
602                // Receipted event wasn't accepted into kel yet, so check escrowed
603                // partailly witnessed events.
604                let sn = ooo.body.sn;
605                let id = ooo.body.prefix.clone();
606                // look for receipted event in partially witnessed. If there's no event yet, escrow receipt.
607                match self.get_event_by_sn_and_digest(sn, &id, &ooo.body.receipted_event_digest) {
608                    None => self.escrow_receipt(ooo.clone(), bus),
609                    Some(receipted_event) => {
610                        // verify receipt signature
611                        match self
612                            .validate_partialy_witnessed(&receipted_event, Some(ooo.to_owned()))
613                        {
614                            Ok(_) => {
615                                // accept event and remove receipts
616                                self.db
617                                    .add_kel_finalized_event(receipted_event.clone(), &id)
618                                    .map_err(|_| Error::DbError)?;
619                                // remove from escrow
620                                self.escrowed_partially_witnessed
621                                    .remove(&id, &receipted_event)?;
622                                // accept receipts and remove them from escrow
623                                self.accept_receipts_for(&receipted_event)?;
624                                self.db
625                                    .add_receipt_nt(ooo.to_owned(), &id)
626                                    .map_err(|_| Error::DbError)?;
627                                bus.notify(&Notification::KeyEventAdded(receipted_event))?;
628                            }
629                            Err(Error::SignatureVerificationError) => {
630                                // remove from escrow
631                                self.escrowed_partially_witnessed
632                                    .remove(&id, &receipted_event)?;
633                            }
634                            Err(Error::ReceiptVerificationError) => {
635                                // ignore receipt with wrong signature
636                            }
637                            // save receipt in escrow
638                            Err(_e) => {
639                                self.escrow_receipt(ooo.clone(), bus)?;
640                            }
641                        }
642                        Ok(())
643                    }
644                }
645            }
646            Notification::PartiallyWitnessed(signed_event) => {
647                // ignore events with no signatures
648                if !signed_event.signatures.is_empty() {
649                    let id = signed_event.event_message.data.get_prefix();
650                    match self.validate_partialy_witnessed(signed_event, None) {
651                        Ok(_) => {
652                            self.escrowed_partially_witnessed
653                                .add(&id, signed_event.clone())?;
654                        }
655                        Err(Error::SignatureVerificationError) => (),
656                        Err(_) => {
657                            self.escrowed_partially_witnessed
658                                .add(&id, signed_event.clone())?;
659                        }
660                    };
661                    Ok(())
662                } else {
663                    Ok(())
664                }
665            }
666            _ => Err(Error::SemanticError("Wrong notification".into())),
667        }
668    }
669}
670
671pub struct TransReceiptsEscrow<D: EventDatabase> {
672    db: Arc<D>,
673    old_db: Arc<SledEventDatabase>,
674    pub(crate) escrowed_trans_receipts: Escrow<SignedTransferableReceipt>,
675}
676impl<D: EventDatabase> TransReceiptsEscrow<D> {
677    pub fn new(
678        db: Arc<D>,
679        sled_db: Arc<SledEventDatabase>,
680        escrow_db: Arc<EscrowDb>,
681        duration: Duration,
682    ) -> Self {
683        Self {
684            db,
685            old_db: sled_db,
686            escrowed_trans_receipts: Escrow::new(b"vres", duration, escrow_db.clone()),
687        }
688    }
689}
690impl<D: EventDatabase> Notifier for TransReceiptsEscrow<D> {
691    fn notify(&self, notification: &Notification, bus: &NotificationBus) -> Result<(), Error> {
692        match notification {
693            Notification::KeyEventAdded(event) => {
694                self.process_t_receipts_escrow(&event.event_message.data.get_prefix(), bus)?;
695            }
696            Notification::TransReceiptOutOfOrder(receipt) => {
697                // ignore events with no signatures
698                if !receipt.signatures.is_empty() {
699                    let id = receipt.validator_seal.prefix.clone();
700                    self.escrowed_trans_receipts.add(&id, receipt.to_owned())?;
701                }
702            }
703            _ => return Err(Error::SemanticError("Wrong notification".into())),
704        }
705        Ok(())
706    }
707}
708impl<D: EventDatabase> TransReceiptsEscrow<D> {
709    pub fn process_t_receipts_escrow(
710        &self,
711        id: &IdentifierPrefix,
712        bus: &NotificationBus,
713    ) -> Result<(), Error> {
714        if let Some(esc) = self.escrowed_trans_receipts.get(id) {
715            for timestamped_receipt in esc {
716                let validator = EventValidator::new(self.old_db.clone(), self.db.clone());
717                match validator.validate_validator_receipt(&timestamped_receipt) {
718                    Ok(_) => {
719                        // add to receipts
720                        self.db
721                            .add_receipt_t(timestamped_receipt.clone(), id)
722                            .map_err(|_| Error::DbError)?;
723                        // remove from escrow
724                        self.escrowed_trans_receipts
725                            .remove(id, &timestamped_receipt)?;
726                        bus.notify(&Notification::ReceiptAccepted)?;
727                    }
728                    Err(Error::SignatureVerificationError) => {
729                        // remove from escrow
730                        self.escrowed_trans_receipts
731                            .remove(id, &timestamped_receipt)?;
732                    }
733                    Err(e) => return Err(e), // keep in escrow,
734                }
735            }
736        };
737
738        Ok(())
739    }
740}
741
/// Escrow for key state notice replies reported as out of order.
///
/// Only compiled with the `query` feature.
#[cfg(feature = "query")]
#[derive(Clone)]
pub struct ReplyEscrow<D: EventDatabase> {
    /// Events database handle passed to the event validator.
    events_db: Arc<D>,
    /// Sled database in which escrowed and accepted replies are stored.
    escrow_db: Arc<SledEventDatabase>,
}
748
#[cfg(feature = "query")]
impl<D: EventDatabase> ReplyEscrow<D> {
    /// Creates the escrow; `db` stores the replies and `events_db` is used
    /// for their validation.
    pub fn new(db: Arc<SledEventDatabase>, events_db: Arc<D>) -> Self {
        let escrow_db = db;
        Self {
            escrow_db,
            events_db,
        }
    }
}
#[cfg(feature = "query")]
impl<D: EventDatabase> Notifier for ReplyEscrow<D> {
    /// Handles `KsnOutOfOrder` by escrowing replies whose route is `Ksn`, and
    /// `KeyEventAdded` by reprocessing the reply escrow.
    ///
    /// Every other notification, and a `KsnOutOfOrder` reply with another
    /// route, is accepted without any action.
    fn notify(&self, notification: &Notification, bus: &NotificationBus) -> Result<(), Error> {
        match notification {
            Notification::KsnOutOfOrder(rpy) => {
                match rpy.reply.get_route() {
                    ReplyRoute::Ksn(_id, ksn) => {
                        self.escrow_db
                            .add_escrowed_reply(rpy.clone(), &ksn.state.prefix)?;
                    }
                    _ => {}
                }
                Ok(())
            }
            Notification::KeyEventAdded(_) => self.process_reply_escrow(bus),
            _ => Ok(()),
        }
    }
}
774
#[cfg(feature = "query")]
impl<D: EventDatabase> ReplyEscrow<D> {
    /// Re-validates every escrowed reply.
    ///
    /// * A reply that validates is removed from the escrow and saved as the
    ///   accepted reply of its identifier.
    /// * A reply failing with `SignatureVerificationError` or
    ///   `QueryError::StaleRpy` is removed from the escrow.
    /// * A reply failing with `EventOutOfOrderError` or
    ///   `MoreInfoError::EventNotFound` stays in the escrow.
    /// * Any other error, including a reply whose route is not `Ksn`, stops
    ///   processing and is returned.
    ///
    /// The `_bus` argument is currently unused.
    pub fn process_reply_escrow(&self, _bus: &NotificationBus) -> Result<(), Error> {
        use crate::query::QueryError;

        let escrowed_replies = match self.escrow_db.get_all_escrowed_replys() {
            Some(replies) => replies,
            None => return Ok(()),
        };

        for sig_rep in escrowed_replies {
            let validator = EventValidator::new(self.escrow_db.clone(), self.events_db.clone());
            let id = match sig_rep.reply.get_route() {
                ReplyRoute::Ksn(_id, ksn) => ksn.state.prefix,
                _ => return Err(Error::SemanticError("Wrong event type".into())),
            };

            match validator.process_signed_ksn_reply(&sig_rep) {
                Ok(_) => {
                    self.escrow_db.remove_escrowed_reply(&id, &sig_rep)?;
                    self.escrow_db.update_accepted_reply(sig_rep, &id)?;
                }
                Err(Error::SignatureVerificationError)
                | Err(Error::QueryError(QueryError::StaleRpy)) => {
                    // Invalid or outdated reply: drop it.
                    self.escrow_db.remove_escrowed_reply(&id, &sig_rep)?;
                }
                Err(Error::EventOutOfOrderError)
                | Err(Error::VerificationError(VerificationError::MoreInfo(
                    MoreInfoError::EventNotFound(_),
                ))) => {
                    // Still waiting for more data: keep in escrow.
                }
                Err(other) => return Err(other),
            }
        }

        Ok(())
    }
}
809
810/// Stores delegated events until delegating event is provided
811pub struct DelegationEscrow<D: EventDatabase> {
812    db: Arc<D>,
813    sled_db: Arc<SledEventDatabase>,
814    pub delegation_escrow: Escrow<SignedEventMessage>,
815}
816
817impl<D: EventDatabase> DelegationEscrow<D> {
818    pub fn new(
819        db: Arc<D>,
820        sled_db: Arc<SledEventDatabase>,
821        escrow_db: Arc<EscrowDb>,
822        duration: Duration,
823    ) -> Self {
824        let escrow = Escrow::new(b"dees", duration, escrow_db);
825        Self {
826            db,
827            sled_db,
828            delegation_escrow: escrow,
829        }
830    }
831
832    pub fn get_event_by_sn_and_digest(
833        &self,
834        sn: u64,
835        delegator_id: &IdentifierPrefix,
836        event_digest: &SelfAddressingIdentifier,
837    ) -> Option<SignedEventMessage> {
838        self.delegation_escrow
839            .get(delegator_id)
840            .and_then(|mut events| {
841                events.find(|event| {
842                    event.event_message.data.sn == sn
843                        && event.event_message.digest().ok().as_ref() == Some(event_digest)
844                })
845            })
846    }
847}
848
849impl<D: EventDatabase> Notifier for DelegationEscrow<D> {
850    fn notify(&self, notification: &Notification, bus: &NotificationBus) -> Result<(), Error> {
851        match notification {
852            Notification::KeyEventAdded(ev_message) => {
853                // delegator's prefix
854                let id = ev_message.event_message.data.get_prefix();
855                // get anchored data
856                let anchored_data: Vec<Seal> = match &ev_message.event_message.data.event_data {
857                    EventData::Icp(icp) => icp.data.clone(),
858                    EventData::Rot(rot) => rot.data.clone(),
859                    EventData::Ixn(ixn) => ixn.data.clone(),
860                    EventData::Dip(dip) => dip.inception_data.data.clone(),
861                    EventData::Drt(drt) => drt.data.clone(),
862                };
863
864                let seals: Vec<EventSeal> = anchored_data
865                    .into_iter()
866                    .filter_map(|seal| match seal {
867                        Seal::Event(es) => Some(es),
868                        _ => None,
869                    })
870                    .collect();
871                if !seals.is_empty() {
872                    let potential_delegator_seal = SourceSeal {
873                        sn: ev_message.event_message.data.get_sn(),
874                        digest: ev_message.event_message.digest()?,
875                    };
876                    self.process_delegation_events(bus, &id, seals, potential_delegator_seal)?;
877                }
878            }
879            Notification::MissingDelegatingEvent(signed_event) => {
880                // ignore events with no signatures
881                if !signed_event.signatures.is_empty() {
882                    let delegators_id = match &signed_event.event_message.data.event_data {
883                        EventData::Dip(dip) => Ok(dip.delegator.clone()),
884                        EventData::Drt(_drt) => {
885                            let storage = EventStorage::new(self.db.clone(), self.sled_db.clone());
886                            storage
887                                .get_state(&signed_event.event_message.data.get_prefix())
888                                .ok_or(Error::MissingDelegatingEventError)?
889                                .delegator
890                                .ok_or(Error::MissingDelegatingEventError)
891                        }
892                        _ => {
893                            // not delegated event
894                            Err(Error::SemanticError("Not delegated event".to_string()))
895                        }
896                    }?;
897                    self.delegation_escrow
898                        .add(&delegators_id, signed_event.clone())?;
899                }
900            }
901            _ => return Err(Error::SemanticError("Wrong notification".into())),
902        }
903
904        Ok(())
905    }
906}
907
908impl<D: EventDatabase> DelegationEscrow<D> {
909    pub fn process_delegation_events(
910        &self,
911        bus: &NotificationBus,
912        delegator_id: &IdentifierPrefix,
913        anchored_seals: Vec<EventSeal>,
914        potential_delegator_seal: SourceSeal,
915    ) -> Result<(), Error> {
916        if let Some(esc) = self.delegation_escrow.get(delegator_id) {
917            for event in esc {
918                let event_digest = event.event_message.digest()?;
919                let seal = anchored_seals.iter().find(|seal| {
920                    seal.event_digest() == event_digest
921                        && seal.sn == event.event_message.data.get_sn()
922                        && seal.prefix == event.event_message.data.get_prefix()
923                });
924                let delegated_event = match seal {
925                    Some(_s) => SignedEventMessage {
926                        delegator_seal: Some(potential_delegator_seal.clone()),
927                        ..event.clone()
928                    },
929                    None => event.clone(),
930                };
931                let validator = EventValidator::new(self.sled_db.clone(), self.db.clone());
932                match validator.validate_event(&delegated_event) {
933                    Ok(_) => {
934                        // add to kel
935                        let child_id = event.event_message.data.get_prefix();
936                        self.db
937                            .add_kel_finalized_event(delegated_event.clone(), &child_id)
938                            .map_err(|_| Error::DbError)?;
939                        // remove from escrow
940                        self.delegation_escrow.remove(delegator_id, &event)?;
941                        bus.notify(&Notification::KeyEventAdded(event))?;
942                        // stop processing the escrow if kel was updated. It needs to start again.
943                        break;
944                    }
945                    Err(Error::SignatureVerificationError) => {
946                        // remove from escrow
947                        self.delegation_escrow.remove(delegator_id, &event)?;
948                    }
949                    Err(Error::NotEnoughReceiptsError) => {
950                        // remove from escrow
951                        self.delegation_escrow.remove(delegator_id, &event)?;
952                        bus.notify(&Notification::PartiallyWitnessed(delegated_event))?;
953                    }
954                    Err(_e) => (), // keep in escrow,
955                }
956            }
957        };
958
959        Ok(())
960    }
961}