1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
// Copyright 2026 Tine Zata
// SPDX-License-Identifier: MPL-2.0
mod test_pvxs_monitor_builder {
use pvxs_sys::{Context, Monitor, MonitorEvent, NTScalarMetadataBuilder, PvxsError, Server};
use std::thread;
use std::time::Duration;
// Simple callback for basic testing
extern "C" fn simple_test_callback() {
// This is just a placeholder for testing callback registration
}
#[test]
fn test_local_server_failes_to_connect_to_remote_client_monitor() -> Result<(), PvxsError> {
// Suppress pvxs.tcp.setup Server unable to bind port 5075
pvxs_sys::set_logger_level("pvxs.tcp.setup", "CRIT")?;
// Create isolated server for testing
let server = Server::start_isolated()?;
// Create PV with initial value (automatically added to server)
server.create_pv_double(
"TEST:MonitorBuilder:LocalFail",
3.14,
NTScalarMetadataBuilder::new(),
)?;
thread::sleep(Duration::from_millis(100));
let mut ctx = Context::from_env()?;
// Attempt to create monitor using builder from remote client context
let monitor_result: Result<Monitor, PvxsError> = ctx
.monitor_builder("TEST:MonitorBuilder:LocalFail")?
.connect_exception(false) // Suppress connection exceptions
.disconnect_exception(false) // Suppress disconnection exceptions
.exec();
match monitor_result {
Ok(mut monitor) => {
// Start monitoring
assert!(monitor.start().is_ok());
thread::sleep(Duration::from_millis(1000)); // Wait a bit for connection attempt
// Since server is isolated, monitor should not connect
assert!(
!monitor.is_connected(),
"Monitor should not connect to isolated local server"
);
assert!(monitor.stop().is_ok());
}
Err(e) => {
assert!(false, "Monitor creation failed unexpectedly: {:?}", e);
}
}
assert!(server.stop_drop().is_ok());
Ok(())
}
use serial_test::serial;
/// Test basic MonitorBuilder creation and configuration
#[test]
#[serial]
fn test_monitor_builder_creation() -> Result<(), PvxsError> {
// Suppress pvxs.tcp.setup Server unable to bind port 5075
pvxs_sys::set_logger_level("pvxs.tcp.setup", "CRIT")?;
let pv_name = "TEST:MonitorBuilder:Creation";
// Now start a new server from_env to test actual connection
let mut server = Server::start_from_env().expect("Failed to start server from environment");
server.create_pv_double(pv_name, 1.0, NTScalarMetadataBuilder::new())?;
let mut ctx = Context::from_env()?;
// Test MonitorBuilder creation again - this time server is running
let mut _monitor: Result<Monitor, PvxsError> = ctx
.monitor_builder(pv_name)?
.connect_exception(false) // Suppress connection exceptions
.disconnect_exception(false) // Suppress disconnection exceptions
.exec();
match _monitor {
Ok(mut mon) => {
// Start the monitor
mon.start()?;
// Give more time for connection to establish
thread::sleep(Duration::from_millis(2000));
// is_connected() now properly uses Connect object to check actual connection
assert!(
mon.is_connected(),
"Monitor should be connected to from_env server"
);
// stop the server
assert!(server.stop_drop().is_ok());
thread::sleep(Duration::from_millis(1000));
// After stopping server, should detect disconnection
assert_eq!(
mon.is_connected(),
false,
"Monitor should be disconnected after server stop"
);
// start the server again - stop() destroyed the old ServerImpl and all PVs,
// so the new server is a blank slate and the PV must be re-registered.
server =
Server::start_from_env().expect("Failed to restart server from environment");
server
.create_pv_double(pv_name, 1.0, NTScalarMetadataBuilder::new())
.expect("Failed to re-register PV on restarted server");
thread::sleep(Duration::from_millis(1000));
// Give more time for reconnection (might take longer than initial connection)
thread::sleep(Duration::from_millis(5000));
assert!(
mon.is_connected(),
"Monitor should reconnect after server restart"
);
assert!(server.stop_drop().is_ok());
}
Err(e) => {
assert!(false, "Monitor creation failed: {:?}", e);
}
}
Ok(())
}
/// Test Monitor pop() method following PVXS pattern
#[test]
fn test_monitor_pop_functionality() -> Result<(), PvxsError> {
// Suppress pvxs.tcp.setup Server unable to bind port 5075
pvxs_sys::set_logger_level("pvxs.tcp.setup", "CRIT")
.expect("Error in setting logger level");
// Create isolated server for testing
let server = Server::start_from_env()?;
// Create PV with initial value (automatically added to server)
server.create_pv_double(
"TEST:MonitorBuilder:Pop",
10.0,
NTScalarMetadataBuilder::new(),
)?;
thread::sleep(Duration::from_millis(100));
let mut ctx = Context::from_env()?;
// Create monitor using builder
let mut monitor = ctx
.monitor_builder("TEST:MonitorBuilder:Pop")?
.connect_exception(false)
.exec()?;
// Start monitoring
monitor.start().expect("Failed to start monitor");
// Give time for initial connection
thread::sleep(Duration::from_millis(200));
// Test pop() method - should initially be empty or have connection data
match monitor.pop() {
Ok(Some(value)) => {
// Try to get the value field
assert!(
value.get_field_double("value").is_ok(),
"Method - should initially have value field"
);
}
Ok(None) => assert!(true, "Queue initially empty"),
Err(e) => {
assert!(false, "Error popping from monitor queue: {:?}", e);
}
}
// Now test updating the PV via client PUT operation
let new_value = 25.5;
match ctx.put_double("TEST:MonitorBuilder:Pop", new_value, 2.0) {
Ok(_) => {
// Give time for update to propagate
thread::sleep(Duration::from_millis(200));
// Try to pop the update
let mut updates_received = 0;
for _ in 0..5 {
// Try a few times
match monitor.pop() {
Ok(Some(value)) => {
updates_received += 1;
// Try to extract the value
if let Ok(val) = value.get_field_double("value") {
assert_eq!(
val, new_value,
"Popped value should match updated PV value"
);
}
}
Ok(None) => {
thread::sleep(Duration::from_millis(50));
}
Err(_e) => {
thread::sleep(Duration::from_millis(50));
}
}
}
if updates_received > 0 {
} else {
}
}
Err(e) => assert!(false, "Failed to PUT new value: {}", e),
}
monitor.stop()?;
server.stop_drop()?;
Ok(())
}
/// Test real Rust function callback functionality
#[test]
fn test_monitor_builder_with_callback() -> Result<(), PvxsError> {
// Suppress pvxs.tcp.setup Server unable to bind port 5075
pvxs_sys::set_logger_level("pvxs.tcp.setup", "CRIT")?;
// Create isolated server for testing
let server = Server::start_from_env()?;
server.create_pv_double(
"TEST:MonitorBuilder:Callback",
42.0,
NTScalarMetadataBuilder::new(),
)?;
thread::sleep(Duration::from_millis(100));
let mut ctx = Context::from_env()?;
// Create monitor with actual Rust callback function
let mut monitor = ctx
.monitor_builder("TEST:MonitorBuilder:Callback")?
.connect_exception(true) // Throw connection exceptions in queue
.disconnect_exception(true) // Throw disconnection exceptions in queue
.event(simple_test_callback) // Set a simple callback
.exec()?;
// Start monitoring
monitor.start()?;
// Wait for initial connection
thread::sleep(Duration::from_millis(1000));
// Test if we can see any activity in the monitor queue (connection events, etc.)
let mut events_seen = 0;
for _attempt in 1..=3 {
match monitor.pop() {
Ok(Some(value)) => {
events_seen += 1;
let _ = value.get_field_double("value");
}
Ok(None) => {}
Err(_e) => {
events_seen += 1;
}
}
thread::sleep(Duration::from_millis(100));
}
// Update the PV via client PUT
for i in 1..=3 {
let new_value = 100.0 + i as f64;
// Use client PUT to update the PV
match ctx.put_double("TEST:MonitorBuilder:Callback", new_value, 2.0) {
Ok(_) => {
thread::sleep(Duration::from_millis(200));
}
Err(e) => assert!(false, "PUT failed: {}", e),
}
}
// Give extra time for all updates to be processed
thread::sleep(Duration::from_millis(500));
// Verify we can pop() the values from the queue
let mut values_popped = 0;
while let Ok(Some(value)) = monitor.pop() {
values_popped += 1;
let _ = value.get_field_double("value");
}
// Check that we received some events
assert!(
values_popped > 0 || events_seen > 0,
"Expected to receive some events, got values={} events={}",
values_popped,
events_seen
);
monitor.stop()?;
server.stop_drop()?;
Ok(())
}
/// Test MonitorBuilder with string PV
#[test]
fn test_monitor_builder_string_pv() -> Result<(), PvxsError> {
// Suppress pvxs.tcp.setup Server unable to bind port 5075
pvxs_sys::set_logger_level("pvxs.tcp.setup", "CRIT")?;
let server = Server::start_from_env()?;
server.create_pv_string(
"TEST:MonitorBuilder:String",
"Hello MonitorBuilder",
NTScalarMetadataBuilder::new(),
)?;
thread::sleep(Duration::from_millis(100));
let mut ctx = Context::from_env()?;
let mut monitor = ctx
.monitor_builder("TEST:MonitorBuilder:String")?
.connect_exception(false)
.disconnect_exception(false)
.exec()?;
monitor.start()?;
thread::sleep(Duration::from_millis(200));
// Try to get initial value
match monitor.pop() {
Ok(Some(value)) => {
assert!(
value.get_field_string("value").is_ok(),
"Method - should initially have value field"
);
}
Ok(None) => assert!(true, "String PV queue initially empty"),
Err(e) => assert!(false, "String PV event: {}", e),
}
monitor.stop()?;
server.stop_drop()?;
Ok(())
}
/// Test error handling in MonitorBuilder
#[test]
fn test_monitor_builder_error_handling() {
// Suppress pvxs.tcp.setup Server unable to bind port 5075
pvxs_sys::set_logger_level("pvxs.tcp.setup", "CRIT")
.expect("Error in setting logger level");
let mut ctx = Context::from_env().expect("Context creation failed");
// Test with non-existent PV
match ctx.monitor_builder("NONEXISTENT:PV:NAME") {
Ok(builder) => {
// Builder creation should succeed, but exec might fail or timeout
// Try to execute - this might succeed but the monitor won't connect
match builder.exec() {
Ok(_monitor) => {}
Err(e) => assert!(
true,
"Expected error creating monitor for non-existent PV: {}",
e
),
}
}
Err(e) => assert!(
true,
"Expected error creating builder for non-existent PV: {}",
e
),
}
}
/// Test monitoring with multiple rapid value changes
#[test]
fn test_monitor_builder_rapid_updates() -> Result<(), PvxsError> {
// Suppress pvxs.tcp.setup Server unable to bind port 5075
pvxs_sys::set_logger_level("pvxs.tcp.setup", "CRIT")
.expect("Error in setting logger level");
let server = Server::start_from_env()?;
server.create_pv_double(
"TEST:MonitorBuilder:Rapid",
0.0,
NTScalarMetadataBuilder::new(),
)?;
thread::sleep(Duration::from_millis(100));
let mut ctx = Context::from_env()?;
let mut monitor = ctx
.monitor_builder("TEST:MonitorBuilder:Rapid")?
.connect_exception(true) // Throw connection exceptions
.exec()?;
monitor.start()?;
thread::sleep(Duration::from_millis(200));
// Clear initial events
while monitor.pop().unwrap_or(None).is_some() {}
// Post rapid updates using client PUT operations
for i in 1..=5 {
let _ = ctx.put_double("TEST:MonitorBuilder:Rapid", i as f64, 1.0);
thread::sleep(Duration::from_millis(20)); // Small delay between updates
}
// Give time for all updates to propagate
thread::sleep(Duration::from_millis(200));
// Collect all updates
let mut updates = Vec::new();
while let Ok(Some(value)) = monitor.pop() {
if let Ok(val) = value.get_field_double("value") {
updates.push(val);
}
}
if !updates.is_empty() {}
monitor.stop()?;
server.stop_drop()?;
Ok(())
}
/// Integration test comparing MonitorBuilder vs regular Monitor
#[test]
fn test_monitor_builder_vs_regular_monitor() -> Result<(), PvxsError> {
// Suppress pvxs.tcp.setup Server unable to bind port 5075
pvxs_sys::set_logger_level("pvxs.tcp.setup", "CRIT")?;
let server = Server::start_from_env()?;
server.create_pv_double(
"TEST:MonitorBuilder:Compare",
100.0,
NTScalarMetadataBuilder::new(),
)?;
thread::sleep(Duration::from_millis(100));
let mut ctx = Context::from_env()?;
// Create monitor using traditional method
let mut regular_monitor = ctx.monitor("TEST:MonitorBuilder:Compare")?;
regular_monitor.start()?;
// Create monitor using builder
let mut builder_monitor = ctx
.monitor_builder("TEST:MonitorBuilder:Compare")?
.connect_exception(true) // Throw connection exceptions
.exec()?;
builder_monitor.start()?;
thread::sleep(Duration::from_millis(200));
// Test that both monitors work
// Both should be monitoring the same PV
assert_eq!(regular_monitor.name(), builder_monitor.name());
// Both should detect updates
let _ = ctx.put_double("TEST:MonitorBuilder:Compare", 999.9, 1.0);
thread::sleep(Duration::from_millis(100));
let _regular_has_update = regular_monitor.has_update();
let _builder_has_update = builder_monitor.has_update();
regular_monitor.stop()?;
builder_monitor.stop()?;
server.stop_drop()?;
Ok(())
}
/// Test callbacks with continuously incrementing server-side value
#[test]
fn test_monitor_builder_with_server_side_counter() -> Result<(), PvxsError> {
use pvxs_sys::MonitorEvent;
// Suppress pvxs.tcp.setup Server unable to bind port 5075
pvxs_sys::set_logger_level("pvxs.tcp.setup", "CRIT")?;
// Create server using from_env instead of create_isolated
let server = Server::start_from_env()?;
server.create_pv_double(
"TEST:MonitorBuilder:Counter",
0.0,
NTScalarMetadataBuilder::new(),
)?;
thread::sleep(Duration::from_millis(200));
let mut ctx = Context::from_env()?;
// Create monitor with callback
let mut monitor = ctx
.monitor_builder("TEST:MonitorBuilder:Counter")?
.connect_exception(true)
.disconnect_exception(true)
.event(simple_test_callback)
.exec()?;
// Start monitoring
monitor.start()?;
// Wait for initial connection
thread::sleep(Duration::from_millis(500));
// Use context to PUT values
let mut ctx_clone = Context::from_env()?;
// Spawn background thread to continuously update the value
let counter_handle = thread::spawn(move || {
for i in 1..=10 {
let _ = ctx_clone.put_double("TEST:MonitorBuilder:Counter", i as f64, 1.0);
thread::sleep(Duration::from_millis(200));
}
});
// Wait for background thread to finish
counter_handle.join().unwrap();
// Give time for remaining updates
thread::sleep(Duration::from_millis(500));
// Check queue state - drain all values
let mut values_received = 0;
loop {
match monitor.pop() {
Ok(Some(value)) => {
values_received += 1;
let _ = value.get_field_double("value");
}
Ok(None) => {
break;
}
Err(MonitorEvent::Connected(_)) => {
// Ignore connection events
}
Err(_e) => {
assert!(false, "Error popping from monitor: {:?}", _e);
}
}
}
assert!(
values_received > 0,
"Expected to receive some values from server updates, got {}",
values_received
);
monitor.stop()?;
server.stop_drop()?;
Ok(())
}
/// Test that demonstrates the correct PVXS event callback pattern:
/// 1. Event fires when queue goes empty -> not-empty
/// 2. Drain queue completely (sets needNotify back to true)
/// 3. Post new value (queue empty -> not-empty again)
/// 4. Event fires again
#[test]
fn test_monitor_builder_proper_event_pattern() -> Result<(), PvxsError> {
// Suppress pvxs.tcp.setup Server unable to bind port 5075
pvxs_sys::set_logger_level("pvxs.tcp.setup", "CRIT")?;
// Create server using from_env
let server = Server::start_from_env()?;
server.create_pv_double(
"TEST:MonitorBuilder:EventPattern",
0.0,
NTScalarMetadataBuilder::new(),
)?;
thread::sleep(Duration::from_millis(200));
let mut ctx = Context::from_env()?;
// Create monitor with callback
let mut monitor = ctx
.monitor_builder("TEST:MonitorBuilder:EventPattern")?
.event(simple_test_callback)
.exec()?;
// Start monitoring
monitor.start()?;
// Wait for initial connection and drain any connection events
thread::sleep(Duration::from_millis(500));
let mut _drained = 0;
while let Ok(Some(_)) = monitor.pop() {
_drained += 1;
}
// Queue is now EMPTY
// Post a single value
ctx.put_double("TEST:MonitorBuilder:EventPattern", 100.0, 1.0)?;
// Wait for update
thread::sleep(Duration::from_millis(500));
// Drain the queue completely
let mut values_popped = 0;
while let Ok(Some(value)) = monitor.pop() {
values_popped += 1;
assert!(
value.get_field_double("value").is_ok(),
"Should have value field when draining"
);
}
// Queue is now EMPTY again
// Post another value
ctx.put_double("TEST:MonitorBuilder:EventPattern", 200.0, 1.0)?;
// Wait for update
thread::sleep(Duration::from_millis(500));
// Drain again
let mut values_popped_2 = 0;
while let Ok(Some(value)) = monitor.pop() {
values_popped_2 += 1;
assert!(
value.get_field_double("value").is_ok(),
"Should have value field when draining"
);
}
// We expect to have received both values
assert!(
values_popped > 0 && values_popped_2 > 0,
"Expected to receive both posted values, got first={} second={}",
values_popped,
values_popped_2
);
monitor.stop().expect("Monitor stop failed");
server.stop_drop()?;
Ok(())
}
#[test]
fn test_monitor_error_after_stop() -> Result<(), PvxsError> {
// This test demonstrates ClientError when trying to pop after stopping the monitor
let srv = Server::start_from_env()?;
srv.create_pv_double("test:stop:error", 3.14, NTScalarMetadataBuilder::new())?;
thread::sleep(Duration::from_millis(500));
let mut ctx = Context::from_env()?;
let mut monitor = ctx.monitor_builder("test:stop:error")?.exec()?;
monitor.start()?;
thread::sleep(Duration::from_millis(500));
// Stop the monitor
monitor.stop()?;
// Try to pop after stopping - should get ClientError
match monitor.pop() {
Ok(None) => {
assert!(false, "Queue empty (monitor stopped)");
}
Ok(Some(_)) => {
assert!(false, "Unexpectedly got data after stop");
}
Err(MonitorEvent::ClientError(msg)) => {
assert!(
true,
"Expected ClientError after stopping monitor but got: {}",
msg
);
}
Err(e) => {
assert!(false, "Got unexpected error type: {:?}", e);
}
}
srv.stop_drop()?;
Ok(())
}
}