1use crate::{Client, Error, IndexQuery, Query};
2use alto_types::{Block, Finalized, Kind, Notarized, Seed};
3use commonware_codec::{DecodeExt, Encode};
4use commonware_consensus::Viewable;
5use commonware_cryptography::Digestible;
6use commonware_parallel::Strategy;
7use futures::{channel::mpsc::unbounded, Stream, StreamExt};
8use tokio_tungstenite::{connect_async_tls_with_config, tungstenite::Message as TMessage};
9
/// Returns the seed upload endpoint, `{base}/seed`.
fn seed_upload_path(base: String) -> String {
    base + "/seed"
}
13
14fn seed_get_path(base: String, query: &IndexQuery) -> String {
15 format!("{base}/seed/{}", query.serialize())
16}
17
/// Returns the notarization upload endpoint, `{base}/notarization`.
fn notarization_upload_path(base: String) -> String {
    base + "/notarization"
}
21
22fn notarization_get_path(base: String, query: &IndexQuery) -> String {
23 format!("{base}/notarization/{}", query.serialize())
24}
25
/// Returns the finalization upload endpoint, `{base}/finalization`.
fn finalization_upload_path(base: String) -> String {
    base + "/finalization"
}
29
30fn finalization_get_path(base: String, query: &IndexQuery) -> String {
31 format!("{base}/finalization/{}", query.serialize())
32}
33
/// Returns the block upload endpoint, `{base}/block`.
fn block_upload_path(base: String) -> String {
    base + "/block"
}
37
38fn block_get_path(base: String, query: &Query) -> String {
39 format!("{base}/block/{}", query.serialize())
40}
41
/// Returns the consensus WebSocket endpoint, `{base}/consensus/ws`.
fn listen_path(base: String) -> String {
    base + "/consensus/ws"
}
45
46pub enum Payload {
47 Finalized(Box<Finalized>),
48 Block(Box<Block>),
49}
50
51pub enum Message {
52 Seed(Seed),
53 Notarization(Notarized),
54 Finalization(Finalized),
55}
56
57impl<S: Strategy> Client<S> {
58 pub async fn seed_upload(&self, seed: Seed) -> Result<(), Error> {
59 let result = self
60 .http_client
61 .post(seed_upload_path(self.uri.clone()))
62 .body(seed.encode().to_vec())
63 .send()
64 .await
65 .map_err(Error::Reqwest)?;
66 if !result.status().is_success() {
67 return Err(Error::Failed(result.status()));
68 }
69 Ok(())
70 }
71
72 pub async fn seed_get(&self, query: IndexQuery) -> Result<Seed, Error> {
73 let result = self
75 .http_client
76 .get(seed_get_path(self.uri.clone(), &query))
77 .send()
78 .await
79 .map_err(Error::Reqwest)?;
80 if !result.status().is_success() {
81 return Err(Error::Failed(result.status()));
82 }
83 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
84 let seed = Seed::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
85 if self.verify && !seed.verify(&self.certificate_verifier) {
86 return Err(Error::InvalidSignature);
87 }
88
89 match query {
91 IndexQuery::Latest => {}
92 IndexQuery::Index(index) => {
93 if seed.view().get() != index {
94 return Err(Error::UnexpectedResponse);
95 }
96 }
97 }
98 Ok(seed)
99 }
100
101 pub async fn notarized_upload(&self, notarized: Notarized) -> Result<(), Error> {
102 let result = self
103 .http_client
104 .post(notarization_upload_path(self.uri.clone()))
105 .body(notarized.encode().to_vec())
106 .send()
107 .await
108 .map_err(Error::Reqwest)?;
109 if !result.status().is_success() {
110 return Err(Error::Failed(result.status()));
111 }
112 Ok(())
113 }
114
115 pub async fn notarized_get(&self, query: IndexQuery) -> Result<Notarized, Error> {
116 let result = self
118 .http_client
119 .get(notarization_get_path(self.uri.clone(), &query))
120 .send()
121 .await
122 .map_err(Error::Reqwest)?;
123 if !result.status().is_success() {
124 return Err(Error::Failed(result.status()));
125 }
126 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
127 let notarized = Notarized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
128 if self.verify && !notarized.verify(&self.certificate_verifier, &self.strategy) {
129 return Err(Error::InvalidSignature);
130 }
131
132 match query {
134 IndexQuery::Latest => {}
135 IndexQuery::Index(index) => {
136 if notarized.proof.view().get() != index {
137 return Err(Error::UnexpectedResponse);
138 }
139 }
140 }
141 Ok(notarized)
142 }
143
144 pub async fn finalized_upload(&self, finalized: Finalized) -> Result<(), Error> {
145 let result = self
146 .http_client
147 .post(finalization_upload_path(self.uri.clone()))
148 .body(finalized.encode().to_vec())
149 .send()
150 .await
151 .map_err(Error::Reqwest)?;
152 if !result.status().is_success() {
153 return Err(Error::Failed(result.status()));
154 }
155 Ok(())
156 }
157
158 pub async fn finalized_get(&self, query: IndexQuery) -> Result<Finalized, Error> {
159 let result = self
161 .http_client
162 .get(finalization_get_path(self.uri.clone(), &query))
163 .send()
164 .await
165 .map_err(Error::Reqwest)?;
166 if !result.status().is_success() {
167 return Err(Error::Failed(result.status()));
168 }
169 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
170 let finalized = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
171 if self.verify && !finalized.verify(&self.certificate_verifier, &self.strategy) {
172 return Err(Error::InvalidSignature);
173 }
174
175 match query {
177 IndexQuery::Latest => {}
178 IndexQuery::Index(index) => {
179 if finalized.proof.view().get() != index {
180 return Err(Error::UnexpectedResponse);
181 }
182 }
183 }
184 Ok(finalized)
185 }
186
187 pub async fn block_upload(&self, block: Block) -> Result<(), Error> {
188 let result = self
190 .http_client
191 .post(block_upload_path(self.uri.clone()))
192 .body(block.encode().to_vec())
193 .send()
194 .await
195 .map_err(Error::Reqwest)?;
196 if !result.status().is_success() {
197 return Err(Error::Failed(result.status()));
198 }
199 Ok(())
200 }
201
202 pub async fn block_get(&self, query: Query) -> Result<Payload, Error> {
203 let result = self
205 .http_client
206 .get(block_get_path(self.uri.clone(), &query))
207 .send()
208 .await
209 .map_err(Error::Reqwest)?;
210 if !result.status().is_success() {
211 return Err(Error::Failed(result.status()));
212 }
213 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
214
215 let result = match query {
217 Query::Latest => {
218 let result = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
219 if self.verify && !result.verify(&self.certificate_verifier, &self.strategy) {
220 return Err(Error::InvalidSignature);
221 }
222 Payload::Finalized(Box::new(result))
223 }
224 Query::Index(index) => {
225 let result = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
226 if self.verify && !result.verify(&self.certificate_verifier, &self.strategy) {
227 return Err(Error::InvalidSignature);
228 }
229 if result.block.height.get() != index {
230 return Err(Error::UnexpectedResponse);
231 }
232 Payload::Finalized(Box::new(result))
233 }
234 Query::Digest(digest) => {
235 let result = Block::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
236 if result.digest() != digest {
237 return Err(Error::UnexpectedResponse);
238 }
239 Payload::Block(Box::new(result))
240 }
241 };
242 Ok(result)
243 }
244
245 pub async fn listen(&self) -> Result<impl Stream<Item = Result<Message, Error>>, Error> {
246 let (stream, _) = connect_async_tls_with_config(
248 listen_path(self.ws_uri.clone()),
249 None,
250 false,
251 Some(self.ws_connector.clone()),
252 )
253 .await
254 .map_err(Error::from)?;
255 let (_, read) = stream.split();
256
257 let (sender, receiver) = unbounded();
259 tokio::spawn({
260 let certificate_verifier = self.certificate_verifier.clone();
261 let strategy = self.strategy.clone();
262 let verify = self.verify;
263 async move {
264 read.for_each(|message| async {
265 match message {
266 Ok(TMessage::Binary(data)) => {
267 let kind = data[0];
269 let Some(kind) = Kind::from_u8(kind) else {
270 let _ = sender.unbounded_send(Err(Error::UnexpectedResponse));
271 return;
272 };
273 let data = &data[1..];
274
275 match kind {
277 Kind::Seed => {
278 let result = Seed::decode(data);
279 match result {
280 Ok(seed) => {
281 if verify && !seed.verify(&certificate_verifier) {
282 let _ = sender
283 .unbounded_send(Err(Error::InvalidSignature));
284 return;
285 }
286 let _ = sender.unbounded_send(Ok(Message::Seed(seed)));
287 }
288 Err(e) => {
289 let _ =
290 sender.unbounded_send(Err(Error::InvalidData(e)));
291 }
292 }
293 }
294 Kind::Notarization => {
295 let result = Notarized::decode(data);
296 match result {
297 Ok(notarized) => {
298 if verify
299 && !notarized
300 .verify(&certificate_verifier, &strategy)
301 {
302 let _ = sender
303 .unbounded_send(Err(Error::InvalidSignature));
304 return;
305 }
306 let _ = sender.unbounded_send(Ok(
307 Message::Notarization(notarized),
308 ));
309 }
310 Err(e) => {
311 let _ =
312 sender.unbounded_send(Err(Error::InvalidData(e)));
313 }
314 }
315 }
316 Kind::Finalization => {
317 let result = Finalized::decode(data);
318 match result {
319 Ok(finalized) => {
320 if verify
321 && !finalized
322 .verify(&certificate_verifier, &strategy)
323 {
324 let _ = sender
325 .unbounded_send(Err(Error::InvalidSignature));
326 return;
327 }
328 let _ = sender.unbounded_send(Ok(
329 Message::Finalization(finalized),
330 ));
331 }
332 Err(e) => {
333 let _ =
334 sender.unbounded_send(Err(Error::InvalidData(e)));
335 }
336 }
337 }
338 }
339 }
340 Ok(_) => {} Err(e) => {
342 let _ = sender.unbounded_send(Err(Error::from(e)));
343 }
344 }
345 })
346 .await;
347 }
348 });
349 Ok(receiver)
350 }
351}