1use super::{metrics::MetricsLink as PrometheusMetrics, revalidation};
22pub use crate::{
23 api::FullChainApi,
24 graph::{ChainApi, ValidatedTransaction},
25};
26use crate::{
27 common::{
28 enactment_state::{EnactmentAction, EnactmentState},
29 error,
30 tracing_log_xt::log_xt_trace,
31 },
32 graph::{
33 self, base_pool::TimedTransactionSource, EventHandler, ExtrinsicHash, IsValidator,
34 RawExtrinsicFor,
35 },
36 ReadyIteratorFor, ValidateTransactionPriority, LOG_TARGET,
37};
38use async_trait::async_trait;
39use futures::{channel::oneshot, future, prelude::*, Future, FutureExt};
40use parking_lot::Mutex;
41use prometheus_endpoint::Registry as PrometheusRegistry;
42use sc_transaction_pool_api::{
43 error::Error as TxPoolError, ChainEvent, ImportNotificationStream, MaintainedTransactionPool,
44 PoolStatus, TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor,
45 TxHash, TxInvalidityReportMap,
46};
47use sp_blockchain::{HashAndNumber, TreeRoute};
48use sp_core::traits::SpawnEssentialNamed;
49use sp_runtime::{
50 generic::BlockId,
51 traits::{
52 AtLeast32Bit, Block as BlockT, Header as HeaderT, NumberFor, SaturatedConversion, Zero,
53 },
54 transaction_validity::{TransactionTag as Tag, TransactionValidityError},
55};
56use std::{
57 collections::{HashMap, HashSet},
58 pin::Pin,
59 sync::Arc,
60 time::Instant,
61};
62use tokio::select;
63use tracing::{trace, warn};
64
65pub struct BasicPool<PoolApi, Block>
67where
68 Block: BlockT,
69 PoolApi: graph::ChainApi<Block = Block>,
70{
71 pool: Arc<graph::Pool<PoolApi, ()>>,
72 api: Arc<PoolApi>,
73 revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
74 revalidation_queue: Arc<revalidation::RevalidationQueue<PoolApi>>,
75 ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<PoolApi>, Block>>>,
76 metrics: PrometheusMetrics,
77 enactment_state: Arc<Mutex<EnactmentState<Block>>>,
78}
79
80struct ReadyPoll<T, Block: BlockT> {
81 updated_at: NumberFor<Block>,
82 pollers: Vec<(NumberFor<Block>, oneshot::Sender<T>)>,
83}
84
85impl<T, Block: BlockT> Default for ReadyPoll<T, Block> {
86 fn default() -> Self {
87 Self { updated_at: NumberFor::<Block>::zero(), pollers: Default::default() }
88 }
89}
90
91impl<T, Block: BlockT> ReadyPoll<T, Block> {
92 fn new(best_block_number: NumberFor<Block>) -> Self {
93 Self { updated_at: best_block_number, pollers: Default::default() }
94 }
95
96 fn trigger(&mut self, number: NumberFor<Block>, iterator_factory: impl Fn() -> T) {
97 self.updated_at = number;
98
99 let mut idx = 0;
100 while idx < self.pollers.len() {
101 if self.pollers[idx].0 <= number {
102 let poller_sender = self.pollers.swap_remove(idx);
103 trace!(
104 target: LOG_TARGET,
105 ?number,
106 "Sending ready signal."
107 );
108 let _ = poller_sender.1.send(iterator_factory());
109 } else {
110 idx += 1;
111 }
112 }
113 }
114
115 fn add(&mut self, number: NumberFor<Block>) -> oneshot::Receiver<T> {
116 let (sender, receiver) = oneshot::channel();
117 self.pollers.push((number, sender));
118 receiver
119 }
120
121 fn updated_at(&self) -> NumberFor<Block> {
122 self.updated_at
123 }
124}
125
/// Revalidation mode selected when constructing a [`BasicPool`].
pub enum RevalidationType {
    /// Scheduled revalidation.
    ///
    /// The pool is built with a revalidation queue that has no background task, and
    /// revalidation is requested only once the configured time or block period has elapsed.
    /// Transactions from retracted blocks are not resubmitted in this mode.
    Light,

