1use crossbeam_channel as channel;
4use cxx::UniquePtr;
5use std::collections::HashMap;
6use std::thread;
7
8use crate::{
9 bridge, compute_alarm_for_scalar, AlarmConfig, AlarmMetadata, AlarmSeverity, AlarmStatus,
10 ControlMetadata, DisplayMetadata, PvxsError, Result, Value,
11};
12
13pub(crate) struct ServerImpl {
19 inner: UniquePtr<bridge::ServerWrapper>,
20}
21
22impl ServerImpl {
23 pub fn from_env() -> Result<Self> {
31 let inner = bridge::server_create_from_env()?;
32 Ok(Self { inner })
33 }
34
35 pub fn create_isolated() -> Result<Self> {
51 let inner = bridge::server_create_isolated()?;
52 Ok(Self { inner })
53 }
54
55 pub fn start(&mut self) -> Result<()> {
63 bridge::server_start(self.inner.pin_mut())?;
64 Ok(())
65 }
66
67 pub fn stop(&mut self) -> Result<()> {
75 bridge::server_stop(self.inner.pin_mut())?;
76 Ok(())
77 }
78
79 pub(crate) fn add_pv(&mut self, name: &str, pv: &mut SharedPV) -> Result<()> {
89 bridge::server_add_pv(self.inner.pin_mut(), name.to_string(), pv.inner.pin_mut())?;
90 Ok(())
91 }
92
93 pub fn remove_pv(&mut self, name: &str) -> Result<()> {
101 bridge::server_remove_pv(self.inner.pin_mut(), name.to_string())?;
102 Ok(())
103 }
104
105 pub fn tcp_port(&self) -> u16 {
125 bridge::server_get_tcp_port(&self.inner)
126 }
127
128 pub fn udp_port(&self) -> u16 {
132 bridge::server_get_udp_port(&self.inner)
133 }
134
135 pub fn create_pv_double(
155 &mut self,
156 name: &str,
157 initial_value: f64,
158 metadata: NTScalarMetadataBuilder,
159 ) -> Result<SharedPV> {
160 let mut pv = SharedPV::create_mailbox()?;
161 pv.open_double(initial_value, metadata)?;
162 self.add_pv(name, &mut pv)?;
163 Ok(pv)
164 }
165
166 pub fn create_pv_double_array(
177 &mut self,
178 name: &str,
179 initial_value: Vec<f64>,
180 metadata: NTScalarMetadataBuilder,
181 ) -> Result<SharedPV> {
182 if initial_value.is_empty() {
183 return Err(PvxsError::new("Initial double array cannot be empty"));
184 }
185 let mut pv = SharedPV::create_mailbox()?;
186 pv.open_double_array(initial_value, metadata)?;
187 self.add_pv(name, &mut pv)?;
188 Ok(pv)
189 }
190
191 pub fn create_pv_int32(
201 &mut self,
202 name: &str,
203 initial_value: i32,
204 metadata: NTScalarMetadataBuilder,
205 ) -> Result<SharedPV> {
206 let mut pv = SharedPV::create_mailbox()?;
207 pv.open_int32(initial_value, metadata)?;
208 self.add_pv(name, &mut pv)?;
209 Ok(pv)
210 }
211
212 pub fn create_pv_int32_array(
223 &mut self,
224 name: &str,
225 initial_value: Vec<i32>,
226 metadata: NTScalarMetadataBuilder,
227 ) -> Result<SharedPV> {
228 if initial_value.is_empty() {
229 return Err(PvxsError::new("Initial int32 array cannot be empty"));
230 }
231 let mut pv = SharedPV::create_mailbox()?;
232 pv.open_int32_array(initial_value, metadata)?;
233 self.add_pv(name, &mut pv)?;
234 Ok(pv)
235 }
236
237 pub fn create_pv_string(
247 &mut self,
248 name: &str,
249 initial_value: &str,
250 metadata: NTScalarMetadataBuilder,
251 ) -> Result<SharedPV> {
252 let mut pv = SharedPV::create_mailbox()?;
253 pv.open_string(initial_value, metadata)?;
254 self.add_pv(name, &mut pv)?;
255 Ok(pv)
256 }
257
258 pub fn create_pv_string_array(
269 &mut self,
270 name: &str,
271 initial_value: Vec<String>,
272 metadata: NTScalarMetadataBuilder,
273 ) -> Result<SharedPV> {
274 if initial_value.is_empty() {
275 return Err(PvxsError::new("Initial string array cannot be empty"));
276 }
277 let mut pv = SharedPV::create_mailbox()?;
278 pv.open_string_array(initial_value, metadata)?;
279 self.add_pv(name, &mut pv)?;
280 Ok(pv)
281 }
282
283 pub fn create_pv_enum(
294 &mut self,
295 name: &str,
296 choices: Vec<&str>,
297 selected_index: i16,
298 metadata: NTEnumMetadataBuilder,
299 ) -> Result<SharedPV> {
300 let mut pv = SharedPV::create_mailbox()?;
301 pv.open_enum(choices, selected_index, metadata)?;
302 self.add_pv(name, &mut pv)?;
303 Ok(pv)
304 }
305
306 }
325
326#[derive(Debug, Clone)]
328pub struct FetchedDouble {
329 pub value: f64,
330 pub alarm_severity: AlarmSeverity,
331 pub alarm_status: AlarmStatus,
332 pub alarm_message: String,
333 pub display_metadata: Option<DisplayMetadata>,
334 pub control_metadata: Option<ControlMetadata>,
335 pub alarm_metadata: Option<AlarmMetadata>,
336}
337
338#[derive(Debug, Clone)]
340pub struct FetchedInt32 {
341 pub value: i32,
342 pub alarm_severity: AlarmSeverity,
343 pub alarm_status: AlarmStatus,
344 pub alarm_message: String,
345 pub display_metadata: Option<DisplayMetadata>,
346 pub control_metadata: Option<ControlMetadata>,
347 pub alarm_metadata: Option<AlarmMetadata>,
348}
349
350#[derive(Debug, Clone)]
352pub struct FetchedString {
353 pub value: String,
354 pub alarm_severity: AlarmSeverity,
355 pub alarm_status: AlarmStatus,
356 pub alarm_message: String,
357}
358
359#[derive(Debug, Clone)]
361pub struct FetchedDoubleArray {
362 pub value: Vec<f64>,
363 pub alarm_severity: AlarmSeverity,
364 pub alarm_status: AlarmStatus,
365 pub alarm_message: String,
366 pub display_metadata: Option<DisplayMetadata>,
367 pub control_metadata: Option<ControlMetadata>,
368 pub alarm_metadata: Option<AlarmMetadata>,
369}
370
371#[derive(Debug, Clone)]
373pub struct FetchedInt32Array {
374 pub value: Vec<i32>,
375 pub alarm_severity: AlarmSeverity,
376 pub alarm_status: AlarmStatus,
377 pub alarm_message: String,
378 pub display_metadata: Option<DisplayMetadata>,
379 pub control_metadata: Option<ControlMetadata>,
380 pub alarm_metadata: Option<AlarmMetadata>,
381}
382
383#[derive(Debug, Clone)]
385pub struct FetchedStringArray {
386 pub value: Vec<String>,
387 pub alarm_severity: AlarmSeverity,
388 pub alarm_status: AlarmStatus,
389 pub alarm_message: String,
390}
391
392#[derive(Debug, Clone)]
394pub struct FetchedEnum {
395 pub value: i16,
396 pub value_choices: Vec<String>,
397 pub alarm_severity: AlarmSeverity,
398 pub alarm_status: AlarmStatus,
399 pub alarm_message: String,
400}
401
402enum ManagerCommand {
403 CreateDouble {
404 name: String,
405 initial: f64,
406 metadata: NTScalarMetadataBuilder,
407 reply: channel::Sender<Result<()>>,
408 },
409 CreateDoubleArray {
410 name: String,
411 initial: Vec<f64>,
412 metadata: NTScalarMetadataBuilder,
413 reply: channel::Sender<Result<()>>,
414 },
415 CreateInt32 {
416 name: String,
417 initial: i32,
418 metadata: NTScalarMetadataBuilder,
419 reply: channel::Sender<Result<()>>,
420 },
421 CreateInt32Array {
422 name: String,
423 initial: Vec<i32>,
424 metadata: NTScalarMetadataBuilder,
425 reply: channel::Sender<Result<()>>,
426 },
427 CreateString {
428 name: String,
429 initial: String,
430 metadata: NTScalarMetadataBuilder,
431 reply: channel::Sender<Result<()>>,
432 },
433 CreateStringArray {
434 name: String,
435 initial: Vec<String>,
436 metadata: NTScalarMetadataBuilder,
437 reply: channel::Sender<Result<()>>,
438 },
439 CreateEnum {
440 name: String,
441 choices: Vec<String>,
442 selected_index: i16,
443 metadata: NTEnumMetadataBuilder,
444 reply: channel::Sender<Result<()>>,
445 },
446 PostDouble {
447 name: String,
448 value: f64,
449 reply: channel::Sender<Result<()>>,
450 },
451 PostDoubleArray {
452 name: String,
453 value: Vec<f64>,
454 reply: channel::Sender<Result<()>>,
455 },
456 PostInt32 {
457 name: String,
458 value: i32,
459 reply: channel::Sender<Result<()>>,
460 },
461 PostInt32Array {
462 name: String,
463 value: Vec<i32>,
464 reply: channel::Sender<Result<()>>,
465 },
466 PostString {
467 name: String,
468 value: String,
469 reply: channel::Sender<Result<()>>,
470 },
471 PostStringArray {
472 name: String,
473 value: Vec<String>,
474 reply: channel::Sender<Result<()>>,
475 },
476 PostEnum {
477 name: String,
478 value: i16,
479 reply: channel::Sender<Result<()>>,
480 },
481 Remove {
482 name: String,
483 reply: channel::Sender<Result<()>>,
484 },
485 FetchDouble {
486 name: String,
487 reply: channel::Sender<Result<FetchedDouble>>,
488 },
489 FetchInt32 {
490 name: String,
491 reply: channel::Sender<Result<FetchedInt32>>,
492 },
493 FetchString {
494 name: String,
495 reply: channel::Sender<Result<FetchedString>>,
496 },
497 FetchDoubleArray {
498 name: String,
499 reply: channel::Sender<Result<FetchedDoubleArray>>,
500 },
501 FetchInt32Array {
502 name: String,
503 reply: channel::Sender<Result<FetchedInt32Array>>,
504 },
505 FetchStringArray {
506 name: String,
507 reply: channel::Sender<Result<FetchedStringArray>>,
508 },
509 FetchEnum {
510 name: String,
511 reply: channel::Sender<Result<FetchedEnum>>,
512 },
513 Stop {
514 reply: channel::Sender<Result<()>>,
515 },
516}
517
518enum ManagedPv {
519 Double {
520 pv: SharedPV,
521 alarm: AlarmConfig,
522 last: f64,
523 },
524 DoubleArray(SharedPV),
525 Int32 {
526 pv: SharedPV,
527 alarm: AlarmConfig,
528 last: i32,
529 },
530 Int32Array(SharedPV),
531 String(SharedPV),
532 StringArray(SharedPV),
533 PvEnum(SharedPV),
534}
535
536#[derive(Clone)]
541pub struct ServerHandle {
542 tx: channel::Sender<ManagerCommand>,
543 tcp_port: u16,
544 udp_port: u16,
545}
546
547impl ServerHandle {
548 pub fn tcp_port(&self) -> u16 {
549 self.tcp_port
550 }
551
552 pub fn udp_port(&self) -> u16 {
553 self.udp_port
554 }
555
556 pub fn create_pv_double(
557 &self,
558 name: &str,
559 initial: f64,
560 metadata: NTScalarMetadataBuilder,
561 ) -> Result<()> {
562 let (reply_tx, reply_rx) = channel::bounded(1);
563 self.tx
564 .send(ManagerCommand::CreateDouble {
565 name: name.to_string(),
566 initial,
567 metadata,
568 reply: reply_tx,
569 })
570 .map_err(|_| PvxsError::new("Server worker stopped"))?;
571 reply_rx
572 .recv()
573 .map_err(|_| PvxsError::new("Server worker stopped"))?
574 }
575
576 pub fn create_pv_double_array(
577 &self,
578 name: &str,
579 initial: Vec<f64>,
580 metadata: NTScalarMetadataBuilder,
581 ) -> Result<()> {
582 let (reply_tx, reply_rx) = channel::bounded(1);
583 self.tx
584 .send(ManagerCommand::CreateDoubleArray {
585 name: name.to_string(),
586 initial,
587 metadata,
588 reply: reply_tx,
589 })
590 .map_err(|_| PvxsError::new("Server worker stopped"))?;
591 reply_rx
592 .recv()
593 .map_err(|_| PvxsError::new("Server worker stopped"))?
594 }
595
596 pub fn create_pv_int32(
597 &self,
598 name: &str,
599 initial: i32,
600 metadata: NTScalarMetadataBuilder,
601 ) -> Result<()> {
602 let (reply_tx, reply_rx) = channel::bounded(1);
603 self.tx
604 .send(ManagerCommand::CreateInt32 {
605 name: name.to_string(),
606 initial,
607 metadata,
608 reply: reply_tx,
609 })
610 .map_err(|_| PvxsError::new("Server worker stopped"))?;
611 reply_rx
612 .recv()
613 .map_err(|_| PvxsError::new("Server worker stopped"))?
614 }
615
616 pub fn create_pv_int32_array(
617 &self,
618 name: &str,
619 initial: Vec<i32>,
620 metadata: NTScalarMetadataBuilder,
621 ) -> Result<()> {
622 let (reply_tx, reply_rx) = channel::bounded(1);
623 self.tx
624 .send(ManagerCommand::CreateInt32Array {
625 name: name.to_string(),
626 initial,
627 metadata,
628 reply: reply_tx,
629 })
630 .map_err(|_| PvxsError::new("Server worker stopped"))?;
631 reply_rx
632 .recv()
633 .map_err(|_| PvxsError::new("Server worker stopped"))?
634 }
635
636 pub fn create_pv_string(
637 &self,
638 name: &str,
639 initial: &str,
640 metadata: NTScalarMetadataBuilder,
641 ) -> Result<()> {
642 let (reply_tx, reply_rx) = channel::bounded(1);
643 self.tx
644 .send(ManagerCommand::CreateString {
645 name: name.to_string(),
646 initial: initial.to_string(),
647 metadata,
648 reply: reply_tx,
649 })
650 .map_err(|_| PvxsError::new("Server worker stopped"))?;
651 reply_rx
652 .recv()
653 .map_err(|_| PvxsError::new("Server worker stopped"))?
654 }
655
656 pub fn create_pv_string_array(
657 &self,
658 name: &str,
659 initial: Vec<String>,
660 metadata: NTScalarMetadataBuilder,
661 ) -> Result<()> {
662 let (reply_tx, reply_rx) = channel::bounded(1);
663 self.tx
664 .send(ManagerCommand::CreateStringArray {
665 name: name.to_string(),
666 initial,
667 metadata,
668 reply: reply_tx,
669 })
670 .map_err(|_| PvxsError::new("Server worker stopped"))?;
671 reply_rx
672 .recv()
673 .map_err(|_| PvxsError::new("Server worker stopped"))?
674 }
675
676 pub fn create_pv_enum(
677 &self,
678 name: &str,
679 choices: Vec<&str>,
680 selected_index: i16,
681 metadata: NTEnumMetadataBuilder,
682 ) -> Result<()> {
683 let (reply_tx, reply_rx) = channel::bounded(1);
684 self.tx
685 .send(ManagerCommand::CreateEnum {
686 name: name.to_string(),
687 choices: choices.iter().map(|s| s.to_string()).collect(),
688 selected_index,
689 metadata,
690 reply: reply_tx,
691 })
692 .map_err(|_| PvxsError::new("Server worker stopped"))?;
693 reply_rx
694 .recv()
695 .map_err(|_| PvxsError::new("Server worker stopped"))?
696 }
697
698 pub fn post_double(&self, name: &str, value: f64) -> Result<()> {
699 let (reply_tx, reply_rx) = channel::bounded(1);
700 self.tx
701 .send(ManagerCommand::PostDouble {
702 name: name.to_string(),
703 value,
704 reply: reply_tx,
705 })
706 .map_err(|_| PvxsError::new("Server worker stopped"))?;
707 reply_rx
708 .recv()
709 .map_err(|_| PvxsError::new("Server worker stopped"))?
710 }
711
712 pub fn post_double_array(&self, name: &str, value: Vec<f64>) -> Result<()> {
713 let (reply_tx, reply_rx) = channel::bounded(1);
714 self.tx
715 .send(ManagerCommand::PostDoubleArray {
716 name: name.to_string(),
717 value,
718 reply: reply_tx,
719 })
720 .map_err(|_| PvxsError::new("Server worker stopped"))?;
721 reply_rx
722 .recv()
723 .map_err(|_| PvxsError::new("Server worker stopped"))?
724 }
725
726 pub fn post_int32(&self, name: &str, value: i32) -> Result<()> {
727 let (reply_tx, reply_rx) = channel::bounded(1);
728 self.tx
729 .send(ManagerCommand::PostInt32 {
730 name: name.to_string(),
731 value,
732 reply: reply_tx,
733 })
734 .map_err(|_| PvxsError::new("Server worker stopped"))?;
735 reply_rx
736 .recv()
737 .map_err(|_| PvxsError::new("Server worker stopped"))?
738 }
739
740 pub fn post_int32_array(&self, name: &str, value: Vec<i32>) -> Result<()> {
741 let (reply_tx, reply_rx) = channel::bounded(1);
742 self.tx
743 .send(ManagerCommand::PostInt32Array {
744 name: name.to_string(),
745 value,
746 reply: reply_tx,
747 })
748 .map_err(|_| PvxsError::new("Server worker stopped"))?;
749 reply_rx
750 .recv()
751 .map_err(|_| PvxsError::new("Server worker stopped"))?
752 }
753
754 pub fn post_string(&self, name: &str, value: &str) -> Result<()> {
755 let (reply_tx, reply_rx) = channel::bounded(1);
756 self.tx
757 .send(ManagerCommand::PostString {
758 name: name.to_string(),
759 value: value.to_string(),
760 reply: reply_tx,
761 })
762 .map_err(|_| PvxsError::new("Server worker stopped"))?;
763 reply_rx
764 .recv()
765 .map_err(|_| PvxsError::new("Server worker stopped"))?
766 }
767
768 pub fn post_string_array(&self, name: &str, value: Vec<String>) -> Result<()> {
769 let (reply_tx, reply_rx) = channel::bounded(1);
770 self.tx
771 .send(ManagerCommand::PostStringArray {
772 name: name.to_string(),
773 value,
774 reply: reply_tx,
775 })
776 .map_err(|_| PvxsError::new("Server worker stopped"))?;
777 reply_rx
778 .recv()
779 .map_err(|_| PvxsError::new("Server worker stopped"))?
780 }
781
782 pub fn post_enum(&self, name: &str, value: i16) -> Result<()> {
783 let (reply_tx, reply_rx) = channel::bounded(1);
784 self.tx
785 .send(ManagerCommand::PostEnum {
786 name: name.to_string(),
787 value,
788 reply: reply_tx,
789 })
790 .map_err(|_| PvxsError::new("Server worker stopped"))?;
791 reply_rx
792 .recv()
793 .map_err(|_| PvxsError::new("Server worker stopped"))?
794 }
795
796 pub fn remove_pv(&self, name: &str) -> Result<()> {
797 let (reply_tx, reply_rx) = channel::bounded(1);
798 self.tx
799 .send(ManagerCommand::Remove {
800 name: name.to_string(),
801 reply: reply_tx,
802 })
803 .map_err(|_| PvxsError::new("Server worker stopped"))?;
804 reply_rx
805 .recv()
806 .map_err(|_| PvxsError::new("Server worker stopped"))?
807 }
808
809 pub fn fetch_double(&self, name: &str) -> Result<FetchedDouble> {
810 let (reply_tx, reply_rx) = channel::bounded(1);
811 self.tx
812 .send(ManagerCommand::FetchDouble {
813 name: name.to_string(),
814 reply: reply_tx,
815 })
816 .map_err(|_| PvxsError::new("Server worker stopped"))?;
817 reply_rx
818 .recv()
819 .map_err(|_| PvxsError::new("Server worker stopped"))?
820 }
821
822 pub fn fetch_int32(&self, name: &str) -> Result<FetchedInt32> {
823 let (reply_tx, reply_rx) = channel::bounded(1);
824 self.tx
825 .send(ManagerCommand::FetchInt32 {
826 name: name.to_string(),
827 reply: reply_tx,
828 })
829 .map_err(|_| PvxsError::new("Server worker stopped"))?;
830 reply_rx
831 .recv()
832 .map_err(|_| PvxsError::new("Server worker stopped"))?
833 }
834
835 pub fn fetch_string(&self, name: &str) -> Result<FetchedString> {
836 let (reply_tx, reply_rx) = channel::bounded(1);
837 self.tx
838 .send(ManagerCommand::FetchString {
839 name: name.to_string(),
840 reply: reply_tx,
841 })
842 .map_err(|_| PvxsError::new("Server worker stopped"))?;
843 reply_rx
844 .recv()
845 .map_err(|_| PvxsError::new("Server worker stopped"))?
846 }
847
848 pub fn fetch_double_array(&self, name: &str) -> Result<FetchedDoubleArray> {
849 let (reply_tx, reply_rx) = channel::bounded(1);
850 self.tx
851 .send(ManagerCommand::FetchDoubleArray {
852 name: name.to_string(),
853 reply: reply_tx,
854 })
855 .map_err(|_| PvxsError::new("Server worker stopped"))?;
856 reply_rx
857 .recv()
858 .map_err(|_| PvxsError::new("Server worker stopped"))?
859 }
860
861 pub fn fetch_int32_array(&self, name: &str) -> Result<FetchedInt32Array> {
862 let (reply_tx, reply_rx) = channel::bounded(1);
863 self.tx
864 .send(ManagerCommand::FetchInt32Array {
865 name: name.to_string(),
866 reply: reply_tx,
867 })
868 .map_err(|_| PvxsError::new("Server worker stopped"))?;
869 reply_rx
870 .recv()
871 .map_err(|_| PvxsError::new("Server worker stopped"))?
872 }
873
874 pub fn fetch_string_array(&self, name: &str) -> Result<FetchedStringArray> {
875 let (reply_tx, reply_rx) = channel::bounded(1);
876 self.tx
877 .send(ManagerCommand::FetchStringArray {
878 name: name.to_string(),
879 reply: reply_tx,
880 })
881 .map_err(|_| PvxsError::new("Server worker stopped"))?;
882 reply_rx
883 .recv()
884 .map_err(|_| PvxsError::new("Server worker stopped"))?
885 }
886
887 pub fn fetch_enum(&self, name: &str) -> Result<FetchedEnum> {
888 let (reply_tx, reply_rx) = channel::bounded(1);
889 self.tx
890 .send(ManagerCommand::FetchEnum {
891 name: name.to_string(),
892 reply: reply_tx,
893 })
894 .map_err(|_| PvxsError::new("Server worker stopped"))?;
895 reply_rx
896 .recv()
897 .map_err(|_| PvxsError::new("Server worker stopped"))?
898 }
899}
900
901pub struct Server {
936 handle: ServerHandle,
937 join: Option<thread::JoinHandle<()>>,
938}
939
940impl Server {
941 pub fn start_from_env() -> Result<Self> {
950 Self::start_inner(false)
951 }
952
953 pub fn start_isolated() -> Result<Self> {
962 Self::start_inner(true)
963 }
964
965 pub fn handle(&self) -> ServerHandle {
969 self.handle.clone()
970 }
971
972 pub fn tcp_port(&self) -> u16 {
973 self.handle.tcp_port()
974 }
975
976 pub fn udp_port(&self) -> u16 {
977 self.handle.udp_port()
978 }
979
980 pub fn create_pv_double(
981 &self,
982 name: &str,
983 initial: f64,
984 metadata: NTScalarMetadataBuilder,
985 ) -> Result<()> {
986 self.handle.create_pv_double(name, initial, metadata)
987 }
988
989 pub fn create_pv_double_array(
990 &self,
991 name: &str,
992 initial: Vec<f64>,
993 metadata: NTScalarMetadataBuilder,
994 ) -> Result<()> {
995 self.handle.create_pv_double_array(name, initial, metadata)
996 }
997
998 pub fn create_pv_int32(
999 &self,
1000 name: &str,
1001 initial: i32,
1002 metadata: NTScalarMetadataBuilder,
1003 ) -> Result<()> {
1004 self.handle.create_pv_int32(name, initial, metadata)
1005 }
1006
1007 pub fn create_pv_int32_array(
1008 &self,
1009 name: &str,
1010 initial: Vec<i32>,
1011 metadata: NTScalarMetadataBuilder,
1012 ) -> Result<()> {
1013 self.handle.create_pv_int32_array(name, initial, metadata)
1014 }
1015
1016 pub fn create_pv_string(
1017 &self,
1018 name: &str,
1019 initial: &str,
1020 metadata: NTScalarMetadataBuilder,
1021 ) -> Result<()> {
1022 self.handle.create_pv_string(name, initial, metadata)
1023 }
1024
1025 pub fn create_pv_string_array(
1026 &self,
1027 name: &str,
1028 initial: Vec<String>,
1029 metadata: NTScalarMetadataBuilder,
1030 ) -> Result<()> {
1031 self.handle.create_pv_string_array(name, initial, metadata)
1032 }
1033
1034 pub fn create_pv_enum(
1035 &self,
1036 name: &str,
1037 choices: Vec<&str>,
1038 selected_index: i16,
1039 metadata: NTEnumMetadataBuilder,
1040 ) -> Result<()> {
1041 self.handle
1042 .create_pv_enum(name, choices, selected_index, metadata)
1043 }
1044
1045 pub fn post_double(&self, name: &str, value: f64) -> Result<()> {
1046 self.handle.post_double(name, value)
1047 }
1048
1049 pub fn post_double_array(&self, name: &str, value: Vec<f64>) -> Result<()> {
1050 self.handle.post_double_array(name, value)
1051 }
1052
1053 pub fn post_int32(&self, name: &str, value: i32) -> Result<()> {
1054 self.handle.post_int32(name, value)
1055 }
1056
1057 pub fn post_int32_array(&self, name: &str, value: Vec<i32>) -> Result<()> {
1058 self.handle.post_int32_array(name, value)
1059 }
1060
1061 pub fn post_string(&self, name: &str, value: &str) -> Result<()> {
1062 self.handle.post_string(name, value)
1063 }
1064
1065 pub fn post_string_array(&self, name: &str, value: Vec<String>) -> Result<()> {
1066 self.handle.post_string_array(name, value)
1067 }
1068
1069 pub fn post_enum(&self, name: &str, value: i16) -> Result<()> {
1070 self.handle.post_enum(name, value)
1071 }
1072
1073 pub fn remove_pv(&self, name: &str) -> Result<()> {
1074 self.handle.remove_pv(name)
1075 }
1076
1077 pub fn fetch_double(&self, name: &str) -> Result<FetchedDouble> {
1078 self.handle.fetch_double(name)
1079 }
1080
1081 pub fn fetch_int32(&self, name: &str) -> Result<FetchedInt32> {
1082 self.handle.fetch_int32(name)
1083 }
1084
1085 pub fn fetch_string(&self, name: &str) -> Result<FetchedString> {
1086 self.handle.fetch_string(name)
1087 }
1088
1089 pub fn fetch_double_array(&self, name: &str) -> Result<FetchedDoubleArray> {
1090 self.handle.fetch_double_array(name)
1091 }
1092
1093 pub fn fetch_int32_array(&self, name: &str) -> Result<FetchedInt32Array> {
1094 self.handle.fetch_int32_array(name)
1095 }
1096
1097 pub fn fetch_string_array(&self, name: &str) -> Result<FetchedStringArray> {
1098 self.handle.fetch_string_array(name)
1099 }
1100
1101 pub fn fetch_enum(&self, name: &str) -> Result<FetchedEnum> {
1102 self.handle.fetch_enum(name)
1103 }
1104
1105 pub fn stop_drop(mut self) -> Result<()> {
1111 let (reply_tx, reply_rx) = channel::bounded(1);
1112 self.handle
1113 .tx
1114 .send(ManagerCommand::Stop { reply: reply_tx })
1115 .map_err(|_| PvxsError::new("Server worker stopped"))?;
1116 let result = reply_rx
1117 .recv()
1118 .map_err(|_| PvxsError::new("Server worker stopped"))?;
1119 if let Some(join) = self.join.take() {
1120 let _ = join.join();
1121 }
1122 result
1123 }
1124
1125 fn start_inner(isolated: bool) -> Result<Self> {
1126 let (tx, rx) = channel::unbounded::<ManagerCommand>();
1127 let (ready_tx, ready_rx) = channel::bounded::<Result<(u16, u16)>>(1);
1128
1129 let join = thread::spawn(move || {
1130 let mut server = if isolated {
1131 match ServerImpl::create_isolated() {
1132 Ok(s) => s,
1133 Err(e) => {
1134 let _ = ready_tx.send(Err(e));
1135 return;
1136 }
1137 }
1138 } else {
1139 match ServerImpl::from_env() {
1140 Ok(s) => s,
1141 Err(e) => {
1142 let _ = ready_tx.send(Err(e));
1143 return;
1144 }
1145 }
1146 };
1147
1148 if let Err(e) = server.start() {
1149 let _ = ready_tx.send(Err(e));
1150 return;
1151 }
1152
1153 let _ = ready_tx.send(Ok((server.tcp_port(), server.udp_port())));
1154
1155 let mut pvs: HashMap<String, ManagedPv> = HashMap::new();
1156
1157 while let Ok(cmd) = rx.recv() {
1158 match cmd {
1159 ManagerCommand::CreateDouble {
1160 name,
1161 initial,
1162 metadata,
1163 reply,
1164 } => {
1165 let result = if pvs.contains_key(&name) {
1166 Err(PvxsError::new("PV already exists"))
1167 } else {
1168 let alarm = AlarmConfig {
1169 control: metadata.control.clone(),
1170 alarm_metadata: metadata.alarm_metadata.clone(),
1171 };
1172 let alarm_result = compute_alarm_for_scalar(initial, &alarm);
1174 let mut metadata_with_alarm = metadata;
1176 metadata_with_alarm.alarm_severity = alarm_result.severity;
1177 metadata_with_alarm.alarm_status = alarm_result.status;
1178 metadata_with_alarm.alarm_message = alarm_result.message.clone();
1179
1180 match server.create_pv_double(&name, initial, metadata_with_alarm) {
1181 Ok(pv) => {
1182 pvs.insert(
1183 name,
1184 ManagedPv::Double {
1185 pv,
1186 alarm,
1187 last: initial,
1188 },
1189 );
1190 Ok(())
1191 }
1192 Err(e) => Err(e),
1193 }
1194 };
1195 let _ = reply.send(result);
1196 }
1197 ManagerCommand::CreateDoubleArray {
1198 name,
1199 initial,
1200 metadata,
1201 reply,
1202 } => {
1203 let result = if pvs.contains_key(&name) {
1204 Err(PvxsError::new("PV already exists"))
1205 } else {
1206 match server.create_pv_double_array(&name, initial, metadata) {
1207 Ok(pv) => {
1208 pvs.insert(name, ManagedPv::DoubleArray(pv));
1209 Ok(())
1210 }
1211 Err(e) => Err(e),
1212 }
1213 };
1214 let _ = reply.send(result);
1215 }
1216 ManagerCommand::CreateInt32 {
1217 name,
1218 initial,
1219 metadata,
1220 reply,
1221 } => {
1222 let result = if pvs.contains_key(&name) {
1223 Err(PvxsError::new("PV already exists"))
1224 } else {
1225 let alarm = AlarmConfig {
1226 control: metadata.control.clone(),
1227 alarm_metadata: metadata.alarm_metadata.clone(),
1228 };
1229 let alarm_result = compute_alarm_for_scalar(initial as f64, &alarm);
1231 let mut metadata_with_alarm = metadata;
1233 metadata_with_alarm.alarm_severity = alarm_result.severity;
1234 metadata_with_alarm.alarm_status = alarm_result.status;
1235 metadata_with_alarm.alarm_message = alarm_result.message.clone();
1236
1237 match server.create_pv_int32(&name, initial, metadata_with_alarm) {
1238 Ok(pv) => {
1239 pvs.insert(
1240 name,
1241 ManagedPv::Int32 {
1242 pv,
1243 alarm,
1244 last: initial,
1245 },
1246 );
1247 Ok(())
1248 }
1249 Err(e) => Err(e),
1250 }
1251 };
1252 let _ = reply.send(result);
1253 }
1254 ManagerCommand::CreateInt32Array {
1255 name,
1256 initial,
1257 metadata,
1258 reply,
1259 } => {
1260 let result = if pvs.contains_key(&name) {
1261 Err(PvxsError::new("PV already exists"))
1262 } else {
1263 match server.create_pv_int32_array(&name, initial, metadata) {
1264 Ok(pv) => {
1265 pvs.insert(name, ManagedPv::Int32Array(pv));
1266 Ok(())
1267 }
1268 Err(e) => Err(e),
1269 }
1270 };
1271 let _ = reply.send(result);
1272 }
1273 ManagerCommand::CreateString {
1274 name,
1275 initial,
1276 metadata,
1277 reply,
1278 } => {
1279 let result = if pvs.contains_key(&name) {
1280 Err(PvxsError::new("PV already exists"))
1281 } else {
1282 match server.create_pv_string(&name, &initial, metadata) {
1283 Ok(pv) => {
1284 pvs.insert(name, ManagedPv::String(pv));
1285 Ok(())
1286 }
1287 Err(e) => Err(e),
1288 }
1289 };
1290 let _ = reply.send(result);
1291 }
1292 ManagerCommand::CreateStringArray {
1293 name,
1294 initial,
1295 metadata,
1296 reply,
1297 } => {
1298 let result = if pvs.contains_key(&name) {
1299 Err(PvxsError::new("PV already exists"))
1300 } else {
1301 match server.create_pv_string_array(&name, initial, metadata) {
1302 Ok(pv) => {
1303 pvs.insert(name, ManagedPv::StringArray(pv));
1304 Ok(())
1305 }
1306 Err(e) => Err(e),
1307 }
1308 };
1309 let _ = reply.send(result);
1310 }
1311 ManagerCommand::CreateEnum {
1312 name,
1313 choices,
1314 selected_index,
1315 metadata,
1316 reply,
1317 } => {
1318 let result = if pvs.contains_key(&name) {
1319 Err(PvxsError::new("PV already exists"))
1320 } else {
1321 let choices_refs: Vec<&str> =
1322 choices.iter().map(|s| s.as_str()).collect();
1323 match server.create_pv_enum(
1324 &name,
1325 choices_refs,
1326 selected_index,
1327 metadata,
1328 ) {
1329 Ok(pv) => {
1330 pvs.insert(name, ManagedPv::PvEnum(pv));
1331 Ok(())
1332 }
1333 Err(e) => Err(e),
1334 }
1335 };
1336 let _ = reply.send(result);
1337 }
1338 ManagerCommand::PostDouble { name, value, reply } => {
1339 let result = match pvs.get_mut(&name) {
1340 Some(ManagedPv::Double { pv, alarm, last }) => {
1341 let alarm_result = compute_alarm_for_scalar(value, alarm);
1342 let post_value = if alarm_result.allow { value } else { *last };
1344 let result = pv.post_double_with_alarm(
1345 post_value,
1346 alarm_result.severity,
1347 alarm_result.status,
1348 alarm_result.message,
1349 );
1350 if result.is_ok() && alarm_result.allow {
1351 *last = post_value;
1352 }
1353 result
1354 }
1355 _ => Err(PvxsError::new("PV not found or type mismatch")),
1356 };
1357 let _ = reply.send(result);
1358 }
1359 ManagerCommand::PostDoubleArray { name, value, reply } => {
1360 let result = match pvs.get_mut(&name) {
1361 Some(ManagedPv::DoubleArray(pv)) => pv.post_double_array(&value),
1362 _ => Err(PvxsError::new("PV not found or type mismatch")),
1363 };
1364 let _ = reply.send(result);
1365 }
1366 ManagerCommand::PostInt32 { name, value, reply } => {
1367 let result = match pvs.get_mut(&name) {
1368 Some(ManagedPv::Int32 { pv, alarm, last }) => {
1369 let alarm_result = compute_alarm_for_scalar(value as f64, alarm);
1370 let post_value = if alarm_result.allow { value } else { *last };
1372 let result = pv.post_int32_with_alarm(
1373 post_value,
1374 alarm_result.severity,
1375 alarm_result.status,
1376 alarm_result.message,
1377 );
1378 if result.is_ok() && alarm_result.allow {
1379 *last = post_value;
1380 }
1381 result
1382 }
1383 _ => Err(PvxsError::new("PV not found or type mismatch")),
1384 };
1385 let _ = reply.send(result);
1386 }
1387 ManagerCommand::PostInt32Array { name, value, reply } => {
1388 let result = match pvs.get_mut(&name) {
1389 Some(ManagedPv::Int32Array(pv)) => pv.post_int32_array(&value),
1390 _ => Err(PvxsError::new("PV not found or type mismatch")),
1391 };
1392 let _ = reply.send(result);
1393 }
1394 ManagerCommand::PostString { name, value, reply } => {
1395 let result = match pvs.get_mut(&name) {
1396 Some(ManagedPv::String(pv)) => pv.post_string(&value),
1397 _ => Err(PvxsError::new("PV not found or type mismatch")),
1398 };
1399 let _ = reply.send(result);
1400 }
1401 ManagerCommand::PostStringArray { name, value, reply } => {
1402 let result = match pvs.get_mut(&name) {
1403 Some(ManagedPv::StringArray(pv)) => pv.post_string_array(&value),
1404 _ => Err(PvxsError::new("PV not found or type mismatch")),
1405 };
1406 let _ = reply.send(result);
1407 }
1408 ManagerCommand::PostEnum { name, value, reply } => {
1409 let result = match pvs.get_mut(&name) {
1410 Some(ManagedPv::PvEnum(pv)) => pv.post_enum(value),
1411 _ => Err(PvxsError::new("PV not found or type mismatch")),
1412 };
1413 let _ = reply.send(result);
1414 }
1415 ManagerCommand::Remove { name, reply } => {
1416 let result = if pvs.remove(&name).is_some() {
1417 server.remove_pv(&name)
1418 } else {
1419 Err(PvxsError::new("PV not found"))
1420 };
1421 let _ = reply.send(result);
1422 }
1423 ManagerCommand::FetchDouble { name, reply } => {
1424 let result = match pvs.get(&name) {
1425 Some(ManagedPv::Double { pv, .. }) => {
1426 pv.fetch().and_then(|v| {
1427 let display_metadata = (|| -> Option<DisplayMetadata> {
1429 Some(DisplayMetadata {
1430 limit_low: v.get_field_int32("display.limitLow").ok()?
1431 as i64,
1432 limit_high: v
1433 .get_field_int32("display.limitHigh")
1434 .ok()?
1435 as i64,
1436 description: v
1437 .get_field_string("display.description")
1438 .ok()?,
1439 units: v.get_field_string("display.units").ok()?,
1440 precision: v
1441 .get_field_int32("display.precision")
1442 .ok()?,
1443 })
1444 })();
1445
1446 let control_metadata = (|| -> Option<ControlMetadata> {
1448 Some(ControlMetadata {
1449 limit_low: v
1450 .get_field_double("control.limitLow")
1451 .ok()?,
1452 limit_high: v
1453 .get_field_double("control.limitHigh")
1454 .ok()?,
1455 min_step: v.get_field_double("control.minStep").ok()?,
1456 })
1457 })();
1458
1459 let alarm_metadata = (|| -> Option<AlarmMetadata> {
1461 Some(AlarmMetadata {
1462 active: v.get_field_int32("valueAlarm.active").ok()?
1463 != 0,
1464 low_alarm_limit: v
1465 .get_field_double("valueAlarm.lowAlarmLimit")
1466 .ok()?,
1467 low_warning_limit: v
1468 .get_field_double("valueAlarm.lowWarningLimit")
1469 .ok()?,
1470 high_warning_limit: v
1471 .get_field_double("valueAlarm.highWarningLimit")
1472 .ok()?,
1473 high_alarm_limit: v
1474 .get_field_double("valueAlarm.highAlarmLimit")
1475 .ok()?,
1476 low_alarm_severity: AlarmSeverity::from(
1477 v.get_field_int32("valueAlarm.lowAlarmSeverity")
1478 .ok()?,
1479 ),
1480 low_warning_severity: AlarmSeverity::from(
1481 v.get_field_int32("valueAlarm.lowWarningSeverity")
1482 .ok()?,
1483 ),
1484 high_warning_severity: AlarmSeverity::from(
1485 v.get_field_int32("valueAlarm.highWarningSeverity")
1486 .ok()?,
1487 ),
1488 high_alarm_severity: AlarmSeverity::from(
1489 v.get_field_int32("valueAlarm.highAlarmSeverity")
1490 .ok()?,
1491 ),
1492 hysteresis: v
1493 .get_field_int32("valueAlarm.hysteresis")
1494 .ok()?
1495 as u8,
1496 })
1497 })();
1498
1499 Ok(FetchedDouble {
1500 value: v.get_field_double("value")?,
1501 alarm_severity: AlarmSeverity::from(
1502 v.get_field_int32("alarm.severity").unwrap_or(0),
1503 ),
1504 alarm_status: AlarmStatus::from(
1505 v.get_field_int32("alarm.status").unwrap_or(0),
1506 ),
1507 alarm_message: v
1508 .get_field_string("alarm.message")
1509 .unwrap_or_default(),
1510 display_metadata,
1511 control_metadata,
1512 alarm_metadata,
1513 })
1514 })
1515 }
1516 _ => Err(PvxsError::new("PV not found or type mismatch")),
1517 };
1518 let _ = reply.send(result);
1519 }
1520 ManagerCommand::FetchInt32 { name, reply } => {
1521 let result = match pvs.get(&name) {
1522 Some(ManagedPv::Int32 { pv, .. }) => {
1523 pv.fetch().and_then(|v| {
1524 let display_metadata = (|| -> Option<DisplayMetadata> {
1526 Some(DisplayMetadata {
1527 limit_low: v.get_field_int32("display.limitLow").ok()?
1528 as i64,
1529 limit_high: v
1530 .get_field_int32("display.limitHigh")
1531 .ok()?
1532 as i64,
1533 description: v
1534 .get_field_string("display.description")
1535 .ok()?,
1536 units: v.get_field_string("display.units").ok()?,
1537 precision: v
1538 .get_field_int32("display.precision")
1539 .ok()?,
1540 })
1541 })();
1542
1543 let control_metadata = (|| -> Option<ControlMetadata> {
1545 Some(ControlMetadata {
1546 limit_low: v
1547 .get_field_double("control.limitLow")
1548 .ok()?,
1549 limit_high: v
1550 .get_field_double("control.limitHigh")
1551 .ok()?,
1552 min_step: v.get_field_double("control.minStep").ok()?,
1553 })
1554 })();
1555
1556 let alarm_metadata = (|| -> Option<AlarmMetadata> {
1558 Some(AlarmMetadata {
1559 active: v.get_field_int32("valueAlarm.active").ok()?
1560 != 0,
1561 low_alarm_limit: v
1562 .get_field_double("valueAlarm.lowAlarmLimit")
1563 .ok()?,
1564 low_warning_limit: v
1565 .get_field_double("valueAlarm.lowWarningLimit")
1566 .ok()?,
1567 high_warning_limit: v
1568 .get_field_double("valueAlarm.highWarningLimit")
1569 .ok()?,
1570 high_alarm_limit: v
1571 .get_field_double("valueAlarm.highAlarmLimit")
1572 .ok()?,
1573 low_alarm_severity: AlarmSeverity::from(
1574 v.get_field_int32("valueAlarm.lowAlarmSeverity")
1575 .ok()?,
1576 ),
1577 low_warning_severity: AlarmSeverity::from(
1578 v.get_field_int32("valueAlarm.lowWarningSeverity")
1579 .ok()?,
1580 ),
1581 high_warning_severity: AlarmSeverity::from(
1582 v.get_field_int32("valueAlarm.highWarningSeverity")
1583 .ok()?,
1584 ),
1585 high_alarm_severity: AlarmSeverity::from(
1586 v.get_field_int32("valueAlarm.highAlarmSeverity")
1587 .ok()?,
1588 ),
1589 hysteresis: v
1590 .get_field_int32("valueAlarm.hysteresis")
1591 .ok()?
1592 as u8,
1593 })
1594 })();
1595
1596 Ok(FetchedInt32 {
1597 value: v.get_field_int32("value")?,
1598 alarm_severity: AlarmSeverity::from(
1599 v.get_field_int32("alarm.severity").unwrap_or(0),
1600 ),
1601 alarm_status: AlarmStatus::from(
1602 v.get_field_int32("alarm.status").unwrap_or(0),
1603 ),
1604 alarm_message: v
1605 .get_field_string("alarm.message")
1606 .unwrap_or_default(),
1607 display_metadata,
1608 control_metadata,
1609 alarm_metadata,
1610 })
1611 })
1612 }
1613 _ => Err(PvxsError::new("PV not found or type mismatch")),
1614 };
1615 let _ = reply.send(result);
1616 }
1617 ManagerCommand::FetchString { name, reply } => {
1618 let result = match pvs.get(&name) {
1619 Some(ManagedPv::String(pv)) => pv.fetch().and_then(|v| {
1620 Ok(FetchedString {
1621 value: v.get_field_string("value")?,
1622 alarm_severity: AlarmSeverity::from(
1623 v.get_field_int32("alarm.severity").unwrap_or(0),
1624 ),
1625 alarm_status: AlarmStatus::from(
1626 v.get_field_int32("alarm.status").unwrap_or(0),
1627 ),
1628 alarm_message: v
1629 .get_field_string("alarm.message")
1630 .unwrap_or_default(),
1631 })
1632 }),
1633 _ => Err(PvxsError::new("PV not found or type mismatch")),
1634 };
1635 let _ = reply.send(result);
1636 }
1637 ManagerCommand::FetchDoubleArray { name, reply } => {
1638 let result = match pvs.get(&name) {
1639 Some(ManagedPv::DoubleArray(pv)) => pv.fetch().and_then(|v| {
1640 let display_metadata = (|| -> Option<DisplayMetadata> {
1641 Some(DisplayMetadata {
1642 limit_low: v.get_field_int32("display.limitLow").ok()?
1643 as i64,
1644 limit_high: v.get_field_int32("display.limitHigh").ok()?
1645 as i64,
1646 description: v
1647 .get_field_string("display.description")
1648 .ok()?,
1649 units: v.get_field_string("display.units").ok()?,
1650 precision: v.get_field_int32("display.precision").ok()?,
1651 })
1652 })();
1653 let control_metadata = (|| -> Option<ControlMetadata> {
1654 Some(ControlMetadata {
1655 limit_low: v.get_field_double("control.limitLow").ok()?,
1656 limit_high: v.get_field_double("control.limitHigh").ok()?,
1657 min_step: v.get_field_double("control.minStep").ok()?,
1658 })
1659 })();
1660 let alarm_metadata = (|| -> Option<AlarmMetadata> {
1661 Some(AlarmMetadata {
1662 active: v.get_field_int32("valueAlarm.active").ok()? != 0,
1663 low_alarm_limit: v
1664 .get_field_double("valueAlarm.lowAlarmLimit")
1665 .ok()?,
1666 low_warning_limit: v
1667 .get_field_double("valueAlarm.lowWarningLimit")
1668 .ok()?,
1669 high_warning_limit: v
1670 .get_field_double("valueAlarm.highWarningLimit")
1671 .ok()?,
1672 high_alarm_limit: v
1673 .get_field_double("valueAlarm.highAlarmLimit")
1674 .ok()?,
1675 low_alarm_severity: AlarmSeverity::from(
1676 v.get_field_int32("valueAlarm.lowAlarmSeverity")
1677 .ok()?,
1678 ),
1679 low_warning_severity: AlarmSeverity::from(
1680 v.get_field_int32("valueAlarm.lowWarningSeverity")
1681 .ok()?,
1682 ),
1683 high_warning_severity: AlarmSeverity::from(
1684 v.get_field_int32("valueAlarm.highWarningSeverity")
1685 .ok()?,
1686 ),
1687 high_alarm_severity: AlarmSeverity::from(
1688 v.get_field_int32("valueAlarm.highAlarmSeverity")
1689 .ok()?,
1690 ),
1691 hysteresis: v
1692 .get_field_int32("valueAlarm.hysteresis")
1693 .ok()?
1694 as u8,
1695 })
1696 })();
1697 Ok(FetchedDoubleArray {
1698 value: v.get_field_double_array("value")?,
1699 alarm_severity: AlarmSeverity::from(
1700 v.get_field_int32("alarm.severity").unwrap_or(0),
1701 ),
1702 alarm_status: AlarmStatus::from(
1703 v.get_field_int32("alarm.status").unwrap_or(0),
1704 ),
1705 alarm_message: v
1706 .get_field_string("alarm.message")
1707 .unwrap_or_default(),
1708 display_metadata,
1709 control_metadata,
1710 alarm_metadata,
1711 })
1712 }),
1713 _ => Err(PvxsError::new("PV not found or type mismatch")),
1714 };
1715 let _ = reply.send(result);
1716 }
1717 ManagerCommand::FetchInt32Array { name, reply } => {
1718 let result = match pvs.get(&name) {
1719 Some(ManagedPv::Int32Array(pv)) => pv.fetch().and_then(|v| {
1720 let display_metadata = (|| -> Option<DisplayMetadata> {
1721 Some(DisplayMetadata {
1722 limit_low: v.get_field_int32("display.limitLow").ok()?
1723 as i64,
1724 limit_high: v.get_field_int32("display.limitHigh").ok()?
1725 as i64,
1726 description: v
1727 .get_field_string("display.description")
1728 .ok()?,
1729 units: v.get_field_string("display.units").ok()?,
1730 precision: v.get_field_int32("display.precision").ok()?,
1731 })
1732 })();
1733 let control_metadata = (|| -> Option<ControlMetadata> {
1734 Some(ControlMetadata {
1735 limit_low: v.get_field_double("control.limitLow").ok()?,
1736 limit_high: v.get_field_double("control.limitHigh").ok()?,
1737 min_step: v.get_field_double("control.minStep").ok()?,
1738 })
1739 })();
1740 let alarm_metadata = (|| -> Option<AlarmMetadata> {
1741 Some(AlarmMetadata {
1742 active: v.get_field_int32("valueAlarm.active").ok()? != 0,
1743 low_alarm_limit: v
1744 .get_field_double("valueAlarm.lowAlarmLimit")
1745 .ok()?,
1746 low_warning_limit: v
1747 .get_field_double("valueAlarm.lowWarningLimit")
1748 .ok()?,
1749 high_warning_limit: v
1750 .get_field_double("valueAlarm.highWarningLimit")
1751 .ok()?,
1752 high_alarm_limit: v
1753 .get_field_double("valueAlarm.highAlarmLimit")
1754 .ok()?,
1755 low_alarm_severity: AlarmSeverity::from(
1756 v.get_field_int32("valueAlarm.lowAlarmSeverity")
1757 .ok()?,
1758 ),
1759 low_warning_severity: AlarmSeverity::from(
1760 v.get_field_int32("valueAlarm.lowWarningSeverity")
1761 .ok()?,
1762 ),
1763 high_warning_severity: AlarmSeverity::from(
1764 v.get_field_int32("valueAlarm.highWarningSeverity")
1765 .ok()?,
1766 ),
1767 high_alarm_severity: AlarmSeverity::from(
1768 v.get_field_int32("valueAlarm.highAlarmSeverity")
1769 .ok()?,
1770 ),
1771 hysteresis: v
1772 .get_field_int32("valueAlarm.hysteresis")
1773 .ok()?
1774 as u8,
1775 })
1776 })();
1777 Ok(FetchedInt32Array {
1778 value: v.get_field_int32_array("value")?,
1779 alarm_severity: AlarmSeverity::from(
1780 v.get_field_int32("alarm.severity").unwrap_or(0),
1781 ),
1782 alarm_status: AlarmStatus::from(
1783 v.get_field_int32("alarm.status").unwrap_or(0),
1784 ),
1785 alarm_message: v
1786 .get_field_string("alarm.message")
1787 .unwrap_or_default(),
1788 display_metadata,
1789 control_metadata,
1790 alarm_metadata,
1791 })
1792 }),
1793 _ => Err(PvxsError::new("PV not found or type mismatch")),
1794 };
1795 let _ = reply.send(result);
1796 }
1797 ManagerCommand::FetchStringArray { name, reply } => {
1798 let result = match pvs.get(&name) {
1799 Some(ManagedPv::StringArray(pv)) => pv.fetch().and_then(|v| {
1800 Ok(FetchedStringArray {
1801 value: v.get_field_string_array("value")?,
1802 alarm_severity: AlarmSeverity::from(
1803 v.get_field_int32("alarm.severity").unwrap_or(0),
1804 ),
1805 alarm_status: AlarmStatus::from(
1806 v.get_field_int32("alarm.status").unwrap_or(0),
1807 ),
1808 alarm_message: v
1809 .get_field_string("alarm.message")
1810 .unwrap_or_default(),
1811 })
1812 }),
1813 _ => Err(PvxsError::new("PV not found or type mismatch")),
1814 };
1815 let _ = reply.send(result);
1816 }
1817 ManagerCommand::FetchEnum { name, reply } => {
1818 let result = match pvs.get(&name) {
1819 Some(ManagedPv::PvEnum(pv)) => pv.fetch().and_then(|v| {
1820 Ok(FetchedEnum {
1821 value: v.get_field_enum("value.index")?,
1822 value_choices: v
1823 .get_field_string_array("value.choices")
1824 .unwrap_or_default(),
1825 alarm_severity: AlarmSeverity::from(
1826 v.get_field_int32("alarm.severity").unwrap_or(0),
1827 ),
1828 alarm_status: AlarmStatus::from(
1829 v.get_field_int32("alarm.status").unwrap_or(0),
1830 ),
1831 alarm_message: v
1832 .get_field_string("alarm.message")
1833 .unwrap_or_default(),
1834 })
1835 }),
1836 _ => Err(PvxsError::new("PV not found or type mismatch")),
1837 };
1838 let _ = reply.send(result);
1839 }
1840 ManagerCommand::Stop { reply } => {
1841 let result = server.stop();
1842 let _ = reply.send(result);
1843 break;
1844 }
1845 }
1846 }
1847 });
1848
1849 let (tcp_port, udp_port) = ready_rx
1850 .recv()
1851 .map_err(|_| PvxsError::new("Server failed to start"))??;
1852
1853 Ok(Self {
1854 handle: ServerHandle {
1855 tx,
1856 tcp_port,
1857 udp_port,
1858 },
1859 join: Some(join),
1860 })
1861 }
1862}
1863
1864pub struct SharedPV {
1886 inner: UniquePtr<bridge::SharedPVWrapper>,
1887}
1888
1889impl SharedPV {
1890 pub fn create_mailbox() -> Result<Self> {
1894 let inner = bridge::shared_pv_create_mailbox()?;
1895 Ok(Self { inner })
1896 }
1897
1898 pub fn create_readonly() -> Result<Self> {
1902 let inner = bridge::shared_pv_create_readonly()?;
1903 Ok(Self { inner })
1904 }
1905
1906 pub(crate) fn open_double(
1935 &mut self,
1936 initial_value: f64,
1937 metadata: NTScalarMetadataBuilder,
1938 ) -> Result<()> {
1939 let meta = metadata.build()?;
1940 bridge::shared_pv_open_double(self.inner.pin_mut(), initial_value, &meta)?;
1941 Ok(())
1942 }
1943
1944 pub(crate) fn open_double_array(
1951 &mut self,
1952 initial_value: Vec<f64>,
1953 metadata: NTScalarMetadataBuilder,
1954 ) -> Result<()> {
1955 let meta = metadata.build()?;
1956 bridge::shared_pv_open_double_array(self.inner.pin_mut(), initial_value, &meta)?;
1957 Ok(())
1958 }
1959
1960 pub(crate) fn open_enum(
1968 &mut self,
1969 choices: Vec<&str>,
1970 selected_index: i16,
1971 metadata: NTEnumMetadataBuilder,
1972 ) -> Result<()> {
1973 let meta = metadata.build()?;
1974 let choices_vec: Vec<String> = choices.iter().map(|s| s.to_string()).collect();
1975 bridge::shared_pv_open_enum(self.inner.pin_mut(), choices_vec, selected_index, &meta)?;
1976 Ok(())
1977 }
1978
1979 pub(crate) fn open_int32(
1986 &mut self,
1987 initial_value: i32,
1988 metadata: NTScalarMetadataBuilder,
1989 ) -> Result<()> {
1990 let meta = metadata.build()?;
1991 bridge::shared_pv_open_int32(self.inner.pin_mut(), initial_value, &meta)?;
1992 Ok(())
1993 }
1994
1995 pub(crate) fn open_int32_array(
2002 &mut self,
2003 initial_value: Vec<i32>,
2004 metadata: NTScalarMetadataBuilder,
2005 ) -> Result<()> {
2006 let meta = metadata.build()?;
2007 bridge::shared_pv_open_int32_array(self.inner.pin_mut(), initial_value, &meta)?;
2008 Ok(())
2009 }
2010
2011 pub(crate) fn open_string(
2018 &mut self,
2019 initial_value: &str,
2020 metadata: NTScalarMetadataBuilder,
2021 ) -> Result<()> {
2022 let meta = metadata.build()?;
2023 bridge::shared_pv_open_string(self.inner.pin_mut(), initial_value.to_string(), &meta)?;
2024 Ok(())
2025 }
2026
2027 pub(crate) fn open_string_array(
2034 &mut self,
2035 initial_value: Vec<String>,
2036 metadata: NTScalarMetadataBuilder,
2037 ) -> Result<()> {
2038 let meta = metadata.build()?;
2039 bridge::shared_pv_open_string_array(self.inner.pin_mut(), initial_value, &meta)?;
2040 Ok(())
2041 }
2042
2043 pub fn is_open(&self) -> bool {
2045 bridge::shared_pv_is_open(&self.inner)
2046 }
2047
2048 pub fn close(&mut self) -> Result<()> {
2050 bridge::shared_pv_close(self.inner.pin_mut())?;
2051 Ok(())
2052 }
2053
2054 pub fn post_double(&mut self, value: f64) -> Result<()> {
2063 bridge::shared_pv_post_double(self.inner.pin_mut(), value)?;
2064 Ok(())
2065 }
2066
2067 pub fn post_int32(&mut self, value: i32) -> Result<()> {
2076 bridge::shared_pv_post_int32(self.inner.pin_mut(), value)?;
2077 Ok(())
2078 }
2079
2080 pub(crate) fn post_double_with_alarm(
2081 &mut self,
2082 value: f64,
2083 severity: AlarmSeverity,
2084 status: AlarmStatus,
2085 message: String,
2086 ) -> Result<()> {
2087 bridge::shared_pv_post_double_with_alarm(
2088 self.inner.pin_mut(),
2089 value,
2090 severity as i32,
2091 status as i32,
2092 message,
2093 )?;
2094 Ok(())
2095 }
2096
2097 pub(crate) fn post_int32_with_alarm(
2098 &mut self,
2099 value: i32,
2100 severity: AlarmSeverity,
2101 status: AlarmStatus,
2102 message: String,
2103 ) -> Result<()> {
2104 bridge::shared_pv_post_int32_with_alarm(
2105 self.inner.pin_mut(),
2106 value,
2107 severity as i32,
2108 status as i32,
2109 message,
2110 )?;
2111 Ok(())
2112 }
2113
2114 pub fn post_string(&mut self, value: &str) -> Result<()> {
2126 bridge::shared_pv_post_string(self.inner.pin_mut(), value.to_string())?;
2127 Ok(())
2128 }
2129
2130 pub fn post_enum(&mut self, value: i16) -> Result<()> {
2138 bridge::shared_pv_post_enum(self.inner.pin_mut(), value)?;
2139 Ok(())
2140 }
2141
2142 pub fn post_double_array(&mut self, value: &[f64]) -> Result<()> {
2150 if value.is_empty() {
2151 return Err(PvxsError::new("Cannot post empty double array"));
2152 }
2153 bridge::shared_pv_post_double_array(self.inner.pin_mut(), value.to_vec())?;
2154 Ok(())
2155 }
2156
2157 pub fn post_int32_array(&mut self, value: &[i32]) -> Result<()> {
2165 if value.is_empty() {
2166 return Err(PvxsError::new("Cannot post empty int32 array"));
2167 }
2168 bridge::shared_pv_post_int32_array(self.inner.pin_mut(), value.to_vec())?;
2169 Ok(())
2170 }
2171
2172 pub fn post_string_array(&mut self, value: &[String]) -> Result<()> {
2180 if value.is_empty() {
2181 return Err(PvxsError::new("Cannot post empty string array"));
2182 }
2183 bridge::shared_pv_post_string_array(self.inner.pin_mut(), value.to_vec())?;
2184 Ok(())
2185 }
2186
2187 pub fn fetch(&self) -> Result<Value> {
2191 let inner = bridge::shared_pv_fetch(&self.inner)?;
2192 Ok(Value { inner })
2193 }
2194}
2195
2196pub struct StaticSource {
2220 inner: UniquePtr<bridge::StaticSourceWrapper>,
2221}
2222
2223impl StaticSource {
2224 pub fn create() -> Result<Self> {
2226 let inner = bridge::static_source_create()?;
2227 Ok(Self { inner })
2228 }
2229
2230 pub fn add_pv(&mut self, name: &str, pv: &mut SharedPV) -> Result<()> {
2237 bridge::static_source_add_pv(self.inner.pin_mut(), name.to_string(), pv.inner.pin_mut())?;
2238 Ok(())
2239 }
2240
2241 pub fn remove_pv(&mut self, name: &str) -> Result<()> {
2247 bridge::static_source_remove_pv(self.inner.pin_mut(), name.to_string())?;
2248 Ok(())
2249 }
2250
2251 pub fn close_all(&mut self) -> Result<()> {
2253 bridge::static_source_close_all(self.inner.pin_mut())?;
2254 Ok(())
2255 }
2256}
2257
2258#[derive(Debug, Clone)]
2301pub struct NTScalarMetadataBuilder {
2302 alarm_severity: AlarmSeverity,
2303 alarm_status: AlarmStatus,
2304 alarm_message: String,
2305 timestamp_seconds: i64,
2306 timestamp_nanos: i32,
2307 timestamp_user_tag: i32,
2308 display: Option<DisplayMetadata>,
2309 control: Option<ControlMetadata>,
2310 alarm_metadata: Option<AlarmMetadata>,
2311}
2312
2313impl NTScalarMetadataBuilder {
2314 pub fn new() -> Self {
2316 use std::time::{SystemTime, UNIX_EPOCH};
2317 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
2318
2319 Self {
2320 alarm_severity: AlarmSeverity::Invalid,
2321 alarm_status: AlarmStatus::UndefinedStatus,
2322 alarm_message: String::new(),
2323 timestamp_seconds: now.as_secs() as i64,
2324 timestamp_nanos: now.subsec_nanos() as i32,
2325 timestamp_user_tag: 0,
2326 display: None,
2327 control: None,
2328 alarm_metadata: None,
2329 }
2330 }
2331
2332 pub fn alarm(
2334 mut self,
2335 severity: AlarmSeverity,
2336 status: AlarmStatus,
2337 message: impl Into<String>,
2338 ) -> Self {
2339 self.alarm_severity = severity;
2340 self.alarm_status = status;
2341 self.alarm_message = message.into();
2342 self
2343 }
2344
2345 pub fn timestamp(mut self, seconds: i64, nanos: i32, user_tag: i32) -> Self {
2347 self.timestamp_seconds = seconds;
2348 self.timestamp_nanos = nanos;
2349 self.timestamp_user_tag = user_tag;
2350 self
2351 }
2352
2353 pub fn display(mut self, meta: DisplayMetadata) -> Self {
2355 self.display = Some(meta);
2356 self
2357 }
2358
2359 pub fn control(mut self, meta: ControlMetadata) -> Self {
2361 self.control = Some(meta);
2362 self
2363 }
2364
2365 pub fn alarm_metadata(mut self, meta: AlarmMetadata) -> Self {
2367 self.alarm_metadata = Some(meta);
2368 self
2369 }
2370
2371 fn build(self) -> Result<cxx::UniquePtr<bridge::NTScalarMetadata>> {
2373 let alarm = bridge::create_alarm(
2375 self.alarm_severity as i32,
2376 self.alarm_status as i32,
2377 self.alarm_message,
2378 );
2379 let time_stamp = bridge::create_time(
2380 self.timestamp_seconds,
2381 self.timestamp_nanos,
2382 self.timestamp_user_tag,
2383 );
2384
2385 let make_display = |d: &DisplayMetadata| {
2386 bridge::create_display(
2387 d.limit_low,
2388 d.limit_high,
2389 d.description.clone(),
2390 d.units.clone(),
2391 d.precision,
2392 )
2393 };
2394
2395 let metadata = match (&self.display, &self.control, &self.alarm_metadata) {
2397 (None, None, None) => bridge::create_metadata_no_optional(&alarm, &time_stamp),
2398 (Some(d), None, None) => {
2399 let display = make_display(d);
2400 bridge::create_metadata_with_display(&alarm, &time_stamp, &display)
2401 }
2402 (None, Some(c), None) => {
2403 let control = bridge::create_control(c.limit_low, c.limit_high, c.min_step);
2404 bridge::create_metadata_with_control(&alarm, &time_stamp, &control)
2405 }
2406 (None, None, Some(v)) => {
2407 let value_alarm = bridge::create_value_alarm(
2408 v.active,
2409 v.low_alarm_limit,
2410 v.low_warning_limit,
2411 v.high_warning_limit,
2412 v.high_alarm_limit,
2413 v.low_alarm_severity as i32,
2414 v.low_warning_severity as i32,
2415 v.high_warning_severity as i32,
2416 v.high_alarm_severity as i32,
2417 v.hysteresis,
2418 );
2419 bridge::create_metadata_with_value_alarm(&alarm, &time_stamp, &value_alarm)
2420 }
2421 (Some(d), Some(c), None) => {
2422 let display = make_display(d);
2423 let control = bridge::create_control(c.limit_low, c.limit_high, c.min_step);
2424 bridge::create_metadata_with_display_control(
2425 &alarm,
2426 &time_stamp,
2427 &display,
2428 &control,
2429 )
2430 }
2431 (Some(d), None, Some(v)) => {
2432 let display = make_display(d);
2433 let value_alarm = bridge::create_value_alarm(
2434 v.active,
2435 v.low_alarm_limit,
2436 v.low_warning_limit,
2437 v.high_warning_limit,
2438 v.high_alarm_limit,
2439 v.low_alarm_severity as i32,
2440 v.low_warning_severity as i32,
2441 v.high_warning_severity as i32,
2442 v.high_alarm_severity as i32,
2443 v.hysteresis,
2444 );
2445 bridge::create_metadata_with_display_value_alarm(
2446 &alarm,
2447 &time_stamp,
2448 &display,
2449 &value_alarm,
2450 )
2451 }
2452 (None, Some(c), Some(v)) => {
2453 let control = bridge::create_control(c.limit_low, c.limit_high, c.min_step);
2454 let value_alarm = bridge::create_value_alarm(
2455 v.active,
2456 v.low_alarm_limit,
2457 v.low_warning_limit,
2458 v.high_warning_limit,
2459 v.high_alarm_limit,
2460 v.low_alarm_severity as i32,
2461 v.low_warning_severity as i32,
2462 v.high_warning_severity as i32,
2463 v.high_alarm_severity as i32,
2464 v.hysteresis,
2465 );
2466 bridge::create_metadata_with_control_value_alarm(
2467 &alarm,
2468 &time_stamp,
2469 &control,
2470 &value_alarm,
2471 )
2472 }
2473 (Some(d), Some(c), Some(v)) => {
2474 let display = make_display(d);
2475 let control = bridge::create_control(c.limit_low, c.limit_high, c.min_step);
2476 let value_alarm = bridge::create_value_alarm(
2477 v.active,
2478 v.low_alarm_limit,
2479 v.low_warning_limit,
2480 v.high_warning_limit,
2481 v.high_alarm_limit,
2482 v.low_alarm_severity as i32,
2483 v.low_warning_severity as i32,
2484 v.high_warning_severity as i32,
2485 v.high_alarm_severity as i32,
2486 v.hysteresis,
2487 );
2488 bridge::create_metadata_full(&alarm, &time_stamp, &display, &control, &value_alarm)
2489 }
2490 };
2491
2492 Ok(metadata)
2493 }
2494}
2495
2496impl Default for NTScalarMetadataBuilder {
2497 fn default() -> Self {
2498 Self::new()
2499 }
2500}
2501
/// Builder for the metadata attached to an enum PV when it is opened.
///
/// Carries only an alarm and a timestamp. Unlike `NTScalarMetadataBuilder`,
/// severity and status are stored as raw `i32` codes.
#[derive(Debug, Clone)]
pub struct NTEnumMetadataBuilder {
    /// Initial alarm severity as a raw integer code.
    alarm_severity: i32,
    /// Initial alarm status as a raw integer code.
    alarm_status: i32,
    /// Initial alarm message.
    alarm_message: String,
    /// Timestamp seconds component (seconds since the Unix epoch when set by `new`).
    timestamp_seconds: i64,
    /// Timestamp sub-second component, in nanoseconds.
    timestamp_nanos: i32,
    /// User tag passed along with the timestamp.
    timestamp_user_tag: i32,
}
2532
2533impl NTEnumMetadataBuilder {
2534 pub fn new() -> Self {
2536 use std::time::{SystemTime, UNIX_EPOCH};
2537 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
2538
2539 Self {
2540 alarm_severity: 0,
2541 alarm_status: 0,
2542 alarm_message: String::new(),
2543 timestamp_seconds: now.as_secs() as i64,
2544 timestamp_nanos: now.subsec_nanos() as i32,
2545 timestamp_user_tag: 0,
2546 }
2547 }
2548
2549 pub fn alarm(mut self, severity: i32, status: i32, message: impl Into<String>) -> Self {
2551 self.alarm_severity = severity;
2552 self.alarm_status = status;
2553 self.alarm_message = message.into();
2554 self
2555 }
2556
2557 pub fn timestamp(mut self, seconds: i64, nanos: i32, user_tag: i32) -> Self {
2559 self.timestamp_seconds = seconds;
2560 self.timestamp_nanos = nanos;
2561 self.timestamp_user_tag = user_tag;
2562 self
2563 }
2564
2565 fn build(self) -> Result<cxx::UniquePtr<bridge::NTEnumMetadata>> {
2566 let alarm =
2567 bridge::create_alarm(self.alarm_severity, self.alarm_status, self.alarm_message);
2568 let time_stamp = bridge::create_time(
2569 self.timestamp_seconds,
2570 self.timestamp_nanos,
2571 self.timestamp_user_tag,
2572 );
2573 let metadata = bridge::create_enum_metadata(&alarm, &time_stamp);
2574 Ok(metadata)
2575 }
2576}
2577
2578impl Default for NTEnumMetadataBuilder {
2579 fn default() -> Self {
2580 Self::new()
2581 }
2582}