1use ahash::AHashMap;
2use futures::future;
3use num_format::{Locale, ToFormattedString};
4use sled::Tree;
5use std::{collections::HashMap, future::Future, sync::Mutex};
6use subxt::{backend::legacy::LegacyRpcMethods, blocks::Block, metadata::Metadata, OnlineClient};
7use tokio::{
8 sync::{mpsc, watch, RwLock},
9 time::{self, Duration, Instant, MissedTickBehavior},
10};
11use tracing::{debug, error, info};
12use zerocopy::{AsBytes, FromBytes};
13
14use crate::{shared::*, websockets::process_msg_status};
15
16#[allow(clippy::type_complexity)]
17pub struct Indexer<R: RuntimeIndexer + ?Sized> {
18 trees: Trees<<R::ChainKey as IndexKey>::ChainTrees>,
19 api: Option<OnlineClient<R::RuntimeConfig>>,
20 rpc: Option<LegacyRpcMethods<R::RuntimeConfig>>,
21 index_variant: bool,
22 metadata_map_lock: RwLock<AHashMap<u32, Metadata>>,
23 status_sub: Mutex<Vec<mpsc::UnboundedSender<ResponseMessage<R::ChainKey>>>>,
24 events_sub_map:
25 Mutex<HashMap<Key<R::ChainKey>, Vec<mpsc::UnboundedSender<ResponseMessage<R::ChainKey>>>>>,
26}
27
28impl<R: RuntimeIndexer> Indexer<R> {
29 fn new(
30 trees: Trees<<R::ChainKey as IndexKey>::ChainTrees>,
31 api: OnlineClient<R::RuntimeConfig>,
32 rpc: LegacyRpcMethods<R::RuntimeConfig>,
33 index_variant: bool,
34 ) -> Self {
35 Indexer {
36 trees,
37 api: Some(api),
38 rpc: Some(rpc),
39 index_variant,
40 metadata_map_lock: RwLock::new(AHashMap::new()),
41 status_sub: Vec::new().into(),
42 events_sub_map: HashMap::new().into(),
43 }
44 }
45
46 pub fn new_test(trees: Trees<<R::ChainKey as IndexKey>::ChainTrees>) -> Self {
47 Indexer {
48 trees,
49 api: None,
50 rpc: None,
51 index_variant: true,
52 metadata_map_lock: RwLock::new(AHashMap::new()),
53 status_sub: Vec::new().into(),
54 events_sub_map: HashMap::new().into(),
55 }
56 }
57
58 async fn index_head(
59 &self,
60 next: impl Future<
61 Output = Option<
62 Result<Block<R::RuntimeConfig, OnlineClient<R::RuntimeConfig>>, subxt::Error>,
63 >,
64 >,
65 ) -> Result<(u32, u32, u32), IndexError> {
66 let block = next.await.unwrap()?;
67 self.index_block(block.number().into().try_into().unwrap())
68 .await
69 }
70
71 async fn index_block(&self, block_number: u32) -> Result<(u32, u32, u32), IndexError> {
72 let mut key_count = 0;
73 let api = self.api.as_ref().unwrap();
74 let rpc = self.rpc.as_ref().unwrap();
75
76 let block_hash = match rpc.chain_get_block_hash(Some(block_number.into())).await? {
77 Some(block_hash) => block_hash,
78 None => return Err(IndexError::BlockNotFound(block_number)),
79 };
80 let runtime_version = rpc.state_get_runtime_version(Some(block_hash)).await?;
82
83 let metadata_map = self.metadata_map_lock.read().await;
84 let metadata = match metadata_map.get(&runtime_version.spec_version) {
85 Some(metadata) => {
86 let metadata = metadata.clone();
87 drop(metadata_map);
88 metadata
89 }
90 None => {
91 drop(metadata_map);
92 let mut metadata_map = self.metadata_map_lock.write().await;
93
94 match metadata_map.get(&runtime_version.spec_version) {
95 Some(metadata) => metadata.clone(),
96 None => {
97 info!(
98 "Downloading metadata for spec version {}",
99 runtime_version.spec_version
100 );
101 let metadata = rpc.state_get_metadata(Some(block_hash)).await?;
102 info!(
103 "Finished downloading metadata for spec version {}",
104 runtime_version.spec_version
105 );
106 metadata_map.insert(runtime_version.spec_version, metadata.clone());
107 metadata
108 }
109 }
110 }
111 };
112
113 let events =
114 subxt::events::Events::new_from_client(metadata, block_hash, api.clone()).await?;
115
116 for (i, event) in events.iter().enumerate() {
117 match event {
118 Ok(event) => {
119 let event_index = i.try_into().unwrap();
120 if self.index_variant {
121 self.index_event(
122 Key::Variant(event.pallet_index(), event.variant_index()),
123 block_number,
124 event_index,
125 )?;
126 key_count += 1;
127 }
128 if let Ok(event_key_count) =
129 R::process_event(self, block_number, event_index, event)
130 {
131 key_count += event_key_count;
132 }
133 }
134 Err(error) => error!("Block: {}, error: {}", block_number, error),
135 }
136 }
137
138 Ok((block_number, events.len(), key_count))
139 }
140
141 pub fn notify_status_subscribers(&self) {
142 let msg = process_msg_status::<R>(&self.trees.span);
143 let txs = self.status_sub.lock().unwrap();
144 for tx in txs.iter() {
145 if tx.send(msg.clone()).is_ok() {}
146 }
147 }
148
149 pub fn notify_subscribers(&self, search_key: Key<R::ChainKey>, event: Event) {
150 let events_sub_map = self.events_sub_map.lock().unwrap();
151 if let Some(txs) = events_sub_map.get(&search_key) {
152 let msg = ResponseMessage::Events {
153 key: search_key,
154 events: vec![event],
155 };
156 for tx in txs.iter() {
157 if tx.send(msg.clone()).is_ok() {}
158 }
159 }
160 }
161
162 pub fn index_event(
163 &self,
164 key: Key<R::ChainKey>,
165 block_number: u32,
166 event_index: u16,
167 ) -> Result<(), sled::Error> {
168 key.write_db_key(&self.trees, block_number, event_index)?;
169 self.notify_subscribers(
170 key,
171 Event {
172 block_number,
173 event_index,
174 },
175 );
176 Ok(())
177 }
178}
179
180pub fn load_spans<R: RuntimeIndexer>(
181 span_db: &Tree,
182 index_variant: bool,
183) -> Result<Vec<Span>, IndexError> {
184 let mut spans = vec![];
185 'span: for (key, value) in span_db.into_iter().flatten() {
186 let span_value = SpanDbValue::read_from(&value).unwrap();
187 let start: u32 = span_value.start.into();
188 let mut end: u32 = u32::from_be_bytes(key.as_ref().try_into().unwrap());
189 if index_variant && (span_value.index_variant != 1) {
191 span_db.remove(key)?;
193 info!(
194 "📚 Re-indexing span of blocks from #{} to #{}.",
195 start.to_formatted_string(&Locale::en),
196 end.to_formatted_string(&Locale::en)
197 );
198 info!("📚 Reason: event variants not indexed.");
199 continue;
200 }
201 let span_version: u16 = span_value.version.into();
202 for (version, block_number) in R::get_versions().iter().enumerate() {
204 if span_version < version.try_into().unwrap() && end >= *block_number {
205 span_db.remove(key)?;
206 if start >= *block_number {
207 info!(
208 "📚 Re-indexing span of blocks from #{} to #{}.",
209 start.to_formatted_string(&Locale::en),
210 end.to_formatted_string(&Locale::en)
211 );
212 continue 'span;
213 }
214 info!(
215 "📚 Re-indexing span of blocks from #{} to #{}.",
216 block_number.to_formatted_string(&Locale::en),
217 end.to_formatted_string(&Locale::en)
218 );
219 end = block_number - 1;
221 span_db.insert(end.to_be_bytes(), value)?;
222 break;
223 }
224 }
225 let span = Span { start, end };
226 info!(
227 "📚 Previous span of indexed blocks from #{} to #{}.",
228 start.to_formatted_string(&Locale::en),
229 end.to_formatted_string(&Locale::en)
230 );
231 spans.push(span);
232 }
233 Ok(spans)
234}
235
236pub fn check_span(
237 span_db: &Tree,
238 spans: &mut Vec<Span>,
239 current_span: &mut Span,
240) -> Result<(), IndexError> {
241 while let Some(span) = spans.last() {
242 if current_span.start > span.start && current_span.start - 1 <= span.end {
244 let skipped = span.end - span.start + 1;
245 info!(
246 "📚 Skipping {} blocks from #{} to #{}",
247 skipped.to_formatted_string(&Locale::en),
248 span.start.to_formatted_string(&Locale::en),
249 span.end.to_formatted_string(&Locale::en),
250 );
251 current_span.start = span.start;
252 span_db.remove(span.end.to_be_bytes())?;
254 spans.pop();
255 } else {
256 break;
257 }
258 }
259 Ok(())
260}
261
262pub fn check_next_batch_block(spans: &[Span], next_batch_block: &mut u32) {
263 let mut i = spans.len();
265 while i != 0 {
266 i -= 1;
267 if *next_batch_block >= spans[i].start && *next_batch_block <= spans[i].end {
268 *next_batch_block = spans[i].start - 1;
269 }
270 }
271}
272
273pub fn process_sub_msg<R: RuntimeIndexer>(
274 indexer: &Indexer<R>,
275 msg: SubscriptionMessage<R::ChainKey>,
276) {
277 match msg {
278 SubscriptionMessage::SubscribeStatus { sub_response_tx } => {
279 let mut txs = indexer.status_sub.lock().unwrap();
280 txs.push(sub_response_tx);
281 }
282 SubscriptionMessage::UnsubscribeStatus { sub_response_tx } => {
283 let mut txs = indexer.status_sub.lock().unwrap();
284 txs.retain(|value| !sub_response_tx.same_channel(value));
285 }
286 SubscriptionMessage::SubscribeEvents {
287 key,
288 sub_response_tx,
289 } => {
290 let mut events_sub_map = indexer.events_sub_map.lock().unwrap();
291 match events_sub_map.get_mut(&key) {
292 Some(txs) => {
293 txs.push(sub_response_tx);
294 }
295 None => {
296 let txs = vec![sub_response_tx];
297 events_sub_map.insert(key, txs);
298 }
299 };
300 }
301 SubscriptionMessage::UnsubscribeEvents {
302 key,
303 sub_response_tx,
304 } => {
305 let mut events_sub_map = indexer.events_sub_map.lock().unwrap();
306 match events_sub_map.get_mut(&key) {
307 Some(txs) => {
308 txs.retain(|value| !sub_response_tx.same_channel(value));
309 }
310 None => {}
311 };
312 }
313 };
314}
315
316pub async fn substrate_index<R: RuntimeIndexer>(
317 trees: Trees<<R::ChainKey as IndexKey>::ChainTrees>,
318 api: OnlineClient<R::RuntimeConfig>,
319 rpc: LegacyRpcMethods<R::RuntimeConfig>,
320 queue_depth: u32,
321 index_variant: bool,
322 mut exit_rx: watch::Receiver<bool>,
323 mut sub_rx: mpsc::UnboundedReceiver<SubscriptionMessage<R::ChainKey>>,
324) -> Result<(), IndexError> {
325 info!(
326 "📇 Event variant indexing: {}",
327 match index_variant {
328 false => "disabled",
329 true => "enabled",
330 },
331 );
332 let mut blocks_sub = api.blocks().subscribe_finalized().await?;
334 let mut next_batch_block: u32 = blocks_sub
336 .next()
337 .await
338 .ok_or(IndexError::BlockNotFound(0))??
339 .number()
340 .into()
341 .try_into()
342 .unwrap();
343 info!(
344 "📚 Indexing backwards from #{}",
345 next_batch_block.to_formatted_string(&Locale::en)
346 );
347 let mut spans = load_spans::<R>(&trees.span, index_variant)?;
349 let mut current_span = if let Some(span) = spans.last()
351 && span.end == next_batch_block
352 {
353 let span = span.clone();
354 let skipped = span.end - span.start + 1;
355 info!(
356 "📚 Skipping {} blocks from #{} to #{}",
357 skipped.to_formatted_string(&Locale::en),
358 span.start.to_formatted_string(&Locale::en),
359 span.end.to_formatted_string(&Locale::en),
360 );
361 trees.span.remove(span.end.to_be_bytes())?;
363 spans.pop();
364 next_batch_block = span.start - 1;
365 span
366 } else {
367 Span {
368 start: next_batch_block + 1,
369 end: next_batch_block + 1,
370 }
371 };
372
373 let indexer = Indexer::<R>::new(trees.clone(), api, rpc, index_variant);
374
375 let mut head_future = Box::pin(indexer.index_head(blocks_sub.next()));
376
377 info!("📚 Queue depth: {}", queue_depth);
378 let mut futures = Vec::with_capacity(queue_depth.try_into().unwrap());
379
380 for _ in 0..queue_depth {
381 check_next_batch_block(&spans, &mut next_batch_block);
382 futures.push(Box::pin(indexer.index_block(next_batch_block)));
383 debug!(
384 "⬆️ Block #{} queued.",
385 next_batch_block.to_formatted_string(&Locale::en)
386 );
387 next_batch_block -= 1;
388 }
389
390 let mut orphans: AHashMap<u32, ()> = AHashMap::new();
391
392 let mut stats_block_count = 0;
393 let mut stats_event_count = 0;
394 let mut stats_key_count = 0;
395 let mut stats_start_time = Instant::now();
396
397 let interval_duration = Duration::from_millis(2000);
398 let mut interval = time::interval_at(Instant::now() + interval_duration, interval_duration);
399 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
400
401 let mut is_batching = true;
402
403 loop {
404 tokio::select! {
405 biased;
406
407 _ = exit_rx.changed() => {
408 if current_span.start != current_span.end {
409 let value = SpanDbValue {
410 start: current_span.start.into(),
411 version: (R::get_versions().len() - 1).try_into().unwrap(),
412 index_variant: index_variant.into(),
413 };
414 trees.span.insert(current_span.end.to_be_bytes(), value.as_bytes())?;
415 info!(
416 "📚 Recording current indexed span from #{} to #{}",
417 current_span.start.to_formatted_string(&Locale::en),
418 current_span.end.to_formatted_string(&Locale::en)
419 );
420 }
421 return Ok(());
422 }
423 Some(msg) = sub_rx.recv() => process_sub_msg(&indexer, msg),
424 result = &mut head_future => {
425 match result {
426 Ok((block_number, event_count, key_count)) => {
427 trees.span.remove(current_span.end.to_be_bytes())?;
428 current_span.end = block_number;
429 let value = SpanDbValue {
430 start: current_span.start.into(),
431 version: (R::get_versions().len() - 1).try_into().unwrap(),
432 index_variant: index_variant.into(),
433 };
434 trees.span.insert(current_span.end.to_be_bytes(), value.as_bytes())?;
435 info!(
436 "✨ #{}: {} events, {} keys",
437 block_number.to_formatted_string(&Locale::en),
438 event_count.to_formatted_string(&Locale::en),
439 key_count.to_formatted_string(&Locale::en),
440 );
441 indexer.notify_status_subscribers();
442 drop(head_future);
443 head_future = Box::pin(indexer.index_head(blocks_sub.next()));
444 },
445 Err(error) => {
446 match error {
447 IndexError::BlockNotFound(block_number) => {
448 error!("✨ Block not found #{}", block_number.to_formatted_string(&Locale::en));
449 },
450 err => {
451 error!("✨ Indexing failed: {}", err);
452 },
453 }
454 },
455 };
456 }
457 _ = interval.tick(), if is_batching => {
458 let current_time = Instant::now();
459 let duration = (current_time.duration_since(stats_start_time)).as_micros();
460 if duration != 0 {
461 info!(
462 "📚 #{}: {} blocks/sec, {} events/sec, {} keys/sec",
463 current_span.start.to_formatted_string(&Locale::en),
464 (<u32 as Into<u128>>::into(stats_block_count) * 1_000_000 / duration).to_formatted_string(&Locale::en),
465 (<u32 as Into<u128>>::into(stats_event_count) * 1_000_000 / duration).to_formatted_string(&Locale::en),
466 (<u32 as Into<u128>>::into(stats_key_count) * 1_000_000 / duration).to_formatted_string(&Locale::en),
467 );
468 }
469 stats_block_count = 0;
470 stats_event_count = 0;
471 stats_key_count = 0;
472 stats_start_time = current_time;
473 }
474 (result, index, _) = future::select_all(&mut futures), if is_batching => {
475 match result {
476 Ok((block_number, event_count, key_count)) => {
477 if block_number == current_span.start - 1 {
479 current_span.start = block_number;
480 debug!("⬇️ Block #{} indexed.", block_number.to_formatted_string(&Locale::en));
481 check_span(&trees.span, &mut spans, &mut current_span)?;
482 while orphans.contains_key(&(current_span.start - 1)) {
484 current_span.start -= 1;
485 orphans.remove(¤t_span.start);
486 debug!("➡️ Block #{} unorphaned.", current_span.start.to_formatted_string(&Locale::en));
487 check_span(&trees.span, &mut spans, &mut current_span)?;
488 }
489 }
490 else {
491 orphans.insert(block_number, ());
492 debug!("⬇️ Block #{} indexed and orphaned.", block_number.to_formatted_string(&Locale::en));
493 }
494 stats_block_count += 1;
495 stats_event_count += event_count;
496 stats_key_count += key_count;
497 },
498 Err(error) => {
499 match error {
500 IndexError::BlockNotFound(block_number) => {
501 error!("📚 Block not found #{}", block_number.to_formatted_string(&Locale::en));
502 is_batching = false;
503 },
504 _ => {
505 error!("📚 Batch indexing failed: {:?}", error);
506 is_batching = false;
507 },
508 }
509 }
510 }
511 check_next_batch_block(&spans, &mut next_batch_block);
512 futures[index] = Box::pin(indexer.index_block(next_batch_block));
513 debug!("⬆️ Block #{} queued.", next_batch_block.to_formatted_string(&Locale::en));
514 next_batch_block -= 1;
515 }
516 }
517 }
518}