use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::RwLock;
use tokio::sync::{Mutex, Notify, broadcast};
use epics_pva_rs::client::PvaClient;
use epics_pva_rs::client_native::ops_v2::Pauser;
use epics_pva_rs::pvdata::{FieldDesc, PvField};
use super::error::{GwError, GwResult};
/// Capacity of each per-entry broadcast channel (decoded values and raw frames).
pub const BROADCAST_CAPACITY: usize = 16;
/// Intended value for the `cleanup_interval` argument of `ChannelCache::new`.
pub const DEFAULT_CLEANUP_INTERVAL: Duration = Duration::from_millis(30_000);
/// Entry cap used by `ChannelCache::new`.
pub const DEFAULT_MAX_ENTRIES: usize = 50 * 1000;
/// Maximum number of remembered upstream failures (oldest evicted first).
const NEG_CACHE_MAX: usize = 1 << 10;
/// How long a recorded upstream failure short-circuits new lookups.
const NEG_CACHE_TTL: Duration = Duration::from_millis(30 * 1000);
/// One cached upstream PV: its latest merged value and introspection, plus the
/// broadcast channels fed by a background monitor task.
///
/// Dropping the entry aborts that monitor task (via `_monitor_task`).
pub struct UpstreamEntry {
/// Name of the upstream PV this entry monitors.
pub pv_name: String,
/// Latest merged value and descriptor; written by the monitor callback.
state: Arc<RwLock<EntryState>>,
/// Decoded values for `subscribe()` receivers. The first event is not sent
/// here; readers obtain it through `snapshot()`.
tx: broadcast::Sender<PvField>,
/// Raw monitor frames (and type-change markers with an empty body) for
/// `subscribe_raw()` receivers.
tx_raw: broadcast::Sender<crate::pva_gateway::source::RawEvent>,
/// Notified (`notify_waiters`) when the first event has been applied.
first_event: Arc<Notify>,
/// Aborts the monitor task when the entry is dropped.
_monitor_task: AbortOnDrop,
/// "Recently used" flag: set by lookups/subscriptions, cleared by the cache's
/// cleanup pass, which keeps a poked entry for at least one more interval.
drop_poke: parking_lot::Mutex<bool>,
/// Pause handle of the currently running upstream monitor; `None` between
/// monitor attempts.
pauser: Arc<parking_lot::Mutex<Option<Pauser>>>,
}
/// Mutable per-entry state shared between the monitor callback and readers.
#[derive(Default)]
struct EntryState {
/// Latest value, with delta events merged onto the prior snapshot.
latest: Option<PvField>,
/// Descriptor of the most recently applied event; `None` until the first
/// successfully decoded event.
introspection: Option<FieldDesc>,
}
/// Result of applying one monitor frame via `apply_monitor_event`.
///
/// The `Default` value (all false / `None`) is returned when the frame fails
/// to decode.
#[derive(Debug, Default, Clone)]
struct MonitorEventOutcome {
/// True when no descriptor was cached before this event.
was_first: bool,
/// True when a descriptor was cached and differs from this event's one.
type_changed: bool,
/// Snapshot after applying the event; `None` if decoding failed.
value: Option<PvField>,
}
/// Decode one raw monitor frame and fold it into `state`.
///
/// `body` is the changed-field bitset followed by the fields it marks,
/// encoded with `desc` in byte order `order`. When the descriptor is unchanged
/// and a prior value exists, unmarked fields keep their prior values;
/// otherwise the freshly decoded value replaces the snapshot. A frame that
/// fails to decode leaves `state` untouched and yields the default outcome.
fn apply_monitor_event(
    state: &RwLock<EntryState>,
    desc: &FieldDesc,
    body: &[u8],
    order: epics_pva_rs::proto::ByteOrder,
) -> MonitorEventOutcome {
    use epics_pva_rs::proto::BitSet;
    use epics_pva_rs::pvdata::encode::{decode_pv_field_with_bitset, fill_unmarked_from_prior};

    // Decode completely before taking the write lock.
    let mut reader = std::io::Cursor::new(body);
    let Ok(changed) = BitSet::decode(&mut reader, order) else {
        return MonitorEventOutcome::default();
    };
    let Ok(fresh) = decode_pv_field_with_bitset(desc, &changed, 0, &mut reader, order) else {
        return MonitorEventOutcome::default();
    };

    let mut slot = state.write();
    // Install the new descriptor and inspect the one it displaced.
    let previous = slot.introspection.replace(desc.clone());
    let was_first = previous.is_none();
    let type_changed = matches!(&previous, Some(old) if old != desc);

    // A delta can only be merged onto a prior value of the same type.
    let merged = match (slot.latest.take(), type_changed) {
        (Some(prior), false) => fill_unmarked_from_prior(desc, &changed, 0, fresh, &prior),
        _ => fresh,
    };
    slot.latest = Some(merged);

    MonitorEventOutcome {
        was_first,
        type_changed,
        value: slot.latest.clone(),
    }
}
impl UpstreamEntry {
    /// Mark the entry as recently used so the next cleanup pass keeps it.
    fn poke(&self) {
        let mut flag = self.drop_poke.lock();
        *flag = true;
    }

