1use super::{metrics::MetricsLink as PrometheusMetrics, revalidation};
10pub use crate::{
11 api::FullChainApi,
12 graph::{ChainApi, ValidatedTransaction},
13};
14use crate::{
15 common::{
16 enactment_state::{EnactmentAction, EnactmentState},
17 error,
18 tracing_log_xt::log_xt_trace,
19 },
20 graph::{
21 self, base_pool::TimedTransactionSource, EventHandler, ExtrinsicHash, IsValidator,
22 RawExtrinsicFor,
23 },
24 ReadyIteratorFor, ValidateTransactionPriority, LOG_TARGET,
25};
26use async_trait::async_trait;
27use futures::{channel::oneshot, future, prelude::*, Future, FutureExt};
28use parking_lot::Mutex;
29use soil_prometheus::Registry as PrometheusRegistry;
30use soil_client::blockchain::{HashAndNumber, TreeRoute};
31use soil_client::transaction_pool::{
32 error::Error as TxPoolError, ChainEvent, ImportNotificationStream, MaintainedTransactionPool,
33 PoolStatus, TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor,
34 TxHash, TxInvalidityReportMap,
35};
36use std::{
37 collections::{HashMap, HashSet},
38 pin::Pin,
39 sync::Arc,
40 time::Instant,
41};
42use subsoil::core::traits::SpawnEssentialNamed;
43use subsoil::runtime::{
44 generic::BlockId,
45 traits::{
46 AtLeast32Bit, Block as BlockT, Header as HeaderT, NumberFor, SaturatedConversion, Zero,
47 },
48 transaction_validity::{TransactionTag as Tag, TransactionValidityError},
49};
50use tokio::select;
51use tracing::{trace, warn};
52
53pub struct BasicPool<PoolApi, Block>
55where
56 Block: BlockT,
57 PoolApi: graph::ChainApi<Block = Block>,
58{
59 pool: Arc<graph::Pool<PoolApi, ()>>,
60 api: Arc<PoolApi>,
61 revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
62 revalidation_queue: Arc<revalidation::RevalidationQueue<PoolApi>>,
63 ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<PoolApi>, Block>>>,
64 metrics: PrometheusMetrics,
65 enactment_state: Arc<Mutex<EnactmentState<Block>>>,
66}
67
68struct ReadyPoll<T, Block: BlockT> {
69 updated_at: NumberFor<Block>,
70 pollers: Vec<(NumberFor<Block>, oneshot::Sender<T>)>,
71}
72
73impl<T, Block: BlockT> Default for ReadyPoll<T, Block> {
74 fn default() -> Self {
75 Self { updated_at: NumberFor::<Block>::zero(), pollers: Default::default() }
76 }
77}
78
79impl<T, Block: BlockT> ReadyPoll<T, Block> {
80 fn new(best_block_number: NumberFor<Block>) -> Self {
81 Self { updated_at: best_block_number, pollers: Default::default() }
82 }
83
84 fn trigger(&mut self, number: NumberFor<Block>, iterator_factory: impl Fn() -> T) {
85 self.updated_at = number;
86
87 let mut idx = 0;
88 while idx < self.pollers.len() {
89 if self.pollers[idx].0 <= number {
90 let poller_sender = self.pollers.swap_remove(idx);
91 trace!(
92 target: LOG_TARGET,
93 ?number,
94 "Sending ready signal."
95 );
96 let _ = poller_sender.1.send(iterator_factory());
97 } else {
98 idx += 1;
99 }
100 }
101 }
102
103 fn add(&mut self, number: NumberFor<Block>) -> oneshot::Receiver<T> {
104 let (sender, receiver) = oneshot::channel();
105 self.pollers.push((number, sender));
106 receiver
107 }
108
109 fn updated_at(&self) -> NumberFor<Block> {
110 self.updated_at
111 }
112}
113
114pub enum RevalidationType {
116 Light,
123
124 Full,
129}
130
131impl<PoolApi, Block> BasicPool<PoolApi, Block>
132where
133 Block: BlockT,
134 PoolApi: graph::ChainApi<Block = Block> + 'static,
135{
136 pub fn new_test(
138 pool_api: Arc<PoolApi>,
139 best_block_hash: Block::Hash,
140 finalized_hash: Block::Hash,
141 options: graph::Options,
142 ) -> (Self, Pin<Box<dyn Future<Output = ()> + Send>>) {
143 let pool = Arc::new(graph::Pool::new_with_staticly_sized_rotator(
144 options,
145 true.into(),
146 pool_api.clone(),
147 ));
148 let (revalidation_queue, background_task) = revalidation::RevalidationQueue::new_background(
149 pool_api.clone(),
150 pool.clone(),
151 finalized_hash,
152 );
153 (
154 Self {
155 api: pool_api,
156 pool,
157 revalidation_queue: Arc::new(revalidation_queue),
158 revalidation_strategy: Arc::new(Mutex::new(RevalidationStrategy::Always)),
159 ready_poll: Default::default(),
160 metrics: Default::default(),
161 enactment_state: Arc::new(Mutex::new(EnactmentState::new(
162 best_block_hash,
163 finalized_hash,
164 ))),
165 },
166 background_task,
167 )
168 }
169
170 pub fn with_revalidation_type(
173 options: graph::Options,
174 is_validator: IsValidator,
175 pool_api: Arc<PoolApi>,
176 prometheus: Option<&PrometheusRegistry>,
177 revalidation_type: RevalidationType,
178 spawner: impl SpawnEssentialNamed,
179 best_block_number: NumberFor<Block>,
180 best_block_hash: Block::Hash,
181 finalized_hash: Block::Hash,
182 ) -> Self {
183 let pool = Arc::new(graph::Pool::new_with_staticly_sized_rotator(
184 options,
185 is_validator,
186 pool_api.clone(),
187 ));
188 let (revalidation_queue, background_task) = match revalidation_type {
189 RevalidationType::Light => {
190 (revalidation::RevalidationQueue::new(pool_api.clone(), pool.clone()), None)
191 },
192 RevalidationType::Full => {
193 let (queue, background) = revalidation::RevalidationQueue::new_background(
194 pool_api.clone(),
195 pool.clone(),
196 finalized_hash,
197 );
198 (queue, Some(background))
199 },
200 };
201
202 if let Some(background_task) = background_task {
203 spawner.spawn_essential("txpool-background", Some("transaction-pool"), background_task);
204 }
205
206 Self {
207 api: pool_api,
208 pool,
209 revalidation_queue: Arc::new(revalidation_queue),
210 revalidation_strategy: Arc::new(Mutex::new(match revalidation_type {
211 RevalidationType::Light => {
212 RevalidationStrategy::Light(RevalidationStatus::NotScheduled)
213 },
214 RevalidationType::Full => RevalidationStrategy::Always,
215 })),
216 ready_poll: Arc::new(Mutex::new(ReadyPoll::new(best_block_number))),
217 metrics: PrometheusMetrics::new(prometheus),
218 enactment_state: Arc::new(Mutex::new(EnactmentState::new(
219 best_block_hash,
220 finalized_hash,
221 ))),
222 }
223 }
224
225 pub fn pool(&self) -> &Arc<graph::Pool<PoolApi, ()>> {
227 &self.pool
228 }
229
230 pub fn api(&self) -> &PoolApi {
232 &self.api
233 }
234
235 async fn ready_at_with_timeout_internal(
236 &self,
237 at: Block::Hash,
238 timeout: std::time::Duration,
239 ) -> ReadyIteratorFor<PoolApi> {
240 select! {
241 ready = self.ready_at(at)=> ready,
242 _ = futures_timer::Delay::new(timeout)=> self.ready()
243 }
244 }
245}
246
247#[async_trait]
248impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
249where
250 Block: BlockT,
251 PoolApi: 'static + graph::ChainApi<Block = Block>,
252{
253 type Block = PoolApi::Block;
254 type Hash = graph::ExtrinsicHash<PoolApi>;
255 type InPoolTransaction =
256 graph::base_pool::Transaction<graph::ExtrinsicHash<PoolApi>, graph::ExtrinsicFor<PoolApi>>;
257 type Error = PoolApi::Error;
258
259 async fn submit_at(
260 &self,
261 at: <Self::Block as BlockT>::Hash,
262 source: TransactionSource,
263 xts: Vec<TransactionFor<Self>>,
264 ) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
265 let pool = self.pool.clone();
266 let xts = xts
267 .into_iter()
268 .map(|xt| {
269 (TimedTransactionSource::from_transaction_source(source, false), Arc::from(xt))
270 })
271 .collect::<Vec<_>>();
272
273 self.metrics
274 .report(|metrics| metrics.submitted_transactions.inc_by(xts.len() as u64));
275
276 let number = self.api.resolve_block_number(at);
277 let at = HashAndNumber { hash: at, number: number? };
278 Ok(pool
279 .submit_at(&at, xts, ValidateTransactionPriority::Submitted)
280 .await
281 .into_iter()
282 .map(|result| result.map(|outcome| outcome.hash()))
283 .collect())
284 }
285
286 async fn submit_one(
287 &self,
288 at: <Self::Block as BlockT>::Hash,
289 source: TransactionSource,
290 xt: TransactionFor<Self>,
291 ) -> Result<TxHash<Self>, Self::Error> {
292 let pool = self.pool.clone();
293 let xt = Arc::from(xt);
294
295 self.metrics.report(|metrics| metrics.submitted_transactions.inc());
296
297 let number = self.api.resolve_block_number(at);
298 let at = HashAndNumber { hash: at, number: number? };
299 pool.submit_one(&at, TimedTransactionSource::from_transaction_source(source, false), xt)
300 .await
301 .map(|outcome| outcome.hash())
302 }
303
304 async fn submit_and_watch(
305 &self,
306 at: <Self::Block as BlockT>::Hash,
307 source: TransactionSource,
308 xt: TransactionFor<Self>,
309 ) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
310 let pool = self.pool.clone();
311 let xt = Arc::from(xt);
312
313 self.metrics.report(|metrics| metrics.submitted_transactions.inc());
314
315 let number = self.api.resolve_block_number(at);
316
317 let at = HashAndNumber { hash: at, number: number? };
318 pool.submit_and_watch(
319 &at,
320 TimedTransactionSource::from_transaction_source(source, false),
321 xt,
322 )
323 .await
324 .map(|mut outcome| outcome.expect_watcher().into_stream().boxed())
325 }
326
327 async fn report_invalid(
328 &self,
329 _at: Option<<Self::Block as BlockT>::Hash>,
330 invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
331 ) -> Vec<Arc<Self::InPoolTransaction>> {
332 let hashes = invalid_tx_errors.keys().map(|h| *h).collect::<Vec<_>>();
333 let removed = self.pool.validated_pool().remove_invalid(&hashes);
334 self.metrics
335 .report(|metrics| metrics.validations_invalid.inc_by(removed.len() as u64));
336 removed
337 }
338
339 fn status(&self) -> PoolStatus {
340 self.pool.validated_pool().status()
341 }
342
343 fn import_notification_stream(&self) -> ImportNotificationStream<TxHash<Self>> {
344 self.pool.validated_pool().import_notification_stream()
345 }
346
347 fn hash_of(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
348 self.pool.hash_of(xt)
349 }
350
351 fn on_broadcasted(&self, propagations: HashMap<TxHash<Self>, Vec<String>>) {
352 self.pool.validated_pool().on_broadcasted(propagations)
353 }
354
355 fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>> {
356 self.pool.validated_pool().ready_by_hash(hash)
357 }
358
359 async fn ready_at(&self, at: <Self::Block as BlockT>::Hash) -> ReadyIteratorFor<PoolApi> {
360 let Ok(at) = self.api.resolve_block_number(at) else {
361 return Box::new(std::iter::empty()) as Box<_>;
362 };
363
364 let status = self.status();
365 if status.ready == 0 && status.future == 0 {
370 return Box::new(std::iter::empty()) as Box<_>;
371 }
372
373 if self.ready_poll.lock().updated_at() >= at {
374 trace!(
375 target: LOG_TARGET,
376 ?at,
377 "Transaction pool already processed block."
378 );
379 let iterator: ReadyIteratorFor<PoolApi> = Box::new(self.pool.validated_pool().ready());
380 return iterator;
381 }
382
383 let result = self.ready_poll.lock().add(at).map(|received| {
384 received.unwrap_or_else(|error| {
385 warn!(target: LOG_TARGET, ?error, "Error receiving pending set.");
386 Box::new(std::iter::empty())
387 })
388 });
389
390 result.await
391 }
392
393 fn ready(&self) -> ReadyIteratorFor<PoolApi> {
394 Box::new(self.pool.validated_pool().ready())
395 }
396
397 fn futures(&self) -> Vec<Self::InPoolTransaction> {
398 let pool = self.pool.validated_pool().pool.read();
399 pool.futures().cloned().collect::<Vec<_>>()
400 }
401
402 async fn ready_at_with_timeout(
403 &self,
404 at: <Self::Block as BlockT>::Hash,
405 timeout: std::time::Duration,
406 ) -> ReadyIteratorFor<PoolApi> {
407 self.ready_at_with_timeout_internal(at, timeout).await
408 }
409}
410
411impl<Block, Client> BasicPool<FullChainApi<Client, Block>, Block>
412where
413 Block: BlockT,
414 Client: subsoil::api::ProvideRuntimeApi<Block>
415 + soil_client::client_api::BlockBackend<Block>
416 + soil_client::client_api::blockchain::HeaderBackend<Block>
417 + subsoil::runtime::traits::BlockIdTo<Block>
418 + soil_client::client_api::ExecutorProvider<Block>
419 + soil_client::client_api::UsageProvider<Block>
420 + soil_client::blockchain::HeaderMetadata<Block, Error = soil_client::blockchain::Error>
421 + Send
422 + Sync
423 + 'static,
424 Client::Api: subsoil::txpool::runtime_api::TaggedTransactionQueue<Block>,
425{
426 pub fn new_full(
428 options: graph::Options,
429 is_validator: IsValidator,
430 prometheus: Option<&PrometheusRegistry>,
431 spawner: impl SpawnEssentialNamed,
432 client: Arc<Client>,
433 ) -> Self {
434 let pool_api = Arc::new(FullChainApi::new(client.clone(), prometheus, &spawner));
435 let pool = Self::with_revalidation_type(
436 options,
437 is_validator,
438 pool_api,
439 prometheus,
440 RevalidationType::Full,
441 spawner,
442 client.usage_info().chain.best_number,
443 client.usage_info().chain.best_hash,
444 client.usage_info().chain.finalized_hash,
445 );
446
447 pool
448 }
449}
450
451impl<Block, Client> soil_client::transaction_pool::LocalTransactionPool
452 for BasicPool<FullChainApi<Client, Block>, Block>
453where
454 Block: BlockT,
455 Client: subsoil::api::ProvideRuntimeApi<Block>
456 + soil_client::client_api::BlockBackend<Block>
457 + soil_client::client_api::blockchain::HeaderBackend<Block>
458 + subsoil::runtime::traits::BlockIdTo<Block>
459 + soil_client::blockchain::HeaderMetadata<Block, Error = soil_client::blockchain::Error>,
460 Client: Send + Sync + 'static,
461 Client::Api: subsoil::txpool::runtime_api::TaggedTransactionQueue<Block>,
462{
463 type Block = Block;
464 type Hash = graph::ExtrinsicHash<FullChainApi<Client, Block>>;
465 type Error = <FullChainApi<Client, Block> as graph::ChainApi>::Error;
466
467 fn submit_local(
468 &self,
469 at: Block::Hash,
470 xt: soil_client::transaction_pool::LocalTransactionFor<Self>,
471 ) -> Result<Self::Hash, Self::Error> {
472 let validity = self
473 .api
474 .validate_transaction_blocking(at, TransactionSource::Local, Arc::from(xt.clone()))?
475 .map_err(|e| {
476 Self::Error::Pool(match e {
477 TransactionValidityError::Invalid(i) => TxPoolError::InvalidTransaction(i),
478 TransactionValidityError::Unknown(u) => TxPoolError::UnknownTransaction(u),
479 })
480 })?;
481
482 let (hash, bytes) = self.pool.validated_pool().api().hash_and_length(&xt);
483 let block_number = self
484 .api
485 .block_id_to_number(&BlockId::hash(at))?
486 .ok_or_else(|| error::Error::BlockIdConversion(format!("{:?}", at)))?;
487
488 let validated = ValidatedTransaction::valid_at(
489 block_number.saturated_into::<u64>(),
490 hash,
491 TimedTransactionSource::new_local(false),
492 Arc::from(xt),
493 bytes,
494 validity,
495 );
496
497 self.pool
498 .validated_pool()
499 .submit(vec![validated])
500 .remove(0)
501 .map(|outcome| outcome.hash())
502 }
503}
504
505#[cfg_attr(test, derive(Debug))]
506enum RevalidationStatus<N> {
507 NotScheduled,
509 Scheduled(Option<Instant>, Option<N>),
511 InProgress,
513}
514
515enum RevalidationStrategy<N> {
516 Always,
517 Light(RevalidationStatus<N>),
518}
519
520struct RevalidationAction {
521 revalidate: bool,
522 resubmit: bool,
523}
524
525impl<N: Clone + Copy + AtLeast32Bit> RevalidationStrategy<N> {
526 pub fn clear(&mut self) {
527 if let Self::Light(status) = self {
528 status.clear()
529 }
530 }
531
532 pub fn next(
533 &mut self,
534 block: N,
535 revalidate_time_period: Option<std::time::Duration>,
536 revalidate_block_period: Option<N>,
537 ) -> RevalidationAction {
538 match self {
539 Self::Light(status) => RevalidationAction {
540 revalidate: status.next_required(
541 block,
542 revalidate_time_period,
543 revalidate_block_period,
544 ),
545 resubmit: false,
546 },
547 Self::Always => RevalidationAction { revalidate: true, resubmit: true },
548 }
549 }
550}
551
552impl<N: Clone + Copy + AtLeast32Bit> RevalidationStatus<N> {
553 pub fn clear(&mut self) {
555 *self = Self::NotScheduled;
556 }
557
558 pub fn next_required(
560 &mut self,
561 block: N,
562 revalidate_time_period: Option<std::time::Duration>,
563 revalidate_block_period: Option<N>,
564 ) -> bool {
565 match *self {
566 Self::NotScheduled => {
567 *self = Self::Scheduled(
568 revalidate_time_period.map(|period| Instant::now() + period),
569 revalidate_block_period.map(|period| block + period),
570 );
571 false
572 },
573 Self::Scheduled(revalidate_at_time, revalidate_at_block) => {
574 let is_required =
575 revalidate_at_time.map(|at| Instant::now() >= at).unwrap_or(false)
576 || revalidate_at_block.map(|at| block >= at).unwrap_or(false);
577 if is_required {
578 *self = Self::InProgress;
579 }
580 is_required
581 },
582 Self::InProgress => false,
583 }
584 }
585}
586
587pub async fn prune_known_txs_for_block<
591 Block: BlockT,
592 Api: graph::ChainApi<Block = Block>,
593 L: EventHandler<Api>,
594>(
595 at: &HashAndNumber<Block>,
596 api: &Api,
597 pool: &graph::Pool<Api, L>,
598 extrinsics: Option<Vec<RawExtrinsicFor<Api>>>,
599 known_provides_tags: Option<Arc<HashMap<ExtrinsicHash<Api>, Vec<Tag>>>>,
600) -> Vec<ExtrinsicHash<Api>> {
601 let extrinsics = match extrinsics {
602 Some(xts) => xts,
603 None => api
604 .block_body(at.hash)
605 .await
606 .unwrap_or_else(|error| {
607 warn!(target: LOG_TARGET, ?error, "Prune known transactions: error request.");
608 None
609 })
610 .unwrap_or_default(),
611 };
612
613 let hashes = extrinsics.iter().map(|tx| pool.hash_of(tx)).collect::<Vec<_>>();
614
615 let header = match api.block_header(at.hash) {
616 Ok(Some(h)) => h,
617 Ok(None) => {
618 trace!(target: LOG_TARGET, hash = ?at.hash, "Could not find header.");
619 return hashes;
620 },
621 Err(error) => {
622 trace!(target: LOG_TARGET, hash = ?at.hash, ?error, "Error retrieving header.");
623 return hashes;
624 },
625 };
626
627 log_xt_trace!(target: LOG_TARGET, &hashes, "Pruning transaction.");
628
629 pool.prune(at, *header.parent_hash(), &extrinsics, known_provides_tags).await;
630 hashes
631}
632
633impl<PoolApi, Block> BasicPool<PoolApi, Block>
634where
635 Block: BlockT,
636 PoolApi: 'static + graph::ChainApi<Block = Block>,
637{
638 async fn handle_enactment(&self, tree_route: TreeRoute<Block>) {
642 trace!(target: LOG_TARGET, ?tree_route, "handle_enactment tree_route.");
643 let pool = self.pool.clone();
644 let api = self.api.clone();
645
646 let hash_and_number = match tree_route.last() {
647 Some(hash_and_number) => hash_and_number,
648 None => {
649 warn!(target: LOG_TARGET, ?tree_route, "Skipping ChainEvent - no last block in tree route.");
650 return;
651 },
652 };
653
654 let next_action = self.revalidation_strategy.lock().next(
655 hash_and_number.number,
656 Some(std::time::Duration::from_secs(60)),
657 Some(20u32.into()),
658 );
659
660 let mut pruned_log = HashSet::<ExtrinsicHash<PoolApi>>::new();
663
664 for retracted in tree_route.retracted() {
670 pool.validated_pool().on_block_retracted(retracted.hash);
672 }
673
674 future::join_all(
675 tree_route
676 .enacted()
677 .iter()
678 .map(|h| prune_known_txs_for_block(h, &*api, &*pool, None, None)),
679 )
680 .await
681 .into_iter()
682 .for_each(|enacted_log| {
683 pruned_log.extend(enacted_log);
684 });
685
686 self.metrics
687 .report(|metrics| metrics.block_transactions_pruned.inc_by(pruned_log.len() as u64));
688
689 if next_action.resubmit {
690 let mut resubmit_transactions = Vec::new();
691
692 for retracted in tree_route.retracted() {
693 let hash = retracted.hash;
694
695 let block_transactions = api
696 .block_body(hash)
697 .await
698 .unwrap_or_else(|error| {
699 warn!(target: LOG_TARGET, ?error, "Failed to fetch block body.");
700 None
701 })
702 .unwrap_or_default()
703 .into_iter();
704
705 let mut resubmitted_to_report = 0;
706
707 resubmit_transactions.extend(
708 block_transactions.into_iter().map(Arc::from).filter_map(|tx| {
710 let tx_hash = pool.hash_of(&tx);
711 let contains = pruned_log.contains(&tx_hash);
712
713 resubmitted_to_report += 1;
715
716 if !contains {
717 trace!(target: LOG_TARGET, ?tx_hash, ?hash, "Resubmitting from retracted block.");
718 Some((
719 TimedTransactionSource::new_external(false),
722 tx,
723 ))
724 } else {
725 None
726 }
727 }),
728 );
729
730 self.metrics.report(|metrics| {
731 metrics.block_transactions_resubmitted.inc_by(resubmitted_to_report)
732 });
733 }
734
735 pool.resubmit_at(
736 &hash_and_number,
737 resubmit_transactions,
738 ValidateTransactionPriority::Submitted,
739 )
740 .await;
741 }
742
743 let extra_pool = pool.clone();
744 self.ready_poll
747 .lock()
748 .trigger(hash_and_number.number, move || Box::new(extra_pool.validated_pool().ready()));
749
750 if next_action.revalidate {
751 let hashes = pool.validated_pool().ready().map(|tx| tx.hash).collect();
752 self.revalidation_queue.revalidate_later(hash_and_number.hash, hashes).await;
753
754 self.revalidation_strategy.lock().clear();
755 }
756 }
757}
758
759#[async_trait]
760impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
761where
762 Block: BlockT,
763 PoolApi: 'static + graph::ChainApi<Block = Block>,
764{
765 async fn maintain(&self, event: ChainEvent<Self::Block>) {
766 let prev_finalized_block = self.enactment_state.lock().recent_finalized_block();
767 let compute_tree_route = |from, to| -> Result<TreeRoute<Block>, String> {
768 match self.api.tree_route(from, to) {
769 Ok(tree_route) => Ok(tree_route),
770 Err(e) => {
771 return Err(format!(
772 "Error occurred while computing tree_route from {from:?} to {to:?}: {e}"
773 ))
774 },
775 }
776 };
777 let block_id_to_number =
778 |hash| self.api.block_id_to_number(&BlockId::Hash(hash)).map_err(|e| format!("{}", e));
779
780 let result =
781 self.enactment_state
782 .lock()
783 .update(&event, &compute_tree_route, &block_id_to_number);
784
785 match result {
786 Err(error) => {
787 trace!(target: LOG_TARGET, %error, "enactment state update");
788 self.enactment_state.lock().force_update(&event);
789 },
790 Ok(EnactmentAction::Skip) => return,
791 Ok(EnactmentAction::HandleFinalization) => {},
792 Ok(EnactmentAction::HandleEnactment(tree_route)) => {
793 self.handle_enactment(tree_route).await;
794 },
795 };
796
797 if let ChainEvent::Finalized { hash, tree_route } = event {
798 trace!(
799 target: LOG_TARGET,
800 ?tree_route,
801 ?prev_finalized_block,
802 "on-finalized enacted"
803 );
804
805 for hash in tree_route.iter().chain(std::iter::once(&hash)) {
806 if let Err(error) = self.pool.validated_pool().on_block_finalized(*hash).await {
807 warn!(
808 target: LOG_TARGET,
809 ?hash,
810 ?error,
811 "Error occurred while attempting to notify watchers about finalization"
812 );
813 }
814 }
815 }
816 }
817}