Skip to main content

spvirit_client/
pva_client.rs

1//! High-level PVAccess client — one-liner get, put, monitor, info.
2//!
3//! # Example
4//!
5//! ```rust,ignore
6//! use spvirit_client::PvaClient;
7//!
8//! let client = PvaClient::builder().build();
9//! let result = client.pvget("MY:PV").await?;
10//! client.pvput("MY:PV", 42.0).await?;
11//! ```
12
13use std::net::SocketAddr;
14use std::ops::ControlFlow;
15use std::sync::atomic::{AtomicU32, Ordering};
16use std::time::Duration;
17
18use serde_json::Value;
19use tokio::io::{AsyncReadExt, AsyncWriteExt};
20use tokio::net::tcp::OwnedWriteHalf;
21use tokio::task::JoinHandle;
22use tokio::time::{Instant, interval};
23
24use spvirit_codec::epics_decode::{PvaPacket, PvaPacketCommand};
25use spvirit_codec::spvd_decode::{DecodedValue, PvdDecoder, StructureDesc};
26use spvirit_codec::spvd_encode::encode_pv_request;
27use spvirit_codec::spvirit_encode::{
28    encode_control_message, encode_get_field_request, encode_monitor_request, encode_put_request,
29};
30
31use crate::client::{ChannelConn, ensure_status_ok, establish_channel, pvget as low_level_pvget};
32use crate::put_encode::encode_put_payload;
33use crate::search::resolve_pv_server;
34use crate::transport::{read_packet, read_until};
35use crate::types::{PvGetError, PvGetResult, PvOptions};
36
/// PVA protocol version used in headers.
///
/// Passed as the version argument to the request encoders called from
/// [`PvaClient`] (`encode_put_request`, `encode_monitor_request`,
/// `encode_get_field_request` and `encode_control_message`).
const PVA_VERSION: u8 = 2;
/// QoS / subcommand flag: INIT.
///
/// Sent as the subcommand of the first PUT / MONITOR request of an operation;
/// the matching response is recognised by this same bit (`0x08`) being set in
/// its subcommand byte.
const QOS_INIT: u8 = 0x08;
41
/// Process-wide counter backing [`alloc_ioid`]; the first id handed out is 1.
static NEXT_IOID: AtomicU32 = AtomicU32::new(1);

