#![allow(unused_imports, clippy::all)]
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use epics_base_rs::error::CaError;
use epics_base_rs::server::database::PvDatabase;
use epics_base_rs::server::record::*;
use epics_base_rs::server::records::ai::AiRecord;
use epics_base_rs::server::records::ao::AoRecord;
use epics_base_rs::server::records::bi::BiRecord;
use epics_base_rs::server::records::longin::LonginRecord;
use epics_base_rs::types::EpicsValue;
/// Processing a record whose FLNK names another record must visit both:
/// after `REC_A` is processed, the visited set contains `REC_A` and `REC_B`.
#[tokio::test]
async fn test_write_notify_follows_flnk() {
    let db = PvDatabase::new();
    db.add_record("REC_A", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    db.add_record("REC_B", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    // Fail loudly if the lookup fails instead of silently skipping the FLNK
    // setup; the scope releases the write guard before processing starts.
    {
        let rec = db
            .get_record("REC_A")
            .await
            .expect("REC_A was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("FLNK", EpicsValue::String("REC_B".into()))
            .unwrap();
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("REC_A", &mut visited, 0)
        .await
        .unwrap();
    assert!(visited.contains("REC_A"));
    assert!(visited.contains("REC_B"));
}
/// An `ai` record whose INP names another record must pick up that record's
/// value when processed: `DEST` is expected to read 42.0 from `SOURCE`.
#[tokio::test]
async fn test_inp_link_processing() {
    let db = PvDatabase::new();
    db.add_record("SOURCE", Box::new(AoRecord::new(42.0)))
        .await
        .unwrap();
    db.add_record("DEST", Box::new(AiRecord::new(0.0)))
        .await
        .unwrap();
    // Panic here, with a clear message, rather than skipping the INP setup
    // and failing later on the value assertion.
    {
        let rec = db.get_record("DEST").await.expect("DEST was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("INP", EpicsValue::String("SOURCE".into()))
            .unwrap();
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("DEST", &mut visited, 0)
        .await
        .unwrap();
    let val = db.get_pv("DEST").await.unwrap();
    match val {
        EpicsValue::Double(v) => assert!((v - 42.0).abs() < 1e-10),
        other => panic!("expected Double(42.0), got {:?}", other),
    }
}
/// An INP that names a PV absent from the database must raise a link alarm:
/// after processing, SEVR is `Invalid` and STAT is `LINK_ALARM`.
#[tokio::test]
async fn test_soft_inp_read_failure_sets_link_alarm() {
    use epics_base_rs::server::recgbl::alarm_status;
    use epics_base_rs::server::record::AlarmSeverity;
    let db = PvDatabase::new();
    db.add_record("BROKEN", Box::new(AiRecord::new(0.0)))
        .await
        .unwrap();
    // `NO_SUCH_PV` is never added to `db`. The lookup of `BROKEN` itself must
    // succeed, so a failure there is reported instead of skipping the setup.
    {
        let rec = db
            .get_record("BROKEN")
            .await
            .expect("BROKEN was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("INP", EpicsValue::String("NO_SUCH_PV".into()))
            .unwrap();
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("BROKEN", &mut visited, 0)
        .await
        .unwrap();
    let rec = db.get_record("BROKEN").await.expect("record exists");
    let inst = rec.read().await;
    assert_eq!(
        inst.common.sevr,
        AlarmSeverity::Invalid,
        "broken soft-channel INP must drive SEVR=INVALID, got {:?}",
        inst.common.sevr
    );
    assert_eq!(
        inst.common.stat,
        alarm_status::LINK_ALARM,
        "broken soft-channel INP must drive STAT=LINK, got {}",
        inst.common.stat
    );
}
/// With an `MS` INP link, the destination must take the source's severity but
/// report STAT as `LINK_ALARM` and leave AMSG empty.
///
/// `SRC` is forced into HIHI/Major with a message before `DST` is processed.
#[tokio::test]
async fn test_single_inp_ms_propagates_link_alarm_no_msg() {
    use epics_base_rs::server::recgbl::alarm_status;
    use epics_base_rs::server::record::AlarmSeverity;
    let db = PvDatabase::new();
    db.add_record("SRC", Box::new(AoRecord::new(7.0)))
        .await
        .unwrap();
    db.add_record("DST", Box::new(AiRecord::new(0.0)))
        .await
        .unwrap();
    // Both setup steps must run; a skipped step would invalidate the test, so
    // a failed lookup panics instead of being ignored.
    {
        let rec = db.get_record("SRC").await.expect("SRC was just added");
        let mut inst = rec.write().await;
        inst.common.stat = alarm_status::HIHI_ALARM;
        inst.common.sevr = AlarmSeverity::Major;
        inst.common.amsg = "src-msg".to_string();
    }
    {
        let rec = db.get_record("DST").await.expect("DST was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("INP", EpicsValue::String("SRC NPP MS".into()))
            .unwrap();
        inst.common.udf = false;
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("DST", &mut visited, 0)
        .await
        .unwrap();
    let dst = db.get_record("DST").await.expect("DST exists");
    let inst = dst.read().await;
    assert_eq!(
        inst.common.sevr,
        AlarmSeverity::Major,
        "MS link must lift DST severity to source's Major"
    );
    assert_eq!(
        inst.common.stat,
        alarm_status::LINK_ALARM,
        "C parity: MS link MUST surface as LINK_ALARM, not the source's STAT"
    );
    assert!(
        inst.common.amsg.is_empty(),
        "C parity: MS link MUST NOT propagate amsg; got {:?}",
        inst.common.amsg
    );
}
/// With an `MSS` INP link, the destination must inherit the source's
/// severity, STAT and AMSG.
///
/// `SRC` is forced into HIHI/Major with message `src-major` before `DST` is
/// processed.
#[tokio::test]
async fn test_single_inp_mss_propagates_stat_and_amsg() {
    use epics_base_rs::server::recgbl::alarm_status;
    use epics_base_rs::server::record::AlarmSeverity;
    let db = PvDatabase::new();
    db.add_record("SRC", Box::new(AoRecord::new(7.0)))
        .await
        .unwrap();
    db.add_record("DST", Box::new(AiRecord::new(0.0)))
        .await
        .unwrap();
    // A failed lookup panics here rather than silently skipping the setup.
    {
        let rec = db.get_record("SRC").await.expect("SRC was just added");
        let mut inst = rec.write().await;
        inst.common.stat = alarm_status::HIHI_ALARM;
        inst.common.sevr = AlarmSeverity::Major;
        inst.common.amsg = "src-major".to_string();
    }
    {
        let rec = db.get_record("DST").await.expect("DST was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("INP", EpicsValue::String("SRC NPP MSS".into()))
            .unwrap();
        inst.common.udf = false;
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("DST", &mut visited, 0)
        .await
        .unwrap();
    let dst = db.get_record("DST").await.expect("DST exists");
    let inst = dst.read().await;
    assert_eq!(inst.common.sevr, AlarmSeverity::Major);
    assert_eq!(
        inst.common.stat,
        alarm_status::HIHI_ALARM,
        "MSS must carry source's STAT"
    );
    assert_eq!(
        inst.common.amsg, "src-major",
        "MSS must carry source's AMSG"
    );
}
/// A `pva://` INP link served by a registered link set must apply the link's
/// value and surface the link set's alarm severity as a `LINK_ALARM`.
#[tokio::test]
async fn test_pva_link_propagates_alarm_severity_into_link_alarm() {
    use epics_base_rs::server::database::LinkSet;
    use epics_base_rs::server::recgbl::alarm_status;
    use epics_base_rs::server::record::AlarmSeverity;
    /// Stub link set: always connected, value 12.0, fixed severity and message.
    struct AlarmingLset;
    impl LinkSet for AlarmingLset {
        fn is_connected(&self, _: &str) -> bool {
            true
        }
        fn get_value(&self, _: &str) -> Option<EpicsValue> {
            Some(EpicsValue::Double(12.0))
        }
        fn alarm_severity(&self, _: &str) -> Option<i32> {
            // The SEVR assertion below expects this code to arrive as `Major`.
            Some(2)
        }
        fn alarm_message(&self, _: &str) -> Option<String> {
            Some("remote major".into())
        }
    }
    let db = PvDatabase::new();
    db.register_link_set("pva", Arc::new(AlarmingLset)).await;
    db.add_record("PVADST", Box::new(AiRecord::new(0.0)))
        .await
        .unwrap();
    // A failed lookup panics here rather than silently skipping the setup.
    {
        let rec = db
            .get_record("PVADST")
            .await
            .expect("PVADST was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("INP", EpicsValue::String("pva://REMOTE:PV".into()))
            .unwrap();
        inst.common.udf = false;
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("PVADST", &mut visited, 0)
        .await
        .unwrap();
    let rec = db.get_record("PVADST").await.expect("record exists");
    let inst = rec.read().await;
    assert_eq!(
        inst.record.val().and_then(|v| v.to_f64()),
        Some(12.0),
        "pva link value must be applied"
    );
    assert_eq!(
        inst.common.sevr,
        AlarmSeverity::Major,
        "pva link's MAJOR severity must reach the record's SEVR"
    );
    assert_eq!(
        inst.common.stat,
        alarm_status::LINK_ALARM,
        "pva link alarm must surface as LINK_ALARM"
    );
}
/// A `pva://` INP link whose link set overrides no alarm methods must apply
/// the value and leave the record at `NoAlarm`.
#[tokio::test]
async fn test_pva_link_no_alarm_when_lset_reports_none() {
    use epics_base_rs::server::database::LinkSet;
    use epics_base_rs::server::record::AlarmSeverity;
    /// Stub link set: always connected, value 5.0, no alarm overrides.
    struct QuietLset;
    impl LinkSet for QuietLset {
        fn is_connected(&self, _: &str) -> bool {
            true
        }
        fn get_value(&self, _: &str) -> Option<EpicsValue> {
            Some(EpicsValue::Double(5.0))
        }
    }
    let db = PvDatabase::new();
    db.register_link_set("pva", Arc::new(QuietLset)).await;
    db.add_record("PVAQUIET", Box::new(AiRecord::new(0.0)))
        .await
        .unwrap();
    // A failed lookup panics here rather than silently skipping the setup.
    {
        let rec = db
            .get_record("PVAQUIET")
            .await
            .expect("PVAQUIET was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("INP", EpicsValue::String("pva://REMOTE:OK".into()))
            .unwrap();
        inst.common.udf = false;
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("PVAQUIET", &mut visited, 0)
        .await
        .unwrap();
    let rec = db.get_record("PVAQUIET").await.expect("record exists");
    let inst = rec.read().await;
    assert_eq!(
        inst.record.val().and_then(|v| v.to_f64()),
        Some(5.0),
        "pva link value must still be applied"
    );
    assert_eq!(
        inst.common.sevr,
        AlarmSeverity::NoAlarm,
        "no lset severity → record stays NO_ALARM"
    );
}
/// Processing an `ao` whose OUT is a `pva://` link must call the registered
/// link set's `put_value` exactly once, with the scheme stripped from the PV
/// name and the record's value as payload.
#[tokio::test]
async fn test_pva_out_link_writes_value_through_link_set() {
    use std::sync::Mutex;
    use epics_base_rs::server::database::LinkSet;
    /// Stub link set that records every `put_value` call for inspection.
    struct CapturingLset {
        writes: Arc<Mutex<Vec<(String, EpicsValue)>>>,
    }
    impl LinkSet for CapturingLset {
        fn is_connected(&self, _: &str) -> bool {
            true
        }
        fn get_value(&self, _: &str) -> Option<EpicsValue> {
            None
        }
        fn put_value(&self, name: &str, value: EpicsValue) -> Result<(), String> {
            self.writes.lock().unwrap().push((name.to_string(), value));
            Ok(())
        }
    }
    let writes = Arc::new(Mutex::new(Vec::new()));
    let db = PvDatabase::new();
    db.register_link_set(
        "pva",
        Arc::new(CapturingLset {
            writes: writes.clone(),
        }),
    )
    .await;
    db.add_record("AO_PVAOUT", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    // A failed lookup panics here rather than silently skipping the setup.
    {
        let rec = db
            .get_record("AO_PVAOUT")
            .await
            .expect("AO_PVAOUT was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("OUT", EpicsValue::String("pva://REMOTE:OUT".into()))
            .unwrap();
        inst.common.udf = false;
        inst.record
            .put_field("VAL", EpicsValue::Double(3.5))
            .unwrap();
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("AO_PVAOUT", &mut visited, 0)
        .await
        .unwrap();
    let captured = writes.lock().unwrap();
    assert_eq!(
        captured.len(),
        1,
        "the pva OUT link must drive exactly one put_value"
    );
    assert_eq!(
        captured[0].0, "REMOTE:OUT",
        "put_value must receive the bare PV name (scheme stripped)"
    );
    assert_eq!(
        captured[0].1.to_f64(),
        Some(3.5),
        "put_value must receive the record's processed value"
    );
}
/// With no link set registered, processing an `ao` whose OUT is a `pva://`
/// link must still return `Ok` and leave the record's own value intact.
#[tokio::test]
async fn test_pva_out_link_no_link_set_fails_gracefully() {
    let db = PvDatabase::new();
    db.add_record("AO_NOLSET", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    // A failed lookup panics here rather than silently skipping the setup.
    {
        let rec = db
            .get_record("AO_NOLSET")
            .await
            .expect("AO_NOLSET was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("OUT", EpicsValue::String("pva://NOWHERE:PV".into()))
            .unwrap();
        inst.common.udf = false;
        inst.record
            .put_field("VAL", EpicsValue::Double(1.0))
            .unwrap();
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("AO_NOLSET", &mut visited, 0)
        .await
        .expect("process must complete despite the unresolvable OUT link");
    let rec = db.get_record("AO_NOLSET").await.expect("record exists");
    let inst = rec.read().await;
    assert_eq!(
        inst.record.val().and_then(|v| v.to_f64()),
        Some(1.0),
        "the record itself still holds its value"
    );
}
/// When only the source's AMSG changes between two processing passes of an
/// `MSS`-linked destination, an AMSG subscriber registered with the ALARM
/// mask must still receive an event carrying the new message.
#[tokio::test]
async fn test_mss_propagates_amsg_only_change_posts_amsg_event() {
    use epics_base_rs::server::recgbl::{EventMask, alarm_status};
    use epics_base_rs::server::record::AlarmSeverity;
    use epics_base_rs::types::DbFieldType;
    let db = PvDatabase::new();
    db.add_record("SRC_AMSG", Box::new(AoRecord::new(7.0)))
        .await
        .unwrap();
    db.add_record("DST_AMSG", Box::new(AiRecord::new(0.0)))
        .await
        .unwrap();
    // Every setup step below must run for the test to mean anything, so a
    // failed lookup panics instead of being skipped.
    {
        let rec = db
            .get_record("SRC_AMSG")
            .await
            .expect("SRC_AMSG was just added");
        let mut inst = rec.write().await;
        inst.common.stat = alarm_status::HIHI_ALARM;
        inst.common.sevr = AlarmSeverity::Major;
        inst.common.amsg = "msg1".to_string();
    }
    {
        let rec = db
            .get_record("DST_AMSG")
            .await
            .expect("DST_AMSG was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("INP", EpicsValue::String("SRC_AMSG NPP MSS".into()))
            .unwrap();
        inst.common.udf = false;
    }
    // First pass runs before the subscription is created.
    let mut visited = HashSet::new();
    db.process_record_with_links("DST_AMSG", &mut visited, 0)
        .await
        .unwrap();
    let mut amsg_rx = {
        let rec = db.get_record("DST_AMSG").await.unwrap();
        let mut inst = rec.write().await;
        inst.add_subscriber("AMSG", 11, DbFieldType::String, EventMask::ALARM.bits())
    }
    .expect("AMSG subscription must be accepted");
    // Change only the message on the source; STAT and SEVR stay as they were.
    {
        let rec = db
            .get_record("SRC_AMSG")
            .await
            .expect("SRC_AMSG was just added");
        let mut inst = rec.write().await;
        inst.common.amsg = "msg2".to_string();
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("DST_AMSG", &mut visited, 0)
        .await
        .unwrap();
    {
        let rec = db.get_record("DST_AMSG").await.unwrap();
        let inst = rec.read().await;
        assert_eq!(inst.common.sevr, AlarmSeverity::Major, "sevr unchanged");
        assert_eq!(inst.common.amsg, "msg2", "amsg propagated");
    }
    let event = amsg_rx
        .try_recv()
        .expect("AMSG-only change must produce an event on DBE_ALARM-class subscribers");
    assert!(
        matches!(event.snapshot.value, EpicsValue::String(ref s) if s == "msg2"),
        "AMSG event payload should be the new message, got {:?}",
        event.snapshot.value
    );
}
/// `process_record` on a record forced into the undefined state must clear
/// UDF and deliver a monitor event with payload `Char(0)` to a UDF subscriber
/// registered with the ALARM mask.
#[tokio::test]
async fn test_process_record_delivers_udf_monitor_event() {
    use epics_base_rs::server::recgbl::{EventMask, alarm_status};
    use epics_base_rs::server::record::AlarmSeverity;
    use epics_base_rs::types::DbFieldType;
    let db = PvDatabase::new();
    let record = Box::new(AiRecord::new(5.0));
    db.add_record("UDF_REC", record).await.unwrap();
    // Force the record into the undefined/alarmed state.
    {
        let handle = db.get_record("UDF_REC").await.unwrap();
        let mut guard = handle.write().await;
        guard.common.udf = true;
        guard.common.sevr = AlarmSeverity::Invalid;
        guard.common.stat = alarm_status::UDF_ALARM;
    }
    // Subscribe under a short-lived write guard, then unwrap the outcome.
    let subscription = {
        let handle = db.get_record("UDF_REC").await.unwrap();
        let mut guard = handle.write().await;
        guard.add_subscriber("UDF", 31, DbFieldType::Char, EventMask::ALARM.bits())
    };
    let mut udf_rx = subscription.expect("UDF subscription must be accepted");
    db.process_record("UDF_REC").await.unwrap();
    {
        let handle = db.get_record("UDF_REC").await.unwrap();
        let guard = handle.read().await;
        assert!(!guard.common.udf, "process must have cleared UDF");
    }
    let delivered = udf_rx
        .try_recv()
        .expect("a UDF change via process_record must deliver a UDF monitor event");
    assert!(
        matches!(delivered.snapshot.value, EpicsValue::Char(0)),
        "UDF event payload should be the cleared value 0, got {:?}",
        delivered.snapshot.value
    );
}
/// After a CA put to VAL of an `ao` record returns, the record's PUTF flag
/// must be false.
#[tokio::test]
async fn test_putf_clears_after_synchronous_put_completion() {
    const NAME: &str = "PUTF_SYNC";
    let db = PvDatabase::new();
    let record = Box::new(AoRecord::new(0.0));
    db.add_record(NAME, record).await.unwrap();
    // The put's own result is deliberately ignored; only PUTF is checked.
    let _ = db
        .put_record_field_from_ca(NAME, "VAL", EpicsValue::Double(42.0))
        .await;
    let handle = db.get_record(NAME).await.unwrap();
    let state = handle.read().await;
    assert!(
        !state.common.putf,
        "after synchronous put completion, PUTF must clear (mirrors C recGblFwdLink:302)"
    );
}
/// For an `AsyncRecord`, PUTF must stay set while the record is still
/// processing after a CA put, and clear once `complete_async_record` runs.
#[tokio::test]
async fn test_putf_survives_async_round_trip_and_clears_on_completion() {
    const NAME: &str = "ASYNC_PUTF";
    let db = PvDatabase::new();
    let record = Box::new(AsyncRecord { val: 0.0 });
    db.add_record(NAME, record).await.unwrap();
    // The put's own result is deliberately ignored; only flags are checked.
    let _ = db
        .put_record_field_from_ca(NAME, "VAL", EpicsValue::Double(7.0))
        .await;
    // Phase 1: put issued, completion not yet signalled.
    {
        let handle = db.get_record(NAME).await.unwrap();
        let state = handle.read().await;
        assert!(state.is_processing(), "async pending → PACT=true");
        assert!(
            state.common.putf,
            "PUTF must remain TRUE across the async round trip — \
             pre-fix the Rust port cleared it before the process call \
             so async-completion logic could not classify the trigger \
             as put-driven"
        );
    }
    db.complete_async_record(NAME).await.unwrap();
    // Phase 2: completion has run, so both flags must be down.
    {
        let handle = db.get_record(NAME).await.unwrap();
        let state = handle.read().await;
        assert!(!state.is_processing(), "completion clears PACT");
        assert!(
            !state.common.putf,
            "complete_async_record_inner must clear PUTF (recGblFwdLink parity)"
        );
    }
}
/// A CA put to VAL of an `AsyncRecord` must clear UDF straight away, while
/// the record is still mid-processing and before any completion call.
#[tokio::test]
async fn test_put_record_field_from_ca_clears_udf_on_primary_field_write() {
    const NAME: &str = "UDF_ASYNC";
    let db = PvDatabase::new();
    let record = Box::new(AsyncRecord { val: 0.0 });
    db.add_record(NAME, record).await.unwrap();
    // Precondition: a freshly added record reports udf=true.
    {
        let handle = db.get_record(NAME).await.unwrap();
        assert!(
            handle.read().await.common.udf,
            "AsyncRecord starts undefined (udf=true)"
        );
    }
    // The put's own result is deliberately ignored; only flags are checked.
    let _ = db
        .put_record_field_from_ca(NAME, "VAL", EpicsValue::Double(7.0))
        .await;
    let handle = db.get_record(NAME).await.unwrap();
    let state = handle.read().await;
    assert!(
        state.is_processing(),
        "AsyncRecord should be mid-async (PACT=true)"
    );
    assert!(
        !state.common.udf,
        "primary-field CA put must clear UDF synchronously \
         (dbAccess.c::dbPut:1411 parity) — observable before \
         complete_async_record runs"
    );
}
/// A record reached through a `CP` input link must not end up with PUTF set:
/// `TGT` has INP `SRC CP`, and after `SRC` is processed `TGT.putf` is false.
#[tokio::test]
async fn test_putf_stays_off_for_cp_chained_targets() {
    let db = PvDatabase::new();
    db.add_record("SRC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    db.add_record("TGT", Box::new(AiRecord::new(0.0)))
        .await
        .unwrap();
    // Without the CP link the final assertion passes trivially, so the link
    // setup must never be skipped silently.
    {
        let rec = db.get_record("TGT").await.expect("TGT was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("INP", EpicsValue::String("SRC CP".into()))
            .unwrap();
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("SRC", &mut visited, 0)
        .await
        .unwrap();
    let tgt = db.get_record("TGT").await.expect("TGT exists");
    let inst = tgt.read().await;
    assert!(
        !inst.common.putf,
        "CP-driven TGT must not carry PUTF=1 — that bit belongs only to the directly-put record"
    );
}
/// A CA put to a record whose OUT is `PUTF_OUT_TGT PP` must write the value
/// through to the target and leave the target with PUTF and RPRO both false.
#[tokio::test]
async fn test_putf_propagates_through_db_out_link_to_passive_target() {
    let db = PvDatabase::new();
    db.add_record("PUTF_OUT_TGT", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    db.add_record("PUTF_OUT_SRC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    // A failed lookup panics here rather than silently skipping the setup.
    {
        let rec = db
            .get_record("PUTF_OUT_SRC")
            .await
            .expect("PUTF_OUT_SRC was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("OUT", EpicsValue::String("PUTF_OUT_TGT PP".into()))
            .unwrap();
    }
    // The put's own result is deliberately ignored; the target is inspected.
    let _ = db
        .put_record_field_from_ca("PUTF_OUT_SRC", "VAL", EpicsValue::Double(5.0))
        .await;
    let tgt = db.get_record("PUTF_OUT_TGT").await.unwrap();
    let inst = tgt.read().await;
    assert!(
        !inst.common.putf,
        "after both records' synchronous cycles complete, both clear putf"
    );
    assert!(
        !inst.common.rpro,
        "target was not pact, so rpro must stay false (normal propagation)"
    );
    // An unreadable VAL falls back to 0.0, which fails the check below.
    let val = inst.record.val().and_then(|v| v.to_f64()).unwrap_or(0.0);
    assert!(
        (val - 5.0).abs() < 1e-10,
        "OUT link write propagated value (val={val})"
    );
}
/// When a CA put drives an OUT `PP` link into an `AsyncRecord` target, the
/// target must be left processing with PUTF set, since no completion call
/// has been made.
#[tokio::test]
async fn test_putf_propagates_mid_cycle_via_async_target_out_link() {
    let db = PvDatabase::new();
    db.add_record("PROP_TGT", Box::new(AsyncRecord { val: 0.0 }))
        .await
        .unwrap();
    db.add_record("PROP_SRC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    // A failed lookup panics here rather than silently skipping the setup.
    {
        let rec = db
            .get_record("PROP_SRC")
            .await
            .expect("PROP_SRC was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("OUT", EpicsValue::String("PROP_TGT PP".into()))
            .unwrap();
    }
    // The put's own result is deliberately ignored; the target is inspected.
    let _ = db
        .put_record_field_from_ca("PROP_SRC", "VAL", EpicsValue::Double(11.0))
        .await;
    let tgt = db.get_record("PROP_TGT").await.unwrap();
    let inst = tgt.read().await;
    assert!(
        inst.is_processing(),
        "AsyncPending target stays pact between process and complete"
    );
    assert!(
        inst.common.putf,
        "target.putf must inherit from src.putf BEFORE complete_async_record clears it \
         (C dbDbLink.c::processTarget:474). Pre-fix this stayed false."
    );
}
/// With SIMM=2 and SIOL naming a source holding 5.0, an `ai` with LINR=1,
/// ESLO=2 and EOFF=10 must end with RVAL=5 and VAL=5*2+10=20 after processing.
#[tokio::test]
async fn test_simm_raw_input_runs_conversion_chain() {
    let db = PvDatabase::new();
    db.add_record("RAW:SRC", Box::new(AoRecord::new(5.0)))
        .await
        .unwrap();
    let mut ai = epics_base_rs::server::records::ai::AiRecord::new(0.0);
    ai.linr = 1;
    ai.eslo = 2.0;
    ai.eoff = 10.0;
    db.add_record("AI:SIMRAW", Box::new(ai)).await.unwrap();
    // A failed lookup panics here rather than silently skipping the setup.
    {
        let rec = db
            .get_record("AI:SIMRAW")
            .await
            .expect("AI:SIMRAW was just added");
        let mut inst = rec.write().await;
        inst.record.put_field("SIMM", EpicsValue::Short(2)).unwrap();
        inst.record
            .put_field("SIOL", EpicsValue::String("RAW:SRC".into()))
            .unwrap();
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("AI:SIMRAW", &mut visited, 0)
        .await
        .unwrap();
    let ai_rec = db.get_record("AI:SIMRAW").await.expect("AI:SIMRAW exists");
    let inst = ai_rec.read().await;
    let val = inst
        .record
        .get_field("VAL")
        .and_then(|v| v.to_f64())
        .expect("VAL must be readable as f64");
    assert!(
        (val - 20.0).abs() < 1e-10,
        "SIMM=RAW must run convert(): expected VAL=5*ESLO+EOFF=20.0, got {val}"
    );
    // RVAL may come back as `Long`; widen it explicitly, else use `to_f64`.
    let rval = inst
        .record
        .get_field("RVAL")
        .and_then(|v| match v {
            EpicsValue::Long(n) => Some(n as f64),
            other => other.to_f64(),
        })
        .expect("RVAL must be readable");
    assert!(
        (rval - 5.0).abs() < 1e-10,
        "RVAL must hold the raw count from SIOL; got {rval}"
    );
}
/// Two records whose FLNKs point at each other must not loop forever:
/// processing `CYCLE_A` returns with exactly the two records visited.
#[tokio::test]
async fn test_cycle_detection() {
    let db = PvDatabase::new();
    db.add_record("CYCLE_A", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    db.add_record("CYCLE_B", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    // Both links are needed to form the cycle, so a failed lookup panics
    // instead of leaving a link unset.
    {
        let rec = db
            .get_record("CYCLE_A")
            .await
            .expect("CYCLE_A was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("FLNK", EpicsValue::String("CYCLE_B".into()))
            .unwrap();
    }
    {
        let rec = db
            .get_record("CYCLE_B")
            .await
            .expect("CYCLE_B was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("FLNK", EpicsValue::String("CYCLE_A".into()))
            .unwrap();
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("CYCLE_A", &mut visited, 0)
        .await
        .unwrap();
    assert!(visited.contains("CYCLE_A"));
    assert!(visited.contains("CYCLE_B"));
    assert_eq!(visited.len(), 2);
}
/// `AoRecord::process` must clamp VAL into `[DRVL, DRVH]`: with limits of
/// -50 and 100, 200 becomes 100 and -100 becomes -50.
#[tokio::test]
async fn test_ao_drvh_drvl_clamp() {
    let mut rec = AoRecord::new(0.0);
    rec.drvl = -50.0;
    rec.drvh = 100.0;

    // Above the upper drive limit.
    rec.val = 200.0;
    rec.process().unwrap();
    assert!((rec.val - 100.0).abs() < 1e-10);

    // Below the lower drive limit.
    rec.val = -100.0;
    rec.process().unwrap();
    assert!((rec.val - (-50.0)).abs() < 1e-10);
}
/// With OROC=5 and both drive limits at zero, OVAL must advance by at most 5
/// per `process` call: 5 after the first call, 10 after the second.
#[tokio::test]
async fn test_ao_oroc_rate_limit() {
    let mut ao = AoRecord::new(0.0);
    ao.drvl = 0.0;
    ao.drvh = 0.0;
    ao.oroc = 5.0;

    ao.val = 100.0;
    ao.process().unwrap();
    assert!((ao.oval - 5.0).abs() < 1e-10, "First: oval={}", ao.oval);

    ao.val = 200.0;
    ao.process().unwrap();
    assert!((ao.oval - 10.0).abs() < 1e-10, "Second: oval={}", ao.oval);
}
/// An `ao` with OMSL=1 and DOL naming `SOURCE` must take its value from
/// `SOURCE` (42.0) when processed.
#[tokio::test]
async fn test_ao_omsl_dol() {
    let db = PvDatabase::new();
    let source = Box::new(AoRecord::new(42.0));
    db.add_record("SOURCE", source).await.unwrap();

    let mut ao = AoRecord::new(0.0);
    ao.dol = "SOURCE".to_string();
    ao.omsl = 1;
    db.add_record("OUTPUT", Box::new(ao)).await.unwrap();

    let mut seen = HashSet::new();
    db.process_record_with_links("OUTPUT", &mut seen, 0)
        .await
        .unwrap();

    let val = db.get_pv("OUTPUT").await.unwrap();
    if let EpicsValue::Double(v) = val {
        assert!((v - 42.0).abs() < 1e-10)
    } else {
        panic!("expected Double(42.0), got {:?}", val)
    }
}
/// An `ao` with OMSL=1 and OIF=1 must add the DOL value to its current
/// value: starting at 100.0 with `DELTA` at 10.0, the result is 110.0.
#[tokio::test]
async fn test_ao_oif_incremental() {
    let db = PvDatabase::new();
    let delta = Box::new(AoRecord::new(10.0));
    db.add_record("DELTA", delta).await.unwrap();

    let mut ao = AoRecord::new(100.0);
    ao.dol = "DELTA".to_string();
    ao.oif = 1;
    ao.omsl = 1;
    db.add_record("OUTPUT", Box::new(ao)).await.unwrap();

    let mut seen = HashSet::new();
    db.process_record_with_links("OUTPUT", &mut seen, 0)
        .await
        .unwrap();

    let val = db.get_pv("OUTPUT").await.unwrap();
    if let EpicsValue::Double(v) = val {
        assert!((v - 110.0).abs() < 1e-10)
    } else {
        panic!("expected Double(110.0), got {:?}", val)
    }
}
/// With IVOA=1, an `ao` driven into an `Invalid` alarm must not write its OUT
/// link: `TARGET` must still hold 0.0 after `OUTPUT` is processed.
///
/// `OUTPUT` starts at 999.0 with HIHI=100 and HHSV=Invalid.
#[tokio::test]
async fn test_ao_ivoa_dont_drive() {
    let db = PvDatabase::new();
    db.add_record("TARGET", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    let mut ao = AoRecord::new(999.0);
    ao.ivoa = 1;
    db.add_record("OUTPUT", Box::new(ao)).await.unwrap();
    // Without the OUT link `TARGET` trivially stays at 0.0, so the setup
    // must never be skipped silently.
    {
        let rec = db
            .get_record("OUTPUT")
            .await
            .expect("OUTPUT was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("OUT", EpicsValue::String("TARGET".into()))
            .unwrap();
        inst.put_common_field("HIHI", EpicsValue::Double(100.0))
            .unwrap();
        inst.put_common_field("HHSV", EpicsValue::Short(AlarmSeverity::Invalid as i16))
            .unwrap();
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("OUTPUT", &mut visited, 0)
        .await
        .unwrap();
    let val = db.get_pv("TARGET").await.unwrap();
    match val {
        EpicsValue::Double(v) => assert!((v - 0.0).abs() < 1e-10),
        other => panic!("expected Double(0.0), got {:?}", other),
    }
}
/// A `field(ASL, "1")` line in a parsed `.db` definition must reach the
/// record's common ASL field; a record without the line keeps ASL=0.
#[tokio::test]
async fn test_db_load_records_asl_field() {
    use epics_base_rs::server::db_loader;
    use epics_base_rs::server::records::ai::AiRecord;
    let defs = db_loader::parse_db(
        r#"
record(ai, "ASLT:HIGH") {
field(ASL, "1")
}
record(ai, "ASLT:LOW") {
}
"#,
        &std::collections::HashMap::new(),
    )
    .unwrap();
    let db = PvDatabase::new();
    for def in defs {
        let mut record: Box<dyn epics_base_rs::server::record::Record> =
            Box::new(AiRecord::new(0.0));
        let mut common_fields = Vec::new();
        db_loader::apply_fields(&mut record, &def.fields, &mut common_fields).unwrap();
        db.add_record(&def.name, record).await.unwrap();
        // The record was added on the line above, so a failed lookup is an
        // error to report, not a reason to skip the common fields.
        let rec = db
            .get_record(&def.name)
            .await
            .expect("record was just added");
        let mut inst = rec.write().await;
        for (n, v) in common_fields {
            // Individual put failures are ignored, as before; the ASL
            // assertions below catch a value that did not land.
            let _ = inst.put_common_field(&n, v);
        }
    }
    let high = db.get_record("ASLT:HIGH").await.unwrap();
    let low = db.get_record("ASLT:LOW").await.unwrap();
    assert_eq!(
        high.read().await.common.asl,
        1,
        "field(ASL, \"1\") must set ASL=1"
    );
    assert_eq!(low.read().await.common.asl, 0, "absent ASL defaults to 0");
}
/// With IVOA=2 and IVOV=42, an `ao` driven into an `Invalid` alarm must write
/// IVOV to its OUT target and report IVOV in its own OVAL.
///
/// `SRC` starts at 7.0 with HIHI=1 and HHSV=Invalid.
#[tokio::test]
async fn test_ao_ivoa_set_to_ivov_writes_oval() {
    use epics_base_rs::server::records::ao::AoRecord;
    let db = PvDatabase::new();
    db.add_record("TARGET", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    let mut ao = AoRecord::new(7.0);
    ao.ivoa = 2;
    ao.ivov = 42.0;
    db.add_record("SRC", Box::new(ao)).await.unwrap();
    // A failed lookup panics here rather than silently skipping the setup.
    {
        let rec = db.get_record("SRC").await.expect("SRC was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("OUT", EpicsValue::String("TARGET".into()))
            .unwrap();
        inst.put_common_field("HIHI", EpicsValue::Double(1.0))
            .unwrap();
        inst.put_common_field("HHSV", EpicsValue::Short(AlarmSeverity::Invalid as i16))
            .unwrap();
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("SRC", &mut visited, 0)
        .await
        .unwrap();
    let v = db.get_pv("TARGET").await.unwrap();
    assert!(
        matches!(v, EpicsValue::Double(d) if (d - 42.0).abs() < 1e-9),
        "TARGET must receive IVOV via OVAL: got {v:?}"
    );
    let oval = db.get_pv("SRC.OVAL").await.unwrap();
    assert!(
        matches!(oval, EpicsValue::Double(d) if (d - 42.0).abs() < 1e-9),
        "SRC.OVAL must equal IVOV: got {oval:?}"
    );
}
/// With IVOA=2 and IVOV=1, a `bo` whose pending alarm (NSEV/NSTA) is forced
/// to Invalid/SOFT_ALARM must end with RVAL equal to IVOV after processing.
#[tokio::test]
async fn test_bo_ivoa_set_to_ivov_writes_rval() {
    use epics_base_rs::server::records::bo::BoRecord;
    let db = PvDatabase::new();
    let mut bo = BoRecord::new(0);
    bo.ivoa = 2;
    bo.ivov = 1;
    db.add_record("BO_SRC", Box::new(bo)).await.unwrap();
    // A failed lookup panics here rather than silently skipping the alarm
    // injection.
    {
        let rec = db
            .get_record("BO_SRC")
            .await
            .expect("BO_SRC was just added");
        let mut inst = rec.write().await;
        inst.common.nsev = AlarmSeverity::Invalid;
        inst.common.nsta = epics_base_rs::server::recgbl::alarm_status::SOFT_ALARM;
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("BO_SRC", &mut visited, 0)
        .await
        .unwrap();
    let rval = db.get_pv("BO_SRC.RVAL").await.unwrap();
    assert!(
        matches!(rval, EpicsValue::Long(1)),
        "BO_SRC.RVAL must equal IVOV(1): got {rval:?}"
    );
}
/// With IVOA=2 and IVOV=17.5, a `calcout` driven into an `Invalid` alarm must
/// deliver IVOV to its OUT target.
///
/// `CO_SRC` starts with VAL and OVAL at 99.9, CALC `A`, HIHI=1 and
/// HHSV=Invalid.
#[tokio::test]
async fn test_calcout_ivoa_set_to_ivov_writes_oval_only() {
    use epics_base_rs::server::records::calcout::CalcoutRecord;
    let db = PvDatabase::new();
    db.add_record(
        "OUT_TGT",
        Box::new(epics_base_rs::server::records::ao::AoRecord::new(0.0)),
    )
    .await
    .unwrap();
    let mut co = CalcoutRecord::default();
    co.ivoa = 2;
    co.ivov = 17.5;
    co.val = 99.9;
    co.oval = 99.9;
    co.calc = "A".to_string();
    db.add_record("CO_SRC", Box::new(co)).await.unwrap();
    // A failed lookup panics here rather than silently skipping the setup.
    {
        let rec = db
            .get_record("CO_SRC")
            .await
            .expect("CO_SRC was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("OUT", EpicsValue::String("OUT_TGT".into()))
            .unwrap();
        inst.put_common_field("HIHI", EpicsValue::Double(1.0))
            .unwrap();
        inst.put_common_field("HHSV", EpicsValue::Short(AlarmSeverity::Invalid as i16))
            .unwrap();
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("CO_SRC", &mut visited, 0)
        .await
        .unwrap();
    let v = db.get_pv("OUT_TGT").await.unwrap();
    assert!(
        matches!(v, EpicsValue::Double(d) if (d - 17.5).abs() < 1e-9),
        "OUT_TGT must receive IVOV via OVAL: got {v:?}"
    );
}
/// With SIML naming a switch record holding 1.0, an `ai` must read its value
/// from SIOL (99.0) and report SEVR as `Short(1)`, matching SIMS=1.
#[tokio::test]
async fn test_sim_mode_input() {
    let db = PvDatabase::new();
    let switch = Box::new(AoRecord::new(1.0));
    db.add_record("SIM_SW", switch).await.unwrap();
    let sim_source = Box::new(AoRecord::new(99.0));
    db.add_record("SIM_VAL", sim_source).await.unwrap();

    let mut ai = AiRecord::new(0.0);
    ai.sims = 1;
    ai.siol = "SIM_VAL".to_string();
    ai.siml = "SIM_SW".to_string();
    db.add_record("SIM_AI", Box::new(ai)).await.unwrap();

    let mut seen = HashSet::new();
    db.process_record_with_links("SIM_AI", &mut seen, 0)
        .await
        .unwrap();

    let val = db.get_pv("SIM_AI").await.unwrap();
    if let EpicsValue::Double(v) = val {
        assert!((v - 99.0).abs() < 1e-10)
    } else {
        panic!("expected Double(99.0), got {:?}", val)
    }
    let sevr = db.get_pv("SIM_AI.SEVR").await.unwrap();
    assert!(matches!(sevr, EpicsValue::Short(1)));
}
/// An `ai` must read from INP while the SIML switch is 0 and from SIOL once
/// the switch is set to 1: it reads 10.0 first, then 42.0.
#[tokio::test]
async fn test_sim_mode_toggle() {
    let db = PvDatabase::new();
    db.add_record("SIM_SW", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    db.add_record("SIM_VAL", Box::new(AoRecord::new(42.0)))
        .await
        .unwrap();
    db.add_record("REAL_SRC", Box::new(AoRecord::new(10.0)))
        .await
        .unwrap();
    let mut ai = AiRecord::new(0.0);
    ai.siml = "SIM_SW".to_string();
    ai.siol = "SIM_VAL".to_string();
    db.add_record("TEST_AI", Box::new(ai)).await.unwrap();
    // A failed lookup panics here rather than silently skipping the setup.
    {
        let rec = db
            .get_record("TEST_AI")
            .await
            .expect("TEST_AI was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("INP", EpicsValue::String("REAL_SRC".into()))
            .unwrap();
    }
    // Switch off: the value comes from INP.
    let mut visited = HashSet::new();
    db.process_record_with_links("TEST_AI", &mut visited, 0)
        .await
        .unwrap();
    let val = db.get_pv("TEST_AI").await.unwrap();
    match val {
        EpicsValue::Double(v) => assert!((v - 10.0).abs() < 1e-10),
        other => panic!("expected Double(10.0), got {:?}", other),
    }
    // Switch on: the value comes from SIOL.
    db.put_pv("SIM_SW", EpicsValue::Double(1.0)).await.unwrap();
    let mut visited = HashSet::new();
    db.process_record_with_links("TEST_AI", &mut visited, 0)
        .await
        .unwrap();
    let val = db.get_pv("TEST_AI").await.unwrap();
    match val {
        EpicsValue::Double(v) => assert!((v - 42.0).abs() < 1e-10),
        other => panic!("expected Double(42.0), got {:?}", other),
    }
}
/// With SIML naming a switch record holding 1.0, processing an `ao` must
/// deliver its value (77.0) to the record named by SIOL.
#[tokio::test]
async fn test_sim_mode_output() {
    let db = PvDatabase::new();
    let switch = Box::new(AoRecord::new(1.0));
    db.add_record("SIM_SW", switch).await.unwrap();
    let sim_sink = Box::new(AoRecord::new(0.0));
    db.add_record("SIM_OUT", sim_sink).await.unwrap();

    let mut ao = AoRecord::new(77.0);
    ao.siol = "SIM_OUT".to_string();
    ao.siml = "SIM_SW".to_string();
    db.add_record("TEST_AO", Box::new(ao)).await.unwrap();

    let mut seen = HashSet::new();
    db.process_record_with_links("TEST_AO", &mut seen, 0)
        .await
        .unwrap();

    let val = db.get_pv("SIM_OUT").await.unwrap();
    if let EpicsValue::Double(v) = val {
        assert!((v - 77.0).abs() < 1e-10)
    } else {
        panic!("expected Double(77.0), got {:?}", val)
    }
}
/// While the SDIS source holds 1.0, processing `TARGET` must leave it in
/// `DISABLE_ALARM` with severity `Minor` (DISS=1); once the source is set to
/// 0.0, processing again must move STAT off `DISABLE_ALARM`.
#[tokio::test]
async fn test_sdis_disable_skips_process() {
    let db = PvDatabase::new();
    db.add_record("DISABLE_SW", Box::new(AoRecord::new(1.0)))
        .await
        .unwrap();
    db.add_record("TARGET", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    // A failed lookup panics here rather than silently skipping the setup.
    {
        let rec = db
            .get_record("TARGET")
            .await
            .expect("TARGET was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("SDIS", EpicsValue::String("DISABLE_SW".into()))
            .unwrap();
        inst.put_common_field("DISS", EpicsValue::Short(1)).unwrap();
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("TARGET", &mut visited, 0)
        .await
        .unwrap();
    let rec = db.get_record("TARGET").await.unwrap();
    let inst = rec.read().await;
    assert_eq!(
        inst.common.stat,
        epics_base_rs::server::recgbl::alarm_status::DISABLE_ALARM
    );
    assert_eq!(inst.common.sevr, AlarmSeverity::Minor);
    // Release the read guard before the record is processed again.
    drop(inst);
    db.put_pv("DISABLE_SW", EpicsValue::Double(0.0))
        .await
        .unwrap();
    let mut visited = HashSet::new();
    db.process_record_with_links("TARGET", &mut visited, 0)
        .await
        .unwrap();
    let rec = db.get_record("TARGET").await.unwrap();
    let inst = rec.read().await;
    assert_ne!(
        inst.common.stat,
        epics_base_rs::server::recgbl::alarm_status::DISABLE_ALARM
    );
}
/// `records_for_scan` must return records ordered by PHAS, not by insertion:
/// records added as C, A, B with PHAS 2, 0, 1 come back as A, B, C.
#[tokio::test]
async fn test_phas_scan_order() {
    let db = PvDatabase::new();
    db.add_record("REC_C", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    db.add_record("REC_A", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    db.add_record("REC_B", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    for (name, phas) in &[("REC_C", 2i16), ("REC_A", 0), ("REC_B", 1)] {
        // Every record in the table was added above, so a failed lookup is
        // reported instead of leaving that record out of the scan list.
        let rec = db
            .get_record(name)
            .await
            .expect("record was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("PHAS", EpicsValue::Short(*phas))
            .unwrap();
        let result = inst
            .put_common_field("SCAN", EpicsValue::String("1 second".into()))
            .unwrap();
        if let CommonFieldPutResult::ScanChanged {
            old_scan,
            new_scan,
            phas: p,
        } = result
        {
            // Release the write guard before calling back into the database.
            drop(inst);
            db.update_scan_index(name, old_scan, new_scan, p, p).await;
        }
    }
    let names = db.records_for_scan(ScanType::Sec1).await;
    assert_eq!(names, vec!["REC_A", "REC_B", "REC_C"]);
}
/// A 20-record FLNK chain must be cut short: processing `CHAIN_0` returns
/// `Ok` with at most 17 records visited, `CHAIN_0` among them.
#[tokio::test]
async fn test_depth_limit() {
    let db = PvDatabase::new();
    for i in 0..20 {
        db.add_record(&format!("CHAIN_{i}"), Box::new(AoRecord::new(0.0)))
            .await
            .unwrap();
    }
    for i in 0..19 {
        // A missing link would shorten the chain and let the `<= 17` check
        // pass trivially, so a failed lookup panics instead of being skipped.
        let rec = db
            .get_record(&format!("CHAIN_{i}"))
            .await
            .expect("chain record was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("FLNK", EpicsValue::String(format!("CHAIN_{}", i + 1)))
            .unwrap();
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("CHAIN_0", &mut visited, 0)
        .await
        .unwrap();
    assert!(visited.len() <= 17);
    assert!(visited.contains("CHAIN_0"));
}
/// With DISP=1, a CA put to VAL must be rejected with
/// `CaError::PutDisabled`.
#[tokio::test]
async fn test_disp_blocks_ca_put() {
    let db = PvDatabase::new();
    db.add_record("REC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    // A failed lookup panics here rather than silently skipping the setup.
    {
        let rec = db.get_record("REC").await.expect("REC was just added");
        let mut inst = rec.write().await;
        inst.put_common_field("DISP", EpicsValue::Char(1)).unwrap();
    }
    let result = db
        .put_record_field_from_ca("REC", "VAL", EpicsValue::Double(42.0))
        .await;
    assert!(matches!(result, Err(CaError::PutDisabled(_))));
}
// Even with DISP set, a CA put to the DISP field itself must succeed and
// leave `common.disp` cleared.
#[tokio::test]
async fn test_disp_allows_disp_write() {
    let db = PvDatabase::new();
    db.add_record("REC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    if let Some(handle) = db.get_record("REC").await {
        let mut guard = handle.write().await;
        guard
            .put_common_field("DISP", EpicsValue::Char(1))
            .unwrap();
    }
    let write_outcome = db
        .put_record_field_from_ca("REC", "DISP", EpicsValue::Char(0))
        .await;
    assert!(write_outcome.is_ok());
    let handle = db.get_record("REC").await.unwrap();
    let guard = handle.read().await;
    assert!(!guard.common.disp);
}
// `put_pv` (the non-CA write path) must succeed even when DISP is set.
#[tokio::test]
async fn test_disp_bypassed_by_internal_put() {
    let db = PvDatabase::new();
    db.add_record("REC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    if let Some(handle) = db.get_record("REC").await {
        let mut guard = handle.write().await;
        guard
            .put_common_field("DISP", EpicsValue::Char(1))
            .unwrap();
    }
    let put_outcome = db.put_pv("REC", EpicsValue::Double(42.0)).await;
    assert!(put_outcome.is_ok());
}
// After a value has been stored with `put_pv`, a CA write to PROC must
// succeed and the record must end up with UDF cleared.
#[tokio::test]
async fn test_proc_triggers_processing() {
    let db = PvDatabase::new();
    db.add_record("REC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    db.put_pv("REC", EpicsValue::Double(42.0)).await.unwrap();
    let proc_outcome = db
        .put_record_field_from_ca("REC", "PROC", EpicsValue::Char(1))
        .await;
    assert!(proc_outcome.is_ok());
    let handle = db.get_record("REC").await.unwrap();
    let guard = handle.read().await;
    assert!(!guard.common.udf);
}
// A CA write to PROC must process the record (clearing UDF) even when its
// SCAN field has been set to "1 second".
#[tokio::test]
async fn test_proc_works_any_scan() {
    let db = PvDatabase::new();
    db.add_record("REC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    if let Some(handle) = db.get_record("REC").await {
        let mut guard = handle.write().await;
        guard
            .put_common_field("SCAN", EpicsValue::String("1 second".into()))
            .unwrap();
    }
    let proc_outcome = db
        .put_record_field_from_ca("REC", "PROC", EpicsValue::Char(1))
        .await;
    assert!(proc_outcome.is_ok());
    let handle = db.get_record("REC").await.unwrap();
    let guard = handle.read().await;
    assert!(!guard.common.udf);
}
// A CA write to PROC must be accepted even when DISP is set on the record.
#[tokio::test]
async fn test_proc_bypasses_disp() {
    let db = PvDatabase::new();
    db.add_record("REC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    if let Some(handle) = db.get_record("REC").await {
        let mut guard = handle.write().await;
        guard
            .put_common_field("DISP", EpicsValue::Char(1))
            .unwrap();
    }
    let proc_outcome = db
        .put_record_field_from_ca("REC", "PROC", EpicsValue::Char(1))
        .await;
    assert!(proc_outcome.is_ok());
}
// A CA write to PROC on a freshly added record must succeed and leave UDF
// cleared.
//
// NOTE(review): despite the test name, nothing here puts the record into
// PACT before the PROC write — confirm whether a PACT setup step is missing.
#[tokio::test]
async fn test_proc_while_pact() {
    let db = PvDatabase::new();
    db.add_record("REC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    let proc_outcome = db
        .put_record_field_from_ca("REC", "PROC", EpicsValue::Char(1))
        .await;
    assert!(proc_outcome.is_ok());
    let handle = db.get_record("REC").await.unwrap();
    let guard = handle.read().await;
    assert!(!guard.common.udf);
}
// LCNT must not be writable over CA: the put has to fail with `ReadOnlyField`.
#[tokio::test]
async fn test_lcnt_ca_write_rejected() {
    let db = PvDatabase::new();
    db.add_record("REC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    let put_outcome = db
        .put_record_field_from_ca("REC", "LCNT", EpicsValue::Short(0))
        .await;
    assert!(matches!(put_outcome, Err(CaError::ReadOnlyField(_))));
}
// Changing SCAN through the CA put path must register the record in the
// matching scan list.
#[tokio::test]
async fn test_ca_put_scan_index_update() {
    let db = PvDatabase::new();
    db.add_record("REC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    let new_scan = EpicsValue::String("1 second".into());
    db.put_record_field_from_ca("REC", "SCAN", new_scan)
        .await
        .unwrap();
    let scanned = db.records_for_scan(ScanType::Sec1).await;
    let wanted = "REC".to_string();
    assert!(scanned.contains(&wanted));
}
/// Test double for device support that carries shared call counters and a
/// DTYP name.
struct MockDeviceSupport {
    /// Shared counter intended to track `read` invocations.
    read_count: Arc<AtomicU32>,
    /// Shared counter intended to track `write` invocations.
    write_count: Arc<AtomicU32>,
    /// Device-type name this mock identifies itself with.
    dtyp_name: String,
}

impl MockDeviceSupport {
    /// Creates a mock named `dtyp` that shares the two supplied counters with
    /// the caller, so the caller can inspect them after handing the mock away.
    fn new(dtyp: &str, read_count: Arc<AtomicU32>, write_count: Arc<AtomicU32>) -> Self {
        let dtyp_name = dtyp.to_string();
        Self {
            read_count,
            write_count,
            dtyp_name,
        }
    }
}
impl epics_base_rs::server::device_support::DeviceSupport for MockDeviceSupport {
    /// Bumps the read counter and reports a successful read; the record
    /// itself is left untouched.
    fn read(
        &mut self,
        _record: &mut dyn Record,
    ) -> epics_base_rs::error::CaResult<epics_base_rs::server::device_support::DeviceReadOutcome>
    {
        use epics_base_rs::server::device_support::DeviceReadOutcome;
        let _previous = self.read_count.fetch_add(1, Ordering::SeqCst);
        Ok(DeviceReadOutcome::ok())
    }

    /// Bumps the write counter and reports success; the record itself is
    /// left untouched.
    fn write(&mut self, _record: &mut dyn Record) -> epics_base_rs::error::CaResult<()> {
        let _previous = self.write_count.fetch_add(1, Ordering::SeqCst);
        Ok(())
    }

    /// Returns the DTYP name supplied at construction.
    fn dtyp(&self) -> &str {
        self.dtyp_name.as_str()
    }
}
// A single CA put to VAL on an output record with device support must
// result in exactly one device `write` call.
#[tokio::test]
async fn test_ca_put_no_double_device_write() {
    let db = PvDatabase::new();
    db.add_record("AO_REC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    let reads = Arc::new(AtomicU32::new(0));
    let writes = Arc::new(AtomicU32::new(0));
    let device = MockDeviceSupport::new("MockDev", Arc::clone(&reads), Arc::clone(&writes));
    if let Some(handle) = db.get_record("AO_REC").await {
        let mut guard = handle.write().await;
        guard.common.dtyp = String::from("MockDev");
        guard.device = Some(Box::new(device));
    }
    db.put_record_field_from_ca("AO_REC", "VAL", EpicsValue::Double(42.0))
        .await
        .unwrap();
    let write_calls = writes.load(Ordering::SeqCst);
    assert_eq!(write_calls, 1);
}
// A bi record with DTYP "Raw Soft Channel" reading 0xFF through INP with
// MASK=0x0F must end up with RVAL=0x0F and VAL=1.
#[tokio::test]
async fn test_bi_raw_soft_channel_inp_applies_mask() {
    let db = PvDatabase::new();
    db.add_record("SRC_LI", Box::new(LonginRecord::new(0)))
        .await
        .unwrap();
    db.add_record("BI_RAW", Box::new(BiRecord::new(0)))
        .await
        .unwrap();
    // Seed the source with all eight low bits set.
    db.put_record_field_from_ca("SRC_LI", "VAL", EpicsValue::Long(0xFF))
        .await
        .unwrap();
    if let Some(handle) = db.get_record("BI_RAW").await {
        let mut guard = handle.write().await;
        guard.common.dtyp = String::from("Raw Soft Channel");
        guard.common.inp = String::from("SRC_LI");
        // INP is assigned directly here, so the parsed link is refreshed by hand.
        let parsed = epics_base_rs::server::record::parse_link_v2(&guard.common.inp);
        guard.parsed_inp = parsed;
        guard
            .record
            .put_field("MASK", EpicsValue::Long(0x0F))
            .unwrap();
    }
    let mut seen = HashSet::new();
    db.process_record_with_links("BI_RAW", &mut seen, 0)
        .await
        .unwrap();
    if let Some(handle) = db.get_record("BI_RAW").await {
        let guard = handle.read().await;
        assert_eq!(
            guard.record.get_field("RVAL"),
            Some(EpicsValue::Long(0x0F)),
            "MASK must clamp RVAL to low nibble"
        );
        assert_eq!(
            guard.record.get_field("VAL"),
            Some(EpicsValue::Enum(1)),
            "masked-non-zero RVAL → VAL=1"
        );
    }
}
// Processing an input (ai) record with device support must call the
// device's `read` exactly once and never call `write`.
#[tokio::test]
async fn test_input_record_no_device_write() {
    let db = PvDatabase::new();
    db.add_record("AI_REC", Box::new(AiRecord::new(0.0)))
        .await
        .unwrap();
    let reads = Arc::new(AtomicU32::new(0));
    let writes = Arc::new(AtomicU32::new(0));
    let device = MockDeviceSupport::new("MockDev", Arc::clone(&reads), Arc::clone(&writes));
    if let Some(handle) = db.get_record("AI_REC").await {
        let mut guard = handle.write().await;
        guard.common.dtyp = String::from("MockDev");
        guard.device = Some(Box::new(device));
    }
    let mut seen = HashSet::new();
    db.process_record_with_links("AI_REC", &mut seen, 0)
        .await
        .unwrap();
    let read_calls = reads.load(Ordering::SeqCst);
    assert_eq!(read_calls, 1);
    let write_calls = writes.load(Ordering::SeqCst);
    assert_eq!(write_calls, 0);
}
// A CA put to VAL on an output record whose scan is `Sec1` (i.e. not
// passive) must still produce exactly one device `write`.
#[tokio::test]
async fn test_non_passive_output_ca_put_triggers_write() {
    let db = PvDatabase::new();
    db.add_record("AO_NP", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    let reads = Arc::new(AtomicU32::new(0));
    let writes = Arc::new(AtomicU32::new(0));
    let device = MockDeviceSupport::new("MockDev", Arc::clone(&reads), Arc::clone(&writes));
    if let Some(handle) = db.get_record("AO_NP").await {
        let mut guard = handle.write().await;
        guard.common.dtyp = String::from("MockDev");
        guard.common.scan = ScanType::Sec1;
        guard.device = Some(Box::new(device));
    }
    db.put_record_field_from_ca("AO_NP", "VAL", EpicsValue::Double(42.0))
        .await
        .unwrap();
    let write_calls = writes.load(Ordering::SeqCst);
    assert_eq!(write_calls, 1);
}
// A CA write to PROC on an output record with device support must produce
// exactly one device `write`.
#[tokio::test]
async fn test_proc_triggers_device_write() {
    let db = PvDatabase::new();
    db.add_record("AO_PROC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    let reads = Arc::new(AtomicU32::new(0));
    let writes = Arc::new(AtomicU32::new(0));
    let device = MockDeviceSupport::new("MockDev", Arc::clone(&reads), Arc::clone(&writes));
    if let Some(handle) = db.get_record("AO_PROC").await {
        let mut guard = handle.write().await;
        guard.common.dtyp = String::from("MockDev");
        guard.device = Some(Box::new(device));
    }
    db.put_record_field_from_ca("AO_PROC", "PROC", EpicsValue::Char(1))
        .await
        .unwrap();
    let write_calls = writes.load(Ordering::SeqCst);
    assert_eq!(write_calls, 1);
}
// Changing PHAS on a record that is already on a periodic scan must
// re-sort it within that scan list.
#[tokio::test]
async fn test_phas_change_updates_scan_index() {
    let db = PvDatabase::new();
    for record_name in ["REC_A", "REC_B"] {
        db.add_record(record_name, Box::new(AoRecord::new(0.0)))
            .await
            .unwrap();
    }
    // Initial phases put REC_B (5) ahead of REC_A (10).
    let initial: [(&str, i16); 2] = [("REC_A", 10), ("REC_B", 5)];
    for &(record_name, phase) in &initial {
        if let Some(handle) = db.get_record(record_name).await {
            let mut guard = handle.write().await;
            guard
                .put_common_field("PHAS", EpicsValue::Short(phase))
                .unwrap();
            let outcome = guard
                .put_common_field("SCAN", EpicsValue::String("1 second".into()))
                .unwrap();
            match outcome {
                CommonFieldPutResult::ScanChanged {
                    old_scan,
                    new_scan,
                    phas: scan_phas,
                } => {
                    // The write guard is released before the scan index is updated.
                    drop(guard);
                    db.update_scan_index(record_name, old_scan, new_scan, scan_phas, scan_phas)
                        .await;
                }
                _ => {}
            }
        }
    }
    let before = db.records_for_scan(ScanType::Sec1).await;
    assert_eq!(before, vec!["REC_B", "REC_A"]);

    // Drop REC_A to phase 0 so that it should now sort first.
    if let Some(handle) = db.get_record("REC_A").await {
        let mut guard = handle.write().await;
        let outcome = guard
            .put_common_field("PHAS", EpicsValue::Short(0))
            .unwrap();
        match outcome {
            CommonFieldPutResult::PhasChanged {
                scan,
                old_phas,
                new_phas,
            } => {
                drop(guard);
                db.update_scan_index("REC_A", scan, scan, old_phas, new_phas)
                    .await;
            }
            _ => {}
        }
    }
    let after = db.records_for_scan(ScanType::Sec1).await;
    assert_eq!(after, vec!["REC_A", "REC_B"]);
}
// The `ScanChanged` result of a SCAN put must carry the PHAS value that was
// set beforehand.
#[tokio::test]
async fn test_scan_change_preserves_phas() {
    let db = PvDatabase::new();
    db.add_record("REC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    if let Some(handle) = db.get_record("REC").await {
        let mut guard = handle.write().await;
        guard
            .put_common_field("PHAS", EpicsValue::Short(3))
            .unwrap();
        let outcome = guard
            .put_common_field("SCAN", EpicsValue::String("1 second".into()))
            .unwrap();
        match &outcome {
            CommonFieldPutResult::ScanChanged { phas, .. } => assert_eq!(*phas, 3),
            unexpected => panic!("expected ScanChanged, got {:?}", unexpected),
        }
    }
}
// A PHAS put on a freshly added record (SCAN never changed) must report
// `NoChange`, i.e. request no scan-index update.
#[tokio::test]
async fn test_phas_change_passive_no_index() {
    let db = PvDatabase::new();
    db.add_record("REC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    if let Some(handle) = db.get_record("REC").await {
        let mut guard = handle.write().await;
        let outcome = guard
            .put_common_field("PHAS", EpicsValue::Short(5))
            .unwrap();
        assert_eq!(outcome, CommonFieldPutResult::NoChange);
    }
}
/// Minimal record type whose `process` always reports async-pending; it
/// exposes a single `VAL` field.
struct AsyncRecord {
    /// Backing storage for the `VAL` field.
    val: f64,
}

impl Record for AsyncRecord {
    fn record_type(&self) -> &'static str {
        "async_test"
    }

    /// Never completes synchronously: always returns the async-pending outcome.
    fn process(&mut self) -> epics_base_rs::error::CaResult<ProcessOutcome> {
        let pending = ProcessOutcome::async_pending();
        Ok(pending)
    }

    /// Only `VAL` is readable; any other name yields `None`.
    fn get_field(&self, name: &str) -> Option<EpicsValue> {
        if name == "VAL" {
            Some(EpicsValue::Double(self.val))
        } else {
            None
        }
    }

    /// Accepts a `Double` for `VAL`; other value types are rejected as
    /// invalid and other field names as not found.
    fn put_field(&mut self, name: &str, value: EpicsValue) -> epics_base_rs::error::CaResult<()> {
        if name != "VAL" {
            return Err(CaError::FieldNotFound(name.into()));
        }
        match value {
            EpicsValue::Double(v) => {
                self.val = v;
                Ok(())
            }
            _ => Err(CaError::InvalidValue("bad".into())),
        }
    }

    fn field_list(&self) -> &'static [FieldDesc] {
        &[]
    }
}
// When `process` reports async-pending, the forward link must not be
// followed and UDF must stay set.
#[tokio::test]
async fn test_async_pending_skips_post_process() {
    let db = PvDatabase::new();
    db.add_record("ASYNC", Box::new(AsyncRecord { val: 0.0 }))
        .await
        .unwrap();
    db.add_record("FLNK_TARGET", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    if let Some(handle) = db.get_record("ASYNC").await {
        let mut guard = handle.write().await;
        guard
            .put_common_field("FLNK", EpicsValue::String("FLNK_TARGET".into()))
            .unwrap();
    }
    let mut seen = HashSet::new();
    db.process_record_with_links("ASYNC", &mut seen, 0)
        .await
        .unwrap();
    assert!(seen.contains("ASYNC"));
    assert!(!seen.contains("FLNK_TARGET"));
    let handle = db.get_record("ASYNC").await.unwrap();
    let guard = handle.read().await;
    assert!(guard.common.udf);
}
// After an async-pending process pass, `complete_async_record` must finish
// the cycle and leave UDF cleared.
#[tokio::test]
async fn test_complete_async_record() {
    let db = PvDatabase::new();
    db.add_record("ASYNC", Box::new(AsyncRecord { val: 42.0 }))
        .await
        .unwrap();
    db.add_record("FLNK_TARGET", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    if let Some(handle) = db.get_record("ASYNC").await {
        let mut guard = handle.write().await;
        guard
            .put_common_field("FLNK", EpicsValue::String("FLNK_TARGET".into()))
            .unwrap();
    }
    let mut seen = HashSet::new();
    db.process_record_with_links("ASYNC", &mut seen, 0)
        .await
        .unwrap();
    // The forward link is not followed while the record is still pending.
    assert!(!seen.contains("FLNK_TARGET"));
    db.complete_async_record("ASYNC").await.unwrap();
    let handle = db.get_record("ASYNC").await.unwrap();
    let guard = handle.read().await;
    assert!(!guard.common.udf);
}
/// Record type that always reports async-pending from `process` and whose
/// alarm check always requests STATE_ALARM at Major severity.
struct AsyncAlarmingRecord;

impl Record for AsyncAlarmingRecord {
    fn record_type(&self) -> &'static str {
        "async_alarm_test"
    }

    /// Never completes synchronously: always returns the async-pending outcome.
    fn process(&mut self) -> epics_base_rs::error::CaResult<ProcessOutcome> {
        let pending = ProcessOutcome::async_pending();
        Ok(pending)
    }

    /// Unconditionally requests STATE_ALARM / Major via `rec_gbl_set_sevr`.
    fn check_alarms(&mut self, common: &mut epics_base_rs::server::record::CommonFields) {
        use epics_base_rs::server::recgbl::alarm_status::STATE_ALARM;
        use epics_base_rs::server::recgbl::rec_gbl_set_sevr;
        use epics_base_rs::server::record::AlarmSeverity;
        rec_gbl_set_sevr(common, STATE_ALARM, AlarmSeverity::Major);
    }

    /// `VAL` always reads as 1.0; any other name yields `None`.
    fn get_field(&self, name: &str) -> Option<EpicsValue> {
        if name == "VAL" {
            Some(EpicsValue::Double(1.0))
        } else {
            None
        }
    }

    /// Writes to `VAL` are accepted and discarded; other names are not found.
    fn put_field(&mut self, name: &str, _value: EpicsValue) -> epics_base_rs::error::CaResult<()> {
        if name == "VAL" {
            Ok(())
        } else {
            Err(CaError::FieldNotFound(name.into()))
        }
    }

    fn field_list(&self) -> &'static [FieldDesc] {
        &[]
    }
}
// Checks which SEVR subscribers are notified when an async record completes
// and its severity rises to Major: the DBE_VALUE subscriber must receive an
// event, the DBE_ALARM-only subscriber must not.
#[tokio::test]
async fn test_complete_async_posts_sevr_with_per_field_mask() {
use epics_base_rs::server::recgbl::EventMask;
use epics_base_rs::server::record::AlarmSeverity;
use epics_base_rs::types::DbFieldType;
let db = PvDatabase::new();
db.add_record("ASYNC_SEVR", Box::new(AsyncAlarmingRecord))
.await
.unwrap();
// Clear UDF up front.
// NOTE(review): presumably so a UDF alarm cannot interfere with the SEVR
// transition under test — confirm.
if let Some(rec) = db.get_record("ASYNC_SEVR").await {
let mut inst = rec.write().await;
inst.common.udf = false;
}
// First pass: the record reports async-pending.
let mut visited = HashSet::new();
db.process_record_with_links("ASYNC_SEVR", &mut visited, 0)
.await
.unwrap();
// Subscribe to SEVR twice (ids 21 and 22), once per mask, after the first
// pass and before completion.
let (mut sevr_value_rx, mut sevr_alarm_rx) = {
let rec = db.get_record("ASYNC_SEVR").await.unwrap();
let mut inst = rec.write().await;
let v = inst
.add_subscriber("SEVR", 21, DbFieldType::Short, EventMask::VALUE.bits())
.expect("DBE_VALUE SEVR subscription accepted");
let a = inst
.add_subscriber("SEVR", 22, DbFieldType::Short, EventMask::ALARM.bits())
.expect("DBE_ALARM SEVR subscription accepted");
(v, a)
};
db.complete_async_record("ASYNC_SEVR").await.unwrap();
// Scoped so the read guard is dropped before the receivers are polled.
{
let rec = db.get_record("ASYNC_SEVR").await.unwrap();
let inst = rec.read().await;
assert_eq!(
inst.common.sevr,
AlarmSeverity::Major,
"completion must raise Major"
);
}
assert!(
sevr_value_rx.try_recv().is_ok(),
"DBE_VALUE SEVR subscriber must receive the SEVR change"
);
assert!(
sevr_alarm_rx.try_recv().is_err(),
"DBE_ALARM-only SEVR subscriber must NOT receive SEVR \
(per-field mask collapsed onto record-wide ALARM mask)"
);
}
// Re-entering a record that is still async-pending must bail silently while
// incrementing LCNT; only the 11th re-entry raises SCAN_ALARM / Invalid.
#[tokio::test]
async fn test_pact_entry_guard_silent_bail_until_max_lock() {
use epics_base_rs::server::record::AlarmSeverity;
let db = PvDatabase::new();
db.add_record("ASYNC_PACT", Box::new(AsyncRecord { val: 0.0 }))
.await
.unwrap();
// First cycle: AsyncRecord reports async-pending, so the record stays
// in the processing state.
let mut visited = HashSet::new();
db.process_record_with_links("ASYNC_PACT", &mut visited, 0)
.await
.unwrap();
{
let rec = db.get_record("ASYNC_PACT").await.unwrap();
let inst = rec.read().await;
assert!(
inst.is_processing(),
"first cycle must leave PACT=true (AsyncPending)"
);
assert_eq!(inst.common.lcnt, 0, "first cycle must reset lcnt");
assert_eq!(inst.common.sevr, AlarmSeverity::NoAlarm);
}
// Re-entries 1 through 10: each one bumps lcnt by one and raises no alarm.
for i in 1..=10 {
let mut visited = HashSet::new();
db.process_record_with_links("ASYNC_PACT", &mut visited, 0)
.await
.unwrap();
let rec = db.get_record("ASYNC_PACT").await.unwrap();
let inst = rec.read().await;
assert!(inst.is_processing(), "must remain PACT=true (iter {i})");
assert_eq!(inst.common.lcnt, i as i16, "lcnt must increment per bail");
assert_eq!(
inst.common.sevr,
AlarmSeverity::NoAlarm,
"no SCAN_ALARM yet (iter {i})"
);
}
// Re-entry 11: the alarm is raised while the record remains processing.
let mut visited = HashSet::new();
db.process_record_with_links("ASYNC_PACT", &mut visited, 0)
.await
.unwrap();
let rec = db.get_record("ASYNC_PACT").await.unwrap();
let inst = rec.read().await;
assert!(inst.is_processing(), "PACT still true post-alarm-raise");
assert_eq!(inst.common.sevr, AlarmSeverity::Invalid);
assert_eq!(
inst.common.stat,
epics_base_rs::server::recgbl::alarm_status::SCAN_ALARM
);
assert_eq!(inst.common.amsg, "Async in progress");
}
// With TPRO and RPRO set, re-entering an async-pending record must still
// bail (incrementing LCNT) and must leave both flags untouched.
#[tokio::test]
async fn test_pact_entry_guard_tpro_diagnostic_does_not_change_bail_outcome() {
    let db = PvDatabase::new();
    db.add_record("ASYNC_TPRO", Box::new(AsyncRecord { val: 0.0 }))
        .await
        .unwrap();
    {
        let handle = db.get_record("ASYNC_TPRO").await.unwrap();
        let mut guard = handle.write().await;
        guard.common.tpro = true;
        guard.common.rpro = true;
    }

    // First pass: the record goes async-pending.
    let mut first_pass = HashSet::new();
    db.process_record_with_links("ASYNC_TPRO", &mut first_pass, 0)
        .await
        .unwrap();
    {
        let handle = db.get_record("ASYNC_TPRO").await.unwrap();
        let guard = handle.read().await;
        assert!(guard.is_processing(), "must enter PACT");
        assert!(guard.common.tpro, "TPRO must be preserved");
        assert!(guard.common.rpro, "RPRO must be preserved across PACT entry");
    }

    // Second pass: re-entry while the record is still pending.
    let mut second_pass = HashSet::new();
    db.process_record_with_links("ASYNC_TPRO", &mut second_pass, 0)
        .await
        .unwrap();
    let handle = db.get_record("ASYNC_TPRO").await.unwrap();
    let guard = handle.read().await;
    assert!(guard.is_processing(), "still PACT after bail");
    assert_eq!(guard.common.lcnt, 1, "lcnt must have advanced");
    assert!(
        guard.common.rpro,
        "RPRO must remain unchanged by the diagnostic path"
    );
}
// LCNT accumulated by re-entries while async-pending must be back at 0
// after the record completes and is processed again.
#[tokio::test]
async fn test_pact_entry_guard_resets_lcnt_after_completion() {
    let db = PvDatabase::new();
    db.add_record("ASYNC_RESET", Box::new(AsyncRecord { val: 0.0 }))
        .await
        .unwrap();

    // Initial pass puts the record into the pending state.
    let mut initial = HashSet::new();
    db.process_record_with_links("ASYNC_RESET", &mut initial, 0)
        .await
        .unwrap();

    // Three re-entries while the record is still pending.
    for _attempt in 0..3 {
        let mut seen = HashSet::new();
        db.process_record_with_links("ASYNC_RESET", &mut seen, 0)
            .await
            .unwrap();
    }
    {
        let handle = db.get_record("ASYNC_RESET").await.unwrap();
        let lcnt_while_pending = handle.read().await.common.lcnt;
        assert_eq!(lcnt_while_pending, 3);
    }

    db.complete_async_record("ASYNC_RESET").await.unwrap();
    let mut after_completion = HashSet::new();
    db.process_record_with_links("ASYNC_RESET", &mut after_completion, 0)
        .await
        .unwrap();
    let handle = db.get_record("ASYNC_RESET").await.unwrap();
    let guard = handle.read().await;
    assert_eq!(guard.common.lcnt, 0, "lcnt must reset when PACT clears");
}
// A record that returns AsyncPending together with a ReprocessAfter action
// must be re-processed once the delay elapses, even though re-entry from
// other callers is blocked while it is pending; once that continuation
// completes, the record must be processable again.
#[tokio::test]
async fn test_reprocess_after_continuation_bypasses_pact_guard() {
use epics_base_rs::server::record::{ProcessAction, ProcessOutcome, RecordProcessResult};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
// Record whose first `process` call goes async-pending and asks to be
// re-processed after 20 ms; every later call completes immediately.
struct ContinuationRecord {
process_count: Arc<AtomicU32>,
}
impl Record for ContinuationRecord {
fn record_type(&self) -> &'static str {
"continuation_test"
}
fn process(&mut self) -> epics_base_rs::error::CaResult<ProcessOutcome> {
// `fetch_add` returns the previous value, so n == 0 on the first call.
let n = self.process_count.fetch_add(1, Ordering::SeqCst);
if n == 0 {
Ok(ProcessOutcome {
result: RecordProcessResult::AsyncPending,
actions: vec![ProcessAction::ReprocessAfter(
std::time::Duration::from_millis(20),
)],
device_did_compute: false,
})
} else {
Ok(ProcessOutcome::complete())
}
}
fn get_field(&self, _name: &str) -> Option<EpicsValue> {
None
}
fn put_field(
&mut self,
_name: &str,
_value: EpicsValue,
) -> epics_base_rs::error::CaResult<()> {
Ok(())
}
fn field_list(&self) -> &'static [FieldDesc] {
&[]
}
}
let process_count = Arc::new(AtomicU32::new(0));
let db = PvDatabase::new();
db.add_record(
"CONT_REC",
Box::new(ContinuationRecord {
process_count: process_count.clone(),
}),
)
.await
.unwrap();
// Call 1: the record goes pending.
let mut visited = HashSet::new();
db.process_record_with_links("CONT_REC", &mut visited, 0)
.await
.unwrap();
{
let rec = db.get_record("CONT_REC").await.unwrap();
assert!(
rec.read().await.is_processing(),
"PACT must be true after AsyncPending"
);
}
assert_eq!(process_count.load(Ordering::SeqCst), 1);
// Re-entry while pending: `process` must not be invoked again.
let mut visited = HashSet::new();
db.process_record_with_links("CONT_REC", &mut visited, 0)
.await
.unwrap();
assert_eq!(
process_count.load(Ordering::SeqCst),
1,
"foreign re-entry during AsyncPending must NOT call process()"
);
// Wait well past the 20 ms ReprocessAfter delay.
// NOTE(review): a fixed 80 ms sleep may be tight on a loaded CI machine.
tokio::time::sleep(std::time::Duration::from_millis(80)).await;
assert_eq!(
process_count.load(Ordering::SeqCst),
2,
"ReprocessAfter timer must call process() again — owner-driven \
continuation bypasses the PACT entry guard"
);
{
let rec = db.get_record("CONT_REC").await.unwrap();
assert!(
!rec.read().await.is_processing(),
"BUG 1: completed ReprocessAfter continuation must clear PACT"
);
}
// With the record idle again, an ordinary process request must run it.
let mut visited = HashSet::new();
db.process_record_with_links("CONT_REC", &mut visited, 0)
.await
.unwrap();
assert_eq!(
process_count.load(Ordering::SeqCst),
3,
"BUG 1: after the continuation cleared PACT, a foreign process \
must run process() again instead of bailing at the entry guard"
);
}
// An absolute deadband filter of 1.0 on a VAL subscription must let the
// first update through, suppress a +0.4 change, and deliver a +1.5 change.
#[tokio::test]
async fn test_dbnd_filter_drops_subthreshold_changes() {
    use epics_base_rs::server::database::filters::DeadbandFilter;
    use epics_base_rs::server::recgbl::EventMask;
    use std::sync::Arc;
    let db = PvDatabase::new();
    db.add_record("DBND:REC", Box::new(AoRecord::new(10.0)))
        .await
        .unwrap();
    let handle = db.get_record("DBND:REC").await.unwrap();
    let mut events = {
        let mut guard = handle.write().await;
        let receiver = guard
            .add_subscriber(
                "VAL",
                1,
                epics_base_rs::types::DbFieldType::Double,
                EventMask::VALUE.bits(),
            )
            .expect("subscribe");
        let deadband = Arc::new(DeadbandFilter::absolute(1.0));
        let attached = guard.attach_filter_to_last_subscriber("VAL", deadband);
        assert!(attached, "filter must attach to the just-added subscriber");
        receiver
    };

    // First posted value: expected to pass.
    {
        let mut guard = handle.write().await;
        guard
            .record
            .put_field("VAL", EpicsValue::Double(11.0))
            .unwrap();
        guard.notify_field("VAL", EventMask::VALUE);
    }
    events
        .try_recv()
        .expect("first value passes the deadband filter");

    // 11.0 -> 11.4 is below the 1.0 deadband.
    {
        let mut guard = handle.write().await;
        guard
            .record
            .put_field("VAL", EpicsValue::Double(11.4))
            .unwrap();
        guard.notify_field("VAL", EventMask::VALUE);
    }
    assert!(
        events.try_recv().is_err(),
        "sub-threshold change must be filtered out"
    );

    // 12.5 differs from the last delivered value (11.0) by more than 1.0.
    {
        let mut guard = handle.write().await;
        guard
            .record
            .put_field("VAL", EpicsValue::Double(12.5))
            .unwrap();
        guard.notify_field("VAL", EventMask::VALUE);
    }
    events.try_recv().expect("above-threshold change passes");
}
// A deadband filter must only throttle value updates: an ALARM-class
// notification has to reach the subscriber even when the value change is
// below the deadband.
#[tokio::test]
async fn test_dbnd_filter_passes_alarm_events() {
    use epics_base_rs::server::database::filters::DeadbandFilter;
    use epics_base_rs::server::recgbl::EventMask;
    use std::sync::Arc;
    let db = PvDatabase::new();
    db.add_record("DBND:ALR", Box::new(AoRecord::new(50.0)))
        .await
        .unwrap();
    let rec = db.get_record("DBND:ALR").await.unwrap();
    let mut rx = {
        let mut inst = rec.write().await;
        let rx = inst
            .add_subscriber(
                "VAL",
                1,
                epics_base_rs::types::DbFieldType::Double,
                (EventMask::VALUE | EventMask::ALARM).bits(),
            )
            .expect("subscribe");
        // Check the attach result, as the sibling deadband test does, so a
        // failed attach is reported here rather than as a confusing
        // downstream assertion failure.
        let attached =
            inst.attach_filter_to_last_subscriber("VAL", Arc::new(DeadbandFilter::absolute(10.0)));
        assert!(attached, "filter must attach to the just-added subscriber");
        rx
    };
    // Seed: the first posted value is expected to pass the filter.
    {
        let mut inst = rec.write().await;
        inst.record
            .put_field("VAL", EpicsValue::Double(50.0))
            .unwrap();
        inst.notify_field("VAL", EventMask::VALUE);
    }
    rx.try_recv().expect("seed value");
    // 50.0 -> 50.5 is well inside the 10.0 deadband.
    {
        let mut inst = rec.write().await;
        inst.record
            .put_field("VAL", EpicsValue::Double(50.5))
            .unwrap();
        inst.notify_field("VAL", EventMask::VALUE);
    }
    assert!(rx.try_recv().is_err(), "sub-threshold value silenced");
    // Same value, but posted as an ALARM event: must be delivered.
    {
        let inst = rec.read().await;
        inst.notify_field("VAL", EventMask::ALARM);
    }
    rx.try_recv().expect("alarm event passes the filter");
}
// `notify_field` with the VALUE mask must reach a VALUE subscriber and must
// not reach an ALARM-only subscriber on the same field.
#[tokio::test]
async fn test_notify_field_respects_mask() {
    let db = PvDatabase::new();
    db.add_record("REC", Box::new(AoRecord::new(42.0)))
        .await
        .unwrap();
    let handle = db.get_record("REC").await.unwrap();
    let (mut value_events, mut alarm_events) = {
        let mut guard = handle.write().await;
        let on_value = guard
            .add_subscriber(
                "VAL",
                1,
                epics_base_rs::types::DbFieldType::Double,
                EventMask::VALUE.bits(),
            )
            .expect("subscribe should not be capped at default");
        let on_alarm = guard
            .add_subscriber(
                "VAL",
                2,
                epics_base_rs::types::DbFieldType::Double,
                EventMask::ALARM.bits(),
            )
            .expect("subscribe should not be capped at default");
        (on_value, on_alarm)
    };
    {
        let guard = handle.read().await;
        guard.notify_field("VAL", EventMask::VALUE);
    }
    assert!(value_events.try_recv().is_ok());
    assert!(alarm_events.try_recv().is_err());
}
// Processing a record whose SDIS link points at a record holding 1.0 must
// leave both RPRO and PUTF cleared.
#[tokio::test]
async fn test_sdis_disable_clears_rpro_and_putf() {
    let db = PvDatabase::new();
    db.add_record("DIS_SW", Box::new(AoRecord::new(1.0)))
        .await
        .unwrap();
    db.add_record("DIS_TGT", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    if let Some(handle) = db.get_record("DIS_TGT").await {
        let mut guard = handle.write().await;
        guard
            .put_common_field("SDIS", EpicsValue::String("DIS_SW".into()))
            .unwrap();
        // Pre-set both flags so the test can observe them being cleared.
        guard.common.rpro = true;
        guard.common.putf = true;
    }
    let mut seen = HashSet::new();
    db.process_record_with_links("DIS_TGT", &mut seen, 0)
        .await
        .unwrap();
    let handle = db.get_record("DIS_TGT").await.unwrap();
    let guard = handle.read().await;
    assert!(
        !guard.common.rpro,
        "SDIS disable must clear rpro (C dbAccess.c:575). Pre-fix this leaked."
    );
    assert!(
        !guard.common.putf,
        "SDIS disable must clear putf (C dbAccess.c:576). Pre-fix this leaked."
    );
}
// When processing bails out because the record is disabled through SDIS, a
// pending put-notify sender stored on the record must still be fired and
// then cleared.
#[tokio::test]
async fn test_sdis_disable_fires_put_notify_completion() {
    let db = PvDatabase::new();
    db.add_record("DIS_NOT_SW", Box::new(AoRecord::new(1.0)))
        .await
        .unwrap();
    db.add_record("DIS_NOT_TGT", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    if let Some(rec) = db.get_record("DIS_NOT_TGT").await {
        let mut inst = rec.write().await;
        inst.put_common_field("SDIS", EpicsValue::String("DIS_NOT_SW".into()))
            .unwrap();
    }
    // Park a put-notify completion sender on the target record.
    let (tx, rx) = epics_base_rs::runtime::sync::oneshot::channel();
    {
        let rec = db.get_record("DIS_NOT_TGT").await.unwrap();
        let mut inst = rec.write().await;
        inst.put_notify_tx = Some(tx);
    }
    let mut visited = HashSet::new();
    db.process_record_with_links("DIS_NOT_TGT", &mut visited, 0)
        .await
        .unwrap();
    // Bound the wait: if the sender stays stored on the record without being
    // fired or dropped, a bare `rx.await` would never resolve and the test
    // would hang instead of failing.
    tokio::time::timeout(std::time::Duration::from_secs(5), rx)
        .await
        .expect("timed out waiting for put_notify_tx to fire on disable bail")
        .expect("disable bail must fire put_notify_tx (C dbAccess.c:622)");
    let rec = db.get_record("DIS_NOT_TGT").await.unwrap();
    assert!(
        rec.read().await.put_notify_tx.is_none(),
        "put_notify_tx must be cleared after firing"
    );
}
// Processing a record with SDIS pointing at a record holding 1.0 and DISS
// set to 1 must deliver an event to an ALARM subscriber on VAL.
#[tokio::test]
async fn test_sdis_disable_notifies_alarm() {
    let db = PvDatabase::new();
    db.add_record("DISABLE_SW", Box::new(AoRecord::new(1.0)))
        .await
        .unwrap();
    db.add_record("TARGET", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    if let Some(handle) = db.get_record("TARGET").await {
        let mut guard = handle.write().await;
        guard
            .put_common_field("SDIS", EpicsValue::String("DISABLE_SW".into()))
            .unwrap();
        guard
            .put_common_field("DISS", EpicsValue::Short(1))
            .unwrap();
    }
    let mut alarm_events = {
        let handle = db.get_record("TARGET").await.unwrap();
        let mut guard = handle.write().await;
        let subscription = guard.add_subscriber(
            "VAL",
            1,
            epics_base_rs::types::DbFieldType::Double,
            EventMask::ALARM.bits(),
        );
        subscription.expect("subscribe should not be capped at default")
    };
    let mut seen = HashSet::new();
    db.process_record_with_links("TARGET", &mut seen, 0)
        .await
        .unwrap();
    assert!(alarm_events.try_recv().is_ok());
}
// A freshly added record starts with UDF set; one processing pass must
// clear it.
#[tokio::test]
async fn test_udf_cleared_by_process_with_links() {
    let db = PvDatabase::new();
    db.add_record("REC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    let handle = db.get_record("REC").await.unwrap();
    let udf_before = handle.read().await.common.udf;
    assert!(udf_before);
    let mut seen = HashSet::new();
    db.process_record_with_links("REC", &mut seen, 0)
        .await
        .unwrap();
    let udf_after = handle.read().await.common.udf;
    assert!(!udf_after);
}
// A record type whose `clears_udf` returns false must keep UDF set after a
// processing pass.
#[tokio::test]
async fn test_udf_not_cleared_by_clears_udf_false() {
    /// Record exposing a single `VAL` field and opting out of UDF clearing.
    struct NoClearRecord {
        val: f64,
    }
    impl Record for NoClearRecord {
        fn record_type(&self) -> &'static str {
            "noclear"
        }
        fn get_field(&self, name: &str) -> Option<EpicsValue> {
            if name == "VAL" {
                Some(EpicsValue::Double(self.val))
            } else {
                None
            }
        }
        fn put_field(
            &mut self,
            name: &str,
            value: EpicsValue,
        ) -> epics_base_rs::error::CaResult<()> {
            if name != "VAL" {
                return Err(CaError::FieldNotFound(name.into()));
            }
            match value {
                EpicsValue::Double(v) => {
                    self.val = v;
                    Ok(())
                }
                _ => Err(CaError::InvalidValue("bad".into())),
            }
        }
        fn field_list(&self) -> &'static [FieldDesc] {
            &[]
        }
        fn clears_udf(&self) -> bool {
            false
        }
    }

    let db = PvDatabase::new();
    db.add_record("REC", Box::new(NoClearRecord { val: 0.0 }))
        .await
        .unwrap();
    let handle = db.get_record("REC").await.unwrap();
    let udf_before = handle.read().await.common.udf;
    assert!(udf_before);
    let mut seen = HashSet::new();
    db.process_record_with_links("REC", &mut seen, 0)
        .await
        .unwrap();
    let udf_after = handle.read().await.common.udf;
    assert!(udf_after);
}
// An ai record whose INP is the numeric literal "3.15" must read back 3.15
// after processing.
#[tokio::test]
async fn test_constant_inp_link() {
    let db = PvDatabase::new();
    db.add_record("AI_CONST", Box::new(AiRecord::new(0.0)))
        .await
        .unwrap();
    if let Some(handle) = db.get_record("AI_CONST").await {
        let mut guard = handle.write().await;
        guard
            .put_common_field("INP", EpicsValue::String("3.15".into()))
            .unwrap();
    }
    let mut seen = HashSet::new();
    db.process_record_with_links("AI_CONST", &mut seen, 0)
        .await
        .unwrap();
    match db.get_pv("AI_CONST").await.unwrap() {
        EpicsValue::Double(actual) => assert!((actual - 3.15).abs() < 1e-10),
        unexpected => panic!("expected Double(3.15), got {:?}", unexpected),
    }
}
// A calc record computing "A+B" with INPA/INPB linked to records holding
// 10.0 and 20.0 must evaluate to 30.0.
#[tokio::test]
async fn test_calc_multi_input_db_links() {
    use epics_base_rs::server::records::calc::CalcRecord;
    let db = PvDatabase::new();
    db.add_record("SRC_A", Box::new(AoRecord::new(10.0)))
        .await
        .unwrap();
    db.add_record("SRC_B", Box::new(AoRecord::new(20.0)))
        .await
        .unwrap();
    let mut adder = CalcRecord::new("A+B");
    adder.inpa = String::from("SRC_A");
    adder.inpb = String::from("SRC_B");
    db.add_record("CALC_REC", Box::new(adder)).await.unwrap();
    let mut seen = HashSet::new();
    db.process_record_with_links("CALC_REC", &mut seen, 0)
        .await
        .unwrap();
    match db.get_pv("CALC_REC").await.unwrap() {
        EpicsValue::Double(actual) => assert!((actual - 30.0).abs() < 1e-10),
        unexpected => panic!("expected Double(30.0), got {:?}", unexpected),
    }
}
// An INPA link carrying the PP modifier must cause the source calc record
// to be processed first, so both source and destination end up at 42.
#[tokio::test]
async fn test_calc_multi_input_pp_processes_passive_source() {
    use epics_base_rs::server::records::calc::CalcRecord;
    let db = PvDatabase::new();
    // The source only yields 42 once it has actually been processed.
    db.add_record("PP_SRC", Box::new(CalcRecord::new("42")))
        .await
        .unwrap();
    let mut consumer = CalcRecord::new("A");
    consumer.inpa = String::from("PP_SRC PP");
    db.add_record("PP_DST", Box::new(consumer)).await.unwrap();
    let mut seen = HashSet::new();
    db.process_record_with_links("PP_DST", &mut seen, 0)
        .await
        .unwrap();
    match db.get_pv("PP_DST").await.unwrap() {
        EpicsValue::Double(v) => assert!(
            (v - 42.0).abs() < 1e-10,
            "PP multi-input link must process source first: expected 42, got {v}"
        ),
        other => panic!("expected Double(42.0), got {other:?}"),
    }
    match db.get_pv("PP_SRC").await.unwrap() {
        EpicsValue::Double(v) => assert!(
            (v - 42.0).abs() < 1e-10,
            "PP_SRC must have been processed by the PP link, VAL={v}"
        ),
        other => panic!("expected Double(42.0), got {other:?}"),
    }
}
// An INPA link carrying the NPP modifier must not process the source, so
// the destination reads the source's unprocessed value (0).
#[tokio::test]
async fn test_calc_multi_input_npp_does_not_process_source() {
    use epics_base_rs::server::records::calc::CalcRecord;
    let db = PvDatabase::new();
    db.add_record("NPP_SRC", Box::new(CalcRecord::new("42")))
        .await
        .unwrap();
    let mut consumer = CalcRecord::new("A");
    consumer.inpa = String::from("NPP_SRC NPP");
    db.add_record("NPP_DST", Box::new(consumer)).await.unwrap();
    let mut seen = HashSet::new();
    db.process_record_with_links("NPP_DST", &mut seen, 0)
        .await
        .unwrap();
    match db.get_pv("NPP_DST").await.unwrap() {
        EpicsValue::Double(v) => assert!(
            v.abs() < 1e-10,
            "NPP multi-input link must NOT process source: expected 0, got {v}"
        ),
        other => panic!("expected Double(0.0), got {other:?}"),
    }
}
// Two calc records that read each other through PP input links form a
// cycle; processing one of them must return Ok and leave finite values.
#[tokio::test]
async fn test_calc_pp_link_cycle_terminates() {
    use epics_base_rs::server::records::calc::CalcRecord;
    let db = PvDatabase::new();
    let mut first = CalcRecord::new("A");
    first.inpa = String::from("CALC_B PP");
    db.add_record("CALC_A", Box::new(first)).await.unwrap();
    let mut second = CalcRecord::new("A");
    second.inpa = String::from("CALC_A PP");
    db.add_record("CALC_B", Box::new(second)).await.unwrap();
    let mut seen = HashSet::new();
    let result = db.process_record_with_links("CALC_A", &mut seen, 0).await;
    assert!(
        result.is_ok(),
        "PP-link A<->B cycle must terminate cleanly, got {result:?}"
    );
    let value_a = db.get_pv("CALC_A").await.unwrap();
    let value_b = db.get_pv("CALC_B").await.unwrap();
    match (value_a, value_b) {
        (EpicsValue::Double(x), EpicsValue::Double(y)) => {
            assert!(
                x.is_finite() && y.is_finite(),
                "cycle must leave finite values, got A={x} B={y}"
            );
        }
        other => panic!("expected Double values, got {other:?}"),
    }
}
// A calc record computing "A+B" with constant inputs "5" and "3.5" must
// evaluate to 8.5.
#[tokio::test]
async fn test_calc_constant_inputs() {
    use epics_base_rs::server::records::calc::CalcRecord;
    let db = PvDatabase::new();
    let mut adder = CalcRecord::new("A+B");
    adder.inpa = String::from("5");
    adder.inpb = String::from("3.5");
    db.add_record("CALC_CONST", Box::new(adder)).await.unwrap();
    let mut seen = HashSet::new();
    db.process_record_with_links("CALC_CONST", &mut seen, 0)
        .await
        .unwrap();
    match db.get_pv("CALC_CONST").await.unwrap() {
        EpicsValue::Double(actual) => assert!((actual - 8.5).abs() < 1e-10),
        unexpected => panic!("expected Double(8.5), got {:?}", unexpected),
    }
}
/// Writes `HIHI = 10` and `HHSV = "MAJOR"` to a calc record through the CA put
/// path, checks that `HIHI` reads back as 10, then processes the record (whose
/// `INPA` is `"15"`) and requires `SEVR == Major` and `STAT == HIHI_ALARM`.
#[tokio::test]
async fn test_calc_record_has_analog_alarm_limits() {
    use epics_base_rs::server::recgbl::alarm_status;
    use epics_base_rs::server::record::AlarmSeverity;
    use epics_base_rs::server::records::calc::CalcRecord;

    let db = PvDatabase::new();

    let mut limited = CalcRecord::new("A");
    limited.inpa = String::from("15");
    db.add_record("CALC_LIM", Box::new(limited)).await.unwrap();

    // Configure the limit first, then its severity.
    let alarm_setup = [
        ("HIHI", EpicsValue::Double(10.0)),
        ("HHSV", EpicsValue::String("MAJOR".into())),
    ];
    for (field, setting) in alarm_setup {
        db.put_record_field_from_ca("CALC_LIM", field, setting)
            .await
            .unwrap();
    }

    // The read guard is a temporary of this statement, so it is released
    // before the record is processed below.
    let hihi = db
        .get_record("CALC_LIM")
        .await
        .unwrap()
        .read()
        .await
        .resolve_field("HIHI")
        .and_then(|v| v.to_f64())
        .unwrap();
    assert_eq!(hihi, 10.0);

    let mut visited = HashSet::new();
    db.process_record_with_links("CALC_LIM", &mut visited, 0)
        .await
        .unwrap();

    let handle = db.get_record("CALC_LIM").await.unwrap();
    let state = handle.read().await;
    assert_eq!(
        state.common.sevr,
        AlarmSeverity::Major,
        "VAL=15, HIHI=10, HHSV=MAJOR — must raise HIHI alarm",
    );
    assert_eq!(state.common.stat, alarm_status::HIHI_ALARM);
}
/// Processes a calc record configured with `AFTC = 5.0`, `HIHI = 10` and
/// `HHSV = "MAJOR"` twice, attempting a `VAL = 15` put between the two passes.
/// The only assertion is that `AFVL` is non-zero after the second pass.
#[tokio::test]
async fn test_calc_record_aftc_filter_delays_alarm() {
    use epics_base_rs::server::records::calc::CalcRecord;

    let db = PvDatabase::new();

    let mut filtered = CalcRecord::new("A");
    filtered.inpa = String::from("1");
    filtered.aftc = 5.0;
    db.add_record("CALC_AFTC", Box::new(filtered)).await.unwrap();

    let alarm_setup = [
        ("HIHI", EpicsValue::Double(10.0)),
        ("HHSV", EpicsValue::String("MAJOR".into())),
    ];
    for (field, setting) in alarm_setup {
        db.put_record_field_from_ca("CALC_AFTC", field, setting)
            .await
            .unwrap();
    }

    // First processing pass.
    let mut first_pass = HashSet::new();
    db.process_record_with_links("CALC_AFTC", &mut first_pass, 0)
        .await
        .unwrap();

    let rec = db.get_record("CALC_AFTC").await.unwrap();
    // The put result is intentionally discarded, as in the original test; the
    // write guard only lives for this statement.
    let _ = rec
        .write()
        .await
        .record
        .put_field("VAL", EpicsValue::Double(15.0));

    // Second processing pass.
    let mut second_pass = HashSet::new();
    db.process_record_with_links("CALC_AFTC", &mut second_pass, 0)
        .await
        .unwrap();

    let afvl = rec
        .read()
        .await
        .record
        .get_field("AFVL")
        .and_then(|v| v.to_f64())
        .unwrap_or(0.0);
    assert!(afvl != 0.0, "AFVL must be updated when AFTC > 0");
}
/// A fanout record with `SELM = 0` and two links (`LNK1`, `LNK2`) is
/// processed; the fanout itself and both link targets must appear in the
/// visited set.
#[tokio::test]
async fn test_fanout_all() {
    use epics_base_rs::server::records::fanout::FanoutRecord;

    let db = PvDatabase::new();

    let mut fanout = FanoutRecord::new();
    fanout.selm = 0;
    fanout.lnk1 = String::from("TARGET_1");
    fanout.lnk2 = String::from("TARGET_2");
    db.add_record("FANOUT", Box::new(fanout)).await.unwrap();

    for target in ["TARGET_1", "TARGET_2"] {
        db.add_record(target, Box::new(AoRecord::new(0.0)))
            .await
            .unwrap();
    }

    let mut visited = HashSet::new();
    db.process_record_with_links("FANOUT", &mut visited, 0)
        .await
        .unwrap();

    assert!(visited.contains("FANOUT"));
    assert!(visited.contains("TARGET_1"));
    assert!(visited.contains("TARGET_2"));
}
/// A fanout record with `SELM = 1` and `SELN = 1` has `LNK1 -> T1` and
/// `LNK2 -> T2`. After processing, `T1` must be in the visited set and `T2`
/// must not.
#[tokio::test]
async fn test_fanout_specified() {
    use epics_base_rs::server::records::fanout::FanoutRecord;

    let db = PvDatabase::new();

    let mut fanout = FanoutRecord::new();
    fanout.selm = 1;
    fanout.seln = 1;
    db.add_record("FANOUT", Box::new(fanout)).await.unwrap();

    for target in ["T1", "T2"] {
        db.add_record(target, Box::new(AoRecord::new(0.0)))
            .await
            .unwrap();
    }

    // The links are installed after registration, via the record's put_field.
    if let Some(handle) = db.get_record("FANOUT").await {
        let mut guard = handle.write().await;
        for (link, target) in [("LNK1", "T1"), ("LNK2", "T2")] {
            guard
                .record
                .put_field(link, EpicsValue::String(target.into()))
                .unwrap();
        }
    }

    let mut visited = HashSet::new();
    db.process_record_with_links("FANOUT", &mut visited, 0)
        .await
        .unwrap();

    assert!(visited.contains("FANOUT"));
    assert!(visited.contains("T1"));
    assert!(!visited.contains("T2"));
}
/// A dfanout record created with value 42.0, `SELM = 0`, `OUTA = DEST_A` and
/// `OUTB = DEST_B` is processed; both destinations must then read 42.0.
#[tokio::test]
async fn test_dfanout_value_write() {
    use epics_base_rs::server::records::dfanout::DfanoutRecord;

    let db = PvDatabase::new();

    let mut dfan = DfanoutRecord::new(42.0);
    dfan.selm = 0;
    dfan.outa = String::from("DEST_A");
    dfan.outb = String::from("DEST_B");
    db.add_record("DFAN", Box::new(dfan)).await.unwrap();

    for dest in ["DEST_A", "DEST_B"] {
        db.add_record(dest, Box::new(AoRecord::new(0.0)))
            .await
            .unwrap();
    }

    let mut visited = HashSet::new();
    db.process_record_with_links("DFAN", &mut visited, 0)
        .await
        .unwrap();

    // Same expectation for both outputs, checked in A-then-B order.
    for dest in ["DEST_A", "DEST_B"] {
        match db.get_pv(dest).await.unwrap() {
            EpicsValue::Double(v) => assert!((v - 42.0).abs() < 1e-10),
            unexpected => panic!("expected Double(42.0), got {:?}", unexpected),
        }
    }
}
/// A dfanout record with `OMSL = 1` and `DOL = DOL_SRC` (an ao holding 7.5)
/// starts at 0.0. After processing, both output destinations must read 7.5.
#[tokio::test]
async fn test_dfanout_omsl_closed_loop_sources_val_from_dol() {
    use epics_base_rs::server::records::dfanout::DfanoutRecord;

    let db = PvDatabase::new();
    db.add_record("DOL_SRC", Box::new(AoRecord::new(7.5)))
        .await
        .unwrap();

    let mut dfan = DfanoutRecord::new(0.0);
    dfan.selm = 0;
    dfan.outa = String::from("DFAN_DEST_A");
    dfan.outb = String::from("DFAN_DEST_B");
    dfan.dol = String::from("DOL_SRC");
    dfan.omsl = 1;
    db.add_record("DFAN_OMSL", Box::new(dfan)).await.unwrap();

    for dest in ["DFAN_DEST_A", "DFAN_DEST_B"] {
        db.add_record(dest, Box::new(AoRecord::new(0.0)))
            .await
            .unwrap();
    }

    let mut visited = HashSet::new();
    db.process_record_with_links("DFAN_OMSL", &mut visited, 0)
        .await
        .unwrap();

    let val_a = db.get_pv("DFAN_DEST_A").await.unwrap();
    let a_is_dol_value = match &val_a {
        EpicsValue::Double(v) => (*v - 7.5).abs() < 1e-10,
        _ => false,
    };
    assert!(
        a_is_dol_value,
        "DFAN_DEST_A must reflect DOL_SRC (=7.5), got {val_a:?}"
    );

    let val_b = db.get_pv("DFAN_DEST_B").await.unwrap();
    let b_is_dol_value = match &val_b {
        EpicsValue::Double(v) => (*v - 7.5).abs() < 1e-10,
        _ => false,
    };
    assert!(
        b_is_dol_value,
        "DFAN_DEST_B must reflect DOL_SRC (=7.5), got {val_b:?}"
    );
}
/// A dfanout record with `OMSL = 0` keeps its own value (3.0) even though
/// `DOL` points at an ao holding 99.0: after processing, the output
/// destination must read 3.0.
#[tokio::test]
async fn test_dfanout_omsl_supervisory_ignores_dol() {
    use epics_base_rs::server::records::dfanout::DfanoutRecord;

    let db = PvDatabase::new();
    db.add_record("DOL_SRC2", Box::new(AoRecord::new(99.0)))
        .await
        .unwrap();

    let mut dfan = DfanoutRecord::new(3.0);
    dfan.selm = 0;
    dfan.outa = String::from("DFAN_DEST_A2");
    dfan.dol = String::from("DOL_SRC2");
    dfan.omsl = 0;
    db.add_record("DFAN_SUP", Box::new(dfan)).await.unwrap();

    db.add_record("DFAN_DEST_A2", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();

    let mut visited = HashSet::new();
    db.process_record_with_links("DFAN_SUP", &mut visited, 0)
        .await
        .unwrap();

    let val_a = db.get_pv("DFAN_DEST_A2").await.unwrap();
    let kept_staged_value = match &val_a {
        EpicsValue::Double(v) => (*v - 3.0).abs() < 1e-10,
        _ => false,
    };
    assert!(
        kept_staged_value,
        "OMSL=supervisory must keep the operator-staged VAL (=3.0), got {val_a:?}"
    );
}
/// A seq record with `SELM = 0` pairs `DOL1 -> LNK1` and `DOL2 -> LNK2`.
/// After processing, each destination must hold the value of its paired
/// source (100.0 and 200.0).
#[tokio::test]
async fn test_seq_dol_lnk_dispatch() {
    use epics_base_rs::server::records::seq::SeqRecord;

    let db = PvDatabase::new();

    // Sources first, then destinations — same registration order as before.
    let fixtures = [
        ("SEQ_SRC1", 100.0),
        ("SEQ_SRC2", 200.0),
        ("SEQ_DEST1", 0.0),
        ("SEQ_DEST2", 0.0),
    ];
    for (name, initial) in fixtures {
        db.add_record(name, Box::new(AoRecord::new(initial)))
            .await
            .unwrap();
    }

    let mut seq = SeqRecord::new();
    seq.selm = 0;
    seq.dol1 = String::from("SEQ_SRC1");
    seq.lnk1 = String::from("SEQ_DEST1");
    seq.dol2 = String::from("SEQ_SRC2");
    seq.lnk2 = String::from("SEQ_DEST2");
    db.add_record("SEQ_REC", Box::new(seq)).await.unwrap();

    let mut visited = HashSet::new();
    db.process_record_with_links("SEQ_REC", &mut visited, 0)
        .await
        .unwrap();

    match db.get_pv("SEQ_DEST1").await.unwrap() {
        EpicsValue::Double(v) => assert!((v - 100.0).abs() < 1e-10),
        unexpected => panic!("expected Double(100.0), got {:?}", unexpected),
    }
    match db.get_pv("SEQ_DEST2").await.unwrap() {
        EpicsValue::Double(v) => assert!((v - 200.0).abs() < 1e-10),
        unexpected => panic!("expected Double(200.0), got {:?}", unexpected),
    }
}
struct CountingTarget {
process_count: Arc<AtomicU32>,
}
impl Record for CountingTarget {
fn record_type(&self) -> &'static str {
"counting_target"
}
fn process(&mut self) -> epics_base_rs::error::CaResult<ProcessOutcome> {
self.process_count.fetch_add(1, Ordering::SeqCst);
Ok(ProcessOutcome::complete())
}
fn get_field(&self, _name: &str) -> Option<EpicsValue> {
None
}
fn put_field(&mut self, _name: &str, _value: EpicsValue) -> epics_base_rs::error::CaResult<()> {
Ok(())
}
fn field_list(&self) -> &'static [FieldDesc] {
&[]
}
}
/// A seq record writes to two `CountingTarget` records: `LNK1` is a bare link
/// and `LNK2` carries an explicit `PP`. After processing the seq record, the
/// bare target's process counter must be 0 and the PP target's must be 1.
#[tokio::test]
async fn test_seq_bare_lnk_does_not_process_passive_target() {
    use epics_base_rs::server::records::seq::SeqRecord;

    let db = PvDatabase::new();
    let bare_count = Arc::new(AtomicU32::new(0));
    let pp_count = Arc::new(AtomicU32::new(0));

    // Register the two counting targets, bare first.
    for (name, counter) in [("SEQ_BARE_TGT", &bare_count), ("SEQ_PP_TGT", &pp_count)] {
        let target = CountingTarget {
            process_count: Arc::clone(counter),
        };
        db.add_record(name, Box::new(target)).await.unwrap();
    }

    let mut seq = SeqRecord::new();
    seq.selm = 0;
    seq.do1 = 11.0;
    seq.lnk1 = String::from("SEQ_BARE_TGT");
    seq.do2 = 22.0;
    seq.lnk2 = String::from("SEQ_PP_TGT PP");
    db.add_record("SEQ_NPP_REC", Box::new(seq)).await.unwrap();

    let mut visited = HashSet::new();
    db.process_record_with_links("SEQ_NPP_REC", &mut visited, 0)
        .await
        .unwrap();

    let bare_runs = bare_count.load(Ordering::SeqCst);
    assert_eq!(
        bare_runs, 0,
        "bare seq LNKn (NPP) must NOT process its Passive target"
    );
    let pp_runs = pp_count.load(Ordering::SeqCst);
    assert_eq!(
        pp_runs, 1,
        "explicit-PP seq LNKn must process its Passive target"
    );
}
/// Same scenario as the seq variant, using an sseq record configured through
/// `put_field`: the bare `LNK1` target must not be processed (counter 0),
/// the explicit-`PP` `LNK2` target must be processed exactly once.
#[tokio::test]
async fn test_sseq_bare_lnk_does_not_process_passive_target() {
    use epics_base_rs::server::records::sseq::SseqRecord;

    let db = PvDatabase::new();
    let bare_count = Arc::new(AtomicU32::new(0));
    let pp_count = Arc::new(AtomicU32::new(0));

    // Register the two counting targets, bare first.
    for (name, counter) in [("SSEQ_BARE_TGT", &bare_count), ("SSEQ_PP_TGT", &pp_count)] {
        let target = CountingTarget {
            process_count: Arc::clone(counter),
        };
        db.add_record(name, Box::new(target)).await.unwrap();
    }

    let mut sseq = SseqRecord::new();
    sseq.selm = 0;
    let step_config = [
        ("DO1", EpicsValue::Double(11.0)),
        ("LNK1", EpicsValue::String("SSEQ_BARE_TGT".to_string())),
        ("DO2", EpicsValue::Double(22.0)),
        ("LNK2", EpicsValue::String("SSEQ_PP_TGT PP".to_string())),
    ];
    for (field, setting) in step_config {
        sseq.put_field(field, setting).unwrap();
    }
    db.add_record("SSEQ_NPP_REC", Box::new(sseq)).await.unwrap();

    let mut visited = HashSet::new();
    db.process_record_with_links("SSEQ_NPP_REC", &mut visited, 0)
        .await
        .unwrap();

    let bare_runs = bare_count.load(Ordering::SeqCst);
    assert_eq!(
        bare_runs, 0,
        "bare sseq LNKn (NPP) must NOT process its Passive target"
    );
    let pp_runs = pp_count.load(Ordering::SeqCst);
    assert_eq!(
        pp_runs, 1,
        "explicit-PP sseq LNKn must process its Passive target"
    );
}
/// An sseq record has `DLY1 = 0.3` on step 1 and `DLY2 = 0.0` on step 2.
/// Processing runs in a spawned task; 100 ms in, neither target may have been
/// written yet, and once the task finishes both targets must hold their
/// step values (11.0 and 22.0).
#[tokio::test]
async fn test_sseq_per_step_dly_delays_step_write() {
    use epics_base_rs::server::records::sseq::SseqRecord;

    let db = PvDatabase::new();
    for target in ["SSEQ_DLY_TGT1", "SSEQ_DLY_TGT2"] {
        db.add_record(target, Box::new(AoRecord::new(0.0)))
            .await
            .unwrap();
    }

    let mut sseq = SseqRecord::new();
    sseq.selm = 0;
    let step_config = [
        ("DLY1", EpicsValue::Double(0.3)),
        ("DO1", EpicsValue::Double(11.0)),
        ("LNK1", EpicsValue::String("SSEQ_DLY_TGT1 PP".to_string())),
        ("DLY2", EpicsValue::Double(0.0)),
        ("DO2", EpicsValue::Double(22.0)),
        ("LNK2", EpicsValue::String("SSEQ_DLY_TGT2 PP".to_string())),
    ];
    for (field, setting) in step_config {
        sseq.put_field(field, setting).unwrap();
    }
    db.add_record("SSEQ_DLY_REC", Box::new(sseq)).await.unwrap();

    // Run the processing in the background so the targets can be sampled
    // while the step-1 delay is still pending.
    let db_proc = db.clone();
    let processing = tokio::spawn(async move {
        let mut visited = HashSet::new();
        db_proc
            .process_record_with_links("SSEQ_DLY_REC", &mut visited, 0)
            .await
            .unwrap();
    });

    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    let early_tgt1 = db.get_pv("SSEQ_DLY_TGT1").await.unwrap();
    assert_eq!(
        early_tgt1,
        EpicsValue::Double(0.0),
        "step 1 LNKn must not fire before its DLY1 delay elapses"
    );
    let early_tgt2 = db.get_pv("SSEQ_DLY_TGT2").await.unwrap();
    assert_eq!(
        early_tgt2,
        EpicsValue::Double(0.0),
        "step 2 must not fire before step 1's delay completes"
    );

    processing.await.unwrap();
    let final_tgt1 = db.get_pv("SSEQ_DLY_TGT1").await.unwrap();
    assert_eq!(
        final_tgt1,
        EpicsValue::Double(11.0),
        "step 1 LNKn must fire after DLY1 elapses"
    );
    let final_tgt2 = db.get_pv("SSEQ_DLY_TGT2").await.unwrap();
    assert_eq!(
        final_tgt2,
        EpicsValue::Double(22.0),
        "step 2 LNKn must fire after step 1"
    );
}
/// A sel record with `SELM = 0` takes its index from `NVL = NVL_SRC` (an ao
/// holding 2.0). After processing, `SELN` must be `Short(2)` and the record
/// value must equal input `C` (30.0).
#[tokio::test]
async fn test_sel_nvl_link() {
    use epics_base_rs::server::records::sel::SelRecord;

    let db = PvDatabase::new();
    db.add_record("NVL_SRC", Box::new(AoRecord::new(2.0)))
        .await
        .unwrap();

    let mut selector = SelRecord::default();
    selector.selm = 0;
    selector.nvl = String::from("NVL_SRC");
    selector.a = 10.0;
    selector.b = 20.0;
    selector.c = 30.0;
    db.add_record("SEL_REC", Box::new(selector)).await.unwrap();

    let mut visited = HashSet::new();
    db.process_record_with_links("SEL_REC", &mut visited, 0)
        .await
        .unwrap();

    match db.get_pv("SEL_REC.SELN").await.unwrap() {
        EpicsValue::Short(v) => assert_eq!(v, 2),
        unexpected => panic!("expected Short(2), got {:?}", unexpected),
    }
    match db.get_pv("SEL_REC").await.unwrap() {
        EpicsValue::Double(v) => assert!((v - 30.0).abs() < 1e-10),
        unexpected => panic!("expected Double(30.0), got {:?}", unexpected),
    }
}
/// An ao with `OMSL = 1` and `DOL = "MTR CP"` must be reported as the only CP
/// target of `MTR` once `setup_cp_links` has run.
#[tokio::test]
async fn test_dol_cp_link_registration() {
    let db = PvDatabase::new();
    db.add_record("MTR", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();

    let mut follower = AoRecord::new(0.0);
    follower.omsl = 1;
    follower.dol = String::from("MTR CP");
    db.add_record("MOTOR_POS", Box::new(follower)).await.unwrap();

    db.setup_cp_links().await;

    let cp_targets = db.get_cp_targets("MTR").await;
    assert_eq!(cp_targets, vec!["MOTOR_POS"]);
}
/// With `DST.DOL = "SRC CP"` (and `OMSL = 1`) registered through
/// `setup_cp_links`, processing `SRC` (value 10.0) must leave `DST` at 10.0.
#[tokio::test]
async fn test_dol_cp_link_triggers_processing() {
    let db = PvDatabase::new();
    db.add_record("SRC", Box::new(AoRecord::new(10.0)))
        .await
        .unwrap();

    let mut follower = AoRecord::new(0.0);
    follower.omsl = 1;
    follower.dol = String::from("SRC CP");
    db.add_record("DST", Box::new(follower)).await.unwrap();

    db.setup_cp_links().await;

    let mut visited = HashSet::new();
    db.process_record_with_links("SRC", &mut visited, 0)
        .await
        .unwrap();

    match db.get_pv("DST").await.unwrap() {
        EpicsValue::Double(v) => assert!((v - 10.0).abs() < 1e-10),
        unexpected => panic!("expected Double(10.0), got {:?}", unexpected),
    }
}
/// A seq record whose `DOL1` is `"SENSOR CP"` must be reported as the only CP
/// target of `SENSOR` once `setup_cp_links` has run.
#[tokio::test]
async fn test_seq_dol_cp_link_registration() {
    use epics_base_rs::server::records::seq::SeqRecord;

    let db = PvDatabase::new();
    db.add_record("SENSOR", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();

    let mut listener = SeqRecord::default();
    listener.dol1 = String::from("SENSOR CP");
    db.add_record("MY_SEQ", Box::new(listener)).await.unwrap();

    db.setup_cp_links().await;

    let cp_targets = db.get_cp_targets("SENSOR").await;
    assert_eq!(cp_targets, vec!["MY_SEQ"]);
}
/// A sel record whose `NVL` is `"INDEX_SRC CP"` must be reported as the only
/// CP target of `INDEX_SRC` once `setup_cp_links` has run.
#[tokio::test]
async fn test_sel_nvl_cp_link_registration() {
    use epics_base_rs::server::records::sel::SelRecord;

    let db = PvDatabase::new();
    db.add_record("INDEX_SRC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();

    let mut listener = SelRecord::default();
    listener.nvl = String::from("INDEX_SRC CP");
    db.add_record("MY_SEL", Box::new(listener)).await.unwrap();

    db.setup_cp_links().await;

    let cp_targets = db.get_cp_targets("INDEX_SRC").await;
    assert_eq!(cp_targets, vec!["MY_SEL"]);
}
/// Setting the common `SDIS` field of `GUARDED` to `"DISABLE_SRC CP"` must
/// make `GUARDED` the only CP target of `DISABLE_SRC` after `setup_cp_links`.
#[tokio::test]
async fn test_sdis_cp_link_registration() {
    let db = PvDatabase::new();
    for name in ["DISABLE_SRC", "GUARDED"] {
        db.add_record(name, Box::new(AoRecord::new(0.0)))
            .await
            .unwrap();
    }

    // SDIS is written directly on the common block, not through a put.
    if let Some(handle) = db.get_record("GUARDED").await {
        let mut guard = handle.write().await;
        guard.common.sdis = String::from("DISABLE_SRC CP");
    }

    db.setup_cp_links().await;

    let cp_targets = db.get_cp_targets("DISABLE_SRC").await;
    assert_eq!(cp_targets, vec!["GUARDED"]);
}
/// With `TSE = -1` and a pre-seeded record time, processing the record must
/// replace that time with a different one.
#[tokio::test]
async fn test_tse_minus1_always_overwrites_via_best_time() {
    use std::time::{Duration, SystemTime};

    let db = PvDatabase::new();
    db.add_record("REC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();

    // Seed a recognisable old timestamp so an overwrite is detectable.
    let stale = SystemTime::UNIX_EPOCH + Duration::from_secs(1234567);
    if let Some(handle) = db.get_record("REC").await {
        let mut guard = handle.write().await;
        guard.common.tse = -1;
        guard.common.time = stale;
    }

    let mut visited = HashSet::new();
    db.process_record_with_links("REC", &mut visited, 0)
        .await
        .unwrap();

    let handle = db.get_record("REC").await.unwrap();
    let after = handle.read().await;
    assert_ne!(
        after.common.time, stale,
        "TSE=-1 must always overwrite via generalTime BestTime, matching \
         C `epicsTimeGetEvent(-1)` called unconditionally"
    );
}
/// With `TSE = -2` and a pre-seeded record time, processing the record must
/// leave that time exactly as it was.
#[tokio::test]
async fn test_tse_minus2_keeps_time_unchanged() {
    use std::time::{Duration, SystemTime};

    let db = PvDatabase::new();
    db.add_record("REC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();

    let fixed_time = SystemTime::UNIX_EPOCH + Duration::from_secs(999);
    if let Some(handle) = db.get_record("REC").await {
        let mut guard = handle.write().await;
        guard.common.tse = -2;
        guard.common.time = fixed_time;
    }

    let mut visited = HashSet::new();
    db.process_record_with_links("REC", &mut visited, 0)
        .await
        .unwrap();

    let handle = db.get_record("REC").await.unwrap();
    let after = handle.read().await;
    assert_eq!(after.common.time, fixed_time);
}
/// A CA-path put to the `PUTF` field must be rejected with an error.
#[tokio::test]
async fn test_putf_read_only_from_ca() {
    let db = PvDatabase::new();
    let target = Box::new(AoRecord::new(0.0));
    db.add_record("REC", target).await.unwrap();

    let attempted = EpicsValue::Char(1);
    let result = db
        .put_record_field_from_ca("REC", "PUTF", attempted)
        .await;
    assert!(result.is_err());
}
/// `DEST` (ai) reads `SRC` (ao) through `INP`. After a first pass `DEST` is
/// 10; `SRC` is then changed to 20 without processing, `RPRO` is set on
/// `DEST`, and a second pass must yield 20 with `RPRO` cleared again.
#[tokio::test]
async fn test_rpro_causes_reprocessing() {
    let db = PvDatabase::new();
    db.add_record("SRC", Box::new(AoRecord::new(10.0)))
        .await
        .unwrap();
    db.add_record("DEST", Box::new(AiRecord::new(0.0)))
        .await
        .unwrap();

    if let Some(handle) = db.get_record("DEST").await {
        handle
            .write()
            .await
            .put_common_field("INP", EpicsValue::String("SRC".into()))
            .unwrap();
    }

    // First pass: DEST picks up the initial source value.
    let mut first_pass = HashSet::new();
    db.process_record_with_links("DEST", &mut first_pass, 0)
        .await
        .unwrap();
    let first_reading = db.get_pv("DEST").await.unwrap();
    assert_eq!(first_reading.to_f64().unwrap() as i64, 10);

    // Change the source silently and request reprocessing on DEST.
    db.put_pv_no_process("SRC", EpicsValue::Double(20.0))
        .await
        .unwrap();
    if let Some(handle) = db.get_record("DEST").await {
        handle.write().await.common.rpro = true;
    }

    // Second pass: DEST must reflect the new source value.
    let mut second_pass = HashSet::new();
    db.process_record_with_links("DEST", &mut second_pass, 0)
        .await
        .unwrap();
    let second_reading = db.get_pv("DEST").await.unwrap();
    assert_eq!(second_reading.to_f64().unwrap() as i64, 20);

    let rec = db.get_record("DEST").await.unwrap();
    let inst = rec.read().await;
    assert!(!inst.common.rpro);
}
/// Setting `TSEL = "TSE_SRC CP"` on `TARGET` (and refreshing its parsed link)
/// must make `TARGET` the only CP target of `TSE_SRC` after `setup_cp_links`.
#[tokio::test]
async fn test_tsel_cp_link_registration() {
    let db = PvDatabase::new();
    db.add_record("TSE_SRC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    db.add_record("TARGET", Box::new(AiRecord::new(0.0)))
        .await
        .unwrap();

    if let Some(handle) = db.get_record("TARGET").await {
        let mut guard = handle.write().await;
        guard.common.tsel = String::from("TSE_SRC CP");
        // Keep the cached parsed link in step with the raw TSEL string.
        let reparsed = parse_link_v2(&guard.common.tsel);
        guard.parsed_tsel = reparsed;
    }

    db.setup_cp_links().await;

    let cp_targets = db.get_cp_targets("TSE_SRC").await;
    assert_eq!(cp_targets, vec!["TARGET"]);
}
/// Walks a set of common fields on a fresh ao record, checking the initial
/// value of each and, where a put is made, the value read back afterwards.
/// `PUTF` is the one field whose put must fail.
///
/// Each lock guard is a temporary of a single statement, so no read guard is
/// still held when a write guard is requested.
#[tokio::test]
async fn test_new_common_fields_get_put() {
    let db = PvDatabase::new();
    db.add_record("REC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    let rec = db.get_record("REC").await.unwrap();

    // UDFS: starts at 3, accepts a put.
    assert_eq!(
        rec.read().await.get_common_field("UDFS"),
        Some(EpicsValue::Short(3))
    );
    rec.write()
        .await
        .put_common_field("UDFS", EpicsValue::Short(1))
        .unwrap();
    assert_eq!(
        rec.read().await.get_common_field("UDFS"),
        Some(EpicsValue::Short(1))
    );

    // SSCN: initial value only.
    assert_eq!(
        rec.read().await.get_common_field("SSCN"),
        Some(EpicsValue::Enum(0))
    );

    // BKPT: starts at 0, accepts a put.
    assert_eq!(
        rec.read().await.get_common_field("BKPT"),
        Some(EpicsValue::Char(0))
    );
    rec.write()
        .await
        .put_common_field("BKPT", EpicsValue::Char(1))
        .unwrap();
    assert_eq!(
        rec.read().await.get_common_field("BKPT"),
        Some(EpicsValue::Char(1))
    );

    // TSE / TSEL: initial values only.
    assert_eq!(
        rec.read().await.get_common_field("TSE"),
        Some(EpicsValue::Short(0))
    );
    assert_eq!(
        rec.read().await.get_common_field("TSEL"),
        Some(EpicsValue::String(String::new()))
    );

    // PUTF: readable, but a put must be rejected.
    assert_eq!(
        rec.read().await.get_common_field("PUTF"),
        Some(EpicsValue::Char(0))
    );
    let result = rec
        .write()
        .await
        .put_common_field("PUTF", EpicsValue::Char(1));
    assert!(result.is_err());

    // RPRO: starts at 0, accepts a put.
    assert_eq!(
        rec.read().await.get_common_field("RPRO"),
        Some(EpicsValue::Char(0))
    );
    rec.write()
        .await
        .put_common_field("RPRO", EpicsValue::Char(1))
        .unwrap();
    assert_eq!(
        rec.read().await.get_common_field("RPRO"),
        Some(EpicsValue::Char(1))
    );
}
/// For each array record kind (waveform, aai, aao, subArray): subscribe to
/// `NORD`, set a three-element value, process the record, and require a
/// `NORD` event carrying `Long(3)` whose timestamp is neither the Unix epoch
/// nor earlier than a baseline taken before the subscription.
#[tokio::test]
async fn test_array_records_nord_monitor_uses_post_process_timestamp() {
    use epics_base_rs::server::recgbl::EventMask;
    use epics_base_rs::server::records::waveform::{ArrayKind, WaveformRecord};
    use epics_base_rs::types::DbFieldType;
    use std::time::SystemTime;

    let cases = [
        (ArrayKind::Waveform, "WF_KIND"),
        (ArrayKind::Aai, "AAI_KIND"),
        (ArrayKind::Aao, "AAO_KIND"),
        (ArrayKind::SubArray, "SUBA_KIND"),
    ];
    for (kind, name) in cases {
        // Fresh database per kind so the cases cannot influence each other.
        let db = PvDatabase::new();
        db.add_record(name, Box::new(WaveformRecord::with_kind(kind)))
            .await
            .unwrap();

        // Size and element type; subArray additionally gets INDX and MALM.
        if let Some(handle) = db.get_record(name).await {
            let mut guard = handle.write().await;
            let mut settings = vec![
                ("NELM", EpicsValue::Long(10)),
                ("FTVL", EpicsValue::Short(10)),
            ];
            if matches!(kind, ArrayKind::SubArray) {
                settings.push(("INDX", EpicsValue::Long(0)));
                settings.push(("MALM", EpicsValue::Long(10)));
            }
            for (field, setting) in settings {
                guard.record.put_field(field, setting).unwrap();
            }
        }

        let start = SystemTime::now();

        let subscription = match db.get_record(name).await {
            Some(handle) => handle.write().await.add_subscriber(
                "NORD",
                1,
                DbFieldType::Long,
                EventMask::VALUE.bits(),
            ),
            None => None,
        };
        let mut nord_rx = subscription
            .unwrap_or_else(|| panic!("NORD subscription must be accepted for {name}"));

        if let Some(handle) = db.get_record(name).await {
            handle
                .write()
                .await
                .record
                .set_val(EpicsValue::DoubleArray(vec![1.0, 2.0, 3.0]))
                .unwrap();
        }

        let mut visited = HashSet::new();
        db.process_record_with_links(name, &mut visited, 0)
            .await
            .unwrap();

        let event = match nord_rx.try_recv() {
            Ok(delivered) => delivered,
            Err(_) => panic!("NORD monitor event must be delivered for {name}"),
        };
        assert!(
            matches!(event.snapshot.value, EpicsValue::Long(3)),
            "{name}: NORD payload should reflect post-set_val length (3), got {:?}",
            event.snapshot.value
        );

        let ts = event.snapshot.timestamp;
        assert!(
            ts != SystemTime::UNIX_EPOCH,
            "{name}: NORD event timestamp must not be the epoch sentinel"
        );
        assert!(
            ts >= start,
            "{name}: NORD event timestamp ({ts:?}) must be ≥ pre-process baseline ({start:?})"
        );
    }
}
/// Subscribes to `DESC` on an `AsyncRecord` and runs three
/// process + `complete_async_record` cycles:
/// 1. `DESC` unchanged since subscribing — no event expected;
/// 2. `DESC` changed to `"beta"` — exactly that value must be delivered;
/// 3. `DESC` unchanged again — no further event expected.
#[tokio::test]
async fn test_complete_async_record_gates_subscribed_field_on_change() {
    use epics_base_rs::server::recgbl::EventMask;
    use epics_base_rs::types::DbFieldType;

    let db = PvDatabase::new();
    db.add_record("ASYNC_GATE", Box::new(AsyncRecord { val: 0.0 }))
        .await
        .unwrap();

    // Initial DESC is written before the subscription exists.
    if let Some(handle) = db.get_record("ASYNC_GATE").await {
        handle
            .write()
            .await
            .put_common_field("DESC", EpicsValue::String("alpha".into()))
            .unwrap();
    }

    let subscription = match db.get_record("ASYNC_GATE").await {
        Some(handle) => handle.write().await.add_subscriber(
            "DESC",
            7,
            DbFieldType::String,
            EventMask::VALUE.bits(),
        ),
        None => None,
    };
    let mut desc_rx = subscription.expect("DESC subscription must be accepted");

    // Cycle 1: nothing changed.
    let mut cycle_one = HashSet::new();
    db.process_record_with_links("ASYNC_GATE", &mut cycle_one, 0)
        .await
        .unwrap();
    db.complete_async_record("ASYNC_GATE").await.unwrap();
    assert!(
        desc_rx.try_recv().is_err(),
        "DESC unchanged across async-completion → must NOT post a duplicate event"
    );

    // Cycle 2: DESC changes before processing.
    if let Some(handle) = db.get_record("ASYNC_GATE").await {
        handle
            .write()
            .await
            .put_common_field("DESC", EpicsValue::String("beta".into()))
            .unwrap();
    }
    let mut cycle_two = HashSet::new();
    db.process_record_with_links("ASYNC_GATE", &mut cycle_two, 0)
        .await
        .unwrap();
    db.complete_async_record("ASYNC_GATE").await.unwrap();
    let event = desc_rx
        .try_recv()
        .expect("DESC change must produce a post-completion event");
    let carries_beta = match &event.snapshot.value {
        EpicsValue::String(s) => s == "beta",
        _ => false,
    };
    assert!(
        carries_beta,
        "DESC event payload should reflect post-change value, got {:?}",
        event.snapshot.value
    );

    // Cycle 3: stable again.
    let mut cycle_three = HashSet::new();
    db.process_record_with_links("ASYNC_GATE", &mut cycle_three, 0)
        .await
        .unwrap();
    db.complete_async_record("ASYNC_GATE").await.unwrap();
    assert!(
        desc_rx.try_recv().is_err(),
        "DESC stable after the change → no further events"
    );
}
/// Subscribes to both `NORD` and `VAL` on a waveform record, then calls
/// `put_pv_and_post` with a four-element array. Both events must arrive, the
/// `NORD` event must carry `Long(4)`, and the two events must share one
/// timestamp. Repeating the identical put must not produce a second `NORD`
/// event.
#[tokio::test]
async fn test_put_pv_and_post_propagates_nord_side_effect_on_waveform() {
    use epics_base_rs::server::recgbl::EventMask;
    use epics_base_rs::server::records::waveform::{ArrayKind, WaveformRecord};
    use epics_base_rs::types::DbFieldType;

    let db = PvDatabase::new();
    let waveform = WaveformRecord::with_kind(ArrayKind::Waveform);
    db.add_record("WF_GW", Box::new(waveform)).await.unwrap();

    if let Some(handle) = db.get_record("WF_GW").await {
        let mut guard = handle.write().await;
        guard
            .record
            .put_field("NELM", EpicsValue::Long(10))
            .unwrap();
        guard
            .record
            .put_field("FTVL", EpicsValue::Short(10))
            .unwrap();
    }

    // NORD is subscribed before VAL (tuple fields evaluate left to right).
    let (nord_slot, val_slot) = match db.get_record("WF_GW").await {
        Some(handle) => {
            let mut guard = handle.write().await;
            (
                guard.add_subscriber("NORD", 1, DbFieldType::Long, EventMask::VALUE.bits()),
                guard.add_subscriber("VAL", 2, DbFieldType::Double, EventMask::VALUE.bits()),
            )
        }
        None => (None, None),
    };
    let mut nord_rx = nord_slot.expect("NORD subscription accepted");
    let mut val_rx = val_slot.expect("VAL subscription accepted");

    let payload = || EpicsValue::DoubleArray(vec![1.0, 2.0, 3.0, 4.0]);

    db.put_pv_and_post("WF_GW", payload()).await.unwrap();
    let val_event = val_rx
        .try_recv()
        .expect("VAL event must be delivered after put_pv_and_post");
    let nord_event = nord_rx
        .try_recv()
        .expect("NORD event must be delivered after put_pv_and_post (side-effect of VAL)");
    assert!(
        matches!(nord_event.snapshot.value, EpicsValue::Long(4)),
        "NORD event should reflect post-put length (4), got {:?}",
        nord_event.snapshot.value
    );
    assert_eq!(
        val_event.snapshot.timestamp, nord_event.snapshot.timestamp,
        "VAL and NORD side-effect events must share the put's timestamp"
    );

    // Same array again: the element count does not change.
    db.put_pv_and_post("WF_GW", payload()).await.unwrap();
    assert!(
        nord_rx.try_recv().is_err(),
        "NORD unchanged → no duplicate NORD event"
    );
}
/// `TS_SRC` (ao) has `OUT = "TS_DST PP"`. After setting the source value and
/// processing the source, both the source's and the destination's
/// `common.time` must be at or after a baseline taken before processing.
#[tokio::test]
async fn test_output_link_cascade_uses_post_process_source_timestamp() {
    use std::time::SystemTime;

    let db = PvDatabase::new();
    db.add_record("TS_SRC", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    db.add_record("TS_DST", Box::new(AiRecord::new(0.0)))
        .await
        .unwrap();

    if let Some(handle) = db.get_record("TS_SRC").await {
        handle
            .write()
            .await
            .put_common_field("OUT", EpicsValue::String("TS_DST PP".into()))
            .unwrap();
    }

    let baseline = SystemTime::now();

    if let Some(handle) = db.get_record("TS_SRC").await {
        handle
            .write()
            .await
            .record
            .set_val(EpicsValue::Double(7.5))
            .unwrap();
    }

    let mut visited = HashSet::new();
    db.process_record_with_links("TS_SRC", &mut visited, 0)
        .await
        .unwrap();

    let source = db.get_record("TS_SRC").await.expect("TS_SRC exists");
    let src_time = source.read().await.common.time;
    assert!(
        src_time >= baseline,
        "SRC.common.time ({src_time:?}) must be post-baseline ({baseline:?}) — \
         apply_timestamp must run before OUT write"
    );

    let destination = db.get_record("TS_DST").await.expect("TS_DST exists");
    let dst_time = destination.read().await.common.time;
    assert!(
        dst_time >= baseline,
        "DST.common.time ({dst_time:?}) must be post-baseline ({baseline:?}) — \
         OUT cascade must drive Passive DST through process_record_with_links"
    );
}
/// Subscribes to `VAL` on an `AsyncRecord`, processes it (no event may be
/// delivered yet), waits 20 ms, then calls `complete_async_record`. The `VAL`
/// event delivered at completion must carry a timestamp at or after the
/// instant sampled just before completion.
#[tokio::test]
async fn test_complete_async_record_updates_timestamp_at_completion() {
    use epics_base_rs::server::recgbl::EventMask;
    use epics_base_rs::types::DbFieldType;
    use std::time::{Duration, SystemTime};

    let db = PvDatabase::new();
    db.add_record("ASYNC_TS", Box::new(AsyncRecord { val: 1.0 }))
        .await
        .unwrap();

    let subscription = match db.get_record("ASYNC_TS").await {
        Some(handle) => handle.write().await.add_subscriber(
            "VAL",
            9,
            DbFieldType::Double,
            EventMask::VALUE.bits(),
        ),
        None => None,
    };
    let mut val_rx = subscription.expect("VAL subscription accepted");

    let mut visited = HashSet::new();
    db.process_record_with_links("ASYNC_TS", &mut visited, 0)
        .await
        .unwrap();
    assert!(
        val_rx.try_recv().is_err(),
        "AsyncPending early-return must not deliver VAL event yet"
    );

    // Separate "process start" from "completion" in wall-clock time.
    let pause = Duration::from_millis(20);
    tokio::time::sleep(pause).await;
    let post_sleep = SystemTime::now();

    db.complete_async_record("ASYNC_TS").await.unwrap();
    let event = val_rx
        .try_recv()
        .expect("VAL event must be delivered at async completion");
    assert!(
        event.snapshot.timestamp >= post_sleep,
        "completion event timestamp ({:?}) must be ≥ post-sleep ({post_sleep:?}) — \
         apply_timestamp must run at async completion, not at process start",
        event.snapshot.timestamp
    );
}
/// `LO_SRC` (longout, `OOPT = 1`) has `OUT = "LO_DST PP"`. The first
/// processing pass must advance `LO_DST`'s timestamp to at or after the
/// baseline; a second pass 5 ms later must leave that timestamp untouched.
#[tokio::test]
async fn test_longout_oopt_on_change_first_cycle_emits_then_suppresses() {
    use epics_base_rs::server::records::longout::LongoutRecord;
    use std::time::SystemTime;

    let db = PvDatabase::new();
    for name in ["LO_SRC", "LO_DST"] {
        db.add_record(name, Box::new(LongoutRecord::new(0)))
            .await
            .unwrap();
    }

    if let Some(handle) = db.get_record("LO_SRC").await {
        let mut guard = handle.write().await;
        guard
            .put_common_field("OUT", EpicsValue::String("LO_DST PP".into()))
            .unwrap();
        guard
            .record
            .put_field("OOPT", EpicsValue::Short(1))
            .unwrap();
    }

    let baseline = SystemTime::now();

    // First cycle.
    let mut first_cycle = HashSet::new();
    db.process_record_with_links("LO_SRC", &mut first_cycle, 0)
        .await
        .unwrap();

    let destination = db.get_record("LO_DST").await.expect("LO_DST exists");
    let dst_time_after_first = destination.read().await.common.time;
    assert!(
        dst_time_after_first >= baseline,
        "first-cycle OOPT=On_Change must drive OUT cascade (DST.time {dst_time_after_first:?} \
         must be ≥ baseline {baseline:?}); pre-fix the cascade was suppressed"
    );

    let source = db.get_record("LO_SRC").await.expect("LO_SRC exists");
    let src_first_done = source.read().await.record.get_field("VAL").is_some();
    assert!(src_first_done, "SRC must have processed at least once");

    let dst_time_before_second = dst_time_after_first;
    // Let the clock move so a re-triggered cascade would be visible.
    tokio::time::sleep(std::time::Duration::from_millis(5)).await;

    // Second cycle.
    let mut second_cycle = HashSet::new();
    db.process_record_with_links("LO_SRC", &mut second_cycle, 0)
        .await
        .unwrap();

    let destination = db.get_record("LO_DST").await.expect("LO_DST exists");
    let dst_time_after_second = destination.read().await.common.time;
    assert_eq!(
        dst_time_after_second, dst_time_before_second,
        "second-cycle OOPT=On_Change with val==pval must NOT re-trigger OUT cascade — \
         DST.time should not advance from {dst_time_before_second:?} to {dst_time_after_second:?}"
    );
}
/// A longout record whose `OUT` link points at itself (`"SELF_LO PP"`) is
/// processed twice, each call wrapped in a one-second timeout. Both calls
/// must finish in time and succeed, the record must be in the visited set
/// after the first call, and `RPRO` must be false at the end.
#[tokio::test]
async fn test_self_link_out_does_not_loop() {
    use epics_base_rs::server::records::longout::LongoutRecord;
    use std::time::Duration;

    let db = PvDatabase::new();
    db.add_record("SELF_LO", Box::new(LongoutRecord::new(0)))
        .await
        .unwrap();

    if let Some(handle) = db.get_record("SELF_LO").await {
        handle
            .write()
            .await
            .put_common_field("OUT", EpicsValue::String("SELF_LO PP".into()))
            .unwrap();
    }

    let budget = Duration::from_secs(1);

    let mut visited = HashSet::new();
    let result = tokio::time::timeout(
        budget,
        db.process_record_with_links("SELF_LO", &mut visited, 0),
    )
    .await;
    assert!(
        result.is_ok(),
        "self-link processing must complete within 1s — \
         hang implies the visited HashSet guard regressed"
    );
    result.unwrap().expect("process call must succeed");
    assert!(visited.contains("SELF_LO"));

    let mut visited2 = HashSet::new();
    let result2 = tokio::time::timeout(
        budget,
        db.process_record_with_links("SELF_LO", &mut visited2, 0),
    )
    .await;
    assert!(
        result2.is_ok(),
        "subsequent self-link processing must also complete within 1s"
    );
    result2.unwrap().expect("second process call must succeed");

    let handle = db.get_record("SELF_LO").await.expect("SELF_LO exists");
    let rpro_after = handle.read().await.common.rpro;
    assert!(
        !rpro_after,
        "RPRO must be cleared after self-link processing — \
         stuck-true would queue an infinite reprocess loop"
    );
}
/// Writing `RES` on a compress record through the CA put path must post a
/// VAL monitor event carrying an all-zero array, and `RES` must read back 0.
#[tokio::test]
async fn test_compress_res_write_posts_val_monitor() {
    use epics_base_rs::server::recgbl::EventMask;
    use epics_base_rs::server::records::compress::CompressRecord;
    use epics_base_rs::types::DbFieldType;

    let db = PvDatabase::new();
    db.add_record("CMP_RES", Box::new(CompressRecord::new(8, 4)))
        .await
        .unwrap();

    // Seed VAL with non-zero data; the put result is intentionally ignored.
    if let Some(handle) = db.get_record("CMP_RES").await {
        let seed = EpicsValue::DoubleArray(vec![1.0, 2.0, 3.0, 0.0, 0.0, 0.0, 0.0, 0.0]);
        let mut guard = handle.write().await;
        let _ = guard.record.put_field("VAL", seed);
    }

    // Subscribe to VAL before triggering the reset.
    let subscription = match db.get_record("CMP_RES").await {
        Some(handle) => {
            let mut guard = handle.write().await;
            guard.add_subscriber("VAL", 1, DbFieldType::Double, EventMask::VALUE.bits())
        }
        None => None,
    };
    let mut val_rx = subscription.expect("VAL subscription accepted");

    // Trigger the reset; the put result itself is not part of the assertion.
    let _ = db
        .put_record_field_from_ca("CMP_RES", "RES", EpicsValue::Short(1))
        .await;

    let event = val_rx
        .try_recv()
        .expect("RES write must trigger a VAL monitor event");
    match &event.snapshot.value {
        EpicsValue::DoubleArray(v) => assert!(
            v.iter().all(|&x| x == 0.0),
            "post-reset VAL must be all zeros; got {v:?}"
        ),
        _ => panic!("VAL must be DoubleArray, got {:?}", event.snapshot.value),
    }

    // RES must read back as Short(0).
    let handle = db.get_record("CMP_RES").await.expect("CMP_RES exists");
    let res = handle
        .read()
        .await
        .record
        .get_field("RES")
        .and_then(|field| match field {
            EpicsValue::Short(s) => Some(s),
            _ => None,
        })
        .expect("RES readable");
    assert_eq!(res, 0, "RES must auto-clear after the reset");
}
/// Exercises `MbboDirectRecord::post_init_finalize_undef` in three setups:
/// only a bit field set, only VAL set, and nothing set.
#[tokio::test]
async fn test_mbbo_direct_initialises_val_from_bits_when_undef() {
    use epics_base_rs::server::record::Record;
    use epics_base_rs::server::records::mbbo_direct::MbboDirectRecord;

    // Case 1: B3 set while UDF is true -> VAL becomes 8 and UDF is cleared.
    let mut bits_only = MbboDirectRecord::default();
    bits_only.put_field("B3", EpicsValue::Char(1)).unwrap();
    let mut udf_bits_only = true;
    bits_only
        .post_init_finalize_undef(&mut udf_bits_only)
        .unwrap();
    assert!(
        !udf_bits_only,
        "UDF must be cleared once bits supplied an initial value"
    );
    assert!(matches!(
        bits_only.get_field("VAL"),
        Some(EpicsValue::Long(8))
    ));

    // Case 2: VAL = 0b0101 with UDF already false -> VAL kept, B0/B2 set, B1 clear.
    let mut val_only = MbboDirectRecord::default();
    val_only
        .put_field("VAL", EpicsValue::Long(0b0101))
        .unwrap();
    let mut udf_val_only = false;
    val_only
        .post_init_finalize_undef(&mut udf_val_only)
        .unwrap();
    assert!(!udf_val_only, "UDF stays cleared");
    assert!(matches!(
        val_only.get_field("VAL"),
        Some(EpicsValue::Long(5))
    ));
    assert!(matches!(
        val_only.get_field("B0"),
        Some(EpicsValue::Char(1))
    ));
    assert!(matches!(
        val_only.get_field("B2"),
        Some(EpicsValue::Char(1))
    ));
    assert!(matches!(
        val_only.get_field("B1"),
        Some(EpicsValue::Char(0))
    ));

    // Case 3: nothing written -> UDF remains true and VAL reads 0.
    let mut untouched = MbboDirectRecord::default();
    let mut udf_untouched = true;
    untouched
        .post_init_finalize_undef(&mut udf_untouched)
        .unwrap();
    assert!(udf_untouched, "UDF stays true when nothing initialised");
    assert!(matches!(
        untouched.get_field("VAL"),
        Some(EpicsValue::Long(0))
    ));
}
/// Covers the `calc` JSON link end to end: parsing (with and without a
/// `time` key, and with 13 args), evaluation through `read_link_value_soft`,
/// and timestamp retrieval through `evaluate_calc_link_with_time`.
#[tokio::test]
async fn test_lnk_calc_parses_evaluates_and_passes_timestamp() {
    use epics_base_rs::server::record::{CalcLink, ParsedLink, parse_link_v2};
    use epics_base_rs::server::records::ai::AiRecord;

    // --- Parsing: full form with expression, args and time letter. ---
    let parsed_calc =
        match parse_link_v2(r#"{calc:{"expr":"A+B*2","args":["pv_a","pv_b"],"time":"A"}}"#) {
            ParsedLink::Calc(c) => c,
            other => panic!("expected ParsedLink::Calc, got {other:?}"),
        };
    assert_eq!(parsed_calc.expr, "A+B*2");
    assert_eq!(
        parsed_calc.args,
        vec!["pv_a".to_string(), "pv_b".to_string()]
    );
    assert_eq!(parsed_calc.time_source, Some('A'));

    // --- Parsing: without a "time" key the time source is None. ---
    let no_time = parse_link_v2(r#"{calc:{"expr":"A","args":["pv_a"]}}"#);
    assert!(matches!(
        no_time,
        ParsedLink::Calc(CalcLink {
            time_source: None,
            ..
        })
    ));

    // --- Parsing: 13 args must not yield a Calc link. ---
    let too_many = parse_link_v2(
        r#"{calc:{"expr":"A","args":["a","b","c","d","e","f","g","h","i","j","k","l","m"]}}"#,
    );
    assert!(
        !matches!(too_many, ParsedLink::Calc(_)),
        "13+ args must NOT parse as Calc"
    );

    // --- Evaluation against two live records (3.0 and 5.0). ---
    let db = PvDatabase::new();
    db.add_record("pv_a", Box::new(AiRecord::new(3.0)))
        .await
        .unwrap();
    db.add_record("pv_b", Box::new(AiRecord::new(5.0)))
        .await
        .unwrap();

    let manual_calc = CalcLink {
        expr: "A+B*2".into(),
        args: vec!["pv_a".into(), "pv_b".into()],
        time_source: Some('A'),
    };
    let manual_link = ParsedLink::Calc(manual_calc.clone());

    let mut visited = HashSet::new();
    let value = db
        .read_link_value_soft(&manual_link, true, &mut visited, 0)
        .await
        .expect("calc link evaluates");
    match value {
        EpicsValue::Double(v) => assert!((v - 13.0).abs() < 1e-9, "expected 3+5*2=13, got {v}"),
        other => panic!("expected Double, got {other:?}"),
    }

    // --- Timestamp: stamp pv_a with a known time and expect it back. ---
    let known = std::time::SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000);
    if let Some(handle) = db.get_record("pv_a").await {
        handle.write().await.common.time = known;
    }
    let (timed_value, stamp) = db
        .evaluate_calc_link_with_time(&manual_calc)
        .await
        .expect("calc evaluates with time");
    match timed_value {
        EpicsValue::Double(x) => assert!((x - 13.0).abs() < 1e-9),
        other => panic!("expected Double, got {other:?}"),
    }
    assert_eq!(stamp, Some(known), "time pulled from pv_a (letter 'A')");
}
/// A CA put of `Enum(3)` to VAL on an mbbo with SHFT=4 must leave VAL at 3
/// and bring both RVAL and ORAW to 48.
#[tokio::test]
async fn test_ca_put_mbbo_val_recomputes_rval() {
    use epics_base_rs::server::records::mbbo::MbboRecord;

    let db = PvDatabase::new();

    let mut shifted = MbboRecord::new(0);
    shifted.shft = 4;
    db.add_record("MBBO_CA", Box::new(shifted)).await.unwrap();

    db.put_record_field_from_ca("MBBO_CA", "VAL", EpicsValue::Enum(3))
        .await
        .unwrap();

    let handle = db.get_record("MBBO_CA").await.unwrap();
    let guard = handle.read().await;

    let val_now = guard.record.get_field("VAL");
    assert_eq!(
        val_now,
        Some(EpicsValue::Enum(3)),
        "VAL holds the CA-written value"
    );

    let rval_now = guard.record.get_field("RVAL");
    assert_eq!(
        rval_now,
        Some(EpicsValue::Long(48)),
        "RVAL must be recomputed from the new VAL (3 << 4), not left stale at 0"
    );

    let oraw_now = guard.record.get_field("ORAW");
    assert_eq!(
        oraw_now,
        Some(EpicsValue::Long(48)),
        "ORAW must roll forward to the freshly converted RVAL"
    );
}
/// A CA put of `Long(5)` to VAL on an mbboDirect with SHFT=4 must bring both
/// RVAL and ORAW to 80.
#[tokio::test]
async fn test_ca_put_mbbo_direct_val_recomputes_rval() {
    use epics_base_rs::server::records::mbbo_direct::MbboDirectRecord;

    let db = PvDatabase::new();

    let mut shifted = MbboDirectRecord::default();
    shifted.shft = 4;
    db.add_record("MBBOD_CA", Box::new(shifted)).await.unwrap();

    db.put_record_field_from_ca("MBBOD_CA", "VAL", EpicsValue::Long(5))
        .await
        .unwrap();

    let handle = db.get_record("MBBOD_CA").await.unwrap();
    let guard = handle.read().await;

    let rval_now = guard.record.get_field("RVAL");
    assert_eq!(
        rval_now,
        Some(EpicsValue::Long(80)),
        "RVAL must be recomputed from the new VAL (5 << 4), not left stale at 0"
    );

    let oraw_now = guard.record.get_field("ORAW");
    assert_eq!(
        oraw_now,
        Some(EpicsValue::Long(80)),
        "ORAW must roll forward to the freshly converted RVAL"
    );
}
/// An `ai` record with SIMM=1 and SIOL set must still reach its FLNK target
/// when processed: both names must end up in the visited set.
#[tokio::test]
async fn test_simulation_mode_still_fires_forward_link() {
    let db = PvDatabase::new();

    // Simulation source and the forward-link target.
    db.add_record("SIM:SRC", Box::new(AoRecord::new(11.0)))
        .await
        .unwrap();
    db.add_record("SIM:FLNK_TARGET", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();

    // The record under test: simulation enabled, reading from SIM:SRC.
    let mut simulated = AiRecord::new(0.0);
    simulated.simm = 1;
    simulated.siol = "SIM:SRC".into();
    db.add_record("SIM:AI", Box::new(simulated)).await.unwrap();

    if let Some(handle) = db.get_record("SIM:AI").await {
        let mut guard = handle.write().await;
        guard
            .put_common_field("FLNK", EpicsValue::String("SIM:FLNK_TARGET".into()))
            .unwrap();
    }

    let mut visited = HashSet::new();
    db.process_record_with_links("SIM:AI", &mut visited, 0)
        .await
        .unwrap();

    assert!(
        visited.contains("SIM:AI"),
        "the simulated record itself must be in the visited set"
    );
    assert!(
        visited.contains("SIM:FLNK_TARGET"),
        "a SIMM-mode record must still dispatch its FLNK forward link \
         (C aiRecord.c:168 recGblFwdLink runs unconditionally): {visited:?}"
    );
}
/// An `mbbi` with SIMM=1 must pull its value from the SIOL source (3) and
/// must leave that source's VAL unchanged.
#[tokio::test]
async fn test_simulated_mbbi_reads_siol_not_writes_it() {
    use epics_base_rs::server::records::mbbi::MbbiRecord;

    let db = PvDatabase::new();
    db.add_record("MBBISIM:SRC", Box::new(LonginRecord::new(3)))
        .await
        .unwrap();

    let mut simulated = MbbiRecord::new(0);
    simulated.simm = 1;
    simulated.siol = "MBBISIM:SRC".into();
    db.add_record("MBBISIM:IN", Box::new(simulated))
        .await
        .unwrap();

    let mut visited = HashSet::new();
    db.process_record_with_links("MBBISIM:IN", &mut visited, 0)
        .await
        .unwrap();

    // The SIOL source must still hold its original value.
    let src_handle = db.get_record("MBBISIM:SRC").await.unwrap();
    let src_val = src_handle.read().await.record.get_field("VAL").unwrap();
    let src_as_int = src_val.to_f64().unwrap() as i64;
    assert_eq!(
        src_as_int, 3,
        "simulated mbbi must NOT write VAL out to its SIOL target"
    );

    // The simulated record must have picked that value up.
    let mbbi_handle = db.get_record("MBBISIM:IN").await.unwrap();
    let mbbi_val = mbbi_handle.read().await.record.get_field("VAL").unwrap();
    let mbbi_as_int = mbbi_val.to_f64().unwrap() as i64;
    assert_eq!(
        mbbi_as_int, 3,
        "simulated mbbi must read VAL in from SIOL (got {mbbi_val:?})"
    );
}
/// Two records forward-linked to each other (A -> B -> A) must not make
/// `complete_async_record` run forever.
///
/// The completion call is bounded by a timeout so that a regression which
/// keeps yielding to the runtime shows up as a test failure rather than a
/// hung test run. The FLNK setup uses `expect` so a missing record cannot
/// silently leave the cycle un-wired and let the test pass vacuously.
#[tokio::test]
async fn test_async_completion_flnk_cycle_terminates() {
    let db = PvDatabase::new();
    db.add_record("ACYC:A", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    db.add_record("ACYC:B", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();

    // Wire the cycle: A forwards to B, B forwards back to A.
    for (source, target) in [("ACYC:A", "ACYC:B"), ("ACYC:B", "ACYC:A")] {
        let handle = db
            .get_record(source)
            .await
            .expect("record added above must exist");
        let mut guard = handle.write().await;
        guard
            .put_common_field("FLNK", EpicsValue::String(target.into()))
            .unwrap();
    }

    let outcome = tokio::time::timeout(
        std::time::Duration::from_secs(5),
        db.complete_async_record("ACYC:A"),
    )
    .await;
    assert!(
        outcome.is_ok(),
        "async completion over an A<->B FLNK cycle must terminate within 5s"
    );
    outcome.unwrap().unwrap();
}
/// A fanout whose SELL link points at a record holding 2 must dispatch LNK2
/// (not LNK0, which the initial SELN=0 would pick) and end with SELN == 2.
#[tokio::test]
async fn test_fanout_resolves_sell_link_into_seln() {
    use epics_base_rs::server::records::fanout::FanoutRecord;

    let db = PvDatabase::new();
    db.add_record("FANSELL:SRC", Box::new(LonginRecord::new(2)))
        .await
        .unwrap();
    db.add_record("FANSELL:T2", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();
    db.add_record("FANSELL:T0", Box::new(AoRecord::new(0.0)))
        .await
        .unwrap();

    // Field setup, applied in this order.
    // NOTE(review): SELM=1 is presumably the "Specified" selection mode — confirm
    // against FanoutRecord.
    let mut fan = FanoutRecord::new();
    let setup = [
        ("SELM", EpicsValue::Short(1)),
        ("SELN", EpicsValue::Short(0)),
        ("SELL", EpicsValue::String("FANSELL:SRC".into())),
        ("LNK0", EpicsValue::String("FANSELL:T0 PP".into())),
        ("LNK2", EpicsValue::String("FANSELL:T2 PP".into())),
    ];
    for (field, value) in setup {
        fan.put_field(field, value).unwrap();
    }
    db.add_record("FANSELL:FAN", Box::new(fan)).await.unwrap();

    let mut visited = HashSet::new();
    db.process_record_with_links("FANSELL:FAN", &mut visited, 0)
        .await
        .unwrap();

    assert!(
        visited.contains("FANSELL:T2"),
        "SELL must resolve SELN=2 so LNK2 is dispatched: {visited:?}"
    );
    assert!(
        !visited.contains("FANSELL:T0"),
        "with SELL-resolved SELN=2, the stale SELN=0 (LNK0) must NOT \
         be dispatched: {visited:?}"
    );

    let fan_handle = db.get_record("FANSELL:FAN").await.unwrap();
    let seln = fan_handle.read().await.record.get_field("SELN").unwrap();
    assert_eq!(
        seln,
        EpicsValue::Short(2),
        "SELN must be updated from the SELL link"
    );
}
/// Checks ACKS/ACKT puts on a record that starts with `acks = Major` and
/// `sevr = Minor`:
/// - ACKS=2 clears `acks` to NoAlarm,
/// - ACKS=1 leaves `acks` at Major,
/// - ACKT=0 (from `ackt = true`) clears `ackt` and lowers `acks` to Minor.
#[tokio::test]
async fn test_acks_put_compares_against_acks_and_ackt_lowers() {
    // Common fixture: an ao record instance with acks=Major, sevr=Minor.
    let major_over_minor = |name: &str| {
        let mut fixture = RecordInstance::new(name.into(), AoRecord::new(0.0));
        fixture.common.acks = AlarmSeverity::Major;
        fixture.common.sevr = AlarmSeverity::Minor;
        fixture
    };

    // ACKS write of 2 against stored acks=Major.
    let mut ack_at_level = major_over_minor("ACKTEST1");
    ack_at_level
        .put_common_field("ACKS", EpicsValue::Short(2))
        .unwrap();
    assert_eq!(
        ack_at_level.common.acks,
        AlarmSeverity::NoAlarm,
        "ACKS write at sev>=stored acks must clear acks \
         (C dbAccess.c:1309 compares *psev >= precord->acks)"
    );

    // ACKS write of 1 against stored acks=Major.
    let mut ack_below_level = major_over_minor("ACKTEST2");
    ack_below_level
        .put_common_field("ACKS", EpicsValue::Short(1))
        .unwrap();
    assert_eq!(
        ack_below_level.common.acks,
        AlarmSeverity::Major,
        "ACKS write at sev BELOW stored acks must NOT clear acks; \
         comparing against sevr (Minor) instead would wrongly clear"
    );

    // ACKT write of 0 on a record that had ackt=true.
    let mut ackt_cleared = major_over_minor("ACKTEST3");
    ackt_cleared.common.ackt = true;
    ackt_cleared
        .put_common_field("ACKT", EpicsValue::Short(0))
        .unwrap();
    assert!(!ackt_cleared.common.ackt, "ACKT must be cleared");
    assert_eq!(
        ackt_cleared.common.acks,
        AlarmSeverity::Minor,
        "ACKT=false with acks>sevr must lower acks down to sevr \
         (C dbAccess.c:1294-1297)"
    );
}
/// An OUT link with no `PP` modifier must deliver the value (33.0) to the
/// target but must not add the target to the visited set.
#[tokio::test]
async fn test_bare_out_link_does_not_process_target() {
    let db = PvDatabase::new();
    db.add_record("SRC_OUT", Box::new(AoRecord::new(33.0)))
        .await
        .unwrap();
    db.add_record("TGT_OUT", Box::new(AiRecord::new(0.0)))
        .await
        .unwrap();

    // OUT link without any process modifier.
    if let Some(handle) = db.get_record("SRC_OUT").await {
        let mut guard = handle.write().await;
        guard
            .put_common_field("OUT", EpicsValue::String("TGT_OUT.VAL".into()))
            .unwrap();
    }

    let mut visited = HashSet::new();
    db.process_record_with_links("SRC_OUT", &mut visited, 0)
        .await
        .unwrap();

    let tgt_val = db.get_pv("TGT_OUT").await.unwrap();
    let delivered = tgt_val.to_f64().unwrap();
    assert_eq!(
        delivered, 33.0,
        "bare OUT link must still write the value to the target"
    );

    let target_processed = visited.contains("TGT_OUT");
    assert!(
        !target_processed,
        "bare OUT link (NPP) must NOT process its target: {visited:?}"
    );
}
/// An OUT link carrying the `PP` modifier must add its target to the
/// visited set when the source record is processed.
#[tokio::test]
async fn test_pp_out_link_processes_passive_target() {
    let db = PvDatabase::new();
    db.add_record("SRC_PP", Box::new(AoRecord::new(44.0)))
        .await
        .unwrap();
    db.add_record("TGT_PP", Box::new(AiRecord::new(0.0)))
        .await
        .unwrap();

    // OUT link with an explicit PP modifier.
    if let Some(handle) = db.get_record("SRC_PP").await {
        let mut guard = handle.write().await;
        guard
            .put_common_field("OUT", EpicsValue::String("TGT_PP.VAL PP".into()))
            .unwrap();
    }

    let mut visited = HashSet::new();
    db.process_record_with_links("SRC_PP", &mut visited, 0)
        .await
        .unwrap();

    let target_processed = visited.contains("TGT_PP");
    assert!(
        target_processed,
        "explicit PP OUT link must process its Passive target: {visited:?}"
    );
}
/// While a `lock_records` epoch is held for a record, a
/// `process_record_with_links` call issued from another task must not
/// finish; once the epoch is dropped it must finish.
///
/// Waiting for the spawned task is bounded by a timeout so that a lost
/// wake-up after the epoch is released fails the test instead of hanging it.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mr_r5_foreign_process_blocks_on_held_epoch() {
    let db = PvDatabase::new();
    db.add_record("MR_R5_MEMBER", Box::new(AiRecord::new(0.0)))
        .await
        .unwrap();

    // Hold the epoch on this task; the spawned task plays the "foreign" caller.
    let epoch = db.lock_records(["MR_R5_MEMBER"]).await;

    let db2 = db.clone();
    let processed = Arc::new(AtomicU32::new(0));
    let processed2 = Arc::clone(&processed);
    let h = tokio::spawn(async move {
        let mut visited = HashSet::new();
        // Only completion matters here; the processing result is not inspected.
        let _ = db2
            .process_record_with_links("MR_R5_MEMBER", &mut visited, 0)
            .await;
        processed2.store(1, Ordering::SeqCst);
    });

    // Give the spawned task time to run up to the point where it should block.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    assert_eq!(
        processed.load(Ordering::SeqCst),
        0,
        "foreign process_record_with_links must block while a lock_records epoch holds the member gate"
    );

    drop(epoch);

    tokio::time::timeout(std::time::Duration::from_secs(5), h)
        .await
        .expect("foreign process must unblock within 5s of the epoch being released")
        .unwrap();
    assert_eq!(
        processed.load(Ordering::SeqCst),
        1,
        "foreign process must complete once the epoch is released"
    );
}
/// With a `lock_records` epoch held by this task,
/// `process_record_with_links_already_locked` on the same record must finish
/// within 5 seconds, succeed, and record the name in the visited set.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mr_r5_already_locked_process_does_not_self_deadlock() {
    let db = PvDatabase::new();
    db.add_record("MR_R5_OWNED", Box::new(AiRecord::new(0.0)))
        .await
        .unwrap();

    // Keep the epoch alive for the whole test body.
    let _epoch = db.lock_records(["MR_R5_OWNED"]).await;

    let mut visited = HashSet::new();
    let limit = std::time::Duration::from_secs(5);
    let owner_pass = db.process_record_with_links_already_locked("MR_R5_OWNED", &mut visited, 0);
    let outcome = tokio::time::timeout(limit, owner_pass)
        .await
        .expect("process_record_with_links_already_locked must not dead-lock under a held epoch");
    outcome.expect("owner-path processing of an owned member must succeed");

    assert!(visited.contains("MR_R5_OWNED"));
}