1use crate::{Client, Error, IndexQuery, Query};
2use alto_types::{
3 Block, Finalization, Finalized, Kind, Notarization, Notarized, Nullification, Seed,
4};
5use futures::{channel::mpsc::unbounded, Stream, StreamExt};
6use tokio_tungstenite::{connect_async, tungstenite::Message as TMessage};
7
8fn seed_upload_path(base: String) -> String {
9 format!("{}/seed", base)
10}
11
12fn seed_get_path(base: String, query: &IndexQuery) -> String {
13 format!("{}/seed/{}", base, query.serialize())
14}
15
16fn nullification_upload_path(base: String) -> String {
17 format!("{}/nullification", base)
18}
19
20fn nullification_get_path(base: String, query: &IndexQuery) -> String {
21 format!("{}/nullification/{}", base, query.serialize())
22}
23
24fn notarization_upload_path(base: String) -> String {
25 format!("{}/notarization", base)
26}
27
28fn notarization_get_path(base: String, query: &IndexQuery) -> String {
29 format!("{}/notarization/{}", base, query.serialize())
30}
31
32fn finalization_upload_path(base: String) -> String {
33 format!("{}/finalization", base)
34}
35
36fn finalization_get_path(base: String, query: &IndexQuery) -> String {
37 format!("{}/finalization/{}", base, query.serialize())
38}
39
40fn block_get_path(base: String, query: &Query) -> String {
43 format!("{}/block/{}", base, query.serialize())
44}
45
46fn register_path(base: String) -> String {
47 format!("{}/consensus/ws", base)
48}
49
50pub enum Payload {
51 Finalized(Box<Finalized>),
52 Block(Block),
53}
54
55pub enum Message {
56 Seed(Seed),
57 Nullification(Nullification),
58 Notarization(Notarized),
59 Finalization(Finalized),
60}
61
62impl Client {
63 pub async fn seed_upload(&self, seed: Seed) -> Result<(), Error> {
64 let request = seed.serialize();
65 let client = reqwest::Client::new();
66 let result = client
67 .post(seed_upload_path(self.uri.clone()))
68 .body(request)
69 .send()
70 .await
71 .map_err(Error::Reqwest)?;
72 if !result.status().is_success() {
73 return Err(Error::Failed(result.status()));
74 }
75 Ok(())
76 }
77
78 pub async fn seed_get(&self, query: IndexQuery) -> Result<Seed, Error> {
79 let client = reqwest::Client::new();
81 let result = client
82 .get(seed_get_path(self.uri.clone(), &query))
83 .send()
84 .await
85 .map_err(Error::Reqwest)?;
86 if !result.status().is_success() {
87 return Err(Error::Failed(result.status()));
88 }
89 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
90 let result = Seed::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
91
92 match query {
94 IndexQuery::Latest => {}
95 IndexQuery::Index(index) => {
96 if result.view != index {
97 return Err(Error::InvalidData);
98 }
99 }
100 }
101 Ok(result)
102 }
103
104 pub async fn nullification_upload(&self, nullification: Nullification) -> Result<(), Error> {
105 let request = nullification.serialize();
106 let client = reqwest::Client::new();
107 let result = client
108 .post(nullification_upload_path(self.uri.clone()))
109 .body(request)
110 .send()
111 .await
112 .map_err(Error::Reqwest)?;
113 if !result.status().is_success() {
114 return Err(Error::Failed(result.status()));
115 }
116 Ok(())
117 }
118
119 pub async fn nullification_get(&self, query: IndexQuery) -> Result<Nullification, Error> {
120 let client = reqwest::Client::new();
122 let result = client
123 .get(nullification_get_path(self.uri.clone(), &query))
124 .send()
125 .await
126 .map_err(Error::Reqwest)?;
127 if !result.status().is_success() {
128 return Err(Error::Failed(result.status()));
129 }
130 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
131 let result =
132 Nullification::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
133
134 match query {
136 IndexQuery::Latest => {}
137 IndexQuery::Index(index) => {
138 if result.view != index {
139 return Err(Error::InvalidData);
140 }
141 }
142 }
143 Ok(result)
144 }
145
146 pub async fn notarization_upload(
147 &self,
148 proof: Notarization,
149 block: Block,
150 ) -> Result<(), Error> {
151 let request = Notarized::new(proof, block).serialize();
152 let client = reqwest::Client::new();
153 let result = client
154 .post(notarization_upload_path(self.uri.clone()))
155 .body(request)
156 .send()
157 .await
158 .map_err(Error::Reqwest)?;
159 if !result.status().is_success() {
160 return Err(Error::Failed(result.status()));
161 }
162 Ok(())
163 }
164
165 pub async fn notarization_get(&self, query: IndexQuery) -> Result<Notarized, Error> {
166 let client = reqwest::Client::new();
168 let result = client
169 .get(notarization_get_path(self.uri.clone(), &query))
170 .send()
171 .await
172 .map_err(Error::Reqwest)?;
173 if !result.status().is_success() {
174 return Err(Error::Failed(result.status()));
175 }
176 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
177 let result =
178 Notarized::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
179
180 match query {
182 IndexQuery::Latest => {}
183 IndexQuery::Index(index) => {
184 if result.proof.view != index {
185 return Err(Error::InvalidData);
186 }
187 }
188 }
189 Ok(result)
190 }
191
192 pub async fn finalization_upload(
193 &self,
194 proof: Finalization,
195 block: Block,
196 ) -> Result<(), Error> {
197 let request = Finalized::new(proof, block).serialize();
198 let client = reqwest::Client::new();
199 let result = client
200 .post(finalization_upload_path(self.uri.clone()))
201 .body(request)
202 .send()
203 .await
204 .map_err(Error::Reqwest)?;
205 if !result.status().is_success() {
206 return Err(Error::Failed(result.status()));
207 }
208 Ok(())
209 }
210
211 pub async fn finalization_get(&self, query: IndexQuery) -> Result<Finalized, Error> {
212 let client = reqwest::Client::new();
214 let result = client
215 .get(finalization_get_path(self.uri.clone(), &query))
216 .send()
217 .await
218 .map_err(Error::Reqwest)?;
219 if !result.status().is_success() {
220 return Err(Error::Failed(result.status()));
221 }
222 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
223 let result =
224 Finalized::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
225
226 match query {
228 IndexQuery::Latest => {}
229 IndexQuery::Index(index) => {
230 if result.proof.view != index {
231 return Err(Error::InvalidData);
232 }
233 }
234 }
235 Ok(result)
236 }
237
238 pub async fn block_get(&self, query: Query) -> Result<Payload, Error> {
239 let client = reqwest::Client::new();
241 let result = client
242 .get(block_get_path(self.uri.clone(), &query))
243 .send()
244 .await
245 .map_err(Error::Reqwest)?;
246 if !result.status().is_success() {
247 return Err(Error::Failed(result.status()));
248 }
249 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
250
251 let result = match query {
253 Query::Latest => {
254 let result =
255 Finalized::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
256 Payload::Finalized(Box::new(result))
257 }
258 Query::Index(index) => {
259 let result =
260 Finalized::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
261 if result.block.height != index {
262 return Err(Error::InvalidData);
263 }
264 Payload::Finalized(Box::new(result))
265 }
266 Query::Digest(digest) => {
267 let result = Block::deserialize(&bytes).ok_or(Error::InvalidData)?;
268 if result.digest() != digest {
269 return Err(Error::InvalidData);
270 }
271 Payload::Block(result)
272 }
273 };
274 Ok(result)
275 }
276
277 pub async fn register(&self) -> Result<impl Stream<Item = Result<Message, Error>>, Error> {
278 let (stream, _) = connect_async(register_path(self.ws_uri.clone()))
280 .await
281 .map_err(Error::from)?;
282 let (_, read) = stream.split();
283
284 let public = self.public.clone();
286 let (sender, receiver) = unbounded();
287 tokio::spawn(async move {
288 read.for_each(|message| async {
289 match message {
290 Ok(TMessage::Binary(data)) => {
291 let kind = data[0];
293 let Some(kind) = Kind::from_u8(kind) else {
294 let _ = sender.unbounded_send(Err(Error::InvalidData));
295 return;
296 };
297 let data = &data[1..];
298
299 match kind {
301 Kind::Seed => {
302 if let Some(seed) = Seed::deserialize(Some(&public), data) {
303 let _ = sender.unbounded_send(Ok(Message::Seed(seed)));
304 } else {
305 let _ = sender.unbounded_send(Err(Error::InvalidData));
306 }
307 }
308 Kind::Notarization => {
309 if let Some(payload) = Notarized::deserialize(Some(&public), data) {
310 let _ =
311 sender.unbounded_send(Ok(Message::Notarization(payload)));
312 } else {
313 let _ = sender.unbounded_send(Err(Error::InvalidData));
314 }
315 }
316 Kind::Nullification => {
317 if let Some(nullification) =
318 Nullification::deserialize(Some(&public), data)
319 {
320 let _ = sender
321 .unbounded_send(Ok(Message::Nullification(nullification)));
322 } else {
323 let _ = sender.unbounded_send(Err(Error::InvalidData));
324 }
325 }
326 Kind::Finalization => {
327 if let Some(payload) = Finalized::deserialize(Some(&public), data) {
328 let _ =
329 sender.unbounded_send(Ok(Message::Finalization(payload)));
330 } else {
331 let _ = sender.unbounded_send(Err(Error::InvalidData));
332 }
333 }
334 }
335 }
336 Ok(_) => {} Err(e) => {
338 let _ = sender.unbounded_send(Err(Error::from(e)));
339 }
340 }
341 })
342 .await;
343 });
344 Ok(receiver)
345 }
346}