1#![feature(let_chains)]
4pub use acuity_index_substrate::shared::{
5 Bytes32, Event, EventMeta, PalletMeta, Span, SubstrateKey,
6};
7use futures_util::{SinkExt, StreamExt};
8use serde::{Deserialize, Serialize};
9use thiserror::Error;
10use tokio::net::TcpStream;
11use tokio_tungstenite::{
12 connect_async, tungstenite, tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream,
13};
14
15#[cfg(test)]
16use tokio::net::TcpListener;
17
18#[derive(Error, Debug)]
20pub enum IndexError {
21 #[error("connection error")]
22 Websocket(#[from] tungstenite::Error),
23 #[error("decoding error")]
24 SerdeJson(#[from] serde_json::Error),
25 #[error("no message")]
26 NoMessage,
27}
28
29pub struct Index {
31 ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
32}
33
34impl Index {
35 async fn send_recv(&mut self, send_msg: RequestMessage) -> Result<ResponseMessage, IndexError> {
37 self.ws_stream
38 .send(Message::Text(serde_json::to_string(&send_msg)?))
39 .await?;
40 let msg = self.ws_stream.next().await.ok_or(IndexError::NoMessage)??;
41 Ok(serde_json::from_str(msg.to_text()?)?)
42 }
43
44 pub async fn connect(url: String) -> Result<Self, IndexError> {
46 let (ws_stream, _) = connect_async(url).await?;
47 let index = Index { ws_stream };
48 Ok(index)
49 }
50
51 pub async fn status(&mut self) -> Result<Vec<Span>, IndexError> {
53 match self.send_recv(RequestMessage::Status).await? {
54 ResponseMessage::Status(spans) => Ok(spans),
55 _ => Err(IndexError::NoMessage),
56 }
57 }
58
59 pub async fn subscribe_status(
61 &mut self,
62 ) -> Result<impl futures_util::Stream<Item = Result<Vec<Span>, IndexError>> + '_, IndexError>
63 {
64 if self.send_recv(RequestMessage::SubscribeStatus).await? != ResponseMessage::Subscribed {
65 return Err(IndexError::NoMessage);
66 };
67
68 Ok(self.ws_stream.by_ref().map(|msg| {
70 let response: ResponseMessage = serde_json::from_str(msg?.to_text()?)?;
71
72 match response {
73 ResponseMessage::Status(spans) => Ok(spans),
74 _ => Err(IndexError::NoMessage),
75 }
76 }))
77 }
78
79 pub async fn unsubscribe_status(&mut self) -> Result<(), IndexError> {
81 match self.send_recv(RequestMessage::UnsubscribeStatus).await? {
82 ResponseMessage::Unsubscribed => Ok(()),
83 _ => Err(IndexError::NoMessage),
84 }
85 }
86
87 pub async fn size_on_disk(&mut self) -> Result<u64, IndexError> {
89 match self.send_recv(RequestMessage::SizeOnDisk).await? {
90 ResponseMessage::SizeOnDisk(size) => Ok(size),
91 _ => Err(IndexError::NoMessage),
92 }
93 }
94
95 pub async fn get_variants(&mut self) -> Result<Vec<PalletMeta>, IndexError> {
97 match self.send_recv(RequestMessage::Variants).await? {
98 ResponseMessage::Variants(pallet_meta) => Ok(pallet_meta),
99 _ => Err(IndexError::NoMessage),
100 }
101 }
102
103 pub async fn get_events(&mut self, key: Key) -> Result<Vec<Event>, IndexError> {
105 match self.send_recv(RequestMessage::GetEvents { key }).await? {
106 ResponseMessage::Events { events, .. } => Ok(events),
107 _ => Err(IndexError::NoMessage),
108 }
109 }
110
111 pub async fn subscribe_events(
113 &mut self,
114 key: Key,
115 ) -> Result<impl futures_util::Stream<Item = Result<Vec<Event>, IndexError>> + '_, IndexError>
116 {
117 if self
118 .send_recv(RequestMessage::SubscribeEvents { key: key.clone() })
119 .await?
120 != ResponseMessage::Subscribed
121 {
122 return Err(IndexError::NoMessage);
123 };
124
125 Ok(self.ws_stream.by_ref().map(move |msg| {
127 let response: ResponseMessage = serde_json::from_str(msg?.to_text()?)?;
128
129 match response {
130 ResponseMessage::Events {
131 key: response_key,
132 events,
133 } => Ok(if response_key != key { vec![] } else { events }),
134 _ => Err(IndexError::NoMessage),
135 }
136 }))
137 }
138
139 pub async fn unsubscribe_events(&mut self, key: Key) -> Result<(), IndexError> {
141 match self
142 .send_recv(RequestMessage::UnsubscribeEvents { key })
143 .await?
144 {
145 ResponseMessage::Unsubscribed => Ok(()),
146 _ => Err(IndexError::NoMessage),
147 }
148 }
149}
150
151#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Hash)]
153#[serde(tag = "type", content = "value")]
154pub enum Key {
155 Variant(u8, u8),
156 Substrate(SubstrateKey),
157}
158
159#[derive(Serialize, Deserialize, Debug, Clone)]
161#[serde(tag = "type")]
162pub enum RequestMessage {
163 Status,
164 SubscribeStatus,
165 UnsubscribeStatus,
166 Variants,
167 GetEvents { key: Key },
168 SubscribeEvents { key: Key },
169 UnsubscribeEvents { key: Key },
170 SizeOnDisk,
171}
172
173#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
175#[serde(tag = "type", content = "data")]
176#[serde(rename_all = "camelCase")]
177pub enum ResponseMessage {
178 Status(Vec<Span>),
179 Variants(Vec<PalletMeta>),
180 Events { key: Key, events: Vec<Event> },
181 Subscribed,
182 Unsubscribed,
183 SizeOnDisk(u64),
184 Error,
185}
186
#[cfg(test)]
impl Index {
    /// Starts the in-process mock server on a local port and connects to it.
    ///
    /// The listener is bound to port 0 so the OS picks a free port, which
    /// lets tests run side by side. The spawned server task handles exactly
    /// one connection.
    pub async fn test_connect() -> Result<Self, IndexError> {
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("Failed to bind");
        let url = format!("ws://{}", listener.local_addr().unwrap());

        tokio::spawn(handle_connection(listener));

        Self::connect(url).await
    }
}
202
/// Mock index server used by the tests.
///
/// Accepts one connection, reads one request and answers it with canned
/// data. For the two subscribe requests it first acknowledges with
/// `Subscribed`, pushes a fixed series of notifications, then waits for the
/// matching unsubscribe request before sending the final reply.
#[cfg(test)]
async fn handle_connection(listener: TcpListener) {
    type Outgoing = futures_util::stream::SplitSink<WebSocketStream<TcpStream>, Message>;
    type Incoming = futures_util::stream::SplitStream<WebSocketStream<TcpStream>>;

    /// Serializes `response` and sends it as a text frame.
    async fn push(sink: &mut Outgoing, response: &ResponseMessage) {
        let json = serde_json::to_string(response).unwrap();
        sink.send(Message::Text(json)).await.unwrap();
    }

    /// Reads the next frame and parses it as a request.
    async fn pull(source: &mut Incoming) -> RequestMessage {
        let frame = source.next().await.unwrap().unwrap();
        serde_json::from_str(frame.to_text().unwrap()).unwrap()
    }

    let (raw_stream, addr) = listener.accept().await.unwrap();
    println!("Incoming TCP connection from: {}", addr);

    let ws_stream = tokio_tungstenite::accept_async(raw_stream)
        .await
        .expect("Error during the websocket handshake occurred");
    println!("WebSocket connection established: {}", addr);

    let (mut outgoing, mut incoming) = ws_stream.split();

    // Canned fixtures; only the end of the last span varies between updates.
    let status_until = |end| {
        ResponseMessage::Status(vec![
            Span { start: 2, end: 4 },
            Span { start: 9, end: 23 },
            Span { start: 20002, end },
        ])
    };
    let event = |block_number, event_index| Event {
        block_number,
        event_index,
    };

    let reply = match pull(&mut incoming).await {
        RequestMessage::Status => status_until(400000),
        RequestMessage::SubscribeStatus => {
            push(&mut outgoing, &ResponseMessage::Subscribed).await;
            for end in [400000, 400008, 400028] {
                push(&mut outgoing, &status_until(end)).await;
            }

            match pull(&mut incoming).await {
                RequestMessage::UnsubscribeStatus => ResponseMessage::Unsubscribed,
                _ => ResponseMessage::Error,
            }
        }
        RequestMessage::Variants => ResponseMessage::Variants(vec![PalletMeta {
            index: 0,
            name: "test1".to_string(),
            events: vec![EventMeta {
                index: 0,
                name: "event1".to_string(),
            }],
        }]),
        RequestMessage::GetEvents { .. } => ResponseMessage::Events {
            key: Key::Variant(0, 0),
            events: vec![event(82, 16), event(86, 17)],
        },
        RequestMessage::SubscribeEvents { .. } => {
            push(&mut outgoing, &ResponseMessage::Subscribed).await;

            // The second notification deliberately carries a different key.
            let notifications = [
                (Key::Variant(0, 0), vec![event(82, 16), event(86, 17)]),
                (Key::Variant(0, 1), vec![event(102, 12)]),
                (Key::Variant(0, 0), vec![event(102, 12)]),
                (Key::Variant(0, 0), vec![event(108, 0)]),
            ];
            for (key, events) in notifications {
                push(&mut outgoing, &ResponseMessage::Events { key, events }).await;
            }

            match pull(&mut incoming).await {
                RequestMessage::UnsubscribeEvents { .. } => ResponseMessage::Unsubscribed,
                _ => ResponseMessage::Error,
            }
        }
        RequestMessage::SizeOnDisk => ResponseMessage::SizeOnDisk(640),
        _ => ResponseMessage::Error,
    };

    push(&mut outgoing, &reply).await;
}
390
/// Round-trip tests that run the client against the in-file mock server.
#[cfg(test)]
mod tests {
    use super::*;

    /// `status` returns the spans of the canned `Status` reply.
    #[tokio::test]
    async fn test_status() {
        let mut index = Index::test_connect().await.unwrap();

        let expected = vec![
            Span { start: 2, end: 4 },
            Span { start: 9, end: 23 },
            Span {
                start: 20002,
                end: 400000,
            },
        ];

        assert_eq!(index.status().await.unwrap(), expected);
    }

    /// The status subscription yields the three canned updates in order and
    /// can then be cancelled.
    #[tokio::test]
    async fn test_subscribe_status() {
        let mut index = Index::test_connect().await.unwrap();
        let mut stream = index.subscribe_status().await.unwrap();

        // Only the end of the last span changes from update to update.
        let spans_until = |end| {
            vec![
                Span { start: 2, end: 4 },
                Span { start: 9, end: 23 },
                Span { start: 20002, end },
            ]
        };

        for end in [400000, 400008, 400028] {
            let status = stream.next().await.unwrap().unwrap();
            assert_eq!(status, spans_until(end));
        }

        // The stream borrows `index`; release it before the next request.
        drop(stream);
        index.unsubscribe_status().await.unwrap();
    }

    /// `get_variants` returns the canned pallet metadata.
    #[tokio::test]
    async fn test_variants() {
        let mut index = Index::test_connect().await.unwrap();

        let expected = vec![PalletMeta {
            index: 0,
            name: "test1".to_string(),
            events: vec![EventMeta {
                index: 0,
                name: "event1".to_string(),
            }],
        }];

        assert_eq!(index.get_variants().await.unwrap(), expected);
    }

    /// `get_events` returns the events of the canned `Events` reply.
    #[tokio::test]
    async fn test_get_events() {
        let mut index = Index::test_connect().await.unwrap();

        let expected = vec![
            Event {
                block_number: 82,
                event_index: 16,
            },
            Event {
                block_number: 86,
                event_index: 17,
            },
        ];

        assert_eq!(
            index.get_events(Key::Variant(0, 0)).await.unwrap(),
            expected
        );
    }

    /// The event subscription yields one batch per notification; the
    /// notification sent for a different key comes back as an empty batch.
    #[tokio::test]
    async fn test_subscribe_events() {
        let mut index = Index::test_connect().await.unwrap();
        let mut stream = index.subscribe_events(Key::Variant(0, 0)).await.unwrap();

        let event = |block_number, event_index| Event {
            block_number,
            event_index,
        };
        let expected_batches = [
            vec![event(82, 16), event(86, 17)],
            vec![],
            vec![event(102, 12)],
            vec![event(108, 0)],
        ];

        for expected in expected_batches {
            let events = stream.next().await.unwrap().unwrap();
            assert_eq!(events, expected);
        }

        // The stream borrows `index`; release it before the next request.
        drop(stream);
        index.unsubscribe_events(Key::Variant(0, 0)).await.unwrap();
    }

    /// `size_on_disk` returns the value of the canned `SizeOnDisk` reply.
    #[tokio::test]
    async fn test_size_on_disk() {
        let mut index = Index::test_connect().await.unwrap();

        assert_eq!(index.size_on_disk().await.unwrap(), 640);
    }
}