keri_controller/identifier/mechanics/
broadcast.rs1use futures::future::join_all;
2use keri_core::{
3 actor::prelude::SelfAddressingIdentifier,
4 database::EventDatabase,
5 event_message::signed_event_message::{Message, Notice},
6 oobi::Scheme,
7 prefix::IdentifierPrefix,
8};
9
10use crate::{communication::SendingError, identifier::Identifier};
11
12#[derive(thiserror::Error, Debug)]
13pub enum BroadcastingError {
14 #[error("Sending error while broadcasting events: {0}")]
15 SendingError(#[from] SendingError),
16 #[error("There's no event of digest: {digest}")]
17 MissingEvent { digest: SelfAddressingIdentifier },
18 #[error("Cache saving error")]
19 CacheSavingError,
20}
21
22impl Identifier {
23 pub async fn broadcast_receipts(
26 &mut self,
27 dest_wit_ids: &[IdentifierPrefix],
28 ) -> Result<(), BroadcastingError> {
29 for witness in dest_wit_ids {
30 #[cfg(feature = "query_cache")]
31 let sn = self
32 .query_cache
33 .load_published_receipts_sn(witness)
34 .map_err(|_| BroadcastingError::CacheSavingError)?;
35
36 #[cfg(not(feature = "query_cache"))]
37 let sn = 0;
38
39 let receipts_to_publish = self.known_events.storage.events_db.get_receipts_nt(
40 keri_core::database::QueryParameters::Range {
41 id: self.id.clone(),
42 start: sn as u64,
43 limit: 10,
44 },
45 );
46
47 if let Some(receipts_to_publish) = receipts_to_publish {
48 let mut max_sn = 0;
49 let receipts_futures = receipts_to_publish.map(|rct| {
50 max_sn = rct.body.sn.max(max_sn);
51 self.communication.send_message_to(
52 witness.clone(),
53 Scheme::Http,
54 Message::Notice(Notice::NontransferableRct(rct.clone())),
55 )
56 });
57 join_all(receipts_futures).await;
58
59 #[cfg(feature = "query_cache")]
60 self.query_cache
61 .update_last_published_receipt(witness, max_sn)
62 .unwrap();
63 }
64 }
65
66 Ok(())
67 }
68}
69
70#[cfg(test)]
71mod test {
72 use std::{collections::HashMap, sync::Arc};
73
74 use keri_core::{
75 event::event_data::EventData,
76 event_message::signed_event_message::Notice,
77 oobi::LocationScheme,
78 prefix::{BasicPrefix, IdentifierPrefix, SelfSigningPrefix},
79 signer::{CryptoBox, KeyManager},
80 transport::test::{TestActorMap, TestTransport},
81 };
82 use tempfile::Builder;
83 use url::Host;
84 use witness::{WitnessEscrowConfig, WitnessListener};
85
86 use crate::{config::ControllerConfig, controller::Controller, error::ControllerError};
87
88 #[async_std::test]
89 async fn test_2_wit() -> Result<(), ControllerError> {
90 use url::Url;
91 let root = Builder::new().prefix("test-db").tempdir().unwrap();
92
93 let witness1 = {
94 let seed = "AK8F6AAiYDpXlWdj2O5F5-6wNCCNJh2A4XOlqwR_HwwH";
95 let witness_root = Builder::new().prefix("test-wit1-db").tempdir().unwrap();
96 Arc::new(
97 WitnessListener::setup(
98 url::Url::parse("http://witness1/").unwrap(),
99 witness_root.path(),
100 Some(seed.to_string()),
101 WitnessEscrowConfig::default(),
102 )
103 .unwrap(),
104 )
105 };
106 let witness2 = {
107 let seed = "AJZ7ZLd7unQ4IkMUwE69NXcvDO9rrmmRH_Xk3TPu9BpP";
108 let witness_root = Builder::new().prefix("test-wit2-db").tempdir().unwrap();
109 Arc::new(
110 WitnessListener::setup(
111 url::Url::parse("http://witness2/").unwrap(),
112 witness_root.path(),
113 Some(seed.to_string()),
114 WitnessEscrowConfig::default(),
115 )
116 .unwrap(),
117 )
118 };
119
120 let wit1_id = witness1.get_prefix();
121 let wit1_location = LocationScheme {
122 eid: IdentifierPrefix::Basic(wit1_id.clone()),
123 scheme: keri_core::oobi::Scheme::Http,
124 url: Url::parse("http://witness1/").unwrap(),
125 };
126 let wit2_id = witness2.get_prefix();
127 let wit2_location = LocationScheme {
128 eid: IdentifierPrefix::Basic(wit2_id.clone()),
129 scheme: keri_core::oobi::Scheme::Http,
130 url: Url::parse("http://witness2/").unwrap(),
131 };
132
133 let transport = {
134 let mut actors: TestActorMap = HashMap::new();
135 actors.insert((Host::Domain("witness1".to_string()), 80), witness1.clone());
136 actors.insert((Host::Domain("witness2".to_string()), 80), witness2.clone());
137 TestTransport::new(actors)
138 };
139
140 let controller = Arc::new(Controller::new(ControllerConfig {
141 db_path: root.path().to_owned(),
142 transport: Box::new(transport.clone()),
143 ..Default::default()
144 })?);
145
146 let km1 = CryptoBox::new()?;
147
148 let pk = BasicPrefix::Ed25519(km1.public_key());
149 let npk = BasicPrefix::Ed25519(km1.next_public_key());
150
151 let icp_event = controller
152 .incept(
153 vec![pk],
154 vec![npk],
155 vec![wit1_location.clone(), wit2_location.clone()],
156 2,
157 )
158 .await
159 .unwrap();
160 let signature = SelfSigningPrefix::Ed25519Sha512(km1.sign(icp_event.as_bytes())?);
161
162 let mut identifier = controller.finalize_incept(icp_event.as_bytes(), &signature)?;
163
164 assert_eq!(identifier.notify_witnesses().await.unwrap(), 1);
165 assert!(matches!(
166 witness1.witness_data.event_storage.get_kel_messages_with_receipts_all(&identifier.id)?.unwrap().as_slice(),
167 [Notice::Event(evt)]
168 if matches!(evt.event_message.data.event_data, EventData::Icp(_))
169 && matches!(evt.witness_receipts.as_ref().unwrap().len(), 1)
170 ));
171
172 for qry in identifier.query_mailbox(&identifier.id, &[wit1_id.clone(), wit2_id.clone()])? {
174 let signature = SelfSigningPrefix::Ed25519Sha512(km1.sign(&qry.encode()?)?);
175 let act = identifier
176 .finalize_query_mailbox(vec![(qry, signature)])
177 .await?;
178 assert!(act.is_empty());
179 }
180
181 assert_eq!(identifier.notify_witnesses().await?, 0);
182 assert!(matches!(
184 witness1.witness_data.event_storage.get_kel_messages_with_receipts_all(&identifier.id)?.unwrap().as_slice(),
185 [Notice::Event(evt)]
186 if matches!(evt.event_message.data.event_data, EventData::Icp(_))
187 && matches!(evt.witness_receipts.as_ref().unwrap().len(), 2)
188 ));
189
190 Ok(())
191 }
192}