Skip to main content

spvirit_server/
server.rs

1//! Top-level PVA server orchestration.
2//!
3//! [`run_pva_server`] binds UDP + TCP + beacon and runs until cancelled.
4
5use std::net::{IpAddr, Ipv4Addr, SocketAddr};
6use std::sync::Arc;
7use std::time::Duration;
8
9use regex::Regex;
10use tracing::{error, info};
11
12use crate::beacon::{BeaconConfig, run_beacon};
13use crate::handler::{PvListMode, ServerState, rand_guid, run_tcp_server, run_udp_search};
14use crate::monitor::MonitorRegistry;
15use crate::pvstore::PvStore;
16
17/// Configuration for the PVA server.
18pub struct PvaServerConfig {
19    /// IP address to listen on (default: 0.0.0.0).
20    pub listen_ip: IpAddr,
21    /// TCP port (default: 5075).
22    pub tcp_port: u16,
23    /// UDP port (default: 5076).
24    pub udp_port: u16,
25    /// Address to advertise in search responses (None = auto).
26    pub advertise_ip: Option<IpAddr>,
27    /// Beacon target address (default: 224.0.0.128:5076).
28    pub beacon_target: SocketAddr,
29    /// Beacon period in seconds.
30    pub beacon_period_secs: u64,
31    /// Idle connection timeout.
32    pub conn_timeout: Duration,
33    /// Whether to compute alarms from limits.
34    pub compute_alarms: bool,
35    /// PV list mode.
36    pub pvlist_mode: PvListMode,
37    /// Maximum PV names in pvlist responses.
38    pub pvlist_max: usize,
39    /// Optional regex filter for pvlist.
40    pub pvlist_allow_pattern: Option<Regex>,
41}
42
43impl Default for PvaServerConfig {
44    fn default() -> Self {
45        Self {
46            listen_ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
47            tcp_port: 5075,
48            udp_port: 5076,
49            advertise_ip: None,
50            beacon_target: "224.0.0.128:5076".parse().unwrap(),
51            beacon_period_secs: 15,
52            conn_timeout: Duration::from_secs(64000),
53            compute_alarms: false,
54            pvlist_mode: PvListMode::List,
55            pvlist_max: 1024,
56            pvlist_allow_pattern: None,
57        }
58    }
59}
60
61/// Shared server state wrapping a [`PvStore`] implementation.
62///
63/// Consumers can hold an `Arc<PvaServerState<S>>` to inspect or mutate the
64/// underlying store while the server tasks are running.
65pub struct PvaServerState<S: PvStore> {
66    pub inner: Arc<ServerState<S>>,
67    pub registry: Arc<MonitorRegistry>,
68}
69
70impl<S: PvStore> PvaServerState<S> {
71    pub fn new(store: Arc<S>, config: &PvaServerConfig) -> Self {
72        Self::with_registry(store, config, Arc::new(MonitorRegistry::new()))
73    }
74
75    pub fn with_registry(
76        store: Arc<S>,
77        config: &PvaServerConfig,
78        registry: Arc<MonitorRegistry>,
79    ) -> Self {
80        let guid = rand_guid();
81        let inner = Arc::new(ServerState::new(
82            store,
83            registry.clone(),
84            config.compute_alarms,
85            config.pvlist_mode,
86            config.pvlist_max,
87            config.pvlist_allow_pattern.clone(),
88            guid,
89            config.tcp_port,
90            config.advertise_ip,
91            config.listen_ip,
92        ));
93        Self { inner, registry }
94    }
95}
96
97/// Run a PVA server (UDP search + TCP handler + beacon).
98///
99/// This function drives the three server tasks in a `tokio::select!` loop and
100/// returns when any task errors or the future is dropped.
101pub async fn run_pva_server<S: PvStore>(
102    store: Arc<S>,
103    config: PvaServerConfig,
104) -> Result<(), Box<dyn std::error::Error>> {
105    let registry = Arc::new(MonitorRegistry::new());
106    run_pva_server_with_registry(store, config, registry).await
107}
108
109/// Like [`run_pva_server`] but re-uses an existing [`MonitorRegistry`].
110pub async fn run_pva_server_with_registry<S: PvStore>(
111    store: Arc<S>,
112    config: PvaServerConfig,
113    registry: Arc<MonitorRegistry>,
114) -> Result<(), Box<dyn std::error::Error>> {
115    let server_state = PvaServerState::with_registry(store, &config, registry);
116    let state = server_state.inner;
117
118    let guid = state.guid;
119    let tcp_addr = SocketAddr::new(config.listen_ip, config.tcp_port);
120    let udp_addr = SocketAddr::new(config.listen_ip, config.udp_port);
121
122    info!(
123        "Starting PVA server: udp={} tcp={} pvlist_mode={:?} pvlist_max={} filter={}",
124        udp_addr,
125        tcp_addr,
126        config.pvlist_mode,
127        config.pvlist_max,
128        config
129            .pvlist_allow_pattern
130            .as_ref()
131            .map(|r| r.as_str())
132            .unwrap_or("<none>")
133    );
134
135    let beacon_config = BeaconConfig {
136        target: config.beacon_target,
137        guid,
138        tcp_port: config.tcp_port,
139        advertise_ip: config.advertise_ip,
140        listen_ip: config.listen_ip,
141        period_secs: config.beacon_period_secs,
142    };
143
144    let udp_state = state.clone();
145    let udp_task = tokio::spawn(async move {
146        if let Err(e) = run_udp_search(
147            udp_state,
148            udp_addr,
149            config.tcp_port,
150            guid,
151            config.advertise_ip,
152        )
153        .await
154        {
155            error!("UDP search server error: {}", e);
156        }
157    });
158
159    let tcp_state = state.clone();
160    let tcp_task = tokio::spawn(async move {
161        if let Err(e) = run_tcp_server(tcp_state, tcp_addr, config.conn_timeout).await {
162            error!("TCP server error: {}", e);
163        }
164    });
165
166    let beacon_change = state.beacon_change.clone();
167    let beacon_task = tokio::spawn(async move {
168        if let Err(e) = run_beacon(beacon_config, beacon_change).await {
169            error!("Beacon task error: {}", e);
170        }
171    });
172
173    let _ = tokio::join!(udp_task, tcp_task, beacon_task);
174    Ok(())
175}