1use crate::{
10 common::{sliding_stat::DurationSlidingStats, STAT_SLIDING_WINDOW},
11 graph::ValidateTransactionPriority,
12 insert_and_log_throttled, LOG_TARGET, LOG_TARGET_STAT,
13};
14use async_trait::async_trait;
15use codec::Encode;
16use futures::future::{Future, FutureExt};
17use soil_prometheus::Registry as PrometheusRegistry;
18use soil_client::blockchain::{HeaderMetadata, TreeRoute};
19use soil_client::client_api::{blockchain::HeaderBackend, BlockBackend};
20use subsoil::txpool::runtime_api::TaggedTransactionQueue;
21use std::{
22 marker::PhantomData,
23 pin::Pin,
24 sync::Arc,
25 time::{Duration, Instant},
26};
27use subsoil::api::{ApiExt, ProvideRuntimeApi};
28use subsoil::core::traits::SpawnEssentialNamed;
29use subsoil::runtime::{
30 generic::BlockId,
31 traits::{self, Block as BlockT, BlockIdTo},
32 transaction_validity::{TransactionSource, TransactionValidity},
33};
34use tokio::sync::{mpsc, oneshot, Mutex};
35
36use super::{
37 error::{self, Error},
38 metrics::{ApiMetrics, ApiMetricsExt},
39};
40use crate::graph;
41use tracing::{trace, warn, Level};
42
43pub struct FullChainApi<Client, Block> {
45 client: Arc<Client>,
46 _marker: PhantomData<Block>,
47 metrics: Option<Arc<ApiMetrics>>,
48 validation_pool_normal: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
49 validation_pool_maintained: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
50 validate_transaction_normal_stats: DurationSlidingStats,
51 validate_transaction_maintained_stats: DurationSlidingStats,
52}
53
54fn spawn_validation_pool_task(
56 name: &'static str,
57 receiver_normal: Arc<Mutex<mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
58 receiver_maintained: Arc<Mutex<mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
59 spawner: &impl SpawnEssentialNamed,
60 stats: DurationSlidingStats,
61 blocking_stats: DurationSlidingStats,
62) {
63 spawner.spawn_essential_blocking(
64 name,
65 Some("transaction-pool"),
66 async move {
67 loop {
68 let start = Instant::now();
69
70 let task = {
71 let receiver_maintained = receiver_maintained.clone();
72 let receiver_normal = receiver_normal.clone();
73 tokio::select! {
74 Some(task) = async {
75 receiver_maintained.lock().await.recv().await
76 } => { task }
77 Some(task) = async {
78 receiver_normal.lock().await.recv().await
79 } => { task }
80 else => {
81 return
82 }
83 }
84 };
85
86 let blocking_duration = {
87 let start = Instant::now();
88 task.await;
89 start.elapsed()
90 };
91
92 insert_and_log_throttled!(
93 Level::DEBUG,
94 target:LOG_TARGET_STAT,
95 prefix:format!("validate_transaction_inner_stats"),
96 stats,
97 start.elapsed().into()
98 );
99 insert_and_log_throttled!(
100 Level::DEBUG,
101 target:LOG_TARGET_STAT,
102 prefix:format!("validate_transaction_blocking_stats"),
103 blocking_stats,
104 blocking_duration.into()
105 );
106 trace!(target:LOG_TARGET, duration=?start.elapsed(), "spawn_validation_pool_task");
107 }
108 }
109 .boxed(),
110 );
111}
112
113impl<Client, Block> FullChainApi<Client, Block> {
114 pub fn new(
116 client: Arc<Client>,
117 prometheus: Option<&PrometheusRegistry>,
118 spawner: &impl SpawnEssentialNamed,
119 ) -> Self {
120 let stats = DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW));
121 let blocking_stats = DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW));
122
123 let metrics = prometheus.map(ApiMetrics::register).and_then(|r| match r {
124 Err(error) => {
125 warn!(
126 target: LOG_TARGET,
127 ?error,
128 "Failed to register transaction pool API Prometheus metrics"
129 );
130 None
131 },
132 Ok(api) => Some(Arc::new(api)),
133 });
134
135 let (sender, receiver) = mpsc::channel(1);
136 let (sender_maintained, receiver_maintained) = mpsc::channel(1);
137
138 let receiver = Arc::new(Mutex::new(receiver));
139 let receiver_maintained = Arc::new(Mutex::new(receiver_maintained));
140 spawn_validation_pool_task(
141 "transaction-pool-task-0",
142 receiver.clone(),
143 receiver_maintained.clone(),
144 spawner,
145 stats.clone(),
146 blocking_stats.clone(),
147 );
148 spawn_validation_pool_task(
149 "transaction-pool-task-1",
150 receiver,
151 receiver_maintained,
152 spawner,
153 stats.clone(),
154 blocking_stats.clone(),
155 );
156
157 FullChainApi {
158 client,
159 validation_pool_normal: sender,
160 validation_pool_maintained: sender_maintained,
161 _marker: Default::default(),
162 metrics,
163 validate_transaction_normal_stats: DurationSlidingStats::new(Duration::from_secs(
164 STAT_SLIDING_WINDOW,
165 )),
166 validate_transaction_maintained_stats: DurationSlidingStats::new(Duration::from_secs(
167 STAT_SLIDING_WINDOW,
168 )),
169 }
170 }
171}
172
173#[async_trait]
174impl<Client, Block> graph::ChainApi for FullChainApi<Client, Block>
175where
176 Block: BlockT,
177 Client: ProvideRuntimeApi<Block>
178 + BlockBackend<Block>
179 + BlockIdTo<Block>
180 + HeaderBackend<Block>
181 + HeaderMetadata<Block, Error = soil_client::blockchain::Error>,
182 Client: Send + Sync + 'static,
183 Client::Api: TaggedTransactionQueue<Block>,
184{
185 type Block = Block;
186 type Error = error::Error;
187
188 async fn block_body(
189 &self,
190 hash: Block::Hash,
191 ) -> Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>, Self::Error> {
192 self.client.block_body(hash).map_err(error::Error::from)
193 }
194
195 async fn validate_transaction(
196 &self,
197 at: <Self::Block as BlockT>::Hash,
198 source: TransactionSource,
199 uxt: graph::ExtrinsicFor<Self>,
200 validation_priority: ValidateTransactionPriority,
201 ) -> Result<TransactionValidity, Self::Error> {
202 let start = Instant::now();
203 let (tx, rx) = oneshot::channel();
204 let client = self.client.clone();
205 let (stats, validation_pool, prefix) =
206 if validation_priority == ValidateTransactionPriority::Maintained {
207 (
208 self.validate_transaction_maintained_stats.clone(),
209 self.validation_pool_maintained.clone(),
210 "validate_transaction_maintained_stats",
211 )
212 } else {
213 (
214 self.validate_transaction_normal_stats.clone(),
215 self.validation_pool_normal.clone(),
216 "validate_transaction_stats",
217 )
218 };
219 let metrics = self.metrics.clone();
220
221 metrics.report(|m| m.validations_scheduled.inc());
222
223 {
224 validation_pool
225 .send(
226 async move {
227 let res = validate_transaction_blocking(&*client, at, source, uxt);
228 let _ = tx.send(res);
229 metrics.report(|m| m.validations_finished.inc());
230 }
231 .boxed(),
232 )
233 .await
234 .map_err(|e| Error::RuntimeApi(format!("Validation pool down: {:?}", e)))?;
235 }
236
237 let validity = match rx.await {
238 Ok(r) => r,
239 Err(_) => Err(Error::RuntimeApi("Validation was canceled".into())),
240 };
241
242 insert_and_log_throttled!(
243 Level::DEBUG,
244 target:LOG_TARGET_STAT,
245 prefix:prefix,
246 stats,
247 start.elapsed().into()
248 );
249
250 validity
251 }
252
253 fn validate_transaction_blocking(
257 &self,
258 at: Block::Hash,
259 source: TransactionSource,
260 uxt: graph::ExtrinsicFor<Self>,
261 ) -> Result<TransactionValidity, Self::Error> {
262 validate_transaction_blocking(&*self.client, at, source, uxt)
263 }
264
265 fn block_id_to_number(
266 &self,
267 at: &BlockId<Self::Block>,
268 ) -> Result<Option<graph::NumberFor<Self>>, Self::Error> {
269 self.client.to_number(at).map_err(|e| Error::BlockIdConversion(e.to_string()))
270 }
271
272 fn block_id_to_hash(
273 &self,
274 at: &BlockId<Self::Block>,
275 ) -> Result<Option<graph::BlockHash<Self>>, Self::Error> {
276 self.client.to_hash(at).map_err(|e| Error::BlockIdConversion(e.to_string()))
277 }
278
279 fn hash_and_length(
280 &self,
281 ex: &graph::RawExtrinsicFor<Self>,
282 ) -> (graph::ExtrinsicHash<Self>, usize) {
283 ex.using_encoded(|x| (<traits::HashingFor<Block> as traits::Hash>::hash(x), x.len()))
284 }
285
286 fn block_header(
287 &self,
288 hash: <Self::Block as BlockT>::Hash,
289 ) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
290 self.client.header(hash).map_err(Into::into)
291 }
292
293 fn tree_route(
294 &self,
295 from: <Self::Block as BlockT>::Hash,
296 to: <Self::Block as BlockT>::Hash,
297 ) -> Result<TreeRoute<Self::Block>, Self::Error> {
298 soil_client::blockchain::tree_route::<Block, Client>(&*self.client, from, to)
299 .map_err(Into::into)
300 }
301}
302
303fn validate_transaction_blocking<Client, Block>(
306 client: &Client,
307 at: Block::Hash,
308 source: TransactionSource,
309 uxt: graph::ExtrinsicFor<FullChainApi<Client, Block>>,
310) -> error::Result<TransactionValidity>
311where
312 Block: BlockT,
313 Client: ProvideRuntimeApi<Block>
314 + BlockBackend<Block>
315 + BlockIdTo<Block>
316 + HeaderBackend<Block>
317 + HeaderMetadata<Block, Error = soil_client::blockchain::Error>,
318 Client: Send + Sync + 'static,
319 Client::Api: TaggedTransactionQueue<Block>,
320{
321 let s = std::time::Instant::now();
322 let tx_hash = uxt.using_encoded(|x| <traits::HashingFor<Block> as traits::Hash>::hash(x));
323
324 let result = subsoil::within_span!(subsoil::tracing::Level::TRACE, "validate_transaction";
325 {
326 let runtime_api = client.runtime_api();
327 let api_version = subsoil::within_span! { subsoil::tracing::Level::TRACE, "check_version";
328 runtime_api
329 .api_version::<dyn TaggedTransactionQueue<Block>>(at)
330 .map_err(|e| Error::RuntimeApi(e.to_string()))?
331 .ok_or_else(|| Error::RuntimeApi(
332 format!("Could not find `TaggedTransactionQueue` api for block `{:?}`.", at)
333 ))
334 }?;
335
336 use subsoil::api::Core;
337
338 subsoil::within_span!(
339 subsoil::tracing::Level::TRACE, "runtime::validate_transaction";
340 {
341 if api_version >= 3 {
342 runtime_api.validate_transaction(at, source, (*uxt).clone(), at)
343 .map_err(|e| Error::RuntimeApi(e.to_string()))
344 } else {
345 let block_number = client.to_number(&BlockId::Hash(at))
346 .map_err(|e| Error::RuntimeApi(e.to_string()))?
347 .ok_or_else(||
348 Error::RuntimeApi(format!("Could not get number for block `{:?}`.", at))
349 )?;
350
351 runtime_api.initialize_block(at, &subsoil::runtime::traits::Header::new(
353 block_number + subsoil::runtime::traits::One::one(),
354 Default::default(),
355 Default::default(),
356 at,
357 Default::default()),
358 ).map_err(|e| Error::RuntimeApi(e.to_string()))?;
359
360 if api_version == 2 {
361 #[allow(deprecated)] runtime_api.validate_transaction_before_version_3(at, source, (*uxt).clone())
363 .map_err(|e| Error::RuntimeApi(e.to_string()))
364 } else {
365 #[allow(deprecated)] runtime_api.validate_transaction_before_version_2(at, (*uxt).clone())
367 .map_err(|e| Error::RuntimeApi(e.to_string()))
368 }
369 }
370 })
371 });
372 trace!(
373 target: LOG_TARGET,
374 ?tx_hash,
375 ?at,
376 duration = ?s.elapsed(),
377 "validate_transaction_blocking"
378 );
379 result
380}