1use std::net::SocketAddr;
14use std::ops::ControlFlow;
15use std::sync::atomic::{AtomicU32, Ordering};
16use std::time::Duration;
17
18use serde_json::Value;
19use tokio::io::{AsyncReadExt, AsyncWriteExt};
20use tokio::net::tcp::OwnedWriteHalf;
21use tokio::task::JoinHandle;
22use tokio::time::{Instant, interval};
23
24use spvirit_codec::epics_decode::{PvaPacket, PvaPacketCommand};
25use spvirit_codec::spvd_decode::{DecodedValue, PvdDecoder, StructureDesc};
26use spvirit_codec::spvd_encode::encode_pv_request;
27use spvirit_codec::spvirit_encode::{
28 encode_control_message, encode_get_field_request, encode_monitor_request, encode_put_request,
29};
30
31use crate::client::{ChannelConn, ensure_status_ok, establish_channel, pvget as low_level_pvget};
32use crate::put_encode::encode_put_payload;
33use crate::search::resolve_pv_server;
34use crate::transport::{read_packet, read_until};
35use crate::types::{PvGetError, PvGetResult, PvOptions};
36
/// Protocol version byte passed as the version argument when this module's
/// client methods encode requests.
const PVA_VERSION: u8 = 2;
/// Byte sent with the first (INIT) request of a PUT or MONITOR operation; the
/// matching replies are recognised by testing the same `0x08` bit.
const QOS_INIT: u8 = 0x08;
41
/// Process-wide counter from which operation ids (ioids) are drawn; starts at 1.
static NEXT_IOID: AtomicU32 = AtomicU32::new(1);

