1use std::{fmt::Debug, sync::Arc, time::Duration};
2
3use said::SelfAddressingIdentifier;
4
5use super::{
6 event_storage::EventStorage,
7 notification::{JustNotification, Notification, NotificationBus, Notifier},
8 validator::EventValidator,
9};
10use crate::{
11 database::{
12 escrow::{Escrow, EscrowDb},
13 sled::SledEventDatabase,
14 EventDatabase,
15 },
16 error::Error,
17 event::{
18 event_data::EventData,
19 sections::seal::{EventSeal, Seal, SourceSeal},
20 KeyEvent,
21 },
22 event_message::{
23 msg::KeriEvent,
24 signature::Nontransferable,
25 signed_event_message::{
26 SignedEventMessage, SignedNontransferableReceipt, SignedTransferableReceipt,
27 },
28 },
29 prefix::{BasicPrefix, IdentifierPrefix, SelfSigningPrefix},
30};
31#[cfg(feature = "query")]
32use crate::{
33 processor::validator::{MoreInfoError, VerificationError},
34 query::reply_event::ReplyRoute,
35};
36
37#[derive(Debug, Clone)]
38pub struct EscrowConfig {
39 pub out_of_order_timeout: Duration,
40 pub partially_signed_timeout: Duration,
41 pub partially_witnessed_timeout: Duration,
42 pub trans_receipt_timeout: Duration,
43 pub delegation_timeout: Duration,
44}
45
46impl Default for EscrowConfig {
47 fn default() -> Self {
48 Self {
49 out_of_order_timeout: Duration::from_secs(60),
50 partially_signed_timeout: Duration::from_secs(60),
51 partially_witnessed_timeout: Duration::from_secs(60),
52 trans_receipt_timeout: Duration::from_secs(60),
53 delegation_timeout: Duration::from_secs(60),
54 }
55 }
56}
57
58pub fn default_escrow_bus<D: EventDatabase + Send + Sync + 'static>(
59 event_db: Arc<D>,
60 sled_db: Arc<SledEventDatabase>,
61 escrow_db: Arc<EscrowDb>,
62 escrow_config: EscrowConfig,
63) -> (
64 NotificationBus,
65 (
66 Arc<OutOfOrderEscrow<D>>,
67 Arc<PartiallySignedEscrow<D>>,
68 Arc<PartiallyWitnessedEscrow<D>>,
69 Arc<DelegationEscrow<D>>,
70 ),
71) {
72 let mut bus = NotificationBus::new();
73
74 let ooo_escrow = Arc::new(OutOfOrderEscrow::new(
76 event_db.clone(),
77 sled_db.clone(),
78 escrow_db.clone(),
79 escrow_config.out_of_order_timeout,
80 ));
81 bus.register_observer(
82 ooo_escrow.clone(),
83 vec![
84 JustNotification::OutOfOrder,
85 JustNotification::KeyEventAdded,
86 ],
87 );
88
89 let ps_escrow = Arc::new(PartiallySignedEscrow::new(
90 event_db.clone(),
91 sled_db.clone(),
92 escrow_db.clone(),
93 escrow_config.partially_signed_timeout,
94 ));
95 bus.register_observer(ps_escrow.clone(), vec![JustNotification::PartiallySigned]);
96
97 let pw_escrow = Arc::new(PartiallyWitnessedEscrow::new(
98 event_db.clone(),
99 sled_db.clone(),
100 escrow_db.clone(),
101 escrow_config.partially_witnessed_timeout,
102 ));
103 bus.register_observer(
104 pw_escrow.clone(),
105 vec![
106 JustNotification::PartiallyWitnessed,
107 JustNotification::ReceiptOutOfOrder,
108 ],
109 );
110
111 bus.register_observer(
112 Arc::new(TransReceiptsEscrow::new(
113 event_db.clone(),
114 sled_db.clone(),
115 escrow_db.clone(),
116 escrow_config.trans_receipt_timeout,
117 )),
118 vec![
119 JustNotification::KeyEventAdded,
120 JustNotification::TransReceiptOutOfOrder,
121 ],
122 );
123
124 let delegation_escrow = Arc::new(DelegationEscrow::new(
125 event_db,
126 sled_db,
127 escrow_db,
128 escrow_config.delegation_timeout,
129 ));
130 bus.register_observer(
131 delegation_escrow.clone(),
132 vec![
133 JustNotification::MissingDelegatingEvent,
134 JustNotification::KeyEventAdded,
135 ],
136 );
137
138 (bus, (ooo_escrow, ps_escrow, pw_escrow, delegation_escrow))
139}
140
141pub struct OutOfOrderEscrow<D: EventDatabase> {
142 db: Arc<D>,
143 sled_db: Arc<SledEventDatabase>,
144 pub escrowed_out_of_order: Escrow<SignedEventMessage>,
145}
146
147impl<D: EventDatabase> OutOfOrderEscrow<D> {
148 pub fn new(
149 db: Arc<D>,
150 sled_db: Arc<SledEventDatabase>,
151 escrow_db: Arc<EscrowDb>,
152 duration: Duration,
153 ) -> Self {
154 let escrow = Escrow::new(b"ooes", duration, escrow_db);
155 Self {
156 db,
157 sled_db,
158 escrowed_out_of_order: escrow,
159 }
160 }
161
162 pub fn get_event_by_sn_and_digest(
163 &self,
164 sn: u64,
165 id: &IdentifierPrefix,
166 event_digest: &SelfAddressingIdentifier,
167 ) -> Option<SignedEventMessage> {
168 self.escrowed_out_of_order.get(id).and_then(|mut events| {
169 events.find(|event| {
170 event.event_message.data.sn == sn
171 && &event.event_message.data.prefix == id
172 && event.event_message.digest().ok().as_ref() == Some(event_digest)
173 })
174 })
175 }
176}
177impl<D: EventDatabase> Notifier for OutOfOrderEscrow<D> {
178 fn notify(&self, notification: &Notification, bus: &NotificationBus) -> Result<(), Error> {
179 match notification {
180 Notification::KeyEventAdded(ev_message) => {
181 let id = ev_message.event_message.data.get_prefix();
182 self.process_out_of_order_events(bus, &id)?;
183 }
184 Notification::OutOfOrder(signed_event) => {
185 if !signed_event.signatures.is_empty() {
187 let id = signed_event.event_message.data.get_prefix();
188 self.escrowed_out_of_order.add(&id, signed_event.clone())?;
189 }
190 }
191 _ => return Err(Error::SemanticError("Wrong notification".into())),
192 }
193
194 Ok(())
195 }
196}
197
198impl<D: EventDatabase> OutOfOrderEscrow<D> {
199 pub fn process_out_of_order_events(
200 &self,
201 bus: &NotificationBus,
202 id: &IdentifierPrefix,
203 ) -> Result<(), Error> {
204 if let Some(esc) = self.escrowed_out_of_order.get(id) {
205 for event in esc {
206 let validator = EventValidator::new(self.sled_db.clone(), self.db.clone());
207 match validator.validate_event(&event) {
208 Ok(_) => {
209 self.db
211 .add_kel_finalized_event(event.clone(), id)
212 .map_err(|_| Error::DbError)?;
213 self.escrowed_out_of_order.remove(id, &event)?;
215 bus.notify(&Notification::KeyEventAdded(event))?;
216 break;
218 }
219 Err(Error::SignatureVerificationError) => {
220 self.escrowed_out_of_order.remove(id, &event)?;
222 }
223 Err(_e) => (), }
225 }
226 };
227
228 Ok(())
229 }
230}
231
232pub struct PartiallySignedEscrow<D: EventDatabase> {
233 db: Arc<D>,
234 old_db: Arc<SledEventDatabase>,
235 pub escrowed_partially_signed: Escrow<SignedEventMessage>,
236}
237
238impl<D: EventDatabase> PartiallySignedEscrow<D> {
239 pub fn new(
240 db: Arc<D>,
241 sled_db: Arc<SledEventDatabase>,
242 escrow_db: Arc<EscrowDb>,
243 duration: Duration,
244 ) -> Self {
245 let escrow = Escrow::new(b"pses", duration, escrow_db);
246 Self {
247 db,
248 old_db: sled_db,
249 escrowed_partially_signed: escrow,
250 }
251 }
252}
253
254impl<D: EventDatabase> PartiallySignedEscrow<D> {
255 pub fn get_partially_signed_for_event(
256 &self,
257 event: KeriEvent<KeyEvent>,
258 ) -> Option<impl DoubleEndedIterator<Item = SignedEventMessage>> {
259 let id = event.data.get_prefix();
260 self.escrowed_partially_signed
261 .get(&id)
262 .map(|events| events.filter(move |ev| ev.event_message == event))
263 }
264
265 fn remove_partially_signed(&self, event: &KeriEvent<KeyEvent>) -> Result<(), Error> {
266 let id = event.data.get_prefix();
267 self.escrowed_partially_signed.get(&id).map(|events| {
268 events
269 .filter(|ev| &ev.event_message == event)
270 .try_for_each(|ev| self.escrowed_partially_signed.remove(&id, &ev))
271 });
272 Ok(())
273 }
274}
275impl<D: EventDatabase> Notifier for PartiallySignedEscrow<D> {
276 fn notify(&self, notification: &Notification, bus: &NotificationBus) -> Result<(), Error> {
277 match notification {
278 Notification::PartiallySigned(ev) => {
279 if ev.signatures.is_empty() {
280 Ok(())
282 } else {
283 self.process_partially_signed_events(bus, ev)
284 }
285 }
286 _ => Err(Error::SemanticError("Wrong notification".into())),
287 }
288 }
289}
290
291impl<D: EventDatabase> PartiallySignedEscrow<D> {
292 pub fn process_partially_signed_events(
293 &self,
294 bus: &NotificationBus,
295 signed_event: &SignedEventMessage,
296 ) -> Result<(), Error> {
297 let id = signed_event.event_message.data.get_prefix();
298 if let Some(esc) = self
299 .escrowed_partially_signed
300 .get(&id)
301 .map(|events| events.filter(|event| event.event_message == signed_event.event_message))
302 {
303 let mut signatures = esc.flat_map(|ev| ev.signatures).collect::<Vec<_>>();
304 let signatures_from_event = signed_event.signatures.clone();
305 let without_duplicates = signatures_from_event
306 .into_iter()
307 .filter(|sig| !signatures.contains(sig))
308 .collect::<Vec<_>>();
309
310 signatures.append(&mut without_duplicates.clone());
311
312 let new_event = SignedEventMessage {
313 signatures,
314 ..signed_event.to_owned()
315 };
316
317 let validator = EventValidator::new(self.old_db.clone(), self.db.clone());
318 match validator.validate_event(&new_event) {
319 Ok(_) => {
320 self.db
322 .add_kel_finalized_event(new_event.clone(), &id)
323 .unwrap_or_default();
324 self.remove_partially_signed(&new_event.event_message)?;
326 bus.notify(&Notification::KeyEventAdded(new_event))?;
327 }
328 Err(Error::NotEnoughReceiptsError) => {
329 self.remove_partially_signed(&new_event.event_message)?;
331 bus.notify(&Notification::PartiallyWitnessed(new_event))?;
332 }
333 Err(Error::MissingDelegatingEventError)
334 | Err(Error::MissingDelegatorSealError(_)) => {
335 self.remove_partially_signed(&new_event.event_message)?;
337 bus.notify(&Notification::MissingDelegatingEvent(new_event))?;
338 }
339 Err(Error::SignatureVerificationError) => {
340 }
342 Err(Error::NotEnoughSigsError) => {
343 let to_add = SignedEventMessage {
345 signatures: without_duplicates,
346 ..signed_event.to_owned()
347 };
348 self.escrowed_partially_signed.add(&id, to_add)?;
349 }
350 Err(_e) => {
351 }
353 }
354 } else {
355 self.escrowed_partially_signed
356 .add(&id, signed_event.clone())?;
357 };
358
359 Ok(())
360 }
361}
362
363pub struct PartiallyWitnessedEscrow<D: EventDatabase> {
366 db: Arc<D>,
367 old_db: Arc<SledEventDatabase>,
368 pub(crate) escrowed_partially_witnessed: Escrow<SignedEventMessage>,
369 pub(crate) escrowed_nontranferable_receipts: Escrow<SignedNontransferableReceipt>,
370}
371
372impl<D: EventDatabase> PartiallyWitnessedEscrow<D> {
373 pub fn new(
374 db: Arc<D>,
375 old_db: Arc<SledEventDatabase>,
376 escrow_db: Arc<EscrowDb>,
377 duration: Duration,
378 ) -> Self {
379 Self {
380 db,
381 old_db,
382 escrowed_partially_witnessed: Escrow::new(b"pwes", duration, escrow_db.clone()),
383 escrowed_nontranferable_receipts: Escrow::new(b"ures", duration, escrow_db.clone()),
384 }
385 }
386
387 pub fn get_event_by_sn_and_digest(
390 &self,
391 sn: u64,
392 id: &IdentifierPrefix,
393 event_digest: &SelfAddressingIdentifier,
394 ) -> Option<SignedEventMessage> {
395 self.escrowed_partially_witnessed
396 .get(id)
397 .and_then(|mut events| {
398 events.find(|event| {
399 event.event_message.data.sn == sn
400 && &event.event_message.data.prefix == id
401 && event.event_message.digest().ok().as_ref() == Some(event_digest)
402 })
403 })
404 }
405
406 fn get_escrowed_receipts(
407 &self,
408 id: &IdentifierPrefix,
409 sn: u64,
410 digest: &SelfAddressingIdentifier,
411 ) -> Option<Vec<SignedNontransferableReceipt>> {
412 self.escrowed_nontranferable_receipts.get(id).map(|r| {
413 r.filter(|rct| rct.body.sn == sn && &rct.body.receipted_event_digest == digest)
414 .collect()
416 })
417 }
418
419 pub fn get_partially_witnessed_events(&self) -> Vec<SignedEventMessage> {
420 match self.escrowed_partially_witnessed.get_all() {
421 Some(events) => events.collect(),
422 None => vec![],
423 }
424 }
425
426 fn escrow_receipt(
428 &self,
429 receipt: SignedNontransferableReceipt,
430 bus: &NotificationBus,
431 ) -> Result<(), Error> {
432 if receipt.signatures.is_empty() {
433 Ok(())
435 } else {
436 let id = &receipt.body.prefix;
437 self.escrowed_nontranferable_receipts
438 .add(id, receipt.clone())?;
439 bus.notify(&Notification::ReceiptEscrowed)
440 }
441 }
442
443 fn accept_receipts_for(&self, event: &SignedEventMessage) -> Result<(), Error> {
444 let id = event.event_message.data.get_prefix();
445 Ok(self
446 .get_escrowed_receipts(
447 &id,
448 event.event_message.data.get_sn(),
449 &event.event_message.digest()?,
450 )
451 .unwrap_or_default()
452 .into_iter()
453 .try_for_each(|receipt| {
454 self.escrowed_nontranferable_receipts
455 .remove(&id, &receipt)
456 .unwrap();
457 self.db.add_receipt_nt(receipt.clone(), &id)
458 })
459 .unwrap_or_default())
460 }
461
462 fn get_receipt_couplets(
464 &self,
465 rct: &SignedNontransferableReceipt,
466 witnesses: &[BasicPrefix],
467 ) -> Result<Vec<(BasicPrefix, SelfSigningPrefix)>, Error> {
468 let (mut indexed, mut couplets) = (vec![], vec![]);
469 rct.signatures.iter().for_each(|signature| match signature {
470 Nontransferable::Indexed(indexed_sigs) => indexed.append(&mut indexed_sigs.clone()),
471 Nontransferable::Couplet(couplets_sigs) => couplets.append(&mut couplets_sigs.clone()),
472 });
473
474 let indexes: Result<Vec<(BasicPrefix, SelfSigningPrefix)>, Error> = indexed
475 .iter()
476 .map(|inx| -> Result<_, _> {
477 Ok((
478 witnesses
479 .get(inx.index.current() as usize)
480 .ok_or_else(|| Error::SemanticError("No matching witness prefix".into()))?
481 .clone(),
482 inx.signature.to_owned(),
483 ))
484 })
485 .collect();
486
487 Ok(couplets.into_iter().chain(indexes?).collect())
488 }
489
490 pub fn validate_receipt(
493 &self,
494 rct: &SignedNontransferableReceipt,
495 receipted_event: &SignedEventMessage,
496 witnesses: &[BasicPrefix],
497 ) -> Result<(), Error> {
498 let serialized_event = receipted_event.event_message.encode()?;
500 self.get_receipt_couplets(rct, witnesses)?
501 .into_iter()
502 .try_for_each(|(witness, signature)| {
503 if witness.verify(&serialized_event, &signature)? {
504 Ok(())
505 } else {
506 Err(Error::SignatureVerificationError)
507 }
508 })
509 .map_err(|e| {
510 match self
512 .escrowed_nontranferable_receipts
513 .remove(&rct.body.prefix, rct)
514 {
515 Ok(_) => e,
516 Err(e) => e.into(),
517 }
518 })
519 }
520
521 pub fn validate_partialy_witnessed(
522 &self,
523 receipted_event: &SignedEventMessage,
524 additional_receipt: Option<SignedNontransferableReceipt>,
525 ) -> Result<(), Error> {
526 let storage = EventStorage::new(self.db.clone(), self.old_db.clone());
527 let id = receipted_event.event_message.data.get_prefix();
528 let sn = receipted_event.event_message.data.get_sn();
529 let digest = receipted_event.event_message.digest()?;
530 let new_state = storage
531 .get_state(&id)
532 .unwrap_or_default()
533 .apply(receipted_event)?;
534
535 if let Some(ref receipt) = additional_receipt {
537 let couplets =
538 self.get_receipt_couplets(receipt, &new_state.witness_config.witnesses)?;
539 couplets.iter().try_for_each(|(bp, sp)| {
540 bp.verify(&receipted_event.event_message.encode()?, sp)?
541 .then_some(())
542 .ok_or(Error::ReceiptVerificationError)
543 })?;
544 }
545 new_state
547 .current
548 .verify(
549 &receipted_event.event_message.encode()?,
550 &receipted_event.signatures,
551 )?
552 .then_some(())
553 .ok_or(Error::SignatureVerificationError)?;
554
555 let (couplets, indexed) = self
557 .get_escrowed_receipts(&id, sn, &digest)
558 .unwrap_or_default()
559 .into_iter()
560 .filter(|rct| {
561 let rr = self.validate_receipt(
562 rct,
563 receipted_event,
564 &new_state.witness_config.witnesses,
565 );
566 rr.is_ok()
567 })
568 .chain(if let Some(rct) = additional_receipt {
569 vec![rct]
570 } else {
571 Vec::default()
572 })
573 .fold(
574 (vec![], vec![]),
575 |(mut all_couplets, mut all_indexed), snr| {
576 snr.signatures.into_iter().for_each(|signature| {
577 match signature {
578 Nontransferable::Indexed(indexed_sigs) => {
579 all_indexed.append(&mut indexed_sigs.clone())
580 }
581 Nontransferable::Couplet(couplets_sigs) => {
582 all_couplets.append(&mut couplets_sigs.clone())
583 }
584 };
585 });
586 (all_couplets, all_indexed)
587 },
588 );
589 new_state
591 .witness_config
592 .enough_receipts(couplets, indexed)?
593 .then_some(())
594 .ok_or(Error::NotEnoughReceiptsError)
595 }
596}
597
598impl<D: EventDatabase> Notifier for PartiallyWitnessedEscrow<D> {
599 fn notify(&self, notification: &Notification, bus: &NotificationBus) -> Result<(), Error> {
600 match notification {
601 Notification::ReceiptOutOfOrder(ooo) => {
602 let sn = ooo.body.sn;
605 let id = ooo.body.prefix.clone();
606 match self.get_event_by_sn_and_digest(sn, &id, &ooo.body.receipted_event_digest) {
608 None => self.escrow_receipt(ooo.clone(), bus),
609 Some(receipted_event) => {
610 match self
612 .validate_partialy_witnessed(&receipted_event, Some(ooo.to_owned()))
613 {
614 Ok(_) => {
615 self.db
617 .add_kel_finalized_event(receipted_event.clone(), &id)
618 .map_err(|_| Error::DbError)?;
619 self.escrowed_partially_witnessed
621 .remove(&id, &receipted_event)?;
622 self.accept_receipts_for(&receipted_event)?;
624 self.db
625 .add_receipt_nt(ooo.to_owned(), &id)
626 .map_err(|_| Error::DbError)?;
627 bus.notify(&Notification::KeyEventAdded(receipted_event))?;
628 }
629 Err(Error::SignatureVerificationError) => {
630 self.escrowed_partially_witnessed
632 .remove(&id, &receipted_event)?;
633 }
634 Err(Error::ReceiptVerificationError) => {
635 }
637 Err(_e) => {
639 self.escrow_receipt(ooo.clone(), bus)?;
640 }
641 }
642 Ok(())
643 }
644 }
645 }
646 Notification::PartiallyWitnessed(signed_event) => {
647 if !signed_event.signatures.is_empty() {
649 let id = signed_event.event_message.data.get_prefix();
650 match self.validate_partialy_witnessed(signed_event, None) {
651 Ok(_) => {
652 self.escrowed_partially_witnessed
653 .add(&id, signed_event.clone())?;
654 }
655 Err(Error::SignatureVerificationError) => (),
656 Err(_) => {
657 self.escrowed_partially_witnessed
658 .add(&id, signed_event.clone())?;
659 }
660 };
661 Ok(())
662 } else {
663 Ok(())
664 }
665 }
666 _ => Err(Error::SemanticError("Wrong notification".into())),
667 }
668 }
669}
670
671pub struct TransReceiptsEscrow<D: EventDatabase> {
672 db: Arc<D>,
673 old_db: Arc<SledEventDatabase>,
674 pub(crate) escrowed_trans_receipts: Escrow<SignedTransferableReceipt>,
675}
676impl<D: EventDatabase> TransReceiptsEscrow<D> {
677 pub fn new(
678 db: Arc<D>,
679 sled_db: Arc<SledEventDatabase>,
680 escrow_db: Arc<EscrowDb>,
681 duration: Duration,
682 ) -> Self {
683 Self {
684 db,
685 old_db: sled_db,
686 escrowed_trans_receipts: Escrow::new(b"vres", duration, escrow_db.clone()),
687 }
688 }
689}
690impl<D: EventDatabase> Notifier for TransReceiptsEscrow<D> {
691 fn notify(&self, notification: &Notification, bus: &NotificationBus) -> Result<(), Error> {
692 match notification {
693 Notification::KeyEventAdded(event) => {
694 self.process_t_receipts_escrow(&event.event_message.data.get_prefix(), bus)?;
695 }
696 Notification::TransReceiptOutOfOrder(receipt) => {
697 if !receipt.signatures.is_empty() {
699 let id = receipt.validator_seal.prefix.clone();
700 self.escrowed_trans_receipts.add(&id, receipt.to_owned())?;
701 }
702 }
703 _ => return Err(Error::SemanticError("Wrong notification".into())),
704 }
705 Ok(())
706 }
707}
708impl<D: EventDatabase> TransReceiptsEscrow<D> {
709 pub fn process_t_receipts_escrow(
710 &self,
711 id: &IdentifierPrefix,
712 bus: &NotificationBus,
713 ) -> Result<(), Error> {
714 if let Some(esc) = self.escrowed_trans_receipts.get(id) {
715 for timestamped_receipt in esc {
716 let validator = EventValidator::new(self.old_db.clone(), self.db.clone());
717 match validator.validate_validator_receipt(×tamped_receipt) {
718 Ok(_) => {
719 self.db
721 .add_receipt_t(timestamped_receipt.clone(), id)
722 .map_err(|_| Error::DbError)?;
723 self.escrowed_trans_receipts
725 .remove(id, ×tamped_receipt)?;
726 bus.notify(&Notification::ReceiptAccepted)?;
727 }
728 Err(Error::SignatureVerificationError) => {
729 self.escrowed_trans_receipts
731 .remove(id, ×tamped_receipt)?;
732 }
733 Err(e) => return Err(e), }
735 }
736 };
737
738 Ok(())
739 }
740}
741
742#[cfg(feature = "query")]
743#[derive(Clone)]
744pub struct ReplyEscrow<D: EventDatabase> {
745 events_db: Arc<D>,
746 escrow_db: Arc<SledEventDatabase>,
747}
748
749#[cfg(feature = "query")]
750impl<D: EventDatabase> ReplyEscrow<D> {
751 pub fn new(db: Arc<SledEventDatabase>, events_db: Arc<D>) -> Self {
752 Self {
753 escrow_db: db,
754 events_db,
755 }
756 }
757}
758#[cfg(feature = "query")]
759impl<D: EventDatabase> Notifier for ReplyEscrow<D> {
760 fn notify(&self, notification: &Notification, bus: &NotificationBus) -> Result<(), Error> {
761 match notification {
762 Notification::KsnOutOfOrder(rpy) => {
763 if let ReplyRoute::Ksn(_id, ksn) = rpy.reply.get_route() {
764 self.escrow_db
765 .add_escrowed_reply(rpy.clone(), &ksn.state.prefix)?;
766 };
767 Ok(())
768 }
769 &Notification::KeyEventAdded(_) => self.process_reply_escrow(bus),
770 _ => Ok(()),
771 }
772 }
773}
774
775#[cfg(feature = "query")]
776impl<D: EventDatabase> ReplyEscrow<D> {
777 pub fn process_reply_escrow(&self, _bus: &NotificationBus) -> Result<(), Error> {
778 use crate::query::QueryError;
779
780 if let Some(esc) = self.escrow_db.get_all_escrowed_replys() {
781 for sig_rep in esc {
782 let validator = EventValidator::new(self.escrow_db.clone(), self.events_db.clone());
783 let id = if let ReplyRoute::Ksn(_id, ksn) = sig_rep.reply.get_route() {
784 Ok(ksn.state.prefix)
785 } else {
786 Err(Error::SemanticError("Wrong event type".into()))
787 }?;
788 match validator.process_signed_ksn_reply(&sig_rep) {
789 Ok(_) => {
790 self.escrow_db.remove_escrowed_reply(&id, &sig_rep)?;
791 self.escrow_db.update_accepted_reply(sig_rep, &id)?;
792 }
793 Err(Error::SignatureVerificationError)
794 | Err(Error::QueryError(QueryError::StaleRpy)) => {
795 self.escrow_db.remove_escrowed_reply(&id, &sig_rep)?;
797 }
798 Err(Error::EventOutOfOrderError)
799 | Err(Error::VerificationError(VerificationError::MoreInfo(
800 MoreInfoError::EventNotFound(_),
801 ))) => (), Err(e) => return Err(e),
803 };
804 }
805 };
806 Ok(())
807 }
808}
809
810pub struct DelegationEscrow<D: EventDatabase> {
812 db: Arc<D>,
813 sled_db: Arc<SledEventDatabase>,
814 pub delegation_escrow: Escrow<SignedEventMessage>,
815}
816
817impl<D: EventDatabase> DelegationEscrow<D> {
818 pub fn new(
819 db: Arc<D>,
820 sled_db: Arc<SledEventDatabase>,
821 escrow_db: Arc<EscrowDb>,
822 duration: Duration,
823 ) -> Self {
824 let escrow = Escrow::new(b"dees", duration, escrow_db);
825 Self {
826 db,
827 sled_db,
828 delegation_escrow: escrow,
829 }
830 }
831
832 pub fn get_event_by_sn_and_digest(
833 &self,
834 sn: u64,
835 delegator_id: &IdentifierPrefix,
836 event_digest: &SelfAddressingIdentifier,
837 ) -> Option<SignedEventMessage> {
838 self.delegation_escrow
839 .get(delegator_id)
840 .and_then(|mut events| {
841 events.find(|event| {
842 event.event_message.data.sn == sn
843 && event.event_message.digest().ok().as_ref() == Some(event_digest)
844 })
845 })
846 }
847}
848
849impl<D: EventDatabase> Notifier for DelegationEscrow<D> {
850 fn notify(&self, notification: &Notification, bus: &NotificationBus) -> Result<(), Error> {
851 match notification {
852 Notification::KeyEventAdded(ev_message) => {
853 let id = ev_message.event_message.data.get_prefix();
855 let anchored_data: Vec<Seal> = match &ev_message.event_message.data.event_data {
857 EventData::Icp(icp) => icp.data.clone(),
858 EventData::Rot(rot) => rot.data.clone(),
859 EventData::Ixn(ixn) => ixn.data.clone(),
860 EventData::Dip(dip) => dip.inception_data.data.clone(),
861 EventData::Drt(drt) => drt.data.clone(),
862 };
863
864 let seals: Vec<EventSeal> = anchored_data
865 .into_iter()
866 .filter_map(|seal| match seal {
867 Seal::Event(es) => Some(es),
868 _ => None,
869 })
870 .collect();
871 if !seals.is_empty() {
872 let potential_delegator_seal = SourceSeal {
873 sn: ev_message.event_message.data.get_sn(),
874 digest: ev_message.event_message.digest()?,
875 };
876 self.process_delegation_events(bus, &id, seals, potential_delegator_seal)?;
877 }
878 }
879 Notification::MissingDelegatingEvent(signed_event) => {
880 if !signed_event.signatures.is_empty() {
882 let delegators_id = match &signed_event.event_message.data.event_data {
883 EventData::Dip(dip) => Ok(dip.delegator.clone()),
884 EventData::Drt(_drt) => {
885 let storage = EventStorage::new(self.db.clone(), self.sled_db.clone());
886 storage
887 .get_state(&signed_event.event_message.data.get_prefix())
888 .ok_or(Error::MissingDelegatingEventError)?
889 .delegator
890 .ok_or(Error::MissingDelegatingEventError)
891 }
892 _ => {
893 Err(Error::SemanticError("Not delegated event".to_string()))
895 }
896 }?;
897 self.delegation_escrow
898 .add(&delegators_id, signed_event.clone())?;
899 }
900 }
901 _ => return Err(Error::SemanticError("Wrong notification".into())),
902 }
903
904 Ok(())
905 }
906}
907
908impl<D: EventDatabase> DelegationEscrow<D> {
909 pub fn process_delegation_events(
910 &self,
911 bus: &NotificationBus,
912 delegator_id: &IdentifierPrefix,
913 anchored_seals: Vec<EventSeal>,
914 potential_delegator_seal: SourceSeal,
915 ) -> Result<(), Error> {
916 if let Some(esc) = self.delegation_escrow.get(delegator_id) {
917 for event in esc {
918 let event_digest = event.event_message.digest()?;
919 let seal = anchored_seals.iter().find(|seal| {
920 seal.event_digest() == event_digest
921 && seal.sn == event.event_message.data.get_sn()
922 && seal.prefix == event.event_message.data.get_prefix()
923 });
924 let delegated_event = match seal {
925 Some(_s) => SignedEventMessage {
926 delegator_seal: Some(potential_delegator_seal.clone()),
927 ..event.clone()
928 },
929 None => event.clone(),
930 };
931 let validator = EventValidator::new(self.sled_db.clone(), self.db.clone());
932 match validator.validate_event(&delegated_event) {
933 Ok(_) => {
934 let child_id = event.event_message.data.get_prefix();
936 self.db
937 .add_kel_finalized_event(delegated_event.clone(), &child_id)
938 .map_err(|_| Error::DbError)?;
939 self.delegation_escrow.remove(delegator_id, &event)?;
941 bus.notify(&Notification::KeyEventAdded(event))?;
942 break;
944 }
945 Err(Error::SignatureVerificationError) => {
946 self.delegation_escrow.remove(delegator_id, &event)?;
948 }
949 Err(Error::NotEnoughReceiptsError) => {
950 self.delegation_escrow.remove(delegator_id, &event)?;
952 bus.notify(&Notification::PartiallyWitnessed(delegated_event))?;
953 }
954 Err(_e) => (), }
956 }
957 };
958
959 Ok(())
960 }
961}