1use std::collections::HashMap;
7use std::collections::HashSet;
8use std::collections::VecDeque;
9use std::net::{IpAddr, SocketAddr};
10use std::time::{Duration, Instant};
11use tracing::debug;
12
13use crate::spvd_decode::StructureDesc;
14
/// Limits applied by the state tracker.
#[derive(Debug, Clone)]
pub struct PvaStateConfig {
    /// Total channel count at which the tracker starts evicting old channels.
    pub max_channels: usize,
    /// Idle time after which a channel is considered expired.
    pub channel_ttl: Duration,
    /// Maximum number of operations kept per connection.
    pub max_operations: usize,
    /// Maximum number of update timestamps kept per rate window.
    pub max_update_rate: usize,
}

impl Default for PvaStateConfig {
    /// 40 000 channels, a five-minute TTL and the standard operation/update caps.
    fn default() -> Self {
        const DEFAULT_TTL_SECS: u64 = 5 * 60;
        Self::new(40_000, DEFAULT_TTL_SECS)
    }
}

impl PvaStateConfig {
    /// Builds a configuration with explicit channel limits; the operation and
    /// update-rate caps both start at 10 000.
    pub fn new(max_channels: usize, ttl_secs: u64) -> Self {
        let channel_ttl = Duration::from_secs(ttl_secs);
        Self {
            max_channels,
            channel_ttl,
            max_operations: 10_000,
            max_update_rate: 10_000,
        }
    }

    /// Returns the configuration with `max_update_rate` replaced.
    pub fn with_max_update_rate(self, max_update_rate: usize) -> Self {
        Self {
            max_update_rate,
            ..self
        }
    }
}
54
/// Direction-independent identifier of a connection: the two endpoints are
/// stored in sorted order so that `(a, b)` and `(b, a)` produce the same key.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct ConnectionKey {
    /// The smaller of the two endpoints (by `SocketAddr` ordering).
    pub addr_a: SocketAddr,
    /// The larger of the two endpoints (by `SocketAddr` ordering).
    pub addr_b: SocketAddr,
}

impl ConnectionKey {
    /// Creates a key from two endpoints, normalising their order.
    pub fn new(addr1: SocketAddr, addr2: SocketAddr) -> Self {
        let (addr_a, addr_b) = if addr1 <= addr2 {
            (addr1, addr2)
        } else {
            (addr2, addr1)
        };
        Self { addr_a, addr_b }
    }

    /// Creates a key from textual IP addresses and ports.
    ///
    /// Returns `None` when either address cannot be parsed. Bare IPv6
    /// addresses (`"::1"`) are accepted as well as IPv4 ones; anything the
    /// previous `"{ip}:{port}"` parsing accepted (e.g. a bracketed `"[::1]"`)
    /// still works through the fallback path.
    pub fn from_parts(ip1: &str, port1: u16, ip2: &str, port2: u16) -> Option<Self> {
        fn endpoint(ip: &str, port: u16) -> Option<SocketAddr> {
            match ip.parse::<IpAddr>() {
                // Parsing the IP on its own avoids the `host:port` ambiguity
                // that made every unbracketed IPv6 address fail.
                Ok(parsed) => Some(SocketAddr::new(parsed, port)),
                Err(_) => format!("{}:{}", ip, port).parse().ok(),
            }
        }

        let addr1 = endpoint(ip1, port1)?;
        let addr2 = endpoint(ip2, port2)?;
        Some(Self::new(addr1, addr2))
    }
}
89
/// State kept for one channel of a connection.
#[derive(Debug, Clone)]
pub struct ChannelInfo {
    /// PV name associated with the channel.
    pub pv_name: String,
    /// Client-side channel id.
    pub cid: u32,
    /// Server-side channel id; `None` until one has been recorded.
    pub sid: Option<u32>,
    /// Time of the most recent `touch` (or of construction).
    pub last_seen: Instant,
    /// Whether the channel is considered fully established.
    pub fully_established: bool,
    /// Timestamps of recent updates.
    pub update_times: VecDeque<Instant>,
    /// Recently recorded messages.
    pub recent_messages: VecDeque<String>,
}

impl ChannelInfo {
    /// Creates a channel that has no server id yet and is not established.
    pub fn new_pending(cid: u32, pv_name: String) -> Self {
        let created_at = Instant::now();
        Self {
            cid,
            pv_name,
            sid: None,
            fully_established: false,
            last_seen: created_at,
            update_times: VecDeque::new(),
            recent_messages: VecDeque::new(),
        }
    }

    /// Marks the channel as active right now.
    pub fn touch(&mut self) {
        let now = Instant::now();
        self.last_seen = now;
    }