/// Hands out the current ioid and advances the shared counter by one.
///
/// `fetch_add` wraps on overflow, so an id can only repeat after 2^32
/// allocations. `Relaxed` ordering suffices because only the atomicity of the
/// increment matters; no other memory is published through this counter.
fn alloc_ioid() -> u32 {
    const STEP: u32 = 1;
    NEXT_IOID.fetch_add(STEP, Ordering::Relaxed)
}
46
/// Step-by-step configuration for a [`PvaClient`].
///
/// Obtained from [`PvaClient::builder`]; every setter consumes and returns the
/// builder, and [`PvaClientBuilder::build`] produces the client.
///
/// Derives `Clone` and `Debug` so a partially configured builder can be reused
/// as a template and inspected in logs, matching what [`PvaClient`] offers.
#[derive(Clone, Debug)]
pub struct PvaClientBuilder {
    /// UDP port copied into the per-call options (default 5076).
    udp_port: u16,
    /// TCP port copied into the per-call options (default 5075).
    tcp_port: u16,
    /// Timeout applied to the client's network waits (default 5 s).
    timeout: Duration,
    /// Set by [`PvaClientBuilder::no_broadcast`]; forwarded to the options as-is.
    no_broadcast: bool,
    /// Name-server addresses, in the order they were added.
    name_servers: Vec<SocketAddr>,
    /// Optional user name forwarded to the options.
    authnz_user: Option<String>,
    /// Optional host name forwarded to the options.
    authnz_host: Option<String>,
    /// Optional explicit server address forwarded to the options.
    server_addr: Option<SocketAddr>,
    /// Optional search address forwarded to the options.
    search_addr: Option<std::net::IpAddr>,
    /// Optional bind address forwarded to the options.
    bind_addr: Option<std::net::IpAddr>,
    /// Debug flag forwarded to the options.
    debug: bool,
}
70
71impl PvaClientBuilder {
72 fn new() -> Self {
73 Self {
74 udp_port: 5076,
75 tcp_port: 5075,
76 timeout: Duration::from_secs(5),
77 no_broadcast: false,
78 name_servers: Vec::new(),
79 authnz_user: None,
80 authnz_host: None,
81 server_addr: None,
82 search_addr: None,
83 bind_addr: None,
84 debug: false,
85 }
86 }
87
88 pub fn port(mut self, port: u16) -> Self {
90 self.tcp_port = port;
91 self
92 }
93
94 pub fn udp_port(mut self, port: u16) -> Self {
96 self.udp_port = port;
97 self
98 }
99
100 pub fn timeout(mut self, timeout: Duration) -> Self {
102 self.timeout = timeout;
103 self
104 }
105
106 pub fn no_broadcast(mut self) -> Self {
108 self.no_broadcast = true;
109 self
110 }
111
112 pub fn name_server(mut self, addr: SocketAddr) -> Self {
114 self.name_servers.push(addr);
115 self
116 }
117
118 pub fn authnz_user(mut self, user: impl Into<String>) -> Self {
120 self.authnz_user = Some(user.into());
121 self
122 }
123
124 pub fn authnz_host(mut self, host: impl Into<String>) -> Self {
126 self.authnz_host = Some(host.into());
127 self
128 }
129
130 pub fn server_addr(mut self, addr: SocketAddr) -> Self {
132 self.server_addr = Some(addr);
133 self
134 }
135
136 pub fn search_addr(mut self, addr: std::net::IpAddr) -> Self {
138 self.search_addr = Some(addr);
139 self
140 }
141
142 pub fn bind_addr(mut self, addr: std::net::IpAddr) -> Self {
144 self.bind_addr = Some(addr);
145 self
146 }
147
148 pub fn debug(mut self) -> Self {
150 self.debug = true;
151 self
152 }
153
154 pub fn build(self) -> PvaClient {
156 PvaClient {
157 udp_port: self.udp_port,
158 tcp_port: self.tcp_port,
159 timeout: self.timeout,
160 no_broadcast: self.no_broadcast,
161 name_servers: self.name_servers,
162 authnz_user: self.authnz_user,
163 authnz_host: self.authnz_host,
164 server_addr: self.server_addr,
165 search_addr: self.search_addr,
166 bind_addr: self.bind_addr,
167 debug: self.debug,
168 }
169 }
170}
171
/// Reusable client configuration for PV operations.
///
/// Built through [`PvaClient::builder`]. The struct holds settings only; each
/// operation copies them into a fresh `PvOptions` before doing any I/O.
#[derive(Clone, Debug)]
pub struct PvaClient {
    /// UDP port copied into the per-call options.
    udp_port: u16,
    /// TCP port copied into the per-call options.
    tcp_port: u16,
    /// Timeout copied into the options and used for this client's own reads.
    timeout: Duration,
    /// Forwarded to the options unchanged.
    no_broadcast: bool,
    /// Name-server addresses forwarded to the options.
    name_servers: Vec<SocketAddr>,
    /// Optional user name forwarded to the options.
    authnz_user: Option<String>,
    /// Optional host name forwarded to the options.
    authnz_host: Option<String>,
    /// Optional explicit server address forwarded to the options.
    server_addr: Option<SocketAddr>,
    /// Optional search address forwarded to the options.
    search_addr: Option<std::net::IpAddr>,
    /// Optional bind address forwarded to the options.
    bind_addr: Option<std::net::IpAddr>,
    /// Debug flag forwarded to the options.
    debug: bool,
}
197
198impl PvaClient {
199 pub fn builder() -> PvaClientBuilder {
201 PvaClientBuilder::new()
202 }
203
204 fn opts(&self, pv_name: &str) -> PvOptions {
206 let mut o = PvOptions::new(pv_name.to_string());
207 o.udp_port = self.udp_port;
208 o.tcp_port = self.tcp_port;
209 o.timeout = self.timeout;
210 o.no_broadcast = self.no_broadcast;
211 o.name_servers.clone_from(&self.name_servers);
212 o.authnz_user.clone_from(&self.authnz_user);
213 o.authnz_host.clone_from(&self.authnz_host);
214 o.server_addr = self.server_addr;
215 o.search_addr = self.search_addr;
216 o.bind_addr = self.bind_addr;
217 o.debug = self.debug;
218 o
219 }
220
221 async fn open_channel(&self, pv_name: &str) -> Result<ChannelConn, PvGetError> {
223 let opts = self.opts(pv_name);
224 let target = resolve_pv_server(&opts).await?;
225 establish_channel(target, &opts).await
226 }
227
228 pub async fn pvget(&self, pv_name: &str) -> Result<PvGetResult, PvGetError> {
232 let opts = self.opts(pv_name);
233 low_level_pvget(&opts).await
234 }
235
236 pub async fn pvget_fields(
238 &self,
239 pv_name: &str,
240 fields: &[&str],
241 ) -> Result<PvGetResult, PvGetError> {
242 let opts = self.opts(pv_name);
243 crate::client::pvget_fields(&opts, fields).await
244 }
245
246 pub async fn pvput(&self, pv_name: &str, value: impl Into<Value>) -> Result<(), PvGetError> {
257 let json_val = value.into();
258 let ChannelConn {
259 mut stream,
260 sid,
261 version: _,
262 is_be,
263 ..
264 } = self.open_channel(pv_name).await?;
265
266 let ioid = alloc_ioid();
267
268 let pv_request = encode_pv_request(&["value"], is_be);
270 let init = encode_put_request(sid, ioid, QOS_INIT, &pv_request, PVA_VERSION, is_be);
271 stream.write_all(&init).await?;
272
273 let init_bytes = read_until(&mut stream, self.timeout, |cmd| {
275 matches!(cmd, PvaPacketCommand::Op(op) if op.command == 11 && (op.subcmd & 0x08) != 0)
276 })
277 .await?;
278
279 let desc = decode_init_introspection(&init_bytes, "PUT")?;
280
281 let payload = encode_put_payload(&desc, &json_val, is_be)
283 .map_err(|e| PvGetError::Protocol(format!("put encode: {e}")))?;
284 let req = encode_put_request(sid, ioid, 0x00, &payload, PVA_VERSION, is_be);
285 stream.write_all(&req).await?;
286
287 let resp_bytes = read_until(
289 &mut stream,
290 self.timeout,
291 |cmd| matches!(cmd, PvaPacketCommand::Op(op) if op.command == 11 && op.subcmd == 0x00),
292 )
293 .await?;
294 ensure_status_ok(&resp_bytes, is_be, "PUT")?;
295
296 Ok(())
297 }
298
299 pub async fn open_put_channel(&self, pv_name: &str) -> Result<PvaChannel, PvGetError> {
307 let ChannelConn {
308 mut stream,
309 sid,
310 version,
311 is_be,
312 ..
313 } = self.open_channel(pv_name).await?;
314
315 let ioid = alloc_ioid();
316
317 let pv_request = encode_pv_request(&["value"], is_be);
319 let init = encode_put_request(sid, ioid, QOS_INIT, &pv_request, PVA_VERSION, is_be);
320 stream.write_all(&init).await?;
321
322 let init_bytes = read_until(&mut stream, self.timeout, |cmd| {
323 matches!(cmd, PvaPacketCommand::Op(op) if op.command == 11 && (op.subcmd & 0x08) != 0)
324 })
325 .await?;
326
327 let desc = decode_init_introspection(&init_bytes, "PUT")?;
328
329 let (mut reader, writer) = stream.into_split();
331 let reader_is_be = is_be;
332 let reader_handle = tokio::spawn(async move {
333 loop {
334 let mut header = [0u8; 8];
335 if reader.read_exact(&mut header).await.is_err() {
336 break;
337 }
338 let hdr = spvirit_codec::epics_decode::PvaHeader::new(&header);
339 let len = if hdr.flags.is_control {
340 0usize
341 } else {
342 hdr.payload_length as usize
343 };
344 let mut payload = vec![0u8; len];
345 if len > 0 && reader.read_exact(&mut payload).await.is_err() {
346 break;
347 }
348 if hdr.command == 11 && !hdr.flags.is_control && len >= 5 {
349 if let Some(st) =
350 spvirit_codec::epics_decode::decode_status(&payload[5..], reader_is_be).0
351 {
352 if st.code != 0 {
353 let msg = st.message.unwrap_or_else(|| format!("code={}", st.code));
354 eprintln!("PvaChannel put error: {msg}");
355 }
356 }
357 }
358 }
359 });
360
361 Ok(PvaChannel {
362 writer,
363 sid,
364 ioid,
365 version,
366 is_be,
367 put_desc: desc,
368 echo_token: 1,
369 last_echo: Instant::now(),
370 _reader_handle: reader_handle,
371 })
372 }
373
374 pub async fn pvmonitor<F>(&self, pv_name: &str, mut callback: F) -> Result<(), PvGetError>
390 where
391 F: FnMut(&DecodedValue) -> ControlFlow<()>,
392 {
393 let ChannelConn {
394 mut stream,
395 sid,
396 version: _,
397 is_be,
398 ..
399 } = self.open_channel(pv_name).await?;
400
401 let ioid = alloc_ioid();
402 let decoder = PvdDecoder::new(is_be);
403
404 let pv_request = encode_pv_request(&["value", "alarm", "timeStamp"], is_be);
406 let init = encode_monitor_request(sid, ioid, QOS_INIT, &pv_request, PVA_VERSION, is_be);
407 stream.write_all(&init).await?;
408
409 let init_bytes = read_until(&mut stream, self.timeout, |cmd| {
411 matches!(cmd, PvaPacketCommand::Op(op) if op.command == 13 && (op.subcmd & 0x08) != 0)
412 })
413 .await?;
414
415 let field_desc = decode_init_introspection(&init_bytes, "MONITOR")?;
416
417 let start = encode_monitor_request(sid, ioid, 0x44, &[], PVA_VERSION, is_be);
419 stream.write_all(&start).await?;
420
421 let mut echo_interval = interval(Duration::from_secs(10));
423 let mut echo_token: u32 = 1;
424
425 loop {
426 tokio::select! {
427 _ = echo_interval.tick() => {
428 let msg = encode_control_message(false, is_be, PVA_VERSION, 3, echo_token);
429 echo_token = echo_token.wrapping_add(1);
430 let _ = stream.write_all(&msg).await;
431 }
432 res = read_packet(&mut stream, self.timeout) => {
433 let bytes = match res {
434 Ok(b) => b,
435 Err(PvGetError::Timeout(_)) => continue,
436 Err(e) => return Err(e),
437 };
438 let mut pkt = PvaPacket::new(&bytes);
439 if let Some(PvaPacketCommand::Op(op)) = pkt.decode_payload() {
440 if op.command == 13 && op.ioid == ioid && op.subcmd == 0x00 {
441 let payload = &bytes[8..]; let pos = 5; if let Some((decoded, _)) =
444 decoder.decode_structure_with_bitset(&payload[pos..], &field_desc)
445 {
446 if callback(&decoded).is_break() {
447 return Ok(());
448 }
449 }
450 }
451 }
452 }
453 }
454 }
455 }
456
457 pub async fn pvinfo(&self, pv_name: &str) -> Result<StructureDesc, PvGetError> {
461 let result = self.pvinfo_full(pv_name).await?;
462 Ok(result.0)
463 }
464
465 pub async fn pvinfo_full(
467 &self,
468 pv_name: &str,
469 ) -> Result<(StructureDesc, SocketAddr), PvGetError> {
470 let ChannelConn {
471 mut stream,
472 sid,
473 version: _,
474 is_be,
475 server_addr,
476 } = self.open_channel(pv_name).await?;
477
478 let ioid = alloc_ioid();
479 let msg = encode_get_field_request(sid, ioid, None, PVA_VERSION, is_be);
480 stream.write_all(&msg).await?;
481
482 let resp_bytes = read_until(
483 &mut stream,
484 self.timeout,
485 |cmd| matches!(cmd, PvaPacketCommand::GetField(_)),
486 )
487 .await?;
488
489 let mut pkt = PvaPacket::new(&resp_bytes);
490 let cmd = pkt
491 .decode_payload()
492 .ok_or_else(|| PvGetError::Decode("GET_FIELD response decode failed".to_string()))?;
493 match cmd {
494 PvaPacketCommand::GetField(payload) => {
495 if let Some(ref st) = payload.status {
496 if st.is_error() {
497 let msg = st
498 .message
499 .clone()
500 .unwrap_or_else(|| format!("code={}", st.code));
501 return Err(PvGetError::Protocol(format!("GET_FIELD error: {msg}")));
502 }
503 }
504 let desc = payload
505 .introspection
506 .ok_or_else(|| PvGetError::Decode("missing GET_FIELD introspection".to_string()))?;
507 Ok((desc, server_addr))
508 }
509 _ => Err(PvGetError::Protocol(
510 "unexpected GET_FIELD response".to_string(),
511 )),
512 }
513 }
514
515 pub async fn pvlist(&self, server_addr: SocketAddr) -> Result<Vec<String>, PvGetError> {
519 let opts = self.opts("__pvlist");
520 crate::pvlist::pvlist(&opts, server_addr).await
521 }
522
523 pub async fn pvlist_with_fallback(
527 &self,
528 server_addr: SocketAddr,
529 ) -> Result<(Vec<String>, crate::pvlist::PvListSource), PvGetError> {
530 let opts = self.opts("__pvlist");
531 crate::pvlist::pvlist_with_fallback(&opts, server_addr).await
532 }
533}
534
535pub struct PvaChannel {
553 writer: OwnedWriteHalf,
554 sid: u32,
555 ioid: u32,
556 version: u8,
557 is_be: bool,
558 put_desc: StructureDesc,
559 echo_token: u32,
560 last_echo: Instant,
561 _reader_handle: JoinHandle<()>,
562}
563
564impl PvaChannel {
565 pub async fn put(&mut self, value: impl Into<Value>) -> Result<(), PvGetError> {
570 if self.last_echo.elapsed() >= Duration::from_secs(10) {
572 let msg = encode_control_message(false, self.is_be, self.version, 3, self.echo_token);
573 self.echo_token = self.echo_token.wrapping_add(1);
574 let _ = self.writer.write_all(&msg).await;
575 self.last_echo = Instant::now();
576 }
577
578 let json_val = value.into();
579 let payload = encode_put_payload(&self.put_desc, &json_val, self.is_be)
580 .map_err(|e| PvGetError::Protocol(format!("put encode: {e}")))?;
581 let req = encode_put_request(
582 self.sid,
583 self.ioid,
584 0x00,
585 &payload,
586 self.version,
587 self.is_be,
588 );
589 self.writer.write_all(&req).await?;
590 Ok(())
591 }
592
593 pub fn introspection(&self) -> &StructureDesc {
595 &self.put_desc
596 }
597}
598
599impl Drop for PvaChannel {
600 fn drop(&mut self) {
601 self._reader_handle.abort();
602 }
603}
604
605pub async fn pvput(opts: &PvOptions, value: impl Into<Value>) -> Result<(), PvGetError> {
615 let client = client_from_opts(opts);
616 client.pvput(&opts.pv_name, value).await
617}
618
619pub async fn pvmonitor<F>(opts: &PvOptions, callback: F) -> Result<(), PvGetError>
624where
625 F: FnMut(&DecodedValue) -> ControlFlow<()>,
626{
627 let client = client_from_opts(opts);
628 client.pvmonitor(&opts.pv_name, callback).await
629}
630
631pub async fn pvinfo(opts: &PvOptions) -> Result<StructureDesc, PvGetError> {
633 let client = client_from_opts(opts);
634 client.pvinfo(&opts.pv_name).await
635}
636
637pub fn client_from_opts(opts: &PvOptions) -> PvaClient {
641 let mut b = PvaClient::builder()
642 .port(opts.tcp_port)
643 .udp_port(opts.udp_port)
644 .timeout(opts.timeout);
645 if opts.no_broadcast {
646 b = b.no_broadcast();
647 }
648 for ns in &opts.name_servers {
649 b = b.name_server(*ns);
650 }
651 if let Some(ref u) = opts.authnz_user {
652 b = b.authnz_user(u.clone());
653 }
654 if let Some(ref h) = opts.authnz_host {
655 b = b.authnz_host(h.clone());
656 }
657 if let Some(addr) = opts.server_addr {
658 b = b.server_addr(addr);
659 }
660 if let Some(addr) = opts.search_addr {
661 b = b.search_addr(addr);
662 }
663 if let Some(addr) = opts.bind_addr {
664 b = b.bind_addr(addr);
665 }
666 if opts.debug {
667 b = b.debug();
668 }
669 b.build()
670}
671
672fn decode_init_introspection(raw: &[u8], label: &str) -> Result<StructureDesc, PvGetError> {
674 let mut pkt = PvaPacket::new(raw);
675 let cmd = pkt
676 .decode_payload()
677 .ok_or_else(|| PvGetError::Decode(format!("{label} init response decode failed")))?;
678
679 match cmd {
680 PvaPacketCommand::Op(op) => {
681 if let Some(ref st) = op.status {
682 if st.is_error() {
683 let msg = st
684 .message
685 .clone()
686 .unwrap_or_else(|| format!("code={}", st.code));
687 return Err(PvGetError::Protocol(format!("{label} init error: {msg}")));
688 }
689 }
690 op.introspection
691 .ok_or_else(|| PvGetError::Decode(format!("missing {label} introspection")))
692 }
693 _ => Err(PvGetError::Protocol(format!(
694 "unexpected {label} init response"
695 ))),
696 }
697}
698
#[cfg(test)]
mod tests {
    use super::*;

