1#![warn(missing_docs)]
25
26use std::{fmt, sync::Arc};
27
28use futures::{
29 future::{ready, Future},
30 prelude::*,
31};
32use parking_lot::Mutex;
33use soil_client::client_api::BlockchainEvents;
34use soil_client::transaction_pool::OffchainTransactionPoolFactory;
35use soil_network::{NetworkPeers, NetworkStateInfo};
36use subsoil::api::{ApiExt, ProvideRuntimeApi};
37use subsoil::core::{offchain, traits::SpawnNamed};
38use subsoil::externalities::Extension;
39use subsoil::keystore::{KeystoreExt, KeystorePtr};
40use subsoil::runtime::traits::{self, Header};
41use threadpool::ThreadPool;
42
43mod api;
44
45pub use subsoil::offchain_worker::{OffchainWorkerApi, STORAGE_PREFIX};
46pub use subsoil::core::offchain::storage::OffchainDb;
47
48const LOG_TARGET: &str = "offchain-worker";
49
50pub trait NetworkProvider: NetworkStateInfo + NetworkPeers {}
53
54impl<T> NetworkProvider for T where T: NetworkStateInfo + NetworkPeers {}
55
56#[derive(Clone)]
61pub enum NoOffchainStorage {}
62
63impl offchain::OffchainStorage for NoOffchainStorage {
64 fn set(&mut self, _: &[u8], _: &[u8], _: &[u8]) {
65 unimplemented!("`NoOffchainStorage` can not be constructed!")
66 }
67
68 fn remove(&mut self, _: &[u8], _: &[u8]) {
69 unimplemented!("`NoOffchainStorage` can not be constructed!")
70 }
71
72 fn get(&self, _: &[u8], _: &[u8]) -> Option<Vec<u8>> {
73 unimplemented!("`NoOffchainStorage` can not be constructed!")
74 }
75
76 fn compare_and_set(&mut self, _: &[u8], _: &[u8], _: Option<&[u8]>, _: &[u8]) -> bool {
77 unimplemented!("`NoOffchainStorage` can not be constructed!")
78 }
79}
80
81pub struct OffchainWorkerOptions<RA, Block: traits::Block, Storage, CE> {
83 pub runtime_api_provider: Arc<RA>,
85 pub keystore: Option<KeystorePtr>,
87 pub offchain_db: Option<Storage>,
91 pub transaction_pool: Option<OffchainTransactionPoolFactory<Block>>,
93 pub network_provider: Arc<dyn NetworkProvider + Send + Sync>,
95 pub is_validator: bool,
97 pub enable_http_requests: bool,
101 pub custom_extensions: CE,
115}
116
117pub struct OffchainWorkers<RA, Block: traits::Block, Storage> {
119 runtime_api_provider: Arc<RA>,
120 thread_pool: Mutex<ThreadPool>,
121 shared_http_client: api::SharedClient,
122 enable_http_requests: bool,
123 keystore: Option<KeystorePtr>,
124 offchain_db: Option<OffchainDb<Storage>>,
125 transaction_pool: Option<OffchainTransactionPoolFactory<Block>>,
126 network_provider: Arc<dyn NetworkProvider + Send + Sync>,
127 is_validator: bool,
128 custom_extensions: Box<dyn Fn(Block::Hash) -> Vec<Box<dyn Extension>> + Send>,
129}
130
131impl<RA, Block: traits::Block, Storage> OffchainWorkers<RA, Block, Storage> {
132 pub fn new<CE: Fn(Block::Hash) -> Vec<Box<dyn Extension>> + Send + 'static>(
134 OffchainWorkerOptions {
135 runtime_api_provider,
136 keystore,
137 offchain_db,
138 transaction_pool,
139 network_provider,
140 is_validator,
141 enable_http_requests,
142 custom_extensions,
143 }: OffchainWorkerOptions<RA, Block, Storage, CE>,
144 ) -> std::io::Result<Self> {
145 Ok(Self {
146 runtime_api_provider,
147 thread_pool: Mutex::new(ThreadPool::with_name(
148 "offchain-worker".into(),
149 num_cpus::get(),
150 )),
151 shared_http_client: api::SharedClient::new()?,
152 enable_http_requests,
153 keystore,
154 offchain_db: offchain_db.map(OffchainDb::new),
155 transaction_pool,
156 is_validator,
157 network_provider,
158 custom_extensions: Box::new(custom_extensions),
159 })
160 }
161}
162
163impl<RA, Block: traits::Block, Storage: offchain::OffchainStorage> fmt::Debug
164 for OffchainWorkers<RA, Block, Storage>
165{
166 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
167 f.debug_tuple("OffchainWorkers").finish()
168 }
169}
170
171impl<RA, Block, Storage> OffchainWorkers<RA, Block, Storage>
172where
173 Block: traits::Block,
174 RA: ProvideRuntimeApi<Block> + Send + Sync + 'static,
175 RA::Api: OffchainWorkerApi<Block>,
176 Storage: offchain::OffchainStorage + 'static,
177{
178 pub async fn run<BE: BlockchainEvents<Block>>(
180 self,
181 import_events: Arc<BE>,
182 spawner: impl SpawnNamed,
183 ) {
184 import_events
185 .import_notification_stream()
186 .for_each(move |n| {
187 if n.is_new_best {
188 spawner.spawn(
189 "offchain-on-block",
190 Some("offchain-worker"),
191 self.on_block_imported(&n.header).boxed(),
192 );
193 } else {
194 tracing::debug!(
195 target: LOG_TARGET,
196 "Skipping offchain workers for non-canon block: {:?}",
197 n.header,
198 )
199 }
200
201 ready(())
202 })
203 .await;
204 }
205
206 #[must_use]
208 fn on_block_imported(&self, header: &Block::Header) -> impl Future<Output = ()> {
209 let runtime = self.runtime_api_provider.runtime_api();
210 let hash = header.hash();
211 let has_api_v1 = runtime.has_api_with::<dyn OffchainWorkerApi<Block>, _>(hash, |v| v == 1);
212 let has_api_v2 = runtime.has_api_with::<dyn OffchainWorkerApi<Block>, _>(hash, |v| v == 2);
213 let version = match (has_api_v1, has_api_v2) {
214 (_, Ok(true)) => 2,
215 (Ok(true), _) => 1,
216 err => {
217 let help =
218 "Consider turning off offchain workers if they are not part of your runtime.";
219 tracing::error!(
220 target: LOG_TARGET,
221 "Unsupported Offchain Worker API version: {:?}. {}.",
222 err,
223 help
224 );
225 0
226 },
227 };
228 tracing::debug!(
229 target: LOG_TARGET,
230 "Checking offchain workers at {hash:?}: version: {version}",
231 );
232
233 let process = (version > 0).then(|| {
234 let (api, runner) = api::AsyncApi::new(
235 self.network_provider.clone(),
236 self.is_validator,
237 self.shared_http_client.clone(),
238 );
239 tracing::debug!(target: LOG_TARGET, "Spawning offchain workers at {hash:?}");
240 let header = header.clone();
241 let client = self.runtime_api_provider.clone();
242
243 let mut capabilities = offchain::Capabilities::all();
244 capabilities.set(offchain::Capabilities::HTTP, self.enable_http_requests);
245
246 let keystore = self.keystore.clone();
247 let db = self.offchain_db.clone();
248 let tx_pool = self.transaction_pool.clone();
249 let custom_extensions = (*self.custom_extensions)(hash);
250
251 self.spawn_worker(move || {
252 let mut runtime = client.runtime_api();
253 let api = Box::new(api);
254 tracing::debug!(target: LOG_TARGET, "Running offchain workers at {hash:?}");
255
256 if let Some(keystore) = keystore {
257 runtime.register_extension(KeystoreExt(keystore.clone()));
258 }
259
260 if let Some(pool) = tx_pool {
261 runtime.register_extension(pool.offchain_transaction_pool(hash));
262 }
263
264 if let Some(offchain_db) = db {
265 runtime.register_extension(offchain::OffchainDbExt::new(
266 offchain::LimitedExternalities::new(capabilities, offchain_db.clone()),
267 ));
268 }
269
270 runtime.register_extension(offchain::OffchainWorkerExt::new(
271 offchain::LimitedExternalities::new(capabilities, api),
272 ));
273
274 custom_extensions.into_iter().for_each(|ext| runtime.register_extension(ext));
275
276 let run = if version == 2 {
277 runtime.offchain_worker(hash, &header)
278 } else {
279 #[allow(deprecated)]
280 runtime.offchain_worker_before_version_2(hash, *header.number())
281 };
282
283 if let Err(e) = run {
284 tracing::error!(
285 target: LOG_TARGET,
286 "Error running offchain workers at {:?}: {}",
287 hash,
288 e
289 );
290 }
291 });
292
293 runner.process()
294 });
295
296 async move {
297 futures::future::OptionFuture::from(process).await;
298 }
299 }
300
301 fn spawn_worker(&self, f: impl FnOnce() -> () + Send + 'static) {
310 self.thread_pool.lock().execute(f);
311 }
312}
313
314#[cfg(test)]
315mod tests {
316 use super::*;
317 use futures::executor::block_on;
318 use soil_client::block_builder::BlockBuilderBuilder;
319 use soil_client::client_api::Backend as _;
320 use soil_client::consensus::BlockOrigin;
321 use soil_client::transaction_pool::{InPoolTransaction, TransactionPool};
322 use soil_network::types::PeerId;
323 use soil_network::{
324 config::MultiaddrWithPeerId, types::ProtocolName, Multiaddr, ObservedRole, ReputationChange,
325 };
326 use soil_txpool::BasicPool;
327 use std::{collections::HashSet, sync::Arc};
328 use subsoil::runtime::traits::Block as BlockT;
329 use soil_test_node_runtime_client::{
330 runtime::{
331 substrate_test_pallet::pallet::Call as PalletCall, ExtrinsicBuilder, RuntimeCall,
332 },
333 ClientBlockImportExt, DefaultTestClientBuilderExt, TestClientBuilderExt,
334 };
335
336 struct TestNetwork();
337
338 impl NetworkStateInfo for TestNetwork {
339 fn external_addresses(&self) -> Vec<Multiaddr> {
340 Vec::new()
341 }
342
343 fn local_peer_id(&self) -> PeerId {
344 PeerId::random()
345 }
346
347 fn listen_addresses(&self) -> Vec<Multiaddr> {
348 Vec::new()
349 }
350 }
351
352 #[async_trait::async_trait]
353 impl NetworkPeers for TestNetwork {
354 fn set_authorized_peers(&self, _peers: HashSet<PeerId>) {
355 unimplemented!();
356 }
357
358 fn set_authorized_only(&self, _reserved_only: bool) {
359 unimplemented!();
360 }
361
362 fn add_known_address(&self, _peer_id: PeerId, _addr: Multiaddr) {
363 unimplemented!();
364 }
365
366 fn report_peer(&self, _peer_id: PeerId, _cost_benefit: ReputationChange) {
367 unimplemented!();
368 }
369
370 fn peer_reputation(&self, _peer_id: &PeerId) -> i32 {
371 unimplemented!()
372 }
373
374 fn disconnect_peer(&self, _peer_id: PeerId, _protocol: ProtocolName) {
375 unimplemented!();
376 }
377
378 fn accept_unreserved_peers(&self) {
379 unimplemented!();
380 }
381
382 fn deny_unreserved_peers(&self) {
383 unimplemented!();
384 }
385
386 fn add_reserved_peer(&self, _peer: MultiaddrWithPeerId) -> Result<(), String> {
387 unimplemented!();
388 }
389
390 fn remove_reserved_peer(&self, _peer_id: PeerId) {
391 unimplemented!();
392 }
393
394 fn set_reserved_peers(
395 &self,
396 _protocol: ProtocolName,
397 _peers: HashSet<Multiaddr>,
398 ) -> Result<(), String> {
399 unimplemented!();
400 }
401
402 fn add_peers_to_reserved_set(
403 &self,
404 _protocol: ProtocolName,
405 _peers: HashSet<Multiaddr>,
406 ) -> Result<(), String> {
407 unimplemented!();
408 }
409
410 fn remove_peers_from_reserved_set(
411 &self,
412 _protocol: ProtocolName,
413 _peers: Vec<PeerId>,
414 ) -> Result<(), String> {
415 unimplemented!();
416 }
417
418 fn sync_num_connected(&self) -> usize {
419 unimplemented!();
420 }
421
422 fn peer_role(&self, _peer_id: PeerId, _handshake: Vec<u8>) -> Option<ObservedRole> {
423 None
424 }
425
426 async fn reserved_peers(&self) -> Result<Vec<PeerId>, ()> {
427 unimplemented!();
428 }
429 }
430
431 #[test]
432 fn should_call_into_runtime_and_produce_extrinsic() {
433 subsoil::tracing::try_init_simple();
434
435 let client = Arc::new(soil_test_node_runtime_client::new());
436 let spawner = subsoil::core::testing::TaskExecutor::new();
437 let pool = Arc::from(BasicPool::new_full(
438 Default::default(),
439 true.into(),
440 None,
441 spawner,
442 client.clone(),
443 ));
444 let network = Arc::new(TestNetwork());
445 let header = client.header(client.chain_info().genesis_hash).unwrap().unwrap();
446
447 let offchain = OffchainWorkers::new(OffchainWorkerOptions {
449 runtime_api_provider: client,
450 keystore: None,
451 offchain_db: None::<NoOffchainStorage>,
452 transaction_pool: Some(OffchainTransactionPoolFactory::new(pool.clone())),
453 network_provider: network,
454 is_validator: false,
455 enable_http_requests: false,
456 custom_extensions: |_| Vec::new(),
457 })
458 .unwrap();
459 futures::executor::block_on(offchain.on_block_imported(&header));
460
461 assert_eq!(pool.status().ready, 1);
463 assert!(matches!(
464 pool.ready().next().unwrap().data().function,
465 RuntimeCall::SubstrateTest(PalletCall::storage_change { .. })
466 ));
467 }
468
469 #[test]
470 fn offchain_index_set_and_clear_works() {
471 use subsoil::core::offchain::OffchainStorage;
472
473 subsoil::tracing::try_init_simple();
474
475 let (client, backend) = soil_test_node_runtime_client::TestClientBuilder::new()
476 .enable_offchain_indexing_api()
477 .build_with_backend();
478 let client = Arc::new(client);
479 let offchain_db = backend.offchain_storage().unwrap();
480
481 let key = &b"hello"[..];
482 let value = &b"world"[..];
483 let mut block_builder = BlockBuilderBuilder::new(&*client)
484 .on_parent_block(client.chain_info().genesis_hash)
485 .with_parent_block_number(0)
486 .build()
487 .unwrap();
488 let ext = ExtrinsicBuilder::new_offchain_index_set(key.to_vec(), value.to_vec()).build();
489 block_builder.push(ext).unwrap();
490
491 let block = block_builder.build().unwrap().block;
492 block_on(client.import(BlockOrigin::Own, block.clone())).unwrap();
493
494 assert_eq!(value, &offchain_db.get(subsoil::offchain_worker::STORAGE_PREFIX, &key).unwrap());
495
496 let mut block_builder = BlockBuilderBuilder::new(&*client)
497 .on_parent_block(block.hash())
498 .with_parent_block_number(1)
499 .build()
500 .unwrap();
501 let ext = ExtrinsicBuilder::new_offchain_index_clear(key.to_vec()).nonce(1).build();
502 block_builder.push(ext).unwrap();
503
504 let block = block_builder.build().unwrap().block;
505 block_on(client.import(BlockOrigin::Own, block)).unwrap();
506
507 assert!(offchain_db.get(subsoil::offchain_worker::STORAGE_PREFIX, &key).is_none());
508 }
509}