1use codec::Encode;
24use futures::{
25 channel::oneshot,
26 future,
27 future::{Future, FutureExt},
28};
29use log::{debug, error, info, log_enabled, trace, warn, Level};
30use pezsc_block_builder::{BlockBuilderApi, BlockBuilderBuilder};
31use pezsc_proposer_metrics::{EndProposingReason, MetricsLink as PrometheusMetrics};
32use pezsc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO};
33use pezsc_transaction_pool_api::{InPoolTransaction, TransactionPool, TxInvalidityReportMap};
34use pezsp_api::{ApiExt, CallApiAt, ProofRecorder, ProvideRuntimeApi};
35use pezsp_blockchain::{
36 ApplyExtrinsicFailed::Validity, Error::ApplyExtrinsicFailed, HeaderBackend,
37};
38use pezsp_consensus::{DisableProofRecording, EnableProofRecording, ProofRecording, Proposal};
39use pezsp_core::traits::SpawnNamed;
40use pezsp_inherents::InherentData;
41use pezsp_runtime::{
42 traits::{BlakeTwo256, Block as BlockT, Hash as HashT, Header as HeaderT},
43 Digest, ExtrinsicInclusionMode, Percent, SaturatedConversion,
44};
45use pezsp_trie::recorder::IgnoredNodes;
46use prometheus_endpoint::Registry as PrometheusRegistry;
47use std::{marker::PhantomData, pin::Pin, sync::Arc, time};
48
/// Default block size limit in bytes used by [`Proposer`].
///
/// Evaluates to 4 MiB plus 512 bytes. It is the value both [`ProposerFactory`] constructors
/// store as the default limit and it can be replaced through
/// [`ProposerFactory::set_default_block_size_limit`].
pub const DEFAULT_BLOCK_SIZE_LIMIT: usize = 4 * 1024 * 1024 + 512;

/// Default soft deadline, expressed as a share of the time left until the hard deadline.
///
/// Stored by both [`ProposerFactory`] constructors and replaceable through
/// [`ProposerFactory::set_soft_deadline`].
const DEFAULT_SOFT_DEADLINE_PERCENT: Percent = Percent::from_percent(50);
59
/// Log target used by the log statements of this module that set an explicit target.
///
/// The `'static` lifetime is implied for references in constants, so it is not spelled out.
const LOG_TARGET: &str = "basic-authorship";
61
/// Factory that creates [`Proposer`] instances on top of a given parent block.
///
/// `A` is the transaction pool, `C` the client and `PR` a marker selecting whether proof
/// recording is enabled for the proposers built by this factory.
pub struct ProposerFactory<A, C, PR> {
    /// Handle handed to every created [`Proposer`], which uses it to spawn the block
    /// production task.
    spawn_handle: Box<dyn SpawnNamed>,
    /// The client instance.
    client: Arc<C>,
    /// The transaction pool transactions are taken from.
    transaction_pool: Arc<A>,
    /// Link to the (optional) Prometheus metrics.
    metrics: PrometheusMetrics,
    /// Block size limit used when a proposal request does not carry its own limit.
    ///
    /// Initialised to [`DEFAULT_BLOCK_SIZE_LIMIT`].
    default_block_size_limit: usize,
    /// Soft deadline as a percentage of the time left until the hard deadline.
    ///
    /// Initialised to [`DEFAULT_SOFT_DEADLINE_PERCENT`].
    soft_deadline_percent: Percent,
    /// Optional telemetry handle forwarded to created proposers.
    telemetry: Option<TelemetryHandle>,
    /// Whether the proof is taken into account when estimating the block size.
    include_proof_in_block_size_estimation: bool,
    /// Pins the proof recording marker type without storing a value of it.
    _phantom: PhantomData<PR>,
}
90
91impl<A, C, PR> Clone for ProposerFactory<A, C, PR> {
92 fn clone(&self) -> Self {
93 Self {
94 spawn_handle: self.spawn_handle.clone(),
95 client: self.client.clone(),
96 transaction_pool: self.transaction_pool.clone(),
97 metrics: self.metrics.clone(),
98 default_block_size_limit: self.default_block_size_limit,
99 soft_deadline_percent: self.soft_deadline_percent,
100 telemetry: self.telemetry.clone(),
101 include_proof_in_block_size_estimation: self.include_proof_in_block_size_estimation,
102 _phantom: self._phantom,
103 }
104 }
105}
106
107impl<A, C> ProposerFactory<A, C, DisableProofRecording> {
108 pub fn new(
113 spawn_handle: impl SpawnNamed + 'static,
114 client: Arc<C>,
115 transaction_pool: Arc<A>,
116 prometheus: Option<&PrometheusRegistry>,
117 telemetry: Option<TelemetryHandle>,
118 ) -> Self {
119 ProposerFactory {
120 spawn_handle: Box::new(spawn_handle),
121 transaction_pool,
122 metrics: PrometheusMetrics::new(prometheus),
123 default_block_size_limit: DEFAULT_BLOCK_SIZE_LIMIT,
124 soft_deadline_percent: DEFAULT_SOFT_DEADLINE_PERCENT,
125 telemetry,
126 client,
127 include_proof_in_block_size_estimation: false,
128 _phantom: PhantomData,
129 }
130 }
131}
132
133impl<A, C> ProposerFactory<A, C, EnableProofRecording> {
134 pub fn with_proof_recording(
141 spawn_handle: impl SpawnNamed + 'static,
142 client: Arc<C>,
143 transaction_pool: Arc<A>,
144 prometheus: Option<&PrometheusRegistry>,
145 telemetry: Option<TelemetryHandle>,
146 ) -> Self {
147 ProposerFactory {
148 client,
149 spawn_handle: Box::new(spawn_handle),
150 transaction_pool,
151 metrics: PrometheusMetrics::new(prometheus),
152 default_block_size_limit: DEFAULT_BLOCK_SIZE_LIMIT,
153 soft_deadline_percent: DEFAULT_SOFT_DEADLINE_PERCENT,
154 telemetry,
155 include_proof_in_block_size_estimation: true,
156 _phantom: PhantomData,
157 }
158 }
159
160 pub fn disable_proof_in_block_size_estimation(&mut self) {
162 self.include_proof_in_block_size_estimation = false;
163 }
164}
165
impl<A, C, PR> ProposerFactory<A, C, PR> {
    /// Set the default block size limit in bytes.
    ///
    /// The default applies whenever a proposal request carries no `block_size_limit` of its
    /// own. Until this is called, [`DEFAULT_BLOCK_SIZE_LIMIT`] is used.
    pub fn set_default_block_size_limit(&mut self, limit: usize) {
        self.default_block_size_limit = limit;
    }

