Skip to main content

spvirit_client/
pva_client.rs

1//! High-level PVAccess client — one-liner get, put, monitor, info.
2//!
3//! # Example
4//!
5//! ```rust,ignore
6//! use spvirit_client::PvaClient;
7//!
8//! let client = PvaClient::builder().build();
9//! let result = client.pvget("MY:PV").await?;
10//! client.pvput("MY:PV", 42.0).await?;
11//! ```
12
13use std::net::SocketAddr;
14use std::ops::ControlFlow;
15use std::sync::atomic::{AtomicU32, Ordering};
16use std::time::Duration;
17
18use serde_json::Value;
19use tokio::io::{AsyncReadExt, AsyncWriteExt};
20use tokio::net::tcp::OwnedWriteHalf;
21use tokio::task::JoinHandle;
22use tokio::time::{Instant, interval};
23
24use spvirit_codec::epics_decode::{PvaPacket, PvaPacketCommand};
25use spvirit_codec::spvd_decode::{DecodedValue, PvdDecoder, StructureDesc};
26use spvirit_codec::spvd_encode::encode_pv_request;
27use spvirit_codec::spvirit_encode::{
28    encode_control_message, encode_get_field_request, encode_monitor_request, encode_put_request,
29};
30
31use crate::client::{ChannelConn, ensure_status_ok, establish_channel, pvget as low_level_pvget};
32use crate::put_encode::encode_put_payload;
33use crate::search::resolve_pv_server;
34use crate::transport::{read_packet, read_until};
35use crate::types::{PvGetError, PvGetResult, PvOptions};
36
37/// PVA protocol version used in headers.
38const PVA_VERSION: u8 = 2;
39/// QoS / subcommand flag: INIT.
40const QOS_INIT: u8 = 0x08;
41
42static NEXT_IOID: AtomicU32 = AtomicU32::new(1);
43fn alloc_ioid() -> u32 {
44    NEXT_IOID.fetch_add(1, Ordering::Relaxed)
45}
46
47// ─── PvaClientBuilder ────────────────────────────────────────────────────────
48
49/// Builder for [`PvaClient`].
50///
51/// ```rust,ignore
52/// let client = PvaClient::builder()
53///     .timeout(Duration::from_secs(10))
54///     .port(5075)
55///     .build();
56/// ```
57pub struct PvaClientBuilder {
58    udp_port: u16,
59    tcp_port: u16,
60    timeout: Duration,
61    no_broadcast: bool,
62    name_servers: Vec<SocketAddr>,
63    authnz_user: Option<String>,
64    authnz_host: Option<String>,
65    server_addr: Option<SocketAddr>,
66    search_addr: Option<std::net::IpAddr>,
67    bind_addr: Option<std::net::IpAddr>,
68    debug: bool,
69}
70
71impl PvaClientBuilder {
72    fn new() -> Self {
73        Self {
74            udp_port: 5076,
75            tcp_port: 5075,
76            timeout: Duration::from_secs(5),
77            no_broadcast: false,
78            name_servers: Vec::new(),
79            authnz_user: None,
80            authnz_host: None,
81            server_addr: None,
82            search_addr: None,
83            bind_addr: None,
84            debug: false,
85        }
86    }
87
88    /// Set the TCP port (default 5075).
89    pub fn port(mut self, port: u16) -> Self {
90        self.tcp_port = port;
91        self
92    }
93
94    /// Set the UDP search port (default 5076).
95    pub fn udp_port(mut self, port: u16) -> Self {
96        self.udp_port = port;
97        self
98    }
99
100    /// Set the operation timeout (default 5 s).
101    pub fn timeout(mut self, timeout: Duration) -> Self {
102        self.timeout = timeout;
103        self
104    }
105
106    /// Disable UDP broadcast search (use name servers only).
107    pub fn no_broadcast(mut self) -> Self {
108        self.no_broadcast = true;
109        self
110    }
111
112    /// Add a PVA name-server address for TCP search.
113    pub fn name_server(mut self, addr: SocketAddr) -> Self {
114        self.name_servers.push(addr);
115        self
116    }
117
118    /// Override the authentication user.
119    pub fn authnz_user(mut self, user: impl Into<String>) -> Self {
120        self.authnz_user = Some(user.into());
121        self
122    }
123
124    /// Override the authentication host.
125    pub fn authnz_host(mut self, host: impl Into<String>) -> Self {
126        self.authnz_host = Some(host.into());
127        self
128    }
129
130    /// Set an explicit server address, bypassing UDP search.
131    pub fn server_addr(mut self, addr: SocketAddr) -> Self {
132        self.server_addr = Some(addr);
133        self
134    }
135
136    /// Set the search target IP address.
137    pub fn search_addr(mut self, addr: std::net::IpAddr) -> Self {
138        self.search_addr = Some(addr);
139        self
140    }
141
142    /// Set the local bind IP for UDP search.
143    pub fn bind_addr(mut self, addr: std::net::IpAddr) -> Self {
144        self.bind_addr = Some(addr);
145        self
146    }
147
148    /// Enable debug logging.
149    pub fn debug(mut self) -> Self {
150        self.debug = true;
151        self
152    }
153
154    /// Build the [`PvaClient`].
155    pub fn build(self) -> PvaClient {
156        PvaClient {
157            udp_port: self.udp_port,
158            tcp_port: self.tcp_port,
159            timeout: self.timeout,
160            no_broadcast: self.no_broadcast,
161            name_servers: self.name_servers,
162            authnz_user: self.authnz_user,
163            authnz_host: self.authnz_host,
164            server_addr: self.server_addr,
165            search_addr: self.search_addr,
166            bind_addr: self.bind_addr,
167            debug: self.debug,
168        }
169    }
170}
171
172// ─── PvaClient ───────────────────────────────────────────────────────────────
173
174/// High-level PVAccess client.
175///
176/// Provides `pvget`, `pvput`, `pvmonitor`, and `pvinfo` methods that hide
177/// the underlying protocol handshake.
178///
179/// ```rust,ignore
180/// let client = PvaClient::builder().build();
181/// let val = client.pvget("MY:PV").await?;
182/// ```
183#[derive(Clone, Debug)]
184pub struct PvaClient {
185    udp_port: u16,
186    tcp_port: u16,
187    timeout: Duration,
188    no_broadcast: bool,
189    name_servers: Vec<SocketAddr>,
190    authnz_user: Option<String>,
191    authnz_host: Option<String>,
192    server_addr: Option<SocketAddr>,
193    search_addr: Option<std::net::IpAddr>,
194    bind_addr: Option<std::net::IpAddr>,
195    debug: bool,
196}
197
198impl PvaClient {
199    /// Create a builder for configuring a [`PvaClient`].
200    pub fn builder() -> PvaClientBuilder {
201        PvaClientBuilder::new()
202    }
203
204    /// Build [`PvOptions`] for a given PV name, inheriting client-level settings.
205    fn opts(&self, pv_name: &str) -> PvOptions {
206        let mut o = PvOptions::new(pv_name.to_string());
207        o.udp_port = self.udp_port;
208        o.tcp_port = self.tcp_port;
209        o.timeout = self.timeout;
210        o.no_broadcast = self.no_broadcast;
211        o.name_servers.clone_from(&self.name_servers);
212        o.authnz_user.clone_from(&self.authnz_user);
213        o.authnz_host.clone_from(&self.authnz_host);
214        o.server_addr = self.server_addr;
215        o.search_addr = self.search_addr;
216        o.bind_addr = self.bind_addr;
217        o.debug = self.debug;
218        o
219    }
220
221    /// Resolve a PV server and establish a channel, returning the raw connection.
222    async fn open_channel(&self, pv_name: &str) -> Result<ChannelConn, PvGetError> {
223        let opts = self.opts(pv_name);
224        let target = resolve_pv_server(&opts).await?;
225        establish_channel(target, &opts).await
226    }
227
228    // ─── pvget ───────────────────────────────────────────────────────────
229
230    /// Fetch the current value of a PV.
231    pub async fn pvget(&self, pv_name: &str) -> Result<PvGetResult, PvGetError> {
232        let opts = self.opts(pv_name);
233        low_level_pvget(&opts).await
234    }
235
236    // ─── pvput ───────────────────────────────────────────────────────────
237
238    /// Write a value to a PV.
239    ///
240    /// Accepts anything convertible to `serde_json::Value`:
241    /// ```rust,ignore
242    /// client.pvput("MY:PV", 42.0).await?;
243    /// client.pvput("MY:PV", "hello").await?;
244    /// client.pvput("MY:PV", serde_json::json!({"value": 1.5})).await?;
245    /// ```
246    pub async fn pvput(&self, pv_name: &str, value: impl Into<Value>) -> Result<(), PvGetError> {
247        let json_val = value.into();
248        let ChannelConn {
249            mut stream,
250            sid,
251            version: _,
252            is_be,
253        } = self.open_channel(pv_name).await?;
254
255        let ioid = alloc_ioid();
256
257        // PUT INIT — send pvRequest for "field(value)"
258        let pv_request = encode_pv_request(&["value"], is_be);
259        let init = encode_put_request(sid, ioid, QOS_INIT, &pv_request, PVA_VERSION, is_be);
260        stream.write_all(&init).await?;
261
262        // Read INIT response — extract introspection
263        let init_bytes = read_until(&mut stream, self.timeout, |cmd| {
264            matches!(cmd, PvaPacketCommand::Op(op) if op.command == 11 && (op.subcmd & 0x08) != 0)
265        })
266        .await?;
267
268        let desc = decode_init_introspection(&init_bytes, "PUT")?;
269
270        // Encode and send the value
271        let payload = encode_put_payload(&desc, &json_val, is_be)
272            .map_err(|e| PvGetError::Protocol(format!("put encode: {e}")))?;
273        let req = encode_put_request(sid, ioid, 0x00, &payload, PVA_VERSION, is_be);
274        stream.write_all(&req).await?;
275
276        // Read PUT response — verify status
277        let resp_bytes = read_until(
278            &mut stream,
279            self.timeout,
280            |cmd| matches!(cmd, PvaPacketCommand::Op(op) if op.command == 11 && op.subcmd == 0x00),
281        )
282        .await?;
283        ensure_status_ok(&resp_bytes, is_be, "PUT")?;
284
285        Ok(())
286    }
287
288    // ─── open_put_channel ────────────────────────────────────────────────
289
290    /// Open a persistent channel for high-rate PUT streaming.
291    ///
292    /// Resolves the PV, establishes a channel, and completes the PUT INIT
293    /// handshake. The returned [`PvaChannel`] is ready for immediate
294    /// [`put`](PvaChannel::put) calls.
295    pub async fn open_put_channel(&self, pv_name: &str) -> Result<PvaChannel, PvGetError> {
296        let ChannelConn {
297            mut stream,
298            sid,
299            version,
300            is_be,
301        } = self.open_channel(pv_name).await?;
302
303        let ioid = alloc_ioid();
304
305        // PUT INIT
306        let pv_request = encode_pv_request(&["value"], is_be);
307        let init = encode_put_request(sid, ioid, QOS_INIT, &pv_request, PVA_VERSION, is_be);
308        stream.write_all(&init).await?;
309
310        let init_bytes = read_until(&mut stream, self.timeout, |cmd| {
311            matches!(cmd, PvaPacketCommand::Op(op) if op.command == 11 && (op.subcmd & 0x08) != 0)
312        })
313        .await?;
314
315        let desc = decode_init_introspection(&init_bytes, "PUT")?;
316
317        // Split stream; background reader logs PUT errors
318        let (mut reader, writer) = stream.into_split();
319        let reader_is_be = is_be;
320        let reader_handle = tokio::spawn(async move {
321            loop {
322                let mut header = [0u8; 8];
323                if reader.read_exact(&mut header).await.is_err() {
324                    break;
325                }
326                let hdr = spvirit_codec::epics_decode::PvaHeader::new(&header);
327                let len = if hdr.flags.is_control {
328                    0usize
329                } else {
330                    hdr.payload_length as usize
331                };
332                let mut payload = vec![0u8; len];
333                if len > 0 && reader.read_exact(&mut payload).await.is_err() {
334                    break;
335                }
336                if hdr.command == 11 && !hdr.flags.is_control && len >= 5 {
337                    if let Some(st) =
338                        spvirit_codec::epics_decode::decode_status(&payload[5..], reader_is_be).0
339                    {
340                        if st.code != 0 {
341                            let msg = st.message.unwrap_or_else(|| format!("code={}", st.code));
342                            eprintln!("PvaChannel put error: {msg}");
343                        }
344                    }
345                }
346            }
347        });
348
349        Ok(PvaChannel {
350            writer,
351            sid,
352            ioid,
353            version,
354            is_be,
355            put_desc: desc,
356            echo_token: 1,
357            last_echo: Instant::now(),
358            _reader_handle: reader_handle,
359        })
360    }
361
362    // ─── pvmonitor ───────────────────────────────────────────────────────
363
364    /// Subscribe to a PV and receive live updates via a callback.
365    ///
366    /// The callback returns [`ControlFlow::Continue`] to keep listening or
367    /// [`ControlFlow::Break`] to stop the subscription.
368    ///
369    /// ```rust,ignore
370    /// use std::ops::ControlFlow;
371    ///
372    /// client.pvmonitor("MY:PV", |value| {
373    ///     println!("{value:?}");
374    ///     ControlFlow::Continue(())
375    /// }).await?;
376    /// ```
377    pub async fn pvmonitor<F>(&self, pv_name: &str, mut callback: F) -> Result<(), PvGetError>
378    where
379        F: FnMut(&DecodedValue) -> ControlFlow<()>,
380    {
381        let ChannelConn {
382            mut stream,
383            sid,
384            version: _,
385            is_be,
386        } = self.open_channel(pv_name).await?;
387
388        let ioid = alloc_ioid();
389        let decoder = PvdDecoder::new(is_be);
390
391        // MONITOR INIT — request value + alarm + timeStamp
392        let pv_request = encode_pv_request(&["value", "alarm", "timeStamp"], is_be);
393        let init = encode_monitor_request(sid, ioid, QOS_INIT, &pv_request, PVA_VERSION, is_be);
394        stream.write_all(&init).await?;
395
396        // Read INIT response — extract introspection
397        let init_bytes = read_until(&mut stream, self.timeout, |cmd| {
398            matches!(cmd, PvaPacketCommand::Op(op) if op.command == 13 && (op.subcmd & 0x08) != 0)
399        })
400        .await?;
401
402        let field_desc = decode_init_introspection(&init_bytes, "MONITOR")?;
403
404        // Start subscription (non-pipeline: START 0x04 + GET 0x40 = 0x44)
405        let start = encode_monitor_request(sid, ioid, 0x44, &[], PVA_VERSION, is_be);
406        stream.write_all(&start).await?;
407
408        // Event loop — with echo keepalive and timeout resilience
409        let mut echo_interval = interval(Duration::from_secs(10));
410        let mut echo_token: u32 = 1;
411
412        loop {
413            tokio::select! {
414                _ = echo_interval.tick() => {
415                    let msg = encode_control_message(false, is_be, PVA_VERSION, 3, echo_token);
416                    echo_token = echo_token.wrapping_add(1);
417                    let _ = stream.write_all(&msg).await;
418                }
419                res = read_packet(&mut stream, self.timeout) => {
420                    let bytes = match res {
421                        Ok(b) => b,
422                        Err(PvGetError::Timeout(_)) => continue,
423                        Err(e) => return Err(e),
424                    };
425                    let mut pkt = PvaPacket::new(&bytes);
426                    if let Some(PvaPacketCommand::Op(op)) = pkt.decode_payload() {
427                        if op.command == 13 && op.ioid == ioid && op.subcmd == 0x00 {
428                            let payload = &bytes[8..]; // skip header
429                            let pos = 5; // skip ioid(4) + subcmd(1)
430                            if let Some((decoded, _)) =
431                                decoder.decode_structure_with_bitset(&payload[pos..], &field_desc)
432                            {
433                                if callback(&decoded).is_break() {
434                                    return Ok(());
435                                }
436                            }
437                        }
438                    }
439                }
440            }
441        }
442    }
443
444    // ─── pvinfo ──────────────────────────────────────────────────────────
445
446    /// Retrieve the field/structure description (introspection) for a PV.
447    pub async fn pvinfo(&self, pv_name: &str) -> Result<StructureDesc, PvGetError> {
448        let ChannelConn {
449            mut stream,
450            sid,
451            version: _,
452            is_be,
453        } = self.open_channel(pv_name).await?;
454
455        let ioid = alloc_ioid();
456        let msg = encode_get_field_request(sid, ioid, None, PVA_VERSION, is_be);
457        stream.write_all(&msg).await?;
458
459        let resp_bytes = read_until(
460            &mut stream,
461            self.timeout,
462            |cmd| matches!(cmd, PvaPacketCommand::Op(op) if op.command == 17),
463        )
464        .await?;
465
466        decode_init_introspection(&resp_bytes, "GET_FIELD")
467    }
468
469    // ─── pvlist ──────────────────────────────────────────────────────────
470
471    /// List PV names served by a specific server (via `__pvlist` GET).
472    pub async fn pvlist(&self, server_addr: SocketAddr) -> Result<Vec<String>, PvGetError> {
473        let opts = self.opts("__pvlist");
474        crate::pvlist::pvlist(&opts, server_addr).await
475    }
476
477    /// List PV names with automatic fallback through all strategies.
478    ///
479    /// Tries: `__pvlist` → GET_FIELD (opt-in) → Server RPC → Server GET.
480    pub async fn pvlist_with_fallback(
481        &self,
482        server_addr: SocketAddr,
483    ) -> Result<(Vec<String>, crate::pvlist::PvListSource), PvGetError> {
484        let opts = self.opts("__pvlist");
485        crate::pvlist::pvlist_with_fallback(&opts, server_addr).await
486    }
487}
488
489// ─── PvaChannel ──────────────────────────────────────────────────────────────
490
491/// A persistent PVA channel for high-rate streaming PUT operations.
492///
493/// Created via [`PvaClient::open_put_channel`], this keeps the TCP connection
494/// open and reuses the PUT introspection for repeated writes without
495/// per-operation handshake overhead.
496///
497/// # Example
498///
499/// ```rust,ignore
500/// let client = PvaClient::builder().build();
501/// let mut channel = client.open_put_channel("MY:PV").await?;
502/// for value in 0..100 {
503///     channel.put(value as f64).await?;
504/// }
505/// ```
506pub struct PvaChannel {
507    writer: OwnedWriteHalf,
508    sid: u32,
509    ioid: u32,
510    version: u8,
511    is_be: bool,
512    put_desc: StructureDesc,
513    echo_token: u32,
514    last_echo: Instant,
515    _reader_handle: JoinHandle<()>,
516}
517
518impl PvaChannel {
519    /// Write a value over the persistent channel.
520    ///
521    /// Automatically sends echo keepalive pings when more than 10 seconds
522    /// have elapsed since the last one.
523    pub async fn put(&mut self, value: impl Into<Value>) -> Result<(), PvGetError> {
524        // Echo keepalive
525        if self.last_echo.elapsed() >= Duration::from_secs(10) {
526            let msg = encode_control_message(false, self.is_be, self.version, 3, self.echo_token);
527            self.echo_token = self.echo_token.wrapping_add(1);
528            let _ = self.writer.write_all(&msg).await;
529            self.last_echo = Instant::now();
530        }
531
532        let json_val = value.into();
533        let payload = encode_put_payload(&self.put_desc, &json_val, self.is_be)
534            .map_err(|e| PvGetError::Protocol(format!("put encode: {e}")))?;
535        let req = encode_put_request(
536            self.sid,
537            self.ioid,
538            0x00,
539            &payload,
540            self.version,
541            self.is_be,
542        );
543        self.writer.write_all(&req).await?;
544        Ok(())
545    }
546
547    /// Returns the PUT introspection for this channel.
548    pub fn introspection(&self) -> &StructureDesc {
549        &self.put_desc
550    }
551}
552
553impl Drop for PvaChannel {
554    fn drop(&mut self) {
555        self._reader_handle.abort();
556    }
557}
558
559// ─── Standalone convenience functions ────────────────────────────────────────
560
561/// Write a value to a PV (one-shot).
562///
563/// ```rust,ignore
564/// use spvirit_client::{pvput, PvOptions};
565///
566/// pvput(&PvOptions::new("MY:PV".into()), 42.0).await?;
567/// ```
568pub async fn pvput(opts: &PvOptions, value: impl Into<Value>) -> Result<(), PvGetError> {
569    let client = client_from_opts(opts);
570    client.pvput(&opts.pv_name, value).await
571}
572
573/// Subscribe to a PV and receive live updates (one-shot).
574///
575/// The callback returns [`ControlFlow::Continue`] to keep listening or
576/// [`ControlFlow::Break`] to stop.
577pub async fn pvmonitor<F>(opts: &PvOptions, callback: F) -> Result<(), PvGetError>
578where
579    F: FnMut(&DecodedValue) -> ControlFlow<()>,
580{
581    let client = client_from_opts(opts);
582    client.pvmonitor(&opts.pv_name, callback).await
583}
584
585/// Retrieve the field/structure description for a PV (one-shot).
586pub async fn pvinfo(opts: &PvOptions) -> Result<StructureDesc, PvGetError> {
587    let client = client_from_opts(opts);
588    client.pvinfo(&opts.pv_name).await
589}
590
591// ─── Internal helpers ────────────────────────────────────────────────────────
592
593/// Build a PvaClient inheriting configuration from PvOptions.
594pub fn client_from_opts(opts: &PvOptions) -> PvaClient {
595    let mut b = PvaClient::builder()
596        .port(opts.tcp_port)
597        .udp_port(opts.udp_port)
598        .timeout(opts.timeout);
599    if opts.no_broadcast {
600        b = b.no_broadcast();
601    }
602    for ns in &opts.name_servers {
603        b = b.name_server(*ns);
604    }
605    if let Some(ref u) = opts.authnz_user {
606        b = b.authnz_user(u.clone());
607    }
608    if let Some(ref h) = opts.authnz_host {
609        b = b.authnz_host(h.clone());
610    }
611    if let Some(addr) = opts.server_addr {
612        b = b.server_addr(addr);
613    }
614    if let Some(addr) = opts.search_addr {
615        b = b.search_addr(addr);
616    }
617    if let Some(addr) = opts.bind_addr {
618        b = b.bind_addr(addr);
619    }
620    if opts.debug {
621        b = b.debug();
622    }
623    b.build()
624}
625
626/// Decode an INIT response to extract the introspection StructureDesc.
627fn decode_init_introspection(raw: &[u8], label: &str) -> Result<StructureDesc, PvGetError> {
628    let mut pkt = PvaPacket::new(raw);
629    let cmd = pkt
630        .decode_payload()
631        .ok_or_else(|| PvGetError::Decode(format!("{label} init response decode failed")))?;
632
633    match cmd {
634        PvaPacketCommand::Op(op) => {
635            if let Some(ref st) = op.status {
636                if st.is_error() {
637                    let msg = st
638                        .message
639                        .clone()
640                        .unwrap_or_else(|| format!("code={}", st.code));
641                    return Err(PvGetError::Protocol(format!("{label} init error: {msg}")));
642                }
643            }
644            op.introspection
645                .ok_or_else(|| PvGetError::Decode(format!("missing {label} introspection")))
646        }
647        _ => Err(PvGetError::Protocol(format!(
648            "unexpected {label} init response"
649        ))),
650    }
651}
652
653#[cfg(test)]
654mod tests {
655    use super::*;
656
657    #[test]
658    fn builder_defaults() {
659        let c = PvaClient::builder().build();
660        assert_eq!(c.tcp_port, 5075);
661        assert_eq!(c.udp_port, 5076);
662        assert_eq!(c.timeout, Duration::from_secs(5));
663        assert!(!c.no_broadcast);
664        assert!(c.name_servers.is_empty());
665    }
666
667    #[test]
668    fn builder_overrides() {
669        let c = PvaClient::builder()
670            .port(9075)
671            .udp_port(9076)
672            .timeout(Duration::from_secs(10))
673            .no_broadcast()
674            .name_server("127.0.0.1:5075".parse().unwrap())
675            .authnz_user("testuser")
676            .authnz_host("testhost")
677            .build();
678        assert_eq!(c.tcp_port, 9075);
679        assert_eq!(c.udp_port, 9076);
680        assert_eq!(c.timeout, Duration::from_secs(10));
681        assert!(c.no_broadcast);
682        assert_eq!(c.name_servers.len(), 1);
683        assert_eq!(c.authnz_user.as_deref(), Some("testuser"));
684        assert_eq!(c.authnz_host.as_deref(), Some("testhost"));
685    }
686
687    #[test]
688    fn opts_inherits_client_config() {
689        let c = PvaClient::builder()
690            .port(9075)
691            .udp_port(9076)
692            .timeout(Duration::from_secs(10))
693            .no_broadcast()
694            .build();
695        let o = c.opts("TEST:PV");
696        assert_eq!(o.pv_name, "TEST:PV");
697        assert_eq!(o.tcp_port, 9075);
698        assert_eq!(o.udp_port, 9076);
699        assert_eq!(o.timeout, Duration::from_secs(10));
700        assert!(o.no_broadcast);
701    }
702
703    #[test]
704    fn client_from_opts_roundtrip() {
705        let mut opts = PvOptions::new("X:Y".into());
706        opts.tcp_port = 8075;
707        opts.udp_port = 8076;
708        opts.timeout = Duration::from_secs(3);
709        opts.no_broadcast = true;
710        let c = client_from_opts(&opts);
711        assert_eq!(c.tcp_port, 8075);
712        assert_eq!(c.udp_port, 8076);
713        assert!(c.no_broadcast);
714    }
715
716    #[test]
717    fn pv_get_options_alias_works() {
718        // PvGetOptions is a type alias for PvOptions — verify it compiles and works
719        let opts: crate::types::PvGetOptions = PvOptions::new("ALIAS:TEST".into());
720        assert_eq!(opts.pv_name, "ALIAS:TEST");
721    }
722}