use std::sync::Arc;
use std::time::Duration;
use epics_pva_rs::client::PvaClient;
use epics_pva_rs::error::PvaResult;
use epics_pva_rs::server_native::{
CompositeSource, PvaServer, PvaServerConfig, runtime::ServerReport,
};
use super::channel_cache::{ChannelCache, DEFAULT_CLEANUP_INTERVAL};
use super::control::ControlSource;
use super::error::GwResult;
use super::source::GatewayChannelSource;
pub struct PvaGatewayConfig {
pub upstream_client: Option<Arc<PvaClient>>,
pub server_config: PvaServerConfig,
pub cleanup_interval: Duration,
pub connect_timeout: Duration,
pub max_cache_entries: usize,
pub max_subscribers: usize,
pub control_prefix: Option<String>,
}
impl Default for PvaGatewayConfig {
fn default() -> Self {
let mut server_config = PvaServerConfig::default();
server_config.emit_type_cache = true;
Self {
upstream_client: None,
server_config,
cleanup_interval: DEFAULT_CLEANUP_INTERVAL,
connect_timeout: Duration::from_secs(5),
max_cache_entries: super::channel_cache::DEFAULT_MAX_ENTRIES,
max_subscribers: 100_000,
control_prefix: None,
}
}
}
impl PvaGatewayConfig {
pub fn with_env(mut self) -> Self {
if let Ok(s) = std::env::var("EPICS_PVA_GW_CLEANUP_INTERVAL") {
if let Ok(secs) = s.parse::<f64>() {
if secs > 0.0 && secs.is_finite() {
self.cleanup_interval = Duration::from_secs_f64(secs);
}
}
}
if let Ok(s) = std::env::var("EPICS_PVA_GW_CONNECT_TMO") {
if let Ok(secs) = s.parse::<f64>() {
if secs > 0.0 && secs.is_finite() {
self.connect_timeout = Duration::from_secs_f64(secs);
}
}
}
if let Ok(s) = std::env::var("EPICS_PVA_GW_MAX_CACHE_ENTRIES") {
if let Ok(n) = s.parse::<usize>() {
if n > 0 {
self.max_cache_entries = n;
}
}
}
if let Ok(s) = std::env::var("EPICS_PVA_GW_MAX_SUBSCRIBERS") {
if let Ok(n) = s.parse::<usize>() {
if n > 0 {
self.max_subscribers = n;
}
}
}
if let Ok(s) = std::env::var("EPICS_PVA_GW_CONTROL_PREFIX") {
let trimmed = s.trim();
if !trimmed.is_empty() {
self.control_prefix = Some(trimmed.to_string());
}
}
self
}
}
pub struct PvaGateway {
cache: Arc<ChannelCache>,
server: PvaServer,
source: GatewayChannelSource,
}
impl PvaGateway {
pub fn start(config: PvaGatewayConfig) -> GwResult<Self> {
let client = config
.upstream_client
.unwrap_or_else(|| Arc::new(PvaClient::builder().build()));
let cache = ChannelCache::with_max_entries(
client,
config.cleanup_interval,
config.max_cache_entries,
);
let mut source = GatewayChannelSource::new(cache.clone());
source.connect_timeout = config.connect_timeout;
source.max_subscribers = config.max_subscribers;
let server = match &config.control_prefix {
Some(prefix) if !prefix.is_empty() => {
let composite = CompositeSource::new();
let control = ControlSource::new(prefix, cache.clone(), source.clone());
composite
.add_source("__gw_control", Arc::new(control), -100)
.map_err(|e| {
super::error::GwError::Other(format!("control source registration: {e}"))
})?;
composite
.add_source("gateway", Arc::new(source.clone()), 0)
.map_err(|e| {
super::error::GwError::Other(format!("gateway source registration: {e}"))
})?;
PvaServer::start(composite, config.server_config)
}
_ => PvaServer::start(Arc::new(source.clone()), config.server_config),
};
Ok(Self {
cache,
server,
source,
})
}
pub fn isolated(client: Arc<PvaClient>) -> Self {
let cache = ChannelCache::new(client, DEFAULT_CLEANUP_INTERVAL);
let source = GatewayChannelSource::new(cache.clone());
let server = PvaServer::isolated(Arc::new(source.clone()));
Self {
cache,
server,
source,
}
}
pub fn cache(&self) -> &Arc<ChannelCache> {
&self.cache
}
pub fn source(&self) -> GatewayChannelSource {
self.source.clone()
}
pub fn report(&self) -> ServerReport {
self.server.report()
}
pub fn interrupt(&self) {
self.server.interrupt();
}
pub fn client_config(&self) -> PvaClient {
self.server.client_config()
}
pub async fn run(self) -> PvaResult<()> {
self.server.run().await
}
pub fn stop(&self) {
self.server.stop();
}
pub async fn prefetch(&self, pv_names: &[&str]) {
for name in pv_names {
let _ = self.cache.lookup(name, self.source.connect_timeout).await;
}
}
}