#![warn(missing_docs)]
use std::{
	collections::{hash_map, HashMap},
	fmt::{self, Debug},
	pin::Pin,
	sync::Arc,
	time::Duration,
};
use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
use polkadot_primitives::{Block, BlockNumber, Hash};
use self::messages::{BitfieldSigningMessage, PvfCheckerMessage};
use polkadot_node_subsystem_types::messages::{
	ApprovalDistributionMessage, ApprovalVotingMessage, AvailabilityDistributionMessage,
	AvailabilityRecoveryMessage, AvailabilityStoreMessage, BitfieldDistributionMessage,
	CandidateBackingMessage, CandidateValidationMessage, ChainApiMessage, ChainSelectionMessage,
	CollationGenerationMessage, CollatorProtocolMessage, DisputeCoordinatorMessage,
	DisputeDistributionMessage, GossipSupportMessage, NetworkBridgeRxMessage,
	NetworkBridgeTxMessage, ProspectiveParachainsMessage, ProvisionerMessage, RuntimeApiMessage,
	StatementDistributionMessage,
};
pub use polkadot_node_subsystem_types::{
	errors::{SubsystemError, SubsystemResult},
	jaeger, ActivatedLeaf, ActiveLeavesUpdate, ChainApiBackend, OverseerSignal,
	RuntimeApiSubsystemClient, UnpinHandle,
};
pub mod metrics;
pub use self::metrics::Metrics as OverseerMetrics;
pub mod dummy;
pub use self::dummy::DummySubsystem;
pub use polkadot_node_metrics::{
	metrics::{prometheus, Metrics as MetricsTrait},
	Metronome,
};
pub use orchestra as gen;
pub use orchestra::{
	contextbounds, orchestra, subsystem, FromOrchestra, MapSubsystem, MessagePacket,
	OrchestraError as OverseerError, SignalsReceived, Spawner, Subsystem, SubsystemContext,
	SubsystemIncomingMessages, SubsystemInstance, SubsystemMeterReadouts, SubsystemMeters,
	SubsystemSender, TimeoutExt, ToOrchestra, TrySendError,
};
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
mod memory_stats;
#[cfg(test)]
mod tests;
use sp_core::traits::SpawnNamed;
pub struct SpawnGlue<S>(pub S);
impl<S> AsRef<S> for SpawnGlue<S> {
	fn as_ref(&self) -> &S {
		&self.0
	}
}
impl<S: Clone> Clone for SpawnGlue<S> {
	fn clone(&self) -> Self {
		Self(self.0.clone())
	}
}
impl<S: SpawnNamed + Clone + Send + Sync> crate::gen::Spawner for SpawnGlue<S> {
	fn spawn_blocking(
		&self,
		name: &'static str,
		group: Option<&'static str>,
		future: futures::future::BoxFuture<'static, ()>,
	) {
		SpawnNamed::spawn_blocking(&self.0, name, group, future)
	}
	fn spawn(
		&self,
		name: &'static str,
		group: Option<&'static str>,
		future: futures::future::BoxFuture<'static, ()>,
	) {
		SpawnNamed::spawn(&self.0, name, group, future)
	}
}
#[async_trait::async_trait]
pub trait HeadSupportsParachains {
	async fn head_supports_parachains(&self, head: &Hash) -> bool;
}
#[async_trait::async_trait]
impl<Client> HeadSupportsParachains for Arc<Client>
where
	Client: RuntimeApiSubsystemClient + Sync + Send,
{
	async fn head_supports_parachains(&self, head: &Hash) -> bool {
		self.api_version_parachain_host(*head).await.ok().flatten().unwrap_or(0) >= 1
	}
}
#[derive(Clone)]
pub struct Handle(OverseerHandle);
impl Handle {
	pub fn new(raw: OverseerHandle) -> Self {
		Self(raw)
	}
	pub async fn block_imported(&mut self, block: BlockInfo) {
		self.send_and_log_error(Event::BlockImported(block)).await
	}
	pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
		self.send_and_log_error(Event::MsgToSubsystem { msg: msg.into(), origin }).await
	}
	#[inline(always)]
	pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
		self.send_msg(msg, "").await
	}
	pub async fn block_finalized(&mut self, block: BlockInfo) {
		self.send_and_log_error(Event::BlockFinalized(block)).await
	}
	pub async fn wait_for_activation(
		&mut self,
		hash: Hash,
		response_channel: oneshot::Sender<SubsystemResult<()>>,
	) {
		self.send_and_log_error(Event::ExternalRequest(ExternalRequest::WaitForActivation {
			hash,
			response_channel,
		}))
		.await;
	}
	pub async fn stop(&mut self) {
		self.send_and_log_error(Event::Stop).await;
	}
	async fn send_and_log_error(&mut self, event: Event) {
		if self.0.send(event).await.is_err() {
			gum::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
		}
	}
}
#[derive(Debug, Clone)]
pub struct BlockInfo {
	pub hash: Hash,
	pub parent_hash: Hash,
	pub number: BlockNumber,
	pub unpin_handle: UnpinHandle,
}
impl From<BlockImportNotification<Block>> for BlockInfo {
	fn from(n: BlockImportNotification<Block>) -> Self {
		let hash = n.hash;
		let parent_hash = n.header.parent_hash;
		let number = n.header.number;
		let unpin_handle = n.into_unpin_handle();
		BlockInfo { hash, parent_hash, number, unpin_handle }
	}
}
impl From<FinalityNotification<Block>> for BlockInfo {
	fn from(n: FinalityNotification<Block>) -> Self {
		let hash = n.hash;
		let parent_hash = n.header.parent_hash;
		let number = n.header.number;
		let unpin_handle = n.into_unpin_handle();
		BlockInfo { hash, parent_hash, number, unpin_handle }
	}
}
pub enum Event {
	BlockImported(BlockInfo),
	BlockFinalized(BlockInfo),
	MsgToSubsystem {
		msg: AllMessages,
		origin: &'static str,
	},
	ExternalRequest(ExternalRequest),
	Stop,
}
pub enum ExternalRequest {
	WaitForActivation {
		hash: Hash,
		response_channel: oneshot::Sender<SubsystemResult<()>>,
	},
}
pub async fn forward_events<P: BlockchainEvents<Block>>(client: Arc<P>, mut handle: Handle) {
	let mut finality = client.finality_notification_stream();
	let mut imports = client.import_notification_stream();
	loop {
		select! {
			f = finality.next() => {
				match f {
					Some(block) => {
						handle.block_finalized(block.into()).await;
					}
					None => break,
				}
			},
			i = imports.next() => {
				match i {
					Some(block) => {
						handle.block_imported(block.into()).await;
					}
					None => break,
				}
			},
			complete => break,
		}
	}
}
#[orchestra(
	gen=AllMessages,
	event=Event,
	signal=OverseerSignal,
	error=SubsystemError,
	message_capacity=2048,
)]
pub struct Overseer<SupportsParachains> {
	#[subsystem(CandidateValidationMessage, sends: [
		RuntimeApiMessage,
	])]
	candidate_validation: CandidateValidation,
	#[subsystem(sends: [
		CandidateValidationMessage,
		RuntimeApiMessage,
	])]
	pvf_checker: PvfChecker,
	#[subsystem(CandidateBackingMessage, sends: [
		CandidateValidationMessage,
		CollatorProtocolMessage,
		ChainApiMessage,
		AvailabilityDistributionMessage,
		AvailabilityStoreMessage,
		StatementDistributionMessage,
		ProvisionerMessage,
		RuntimeApiMessage,
		ProspectiveParachainsMessage,
	])]
	candidate_backing: CandidateBacking,
	#[subsystem(StatementDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		CandidateBackingMessage,
		RuntimeApiMessage,
		ProspectiveParachainsMessage,
		ChainApiMessage,
	])]
	statement_distribution: StatementDistribution,
	#[subsystem(AvailabilityDistributionMessage, sends: [
		AvailabilityStoreMessage,
		ChainApiMessage,
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
	])]
	availability_distribution: AvailabilityDistribution,
	#[subsystem(AvailabilityRecoveryMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		AvailabilityStoreMessage,
	])]
	availability_recovery: AvailabilityRecovery,
	#[subsystem(blocking, sends: [
		AvailabilityStoreMessage,
		RuntimeApiMessage,
		BitfieldDistributionMessage,
	])]
	bitfield_signing: BitfieldSigning,
	#[subsystem(BitfieldDistributionMessage, sends: [
		RuntimeApiMessage,
		NetworkBridgeTxMessage,
		ProvisionerMessage,
	])]
	bitfield_distribution: BitfieldDistribution,
	#[subsystem(ProvisionerMessage, sends: [
		RuntimeApiMessage,
		CandidateBackingMessage,
		ChainApiMessage,
		DisputeCoordinatorMessage,
		ProspectiveParachainsMessage,
	])]
	provisioner: Provisioner,
	#[subsystem(blocking, RuntimeApiMessage, sends: [])]
	runtime_api: RuntimeApi,
	#[subsystem(blocking, AvailabilityStoreMessage, sends: [
		ChainApiMessage,
		RuntimeApiMessage,
	])]
	availability_store: AvailabilityStore,
	#[subsystem(blocking, NetworkBridgeRxMessage, sends: [
		BitfieldDistributionMessage,
		StatementDistributionMessage,
		ApprovalDistributionMessage,
		GossipSupportMessage,
		DisputeDistributionMessage,
		CollationGenerationMessage,
		CollatorProtocolMessage,
	])]
	network_bridge_rx: NetworkBridgeRx,
	#[subsystem(blocking, NetworkBridgeTxMessage, sends: [])]
	network_bridge_tx: NetworkBridgeTx,
	#[subsystem(blocking, ChainApiMessage, sends: [])]
	chain_api: ChainApi,
	#[subsystem(CollationGenerationMessage, sends: [
		RuntimeApiMessage,
		CollatorProtocolMessage,
	])]
	collation_generation: CollationGeneration,
	#[subsystem(CollatorProtocolMessage, sends: [
		NetworkBridgeTxMessage,
		RuntimeApiMessage,
		CandidateBackingMessage,
		ChainApiMessage,
		ProspectiveParachainsMessage,
	])]
	collator_protocol: CollatorProtocol,
	#[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
		NetworkBridgeTxMessage,
		ApprovalVotingMessage,
	])]
	approval_distribution: ApprovalDistribution,
	#[subsystem(blocking, ApprovalVotingMessage, sends: [
		ApprovalDistributionMessage,
		AvailabilityRecoveryMessage,
		CandidateValidationMessage,
		ChainApiMessage,
		ChainSelectionMessage,
		DisputeCoordinatorMessage,
		RuntimeApiMessage,
	])]
	approval_voting: ApprovalVoting,
	#[subsystem(GossipSupportMessage, sends: [
		NetworkBridgeTxMessage,
		NetworkBridgeRxMessage, RuntimeApiMessage,
		ChainSelectionMessage,
	])]
	gossip_support: GossipSupport,
	#[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
		DisputeDistributionMessage,
		CandidateValidationMessage,
		ApprovalVotingMessage,
		AvailabilityStoreMessage,
		AvailabilityRecoveryMessage,
		ChainSelectionMessage,
	])]
	dispute_coordinator: DisputeCoordinator,
	#[subsystem(DisputeDistributionMessage, sends: [
		RuntimeApiMessage,
		DisputeCoordinatorMessage,
		NetworkBridgeTxMessage,
	])]
	dispute_distribution: DisputeDistribution,
	#[subsystem(blocking, ChainSelectionMessage, sends: [ChainApiMessage])]
	chain_selection: ChainSelection,
	#[subsystem(ProspectiveParachainsMessage, sends: [
		RuntimeApiMessage,
		ChainApiMessage,
	])]
	prospective_parachains: ProspectiveParachains,
	pub activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<SubsystemResult<()>>>>,
	pub span_per_active_leaf: HashMap<Hash, Arc<jaeger::Span>>,
	pub active_leaves: HashMap<Hash, BlockNumber>,
	pub supports_parachains: SupportsParachains,
	pub metrics: OverseerMetrics,
}
pub fn spawn_metronome_metrics<S, SupportsParachains>(
	overseer: &mut Overseer<S, SupportsParachains>,
	metronome_metrics: OverseerMetrics,
) -> Result<(), SubsystemError>
where
	S: Spawner,
	SupportsParachains: HeadSupportsParachains,
{
	struct ExtractNameAndMeters;
	impl<'a, T: 'a> MapSubsystem<&'a OrchestratedSubsystem<T>> for ExtractNameAndMeters {
		type Output = Option<(&'static str, SubsystemMeters)>;
		fn map_subsystem(&self, subsystem: &'a OrchestratedSubsystem<T>) -> Self::Output {
			subsystem
				.instance
				.as_ref()
				.map(|instance| (instance.name, instance.meters.clone()))
		}
	}
	let subsystem_meters = overseer.map_subsystems(ExtractNameAndMeters);
	#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
	let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> =
		match memory_stats::MemoryAllocationTracker::new() {
			Ok(memory_stats) =>
				Box::new(move |metrics: &OverseerMetrics| match memory_stats.snapshot() {
					Ok(memory_stats_snapshot) => {
						gum::trace!(
							target: LOG_TARGET,
							"memory_stats: {:?}",
							&memory_stats_snapshot
						);
						metrics.memory_stats_snapshot(memory_stats_snapshot);
					},
					Err(e) =>
						gum::debug!(target: LOG_TARGET, "Failed to obtain memory stats: {:?}", e),
				}),
			Err(_) => {
				gum::debug!(
					target: LOG_TARGET,
					"Memory allocation tracking is not supported by the allocator.",
				);
				Box::new(|_| {})
			},
		};
	#[cfg(not(any(target_os = "linux", feature = "jemalloc-allocator")))]
	let collect_memory_stats: Box<dyn Fn(&OverseerMetrics) + Send> = Box::new(|_| {});
	let metronome = Metronome::new(std::time::Duration::from_millis(950)).for_each(move |_| {
		collect_memory_stats(&metronome_metrics);
		metronome_metrics.channel_metrics_snapshot(
			subsystem_meters
				.iter()
				.cloned()
				.flatten()
				.map(|(name, ref meters)| (name, meters.read())),
		);
		futures::future::ready(())
	});
	overseer
		.spawner()
		.spawn("metrics-metronome", Some("overseer"), Box::pin(metronome));
	Ok(())
}
impl<S, SupportsParachains> Overseer<S, SupportsParachains>
where
	SupportsParachains: HeadSupportsParachains,
	S: Spawner,
{
	async fn stop(mut self) {
		let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
	}
	pub async fn run(self) {
		if let Err(err) = self.run_inner().await {
			gum::error!(target: LOG_TARGET, ?err, "Overseer exited with error");
		}
	}
	async fn run_inner(mut self) -> SubsystemResult<()> {
		let metrics = self.metrics.clone();
		spawn_metronome_metrics(&mut self, metrics)?;
		loop {
			select! {
				msg = self.events_rx.select_next_some() => {
					match msg {
						Event::MsgToSubsystem { msg, origin } => {
							self.route_message(msg.into(), origin).await?;
							self.metrics.on_message_relayed();
						}
						Event::Stop => {
							self.stop().await;
							return Ok(());
						}
						Event::BlockImported(block) => {
							self.block_imported(block).await?;
						}
						Event::BlockFinalized(block) => {
							self.block_finalized(block).await?;
						}
						Event::ExternalRequest(request) => {
							self.handle_external_request(request);
						}
					}
				},
				msg = self.to_orchestra_rx.select_next_some() => {
					match msg {
						ToOrchestra::SpawnJob { name, subsystem, s } => {
							self.spawn_job(name, subsystem, s);
						}
						ToOrchestra::SpawnBlockingJob { name, subsystem, s } => {
							self.spawn_blocking_job(name, subsystem, s);
						}
					}
				},
				res = self.running_subsystems.select_next_some() => {
					gum::error!(
						target: LOG_TARGET,
						subsystem = ?res,
						"subsystem finished unexpectedly",
					);
					self.stop().await;
					return res;
				},
			}
		}
	}
	async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
		match self.active_leaves.entry(block.hash) {
			hash_map::Entry::Vacant(entry) => entry.insert(block.number),
			hash_map::Entry::Occupied(entry) => {
				debug_assert_eq!(*entry.get(), block.number);
				return Ok(())
			},
		};
		let mut update = match self.on_head_activated(&block.hash, Some(block.parent_hash)).await {
			Some(span) => ActiveLeavesUpdate::start_work(ActivatedLeaf {
				hash: block.hash,
				number: block.number,
				unpin_handle: block.unpin_handle,
				span,
			}),
			None => ActiveLeavesUpdate::default(),
		};
		if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
			debug_assert_eq!(block.number.saturating_sub(1), number);
			update.deactivated.push(block.parent_hash);
			self.on_head_deactivated(&block.parent_hash);
		}
		self.clean_up_external_listeners();
		if !update.is_empty() {
			self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
		}
		Ok(())
	}
	async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
		let mut update = ActiveLeavesUpdate::default();
		self.active_leaves.retain(|h, n| {
			if *n <= block.number && *h != block.hash {
				update.deactivated.push(*h);
				false
			} else {
				true
			}
		});
		for deactivated in &update.deactivated {
			self.on_head_deactivated(deactivated)
		}
		self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number))
			.await?;
		if !update.is_empty() {
			self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
		}
		Ok(())
	}
	async fn on_head_activated(
		&mut self,
		hash: &Hash,
		parent_hash: Option<Hash>,
	) -> Option<Arc<jaeger::Span>> {
		if !self.supports_parachains.head_supports_parachains(hash).await {
			return None
		}
		self.metrics.on_head_activated();
		if let Some(listeners) = self.activation_external_listeners.remove(hash) {
			gum::trace!(
				target: LOG_TARGET,
				relay_parent = ?hash,
				"Leaf got activated, notifying exterinal listeners"
			);
			for listener in listeners {
				let _ = listener.send(Ok(()));
			}
		}
		let mut span = jaeger::Span::new(*hash, "leaf-activated");
		if let Some(parent_span) = parent_hash.and_then(|h| self.span_per_active_leaf.get(&h)) {
			span.add_follows_from(parent_span);
		}
		let span = Arc::new(span);
		self.span_per_active_leaf.insert(*hash, span.clone());
		Some(span)
	}
	fn on_head_deactivated(&mut self, hash: &Hash) {
		self.metrics.on_head_deactivated();
		self.activation_external_listeners.remove(hash);
		self.span_per_active_leaf.remove(hash);
	}
	fn clean_up_external_listeners(&mut self) {
		self.activation_external_listeners.retain(|_, v| {
			v.retain(|c| !c.is_canceled());
			!v.is_empty()
		})
	}
	fn handle_external_request(&mut self, request: ExternalRequest) {
		match request {
			ExternalRequest::WaitForActivation { hash, response_channel } => {
				if self.active_leaves.get(&hash).is_some() {
					gum::trace!(
						target: LOG_TARGET,
						relay_parent = ?hash,
						"Leaf was already ready - answering `WaitForActivation`"
					);
					let _ = response_channel.send(Ok(()));
				} else {
					gum::trace!(
						target: LOG_TARGET,
						relay_parent = ?hash,
						"Leaf not yet ready - queuing `WaitForActivation` sender"
					);
					self.activation_external_listeners
						.entry(hash)
						.or_default()
						.push(response_channel);
				}
			},
		}
	}
	fn spawn_job(
		&mut self,
		task_name: &'static str,
		subsystem_name: Option<&'static str>,
		j: BoxFuture<'static, ()>,
	) {
		self.spawner.spawn(task_name, subsystem_name, j);
	}
	fn spawn_blocking_job(
		&mut self,
		task_name: &'static str,
		subsystem_name: Option<&'static str>,
		j: BoxFuture<'static, ()>,
	) {
		self.spawner.spawn_blocking(task_name, subsystem_name, j);
	}
}