use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::RwLock;
use tokio::sync::Notify;
use super::config::{LinkDirection, PvaLinkConfig};
use super::link::{PvaLink, PvaLinkResult};
/// Cache key identifying one shareable link. The components are, in order:
/// the config's `pv_name`, `pipeline`, `queue_size` and `direction`, followed
/// by the output-only options (`None` for input links, see `OutOpts::from_config`).
type RegistryKey = (String, bool, usize, LinkDirection, Option<OutOpts>);
/// Options copied out of a `PvaLinkConfig` for output links.
///
/// The value is part of `RegistryKey`, so two output links to the same PV
/// only share a cached `PvaLink` when all of these fields are equal.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct OutOpts {
/// Copy of the config's `field`.
field: String,
/// Copy of the config's `process` flag.
process: bool,
/// Copy of the config's `defer` flag.
defer: bool,
/// Copy of the config's `retry` flag.
retry: bool,
}
impl OutOpts {
fn from_config(config: &PvaLinkConfig) -> Option<Self> {
match config.direction {
LinkDirection::Out => Some(Self {
field: config.field.clone(),
process: config.process,
defer: config.defer,
retry: config.retry,
}),
LinkDirection::Inp => None,
}
}
}
/// Registry that caches opened `PvaLink`s by `RegistryKey` so that callers
/// asking for an equivalent configuration receive the same `Arc<PvaLink>`.
#[derive(Default)]
pub struct PvaLinkRegistry {
/// Links that have been opened successfully, keyed by configuration.
map: RwLock<HashMap<RegistryKey, Arc<PvaLink>>>,
/// Keys for which an open is currently in flight. The `Notify` is signalled
/// (via `notify_waiters`) when that open finishes or is abandoned, so that
/// callers waiting on the same key can re-check `map`.
pending: RwLock<HashMap<RegistryKey, Arc<Notify>>>,
}
impl PvaLinkRegistry {
pub fn new() -> Self {
Self::default()
}
pub fn try_get(&self, config: &PvaLinkConfig) -> Option<Arc<PvaLink>> {
let key: RegistryKey = (
config.pv_name.clone(),
config.pipeline,
config.queue_size,
config.direction,
OutOpts::from_config(config),
);
self.map.read().get(&key).cloned()
}
pub fn try_get_any(&self, pv_name: &str, direction: LinkDirection) -> Option<Arc<PvaLink>> {
self.map
.read()
.iter()
.find(|((name, _, _, dir, _), _)| name == pv_name && *dir == direction)
.map(|(_, link)| link.clone())
}
pub async fn get_or_open(&self, config: PvaLinkConfig) -> PvaLinkResult<Arc<PvaLink>> {
let key: RegistryKey = (
config.pv_name.clone(),
config.pipeline,
config.queue_size,
config.direction,
OutOpts::from_config(&config),
);
if let Some(existing) = self.map.read().get(&key).cloned() {
return Ok(existing);
}
let (claim, notify) = {
let mut pending = self.pending.write();
if let Some(existing) = self.map.read().get(&key).cloned() {
return Ok(existing);
}
if let Some(existing) = pending.get(&key).cloned() {
(false, existing)
} else {
let n = Arc::new(Notify::new());
pending.insert(key.clone(), n.clone());
(true, n)
}
};
if !claim {
let notified = notify.notified();
tokio::pin!(notified);
notified.as_mut().enable();
if let Some(existing) = self.map.read().get(&key).cloned() {
return Ok(existing);
}
if !self.pending.read().contains_key(&key) {
return Box::pin(self.get_or_open(config)).await;
}
notified.await;
if let Some(existing) = self.map.read().get(&key).cloned() {
return Ok(existing);
}
return Box::pin(self.get_or_open(config)).await;
}
struct CompletionGuard<'a> {
owner: &'a PvaLinkRegistry,
key: RegistryKey,
notify: Arc<Notify>,
armed: bool,
}
impl<'a> CompletionGuard<'a> {
fn disarm(&mut self) {
self.armed = false;
}
}
impl<'a> Drop for CompletionGuard<'a> {
fn drop(&mut self) {
self.owner.pending.write().remove(&self.key);
self.notify.notify_waiters();
let _ = self.armed; }
}
let mut guard = CompletionGuard {
owner: self,
key: key.clone(),
notify,
armed: true,
};
let result = PvaLink::open(config).await;
let link = Arc::new(result?);
self.map.write().insert(key.clone(), link.clone());
guard.disarm();
Ok(link)
}
pub fn close_all(&self) {
self.map.write().clear();
self.pending.write().clear();
}
#[cfg(test)]
pub(crate) fn insert_for_test(&self, config: &PvaLinkConfig, link: Arc<PvaLink>) {
let key: RegistryKey = (
config.pv_name.clone(),
config.pipeline,
config.queue_size,
config.direction,
OutOpts::from_config(config),
);
self.map.write().insert(key, link);
}
pub fn len(&self) -> usize {
self.map.read().len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an OUT-link config for the shared test PV with the given
    /// `field`, setting `process`, `defer` and `retry` all to `flag`.
    fn out_config(field: &str, flag: bool) -> PvaLinkConfig {
        PvaLinkConfig {
            field: field.to_string(),
            process: flag,
            defer: flag,
            retry: flag,
            ..PvaLinkConfig::defaults_for("MR_R14:PV", LinkDirection::Out)
        }
    }

    /// `close_all` on a fresh registry leaves it empty.
    #[tokio::test]
    async fn close_all_empties_registry() {
        let registry = PvaLinkRegistry::new();
        assert!(registry.is_empty());
        registry.close_all();
        assert_eq!(registry.len(), 0);
    }

    /// OUT links to the same PV with different options must be cached
    /// separately, while identical options must share one cached link.
    #[tokio::test]
    async fn mr_r14_out_links_keep_own_options() {
        let registry = PvaLinkRegistry::new();

        let link_a = registry
            .get_or_open(out_config("fieldA", false))
            .await
            .expect("open OUT link A");
        let link_b = registry
            .get_or_open(out_config("fieldB", true))
            .await
            .expect("open OUT link B");

        // Different options: two distinct links, each with its own settings.
        assert!(
            !Arc::ptr_eq(&link_a, &link_b),
            "OUT links with different options must not share one PvaLink"
        );
        assert_eq!(
            link_a.config().field,
            "fieldA",
            "link A keeps its own field"
        );
        assert_eq!(
            link_b.config().field,
            "fieldB",
            "link B must not inherit link A's field"
        );
        assert!(
            !link_a.config().process && !link_a.config().defer && !link_a.config().retry,
            "link A keeps its own proc/defer/retry"
        );
        assert!(
            link_b.config().process && link_b.config().defer && link_b.config().retry,
            "link B must not inherit link A's proc/defer/retry"
        );

        // Same options as link A again: must hit the cache.
        let link_a2 = registry
            .get_or_open(out_config("fieldA", false))
            .await
            .expect("re-open OUT link A");
        assert!(
            Arc::ptr_eq(&link_a, &link_a2),
            "identical OUT options must share one cached PvaLink"
        );
        assert_eq!(registry.len(), 2, "two distinct OUT links cached");
    }
}