    /// A client built without overrides carries the builder defaults.
    #[test]
    fn builder_defaults() {
        let client = PvaClient::builder().build();
        let expected_timeout = Duration::from_secs(5);
        assert_eq!(client.tcp_port, 5075);
        assert_eq!(client.udp_port, 5076);
        assert_eq!(client.timeout, expected_timeout);
        assert!(!client.no_broadcast);
        assert!(client.name_servers.is_empty());
    }

    /// Every setter used here must be reflected in the built client.
    #[test]
    fn builder_overrides() {
        let name_server: SocketAddr = "127.0.0.1:5075".parse().unwrap();
        let builder = PvaClient::builder()
            .port(9075)
            .udp_port(9076)
            .timeout(Duration::from_secs(10))
            .no_broadcast()
            .name_server(name_server)
            .authnz_user("testuser")
            .authnz_host("testhost");
        let client = builder.build();

        assert_eq!(client.tcp_port, 9075);
        assert_eq!(client.udp_port, 9076);
        assert_eq!(client.timeout, Duration::from_secs(10));
        assert!(client.no_broadcast);
        assert_eq!(client.name_servers.len(), 1);
        assert_eq!(client.authnz_user.as_deref(), Some("testuser"));
        assert_eq!(client.authnz_host.as_deref(), Some("testhost"));
    }

