Skip to main content

spvirit_client/
pvlist.rs

1//! PV listing — discover available PV names from a PVA server.
2//!
3//! Provides multiple discovery strategies (tried in order by
4//! [`pvlist_with_fallback`]):
5//!
6//! 1. `__pvlist` GET (preferred — spvirit / EPICS7 servers)
7//! 2. Connection-level GET_FIELD (legacy, opt-in via env var)
8//! 3. Server RPC `op=channels`
9//! 4. Server GET with heuristic parsing
10
11use std::net::SocketAddr;
12
13use tokio::io::AsyncWriteExt;
14use tokio::net::TcpStream;
15use tokio::time::timeout;
16
17use spvirit_codec::epics_decode::{PvaPacket, PvaPacketCommand};
18use spvirit_codec::spvd_decode::{
19    DecodedValue, FieldDesc, FieldType, PvdDecoder, StructureDesc, extract_nt_scalar_value,
20};
21use spvirit_codec::spvd_encode::{encode_string_pvd, encode_structure_desc};
22use spvirit_codec::spvirit_encode::encode_rpc_request;
23
24use crate::client::{
25    ChannelConn, build_client_validation, encode_get_field_request, encode_get_request,
26    establish_channel, pvget,
27};
28use crate::transport::{read_packet, read_until};
29use crate::types::{PvGetError, PvOptions};
30
/// Identifies the discovery strategy that produced a PV list.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PvListSource {
    /// Names were read with a GET on the `__pvlist` channel.
    PvList,
    /// Names were taken from a connection-level GET_FIELD reply.
    GetField,
    /// Names were returned by the server RPC (`op=channels`).
    ServerRpc,
    /// Names were extracted heuristically from a GET on the server channel.
    ServerGet,
}
39
40// ─── Helpers ─────────────────────────────────────────────────────────────────
41
// Pre-encoded pvRequest payload sent with the init (subcommand 0x08) RPC and
// GET requests in this file.
// NOTE(review): the bytes look like an empty structure description on the PVA
// wire format — confirm against the protocol specification.
const PV_REQUEST_EMPTY: [u8; 6] = [0xFD, 0x02, 0x00, 0x80, 0x00, 0x00];
43
/// Returns `names` sorted in ascending order with duplicates removed.
///
/// Entries that are empty or consist solely of whitespace are dropped.
/// Surviving entries are kept verbatim (they are not trimmed).
pub fn normalize_pv_names(mut names: Vec<String>) -> Vec<String> {
    // Keep only entries that contain at least one non-whitespace character.
    names.retain(|entry| entry.chars().any(|c| !c.is_whitespace()));
    // Equal strings are indistinguishable, so an unstable sort yields the same order.
    names.sort_unstable();
    names.dedup();
    names
}
51
52/// Extract PV names from a decoded `__pvlist` value (NTScalarArray of strings).
53pub fn parse_pvlist_value(value: &DecodedValue) -> Option<Vec<String>> {
54    let root = extract_nt_scalar_value(value).unwrap_or(value);
55    let DecodedValue::Array(items) = root else {
56        return None;
57    };
58    let mut out = Vec::with_capacity(items.len());
59    for item in items {
60        if let DecodedValue::String(name) = item {
61            out.push(name.clone());
62        } else {
63            return None;
64        }
65    }
66    Some(out)
67}
68
69fn candidate_server_addrs(opts: &PvOptions, server_addr: SocketAddr) -> Vec<SocketAddr> {
70    let mut out = vec![server_addr];
71    let default_addr = SocketAddr::new(server_addr.ip(), opts.tcp_port);
72    if default_addr != server_addr {
73        out.push(default_addr);
74    }
75    out
76}
77
/// Reports whether the GET_FIELD fallback has been switched on through the
/// `EPICS_PVA_ENABLE_GET_FIELD_FALLBACK` environment variable.
///
/// The value is trimmed and compared ASCII-case-insensitively against `YES`,
/// `Y`, `1` and `TRUE`. An unset or unreadable variable counts as disabled.
fn is_get_field_fallback_enabled() -> bool {
    std::env::var("EPICS_PVA_ENABLE_GET_FIELD_FALLBACK").is_ok_and(|raw| {
        let flag = raw.trim();
        ["YES", "Y", "1", "TRUE"]
            .iter()
            .any(|accepted| flag.eq_ignore_ascii_case(accepted))
    })
}
87
88fn collect_strings_from_decoded(value: &DecodedValue, out: &mut Vec<String>) {
89    match value {
90        DecodedValue::String(s) => out.push(s.clone()),
91        DecodedValue::Array(items) => {
92            for item in items {
93                collect_strings_from_decoded(item, out);
94            }
95        }
96        DecodedValue::Structure(fields) => {
97            for (_, item) in fields {
98                collect_strings_from_decoded(item, out);
99            }
100        }
101        _ => {}
102    }
103}
104
/// Heuristic filter deciding whether `candidate` could plausibly be a PV name.
///
/// A candidate is rejected when it is empty, longer than 128 bytes, contains
/// whitespace, equals (ASCII-case-insensitively) one of a fixed set of
/// metadata words, or starts with `epics:` in any ASCII case.
fn looks_like_pv_name(candidate: &str) -> bool {
    // Words that were seen to be structure metadata rather than PV names.
    const RESERVED: [&str; 10] = [
        "value",
        "alarm",
        "timestamp",
        "display",
        "control",
        "severity",
        "message",
        "seconds",
        "nanoseconds",
        "units",
    ];

    let plausible_shape =
        (1..=128).contains(&candidate.len()) && !candidate.chars().any(char::is_whitespace);
    if !plausible_shape {
        return false;
    }

    let folded = candidate.to_ascii_lowercase();
    !RESERVED.contains(&folded.as_str()) && !folded.starts_with("epics:")
}
133
/// Scans `raw` for ASCII tokens and appends them to `out`.
///
/// A token starts at an ASCII alphanumeric byte and extends over following
/// bytes that are alphanumeric or one of `:`, `.`, `_`, `-`, `/`. Only tokens
/// of 3 to 128 bytes are kept.
fn extract_ascii_candidates(raw: &[u8], out: &mut Vec<String>) {
    let is_token_byte =
        |b: &u8| b.is_ascii_alphanumeric() || matches!(*b, b':' | b'.' | b'_' | b'-' | b'/');

    // Each maximal run of token bytes holds at most one token: it begins at
    // the first alphanumeric byte of the run and reaches to the run's end.
    for run in raw.split(|b| !is_token_byte(b)) {
        let Some(first) = run.iter().position(u8::is_ascii_alphanumeric) else {
            continue;
        };
        let token = &run[first..];
        if (3..=128).contains(&token.len()) {
            if let Ok(text) = std::str::from_utf8(token) {
                out.push(text.to_string());
            }
        }
    }
}
165
166fn encode_server_rpc_channels_request(is_be: bool) -> Vec<u8> {
167    let desc = StructureDesc {
168        struct_id: Some("epics:nt/NTURI:1.0".to_string()),
169        fields: vec![
170            FieldDesc {
171                name: "scheme".to_string(),
172                field_type: FieldType::String,
173            },
174            FieldDesc {
175                name: "path".to_string(),
176                field_type: FieldType::String,
177            },
178            FieldDesc {
179                name: "query".to_string(),
180                field_type: FieldType::Structure(StructureDesc {
181                    struct_id: None,
182                    fields: vec![FieldDesc {
183                        name: "op".to_string(),
184                        field_type: FieldType::String,
185                    }],
186                }),
187            },
188        ],
189    };
190
191    let mut out = Vec::new();
192    out.push(0x80);
193    out.extend_from_slice(&encode_structure_desc(&desc, is_be));
194    out.extend_from_slice(&encode_string_pvd("pva", is_be));
195    out.extend_from_slice(&encode_string_pvd("server", is_be));
196    out.extend_from_slice(&encode_string_pvd("channels", is_be));
197    out
198}
199
200// ─── Listing strategies ──────────────────────────────────────────────────────
201
202/// List PVs via the `__pvlist` channel (preferred).
203async fn list_pvs_via_pvlist(
204    opts: &PvOptions,
205    server_addr: SocketAddr,
206) -> Result<Vec<String>, PvGetError> {
207    let mut get_opts = opts.clone();
208    get_opts.pv_name = "__pvlist".to_string();
209    get_opts.server_addr = Some(server_addr);
210    let result = pvget(&get_opts).await?;
211    let names = parse_pvlist_value(&result.value)
212        .ok_or_else(|| PvGetError::Decode("failed to decode __pvlist value".to_string()))?;
213    Ok(normalize_pv_names(names))
214}
215
216/// List PVs via connection-level GET_FIELD.
217pub async fn list_pvs_via_get_field(
218    opts: &PvOptions,
219    server_addr: SocketAddr,
220    field_pattern: Option<&str>,
221) -> Result<Vec<String>, PvGetError> {
222    let mut stream = timeout(opts.timeout, TcpStream::connect(server_addr))
223        .await
224        .map_err(|_| PvGetError::Timeout("connect"))??;
225
226    let mut version = 2u8;
227    let mut is_be = false;
228
229    for _ in 0..2 {
230        if let Ok(bytes) = read_packet(&mut stream, opts.timeout).await {
231            let mut pkt = PvaPacket::new(&bytes);
232            if let Some(cmd) = pkt.decode_payload() {
233                match cmd {
234                    PvaPacketCommand::Control(payload) => {
235                        if payload.command == 2 {
236                            is_be = pkt.header.flags.is_msb;
237                        }
238                    }
239                    PvaPacketCommand::ConnectionValidation(_) => {
240                        version = pkt.header.version;
241                        is_be = pkt.header.flags.is_msb;
242                    }
243                    _ => {}
244                }
245            }
246        }
247    }
248
249    let validation = build_client_validation(opts, version, is_be);
250    stream.write_all(&validation).await?;
251
252    let _ = read_until(&mut stream, opts.timeout, |cmd| {
253        matches!(cmd, PvaPacketCommand::ConnectionValidated(_))
254    })
255    .await?;
256
257    let get_field = encode_get_field_request(0, 0, field_pattern, version, is_be);
258    stream.write_all(&get_field).await?;
259
260    let field_resp = read_until(
261        &mut stream,
262        opts.timeout,
263        |cmd| matches!(cmd, PvaPacketCommand::GetField(payload) if payload.is_server),
264    )
265    .await?;
266    let mut pkt = PvaPacket::new(&field_resp);
267    let cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
268        "get_field listing decode failed".to_string(),
269    ))?;
270    let PvaPacketCommand::GetField(payload) = cmd else {
271        return Err(PvGetError::Protocol(
272            "unexpected GET_FIELD response".to_string(),
273        ));
274    };
275
276    if payload.status.as_ref().is_some_and(|s| s.is_error()) {
277        let detail = payload
278            .status
279            .as_ref()
280            .map(ToString::to_string)
281            .unwrap_or_default();
282        return Err(PvGetError::Protocol(format!(
283            "get_field listing error: {}",
284            detail
285        )));
286    }
287
288    let desc = payload
289        .introspection
290        .ok_or_else(|| PvGetError::Decode("missing GET_FIELD introspection".to_string()))?;
291
292    let names = desc.fields.into_iter().map(|f| f.name).collect::<Vec<_>>();
293    Ok(normalize_pv_names(names))
294}
295
296/// List PVs via server RPC on a specific channel name.
297async fn list_pvs_via_server_rpc_channel(
298    opts: &PvOptions,
299    server_addr: SocketAddr,
300    rpc_channel: &str,
301) -> Result<Vec<String>, PvGetError> {
302    let mut rpc_opts = opts.clone();
303    rpc_opts.pv_name = rpc_channel.to_string();
304    let ChannelConn {
305        mut stream,
306        sid,
307        version,
308        is_be,
309        ..
310    } = establish_channel(server_addr, &rpc_opts).await?;
311
312    let ioid = 1u32;
313    let rpc_init = encode_rpc_request(sid, ioid, 0x08, &PV_REQUEST_EMPTY, version, is_be);
314    stream.write_all(&rpc_init).await?;
315
316    let init_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
317        PvaPacketCommand::Op(op) => op.command == 20 && op.ioid == ioid && (op.subcmd & 0x08) != 0,
318        _ => false,
319    })
320    .await?;
321    let mut pkt = PvaPacket::new(&init_resp);
322    let init_cmd = pkt
323        .decode_payload()
324        .ok_or(PvGetError::Protocol("rpc init decode failed".to_string()))?;
325    if let PvaPacketCommand::Op(op) = init_cmd {
326        if op.status.as_ref().is_some_and(|s| s.is_error()) {
327            let detail = op
328                .status
329                .as_ref()
330                .map(ToString::to_string)
331                .unwrap_or_default();
332            return Err(PvGetError::Protocol(format!("rpc init failed: {}", detail)));
333        }
334    }
335
336    let rpc_payload = encode_server_rpc_channels_request(is_be);
337    let rpc_req = encode_rpc_request(sid, ioid, 0x00, &rpc_payload, version, is_be);
338    stream.write_all(&rpc_req).await?;
339
340    let rpc_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
341        PvaPacketCommand::Op(op) => op.command == 20 && op.ioid == ioid && op.subcmd == 0x00,
342        _ => false,
343    })
344    .await?;
345    let mut pkt = PvaPacket::new(&rpc_resp);
346    let rpc_cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
347        "rpc response decode failed".to_string(),
348    ))?;
349    let PvaPacketCommand::Op(op) = rpc_cmd else {
350        return Err(PvGetError::Protocol("unexpected RPC response".to_string()));
351    };
352    if op.status.as_ref().is_some_and(|s| s.is_error()) {
353        let detail = op
354            .status
355            .as_ref()
356            .map(ToString::to_string)
357            .unwrap_or_default();
358        return Err(PvGetError::Protocol(format!(
359            "rpc execute failed: {}",
360            detail
361        )));
362    }
363
364    if op.body.is_empty() {
365        return Err(PvGetError::Decode("empty RPC response".to_string()));
366    }
367
368    let decoder = PvdDecoder::new(is_be);
369    let (desc, consumed) = decoder
370        .parse_introspection_with_len(&op.body)
371        .ok_or_else(|| PvGetError::Decode("RPC missing introspection".to_string()))?;
372    let value_raw = op
373        .body
374        .get(consumed..)
375        .ok_or_else(|| PvGetError::Decode("RPC malformed payload".to_string()))?;
376    let (decoded, _) = decoder
377        .decode_structure(value_raw, &desc)
378        .ok_or_else(|| PvGetError::Decode("RPC decode failed".to_string()))?;
379
380    let mut strings = Vec::new();
381    collect_strings_from_decoded(&decoded, &mut strings);
382    if strings.is_empty() {
383        return Err(PvGetError::Decode(
384            "RPC list returned no strings".to_string(),
385        ));
386    }
387    Ok(normalize_pv_names(strings))
388}
389
390/// List PVs via server RPC, trying `"server"` then `"__server"`.
391pub async fn list_pvs_via_server_rpc(
392    opts: &PvOptions,
393    server_addr: SocketAddr,
394) -> Result<Vec<String>, PvGetError> {
395    let mut errs = Vec::new();
396    for channel in ["server", "__server"] {
397        match list_pvs_via_server_rpc_channel(opts, server_addr, channel).await {
398            Ok(names) => return Ok(names),
399            Err(err) => errs.push(format!("{}: {}", channel, err)),
400        }
401    }
402    Err(PvGetError::Protocol(format!(
403        "server RPC unavailable: {}",
404        errs.join(" | ")
405    )))
406}
407
408/// List PVs via server GET, trying `"server"` then `"__server"`.
409pub async fn list_pvs_via_server_get(
410    opts: &PvOptions,
411    server_addr: SocketAddr,
412) -> Result<Vec<String>, PvGetError> {
413    async fn get_channel(
414        opts: &PvOptions,
415        server_addr: SocketAddr,
416        channel: &str,
417    ) -> Result<Vec<String>, PvGetError> {
418        let mut get_opts = opts.clone();
419        get_opts.pv_name = channel.to_string();
420        let ChannelConn {
421            mut stream,
422            sid,
423            version,
424            is_be,
425            ..
426        } = establish_channel(server_addr, &get_opts).await?;
427
428        let ioid = 1u32;
429        let init_req = encode_get_request(sid, ioid, 0x08, &PV_REQUEST_EMPTY, version, is_be);
430        stream.write_all(&init_req).await?;
431        let init_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
432            PvaPacketCommand::Op(op) => {
433                op.command == 10 && op.ioid == ioid && (op.subcmd & 0x08) != 0
434            }
435            _ => false,
436        })
437        .await?;
438        let mut pkt = PvaPacket::new(&init_resp);
439        let init_cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
440            "server get init decode failed".to_string(),
441        ))?;
442        let init_desc = match init_cmd {
443            PvaPacketCommand::Op(op) => {
444                if op.status.as_ref().is_some_and(|s| s.is_error()) {
445                    let detail = op
446                        .status
447                        .as_ref()
448                        .map(ToString::to_string)
449                        .unwrap_or_default();
450                    return Err(PvGetError::Protocol(format!(
451                        "server GET init failed: {}",
452                        detail
453                    )));
454                }
455                op.introspection
456            }
457            _ => None,
458        };
459
460        let data_req = encode_get_request(sid, ioid, 0x00, &[], version, is_be);
461        stream.write_all(&data_req).await?;
462        let data_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
463            PvaPacketCommand::Op(op) => op.command == 10 && op.ioid == ioid && op.subcmd == 0x00,
464            _ => false,
465        })
466        .await?;
467        let mut pkt = PvaPacket::new(&data_resp);
468        let data_cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
469            "server get data decode failed".to_string(),
470        ))?;
471
472        let PvaPacketCommand::Op(mut op) = data_cmd else {
473            return Err(PvGetError::Protocol(
474                "unexpected GET data response".to_string(),
475            ));
476        };
477        if op.status.as_ref().is_some_and(|s| s.is_error()) {
478            let detail = op
479                .status
480                .as_ref()
481                .map(ToString::to_string)
482                .unwrap_or_default();
483            return Err(PvGetError::Protocol(format!(
484                "server GET data failed: {}",
485                detail
486            )));
487        }
488
489        let mut names = Vec::new();
490        names.extend(op.pv_names.clone());
491        extract_ascii_candidates(&op.body, &mut names);
492        if let Some(desc) = &init_desc {
493            for field in &desc.fields {
494                names.push(field.name.clone());
495            }
496            op.decode_with_field_desc(desc, is_be);
497            if let Some(decoded) = &op.decoded_value {
498                collect_strings_from_decoded(decoded, &mut names);
499            }
500        }
501
502        let mut names = normalize_pv_names(names);
503        names.retain(|n| looks_like_pv_name(n));
504        if names.is_empty() {
505            return Err(PvGetError::Decode(
506                "server GET returned no PV-like names".to_string(),
507            ));
508        }
509        Ok(names)
510    }
511
512    let mut errs = Vec::new();
513    for channel in ["server", "__server"] {
514        match get_channel(opts, server_addr, channel).await {
515            Ok(names) => return Ok(names),
516            Err(err) => errs.push(format!("{}: {}", channel, err)),
517        }
518    }
519    Err(PvGetError::Protocol(format!(
520        "server GET unavailable: {}",
521        errs.join(" | ")
522    )))
523}
524
525// ─── Public API ──────────────────────────────────────────────────────────────
526
527/// List PV names from a server using `__pvlist` GET (preferred method).
528pub async fn pvlist(opts: &PvOptions, server_addr: SocketAddr) -> Result<Vec<String>, PvGetError> {
529    list_pvs_via_pvlist(opts, server_addr).await
530}
531
532/// List PV names with automatic fallback through all strategies.
533///
534/// Tries (in order): `__pvlist` → GET_FIELD (opt-in) → Server RPC → Server GET.
535pub async fn pvlist_with_fallback(
536    opts: &PvOptions,
537    server_addr: SocketAddr,
538) -> Result<(Vec<String>, PvListSource), PvGetError> {
539    pvlist_with_fallback_progress(opts, server_addr, |_| {}).await
540}
541
542/// List PV names with fallback and progress callback.
543pub async fn pvlist_with_fallback_progress<F>(
544    opts: &PvOptions,
545    server_addr: SocketAddr,
546    mut on_progress: F,
547) -> Result<(Vec<String>, PvListSource), PvGetError>
548where
549    F: FnMut(&str),
550{
551    let addrs = candidate_server_addrs(opts, server_addr);
552    let mut attempts = Vec::new();
553    let get_field_fallback = is_get_field_fallback_enabled();
554
555    if addrs.len() > 1 {
556        on_progress(&format!(
557            "Trying {} candidate server endpoints...",
558            addrs.len()
559        ));
560    }
561    if !get_field_fallback {
562        on_progress(
563            "GET_FIELD fallback is disabled by default (set EPICS_PVA_ENABLE_GET_FIELD_FALLBACK=YES to enable)",
564        );
565    }
566
567    for addr in addrs {
568        on_progress(&format!("Trying __pvlist on {}", addr));
569        let primary = list_pvs_via_pvlist(opts, addr).await;
570        match primary {
571            Ok(names) => return Ok((normalize_pv_names(names), PvListSource::PvList)),
572            Err(primary_err) => {
573                let get_field_result = if get_field_fallback {
574                    on_progress(&format!(
575                        "__pvlist unavailable on {}; trying GET_FIELD(*)",
576                        addr
577                    ));
578                    let fallback_star = list_pvs_via_get_field(opts, addr, Some("*")).await;
579                    match fallback_star {
580                        Ok(names) => {
581                            return Ok((normalize_pv_names(names), PvListSource::GetField));
582                        }
583                        Err(star_err) => {
584                            on_progress(&format!(
585                                "GET_FIELD(*) unavailable on {}; trying GET_FIELD(<empty>)",
586                                addr
587                            ));
588                            let fallback_empty = list_pvs_via_get_field(opts, addr, None).await;
589                            match fallback_empty {
590                                Ok(names) => {
591                                    return Ok((normalize_pv_names(names), PvListSource::GetField));
592                                }
593                                Err(empty_err) => Some(format!(
594                                    "GET_FIELD(*): {}; GET_FIELD(<empty>): {}",
595                                    star_err, empty_err
596                                )),
597                            }
598                        }
599                    }
600                } else {
601                    None
602                };
603
604                on_progress(&format!(
605                    "__pvlist unavailable on {}; trying RPC(server)",
606                    addr
607                ));
608                match list_pvs_via_server_rpc(opts, addr).await {
609                    Ok(names) => return Ok((normalize_pv_names(names), PvListSource::ServerRpc)),
610                    Err(rpc_err) => {
611                        on_progress(&format!(
612                            "RPC(server) unavailable on {}; trying GET(server)",
613                            addr
614                        ));
615                        match list_pvs_via_server_get(opts, addr).await {
616                            Ok(names) => {
617                                return Ok((normalize_pv_names(names), PvListSource::ServerGet));
618                            }
619                            Err(get_err) => {
620                                let get_field_msg = get_field_result
621                                    .unwrap_or_else(|| "GET_FIELD: disabled".to_string());
622                                attempts.push(format!(
623                                    "{} => __pvlist: {}; {}; RPC(server): {}; GET(server): {}",
624                                    addr, primary_err, get_field_msg, rpc_err, get_err
625                                ));
626                            }
627                        }
628                    }
629                }
630            }
631        }
632    }
633
634    Err(PvGetError::Protocol(format!(
635        "failed to list PVs from {}: {}",
636        server_addr,
637        attempts.join(" | ")
638    )))
639}
640
641// ─── Tests ───────────────────────────────────────────────────────────────────
642
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a decoded string value.
    fn string_value(text: &str) -> DecodedValue {
        DecodedValue::String(text.to_string())
    }

    /// Parses a socket address literal used by the endpoint tests.
    fn socket(text: &str) -> SocketAddr {
        text.parse().unwrap()
    }

    #[test]
    fn parse_pvlist_value_extracts_ntscalararray_strings() {
        let pv_array = DecodedValue::Array(vec![string_value("SIM:AI"), string_value("SIM:AO")]);
        let value = DecodedValue::Structure(vec![
            ("value".to_string(), pv_array),
            ("alarm".to_string(), DecodedValue::Structure(vec![])),
        ]);

        let parsed = parse_pvlist_value(&value).expect("parsed");
        assert_eq!(parsed, ["SIM:AI", "SIM:AO"]);
    }

    #[test]
    fn normalize_pv_names_sorts_and_deduplicates() {
        let names = ["B", "A", "B", " "].map(String::from).to_vec();
        assert_eq!(normalize_pv_names(names), ["A", "B"]);
    }

    #[test]
    fn candidate_server_addrs_adds_default_tcp_port_fallback() {
        let mut opts = PvOptions::new(String::new());
        opts.tcp_port = 5075;
        let addr = socket("10.0.0.2:6000");

        let addrs = candidate_server_addrs(&opts, addr);
        assert_eq!(addrs.len(), 2);
        assert_eq!(addrs[0], addr);
        assert_eq!(addrs[1], socket("10.0.0.2:5075"));
    }

    #[test]
    fn candidate_server_addrs_no_dup_when_same_port() {
        let mut opts = PvOptions::new(String::new());
        opts.tcp_port = 6000;

        let addrs = candidate_server_addrs(&opts, socket("10.0.0.2:6000"));
        assert_eq!(addrs.len(), 1);
    }

    #[test]
    fn collect_strings_from_decoded_extracts_nested_strings() {
        let value = DecodedValue::Structure(vec![
            ("a".to_string(), string_value("ONE")),
            (
                "b".to_string(),
                DecodedValue::Array(vec![string_value("TWO")]),
            ),
        ]);

        let mut out = Vec::new();
        collect_strings_from_decoded(&value, &mut out);
        assert_eq!(out, ["ONE", "TWO"]);
    }

    #[test]
    fn extract_ascii_candidates_finds_pv_like_tokens() {
        let mut out = Vec::new();
        extract_ascii_candidates(b"\x00SIM:AI\x00junk\x00IOC-01:PV1\x00", &mut out);
        assert!(out.contains(&"SIM:AI".to_string()));
        assert!(out.contains(&"IOC-01:PV1".to_string()));
    }

    #[test]
    fn looks_like_pv_name_filters_metadata() {
        for accepted in ["SIM:AI", "IOC-01:PV1"] {
            assert!(looks_like_pv_name(accepted));
        }
        for rejected in ["value", "alarm", "epics:nt/NTScalar:1.0", "", "has space"] {
            assert!(!looks_like_pv_name(rejected));
        }
    }

    #[test]
    fn encode_server_rpc_channels_request_uses_nturi_channels() {
        let payload = encode_server_rpc_channels_request(false);
        assert_eq!(payload.first(), Some(&0x80));

        let decoder = PvdDecoder::new(false);
        let (desc, consumed) = decoder
            .parse_introspection_with_len(&payload)
            .expect("introspection");
        assert_eq!(desc.struct_id.as_deref(), Some("epics:nt/NTURI:1.0"));

        let (decoded, _) = decoder
            .decode_structure(&payload[consumed..], &desc)
            .expect("decode payload");
        let DecodedValue::Structure(fields) = decoded else {
            panic!("expected structure");
        };

        // Pick the three strings of interest out of the decoded NTURI.
        let mut scheme = None;
        let mut path = None;
        let mut op = None;
        for (name, value) in fields {
            match value {
                DecodedValue::String(text) if name == "scheme" => scheme = Some(text),
                DecodedValue::String(text) if name == "path" => path = Some(text),
                DecodedValue::Structure(members) if name == "query" => {
                    for (member, member_value) in members {
                        if let ("op", DecodedValue::String(text)) =
                            (member.as_str(), member_value)
                        {
                            op = Some(text);
                        }
                    }
                }
                _ => {}
            }
        }
        assert_eq!(scheme.as_deref(), Some("pva"));
        assert_eq!(path.as_deref(), Some("server"));
        assert_eq!(op.as_deref(), Some("channels"));
    }
}