alto_client/
consensus.rs

1use crate::{Client, Error, IndexQuery, Query};
2use alto_types::{Block, Finalized, Kind, Notarized, Seed};
3use bytes::Bytes;
4use futures::{channel::mpsc::unbounded, Stream, StreamExt};
5use tokio_tungstenite::{connect_async, tungstenite::Message as TMessage};
6
/// Returns the seed upload endpoint, `{base}/seed`.
fn seed_upload_path(base: String) -> String {
    base + "/seed"
}
10
11fn seed_get_path(base: String, query: &IndexQuery) -> String {
12    format!("{}/seed/{}", base, query.serialize())
13}
14
/// Returns the notarization upload endpoint, `{base}/notarization`.
fn notarization_upload_path(base: String) -> String {
    base + "/notarization"
}
18
19fn notarization_get_path(base: String, query: &IndexQuery) -> String {
20    format!("{}/notarization/{}", base, query.serialize())
21}
22
/// Returns the finalization upload endpoint, `{base}/finalization`.
fn finalization_upload_path(base: String) -> String {
    base + "/finalization"
}
26
27fn finalization_get_path(base: String, query: &IndexQuery) -> String {
28    format!("{}/finalization/{}", base, query.serialize())
29}
30
31/// There is no block upload path. Blocks are uploaded as a byproduct of notarization
32/// and finalization uploads.
33fn block_get_path(base: String, query: &Query) -> String {
34    format!("{}/block/{}", base, query.serialize())
35}
36
/// Returns the consensus websocket endpoint, `{base}/consensus/ws`.
fn listen_path(base: String) -> String {
    base + "/consensus/ws"
}
40
/// Value returned by `Client::block_get`; the variant depends on the kind of
/// query that was issued.
pub enum Payload {
    /// Returned for `Query::Latest` and `Query::Index` lookups.
    // NOTE(review): presumably boxed to keep the enum small — confirm.
    Finalized(Box<Finalized>),
    /// Returned for `Query::Digest` lookups.
    Block(Block),
}
45
/// Consensus message yielded by the stream returned from `Client::listen`.
///
/// Nullifications have no variant here because `Client::listen` ignores them.
pub enum Message {
    /// A deserialized seed.
    Seed(Seed),
    /// A deserialized notarization.
    Notarization(Notarized),
    /// A deserialized finalization.
    Finalization(Finalized),
}
51
52impl Client {
53    pub async fn seed_upload(&self, seed: Bytes) -> Result<(), Error> {
54        let result = self
55            .client
56            .post(seed_upload_path(self.uri.clone()))
57            .body(seed)
58            .send()
59            .await
60            .map_err(Error::Reqwest)?;
61        if !result.status().is_success() {
62            return Err(Error::Failed(result.status()));
63        }
64        Ok(())
65    }
66
67    pub async fn seed_get(&self, query: IndexQuery) -> Result<Seed, Error> {
68        // Get the seed
69        let result = self
70            .client
71            .get(seed_get_path(self.uri.clone(), &query))
72            .send()
73            .await
74            .map_err(Error::Reqwest)?;
75        if !result.status().is_success() {
76            return Err(Error::Failed(result.status()));
77        }
78        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
79        let result = Seed::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
80
81        // Verify the seed matches the query
82        match query {
83            IndexQuery::Latest => {}
84            IndexQuery::Index(index) => {
85                if result.view != index {
86                    return Err(Error::InvalidData);
87                }
88            }
89        }
90        Ok(result)
91    }
92
93    pub async fn notarization_upload(&self, notarized: Bytes) -> Result<(), Error> {
94        let result = self
95            .client
96            .post(notarization_upload_path(self.uri.clone()))
97            .body(notarized)
98            .send()
99            .await
100            .map_err(Error::Reqwest)?;
101        if !result.status().is_success() {
102            return Err(Error::Failed(result.status()));
103        }
104        Ok(())
105    }
106
107    pub async fn notarization_get(&self, query: IndexQuery) -> Result<Notarized, Error> {
108        // Get the notarization
109        let result = self
110            .client
111            .get(notarization_get_path(self.uri.clone(), &query))
112            .send()
113            .await
114            .map_err(Error::Reqwest)?;
115        if !result.status().is_success() {
116            return Err(Error::Failed(result.status()));
117        }
118        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
119        let result =
120            Notarized::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
121
122        // Verify the notarization matches the query
123        match query {
124            IndexQuery::Latest => {}
125            IndexQuery::Index(index) => {
126                if result.proof.view != index {
127                    return Err(Error::InvalidData);
128                }
129            }
130        }
131        Ok(result)
132    }
133
134    pub async fn finalization_upload(&self, finalized: Bytes) -> Result<(), Error> {
135        let result = self
136            .client
137            .post(finalization_upload_path(self.uri.clone()))
138            .body(finalized)
139            .send()
140            .await
141            .map_err(Error::Reqwest)?;
142        if !result.status().is_success() {
143            return Err(Error::Failed(result.status()));
144        }
145        Ok(())
146    }
147
148    pub async fn finalization_get(&self, query: IndexQuery) -> Result<Finalized, Error> {
149        // Get the finalization
150        let result = self
151            .client
152            .get(finalization_get_path(self.uri.clone(), &query))
153            .send()
154            .await
155            .map_err(Error::Reqwest)?;
156        if !result.status().is_success() {
157            return Err(Error::Failed(result.status()));
158        }
159        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
160        let result =
161            Finalized::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
162
163        // Verify the finalization matches the query
164        match query {
165            IndexQuery::Latest => {}
166            IndexQuery::Index(index) => {
167                if result.proof.view != index {
168                    return Err(Error::InvalidData);
169                }
170            }
171        }
172        Ok(result)
173    }
174
175    pub async fn block_get(&self, query: Query) -> Result<Payload, Error> {
176        // Get the block
177        let result = self
178            .client
179            .get(block_get_path(self.uri.clone(), &query))
180            .send()
181            .await
182            .map_err(Error::Reqwest)?;
183        if !result.status().is_success() {
184            return Err(Error::Failed(result.status()));
185        }
186        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
187
188        // Verify the block matches the query
189        let result = match query {
190            Query::Latest => {
191                let result =
192                    Finalized::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
193                Payload::Finalized(Box::new(result))
194            }
195            Query::Index(index) => {
196                let result =
197                    Finalized::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
198                if result.block.height != index {
199                    return Err(Error::InvalidData);
200                }
201                Payload::Finalized(Box::new(result))
202            }
203            Query::Digest(digest) => {
204                let result = Block::deserialize(&bytes).ok_or(Error::InvalidData)?;
205                if result.digest() != digest {
206                    return Err(Error::InvalidData);
207                }
208                Payload::Block(result)
209            }
210        };
211        Ok(result)
212    }
213
214    pub async fn listen(&self) -> Result<impl Stream<Item = Result<Message, Error>>, Error> {
215        // Connect to the websocket endpoint
216        let (stream, _) = connect_async(listen_path(self.ws_uri.clone()))
217            .await
218            .map_err(Error::from)?;
219        let (_, read) = stream.split();
220
221        // Create an unbounded channel for streaming consensus messages
222        let public = self.public.clone();
223        let (sender, receiver) = unbounded();
224        tokio::spawn(async move {
225            read.for_each(|message| async {
226                match message {
227                    Ok(TMessage::Binary(data)) => {
228                        // Get kind
229                        let kind = data[0];
230                        let Some(kind) = Kind::from_u8(kind) else {
231                            let _ = sender.unbounded_send(Err(Error::InvalidData));
232                            return;
233                        };
234                        let data = &data[1..];
235
236                        // Deserialize the message
237                        match kind {
238                            Kind::Seed => {
239                                if let Some(seed) = Seed::deserialize(Some(&public), data) {
240                                    let _ = sender.unbounded_send(Ok(Message::Seed(seed)));
241                                } else {
242                                    let _ = sender.unbounded_send(Err(Error::InvalidData));
243                                }
244                            }
245                            Kind::Notarization => {
246                                if let Some(payload) = Notarized::deserialize(Some(&public), data) {
247                                    let _ =
248                                        sender.unbounded_send(Ok(Message::Notarization(payload)));
249                                } else {
250                                    let _ = sender.unbounded_send(Err(Error::InvalidData));
251                                }
252                            }
253                            Kind::Nullification => {} // Ignore nullifications
254                            Kind::Finalization => {
255                                if let Some(payload) = Finalized::deserialize(Some(&public), data) {
256                                    let _ =
257                                        sender.unbounded_send(Ok(Message::Finalization(payload)));
258                                } else {
259                                    let _ = sender.unbounded_send(Err(Error::InvalidData));
260                                }
261                            }
262                        }
263                    }
264                    Ok(_) => {} // Ignore non-binary messages.
265                    Err(e) => {
266                        let _ = sender.unbounded_send(Err(Error::from(e)));
267                    }
268                }
269            })
270            .await;
271        });
272        Ok(receiver)
273    }
274}