alto_client/
consensus.rs

1use crate::{Client, Error, IndexQuery, Query};
2use alto_types::{
3    Block, Finalization, Finalized, Kind, Notarization, Notarized, Nullification, Seed,
4};
5use futures::{channel::mpsc::unbounded, Stream, StreamExt};
6use tokio_tungstenite::{connect_async, tungstenite::Message as TMessage};
7
/// Returns the seed upload endpoint, `<base>/seed`.
fn seed_upload_path(base: String) -> String {
    base + "/seed"
}
11
12fn seed_get_path(base: String, query: &IndexQuery) -> String {
13    format!("{}/seed/{}", base, query.serialize())
14}
15
/// Returns the nullification upload endpoint, `<base>/nullification`.
fn nullification_upload_path(base: String) -> String {
    base + "/nullification"
}
19
20fn nullification_get_path(base: String, query: &IndexQuery) -> String {
21    format!("{}/nullification/{}", base, query.serialize())
22}
23
/// Returns the notarization upload endpoint, `<base>/notarization`.
fn notarization_upload_path(base: String) -> String {
    base + "/notarization"
}
27
28fn notarization_get_path(base: String, query: &IndexQuery) -> String {
29    format!("{}/notarization/{}", base, query.serialize())
30}
31
/// Returns the finalization upload endpoint, `<base>/finalization`.
fn finalization_upload_path(base: String) -> String {
    base + "/finalization"
}
35
36fn finalization_get_path(base: String, query: &IndexQuery) -> String {
37    format!("{}/finalization/{}", base, query.serialize())
38}
39
40/// There is no block upload path. Blocks are uploaded as a byproduct of notarization
41/// and finalization uploads.
42fn block_get_path(base: String, query: &Query) -> String {
43    format!("{}/block/{}", base, query.serialize())
44}
45
/// Returns the consensus websocket endpoint, `<base>/consensus/ws`.
fn register_path(base: String) -> String {
    base + "/consensus/ws"
}
49
/// Result of [`Client::block_get`]; the variant depends on the kind of query issued.
pub enum Payload {
    /// Returned for `Query::Latest` and `Query::Index` lookups. Boxed so the
    /// enum does not carry the full `Finalized` value inline.
    Finalized(Box<Finalized>),
    /// Returned for `Query::Digest` lookups: the bare block, with no proof attached.
    Block(Block),
}
54
/// A decoded consensus message yielded by the stream returned from [`Client::register`].
///
/// Each variant corresponds to one `Kind` tag read from the first byte of a
/// binary websocket frame.
pub enum Message {
    /// Frame tagged `Kind::Seed`.
    Seed(Seed),
    /// Frame tagged `Kind::Nullification`.
    Nullification(Nullification),
    /// Frame tagged `Kind::Notarization`; decoded as a `Notarized` value.
    Notarization(Notarized),
    /// Frame tagged `Kind::Finalization`; decoded as a `Finalized` value.
    Finalization(Finalized),
}
61
62impl Client {
63    pub async fn seed_upload(&self, seed: Seed) -> Result<(), Error> {
64        let request = seed.serialize();
65        let client = reqwest::Client::new();
66        let result = client
67            .post(seed_upload_path(self.uri.clone()))
68            .body(request)
69            .send()
70            .await
71            .map_err(Error::Reqwest)?;
72        if !result.status().is_success() {
73            return Err(Error::Failed(result.status()));
74        }
75        Ok(())
76    }
77
78    pub async fn seed_get(&self, query: IndexQuery) -> Result<Seed, Error> {
79        // Get the seed
80        let client = reqwest::Client::new();
81        let result = client
82            .get(seed_get_path(self.uri.clone(), &query))
83            .send()
84            .await
85            .map_err(Error::Reqwest)?;
86        if !result.status().is_success() {
87            return Err(Error::Failed(result.status()));
88        }
89        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
90        let result = Seed::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
91
92        // Verify the seed matches the query
93        match query {
94            IndexQuery::Latest => {}
95            IndexQuery::Index(index) => {
96                if result.view != index {
97                    return Err(Error::InvalidData);
98                }
99            }
100        }
101        Ok(result)
102    }
103
104    pub async fn nullification_upload(&self, nullification: Nullification) -> Result<(), Error> {
105        let request = nullification.serialize();
106        let client = reqwest::Client::new();
107        let result = client
108            .post(nullification_upload_path(self.uri.clone()))
109            .body(request)
110            .send()
111            .await
112            .map_err(Error::Reqwest)?;
113        if !result.status().is_success() {
114            return Err(Error::Failed(result.status()));
115        }
116        Ok(())
117    }
118
119    pub async fn nullification_get(&self, query: IndexQuery) -> Result<Nullification, Error> {
120        // Get the nullification
121        let client = reqwest::Client::new();
122        let result = client
123            .get(nullification_get_path(self.uri.clone(), &query))
124            .send()
125            .await
126            .map_err(Error::Reqwest)?;
127        if !result.status().is_success() {
128            return Err(Error::Failed(result.status()));
129        }
130        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
131        let result =
132            Nullification::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
133
134        // Verify the nullification matches the query
135        match query {
136            IndexQuery::Latest => {}
137            IndexQuery::Index(index) => {
138                if result.view != index {
139                    return Err(Error::InvalidData);
140                }
141            }
142        }
143        Ok(result)
144    }
145
146    pub async fn notarization_upload(
147        &self,
148        proof: Notarization,
149        block: Block,
150    ) -> Result<(), Error> {
151        let request = Notarized::new(proof, block).serialize();
152        let client = reqwest::Client::new();
153        let result = client
154            .post(notarization_upload_path(self.uri.clone()))
155            .body(request)
156            .send()
157            .await
158            .map_err(Error::Reqwest)?;
159        if !result.status().is_success() {
160            return Err(Error::Failed(result.status()));
161        }
162        Ok(())
163    }
164
165    pub async fn notarization_get(&self, query: IndexQuery) -> Result<Notarized, Error> {
166        // Get the notarization
167        let client = reqwest::Client::new();
168        let result = client
169            .get(notarization_get_path(self.uri.clone(), &query))
170            .send()
171            .await
172            .map_err(Error::Reqwest)?;
173        if !result.status().is_success() {
174            return Err(Error::Failed(result.status()));
175        }
176        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
177        let result =
178            Notarized::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
179
180        // Verify the notarization matches the query
181        match query {
182            IndexQuery::Latest => {}
183            IndexQuery::Index(index) => {
184                if result.proof.view != index {
185                    return Err(Error::InvalidData);
186                }
187            }
188        }
189        Ok(result)
190    }
191
192    pub async fn finalization_upload(
193        &self,
194        proof: Finalization,
195        block: Block,
196    ) -> Result<(), Error> {
197        let request = Finalized::new(proof, block).serialize();
198        let client = reqwest::Client::new();
199        let result = client
200            .post(finalization_upload_path(self.uri.clone()))
201            .body(request)
202            .send()
203            .await
204            .map_err(Error::Reqwest)?;
205        if !result.status().is_success() {
206            return Err(Error::Failed(result.status()));
207        }
208        Ok(())
209    }
210
211    pub async fn finalization_get(&self, query: IndexQuery) -> Result<Finalized, Error> {
212        // Get the finalization
213        let client = reqwest::Client::new();
214        let result = client
215            .get(finalization_get_path(self.uri.clone(), &query))
216            .send()
217            .await
218            .map_err(Error::Reqwest)?;
219        if !result.status().is_success() {
220            return Err(Error::Failed(result.status()));
221        }
222        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
223        let result =
224            Finalized::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
225
226        // Verify the finalization matches the query
227        match query {
228            IndexQuery::Latest => {}
229            IndexQuery::Index(index) => {
230                if result.proof.view != index {
231                    return Err(Error::InvalidData);
232                }
233            }
234        }
235        Ok(result)
236    }
237
238    pub async fn block_get(&self, query: Query) -> Result<Payload, Error> {
239        // Get the block
240        let client = reqwest::Client::new();
241        let result = client
242            .get(block_get_path(self.uri.clone(), &query))
243            .send()
244            .await
245            .map_err(Error::Reqwest)?;
246        if !result.status().is_success() {
247            return Err(Error::Failed(result.status()));
248        }
249        let bytes = result.bytes().await.map_err(Error::Reqwest)?;
250
251        // Verify the block matches the query
252        let result = match query {
253            Query::Latest => {
254                let result =
255                    Finalized::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
256                Payload::Finalized(Box::new(result))
257            }
258            Query::Index(index) => {
259                let result =
260                    Finalized::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
261                if result.block.height != index {
262                    return Err(Error::InvalidData);
263                }
264                Payload::Finalized(Box::new(result))
265            }
266            Query::Digest(digest) => {
267                let result = Block::deserialize(&bytes).ok_or(Error::InvalidData)?;
268                if result.digest() != digest {
269                    return Err(Error::InvalidData);
270                }
271                Payload::Block(result)
272            }
273        };
274        Ok(result)
275    }
276
277    pub async fn register(&self) -> Result<impl Stream<Item = Result<Message, Error>>, Error> {
278        // Connect to the websocket endpoint
279        let (stream, _) = connect_async(register_path(self.ws_uri.clone()))
280            .await
281            .map_err(Error::from)?;
282        let (_, read) = stream.split();
283
284        // Create an unbounded channel for streaming consensus messages
285        let public = self.public.clone();
286        let (sender, receiver) = unbounded();
287        tokio::spawn(async move {
288            read.for_each(|message| async {
289                match message {
290                    Ok(TMessage::Binary(data)) => {
291                        // Get kind
292                        let kind = data[0];
293                        let Some(kind) = Kind::from_u8(kind) else {
294                            let _ = sender.unbounded_send(Err(Error::InvalidData));
295                            return;
296                        };
297                        let data = &data[1..];
298
299                        // Deserialize the message
300                        match kind {
301                            Kind::Seed => {
302                                if let Some(seed) = Seed::deserialize(Some(&public), data) {
303                                    let _ = sender.unbounded_send(Ok(Message::Seed(seed)));
304                                } else {
305                                    let _ = sender.unbounded_send(Err(Error::InvalidData));
306                                }
307                            }
308                            Kind::Notarization => {
309                                if let Some(payload) = Notarized::deserialize(Some(&public), data) {
310                                    let _ =
311                                        sender.unbounded_send(Ok(Message::Notarization(payload)));
312                                } else {
313                                    let _ = sender.unbounded_send(Err(Error::InvalidData));
314                                }
315                            }
316                            Kind::Nullification => {
317                                if let Some(nullification) =
318                                    Nullification::deserialize(Some(&public), data)
319                                {
320                                    let _ = sender
321                                        .unbounded_send(Ok(Message::Nullification(nullification)));
322                                } else {
323                                    let _ = sender.unbounded_send(Err(Error::InvalidData));
324                                }
325                            }
326                            Kind::Finalization => {
327                                if let Some(payload) = Finalized::deserialize(Some(&public), data) {
328                                    let _ =
329                                        sender.unbounded_send(Ok(Message::Finalization(payload)));
330                                } else {
331                                    let _ = sender.unbounded_send(Err(Error::InvalidData));
332                                }
333                            }
334                        }
335                    }
336                    Ok(_) => {} // Ignore non-binary messages.
337                    Err(e) => {
338                        let _ = sender.unbounded_send(Err(Error::from(e)));
339                    }
340                }
341            })
342            .await;
343        });
344        Ok(receiver)
345    }
346}