1use std::collections::HashMap;
7use std::collections::HashSet;
8use std::collections::VecDeque;
9use std::net::{IpAddr, SocketAddr};
10use std::time::{Duration, Instant};
11use tracing::debug;
12
13use crate::spvd_decode::StructureDesc;
14
15#[derive(Debug, Clone)]
17pub struct PvaStateConfig {
18 pub max_channels: usize,
20 pub channel_ttl: Duration,
22 pub max_operations: usize,
24 pub max_update_rate: usize,
26}
27
28impl Default for PvaStateConfig {
29 fn default() -> Self {
30 Self {
31 max_channels: 40_000,
32 channel_ttl: Duration::from_secs(5 * 60), max_operations: 10_000,
34 max_update_rate: 10_000,
35 }
36 }
37}
38
39impl PvaStateConfig {
40 pub fn new(max_channels: usize, ttl_secs: u64) -> Self {
41 Self {
42 max_channels,
43 channel_ttl: Duration::from_secs(ttl_secs),
44 max_operations: 10_000,
45 max_update_rate: 10_000,
46 }
47 }
48
49 pub fn with_max_update_rate(mut self, max_update_rate: usize) -> Self {
50 self.max_update_rate = max_update_rate;
51 self
52 }
53}
54
55#[derive(Debug, Clone, Hash, PartialEq, Eq)]
57pub struct ConnectionKey {
58 pub addr_a: SocketAddr,
60 pub addr_b: SocketAddr,
62}
63
64impl ConnectionKey {
65 pub fn new(addr1: SocketAddr, addr2: SocketAddr) -> Self {
67 if addr1 <= addr2 {
69 Self {
70 addr_a: addr1,
71 addr_b: addr2,
72 }
73 } else {
74 Self {
75 addr_a: addr2,
76 addr_b: addr1,
77 }
78 }
79 }
80
81 pub fn from_parts(ip1: &str, port1: u16, ip2: &str, port2: u16) -> Option<Self> {
84 let addr1: SocketAddr = format!("{}:{}", ip1, port1).parse().ok()?;
85 let addr2: SocketAddr = format!("{}:{}", ip2, port2).parse().ok()?;
86 Some(Self::new(addr1, addr2))
87 }
88}
89
90#[derive(Debug, Clone)]
92pub struct ChannelInfo {
93 pub pv_name: String,
95 pub cid: u32,
97 pub sid: Option<u32>,
99 pub last_seen: Instant,
101 pub fully_established: bool,
103 pub update_times: VecDeque<Instant>,
104 pub recent_messages: VecDeque<String>,
105}
106
107impl ChannelInfo {
108 pub fn new_pending(cid: u32, pv_name: String) -> Self {
109 Self {
110 pv_name,
111 cid,
112 sid: None,
113 last_seen: Instant::now(),
114 fully_established: false,
115 update_times: VecDeque::new(),
116 recent_messages: VecDeque::new(),
117 }
118 }
119
120 pub fn touch(&mut self) {
121 self.last_seen = Instant::now();
122 }
123
124 pub fn is_expired(&self, ttl: Duration) -> bool {
125 self.last_seen.elapsed() > ttl
126 }
127}
128
129#[derive(Debug, Clone)]
131pub struct OperationState {
132 pub sid: u32,
134 pub ioid: u32,
136 pub command: u8,
138 pub pv_name: Option<String>,
140 pub field_desc: Option<StructureDesc>,
142 pub initialized: bool,
144 pub last_seen: Instant,
146 pub update_times: VecDeque<Instant>,
147 pub recent_messages: VecDeque<String>,
148}
149
150impl OperationState {
151 pub fn new(sid: u32, ioid: u32, command: u8, pv_name: Option<String>) -> Self {
152 Self {
153 sid,
154 ioid,
155 command,
156 pv_name,
157 field_desc: None,
158 initialized: false,
159 last_seen: Instant::now(),
160 update_times: VecDeque::new(),
161 recent_messages: VecDeque::new(),
162 }
163 }
164
165 pub fn touch(&mut self) {
166 self.last_seen = Instant::now();
167 }
168}
169
170#[derive(Debug)]
172pub struct ConnectionState {
173 pub channels_by_cid: HashMap<u32, ChannelInfo>,
175 pub sid_to_cid: HashMap<u32, u32>,
177 pub operations: HashMap<u32, OperationState>,
179 pub is_be: bool,
181 pub last_seen: Instant,
183 pub update_times: VecDeque<Instant>,
184 pub recent_messages: VecDeque<String>,
185}
186
187impl ConnectionState {
188 pub fn new() -> Self {
189 Self {
190 channels_by_cid: HashMap::new(),
191 sid_to_cid: HashMap::new(),
192 operations: HashMap::new(),
193 is_be: false, last_seen: Instant::now(),
195 update_times: VecDeque::new(),
196 recent_messages: VecDeque::new(),
197 }
198 }
199
200 pub fn touch(&mut self) {
201 self.last_seen = Instant::now();
202 }
203
204 pub fn get_channel_by_sid(&self, sid: u32) -> Option<&ChannelInfo> {
206 self.sid_to_cid
207 .get(&sid)
208 .and_then(|cid| self.channels_by_cid.get(cid))
209 }
210
211 pub fn get_channel_by_sid_mut(&mut self, sid: u32) -> Option<&mut ChannelInfo> {
213 if let Some(&cid) = self.sid_to_cid.get(&sid) {
214 self.channels_by_cid.get_mut(&cid)
215 } else {
216 None
217 }
218 }
219
220 pub fn get_pv_name_by_sid(&self, sid: u32) -> Option<&str> {
222 self.get_channel_by_sid(sid).map(|ch| ch.pv_name.as_str())
223 }
224
225 pub fn get_pv_name_by_ioid(&self, ioid: u32) -> Option<&str> {
227 self.operations
228 .get(&ioid)
229 .and_then(|op| op.pv_name.as_deref())
230 }
231}
232
233impl Default for ConnectionState {
234 fn default() -> Self {
235 Self::new()
236 }
237}
238
239#[derive(Debug)]
241pub struct PvaStateTracker {
242 config: PvaStateConfig,
244 connections: HashMap<ConnectionKey, ConnectionState>,
246 total_channels: usize,
248 pub stats: PvaStateStats,
250 search_cache: HashMap<(IpAddr, u32), String>,
253 search_cache_flat: HashMap<u32, String>,
255}
256
257#[derive(Debug, Default, Clone)]
259pub struct PvaStateStats {
260 pub channels_created: u64,
261 pub channels_destroyed: u64,
262 pub channels_expired: u64,
263 pub channels_evicted: u64,
264 pub operations_created: u64,
265 pub operations_completed: u64,
266 pub create_channel_requests: u64,
267 pub create_channel_responses: u64,
268 pub search_responses_resolved: u64,
269 pub search_cache_entries: u64,
270 pub search_retroactive_resolves: u64,
271 pub client_messages: u64,
273 pub server_messages: u64,
275}
276
277#[derive(Debug, Clone)]
278pub struct ConnectionSnapshot {
279 pub addr_a: SocketAddr,
280 pub addr_b: SocketAddr,
281 pub channel_count: usize,
282 pub operation_count: usize,
283 pub last_seen: Duration,
284 pub pv_names: Vec<String>,
285 pub updates_per_sec: f64,
286 pub recent_messages: Vec<String>,
287 pub mid_stream: bool,
288 pub is_beacon: bool,
289 pub is_broadcast: bool,
290}
291
292#[derive(Debug, Clone)]
293pub struct ChannelSnapshot {
294 pub addr_a: SocketAddr,
295 pub addr_b: SocketAddr,
296 pub cid: u32,
297 pub sid: Option<u32>,
298 pub pv_name: String,
299 pub last_seen: Duration,
300 pub updates_per_sec: f64,
301 pub recent_messages: Vec<String>,
302 pub mid_stream: bool,
303 pub is_beacon: bool,
304 pub is_broadcast: bool,
305}
306
307impl PvaStateTracker {
308 fn is_broadcast_addr(addr: &SocketAddr) -> bool {
309 match addr.ip() {
310 std::net::IpAddr::V4(v4) => {
311 if v4.is_broadcast() {
312 return true;
313 }
314 v4.octets()[3] == 255
315 }
316 std::net::IpAddr::V6(v6) => {
317 v6.is_multicast()
319 }
320 }
321 }
322 pub fn new(config: PvaStateConfig) -> Self {
323 Self {
324 config,
325 connections: HashMap::new(),
326 total_channels: 0,
327 stats: PvaStateStats::default(),
328 search_cache: HashMap::new(),
329 search_cache_flat: HashMap::new(),
330 }
331 }
332
333 pub fn with_defaults() -> Self {
334 Self::new(PvaStateConfig::default())
335 }
336
337 fn get_or_create_connection(&mut self, key: &ConnectionKey) -> &mut ConnectionState {
339 if !self.connections.contains_key(key) {
340 self.connections.insert(key.clone(), ConnectionState::new());
341 }
342 self.connections.get_mut(key).unwrap()
343 }
344
345 pub fn get_connection(&self, key: &ConnectionKey) -> Option<&ConnectionState> {
347 self.connections.get(key)
348 }
349
350 pub fn get_pv_name_by_sid(&self, conn_key: &ConnectionKey, sid: u32) -> Option<String> {
352 self.connections
353 .get(conn_key)
354 .and_then(|conn| conn.get_pv_name_by_sid(sid))
355 .map(|s| s.to_string())
356 }
357
358 pub fn on_create_channel_request(
361 &mut self,
362 conn_key: &ConnectionKey,
363 cid: u32,
364 pv_name: String,
365 ) {
366 self.stats.create_channel_requests += 1;
367
368 let client_ip = conn_key.addr_a.ip(); self.search_cache.insert((client_ip, cid), pv_name.clone());
372 self.search_cache_flat.insert(cid, pv_name.clone());
373
374 if self.total_channels >= self.config.max_channels {
376 self.evict_oldest_channels(100); }
378
379 let conn = self.get_or_create_connection(conn_key);
380 conn.touch();
381
382 if !conn.channels_by_cid.contains_key(&cid) {
384 conn.channels_by_cid
385 .insert(cid, ChannelInfo::new_pending(cid, pv_name));
386 self.total_channels += 1;
387 self.stats.channels_created += 1;
388 debug!("CREATE_CHANNEL request: cid={}", cid);
389 }
390 }
391
392 pub fn on_create_channel_response(&mut self, conn_key: &ConnectionKey, cid: u32, sid: u32) {
395 self.stats.create_channel_responses += 1;
396
397 let cached_pv_name = self
400 .search_cache
401 .get(&(conn_key.addr_a.ip(), cid))
402 .or_else(|| self.search_cache.get(&(conn_key.addr_b.ip(), cid)))
403 .or_else(|| self.search_cache_flat.get(&cid))
404 .cloned();
405
406 let conn = self.get_or_create_connection(conn_key);
407 conn.touch();
408
409 if let Some(channel) = conn.channels_by_cid.get_mut(&cid) {
410 channel.sid = Some(sid);
411 channel.fully_established = true;
412 channel.touch();
413 conn.sid_to_cid.insert(sid, cid);
414 debug!(
415 "CREATE_CHANNEL response: cid={}, sid={}, pv={}",
416 cid, sid, channel.pv_name
417 );
418 } else {
419 let pv_name = cached_pv_name
421 .unwrap_or_else(|| format!("<unknown:cid={}>", cid));
422 let is_resolved = !pv_name.starts_with("<unknown");
423 debug!(
424 "CREATE_CHANNEL response without request: cid={}, sid={}, resolved={}",
425 cid, sid, is_resolved
426 );
427 let mut channel = ChannelInfo::new_pending(cid, pv_name);
428 channel.sid = Some(sid);
429 channel.fully_established = is_resolved;
430 conn.channels_by_cid.insert(cid, channel);
431 conn.sid_to_cid.insert(sid, cid);
432 self.total_channels += 1;
433 }
434 }
435
436 pub fn on_destroy_channel(&mut self, conn_key: &ConnectionKey, cid: u32, sid: u32) {
438 if let Some(conn) = self.connections.get_mut(conn_key) {
439 conn.touch();
440
441 if conn.channels_by_cid.remove(&cid).is_some() {
443 self.total_channels = self.total_channels.saturating_sub(1);
444 self.stats.channels_destroyed += 1;
445 }
446
447 conn.sid_to_cid.remove(&sid);
449
450 conn.operations.retain(|_, op| op.sid != sid);
452
453 debug!("DESTROY_CHANNEL: cid={}, sid={}", cid, sid);
454 }
455 }
456
457 pub fn on_op_init_request(
460 &mut self,
461 conn_key: &ConnectionKey,
462 sid: u32,
463 ioid: u32,
464 command: u8,
465 ) {
466 let max_ops = self.config.max_operations;
467 let conn = self.get_or_create_connection(conn_key);
468 conn.touch();
469
470 let pv_name = conn.get_pv_name_by_sid(sid).map(|s| s.to_string());
471
472 if conn.operations.len() < max_ops {
473 conn.operations
474 .insert(ioid, OperationState::new(sid, ioid, command, pv_name));
475 self.stats.operations_created += 1;
476 debug!(
477 "Operation INIT: sid={}, ioid={}, cmd={}",
478 sid, ioid, command
479 );
480 }
481 }
482
483 pub fn on_op_init_response(
486 &mut self,
487 conn_key: &ConnectionKey,
488 ioid: u32,
489 field_desc: Option<StructureDesc>,
490 ) {
491 if let Some(conn) = self.connections.get_mut(conn_key) {
492 conn.touch();
493
494 if let Some(op) = conn.operations.get_mut(&ioid) {
495 op.field_desc = field_desc;
496 op.initialized = true;
497 op.touch();
498 debug!("Operation INIT response: ioid={}", ioid);
499 }
500 }
501 }
502
503 pub fn on_op_destroy(&mut self, conn_key: &ConnectionKey, ioid: u32) {
505 if let Some(conn) = self.connections.get_mut(conn_key) {
506 if conn.operations.remove(&ioid).is_some() {
507 self.stats.operations_completed += 1;
508 }
509 }
510 }
511
512 pub fn on_op_activity(&mut self, conn_key: &ConnectionKey, sid: u32, ioid: u32, command: u8) {
516 let max_update_rate = self.config.max_update_rate;
517 let max_ops = self.config.max_operations;
518 let mut created_placeholder = false;
519
520 let conn = self.get_or_create_connection(conn_key);
521 conn.touch();
522
523 Self::record_update(&mut conn.update_times, max_update_rate);
524
525 let mut channel_sid = if sid != 0 { Some(sid) } else { None };
526 if let Some(op) = conn.operations.get_mut(&ioid) {
527 op.touch();
528 Self::record_update(&mut op.update_times, max_update_rate);
529 if channel_sid.is_none() {
530 channel_sid = Some(op.sid);
531 }
532 } else if conn.operations.len() < max_ops {
533 let pv_name = if sid != 0 {
536 conn.get_pv_name_by_sid(sid).map(|s| s.to_string())
537 } else if conn.channels_by_cid.len() == 1 && conn.operations.is_empty() {
538 conn.channels_by_cid.values().next()
543 .map(|ch| ch.pv_name.clone())
544 .filter(|n| !n.starts_with("<unknown"))
545 } else {
546 None
547 };
548 conn.operations.insert(ioid, OperationState::new(sid, ioid, command, pv_name));
549 created_placeholder = true;
550 }
551
552 if let Some(sid_val) = channel_sid {
553 if let Some(channel) = conn.get_channel_by_sid_mut(sid_val) {
554 channel.touch();
555 Self::record_update(&mut channel.update_times, max_update_rate);
556 }
557 }
558
559 if created_placeholder {
561 self.stats.operations_created += 1;
562 debug!("Auto-created placeholder operation for mid-stream traffic: sid={}, ioid={}, cmd={}", sid, ioid, command);
563 }
564 }
565
566 pub fn on_search(&mut self, pv_requests: &[(u32, String)], source_ip: Option<IpAddr>) {
572 let cid_to_pv: HashMap<u32, String> = pv_requests.iter().cloned().collect();
574
575 for (cid, pv_name) in pv_requests {
576 if let Some(ip) = source_ip {
577 self.search_cache.insert((ip, *cid), pv_name.clone());
578 }
579 self.search_cache_flat.insert(*cid, pv_name.clone());
581 }
582
583 let mut retroactive_count: u64 = 0;
587 for conn in self.connections.values_mut() {
588 for (cid, channel) in conn.channels_by_cid.iter_mut() {
589 if channel.pv_name.starts_with("<unknown") {
590 if let Some(pv_name) = cid_to_pv.get(cid) {
591 debug!(
592 "Retroactive PV resolve from SEARCH: cid={} {} -> {}",
593 cid, channel.pv_name, pv_name
594 );
595 channel.pv_name = pv_name.clone();
596 channel.fully_established = true;
597 retroactive_count += 1;
598 }
599 }
600 }
601
602 for op in conn.operations.values_mut() {
605 let needs_update = match &op.pv_name {
606 None => true,
607 Some(name) => name.starts_with("<unknown"),
608 };
609 if needs_update && op.sid != 0 {
610 if let Some(&cid) = conn.sid_to_cid.get(&op.sid) {
611 if let Some(pv_name) = cid_to_pv.get(&cid) {
612 op.pv_name = Some(pv_name.clone());
613 }
614 }
615 }
616 }
617 }
618 if retroactive_count > 0 {
619 self.stats.search_retroactive_resolves += retroactive_count;
620 debug!(
621 "Retroactively resolved {} unknown channels from SEARCH cache",
622 retroactive_count
623 );
624 }
625
626 self.stats.search_cache_entries = self.search_cache_flat.len() as u64;
628
629 while self.search_cache.len() > 50_000 {
631 if let Some(key) = self.search_cache.keys().next().cloned() {
632 self.search_cache.remove(&key);
633 }
634 }
635 while self.search_cache_flat.len() > 50_000 {
636 if let Some(key) = self.search_cache_flat.keys().next().cloned() {
637 self.search_cache_flat.remove(&key);
638 }
639 }
640 }
641
642 pub fn resolve_search_cids(
647 &mut self,
648 cids: &[u32],
649 peer_ip: Option<IpAddr>,
650 ) -> Vec<(u32, String)> {
651 let mut resolved = Vec::new();
652 for &cid in cids {
653 let pv_name = peer_ip
656 .and_then(|ip| self.search_cache.get(&(ip, cid)))
657 .or_else(|| self.search_cache_flat.get(&cid))
658 .cloned();
659 if let Some(name) = pv_name {
660 resolved.push((cid, name));
661 self.stats.search_responses_resolved += 1;
662 }
663 }
664 resolved
665 }
666
667 pub fn count_direction(&mut self, is_server: bool) {
669 if is_server {
670 self.stats.server_messages += 1;
671 } else {
672 self.stats.client_messages += 1;
673 }
674 }
675
676 pub fn on_message(
677 &mut self,
678 conn_key: &ConnectionKey,
679 sid: u32,
680 ioid: u32,
681 request_type: &str,
682 message: String,
683 is_server: bool,
684 ) {
685 let conn = self.get_or_create_connection(conn_key);
686 conn.touch();
687 let dir = if is_server { "S>" } else { "C>" };
688 let full_message = format!("{} {} {}", dir, request_type, message);
689 Self::push_message(&mut conn.recent_messages, full_message.clone());
690
691 let mut channel_sid = if sid != 0 { Some(sid) } else { None };
692 if let Some(op) = conn.operations.get_mut(&ioid) {
693 Self::push_message(&mut op.recent_messages, full_message.clone());
694 if channel_sid.is_none() {
695 channel_sid = Some(op.sid);
696 }
697 }
698 if let Some(sid_val) = channel_sid {
699 if let Some(channel) = conn.get_channel_by_sid_mut(sid_val) {
700 Self::push_message(&mut channel.recent_messages, full_message);
701 }
702 }
703 }
704
705 fn record_update(times: &mut VecDeque<Instant>, max_update_rate: usize) {
706 let now = Instant::now();
707 times.push_back(now);
708 Self::trim_times(times, now);
709 while times.len() > max_update_rate {
710 times.pop_front();
711 }
712 }
713
714 fn trim_times(times: &mut VecDeque<Instant>, now: Instant) {
715 while let Some(front) = times.front() {
716 if now.duration_since(*front) > Duration::from_secs(1) {
717 times.pop_front();
718 } else {
719 break;
720 }
721 }
722 }
723
724 fn updates_per_sec(times: &VecDeque<Instant>) -> f64 {
725 times.len() as f64
726 }
727
728 fn push_message(messages: &mut VecDeque<String>, message: String) {
729 messages.push_back(message);
730 while messages.len() > 30 {
731 messages.pop_front();
732 }
733 }
734
735 pub fn resolve_pv_name(&self, conn_key: &ConnectionKey, sid: u32, ioid: u32) -> Option<String> {
737 let conn = self.connections.get(conn_key)?;
738
739 if let Some(op) = conn.operations.get(&ioid) {
741 if let Some(ref name) = op.pv_name {
742 if !name.starts_with("<unknown") {
743 return Some(name.clone());
744 }
745 }
746 }
747
748 if sid != 0 {
750 if let Some(name) = conn.get_pv_name_by_sid(sid) {
751 return Some(name.to_string());
752 }
753 }
754
755 if conn.channels_by_cid.len() == 1 && conn.operations.len() <= 1 {
765 if let Some(ch) = conn.channels_by_cid.values().next() {
766 if !ch.pv_name.starts_with("<unknown") {
767 return Some(ch.pv_name.clone());
768 }
769 }
770 }
771
772 None
773 }
774
775 pub fn active_channel_count(&self) -> usize {
777 self.total_channels
778 }
779
780 pub fn active_connection_count(&self) -> usize {
782 self.connections.len()
783 }
784
785 pub fn is_connection_mid_stream(&self, conn_key: &ConnectionKey) -> bool {
787 self.connections
788 .get(conn_key)
789 .map(|conn| {
790 if conn.channels_by_cid.is_empty() && !conn.operations.is_empty() {
792 return true;
793 }
794 conn.channels_by_cid.values().any(|ch| !ch.fully_established)
796 })
797 .unwrap_or(false)
798 }
799
800 pub fn get_operation(&self, conn_key: &ConnectionKey, ioid: u32) -> Option<&OperationState> {
802 self.connections
803 .get(conn_key)
804 .and_then(|conn| conn.operations.get(&ioid))
805 }
806
807 fn evict_oldest_channels(&mut self, count: usize) {
809 let mut oldest: Vec<(ConnectionKey, u32, Instant)> = Vec::new();
810
811 for (conn_key, conn) in &self.connections {
812 for (cid, channel) in &conn.channels_by_cid {
813 oldest.push((conn_key.clone(), *cid, channel.last_seen));
814 }
815 }
816
817 oldest.sort_by_key(|(_, _, t)| *t);
819
820 for (conn_key, cid, _) in oldest.into_iter().take(count) {
822 if let Some(conn) = self.connections.get_mut(&conn_key) {
823 if let Some(channel) = conn.channels_by_cid.remove(&cid) {
824 if let Some(sid) = channel.sid {
825 conn.sid_to_cid.remove(&sid);
826 }
827 self.total_channels = self.total_channels.saturating_sub(1);
828 self.stats.channels_evicted += 1;
829 }
830 }
831 }
832 }
833
834 pub fn cleanup_expired(&mut self) {
836 let ttl = self.config.channel_ttl;
837 let mut expired_count = 0;
838
839 for conn in self.connections.values_mut() {
840 let expired_cids: Vec<u32> = conn
841 .channels_by_cid
842 .iter()
843 .filter(|(_, ch)| ch.is_expired(ttl))
844 .map(|(cid, _)| *cid)
845 .collect();
846
847 for cid in expired_cids {
848 if let Some(channel) = conn.channels_by_cid.remove(&cid) {
849 if let Some(sid) = channel.sid {
850 conn.sid_to_cid.remove(&sid);
851 conn.operations.retain(|_, op| op.sid != sid);
852 }
853 expired_count += 1;
854 }
855 }
856 }
857
858 if expired_count > 0 {
859 self.total_channels = self.total_channels.saturating_sub(expired_count);
860 self.stats.channels_expired += expired_count as u64;
861 debug!("Cleaned up {} expired channels", expired_count);
862 }
863
864 self.connections
866 .retain(|_, conn| !conn.channels_by_cid.is_empty() || !conn.operations.is_empty());
867 }
868
869 pub fn summary(&self) -> String {
871 format!(
872 "PVA State: {} connections, {} channels (created={}, destroyed={}, expired={}, evicted={})",
873 self.connections.len(),
874 self.total_channels,
875 self.stats.channels_created,
876 self.stats.channels_destroyed,
877 self.stats.channels_expired,
878 self.stats.channels_evicted,
879 )
880 }
881
882 pub fn channel_count(&self) -> usize {
884 self.total_channels
885 }
886
887 pub fn connection_count(&self) -> usize {
889 self.connections.len()
890 }
891
892 pub fn connection_snapshots(&self) -> Vec<ConnectionSnapshot> {
893 let mut snapshots = Vec::new();
894 let now = Instant::now();
895 for (conn_key, conn) in &self.connections {
896 let mut update_times = conn.update_times.clone();
897 Self::trim_times(&mut update_times, now);
898 let mut pv_names: Vec<String> = conn
899 .channels_by_cid
900 .values()
901 .map(|ch| ch.pv_name.clone())
902 .collect();
903 pv_names.sort();
904 pv_names.truncate(8);
905 let mut messages: Vec<String> = conn.recent_messages.iter().cloned().collect();
906 if messages.len() > 20 {
907 messages = messages.split_off(messages.len() - 20);
908 }
909 let is_beacon = messages.iter().any(|m| m.starts_with("BEACON "));
910 let is_broadcast = Self::is_broadcast_addr(&conn_key.addr_a)
911 || Self::is_broadcast_addr(&conn_key.addr_b);
912 let mut mid_stream = false;
913 if conn.channels_by_cid.is_empty() && !conn.operations.is_empty() {
914 mid_stream = true;
915 }
916 if conn
917 .channels_by_cid
918 .values()
919 .any(|ch| !ch.fully_established || ch.pv_name.starts_with("<unknown"))
920 {
921 mid_stream = true;
922 }
923
924 snapshots.push(ConnectionSnapshot {
925 addr_a: conn_key.addr_a,
926 addr_b: conn_key.addr_b,
927 channel_count: conn.channels_by_cid.len(),
928 operation_count: conn.operations.len(),
929 last_seen: conn.last_seen.elapsed(),
930 pv_names,
931 updates_per_sec: Self::updates_per_sec(&update_times),
932 recent_messages: messages,
933 mid_stream,
934 is_beacon,
935 is_broadcast,
936 });
937 }
938 snapshots
939 }
940
941 pub fn channel_snapshots(&self) -> Vec<ChannelSnapshot> {
942 let mut snapshots = Vec::new();
943 let now = Instant::now();
944 for (conn_key, conn) in &self.connections {
945 for channel in conn.channels_by_cid.values() {
946 let mut update_times = channel.update_times.clone();
947 Self::trim_times(&mut update_times, now);
948 let mut messages: Vec<String> = channel.recent_messages.iter().cloned().collect();
949 if messages.len() > 20 {
950 messages = messages.split_off(messages.len() - 20);
951 }
952 let is_beacon = messages.iter().any(|m| m.starts_with("BEACON "));
953 let is_broadcast = Self::is_broadcast_addr(&conn_key.addr_a)
954 || Self::is_broadcast_addr(&conn_key.addr_b);
955 snapshots.push(ChannelSnapshot {
956 addr_a: conn_key.addr_a,
957 addr_b: conn_key.addr_b,
958 cid: channel.cid,
959 sid: channel.sid,
960 pv_name: channel.pv_name.clone(),
961 last_seen: channel.last_seen.elapsed(),
962 updates_per_sec: Self::updates_per_sec(&update_times),
963 recent_messages: messages,
964 mid_stream: !channel.fully_established
965 || channel.pv_name.starts_with("<unknown"),
966 is_beacon,
967 is_broadcast,
968 });
969 }
970
971 let mut seen_virtual = HashSet::new();
974 for op in conn.operations.values() {
975 if conn.get_channel_by_sid(op.sid).is_none() {
976 let mut update_times = op.update_times.clone();
977 Self::trim_times(&mut update_times, now);
978 let mut messages: Vec<String> = op.recent_messages.iter().cloned().collect();
979 if messages.len() > 20 {
980 messages = messages.split_off(messages.len() - 20);
981 }
982 let is_beacon = messages.iter().any(|m| m.starts_with("BEACON "));
983 let is_broadcast = Self::is_broadcast_addr(&conn_key.addr_a)
984 || Self::is_broadcast_addr(&conn_key.addr_b);
985 let pv_name = op
986 .pv_name
987 .clone()
988 .unwrap_or_else(|| format!("<unknown:sid={}>", op.sid));
989 if !seen_virtual.insert((op.sid, pv_name.clone())) {
990 continue;
991 }
992 snapshots.push(ChannelSnapshot {
993 addr_a: conn_key.addr_a,
994 addr_b: conn_key.addr_b,
995 cid: 0,
996 sid: Some(op.sid),
997 pv_name,
998 last_seen: op.last_seen.elapsed(),
999 updates_per_sec: Self::updates_per_sec(&update_times),
1000 recent_messages: messages,
1001 mid_stream: true,
1002 is_beacon,
1003 is_broadcast,
1004 });
1005 }
1006 }
1007 }
1008 snapshots
1009 }
1010}
1011
1012#[cfg(test)]
1013mod tests {
1014 use super::*;
1015
1016 fn test_conn_key() -> ConnectionKey {
1017 ConnectionKey::from_parts("192.168.1.1", 12345, "192.168.1.2", 5075).unwrap()
1018 }
1019
1020 #[test]
1021 fn test_create_channel_flow() {
1022 let mut tracker = PvaStateTracker::with_defaults();
1023 let key = test_conn_key();
1024
1025 tracker.on_create_channel_request(&key, 1, "TEST:PV:VALUE".to_string());
1027 assert_eq!(tracker.channel_count(), 1);
1028
1029 tracker.on_create_channel_response(&key, 1, 100);
1031
1032 let pv_name = tracker.resolve_pv_name(&key, 100, 0);
1034 assert_eq!(pv_name, Some("TEST:PV:VALUE".to_string()));
1035 }
1036
1037 #[test]
1038 fn test_channel_limit() {
1039 let config = PvaStateConfig::new(100, 300);
1040 let mut tracker = PvaStateTracker::new(config);
1041 let key = test_conn_key();
1042
1043 for i in 0..150 {
1045 tracker.on_create_channel_request(&key, i, format!("PV:{}", i));
1046 }
1047
1048 assert!(tracker.channel_count() <= 100);
1050 }
1051
1052 #[test]
1053 fn test_destroy_channel() {
1054 let mut tracker = PvaStateTracker::with_defaults();
1055 let key = test_conn_key();
1056
1057 tracker.on_create_channel_request(&key, 1, "TEST:PV".to_string());
1058 tracker.on_create_channel_response(&key, 1, 100);
1059 assert_eq!(tracker.channel_count(), 1);
1060
1061 tracker.on_destroy_channel(&key, 1, 100);
1062 assert_eq!(tracker.channel_count(), 0);
1063 }
1064
1065 #[test]
1066 fn test_channel_snapshots_dedup_unresolved_sid_rows() {
1067 let mut tracker = PvaStateTracker::with_defaults();
1068 let key = test_conn_key();
1069
1070 tracker.on_op_init_request(&key, 777, 1001, 13);
1072 tracker.on_op_init_request(&key, 777, 1002, 13);
1073 tracker.on_op_activity(&key, 777, 1001, 13);
1074 tracker.on_op_activity(&key, 777, 1002, 13);
1075
1076 let snapshots = tracker.channel_snapshots();
1077 assert_eq!(snapshots.len(), 1);
1078 assert_eq!(snapshots[0].sid, Some(777));
1079 }
1080
1081 #[test]
1082 fn test_single_channel_fallback_works_for_simple_connection() {
1083 let mut tracker = PvaStateTracker::with_defaults();
1086 let key = test_conn_key();
1087
1088 tracker.on_create_channel_request(&key, 1, "SIMPLE:PV".to_string());
1089 tracker.on_create_channel_response(&key, 1, 100);
1090
1091 let pv = tracker.resolve_pv_name(&key, 0, 99);
1093 assert_eq!(pv, Some("SIMPLE:PV".to_string()));
1094 }
1095
1096 #[test]
1097 fn test_no_false_attribution_on_multiplexed_connection() {
1098 let mut tracker = PvaStateTracker::with_defaults();
1103 let key = test_conn_key();
1104
1105 tracker.on_create_channel_request(&key, 1, "CAPTURED:PV".to_string());
1107 tracker.on_create_channel_response(&key, 1, 100);
1108
1109 tracker.on_op_init_request(&key, 100, 1, 13); for ioid in 2..=10 {
1115 tracker.on_op_activity(&key, 0, ioid, 13);
1116 }
1117
1118 let pv1 = tracker.resolve_pv_name(&key, 100, 1);
1120 assert_eq!(pv1, Some("CAPTURED:PV".to_string()));
1121
1122 for ioid in 2..=10 {
1124 let pv = tracker.resolve_pv_name(&key, 0, ioid);
1125 assert_eq!(pv, None,
1126 "ioid={} should not resolve to the single captured channel", ioid);
1127 }
1128 }
1129
1130 #[test]
1131 fn test_on_op_activity_placeholder_not_created_for_multiplexed() {
1132 let mut tracker = PvaStateTracker::with_defaults();
1136 let key = test_conn_key();
1137
1138 tracker.on_create_channel_request(&key, 1, "KNOWN:PV".to_string());
1139 tracker.on_create_channel_response(&key, 1, 100);
1140
1141 tracker.on_op_init_request(&key, 100, 1, 13);
1143
1144 tracker.on_op_activity(&key, 0, 2, 13);
1146
1147 let pv = tracker.resolve_pv_name(&key, 0, 2);
1148 assert_eq!(pv, None,
1149 "placeholder for ioid=2 should not inherit PV from single-channel fallback");
1150 }
1151
1152 #[test]
1153 fn test_search_cache_populates_and_resolves() {
1154 let mut tracker = PvaStateTracker::with_defaults();
1155 let client_ip: IpAddr = "192.168.1.10".parse().unwrap();
1156
1157 let pv_requests = vec![
1159 (100, "MOTOR:X:POSITION".to_string()),
1160 (101, "MOTOR:Y:POSITION".to_string()),
1161 (102, "TEMP:SENSOR:1".to_string()),
1162 ];
1163 tracker.on_search(& pv_requests, Some(client_ip));
1164
1165 let resolved = tracker.resolve_search_cids(&[100, 101, 102], Some(client_ip));
1167 assert_eq!(resolved.len(), 3);
1168 assert_eq!(resolved[0], (100, "MOTOR:X:POSITION".to_string()));
1169 assert_eq!(resolved[1], (101, "MOTOR:Y:POSITION".to_string()));
1170 assert_eq!(resolved[2], (102, "TEMP:SENSOR:1".to_string()));
1171 }
1172
1173 #[test]
1174 fn test_search_cache_partial_resolve() {
1175 let mut tracker = PvaStateTracker::with_defaults();
1176 let client_ip: IpAddr = "192.168.1.10".parse().unwrap();
1177
1178 let pv_requests = vec![
1179 (100, "MOTOR:X:POSITION".to_string()),
1180 ];
1181 tracker.on_search(&pv_requests, Some(client_ip));
1182
1183 let resolved = tracker.resolve_search_cids(&[100, 999], Some(client_ip));
1185 assert_eq!(resolved.len(), 1);
1186 assert_eq!(resolved[0], (100, "MOTOR:X:POSITION".to_string()));
1187 }
1188
1189 #[test]
1190 fn test_search_cache_scoped_by_ip() {
1191 let mut tracker = PvaStateTracker::with_defaults();
1192 let client_a: IpAddr = "192.168.1.10".parse().unwrap();
1193 let client_b: IpAddr = "192.168.1.20".parse().unwrap();
1194
1195 tracker.on_search(&[(1, "CLIENT_A:PV".to_string())], Some(client_a));
1197 tracker.on_search(&[(1, "CLIENT_B:PV".to_string())], Some(client_b));
1198
1199 let resolved_a = tracker.resolve_search_cids(&[1], Some(client_a));
1201 assert_eq!(resolved_a.len(), 1);
1202 assert_eq!(resolved_a[0].1, "CLIENT_A:PV");
1203
1204 let resolved_b = tracker.resolve_search_cids(&[1], Some(client_b));
1205 assert_eq!(resolved_b.len(), 1);
1206 assert_eq!(resolved_b[0].1, "CLIENT_B:PV");
1207 }
1208
1209 #[test]
1210 fn test_search_cache_flat_fallback() {
1211 let mut tracker = PvaStateTracker::with_defaults();
1212 let client_ip: IpAddr = "192.168.1.10".parse().unwrap();
1213
1214 tracker.on_search(
1216 &[(42, "SOME:PV:NAME".to_string())],
1217 Some(client_ip),
1218 );
1219
1220 let resolved = tracker.resolve_search_cids(&[42], None);
1222 assert_eq!(resolved.len(), 1);
1223 assert_eq!(resolved[0].1, "SOME:PV:NAME");
1224 }
1225
1226 #[test]
1227 fn test_search_cache_used_by_create_channel_response_fallback() {
1228 let mut tracker = PvaStateTracker::with_defaults();
1231 let key = test_conn_key();
1232 let client_ip: IpAddr = "192.168.1.1".parse().unwrap();
1233
1234 tracker.on_search(&[(5, "SEARCHED:PV".to_string())], Some(client_ip));
1236
1237 tracker.on_create_channel_response(&key, 5, 200);
1239
1240 let pv = tracker.resolve_pv_name(&key, 200, 0);
1242 assert_eq!(pv, Some("SEARCHED:PV".to_string()));
1243 }
1244
1245 #[test]
1246 fn test_search_responses_resolved_stat() {
1247 let mut tracker = PvaStateTracker::with_defaults();
1248 let client_ip: IpAddr = "192.168.1.10".parse().unwrap();
1249
1250 tracker.on_search(
1251 &[
1252 (1, "PV:A".to_string()),
1253 (2, "PV:B".to_string()),
1254 ],
1255 Some(client_ip),
1256 );
1257
1258 assert_eq!(tracker.stats.search_responses_resolved, 0);
1259
1260 tracker.resolve_search_cids(&[1, 2], Some(client_ip));
1261 assert_eq!(tracker.stats.search_responses_resolved, 2);
1262
1263 tracker.resolve_search_cids(&[1], Some(client_ip));
1265 assert_eq!(tracker.stats.search_responses_resolved, 3);
1266 }
1267
1268 #[test]
1269 fn test_retroactive_resolve_unknown_channels_from_search() {
1270 let mut tracker = PvaStateTracker::with_defaults();
1275 let key = test_conn_key();
1276
1277 tracker.on_create_channel_response(&key, 100, 500);
1279 tracker.on_create_channel_response(&key, 101, 501);
1280 tracker.on_create_channel_response(&key, 102, 502);
1281
1282 assert_eq!(
1284 tracker.resolve_pv_name(&key, 500, 0),
1285 Some("<unknown:cid=100>".to_string())
1286 );
1287 assert_eq!(
1288 tracker.resolve_pv_name(&key, 501, 0),
1289 Some("<unknown:cid=101>".to_string())
1290 );
1291
1292 let client_ip: IpAddr = "192.168.1.1".parse().unwrap();
1294 tracker.on_search(
1295 &[
1296 (100, "MOTOR:X:POS".to_string()),
1297 (101, "MOTOR:Y:POS".to_string()),
1298 (102, "TEMP:SENSOR:1".to_string()),
1299 ],
1300 Some(client_ip),
1301 );
1302
1303 assert_eq!(
1305 tracker.resolve_pv_name(&key, 500, 0),
1306 Some("MOTOR:X:POS".to_string())
1307 );
1308 assert_eq!(
1309 tracker.resolve_pv_name(&key, 501, 0),
1310 Some("MOTOR:Y:POS".to_string())
1311 );
1312 assert_eq!(
1313 tracker.resolve_pv_name(&key, 502, 0),
1314 Some("TEMP:SENSOR:1".to_string())
1315 );
1316
1317 assert_eq!(tracker.stats.search_retroactive_resolves, 3);
1319 }
1320
1321 #[test]
1322 fn test_retroactive_resolve_also_updates_operations() {
1323 let mut tracker = PvaStateTracker::with_defaults();
1327 let key = test_conn_key();
1328
1329 tracker.on_create_channel_response(&key, 100, 500);
1331
1332 tracker.on_op_init_request(&key, 500, 1, 13); let pv = tracker.resolve_pv_name(&key, 500, 1);
1338 assert!(pv.is_some());
1339 let client_ip: IpAddr = "192.168.1.1".parse().unwrap();
1343 tracker.on_search(
1344 &[(100, "RESOLVED:PV".to_string())],
1345 Some(client_ip),
1346 );
1347
1348 assert_eq!(
1350 tracker.resolve_pv_name(&key, 500, 0),
1351 Some("RESOLVED:PV".to_string())
1352 );
1353 let pv = tracker.resolve_pv_name(&key, 500, 1);
1355 assert_eq!(pv, Some("RESOLVED:PV".to_string()));
1356 }
1357}