Skip to main content

spvirit_client/
pvlist.rs

1//! PV listing — discover available PV names from a PVA server.
2//!
3//! Provides multiple discovery strategies (tried in order by
4//! [`pvlist_with_fallback`]):
5//!
6//! 1. `__pvlist` GET (preferred — spvirit / EPICS7 servers)
7//! 2. Connection-level GET_FIELD (legacy, opt-in via env var)
8//! 3. Server RPC `op=channels`
9//! 4. Server GET with heuristic parsing
10
11use std::net::SocketAddr;
12
13use tokio::io::AsyncWriteExt;
14use tokio::net::TcpStream;
15use tokio::time::timeout;
16
17use spvirit_codec::epics_decode::{PvaPacket, PvaPacketCommand};
18use spvirit_codec::spvirit_encode::encode_rpc_request;
19use spvirit_codec::spvd_decode::{
20    extract_nt_scalar_value, DecodedValue, FieldDesc, FieldType, PvdDecoder, StructureDesc,
21};
22use spvirit_codec::spvd_encode::{encode_string_pvd, encode_structure_desc};
23
24use crate::client::{
25    build_client_validation, encode_get_field_request, encode_get_request, establish_channel,
26    pvget, ChannelConn,
27};
28use crate::transport::{read_packet, read_until};
29use crate::types::{PvGetError, PvOptions};
30
31/// Which discovery strategy succeeded.
32#[derive(Clone, Copy, Debug, PartialEq, Eq)]
33pub enum PvListSource {
34    PvList,
35    GetField,
36    ServerRpc,
37    ServerGet,
38}
39
40// ─── Helpers ─────────────────────────────────────────────────────────────────
41
42const PV_REQUEST_EMPTY: [u8; 6] = [0xfd, 0x02, 0x00, 0x80, 0x00, 0x00];
43
44/// Sort, deduplicate, and remove empty entries.
45pub fn normalize_pv_names(mut names: Vec<String>) -> Vec<String> {
46    names.retain(|name| !name.trim().is_empty());
47    names.sort();
48    names.dedup();
49    names
50}
51
52/// Extract PV names from a decoded `__pvlist` value (NTScalarArray of strings).
53pub fn parse_pvlist_value(value: &DecodedValue) -> Option<Vec<String>> {
54    let root = extract_nt_scalar_value(value).unwrap_or(value);
55    let DecodedValue::Array(items) = root else {
56        return None;
57    };
58    let mut out = Vec::with_capacity(items.len());
59    for item in items {
60        if let DecodedValue::String(name) = item {
61            out.push(name.clone());
62        } else {
63            return None;
64        }
65    }
66    Some(out)
67}
68
69fn candidate_server_addrs(opts: &PvOptions, server_addr: SocketAddr) -> Vec<SocketAddr> {
70    let mut out = vec![server_addr];
71    let default_addr = SocketAddr::new(server_addr.ip(), opts.tcp_port);
72    if default_addr != server_addr {
73        out.push(default_addr);
74    }
75    out
76}
77
78fn is_get_field_fallback_enabled() -> bool {
79    match std::env::var("EPICS_PVA_ENABLE_GET_FIELD_FALLBACK") {
80        Ok(v) => {
81            let v = v.trim().to_ascii_uppercase();
82            v == "YES" || v == "Y" || v == "1" || v == "TRUE"
83        }
84        Err(_) => false,
85    }
86}
87
88fn collect_strings_from_decoded(value: &DecodedValue, out: &mut Vec<String>) {
89    match value {
90        DecodedValue::String(s) => out.push(s.clone()),
91        DecodedValue::Array(items) => {
92            for item in items {
93                collect_strings_from_decoded(item, out);
94            }
95        }
96        DecodedValue::Structure(fields) => {
97            for (_, item) in fields {
98                collect_strings_from_decoded(item, out);
99            }
100        }
101        _ => {}
102    }
103}
104
105fn looks_like_pv_name(candidate: &str) -> bool {
106    if candidate.is_empty() || candidate.len() > 128 {
107        return false;
108    }
109    if candidate.chars().any(|c| c.is_whitespace()) {
110        return false;
111    }
112    let lower = candidate.to_ascii_lowercase();
113    let deny = [
114        "value",
115        "alarm",
116        "timestamp",
117        "display",
118        "control",
119        "severity",
120        "message",
121        "seconds",
122        "nanoseconds",
123        "units",
124    ];
125    if deny.iter().any(|d| lower == *d) {
126        return false;
127    }
128    if lower.starts_with("epics:") {
129        return false;
130    }
131    true
132}
133
134fn extract_ascii_candidates(raw: &[u8], out: &mut Vec<String>) {
135    let mut i = 0usize;
136    while i < raw.len() {
137        if raw[i].is_ascii_alphanumeric() {
138            let start = i;
139            i += 1;
140            while i < raw.len() {
141                let b = raw[i];
142                if b.is_ascii_alphanumeric()
143                    || b == b':'
144                    || b == b'.'
145                    || b == b'_'
146                    || b == b'-'
147                    || b == b'/'
148                {
149                    i += 1;
150                } else {
151                    break;
152                }
153            }
154            let len = i - start;
155            if (3..=128).contains(&len) {
156                if let Ok(s) = std::str::from_utf8(&raw[start..start + len]) {
157                    out.push(s.to_string());
158                }
159            }
160        } else {
161            i += 1;
162        }
163    }
164}
165
166fn encode_server_rpc_channels_request(is_be: bool) -> Vec<u8> {
167    let desc = StructureDesc {
168        struct_id: Some("epics:nt/NTURI:1.0".to_string()),
169        fields: vec![
170            FieldDesc {
171                name: "scheme".to_string(),
172                field_type: FieldType::String,
173            },
174            FieldDesc {
175                name: "path".to_string(),
176                field_type: FieldType::String,
177            },
178            FieldDesc {
179                name: "query".to_string(),
180                field_type: FieldType::Structure(StructureDesc {
181                    struct_id: None,
182                    fields: vec![FieldDesc {
183                        name: "op".to_string(),
184                        field_type: FieldType::String,
185                    }],
186                }),
187            },
188        ],
189    };
190
191    let mut out = Vec::new();
192    out.push(0x80);
193    out.extend_from_slice(&encode_structure_desc(&desc, is_be));
194    out.extend_from_slice(&encode_string_pvd("pva", is_be));
195    out.extend_from_slice(&encode_string_pvd("server", is_be));
196    out.extend_from_slice(&encode_string_pvd("channels", is_be));
197    out
198}
199
200// ─── Listing strategies ──────────────────────────────────────────────────────
201
202/// List PVs via the `__pvlist` channel (preferred).
203async fn list_pvs_via_pvlist(
204    opts: &PvOptions,
205    server_addr: SocketAddr,
206) -> Result<Vec<String>, PvGetError> {
207    let mut get_opts = opts.clone();
208    get_opts.pv_name = "__pvlist".to_string();
209    get_opts.server_addr = Some(server_addr);
210    let result = pvget(&get_opts).await?;
211    let names = parse_pvlist_value(&result.value)
212        .ok_or_else(|| PvGetError::Decode("failed to decode __pvlist value".to_string()))?;
213    Ok(normalize_pv_names(names))
214}
215
216/// List PVs via connection-level GET_FIELD.
217pub async fn list_pvs_via_get_field(
218    opts: &PvOptions,
219    server_addr: SocketAddr,
220    field_pattern: Option<&str>,
221) -> Result<Vec<String>, PvGetError> {
222    let mut stream = timeout(opts.timeout, TcpStream::connect(server_addr))
223        .await
224        .map_err(|_| PvGetError::Timeout("connect"))??;
225
226    let mut version = 2u8;
227    let mut is_be = false;
228
229    for _ in 0..2 {
230        if let Ok(bytes) = read_packet(&mut stream, opts.timeout).await {
231            let mut pkt = PvaPacket::new(&bytes);
232            if let Some(cmd) = pkt.decode_payload() {
233                match cmd {
234                    PvaPacketCommand::Control(payload) => {
235                        if payload.command == 2 {
236                            is_be = pkt.header.flags.is_msb;
237                        }
238                    }
239                    PvaPacketCommand::ConnectionValidation(_) => {
240                        version = pkt.header.version;
241                        is_be = pkt.header.flags.is_msb;
242                    }
243                    _ => {}
244                }
245            }
246        }
247    }
248
249    let validation = build_client_validation(opts, version, is_be);
250    stream.write_all(&validation).await?;
251
252    let _ = read_until(&mut stream, opts.timeout, |cmd| {
253        matches!(cmd, PvaPacketCommand::ConnectionValidated(_))
254    })
255    .await?;
256
257    let get_field = encode_get_field_request(0, 0, field_pattern, version, is_be);
258    stream.write_all(&get_field).await?;
259
260    let field_resp = read_until(
261        &mut stream,
262        opts.timeout,
263        |cmd| matches!(cmd, PvaPacketCommand::GetField(payload) if payload.is_server),
264    )
265    .await?;
266    let mut pkt = PvaPacket::new(&field_resp);
267    let cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
268        "get_field listing decode failed".to_string(),
269    ))?;
270    let PvaPacketCommand::GetField(payload) = cmd else {
271        return Err(PvGetError::Protocol(
272            "unexpected GET_FIELD response".to_string(),
273        ));
274    };
275
276    if payload.status.as_ref().is_some_and(|s| s.is_error()) {
277        let detail = payload
278            .status
279            .as_ref()
280            .map(ToString::to_string)
281            .unwrap_or_default();
282        return Err(PvGetError::Protocol(format!(
283            "get_field listing error: {}",
284            detail
285        )));
286    }
287
288    let desc = payload
289        .introspection
290        .ok_or_else(|| PvGetError::Decode("missing GET_FIELD introspection".to_string()))?;
291
292    let names = desc.fields.into_iter().map(|f| f.name).collect::<Vec<_>>();
293    Ok(normalize_pv_names(names))
294}
295
296/// List PVs via server RPC on a specific channel name.
297async fn list_pvs_via_server_rpc_channel(
298    opts: &PvOptions,
299    server_addr: SocketAddr,
300    rpc_channel: &str,
301) -> Result<Vec<String>, PvGetError> {
302    let mut rpc_opts = opts.clone();
303    rpc_opts.pv_name = rpc_channel.to_string();
304    let ChannelConn {
305        mut stream,
306        sid,
307        version,
308        is_be,
309    } = establish_channel(server_addr, &rpc_opts).await?;
310
311    let ioid = 1u32;
312    let rpc_init = encode_rpc_request(sid, ioid, 0x08, &PV_REQUEST_EMPTY, version, is_be);
313    stream.write_all(&rpc_init).await?;
314
315    let init_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
316        PvaPacketCommand::Op(op) => op.command == 20 && op.ioid == ioid && (op.subcmd & 0x08) != 0,
317        _ => false,
318    })
319    .await?;
320    let mut pkt = PvaPacket::new(&init_resp);
321    let init_cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
322        "rpc init decode failed".to_string(),
323    ))?;
324    if let PvaPacketCommand::Op(op) = init_cmd {
325        if op.status.as_ref().is_some_and(|s| s.is_error()) {
326            let detail = op
327                .status
328                .as_ref()
329                .map(ToString::to_string)
330                .unwrap_or_default();
331            return Err(PvGetError::Protocol(format!("rpc init failed: {}", detail)));
332        }
333    }
334
335    let rpc_payload = encode_server_rpc_channels_request(is_be);
336    let rpc_req = encode_rpc_request(sid, ioid, 0x00, &rpc_payload, version, is_be);
337    stream.write_all(&rpc_req).await?;
338
339    let rpc_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
340        PvaPacketCommand::Op(op) => op.command == 20 && op.ioid == ioid && op.subcmd == 0x00,
341        _ => false,
342    })
343    .await?;
344    let mut pkt = PvaPacket::new(&rpc_resp);
345    let rpc_cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
346        "rpc response decode failed".to_string(),
347    ))?;
348    let PvaPacketCommand::Op(op) = rpc_cmd else {
349        return Err(PvGetError::Protocol("unexpected RPC response".to_string()));
350    };
351    if op.status.as_ref().is_some_and(|s| s.is_error()) {
352        let detail = op
353            .status
354            .as_ref()
355            .map(ToString::to_string)
356            .unwrap_or_default();
357        return Err(PvGetError::Protocol(format!(
358            "rpc execute failed: {}",
359            detail
360        )));
361    }
362
363    if op.body.is_empty() {
364        return Err(PvGetError::Decode("empty RPC response".to_string()));
365    }
366
367    let decoder = PvdDecoder::new(is_be);
368    let (desc, consumed) = decoder
369        .parse_introspection_with_len(&op.body)
370        .ok_or_else(|| PvGetError::Decode("RPC missing introspection".to_string()))?;
371    let value_raw = op
372        .body
373        .get(consumed..)
374        .ok_or_else(|| PvGetError::Decode("RPC malformed payload".to_string()))?;
375    let (decoded, _) = decoder
376        .decode_structure(value_raw, &desc)
377        .ok_or_else(|| PvGetError::Decode("RPC decode failed".to_string()))?;
378
379    let mut strings = Vec::new();
380    collect_strings_from_decoded(&decoded, &mut strings);
381    if strings.is_empty() {
382        return Err(PvGetError::Decode(
383            "RPC list returned no strings".to_string(),
384        ));
385    }
386    Ok(normalize_pv_names(strings))
387}
388
389/// List PVs via server RPC, trying `"server"` then `"__server"`.
390pub async fn list_pvs_via_server_rpc(
391    opts: &PvOptions,
392    server_addr: SocketAddr,
393) -> Result<Vec<String>, PvGetError> {
394    let mut errs = Vec::new();
395    for channel in ["server", "__server"] {
396        match list_pvs_via_server_rpc_channel(opts, server_addr, channel).await {
397            Ok(names) => return Ok(names),
398            Err(err) => errs.push(format!("{}: {}", channel, err)),
399        }
400    }
401    Err(PvGetError::Protocol(format!(
402        "server RPC unavailable: {}",
403        errs.join(" | ")
404    )))
405}
406
407/// List PVs via server GET, trying `"server"` then `"__server"`.
408pub async fn list_pvs_via_server_get(
409    opts: &PvOptions,
410    server_addr: SocketAddr,
411) -> Result<Vec<String>, PvGetError> {
412    async fn get_channel(
413        opts: &PvOptions,
414        server_addr: SocketAddr,
415        channel: &str,
416    ) -> Result<Vec<String>, PvGetError> {
417        let mut get_opts = opts.clone();
418        get_opts.pv_name = channel.to_string();
419        let ChannelConn {
420            mut stream,
421            sid,
422            version,
423            is_be,
424        } = establish_channel(server_addr, &get_opts).await?;
425
426        let ioid = 1u32;
427        let init_req =
428            encode_get_request(sid, ioid, 0x08, &PV_REQUEST_EMPTY, version, is_be);
429        stream.write_all(&init_req).await?;
430        let init_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
431            PvaPacketCommand::Op(op) => {
432                op.command == 10 && op.ioid == ioid && (op.subcmd & 0x08) != 0
433            }
434            _ => false,
435        })
436        .await?;
437        let mut pkt = PvaPacket::new(&init_resp);
438        let init_cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
439            "server get init decode failed".to_string(),
440        ))?;
441        let init_desc = match init_cmd {
442            PvaPacketCommand::Op(op) => {
443                if op.status.as_ref().is_some_and(|s| s.is_error()) {
444                    let detail = op
445                        .status
446                        .as_ref()
447                        .map(ToString::to_string)
448                        .unwrap_or_default();
449                    return Err(PvGetError::Protocol(format!(
450                        "server GET init failed: {}",
451                        detail
452                    )));
453                }
454                op.introspection
455            }
456            _ => None,
457        };
458
459        let data_req = encode_get_request(sid, ioid, 0x00, &[], version, is_be);
460        stream.write_all(&data_req).await?;
461        let data_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
462            PvaPacketCommand::Op(op) => op.command == 10 && op.ioid == ioid && op.subcmd == 0x00,
463            _ => false,
464        })
465        .await?;
466        let mut pkt = PvaPacket::new(&data_resp);
467        let data_cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
468            "server get data decode failed".to_string(),
469        ))?;
470
471        let PvaPacketCommand::Op(mut op) = data_cmd else {
472            return Err(PvGetError::Protocol(
473                "unexpected GET data response".to_string(),
474            ));
475        };
476        if op.status.as_ref().is_some_and(|s| s.is_error()) {
477            let detail = op
478                .status
479                .as_ref()
480                .map(ToString::to_string)
481                .unwrap_or_default();
482            return Err(PvGetError::Protocol(format!(
483                "server GET data failed: {}",
484                detail
485            )));
486        }
487
488        let mut names = Vec::new();
489        names.extend(op.pv_names.clone());
490        extract_ascii_candidates(&op.body, &mut names);
491        if let Some(desc) = &init_desc {
492            for field in &desc.fields {
493                names.push(field.name.clone());
494            }
495            op.decode_with_field_desc(desc, is_be);
496            if let Some(decoded) = &op.decoded_value {
497                collect_strings_from_decoded(decoded, &mut names);
498            }
499        }
500
501        let mut names = normalize_pv_names(names);
502        names.retain(|n| looks_like_pv_name(n));
503        if names.is_empty() {
504            return Err(PvGetError::Decode(
505                "server GET returned no PV-like names".to_string(),
506            ));
507        }
508        Ok(names)
509    }
510
511    let mut errs = Vec::new();
512    for channel in ["server", "__server"] {
513        match get_channel(opts, server_addr, channel).await {
514            Ok(names) => return Ok(names),
515            Err(err) => errs.push(format!("{}: {}", channel, err)),
516        }
517    }
518    Err(PvGetError::Protocol(format!(
519        "server GET unavailable: {}",
520        errs.join(" | ")
521    )))
522}
523
524// ─── Public API ──────────────────────────────────────────────────────────────
525
526/// List PV names from a server using `__pvlist` GET (preferred method).
527pub async fn pvlist(
528    opts: &PvOptions,
529    server_addr: SocketAddr,
530) -> Result<Vec<String>, PvGetError> {
531    list_pvs_via_pvlist(opts, server_addr).await
532}
533
534/// List PV names with automatic fallback through all strategies.
535///
536/// Tries (in order): `__pvlist` → GET_FIELD (opt-in) → Server RPC → Server GET.
537pub async fn pvlist_with_fallback(
538    opts: &PvOptions,
539    server_addr: SocketAddr,
540) -> Result<(Vec<String>, PvListSource), PvGetError> {
541    pvlist_with_fallback_progress(opts, server_addr, |_| {}).await
542}
543
544/// List PV names with fallback and progress callback.
545pub async fn pvlist_with_fallback_progress<F>(
546    opts: &PvOptions,
547    server_addr: SocketAddr,
548    mut on_progress: F,
549) -> Result<(Vec<String>, PvListSource), PvGetError>
550where
551    F: FnMut(&str),
552{
553    let addrs = candidate_server_addrs(opts, server_addr);
554    let mut attempts = Vec::new();
555    let get_field_fallback = is_get_field_fallback_enabled();
556
557    if addrs.len() > 1 {
558        on_progress(&format!(
559            "Trying {} candidate server endpoints...",
560            addrs.len()
561        ));
562    }
563    if !get_field_fallback {
564        on_progress(
565            "GET_FIELD fallback is disabled by default (set EPICS_PVA_ENABLE_GET_FIELD_FALLBACK=YES to enable)",
566        );
567    }
568
569    for addr in addrs {
570        on_progress(&format!("Trying __pvlist on {}", addr));
571        let primary = list_pvs_via_pvlist(opts, addr).await;
572        match primary {
573            Ok(names) => return Ok((normalize_pv_names(names), PvListSource::PvList)),
574            Err(primary_err) => {
575                let get_field_result = if get_field_fallback {
576                    on_progress(&format!(
577                        "__pvlist unavailable on {}; trying GET_FIELD(*)",
578                        addr
579                    ));
580                    let fallback_star = list_pvs_via_get_field(opts, addr, Some("*")).await;
581                    match fallback_star {
582                        Ok(names) => {
583                            return Ok((normalize_pv_names(names), PvListSource::GetField));
584                        }
585                        Err(star_err) => {
586                            on_progress(&format!(
587                                "GET_FIELD(*) unavailable on {}; trying GET_FIELD(<empty>)",
588                                addr
589                            ));
590                            let fallback_empty = list_pvs_via_get_field(opts, addr, None).await;
591                            match fallback_empty {
592                                Ok(names) => {
593                                    return Ok((normalize_pv_names(names), PvListSource::GetField));
594                                }
595                                Err(empty_err) => Some(format!(
596                                    "GET_FIELD(*): {}; GET_FIELD(<empty>): {}",
597                                    star_err, empty_err
598                                )),
599                            }
600                        }
601                    }
602                } else {
603                    None
604                };
605
606                on_progress(&format!(
607                    "__pvlist unavailable on {}; trying RPC(server)",
608                    addr
609                ));
610                match list_pvs_via_server_rpc(opts, addr).await {
611                    Ok(names) => return Ok((normalize_pv_names(names), PvListSource::ServerRpc)),
612                    Err(rpc_err) => {
613                        on_progress(&format!(
614                            "RPC(server) unavailable on {}; trying GET(server)",
615                            addr
616                        ));
617                        match list_pvs_via_server_get(opts, addr).await {
618                            Ok(names) => {
619                                return Ok((normalize_pv_names(names), PvListSource::ServerGet))
620                            }
621                            Err(get_err) => {
622                                let get_field_msg = get_field_result
623                                    .unwrap_or_else(|| "GET_FIELD: disabled".to_string());
624                                attempts.push(format!(
625                                    "{} => __pvlist: {}; {}; RPC(server): {}; GET(server): {}",
626                                    addr, primary_err, get_field_msg, rpc_err, get_err
627                                ));
628                            }
629                        }
630                    }
631                }
632            }
633        }
634    }
635
636    Err(PvGetError::Protocol(format!(
637        "failed to list PVs from {}: {}",
638        server_addr,
639        attempts.join(" | ")
640    )))
641}
642
643// ─── Tests ───────────────────────────────────────────────────────────────────
644
645#[cfg(test)]
646mod tests {
647    use super::*;
648
649    #[test]
650    fn parse_pvlist_value_extracts_ntscalararray_strings() {
651        let value = DecodedValue::Structure(vec![
652            (
653                "value".to_string(),
654                DecodedValue::Array(vec![
655                    DecodedValue::String("SIM:AI".to_string()),
656                    DecodedValue::String("SIM:AO".to_string()),
657                ]),
658            ),
659            ("alarm".to_string(), DecodedValue::Structure(vec![])),
660        ]);
661
662        let parsed = parse_pvlist_value(&value).expect("parsed");
663        assert_eq!(parsed, vec!["SIM:AI".to_string(), "SIM:AO".to_string()]);
664    }
665
666    #[test]
667    fn normalize_pv_names_sorts_and_deduplicates() {
668        let names = vec!["B".into(), "A".into(), "B".into(), " ".into()];
669        let result = normalize_pv_names(names);
670        assert_eq!(result, vec!["A".to_string(), "B".to_string()]);
671    }
672
673    #[test]
674    fn candidate_server_addrs_adds_default_tcp_port_fallback() {
675        let mut opts = PvOptions::new(String::new());
676        opts.tcp_port = 5075;
677        let addr: SocketAddr = "10.0.0.2:6000".parse().unwrap();
678        let addrs = candidate_server_addrs(&opts, addr);
679        assert_eq!(addrs.len(), 2);
680        assert_eq!(addrs[0], addr);
681        assert_eq!(addrs[1], "10.0.0.2:5075".parse::<SocketAddr>().unwrap());
682    }
683
684    #[test]
685    fn candidate_server_addrs_no_dup_when_same_port() {
686        let mut opts = PvOptions::new(String::new());
687        opts.tcp_port = 6000;
688        let addr: SocketAddr = "10.0.0.2:6000".parse().unwrap();
689        let addrs = candidate_server_addrs(&opts, addr);
690        assert_eq!(addrs.len(), 1);
691    }
692
693    #[test]
694    fn collect_strings_from_decoded_extracts_nested_strings() {
695        let value = DecodedValue::Structure(vec![
696            ("a".to_string(), DecodedValue::String("ONE".to_string())),
697            (
698                "b".to_string(),
699                DecodedValue::Array(vec![DecodedValue::String("TWO".to_string())]),
700            ),
701        ]);
702        let mut out = Vec::new();
703        collect_strings_from_decoded(&value, &mut out);
704        assert_eq!(out, vec!["ONE".to_string(), "TWO".to_string()]);
705    }
706
707    #[test]
708    fn extract_ascii_candidates_finds_pv_like_tokens() {
709        let raw = b"\x00SIM:AI\x00junk\x00IOC-01:PV1\x00";
710        let mut out = Vec::new();
711        extract_ascii_candidates(raw, &mut out);
712        assert!(out.iter().any(|s| s == "SIM:AI"));
713        assert!(out.iter().any(|s| s == "IOC-01:PV1"));
714    }
715
716    #[test]
717    fn looks_like_pv_name_filters_metadata() {
718        assert!(looks_like_pv_name("SIM:AI"));
719        assert!(looks_like_pv_name("IOC-01:PV1"));
720        assert!(!looks_like_pv_name("value"));
721        assert!(!looks_like_pv_name("alarm"));
722        assert!(!looks_like_pv_name("epics:nt/NTScalar:1.0"));
723        assert!(!looks_like_pv_name(""));
724        assert!(!looks_like_pv_name("has space"));
725    }
726
727    #[test]
728    fn encode_server_rpc_channels_request_uses_nturi_channels() {
729        let payload = encode_server_rpc_channels_request(false);
730        assert_eq!(payload.first(), Some(&0x80));
731
732        let decoder = PvdDecoder::new(false);
733        let (desc, consumed) = decoder
734            .parse_introspection_with_len(&payload)
735            .expect("introspection");
736        assert_eq!(desc.struct_id.as_deref(), Some("epics:nt/NTURI:1.0"));
737
738        let (decoded, _) = decoder
739            .decode_structure(&payload[consumed..], &desc)
740            .expect("decode payload");
741        let DecodedValue::Structure(fields) = decoded else {
742            panic!("expected structure");
743        };
744
745        let mut scheme = None;
746        let mut path = None;
747        let mut op = None;
748        for (name, value) in fields {
749            match (name.as_str(), value) {
750                ("scheme", DecodedValue::String(v)) => scheme = Some(v),
751                ("path", DecodedValue::String(v)) => path = Some(v),
752                ("query", DecodedValue::Structure(query_fields)) => {
753                    for (qname, qvalue) in query_fields {
754                        if qname == "op" {
755                            if let DecodedValue::String(v) = qvalue {
756                                op = Some(v);
757                            }
758                        }
759                    }
760                }
761                _ => {}
762            }
763        }
764        assert_eq!(scheme.as_deref(), Some("pva"));
765        assert_eq!(path.as_deref(), Some("server"));
766        assert_eq!(op.as_deref(), Some("channels"));
767    }
768}