use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::net::{IpAddr, SocketAddr};
use std::time::{Duration, Instant};
use tracing::debug;
use crate::spvd_decode::StructureDesc;
#[derive(Debug, Clone)]
pub struct PvaStateConfig {
pub max_channels: usize,
pub channel_ttl: Duration,
pub max_operations: usize,
pub max_update_rate: usize,
}
impl Default for PvaStateConfig {
fn default() -> Self {
Self {
max_channels: 40_000,
channel_ttl: Duration::from_secs(5 * 60), max_operations: 10_000,
max_update_rate: 10_000,
}
}
}
impl PvaStateConfig {
pub fn new(max_channels: usize, ttl_secs: u64) -> Self {
Self {
max_channels,
channel_ttl: Duration::from_secs(ttl_secs),
max_operations: 10_000,
max_update_rate: 10_000,
}
}
pub fn with_max_update_rate(mut self, max_update_rate: usize) -> Self {
self.max_update_rate = max_update_rate;
self
}
}
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct ConnectionKey {
pub addr_a: SocketAddr,
pub addr_b: SocketAddr,
}
impl ConnectionKey {
pub fn new(addr1: SocketAddr, addr2: SocketAddr) -> Self {
if addr1 <= addr2 {
Self {
addr_a: addr1,
addr_b: addr2,
}
} else {
Self {
addr_a: addr2,
addr_b: addr1,
}
}
}
pub fn from_parts(ip1: &str, port1: u16, ip2: &str, port2: u16) -> Option<Self> {
let addr1: SocketAddr = format!("{}:{}", ip1, port1).parse().ok()?;
let addr2: SocketAddr = format!("{}:{}", ip2, port2).parse().ok()?;
Some(Self::new(addr1, addr2))
}
}
#[derive(Debug, Clone)]
pub struct ChannelInfo {
pub pv_name: String,
pub cid: u32,
pub sid: Option<u32>,
pub last_seen: Instant,
pub fully_established: bool,
pub update_times: VecDeque<Instant>,
pub recent_messages: VecDeque<String>,
}
impl ChannelInfo {
pub fn new_pending(cid: u32, pv_name: String) -> Self {
Self {
pv_name,
cid,
sid: None,
last_seen: Instant::now(),
fully_established: false,
update_times: VecDeque::new(),
recent_messages: VecDeque::new(),
}
}
pub fn touch(&mut self) {
self.last_seen = Instant::now();
}
pub fn is_expired(&self, ttl: Duration) -> bool {
self.last_seen.elapsed() > ttl
}
}
#[derive(Debug, Clone)]
pub struct OperationState {
pub sid: u32,
pub ioid: u32,
pub command: u8,
pub pv_name: Option<String>,
pub field_desc: Option<StructureDesc>,
pub initialized: bool,
pub last_seen: Instant,
pub update_times: VecDeque<Instant>,
pub recent_messages: VecDeque<String>,
}
impl OperationState {
pub fn new(sid: u32, ioid: u32, command: u8, pv_name: Option<String>) -> Self {
Self {
sid,
ioid,
command,
pv_name,
field_desc: None,
initialized: false,
last_seen: Instant::now(),
update_times: VecDeque::new(),
recent_messages: VecDeque::new(),
}
}
pub fn touch(&mut self) {
self.last_seen = Instant::now();
}
}
#[derive(Debug)]
pub struct ConnectionState {
pub channels_by_cid: HashMap<u32, ChannelInfo>,
pub sid_to_cid: HashMap<u32, u32>,
pub operations: HashMap<u32, OperationState>,
pub is_be: bool,
pub last_seen: Instant,
pub update_times: VecDeque<Instant>,
pub recent_messages: VecDeque<String>,
}
impl ConnectionState {
pub fn new() -> Self {
Self {
channels_by_cid: HashMap::new(),
sid_to_cid: HashMap::new(),
operations: HashMap::new(),
is_be: false, last_seen: Instant::now(),
update_times: VecDeque::new(),
recent_messages: VecDeque::new(),
}
}
pub fn touch(&mut self) {
self.last_seen = Instant::now();
}
pub fn get_channel_by_sid(&self, sid: u32) -> Option<&ChannelInfo> {
self.sid_to_cid
.get(&sid)
.and_then(|cid| self.channels_by_cid.get(cid))
}
pub fn get_channel_by_sid_mut(&mut self, sid: u32) -> Option<&mut ChannelInfo> {
if let Some(&cid) = self.sid_to_cid.get(&sid) {
self.channels_by_cid.get_mut(&cid)
} else {
None
}
}
pub fn get_pv_name_by_sid(&self, sid: u32) -> Option<&str> {
self.get_channel_by_sid(sid).map(|ch| ch.pv_name.as_str())
}
pub fn get_pv_name_by_ioid(&self, ioid: u32) -> Option<&str> {
self.operations
.get(&ioid)
.and_then(|op| op.pv_name.as_deref())
}
}
impl Default for ConnectionState {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub struct PvaStateTracker {
config: PvaStateConfig,
connections: HashMap<ConnectionKey, ConnectionState>,
total_channels: usize,
pub stats: PvaStateStats,
search_cache: HashMap<(IpAddr, u32), String>,
search_cache_flat: HashMap<u32, String>,
}
#[derive(Debug, Default, Clone)]
pub struct PvaStateStats {
pub channels_created: u64,
pub channels_destroyed: u64,
pub channels_expired: u64,
pub channels_evicted: u64,
pub operations_created: u64,
pub operations_completed: u64,
pub create_channel_requests: u64,
pub create_channel_responses: u64,
pub search_responses_resolved: u64,
pub search_cache_entries: u64,
pub search_retroactive_resolves: u64,
pub client_messages: u64,
pub server_messages: u64,
}
#[derive(Debug, Clone)]
pub struct ConnectionSnapshot {
pub addr_a: SocketAddr,
pub addr_b: SocketAddr,
pub channel_count: usize,
pub operation_count: usize,
pub last_seen: Duration,
pub pv_names: Vec<String>,
pub updates_per_sec: f64,
pub recent_messages: Vec<String>,
pub mid_stream: bool,
pub is_beacon: bool,
pub is_broadcast: bool,
}
#[derive(Debug, Clone)]
pub struct ChannelSnapshot {
pub addr_a: SocketAddr,
pub addr_b: SocketAddr,
pub cid: u32,
pub sid: Option<u32>,
pub pv_name: String,
pub last_seen: Duration,
pub updates_per_sec: f64,
pub recent_messages: Vec<String>,
pub mid_stream: bool,
pub is_beacon: bool,
pub is_broadcast: bool,
}
impl PvaStateTracker {
fn is_broadcast_addr(addr: &SocketAddr) -> bool {
match addr.ip() {
std::net::IpAddr::V4(v4) => {
if v4.is_broadcast() {
return true;
}
v4.octets()[3] == 255
}
std::net::IpAddr::V6(v6) => {
v6.is_multicast()
}
}
}
pub fn new(config: PvaStateConfig) -> Self {
Self {
config,
connections: HashMap::new(),
total_channels: 0,
stats: PvaStateStats::default(),
search_cache: HashMap::new(),
search_cache_flat: HashMap::new(),
}
}
pub fn with_defaults() -> Self {
Self::new(PvaStateConfig::default())
}
fn get_or_create_connection(&mut self, key: &ConnectionKey) -> &mut ConnectionState {
if !self.connections.contains_key(key) {
self.connections.insert(key.clone(), ConnectionState::new());
}
self.connections.get_mut(key).unwrap()
}
pub fn get_connection(&self, key: &ConnectionKey) -> Option<&ConnectionState> {
self.connections.get(key)
}
pub fn get_pv_name_by_sid(&self, conn_key: &ConnectionKey, sid: u32) -> Option<String> {
self.connections
.get(conn_key)
.and_then(|conn| conn.get_pv_name_by_sid(sid))
.map(|s| s.to_string())
}
pub fn on_create_channel_request(
&mut self,
conn_key: &ConnectionKey,
cid: u32,
pv_name: String,
) {
self.stats.create_channel_requests += 1;
let client_ip = conn_key.addr_a.ip(); self.search_cache.insert((client_ip, cid), pv_name.clone());
self.search_cache_flat.insert(cid, pv_name.clone());
if self.total_channels >= self.config.max_channels {
self.evict_oldest_channels(100); }
let conn = self.get_or_create_connection(conn_key);
conn.touch();
if !conn.channels_by_cid.contains_key(&cid) {
conn.channels_by_cid
.insert(cid, ChannelInfo::new_pending(cid, pv_name));
self.total_channels += 1;
self.stats.channels_created += 1;
debug!("CREATE_CHANNEL request: cid={}", cid);
}
}
pub fn on_create_channel_response(&mut self, conn_key: &ConnectionKey, cid: u32, sid: u32) {
self.stats.create_channel_responses += 1;
let cached_pv_name = self
.search_cache
.get(&(conn_key.addr_a.ip(), cid))
.or_else(|| self.search_cache.get(&(conn_key.addr_b.ip(), cid)))
.or_else(|| self.search_cache_flat.get(&cid))
.cloned();
let conn = self.get_or_create_connection(conn_key);
conn.touch();
if let Some(channel) = conn.channels_by_cid.get_mut(&cid) {
channel.sid = Some(sid);
channel.fully_established = true;
channel.touch();
conn.sid_to_cid.insert(sid, cid);
debug!(
"CREATE_CHANNEL response: cid={}, sid={}, pv={}",
cid, sid, channel.pv_name
);
} else {
let pv_name = cached_pv_name.unwrap_or_else(|| format!("<unknown:cid={}>", cid));
let is_resolved = !pv_name.starts_with("<unknown");
debug!(
"CREATE_CHANNEL response without request: cid={}, sid={}, resolved={}",
cid, sid, is_resolved
);
let mut channel = ChannelInfo::new_pending(cid, pv_name);
channel.sid = Some(sid);
channel.fully_established = is_resolved;
conn.channels_by_cid.insert(cid, channel);
conn.sid_to_cid.insert(sid, cid);
self.total_channels += 1;
}
}
pub fn on_destroy_channel(&mut self, conn_key: &ConnectionKey, cid: u32, sid: u32) {
if let Some(conn) = self.connections.get_mut(conn_key) {
conn.touch();
if conn.channels_by_cid.remove(&cid).is_some() {
self.total_channels = self.total_channels.saturating_sub(1);
self.stats.channels_destroyed += 1;
}
conn.sid_to_cid.remove(&sid);
conn.operations.retain(|_, op| op.sid != sid);
debug!("DESTROY_CHANNEL: cid={}, sid={}", cid, sid);
}
}
pub fn on_op_init_request(
&mut self,
conn_key: &ConnectionKey,
sid: u32,
ioid: u32,
command: u8,
) {
let max_ops = self.config.max_operations;
let conn = self.get_or_create_connection(conn_key);
conn.touch();
let pv_name = conn.get_pv_name_by_sid(sid).map(|s| s.to_string());
if conn.operations.len() < max_ops {
conn.operations
.insert(ioid, OperationState::new(sid, ioid, command, pv_name));
self.stats.operations_created += 1;
debug!(
"Operation INIT: sid={}, ioid={}, cmd={}",
sid, ioid, command
);
}
}
pub fn on_op_init_response(
&mut self,
conn_key: &ConnectionKey,
ioid: u32,
field_desc: Option<StructureDesc>,
) {
if let Some(conn) = self.connections.get_mut(conn_key) {
conn.touch();
if let Some(op) = conn.operations.get_mut(&ioid) {
op.field_desc = field_desc;
op.initialized = true;
op.touch();
debug!("Operation INIT response: ioid={}", ioid);
}
}
}
pub fn on_op_destroy(&mut self, conn_key: &ConnectionKey, ioid: u32) {
if let Some(conn) = self.connections.get_mut(conn_key) {
if conn.operations.remove(&ioid).is_some() {
self.stats.operations_completed += 1;
}
}
}
pub fn on_op_activity(&mut self, conn_key: &ConnectionKey, sid: u32, ioid: u32, command: u8) {
let max_update_rate = self.config.max_update_rate;
let max_ops = self.config.max_operations;
let mut created_placeholder = false;
let conn = self.get_or_create_connection(conn_key);
conn.touch();
Self::record_update(&mut conn.update_times, max_update_rate);
let mut channel_sid = if sid != 0 { Some(sid) } else { None };
if let Some(op) = conn.operations.get_mut(&ioid) {
op.touch();
Self::record_update(&mut op.update_times, max_update_rate);
if channel_sid.is_none() {
channel_sid = Some(op.sid);
}
} else if conn.operations.len() < max_ops {
let pv_name = if sid != 0 {
conn.get_pv_name_by_sid(sid).map(|s| s.to_string())
} else if conn.channels_by_cid.len() == 1 && conn.operations.is_empty() {
conn.channels_by_cid
.values()
.next()
.map(|ch| ch.pv_name.clone())
.filter(|n| !n.starts_with("<unknown"))
} else {
None
};
conn.operations
.insert(ioid, OperationState::new(sid, ioid, command, pv_name));
created_placeholder = true;
}
if let Some(sid_val) = channel_sid {
if let Some(channel) = conn.get_channel_by_sid_mut(sid_val) {
channel.touch();
Self::record_update(&mut channel.update_times, max_update_rate);
}
}
if created_placeholder {
self.stats.operations_created += 1;
debug!(
"Auto-created placeholder operation for mid-stream traffic: sid={}, ioid={}, cmd={}",
sid, ioid, command
);
}
}
pub fn on_search(&mut self, pv_requests: &[(u32, String)], source_ip: Option<IpAddr>) {
let cid_to_pv: HashMap<u32, String> = pv_requests.iter().cloned().collect();
for (cid, pv_name) in pv_requests {
if let Some(ip) = source_ip {
self.search_cache.insert((ip, *cid), pv_name.clone());
}
self.search_cache_flat.insert(*cid, pv_name.clone());
}
let mut retroactive_count: u64 = 0;
for conn in self.connections.values_mut() {
for (cid, channel) in conn.channels_by_cid.iter_mut() {
if channel.pv_name.starts_with("<unknown") {
if let Some(pv_name) = cid_to_pv.get(cid) {
debug!(
"Retroactive PV resolve from SEARCH: cid={} {} -> {}",
cid, channel.pv_name, pv_name
);
channel.pv_name = pv_name.clone();
channel.fully_established = true;
retroactive_count += 1;
}
}
}
for op in conn.operations.values_mut() {
let needs_update = match &op.pv_name {
None => true,
Some(name) => name.starts_with("<unknown"),
};
if needs_update && op.sid != 0 {
if let Some(&cid) = conn.sid_to_cid.get(&op.sid) {
if let Some(pv_name) = cid_to_pv.get(&cid) {
op.pv_name = Some(pv_name.clone());
}
}
}
}
}
if retroactive_count > 0 {
self.stats.search_retroactive_resolves += retroactive_count;
debug!(
"Retroactively resolved {} unknown channels from SEARCH cache",
retroactive_count
);
}
self.stats.search_cache_entries = self.search_cache_flat.len() as u64;
while self.search_cache.len() > 50_000 {
if let Some(key) = self.search_cache.keys().next().cloned() {
self.search_cache.remove(&key);
}
}
while self.search_cache_flat.len() > 50_000 {
if let Some(key) = self.search_cache_flat.keys().next().cloned() {
self.search_cache_flat.remove(&key);
}
}
}
pub fn resolve_search_cids(
&mut self,
cids: &[u32],
peer_ip: Option<IpAddr>,
) -> Vec<(u32, String)> {
let mut resolved = Vec::new();
for &cid in cids {
let pv_name = peer_ip
.and_then(|ip| self.search_cache.get(&(ip, cid)))
.or_else(|| self.search_cache_flat.get(&cid))
.cloned();
if let Some(name) = pv_name {
resolved.push((cid, name));
self.stats.search_responses_resolved += 1;
}
}
resolved
}
pub fn count_direction(&mut self, is_server: bool) {
if is_server {
self.stats.server_messages += 1;
} else {
self.stats.client_messages += 1;
}
}
pub fn on_message(
&mut self,
conn_key: &ConnectionKey,
sid: u32,
ioid: u32,
request_type: &str,
message: String,
is_server: bool,
) {
let conn = self.get_or_create_connection(conn_key);
conn.touch();
let dir = if is_server { "S>" } else { "C>" };
let full_message = format!("{} {} {}", dir, request_type, message);
Self::push_message(&mut conn.recent_messages, full_message.clone());
let mut channel_sid = if sid != 0 { Some(sid) } else { None };
if let Some(op) = conn.operations.get_mut(&ioid) {
Self::push_message(&mut op.recent_messages, full_message.clone());
if channel_sid.is_none() {
channel_sid = Some(op.sid);
}
}
if let Some(sid_val) = channel_sid {
if let Some(channel) = conn.get_channel_by_sid_mut(sid_val) {
Self::push_message(&mut channel.recent_messages, full_message);
}
}
}
fn record_update(times: &mut VecDeque<Instant>, max_update_rate: usize) {
let now = Instant::now();
times.push_back(now);
Self::trim_times(times, now);
while times.len() > max_update_rate {
times.pop_front();
}
}
fn trim_times(times: &mut VecDeque<Instant>, now: Instant) {
while let Some(front) = times.front() {
if now.duration_since(*front) > Duration::from_secs(1) {
times.pop_front();
} else {
break;
}
}
}
fn updates_per_sec(times: &VecDeque<Instant>) -> f64 {
times.len() as f64
}
fn push_message(messages: &mut VecDeque<String>, message: String) {
messages.push_back(message);
while messages.len() > 30 {
messages.pop_front();
}
}
pub fn resolve_pv_name(&self, conn_key: &ConnectionKey, sid: u32, ioid: u32) -> Option<String> {
let conn = self.connections.get(conn_key)?;
if let Some(op) = conn.operations.get(&ioid) {
if let Some(ref name) = op.pv_name {
if !name.starts_with("<unknown") {
return Some(name.clone());
}
}
}
if sid != 0 {
if let Some(name) = conn.get_pv_name_by_sid(sid) {
return Some(name.to_string());
}
}
if conn.channels_by_cid.len() == 1 && conn.operations.len() <= 1 {
if let Some(ch) = conn.channels_by_cid.values().next() {
if !ch.pv_name.starts_with("<unknown") {
return Some(ch.pv_name.clone());
}
}
}
None
}
pub fn active_channel_count(&self) -> usize {
self.total_channels
}
pub fn active_connection_count(&self) -> usize {
self.connections.len()
}
pub fn is_connection_mid_stream(&self, conn_key: &ConnectionKey) -> bool {
self.connections
.get(conn_key)
.map(|conn| {
if conn.channels_by_cid.is_empty() && !conn.operations.is_empty() {
return true;
}
conn.channels_by_cid
.values()
.any(|ch| !ch.fully_established)
})
.unwrap_or(false)
}
pub fn get_operation(&self, conn_key: &ConnectionKey, ioid: u32) -> Option<&OperationState> {
self.connections
.get(conn_key)
.and_then(|conn| conn.operations.get(&ioid))
}
fn evict_oldest_channels(&mut self, count: usize) {
let mut oldest: Vec<(ConnectionKey, u32, Instant)> = Vec::new();
for (conn_key, conn) in &self.connections {
for (cid, channel) in &conn.channels_by_cid {
oldest.push((conn_key.clone(), *cid, channel.last_seen));
}
}
oldest.sort_by_key(|(_, _, t)| *t);
for (conn_key, cid, _) in oldest.into_iter().take(count) {
if let Some(conn) = self.connections.get_mut(&conn_key) {
if let Some(channel) = conn.channels_by_cid.remove(&cid) {
if let Some(sid) = channel.sid {
conn.sid_to_cid.remove(&sid);
}
self.total_channels = self.total_channels.saturating_sub(1);
self.stats.channels_evicted += 1;
}
}
}
}
pub fn cleanup_expired(&mut self) {
let ttl = self.config.channel_ttl;
let mut expired_count = 0;
for conn in self.connections.values_mut() {
let expired_cids: Vec<u32> = conn
.channels_by_cid
.iter()
.filter(|(_, ch)| ch.is_expired(ttl))
.map(|(cid, _)| *cid)
.collect();
for cid in expired_cids {
if let Some(channel) = conn.channels_by_cid.remove(&cid) {
if let Some(sid) = channel.sid {
conn.sid_to_cid.remove(&sid);
conn.operations.retain(|_, op| op.sid != sid);
}
expired_count += 1;
}
}
}
if expired_count > 0 {
self.total_channels = self.total_channels.saturating_sub(expired_count);
self.stats.channels_expired += expired_count as u64;
debug!("Cleaned up {} expired channels", expired_count);
}
self.connections
.retain(|_, conn| !conn.channels_by_cid.is_empty() || !conn.operations.is_empty());
}
pub fn summary(&self) -> String {
format!(
"PVA State: {} connections, {} channels (created={}, destroyed={}, expired={}, evicted={})",
self.connections.len(),
self.total_channels,
self.stats.channels_created,
self.stats.channels_destroyed,
self.stats.channels_expired,
self.stats.channels_evicted,
)
}
pub fn channel_count(&self) -> usize {
self.total_channels
}
pub fn connection_count(&self) -> usize {
self.connections.len()
}
pub fn connection_snapshots(&self) -> Vec<ConnectionSnapshot> {
let mut snapshots = Vec::new();
let now = Instant::now();
for (conn_key, conn) in &self.connections {
let mut update_times = conn.update_times.clone();
Self::trim_times(&mut update_times, now);
let mut pv_names: Vec<String> = conn
.channels_by_cid
.values()
.map(|ch| ch.pv_name.clone())
.collect();
pv_names.sort();
pv_names.truncate(8);
let mut messages: Vec<String> = conn.recent_messages.iter().cloned().collect();
if messages.len() > 20 {
messages = messages.split_off(messages.len() - 20);
}
let is_beacon = messages.iter().any(|m| m.starts_with("BEACON "));
let is_broadcast = Self::is_broadcast_addr(&conn_key.addr_a)
|| Self::is_broadcast_addr(&conn_key.addr_b);
let mut mid_stream = false;
if conn.channels_by_cid.is_empty() && !conn.operations.is_empty() {
mid_stream = true;
}
if conn
.channels_by_cid
.values()
.any(|ch| !ch.fully_established || ch.pv_name.starts_with("<unknown"))
{
mid_stream = true;
}
snapshots.push(ConnectionSnapshot {
addr_a: conn_key.addr_a,
addr_b: conn_key.addr_b,
channel_count: conn.channels_by_cid.len(),
operation_count: conn.operations.len(),
last_seen: conn.last_seen.elapsed(),
pv_names,
updates_per_sec: Self::updates_per_sec(&update_times),
recent_messages: messages,
mid_stream,
is_beacon,
is_broadcast,
});
}
snapshots
}
pub fn channel_snapshots(&self) -> Vec<ChannelSnapshot> {
let mut snapshots = Vec::new();
let now = Instant::now();
for (conn_key, conn) in &self.connections {
for channel in conn.channels_by_cid.values() {
let mut update_times = channel.update_times.clone();
Self::trim_times(&mut update_times, now);
let mut messages: Vec<String> = channel.recent_messages.iter().cloned().collect();
if messages.len() > 20 {
messages = messages.split_off(messages.len() - 20);
}
let is_beacon = messages.iter().any(|m| m.starts_with("BEACON "));
let is_broadcast = Self::is_broadcast_addr(&conn_key.addr_a)
|| Self::is_broadcast_addr(&conn_key.addr_b);
snapshots.push(ChannelSnapshot {
addr_a: conn_key.addr_a,
addr_b: conn_key.addr_b,
cid: channel.cid,
sid: channel.sid,
pv_name: channel.pv_name.clone(),
last_seen: channel.last_seen.elapsed(),
updates_per_sec: Self::updates_per_sec(&update_times),
recent_messages: messages,
mid_stream: !channel.fully_established
|| channel.pv_name.starts_with("<unknown"),
is_beacon,
is_broadcast,
});
}
let mut seen_virtual = HashSet::new();
for op in conn.operations.values() {
if conn.get_channel_by_sid(op.sid).is_none() {
let mut update_times = op.update_times.clone();
Self::trim_times(&mut update_times, now);
let mut messages: Vec<String> = op.recent_messages.iter().cloned().collect();
if messages.len() > 20 {
messages = messages.split_off(messages.len() - 20);
}
let is_beacon = messages.iter().any(|m| m.starts_with("BEACON "));
let is_broadcast = Self::is_broadcast_addr(&conn_key.addr_a)
|| Self::is_broadcast_addr(&conn_key.addr_b);
let pv_name = op
.pv_name
.clone()
.unwrap_or_else(|| format!("<unknown:sid={}>", op.sid));
if !seen_virtual.insert((op.sid, pv_name.clone())) {
continue;
}
snapshots.push(ChannelSnapshot {
addr_a: conn_key.addr_a,
addr_b: conn_key.addr_b,
cid: 0,
sid: Some(op.sid),
pv_name,
last_seen: op.last_seen.elapsed(),
updates_per_sec: Self::updates_per_sec(&update_times),
recent_messages: messages,
mid_stream: true,
is_beacon,
is_broadcast,
});
}
}
}
snapshots
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_conn_key() -> ConnectionKey {
ConnectionKey::from_parts("192.168.1.1", 12345, "192.168.1.2", 5075).unwrap()
}
#[test]
fn test_create_channel_flow() {
let mut tracker = PvaStateTracker::with_defaults();
let key = test_conn_key();
tracker.on_create_channel_request(&key, 1, "TEST:PV:VALUE".to_string());
assert_eq!(tracker.channel_count(), 1);
tracker.on_create_channel_response(&key, 1, 100);
let pv_name = tracker.resolve_pv_name(&key, 100, 0);
assert_eq!(pv_name, Some("TEST:PV:VALUE".to_string()));
}
#[test]
fn test_channel_limit() {
let config = PvaStateConfig::new(100, 300);
let mut tracker = PvaStateTracker::new(config);
let key = test_conn_key();
for i in 0..150 {
tracker.on_create_channel_request(&key, i, format!("PV:{}", i));
}
assert!(tracker.channel_count() <= 100);
}
#[test]
fn test_destroy_channel() {
let mut tracker = PvaStateTracker::with_defaults();
let key = test_conn_key();
tracker.on_create_channel_request(&key, 1, "TEST:PV".to_string());
tracker.on_create_channel_response(&key, 1, 100);
assert_eq!(tracker.channel_count(), 1);
tracker.on_destroy_channel(&key, 1, 100);
assert_eq!(tracker.channel_count(), 0);
}
#[test]
fn test_channel_snapshots_dedup_unresolved_sid_rows() {
let mut tracker = PvaStateTracker::with_defaults();
let key = test_conn_key();
tracker.on_op_init_request(&key, 777, 1001, 13);
tracker.on_op_init_request(&key, 777, 1002, 13);
tracker.on_op_activity(&key, 777, 1001, 13);
tracker.on_op_activity(&key, 777, 1002, 13);
let snapshots = tracker.channel_snapshots();
assert_eq!(snapshots.len(), 1);
assert_eq!(snapshots[0].sid, Some(777));
}
#[test]
fn test_single_channel_fallback_works_for_simple_connection() {
let mut tracker = PvaStateTracker::with_defaults();
let key = test_conn_key();
tracker.on_create_channel_request(&key, 1, "SIMPLE:PV".to_string());
tracker.on_create_channel_response(&key, 1, 100);
let pv = tracker.resolve_pv_name(&key, 0, 99);
assert_eq!(pv, Some("SIMPLE:PV".to_string()));
}
#[test]
fn test_no_false_attribution_on_multiplexed_connection() {
let mut tracker = PvaStateTracker::with_defaults();
let key = test_conn_key();
tracker.on_create_channel_request(&key, 1, "CAPTURED:PV".to_string());
tracker.on_create_channel_response(&key, 1, 100);
tracker.on_op_init_request(&key, 100, 1, 13);
for ioid in 2..=10 {
tracker.on_op_activity(&key, 0, ioid, 13);
}
let pv1 = tracker.resolve_pv_name(&key, 100, 1);
assert_eq!(pv1, Some("CAPTURED:PV".to_string()));
for ioid in 2..=10 {
let pv = tracker.resolve_pv_name(&key, 0, ioid);
assert_eq!(
pv, None,
"ioid={} should not resolve to the single captured channel",
ioid
);
}
}
#[test]
fn test_on_op_activity_placeholder_not_created_for_multiplexed() {
let mut tracker = PvaStateTracker::with_defaults();
let key = test_conn_key();
tracker.on_create_channel_request(&key, 1, "KNOWN:PV".to_string());
tracker.on_create_channel_response(&key, 1, 100);
tracker.on_op_init_request(&key, 100, 1, 13);
tracker.on_op_activity(&key, 0, 2, 13);
let pv = tracker.resolve_pv_name(&key, 0, 2);
assert_eq!(
pv, None,
"placeholder for ioid=2 should not inherit PV from single-channel fallback"
);
}
#[test]
fn test_search_cache_populates_and_resolves() {
let mut tracker = PvaStateTracker::with_defaults();
let client_ip: IpAddr = "192.168.1.10".parse().unwrap();
let pv_requests = vec![
(100, "MOTOR:X:POSITION".to_string()),
(101, "MOTOR:Y:POSITION".to_string()),
(102, "TEMP:SENSOR:1".to_string()),
];
tracker.on_search(&pv_requests, Some(client_ip));
let resolved = tracker.resolve_search_cids(&[100, 101, 102], Some(client_ip));
assert_eq!(resolved.len(), 3);
assert_eq!(resolved[0], (100, "MOTOR:X:POSITION".to_string()));
assert_eq!(resolved[1], (101, "MOTOR:Y:POSITION".to_string()));
assert_eq!(resolved[2], (102, "TEMP:SENSOR:1".to_string()));
}
#[test]
fn test_search_cache_partial_resolve() {
let mut tracker = PvaStateTracker::with_defaults();
let client_ip: IpAddr = "192.168.1.10".parse().unwrap();
let pv_requests = vec![(100, "MOTOR:X:POSITION".to_string())];
tracker.on_search(&pv_requests, Some(client_ip));
let resolved = tracker.resolve_search_cids(&[100, 999], Some(client_ip));
assert_eq!(resolved.len(), 1);
assert_eq!(resolved[0], (100, "MOTOR:X:POSITION".to_string()));
}
#[test]
fn test_search_cache_scoped_by_ip() {
let mut tracker = PvaStateTracker::with_defaults();
let client_a: IpAddr = "192.168.1.10".parse().unwrap();
let client_b: IpAddr = "192.168.1.20".parse().unwrap();
tracker.on_search(&[(1, "CLIENT_A:PV".to_string())], Some(client_a));
tracker.on_search(&[(1, "CLIENT_B:PV".to_string())], Some(client_b));
let resolved_a = tracker.resolve_search_cids(&[1], Some(client_a));
assert_eq!(resolved_a.len(), 1);
assert_eq!(resolved_a[0].1, "CLIENT_A:PV");
let resolved_b = tracker.resolve_search_cids(&[1], Some(client_b));
assert_eq!(resolved_b.len(), 1);
assert_eq!(resolved_b[0].1, "CLIENT_B:PV");
}
#[test]
fn test_search_cache_flat_fallback() {
let mut tracker = PvaStateTracker::with_defaults();
let client_ip: IpAddr = "192.168.1.10".parse().unwrap();
tracker.on_search(&[(42, "SOME:PV:NAME".to_string())], Some(client_ip));
let resolved = tracker.resolve_search_cids(&[42], None);
assert_eq!(resolved.len(), 1);
assert_eq!(resolved[0].1, "SOME:PV:NAME");
}
#[test]
fn test_search_cache_used_by_create_channel_response_fallback() {
let mut tracker = PvaStateTracker::with_defaults();
let key = test_conn_key();
let client_ip: IpAddr = "192.168.1.1".parse().unwrap();
tracker.on_search(&[(5, "SEARCHED:PV".to_string())], Some(client_ip));
tracker.on_create_channel_response(&key, 5, 200);
let pv = tracker.resolve_pv_name(&key, 200, 0);
assert_eq!(pv, Some("SEARCHED:PV".to_string()));
}
#[test]
fn test_search_responses_resolved_stat() {
let mut tracker = PvaStateTracker::with_defaults();
let client_ip: IpAddr = "192.168.1.10".parse().unwrap();
tracker.on_search(
&[(1, "PV:A".to_string()), (2, "PV:B".to_string())],
Some(client_ip),
);
assert_eq!(tracker.stats.search_responses_resolved, 0);
tracker.resolve_search_cids(&[1, 2], Some(client_ip));
assert_eq!(tracker.stats.search_responses_resolved, 2);
tracker.resolve_search_cids(&[1], Some(client_ip));
assert_eq!(tracker.stats.search_responses_resolved, 3);
}
#[test]
fn test_retroactive_resolve_unknown_channels_from_search() {
let mut tracker = PvaStateTracker::with_defaults();
let key = test_conn_key();
tracker.on_create_channel_response(&key, 100, 500);
tracker.on_create_channel_response(&key, 101, 501);
tracker.on_create_channel_response(&key, 102, 502);
assert_eq!(
tracker.resolve_pv_name(&key, 500, 0),
Some("<unknown:cid=100>".to_string())
);
assert_eq!(
tracker.resolve_pv_name(&key, 501, 0),
Some("<unknown:cid=101>".to_string())
);
let client_ip: IpAddr = "192.168.1.1".parse().unwrap();
tracker.on_search(
&[
(100, "MOTOR:X:POS".to_string()),
(101, "MOTOR:Y:POS".to_string()),
(102, "TEMP:SENSOR:1".to_string()),
],
Some(client_ip),
);
assert_eq!(
tracker.resolve_pv_name(&key, 500, 0),
Some("MOTOR:X:POS".to_string())
);
assert_eq!(
tracker.resolve_pv_name(&key, 501, 0),
Some("MOTOR:Y:POS".to_string())
);
assert_eq!(
tracker.resolve_pv_name(&key, 502, 0),
Some("TEMP:SENSOR:1".to_string())
);
assert_eq!(tracker.stats.search_retroactive_resolves, 3);
}
#[test]
fn test_retroactive_resolve_also_updates_operations() {
let mut tracker = PvaStateTracker::with_defaults();
let key = test_conn_key();
tracker.on_create_channel_response(&key, 100, 500);
tracker.on_op_init_request(&key, 500, 1, 13);
let pv = tracker.resolve_pv_name(&key, 500, 1);
assert!(pv.is_some());
let client_ip: IpAddr = "192.168.1.1".parse().unwrap();
tracker.on_search(&[(100, "RESOLVED:PV".to_string())], Some(client_ip));
assert_eq!(
tracker.resolve_pv_name(&key, 500, 0),
Some("RESOLVED:PV".to_string())
);
let pv = tracker.resolve_pv_name(&key, 500, 1);
assert_eq!(pv, Some("RESOLVED:PV".to_string()));
}
}