1use std::sync::Arc;
2
3use futures::future::join_all;
4use keri_core::{
5 actor::{error::ActorError, parse_event_stream, possible_response::PossibleResponse},
6 event_message::signed_event_message::{Message, Notice, Op, SignedEventMessage},
7 oobi::{EndRole, LocationScheme, Oobi, Scheme},
8 prefix::{BasicPrefix, IdentifierPrefix},
9 query::{
10 mailbox::SignedMailboxQuery,
11 query_event::{SignedKelQuery, SignedQueryMessage},
12 },
13 transport::{Transport, TransportError},
14};
15use teliox::{event::verifiable_event::VerifiableEvent, query::SignedTelQuery};
16
17use crate::{
18 error::ControllerError,
19 identifier::mechanics::MechanicsError,
20 known_events::{KnownEvents, OobiRetrieveError},
21};
22
23#[derive(Debug, thiserror::Error)]
24pub enum SendingError {
25 #[error("Actor doesn't have identifier {missing} oobi")]
26 WatcherDosntHaveOobi { missing: IdentifierPrefix },
27
28 #[error("Actor internal error: {0}")]
29 ActorInternalError(#[from] ActorError),
30
31 #[error("Transport error: {0}")]
32 TransportError(keri_core::transport::TransportError),
33
34 #[error("Http request error: {0}")]
35 HTTPError(#[from] reqwest::Error),
36
37 #[error(transparent)]
38 OobiError(#[from] OobiRetrieveError),
39
40 #[error("Invalid url: {0}")]
41 InvalidUrl(#[from] url::ParseError),
42}
43
44impl From<TransportError> for SendingError {
45 fn from(value: TransportError) -> Self {
46 match value {
47 TransportError::RemoteError(ActorError::NoIdentState { prefix }) => {
48 Self::WatcherDosntHaveOobi { missing: prefix }
49 }
50 TransportError::RemoteError(internal_error) => Self::ActorInternalError(internal_error),
51 e => Self::TransportError(e),
52 }
53 }
54}
55
56pub struct Communication {
57 pub events: Arc<KnownEvents>,
58 pub transport: Box<dyn Transport + Send + Sync>,
59 pub tel_transport: Box<dyn IdentifierTelTransport + Send + Sync>,
60}
61
62impl Communication {
63 pub fn new(
64 known_events: Arc<KnownEvents>,
65 transport: Box<dyn Transport<ActorError> + Send + Sync>,
66 tel_transport: Box<dyn IdentifierTelTransport + Send + Sync>,
67 ) -> Self {
68 Communication {
69 events: known_events,
70 transport,
71 tel_transport,
72 }
73 }
74
75 pub async fn resolve_loc_schema(&self, lc: &LocationScheme) -> Result<(), MechanicsError> {
77 let oobis = self.transport.request_loc_scheme(lc.clone()).await?;
78 for oobi in oobis {
79 self.events.save(&Message::Op(oobi))?;
80 }
81 Ok(())
82 }
83
84 pub async fn resolve_end_role(&self, er: &EndRole) -> Result<(), MechanicsError> {
86 let EndRole { cid, role, eid } = er.clone();
87 let loc = self
89 .events
90 .get_loc_schemas(&eid)
91 .map_err(SendingError::OobiError)?
92 .first()
93 .ok_or(SendingError::OobiError(OobiRetrieveError::MissingOobi(
94 cid.clone(),
95 None,
96 )))?
97 .clone();
98 let response = self.transport.request_end_role(loc, cid, role, eid).await?;
99
100 let msgs = parse_event_stream(response.as_ref()).map_err(|e| {
101 MechanicsError::OtherError(format!(
102 "Can't parse response while oobi resolving: {}",
103 e.to_string()
104 ))
105 })?;
106 for msg in msgs {
107 if let Message::Op(Op::Reply(signed_oobi)) = msg {
109 self.events.save_oobi(&signed_oobi)?;
110 } else {
111 self.events.save(&msg)?;
112 }
113 }
114 Ok(())
115 }
116
117 pub async fn resolve_oobi(&self, oobi: &Oobi) -> Result<(), MechanicsError> {
119 match oobi {
120 Oobi::Location(loc) => self.resolve_loc_schema(loc).await,
121 Oobi::EndRole(er) => self.resolve_end_role(er).await,
122 }
123 }
124
125 pub async fn send_message_to(
126 &self,
127 id: IdentifierPrefix,
128 scheme: Scheme,
129 msg: Message,
130 ) -> Result<(), SendingError> {
131 let loc = self.events.find_location(&id, scheme)?;
132 self.transport.send_message(loc, msg).await?;
133 Ok(())
134 }
135
136 pub async fn send_query_to(
137 &self,
138 id: &IdentifierPrefix,
139 scheme: Scheme,
140 query: SignedKelQuery,
141 ) -> Result<PossibleResponse, SendingError> {
142 let loc = self.events.find_location(id, scheme)?;
143 Ok(self
144 .transport
145 .send_query(loc, SignedQueryMessage::KelQuery(query))
146 .await?)
147 }
148
149 pub async fn send_management_query_to(
150 &self,
151 id: &IdentifierPrefix,
152 scheme: Scheme,
153 query: SignedMailboxQuery,
154 ) -> Result<PossibleResponse, SendingError> {
155 let loc = self.events.find_location(id, scheme)?;
156 Ok(self
157 .transport
158 .send_query(loc, SignedQueryMessage::MailboxQuery(query))
159 .await?)
160 }
161
162 async fn send_oobi_to(
163 &self,
164 id: &IdentifierPrefix,
165 scheme: Scheme,
166 oobi: Oobi,
167 ) -> Result<(), SendingError> {
168 let loc = self.events.find_location(id, scheme)?;
169 self.transport.resolve_oobi(loc, oobi).await?;
170 Ok(())
171 }
172
173 pub async fn publish(
179 &self,
180 witness_prefixes: Vec<BasicPrefix>,
181 message: &SignedEventMessage,
182 ) -> Result<(), MechanicsError> {
183 let (prefix, sn, digest) = (
187 message.event_message.data.get_prefix(),
188 message.event_message.data.get_sn(),
189 message.event_message.digest(),
190 );
191 let rcts_from_db = self
192 .events
193 .find_receipt(&prefix, sn, &digest?)?
194 .map(|rct| Message::Notice(Notice::NontransferableRct(rct)));
195
196 let messages_to_send = if let Some(receipt) = rcts_from_db {
197 vec![Message::Notice(Notice::Event(message.clone())), receipt]
198 } else {
199 vec![Message::Notice(Notice::Event(message.clone()))]
200 };
201
202 join_all(
203 itertools::iproduct!(messages_to_send, witness_prefixes).map(
204 |(message, witness_id)| {
205 self.send_message_to(
206 IdentifierPrefix::Basic(witness_id.clone()),
207 Scheme::Http,
208 message.clone(),
209 )
210 },
211 ),
212 )
213 .await;
214
215 Ok(())
216 }
217
218 pub async fn send_oobi_to_watcher(
221 &self,
222 id: &IdentifierPrefix,
223 oobi: &Oobi,
224 ) -> Result<(), ControllerError> {
225 for watcher in self.events.get_watchers(id)?.iter() {
226 self.send_oobi_to(watcher, Scheme::Http, oobi.clone())
227 .await?;
228 }
229
230 Ok(())
231 }
232
233 pub async fn send_tel_query(
234 &self,
235 qry: SignedTelQuery,
236 location: LocationScheme,
237 ) -> Result<String, SendingError> {
238 self.tel_transport.send_query(qry, location).await
239 }
240
241 pub async fn send_tel_event(
242 &self,
243 event: VerifiableEvent,
244 location: LocationScheme,
245 ) -> Result<(), SendingError> {
246 self.tel_transport.send_tel_event(event, location).await
247 }
248}
249
250#[async_trait::async_trait]
251pub trait IdentifierTelTransport {
252 async fn send_query(
253 &self,
254 qry: SignedTelQuery,
255 location: LocationScheme,
256 ) -> Result<String, SendingError>;
257 async fn send_tel_event(
258 &self,
259 qry: VerifiableEvent,
260 location: LocationScheme,
261 ) -> Result<(), SendingError>;
262}
263
/// Stateless [`IdentifierTelTransport`] implementation that talks to the
/// remote location over HTTP using `reqwest`.
pub struct HTTPTelTransport;
265
266#[async_trait::async_trait]
267impl IdentifierTelTransport for HTTPTelTransport {
268 async fn send_query(
269 &self,
270 qry: SignedTelQuery,
271 location: LocationScheme,
272 ) -> Result<String, SendingError> {
273 let url = match location.scheme {
274 Scheme::Http => location.url.join("query/tel")?,
275 Scheme::Tcp => todo!(),
276 };
277 let resp = reqwest::Client::new()
278 .post(url)
279 .body(qry.to_cesr().unwrap())
280 .send()
281 .await?;
282
283 Ok(resp.text().await?)
284 }
285
286 async fn send_tel_event(
287 &self,
288 event: VerifiableEvent,
289 location: LocationScheme,
290 ) -> Result<(), SendingError> {
291 let url = match location.scheme {
292 Scheme::Http => location.url.join("process/tel")?,
293 Scheme::Tcp => todo!(),
294 };
295 let client = reqwest::Client::new();
296 let query = event.serialize().unwrap();
297 let resp = client.post(url).body(query).send().await?;
298 resp.text().await?;
299
300 Ok(())
301 }
302}