Skip to main content

soil_txpool/common/
api.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Chain api required for the transaction pool.
8
9use crate::{
10	common::{sliding_stat::DurationSlidingStats, STAT_SLIDING_WINDOW},
11	graph::ValidateTransactionPriority,
12	insert_and_log_throttled, LOG_TARGET, LOG_TARGET_STAT,
13};
14use async_trait::async_trait;
15use codec::Encode;
16use futures::future::{Future, FutureExt};
17use soil_prometheus::Registry as PrometheusRegistry;
18use soil_client::blockchain::{HeaderMetadata, TreeRoute};
19use soil_client::client_api::{blockchain::HeaderBackend, BlockBackend};
20use subsoil::txpool::runtime_api::TaggedTransactionQueue;
21use std::{
22	marker::PhantomData,
23	pin::Pin,
24	sync::Arc,
25	time::{Duration, Instant},
26};
27use subsoil::api::{ApiExt, ProvideRuntimeApi};
28use subsoil::core::traits::SpawnEssentialNamed;
29use subsoil::runtime::{
30	generic::BlockId,
31	traits::{self, Block as BlockT, BlockIdTo},
32	transaction_validity::{TransactionSource, TransactionValidity},
33};
34use tokio::sync::{mpsc, oneshot, Mutex};
35
36use super::{
37	error::{self, Error},
38	metrics::{ApiMetrics, ApiMetricsExt},
39};
40use crate::graph;
41use tracing::{trace, warn, Level};
42
43/// The transaction pool logic for full client.
44pub struct FullChainApi<Client, Block> {
45	client: Arc<Client>,
46	_marker: PhantomData<Block>,
47	metrics: Option<Arc<ApiMetrics>>,
48	validation_pool_normal: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
49	validation_pool_maintained: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
50	validate_transaction_normal_stats: DurationSlidingStats,
51	validate_transaction_maintained_stats: DurationSlidingStats,
52}
53
54/// Spawn a validation task that will be used by the transaction pool to validate transactions.
55fn spawn_validation_pool_task(
56	name: &'static str,
57	receiver_normal: Arc<Mutex<mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
58	receiver_maintained: Arc<Mutex<mpsc::Receiver<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
59	spawner: &impl SpawnEssentialNamed,
60	stats: DurationSlidingStats,
61	blocking_stats: DurationSlidingStats,
62) {
63	spawner.spawn_essential_blocking(
64		name,
65		Some("transaction-pool"),
66		async move {
67			loop {
68				let start = Instant::now();
69
70				let task = {
71					let receiver_maintained = receiver_maintained.clone();
72					let receiver_normal = receiver_normal.clone();
73					tokio::select! {
74						Some(task) = async {
75							receiver_maintained.lock().await.recv().await
76						} => { task }
77						Some(task) = async {
78							receiver_normal.lock().await.recv().await
79						} => { task }
80						else => {
81							return
82						}
83					}
84				};
85
86				let blocking_duration = {
87					let start = Instant::now();
88					task.await;
89					start.elapsed()
90				};
91
92				insert_and_log_throttled!(
93					Level::DEBUG,
94					target:LOG_TARGET_STAT,
95					prefix:format!("validate_transaction_inner_stats"),
96					stats,
97					start.elapsed().into()
98				);
99				insert_and_log_throttled!(
100					Level::DEBUG,
101					target:LOG_TARGET_STAT,
102					prefix:format!("validate_transaction_blocking_stats"),
103					blocking_stats,
104					blocking_duration.into()
105				);
106				trace!(target:LOG_TARGET, duration=?start.elapsed(), "spawn_validation_pool_task");
107			}
108		}
109		.boxed(),
110	);
111}
112
113impl<Client, Block> FullChainApi<Client, Block> {
114	/// Create new transaction pool logic.
115	pub fn new(
116		client: Arc<Client>,
117		prometheus: Option<&PrometheusRegistry>,
118		spawner: &impl SpawnEssentialNamed,
119	) -> Self {
120		let stats = DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW));
121		let blocking_stats = DurationSlidingStats::new(Duration::from_secs(STAT_SLIDING_WINDOW));
122
123		let metrics = prometheus.map(ApiMetrics::register).and_then(|r| match r {
124			Err(error) => {
125				warn!(
126					target: LOG_TARGET,
127					?error,
128					"Failed to register transaction pool API Prometheus metrics"
129				);
130				None
131			},
132			Ok(api) => Some(Arc::new(api)),
133		});
134
135		let (sender, receiver) = mpsc::channel(1);
136		let (sender_maintained, receiver_maintained) = mpsc::channel(1);
137
138		let receiver = Arc::new(Mutex::new(receiver));
139		let receiver_maintained = Arc::new(Mutex::new(receiver_maintained));
140		spawn_validation_pool_task(
141			"transaction-pool-task-0",
142			receiver.clone(),
143			receiver_maintained.clone(),
144			spawner,
145			stats.clone(),
146			blocking_stats.clone(),
147		);
148		spawn_validation_pool_task(
149			"transaction-pool-task-1",
150			receiver,
151			receiver_maintained,
152			spawner,
153			stats.clone(),
154			blocking_stats.clone(),
155		);
156
157		FullChainApi {
158			client,
159			validation_pool_normal: sender,
160			validation_pool_maintained: sender_maintained,
161			_marker: Default::default(),
162			metrics,
163			validate_transaction_normal_stats: DurationSlidingStats::new(Duration::from_secs(
164				STAT_SLIDING_WINDOW,
165			)),
166			validate_transaction_maintained_stats: DurationSlidingStats::new(Duration::from_secs(
167				STAT_SLIDING_WINDOW,
168			)),
169		}
170	}
171}
172
173#[async_trait]
174impl<Client, Block> graph::ChainApi for FullChainApi<Client, Block>
175where
176	Block: BlockT,
177	Client: ProvideRuntimeApi<Block>
178		+ BlockBackend<Block>
179		+ BlockIdTo<Block>
180		+ HeaderBackend<Block>
181		+ HeaderMetadata<Block, Error = soil_client::blockchain::Error>,
182	Client: Send + Sync + 'static,
183	Client::Api: TaggedTransactionQueue<Block>,
184{
185	type Block = Block;
186	type Error = error::Error;
187
188	async fn block_body(
189		&self,
190		hash: Block::Hash,
191	) -> Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>, Self::Error> {
192		self.client.block_body(hash).map_err(error::Error::from)
193	}
194
195	async fn validate_transaction(
196		&self,
197		at: <Self::Block as BlockT>::Hash,
198		source: TransactionSource,
199		uxt: graph::ExtrinsicFor<Self>,
200		validation_priority: ValidateTransactionPriority,
201	) -> Result<TransactionValidity, Self::Error> {
202		let start = Instant::now();
203		let (tx, rx) = oneshot::channel();
204		let client = self.client.clone();
205		let (stats, validation_pool, prefix) =
206			if validation_priority == ValidateTransactionPriority::Maintained {
207				(
208					self.validate_transaction_maintained_stats.clone(),
209					self.validation_pool_maintained.clone(),
210					"validate_transaction_maintained_stats",
211				)
212			} else {
213				(
214					self.validate_transaction_normal_stats.clone(),
215					self.validation_pool_normal.clone(),
216					"validate_transaction_stats",
217				)
218			};
219		let metrics = self.metrics.clone();
220
221		metrics.report(|m| m.validations_scheduled.inc());
222
223		{
224			validation_pool
225				.send(
226					async move {
227						let res = validate_transaction_blocking(&*client, at, source, uxt);
228						let _ = tx.send(res);
229						metrics.report(|m| m.validations_finished.inc());
230					}
231					.boxed(),
232				)
233				.await
234				.map_err(|e| Error::RuntimeApi(format!("Validation pool down: {:?}", e)))?;
235		}
236
237		let validity = match rx.await {
238			Ok(r) => r,
239			Err(_) => Err(Error::RuntimeApi("Validation was canceled".into())),
240		};
241
242		insert_and_log_throttled!(
243			Level::DEBUG,
244			target:LOG_TARGET_STAT,
245			prefix:prefix,
246			stats,
247			start.elapsed().into()
248		);
249
250		validity
251	}
252
253	/// Validates a transaction by calling into the runtime.
254	///
255	/// Same as `validate_transaction` but blocks the current thread when performing validation.
256	fn validate_transaction_blocking(
257		&self,
258		at: Block::Hash,
259		source: TransactionSource,
260		uxt: graph::ExtrinsicFor<Self>,
261	) -> Result<TransactionValidity, Self::Error> {
262		validate_transaction_blocking(&*self.client, at, source, uxt)
263	}
264
265	fn block_id_to_number(
266		&self,
267		at: &BlockId<Self::Block>,
268	) -> Result<Option<graph::NumberFor<Self>>, Self::Error> {
269		self.client.to_number(at).map_err(|e| Error::BlockIdConversion(e.to_string()))
270	}
271
272	fn block_id_to_hash(
273		&self,
274		at: &BlockId<Self::Block>,
275	) -> Result<Option<graph::BlockHash<Self>>, Self::Error> {
276		self.client.to_hash(at).map_err(|e| Error::BlockIdConversion(e.to_string()))
277	}
278
279	fn hash_and_length(
280		&self,
281		ex: &graph::RawExtrinsicFor<Self>,
282	) -> (graph::ExtrinsicHash<Self>, usize) {
283		ex.using_encoded(|x| (<traits::HashingFor<Block> as traits::Hash>::hash(x), x.len()))
284	}
285
286	fn block_header(
287		&self,
288		hash: <Self::Block as BlockT>::Hash,
289	) -> Result<Option<<Self::Block as BlockT>::Header>, Self::Error> {
290		self.client.header(hash).map_err(Into::into)
291	}
292
293	fn tree_route(
294		&self,
295		from: <Self::Block as BlockT>::Hash,
296		to: <Self::Block as BlockT>::Hash,
297	) -> Result<TreeRoute<Self::Block>, Self::Error> {
298		soil_client::blockchain::tree_route::<Block, Client>(&*self.client, from, to)
299			.map_err(Into::into)
300	}
301}
302
303/// Helper function to validate a transaction using a full chain API.
304/// This method will call into the runtime to perform the validation.
305fn validate_transaction_blocking<Client, Block>(
306	client: &Client,
307	at: Block::Hash,
308	source: TransactionSource,
309	uxt: graph::ExtrinsicFor<FullChainApi<Client, Block>>,
310) -> error::Result<TransactionValidity>
311where
312	Block: BlockT,
313	Client: ProvideRuntimeApi<Block>
314		+ BlockBackend<Block>
315		+ BlockIdTo<Block>
316		+ HeaderBackend<Block>
317		+ HeaderMetadata<Block, Error = soil_client::blockchain::Error>,
318	Client: Send + Sync + 'static,
319	Client::Api: TaggedTransactionQueue<Block>,
320{
321	let s = std::time::Instant::now();
322	let tx_hash = uxt.using_encoded(|x| <traits::HashingFor<Block> as traits::Hash>::hash(x));
323
324	let result = subsoil::within_span!(subsoil::tracing::Level::TRACE, "validate_transaction";
325	{
326		let runtime_api = client.runtime_api();
327		let api_version = subsoil::within_span! { subsoil::tracing::Level::TRACE, "check_version";
328			runtime_api
329				.api_version::<dyn TaggedTransactionQueue<Block>>(at)
330				.map_err(|e| Error::RuntimeApi(e.to_string()))?
331				.ok_or_else(|| Error::RuntimeApi(
332					format!("Could not find `TaggedTransactionQueue` api for block `{:?}`.", at)
333				))
334		}?;
335
336		use subsoil::api::Core;
337
338		subsoil::within_span!(
339			subsoil::tracing::Level::TRACE, "runtime::validate_transaction";
340		{
341			if api_version >= 3 {
342				runtime_api.validate_transaction(at, source, (*uxt).clone(), at)
343					.map_err(|e| Error::RuntimeApi(e.to_string()))
344			} else {
345				let block_number = client.to_number(&BlockId::Hash(at))
346					.map_err(|e| Error::RuntimeApi(e.to_string()))?
347					.ok_or_else(||
348						Error::RuntimeApi(format!("Could not get number for block `{:?}`.", at))
349					)?;
350
351				// The old versions require us to call `initialize_block` before.
352				runtime_api.initialize_block(at, &subsoil::runtime::traits::Header::new(
353					block_number + subsoil::runtime::traits::One::one(),
354					Default::default(),
355					Default::default(),
356					at,
357					Default::default()),
358				).map_err(|e| Error::RuntimeApi(e.to_string()))?;
359
360				if api_version == 2 {
361					#[allow(deprecated)] // old validate_transaction
362					runtime_api.validate_transaction_before_version_3(at, source, (*uxt).clone())
363						.map_err(|e| Error::RuntimeApi(e.to_string()))
364				} else {
365					#[allow(deprecated)] // old validate_transaction
366					runtime_api.validate_transaction_before_version_2(at, (*uxt).clone())
367						.map_err(|e| Error::RuntimeApi(e.to_string()))
368				}
369			}
370		})
371	});
372	trace!(
373		target: LOG_TARGET,
374		?tx_hash,
375		?at,
376		duration = ?s.elapsed(),
377		"validate_transaction_blocking"
378	);
379	result
380}