    /// Set the soft deadline as a percentage of the time left until the hard deadline.
    ///
    /// The soft deadline only matters once transactions start being skipped for exceeding the
    /// block size or weight limit: after [`MAX_SKIPPED_TRANSACTIONS`] skips, another such
    /// transaction ends block production unless the soft deadline has not been reached yet.
    /// Until this is called, [`DEFAULT_SOFT_DEADLINE_PERCENT`] is used.
    pub fn set_soft_deadline(&mut self, percent: Percent) {
        self.soft_deadline_percent = percent;
    }
}
194
195impl<Block, C, A, PR> ProposerFactory<A, C, PR>
196where
197 A: TransactionPool<Block = Block> + 'static,
198 Block: BlockT,
199 C: HeaderBackend<Block> + ProvideRuntimeApi<Block> + Send + Sync + 'static,
200 C::Api: ApiExt<Block> + BlockBuilderApi<Block>,
201{
202 fn init_with_now(
203 &mut self,
204 parent_header: &<Block as BlockT>::Header,
205 now: Box<dyn Fn() -> time::Instant + Send + Sync>,
206 ) -> Proposer<Block, C, A, PR> {
207 let parent_hash = parent_header.hash();
208
209 info!(
210 "🙌 Starting consensus session on top of parent {:?} (#{})",
211 parent_hash,
212 parent_header.number()
213 );
214
215 let proposer = Proposer::<_, _, _, PR> {
216 spawn_handle: self.spawn_handle.clone(),
217 client: self.client.clone(),
218 parent_hash,
219 parent_number: *parent_header.number(),
220 transaction_pool: self.transaction_pool.clone(),
221 now,
222 metrics: self.metrics.clone(),
223 default_block_size_limit: self.default_block_size_limit,
224 soft_deadline_percent: self.soft_deadline_percent,
225 telemetry: self.telemetry.clone(),
226 _phantom: PhantomData,
227 include_proof_in_block_size_estimation: self.include_proof_in_block_size_estimation,
228 };
229
230 proposer
231 }
232}
233
234impl<A, Block, C, PR> pezsp_consensus::Environment<Block> for ProposerFactory<A, C, PR>
235where
236 A: TransactionPool<Block = Block> + 'static,
237 Block: BlockT,
238 C: HeaderBackend<Block> + ProvideRuntimeApi<Block> + CallApiAt<Block> + Send + Sync + 'static,
239 C::Api: ApiExt<Block> + BlockBuilderApi<Block>,
240 PR: ProofRecording,
241{
242 type CreateProposer = future::Ready<Result<Self::Proposer, Self::Error>>;
243 type Proposer = Proposer<Block, C, A, PR>;
244 type Error = pezsp_blockchain::Error;
245
246 fn init(&mut self, parent_header: &<Block as BlockT>::Header) -> Self::CreateProposer {
247 future::ready(Ok(self.init_with_now(parent_header, Box::new(time::Instant::now))))
248 }
249}
250
/// The proposer logic: builds one block on top of a fixed parent.
///
/// Instances are created by [`ProposerFactory`] and consumed by proposing a block.
pub struct Proposer<Block: BlockT, C, A: TransactionPool, PR> {
    /// Handle used to spawn the block production task.
    spawn_handle: Box<dyn SpawnNamed>,
    /// The client the block is built against.
    client: Arc<C>,
    /// Hash of the block to build on.
    parent_hash: Block::Hash,
    /// Number of the block to build on.
    parent_number: <<Block as BlockT>::Header as HeaderT>::Number,
    /// Pool the transactions are taken from.
    transaction_pool: Arc<A>,
    /// Clock used for all deadline decisions; injectable so tests can mock the time.
    now: Box<dyn Fn() -> time::Instant + Send + Sync>,
    /// Link to the (optional) Prometheus metrics.
    metrics: PrometheusMetrics,
    /// Block size limit used when the proposal request carries none.
    default_block_size_limit: usize,
    /// Whether the proof is taken into account when estimating the block size.
    include_proof_in_block_size_estimation: bool,
    /// Soft deadline as a percentage of the time left until the hard deadline.
    soft_deadline_percent: Percent,
    /// Optional telemetry handle.
    telemetry: Option<TelemetryHandle>,
    /// Pins the proof recording marker type without storing a value of it.
    _phantom: PhantomData<PR>,
}
266
267impl<A, Block, C, PR> pezsp_consensus::Proposer<Block> for Proposer<Block, C, A, PR>
268where
269 A: TransactionPool<Block = Block> + 'static,
270 Block: BlockT,
271 C: HeaderBackend<Block> + ProvideRuntimeApi<Block> + CallApiAt<Block> + Send + Sync + 'static,
272 C::Api: ApiExt<Block> + BlockBuilderApi<Block>,
273 PR: ProofRecording,
274{
275 type Proposal =
276 Pin<Box<dyn Future<Output = Result<Proposal<Block, PR::Proof>, Self::Error>> + Send>>;
277 type Error = pezsp_blockchain::Error;
278 type ProofRecording = PR;
279 type Proof = PR::Proof;
280
281 fn propose(
282 self,
283 inherent_data: InherentData,
284 inherent_digests: Digest,
285 max_duration: time::Duration,
286 block_size_limit: Option<usize>,
287 ) -> Self::Proposal {
288 self.propose_block(ProposeArgs {
289 inherent_data,
290 inherent_digests,
291 max_duration,
292 block_size_limit,
293 ignored_nodes_by_proof_recording: None,
294 })
295 .boxed()
296 }
297}
298
/// Arguments for [`Proposer::propose_block`].
pub struct ProposeArgs<Block: BlockT> {
    /// The inherent data handed to the block builder to create the inherents.
    pub inherent_data: InherentData,
    /// The inherent digests the block is built with.
    pub inherent_digests: Digest,
    /// Maximum duration for building the block; a tenth of it is held back when the
    /// deadline for pushing transactions is computed.
    pub max_duration: time::Duration,
    /// Optional size limit for the block in bytes.
    ///
    /// When `None`, the proposer's default block size limit is used.
    pub block_size_limit: Option<usize>,
    /// Trie nodes handed to the proof recorder as ignored nodes.
    ///
    /// Only used when proof recording is enabled; `None` behaves like an empty set.
    pub ignored_nodes_by_proof_recording: Option<IgnoredNodes<Block::Hash>>,
}
317
318impl<Block: BlockT> Default for ProposeArgs<Block> {
319 fn default() -> Self {
320 Self {
321 inherent_data: Default::default(),
322 inherent_digests: Default::default(),
323 max_duration: Default::default(),
324 block_size_limit: None,
325 ignored_nodes_by_proof_recording: None,
326 }
327 }
328}
329
/// Number of transactions that may be skipped for exceeding the block size or weight limit
/// before the soft deadline is consulted.
///
/// Once this many transactions were skipped, the next one that does not fit ends block
/// production unless the soft deadline has not been reached yet.
const MAX_SKIPPED_TRANSACTIONS: usize = 8;
334
335impl<A, Block, C, PR> Proposer<Block, C, A, PR>
336where
337 A: TransactionPool<Block = Block> + 'static,
338 Block: BlockT,
339 C: HeaderBackend<Block> + ProvideRuntimeApi<Block> + CallApiAt<Block> + Send + Sync + 'static,
340 C::Api: ApiExt<Block> + BlockBuilderApi<Block>,
341 PR: ProofRecording,
342{
343 pub async fn propose_block(
345 self,
346 args: ProposeArgs<Block>,
347 ) -> Result<Proposal<Block, PR::Proof>, pezsp_blockchain::Error> {
348 let (tx, rx) = oneshot::channel();
349 let spawn_handle = self.spawn_handle.clone();
350
351 spawn_handle.spawn_blocking(
353 "basic-authorship-proposer",
354 None,
355 async move {
356 let res = self.propose_with(args).await;
357 if tx.send(res).is_err() {
358 trace!(
359 target: LOG_TARGET,
360 "Could not send block production result to proposer!"
361 );
362 }
363 }
364 .boxed(),
365 );
366
367 rx.await?.map_err(Into::into)
368 }
369
370 async fn propose_with(
371 self,
372 ProposeArgs {
373 inherent_data,
374 inherent_digests,
375 max_duration,
376 block_size_limit,
377 ignored_nodes_by_proof_recording,
378 }: ProposeArgs<Block>,
379 ) -> Result<Proposal<Block, PR::Proof>, pezsp_blockchain::Error> {
380 let deadline = (self.now)() + max_duration - max_duration / 10;
382 let block_timer = time::Instant::now();
383 let mut block_builder = BlockBuilderBuilder::new(&*self.client)
384 .on_parent_block(self.parent_hash)
385 .with_parent_block_number(self.parent_number)
386 .with_proof_recorder(PR::ENABLED.then(|| {
387 ProofRecorder::<Block>::with_ignored_nodes(
388 ignored_nodes_by_proof_recording.unwrap_or_default(),
389 )
390 }))
391 .with_inherent_digests(inherent_digests)
392 .build()?;
393
394 self.apply_inherents(&mut block_builder, inherent_data)?;
395
396 let mode = block_builder.extrinsic_inclusion_mode();
397 let end_reason = match mode {
398 ExtrinsicInclusionMode::AllExtrinsics => {
399 self.apply_extrinsics(&mut block_builder, deadline, block_size_limit).await?
400 },
401 ExtrinsicInclusionMode::OnlyInherents => EndProposingReason::TransactionForbidden,
402 };
403 let (block, storage_changes, proof) = block_builder.build()?.into_inner();
404 let block_took = block_timer.elapsed();
405
406 let proof =
407 PR::into_proof(proof).map_err(|e| pezsp_blockchain::Error::Application(Box::new(e)))?;
408
409 self.print_summary(&block, end_reason, block_took, block_timer.elapsed());
410 Ok(Proposal { block, proof, storage_changes })
411 }
412
413 fn apply_inherents(
415 &self,
416 block_builder: &mut pezsc_block_builder::BlockBuilder<'_, Block, C>,
417 inherent_data: InherentData,
418 ) -> Result<(), pezsp_blockchain::Error> {
419 let create_inherents_start = time::Instant::now();
420
421 let inherent_identifiers = log_enabled!(target: LOG_TARGET, Level::Debug).then(|| {
422 inherent_data
423 .identifiers()
424 .map(|id| String::from_utf8_lossy(id).to_string())
425 .collect::<Vec<String>>()
426 });
427
428 let inherents = block_builder.create_inherents(inherent_data)?;
429 let create_inherents_end = time::Instant::now();
430
431 debug!(target: LOG_TARGET, "apply_inherents: Runtime provided {} inherents. Inherent identifiers present: {:?}", inherents.len(), inherent_identifiers);
432
433 self.metrics.report(|metrics| {
434 metrics.create_inherents_time.observe(
435 create_inherents_end
436 .saturating_duration_since(create_inherents_start)
437 .as_secs_f64(),
438 );
439 });
440
441 for inherent in inherents {
442 match block_builder.push(inherent) {
443 Err(ApplyExtrinsicFailed(Validity(e))) if e.exhausted_resources() => {
444 warn!(
445 target: LOG_TARGET,
446 "⚠️ Dropping non-mandatory inherent from overweight block."
447 )
448 },
449 Err(ApplyExtrinsicFailed(Validity(e))) if e.was_mandatory() => {
450 error!(
451 "❌️ Mandatory inherent extrinsic returned error. Block cannot be produced."
452 );
453 return Err(ApplyExtrinsicFailed(Validity(e)));
454 },
455 Err(e) => {
456 warn!(
457 target: LOG_TARGET,
458 "❗️ Inherent extrinsic returned unexpected error: {}. Dropping.", e
459 );
460 },
461 Ok(_) => {},
462 }
463 }
464 Ok(())
465 }
466
    /// Push transactions from the pool into the block until a stop condition is met.
    ///
    /// The loop ends when the pool iterator is exhausted, when the clock passes `deadline`,
    /// or when a transaction exceeds the block size or weight limit after
    /// [`MAX_SKIPPED_TRANSACTIONS`] were already skipped and the soft deadline has passed.
    /// Transactions rejected for any other reason are collected and reported to the pool as
    /// invalid once the loop is done.
    ///
    /// `block_size_limit` falls back to the proposer's default limit when `None`.
    ///
    /// Returns the reason block production stopped.
    ///
    /// NOTE: every `(self.now)()` call is observable by the mocked clocks used in the tests
    /// of this file, so the number and order of the clock reads must not change.
    async fn apply_extrinsics(
        &self,
        block_builder: &mut pezsc_block_builder::BlockBuilder<'_, Block, C>,
        deadline: time::Instant,
        block_size_limit: Option<usize>,
    ) -> Result<EndProposingReason, pezsp_blockchain::Error> {
        // The soft deadline is `soft_deadline_percent` of the time left until the hard
        // deadline, counted from now.
        let now = (self.now)();
        let left = deadline.saturating_duration_since(now);
        let left_micros: u64 = left.as_micros().saturated_into();
        let soft_deadline =
            now + time::Duration::from_micros(self.soft_deadline_percent.mul_floor(left_micros));
        let mut skipped = 0;
        let mut unqueue_invalid = TxInvalidityReportMap::new();
        // Remembers the last size/weight limit hit so it can be reported as the end reason
        // when the loop ends for another cause; reset whenever a transaction is pushed.
        let mut limit_hit_reason: Option<EndProposingReason> = None;

        // Wait at most an eighth of the remaining time for the pool's ready transactions.
        let delay = deadline.saturating_duration_since((self.now)()) / 8;
        let mut pending_iterator =
            self.transaction_pool.ready_at_with_timeout(self.parent_hash, delay).await;

        let block_size_limit = block_size_limit.unwrap_or(self.default_block_size_limit);

        debug!(target: LOG_TARGET, "Attempting to push transactions from the pool at {:?}.", self.parent_hash);
        let mut transaction_pushed = false;

        let end_reason = loop {
            let pending_tx = if let Some(pending_tx) = pending_iterator.next() {
                pending_tx
            } else {
                debug!(
                    target: LOG_TARGET,
                    "No more transactions, proceeding with proposing."
                );

                break limit_hit_reason.unwrap_or(EndProposingReason::NoMoreTransactions);
            };

            // The hard deadline is checked once per transaction, before any work on it.
            let now = (self.now)();
            if now > deadline {
                debug!(
                    target: LOG_TARGET,
                    "Consensus deadline reached when pushing block transactions, \
                    proceeding with proposing."
                );
                break limit_hit_reason.unwrap_or(EndProposingReason::HitDeadline);
            }

            let pending_tx_data = (**pending_tx.data()).clone();
            let pending_tx_hash = pending_tx.hash().clone();

            // Size check: would the encoded transaction push the estimated block size over
            // the limit?
            let block_size =
                block_builder.estimate_block_size(self.include_proof_in_block_size_estimation);
            if block_size + pending_tx_data.encoded_size() > block_size_limit {
                // NOTE(review): presumably tells the iterator to also skip transactions
                // depending on this one — confirm against the transaction pool API.
                pending_iterator.report_invalid(&pending_tx);
                limit_hit_reason = Some(EndProposingReason::HitBlockSizeLimit);
                if skipped < MAX_SKIPPED_TRANSACTIONS {
                    skipped += 1;
                    debug!(
                        target: LOG_TARGET,
                        "Transaction would overflow the block size limit, \
                        but will try {} more transactions before quitting.",
                        MAX_SKIPPED_TRANSACTIONS - skipped,
                    );
                    continue;
                } else if now < soft_deadline {
                    debug!(
                        target: LOG_TARGET,
                        "Transaction would overflow the block size limit, \
                        but we still have time before the soft deadline, so \
                        we will try a bit more."
                    );
                    continue;
                } else {
                    debug!(
                        target: LOG_TARGET,
                        "Reached block size limit, proceeding with proposing."
                    );
                    break EndProposingReason::HitBlockSizeLimit;
                }
            }

            trace!(target: LOG_TARGET, "[{:?}] Pushing to the block.", pending_tx_hash);
            match pezsc_block_builder::BlockBuilder::push(block_builder, pending_tx_data) {
                Ok(()) => {
                    transaction_pushed = true;
                    limit_hit_reason = None;
                    trace!(target: LOG_TARGET, "[{:?}] Pushed to the block.", pending_tx_hash);
                },
                // Weight limit: same skip policy as the size limit above, except that the
                // clock is read again for the soft deadline comparison.
                Err(ApplyExtrinsicFailed(Validity(e))) if e.exhausted_resources() => {
                    pending_iterator.report_invalid(&pending_tx);
                    limit_hit_reason = Some(EndProposingReason::HitBlockWeightLimit);
                    if skipped < MAX_SKIPPED_TRANSACTIONS {
                        skipped += 1;
                        debug!(target: LOG_TARGET,
                            "Block seems full, but will try {} more transactions before quitting.",
                            MAX_SKIPPED_TRANSACTIONS - skipped,
                        );
                    } else if (self.now)() < soft_deadline {
                        debug!(target: LOG_TARGET,
                            "Block seems full, but we still have time before the soft deadline, \
                            so we will try a bit more before quitting."
                        );
                    } else {
                        debug!(
                            target: LOG_TARGET,
                            "Reached block weight limit, proceeding with proposing."
                        );
                        break EndProposingReason::HitBlockWeightLimit;
                    }
                },
                // Any other failure: remember the transaction so it can be reported to the
                // pool after the loop, together with the validity error if there is one.
                Err(e) => {
                    pending_iterator.report_invalid(&pending_tx);
                    debug!(
                        target: LOG_TARGET,
                        "[{:?}] Invalid transaction: {} at: {}", pending_tx_hash, e, self.parent_hash
                    );

                    let error_to_report = match e {
                        ApplyExtrinsicFailed(Validity(e)) => Some(e),
                        _ => None,
                    };

                    unqueue_invalid.insert(pending_tx_hash, error_to_report);
                },
            }
        };

        if matches!(end_reason, EndProposingReason::HitBlockSizeLimit) && !transaction_pushed {
            warn!(
                target: LOG_TARGET,
                "Hit block size limit of `{}` without including any transaction!", block_size_limit,
            );
        }

        self.transaction_pool
            .report_invalid(Some(self.parent_hash), unqueue_invalid)
            .await;
        Ok(end_reason)
    }
608
609 fn print_summary(
616 &self,
617 block: &Block,
618 end_reason: EndProposingReason,
619 block_took: time::Duration,
620 propose_took: time::Duration,
621 ) {
622 let extrinsics = block.extrinsics();
623 self.metrics.report(|metrics| {
624 metrics.number_of_transactions.set(extrinsics.len() as u64);
625 metrics.block_constructed.observe(block_took.as_secs_f64());
626 metrics.report_end_proposing_reason(end_reason);
627 metrics.create_block_proposal_time.observe(propose_took.as_secs_f64());
628 });
629
630 let extrinsics_summary = if extrinsics.is_empty() {
631 "no extrinsics".to_string()
632 } else {
633 format!(
634 "extrinsics ({}): [{}]",
635 extrinsics.len(),
636 extrinsics
637 .iter()
638 .map(|xt| BlakeTwo256::hash_of(xt).to_string())
639 .collect::<Vec<_>>()
640 .join(", ")
641 )
642 };
643
644 if log::log_enabled!(log::Level::Info) {
645 info!(
646 "🎁 Prepared block for proposing at {} ({} ms) hash: {:?}; parent_hash: {}; end: {:?}; extrinsics_count: {}",
647 block.header().number(),
648 block_took.as_millis(),
649 <Block as BlockT>::Hash::from(block.header().hash()),
650 block.header().parent_hash(),
651 end_reason,
652 extrinsics.len()
653 )
654 } else if log::log_enabled!(log::Level::Trace) {
655 trace!(
656 "🎁 Prepared block for proposing at {} ({} ms) hash: {:?}; parent_hash: {}; end: {:?}; {extrinsics_summary}",
657 block.header().number(),
658 block_took.as_millis(),
659 <Block as BlockT>::Hash::from(block.header().hash()),
660 block.header().parent_hash(),
661 end_reason
662 );
663 }
664
665 telemetry!(
666 self.telemetry;
667 CONSENSUS_INFO;
668 "prepared_block_for_proposing";
669 "number" => ?block.header().number(),
670 "hash" => ?<Block as BlockT>::Hash::from(block.header().hash()),
671 );
672 }
673}
674
675#[cfg(test)]
676mod tests {
677 use super::*;
678 use bizinikiwi_test_runtime_client::{
679 prelude::*,
680 runtime::{Block as TestBlock, Extrinsic, ExtrinsicBuilder, Transfer},
681 TestClientBuilder, TestClientBuilderExt,
682 };
683 use futures::executor::block_on;
684 use parking_lot::Mutex;
685 use pezsc_client_api::{Backend, TrieCacheContext};
686 use pezsc_transaction_pool::BasicPool;
687 use pezsc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool, TransactionSource};
688 use pezsp_api::Core;
689 use pezsp_blockchain::HeaderBackend;
690 use pezsp_consensus::{BlockOrigin, Environment};
691 use pezsp_runtime::{generic::BlockId, traits::NumberFor, Perbill};
692
    /// Transaction source used for every submission in these tests.
    const SOURCE: TransactionSource = TransactionSource::External;

    // `Perbill` parts handed to `ExtrinsicBuilder::new_fill_block` to build extrinsics of
    // three different sizes.
    // NOTE(review): the tests rely on how many `HUGE` and `MEDIUM` extrinsics fit into one
    // block together; the exact reasoning behind the values is not visible here — confirm
    // against the test runtime's block limits before changing them.
    const HUGE: u32 = 649000000;
    const MEDIUM: u32 = 250000000;
    const TINY: u32 = 1000;