    /// Copy of the latest merged value, or `None` before the first
    /// successfully decoded monitor event.
    pub fn snapshot(&self) -> Option<PvField> {
        let guard = self.state.read();
        guard.latest.as_ref().cloned()
    }

    /// Copy of the cached type descriptor, if any event has been applied.
    pub fn introspection(&self) -> Option<FieldDesc> {
        let guard = self.state.read();
        guard.introspection.as_ref().cloned()
    }

    /// New receiver of decoded values; also marks the entry as in use.
    pub fn subscribe(&self) -> broadcast::Receiver<PvField> {
        self.poke();
        self.tx.subscribe()
    }

    /// New receiver of raw monitor frames; also marks the entry as in use.
    pub fn subscribe_raw(&self) -> broadcast::Receiver<crate::pva_gateway::source::RawEvent> {
        self.poke();
        self.tx_raw.subscribe()
    }

    /// Total live receivers across the decoded and raw channels.
    pub fn subscriber_count(&self) -> usize {
        let decoded = self.tx.receiver_count();
        let raw = self.tx_raw.receiver_count();
        decoded + raw
    }

    /// Pause handle of the running upstream monitor, if one is active.
    pub fn pauser_snapshot(&self) -> Option<Pauser> {
        self.pauser.lock().as_ref().cloned()
    }
}
/// Owns a spawned task's abort handle and cancels that task when dropped.
struct AbortOnDrop(tokio::task::AbortHandle);

