use std::collections::HashMap;
use std::net::{Ipv4Addr, SocketAddr};
use std::time::{Duration, Instant};
use epics_base_rs::net::AsyncUdpV4;
use epics_base_rs::runtime::sync::mpsc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::interval;
use crate::protocol::*;
use super::circuit_breaker::CircuitBreakerRegistry;
use super::types::{SearchAttempts, SearchReason, SearchRequest, SearchResponse};
use std::sync::atomic::{AtomicU32, Ordering};
/// Bytes forwarded by a TCP nameserver reader task, paired with the address of
/// the nameserver connection they were read from.
type ParsedDatagram = (Vec<u8>, SocketAddr);
/// Sends one search datagram to `addr` and keeps the per-destination error log quiet.
///
/// IPv4 broadcast and multicast destinations go through `fanout_to`; every
/// other destination uses a plain `send_to`.
///
/// `send_errors` remembers the last failure kind per destination so that a
/// persistently failing address is warned about once per distinct error kind,
/// and an `info` line is emitted when a previously failing address succeeds.
/// `site` is a static label identifying the call site in those log lines.
async fn send_with_fanout(
    socket: &AsyncUdpV4,
    buf: &[u8],
    addr: SocketAddr,
    site: &'static str,
    send_errors: &mut HashMap<SocketAddr, std::io::ErrorKind>,
) {
    let fan_out = matches!(
        addr,
        SocketAddr::V4(v4) if v4.ip().is_broadcast() || v4.ip().is_multicast()
    );
    let outcome = if fan_out {
        socket.fanout_to(buf, addr).await.map(drop)
    } else {
        socket.send_to(buf, addr).await.map(drop)
    };

    let err = match outcome {
        Ok(()) => {
            // A success clears any remembered failure; report the recovery once.
            if let Some(prev) = send_errors.remove(&addr) {
                tracing::info!(
                    target: "epics_ca_rs::search",
                    %addr, site, prev_error = ?prev,
                    "search send_to: recovered"
                );
            }
            return;
        }
        Err(err) => err,
    };

    // Warn only when the failure kind differs from the one already recorded.
    let kind = err.kind();
    if send_errors.insert(addr, kind) != Some(kind) {
        tracing::warn!(
            target: "epics_ca_rs::search",
            %addr,
            site,
            error = %err,
            "search send_to failed"
        );
    }
}
/// Number of slots in the search scheduling ring.
const N_SEARCH_BUCKETS: usize = 30;

/// Chooses the ring slot a newly scheduled channel is placed in.
///
/// `Reconnect` channels land in `current_bucket` unchanged; `Initial` and
/// `BeaconAnomaly` channels land one slot ahead, wrapping at the ring size.
fn placement_bucket(current_bucket: usize, reason: SearchReason) -> usize {
    match reason {
        SearchReason::Reconnect => current_bucket,
        SearchReason::Initial | SearchReason::BeaconAnomaly => {
            let following = current_bucket + 1;
            following % N_SEARCH_BUCKETS
        }
    }
}
/// Picks the ring slot a channel moves to after its `attempt`-th search.
///
/// The channel advances `attempt` slots from `current_bucket`, capped at one
/// full ring revolution. If that target slot holds more than 100 entries
/// beyond the slot right after it, the channel is deferred to that following
/// slot instead.
///
/// `bucket_sizes` reports the current population of a slot; it is queried for
/// the target slot first and then for the slot after it.
fn cascade_smoothed_next(
    current_bucket: usize,
    attempt: u32,
    bucket_sizes: impl Fn(usize) -> usize,
) -> usize {
    let stride = usize::min(attempt as usize, N_SEARCH_BUCKETS);
    let target = (current_bucket + stride) % N_SEARCH_BUCKETS;
    let fallback = (target + 1) % N_SEARCH_BUCKETS;

    let target_load = bucket_sizes(target);
    let fallback_load = bucket_sizes(fallback);

    // `checked_sub` is `None` when the target is the lighter slot, so only a
    // genuine surplus above 100 triggers the deferral.
    match target_load.checked_sub(fallback_load) {
        Some(surplus) if surplus > 100 => fallback,
        _ => target,
    }
}
/// Search period, in seconds, used when `EPICS_CA_MAX_SEARCH_PERIOD` is unset
/// or cannot be used.
const MAX_SEARCH_PERIOD_DEFAULT_SECS: f64 = 300.0;

/// Smallest search period, in seconds, that a configured value may yield.
const MAX_SEARCH_PERIOD_LOWER_LIMIT_SECS: f64 = 60.0;

/// Reads `EPICS_CA_MAX_SEARCH_PERIOD` and returns the effective period in seconds.
///
/// * unset, non-numeric, or non-finite (`NaN`, `inf`) -> the 300 s default;
/// * any finite number below 60 (including zero and negatives) -> 60 s;
/// * otherwise the configured value.
///
/// Non-finite values are rejected explicitly: `f64::from_str` accepts "inf"
/// and "NaN", and an infinite period would make the tick computation panic.
fn max_search_period_secs() -> f64 {
    epics_base_rs::runtime::env::get("EPICS_CA_MAX_SEARCH_PERIOD")
        .and_then(|raw| raw.parse::<f64>().ok())
        .filter(|secs| secs.is_finite())
        .map_or(MAX_SEARCH_PERIOD_DEFAULT_SECS, |secs| {
            secs.max(MAX_SEARCH_PERIOD_LOWER_LIMIT_SECS)
        })
}

/// Tick interval of the search ring in normal (non-fast) mode: the configured
/// search period divided evenly across the ring slots.
///
/// A finite but absurdly large period can still exceed what `Duration` can
/// represent; in that case the default period's tick is used instead of
/// panicking.
fn normal_tick() -> Duration {
    let slots = N_SEARCH_BUCKETS as f64;
    let per_slot_secs = max_search_period_secs() / slots;
    Duration::try_from_secs_f64(per_slot_secs)
        .unwrap_or_else(|_| Duration::from_secs_f64(MAX_SEARCH_PERIOD_DEFAULT_SECS / slots))
}
/// Tick interval the search engine switches to while `fast_ticks_remaining` is non-zero.
const FAST_TICK: Duration = Duration::from_millis(200);
/// Size limit, in bytes, that `fire_searches` applies when batching search
/// messages into one outgoing frame.
const MAX_UDP_SEND: usize = 1024;
/// How long a server stays in the penalty map after a failed connect result.
const PENALTY_DURATION: Duration = Duration::from_secs(30);
/// One channel that is still being searched for.
struct PendingSearch {
/// Channel id; also the key under which this entry is stored in `pending`.
#[allow(dead_code)]
cid: u32,
/// PV name the channel was scheduled with.
#[allow(dead_code)]
pv_name: String,
/// Pre-built SEARCH message (header plus padded name) produced by `build_search_payload`.
search_payload: Vec<u8>,
/// Index of the ring slot that currently holds this channel's id.
bucket: usize,
/// Number of times `process_bucket` has visited this channel since it was
/// scheduled or last poked.
attempt: u32,
/// Time of the most recent `process_bucket` visit; `None` before the first
/// visit and after a poke.
#[allow(dead_code)]
last_attempt: Option<Instant>,
}
/// Penalty record for a server whose connect attempt failed.
struct PenaltyEntry {
/// Instant at which the penalty expires; search replies naming the server are
/// skipped while this is still in the future.
until: Instant,
}
/// All mutable state owned by the search engine task.
struct SearchEngineState {
/// Channels still being searched for, keyed by channel id.
pending: HashMap<u32, PendingSearch>,
/// The scheduling ring: `N_SEARCH_BUCKETS` slots, each a list of channel ids.
buckets: Vec<Vec<u32>>,
/// Index of the slot the next `process_bucket` call will handle.
current_bucket: usize,
/// Shared per-channel counters, incremented each time a search is sent for a channel.
attempts: SearchAttempts,
/// Countdown of ticks still to run at `FAST_TICK`; set to a full ring by a poke.
fast_ticks_remaining: u32,
/// Servers currently penalized after a failed connect, with their expiry time.
penalty: HashMap<SocketAddr, PenaltyEntry>,
/// Per-server circuit breakers, fed by connect results and consulted on search replies.
breakers: CircuitBreakerRegistry,
/// Sequence number stamped into the VERSION header of outgoing search frames.
dgram_seq: u32,
/// Sequence state of the response buffer being parsed; while `None`, SEARCH
/// replies in that buffer are not accepted.
last_valid_seq: Option<u32>,
/// Last send error kind per destination, used to de-duplicate send-failure logs.
send_errors: HashMap<SocketAddr, std::io::ErrorKind>,
/// IPv4 addresses whose search replies are discarded.
ignored_servers: std::collections::HashSet<Ipv4Addr>,
/// Channels that have been resolved, with PV name and server address; used to
/// detect a second server answering for the same channel.
resolved: HashMap<u32, (String, SocketAddr)>,
}
/// Size at which the `resolved` map evicts an entry before a new resolution is recorded.
const MULTIPLY_DEFINED_RESOLVED_CAP: usize = 1024;
impl SearchEngineState {
    /// Test-only constructor backed by a fresh, private attempts map.
    #[cfg(test)]
    fn new() -> Self {
        let attempts: SearchAttempts = std::sync::Arc::new(dashmap::DashMap::new());
        Self::with_attempts(attempts)
    }

