use std::collections::HashMap;
use std::net::{Ipv4Addr, SocketAddr};
use std::time::{Duration, Instant};
use epics_base_rs::net::AsyncUdpV4;
use epics_base_rs::runtime::sync::mpsc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::interval;
use crate::protocol::*;
use super::circuit_breaker::CircuitBreakerRegistry;
use super::types::{SearchReason, SearchRequest, SearchResponse};
/// Raw CA message bytes paired with the address they are attributed to.
///
/// This is the item type of the channel that carries frames read from
/// name-server TCP connections back into the search engine loop.
type ParsedDatagram = (Vec<u8>, SocketAddr);
/// Sends one search datagram to `addr` and keeps per-destination error state.
///
/// IPv4 broadcast and multicast destinations go through `fanout_to`; every
/// other destination uses a plain `send_to`. `send_errors` remembers the last
/// error kind seen for each destination so that a repeated failure of the same
/// kind is logged only once, and the first success afterwards is logged as a
/// recovery. `site` is a caller-chosen label attached to the log events.
async fn send_with_fanout(
    socket: &AsyncUdpV4,
    buf: &[u8],
    addr: SocketAddr,
    site: &'static str,
    send_errors: &mut HashMap<SocketAddr, std::io::ErrorKind>,
) {
    // Only IPv4 broadcast/multicast targets take the fan-out path.
    let is_group_destination = matches!(
        addr,
        SocketAddr::V4(v4) if v4.ip().is_broadcast() || v4.ip().is_multicast()
    );
    let outcome = if is_group_destination {
        socket.fanout_to(buf, addr).await.map(drop)
    } else {
        socket.send_to(buf, addr).await.map(drop)
    };

    let err = match outcome {
        Ok(()) => {
            // A stored error means this destination was failing before.
            if let Some(prev) = send_errors.remove(&addr) {
                tracing::info!(
                    target: "epics_ca_rs::search",
                    %addr, site, prev_error = ?prev,
                    "search send_to: recovered"
                );
            }
            return;
        }
        Err(err) => err,
    };

    // Warn only when the error kind differs from the one already recorded.
    let kind = err.kind();
    if send_errors.insert(addr, kind) != Some(kind) {
        tracing::warn!(
            target: "epics_ca_rs::search",
            %addr,
            site,
            error = %err,
            "search send_to failed"
        );
    }
}
/// Number of buckets in the search ring; each engine tick processes one bucket.
const N_SEARCH_BUCKETS: usize = 30;
/// Hold-off count assigned to a search once it has been attempted more than
/// once; it is decremented by one each time the search's bucket is processed.
const RETRY_HOLDOFF_CYCLES: u32 = 10;
/// Tick period of the engine loop in normal mode.
const NORMAL_TICK: Duration = Duration::from_secs(1);
/// Tick period of the engine loop while `fast_ticks_remaining` is non-zero.
const FAST_TICK: Duration = Duration::from_millis(200);
/// Byte limit for a batched search frame; a single search that would exceed
/// it on its own is sent in a frame by itself.
const MAX_UDP_SEND: usize = 1024;
/// How long a server stays penalized after a failed `ConnectResult`.
const PENALTY_DURATION: Duration = Duration::from_secs(30);
/// Bookkeeping for one channel whose PV has not been located yet.
struct PendingSearch {
// Channel id; also the key of this entry in `SearchEngineState::pending`.
#[allow(dead_code)]
cid: u32,
// PV name being searched for; included in the "PV search resolved" log event.
#[allow(dead_code)]
pv_name: String,
// Pre-built SEARCH message (header + padded name) copied into outgoing frames.
search_payload: Vec<u8>,
// Index of the ring bucket that currently lists this channel.
bucket: usize,
// Number of times the search has been fired by `process_bucket`; reset by a poke.
attempt: u32,
// Remaining bucket visits to skip before the search is fired again.
holdoff_cycles: u32,
// Time of the most recent firing by `process_bucket`, if any.
#[allow(dead_code)]
last_attempt: Option<Instant>,
}
/// Penalty record for a server whose connect attempt failed.
struct PenaltyEntry {
// Search replies naming the server are ignored while this instant is in the future.
until: Instant,
}
/// Mutable state of the search engine loop.
struct SearchEngineState {
// Unresolved searches keyed by channel id.
pending: HashMap<u32, PendingSearch>,
// Ring of `N_SEARCH_BUCKETS` lists of channel ids.
buckets: Vec<Vec<u32>>,
// Index of the bucket the next tick will process.
current_bucket: usize,
// Ticks left in fast mode; set to `N_SEARCH_BUCKETS` by a poke.
fast_ticks_remaining: u32,
// Servers whose search replies are currently ignored.
penalty: HashMap<SocketAddr, PenaltyEntry>,
// Per-server circuit breakers consulted for every search reply.
breakers: CircuitBreakerRegistry,
// Sequence number written into the VERSION header of outgoing search frames.
dgram_seq: u32,
// Value recorded from the last received VERSION header; search replies are
// ignored while this is still `None`.
last_valid_seq: Option<u32>,
// Last send error kind per destination, used to avoid repeating log lines.
send_errors: HashMap<SocketAddr, std::io::ErrorKind>,
}
impl SearchEngineState {
    /// Creates an empty state whose ring holds `N_SEARCH_BUCKETS` empty buckets.
    fn new() -> Self {
        let buckets: Vec<Vec<u32>> = std::iter::repeat_with(Vec::new)
            .take(N_SEARCH_BUCKETS)
            .collect();
        Self {
            pending: HashMap::new(),
            buckets,
            current_bucket: 0,
            fast_ticks_remaining: 0,
            penalty: HashMap::new(),
            breakers: CircuitBreakerRegistry::new(),
            dgram_seq: 0,
            last_valid_seq: None,
            send_errors: HashMap::new(),
        }
    }

