Skip to main content

spvirit_codec/
spvirit_state.rs

1//! PVA Connection State Tracker
2//!
3//! Tracks channel mappings (CID ↔ SID ↔ PV name) and operation states
4//! to enable full decoding of MONITOR packets.
5
6use std::collections::HashMap;
7use std::collections::HashSet;
8use std::collections::VecDeque;
9use std::net::{IpAddr, SocketAddr};
10use std::time::{Duration, Instant};
11use tracing::debug;
12
13use crate::spvd_decode::StructureDesc;
14
/// Tunable limits for the PVA state tracker.
#[derive(Debug, Clone)]
pub struct PvaStateConfig {
    /// Upper bound on the number of tracked channels (default: 40000).
    pub max_channels: usize,
    /// How long a channel entry may stay idle before it expires (default: 5 minutes).
    pub channel_ttl: Duration,
    /// Upper bound on the number of tracked operations per connection (default: 10000).
    pub max_operations: usize,
    /// Upper bound on update timestamps kept per connection for rate
    /// calculation (default: 10000).
    pub max_update_rate: usize,
}

impl Default for PvaStateConfig {
    /// 40000 channels, a five-minute TTL, and the operation / update-rate
    /// limits that [`PvaStateConfig::new`] applies.
    fn default() -> Self {
        const DEFAULT_TTL_SECS: u64 = 5 * 60;
        Self::new(40_000, DEFAULT_TTL_SECS)
    }
}

impl PvaStateConfig {
    /// Build a configuration from a channel limit and a TTL in seconds.
    ///
    /// `max_operations` and `max_update_rate` are both set to 10000.
    pub fn new(max_channels: usize, ttl_secs: u64) -> Self {
        let channel_ttl = Duration::from_secs(ttl_secs);
        Self {
            max_channels,
            channel_ttl,
            max_operations: 10_000,
            max_update_rate: 10_000,
        }
    }

    /// Builder-style override for `max_update_rate`; every other field is kept.
    pub fn with_max_update_rate(self, max_update_rate: usize) -> Self {
        Self {
            max_update_rate,
            ..self
        }
    }
}
54
/// Unique key for a TCP connection (canonical - order independent).
///
/// The two endpoints are stored sorted, so the same pair of addresses always
/// produces the same key regardless of which side is passed first.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct ConnectionKey {
    /// Lower address according to `SocketAddr`'s ordering.
    pub addr_a: SocketAddr,
    /// Higher address.
    pub addr_b: SocketAddr,
}

impl ConnectionKey {
    /// Create a canonical connection key (order independent).
    pub fn new(addr1: SocketAddr, addr2: SocketAddr) -> Self {
        // Always store in sorted order so hashing/equality ignore direction.
        let (addr_a, addr_b) = if addr1 <= addr2 {
            (addr1, addr2)
        } else {
            (addr2, addr1)
        };
        Self { addr_a, addr_b }
    }

    /// Create from IP strings and ports (convenience method).
    ///
    /// Argument order does not matter; the result is canonicalized by
    /// [`ConnectionKey::new`]. Returns `None` when either IP string cannot be
    /// parsed. Both IPv4 and IPv6 literals are accepted, with or without
    /// surrounding brackets for IPv6.
    pub fn from_parts(ip1: &str, port1: u16, ip2: &str, port2: u16) -> Option<Self> {
        let addr1 = Self::parse_endpoint(ip1, port1)?;
        let addr2 = Self::parse_endpoint(ip2, port2)?;
        Some(Self::new(addr1, addr2))
    }

    /// Turn an IP string plus port into a socket address.
    fn parse_endpoint(ip: &str, port: u16) -> Option<SocketAddr> {
        match ip.parse::<IpAddr>() {
            // Parsing the IP on its own handles bare IPv6 literals such as
            // "::1", which are not valid inside an "ip:port" string.
            Ok(addr) => Some(SocketAddr::new(addr, port)),
            // Fall back to the textual form so inputs that only parse as part
            // of a socket address (e.g. "[::1]") keep working.
            Err(_) => format!("{}:{}", ip, port).parse().ok(),
        }
    }
}
89
/// A single channel (PV) tracked on a connection.
#[derive(Debug, Clone)]
pub struct ChannelInfo {
    /// Name of the PV this channel refers to.
    pub pv_name: String,
    /// Channel ID chosen by the client.
    pub cid: u32,
    /// Channel ID assigned by the server in its CREATE_CHANNEL response;
    /// `None` until that response has been recorded.
    pub sid: Option<u32>,
    /// When this channel was created or last refreshed via [`ChannelInfo::touch`].
    pub last_seen: Instant,
    /// Whether the full CREATE_CHANNEL exchange was observed.
    pub fully_established: bool,
    /// Timestamps of recent updates attributed to this channel.
    pub update_times: VecDeque<Instant>,
    /// Recent messages attributed to this channel.
    pub recent_messages: VecDeque<String>,
}

impl ChannelInfo {
    /// Create a channel that has a CID and PV name but no SID yet.
    ///
    /// The channel starts out not fully established, with empty update and
    /// message histories, and `last_seen` set to the current instant.
    pub fn new_pending(cid: u32, pv_name: String) -> Self {
        let created_at = Instant::now();
        Self {
            pv_name,
            cid,
            sid: None,
            last_seen: created_at,
            fully_established: false,
            update_times: VecDeque::default(),
            recent_messages: VecDeque::default(),
        }
    }

    /// Mark the channel as active right now.
    pub fn touch(&mut self) {
        self.last_seen = Instant::now();
    }