706
707 fn extrinsic(nonce: u64) -> Extrinsic {
708 ExtrinsicBuilder::new_fill_block(Perbill::from_parts(TINY)).nonce(nonce).build()
709 }
710
711 fn chain_event<B: BlockT>(header: B::Header) -> ChainEvent<B>
712 where
713 NumberFor<B>: From<u64>,
714 {
715 ChainEvent::NewBestBlock { hash: header.hash(), tree_route: None }
716 }
717
    /// Proposing must stop once the mocked clock passes the deadline, leaving the remaining
    /// transaction in the pool.
    #[test]
    fn should_cease_building_block_when_deadline_is_reached() {
        // given
        let client = Arc::new(bizinikiwi_test_runtime_client::new());
        let spawner = pezsp_core::testing::TaskExecutor::new();
        let txpool = Arc::from(BasicPool::new_full(
            Default::default(),
            true.into(),
            None,
            spawner.clone(),
            client.clone(),
        ));

        let hashof0 = client.info().genesis_hash;
        block_on(txpool.submit_at(hashof0, SOURCE, vec![extrinsic(0), extrinsic(1)])).unwrap();

        block_on(
            txpool.maintain(chain_event(
                client.expect_header(hashof0).expect("there should be header"),
            )),
        );

        let mut proposer_factory =
            ProposerFactory::new(spawner.clone(), client.clone(), txpool.clone(), None, None);

        // Mocked clock: the first read returns the start time without advancing; every later
        // read returns the current value and then advances it by one second.
        let cell = Mutex::new((false, time::Instant::now()));
        let proposer = proposer_factory.init_with_now(
            &client.expect_header(hashof0).unwrap(),
            Box::new(move || {
                let mut value = cell.lock();
                if !value.0 {
                    value.0 = true;
                    return value.1;
                }
                let old = value.1;
                let new = old + time::Duration::from_secs(1);
                *value = (true, new);
                old
            }),
        );

        // when
        let deadline = time::Duration::from_secs(3);
        let block = block_on(
            proposer.propose_block(ProposeArgs { max_duration: deadline, ..Default::default() }),
        )
        .map(|r| r.block)
        .unwrap();

        // then
        // Only one of the two transactions made it in before the deadline, and proposing
        // alone does not remove anything from the pool.
        assert_eq!(block.extrinsics().len(), 1);
        assert_eq!(txpool.ready().count(), 2);
    }
