1#![feature(let_chains)]
8use futures_util::{SinkExt, StreamExt};
9pub use hybrid_indexer::shared::{Bytes32, Event, EventMeta, PalletMeta, Span, SubstrateKey};
10use serde::{Deserialize, Serialize};
11use thiserror::Error;
12use tokio::net::TcpStream;
13use tokio_tungstenite::{
14 connect_async, tungstenite, tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream,
15};
16
17#[cfg(test)]
18use tokio::net::TcpListener;
19
20#[derive(Error, Debug)]
22pub enum IndexError {
23 #[error("connection error")]
24 Websocket(#[from] tungstenite::Error),
25 #[error("decoding error")]
26 SerdeJson(#[from] serde_json::Error),
27 #[error("no message")]
28 NoMessage,
29}
30
31pub struct Index {
33 ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
34}
35
36impl Index {
37 async fn send_recv(&mut self, send_msg: RequestMessage) -> Result<ResponseMessage, IndexError> {
39 self.ws_stream
40 .send(Message::Text(serde_json::to_string(&send_msg)?))
41 .await?;
42 let msg = self.ws_stream.next().await.ok_or(IndexError::NoMessage)??;
43 Ok(serde_json::from_str(msg.to_text()?)?)
44 }
45
46 pub async fn connect(url: String) -> Result<Self, IndexError> {
48 let (ws_stream, _) = connect_async(url).await?;
49 let index = Index { ws_stream };
50 Ok(index)
51 }
52
53 pub async fn status(&mut self) -> Result<Vec<Span>, IndexError> {
55 match self.send_recv(RequestMessage::Status).await? {
56 ResponseMessage::Status(spans) => Ok(spans),
57 _ => Err(IndexError::NoMessage),
58 }
59 }
60
61 pub async fn subscribe_status(
63 &mut self,
64 ) -> Result<impl futures_util::Stream<Item = Result<Vec<Span>, IndexError>> + '_, IndexError>
65 {
66 if self.send_recv(RequestMessage::SubscribeStatus).await? != ResponseMessage::Subscribed {
67 return Err(IndexError::NoMessage);
68 };
69
70 Ok(self.ws_stream.by_ref().map(|msg| {
72 let response: ResponseMessage = serde_json::from_str(msg?.to_text()?)?;
73
74 match response {
75 ResponseMessage::Status(spans) => Ok(spans),
76 _ => Err(IndexError::NoMessage),
77 }
78 }))
79 }
80
81 pub async fn unsubscribe_status(&mut self) -> Result<(), IndexError> {
83 match self.send_recv(RequestMessage::UnsubscribeStatus).await? {
84 ResponseMessage::Unsubscribed => Ok(()),
85 _ => Err(IndexError::NoMessage),
86 }
87 }
88
89 pub async fn size_on_disk(&mut self) -> Result<u64, IndexError> {
91 match self.send_recv(RequestMessage::SizeOnDisk).await? {
92 ResponseMessage::SizeOnDisk(size) => Ok(size),
93 _ => Err(IndexError::NoMessage),
94 }
95 }
96
97 pub async fn get_variants(&mut self) -> Result<Vec<PalletMeta>, IndexError> {
99 match self.send_recv(RequestMessage::Variants).await? {
100 ResponseMessage::Variants(pallet_meta) => Ok(pallet_meta),
101 _ => Err(IndexError::NoMessage),
102 }
103 }
104
105 pub async fn get_events(&mut self, key: Key) -> Result<Vec<Event>, IndexError> {
107 match self.send_recv(RequestMessage::GetEvents { key }).await? {
108 ResponseMessage::Events { events, .. } => Ok(events),
109 _ => Err(IndexError::NoMessage),
110 }
111 }
112
113 pub async fn subscribe_events(
115 &mut self,
116 key: Key,
117 ) -> Result<impl futures_util::Stream<Item = Result<Vec<Event>, IndexError>> + '_, IndexError>
118 {
119 if self
120 .send_recv(RequestMessage::SubscribeEvents { key: key.clone() })
121 .await?
122 != ResponseMessage::Subscribed
123 {
124 return Err(IndexError::NoMessage);
125 };
126
127 Ok(self.ws_stream.by_ref().map(move |msg| {
129 let response: ResponseMessage = serde_json::from_str(msg?.to_text()?)?;
130
131 match response {
132 ResponseMessage::Events {
133 key: response_key,
134 events,
135 } => Ok(if response_key != key { vec![] } else { events }),
136 _ => Err(IndexError::NoMessage),
137 }
138 }))
139 }
140
141 pub async fn unsubscribe_events(&mut self, key: Key) -> Result<(), IndexError> {
143 match self
144 .send_recv(RequestMessage::UnsubscribeEvents { key })
145 .await?
146 {
147 ResponseMessage::Unsubscribed => Ok(()),
148 _ => Err(IndexError::NoMessage),
149 }
150 }
151}
152
153#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Hash)]
155#[serde(tag = "type", content = "value")]
156pub enum Key {
157 Variant(u8, u8),
158 Substrate(SubstrateKey),
159}
160
161#[derive(Serialize, Deserialize, Debug, Clone)]
163#[serde(tag = "type")]
164pub enum RequestMessage {
165 Status,
166 SubscribeStatus,
167 UnsubscribeStatus,
168 Variants,
169 GetEvents { key: Key },
170 SubscribeEvents { key: Key },
171 UnsubscribeEvents { key: Key },
172 SizeOnDisk,
173}
174
175#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
177#[serde(tag = "type", content = "data")]
178#[serde(rename_all = "camelCase")]
179pub enum ResponseMessage {
180 Status(Vec<Span>),
181 Variants(Vec<PalletMeta>),
182 Events { key: Key, events: Vec<Event> },
183 Subscribed,
184 Unsubscribed,
185 SizeOnDisk(u64),
186 Error,
187}
188
189#[cfg(test)]
190impl Index {
191 pub async fn test_connect() -> Result<Self, IndexError> {
192 let try_socket = TcpListener::bind("127.0.0.1:0").await;
193 let listener = try_socket.expect("Failed to bind");
194
195 let addr = listener.local_addr().unwrap().to_string();
196 let mut url = "ws://".to_string();
197 url.push_str(&addr);
198
199 tokio::spawn(handle_connection(listener));
200
201 Index::connect(url).await
202 }
203}
204
205#[cfg(test)]
206async fn handle_connection(listener: TcpListener) {
207 let (raw_stream, addr) = listener.accept().await.unwrap();
208 println!("Incoming TCP connection from: {}", addr);
209
210 let ws_stream = tokio_tungstenite::accept_async(raw_stream)
211 .await
212 .expect("Error during the websocket handshake occurred");
213 println!("WebSocket connection established: {}", addr);
214
215 let (mut ws_sender, mut ws_receiver) = ws_stream.split();
216 let msg = ws_receiver.next().await.unwrap().unwrap();
217 let request_msg: RequestMessage = serde_json::from_str(msg.to_text().unwrap()).unwrap();
218
219 let response_msg = match request_msg {
220 RequestMessage::Status => ResponseMessage::Status(vec![
221 Span { start: 2, end: 4 },
222 Span { start: 9, end: 23 },
223 Span {
224 start: 20002,
225 end: 400000,
226 },
227 ]),
228 RequestMessage::SubscribeStatus => {
229 let response_msg = ResponseMessage::Subscribed;
230 let response_json = serde_json::to_string(&response_msg).unwrap();
231 ws_sender
232 .send(tungstenite::Message::Text(response_json))
233 .await
234 .unwrap();
235
236 let response_msg = ResponseMessage::Status(vec![
237 Span { start: 2, end: 4 },
238 Span { start: 9, end: 23 },
239 Span {
240 start: 20002,
241 end: 400000,
242 },
243 ]);
244
245 let response_json = serde_json::to_string(&response_msg).unwrap();
246 ws_sender
247 .send(tungstenite::Message::Text(response_json))
248 .await
249 .unwrap();
250
251 let response_msg = ResponseMessage::Status(vec![
252 Span { start: 2, end: 4 },
253 Span { start: 9, end: 23 },
254 Span {
255 start: 20002,
256 end: 400008,
257 },
258 ]);
259
260 let response_json = serde_json::to_string(&response_msg).unwrap();
261 ws_sender
262 .send(tungstenite::Message::Text(response_json))
263 .await
264 .unwrap();
265
266 let response_msg = ResponseMessage::Status(vec![
267 Span { start: 2, end: 4 },
268 Span { start: 9, end: 23 },
269 Span {
270 start: 20002,
271 end: 400028,
272 },
273 ]);
274
275 let response_json = serde_json::to_string(&response_msg).unwrap();
276 ws_sender
277 .send(tungstenite::Message::Text(response_json))
278 .await
279 .unwrap();
280 let msg = ws_receiver.next().await.unwrap().unwrap();
281 let request_msg: RequestMessage = serde_json::from_str(msg.to_text().unwrap()).unwrap();
282 match request_msg {
283 RequestMessage::UnsubscribeStatus => ResponseMessage::Unsubscribed,
284 _ => ResponseMessage::Error,
285 }
286 }
287 RequestMessage::Variants => ResponseMessage::Variants(vec![PalletMeta {
288 index: 0,
289 name: "test1".to_string(),
290 events: vec![EventMeta {
291 index: 0,
292 name: "event1".to_string(),
293 }],
294 }]),
295 RequestMessage::GetEvents { .. } => ResponseMessage::Events {
296 key: Key::Variant(0, 0),
297 events: vec![
298 Event {
299 block_number: 82,
300 event_index: 16,
301 },
302 Event {
303 block_number: 86,
304 event_index: 17,
305 },
306 ],
307 },
308 RequestMessage::SubscribeEvents { .. } => {
309 let response_msg = ResponseMessage::Subscribed;
310 let response_json = serde_json::to_string(&response_msg).unwrap();
311 ws_sender
312 .send(tungstenite::Message::Text(response_json))
313 .await
314 .unwrap();
315
316 let response_msg = ResponseMessage::Events {
317 key: Key::Variant(0, 0),
318 events: vec![
319 Event {
320 block_number: 82,
321 event_index: 16,
322 },
323 Event {
324 block_number: 86,
325 event_index: 17,
326 },
327 ],
328 };
329
330 let response_json = serde_json::to_string(&response_msg).unwrap();
331 ws_sender
332 .send(tungstenite::Message::Text(response_json))
333 .await
334 .unwrap();
335 let response_msg = ResponseMessage::Events {
336 key: Key::Variant(0, 1),
337 events: vec![Event {
338 block_number: 102,
339 event_index: 12,
340 }],
341 };
342
343 let response_json = serde_json::to_string(&response_msg).unwrap();
344 ws_sender
345 .send(tungstenite::Message::Text(response_json))
346 .await
347 .unwrap();
348
349 let response_msg = ResponseMessage::Events {
350 key: Key::Variant(0, 0),
351 events: vec![Event {
352 block_number: 102,
353 event_index: 12,
354 }],
355 };
356
357 let response_json = serde_json::to_string(&response_msg).unwrap();
358 ws_sender
359 .send(tungstenite::Message::Text(response_json))
360 .await
361 .unwrap();
362
363 let response_msg = ResponseMessage::Events {
364 key: Key::Variant(0, 0),
365 events: vec![Event {
366 block_number: 108,
367 event_index: 0,
368 }],
369 };
370
371 let response_json = serde_json::to_string(&response_msg).unwrap();
372 ws_sender
373 .send(tungstenite::Message::Text(response_json))
374 .await
375 .unwrap();
376 let msg = ws_receiver.next().await.unwrap().unwrap();
377 let request_msg: RequestMessage = serde_json::from_str(msg.to_text().unwrap()).unwrap();
378 match request_msg {
379 RequestMessage::UnsubscribeEvents { .. } => ResponseMessage::Unsubscribed,
380 _ => ResponseMessage::Error,
381 }
382 }
383 RequestMessage::SizeOnDisk => ResponseMessage::SizeOnDisk(640),
384 _ => ResponseMessage::Error,
385 };
386 let response_json = serde_json::to_string(&response_msg).unwrap();
387 ws_sender
388 .send(tungstenite::Message::Text(response_json))
389 .await
390 .unwrap();
391}
392
393#[cfg(test)]
394mod tests {
395 use super::*;
396
397 #[tokio::test]
398 async fn test_status() {
399 let mut index = Index::test_connect().await.unwrap();
400 let status = index.status().await.unwrap();
401
402 assert_eq!(
403 status,
404 vec![
405 Span { start: 2, end: 4 },
406 Span { start: 9, end: 23 },
407 Span {
408 start: 20002,
409 end: 400000,
410 },
411 ]
412 );
413 }
414
415 #[tokio::test]
416 async fn test_subscribe_status() {
417 let mut index = Index::test_connect().await.unwrap();
418 let mut stream = index.subscribe_status().await.unwrap();
419 let status = stream.next().await.unwrap().unwrap();
420
421 assert_eq!(
422 status,
423 vec![
424 Span { start: 2, end: 4 },
425 Span { start: 9, end: 23 },
426 Span {
427 start: 20002,
428 end: 400000,
429 },
430 ]
431 );
432
433 let status = stream.next().await.unwrap().unwrap();
434
435 assert_eq!(
436 status,
437 vec![
438 Span { start: 2, end: 4 },
439 Span { start: 9, end: 23 },
440 Span {
441 start: 20002,
442 end: 400008,
443 },
444 ]
445 );
446 let status = stream.next().await.unwrap().unwrap();
447
448 assert_eq!(
449 status,
450 vec![
451 Span { start: 2, end: 4 },
452 Span { start: 9, end: 23 },
453 Span {
454 start: 20002,
455 end: 400028,
456 },
457 ]
458 );
459 drop(stream);
460 index.unsubscribe_status().await.unwrap();
461 }
462
463 #[tokio::test]
464 async fn test_variants() {
465 let mut index = Index::test_connect().await.unwrap();
466 let variants = index.get_variants().await.unwrap();
467
468 assert_eq!(
469 variants,
470 vec![PalletMeta {
471 index: 0,
472 name: "test1".to_string(),
473 events: vec![EventMeta {
474 index: 0,
475 name: "event1".to_string()
476 }]
477 },]
478 );
479 }
480
481 #[tokio::test]
482 async fn test_get_events() {
483 let mut index = Index::test_connect().await.unwrap();
484 let events = index.get_events(Key::Variant(0, 0)).await.unwrap();
485
486 assert_eq!(
487 events,
488 vec![
489 Event {
490 block_number: 82,
491 event_index: 16,
492 },
493 Event {
494 block_number: 86,
495 event_index: 17,
496 },
497 ]
498 );
499 }
500
501 #[tokio::test]
502 async fn test_subscribe_events() {
503 let mut index = Index::test_connect().await.unwrap();
504 let mut stream = index.subscribe_events(Key::Variant(0, 0)).await.unwrap();
505 let events = stream.next().await.unwrap().unwrap();
506
507 assert_eq!(
508 events,
509 vec![
510 Event {
511 block_number: 82,
512 event_index: 16,
513 },
514 Event {
515 block_number: 86,
516 event_index: 17,
517 },
518 ]
519 );
520
521 let events = stream.next().await.unwrap().unwrap();
522
523 assert_eq!(events, vec![]);
524 let events = stream.next().await.unwrap().unwrap();
525
526 assert_eq!(
527 events,
528 vec![Event {
529 block_number: 102,
530 event_index: 12,
531 }]
532 );
533 let events = stream.next().await.unwrap().unwrap();
534
535 assert_eq!(
536 events,
537 vec![Event {
538 block_number: 108,
539 event_index: 0,
540 }]
541 );
542 drop(stream);
543 index.unsubscribe_events(Key::Variant(0, 0)).await.unwrap();
544 }
545
546 #[tokio::test]
547 async fn test_size_on_disk() {
548 let mut index = Index::test_connect().await.unwrap();
549 let size = index.size_on_disk().await.unwrap();
550
551 assert_eq!(size, 640);
552 }
553}