pub mod db_access;
mod field_io;
mod link_set;
mod links;
mod processing;
mod scan_index;
pub use link_set::{DynLinkSet, LinkSet, LinkSetRegistry};
use crate::error::{CaError, CaResult};
use crate::runtime::sync::RwLock;
use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use crate::server::pv::ProcessVariable;
use crate::server::record::{Record, RecordInstance, ScanType};
use crate::types::EpicsValue;
/// Splits a PV name into its record part and its field part.
///
/// The split is made at the last `.` in `name`. When `name` contains no `.`,
/// the whole string is returned as the record part and the field part is
/// `"VAL"`.
pub fn parse_pv_name(name: &str) -> (&str, &str) {
    name.rsplit_once('.').unwrap_or((name, "VAL"))
}
fn apply_timestamp(common: &mut super::record::CommonFields, _is_soft: bool) {
match common.tse {
0 => {
common.time = crate::runtime::general_time::get_current();
}
-1 => {
if common.time == std::time::SystemTime::UNIX_EPOCH {
common.time = crate::runtime::general_time::get_event(-1);
}
}
-2 => {
}
_ => {
common.time = crate::runtime::general_time::get_event(common.tse as i32);
}
}
}
/// A database entry returned by name lookup: either a simple PV or a record.
pub enum PvEntry {
/// A simple PV, shared by reference count.
Simple(Arc<ProcessVariable>),
/// A record instance behind an async read/write lock.
Record(Arc<RwLock<RecordInstance>>),
}
/// Synchronous callback that maps a PV name to a value, or `None` when it
/// cannot supply one. `resolve_external_pv` calls it as the last fallback.
pub type ExternalPvResolver = Arc<dyn Fn(&str) -> Option<EpicsValue> + Send + Sync>;
/// Asynchronous callback invoked with a name that was not found locally.
/// `find_entry` and `has_name` repeat the local lookup when the returned
/// future yields `true`.
pub type SearchResolver = Arc<
dyn Fn(String) -> std::pin::Pin<Box<dyn std::future::Future<Output = bool> + Send>>
+ Send
+ Sync,
>;
/// State shared by every clone of a `PvDatabase` handle.
struct PvDatabaseInner {
// Simple PVs, keyed by the full PV name.
simple_pvs: RwLock<HashMap<String, Arc<ProcessVariable>>>,
// Record instances, keyed by record name (without a field suffix).
records: RwLock<HashMap<String, Arc<RwLock<RecordInstance>>>>,
// Per scan type, the ordered set of `(phas, record name)`; `add_record`
// only inserts records whose scan is not `Passive`.
scan_index: RwLock<HashMap<ScanType, BTreeSet<(i16, String)>>>,
// Name -> list of names; `remove_record` drops the removed record both as
// key and as list member. Presumably CP-link source -> targets; it is
// populated outside this part of the file — TODO confirm.
cp_links: RwLock<HashMap<String, Vec<String>>>,
// Alias name -> target record name.
aliases: RwLock<HashMap<String, String>>,
// Held for the whole duration of every add/remove of a PV, record or alias.
registration_mutex: tokio::sync::Mutex<()>,
// Lines queued by `queue_after_ioc_running`, drained by `take_after_ioc_running`.
after_ioc_running: std::sync::Mutex<Vec<String>>,
// Optional fallback used by `resolve_external_pv`.
external_resolver: RwLock<Option<ExternalPvResolver>>,
// Optional callback consulted by `find_entry` / `has_name` on a local miss.
search_resolver: RwLock<Option<SearchResolver>>,
// Registered link sets, looked up by scheme.
link_sets: RwLock<link_set::LinkSetRegistry>,
// Flipped from false to true by the first `try_claim_scan_start` call.
scan_started: std::sync::atomic::AtomicBool,
// Set by `mark_pini_done`; checked by `wait_for_pini`.
pini_done: std::sync::atomic::AtomicBool,
// Wakes tasks parked in `wait_for_pini`.
pini_notify: tokio::sync::Notify,
}
/// Handle to the PV database. Cloning copies the `Arc`, so every clone
/// refers to the same underlying state.
#[derive(Clone)]
pub struct PvDatabase {
inner: Arc<PvDatabaseInner>,
}
/// Chooses which of `count` links to act on for a selection mode and selector.
///
/// * `selm == 0` — every index in `0..count`.
/// * `selm == 1` — only index `seln`; nothing when `seln` is negative or
///   not below `count`.
/// * `selm == 2` — `seln` is read as a 16-bit mask; index `i` is selected
///   when bit `i` is set. Indices at or beyond 16 have no mask bit and are
///   never selected.
/// * any other `selm` — treated like `0`.
///
/// The returned indices are in ascending order.
fn select_link_indices(selm: i16, seln: i16, count: usize) -> Vec<usize> {
    match selm {
        1 => {
            // A negative `seln` becomes a huge `usize` and fails the bound check.
            let index = seln as usize;
            if index < count {
                vec![index]
            } else {
                Vec::new()
            }
        }
        2 => {
            let mask = seln as u16;
            // Only 16 mask bits exist. Shifting a u16 by 16 or more would
            // panic in debug builds and alias a low bit in release builds,
            // so the scan stops at the mask width.
            let usable = count.min(u16::BITS as usize);
            (0..usable)
                .filter(|&bit| mask & (1u16 << bit) != 0)
                .collect()
        }
        _ => (0..count).collect(),
    }
}
impl PvDatabase {
/// Creates an empty database: no PVs, records, aliases, link sets or
/// resolvers, with the scan-start and PINI flags both cleared.
pub fn new() -> Self {
    use std::sync::atomic::AtomicBool;

    let inner = PvDatabaseInner {
        simple_pvs: RwLock::new(HashMap::new()),
        records: RwLock::new(HashMap::new()),
        scan_index: RwLock::new(HashMap::new()),
        cp_links: RwLock::new(HashMap::new()),
        aliases: RwLock::new(HashMap::new()),
        registration_mutex: tokio::sync::Mutex::new(()),
        after_ioc_running: std::sync::Mutex::new(Vec::new()),
        external_resolver: RwLock::new(None),
        search_resolver: RwLock::new(None),
        link_sets: RwLock::new(link_set::LinkSetRegistry::new()),
        scan_started: AtomicBool::new(false),
        pini_done: AtomicBool::new(false),
        pini_notify: tokio::sync::Notify::new(),
    };
    Self {
        inner: Arc::new(inner),
    }
}
/// Atomically flips the `scan_started` flag from `false` to `true`.
///
/// Returns `true` only for the call that performed the flip; every call
/// made while the flag is already set returns `false`.
pub fn try_claim_scan_start(&self) -> bool {
    use std::sync::atomic::Ordering;

    let claim = self.inner.scan_started.compare_exchange(
        false,
        true,
        Ordering::AcqRel,
        Ordering::Acquire,
    );
    claim.is_ok()
}
/// Sets the `pini_done` flag and wakes every task currently waiting on
/// `pini_notify`.
pub fn mark_pini_done(&self) {
    use std::sync::atomic::Ordering;

    let inner = &self.inner;
    // The flag is published first so a woken waiter observes it as set.
    inner.pini_done.store(true, Ordering::Release);
    inner.pini_notify.notify_waiters();
}
/// Returns once `pini_done` has been set, immediately if it already is.
pub async fn wait_for_pini(&self) {
    use std::sync::atomic::Ordering;

    let is_done = || self.inner.pini_done.load(Ordering::Acquire);

    if is_done() {
        return;
    }
    // The `Notified` future is created before the flag is re-checked, so a
    // `mark_pini_done` that lands between the two checks is not missed
    // (relies on tokio's `notify_waiters` guarantee for already-created
    // `Notified` futures).
    let notified = self.inner.pini_notify.notified();
    if is_done() {
        return;
    }
    notified.await;
}
/// Installs `resolver` as the search resolver, replacing any previous one.
pub async fn set_search_resolver(&self, resolver: SearchResolver) {
    let mut slot = self.inner.search_resolver.write().await;
    *slot = Some(resolver);
}
/// Removes the search resolver, if one is installed.
pub async fn clear_search_resolver(&self) {
    let mut slot = self.inner.search_resolver.write().await;
    *slot = None;
}
/// Installs `resolver` as the external PV resolver, replacing any previous one.
pub async fn set_external_resolver(&self, resolver: ExternalPvResolver) {
    let mut slot = self.inner.external_resolver.write().await;
    *slot = Some(resolver);
}
/// Registers `lset` in the link-set registry under `scheme`.
pub async fn register_link_set(&self, scheme: &str, lset: link_set::DynLinkSet) {
    let mut registry = self.inner.link_sets.write().await;
    registry.register(scheme, lset);
}
/// Looks up the link set registered under `scheme`, if any.
pub async fn link_set(&self, scheme: &str) -> Option<link_set::DynLinkSet> {
    let registry = self.inner.link_sets.read().await;
    let found = registry.get(scheme);
    found
}
/// Returns the schemes known to the link-set registry, sorted ascending.
pub async fn registered_link_schemes(&self) -> Vec<String> {
    // The read lock is released before sorting.
    let mut schemes = {
        let registry = self.inner.link_sets.read().await;
        registry.schemes()
    };
    schemes.sort();
    schemes
}
pub async fn wait_for_external_links(&self, timeout: std::time::Duration) -> (usize, usize) {
let registry_snapshot: Vec<(String, link_set::DynLinkSet)> = {
let registry = self.inner.link_sets.read().await;
registry
.schemes()
.into_iter()
.filter_map(|s| registry.get(&s).map(|l| (s, l)))
.collect()
};
if registry_snapshot.is_empty() {
return (0, 0);
}
let mut targets: Vec<(link_set::DynLinkSet, String)> = Vec::new();
for (_scheme, lset) in ®istry_snapshot {
for n in lset.link_names() {
targets.push((lset.clone(), n));
}
}
let total = targets.len();
if total == 0 {
return (0, 0);
}
let deadline = tokio::time::Instant::now() + timeout;
loop {
let connected = targets
.iter()
.filter(|(lset, name)| lset.is_connected(name))
.count();
if connected == total {
return (connected, total);
}
if tokio::time::Instant::now() >= deadline {
return (connected, total);
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
pub async fn record_link_fields(
&self,
record_name: &str,
) -> Vec<(String, String, crate::server::record::ParsedLink)> {
let rec = match self.get_record(record_name).await {
Some(r) => r,
None => return Vec::new(),
};
let inst = rec.read().await;
let mut out = Vec::new();
for fd in inst.record.field_list() {
if !matches!(fd.dbf_type, crate::types::DbFieldType::String) {
continue;
}
let raw = match inst.record.get_field(fd.name) {
Some(EpicsValue::String(s)) => s,
_ => continue,
};
if raw.is_empty() {
continue;
}
let parsed = crate::server::record::parse_link_v2(&raw);
if !matches!(parsed, crate::server::record::ParsedLink::None) {
out.push((fd.name.to_string(), raw, parsed));
}
}
out
}
/// Resolves the value of a PV that lives outside this database.
///
/// * A name starting with `pva://` or `ca://` is offered, with the prefix
///   stripped, to the link set registered under that scheme.
/// * Any other name is offered unchanged to each registered link set in
///   turn; the first value obtained wins.
///
/// When no link set yields a value, the external resolver (if installed)
/// is called with the full, unstripped `name`.
pub(crate) async fn resolve_external_pv(&self, name: &str) -> Option<EpicsValue> {
    let prefixed = name
        .strip_prefix("pva://")
        .map(|rest| ("pva", rest))
        .or_else(|| name.strip_prefix("ca://").map(|rest| ("ca", rest)));

    // The registry lock is released at the end of each arm, before the
    // external resolver lock is taken.
    let from_link_set = match prefixed {
        Some((scheme, body)) => {
            let registry = self.inner.link_sets.read().await;
            registry.get(scheme).and_then(|lset| lset.get_value(body))
        }
        None => {
            let registry = self.inner.link_sets.read().await;
            registry
                .schemes()
                .into_iter()
                .find_map(|scheme| registry.get(&scheme).and_then(|lset| lset.get_value(name)))
        }
    };
    if from_link_set.is_some() {
        return from_link_set;
    }

    let resolver = self.inner.external_resolver.read().await;
    resolver.as_ref().and_then(|r| r(name))
}
/// Registers a simple PV called `name` holding `initial`.
///
/// # Errors
///
/// Fails with the error from `check_name_free` when `name` is already used
/// by a simple PV, a record or an alias.
pub async fn add_pv(&self, name: &str, initial: EpicsValue) -> CaResult<()> {
    // Serialises the uniqueness check and the insert against other registrations.
    let _registration = self.inner.registration_mutex.lock().await;
    self.check_name_free(name).await?;

    let key = name.to_string();
    let pv = Arc::new(ProcessVariable::new(key.clone(), initial));
    let mut table = self.inner.simple_pvs.write().await;
    table.insert(key, pv);
    Ok(())
}
/// Registers a simple PV called `name` holding `initial`, with `hook`
/// installed as its write hook before the PV becomes visible.
///
/// # Errors
///
/// Fails with the error from `check_name_free` when `name` is already used
/// by a simple PV, a record or an alias.
pub async fn add_pv_with_hook(
    &self,
    name: &str,
    initial: EpicsValue,
    hook: crate::server::pv::WriteHook,
) -> CaResult<()> {
    // Serialises the uniqueness check and the insert against other registrations.
    let _registration = self.inner.registration_mutex.lock().await;
    self.check_name_free(name).await?;

    let key = name.to_string();
    let pv = Arc::new(ProcessVariable::new(key.clone(), initial));
    pv.set_write_hook(hook);
    let mut table = self.inner.simple_pvs.write().await;
    table.insert(key, pv);
    Ok(())
}
/// Unregisters the simple PV called `name` and returns it, or `None` when
/// no such simple PV exists.
pub async fn remove_simple_pv(&self, name: &str) -> Option<Arc<ProcessVariable>> {
    let _registration = self.inner.registration_mutex.lock().await;
    let mut table = self.inner.simple_pvs.write().await;
    table.remove(name)
}
/// Registers `record` under `name` and, unless its scan type is `Passive`,
/// enters it in the scan index as `(phas, name)`.
///
/// # Errors
///
/// Fails with the error from `check_name_free` when `name` is already used
/// by a simple PV, a record or an alias.
pub async fn add_record(&self, name: &str, record: Box<dyn Record>) -> CaResult<()> {
    let _registration = self.inner.registration_mutex.lock().await;
    self.check_name_free(name).await?;

    let instance = RecordInstance::new_boxed(name.to_string(), record);
    // Read before `instance` is moved into the table.
    let (scan, phas) = (instance.common.scan, instance.common.phas);
    {
        let mut records = self.inner.records.write().await;
        records.insert(name.to_string(), Arc::new(RwLock::new(instance)));
    }

    if scan == ScanType::Passive {
        return Ok(());
    }
    let mut index = self.inner.scan_index.write().await;
    index
        .entry(scan)
        .or_default()
        .insert((phas, name.to_string()));
    Ok(())
}
/// Verifies that `name` is not yet used by a simple PV, a record or an alias.
///
/// The three tables are consulted in that order and the first hit decides
/// the wording of the error.
///
/// # Errors
///
/// Returns `CaError::DbParseError` (line and column 0) naming the kind of
/// entry that already owns `name`.
async fn check_name_free(&self, name: &str) -> CaResult<()> {
    let inner = &self.inner;
    let kind = if inner.simple_pvs.read().await.contains_key(name) {
        "simple PV"
    } else if inner.records.read().await.contains_key(name) {
        "record"
    } else if inner.aliases.read().await.contains_key(name) {
        "alias"
    } else {
        return Ok(());
    };
    Err(CaError::DbParseError {
        line: 0,
        column: 0,
        message: format!("name '{name}' is already registered as a {kind}"),
    })
}
/// Unregisters the record called `name` and everything that refers to it.
///
/// Besides the record itself this removes its scan-index entry (dropping
/// the scan bucket when it becomes empty), its `cp_links` entry, every
/// occurrence of `name` in other `cp_links` lists, and every alias that
/// targets it.
///
/// Returns `false` when no record called `name` exists, `true` otherwise.
pub async fn remove_record(&self, name: &str) -> bool {
    let _registration = self.inner.registration_mutex.lock().await;

    let Some(record) = self.inner.records.write().await.remove(name) else {
        return false;
    };
    let (scan, phas) = {
        let inst = record.read().await;
        (inst.common.scan, inst.common.phas)
    };

    if scan != ScanType::Passive {
        let mut index = self.inner.scan_index.write().await;
        let bucket_now_empty = match index.get_mut(&scan) {
            Some(bucket) => {
                bucket.remove(&(phas, name.to_string()));
                bucket.is_empty()
            }
            None => false,
        };
        if bucket_now_empty {
            index.remove(&scan);
        }
    }

    {
        let mut cp = self.inner.cp_links.write().await;
        cp.remove(name);
        cp.values_mut()
            .for_each(|targets| targets.retain(|t| t != name));
    }

    self.inner
        .aliases
        .write()
        .await
        .retain(|_alias, target| target != name);
    true
}
/// Looks `name` up locally, without consulting the search resolver.
///
/// Lookup order: a simple PV under the full `name`; a record under the
/// record part of `name`; an alias under the record part whose target
/// record exists.
async fn find_entry_no_resolve(&self, name: &str) -> Option<PvEntry> {
    let (base, _field) = parse_pv_name(name);

    let simple = self.inner.simple_pvs.read().await.get(name).cloned();
    if let Some(pv) = simple {
        return Some(PvEntry::Simple(pv));
    }

    let direct = self.inner.records.read().await.get(base).cloned();
    if let Some(rec) = direct {
        return Some(PvEntry::Record(rec));
    }

    // The alias table stays locked while the target record is fetched.
    let aliases = self.inner.aliases.read().await;
    let target = aliases.get(base)?;
    let records = self.inner.records.read().await;
    let aliased = records.get(target).cloned();
    aliased.map(PvEntry::Record)
}
/// Registers `alias` as an alternative name for the record `target`.
///
/// # Errors
///
/// * `CaError::ChannelNotFound` when `target` is not a registered record.
/// * The error from `check_name_free` when `alias` is already used by a
///   simple PV, a record or another alias.
pub async fn add_alias(&self, alias: &str, target: &str) -> CaResult<()> {
    let _registration = self.inner.registration_mutex.lock().await;

    let target_exists = self.inner.records.read().await.contains_key(target);
    if !target_exists {
        return Err(CaError::ChannelNotFound(format!(
            "alias target '{target}' is not a registered record"
        )));
    }
    self.check_name_free(alias).await?;

    let mut aliases = self.inner.aliases.write().await;
    aliases.insert(alias.to_string(), target.to_string());
    Ok(())
}
/// Returns the target record name registered for the alias `name`, or
/// `None` when `name` is not an alias.
pub async fn resolve_alias(&self, name: &str) -> Option<String> {
    let aliases = self.inner.aliases.read().await;
    let target = aliases.get(name).cloned();
    target
}
/// Appends `line` to the after-IOC-running queue.
///
/// # Panics
///
/// Panics if the queue mutex is poisoned.
pub fn queue_after_ioc_running(&self, line: impl Into<String>) {
    let mut queue = self.inner.after_ioc_running.lock().unwrap();
    queue.push(line.into());
}
/// Empties the after-IOC-running queue and returns its former contents in
/// insertion order.
///
/// # Panics
///
/// Panics if the queue mutex is poisoned.
pub fn take_after_ioc_running(&self) -> Vec<String> {
    let mut queue = self.inner.after_ioc_running.lock().unwrap();
    std::mem::take(&mut *queue)
}
/// Reports whether `name` is known locally, without consulting the search
/// resolver.
///
/// A name is known when it is a simple PV under the full `name`, a record
/// under the record part of `name`, or an alias under the record part
/// whose target record exists.
async fn has_name_no_resolve(&self, name: &str) -> bool {
    let (base, _) = parse_pv_name(name);

    let is_simple = self.inner.simple_pvs.read().await.contains_key(name);
    if is_simple {
        return true;
    }
    let is_record = self.inner.records.read().await.contains_key(base);
    if is_record {
        return true;
    }

    // The alias table stays locked while the target record is checked.
    let aliases = self.inner.aliases.read().await;
    let alias_is_live = match aliases.get(base) {
        Some(target) => self.inner.records.read().await.contains_key(target),
        None => false,
    };
    alias_is_live
}
/// Looks `name` up, falling back to the search resolver on a local miss.
///
/// When the local lookup fails and a search resolver is installed, the
/// resolver is awaited with `name`; if it reports `true` the local lookup
/// is repeated once and its result returned.
pub async fn find_entry(&self, name: &str) -> Option<PvEntry> {
    let local = self.find_entry_no_resolve(name).await;
    if local.is_some() {
        return local;
    }

    // The resolver is cloned out so its lock is not held while it runs.
    let resolver = self.inner.search_resolver.read().await.clone()?;
    if resolver(name.to_string()).await {
        self.find_entry_no_resolve(name).await
    } else {
        None
    }
}
/// Reports whether `name` is known, falling back to the search resolver
/// on a local miss.
///
/// When the local check fails and a search resolver is installed, the
/// resolver is awaited with `name`; if it reports `true` the local check
/// is repeated once and its result returned.
pub async fn has_name(&self, name: &str) -> bool {
    if self.has_name_no_resolve(name).await {
        return true;
    }

    // The resolver is cloned out so its lock is not held while it runs.
    let Some(resolver) = self.inner.search_resolver.read().await.clone() else {
        return false;
    };
    resolver(name.to_string()).await && self.has_name_no_resolve(name).await
}
/// Returns the simple PV registered under exactly `name`, if any.
/// Records and aliases are not considered.
pub async fn find_pv(&self, name: &str) -> Option<Arc<ProcessVariable>> {
    let table = self.inner.simple_pvs.read().await;
    let found = table.get(name).cloned();
    found
}
/// Returns the record called `name`, following the alias table when
/// `name` is not itself a record name.
pub async fn get_record(&self, name: &str) -> Option<Arc<RwLock<RecordInstance>>> {
    let direct = self.inner.records.read().await.get(name).cloned();
    if direct.is_some() {
        return direct;
    }
    let canonical = self.inner.aliases.read().await.get(name).cloned()?;
    let via_alias = self.inner.records.read().await.get(&canonical).cloned();
    via_alias
}
/// Returns the record registered under exactly `name`; the alias table is
/// not consulted.
pub async fn get_record_no_resolve(&self, name: &str) -> Option<Arc<RwLock<RecordInstance>>> {
    let records = self.inner.records.read().await;
    let found = records.get(name).cloned();
    found
}
/// Returns the names of all registered records, in no particular order.
pub async fn all_record_names(&self) -> Vec<String> {
    let records = self.inner.records.read().await;
    let names: Vec<String> = records.keys().cloned().collect();
    names
}
/// Returns all registered alias names, in no particular order.
pub async fn all_alias_names(&self) -> Vec<String> {
    let aliases = self.inner.aliases.read().await;
    let names: Vec<String> = aliases.keys().cloned().collect();
    names
}
/// Returns, sorted ascending, every alias whose target is exactly `canonical`.
pub async fn aliases_for_record(&self, canonical: &str) -> Vec<String> {
    let table = self.inner.aliases.read().await;
    let mut names: Vec<String> = table
        .iter()
        .filter(|(_, target)| target.as_str() == canonical)
        .map(|(alias, _)| alias.clone())
        .collect();
    names.sort();
    names
}
/// Returns the names of all registered simple PVs, in no particular order.
pub async fn all_simple_pv_names(&self) -> Vec<String> {
    let table = self.inner.simple_pvs.read().await;
    let names: Vec<String> = table.keys().cloned().collect();
    names
}
}
#[cfg(test)]
mod tests {
use super::*;
// Exercises the three selection modes of `select_link_indices` with six links.
#[tokio::test]
async fn test_select_link_indices() {
// Mode 0: every index.
assert_eq!(select_link_indices(0, 0, 6), vec![0, 1, 2, 3, 4, 5]);
// Mode 1: the single index `seln`, or nothing when it is out of range.
assert_eq!(select_link_indices(1, 2, 6), vec![2]);
assert_eq!(select_link_indices(1, 10, 6), Vec::<usize>::new());
// Mode 2: bit mask; 5 = 0b101 selects indices 0 and 2.
assert_eq!(select_link_indices(2, 5, 6), vec![0, 2]);
}
// Test link set: exposes the link names in `names`, reports every link as
// connected once the clock reaches `connect_at`, and never supplies a value.
struct DelayedConnectLset {
names: Vec<String>,
connect_at: tokio::time::Instant,
}
impl link_set::LinkSet for DelayedConnectLset {
// Connected for any link name as soon as `connect_at` has been reached.
fn is_connected(&self, _: &str) -> bool {
tokio::time::Instant::now() >= self.connect_at
}
fn get_value(&self, _: &str) -> Option<EpicsValue> {
None
}
fn link_names(&self) -> Vec<String> {
self.names.clone()
}
}
// With no link set registered the wait reports (0, 0).
#[tokio::test]
async fn wait_for_external_links_returns_zero_zero_when_no_lsets() {
let db = PvDatabase::new();
let (c, t) = db
.wait_for_external_links(std::time::Duration::from_millis(50))
.await;
assert_eq!((c, t), (0, 0));
}
// Two links that are connected from the start are both counted.
#[tokio::test]
async fn wait_for_external_links_connected_quickly() {
let db = PvDatabase::new();
let lset = Arc::new(DelayedConnectLset {
names: vec!["pv:A".to_string(), "pv:B".to_string()],
// Already reached, so the links count as connected on the first poll.
connect_at: tokio::time::Instant::now(),
});
db.register_link_set("pva", lset).await;
let (c, t) = db
.wait_for_external_links(std::time::Duration::from_secs(1))
.await;
assert_eq!((c, t), (2, 2));
}
// A link that connects only after 60 s makes a 250 ms wait time out with a
// partial count; the elapsed time must be at least 200 ms and under 2 s.
#[tokio::test]
async fn wait_for_external_links_returns_partial_on_timeout() {
let db = PvDatabase::new();
let lset = Arc::new(DelayedConnectLset {
names: vec!["slow:pv".to_string()],
connect_at: tokio::time::Instant::now() + std::time::Duration::from_secs(60),
});
db.register_link_set("ca", lset).await;
let started = tokio::time::Instant::now();
let (c, t) = db
.wait_for_external_links(std::time::Duration::from_millis(250))
.await;
let elapsed = started.elapsed();
assert_eq!((c, t), (0, 1));
assert!(
elapsed >= std::time::Duration::from_millis(200),
"wait must consume at least the configured budget, got {:?}",
elapsed
);
assert!(
elapsed < std::time::Duration::from_secs(2),
"wait must not exceed the budget by much, got {:?}",
elapsed
);
}
// `find_entry` and `has_name` succeed for both the alias and the canonical
// record name, and `has_name` fails for an unknown name.
#[tokio::test]
async fn alias_resolves_through_find_entry() {
let db = PvDatabase::new();
db.add_record(
"TARGET",
Box::new(crate::server::records::ai::AiRecord::new(42.0)),
)
.await
.unwrap();
db.add_alias("ALIAS_NAME", "TARGET").await.unwrap();
let via_alias = db.find_entry("ALIAS_NAME").await;
let via_target = db.find_entry("TARGET").await;
assert!(via_alias.is_some());
assert!(via_target.is_some());
assert!(db.has_name("ALIAS_NAME").await);
assert!(db.has_name("TARGET").await);
assert!(!db.has_name("NOT:THERE").await);
}
// `add_alias` refuses a target that is not a registered record.
#[tokio::test]
async fn alias_target_must_exist() {
let db = PvDatabase::new();
let err = db.add_alias("DANGLING", "MISSING_TARGET").await;
assert!(err.is_err(), "alias to missing target must be rejected");
}
// An alias may not reuse the name of an existing record.
#[tokio::test]
async fn alias_collision_with_existing_record_rejected() {
let db = PvDatabase::new();
db.add_record(
"EXISTING",
Box::new(crate::server::records::ai::AiRecord::new(0.0)),
)
.await
.unwrap();
db.add_record(
"OTHER",
Box::new(crate::server::records::ai::AiRecord::new(0.0)),
)
.await
.unwrap();
let err = db.add_alias("EXISTING", "OTHER").await;
assert!(
err.is_err(),
"alias name colliding with record must be rejected"
);
}
// `get_record` returns the very same record instance for the alias and the
// canonical name.
#[tokio::test]
async fn get_record_resolves_alias() {
let db = PvDatabase::new();
db.add_record(
"TARGET",
Box::new(crate::server::records::ai::AiRecord::new(0.0)),
)
.await
.unwrap();
db.add_alias("ALIAS", "TARGET").await.unwrap();
let via_canonical = db.get_record("TARGET").await;
let via_alias = db.get_record("ALIAS").await;
assert!(via_canonical.is_some());
assert!(via_alias.is_some(), "get_record must resolve alias");
// Pointer equality: both lookups share one `Arc`.
assert!(Arc::ptr_eq(&via_canonical.unwrap(), &via_alias.unwrap()));
}
// `get_record_no_resolve` finds the canonical name but not the alias.
#[tokio::test]
async fn get_record_no_resolve_skips_alias_table() {
let db = PvDatabase::new();
db.add_record(
"TARGET",
Box::new(crate::server::records::ai::AiRecord::new(0.0)),
)
.await
.unwrap();
db.add_alias("ALIAS", "TARGET").await.unwrap();
assert!(db.get_record_no_resolve("TARGET").await.is_some());
assert!(
db.get_record_no_resolve("ALIAS").await.is_none(),
"get_record_no_resolve must not follow alias table"
);
}
// A CP link registered through aliases is retrievable under the canonical
// source name with the canonical target name; looking it up by the source
// alias yields nothing. (`register_cp_link` / `get_cp_targets` are defined
// outside this part of the file.)
#[tokio::test]
async fn register_cp_link_normalises_alias_to_canonical() {
let db = PvDatabase::new();
db.add_record(
"SRC_REAL",
Box::new(crate::server::records::ai::AiRecord::new(0.0)),
)
.await
.unwrap();
db.add_record(
"DST_REAL",
Box::new(crate::server::records::ai::AiRecord::new(0.0)),
)
.await
.unwrap();
db.add_alias("SRC_ALIAS", "SRC_REAL").await.unwrap();
db.add_alias("DST_ALIAS", "DST_REAL").await.unwrap();
db.register_cp_link("SRC_ALIAS", "DST_ALIAS").await;
let targets = db.get_cp_targets("SRC_REAL").await;
assert_eq!(targets, vec!["DST_REAL".to_string()]);
let alias_lookup = db.get_cp_targets("SRC_ALIAS").await;
assert!(alias_lookup.is_empty());
}
// `aliases_for_record` lists only the aliases of the requested record,
// sorted, and is empty for an unknown record.
#[tokio::test]
async fn aliases_for_record_returns_sorted_targets_only() {
let db = PvDatabase::new();
db.add_record(
"TARGET",
Box::new(crate::server::records::ai::AiRecord::new(0.0)),
)
.await
.unwrap();
db.add_record(
"OTHER",
Box::new(crate::server::records::ai::AiRecord::new(0.0)),
)
.await
.unwrap();
// Registered out of order on purpose to check the sorting.
db.add_alias("ZZ", "TARGET").await.unwrap();
db.add_alias("AA", "TARGET").await.unwrap();
db.add_alias("MM", "OTHER").await.unwrap();
assert_eq!(
db.aliases_for_record("TARGET").await,
vec!["AA".to_string(), "ZZ".to_string()]
);
assert_eq!(db.aliases_for_record("OTHER").await, vec!["MM".to_string()]);
assert!(db.aliases_for_record("MISSING").await.is_empty());
}
// `all_alias_names` returns exactly the alias names, not the record name.
#[tokio::test]
async fn all_alias_names_returns_registered_aliases() {
let db = PvDatabase::new();
db.add_record(
"TARGET",
Box::new(crate::server::records::ai::AiRecord::new(0.0)),
)
.await
.unwrap();
db.add_alias("ALIAS_A", "TARGET").await.unwrap();
db.add_alias("ALIAS_B", "TARGET").await.unwrap();
let mut aliases = db.all_alias_names().await;
// The result order is unspecified, so sort before comparing.
aliases.sort();
assert_eq!(aliases, vec!["ALIAS_A".to_string(), "ALIAS_B".to_string()]);
assert!(!aliases.contains(&"TARGET".to_string()));
}
// `complete_async_record` (defined outside this part of the file) succeeds
// for both the alias and the canonical name.
#[tokio::test]
async fn complete_async_record_accepts_alias() {
let db = PvDatabase::new();
db.add_record(
"TARGET",
Box::new(crate::server::records::ai::AiRecord::new(0.0)),
)
.await
.unwrap();
db.add_alias("ALIAS", "TARGET").await.unwrap();
db.complete_async_record("ALIAS").await.unwrap();
db.complete_async_record("TARGET").await.unwrap();
}
// `process_record` (defined outside this part of the file) succeeds for the
// canonical name and the alias, and fails for an unknown name.
#[tokio::test]
async fn process_record_accepts_alias() {
let db = PvDatabase::new();
db.add_record(
"TARGET",
Box::new(crate::server::records::ai::AiRecord::new(0.0)),
)
.await
.unwrap();
db.add_alias("ALIAS", "TARGET").await.unwrap();
db.process_record("TARGET").await.unwrap();
db.process_record("ALIAS").await.unwrap();
assert!(db.process_record("MISSING").await.is_err());
}
// Processing through an alias must record the canonical name, not the alias,
// in the `visited` set passed to `process_record_with_links`.
#[tokio::test]
async fn process_record_with_links_accepts_alias_and_avoids_cycle() {
let db = PvDatabase::new();
db.add_record(
"TARGET",
Box::new(crate::server::records::ai::AiRecord::new(0.0)),
)
.await
.unwrap();
db.add_alias("ALIAS", "TARGET").await.unwrap();
let mut visited = std::collections::HashSet::new();
db.process_record_with_links("ALIAS", &mut visited, 0)
.await
.unwrap();
assert!(
visited.contains("TARGET"),
"visited must record the canonical name: {visited:?}",
);
assert!(
!visited.contains("ALIAS"),
"visited must NOT record the alias form: {visited:?}",
);
}
// Registering the same alias name twice fails the second time.
#[tokio::test]
async fn alias_duplicate_rejected() {
let db = PvDatabase::new();
db.add_record(
"TARGET",
Box::new(crate::server::records::ai::AiRecord::new(0.0)),
)
.await
.unwrap();
db.add_alias("ALIAS", "TARGET").await.unwrap();
let err = db.add_alias("ALIAS", "TARGET").await;
assert!(err.is_err(), "duplicate alias name must be rejected");
}
// A name taken by a simple PV, a record or an alias cannot be registered
// again through any of the add_* entry points.
#[tokio::test]
async fn add_pv_and_add_record_reject_duplicates_across_namespaces() {
use crate::server::records::ai::AiRecord;
let db = PvDatabase::new();
// "A" is a simple PV: every further registration of "A" must fail.
db.add_pv("A", EpicsValue::Double(1.0)).await.unwrap();
assert!(db.add_pv("A", EpicsValue::Double(2.0)).await.is_err());
let noop_hook: crate::server::pv::WriteHook =
std::sync::Arc::new(|_v, _ctx| Box::pin(async { Ok(()) }));
assert!(
db.add_pv_with_hook("A", EpicsValue::Double(2.0), noop_hook)
.await
.is_err()
);
assert!(
db.add_record("A", Box::new(AiRecord::new(0.0)))
.await
.is_err()
);
assert!(db.add_alias("A", "A").await.is_err());
// "R" is a record: every further registration of "R" must fail.
db.add_record("R", Box::new(AiRecord::new(0.0)))
.await
.unwrap();
assert!(
db.add_record("R", Box::new(AiRecord::new(1.0)))
.await
.is_err()
);
assert!(db.add_pv("R", EpicsValue::Double(0.0)).await.is_err());
assert!(db.add_alias("R", "R").await.is_err());
// "AL" is an alias: it blocks both a simple PV and a record of that name.
db.add_alias("AL", "R").await.unwrap();
assert!(db.add_pv("AL", EpicsValue::Double(0.0)).await.is_err());
assert!(
db.add_record("AL", Box::new(AiRecord::new(0.0)))
.await
.is_err()
);
}
// Removing a record also removes the aliases that target it, while aliases
// of other records are kept.
#[tokio::test]
async fn remove_record_purges_dangling_aliases() {
use crate::server::records::ai::AiRecord;
let db = PvDatabase::new();
db.add_record("R", Box::new(AiRecord::new(0.0)))
.await
.unwrap();
db.add_alias("ALT1", "R").await.unwrap();
db.add_alias("ALT2", "R").await.unwrap();
db.add_record("OTHER", Box::new(AiRecord::new(0.0)))
.await
.unwrap();
db.add_alias("KEEPER", "OTHER").await.unwrap();
assert!(db.remove_record("R").await);
// These only succeed if the alias names were freed by the removal.
db.add_pv("ALT1", EpicsValue::Double(0.0)).await.unwrap();
db.add_pv("ALT2", EpicsValue::Double(0.0)).await.unwrap();
assert_eq!(db.resolve_alias("KEEPER").await, Some("OTHER".to_string()));
}
// An alias may not reuse the name of an existing simple PV.
#[tokio::test]
async fn add_alias_rejects_simple_pv_collision() {
use crate::server::records::ai::AiRecord;
let db = PvDatabase::new();
db.add_pv("PVX", EpicsValue::Double(0.0)).await.unwrap();
db.add_record("TARGET", Box::new(AiRecord::new(0.0)))
.await
.unwrap();
assert!(db.add_alias("PVX", "TARGET").await.is_err());
}
// Races `add_pv` against `add_record` on the same name: both tasks must
// finish within 2 s and exactly one of them must succeed.
#[tokio::test]
async fn concurrent_add_pv_and_add_record_do_not_deadlock() {
use crate::server::records::ai::AiRecord;
let db = std::sync::Arc::new(PvDatabase::new());
let db1 = db.clone();
let db2 = db.clone();
let h1 = tokio::spawn(async move { db1.add_pv("RACE", EpicsValue::Double(1.0)).await });
let h2 =
tokio::spawn(async move { db2.add_record("RACE", Box::new(AiRecord::new(0.0))).await });
// The timeouts turn a deadlock into a test failure instead of a hang.
let r1 = tokio::time::timeout(std::time::Duration::from_secs(2), h1)
.await
.expect("add_pv must not block on add_record");
let r2 = tokio::time::timeout(std::time::Duration::from_secs(2), h2)
.await
.expect("add_record must not block on add_pv");
// Unwrap the join results; what remains is each call's own `CaResult`.
let r1 = r1.unwrap();
let r2 = r2.unwrap();
assert!(
(r1.is_ok() && r2.is_err()) || (r1.is_err() && r2.is_ok()),
"exactly one of the racing inserts must succeed: r1={r1:?} r2={r2:?}",
);
}
}