772
    /// Proposing must still succeed when the mocked clock jumps far past the deadline.
    #[test]
    fn should_not_panic_when_deadline_is_reached() {
        let client = Arc::new(bizinikiwi_test_runtime_client::new());
        let spawner = pezsp_core::testing::TaskExecutor::new();
        let txpool = Arc::from(BasicPool::new_full(
            Default::default(),
            true.into(),
            None,
            spawner.clone(),
            client.clone(),
        ));

        let mut proposer_factory =
            ProposerFactory::new(spawner.clone(), client.clone(), txpool.clone(), None, None);

        // Mocked clock: the first read returns the start time; every later read advances the
        // time by 160 seconds and returns the advanced value, well beyond the one second
        // allowed below.
        let cell = Mutex::new((false, time::Instant::now()));
        let proposer = proposer_factory.init_with_now(
            &client.expect_header(client.info().genesis_hash).unwrap(),
            Box::new(move || {
                let mut value = cell.lock();
                if !value.0 {
                    value.0 = true;
                    return value.1;
                }
                let new = value.1 + time::Duration::from_secs(160);
                *value = (true, new);
                new
            }),
        );

        let deadline = time::Duration::from_secs(1);
        // The `unwrap` is the assertion: a proposal must be returned.
        block_on(
            proposer.propose_block(ProposeArgs { max_duration: deadline, ..Default::default() }),
        )
        .map(|r| r.block)
        .unwrap();
    }