impl Drop for AbortOnDrop {
    fn drop(&mut self) {
        let Self(handle) = self;
        handle.abort();
    }
}
/// Cache of upstream PV monitors keyed by PV name, shared by downstream users.
pub struct ChannelCache {
/// Client used to open upstream monitors.
client: Arc<PvaClient>,
/// Live entries by PV name. Held in an `Arc` so a lookup's cleanup guard can
/// finish removal from a spawned task when the lock is contended.
entries: Arc<Mutex<HashMap<String, Arc<UpstreamEntry>>>>,
/// Periodic cleanup task; aborted when the cache is dropped.
cleanup_task: parking_lot::Mutex<Option<tokio::task::JoinHandle<()>>>,
/// Cap on the number of entries; new lookups fail with `CacheFull` beyond it.
max_entries: usize,
/// Recently failed PV names with their failure time, oldest first; bounded
/// by `NEG_CACHE_MAX`, records expire after `NEG_CACHE_TTL`.
negative_cache: parking_lot::Mutex<VecDeque<(String, Instant)>>,
}
impl ChannelCache {
pub fn new(client: Arc<PvaClient>, cleanup_interval: Duration) -> Arc<Self> {
Self::with_max_entries(client, cleanup_interval, DEFAULT_MAX_ENTRIES)
}
pub fn with_max_entries(
client: Arc<PvaClient>,
cleanup_interval: Duration,
max_entries: usize,
) -> Arc<Self> {
let cache = Arc::new(Self {
client,
entries: Arc::new(Mutex::new(HashMap::new())),
cleanup_task: parking_lot::Mutex::new(None),
max_entries,
negative_cache: parking_lot::Mutex::new(VecDeque::with_capacity(NEG_CACHE_MAX)),
});
let weak = Arc::downgrade(&cache);
let task = tokio::spawn(async move {
let mut tick = tokio::time::interval(cleanup_interval);
tick.tick().await; loop {
tick.tick().await;
let Some(c) = weak.upgrade() else { break };
c.cleanup_tick().await;
}
});
*cache.cleanup_task.lock() = Some(task);
cache
}
fn is_recently_failed(&self, name: &str) -> bool {
let now = Instant::now();
let mut neg = self.negative_cache.lock();
while let Some((_, t)) = neg.front() {
if now.duration_since(*t) >= NEG_CACHE_TTL {
neg.pop_front();
} else {
break;
}
}
neg.iter().any(|(n, _)| n == name)
}
fn record_failure(&self, name: &str) {
let mut neg = self.negative_cache.lock();
if neg.iter().any(|(n, _)| n == name) {
return; }
if neg.len() >= NEG_CACHE_MAX {
neg.pop_front();
}
neg.push_back((name.to_string(), Instant::now()));
}
pub fn client(&self) -> &Arc<PvaClient> {
&self.client
}
pub async fn peek(&self, pv_name: &str) -> Option<Arc<UpstreamEntry>> {
let map = self.entries.lock().await;
let existing = map.get(pv_name).cloned();
if let Some(ref e) = existing {
e.poke();
}
existing
}
pub async fn lookup(
&self,
pv_name: &str,
connect_timeout: Duration,
) -> GwResult<Arc<UpstreamEntry>> {
if self.is_recently_failed(pv_name) {
return Err(GwError::UpstreamTimeout(pv_name.to_string()));
}
let (entry, was_fresh) = {
let mut map = self.entries.lock().await;
if let Some(existing) = map.get(pv_name) {
existing.poke();
(existing.clone(), false)
} else {
if map.len() >= self.max_entries {
map.retain(|_, e| Arc::strong_count(e) > 1);
}
if map.len() >= self.max_entries {
tracing::warn!(
pv = %pv_name,
len = map.len(),
cap = self.max_entries,
"pva-gateway: channel cache full, refusing new entry"
);
return Err(GwError::CacheFull(self.max_entries));
}
let fresh = self.spawn_upstream_monitor(pv_name);
map.insert(pv_name.to_string(), fresh.clone());
(fresh, true)
}
};
struct CleanupGuard<'a> {
cache: &'a ChannelCache,
pv_name: &'a str,
armed: bool,
}
impl<'a> CleanupGuard<'a> {
fn disarm(&mut self) {
self.armed = false;
}
}
impl<'a> Drop for CleanupGuard<'a> {
fn drop(&mut self) {
if !self.armed {
return;
}
self.cache.record_failure(self.pv_name);
if let Ok(mut map) = self.cache.entries.try_lock() {
map.remove(self.pv_name);
return;
}
let entries = self.cache.entries.clone();
let pv_name = self.pv_name.to_string();
tokio::spawn(async move {
entries.lock().await.remove(&pv_name);
});
}
}
let mut guard = CleanupGuard {
cache: self,
pv_name,
armed: was_fresh,
};
match self.await_first_event(entry, connect_timeout).await {
Ok(e) => {
guard.disarm();
Ok(e)
}
Err(e) => {
self.record_failure(pv_name);
Err(e)
}
}
}
fn spawn_upstream_monitor(&self, pv_name: &str) -> Arc<UpstreamEntry> {
let (tx, _rx0) = broadcast::channel::<PvField>(BROADCAST_CAPACITY);
let (tx_raw, _rx0_raw) =
broadcast::channel::<crate::pva_gateway::source::RawEvent>(BROADCAST_CAPACITY);
let first_event = Arc::new(Notify::new());
let state = Arc::new(RwLock::new(EntryState::default()));
let pauser_slot: Arc<parking_lot::Mutex<Option<Pauser>>> =
Arc::new(parking_lot::Mutex::new(None));
let pv_name_owned = pv_name.to_string();
let client = self.client.clone();
let tx_for_task = tx.clone();
let tx_raw_for_task = tx_raw.clone();
let state_for_task = state.clone();
let first_event_for_task = first_event.clone();
let pauser_slot_for_task = pauser_slot.clone();
let join = tokio::spawn(async move {
let mut backoff = Duration::from_millis(250);
let max_backoff = Duration::from_secs(30);
loop {
let tx_inner = tx_for_task.clone();
let state_inner = state_for_task.clone();
let first_event_inner = first_event_for_task.clone();
let _pv_name_for_cb = pv_name_owned.clone();
let tx_raw_inner = tx_raw_for_task.clone();
let pv_clone = pv_name_owned.clone();
let handle_result = client
.pvmonitor_raw_frames_handle(&pv_name_owned, move |desc, body, order| {
let outcome = apply_monitor_event(&state_inner, desc, &body, order);
use crate::pva_gateway::source::RawEvent;
if outcome.type_changed {
tracing::warn!(
pv = %pv_clone,
"pva-gateway: upstream introspection changed — \
emitting type-change boundary to downstream monitors \
(cache descriptor reset)"
);
let _ = tx_raw_inner.send(RawEvent {
body: bytes::Bytes::new(),
byte_order: order,
type_changed: true,
});
return;
}
if outcome.was_first {
first_event_inner.notify_waiters();
}
if !outcome.was_first {
if let Some(val) = outcome.value {
let _ = tx_inner.send(val);
}
}
let _ = tx_raw_inner.send(RawEvent {
body,
byte_order: order,
type_changed: false,
});
})
.await;
let handle = match handle_result {
Ok(h) => h,
Err(e) => {
tracing::warn!(
pv = %pv_name_owned,
error = %e,
backoff_ms = backoff.as_millis() as u64,
"pva-gateway: raw upstream monitor failed to start, will retry"
);
if tx_raw_for_task.receiver_count() == 0 {
return;
}
tokio::time::sleep(backoff).await;
backoff = std::cmp::min(backoff * 2, max_backoff);
continue;
}
};
*pauser_slot_for_task.lock() = Some(handle.pauser());
let raw_result = handle.wait().await;
*pauser_slot_for_task.lock() = None;
if let Err(e) = raw_result {
tracing::warn!(
pv = %pv_name_owned,
error = %e,
backoff_ms = backoff.as_millis() as u64,
"pva-gateway: raw upstream monitor failed, will retry"
);
if tx_raw_for_task.receiver_count() == 0 {
return;
}
tokio::time::sleep(backoff).await;
backoff = std::cmp::min(backoff * 2, max_backoff);
continue;
}
backoff = Duration::from_millis(250);
if tx_for_task.receiver_count() == 0 && tx_raw_for_task.receiver_count() == 0 {
tracing::debug!(
pv = %pv_name_owned,
"pva-gateway: monitor exit (no subscribers)"
);
return;
}
}
});
Arc::new(UpstreamEntry {
pv_name: pv_name.to_string(),
state,
tx,
tx_raw,
first_event,
_monitor_task: AbortOnDrop(join.abort_handle()),
drop_poke: parking_lot::Mutex::new(true),
pauser: pauser_slot,
})
}
async fn await_first_event(
&self,
entry: Arc<UpstreamEntry>,
connect_timeout: Duration,
) -> GwResult<Arc<UpstreamEntry>> {
let notify = entry.first_event.clone();
let notified = notify.notified();
tokio::pin!(notified);
if entry.snapshot().is_some() {
return Ok(entry);
}
let res = tokio::time::timeout(connect_timeout, &mut notified).await;
if res.is_err() && entry.snapshot().is_none() {
return Err(GwError::UpstreamTimeout(entry.pv_name.clone()));
}
Ok(entry)
}
async fn cleanup_tick(&self) {
let mut map = self.entries.lock().await;
map.retain(|_, entry| {
let mut poke = entry.drop_poke.lock();
if *poke {
*poke = false;
return true; }
entry.subscriber_count() > 0
});
}
pub async fn names(&self) -> Vec<String> {
self.entries.lock().await.keys().cloned().collect()
}
pub async fn entry_count(&self) -> usize {
self.entries.lock().await.len()
}
pub async fn flush(&self) -> usize {
let mut map = self.entries.lock().await;
let removed = map.len();
map.clear();
self.negative_cache.lock().clear();
removed
}
pub async fn drop_entry(&self, pv_name: &str) -> bool {
let removed = self.entries.lock().await.remove(pv_name).is_some();
self.negative_cache.lock().retain(|(n, _)| n != pv_name);
removed
}
}
impl Drop for ChannelCache {
    /// Stops the periodic cleanup task along with the cache.
    fn drop(&mut self) {
        // Exclusive access: no need to lock the slot.
        let pending = self.cleanup_task.get_mut().take();
        if let Some(handle) = pending {
            handle.abort();
        }
    }
}
// Unit tests for the monitor-event state machine (`apply_monitor_event`) and
// for `UpstreamEntry` subscription bookkeeping.
#[cfg(test)]
mod tests {
use super::*;
use epics_pva_rs::proto::{BitSet, ByteOrder};
use epics_pva_rs::pvdata::{ScalarType, ScalarValue};
/// Builds a little-endian monitor frame body: the changed-field bitset with
/// `set_bits` set, followed by `value` encoded for the fields it selects.
fn encode_body(desc: &FieldDesc, value: &PvField, set_bits: &[usize]) -> Vec<u8> {
let mut changed = BitSet::new();
for &b in set_bits {
changed.set(b);
}
let mut body = Vec::new();
changed.write_into(ByteOrder::Little, &mut body);
epics_pva_rs::pvdata::encode::encode_pv_field_with_bitset(
value,
desc,
&changed,
0,
ByteOrder::Little,
&mut body,
);
body
}
/// The cached snapshot must follow every event, not only the first one.
#[test]
fn bug2_get_value_tracks_every_monitor_event() {
let desc = FieldDesc::Scalar(ScalarType::Double);
let state = RwLock::new(EntryState::default());
let body1 = encode_body(&desc, &PvField::Scalar(ScalarValue::Double(1.0)), &[0]);
let o1 = apply_monitor_event(&state, &desc, &body1, ByteOrder::Little);
assert!(o1.was_first && o1.value.is_some() && !o1.type_changed);
assert_eq!(
state.read().latest,
Some(PvField::Scalar(ScalarValue::Double(1.0)))
);
let body2 = encode_body(&desc, &PvField::Scalar(ScalarValue::Double(2.0)), &[0]);
apply_monitor_event(&state, &desc, &body2, ByteOrder::Little);
assert_eq!(
state.read().latest,
Some(PvField::Scalar(ScalarValue::Double(2.0))),
"snapshot must reflect the 2nd monitor event, not freeze at the 1st"
);
let body3 = encode_body(&desc, &PvField::Scalar(ScalarValue::Double(3.0)), &[0]);
apply_monitor_event(&state, &desc, &body3, ByteOrder::Little);
assert_eq!(
state.read().latest,
Some(PvField::Scalar(ScalarValue::Double(3.0))),
"snapshot must track the live value across many events"
);
}
/// A delta event (only field `a` marked) must keep unmarked fields from the
/// prior snapshot.
#[test]
fn bug2_delta_event_merges_onto_prior_snapshot() {
let desc = FieldDesc::Structure {
struct_id: String::new(),
fields: vec![
("a".to_string(), FieldDesc::Scalar(ScalarType::Int)),
("b".to_string(), FieldDesc::Scalar(ScalarType::Int)),
],
};
let full = |a: i32, b: i32| {
let mut s = epics_pva_rs::pvdata::PvStructure::new("");
s.fields
.push(("a".to_string(), PvField::Scalar(ScalarValue::Int(a))));
s.fields
.push(("b".to_string(), PvField::Scalar(ScalarValue::Int(b))));
PvField::Structure(s)
};
let state = RwLock::new(EntryState::default());
// Bits 0..=2: the whole structure plus both fields.
let body1 = encode_body(&desc, &full(10, 20), &[0, 1, 2]);
apply_monitor_event(&state, &desc, &body1, ByteOrder::Little);
assert_eq!(state.read().latest, Some(full(10, 20)));
// Bit 1 only: field `a` changes; `b` is not transmitted.
let body2 = encode_body(&desc, &full(99, 0), &[1]);
apply_monitor_event(&state, &desc, &body2, ByteOrder::Little);
assert_eq!(
state.read().latest,
Some(full(99, 20)),
"delta merge must keep unmarked field `b` at its prior value"
);
}
/// `type_changed` is reported only when a cached descriptor differs from the
/// incoming one.
#[test]
fn br_r42_apply_monitor_event_flags_descriptor_change() {
let desc1 = FieldDesc::Scalar(ScalarType::Double);
let state = RwLock::new(EntryState::default());
let body1 = encode_body(&desc1, &PvField::Scalar(ScalarValue::Double(1.0)), &[0]);
let o1 = apply_monitor_event(&state, &desc1, &body1, ByteOrder::Little);
assert!(
o1.was_first && !o1.type_changed,
"first event must NOT report type_changed (no prior descriptor to compare)"
);
let body2 = encode_body(&desc1, &PvField::Scalar(ScalarValue::Double(2.0)), &[0]);
let o2 = apply_monitor_event(&state, &desc1, &body2, ByteOrder::Little);
assert!(
!o2.was_first && !o2.type_changed,
"same-descriptor event must NOT flag type_changed"
);
let desc2 = FieldDesc::Scalar(ScalarType::Int);
let body3 = encode_body(&desc2, &PvField::Scalar(ScalarValue::Int(42)), &[0]);
let o3 = apply_monitor_event(&state, &desc2, &body3, ByteOrder::Little);
assert!(
o3.type_changed,
"introspection change must be flagged for the BR-R42 marker path"
);
assert!(
o3.value.is_some(),
"the new-descriptor body still decodes cleanly"
);
}
/// Each `subscribe()` adds a receiver and sets the "recently used" flag.
#[tokio::test]
async fn entry_subscribe_returns_fresh_receivers() {
let (tx, rx0) = broadcast::channel::<PvField>(4);
drop(rx0);
let (tx_raw, rx0_raw) = broadcast::channel::<crate::pva_gateway::source::RawEvent>(4);
drop(rx0_raw);
// Placeholder task standing in for the upstream monitor.
let task = tokio::spawn(async {
tokio::time::sleep(Duration::from_secs(60)).await;
});
let entry = UpstreamEntry {
pv_name: "X".into(),
state: Arc::new(RwLock::new(EntryState::default())),
tx,
tx_raw,
first_event: Arc::new(Notify::new()),
_monitor_task: AbortOnDrop(task.abort_handle()),
drop_poke: parking_lot::Mutex::new(false),
pauser: Arc::new(parking_lot::Mutex::new(None)),
};
assert_eq!(entry.subscriber_count(), 0);
let _r1 = entry.subscribe();
let _r2 = entry.subscribe();
assert_eq!(entry.subscriber_count(), 2);
assert!(*entry.drop_poke.lock(), "subscribe must poke");
}
}