1use std::net::{IpAddr, Ipv4Addr, SocketAddr};
6use std::sync::Arc;
7use std::time::Duration;
8
9use regex::Regex;
10use tracing::{error, info};
11
12use crate::beacon::{BeaconConfig, run_beacon};
13use crate::handler::{PvListMode, ServerState, rand_guid, run_tcp_server, run_udp_search};
14use crate::monitor::MonitorRegistry;
15use crate::pvstore::PvStore;
16
17pub struct PvaServerConfig {
19 pub listen_ip: IpAddr,
21 pub tcp_port: u16,
23 pub udp_port: u16,
25 pub advertise_ip: Option<IpAddr>,
27 pub beacon_target: SocketAddr,
29 pub beacon_period_secs: u64,
31 pub conn_timeout: Duration,
33 pub compute_alarms: bool,
35 pub pvlist_mode: PvListMode,
37 pub pvlist_max: usize,
39 pub pvlist_allow_pattern: Option<Regex>,
41}
42
43impl Default for PvaServerConfig {
44 fn default() -> Self {
45 Self {
46 listen_ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
47 tcp_port: 5075,
48 udp_port: 5076,
49 advertise_ip: None,
50 beacon_target: "224.0.0.128:5076".parse().unwrap(),
51 beacon_period_secs: 15,
52 conn_timeout: Duration::from_secs(64000),
53 compute_alarms: false,
54 pvlist_mode: PvListMode::List,
55 pvlist_max: 1024,
56 pvlist_allow_pattern: None,
57 }
58 }
59}
60
61pub struct PvaServerState<S: PvStore> {
66 pub inner: Arc<ServerState<S>>,
67 pub registry: Arc<MonitorRegistry>,
68}
69
70impl<S: PvStore> PvaServerState<S> {
71 pub fn new(store: Arc<S>, config: &PvaServerConfig) -> Self {
72 Self::with_registry(store, config, Arc::new(MonitorRegistry::new()))
73 }
74
75 pub fn with_registry(
76 store: Arc<S>,
77 config: &PvaServerConfig,
78 registry: Arc<MonitorRegistry>,
79 ) -> Self {
80 let inner = Arc::new(ServerState::new(
81 store,
82 registry.clone(),
83 config.compute_alarms,
84 config.pvlist_mode,
85 config.pvlist_max,
86 config.pvlist_allow_pattern.clone(),
87 ));
88 Self { inner, registry }
89 }
90}
91
92pub async fn run_pva_server<S: PvStore>(
97 store: Arc<S>,
98 config: PvaServerConfig,
99) -> Result<(), Box<dyn std::error::Error>> {
100 let registry = Arc::new(MonitorRegistry::new());
101 run_pva_server_with_registry(store, config, registry).await
102}
103
104pub async fn run_pva_server_with_registry<S: PvStore>(
106 store: Arc<S>,
107 config: PvaServerConfig,
108 registry: Arc<MonitorRegistry>,
109) -> Result<(), Box<dyn std::error::Error>> {
110 let server_state = PvaServerState::with_registry(store, &config, registry);
111 let state = server_state.inner;
112
113 let guid = rand_guid();
114 let tcp_addr = SocketAddr::new(config.listen_ip, config.tcp_port);
115 let udp_addr = SocketAddr::new(config.listen_ip, config.udp_port);
116
117 info!(
118 "Starting PVA server: udp={} tcp={} pvlist_mode={:?} pvlist_max={} filter={}",
119 udp_addr,
120 tcp_addr,
121 config.pvlist_mode,
122 config.pvlist_max,
123 config
124 .pvlist_allow_pattern
125 .as_ref()
126 .map(|r| r.as_str())
127 .unwrap_or("<none>")
128 );
129
130 let beacon_config = BeaconConfig {
131 target: config.beacon_target,
132 guid,
133 tcp_port: config.tcp_port,
134 advertise_ip: config.advertise_ip,
135 listen_ip: config.listen_ip,
136 period_secs: config.beacon_period_secs,
137 };
138
139 let udp_state = state.clone();
140 let udp_task = tokio::spawn(async move {
141 if let Err(e) = run_udp_search(
142 udp_state,
143 udp_addr,
144 config.tcp_port,
145 guid,
146 config.advertise_ip,
147 )
148 .await
149 {
150 error!("UDP search server error: {}", e);
151 }
152 });
153
154 let tcp_state = state.clone();
155 let tcp_task = tokio::spawn(async move {
156 if let Err(e) = run_tcp_server(tcp_state, tcp_addr, config.conn_timeout).await {
157 error!("TCP server error: {}", e);
158 }
159 });
160
161 let beacon_change = state.beacon_change.clone();
162 let beacon_task = tokio::spawn(async move {
163 if let Err(e) = run_beacon(beacon_config, beacon_change).await {
164 error!("Beacon task error: {}", e);
165 }
166 });
167
168 let _ = tokio::join!(udp_task, tcp_task, beacon_task);
169 Ok(())
170}