epics-bridge-rs 0.18.4

EPICS protocol bridges: Record↔PVA (QSRV), CA gateway, pvalink, PVA gateway
Documentation
//! G-G1: multi-tenant PVA gateway.
//!
//! pva2pva-style "N upstream × M downstream" topology in a single
//! process. Each downstream `PvaServer` selects a subset of the
//! configured upstream `PvaClient`s and proxies to them in priority
//! order.
//!
//! ## When to use this
//!
//! Use [`MultiTenantPvaGateway`] when one process needs to bridge
//! multiple isolated PV namespaces — e.g. a site-wide gateway that
//! exposes both the experimental floor and the controls subnet to a
//! shared visitor network, while keeping their `EPICS_PVA_ADDR_LIST`s
//! separate. Use [`super::PvaGateway`] for the typical
//! one-upstream-cluster-behind-one-server case (which is most
//! deployments).
//!
//! ## Topology
//!
//! ```text
//!   ┌─ upstream A ─┐    ┌─ ChannelCache A ─┐
//!   │ PvaClient    │ ─▶ │ (its own client) │ ─┐
//!   └──────────────┘    └──────────────────┘  │
//!                                              ├──▶ ┌─ downstream X ─┐
//!   ┌─ upstream B ─┐    ┌─ ChannelCache B ─┐  │     │ PvaServer       │
//!   │ PvaClient    │ ─▶ │ (its own client) │ ─┤     │ CompositeSource │
//!   └──────────────┘    └──────────────────┘  │     └─────────────────┘
//!//!                                              └──▶ ┌─ downstream Y ─┐
//!                                                   │ subset of      │
//!                                                   │ upstreams      │
//!                                                   └────────────────┘
//! ```
//!
//! ## Example
//!
//! ```no_run
//! use std::sync::Arc;
//! use std::time::Duration;
//! use epics_bridge_rs::pva_gateway::multi_gateway::MultiTenantPvaGatewayBuilder;
//! use epics_pva_rs::client::PvaClient;
//! use epics_pva_rs::server_native::PvaServerConfig;
//!
//! # fn run() -> epics_bridge_rs::pva_gateway::error::GwResult<()> {
//! let upstream_a = Arc::new(PvaClient::builder().build());
//! let upstream_b = Arc::new(PvaClient::builder().build());
//!
//! let gw = MultiTenantPvaGatewayBuilder::new()
//!     .add_upstream("A", upstream_a)
//!     .add_upstream("B", upstream_b)
//!     // Visitor network sees only namespace A
//!     .add_downstream(
//!         "visitor",
//!         PvaServerConfig::default(),
//!         &["A"],
//!         Some("gw:A".to_string()),
//!     )
//!     // Operator subnet sees both A and B (B preferred via order)
//!     .add_downstream(
//!         "ops",
//!         PvaServerConfig::default(),
//!         &["B", "A"],
//!         Some("gw:ops".to_string()),
//!     )
//!     .start()?;
//! # Ok(()) }
//! ```

use std::sync::Arc;
use std::time::Duration;

use epics_pva_rs::client::PvaClient;
use epics_pva_rs::server_native::{CompositeSource, PvaServer, PvaServerConfig};

use super::channel_cache::{ChannelCache, DEFAULT_CLEANUP_INTERVAL};
use super::control::ControlSource;
use super::error::{GwError, GwResult};
use super::source::GatewayChannelSource;

/// One upstream tenant — a `PvaClient` (with its own connection pool
/// and EPICS_PVA_ADDR_LIST scope) labelled for routing.
struct UpstreamTenant {
    label: String,
    client: Arc<PvaClient>,
}

/// One downstream tenant — its `PvaServer` config, the labelled list
/// of upstream tenants it proxies, and an optional control_prefix.
struct DownstreamSpec {
    label: String,
    config: PvaServerConfig,
    upstream_labels: Vec<String>,
    control_prefix: Option<String>,
}

/// Builder for [`MultiTenantPvaGateway`]. Add upstreams first, then
/// downstreams that reference them by label.
pub struct MultiTenantPvaGatewayBuilder {
    upstreams: Vec<UpstreamTenant>,
    downstreams: Vec<DownstreamSpec>,
    cleanup_interval: Duration,
    connect_timeout: Duration,
    max_cache_entries: usize,
    max_subscribers: usize,
}

impl Default for MultiTenantPvaGatewayBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl MultiTenantPvaGatewayBuilder {
    pub fn new() -> Self {
        Self {
            upstreams: Vec::new(),
            downstreams: Vec::new(),
            cleanup_interval: DEFAULT_CLEANUP_INTERVAL,
            connect_timeout: Duration::from_secs(5),
            max_cache_entries: super::channel_cache::DEFAULT_MAX_ENTRIES,
            max_subscribers: 100_000,
        }
    }

    /// Register an upstream tenant. `label` must be unique across
    /// upstreams; downstreams reference it via [`Self::add_downstream`].
    pub fn add_upstream(mut self, label: impl Into<String>, client: Arc<PvaClient>) -> Self {
        self.upstreams.push(UpstreamTenant {
            label: label.into(),
            client,
        });
        self
    }

    /// Register a downstream tenant. `upstream_labels` lists the
    /// upstreams to proxy in priority order — earlier labels are tried
    /// first when a downstream search arrives. `control_prefix`, when
    /// `Some`, exposes diagnostic PVs scoped to *this* downstream only
    /// (each downstream's stats are independent).
    pub fn add_downstream(
        mut self,
        label: impl Into<String>,
        config: PvaServerConfig,
        upstream_labels: &[&str],
        control_prefix: Option<String>,
    ) -> Self {
        self.downstreams.push(DownstreamSpec {
            label: label.into(),
            config,
            upstream_labels: upstream_labels.iter().map(|s| (*s).to_string()).collect(),
            control_prefix,
        });
        self
    }

    pub fn cleanup_interval(mut self, d: Duration) -> Self {
        self.cleanup_interval = d;
        self
    }

    pub fn connect_timeout(mut self, d: Duration) -> Self {
        self.connect_timeout = d;
        self
    }

    pub fn max_cache_entries(mut self, n: usize) -> Self {
        self.max_cache_entries = n;
        self
    }

    pub fn max_subscribers(mut self, n: usize) -> Self {
        self.max_subscribers = n;
        self
    }

