1use std::net::SocketAddr;
12
13use tokio::io::AsyncWriteExt;
14use tokio::net::TcpStream;
15use tokio::time::timeout;
16
17use spvirit_codec::epics_decode::{PvaPacket, PvaPacketCommand};
18use spvirit_codec::spvirit_encode::encode_rpc_request;
19use spvirit_codec::spvd_decode::{
20 extract_nt_scalar_value, DecodedValue, FieldDesc, FieldType, PvdDecoder, StructureDesc,
21};
22use spvirit_codec::spvd_encode::{encode_string_pvd, encode_structure_desc};
23
24use crate::client::{
25 build_client_validation, encode_get_field_request, encode_get_request, establish_channel,
26 pvget, ChannelConn,
27};
28use crate::transport::{read_packet, read_until};
29use crate::types::{PvGetError, PvOptions};
30
/// Identifies which discovery strategy produced a PV name list.
///
/// Returned next to the names by `pvlist_with_fallback` and
/// `pvlist_with_fallback_progress`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PvListSource {
    /// Names were read from the `__pvlist` PV via `pvget`.
    PvList,
    /// Names are the field names of a GET_FIELD introspection response.
    GetField,
    /// Names were collected from an RPC reply on the `server` / `__server` channel.
    ServerRpc,
    /// Names were collected from a GET reply on the `server` / `__server` channel.
    ServerGet,
}
39
/// Request payload sent with the init (sub-command `0x08`) RPC and GET messages in this file.
// NOTE(review): presumably the encoded form of an empty pvRequest structure — confirm against the codec.
const PV_REQUEST_EMPTY: [u8; 6] = [0xfd, 0x02, 0x00, 0x80, 0x00, 0x00];
43
/// Returns `names` sorted in ascending order with duplicates removed.
///
/// Entries that are empty or consist only of whitespace are discarded; the
/// remaining entries are kept verbatim (they are not trimmed).
pub fn normalize_pv_names(names: Vec<String>) -> Vec<String> {
    // An ordered set yields the sorted, de-duplicated sequence in one pass.
    let unique: std::collections::BTreeSet<String> = names
        .into_iter()
        .filter(|name| !name.trim().is_empty())
        .collect();
    unique.into_iter().collect()
}
51
52pub fn parse_pvlist_value(value: &DecodedValue) -> Option<Vec<String>> {
54 let root = extract_nt_scalar_value(value).unwrap_or(value);
55 let DecodedValue::Array(items) = root else {
56 return None;
57 };
58 let mut out = Vec::with_capacity(items.len());
59 for item in items {
60 if let DecodedValue::String(name) = item {
61 out.push(name.clone());
62 } else {
63 return None;
64 }
65 }
66 Some(out)
67}
68
69fn candidate_server_addrs(opts: &PvOptions, server_addr: SocketAddr) -> Vec<SocketAddr> {
70 let mut out = vec![server_addr];
71 let default_addr = SocketAddr::new(server_addr.ip(), opts.tcp_port);
72 if default_addr != server_addr {
73 out.push(default_addr);
74 }
75 out
76}
77
/// Reports whether the GET_FIELD fallback has been switched on via the
/// `EPICS_PVA_ENABLE_GET_FIELD_FALLBACK` environment variable.
///
/// The value is trimmed and compared case-insensitively (ASCII) against
/// `YES`, `Y`, `1` and `TRUE`. An unset or unreadable variable counts as off.
fn is_get_field_fallback_enabled() -> bool {
    std::env::var("EPICS_PVA_ENABLE_GET_FIELD_FALLBACK")
        .map(|raw| {
            matches!(
                raw.trim().to_ascii_uppercase().as_str(),
                "YES" | "Y" | "1" | "TRUE"
            )
        })
        .unwrap_or(false)
}
87
88fn collect_strings_from_decoded(value: &DecodedValue, out: &mut Vec<String>) {
89 match value {
90 DecodedValue::String(s) => out.push(s.clone()),
91 DecodedValue::Array(items) => {
92 for item in items {
93 collect_strings_from_decoded(item, out);
94 }
95 }
96 DecodedValue::Structure(fields) => {
97 for (_, item) in fields {
98 collect_strings_from_decoded(item, out);
99 }
100 }
101 _ => {}
102 }
103}
104
/// Heuristic filter deciding whether `candidate` could be a PV name.
///
/// A candidate is rejected when it is empty, longer than 128 bytes, contains
/// whitespace, equals one of a fixed set of metadata words, or starts with
/// `epics:`. The word and prefix comparisons ignore ASCII case.
fn looks_like_pv_name(candidate: &str) -> bool {
    // Field names that show up in the scanned data but are not PV names.
    const RESERVED_WORDS: [&str; 10] = [
        "value",
        "alarm",
        "timestamp",
        "display",
        "control",
        "severity",
        "message",
        "seconds",
        "nanoseconds",
        "units",
    ];
    const TYPE_ID_PREFIX: &[u8] = b"epics:";

    let length_ok = (1..=128).contains(&candidate.len());
    let has_whitespace = candidate.chars().any(char::is_whitespace);
    let is_reserved = RESERVED_WORDS
        .iter()
        .any(|word| candidate.eq_ignore_ascii_case(word));
    // Byte-wise prefix check; the prefix is pure ASCII so this matches the
    // string-level comparison exactly.
    let is_type_id = candidate
        .as_bytes()
        .get(..TYPE_ID_PREFIX.len())
        .map_or(false, |head| head.eq_ignore_ascii_case(TYPE_ID_PREFIX));

    length_ok && !has_whitespace && !is_reserved && !is_type_id
}
133
/// Scans `raw` for ASCII tokens and appends them to `out`.
///
/// A token starts at an ASCII alphanumeric byte and extends over following
/// bytes that are alphanumeric or one of `:`, `.`, `_`, `-`, `/`. Only tokens
/// whose length is between 3 and 128 bytes (inclusive) are kept; shorter or
/// longer runs are skipped as a whole.
fn extract_ascii_candidates(raw: &[u8], out: &mut Vec<String>) {
    fn is_token_byte(byte: u8) -> bool {
        byte.is_ascii_alphanumeric() || matches!(byte, b':' | b'.' | b'_' | b'-' | b'/')
    }

    let mut rest = raw;
    // Jump to the next byte that may start a token; stop when none is left.
    while let Some(start) = rest.iter().position(|byte| byte.is_ascii_alphanumeric()) {
        let tail = &rest[start..];
        let len = tail
            .iter()
            .position(|&byte| !is_token_byte(byte))
            .unwrap_or(tail.len());
        let (token, remainder) = tail.split_at(len);
        if (3..=128).contains(&len) {
            if let Ok(text) = std::str::from_utf8(token) {
                out.push(text.to_string());
            }
        }
        rest = remainder;
    }
}
165
166fn encode_server_rpc_channels_request(is_be: bool) -> Vec<u8> {
167 let desc = StructureDesc {
168 struct_id: Some("epics:nt/NTURI:1.0".to_string()),
169 fields: vec![
170 FieldDesc {
171 name: "scheme".to_string(),
172 field_type: FieldType::String,
173 },
174 FieldDesc {
175 name: "path".to_string(),
176 field_type: FieldType::String,
177 },
178 FieldDesc {
179 name: "query".to_string(),
180 field_type: FieldType::Structure(StructureDesc {
181 struct_id: None,
182 fields: vec![FieldDesc {
183 name: "op".to_string(),
184 field_type: FieldType::String,
185 }],
186 }),
187 },
188 ],
189 };
190
191 let mut out = Vec::new();
192 out.push(0x80);
193 out.extend_from_slice(&encode_structure_desc(&desc, is_be));
194 out.extend_from_slice(&encode_string_pvd("pva", is_be));
195 out.extend_from_slice(&encode_string_pvd("server", is_be));
196 out.extend_from_slice(&encode_string_pvd("channels", is_be));
197 out
198}
199
200async fn list_pvs_via_pvlist(
204 opts: &PvOptions,
205 server_addr: SocketAddr,
206) -> Result<Vec<String>, PvGetError> {
207 let mut get_opts = opts.clone();
208 get_opts.pv_name = "__pvlist".to_string();
209 get_opts.server_addr = Some(server_addr);
210 let result = pvget(&get_opts).await?;
211 let names = parse_pvlist_value(&result.value)
212 .ok_or_else(|| PvGetError::Decode("failed to decode __pvlist value".to_string()))?;
213 Ok(normalize_pv_names(names))
214}
215
216pub async fn list_pvs_via_get_field(
218 opts: &PvOptions,
219 server_addr: SocketAddr,
220 field_pattern: Option<&str>,
221) -> Result<Vec<String>, PvGetError> {
222 let mut stream = timeout(opts.timeout, TcpStream::connect(server_addr))
223 .await
224 .map_err(|_| PvGetError::Timeout("connect"))??;
225
226 let mut version = 2u8;
227 let mut is_be = false;
228
229 for _ in 0..2 {
230 if let Ok(bytes) = read_packet(&mut stream, opts.timeout).await {
231 let mut pkt = PvaPacket::new(&bytes);
232 if let Some(cmd) = pkt.decode_payload() {
233 match cmd {
234 PvaPacketCommand::Control(payload) => {
235 if payload.command == 2 {
236 is_be = pkt.header.flags.is_msb;
237 }
238 }
239 PvaPacketCommand::ConnectionValidation(_) => {
240 version = pkt.header.version;
241 is_be = pkt.header.flags.is_msb;
242 }
243 _ => {}
244 }
245 }
246 }
247 }
248
249 let validation = build_client_validation(opts, version, is_be);
250 stream.write_all(&validation).await?;
251
252 let _ = read_until(&mut stream, opts.timeout, |cmd| {
253 matches!(cmd, PvaPacketCommand::ConnectionValidated(_))
254 })
255 .await?;
256
257 let get_field = encode_get_field_request(0, 0, field_pattern, version, is_be);
258 stream.write_all(&get_field).await?;
259
260 let field_resp = read_until(
261 &mut stream,
262 opts.timeout,
263 |cmd| matches!(cmd, PvaPacketCommand::GetField(payload) if payload.is_server),
264 )
265 .await?;
266 let mut pkt = PvaPacket::new(&field_resp);
267 let cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
268 "get_field listing decode failed".to_string(),
269 ))?;
270 let PvaPacketCommand::GetField(payload) = cmd else {
271 return Err(PvGetError::Protocol(
272 "unexpected GET_FIELD response".to_string(),
273 ));
274 };
275
276 if payload.status.as_ref().is_some_and(|s| s.is_error()) {
277 let detail = payload
278 .status
279 .as_ref()
280 .map(ToString::to_string)
281 .unwrap_or_default();
282 return Err(PvGetError::Protocol(format!(
283 "get_field listing error: {}",
284 detail
285 )));
286 }
287
288 let desc = payload
289 .introspection
290 .ok_or_else(|| PvGetError::Decode("missing GET_FIELD introspection".to_string()))?;
291
292 let names = desc.fields.into_iter().map(|f| f.name).collect::<Vec<_>>();
293 Ok(normalize_pv_names(names))
294}
295
296async fn list_pvs_via_server_rpc_channel(
298 opts: &PvOptions,
299 server_addr: SocketAddr,
300 rpc_channel: &str,
301) -> Result<Vec<String>, PvGetError> {
302 let mut rpc_opts = opts.clone();
303 rpc_opts.pv_name = rpc_channel.to_string();
304 let ChannelConn {
305 mut stream,
306 sid,
307 version,
308 is_be,
309 } = establish_channel(server_addr, &rpc_opts).await?;
310
311 let ioid = 1u32;
312 let rpc_init = encode_rpc_request(sid, ioid, 0x08, &PV_REQUEST_EMPTY, version, is_be);
313 stream.write_all(&rpc_init).await?;
314
315 let init_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
316 PvaPacketCommand::Op(op) => op.command == 20 && op.ioid == ioid && (op.subcmd & 0x08) != 0,
317 _ => false,
318 })
319 .await?;
320 let mut pkt = PvaPacket::new(&init_resp);
321 let init_cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
322 "rpc init decode failed".to_string(),
323 ))?;
324 if let PvaPacketCommand::Op(op) = init_cmd {
325 if op.status.as_ref().is_some_and(|s| s.is_error()) {
326 let detail = op
327 .status
328 .as_ref()
329 .map(ToString::to_string)
330 .unwrap_or_default();
331 return Err(PvGetError::Protocol(format!("rpc init failed: {}", detail)));
332 }
333 }
334
335 let rpc_payload = encode_server_rpc_channels_request(is_be);
336 let rpc_req = encode_rpc_request(sid, ioid, 0x00, &rpc_payload, version, is_be);
337 stream.write_all(&rpc_req).await?;
338
339 let rpc_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
340 PvaPacketCommand::Op(op) => op.command == 20 && op.ioid == ioid && op.subcmd == 0x00,
341 _ => false,
342 })
343 .await?;
344 let mut pkt = PvaPacket::new(&rpc_resp);
345 let rpc_cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
346 "rpc response decode failed".to_string(),
347 ))?;
348 let PvaPacketCommand::Op(op) = rpc_cmd else {
349 return Err(PvGetError::Protocol("unexpected RPC response".to_string()));
350 };
351 if op.status.as_ref().is_some_and(|s| s.is_error()) {
352 let detail = op
353 .status
354 .as_ref()
355 .map(ToString::to_string)
356 .unwrap_or_default();
357 return Err(PvGetError::Protocol(format!(
358 "rpc execute failed: {}",
359 detail
360 )));
361 }
362
363 if op.body.is_empty() {
364 return Err(PvGetError::Decode("empty RPC response".to_string()));
365 }
366
367 let decoder = PvdDecoder::new(is_be);
368 let (desc, consumed) = decoder
369 .parse_introspection_with_len(&op.body)
370 .ok_or_else(|| PvGetError::Decode("RPC missing introspection".to_string()))?;
371 let value_raw = op
372 .body
373 .get(consumed..)
374 .ok_or_else(|| PvGetError::Decode("RPC malformed payload".to_string()))?;
375 let (decoded, _) = decoder
376 .decode_structure(value_raw, &desc)
377 .ok_or_else(|| PvGetError::Decode("RPC decode failed".to_string()))?;
378
379 let mut strings = Vec::new();
380 collect_strings_from_decoded(&decoded, &mut strings);
381 if strings.is_empty() {
382 return Err(PvGetError::Decode(
383 "RPC list returned no strings".to_string(),
384 ));
385 }
386 Ok(normalize_pv_names(strings))
387}
388
389pub async fn list_pvs_via_server_rpc(
391 opts: &PvOptions,
392 server_addr: SocketAddr,
393) -> Result<Vec<String>, PvGetError> {
394 let mut errs = Vec::new();
395 for channel in ["server", "__server"] {
396 match list_pvs_via_server_rpc_channel(opts, server_addr, channel).await {
397 Ok(names) => return Ok(names),
398 Err(err) => errs.push(format!("{}: {}", channel, err)),
399 }
400 }
401 Err(PvGetError::Protocol(format!(
402 "server RPC unavailable: {}",
403 errs.join(" | ")
404 )))
405}
406
407pub async fn list_pvs_via_server_get(
409 opts: &PvOptions,
410 server_addr: SocketAddr,
411) -> Result<Vec<String>, PvGetError> {
412 async fn get_channel(
413 opts: &PvOptions,
414 server_addr: SocketAddr,
415 channel: &str,
416 ) -> Result<Vec<String>, PvGetError> {
417 let mut get_opts = opts.clone();
418 get_opts.pv_name = channel.to_string();
419 let ChannelConn {
420 mut stream,
421 sid,
422 version,
423 is_be,
424 } = establish_channel(server_addr, &get_opts).await?;
425
426 let ioid = 1u32;
427 let init_req =
428 encode_get_request(sid, ioid, 0x08, &PV_REQUEST_EMPTY, version, is_be);
429 stream.write_all(&init_req).await?;
430 let init_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
431 PvaPacketCommand::Op(op) => {
432 op.command == 10 && op.ioid == ioid && (op.subcmd & 0x08) != 0
433 }
434 _ => false,
435 })
436 .await?;
437 let mut pkt = PvaPacket::new(&init_resp);
438 let init_cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
439 "server get init decode failed".to_string(),
440 ))?;
441 let init_desc = match init_cmd {
442 PvaPacketCommand::Op(op) => {
443 if op.status.as_ref().is_some_and(|s| s.is_error()) {
444 let detail = op
445 .status
446 .as_ref()
447 .map(ToString::to_string)
448 .unwrap_or_default();
449 return Err(PvGetError::Protocol(format!(
450 "server GET init failed: {}",
451 detail
452 )));
453 }
454 op.introspection
455 }
456 _ => None,
457 };
458
459 let data_req = encode_get_request(sid, ioid, 0x00, &[], version, is_be);
460 stream.write_all(&data_req).await?;
461 let data_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
462 PvaPacketCommand::Op(op) => op.command == 10 && op.ioid == ioid && op.subcmd == 0x00,
463 _ => false,
464 })
465 .await?;
466 let mut pkt = PvaPacket::new(&data_resp);
467 let data_cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
468 "server get data decode failed".to_string(),
469 ))?;
470
471 let PvaPacketCommand::Op(mut op) = data_cmd else {
472 return Err(PvGetError::Protocol(
473 "unexpected GET data response".to_string(),
474 ));
475 };
476 if op.status.as_ref().is_some_and(|s| s.is_error()) {
477 let detail = op
478 .status
479 .as_ref()
480 .map(ToString::to_string)
481 .unwrap_or_default();
482 return Err(PvGetError::Protocol(format!(
483 "server GET data failed: {}",
484 detail
485 )));
486 }
487
488 let mut names = Vec::new();
489 names.extend(op.pv_names.clone());
490 extract_ascii_candidates(&op.body, &mut names);
491 if let Some(desc) = &init_desc {
492 for field in &desc.fields {
493 names.push(field.name.clone());
494 }
495 op.decode_with_field_desc(desc, is_be);
496 if let Some(decoded) = &op.decoded_value {
497 collect_strings_from_decoded(decoded, &mut names);
498 }
499 }
500
501 let mut names = normalize_pv_names(names);
502 names.retain(|n| looks_like_pv_name(n));
503 if names.is_empty() {
504 return Err(PvGetError::Decode(
505 "server GET returned no PV-like names".to_string(),
506 ));
507 }
508 Ok(names)
509 }
510
511 let mut errs = Vec::new();
512 for channel in ["server", "__server"] {
513 match get_channel(opts, server_addr, channel).await {
514 Ok(names) => return Ok(names),
515 Err(err) => errs.push(format!("{}: {}", channel, err)),
516 }
517 }
518 Err(PvGetError::Protocol(format!(
519 "server GET unavailable: {}",
520 errs.join(" | ")
521 )))
522}
523
/// Lists the PV names of the server at `server_addr` using only the
/// `__pvlist` strategy (no fallbacks).
///
/// Delegates to `list_pvs_via_pvlist` and returns its result unchanged.
pub async fn pvlist(
    opts: &PvOptions,
    server_addr: SocketAddr,
) -> Result<Vec<String>, PvGetError> {
    list_pvs_via_pvlist(opts, server_addr).await
}
533
/// Lists PV names, falling back to alternative strategies when `__pvlist`
/// is unavailable, and reports which strategy succeeded.
///
/// Equivalent to `pvlist_with_fallback_progress` with a progress callback
/// that does nothing.
pub async fn pvlist_with_fallback(
    opts: &PvOptions,
    server_addr: SocketAddr,
) -> Result<(Vec<String>, PvListSource), PvGetError> {
    // No-op closure: progress messages are discarded.
    pvlist_with_fallback_progress(opts, server_addr, |_| {}).await
}
543
544pub async fn pvlist_with_fallback_progress<F>(
546 opts: &PvOptions,
547 server_addr: SocketAddr,
548 mut on_progress: F,
549) -> Result<(Vec<String>, PvListSource), PvGetError>
550where
551 F: FnMut(&str),
552{
553 let addrs = candidate_server_addrs(opts, server_addr);
554 let mut attempts = Vec::new();
555 let get_field_fallback = is_get_field_fallback_enabled();
556
557 if addrs.len() > 1 {
558 on_progress(&format!(
559 "Trying {} candidate server endpoints...",
560 addrs.len()
561 ));
562 }
563 if !get_field_fallback {
564 on_progress(
565 "GET_FIELD fallback is disabled by default (set EPICS_PVA_ENABLE_GET_FIELD_FALLBACK=YES to enable)",
566 );
567 }
568
569 for addr in addrs {
570 on_progress(&format!("Trying __pvlist on {}", addr));
571 let primary = list_pvs_via_pvlist(opts, addr).await;
572 match primary {
573 Ok(names) => return Ok((normalize_pv_names(names), PvListSource::PvList)),
574 Err(primary_err) => {
575 let get_field_result = if get_field_fallback {
576 on_progress(&format!(
577 "__pvlist unavailable on {}; trying GET_FIELD(*)",
578 addr
579 ));
580 let fallback_star = list_pvs_via_get_field(opts, addr, Some("*")).await;
581 match fallback_star {
582 Ok(names) => {
583 return Ok((normalize_pv_names(names), PvListSource::GetField));
584 }
585 Err(star_err) => {
586 on_progress(&format!(
587 "GET_FIELD(*) unavailable on {}; trying GET_FIELD(<empty>)",
588 addr
589 ));
590 let fallback_empty = list_pvs_via_get_field(opts, addr, None).await;
591 match fallback_empty {
592 Ok(names) => {
593 return Ok((normalize_pv_names(names), PvListSource::GetField));
594 }
595 Err(empty_err) => Some(format!(
596 "GET_FIELD(*): {}; GET_FIELD(<empty>): {}",
597 star_err, empty_err
598 )),
599 }
600 }
601 }
602 } else {
603 None
604 };
605
606 on_progress(&format!(
607 "__pvlist unavailable on {}; trying RPC(server)",
608 addr
609 ));
610 match list_pvs_via_server_rpc(opts, addr).await {
611 Ok(names) => return Ok((normalize_pv_names(names), PvListSource::ServerRpc)),
612 Err(rpc_err) => {
613 on_progress(&format!(
614 "RPC(server) unavailable on {}; trying GET(server)",
615 addr
616 ));
617 match list_pvs_via_server_get(opts, addr).await {
618 Ok(names) => {
619 return Ok((normalize_pv_names(names), PvListSource::ServerGet))
620 }
621 Err(get_err) => {
622 let get_field_msg = get_field_result
623 .unwrap_or_else(|| "GET_FIELD: disabled".to_string());
624 attempts.push(format!(
625 "{} => __pvlist: {}; {}; RPC(server): {}; GET(server): {}",
626 addr, primary_err, get_field_msg, rpc_err, get_err
627 ));
628 }
629 }
630 }
631 }
632 }
633 }
634 }
635
636 Err(PvGetError::Protocol(format!(
637 "failed to list PVs from {}: {}",
638 server_addr,
639 attempts.join(" | ")
640 )))
641}
642
// Unit tests for the pure (non-network) helpers in this file.
#[cfg(test)]
mod tests {
    use super::*;

