Skip to main content

alto_client/
consensus.rs

1use crate::{Client, Error, IndexQuery, Query};
2use alto_types::{Block, Finalized, Kind, Notarized, Seed};
3use commonware_codec::{DecodeExt, Encode};
4use commonware_consensus::Viewable;
5use commonware_cryptography::Digestible;
6use commonware_parallel::Strategy;
7use futures::{channel::mpsc::unbounded, Stream, StreamExt};
8use tokio_tungstenite::{connect_async_tls_with_config, tungstenite::Message as TMessage};
9
/// Returns the URL seeds are uploaded to: `{base}/seed`.
fn seed_upload_path(base: String) -> String {
    base + "/seed"
}
13
14fn seed_get_path(base: String, query: &IndexQuery) -> String {
15    format!("{base}/seed/{}", query.serialize())
16}
17
/// Returns the URL notarizations are uploaded to: `{base}/notarization`.
fn notarization_upload_path(base: String) -> String {
    base + "/notarization"
}
21
22fn notarization_get_path(base: String, query: &IndexQuery) -> String {
23    format!("{base}/notarization/{}", query.serialize())
24}
25
/// Returns the URL finalizations are uploaded to: `{base}/finalization`.
fn finalization_upload_path(base: String) -> String {
    base + "/finalization"
}
29
30fn finalization_get_path(base: String, query: &IndexQuery) -> String {
31    format!("{base}/finalization/{}", query.serialize())
32}
33
34/// There is no block upload path. Blocks are uploaded as a byproduct of notarization
35/// and finalization uploads.
36fn block_get_path(base: String, query: &Query) -> String {
37    format!("{base}/block/{}", query.serialize())
38}
39
/// Returns the websocket URL consensus messages are streamed from:
/// `{base}/consensus/ws`.
fn listen_path(base: String) -> String {
    base + "/consensus/ws"
}
43
44pub enum Payload {
45    Finalized(Box<Finalized>),
46    Block(Box<Block>),
47}
48
49pub enum Message {
50    Seed(Seed),
51    Notarization(Notarized),
52    Finalization(Finalized),
53}
54
55impl<S: Strategy> Client<S> {
56    pub async fn seed_upload(&self, seed: Seed) -> Result<(), Error> {
57        let result = self
58            .http_client
59            .post(seed_upload_path(self.uri.clone()))
60            .body(seed.encode().to_vec())
61            .send()
62            .await
63            .map_err(Error::Reqwest)?;
64        if !result.status().is_success() {
65            return Err(Error::Failed(result.status()));
66        }
67        Ok(())
68    }
69
70    pub async fn seed_get(&self, query: IndexQuery) -> Result<Seed, Error> {
71        // Get the seed
72        let result = self
73            .http_client
74            .get(seed_get_path(self.uri.clone(), &query))
75            .send()
76            .await
77            .map_err(Error::Reqwest)?;
78        if !result.status().is_success() {
79            return Err(Error::Failed(result.status()));
80        }
81        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
82        let seed = Seed::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
83        if !seed.verify(&self.certificate_verifier) {
84            return Err(Error::InvalidSignature);
85        }
86
87        // Verify the seed matches the query
88        match query {
89            IndexQuery::Latest => {}
90            IndexQuery::Index(index) => {
91                if seed.view().get() != index {
92                    return Err(Error::UnexpectedResponse);
93                }
94            }
95        }
96        Ok(seed)
97    }
98
99    pub async fn notarized_upload(&self, notarized: Notarized) -> Result<(), Error> {
100        let result = self
101            .http_client
102            .post(notarization_upload_path(self.uri.clone()))
103            .body(notarized.encode().to_vec())
104            .send()
105            .await
106            .map_err(Error::Reqwest)?;
107        if !result.status().is_success() {
108            return Err(Error::Failed(result.status()));
109        }
110        Ok(())
111    }
112
113    pub async fn notarized_get(&self, query: IndexQuery) -> Result<Notarized, Error> {
114        // Get the notarization
115        let result = self
116            .http_client
117            .get(notarization_get_path(self.uri.clone(), &query))
118            .send()
119            .await
120            .map_err(Error::Reqwest)?;
121        if !result.status().is_success() {
122            return Err(Error::Failed(result.status()));
123        }
124        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
125        let notarized = Notarized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
126        if !notarized.verify(&self.certificate_verifier, &self.strategy) {
127            return Err(Error::InvalidSignature);
128        }
129
130        // Verify the notarization matches the query
131        match query {
132            IndexQuery::Latest => {}
133            IndexQuery::Index(index) => {
134                if notarized.proof.view().get() != index {
135                    return Err(Error::UnexpectedResponse);
136                }
137            }
138        }
139        Ok(notarized)
140    }
141
142    pub async fn finalized_upload(&self, finalized: Finalized) -> Result<(), Error> {
143        let result = self
144            .http_client
145            .post(finalization_upload_path(self.uri.clone()))
146            .body(finalized.encode().to_vec())
147            .send()
148            .await
149            .map_err(Error::Reqwest)?;
150        if !result.status().is_success() {
151            return Err(Error::Failed(result.status()));
152        }
153        Ok(())
154    }
155
156    pub async fn finalized_get(&self, query: IndexQuery) -> Result<Finalized, Error> {
157        // Get the finalization
158        let result = self
159            .http_client
160            .get(finalization_get_path(self.uri.clone(), &query))
161            .send()
162            .await
163            .map_err(Error::Reqwest)?;
164        if !result.status().is_success() {
165            return Err(Error::Failed(result.status()));
166        }
167        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
168        let finalized = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
169        if !finalized.verify(&self.certificate_verifier, &self.strategy) {
170            return Err(Error::InvalidSignature);
171        }
172
173        // Verify the finalization matches the query
174        match query {
175            IndexQuery::Latest => {}
176            IndexQuery::Index(index) => {
177                if finalized.proof.view().get() != index {
178                    return Err(Error::UnexpectedResponse);
179                }
180            }
181        }
182        Ok(finalized)
183    }
184
185    pub async fn block_get(&self, query: Query) -> Result<Payload, Error> {
186        // Get the block
187        let result = self
188            .http_client
189            .get(block_get_path(self.uri.clone(), &query))
190            .send()
191            .await
192            .map_err(Error::Reqwest)?;
193        if !result.status().is_success() {
194            return Err(Error::Failed(result.status()));
195        }
196        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
197
198        // Verify the block matches the query
199        let result = match query {
200            Query::Latest => {
201                let result = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
202                if !result.verify(&self.certificate_verifier, &self.strategy) {
203                    return Err(Error::InvalidSignature);
204                }
205                Payload::Finalized(Box::new(result))
206            }
207            Query::Index(index) => {
208                let result = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
209                if !result.verify(&self.certificate_verifier, &self.strategy) {
210                    return Err(Error::InvalidSignature);
211                }
212                if result.block.height.get() != index {
213                    return Err(Error::UnexpectedResponse);
214                }
215                Payload::Finalized(Box::new(result))
216            }
217            Query::Digest(digest) => {
218                let result = Block::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
219                if result.digest() != digest {
220                    return Err(Error::UnexpectedResponse);
221                }
222                Payload::Block(Box::new(result))
223            }
224        };
225        Ok(result)
226    }
227
228    pub async fn listen(&self) -> Result<impl Stream<Item = Result<Message, Error>>, Error> {
229        // Connect to the websocket endpoint
230        let (stream, _) = connect_async_tls_with_config(
231            listen_path(self.ws_uri.clone()),
232            None,
233            false,
234            Some(self.ws_connector.clone()),
235        )
236        .await
237        .map_err(Error::from)?;
238        let (_, read) = stream.split();
239
240        // Create an unbounded channel for streaming consensus messages
241        let (sender, receiver) = unbounded();
242        tokio::spawn({
243            let certificate_verifier = self.certificate_verifier.clone();
244            let strategy = self.strategy.clone();
245            async move {
246                read.for_each(|message| async {
247                    match message {
248                        Ok(TMessage::Binary(data)) => {
249                            // Get kind
250                            let kind = data[0];
251                            let Some(kind) = Kind::from_u8(kind) else {
252                                let _ = sender.unbounded_send(Err(Error::UnexpectedResponse));
253                                return;
254                            };
255                            let data = &data[1..];
256
257                            // Deserialize the message
258                            match kind {
259                                Kind::Seed => {
260                                    let result = Seed::decode(data);
261                                    match result {
262                                        Ok(seed) => {
263                                            if !seed.verify(&certificate_verifier) {
264                                                let _ = sender
265                                                    .unbounded_send(Err(Error::InvalidSignature));
266                                                return;
267                                            }
268                                            let _ = sender.unbounded_send(Ok(Message::Seed(seed)));
269                                        }
270                                        Err(e) => {
271                                            let _ =
272                                                sender.unbounded_send(Err(Error::InvalidData(e)));
273                                        }
274                                    }
275                                }
276                                Kind::Notarization => {
277                                    let result = Notarized::decode(data);
278                                    match result {
279                                        Ok(notarized) => {
280                                            if !notarized.verify(&certificate_verifier, &strategy) {
281                                                let _ = sender
282                                                    .unbounded_send(Err(Error::InvalidSignature));
283                                                return;
284                                            }
285                                            let _ = sender.unbounded_send(Ok(
286                                                Message::Notarization(notarized),
287                                            ));
288                                        }
289                                        Err(e) => {
290                                            let _ =
291                                                sender.unbounded_send(Err(Error::InvalidData(e)));
292                                        }
293                                    }
294                                }
295                                Kind::Finalization => {
296                                    let result = Finalized::decode(data);
297                                    match result {
298                                        Ok(finalized) => {
299                                            if !finalized.verify(&certificate_verifier, &strategy) {
300                                                let _ = sender
301                                                    .unbounded_send(Err(Error::InvalidSignature));
302                                                return;
303                                            }
304                                            let _ = sender.unbounded_send(Ok(
305                                                Message::Finalization(finalized),
306                                            ));
307                                        }
308                                        Err(e) => {
309                                            let _ =
310                                                sender.unbounded_send(Err(Error::InvalidData(e)));
311                                        }
312                                    }
313                                }
314                            }
315                        }
316                        Ok(_) => {} // Ignore non-binary messages.
317                        Err(e) => {
318                            let _ = sender.unbounded_send(Err(Error::from(e)));
319                        }
320                    }
321                })
322                .await;
323            }
324        });
325        Ok(receiver)
326    }
327}