polkadot_node_core_chain_api/
lib.rs1#![deny(unused_crate_dependencies, unused_results)]
32#![warn(missing_docs)]
33
34use std::sync::Arc;
35
36use futures::prelude::*;
37use sc_client_api::AuxStore;
38
39use futures::stream::StreamExt;
40use polkadot_node_subsystem::{
41 messages::ChainApiMessage, overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem,
42 SubsystemError, SubsystemResult,
43};
44use polkadot_node_subsystem_types::ChainApiBackend;
45
46mod metrics;
47use self::metrics::Metrics;
48
49#[cfg(test)]
50mod tests;
51
52const LOG_TARGET: &str = "parachain::chain-api";
53
54pub struct ChainApiSubsystem<Client> {
56 client: Arc<Client>,
57 metrics: Metrics,
58}
59
60impl<Client> ChainApiSubsystem<Client> {
61 pub fn new(client: Arc<Client>, metrics: Metrics) -> Self {
63 ChainApiSubsystem { client, metrics }
64 }
65}
66
67#[overseer::subsystem(ChainApi, error = SubsystemError, prefix = self::overseer)]
68impl<Client, Context> ChainApiSubsystem<Client>
69where
70 Client: ChainApiBackend + AuxStore + 'static,
71{
72 fn start(self, ctx: Context) -> SpawnedSubsystem {
73 let future = run::<Client, Context>(ctx, self)
74 .map_err(|e| SubsystemError::with_origin("chain-api", e))
75 .boxed();
76 SpawnedSubsystem { future, name: "chain-api-subsystem" }
77 }
78}
79
80#[overseer::contextbounds(ChainApi, prefix = self::overseer)]
81async fn run<Client, Context>(
82 mut ctx: Context,
83 subsystem: ChainApiSubsystem<Client>,
84) -> SubsystemResult<()>
85where
86 Client: ChainApiBackend + AuxStore,
87{
88 loop {
89 match ctx.recv().await? {
90 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
91 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_)) => {},
92 FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
93 FromOrchestra::Communication { msg } => match msg {
94 ChainApiMessage::BlockNumber(hash, response_channel) => {
95 let _timer = subsystem.metrics.time_block_number();
96 let result =
97 subsystem.client.number(hash).await.map_err(|e| e.to_string().into());
98 subsystem.metrics.on_request(result.is_ok());
99 let _ = response_channel.send(result);
100 },
101 ChainApiMessage::BlockHeader(hash, response_channel) => {
102 let _timer = subsystem.metrics.time_block_header();
103 let result =
104 subsystem.client.header(hash).await.map_err(|e| e.to_string().into());
105 subsystem.metrics.on_request(result.is_ok());
106 let _ = response_channel.send(result);
107 },
108 ChainApiMessage::BlockWeight(hash, response_channel) => {
109 let _timer = subsystem.metrics.time_block_weight();
110 let result = sc_consensus_babe::block_weight(&*subsystem.client, hash)
111 .map_err(|e| e.to_string().into());
112 subsystem.metrics.on_request(result.is_ok());
113 let _ = response_channel.send(result);
114 },
115 ChainApiMessage::FinalizedBlockHash(number, response_channel) => {
116 let _timer = subsystem.metrics.time_finalized_block_hash();
117 let result =
119 subsystem.client.hash(number).await.map_err(|e| e.to_string().into());
120 subsystem.metrics.on_request(result.is_ok());
121 let _ = response_channel.send(result);
122 },
123 ChainApiMessage::FinalizedBlockNumber(response_channel) => {
124 let _timer = subsystem.metrics.time_finalized_block_number();
125 let result = subsystem
126 .client
127 .info()
128 .await
129 .map_err(|e| e.to_string().into())
130 .map(|info| info.finalized_number);
131 subsystem.metrics.on_request(result.is_ok());
132 let _ = response_channel.send(result);
133 },
134 ChainApiMessage::Ancestors { hash, k, response_channel } => {
135 let _timer = subsystem.metrics.time_ancestors();
136 gum::trace!(target: LOG_TARGET, hash=%hash, k=k, "ChainApiMessage::Ancestors");
137
138 let next_parent_stream = futures::stream::unfold(
139 (hash, subsystem.client.clone()),
140 |(hash, client)| async move {
141 let maybe_header = client.header(hash).await;
142 match maybe_header {
143 Err(e) => {
145 let e = e.to_string().into();
146 Some((Err(e), (hash, client)))
147 },
148 Ok(None) => None,
150 Ok(Some(header)) => {
151 if header.number == 0 {
153 None
154 } else {
155 Some((Ok(header.parent_hash), (header.parent_hash, client)))
156 }
157 },
158 }
159 },
160 );
161
162 let result = next_parent_stream.take(k).try_collect().await;
163 subsystem.metrics.on_request(result.is_ok());
164 let _ = response_channel.send(result);
165 },
166 },
167 }
168 }
169}