    /// Forgets the pending search for `cid`, if any, and drops it from its bucket.
    fn remove_channel(&mut self, cid: u32) {
        let Some(removed) = self.pending.remove(&cid) else {
            return;
        };
        self.buckets[removed.bucket].retain(|&id| id != cid);
    }

    /// Resets the retry state of every pending search and arms fast mode for
    /// `N_SEARCH_BUCKETS` ticks.
    fn poke(&mut self) {
        self.pending.values_mut().for_each(|search| {
            search.attempt = 0;
            search.holdoff_cycles = 0;
            search.last_attempt = None;
        });
        self.fast_ticks_remaining = N_SEARCH_BUCKETS as u32;
    }
}
pub(crate) async fn run_search_engine(
mut addr_list: Vec<SocketAddr>,
nameserver_addrs: Vec<SocketAddr>,
mut request_rx: mpsc::UnboundedReceiver<SearchRequest>,
response_tx: mpsc::UnboundedSender<SearchResponse>,
) {
let socket = match AsyncUdpV4::bind(0, true) {
Ok(s) => s,
Err(_) => return,
};
let _ = socket.set_recv_buffer_size(256 * 1024);
let (tcp_response_tx, mut tcp_response_rx) = mpsc::unbounded_channel::<ParsedDatagram>();
let mut nameserver_send_txs: Vec<mpsc::UnboundedSender<Vec<u8>>> = Vec::new();
for addr in nameserver_addrs {
let (tx, rx) = mpsc::unbounded_channel::<Vec<u8>>();
nameserver_send_txs.push(tx);
let resp_tx = tcp_response_tx.clone();
epics_base_rs::runtime::task::spawn(async move {
run_nameserver_connection(addr, rx, resp_tx).await;
});
}
let mut state = SearchEngineState::new();
let mut recv_buf = [0u8; 65536];
let mut tick = interval(NORMAL_TICK);
tick.tick().await; let mut tick_is_fast = false;
loop {
tokio::select! {
req = request_rx.recv() => {
let Some(req) = req else { return };
let mut immediate: Vec<u32> = Vec::new();
if let Some(cid) = handle_request_or_addr(&mut state, &mut addr_list, req) {
immediate.push(cid);
}
while let Ok(req) = request_rx.try_recv() {
if let Some(cid) = handle_request_or_addr(&mut state, &mut addr_list, req) {
immediate.push(cid);
}
}
if !immediate.is_empty() {
fire_searches(&mut state, &immediate, &addr_list, &socket, &nameserver_send_txs).await;
}
}
result = socket.recv_from(&mut recv_buf) => {
let Ok((len, src)) = result else { continue };
handle_udp_response(&mut state, &recv_buf[..len], src, &response_tx);
}
tcp_dgram = tcp_response_rx.recv() => {
let Some((bytes, src)) = tcp_dgram else { continue };
handle_udp_response(&mut state, &bytes, src, &response_tx);
}
_ = tick.tick() => {
process_bucket(&mut state, &addr_list, &socket, &nameserver_send_txs).await;
if state.fast_ticks_remaining > 0 {
state.fast_ticks_remaining -= 1;
}
}
}
if state.fast_ticks_remaining > 0 && !tick_is_fast {
tick = interval(FAST_TICK);
tick.tick().await; tick_is_fast = true;
} else if state.fast_ticks_remaining == 0 && tick_is_fast {
tick = interval(NORMAL_TICK);
tick.tick().await; tick_is_fast = false;
}
}
}
/// Builds the bytes written right after connecting to a name server: a
/// VERSION header followed by HOST_NAME and CLIENT_NAME messages.
///
/// The client name is taken from `USER`, then `USERNAME`, falling back to
/// `"unknown"`.
fn build_nameserver_handshake() -> Vec<u8> {
    let mut handshake = Vec::new();

    let mut version = CaHeader::new(CA_PROTO_VERSION);
    version.count = CA_MINOR_VERSION;
    handshake.extend_from_slice(&version.to_bytes());

    let host_payload = pad_string(&epics_base_rs::runtime::env::hostname());
    let mut host = CaHeader::new(CA_PROTO_HOST_NAME);
    host.postsize = host_payload.len() as u16;
    handshake.extend_from_slice(&host.to_bytes());
    handshake.extend_from_slice(&host_payload);

    let user = epics_base_rs::runtime::env::get("USER")
        .or_else(|| epics_base_rs::runtime::env::get("USERNAME"))
        .unwrap_or_else(|| "unknown".to_string());
    let user_payload = pad_string(&user);
    let mut client = CaHeader::new(CA_PROTO_CLIENT_NAME);
    client.postsize = user_payload.len() as u16;
    handshake.extend_from_slice(&client.to_bytes());
    handshake.extend_from_slice(&user_payload);

    handshake
}

