hybrid_api/
lib.rs

1//! A library for querying Hybrid indexes.
2//!
3//! The [Hybrid Indexer](https://docs.rs/hybrid-indexer/latest/hybrid_indexer/) library can be used to write indexers for Substrate blockchains. Rust programs that need to query Hybrid indexes can use this helper library.
4//!
5//! For an example of how to use this library, consult the [hybrid-cli source code](https://github.com/hybrid-explorer/hybrid-cli/blob/178ff966877c86c855e7d6d6b1a0ffddeea33376/src/main.rs#L161).
6
7#![feature(let_chains)]
8use futures_util::{SinkExt, StreamExt};
9pub use hybrid_indexer::shared::{Bytes32, Event, EventMeta, PalletMeta, Span, SubstrateKey};
10use serde::{Deserialize, Serialize};
11use thiserror::Error;
12use tokio::net::TcpStream;
13use tokio_tungstenite::{
14    connect_async, tungstenite, tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream,
15};
16
17#[cfg(test)]
18use tokio::net::TcpListener;
19
20/// Errors this crate can return
21#[derive(Error, Debug)]
22pub enum IndexError {
23    #[error("connection error")]
24    Websocket(#[from] tungstenite::Error),
25    #[error("decoding error")]
26    SerdeJson(#[from] serde_json::Error),
27    #[error("no message")]
28    NoMessage,
29}
30
31/// Indexer state and methods
32pub struct Index {
33    ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
34}
35
36impl Index {
37    /// Send a RequestMessage and wait for a ResponseMessage
38    async fn send_recv(&mut self, send_msg: RequestMessage) -> Result<ResponseMessage, IndexError> {
39        self.ws_stream
40            .send(Message::Text(serde_json::to_string(&send_msg)?))
41            .await?;
42        let msg = self.ws_stream.next().await.ok_or(IndexError::NoMessage)??;
43        Ok(serde_json::from_str(msg.to_text()?)?)
44    }
45
46    /// Connect to a Hybrid indexer via WebSocket
47    pub async fn connect(url: String) -> Result<Self, IndexError> {
48        let (ws_stream, _) = connect_async(url).await?;
49        let index = Index { ws_stream };
50        Ok(index)
51    }
52
53    /// Request status.
54    pub async fn status(&mut self) -> Result<Vec<Span>, IndexError> {
55        match self.send_recv(RequestMessage::Status).await? {
56            ResponseMessage::Status(spans) => Ok(spans),
57            _ => Err(IndexError::NoMessage),
58        }
59    }
60
61    /// Subscribe to a stream of status updates.
62    pub async fn subscribe_status(
63        &mut self,
64    ) -> Result<impl futures_util::Stream<Item = Result<Vec<Span>, IndexError>> + '_, IndexError>
65    {
66        if self.send_recv(RequestMessage::SubscribeStatus).await? != ResponseMessage::Subscribed {
67            return Err(IndexError::NoMessage);
68        };
69
70        // Return a stream that emits each ResponseMessage as it is received.
71        Ok(self.ws_stream.by_ref().map(|msg| {
72            let response: ResponseMessage = serde_json::from_str(msg?.to_text()?)?;
73
74            match response {
75                ResponseMessage::Status(spans) => Ok(spans),
76                _ => Err(IndexError::NoMessage),
77            }
78        }))
79    }
80
81    /// Unsubscribe to a stream of status updates.
82    pub async fn unsubscribe_status(&mut self) -> Result<(), IndexError> {
83        match self.send_recv(RequestMessage::UnsubscribeStatus).await? {
84            ResponseMessage::Unsubscribed => Ok(()),
85            _ => Err(IndexError::NoMessage),
86        }
87    }
88
89    /// Request size on disk.
90    pub async fn size_on_disk(&mut self) -> Result<u64, IndexError> {
91        match self.send_recv(RequestMessage::SizeOnDisk).await? {
92            ResponseMessage::SizeOnDisk(size) => Ok(size),
93            _ => Err(IndexError::NoMessage),
94        }
95    }
96
97    /// Request a list of all event variants being indexed.
98    pub async fn get_variants(&mut self) -> Result<Vec<PalletMeta>, IndexError> {
99        match self.send_recv(RequestMessage::Variants).await? {
100            ResponseMessage::Variants(pallet_meta) => Ok(pallet_meta),
101            _ => Err(IndexError::NoMessage),
102        }
103    }
104
105    /// Get events that have emitted a specific key.
106    pub async fn get_events(&mut self, key: Key) -> Result<Vec<Event>, IndexError> {
107        match self.send_recv(RequestMessage::GetEvents { key }).await? {
108            ResponseMessage::Events { events, .. } => Ok(events),
109            _ => Err(IndexError::NoMessage),
110        }
111    }
112
113    /// Subscribe to events that have emitted a specific key.
114    pub async fn subscribe_events(
115        &mut self,
116        key: Key,
117    ) -> Result<impl futures_util::Stream<Item = Result<Vec<Event>, IndexError>> + '_, IndexError>
118    {
119        if self
120            .send_recv(RequestMessage::SubscribeEvents { key: key.clone() })
121            .await?
122            != ResponseMessage::Subscribed
123        {
124            return Err(IndexError::NoMessage);
125        };
126
127        // Return a stream that emits each ResponseMessage as it is received.
128        Ok(self.ws_stream.by_ref().map(move |msg| {
129            let response: ResponseMessage = serde_json::from_str(msg?.to_text()?)?;
130
131            match response {
132                ResponseMessage::Events {
133                    key: response_key,
134                    events,
135                } => Ok(if response_key != key { vec![] } else { events }),
136                _ => Err(IndexError::NoMessage),
137            }
138        }))
139    }
140
141    /// Unsubscribe to an event subscription.
142    pub async fn unsubscribe_events(&mut self, key: Key) -> Result<(), IndexError> {
143        match self
144            .send_recv(RequestMessage::UnsubscribeEvents { key })
145            .await?
146        {
147            ResponseMessage::Unsubscribed => Ok(()),
148            _ => Err(IndexError::NoMessage),
149        }
150    }
151}
152
153/// Top-level key types that can be queried for
154#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Hash)]
155#[serde(tag = "type", content = "value")]
156pub enum Key {
157    Variant(u8, u8),
158    Substrate(SubstrateKey),
159}
160
161/// JSON request messages
162#[derive(Serialize, Deserialize, Debug, Clone)]
163#[serde(tag = "type")]
164pub enum RequestMessage {
165    Status,
166    SubscribeStatus,
167    UnsubscribeStatus,
168    Variants,
169    GetEvents { key: Key },
170    SubscribeEvents { key: Key },
171    UnsubscribeEvents { key: Key },
172    SizeOnDisk,
173}
174
175/// JSON response messages
176#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
177#[serde(tag = "type", content = "data")]
178#[serde(rename_all = "camelCase")]
179pub enum ResponseMessage {
180    Status(Vec<Span>),
181    Variants(Vec<PalletMeta>),
182    Events { key: Key, events: Vec<Event> },
183    Subscribed,
184    Unsubscribed,
185    SizeOnDisk(u64),
186    Error,
187}
188
#[cfg(test)]
impl Index {
    /// Start a one-shot mock indexer on an OS-assigned local port and connect
    /// to it.
    ///
    /// The mock server runs `handle_connection` in a spawned task and serves
    /// a single connection.
    pub async fn test_connect() -> Result<Self, IndexError> {
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("Failed to bind");

        // Port 0 lets the OS pick a free port; read back the actual address.
        let url = format!("ws://{}", listener.local_addr().unwrap());

        tokio::spawn(handle_connection(listener));

        Index::connect(url).await
    }
}
204
/// Serialize `response` as JSON and send it as a text frame.
///
/// Test helper: panics if serialization or sending fails.
#[cfg(test)]
async fn send_response<S>(sink: &mut S, response: &ResponseMessage)
where
    S: futures_util::Sink<Message> + Unpin,
    S::Error: std::fmt::Debug,
{
    let response_json = serde_json::to_string(response).unwrap();
    sink.send(Message::Text(response_json)).await.unwrap();
}

/// Receive the next frame and decode it as a `RequestMessage`.
///
/// Test helper: panics if the stream ends, errors, or yields an undecodable
/// frame.
#[cfg(test)]
async fn recv_request<S>(stream: &mut S) -> RequestMessage
where
    S: futures_util::Stream<Item = Result<Message, tungstenite::Error>> + Unpin,
{
    let msg = stream.next().await.unwrap().unwrap();
    serde_json::from_str(msg.to_text().unwrap()).unwrap()
}

/// Mock indexer used by the tests.
///
/// Accepts a single connection on `listener`, reads one request, and answers
/// it with canned data. For subscribe requests it acknowledges, pushes a
/// fixed sequence of updates, then waits for the matching unsubscribe request
/// before sending the final reply. Panics on any I/O or decoding failure.
#[cfg(test)]
async fn handle_connection(listener: TcpListener) {
    let (raw_stream, addr) = listener.accept().await.unwrap();
    println!("Incoming TCP connection from: {}", addr);

    let ws_stream = tokio_tungstenite::accept_async(raw_stream)
        .await
        .expect("Error during the websocket handshake occurred");
    println!("WebSocket connection established: {}", addr);

    let (mut ws_sender, mut ws_receiver) = ws_stream.split();

    // Canned status response; only the end of the last span varies.
    let status = |last_end| {
        ResponseMessage::Status(vec![
            Span { start: 2, end: 4 },
            Span { start: 9, end: 23 },
            Span {
                start: 20002,
                end: last_end,
            },
        ])
    };
    let event = |block_number, event_index| Event {
        block_number,
        event_index,
    };

    // Each arm evaluates to the final response for the request; subscription
    // arms send their intermediate messages first.
    let response_msg = match recv_request(&mut ws_receiver).await {
        RequestMessage::Status => status(400000),
        RequestMessage::SubscribeStatus => {
            send_response(&mut ws_sender, &ResponseMessage::Subscribed).await;
            for last_end in [400000, 400008, 400028] {
                send_response(&mut ws_sender, &status(last_end)).await;
            }
            match recv_request(&mut ws_receiver).await {
                RequestMessage::UnsubscribeStatus => ResponseMessage::Unsubscribed,
                _ => ResponseMessage::Error,
            }
        }
        RequestMessage::Variants => ResponseMessage::Variants(vec![PalletMeta {
            index: 0,
            name: "test1".to_string(),
            events: vec![EventMeta {
                index: 0,
                name: "event1".to_string(),
            }],
        }]),
        RequestMessage::GetEvents { .. } => ResponseMessage::Events {
            key: Key::Variant(0, 0),
            events: vec![event(82, 16), event(86, 17)],
        },
        RequestMessage::SubscribeEvents { .. } => {
            send_response(&mut ws_sender, &ResponseMessage::Subscribed).await;

            // The second batch carries a different key, so the client can be
            // checked against messages that do not match its subscription.
            let batches = [
                (Key::Variant(0, 0), vec![event(82, 16), event(86, 17)]),
                (Key::Variant(0, 1), vec![event(102, 12)]),
                (Key::Variant(0, 0), vec![event(102, 12)]),
                (Key::Variant(0, 0), vec![event(108, 0)]),
            ];
            for (key, events) in batches {
                send_response(&mut ws_sender, &ResponseMessage::Events { key, events }).await;
            }
            match recv_request(&mut ws_receiver).await {
                RequestMessage::UnsubscribeEvents { .. } => ResponseMessage::Unsubscribed,
                _ => ResponseMessage::Error,
            }
        }
        RequestMessage::SizeOnDisk => ResponseMessage::SizeOnDisk(640),
        _ => ResponseMessage::Error,
    };
    send_response(&mut ws_sender, &response_msg).await;
}
392
#[cfg(test)]
mod tests {
    use super::*;

    /// A one-off status request returns the mock server's canned spans.
    #[tokio::test]
    async fn test_status() {
        let mut index = Index::test_connect().await.unwrap();

        let expected = vec![
            Span { start: 2, end: 4 },
            Span { start: 9, end: 23 },
            Span {
                start: 20002,
                end: 400000,
            },
        ];
        assert_eq!(index.status().await.unwrap(), expected);
    }

    /// A status subscription yields each pushed update in order, after which
    /// unsubscribing succeeds.
    #[tokio::test]
    async fn test_subscribe_status() {
        let mut index = Index::test_connect().await.unwrap();
        let mut stream = index.subscribe_status().await.unwrap();

        // Only the end of the last span changes between updates.
        for last_end in [400000, 400008, 400028] {
            let status = stream.next().await.unwrap().unwrap();
            assert_eq!(
                status,
                vec![
                    Span { start: 2, end: 4 },
                    Span { start: 9, end: 23 },
                    Span {
                        start: 20002,
                        end: last_end,
                    },
                ]
            );
        }

        // The stream borrows `index`; release it before the next request.
        drop(stream);
        index.unsubscribe_status().await.unwrap();
    }

    /// The variants request returns the mock server's pallet metadata.
    #[tokio::test]
    async fn test_variants() {
        let mut index = Index::test_connect().await.unwrap();

        let expected = vec![PalletMeta {
            index: 0,
            name: "test1".to_string(),
            events: vec![EventMeta {
                index: 0,
                name: "event1".to_string(),
            }],
        }];
        assert_eq!(index.get_variants().await.unwrap(), expected);
    }

    /// A one-off events request returns the mock server's canned events.
    #[tokio::test]
    async fn test_get_events() {
        let mut index = Index::test_connect().await.unwrap();

        let expected = vec![
            Event {
                block_number: 82,
                event_index: 16,
            },
            Event {
                block_number: 86,
                event_index: 17,
            },
        ];
        assert_eq!(
            index.get_events(Key::Variant(0, 0)).await.unwrap(),
            expected
        );
    }

    /// An events subscription yields each pushed batch in order; a batch for
    /// a different key comes through as an empty vector.
    #[tokio::test]
    async fn test_subscribe_events() {
        let mut index = Index::test_connect().await.unwrap();
        let mut stream = index.subscribe_events(Key::Variant(0, 0)).await.unwrap();

        let expected_batches = [
            vec![
                Event {
                    block_number: 82,
                    event_index: 16,
                },
                Event {
                    block_number: 86,
                    event_index: 17,
                },
            ],
            // The server's second batch is for Key::Variant(0, 1).
            vec![],
            vec![Event {
                block_number: 102,
                event_index: 12,
            }],
            vec![Event {
                block_number: 108,
                event_index: 0,
            }],
        ];
        for expected in expected_batches {
            let events = stream.next().await.unwrap().unwrap();
            assert_eq!(events, expected);
        }

        // The stream borrows `index`; release it before the next request.
        drop(stream);
        index.unsubscribe_events(Key::Variant(0, 0)).await.unwrap();
    }

    /// The size-on-disk request returns the mock server's canned value.
    #[tokio::test]
    async fn test_size_on_disk() {
        let mut index = Index::test_connect().await.unwrap();

        assert_eq!(index.size_on_disk().await.unwrap(), 640);
    }
}
553}