Skip to main content

alto_client/
consensus.rs

1use crate::{Client, Error, IndexQuery, Query};
2use alto_types::{Block, Finalized, Kind, Notarized, Seed};
3use commonware_codec::{DecodeExt, Encode};
4use commonware_consensus::Viewable;
5use commonware_cryptography::Digestible;
6use commonware_parallel::Strategy;
7use futures::{channel::mpsc::unbounded, Stream, StreamExt};
8use tokio_tungstenite::{connect_async_tls_with_config, tungstenite::Message as TMessage};
9
/// Builds the URL seeds are uploaded to: `{base}/seed`.
fn seed_upload_path(base: String) -> String {
    // `base` is already owned, so append to it instead of formatting a new string.
    base + "/seed"
}
13
14fn seed_get_path(base: String, query: &IndexQuery) -> String {
15    format!("{base}/seed/{}", query.serialize())
16}
17
/// Builds the URL notarizations are uploaded to: `{base}/notarization`.
fn notarization_upload_path(base: String) -> String {
    // `base` is already owned, so append to it instead of formatting a new string.
    base + "/notarization"
}
21
22fn notarization_get_path(base: String, query: &IndexQuery) -> String {
23    format!("{base}/notarization/{}", query.serialize())
24}
25
/// Builds the URL finalizations are uploaded to: `{base}/finalization`.
fn finalization_upload_path(base: String) -> String {
    // `base` is already owned, so append to it instead of formatting a new string.
    base + "/finalization"
}
29
30fn finalization_get_path(base: String, query: &IndexQuery) -> String {
31    format!("{base}/finalization/{}", query.serialize())
32}
33
/// Builds the URL blocks are uploaded to: `{base}/block`.
fn block_upload_path(base: String) -> String {
    // `base` is already owned, so append to it instead of formatting a new string.
    base + "/block"
}
37
38fn block_get_path(base: String, query: &Query) -> String {
39    format!("{base}/block/{}", query.serialize())
40}
41
/// Builds the websocket URL consensus messages are streamed from:
/// `{base}/consensus/ws`.
fn listen_path(base: String) -> String {
    // `base` is already owned, so append to it instead of formatting a new string.
    base + "/consensus/ws"
}
45
46pub enum Payload {
47    Finalized(Box<Finalized>),
48    Block(Box<Block>),
49}
50
51pub enum Message {
52    Seed(Seed),
53    Notarization(Notarized),
54    Finalization(Finalized),
55}
56
57impl<S: Strategy> Client<S> {
58    pub async fn seed_upload(&self, seed: Seed) -> Result<(), Error> {
59        let result = self
60            .http_client
61            .post(seed_upload_path(self.uri.clone()))
62            .body(seed.encode().to_vec())
63            .send()
64            .await
65            .map_err(Error::Reqwest)?;
66        if !result.status().is_success() {
67            return Err(Error::Failed(result.status()));
68        }
69        Ok(())
70    }
71
72    pub async fn seed_get(&self, query: IndexQuery) -> Result<Seed, Error> {
73        // Get the seed
74        let result = self
75            .http_client
76            .get(seed_get_path(self.uri.clone(), &query))
77            .send()
78            .await
79            .map_err(Error::Reqwest)?;
80        if !result.status().is_success() {
81            return Err(Error::Failed(result.status()));
82        }
83        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
84        let seed = Seed::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
85        if self.verify && !seed.verify(&self.certificate_verifier) {
86            return Err(Error::InvalidSignature);
87        }
88
89        // Verify the seed matches the query
90        match query {
91            IndexQuery::Latest => {}
92            IndexQuery::Index(index) => {
93                if seed.view().get() != index {
94                    return Err(Error::UnexpectedResponse);
95                }
96            }
97        }
98        Ok(seed)
99    }
100
101    pub async fn notarized_upload(&self, notarized: Notarized) -> Result<(), Error> {
102        let result = self
103            .http_client
104            .post(notarization_upload_path(self.uri.clone()))
105            .body(notarized.encode().to_vec())
106            .send()
107            .await
108            .map_err(Error::Reqwest)?;
109        if !result.status().is_success() {
110            return Err(Error::Failed(result.status()));
111        }
112        Ok(())
113    }
114
115    pub async fn notarized_get(&self, query: IndexQuery) -> Result<Notarized, Error> {
116        // Get the notarization
117        let result = self
118            .http_client
119            .get(notarization_get_path(self.uri.clone(), &query))
120            .send()
121            .await
122            .map_err(Error::Reqwest)?;
123        if !result.status().is_success() {
124            return Err(Error::Failed(result.status()));
125        }
126        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
127        let notarized = Notarized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
128        if self.verify && !notarized.verify(&self.certificate_verifier, &self.strategy) {
129            return Err(Error::InvalidSignature);
130        }
131
132        // Verify the notarization matches the query
133        match query {
134            IndexQuery::Latest => {}
135            IndexQuery::Index(index) => {
136                if notarized.proof.view().get() != index {
137                    return Err(Error::UnexpectedResponse);
138                }
139            }
140        }
141        Ok(notarized)
142    }
143
144    pub async fn finalized_upload(&self, finalized: Finalized) -> Result<(), Error> {
145        let result = self
146            .http_client
147            .post(finalization_upload_path(self.uri.clone()))
148            .body(finalized.encode().to_vec())
149            .send()
150            .await
151            .map_err(Error::Reqwest)?;
152        if !result.status().is_success() {
153            return Err(Error::Failed(result.status()));
154        }
155        Ok(())
156    }
157
158    pub async fn finalized_get(&self, query: IndexQuery) -> Result<Finalized, Error> {
159        // Get the finalization
160        let result = self
161            .http_client
162            .get(finalization_get_path(self.uri.clone(), &query))
163            .send()
164            .await
165            .map_err(Error::Reqwest)?;
166        if !result.status().is_success() {
167            return Err(Error::Failed(result.status()));
168        }
169        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
170        let finalized = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
171        if self.verify && !finalized.verify(&self.certificate_verifier, &self.strategy) {
172            return Err(Error::InvalidSignature);
173        }
174
175        // Verify the finalization matches the query
176        match query {
177            IndexQuery::Latest => {}
178            IndexQuery::Index(index) => {
179                if finalized.proof.view().get() != index {
180                    return Err(Error::UnexpectedResponse);
181                }
182            }
183        }
184        Ok(finalized)
185    }
186
187    pub async fn block_upload(&self, block: Block) -> Result<(), Error> {
188        // Upload a block (without a certificate)
189        let result = self
190            .http_client
191            .post(block_upload_path(self.uri.clone()))
192            .body(block.encode().to_vec())
193            .send()
194            .await
195            .map_err(Error::Reqwest)?;
196        if !result.status().is_success() {
197            return Err(Error::Failed(result.status()));
198        }
199        Ok(())
200    }
201
202    pub async fn block_get(&self, query: Query) -> Result<Payload, Error> {
203        // Get the block
204        let result = self
205            .http_client
206            .get(block_get_path(self.uri.clone(), &query))
207            .send()
208            .await
209            .map_err(Error::Reqwest)?;
210        if !result.status().is_success() {
211            return Err(Error::Failed(result.status()));
212        }
213        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
214
215        // Verify the block matches the query
216        let result = match query {
217            Query::Latest => {
218                let result = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
219                if self.verify && !result.verify(&self.certificate_verifier, &self.strategy) {
220                    return Err(Error::InvalidSignature);
221                }
222                Payload::Finalized(Box::new(result))
223            }
224            Query::Index(index) => {
225                let result = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
226                if self.verify && !result.verify(&self.certificate_verifier, &self.strategy) {
227                    return Err(Error::InvalidSignature);
228                }
229                if result.block.height.get() != index {
230                    return Err(Error::UnexpectedResponse);
231                }
232                Payload::Finalized(Box::new(result))
233            }
234            Query::Digest(digest) => {
235                let result = Block::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
236                if result.digest() != digest {
237                    return Err(Error::UnexpectedResponse);
238                }
239                Payload::Block(Box::new(result))
240            }
241        };
242        Ok(result)
243    }
244
245    pub async fn listen(&self) -> Result<impl Stream<Item = Result<Message, Error>>, Error> {
246        // Connect to the websocket endpoint
247        let (stream, _) = connect_async_tls_with_config(
248            listen_path(self.ws_uri.clone()),
249            None,
250            false,
251            Some(self.ws_connector.clone()),
252        )
253        .await
254        .map_err(Error::from)?;
255        let (_, read) = stream.split();
256
257        // Create an unbounded channel for streaming consensus messages
258        let (sender, receiver) = unbounded();
259        tokio::spawn({
260            let certificate_verifier = self.certificate_verifier.clone();
261            let strategy = self.strategy.clone();
262            let verify = self.verify;
263            async move {
264                read.for_each(|message| async {
265                    match message {
266                        Ok(TMessage::Binary(data)) => {
267                            // Get kind
268                            let kind = data[0];
269                            let Some(kind) = Kind::from_u8(kind) else {
270                                let _ = sender.unbounded_send(Err(Error::UnexpectedResponse));
271                                return;
272                            };
273                            let data = &data[1..];
274
275                            // Deserialize the message
276                            match kind {
277                                Kind::Seed => {
278                                    let result = Seed::decode(data);
279                                    match result {
280                                        Ok(seed) => {
281                                            if verify && !seed.verify(&certificate_verifier) {
282                                                let _ = sender
283                                                    .unbounded_send(Err(Error::InvalidSignature));
284                                                return;
285                                            }
286                                            let _ = sender.unbounded_send(Ok(Message::Seed(seed)));
287                                        }
288                                        Err(e) => {
289                                            let _ =
290                                                sender.unbounded_send(Err(Error::InvalidData(e)));
291                                        }
292                                    }
293                                }
294                                Kind::Notarization => {
295                                    let result = Notarized::decode(data);
296                                    match result {
297                                        Ok(notarized) => {
298                                            if verify
299                                                && !notarized
300                                                    .verify(&certificate_verifier, &strategy)
301                                            {
302                                                let _ = sender
303                                                    .unbounded_send(Err(Error::InvalidSignature));
304                                                return;
305                                            }
306                                            let _ = sender.unbounded_send(Ok(
307                                                Message::Notarization(notarized),
308                                            ));
309                                        }
310                                        Err(e) => {
311                                            let _ =
312                                                sender.unbounded_send(Err(Error::InvalidData(e)));
313                                        }
314                                    }
315                                }
316                                Kind::Finalization => {
317                                    let result = Finalized::decode(data);
318                                    match result {
319                                        Ok(finalized) => {
320                                            if verify
321                                                && !finalized
322                                                    .verify(&certificate_verifier, &strategy)
323                                            {
324                                                let _ = sender
325                                                    .unbounded_send(Err(Error::InvalidSignature));
326                                                return;
327                                            }
328                                            let _ = sender.unbounded_send(Ok(
329                                                Message::Finalization(finalized),
330                                            ));
331                                        }
332                                        Err(e) => {
333                                            let _ =
334                                                sender.unbounded_send(Err(Error::InvalidData(e)));
335                                        }
336                                    }
337                                }
338                            }
339                        }
340                        Ok(_) => {} // Ignore non-binary messages.
341                        Err(e) => {
342                            let _ = sender.unbounded_send(Err(Error::from(e)));
343                        }
344                    }
345                })
346                .await;
347            }
348        });
349        Ok(receiver)
350    }
351}