Skip to main content

spvirit_client/
pvlist.rs

1//! PV listing — discover available PV names from a PVA server.
2//!
3//! Provides multiple discovery strategies (tried in order by
4//! [`pvlist_with_fallback`]):
5//!
6//! 1. `__pvlist` GET (preferred — spvirit / EPICS7 servers)
7//! 2. Connection-level GET_FIELD (legacy, opt-in via env var)
8//! 3. Server RPC `op=channels`
9//! 4. Server GET with heuristic parsing
10
11use std::net::SocketAddr;
12
13use tokio::io::AsyncWriteExt;
14use tokio::net::TcpStream;
15use tokio::time::timeout;
16
17use spvirit_codec::epics_decode::{PvaPacket, PvaPacketCommand};
18use spvirit_codec::spvd_decode::{
19    DecodedValue, FieldDesc, FieldType, PvdDecoder, StructureDesc, extract_nt_scalar_value,
20};
21use spvirit_codec::spvd_encode::{encode_string_pvd, encode_structure_desc};
22use spvirit_codec::spvirit_encode::encode_rpc_request;
23
24use crate::client::{
25    ChannelConn, build_client_validation, encode_get_field_request, encode_get_request,
26    establish_channel, pvget,
27};
28use crate::transport::{read_packet, read_until};
29use crate::types::{PvGetError, PvOptions};
30
31/// Which discovery strategy succeeded.
32#[derive(Clone, Copy, Debug, PartialEq, Eq)]
33pub enum PvListSource {
34    PvList,
35    GetField,
36    ServerRpc,
37    ServerGet,
38}
39
40// ─── Helpers ─────────────────────────────────────────────────────────────────
41
42const PV_REQUEST_EMPTY: [u8; 6] = [0xfd, 0x02, 0x00, 0x80, 0x00, 0x00];
43
44/// Sort, deduplicate, and remove empty entries.
45pub fn normalize_pv_names(mut names: Vec<String>) -> Vec<String> {
46    names.retain(|name| !name.trim().is_empty());
47    names.sort();
48    names.dedup();
49    names
50}
51
52/// Extract PV names from a decoded `__pvlist` value (NTScalarArray of strings).
53pub fn parse_pvlist_value(value: &DecodedValue) -> Option<Vec<String>> {
54    let root = extract_nt_scalar_value(value).unwrap_or(value);
55    let DecodedValue::Array(items) = root else {
56        return None;
57    };
58    let mut out = Vec::with_capacity(items.len());
59    for item in items {
60        if let DecodedValue::String(name) = item {
61            out.push(name.clone());
62        } else {
63            return None;
64        }
65    }
66    Some(out)
67}
68
69fn candidate_server_addrs(opts: &PvOptions, server_addr: SocketAddr) -> Vec<SocketAddr> {
70    let mut out = vec![server_addr];
71    let default_addr = SocketAddr::new(server_addr.ip(), opts.tcp_port);
72    if default_addr != server_addr {
73        out.push(default_addr);
74    }
75    out
76}
77
78fn is_get_field_fallback_enabled() -> bool {
79    match std::env::var("EPICS_PVA_ENABLE_GET_FIELD_FALLBACK") {
80        Ok(v) => {
81            let v = v.trim().to_ascii_uppercase();
82            v == "YES" || v == "Y" || v == "1" || v == "TRUE"
83        }
84        Err(_) => false,
85    }
86}
87
88fn collect_strings_from_decoded(value: &DecodedValue, out: &mut Vec<String>) {
89    match value {
90        DecodedValue::String(s) => out.push(s.clone()),
91        DecodedValue::Array(items) => {
92            for item in items {
93                collect_strings_from_decoded(item, out);
94            }
95        }
96        DecodedValue::Structure(fields) => {
97            for (_, item) in fields {
98                collect_strings_from_decoded(item, out);
99            }
100        }
101        _ => {}
102    }
103}
104
105fn looks_like_pv_name(candidate: &str) -> bool {
106    if candidate.is_empty() || candidate.len() > 128 {
107        return false;
108    }
109    if candidate.chars().any(|c| c.is_whitespace()) {
110        return false;
111    }
112    let lower = candidate.to_ascii_lowercase();
113    let deny = [
114        "value",
115        "alarm",
116        "timestamp",
117        "display",
118        "control",
119        "severity",
120        "message",
121        "seconds",
122        "nanoseconds",
123        "units",
124    ];
125    if deny.iter().any(|d| lower == *d) {
126        return false;
127    }
128    if lower.starts_with("epics:") {
129        return false;
130    }
131    true
132}
133
134fn extract_ascii_candidates(raw: &[u8], out: &mut Vec<String>) {
135    let mut i = 0usize;
136    while i < raw.len() {
137        if raw[i].is_ascii_alphanumeric() {
138            let start = i;
139            i += 1;
140            while i < raw.len() {
141                let b = raw[i];
142                if b.is_ascii_alphanumeric()
143                    || b == b':'
144                    || b == b'.'
145                    || b == b'_'
146                    || b == b'-'
147                    || b == b'/'
148                {
149                    i += 1;
150                } else {
151                    break;
152                }
153            }
154            let len = i - start;
155            if (3..=128).contains(&len) {
156                if let Ok(s) = std::str::from_utf8(&raw[start..start + len]) {
157                    out.push(s.to_string());
158                }
159            }
160        } else {
161            i += 1;
162        }
163    }
164}
165
166fn encode_server_rpc_channels_request(is_be: bool) -> Vec<u8> {
167    let desc = StructureDesc {
168        struct_id: Some("epics:nt/NTURI:1.0".to_string()),
169        fields: vec![
170            FieldDesc {
171                name: "scheme".to_string(),
172                field_type: FieldType::String,
173            },
174            FieldDesc {
175                name: "path".to_string(),
176                field_type: FieldType::String,
177            },
178            FieldDesc {
179                name: "query".to_string(),
180                field_type: FieldType::Structure(StructureDesc {
181                    struct_id: None,
182                    fields: vec![FieldDesc {
183                        name: "op".to_string(),
184                        field_type: FieldType::String,
185                    }],
186                }),
187            },
188        ],
189    };
190
191    let mut out = Vec::new();
192    out.push(0x80);
193    out.extend_from_slice(&encode_structure_desc(&desc, is_be));
194    out.extend_from_slice(&encode_string_pvd("pva", is_be));
195    out.extend_from_slice(&encode_string_pvd("server", is_be));
196    out.extend_from_slice(&encode_string_pvd("channels", is_be));
197    out
198}
199
200// ─── Listing strategies ──────────────────────────────────────────────────────
201
202/// List PVs via the `__pvlist` channel (preferred).
203async fn list_pvs_via_pvlist(
204    opts: &PvOptions,
205    server_addr: SocketAddr,
206) -> Result<Vec<String>, PvGetError> {
207    let mut get_opts = opts.clone();
208    get_opts.pv_name = "__pvlist".to_string();
209    get_opts.server_addr = Some(server_addr);
210    let result = pvget(&get_opts).await?;
211    let names = parse_pvlist_value(&result.value)
212        .ok_or_else(|| PvGetError::Decode("failed to decode __pvlist value".to_string()))?;
213    Ok(normalize_pv_names(names))
214}
215
216/// List PVs via connection-level GET_FIELD.
217pub async fn list_pvs_via_get_field(
218    opts: &PvOptions,
219    server_addr: SocketAddr,
220    field_pattern: Option<&str>,
221) -> Result<Vec<String>, PvGetError> {
222    let mut stream = timeout(opts.timeout, TcpStream::connect(server_addr))
223        .await
224        .map_err(|_| PvGetError::Timeout("connect"))??;
225
226    let mut version = 2u8;
227    let mut is_be = false;
228
229    for _ in 0..2 {
230        if let Ok(bytes) = read_packet(&mut stream, opts.timeout).await {
231            let mut pkt = PvaPacket::new(&bytes);
232            if let Some(cmd) = pkt.decode_payload() {
233                match cmd {
234                    PvaPacketCommand::Control(payload) => {
235                        if payload.command == 2 {
236                            is_be = pkt.header.flags.is_msb;
237                        }
238                    }
239                    PvaPacketCommand::ConnectionValidation(_) => {
240                        version = pkt.header.version;
241                        is_be = pkt.header.flags.is_msb;
242                    }
243                    _ => {}
244                }
245            }
246        }
247    }
248
249    let validation = build_client_validation(opts, version, is_be);
250    stream.write_all(&validation).await?;
251
252    let _ = read_until(&mut stream, opts.timeout, |cmd| {
253        matches!(cmd, PvaPacketCommand::ConnectionValidated(_))
254    })
255    .await?;
256
257    let get_field = encode_get_field_request(0, 0, field_pattern, version, is_be);
258    stream.write_all(&get_field).await?;
259
260    let field_resp = read_until(
261        &mut stream,
262        opts.timeout,
263        |cmd| matches!(cmd, PvaPacketCommand::GetField(payload) if payload.is_server),
264    )
265    .await?;
266    let mut pkt = PvaPacket::new(&field_resp);
267    let cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
268        "get_field listing decode failed".to_string(),
269    ))?;
270    let PvaPacketCommand::GetField(payload) = cmd else {
271        return Err(PvGetError::Protocol(
272            "unexpected GET_FIELD response".to_string(),
273        ));
274    };
275
276    if payload.status.as_ref().is_some_and(|s| s.is_error()) {
277        let detail = payload
278            .status
279            .as_ref()
280            .map(ToString::to_string)
281            .unwrap_or_default();
282        return Err(PvGetError::Protocol(format!(
283            "get_field listing error: {}",
284            detail
285        )));
286    }
287
288    let desc = payload
289        .introspection
290        .ok_or_else(|| PvGetError::Decode("missing GET_FIELD introspection".to_string()))?;
291
292    let names = desc.fields.into_iter().map(|f| f.name).collect::<Vec<_>>();
293    Ok(normalize_pv_names(names))
294}
295
296/// List PVs via server RPC on a specific channel name.
297async fn list_pvs_via_server_rpc_channel(
298    opts: &PvOptions,
299    server_addr: SocketAddr,
300    rpc_channel: &str,
301) -> Result<Vec<String>, PvGetError> {
302    let mut rpc_opts = opts.clone();
303    rpc_opts.pv_name = rpc_channel.to_string();
304    let ChannelConn {
305        mut stream,
306        sid,
307        version,
308        is_be,
309    } = establish_channel(server_addr, &rpc_opts).await?;
310
311    let ioid = 1u32;
312    let rpc_init = encode_rpc_request(sid, ioid, 0x08, &PV_REQUEST_EMPTY, version, is_be);
313    stream.write_all(&rpc_init).await?;
314
315    let init_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
316        PvaPacketCommand::Op(op) => op.command == 20 && op.ioid == ioid && (op.subcmd & 0x08) != 0,
317        _ => false,
318    })
319    .await?;
320    let mut pkt = PvaPacket::new(&init_resp);
321    let init_cmd = pkt
322        .decode_payload()
323        .ok_or(PvGetError::Protocol("rpc init decode failed".to_string()))?;
324    if let PvaPacketCommand::Op(op) = init_cmd {
325        if op.status.as_ref().is_some_and(|s| s.is_error()) {
326            let detail = op
327                .status
328                .as_ref()
329                .map(ToString::to_string)
330                .unwrap_or_default();
331            return Err(PvGetError::Protocol(format!("rpc init failed: {}", detail)));
332        }
333    }
334
335    let rpc_payload = encode_server_rpc_channels_request(is_be);
336    let rpc_req = encode_rpc_request(sid, ioid, 0x00, &rpc_payload, version, is_be);
337    stream.write_all(&rpc_req).await?;
338
339    let rpc_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
340        PvaPacketCommand::Op(op) => op.command == 20 && op.ioid == ioid && op.subcmd == 0x00,
341        _ => false,
342    })
343    .await?;
344    let mut pkt = PvaPacket::new(&rpc_resp);
345    let rpc_cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
346        "rpc response decode failed".to_string(),
347    ))?;
348    let PvaPacketCommand::Op(op) = rpc_cmd else {
349        return Err(PvGetError::Protocol("unexpected RPC response".to_string()));
350    };
351    if op.status.as_ref().is_some_and(|s| s.is_error()) {
352        let detail = op
353            .status
354            .as_ref()
355            .map(ToString::to_string)
356            .unwrap_or_default();
357        return Err(PvGetError::Protocol(format!(
358            "rpc execute failed: {}",
359            detail
360        )));
361    }
362
363    if op.body.is_empty() {
364        return Err(PvGetError::Decode("empty RPC response".to_string()));
365    }
366
367    let decoder = PvdDecoder::new(is_be);
368    let (desc, consumed) = decoder
369        .parse_introspection_with_len(&op.body)
370        .ok_or_else(|| PvGetError::Decode("RPC missing introspection".to_string()))?;
371    let value_raw = op
372        .body
373        .get(consumed..)
374        .ok_or_else(|| PvGetError::Decode("RPC malformed payload".to_string()))?;
375    let (decoded, _) = decoder
376        .decode_structure(value_raw, &desc)
377        .ok_or_else(|| PvGetError::Decode("RPC decode failed".to_string()))?;
378
379    let mut strings = Vec::new();
380    collect_strings_from_decoded(&decoded, &mut strings);
381    if strings.is_empty() {
382        return Err(PvGetError::Decode(
383            "RPC list returned no strings".to_string(),
384        ));
385    }
386    Ok(normalize_pv_names(strings))
387}
388
389/// List PVs via server RPC, trying `"server"` then `"__server"`.
390pub async fn list_pvs_via_server_rpc(
391    opts: &PvOptions,
392    server_addr: SocketAddr,
393) -> Result<Vec<String>, PvGetError> {
394    let mut errs = Vec::new();
395    for channel in ["server", "__server"] {
396        match list_pvs_via_server_rpc_channel(opts, server_addr, channel).await {
397            Ok(names) => return Ok(names),
398            Err(err) => errs.push(format!("{}: {}", channel, err)),
399        }
400    }
401    Err(PvGetError::Protocol(format!(
402        "server RPC unavailable: {}",
403        errs.join(" | ")
404    )))
405}
406
407/// List PVs via server GET, trying `"server"` then `"__server"`.
408pub async fn list_pvs_via_server_get(
409    opts: &PvOptions,
410    server_addr: SocketAddr,
411) -> Result<Vec<String>, PvGetError> {
412    async fn get_channel(
413        opts: &PvOptions,
414        server_addr: SocketAddr,
415        channel: &str,
416    ) -> Result<Vec<String>, PvGetError> {
417        let mut get_opts = opts.clone();
418        get_opts.pv_name = channel.to_string();
419        let ChannelConn {
420            mut stream,
421            sid,
422            version,
423            is_be,
424        } = establish_channel(server_addr, &get_opts).await?;
425
426        let ioid = 1u32;
427        let init_req = encode_get_request(sid, ioid, 0x08, &PV_REQUEST_EMPTY, version, is_be);
428        stream.write_all(&init_req).await?;
429        let init_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
430            PvaPacketCommand::Op(op) => {
431                op.command == 10 && op.ioid == ioid && (op.subcmd & 0x08) != 0
432            }
433            _ => false,
434        })
435        .await?;
436        let mut pkt = PvaPacket::new(&init_resp);
437        let init_cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
438            "server get init decode failed".to_string(),
439        ))?;
440        let init_desc = match init_cmd {
441            PvaPacketCommand::Op(op) => {
442                if op.status.as_ref().is_some_and(|s| s.is_error()) {
443                    let detail = op
444                        .status
445                        .as_ref()
446                        .map(ToString::to_string)
447                        .unwrap_or_default();
448                    return Err(PvGetError::Protocol(format!(
449                        "server GET init failed: {}",
450                        detail
451                    )));
452                }
453                op.introspection
454            }
455            _ => None,
456        };
457
458        let data_req = encode_get_request(sid, ioid, 0x00, &[], version, is_be);
459        stream.write_all(&data_req).await?;
460        let data_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
461            PvaPacketCommand::Op(op) => op.command == 10 && op.ioid == ioid && op.subcmd == 0x00,
462            _ => false,
463        })
464        .await?;
465        let mut pkt = PvaPacket::new(&data_resp);
466        let data_cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
467            "server get data decode failed".to_string(),
468        ))?;
469
470        let PvaPacketCommand::Op(mut op) = data_cmd else {
471            return Err(PvGetError::Protocol(
472                "unexpected GET data response".to_string(),
473            ));
474        };
475        if op.status.as_ref().is_some_and(|s| s.is_error()) {
476            let detail = op
477                .status
478                .as_ref()
479                .map(ToString::to_string)
480                .unwrap_or_default();
481            return Err(PvGetError::Protocol(format!(
482                "server GET data failed: {}",
483                detail
484            )));
485        }
486
487        let mut names = Vec::new();
488        names.extend(op.pv_names.clone());
489        extract_ascii_candidates(&op.body, &mut names);
490        if let Some(desc) = &init_desc {
491            for field in &desc.fields {
492                names.push(field.name.clone());
493            }
494            op.decode_with_field_desc(desc, is_be);
495            if let Some(decoded) = &op.decoded_value {
496                collect_strings_from_decoded(decoded, &mut names);
497            }
498        }
499
500        let mut names = normalize_pv_names(names);
501        names.retain(|n| looks_like_pv_name(n));
502        if names.is_empty() {
503            return Err(PvGetError::Decode(
504                "server GET returned no PV-like names".to_string(),
505            ));
506        }
507        Ok(names)
508    }
509
510    let mut errs = Vec::new();
511    for channel in ["server", "__server"] {
512        match get_channel(opts, server_addr, channel).await {
513            Ok(names) => return Ok(names),
514            Err(err) => errs.push(format!("{}: {}", channel, err)),
515        }
516    }
517    Err(PvGetError::Protocol(format!(
518        "server GET unavailable: {}",
519        errs.join(" | ")
520    )))
521}
522
523// ─── Public API ──────────────────────────────────────────────────────────────
524
525/// List PV names from a server using `__pvlist` GET (preferred method).
526pub async fn pvlist(opts: &PvOptions, server_addr: SocketAddr) -> Result<Vec<String>, PvGetError> {
527    list_pvs_via_pvlist(opts, server_addr).await
528}
529
530/// List PV names with automatic fallback through all strategies.
531///
532/// Tries (in order): `__pvlist` → GET_FIELD (opt-in) → Server RPC → Server GET.
533pub async fn pvlist_with_fallback(
534    opts: &PvOptions,
535    server_addr: SocketAddr,
536) -> Result<(Vec<String>, PvListSource), PvGetError> {
537    pvlist_with_fallback_progress(opts, server_addr, |_| {}).await
538}
539
540/// List PV names with fallback and progress callback.
541pub async fn pvlist_with_fallback_progress<F>(
542    opts: &PvOptions,
543    server_addr: SocketAddr,
544    mut on_progress: F,
545) -> Result<(Vec<String>, PvListSource), PvGetError>
546where
547    F: FnMut(&str),
548{
549    let addrs = candidate_server_addrs(opts, server_addr);
550    let mut attempts = Vec::new();
551    let get_field_fallback = is_get_field_fallback_enabled();
552
553    if addrs.len() > 1 {
554        on_progress(&format!(
555            "Trying {} candidate server endpoints...",
556            addrs.len()
557        ));
558    }
559    if !get_field_fallback {
560        on_progress(
561            "GET_FIELD fallback is disabled by default (set EPICS_PVA_ENABLE_GET_FIELD_FALLBACK=YES to enable)",
562        );
563    }
564
565    for addr in addrs {
566        on_progress(&format!("Trying __pvlist on {}", addr));
567        let primary = list_pvs_via_pvlist(opts, addr).await;
568        match primary {
569            Ok(names) => return Ok((normalize_pv_names(names), PvListSource::PvList)),
570            Err(primary_err) => {
571                let get_field_result = if get_field_fallback {
572                    on_progress(&format!(
573                        "__pvlist unavailable on {}; trying GET_FIELD(*)",
574                        addr
575                    ));
576                    let fallback_star = list_pvs_via_get_field(opts, addr, Some("*")).await;
577                    match fallback_star {
578                        Ok(names) => {
579                            return Ok((normalize_pv_names(names), PvListSource::GetField));
580                        }
581                        Err(star_err) => {
582                            on_progress(&format!(
583                                "GET_FIELD(*) unavailable on {}; trying GET_FIELD(<empty>)",
584                                addr
585                            ));
586                            let fallback_empty = list_pvs_via_get_field(opts, addr, None).await;
587                            match fallback_empty {
588                                Ok(names) => {
589                                    return Ok((normalize_pv_names(names), PvListSource::GetField));
590                                }
591                                Err(empty_err) => Some(format!(
592                                    "GET_FIELD(*): {}; GET_FIELD(<empty>): {}",
593                                    star_err, empty_err
594                                )),
595                            }
596                        }
597                    }
598                } else {
599                    None
600                };
601
602                on_progress(&format!(
603                    "__pvlist unavailable on {}; trying RPC(server)",
604                    addr
605                ));
606                match list_pvs_via_server_rpc(opts, addr).await {
607                    Ok(names) => return Ok((normalize_pv_names(names), PvListSource::ServerRpc)),
608                    Err(rpc_err) => {
609                        on_progress(&format!(
610                            "RPC(server) unavailable on {}; trying GET(server)",
611                            addr
612                        ));
613                        match list_pvs_via_server_get(opts, addr).await {
614                            Ok(names) => {
615                                return Ok((normalize_pv_names(names), PvListSource::ServerGet));
616                            }
617                            Err(get_err) => {
618                                let get_field_msg = get_field_result
619                                    .unwrap_or_else(|| "GET_FIELD: disabled".to_string());
620                                attempts.push(format!(
621                                    "{} => __pvlist: {}; {}; RPC(server): {}; GET(server): {}",
622                                    addr, primary_err, get_field_msg, rpc_err, get_err
623                                ));
624                            }
625                        }
626                    }
627                }
628            }
629        }
630    }
631
632    Err(PvGetError::Protocol(format!(
633        "failed to list PVs from {}: {}",
634        server_addr,
635        attempts.join(" | ")
636    )))
637}
638
639// ─── Tests ───────────────────────────────────────────────────────────────────
640
641#[cfg(test)]
642mod tests {
643    use super::*;
644
645    #[test]
646    fn parse_pvlist_value_extracts_ntscalararray_strings() {
647        let value = DecodedValue::Structure(vec![
648            (
649                "value".to_string(),
650                DecodedValue::Array(vec![
651                    DecodedValue::String("SIM:AI".to_string()),
652                    DecodedValue::String("SIM:AO".to_string()),
653                ]),
654            ),
655            ("alarm".to_string(), DecodedValue::Structure(vec![])),
656        ]);
657
658        let parsed = parse_pvlist_value(&value).expect("parsed");
659        assert_eq!(parsed, vec!["SIM:AI".to_string(), "SIM:AO".to_string()]);
660    }
661
662    #[test]
663    fn normalize_pv_names_sorts_and_deduplicates() {
664        let names = vec!["B".into(), "A".into(), "B".into(), " ".into()];
665        let result = normalize_pv_names(names);
666        assert_eq!(result, vec!["A".to_string(), "B".to_string()]);
667    }
668
669    #[test]
670    fn candidate_server_addrs_adds_default_tcp_port_fallback() {
671        let mut opts = PvOptions::new(String::new());
672        opts.tcp_port = 5075;
673        let addr: SocketAddr = "10.0.0.2:6000".parse().unwrap();
674        let addrs = candidate_server_addrs(&opts, addr);
675        assert_eq!(addrs.len(), 2);
676        assert_eq!(addrs[0], addr);
677        assert_eq!(addrs[1], "10.0.0.2:5075".parse::<SocketAddr>().unwrap());
678    }
679
680    #[test]
681    fn candidate_server_addrs_no_dup_when_same_port() {
682        let mut opts = PvOptions::new(String::new());
683        opts.tcp_port = 6000;
684        let addr: SocketAddr = "10.0.0.2:6000".parse().unwrap();
685        let addrs = candidate_server_addrs(&opts, addr);
686        assert_eq!(addrs.len(), 1);
687    }
688
689    #[test]
690    fn collect_strings_from_decoded_extracts_nested_strings() {
691        let value = DecodedValue::Structure(vec![
692            ("a".to_string(), DecodedValue::String("ONE".to_string())),
693            (
694                "b".to_string(),
695                DecodedValue::Array(vec![DecodedValue::String("TWO".to_string())]),
696            ),
697        ]);
698        let mut out = Vec::new();
699        collect_strings_from_decoded(&value, &mut out);
700        assert_eq!(out, vec!["ONE".to_string(), "TWO".to_string()]);
701    }
702
703    #[test]
704    fn extract_ascii_candidates_finds_pv_like_tokens() {
705        let raw = b"\x00SIM:AI\x00junk\x00IOC-01:PV1\x00";
706        let mut out = Vec::new();
707        extract_ascii_candidates(raw, &mut out);
708        assert!(out.iter().any(|s| s == "SIM:AI"));
709        assert!(out.iter().any(|s| s == "IOC-01:PV1"));
710    }
711
712    #[test]
713    fn looks_like_pv_name_filters_metadata() {
714        assert!(looks_like_pv_name("SIM:AI"));
715        assert!(looks_like_pv_name("IOC-01:PV1"));
716        assert!(!looks_like_pv_name("value"));
717        assert!(!looks_like_pv_name("alarm"));
718        assert!(!looks_like_pv_name("epics:nt/NTScalar:1.0"));
719        assert!(!looks_like_pv_name(""));
720        assert!(!looks_like_pv_name("has space"));
721    }
722
723    #[test]
724    fn encode_server_rpc_channels_request_uses_nturi_channels() {
725        let payload = encode_server_rpc_channels_request(false);
726        assert_eq!(payload.first(), Some(&0x80));
727
728        let decoder = PvdDecoder::new(false);
729        let (desc, consumed) = decoder
730            .parse_introspection_with_len(&payload)
731            .expect("introspection");
732        assert_eq!(desc.struct_id.as_deref(), Some("epics:nt/NTURI:1.0"));
733
734        let (decoded, _) = decoder
735            .decode_structure(&payload[consumed..], &desc)
736            .expect("decode payload");
737        let DecodedValue::Structure(fields) = decoded else {
738            panic!("expected structure");
739        };
740
741        let mut scheme = None;
742        let mut path = None;
743        let mut op = None;
744        for (name, value) in fields {
745            match (name.as_str(), value) {
746                ("scheme", DecodedValue::String(v)) => scheme = Some(v),
747                ("path", DecodedValue::String(v)) => path = Some(v),
748                ("query", DecodedValue::Structure(query_fields)) => {
749                    for (qname, qvalue) in query_fields {
750                        if qname == "op" {
751                            if let DecodedValue::String(v) = qvalue {
752                                op = Some(v);
753                            }
754                        }
755                    }
756                }
757                _ => {}
758            }
759        }
760        assert_eq!(scheme.as_deref(), Some("pva"));
761        assert_eq!(path.as_deref(), Some("server"));
762        assert_eq!(op.as_deref(), Some("channels"));
763    }
764}