hybrid_indexer/
shared.rs

1use byteorder::BigEndian;
2use serde::{Deserialize, Serialize};
3use sled::{Db, Tree};
4use std::fmt;
5use std::hash::Hash;
6use tokio::sync::mpsc::UnboundedSender;
7use tokio_tungstenite::tungstenite;
8use zerocopy::{
9    byteorder::{U16, U32},
10    AsBytes,
11};
12use zerocopy_derive::{AsBytes, FromBytes, FromZeroes, Unaligned};
13
14/// Errors this crate can return
15#[derive(thiserror::Error, Debug)]
16pub enum IndexError {
17    #[error("database error")]
18    Sled(#[from] sled::Error),
19    #[error("connection error")]
20    Subxt(#[from] subxt::Error),
21    #[error("connection error")]
22    Tungstenite(#[from] tungstenite::Error),
23    #[error("parse error")]
24    Hex(#[from] hex::FromHexError),
25    #[error("parse error")]
26    ParseError,
27    #[error("connection error")]
28    BlockNotFound(u32),
29}
30
31/// Indexer for a specific chain
32pub trait RuntimeIndexer {
33    type RuntimeConfig: subxt::Config;
34    type ChainKey: IndexKey
35        + Serialize
36        + for<'a> Deserialize<'a>
37        + Clone
38        + Eq
39        + PartialEq
40        + Hash
41        + Send;
42
43    fn get_name() -> &'static str;
44
45    fn get_genesis_hash() -> <Self::RuntimeConfig as subxt::Config>::Hash;
46
47    fn get_versions() -> &'static [u32];
48
49    fn get_default_url() -> &'static str;
50
51    fn process_event(
52        indexer: &crate::Indexer<Self>,
53        block_number: u32,
54        event_index: u16,
55        event: subxt::events::EventDetails<Self::RuntimeConfig>,
56    ) -> Result<u32, IndexError>;
57}
58
59pub trait IndexTrees {
60    fn open(db: &Db) -> Result<Self, sled::Error>
61    where
62        Self: Sized;
63    fn flush(&self) -> Result<(), sled::Error>;
64}
65
66/// Database trees for built-in Substrate keys
67#[derive(Clone)]
68pub struct SubstrateTrees {
69    pub account_id: Tree,
70    pub account_index: Tree,
71    pub bounty_index: Tree,
72    pub era_index: Tree,
73    pub message_id: Tree,
74    pub pool_id: Tree,
75    pub preimage_hash: Tree,
76    pub proposal_hash: Tree,
77    pub proposal_index: Tree,
78    pub ref_index: Tree,
79    pub registrar_index: Tree,
80    pub session_index: Tree,
81    pub tip_hash: Tree,
82}
83
84impl SubstrateTrees {
85    pub fn open(db: &Db) -> Result<Self, sled::Error> {
86        Ok(SubstrateTrees {
87            account_id: db.open_tree(b"account_id")?,
88            account_index: db.open_tree(b"account_index")?,
89            bounty_index: db.open_tree(b"bounty_index")?,
90            era_index: db.open_tree(b"era_index")?,
91            message_id: db.open_tree(b"message_id")?,
92            pool_id: db.open_tree(b"pool_id")?,
93            preimage_hash: db.open_tree(b"preimage_hash")?,
94            proposal_hash: db.open_tree(b"proposal_hash")?,
95            proposal_index: db.open_tree(b"proposal_index")?,
96            ref_index: db.open_tree(b"ref_index")?,
97            registrar_index: db.open_tree(b"registrar_index")?,
98            session_index: db.open_tree(b"session_index")?,
99            tip_hash: db.open_tree(b"tip_hash")?,
100        })
101    }
102
103    pub fn flush(&self) -> Result<(), sled::Error> {
104        self.account_id.flush()?;
105        self.account_index.flush()?;
106        self.bounty_index.flush()?;
107        self.era_index.flush()?;
108        self.message_id.flush()?;
109        self.pool_id.flush()?;
110        self.preimage_hash.flush()?;
111        self.proposal_hash.flush()?;
112        self.proposal_index.flush()?;
113        self.ref_index.flush()?;
114        self.registrar_index.flush()?;
115        self.session_index.flush()?;
116        self.tip_hash.flush()?;
117        Ok(())
118    }
119}
120
121/// Database trees for the indexer
122#[derive(Clone)]
123pub struct Trees<CT> {
124    pub root: sled::Db,
125    pub span: Tree,
126    pub variant: Tree,
127    pub substrate: SubstrateTrees,
128    pub chain: CT,
129}
130
131/// On-disk format for variant keys
132#[derive(FromZeroes, FromBytes, AsBytes, Unaligned, PartialEq, Debug)]
133#[repr(C)]
134pub struct VariantKey {
135    pub pallet_index: u8,
136    pub variant_index: u8,
137    pub block_number: U32<BigEndian>,
138    pub event_index: U16<BigEndian>,
139}
140
141/// On-disk format for 32-byte keys
142#[derive(FromZeroes, FromBytes, AsBytes, Unaligned, PartialEq, Debug)]
143#[repr(C)]
144pub struct Bytes32Key {
145    pub key: [u8; 32],
146    pub block_number: U32<BigEndian>,
147    pub event_index: U16<BigEndian>,
148}
149
150/// On-disk format for u32 keys
151#[derive(FromZeroes, FromBytes, AsBytes, Unaligned, PartialEq, Debug)]
152#[repr(C)]
153pub struct U32Key {
154    pub key: U32<BigEndian>,
155    pub block_number: U32<BigEndian>,
156    pub event_index: U16<BigEndian>,
157}
158
159/// Datatype to hold 32-byte keys
160#[derive(Copy, Clone, Debug, PartialEq, Hash, Eq)]
161pub struct Bytes32(pub [u8; 32]);
162
163impl AsRef<[u8; 32]> for Bytes32 {
164    fn as_ref(&self) -> &[u8; 32] {
165        &self.0
166    }
167}
168
169impl From<[u8; 32]> for Bytes32 {
170    fn from(x: [u8; 32]) -> Self {
171        Bytes32(x)
172    }
173}
174
175impl AsRef<[u8]> for Bytes32 {
176    fn as_ref(&self) -> &[u8] {
177        &self.0[..]
178    }
179}
180
181impl Serialize for Bytes32 {
182    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
183    where
184        S: serde::Serializer,
185    {
186        let mut hex_string = "0x".to_owned();
187        hex_string.push_str(&hex::encode(self.0));
188        serializer.serialize_str(&hex_string)
189    }
190}
191
192impl<'de> Deserialize<'de> for Bytes32 {
193    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
194    where
195        D: serde::Deserializer<'de>,
196    {
197        match String::deserialize(deserializer)?.get(2..66) {
198            Some(message_id) => match hex::decode(message_id) {
199                Ok(message_id) => Ok(Bytes32(message_id.try_into().unwrap())),
200                Err(_error) => Err(serde::de::Error::custom("error")),
201            },
202            None => Err(serde::de::Error::custom("error")),
203        }
204    }
205}
206
207impl std::str::FromStr for Bytes32 {
208    type Err = IndexError;
209
210    fn from_str(s: &str) -> Result<Self, Self::Err> {
211        Ok(Bytes32(
212            hex::decode(s)?
213                .try_into()
214                .map_err(|_| IndexError::ParseError)?,
215        ))
216    }
217}
218
219/// All the key types that are built-in to Substrate
220#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Hash)]
221#[serde(tag = "type", content = "value")]
222pub enum SubstrateKey {
223    AccountId(Bytes32),
224    AccountIndex(u32),
225    BountyIndex(u32),
226    EraIndex(u32),
227    MessageId(Bytes32),
228    PoolId(u32),
229    PreimageHash(Bytes32),
230    ProposalHash(Bytes32),
231    ProposalIndex(u32),
232    RefIndex(u32),
233    RegistrarIndex(u32),
234    SessionIndex(u32),
235    TipHash(Bytes32),
236}
237
238impl SubstrateKey {
239    pub fn write_db_key(
240        &self,
241        trees: &SubstrateTrees,
242        block_number: u32,
243        event_index: u16,
244    ) -> Result<(), sled::Error> {
245        let block_number = block_number.into();
246        let event_index = event_index.into();
247        match self {
248            SubstrateKey::AccountId(account_id) => {
249                let key = Bytes32Key {
250                    key: account_id.0,
251                    block_number,
252                    event_index,
253                };
254                trees.account_id.insert(key.as_bytes(), &[])?
255            }
256            SubstrateKey::AccountIndex(account_index) => {
257                let key = U32Key {
258                    key: (*account_index).into(),
259                    block_number,
260                    event_index,
261                };
262                trees.account_index.insert(key.as_bytes(), &[])?
263            }
264            SubstrateKey::BountyIndex(bounty_index) => {
265                let key = U32Key {
266                    key: (*bounty_index).into(),
267                    block_number,
268                    event_index,
269                };
270                trees.bounty_index.insert(key.as_bytes(), &[])?
271            }
272            SubstrateKey::EraIndex(era_index) => {
273                let key = U32Key {
274                    key: (*era_index).into(),
275                    block_number,
276                    event_index,
277                };
278                trees.era_index.insert(key.as_bytes(), &[])?
279            }
280            SubstrateKey::MessageId(message_id) => {
281                let key = Bytes32Key {
282                    key: message_id.0,
283                    block_number,
284                    event_index,
285                };
286                trees.message_id.insert(key.as_bytes(), &[])?
287            }
288            SubstrateKey::PoolId(pool_id) => {
289                let key = U32Key {
290                    key: (*pool_id).into(),
291                    block_number,
292                    event_index,
293                };
294                trees.pool_id.insert(key.as_bytes(), &[])?
295            }
296            SubstrateKey::PreimageHash(preimage_hash) => {
297                let key = Bytes32Key {
298                    key: preimage_hash.0,
299                    block_number,
300                    event_index,
301                };
302                trees.preimage_hash.insert(key.as_bytes(), &[])?
303            }
304            SubstrateKey::ProposalHash(proposal_hash) => {
305                let key = Bytes32Key {
306                    key: proposal_hash.0,
307                    block_number,
308                    event_index,
309                };
310                trees.proposal_hash.insert(key.as_bytes(), &[])?
311            }
312            SubstrateKey::ProposalIndex(proposal_index) => {
313                let key = U32Key {
314                    key: (*proposal_index).into(),
315                    block_number,
316                    event_index,
317                };
318                trees.proposal_index.insert(key.as_bytes(), &[])?
319            }
320            SubstrateKey::RefIndex(ref_index) => {
321                let key = U32Key {
322                    key: (*ref_index).into(),
323                    block_number,
324                    event_index,
325                };
326                trees.ref_index.insert(key.as_bytes(), &[])?
327            }
328            SubstrateKey::RegistrarIndex(registrar_index) => {
329                let key = U32Key {
330                    key: (*registrar_index).into(),
331                    block_number,
332                    event_index,
333                };
334                trees.registrar_index.insert(key.as_bytes(), &[])?
335            }
336            SubstrateKey::SessionIndex(session_index) => {
337                let key = U32Key {
338                    key: (*session_index).into(),
339                    block_number,
340                    event_index,
341                };
342                trees.session_index.insert(key.as_bytes(), &[])?
343            }
344            SubstrateKey::TipHash(tip_hash) => {
345                let key = Bytes32Key {
346                    key: tip_hash.0,
347                    block_number,
348                    event_index,
349                };
350                trees.tip_hash.insert(key.as_bytes(), &[])?
351            }
352        };
353        Ok(())
354    }
355}
356
357pub trait IndexKey {
358    type ChainTrees: IndexTrees + Send + Sync + Clone;
359
360    fn write_db_key(
361        &self,
362        trees: &Self::ChainTrees,
363        block_number: u32,
364        event_index: u16,
365    ) -> Result<(), sled::Error>;
366
367    fn get_key_events(&self, trees: &Self::ChainTrees) -> Vec<Event>;
368}
369
370/// All the key types for the chain
371#[derive(Serialize, Deserialize, Clone, Debug, Eq, PartialEq, Hash)]
372#[serde(tag = "type", content = "value")]
373pub enum Key<CK: IndexKey> {
374    Variant(u8, u8),
375    Substrate(SubstrateKey),
376    Chain(CK),
377}
378
379impl<CK: IndexKey> Key<CK> {
380    pub fn write_db_key(
381        &self,
382        trees: &Trees<CK::ChainTrees>,
383        block_number: u32,
384        event_index: u16,
385    ) -> Result<(), sled::Error> {
386        match self {
387            Key::Variant(pallet_index, variant_index) => {
388                let key = VariantKey {
389                    pallet_index: *pallet_index,
390                    variant_index: *variant_index,
391                    block_number: block_number.into(),
392                    event_index: event_index.into(),
393                };
394                trees.variant.insert(key.as_bytes(), &[])?;
395            }
396            Key::Substrate(substrate_key) => {
397                substrate_key.write_db_key(&trees.substrate, block_number, event_index)?;
398            }
399            Key::Chain(chain_key) => {
400                chain_key.write_db_key(&trees.chain, block_number, event_index)?;
401            }
402        };
403        Ok(())
404    }
405}
406
407/// JSON request messages
408#[derive(Deserialize, Debug, Clone)]
409#[serde(tag = "type")]
410pub enum RequestMessage<CK: IndexKey> {
411    Status,
412    SubscribeStatus,
413    UnsubscribeStatus,
414    Variants,
415    GetEvents { key: Key<CK> },
416    SubscribeEvents { key: Key<CK> },
417    UnsubscribeEvents { key: Key<CK> },
418    SizeOnDisk,
419}
420
421/// Identifies an event by block number and event index
422#[derive(Serialize, Debug, Clone, Deserialize, PartialEq)]
423#[serde(rename_all = "camelCase")]
424pub struct Event {
425    pub block_number: u32,
426    pub event_index: u16,
427}
428
429impl fmt::Display for Event {
430    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
431        write!(
432            f,
433            "block number: {}, event index: {}",
434            self.block_number, self.event_index
435        )
436    }
437}
438
439/// Index and name of an event type
440#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
441pub struct EventMeta {
442    pub index: u8,
443    pub name: String,
444}
445
446/// Index, name and list of event types for a pallet
447#[derive(Serialize, Debug, Clone, Deserialize, PartialEq)]
448pub struct PalletMeta {
449    pub index: u8,
450    pub name: String,
451    pub events: Vec<EventMeta>,
452}
453
454/// On-disk format for span value
455#[derive(FromZeroes, FromBytes, AsBytes, Unaligned, PartialEq, Debug)]
456#[repr(C)]
457pub struct SpanDbValue {
458    pub start: U32<BigEndian>,
459    pub version: U16<BigEndian>,
460    pub index_variant: u8,
461}
462
463/// Start and end block number for a span of blocks
464#[derive(Serialize, Debug, Clone, PartialEq, Deserialize)]
465pub struct Span {
466    pub start: u32,
467    pub end: u32,
468}
469
470impl fmt::Display for Span {
471    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
472        write!(f, "start: {}, end: {}", self.start, self.end)
473    }
474}
475
476/// JSON response messages
477#[derive(Serialize, Debug, Clone)]
478#[serde(tag = "type", content = "data")]
479#[serde(rename_all = "camelCase")]
480pub enum ResponseMessage<CK: IndexKey> {
481    Status(Vec<Span>),
482    Variants(Vec<PalletMeta>),
483    Events { key: Key<CK>, events: Vec<Event> },
484    Subscribed,
485    Unsubscribed,
486    SizeOnDisk(u64),
487    //    Error,
488}
489
490/// Subscription message sent from a WebSocket connection thread to the indexer thread
491#[derive(Debug)]
492pub enum SubscriptionMessage<CK: IndexKey> {
493    SubscribeStatus {
494        sub_response_tx: UnboundedSender<ResponseMessage<CK>>,
495    },
496    UnsubscribeStatus {
497        sub_response_tx: UnboundedSender<ResponseMessage<CK>>,
498    },
499    SubscribeEvents {
500        key: Key<CK>,
501        sub_response_tx: UnboundedSender<ResponseMessage<CK>>,
502    },
503    UnsubscribeEvents {
504        key: Key<CK>,
505        sub_response_tx: UnboundedSender<ResponseMessage<CK>>,
506    },
507}