810
    /// The storage changes returned with a proposal must have the same transaction storage
    /// root as the changes produced by executing the proposed block.
    #[test]
    fn proposed_storage_changes_should_match_execute_block_storage_changes() {
        let (client, backend) = TestClientBuilder::new().build_with_backend();
        let client = Arc::new(client);
        let spawner = pezsp_core::testing::TaskExecutor::new();
        let txpool = Arc::from(BasicPool::new_full(
            Default::default(),
            true.into(),
            None,
            spawner.clone(),
            client.clone(),
        ));

        let genesis_hash = client.info().best_hash;

        block_on(txpool.submit_at(genesis_hash, SOURCE, vec![extrinsic(0)])).unwrap();

        block_on(
            txpool.maintain(chain_event(
                client
                    .expect_header(client.info().genesis_hash)
                    .expect("there should be header"),
            )),
        );

        let mut proposer_factory =
            ProposerFactory::new(spawner.clone(), client.clone(), txpool.clone(), None, None);

        let proposer = proposer_factory.init_with_now(
            &client.header(genesis_hash).unwrap().unwrap(),
            Box::new(move || time::Instant::now()),
        );

        let deadline = time::Duration::from_secs(9);
        let proposal = block_on(
            proposer.propose_block(ProposeArgs { max_duration: deadline, ..Default::default() }),
        )
        .unwrap();

        assert_eq!(proposal.block.extrinsics().len(), 1);

        // Execute the proposed block through the runtime API on top of genesis and collect
        // the resulting storage changes for comparison.
        let api = client.runtime_api();
        api.execute_block(genesis_hash, proposal.block.into()).unwrap();

        let state = backend.state_at(genesis_hash, TrieCacheContext::Untrusted).unwrap();

        let storage_changes = api.into_storage_changes(&state, genesis_hash).unwrap();

        assert_eq!(
            proposal.storage_changes.transaction_storage_root,
            storage_changes.transaction_storage_root,
        );
    }
