1use byteorder::BigEndian;
2use serde::{Deserialize, Serialize};
3use sled::{Db, Tree};
4use std::fmt;
5use std::hash::Hash;
6use tokio::sync::mpsc::UnboundedSender;
7use tokio_tungstenite::tungstenite;
8use zerocopy::{
9 byteorder::{U16, U32},
10 AsBytes,
11};
12use zerocopy_derive::{AsBytes, FromBytes, FromZeroes, Unaligned};
13
14#[derive(thiserror::Error, Debug)]
16pub enum IndexError {
17 #[error("database error")]
18 Sled(#[from] sled::Error),
19 #[error("connection error")]
20 Subxt(#[from] subxt::Error),
21 #[error("connection error")]
22 Tungstenite(#[from] tungstenite::Error),
23 #[error("parse error")]
24 Hex(#[from] hex::FromHexError),
25 #[error("parse error")]
26 ParseError,
27 #[error("connection error")]
28 BlockNotFound(u32),
29}
30
31pub trait RuntimeIndexer {
33 type RuntimeConfig: subxt::Config;
34 type ChainKey: IndexKey
35 + Serialize
36 + for<'a> Deserialize<'a>
37 + Clone
38 + Eq
39 + PartialEq
40 + Hash
41 + Send;
42
43 fn get_name() -> &'static str;
44
45 fn get_genesis_hash() -> <Self::RuntimeConfig as subxt::Config>::Hash;
46
47 fn get_versions() -> &'static [u32];
48
49 fn get_default_url() -> &'static str;
50
51 fn process_event(
52 indexer: &crate::Indexer<Self>,
53 block_number: u32,
54 event_index: u16,
55 event: subxt::events::EventDetails<Self::RuntimeConfig>,
56 ) -> Result<u32, IndexError>;
57}
58
59pub trait IndexTrees {
60 fn open(db: &Db) -> Result<Self, sled::Error>
61 where
62 Self: Sized;
63 fn flush(&self) -> Result<(), sled::Error>;
64}
65
66#[derive(Clone)]
68pub struct SubstrateTrees {
69 pub account_id: Tree,
70 pub account_index: Tree,
71 pub bounty_index: Tree,
72 pub era_index: Tree,
73 pub message_id: Tree,
74 pub pool_id: Tree,
75 pub preimage_hash: Tree,
76 pub proposal_hash: Tree,
77 pub proposal_index: Tree,
78 pub ref_index: Tree,
79 pub registrar_index: Tree,
80 pub session_index: Tree,
81 pub tip_hash: Tree,
82}
83
84impl SubstrateTrees {
85 pub fn open(db: &Db) -> Result<Self, sled::Error> {
86 Ok(SubstrateTrees {
87 account_id: db.open_tree(b"account_id")?,
88 account_index: db.open_tree(b"account_index")?,
89 bounty_index: db.open_tree(b"bounty_index")?,
90 era_index: db.open_tree(b"era_index")?,
91 message_id: db.open_tree(b"message_id")?,
92 pool_id: db.open_tree(b"pool_id")?,
93 preimage_hash: db.open_tree(b"preimage_hash")?,
94 proposal_hash: db.open_tree(b"proposal_hash")?,
95 proposal_index: db.open_tree(b"proposal_index")?,
96 ref_index: db.open_tree(b"ref_index")?,
97 registrar_index: db.open_tree(b"registrar_index")?,
98 session_index: db.open_tree(b"session_index")?,
99 tip_hash: db.open_tree(b"tip_hash")?,
100 })
101 }
102
103 pub fn flush(&self) -> Result<(), sled::Error> {
104 self.account_id.flush()?;
105 self.account_index.flush()?;
106 self.bounty_index.flush()?;
107 self.era_index.flush()?;
108 self.message_id.flush()?;
109 self.pool_id.flush()?;
110 self.preimage_hash.flush()?;
111 self.proposal_hash.flush()?;
112 self.proposal_index.flush()?;
113 self.ref_index.flush()?;
114 self.registrar_index.flush()?;
115 self.session_index.flush()?;
116 self.tip_hash.flush()?;
117 Ok(())
118 }
119}
120
121#[derive(Clone)]
123pub struct Trees<CT> {
124 pub root: sled::Db,
125 pub span: Tree,
126 pub variant: Tree,
127 pub substrate: SubstrateTrees,
128 pub chain: CT,
129}
130
131#[derive(FromZeroes, FromBytes, AsBytes, Unaligned, PartialEq, Debug)]
133#[repr(C)]
134pub struct VariantKey {
135 pub pallet_index: u8,
136 pub variant_index: u8,
137 pub block_number: U32<BigEndian>,
138 pub event_index: U16<BigEndian>,
139}
140
141#[derive(FromZeroes, FromBytes, AsBytes, Unaligned, PartialEq, Debug)]
143#[repr(C)]
144pub struct Bytes32Key {
145 pub key: [u8; 32],
146 pub block_number: U32<BigEndian>,
147 pub event_index: U16<BigEndian>,
148}
149
150#[derive(FromZeroes, FromBytes, AsBytes, Unaligned, PartialEq, Debug)]
152#[repr(C)]
153pub struct U32Key {
154 pub key: U32<BigEndian>,
155 pub block_number: U32<BigEndian>,
156 pub event_index: U16<BigEndian>,
157}
158
159#[derive(Copy, Clone, Debug, PartialEq, Hash, Eq)]
161pub struct Bytes32(pub [u8; 32]);
162
163impl AsRef<[u8; 32]> for Bytes32 {
164 fn as_ref(&self) -> &[u8; 32] {
165 &self.0
166 }
167}
168
169impl From<[u8; 32]> for Bytes32 {
170 fn from(x: [u8; 32]) -> Self {
171 Bytes32(x)
172 }
173}
174
175impl AsRef<[u8]> for Bytes32 {
176 fn as_ref(&self) -> &[u8] {
177 &self.0[..]
178 }
179}
180
181impl Serialize for Bytes32 {
182 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
183 where
184 S: serde::Serializer,
185 {
186 let mut hex_string = "0x".to_owned();
187 hex_string.push_str(&hex::encode(self.0));
188 serializer.serialize_str(&hex_string)
189 }
190}
191
192impl<'de> Deserialize<'de> for Bytes32 {
193 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
194 where
195 D: serde::Deserializer<'de>,
196 {
197 match String::deserialize(deserializer)?.get(2..66) {
198 Some(message_id) => match hex::decode(message_id) {
199 Ok(message_id) => Ok(Bytes32(message_id.try_into().unwrap())),
200 Err(_error) => Err(serde::de::Error::custom("error")),
201 },
202 None => Err(serde::de::Error::custom("error")),
203 }
204 }
205}
206
207impl std::str::FromStr for Bytes32 {
208 type Err = IndexError;
209
210 fn from_str(s: &str) -> Result<Self, Self::Err> {
211 Ok(Bytes32(
212 hex::decode(s)?
213 .try_into()
214 .map_err(|_| IndexError::ParseError)?,
215 ))
216 }
217}
218
219#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Hash)]
221#[serde(tag = "type", content = "value")]
222pub enum SubstrateKey {
223 AccountId(Bytes32),
224 AccountIndex(u32),
225 BountyIndex(u32),
226 EraIndex(u32),
227 MessageId(Bytes32),
228 PoolId(u32),
229 PreimageHash(Bytes32),
230 ProposalHash(Bytes32),
231 ProposalIndex(u32),
232 RefIndex(u32),
233 RegistrarIndex(u32),
234 SessionIndex(u32),
235 TipHash(Bytes32),
236}
237
238impl SubstrateKey {
239 pub fn write_db_key(
240 &self,
241 trees: &SubstrateTrees,
242 block_number: u32,
243 event_index: u16,
244 ) -> Result<(), sled::Error> {
245 let block_number = block_number.into();
246 let event_index = event_index.into();
247 match self {
248 SubstrateKey::AccountId(account_id) => {
249 let key = Bytes32Key {
250 key: account_id.0,
251 block_number,
252 event_index,
253 };
254 trees.account_id.insert(key.as_bytes(), &[])?
255 }
256 SubstrateKey::AccountIndex(account_index) => {
257 let key = U32Key {
258 key: (*account_index).into(),
259 block_number,
260 event_index,
261 };
262 trees.account_index.insert(key.as_bytes(), &[])?
263 }
264 SubstrateKey::BountyIndex(bounty_index) => {
265 let key = U32Key {
266 key: (*bounty_index).into(),
267 block_number,
268 event_index,
269 };
270 trees.bounty_index.insert(key.as_bytes(), &[])?
271 }
272 SubstrateKey::EraIndex(era_index) => {
273 let key = U32Key {
274 key: (*era_index).into(),
275 block_number,
276 event_index,
277 };
278 trees.era_index.insert(key.as_bytes(), &[])?
279 }
280 SubstrateKey::MessageId(message_id) => {
281 let key = Bytes32Key {
282 key: message_id.0,
283 block_number,
284 event_index,
285 };
286 trees.message_id.insert(key.as_bytes(), &[])?
287 }
288 SubstrateKey::PoolId(pool_id) => {
289 let key = U32Key {
290 key: (*pool_id).into(),
291 block_number,
292 event_index,
293 };
294 trees.pool_id.insert(key.as_bytes(), &[])?
295 }
296 SubstrateKey::PreimageHash(preimage_hash) => {
297 let key = Bytes32Key {
298 key: preimage_hash.0,
299 block_number,
300 event_index,
301 };
302 trees.preimage_hash.insert(key.as_bytes(), &[])?
303 }
304 SubstrateKey::ProposalHash(proposal_hash) => {
305 let key = Bytes32Key {
306 key: proposal_hash.0,
307 block_number,
308 event_index,
309 };
310 trees.proposal_hash.insert(key.as_bytes(), &[])?
311 }
312 SubstrateKey::ProposalIndex(proposal_index) => {
313 let key = U32Key {
314 key: (*proposal_index).into(),
315 block_number,
316 event_index,
317 };
318 trees.proposal_index.insert(key.as_bytes(), &[])?
319 }
320 SubstrateKey::RefIndex(ref_index) => {
321 let key = U32Key {
322 key: (*ref_index).into(),
323 block_number,
324 event_index,
325 };
326 trees.ref_index.insert(key.as_bytes(), &[])?
327 }
328 SubstrateKey::RegistrarIndex(registrar_index) => {
329 let key = U32Key {
330 key: (*registrar_index).into(),
331 block_number,
332 event_index,
333 };
334 trees.registrar_index.insert(key.as_bytes(), &[])?
335 }
336 SubstrateKey::SessionIndex(session_index) => {
337 let key = U32Key {
338 key: (*session_index).into(),
339 block_number,
340 event_index,
341 };
342 trees.session_index.insert(key.as_bytes(), &[])?
343 }
344 SubstrateKey::TipHash(tip_hash) => {
345 let key = Bytes32Key {
346 key: tip_hash.0,
347 block_number,
348 event_index,
349 };
350 trees.tip_hash.insert(key.as_bytes(), &[])?
351 }
352 };
353 Ok(())
354 }
355}
356
357pub trait IndexKey {
358 type ChainTrees: IndexTrees + Send + Sync + Clone;
359
360 fn write_db_key(
361 &self,
362 trees: &Self::ChainTrees,
363 block_number: u32,
364 event_index: u16,
365 ) -> Result<(), sled::Error>;
366
367 fn get_key_events(&self, trees: &Self::ChainTrees) -> Vec<Event>;
368}
369
370#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Hash)]
372#[serde(tag = "type", content = "value")]
373pub enum Key<CK: IndexKey> {
374 Variant(u8, u8),
375 Substrate(SubstrateKey),
376 Chain(CK),
377}
378
379impl<CK: IndexKey> Key<CK> {
380 pub fn write_db_key(
381 &self,
382 trees: &Trees<CK::ChainTrees>,
383 block_number: u32,
384 event_index: u16,
385 ) -> Result<(), sled::Error> {
386 match self {
387 Key::Variant(pallet_index, variant_index) => {
388 let key = VariantKey {
389 pallet_index: *pallet_index,
390 variant_index: *variant_index,
391 block_number: block_number.into(),
392 event_index: event_index.into(),
393 };
394 trees.variant.insert(key.as_bytes(), &[])?;
395 }
396 Key::Substrate(substrate_key) => {
397 substrate_key.write_db_key(&trees.substrate, block_number, event_index)?;
398 }
399 Key::Chain(chain_key) => {
400 chain_key.write_db_key(&trees.chain, block_number, event_index)?;
401 }
402 };
403 Ok(())
404 }
405}
406
407#[derive(Deserialize, Debug, Clone)]
409#[serde(tag = "type")]
410pub enum RequestMessage<CK: IndexKey> {
411 Status,
412 SubscribeStatus,
413 UnsubscribeStatus,
414 Variants,
415 GetEvents { key: Key<CK> },
416 SubscribeEvents { key: Key<CK> },
417 UnsubscribeEvents { key: Key<CK> },
418 SizeOnDisk,
419}
420
421#[derive(Serialize, Debug, Clone, Deserialize, PartialEq)]
423#[serde(rename_all = "camelCase")]
424pub struct Event {
425 pub block_number: u32,
426 pub event_index: u16,
427}
428
429impl fmt::Display for Event {
430 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
431 write!(
432 f,
433 "block number: {}, event index: {}",
434 self.block_number, self.event_index
435 )
436 }
437}
438
439#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
441pub struct EventMeta {
442 pub index: u8,
443 pub name: String,
444}
445
446#[derive(Serialize, Debug, Clone, Deserialize, PartialEq)]
448pub struct PalletMeta {
449 pub index: u8,
450 pub name: String,
451 pub events: Vec<EventMeta>,
452}
453
454#[derive(FromZeroes, FromBytes, AsBytes, Unaligned, PartialEq, Debug)]
456#[repr(C)]
457pub struct SpanDbValue {
458 pub start: U32<BigEndian>,
459 pub version: U16<BigEndian>,
460 pub index_variant: u8,
461}
462
463#[derive(Serialize, Debug, Clone, PartialEq, Deserialize)]
465pub struct Span {
466 pub start: u32,
467 pub end: u32,
468}
469
470impl fmt::Display for Span {
471 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
472 write!(f, "start: {}, end: {}", self.start, self.end)
473 }
474}
475
476#[derive(Serialize, Debug, Clone)]
478#[serde(tag = "type", content = "data")]
479#[serde(rename_all = "camelCase")]
480pub enum ResponseMessage<CK: IndexKey> {
481 Status(Vec<Span>),
482 Variants(Vec<PalletMeta>),
483 Events { key: Key<CK>, events: Vec<Event> },
484 Subscribed,
485 Unsubscribed,
486 SizeOnDisk(u64),
487 }
489
490#[derive(Debug)]
492pub enum SubscriptionMessage<CK: IndexKey> {
493 SubscribeStatus {
494 sub_response_tx: UnboundedSender<ResponseMessage<CK>>,
495 },
496 UnsubscribeStatus {
497 sub_response_tx: UnboundedSender<ResponseMessage<CK>>,
498 },
499 SubscribeEvents {
500 key: Key<CK>,
501 sub_response_tx: UnboundedSender<ResponseMessage<CK>>,
502 },
503 UnsubscribeEvents {
504 key: Key<CK>,
505 sub_response_tx: UnboundedSender<ResponseMessage<CK>>,
506 },
507}