1use std::net::SocketAddr;
12
13use tokio::io::AsyncWriteExt;
14use tokio::net::TcpStream;
15use tokio::time::timeout;
16
17use spvirit_codec::epics_decode::{PvaPacket, PvaPacketCommand};
18use spvirit_codec::spvd_decode::{
19 DecodedValue, FieldDesc, FieldType, PvdDecoder, StructureDesc, extract_nt_scalar_value,
20};
21use spvirit_codec::spvd_encode::{encode_string_pvd, encode_structure_desc};
22use spvirit_codec::spvirit_encode::encode_rpc_request;
23
24use crate::client::{
25 ChannelConn, build_client_validation, encode_get_field_request, encode_get_request,
26 establish_channel, pvget,
27};
28use crate::transport::{read_packet, read_until};
29use crate::types::{PvGetError, PvOptions};
30
31#[derive(Clone, Copy, Debug, PartialEq, Eq)]
33pub enum PvListSource {
34 PvList,
35 GetField,
36 ServerRpc,
37 ServerGet,
38}
39
40const PV_REQUEST_EMPTY: [u8; 6] = [0xfd, 0x02, 0x00, 0x80, 0x00, 0x00];
43
44pub fn normalize_pv_names(mut names: Vec<String>) -> Vec<String> {
46 names.retain(|name| !name.trim().is_empty());
47 names.sort();
48 names.dedup();
49 names
50}
51
52pub fn parse_pvlist_value(value: &DecodedValue) -> Option<Vec<String>> {
54 let root = extract_nt_scalar_value(value).unwrap_or(value);
55 let DecodedValue::Array(items) = root else {
56 return None;
57 };
58 let mut out = Vec::with_capacity(items.len());
59 for item in items {
60 if let DecodedValue::String(name) = item {
61 out.push(name.clone());
62 } else {
63 return None;
64 }
65 }
66 Some(out)
67}
68
69fn candidate_server_addrs(opts: &PvOptions, server_addr: SocketAddr) -> Vec<SocketAddr> {
70 let mut out = vec![server_addr];
71 let default_addr = SocketAddr::new(server_addr.ip(), opts.tcp_port);
72 if default_addr != server_addr {
73 out.push(default_addr);
74 }
75 out
76}
77
78fn is_get_field_fallback_enabled() -> bool {
79 match std::env::var("EPICS_PVA_ENABLE_GET_FIELD_FALLBACK") {
80 Ok(v) => {
81 let v = v.trim().to_ascii_uppercase();
82 v == "YES" || v == "Y" || v == "1" || v == "TRUE"
83 }
84 Err(_) => false,
85 }
86}
87
88fn collect_strings_from_decoded(value: &DecodedValue, out: &mut Vec<String>) {
89 match value {
90 DecodedValue::String(s) => out.push(s.clone()),
91 DecodedValue::Array(items) => {
92 for item in items {
93 collect_strings_from_decoded(item, out);
94 }
95 }
96 DecodedValue::Structure(fields) => {
97 for (_, item) in fields {
98 collect_strings_from_decoded(item, out);
99 }
100 }
101 _ => {}
102 }
103}
104
105fn looks_like_pv_name(candidate: &str) -> bool {
106 if candidate.is_empty() || candidate.len() > 128 {
107 return false;
108 }
109 if candidate.chars().any(|c| c.is_whitespace()) {
110 return false;
111 }
112 let lower = candidate.to_ascii_lowercase();
113 let deny = [
114 "value",
115 "alarm",
116 "timestamp",
117 "display",
118 "control",
119 "severity",
120 "message",
121 "seconds",
122 "nanoseconds",
123 "units",
124 ];
125 if deny.iter().any(|d| lower == *d) {
126 return false;
127 }
128 if lower.starts_with("epics:") {
129 return false;
130 }
131 true
132}
133
134fn extract_ascii_candidates(raw: &[u8], out: &mut Vec<String>) {
135 let mut i = 0usize;
136 while i < raw.len() {
137 if raw[i].is_ascii_alphanumeric() {
138 let start = i;
139 i += 1;
140 while i < raw.len() {
141 let b = raw[i];
142 if b.is_ascii_alphanumeric()
143 || b == b':'
144 || b == b'.'
145 || b == b'_'
146 || b == b'-'
147 || b == b'/'
148 {
149 i += 1;
150 } else {
151 break;
152 }
153 }
154 let len = i - start;
155 if (3..=128).contains(&len) {
156 if let Ok(s) = std::str::from_utf8(&raw[start..start + len]) {
157 out.push(s.to_string());
158 }
159 }
160 } else {
161 i += 1;
162 }
163 }
164}
165
166fn encode_server_rpc_channels_request(is_be: bool) -> Vec<u8> {
167 let desc = StructureDesc {
168 struct_id: Some("epics:nt/NTURI:1.0".to_string()),
169 fields: vec![
170 FieldDesc {
171 name: "scheme".to_string(),
172 field_type: FieldType::String,
173 },
174 FieldDesc {
175 name: "path".to_string(),
176 field_type: FieldType::String,
177 },
178 FieldDesc {
179 name: "query".to_string(),
180 field_type: FieldType::Structure(StructureDesc {
181 struct_id: None,
182 fields: vec![FieldDesc {
183 name: "op".to_string(),
184 field_type: FieldType::String,
185 }],
186 }),
187 },
188 ],
189 };
190
191 let mut out = Vec::new();
192 out.push(0x80);
193 out.extend_from_slice(&encode_structure_desc(&desc, is_be));
194 out.extend_from_slice(&encode_string_pvd("pva", is_be));
195 out.extend_from_slice(&encode_string_pvd("server", is_be));
196 out.extend_from_slice(&encode_string_pvd("channels", is_be));
197 out
198}
199
200async fn list_pvs_via_pvlist(
204 opts: &PvOptions,
205 server_addr: SocketAddr,
206) -> Result<Vec<String>, PvGetError> {
207 let mut get_opts = opts.clone();
208 get_opts.pv_name = "__pvlist".to_string();
209 get_opts.server_addr = Some(server_addr);
210 let result = pvget(&get_opts).await?;
211 let names = parse_pvlist_value(&result.value)
212 .ok_or_else(|| PvGetError::Decode("failed to decode __pvlist value".to_string()))?;
213 Ok(normalize_pv_names(names))
214}
215
216pub async fn list_pvs_via_get_field(
218 opts: &PvOptions,
219 server_addr: SocketAddr,
220 field_pattern: Option<&str>,
221) -> Result<Vec<String>, PvGetError> {
222 let mut stream = timeout(opts.timeout, TcpStream::connect(server_addr))
223 .await
224 .map_err(|_| PvGetError::Timeout("connect"))??;
225
226 let mut version = 2u8;
227 let mut is_be = false;
228
229 for _ in 0..2 {
230 if let Ok(bytes) = read_packet(&mut stream, opts.timeout).await {
231 let mut pkt = PvaPacket::new(&bytes);
232 if let Some(cmd) = pkt.decode_payload() {
233 match cmd {
234 PvaPacketCommand::Control(payload) => {
235 if payload.command == 2 {
236 is_be = pkt.header.flags.is_msb;
237 }
238 }
239 PvaPacketCommand::ConnectionValidation(_) => {
240 version = pkt.header.version;
241 is_be = pkt.header.flags.is_msb;
242 }
243 _ => {}
244 }
245 }
246 }
247 }
248
249 let validation = build_client_validation(opts, version, is_be);
250 stream.write_all(&validation).await?;
251
252 let _ = read_until(&mut stream, opts.timeout, |cmd| {
253 matches!(cmd, PvaPacketCommand::ConnectionValidated(_))
254 })
255 .await?;
256
257 let get_field = encode_get_field_request(0, 0, field_pattern, version, is_be);
258 stream.write_all(&get_field).await?;
259
260 let field_resp = read_until(
261 &mut stream,
262 opts.timeout,
263 |cmd| matches!(cmd, PvaPacketCommand::GetField(payload) if payload.is_server),
264 )
265 .await?;
266 let mut pkt = PvaPacket::new(&field_resp);
267 let cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
268 "get_field listing decode failed".to_string(),
269 ))?;
270 let PvaPacketCommand::GetField(payload) = cmd else {
271 return Err(PvGetError::Protocol(
272 "unexpected GET_FIELD response".to_string(),
273 ));
274 };
275
276 if payload.status.as_ref().is_some_and(|s| s.is_error()) {
277 let detail = payload
278 .status
279 .as_ref()
280 .map(ToString::to_string)
281 .unwrap_or_default();
282 return Err(PvGetError::Protocol(format!(
283 "get_field listing error: {}",
284 detail
285 )));
286 }
287
288 let desc = payload
289 .introspection
290 .ok_or_else(|| PvGetError::Decode("missing GET_FIELD introspection".to_string()))?;
291
292 let names = desc.fields.into_iter().map(|f| f.name).collect::<Vec<_>>();
293 Ok(normalize_pv_names(names))
294}
295
296async fn list_pvs_via_server_rpc_channel(
298 opts: &PvOptions,
299 server_addr: SocketAddr,
300 rpc_channel: &str,
301) -> Result<Vec<String>, PvGetError> {
302 let mut rpc_opts = opts.clone();
303 rpc_opts.pv_name = rpc_channel.to_string();
304 let ChannelConn {
305 mut stream,
306 sid,
307 version,
308 is_be,
309 ..
310 } = establish_channel(server_addr, &rpc_opts).await?;
311
312 let ioid = 1u32;
313 let rpc_init = encode_rpc_request(sid, ioid, 0x08, &PV_REQUEST_EMPTY, version, is_be);
314 stream.write_all(&rpc_init).await?;
315
316 let init_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
317 PvaPacketCommand::Op(op) => op.command == 20 && op.ioid == ioid && (op.subcmd & 0x08) != 0,
318 _ => false,
319 })
320 .await?;
321 let mut pkt = PvaPacket::new(&init_resp);
322 let init_cmd = pkt
323 .decode_payload()
324 .ok_or(PvGetError::Protocol("rpc init decode failed".to_string()))?;
325 if let PvaPacketCommand::Op(op) = init_cmd {
326 if op.status.as_ref().is_some_and(|s| s.is_error()) {
327 let detail = op
328 .status
329 .as_ref()
330 .map(ToString::to_string)
331 .unwrap_or_default();
332 return Err(PvGetError::Protocol(format!("rpc init failed: {}", detail)));
333 }
334 }
335
336 let rpc_payload = encode_server_rpc_channels_request(is_be);
337 let rpc_req = encode_rpc_request(sid, ioid, 0x00, &rpc_payload, version, is_be);
338 stream.write_all(&rpc_req).await?;
339
340 let rpc_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
341 PvaPacketCommand::Op(op) => op.command == 20 && op.ioid == ioid && op.subcmd == 0x00,
342 _ => false,
343 })
344 .await?;
345 let mut pkt = PvaPacket::new(&rpc_resp);
346 let rpc_cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
347 "rpc response decode failed".to_string(),
348 ))?;
349 let PvaPacketCommand::Op(op) = rpc_cmd else {
350 return Err(PvGetError::Protocol("unexpected RPC response".to_string()));
351 };
352 if op.status.as_ref().is_some_and(|s| s.is_error()) {
353 let detail = op
354 .status
355 .as_ref()
356 .map(ToString::to_string)
357 .unwrap_or_default();
358 return Err(PvGetError::Protocol(format!(
359 "rpc execute failed: {}",
360 detail
361 )));
362 }
363
364 if op.body.is_empty() {
365 return Err(PvGetError::Decode("empty RPC response".to_string()));
366 }
367
368 let decoder = PvdDecoder::new(is_be);
369 let (desc, consumed) = decoder
370 .parse_introspection_with_len(&op.body)
371 .ok_or_else(|| PvGetError::Decode("RPC missing introspection".to_string()))?;
372 let value_raw = op
373 .body
374 .get(consumed..)
375 .ok_or_else(|| PvGetError::Decode("RPC malformed payload".to_string()))?;
376 let (decoded, _) = decoder
377 .decode_structure(value_raw, &desc)
378 .ok_or_else(|| PvGetError::Decode("RPC decode failed".to_string()))?;
379
380 let mut strings = Vec::new();
381 collect_strings_from_decoded(&decoded, &mut strings);
382 if strings.is_empty() {
383 return Err(PvGetError::Decode(
384 "RPC list returned no strings".to_string(),
385 ));
386 }
387 Ok(normalize_pv_names(strings))
388}
389
390pub async fn list_pvs_via_server_rpc(
392 opts: &PvOptions,
393 server_addr: SocketAddr,
394) -> Result<Vec<String>, PvGetError> {
395 let mut errs = Vec::new();
396 for channel in ["server", "__server"] {
397 match list_pvs_via_server_rpc_channel(opts, server_addr, channel).await {
398 Ok(names) => return Ok(names),
399 Err(err) => errs.push(format!("{}: {}", channel, err)),
400 }
401 }
402 Err(PvGetError::Protocol(format!(
403 "server RPC unavailable: {}",
404 errs.join(" | ")
405 )))
406}
407
408pub async fn list_pvs_via_server_get(
410 opts: &PvOptions,
411 server_addr: SocketAddr,
412) -> Result<Vec<String>, PvGetError> {
413 async fn get_channel(
414 opts: &PvOptions,
415 server_addr: SocketAddr,
416 channel: &str,
417 ) -> Result<Vec<String>, PvGetError> {
418 let mut get_opts = opts.clone();
419 get_opts.pv_name = channel.to_string();
420 let ChannelConn {
421 mut stream,
422 sid,
423 version,
424 is_be,
425 ..
426 } = establish_channel(server_addr, &get_opts).await?;
427
428 let ioid = 1u32;
429 let init_req = encode_get_request(sid, ioid, 0x08, &PV_REQUEST_EMPTY, version, is_be);
430 stream.write_all(&init_req).await?;
431 let init_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
432 PvaPacketCommand::Op(op) => {
433 op.command == 10 && op.ioid == ioid && (op.subcmd & 0x08) != 0
434 }
435 _ => false,
436 })
437 .await?;
438 let mut pkt = PvaPacket::new(&init_resp);
439 let init_cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
440 "server get init decode failed".to_string(),
441 ))?;
442 let init_desc = match init_cmd {
443 PvaPacketCommand::Op(op) => {
444 if op.status.as_ref().is_some_and(|s| s.is_error()) {
445 let detail = op
446 .status
447 .as_ref()
448 .map(ToString::to_string)
449 .unwrap_or_default();
450 return Err(PvGetError::Protocol(format!(
451 "server GET init failed: {}",
452 detail
453 )));
454 }
455 op.introspection
456 }
457 _ => None,
458 };
459
460 let data_req = encode_get_request(sid, ioid, 0x00, &[], version, is_be);
461 stream.write_all(&data_req).await?;
462 let data_resp = read_until(&mut stream, opts.timeout, |cmd| match cmd {
463 PvaPacketCommand::Op(op) => op.command == 10 && op.ioid == ioid && op.subcmd == 0x00,
464 _ => false,
465 })
466 .await?;
467 let mut pkt = PvaPacket::new(&data_resp);
468 let data_cmd = pkt.decode_payload().ok_or(PvGetError::Protocol(
469 "server get data decode failed".to_string(),
470 ))?;
471
472 let PvaPacketCommand::Op(mut op) = data_cmd else {
473 return Err(PvGetError::Protocol(
474 "unexpected GET data response".to_string(),
475 ));
476 };
477 if op.status.as_ref().is_some_and(|s| s.is_error()) {
478 let detail = op
479 .status
480 .as_ref()
481 .map(ToString::to_string)
482 .unwrap_or_default();
483 return Err(PvGetError::Protocol(format!(
484 "server GET data failed: {}",
485 detail
486 )));
487 }
488
489 let mut names = Vec::new();
490 names.extend(op.pv_names.clone());
491 extract_ascii_candidates(&op.body, &mut names);
492 if let Some(desc) = &init_desc {
493 for field in &desc.fields {
494 names.push(field.name.clone());
495 }
496 op.decode_with_field_desc(desc, is_be);
497 if let Some(decoded) = &op.decoded_value {
498 collect_strings_from_decoded(decoded, &mut names);
499 }
500 }
501
502 let mut names = normalize_pv_names(names);
503 names.retain(|n| looks_like_pv_name(n));
504 if names.is_empty() {
505 return Err(PvGetError::Decode(
506 "server GET returned no PV-like names".to_string(),
507 ));
508 }
509 Ok(names)
510 }
511
512 let mut errs = Vec::new();
513 for channel in ["server", "__server"] {
514 match get_channel(opts, server_addr, channel).await {
515 Ok(names) => return Ok(names),
516 Err(err) => errs.push(format!("{}: {}", channel, err)),
517 }
518 }
519 Err(PvGetError::Protocol(format!(
520 "server GET unavailable: {}",
521 errs.join(" | ")
522 )))
523}
524
525pub async fn pvlist(opts: &PvOptions, server_addr: SocketAddr) -> Result<Vec<String>, PvGetError> {
529 list_pvs_via_pvlist(opts, server_addr).await
530}
531
532pub async fn pvlist_with_fallback(
536 opts: &PvOptions,
537 server_addr: SocketAddr,
538) -> Result<(Vec<String>, PvListSource), PvGetError> {
539 pvlist_with_fallback_progress(opts, server_addr, |_| {}).await
540}
541
542pub async fn pvlist_with_fallback_progress<F>(
544 opts: &PvOptions,
545 server_addr: SocketAddr,
546 mut on_progress: F,
547) -> Result<(Vec<String>, PvListSource), PvGetError>
548where
549 F: FnMut(&str),
550{
551 let addrs = candidate_server_addrs(opts, server_addr);
552 let mut attempts = Vec::new();
553 let get_field_fallback = is_get_field_fallback_enabled();
554
555 if addrs.len() > 1 {
556 on_progress(&format!(
557 "Trying {} candidate server endpoints...",
558 addrs.len()
559 ));
560 }
561 if !get_field_fallback {
562 on_progress(
563 "GET_FIELD fallback is disabled by default (set EPICS_PVA_ENABLE_GET_FIELD_FALLBACK=YES to enable)",
564 );
565 }
566
567 for addr in addrs {
568 on_progress(&format!("Trying __pvlist on {}", addr));
569 let primary = list_pvs_via_pvlist(opts, addr).await;
570 match primary {
571 Ok(names) => return Ok((normalize_pv_names(names), PvListSource::PvList)),
572 Err(primary_err) => {
573 let get_field_result = if get_field_fallback {
574 on_progress(&format!(
575 "__pvlist unavailable on {}; trying GET_FIELD(*)",
576 addr
577 ));
578 let fallback_star = list_pvs_via_get_field(opts, addr, Some("*")).await;
579 match fallback_star {
580 Ok(names) => {
581 return Ok((normalize_pv_names(names), PvListSource::GetField));
582 }
583 Err(star_err) => {
584 on_progress(&format!(
585 "GET_FIELD(*) unavailable on {}; trying GET_FIELD(<empty>)",
586 addr
587 ));
588 let fallback_empty = list_pvs_via_get_field(opts, addr, None).await;
589 match fallback_empty {
590 Ok(names) => {
591 return Ok((normalize_pv_names(names), PvListSource::GetField));
592 }
593 Err(empty_err) => Some(format!(
594 "GET_FIELD(*): {}; GET_FIELD(<empty>): {}",
595 star_err, empty_err
596 )),
597 }
598 }
599 }
600 } else {
601 None
602 };
603
604 on_progress(&format!(
605 "__pvlist unavailable on {}; trying RPC(server)",
606 addr
607 ));
608 match list_pvs_via_server_rpc(opts, addr).await {
609 Ok(names) => return Ok((normalize_pv_names(names), PvListSource::ServerRpc)),
610 Err(rpc_err) => {
611 on_progress(&format!(
612 "RPC(server) unavailable on {}; trying GET(server)",
613 addr
614 ));
615 match list_pvs_via_server_get(opts, addr).await {
616 Ok(names) => {
617 return Ok((normalize_pv_names(names), PvListSource::ServerGet));
618 }
619 Err(get_err) => {
620 let get_field_msg = get_field_result
621 .unwrap_or_else(|| "GET_FIELD: disabled".to_string());
622 attempts.push(format!(
623 "{} => __pvlist: {}; {}; RPC(server): {}; GET(server): {}",
624 addr, primary_err, get_field_msg, rpc_err, get_err
625 ));
626 }
627 }
628 }
629 }
630 }
631 }
632 }
633
634 Err(PvGetError::Protocol(format!(
635 "failed to list PVs from {}: {}",
636 server_addr,
637 attempts.join(" | ")
638 )))
639}
640
641#[cfg(test)]
644mod tests {
645 use super::*;
646
647 #[test]
648 fn parse_pvlist_value_extracts_ntscalararray_strings() {
649 let value = DecodedValue::Structure(vec![
650 (
651 "value".to_string(),
652 DecodedValue::Array(vec![
653 DecodedValue::String("SIM:AI".to_string()),
654 DecodedValue::String("SIM:AO".to_string()),
655 ]),
656 ),
657 ("alarm".to_string(), DecodedValue::Structure(vec![])),
658 ]);
659
660 let parsed = parse_pvlist_value(&value).expect("parsed");
661 assert_eq!(parsed, vec!["SIM:AI".to_string(), "SIM:AO".to_string()]);
662 }
663
664 #[test]
665 fn normalize_pv_names_sorts_and_deduplicates() {
666 let names = vec!["B".into(), "A".into(), "B".into(), " ".into()];
667 let result = normalize_pv_names(names);
668 assert_eq!(result, vec!["A".to_string(), "B".to_string()]);
669 }
670
671 #[test]
672 fn candidate_server_addrs_adds_default_tcp_port_fallback() {
673 let mut opts = PvOptions::new(String::new());
674 opts.tcp_port = 5075;
675 let addr: SocketAddr = "10.0.0.2:6000".parse().unwrap();
676 let addrs = candidate_server_addrs(&opts, addr);
677 assert_eq!(addrs.len(), 2);
678 assert_eq!(addrs[0], addr);
679 assert_eq!(addrs[1], "10.0.0.2:5075".parse::<SocketAddr>().unwrap());
680 }
681
682 #[test]
683 fn candidate_server_addrs_no_dup_when_same_port() {
684 let mut opts = PvOptions::new(String::new());
685 opts.tcp_port = 6000;
686 let addr: SocketAddr = "10.0.0.2:6000".parse().unwrap();
687 let addrs = candidate_server_addrs(&opts, addr);
688 assert_eq!(addrs.len(), 1);
689 }
690
691 #[test]
692 fn collect_strings_from_decoded_extracts_nested_strings() {
693 let value = DecodedValue::Structure(vec![
694 ("a".to_string(), DecodedValue::String("ONE".to_string())),
695 (
696 "b".to_string(),
697 DecodedValue::Array(vec![DecodedValue::String("TWO".to_string())]),
698 ),
699 ]);
700 let mut out = Vec::new();
701 collect_strings_from_decoded(&value, &mut out);
702 assert_eq!(out, vec!["ONE".to_string(), "TWO".to_string()]);
703 }
704
705 #[test]
706 fn extract_ascii_candidates_finds_pv_like_tokens() {
707 let raw = b"\x00SIM:AI\x00junk\x00IOC-01:PV1\x00";
708 let mut out = Vec::new();
709 extract_ascii_candidates(raw, &mut out);
710 assert!(out.iter().any(|s| s == "SIM:AI"));
711 assert!(out.iter().any(|s| s == "IOC-01:PV1"));
712 }
713
714 #[test]
715 fn looks_like_pv_name_filters_metadata() {
716 assert!(looks_like_pv_name("SIM:AI"));
717 assert!(looks_like_pv_name("IOC-01:PV1"));
718 assert!(!looks_like_pv_name("value"));
719 assert!(!looks_like_pv_name("alarm"));
720 assert!(!looks_like_pv_name("epics:nt/NTScalar:1.0"));
721 assert!(!looks_like_pv_name(""));
722 assert!(!looks_like_pv_name("has space"));
723 }
724
725 #[test]
726 fn encode_server_rpc_channels_request_uses_nturi_channels() {
727 let payload = encode_server_rpc_channels_request(false);
728 assert_eq!(payload.first(), Some(&0x80));
729
730 let decoder = PvdDecoder::new(false);
731 let (desc, consumed) = decoder
732 .parse_introspection_with_len(&payload)
733 .expect("introspection");
734 assert_eq!(desc.struct_id.as_deref(), Some("epics:nt/NTURI:1.0"));
735
736 let (decoded, _) = decoder
737 .decode_structure(&payload[consumed..], &desc)
738 .expect("decode payload");
739 let DecodedValue::Structure(fields) = decoded else {
740 panic!("expected structure");
741 };
742
743 let mut scheme = None;
744 let mut path = None;
745 let mut op = None;
746 for (name, value) in fields {
747 match (name.as_str(), value) {
748 ("scheme", DecodedValue::String(v)) => scheme = Some(v),
749 ("path", DecodedValue::String(v)) => path = Some(v),
750 ("query", DecodedValue::Structure(query_fields)) => {
751 for (qname, qvalue) in query_fields {
752 if qname == "op" {
753 if let DecodedValue::String(v) = qvalue {
754 op = Some(v);
755 }
756 }
757 }
758 }
759 _ => {}
760 }
761 }
762 assert_eq!(scheme.as_deref(), Some("pva"));
763 assert_eq!(path.as_deref(), Some("server"));
764 assert_eq!(op.as_deref(), Some("channels"));
765 }
766}