use super::*;
/// State owned by the receiver side of the app runtime: spawned receiver task
/// handles, optional gossip state (only with the `gossip-bootstrap` feature),
/// and the receiving end of the observed-transaction event channel.
///
/// Dropping this value aborts every handle in `static_receiver_handles` and
/// `gossip_receiver_handles`.
pub(in crate::app::runtime) struct ReceiverRuntime {
/// Handles of the static receiver tasks; aborted when this struct is dropped.
pub(in crate::app::runtime) static_receiver_handles: Vec<JoinHandle<()>>,
/// Handles of the gossip receiver tasks; aborted on drop and whenever
/// `replace_gossip_runtime` installs a new set.
pub(in crate::app::runtime) gossip_receiver_handles: Vec<JoinHandle<()>>,
/// Clone of the installed `GossipRuntime::ingest_telemetry`; set by
/// `replace_gossip_runtime` and cleared by `stop_gossip_runtime`.
#[cfg(feature = "gossip-bootstrap")]
pub(in crate::app::runtime) gossip_ingest_telemetry: Option<ingest::ReceiverTelemetry>,
/// Currently installed gossip runtime, if any.
#[cfg(feature = "gossip-bootstrap")]
pub(in crate::app::runtime) gossip_runtime: Option<GossipRuntime>,
/// Shared keypair; presumably the identity used for gossip (verify against callers).
#[cfg(feature = "gossip-bootstrap")]
pub(in crate::app::runtime) gossip_identity: Arc<Keypair>,
/// Entrypoint recorded by the latest `replace_gossip_runtime` call; cleared by
/// `stop_gossip_runtime`.
#[cfg(feature = "gossip-bootstrap")]
pub(in crate::app::runtime) active_gossip_entrypoint: Option<String>,
/// NOTE(review): presumably the preferred gossip port range; not written by any
/// method in this file besides construction, so confirm against callers.
#[cfg(feature = "gossip-bootstrap")]
pub(in crate::app::runtime) gossip_runtime_primary_port_range: Option<PortRange>,
/// NOTE(review): presumably a fallback gossip port range; not written by any
/// method in this file besides construction, so confirm against callers.
#[cfg(feature = "gossip-bootstrap")]
pub(in crate::app::runtime) gossip_runtime_secondary_port_range: Option<PortRange>,
/// Port range recorded by the latest `replace_gossip_runtime` call; cleared by
/// `stop_gossip_runtime`.
#[cfg(feature = "gossip-bootstrap")]
pub(in crate::app::runtime) gossip_runtime_active_port_range: Option<PortRange>,
/// Repair client supplied to `replace_gossip_runtime`; cleared by
/// `stop_gossip_runtime`.
#[cfg(feature = "gossip-bootstrap")]
pub(in crate::app::runtime) repair_client: Option<crate::repair::GossipRepairClient>,
/// Receiving end of the channel that carries `TxObservedEvent` values.
pub(in crate::app::runtime) tx_event_rx: mpsc::Receiver<TxObservedEvent>,
}
/// Bundle of gossip control-plane state (only with the `gossip-bootstrap` feature).
///
/// Dropping this value sets `exit` to `true` and joins `gossip_service` if it
/// is still present.
#[cfg(feature = "gossip-bootstrap")]
pub(in crate::app::runtime) struct GossipRuntime {
/// Shared flag that is set to `true` when this runtime is dropped.
pub(in crate::app::runtime) exit: Arc<AtomicBool>,
/// Gossip service handle; taken out of the option and joined on drop.
pub(in crate::app::runtime) gossip_service: Option<GossipService>,
/// Shared cluster info handle; not read by any code in this file.
pub(in crate::app::runtime) cluster_info: Arc<ClusterInfo>,
/// Telemetry handle that `ReceiverRuntime::replace_gossip_runtime` clones into
/// `ReceiverRuntime::gossip_ingest_telemetry`.
pub(in crate::app::runtime) ingest_telemetry: ingest::ReceiverTelemetry,
}
#[cfg(feature = "gossip-bootstrap")]
impl Drop for GossipRuntime {
    /// Raises the `exit` flag and then joins the gossip service, if one is still held.
    fn drop(&mut self) {
        self.exit.store(true, Ordering::Relaxed);

        // Nothing to join when the service has already been taken.
        let Some(service) = self.gossip_service.take() else {
            return;
        };

        // Best-effort shutdown: a failed join is intentionally ignored because a
        // destructor has no way to report it.
        let _ = service.join();
    }
}
impl Drop for ReceiverRuntime {
    /// Aborts every receiver task still tracked by this runtime: the static
    /// receivers first, then the gossip receivers.
    fn drop(&mut self) {
        self.static_receiver_handles
            .iter()
            .chain(self.gossip_receiver_handles.iter())
            .for_each(|task| task.abort());
    }
}
impl ReceiverRuntime {
    /// Builds a runtime that tracks no receiver tasks and no gossip state, and
    /// only wraps the supplied `tx_event_rx`.
    ///
    /// With the `gossip-bootstrap` feature enabled, every optional gossip field
    /// starts as `None` and `gossip_identity` is a fresh `Keypair::new()`.
    #[cfg(feature = "kernel-bypass")]
    #[must_use]
    pub(in crate::app::runtime) fn external(tx_event_rx: mpsc::Receiver<TxObservedEvent>) -> Self {
        // The only non-trivial initializer; hoisted so the literal below stays flat.
        #[cfg(feature = "gossip-bootstrap")]
        let gossip_identity = Arc::new(Keypair::new());

        Self {
            tx_event_rx,
            static_receiver_handles: Vec::new(),
            gossip_receiver_handles: Vec::new(),
            #[cfg(feature = "gossip-bootstrap")]
            gossip_identity,
            #[cfg(feature = "gossip-bootstrap")]
            gossip_runtime: None,
            #[cfg(feature = "gossip-bootstrap")]
            gossip_ingest_telemetry: None,
            #[cfg(feature = "gossip-bootstrap")]
            repair_client: None,
            #[cfg(feature = "gossip-bootstrap")]
            active_gossip_entrypoint: None,
            #[cfg(feature = "gossip-bootstrap")]
            gossip_runtime_primary_port_range: None,
            #[cfg(feature = "gossip-bootstrap")]
            gossip_runtime_secondary_port_range: None,
            #[cfg(feature = "gossip-bootstrap")]
            gossip_runtime_active_port_range: None,
        }
    }
}
#[cfg(feature = "gossip-bootstrap")]
impl ReceiverRuntime {
    /// Swaps in a new gossip runtime together with its receiver tasks.
    ///
    /// The previously tracked gossip receiver handles are aborted before being
    /// replaced. The telemetry handle is cloned out of `runtime` before
    /// `runtime` is stored, and the repair client, entrypoint and active port
    /// range are overwritten with the supplied values.
    pub(in crate::app::runtime) fn replace_gossip_runtime(
        &mut self,
        receiver_handles: Vec<JoinHandle<()>>,
        runtime: GossipRuntime,
        repair_client: Option<crate::repair::GossipRepairClient>,
        active_entrypoint: Option<String>,
        active_port_range: Option<PortRange>,
    ) {
        // Abort the outgoing receiver tasks before their handles are replaced.
        self.gossip_receiver_handles
            .iter()
            .for_each(|stale| stale.abort());
        self.gossip_receiver_handles = receiver_handles;

        // Clone the telemetry first: `runtime` is moved into `self` right after.
        let telemetry = runtime.ingest_telemetry.clone();
        self.gossip_ingest_telemetry = Some(telemetry);
        self.gossip_runtime = Some(runtime);

        self.repair_client = repair_client;
        self.active_gossip_entrypoint = active_entrypoint;
        self.gossip_runtime_active_port_range = active_port_range;
    }

    /// Drops the stored gossip runtime (if any) and leaves every other field,
    /// including the gossip receiver handles, untouched.
    pub(in crate::app::runtime) fn detach_gossip_control_plane(&mut self) {
        drop(self.gossip_runtime.take());
    }

    /// Hands the tracked gossip receiver handles and the stored gossip runtime
    /// to `stop_gossip_runtime_components`, awaits it, and then clears the
    /// telemetry, repair client, active entrypoint and active port range.
    pub(in crate::app::runtime) async fn stop_gossip_runtime(&mut self) {
        // Move the handles out, leaving an empty list behind.
        let receiver_handles = std::mem::take(&mut self.gossip_receiver_handles);
        let runtime = self.gossip_runtime.take();
        crate::app::runtime::bootstrap::gossip::stop_gossip_runtime_components(
            receiver_handles,
            runtime,
        )
        .await;

        self.gossip_ingest_telemetry = None;
        self.repair_client = None;
        self.active_gossip_entrypoint = None;
        self.gossip_runtime_active_port_range = None;
    }
}