    // A structure with a `value` string array (plus unrelated fields) parses to its names.
    #[test]
    fn parse_pvlist_value_extracts_ntscalararray_strings() {
        let value = DecodedValue::Structure(vec![
            (
                "value".to_string(),
                DecodedValue::Array(vec![
                    DecodedValue::String("SIM:AI".to_string()),
                    DecodedValue::String("SIM:AO".to_string()),
                ]),
            ),
            ("alarm".to_string(), DecodedValue::Structure(vec![])),
        ]);

        let parsed = parse_pvlist_value(&value).expect("parsed");
        assert_eq!(parsed, vec!["SIM:AI".to_string(), "SIM:AO".to_string()]);
    }

    // Blank entries are dropped, duplicates collapse, output is sorted.
    #[test]
    fn normalize_pv_names_sorts_and_deduplicates() {
        let names = vec!["B".into(), "A".into(), "B".into(), " ".into()];
        let result = normalize_pv_names(names);
        assert_eq!(result, vec!["A".to_string(), "B".to_string()]);
    }

    // A non-default port yields the original address first, then the same IP on `tcp_port`.
    #[test]
    fn candidate_server_addrs_adds_default_tcp_port_fallback() {
        let mut opts = PvOptions::new(String::new());
        opts.tcp_port = 5075;
        let addr: SocketAddr = "10.0.0.2:6000".parse().unwrap();
        let addrs = candidate_server_addrs(&opts, addr);
        assert_eq!(addrs.len(), 2);
        assert_eq!(addrs[0], addr);
        assert_eq!(addrs[1], "10.0.0.2:5075".parse::<SocketAddr>().unwrap());
    }

