// sof 0.17.1
//
// Solana Observer Framework for low-latency shred ingestion and plugin-driven transaction observation.
// Documentation
use super::*;

/// Owns the receiver task handles, the optional gossip state, and the receiving end of
/// the transaction-event channel for one runtime instance.
///
/// Dropping a `ReceiverRuntime` aborts every handle in `static_receiver_handles` and
/// `gossip_receiver_handles` (see the `Drop` impl below).
pub(in crate::app::runtime) struct ReceiverRuntime {
    /// Receiver task handles that are only aborted on drop; nothing in this file
    /// replaces them after construction.
    pub(in crate::app::runtime) static_receiver_handles: Vec<JoinHandle<()>>,
    /// Receiver task handles tied to the active gossip runtime. Aborted on drop and by
    /// `replace_gossip_runtime`; moved out by `stop_gossip_runtime`.
    pub(in crate::app::runtime) gossip_receiver_handles: Vec<JoinHandle<()>>,
    /// Clone of the installed `GossipRuntime::ingest_telemetry`. Set by
    /// `replace_gossip_runtime`, cleared by `stop_gossip_runtime`, and left untouched by
    /// `detach_gossip_control_plane`.
    #[cfg(feature = "gossip-bootstrap")]
    pub(in crate::app::runtime) gossip_ingest_telemetry: Option<ingest::ReceiverTelemetry>,
    /// Currently installed gossip runtime, if any. Dropping the contained value raises
    /// its exit flag and joins its gossip service.
    #[cfg(feature = "gossip-bootstrap")]
    pub(in crate::app::runtime) gossip_runtime: Option<GossipRuntime>,
    /// Shared keypair; presumably the identity this node presents on gossip.
    /// NOTE(review): it is only constructed, never read, in this file — confirm usage.
    #[cfg(feature = "gossip-bootstrap")]
    pub(in crate::app::runtime) gossip_identity: Arc<Keypair>,
    /// Entrypoint recorded for the installed gossip runtime. Set by
    /// `replace_gossip_runtime` and cleared by `stop_gossip_runtime`.
    #[cfg(feature = "gossip-bootstrap")]
    pub(in crate::app::runtime) active_gossip_entrypoint: Option<String>,
    /// NOTE(review): presumably the preferred port range for the gossip runtime; no
    /// method in this file modifies it after construction — confirm against callers.
    #[cfg(feature = "gossip-bootstrap")]
    pub(in crate::app::runtime) gossip_runtime_primary_port_range: Option<PortRange>,
    /// NOTE(review): presumably a fallback port range; no method in this file modifies
    /// it after construction — confirm against callers.
    #[cfg(feature = "gossip-bootstrap")]
    pub(in crate::app::runtime) gossip_runtime_secondary_port_range: Option<PortRange>,
    /// Port range recorded for the installed gossip runtime. Set by
    /// `replace_gossip_runtime` and cleared by `stop_gossip_runtime`.
    #[cfg(feature = "gossip-bootstrap")]
    pub(in crate::app::runtime) gossip_runtime_active_port_range: Option<PortRange>,
    /// Repair client paired with the installed gossip runtime. Set by
    /// `replace_gossip_runtime` and cleared by `stop_gossip_runtime`.
    #[cfg(feature = "gossip-bootstrap")]
    pub(in crate::app::runtime) repair_client: Option<crate::repair::GossipRepairClient>,
    /// Receiving end of the channel carrying `TxObservedEvent` values.
    pub(in crate::app::runtime) tx_event_rx: mpsc::Receiver<TxObservedEvent>,
}

/// Bundle of gossip state whose `Drop` impl raises `exit` and joins `gossip_service`.
#[cfg(feature = "gossip-bootstrap")]
pub(in crate::app::runtime) struct GossipRuntime {
    /// Shared flag that is set to `true` when this runtime is dropped.
    pub(in crate::app::runtime) exit: Arc<AtomicBool>,
    /// Gossip service handle; held in an `Option` so `drop` can move it out and join it.
    pub(in crate::app::runtime) gossip_service: Option<GossipService>,
    /// Shared cluster info handle. NOTE(review): not read in this file — confirm usage.
    pub(in crate::app::runtime) cluster_info: Arc<ClusterInfo>,
    /// Telemetry handle that `ReceiverRuntime::replace_gossip_runtime` clones into
    /// `ReceiverRuntime::gossip_ingest_telemetry`.
    pub(in crate::app::runtime) ingest_telemetry: ingest::ReceiverTelemetry,
}

#[cfg(feature = "gossip-bootstrap")]
impl Drop for GossipRuntime {
    /// Raises the exit flag, then joins the gossip service if one is still held.
    fn drop(&mut self) {
        // The flag is raised before joining; presumably the service polls it to stop
        // (NOTE(review): confirm against the gossip service implementation).
        self.exit.store(true, Ordering::Relaxed);

        // Nothing to join when the service has already been moved out.
        let Some(service) = self.gossip_service.take() else {
            return;
        };

        if service.join().is_err() {
            // Gossip service already terminated; a join error is deliberately ignored
            // because nothing can be recovered from inside `drop`.
        }
    }
}