    /// Returns `true` when the channel has been idle for longer than `ttl`.
    pub fn is_expired(&self, ttl: Duration) -> bool {
        let idle = self.last_seen.elapsed();
        idle > ttl
    }
}
128
129#[derive(Debug, Clone)]
131pub struct OperationState {
132 pub sid: u32,
134 pub ioid: u32,
136 pub command: u8,
138 pub pv_name: Option<String>,
140 pub field_desc: Option<StructureDesc>,
142 pub initialized: bool,
144 pub last_seen: Instant,
146 pub update_times: VecDeque<Instant>,
147 pub recent_messages: VecDeque<String>,
148}
149
150impl OperationState {
151 pub fn new(sid: u32, ioid: u32, command: u8, pv_name: Option<String>) -> Self {
152 Self {
153 sid,
154 ioid,
155 command,
156 pv_name,
157 field_desc: None,
158 initialized: false,
159 last_seen: Instant::now(),
160 update_times: VecDeque::new(),
161 recent_messages: VecDeque::new(),
162 }
163 }
164
165 pub fn touch(&mut self) {
166 self.last_seen = Instant::now();
167 }
168}
169
170#[derive(Debug)]
172pub struct ConnectionState {
173 pub channels_by_cid: HashMap<u32, ChannelInfo>,
175 pub sid_to_cid: HashMap<u32, u32>,
177 pub operations: HashMap<u32, OperationState>,
179 pub is_be: bool,
181 pub last_seen: Instant,
183 pub update_times: VecDeque<Instant>,
184 pub recent_messages: VecDeque<String>,
185}
186
187impl ConnectionState {
188 pub fn new() -> Self {
189 Self {
190 channels_by_cid: HashMap::new(),
191 sid_to_cid: HashMap::new(),
192 operations: HashMap::new(),
193 is_be: false, last_seen: Instant::now(),
195 update_times: VecDeque::new(),
196 recent_messages: VecDeque::new(),
197 }
198 }
199
200 pub fn touch(&mut self) {
201 self.last_seen = Instant::now();
202 }
203
204 pub fn get_channel_by_sid(&self, sid: u32) -> Option<&ChannelInfo> {
206 self.sid_to_cid
207 .get(&sid)
208 .and_then(|cid| self.channels_by_cid.get(cid))
209 }
210
211 pub fn get_channel_by_sid_mut(&mut self, sid: u32) -> Option<&mut ChannelInfo> {
213 if let Some(&cid) = self.sid_to_cid.get(&sid) {
214 self.channels_by_cid.get_mut(&cid)
215 } else {
216 None
217 }
218 }
219
220 pub fn get_pv_name_by_sid(&self, sid: u32) -> Option<&str> {
222 self.get_channel_by_sid(sid).map(|ch| ch.pv_name.as_str())
223 }
224
225 pub fn get_pv_name_by_ioid(&self, ioid: u32) -> Option<&str> {
227 self.operations
228 .get(&ioid)
229 .and_then(|op| op.pv_name.as_deref())
230 }
231}
232
233impl Default for ConnectionState {
234 fn default() -> Self {
235 Self::new()
236 }
237}
238
/// Aggregates per-connection channel/operation state together with the
/// PV-name caches and the running counters.
#[derive(Debug)]
pub struct PvaStateTracker {
    /// Limits applied by the tracker.
    config: PvaStateConfig,
    /// Per-connection state, keyed by the normalised endpoint pair.
    connections: HashMap<ConnectionKey, ConnectionState>,
    /// Running count of channel entries across all connections.
    total_channels: usize,
    /// Counters maintained by the handlers.
    pub stats: PvaStateStats,
    /// PV names keyed by `(IP address, cid)`.
    search_cache: HashMap<(IpAddr, u32), String>,
    /// PV names keyed by cid only; used when the IP-scoped lookup misses.
    search_cache_flat: HashMap<u32, String>,
}
256
/// Counters maintained by [`PvaStateTracker`].
#[derive(Debug, Default, Clone)]
pub struct PvaStateStats {
    /// Channel entries counted as created.
    pub channels_created: u64,
    /// Channels removed by `on_destroy_channel`.
    pub channels_destroyed: u64,
    /// Channels removed by `cleanup_expired`.
    pub channels_expired: u64,
    /// Channels removed by `evict_oldest_channels`.
    pub channels_evicted: u64,
    /// Operation entries counted as created.
    pub operations_created: u64,
    /// Operations removed by `on_op_destroy`.
    pub operations_completed: u64,
    /// Calls to `on_create_channel_request`.
    pub create_channel_requests: u64,
    /// Calls to `on_create_channel_response`.
    pub create_channel_responses: u64,
    /// Cids successfully resolved by `resolve_search_cids`.
    pub search_responses_resolved: u64,
    /// Size of the cid-only search cache as recorded by `on_search`.
    pub search_cache_entries: u64,
    /// Unknown channels that `on_search` resolved after the fact.
    pub search_retroactive_resolves: u64,
    /// Messages counted with `is_server == false` by `count_direction`.
    pub client_messages: u64,
    /// Messages counted with `is_server == true` by `count_direction`.
    pub server_messages: u64,
}
276
/// Point-in-time summary of one connection, as produced by
/// `PvaStateTracker::connection_snapshots`.
#[derive(Debug, Clone)]
pub struct ConnectionSnapshot {
    /// First endpoint of the connection key.
    pub addr_a: SocketAddr,
    /// Second endpoint of the connection key.
    pub addr_b: SocketAddr,
    /// Number of channels tracked on the connection.
    pub channel_count: usize,
    /// Number of operations tracked on the connection.
    pub operation_count: usize,
    /// Time elapsed since the connection was last touched.
    pub last_seen: Duration,
    /// Sorted PV names of the connection's channels, truncated to eight.
    pub pv_names: Vec<String>,
    /// Number of updates recorded within the last second.
    pub updates_per_sec: f64,
    /// The most recent messages (at most twenty).
    pub recent_messages: Vec<String>,
    /// Whether the connection looks like it was first seen mid-stream.
    pub mid_stream: bool,
    /// Whether any of `recent_messages` was recognised as a BEACON message.
    pub is_beacon: bool,
    /// Whether either endpoint is classified as a broadcast/multicast address.
    pub is_broadcast: bool,
}
291
/// Point-in-time summary of one channel, as produced by
/// `PvaStateTracker::channel_snapshots`.
#[derive(Debug, Clone)]
pub struct ChannelSnapshot {
    /// First endpoint of the owning connection's key.
    pub addr_a: SocketAddr,
    /// Second endpoint of the owning connection's key.
    pub addr_b: SocketAddr,
    /// Client channel id (`0` for rows synthesised from operations).
    pub cid: u32,
    /// Server channel id, when known.
    pub sid: Option<u32>,
    /// PV name, or an `<unknown:...>` placeholder.
    pub pv_name: String,
    /// Time elapsed since the channel (or operation) was last touched.
    pub last_seen: Duration,
    /// Number of updates recorded within the last second.
    pub updates_per_sec: f64,
    /// The most recent messages (at most twenty).
    pub recent_messages: Vec<String>,
    /// Whether the channel looks like it was first seen mid-stream.
    pub mid_stream: bool,
    /// Whether any of `recent_messages` was recognised as a BEACON message.
    pub is_beacon: bool,
    /// Whether either endpoint is classified as a broadcast/multicast address.
    pub is_broadcast: bool,
}
306
307impl PvaStateTracker {
308 fn is_broadcast_addr(addr: &SocketAddr) -> bool {
309 match addr.ip() {
310 std::net::IpAddr::V4(v4) => {
311 if v4.is_broadcast() {
312 return true;
313 }
314 v4.octets()[3] == 255
315 }
316 std::net::IpAddr::V6(v6) => {
317 v6.is_multicast()
319 }
320 }
321 }
322 pub fn new(config: PvaStateConfig) -> Self {
323 Self {
324 config,
325 connections: HashMap::new(),
326 total_channels: 0,
327 stats: PvaStateStats::default(),
328 search_cache: HashMap::new(),
329 search_cache_flat: HashMap::new(),
330 }
331 }
332
    /// Creates an empty tracker configured with `PvaStateConfig::default()`.
    pub fn with_defaults() -> Self {
        Self::new(PvaStateConfig::default())
    }
336
337 fn get_or_create_connection(&mut self, key: &ConnectionKey) -> &mut ConnectionState {
339 if !self.connections.contains_key(key) {
340 self.connections.insert(key.clone(), ConnectionState::new());
341 }
342 self.connections.get_mut(key).unwrap()
343 }
344
    /// Returns the tracked state for `key`, if the connection is known.
    pub fn get_connection(&self, key: &ConnectionKey) -> Option<&ConnectionState> {
        self.connections.get(key)
    }
349
350 pub fn get_pv_name_by_sid(&self, conn_key: &ConnectionKey, sid: u32) -> Option<String> {
352 self.connections
353 .get(conn_key)
354 .and_then(|conn| conn.get_pv_name_by_sid(sid))
355 .map(|s| s.to_string())
356 }
357
358 pub fn on_create_channel_request(
361 &mut self,
362 conn_key: &ConnectionKey,
363 cid: u32,
364 pv_name: String,
365 ) {
366 self.stats.create_channel_requests += 1;
367
368 let client_ip = conn_key.addr_a.ip(); self.search_cache.insert((client_ip, cid), pv_name.clone());
372 self.search_cache_flat.insert(cid, pv_name.clone());
373
374 if self.total_channels >= self.config.max_channels {
376 self.evict_oldest_channels(100); }
378
379 let conn = self.get_or_create_connection(conn_key);
380 conn.touch();
381
382 if !conn.channels_by_cid.contains_key(&cid) {
384 conn.channels_by_cid
385 .insert(cid, ChannelInfo::new_pending(cid, pv_name));
386 self.total_channels += 1;
387 self.stats.channels_created += 1;
388 debug!("CREATE_CHANNEL request: cid={}", cid);
389 }
390 }
391
392 pub fn on_create_channel_response(&mut self, conn_key: &ConnectionKey, cid: u32, sid: u32) {
395 self.stats.create_channel_responses += 1;
396
397 let cached_pv_name = self
400 .search_cache
401 .get(&(conn_key.addr_a.ip(), cid))
402 .or_else(|| self.search_cache.get(&(conn_key.addr_b.ip(), cid)))
403 .or_else(|| self.search_cache_flat.get(&cid))
404 .cloned();
405
406 let conn = self.get_or_create_connection(conn_key);
407 conn.touch();
408
409 if let Some(channel) = conn.channels_by_cid.get_mut(&cid) {
410 channel.sid = Some(sid);
411 channel.fully_established = true;
412 channel.touch();
413 conn.sid_to_cid.insert(sid, cid);
414 debug!(
415 "CREATE_CHANNEL response: cid={}, sid={}, pv={}",
416 cid, sid, channel.pv_name
417 );
418 } else {
419 let pv_name = cached_pv_name.unwrap_or_else(|| format!("<unknown:cid={}>", cid));
421 let is_resolved = !pv_name.starts_with("<unknown");
422 debug!(
423 "CREATE_CHANNEL response without request: cid={}, sid={}, resolved={}",
424 cid, sid, is_resolved
425 );
426 let mut channel = ChannelInfo::new_pending(cid, pv_name);
427 channel.sid = Some(sid);
428 channel.fully_established = is_resolved;
429 conn.channels_by_cid.insert(cid, channel);
430 conn.sid_to_cid.insert(sid, cid);
431 self.total_channels += 1;
432 }
433 }
434
435 pub fn on_destroy_channel(&mut self, conn_key: &ConnectionKey, cid: u32, sid: u32) {
437 if let Some(conn) = self.connections.get_mut(conn_key) {
438 conn.touch();
439
440 if conn.channels_by_cid.remove(&cid).is_some() {
442 self.total_channels = self.total_channels.saturating_sub(1);
443 self.stats.channels_destroyed += 1;
444 }
445
446 conn.sid_to_cid.remove(&sid);
448
449 conn.operations.retain(|_, op| op.sid != sid);
451
452 debug!("DESTROY_CHANNEL: cid={}, sid={}", cid, sid);
453 }
454 }
455
456 pub fn on_op_init_request(
459 &mut self,
460 conn_key: &ConnectionKey,
461 sid: u32,
462 ioid: u32,
463 command: u8,
464 ) {
465 let max_ops = self.config.max_operations;
466 let conn = self.get_or_create_connection(conn_key);
467 conn.touch();
468
469 let pv_name = conn.get_pv_name_by_sid(sid).map(|s| s.to_string());
470
471 if conn.operations.len() < max_ops {
472 conn.operations
473 .insert(ioid, OperationState::new(sid, ioid, command, pv_name));
474 self.stats.operations_created += 1;
475 debug!(
476 "Operation INIT: sid={}, ioid={}, cmd={}",
477 sid, ioid, command
478 );
479 }
480 }
481
482 pub fn on_op_init_response(
485 &mut self,
486 conn_key: &ConnectionKey,
487 ioid: u32,
488 field_desc: Option<StructureDesc>,
489 ) {
490 if let Some(conn) = self.connections.get_mut(conn_key) {
491 conn.touch();
492
493 if let Some(op) = conn.operations.get_mut(&ioid) {
494 op.field_desc = field_desc;
495 op.initialized = true;
496 op.touch();
497 debug!("Operation INIT response: ioid={}", ioid);
498 }
499 }
500 }
501
502 pub fn on_op_destroy(&mut self, conn_key: &ConnectionKey, ioid: u32) {
504 if let Some(conn) = self.connections.get_mut(conn_key) {
505 if conn.operations.remove(&ioid).is_some() {
506 self.stats.operations_completed += 1;
507 }
508 }
509 }
510
    /// Records activity for the operation `ioid` on a connection.
    ///
    /// Updates the rate windows of the connection, the operation and (when it
    /// can be identified) the owning channel. If the operation is unknown and
    /// the connection is below `max_operations`, a placeholder operation is
    /// created so that traffic first seen mid-stream is still tracked.
    ///
    /// `sid == 0` is treated as "no sid supplied".
    pub fn on_op_activity(&mut self, conn_key: &ConnectionKey, sid: u32, ioid: u32, command: u8) {
        let max_update_rate = self.config.max_update_rate;
        let max_ops = self.config.max_operations;
        let mut created_placeholder = false;

        let conn = self.get_or_create_connection(conn_key);
        conn.touch();

        Self::record_update(&mut conn.update_times, max_update_rate);

        // Prefer the sid supplied by the caller; otherwise fall back to the
        // sid stored on an already-known operation.
        let mut channel_sid = if sid != 0 { Some(sid) } else { None };
        if let Some(op) = conn.operations.get_mut(&ioid) {
            op.touch();
            Self::record_update(&mut op.update_times, max_update_rate);
            if channel_sid.is_none() {
                channel_sid = Some(op.sid);
            }
        } else if conn.operations.len() < max_ops {
            let pv_name = if sid != 0 {
                conn.get_pv_name_by_sid(sid).map(|s| s.to_string())
            } else if conn.channels_by_cid.len() == 1 && conn.operations.is_empty() {
                // Exactly one channel and no operations yet: attribute the
                // placeholder to that channel, unless its name is itself an
                // `<unknown...>` placeholder.
                conn.channels_by_cid
                    .values()
                    .next()
                    .map(|ch| ch.pv_name.clone())
                    .filter(|n| !n.starts_with("<unknown"))
            } else {
                None
            };
            conn.operations
                .insert(ioid, OperationState::new(sid, ioid, command, pv_name));
            created_placeholder = true;
        }

        if let Some(sid_val) = channel_sid {
            if let Some(channel) = conn.get_channel_by_sid_mut(sid_val) {
                channel.touch();
                Self::record_update(&mut channel.update_times, max_update_rate);
            }
        }

        // Counted after the last use of `conn`, which borrows all of `self`.
        if created_placeholder {
            self.stats.operations_created += 1;
            debug!(
                "Auto-created placeholder operation for mid-stream traffic: sid={}, ioid={}, cmd={}",
                sid, ioid, command
            );
        }
    }
570
571 pub fn on_search(&mut self, pv_requests: &[(u32, String)], source_ip: Option<IpAddr>) {
577 let cid_to_pv: HashMap<u32, String> = pv_requests.iter().cloned().collect();
579
580 for (cid, pv_name) in pv_requests {
581 if let Some(ip) = source_ip {
582 self.search_cache.insert((ip, *cid), pv_name.clone());
583 }
584 self.search_cache_flat.insert(*cid, pv_name.clone());
586 }
587
588 let mut retroactive_count: u64 = 0;
592 for conn in self.connections.values_mut() {
593 for (cid, channel) in conn.channels_by_cid.iter_mut() {
594 if channel.pv_name.starts_with("<unknown") {
595 if let Some(pv_name) = cid_to_pv.get(cid) {
596 debug!(
597 "Retroactive PV resolve from SEARCH: cid={} {} -> {}",
598 cid, channel.pv_name, pv_name
599 );
600 channel.pv_name = pv_name.clone();
601 channel.fully_established = true;
602 retroactive_count += 1;
603 }
604 }
605 }
606
607 for op in conn.operations.values_mut() {
610 let needs_update = match &op.pv_name {
611 None => true,
612 Some(name) => name.starts_with("<unknown"),
613 };
614 if needs_update && op.sid != 0 {
615 if let Some(&cid) = conn.sid_to_cid.get(&op.sid) {
616 if let Some(pv_name) = cid_to_pv.get(&cid) {
617 op.pv_name = Some(pv_name.clone());
618 }
619 }
620 }
621 }
622 }
623 if retroactive_count > 0 {
624 self.stats.search_retroactive_resolves += retroactive_count;
625 debug!(
626 "Retroactively resolved {} unknown channels from SEARCH cache",
627 retroactive_count
628 );
629 }
630
631 self.stats.search_cache_entries = self.search_cache_flat.len() as u64;
633
634 while self.search_cache.len() > 50_000 {
636 if let Some(key) = self.search_cache.keys().next().cloned() {
637 self.search_cache.remove(&key);
638 }
639 }
640 while self.search_cache_flat.len() > 50_000 {
641 if let Some(key) = self.search_cache_flat.keys().next().cloned() {
642 self.search_cache_flat.remove(&key);
643 }
644 }
645 }
646
647 pub fn resolve_search_cids(
652 &mut self,
653 cids: &[u32],
654 peer_ip: Option<IpAddr>,
655 ) -> Vec<(u32, String)> {
656 let mut resolved = Vec::new();
657 for &cid in cids {
658 let pv_name = peer_ip
661 .and_then(|ip| self.search_cache.get(&(ip, cid)))
662 .or_else(|| self.search_cache_flat.get(&cid))
663 .cloned();
664 if let Some(name) = pv_name {
665 resolved.push((cid, name));
666 self.stats.search_responses_resolved += 1;
667 }
668 }
669 resolved
670 }
671
672 pub fn count_direction(&mut self, is_server: bool) {
674 if is_server {
675 self.stats.server_messages += 1;
676 } else {
677 self.stats.client_messages += 1;
678 }
679 }
680
681 pub fn on_message(
682 &mut self,
683 conn_key: &ConnectionKey,
684 sid: u32,
685 ioid: u32,
686 request_type: &str,
687 message: String,
688 is_server: bool,
689 ) {
690 let conn = self.get_or_create_connection(conn_key);
691 conn.touch();
692 let dir = if is_server { "S>" } else { "C>" };
693 let full_message = format!("{} {} {}", dir, request_type, message);
694 Self::push_message(&mut conn.recent_messages, full_message.clone());
695
696 let mut channel_sid = if sid != 0 { Some(sid) } else { None };
697 if let Some(op) = conn.operations.get_mut(&ioid) {
698 Self::push_message(&mut op.recent_messages, full_message.clone());
699 if channel_sid.is_none() {
700 channel_sid = Some(op.sid);
701 }
702 }
703 if let Some(sid_val) = channel_sid {
704 if let Some(channel) = conn.get_channel_by_sid_mut(sid_val) {
705 Self::push_message(&mut channel.recent_messages, full_message);
706 }
707 }
708 }
709
710 fn record_update(times: &mut VecDeque<Instant>, max_update_rate: usize) {
711 let now = Instant::now();
712 times.push_back(now);
713 Self::trim_times(times, now);
714 while times.len() > max_update_rate {
715 times.pop_front();
716 }
717 }
718
719 fn trim_times(times: &mut VecDeque<Instant>, now: Instant) {
720 while let Some(front) = times.front() {
721 if now.duration_since(*front) > Duration::from_secs(1) {
722 times.pop_front();
723 } else {
724 break;
725 }
726 }
727 }
728
    /// Returns the number of timestamps in `times` as a float. Callers in this
    /// file trim `times` to the last second first, which makes this a
    /// per-second rate.
    fn updates_per_sec(times: &VecDeque<Instant>) -> f64 {
        times.len() as f64
    }
732
733 fn push_message(messages: &mut VecDeque<String>, message: String) {
734 messages.push_back(message);
735 while messages.len() > 30 {
736 messages.pop_front();
737 }
738 }
739
740 pub fn resolve_pv_name(&self, conn_key: &ConnectionKey, sid: u32, ioid: u32) -> Option<String> {
742 let conn = self.connections.get(conn_key)?;
743
744 if let Some(op) = conn.operations.get(&ioid) {
746 if let Some(ref name) = op.pv_name {
747 if !name.starts_with("<unknown") {
748 return Some(name.clone());
749 }
750 }
751 }
752
753 if sid != 0 {
755 if let Some(name) = conn.get_pv_name_by_sid(sid) {
756 return Some(name.to_string());
757 }
758 }
759
760 if conn.channels_by_cid.len() == 1 && conn.operations.len() <= 1 {
770 if let Some(ch) = conn.channels_by_cid.values().next() {
771 if !ch.pv_name.starts_with("<unknown") {
772 return Some(ch.pv_name.clone());
773 }
774 }
775 }
776
777 None
778 }
779
    /// Returns the running count of tracked channels (same value as
    /// `channel_count`).
    pub fn active_channel_count(&self) -> usize {
        self.total_channels
    }
784
    /// Returns the number of tracked connections (same value as
    /// `connection_count`).
    pub fn active_connection_count(&self) -> usize {
        self.connections.len()
    }
789
790 pub fn is_connection_mid_stream(&self, conn_key: &ConnectionKey) -> bool {
792 self.connections
793 .get(conn_key)
794 .map(|conn| {
795 if conn.channels_by_cid.is_empty() && !conn.operations.is_empty() {
797 return true;
798 }
799 conn.channels_by_cid
801 .values()
802 .any(|ch| !ch.fully_established)
803 })
804 .unwrap_or(false)
805 }
806
807 pub fn get_operation(&self, conn_key: &ConnectionKey, ioid: u32) -> Option<&OperationState> {
809 self.connections
810 .get(conn_key)
811 .and_then(|conn| conn.operations.get(&ioid))
812 }
813
814 fn evict_oldest_channels(&mut self, count: usize) {
816 let mut oldest: Vec<(ConnectionKey, u32, Instant)> = Vec::new();
817
818 for (conn_key, conn) in &self.connections {
819 for (cid, channel) in &conn.channels_by_cid {
820 oldest.push((conn_key.clone(), *cid, channel.last_seen));
821 }
822 }
823
824 oldest.sort_by_key(|(_, _, t)| *t);
826
827 for (conn_key, cid, _) in oldest.into_iter().take(count) {
829 if let Some(conn) = self.connections.get_mut(&conn_key) {
830 if let Some(channel) = conn.channels_by_cid.remove(&cid) {
831 if let Some(sid) = channel.sid {
832 conn.sid_to_cid.remove(&sid);
833 }
834 self.total_channels = self.total_channels.saturating_sub(1);
835 self.stats.channels_evicted += 1;
836 }
837 }
838 }
839 }
840
841 pub fn cleanup_expired(&mut self) {
843 let ttl = self.config.channel_ttl;
844 let mut expired_count = 0;
845
846 for conn in self.connections.values_mut() {
847 let expired_cids: Vec<u32> = conn
848 .channels_by_cid
849 .iter()
850 .filter(|(_, ch)| ch.is_expired(ttl))
851 .map(|(cid, _)| *cid)
852 .collect();
853
854 for cid in expired_cids {
855 if let Some(channel) = conn.channels_by_cid.remove(&cid) {
856 if let Some(sid) = channel.sid {
857 conn.sid_to_cid.remove(&sid);
858 conn.operations.retain(|_, op| op.sid != sid);
859 }
860 expired_count += 1;
861 }
862 }
863 }
864
865 if expired_count > 0 {
866 self.total_channels = self.total_channels.saturating_sub(expired_count);
867 self.stats.channels_expired += expired_count as u64;
868 debug!("Cleaned up {} expired channels", expired_count);
869 }
870
871 self.connections
873 .retain(|_, conn| !conn.channels_by_cid.is_empty() || !conn.operations.is_empty());
874 }
875
876 pub fn summary(&self) -> String {
878 format!(
879 "PVA State: {} connections, {} channels (created={}, destroyed={}, expired={}, evicted={})",
880 self.connections.len(),
881 self.total_channels,
882 self.stats.channels_created,
883 self.stats.channels_destroyed,
884 self.stats.channels_expired,
885 self.stats.channels_evicted,
886 )
887 }
888
    /// Returns the running count of tracked channels.
    pub fn channel_count(&self) -> usize {
        self.total_channels
    }
893
    /// Returns the number of tracked connections.
    pub fn connection_count(&self) -> usize {
        self.connections.len()
    }
898
899 pub fn connection_snapshots(&self) -> Vec<ConnectionSnapshot> {
900 let mut snapshots = Vec::new();
901 let now = Instant::now();
902 for (conn_key, conn) in &self.connections {
903 let mut update_times = conn.update_times.clone();
904 Self::trim_times(&mut update_times, now);
905 let mut pv_names: Vec<String> = conn
906 .channels_by_cid
907 .values()
908 .map(|ch| ch.pv_name.clone())
909 .collect();
910 pv_names.sort();
911 pv_names.truncate(8);
912 let mut messages: Vec<String> = conn.recent_messages.iter().cloned().collect();
913 if messages.len() > 20 {
914 messages = messages.split_off(messages.len() - 20);
915 }
916 let is_beacon = messages.iter().any(|m| m.starts_with("BEACON "));
917 let is_broadcast = Self::is_broadcast_addr(&conn_key.addr_a)
918 || Self::is_broadcast_addr(&conn_key.addr_b);
919 let mut mid_stream = false;
920 if conn.channels_by_cid.is_empty() && !conn.operations.is_empty() {
921 mid_stream = true;
922 }
923 if conn
924 .channels_by_cid
925 .values()
926 .any(|ch| !ch.fully_established || ch.pv_name.starts_with("<unknown"))
927 {
928 mid_stream = true;
929 }
930
931 snapshots.push(ConnectionSnapshot {
932 addr_a: conn_key.addr_a,
933 addr_b: conn_key.addr_b,
934 channel_count: conn.channels_by_cid.len(),
935 operation_count: conn.operations.len(),
936 last_seen: conn.last_seen.elapsed(),
937 pv_names,
938 updates_per_sec: Self::updates_per_sec(&update_times),
939 recent_messages: messages,
940 mid_stream,
941 is_beacon,
942 is_broadcast,
943 });
944 }
945 snapshots
946 }
947
948 pub fn channel_snapshots(&self) -> Vec<ChannelSnapshot> {
949 let mut snapshots = Vec::new();
950 let now = Instant::now();
951 for (conn_key, conn) in &self.connections {
952 for channel in conn.channels_by_cid.values() {
953 let mut update_times = channel.update_times.clone();
954 Self::trim_times(&mut update_times, now);
955 let mut messages: Vec<String> = channel.recent_messages.iter().cloned().collect();
956 if messages.len() > 20 {
957 messages = messages.split_off(messages.len() - 20);
958 }
959 let is_beacon = messages.iter().any(|m| m.starts_with("BEACON "));
960 let is_broadcast = Self::is_broadcast_addr(&conn_key.addr_a)
961 || Self::is_broadcast_addr(&conn_key.addr_b);
962 snapshots.push(ChannelSnapshot {
963 addr_a: conn_key.addr_a,
964 addr_b: conn_key.addr_b,
965 cid: channel.cid,
966 sid: channel.sid,
967 pv_name: channel.pv_name.clone(),
968 last_seen: channel.last_seen.elapsed(),
969 updates_per_sec: Self::updates_per_sec(&update_times),
970 recent_messages: messages,
971 mid_stream: !channel.fully_established
972 || channel.pv_name.starts_with("<unknown"),
973 is_beacon,
974 is_broadcast,
975 });
976 }
977
978 let mut seen_virtual = HashSet::new();
981 for op in conn.operations.values() {
982 if conn.get_channel_by_sid(op.sid).is_none() {
983 let mut update_times = op.update_times.clone();
984 Self::trim_times(&mut update_times, now);
985 let mut messages: Vec<String> = op.recent_messages.iter().cloned().collect();
986 if messages.len() > 20 {
987 messages = messages.split_off(messages.len() - 20);
988 }
989 let is_beacon = messages.iter().any(|m| m.starts_with("BEACON "));
990 let is_broadcast = Self::is_broadcast_addr(&conn_key.addr_a)
991 || Self::is_broadcast_addr(&conn_key.addr_b);
992 let pv_name = op
993 .pv_name
994 .clone()
995 .unwrap_or_else(|| format!("<unknown:sid={}>", op.sid));
996 if !seen_virtual.insert((op.sid, pv_name.clone())) {
997 continue;
998 }
999 snapshots.push(ChannelSnapshot {
1000 addr_a: conn_key.addr_a,
1001 addr_b: conn_key.addr_b,
1002 cid: 0,
1003 sid: Some(op.sid),
1004 pv_name,
1005 last_seen: op.last_seen.elapsed(),
1006 updates_per_sec: Self::updates_per_sec(&update_times),
1007 recent_messages: messages,
1008 mid_stream: true,
1009 is_beacon,
1010 is_broadcast,
1011 });
1012 }
1013 }
1014 }
1015 snapshots
1016 }
1017}
1018
// Unit tests for the state tracker: channel lifecycle, PV-name resolution
// fallbacks, the search caches and retroactive resolution.
#[cfg(test)]
mod tests {
    use super::*;