    // When the address already uses `tcp_port`, no duplicate candidate is added.
    #[test]
    fn candidate_server_addrs_no_dup_when_same_port() {
        let mut opts = PvOptions::new(String::new());
        opts.tcp_port = 6000;
        let addr: SocketAddr = "10.0.0.2:6000".parse().unwrap();
        let addrs = candidate_server_addrs(&opts, addr);
        assert_eq!(addrs.len(), 1);
    }

    // Strings nested in structures and arrays are collected in traversal order.
    #[test]
    fn collect_strings_from_decoded_extracts_nested_strings() {
        let value = DecodedValue::Structure(vec![
            ("a".to_string(), DecodedValue::String("ONE".to_string())),
            (
                "b".to_string(),
                DecodedValue::Array(vec![DecodedValue::String("TWO".to_string())]),
            ),
        ]);
        let mut out = Vec::new();
        collect_strings_from_decoded(&value, &mut out);
        assert_eq!(out, vec!["ONE".to_string(), "TWO".to_string()]);
    }

    // NUL-separated tokens in a raw byte buffer are found.
    #[test]
    fn extract_ascii_candidates_finds_pv_like_tokens() {
        let raw = b"\x00SIM:AI\x00junk\x00IOC-01:PV1\x00";
        let mut out = Vec::new();
        extract_ascii_candidates(raw, &mut out);
        assert!(out.iter().any(|s| s == "SIM:AI"));
        assert!(out.iter().any(|s| s == "IOC-01:PV1"));
    }