impl Drop for ReceiverRuntime {
    /// Aborts every receiver task still tracked by this runtime, static handles first
    /// and gossip handles second.
    fn drop(&mut self) {
        for task in self.static_receiver_handles.iter().chain(&self.gossip_receiver_handles) {
            task.abort();
        }
    }
}

impl ReceiverRuntime {
    /// Creates a receiver runtime that spawns no receivers of its own and instead
    /// relies on external ingress feed(s).
    ///
    /// Both handle lists start empty. With the `gossip-bootstrap` feature enabled,
    /// every optional gossip field starts as `None` and `gossip_identity` holds a newly
    /// constructed keypair. `tx_event_rx` is stored unchanged.
    #[cfg(feature = "kernel-bypass")]
    #[must_use]
    pub(in crate::app::runtime) fn external(tx_event_rx: mpsc::Receiver<TxObservedEvent>) -> Self {
        // The only initializer that is not a plain constant; built up front so the
        // struct literal below stays purely declarative.
        #[cfg(feature = "gossip-bootstrap")]
        let gossip_identity = Arc::new(Keypair::new());

        Self {
            tx_event_rx,
            static_receiver_handles: vec![],
            gossip_receiver_handles: vec![],
            #[cfg(feature = "gossip-bootstrap")]
            gossip_identity,
            #[cfg(feature = "gossip-bootstrap")]
            gossip_runtime: None,
            #[cfg(feature = "gossip-bootstrap")]
            gossip_ingest_telemetry: None,
            #[cfg(feature = "gossip-bootstrap")]
            repair_client: None,
            #[cfg(feature = "gossip-bootstrap")]
            active_gossip_entrypoint: None,
            #[cfg(feature = "gossip-bootstrap")]
            gossip_runtime_primary_port_range: None,
            #[cfg(feature = "gossip-bootstrap")]
            gossip_runtime_secondary_port_range: None,
            #[cfg(feature = "gossip-bootstrap")]
            gossip_runtime_active_port_range: None,
        }
    }
}

#[cfg(feature = "gossip-bootstrap")]
impl ReceiverRuntime {
    /// Installs a new gossip runtime in place of whatever was active before.
    ///
    /// The gossip receiver tasks currently tracked are aborted, after which the
    /// handles, telemetry, runtime, repair client, entrypoint, and active port range
    /// are overwritten with the supplied values. Overwriting `gossip_runtime` drops any
    /// previous [`GossipRuntime`], which raises its exit flag and joins its service.
    pub(in crate::app::runtime) fn replace_gossip_runtime(
        &mut self,
        receiver_handles: Vec<JoinHandle<()>>,
        runtime: GossipRuntime,
        repair_client: Option<crate::repair::GossipRepairClient>,
        active_entrypoint: Option<String>,
        active_port_range: Option<PortRange>,
    ) {
        self.gossip_receiver_handles.iter().for_each(|stale| stale.abort());
        self.gossip_receiver_handles = receiver_handles;

        // Clone the telemetry handle before `runtime` is moved into `self`.
        let telemetry = runtime.ingest_telemetry.clone();
        self.gossip_ingest_telemetry = Some(telemetry);
        self.gossip_runtime = Some(runtime);

        self.repair_client = repair_client;
        self.active_gossip_entrypoint = active_entrypoint;
        self.gossip_runtime_active_port_range = active_port_range;
    }

    /// Drops the installed [`GossipRuntime`], if any, and leaves every other field
    /// (receiver handles, telemetry, repair client, entrypoint, port ranges) untouched.
    pub(in crate::app::runtime) fn detach_gossip_control_plane(&mut self) {
        drop(self.gossip_runtime.take());
    }

    /// Hands the gossip receiver handles and the installed runtime to
    /// `stop_gossip_runtime_components`, awaits it, then clears the remaining
    /// per-runtime gossip fields.
    ///
    /// The handles and runtime are moved out of `self` before the await point, so
    /// `self` no longer tracks them even if the returned future is dropped early; the
    /// trailing fields are only cleared once the await completes.
    pub(in crate::app::runtime) async fn stop_gossip_runtime(&mut self) {
        let receiver_tasks = std::mem::take(&mut self.gossip_receiver_handles);
        let runtime = self.gossip_runtime.take();
        crate::app::runtime::bootstrap::gossip::stop_gossip_runtime_components(
            receiver_tasks,
            runtime,
        )
        .await;

        self.gossip_ingest_telemetry = None;
        self.repair_client = None;
        self.active_gossip_entrypoint = None;
        self.gossip_runtime_active_port_range = None;
    }
}