    /// Connection key shared by the tests (two IPv4 endpoints).
    fn test_conn_key() -> ConnectionKey {
        ConnectionKey::from_parts("192.168.1.1", 12345, "192.168.1.2", 5075).unwrap()
    }

    /// A request followed by its response makes the PV resolvable by sid.
    #[test]
    fn test_create_channel_flow() {
        let mut tracker = PvaStateTracker::with_defaults();
        let key = test_conn_key();

        tracker.on_create_channel_request(&key, 1, "TEST:PV:VALUE".to_string());
        assert_eq!(tracker.channel_count(), 1);

        tracker.on_create_channel_response(&key, 1, 100);

        let pv_name = tracker.resolve_pv_name(&key, 100, 0);
        assert_eq!(pv_name, Some("TEST:PV:VALUE".to_string()));
    }

    /// Creating more channels than `max_channels` triggers eviction, so the
    /// count never ends above the limit.
    #[test]
    fn test_channel_limit() {
        let config = PvaStateConfig::new(100, 300);
        let mut tracker = PvaStateTracker::new(config);
        let key = test_conn_key();

        for i in 0..150 {
            tracker.on_create_channel_request(&key, i, format!("PV:{}", i));
        }

        assert!(tracker.channel_count() <= 100);
    }

    /// Destroying a channel removes it from the running count.
    #[test]
    fn test_destroy_channel() {
        let mut tracker = PvaStateTracker::with_defaults();
        let key = test_conn_key();

        tracker.on_create_channel_request(&key, 1, "TEST:PV".to_string());
        tracker.on_create_channel_response(&key, 1, 100);
        assert_eq!(tracker.channel_count(), 1);

        tracker.on_destroy_channel(&key, 1, 100);
        assert_eq!(tracker.channel_count(), 0);
    }