    // Metadata words, type ids, empty strings and whitespace are rejected.
    #[test]
    fn looks_like_pv_name_filters_metadata() {
        assert!(looks_like_pv_name("SIM:AI"));
        assert!(looks_like_pv_name("IOC-01:PV1"));
        assert!(!looks_like_pv_name("value"));
        assert!(!looks_like_pv_name("alarm"));
        assert!(!looks_like_pv_name("epics:nt/NTScalar:1.0"));
        assert!(!looks_like_pv_name(""));
        assert!(!looks_like_pv_name("has space"));
    }

    // Round-trips the encoded RPC argument through the decoder and checks
    // the NTURI type id plus the scheme / path / query.op values.
    #[test]
    fn encode_server_rpc_channels_request_uses_nturi_channels() {
        let payload = encode_server_rpc_channels_request(false);
        assert_eq!(payload.first(), Some(&0x80));

        let decoder = PvdDecoder::new(false);
        let (desc, consumed) = decoder
            .parse_introspection_with_len(&payload)
            .expect("introspection");
        assert_eq!(desc.struct_id.as_deref(), Some("epics:nt/NTURI:1.0"));

        // The value bytes start right after the introspection.
        let (decoded, _) = decoder
            .decode_structure(&payload[consumed..], &desc)
            .expect("decode payload");
        let DecodedValue::Structure(fields) = decoded else {
            panic!("expected structure");
        };

        let mut scheme = None;
        let mut path = None;
        let mut op = None;
        for (name, value) in fields {
            match (name.as_str(), value) {
                ("scheme", DecodedValue::String(v)) => scheme = Some(v),
                ("path", DecodedValue::String(v)) => path = Some(v),
                ("query", DecodedValue::Structure(query_fields)) => {
                    for (qname, qvalue) in query_fields {
                        if qname == "op" {
                            if let DecodedValue::String(v) = qvalue {
                                op = Some(v);
                            }
                        }
                    }
                }
                _ => {}
            }
        }
        assert_eq!(scheme.as_deref(), Some("pva"));
        assert_eq!(path.as_deref(), Some("server"));
        assert_eq!(op.as_deref(), Some("channels"));
    }
}