mod test_pvxs_monitor_callbacks {
use pvxs_sys::{AtomicUsize, Context, MonitorEvent, NTScalarMetadataBuilder, Ordering, Server};
use std::thread;
use std::time::Duration;
// Incremented by `connection_callback`. Being a `static`, it is shared by every test in
// this module, so a test resets it to zero before relying on its value.
static CONNECT_COUNTER: AtomicUsize = AtomicUsize::new(0);
// Incremented by `disconnection_callback`; reset to zero by the test that uses it.
static DISCONNECT_COUNTER: AtomicUsize = AtomicUsize::new(0);
// Incremented by `generic_event_callback`. More than one test in this module resets and
// reads this counter.
static EVENT_COUNTER: AtomicUsize = AtomicUsize::new(0);
extern "C" fn connection_callback() {
CONNECT_COUNTER.fetch_add(1, Ordering::SeqCst);
println!(
"Connection event detected! Count: {}",
CONNECT_COUNTER.load(Ordering::SeqCst)
);
}
extern "C" fn disconnection_callback() {
DISCONNECT_COUNTER.fetch_add(1, Ordering::SeqCst);
println!(
"Disconnection event detected! Count: {}",
DISCONNECT_COUNTER.load(Ordering::SeqCst)
);
}
extern "C" fn generic_event_callback() {
EVENT_COUNTER.fetch_add(1, Ordering::SeqCst);
println!(
"Event detected! Count: {}",
EVENT_COUNTER.load(Ordering::SeqCst)
);
}
/// A monitor built with `connect_exception(false)` and `disconnect_exception(false)`
/// must not surface any `MonitorEvent` through `pop()` errors, and must hand back
/// exactly one data update once it has connected.
#[test]
fn test_monitor_connection_and_disconnection_events_off() {
    // Deliberately unique to this test: tests run in parallel by default, and a PV name
    // shared with another test's server could let this client attach to the wrong server.
    const PV_NAME: &str = "callback:test:events_off";
    // Upper bound on `pop()` calls while emptying the monitor queue.
    const MAX_POPS: usize = 20;

    let srv = Server::start_from_env().expect("Failed to create server");
    srv.create_pv_double(PV_NAME, 2.71, NTScalarMetadataBuilder::new())
        .expect("Failed to create PV");
    // Give the server time to come up before the client starts looking for the PV.
    thread::sleep(Duration::from_millis(500));

    let mut ctx = Context::from_env().expect("Failed to create context");
    let mut monitor1 = ctx
        .monitor_builder(PV_NAME)
        .expect("Failed to create monitor builder")
        .connect_exception(false)
        .disconnect_exception(false)
        .exec()
        .expect("Failed to create monitor1");
    monitor1.start().expect("Failed to start monitor1");
    // Let the subscription connect and queue its first update.
    thread::sleep(Duration::from_millis(500));

    let mut got_connected_exception = false;
    let mut got_disconnected_exception = false;
    let mut got_finished_exception = false;
    let mut got_remote_error_exception = false;
    let mut got_generic_exception = false;
    let mut data_count1 = 0;

    // Pop until the queue reports empty (`Ok(None)`), recording which kind of result
    // each call produced.
    for _ in 0..MAX_POPS {
        match monitor1.pop() {
            Ok(Some(_)) => data_count1 += 1,
            Ok(None) => break,
            Err(MonitorEvent::Connected(_)) => got_connected_exception = true,
            Err(MonitorEvent::Disconnected(_)) => got_disconnected_exception = true,
            Err(MonitorEvent::Finished(_)) => got_finished_exception = true,
            Err(MonitorEvent::RemoteError(_)) => got_remote_error_exception = true,
            Err(MonitorEvent::ClientError(_)) => got_generic_exception = true,
        }
    }
    monitor1.stop().expect("Failed to stop monitor1");

    assert!(
        !got_connected_exception,
        "Did not expect a connection exception with connect_exception(false)"
    );
    assert!(
        !got_disconnected_exception,
        "Did not expect a disconnection exception with disconnect_exception(false)"
    );
    assert!(
        !got_finished_exception,
        "Did not expect a finished exception with disconnect_exception(false)"
    );
    assert!(
        !got_remote_error_exception,
        "Did not expect a remote error exception"
    );
    assert!(
        !got_generic_exception,
        "Did not expect a generic client error exception"
    );
    assert_eq!(
        data_count1, 1,
        "Expected exactly one data update from pop() after the initial connection"
    );

    srv.stop_drop().expect("Failed to stop server");
}
/// A monitor built with `connect_exception(true)` and `disconnect_exception(false)`
/// must report the connection as `Err(MonitorEvent::Connected)` from `pop()`, must not
/// report any other event, and must still deliver at least one data update.
#[test]
fn test_monitor_connection_on_and_disconnection_off() {
    // Deliberately unique to this test: tests run in parallel by default, and a PV name
    // shared with another test's server could let this client attach to the wrong server.
    const PV_NAME: &str = "callback:test:connect_on";
    // Upper bound on `pop()` calls while emptying the monitor queue.
    const MAX_POPS: usize = 20;

    let srv = Server::start_from_env().expect("Failed to create server");
    srv.create_pv_double(PV_NAME, 2.71, NTScalarMetadataBuilder::new())
        .expect("Failed to create PV");
    let mut ctx = Context::from_env().expect("Failed to create context");
    // Give the server time to come up before subscribing.
    thread::sleep(Duration::from_millis(500));

    let mut got_connected_exception = false;
    let mut got_disconnected_exception = false;
    let mut got_finished_exception = false;
    let mut got_remote_error_exception = false;
    let mut got_generic_exception = false;

    let mut monitor2 = ctx
        .monitor_builder(PV_NAME)
        .expect("Failed to create monitor builder")
        .connect_exception(true)
        .disconnect_exception(false)
        .exec()
        .expect("Failed to create monitor2");
    monitor2.start().expect("Failed to start monitor2");
    // Let the subscription connect and queue its events.
    thread::sleep(Duration::from_millis(500));

    let mut data_count2 = 0;
    // Pop until the queue reports empty (`Ok(None)`). Events arrive as `Err(..)` values
    // and do not end the loop, so data queued behind them is still counted.
    for _ in 0..MAX_POPS {
        match monitor2.pop() {
            Ok(Some(_)) => data_count2 += 1,
            Ok(None) => break,
            Err(MonitorEvent::Connected(_)) => got_connected_exception = true,
            Err(MonitorEvent::Disconnected(_)) => got_disconnected_exception = true,
            Err(MonitorEvent::Finished(_)) => got_finished_exception = true,
            Err(MonitorEvent::RemoteError(_)) => got_remote_error_exception = true,
            Err(MonitorEvent::ClientError(_)) => got_generic_exception = true,
        }
    }
    monitor2.stop().expect("Failed to stop monitor2");

    assert!(
        got_connected_exception,
        "Expected connection exception to be queued as data with connect_exception(true)"
    );
    assert!(
        !got_disconnected_exception,
        "Did not expect disconnection exception with disconnect_exception(false)"
    );
    assert!(!got_finished_exception, "Did not expect finished exception");
    assert!(
        !got_remote_error_exception,
        "Did not expect a remote error exception"
    );
    assert!(
        !got_generic_exception,
        "Did not expect a generic client error exception"
    );
    // No disconnection is triggered in this test; the only requirement is that data
    // arrived after the connection.
    assert!(
        data_count2 > 0,
        "Expected at least one data update after connecting, but got {}",
        data_count2
    );

    srv.stop_drop().expect("Failed to stop server");
}
/// A monitor built with `connect_exception(false)` and `disconnect_exception(true)`
/// must report `Err(MonitorEvent::Disconnected)` from `pop()` once the server goes away,
/// must not report any other event, and must have delivered data beforehand.
///
/// The server is stopped part-way through the polling loop to provoke the disconnection.
#[test]
fn test_monitor_connection_off_disconnection_on() {
    // Deliberately unique to this test: tests run in parallel by default and this test
    // stops its server mid-run. With a shared PV name, another test's client could be
    // attached to this server when it stops, or this client could attach to another
    // test's server.
    const PV_NAME: &str = "callback:test:disconnect_on";
    // Total number of `pop()` calls made by the polling loop.
    const TOTAL_POPS: usize = 20;
    // Loop index after whose `pop()` the server is shut down.
    const STOP_SERVER_AT: usize = 10;

    // Held in an `Option` so the server can be moved out and consumed by `stop_drop()`
    // from inside the loop.
    let mut srv = Some(Server::start_from_env().expect("Failed to create server"));
    srv.as_ref()
        .unwrap()
        .create_pv_double(PV_NAME, 2.71, NTScalarMetadataBuilder::new())
        .expect("Failed to create PV");
    let mut ctx = Context::from_env().expect("Failed to create context");
    // Give the server time to come up before subscribing.
    thread::sleep(Duration::from_millis(500));

    let mut got_connected_exception = false;
    let mut got_disconnected_exception = false;
    let mut got_finished_exception = false;
    let mut got_remote_error_exception = false;
    let mut got_generic_exception = false;

    let mut monitor3 = ctx
        .monitor_builder(PV_NAME)
        .expect("Failed to create monitor builder")
        .connect_exception(false)
        .disconnect_exception(true)
        .exec()
        .expect("Failed to create monitor3");
    monitor3.start().expect("Failed to start monitor3");
    // Let the subscription connect and queue its first update.
    thread::sleep(Duration::from_millis(500));

    let mut data_count3 = 0;
    for i in 0..TOTAL_POPS {
        match monitor3.pop() {
            Ok(Some(_)) => data_count3 += 1,
            // Unlike the other tests, an empty queue does not end the loop: polling must
            // continue past the server shutdown to observe the disconnection.
            Ok(None) => {}
            Err(MonitorEvent::Connected(e)) => {
                got_connected_exception = true;
                println!("Unexpected connected exception: {}", e);
            }
            Err(MonitorEvent::Disconnected(e)) => {
                got_disconnected_exception = true;
                println!("Disconnection event detected: {}", e);
            }
            Err(MonitorEvent::Finished(e)) => {
                got_finished_exception = true;
                println!("Finished event detected: {}", e);
            }
            Err(MonitorEvent::RemoteError(e)) => {
                got_remote_error_exception = true;
                println!("Remote error event detected: {}", e);
            }
            Err(MonitorEvent::ClientError(e)) => {
                got_generic_exception = true;
                println!("Generic client error: {}", e);
            }
        }
        if i == STOP_SERVER_AT {
            // Shut the server down, then wait so the client can notice before the
            // remaining `pop()` calls run.
            srv.take()
                .unwrap()
                .stop_drop()
                .expect("Failed to stop server to trigger disconnection");
            thread::sleep(Duration::from_millis(500));
        }
    }
    // NOTE(review): this pause precedes only the assertions; presumably it lets
    // background work settle. Confirm whether it is still required.
    thread::sleep(Duration::from_millis(500));

    assert!(
        got_disconnected_exception,
        "Expected disconnection exception to be thrown with disconnect_exception(true)"
    );
    assert!(
        !got_connected_exception,
        "Did not expect a connection exception with connect_exception(false)"
    );
    assert!(!got_finished_exception, "Did not expect finished exception");
    assert!(
        !got_remote_error_exception,
        "Did not expect a remote error exception"
    );
    assert!(
        !got_generic_exception,
        "Did not expect a generic client error exception"
    );
    assert!(
        data_count3 > 0,
        "Expected data before disconnection occurred, but got {}",
        data_count3
    );
}
/// Attaches two monitors with different event callbacks to one PV and checks that the
/// connection-side callback has fired at least once after both monitors were started.
///
/// The disconnection-side callback count is printed but not asserted.
#[test]
fn test_monitor_multiple_callbacks() {
    // Start from a clean slate: both counters are module-level statics.
    for counter in [&CONNECT_COUNTER, &DISCONNECT_COUNTER].iter() {
        counter.store(0, Ordering::SeqCst);
    }

    let pause = |millis: u64| thread::sleep(Duration::from_millis(millis));

    let srv = Server::start_from_env().expect("Failed to create server");
    // The returned value is kept bound for the rest of the test.
    let _pv = srv
        .create_pv_double("callback:test:multi", 4.56, NTScalarMetadataBuilder::new())
        .expect("Failed to create PV");
    pause(500);

    let mut ctx = Context::from_env().expect("Failed to create context");

    // Monitor #1: connection events enabled, wired to `connection_callback`.
    let mut mon_connect = ctx
        .monitor_builder("callback:test:multi")
        .expect("Failed to create monitor builder")
        .connect_exception(true)
        .disconnect_exception(false)
        .event(connection_callback)
        .exec()
        .expect("Failed to create connection monitor");

    // Monitor #2: disconnection events enabled, wired to `disconnection_callback`.
    let mut mon_disconnect = ctx
        .monitor_builder("callback:test:multi")
        .expect("Failed to create monitor builder")
        .connect_exception(false)
        .disconnect_exception(true)
        .event(disconnection_callback)
        .exec()
        .expect("Failed to create disconnection monitor");

    mon_connect
        .start()
        .expect("Failed to start connection monitor");
    mon_disconnect
        .start()
        .expect("Failed to start disconnection monitor");

    // Allow time for the callbacks to run before sampling the counters.
    pause(1000);

    let (seen_connects, seen_disconnects) = (
        CONNECT_COUNTER.load(Ordering::SeqCst),
        DISCONNECT_COUNTER.load(Ordering::SeqCst),
    );
    println!("Connection callbacks: {}", seen_connects);
    println!("Disconnection callbacks: {}", seen_disconnects);
    assert!(seen_connects > 0, "Expected connection callbacks");

    mon_connect
        .stop()
        .expect("Failed to stop connection monitor");
    mon_disconnect
        .stop()
        .expect("Failed to stop disconnection monitor");
    srv.stop_drop().expect("Failed to stop server");
}
/// Checks that the event callback fires once for each value posted to the PV.
///
/// After the monitor has connected, whatever is already queued is drained and the
/// counter is reset. Five values are then posted, with the queue drained after each
/// post, and the number of callbacks must equal the number of posts.
#[test]
fn test_monitor_callback_with_updates() {
    // Ceiling on `pop()` calls within a single drain. A plain `loop` would spin forever
    // if `pop()` kept returning `Err`, hanging the whole test run instead of failing.
    const MAX_DRAIN_POPS: usize = 1_000;
    let posted_values = [1.0, 2.0, 3.0, 4.0, 5.0];

    EVENT_COUNTER.store(0, Ordering::SeqCst);
    let name = "callback:test:updates";
    let srv = Server::start_from_env().expect("Failed to create server");
    srv.create_pv_double(name, 0.0, NTScalarMetadataBuilder::new())
        .expect("Failed to create PV");
    thread::sleep(Duration::from_millis(500));

    let mut ctx = Context::from_env().expect("Failed to create context");
    let mut monitor = ctx
        .monitor_builder(name)
        .expect("Failed to create monitor builder")
        .connect_exception(true)
        .disconnect_exception(false)
        .event(generic_event_callback)
        .exec()
        .expect("Failed to create monitor");
    monitor.start().expect("Failed to start monitor");
    thread::sleep(Duration::from_millis(500));

    // Empties the monitor queue: data and events (`Err`) are discarded until `pop()`
    // reports `Ok(None)`. Panics instead of looping forever if that never happens.
    let mut drain_queue = || {
        for _ in 0..MAX_DRAIN_POPS {
            match monitor.pop() {
                Ok(None) => return,
                Ok(Some(_)) | Err(_) => {}
            }
        }
        panic!(
            "monitor queue did not drain within {} pops",
            MAX_DRAIN_POPS
        );
    };

    // Discard everything produced by connecting, then count only the posts below.
    drain_queue();
    EVENT_COUNTER.store(0, Ordering::SeqCst);

    for &value in posted_values.iter() {
        srv.post_double(name, value).expect("Failed to post value");
        thread::sleep(Duration::from_millis(100));
        // NOTE(review): draining after each post presumably matters because the callback
        // is only raised when the queue goes from empty to non-empty; confirm against
        // the PVXS monitor documentation.
        drain_queue();
    }
    thread::sleep(Duration::from_millis(200));

    let event_count = EVENT_COUNTER.load(Ordering::SeqCst);
    println!(
        "Callbacks after {} updates: {}",
        posted_values.len(),
        event_count
    );
    assert_eq!(
        event_count,
        posted_values.len(),
        "Expected 5 callbacks, one per post (got {})",
        event_count
    );

    monitor.stop().expect("Failed to stop monitor");
    srv.stop_drop().expect("Failed to stop server");
}
/// Starts two monitors on the same PV, the first with connection events masked and the
/// second with them enabled, and checks that the second produces at least as many
/// callbacks as the first.
#[test]
fn test_monitor_multiple_client_monitors() {
    // Counter and callback private to this test. Tests run in parallel by default, so
    // resetting and reading a module-wide counter here would interfere with any other
    // test that uses the same counter, and vice versa.
    static MASK_EVENT_COUNTER: AtomicUsize = AtomicUsize::new(0);
    extern "C" fn count_mask_event() {
        // `fetch_add` returns the previous value; adding one gives this call's count.
        let count = MASK_EVENT_COUNTER
            .fetch_add(1, Ordering::SeqCst)
            .wrapping_add(1);
        println!("Event detected! Count: {}", count);
    }

    let srv = Server::start_from_env().expect("Failed to create server");
    srv.create_pv_double("callback:test:mask", 1.23, NTScalarMetadataBuilder::new())
        .expect("Failed to create PV");
    thread::sleep(Duration::from_millis(500));
    let mut ctx = Context::from_env().expect("Failed to create context");

    // Phase 1: connection events masked.
    MASK_EVENT_COUNTER.store(0, Ordering::SeqCst);
    let mut monitor1 = ctx
        .monitor_builder("callback:test:mask")
        .expect("Failed to create monitor builder")
        .connect_exception(false)
        .disconnect_exception(false)
        .event(count_mask_event)
        .exec()
        .expect("Failed to create monitor1");
    monitor1.start().expect("Failed to start monitor1");
    thread::sleep(Duration::from_millis(500));
    let count1 = MASK_EVENT_COUNTER.load(Ordering::SeqCst);
    println!("Events with connection masked: {}", count1);

    // Phase 2: connection events enabled. `monitor1` is still running and shares the
    // callback, so anything it raises during this phase is counted as well.
    MASK_EVENT_COUNTER.store(0, Ordering::SeqCst);
    let mut monitor2 = ctx
        .monitor_builder("callback:test:mask")
        .expect("Failed to create monitor builder")
        .connect_exception(true)
        .disconnect_exception(false)
        .event(count_mask_event)
        .exec()
        .expect("Failed to create monitor2");
    monitor2.start().expect("Failed to start monitor2");
    thread::sleep(Duration::from_millis(500));
    let count2 = MASK_EVENT_COUNTER.load(Ordering::SeqCst);
    println!("Events with connection enabled: {}", count2);

    assert!(
        count2 >= count1,
        "Expected more events with connection enabled ({}) vs masked ({})",
        count2,
        count1
    );

    monitor1.stop().expect("Failed to stop monitor1");
    monitor2.stop().expect("Failed to stop monitor2");
    srv.stop_drop().expect("Failed to stop server");
}
}