864
    /// Proposes and imports five consecutive blocks from a pool holding seven transactions
    /// with nonces 0 to 6 and checks, block by block, how many transactions end up in the
    /// block and how many remain ready in the pool.
    #[test]
    fn should_not_remove_invalid_transactions_from_the_same_sender_after_one_was_invalid() {
        // given
        let client = Arc::new(bizinikiwi_test_runtime_client::new());
        let spawner = pezsp_core::testing::TaskExecutor::new();
        let txpool = Arc::from(BasicPool::new_full(
            Default::default(),
            true.into(),
            None,
            spawner.clone(),
            client.clone(),
        ));

        let medium = |nonce| {
            ExtrinsicBuilder::new_fill_block(Perbill::from_parts(MEDIUM))
                .nonce(nonce)
                .build()
        };
        let huge = |nonce| {
            ExtrinsicBuilder::new_fill_block(Perbill::from_parts(HUGE)).nonce(nonce).build()
        };

        block_on(txpool.submit_at(
            client.info().genesis_hash,
            SOURCE,
            vec![medium(0), medium(1), huge(2), medium(3), huge(4), medium(5), medium(6)],
        ))
        .unwrap();

        let mut proposer_factory =
            ProposerFactory::new(spawner.clone(), client.clone(), txpool.clone(), None, None);
        // Proposes a block on top of the block with number `parent_number` and checks the
        // number of extrinsics in the block and of ready transactions left in the pool.
        let mut propose_block = |client: &TestClient,
                                 parent_number,
                                 expected_block_extrinsics,
                                 expected_pool_transactions| {
            let hash = client.expect_block_hash_from_id(&BlockId::Number(parent_number)).unwrap();
            let proposer = proposer_factory.init_with_now(
                &client.expect_header(hash).unwrap(),
                Box::new(move || time::Instant::now()),
            );

            // when
            let deadline = time::Duration::from_secs(900);
            let block = block_on(
                proposer
                    .propose_block(ProposeArgs { max_duration: deadline, ..Default::default() }),
            )
            .map(|r| r.block)
            .unwrap();

            // then
            // Proposing alone must not change the number of ready transactions.
            assert_eq!(
                txpool.ready().count(),
                expected_pool_transactions,
                "at block: {}",
                block.header.number
            );
            assert_eq!(
                block.extrinsics().len(),
                expected_block_extrinsics,
                "at block: {}",
                block.header.number
            );

            block
        };

        // Imports the block and lets the pool process the resulting best block event.
        let import_and_maintain = |client: Arc<TestClient>, block: TestBlock| {
            let hash = block.hash();
            block_on(client.import(BlockOrigin::Own, block)).unwrap();
            block_on(txpool.maintain(chain_event(
                client.expect_header(hash).expect("there should be header"),
            )));
        };

        block_on(
            txpool.maintain(chain_event(
                client
                    .expect_header(client.info().genesis_hash)
                    .expect("there should be header"),
            )),
        );
        assert_eq!(txpool.ready().count(), 7);

        // Block 1: two extrinsics included.
        let block = propose_block(&client, 0, 2, 7);
        import_and_maintain(client.clone(), block.clone());
        assert_eq!(txpool.ready().count(), 5);

        // Block 2: one extrinsic included.
        let block = propose_block(&client, 1, 1, 5);
        import_and_maintain(client.clone(), block.clone());
        assert_eq!(txpool.ready().count(), 4);

        // Block 3: one extrinsic included.
        let block = propose_block(&client, 2, 1, 4);
        import_and_maintain(client.clone(), block.clone());
        assert_eq!(txpool.ready().count(), 3);

        // Block 4: one extrinsic included.
        let block = propose_block(&client, 3, 1, 3);
        import_and_maintain(client.clone(), block.clone());
        assert_eq!(txpool.ready().count(), 2);

        // Block 5: the last two extrinsics included, leaving the pool empty.
        let block = propose_block(&client, 4, 2, 2);
        import_and_maintain(client.clone(), block.clone());
        assert_eq!(txpool.ready().count(), 0);
    }
