hybrid_indexer/
substrate.rs

1use ahash::AHashMap;
2use futures::future;
3use num_format::{Locale, ToFormattedString};
4use sled::Tree;
5use std::{collections::HashMap, future::Future, sync::Mutex};
6use subxt::{backend::legacy::LegacyRpcMethods, blocks::Block, metadata::Metadata, OnlineClient};
7use tokio::{
8    sync::{mpsc, watch, RwLock},
9    time::{self, Duration, Instant, MissedTickBehavior},
10};
11use tracing::{debug, error, info};
12use zerocopy::{AsBytes, FromBytes};
13
14use crate::{shared::*, websockets::process_msg_status};
15
16#[allow(clippy::type_complexity)]
17pub struct Indexer<R: RuntimeIndexer + ?Sized> {
18    trees: Trees<<R::ChainKey as IndexKey>::ChainTrees>,
19    api: Option<OnlineClient<R::RuntimeConfig>>,
20    rpc: Option<LegacyRpcMethods<R::RuntimeConfig>>,
21    index_variant: bool,
22    metadata_map_lock: RwLock<AHashMap<u32, Metadata>>,
23    status_sub: Mutex<Vec<mpsc::UnboundedSender<ResponseMessage<R::ChainKey>>>>,
24    events_sub_map:
25        Mutex<HashMap<Key<R::ChainKey>, Vec<mpsc::UnboundedSender<ResponseMessage<R::ChainKey>>>>>,
26}
27
28impl<R: RuntimeIndexer> Indexer<R> {
29    fn new(
30        trees: Trees<<R::ChainKey as IndexKey>::ChainTrees>,
31        api: OnlineClient<R::RuntimeConfig>,
32        rpc: LegacyRpcMethods<R::RuntimeConfig>,
33        index_variant: bool,
34    ) -> Self {
35        Indexer {
36            trees,
37            api: Some(api),
38            rpc: Some(rpc),
39            index_variant,
40            metadata_map_lock: RwLock::new(AHashMap::new()),
41            status_sub: Vec::new().into(),
42            events_sub_map: HashMap::new().into(),
43        }
44    }
45
46    pub fn new_test(trees: Trees<<R::ChainKey as IndexKey>::ChainTrees>) -> Self {
47        Indexer {
48            trees,
49            api: None,
50            rpc: None,
51            index_variant: true,
52            metadata_map_lock: RwLock::new(AHashMap::new()),
53            status_sub: Vec::new().into(),
54            events_sub_map: HashMap::new().into(),
55        }
56    }
57
58    async fn index_head(
59        &self,
60        next: impl Future<
61            Output = Option<
62                Result<Block<R::RuntimeConfig, OnlineClient<R::RuntimeConfig>>, subxt::Error>,
63            >,
64        >,
65    ) -> Result<(u32, u32, u32), IndexError> {
66        let block = next.await.unwrap()?;
67        self.index_block(block.number().into().try_into().unwrap())
68            .await
69    }
70
71    async fn index_block(&self, block_number: u32) -> Result<(u32, u32, u32), IndexError> {
72        let mut key_count = 0;
73        let api = self.api.as_ref().unwrap();
74        let rpc = self.rpc.as_ref().unwrap();
75
76        let block_hash = match rpc.chain_get_block_hash(Some(block_number.into())).await? {
77            Some(block_hash) => block_hash,
78            None => return Err(IndexError::BlockNotFound(block_number)),
79        };
80        // Get the runtime version of the block.
81        let runtime_version = rpc.state_get_runtime_version(Some(block_hash)).await?;
82
83        let metadata_map = self.metadata_map_lock.read().await;
84        let metadata = match metadata_map.get(&runtime_version.spec_version) {
85            Some(metadata) => {
86                let metadata = metadata.clone();
87                drop(metadata_map);
88                metadata
89            }
90            None => {
91                drop(metadata_map);
92                let mut metadata_map = self.metadata_map_lock.write().await;
93
94                match metadata_map.get(&runtime_version.spec_version) {
95                    Some(metadata) => metadata.clone(),
96                    None => {
97                        info!(
98                            "Downloading metadata for spec version {}",
99                            runtime_version.spec_version
100                        );
101                        let metadata = rpc.state_get_metadata(Some(block_hash)).await?;
102                        info!(
103                            "Finished downloading metadata for spec version {}",
104                            runtime_version.spec_version
105                        );
106                        metadata_map.insert(runtime_version.spec_version, metadata.clone());
107                        metadata
108                    }
109                }
110            }
111        };
112
113        let events =
114            subxt::events::Events::new_from_client(metadata, block_hash, api.clone()).await?;
115
116        for (i, event) in events.iter().enumerate() {
117            match event {
118                Ok(event) => {
119                    let event_index = i.try_into().unwrap();
120                    if self.index_variant {
121                        self.index_event(
122                            Key::Variant(event.pallet_index(), event.variant_index()),
123                            block_number,
124                            event_index,
125                        )?;
126                        key_count += 1;
127                    }
128                    if let Ok(event_key_count) =
129                        R::process_event(self, block_number, event_index, event)
130                    {
131                        key_count += event_key_count;
132                    }
133                }
134                Err(error) => error!("Block: {}, error: {}", block_number, error),
135            }
136        }
137
138        Ok((block_number, events.len(), key_count))
139    }
140
141    pub fn notify_status_subscribers(&self) {
142        let msg = process_msg_status::<R>(&self.trees.span);
143        let txs = self.status_sub.lock().unwrap();
144        for tx in txs.iter() {
145            if tx.send(msg.clone()).is_ok() {}
146        }
147    }
148
149    pub fn notify_subscribers(&self, search_key: Key<R::ChainKey>, event: Event) {
150        let events_sub_map = self.events_sub_map.lock().unwrap();
151        if let Some(txs) = events_sub_map.get(&search_key) {
152            let msg = ResponseMessage::Events {
153                key: search_key,
154                events: vec![event],
155            };
156            for tx in txs.iter() {
157                if tx.send(msg.clone()).is_ok() {}
158            }
159        }
160    }
161
162    pub fn index_event(
163        &self,
164        key: Key<R::ChainKey>,
165        block_number: u32,
166        event_index: u16,
167    ) -> Result<(), sled::Error> {
168        key.write_db_key(&self.trees, block_number, event_index)?;
169        self.notify_subscribers(
170            key,
171            Event {
172                block_number,
173                event_index,
174            },
175        );
176        Ok(())
177    }
178}
179
180pub fn load_spans<R: RuntimeIndexer>(
181    span_db: &Tree,
182    index_variant: bool,
183) -> Result<Vec<Span>, IndexError> {
184    let mut spans = vec![];
185    'span: for (key, value) in span_db.into_iter().flatten() {
186        let span_value = SpanDbValue::read_from(&value).unwrap();
187        let start: u32 = span_value.start.into();
188        let mut end: u32 = u32::from_be_bytes(key.as_ref().try_into().unwrap());
189        // Check if variants are supposed to be indexed and they were not in this span.
190        if index_variant && (span_value.index_variant != 1) {
191            // Delete the span.
192            span_db.remove(key)?;
193            info!(
194                "📚 Re-indexing span of blocks from #{} to #{}.",
195                start.to_formatted_string(&Locale::en),
196                end.to_formatted_string(&Locale::en)
197            );
198            info!("📚 Reason: event variants not indexed.");
199            continue;
200        }
201        let span_version: u16 = span_value.version.into();
202        // Loop through each indexer version.
203        for (version, block_number) in R::get_versions().iter().enumerate() {
204            if span_version < version.try_into().unwrap() && end >= *block_number {
205                span_db.remove(key)?;
206                if start >= *block_number {
207                    info!(
208                        "📚 Re-indexing span of blocks from #{} to #{}.",
209                        start.to_formatted_string(&Locale::en),
210                        end.to_formatted_string(&Locale::en)
211                    );
212                    continue 'span;
213                }
214                info!(
215                    "📚 Re-indexing span of blocks from #{} to #{}.",
216                    block_number.to_formatted_string(&Locale::en),
217                    end.to_formatted_string(&Locale::en)
218                );
219                // Truncate the span.
220                end = block_number - 1;
221                span_db.insert(end.to_be_bytes(), value)?;
222                break;
223            }
224        }
225        let span = Span { start, end };
226        info!(
227            "📚 Previous span of indexed blocks from #{} to #{}.",
228            start.to_formatted_string(&Locale::en),
229            end.to_formatted_string(&Locale::en)
230        );
231        spans.push(span);
232    }
233    Ok(spans)
234}
235
236pub fn check_span(
237    span_db: &Tree,
238    spans: &mut Vec<Span>,
239    current_span: &mut Span,
240) -> Result<(), IndexError> {
241    while let Some(span) = spans.last() {
242        // Have we indexed all the blocks after the span?
243        if current_span.start > span.start && current_span.start - 1 <= span.end {
244            let skipped = span.end - span.start + 1;
245            info!(
246                "📚 Skipping {} blocks from #{} to #{}",
247                skipped.to_formatted_string(&Locale::en),
248                span.start.to_formatted_string(&Locale::en),
249                span.end.to_formatted_string(&Locale::en),
250            );
251            current_span.start = span.start;
252            // Remove the span.
253            span_db.remove(span.end.to_be_bytes())?;
254            spans.pop();
255        } else {
256            break;
257        }
258    }
259    Ok(())
260}
261
262pub fn check_next_batch_block(spans: &[Span], next_batch_block: &mut u32) {
263    // Figure out the next block to index, skipping the next span if we have reached it.
264    let mut i = spans.len();
265    while i != 0 {
266        i -= 1;
267        if *next_batch_block >= spans[i].start && *next_batch_block <= spans[i].end {
268            *next_batch_block = spans[i].start - 1;
269        }
270    }
271}
272
273pub fn process_sub_msg<R: RuntimeIndexer>(
274    indexer: &Indexer<R>,
275    msg: SubscriptionMessage<R::ChainKey>,
276) {
277    match msg {
278        SubscriptionMessage::SubscribeStatus { sub_response_tx } => {
279            let mut txs = indexer.status_sub.lock().unwrap();
280            txs.push(sub_response_tx);
281        }
282        SubscriptionMessage::UnsubscribeStatus { sub_response_tx } => {
283            let mut txs = indexer.status_sub.lock().unwrap();
284            txs.retain(|value| !sub_response_tx.same_channel(value));
285        }
286        SubscriptionMessage::SubscribeEvents {
287            key,
288            sub_response_tx,
289        } => {
290            let mut events_sub_map = indexer.events_sub_map.lock().unwrap();
291            match events_sub_map.get_mut(&key) {
292                Some(txs) => {
293                    txs.push(sub_response_tx);
294                }
295                None => {
296                    let txs = vec![sub_response_tx];
297                    events_sub_map.insert(key, txs);
298                }
299            };
300        }
301        SubscriptionMessage::UnsubscribeEvents {
302            key,
303            sub_response_tx,
304        } => {
305            let mut events_sub_map = indexer.events_sub_map.lock().unwrap();
306            match events_sub_map.get_mut(&key) {
307                Some(txs) => {
308                    txs.retain(|value| !sub_response_tx.same_channel(value));
309                }
310                None => {}
311            };
312        }
313    };
314}
315
316pub async fn substrate_index<R: RuntimeIndexer>(
317    trees: Trees<<R::ChainKey as IndexKey>::ChainTrees>,
318    api: OnlineClient<R::RuntimeConfig>,
319    rpc: LegacyRpcMethods<R::RuntimeConfig>,
320    queue_depth: u32,
321    index_variant: bool,
322    mut exit_rx: watch::Receiver<bool>,
323    mut sub_rx: mpsc::UnboundedReceiver<SubscriptionMessage<R::ChainKey>>,
324) -> Result<(), IndexError> {
325    info!(
326        "📇 Event variant indexing: {}",
327        match index_variant {
328            false => "disabled",
329            true => "enabled",
330        },
331    );
332    // Subscribe to all finalized blocks:
333    let mut blocks_sub = api.blocks().subscribe_finalized().await?;
334    // Determine the correct block to start batch indexing.
335    let mut next_batch_block: u32 = blocks_sub
336        .next()
337        .await
338        .ok_or(IndexError::BlockNotFound(0))??
339        .number()
340        .into()
341        .try_into()
342        .unwrap();
343    info!(
344        "📚 Indexing backwards from #{}",
345        next_batch_block.to_formatted_string(&Locale::en)
346    );
347    // Load already indexed spans from the db.
348    let mut spans = load_spans::<R>(&trees.span, index_variant)?;
349    // If the first head block to be indexed will be touching the last span (the indexer was restarted), set the current span to the last span. Otherwise there will be no batch block indexed to connect the current span to the last span.
350    let mut current_span = if let Some(span) = spans.last()
351        && span.end == next_batch_block
352    {
353        let span = span.clone();
354        let skipped = span.end - span.start + 1;
355        info!(
356            "📚 Skipping {} blocks from #{} to #{}",
357            skipped.to_formatted_string(&Locale::en),
358            span.start.to_formatted_string(&Locale::en),
359            span.end.to_formatted_string(&Locale::en),
360        );
361        // Remove the span.
362        trees.span.remove(span.end.to_be_bytes())?;
363        spans.pop();
364        next_batch_block = span.start - 1;
365        span
366    } else {
367        Span {
368            start: next_batch_block + 1,
369            end: next_batch_block + 1,
370        }
371    };
372
373    let indexer = Indexer::<R>::new(trees.clone(), api, rpc, index_variant);
374
375    let mut head_future = Box::pin(indexer.index_head(blocks_sub.next()));
376
377    info!("📚 Queue depth: {}", queue_depth);
378    let mut futures = Vec::with_capacity(queue_depth.try_into().unwrap());
379
380    for _ in 0..queue_depth {
381        check_next_batch_block(&spans, &mut next_batch_block);
382        futures.push(Box::pin(indexer.index_block(next_batch_block)));
383        debug!(
384            "⬆️  Block #{} queued.",
385            next_batch_block.to_formatted_string(&Locale::en)
386        );
387        next_batch_block -= 1;
388    }
389
390    let mut orphans: AHashMap<u32, ()> = AHashMap::new();
391
392    let mut stats_block_count = 0;
393    let mut stats_event_count = 0;
394    let mut stats_key_count = 0;
395    let mut stats_start_time = Instant::now();
396
397    let interval_duration = Duration::from_millis(2000);
398    let mut interval = time::interval_at(Instant::now() + interval_duration, interval_duration);
399    interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
400
401    let mut is_batching = true;
402
403    loop {
404        tokio::select! {
405            biased;
406
407            _ = exit_rx.changed() => {
408                if current_span.start != current_span.end {
409                    let value = SpanDbValue {
410                        start: current_span.start.into(),
411                        version: (R::get_versions().len() - 1).try_into().unwrap(),
412                        index_variant: index_variant.into(),
413                    };
414                    trees.span.insert(current_span.end.to_be_bytes(), value.as_bytes())?;
415                    info!(
416                        "📚 Recording current indexed span from #{} to #{}",
417                        current_span.start.to_formatted_string(&Locale::en),
418                        current_span.end.to_formatted_string(&Locale::en)
419                    );
420                }
421                return Ok(());
422            }
423            Some(msg) = sub_rx.recv() => process_sub_msg(&indexer, msg),
424            result = &mut head_future => {
425                match result {
426                    Ok((block_number, event_count, key_count)) => {
427                        trees.span.remove(current_span.end.to_be_bytes())?;
428                        current_span.end = block_number;
429                        let value = SpanDbValue {
430                            start: current_span.start.into(),
431                            version: (R::get_versions().len() - 1).try_into().unwrap(),
432                            index_variant: index_variant.into(),
433                        };
434                        trees.span.insert(current_span.end.to_be_bytes(), value.as_bytes())?;
435                        info!(
436                            "✨ #{}: {} events, {} keys",
437                            block_number.to_formatted_string(&Locale::en),
438                            event_count.to_formatted_string(&Locale::en),
439                            key_count.to_formatted_string(&Locale::en),
440                        );
441                        indexer.notify_status_subscribers();
442                        drop(head_future);
443                        head_future = Box::pin(indexer.index_head(blocks_sub.next()));
444                    },
445                    Err(error) => {
446                        match error {
447                            IndexError::BlockNotFound(block_number) => {
448                                error!("✨ Block not found #{}", block_number.to_formatted_string(&Locale::en));
449                            },
450                            err => {
451                                error!("✨ Indexing failed: {}", err);
452                            },
453                        }
454                    },
455                };
456            }
457            _ = interval.tick(), if is_batching => {
458                let current_time = Instant::now();
459                let duration = (current_time.duration_since(stats_start_time)).as_micros();
460                if duration != 0 {
461                    info!(
462                        "📚 #{}: {} blocks/sec, {} events/sec, {} keys/sec",
463                        current_span.start.to_formatted_string(&Locale::en),
464                        (<u32 as Into<u128>>::into(stats_block_count) * 1_000_000 / duration).to_formatted_string(&Locale::en),
465                        (<u32 as Into<u128>>::into(stats_event_count) * 1_000_000 / duration).to_formatted_string(&Locale::en),
466                        (<u32 as Into<u128>>::into(stats_key_count) * 1_000_000 / duration).to_formatted_string(&Locale::en),
467                    );
468                }
469                stats_block_count = 0;
470                stats_event_count = 0;
471                stats_key_count = 0;
472                stats_start_time = current_time;
473            }
474            (result, index, _) = future::select_all(&mut futures), if is_batching => {
475                match result {
476                    Ok((block_number, event_count, key_count)) => {
477                        // Is the new block contiguous to the current span or an orphan?
478                        if block_number == current_span.start - 1 {
479                            current_span.start = block_number;
480                            debug!("⬇️  Block #{} indexed.", block_number.to_formatted_string(&Locale::en));
481                            check_span(&trees.span, &mut spans, &mut current_span)?;
482                            // Check if any orphans are now contiguous.
483                            while orphans.contains_key(&(current_span.start - 1)) {
484                                current_span.start -= 1;
485                                orphans.remove(&current_span.start);
486                                debug!("➡️  Block #{} unorphaned.", current_span.start.to_formatted_string(&Locale::en));
487                                check_span(&trees.span, &mut spans, &mut current_span)?;
488                            }
489                        }
490                        else {
491                            orphans.insert(block_number, ());
492                            debug!("⬇️  Block #{} indexed and orphaned.", block_number.to_formatted_string(&Locale::en));
493                        }
494                        stats_block_count += 1;
495                        stats_event_count += event_count;
496                        stats_key_count += key_count;
497                    },
498                    Err(error) => {
499                        match error {
500                            IndexError::BlockNotFound(block_number) => {
501                                error!("📚 Block not found #{}", block_number.to_formatted_string(&Locale::en));
502                                is_batching = false;
503                            },
504                            _ => {
505                                error!("📚 Batch indexing failed: {:?}", error);
506                                is_batching = false;
507                            },
508                        }
509                    }
510                }
511                check_next_batch_block(&spans, &mut next_batch_block);
512                futures[index] = Box::pin(indexer.index_block(next_batch_block));
513                debug!("⬆️  Block #{} queued.", next_batch_block.to_formatted_string(&Locale::en));
514                next_batch_block -= 1;
515            }
516        }
517    }
518}