alto_client/
consensus.rs

1use crate::{Client, Error, IndexQuery, Query};
2use alto_types::{Block, Finalized, Kind, Notarized, Seed, NAMESPACE};
3use commonware_codec::{DecodeExt, Encode};
4use commonware_consensus::Viewable;
5use commonware_cryptography::Digestible;
6use futures::{channel::mpsc::unbounded, Stream, StreamExt};
7use tokio_tungstenite::{connect_async, tungstenite::Message as TMessage};
8
/// Returns the endpoint seeds are POSTed to: `base` followed by `/seed`.
fn seed_upload_path(base: String) -> String {
    // Reuse the caller's allocation instead of formatting into a new string.
    let mut path = base;
    path.push_str("/seed");
    path
}
12
13fn seed_get_path(base: String, query: &IndexQuery) -> String {
14    format!("{base}/seed/{}", query.serialize())
15}
16
/// Returns the endpoint notarizations are POSTed to: `base` followed by
/// `/notarization`.
fn notarization_upload_path(base: String) -> String {
    // Reuse the caller's allocation instead of formatting into a new string.
    let mut path = base;
    path.push_str("/notarization");
    path
}
20
21fn notarization_get_path(base: String, query: &IndexQuery) -> String {
22    format!("{base}/notarization/{}", query.serialize())
23}
24
/// Returns the endpoint finalizations are POSTed to: `base` followed by
/// `/finalization`.
fn finalization_upload_path(base: String) -> String {
    // Reuse the caller's allocation instead of formatting into a new string.
    let mut path = base;
    path.push_str("/finalization");
    path
}
28
29fn finalization_get_path(base: String, query: &IndexQuery) -> String {
30    format!("{base}/finalization/{}", query.serialize())
31}
32
33/// There is no block upload path. Blocks are uploaded as a byproduct of notarization
34/// and finalization uploads.
35fn block_get_path(base: String, query: &Query) -> String {
36    format!("{base}/block/{}", query.serialize())
37}
38
/// Returns the websocket endpoint for consensus messages: `base` followed by
/// `/consensus/ws`.
fn listen_path(base: String) -> String {
    // Reuse the caller's allocation instead of formatting into a new string.
    let mut path = base;
    path.push_str("/consensus/ws");
    path
}
42
43pub enum Payload {
44    Finalized(Box<Finalized>),
45    Block(Block),
46}
47
48pub enum Message {
49    Seed(Seed),
50    Notarization(Notarized),
51    Finalization(Finalized),
52}
53
54impl Client {
55    pub async fn seed_upload(&self, seed: Seed) -> Result<(), Error> {
56        let result = self
57            .client
58            .post(seed_upload_path(self.uri.clone()))
59            .body(seed.encode().to_vec())
60            .send()
61            .await
62            .map_err(Error::Reqwest)?;
63        if !result.status().is_success() {
64            return Err(Error::Failed(result.status()));
65        }
66        Ok(())
67    }
68
69    pub async fn seed_get(&self, query: IndexQuery) -> Result<Seed, Error> {
70        // Get the seed
71        let result = self
72            .client
73            .get(seed_get_path(self.uri.clone(), &query))
74            .send()
75            .await
76            .map_err(Error::Reqwest)?;
77        if !result.status().is_success() {
78            return Err(Error::Failed(result.status()));
79        }
80        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
81        let seed = Seed::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
82        if !seed.verify(&self.certificate_verifier, NAMESPACE) {
83            return Err(Error::InvalidSignature);
84        }
85
86        // Verify the seed matches the query
87        match query {
88            IndexQuery::Latest => {}
89            IndexQuery::Index(index) => {
90                if seed.view() != index {
91                    return Err(Error::UnexpectedResponse);
92                }
93            }
94        }
95        Ok(seed)
96    }
97
98    pub async fn notarized_upload(&self, notarized: Notarized) -> Result<(), Error> {
99        let result = self
100            .client
101            .post(notarization_upload_path(self.uri.clone()))
102            .body(notarized.encode().to_vec())
103            .send()
104            .await
105            .map_err(Error::Reqwest)?;
106        if !result.status().is_success() {
107            return Err(Error::Failed(result.status()));
108        }
109        Ok(())
110    }
111
112    pub async fn notarized_get(&self, query: IndexQuery) -> Result<Notarized, Error> {
113        // Get the notarization
114        let result = self
115            .client
116            .get(notarization_get_path(self.uri.clone(), &query))
117            .send()
118            .await
119            .map_err(Error::Reqwest)?;
120        if !result.status().is_success() {
121            return Err(Error::Failed(result.status()));
122        }
123        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
124        let notarized = Notarized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
125        if !notarized.verify(&self.certificate_verifier, NAMESPACE) {
126            return Err(Error::InvalidSignature);
127        }
128
129        // Verify the notarization matches the query
130        match query {
131            IndexQuery::Latest => {}
132            IndexQuery::Index(index) => {
133                if notarized.proof.view() != index {
134                    return Err(Error::UnexpectedResponse);
135                }
136            }
137        }
138        Ok(notarized)
139    }
140
141    pub async fn finalized_upload(&self, finalized: Finalized) -> Result<(), Error> {
142        let result = self
143            .client
144            .post(finalization_upload_path(self.uri.clone()))
145            .body(finalized.encode().to_vec())
146            .send()
147            .await
148            .map_err(Error::Reqwest)?;
149        if !result.status().is_success() {
150            return Err(Error::Failed(result.status()));
151        }
152        Ok(())
153    }
154
155    pub async fn finalized_get(&self, query: IndexQuery) -> Result<Finalized, Error> {
156        // Get the finalization
157        let result = self
158            .client
159            .get(finalization_get_path(self.uri.clone(), &query))
160            .send()
161            .await
162            .map_err(Error::Reqwest)?;
163        if !result.status().is_success() {
164            return Err(Error::Failed(result.status()));
165        }
166        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
167        let finalized = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
168        if !finalized.verify(&self.certificate_verifier, NAMESPACE) {
169            return Err(Error::InvalidSignature);
170        }
171
172        // Verify the finalization matches the query
173        match query {
174            IndexQuery::Latest => {}
175            IndexQuery::Index(index) => {
176                if finalized.proof.view() != index {
177                    return Err(Error::UnexpectedResponse);
178                }
179            }
180        }
181        Ok(finalized)
182    }
183
184    pub async fn block_get(&self, query: Query) -> Result<Payload, Error> {
185        // Get the block
186        let result = self
187            .client
188            .get(block_get_path(self.uri.clone(), &query))
189            .send()
190            .await
191            .map_err(Error::Reqwest)?;
192        if !result.status().is_success() {
193            return Err(Error::Failed(result.status()));
194        }
195        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
196
197        // Verify the block matches the query
198        let result = match query {
199            Query::Latest => {
200                let result = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
201                if !result.verify(&self.certificate_verifier, NAMESPACE) {
202                    return Err(Error::InvalidSignature);
203                }
204                Payload::Finalized(Box::new(result))
205            }
206            Query::Index(index) => {
207                let result = Finalized::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
208                if !result.verify(&self.certificate_verifier, NAMESPACE) {
209                    return Err(Error::InvalidSignature);
210                }
211                if result.block.height != index {
212                    return Err(Error::UnexpectedResponse);
213                }
214                Payload::Finalized(Box::new(result))
215            }
216            Query::Digest(digest) => {
217                let result = Block::decode(bytes.as_ref()).map_err(Error::InvalidData)?;
218                if result.digest() != digest {
219                    return Err(Error::UnexpectedResponse);
220                }
221                Payload::Block(result)
222            }
223        };
224        Ok(result)
225    }
226
227    pub async fn listen(&self) -> Result<impl Stream<Item = Result<Message, Error>>, Error> {
228        // Connect to the websocket endpoint
229        let (stream, _) = connect_async(listen_path(self.ws_uri.clone()))
230            .await
231            .map_err(Error::from)?;
232        let (_, read) = stream.split();
233
234        // Create an unbounded channel for streaming consensus messages
235        let (sender, receiver) = unbounded();
236        tokio::spawn({
237            let certificate_verifier = self.certificate_verifier.clone();
238            async move {
239                read.for_each(|message| async {
240                    match message {
241                        Ok(TMessage::Binary(data)) => {
242                            // Get kind
243                            let kind = data[0];
244                            let Some(kind) = Kind::from_u8(kind) else {
245                                let _ = sender.unbounded_send(Err(Error::UnexpectedResponse));
246                                return;
247                            };
248                            let data = &data[1..];
249
250                            // Deserialize the message
251                            match kind {
252                                Kind::Seed => {
253                                    let result = Seed::decode(data);
254                                    match result {
255                                        Ok(seed) => {
256                                            if !seed.verify(&certificate_verifier, NAMESPACE) {
257                                                let _ = sender
258                                                    .unbounded_send(Err(Error::InvalidSignature));
259                                                return;
260                                            }
261                                            let _ = sender.unbounded_send(Ok(Message::Seed(seed)));
262                                        }
263                                        Err(e) => {
264                                            let _ =
265                                                sender.unbounded_send(Err(Error::InvalidData(e)));
266                                        }
267                                    }
268                                }
269                                Kind::Notarization => {
270                                    let result = Notarized::decode(data);
271                                    match result {
272                                        Ok(notarized) => {
273                                            if !notarized.verify(&certificate_verifier, NAMESPACE) {
274                                                let _ = sender
275                                                    .unbounded_send(Err(Error::InvalidSignature));
276                                                return;
277                                            }
278                                            let _ = sender.unbounded_send(Ok(
279                                                Message::Notarization(notarized),
280                                            ));
281                                        }
282                                        Err(e) => {
283                                            let _ =
284                                                sender.unbounded_send(Err(Error::InvalidData(e)));
285                                        }
286                                    }
287                                }
288                                Kind::Finalization => {
289                                    let result = Finalized::decode(data);
290                                    match result {
291                                        Ok(finalized) => {
292                                            if !finalized.verify(&certificate_verifier, NAMESPACE) {
293                                                let _ = sender
294                                                    .unbounded_send(Err(Error::InvalidSignature));
295                                                return;
296                                            }
297                                            let _ = sender.unbounded_send(Ok(
298                                                Message::Finalization(finalized),
299                                            ));
300                                        }
301                                        Err(e) => {
302                                            let _ =
303                                                sender.unbounded_send(Err(Error::InvalidData(e)));
304                                        }
305                                    }
306                                }
307                            }
308                        }
309                        Ok(_) => {} // Ignore non-binary messages.
310                        Err(e) => {
311                            let _ = sender.unbounded_send(Err(Error::from(e)));
312                        }
313                    }
314                })
315                .await;
316            }
317        });
318        Ok(receiver)
319    }
320}