978
    /// Checks the block size limit in three setups: an explicit limit without proof
    /// recording, no explicit limit, and an explicit limit with proof recording enabled.
    #[test]
    fn should_cease_building_block_when_block_limit_is_reached() {
        let client = Arc::new(bizinikiwi_test_runtime_client::new());
        let spawner = pezsp_core::testing::TaskExecutor::new();
        let txpool = Arc::from(BasicPool::new_full(
            Default::default(),
            true.into(),
            None,
            spawner.clone(),
            client.clone(),
        ));
        let genesis_hash = client.info().genesis_hash;
        let genesis_header = client.expect_header(genesis_hash).expect("there should be header");

        // One transfer followed by four tiny fill-block extrinsics.
        let extrinsics_num = 5;
        let extrinsics = std::iter::once(
            Transfer {
                from: Sr25519Keyring::Alice.into(),
                to: Sr25519Keyring::Bob.into(),
                amount: 100,
                nonce: 0,
            }
            .into_unchecked_extrinsic(),
        )
        .chain((1..extrinsics_num as u64).map(extrinsic))
        .collect::<Vec<_>>();

        // Limit sized from the encoded header, the first `extrinsics_num - 1` extrinsics and
        // an encoded empty extrinsic vector, so that the last extrinsic should not fit.
        let block_limit = genesis_header.encoded_size()
            + extrinsics
                .iter()
                .take(extrinsics_num - 1)
                .map(Encode::encoded_size)
                .sum::<usize>()
            + Vec::<Extrinsic>::new().encoded_size();

        block_on(txpool.submit_at(genesis_hash, SOURCE, extrinsics.clone())).unwrap();

        block_on(txpool.maintain(chain_event(genesis_header.clone())));

        let mut proposer_factory =
            ProposerFactory::new(spawner.clone(), client.clone(), txpool.clone(), None, None);

        let proposer = block_on(proposer_factory.init(&genesis_header)).unwrap();

        // Give it enough time
        let deadline = time::Duration::from_secs(300);
        let block = block_on(proposer.propose_block(ProposeArgs {
            max_duration: deadline,
            block_size_limit: Some(block_limit),
            ..Default::default()
        }))
        .map(|r| r.block)
        .unwrap();

        // Based on the block limit, one transaction shouldn't be included.
        assert_eq!(block.extrinsics().len(), extrinsics_num - 1);

        let proposer = block_on(proposer_factory.init(&genesis_header)).unwrap();

        let block = block_on(
            proposer.propose_block(ProposeArgs { max_duration: deadline, ..Default::default() }),
        )
        .map(|r| r.block)
        .unwrap();

        // Without an explicit block limit all of them are included.
        assert_eq!(block.extrinsics().len(), extrinsics_num);

        // This factory includes the proof in the block size estimation.
        let mut proposer_factory = ProposerFactory::with_proof_recording(
            spawner.clone(),
            client.clone(),
            txpool.clone(),
            None,
            None,
        );

        let proposer = block_on(proposer_factory.init(&genesis_header)).unwrap();

        // Limit sized from the estimate of an empty block with proof recording plus the
        // encoded size of the first extrinsic.
        let block_limit = {
            let builder = BlockBuilderBuilder::new(&*client)
                .on_parent_block(genesis_header.hash())
                .with_parent_block_number(0)
                .enable_proof_recording()
                .build()
                .unwrap();
            builder.estimate_block_size(true) + extrinsics[0].encoded_size()
        };
        let block = block_on(proposer.propose_block(ProposeArgs {
            max_duration: deadline,
            block_size_limit: Some(block_limit),
            ..Default::default()
        }))
        .map(|r| r.block)
        .unwrap();

        // NOTE(review): presumably only the transfer fits because the proof grows while
        // extrinsics are applied — confirm against the block builder's size estimation.
        assert_eq!(block.extrinsics().len(), 1);
    }
