use std::f64::consts::PI;
use std::sync::Arc;
use std::time::Duration;
use epics_base_rs::error::CaResult;
use epics_base_rs::server::database::PvDatabase;
use epics_base_rs::server::snapshot::DbrClass;
use epics_base_rs::types::{DbFieldType, EpicsValue};
use serial_test::serial;
/// Returns a UDP port number on the loopback interface that was free a moment ago.
///
/// A throwaway socket is bound to `127.0.0.1:0` and the port it ended up on is
/// read back. The socket is dropped when this function returns, so nothing
/// reserves the port afterwards — callers should bind it promptly.
///
/// # Panics
///
/// Panics if the socket cannot be bound or its local address cannot be read.
fn free_port() -> u16 {
    let probe = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
    let bound_addr = probe.local_addr().unwrap();
    bound_addr.port()
}
/// Spins up an in-process server for the given PVs and returns a client aimed at it.
///
/// Steps, in order:
/// 1. Create a `PvDatabase` and add every `(name, value)` pair from `pvs`.
/// 2. Spawn `run_tcp_listener` with port `0` and a oneshot sender; the value
///    received on the matching receiver is used as the TCP port.
/// 3. Pick a UDP port with `free_port` and spawn `run_udp_search_responder`
///    on it, passing along the TCP port.
/// 4. Sleep 50 ms, then set `EPICS_CA_ADDR_LIST` to `127.0.0.1:<udp_port>` and
///    `EPICS_CA_AUTO_ADDR_LIST` to `NO` before constructing the client.
///
/// The results of both spawned server futures are discarded.
///
/// # Panics
///
/// Panics with "TCP listener started" if nothing arrives on the oneshot channel.
async fn setup(pvs: Vec<(&str, EpicsValue)>) -> CaResult<epics_ca_rs::client::CaClient> {
    let database = Arc::new(PvDatabase::new());
    for (pv_name, initial_value) in pvs {
        database.add_pv(pv_name, initial_value).await;
    }

    let access_config = Arc::new(tokio::sync::RwLock::new(None));
    let (port_tx, port_rx) = tokio::sync::oneshot::channel();

    // Handles moved into the TCP listener task.
    let tcp_database = Arc::clone(&database);
    let tcp_access_config = Arc::clone(&access_config);
    tokio::spawn(async move {
        let beacon_reset = Arc::new(tokio::sync::Notify::new());
        let drain = Arc::new(std::sync::atomic::AtomicBool::new(false));
        // Only the sender half is kept; the receiver is dropped right here.
        let (acf_reload_tx, _) = tokio::sync::broadcast::channel::<()>(16);
        let _ = epics_ca_rs::server::tcp::run_tcp_listener(
            tcp_database,
            0,
            tcp_access_config,
            acf_reload_tx,
            port_tx,
            beacon_reset,
            None,
            None,
            drain,
            #[cfg(feature = "ca-experimental-rust-tls")]
            None,
            #[cfg(feature = "ca-cap-tokens")]
            None,
        )
        .await;
    });
    let tcp_port = port_rx.await.expect("TCP listener started");

    // Handle moved into the UDP search responder task.
    let udp_port = free_port();
    let udp_database = Arc::clone(&database);
    tokio::spawn(async move {
        let _ = epics_ca_rs::server::udp::run_udp_search_responder(
            udp_database,
            udp_port,
            tcp_port,
            vec![std::net::Ipv4Addr::UNSPECIFIED],
            Vec::new(),
        )
        .await;
    });

    // Short pause before the client is created; presumably lets the spawned
    // responder get going — NOTE(review): confirm 50 ms is sufficient on CI.
    tokio::time::sleep(Duration::from_millis(50)).await;

    // SAFETY: NOTE(review) — every test in this file is `#[serial]`, so no other
    // test here should be touching the environment at the same time; confirm
    // that no background thread reads the environment concurrently.
    unsafe {
        std::env::set_var("EPICS_CA_ADDR_LIST", format!("127.0.0.1:{udp_port}"));
        std::env::set_var("EPICS_CA_AUTO_ADDR_LIST", "NO");
    }

    epics_ca_rs::client::CaClient::new().await
}
/// A plain `get` on a double PV reports `DbFieldType::Double` and the seeded value.
#[tokio::test]
#[serial]
async fn test_get_double() {
    const PV: &str = "TEST:VAL";

    let client = setup(vec![(PV, EpicsValue::Double(42.0))]).await.unwrap();

    let channel = client.create_channel(PV);
    let connect_timeout = Duration::from_secs(3);
    channel.wait_connected(connect_timeout).await.unwrap();

    let (field_type, payload) = channel.get().await.unwrap();
    assert_eq!(field_type, DbFieldType::Double);
    assert_eq!(payload, EpicsValue::Double(42.0));
}
/// Writing a new double with `put` is visible to a subsequent `get`.
#[tokio::test]
#[serial]
async fn test_put_and_readback() {
    const PV: &str = "TEST:SP";

    let client = setup(vec![(PV, EpicsValue::Double(0.0))]).await.unwrap();

    let channel = client.create_channel(PV);
    let connect_timeout = Duration::from_secs(3);
    channel.wait_connected(connect_timeout).await.unwrap();

    let written = EpicsValue::Double(99.5);
    channel.put(&written).await.unwrap();

    // Only the value half of the reply matters here.
    let (_, readback) = channel.get().await.unwrap();
    assert_eq!(readback, EpicsValue::Double(99.5));
}
/// A `DbrClass::Time` fetch on a double PV returns the seeded value and a
/// timestamp later than the Unix epoch.
#[tokio::test]
#[serial]
async fn test_get_with_metadata_time_double() {
    const PV: &str = "TEST:TEMP";

    let client = setup(vec![(PV, EpicsValue::Double(25.0))]).await.unwrap();

    let channel = client.create_channel(PV);
    let connect_timeout = Duration::from_secs(3);
    channel.wait_connected(connect_timeout).await.unwrap();

    let snapshot = channel.get_with_metadata(DbrClass::Time).await.unwrap();
    assert_eq!(snapshot.value, EpicsValue::Double(25.0));
    assert!(snapshot.timestamp > std::time::SystemTime::UNIX_EPOCH);
}
/// A `DbrClass::Time` fetch on a short PV returns the seeded value.
#[tokio::test]
#[serial]
async fn test_get_with_metadata_time_short() {
    const PV: &str = "TEST:INT";

    let client = setup(vec![(PV, EpicsValue::Short(7))]).await.unwrap();

    let channel = client.create_channel(PV);
    let connect_timeout = Duration::from_secs(3);
    channel.wait_connected(connect_timeout).await.unwrap();

    let snapshot = channel.get_with_metadata(DbrClass::Time).await.unwrap();
    assert_eq!(snapshot.value, EpicsValue::Short(7));
}
/// A `DbrClass::Time` fetch on a string PV returns the seeded text.
#[tokio::test]
#[serial]
async fn test_get_with_metadata_time_string() {
    const PV: &str = "TEST:STR";

    let client = setup(vec![(PV, EpicsValue::String("hello".into()))])
        .await
        .unwrap();

    let channel = client.create_channel(PV);
    let connect_timeout = Duration::from_secs(3);
    channel.wait_connected(connect_timeout).await.unwrap();

    let snapshot = channel.get_with_metadata(DbrClass::Time).await.unwrap();
    assert_eq!(snapshot.value, EpicsValue::String("hello".into()));
}
/// A `DbrClass::Ctrl` fetch on a double PV returns the seeded value with alarm
/// status and severity both equal to zero.
#[tokio::test]
#[serial]
async fn test_get_with_metadata_ctrl_double() {
    const PV: &str = "TEST:CTRL";

    let client = setup(vec![(PV, EpicsValue::Double(PI))]).await.unwrap();

    let channel = client.create_channel(PV);
    let connect_timeout = Duration::from_secs(3);
    channel.wait_connected(connect_timeout).await.unwrap();

    let snapshot = channel.get_with_metadata(DbrClass::Ctrl).await.unwrap();
    assert_eq!(snapshot.value, EpicsValue::Double(PI));
    assert_eq!(snapshot.alarm.status, 0);
    assert_eq!(snapshot.alarm.severity, 0);
}
/// A `DbrClass::Sts` fetch on a double PV returns the seeded value with an
/// alarm status of zero.
#[tokio::test]
#[serial]
async fn test_get_with_metadata_sts() {
    const PV: &str = "TEST:STS";

    let client = setup(vec![(PV, EpicsValue::Double(1.0))]).await.unwrap();

    let channel = client.create_channel(PV);
    let connect_timeout = Duration::from_secs(3);
    channel.wait_connected(connect_timeout).await.unwrap();

    let snapshot = channel.get_with_metadata(DbrClass::Sts).await.unwrap();
    assert_eq!(snapshot.value, EpicsValue::Double(1.0));
    assert_eq!(snapshot.alarm.status, 0);
}
/// A subscription first delivers the seeded value, then the value written by a
/// later `put`. Each delivery must arrive within three seconds.
#[tokio::test]
#[serial]
async fn test_monitor_receives_updates() {
    const PV: &str = "TEST:MON";
    let wait_limit = Duration::from_secs(3);

    let client = setup(vec![(PV, EpicsValue::Double(0.0))]).await.unwrap();

    let channel = client.create_channel(PV);
    channel.wait_connected(wait_limit).await.unwrap();
    let mut updates = channel.subscribe().await.unwrap();

    // First delivery: the value the PV was created with.
    let delivery = tokio::time::timeout(wait_limit, updates.recv()).await;
    let initial = delivery.unwrap().unwrap().unwrap();
    assert_eq!(initial.value, EpicsValue::Double(0.0));

    // Second delivery: triggered by the write below.
    channel.put(&EpicsValue::Double(123.0)).await.unwrap();
    let delivery = tokio::time::timeout(wait_limit, updates.recv()).await;
    let changed = delivery.unwrap().unwrap().unwrap();
    assert_eq!(changed.value, EpicsValue::Double(123.0));
}
/// Waiting for a channel whose name the server does not host yields an error
/// once the two-second limit passes.
#[tokio::test]
#[serial]
async fn test_get_nonexistent_pv_times_out() {
    // The server hosts a different PV, so the name below is never served.
    let client = setup(vec![("TEST:EXISTS", EpicsValue::Double(0.0))])
        .await
        .unwrap();

    let missing = client.create_channel("TEST:DOES_NOT_EXIST");
    let wait_limit = Duration::from_secs(2);
    let outcome = missing.wait_connected(wait_limit).await;
    assert!(outcome.is_err());
}
#[tokio::test]
#[serial]
async fn test_get_with_metadata_all_native_types() {
let pvs = vec![
("T:DOUBLE", EpicsValue::Double(1.5)),
("T:FLOAT", EpicsValue::Float(2.5)),
("T:LONG", EpicsValue::Long(42)),
("T:SHORT", EpicsValue::Short(-7)),
("T:CHAR", EpicsValue::Char(0xAB)),
("T:ENUM", EpicsValue::Enum(3)),
("T:STRING", EpicsValue::String("test".into())),
];
let client = setup(pvs).await.unwrap();
for (name, expected) in [
("T:DOUBLE", EpicsValue::Double(1.5)),
("T:FLOAT", EpicsValue::Float(2.5)),
("T:LONG", EpicsValue::Long(42)),
("T:SHORT", EpicsValue::Short(-7)),
("T:CHAR", EpicsValue::Char(0xAB)),
("T:ENUM", EpicsValue::Enum(3)),
("T:STRING", EpicsValue::String("test".into())),
] {
let ch = client.create_channel(name);
ch.wait_connected(Duration::from_secs(3)).await.unwrap();
let snap = ch.get_with_metadata(DbrClass::Time).await.unwrap();
assert_eq!(snap.value, expected, "mismatch for {name}");
}
}