/// Maintains a TCP connection to one name server at `addr`.
///
/// Frames received on `outgoing_rx` are written to the server; complete CA
/// messages read from the server are forwarded, tagged with `addr`, on
/// `response_tx`. After 60 seconds without an outgoing frame an ECHO header
/// is written. When the connection breaks the task reconnects.
///
/// The task ends when `outgoing_rx` is closed while connected, or when the
/// receiving side of `response_tx` has been dropped (checked before every
/// connect attempt, so the task also ends while the server is unreachable).
async fn run_nameserver_connection(
    addr: SocketAddr,
    mut outgoing_rx: mpsc::UnboundedReceiver<Vec<u8>>,
    response_tx: mpsc::UnboundedSender<ParsedDatagram>,
) {
    let initial_backoff = Duration::from_secs(1);
    let max_backoff = Duration::from_secs(30);
    let mut backoff = initial_backoff;
    loop {
        // The receiver of `response_tx` lives in the search engine loop. Once
        // it is gone nobody consumes replies, so stop instead of reconnecting
        // forever. Without this check a task stuck in the connect/sleep cycle
        // would never notice the shutdown.
        if response_tx.is_closed() {
            return;
        }

        let stream =
            match tokio::time::timeout(Duration::from_secs(5), TcpStream::connect(addr)).await {
                Ok(Ok(s)) => s,
                _ => {
                    // Connect error or timeout: wait, then retry with a
                    // doubled (capped) delay.
                    tokio::time::sleep(backoff).await;
                    backoff = (backoff * 2).min(max_backoff);
                    continue;
                }
            };
        let _ = stream.set_nodelay(true);
        // NOTE: the delay is reset on every successful connect, so the
        // doubling below only accumulates across consecutive connect failures.
        backoff = initial_backoff;
        let (mut reader, mut writer) = stream.into_split();

        if writer
            .write_all(&build_nameserver_handshake())
            .await
            .is_err()
        {
            tokio::time::sleep(backoff).await;
            continue;
        }

        // Reader task: accumulate bytes, forward every run of complete
        // messages, keep any trailing partial message for the next read.
        let resp_tx = response_tx.clone();
        let read_task = epics_base_rs::runtime::task::spawn(async move {
            let mut buf = vec![0u8; 8192];
            let mut accumulated: Vec<u8> = Vec::new();
            loop {
                let n = match reader.read(&mut buf).await {
                    Ok(0) | Err(_) => break,
                    Ok(n) => n,
                };
                accumulated.extend_from_slice(&buf[..n]);
                let mut consumed = 0usize;
                loop {
                    if accumulated.len() - consumed < CaHeader::SIZE {
                        break;
                    }
                    let (hdr, hdr_size) =
                        match CaHeader::from_bytes_extended(&accumulated[consumed..]) {
                            Ok(v) => v,
                            Err(_) => break,
                        };
                    let msg_size = hdr_size + align8(hdr.actual_postsize());
                    if accumulated.len() - consumed < msg_size {
                        break;
                    }
                    consumed += msg_size;
                }
                if consumed > 0 {
                    let frame_bytes = accumulated[..consumed].to_vec();
                    let _ = resp_tx.send((frame_bytes, addr));
                    accumulated.drain(..consumed);
                }
            }
        });

        let mut writer_failed = false;
        let mut shutdown = false;
        'pump: loop {
            tokio::select! {
                msg = outgoing_rx.recv() => {
                    let Some(bytes) = msg else {
                        // All senders dropped: the engine has stopped.
                        shutdown = true;
                        break 'pump;
                    };
                    if writer.write_all(&bytes).await.is_err() {
                        writer_failed = true;
                        break 'pump;
                    }
                }
                // The sleep is re-created on every pass, so it measures time
                // since the last outgoing frame (or echo).
                _ = epics_base_rs::runtime::task::sleep(Duration::from_secs(60)) => {
                    let echo = CaHeader::new(CA_PROTO_ECHO);
                    if writer.write_all(&echo.to_bytes()).await.is_err() {
                        writer_failed = true;
                        break 'pump;
                    }
                }
            }
            // The read side ended (EOF or error): reconnect.
            if read_task.is_finished() {
                break 'pump;
            }
        }

        read_task.abort();
        let _ = read_task.await;
        if shutdown {
            return;
        }
        if writer_failed {
            tokio::time::sleep(backoff).await;
            backoff = (backoff * 2).min(max_backoff);
        }
    }
}
/// Applies address-list requests to `addr_list` and forwards every other
/// request to `handle_request`.
///
/// `AddAddress` appends the address unless it is already present;
/// `SetAddressList` replaces the whole list. Both return `None`. For all other
/// requests the return value is whatever `handle_request` returns.
fn handle_request_or_addr(
    state: &mut SearchEngineState,
    addr_list: &mut Vec<SocketAddr>,
    req: SearchRequest,
) -> Option<u32> {
    let forwarded = match req {
        SearchRequest::AddAddress(addr) => {
            let already_known = addr_list.iter().any(|known| *known == addr);
            if !already_known {
                addr_list.push(addr);
                tracing::info!(?addr, "ca-rs: addr_list += (programmatic)");
            }
            return None;
        }
        SearchRequest::SetAddressList(list) => {
            tracing::info!(count = list.len(), "ca-rs: addr_list replaced");
            *addr_list = list;
            return None;
        }
        other => other,
    };
    handle_request(state, forwarded)
}
/// Applies one request to the engine state.
///
/// Returns `Some(cid)` only for a `Schedule` request with reason `Initial`,
/// which tells the caller to transmit that search right away; every other
/// request returns `None`.
///
/// * `Schedule` with `BeaconAnomaly` for a channel that is already pending
///   resets that channel's retry state in place and arms fast mode.
/// * Any other `Schedule` (re)creates the pending entry and queues it in a
///   bucket after the current one; `Reconnect` adds a `cid`-derived offset.
///   A `BeaconAnomaly` schedule additionally pokes the whole engine.
/// * `Cancel` drops the pending entry.
/// * `ConnectResult` clears or records the server penalty and feeds the
///   circuit breaker.
/// * Address-list requests are ignored here.
fn handle_request(state: &mut SearchEngineState, req: SearchRequest) -> Option<u32> {
    match req {
        SearchRequest::Schedule {
            cid,
            pv_name,
            reason,
        } => {
            let is_anomaly = reason == SearchReason::BeaconAnomaly;
            if is_anomaly {
                // Already searching: just restart its retry schedule where it is.
                if let Some(existing) = state.pending.get_mut(&cid) {
                    existing.attempt = 0;
                    existing.holdoff_cycles = 0;
                    existing.last_attempt = None;
                    state.fast_ticks_remaining = N_SEARCH_BUCKETS as u32;
                    return None;
                }
            }

            state.remove_channel(cid);
            let following = state.current_bucket + 1;
            let bucket = match reason {
                SearchReason::Reconnect => {
                    (following + (cid as usize) % N_SEARCH_BUCKETS) % N_SEARCH_BUCKETS
                }
                SearchReason::Initial | SearchReason::BeaconAnomaly => {
                    following % N_SEARCH_BUCKETS
                }
            };

            let search_payload = build_search_payload(cid, &pv_name);
            state.buckets[bucket].push(cid);
            state.pending.insert(
                cid,
                PendingSearch {
                    cid,
                    pv_name,
                    search_payload,
                    bucket,
                    attempt: 0,
                    holdoff_cycles: 0,
                    last_attempt: None,
                },
            );
            if is_anomaly {
                state.poke();
            }

            // Only a first-time search is handed back for immediate sending.
            matches!(reason, SearchReason::Initial).then_some(cid)
        }
        SearchRequest::Cancel { cid } => {
            state.remove_channel(cid);
            None
        }
        SearchRequest::ConnectResult {
            cid,
            success,
            server_addr,
        } => {
            if success {
                state.remove_channel(cid);
                state.penalty.remove(&server_addr);
                state.breakers.record_success(server_addr);
                return None;
            }

            let until = Instant::now() + PENALTY_DURATION;
            state.penalty.insert(server_addr, PenaltyEntry { until });
            let was_open = state.breakers.is_open(server_addr);
            state.breakers.record_failure(server_addr);
            // Report only the closed -> open transition.
            if !was_open && state.breakers.is_open(server_addr) {
                tracing::warn!(server = %server_addr, "circuit breaker tripped OPEN");
                metrics::counter!("ca_client_circuit_breaker_open_total",
                    "server" => server_addr.to_string())
                .increment(1);
            }
            None
        }
        SearchRequest::AddAddress(_) | SearchRequest::SetAddressList(_) => None,
    }
}
/// Walks the CA messages in `data` and resolves pending searches.
///
/// `data` is treated as a sequence of messages, each a header followed by an
/// 8-byte-aligned payload. A VERSION message records a sequence value in
/// `state.last_valid_seq` (the header's `cid` when bit `0x8000` of
/// `data_type` is set, `0` otherwise). A SEARCH message is a reply: it is
/// accepted only when the named server is neither penalized nor blocked by
/// its circuit breaker and a VERSION header has been recorded before. An
/// accepted reply for a pending channel removes that channel and emits
/// `SearchResponse::Found` on `response_tx`. Other message types are skipped.
///
/// `src` is used as the server address when the reply does not carry one
/// (header `cid` equal to `0` or `u32::MAX`).
fn handle_udp_response(
    state: &mut SearchEngineState,
    data: &[u8],
    src: SocketAddr,
    response_tx: &mpsc::UnboundedSender<SearchResponse>,
) {
    if data.len() < CaHeader::SIZE {
        return;
    }
    let recv_time = Instant::now();
    let mut offset = 0;
    while offset + CaHeader::SIZE <= data.len() {
        let hdr = match CaHeader::from_bytes(&data[offset..]) {
            Ok(parsed) => parsed,
            Err(_) => break,
        };
        // Every message, whatever its type, advances by header + padded payload.
        offset += CaHeader::SIZE + align8(hdr.postsize as usize);

        match hdr.cmmd {
            CA_PROTO_VERSION => {
                let seq = if hdr.data_type & 0x8000 != 0 {
                    hdr.cid
                } else {
                    0
                };
                state.last_valid_seq = Some(seq);
            }
            CA_PROTO_SEARCH => {
                metrics::counter!("ca_client_search_responses_total").increment(1);

                let server_ip = match hdr.cid {
                    0 | u32::MAX => src.ip(),
                    raw => std::net::IpAddr::V4(Ipv4Addr::from(raw.to_be_bytes())),
                };
                let server_port = hdr.data_type;
                let server_addr = SocketAddr::new(server_ip, server_port as u16);
                let cid = hdr.available;

                let penalized = matches!(
                    state.penalty.get(&server_addr),
                    Some(entry) if entry.until > recv_time
                );
                // The breaker is consulted for every reply, penalized or not.
                let breaker_allows = state.breakers.allow(server_addr);
                let accepted =
                    !penalized && breaker_allows && state.last_valid_seq.is_some();

                if accepted {
                    if let Some(found) = state.pending.remove(&cid) {
                        state.buckets[found.bucket].retain(|id| *id != cid);
                        tracing::debug!(
                            pv = %found.pv_name, cid, server = %server_addr,
                            "PV search resolved"
                        );
                        let _ = response_tx.send(SearchResponse::Found { cid, server_addr });
                    }
                }
            }
            _ => {}
        }
    }
}
/// Processes the current bucket of the search ring and advances the ring.
///
/// Expired penalties are dropped first. Each channel listed in the current
/// bucket is then handled as follows:
///
/// * no pending entry any more — the id is discarded;
/// * hold-off remaining — the counter is decremented and the channel moves to
///   the following bucket;
/// * otherwise — the attempt is counted, a hold-off of
///   `RETRY_HOLDOFF_CYCLES` is set from the second attempt on, the channel
///   stays in this bucket and is queued for transmission.
///
/// All queued searches are sent with a single `fire_searches` call.
async fn process_bucket(
    state: &mut SearchEngineState,
    addr_list: &[SocketAddr],
    socket: &AsyncUdpV4,
    nameserver_txs: &[mpsc::UnboundedSender<Vec<u8>>],
) {
    let now = Instant::now();
    state.penalty.retain(|_, entry| entry.until > now);

    let current = state.current_bucket;
    let following = (current + 1) % N_SEARCH_BUCKETS;
    // Empty the bucket; survivors are pushed back into their new home below.
    let due_ids = std::mem::take(&mut state.buckets[current]);
    let mut to_send: Vec<u32> = Vec::new();

    for sid in due_ids {
        let Some(search) = state.pending.get_mut(&sid) else {
            continue;
        };
        if search.holdoff_cycles == 0 {
            search.last_attempt = Some(now);
            search.attempt = search.attempt.saturating_add(1);
            if search.attempt > 1 {
                search.holdoff_cycles = RETRY_HOLDOFF_CYCLES;
            }
            search.bucket = current;
            state.buckets[current].push(sid);
            to_send.push(sid);
        } else {
            search.holdoff_cycles -= 1;
            search.bucket = following;
            state.buckets[following].push(sid);
        }
    }

    state.current_bucket = following;
    if !to_send.is_empty() {
        fire_searches(state, &to_send, addr_list, socket, nameserver_txs).await;
    }
}
async fn fire_searches(
state: &mut SearchEngineState,
cids: &[u32],
addr_list: &[SocketAddr],
socket: &AsyncUdpV4,
nameserver_txs: &[mpsc::UnboundedSender<Vec<u8>>],
) {
state.dgram_seq = state.dgram_seq.wrapping_add(1);
let version_hdr = {
let mut h = CaHeader::new(CA_PROTO_VERSION);
h.count = CA_MINOR_VERSION;
h.data_type = 0x8000;
h.cid = state.dgram_seq;
h.to_bytes()
};
let mut current_frame = Vec::with_capacity(MAX_UDP_SEND);
current_frame.extend_from_slice(&version_hdr);
for sid in cids {
let Some(p) = state.pending.get(sid) else {
continue;
};
let payload = p.search_payload.clone();
if current_frame.len() + payload.len() > MAX_UDP_SEND
&& current_frame.len() > CaHeader::SIZE
{
for addr in addr_list {
send_with_fanout(
socket,
¤t_frame,
*addr,
"bucket",
&mut state.send_errors,
)
.await;
}
for ns_tx in nameserver_txs {
let _ = ns_tx.send(current_frame.clone());
}
current_frame.clear();
current_frame.extend_from_slice(&version_hdr);
}
if CaHeader::SIZE + payload.len() > MAX_UDP_SEND {
let mut solo = Vec::with_capacity(CaHeader::SIZE + payload.len());
solo.extend_from_slice(&version_hdr);
solo.extend_from_slice(&payload);
for addr in addr_list {
send_with_fanout(socket, &solo, *addr, "solo", &mut state.send_errors).await;
}
for ns_tx in nameserver_txs {
let _ = ns_tx.send(solo.clone());
}
} else {
current_frame.extend_from_slice(&payload);
}
}
if current_frame.len() > CaHeader::SIZE {
for addr in addr_list {
send_with_fanout(
socket,
¤t_frame,
*addr,
"flush",
&mut state.send_errors,
)
.await;
}
for ns_tx in nameserver_txs {
let _ = ns_tx.send(current_frame.clone());
}
}
}
/// Builds the SEARCH message for one channel: a header followed by the
/// padded PV name.
///
/// `cid` is written into both the `cid` and `available` header fields.
fn build_search_payload(cid: u32, pv_name: &str) -> Vec<u8> {
    let name_bytes = pad_string(pv_name);

    let mut header = CaHeader::new(CA_PROTO_SEARCH);
    header.cid = cid;
    header.available = cid;
    header.data_type = CA_DO_REPLY;
    header.count = CA_MINOR_VERSION;
    header.postsize = name_bytes.len() as u16;

    let mut message = Vec::with_capacity(CaHeader::SIZE + name_bytes.len());
    message.extend_from_slice(&header.to_bytes());
    message.extend_from_slice(&name_bytes);
    message
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Submits a `Schedule` request and returns what `handle_request` returned.
    fn schedule(
        state: &mut SearchEngineState,
        cid: u32,
        pv_name: &str,
        reason: SearchReason,
    ) -> Option<u32> {
        handle_request(
            state,
            SearchRequest::Schedule {
                cid,
                pv_name: pv_name.to_string(),
                reason,
            },
        )
    }

    /// Schedules a first-time search, ignoring the immediate-fire hint.
    fn schedule_initial(state: &mut SearchEngineState, cid: u32, pv_name: &str) {
        let _ = schedule(state, cid, pv_name, SearchReason::Initial);
    }

    /// Submits a `ConnectResult` request.
    fn connect_result(state: &mut SearchEngineState, cid: u32, success: bool, server: SocketAddr) {
        let _ = handle_request(
            state,
            SearchRequest::ConnectResult {
                cid,
                success,
                server_addr: server,
            },
        );
    }

    /// Overwrites the retry counters of a pending search, if it exists.
    fn set_retry_state(state: &mut SearchEngineState, cid: u32, attempt: u32, holdoff: u32) {
        if let Some(p) = state.pending.get_mut(&cid) {
            p.attempt = attempt;
            p.holdoff_cycles = holdoff;
        }
    }

    /// Returns the pending entry for `cid`, panicking when it is absent.
    fn pending(state: &SearchEngineState, cid: u32) -> &PendingSearch {
        state.pending.get(&cid).unwrap()
    }

    #[test]
    fn build_search_payload_size() {
        // 16-byte header + "TEST:PV\0" padded to 8 bytes.
        assert_eq!(build_search_payload(42, "TEST:PV").len(), 24);
    }

    #[test]
    fn build_search_payload_alignment() {
        let len = build_search_payload(1, "A").len();
        assert_eq!(len, CaHeader::SIZE + 8);
        assert_eq!(len % 8, 0);
    }

    #[test]
    fn schedule_places_into_next_bucket() {
        let mut state = SearchEngineState::new();
        state.current_bucket = 5;
        schedule_initial(&mut state, 1, "PV:1");
        assert_eq!(pending(&state, 1).bucket, 6);
        assert_eq!(state.buckets[6], vec![1]);
        assert!(state.buckets[5].is_empty());
    }

    #[test]
    fn cancel_removes_from_bucket() {
        let mut state = SearchEngineState::new();
        schedule_initial(&mut state, 1, "PV:1");
        let bucket = pending(&state, 1).bucket;
        handle_request(&mut state, SearchRequest::Cancel { cid: 1 });
        assert!(state.pending.is_empty());
        assert!(state.buckets[bucket].is_empty());
    }

    #[test]
    fn poke_resets_attempts_and_engages_fast_mode() {
        let mut state = SearchEngineState::new();
        schedule_initial(&mut state, 1, "PV:1");
        set_retry_state(&mut state, 1, 3, 7);
        state.poke();
        let p = pending(&state, 1);
        assert_eq!(p.attempt, 0);
        assert_eq!(p.holdoff_cycles, 0);
        assert_eq!(state.fast_ticks_remaining, N_SEARCH_BUCKETS as u32);
    }

    #[test]
    fn beacon_anomaly_for_pending_channel_keeps_bucket() {
        let mut state = SearchEngineState::new();
        schedule(&mut state, 7, "PV:7", SearchReason::Reconnect);
        let original_bucket = pending(&state, 7).bucket;
        set_retry_state(&mut state, 7, 4, 8);

        schedule(&mut state, 7, "PV:7", SearchReason::BeaconAnomaly);

        let p = pending(&state, 7);
        assert_eq!(p.bucket, original_bucket, "poke must not relocate bucket");
        assert_eq!(p.attempt, 0);
        assert_eq!(p.holdoff_cycles, 0);
        assert_eq!(state.fast_ticks_remaining, N_SEARCH_BUCKETS as u32);
        // The channel must be listed exactly once in its bucket.
        let count = state.buckets[original_bucket]
            .iter()
            .filter(|id| **id == 7)
            .count();
        assert_eq!(count, 1);
    }

    #[test]
    fn beacon_anomaly_schedule_pokes_engine() {
        let mut state = SearchEngineState::new();
        schedule_initial(&mut state, 1, "PV:1");
        set_retry_state(&mut state, 1, 2, 5);

        schedule(&mut state, 2, "PV:2", SearchReason::BeaconAnomaly);

        assert_eq!(pending(&state, 1).attempt, 0);
        assert_eq!(pending(&state, 2).attempt, 0);
        assert_eq!(state.fast_ticks_remaining, N_SEARCH_BUCKETS as u32);
    }

    #[test]
    fn connect_success_clears_pending_and_penalty() {
        let mut state = SearchEngineState::new();
        let server: SocketAddr = "127.0.0.1:5064".parse().unwrap();
        schedule_initial(&mut state, 1, "PV:1");
        let until = Instant::now() + Duration::from_secs(60);
        state.penalty.insert(server, PenaltyEntry { until });

        connect_result(&mut state, 1, true, server);

        assert!(state.pending.is_empty());
        assert!(!state.penalty.contains_key(&server));
    }

    #[test]
    fn connect_failure_inserts_penalty() {
        let mut state = SearchEngineState::new();
        let server: SocketAddr = "127.0.0.1:5064".parse().unwrap();
        schedule_initial(&mut state, 1, "PV:1");

        connect_result(&mut state, 1, false, server);

        assert!(state.pending.contains_key(&1));
        assert!(state.penalty.contains_key(&server));
    }

    #[test]
    fn n_search_buckets_is_30() {
        let state = SearchEngineState::new();
        assert_eq!(state.buckets.len(), N_SEARCH_BUCKETS);
        assert_eq!(N_SEARCH_BUCKETS, 30);
    }

    #[test]
    fn fast_tick_revolution_covers_full_ring() {
        let revolution = FAST_TICK * (N_SEARCH_BUCKETS as u32);
        assert!(revolution >= Duration::from_secs(5));
        assert!(revolution <= Duration::from_secs(7));
    }

    #[test]
    fn reconnect_and_beacon_anomaly_skip_immediate_fire() {
        let mut state = SearchEngineState::new();

        let cid_initial = schedule(&mut state, 100, "PV:Initial", SearchReason::Initial);
        assert_eq!(
            cid_initial,
            Some(100),
            "Initial must return Some for immediate fire"
        );

        let cid_reconnect = schedule(&mut state, 101, "PV:Reconnect", SearchReason::Reconnect);
        assert_eq!(cid_reconnect, None, "Reconnect must NOT immediately fire");

        let cid_anomaly = schedule(&mut state, 102, "PV:Anomaly", SearchReason::BeaconAnomaly);
        assert_eq!(
            cid_anomaly, None,
            "BeaconAnomaly NEW must NOT immediately fire"
        );
    }

    #[test]
    fn reconnect_bucket_spread() {
        let mut state = SearchEngineState::new();
        state.current_bucket = 0;
        let mut hit = [false; N_SEARCH_BUCKETS];
        let first_cid = 1_000u32;
        for cid in first_cid..(first_cid + N_SEARCH_BUCKETS as u32) {
            schedule(&mut state, cid, &format!("PV:{cid}"), SearchReason::Reconnect);
            hit[pending(&state, cid).bucket] = true;
        }
        assert!(
            !hit.contains(&false),
            "Reconnect cids must hit every bucket in the ring"
        );
    }
}