/// Hand out the next I/O operation id.
///
/// Returns the counter's current value and advances it by one. The counter is
/// shared by every client in the process and wraps around on `u32` overflow.
fn alloc_ioid() -> u32 {
    AtomicU32::fetch_add(&NEXT_IOID, 1, Ordering::Relaxed)
}
46
47// ─── PvaClientBuilder ────────────────────────────────────────────────────────
48
/// Builder for [`PvaClient`].
///
/// Obtained from [`PvaClient::builder`]; every setter consumes and returns the
/// builder so calls can be chained, and [`PvaClientBuilder::build`] produces
/// the client.
///
/// ```rust,ignore
/// let client = PvaClient::builder()
///     .timeout(Duration::from_secs(10))
///     .port(5075)
///     .build();
/// ```
#[derive(Clone, Debug)]
pub struct PvaClientBuilder {
    /// UDP search port (default 5076).
    udp_port: u16,
    /// TCP port (default 5075).
    tcp_port: u16,
    /// Operation timeout (default 5 s).
    timeout: Duration,
    /// When `true`, UDP broadcast search is disabled (name servers only).
    no_broadcast: bool,
    /// PVA name-server addresses used for TCP search.
    name_servers: Vec<SocketAddr>,
    /// Authentication user override, if any.
    authnz_user: Option<String>,
    /// Authentication host override, if any.
    authnz_host: Option<String>,
    /// Explicit server address; when set, UDP search is bypassed.
    server_addr: Option<SocketAddr>,
    /// Search target IP address, if any.
    search_addr: Option<std::net::IpAddr>,
    /// Local bind IP for UDP search, if any.
    bind_addr: Option<std::net::IpAddr>,
    /// Whether debug logging is enabled.
    debug: bool,
}
70
71impl PvaClientBuilder {
72    fn new() -> Self {
73        Self {
74            udp_port: 5076,
75            tcp_port: 5075,
76            timeout: Duration::from_secs(5),
77            no_broadcast: false,
78            name_servers: Vec::new(),
79            authnz_user: None,
80            authnz_host: None,
81            server_addr: None,
82            search_addr: None,
83            bind_addr: None,
84            debug: false,
85        }
86    }
87
88    /// Set the TCP port (default 5075).
89    pub fn port(mut self, port: u16) -> Self {
90        self.tcp_port = port;
91        self
92    }
93
94    /// Set the UDP search port (default 5076).
95    pub fn udp_port(mut self, port: u16) -> Self {
96        self.udp_port = port;
97        self
98    }
99
100    /// Set the operation timeout (default 5 s).
101    pub fn timeout(mut self, timeout: Duration) -> Self {
102        self.timeout = timeout;
103        self
104    }
105
106    /// Disable UDP broadcast search (use name servers only).
107    pub fn no_broadcast(mut self) -> Self {
108        self.no_broadcast = true;
109        self
110    }
111
112    /// Add a PVA name-server address for TCP search.
113    pub fn name_server(mut self, addr: SocketAddr) -> Self {
114        self.name_servers.push(addr);
115        self
116    }
117
118    /// Override the authentication user.
119    pub fn authnz_user(mut self, user: impl Into<String>) -> Self {
120        self.authnz_user = Some(user.into());
121        self
122    }
123
124    /// Override the authentication host.
125    pub fn authnz_host(mut self, host: impl Into<String>) -> Self {
126        self.authnz_host = Some(host.into());
127        self
128    }
129
130    /// Set an explicit server address, bypassing UDP search.
131    pub fn server_addr(mut self, addr: SocketAddr) -> Self {
132        self.server_addr = Some(addr);
133        self
134    }
135
136    /// Set the search target IP address.
137    pub fn search_addr(mut self, addr: std::net::IpAddr) -> Self {
138        self.search_addr = Some(addr);
139        self
140    }
141
142    /// Set the local bind IP for UDP search.
143    pub fn bind_addr(mut self, addr: std::net::IpAddr) -> Self {
144        self.bind_addr = Some(addr);
145        self
146    }
147
148    /// Enable debug logging.
149    pub fn debug(mut self) -> Self {
150        self.debug = true;
151        self
152    }
153
154    /// Build the [`PvaClient`].
155    pub fn build(self) -> PvaClient {
156        PvaClient {
157            udp_port: self.udp_port,
158            tcp_port: self.tcp_port,
159            timeout: self.timeout,
160            no_broadcast: self.no_broadcast,
161            name_servers: self.name_servers,
162            authnz_user: self.authnz_user,
163            authnz_host: self.authnz_host,
164            server_addr: self.server_addr,
165            search_addr: self.search_addr,
166            bind_addr: self.bind_addr,
167            debug: self.debug,
168        }
169    }
170}
171
172// ─── PvaClient ───────────────────────────────────────────────────────────────
173
/// High-level PVAccess client.
///
/// Provides `pvget`, `pvput`, `pvmonitor`, and `pvinfo` methods that hide
/// the underlying protocol handshake.
///
/// The client only stores configuration; each operation builds a
/// [`PvOptions`] from these fields for the PV it targets. Instances are
/// created through [`PvaClient::builder`].
///
/// ```rust,ignore
/// let client = PvaClient::builder().build();
/// let val = client.pvget("MY:PV").await?;
/// ```
#[derive(Clone, Debug)]
pub struct PvaClient {
    // UDP search port.
    udp_port: u16,
    // TCP port.
    tcp_port: u16,
    // Timeout applied to operations and to individual response reads.
    timeout: Duration,
    // When true, UDP broadcast search is disabled.
    no_broadcast: bool,
    // PVA name-server addresses used for TCP search.
    name_servers: Vec<SocketAddr>,
    // Authentication user override, if any.
    authnz_user: Option<String>,
    // Authentication host override, if any.
    authnz_host: Option<String>,
    // Explicit server address, if any.
    server_addr: Option<SocketAddr>,
    // Search target IP address, if any.
    search_addr: Option<std::net::IpAddr>,
    // Local bind IP for UDP search, if any.
    bind_addr: Option<std::net::IpAddr>,
    // Whether debug logging is enabled.
    debug: bool,
}
197
198impl PvaClient {
    /// Create a builder for configuring a [`PvaClient`].
    ///
    /// The builder starts from [`PvaClientBuilder`]'s defaults; call
    /// [`PvaClientBuilder::build`] to obtain the client.
    pub fn builder() -> PvaClientBuilder {
        PvaClientBuilder::new()
    }
203
204    /// Build [`PvOptions`] for a given PV name, inheriting client-level settings.
205    fn opts(&self, pv_name: &str) -> PvOptions {
206        let mut o = PvOptions::new(pv_name.to_string());
207        o.udp_port = self.udp_port;
208        o.tcp_port = self.tcp_port;
209        o.timeout = self.timeout;
210        o.no_broadcast = self.no_broadcast;
211        o.name_servers.clone_from(&self.name_servers);
212        o.authnz_user.clone_from(&self.authnz_user);
213        o.authnz_host.clone_from(&self.authnz_host);
214        o.server_addr = self.server_addr;
215        o.search_addr = self.search_addr;
216        o.bind_addr = self.bind_addr;
217        o.debug = self.debug;
218        o
219    }
220
221    /// Resolve a PV server and establish a channel, returning the raw connection.
222    async fn open_channel(&self, pv_name: &str) -> Result<ChannelConn, PvGetError> {
223        let opts = self.opts(pv_name);
224        let target = resolve_pv_server(&opts).await?;
225        establish_channel(target, &opts).await
226    }
227
228    // ─── pvget ───────────────────────────────────────────────────────────
229
230    /// Fetch the current value of a PV.
231    pub async fn pvget(&self, pv_name: &str) -> Result<PvGetResult, PvGetError> {
232        let opts = self.opts(pv_name);
233        low_level_pvget(&opts).await
234    }
235
236    /// Fetch a PV with field filtering (equivalent to `pvget -r "field(value,alarm)"`).
237    pub async fn pvget_fields(
238        &self,
239        pv_name: &str,
240        fields: &[&str],
241    ) -> Result<PvGetResult, PvGetError> {
242        let opts = self.opts(pv_name);
243        crate::client::pvget_fields(&opts, fields).await
244    }
245
246    // ─── pvput ───────────────────────────────────────────────────────────
247
248    /// Write a value to a PV.
249    ///
250    /// Accepts anything convertible to `serde_json::Value`:
251    /// ```rust,ignore
252    /// client.pvput("MY:PV", 42.0).await?;
253    /// client.pvput("MY:PV", "hello").await?;
254    /// client.pvput("MY:PV", serde_json::json!({"value": 1.5})).await?;
255    /// ```
256    pub async fn pvput(&self, pv_name: &str, value: impl Into<Value>) -> Result<(), PvGetError> {
257        let json_val = value.into();
258        let ChannelConn {
259            mut stream,
260            sid,
261            version: _,
262            is_be,
263            ..
264        } = self.open_channel(pv_name).await?;
265
266        let ioid = alloc_ioid();
267
268        // PUT INIT — send pvRequest for "field(value)"
269        let pv_request = encode_pv_request(&["value"], is_be);
270        let init = encode_put_request(sid, ioid, QOS_INIT, &pv_request, PVA_VERSION, is_be);
271        stream.write_all(&init).await?;
272
273        // Read INIT response — extract introspection
274        let init_bytes = read_until(&mut stream, self.timeout, |cmd| {
275            matches!(cmd, PvaPacketCommand::Op(op) if op.command == 11 && (op.subcmd & 0x08) != 0)
276        })
277        .await?;
278
279        let desc = decode_init_introspection(&init_bytes, "PUT")?;
280
281        // Encode and send the value
282        let payload = encode_put_payload(&desc, &json_val, is_be)
283            .map_err(|e| PvGetError::Protocol(format!("put encode: {e}")))?;
284        let req = encode_put_request(sid, ioid, 0x00, &payload, PVA_VERSION, is_be);
285        stream.write_all(&req).await?;
286
287        // Read PUT response — verify status
288        let resp_bytes = read_until(
289            &mut stream,
290            self.timeout,
291            |cmd| matches!(cmd, PvaPacketCommand::Op(op) if op.command == 11 && op.subcmd == 0x00),
292        )
293        .await?;
294        ensure_status_ok(&resp_bytes, is_be, "PUT")?;
295
296        Ok(())
297    }
298
299    // ─── open_put_channel ────────────────────────────────────────────────
300
301    /// Open a persistent channel for high-rate PUT streaming.
302    ///
303    /// Resolves the PV, establishes a channel, and completes the PUT INIT
304    /// handshake. The returned [`PvaChannel`] is ready for immediate
305    /// [`put`](PvaChannel::put) calls.
306    pub async fn open_put_channel(&self, pv_name: &str) -> Result<PvaChannel, PvGetError> {
307        let ChannelConn {
308            mut stream,
309            sid,
310            version,
311            is_be,
312            ..
313        } = self.open_channel(pv_name).await?;
314
315        let ioid = alloc_ioid();
316
317        // PUT INIT
318        let pv_request = encode_pv_request(&["value"], is_be);
319        let init = encode_put_request(sid, ioid, QOS_INIT, &pv_request, PVA_VERSION, is_be);
320        stream.write_all(&init).await?;
321
322        let init_bytes = read_until(&mut stream, self.timeout, |cmd| {
323            matches!(cmd, PvaPacketCommand::Op(op) if op.command == 11 && (op.subcmd & 0x08) != 0)
324        })
325        .await?;
326
327        let desc = decode_init_introspection(&init_bytes, "PUT")?;
328
329        // Split stream; background reader logs PUT errors
330        let (mut reader, writer) = stream.into_split();
331        let reader_is_be = is_be;
332        let reader_handle = tokio::spawn(async move {
333            loop {
334                let mut header = [0u8; 8];
335                if reader.read_exact(&mut header).await.is_err() {
336                    break;
337                }
338                let hdr = spvirit_codec::epics_decode::PvaHeader::new(&header);
339                let len = if hdr.flags.is_control {
340                    0usize
341                } else {
342                    hdr.payload_length as usize
343                };
344                let mut payload = vec![0u8; len];
345                if len > 0 && reader.read_exact(&mut payload).await.is_err() {
346                    break;
347                }
348                if hdr.command == 11 && !hdr.flags.is_control && len >= 5 {
349                    if let Some(st) =
350                        spvirit_codec::epics_decode::decode_status(&payload[5..], reader_is_be).0
351                    {
352                        if st.code != 0 {
353                            let msg = st.message.unwrap_or_else(|| format!("code={}", st.code));
354                            eprintln!("PvaChannel put error: {msg}");
355                        }
356                    }
357                }
358            }
359        });
360
361        Ok(PvaChannel {
362            writer,
363            sid,
364            ioid,
365            version,
366            is_be,
367            put_desc: desc,
368            echo_token: 1,
369            last_echo: Instant::now(),
370            _reader_handle: reader_handle,
371        })
372    }
373
374    // ─── pvmonitor ───────────────────────────────────────────────────────
375
376    /// Subscribe to a PV and receive live updates via a callback.
377    ///
378    /// The callback returns [`ControlFlow::Continue`] to keep listening or
379    /// [`ControlFlow::Break`] to stop the subscription.
380    ///
381    /// ```rust,ignore
382    /// use std::ops::ControlFlow;
383    ///
384    /// client.pvmonitor("MY:PV", |value| {
385    ///     println!("{value:?}");
386    ///     ControlFlow::Continue(())
387    /// }).await?;
388    /// ```
389    pub async fn pvmonitor<F>(&self, pv_name: &str, mut callback: F) -> Result<(), PvGetError>
390    where
391        F: FnMut(&DecodedValue) -> ControlFlow<()>,
392    {
393        let ChannelConn {
394            mut stream,
395            sid,
396            version: _,
397            is_be,
398            ..
399        } = self.open_channel(pv_name).await?;
400
401        let ioid = alloc_ioid();
402        let decoder = PvdDecoder::new(is_be);
403
404        // MONITOR INIT — request value + alarm + timeStamp
405        let pv_request = encode_pv_request(&["value", "alarm", "timeStamp"], is_be);
406        let init = encode_monitor_request(sid, ioid, QOS_INIT, &pv_request, PVA_VERSION, is_be);
407        stream.write_all(&init).await?;
408
409        // Read INIT response — extract introspection
410        let init_bytes = read_until(&mut stream, self.timeout, |cmd| {
411            matches!(cmd, PvaPacketCommand::Op(op) if op.command == 13 && (op.subcmd & 0x08) != 0)
412        })
413        .await?;
414
415        let field_desc = decode_init_introspection(&init_bytes, "MONITOR")?;
416
417        // Start subscription (non-pipeline: START 0x04 + GET 0x40 = 0x44)
418        let start = encode_monitor_request(sid, ioid, 0x44, &[], PVA_VERSION, is_be);
419        stream.write_all(&start).await?;
420
421        // Event loop — with echo keepalive and timeout resilience
422        let mut echo_interval = interval(Duration::from_secs(10));
423        let mut echo_token: u32 = 1;
424
425        loop {
426            tokio::select! {
427                _ = echo_interval.tick() => {
428                    let msg = encode_control_message(false, is_be, PVA_VERSION, 3, echo_token);
429                    echo_token = echo_token.wrapping_add(1);
430                    let _ = stream.write_all(&msg).await;
431                }
432                res = read_packet(&mut stream, self.timeout) => {
433                    let bytes = match res {
434                        Ok(b) => b,
435                        Err(PvGetError::Timeout(_)) => continue,
436                        Err(e) => return Err(e),
437                    };
438                    let mut pkt = PvaPacket::new(&bytes);
439                    if let Some(PvaPacketCommand::Op(op)) = pkt.decode_payload() {
440                        if op.command == 13 && op.ioid == ioid && op.subcmd == 0x00 {
441                            let payload = &bytes[8..]; // skip header
442                            let pos = 5; // skip ioid(4) + subcmd(1)
443                            if let Some((decoded, _)) =
444                                decoder.decode_structure_with_bitset(&payload[pos..], &field_desc)
445                            {
446                                if callback(&decoded).is_break() {
447                                    return Ok(());
448                                }
449                            }
450                        }
451                    }
452                }
453            }
454        }
455    }
456
457    // ─── pvinfo ──────────────────────────────────────────────────────────
458
459    /// Retrieve the field/structure description (introspection) for a PV.
460    pub async fn pvinfo(&self, pv_name: &str) -> Result<StructureDesc, PvGetError> {
461        let result = self.pvinfo_full(pv_name).await?;
462        Ok(result.0)
463    }
464
465    /// Retrieve introspection and server address for a PV.
466    pub async fn pvinfo_full(
467        &self,
468        pv_name: &str,
469    ) -> Result<(StructureDesc, SocketAddr), PvGetError> {
470        let ChannelConn {
471            mut stream,
472            sid,
473            version: _,
474            is_be,
475            server_addr,
476        } = self.open_channel(pv_name).await?;
477
478        let ioid = alloc_ioid();
479        let msg = encode_get_field_request(sid, ioid, None, PVA_VERSION, is_be);
480        stream.write_all(&msg).await?;
481
482        let resp_bytes = read_until(
483            &mut stream,
484            self.timeout,
485            |cmd| matches!(cmd, PvaPacketCommand::GetField(_)),
486        )
487        .await?;
488
489        let mut pkt = PvaPacket::new(&resp_bytes);
490        let cmd = pkt
491            .decode_payload()
492            .ok_or_else(|| PvGetError::Decode("GET_FIELD response decode failed".to_string()))?;
493        match cmd {
494            PvaPacketCommand::GetField(payload) => {
495                if let Some(ref st) = payload.status {
496                    if st.is_error() {
497                        let msg = st
498                            .message
499                            .clone()
500                            .unwrap_or_else(|| format!("code={}", st.code));
501                        return Err(PvGetError::Protocol(format!("GET_FIELD error: {msg}")));
502                    }
503                }
504                let desc = payload
505                    .introspection
506                    .ok_or_else(|| PvGetError::Decode("missing GET_FIELD introspection".to_string()))?;
507                Ok((desc, server_addr))
508            }
509            _ => Err(PvGetError::Protocol(
510                "unexpected GET_FIELD response".to_string(),
511            )),
512        }
513    }
514
515    // ─── pvlist ──────────────────────────────────────────────────────────
516
517    /// List PV names served by a specific server (via `__pvlist` GET).
518    pub async fn pvlist(&self, server_addr: SocketAddr) -> Result<Vec<String>, PvGetError> {
519        let opts = self.opts("__pvlist");
520        crate::pvlist::pvlist(&opts, server_addr).await
521    }
522
523    /// List PV names with automatic fallback through all strategies.
524    ///
525    /// Tries: `__pvlist` → GET_FIELD (opt-in) → Server RPC → Server GET.
526    pub async fn pvlist_with_fallback(
527        &self,
528        server_addr: SocketAddr,
529    ) -> Result<(Vec<String>, crate::pvlist::PvListSource), PvGetError> {
530        let opts = self.opts("__pvlist");
531        crate::pvlist::pvlist_with_fallback(&opts, server_addr).await
532    }
533}
534
535// ─── PvaChannel ──────────────────────────────────────────────────────────────
536
/// A persistent PVA channel for high-rate streaming PUT operations.
///
/// Created via [`PvaClient::open_put_channel`], this keeps the TCP connection
/// open and reuses the PUT introspection for repeated writes without
/// per-operation handshake overhead.
///
/// Dropping the channel aborts the background task that reads server
/// responses.
///
/// # Example
///
/// ```rust,ignore
/// let client = PvaClient::builder().build();
/// let mut channel = client.open_put_channel("MY:PV").await?;
/// for value in 0..100 {
///     channel.put(value as f64).await?;
/// }
/// ```
pub struct PvaChannel {
    // Write half of the split connection; all requests go out through it.
    writer: OwnedWriteHalf,
    // Server channel id taken from the established connection.
    sid: u32,
    // Operation id used for the PUT INIT and reused for every later PUT.
    ioid: u32,
    // Protocol version taken from the established connection.
    version: u8,
    // Big-endian flag taken from the established connection.
    is_be: bool,
    // Introspection returned by the PUT INIT response; values are encoded against it.
    put_desc: StructureDesc,
    // Token placed in the next echo message; starts at 1 and wraps on overflow.
    echo_token: u32,
    // When the last echo was sent (initially: when the channel was created).
    last_echo: Instant,
    // Background task reading the other half of the connection; aborted on drop.
    _reader_handle: JoinHandle<()>,
}
563
564impl PvaChannel {
565    /// Write a value over the persistent channel.
566    ///
567    /// Automatically sends echo keepalive pings when more than 10 seconds
568    /// have elapsed since the last one.
569    pub async fn put(&mut self, value: impl Into<Value>) -> Result<(), PvGetError> {
570        // Echo keepalive
571        if self.last_echo.elapsed() >= Duration::from_secs(10) {
572            let msg = encode_control_message(false, self.is_be, self.version, 3, self.echo_token);
573            self.echo_token = self.echo_token.wrapping_add(1);
574            let _ = self.writer.write_all(&msg).await;
575            self.last_echo = Instant::now();
576        }
577
578        let json_val = value.into();
579        let payload = encode_put_payload(&self.put_desc, &json_val, self.is_be)
580            .map_err(|e| PvGetError::Protocol(format!("put encode: {e}")))?;
581        let req = encode_put_request(
582            self.sid,
583            self.ioid,
584            0x00,
585            &payload,
586            self.version,
587            self.is_be,
588        );
589        self.writer.write_all(&req).await?;
590        Ok(())
591    }
592
593    /// Returns the PUT introspection for this channel.
594    pub fn introspection(&self) -> &StructureDesc {
595        &self.put_desc
596    }
597}
598
/// Stops the background response reader when the channel goes away.
impl Drop for PvaChannel {
    fn drop(&mut self) {
        // The reader task loops until a read fails; abort it explicitly so
        // it does not keep running after the channel is dropped.
        self._reader_handle.abort();
    }
}
604
605// ─── Standalone convenience functions ────────────────────────────────────────
606
607/// Write a value to a PV (one-shot).
608///
609/// ```rust,ignore
610/// use spvirit_client::{pvput, PvOptions};
611///
612/// pvput(&PvOptions::new("MY:PV".into()), 42.0).await?;
613/// ```
614pub async fn pvput(opts: &PvOptions, value: impl Into<Value>) -> Result<(), PvGetError> {
615    let client = client_from_opts(opts);
616    client.pvput(&opts.pv_name, value).await
617}
618
619/// Subscribe to a PV and receive live updates (one-shot).
620///
621/// The callback returns [`ControlFlow::Continue`] to keep listening or
622/// [`ControlFlow::Break`] to stop.
623pub async fn pvmonitor<F>(opts: &PvOptions, callback: F) -> Result<(), PvGetError>
624where
625    F: FnMut(&DecodedValue) -> ControlFlow<()>,
626{
627    let client = client_from_opts(opts);
628    client.pvmonitor(&opts.pv_name, callback).await
629}
630
631/// Retrieve the field/structure description for a PV (one-shot).
632pub async fn pvinfo(opts: &PvOptions) -> Result<StructureDesc, PvGetError> {
633    let client = client_from_opts(opts);
634    client.pvinfo(&opts.pv_name).await
635}
636
637// ─── Internal helpers ────────────────────────────────────────────────────────
638
639/// Build a PvaClient inheriting configuration from PvOptions.
640pub fn client_from_opts(opts: &PvOptions) -> PvaClient {
641    let mut b = PvaClient::builder()
642        .port(opts.tcp_port)
643        .udp_port(opts.udp_port)
644        .timeout(opts.timeout);
645    if opts.no_broadcast {
646        b = b.no_broadcast();
647    }
648    for ns in &opts.name_servers {
649        b = b.name_server(*ns);
650    }
651    if let Some(ref u) = opts.authnz_user {
652        b = b.authnz_user(u.clone());
653    }
654    if let Some(ref h) = opts.authnz_host {
655        b = b.authnz_host(h.clone());
656    }
657    if let Some(addr) = opts.server_addr {
658        b = b.server_addr(addr);
659    }
660    if let Some(addr) = opts.search_addr {
661        b = b.search_addr(addr);
662    }
663    if let Some(addr) = opts.bind_addr {
664        b = b.bind_addr(addr);
665    }
666    if opts.debug {
667        b = b.debug();
668    }
669    b.build()
670}
671
672/// Decode an INIT response to extract the introspection StructureDesc.
673fn decode_init_introspection(raw: &[u8], label: &str) -> Result<StructureDesc, PvGetError> {
674    let mut pkt = PvaPacket::new(raw);
675    let cmd = pkt
676        .decode_payload()
677        .ok_or_else(|| PvGetError::Decode(format!("{label} init response decode failed")))?;
678
679    match cmd {
680        PvaPacketCommand::Op(op) => {
681            if let Some(ref st) = op.status {
682                if st.is_error() {
683                    let msg = st
684                        .message
685                        .clone()
686                        .unwrap_or_else(|| format!("code={}", st.code));
687                    return Err(PvGetError::Protocol(format!("{label} init error: {msg}")));
688                }
689            }
690            op.introspection
691                .ok_or_else(|| PvGetError::Decode(format!("missing {label} introspection")))
692        }
693        _ => Err(PvGetError::Protocol(format!(
694            "unexpected {label} init response"
695        ))),
696    }
697}
698
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built client carries the builder's defaults.
    #[test]
    fn builder_defaults() {
        let client = PvaClient::builder().build();
        assert_eq!(client.tcp_port, 5075);
        assert_eq!(client.udp_port, 5076);
        assert_eq!(client.timeout, Duration::from_secs(5));
        assert!(!client.no_broadcast);
        assert!(client.name_servers.is_empty());
    }

    /// Every setter used here is reflected in the built client.
    #[test]
    fn builder_overrides() {
        let name_server: SocketAddr = "127.0.0.1:5075".parse().unwrap();
        let client = PvaClient::builder()
            .port(9075)
            .udp_port(9076)
            .timeout(Duration::from_secs(10))
            .no_broadcast()
            .name_server(name_server)
            .authnz_user("testuser")
            .authnz_host("testhost")
            .build();

        assert_eq!(client.tcp_port, 9075);
        assert_eq!(client.udp_port, 9076);
        assert_eq!(client.timeout, Duration::from_secs(10));
        assert!(client.no_broadcast);
        assert_eq!(client.name_servers.len(), 1);
        assert_eq!(client.authnz_user.as_deref(), Some("testuser"));
        assert_eq!(client.authnz_host.as_deref(), Some("testhost"));
    }

    /// Options derived from a client inherit its configuration and take the
    /// PV name that was passed in.
    #[test]
    fn opts_inherits_client_config() {
        let client = PvaClient::builder()
            .port(9075)
            .udp_port(9076)
            .timeout(Duration::from_secs(10))
            .no_broadcast()
            .build();

        let options = client.opts("TEST:PV");

        assert_eq!(options.pv_name, "TEST:PV");
        assert_eq!(options.tcp_port, 9075);
        assert_eq!(options.udp_port, 9076);
        assert_eq!(options.timeout, Duration::from_secs(10));
        assert!(options.no_broadcast);
    }

    /// Settings placed on `PvOptions` survive the conversion to a client.
    #[test]
    fn client_from_opts_roundtrip() {
        let mut options = PvOptions::new("X:Y".into());
        options.tcp_port = 8075;
        options.udp_port = 8076;
        options.timeout = Duration::from_secs(3);
        options.no_broadcast = true;

        let client = client_from_opts(&options);

        assert_eq!(client.tcp_port, 8075);
        assert_eq!(client.udp_port, 8076);
        assert!(client.no_broadcast);
    }

    /// `PvGetOptions` is a type alias for `PvOptions`; this checks that it
    /// compiles and behaves the same.
    #[test]
    fn pv_get_options_alias_works() {
        let options: crate::types::PvGetOptions = PvOptions::new("ALIAS:TEST".into());
        assert_eq!(options.pv_name, "ALIAS:TEST");
    }
}