    /// Two operations on the same unknown sid collapse into one virtual row.
    #[test]
    fn test_channel_snapshots_dedup_unresolved_sid_rows() {
        let mut tracker = PvaStateTracker::with_defaults();
        let key = test_conn_key();

        tracker.on_op_init_request(&key, 777, 1001, 13);
        tracker.on_op_init_request(&key, 777, 1002, 13);
        tracker.on_op_activity(&key, 777, 1001, 13);
        tracker.on_op_activity(&key, 777, 1002, 13);

        let snapshots = tracker.channel_snapshots();
        assert_eq!(snapshots.len(), 1);
        assert_eq!(snapshots[0].sid, Some(777));
    }

    /// With exactly one channel and no operations, an unknown ioid with
    /// sid 0 resolves to that channel.
    #[test]
    fn test_single_channel_fallback_works_for_simple_connection() {
        let mut tracker = PvaStateTracker::with_defaults();
        let key = test_conn_key();

        tracker.on_create_channel_request(&key, 1, "SIMPLE:PV".to_string());
        tracker.on_create_channel_response(&key, 1, 100);

        let pv = tracker.resolve_pv_name(&key, 0, 99);
        assert_eq!(pv, Some("SIMPLE:PV".to_string()));
    }

    /// Once several operations exist on the connection, placeholder
    /// operations with sid 0 must not be attributed to the one known channel.
    #[test]
    fn test_no_false_attribution_on_multiplexed_connection() {
        let mut tracker = PvaStateTracker::with_defaults();
        let key = test_conn_key();

        tracker.on_create_channel_request(&key, 1, "CAPTURED:PV".to_string());
        tracker.on_create_channel_response(&key, 1, 100);

        tracker.on_op_init_request(&key, 100, 1, 13);
        for ioid in 2..=10 {
            tracker.on_op_activity(&key, 0, ioid, 13);
        }

        let pv1 = tracker.resolve_pv_name(&key, 100, 1);
        assert_eq!(pv1, Some("CAPTURED:PV".to_string()));

        for ioid in 2..=10 {
            let pv = tracker.resolve_pv_name(&key, 0, ioid);
            assert_eq!(
                pv, None,
                "ioid={} should not resolve to the single captured channel",
                ioid
            );
        }
    }

