Skip to main content

spvirit_codec/
spvirit_state.rs

1//! PVA Connection State Tracker
2//!
3//! Tracks channel mappings (CID ↔ SID ↔ PV name) and operation states
4//! to enable full decoding of MONITOR packets.
5
6use std::collections::HashMap;
7use std::collections::HashSet;
8use std::collections::VecDeque;
9use std::net::{IpAddr, SocketAddr};
10use std::time::{Duration, Instant};
11use tracing::debug;
12
13use crate::spvd_decode::StructureDesc;
14
15/// Configuration for the PVA state tracker
16#[derive(Debug, Clone)]
17pub struct PvaStateConfig {
18    /// Maximum number of channels to track (default: 40000)
19    pub max_channels: usize,
20    /// Time-to-live for channel entries (default: 5 minutes)
21    pub channel_ttl: Duration,
22    /// Maximum number of operations to track per connection
23    pub max_operations: usize,
24    /// Maximum update timestamps kept per connection for rate calculation (default: 10000)
25    pub max_update_rate: usize,
26}
27
28impl Default for PvaStateConfig {
29    fn default() -> Self {
30        Self {
31            max_channels: 40_000,
32            channel_ttl: Duration::from_secs(5 * 60), // 5 minutes
33            max_operations: 10_000,
34            max_update_rate: 10_000,
35        }
36    }
37}
38
39impl PvaStateConfig {
40    pub fn new(max_channels: usize, ttl_secs: u64) -> Self {
41        Self {
42            max_channels,
43            channel_ttl: Duration::from_secs(ttl_secs),
44            max_operations: 10_000,
45            max_update_rate: 10_000,
46        }
47    }
48
49    pub fn with_max_update_rate(mut self, max_update_rate: usize) -> Self {
50        self.max_update_rate = max_update_rate;
51        self
52    }
53}
54
55/// Unique key for a TCP connection (canonical - order independent)
56#[derive(Debug, Clone, Hash, PartialEq, Eq)]
57pub struct ConnectionKey {
58    /// Lower address (lexicographically sorted for consistency)
59    pub addr_a: SocketAddr,
60    /// Higher address
61    pub addr_b: SocketAddr,
62}
63
64impl ConnectionKey {
65    /// Create a canonical connection key (order independent)
66    pub fn new(addr1: SocketAddr, addr2: SocketAddr) -> Self {
67        // Always store in sorted order for consistent hashing
68        if addr1 <= addr2 {
69            Self {
70                addr_a: addr1,
71                addr_b: addr2,
72            }
73        } else {
74            Self {
75                addr_a: addr2,
76                addr_b: addr1,
77            }
78        }
79    }
80
81    /// Create from IP strings and ports (convenience method)
82    /// Order of arguments doesn't matter - will be canonicalized
83    pub fn from_parts(ip1: &str, port1: u16, ip2: &str, port2: u16) -> Option<Self> {
84        let addr1: SocketAddr = format!("{}:{}", ip1, port1).parse().ok()?;
85        let addr2: SocketAddr = format!("{}:{}", ip2, port2).parse().ok()?;
86        Some(Self::new(addr1, addr2))
87    }
88}
89
90/// Information about a channel (PV)
91#[derive(Debug, Clone)]
92pub struct ChannelInfo {
93    /// PV name
94    pub pv_name: String,
95    /// Client Channel ID
96    pub cid: u32,
97    /// Server Channel ID (assigned by server in CREATE_CHANNEL response)
98    pub sid: Option<u32>,
99    /// When this channel was created/last accessed
100    pub last_seen: Instant,
101    /// Whether we saw the full CREATE_CHANNEL exchange
102    pub fully_established: bool,
103    pub update_times: VecDeque<Instant>,
104    pub recent_messages: VecDeque<String>,
105}
106
107impl ChannelInfo {
108    pub fn new_pending(cid: u32, pv_name: String) -> Self {
109        Self {
110            pv_name,
111            cid,
112            sid: None,
113            last_seen: Instant::now(),
114            fully_established: false,
115            update_times: VecDeque::new(),
116            recent_messages: VecDeque::new(),
117        }
118    }
119
120    pub fn touch(&mut self) {
121        self.last_seen = Instant::now();
122    }
123
124    pub fn is_expired(&self, ttl: Duration) -> bool {
125        self.last_seen.elapsed() > ttl
126    }
127}
128
129/// State for an active operation (GET/PUT/MONITOR etc.)
130#[derive(Debug, Clone)]
131pub struct OperationState {
132    /// Server channel ID this operation is on
133    pub sid: u32,
134    /// Operation ID
135    pub ioid: u32,
136    /// Command type (10=GET, 11=PUT, 13=MONITOR, etc.)
137    pub command: u8,
138    /// PV name (resolved from channel state)
139    pub pv_name: Option<String>,
140    /// Field description from INIT response (parsed introspection)
141    pub field_desc: Option<StructureDesc>,
142    /// Whether INIT phase completed
143    pub initialized: bool,
144    /// Last activity
145    pub last_seen: Instant,
146    pub update_times: VecDeque<Instant>,
147    pub recent_messages: VecDeque<String>,
148}
149
150impl OperationState {
151    pub fn new(sid: u32, ioid: u32, command: u8, pv_name: Option<String>) -> Self {
152        Self {
153            sid,
154            ioid,
155            command,
156            pv_name,
157            field_desc: None,
158            initialized: false,
159            last_seen: Instant::now(),
160            update_times: VecDeque::new(),
161            recent_messages: VecDeque::new(),
162        }
163    }
164
165    pub fn touch(&mut self) {
166        self.last_seen = Instant::now();
167    }
168}
169
170/// Per-connection state
171#[derive(Debug)]
172pub struct ConnectionState {
173    /// Channels indexed by Client ID
174    pub channels_by_cid: HashMap<u32, ChannelInfo>,
175    /// Server ID → Client ID mapping
176    pub sid_to_cid: HashMap<u32, u32>,
177    /// Operations indexed by IOID
178    pub operations: HashMap<u32, OperationState>,
179    /// Byte order for this connection (true = big endian)
180    pub is_be: bool,
181    /// Last activity on this connection
182    pub last_seen: Instant,
183    pub update_times: VecDeque<Instant>,
184    pub recent_messages: VecDeque<String>,
185}
186
187impl ConnectionState {
188    pub fn new() -> Self {
189        Self {
190            channels_by_cid: HashMap::new(),
191            sid_to_cid: HashMap::new(),
192            operations: HashMap::new(),
193            is_be: false, // Default to little endian
194            last_seen: Instant::now(),
195            update_times: VecDeque::new(),
196            recent_messages: VecDeque::new(),
197        }
198    }
199
200    pub fn touch(&mut self) {
201        self.last_seen = Instant::now();
202    }
203
204    /// Get channel info by Server ID
205    pub fn get_channel_by_sid(&self, sid: u32) -> Option<&ChannelInfo> {
206        self.sid_to_cid
207            .get(&sid)
208            .and_then(|cid| self.channels_by_cid.get(cid))
209    }
210
211    /// Get mutable channel info by Server ID
212    pub fn get_channel_by_sid_mut(&mut self, sid: u32) -> Option<&mut ChannelInfo> {
213        if let Some(&cid) = self.sid_to_cid.get(&sid) {
214            self.channels_by_cid.get_mut(&cid)
215        } else {
216            None
217        }
218    }
219
220    /// Get PV name for a Server ID
221    pub fn get_pv_name_by_sid(&self, sid: u32) -> Option<&str> {
222        self.get_channel_by_sid(sid).map(|ch| ch.pv_name.as_str())
223    }
224
225    /// Get PV name for an operation IOID
226    pub fn get_pv_name_by_ioid(&self, ioid: u32) -> Option<&str> {
227        self.operations
228            .get(&ioid)
229            .and_then(|op| op.pv_name.as_deref())
230    }
231}
232
233impl Default for ConnectionState {
234    fn default() -> Self {
235        Self::new()
236    }
237}
238
239/// Global PVA state tracker across all connections
240#[derive(Debug)]
241pub struct PvaStateTracker {
242    /// Configuration
243    config: PvaStateConfig,
244    /// Per-connection state
245    connections: HashMap<ConnectionKey, ConnectionState>,
246    /// Total channel count across all connections (for limit enforcement)
247    total_channels: usize,
248    /// Statistics
249    pub stats: PvaStateStats,
250    /// (client_ip, CID) → PV name cache from SEARCH messages
251    /// Scoped by client IP to prevent CID collisions across different clients
252    search_cache: HashMap<(IpAddr, u32), String>,
253    /// Flat CID → PV name fallback (last-writer-wins, used when client IP is unknown)
254    search_cache_flat: HashMap<u32, String>,
255}
256
257/// Statistics for monitoring
258#[derive(Debug, Default, Clone)]
259pub struct PvaStateStats {
260    pub channels_created: u64,
261    pub channels_destroyed: u64,
262    pub channels_expired: u64,
263    pub channels_evicted: u64,
264    pub operations_created: u64,
265    pub operations_completed: u64,
266    pub create_channel_requests: u64,
267    pub create_channel_responses: u64,
268    pub search_responses_resolved: u64,
269    pub search_cache_entries: u64,
270    pub search_retroactive_resolves: u64,
271    /// PVA messages with is_server=false (sent by client)
272    pub client_messages: u64,
273    /// PVA messages with is_server=true (sent by server)
274    pub server_messages: u64,
275}
276
277#[derive(Debug, Clone)]
278pub struct ConnectionSnapshot {
279    pub addr_a: SocketAddr,
280    pub addr_b: SocketAddr,
281    pub channel_count: usize,
282    pub operation_count: usize,
283    pub last_seen: Duration,
284    pub pv_names: Vec<String>,
285    pub updates_per_sec: f64,
286    pub recent_messages: Vec<String>,
287    pub mid_stream: bool,
288    pub is_beacon: bool,
289    pub is_broadcast: bool,
290}
291
292#[derive(Debug, Clone)]
293pub struct ChannelSnapshot {
294    pub addr_a: SocketAddr,
295    pub addr_b: SocketAddr,
296    pub cid: u32,
297    pub sid: Option<u32>,
298    pub pv_name: String,
299    pub last_seen: Duration,
300    pub updates_per_sec: f64,
301    pub recent_messages: Vec<String>,
302    pub mid_stream: bool,
303    pub is_beacon: bool,
304    pub is_broadcast: bool,
305}
306
307impl PvaStateTracker {
308    fn is_broadcast_addr(addr: &SocketAddr) -> bool {
309        match addr.ip() {
310            std::net::IpAddr::V4(v4) => {
311                if v4.is_broadcast() {
312                    return true;
313                }
314                v4.octets()[3] == 255
315            }
316            std::net::IpAddr::V6(v6) => {
317                // IPv6 has no broadcast; treat multicast as equivalent for PVA
318                v6.is_multicast()
319            }
320        }
321    }
322    pub fn new(config: PvaStateConfig) -> Self {
323        Self {
324            config,
325            connections: HashMap::new(),
326            total_channels: 0,
327            stats: PvaStateStats::default(),
328            search_cache: HashMap::new(),
329            search_cache_flat: HashMap::new(),
330        }
331    }
332
333    pub fn with_defaults() -> Self {
334        Self::new(PvaStateConfig::default())
335    }
336
337    /// Get or create connection state
338    fn get_or_create_connection(&mut self, key: &ConnectionKey) -> &mut ConnectionState {
339        if !self.connections.contains_key(key) {
340            self.connections.insert(key.clone(), ConnectionState::new());
341        }
342        self.connections.get_mut(key).unwrap()
343    }
344
345    /// Get connection state (read-only)
346    pub fn get_connection(&self, key: &ConnectionKey) -> Option<&ConnectionState> {
347        self.connections.get(key)
348    }
349
350    /// Get PV name by SID for a connection
351    pub fn get_pv_name_by_sid(&self, conn_key: &ConnectionKey, sid: u32) -> Option<String> {
352        self.connections
353            .get(conn_key)
354            .and_then(|conn| conn.get_pv_name_by_sid(sid))
355            .map(|s| s.to_string())
356    }
357
358    /// Handle CREATE_CHANNEL request (client → server)
359    /// Called when we see cmd=7 from client with CID and PV name
360    pub fn on_create_channel_request(
361        &mut self,
362        conn_key: &ConnectionKey,
363        cid: u32,
364        pv_name: String,
365    ) {
366        self.stats.create_channel_requests += 1;
367
368        // Also cache in search_cache so it's available as fallback
369        // Extract client IP from connection key (client is the one sending the request)
370        let client_ip = conn_key.addr_a.ip(); // either side works as flat fallback
371        self.search_cache.insert((client_ip, cid), pv_name.clone());
372        self.search_cache_flat.insert(cid, pv_name.clone());
373
374        // Check channel limit
375        if self.total_channels >= self.config.max_channels {
376            self.evict_oldest_channels(100); // Evict 100 oldest
377        }
378
379        let conn = self.get_or_create_connection(conn_key);
380        conn.touch();
381
382        // Only add if not already present
383        if !conn.channels_by_cid.contains_key(&cid) {
384            conn.channels_by_cid
385                .insert(cid, ChannelInfo::new_pending(cid, pv_name));
386            self.total_channels += 1;
387            self.stats.channels_created += 1;
388            debug!("CREATE_CHANNEL request: cid={}", cid);
389        }
390    }
391
392    /// Handle CREATE_CHANNEL response (server → client)
393    /// Called when we see cmd=7 from server with CID and SID
394    pub fn on_create_channel_response(&mut self, conn_key: &ConnectionKey, cid: u32, sid: u32) {
395        self.stats.create_channel_responses += 1;
396
397        // Look up search cache BEFORE borrowing self mutably via get_or_create_connection
398        // Try scoped cache first (both sides of the connection key), then flat fallback
399        let cached_pv_name = self
400            .search_cache
401            .get(&(conn_key.addr_a.ip(), cid))
402            .or_else(|| self.search_cache.get(&(conn_key.addr_b.ip(), cid)))
403            .or_else(|| self.search_cache_flat.get(&cid))
404            .cloned();
405
406        let conn = self.get_or_create_connection(conn_key);
407        conn.touch();
408
409        if let Some(channel) = conn.channels_by_cid.get_mut(&cid) {
410            channel.sid = Some(sid);
411            channel.fully_established = true;
412            channel.touch();
413            conn.sid_to_cid.insert(sid, cid);
414            debug!(
415                "CREATE_CHANNEL response: cid={}, sid={}, pv={}",
416                cid, sid, channel.pv_name
417            );
418        } else {
419            // We missed the request - try search cache first, then create placeholder
420            let pv_name = cached_pv_name.unwrap_or_else(|| format!("<unknown:cid={}>", cid));
421            let is_resolved = !pv_name.starts_with("<unknown");
422            debug!(
423                "CREATE_CHANNEL response without request: cid={}, sid={}, resolved={}",
424                cid, sid, is_resolved
425            );
426            let mut channel = ChannelInfo::new_pending(cid, pv_name);
427            channel.sid = Some(sid);
428            channel.fully_established = is_resolved;
429            conn.channels_by_cid.insert(cid, channel);
430            conn.sid_to_cid.insert(sid, cid);
431            self.total_channels += 1;
432        }
433    }
434
435    /// Handle DESTROY_CHANNEL (cmd=8)
436    pub fn on_destroy_channel(&mut self, conn_key: &ConnectionKey, cid: u32, sid: u32) {
437        if let Some(conn) = self.connections.get_mut(conn_key) {
438            conn.touch();
439
440            // Remove by CID
441            if conn.channels_by_cid.remove(&cid).is_some() {
442                self.total_channels = self.total_channels.saturating_sub(1);
443                self.stats.channels_destroyed += 1;
444            }
445
446            // Remove SID mapping
447            conn.sid_to_cid.remove(&sid);
448
449            // Remove any operations on this channel
450            conn.operations.retain(|_, op| op.sid != sid);
451
452            debug!("DESTROY_CHANNEL: cid={}, sid={}", cid, sid);
453        }
454    }
455
456    /// Handle operation INIT request (client → server)
457    /// subcmd & 0x08 indicates INIT
458    pub fn on_op_init_request(
459        &mut self,
460        conn_key: &ConnectionKey,
461        sid: u32,
462        ioid: u32,
463        command: u8,
464    ) {
465        let max_ops = self.config.max_operations;
466        let conn = self.get_or_create_connection(conn_key);
467        conn.touch();
468
469        let pv_name = conn.get_pv_name_by_sid(sid).map(|s| s.to_string());
470
471        if conn.operations.len() < max_ops {
472            conn.operations
473                .insert(ioid, OperationState::new(sid, ioid, command, pv_name));
474            self.stats.operations_created += 1;
475            debug!(
476                "Operation INIT: sid={}, ioid={}, cmd={}",
477                sid, ioid, command
478            );
479        }
480    }
481
482    /// Handle operation INIT response (server → client)
483    /// Contains type introspection data
484    pub fn on_op_init_response(
485        &mut self,
486        conn_key: &ConnectionKey,
487        ioid: u32,
488        field_desc: Option<StructureDesc>,
489    ) {
490        if let Some(conn) = self.connections.get_mut(conn_key) {
491            conn.touch();
492
493            if let Some(op) = conn.operations.get_mut(&ioid) {
494                op.field_desc = field_desc;
495                op.initialized = true;
496                op.touch();
497                debug!("Operation INIT response: ioid={}", ioid);
498            }
499        }
500    }
501
502    /// Handle operation DESTROY (subcmd & 0x10)
503    pub fn on_op_destroy(&mut self, conn_key: &ConnectionKey, ioid: u32) {
504        if let Some(conn) = self.connections.get_mut(conn_key) {
505            if conn.operations.remove(&ioid).is_some() {
506                self.stats.operations_completed += 1;
507            }
508        }
509    }
510
511    /// Touch connection, operation, and channel activity for any op message (data updates, etc.)
512    /// If the IOID is unknown (mid-stream join), auto-creates a placeholder operation
513    /// so the connection appears on the Connections page.
514    pub fn on_op_activity(&mut self, conn_key: &ConnectionKey, sid: u32, ioid: u32, command: u8) {
515        let max_update_rate = self.config.max_update_rate;
516        let max_ops = self.config.max_operations;
517        let mut created_placeholder = false;
518
519        let conn = self.get_or_create_connection(conn_key);
520        conn.touch();
521
522        Self::record_update(&mut conn.update_times, max_update_rate);
523
524        let mut channel_sid = if sid != 0 { Some(sid) } else { None };
525        if let Some(op) = conn.operations.get_mut(&ioid) {
526            op.touch();
527            Self::record_update(&mut op.update_times, max_update_rate);
528            if channel_sid.is_none() {
529                channel_sid = Some(op.sid);
530            }
531        } else if conn.operations.len() < max_ops {
532            // Mid-stream: we missed the INIT exchange, create a placeholder operation
533            // so this connection/channel is visible on the Connections page.
534            let pv_name = if sid != 0 {
535                conn.get_pv_name_by_sid(sid).map(|s| s.to_string())
536            } else if conn.channels_by_cid.len() == 1 && conn.operations.is_empty() {
537                // Server Op messages have sid=0; only use single-channel fallback
538                // when this is the very first operation (no other ops yet).
539                // If there are already other operations, this is likely a
540                // multiplexed connection and the fallback would be wrong.
541                conn.channels_by_cid
542                    .values()
543                    .next()
544                    .map(|ch| ch.pv_name.clone())
545                    .filter(|n| !n.starts_with("<unknown"))
546            } else {
547                None
548            };
549            conn.operations
550                .insert(ioid, OperationState::new(sid, ioid, command, pv_name));
551            created_placeholder = true;
552        }
553
554        if let Some(sid_val) = channel_sid {
555            if let Some(channel) = conn.get_channel_by_sid_mut(sid_val) {
556                channel.touch();
557                Self::record_update(&mut channel.update_times, max_update_rate);
558            }
559        }
560
561        // Deferred stat update — can't touch self.stats while conn borrows self
562        if created_placeholder {
563            self.stats.operations_created += 1;
564            debug!(
565                "Auto-created placeholder operation for mid-stream traffic: sid={}, ioid={}, cmd={}",
566                sid, ioid, command
567            );
568        }
569    }
570
571    /// Cache PV name mappings from SEARCH messages (CID → PV name)
572    /// These serve as fallback when the client's CREATE_CHANNEL request is missed.
573    /// Also retroactively resolves any existing `<unknown:cid=N>` channels and
574    /// placeholder operations that match the CIDs in this SEARCH.
575    /// `source_ip` is the IP of the client that sent the SEARCH request.
576    pub fn on_search(&mut self, pv_requests: &[(u32, String)], source_ip: Option<IpAddr>) {
577        // Build a lookup map for this batch
578        let cid_to_pv: HashMap<u32, String> = pv_requests.iter().cloned().collect();
579
580        for (cid, pv_name) in pv_requests {
581            if let Some(ip) = source_ip {
582                self.search_cache.insert((ip, *cid), pv_name.clone());
583            }
584            // Always populate flat fallback
585            self.search_cache_flat.insert(*cid, pv_name.clone());
586        }
587
588        // Retroactively resolve existing unknown channels and operations.
589        // Walk all connections and fix any <unknown:cid=N> entries whose CID
590        // matches a CID from this SEARCH request.
591        let mut retroactive_count: u64 = 0;
592        for conn in self.connections.values_mut() {
593            for (cid, channel) in conn.channels_by_cid.iter_mut() {
594                if channel.pv_name.starts_with("<unknown") {
595                    if let Some(pv_name) = cid_to_pv.get(cid) {
596                        debug!(
597                            "Retroactive PV resolve from SEARCH: cid={} {} -> {}",
598                            cid, channel.pv_name, pv_name
599                        );
600                        channel.pv_name = pv_name.clone();
601                        channel.fully_established = true;
602                        retroactive_count += 1;
603                    }
604                }
605            }
606
607            // Also update placeholder operations that have pv_name=None
608            // or stale <unknown...> names, and whose SID maps to a resolved channel
609            for op in conn.operations.values_mut() {
610                let needs_update = match &op.pv_name {
611                    None => true,
612                    Some(name) => name.starts_with("<unknown"),
613                };
614                if needs_update && op.sid != 0 {
615                    if let Some(&cid) = conn.sid_to_cid.get(&op.sid) {
616                        if let Some(pv_name) = cid_to_pv.get(&cid) {
617                            op.pv_name = Some(pv_name.clone());
618                        }
619                    }
620                }
621            }
622        }
623        if retroactive_count > 0 {
624            self.stats.search_retroactive_resolves += retroactive_count;
625            debug!(
626                "Retroactively resolved {} unknown channels from SEARCH cache",
627                retroactive_count
628            );
629        }
630
631        // Update search cache size stat
632        self.stats.search_cache_entries = self.search_cache_flat.len() as u64;
633
634        // Cap cache sizes to prevent unbounded growth
635        while self.search_cache.len() > 50_000 {
636            if let Some(key) = self.search_cache.keys().next().cloned() {
637                self.search_cache.remove(&key);
638            }
639        }
640        while self.search_cache_flat.len() > 50_000 {
641            if let Some(key) = self.search_cache_flat.keys().next().cloned() {
642                self.search_cache_flat.remove(&key);
643            }
644        }
645    }
646
647    /// Resolve PV names from SEARCH_RESPONSE CIDs using the search cache.
648    /// Returns a list of (CID, resolved_pv_name) pairs for all CIDs that could be resolved.
649    /// `source_ip` is optionally the IP of the server that sent the response;
650    /// we try scoped lookups using peer IPs, then fall back to flat cache.
651    pub fn resolve_search_cids(
652        &mut self,
653        cids: &[u32],
654        peer_ip: Option<IpAddr>,
655    ) -> Vec<(u32, String)> {
656        let mut resolved = Vec::new();
657        for &cid in cids {
658            // Try scoped cache with peer IP (the client that originally searched),
659            // then fall back to flat cache
660            let pv_name = peer_ip
661                .and_then(|ip| self.search_cache.get(&(ip, cid)))
662                .or_else(|| self.search_cache_flat.get(&cid))
663                .cloned();
664            if let Some(name) = pv_name {
665                resolved.push((cid, name));
666                self.stats.search_responses_resolved += 1;
667            }
668        }
669        resolved
670    }
671
672    /// Count a PVA message direction (for messages not routed through on_message)
673    pub fn count_direction(&mut self, is_server: bool) {
674        if is_server {
675            self.stats.server_messages += 1;
676        } else {
677            self.stats.client_messages += 1;
678        }
679    }
680
681    pub fn on_message(
682        &mut self,
683        conn_key: &ConnectionKey,
684        sid: u32,
685        ioid: u32,
686        request_type: &str,
687        message: String,
688        is_server: bool,
689    ) {
690        let conn = self.get_or_create_connection(conn_key);
691        conn.touch();
692        let dir = if is_server { "S>" } else { "C>" };
693        let full_message = format!("{} {} {}", dir, request_type, message);
694        Self::push_message(&mut conn.recent_messages, full_message.clone());
695
696        let mut channel_sid = if sid != 0 { Some(sid) } else { None };
697        if let Some(op) = conn.operations.get_mut(&ioid) {
698            Self::push_message(&mut op.recent_messages, full_message.clone());
699            if channel_sid.is_none() {
700                channel_sid = Some(op.sid);
701            }
702        }
703        if let Some(sid_val) = channel_sid {
704            if let Some(channel) = conn.get_channel_by_sid_mut(sid_val) {
705                Self::push_message(&mut channel.recent_messages, full_message);
706            }
707        }
708    }
709
710    fn record_update(times: &mut VecDeque<Instant>, max_update_rate: usize) {
711        let now = Instant::now();
712        times.push_back(now);
713        Self::trim_times(times, now);
714        while times.len() > max_update_rate {
715            times.pop_front();
716        }
717    }
718
719    fn trim_times(times: &mut VecDeque<Instant>, now: Instant) {
720        while let Some(front) = times.front() {
721            if now.duration_since(*front) > Duration::from_secs(1) {
722                times.pop_front();
723            } else {
724                break;
725            }
726        }
727    }
728
729    fn updates_per_sec(times: &VecDeque<Instant>) -> f64 {
730        times.len() as f64
731    }
732
733    fn push_message(messages: &mut VecDeque<String>, message: String) {
734        messages.push_back(message);
735        while messages.len() > 30 {
736            messages.pop_front();
737        }
738    }
739
740    /// Resolve PV name for a MONITOR/GET/PUT packet
741    pub fn resolve_pv_name(&self, conn_key: &ConnectionKey, sid: u32, ioid: u32) -> Option<String> {
742        let conn = self.connections.get(conn_key)?;
743
744        // First try by IOID (operation state) - works for server responses
745        if let Some(op) = conn.operations.get(&ioid) {
746            if let Some(ref name) = op.pv_name {
747                if !name.starts_with("<unknown") {
748                    return Some(name.clone());
749                }
750            }
751        }
752
753        // Fall back to SID lookup - works for client requests
754        if sid != 0 {
755            if let Some(name) = conn.get_pv_name_by_sid(sid) {
756                return Some(name.to_string());
757            }
758        }
759
760        // Last resort: if there's exactly one channel AND at most one operation,
761        // use that channel's PV name. This handles simple single-PV connections
762        // where the server Op message has sid_or_cid=0.
763        //
764        // IMPORTANT: Do NOT use this fallback when there are multiple operations,
765        // because PVA multiplexes many channels over one TCP connection (e.g.
766        // Phoebus). If we only captured one CREATE_CHANNEL but there are many
767        // ops, the other ops likely belong to different PVs that were established
768        // before our capture started.
769        if conn.channels_by_cid.len() == 1 && conn.operations.len() <= 1 {
770            if let Some(ch) = conn.channels_by_cid.values().next() {
771                if !ch.pv_name.starts_with("<unknown") {
772                    return Some(ch.pv_name.clone());
773                }
774            }
775        }
776
777        None
778    }
779
780    /// Get the number of active tracked channels
781    pub fn active_channel_count(&self) -> usize {
782        self.total_channels
783    }
784
785    /// Get the number of active tracked connections
786    pub fn active_connection_count(&self) -> usize {
787        self.connections.len()
788    }
789
790    /// Check if a connection is mid-stream (incomplete channel state)
791    pub fn is_connection_mid_stream(&self, conn_key: &ConnectionKey) -> bool {
792        self.connections
793            .get(conn_key)
794            .map(|conn| {
795                // Operations exist but no channels tracked → definitely mid-stream
796                if conn.channels_by_cid.is_empty() && !conn.operations.is_empty() {
797                    return true;
798                }
799                // Any channel not fully established → mid-stream
800                conn.channels_by_cid
801                    .values()
802                    .any(|ch| !ch.fully_established)
803            })
804            .unwrap_or(false)
805    }
806
807    /// Get operation state for decoding values
808    pub fn get_operation(&self, conn_key: &ConnectionKey, ioid: u32) -> Option<&OperationState> {
809        self.connections
810            .get(conn_key)
811            .and_then(|conn| conn.operations.get(&ioid))
812    }
813
814    /// Evict oldest channels when at capacity
815    fn evict_oldest_channels(&mut self, count: usize) {
816        let mut oldest: Vec<(ConnectionKey, u32, Instant)> = Vec::new();
817
818        for (conn_key, conn) in &self.connections {
819            for (cid, channel) in &conn.channels_by_cid {
820                oldest.push((conn_key.clone(), *cid, channel.last_seen));
821            }
822        }
823
824        // Sort by last_seen (oldest first)
825        oldest.sort_by_key(|(_, _, t)| *t);
826
827        // Remove oldest
828        for (conn_key, cid, _) in oldest.into_iter().take(count) {
829            if let Some(conn) = self.connections.get_mut(&conn_key) {
830                if let Some(channel) = conn.channels_by_cid.remove(&cid) {
831                    if let Some(sid) = channel.sid {
832                        conn.sid_to_cid.remove(&sid);
833                    }
834                    self.total_channels = self.total_channels.saturating_sub(1);
835                    self.stats.channels_evicted += 1;
836                }
837            }
838        }
839    }
840
841    /// Periodic cleanup of expired entries
842    pub fn cleanup_expired(&mut self) {
843        let ttl = self.config.channel_ttl;
844        let mut expired_count = 0;
845
846        for conn in self.connections.values_mut() {
847            let expired_cids: Vec<u32> = conn
848                .channels_by_cid
849                .iter()
850                .filter(|(_, ch)| ch.is_expired(ttl))
851                .map(|(cid, _)| *cid)
852                .collect();
853
854            for cid in expired_cids {
855                if let Some(channel) = conn.channels_by_cid.remove(&cid) {
856                    if let Some(sid) = channel.sid {
857                        conn.sid_to_cid.remove(&sid);
858                        conn.operations.retain(|_, op| op.sid != sid);
859                    }
860                    expired_count += 1;
861                }
862            }
863        }
864
865        if expired_count > 0 {
866            self.total_channels = self.total_channels.saturating_sub(expired_count);
867            self.stats.channels_expired += expired_count as u64;
868            debug!("Cleaned up {} expired channels", expired_count);
869        }
870
871        // Remove empty connections
872        self.connections
873            .retain(|_, conn| !conn.channels_by_cid.is_empty() || !conn.operations.is_empty());
874    }
875
876    /// Get summary statistics
877    pub fn summary(&self) -> String {
878        format!(
879            "PVA State: {} connections, {} channels (created={}, destroyed={}, expired={}, evicted={})",
880            self.connections.len(),
881            self.total_channels,
882            self.stats.channels_created,
883            self.stats.channels_destroyed,
884            self.stats.channels_expired,
885            self.stats.channels_evicted,
886        )
887    }
888
889    /// Get current channel count
890    pub fn channel_count(&self) -> usize {
891        self.total_channels
892    }
893
894    /// Get current connection count
895    pub fn connection_count(&self) -> usize {
896        self.connections.len()
897    }
898
899    pub fn connection_snapshots(&self) -> Vec<ConnectionSnapshot> {
900        let mut snapshots = Vec::new();
901        let now = Instant::now();
902        for (conn_key, conn) in &self.connections {
903            let mut update_times = conn.update_times.clone();
904            Self::trim_times(&mut update_times, now);
905            let mut pv_names: Vec<String> = conn
906                .channels_by_cid
907                .values()
908                .map(|ch| ch.pv_name.clone())
909                .collect();
910            pv_names.sort();
911            pv_names.truncate(8);
912            let mut messages: Vec<String> = conn.recent_messages.iter().cloned().collect();
913            if messages.len() > 20 {
914                messages = messages.split_off(messages.len() - 20);
915            }
916            let is_beacon = messages.iter().any(|m| m.starts_with("BEACON "));
917            let is_broadcast = Self::is_broadcast_addr(&conn_key.addr_a)
918                || Self::is_broadcast_addr(&conn_key.addr_b);
919            let mut mid_stream = false;
920            if conn.channels_by_cid.is_empty() && !conn.operations.is_empty() {
921                mid_stream = true;
922            }
923            if conn
924                .channels_by_cid
925                .values()
926                .any(|ch| !ch.fully_established || ch.pv_name.starts_with("<unknown"))
927            {
928                mid_stream = true;
929            }
930
931            snapshots.push(ConnectionSnapshot {
932                addr_a: conn_key.addr_a,
933                addr_b: conn_key.addr_b,
934                channel_count: conn.channels_by_cid.len(),
935                operation_count: conn.operations.len(),
936                last_seen: conn.last_seen.elapsed(),
937                pv_names,
938                updates_per_sec: Self::updates_per_sec(&update_times),
939                recent_messages: messages,
940                mid_stream,
941                is_beacon,
942                is_broadcast,
943            });
944        }
945        snapshots
946    }
947
948    pub fn channel_snapshots(&self) -> Vec<ChannelSnapshot> {
949        let mut snapshots = Vec::new();
950        let now = Instant::now();
951        for (conn_key, conn) in &self.connections {
952            for channel in conn.channels_by_cid.values() {
953                let mut update_times = channel.update_times.clone();
954                Self::trim_times(&mut update_times, now);
955                let mut messages: Vec<String> = channel.recent_messages.iter().cloned().collect();
956                if messages.len() > 20 {
957                    messages = messages.split_off(messages.len() - 20);
958                }
959                let is_beacon = messages.iter().any(|m| m.starts_with("BEACON "));
960                let is_broadcast = Self::is_broadcast_addr(&conn_key.addr_a)
961                    || Self::is_broadcast_addr(&conn_key.addr_b);
962                snapshots.push(ChannelSnapshot {
963                    addr_a: conn_key.addr_a,
964                    addr_b: conn_key.addr_b,
965                    cid: channel.cid,
966                    sid: channel.sid,
967                    pv_name: channel.pv_name.clone(),
968                    last_seen: channel.last_seen.elapsed(),
969                    updates_per_sec: Self::updates_per_sec(&update_times),
970                    recent_messages: messages,
971                    mid_stream: !channel.fully_established
972                        || channel.pv_name.starts_with("<unknown"),
973                    is_beacon,
974                    is_broadcast,
975                });
976            }
977
978            // Avoid emitting duplicate fallback rows when multiple operations
979            // reference the same unresolved SID/PV on one connection.
980            let mut seen_virtual = HashSet::new();
981            for op in conn.operations.values() {
982                if conn.get_channel_by_sid(op.sid).is_none() {
983                    let mut update_times = op.update_times.clone();
984                    Self::trim_times(&mut update_times, now);
985                    let mut messages: Vec<String> = op.recent_messages.iter().cloned().collect();
986                    if messages.len() > 20 {
987                        messages = messages.split_off(messages.len() - 20);
988                    }
989                    let is_beacon = messages.iter().any(|m| m.starts_with("BEACON "));
990                    let is_broadcast = Self::is_broadcast_addr(&conn_key.addr_a)
991                        || Self::is_broadcast_addr(&conn_key.addr_b);
992                    let pv_name = op
993                        .pv_name
994                        .clone()
995                        .unwrap_or_else(|| format!("<unknown:sid={}>", op.sid));
996                    if !seen_virtual.insert((op.sid, pv_name.clone())) {
997                        continue;
998                    }
999                    snapshots.push(ChannelSnapshot {
1000                        addr_a: conn_key.addr_a,
1001                        addr_b: conn_key.addr_b,
1002                        cid: 0,
1003                        sid: Some(op.sid),
1004                        pv_name,
1005                        last_seen: op.last_seen.elapsed(),
1006                        updates_per_sec: Self::updates_per_sec(&update_times),
1007                        recent_messages: messages,
1008                        mid_stream: true,
1009                        is_beacon,
1010                        is_broadcast,
1011                    });
1012                }
1013            }
1014        }
1015        snapshots
1016    }
1017}
1018
1019#[cfg(test)]
1020mod tests {
1021    use super::*;
1022
1023    fn test_conn_key() -> ConnectionKey {
1024        ConnectionKey::from_parts("192.168.1.1", 12345, "192.168.1.2", 5075).unwrap()
1025    }
1026
1027    #[test]
1028    fn test_create_channel_flow() {
1029        let mut tracker = PvaStateTracker::with_defaults();
1030        let key = test_conn_key();
1031
1032        // Client sends CREATE_CHANNEL
1033        tracker.on_create_channel_request(&key, 1, "TEST:PV:VALUE".to_string());
1034        assert_eq!(tracker.channel_count(), 1);
1035
1036        // Server responds
1037        tracker.on_create_channel_response(&key, 1, 100);
1038
1039        // Verify we can resolve the PV name
1040        let pv_name = tracker.resolve_pv_name(&key, 100, 0);
1041        assert_eq!(pv_name, Some("TEST:PV:VALUE".to_string()));
1042    }
1043
1044    #[test]
1045    fn test_channel_limit() {
1046        let config = PvaStateConfig::new(100, 300);
1047        let mut tracker = PvaStateTracker::new(config);
1048        let key = test_conn_key();
1049
1050        // Add 150 channels (exceeds limit of 100)
1051        for i in 0..150 {
1052            tracker.on_create_channel_request(&key, i, format!("PV:{}", i));
1053        }
1054
1055        // Should have evicted some
1056        assert!(tracker.channel_count() <= 100);
1057    }
1058
1059    #[test]
1060    fn test_destroy_channel() {
1061        let mut tracker = PvaStateTracker::with_defaults();
1062        let key = test_conn_key();
1063
1064        tracker.on_create_channel_request(&key, 1, "TEST:PV".to_string());
1065        tracker.on_create_channel_response(&key, 1, 100);
1066        assert_eq!(tracker.channel_count(), 1);
1067
1068        tracker.on_destroy_channel(&key, 1, 100);
1069        assert_eq!(tracker.channel_count(), 0);
1070    }
1071
1072    #[test]
1073    fn test_channel_snapshots_dedup_unresolved_sid_rows() {
1074        let mut tracker = PvaStateTracker::with_defaults();
1075        let key = test_conn_key();
1076
1077        // Two operations on same unresolved SID should collapse to one virtual channel row.
1078        tracker.on_op_init_request(&key, 777, 1001, 13);
1079        tracker.on_op_init_request(&key, 777, 1002, 13);
1080        tracker.on_op_activity(&key, 777, 1001, 13);
1081        tracker.on_op_activity(&key, 777, 1002, 13);
1082
1083        let snapshots = tracker.channel_snapshots();
1084        assert_eq!(snapshots.len(), 1);
1085        assert_eq!(snapshots[0].sid, Some(777));
1086    }
1087
1088    #[test]
1089    fn test_single_channel_fallback_works_for_simple_connection() {
1090        // When there is truly one channel and zero/one operations, the
1091        // single-channel fallback should resolve the PV name from sid=0.
1092        let mut tracker = PvaStateTracker::with_defaults();
1093        let key = test_conn_key();
1094
1095        tracker.on_create_channel_request(&key, 1, "SIMPLE:PV".to_string());
1096        tracker.on_create_channel_response(&key, 1, 100);
1097
1098        // sid=0, ioid=99 — no matching operation
1099        let pv = tracker.resolve_pv_name(&key, 0, 99);
1100        assert_eq!(pv, Some("SIMPLE:PV".to_string()));
1101    }
1102
1103    #[test]
1104    fn test_no_false_attribution_on_multiplexed_connection() {
1105        // Phoebus scenario: one TCP connection carries many channels, but we
1106        // only captured one CREATE_CHANNEL.  When additional ops arrive with
1107        // sid=0 (server direction), the single-channel fallback must NOT
1108        // attribute them to the one known channel.
1109        let mut tracker = PvaStateTracker::with_defaults();
1110        let key = test_conn_key();
1111
1112        // Capture one channel
1113        tracker.on_create_channel_request(&key, 1, "CAPTURED:PV".to_string());
1114        tracker.on_create_channel_response(&key, 1, 100);
1115
1116        // Simulate many ops arriving (as happens with multiplexed connections).
1117        // First op via on_op_init_request with sid known:
1118        tracker.on_op_init_request(&key, 100, 1, 13); // MONITOR for the known channel
1119
1120        // Additional ops with different SIDs (channels we never saw created):
1121        for ioid in 2..=10 {
1122            tracker.on_op_activity(&key, 0, ioid, 13);
1123        }
1124
1125        // The known IOID=1 should resolve (via its op's pv_name from INIT)
1126        let pv1 = tracker.resolve_pv_name(&key, 100, 1);
1127        assert_eq!(pv1, Some("CAPTURED:PV".to_string()));
1128
1129        // Unknown ioids should NOT resolve to CAPTURED:PV
1130        for ioid in 2..=10 {
1131            let pv = tracker.resolve_pv_name(&key, 0, ioid);
1132            assert_eq!(
1133                pv, None,
1134                "ioid={} should not resolve to the single captured channel",
1135                ioid
1136            );
1137        }
1138    }
1139
1140    #[test]
1141    fn test_on_op_activity_placeholder_not_created_for_multiplexed() {
1142        // When one channel is known but operations already exist, activity
1143        // with sid=0 should create a placeholder WITHOUT a PV name (not
1144        // inheriting from the single captured channel).
1145        let mut tracker = PvaStateTracker::with_defaults();
1146        let key = test_conn_key();
1147
1148        tracker.on_create_channel_request(&key, 1, "KNOWN:PV".to_string());
1149        tracker.on_create_channel_response(&key, 1, 100);
1150
1151        // First op — establishes that operations exist
1152        tracker.on_op_init_request(&key, 100, 1, 13);
1153
1154        // Second op via on_op_activity with sid=0 — should NOT inherit PV name
1155        tracker.on_op_activity(&key, 0, 2, 13);
1156
1157        let pv = tracker.resolve_pv_name(&key, 0, 2);
1158        assert_eq!(
1159            pv, None,
1160            "placeholder for ioid=2 should not inherit PV from single-channel fallback"
1161        );
1162    }
1163
1164    #[test]
1165    fn test_search_cache_populates_and_resolves() {
1166        let mut tracker = PvaStateTracker::with_defaults();
1167        let client_ip: IpAddr = "192.168.1.10".parse().unwrap();
1168
1169        // Simulate SEARCH request with CID → PV name pairs
1170        let pv_requests = vec![
1171            (100, "MOTOR:X:POSITION".to_string()),
1172            (101, "MOTOR:Y:POSITION".to_string()),
1173            (102, "TEMP:SENSOR:1".to_string()),
1174        ];
1175        tracker.on_search(&pv_requests, Some(client_ip));
1176
1177        // Resolve CIDs from a SEARCH_RESPONSE
1178        let resolved = tracker.resolve_search_cids(&[100, 101, 102], Some(client_ip));
1179        assert_eq!(resolved.len(), 3);
1180        assert_eq!(resolved[0], (100, "MOTOR:X:POSITION".to_string()));
1181        assert_eq!(resolved[1], (101, "MOTOR:Y:POSITION".to_string()));
1182        assert_eq!(resolved[2], (102, "TEMP:SENSOR:1".to_string()));
1183    }
1184
1185    #[test]
1186    fn test_search_cache_partial_resolve() {
1187        let mut tracker = PvaStateTracker::with_defaults();
1188        let client_ip: IpAddr = "192.168.1.10".parse().unwrap();
1189
1190        let pv_requests = vec![(100, "MOTOR:X:POSITION".to_string())];
1191        tracker.on_search(&pv_requests, Some(client_ip));
1192
1193        // Resolve with some CIDs that were never cached
1194        let resolved = tracker.resolve_search_cids(&[100, 999], Some(client_ip));
1195        assert_eq!(resolved.len(), 1);
1196        assert_eq!(resolved[0], (100, "MOTOR:X:POSITION".to_string()));
1197    }
1198
1199    #[test]
1200    fn test_search_cache_scoped_by_ip() {
1201        let mut tracker = PvaStateTracker::with_defaults();
1202        let client_a: IpAddr = "192.168.1.10".parse().unwrap();
1203        let client_b: IpAddr = "192.168.1.20".parse().unwrap();
1204
1205        // Both clients use the same CID=1 but different PV names
1206        tracker.on_search(&[(1, "CLIENT_A:PV".to_string())], Some(client_a));
1207        tracker.on_search(&[(1, "CLIENT_B:PV".to_string())], Some(client_b));
1208
1209        // Each client should resolve to its own PV name
1210        let resolved_a = tracker.resolve_search_cids(&[1], Some(client_a));
1211        assert_eq!(resolved_a.len(), 1);
1212        assert_eq!(resolved_a[0].1, "CLIENT_A:PV");
1213
1214        let resolved_b = tracker.resolve_search_cids(&[1], Some(client_b));
1215        assert_eq!(resolved_b.len(), 1);
1216        assert_eq!(resolved_b[0].1, "CLIENT_B:PV");
1217    }
1218
1219    #[test]
1220    fn test_search_cache_flat_fallback() {
1221        let mut tracker = PvaStateTracker::with_defaults();
1222        let client_ip: IpAddr = "192.168.1.10".parse().unwrap();
1223
1224        // Cache with a known client IP
1225        tracker.on_search(&[(42, "SOME:PV:NAME".to_string())], Some(client_ip));
1226
1227        // Resolve without knowing the client IP (flat fallback)
1228        let resolved = tracker.resolve_search_cids(&[42], None);
1229        assert_eq!(resolved.len(), 1);
1230        assert_eq!(resolved[0].1, "SOME:PV:NAME");
1231    }
1232
1233    #[test]
1234    fn test_search_cache_used_by_create_channel_response_fallback() {
1235        // When capture misses CREATE_CHANNEL request but has SEARCH,
1236        // the search cache should resolve PV name in CREATE_CHANNEL response.
1237        let mut tracker = PvaStateTracker::with_defaults();
1238        let key = test_conn_key();
1239        let client_ip: IpAddr = "192.168.1.1".parse().unwrap();
1240
1241        // Simulate SEARCH with CID=5 → "SEARCHED:PV"
1242        tracker.on_search(&[(5, "SEARCHED:PV".to_string())], Some(client_ip));
1243
1244        // Simulate CREATE_CHANNEL response without having seen the request
1245        tracker.on_create_channel_response(&key, 5, 200);
1246
1247        // The PV name should be resolved from search cache
1248        let pv = tracker.resolve_pv_name(&key, 200, 0);
1249        assert_eq!(pv, Some("SEARCHED:PV".to_string()));
1250    }
1251
1252    #[test]
1253    fn test_search_responses_resolved_stat() {
1254        let mut tracker = PvaStateTracker::with_defaults();
1255        let client_ip: IpAddr = "192.168.1.10".parse().unwrap();
1256
1257        tracker.on_search(
1258            &[(1, "PV:A".to_string()), (2, "PV:B".to_string())],
1259            Some(client_ip),
1260        );
1261
1262        assert_eq!(tracker.stats.search_responses_resolved, 0);
1263
1264        tracker.resolve_search_cids(&[1, 2], Some(client_ip));
1265        assert_eq!(tracker.stats.search_responses_resolved, 2);
1266
1267        // Resolving again increments further
1268        tracker.resolve_search_cids(&[1], Some(client_ip));
1269        assert_eq!(tracker.stats.search_responses_resolved, 3);
1270    }
1271
1272    #[test]
1273    fn test_retroactive_resolve_unknown_channels_from_search() {
1274        // Simulates the Java EPICS client scenario:
1275        // 1. Capture starts mid-stream, sees CREATE_CHANNEL responses (cid+sid)
1276        //    but missed the requests → channels are <unknown:cid=N>
1277        // 2. Later a SEARCH arrives with those CIDs → retroactively resolves PV names
1278        let mut tracker = PvaStateTracker::with_defaults();
1279        let key = test_conn_key();
1280
1281        // Step 1: CREATE_CHANNEL responses without prior requests → unknown channels
1282        tracker.on_create_channel_response(&key, 100, 500);
1283        tracker.on_create_channel_response(&key, 101, 501);
1284        tracker.on_create_channel_response(&key, 102, 502);
1285
1286        // Verify channels are unknown
1287        assert_eq!(
1288            tracker.resolve_pv_name(&key, 500, 0),
1289            Some("<unknown:cid=100>".to_string())
1290        );
1291        assert_eq!(
1292            tracker.resolve_pv_name(&key, 501, 0),
1293            Some("<unknown:cid=101>".to_string())
1294        );
1295
1296        // Step 2: SEARCH arrives with CID→PV name mappings
1297        let client_ip: IpAddr = "192.168.1.1".parse().unwrap();
1298        tracker.on_search(
1299            &[
1300                (100, "MOTOR:X:POS".to_string()),
1301                (101, "MOTOR:Y:POS".to_string()),
1302                (102, "TEMP:SENSOR:1".to_string()),
1303            ],
1304            Some(client_ip),
1305        );
1306
1307        // Verify channels are now resolved
1308        assert_eq!(
1309            tracker.resolve_pv_name(&key, 500, 0),
1310            Some("MOTOR:X:POS".to_string())
1311        );
1312        assert_eq!(
1313            tracker.resolve_pv_name(&key, 501, 0),
1314            Some("MOTOR:Y:POS".to_string())
1315        );
1316        assert_eq!(
1317            tracker.resolve_pv_name(&key, 502, 0),
1318            Some("TEMP:SENSOR:1".to_string())
1319        );
1320
1321        // Verify retroactive resolution was counted
1322        assert_eq!(tracker.stats.search_retroactive_resolves, 3);
1323    }
1324
1325    #[test]
1326    fn test_retroactive_resolve_also_updates_operations() {
1327        // When a placeholder operation has pv_name=None and its SID maps
1328        // to a channel that just got retroactively resolved, the operation's
1329        // pv_name should also be updated.
1330        let mut tracker = PvaStateTracker::with_defaults();
1331        let key = test_conn_key();
1332
1333        // CREATE_CHANNEL response without request → <unknown:cid=100>
1334        tracker.on_create_channel_response(&key, 100, 500);
1335
1336        // Op INIT on that channel → operation gets pv_name from channel
1337        // But the channel is unknown, so op gets "<unknown:cid=100>" as name
1338        tracker.on_op_init_request(&key, 500, 1, 13); // MONITOR
1339
1340        // Verify op resolves to unknown
1341        let pv = tracker.resolve_pv_name(&key, 500, 1);
1342        assert!(pv.is_some());
1343        // The op should have inherited the unknown name since it looked up via SID
1344
1345        // SEARCH arrives with the CID→PV mapping
1346        let client_ip: IpAddr = "192.168.1.1".parse().unwrap();
1347        tracker.on_search(&[(100, "RESOLVED:PV".to_string())], Some(client_ip));
1348
1349        // Channel should now be resolved
1350        assert_eq!(
1351            tracker.resolve_pv_name(&key, 500, 0),
1352            Some("RESOLVED:PV".to_string())
1353        );
1354        // Operation should also resolve (via SID→CID→channel)
1355        let pv = tracker.resolve_pv_name(&key, 500, 1);
1356        assert_eq!(pv, Some("RESOLVED:PV".to_string()));
1357    }
1358}