1#![warn(missing_docs)]
63#![allow(dead_code, irrefutable_let_patterns)]
65
66use std::{
67 collections::{hash_map, HashMap},
68 fmt::{self, Debug},
69 pin::Pin,
70 sync::Arc,
71 time::Duration,
72};
73
74use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
75
76use pezkuwi_primitives::{Block, BlockNumber, Hash};
77use pezsc_client_api::{BlockImportNotification, BlockchainEvents, FinalityNotification};
78
79use self::messages::{BitfieldSigningMessage, PvfCheckerMessage};
80use pezkuwi_node_subsystem_types::messages::{
81 ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage,
82 AvailabilityDistributionMessage, AvailabilityRecoveryMessage, AvailabilityStoreMessage,
83 BitfieldDistributionMessage, CandidateBackingMessage, CandidateValidationMessage,
84 ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage,
85 DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage,
86 NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProspectiveTeyrchainsMessage,
87 ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage,
88};
89
90pub use pezkuwi_node_subsystem_types::{
91 errors::{SubsystemError, SubsystemResult},
92 ActivatedLeaf, ActiveLeavesUpdate, ChainApiBackend, OverseerSignal, RuntimeApiSubsystemClient,
93 UnpinHandle,
94};
95
96pub mod metrics;
97pub use self::metrics::Metrics as OverseerMetrics;
98
99pub mod dummy;
101pub use self::dummy::DummySubsystem;
102
103pub use pezkuwi_node_metrics::{
104 metrics::{prometheus, Metrics as MetricsTrait},
105 Metronome,
106};
107
108pub use orchestra as gen;
109pub use orchestra::{
110 contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket,
111 NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived,
112 Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance,
113 SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra,
114 TrySendError,
115};
116
117#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
118mod memory_stats;
119#[cfg(test)]
120mod tests;
121
122use pezsp_core::traits::SpawnNamed;
123
/// Newtype that adapts a [`SpawnNamed`] implementation so it can be used as orchestra's
/// [`Spawner`].
///
/// The wrapped value is public; [`AsRef`] gives borrowed access to it and cloning the glue
/// clones the inner spawner.
#[derive(Clone)]
pub struct SpawnGlue<S>(pub S);

