use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicU32;
use std::time::SystemTime;
use epics_base_rs::runtime::sync::mpsc;
use tokio::sync::Notify;
use tokio::sync::mpsc::error::TrySendError;
use epics_base_rs::error::{CaError, CaResult};
use epics_base_rs::server::snapshot::Snapshot;
use epics_base_rs::types::{DbFieldType, EpicsValue, decode_dbr};
use super::types::{CircuitKey, TransportCommand};
pub(crate) enum ValueRoute {
Slotted,
TryChannel(Box<Snapshot>),
}
pub(crate) struct CoalesceSlot {
inner: StdMutex<CoalesceInner>,
notify: Notify,
}
struct CoalesceInner {
error: Option<CaError>,
ready: Option<Snapshot>,
gated: Option<Snapshot>,
paused: bool,
}
impl CoalesceSlot {
pub fn new() -> Arc<Self> {
Arc::new(Self {
inner: StdMutex::new(CoalesceInner {
error: None,
ready: None,
gated: None,
paused: false,
}),
notify: Notify::new(),
})
}
fn coalesce_value_locked(inner: &mut CoalesceInner, snapshot: Snapshot) {
if inner.paused {
inner.gated = Some(snapshot);
} else {
inner.ready = Some(snapshot);
}
}
pub fn route_value(&self, snapshot: Snapshot) -> ValueRoute {
let mut g = self.inner.lock().expect("CoalesceSlot mutex poisoned");
if !g.paused && g.error.is_none() && g.ready.is_none() {
return ValueRoute::TryChannel(Box::new(snapshot));
}
let paused = g.paused;
Self::coalesce_value_locked(&mut g, snapshot);
drop(g);
if !paused {
self.notify.notify_one();
}
ValueRoute::Slotted
}
fn put_value(&self, snapshot: Snapshot) {
let mut g = self.inner.lock().expect("CoalesceSlot mutex poisoned");
let paused = g.paused;
Self::coalesce_value_locked(&mut g, snapshot);
drop(g);
if !paused {
self.notify.notify_one();
}
}
fn put_error(&self, err: CaError) {
let mut g = self.inner.lock().expect("CoalesceSlot mutex poisoned");
g.error = Some(err);
drop(g);
self.notify.notify_one();
}
pub fn take_deliverable(&self) -> Option<CaResult<Snapshot>> {
let mut g = self.inner.lock().expect("CoalesceSlot mutex poisoned");
if let Some(err) = g.error.take() {
return Some(Err(err));
}
g.ready.take().map(Ok)
}
pub fn notified(&self) -> tokio::sync::futures::Notified<'_> {
self.notify.notified()
}
pub fn set_paused(&self, paused: bool) -> bool {
let mut g = self.inner.lock().expect("CoalesceSlot mutex poisoned");
let prev = g.paused;
g.paused = paused;
let resuming = prev && !paused;
if resuming && g.gated.is_some() {
g.ready = g.gated.take();
}
drop(g);
if resuming {
self.notify.notify_one();
}
prev
}
pub fn is_paused(&self) -> bool {
self.inner
.lock()
.expect("CoalesceSlot mutex poisoned")
.paused
}
fn clear(&self) {
let mut g = self.inner.lock().expect("CoalesceSlot mutex poisoned");
g.error = None;
g.ready = None;
g.gated = None;
}
#[cfg(test)]
fn take_raw(&self) -> Option<CaResult<Snapshot>> {
let mut g = self.inner.lock().expect("CoalesceSlot mutex poisoned");
if let Some(err) = g.error.take() {
return Some(Err(err));
}
if let Some(v) = g.ready.take() {
return Some(Ok(v));
}
g.gated.take().map(Ok)
}
}
pub(crate) enum MonitorDeliveryOutcome {
Queued(CircuitKey),
Slotted(CircuitKey),
Dropped(CircuitKey),
Filtered,
NotFound,
}
pub(crate) struct SubscriptionRecord {
pub subid: u32,
pub cid: u32,
pub data_type: Option<u16>,
pub count: Option<u32>,
pub req_count: Option<u32>,
pub type_user_supplied: bool,
pub enum_readback: super::EnumReadback,
pub float_as_string: bool,
pub mask: u16,
pub server_addr: SocketAddr,
pub priority: u8,
pub callback_tx: mpsc::Sender<CaResult<Snapshot>>,
pub coalesce_slot: Arc<CoalesceSlot>,
pub needs_restore: bool,
pub deadband: f64,
pub last_value: Option<f64>,
pub pending_deliveries: usize,
pub nreplace: u64,
}
pub(crate) struct SubscriptionRegistry {
subscriptions: HashMap<u32, SubscriptionRecord>,
next_subid: AtomicU32,
}
pub(crate) fn resolve_subscription_count(req_count: Option<u32>, element_count: u32) -> u32 {
match req_count {
Some(n) if n > 0 => n.min(element_count),
_ => 0,
}
}
impl SubscriptionRegistry {
pub fn new() -> Self {
Self {
subscriptions: HashMap::new(),
next_subid: AtomicU32::new(1),
}
}
pub fn alloc_subid(&self) -> u32 {
crate::channel::alloc_nonzero_probe(&self.next_subid, |v| {
self.subscriptions.contains_key(&v)
})
}
#[cfg(test)]
pub fn seed_next_subid(&self, v: u32) {
self.next_subid
.store(v, std::sync::atomic::Ordering::Relaxed);
}
pub fn add(&mut self, rec: SubscriptionRecord) {
self.subscriptions.insert(rec.subid, rec);
}
pub fn remove(&mut self, subid: u32) -> Option<SubscriptionRecord> {
self.subscriptions.remove(&subid)
}
pub fn on_monitor_error(&mut self, subid: u32, eca_status: u32) -> MonitorDeliveryOutcome {
let Some(rec) = self.subscriptions.get_mut(&subid) else {
return MonitorDeliveryOutcome::NotFound;
};
let circuit = (rec.server_addr, rec.priority);
try_deliver_err(
rec,
epics_base_rs::error::CaError::ServerError(eca_status),
circuit,
)
}
pub fn on_monitor_data(
&mut self,
subid: u32,
data_type: u16,
count: u32,
data: &[u8],
) -> MonitorDeliveryOutcome {
let Some(rec) = self.subscriptions.get_mut(&subid) else {
return MonitorDeliveryOutcome::NotFound;
};
let circuit = (rec.server_addr, rec.priority);
let snapshot = if data_type <= 6 {
let dbr_type = match DbFieldType::from_u16(data_type) {
Ok(t) => t,
Err(e) => {
return try_deliver_err(rec, e, circuit);
}
};
match EpicsValue::from_bytes_array(dbr_type, data, count as usize) {
Ok(value) => Snapshot::new(value, 0, 0, SystemTime::now()),
Err(e) => {
return try_deliver_err(rec, e, circuit);
}
}
} else {
match decode_dbr(data_type, data, count as usize) {
Ok(s) => s,
Err(e) => {
return try_deliver_err(rec, e, circuit);
}
}
};
if rec.deadband > 0.0 {
if let Some(new_val) = snapshot.value.to_f64() {
if let Some(old_val) = rec.last_value {
if (new_val - old_val).abs() < rec.deadband {
return MonitorDeliveryOutcome::Filtered;
}
}
rec.last_value = Some(new_val);
}
}
match rec.coalesce_slot.route_value(snapshot) {
ValueRoute::Slotted => {
rec.nreplace = rec.nreplace.saturating_add(1);
MonitorDeliveryOutcome::Slotted(circuit)
}
ValueRoute::TryChannel(snapshot) => match rec.callback_tx.try_send(Ok(*snapshot)) {
Ok(()) => {
rec.pending_deliveries += 1;
MonitorDeliveryOutcome::Queued(circuit)
}
Err(TrySendError::Full(rejected)) => {
rec.nreplace = rec.nreplace.saturating_add(1);
let snap = rejected.expect("route_value only boxes Ok values");
rec.coalesce_slot.put_value(snap);
MonitorDeliveryOutcome::Slotted(circuit)
}
Err(TrySendError::Closed(_)) => MonitorDeliveryOutcome::Dropped(circuit),
},
}
}
pub fn mark_disconnected(&mut self, cids: &[u32]) -> HashMap<CircuitKey, usize> {
const ECA_DISCONN: u32 = 192; let mut cleared = HashMap::new();
for rec in self.subscriptions.values_mut() {
if cids.contains(&rec.cid) {
rec.needs_restore = true;
let old_pending = rec.pending_deliveries;
rec.pending_deliveries = 0;
rec.coalesce_slot.clear();
rec.coalesce_slot
.put_error(CaError::ServerError(ECA_DISCONN));
if old_pending > 0 {
*cleared.entry((rec.server_addr, rec.priority)).or_insert(0) += old_pending;
}
}
}
cleared
}
#[allow(clippy::too_many_arguments)]
pub fn restore_for_channel(
&mut self,
cid: u32,
new_sid: u32,
native_type: u16,
element_count: u32,
native_changed: bool,
server_addr: std::net::SocketAddr,
transport_tx: &mpsc::UnboundedSender<TransportCommand>,
) -> (u32, u32) {
let mut restored = 0u32;
let mut failed = 0u32;
let stale: Vec<u32> = self
.subscriptions
.values()
.filter(|rec| rec.cid == cid && rec.needs_restore && rec.callback_tx.is_closed())
.map(|rec| rec.subid)
.collect();
for subid in &stale {
self.subscriptions.remove(subid);
failed += 1;
}
for rec in self.subscriptions.values_mut() {
if rec.cid == cid && rec.needs_restore {
rec.needs_restore = false;
rec.server_addr = server_addr;
if native_changed && !rec.type_user_supplied {
rec.data_type = None;
rec.count = None;
}
let derived = DbFieldType::from_u16(native_type)
.ok()
.map(|t| {
super::subscription_readback_dbr(t, rec.enum_readback, rec.float_as_string)
})
.unwrap_or(native_type + 14);
let data_type = *rec.data_type.get_or_insert(derived);
let count = *rec
.count
.get_or_insert(resolve_subscription_count(rec.req_count, element_count));
let _ = transport_tx.send(TransportCommand::Subscribe {
sid: new_sid,
data_type,
count,
subid: rec.subid,
mask: rec.mask,
server_addr,
priority: rec.priority,
});
restored += 1;
}
}
(restored, failed)
}
#[allow(dead_code)]
pub fn count(&self) -> usize {
self.subscriptions.len()
}
#[allow(dead_code)]
pub fn cleanup_closed(&mut self) -> Vec<u32> {
let closed: Vec<u32> = self
.subscriptions
.iter()
.filter(|(_, rec)| rec.callback_tx.is_closed())
.map(|(&subid, _)| subid)
.collect();
for subid in &closed {
self.subscriptions.remove(subid);
}
closed
}
pub fn get(&self, subid: u32) -> Option<&SubscriptionRecord> {
self.subscriptions.get(&subid)
}
pub fn mark_consumed(&mut self, subid: u32) -> Option<CircuitKey> {
let rec = self.subscriptions.get_mut(&subid)?;
if rec.pending_deliveries == 0 {
return None;
}
rec.pending_deliveries -= 1;
Some((rec.server_addr, rec.priority))
}
pub fn for_cid(&self, cid: u32) -> Vec<u32> {
self.subscriptions
.iter()
.filter(|(_, rec)| rec.cid == cid)
.map(|(&subid, _)| subid)
.collect()
}
}
fn try_deliver_err(
rec: &mut SubscriptionRecord,
err: CaError,
circuit: CircuitKey,
) -> MonitorDeliveryOutcome {
match rec.callback_tx.try_send(Err(err)) {
Ok(()) => {
rec.pending_deliveries += 1;
MonitorDeliveryOutcome::Queued(circuit)
}
Err(TrySendError::Full(rejected)) => {
let e = match rejected {
Err(e) => e,
Ok(_) => unreachable!("we just sent an Err"),
};
rec.coalesce_slot.put_error(e);
MonitorDeliveryOutcome::Slotted(circuit)
}
Err(TrySendError::Closed(_)) => MonitorDeliveryOutcome::Dropped(circuit),
}
}
#[cfg(test)]
mod tests {
use super::*;
fn addr() -> SocketAddr {
"127.0.0.1:5064".parse().unwrap()
}
fn record(
subid: u32,
cid: u32,
type_user_supplied: bool,
) -> (SubscriptionRecord, mpsc::Receiver<CaResult<Snapshot>>) {
let (callback_tx, rx) = mpsc::channel(8);
let rec = SubscriptionRecord {
subid,
cid,
data_type: None,
count: None,
req_count: None,
type_user_supplied,
enum_readback: crate::client::EnumReadback::Native,
float_as_string: false,
mask: 1,
server_addr: addr(),
priority: 0,
callback_tx,
coalesce_slot: CoalesceSlot::new(),
needs_restore: true,
deadband: 0.0,
last_value: None,
pending_deliveries: 0,
nreplace: 0,
};
(rec, rx)
}
fn drained_type(rx: &mut mpsc::UnboundedReceiver<TransportCommand>) -> (u16, u32) {
match rx.try_recv() {
Ok(TransportCommand::Subscribe {
data_type, count, ..
}) => (data_type, count),
Ok(_) => panic!("expected Subscribe command, got a different TransportCommand"),
Err(e) => panic!("expected Subscribe command, channel error: {e:?}"),
}
}
#[test]
fn alloc_subid_skips_live_subscription_on_wrap() {
let mut reg = SubscriptionRegistry::new();
let (rec, _cb_rx) = record(1, 100, false);
reg.add(rec);
reg.seed_next_subid(1);
let next = reg.alloc_subid();
assert_ne!(
next, 1,
"must not reissue a subid held by a live subscription"
);
assert!(reg.get(next).is_none(), "allocated subid must be free");
}
#[test]
fn alloc_subid_is_distinct_and_nonzero() {
let reg = SubscriptionRegistry::new();
let a = reg.alloc_subid();
let b = reg.alloc_subid();
assert_ne!(a, b);
assert_ne!(a, 0);
}
#[test]
fn auto_derived_type_resets_on_native_change() {
let mut reg = SubscriptionRegistry::new();
let (rec, _cb_rx) = record(1, 100, false);
reg.add(rec);
let (tx, mut rx) = mpsc::unbounded_channel();
let (restored, failed) = reg.restore_for_channel(100, 7, 1, 1, false, addr(), &tx);
assert_eq!((restored, failed), (1, 0));
assert_eq!(drained_type(&mut rx), (15, 0));
reg.subscriptions.get_mut(&1).unwrap().needs_restore = true;
let (restored, failed) = reg.restore_for_channel(100, 8, 6, 3, true, addr(), &tx);
assert_eq!((restored, failed), (1, 0));
assert_eq!(drained_type(&mut rx), (20, 0));
}
#[test]
fn user_supplied_type_preserved_on_native_change() {
let mut reg = SubscriptionRegistry::new();
let (mut rec, _cb_rx) = record(1, 100, true);
rec.data_type = Some(19); rec.count = Some(2);
reg.add(rec);
let (tx, mut rx) = mpsc::unbounded_channel();
let (restored, _) = reg.restore_for_channel(100, 8, 6, 5, true, addr(), &tx);
assert_eq!(restored, 1);
assert_eq!(drained_type(&mut rx), (19, 2));
}
#[test]
fn auto_derived_count_clamps_req_count_to_native() {
let mut reg = SubscriptionRegistry::new();
let (mut rec, _cb_rx) = record(1, 100, false);
rec.req_count = Some(2);
reg.add(rec);
let (tx, mut rx) = mpsc::unbounded_channel();
let (restored, _) = reg.restore_for_channel(100, 7, 6, 5, false, addr(), &tx);
assert_eq!(restored, 1);
assert_eq!(drained_type(&mut rx).1, 2, "cap below native ⇒ cap");
let mut reg2 = SubscriptionRegistry::new();
let (mut rec2, _cb_rx2) = record(2, 200, false);
rec2.req_count = Some(4);
reg2.add(rec2);
let (tx2, mut rx2) = mpsc::unbounded_channel();
let (restored, _) = reg2.restore_for_channel(200, 9, 6, 1, false, addr(), &tx2);
assert_eq!(restored, 1);
assert_eq!(drained_type(&mut rx2).1, 1, "cap above native ⇒ native");
}
#[test]
fn auto_derived_count_reclamps_on_native_change() {
let mut reg = SubscriptionRegistry::new();
let (mut rec, _cb_rx) = record(1, 100, false);
rec.req_count = Some(4);
reg.add(rec);
let (tx, mut rx) = mpsc::unbounded_channel();
let (restored, _) = reg.restore_for_channel(100, 7, 1, 5, false, addr(), &tx);
assert_eq!(restored, 1);
assert_eq!(drained_type(&mut rx).1, 4, "cap below native ⇒ cap");
reg.subscriptions.get_mut(&1).unwrap().needs_restore = true;
let (restored, _) = reg.restore_for_channel(100, 8, 6, 2, true, addr(), &tx);
assert_eq!(restored, 1);
assert_eq!(drained_type(&mut rx).1, 2, "native change re-clamps cap");
}
#[test]
fn resolve_subscription_count_autosizes_no_cap_and_clamps_positive() {
assert_eq!(resolve_subscription_count(None, 7), 0, "no cap ⇒ autosize");
assert_eq!(
resolve_subscription_count(Some(0), 7),
0,
"explicit 0 ⇒ autosize (C reqElems==0)"
);
assert_eq!(
resolve_subscription_count(Some(3), 7),
3,
"cap below native ⇒ cap"
);
assert_eq!(
resolve_subscription_count(Some(9), 7),
7,
"cap above native ⇒ native (concrete, not autosize)"
);
}
#[test]
fn auto_derived_count_without_cap_requests_autosize_zero() {
let mut reg = SubscriptionRegistry::new();
let (rec, _cb_rx) = record(1, 100, false);
assert_eq!(rec.req_count, None);
reg.add(rec);
let (tx, mut rx) = mpsc::unbounded_channel();
let (restored, _) = reg.restore_for_channel(100, 7, 6, 4, false, addr(), &tx);
assert_eq!(restored, 1);
assert_eq!(drained_type(&mut rx).1, 0, "no cap ⇒ autosize (wire 0)");
}
fn slotted_record(
coalesce_slot: Arc<CoalesceSlot>,
callback_tx: mpsc::Sender<CaResult<Snapshot>>,
) -> SubscriptionRecord {
const DBR_TIME_LONG: u16 = 19;
SubscriptionRecord {
subid: 1,
cid: 100,
data_type: Some(DBR_TIME_LONG),
count: Some(1),
req_count: None,
type_user_supplied: true,
enum_readback: crate::client::EnumReadback::Native,
float_as_string: false,
mask: 1,
server_addr: addr(),
priority: 0,
callback_tx,
coalesce_slot,
needs_restore: false,
deadband: 0.0,
last_value: None,
pending_deliveries: 0,
nreplace: 0,
}
}
fn long_snap(v: i32) -> Snapshot {
Snapshot::new(EpicsValue::Long(v), 0, 0, SystemTime::now())
}
fn post_long(reg: &mut SubscriptionRegistry, v: i32) -> MonitorDeliveryOutcome {
const DBR_TIME_LONG: u16 = 19;
let bytes = epics_base_rs::types::encode_dbr(DBR_TIME_LONG, &long_snap(v)).expect("encode");
reg.on_monitor_data(1, DBR_TIME_LONG, 1, &bytes)
}
#[test]
fn coalesce_on_overflow_preserves_latest_dmov_transition() {
let mut reg = SubscriptionRegistry::new();
let (callback_tx, mut rx) = mpsc::channel::<CaResult<Snapshot>>(2);
let coalesce_slot = CoalesceSlot::new();
reg.add(slotted_record(coalesce_slot.clone(), callback_tx));
for (i, v) in [1_i32, 2, 3, 4, 0].iter().enumerate() {
let outcome = post_long(&mut reg, *v);
match (i, &outcome) {
(0..=1, MonitorDeliveryOutcome::Queued(_)) => {}
(2..=4, MonitorDeliveryOutcome::Slotted(_)) => {}
_ => panic!("unexpected outcome at i={i}"),
}
}
assert_eq!(reg.get(1).expect("rec").pending_deliveries, 2);
assert_eq!(reg.get(1).expect("rec").nreplace, 3);
assert_eq!(
rx.try_recv().expect("first").expect("Ok").value,
EpicsValue::Long(1)
);
assert_eq!(
rx.try_recv().expect("second").expect("Ok").value,
EpicsValue::Long(2)
);
assert!(rx.try_recv().is_err(), "bounded channel drained");
let last = coalesce_slot
.take_raw()
.expect("slot non-empty")
.expect("Ok");
assert_eq!(
last.value,
EpicsValue::Long(0),
"the terminal DMOV 1→0 transition must survive overflow",
);
assert!(coalesce_slot.take_raw().is_none(), "slot is single-entry");
}
#[test]
fn coalesce_preserves_order_under_partial_drain() {
let mut reg = SubscriptionRegistry::new();
let (callback_tx, mut rx) = mpsc::channel::<CaResult<Snapshot>>(2);
let coalesce_slot = CoalesceSlot::new();
reg.add(slotted_record(coalesce_slot.clone(), callback_tx));
assert!(matches!(
post_long(&mut reg, 1),
MonitorDeliveryOutcome::Queued(_)
));
assert!(matches!(
post_long(&mut reg, 2),
MonitorDeliveryOutcome::Queued(_)
));
assert!(matches!(
post_long(&mut reg, 3),
MonitorDeliveryOutcome::Slotted(_)
));
assert_eq!(
rx.try_recv().expect("v1").expect("Ok").value,
EpicsValue::Long(1)
);
assert!(
matches!(post_long(&mut reg, 4), MonitorDeliveryOutcome::Slotted(_)),
"slot-occupied invariant violated — value leaked into channel"
);
assert_eq!(
rx.try_recv().expect("v2").expect("Ok").value,
EpicsValue::Long(2)
);
assert!(rx.try_recv().is_err(), "channel drained");
let v_slot = coalesce_slot.take_raw().expect("slot").expect("Ok");
assert_eq!(v_slot.value, EpicsValue::Long(4), "slot holds latest (4)");
assert_eq!(reg.get(1).expect("rec").nreplace, 2);
}
#[test]
fn disconnect_error_coalesces_when_channel_full() {
let mut reg = SubscriptionRegistry::new();
let (callback_tx, mut rx) = mpsc::channel::<CaResult<Snapshot>>(2);
let coalesce_slot = CoalesceSlot::new();
reg.add(slotted_record(coalesce_slot.clone(), callback_tx));
for v in [1, 2, 3] {
post_long(&mut reg, v);
}
assert_eq!(
reg.get(1).expect("rec").pending_deliveries,
2,
"channel only (I1)"
);
let cleared = reg.mark_disconnected(&[100]);
assert_eq!(*cleared.get(&(addr(), 0)).expect("circuit key"), 2);
assert_eq!(
reg.get(1).expect("rec").pending_deliveries,
0,
"DISCONN parked in the error slot (out of flow control)",
);
let _v1 = rx.try_recv().expect("v1");
let _v2 = rx.try_recv().expect("v2");
assert!(rx.try_recv().is_err(), "channel drained");
match coalesce_slot.take_raw().expect("error slot has DISCONN") {
Err(epics_base_rs::error::CaError::ServerError(code)) => {
assert_eq!(code, 192, "ECA_DISCONN");
}
other => panic!("expected ECA_DISCONN, got {other:?}"),
}
}
#[test]
fn paused_value_held_and_gated() {
let slot = CoalesceSlot::new();
slot.set_paused(true);
assert!(
matches!(slot.route_value(long_snap(7)), ValueRoute::Slotted),
"paused value must be held in slot, not routed to channel"
);
assert!(
slot.take_deliverable().is_none(),
"recv-side gate must withhold a value held during pause"
);
assert!(slot.set_paused(false), "was paused");
let released = slot.take_deliverable().expect("released after resume");
assert_eq!(released.expect("Ok").value, EpicsValue::Long(7));
}
#[test]
fn paused_value_does_not_clobber_pending_error() {
let slot = CoalesceSlot::new();
slot.put_error(CaError::ServerError(192)); slot.set_paused(true);
assert!(matches!(
slot.route_value(long_snap(5)),
ValueRoute::Slotted
));
match slot.take_deliverable().expect("error bypasses pause") {
Err(CaError::ServerError(192)) => {}
other => panic!("expected ECA_DISCONN first, got {other:?}"),
}
assert!(
slot.take_deliverable().is_none(),
"held value remains gated after the error drains"
);
slot.set_paused(false);
assert_eq!(
slot.take_deliverable()
.expect("value after resume")
.expect("Ok")
.value,
EpicsValue::Long(5),
"the held value survived the error and resumes intact"
);
}
#[test]
fn prepause_overflow_value_deliverable_while_paused() {
let slot = CoalesceSlot::new();
slot.put_value(long_snap(11));
slot.set_paused(true);
let v = slot
.take_deliverable()
.expect("pre-pause overflow value deliverable while paused");
assert_eq!(v.expect("Ok").value, EpicsValue::Long(11));
assert!(matches!(
slot.route_value(long_snap(22)),
ValueRoute::Slotted
));
assert!(
slot.take_deliverable().is_none(),
"during-pause value is withheld until resume"
);
}
#[test]
fn prepause_ready_not_clobbered_by_concurrent_during_pause_value() {
let slot = CoalesceSlot::new();
slot.put_value(long_snap(11)); slot.set_paused(true);
assert!(matches!(
slot.route_value(long_snap(22)),
ValueRoute::Slotted
));
let v = slot
.take_deliverable()
.expect("pre-pause value still deliverable");
assert_eq!(
v.expect("Ok").value,
EpicsValue::Long(11),
"during-pause 22 must not clobber pre-pause 11"
);
assert!(slot.take_deliverable().is_none(), "22 gated while paused");
slot.set_paused(false);
assert_eq!(
slot.take_deliverable()
.expect("22 after resume")
.expect("Ok")
.value,
EpicsValue::Long(22),
);
}
#[test]
fn mark_disconnected_old_pending_zero_yields_no_delta() {
let mut reg = SubscriptionRegistry::new();
let (callback_tx, mut rx) = mpsc::channel::<CaResult<Snapshot>>(4);
let coalesce_slot = CoalesceSlot::new();
reg.add(slotted_record(coalesce_slot.clone(), callback_tx));
assert_eq!(reg.get(1).expect("rec").pending_deliveries, 0);
let cleared = reg.mark_disconnected(&[100]);
assert!(
cleared.is_empty(),
"no channel items → empty flow-control delta"
);
assert_eq!(
reg.get(1).expect("rec").pending_deliveries,
0,
"DISCONN parks in the error cell; pending never bumped",
);
assert!(rx.try_recv().is_err(), "DISCONN did NOT go to the channel");
match coalesce_slot.take_raw().expect("DISCONN in error cell") {
Err(CaError::ServerError(192)) => {}
other => panic!("expected ECA_DISCONN, got {other:?}"),
}
}
#[test]
fn pending_error_stays_ahead_of_later_value() {
let mut reg = SubscriptionRegistry::new();
let (callback_tx, mut rx) = mpsc::channel::<CaResult<Snapshot>>(2);
let coalesce_slot = CoalesceSlot::new();
reg.add(slotted_record(coalesce_slot.clone(), callback_tx));
post_long(&mut reg, 1);
post_long(&mut reg, 2);
assert!(matches!(
reg.on_monitor_error(1, 192),
MonitorDeliveryOutcome::Slotted(_)
));
assert_eq!(
rx.try_recv().expect("ch1").expect("Ok").value,
EpicsValue::Long(1)
);
assert!(matches!(
post_long(&mut reg, 3),
MonitorDeliveryOutcome::Slotted(_)
));
assert_eq!(
rx.try_recv().expect("ch2").expect("Ok").value,
EpicsValue::Long(2)
);
assert!(rx.try_recv().is_err(), "channel drained");
match coalesce_slot.take_deliverable().expect("error first") {
Err(CaError::ServerError(192)) => {}
other => panic!("expected error ahead of value, got {other:?}"),
}
assert_eq!(
coalesce_slot
.take_deliverable()
.expect("value after error")
.expect("Ok")
.value,
EpicsValue::Long(3),
);
}
#[test]
fn resume_coalesces_undrained_ready_to_latest() {
let slot = CoalesceSlot::new();
slot.put_value(long_snap(11)); slot.set_paused(true);
assert!(matches!(
slot.route_value(long_snap(22)),
ValueRoute::Slotted
)); slot.set_paused(false); assert_eq!(
slot.take_deliverable().expect("22").expect("Ok").value,
EpicsValue::Long(22),
"latest (22) survives; undrained 11 coalesced away",
);
assert!(
slot.take_deliverable().is_none(),
"single deliverable value"
);
}
#[test]
fn post_resume_value_coalesces_into_ready() {
let slot = CoalesceSlot::new();
slot.put_value(long_snap(11)); slot.set_paused(true);
slot.route_value(long_snap(22)); slot.set_paused(false); assert!(matches!(
slot.route_value(long_snap(33)),
ValueRoute::Slotted
)); assert_eq!(
slot.take_deliverable().expect("33").expect("Ok").value,
EpicsValue::Long(33),
"uniform coalesce to latest",
);
assert!(slot.take_deliverable().is_none());
}
#[test]
fn held_backlog_stays_deliverable_across_second_pause() {
let slot = CoalesceSlot::new();
slot.set_paused(true); slot.route_value(long_snap(1)); slot.set_paused(false); slot.set_paused(true); let v = slot
.take_deliverable()
.expect("post-resume backlog must survive a new pause (I3)");
assert_eq!(v.expect("Ok").value, EpicsValue::Long(1));
}
#[test]
fn repeated_pause_cycles_coalesce_to_latest() {
let slot = CoalesceSlot::new();
slot.put_value(long_snap(11)); slot.set_paused(true); slot.route_value(long_snap(22)); slot.set_paused(false); slot.set_paused(true); let v = slot.take_deliverable().expect("latest backlog deliverable");
assert_eq!(v.expect("Ok").value, EpicsValue::Long(22));
assert!(
slot.take_deliverable().is_none(),
"only the latest survived the cycle"
);
}
#[test]
fn paused_error_bypasses_gate() {
let slot = CoalesceSlot::new();
slot.set_paused(true);
slot.put_error(CaError::ServerError(192)); let got = slot.take_deliverable().expect("error must bypass pause");
assert!(
matches!(got, Err(CaError::ServerError(192))),
"ECA_DISCONN delivered while paused"
);
}
#[test]
fn route_value_not_paused_channel_then_replace() {
let slot = CoalesceSlot::new();
assert!(
matches!(slot.route_value(long_snap(1)), ValueRoute::TryChannel(_)),
"empty slot, not paused → caller tries the channel"
);
slot.put_value(long_snap(2));
assert!(
matches!(slot.route_value(long_snap(3)), ValueRoute::Slotted),
"occupied slot, not paused → replace in place (no channel jump-ahead)"
);
assert_eq!(
slot.take_raw().expect("slot").expect("Ok").value,
EpicsValue::Long(3),
"latest wins"
);
}
#[test]
fn auto_derived_type_stable_without_native_change() {
let mut reg = SubscriptionRegistry::new();
let (rec, _cb_rx) = record(1, 100, false);
reg.add(rec);
let (tx, mut rx) = mpsc::unbounded_channel();
let (restored, _) = reg.restore_for_channel(100, 7, 1, 1, false, addr(), &tx);
assert_eq!(restored, 1);
assert_eq!(drained_type(&mut rx), (15, 0));
reg.subscriptions.get_mut(&1).unwrap().needs_restore = true;
let (restored, _) = reg.restore_for_channel(100, 8, 6, 3, false, addr(), &tx);
assert_eq!(restored, 1);
assert_eq!(drained_type(&mut rx), (15, 0));
}
}