Skip to main content

keri_controller/identifier/mechanics/
broadcast.rs

1use futures::future::join_all;
2use keri_core::{
3    actor::prelude::SelfAddressingIdentifier,
4    database::EventDatabase,
5    event_message::signed_event_message::{Message, Notice},
6    oobi::Scheme,
7    prefix::IdentifierPrefix,
8};
9
10use crate::{communication::SendingError, identifier::Identifier};
11
12#[derive(thiserror::Error, Debug)]
13pub enum BroadcastingError {
14    #[error("Sending error while broadcasting events: {0}")]
15    SendingError(#[from] SendingError),
16    #[error("There's no event of digest: {digest}")]
17    MissingEvent { digest: SelfAddressingIdentifier },
18    #[error("Cache saving error")]
19    CacheSavingError,
20}
21
22impl Identifier {
23    /// Send new receipts obtained via [`Self::finalize_query`] to specified witnesses.
24    /// Returns number of new receipts sent per witness or first error.
25    pub async fn broadcast_receipts(
26        &mut self,
27        dest_wit_ids: &[IdentifierPrefix],
28    ) -> Result<(), BroadcastingError> {
29        for witness in dest_wit_ids {
30            #[cfg(feature = "query_cache")]
31            let sn = self
32                .query_cache
33                .load_published_receipts_sn(witness)
34                .map_err(|_| BroadcastingError::CacheSavingError)?;
35
36            #[cfg(not(feature = "query_cache"))]
37            let sn = 0;
38
39            let receipts_to_publish = self.known_events.storage.events_db.get_receipts_nt(
40                keri_core::database::QueryParameters::Range {
41                    id: self.id.clone(),
42                    start: sn as u64,
43                    limit: 10,
44                },
45            );
46
47            if let Some(receipts_to_publish) = receipts_to_publish {
48                let mut max_sn = 0;
49                let receipts_futures = receipts_to_publish.map(|rct| {
50                    max_sn = rct.body.sn.max(max_sn);
51                    self.communication.send_message_to(
52                        witness.clone(),
53                        Scheme::Http,
54                        Message::Notice(Notice::NontransferableRct(rct.clone())),
55                    )
56                });
57                join_all(receipts_futures).await;
58
59                #[cfg(feature = "query_cache")]
60                self.query_cache
61                    .update_last_published_receipt(witness, max_sn)
62                    .unwrap();
63            }
64        }
65
66        Ok(())
67    }
68}
69
#[cfg(test)]
mod test {
    use std::{collections::HashMap, path::Path, sync::Arc};

    use keri_core::{
        event::event_data::EventData,
        event_message::signed_event_message::Notice,
        oobi::LocationScheme,
        prefix::{BasicPrefix, IdentifierPrefix, SelfSigningPrefix},
        signer::{CryptoBox, KeyManager},
        transport::test::{TestActorMap, TestTransport},
    };
    use tempfile::Builder;
    use url::{Host, Url};
    use witness::{WitnessEscrowConfig, WitnessListener};

    use crate::{config::ControllerConfig, controller::Controller, error::ControllerError};

    /// Sets up a witness reachable at `address`, storing its data under `db_path` and using
    /// `seed` as its private key seed.
    fn setup_witness(address: &str, db_path: &Path, seed: &str) -> Arc<WitnessListener> {
        Arc::new(
            WitnessListener::setup(
                Url::parse(address).unwrap(),
                db_path,
                Some(seed.to_string()),
                WitnessEscrowConfig::default(),
            )
            .unwrap(),
        )
    }

    /// Incepts an identifier with two witnesses and checks that, after the mailbox is queried,
    /// the first witness ends up holding receipts from both witnesses.
    #[async_std::test]
    async fn test_2_wit() -> Result<(), ControllerError> {
        let root = Builder::new().prefix("test-db").tempdir().unwrap();

        // The temporary directories are removed when their guards are dropped, so the guards
        // are bound here to outlive the witnesses that use them.
        let witness1_root = Builder::new().prefix("test-wit1-db").tempdir().unwrap();
        let witness2_root = Builder::new().prefix("test-wit2-db").tempdir().unwrap();

        let witness1 = setup_witness(
            "http://witness1/",
            witness1_root.path(),
            "AK8F6AAiYDpXlWdj2O5F5-6wNCCNJh2A4XOlqwR_HwwH",
        );
        let witness2 = setup_witness(
            "http://witness2/",
            witness2_root.path(),
            "AJZ7ZLd7unQ4IkMUwE69NXcvDO9rrmmRH_Xk3TPu9BpP",
        );

        let wit1_id = witness1.get_prefix();
        let wit1_location = LocationScheme {
            eid: IdentifierPrefix::Basic(wit1_id.clone()),
            scheme: keri_core::oobi::Scheme::Http,
            url: Url::parse("http://witness1/").unwrap(),
        };
        let wit2_id = witness2.get_prefix();
        let wit2_location = LocationScheme {
            eid: IdentifierPrefix::Basic(wit2_id.clone()),
            scheme: keri_core::oobi::Scheme::Http,
            url: Url::parse("http://witness2/").unwrap(),
        };

        // Route requests for both witness hosts to the in-process witnesses.
        let transport = {
            let mut actors: TestActorMap = HashMap::new();
            actors.insert((Host::Domain("witness1".to_string()), 80), witness1.clone());
            actors.insert((Host::Domain("witness2".to_string()), 80), witness2.clone());
            TestTransport::new(actors)
        };

        let controller = Arc::new(Controller::new(ControllerConfig {
            db_path: root.path().to_owned(),
            transport: Box::new(transport.clone()),
            ..Default::default()
        })?);

        let km1 = CryptoBox::new()?;

        let pk = BasicPrefix::Ed25519(km1.public_key());
        let npk = BasicPrefix::Ed25519(km1.next_public_key());

        let icp_event = controller
            .incept(
                vec![pk],
                vec![npk],
                vec![wit1_location.clone(), wit2_location.clone()],
                2,
            )
            .await
            .unwrap();
        let signature = SelfSigningPrefix::Ed25519Sha512(km1.sign(icp_event.as_bytes())?);

        let mut identifier = controller.finalize_incept(icp_event.as_bytes(), &signature)?;

        assert_eq!(identifier.notify_witnesses().await.unwrap(), 1);
        // Before the mailbox is queried, witness1 holds the inception event with one receipt.
        assert!(matches!(
            witness1
                .witness_data
                .event_storage
                .get_kel_messages_with_receipts_all(&identifier.id)?
                .unwrap()
                .as_slice(),
            [Notice::Event(evt)]
            if matches!(evt.event_message.data.event_data, EventData::Icp(_))
                && evt.witness_receipts.as_ref().unwrap().len() == 1
        ));

        // Querying mailbox to get receipts
        for qry in identifier.query_mailbox(&identifier.id, &[wit1_id.clone(), wit2_id.clone()])? {
            let signature = SelfSigningPrefix::Ed25519Sha512(km1.sign(&qry.encode()?)?);
            let act = identifier
                .finalize_query_mailbox(vec![(qry, signature)])
                .await?;
            assert!(act.is_empty());
        }

        assert_eq!(identifier.notify_witnesses().await?, 0);
        // Check that the receipts were broadcast: witness1 now holds receipts from both witnesses.
        assert!(matches!(
            witness1
                .witness_data
                .event_storage
                .get_kel_messages_with_receipts_all(&identifier.id)?
                .unwrap()
                .as_slice(),
            [Notice::Event(evt)]
            if matches!(evt.event_message.data.event_data, EventData::Icp(_))
                && evt.witness_receipts.as_ref().unwrap().len() == 2
        ));

        Ok(())
    }
}