1use std::net::{IpAddr, Ipv4Addr, SocketAddr};
6use std::sync::Arc;
7use std::time::Duration;
8
9use regex::Regex;
10use tracing::{error, info};
11
12use crate::beacon::{BeaconConfig, run_beacon};
13use crate::handler::{PvListMode, ServerState, rand_guid, run_tcp_server, run_udp_search};
14use crate::monitor::MonitorRegistry;
15use crate::pvstore::PvStore;
16
17pub struct PvaServerConfig {
19 pub listen_ip: IpAddr,
21 pub tcp_port: u16,
23 pub udp_port: u16,
25 pub advertise_ip: Option<IpAddr>,
27 pub beacon_target: SocketAddr,
29 pub beacon_period_secs: u64,
31 pub conn_timeout: Duration,
33 pub compute_alarms: bool,
35 pub pvlist_mode: PvListMode,
37 pub pvlist_max: usize,
39 pub pvlist_allow_pattern: Option<Regex>,
41}
42
43impl Default for PvaServerConfig {
44 fn default() -> Self {
45 Self {
46 listen_ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
47 tcp_port: 5075,
48 udp_port: 5076,
49 advertise_ip: None,
50 beacon_target: "224.0.0.128:5076".parse().unwrap(),
51 beacon_period_secs: 15,
52 conn_timeout: Duration::from_secs(64000),
53 compute_alarms: false,
54 pvlist_mode: PvListMode::List,
55 pvlist_max: 1024,
56 pvlist_allow_pattern: None,
57 }
58 }
59}
60
61pub struct PvaServerState<S: PvStore> {
66 pub inner: Arc<ServerState<S>>,
67 pub registry: Arc<MonitorRegistry>,
68}
69
70impl<S: PvStore> PvaServerState<S> {
71 pub fn new(store: Arc<S>, config: &PvaServerConfig) -> Self {
72 Self::with_registry(store, config, Arc::new(MonitorRegistry::new()))
73 }
74
75 pub fn with_registry(
76 store: Arc<S>,
77 config: &PvaServerConfig,
78 registry: Arc<MonitorRegistry>,
79 ) -> Self {
80 let guid = rand_guid();
81 let inner = Arc::new(ServerState::new(
82 store,
83 registry.clone(),
84 config.compute_alarms,
85 config.pvlist_mode,
86 config.pvlist_max,
87 config.pvlist_allow_pattern.clone(),
88 guid,
89 config.tcp_port,
90 config.advertise_ip,
91 config.listen_ip,
92 ));
93 Self { inner, registry }
94 }
95}
96
97pub async fn run_pva_server<S: PvStore>(
102 store: Arc<S>,
103 config: PvaServerConfig,
104) -> Result<(), Box<dyn std::error::Error>> {
105 let registry = Arc::new(MonitorRegistry::new());
106 run_pva_server_with_registry(store, config, registry).await
107}
108
109pub async fn run_pva_server_with_registry<S: PvStore>(
111 store: Arc<S>,
112 config: PvaServerConfig,
113 registry: Arc<MonitorRegistry>,
114) -> Result<(), Box<dyn std::error::Error>> {
115 let server_state = PvaServerState::with_registry(store, &config, registry);
116 let state = server_state.inner;
117
118 let guid = state.guid;
119 let tcp_addr = SocketAddr::new(config.listen_ip, config.tcp_port);
120 let udp_addr = SocketAddr::new(config.listen_ip, config.udp_port);
121
122 info!(
123 "Starting PVA server: udp={} tcp={} pvlist_mode={:?} pvlist_max={} filter={}",
124 udp_addr,
125 tcp_addr,
126 config.pvlist_mode,
127 config.pvlist_max,
128 config
129 .pvlist_allow_pattern
130 .as_ref()
131 .map(|r| r.as_str())
132 .unwrap_or("<none>")
133 );
134
135 let beacon_config = BeaconConfig {
136 target: config.beacon_target,
137 guid,
138 tcp_port: config.tcp_port,
139 advertise_ip: config.advertise_ip,
140 listen_ip: config.listen_ip,
141 period_secs: config.beacon_period_secs,
142 };
143
144 let udp_state = state.clone();
145 let udp_task = tokio::spawn(async move {
146 if let Err(e) = run_udp_search(
147 udp_state,
148 udp_addr,
149 config.tcp_port,
150 guid,
151 config.advertise_ip,
152 )
153 .await
154 {
155 error!("UDP search server error: {}", e);
156 }
157 });
158
159 let tcp_state = state.clone();
160 let tcp_task = tokio::spawn(async move {
161 if let Err(e) = run_tcp_server(tcp_state, tcp_addr, config.conn_timeout).await {
162 error!("TCP server error: {}", e);
163 }
164 });
165
166 let beacon_change = state.beacon_change.clone();
167 let beacon_task = tokio::spawn(async move {
168 if let Err(e) = run_beacon(beacon_config, beacon_change).await {
169 error!("Beacon task error: {}", e);
170 }
171 });
172
173 let _ = tokio::join!(udp_task, tcp_task, beacon_task);
174 Ok(())
175}