Skip to main content

soil_offchain/
lib.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Substrate offchain workers.
8//!
//! The offchain worker is a special function of the runtime that
//! gets executed after a block is imported. During execution
//! it is able to asynchronously submit extrinsics that will either
//! be propagated to other nodes or added to the next block
//! produced by the node as unsigned transactions.
14//!
//! Offchain workers can be used for computation-heavy tasks
//! that are not feasible to execute during regular block processing.
//! These can either be tasks for which no consensus is required,
//! or tasks where some form of consensus over the data can be built
//! on-chain, for instance via:
20//! 1. Challenge period for incorrect computations
21//! 2. Majority voting for results
22//! 3. etc
23
24#![warn(missing_docs)]
25
26use std::{fmt, sync::Arc};
27
28use futures::{
29	future::{ready, Future},
30	prelude::*,
31};
32use parking_lot::Mutex;
33use soil_client::client_api::BlockchainEvents;
34use soil_client::transaction_pool::OffchainTransactionPoolFactory;
35use soil_network::{NetworkPeers, NetworkStateInfo};
36use subsoil::api::{ApiExt, ProvideRuntimeApi};
37use subsoil::core::{offchain, traits::SpawnNamed};
38use subsoil::externalities::Extension;
39use subsoil::keystore::{KeystoreExt, KeystorePtr};
40use subsoil::runtime::traits::{self, Header};
41use threadpool::ThreadPool;
42
43mod api;
44
45pub use subsoil::offchain_worker::{OffchainWorkerApi, STORAGE_PREFIX};
46pub use subsoil::core::offchain::storage::OffchainDb;
47
/// Log target passed to the `tracing` macros used in this file.
const LOG_TARGET: &str = "offchain-worker";
49
50/// NetworkProvider provides [`OffchainWorkers`] with all necessary hooks into the
51/// underlying Substrate networking.
52pub trait NetworkProvider: NetworkStateInfo + NetworkPeers {}
53
54impl<T> NetworkProvider for T where T: NetworkStateInfo + NetworkPeers {}
55
56/// Special type that implements [`OffchainStorage`](offchain::OffchainStorage).
57///
58/// This type can not be constructed and should only be used when passing `None` as `offchain_db` to
59/// [`OffchainWorkerOptions`] to make the compiler happy.
60#[derive(Clone)]
61pub enum NoOffchainStorage {}
62
63impl offchain::OffchainStorage for NoOffchainStorage {
64	fn set(&mut self, _: &[u8], _: &[u8], _: &[u8]) {
65		unimplemented!("`NoOffchainStorage` can not be constructed!")
66	}
67
68	fn remove(&mut self, _: &[u8], _: &[u8]) {
69		unimplemented!("`NoOffchainStorage` can not be constructed!")
70	}
71
72	fn get(&self, _: &[u8], _: &[u8]) -> Option<Vec<u8>> {
73		unimplemented!("`NoOffchainStorage` can not be constructed!")
74	}
75
76	fn compare_and_set(&mut self, _: &[u8], _: &[u8], _: Option<&[u8]>, _: &[u8]) -> bool {
77		unimplemented!("`NoOffchainStorage` can not be constructed!")
78	}
79}
80
81/// Options for [`OffchainWorkers`]
82pub struct OffchainWorkerOptions<RA, Block: traits::Block, Storage, CE> {
83	/// Provides access to the runtime api.
84	pub runtime_api_provider: Arc<RA>,
85	/// Provides access to the keystore.
86	pub keystore: Option<KeystorePtr>,
87	/// Provides access to the offchain database.
88	///
89	/// Use [`NoOffchainStorage`] as type when passing `None` to have some type that works.
90	pub offchain_db: Option<Storage>,
91	/// Provides access to the transaction pool.
92	pub transaction_pool: Option<OffchainTransactionPoolFactory<Block>>,
93	/// Provides access to network information.
94	pub network_provider: Arc<dyn NetworkProvider + Send + Sync>,
95	/// Is the node running as validator?
96	pub is_validator: bool,
97	/// Enable http requests from offchain workers?
98	///
99	/// If not enabled, any http request will panic.
100	pub enable_http_requests: bool,
101	/// Callback to create custom [`Extension`]s that should be registered for the
102	/// `offchain_worker` runtime call.
103	///
104	/// These [`Extension`]s are registered along-side the default extensions and are accessible in
105	/// the host functions.
106	///
107	/// # Example:
108	///
109	/// ```nocompile
110	/// custom_extensions: |block_hash| {
111	///     vec![MyCustomExtension::new()]
112	/// }
113	/// ```
114	pub custom_extensions: CE,
115}
116
117/// An offchain workers manager.
118pub struct OffchainWorkers<RA, Block: traits::Block, Storage> {
119	runtime_api_provider: Arc<RA>,
120	thread_pool: Mutex<ThreadPool>,
121	shared_http_client: api::SharedClient,
122	enable_http_requests: bool,
123	keystore: Option<KeystorePtr>,
124	offchain_db: Option<OffchainDb<Storage>>,
125	transaction_pool: Option<OffchainTransactionPoolFactory<Block>>,
126	network_provider: Arc<dyn NetworkProvider + Send + Sync>,
127	is_validator: bool,
128	custom_extensions: Box<dyn Fn(Block::Hash) -> Vec<Box<dyn Extension>> + Send>,
129}
130
131impl<RA, Block: traits::Block, Storage> OffchainWorkers<RA, Block, Storage> {
132	/// Creates new [`OffchainWorkers`].
133	pub fn new<CE: Fn(Block::Hash) -> Vec<Box<dyn Extension>> + Send + 'static>(
134		OffchainWorkerOptions {
135			runtime_api_provider,
136			keystore,
137			offchain_db,
138			transaction_pool,
139			network_provider,
140			is_validator,
141			enable_http_requests,
142			custom_extensions,
143		}: OffchainWorkerOptions<RA, Block, Storage, CE>,
144	) -> std::io::Result<Self> {
145		Ok(Self {
146			runtime_api_provider,
147			thread_pool: Mutex::new(ThreadPool::with_name(
148				"offchain-worker".into(),
149				num_cpus::get(),
150			)),
151			shared_http_client: api::SharedClient::new()?,
152			enable_http_requests,
153			keystore,
154			offchain_db: offchain_db.map(OffchainDb::new),
155			transaction_pool,
156			is_validator,
157			network_provider,
158			custom_extensions: Box::new(custom_extensions),
159		})
160	}
161}
162
163impl<RA, Block: traits::Block, Storage: offchain::OffchainStorage> fmt::Debug
164	for OffchainWorkers<RA, Block, Storage>
165{
166	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
167		f.debug_tuple("OffchainWorkers").finish()
168	}
169}
170
171impl<RA, Block, Storage> OffchainWorkers<RA, Block, Storage>
172where
173	Block: traits::Block,
174	RA: ProvideRuntimeApi<Block> + Send + Sync + 'static,
175	RA::Api: OffchainWorkerApi<Block>,
176	Storage: offchain::OffchainStorage + 'static,
177{
178	/// Run the offchain workers on every block import.
179	pub async fn run<BE: BlockchainEvents<Block>>(
180		self,
181		import_events: Arc<BE>,
182		spawner: impl SpawnNamed,
183	) {
184		import_events
185			.import_notification_stream()
186			.for_each(move |n| {
187				if n.is_new_best {
188					spawner.spawn(
189						"offchain-on-block",
190						Some("offchain-worker"),
191						self.on_block_imported(&n.header).boxed(),
192					);
193				} else {
194					tracing::debug!(
195						target: LOG_TARGET,
196						"Skipping offchain workers for non-canon block: {:?}",
197						n.header,
198					)
199				}
200
201				ready(())
202			})
203			.await;
204	}
205
206	/// Start the offchain workers after given block.
207	#[must_use]
208	fn on_block_imported(&self, header: &Block::Header) -> impl Future<Output = ()> {
209		let runtime = self.runtime_api_provider.runtime_api();
210		let hash = header.hash();
211		let has_api_v1 = runtime.has_api_with::<dyn OffchainWorkerApi<Block>, _>(hash, |v| v == 1);
212		let has_api_v2 = runtime.has_api_with::<dyn OffchainWorkerApi<Block>, _>(hash, |v| v == 2);
213		let version = match (has_api_v1, has_api_v2) {
214			(_, Ok(true)) => 2,
215			(Ok(true), _) => 1,
216			err => {
217				let help =
218					"Consider turning off offchain workers if they are not part of your runtime.";
219				tracing::error!(
220					target: LOG_TARGET,
221					"Unsupported Offchain Worker API version: {:?}. {}.",
222					err,
223					help
224				);
225				0
226			},
227		};
228		tracing::debug!(
229			target: LOG_TARGET,
230			"Checking offchain workers at {hash:?}: version: {version}",
231		);
232
233		let process = (version > 0).then(|| {
234			let (api, runner) = api::AsyncApi::new(
235				self.network_provider.clone(),
236				self.is_validator,
237				self.shared_http_client.clone(),
238			);
239			tracing::debug!(target: LOG_TARGET, "Spawning offchain workers at {hash:?}");
240			let header = header.clone();
241			let client = self.runtime_api_provider.clone();
242
243			let mut capabilities = offchain::Capabilities::all();
244			capabilities.set(offchain::Capabilities::HTTP, self.enable_http_requests);
245
246			let keystore = self.keystore.clone();
247			let db = self.offchain_db.clone();
248			let tx_pool = self.transaction_pool.clone();
249			let custom_extensions = (*self.custom_extensions)(hash);
250
251			self.spawn_worker(move || {
252				let mut runtime = client.runtime_api();
253				let api = Box::new(api);
254				tracing::debug!(target: LOG_TARGET, "Running offchain workers at {hash:?}");
255
256				if let Some(keystore) = keystore {
257					runtime.register_extension(KeystoreExt(keystore.clone()));
258				}
259
260				if let Some(pool) = tx_pool {
261					runtime.register_extension(pool.offchain_transaction_pool(hash));
262				}
263
264				if let Some(offchain_db) = db {
265					runtime.register_extension(offchain::OffchainDbExt::new(
266						offchain::LimitedExternalities::new(capabilities, offchain_db.clone()),
267					));
268				}
269
270				runtime.register_extension(offchain::OffchainWorkerExt::new(
271					offchain::LimitedExternalities::new(capabilities, api),
272				));
273
274				custom_extensions.into_iter().for_each(|ext| runtime.register_extension(ext));
275
276				let run = if version == 2 {
277					runtime.offchain_worker(hash, &header)
278				} else {
279					#[allow(deprecated)]
280					runtime.offchain_worker_before_version_2(hash, *header.number())
281				};
282
283				if let Err(e) = run {
284					tracing::error!(
285						target: LOG_TARGET,
286						"Error running offchain workers at {:?}: {}",
287						hash,
288						e
289					);
290				}
291			});
292
293			runner.process()
294		});
295
296		async move {
297			futures::future::OptionFuture::from(process).await;
298		}
299	}
300
301	/// Spawns a new offchain worker.
302	///
303	/// We spawn offchain workers for each block in a separate thread,
304	/// since they can run for a significant amount of time
305	/// in a blocking fashion and we don't want to block the runtime.
306	///
307	/// Note that we should avoid that if we switch to future-based runtime in the future,
308	/// alternatively:
309	fn spawn_worker(&self, f: impl FnOnce() -> () + Send + 'static) {
310		self.thread_pool.lock().execute(f);
311	}
312}
313
#[cfg(test)]
mod tests {
	use super::*;
	use futures::executor::block_on;
	use soil_client::block_builder::BlockBuilderBuilder;
	use soil_client::client_api::Backend as _;
	use soil_client::consensus::BlockOrigin;
	use soil_client::transaction_pool::{InPoolTransaction, TransactionPool};
	use soil_network::types::PeerId;
	use soil_network::{
		config::MultiaddrWithPeerId, types::ProtocolName, Multiaddr, ObservedRole, ReputationChange,
	};
	use soil_txpool::BasicPool;
	use std::{collections::HashSet, sync::Arc};
	use subsoil::runtime::traits::Block as BlockT;
	use soil_test_node_runtime_client::{
		runtime::{
			substrate_test_pallet::pallet::Call as PalletCall, ExtrinsicBuilder, RuntimeCall,
		},
		ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilderExt,
	};

