1use crate::{Client, Error, IndexQuery, Query};
2use alto_types::{Block, Finalized, Kind, Notarized, Seed};
3use bytes::Bytes;
4use futures::{channel::mpsc::unbounded, Stream, StreamExt};
5use tokio_tungstenite::{connect_async, tungstenite::Message as TMessage};
6
7fn seed_upload_path(base: String) -> String {
8 format!("{}/seed", base)
9}
10
11fn seed_get_path(base: String, query: &IndexQuery) -> String {
12 format!("{}/seed/{}", base, query.serialize())
13}
14
15fn notarization_upload_path(base: String) -> String {
16 format!("{}/notarization", base)
17}
18
19fn notarization_get_path(base: String, query: &IndexQuery) -> String {
20 format!("{}/notarization/{}", base, query.serialize())
21}
22
23fn finalization_upload_path(base: String) -> String {
24 format!("{}/finalization", base)
25}
26
27fn finalization_get_path(base: String, query: &IndexQuery) -> String {
28 format!("{}/finalization/{}", base, query.serialize())
29}
30
31fn block_get_path(base: String, query: &Query) -> String {
34 format!("{}/block/{}", base, query.serialize())
35}
36
37fn listen_path(base: String) -> String {
38 format!("{}/consensus/ws", base)
39}
40
41pub enum Payload {
42 Finalized(Box<Finalized>),
43 Block(Block),
44}
45
46pub enum Message {
47 Seed(Seed),
48 Notarization(Notarized),
49 Finalization(Finalized),
50}
51
52impl Client {
53 pub async fn seed_upload(&self, seed: Bytes) -> Result<(), Error> {
54 let result = self
55 .client
56 .post(seed_upload_path(self.uri.clone()))
57 .body(seed)
58 .send()
59 .await
60 .map_err(Error::Reqwest)?;
61 if !result.status().is_success() {
62 return Err(Error::Failed(result.status()));
63 }
64 Ok(())
65 }
66
67 pub async fn seed_get(&self, query: IndexQuery) -> Result<Seed, Error> {
68 let result = self
70 .client
71 .get(seed_get_path(self.uri.clone(), &query))
72 .send()
73 .await
74 .map_err(Error::Reqwest)?;
75 if !result.status().is_success() {
76 return Err(Error::Failed(result.status()));
77 }
78 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
79 let result = Seed::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
80
81 match query {
83 IndexQuery::Latest => {}
84 IndexQuery::Index(index) => {
85 if result.view != index {
86 return Err(Error::InvalidData);
87 }
88 }
89 }
90 Ok(result)
91 }
92
93 pub async fn notarization_upload(&self, notarized: Bytes) -> Result<(), Error> {
94 let result = self
95 .client
96 .post(notarization_upload_path(self.uri.clone()))
97 .body(notarized)
98 .send()
99 .await
100 .map_err(Error::Reqwest)?;
101 if !result.status().is_success() {
102 return Err(Error::Failed(result.status()));
103 }
104 Ok(())
105 }
106
107 pub async fn notarization_get(&self, query: IndexQuery) -> Result<Notarized, Error> {
108 let result = self
110 .client
111 .get(notarization_get_path(self.uri.clone(), &query))
112 .send()
113 .await
114 .map_err(Error::Reqwest)?;
115 if !result.status().is_success() {
116 return Err(Error::Failed(result.status()));
117 }
118 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
119 let result =
120 Notarized::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
121
122 match query {
124 IndexQuery::Latest => {}
125 IndexQuery::Index(index) => {
126 if result.proof.view != index {
127 return Err(Error::InvalidData);
128 }
129 }
130 }
131 Ok(result)
132 }
133
134 pub async fn finalization_upload(&self, finalized: Bytes) -> Result<(), Error> {
135 let result = self
136 .client
137 .post(finalization_upload_path(self.uri.clone()))
138 .body(finalized)
139 .send()
140 .await
141 .map_err(Error::Reqwest)?;
142 if !result.status().is_success() {
143 return Err(Error::Failed(result.status()));
144 }
145 Ok(())
146 }
147
148 pub async fn finalization_get(&self, query: IndexQuery) -> Result<Finalized, Error> {
149 let result = self
151 .client
152 .get(finalization_get_path(self.uri.clone(), &query))
153 .send()
154 .await
155 .map_err(Error::Reqwest)?;
156 if !result.status().is_success() {
157 return Err(Error::Failed(result.status()));
158 }
159 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
160 let result =
161 Finalized::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
162
163 match query {
165 IndexQuery::Latest => {}
166 IndexQuery::Index(index) => {
167 if result.proof.view != index {
168 return Err(Error::InvalidData);
169 }
170 }
171 }
172 Ok(result)
173 }
174
175 pub async fn block_get(&self, query: Query) -> Result<Payload, Error> {
176 let result = self
178 .client
179 .get(block_get_path(self.uri.clone(), &query))
180 .send()
181 .await
182 .map_err(Error::Reqwest)?;
183 if !result.status().is_success() {
184 return Err(Error::Failed(result.status()));
185 }
186 let bytes = result.bytes().await.map_err(Error::Reqwest)?;
187
188 let result = match query {
190 Query::Latest => {
191 let result =
192 Finalized::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
193 Payload::Finalized(Box::new(result))
194 }
195 Query::Index(index) => {
196 let result =
197 Finalized::deserialize(Some(&self.public), &bytes).ok_or(Error::InvalidData)?;
198 if result.block.height != index {
199 return Err(Error::InvalidData);
200 }
201 Payload::Finalized(Box::new(result))
202 }
203 Query::Digest(digest) => {
204 let result = Block::deserialize(&bytes).ok_or(Error::InvalidData)?;
205 if result.digest() != digest {
206 return Err(Error::InvalidData);
207 }
208 Payload::Block(result)
209 }
210 };
211 Ok(result)
212 }
213
214 pub async fn listen(&self) -> Result<impl Stream<Item = Result<Message, Error>>, Error> {
215 let (stream, _) = connect_async(listen_path(self.ws_uri.clone()))
217 .await
218 .map_err(Error::from)?;
219 let (_, read) = stream.split();
220
221 let public = self.public.clone();
223 let (sender, receiver) = unbounded();
224 tokio::spawn(async move {
225 read.for_each(|message| async {
226 match message {
227 Ok(TMessage::Binary(data)) => {
228 let kind = data[0];
230 let Some(kind) = Kind::from_u8(kind) else {
231 let _ = sender.unbounded_send(Err(Error::InvalidData));
232 return;
233 };
234 let data = &data[1..];
235
236 match kind {
238 Kind::Seed => {
239 if let Some(seed) = Seed::deserialize(Some(&public), data) {
240 let _ = sender.unbounded_send(Ok(Message::Seed(seed)));
241 } else {
242 let _ = sender.unbounded_send(Err(Error::InvalidData));
243 }
244 }
245 Kind::Notarization => {
246 if let Some(payload) = Notarized::deserialize(Some(&public), data) {
247 let _ =
248 sender.unbounded_send(Ok(Message::Notarization(payload)));
249 } else {
250 let _ = sender.unbounded_send(Err(Error::InvalidData));
251 }
252 }
253 Kind::Nullification => {} Kind::Finalization => {
255 if let Some(payload) = Finalized::deserialize(Some(&public), data) {
256 let _ =
257 sender.unbounded_send(Ok(Message::Finalization(payload)));
258 } else {
259 let _ = sender.unbounded_send(Err(Error::InvalidData));
260 }
261 }
262 }
263 }
264 Ok(_) => {} Err(e) => {
266 let _ = sender.unbounded_send(Err(Error::from(e)));
267 }
268 }
269 })
270 .await;
271 });
272 Ok(receiver)
273 }
274}