1#![feature(let_chains)]
6use byte_unit::Byte;
7use futures::StreamExt;
8use signal_hook::{consts::TERM_SIGNALS, flag};
9use signal_hook_tokio::Signals;
10use std::{
11 path::PathBuf,
12 process::exit,
13 sync::{atomic::AtomicBool, Arc},
14};
15use subxt::{
16 backend::{legacy::LegacyRpcMethods, rpc::RpcClient},
17 OnlineClient,
18};
19use tokio::{
20 join, spawn,
21 sync::{mpsc, watch},
22};
23use tracing::{error, info};
24use tracing_subscriber::filter::LevelFilter;
25
26pub mod shared;
27pub mod substrate;
28pub mod substrate_pallets;
29pub mod websockets;
30
31use crate::shared::*;
32use substrate::*;
33use websockets::websockets_listen;
34
35#[cfg(test)]
36mod tests;
37
38pub fn open_trees<R: RuntimeIndexer>(
39 db_config: sled::Config,
40) -> Result<Trees<<R::ChainKey as IndexKey>::ChainTrees>, sled::Error> {
41 let db = db_config.open()?;
42 let trees = Trees {
43 root: db.clone(),
44 span: db.open_tree(b"span")?,
45 variant: db.open_tree(b"variant")?,
46 substrate: SubstrateTrees::open(&db)?,
48 chain: <R::ChainKey as IndexKey>::ChainTrees::open(&db)?,
49 };
50 Ok(trees)
51}
52
53pub fn close_trees<R: RuntimeIndexer>(
54 trees: Trees<<R::ChainKey as IndexKey>::ChainTrees>,
55) -> Result<(), sled::Error> {
56 info!("Closing db.");
57 trees.root.flush()?;
58 trees.span.flush()?;
59 trees.variant.flush()?;
60 trees.substrate.flush()?;
61 Ok(())
62}
63
64#[allow(clippy::too_many_arguments)]
66pub async fn start<R: RuntimeIndexer + 'static>(
67 db_path: Option<String>,
68 db_mode: sled::Mode,
69 db_cache_capacity: u64,
70 url: Option<String>,
71 queue_depth: u8,
72 index_variant: bool,
73 port: u16,
74 log_level: LevelFilter,
75) {
76 tracing_subscriber::fmt().with_max_level(log_level).init();
77 let name = R::get_name();
78 info!("Indexing {}", name);
79 let genesis_hash_config = R::get_genesis_hash().as_ref().to_vec();
80 let db_path = match db_path {
82 Some(db_path) => PathBuf::from(db_path),
83 None => match home::home_dir() {
84 Some(mut db_path) => {
85 db_path.push(".local/share/hybrid-indexer");
86 db_path.push(name);
87 db_path.push("db");
88 db_path
89 }
90 None => {
91 error!("No home directory.");
92 exit(1);
93 }
94 },
95 };
96 info!("Database path: {}", db_path.display());
97 info!("Database mode: {:?}", db_mode);
98 info!(
99 "Database cache capacity: {}",
100 Byte::from_bytes(db_cache_capacity.into()).get_appropriate_unit(true)
101 );
102 let db_config = sled::Config::new()
103 .path(db_path)
104 .mode(db_mode)
105 .cache_capacity(db_cache_capacity);
106 let trees = match open_trees::<R>(db_config) {
107 Ok(trees) => trees,
108 Err(_) => {
109 error!("Failed to open database.");
110 exit(1);
111 }
112 };
113 let genesis_hash_db = match trees.root.get("genesis_hash").unwrap() {
114 Some(value) => value.to_vec(),
115 None => {
117 trees
118 .root
119 .insert("genesis_hash", genesis_hash_config.clone())
120 .unwrap();
121 genesis_hash_config.clone()
122 }
123 };
124
125 if genesis_hash_db != genesis_hash_config {
126 error!("Database has wrong genesis hash.");
127 error!("Correct hash: 0x{}", hex::encode(genesis_hash_config));
128 error!("Database hash: 0x{}", hex::encode(genesis_hash_db));
129 let _ = close_trees::<R>(trees);
130 exit(1);
131 }
132 let url = match url {
134 Some(url) => url,
135 None => R::get_default_url().to_owned(),
136 };
137 info!("Connecting to: {}", url);
138 let rpc_client = match RpcClient::from_url(&url).await {
139 Ok(rpc_client) => rpc_client,
140 Err(err) => {
141 error!("Failed to connect: {}", err);
142 let _ = close_trees::<R>(trees);
143 exit(1);
144 }
145 };
146 let api = match OnlineClient::<R::RuntimeConfig>::from_rpc_client(rpc_client.clone()).await {
147 Ok(api) => api,
148 Err(err) => {
149 error!("Failed to connect: {}", err);
150 let _ = close_trees::<R>(trees);
151 exit(1);
152 }
153 };
154 let rpc = LegacyRpcMethods::<R::RuntimeConfig>::new(rpc_client);
155
156 let genesis_hash_api = api.genesis_hash().as_ref().to_vec();
157
158 if genesis_hash_api != genesis_hash_config {
159 error!("Chain has wrong genesis hash.");
160 error!("Correct hash: 0x{}", hex::encode(genesis_hash_config));
161 error!("Chain hash: 0x{}", hex::encode(genesis_hash_api));
162 let _ = close_trees::<R>(trees);
163 exit(1);
164 }
165 let term_now = Arc::new(AtomicBool::new(false));
168 for sig in TERM_SIGNALS {
169 flag::register_conditional_shutdown(*sig, 1, Arc::clone(&term_now)).unwrap();
172 flag::register(*sig, Arc::clone(&term_now)).unwrap();
176 }
177 let (exit_tx, exit_rx) = watch::channel(false);
179 let (sub_tx, sub_rx) = mpsc::unbounded_channel();
181 let substrate_index = spawn(substrate_index::<R>(
183 trees.clone(),
184 api.clone(),
185 rpc.clone(),
186 queue_depth.into(),
187 index_variant,
188 exit_rx.clone(),
189 sub_rx,
190 ));
191 let websockets_task = spawn(websockets_listen::<R>(
193 trees.clone(),
194 rpc,
195 port,
196 exit_rx,
197 sub_tx,
198 ));
199 let mut signals = Signals::new(TERM_SIGNALS).unwrap();
201 signals.next().await;
202 info!("Exiting.");
203 let _ = exit_tx.send(true);
204 let _result = join!(substrate_index, websockets_task);
206 let _ = close_trees::<R>(trees);
208 exit(0);
209}