acuity_index_substrate_api/
lib.rs

//! A library for querying indexes built with Acuity Index Substrate.
2
3#![feature(let_chains)]
4pub use acuity_index_substrate::shared::{
5    Bytes32, Event, EventMeta, PalletMeta, Span, SubstrateKey,
6};
7use futures_util::{SinkExt, StreamExt};
8use serde::{Deserialize, Serialize};
9use thiserror::Error;
10use tokio::net::TcpStream;
11use tokio_tungstenite::{
12    connect_async, tungstenite, tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream,
13};
14
15#[cfg(test)]
16use tokio::net::TcpListener;
17
18/// Errors this crate can return
19#[derive(Error, Debug)]
20pub enum IndexError {
21    #[error("connection error")]
22    Websocket(#[from] tungstenite::Error),
23    #[error("decoding error")]
24    SerdeJson(#[from] serde_json::Error),
25    #[error("no message")]
26    NoMessage,
27}
28
29/// Indexer state and methods
30pub struct Index {
31    ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
32}
33
34impl Index {
35    /// Send a RequestMessage and wait for a ResponseMessage
36    async fn send_recv(&mut self, send_msg: RequestMessage) -> Result<ResponseMessage, IndexError> {
37        self.ws_stream
38            .send(Message::Text(serde_json::to_string(&send_msg)?))
39            .await?;
40        let msg = self.ws_stream.next().await.ok_or(IndexError::NoMessage)??;
41        Ok(serde_json::from_str(msg.to_text()?)?)
42    }
43
44    /// Connect to a Acuity Substrate Index node via WebSocket
45    pub async fn connect(url: String) -> Result<Self, IndexError> {
46        let (ws_stream, _) = connect_async(url).await?;
47        let index = Index { ws_stream };
48        Ok(index)
49    }
50
51    /// Request status.
52    pub async fn status(&mut self) -> Result<Vec<Span>, IndexError> {
53        match self.send_recv(RequestMessage::Status).await? {
54            ResponseMessage::Status(spans) => Ok(spans),
55            _ => Err(IndexError::NoMessage),
56        }
57    }
58
59    /// Subscribe to a stream of status updates.
60    pub async fn subscribe_status(
61        &mut self,
62    ) -> Result<impl futures_util::Stream<Item = Result<Vec<Span>, IndexError>> + '_, IndexError>
63    {
64        if self.send_recv(RequestMessage::SubscribeStatus).await? != ResponseMessage::Subscribed {
65            return Err(IndexError::NoMessage);
66        };
67
68        // Return a stream that emits each ResponseMessage as it is received.
69        Ok(self.ws_stream.by_ref().map(|msg| {
70            let response: ResponseMessage = serde_json::from_str(msg?.to_text()?)?;
71
72            match response {
73                ResponseMessage::Status(spans) => Ok(spans),
74                _ => Err(IndexError::NoMessage),
75            }
76        }))
77    }
78
79    /// Unsubscribe to a stream of status updates.
80    pub async fn unsubscribe_status(&mut self) -> Result<(), IndexError> {
81        match self.send_recv(RequestMessage::UnsubscribeStatus).await? {
82            ResponseMessage::Unsubscribed => Ok(()),
83            _ => Err(IndexError::NoMessage),
84        }
85    }
86
87    /// Request size on disk.
88    pub async fn size_on_disk(&mut self) -> Result<u64, IndexError> {
89        match self.send_recv(RequestMessage::SizeOnDisk).await? {
90            ResponseMessage::SizeOnDisk(size) => Ok(size),
91            _ => Err(IndexError::NoMessage),
92        }
93    }
94
95    /// Request a list of all event variants being indexed.
96    pub async fn get_variants(&mut self) -> Result<Vec<PalletMeta>, IndexError> {
97        match self.send_recv(RequestMessage::Variants).await? {
98            ResponseMessage::Variants(pallet_meta) => Ok(pallet_meta),
99            _ => Err(IndexError::NoMessage),
100        }
101    }
102
103    /// Get events that have emitted a specific key.
104    pub async fn get_events(&mut self, key: Key) -> Result<Vec<Event>, IndexError> {
105        match self.send_recv(RequestMessage::GetEvents { key }).await? {
106            ResponseMessage::Events { events, .. } => Ok(events),
107            _ => Err(IndexError::NoMessage),
108        }
109    }
110
111    /// Subscribe to events that have emitted a specific key.
112    pub async fn subscribe_events(
113        &mut self,
114        key: Key,
115    ) -> Result<impl futures_util::Stream<Item = Result<Vec<Event>, IndexError>> + '_, IndexError>
116    {
117        if self
118            .send_recv(RequestMessage::SubscribeEvents { key: key.clone() })
119            .await?
120            != ResponseMessage::Subscribed
121        {
122            return Err(IndexError::NoMessage);
123        };
124
125        // Return a stream that emits each ResponseMessage as it is received.
126        Ok(self.ws_stream.by_ref().map(move |msg| {
127            let response: ResponseMessage = serde_json::from_str(msg?.to_text()?)?;
128
129            match response {
130                ResponseMessage::Events {
131                    key: response_key,
132                    events,
133                } => Ok(if response_key != key { vec![] } else { events }),
134                _ => Err(IndexError::NoMessage),
135            }
136        }))
137    }
138
139    /// Unsubscribe to an event subscription.
140    pub async fn unsubscribe_events(&mut self, key: Key) -> Result<(), IndexError> {
141        match self
142            .send_recv(RequestMessage::UnsubscribeEvents { key })
143            .await?
144        {
145            ResponseMessage::Unsubscribed => Ok(()),
146            _ => Err(IndexError::NoMessage),
147        }
148    }
149}
150
151/// Top-level key types that can be queried for
152#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Hash)]
153#[serde(tag = "type", content = "value")]
154pub enum Key {
155    Variant(u8, u8),
156    Substrate(SubstrateKey),
157}
158
159/// JSON request messages
160#[derive(Serialize, Deserialize, Debug, Clone)]
161#[serde(tag = "type")]
162pub enum RequestMessage {
163    Status,
164    SubscribeStatus,
165    UnsubscribeStatus,
166    Variants,
167    GetEvents { key: Key },
168    SubscribeEvents { key: Key },
169    UnsubscribeEvents { key: Key },
170    SizeOnDisk,
171}
172
173/// JSON response messages
174#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
175#[serde(tag = "type", content = "data")]
176#[serde(rename_all = "camelCase")]
177pub enum ResponseMessage {
178    Status(Vec<Span>),
179    Variants(Vec<PalletMeta>),
180    Events { key: Key, events: Vec<Event> },
181    Subscribed,
182    Unsubscribed,
183    SizeOnDisk(u64),
184    Error,
185}
186
#[cfg(test)]
impl Index {
    /// Start a mock indexer on a local, OS-assigned port and connect to it.
    ///
    /// The mock server runs in a spawned task and serves one connection.
    pub async fn test_connect() -> Result<Self, IndexError> {
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("Failed to bind");

        // Build the URL before the listener is moved into the server task.
        let url = format!("ws://{}", listener.local_addr().unwrap());

        tokio::spawn(handle_connection(listener));

        Index::connect(url).await
    }
}
202
/// Serialize `response` to JSON and send it to the client as a text frame.
///
/// Panics on failure, which is acceptable for the mock server used in tests.
#[cfg(test)]
async fn send_response<S>(sink: &mut S, response: &ResponseMessage)
where
    S: futures_util::Sink<Message> + Unpin,
    S::Error: std::fmt::Debug,
{
    let response_json = serde_json::to_string(response).unwrap();
    sink.send(Message::Text(response_json)).await.unwrap();
}

/// Read the next frame from the client and parse it as a `RequestMessage`.
///
/// Panics if the stream has ended, the read fails or the payload is not a
/// valid request.
#[cfg(test)]
async fn recv_request<S>(stream: &mut S) -> RequestMessage
where
    S: futures_util::Stream<Item = Result<Message, tungstenite::Error>> + Unpin,
{
    let msg = stream.next().await.unwrap().unwrap();
    serde_json::from_str(msg.to_text().unwrap()).unwrap()
}

/// Mock indexer: accept one connection, read one request and reply with
/// canned data.
///
/// Subscription requests are acknowledged, followed by a fixed series of
/// updates; the server then waits for the matching unsubscribe request and
/// acknowledges it (or answers `Error` for anything else).
#[cfg(test)]
async fn handle_connection(listener: TcpListener) {
    let (raw_stream, addr) = listener.accept().await.unwrap();
    println!("Incoming TCP connection from: {}", addr);

    let ws_stream = tokio_tungstenite::accept_async(raw_stream)
        .await
        .expect("Error during the websocket handshake occurred");
    println!("WebSocket connection established: {}", addr);

    let (mut ws_sender, mut ws_receiver) = ws_stream.split();

    // Fixture builders. Only the end of the last span varies between updates.
    let status_spans = |last_end| {
        vec![
            Span { start: 2, end: 4 },
            Span { start: 9, end: 23 },
            Span {
                start: 20002,
                end: last_end,
            },
        ]
    };
    let event = |block_number, event_index| Event {
        block_number,
        event_index,
    };
    let events_for = |variant, events| ResponseMessage::Events {
        key: Key::Variant(0, variant),
        events,
    };

    let response_msg = match recv_request(&mut ws_receiver).await {
        RequestMessage::Status => ResponseMessage::Status(status_spans(400000)),
        RequestMessage::SubscribeStatus => {
            send_response(&mut ws_sender, &ResponseMessage::Subscribed).await;

            for last_end in [400000, 400008, 400028] {
                let update = ResponseMessage::Status(status_spans(last_end));
                send_response(&mut ws_sender, &update).await;
            }

            match recv_request(&mut ws_receiver).await {
                RequestMessage::UnsubscribeStatus => ResponseMessage::Unsubscribed,
                _ => ResponseMessage::Error,
            }
        }
        RequestMessage::Variants => ResponseMessage::Variants(vec![PalletMeta {
            index: 0,
            name: "test1".to_string(),
            events: vec![EventMeta {
                index: 0,
                name: "event1".to_string(),
            }],
        }]),
        RequestMessage::GetEvents { .. } => events_for(0, vec![event(82, 16), event(86, 17)]),
        RequestMessage::SubscribeEvents { .. } => {
            send_response(&mut ws_sender, &ResponseMessage::Subscribed).await;

            // The second update uses a different key on purpose, so clients
            // can be checked against updates that are not for their key.
            let updates = vec![
                events_for(0, vec![event(82, 16), event(86, 17)]),
                events_for(1, vec![event(102, 12)]),
                events_for(0, vec![event(102, 12)]),
                events_for(0, vec![event(108, 0)]),
            ];
            for update in &updates {
                send_response(&mut ws_sender, update).await;
            }

            match recv_request(&mut ws_receiver).await {
                RequestMessage::UnsubscribeEvents { .. } => ResponseMessage::Unsubscribed,
                _ => ResponseMessage::Error,
            }
        }
        RequestMessage::SizeOnDisk => ResponseMessage::SizeOnDisk(640),
        // An unsubscribe without a preceding subscribe is an error.
        _ => ResponseMessage::Error,
    };

    send_response(&mut ws_sender, &response_msg).await;
}
390
#[cfg(test)]
mod tests {
    use super::*;

    /// `status` returns the spans served by the mock indexer.
    #[tokio::test]
    async fn test_status() {
        let mut index = Index::test_connect().await.unwrap();

        let expected = vec![
            Span { start: 2, end: 4 },
            Span { start: 9, end: 23 },
            Span {
                start: 20002,
                end: 400000,
            },
        ];

        assert_eq!(index.status().await.unwrap(), expected);
    }

    /// A status subscription yields the three updates in order and can then
    /// be cancelled.
    #[tokio::test]
    async fn test_subscribe_status() {
        let mut index = Index::test_connect().await.unwrap();
        let mut stream = index.subscribe_status().await.unwrap();

        // Only the end of the last span changes from one update to the next.
        for last_end in [400000, 400008, 400028] {
            let status = stream.next().await.unwrap().unwrap();

            assert_eq!(
                status,
                vec![
                    Span { start: 2, end: 4 },
                    Span { start: 9, end: 23 },
                    Span {
                        start: 20002,
                        end: last_end,
                    },
                ]
            );
        }

        // The stream borrows `index`; release it before the next request.
        drop(stream);
        index.unsubscribe_status().await.unwrap();
    }

    /// `get_variants` returns the pallet metadata served by the mock indexer.
    #[tokio::test]
    async fn test_variants() {
        let mut index = Index::test_connect().await.unwrap();

        let expected = vec![PalletMeta {
            index: 0,
            name: "test1".to_string(),
            events: vec![EventMeta {
                index: 0,
                name: "event1".to_string(),
            }],
        }];

        assert_eq!(index.get_variants().await.unwrap(), expected);
    }

    /// `get_events` returns the events served by the mock indexer.
    #[tokio::test]
    async fn test_get_events() {
        let mut index = Index::test_connect().await.unwrap();

        let expected = vec![
            Event {
                block_number: 82,
                event_index: 16,
            },
            Event {
                block_number: 86,
                event_index: 17,
            },
        ];

        assert_eq!(
            index.get_events(Key::Variant(0, 0)).await.unwrap(),
            expected
        );
    }

    /// An event subscription yields each batch in order, with an empty batch
    /// for the update that carries a different key, and can then be cancelled.
    #[tokio::test]
    async fn test_subscribe_events() {
        let mut index = Index::test_connect().await.unwrap();
        let mut stream = index.subscribe_events(Key::Variant(0, 0)).await.unwrap();

        let expected_batches = vec![
            vec![
                Event {
                    block_number: 82,
                    event_index: 16,
                },
                Event {
                    block_number: 86,
                    event_index: 17,
                },
            ],
            // The mock server sends this update under `Key::Variant(0, 1)`.
            vec![],
            vec![Event {
                block_number: 102,
                event_index: 12,
            }],
            vec![Event {
                block_number: 108,
                event_index: 0,
            }],
        ];

        for expected in expected_batches {
            let events = stream.next().await.unwrap().unwrap();
            assert_eq!(events, expected);
        }

        // The stream borrows `index`; release it before the next request.
        drop(stream);
        index.unsubscribe_events(Key::Variant(0, 0)).await.unwrap();
    }

    /// `size_on_disk` returns the size served by the mock indexer.
    #[tokio::test]
    async fn test_size_on_disk() {
        let mut index = Index::test_connect().await.unwrap();

        assert_eq!(index.size_on_disk().await.unwrap(), 640);
    }
}