Skip to main content

spvirit_server/
server.rs

1//! Top-level PVA server orchestration.
2//!
3//! [`run_pva_server`] binds UDP + TCP + beacon and runs until cancelled.
4
5use std::net::{IpAddr, Ipv4Addr, SocketAddr};
6use std::sync::Arc;
7use std::time::Duration;
8
9use regex::Regex;
10use tracing::{error, info};
11
12use crate::beacon::{BeaconConfig, run_beacon};
13use crate::handler::{PvListMode, ServerState, rand_guid, run_tcp_server, run_udp_search};
14use crate::monitor::MonitorRegistry;
15use crate::pvstore::PvStore;
16
17/// Configuration for the PVA server.
18pub struct PvaServerConfig {
19    /// IP address to listen on (default: 0.0.0.0).
20    pub listen_ip: IpAddr,
21    /// TCP port (default: 5075).
22    pub tcp_port: u16,
23    /// UDP port (default: 5076).
24    pub udp_port: u16,
25    /// Address to advertise in search responses (None = auto).
26    pub advertise_ip: Option<IpAddr>,
27    /// Beacon target address (default: 224.0.0.128:5076).
28    pub beacon_target: SocketAddr,
29    /// Beacon period in seconds.
30    pub beacon_period_secs: u64,
31    /// Idle connection timeout.
32    pub conn_timeout: Duration,
33    /// Whether to compute alarms from limits.
34    pub compute_alarms: bool,
35    /// PV list mode.
36    pub pvlist_mode: PvListMode,
37    /// Maximum PV names in pvlist responses.
38    pub pvlist_max: usize,
39    /// Optional regex filter for pvlist.
40    pub pvlist_allow_pattern: Option<Regex>,
41}
42
43impl Default for PvaServerConfig {
44    fn default() -> Self {
45        Self {
46            listen_ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
47            tcp_port: 5075,
48            udp_port: 5076,
49            advertise_ip: None,
50            beacon_target: "224.0.0.128:5076".parse().unwrap(),
51            beacon_period_secs: 15,
52            conn_timeout: Duration::from_secs(64000),
53            compute_alarms: false,
54            pvlist_mode: PvListMode::List,
55            pvlist_max: 1024,
56            pvlist_allow_pattern: None,
57        }
58    }
59}
60
61/// Shared server state wrapping a [`PvStore`] implementation.
62///
63/// Consumers can hold an `Arc<PvaServerState<S>>` to inspect or mutate the
64/// underlying store while the server tasks are running.
65pub struct PvaServerState<S: PvStore> {
66    pub inner: Arc<ServerState<S>>,
67    pub registry: Arc<MonitorRegistry>,
68}
69
70impl<S: PvStore> PvaServerState<S> {
71    pub fn new(store: Arc<S>, config: &PvaServerConfig) -> Self {
72        Self::with_registry(store, config, Arc::new(MonitorRegistry::new()))
73    }
74
75    pub fn with_registry(
76        store: Arc<S>,
77        config: &PvaServerConfig,
78        registry: Arc<MonitorRegistry>,
79    ) -> Self {
80        let inner = Arc::new(ServerState::new(
81            store,
82            registry.clone(),
83            config.compute_alarms,
84            config.pvlist_mode,
85            config.pvlist_max,
86            config.pvlist_allow_pattern.clone(),
87        ));
88        Self { inner, registry }
89    }
90}
91
92/// Run a PVA server (UDP search + TCP handler + beacon).
93///
94/// This function drives the three server tasks in a `tokio::select!` loop and
95/// returns when any task errors or the future is dropped.
96pub async fn run_pva_server<S: PvStore>(
97    store: Arc<S>,
98    config: PvaServerConfig,
99) -> Result<(), Box<dyn std::error::Error>> {
100    let registry = Arc::new(MonitorRegistry::new());
101    run_pva_server_with_registry(store, config, registry).await
102}
103
104/// Like [`run_pva_server`] but re-uses an existing [`MonitorRegistry`].
105pub async fn run_pva_server_with_registry<S: PvStore>(
106    store: Arc<S>,
107    config: PvaServerConfig,
108    registry: Arc<MonitorRegistry>,
109) -> Result<(), Box<dyn std::error::Error>> {
110    let server_state = PvaServerState::with_registry(store, &config, registry);
111    let state = server_state.inner;
112
113    let guid = rand_guid();
114    let tcp_addr = SocketAddr::new(config.listen_ip, config.tcp_port);
115    let udp_addr = SocketAddr::new(config.listen_ip, config.udp_port);
116
117    info!(
118        "Starting PVA server: udp={} tcp={} pvlist_mode={:?} pvlist_max={} filter={}",
119        udp_addr,
120        tcp_addr,
121        config.pvlist_mode,
122        config.pvlist_max,
123        config
124            .pvlist_allow_pattern
125            .as_ref()
126            .map(|r| r.as_str())
127            .unwrap_or("<none>")
128    );
129
130    let beacon_config = BeaconConfig {
131        target: config.beacon_target,
132        guid,
133        tcp_port: config.tcp_port,
134        advertise_ip: config.advertise_ip,
135        listen_ip: config.listen_ip,
136        period_secs: config.beacon_period_secs,
137    };
138
139    let udp_state = state.clone();
140    let udp_task = tokio::spawn(async move {
141        if let Err(e) = run_udp_search(
142            udp_state,
143            udp_addr,
144            config.tcp_port,
145            guid,
146            config.advertise_ip,
147        )
148        .await
149        {
150            error!("UDP search server error: {}", e);
151        }
152    });
153
154    let tcp_state = state.clone();
155    let tcp_task = tokio::spawn(async move {
156        if let Err(e) = run_tcp_server(tcp_state, tcp_addr, config.conn_timeout).await {
157            error!("TCP server error: {}", e);
158        }
159    });
160
161    let beacon_change = state.beacon_change.clone();
162    let beacon_task = tokio::spawn(async move {
163        if let Err(e) = run_beacon(beacon_config, beacon_change).await {
164            error!("Beacon task error: {}", e);
165        }
166    });
167
168    let _ = tokio::join!(udp_task, tcp_task, beacon_task);
169    Ok(())
170}