    /// Builds an empty engine state that records send attempts in `attempts`.
    ///
    /// The ring starts with `N_SEARCH_BUCKETS` empty slots positioned at slot 0,
    /// and the ignored-server set is taken from `epics_rs_client_ignore()`.
    fn with_attempts(attempts: SearchAttempts) -> Self {
        let buckets: Vec<Vec<u32>> = std::iter::repeat_with(Vec::new)
            .take(N_SEARCH_BUCKETS)
            .collect();
        Self {
            pending: HashMap::new(),
            buckets,
            current_bucket: 0,
            attempts,
            fast_ticks_remaining: 0,
            penalty: HashMap::new(),
            breakers: CircuitBreakerRegistry::new(),
            dgram_seq: 0,
            last_valid_seq: None,
            send_errors: HashMap::new(),
            ignored_servers: super::epics_rs_client_ignore().into_iter().collect(),
            resolved: HashMap::new(),
        }
    }

    /// Forgets everything about channel `cid`: its pending entry and ring slot
    /// membership, its attempt counter, and its resolved-server record.
    fn remove_channel(&mut self, cid: u32) {
        if let Some(gone) = self.pending.remove(&cid) {
            self.buckets[gone.bucket].retain(|&queued| queued != cid);
        }
        self.attempts.remove(&cid);
        self.resolved.remove(&cid);
    }

    /// Hook invoked after a successful connect result; currently does nothing.
    fn mark_connected(&mut self, _cid: u32) {}

