1use std::net::SocketAddr;
14use std::ops::ControlFlow;
15use std::sync::atomic::{AtomicU32, Ordering};
16use std::time::Duration;
17
18use serde_json::Value;
19use tokio::io::{AsyncReadExt, AsyncWriteExt};
20use tokio::net::tcp::OwnedWriteHalf;
21use tokio::task::JoinHandle;
22use tokio::time::{Instant, interval};
23
24use spvirit_codec::epics_decode::{PvaPacket, PvaPacketCommand};
25use spvirit_codec::spvd_decode::{DecodedValue, PvdDecoder, StructureDesc};
26use spvirit_codec::spvd_encode::encode_pv_request;
27use spvirit_codec::spvirit_encode::{
28 encode_control_message, encode_get_field_request, encode_monitor_request, encode_put_request,
29};
30
31use crate::client::{ChannelConn, ensure_status_ok, establish_channel, pvget as low_level_pvget};
32use crate::put_encode::encode_put_payload;
33use crate::search::resolve_pv_server;
34use crate::transport::{read_packet, read_until};
35use crate::types::{PvGetError, PvGetResult, PvOptions};
36
/// Protocol version value handed to the request and control-message encoders
/// used by [`PvaClient`].
const PVA_VERSION: u8 = 2;
/// Subcommand/QoS value sent with the INIT phase of PUT and MONITOR requests.
/// INIT responses are recognised in this file by testing the same bit
/// (`subcmd & 0x08`).
const QOS_INIT: u8 = 0x08;
41
/// Process-wide counter backing [`alloc_ioid`]; the first id handed out is 1.
static NEXT_IOID: AtomicU32 = AtomicU32::new(1);