    /// A placeholder created while another operation already exists gets no
    /// PV name, and the single-channel fallback does not apply either.
    #[test]
    fn test_on_op_activity_placeholder_not_created_for_multiplexed() {
        let mut tracker = PvaStateTracker::with_defaults();
        let key = test_conn_key();

        tracker.on_create_channel_request(&key, 1, "KNOWN:PV".to_string());
        tracker.on_create_channel_response(&key, 1, 100);

        tracker.on_op_init_request(&key, 100, 1, 13);

        tracker.on_op_activity(&key, 0, 2, 13);

        let pv = tracker.resolve_pv_name(&key, 0, 2);
        assert_eq!(
            pv, None,
            "placeholder for ioid=2 should not inherit PV from single-channel fallback"
        );
    }

    /// Names cached by `on_search` come back from `resolve_search_cids` in
    /// request order.
    #[test]
    fn test_search_cache_populates_and_resolves() {
        let mut tracker = PvaStateTracker::with_defaults();
        let client_ip: IpAddr = "192.168.1.10".parse().unwrap();

        let pv_requests = vec![
            (100, "MOTOR:X:POSITION".to_string()),
            (101, "MOTOR:Y:POSITION".to_string()),
            (102, "TEMP:SENSOR:1".to_string()),
        ];
        tracker.on_search(&pv_requests, Some(client_ip));

        let resolved = tracker.resolve_search_cids(&[100, 101, 102], Some(client_ip));
        assert_eq!(resolved.len(), 3);
        assert_eq!(resolved[0], (100, "MOTOR:X:POSITION".to_string()));
        assert_eq!(resolved[1], (101, "MOTOR:Y:POSITION".to_string()));
        assert_eq!(resolved[2], (102, "TEMP:SENSOR:1".to_string()));
    }