    /// `true` once the channel has been idle for strictly longer than `ttl`.
    pub fn is_expired(&self, ttl: Duration) -> bool {
        let idle_for = self.last_seen.elapsed();
        ttl < idle_for
    }
}
128
129/// State for an active operation (GET/PUT/MONITOR etc.)
130#[derive(Debug, Clone)]
131pub struct OperationState {
132    /// Server channel ID this operation is on
133    pub sid: u32,
134    /// Operation ID
135    pub ioid: u32,
136    /// Command type (10=GET, 11=PUT, 13=MONITOR, etc.)
137    pub command: u8,
138    /// PV name (resolved from channel state)
139    pub pv_name: Option<String>,
140    /// Field description from INIT response (parsed introspection)
141    pub field_desc: Option<StructureDesc>,
142    /// Whether INIT phase completed
143    pub initialized: bool,
144    /// Last activity
145    pub last_seen: Instant,
146    pub update_times: VecDeque<Instant>,
147    pub recent_messages: VecDeque<String>,
148}
149
150impl OperationState {
151    pub fn new(sid: u32, ioid: u32, command: u8, pv_name: Option<String>) -> Self {
152        Self {
153            sid,
154            ioid,
155            command,
156            pv_name,
157            field_desc: None,
158            initialized: false,
159            last_seen: Instant::now(),
160            update_times: VecDeque::new(),
161            recent_messages: VecDeque::new(),
162        }
163    }
164
165    pub fn touch(&mut self) {
166        self.last_seen = Instant::now();
167    }
168}
169
170/// Per-connection state
171#[derive(Debug)]
172pub struct ConnectionState {
173    /// Channels indexed by Client ID
174    pub channels_by_cid: HashMap<u32, ChannelInfo>,
175    /// Server ID → Client ID mapping
176    pub sid_to_cid: HashMap<u32, u32>,
177    /// Operations indexed by IOID
178    pub operations: HashMap<u32, OperationState>,
179    /// Byte order for this connection (true = big endian)
180    pub is_be: bool,
181    /// Last activity on this connection
182    pub last_seen: Instant,
183    pub update_times: VecDeque<Instant>,
184    pub recent_messages: VecDeque<String>,
185}
186
187impl ConnectionState {
188    pub fn new() -> Self {
189        Self {
190            channels_by_cid: HashMap::new(),
191            sid_to_cid: HashMap::new(),
192            operations: HashMap::new(),
193            is_be: false, // Default to little endian
194            last_seen: Instant::now(),
195            update_times: VecDeque::new(),
196            recent_messages: VecDeque::new(),
197        }
198    }
199
200    pub fn touch(&mut self) {
201        self.last_seen = Instant::now();
202    }
203
204    /// Get channel info by Server ID
205    pub fn get_channel_by_sid(&self, sid: u32) -> Option<&ChannelInfo> {
206        self.sid_to_cid
207            .get(&sid)
208            .and_then(|cid| self.channels_by_cid.get(cid))
209    }
210
211    /// Get mutable channel info by Server ID
212    pub fn get_channel_by_sid_mut(&mut self, sid: u32) -> Option<&mut ChannelInfo> {
213        if let Some(&cid) = self.sid_to_cid.get(&sid) {
214            self.channels_by_cid.get_mut(&cid)
215        } else {
216            None
217        }
218    }
219
220    /// Get PV name for a Server ID
221    pub fn get_pv_name_by_sid(&self, sid: u32) -> Option<&str> {
222        self.get_channel_by_sid(sid).map(|ch| ch.pv_name.as_str())
223    }
224
225    /// Get PV name for an operation IOID
226    pub fn get_pv_name_by_ioid(&self, ioid: u32) -> Option<&str> {
227        self.operations
228            .get(&ioid)
229            .and_then(|op| op.pv_name.as_deref())
230    }
231}
232
233impl Default for ConnectionState {
234    fn default() -> Self {
235        Self::new()
236    }
237}
238
239/// Global PVA state tracker across all connections
240#[derive(Debug)]
241pub struct PvaStateTracker {
242    /// Configuration
243    config: PvaStateConfig,
244    /// Per-connection state
245    connections: HashMap<ConnectionKey, ConnectionState>,
246    /// Total channel count across all connections (for limit enforcement)
247    total_channels: usize,
248    /// Statistics
249    pub stats: PvaStateStats,
250    /// (client_ip, CID) → PV name cache from SEARCH messages
251    /// Scoped by client IP to prevent CID collisions across different clients
252    search_cache: HashMap<(IpAddr, u32), String>,
253    /// Flat CID → PV name fallback (last-writer-wins, used when client IP is unknown)
254    search_cache_flat: HashMap<u32, String>,
255}
256
/// Running counters exposed by the tracker for monitoring.
#[derive(Debug, Default, Clone)]
pub struct PvaStateStats {
    /// Channels added to the tracker.
    pub channels_created: u64,
    /// Channels removed by DESTROY_CHANNEL handling.
    pub channels_destroyed: u64,
    /// Channels removed because they outlived the configured TTL.
    pub channels_expired: u64,
    /// Channels removed to make room when the channel limit was hit.
    pub channels_evicted: u64,
    /// Operations added to the tracker.
    pub operations_created: u64,
    /// Operations removed by operation DESTROY handling.
    pub operations_completed: u64,
    /// CREATE_CHANNEL requests seen.
    pub create_channel_requests: u64,
    /// CREATE_CHANNEL responses seen.
    pub create_channel_responses: u64,
    /// SEARCH_RESPONSE CIDs that could be mapped back to a PV name.
    pub search_responses_resolved: u64,
    /// Size of the flat search cache as of the last SEARCH handled.
    pub search_cache_entries: u64,
    /// Placeholder channels renamed after a later SEARCH supplied the PV name.
    pub search_retroactive_resolves: u64,
    /// PVA messages with is_server=false (sent by client).
    pub client_messages: u64,
    /// PVA messages with is_server=true (sent by server).
    pub server_messages: u64,
}
276
/// Owned, point-in-time summary of one connection.
///
/// NOTE(review): the code that fills this in is outside the visible part of
/// the file; field notes marked "presumably" should be confirmed against it.
#[derive(Debug, Clone)]
pub struct ConnectionSnapshot {
    /// First endpoint of the connection.
    pub addr_a: SocketAddr,
    /// Second endpoint of the connection.
    pub addr_b: SocketAddr,
    /// Number of channels on the connection.
    pub channel_count: usize,
    /// Number of operations on the connection.
    pub operation_count: usize,
    /// Presumably the time elapsed since the connection was last active.
    pub last_seen: Duration,
    /// PV names associated with the connection.
    pub pv_names: Vec<String>,
    /// Update rate reported for the connection.
    pub updates_per_sec: f64,
    /// Recent messages recorded for the connection.
    pub recent_messages: Vec<String>,
    /// Presumably set when the connection was joined mid-stream.
    pub mid_stream: bool,
    /// Presumably set when the entry represents beacon traffic.
    pub is_beacon: bool,
    /// Presumably set when an endpoint is a broadcast address.
    pub is_broadcast: bool,
}
291
/// Owned, point-in-time summary of one channel.
///
/// NOTE(review): the code that fills this in is outside the visible part of
/// the file; field notes marked "presumably" should be confirmed against it.
#[derive(Debug, Clone)]
pub struct ChannelSnapshot {
    /// First endpoint of the connection carrying the channel.
    pub addr_a: SocketAddr,
    /// Second endpoint of the connection carrying the channel.
    pub addr_b: SocketAddr,
    /// Client channel ID.
    pub cid: u32,
    /// Server channel ID, when known.
    pub sid: Option<u32>,
    /// PV name of the channel.
    pub pv_name: String,
    /// Presumably the time elapsed since the channel was last active.
    pub last_seen: Duration,
    /// Update rate reported for the channel.
    pub updates_per_sec: f64,
    /// Recent messages recorded for the channel.
    pub recent_messages: Vec<String>,
    /// Presumably set when the channel was joined mid-stream.
    pub mid_stream: bool,
    /// Presumably set when the entry represents beacon traffic.
    pub is_beacon: bool,
    /// Presumably set when an endpoint is a broadcast address.
    pub is_broadcast: bool,
}
306
307impl PvaStateTracker {
308    fn is_broadcast_addr(addr: &SocketAddr) -> bool {
309        match addr.ip() {
310            std::net::IpAddr::V4(v4) => {
311                if v4.is_broadcast() {
312                    return true;
313                }
314                v4.octets()[3] == 255
315            }
316            std::net::IpAddr::V6(v6) => {
317                // IPv6 has no broadcast; treat multicast as equivalent for PVA
318                v6.is_multicast()
319            }
320        }
321    }
322    pub fn new(config: PvaStateConfig) -> Self {
323        Self {
324            config,
325            connections: HashMap::new(),
326            total_channels: 0,
327            stats: PvaStateStats::default(),
328            search_cache: HashMap::new(),
329            search_cache_flat: HashMap::new(),
330        }
331    }
332
333    pub fn with_defaults() -> Self {
334        Self::new(PvaStateConfig::default())
335    }
336
337    /// Get or create connection state
338    fn get_or_create_connection(&mut self, key: &ConnectionKey) -> &mut ConnectionState {
339        if !self.connections.contains_key(key) {
340            self.connections.insert(key.clone(), ConnectionState::new());
341        }
342        self.connections.get_mut(key).unwrap()
343    }
344
345    /// Get connection state (read-only)
346    pub fn get_connection(&self, key: &ConnectionKey) -> Option<&ConnectionState> {
347        self.connections.get(key)
348    }
349
350    /// Get PV name by SID for a connection
351    pub fn get_pv_name_by_sid(&self, conn_key: &ConnectionKey, sid: u32) -> Option<String> {
352        self.connections
353            .get(conn_key)
354            .and_then(|conn| conn.get_pv_name_by_sid(sid))
355            .map(|s| s.to_string())
356    }
357
358    /// Handle CREATE_CHANNEL request (client → server)
359    /// Called when we see cmd=7 from client with CID and PV name
360    pub fn on_create_channel_request(
361        &mut self,
362        conn_key: &ConnectionKey,
363        cid: u32,
364        pv_name: String,
365    ) {
366        self.stats.create_channel_requests += 1;
367
368        // Also cache in search_cache so it's available as fallback
369        // Extract client IP from connection key (client is the one sending the request)
370        let client_ip = conn_key.addr_a.ip(); // either side works as flat fallback
371        self.search_cache.insert((client_ip, cid), pv_name.clone());
372        self.search_cache_flat.insert(cid, pv_name.clone());
373
374        // Check channel limit
375        if self.total_channels >= self.config.max_channels {
376            self.evict_oldest_channels(100); // Evict 100 oldest
377        }
378
379        let conn = self.get_or_create_connection(conn_key);
380        conn.touch();
381
382        // Only add if not already present
383        if !conn.channels_by_cid.contains_key(&cid) {
384            conn.channels_by_cid
385                .insert(cid, ChannelInfo::new_pending(cid, pv_name));
386            self.total_channels += 1;
387            self.stats.channels_created += 1;
388            debug!("CREATE_CHANNEL request: cid={}", cid);
389        }
390    }
391
392    /// Handle CREATE_CHANNEL response (server → client)
393    /// Called when we see cmd=7 from server with CID and SID
394    pub fn on_create_channel_response(&mut self, conn_key: &ConnectionKey, cid: u32, sid: u32) {
395        self.stats.create_channel_responses += 1;
396
397        // Look up search cache BEFORE borrowing self mutably via get_or_create_connection
398        // Try scoped cache first (both sides of the connection key), then flat fallback
399        let cached_pv_name = self
400            .search_cache
401            .get(&(conn_key.addr_a.ip(), cid))
402            .or_else(|| self.search_cache.get(&(conn_key.addr_b.ip(), cid)))
403            .or_else(|| self.search_cache_flat.get(&cid))
404            .cloned();
405
406        let conn = self.get_or_create_connection(conn_key);
407        conn.touch();
408
409        if let Some(channel) = conn.channels_by_cid.get_mut(&cid) {
410            channel.sid = Some(sid);
411            channel.fully_established = true;
412            channel.touch();
413            conn.sid_to_cid.insert(sid, cid);
414            debug!(
415                "CREATE_CHANNEL response: cid={}, sid={}, pv={}",
416                cid, sid, channel.pv_name
417            );
418        } else {
419            // We missed the request - try search cache first, then create placeholder
420            let pv_name = cached_pv_name
421                .unwrap_or_else(|| format!("<unknown:cid={}>", cid));
422            let is_resolved = !pv_name.starts_with("<unknown");
423            debug!(
424                "CREATE_CHANNEL response without request: cid={}, sid={}, resolved={}",
425                cid, sid, is_resolved
426            );
427            let mut channel = ChannelInfo::new_pending(cid, pv_name);
428            channel.sid = Some(sid);
429            channel.fully_established = is_resolved;
430            conn.channels_by_cid.insert(cid, channel);
431            conn.sid_to_cid.insert(sid, cid);
432            self.total_channels += 1;
433        }
434    }
435
436    /// Handle DESTROY_CHANNEL (cmd=8)
437    pub fn on_destroy_channel(&mut self, conn_key: &ConnectionKey, cid: u32, sid: u32) {
438        if let Some(conn) = self.connections.get_mut(conn_key) {
439            conn.touch();
440
441            // Remove by CID
442            if conn.channels_by_cid.remove(&cid).is_some() {
443                self.total_channels = self.total_channels.saturating_sub(1);
444                self.stats.channels_destroyed += 1;
445            }
446
447            // Remove SID mapping
448            conn.sid_to_cid.remove(&sid);
449
450            // Remove any operations on this channel
451            conn.operations.retain(|_, op| op.sid != sid);
452
453            debug!("DESTROY_CHANNEL: cid={}, sid={}", cid, sid);
454        }
455    }
456
457    /// Handle operation INIT request (client → server)
458    /// subcmd & 0x08 indicates INIT
459    pub fn on_op_init_request(
460        &mut self,
461        conn_key: &ConnectionKey,
462        sid: u32,
463        ioid: u32,
464        command: u8,
465    ) {
466        let max_ops = self.config.max_operations;
467        let conn = self.get_or_create_connection(conn_key);
468        conn.touch();
469
470        let pv_name = conn.get_pv_name_by_sid(sid).map(|s| s.to_string());
471
472        if conn.operations.len() < max_ops {
473            conn.operations
474                .insert(ioid, OperationState::new(sid, ioid, command, pv_name));
475            self.stats.operations_created += 1;
476            debug!(
477                "Operation INIT: sid={}, ioid={}, cmd={}",
478                sid, ioid, command
479            );
480        }
481    }
482
483    /// Handle operation INIT response (server → client)
484    /// Contains type introspection data
485    pub fn on_op_init_response(
486        &mut self,
487        conn_key: &ConnectionKey,
488        ioid: u32,
489        field_desc: Option<StructureDesc>,
490    ) {
491        if let Some(conn) = self.connections.get_mut(conn_key) {
492            conn.touch();
493
494            if let Some(op) = conn.operations.get_mut(&ioid) {
495                op.field_desc = field_desc;
496                op.initialized = true;
497                op.touch();
498                debug!("Operation INIT response: ioid={}", ioid);
499            }
500        }
501    }
502
503    /// Handle operation DESTROY (subcmd & 0x10)
504    pub fn on_op_destroy(&mut self, conn_key: &ConnectionKey, ioid: u32) {
505        if let Some(conn) = self.connections.get_mut(conn_key) {
506            if conn.operations.remove(&ioid).is_some() {
507                self.stats.operations_completed += 1;
508            }
509        }
510    }
511
512    /// Touch connection, operation, and channel activity for any op message (data updates, etc.)
513    /// If the IOID is unknown (mid-stream join), auto-creates a placeholder operation
514    /// so the connection appears on the Connections page.
515    pub fn on_op_activity(&mut self, conn_key: &ConnectionKey, sid: u32, ioid: u32, command: u8) {
516        let max_update_rate = self.config.max_update_rate;
517        let max_ops = self.config.max_operations;
518        let mut created_placeholder = false;
519
520        let conn = self.get_or_create_connection(conn_key);
521        conn.touch();
522
523        Self::record_update(&mut conn.update_times, max_update_rate);
524
525        let mut channel_sid = if sid != 0 { Some(sid) } else { None };
526        if let Some(op) = conn.operations.get_mut(&ioid) {
527            op.touch();
528            Self::record_update(&mut op.update_times, max_update_rate);
529            if channel_sid.is_none() {
530                channel_sid = Some(op.sid);
531            }
532        } else if conn.operations.len() < max_ops {
533            // Mid-stream: we missed the INIT exchange, create a placeholder operation
534            // so this connection/channel is visible on the Connections page.
535            let pv_name = if sid != 0 {
536                conn.get_pv_name_by_sid(sid).map(|s| s.to_string())
537            } else if conn.channels_by_cid.len() == 1 && conn.operations.is_empty() {
538                // Server Op messages have sid=0; only use single-channel fallback
539                // when this is the very first operation (no other ops yet).
540                // If there are already other operations, this is likely a
541                // multiplexed connection and the fallback would be wrong.
542                conn.channels_by_cid.values().next()
543                    .map(|ch| ch.pv_name.clone())
544                    .filter(|n| !n.starts_with("<unknown"))
545            } else {
546                None
547            };
548            conn.operations.insert(ioid, OperationState::new(sid, ioid, command, pv_name));
549            created_placeholder = true;
550        }
551
552        if let Some(sid_val) = channel_sid {
553            if let Some(channel) = conn.get_channel_by_sid_mut(sid_val) {
554                channel.touch();
555                Self::record_update(&mut channel.update_times, max_update_rate);
556            }
557        }
558
559        // Deferred stat update — can't touch self.stats while conn borrows self
560        if created_placeholder {
561            self.stats.operations_created += 1;
562            debug!("Auto-created placeholder operation for mid-stream traffic: sid={}, ioid={}, cmd={}", sid, ioid, command);
563        }
564    }
565
566    /// Cache PV name mappings from SEARCH messages (CID → PV name)
567    /// These serve as fallback when the client's CREATE_CHANNEL request is missed.
568    /// Also retroactively resolves any existing `<unknown:cid=N>` channels and
569    /// placeholder operations that match the CIDs in this SEARCH.
570    /// `source_ip` is the IP of the client that sent the SEARCH request.
571    pub fn on_search(&mut self, pv_requests: &[(u32, String)], source_ip: Option<IpAddr>) {
572        // Build a lookup map for this batch
573        let cid_to_pv: HashMap<u32, String> = pv_requests.iter().cloned().collect();
574
575        for (cid, pv_name) in pv_requests {
576            if let Some(ip) = source_ip {
577                self.search_cache.insert((ip, *cid), pv_name.clone());
578            }
579            // Always populate flat fallback
580            self.search_cache_flat.insert(*cid, pv_name.clone());
581        }
582
583        // Retroactively resolve existing unknown channels and operations.
584        // Walk all connections and fix any <unknown:cid=N> entries whose CID
585        // matches a CID from this SEARCH request.
586        let mut retroactive_count: u64 = 0;
587        for conn in self.connections.values_mut() {
588            for (cid, channel) in conn.channels_by_cid.iter_mut() {
589                if channel.pv_name.starts_with("<unknown") {
590                    if let Some(pv_name) = cid_to_pv.get(cid) {
591                        debug!(
592                            "Retroactive PV resolve from SEARCH: cid={} {} -> {}",
593                            cid, channel.pv_name, pv_name
594                        );
595                        channel.pv_name = pv_name.clone();
596                        channel.fully_established = true;
597                        retroactive_count += 1;
598                    }
599                }
600            }
601
602            // Also update placeholder operations that have pv_name=None
603            // or stale <unknown...> names, and whose SID maps to a resolved channel
604            for op in conn.operations.values_mut() {
605                let needs_update = match &op.pv_name {
606                    None => true,
607                    Some(name) => name.starts_with("<unknown"),
608                };
609                if needs_update && op.sid != 0 {
610                    if let Some(&cid) = conn.sid_to_cid.get(&op.sid) {
611                        if let Some(pv_name) = cid_to_pv.get(&cid) {
612                            op.pv_name = Some(pv_name.clone());
613                        }
614                    }
615                }
616            }
617        }
618        if retroactive_count > 0 {
619            self.stats.search_retroactive_resolves += retroactive_count;
620            debug!(
621                "Retroactively resolved {} unknown channels from SEARCH cache",
622                retroactive_count
623            );
624        }
625
626        // Update search cache size stat
627        self.stats.search_cache_entries = self.search_cache_flat.len() as u64;
628
629        // Cap cache sizes to prevent unbounded growth
630        while self.search_cache.len() > 50_000 {
631            if let Some(key) = self.search_cache.keys().next().cloned() {
632                self.search_cache.remove(&key);
633            }
634        }
635        while self.search_cache_flat.len() > 50_000 {
636            if let Some(key) = self.search_cache_flat.keys().next().cloned() {
637                self.search_cache_flat.remove(&key);
638            }
639        }
640    }
641
642    /// Resolve PV names from SEARCH_RESPONSE CIDs using the search cache.
643    /// Returns a list of (CID, resolved_pv_name) pairs for all CIDs that could be resolved.
644    /// `source_ip` is optionally the IP of the server that sent the response;
645    /// we try scoped lookups using peer IPs, then fall back to flat cache.
646    pub fn resolve_search_cids(
647        &mut self,
648        cids: &[u32],
649        peer_ip: Option<IpAddr>,
650    ) -> Vec<(u32, String)> {
651        let mut resolved = Vec::new();
652        for &cid in cids {
653            // Try scoped cache with peer IP (the client that originally searched),
654            // then fall back to flat cache
655            let pv_name = peer_ip
656                .and_then(|ip| self.search_cache.get(&(ip, cid)))
657                .or_else(|| self.search_cache_flat.get(&cid))
658                .cloned();
659            if let Some(name) = pv_name {
660                resolved.push((cid, name));
661                self.stats.search_responses_resolved += 1;
662            }
663        }
664        resolved
665    }
666
667    /// Count a PVA message direction (for messages not routed through on_message)
668    pub fn count_direction(&mut self, is_server: bool) {
669        if is_server {
670            self.stats.server_messages += 1;
671        } else {
672            self.stats.client_messages += 1;
673        }
674    }
675
676    pub fn on_message(
677        &mut self,
678        conn_key: &ConnectionKey,
679        sid: u32,
680        ioid: u32,
681        request_type: &str,
682        message: String,
683        is_server: bool,
684    ) {
685        let conn = self.get_or_create_connection(conn_key);
686        conn.touch();
687        let dir = if is_server { "S>" } else { "C>" };
688        let full_message = format!("{} {} {}", dir, request_type, message);
689        Self::push_message(&mut conn.recent_messages, full_message.clone());
690
691        let mut channel_sid = if sid != 0 { Some(sid) } else { None };
692        if let Some(op) = conn.operations.get_mut(&ioid) {
693            Self::push_message(&mut op.recent_messages, full_message.clone());
694            if channel_sid.is_none() {
695                channel_sid = Some(op.sid);
696            }
697        }
698        if let Some(sid_val) = channel_sid {
699            if let Some(channel) = conn.get_channel_by_sid_mut(sid_val) {
700                Self::push_message(&mut channel.recent_messages, full_message);
701            }
702        }
703    }
704
705    fn record_update(times: &mut VecDeque<Instant>, max_update_rate: usize) {
706        let now = Instant::now();
707        times.push_back(now);
708        Self::trim_times(times, now);
709        while times.len() > max_update_rate {
710            times.pop_front();
711        }
712    }
713
714    fn trim_times(times: &mut VecDeque<Instant>, now: Instant) {
715        while let Some(front) = times.front() {
716            if now.duration_since(*front) > Duration::from_secs(1) {
717                times.pop_front();
718            } else {
719                break;
720            }
721        }
722    }
723
724    fn updates_per_sec(times: &VecDeque<Instant>) -> f64 {
725        times.len() as f64
726    }
727
728    fn push_message(messages: &mut VecDeque<String>, message: String) {
729        messages.push_back(message);
730        while messages.len() > 30 {
731            messages.pop_front();
732        }
733    }
734
735    /// Resolve PV name for a MONITOR/GET/PUT packet
736    pub fn resolve_pv_name(&self, conn_key: &ConnectionKey, sid: u32, ioid: u32) -> Option<String> {
737        let conn = self.connections.get(conn_key)?;
738
739        // First try by IOID (operation state) - works for server responses
740        if let Some(op) = conn.operations.get(&ioid) {
741            if let Some(ref name) = op.pv_name {
742                if !name.starts_with("<unknown") {
743                    return Some(name.clone());
744                }
745            }
746        }
747
748        // Fall back to SID lookup - works for client requests
749        if sid != 0 {
750            if let Some(name) = conn.get_pv_name_by_sid(sid) {
751                return Some(name.to_string());
752            }
753        }
754
755        // Last resort: if there's exactly one channel AND at most one operation,
756        // use that channel's PV name. This handles simple single-PV connections
757        // where the server Op message has sid_or_cid=0.
758        //
759        // IMPORTANT: Do NOT use this fallback when there are multiple operations,
760        // because PVA multiplexes many channels over one TCP connection (e.g.
761        // Phoebus). If we only captured one CREATE_CHANNEL but there are many
762        // ops, the other ops likely belong to different PVs that were established
763        // before our capture started.
764        if conn.channels_by_cid.len() == 1 && conn.operations.len() <= 1 {
765            if let Some(ch) = conn.channels_by_cid.values().next() {
766                if !ch.pv_name.starts_with("<unknown") {
767                    return Some(ch.pv_name.clone());
768                }
769            }
770        }
771
772        None
773    }
774
775    /// Get the number of active tracked channels
776    pub fn active_channel_count(&self) -> usize {
777        self.total_channels
778    }
779
780    /// Get the number of active tracked connections
781    pub fn active_connection_count(&self) -> usize {
782        self.connections.len()
783    }
784
785    /// Check if a connection is mid-stream (incomplete channel state)
786    pub fn is_connection_mid_stream(&self, conn_key: &ConnectionKey) -> bool {
787        self.connections
788            .get(conn_key)
789            .map(|conn| {
790                // Operations exist but no channels tracked → definitely mid-stream
791                if conn.channels_by_cid.is_empty() && !conn.operations.is_empty() {
792                    return true;
793                }
794                // Any channel not fully established → mid-stream
795                conn.channels_by_cid.values().any(|ch| !ch.fully_established)
796            })
797            .unwrap_or(false)
798    }
799
800    /// Get operation state for decoding values
801    pub fn get_operation(&self, conn_key: &ConnectionKey, ioid: u32) -> Option<&OperationState> {
802        self.connections
803            .get(conn_key)
804            .and_then(|conn| conn.operations.get(&ioid))
805    }
806
807    /// Evict oldest channels when at capacity
808    fn evict_oldest_channels(&mut self, count: usize) {
809        let mut oldest: Vec<(ConnectionKey, u32, Instant)> = Vec::new();
810
811        for (conn_key, conn) in &self.connections {
812            for (cid, channel) in &conn.channels_by_cid {
813                oldest.push((conn_key.clone(), *cid, channel.last_seen));
814            }
815        }
816
817        // Sort by last_seen (oldest first)
818        oldest.sort_by_key(|(_, _, t)| *t);
819
820        // Remove oldest
821        for (conn_key, cid, _) in oldest.into_iter().take(count) {
822            if let Some(conn) = self.connections.get_mut(&conn_key) {
823                if let Some(channel) = conn.channels_by_cid.remove(&cid) {
824                    if let Some(sid) = channel.sid {
825                        conn.sid_to_cid.remove(&sid);
826                    }
827                    self.total_channels = self.total_channels.saturating_sub(1);
828                    self.stats.channels_evicted += 1;
829                }
830            }
831        }
832    }
833
834    /// Periodic cleanup of expired entries
835    pub fn cleanup_expired(&mut self) {
836        let ttl = self.config.channel_ttl;
837        let mut expired_count = 0;
838
839        for conn in self.connections.values_mut() {
840            let expired_cids: Vec<u32> = conn
841                .channels_by_cid
842                .iter()
843                .filter(|(_, ch)| ch.is_expired(ttl))
844                .map(|(cid, _)| *cid)
845                .collect();
846
847            for cid in expired_cids {
848                if let Some(channel) = conn.channels_by_cid.remove(&cid) {
849                    if let Some(sid) = channel.sid {
850                        conn.sid_to_cid.remove(&sid);
851                        conn.operations.retain(|_, op| op.sid != sid);
852                    }
853                    expired_count += 1;
854                }
855            }
856        }
857
858        if expired_count > 0 {
859            self.total_channels = self.total_channels.saturating_sub(expired_count);
860            self.stats.channels_expired += expired_count as u64;
861            debug!("Cleaned up {} expired channels", expired_count);
862        }
863
864        // Remove empty connections
865        self.connections
866            .retain(|_, conn| !conn.channels_by_cid.is_empty() || !conn.operations.is_empty());
867    }
868
869    /// Get summary statistics
870    pub fn summary(&self) -> String {
871        format!(
872            "PVA State: {} connections, {} channels (created={}, destroyed={}, expired={}, evicted={})",
873            self.connections.len(),
874            self.total_channels,
875            self.stats.channels_created,
876            self.stats.channels_destroyed,
877            self.stats.channels_expired,
878            self.stats.channels_evicted,
879        )
880    }
881
    /// Get current channel count.
    ///
    /// Returns the tracker's running `total_channels` counter; the
    /// per-connection channel maps are not re-counted.
    pub fn channel_count(&self) -> usize {
        self.total_channels
    }
886
    /// Get current connection count.
    ///
    /// This is the number of entries currently held in the `connections` map.
    pub fn connection_count(&self) -> usize {
        self.connections.len()
    }
891
892    pub fn connection_snapshots(&self) -> Vec<ConnectionSnapshot> {
893        let mut snapshots = Vec::new();
894        let now = Instant::now();
895        for (conn_key, conn) in &self.connections {
896            let mut update_times = conn.update_times.clone();
897            Self::trim_times(&mut update_times, now);
898            let mut pv_names: Vec<String> = conn
899                .channels_by_cid
900                .values()
901                .map(|ch| ch.pv_name.clone())
902                .collect();
903            pv_names.sort();
904            pv_names.truncate(8);
905            let mut messages: Vec<String> = conn.recent_messages.iter().cloned().collect();
906            if messages.len() > 20 {
907                messages = messages.split_off(messages.len() - 20);
908            }
909            let is_beacon = messages.iter().any(|m| m.starts_with("BEACON "));
910            let is_broadcast = Self::is_broadcast_addr(&conn_key.addr_a)
911                || Self::is_broadcast_addr(&conn_key.addr_b);
912            let mut mid_stream = false;
913            if conn.channels_by_cid.is_empty() && !conn.operations.is_empty() {
914                mid_stream = true;
915            }
916            if conn
917                .channels_by_cid
918                .values()
919                .any(|ch| !ch.fully_established || ch.pv_name.starts_with("<unknown"))
920            {
921                mid_stream = true;
922            }
923
924            snapshots.push(ConnectionSnapshot {
925                addr_a: conn_key.addr_a,
926                addr_b: conn_key.addr_b,
927                channel_count: conn.channels_by_cid.len(),
928                operation_count: conn.operations.len(),
929                last_seen: conn.last_seen.elapsed(),
930                pv_names,
931                updates_per_sec: Self::updates_per_sec(&update_times),
932                recent_messages: messages,
933                mid_stream,
934                is_beacon,
935                is_broadcast,
936            });
937        }
938        snapshots
939    }
940
941    pub fn channel_snapshots(&self) -> Vec<ChannelSnapshot> {
942        let mut snapshots = Vec::new();
943        let now = Instant::now();
944        for (conn_key, conn) in &self.connections {
945            for channel in conn.channels_by_cid.values() {
946                let mut update_times = channel.update_times.clone();
947                Self::trim_times(&mut update_times, now);
948                let mut messages: Vec<String> = channel.recent_messages.iter().cloned().collect();
949                if messages.len() > 20 {
950                    messages = messages.split_off(messages.len() - 20);
951                }
952                let is_beacon = messages.iter().any(|m| m.starts_with("BEACON "));
953                let is_broadcast = Self::is_broadcast_addr(&conn_key.addr_a)
954                    || Self::is_broadcast_addr(&conn_key.addr_b);
955                snapshots.push(ChannelSnapshot {
956                    addr_a: conn_key.addr_a,
957                    addr_b: conn_key.addr_b,
958                    cid: channel.cid,
959                    sid: channel.sid,
960                    pv_name: channel.pv_name.clone(),
961                    last_seen: channel.last_seen.elapsed(),
962                    updates_per_sec: Self::updates_per_sec(&update_times),
963                    recent_messages: messages,
964                    mid_stream: !channel.fully_established
965                        || channel.pv_name.starts_with("<unknown"),
966                    is_beacon,
967                    is_broadcast,
968                });
969            }
970
971            // Avoid emitting duplicate fallback rows when multiple operations
972            // reference the same unresolved SID/PV on one connection.
973            let mut seen_virtual = HashSet::new();
974            for op in conn.operations.values() {
975                if conn.get_channel_by_sid(op.sid).is_none() {
976                    let mut update_times = op.update_times.clone();
977                    Self::trim_times(&mut update_times, now);
978                    let mut messages: Vec<String> = op.recent_messages.iter().cloned().collect();
979                    if messages.len() > 20 {
980                        messages = messages.split_off(messages.len() - 20);
981                    }
982                    let is_beacon = messages.iter().any(|m| m.starts_with("BEACON "));
983                    let is_broadcast = Self::is_broadcast_addr(&conn_key.addr_a)
984                        || Self::is_broadcast_addr(&conn_key.addr_b);
985                    let pv_name = op
986                        .pv_name
987                        .clone()
988                        .unwrap_or_else(|| format!("<unknown:sid={}>", op.sid));
989                    if !seen_virtual.insert((op.sid, pv_name.clone())) {
990                        continue;
991                    }
992                    snapshots.push(ChannelSnapshot {
993                        addr_a: conn_key.addr_a,
994                        addr_b: conn_key.addr_b,
995                        cid: 0,
996                        sid: Some(op.sid),
997                        pv_name,
998                        last_seen: op.last_seen.elapsed(),
999                        updates_per_sec: Self::updates_per_sec(&update_times),
1000                        recent_messages: messages,
1001                        mid_stream: true,
1002                        is_beacon,
1003                        is_broadcast,
1004                    });
1005                }
1006            }
1007        }
1008        snapshots
1009    }
1010}
1011
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed connection key between 192.168.1.1:12345 and 192.168.1.2:5075.
    fn test_conn_key() -> ConnectionKey {
        ConnectionKey::from_parts("192.168.1.1", 12345, "192.168.1.2", 5075).unwrap()
    }

    /// Parse a textual IP address, panicking on malformed test input.
    fn ip(text: &str) -> IpAddr {
        text.parse().unwrap()
    }

    /// A tracker with default configuration plus the shared connection key.
    fn fixture() -> (PvaStateTracker, ConnectionKey) {
        (PvaStateTracker::with_defaults(), test_conn_key())
    }

    #[test]
    fn test_create_channel_flow() {
        let (mut state, conn) = fixture();

        // The client's CREATE_CHANNEL request registers the channel.
        state.on_create_channel_request(&conn, 1, String::from("TEST:PV:VALUE"));
        assert_eq!(state.channel_count(), 1);

        // The server's response binds SID 100 to CID 1.
        state.on_create_channel_response(&conn, 1, 100);

        // The SID is now enough to recover the PV name.
        let resolved = state.resolve_pv_name(&conn, 100, 0);
        assert_eq!(resolved.as_deref(), Some("TEST:PV:VALUE"));
    }

    #[test]
    fn test_channel_limit() {
        let mut state = PvaStateTracker::new(PvaStateConfig::new(100, 300));
        let conn = test_conn_key();

        // 150 channels against a limit of 100 must trigger eviction.
        for cid in 0..150 {
            state.on_create_channel_request(&conn, cid, format!("PV:{}", cid));
        }

        assert!(state.channel_count() <= 100);
    }

    #[test]
    fn test_destroy_channel() {
        let (mut state, conn) = fixture();

        state.on_create_channel_request(&conn, 1, String::from("TEST:PV"));
        state.on_create_channel_response(&conn, 1, 100);
        assert_eq!(state.channel_count(), 1);

        state.on_destroy_channel(&conn, 1, 100);
        assert_eq!(state.channel_count(), 0);
    }

    #[test]
    fn test_channel_snapshots_dedup_unresolved_sid_rows() {
        let (mut state, conn) = fixture();

        // Two operations share one unresolved SID; they must collapse into a
        // single virtual channel row.
        for ioid in 1001..=1002 {
            state.on_op_init_request(&conn, 777, ioid, 13);
        }
        for ioid in 1001..=1002 {
            state.on_op_activity(&conn, 777, ioid, 13);
        }

        let rows = state.channel_snapshots();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].sid, Some(777));
    }

    #[test]
    fn test_single_channel_fallback_works_for_simple_connection() {
        // With exactly one channel and no competing operations, a lookup with
        // sid=0 falls back to that single channel's PV name.
        let (mut state, conn) = fixture();

        state.on_create_channel_request(&conn, 1, String::from("SIMPLE:PV"));
        state.on_create_channel_response(&conn, 1, 100);

        // sid=0 and ioid=99 match no operation.
        let resolved = state.resolve_pv_name(&conn, 0, 99);
        assert_eq!(resolved.as_deref(), Some("SIMPLE:PV"));
    }

    #[test]
    fn test_no_false_attribution_on_multiplexed_connection() {
        // Phoebus scenario: many channels share one TCP connection but only a
        // single CREATE_CHANNEL was captured. Operations that later show up
        // with sid=0 (server direction) must NOT be attributed to that one
        // known channel by the single-channel fallback.
        let (mut state, conn) = fixture();

        // The one captured channel.
        state.on_create_channel_request(&conn, 1, String::from("CAPTURED:PV"));
        state.on_create_channel_response(&conn, 1, 100);

        // MONITOR init for the known channel, with its SID present.
        state.on_op_init_request(&conn, 100, 1, 13);

        // Activity for channels whose creation was never observed.
        for ioid in 2..=10 {
            state.on_op_activity(&conn, 0, ioid, 13);
        }

        // IOID 1 resolves through the name recorded at INIT time.
        let known = state.resolve_pv_name(&conn, 100, 1);
        assert_eq!(known.as_deref(), Some("CAPTURED:PV"));

        // None of the unknown IOIDs may resolve to CAPTURED:PV.
        for ioid in 2..=10 {
            let pv = state.resolve_pv_name(&conn, 0, ioid);
            assert_eq!(
                pv, None,
                "ioid={} should not resolve to the single captured channel",
                ioid
            );
        }
    }

    #[test]
    fn test_on_op_activity_placeholder_not_created_for_multiplexed() {
        // Once operations exist on a connection with one known channel,
        // activity with sid=0 must yield a placeholder WITHOUT a PV name
        // rather than inheriting the name of the single captured channel.
        let (mut state, conn) = fixture();

        state.on_create_channel_request(&conn, 1, String::from("KNOWN:PV"));
        state.on_create_channel_response(&conn, 1, 100);

        // First operation: from here on, operations exist on the connection.
        state.on_op_init_request(&conn, 100, 1, 13);

        // Second operation arrives via activity with sid=0.
        state.on_op_activity(&conn, 0, 2, 13);

        let pv = state.resolve_pv_name(&conn, 0, 2);
        assert_eq!(
            pv, None,
            "placeholder for ioid=2 should not inherit PV from single-channel fallback"
        );
    }

    #[test]
    fn test_search_cache_populates_and_resolves() {
        let mut state = PvaStateTracker::with_defaults();
        let client = ip("192.168.1.10");

        // A SEARCH request carrying CID → PV name pairs.
        let requests = [
            (100, String::from("MOTOR:X:POSITION")),
            (101, String::from("MOTOR:Y:POSITION")),
            (102, String::from("TEMP:SENSOR:1")),
        ];
        state.on_search(&requests, Some(client));

        // The CIDs of a SEARCH_RESPONSE resolve back to those names, in order.
        let resolved = state.resolve_search_cids(&[100, 101, 102], Some(client));
        assert_eq!(resolved.len(), 3);
        assert_eq!(resolved[0], (100, String::from("MOTOR:X:POSITION")));
        assert_eq!(resolved[1], (101, String::from("MOTOR:Y:POSITION")));
        assert_eq!(resolved[2], (102, String::from("TEMP:SENSOR:1")));
    }

    #[test]
    fn test_search_cache_partial_resolve() {
        let mut state = PvaStateTracker::with_defaults();
        let client = ip("192.168.1.10");

        state.on_search(&[(100, String::from("MOTOR:X:POSITION"))], Some(client));

        // CID 999 was never cached and must simply be absent from the result.
        let resolved = state.resolve_search_cids(&[100, 999], Some(client));
        assert_eq!(resolved.len(), 1);
        assert_eq!(resolved[0], (100, String::from("MOTOR:X:POSITION")));
    }

    #[test]
    fn test_search_cache_scoped_by_ip() {
        let mut state = PvaStateTracker::with_defaults();
        let first = ip("192.168.1.10");
        let second = ip("192.168.1.20");

        // Same CID, different clients, different PV names.
        state.on_search(&[(1, String::from("CLIENT_A:PV"))], Some(first));
        state.on_search(&[(1, String::from("CLIENT_B:PV"))], Some(second));

        // Each client gets its own name back.
        let for_first = state.resolve_search_cids(&[1], Some(first));
        assert_eq!(for_first.len(), 1);
        assert_eq!(for_first[0].1, "CLIENT_A:PV");

        let for_second = state.resolve_search_cids(&[1], Some(second));
        assert_eq!(for_second.len(), 1);
        assert_eq!(for_second[0].1, "CLIENT_B:PV");
    }

    #[test]
    fn test_search_cache_flat_fallback() {
        let mut state = PvaStateTracker::with_defaults();
        let client = ip("192.168.1.10");

        // Cached under a known client IP ...
        state.on_search(&[(42, String::from("SOME:PV:NAME"))], Some(client));

        // ... yet still resolvable when the client IP is unknown.
        let resolved = state.resolve_search_cids(&[42], None);
        assert_eq!(resolved.len(), 1);
        assert_eq!(resolved[0].1, "SOME:PV:NAME");
    }

    #[test]
    fn test_search_cache_used_by_create_channel_response_fallback() {
        // If the capture missed the CREATE_CHANNEL request but saw the SEARCH,
        // the CREATE_CHANNEL response should pick its PV name from the search
        // cache.
        let (mut state, conn) = fixture();
        let client = ip("192.168.1.1");

        // SEARCH announcing CID=5 → "SEARCHED:PV".
        state.on_search(&[(5, String::from("SEARCHED:PV"))], Some(client));

        // CREATE_CHANNEL response with no preceding request.
        state.on_create_channel_response(&conn, 5, 200);

        let resolved = state.resolve_pv_name(&conn, 200, 0);
        assert_eq!(resolved.as_deref(), Some("SEARCHED:PV"));
    }

    #[test]
    fn test_search_responses_resolved_stat() {
        let mut state = PvaStateTracker::with_defaults();
        let client = ip("192.168.1.10");

        let requests = [(1, String::from("PV:A")), (2, String::from("PV:B"))];
        state.on_search(&requests, Some(client));

        assert_eq!(state.stats.search_responses_resolved, 0);

        state.resolve_search_cids(&[1, 2], Some(client));
        assert_eq!(state.stats.search_responses_resolved, 2);

        // A further resolution keeps incrementing the counter.
        state.resolve_search_cids(&[1], Some(client));
        assert_eq!(state.stats.search_responses_resolved, 3);
    }

    #[test]
    fn test_retroactive_resolve_unknown_channels_from_search() {
        // Java EPICS client scenario:
        // 1. The capture starts mid-stream and sees CREATE_CHANNEL responses
        //    (cid + sid) without their requests, so the channels are recorded
        //    as <unknown:cid=N>.
        // 2. A later SEARCH carrying those CIDs resolves the names after the fact.
        let (mut state, conn) = fixture();

        // Step 1: responses without requests produce unknown channels.
        state.on_create_channel_response(&conn, 100, 500);
        state.on_create_channel_response(&conn, 101, 501);
        state.on_create_channel_response(&conn, 102, 502);

        let before_500 = state.resolve_pv_name(&conn, 500, 0);
        assert_eq!(before_500.as_deref(), Some("<unknown:cid=100>"));
        let before_501 = state.resolve_pv_name(&conn, 501, 0);
        assert_eq!(before_501.as_deref(), Some("<unknown:cid=101>"));

        // Step 2: the SEARCH supplies the CID → PV name mappings.
        let client = ip("192.168.1.1");
        let requests = [
            (100, String::from("MOTOR:X:POS")),
            (101, String::from("MOTOR:Y:POS")),
            (102, String::from("TEMP:SENSOR:1")),
        ];
        state.on_search(&requests, Some(client));

        // All three channels now carry their real names.
        let after_500 = state.resolve_pv_name(&conn, 500, 0);
        assert_eq!(after_500.as_deref(), Some("MOTOR:X:POS"));
        let after_501 = state.resolve_pv_name(&conn, 501, 0);
        assert_eq!(after_501.as_deref(), Some("MOTOR:Y:POS"));
        let after_502 = state.resolve_pv_name(&conn, 502, 0);
        assert_eq!(after_502.as_deref(), Some("TEMP:SENSOR:1"));

        // Each retroactive resolution was counted.
        assert_eq!(state.stats.search_retroactive_resolves, 3);
    }

    #[test]
    fn test_retroactive_resolve_also_updates_operations() {
        // An operation bound to a channel that gets resolved retroactively
        // must end up resolving to the real PV name as well.
        let (mut state, conn) = fixture();

        // CREATE_CHANNEL response without request → <unknown:cid=100>.
        state.on_create_channel_response(&conn, 100, 500);

        // MONITOR init on that still-unknown channel.
        state.on_op_init_request(&conn, 500, 1, 13);

        // The operation resolves to something (the placeholder name) for now.
        let placeholder = state.resolve_pv_name(&conn, 500, 1);
        assert!(placeholder.is_some());

        // The SEARCH delivers the CID → PV mapping.
        let client = ip("192.168.1.1");
        state.on_search(&[(100, String::from("RESOLVED:PV"))], Some(client));

        // The channel itself is resolved ...
        let via_channel = state.resolve_pv_name(&conn, 500, 0);
        assert_eq!(via_channel.as_deref(), Some("RESOLVED:PV"));

        // ... and so is the operation (via SID → CID → channel).
        let via_operation = state.resolve_pv_name(&conn, 500, 1);
        assert_eq!(via_operation.as_deref(), Some("RESOLVED:PV"));
    }
}