/// Returns the next I/O operation id.
///
/// Each call yields the previous counter value and advances the counter by
/// one. The counter is a wrapping 32-bit atomic, so ids repeat only after
/// `u32::MAX` allocations.
fn alloc_ioid() -> u32 {
    // Only uniqueness matters here, not ordering relative to other memory.
    let increment = 1;
    NEXT_IOID.fetch_add(increment, Ordering::Relaxed)
}
46
/// Builder for [`PvaClient`].
///
/// Obtained from `PvaClient::builder()`; each setter consumes and returns the
/// builder, and `build()` produces the configured client.
///
/// Derives `Clone` and `Debug` so the builder offers the same common traits
/// as the client it produces.
#[derive(Clone, Debug)]
pub struct PvaClientBuilder {
    /// UDP port (presumably used for PV search — confirm against `crate::search`).
    udp_port: u16,
    /// TCP port (presumably the server's connection port — confirm against `crate::client`).
    tcp_port: u16,
    /// Timeout applied to network operations performed by the client.
    timeout: Duration,
    /// When `true`, the `no_broadcast` option is set on every request.
    no_broadcast: bool,
    /// Name-server addresses accumulated through `name_server()`.
    name_servers: Vec<SocketAddr>,
    /// Optional user name forwarded as the `authnz_user` option.
    authnz_user: Option<String>,
    /// Optional host name forwarded as the `authnz_host` option.
    authnz_host: Option<String>,
    /// Optional explicit server address forwarded as the `server_addr` option.
    server_addr: Option<SocketAddr>,
    /// Optional address forwarded as the `search_addr` option.
    search_addr: Option<std::net::IpAddr>,
    /// Optional local address forwarded as the `bind_addr` option.
    bind_addr: Option<std::net::IpAddr>,
    /// When `true`, the `debug` option is set on every request.
    debug: bool,
}
70
71impl PvaClientBuilder {
72 fn new() -> Self {
73 Self {
74 udp_port: 5076,
75 tcp_port: 5075,
76 timeout: Duration::from_secs(5),
77 no_broadcast: false,
78 name_servers: Vec::new(),
79 authnz_user: None,
80 authnz_host: None,
81 server_addr: None,
82 search_addr: None,
83 bind_addr: None,
84 debug: false,
85 }
86 }
87
88 pub fn port(mut self, port: u16) -> Self {
90 self.tcp_port = port;
91 self
92 }
93
94 pub fn udp_port(mut self, port: u16) -> Self {
96 self.udp_port = port;
97 self
98 }
99
100 pub fn timeout(mut self, timeout: Duration) -> Self {
102 self.timeout = timeout;
103 self
104 }
105
106 pub fn no_broadcast(mut self) -> Self {
108 self.no_broadcast = true;
109 self
110 }
111
112 pub fn name_server(mut self, addr: SocketAddr) -> Self {
114 self.name_servers.push(addr);
115 self
116 }
117
118 pub fn authnz_user(mut self, user: impl Into<String>) -> Self {
120 self.authnz_user = Some(user.into());
121 self
122 }
123
124 pub fn authnz_host(mut self, host: impl Into<String>) -> Self {
126 self.authnz_host = Some(host.into());
127 self
128 }
129
130 pub fn server_addr(mut self, addr: SocketAddr) -> Self {
132 self.server_addr = Some(addr);
133 self
134 }
135
136 pub fn search_addr(mut self, addr: std::net::IpAddr) -> Self {
138 self.search_addr = Some(addr);
139 self
140 }
141
142 pub fn bind_addr(mut self, addr: std::net::IpAddr) -> Self {
144 self.bind_addr = Some(addr);
145 self
146 }
147
148 pub fn debug(mut self) -> Self {
150 self.debug = true;
151 self
152 }
153
154 pub fn build(self) -> PvaClient {
156 PvaClient {
157 udp_port: self.udp_port,
158 tcp_port: self.tcp_port,
159 timeout: self.timeout,
160 no_broadcast: self.no_broadcast,
161 name_servers: self.name_servers,
162 authnz_user: self.authnz_user,
163 authnz_host: self.authnz_host,
164 server_addr: self.server_addr,
165 search_addr: self.search_addr,
166 bind_addr: self.bind_addr,
167 debug: self.debug,
168 }
169 }
170}
171
/// Client configuration shared by all high-level operations in this file
/// (`pvget`, `pvput`, `pvmonitor`, `pvinfo`, `pvlist`).
///
/// The struct holds settings only; each operation converts them into a
/// `PvOptions` and opens its own channel.
#[derive(Clone, Debug)]
pub struct PvaClient {
    /// UDP port copied into `PvOptions::udp_port`.
    udp_port: u16,
    /// TCP port copied into `PvOptions::tcp_port`.
    tcp_port: u16,
    /// Timeout copied into `PvOptions::timeout` and used for response reads.
    timeout: Duration,
    /// Copied into `PvOptions::no_broadcast`.
    no_broadcast: bool,
    /// Name-server addresses copied into `PvOptions::name_servers`.
    name_servers: Vec<SocketAddr>,
    /// Copied into `PvOptions::authnz_user`.
    authnz_user: Option<String>,
    /// Copied into `PvOptions::authnz_host`.
    authnz_host: Option<String>,
    /// Copied into `PvOptions::server_addr`.
    server_addr: Option<SocketAddr>,
    /// Copied into `PvOptions::search_addr`.
    search_addr: Option<std::net::IpAddr>,
    /// Copied into `PvOptions::bind_addr`.
    bind_addr: Option<std::net::IpAddr>,
    /// Copied into `PvOptions::debug`.
    debug: bool,
}
197
198impl PvaClient {
199 pub fn builder() -> PvaClientBuilder {
201 PvaClientBuilder::new()
202 }
203
204 fn opts(&self, pv_name: &str) -> PvOptions {
206 let mut o = PvOptions::new(pv_name.to_string());
207 o.udp_port = self.udp_port;
208 o.tcp_port = self.tcp_port;
209 o.timeout = self.timeout;
210 o.no_broadcast = self.no_broadcast;
211 o.name_servers.clone_from(&self.name_servers);
212 o.authnz_user.clone_from(&self.authnz_user);
213 o.authnz_host.clone_from(&self.authnz_host);
214 o.server_addr = self.server_addr;
215 o.search_addr = self.search_addr;
216 o.bind_addr = self.bind_addr;
217 o.debug = self.debug;
218 o
219 }
220
221 async fn open_channel(&self, pv_name: &str) -> Result<ChannelConn, PvGetError> {
223 let opts = self.opts(pv_name);
224 let target = resolve_pv_server(&opts).await?;
225 establish_channel(target, &opts).await
226 }
227
228 pub async fn pvget(&self, pv_name: &str) -> Result<PvGetResult, PvGetError> {
232 let opts = self.opts(pv_name);
233 low_level_pvget(&opts).await
234 }
235
236 pub async fn pvput(&self, pv_name: &str, value: impl Into<Value>) -> Result<(), PvGetError> {
247 let json_val = value.into();
248 let ChannelConn {
249 mut stream,
250 sid,
251 version: _,
252 is_be,
253 } = self.open_channel(pv_name).await?;
254
255 let ioid = alloc_ioid();
256
257 let pv_request = encode_pv_request(&["value"], is_be);
259 let init = encode_put_request(sid, ioid, QOS_INIT, &pv_request, PVA_VERSION, is_be);
260 stream.write_all(&init).await?;
261
262 let init_bytes = read_until(&mut stream, self.timeout, |cmd| {
264 matches!(cmd, PvaPacketCommand::Op(op) if op.command == 11 && (op.subcmd & 0x08) != 0)
265 })
266 .await?;
267
268 let desc = decode_init_introspection(&init_bytes, "PUT")?;
269
270 let payload = encode_put_payload(&desc, &json_val, is_be)
272 .map_err(|e| PvGetError::Protocol(format!("put encode: {e}")))?;
273 let req = encode_put_request(sid, ioid, 0x00, &payload, PVA_VERSION, is_be);
274 stream.write_all(&req).await?;
275
276 let resp_bytes = read_until(
278 &mut stream,
279 self.timeout,
280 |cmd| matches!(cmd, PvaPacketCommand::Op(op) if op.command == 11 && op.subcmd == 0x00),
281 )
282 .await?;
283 ensure_status_ok(&resp_bytes, is_be, "PUT")?;
284
285 Ok(())
286 }
287
288 pub async fn open_put_channel(&self, pv_name: &str) -> Result<PvaChannel, PvGetError> {
296 let ChannelConn {
297 mut stream,
298 sid,
299 version,
300 is_be,
301 } = self.open_channel(pv_name).await?;
302
303 let ioid = alloc_ioid();
304
305 let pv_request = encode_pv_request(&["value"], is_be);
307 let init = encode_put_request(sid, ioid, QOS_INIT, &pv_request, PVA_VERSION, is_be);
308 stream.write_all(&init).await?;
309
310 let init_bytes = read_until(&mut stream, self.timeout, |cmd| {
311 matches!(cmd, PvaPacketCommand::Op(op) if op.command == 11 && (op.subcmd & 0x08) != 0)
312 })
313 .await?;
314
315 let desc = decode_init_introspection(&init_bytes, "PUT")?;
316
317 let (mut reader, writer) = stream.into_split();
319 let reader_is_be = is_be;
320 let reader_handle = tokio::spawn(async move {
321 loop {
322 let mut header = [0u8; 8];
323 if reader.read_exact(&mut header).await.is_err() {
324 break;
325 }
326 let hdr = spvirit_codec::epics_decode::PvaHeader::new(&header);
327 let len = if hdr.flags.is_control {
328 0usize
329 } else {
330 hdr.payload_length as usize
331 };
332 let mut payload = vec![0u8; len];
333 if len > 0 && reader.read_exact(&mut payload).await.is_err() {
334 break;
335 }
336 if hdr.command == 11 && !hdr.flags.is_control && len >= 5 {
337 if let Some(st) =
338 spvirit_codec::epics_decode::decode_status(&payload[5..], reader_is_be).0
339 {
340 if st.code != 0 {
341 let msg = st.message.unwrap_or_else(|| format!("code={}", st.code));
342 eprintln!("PvaChannel put error: {msg}");
343 }
344 }
345 }
346 }
347 });
348
349 Ok(PvaChannel {
350 writer,
351 sid,
352 ioid,
353 version,
354 is_be,
355 put_desc: desc,
356 echo_token: 1,
357 last_echo: Instant::now(),
358 _reader_handle: reader_handle,
359 })
360 }
361
362 pub async fn pvmonitor<F>(&self, pv_name: &str, mut callback: F) -> Result<(), PvGetError>
378 where
379 F: FnMut(&DecodedValue) -> ControlFlow<()>,
380 {
381 let ChannelConn {
382 mut stream,
383 sid,
384 version: _,
385 is_be,
386 } = self.open_channel(pv_name).await?;
387
388 let ioid = alloc_ioid();
389 let decoder = PvdDecoder::new(is_be);
390
391 let pv_request = encode_pv_request(&["value", "alarm", "timeStamp"], is_be);
393 let init = encode_monitor_request(sid, ioid, QOS_INIT, &pv_request, PVA_VERSION, is_be);
394 stream.write_all(&init).await?;
395
396 let init_bytes = read_until(&mut stream, self.timeout, |cmd| {
398 matches!(cmd, PvaPacketCommand::Op(op) if op.command == 13 && (op.subcmd & 0x08) != 0)
399 })
400 .await?;
401
402 let field_desc = decode_init_introspection(&init_bytes, "MONITOR")?;
403
404 let start = encode_monitor_request(sid, ioid, 0x44, &[], PVA_VERSION, is_be);
406 stream.write_all(&start).await?;
407
408 let mut echo_interval = interval(Duration::from_secs(10));
410 let mut echo_token: u32 = 1;
411
412 loop {
413 tokio::select! {
414 _ = echo_interval.tick() => {
415 let msg = encode_control_message(false, is_be, PVA_VERSION, 3, echo_token);
416 echo_token = echo_token.wrapping_add(1);
417 let _ = stream.write_all(&msg).await;
418 }
419 res = read_packet(&mut stream, self.timeout) => {
420 let bytes = match res {
421 Ok(b) => b,
422 Err(PvGetError::Timeout(_)) => continue,
423 Err(e) => return Err(e),
424 };
425 let mut pkt = PvaPacket::new(&bytes);
426 if let Some(PvaPacketCommand::Op(op)) = pkt.decode_payload() {
427 if op.command == 13 && op.ioid == ioid && op.subcmd == 0x00 {
428 let payload = &bytes[8..]; let pos = 5; if let Some((decoded, _)) =
431 decoder.decode_structure_with_bitset(&payload[pos..], &field_desc)
432 {
433 if callback(&decoded).is_break() {
434 return Ok(());
435 }
436 }
437 }
438 }
439 }
440 }
441 }
442 }
443
444 pub async fn pvinfo(&self, pv_name: &str) -> Result<StructureDesc, PvGetError> {
448 let ChannelConn {
449 mut stream,
450 sid,
451 version: _,
452 is_be,
453 } = self.open_channel(pv_name).await?;
454
455 let ioid = alloc_ioid();
456 let msg = encode_get_field_request(sid, ioid, None, PVA_VERSION, is_be);
457 stream.write_all(&msg).await?;
458
459 let resp_bytes = read_until(
460 &mut stream,
461 self.timeout,
462 |cmd| matches!(cmd, PvaPacketCommand::Op(op) if op.command == 17),
463 )
464 .await?;
465
466 decode_init_introspection(&resp_bytes, "GET_FIELD")
467 }
468
469 pub async fn pvlist(&self, server_addr: SocketAddr) -> Result<Vec<String>, PvGetError> {
473 let opts = self.opts("__pvlist");
474 crate::pvlist::pvlist(&opts, server_addr).await
475 }
476
477 pub async fn pvlist_with_fallback(
481 &self,
482 server_addr: SocketAddr,
483 ) -> Result<(Vec<String>, crate::pvlist::PvListSource), PvGetError> {
484 let opts = self.opts("__pvlist");
485 crate::pvlist::pvlist_with_fallback(&opts, server_addr).await
486 }
487}
488
/// A channel prepared for repeated PUT operations, created by
/// `PvaClient::open_put_channel`.
///
/// Owns the write half of the connection; the read half lives in a background
/// task that is aborted when the channel is dropped.
pub struct PvaChannel {
    /// Write half of the split connection.
    writer: OwnedWriteHalf,
    /// Server channel id passed to every PUT request.
    sid: u32,
    /// I/O operation id allocated for this channel's PUT operation.
    ioid: u32,
    /// Protocol version reported when the channel was established.
    version: u8,
    /// Byte-order flag handed to the encoders (big-endian when `true`, judging by the name).
    is_be: bool,
    /// Field layout returned by the PUT INIT exchange; used to encode values.
    put_desc: StructureDesc,
    /// Token placed in the next echo control message; incremented with wrap-around.
    echo_token: u32,
    /// Time the last echo was sent (or the channel was created).
    last_echo: Instant,
    /// Handle of the background reader task; aborted in `Drop`.
    _reader_handle: JoinHandle<()>,
}
517
518impl PvaChannel {
519 pub async fn put(&mut self, value: impl Into<Value>) -> Result<(), PvGetError> {
524 if self.last_echo.elapsed() >= Duration::from_secs(10) {
526 let msg = encode_control_message(false, self.is_be, self.version, 3, self.echo_token);
527 self.echo_token = self.echo_token.wrapping_add(1);
528 let _ = self.writer.write_all(&msg).await;
529 self.last_echo = Instant::now();
530 }
531
532 let json_val = value.into();
533 let payload = encode_put_payload(&self.put_desc, &json_val, self.is_be)
534 .map_err(|e| PvGetError::Protocol(format!("put encode: {e}")))?;
535 let req = encode_put_request(
536 self.sid,
537 self.ioid,
538 0x00,
539 &payload,
540 self.version,
541 self.is_be,
542 );
543 self.writer.write_all(&req).await?;
544 Ok(())
545 }
546
547 pub fn introspection(&self) -> &StructureDesc {
549 &self.put_desc
550 }
551}
552
553impl Drop for PvaChannel {
554 fn drop(&mut self) {
555 self._reader_handle.abort();
556 }
557}
558
559pub async fn pvput(opts: &PvOptions, value: impl Into<Value>) -> Result<(), PvGetError> {
569 let client = client_from_opts(opts);
570 client.pvput(&opts.pv_name, value).await
571}
572
573pub async fn pvmonitor<F>(opts: &PvOptions, callback: F) -> Result<(), PvGetError>
578where
579 F: FnMut(&DecodedValue) -> ControlFlow<()>,
580{
581 let client = client_from_opts(opts);
582 client.pvmonitor(&opts.pv_name, callback).await
583}
584
585pub async fn pvinfo(opts: &PvOptions) -> Result<StructureDesc, PvGetError> {
587 let client = client_from_opts(opts);
588 client.pvinfo(&opts.pv_name).await
589}
590
591pub fn client_from_opts(opts: &PvOptions) -> PvaClient {
595 let mut b = PvaClient::builder()
596 .port(opts.tcp_port)
597 .udp_port(opts.udp_port)
598 .timeout(opts.timeout);
599 if opts.no_broadcast {
600 b = b.no_broadcast();
601 }
602 for ns in &opts.name_servers {
603 b = b.name_server(*ns);
604 }
605 if let Some(ref u) = opts.authnz_user {
606 b = b.authnz_user(u.clone());
607 }
608 if let Some(ref h) = opts.authnz_host {
609 b = b.authnz_host(h.clone());
610 }
611 if let Some(addr) = opts.server_addr {
612 b = b.server_addr(addr);
613 }
614 if let Some(addr) = opts.search_addr {
615 b = b.search_addr(addr);
616 }
617 if let Some(addr) = opts.bind_addr {
618 b = b.bind_addr(addr);
619 }
620 if opts.debug {
621 b = b.debug();
622 }
623 b.build()
624}
625
626fn decode_init_introspection(raw: &[u8], label: &str) -> Result<StructureDesc, PvGetError> {
628 let mut pkt = PvaPacket::new(raw);
629 let cmd = pkt
630 .decode_payload()
631 .ok_or_else(|| PvGetError::Decode(format!("{label} init response decode failed")))?;
632
633 match cmd {
634 PvaPacketCommand::Op(op) => {
635 if let Some(ref st) = op.status {
636 if st.is_error() {
637 let msg = st
638 .message
639 .clone()
640 .unwrap_or_else(|| format!("code={}", st.code));
641 return Err(PvGetError::Protocol(format!("{label} init error: {msg}")));
642 }
643 }
644 op.introspection
645 .ok_or_else(|| PvGetError::Decode(format!("missing {label} introspection")))
646 }
647 _ => Err(PvGetError::Protocol(format!(
648 "unexpected {label} init response"
649 ))),
650 }
651}
652
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built client carries the documented defaults, with every
    /// optional setting unset.
    #[test]
    fn builder_defaults() {
        let c = PvaClient::builder().build();
        assert_eq!(c.tcp_port, 5075);
        assert_eq!(c.udp_port, 5076);
        assert_eq!(c.timeout, Duration::from_secs(5));
        assert!(!c.no_broadcast);
        assert!(c.name_servers.is_empty());
        assert!(c.authnz_user.is_none());
        assert!(c.authnz_host.is_none());
        assert!(c.server_addr.is_none());
        assert!(c.search_addr.is_none());
        assert!(c.bind_addr.is_none());
        assert!(!c.debug);
    }

    /// Every builder setter is reflected in the built client.
    #[test]
    fn builder_overrides() {
        let server: SocketAddr = "127.0.0.1:5075".parse().unwrap();
        let ip: std::net::IpAddr = "127.0.0.1".parse().unwrap();
        let c = PvaClient::builder()
            .port(9075)
            .udp_port(9076)
            .timeout(Duration::from_secs(10))
            .no_broadcast()
            .name_server(server)
            .authnz_user("testuser")
            .authnz_host("testhost")
            .server_addr(server)
            .search_addr(ip)
            .bind_addr(ip)
            .debug()
            .build();
        assert_eq!(c.tcp_port, 9075);
        assert_eq!(c.udp_port, 9076);
        assert_eq!(c.timeout, Duration::from_secs(10));
        assert!(c.no_broadcast);
        assert_eq!(c.name_servers, vec![server]);
        assert_eq!(c.authnz_user.as_deref(), Some("testuser"));
        assert_eq!(c.authnz_host.as_deref(), Some("testhost"));
        assert_eq!(c.server_addr, Some(server));
        assert_eq!(c.search_addr, Some(ip));
        assert_eq!(c.bind_addr, Some(ip));
        assert!(c.debug);
    }

    /// `opts` copies the client configuration into the per-request options.
    #[test]
    fn opts_inherits_client_config() {
        let c = PvaClient::builder()
            .port(9075)
            .udp_port(9076)
            .timeout(Duration::from_secs(10))
            .no_broadcast()
            .authnz_user("testuser")
            .debug()
            .build();
        let o = c.opts("TEST:PV");
        assert_eq!(o.pv_name, "TEST:PV");
        assert_eq!(o.tcp_port, 9075);
        assert_eq!(o.udp_port, 9076);
        assert_eq!(o.timeout, Duration::from_secs(10));
        assert!(o.no_broadcast);
        assert_eq!(o.authnz_user.as_deref(), Some("testuser"));
        assert!(o.debug);
    }

    /// `client_from_opts` carries over every setting that was assigned,
    /// including the timeout.
    #[test]
    fn client_from_opts_roundtrip() {
        let mut opts = PvOptions::new("X:Y".into());
        opts.tcp_port = 8075;
        opts.udp_port = 8076;
        opts.timeout = Duration::from_secs(3);
        opts.no_broadcast = true;
        opts.debug = true;
        let c = client_from_opts(&opts);
        assert_eq!(c.tcp_port, 8075);
        assert_eq!(c.udp_port, 8076);
        assert_eq!(c.timeout, Duration::from_secs(3));
        assert!(c.no_broadcast);
        assert!(c.debug);
    }

    /// The `PvGetOptions` alias names the same type as `PvOptions`.
    #[test]
    fn pv_get_options_alias_works() {
        let opts: crate::types::PvGetOptions = PvOptions::new("ALIAS:TEST".into());
        assert_eq!(opts.pv_name, "ALIAS:TEST");
    }
}