    /// Cids that were never cached are skipped rather than reported.
    #[test]
    fn test_search_cache_partial_resolve() {
        let mut tracker = PvaStateTracker::with_defaults();
        let client_ip: IpAddr = "192.168.1.10".parse().unwrap();

        let pv_requests = vec![(100, "MOTOR:X:POSITION".to_string())];
        tracker.on_search(&pv_requests, Some(client_ip));

        let resolved = tracker.resolve_search_cids(&[100, 999], Some(client_ip));
        assert_eq!(resolved.len(), 1);
        assert_eq!(resolved[0], (100, "MOTOR:X:POSITION".to_string()));
    }

    /// The same cid used by two clients resolves per client IP.
    #[test]
    fn test_search_cache_scoped_by_ip() {
        let mut tracker = PvaStateTracker::with_defaults();
        let client_a: IpAddr = "192.168.1.10".parse().unwrap();
        let client_b: IpAddr = "192.168.1.20".parse().unwrap();

        tracker.on_search(&[(1, "CLIENT_A:PV".to_string())], Some(client_a));
        tracker.on_search(&[(1, "CLIENT_B:PV".to_string())], Some(client_b));

        let resolved_a = tracker.resolve_search_cids(&[1], Some(client_a));
        assert_eq!(resolved_a.len(), 1);
        assert_eq!(resolved_a[0].1, "CLIENT_A:PV");

        let resolved_b = tracker.resolve_search_cids(&[1], Some(client_b));
        assert_eq!(resolved_b.len(), 1);
        assert_eq!(resolved_b[0].1, "CLIENT_B:PV");
    }