    /// Validate + start every downstream. The returned
    /// [`MultiTenantPvaGateway`] owns one [`PvaServer`] per downstream
    /// spec and one [`ChannelCache`] per upstream tenant.
    pub fn start(self) -> GwResult<MultiTenantPvaGateway> {
        if self.upstreams.is_empty() {
            return Err(GwError::Other(
                "MultiTenantPvaGatewayBuilder: at least one upstream required".into(),
            ));
        }
        if self.downstreams.is_empty() {
            return Err(GwError::Other(
                "MultiTenantPvaGatewayBuilder: at least one downstream required \
                 (a gateway with no listeners would resolve no clients)"
                    .into(),
            ));
        }
        // Detect duplicate upstream labels — a server's label list is
        // matched against this set, so duplicates would silently
        // route to whichever entry came first.
        for (i, a) in self.upstreams.iter().enumerate() {
            for b in &self.upstreams[i + 1..] {
                if a.label == b.label {
                    return Err(GwError::Other(format!(
                        "duplicate upstream label '{}'",
                        a.label
                    )));
                }
            }
        }
        // Same check for downstreams. `downstream(label)` accessor
        // returns the FIRST match, so duplicate labels would silently
        // shadow the second one — better to refuse at build time.
        for (i, a) in self.downstreams.iter().enumerate() {
            for b in &self.downstreams[i + 1..] {
                if a.label == b.label {
                    return Err(GwError::Other(format!(
                        "duplicate downstream label '{}'",
                        a.label
                    )));
                }
            }
            if a.upstream_labels.is_empty() {
                return Err(GwError::Other(format!(
                    "downstream '{}' must reference at least one upstream",
                    a.label
                )));
            }
        }
        // Build a cache per upstream. Sized identically — the per-PV
        // entry is per-client so the budgets don't share.
        let mut caches: Vec<(String, Arc<ChannelCache>)> = Vec::with_capacity(self.upstreams.len());
        for u in &self.upstreams {
            let c = ChannelCache::with_max_entries(
                u.client.clone(),
                self.cleanup_interval,
                self.max_cache_entries,
            );
            caches.push((u.label.clone(), c));
        }

        let mut servers: Vec<DownstreamHandle> = Vec::with_capacity(self.downstreams.len());
        for ds in self.downstreams {
            // Resolve each label to a cache; refuse unknown labels at
            // build time so misconfigured deployments surface early.
            let mut sources: Vec<(String, Arc<ChannelCache>)> = Vec::new();
            for needed in &ds.upstream_labels {
                let cache = caches
                    .iter()
                    .find(|(lbl, _)| lbl == needed)
                    .map(|(_, c)| c.clone())
                    .ok_or_else(|| {
                        GwError::Other(format!(
                            "downstream '{}' references unknown upstream label '{needed}'",
                            ds.label
                        ))
                    })?;
                sources.push((needed.clone(), cache));
            }
            // Compose the ChannelSource: optional control source at
            // order=-100 (so its names always win), then one
            // GatewayChannelSource per upstream label in spec order.
            let composite = CompositeSource::new();
            // Track the first gateway source for the optional
            // ControlSource (its `liveSubscribers` counter is
            // per-source). When multiple upstreams are present, we
            // pick the first one for the control surface — the
            // operator can always disambiguate via the per-cache
            // diagnostic PVs in each control_prefix namespace.
            let mut first_gw_source: Option<GatewayChannelSource> = None;
            let mut first_cache: Option<Arc<ChannelCache>> = None;
            for (i, (label, cache)) in sources.iter().enumerate() {
                let mut src = GatewayChannelSource::new(cache.clone());
                src.connect_timeout = self.connect_timeout;
                src.max_subscribers = self.max_subscribers;
                if first_gw_source.is_none() {
                    first_gw_source = Some(src.clone());
                    first_cache = Some(cache.clone());
                }
                let order = i as i32; // earlier labels win
                let name = format!("gateway:{label}");
                composite
                    .add_source(&name, Arc::new(src), order)
                    .map_err(|e| {
                        GwError::Other(format!(
                            "downstream '{}' source '{name}' registration: {e}",
                            ds.label
                        ))
                    })?;
            }
            if let (Some(prefix), Some(gw_src), Some(cache)) =
                (ds.control_prefix.as_ref(), first_gw_source, first_cache)
            {
                if !prefix.is_empty() {
                    let control = ControlSource::new(prefix, cache, gw_src);
                    composite
                        .add_source("__gw_control", Arc::new(control), -100)
                        .map_err(|e| {
                            GwError::Other(format!(
                                "downstream '{}' control source registration: {e}",
                                ds.label
                            ))
                        })?;
                }
            }
            let server = PvaServer::start(composite, ds.config)?;
            servers.push(DownstreamHandle {
                label: ds.label,
                server,
            });
        }

        Ok(MultiTenantPvaGateway { caches, servers })
    }
}

struct DownstreamHandle {
    label: String,
    server: PvaServer,
}

/// Running multi-tenant gateway. Drop to tear down all servers;
/// the per-upstream `ChannelCache`s drop alongside (their cleanup
/// task aborts via [`ChannelCache::drop`]).
pub struct MultiTenantPvaGateway {
    caches: Vec<(String, Arc<ChannelCache>)>,
    servers: Vec<DownstreamHandle>,
}

impl MultiTenantPvaGateway {
    /// Number of configured downstream servers.
    pub fn downstream_count(&self) -> usize {
        self.servers.len()
    }

    /// Number of configured upstream tenants.
    pub fn upstream_count(&self) -> usize {
        self.caches.len()
    }

    /// Look up a downstream by its label.
    pub fn downstream(&self, label: &str) -> Option<&PvaServer> {
        self.servers
            .iter()
            .find(|h| h.label == label)
            .map(|h| &h.server)
    }

    /// Look up an upstream cache by its label.
    pub fn upstream_cache(&self, label: &str) -> Option<&Arc<ChannelCache>> {
        self.caches
            .iter()
            .find(|(lbl, _)| lbl == label)
            .map(|(_, c)| c)
    }

    /// Stop every downstream server. Per-cache cleanup tasks are
    /// torn down when the gateway is dropped.
    pub fn stop_all(&self) {
        for h in &self.servers {
            h.server.stop();
        }
    }
}