    /// `opts` must copy the client configuration and use the given PV name.
    #[test]
    fn opts_inherits_client_config() {
        let client = PvaClient::builder()
            .port(9075)
            .udp_port(9076)
            .timeout(Duration::from_secs(10))
            .no_broadcast()
            .build();

        let options = client.opts("TEST:PV");

        assert_eq!(options.pv_name, "TEST:PV");
        assert_eq!(options.tcp_port, 9075);
        assert_eq!(options.udp_port, 9076);
        assert_eq!(options.timeout, Duration::from_secs(10));
        assert!(options.no_broadcast);
    }

    /// Settings placed in `PvOptions` must survive `client_from_opts`.
    #[test]
    fn client_from_opts_roundtrip() {
        let mut source = PvOptions::new("X:Y".into());
        source.tcp_port = 8075;
        source.udp_port = 8076;
        source.timeout = Duration::from_secs(3);
        source.no_broadcast = true;

        let client = client_from_opts(&source);

        assert_eq!(client.tcp_port, 8075);
        assert_eq!(client.udp_port, 8076);
        assert!(client.no_broadcast);
    }

    /// `PvGetOptions` must remain usable as another name for `PvOptions`.
    #[test]
    fn pv_get_options_alias_works() {
        let aliased: crate::types::PvGetOptions = PvOptions::new("ALIAS:TEST".into());
        assert_eq!(aliased.pv_name, "ALIAS:TEST");
    }
}