    /// Without a peer IP the cid-only cache is used.
    #[test]
    fn test_search_cache_flat_fallback() {
        let mut tracker = PvaStateTracker::with_defaults();
        let client_ip: IpAddr = "192.168.1.10".parse().unwrap();

        tracker.on_search(&[(42, "SOME:PV:NAME".to_string())], Some(client_ip));

        let resolved = tracker.resolve_search_cids(&[42], None);
        assert_eq!(resolved.len(), 1);
        assert_eq!(resolved[0].1, "SOME:PV:NAME");
    }

    /// A response without a request takes its PV name from the search cache.
    #[test]
    fn test_search_cache_used_by_create_channel_response_fallback() {
        let mut tracker = PvaStateTracker::with_defaults();
        let key = test_conn_key();
        let client_ip: IpAddr = "192.168.1.1".parse().unwrap();

        tracker.on_search(&[(5, "SEARCHED:PV".to_string())], Some(client_ip));

        tracker.on_create_channel_response(&key, 5, 200);

        let pv = tracker.resolve_pv_name(&key, 200, 0);
        assert_eq!(pv, Some("SEARCHED:PV".to_string()));
    }

    /// `search_responses_resolved` counts every successful resolution,
    /// including repeated ones.
    #[test]
    fn test_search_responses_resolved_stat() {
        let mut tracker = PvaStateTracker::with_defaults();
        let client_ip: IpAddr = "192.168.1.10".parse().unwrap();

        tracker.on_search(
            &[(1, "PV:A".to_string()), (2, "PV:B".to_string())],
            Some(client_ip),
        );

        assert_eq!(tracker.stats.search_responses_resolved, 0);

        tracker.resolve_search_cids(&[1, 2], Some(client_ip));
        assert_eq!(tracker.stats.search_responses_resolved, 2);

        tracker.resolve_search_cids(&[1], Some(client_ip));
        assert_eq!(tracker.stats.search_responses_resolved, 3);
    }

