1use crate::{Client, Error, IndexQuery, Query};
2use alto_types::{Block, Finalized, Kind, Notarized, Seed, NAMESPACE};
3use commonware_codec::{DecodeExt, Encode};
4use commonware_consensus::Viewable;
5use commonware_cryptography::Digestible;
6use futures::{channel::mpsc::unbounded, Stream, StreamExt};
7use tokio_tungstenite::{connect_async, tungstenite::Message as TMessage};
8
/// Builds the URL that seeds are uploaded to: `{base}/seed`.
fn seed_upload_path(base: String) -> String {
    base + "/seed"
}
12
13fn seed_get_path(base: String, query: &IndexQuery) -> String {
14 format!("{base}/seed/{}", query.serialize())
15}
16
/// Builds the URL that notarizations are uploaded to: `{base}/notarization`.
fn notarization_upload_path(base: String) -> String {
    base + "/notarization"
}
20
21fn notarization_get_path(base: String, query: &IndexQuery) -> String {
22 format!("{base}/notarization/{}", query.serialize())
23}
24
/// Builds the URL that finalizations are uploaded to: `{base}/finalization`.
fn finalization_upload_path(base: String) -> String {
    base + "/finalization"
}
28
29fn finalization_get_path(base: String, query: &IndexQuery) -> String {
30 format!("{base}/finalization/{}", query.serialize())
31}
32
33fn block_get_path(base: String, query: &Query) -> String {
36 format!("{base}/block/{}", query.serialize())
37}
38
/// Builds the WebSocket URL used by `Client::listen`: `{base}/consensus/ws`.
fn listen_path(base: String) -> String {
    base + "/consensus/ws"
}
42
/// Value returned by `Client::block_get`; which variant is produced depends on
/// the kind of query that was issued.
pub enum Payload {
    /// A finalized block, returned for `Query::Latest` and `Query::Index` once
    /// its certificate has been verified.
    // NOTE(review): boxed presumably to keep the enum small — confirm.
    Finalized(Box<Finalized>),
    /// A bare block, returned for `Query::Digest` once its digest has been
    /// compared against the requested one.
    Block(Block),
}
47
/// Item yielded (wrapped in `Result`) by the stream returned from
/// `Client::listen`. A variant is only emitted after the payload has been
/// decoded and its `verify` check has passed.
pub enum Message {
    /// A verified seed.
    Seed(Seed),
    /// A verified notarization.
    Notarization(Notarized),
    /// A verified finalization.
    Finalization(Finalized),
}
53
54impl Client {
55 pub async fn seed_upload(&self, seed: Seed) -> Result<(), Error> {
56 let result = self
57 .client
58 .post(seed_upload_path(self.uri.clone()))
59 .body(seed.encode().to_vec())
60 .send()
61 .await
62 .map_err(Error::Reqwest)?;
63 if !result.status().is_success() {
64 return Err(Error::Failed(result.status()));
65 }
66 Ok(())
67 }
68
69 pub async fn seed_get(&self, query: IndexQuery) -> Result<Seed, Error> {
70 let result = self
72 .client
73 .get(seed_get_path(self.uri.clone(), &query))
74 .send()
75 .await
76 .map_err(Error::Reqwest)?;
77 if !result.status().is_success() {
78 return Err(Error::Failed(result.status()));
79 }
80 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
81 let seed = Seed::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
82 if !seed.verify(&self.certificate_verifier, NAMESPACE) {
83 return Err(Error::InvalidSignature);
84 }
85
86 match query {
88 IndexQuery::Latest => {}
89 IndexQuery::Index(index) => {
90 if seed.view() != index {
91 return Err(Error::UnexpectedResponse);
92 }
93 }
94 }
95 Ok(seed)
96 }
97
98 pub async fn notarized_upload(&self, notarized: Notarized) -> Result<(), Error> {
99 let result = self
100 .client
101 .post(notarization_upload_path(self.uri.clone()))
102 .body(notarized.encode().to_vec())
103 .send()
104 .await
105 .map_err(Error::Reqwest)?;
106 if !result.status().is_success() {
107 return Err(Error::Failed(result.status()));
108 }
109 Ok(())
110 }
111
112 pub async fn notarized_get(&self, query: IndexQuery) -> Result<Notarized, Error> {
113 let result = self
115 .client
116 .get(notarization_get_path(self.uri.clone(), &query))
117 .send()
118 .await
119 .map_err(Error::Reqwest)?;
120 if !result.status().is_success() {
121 return Err(Error::Failed(result.status()));
122 }
123 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
124 let notarized = Notarized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
125 if !notarized.verify(&self.certificate_verifier, NAMESPACE) {
126 return Err(Error::InvalidSignature);
127 }
128
129 match query {
131 IndexQuery::Latest => {}
132 IndexQuery::Index(index) => {
133 if notarized.proof.view() != index {
134 return Err(Error::UnexpectedResponse);
135 }
136 }
137 }
138 Ok(notarized)
139 }
140
141 pub async fn finalized_upload(&self, finalized: Finalized) -> Result<(), Error> {
142 let result = self
143 .client
144 .post(finalization_upload_path(self.uri.clone()))
145 .body(finalized.encode().to_vec())
146 .send()
147 .await
148 .map_err(Error::Reqwest)?;
149 if !result.status().is_success() {
150 return Err(Error::Failed(result.status()));
151 }
152 Ok(())
153 }
154
155 pub async fn finalized_get(&self, query: IndexQuery) -> Result<Finalized, Error> {
156 let result = self
158 .client
159 .get(finalization_get_path(self.uri.clone(), &query))
160 .send()
161 .await
162 .map_err(Error::Reqwest)?;
163 if !result.status().is_success() {
164 return Err(Error::Failed(result.status()));
165 }
166 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
167 let finalized = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
168 if !finalized.verify(&self.certificate_verifier, NAMESPACE) {
169 return Err(Error::InvalidSignature);
170 }
171
172 match query {
174 IndexQuery::Latest => {}
175 IndexQuery::Index(index) => {
176 if finalized.proof.view() != index {
177 return Err(Error::UnexpectedResponse);
178 }
179 }
180 }
181 Ok(finalized)
182 }
183
184 pub async fn block_get(&self, query: Query) -> Result<Payload, Error> {
185 let result = self
187 .client
188 .get(block_get_path(self.uri.clone(), &query))
189 .send()
190 .await
191 .map_err(Error::Reqwest)?;
192 if !result.status().is_success() {
193 return Err(Error::Failed(result.status()));
194 }
195 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
196
197 let result = match query {
199 Query::Latest => {
200 let result = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
201 if !result.verify(&self.certificate_verifier, NAMESPACE) {
202 return Err(Error::InvalidSignature);
203 }
204 Payload::Finalized(Box::new(result))
205 }
206 Query::Index(index) => {
207 let result = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
208 if !result.verify(&self.certificate_verifier, NAMESPACE) {
209 return Err(Error::InvalidSignature);
210 }
211 if result.block.height != index {
212 return Err(Error::UnexpectedResponse);
213 }
214 Payload::Finalized(Box::new(result))
215 }
216 Query::Digest(digest) => {
217 let result = Block::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
218 if result.digest() != digest {
219 return Err(Error::UnexpectedResponse);
220 }
221 Payload::Block(result)
222 }
223 };
224 Ok(result)
225 }
226
227 pub async fn listen(&self) -> Result<impl Stream<Item = Result<Message, Error>>, Error> {
228 let (stream, _) = connect_async(listen_path(self.ws_uri.clone()))
230 .await
231 .map_err(Error::from)?;
232 let (_, read) = stream.split();
233
234 let (sender, receiver) = unbounded();
236 tokio::spawn({
237 let certificate_verifier = self.certificate_verifier.clone();
238 async move {
239 read.for_each(|message| async {
240 match message {
241 Ok(TMessage::Binary(data)) => {
242 let kind = data[0];
244 let Some(kind) = Kind::from_u8(kind) else {
245 let _ = sender.unbounded_send(Err(Error::UnexpectedResponse));
246 return;
247 };
248 let data = &data[1..];
249
250 match kind {
252 Kind::Seed => {
253 let result = Seed::decode(data);
254 match result {
255 Ok(seed) => {
256 if !seed.verify(&certificate_verifier, NAMESPACE) {
257 let _ = sender
258 .unbounded_send(Err(Error::InvalidSignature));
259 return;
260 }
261 let _ = sender.unbounded_send(Ok(Message::Seed(seed)));
262 }
263 Err(e) => {
264 let _ =
265 sender.unbounded_send(Err(Error::InvalidData(e)));
266 }
267 }
268 }
269 Kind::Notarization => {
270 let result = Notarized::decode(data);
271 match result {
272 Ok(notarized) => {
273 if !notarized.verify(&certificate_verifier, NAMESPACE) {
274 let _ = sender
275 .unbounded_send(Err(Error::InvalidSignature));
276 return;
277 }
278 let _ = sender.unbounded_send(Ok(
279 Message::Notarization(notarized),
280 ));
281 }
282 Err(e) => {
283 let _ =
284 sender.unbounded_send(Err(Error::InvalidData(e)));
285 }
286 }
287 }
288 Kind::Finalization => {
289 let result = Finalized::decode(data);
290 match result {
291 Ok(finalized) => {
292 if !finalized.verify(&certificate_verifier, NAMESPACE) {
293 let _ = sender
294 .unbounded_send(Err(Error::InvalidSignature));
295 return;
296 }
297 let _ = sender.unbounded_send(Ok(
298 Message::Finalization(finalized),
299 ));
300 }
301 Err(e) => {
302 let _ =
303 sender.unbounded_send(Err(Error::InvalidData(e)));
304 }
305 }
306 }
307 }
308 }
309 Ok(_) => {} Err(e) => {
311 let _ = sender.unbounded_send(Err(Error::from(e)));
312 }
313 }
314 })
315 .await;
316 }
317 });
318 Ok(receiver)
319 }
320}