#![allow(unused_imports, clippy::all)]
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use epics_base_rs::error::CaError;
use epics_base_rs::server::database::PvDatabase;
use epics_base_rs::server::record::*;
use epics_base_rs::server::records::ai::AiRecord;
use epics_base_rs::server::records::ao::AoRecord;
use epics_base_rs::server::records::bi::BiRecord;
use epics_base_rs::server::records::longin::LonginRecord;
use epics_base_rs::types::EpicsValue;
#[tokio::test]
async fn test_write_notify_follows_flnk() {
let db = PvDatabase::new();
db.add_record("REC_A", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
db.add_record("REC_B", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("REC_A").await {
let mut inst = rec.write().await;
inst.put_common_field("FLNK", EpicsValue::String("REC_B".into()))
.unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("REC_A", &mut visited, 0)
.await
.unwrap();
assert!(visited.contains("REC_A"));
assert!(visited.contains("REC_B"));
}
#[tokio::test]
async fn test_inp_link_processing() {
let db = PvDatabase::new();
db.add_record("SOURCE", Box::new(AoRecord::new(42.0)))
.await
.unwrap();
db.add_record("DEST", Box::new(AiRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("DEST").await {
let mut inst = rec.write().await;
inst.put_common_field("INP", EpicsValue::String("SOURCE".into()))
.unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("DEST", &mut visited, 0)
.await
.unwrap();
let val = db.get_pv("DEST").await.unwrap();
match val {
EpicsValue::Double(v) => assert!((v - 42.0).abs() < 1e-10),
other => panic!("expected Double(42.0), got {:?}", other),
}
}
#[tokio::test]
async fn test_soft_inp_read_failure_sets_link_alarm() {
use epics_base_rs::server::recgbl::alarm_status;
use epics_base_rs::server::record::AlarmSeverity;
let db = PvDatabase::new();
db.add_record("BROKEN", Box::new(AiRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("BROKEN").await {
let mut inst = rec.write().await;
inst.put_common_field("INP", EpicsValue::String("NO_SUCH_PV".into()))
.unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("BROKEN", &mut visited, 0)
.await
.unwrap();
let rec = db.get_record("BROKEN").await.expect("record exists");
let inst = rec.read().await;
assert_eq!(
inst.common.sevr,
AlarmSeverity::Invalid,
"broken soft-channel INP must drive SEVR=INVALID, got {:?}",
inst.common.sevr
);
assert_eq!(
inst.common.stat,
alarm_status::LINK_ALARM,
"broken soft-channel INP must drive STAT=LINK, got {}",
inst.common.stat
);
}
#[tokio::test]
async fn test_single_inp_ms_propagates_link_alarm_no_msg() {
use epics_base_rs::server::recgbl::alarm_status;
use epics_base_rs::server::record::AlarmSeverity;
let db = PvDatabase::new();
db.add_record("SRC", Box::new(AoRecord::new(7.0)))
.await
.unwrap();
db.add_record("DST", Box::new(AiRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("SRC").await {
let mut inst = rec.write().await;
inst.common.stat = alarm_status::HIHI_ALARM;
inst.common.sevr = AlarmSeverity::Major;
inst.common.amsg = "src-msg".to_string();
}
if let Some(rec) = db.get_record("DST").await {
let mut inst = rec.write().await;
inst.put_common_field("INP", EpicsValue::String("SRC NPP MS".into()))
.unwrap();
inst.common.udf = false;
}
let mut visited = HashSet::new();
db.process_record_with_links("DST", &mut visited, 0)
.await
.unwrap();
let dst = db.get_record("DST").await.expect("DST exists");
let inst = dst.read().await;
assert_eq!(
inst.common.sevr,
AlarmSeverity::Major,
"MS link must lift DST severity to source's Major"
);
assert_eq!(
inst.common.stat,
alarm_status::LINK_ALARM,
"C parity: MS link MUST surface as LINK_ALARM, not the source's STAT"
);
assert!(
inst.common.amsg.is_empty(),
"C parity: MS link MUST NOT propagate amsg; got {:?}",
inst.common.amsg
);
}
#[tokio::test]
async fn test_single_inp_mss_propagates_stat_and_amsg() {
use epics_base_rs::server::recgbl::alarm_status;
use epics_base_rs::server::record::AlarmSeverity;
let db = PvDatabase::new();
db.add_record("SRC", Box::new(AoRecord::new(7.0)))
.await
.unwrap();
db.add_record("DST", Box::new(AiRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("SRC").await {
let mut inst = rec.write().await;
inst.common.stat = alarm_status::HIHI_ALARM;
inst.common.sevr = AlarmSeverity::Major;
inst.common.amsg = "src-major".to_string();
}
if let Some(rec) = db.get_record("DST").await {
let mut inst = rec.write().await;
inst.put_common_field("INP", EpicsValue::String("SRC NPP MSS".into()))
.unwrap();
inst.common.udf = false;
}
let mut visited = HashSet::new();
db.process_record_with_links("DST", &mut visited, 0)
.await
.unwrap();
let dst = db.get_record("DST").await.expect("DST exists");
let inst = dst.read().await;
assert_eq!(inst.common.sevr, AlarmSeverity::Major);
assert_eq!(
inst.common.stat,
alarm_status::HIHI_ALARM,
"MSS must carry source's STAT"
);
assert_eq!(
inst.common.amsg, "src-major",
"MSS must carry source's AMSG"
);
}
#[tokio::test]
async fn test_mss_propagates_amsg_only_change_posts_amsg_event() {
use epics_base_rs::server::recgbl::{EventMask, alarm_status};
use epics_base_rs::server::record::AlarmSeverity;
use epics_base_rs::types::DbFieldType;
let db = PvDatabase::new();
db.add_record("SRC_AMSG", Box::new(AoRecord::new(7.0)))
.await
.unwrap();
db.add_record("DST_AMSG", Box::new(AiRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("SRC_AMSG").await {
let mut inst = rec.write().await;
inst.common.stat = alarm_status::HIHI_ALARM;
inst.common.sevr = AlarmSeverity::Major;
inst.common.amsg = "msg1".to_string();
}
if let Some(rec) = db.get_record("DST_AMSG").await {
let mut inst = rec.write().await;
inst.put_common_field("INP", EpicsValue::String("SRC_AMSG NPP MSS".into()))
.unwrap();
inst.common.udf = false;
}
let mut visited = HashSet::new();
db.process_record_with_links("DST_AMSG", &mut visited, 0)
.await
.unwrap();
let mut amsg_rx = {
let rec = db.get_record("DST_AMSG").await.unwrap();
let mut inst = rec.write().await;
inst.add_subscriber("AMSG", 11, DbFieldType::String, EventMask::ALARM.bits())
}
.expect("AMSG subscription must be accepted");
if let Some(rec) = db.get_record("SRC_AMSG").await {
let mut inst = rec.write().await;
inst.common.amsg = "msg2".to_string();
}
let mut visited = HashSet::new();
db.process_record_with_links("DST_AMSG", &mut visited, 0)
.await
.unwrap();
{
let rec = db.get_record("DST_AMSG").await.unwrap();
let inst = rec.read().await;
assert_eq!(inst.common.sevr, AlarmSeverity::Major, "sevr unchanged");
assert_eq!(inst.common.amsg, "msg2", "amsg propagated");
}
let event = amsg_rx
.try_recv()
.expect("AMSG-only change must produce an event on DBE_ALARM-class subscribers");
assert!(
matches!(event.snapshot.value, EpicsValue::String(ref s) if s == "msg2"),
"AMSG event payload should be the new message, got {:?}",
event.snapshot.value
);
}
#[tokio::test]
async fn test_putf_clears_after_synchronous_put_completion() {
let db = PvDatabase::new();
db.add_record("PUTF_SYNC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let _ = db
.put_record_field_from_ca("PUTF_SYNC", "VAL", EpicsValue::Double(42.0))
.await;
let rec = db.get_record("PUTF_SYNC").await.unwrap();
let inst = rec.read().await;
assert!(
!inst.common.putf,
"after synchronous put completion, PUTF must clear (mirrors C recGblFwdLink:302)"
);
}
#[tokio::test]
async fn test_putf_survives_async_round_trip_and_clears_on_completion() {
let db = PvDatabase::new();
db.add_record("ASYNC_PUTF", Box::new(AsyncRecord { val: 0.0 }))
.await
.unwrap();
let _ = db
.put_record_field_from_ca("ASYNC_PUTF", "VAL", EpicsValue::Double(7.0))
.await;
{
let rec = db.get_record("ASYNC_PUTF").await.unwrap();
let inst = rec.read().await;
assert!(inst.is_processing(), "async pending → PACT=true");
assert!(
inst.common.putf,
"PUTF must remain TRUE across the async round trip — \
pre-fix the Rust port cleared it before the process call \
so async-completion logic could not classify the trigger \
as put-driven"
);
}
db.complete_async_record("ASYNC_PUTF").await.unwrap();
{
let rec = db.get_record("ASYNC_PUTF").await.unwrap();
let inst = rec.read().await;
assert!(!inst.is_processing(), "completion clears PACT");
assert!(
!inst.common.putf,
"complete_async_record_inner must clear PUTF (recGblFwdLink parity)"
);
}
}
#[tokio::test]
async fn test_put_record_field_from_ca_clears_udf_on_primary_field_write() {
let db = PvDatabase::new();
db.add_record("UDF_ASYNC", Box::new(AsyncRecord { val: 0.0 }))
.await
.unwrap();
{
let rec = db.get_record("UDF_ASYNC").await.unwrap();
assert!(
rec.read().await.common.udf,
"AsyncRecord starts undefined (udf=true)"
);
}
let _ = db
.put_record_field_from_ca("UDF_ASYNC", "VAL", EpicsValue::Double(7.0))
.await;
let rec = db.get_record("UDF_ASYNC").await.unwrap();
let inst = rec.read().await;
assert!(
inst.is_processing(),
"AsyncRecord should be mid-async (PACT=true)"
);
assert!(
!inst.common.udf,
"primary-field CA put must clear UDF synchronously \
(dbAccess.c::dbPut:1411 parity) — observable before \
complete_async_record runs"
);
}
#[tokio::test]
async fn test_putf_stays_off_for_cp_chained_targets() {
let db = PvDatabase::new();
db.add_record("SRC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
db.add_record("TGT", Box::new(AiRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("TGT").await {
let mut inst = rec.write().await;
inst.put_common_field("INP", EpicsValue::String("SRC CP".into()))
.unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("SRC", &mut visited, 0)
.await
.unwrap();
let tgt = db.get_record("TGT").await.expect("TGT exists");
let inst = tgt.read().await;
assert!(
!inst.common.putf,
"CP-driven TGT must not carry PUTF=1 — that bit belongs only to the directly-put record"
);
}
#[tokio::test]
async fn test_putf_propagates_through_db_out_link_to_passive_target() {
let db = PvDatabase::new();
db.add_record("PUTF_OUT_TGT", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
db.add_record("PUTF_OUT_SRC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("PUTF_OUT_SRC").await {
let mut inst = rec.write().await;
inst.put_common_field("OUT", EpicsValue::String("PUTF_OUT_TGT PP".into()))
.unwrap();
}
let _ = db
.put_record_field_from_ca("PUTF_OUT_SRC", "VAL", EpicsValue::Double(5.0))
.await;
let tgt = db.get_record("PUTF_OUT_TGT").await.unwrap();
let inst = tgt.read().await;
assert!(
!inst.common.putf,
"after both records' synchronous cycles complete, both clear putf"
);
assert!(
!inst.common.rpro,
"target was not pact, so rpro must stay false (normal propagation)"
);
let val = inst.record.val().and_then(|v| v.to_f64()).unwrap_or(0.0);
assert!(
(val - 5.0).abs() < 1e-10,
"OUT link write propagated value (val={val})"
);
}
#[tokio::test]
async fn test_putf_propagates_mid_cycle_via_async_target_out_link() {
let db = PvDatabase::new();
db.add_record("PROP_TGT", Box::new(AsyncRecord { val: 0.0 }))
.await
.unwrap();
db.add_record("PROP_SRC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("PROP_SRC").await {
let mut inst = rec.write().await;
inst.put_common_field("OUT", EpicsValue::String("PROP_TGT PP".into()))
.unwrap();
}
let _ = db
.put_record_field_from_ca("PROP_SRC", "VAL", EpicsValue::Double(11.0))
.await;
let tgt = db.get_record("PROP_TGT").await.unwrap();
let inst = tgt.read().await;
assert!(
inst.is_processing(),
"AsyncPending target stays pact between process and complete"
);
assert!(
inst.common.putf,
"target.putf must inherit from src.putf BEFORE complete_async_record clears it \
(C dbDbLink.c::processTarget:474). Pre-fix this stayed false."
);
}
#[tokio::test]
async fn test_simm_raw_input_runs_conversion_chain() {
let db = PvDatabase::new();
db.add_record("RAW:SRC", Box::new(AoRecord::new(5.0)))
.await
.unwrap();
let mut ai = epics_base_rs::server::records::ai::AiRecord::new(0.0);
ai.linr = 1;
ai.eslo = 2.0;
ai.eoff = 10.0;
db.add_record("AI:SIMRAW", Box::new(ai)).await.unwrap();
if let Some(rec) = db.get_record("AI:SIMRAW").await {
let mut inst = rec.write().await;
inst.record.put_field("SIMM", EpicsValue::Short(2)).unwrap();
inst.record
.put_field("SIOL", EpicsValue::String("RAW:SRC".into()))
.unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("AI:SIMRAW", &mut visited, 0)
.await
.unwrap();
let ai_rec = db.get_record("AI:SIMRAW").await.expect("AI:SIMRAW exists");
let inst = ai_rec.read().await;
let val = inst
.record
.get_field("VAL")
.and_then(|v| v.to_f64())
.expect("VAL must be readable as f64");
assert!(
(val - 20.0).abs() < 1e-10,
"SIMM=RAW must run convert(): expected VAL=5*ESLO+EOFF=20.0, got {val}"
);
let rval = inst
.record
.get_field("RVAL")
.and_then(|v| match v {
EpicsValue::Long(n) => Some(n as f64),
other => other.to_f64(),
})
.expect("RVAL must be readable");
assert!(
(rval - 5.0).abs() < 1e-10,
"RVAL must hold the raw count from SIOL; got {rval}"
);
}
#[tokio::test]
async fn test_cycle_detection() {
let db = PvDatabase::new();
db.add_record("CYCLE_A", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
db.add_record("CYCLE_B", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("CYCLE_A").await {
let mut inst = rec.write().await;
inst.put_common_field("FLNK", EpicsValue::String("CYCLE_B".into()))
.unwrap();
}
if let Some(rec) = db.get_record("CYCLE_B").await {
let mut inst = rec.write().await;
inst.put_common_field("FLNK", EpicsValue::String("CYCLE_A".into()))
.unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("CYCLE_A", &mut visited, 0)
.await
.unwrap();
assert!(visited.contains("CYCLE_A"));
assert!(visited.contains("CYCLE_B"));
assert_eq!(visited.len(), 2);
}
#[tokio::test]
async fn test_ao_drvh_drvl_clamp() {
let mut rec = AoRecord::new(0.0);
rec.drvh = 100.0;
rec.drvl = -50.0;
rec.val = 200.0;
rec.process().unwrap();
assert!((rec.val - 100.0).abs() < 1e-10);
rec.val = -100.0;
rec.process().unwrap();
assert!((rec.val - (-50.0)).abs() < 1e-10);
}
#[tokio::test]
async fn test_ao_oroc_rate_limit() {
let mut rec = AoRecord::new(0.0);
rec.oroc = 5.0;
rec.drvh = 0.0;
rec.drvl = 0.0;
rec.val = 100.0;
rec.process().unwrap();
assert!((rec.oval - 5.0).abs() < 1e-10, "First: oval={}", rec.oval);
rec.val = 200.0;
rec.process().unwrap();
assert!((rec.oval - 10.0).abs() < 1e-10, "Second: oval={}", rec.oval);
}
#[tokio::test]
async fn test_ao_omsl_dol() {
let db = PvDatabase::new();
db.add_record("SOURCE", Box::new(AoRecord::new(42.0)))
.await
.unwrap();
let mut ao = AoRecord::new(0.0);
ao.omsl = 1;
ao.dol = "SOURCE".to_string();
db.add_record("OUTPUT", Box::new(ao)).await.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("OUTPUT", &mut visited, 0)
.await
.unwrap();
let val = db.get_pv("OUTPUT").await.unwrap();
match val {
EpicsValue::Double(v) => assert!((v - 42.0).abs() < 1e-10),
other => panic!("expected Double(42.0), got {:?}", other),
}
}
#[tokio::test]
async fn test_ao_oif_incremental() {
let db = PvDatabase::new();
db.add_record("DELTA", Box::new(AoRecord::new(10.0)))
.await
.unwrap();
let mut ao = AoRecord::new(100.0);
ao.omsl = 1;
ao.oif = 1;
ao.dol = "DELTA".to_string();
db.add_record("OUTPUT", Box::new(ao)).await.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("OUTPUT", &mut visited, 0)
.await
.unwrap();
let val = db.get_pv("OUTPUT").await.unwrap();
match val {
EpicsValue::Double(v) => assert!((v - 110.0).abs() < 1e-10),
other => panic!("expected Double(110.0), got {:?}", other),
}
}
#[tokio::test]
async fn test_ao_ivoa_dont_drive() {
let db = PvDatabase::new();
db.add_record("TARGET", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let mut ao = AoRecord::new(999.0);
ao.ivoa = 1;
db.add_record("OUTPUT", Box::new(ao)).await.unwrap();
if let Some(rec) = db.get_record("OUTPUT").await {
let mut inst = rec.write().await;
inst.put_common_field("OUT", EpicsValue::String("TARGET".into()))
.unwrap();
inst.put_common_field("HIHI", EpicsValue::Double(100.0))
.unwrap();
inst.put_common_field("HHSV", EpicsValue::Short(AlarmSeverity::Invalid as i16))
.unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("OUTPUT", &mut visited, 0)
.await
.unwrap();
let val = db.get_pv("TARGET").await.unwrap();
match val {
EpicsValue::Double(v) => assert!((v - 0.0).abs() < 1e-10),
other => panic!("expected Double(0.0), got {:?}", other),
}
}
#[tokio::test]
async fn test_db_load_records_asl_field() {
use epics_base_rs::server::db_loader;
use epics_base_rs::server::records::ai::AiRecord;
let defs = db_loader::parse_db(
r#"
record(ai, "ASLT:HIGH") {
field(ASL, "1")
}
record(ai, "ASLT:LOW") {
}
"#,
&std::collections::HashMap::new(),
)
.unwrap();
let db = PvDatabase::new();
for def in defs {
let mut record: Box<dyn epics_base_rs::server::record::Record> =
Box::new(AiRecord::new(0.0));
let mut common_fields = Vec::new();
db_loader::apply_fields(&mut record, &def.fields, &mut common_fields).unwrap();
db.add_record(&def.name, record).await.unwrap();
if let Some(rec) = db.get_record(&def.name).await {
let mut inst = rec.write().await;
for (n, v) in common_fields {
let _ = inst.put_common_field(&n, v);
}
}
}
let high = db.get_record("ASLT:HIGH").await.unwrap();
let low = db.get_record("ASLT:LOW").await.unwrap();
assert_eq!(
high.read().await.common.asl,
1,
"field(ASL, \"1\") must set ASL=1"
);
assert_eq!(low.read().await.common.asl, 0, "absent ASL defaults to 0");
}
#[tokio::test]
async fn test_ao_ivoa_set_to_ivov_writes_oval() {
use epics_base_rs::server::records::ao::AoRecord;
let db = PvDatabase::new();
db.add_record("TARGET", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let mut ao = AoRecord::new(7.0);
ao.ivoa = 2;
ao.ivov = 42.0;
db.add_record("SRC", Box::new(ao)).await.unwrap();
if let Some(rec) = db.get_record("SRC").await {
let mut inst = rec.write().await;
inst.put_common_field("OUT", EpicsValue::String("TARGET".into()))
.unwrap();
inst.put_common_field("HIHI", EpicsValue::Double(1.0))
.unwrap();
inst.put_common_field("HHSV", EpicsValue::Short(AlarmSeverity::Invalid as i16))
.unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("SRC", &mut visited, 0)
.await
.unwrap();
let v = db.get_pv("TARGET").await.unwrap();
assert!(
matches!(v, EpicsValue::Double(d) if (d - 42.0).abs() < 1e-9),
"TARGET must receive IVOV via OVAL: got {v:?}"
);
let oval = db.get_pv("SRC.OVAL").await.unwrap();
assert!(
matches!(oval, EpicsValue::Double(d) if (d - 42.0).abs() < 1e-9),
"SRC.OVAL must equal IVOV: got {oval:?}"
);
}
#[tokio::test]
async fn test_bo_ivoa_set_to_ivov_writes_rval() {
use epics_base_rs::server::records::bo::BoRecord;
let db = PvDatabase::new();
let mut bo = BoRecord::new(0);
bo.ivoa = 2;
bo.ivov = 1;
db.add_record("BO_SRC", Box::new(bo)).await.unwrap();
if let Some(rec) = db.get_record("BO_SRC").await {
let mut inst = rec.write().await;
inst.common.nsev = AlarmSeverity::Invalid;
inst.common.nsta = epics_base_rs::server::recgbl::alarm_status::SOFT_ALARM;
}
let mut visited = HashSet::new();
db.process_record_with_links("BO_SRC", &mut visited, 0)
.await
.unwrap();
let rval = db.get_pv("BO_SRC.RVAL").await.unwrap();
assert!(
matches!(rval, EpicsValue::Long(1)),
"BO_SRC.RVAL must equal IVOV(1): got {rval:?}"
);
}
#[tokio::test]
async fn test_calcout_ivoa_set_to_ivov_writes_oval_only() {
use epics_base_rs::server::records::calcout::CalcoutRecord;
let db = PvDatabase::new();
db.add_record(
"OUT_TGT",
Box::new(epics_base_rs::server::records::ao::AoRecord::new(0.0)),
)
.await
.unwrap();
let mut co = CalcoutRecord::default();
co.ivoa = 2;
co.ivov = 17.5;
co.val = 99.9;
co.oval = 99.9;
co.calc = "A".to_string();
db.add_record("CO_SRC", Box::new(co)).await.unwrap();
if let Some(rec) = db.get_record("CO_SRC").await {
let mut inst = rec.write().await;
inst.put_common_field("OUT", EpicsValue::String("OUT_TGT".into()))
.unwrap();
inst.put_common_field("HIHI", EpicsValue::Double(1.0))
.unwrap();
inst.put_common_field("HHSV", EpicsValue::Short(AlarmSeverity::Invalid as i16))
.unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("CO_SRC", &mut visited, 0)
.await
.unwrap();
let v = db.get_pv("OUT_TGT").await.unwrap();
assert!(
matches!(v, EpicsValue::Double(d) if (d - 17.5).abs() < 1e-9),
"OUT_TGT must receive IVOV via OVAL: got {v:?}"
);
}
#[tokio::test]
async fn test_sim_mode_input() {
let db = PvDatabase::new();
db.add_record("SIM_SW", Box::new(AoRecord::new(1.0)))
.await
.unwrap();
db.add_record("SIM_VAL", Box::new(AoRecord::new(99.0)))
.await
.unwrap();
let mut ai = AiRecord::new(0.0);
ai.siml = "SIM_SW".to_string();
ai.siol = "SIM_VAL".to_string();
ai.sims = 1;
db.add_record("SIM_AI", Box::new(ai)).await.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("SIM_AI", &mut visited, 0)
.await
.unwrap();
let val = db.get_pv("SIM_AI").await.unwrap();
match val {
EpicsValue::Double(v) => assert!((v - 99.0).abs() < 1e-10),
other => panic!("expected Double(99.0), got {:?}", other),
}
let sevr = db.get_pv("SIM_AI.SEVR").await.unwrap();
assert!(matches!(sevr, EpicsValue::Short(1)));
}
#[tokio::test]
async fn test_sim_mode_toggle() {
let db = PvDatabase::new();
db.add_record("SIM_SW", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
db.add_record("SIM_VAL", Box::new(AoRecord::new(42.0)))
.await
.unwrap();
db.add_record("REAL_SRC", Box::new(AoRecord::new(10.0)))
.await
.unwrap();
let mut ai = AiRecord::new(0.0);
ai.siml = "SIM_SW".to_string();
ai.siol = "SIM_VAL".to_string();
db.add_record("TEST_AI", Box::new(ai)).await.unwrap();
if let Some(rec) = db.get_record("TEST_AI").await {
let mut inst = rec.write().await;
inst.put_common_field("INP", EpicsValue::String("REAL_SRC".into()))
.unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("TEST_AI", &mut visited, 0)
.await
.unwrap();
let val = db.get_pv("TEST_AI").await.unwrap();
match val {
EpicsValue::Double(v) => assert!((v - 10.0).abs() < 1e-10),
other => panic!("expected Double(10.0), got {:?}", other),
}
db.put_pv("SIM_SW", EpicsValue::Double(1.0)).await.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("TEST_AI", &mut visited, 0)
.await
.unwrap();
let val = db.get_pv("TEST_AI").await.unwrap();
match val {
EpicsValue::Double(v) => assert!((v - 42.0).abs() < 1e-10),
other => panic!("expected Double(42.0), got {:?}", other),
}
}
#[tokio::test]
async fn test_sim_mode_output() {
let db = PvDatabase::new();
db.add_record("SIM_SW", Box::new(AoRecord::new(1.0)))
.await
.unwrap();
db.add_record("SIM_OUT", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let mut ao = AoRecord::new(77.0);
ao.siml = "SIM_SW".to_string();
ao.siol = "SIM_OUT".to_string();
db.add_record("TEST_AO", Box::new(ao)).await.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("TEST_AO", &mut visited, 0)
.await
.unwrap();
let val = db.get_pv("SIM_OUT").await.unwrap();
match val {
EpicsValue::Double(v) => assert!((v - 77.0).abs() < 1e-10),
other => panic!("expected Double(77.0), got {:?}", other),
}
}
#[tokio::test]
async fn test_sdis_disable_skips_process() {
let db = PvDatabase::new();
db.add_record("DISABLE_SW", Box::new(AoRecord::new(1.0)))
.await
.unwrap();
db.add_record("TARGET", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("TARGET").await {
let mut inst = rec.write().await;
inst.put_common_field("SDIS", EpicsValue::String("DISABLE_SW".into()))
.unwrap();
inst.put_common_field("DISS", EpicsValue::Short(1)).unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("TARGET", &mut visited, 0)
.await
.unwrap();
let rec = db.get_record("TARGET").await.unwrap();
let inst = rec.read().await;
assert_eq!(
inst.common.stat,
epics_base_rs::server::recgbl::alarm_status::DISABLE_ALARM
);
assert_eq!(inst.common.sevr, AlarmSeverity::Minor);
drop(inst);
db.put_pv("DISABLE_SW", EpicsValue::Double(0.0))
.await
.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("TARGET", &mut visited, 0)
.await
.unwrap();
let rec = db.get_record("TARGET").await.unwrap();
let inst = rec.read().await;
assert_ne!(
inst.common.stat,
epics_base_rs::server::recgbl::alarm_status::DISABLE_ALARM
);
}
#[tokio::test]
async fn test_phas_scan_order() {
let db = PvDatabase::new();
db.add_record("REC_C", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
db.add_record("REC_A", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
db.add_record("REC_B", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
for (name, phas) in &[("REC_C", 2i16), ("REC_A", 0), ("REC_B", 1)] {
if let Some(rec) = db.get_record(name).await {
let mut inst = rec.write().await;
inst.put_common_field("PHAS", EpicsValue::Short(*phas))
.unwrap();
let result = inst
.put_common_field("SCAN", EpicsValue::String("1 second".into()))
.unwrap();
if let CommonFieldPutResult::ScanChanged {
old_scan,
new_scan,
phas: p,
} = result
{
drop(inst);
db.update_scan_index(name, old_scan, new_scan, p, p).await;
}
}
}
let names = db.records_for_scan(ScanType::Sec1).await;
assert_eq!(names, vec!["REC_A", "REC_B", "REC_C"]);
}
#[tokio::test]
async fn test_depth_limit() {
let db = PvDatabase::new();
for i in 0..20 {
db.add_record(&format!("CHAIN_{i}"), Box::new(AoRecord::new(0.0)))
.await
.unwrap();
}
for i in 0..19 {
if let Some(rec) = db.get_record(&format!("CHAIN_{i}")).await {
let mut inst = rec.write().await;
inst.put_common_field("FLNK", EpicsValue::String(format!("CHAIN_{}", i + 1)))
.unwrap();
}
}
let mut visited = HashSet::new();
db.process_record_with_links("CHAIN_0", &mut visited, 0)
.await
.unwrap();
assert!(visited.len() <= 17);
assert!(visited.contains("CHAIN_0"));
}
#[tokio::test]
async fn test_disp_blocks_ca_put() {
let db = PvDatabase::new();
db.add_record("REC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("REC").await {
let mut inst = rec.write().await;
inst.put_common_field("DISP", EpicsValue::Char(1)).unwrap();
}
let result = db
.put_record_field_from_ca("REC", "VAL", EpicsValue::Double(42.0))
.await;
assert!(matches!(result, Err(CaError::PutDisabled(_))));
}
#[tokio::test]
async fn test_disp_allows_disp_write() {
let db = PvDatabase::new();
db.add_record("REC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("REC").await {
let mut inst = rec.write().await;
inst.put_common_field("DISP", EpicsValue::Char(1)).unwrap();
}
let result = db
.put_record_field_from_ca("REC", "DISP", EpicsValue::Char(0))
.await;
assert!(result.is_ok());
let rec = db.get_record("REC").await.unwrap();
let inst = rec.read().await;
assert!(!inst.common.disp);
}
#[tokio::test]
async fn test_disp_bypassed_by_internal_put() {
let db = PvDatabase::new();
db.add_record("REC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("REC").await {
let mut inst = rec.write().await;
inst.put_common_field("DISP", EpicsValue::Char(1)).unwrap();
}
let result = db.put_pv("REC", EpicsValue::Double(42.0)).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_proc_triggers_processing() {
let db = PvDatabase::new();
db.add_record("REC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
db.put_pv("REC", EpicsValue::Double(42.0)).await.unwrap();
let result = db
.put_record_field_from_ca("REC", "PROC", EpicsValue::Char(1))
.await;
assert!(result.is_ok());
let rec = db.get_record("REC").await.unwrap();
let inst = rec.read().await;
assert!(!inst.common.udf);
}
#[tokio::test]
async fn test_proc_works_any_scan() {
let db = PvDatabase::new();
db.add_record("REC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("REC").await {
let mut inst = rec.write().await;
inst.put_common_field("SCAN", EpicsValue::String("1 second".into()))
.unwrap();
}
let result = db
.put_record_field_from_ca("REC", "PROC", EpicsValue::Char(1))
.await;
assert!(result.is_ok());
let rec = db.get_record("REC").await.unwrap();
let inst = rec.read().await;
assert!(!inst.common.udf);
}
#[tokio::test]
async fn test_proc_bypasses_disp() {
let db = PvDatabase::new();
db.add_record("REC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("REC").await {
let mut inst = rec.write().await;
inst.put_common_field("DISP", EpicsValue::Char(1)).unwrap();
}
let result = db
.put_record_field_from_ca("REC", "PROC", EpicsValue::Char(1))
.await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_proc_while_pact() {
let db = PvDatabase::new();
db.add_record("REC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let result = db
.put_record_field_from_ca("REC", "PROC", EpicsValue::Char(1))
.await;
assert!(result.is_ok());
let rec = db.get_record("REC").await.unwrap();
let inst = rec.read().await;
assert!(!inst.common.udf);
}
#[tokio::test]
async fn test_lcnt_ca_write_rejected() {
let db = PvDatabase::new();
db.add_record("REC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let result = db
.put_record_field_from_ca("REC", "LCNT", EpicsValue::Short(0))
.await;
assert!(matches!(result, Err(CaError::ReadOnlyField(_))));
}
#[tokio::test]
async fn test_ca_put_scan_index_update() {
let db = PvDatabase::new();
db.add_record("REC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
db.put_record_field_from_ca("REC", "SCAN", EpicsValue::String("1 second".into()))
.await
.unwrap();
let names = db.records_for_scan(ScanType::Sec1).await;
assert!(names.contains(&"REC".to_string()));
}
struct MockDeviceSupport {
read_count: Arc<AtomicU32>,
write_count: Arc<AtomicU32>,
dtyp_name: String,
}
impl MockDeviceSupport {
fn new(dtyp: &str, read_count: Arc<AtomicU32>, write_count: Arc<AtomicU32>) -> Self {
Self {
read_count,
write_count,
dtyp_name: dtyp.to_string(),
}
}
}
impl epics_base_rs::server::device_support::DeviceSupport for MockDeviceSupport {
fn read(
&mut self,
_record: &mut dyn Record,
) -> epics_base_rs::error::CaResult<epics_base_rs::server::device_support::DeviceReadOutcome>
{
self.read_count.fetch_add(1, Ordering::SeqCst);
Ok(epics_base_rs::server::device_support::DeviceReadOutcome::ok())
}
fn write(&mut self, _record: &mut dyn Record) -> epics_base_rs::error::CaResult<()> {
self.write_count.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn dtyp(&self) -> &str {
&self.dtyp_name
}
}
#[tokio::test]
async fn test_ca_put_no_double_device_write() {
let db = PvDatabase::new();
db.add_record("AO_REC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let read_count = Arc::new(AtomicU32::new(0));
let write_count = Arc::new(AtomicU32::new(0));
let mock = MockDeviceSupport::new("MockDev", read_count.clone(), write_count.clone());
if let Some(rec) = db.get_record("AO_REC").await {
let mut inst = rec.write().await;
inst.common.dtyp = "MockDev".to_string();
inst.device = Some(Box::new(mock));
}
db.put_record_field_from_ca("AO_REC", "VAL", EpicsValue::Double(42.0))
.await
.unwrap();
assert_eq!(write_count.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_bi_raw_soft_channel_inp_applies_mask() {
let db = PvDatabase::new();
db.add_record("SRC_LI", Box::new(LonginRecord::new(0)))
.await
.unwrap();
db.add_record("BI_RAW", Box::new(BiRecord::new(0)))
.await
.unwrap();
db.put_record_field_from_ca("SRC_LI", "VAL", EpicsValue::Long(0xFF))
.await
.unwrap();
if let Some(rec) = db.get_record("BI_RAW").await {
let mut inst = rec.write().await;
inst.common.dtyp = "Raw Soft Channel".to_string();
inst.common.inp = "SRC_LI".to_string();
inst.parsed_inp = epics_base_rs::server::record::parse_link_v2(&inst.common.inp);
inst.record
.put_field("MASK", EpicsValue::Long(0x0F))
.unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("BI_RAW", &mut visited, 0)
.await
.unwrap();
if let Some(rec) = db.get_record("BI_RAW").await {
let inst = rec.read().await;
let rval = inst.record.get_field("RVAL");
assert_eq!(
rval,
Some(EpicsValue::Long(0x0F)),
"MASK must clamp RVAL to low nibble"
);
let val = inst.record.get_field("VAL");
assert_eq!(
val,
Some(EpicsValue::Enum(1)),
"masked-non-zero RVAL → VAL=1"
);
}
}
#[tokio::test]
async fn test_input_record_no_device_write() {
let db = PvDatabase::new();
db.add_record("AI_REC", Box::new(AiRecord::new(0.0)))
.await
.unwrap();
let read_count = Arc::new(AtomicU32::new(0));
let write_count = Arc::new(AtomicU32::new(0));
let mock = MockDeviceSupport::new("MockDev", read_count.clone(), write_count.clone());
if let Some(rec) = db.get_record("AI_REC").await {
let mut inst = rec.write().await;
inst.common.dtyp = "MockDev".to_string();
inst.device = Some(Box::new(mock));
}
let mut visited = HashSet::new();
db.process_record_with_links("AI_REC", &mut visited, 0)
.await
.unwrap();
assert_eq!(read_count.load(Ordering::SeqCst), 1);
assert_eq!(write_count.load(Ordering::SeqCst), 0);
}
#[tokio::test]
async fn test_non_passive_output_ca_put_triggers_write() {
let db = PvDatabase::new();
db.add_record("AO_NP", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let read_count = Arc::new(AtomicU32::new(0));
let write_count = Arc::new(AtomicU32::new(0));
let mock = MockDeviceSupport::new("MockDev", read_count.clone(), write_count.clone());
if let Some(rec) = db.get_record("AO_NP").await {
let mut inst = rec.write().await;
inst.common.dtyp = "MockDev".to_string();
inst.common.scan = ScanType::Sec1;
inst.device = Some(Box::new(mock));
}
db.put_record_field_from_ca("AO_NP", "VAL", EpicsValue::Double(42.0))
.await
.unwrap();
assert_eq!(write_count.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_proc_triggers_device_write() {
let db = PvDatabase::new();
db.add_record("AO_PROC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let read_count = Arc::new(AtomicU32::new(0));
let write_count = Arc::new(AtomicU32::new(0));
let mock = MockDeviceSupport::new("MockDev", read_count.clone(), write_count.clone());
if let Some(rec) = db.get_record("AO_PROC").await {
let mut inst = rec.write().await;
inst.common.dtyp = "MockDev".to_string();
inst.device = Some(Box::new(mock));
}
db.put_record_field_from_ca("AO_PROC", "PROC", EpicsValue::Char(1))
.await
.unwrap();
assert_eq!(write_count.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_phas_change_updates_scan_index() {
let db = PvDatabase::new();
db.add_record("REC_A", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
db.add_record("REC_B", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
for (name, phas) in &[("REC_A", 10i16), ("REC_B", 5)] {
if let Some(rec) = db.get_record(name).await {
let mut inst = rec.write().await;
inst.put_common_field("PHAS", EpicsValue::Short(*phas))
.unwrap();
let result = inst
.put_common_field("SCAN", EpicsValue::String("1 second".into()))
.unwrap();
if let CommonFieldPutResult::ScanChanged {
old_scan,
new_scan,
phas: p,
} = result
{
drop(inst);
db.update_scan_index(name, old_scan, new_scan, p, p).await;
}
}
}
let names = db.records_for_scan(ScanType::Sec1).await;
assert_eq!(names, vec!["REC_B", "REC_A"]);
if let Some(rec) = db.get_record("REC_A").await {
let mut inst = rec.write().await;
let result = inst.put_common_field("PHAS", EpicsValue::Short(0)).unwrap();
if let CommonFieldPutResult::PhasChanged {
scan,
old_phas,
new_phas,
} = result
{
drop(inst);
db.update_scan_index("REC_A", scan, scan, old_phas, new_phas)
.await;
}
}
let names = db.records_for_scan(ScanType::Sec1).await;
assert_eq!(names, vec!["REC_A", "REC_B"]);
}
#[tokio::test]
async fn test_scan_change_preserves_phas() {
let db = PvDatabase::new();
db.add_record("REC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("REC").await {
let mut inst = rec.write().await;
inst.put_common_field("PHAS", EpicsValue::Short(3)).unwrap();
let result = inst
.put_common_field("SCAN", EpicsValue::String("1 second".into()))
.unwrap();
match result {
CommonFieldPutResult::ScanChanged { phas, .. } => assert_eq!(phas, 3),
other => panic!("expected ScanChanged, got {:?}", other),
}
}
}
#[tokio::test]
async fn test_phas_change_passive_no_index() {
let db = PvDatabase::new();
db.add_record("REC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("REC").await {
let mut inst = rec.write().await;
let result = inst.put_common_field("PHAS", EpicsValue::Short(5)).unwrap();
assert_eq!(result, CommonFieldPutResult::NoChange);
}
}
struct AsyncRecord {
val: f64,
}
impl Record for AsyncRecord {
fn record_type(&self) -> &'static str {
"async_test"
}
fn process(&mut self) -> epics_base_rs::error::CaResult<ProcessOutcome> {
Ok(ProcessOutcome::async_pending())
}
fn get_field(&self, name: &str) -> Option<EpicsValue> {
match name {
"VAL" => Some(EpicsValue::Double(self.val)),
_ => None,
}
}
fn put_field(&mut self, name: &str, value: EpicsValue) -> epics_base_rs::error::CaResult<()> {
match name {
"VAL" => {
if let EpicsValue::Double(v) = value {
self.val = v;
Ok(())
} else {
Err(CaError::InvalidValue("bad".into()))
}
}
_ => Err(CaError::FieldNotFound(name.into())),
}
}
fn field_list(&self) -> &'static [FieldDesc] {
&[]
}
}
#[tokio::test]
async fn test_async_pending_skips_post_process() {
let db = PvDatabase::new();
db.add_record("ASYNC", Box::new(AsyncRecord { val: 0.0 }))
.await
.unwrap();
db.add_record("FLNK_TARGET", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("ASYNC").await {
let mut inst = rec.write().await;
inst.put_common_field("FLNK", EpicsValue::String("FLNK_TARGET".into()))
.unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("ASYNC", &mut visited, 0)
.await
.unwrap();
assert!(visited.contains("ASYNC"));
assert!(!visited.contains("FLNK_TARGET"));
let rec = db.get_record("ASYNC").await.unwrap();
let inst = rec.read().await;
assert!(inst.common.udf);
}
#[tokio::test]
async fn test_complete_async_record() {
let db = PvDatabase::new();
db.add_record("ASYNC", Box::new(AsyncRecord { val: 42.0 }))
.await
.unwrap();
db.add_record("FLNK_TARGET", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("ASYNC").await {
let mut inst = rec.write().await;
inst.put_common_field("FLNK", EpicsValue::String("FLNK_TARGET".into()))
.unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("ASYNC", &mut visited, 0)
.await
.unwrap();
assert!(!visited.contains("FLNK_TARGET"));
db.complete_async_record("ASYNC").await.unwrap();
let rec = db.get_record("ASYNC").await.unwrap();
let inst = rec.read().await;
assert!(!inst.common.udf);
}
#[tokio::test]
async fn test_pact_entry_guard_silent_bail_until_max_lock() {
use epics_base_rs::server::record::AlarmSeverity;
let db = PvDatabase::new();
db.add_record("ASYNC_PACT", Box::new(AsyncRecord { val: 0.0 }))
.await
.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("ASYNC_PACT", &mut visited, 0)
.await
.unwrap();
{
let rec = db.get_record("ASYNC_PACT").await.unwrap();
let inst = rec.read().await;
assert!(
inst.is_processing(),
"first cycle must leave PACT=true (AsyncPending)"
);
assert_eq!(inst.common.lcnt, 0, "first cycle must reset lcnt");
assert_eq!(inst.common.sevr, AlarmSeverity::NoAlarm);
}
for i in 1..=10 {
let mut visited = HashSet::new();
db.process_record_with_links("ASYNC_PACT", &mut visited, 0)
.await
.unwrap();
let rec = db.get_record("ASYNC_PACT").await.unwrap();
let inst = rec.read().await;
assert!(inst.is_processing(), "must remain PACT=true (iter {i})");
assert_eq!(inst.common.lcnt, i as i16, "lcnt must increment per bail");
assert_eq!(
inst.common.sevr,
AlarmSeverity::NoAlarm,
"no SCAN_ALARM yet (iter {i})"
);
}
let mut visited = HashSet::new();
db.process_record_with_links("ASYNC_PACT", &mut visited, 0)
.await
.unwrap();
let rec = db.get_record("ASYNC_PACT").await.unwrap();
let inst = rec.read().await;
assert!(inst.is_processing(), "PACT still true post-alarm-raise");
assert_eq!(inst.common.sevr, AlarmSeverity::Invalid);
assert_eq!(
inst.common.stat,
epics_base_rs::server::recgbl::alarm_status::SCAN_ALARM
);
assert_eq!(inst.common.amsg, "Async in progress");
}
#[tokio::test]
async fn test_pact_entry_guard_tpro_diagnostic_does_not_change_bail_outcome() {
let db = PvDatabase::new();
db.add_record("ASYNC_TPRO", Box::new(AsyncRecord { val: 0.0 }))
.await
.unwrap();
{
let rec = db.get_record("ASYNC_TPRO").await.unwrap();
let mut inst = rec.write().await;
inst.common.tpro = true;
inst.common.rpro = true;
}
let mut visited = HashSet::new();
db.process_record_with_links("ASYNC_TPRO", &mut visited, 0)
.await
.unwrap();
{
let rec = db.get_record("ASYNC_TPRO").await.unwrap();
let inst = rec.read().await;
assert!(inst.is_processing(), "must enter PACT");
assert!(inst.common.tpro, "TPRO must be preserved");
assert!(inst.common.rpro, "RPRO must be preserved across PACT entry");
}
let mut visited = HashSet::new();
db.process_record_with_links("ASYNC_TPRO", &mut visited, 0)
.await
.unwrap();
let rec = db.get_record("ASYNC_TPRO").await.unwrap();
let inst = rec.read().await;
assert!(inst.is_processing(), "still PACT after bail");
assert_eq!(inst.common.lcnt, 1, "lcnt must have advanced");
assert!(
inst.common.rpro,
"RPRO must remain unchanged by the diagnostic path"
);
}
#[tokio::test]
async fn test_pact_entry_guard_resets_lcnt_after_completion() {
let db = PvDatabase::new();
db.add_record("ASYNC_RESET", Box::new(AsyncRecord { val: 0.0 }))
.await
.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("ASYNC_RESET", &mut visited, 0)
.await
.unwrap();
for _ in 0..3 {
let mut visited = HashSet::new();
db.process_record_with_links("ASYNC_RESET", &mut visited, 0)
.await
.unwrap();
}
{
let rec = db.get_record("ASYNC_RESET").await.unwrap();
assert_eq!(rec.read().await.common.lcnt, 3);
}
db.complete_async_record("ASYNC_RESET").await.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("ASYNC_RESET", &mut visited, 0)
.await
.unwrap();
let rec = db.get_record("ASYNC_RESET").await.unwrap();
let inst = rec.read().await;
assert_eq!(inst.common.lcnt, 0, "lcnt must reset when PACT clears");
}
#[tokio::test]
async fn test_reprocess_after_continuation_bypasses_pact_guard() {
use epics_base_rs::server::record::{ProcessAction, ProcessOutcome, RecordProcessResult};
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
struct ContinuationRecord {
process_count: Arc<AtomicU32>,
}
impl Record for ContinuationRecord {
fn record_type(&self) -> &'static str {
"continuation_test"
}
fn process(&mut self) -> epics_base_rs::error::CaResult<ProcessOutcome> {
let n = self.process_count.fetch_add(1, Ordering::SeqCst);
if n == 0 {
Ok(ProcessOutcome {
result: RecordProcessResult::AsyncPending,
actions: vec![ProcessAction::ReprocessAfter(
std::time::Duration::from_millis(20),
)],
device_did_compute: false,
})
} else {
Ok(ProcessOutcome::complete())
}
}
fn get_field(&self, _name: &str) -> Option<EpicsValue> {
None
}
fn put_field(
&mut self,
_name: &str,
_value: EpicsValue,
) -> epics_base_rs::error::CaResult<()> {
Ok(())
}
fn field_list(&self) -> &'static [FieldDesc] {
&[]
}
}
let process_count = Arc::new(AtomicU32::new(0));
let db = PvDatabase::new();
db.add_record(
"CONT_REC",
Box::new(ContinuationRecord {
process_count: process_count.clone(),
}),
)
.await
.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("CONT_REC", &mut visited, 0)
.await
.unwrap();
{
let rec = db.get_record("CONT_REC").await.unwrap();
assert!(
rec.read().await.is_processing(),
"PACT must be true after AsyncPending"
);
}
assert_eq!(process_count.load(Ordering::SeqCst), 1);
let mut visited = HashSet::new();
db.process_record_with_links("CONT_REC", &mut visited, 0)
.await
.unwrap();
assert_eq!(
process_count.load(Ordering::SeqCst),
1,
"foreign re-entry during AsyncPending must NOT call process()"
);
tokio::time::sleep(std::time::Duration::from_millis(80)).await;
assert_eq!(
process_count.load(Ordering::SeqCst),
2,
"ReprocessAfter timer must call process() again — owner-driven \
continuation bypasses the PACT entry guard"
);
}
#[tokio::test]
async fn test_dbnd_filter_drops_subthreshold_changes() {
use epics_base_rs::server::database::filters::DeadbandFilter;
use epics_base_rs::server::recgbl::EventMask;
use std::sync::Arc;
let db = PvDatabase::new();
db.add_record("DBND:REC", Box::new(AoRecord::new(10.0)))
.await
.unwrap();
let rec = db.get_record("DBND:REC").await.unwrap();
let mut rx = {
let mut inst = rec.write().await;
let rx = inst
.add_subscriber(
"VAL",
1,
epics_base_rs::types::DbFieldType::Double,
EventMask::VALUE.bits(),
)
.expect("subscribe");
let attached =
inst.attach_filter_to_last_subscriber("VAL", Arc::new(DeadbandFilter::absolute(1.0)));
assert!(attached, "filter must attach to the just-added subscriber");
rx
};
{
let mut inst = rec.write().await;
inst.record
.put_field("VAL", EpicsValue::Double(11.0))
.unwrap();
inst.notify_field("VAL", EventMask::VALUE);
}
rx.try_recv()
.expect("first value passes the deadband filter");
{
let mut inst = rec.write().await;
inst.record
.put_field("VAL", EpicsValue::Double(11.4))
.unwrap();
inst.notify_field("VAL", EventMask::VALUE);
}
assert!(
rx.try_recv().is_err(),
"sub-threshold change must be filtered out"
);
{
let mut inst = rec.write().await;
inst.record
.put_field("VAL", EpicsValue::Double(12.5))
.unwrap();
inst.notify_field("VAL", EventMask::VALUE);
}
rx.try_recv().expect("above-threshold change passes");
}
#[tokio::test]
async fn test_dbnd_filter_passes_alarm_events() {
use epics_base_rs::server::database::filters::DeadbandFilter;
use epics_base_rs::server::recgbl::EventMask;
use std::sync::Arc;
let db = PvDatabase::new();
db.add_record("DBND:ALR", Box::new(AoRecord::new(50.0)))
.await
.unwrap();
let rec = db.get_record("DBND:ALR").await.unwrap();
let mut rx = {
let mut inst = rec.write().await;
let rx = inst
.add_subscriber(
"VAL",
1,
epics_base_rs::types::DbFieldType::Double,
(EventMask::VALUE | EventMask::ALARM).bits(),
)
.expect("subscribe");
inst.attach_filter_to_last_subscriber("VAL", Arc::new(DeadbandFilter::absolute(10.0)));
rx
};
{
let mut inst = rec.write().await;
inst.record
.put_field("VAL", EpicsValue::Double(50.0))
.unwrap();
inst.notify_field("VAL", EventMask::VALUE);
}
rx.try_recv().expect("seed value");
{
let mut inst = rec.write().await;
inst.record
.put_field("VAL", EpicsValue::Double(50.5))
.unwrap();
inst.notify_field("VAL", EventMask::VALUE);
}
assert!(rx.try_recv().is_err(), "sub-threshold value silenced");
{
let inst = rec.read().await;
inst.notify_field("VAL", EventMask::ALARM);
}
rx.try_recv().expect("alarm event passes the filter");
}
#[tokio::test]
async fn test_notify_field_respects_mask() {
let db = PvDatabase::new();
db.add_record("REC", Box::new(AoRecord::new(42.0)))
.await
.unwrap();
let rec = db.get_record("REC").await.unwrap();
let (mut value_rx, mut alarm_rx) = {
let mut inst = rec.write().await;
let value_rx = inst
.add_subscriber(
"VAL",
1,
epics_base_rs::types::DbFieldType::Double,
EventMask::VALUE.bits(),
)
.expect("subscribe should not be capped at default");
let alarm_rx = inst
.add_subscriber(
"VAL",
2,
epics_base_rs::types::DbFieldType::Double,
EventMask::ALARM.bits(),
)
.expect("subscribe should not be capped at default");
(value_rx, alarm_rx)
};
{
let inst = rec.read().await;
inst.notify_field("VAL", EventMask::VALUE);
}
assert!(value_rx.try_recv().is_ok());
assert!(alarm_rx.try_recv().is_err());
}
#[tokio::test]
async fn test_sdis_disable_clears_rpro_and_putf() {
let db = PvDatabase::new();
db.add_record("DIS_SW", Box::new(AoRecord::new(1.0)))
.await
.unwrap();
db.add_record("DIS_TGT", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("DIS_TGT").await {
let mut inst = rec.write().await;
inst.put_common_field("SDIS", EpicsValue::String("DIS_SW".into()))
.unwrap();
inst.common.rpro = true;
inst.common.putf = true;
}
let mut visited = HashSet::new();
db.process_record_with_links("DIS_TGT", &mut visited, 0)
.await
.unwrap();
let rec = db.get_record("DIS_TGT").await.unwrap();
let inst = rec.read().await;
assert!(
!inst.common.rpro,
"SDIS disable must clear rpro (C dbAccess.c:575). Pre-fix this leaked."
);
assert!(
!inst.common.putf,
"SDIS disable must clear putf (C dbAccess.c:576). Pre-fix this leaked."
);
}
#[tokio::test]
async fn test_sdis_disable_fires_put_notify_completion() {
let db = PvDatabase::new();
db.add_record("DIS_NOT_SW", Box::new(AoRecord::new(1.0)))
.await
.unwrap();
db.add_record("DIS_NOT_TGT", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("DIS_NOT_TGT").await {
let mut inst = rec.write().await;
inst.put_common_field("SDIS", EpicsValue::String("DIS_NOT_SW".into()))
.unwrap();
}
let (tx, rx) = epics_base_rs::runtime::sync::oneshot::channel();
{
let rec = db.get_record("DIS_NOT_TGT").await.unwrap();
let mut inst = rec.write().await;
inst.put_notify_tx = Some(tx);
}
let mut visited = HashSet::new();
db.process_record_with_links("DIS_NOT_TGT", &mut visited, 0)
.await
.unwrap();
rx.await
.expect("disable bail must fire put_notify_tx (C dbAccess.c:622)");
let rec = db.get_record("DIS_NOT_TGT").await.unwrap();
assert!(
rec.read().await.put_notify_tx.is_none(),
"put_notify_tx must be cleared after firing"
);
}
#[tokio::test]
async fn test_sdis_disable_notifies_alarm() {
let db = PvDatabase::new();
db.add_record("DISABLE_SW", Box::new(AoRecord::new(1.0)))
.await
.unwrap();
db.add_record("TARGET", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("TARGET").await {
let mut inst = rec.write().await;
inst.put_common_field("SDIS", EpicsValue::String("DISABLE_SW".into()))
.unwrap();
inst.put_common_field("DISS", EpicsValue::Short(1)).unwrap();
}
let mut alarm_rx = {
let rec = db.get_record("TARGET").await.unwrap();
let mut inst = rec.write().await;
inst.add_subscriber(
"SEVR",
1,
epics_base_rs::types::DbFieldType::Short,
EventMask::ALARM.bits(),
)
.expect("subscribe should not be capped at default")
};
let mut visited = HashSet::new();
db.process_record_with_links("TARGET", &mut visited, 0)
.await
.unwrap();
assert!(alarm_rx.try_recv().is_ok());
}
#[tokio::test]
async fn test_udf_cleared_by_process_with_links() {
let db = PvDatabase::new();
db.add_record("REC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let rec = db.get_record("REC").await.unwrap();
assert!(rec.read().await.common.udf);
let mut visited = HashSet::new();
db.process_record_with_links("REC", &mut visited, 0)
.await
.unwrap();
assert!(!rec.read().await.common.udf);
}
#[tokio::test]
async fn test_udf_not_cleared_by_clears_udf_false() {
struct NoClearRecord {
val: f64,
}
impl Record for NoClearRecord {
fn record_type(&self) -> &'static str {
"noclear"
}
fn get_field(&self, name: &str) -> Option<EpicsValue> {
match name {
"VAL" => Some(EpicsValue::Double(self.val)),
_ => None,
}
}
fn put_field(
&mut self,
name: &str,
value: EpicsValue,
) -> epics_base_rs::error::CaResult<()> {
match name {
"VAL" => {
if let EpicsValue::Double(v) = value {
self.val = v;
Ok(())
} else {
Err(CaError::InvalidValue("bad".into()))
}
}
_ => Err(CaError::FieldNotFound(name.into())),
}
}
fn field_list(&self) -> &'static [FieldDesc] {
&[]
}
fn clears_udf(&self) -> bool {
false
}
}
let db = PvDatabase::new();
db.add_record("REC", Box::new(NoClearRecord { val: 0.0 }))
.await
.unwrap();
let rec = db.get_record("REC").await.unwrap();
assert!(rec.read().await.common.udf);
let mut visited = HashSet::new();
db.process_record_with_links("REC", &mut visited, 0)
.await
.unwrap();
assert!(rec.read().await.common.udf);
}
#[tokio::test]
async fn test_constant_inp_link() {
let db = PvDatabase::new();
db.add_record("AI_CONST", Box::new(AiRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("AI_CONST").await {
let mut inst = rec.write().await;
inst.put_common_field("INP", EpicsValue::String("3.15".into()))
.unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("AI_CONST", &mut visited, 0)
.await
.unwrap();
let val = db.get_pv("AI_CONST").await.unwrap();
match val {
EpicsValue::Double(v) => assert!((v - 3.15).abs() < 1e-10),
other => panic!("expected Double(3.15), got {:?}", other),
}
}
#[tokio::test]
async fn test_calc_multi_input_db_links() {
use epics_base_rs::server::records::calc::CalcRecord;
let db = PvDatabase::new();
db.add_record("SRC_A", Box::new(AoRecord::new(10.0)))
.await
.unwrap();
db.add_record("SRC_B", Box::new(AoRecord::new(20.0)))
.await
.unwrap();
let mut calc = CalcRecord::new("A+B");
calc.inpa = "SRC_A".to_string();
calc.inpb = "SRC_B".to_string();
db.add_record("CALC_REC", Box::new(calc)).await.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("CALC_REC", &mut visited, 0)
.await
.unwrap();
let val = db.get_pv("CALC_REC").await.unwrap();
match val {
EpicsValue::Double(v) => assert!((v - 30.0).abs() < 1e-10),
other => panic!("expected Double(30.0), got {:?}", other),
}
}
#[tokio::test]
async fn test_calc_constant_inputs() {
use epics_base_rs::server::records::calc::CalcRecord;
let db = PvDatabase::new();
let mut calc = CalcRecord::new("A+B");
calc.inpa = "5".to_string();
calc.inpb = "3.5".to_string();
db.add_record("CALC_CONST", Box::new(calc)).await.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("CALC_CONST", &mut visited, 0)
.await
.unwrap();
let val = db.get_pv("CALC_CONST").await.unwrap();
match val {
EpicsValue::Double(v) => assert!((v - 8.5).abs() < 1e-10),
other => panic!("expected Double(8.5), got {:?}", other),
}
}
#[tokio::test]
async fn test_calc_record_has_analog_alarm_limits() {
use epics_base_rs::server::records::calc::CalcRecord;
let db = PvDatabase::new();
let mut calc = CalcRecord::new("A");
calc.inpa = "15".to_string(); db.add_record("CALC_LIM", Box::new(calc)).await.unwrap();
db.put_record_field_from_ca("CALC_LIM", "HIHI", EpicsValue::Double(10.0))
.await
.unwrap();
db.put_record_field_from_ca("CALC_LIM", "HHSV", EpicsValue::String("MAJOR".into()))
.await
.unwrap();
let hihi = {
let rec = db.get_record("CALC_LIM").await.unwrap();
let inst = rec.read().await;
inst.resolve_field("HIHI").and_then(|v| v.to_f64()).unwrap()
};
assert_eq!(hihi, 10.0);
let mut visited = HashSet::new();
db.process_record_with_links("CALC_LIM", &mut visited, 0)
.await
.unwrap();
let rec = db.get_record("CALC_LIM").await.unwrap();
let inst = rec.read().await;
assert_eq!(
inst.common.sevr,
epics_base_rs::server::record::AlarmSeverity::Major,
"VAL=15, HIHI=10, HHSV=MAJOR — must raise HIHI alarm",
);
assert_eq!(
inst.common.stat,
epics_base_rs::server::recgbl::alarm_status::HIHI_ALARM,
);
}
#[tokio::test]
async fn test_calc_record_aftc_filter_delays_alarm() {
use epics_base_rs::server::records::calc::CalcRecord;
let db = PvDatabase::new();
let mut calc = CalcRecord::new("A");
calc.inpa = "1".to_string();
calc.aftc = 5.0; db.add_record("CALC_AFTC", Box::new(calc)).await.unwrap();
db.put_record_field_from_ca("CALC_AFTC", "HIHI", EpicsValue::Double(10.0))
.await
.unwrap();
db.put_record_field_from_ca("CALC_AFTC", "HHSV", EpicsValue::String("MAJOR".into()))
.await
.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("CALC_AFTC", &mut visited, 0)
.await
.unwrap();
let rec = db.get_record("CALC_AFTC").await.unwrap();
{
let mut inst = rec.write().await;
let _ = inst.record.put_field("VAL", EpicsValue::Double(15.0));
}
let mut visited = HashSet::new();
db.process_record_with_links("CALC_AFTC", &mut visited, 0)
.await
.unwrap();
let inst = rec.read().await;
let afvl = inst
.record
.get_field("AFVL")
.and_then(|v| v.to_f64())
.unwrap_or(0.0);
assert!(afvl != 0.0, "AFVL must be updated when AFTC > 0");
}
#[tokio::test]
async fn test_fanout_all() {
use epics_base_rs::server::records::fanout::FanoutRecord;
let db = PvDatabase::new();
let mut fanout = FanoutRecord::new();
fanout.selm = 0;
fanout.lnk1 = "TARGET_1".to_string();
fanout.lnk2 = "TARGET_2".to_string();
db.add_record("FANOUT", Box::new(fanout)).await.unwrap();
db.add_record("TARGET_1", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
db.add_record("TARGET_2", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("FANOUT", &mut visited, 0)
.await
.unwrap();
assert!(visited.contains("FANOUT"));
assert!(visited.contains("TARGET_1"));
assert!(visited.contains("TARGET_2"));
}
#[tokio::test]
async fn test_fanout_specified() {
use epics_base_rs::server::records::fanout::FanoutRecord;
let db = PvDatabase::new();
let mut fanout = FanoutRecord::new();
fanout.selm = 1;
fanout.seln = 1;
db.add_record("FANOUT", Box::new(fanout)).await.unwrap();
db.add_record("T1", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
db.add_record("T2", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("FANOUT").await {
let mut inst = rec.write().await;
inst.record
.put_field("LNK1", EpicsValue::String("T1".into()))
.unwrap();
inst.record
.put_field("LNK2", EpicsValue::String("T2".into()))
.unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("FANOUT", &mut visited, 0)
.await
.unwrap();
assert!(visited.contains("FANOUT"));
assert!(!visited.contains("T1"));
assert!(visited.contains("T2"));
}
#[tokio::test]
async fn test_dfanout_value_write() {
use epics_base_rs::server::records::dfanout::DfanoutRecord;
let db = PvDatabase::new();
let mut dfan = DfanoutRecord::new(42.0);
dfan.selm = 0;
dfan.outa = "DEST_A".to_string();
dfan.outb = "DEST_B".to_string();
db.add_record("DFAN", Box::new(dfan)).await.unwrap();
db.add_record("DEST_A", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
db.add_record("DEST_B", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("DFAN", &mut visited, 0)
.await
.unwrap();
let val_a = db.get_pv("DEST_A").await.unwrap();
match val_a {
EpicsValue::Double(v) => assert!((v - 42.0).abs() < 1e-10),
other => panic!("expected Double(42.0), got {:?}", other),
}
let val_b = db.get_pv("DEST_B").await.unwrap();
match val_b {
EpicsValue::Double(v) => assert!((v - 42.0).abs() < 1e-10),
other => panic!("expected Double(42.0), got {:?}", other),
}
}
#[tokio::test]
async fn test_dfanout_omsl_closed_loop_sources_val_from_dol() {
use epics_base_rs::server::records::dfanout::DfanoutRecord;
let db = PvDatabase::new();
db.add_record("DOL_SRC", Box::new(AoRecord::new(7.5)))
.await
.unwrap();
let mut dfan = DfanoutRecord::new(0.0);
dfan.selm = 0;
dfan.outa = "DFAN_DEST_A".to_string();
dfan.outb = "DFAN_DEST_B".to_string();
dfan.dol = "DOL_SRC".to_string();
dfan.omsl = 1; db.add_record("DFAN_OMSL", Box::new(dfan)).await.unwrap();
db.add_record("DFAN_DEST_A", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
db.add_record("DFAN_DEST_B", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("DFAN_OMSL", &mut visited, 0)
.await
.unwrap();
let val_a = db.get_pv("DFAN_DEST_A").await.unwrap();
assert!(
matches!(val_a, EpicsValue::Double(v) if (v - 7.5).abs() < 1e-10),
"DFAN_DEST_A must reflect DOL_SRC (=7.5), got {val_a:?}"
);
let val_b = db.get_pv("DFAN_DEST_B").await.unwrap();
assert!(
matches!(val_b, EpicsValue::Double(v) if (v - 7.5).abs() < 1e-10),
"DFAN_DEST_B must reflect DOL_SRC (=7.5), got {val_b:?}"
);
}
#[tokio::test]
async fn test_dfanout_omsl_supervisory_ignores_dol() {
use epics_base_rs::server::records::dfanout::DfanoutRecord;
let db = PvDatabase::new();
db.add_record("DOL_SRC2", Box::new(AoRecord::new(99.0)))
.await
.unwrap();
let mut dfan = DfanoutRecord::new(3.0);
dfan.selm = 0;
dfan.outa = "DFAN_DEST_A2".to_string();
dfan.dol = "DOL_SRC2".to_string();
dfan.omsl = 0; db.add_record("DFAN_SUP", Box::new(dfan)).await.unwrap();
db.add_record("DFAN_DEST_A2", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("DFAN_SUP", &mut visited, 0)
.await
.unwrap();
let val_a = db.get_pv("DFAN_DEST_A2").await.unwrap();
assert!(
matches!(val_a, EpicsValue::Double(v) if (v - 3.0).abs() < 1e-10),
"OMSL=supervisory must keep the operator-staged VAL (=3.0), got {val_a:?}"
);
}
#[tokio::test]
async fn test_seq_dol_lnk_dispatch() {
use epics_base_rs::server::records::seq::SeqRecord;
let db = PvDatabase::new();
db.add_record("SEQ_SRC1", Box::new(AoRecord::new(100.0)))
.await
.unwrap();
db.add_record("SEQ_SRC2", Box::new(AoRecord::new(200.0)))
.await
.unwrap();
db.add_record("SEQ_DEST1", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
db.add_record("SEQ_DEST2", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let mut seq = SeqRecord::new();
seq.selm = 0;
seq.dol1 = "SEQ_SRC1".to_string();
seq.lnk1 = "SEQ_DEST1".to_string();
seq.dol2 = "SEQ_SRC2".to_string();
seq.lnk2 = "SEQ_DEST2".to_string();
db.add_record("SEQ_REC", Box::new(seq)).await.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("SEQ_REC", &mut visited, 0)
.await
.unwrap();
let val1 = db.get_pv("SEQ_DEST1").await.unwrap();
match val1 {
EpicsValue::Double(v) => assert!((v - 100.0).abs() < 1e-10),
other => panic!("expected Double(100.0), got {:?}", other),
}
let val2 = db.get_pv("SEQ_DEST2").await.unwrap();
match val2 {
EpicsValue::Double(v) => assert!((v - 200.0).abs() < 1e-10),
other => panic!("expected Double(200.0), got {:?}", other),
}
}
#[tokio::test]
async fn test_sel_nvl_link() {
use epics_base_rs::server::records::sel::SelRecord;
let db = PvDatabase::new();
db.add_record("NVL_SRC", Box::new(AoRecord::new(2.0)))
.await
.unwrap();
let mut sel = SelRecord::default();
sel.selm = 0;
sel.nvl = "NVL_SRC".to_string();
sel.a = 10.0;
sel.b = 20.0;
sel.c = 30.0;
db.add_record("SEL_REC", Box::new(sel)).await.unwrap();
let mut visited = HashSet::new();
db.process_record_with_links("SEL_REC", &mut visited, 0)
.await
.unwrap();
let seln = db.get_pv("SEL_REC.SELN").await.unwrap();
match seln {
EpicsValue::Short(v) => assert_eq!(v, 2),
other => panic!("expected Short(2), got {:?}", other),
}
let val = db.get_pv("SEL_REC").await.unwrap();
match val {
EpicsValue::Double(v) => assert!((v - 30.0).abs() < 1e-10),
other => panic!("expected Double(30.0), got {:?}", other),
}
}
#[tokio::test]
async fn test_dol_cp_link_registration() {
let db = PvDatabase::new();
db.add_record("MTR", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let mut ao = AoRecord::new(0.0);
ao.omsl = 1;
ao.dol = "MTR CP".to_string();
db.add_record("MOTOR_POS", Box::new(ao)).await.unwrap();
db.setup_cp_links().await;
let targets = db.get_cp_targets("MTR").await;
assert_eq!(targets, vec!["MOTOR_POS"]);
}
#[tokio::test]
async fn test_dol_cp_link_triggers_processing() {
let db = PvDatabase::new();
db.add_record("SRC", Box::new(AoRecord::new(10.0)))
.await
.unwrap();
let mut ao = AoRecord::new(0.0);
ao.omsl = 1;
ao.dol = "SRC CP".to_string();
db.add_record("DST", Box::new(ao)).await.unwrap();
db.setup_cp_links().await;
let mut visited = HashSet::new();
db.process_record_with_links("SRC", &mut visited, 0)
.await
.unwrap();
let val = db.get_pv("DST").await.unwrap();
match val {
EpicsValue::Double(v) => assert!((v - 10.0).abs() < 1e-10),
other => panic!("expected Double(10.0), got {:?}", other),
}
}
#[tokio::test]
async fn test_seq_dol_cp_link_registration() {
use epics_base_rs::server::records::seq::SeqRecord;
let db = PvDatabase::new();
db.add_record("SENSOR", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let mut seq = SeqRecord::default();
seq.dol1 = "SENSOR CP".to_string();
db.add_record("MY_SEQ", Box::new(seq)).await.unwrap();
db.setup_cp_links().await;
let targets = db.get_cp_targets("SENSOR").await;
assert_eq!(targets, vec!["MY_SEQ"]);
}
#[tokio::test]
async fn test_sel_nvl_cp_link_registration() {
use epics_base_rs::server::records::sel::SelRecord;
let db = PvDatabase::new();
db.add_record("INDEX_SRC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let mut sel = SelRecord::default();
sel.nvl = "INDEX_SRC CP".to_string();
db.add_record("MY_SEL", Box::new(sel)).await.unwrap();
db.setup_cp_links().await;
let targets = db.get_cp_targets("INDEX_SRC").await;
assert_eq!(targets, vec!["MY_SEL"]);
}
#[tokio::test]
async fn test_sdis_cp_link_registration() {
let db = PvDatabase::new();
db.add_record("DISABLE_SRC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
db.add_record("GUARDED", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
if let Some(rec_arc) = db.get_record("GUARDED").await {
rec_arc.write().await.common.sdis = "DISABLE_SRC CP".to_string();
}
db.setup_cp_links().await;
let targets = db.get_cp_targets("DISABLE_SRC").await;
assert_eq!(targets, vec!["GUARDED"]);
}
#[tokio::test]
async fn test_tse_minus1_always_overwrites_via_best_time() {
let db = PvDatabase::new();
db.add_record("REC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let stale = std::time::SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1234567);
if let Some(rec) = db.get_record("REC").await {
let mut inst = rec.write().await;
inst.common.tse = -1;
inst.common.time = stale;
}
let mut visited = HashSet::new();
db.process_record_with_links("REC", &mut visited, 0)
.await
.unwrap();
let rec = db.get_record("REC").await.unwrap();
let inst = rec.read().await;
assert_ne!(
inst.common.time, stale,
"TSE=-1 must always overwrite via generalTime BestTime, matching \
C `epicsTimeGetEvent(-1)` called unconditionally"
);
}
#[tokio::test]
async fn test_tse_minus2_keeps_time_unchanged() {
let db = PvDatabase::new();
db.add_record("REC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let fixed_time = std::time::SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(999);
if let Some(rec) = db.get_record("REC").await {
let mut inst = rec.write().await;
inst.common.tse = -2;
inst.common.time = fixed_time;
}
let mut visited = HashSet::new();
db.process_record_with_links("REC", &mut visited, 0)
.await
.unwrap();
let rec = db.get_record("REC").await.unwrap();
let inst = rec.read().await;
assert_eq!(inst.common.time, fixed_time);
}
#[tokio::test]
async fn test_putf_read_only_from_ca() {
let db = PvDatabase::new();
db.add_record("REC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let result = db
.put_record_field_from_ca("REC", "PUTF", EpicsValue::Char(1))
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_rpro_causes_reprocessing() {
let db = PvDatabase::new();
db.add_record("SRC", Box::new(AoRecord::new(10.0)))
.await
.unwrap();
db.add_record("DEST", Box::new(AiRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("DEST").await {
let mut inst = rec.write().await;
inst.put_common_field("INP", EpicsValue::String("SRC".into()))
.unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("DEST", &mut visited, 0)
.await
.unwrap();
let val = db.get_pv("DEST").await.unwrap();
assert_eq!(val.to_f64().unwrap() as i64, 10);
db.put_pv_no_process("SRC", EpicsValue::Double(20.0))
.await
.unwrap();
if let Some(rec) = db.get_record("DEST").await {
let mut inst = rec.write().await;
inst.common.rpro = true;
}
let mut visited = HashSet::new();
db.process_record_with_links("DEST", &mut visited, 0)
.await
.unwrap();
let val = db.get_pv("DEST").await.unwrap();
assert_eq!(val.to_f64().unwrap() as i64, 20);
let rec = db.get_record("DEST").await.unwrap();
let inst = rec.read().await;
assert!(!inst.common.rpro);
}
#[tokio::test]
async fn test_tsel_cp_link_registration() {
let db = PvDatabase::new();
db.add_record("TSE_SRC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
db.add_record("TARGET", Box::new(AiRecord::new(0.0)))
.await
.unwrap();
if let Some(rec_arc) = db.get_record("TARGET").await {
let mut inst = rec_arc.write().await;
inst.common.tsel = "TSE_SRC CP".to_string();
inst.parsed_tsel = parse_link_v2(&inst.common.tsel);
}
db.setup_cp_links().await;
let targets = db.get_cp_targets("TSE_SRC").await;
assert_eq!(targets, vec!["TARGET"]);
}
#[tokio::test]
async fn test_new_common_fields_get_put() {
let db = PvDatabase::new();
db.add_record("REC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
let rec = db.get_record("REC").await.unwrap();
{
let inst = rec.read().await;
assert_eq!(inst.get_common_field("UDFS"), Some(EpicsValue::Short(3)));
}
{
let mut inst = rec.write().await;
inst.put_common_field("UDFS", EpicsValue::Short(1)).unwrap();
}
{
let inst = rec.read().await;
assert_eq!(inst.get_common_field("UDFS"), Some(EpicsValue::Short(1)));
}
{
let inst = rec.read().await;
assert_eq!(inst.get_common_field("SSCN"), Some(EpicsValue::Enum(0)));
}
{
let inst = rec.read().await;
assert_eq!(inst.get_common_field("BKPT"), Some(EpicsValue::Char(0)));
}
{
let mut inst = rec.write().await;
inst.put_common_field("BKPT", EpicsValue::Char(1)).unwrap();
}
{
let inst = rec.read().await;
assert_eq!(inst.get_common_field("BKPT"), Some(EpicsValue::Char(1)));
}
{
let inst = rec.read().await;
assert_eq!(inst.get_common_field("TSE"), Some(EpicsValue::Short(0)));
}
{
let inst = rec.read().await;
assert_eq!(
inst.get_common_field("TSEL"),
Some(EpicsValue::String(String::new()))
);
}
{
let inst = rec.read().await;
assert_eq!(inst.get_common_field("PUTF"), Some(EpicsValue::Char(0)));
}
{
let mut inst = rec.write().await;
let result = inst.put_common_field("PUTF", EpicsValue::Char(1));
assert!(result.is_err());
}
{
let inst = rec.read().await;
assert_eq!(inst.get_common_field("RPRO"), Some(EpicsValue::Char(0)));
}
{
let mut inst = rec.write().await;
inst.put_common_field("RPRO", EpicsValue::Char(1)).unwrap();
}
{
let inst = rec.read().await;
assert_eq!(inst.get_common_field("RPRO"), Some(EpicsValue::Char(1)));
}
}
#[tokio::test]
async fn test_array_records_nord_monitor_uses_post_process_timestamp() {
use epics_base_rs::server::recgbl::EventMask;
use epics_base_rs::server::records::waveform::{ArrayKind, WaveformRecord};
use epics_base_rs::types::DbFieldType;
use std::time::SystemTime;
for (kind, name) in [
(ArrayKind::Waveform, "WF_KIND"),
(ArrayKind::Aai, "AAI_KIND"),
(ArrayKind::Aao, "AAO_KIND"),
(ArrayKind::SubArray, "SUBA_KIND"),
] {
let db = PvDatabase::new();
db.add_record(name, Box::new(WaveformRecord::with_kind(kind)))
.await
.unwrap();
if let Some(rec) = db.get_record(name).await {
let mut inst = rec.write().await;
inst.record.put_field("NELM", EpicsValue::Long(10)).unwrap();
inst.record
.put_field("FTVL", EpicsValue::Short(10))
.unwrap();
if matches!(kind, ArrayKind::SubArray) {
inst.record.put_field("INDX", EpicsValue::Long(0)).unwrap();
inst.record.put_field("MALM", EpicsValue::Long(10)).unwrap();
}
}
let start = SystemTime::now();
let mut nord_rx = if let Some(rec) = db.get_record(name).await {
let mut inst = rec.write().await;
inst.add_subscriber("NORD", 1, DbFieldType::Long, EventMask::VALUE.bits())
} else {
None
}
.unwrap_or_else(|| panic!("NORD subscription must be accepted for {name}"));
if let Some(rec) = db.get_record(name).await {
let mut inst = rec.write().await;
inst.record
.set_val(EpicsValue::DoubleArray(vec![1.0, 2.0, 3.0]))
.unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links(name, &mut visited, 0)
.await
.unwrap();
let event = nord_rx
.try_recv()
.unwrap_or_else(|_| panic!("NORD monitor event must be delivered for {name}"));
assert!(
matches!(event.snapshot.value, EpicsValue::Long(3)),
"{name}: NORD payload should reflect post-set_val length (3), got {:?}",
event.snapshot.value
);
let ts = event.snapshot.timestamp;
assert!(
ts != SystemTime::UNIX_EPOCH,
"{name}: NORD event timestamp must not be the epoch sentinel"
);
assert!(
ts >= start,
"{name}: NORD event timestamp ({ts:?}) must be ≥ pre-process baseline ({start:?})"
);
}
}
#[tokio::test]
async fn test_complete_async_record_gates_subscribed_field_on_change() {
use epics_base_rs::server::recgbl::EventMask;
use epics_base_rs::types::DbFieldType;
let db = PvDatabase::new();
db.add_record("ASYNC_GATE", Box::new(AsyncRecord { val: 0.0 }))
.await
.unwrap();
if let Some(rec) = db.get_record("ASYNC_GATE").await {
let mut inst = rec.write().await;
inst.put_common_field("DESC", EpicsValue::String("alpha".into()))
.unwrap();
}
let mut desc_rx = if let Some(rec) = db.get_record("ASYNC_GATE").await {
let mut inst = rec.write().await;
inst.add_subscriber("DESC", 7, DbFieldType::String, EventMask::VALUE.bits())
} else {
None
}
.expect("DESC subscription must be accepted");
let mut visited = HashSet::new();
db.process_record_with_links("ASYNC_GATE", &mut visited, 0)
.await
.unwrap();
db.complete_async_record("ASYNC_GATE").await.unwrap();
assert!(
desc_rx.try_recv().is_err(),
"DESC unchanged across async-completion → must NOT post a duplicate event"
);
if let Some(rec) = db.get_record("ASYNC_GATE").await {
let mut inst = rec.write().await;
inst.put_common_field("DESC", EpicsValue::String("beta".into()))
.unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("ASYNC_GATE", &mut visited, 0)
.await
.unwrap();
db.complete_async_record("ASYNC_GATE").await.unwrap();
let event = desc_rx
.try_recv()
.expect("DESC change must produce a post-completion event");
assert!(
matches!(event.snapshot.value, EpicsValue::String(ref s) if s == "beta"),
"DESC event payload should reflect post-change value, got {:?}",
event.snapshot.value
);
let mut visited = HashSet::new();
db.process_record_with_links("ASYNC_GATE", &mut visited, 0)
.await
.unwrap();
db.complete_async_record("ASYNC_GATE").await.unwrap();
assert!(
desc_rx.try_recv().is_err(),
"DESC stable after the change → no further events"
);
}
#[tokio::test]
async fn test_put_pv_and_post_propagates_nord_side_effect_on_waveform() {
use epics_base_rs::server::recgbl::EventMask;
use epics_base_rs::server::records::waveform::{ArrayKind, WaveformRecord};
use epics_base_rs::types::DbFieldType;
let db = PvDatabase::new();
db.add_record(
"WF_GW",
Box::new(WaveformRecord::with_kind(ArrayKind::Waveform)),
)
.await
.unwrap();
if let Some(rec) = db.get_record("WF_GW").await {
let mut inst = rec.write().await;
inst.record.put_field("NELM", EpicsValue::Long(10)).unwrap();
inst.record
.put_field("FTVL", EpicsValue::Short(10))
.unwrap();
}
let (mut nord_rx, mut val_rx) = if let Some(rec) = db.get_record("WF_GW").await {
let mut inst = rec.write().await;
let n = inst.add_subscriber("NORD", 1, DbFieldType::Long, EventMask::VALUE.bits());
let v = inst.add_subscriber("VAL", 2, DbFieldType::Double, EventMask::VALUE.bits());
(n, v)
} else {
(None, None)
};
let nord_rx = nord_rx.as_mut().expect("NORD subscription accepted");
let val_rx = val_rx.as_mut().expect("VAL subscription accepted");
db.put_pv_and_post("WF_GW", EpicsValue::DoubleArray(vec![1.0, 2.0, 3.0, 4.0]))
.await
.unwrap();
let val_event = val_rx
.try_recv()
.expect("VAL event must be delivered after put_pv_and_post");
let nord_event = nord_rx
.try_recv()
.expect("NORD event must be delivered after put_pv_and_post (side-effect of VAL)");
assert!(
matches!(nord_event.snapshot.value, EpicsValue::Long(4)),
"NORD event should reflect post-put length (4), got {:?}",
nord_event.snapshot.value
);
assert_eq!(
val_event.snapshot.timestamp, nord_event.snapshot.timestamp,
"VAL and NORD side-effect events must share the put's timestamp"
);
db.put_pv_and_post("WF_GW", EpicsValue::DoubleArray(vec![1.0, 2.0, 3.0, 4.0]))
.await
.unwrap();
assert!(
nord_rx.try_recv().is_err(),
"NORD unchanged → no duplicate NORD event"
);
}
#[tokio::test]
async fn test_output_link_cascade_uses_post_process_source_timestamp() {
use std::time::SystemTime;
let db = PvDatabase::new();
db.add_record("TS_SRC", Box::new(AoRecord::new(0.0)))
.await
.unwrap();
db.add_record("TS_DST", Box::new(AiRecord::new(0.0)))
.await
.unwrap();
if let Some(rec) = db.get_record("TS_SRC").await {
let mut inst = rec.write().await;
inst.put_common_field("OUT", EpicsValue::String("TS_DST".into()))
.unwrap();
}
let baseline = SystemTime::now();
if let Some(rec) = db.get_record("TS_SRC").await {
let mut inst = rec.write().await;
inst.record.set_val(EpicsValue::Double(7.5)).unwrap();
}
let mut visited = HashSet::new();
db.process_record_with_links("TS_SRC", &mut visited, 0)
.await
.unwrap();
let src_time = db
.get_record("TS_SRC")
.await
.expect("TS_SRC exists")
.read()
.await
.common
.time;
assert!(
src_time >= baseline,
"SRC.common.time ({src_time:?}) must be post-baseline ({baseline:?}) — \
apply_timestamp must run before OUT write"
);
let dst_time = db
.get_record("TS_DST")
.await
.expect("TS_DST exists")
.read()
.await
.common
.time;
assert!(
dst_time >= baseline,
"DST.common.time ({dst_time:?}) must be post-baseline ({baseline:?}) — \
OUT cascade must drive Passive DST through process_record_with_links"
);
}
#[tokio::test]
async fn test_complete_async_record_updates_timestamp_at_completion() {
use epics_base_rs::server::recgbl::EventMask;
use epics_base_rs::types::DbFieldType;
use std::time::{Duration, SystemTime};
let db = PvDatabase::new();
db.add_record("ASYNC_TS", Box::new(AsyncRecord { val: 1.0 }))
.await
.unwrap();
let mut val_rx = if let Some(rec) = db.get_record("ASYNC_TS").await {
let mut inst = rec.write().await;
inst.add_subscriber("VAL", 9, DbFieldType::Double, EventMask::VALUE.bits())
} else {
None
}
.expect("VAL subscription accepted");
let mut visited = HashSet::new();
db.process_record_with_links("ASYNC_TS", &mut visited, 0)
.await
.unwrap();
assert!(
val_rx.try_recv().is_err(),
"AsyncPending early-return must not deliver VAL event yet"
);
tokio::time::sleep(Duration::from_millis(20)).await;
let post_sleep = SystemTime::now();
db.complete_async_record("ASYNC_TS").await.unwrap();
let event = val_rx
.try_recv()
.expect("VAL event must be delivered at async completion");
assert!(
event.snapshot.timestamp >= post_sleep,
"completion event timestamp ({:?}) must be ≥ post-sleep ({post_sleep:?}) — \
apply_timestamp must run at async completion, not at process start",
event.snapshot.timestamp
);
}
#[tokio::test]
async fn test_longout_oopt_on_change_first_cycle_emits_then_suppresses() {
use epics_base_rs::server::records::longout::LongoutRecord;
use std::time::SystemTime;
let db = PvDatabase::new();
db.add_record("LO_SRC", Box::new(LongoutRecord::new(0)))
.await
.unwrap();
db.add_record("LO_DST", Box::new(LongoutRecord::new(0)))
.await
.unwrap();
if let Some(rec) = db.get_record("LO_SRC").await {
let mut inst = rec.write().await;
inst.put_common_field("OUT", EpicsValue::String("LO_DST".into()))
.unwrap();
inst.record.put_field("OOPT", EpicsValue::Short(1)).unwrap();
}
let baseline = SystemTime::now();
let mut visited = HashSet::new();
db.process_record_with_links("LO_SRC", &mut visited, 0)
.await
.unwrap();
let dst_time_after_first = db
.get_record("LO_DST")
.await
.expect("LO_DST exists")
.read()
.await
.common
.time;
assert!(
dst_time_after_first >= baseline,
"first-cycle OOPT=On_Change must drive OUT cascade (DST.time {dst_time_after_first:?} \
must be ≥ baseline {baseline:?}); pre-fix the cascade was suppressed"
);
let src_first_done = db
.get_record("LO_SRC")
.await
.expect("LO_SRC exists")
.read()
.await
.record
.get_field("VAL")
.is_some();
assert!(src_first_done, "SRC must have processed at least once");
let dst_time_before_second = dst_time_after_first;
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
let mut visited = HashSet::new();
db.process_record_with_links("LO_SRC", &mut visited, 0)
.await
.unwrap();
let dst_time_after_second = db
.get_record("LO_DST")
.await
.expect("LO_DST exists")
.read()
.await
.common
.time;
assert_eq!(
dst_time_after_second, dst_time_before_second,
"second-cycle OOPT=On_Change with val==pval must NOT re-trigger OUT cascade — \
DST.time should not advance from {dst_time_before_second:?} to {dst_time_after_second:?}"
);
}
#[tokio::test]
async fn test_self_link_out_does_not_loop() {
use epics_base_rs::server::records::longout::LongoutRecord;
use std::time::Duration;
let db = PvDatabase::new();
db.add_record("SELF_LO", Box::new(LongoutRecord::new(0)))
.await
.unwrap();
if let Some(rec) = db.get_record("SELF_LO").await {
let mut inst = rec.write().await;
inst.put_common_field("OUT", EpicsValue::String("SELF_LO".into()))
.unwrap();
}
let mut visited = HashSet::new();
let result = tokio::time::timeout(
Duration::from_secs(1),
db.process_record_with_links("SELF_LO", &mut visited, 0),
)
.await;
assert!(
result.is_ok(),
"self-link processing must complete within 1s — \
hang implies the visited HashSet guard regressed"
);
result.unwrap().expect("process call must succeed");
assert!(visited.contains("SELF_LO"));
let mut visited2 = HashSet::new();
let result2 = tokio::time::timeout(
Duration::from_secs(1),
db.process_record_with_links("SELF_LO", &mut visited2, 0),
)
.await;
assert!(
result2.is_ok(),
"subsequent self-link processing must also complete within 1s"
);
result2.unwrap().expect("second process call must succeed");
let rpro_after = db
.get_record("SELF_LO")
.await
.expect("SELF_LO exists")
.read()
.await
.common
.rpro;
assert!(
!rpro_after,
"RPRO must be cleared after self-link processing — \
stuck-true would queue an infinite reprocess loop"
);
}
#[tokio::test]
async fn test_compress_res_write_posts_val_monitor() {
use epics_base_rs::server::recgbl::EventMask;
use epics_base_rs::server::records::compress::CompressRecord;
use epics_base_rs::types::DbFieldType;
let db = PvDatabase::new();
db.add_record("CMP_RES", Box::new(CompressRecord::new(8, 4)))
.await
.unwrap();
if let Some(rec) = db.get_record("CMP_RES").await {
let mut inst = rec.write().await;
let arr = EpicsValue::DoubleArray(vec![1.0, 2.0, 3.0, 0.0, 0.0, 0.0, 0.0, 0.0]);
let _ = inst.record.put_field("VAL", arr);
}
let mut val_rx = if let Some(rec) = db.get_record("CMP_RES").await {
let mut inst = rec.write().await;
inst.add_subscriber("VAL", 1, DbFieldType::Double, EventMask::VALUE.bits())
} else {
None
}
.expect("VAL subscription accepted");
let _ = db
.put_record_field_from_ca("CMP_RES", "RES", EpicsValue::Short(1))
.await;
let event = val_rx
.try_recv()
.expect("RES write must trigger a VAL monitor event");
if let EpicsValue::DoubleArray(v) = &event.snapshot.value {
assert!(
v.iter().all(|&x| x == 0.0),
"post-reset VAL must be all zeros; got {v:?}"
);
} else {
panic!("VAL must be DoubleArray, got {:?}", event.snapshot.value);
}
let res = db
.get_record("CMP_RES")
.await
.expect("CMP_RES exists")
.read()
.await
.record
.get_field("RES")
.and_then(|v| match v {
EpicsValue::Short(s) => Some(s),
_ => None,
})
.expect("RES readable");
assert_eq!(res, 0, "RES must auto-clear after the reset");
}
#[tokio::test]
async fn test_mbbo_direct_initialises_val_from_bits_when_undef() {
use epics_base_rs::server::record::Record;
use epics_base_rs::server::records::mbbo_direct::MbboDirectRecord;
let mut rec = MbboDirectRecord::default();
rec.put_field("B3", EpicsValue::Char(1)).unwrap();
let mut udf = true;
rec.post_init_finalize_undef(&mut udf).unwrap();
assert!(
!udf,
"UDF must be cleared once bits supplied an initial value"
);
assert!(matches!(rec.get_field("VAL"), Some(EpicsValue::Long(8))));
let mut rec2 = MbboDirectRecord::default();
rec2.put_field("VAL", EpicsValue::Long(0b0101)).unwrap();
let mut udf2 = false;
rec2.post_init_finalize_undef(&mut udf2).unwrap();
assert!(!udf2, "UDF stays cleared");
assert!(matches!(rec2.get_field("VAL"), Some(EpicsValue::Long(5))));
assert!(matches!(rec2.get_field("B0"), Some(EpicsValue::Char(1))));
assert!(matches!(rec2.get_field("B2"), Some(EpicsValue::Char(1))));
assert!(matches!(rec2.get_field("B1"), Some(EpicsValue::Char(0))));
let mut rec3 = MbboDirectRecord::default();
let mut udf3 = true;
rec3.post_init_finalize_undef(&mut udf3).unwrap();
assert!(udf3, "UDF stays true when nothing initialised");
assert!(matches!(rec3.get_field("VAL"), Some(EpicsValue::Long(0))));
}
#[tokio::test]
async fn test_lnk_calc_parses_evaluates_and_passes_timestamp() {
use epics_base_rs::server::record::{CalcLink, ParsedLink, parse_link_v2};
use epics_base_rs::server::records::ai::AiRecord;
let parsed = parse_link_v2(r#"{calc:{"expr":"A+B*2","args":["pv_a","pv_b"],"time":"A"}}"#);
let calc = match parsed {
ParsedLink::Calc(c) => c,
other => panic!("expected ParsedLink::Calc, got {other:?}"),
};
assert_eq!(calc.expr, "A+B*2");
assert_eq!(calc.args, vec!["pv_a".to_string(), "pv_b".to_string()]);
assert_eq!(calc.time_source, Some('A'));
let no_time = parse_link_v2(r#"{calc:{"expr":"A","args":["pv_a"]}}"#);
assert!(matches!(
no_time,
ParsedLink::Calc(CalcLink {
time_source: None,
..
})
));
let too_many = parse_link_v2(
r#"{calc:{"expr":"A","args":["a","b","c","d","e","f","g","h","i","j","k","l","m"]}}"#,
);
assert!(
!matches!(too_many, ParsedLink::Calc(_)),
"13+ args must NOT parse as Calc"
);
let db = PvDatabase::new();
db.add_record("pv_a", Box::new(AiRecord::new(3.0)))
.await
.unwrap();
db.add_record("pv_b", Box::new(AiRecord::new(5.0)))
.await
.unwrap();
let calc = CalcLink {
expr: "A+B*2".into(),
args: vec!["pv_a".into(), "pv_b".into()],
time_source: Some('A'),
};
let parsed = ParsedLink::Calc(calc.clone());
let value = db
.read_link_value_soft(&parsed, true)
.await
.expect("calc link evaluates");
match value {
EpicsValue::Double(v) => assert!((v - 13.0).abs() < 1e-9, "expected 3+5*2=13, got {v}"),
other => panic!("expected Double, got {other:?}"),
}
let known = std::time::SystemTime::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000);
if let Some(rec) = db.get_record("pv_a").await {
rec.write().await.common.time = known;
}
let (v, t) = db
.evaluate_calc_link_with_time(&calc)
.await
.expect("calc evaluates with time");
match v {
EpicsValue::Double(x) => assert!((x - 13.0).abs() < 1e-9),
other => panic!("expected Double, got {other:?}"),
}
assert_eq!(t, Some(known), "time pulled from pv_a (letter 'A')");
}