impl<S> AsRef<S> for SpawnGlue<S> {
    fn as_ref(&self) -> &S {
        let SpawnGlue(inner) = self;
        inner
    }
}
138
139impl<S: SpawnNamed + Clone + Send + Sync> crate::gen::Spawner for SpawnGlue<S> {
140 fn spawn_blocking(
141 &self,
142 name: &'static str,
143 group: Option<&'static str>,
144 future: futures::future::BoxFuture<'static, ()>,
145 ) {
146 SpawnNamed::spawn_blocking(&self.0, name, group, future)
147 }
148 fn spawn(
149 &self,
150 name: &'static str,
151 group: Option<&'static str>,
152 future: futures::future::BoxFuture<'static, ()>,
153 ) {
154 SpawnNamed::spawn(&self.0, name, group, future)
155 }
156}
157
158#[async_trait::async_trait]
160pub trait HeadSupportsTeyrchains {
161 async fn head_supports_teyrchains(&self, head: &Hash) -> bool;
163}
164
165#[async_trait::async_trait]
166impl<Client> HeadSupportsTeyrchains for Arc<Client>
167where
168 Client: RuntimeApiSubsystemClient + Sync + Send,
169{
170 async fn head_supports_teyrchains(&self, head: &Hash) -> bool {
171 self.api_version_teyrchain_host(*head).await.ok().flatten().unwrap_or(0) >= 1
173 }
174}
175
176#[derive(Clone)]
180pub struct Handle(OverseerHandle);
181
182impl Handle {
183 pub fn new(raw: OverseerHandle) -> Self {
185 Self(raw)
186 }
187
188 pub async fn block_imported(&mut self, block: BlockInfo) {
190 self.send_and_log_error(Event::BlockImported(block)).await
191 }
192
193 pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
195 self.send_msg_with_priority(msg, origin, PriorityLevel::Normal).await
196 }
197
198 pub async fn send_msg_with_priority(
200 &mut self,
201 msg: impl Into<AllMessages>,
202 origin: &'static str,
203 priority: PriorityLevel,
204 ) {
205 self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin, priority })
206 .await
207 }
208
209 #[inline(always)]
211 pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
212 self.send_msg(msg, "").await
213 }
214
215 pub async fn block_finalized(&mut self, block: BlockInfo) {
217 self.send_and_log_error(Event::BlockFinalized(block)).await
218 }
219
220 pub async fn wait_for_activation(
227 &mut self,
228 hash: Hash,
229 response_channel: oneshot::Sender<SubsystemResult<()>>,
230 ) {
231 self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
232 hash,
233 response_channel,
234 }))
235 .await;
236 }
237
238 pub async fn stop(&mut self) {
240 self.send_and_log_error(Event::Stop).await;
241 }
242
243 async fn send_and_log_error(&mut self, event: Event) {
245 if self.0.send(event).await.is_err() {
246 gum::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
247 }
248 }
249}
250
251#[derive(Debug, Clone)]
258pub struct BlockInfo {
259 pub hash: Hash,
261 pub parent_hash: Hash,
263 pub number: BlockNumber,
265 pub unpin_handle: UnpinHandle,
267}
268
269impl From<BlockImportNotification<Block>> for BlockInfo {
270 fn from(n: BlockImportNotification<Block>) -> Self {
271 let hash = n.hash;
272 let parent_hash = n.header.parent_hash;
273 let number = n.header.number;
274 let unpin_handle = n.into_unpin_handle();
275
276 BlockInfo { hash, parent_hash, number, unpin_handle }
277 }
278}
279
280impl From<FinalityNotification<Block>> for BlockInfo {
281 fn from(n: FinalityNotification<Block>) -> Self {
282 let hash = n.hash;
283 let parent_hash = n.header.parent_hash;
284 let number = n.header.number;
285 let unpin_handle = n.into_unpin_handle();
286
287 BlockInfo { hash, parent_hash, number, unpin_handle }
288 }
289}
290
/// An event delivered to the overseer's main loop, typically sent through a [`Handle`].
#[derive(Debug)]
pub enum Event {
    /// A block was imported; the overseer may activate it as a new leaf.
    BlockImported(BlockInfo),
    /// A block was finalized; the overseer prunes other leaves at or below its height.
    BlockFinalized(BlockInfo),
    /// A message to be routed to the subsystem that handles it.
    MsgToSubsystem {
        /// The message to route.
        msg: AllMessages,
        /// Static label identifying the sender (empty for anonymous sends).
        origin: &'static str,
        /// Priority with which the message is routed.
        priority: PriorityLevel,
    },
    /// A request coming from outside the overseer.
    ExternalRequest(ExternalRequest),
    /// Stop the overseer: subsystems are sent `Conclude` and the main loop exits.
    Stop,
}

