hybrid_indexer/
lib.rs

1//! # Hybrid Indexer
2//!
3//! A library for indexing events from Substrate blockchains.
4
5#![feature(let_chains)]
6use byte_unit::Byte;
7use futures::StreamExt;
8use signal_hook::{consts::TERM_SIGNALS, flag};
9use signal_hook_tokio::Signals;
10use std::{
11    path::PathBuf,
12    process::exit,
13    sync::{atomic::AtomicBool, Arc},
14};
15use subxt::{
16    backend::{legacy::LegacyRpcMethods, rpc::RpcClient},
17    OnlineClient,
18};
19use tokio::{
20    join, spawn,
21    sync::{mpsc, watch},
22};
23use tracing::{error, info};
24use tracing_subscriber::filter::LevelFilter;
25
26pub mod shared;
27pub mod substrate;
28pub mod substrate_pallets;
29pub mod websockets;
30
31use crate::shared::*;
32use substrate::*;
33use websockets::websockets_listen;
34
35#[cfg(test)]
36mod tests;
37
38pub fn open_trees<R: RuntimeIndexer>(
39    db_config: sled::Config,
40) -> Result<Trees<<R::ChainKey as IndexKey>::ChainTrees>, sled::Error> {
41    let db = db_config.open()?;
42    let trees = Trees {
43        root: db.clone(),
44        span: db.open_tree(b"span")?,
45        variant: db.open_tree(b"variant")?,
46        // Each event parameter to be indexed has its own tree.
47        substrate: SubstrateTrees::open(&db)?,
48        chain: <R::ChainKey as IndexKey>::ChainTrees::open(&db)?,
49    };
50    Ok(trees)
51}
52
53pub fn close_trees<R: RuntimeIndexer>(
54    trees: Trees<<R::ChainKey as IndexKey>::ChainTrees>,
55) -> Result<(), sled::Error> {
56    info!("Closing db.");
57    trees.root.flush()?;
58    trees.span.flush()?;
59    trees.variant.flush()?;
60    trees.substrate.flush()?;
61    Ok(())
62}
63
64/// Starts the indexer. Chain is defined by `R`.
65#[allow(clippy::too_many_arguments)]
66pub async fn start<R: RuntimeIndexer + 'static>(
67    db_path: Option<String>,
68    db_mode: sled::Mode,
69    db_cache_capacity: u64,
70    url: Option<String>,
71    queue_depth: u8,
72    index_variant: bool,
73    port: u16,
74    log_level: LevelFilter,
75) {
76    tracing_subscriber::fmt().with_max_level(log_level).init();
77    let name = R::get_name();
78    info!("Indexing {}", name);
79    let genesis_hash_config = R::get_genesis_hash().as_ref().to_vec();
80    // Open database.
81    let db_path = match db_path {
82        Some(db_path) => PathBuf::from(db_path),
83        None => match home::home_dir() {
84            Some(mut db_path) => {
85                db_path.push(".local/share/hybrid-indexer");
86                db_path.push(name);
87                db_path.push("db");
88                db_path
89            }
90            None => {
91                error!("No home directory.");
92                exit(1);
93            }
94        },
95    };
96    info!("Database path: {}", db_path.display());
97    info!("Database mode: {:?}", db_mode);
98    info!(
99        "Database cache capacity: {}",
100        Byte::from_bytes(db_cache_capacity.into()).get_appropriate_unit(true)
101    );
102    let db_config = sled::Config::new()
103        .path(db_path)
104        .mode(db_mode)
105        .cache_capacity(db_cache_capacity);
106    let trees = match open_trees::<R>(db_config) {
107        Ok(trees) => trees,
108        Err(_) => {
109            error!("Failed to open database.");
110            exit(1);
111        }
112    };
113    let genesis_hash_db = match trees.root.get("genesis_hash").unwrap() {
114        Some(value) => value.to_vec(),
115        //    vector_as_u8_32_array(&value),
116        None => {
117            trees
118                .root
119                .insert("genesis_hash", genesis_hash_config.clone())
120                .unwrap();
121            genesis_hash_config.clone()
122        }
123    };
124
125    if genesis_hash_db != genesis_hash_config {
126        error!("Database has wrong genesis hash.");
127        error!("Correct hash:  0x{}", hex::encode(genesis_hash_config));
128        error!("Database hash: 0x{}", hex::encode(genesis_hash_db));
129        let _ = close_trees::<R>(trees);
130        exit(1);
131    }
132    // Determine url of Substrate node to connect to.
133    let url = match url {
134        Some(url) => url,
135        None => R::get_default_url().to_owned(),
136    };
137    info!("Connecting to: {}", url);
138    let rpc_client = match RpcClient::from_url(&url).await {
139        Ok(rpc_client) => rpc_client,
140        Err(err) => {
141            error!("Failed to connect: {}", err);
142            let _ = close_trees::<R>(trees);
143            exit(1);
144        }
145    };
146    let api = match OnlineClient::<R::RuntimeConfig>::from_rpc_client(rpc_client.clone()).await {
147        Ok(api) => api,
148        Err(err) => {
149            error!("Failed to connect: {}", err);
150            let _ = close_trees::<R>(trees);
151            exit(1);
152        }
153    };
154    let rpc = LegacyRpcMethods::<R::RuntimeConfig>::new(rpc_client);
155
156    let genesis_hash_api = api.genesis_hash().as_ref().to_vec();
157
158    if genesis_hash_api != genesis_hash_config {
159        error!("Chain has wrong genesis hash.");
160        error!("Correct hash: 0x{}", hex::encode(genesis_hash_config));
161        error!("Chain hash:   0x{}", hex::encode(genesis_hash_api));
162        let _ = close_trees::<R>(trees);
163        exit(1);
164    }
165    // https://docs.rs/signal-hook/0.3.17/signal_hook/#a-complex-signal-handling-with-a-background-thread
166    // Make sure double CTRL+C and similar kills.
167    let term_now = Arc::new(AtomicBool::new(false));
168    for sig in TERM_SIGNALS {
169        // When terminated by a second term signal, exit with exit code 1.
170        // This will do nothing the first time (because term_now is false).
171        flag::register_conditional_shutdown(*sig, 1, Arc::clone(&term_now)).unwrap();
172        // But this will "arm" the above for the second time, by setting it to true.
173        // The order of registering these is important, if you put this one first, it will
174        // first arm and then terminate ‒ all in the first round.
175        flag::register(*sig, Arc::clone(&term_now)).unwrap();
176    }
177    // Create a watch channel to exit the program.
178    let (exit_tx, exit_rx) = watch::channel(false);
179    // Create the channel for the websockets threads to send subscribe messages to the head thread.
180    let (sub_tx, sub_rx) = mpsc::unbounded_channel();
181    // Start indexer thread.
182    let substrate_index = spawn(substrate_index::<R>(
183        trees.clone(),
184        api.clone(),
185        rpc.clone(),
186        queue_depth.into(),
187        index_variant,
188        exit_rx.clone(),
189        sub_rx,
190    ));
191    // Spawn websockets task.
192    let websockets_task = spawn(websockets_listen::<R>(
193        trees.clone(),
194        rpc,
195        port,
196        exit_rx,
197        sub_tx,
198    ));
199    // Wait for signal.
200    let mut signals = Signals::new(TERM_SIGNALS).unwrap();
201    signals.next().await;
202    info!("Exiting.");
203    let _ = exit_tx.send(true);
204    // Wait to exit.
205    let _result = join!(substrate_index, websockets_task);
206    // Close db.
207    let _ = close_trees::<R>(trees);
208    exit(0);
209}