1use crate::{Client, Error, IndexQuery, Query};
2use alto_types::{Block, Finalized, Kind, Notarized, Seed};
3use commonware_codec::{DecodeExt, Encode};
4use commonware_consensus::Viewable;
5use commonware_cryptography::Digestible;
6use commonware_parallel::Strategy;
7use futures::{channel::mpsc::unbounded, Stream, StreamExt};
8use tokio_tungstenite::{connect_async_tls_with_config, tungstenite::Message as TMessage};
9
10fn seed_upload_path(base: String) -> String {
11 format!("{base}/seed")
12}
13
14fn seed_get_path(base: String, query: &IndexQuery) -> String {
15 format!("{base}/seed/{}", query.serialize())
16}
17
18fn notarization_upload_path(base: String) -> String {
19 format!("{base}/notarization")
20}
21
22fn notarization_get_path(base: String, query: &IndexQuery) -> String {
23 format!("{base}/notarization/{}", query.serialize())
24}
25
26fn finalization_upload_path(base: String) -> String {
27 format!("{base}/finalization")
28}
29
30fn finalization_get_path(base: String, query: &IndexQuery) -> String {
31 format!("{base}/finalization/{}", query.serialize())
32}
33
34fn block_get_path(base: String, query: &Query) -> String {
37 format!("{base}/block/{}", query.serialize())
38}
39
40fn listen_path(base: String) -> String {
41 format!("{base}/consensus/ws")
42}
43
44pub enum Payload {
45 Finalized(Box<Finalized>),
46 Block(Box<Block>),
47}
48
49pub enum Message {
50 Seed(Seed),
51 Notarization(Notarized),
52 Finalization(Finalized),
53}
54
55impl<S: Strategy> Client<S> {
56 pub async fn seed_upload(&self, seed: Seed) -> Result<(), Error> {
57 let result = self
58 .http_client
59 .post(seed_upload_path(self.uri.clone()))
60 .body(seed.encode().to_vec())
61 .send()
62 .await
63 .map_err(Error::Reqwest)?;
64 if !result.status().is_success() {
65 return Err(Error::Failed(result.status()));
66 }
67 Ok(())
68 }
69
70 pub async fn seed_get(&self, query: IndexQuery) -> Result<Seed, Error> {
71 let result = self
73 .http_client
74 .get(seed_get_path(self.uri.clone(), &query))
75 .send()
76 .await
77 .map_err(Error::Reqwest)?;
78 if !result.status().is_success() {
79 return Err(Error::Failed(result.status()));
80 }
81 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
82 let seed = Seed::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
83 if !seed.verify(&self.certificate_verifier) {
84 return Err(Error::InvalidSignature);
85 }
86
87 match query {
89 IndexQuery::Latest => {}
90 IndexQuery::Index(index) => {
91 if seed.view().get() != index {
92 return Err(Error::UnexpectedResponse);
93 }
94 }
95 }
96 Ok(seed)
97 }
98
99 pub async fn notarized_upload(&self, notarized: Notarized) -> Result<(), Error> {
100 let result = self
101 .http_client
102 .post(notarization_upload_path(self.uri.clone()))
103 .body(notarized.encode().to_vec())
104 .send()
105 .await
106 .map_err(Error::Reqwest)?;
107 if !result.status().is_success() {
108 return Err(Error::Failed(result.status()));
109 }
110 Ok(())
111 }
112
113 pub async fn notarized_get(&self, query: IndexQuery) -> Result<Notarized, Error> {
114 let result = self
116 .http_client
117 .get(notarization_get_path(self.uri.clone(), &query))
118 .send()
119 .await
120 .map_err(Error::Reqwest)?;
121 if !result.status().is_success() {
122 return Err(Error::Failed(result.status()));
123 }
124 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
125 let notarized = Notarized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
126 if !notarized.verify(&self.certificate_verifier, &self.strategy) {
127 return Err(Error::InvalidSignature);
128 }
129
130 match query {
132 IndexQuery::Latest => {}
133 IndexQuery::Index(index) => {
134 if notarized.proof.view().get() != index {
135 return Err(Error::UnexpectedResponse);
136 }
137 }
138 }
139 Ok(notarized)
140 }
141
142 pub async fn finalized_upload(&self, finalized: Finalized) -> Result<(), Error> {
143 let result = self
144 .http_client
145 .post(finalization_upload_path(self.uri.clone()))
146 .body(finalized.encode().to_vec())
147 .send()
148 .await
149 .map_err(Error::Reqwest)?;
150 if !result.status().is_success() {
151 return Err(Error::Failed(result.status()));
152 }
153 Ok(())
154 }
155
156 pub async fn finalized_get(&self, query: IndexQuery) -> Result<Finalized, Error> {
157 let result = self
159 .http_client
160 .get(finalization_get_path(self.uri.clone(), &query))
161 .send()
162 .await
163 .map_err(Error::Reqwest)?;
164 if !result.status().is_success() {
165 return Err(Error::Failed(result.status()));
166 }
167 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
168 let finalized = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
169 if !finalized.verify(&self.certificate_verifier, &self.strategy) {
170 return Err(Error::InvalidSignature);
171 }
172
173 match query {
175 IndexQuery::Latest => {}
176 IndexQuery::Index(index) => {
177 if finalized.proof.view().get() != index {
178 return Err(Error::UnexpectedResponse);
179 }
180 }
181 }
182 Ok(finalized)
183 }
184
185 pub async fn block_get(&self, query: Query) -> Result<Payload, Error> {
186 let result = self
188 .http_client
189 .get(block_get_path(self.uri.clone(), &query))
190 .send()
191 .await
192 .map_err(Error::Reqwest)?;
193 if !result.status().is_success() {
194 return Err(Error::Failed(result.status()));
195 }
196 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
197
198 let result = match query {
200 Query::Latest => {
201 let result = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
202 if !result.verify(&self.certificate_verifier, &self.strategy) {
203 return Err(Error::InvalidSignature);
204 }
205 Payload::Finalized(Box::new(result))
206 }
207 Query::Index(index) => {
208 let result = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
209 if !result.verify(&self.certificate_verifier, &self.strategy) {
210 return Err(Error::InvalidSignature);
211 }
212 if result.block.height.get() != index {
213 return Err(Error::UnexpectedResponse);
214 }
215 Payload::Finalized(Box::new(result))
216 }
217 Query::Digest(digest) => {
218 let result = Block::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
219 if result.digest() != digest {
220 return Err(Error::UnexpectedResponse);
221 }
222 Payload::Block(Box::new(result))
223 }
224 };
225 Ok(result)
226 }
227
228 pub async fn listen(&self) -> Result<impl Stream<Item = Result<Message, Error>>, Error> {
229 let (stream, _) = connect_async_tls_with_config(
231 listen_path(self.ws_uri.clone()),
232 None,
233 false,
234 Some(self.ws_connector.clone()),
235 )
236 .await
237 .map_err(Error::from)?;
238 let (_, read) = stream.split();
239
240 let (sender, receiver) = unbounded();
242 tokio::spawn({
243 let certificate_verifier = self.certificate_verifier.clone();
244 let strategy = self.strategy.clone();
245 async move {
246 read.for_each(|message| async {
247 match message {
248 Ok(TMessage::Binary(data)) => {
249 let kind = data[0];
251 let Some(kind) = Kind::from_u8(kind) else {
252 let _ = sender.unbounded_send(Err(Error::UnexpectedResponse));
253 return;
254 };
255 let data = &data[1..];
256
257 match kind {
259 Kind::Seed => {
260 let result = Seed::decode(data);
261 match result {
262 Ok(seed) => {
263 if !seed.verify(&certificate_verifier) {
264 let _ = sender
265 .unbounded_send(Err(Error::InvalidSignature));
266 return;
267 }
268 let _ = sender.unbounded_send(Ok(Message::Seed(seed)));
269 }
270 Err(e) => {
271 let _ =
272 sender.unbounded_send(Err(Error::InvalidData(e)));
273 }
274 }
275 }
276 Kind::Notarization => {
277 let result = Notarized::decode(data);
278 match result {
279 Ok(notarized) => {
280 if !notarized.verify(&certificate_verifier, &strategy) {
281 let _ = sender
282 .unbounded_send(Err(Error::InvalidSignature));
283 return;
284 }
285 let _ = sender.unbounded_send(Ok(
286 Message::Notarization(notarized),
287 ));
288 }
289 Err(e) => {
290 let _ =
291 sender.unbounded_send(Err(Error::InvalidData(e)));
292 }
293 }
294 }
295 Kind::Finalization => {
296 let result = Finalized::decode(data);
297 match result {
298 Ok(finalized) => {
299 if !finalized.verify(&certificate_verifier, &strategy) {
300 let _ = sender
301 .unbounded_send(Err(Error::InvalidSignature));
302 return;
303 }
304 let _ = sender.unbounded_send(Ok(
305 Message::Finalization(finalized),
306 ));
307 }
308 Err(e) => {
309 let _ =
310 sender.unbounded_send(Err(Error::InvalidData(e)));
311 }
312 }
313 }
314 }
315 }
316 Ok(_) => {} Err(e) => {
318 let _ = sender.unbounded_send(Err(Error::from(e)));
319 }
320 }
321 })
322 .await;
323 }
324 });
325 Ok(receiver)
326 }
327}