    /// Revalidation on every handled enactment.
    ///
    /// The pool is built with a revalidation queue backed by a spawned background task;
    /// transactions from retracted blocks are resubmitted and the ready set is queued for
    /// revalidation each time.
    Full,
}
142
143impl<PoolApi, Block> BasicPool<PoolApi, Block>
144where
145 Block: BlockT,
146 PoolApi: graph::ChainApi<Block = Block> + 'static,
147{
148 pub fn new_test(
150 pool_api: Arc<PoolApi>,
151 best_block_hash: Block::Hash,
152 finalized_hash: Block::Hash,
153 options: graph::Options,
154 ) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
155 let pool = Arc::new(graph::Pool::new_with_staticly_sized_rotator(
156 options,
157 true.into(),
158 pool_api.clone(),
159 ));
160 let (revalidation_queue, background_task) = revalidation::RevalidationQueue::new_background(
161 pool_api.clone(),
162 pool.clone(),
163 finalized_hash,
164 );
165 (
166 Self {
167 api: pool_api,
168 pool,
169 revalidation_queue: Arc::new(revalidation_queue),
170 revalidation_strategy: Arc::new(Mutex::new(RevalidationStrategy::Always)),
171 ready_poll: Default::default(),
172 metrics: Default::default(),
173 enactment_state: Arc::new(Mutex::new(EnactmentState::new(
174 best_block_hash,
175 finalized_hash,
176 ))),
177 },
178 background_task,
179 )
180 }
181
182 pub fn with_revalidation_type(
185 options: graph::Options,
186 is_validator: IsValidator,
187 pool_api: Arc<PoolApi>,
188 prometheus: Option<&PrometheusRegistry>,
189 revalidation_type: RevalidationType,
190 spawner: impl SpawnEssentialNamed,
191 best_block_number: NumberFor<Block>,
192 best_block_hash: Block::Hash,
193 finalized_hash: Block::Hash,
194 ) -> Self {
195 let pool = Arc::new(graph::Pool::new_with_staticly_sized_rotator(
196 options,
197 is_validator,
198 pool_api.clone(),
199 ));
200 let (revalidation_queue, background_task) = match revalidation_type {
201 RevalidationType::Light => {
202 (revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), None)
203 },
204 RevalidationType::Full => {
205 let (queue, background) = revalidation::RevalidationQueue::new_background(
206 pool_api.clone(),
207 pool.clone(),
208 finalized_hash,
209 );
210 (queue, Some(background))
211 },
212 };
213
214 if let Some(background_task) = background_task {
215 spawner.spawn_essential("txpool-background", Some("transaction-pool"), background_task);
216 }
217
218 Self {
219 api: pool_api,
220 pool,
221 revalidation_queue: Arc::new(revalidation_queue),
222 revalidation_strategy: Arc::new(Mutex::new(match revalidation_type {
223 RevalidationType::Light => {
224 RevalidationStrategy::Light(RevalidationStatus::NotScheduled)
225 },
226 RevalidationType::Full => RevalidationStrategy::Always,
227 })),
228 ready_poll: Arc::new(Mutex::new(ReadyPoll::new(best_block_number))),
229 metrics: PrometheusMetrics::new(prometheus),
230 enactment_state: Arc::new(Mutex::new(EnactmentState::new(
231 best_block_hash,
232 finalized_hash,
233 ))),
234 }
235 }
236
237 pub fn pool(&self) -> &Arc<graph::Pool<PoolApi, ()>> {
239 &self.pool
240 }
241
242 pub fn api(&self) -> &PoolApi {
244 &self.api
245 }
246
247 async fn ready_at_with_timeout_internal(
248 &self,
249 at: Block::Hash,
250 timeout: std::time::Duration,
251 ) -> ReadyIteratorFor<PoolApi> {
252 select! {
253 ready = self.ready_at(at)=> ready,
254 _ = futures_timer::Delay::new(timeout)=> self.ready()
255 }
256 }
257}
258
259#[async_trait]
260impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
261where
262 Block: BlockT,
263 PoolApi: 'static + graph::ChainApi<Block = Block>,
264{
265 type Block = PoolApi::Block;
266 type Hash = graph::ExtrinsicHash<PoolApi>;
267 type InPoolTransaction =
268 graph::base_pool::Transaction<graph::ExtrinsicHash<PoolApi>, graph::ExtrinsicFor<PoolApi>>;
269 type Error = PoolApi::Error;
270
271 async fn submit_at(
272 &self,
273 at: <Self::Block as BlockT>::Hash,
274 source: TransactionSource,
275 xts: Vec<TransactionFor<Self>>,
276 ) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
277 let pool = self.pool.clone();
278 let xts = xts
279 .into_iter()
280 .map(|xt| {
281 (TimedTransactionSource::from_transaction_source(source, false), Arc::from(xt))
282 })
283 .collect::<Vec<_>>();
284
285 self.metrics
286 .report(|metrics| metrics.submitted_transactions.inc_by(xts.len() as u64));
287
288 let number = self.api.resolve_block_number(at);
289 let at = HashAndNumber { hash: at, number: number? };
290 Ok(pool
291 .submit_at(&at, xts, ValidateTransactionPriority::Submitted)
292 .await
293 .into_iter()
294 .map(|result| result.map(|outcome| outcome.hash()))
295 .collect())
296 }
297
298 async fn submit_one(
299 &self,
300 at: <Self::Block as BlockT>::Hash,
301 source: TransactionSource,
302 xt: TransactionFor<Self>,
303 ) -> Result<TxHash<Self>, Self::Error> {
304 let pool = self.pool.clone();
305 let xt = Arc::from(xt);
306
307 self.metrics.report(|metrics| metrics.submitted_transactions.inc());
308
309 let number = self.api.resolve_block_number(at);
310 let at = HashAndNumber { hash: at, number: number? };
311 pool.submit_one(&at, TimedTransactionSource::from_transaction_source(source, false), xt)
312 .await
313 .map(|outcome| outcome.hash())
314 }
315
316 async fn submit_and_watch(
317 &self,
318 at: <Self::Block as BlockT>::Hash,
319 source: TransactionSource,
320 xt: TransactionFor<Self>,
321 ) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
322 let pool = self.pool.clone();
323 let xt = Arc::from(xt);
324
325 self.metrics.report(|metrics| metrics.submitted_transactions.inc());
326
327 let number = self.api.resolve_block_number(at);
328
329 let at = HashAndNumber { hash: at, number: number? };
330 pool.submit_and_watch(
331 &at,
332 TimedTransactionSource::from_transaction_source(source, false),
333 xt,
334 )
335 .await
336 .map(|mut outcome| outcome.expect_watcher().into_stream().boxed())
337 }
338
339 async fn report_invalid(
340 &self,
341 _at: Option<<Self::Block as BlockT>::Hash>,
342 invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
343 ) -> Vec<Arc<Self::InPoolTransaction>> {
344 let hashes = invalid_tx_errors.keys().map(|h| *h).collect::<Vec<_>>();
345 let removed = self.pool.validated_pool().remove_invalid(&hashes);
346 self.metrics
347 .report(|metrics| metrics.validations_invalid.inc_by(removed.len() as u64));
348 removed
349 }
350
351 fn status(&self) -> PoolStatus {
352 self.pool.validated_pool().status()
353 }
354
355 fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>> {
356 self.pool.validated_pool().import_notification_stream()
357 }
358
359 fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
360 self.pool.hash_of(xt)
361 }
362
363 fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
364 self.pool.validated_pool().on_broadcasted(propagations)
365 }
366
367 fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
368 self.pool.validated_pool().ready_by_hash(hash)
369 }
370
371 async fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> ReadyIteratorFor<PoolApi> {
372 let Ok(at) = self.api.resolve_block_number(at) else {
373 return Box::new(std::iter::empty()) as Box<_>;
374 };
375
376 let status = self.status();
377 if status.ready == 0 && status.future == 0 {
382 return Box::new(std::iter::empty()) as Box<_>;
383 }
384
385 if self.ready_poll.lock().updated_at() >= at {
386 trace!(
387 target: LOG_TARGET,
388 ?at,
389 "Transaction pool already processed block."
390 );
391 let iterator: ReadyIteratorFor<PoolApi> = Box::new(self.pool.validated_pool().ready());
392 return iterator;
393 }
394
395 let result = self.ready_poll.lock().add(at).map(|received| {
396 received.unwrap_or_else(|error| {
397 warn!(target: LOG_TARGET, ?error, "Error receiving pending set.");
398 Box::new(std::iter::empty())
399 })
400 });
401
402 result.await
403 }
404
405 fn ready(&self) -> ReadyIteratorFor<PoolApi> {
406 Box::new(self.pool.validated_pool().ready())
407 }
408
409 fn futures(&self) -> Vec<Self::InPoolTransaction> {
410 let pool = self.pool.validated_pool().pool.read();
411 pool.futures().cloned().collect::<Vec<_>>()
412 }
413
414 async fn ready_at_with_timeout(
415 &self,
416 at: <Self::Block as BlockT>::Hash,
417 timeout: std::time::Duration,
418 ) -> ReadyIteratorFor<PoolApi> {
419 self.ready_at_with_timeout_internal(at, timeout).await
420 }
421}
422
423impl<Block, Client> BasicPool<FullChainApi<Client, Block>, Block>
424where
425 Block: BlockT,
426 Client: sp_api::ProvideRuntimeApi<Block>
427 + sc_client_api::BlockBackend<Block>
428 + sc_client_api::blockchain::HeaderBackend<Block>
429 + sp_runtime::traits::BlockIdTo<Block>
430 + sc_client_api::ExecutorProvider<Block>
431 + sc_client_api::UsageProvider<Block>
432 + sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
433 + Send
434 + Sync
435 + 'static,
436 Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
437{
438 pub fn new_full(
440 options: graph::Options,
441 is_validator: IsValidator,
442 prometheus: Option<&PrometheusRegistry>,
443 spawner: impl SpawnEssentialNamed,
444 client: Arc<Client>,
445 ) -> Self {
446 let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
447 let pool = Self::with_revalidation_type(
448 options,
449 is_validator,
450 pool_api,
451 prometheus,
452 RevalidationType::Full,
453 spawner,
454 client.usage_info().chain.best_number,
455 client.usage_info().chain.best_hash,
456 client.usage_info().chain.finalized_hash,
457 );
458
459 pool
460 }
461}
462
463impl<Block, Client> sc_transaction_pool_api::LocalTransactionPool
464 for BasicPool<FullChainApi<Client, Block>, Block>
465where
466 Block: BlockT,
467 Client: sp_api::ProvideRuntimeApi<Block>
468 + sc_client_api::BlockBackend<Block>
469 + sc_client_api::blockchain::HeaderBackend<Block>
470 + sp_runtime::traits::BlockIdTo<Block>
471 + sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>,
472 Client: Send + Sync + 'static,
473 Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
474{
475 type Block = Block;
476 type Hash = graph::ExtrinsicHash<FullChainApi<Client, Block>>;
477 type Error = <FullChainApi<Client, Block> as graph::ChainApi>::Error;
478
479 fn submit_local(
480 &self,
481 at: Block::Hash,
482 xt: sc_transaction_pool_api::LocalTransactionFor<Self>,
483 ) -> Result<Self::Hash, Self::Error> {
484 let validity = self
485 .api
486 .validate_transaction_blocking(at, TransactionSource::Local, Arc::from(xt.clone()))?
487 .map_err(|e| {
488 Self::Error::Pool(match e {
489 TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i),
490 TransactionValidityError::Unknown(u) => TxPoolError::UnknownTransaction(u),
491 })
492 })?;
493
494 let (hash, bytes) = self.pool.validated_pool().api().hash_and_length(&xt);
495 let block_number = self
496 .api
497 .block_id_to_number(&BlockId::hash(at))?
498 .ok_or_else(|| error::Error::BlockIdConversion(format!("{:?}", at)))?;
499
500 let validated = ValidatedTransaction::valid_at(
501 block_number.saturated_into::<u64>(),
502 hash,
503 TimedTransactionSource::new_local(false),
504 Arc::from(xt),
505 bytes,
506 validity,
507 );
508
509 self.pool
510 .validated_pool()
511 .submit(vec![validated])
512 .remove(0)
513 .map(|outcome| outcome.hash())
514 }
515}
516
/// Progress of the scheduled ("light") revalidation.
#[cfg_attr(test, derive(Debug))]
enum RevalidationStatus<N> {
    /// Nothing is scheduled yet; the next `next_required` call will schedule a revalidation.
    NotScheduled,
    /// A revalidation is due once the given instant has passed or the given block number
    /// has been reached, whichever is configured and happens first.
    Scheduled(Option<Instant>, Option<N>),
    /// A revalidation was reported as required and the status has not been cleared since.
    InProgress,
}
526
527enum RevalidationStrategy<N> {
528 Always,
529 Light(RevalidationStatus<N>),
530}
531
/// Decision produced by `RevalidationStrategy::next` for a single enactment.
struct RevalidationAction {
    /// Whether the ready transactions should be queued for revalidation.
    revalidate: bool,
    /// Whether transactions from retracted blocks should be resubmitted.
    resubmit: bool,
}
536
537impl<N: Clone + Copy + AtLeast32Bit> RevalidationStrategy<N> {
538 pub fn clear(&mut self) {
539 if let Self::Light(status) = self {
540 status.clear()
541 }
542 }
543
544 pub fn next(
545 &mut self,
546 block: N,
547 revalidate_time_period: Option<std::time::Duration>,
548 revalidate_block_period: Option<N>,
549 ) -> RevalidationAction {
550 match self {
551 Self::Light(status) => RevalidationAction {
552 revalidate: status.next_required(
553 block,
554 revalidate_time_period,
555 revalidate_block_period,
556 ),
557 resubmit: false,
558 },
559 Self::Always => RevalidationAction { revalidate: true, resubmit: true },
560 }
561 }
562}
563
564impl<N: Clone + Copy + AtLeast32Bit> RevalidationStatus<N> {
565 pub fn clear(&mut self) {
567 *self = Self::NotScheduled;
568 }
569
570 pub fn next_required(
572 &mut self,
573 block: N,
574 revalidate_time_period: Option<std::time::Duration>,
575 revalidate_block_period: Option<N>,
576 ) -> bool {
577 match *self {
578 Self::NotScheduled => {
579 *self = Self::Scheduled(
580 revalidate_time_period.map(|period| Instant::now() + period),
581 revalidate_block_period.map(|period| block + period),
582 );
583 false
584 },
585 Self::Scheduled(revalidate_at_time, revalidate_at_block) => {
586 let is_required =
587 revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false) ||
588 revalidate_at_block.map(|at| block >= at).unwrap_or(false);
589 if is_required {
590 *self = Self::InProgress;
591 }
592 is_required
593 },
594 Self::InProgress => false,
595 }
596 }
597}
598
599pub async fn prune_known_txs_for_block<
603 Block: BlockT,
604 Api: graph::ChainApi<Block = Block>,
605 L: EventHandler<Api>,
606>(
607 at: &HashAndNumber<Block>,
608 api: &Api,
609 pool: &graph::Pool<Api, L>,
610 extrinsics: Option<Vec<RawExtrinsicFor<Api>>>,
611 known_provides_tags: Option<Arc<HashMap<ExtrinsicHash<Api>, Vec<Tag>>>>,
612) -> Vec<ExtrinsicHash<Api>> {
613 let extrinsics = match extrinsics {
614 Some(xts) => xts,
615 None => api
616 .block_body(at.hash)
617 .await
618 .unwrap_or_else(|error| {
619 warn!(target: LOG_TARGET, ?error, "Prune known transactions: error request.");
620 None
621 })
622 .unwrap_or_default(),
623 };
624
625 let hashes = extrinsics.iter().map(|tx| pool.hash_of(tx)).collect::<Vec<_>>();
626
627 let header = match api.block_header(at.hash) {
628 Ok(Some(h)) => h,
629 Ok(None) => {
630 trace!(target: LOG_TARGET, hash = ?at.hash, "Could not find header.");
631 return hashes;
632 },
633 Err(error) => {
634 trace!(target: LOG_TARGET, hash = ?at.hash, ?error, "Error retrieving header.");
635 return hashes;
636 },
637 };
638
639 log_xt_trace!(target: LOG_TARGET, &hashes, "Pruning transaction.");
640
641 pool.prune(at, *header.parent_hash(), &extrinsics, known_provides_tags).await;
642 hashes
643}
644
645impl<PoolApi, Block> BasicPool<PoolApi, Block>
646where
647 Block: BlockT,
648 PoolApi: 'static + graph::ChainApi<Block = Block>,
649{
650 async fn handle_enactment(&self, tree_route: TreeRoute<Block>) {
654 trace!(target: LOG_TARGET, ?tree_route, "handle_enactment tree_route.");
655 let pool = self.pool.clone();
656 let api = self.api.clone();
657
658 let hash_and_number = match tree_route.last() {
659 Some(hash_and_number) => hash_and_number,
660 None => {
661 warn!(target: LOG_TARGET, ?tree_route, "Skipping ChainEvent - no last block in tree route.");
662 return;
663 },
664 };
665
666 let next_action = self.revalidation_strategy.lock().next(
667 hash_and_number.number,
668 Some(std::time::Duration::from_secs(60)),
669 Some(20u32.into()),
670 );
671
672 let mut pruned_log = HashSet::<ExtrinsicHash<PoolApi>>::new();
675
676 for retracted in tree_route.retracted() {
682 pool.validated_pool().on_block_retracted(retracted.hash);
684 }
685
686 future::join_all(
687 tree_route
688 .enacted()
689 .iter()
690 .map(|h| prune_known_txs_for_block(h, &*api, &*pool, None, None)),
691 )
692 .await
693 .into_iter()
694 .for_each(|enacted_log| {
695 pruned_log.extend(enacted_log);
696 });
697
698 self.metrics
699 .report(|metrics| metrics.block_transactions_pruned.inc_by(pruned_log.len() as u64));
700
701 if next_action.resubmit {
702 let mut resubmit_transactions = Vec::new();
703
704 for retracted in tree_route.retracted() {
705 let hash = retracted.hash;
706
707 let block_transactions = api
708 .block_body(hash)
709 .await
710 .unwrap_or_else(|error| {
711 warn!(target: LOG_TARGET, ?error, "Failed to fetch block body.");
712 None
713 })
714 .unwrap_or_default()
715 .into_iter();
716
717 let mut resubmitted_to_report = 0;
718
719 resubmit_transactions.extend(
720 block_transactions.into_iter().map(Arc::from).filter_map(|tx| {
722 let tx_hash = pool.hash_of(&tx);
723 let contains = pruned_log.contains(&tx_hash);
724
725 resubmitted_to_report += 1;
727
728 if !contains {
729 trace!(target: LOG_TARGET, ?tx_hash, ?hash, "Resubmitting from retracted block.");
730 Some((
731 TimedTransactionSource::new_external(false),
734 tx,
735 ))
736 } else {
737 None
738 }
739 }),
740 );
741
742 self.metrics.report(|metrics| {
743 metrics.block_transactions_resubmitted.inc_by(resubmitted_to_report)
744 });
745 }
746
747 pool.resubmit_at(
748 &hash_and_number,
749 resubmit_transactions,
750 ValidateTransactionPriority::Submitted,
751 )
752 .await;
753 }
754
755 let extra_pool = pool.clone();
756 self.ready_poll
759 .lock()
760 .trigger(hash_and_number.number, move || Box::new(extra_pool.validated_pool().ready()));
761
762 if next_action.revalidate {
763 let hashes = pool.validated_pool().ready().map(|tx| tx.hash).collect();
764 self.revalidation_queue.revalidate_later(hash_and_number.hash, hashes).await;
765
766 self.revalidation_strategy.lock().clear();
767 }
768 }
769}
770
771#[async_trait]
772impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
773where
774 Block: BlockT,
775 PoolApi: 'static + graph::ChainApi<Block = Block>,
776{
777 async fn maintain(&self, event: ChainEvent<Self::Block>) {
778 let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
779 let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
780 match self.api.tree_route(from, to) {
781 Ok(tree_route) => Ok(tree_route),
782 Err(e) => {
783 return Err(format!(
784 "Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
785 ))
786 },
787 }
788 };
789 let block_id_to_number =
790 |hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));
791
792 let result =
793 self.enactment_state
794 .lock()
795 .update(&event, &compute_tree_route, &block_id_to_number);
796
797 match result {
798 Err(error) => {
799 trace!(target: LOG_TARGET, %error, "enactment state update");
800 self.enactment_state.lock().force_update(&event);
801 },
802 Ok(EnactmentAction::Skip) => return,
803 Ok(EnactmentAction::HandleFinalization) => {},
804 Ok(EnactmentAction::HandleEnactment(tree_route)) => {
805 self.handle_enactment(tree_route).await;
806 },
807 };
808
809 if let ChainEvent::Finalized { hash, tree_route } = event {
810 trace!(
811 target: LOG_TARGET,
812 ?tree_route,
813 ?prev_finalized_block,
814 "on-finalized enacted"
815 );
816
817 for hash in tree_route.iter().chain(std::iter::once(&hash)) {
818 if let Err(error) = self.pool.validated_pool().on_block_finalized(*hash).await {
819 warn!(
820 target: LOG_TARGET,
821 ?hash,
822 ?error,
823 "Error occurred while attempting to notify watchers about finalization"
824 );
825 }
826 }
827 }
828 }
829}