    /// Channels created from responses alone start with placeholder names
    /// and are renamed when a later SEARCH supplies the names for their cids.
    #[test]
    fn test_retroactive_resolve_unknown_channels_from_search() {
        let mut tracker = PvaStateTracker::with_defaults();
        let key = test_conn_key();

        tracker.on_create_channel_response(&key, 100, 500);
        tracker.on_create_channel_response(&key, 101, 501);
        tracker.on_create_channel_response(&key, 102, 502);

        assert_eq!(
            tracker.resolve_pv_name(&key, 500, 0),
            Some("<unknown:cid=100>".to_string())
        );
        assert_eq!(
            tracker.resolve_pv_name(&key, 501, 0),
            Some("<unknown:cid=101>".to_string())
        );

        let client_ip: IpAddr = "192.168.1.1".parse().unwrap();
        tracker.on_search(
            &[
                (100, "MOTOR:X:POS".to_string()),
                (101, "MOTOR:Y:POS".to_string()),
                (102, "TEMP:SENSOR:1".to_string()),
            ],
            Some(client_ip),
        );

        assert_eq!(
            tracker.resolve_pv_name(&key, 500, 0),
            Some("MOTOR:X:POS".to_string())
        );
        assert_eq!(
            tracker.resolve_pv_name(&key, 501, 0),
            Some("MOTOR:Y:POS".to_string())
        );
        assert_eq!(
            tracker.resolve_pv_name(&key, 502, 0),
            Some("TEMP:SENSOR:1".to_string())
        );

        assert_eq!(tracker.stats.search_retroactive_resolves, 3);
    }

    /// Retroactive resolution also rewrites the placeholder name stored on
    /// operations of the affected channel.
    #[test]
    fn test_retroactive_resolve_also_updates_operations() {
        let mut tracker = PvaStateTracker::with_defaults();
        let key = test_conn_key();

        tracker.on_create_channel_response(&key, 100, 500);

        tracker.on_op_init_request(&key, 500, 1, 13);
        let pv = tracker.resolve_pv_name(&key, 500, 1);
        assert!(pv.is_some());
        let client_ip: IpAddr = "192.168.1.1".parse().unwrap();
        tracker.on_search(&[(100, "RESOLVED:PV".to_string())], Some(client_ip));

        assert_eq!(
            tracker.resolve_pv_name(&key, 500, 0),
            Some("RESOLVED:PV".to_string())
        );
        let pv = tracker.resolve_pv_name(&key, 500, 1);
        assert_eq!(pv, Some("RESOLVED:PV".to_string()));
    }
}