    /// Restarts the retry schedule of every pending channel and requests one
    /// full ring revolution of fast ticks.
    fn poke(&mut self) {
        self.pending.values_mut().for_each(|entry| {
            entry.attempt = 0;
            entry.last_attempt = None;
        });
        self.fast_ticks_remaining = N_SEARCH_BUCKETS as u32;
    }
}
pub(crate) async fn run_search_engine(
mut addr_list: Vec<super::AddrEntry>,
nameserver_addrs: Vec<SocketAddr>,
mut request_rx: mpsc::UnboundedReceiver<SearchRequest>,
response_tx: mpsc::UnboundedSender<SearchResponse>,
attempts: SearchAttempts,
) {
let socket = match AsyncUdpV4::bind(0, true) {
Ok(s) => s,
Err(_) => return,
};
let _ = socket.set_recv_buffer_size(256 * 1024);
let _ = socket.set_multicast_ttl_v4(epics_base_rs::runtime::net::ca_mcast_ttl());
if let Err(e) = socket.enable_so_rxq_ovfl() {
tracing::trace!(
target: "epics_ca_rs::client::search",
error = %e,
"SO_RXQ_OVFL enable on per-NIC SEARCH bundle failed (non-fatal)"
);
}
let (tcp_response_tx, mut tcp_response_rx) = mpsc::unbounded_channel::<ParsedDatagram>();
let ns_queue_cap = epics_base_rs::runtime::env::get("EPICS_CA_NAMESERVER_QUEUE_DEPTH")
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(256)
.max(8);
let mut nameserver_send_txs: Vec<mpsc::Sender<Vec<u8>>> = Vec::new();
for addr in nameserver_addrs {
let (tx, rx) = mpsc::channel::<Vec<u8>>(ns_queue_cap);
nameserver_send_txs.push(tx);
let resp_tx = tcp_response_tx.clone();
epics_base_rs::runtime::task::spawn(async move {
run_nameserver_connection(addr, rx, resp_tx).await;
});
}
let mut state = SearchEngineState::with_attempts(attempts);
let mut recv_buf = [0u8; 65536];
let mut prev_drops_per_iface: HashMap<Ipv4Addr, u32> = HashMap::new();
let mut tick = interval(normal_tick());
tick.tick().await; let mut tick_is_fast = false;
let dns_refresh_secs: u64 = epics_base_rs::runtime::env::get("EPICS_CA_DNS_REFRESH_SECS")
.and_then(|s| s.parse().ok())
.filter(|&n: &u64| n > 0)
.unwrap_or(60);
let mut dns_refresh = interval(Duration::from_secs(dns_refresh_secs));
dns_refresh.tick().await;
loop {
tokio::select! {
req = request_rx.recv() => {
let Some(req) = req else { return };
let mut immediate: Vec<u32> = Vec::new();
if let Some(cid) = handle_request_or_addr(&mut state, &mut addr_list, req) {
immediate.push(cid);
}
drain_pending_requests(&mut state, &mut addr_list, &mut request_rx, &mut immediate);
if !immediate.is_empty() {
fire_searches(&mut state, &immediate, &addr_list, &socket, &nameserver_send_txs).await;
}
}
result = socket.recv_with_meta_with_drops(&mut recv_buf) => {
let Ok((meta, drops)) = result else { continue };
let mut immediate: Vec<u32> = Vec::new();
drain_pending_requests(&mut state, &mut addr_list, &mut request_rx, &mut immediate);
if !immediate.is_empty() {
fire_searches(&mut state, &immediate, &addr_list, &socket, &nameserver_send_txs).await;
}
let prev = prev_drops_per_iface.insert(meta.iface_ip, drops).unwrap_or(0);
if drops != 0 && drops != prev {
tracing::debug!(
target: "epics_ca_rs::client::search",
iface_ip = %meta.iface_ip,
prev,
drops,
"CA client SEARCH per-NIC socket buffer overflow"
);
}
handle_udp_response(&mut state, &recv_buf[..meta.n], meta.src, &response_tx);
}
tcp_dgram = tcp_response_rx.recv() => {
let Some((bytes, src)) = tcp_dgram else { continue };
let mut immediate: Vec<u32> = Vec::new();
drain_pending_requests(&mut state, &mut addr_list, &mut request_rx, &mut immediate);
if !immediate.is_empty() {
fire_searches(&mut state, &immediate, &addr_list, &socket, &nameserver_send_txs).await;
}
handle_tcp_response(&mut state, &bytes, src, &response_tx);
}
_ = tick.tick() => {
process_bucket(&mut state, &addr_list, &socket, &nameserver_send_txs).await;
if state.fast_ticks_remaining > 0 {
state.fast_ticks_remaining -= 1;
}
}
_ = dns_refresh.tick() => {
for entry in addr_list.iter_mut() {
let prev_sock = entry.sock;
match entry.refresh_dns() {
Ok(new_sock) if new_sock != prev_sock => {
tracing::info!(
hostname = ?entry.hostname,
old = %prev_sock,
new = %new_sock,
"ca-rs: EPICS_CA_ADDR_LIST entry re-resolved"
);
}
Ok(_) => {}
Err(e) => {
tracing::debug!(
hostname = ?entry.hostname,
error = %e,
"ca-rs: DNS refresh failed; keeping cached IP"
);
}
}
}
}
}
if state.fast_ticks_remaining > 0 && !tick_is_fast {
tick = interval(FAST_TICK);
tick.tick().await; tick_is_fast = true;
} else if state.fast_ticks_remaining == 0 && tick_is_fast {
tick = interval(normal_tick());
tick.tick().await; tick_is_fast = false;
}
}
}
/// Maintains one TCP circuit to the nameserver at `addr`.
///
/// The task connects (5 s timeout), sends a VERSION / CLIENT_NAME / HOST_NAME
/// handshake, then:
///
/// * a spawned reader task accumulates bytes, cuts them at complete CA message
///   boundaries, and forwards each run of complete messages on `response_tx`;
/// * the writer loop forwards frames from `outgoing_rx` and sends an ECHO
///   after 60 s without outgoing traffic.
///
/// Failed connects and failed handshakes are retried with exponential backoff
/// (1 s doubling up to 30 s); the backoff is reset once a handshake has been
/// written successfully.
///
/// The task ends when `outgoing_rx` is closed while connected, or when the
/// receiving side of `response_tx` has gone away. Checking the latter at the
/// top of every reconnect cycle keeps an unreachable nameserver from being
/// retried forever after the search engine has shut down.
async fn run_nameserver_connection(
    addr: SocketAddr,
    mut outgoing_rx: mpsc::Receiver<Vec<u8>>,
    response_tx: mpsc::UnboundedSender<ParsedDatagram>,
) {
    const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
    let mut backoff = INITIAL_BACKOFF;
    let max_backoff = Duration::from_secs(30);
    loop {
        // Nobody is left to consume responses: the engine is gone, stop retrying.
        if response_tx.is_closed() {
            return;
        }

        let stream =
            match tokio::time::timeout(Duration::from_secs(5), TcpStream::connect(addr)).await {
                Ok(Ok(s)) => s,
                _ => {
                    tokio::time::sleep(backoff).await;
                    backoff = (backoff * 2).min(max_backoff);
                    continue;
                }
            };
        let _ = stream.set_nodelay(true);
        let (mut reader, mut writer) = stream.into_split();

        // Handshake: protocol version, then client user name, then host name.
        let mut handshake = Vec::new();
        let mut version = CaHeader::new(CA_PROTO_VERSION);
        version.count = CA_MINOR_VERSION;
        handshake.extend_from_slice(&version.to_bytes());
        let user = epics_base_rs::runtime::env::get("USER")
            .or_else(|| epics_base_rs::runtime::env::get("USERNAME"))
            .unwrap_or_else(|| "unknown".to_string());
        let user_payload = pad_string(&user);
        let mut client = CaHeader::new(CA_PROTO_CLIENT_NAME);
        client.set_payload_size(user_payload.len(), 0);
        handshake.extend_from_slice(&client.to_bytes_extended());
        handshake.extend_from_slice(&user_payload);
        let host_payload = pad_string(&epics_base_rs::runtime::env::hostname());
        let mut host = CaHeader::new(CA_PROTO_HOST_NAME);
        host.set_payload_size(host_payload.len(), 0);
        handshake.extend_from_slice(&host.to_bytes_extended());
        handshake.extend_from_slice(&host_payload);
        if writer.write_all(&handshake).await.is_err() {
            // A peer that accepts and then drops us is treated like a failed
            // connect: wait, and wait longer next time.
            tokio::time::sleep(backoff).await;
            backoff = (backoff * 2).min(max_backoff);
            continue;
        }
        // Only a circuit that completed its handshake counts as healthy.
        backoff = INITIAL_BACKOFF;

        let resp_tx = response_tx.clone();
        let read_task = epics_base_rs::runtime::task::spawn(async move {
            let mut buf = vec![0u8; 8192];
            let mut accumulated: Vec<u8> = Vec::new();
            loop {
                let n = match reader.read(&mut buf).await {
                    Ok(0) | Err(_) => break,
                    Ok(n) => n,
                };
                accumulated.extend_from_slice(&buf[..n]);

                // Advance `consumed` over every complete message in the buffer.
                let mut consumed = 0usize;
                let mut bad_frame = false;
                loop {
                    if accumulated.len() - consumed < CaHeader::SIZE {
                        break;
                    }
                    // A base postsize of 0xFFFF marks the extended (24-byte) header form.
                    let base_post =
                        u16::from_be_bytes([accumulated[consumed + 2], accumulated[consumed + 3]]);
                    let header_needed = if base_post == 0xFFFF { 24 } else { 16 };
                    if accumulated.len() - consumed < header_needed {
                        break;
                    }
                    let (hdr, hdr_size) =
                        match CaHeader::from_bytes_extended(&accumulated[consumed..]) {
                            Ok(v) => v,
                            Err(_) => {
                                bad_frame = true;
                                break;
                            }
                        };
                    let actual_post = hdr.actual_postsize();
                    // Payload sizes must be a multiple of 8; anything else is a framing error.
                    if actual_post & 0x7 != 0 {
                        bad_frame = true;
                        break;
                    }
                    let msg_size = hdr_size + actual_post;
                    if accumulated.len() - consumed < msg_size {
                        break;
                    }
                    consumed += msg_size;
                }
                // Forward what was complete before acting on a framing error.
                if consumed > 0 {
                    let frame_bytes = accumulated[..consumed].to_vec();
                    let _ = resp_tx.send((frame_bytes, addr));
                    accumulated.drain(..consumed);
                }
                if bad_frame {
                    tracing::warn!(
                        addr = ?addr,
                        "TCP nameserver framing error; closing circuit \
                         (C tcpiiu.cpp:1197-1202 parity)"
                    );
                    break;
                }
            }
        });

        let mut writer_failed = false;
        let mut shutdown = false;
        'pump: loop {
            tokio::select! {
                msg = outgoing_rx.recv() => {
                    let Some(bytes) = msg else {
                        shutdown = true;
                        break 'pump;
                    };
                    if writer.write_all(&bytes).await.is_err() {
                        writer_failed = true;
                        break 'pump;
                    }
                }
                _ = epics_base_rs::runtime::task::sleep(Duration::from_secs(60)) => {
                    let echo = CaHeader::new(CA_PROTO_ECHO);
                    if writer.write_all(&echo.to_bytes()).await.is_err() {
                        writer_failed = true;
                        break 'pump;
                    }
                }
            }
            // The reader ending (EOF, error, or framing error) ends this circuit.
            if read_task.is_finished() {
                break 'pump;
            }
        }
        read_task.abort();
        let _ = read_task.await;
        if shutdown {
            return;
        }
        if writer_failed {
            tokio::time::sleep(backoff).await;
            backoff = (backoff * 2).min(max_backoff);
        }
    }
}
fn handle_request_or_addr(
state: &mut SearchEngineState,
addr_list: &mut Vec<super::AddrEntry>,
req: SearchRequest,
) -> Option<u32> {
match req {
SearchRequest::AddAddress(addr) => {
if !addr_list.iter().any(|e| e.sock == addr) {
let port = match addr {
SocketAddr::V4(a) => a.port(),
SocketAddr::V6(a) => a.port(),
};
addr_list.push(super::AddrEntry::new(addr, None, port));
tracing::info!(?addr, "ca-rs: addr_list += (programmatic)");
}
None
}
SearchRequest::RemoveAddress(addr) => {
let before = addr_list.len();
addr_list.retain(|e| e.sock != addr);
if addr_list.len() != before {
tracing::info!(?addr, "ca-rs: addr_list -= (discovery removal)");
}
None
}
SearchRequest::SetAddressList(list) => {
tracing::info!(count = list.len(), "ca-rs: addr_list replaced");
*addr_list = list
.into_iter()
.map(|sock| {
let port = match sock {
SocketAddr::V4(a) => a.port(),
SocketAddr::V6(a) => a.port(),
};
super::AddrEntry::new(sock, None, port)
})
.collect();
None
}
other => handle_request(state, other),
}
}
/// Applies every request that is already queued on `request_rx` without waiting.
///
/// Channel ids that should be searched for right away are appended to
/// `immediate`. The loop stops at the first `try_recv` error.
fn drain_pending_requests(
    state: &mut SearchEngineState,
    addr_list: &mut Vec<super::AddrEntry>,
    request_rx: &mut mpsc::UnboundedReceiver<SearchRequest>,
    immediate: &mut Vec<u32>,
) {
    loop {
        let Ok(req) = request_rx.try_recv() else {
            break;
        };
        // `Option<u32>` yields zero or one item, so this pushes only when a
        // channel id was returned.
        immediate.extend(handle_request_or_addr(state, addr_list, req));
    }
}
/// Applies a scheduling, cancellation, or connect-result request to `state`.
///
/// Returns `Some(cid)` only for a channel scheduled with `SearchReason::Initial`,
/// telling the caller to send its search immediately. Address-list requests
/// are no-ops here and return `None`.
fn handle_request(state: &mut SearchEngineState, req: SearchRequest) -> Option<u32> {
    match req {
        SearchRequest::Schedule {
            cid,
            pv_name,
            reason,
        } => {
            let is_anomaly = reason == SearchReason::BeaconAnomaly;

            // A beacon anomaly for a channel that is already pending only
            // restarts its retry schedule; the channel keeps its ring slot.
            if is_anomaly {
                if let Some(existing) = state.pending.get_mut(&cid) {
                    existing.attempt = 0;
                    existing.last_attempt = None;
                    state.fast_ticks_remaining = N_SEARCH_BUCKETS as u32;
                    return None;
                }
            }

            // Otherwise (re)create the pending entry from scratch.
            let search_payload = build_search_payload(cid, &pv_name);
            state.remove_channel(cid);
            let bucket = placement_bucket(state.current_bucket, reason);
            state.buckets[bucket].push(cid);
            state.pending.insert(
                cid,
                PendingSearch {
                    cid,
                    pv_name,
                    search_payload,
                    bucket,
                    attempt: 0,
                    last_attempt: None,
                },
            );
            if is_anomaly {
                state.poke();
            }
            (reason == SearchReason::Initial).then_some(cid)
        }
        SearchRequest::Cancel { cid } => {
            state.remove_channel(cid);
            None
        }
        SearchRequest::ConnectResult {
            cid,
            success,
            server_addr,
        } => {
            if !success {
                // Penalize the server and feed the circuit breaker; report the
                // moment the breaker transitions to open.
                state.penalty.insert(
                    server_addr,
                    PenaltyEntry {
                        until: Instant::now() + PENALTY_DURATION,
                    },
                );
                let was_open = state.breakers.is_open(server_addr);
                state.breakers.record_failure(server_addr);
                if !was_open && state.breakers.is_open(server_addr) {
                    tracing::warn!(server = %server_addr, "circuit breaker tripped OPEN");
                    metrics::counter!("ca_client_circuit_breaker_open_total",
                        "server" => server_addr.to_string())
                    .increment(1);
                }
                return None;
            }

            // Connected: the channel no longer needs searching and the server
            // is rehabilitated.
            if let Some(done) = state.pending.remove(&cid) {
                state.buckets[done.bucket].retain(|&queued| queued != cid);
            }
            state.attempts.remove(&cid);
            state.mark_connected(cid);
            state.penalty.remove(&server_addr);
            state.breakers.record_success(server_addr);
            None
        }
        SearchRequest::AddAddress(_)
        | SearchRequest::RemoveAddress(_)
        | SearchRequest::SetAddressList(_) => None,
    }
}
/// Parses a buffer received on the UDP search socket.
///
/// Thin wrapper over `handle_search_response` with `is_tcp = false`.
fn handle_udp_response(
state: &mut SearchEngineState,
data: &[u8],
src: SocketAddr,
response_tx: &mpsc::UnboundedSender<SearchResponse>,
) {
handle_search_response(state, data, src, response_tx, false);
}
/// Parses a run of messages forwarded by a TCP nameserver connection task.
///
/// Thin wrapper over `handle_search_response` with `is_tcp = true`.
fn handle_tcp_response(
state: &mut SearchEngineState,
data: &[u8],
src: SocketAddr,
response_tx: &mpsc::UnboundedSender<SearchResponse>,
) {
handle_search_response(state, data, src, response_tx, true);
}
fn handle_search_response(
state: &mut SearchEngineState,
data: &[u8],
src: SocketAddr,
response_tx: &mpsc::UnboundedSender<SearchResponse>,
is_tcp: bool,
) {
if data.len() < CaHeader::SIZE {
return;
}
state.last_valid_seq = if is_tcp { Some(0) } else { None };
let recv_time = Instant::now();
let mut offset = 0;
while offset + CaHeader::SIZE <= data.len() {
let Ok(hdr) = CaHeader::from_bytes(&data[offset..]) else {
break;
};
if (hdr.postsize as usize) & 0x7 != 0 {
break;
}
match hdr.cmmd {
CA_PROTO_VERSION => {
if hdr.data_type == 1 {
state.last_valid_seq = Some(hdr.cid);
} else {
state.last_valid_seq = Some(0);
}
offset += CaHeader::SIZE + align8(hdr.postsize as usize);
continue;
}
CA_PROTO_SEARCH => {
let server_port = hdr.data_type;
let server_ip = if hdr.cid == 0 || hdr.cid == u32::MAX {
src.ip()
} else {
std::net::IpAddr::V4(Ipv4Addr::from(hdr.cid.to_be_bytes()))
};
metrics::counter!("ca_client_search_responses_total").increment(1);
let server_addr = SocketAddr::new(server_ip, server_port as u16);
let cid = hdr.available;
if let std::net::IpAddr::V4(v4) = server_ip {
if state.ignored_servers.contains(&v4) {
offset += CaHeader::SIZE + align8(hdr.postsize as usize);
continue;
}
}
if let std::net::IpAddr::V4(v4) = src.ip() {
if state.ignored_servers.contains(&v4) {
offset += CaHeader::SIZE + align8(hdr.postsize as usize);
continue;
}
}
if let Some((pv_name, prev_addr)) = state.resolved.get(&cid) {
if *prev_addr != server_addr {
let pv_name = pv_name.clone();
let prev_addr = *prev_addr;
tracing::warn!(
target: "epics_ca_rs::client::search",
pv = %pv_name,
cid,
connected_to = %prev_addr,
but_also_on = %server_addr,
"Channel multiply defined: PV is also hosted on a second server"
);
metrics::counter!("ca_client_multiply_defined_pv_total").increment(1);
let _ = response_tx.send(SearchResponse::MultiplyDefined {
pv_name,
prev_addr,
new_addr: server_addr,
});
}
}
let penalized = state
.penalty
.get(&server_addr)
.map(|p| p.until > recv_time)
.unwrap_or(false);
if penalized || state.breakers.is_blocking(server_addr) {
offset += CaHeader::SIZE + align8(hdr.postsize as usize);
continue;
}
if state.last_valid_seq.is_none() {
offset += CaHeader::SIZE + align8(hdr.postsize as usize);
continue;
}
if let Some(p) = state.pending.get(&cid) {
if !state.breakers.allow(server_addr) {
offset += CaHeader::SIZE + align8(hdr.postsize as usize);
continue;
}
let bucket = p.bucket;
let pv_name = p.pv_name.clone();
state.pending.remove(&cid);
state.buckets[bucket].retain(|x| *x != cid);
tracing::debug!(
pv = %pv_name, cid, server = %server_addr,
"PV search resolved"
);
if state.resolved.len() >= MULTIPLY_DEFINED_RESOLVED_CAP {
if let Some(&victim) = state.resolved.keys().next() {
state.resolved.remove(&victim);
}
}
state.resolved.insert(cid, (pv_name, server_addr));
let _ = response_tx.send(SearchResponse::Found { cid, server_addr });
}
}
CA_PROTO_NOT_FOUND => {
}
_ => {}
}
offset += CaHeader::SIZE + align8(hdr.postsize as usize);
}
}
/// Handles one ring tick: searches for every channel in the current slot and
/// moves each of them to its next slot.
///
/// Expired penalties are purged first. Ids in the slot that no longer have a
/// pending entry are dropped. Each surviving channel has its attempt count
/// incremented and is re-queued in the slot chosen by `cascade_smoothed_next`.
/// The ring position then advances by one, and the collected channels are sent
/// in one `fire_searches` call.
async fn process_bucket(
    state: &mut SearchEngineState,
    addr_list: &[super::AddrEntry],
    socket: &AsyncUdpV4,
    nameserver_txs: &[mpsc::Sender<Vec<u8>>],
) {
    let now = Instant::now();
    state.penalty.retain(|_, entry| entry.until > now);

    let slot = state.current_bucket;
    let due = std::mem::take(&mut state.buckets[slot]);
    let mut to_send: Vec<u32> = Vec::with_capacity(due.len());

    for sid in due {
        let Some(entry) = state.pending.get_mut(&sid) else {
            continue;
        };
        entry.last_attempt = Some(now);
        entry.attempt = entry.attempt.saturating_add(1);

        // Slot sizes are read live, so channels re-queued earlier in this
        // loop influence where later ones land.
        let ring = &state.buckets;
        let next = cascade_smoothed_next(slot, entry.attempt, |idx| ring[idx].len());
        entry.bucket = next;
        state.buckets[next].push(sid);
        to_send.push(sid);
    }

    state.current_bucket = (slot + 1) % N_SEARCH_BUCKETS;
    if !to_send.is_empty() {
        fire_searches(state, &to_send, addr_list, socket, nameserver_txs).await;
    }
}
async fn fire_searches(
state: &mut SearchEngineState,
cids: &[u32],
addr_list: &[super::AddrEntry],
socket: &AsyncUdpV4,
nameserver_txs: &[mpsc::Sender<Vec<u8>>],
) {
state.dgram_seq = state.dgram_seq.wrapping_add(1);
let version_hdr = {
let mut h = CaHeader::new(CA_PROTO_VERSION);
h.count = CA_MINOR_VERSION;
h.data_type = 1;
h.cid = state.dgram_seq;
h.to_bytes()
};
let mut current_frame = Vec::with_capacity(MAX_UDP_SEND);
current_frame.extend_from_slice(&version_hdr);
for sid in cids {
let Some(p) = state.pending.get(sid) else {
continue;
};
let payload = p.search_payload.clone();
state
.attempts
.entry(*sid)
.or_insert_with(|| AtomicU32::new(0))
.fetch_add(1, Ordering::Relaxed);
if current_frame.len() + payload.len() > MAX_UDP_SEND
&& current_frame.len() > CaHeader::SIZE
{
for entry in addr_list {
send_with_fanout(
socket,
¤t_frame,
entry.sock,
"bucket",
&mut state.send_errors,
)
.await;
}
for ns_tx in nameserver_txs {
ns_try_send(ns_tx, current_frame.clone());
}
current_frame.clear();
current_frame.extend_from_slice(&version_hdr);
}
if CaHeader::SIZE + payload.len() > MAX_UDP_SEND {
let mut solo = Vec::with_capacity(CaHeader::SIZE + payload.len());
solo.extend_from_slice(&version_hdr);
solo.extend_from_slice(&payload);
for entry in addr_list {
send_with_fanout(socket, &solo, entry.sock, "solo", &mut state.send_errors).await;
}
for ns_tx in nameserver_txs {
ns_try_send(ns_tx, solo.clone());
}
} else {
current_frame.extend_from_slice(&payload);
}
}
if current_frame.len() > CaHeader::SIZE {
for entry in addr_list {
send_with_fanout(
socket,
¤t_frame,
entry.sock,
"flush",
&mut state.send_errors,
)
.await;
}
for ns_tx in nameserver_txs {
ns_try_send(ns_tx, current_frame.clone());
}
}
}
/// Queues `frame` for a nameserver connection without ever waiting.
///
/// When the queue is full the frame is dropped, a drop counter is incremented,
/// and a warning is logged. When the receiving side is closed the frame is
/// dropped silently.
fn ns_try_send(ns_tx: &mpsc::Sender<Vec<u8>>, frame: Vec<u8>) {
    use tokio::sync::mpsc::error::TrySendError;

    let Err(failure) = ns_tx.try_send(frame) else {
        return;
    };
    if matches!(failure, TrySendError::Full(_)) {
        metrics::counter!("ca_client_nameserver_queue_drops_total").increment(1);
        tracing::warn!(
            "EPICS_CA_NAME_SERVERS queue full — dropping search frame \
             (peer is slow/unresponsive; raise EPICS_CA_NAMESERVER_QUEUE_DEPTH \
             if the peer is healthy)"
        );
    }
}
/// Builds the wire bytes of one SEARCH message for `pv_name`.
///
/// The result is a SEARCH header followed by the padded PV name. Both the
/// header's `cid` and `available` fields carry `cid`; the reply flag is
/// `CA_DONT_REPLY` and the count field holds the client minor version.
fn build_search_payload(cid: u32, pv_name: &str) -> Vec<u8> {
    let name_bytes = pad_string(pv_name);

    let mut hdr = CaHeader::new(CA_PROTO_SEARCH);
    // NOTE(review): this cast truncates if the padded name exceeds u16::MAX bytes.
    hdr.postsize = name_bytes.len() as u16;
    hdr.data_type = CA_DONT_REPLY;
    hdr.count = CA_MINOR_VERSION;
    hdr.cid = cid;
    hdr.available = cid;

    let hdr_bytes = hdr.to_bytes();
    [&hdr_bytes[..], &name_bytes[..]].concat()
}
#[cfg(test)]
mod tests {
use super::*;
/// Test helper: schedules `cid` / `pv_name` with `SearchReason::Initial`,
/// discarding the immediate-fire return value.
fn schedule_initial(state: &mut SearchEngineState, cid: u32, pv_name: &str) {
handle_request(
state,
SearchRequest::Schedule {
cid,
pv_name: pv_name.to_string(),
reason: SearchReason::Initial,
},
);
}
/// Checks how `EPICS_CA_MAX_SEARCH_PERIOD` maps to the search period and tick:
/// unset and non-numeric values give 300 s, values below 60 (including
/// negatives and zero) give 60 s, and in-range values pass through.
///
/// The test mutates the process environment; the previous value is captured
/// first and restored at the end. NOTE(review): the `serial` attribute
/// presumably keeps other env-touching tests from running concurrently —
/// confirm they carry it too.
#[test]
#[serial_test::serial]
fn ex_r2_max_search_period_matches_c_default_and_lower_bound() {
let restore = std::env::var("EPICS_CA_MAX_SEARCH_PERIOD").ok();
// Unset: default period, 300 s / 30 slots = 10 s tick.
unsafe { std::env::remove_var("EPICS_CA_MAX_SEARCH_PERIOD") };
assert_eq!(
max_search_period_secs(),
300.0,
"unset env must default to C's 300 s, not the old 30 s"
);
assert_eq!(normal_tick(), Duration::from_secs(10));
// Below the lower limit: clamped up to 60 s, 2 s tick.
unsafe { std::env::set_var("EPICS_CA_MAX_SEARCH_PERIOD", "45") };
assert_eq!(
max_search_period_secs(),
60.0,
"a configured 45 s must clamp UP to C's 60 s lower bound"
);
assert_eq!(normal_tick(), Duration::from_secs(2));
// In range: used as given.
unsafe { std::env::set_var("EPICS_CA_MAX_SEARCH_PERIOD", "120") };
assert_eq!(max_search_period_secs(), 120.0);
assert_eq!(normal_tick(), Duration::from_secs(4));
unsafe { std::env::set_var("EPICS_CA_MAX_SEARCH_PERIOD", "300") };
assert_eq!(max_search_period_secs(), 300.0);
// Unparseable: default.
unsafe { std::env::set_var("EPICS_CA_MAX_SEARCH_PERIOD", "not-a-number") };
assert_eq!(
max_search_period_secs(),
300.0,
"a non-numeric value must fall back to the 300 s default"
);
// Parseable but too small: clamped, not defaulted.
unsafe { std::env::set_var("EPICS_CA_MAX_SEARCH_PERIOD", "-5") };
assert_eq!(
max_search_period_secs(),
60.0,
"a negative value must clamp to the 60 s lower bound, not default"
);
unsafe { std::env::set_var("EPICS_CA_MAX_SEARCH_PERIOD", "0") };
assert_eq!(max_search_period_secs(), 60.0);
// Put the environment back the way it was found.
match restore {
Some(v) => unsafe { std::env::set_var("EPICS_CA_MAX_SEARCH_PERIOD", v) },
None => unsafe { std::env::remove_var("EPICS_CA_MAX_SEARCH_PERIOD") },
}
}
/// With a queue capacity of 2, the third frame passed to `ns_try_send` is
/// dropped rather than queued.
#[tokio::test]
async fn nameserver_queue_drops_when_full_no_leak() {
let (tx, mut rx) = mpsc::channel::<Vec<u8>>(2);
ns_try_send(&tx, vec![1, 2, 3]);
ns_try_send(&tx, vec![4, 5, 6]);
ns_try_send(&tx, vec![7, 8, 9]);
assert_eq!(rx.try_recv().unwrap(), vec![1, 2, 3]);
assert_eq!(rx.try_recv().unwrap(), vec![4, 5, 6]);
assert!(
rx.try_recv().is_err(),
"third frame must be dropped, not queued (lp #739789)"
);
}
/// `ns_try_send` must not panic when the receiving half has been dropped.
#[tokio::test]
async fn nameserver_queue_handles_closed_receiver() {
let (tx, rx) = mpsc::channel::<Vec<u8>>(2);
drop(rx);
ns_try_send(&tx, vec![1, 2, 3]);
}
/// The SEARCH message for "TEST:PV" is expected to be 24 bytes long.
#[test]
fn build_search_payload_size() {
let payload = build_search_payload(42, "TEST:PV");
assert_eq!(payload.len(), 24);
}
/// A one-character name is padded to 8 bytes, so the whole message stays a
/// multiple of 8 bytes.
#[test]
fn build_search_payload_alignment() {
let payload = build_search_payload(1, "A");
assert_eq!(payload.len(), CaHeader::SIZE + 8);
assert_eq!(payload.len() % 8, 0);
}
/// An `Initial` schedule lands one slot ahead of the current ring position and
/// leaves the current slot empty.
#[test]
fn schedule_places_into_next_bucket() {
let mut state = SearchEngineState::new();
state.current_bucket = 5;
schedule_initial(&mut state, 1, "PV:1");
let p = state.pending.get(&1).unwrap();
assert_eq!(p.bucket, 6);
assert_eq!(state.buckets[6], vec![1]);
assert_eq!(state.buckets[5], Vec::<u32>::new());
}
/// `Cancel` removes the channel from both the pending map and its ring slot.
#[test]
fn cancel_removes_from_bucket() {
let mut state = SearchEngineState::new();
schedule_initial(&mut state, 1, "PV:1");
let bucket = state.pending.get(&1).unwrap().bucket;
handle_request(&mut state, SearchRequest::Cancel { cid: 1 });
assert!(state.pending.is_empty());
assert!(state.buckets[bucket].is_empty());
}
/// Adding two addresses and removing one leaves only the other; removing an
/// address that is no longer listed changes nothing.
#[test]
fn add_then_remove_address_round_trip() {
let mut state = SearchEngineState::new();
let mut addr_list: Vec<super::super::AddrEntry> = Vec::new();
let a: SocketAddr = "10.0.0.7:5064".parse().unwrap();
let b: SocketAddr = "10.0.0.8:5064".parse().unwrap();
handle_request_or_addr(&mut state, &mut addr_list, SearchRequest::AddAddress(a));
handle_request_or_addr(&mut state, &mut addr_list, SearchRequest::AddAddress(b));
assert_eq!(addr_list.len(), 2);
handle_request_or_addr(&mut state, &mut addr_list, SearchRequest::RemoveAddress(a));
assert_eq!(addr_list.len(), 1);
assert!(addr_list.iter().all(|e| e.sock == b));
// Second removal of the same address is a no-op.
handle_request_or_addr(&mut state, &mut addr_list, SearchRequest::RemoveAddress(a));
assert_eq!(addr_list.len(), 1);
}
/// `poke` zeroes a pending channel's attempt count and requests a full ring
/// of fast ticks.
#[test]
fn poke_resets_attempts_and_engages_fast_mode() {
let mut state = SearchEngineState::new();
schedule_initial(&mut state, 1, "PV:1");
if let Some(p) = state.pending.get_mut(&1) {
p.attempt = 3;
}
state.poke();
let p = state.pending.get(&1).unwrap();
assert_eq!(p.attempt, 0, "poke must reset per-channel retry counter");
assert_eq!(state.fast_ticks_remaining, N_SEARCH_BUCKETS as u32);
}
/// A `BeaconAnomaly` schedule for a channel that is already pending resets its
/// attempt count and engages fast mode, but leaves it in its original slot
/// exactly once.
#[test]
fn beacon_anomaly_for_pending_channel_keeps_bucket() {
let mut state = SearchEngineState::new();
handle_request(
&mut state,
SearchRequest::Schedule {
cid: 7,
pv_name: "PV:7".into(),
reason: SearchReason::Reconnect,
},
);
let original_bucket = state.pending.get(&7).unwrap().bucket;
if let Some(p) = state.pending.get_mut(&7) {
p.attempt = 4;
}
handle_request(
&mut state,
SearchRequest::Schedule {
cid: 7,
pv_name: "PV:7".into(),
reason: SearchReason::BeaconAnomaly,
},
);
let p = state.pending.get(&7).unwrap();
assert_eq!(p.bucket, original_bucket, "poke must not relocate bucket");
assert_eq!(p.attempt, 0);
assert_eq!(state.fast_ticks_remaining, N_SEARCH_BUCKETS as u32);
// The id must not have been queued a second time.
let count = state.buckets[original_bucket]
.iter()
.filter(|x| **x == 7)
.count();
assert_eq!(count, 1);
}
/// A `BeaconAnomaly` schedule for a new channel pokes the whole engine: every
/// pending channel's attempt count is reset and fast mode is engaged.
#[test]
fn beacon_anomaly_schedule_pokes_engine() {
let mut state = SearchEngineState::new();
schedule_initial(&mut state, 1, "PV:1");
if let Some(p) = state.pending.get_mut(&1) {
p.attempt = 2;
}
handle_request(
&mut state,
SearchRequest::Schedule {
cid: 2,
pv_name: "PV:2".into(),
reason: SearchReason::BeaconAnomaly,
},
);
assert_eq!(state.pending.get(&1).unwrap().attempt, 0);
assert_eq!(state.pending.get(&2).unwrap().attempt, 0);
assert_eq!(state.fast_ticks_remaining, N_SEARCH_BUCKETS as u32);
}
/// A successful connect result removes the channel from the pending map and
/// lifts any penalty on the server.
#[test]
fn connect_success_clears_pending_and_penalty() {
let mut state = SearchEngineState::new();
let server: SocketAddr = "127.0.0.1:5064".parse().unwrap();
schedule_initial(&mut state, 1, "PV:1");
state.penalty.insert(
server,
PenaltyEntry {
until: Instant::now() + Duration::from_secs(60),
},
);
handle_request(
&mut state,
SearchRequest::ConnectResult {
cid: 1,
success: true,
server_addr: server,
},
);
assert!(state.pending.is_empty());
assert!(!state.penalty.contains_key(&server));
}
/// A failed connect result penalizes the server and leaves the channel pending.
#[test]
fn connect_failure_inserts_penalty() {
let mut state = SearchEngineState::new();
let server: SocketAddr = "127.0.0.1:5064".parse().unwrap();
schedule_initial(&mut state, 1, "PV:1");
handle_request(
&mut state,
SearchRequest::ConnectResult {
cid: 1,
success: false,
server_addr: server,
},
);
assert!(state.pending.contains_key(&1));
assert!(state.penalty.contains_key(&server));
}
/// Pins the ring size at 30 and checks a fresh state allocates that many slots.
#[test]
fn n_search_buckets_is_30() {
let state = SearchEngineState::new();
assert_eq!(state.buckets.len(), N_SEARCH_BUCKETS);
assert_eq!(N_SEARCH_BUCKETS, 30);
}
/// One full ring revolution at `FAST_TICK` must take between 5 and 7 seconds.
#[test]
fn fast_tick_revolution_covers_full_ring() {
let revolution = FAST_TICK * N_SEARCH_BUCKETS as u32;
assert!(revolution >= Duration::from_secs(5));
assert!(revolution <= Duration::from_secs(7));
}
/// Only an `Initial` schedule returns the channel id for an immediate search;
/// `Reconnect` and `BeaconAnomaly` schedules return `None`.
#[test]
fn reconnect_and_beacon_anomaly_skip_immediate_fire() {
let mut state = SearchEngineState::new();
let cid_initial = handle_request(
&mut state,
SearchRequest::Schedule {
cid: 100,
pv_name: "PV:Initial".into(),
reason: SearchReason::Initial,
},
);
assert_eq!(
cid_initial,
Some(100),
"Initial must return Some for immediate fire"
);
let cid_reconnect = handle_request(
&mut state,
SearchRequest::Schedule {
cid: 101,
pv_name: "PV:Reconnect".into(),
reason: SearchReason::Reconnect,
},
);
assert_eq!(cid_reconnect, None, "Reconnect must NOT immediately fire");
let cid_anomaly = handle_request(
&mut state,
SearchRequest::Schedule {
cid: 102,
pv_name: "PV:Anomaly".into(),
reason: SearchReason::BeaconAnomaly,
},
);
assert_eq!(
cid_anomaly, None,
"BeaconAnomaly NEW must NOT immediately fire"
);
}
/// For every ring position, a `Reconnect` placement returns that same position.
#[test]
fn placement_reconnect_uses_current_bucket() {
for current in 0..N_SEARCH_BUCKETS {
assert_eq!(
placement_bucket(current, SearchReason::Reconnect),
current,
"Reconnect must drop in current bucket (got {current})"
);
}
}
/// `Initial` and `BeaconAnomaly` placements return the following slot,
/// wrapping from the last slot back to 0.
#[test]
fn placement_initial_and_beacon_anomaly_one_bucket_ahead() {
for reason in [SearchReason::Initial, SearchReason::BeaconAnomaly] {
assert_eq!(placement_bucket(0, reason), 1);
assert_eq!(placement_bucket(13, reason), 14);
assert_eq!(
placement_bucket(N_SEARCH_BUCKETS - 1, reason),
0,
"wrap-around at ring boundary"
);
}
}
/// With all slots equally loaded, the next slot is `attempt` positions ahead,
/// and the stride is capped at one full ring (which lands back on the
/// starting slot).
#[test]
fn cascade_next_implements_pvxs_nsearch_escalation() {
let no_imbalance = |_| 0usize;
let current = 7;
assert_eq!(
cascade_smoothed_next(current, 1, no_imbalance),
(current + 1) % N_SEARCH_BUCKETS,
);
assert_eq!(
cascade_smoothed_next(current, 2, no_imbalance),
(current + 2) % N_SEARCH_BUCKETS,
);
assert_eq!(
cascade_smoothed_next(current, 10, no_imbalance),
(current + 10) % N_SEARCH_BUCKETS,
);
assert_eq!(
cascade_smoothed_next(current, N_SEARCH_BUCKETS as u32, no_imbalance),
current,
"attempt at cap wraps to current (full-ring steady state)",
);
assert_eq!(
cascade_smoothed_next(current, 1_000_000, no_imbalance),
current,
"attempt > cap stays clamped",
);
}
/// The target slot is skipped in favor of the following one only when it
/// holds more than 100 entries beyond that following slot.
#[test]
fn cascade_smoothing_defers_when_next_is_overloaded() {
let current = 5;
let attempt = 1;
// Target slot 6 is 200 heavier than slot 7: defer.
let overloaded = |idx: usize| if idx == 6 { 200 } else { 0 };
assert_eq!(
cascade_smoothed_next(current, attempt, overloaded),
7,
"delta > 100 must defer"
);
let below = |idx: usize| if idx == 6 { 90 } else { 0 };
assert_eq!(
cascade_smoothed_next(current, attempt, below),
6,
"delta < 100 stays in next"
);
// Both slots equally heavy: no deferral.
let balanced = |idx: usize| if idx == 6 || idx == 7 { 200 } else { 0 };
assert_eq!(cascade_smoothed_next(current, attempt, balanced), 6);
// The following slot being the heavier one never pulls a channel back.
let reverse = |idx: usize| if idx == 7 { 200 } else { 0 };
assert_eq!(
cascade_smoothed_next(current, attempt, reverse),
6,
"smoothing only defers forward, never backward"
);
}
/// The deferral threshold is strict: a surplus of exactly 100 stays put, 101 defers.
#[test]
fn cascade_smoothing_boundary_at_delta_100() {
let current = 5;
let attempt = 1;
let exactly_100 = |idx: usize| if idx == 6 { 100 } else { 0 };
assert_eq!(
cascade_smoothed_next(current, attempt, exactly_100),
6,
"delta == 100 must NOT trigger"
);
let just_over_100 = |idx: usize| if idx == 6 { 101 } else { 0 };
assert_eq!(
cascade_smoothed_next(current, attempt, just_over_100),
7,
"delta == 101 must trigger"
);
}
/// Places 5000 channels one after another from the same `current` bucket and
/// the same attempt number, feeding the live bucket counts back into each
/// placement. Checks that nothing is lost, that at least two buckets end up
/// populated, and that no bucket holds more than 60% of the channels.
#[test]
fn mass_5000_reconnect_spreads_at_least_two_buckets() {
    const N_CHANNELS: usize = 5000;
    let (current, attempt) = (0usize, 1u32);

    // Per-bucket entry counts, updated after every placement so the next
    // placement sees the load created by the previous ones.
    let mut buckets = [0usize; N_SEARCH_BUCKETS];
    for _ in 0..N_CHANNELS {
        let slot = cascade_smoothed_next(current, attempt, |idx: usize| buckets[idx]);
        buckets[slot] += 1;
    }

    let total = buckets.iter().copied().sum::<usize>();
    assert_eq!(
        total, N_CHANNELS,
        "every channel must be placed exactly once"
    );

    let nonempty = buckets.iter().filter(|load| **load != 0).count();
    assert!(
        nonempty >= 2,
        "smoothing must split the load across ≥2 buckets; got {} non-empty: {buckets:?}",
        nonempty
    );

    let cap = (N_CHANNELS * 60) / 100;
    let max_load = buckets.iter().copied().max().unwrap();
    assert!(
        max_load <= cap,
        "no single bucket may carry > {cap} entries (60% of {N_CHANNELS}); \
         got max {max_load} in {buckets:?}"
    );
}
/// Simulates two full ring rotations starting with 5000 channel ids parked
/// in bucket 0. On each tick the current bucket is emptied, every id in it
/// has its attempt counter bumped and is re-queued at the slot chosen by
/// `cascade_smoothed_next`. Afterwards every slot must have been visited,
/// no tick may have processed more than the initial burst, and all ids must
/// still be queued somewhere.
#[test]
fn mass_5000_multi_tick_distribution_covers_full_ring() {
    const N_CHANNELS: usize = 5000;
    const TICKS: usize = 2 * N_SEARCH_BUCKETS;

    let mut buckets: Vec<Vec<u32>> = vec![Vec::new(); N_SEARCH_BUCKETS];
    buckets[0].extend(0..N_CHANNELS as u32);
    let mut attempts = vec![0u32; N_CHANNELS];
    let mut buckets_visited = [false; N_SEARCH_BUCKETS];
    let mut max_per_tick = 0usize;

    for tick in 0..TICKS {
        // The ring advances one slot per tick, starting at slot 0.
        let current = tick % N_SEARCH_BUCKETS;
        buckets_visited[current] = true;

        // Empty the slot first so re-queued ids that map back onto it are
        // not processed again within the same tick.
        let drained = std::mem::take(&mut buckets[current]);
        max_per_tick = max_per_tick.max(drained.len());

        for sid in drained {
            let counter = &mut attempts[sid as usize];
            *counter = counter.saturating_add(1);
            let attempt = *counter;
            let next = cascade_smoothed_next(current, attempt, |idx: usize| buckets[idx].len());
            buckets[next].push(sid);
        }
    }

    let visited_count = buckets_visited.iter().filter(|seen| **seen).count();
    assert_eq!(
        visited_count, N_SEARCH_BUCKETS,
        "current_bucket must rotate through every slot in {TICKS} ticks; got {visited_count}"
    );
    assert!(
        max_per_tick <= N_CHANNELS,
        "per-tick processing load must not exceed initial burst {N_CHANNELS}; got {max_per_tick}"
    );
    let still_pending: usize = buckets.iter().map(Vec::len).sum();
    assert_eq!(
        still_pending, N_CHANNELS,
        "sids must not be lost across {TICKS} ticks; got {still_pending} pending of {N_CHANNELS}"
    );
}
/// End-to-end check that a `Schedule { reason: Reconnect }` request makes the
/// search engine emit a datagram containing the PV name within 4 s.
///
/// The engine is spawned with a single address entry pointing at a loopback
/// UDP socket ("sniffer") owned by the test, so whatever the engine sends
/// can be read back here. `EPICS_CA_MAX_SEARCH_PERIOD` is pinned to "60" for
/// the duration of the test and restored before any assertion can fail.
#[tokio::test(flavor = "current_thread")]
#[serial_test::serial]
async fn reconnect_search_broadcasts_within_one_tick() {
use std::net::Ipv4Addr;
// Remember the caller's value (if any) so it can be put back afterwards.
let restore = std::env::var("EPICS_CA_MAX_SEARCH_PERIOD").ok();
// SAFETY: NOTE(review) — relies on `#[serial_test::serial]` to keep other
// serial tests from touching the environment concurrently; tests that are
// not marked serial could still read the environment in parallel — confirm.
unsafe { std::env::set_var("EPICS_CA_MAX_SEARCH_PERIOD", "60") };
// Port 0: let the OS pick a free port; the chosen address is read back below.
let sniffer = AsyncUdpV4::bind_single(Ipv4Addr::LOCALHOST, 0, false).expect("bind sniffer");
let sniffer_addr = sniffer
.local_addrs()
.first()
.copied()
.expect("sniffer local_addr");
let (req_tx, req_rx) = mpsc::unbounded_channel();
// The response receiver is kept alive (bound to `_resp_rx`) but never read.
let (resp_tx, _resp_rx) = mpsc::unbounded_channel();
// The engine's only search destination is the sniffer socket.
let engine_handle = tokio::spawn(run_search_engine(
vec![crate::client::AddrEntry::new(
sniffer_addr,
None,
sniffer_addr.port(),
)],
Vec::new(),
req_rx,
resp_tx,
std::sync::Arc::new(dashmap::DashMap::new()),
));
let cid = 42u32;
let pv = "TEST:CA:RECONNECT:PV";
// The clock starts before the request is queued, so `elapsed` covers the
// whole path from scheduling to the datagram being received.
let started = std::time::Instant::now();
req_tx
.send(SearchRequest::Schedule {
cid,
pv_name: pv.into(),
reason: SearchReason::Reconnect,
})
.expect("schedule send");
let mut buf = vec![0u8; 4096];
// Read datagrams until one contains the PV name as a byte substring;
// give up after 5 s. Receive errors are carried out through the inner
// `Result` instead of panicking here, so cleanup below always runs.
let recv_result = tokio::time::timeout(Duration::from_secs(5), async {
loop {
let (n, _from) = sniffer.recv_from(&mut buf).await?;
if buf[..n].windows(pv.len()).any(|w| w == pv.as_bytes()) {
return Ok::<usize, std::io::Error>(n);
}
}
})
.await;
let elapsed = started.elapsed();
// Cleanup happens before any unwrap/assert so a failure cannot leak the
// engine task or the pinned environment variable.
engine_handle.abort();
match restore {
// SAFETY: same assumption as for the `set_var` above (serial execution).
Some(v) => unsafe { std::env::set_var("EPICS_CA_MAX_SEARCH_PERIOD", v) },
None => unsafe { std::env::remove_var("EPICS_CA_MAX_SEARCH_PERIOD") },
}
// Outer `expect`: the 5 s timeout; inner `expect`: the socket read itself.
let n = recv_result
.expect("Reconnect SEARCH must arrive within 5 s")
.expect("recv_from must not error");
assert!(
n > 0,
"received an empty datagram — Reconnect SEARCH path is broken"
);
// Tighter bound than the 5 s receive timeout: the datagram must show up
// within 4 s of scheduling.
assert!(
elapsed < Duration::from_millis(4000),
"Reconnect should broadcast within one tick (~2 s at the \
pinned 60 s period); took {elapsed:?} — bucket placement \
/ tick handler may have regressed"
);
}
/// End-to-end check of the spacing between the first three search datagrams
/// for a PV that never gets an answer.
///
/// The engine is spawned with a single address entry pointing at a loopback
/// UDP socket ("sniffer") owned by the test. After one
/// `Schedule { reason: Reconnect }` request the test timestamps the first
/// three datagrams containing the PV name and asserts:
///
/// * first datagram arrives in under 3 s,
/// * gap between #1 and #2 is within 1.5 s ..= 3 s,
/// * gap between #2 and #3 is within 3 s ..= 5.4 s.
///
/// `EPICS_CA_MAX_SEARCH_PERIOD` is pinned to "60" for the duration of the
/// test. Receive failures and timeouts are recorded rather than raised
/// inside the receive loop, so the engine task is aborted and the
/// environment variable restored before the test panics.
#[tokio::test(flavor = "current_thread")]
#[serial_test::serial]
async fn retry_escalation_pvxs_pattern() {
    use std::net::Ipv4Addr;

    // Remember the caller's value (if any) so it can be put back afterwards.
    let restore = std::env::var("EPICS_CA_MAX_SEARCH_PERIOD").ok();
    // SAFETY: NOTE(review) — relies on `#[serial_test::serial]` to keep other
    // serial tests from touching the environment concurrently; tests that are
    // not marked serial could still read the environment in parallel — confirm.
    unsafe { std::env::set_var("EPICS_CA_MAX_SEARCH_PERIOD", "60") };

    // Port 0: let the OS pick a free port; the chosen address is read back.
    let sniffer = AsyncUdpV4::bind_single(Ipv4Addr::LOCALHOST, 0, false).expect("bind sniffer");
    let sniffer_addr = sniffer
        .local_addrs()
        .first()
        .copied()
        .expect("sniffer addr");

    let (req_tx, req_rx) = mpsc::unbounded_channel();
    // The response receiver is kept alive but never read.
    let (resp_tx, _resp_rx) = mpsc::unbounded_channel();
    let engine_handle = tokio::spawn(run_search_engine(
        vec![crate::client::AddrEntry::new(
            sniffer_addr,
            None,
            sniffer_addr.port(),
        )],
        Vec::new(),
        req_rx,
        resp_tx,
        std::sync::Arc::new(dashmap::DashMap::new()),
    ));

    let cid = 77u32;
    let pv = "ESCALATION:CA";
    // All packet timestamps are measured from just before the request is sent.
    let started = std::time::Instant::now();
    req_tx
        .send(SearchRequest::Schedule {
            cid,
            pv_name: pv.into(),
            reason: SearchReason::Reconnect,
        })
        .expect("schedule");

    let mut buf = vec![0u8; 4096];
    let mut packet_times = Vec::with_capacity(3);
    // First failure seen while collecting packets; raised only after cleanup.
    let mut failure: Option<String> = None;
    for i in 0..3 {
        let outcome = tokio::time::timeout(Duration::from_secs(12), async {
            loop {
                let (n, _) = sniffer.recv_from(&mut buf).await?;
                // Only datagrams that contain the PV name count.
                if buf[..n].windows(pv.len()).any(|w| w == pv.as_bytes()) {
                    return Ok::<Duration, std::io::Error>(started.elapsed());
                }
            }
        })
        .await;
        match outcome {
            Ok(Ok(t)) => packet_times.push(t),
            Ok(Err(e)) => {
                failure = Some(format!("recv: {e:?}"));
                break;
            }
            Err(_) => {
                failure = Some(format!("SEARCH #{} did not arrive within 12 s", i + 1));
                break;
            }
        }
    }

    // Cleanup runs on every path, including receive errors and timeouts, so
    // a failing run cannot leave the search period pinned for later tests.
    engine_handle.abort();
    match restore {
        // SAFETY: same assumption as for the `set_var` above (serial execution).
        Some(v) => unsafe { std::env::set_var("EPICS_CA_MAX_SEARCH_PERIOD", v) },
        None => unsafe { std::env::remove_var("EPICS_CA_MAX_SEARCH_PERIOD") },
    }
    if let Some(msg) = failure {
        panic!("{msg}");
    }

    assert!(
        packet_times[0] < Duration::from_millis(3000),
        "first SEARCH should arrive ~2 s after Schedule (one tick \
         at the pinned 60 s period); got {:?}",
        packet_times[0]
    );
    let gap_12 = packet_times[1].saturating_sub(packet_times[0]);
    let gap_23 = packet_times[2].saturating_sub(packet_times[1]);
    assert!(
        (1500..=3000).contains(&(gap_12.as_millis() as u64)),
        "gap #1→#2 should be ~2 s (nSearch=1, one 2 s cycle); \
         got {gap_12:?}. Production retry escalation may have regressed."
    );
    assert!(
        (3000..=5400).contains(&(gap_23.as_millis() as u64)),
        "gap #2→#3 should be ~4 s (nSearch=2, two 2 s cycles); \
         got {gap_23:?}. Production retry escalation may have regressed."
    );
}
/// Test helper: serialises a `CA_PROTO_SEARCH` message shaped as a reply
/// that points channel `cid` at `server`.
///
/// Header fields written here: `data_type` carries the server port, `cid`
/// carries the server's IPv4 address as a big-endian-packed `u32`, and
/// `available` carries the channel id. The payload is `CA_MINOR_VERSION` in
/// big-endian byte order followed by six zero bytes (presumably 2 + 6 bytes
/// to match the size of 8 passed to `set_payload_size` — confirm against
/// the constant's type).
///
/// # Panics
///
/// Panics if `server` is an IPv6 address.
fn search_reply(cid: u32, server: SocketAddr) -> Vec<u8> {
    let ip = match server {
        SocketAddr::V4(v4) => *v4.ip(),
        SocketAddr::V6(_) => unreachable!("test uses IPv4 only"),
    };

    let mut hdr = CaHeader::new(CA_PROTO_SEARCH);
    hdr.available = cid;
    // `u32::from(Ipv4Addr)` packs the octets most-significant first.
    hdr.cid = u32::from(ip);
    hdr.data_type = server.port();
    hdr.set_payload_size(8, 1);

    let mut buf = hdr.to_bytes().to_vec();
    buf.extend_from_slice(&CA_MINOR_VERSION.to_be_bytes());
    // Six zero bytes of padding after the version field.
    buf.resize(buf.len() + 6, 0);
    buf
}
/// Regression test: a channel that was resolved on server A and is later
/// rescheduled with `SearchReason::Reconnect` must resolve cleanly to
/// server B, without the reply from B being reported as `MultiplyDefined`.
///
/// Sequence driven against a bare `SearchEngineState`:
/// 1. initial schedule, reply from A -> `Found(A)`;
/// 2. `ConnectResult { success: true }` -> the resolved entry is still there;
/// 3. `Schedule { Reconnect }` queued and drained -> the resolved entry is gone;
/// 4. reply from B -> `Found(B)`, and nothing else is emitted.
#[test]
fn mr_r3_reconnect_to_new_server_no_false_multiply_defined() {
    let server_a = SocketAddr::from(([10, 0, 0, 1], 5064));
    let server_b = SocketAddr::from(([10, 0, 0, 2], 5064));
    // The reply from B is received from B's own address.
    let src_b = server_b;
    let cid = 1u32;
    let pv = "MR:R3:PV";

    let mut state = SearchEngineState::new();
    let mut addr_list: Vec<super::super::AddrEntry> = Vec::new();
    let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<SearchResponse>();
    let (req_tx, mut req_rx) = mpsc::unbounded_channel::<SearchRequest>();

    // Step 1: first resolution, answered by server A.
    schedule_initial(&mut state, cid, pv);
    let reply_a = search_reply(cid, server_a);
    handle_tcp_response(&mut state, &reply_a, server_a, &resp_tx);
    let first = resp_rx.try_recv();
    match first {
        Err(e) => panic!("expected Found from server A, got recv error {e:?}"),
        Ok(SearchResponse::MultiplyDefined { .. }) => {
            panic!("first reply must resolve as Found, not MultiplyDefined")
        }
        Ok(SearchResponse::Found { server_addr, .. }) => {
            assert_eq!(server_addr, server_a);
        }
    }

    // Step 2: a successful connect must leave the resolved entry in place.
    let connected = SearchRequest::ConnectResult {
        cid,
        success: true,
        server_addr: server_a,
    };
    handle_request(&mut state, connected);
    assert!(
        state.resolved.contains_key(&cid),
        "resolved entry kept past ConnectResult{{success}}"
    );

    // Step 3: the reconnect request travels through the channel and is
    // applied by the drain, before any further reply is parsed.
    let reconnect = SearchRequest::Schedule {
        cid,
        pv_name: pv.into(),
        reason: SearchReason::Reconnect,
    };
    req_tx.send(reconnect).expect("reconnect schedule send");
    let mut immediate: Vec<u32> = Vec::new();
    drain_pending_requests(&mut state, &mut addr_list, &mut req_rx, &mut immediate);
    assert!(
        !state.resolved.contains_key(&cid),
        "Schedule{{Reconnect}} must invalidate the stale resolved \
         entry before the server-B reply is parsed"
    );

    // Step 4: server B answers; this must be a plain `Found`.
    let reply_b = search_reply(cid, server_b);
    handle_tcp_response(&mut state, &reply_b, src_b, &resp_tx);
    let second = resp_rx.try_recv();
    match second {
        Err(e) => panic!("expected Found from server B, got recv error {e:?}"),
        Ok(SearchResponse::MultiplyDefined {
            prev_addr,
            new_addr,
            ..
        }) => panic!(
            "false ECA_DBLCHNL after legitimate server migration: \
             prev={prev_addr} new={new_addr} — the reconnect Schedule \
             was not drained before the reply was parsed"
        ),
        Ok(SearchResponse::Found { server_addr, .. }) => {
            assert_eq!(
                server_addr, server_b,
                "reconnect must resolve to the new server B"
            );
        }
    }

    let leftover = resp_rx.try_recv();
    assert!(
        leftover.is_err(),
        "no further responses expected after the single Found"
    );
}
}