keri_controller/
communication.rs

1use std::sync::Arc;
2
3use futures::future::join_all;
4use keri_core::{
5    actor::{error::ActorError, parse_event_stream, possible_response::PossibleResponse},
6    event_message::signed_event_message::{Message, Notice, Op, SignedEventMessage},
7    oobi::{EndRole, LocationScheme, Oobi, Scheme},
8    prefix::{BasicPrefix, IdentifierPrefix},
9    query::{
10        mailbox::SignedMailboxQuery,
11        query_event::{SignedKelQuery, SignedQueryMessage},
12    },
13    transport::{Transport, TransportError},
14};
15use teliox::{event::verifiable_event::VerifiableEvent, query::SignedTelQuery};
16
17use crate::{
18    error::ControllerError,
19    identifier::mechanics::MechanicsError,
20    known_events::{KnownEvents, OobiRetrieveError},
21};
22
23#[derive(Debug, thiserror::Error)]
24pub enum SendingError {
25    #[error("Actor doesn't have identifier {missing} oobi")]
26    WatcherDosntHaveOobi { missing: IdentifierPrefix },
27
28    #[error("Actor internal error: {0}")]
29    ActorInternalError(#[from] ActorError),
30
31    #[error("Transport error: {0}")]
32    TransportError(keri_core::transport::TransportError),
33
34    #[error("Http request error: {0}")]
35    HTTPError(#[from] reqwest::Error),
36
37    #[error(transparent)]
38    OobiError(#[from] OobiRetrieveError),
39
40    #[error("Invalid url: {0}")]
41    InvalidUrl(#[from] url::ParseError),
42}
43
44impl From<TransportError> for SendingError {
45    fn from(value: TransportError) -> Self {
46        match value {
47            TransportError::RemoteError(ActorError::NoIdentState { prefix }) => {
48                Self::WatcherDosntHaveOobi { missing: prefix }
49            }
50            TransportError::RemoteError(internal_error) => Self::ActorInternalError(internal_error),
51            e => Self::TransportError(e),
52        }
53    }
54}
55
56pub struct Communication {
57    pub events: Arc<KnownEvents>,
58    pub transport: Box<dyn Transport + Send + Sync>,
59    pub tel_transport: Box<dyn IdentifierTelTransport + Send + Sync>,
60}
61
62impl Communication {
63    pub fn new(
64        known_events: Arc<KnownEvents>,
65        transport: Box<dyn Transport<ActorError> + Send + Sync>,
66        tel_transport: Box<dyn IdentifierTelTransport + Send + Sync>,
67    ) -> Self {
68        Communication {
69            events: known_events,
70            transport,
71            tel_transport,
72        }
73    }
74
75    /// Make http request to get identifier's endpoints information.
76    pub async fn resolve_loc_schema(&self, lc: &LocationScheme) -> Result<(), MechanicsError> {
77        let oobis = self.transport.request_loc_scheme(lc.clone()).await?;
78        for oobi in oobis {
79            self.events.save(&Message::Op(oobi))?;
80        }
81        Ok(())
82    }
83
84    /// Make http request to get identifier's endpoints information.
85    pub async fn resolve_end_role(&self, er: &EndRole) -> Result<(), MechanicsError> {
86        let EndRole { cid, role, eid } = er.clone();
87        // TODO what if more than one
88        let loc = self
89            .events
90            .get_loc_schemas(&eid)
91            .map_err(SendingError::OobiError)?
92            .first()
93            .ok_or(SendingError::OobiError(OobiRetrieveError::MissingOobi(
94                cid.clone(),
95                None,
96            )))?
97            .clone();
98        let response = self.transport.request_end_role(loc, cid, role, eid).await?;
99
100        let msgs = parse_event_stream(response.as_ref()).map_err(|e| {
101            MechanicsError::OtherError(format!(
102                "Can't parse response while oobi resolving: {}",
103                e.to_string()
104            ))
105        })?;
106        for msg in msgs {
107            // TODO This ignore signatures. Add verification.
108            if let Message::Op(Op::Reply(signed_oobi)) = msg {
109                self.events.save_oobi(&signed_oobi)?;
110            } else {
111                self.events.save(&msg)?;
112            }
113        }
114        Ok(())
115    }
116
117    /// Make http request to get identifier's endpoints information.
118    pub async fn resolve_oobi(&self, oobi: &Oobi) -> Result<(), MechanicsError> {
119        match oobi {
120            Oobi::Location(loc) => self.resolve_loc_schema(loc).await,
121            Oobi::EndRole(er) => self.resolve_end_role(er).await,
122        }
123    }
124
125    pub async fn send_message_to(
126        &self,
127        id: IdentifierPrefix,
128        scheme: Scheme,
129        msg: Message,
130    ) -> Result<(), SendingError> {
131        let loc = self.events.find_location(&id, scheme)?;
132        self.transport.send_message(loc, msg).await?;
133        Ok(())
134    }
135
136    pub async fn send_query_to(
137        &self,
138        id: &IdentifierPrefix,
139        scheme: Scheme,
140        query: SignedKelQuery,
141    ) -> Result<PossibleResponse, SendingError> {
142        let loc = self.events.find_location(id, scheme)?;
143        Ok(self
144            .transport
145            .send_query(loc, SignedQueryMessage::KelQuery(query))
146            .await?)
147    }
148
149    pub async fn send_management_query_to(
150        &self,
151        id: &IdentifierPrefix,
152        scheme: Scheme,
153        query: SignedMailboxQuery,
154    ) -> Result<PossibleResponse, SendingError> {
155        let loc = self.events.find_location(id, scheme)?;
156        Ok(self
157            .transport
158            .send_query(loc, SignedQueryMessage::MailboxQuery(query))
159            .await?)
160    }
161
162    async fn send_oobi_to(
163        &self,
164        id: &IdentifierPrefix,
165        scheme: Scheme,
166        oobi: Oobi,
167    ) -> Result<(), SendingError> {
168        let loc = self.events.find_location(id, scheme)?;
169        self.transport.resolve_oobi(loc, oobi).await?;
170        Ok(())
171    }
172
173    /// Publish key event to witnesses
174    ///
175    ///  1. send it to all witnesses
176    ///  2. collect witness receipts and process them
177    ///  3. get processed receipts from db and send it to all witnesses
178    pub async fn publish(
179        &self,
180        witness_prefixes: Vec<BasicPrefix>,
181        message: &SignedEventMessage,
182    ) -> Result<(), MechanicsError> {
183        // Get processed receipts from database to send all of them to witnesses. It
184        // will return one receipt with all witness signatures as one attachment,
185        // not three separate receipts as in `collected_receipts`.
186        let (prefix, sn, digest) = (
187            message.event_message.data.get_prefix(),
188            message.event_message.data.get_sn(),
189            message.event_message.digest(),
190        );
191        let rcts_from_db = self
192            .events
193            .find_receipt(&prefix, sn, &digest?)?
194            .map(|rct| Message::Notice(Notice::NontransferableRct(rct)));
195
196        let messages_to_send = if let Some(receipt) = rcts_from_db {
197            vec![Message::Notice(Notice::Event(message.clone())), receipt]
198        } else {
199            vec![Message::Notice(Notice::Event(message.clone()))]
200        };
201
202        join_all(
203            itertools::iproduct!(messages_to_send, witness_prefixes).map(
204                |(message, witness_id)| {
205                    self.send_message_to(
206                        IdentifierPrefix::Basic(witness_id.clone()),
207                        Scheme::Http,
208                        message.clone(),
209                    )
210                },
211            ),
212        )
213        .await;
214
215        Ok(())
216    }
217
218    /// Sends identifier's endpoint information to identifiers's watchers.
219    // TODO use stream instead of json
220    pub async fn send_oobi_to_watcher(
221        &self,
222        id: &IdentifierPrefix,
223        oobi: &Oobi,
224    ) -> Result<(), ControllerError> {
225        for watcher in self.events.get_watchers(id)?.iter() {
226            self.send_oobi_to(watcher, Scheme::Http, oobi.clone())
227                .await?;
228        }
229
230        Ok(())
231    }
232
233    pub async fn send_tel_query(
234        &self,
235        qry: SignedTelQuery,
236        location: LocationScheme,
237    ) -> Result<String, SendingError> {
238        self.tel_transport.send_query(qry, location).await
239    }
240
241    pub async fn send_tel_event(
242        &self,
243        event: VerifiableEvent,
244        location: LocationScheme,
245    ) -> Result<(), SendingError> {
246        self.tel_transport.send_tel_event(event, location).await
247    }
248}
249
250#[async_trait::async_trait]
251pub trait IdentifierTelTransport {
252    async fn send_query(
253        &self,
254        qry: SignedTelQuery,
255        location: LocationScheme,
256    ) -> Result<String, SendingError>;
257    async fn send_tel_event(
258        &self,
259        qry: VerifiableEvent,
260        location: LocationScheme,
261    ) -> Result<(), SendingError>;
262}
263
/// [`IdentifierTelTransport`] implementation that talks to the remote actor
/// over HTTP using `reqwest`.
pub struct HTTPTelTransport;
265
266#[async_trait::async_trait]
267impl IdentifierTelTransport for HTTPTelTransport {
268    async fn send_query(
269        &self,
270        qry: SignedTelQuery,
271        location: LocationScheme,
272    ) -> Result<String, SendingError> {
273        let url = match location.scheme {
274            Scheme::Http => location.url.join("query/tel")?,
275            Scheme::Tcp => todo!(),
276        };
277        let resp = reqwest::Client::new()
278            .post(url)
279            .body(qry.to_cesr().unwrap())
280            .send()
281            .await?;
282
283        Ok(resp.text().await?)
284    }
285
286    async fn send_tel_event(
287        &self,
288        event: VerifiableEvent,
289        location: LocationScheme,
290    ) -> Result<(), SendingError> {
291        let url = match location.scheme {
292            Scheme::Http => location.url.join("process/tel")?,
293            Scheme::Tcp => todo!(),
294        };
295        let client = reqwest::Client::new();
296        let query = event.serialize().unwrap();
297        let resp = client.post(url).body(query).send().await?;
298        resp.text().await?;
299
300        Ok(())
301    }
302}