1081
    /// While the soft deadline has not been reached, proposing must keep trying further
    /// transactions even after more than `MAX_SKIPPED_TRANSACTIONS` were skipped.
    #[test]
    fn should_keep_adding_transactions_after_exhausts_resources_before_soft_deadline() {
        // given
        let client = Arc::new(bizinikiwi_test_runtime_client::new());
        let spawner = pezsp_core::testing::TaskExecutor::new();
        let txpool = Arc::from(BasicPool::new_full(
            Default::default(),
            true.into(),
            None,
            spawner.clone(),
            client.clone(),
        ));
        let genesis_hash = client.info().genesis_hash;

        let tiny = |nonce| {
            ExtrinsicBuilder::new_fill_block(Perbill::from_parts(TINY)).nonce(nonce).build()
        };
        let huge = |who| {
            ExtrinsicBuilder::new_fill_block(Perbill::from_parts(HUGE))
                .signer(Sr25519Keyring::numeric(who))
                .build()
        };

        block_on(
            txpool.submit_at(
                genesis_hash,
                SOURCE,
                // Twice as many huge extrinsics as may be skipped, each from its own signer,
                (0..MAX_SKIPPED_TRANSACTIONS * 2)
                    .into_iter()
                    .map(huge)
                    // followed by tiny extrinsics with increasing nonces.
                    .chain((0..MAX_SKIPPED_TRANSACTIONS as u64).into_iter().map(tiny))
                    .collect(),
            ),
        )
        .unwrap();

        block_on(txpool.maintain(chain_event(
            client.expect_header(genesis_hash).expect("there should be header"),
        )));
        assert_eq!(txpool.ready().count(), MAX_SKIPPED_TRANSACTIONS * 3);

        let mut proposer_factory =
            ProposerFactory::new(spawner.clone(), client.clone(), txpool.clone(), None, None);

        // Mocked clock: every read returns the current value and then advances it by one
        // second.
        let cell = Mutex::new(time::Instant::now());
        let proposer = proposer_factory.init_with_now(
            &client.expect_header(genesis_hash).unwrap(),
            Box::new(move || {
                let mut value = cell.lock();
                let old = *value;
                *value = old + time::Duration::from_secs(1);
                old
            }),
        );

        // when
        // The deadline is large compared to the seconds the mocked clock advances.
        let deadline = time::Duration::from_secs(900);
        let block = block_on(
            proposer.propose_block(ProposeArgs { max_duration: deadline, ..Default::default() }),
        )
        .map(|r| r.block)
        .unwrap();

        // then
        // Expected: one huge extrinsic plus all the tiny ones.
        assert_eq!(block.extrinsics().len(), MAX_SKIPPED_TRANSACTIONS + 1);
    }
1151
    /// Once the soft deadline has passed, proposing must stop after at most
    /// `MAX_SKIPPED_TRANSACTIONS` skipped transactions instead of trying the whole pool.
    #[test]
    fn should_only_skip_up_to_some_limit_after_soft_deadline() {
        // given
        let client = Arc::new(bizinikiwi_test_runtime_client::new());
        let spawner = pezsp_core::testing::TaskExecutor::new();
        let txpool = Arc::from(BasicPool::new_full(
            Default::default(),
            true.into(),
            None,
            spawner.clone(),
            client.clone(),
        ));
        let genesis_hash = client.info().genesis_hash;

        let tiny = |who| {
            ExtrinsicBuilder::new_fill_block(Perbill::from_parts(TINY))
                .signer(Sr25519Keyring::numeric(who))
                .nonce(1)
                .build()
        };
        let huge = |who| {
            ExtrinsicBuilder::new_fill_block(Perbill::from_parts(HUGE))
                .signer(Sr25519Keyring::numeric(who))
                .build()
        };

        block_on(
            txpool.submit_at(
                genesis_hash,
                SOURCE,
                (0..MAX_SKIPPED_TRANSACTIONS + 2)
                    .into_iter()
                    .map(huge)
                    // and some tiny ones from the same signers, using nonce 1.
                    .chain((0..MAX_SKIPPED_TRANSACTIONS + 2).into_iter().map(tiny))
                    .collect(),
            ),
        )
        .unwrap();

        block_on(txpool.maintain(chain_event(
            client.expect_header(genesis_hash).expect("there should be header"),
        )));
        assert_eq!(txpool.ready().count(), MAX_SKIPPED_TRANSACTIONS * 2 + 4);

        let mut proposer_factory =
            ProposerFactory::new(spawner.clone(), client.clone(), txpool.clone(), None, None);

        let deadline = time::Duration::from_secs(600);
        // Mocked clock that also counts its reads. `cell2` keeps a handle for the assertion
        // at the end, since `cell` moves into the closure.
        let cell = Arc::new(Mutex::new((0, time::Instant::now())));
        let cell2 = cell.clone();
        let proposer = proposer_factory.init_with_now(
            &client.expect_header(genesis_hash).unwrap(),
            Box::new(move || {
                let mut value = cell.lock();
                let (called, old) = *value;
                // The second read (index 1) jumps the time forward by half the deadline;
                // every other read leaves the time unchanged.
                let increase = if called == 1 {
                    deadline / 2
                } else {
                    time::Duration::from_millis(0)
                };
                *value = (called + 1, old + increase);
                old
            }),
        );

        // when
        let block = block_on(
            proposer.propose_block(ProposeArgs { max_duration: deadline, ..Default::default() }),
        )
        .map(|r| r.block)
        .unwrap();

        // then
        assert!(
            (1..3).contains(&block.extrinsics().len()),
            "Block shall contain one or two extrinsics."
        );
        assert!(
            cell2.lock().0 > MAX_SKIPPED_TRANSACTIONS,
            "Not enough calls to current time, which indicates the test might have ended because of deadline, not soft deadline"
        );
    }
1239}