/// A request coming from outside the overseer, carried by `Event::ExternalRequest`.
#[derive(Debug)]
pub enum ExternalRequest {
    /// Wait until the leaf with the given hash is active.
    ///
    /// Answered with `Ok(())` immediately if `hash` is already an active leaf. Otherwise the
    /// sender is queued and answered when the head is activated. A queued sender may be dropped
    /// without an answer, e.g. when the head is deactivated or the receiver is gone.
    WaitForActivation {
        /// Hash of the block to wait for.
        hash: Hash,
        /// Channel on which the answer is sent.
        response_channel: oneshot::Sender<SubsystemResult<()>>,
    },
}
331
332pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut handle: Handle) {
335 let mut finality = client.finality_notification_stream();
336 let mut imports = client.import_notification_stream();
337
338 loop {
339 select! {
340 f = finality.next() => {
341 match f {
342 Some(block) => {
343 handle.block_finalized(block.into()).await;
344 }
345 None => break,
346 }
347 },
348 i = imports.next() => {
349 match i {
350 Some(block) => {
351 handle.block_imported(block.into()).await;
352 }
353 None => break,
354 }
355 },
356 complete => break,
357 }
358 }
359}
360
/// The overseer: owns all subsystems, routes messages between them and turns block
/// import/finality events into `OverseerSignal`s.
///
/// The concrete type is generated by the `#[orchestra]` macro. Besides the `SupportsTeyrchains`
/// parameter declared here, the generated type also takes a spawner type parameter (it is used
/// as `Overseer<S, SupportsTeyrchains>` elsewhere in this file).
#[orchestra(
    gen=AllMessages,
    event=Event,
    signal=OverseerSignal,
    error=SubsystemError,
    message_capacity=2048,
)]
pub struct Overseer<SupportsTeyrchains> {
    // Each `#[subsystem(...)]` field below declares, in order:
    // - an optional `blocking` flag (presumably: run on a blocking task — confirm in orchestra),
    // - an optional per-subsystem `message_capacity` (presumably overriding the default 2048 above),
    // - the message type the subsystem consumes (omitted when it consumes none),
    // - `sends: [...]`, the message types it is allowed to send,
    // - optionally `can_receive_priority_messages` (presumably: accepts high-priority routing).
    #[subsystem(CandidateValidationMessage, sends: [
        ChainApiMessage,
        RuntimeApiMessage,
    ])]
    candidate_validation: CandidateValidation,

    // Consumes no messages of its own; only sends.
    #[subsystem(sends: [
        CandidateValidationMessage,
        RuntimeApiMessage,
    ])]
    pvf_checker: PvfChecker,

    #[subsystem(CandidateBackingMessage, sends: [
        CandidateValidationMessage,
        CollatorProtocolMessage,
        ChainApiMessage,
        AvailabilityDistributionMessage,
        AvailabilityStoreMessage,
        StatementDistributionMessage,
        ProvisionerMessage,
        RuntimeApiMessage,
        ProspectiveTeyrchainsMessage,
    ])]
    candidate_backing: CandidateBacking,

    #[subsystem(StatementDistributionMessage, sends: [
        NetworkBridgeTxMessage,
        CandidateBackingMessage,
        RuntimeApiMessage,
        ProspectiveTeyrchainsMessage,
        ChainApiMessage,
    ], can_receive_priority_messages)]
    statement_distribution: StatementDistribution,

    #[subsystem(AvailabilityDistributionMessage, sends: [
        AvailabilityStoreMessage,
        ChainApiMessage,
        RuntimeApiMessage,
        NetworkBridgeTxMessage,
    ])]
    availability_distribution: AvailabilityDistribution,

    #[subsystem(AvailabilityRecoveryMessage, sends: [
        NetworkBridgeTxMessage,
        RuntimeApiMessage,
        AvailabilityStoreMessage,
    ])]
    availability_recovery: AvailabilityRecovery,

    // Consumes no messages of its own; only sends.
    #[subsystem(blocking, sends: [
        AvailabilityStoreMessage,
        RuntimeApiMessage,
        BitfieldDistributionMessage,
    ])]
    bitfield_signing: BitfieldSigning,

    #[subsystem(blocking, message_capacity: 8192, BitfieldDistributionMessage, sends: [
        RuntimeApiMessage,
        NetworkBridgeTxMessage,
        ProvisionerMessage,
    ], can_receive_priority_messages)]
    bitfield_distribution: BitfieldDistribution,

    #[subsystem(ProvisionerMessage, sends: [
        RuntimeApiMessage,
        CandidateBackingMessage,
        DisputeCoordinatorMessage,
        ProspectiveTeyrchainsMessage,
        ChainApiMessage,
    ])]
    provisioner: Provisioner,

    #[subsystem(blocking, RuntimeApiMessage, sends: [])]
    runtime_api: RuntimeApi,

    #[subsystem(blocking, AvailabilityStoreMessage, sends: [
        ChainApiMessage,
        RuntimeApiMessage,
    ])]
    availability_store: AvailabilityStore,

    #[subsystem(blocking, NetworkBridgeRxMessage, sends: [
        BitfieldDistributionMessage,
        StatementDistributionMessage,
        ApprovalVotingParallelMessage,
        GossipSupportMessage,
        DisputeDistributionMessage,
        CollationGenerationMessage,
        CollatorProtocolMessage,
    ])]
    network_bridge_rx: NetworkBridgeRx,

    #[subsystem(blocking, NetworkBridgeTxMessage, sends: [])]
    network_bridge_tx: NetworkBridgeTx,

    #[subsystem(blocking, ChainApiMessage, sends: [])]
    chain_api: ChainApi,

    #[subsystem(CollationGenerationMessage, sends: [
        RuntimeApiMessage,
        CollatorProtocolMessage,
    ])]
    collation_generation: CollationGeneration,

    #[subsystem(CollatorProtocolMessage, sends: [
        NetworkBridgeTxMessage,
        RuntimeApiMessage,
        CandidateBackingMessage,
        ChainApiMessage,
        ProspectiveTeyrchainsMessage,
    ])]
    collator_protocol: CollatorProtocol,

    #[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
        NetworkBridgeTxMessage,
        ApprovalVotingMessage,
        RuntimeApiMessage,
    ], can_receive_priority_messages)]
    approval_distribution: ApprovalDistribution,

    #[subsystem(blocking, ApprovalVotingMessage, sends: [
        ApprovalDistributionMessage,
        AvailabilityRecoveryMessage,
        CandidateValidationMessage,
        ChainApiMessage,
        ChainSelectionMessage,
        DisputeCoordinatorMessage,
        RuntimeApiMessage,
    ])]
    approval_voting: ApprovalVoting,
    // Note: this subsystem lists its own message type among the ones it sends.
    #[subsystem(blocking, message_capacity: 64000, ApprovalVotingParallelMessage, sends: [
        AvailabilityRecoveryMessage,
        CandidateValidationMessage,
        ChainApiMessage,
        ChainSelectionMessage,
        DisputeCoordinatorMessage,
        RuntimeApiMessage,
        NetworkBridgeTxMessage,
        ApprovalVotingParallelMessage,
    ], can_receive_priority_messages)]
    approval_voting_parallel: ApprovalVotingParallel,
    #[subsystem(GossipSupportMessage, sends: [
        NetworkBridgeTxMessage,
        NetworkBridgeRxMessage, RuntimeApiMessage,
        ChainSelectionMessage,
        ChainApiMessage,
    ], can_receive_priority_messages)]
    gossip_support: GossipSupport,

    #[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
        RuntimeApiMessage,
        ChainApiMessage,
        DisputeDistributionMessage,
        CandidateValidationMessage,
        AvailabilityStoreMessage,
        AvailabilityRecoveryMessage,
        ChainSelectionMessage,
        ApprovalVotingParallelMessage,
    ], can_receive_priority_messages)]
    dispute_coordinator: DisputeCoordinator,

    #[subsystem(DisputeDistributionMessage, sends: [
        RuntimeApiMessage,
        DisputeCoordinatorMessage,
        NetworkBridgeTxMessage,
    ])]
    dispute_distribution: DisputeDistribution,

    #[subsystem(blocking, ChainSelectionMessage, sends: [ChainApiMessage])]
    chain_selection: ChainSelection,

    #[subsystem(ProspectiveTeyrchainsMessage, sends: [
        RuntimeApiMessage,
        ChainApiMessage,
    ])]
    prospective_teyrchains: ProspectiveTeyrchains,

    /// Senders waiting for a block hash to become an active leaf, keyed by that hash.
    pub activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,

    /// The current set of active leaves: block hash to block number.
    pub active_leaves: HashMap<Hash, BlockNumber>,

    /// Decides whether a newly imported head is activated and announced to subsystems.
    pub supports_teyrchains: SupportsTeyrchains,

    /// Overseer-level metrics.
    pub metrics: OverseerMetrics,
}
674
675pub fn spawn_metronome_metrics<S, SupportsTeyrchains>(
677 overseer: &mut Overseer<S, SupportsTeyrchains>,
678 metronome_metrics: OverseerMetrics,
679) -> Result<(), SubsystemError>
680where
681 S: Spawner,
682 SupportsTeyrchains: HeadSupportsTeyrchains,
683{
684 struct ExtractNameAndMeters;
685
686 impl<'a, T: 'a> MapSubsystem<&'a OrchestratedSubsystem<T>> for ExtractNameAndMeters {
687 type Output = Option<(&'static str, SubsystemMeters)>;
688
689 fn map_subsystem(&self, subsystem: &'a OrchestratedSubsystem<T>) -> Self::Output {
690 subsystem
691 .instance
692 .as_ref()
693 .map(|instance| (instance.name, instance.meters.clone()))
694 }
695 }
696 let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
697
698 #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
699 let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> =
700 match memory_stats::MemoryAllocationTracker::new() {
701 Ok(memory_stats) => {
702 Box::new(move |metrics: &OverseerMetrics| match memory_stats.snapshot() {
703 Ok(memory_stats_snapshot) => {
704 gum::trace!(
705 target: LOG_TARGET,
706 "memory_stats: {:?}",
707 &memory_stats_snapshot
708 );
709 metrics.memory_stats_snapshot(memory_stats_snapshot);
710 },
711 Err(e) => {
712 gum::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e)
713 },
714 })
715 },
716 Err(_) => {
717 gum::debug!(
718 target: LOG_TARGET,
719 "Memory allocation tracking is not supported by the allocator.",
720 );
721
722 Box::new(|_| {})
723 },
724 };
725
726 #[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
727 let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> = Box::new(|_| {});
728
729 let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
730 collect_memory_stats(&metronome_metrics);
731
732 metronome_metrics.channel_metrics_snapshot(
736 subsystem_meters
737 .iter()
738 .cloned()
739 .flatten()
740 .map(|(name, ref meters)| (name, meters.read())),
741 );
742
743 futures::future::ready(())
744 });
745 overseer
746 .spawner()
747 .spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));
748
749 Ok(())
750}
751
752impl<S, SupportsTeyrchains> Overseer<S, SupportsTeyrchains>
753where
754 SupportsTeyrchains: HeadSupportsTeyrchains,
755 S: Spawner,
756{
757 async fn stop(mut self) {
759 let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
760 }
761
762 pub async fn run(self) {
766 if let Err(err) = self.run_inner().await {
767 gum::error!(target: LOG_TARGET, ?err, "Overseer exited with error");
768 }
769 }
770
771 async fn run_inner(mut self) -> SubsystemResult<()> {
772 let metrics = self.metrics.clone();
773 spawn_metronome_metrics(&mut self, metrics)?;
774
775 loop {
776 select! {
777 msg = self.events_rx.select_next_some() => {
778 match msg {
779 Event::MsgToSubsystem { msg, origin, priority } => {
780 match priority {
781 PriorityLevel::Normal => {
782 self.route_message(msg.into(), origin).await?;
783 },
784 PriorityLevel::High => {
785 self.route_message_with_priority::<HighPriority>(msg.into(), origin).await?;
786 },
787 }
788 self.metrics.on_message_relayed();
789 }
790 Event::Stop => {
791 self.stop().await;
792 return Ok(());
793 }
794 Event::BlockImported(block) => {
795 self.block_imported(block).await?;
796 }
797 Event::BlockFinalized(block) => {
798 self.block_finalized(block).await?;
799 }
800 Event::ExternalRequest(request) => {
801 self.handle_external_request(request);
802 }
803 }
804 },
805 msg = self.to_orchestra_rx.select_next_some() => {
806 match msg {
807 ToOrchestra::SpawnJob { name, subsystem, s } => {
808 self.spawn_job(name, subsystem, s);
809 }
810 ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
811 self.spawn_blocking_job(name, subsystem, s);
812 }
813 }
814 },
815 res = self.running_subsystems.select_next_some() => {
816 gum::error!(
817 target: LOG_TARGET,
818 subsystem = ?res,
819 "subsystem finished unexpectedly",
820 );
821 self.stop().await;
822 return res;
823 },
824 }
825 }
826 }
827
828 async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
829 match self.active_leaves.entry(block.hash) {
830 hash_map::Entry::Vacant(entry) => entry.insert(block.number),
831 hash_map::Entry::Occupied(entry) => {
832 debug_assert_eq!(*entry.get(), block.number);
833 return Ok(());
834 },
835 };
836
837 let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)).await {
838 Some(_) => ActiveLeavesUpdate::start_work(ActivatedLeaf {
839 hash: block.hash,
840 number: block.number,
841 unpin_handle: block.unpin_handle,
842 }),
843 None => ActiveLeavesUpdate::default(),
844 };
845
846 if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
847 debug_assert_eq!(block.number.saturating_sub(1), number);
848 update.deactivated.push(block.parent_hash);
849 self.on_head_deactivated(&block.parent_hash);
850 }
851
852 self.clean_up_external_listeners();
853
854 if !update.is_empty() {
855 self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
856 }
857 Ok(())
858 }
859
860 async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
861 let mut update = ActiveLeavesUpdate::default();
862
863 self.active_leaves.retain(|h, n| {
864 if *n <= block.number && *h != block.hash {
867 update.deactivated.push(*h);
868 false
869 } else {
870 true
871 }
872 });
873
874 for deactivated in &update.deactivated {
875 self.on_head_deactivated(deactivated)
876 }
877
878 self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number))
879 .await?;
880
881 if !update.is_empty() {
886 self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
887 }
888
889 Ok(())
890 }
891
892 async fn on_head_activated(&mut self, hash: &Hash, _parent_hash: Option<Hash>) -> Option<()> {
895 if !self.supports_teyrchains.head_supports_teyrchains(hash).await {
896 return None;
897 }
898
899 self.metrics.on_head_activated();
900 if let Some(listeners) = self.activation_external_listeners.remove(hash) {
901 gum::trace!(
902 target: LOG_TARGET,
903 relay_parent = ?hash,
904 "Leaf got activated, notifying external listeners"
905 );
906 for listener in listeners {
907 let _ = listener.send(Ok(()));
909 }
910 }
911
912 Some(())
913 }
914
915 fn on_head_deactivated(&mut self, hash: &Hash) {
916 self.metrics.on_head_deactivated();
917 self.activation_external_listeners.remove(hash);
918 }
919
920 fn clean_up_external_listeners(&mut self) {
921 self.activation_external_listeners.retain(|_, v| {
922 v.retain(|c| !c.is_canceled());
924 !v.is_empty()
925 })
926 }
927
928 fn handle_external_request(&mut self, request: ExternalRequest) {
929 match request {
930 ExternalRequest::WaitForActivation { hash, response_channel } => {
931 if self.active_leaves.get(&hash).is_some() {
932 gum::trace!(
933 target: LOG_TARGET,
934 relay_parent = ?hash,
935 "Leaf was already ready - answering `WaitForActivation`"
936 );
937 let _ = response_channel.send(Ok(()));
939 } else {
940 gum::trace!(
941 target: LOG_TARGET,
942 relay_parent = ?hash,
943 "Leaf not yet ready - queuing `WaitForActivation` sender"
944 );
945 self.activation_external_listeners
946 .entry(hash)
947 .or_default()
948 .push(response_channel);
949 }
950 },
951 }
952 }
953
954 fn spawn_job(
955 &mut self,
956 task_name: &'static str,
957 subsystem_name: Option<&'static str>,
958 j: BoxFuture<'static, ()>,
959 ) {
960 self.spawner.spawn(task_name, subsystem_name, j);
961 }
962
963 fn spawn_blocking_job(
964 &mut self,
965 task_name: &'static str,
966 subsystem_name: Option<&'static str>,
967 j: BoxFuture<'static, ()>,
968 ) {
969 self.spawner.spawn_blocking(task_name, subsystem_name, j);
970 }
971}