	/// Network stand-in for the tests: the state queries return empty or random data,
	/// `peer_role` returns `None`, and every other peer operation is left unimplemented.
	struct TestNetwork;

	impl NetworkStateInfo for TestNetwork {
		fn external_addresses(&self) -> Vec<Multiaddr> {
			Vec::new()
		}

		fn local_peer_id(&self) -> PeerId {
			PeerId::random()
		}

		fn listen_addresses(&self) -> Vec<Multiaddr> {
			Vec::new()
		}
	}

	#[async_trait::async_trait]
	impl NetworkPeers for TestNetwork {
		fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
			unimplemented!()
		}

		fn set_authorized_only(&self, _reserved_only: bool) {
			unimplemented!()
		}

		fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) {
			unimplemented!()
		}

		fn report_peer(&self, _peer_id: PeerId, _cost_benefit: ReputationChange) {
			unimplemented!()
		}

		fn peer_reputation(&self, _peer_id: &PeerId) -> i32 {
			unimplemented!()
		}

		fn disconnect_peer(&self, _peer_id: PeerId, _protocol: ProtocolName) {
			unimplemented!()
		}

		fn accept_unreserved_peers(&self) {
			unimplemented!()
		}

		fn deny_unreserved_peers(&self) {
			unimplemented!()
		}

		fn add_reserved_peer(&self, _peer: MultiaddrWithPeerId) -> Result<(), String> {
			unimplemented!()
		}

		fn remove_reserved_peer(&self, _peer_id: PeerId) {
			unimplemented!()
		}

		fn set_reserved_peers(
			&self,
			_protocol: ProtocolName,
			_peers: HashSet<Multiaddr>,
		) -> Result<(), String> {
			unimplemented!()
		}

		fn add_peers_to_reserved_set(
			&self,
			_protocol: ProtocolName,
			_peers: HashSet<Multiaddr>,
		) -> Result<(), String> {
			unimplemented!()
		}

		fn remove_peers_from_reserved_set(
			&self,
			_protocol: ProtocolName,
			_peers: Vec<PeerId>,
		) -> Result<(), String> {
			unimplemented!()
		}

		fn sync_num_connected(&self) -> usize {
			unimplemented!()
		}

		fn peer_role(&self, _peer_id: PeerId, _handshake: Vec<u8>) -> Option<ObservedRole> {
			None
		}

		async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
			unimplemented!()
		}
	}

	/// Running the offchain worker on the genesis block must leave exactly one ready
	/// `storage_change` extrinsic in the transaction pool.
	#[test]
	fn should_call_into_runtime_and_produce_extrinsic() {
		subsoil::tracing::try_init_simple();

		// given
		let client = Arc::new(soil_test_node_runtime_client::new());
		let spawner = subsoil::core::testing::TaskExecutor::new();
		let pool = Arc::from(BasicPool::new_full(
			Default::default(),
			true.into(),
			None,
			spawner,
			client.clone(),
		));
		let network = Arc::new(TestNetwork);
		let genesis_hash = client.chain_info().genesis_hash;
		let header = client.header(genesis_hash).unwrap().unwrap();

		// when
		let options = OffchainWorkerOptions {
			runtime_api_provider: client,
			keystore: None,
			offchain_db: None::<NoOffchainStorage>,
			transaction_pool: Some(OffchainTransactionPoolFactory::new(pool.clone())),
			network_provider: network,
			is_validator: false,
			enable_http_requests: false,
			custom_extensions: |_| Vec::new(),
		};
		let offchain = OffchainWorkers::new(options).unwrap();
		block_on(offchain.on_block_imported(&header));

		// then
		assert_eq!(pool.status().ready, 1);
		assert!(matches!(
			pool.ready().next().unwrap().data().function,
			RuntimeCall::SubstrateTest(PalletCall::storage_change { .. })
		));
	}

	/// An offchain index entry written by an extrinsic in one block must be readable
	/// from the offchain storage, and be gone again after a later block clears it.
	#[test]
	fn offchain_index_set_and_clear_works() {
		use subsoil::core::offchain::OffchainStorage;

		subsoil::tracing::try_init_simple();

		let (client, backend) = soil_test_node_runtime_client::TestClientBuilder::new()
			.enable_offchain_indexing_api()
			.build_with_backend();
		let client = Arc::new(client);
		let offchain_db = backend.offchain_storage().unwrap();

		let key: &[u8] = b"hello";
		let value: &[u8] = b"world";

		// Block 1 sets the entry.
		let genesis_hash = client.chain_info().genesis_hash;
		let mut set_builder = BlockBuilderBuilder::new(&*client)
			.on_parent_block(genesis_hash)
			.with_parent_block_number(0)
			.build()
			.unwrap();
		let set_ext = ExtrinsicBuilder::new_offchain_index_set(key.to_vec(), value.to_vec()).build();
		set_builder.push(set_ext).unwrap();

		let first_block = set_builder.build().unwrap().block;
		let first_hash = first_block.hash();
		block_on(client.import(BlockOrigin::Own, first_block)).unwrap();

		let stored = offchain_db.get(STORAGE_PREFIX, key).unwrap();
		assert_eq!(value, stored.as_slice());

		// Block 2 clears it again.
		let mut clear_builder = BlockBuilderBuilder::new(&*client)
			.on_parent_block(first_hash)
			.with_parent_block_number(1)
			.build()
			.unwrap();
		let clear_ext = ExtrinsicBuilder::new_offchain_index_clear(key.to_vec()).nonce(1).build();
		clear_builder.push(clear_ext).unwrap();

		let second_block = clear_builder.build().unwrap().block;
		block_on(client.import(BlockOrigin::Own, second_block)).unwrap();

		assert!(offchain_db.get(STORAGE_PREFIX, key).is_none());
	}
}
509}