Skip to main content

soil_network/transactions/
mod.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Transactions handling to plug on top of the network service.
8//!
9//! Usage:
10//!
11//! - Use [`TransactionsHandlerPrototype::new`] to create a prototype.
12//! - Pass the `NonDefaultSetConfig` returned from [`TransactionsHandlerPrototype::new`] to the
13//!   network configuration as an extra peers set.
14//! - Use [`TransactionsHandlerPrototype::build`] then [`TransactionsHandler::run`] to obtain a
15//! `Future` that processes transactions.
16
17use crate::transactions::config::*;
18
19use codec::{Decode, Encode};
20use futures::{prelude::*, stream::FuturesUnordered};
21use log::{debug, trace, warn};
22
23use soil_prometheus::{register, Counter, PrometheusError, Registry, U64};
24use soil_client::utils::mpsc::{
25	tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender,
26};
27use soil_network::common::{role::ObservedRole, ExHashT};
28use soil_network::sync::{SyncEvent, SyncEventStream};
29use soil_network::types::PeerId;
30use soil_network::{
31	config::{NonReservedPeerMode, ProtocolId, SetConfig},
32	error, multiaddr,
33	peer_store::PeerStoreProvider,
34	service::{
35		traits::{NotificationEvent, NotificationService, ValidationResult},
36		NotificationMetrics,
37	},
38	types::ProtocolName,
39	utils::{interval, LruHashSet},
40	NetworkBackend, NetworkEventStream, NetworkPeers,
41};
42use subsoil::runtime::traits::Block as BlockT;
43
44use std::{
45	collections::{hash_map::Entry, HashMap},
46	iter,
47	num::NonZeroUsize,
48	pin::Pin,
49	sync::Arc,
50	task::Poll,
51};
52
53pub mod config;
54
55/// A set of transactions.
56pub type Transactions<E> = Vec<E>;
57
58/// Logging target for the file.
59const LOG_TARGET: &str = "sync";
60
61mod rep {
62	use soil_network::ReputationChange as Rep;
63	/// Reputation change when a peer sends us any transaction.
64	///
65	/// This forces node to verify it, thus the negative value here. Once transaction is verified,
66	/// reputation change should be refunded with `ANY_TRANSACTION_REFUND`
67	pub const ANY_TRANSACTION: Rep = Rep::new(-(1 << 4), "Any transaction");
68	/// Reputation change when a peer sends us any transaction that is not invalid.
69	pub const ANY_TRANSACTION_REFUND: Rep = Rep::new(1 << 4, "Any transaction (refund)");
70	/// Reputation change when a peer sends us an transaction that we didn't know about.
71	pub const GOOD_TRANSACTION: Rep = Rep::new(1 << 7, "Good transaction");
72	/// Reputation change when a peer sends us a bad transaction.
73	pub const BAD_TRANSACTION: Rep = Rep::new(-(1 << 12), "Bad transaction");
74}
75
76struct Metrics {
77	propagated_transactions: Counter<U64>,
78}
79
80impl Metrics {
81	fn register(r: &Registry) -> Result<Self, PrometheusError> {
82		Ok(Self {
83			propagated_transactions: register(
84				Counter::new(
85					"substrate_sync_propagated_transactions",
86					"Number of transactions propagated to at least one peer",
87				)?,
88				r,
89			)?,
90		})
91	}
92}
93
94struct PendingTransaction<H> {
95	validation: TransactionImportFuture,
96	tx_hash: H,
97}
98
99impl<H> Unpin for PendingTransaction<H> {}
100
101impl<H: ExHashT> Future for PendingTransaction<H> {
102	type Output = (H, TransactionImport);
103
104	fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
105		if let Poll::Ready(import_result) = self.validation.poll_unpin(cx) {
106			return Poll::Ready((self.tx_hash.clone(), import_result));
107		}
108
109		Poll::Pending
110	}
111}
112
113/// Prototype for a [`TransactionsHandler`].
114pub struct TransactionsHandlerPrototype {
115	/// Name of the transaction protocol.
116	protocol_name: ProtocolName,
117
118	/// Handle that is used to communicate with `soil_network::Notifications`.
119	notification_service: Box<dyn NotificationService>,
120}
121
122impl TransactionsHandlerPrototype {
123	/// Create a new instance.
124	pub fn new<
125		Hash: AsRef<[u8]>,
126		Block: BlockT,
127		Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
128	>(
129		protocol_id: ProtocolId,
130		genesis_hash: Hash,
131		fork_id: Option<&str>,
132		metrics: NotificationMetrics,
133		peer_store_handle: Arc<dyn PeerStoreProvider>,
134	) -> (Self, Net::NotificationProtocolConfig) {
135		let genesis_hash = genesis_hash.as_ref();
136		let protocol_name: ProtocolName = if let Some(fork_id) = fork_id {
137			format!("/{}/{}/transactions/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
138		} else {
139			format!("/{}/transactions/1", array_bytes::bytes2hex("", genesis_hash))
140		}
141		.into();
142		let (config, notification_service) = Net::notification_config(
143			protocol_name.clone(),
144			vec![format!("/{}/transactions/1", protocol_id.as_ref()).into()],
145			MAX_TRANSACTIONS_SIZE,
146			None,
147			SetConfig {
148				in_peers: 0,
149				out_peers: 0,
150				reserved_nodes: Vec::new(),
151				non_reserved_mode: NonReservedPeerMode::Deny,
152			},
153			metrics,
154			peer_store_handle,
155		);
156
157		(Self { protocol_name, notification_service }, config)
158	}
159
160	/// Turns the prototype into the actual handler. Returns a controller that allows controlling
161	/// the behaviour of the handler while it's running.
162	///
163	/// Important: the transactions handler is initially disabled and doesn't gossip transactions.
164	/// Gossiping is enabled when major syncing is done.
165	pub fn build<
166		B: BlockT + 'static,
167		H: ExHashT,
168		N: NetworkPeers + NetworkEventStream,
169		S: SyncEventStream + soil_client::consensus::SyncOracle,
170	>(
171		self,
172		network: N,
173		sync: S,
174		transaction_pool: Arc<dyn TransactionPool<H, B>>,
175		metrics_registry: Option<&Registry>,
176	) -> error::Result<(TransactionsHandler<B, H, N, S>, TransactionsHandlerController<H>)> {
177		let sync_event_stream = sync.event_stream("transactions-handler-sync");
178		let (to_handler, from_controller) = tracing_unbounded("mpsc_transactions_handler", 100_000);
179
180		let handler = TransactionsHandler {
181			protocol_name: self.protocol_name,
182			notification_service: self.notification_service,
183			propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
184				as Pin<Box<dyn Stream<Item = ()> + Send>>)
185				.fuse(),
186			pending_transactions: FuturesUnordered::new(),
187			pending_transactions_peers: HashMap::new(),
188			network,
189			sync,
190			sync_event_stream: sync_event_stream.fuse(),
191			peers: HashMap::new(),
192			transaction_pool,
193			from_controller,
194			metrics: if let Some(r) = metrics_registry {
195				Some(Metrics::register(r)?)
196			} else {
197				None
198			},
199		};
200
201		let controller = TransactionsHandlerController { to_handler };
202
203		Ok((handler, controller))
204	}
205}
206
207/// Controls the behaviour of a [`TransactionsHandler`] it is connected to.
208pub struct TransactionsHandlerController<H: ExHashT> {
209	to_handler: TracingUnboundedSender<ToHandler<H>>,
210}
211
212impl<H: ExHashT> TransactionsHandlerController<H> {
213	/// You may call this when new transactions are imported by the transaction pool.
214	///
215	/// All transactions will be fetched from the `TransactionPool` that was passed at
216	/// initialization as part of the configuration and propagated to peers.
217	pub fn propagate_transactions(&self) {
218		let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransactions);
219	}
220
221	/// You must call when new a transaction is imported by the transaction pool.
222	///
223	/// This transaction will be fetched from the `TransactionPool` that was passed at
224	/// initialization as part of the configuration and propagated to peers.
225	pub fn propagate_transaction(&self, hash: H) {
226		let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransaction(hash));
227	}
228}
229
230enum ToHandler<H: ExHashT> {
231	PropagateTransactions,
232	PropagateTransaction(H),
233}
234
235/// Handler for transactions. Call [`TransactionsHandler::run`] to start the processing.
236pub struct TransactionsHandler<
237	B: BlockT + 'static,
238	H: ExHashT,
239	N: NetworkPeers + NetworkEventStream,
240	S: SyncEventStream + soil_client::consensus::SyncOracle,
241> {
242	protocol_name: ProtocolName,
243	/// Interval at which we call `propagate_transactions`.
244	propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
245	/// Pending transactions verification tasks.
246	pending_transactions: FuturesUnordered<PendingTransaction<H>>,
247	/// As multiple peers can send us the same transaction, we group
248	/// these peers using the transaction hash while the transaction is
249	/// imported. This prevents that we import the same transaction
250	/// multiple times concurrently.
251	pending_transactions_peers: HashMap<H, Vec<PeerId>>,
252	/// Network service to use to send messages and manage peers.
253	network: N,
254	/// Syncing service.
255	sync: S,
256	/// Receiver for syncing-related events.
257	sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
258	// All connected peers
259	peers: HashMap<PeerId, Peer<H>>,
260	transaction_pool: Arc<dyn TransactionPool<H, B>>,
261	from_controller: TracingUnboundedReceiver<ToHandler<H>>,
262	/// Prometheus metrics.
263	metrics: Option<Metrics>,
264	/// Handle that is used to communicate with `soil_network::Notifications`.
265	notification_service: Box<dyn NotificationService>,
266}
267
268/// Peer information
269#[derive(Debug)]
270struct Peer<H: ExHashT> {
271	/// Holds a set of transactions known to this peer.
272	known_transactions: LruHashSet<H>,
273	role: ObservedRole,
274}
275
276impl<B, H, N, S> TransactionsHandler<B, H, N, S>
277where
278	B: BlockT + 'static,
279	H: ExHashT,
280	N: NetworkPeers + NetworkEventStream,
281	S: SyncEventStream + soil_client::consensus::SyncOracle,
282{
283	/// Turns the [`TransactionsHandler`] into a future that should run forever and not be
284	/// interrupted.
285	pub async fn run(mut self) {
286		loop {
287			futures::select! {
288				_ = self.propagate_timeout.next() => {
289					self.propagate_transactions();
290				},
291				(tx_hash, result) = self.pending_transactions.select_next_some() => {
292					if let Some(peers) = self.pending_transactions_peers.remove(&tx_hash) {
293						peers.into_iter().for_each(|p| self.on_handle_transaction_import(p, result));
294					} else {
295						warn!(target: "sub-libp2p", "Inconsistent state, no peers for pending transaction!");
296					}
297				},
298				sync_event = self.sync_event_stream.next() => {
299					if let Some(sync_event) = sync_event {
300						self.handle_sync_event(sync_event);
301					} else {
302						// Syncing has seemingly closed. Closing as well.
303						return;
304					}
305				}
306				message = self.from_controller.select_next_some() => {
307					match message {
308						ToHandler::PropagateTransaction(hash) => self.propagate_transaction(&hash),
309						ToHandler::PropagateTransactions => self.propagate_transactions(),
310					}
311				},
312				event = self.notification_service.next_event().fuse() => {
313					if let Some(event) = event {
314						self.handle_notification_event(event)
315					} else {
316						// `Notifications` has seemingly closed. Closing as well.
317						return
318					}
319				}
320			}
321		}
322	}
323
324	fn handle_notification_event(&mut self, event: NotificationEvent) {
325		match event {
326			NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
327				// only accept peers whose role can be determined
328				let result = self
329					.network
330					.peer_role(peer, handshake)
331					.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
332				let _ = result_tx.send(result);
333			},
334			NotificationEvent::NotificationStreamOpened { peer, handshake, .. } => {
335				let Some(role) = self.network.peer_role(peer, handshake) else {
336					log::debug!(target: "sub-libp2p", "role for {peer} couldn't be determined");
337					return;
338				};
339
340				let _was_in = self.peers.insert(
341					peer,
342					Peer {
343						known_transactions: LruHashSet::new(
344							NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS).expect("Constant is nonzero"),
345						),
346						role,
347					},
348				);
349				debug_assert!(_was_in.is_none());
350			},
351			NotificationEvent::NotificationStreamClosed { peer } => {
352				let _peer = self.peers.remove(&peer);
353				debug_assert!(_peer.is_some());
354			},
355			NotificationEvent::NotificationReceived { peer, notification } => {
356				if let Ok(m) =
357					<Transactions<B::Extrinsic> as Decode>::decode(&mut notification.as_ref())
358				{
359					self.on_transactions(peer, m);
360				} else {
361					warn!(target: "sub-libp2p", "Failed to decode transactions list from peer {peer}");
362					self.network.report_peer(peer, rep::BAD_TRANSACTION);
363				}
364			},
365		}
366	}
367
368	fn handle_sync_event(&mut self, event: SyncEvent) {
369		match event {
370			SyncEvent::PeerConnected(remote) => {
371				let addr = iter::once(multiaddr::Protocol::P2p(remote.into()))
372					.collect::<multiaddr::Multiaddr>();
373				let result = self.network.add_peers_to_reserved_set(
374					self.protocol_name.clone(),
375					iter::once(addr).collect(),
376				);
377				if let Err(err) = result {
378					log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
379				}
380			},
381			SyncEvent::PeerDisconnected(remote) => {
382				let result = self.network.remove_peers_from_reserved_set(
383					self.protocol_name.clone(),
384					iter::once(remote).collect(),
385				);
386				if let Err(err) = result {
387					log::error!(target: LOG_TARGET, "Remove reserved peer failed: {}", err);
388				}
389			},
390		}
391	}
392
393	/// Called when peer sends us new transactions
394	fn on_transactions(&mut self, who: PeerId, transactions: Transactions<B::Extrinsic>) {
395		// Accept transactions only when node is not major syncing
396		if self.sync.is_major_syncing() {
397			trace!(target: LOG_TARGET, "{} Ignoring transactions while major syncing", who);
398			return;
399		}
400
401		trace!(target: LOG_TARGET, "Received {} transactions from {}", transactions.len(), who);
402		if let Some(ref mut peer) = self.peers.get_mut(&who) {
403			for t in transactions {
404				if self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS {
405					debug!(
406						target: LOG_TARGET,
407						"Ignoring any further transactions that exceed `MAX_PENDING_TRANSACTIONS`({}) limit",
408						MAX_PENDING_TRANSACTIONS,
409					);
410					break;
411				}
412
413				let hash = self.transaction_pool.hash_of(&t);
414				peer.known_transactions.insert(hash.clone());
415
416				self.network.report_peer(who, rep::ANY_TRANSACTION);
417
418				match self.pending_transactions_peers.entry(hash.clone()) {
419					Entry::Vacant(entry) => {
420						self.pending_transactions.push(PendingTransaction {
421							validation: self.transaction_pool.import(t),
422							tx_hash: hash,
423						});
424						entry.insert(vec![who]);
425					},
426					Entry::Occupied(mut entry) => {
427						entry.get_mut().push(who);
428					},
429				}
430			}
431		}
432	}
433
434	fn on_handle_transaction_import(&mut self, who: PeerId, import: TransactionImport) {
435		match import {
436			TransactionImport::KnownGood => {
437				self.network.report_peer(who, rep::ANY_TRANSACTION_REFUND)
438			},
439			TransactionImport::NewGood => self.network.report_peer(who, rep::GOOD_TRANSACTION),
440			TransactionImport::Bad => self.network.report_peer(who, rep::BAD_TRANSACTION),
441			TransactionImport::None => {},
442		}
443	}
444
445	/// Propagate one transaction.
446	pub fn propagate_transaction(&mut self, hash: &H) {
447		// Accept transactions only when node is not major syncing
448		if self.sync.is_major_syncing() {
449			return;
450		}
451
452		debug!(target: LOG_TARGET, "Propagating transaction [{:?}]", hash);
453		if let Some(transaction) = self.transaction_pool.transaction(hash) {
454			let propagated_to = self.do_propagate_transactions(&[(hash.clone(), transaction)]);
455			self.transaction_pool.on_broadcasted(propagated_to);
456		} else {
457			debug!(target: "sync", "Propagating transaction failure [{:?}]", hash);
458		}
459	}
460
461	fn do_propagate_transactions(
462		&mut self,
463		transactions: &[(H, Arc<B::Extrinsic>)],
464	) -> HashMap<H, Vec<String>> {
465		let mut propagated_to = HashMap::<_, Vec<_>>::new();
466		let mut propagated_transactions = 0;
467
468		for (who, peer) in self.peers.iter_mut() {
469			// never send transactions to the light node
470			if matches!(peer.role, ObservedRole::Light) {
471				continue;
472			}
473
474			let (hashes, to_send): (Vec<_>, Transactions<_>) = transactions
475				.iter()
476				.filter(|(hash, _)| peer.known_transactions.insert(hash.clone()))
477				.cloned()
478				.unzip();
479
480			propagated_transactions += hashes.len();
481
482			if !to_send.is_empty() {
483				for hash in hashes {
484					propagated_to.entry(hash).or_default().push(who.to_base58());
485				}
486				trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
487				// Historically, the format of a notification of the transactions protocol
488				// consisted in a (SCALE-encoded) `Vec<Transaction>`.
489				// After RFC 56, the format was modified in a backwards-compatible way to be
490				// a (SCALE-encoded) tuple `(Compact(1), Transaction)`, which is the same encoding
491				// as a `Vec` of length one. This is no coincidence, as the change was
492				// intentionally done in a backwards-compatible way.
493				// In other words, the `Vec` that is sent below **must** always have only a single
494				// element in it.
495				// See <https://github.com/polkadot-fellows/RFCs/blob/main/text/0056-one-transaction-per-notification.md>
496				for to_send in to_send {
497					let _ = self
498						.notification_service
499						.send_sync_notification(who, vec![to_send].encode());
500				}
501			}
502		}
503
504		if let Some(ref metrics) = self.metrics {
505			metrics.propagated_transactions.inc_by(propagated_transactions as _)
506		}
507
508		propagated_to
509	}
510
511	/// Call when we must propagate ready transactions to peers.
512	fn propagate_transactions(&mut self) {
513		// Accept transactions only when node is not major syncing
514		if self.sync.is_major_syncing() {
515			return;
516		}
517
518		let transactions = self.transaction_pool.transactions();
519
520		if transactions.is_empty() {
521			return;
522		}
523
524		debug!(target: LOG_TARGET, "Propagating transactions");
525
526		let propagated_to = self.do_propagate_transactions(&transactions);
527		self.transaction_pool.on_broadcasted(propagated_to);
528	}
529}