1use std::net::{IpAddr, Ipv4Addr, SocketAddr};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU16, AtomicU32, Ordering};
9use std::time::{Duration, Instant, SystemTime};
10
11use regex::Regex;
12use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::net::{TcpListener, TcpStream, UdpSocket};
14use tokio::sync::mpsc;
15use tracing::{debug, error, info};
16
17use spvirit_codec::epics_decode::{PvaHeader, PvaPacket, PvaPacketCommand};
18use spvirit_codec::spvd_decode::{StructureDesc, extract_subfield_desc};
19use spvirit_codec::spvd_encode::{
20 decode_pv_request_fields, filter_structure_desc, nt_payload_desc,
21};
22use spvirit_codec::spvirit_encode::{
23 encode_connection_validation, encode_control_message, encode_create_channel_error,
24 encode_create_channel_response, encode_get_field_error, encode_get_field_response,
25 encode_header, encode_message_error, encode_monitor_data_response_payload,
26 encode_op_data_response_filtered, encode_op_error, encode_op_get_data_response_payload,
27 encode_op_init_response_desc, encode_op_put_get_data_error_response,
28 encode_op_put_get_data_response_payload, encode_op_put_get_init_error_response,
29 encode_op_put_get_init_response, encode_op_put_getput_response_payload, encode_op_put_response,
30 encode_op_put_status_response, encode_op_rpc_data_response_payload,
31 encode_op_status_error_response, encode_op_status_response, encode_search_response,
32 ip_from_bytes, ip_to_bytes,
33};
34
35use spvirit_types::{NtPayload, NtScalar, NtScalarArray, ScalarArrayValue, ScalarValue};
36
37use crate::decode::{assemble_segmented_message, decode_put_body};
38use crate::monitor::MonitorRegistry;
39use crate::pvstore::PvStore;
40use crate::state::{ConnState, MonitorState, MonitorSub};
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq)]
48pub enum PvListMode {
49 Off,
51 Discover,
53 List,
55}
56
57impl PvListMode {
58 pub fn parse(raw: &str) -> Result<Self, String> {
59 match raw.trim().to_ascii_lowercase().as_str() {
60 "off" => Ok(Self::Off),
61 "discover" => Ok(Self::Discover),
62 "list" => Ok(Self::List),
63 other => Err(format!(
64 "Invalid pvlist-mode '{}'; expected off|discover|list",
65 other
66 )),
67 }
68 }
69}
70
71pub struct ServerState<S: PvStore> {
77 pub store: Arc<S>,
78 pub registry: Arc<MonitorRegistry>,
79 pub sid_counter: AtomicU32,
80 pub beacon_change: Arc<AtomicU16>,
81 pub compute_alarms: bool,
82 pub pvlist_mode: PvListMode,
83 pub pvlist_max: usize,
84 pub pvlist_allow_pattern: Option<Regex>,
85}
86
87impl<S: PvStore> ServerState<S> {
88 pub fn new(
89 store: Arc<S>,
90 registry: Arc<MonitorRegistry>,
91 compute_alarms: bool,
92 pvlist_mode: PvListMode,
93 pvlist_max: usize,
94 pvlist_allow_pattern: Option<Regex>,
95 ) -> Self {
96 Self {
97 store,
98 registry,
99 sid_counter: AtomicU32::new(1),
100 beacon_change: Arc::new(AtomicU16::new(0)),
101 compute_alarms,
102 pvlist_mode,
103 pvlist_max,
104 pvlist_allow_pattern,
105 }
106 }
107}
108
109pub fn is_pvlist_virtual_pv(pv_name: &str) -> bool {
114 pv_name == "__pvlist"
115}
116
117pub fn is_server_rpc_pv(pv_name: &str) -> bool {
118 pv_name == "server"
119}
120
121pub fn is_virtual_event_pv(pv_name: &str) -> bool {
122 pv_name.starts_with("__event:")
123}
124
125pub fn virtual_event_nt(pv_name: &str) -> NtPayload {
126 NtPayload::Scalar(
127 NtScalar::from_value(ScalarValue::Bool(false))
128 .with_description(format!("Virtual event trigger for {}", pv_name)),
129 )
130}
131
132pub fn virtual_pvlist_nt(entries: Vec<String>) -> NtPayload {
133 NtPayload::ScalarArray(NtScalarArray::from_value(ScalarArrayValue::Str(entries)))
134}
135
136pub fn is_pattern_query(raw: &str) -> bool {
141 raw.contains('*') || raw.contains('?')
142}
143
144pub fn wildcard_match(pattern: &str, text: &str) -> bool {
145 let p = pattern.as_bytes();
146 let t = text.as_bytes();
147 let mut i = 0usize;
148 let mut j = 0usize;
149 let mut star: Option<usize> = None;
150 let mut match_j = 0usize;
151
152 while j < t.len() {
153 if i < p.len() && (p[i] == b'?' || p[i] == t[j]) {
154 i += 1;
155 j += 1;
156 } else if i < p.len() && p[i] == b'*' {
157 star = Some(i);
158 i += 1;
159 match_j = j;
160 } else if let Some(star_idx) = star {
161 i = star_idx + 1;
162 match_j += 1;
163 j = match_j;
164 } else {
165 return false;
166 }
167 }
168
169 while i < p.len() && p[i] == b'*' {
170 i += 1;
171 }
172 i == p.len()
173}
174
175pub fn collect_visible_pv_names(
176 all_names: &[String],
177 mode: PvListMode,
178 allow_pattern: Option<&Regex>,
179 max_items: usize,
180) -> Vec<String> {
181 let mut names: Vec<String> = all_names
182 .iter()
183 .filter(|name| {
184 allow_pattern
185 .as_ref()
186 .map(|re| re.is_match(name))
187 .unwrap_or(true)
188 })
189 .cloned()
190 .collect();
191 names.sort();
192 if names.len() > max_items {
193 names.truncate(max_items);
194 }
195 if mode == PvListMode::List && names.len() < max_items {
196 names.push("__pvlist".to_string());
197 }
198 names
199}
200
201fn build_pvlist_structure(names: &[String]) -> StructureDesc {
202 use spvirit_codec::spvd_decode::{FieldDesc, FieldType, TypeCode};
203 StructureDesc {
204 struct_id: Some("epics:pva/pvlist:1.0".to_string()),
205 fields: names
206 .iter()
207 .map(|name| FieldDesc {
208 name: name.clone(),
209 field_type: FieldType::Scalar(TypeCode::Boolean),
210 })
211 .collect(),
212 }
213}
214
215fn requested_pvlist_pattern(field_name: Option<&str>) -> Option<&str> {
216 let raw = field_name.map(str::trim).unwrap_or("");
217 if raw.is_empty() || raw == "*" || raw == "__pvlist" || raw.eq_ignore_ascii_case("pvlist") {
218 return Some("*");
219 }
220 if is_pattern_query(raw) {
221 return Some(raw);
222 }
223 None
224}
225
226pub fn search_reply_target(addr: &[u8; 16], port: u16, peer: SocketAddr) -> SocketAddr {
231 let target_port = if port != 0 { port } else { peer.port() };
232 let target_ip = ip_from_bytes(addr)
233 .filter(|ip| !ip.is_unspecified())
234 .unwrap_or_else(|| peer.ip());
235 SocketAddr::new(target_ip, target_port)
236}
237
238pub fn infer_udp_response_ip(peer: SocketAddr) -> Option<IpAddr> {
239 let bind_addr = if peer.is_ipv4() {
240 "0.0.0.0:0"
241 } else {
242 "[::]:0"
243 };
244 let sock = std::net::UdpSocket::bind(bind_addr).ok()?;
245 sock.connect(peer).ok()?;
246 let local = sock.local_addr().ok()?;
247 if local.ip().is_unspecified() {
248 None
249 } else {
250 Some(local.ip())
251 }
252}
253
254pub fn rand_guid() -> [u8; 12] {
255 let now = SystemTime::now()
256 .duration_since(SystemTime::UNIX_EPOCH)
257 .unwrap_or_default();
258 let mut guid = [0u8; 12];
259 let bytes = now.as_nanos().to_le_bytes();
260 guid.copy_from_slice(&bytes[0..12]);
261 guid
262}
263
264pub fn validate_encoded_packet(conn_id: u64, label: &str, bytes: &[u8]) {
269 let mut pkt = PvaPacket::new(bytes);
270 let decoded = pkt.decode_payload();
271 match decoded {
272 Some(PvaPacketCommand::ConnectionValidation(payload)) => {
273 debug!(
274 "Conn {}: {} decoded as cmd=1 buffer_size={} qos={} authz={:?}",
275 conn_id, label, payload.buffer_size, payload.qos, payload.authz
276 );
277 }
278 Some(PvaPacketCommand::ConnectionValidated(_)) => {
279 debug!("Conn {}: {} decoded as cmd=9", conn_id, label);
280 }
281 Some(other) => {
282 debug!("Conn {}: {} decoded as {:?}", conn_id, label, other);
283 }
284 None => {
285 debug!("Conn {}: {} failed to decode", conn_id, label);
286 }
287 }
288}
289
290pub fn dump_hex_packet(
291 conn_id: u64,
292 dir: &str,
293 label: &str,
294 version: u8,
295 is_be: bool,
296 bytes: &[u8],
297) {
298 debug!(
299 "Conn {}: {} {} ver={} be={} len={}",
300 conn_id,
301 dir,
302 label,
303 version,
304 is_be,
305 bytes.len()
306 );
307 let mut offset = 0usize;
308 while offset < bytes.len() {
309 let end = usize::min(offset + 16, bytes.len());
310 let chunk = &bytes[offset..end];
311 let mut line = String::new();
312 for (i, b) in chunk.iter().enumerate() {
313 if i > 0 {
314 line.push(' ');
315 }
316 line.push_str(&format!("{:02x}", b));
317 }
318 debug!("Conn {}: {:04x} {}", conn_id, offset, line);
319 offset += 16;
320 }
321}
322
323async fn get_nt_snapshot<S: PvStore>(state: &ServerState<S>, pv_name: &str) -> Option<NtPayload> {
328 if is_pvlist_virtual_pv(pv_name) {
329 if state.pvlist_mode != PvListMode::List {
330 return None;
331 }
332 let all_names = state.store.list_pvs().await;
333 let names = collect_visible_pv_names(
334 &all_names,
335 state.pvlist_mode,
336 state.pvlist_allow_pattern.as_ref(),
337 state.pvlist_max,
338 );
339 return Some(virtual_pvlist_nt(names));
340 }
341 if is_virtual_event_pv(pv_name) {
342 return Some(virtual_event_nt(pv_name));
343 }
344 state.store.get_snapshot(pv_name).await
345}
346
347async fn is_writable_pv<S: PvStore>(state: &ServerState<S>, pv_name: &str) -> bool {
348 if is_virtual_event_pv(pv_name) {
349 return true;
350 }
351 state.store.is_writable(pv_name).await
352}
353
354async fn has_pv<S: PvStore>(state: &ServerState<S>, pv_name: &str) -> bool {
355 state.store.has_pv(pv_name).await
356 || is_virtual_event_pv(pv_name)
357 || (is_pvlist_virtual_pv(pv_name) && state.pvlist_mode == PvListMode::List)
358 || (is_server_rpc_pv(pv_name) && state.pvlist_mode != PvListMode::Off)
359}
360
361async fn notify_changed_records<S: PvStore>(
366 state: &ServerState<S>,
367 changed: Vec<(String, NtPayload)>,
368) {
369 for (name, payload) in changed {
370 state.beacon_change.fetch_add(1, Ordering::SeqCst);
371 state.registry.notify_monitors(&name, &payload).await;
372 }
373}
374
375async fn handle_get_field_request<S: PvStore>(
380 state: &ServerState<S>,
381 conn_state: &ConnState,
382 conn_id: u64,
383 payload: spvirit_codec::epics_decode::PvaGetFieldPayload,
384 version: u8,
385 is_be: bool,
386) {
387 if payload.is_server {
388 let resp = encode_get_field_error(
389 payload.cid,
390 "Unexpected server GET_FIELD payload",
391 version,
392 is_be,
393 );
394 state.registry.send_msg(conn_id, resp).await;
395 return;
396 }
397
398 let request_id = payload.ioid.unwrap_or(payload.cid);
399
400 let sid = payload
401 .sid
402 .or_else(|| conn_state.cid_to_sid.get(&payload.cid).copied())
403 .or_else(|| {
404 conn_state
405 .sid_to_pv
406 .contains_key(&payload.cid)
407 .then_some(payload.cid)
408 })
409 .or_else(|| {
410 (payload.cid == 0 && conn_state.sid_to_pv.len() == 1)
411 .then(|| conn_state.sid_to_pv.keys().copied().next())
412 .flatten()
413 });
414
415 if let Some(sid) = sid {
416 if let Some(pv_name) = conn_state.sid_to_pv.get(&sid) {
417 if let Some(nt) = get_nt_snapshot(state, pv_name).await {
418 let full_desc = nt_payload_desc(&nt);
419 let sub = payload.field_name.as_deref().filter(|s| !s.is_empty());
420 let desc = if let Some(field_path) = sub {
421 match extract_subfield_desc(&full_desc, field_path) {
422 Some(sub_desc) => sub_desc,
423 None => {
424 let resp = encode_get_field_error(
425 request_id,
426 &format!("sub-field '{}' not found", field_path),
427 version,
428 is_be,
429 );
430 state.registry.send_msg(conn_id, resp).await;
431 return;
432 }
433 }
434 } else {
435 full_desc
436 };
437 let resp = encode_get_field_response(request_id, &desc, version, is_be);
438 dump_hex_packet(conn_id, "tx", "cmd=17 get_field", version, is_be, &resp);
439 state.registry.send_msg(conn_id, resp).await;
440 debug!(
441 "Conn {}: get_field cid={} sid={:?} ioid={:?} resolved_sid={} pv='{}' field={:?}",
442 conn_id,
443 payload.cid,
444 payload.sid,
445 payload.ioid,
446 sid,
447 pv_name,
448 payload.field_name
449 );
450 return;
451 }
452 let resp = encode_get_field_error(request_id, "PV not found", version, is_be);
453 state.registry.send_msg(conn_id, resp).await;
454 return;
455 }
456 }
457
458 if state.pvlist_mode != PvListMode::List {
459 let resp = encode_get_field_error(
460 request_id,
461 "GET_FIELD listing is disabled (set --pvlist-mode=list)",
462 version,
463 is_be,
464 );
465 state.registry.send_msg(conn_id, resp).await;
466 return;
467 }
468
469 let Some(pattern) = requested_pvlist_pattern(payload.field_name.as_deref()) else {
470 let resp = encode_get_field_error(
471 request_id,
472 "GET_FIELD requires a valid list pattern",
473 version,
474 is_be,
475 );
476 state.registry.send_msg(conn_id, resp).await;
477 return;
478 };
479
480 let all_names = state.store.list_pvs().await;
481 let mut names = collect_visible_pv_names(
482 &all_names,
483 state.pvlist_mode,
484 state.pvlist_allow_pattern.as_ref(),
485 state.pvlist_max,
486 );
487 if pattern != "*" {
488 names.retain(|name| wildcard_match(pattern, name));
489 }
490 if names.is_empty() {
491 let resp =
492 encode_get_field_error(request_id, "No PVs matched list request", version, is_be);
493 state.registry.send_msg(conn_id, resp).await;
494 return;
495 }
496 let desc = build_pvlist_structure(&names);
497 let resp = encode_get_field_response(request_id, &desc, version, is_be);
498 dump_hex_packet(
499 conn_id,
500 "tx",
501 "cmd=17 get_field_list",
502 version,
503 is_be,
504 &resp,
505 );
506 state.registry.send_msg(conn_id, resp).await;
507 debug!(
508 "Conn {}: get_field list pattern='{}' returned {} entries",
509 conn_id,
510 pattern,
511 names.len()
512 );
513}
514
515async fn handle_server_rpc<S: PvStore>(
520 state: &ServerState<S>,
521 conn_id: u64,
522 ioid: u32,
523 subcmd: u8,
524 version: u8,
525 is_be: bool,
526) {
527 if state.pvlist_mode != PvListMode::List {
528 let resp = encode_op_status_error_response(
529 20,
530 ioid,
531 subcmd,
532 "RPC list endpoint disabled (set --pvlist-mode=list)",
533 version,
534 is_be,
535 );
536 state.registry.send_msg(conn_id, resp).await;
537 return;
538 }
539
540 let all_names = state.store.list_pvs().await;
541 let names = collect_visible_pv_names(
542 &all_names,
543 state.pvlist_mode,
544 state.pvlist_allow_pattern.as_ref(),
545 state.pvlist_max,
546 );
547 let payload = NtPayload::ScalarArray(NtScalarArray::from_value(ScalarArrayValue::Str(names)));
548
549 let is_init = (subcmd & 0x08) != 0;
550 if is_init {
551 let resp = encode_op_status_response(20, ioid, subcmd, version, is_be);
552 state.registry.send_msg(conn_id, resp).await;
553 return;
554 }
555
556 let resp = encode_op_rpc_data_response_payload(ioid, subcmd, &payload, version, is_be);
557 state.registry.send_msg(conn_id, resp).await;
558}
559
560async fn handle_control_message<S: PvStore>(
565 state: &ServerState<S>,
566 conn_id: u64,
567 header: &PvaHeader,
568) {
569 debug!(
570 "Conn {}: control (segmented) cmd={} data={}",
571 conn_id, header.command, header.payload_length
572 );
573 if header.command == 3 {
574 let resp = encode_control_message(
575 true,
576 header.flags.is_msb,
577 header.version,
578 4,
579 header.payload_length,
580 );
581 state.registry.send_msg(conn_id, resp).await;
582 }
583}
584
585pub async fn run_udp_search<S: PvStore>(
591 state: Arc<ServerState<S>>,
592 addr: SocketAddr,
593 tcp_port: u16,
594 guid: [u8; 12],
595 advertise_ip: Option<IpAddr>,
596) -> Result<(), Box<dyn std::error::Error>> {
597 let socket = UdpSocket::bind(addr).await?;
598 socket.set_broadcast(true)?;
599 let mut buf = vec![0u8; 4096];
600
601 loop {
602 let (len, peer) = socket.recv_from(&mut buf).await?;
603 let data = &buf[..len];
604 let header = PvaHeader::new(data);
605 if header.flags.is_control || header.command != 3 {
606 continue;
607 }
608 let mut pkt = PvaPacket::new(data);
609 let Some(cmd) = pkt.decode_payload() else {
610 continue;
611 };
612 let version = pkt.header.version;
613 let is_be = pkt.header.flags.is_msb;
614 match cmd {
615 PvaPacketCommand::Search(payload) => {
616 debug!(
617 "UDP search from {}: pv_count={} mask=0x{:02x}",
618 peer,
619 payload.pv_requests.len(),
620 payload.mask
621 );
622 let accepts_tcp = payload.protocols.is_empty()
623 || payload
624 .protocols
625 .iter()
626 .any(|p| p.eq_ignore_ascii_case("tcp"));
627 if !accepts_tcp {
628 debug!("UDP search: no compatible protocol (tcp not accepted)");
629 continue;
630 }
631 let all_names = state.store.list_pvs().await;
632 let visible_names = collect_visible_pv_names(
633 &all_names,
634 state.pvlist_mode,
635 state.pvlist_allow_pattern.as_ref(),
636 state.pvlist_max,
637 );
638 let mut cids = Vec::new();
639 for (cid, name) in &payload.pv_requests {
640 if state.store.has_pv(name).await
641 || is_virtual_event_pv(name)
642 || (is_pvlist_virtual_pv(name) && state.pvlist_mode == PvListMode::List)
643 || (is_server_rpc_pv(name) && state.pvlist_mode != PvListMode::Off)
644 {
645 cids.push(*cid);
646 continue;
647 }
648 if state.pvlist_mode != PvListMode::Off
649 && is_pattern_query(name)
650 && visible_names.iter().any(|pv| wildcard_match(name, pv))
651 {
652 cids.push(*cid);
653 }
654 }
655 let response_required = (payload.mask & 0x01) != 0;
656 let server_discovery_ping = payload.pv_requests.is_empty();
657 let found = server_discovery_ping || !cids.is_empty();
658 if !found && !response_required {
659 debug!("UDP search: no matches and response not required");
660 continue;
661 }
662 let resp_ip = if let Some(ip) = advertise_ip {
663 ip
664 } else if !addr.ip().is_unspecified() {
665 addr.ip()
666 } else if let Some(ip) = infer_udp_response_ip(peer) {
667 debug!("UDP search: inferred response address {}", ip);
668 ip
669 } else {
670 IpAddr::V4(Ipv4Addr::UNSPECIFIED)
671 };
672 let addr_bytes = if resp_ip.is_unspecified() {
673 debug!("UDP search: responding with zero address (unspecified listen)");
674 [0u8; 16]
675 } else {
676 ip_to_bytes(resp_ip)
677 };
678 let response = encode_search_response(
679 guid,
680 payload.seq,
681 addr_bytes,
682 tcp_port,
683 "tcp",
684 found,
685 &cids,
686 version,
687 is_be,
688 );
689 let reply_target = search_reply_target(&payload.addr, payload.port, peer);
690 if let Err(e) = socket.send_to(&response, reply_target).await {
691 debug!(
692 "UDP search: failed sending {} matches to {}: {}",
693 cids.len(),
694 reply_target,
695 e
696 );
697 continue;
698 }
699 debug!(
700 "UDP search: responded found={} with {} matches to {}",
701 found,
702 cids.len(),
703 reply_target
704 );
705 }
706 _ => {}
707 }
708 }
709}
710
711pub async fn run_tcp_server<S: PvStore>(
717 state: Arc<ServerState<S>>,
718 addr: SocketAddr,
719 conn_timeout: Duration,
720) -> Result<(), Box<dyn std::error::Error>> {
721 let listener = TcpListener::bind(addr).await?;
722 let conn_id = Arc::new(std::sync::atomic::AtomicU64::new(1));
723
724 loop {
725 let (stream, peer) = listener.accept().await?;
726 let id = conn_id.fetch_add(1, Ordering::SeqCst);
727 info!("TCP connection {} from {}", id, peer);
728 let state_clone = state.clone();
729 tokio::spawn(async move {
730 if let Err(e) = handle_connection(state_clone, stream, id, conn_timeout).await {
731 error!("Connection {} error: {}", id, e);
732 }
733 });
734 }
735}
736
737pub async fn handle_connection<S: PvStore>(
747 state: Arc<ServerState<S>>,
748 stream: TcpStream,
749 conn_id: u64,
750 conn_timeout: Duration,
751) -> Result<(), Box<dyn std::error::Error>> {
752 let (mut reader, mut writer) = stream.into_split();
753 let (tx, mut rx) = mpsc::channel::<Vec<u8>>(128);
754
755 {
756 let mut conns = state.registry.conns.lock().await;
757 conns.insert(conn_id, tx);
758 }
759
760 let writer_task = tokio::spawn(async move {
761 while let Some(msg) = rx.recv().await {
762 if writer.write_all(&msg).await.is_err() {
763 break;
764 }
765 }
766 });
767
768 let mut conn_state = ConnState::default();
769
770 let set_byte_order = encode_control_message(true, false, 2, 2, 0);
772 validate_encoded_packet(conn_id, "set_byte_order", &set_byte_order);
773 dump_hex_packet(
774 conn_id,
775 "tx",
776 "ctrl=2 set_byte_order",
777 2,
778 false,
779 &set_byte_order,
780 );
781 state.registry.send_msg(conn_id, set_byte_order).await;
782
783 let server_validation = encode_connection_validation(16_384, 512, 0, "anonymous", 2, false);
785 validate_encoded_packet(conn_id, "server_validation_init", &server_validation);
786 dump_hex_packet(
787 conn_id,
788 "tx",
789 "cmd=1 server_validation_init",
790 2,
791 false,
792 &server_validation,
793 );
794 state.registry.send_msg(conn_id, server_validation).await;
795
796 let mut last_activity = Instant::now();
797
798 loop {
799 let mut header = [0u8; 8];
800 let elapsed = last_activity.elapsed();
801 if elapsed >= conn_timeout {
802 info!("Conn {} idle timeout", conn_id);
803 break;
804 }
805 let remaining = conn_timeout - elapsed;
806 let read_header = tokio::time::timeout(remaining, reader.read_exact(&mut header)).await;
807 match read_header {
808 Ok(Ok(_)) => {}
809 Ok(Err(_)) => break,
810 Err(_) => {
811 info!("Conn {} idle timeout", conn_id);
812 break;
813 }
814 }
815 let header_pkt = PvaPacket::new(&header);
816 let payload_len = if header_pkt.header.flags.is_control {
817 0usize
818 } else {
819 header_pkt.header.payload_length as usize
820 };
821 let mut payload = vec![0u8; payload_len];
822 if payload_len > 0 {
823 let elapsed = last_activity.elapsed();
824 if elapsed >= conn_timeout {
825 info!("Conn {} idle timeout", conn_id);
826 break;
827 }
828 let remaining = conn_timeout - elapsed;
829 let read_payload =
830 tokio::time::timeout(remaining, reader.read_exact(&mut payload)).await;
831 match read_payload {
832 Ok(Ok(_)) => {}
833 Ok(Err(_)) => break,
834 Err(_) => {
835 info!("Conn {} idle timeout", conn_id);
836 break;
837 }
838 }
839 }
840 last_activity = Instant::now();
841 let mut full = header.to_vec();
842 full.extend_from_slice(&payload);
843
844 if header_pkt.header.flags.is_segmented != 0 && !header_pkt.header.flags.is_control {
846 debug!(
847 "Conn {}: segmented message cmd={} seg=0x{:02x}",
848 conn_id, header_pkt.header.command, header_pkt.header.flags.is_segmented
849 );
850 let mut payloads = vec![payload];
851 let mut seg_flags = header_pkt.header.flags;
852 while !seg_flags.is_last_segment {
853 let mut seg_header = [0u8; 8];
854 let elapsed = last_activity.elapsed();
855 if elapsed >= conn_timeout {
856 info!("Conn {} idle timeout", conn_id);
857 break;
858 }
859 let remaining = conn_timeout - elapsed;
860 let read_header =
861 tokio::time::timeout(remaining, reader.read_exact(&mut seg_header)).await;
862 match read_header {
863 Ok(Ok(_)) => {}
864 Ok(Err(_)) => break,
865 Err(_) => {
866 info!("Conn {} idle timeout", conn_id);
867 break;
868 }
869 }
870
871 let seg_header_pkt = PvaPacket::new(&seg_header);
872 let seg_payload_len = if seg_header_pkt.header.flags.is_control {
873 0usize
874 } else {
875 seg_header_pkt.header.payload_length as usize
876 };
877 let mut seg_payload = vec![0u8; seg_payload_len];
878 if seg_payload_len > 0 {
879 let elapsed = last_activity.elapsed();
880 if elapsed >= conn_timeout {
881 info!("Conn {} idle timeout", conn_id);
882 break;
883 }
884 let remaining = conn_timeout - elapsed;
885 let read_payload =
886 tokio::time::timeout(remaining, reader.read_exact(&mut seg_payload)).await;
887 match read_payload {
888 Ok(Ok(_)) => {}
889 Ok(Err(_)) => break,
890 Err(_) => {
891 info!("Conn {} idle timeout", conn_id);
892 break;
893 }
894 }
895 }
896 last_activity = Instant::now();
897
898 if seg_header_pkt.header.flags.is_control {
899 handle_control_message(&state, conn_id, &seg_header_pkt.header).await;
900 continue;
901 }
902 if seg_header_pkt.header.flags.is_segmented == 0 {
903 debug!(
904 "Conn {}: segmented message interrupted by non-segmented cmd={}",
905 conn_id, seg_header_pkt.header.command
906 );
907 break;
908 }
909 payloads.push(seg_payload);
910 seg_flags = seg_header_pkt.header.flags;
911 }
912 full = assemble_segmented_message(header, payloads);
913 }
914
915 let mut pkt = PvaPacket::new(&full);
916 let Some(cmd) = pkt.decode_payload() else {
917 continue;
918 };
919 let version = pkt.header.version;
920 let is_be = pkt.header.flags.is_msb;
921 let cmd_code = pkt.header.command;
922 let payload_slice = if full.len() >= 8 { &full[8..] } else { &[] };
923
924 if cmd_code == 1 {
926 dump_hex_packet(conn_id, "rx", "cmd=1 validation", version, is_be, &full);
927 let validation = spvirit_codec::epics_decode::PvaConnectionValidationPayload::new(
928 payload_slice,
929 is_be,
930 false,
931 );
932 if let Some(val) = validation {
933 debug!(
934 "Conn {}: validation request (cmd=1) ver={} be={} buf={} qos={} authz={:?}",
935 conn_id, version, is_be, val.buffer_size, val.qos, val.authz
936 );
937 let resp = spvirit_codec::spvirit_encode::encode_connection_validated(
938 true, version, is_be,
939 );
940 validate_encoded_packet(conn_id, "conn_validated_resp", &resp);
941 dump_hex_packet(
942 conn_id,
943 "tx",
944 "cmd=9 connection_validated",
945 version,
946 is_be,
947 &resp,
948 );
949 state.registry.send_msg(conn_id, resp).await;
950 continue;
951 }
952 }
953 if cmd_code == 17 {
954 dump_hex_packet(conn_id, "rx", "cmd=17 get_field", version, is_be, &full);
955 }
956
957 match cmd {
958 PvaPacketCommand::Control(payload) => {
959 debug!("Conn {}: control {}", conn_id, payload);
960 if payload.command == 3 {
961 let resp = encode_control_message(true, is_be, version, 4, payload.data);
962 state.registry.send_msg(conn_id, resp).await;
963 }
964 continue;
965 }
966 PvaPacketCommand::ConnectionValidation(_) => {
967 debug!("Conn {}: validation request (decoded)", conn_id);
968 }
969 PvaPacketCommand::ConnectionValidated(_) => {
970 debug!("Conn {}: validation confirmed (decoded)", conn_id);
971 }
972 PvaPacketCommand::CreateChannel(payload) => {
973 debug!(
974 "Conn {}: create_channel count={}",
975 conn_id,
976 payload.channels.len()
977 );
978 for (cid, pv_name) in payload.channels {
979 if has_pv(&state, &pv_name).await {
980 let sid = state.sid_counter.fetch_add(1, Ordering::SeqCst);
981 conn_state.cid_to_sid.insert(cid, sid);
982 conn_state.sid_to_pv.insert(sid, pv_name.clone());
983 let resp = encode_create_channel_response(cid, sid, version, is_be);
984 state.registry.send_msg(conn_id, resp).await;
985 info!(
986 "Conn {}: channel '{}' cid={} sid={}",
987 conn_id, pv_name, cid, sid
988 );
989 } else {
990 let resp = encode_create_channel_error(cid, "PV not found", version, is_be);
991 state.registry.send_msg(conn_id, resp).await;
992 info!(
993 "Conn {}: channel '{}' not found (cid={})",
994 conn_id, pv_name, cid
995 );
996 }
997 }
998 }
999 PvaPacketCommand::Op(payload) => {
1000 if payload.is_server {
1001 continue;
1002 }
1003 let sid = payload.sid_or_cid;
1004 let ioid = payload.ioid;
1005 debug!(
1006 "Conn {}: op cmd={} ioid={} sid={} sub=0x{:02x} body_len={}",
1007 conn_id,
1008 payload.command,
1009 ioid,
1010 sid,
1011 payload.subcmd,
1012 payload.body.len()
1013 );
1014 let Some(pv_name) = conn_state.sid_to_pv.get(&sid).cloned() else {
1015 state
1016 .registry
1017 .send_msg(
1018 conn_id,
1019 encode_op_error(
1020 payload.command,
1021 payload.subcmd,
1022 ioid,
1023 "Unknown SID",
1024 version,
1025 is_be,
1026 ),
1027 )
1028 .await;
1029 continue;
1030 };
1031
1032 let is_init = (payload.subcmd & 0x08) != 0;
1033
1034 match payload.command {
1035 10 => {
1036 let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1038 state
1039 .registry
1040 .send_msg(
1041 conn_id,
1042 encode_op_error(
1043 payload.command,
1044 payload.subcmd,
1045 ioid,
1046 "PV not found",
1047 version,
1048 is_be,
1049 ),
1050 )
1051 .await;
1052 continue;
1053 };
1054 if is_init {
1055 let full_desc = nt_payload_desc(&nt);
1056 let pv_req_fields = decode_pv_request_fields(&payload.body, is_be);
1057 let desc = match &pv_req_fields {
1058 Some(fields) => filter_structure_desc(&full_desc, fields),
1059 None => full_desc,
1060 };
1061 conn_state.ioid_to_desc.insert(ioid, desc.clone());
1062 conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1063 let resp = encode_op_init_response_desc(
1064 payload.command,
1065 ioid,
1066 0x08,
1067 &desc,
1068 version,
1069 is_be,
1070 );
1071 state.registry.send_msg(conn_id, resp).await;
1072 info!("Conn {}: get init pv='{}' ioid={}", conn_id, pv_name, ioid);
1073 } else {
1074 let resp = if let Some(desc) = conn_state.ioid_to_desc.get(&ioid) {
1075 encode_op_data_response_filtered(
1076 10, ioid, &nt, desc, version, is_be,
1077 )
1078 } else {
1079 encode_op_get_data_response_payload(ioid, &nt, version, is_be)
1080 };
1081 state.registry.send_msg(conn_id, resp).await;
1082 debug!("Conn {}: get data pv='{}' ioid={}", conn_id, pv_name, ioid);
1083 }
1084 }
1085 11 => {
1086 if is_init {
1088 let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1089 state
1090 .registry
1091 .send_msg(
1092 conn_id,
1093 encode_op_error(
1094 payload.command,
1095 payload.subcmd,
1096 ioid,
1097 "PV not found",
1098 version,
1099 is_be,
1100 ),
1101 )
1102 .await;
1103 continue;
1104 };
1105 if !is_virtual_event_pv(&pv_name)
1106 && !is_writable_pv(&state, &pv_name).await
1107 {
1108 let resp = encode_op_put_status_response(
1109 ioid,
1110 0x08,
1111 "Write access denied",
1112 version,
1113 is_be,
1114 );
1115 state.registry.send_msg(conn_id, resp).await;
1116 continue;
1117 }
1118 let desc = nt_payload_desc(&nt);
1119 conn_state.ioid_to_desc.insert(ioid, desc.clone());
1120 conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1121 let resp = encode_op_init_response_desc(
1122 payload.command,
1123 ioid,
1124 0x08,
1125 &desc,
1126 version,
1127 is_be,
1128 );
1129 state.registry.send_msg(conn_id, resp).await;
1130 info!("Conn {}: put init pv='{}' ioid={}", conn_id, pv_name, ioid);
1131 } else {
1132 if (payload.subcmd & 0x40) != 0 {
1133 if !is_virtual_event_pv(&pv_name)
1134 && !is_writable_pv(&state, &pv_name).await
1135 {
1136 let resp = encode_op_put_status_response(
1137 ioid,
1138 0x40,
1139 "Write access denied",
1140 version,
1141 is_be,
1142 );
1143 state.registry.send_msg(conn_id, resp).await;
1144 continue;
1145 }
1146 if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1147 let resp = encode_op_put_getput_response_payload(
1148 ioid, &nt, version, is_be,
1149 );
1150 state.registry.send_msg(conn_id, resp).await;
1151 debug!(
1152 "Conn {}: put get-put pv='{}' ioid={}",
1153 conn_id, pv_name, ioid
1154 );
1155 } else {
1156 state
1157 .registry
1158 .send_msg(
1159 conn_id,
1160 encode_op_error(
1161 payload.command,
1162 payload.subcmd,
1163 ioid,
1164 "PV not found",
1165 version,
1166 is_be,
1167 ),
1168 )
1169 .await;
1170 }
1171 continue;
1172 }
1173 let desc = match conn_state.ioid_to_desc.get(&ioid) {
1174 Some(d) => d.clone(),
1175 None => {
1176 state
1177 .registry
1178 .send_msg(
1179 conn_id,
1180 encode_op_error(
1181 payload.command,
1182 payload.subcmd,
1183 ioid,
1184 "PUT without init",
1185 version,
1186 is_be,
1187 ),
1188 )
1189 .await;
1190 continue;
1191 }
1192 };
1193 let decoded = decode_put_body(&payload.body, &desc, is_be);
1194 if let Some(value) = decoded.as_ref() {
1195 match state.store.put_value(&pv_name, value).await {
1196 Ok(changed) => {
1197 notify_changed_records(&state, changed).await;
1198 }
1199 Err(msg) => {
1200 let resp = encode_op_put_status_response(
1201 ioid,
1202 payload.subcmd,
1203 &msg,
1204 version,
1205 is_be,
1206 );
1207 state.registry.send_msg(conn_id, resp).await;
1208 continue;
1209 }
1210 }
1211 } else {
1212 debug!(
1213 "Conn {}: put decode failed ioid={} body_len={}",
1214 conn_id,
1215 ioid,
1216 payload.body.len()
1217 );
1218 let resp = encode_op_put_status_response(
1219 ioid,
1220 payload.subcmd,
1221 "cannot decode PUT body",
1222 version,
1223 is_be,
1224 );
1225 state.registry.send_msg(conn_id, resp).await;
1226 continue;
1227 }
1228 let resp = encode_op_put_response(ioid, payload.subcmd, version, is_be);
1229 state.registry.send_msg(conn_id, resp).await;
1230 debug!("Conn {}: put data pv='{}' ioid={}", conn_id, pv_name, ioid);
1231 }
1232 }
1233 12 => {
1234 if is_init {
1236 let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1237 state
1238 .registry
1239 .send_msg(
1240 conn_id,
1241 encode_op_error(
1242 payload.command,
1243 payload.subcmd,
1244 ioid,
1245 "PV not found",
1246 version,
1247 is_be,
1248 ),
1249 )
1250 .await;
1251 continue;
1252 };
1253 if !is_virtual_event_pv(&pv_name)
1254 && !is_writable_pv(&state, &pv_name).await
1255 {
1256 let resp = encode_op_put_get_init_error_response(
1257 ioid,
1258 "Write access denied",
1259 version,
1260 is_be,
1261 );
1262 state.registry.send_msg(conn_id, resp).await;
1263 continue;
1264 }
1265 let desc = nt_payload_desc(&nt);
1266 conn_state.ioid_to_desc.insert(ioid, desc.clone());
1267 conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1268 let resp =
1269 encode_op_put_get_init_response(ioid, &desc, &desc, version, is_be);
1270 state.registry.send_msg(conn_id, resp).await;
1271 info!(
1272 "Conn {}: put_get init pv='{}' ioid={}",
1273 conn_id, pv_name, ioid
1274 );
1275 } else {
1276 let desc = match conn_state.ioid_to_desc.get(&ioid) {
1277 Some(d) => d.clone(),
1278 None => {
1279 state
1280 .registry
1281 .send_msg(
1282 conn_id,
1283 encode_op_error(
1284 payload.command,
1285 payload.subcmd,
1286 ioid,
1287 "PUT_GET without init",
1288 version,
1289 is_be,
1290 ),
1291 )
1292 .await;
1293 continue;
1294 }
1295 };
1296 let decoded = decode_put_body(&payload.body, &desc, is_be);
1297 if let Some(value) = decoded.as_ref() {
1298 match state.store.put_value(&pv_name, value).await {
1299 Ok(changed) => {
1300 notify_changed_records(&state, changed).await;
1301 }
1302 Err(msg) => {
1303 let resp = encode_op_put_get_data_error_response(
1304 ioid, &msg, version, is_be,
1305 );
1306 state.registry.send_msg(conn_id, resp).await;
1307 continue;
1308 }
1309 }
1310 } else {
1311 debug!(
1312 "Conn {}: put_get decode failed ioid={} body_len={}",
1313 conn_id,
1314 ioid,
1315 payload.body.len()
1316 );
1317 let resp = encode_op_put_get_data_error_response(
1318 ioid,
1319 "cannot decode PUT body",
1320 version,
1321 is_be,
1322 );
1323 state.registry.send_msg(conn_id, resp).await;
1324 continue;
1325 }
1326 if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1327 let resp = encode_op_put_get_data_response_payload(
1328 ioid, &nt, version, is_be,
1329 );
1330 state.registry.send_msg(conn_id, resp).await;
1331 } else {
1332 state
1333 .registry
1334 .send_msg(
1335 conn_id,
1336 encode_op_error(
1337 payload.command,
1338 payload.subcmd,
1339 ioid,
1340 "PV not found",
1341 version,
1342 is_be,
1343 ),
1344 )
1345 .await;
1346 }
1347 debug!(
1348 "Conn {}: put_get data pv='{}' ioid={}",
1349 conn_id, pv_name, ioid
1350 );
1351 }
1352 }
1353 13 => {
1354 if is_init {
1356 let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1357 state
1358 .registry
1359 .send_msg(
1360 conn_id,
1361 encode_op_error(
1362 payload.command,
1363 payload.subcmd,
1364 ioid,
1365 "PV not found",
1366 version,
1367 is_be,
1368 ),
1369 )
1370 .await;
1371 continue;
1372 };
1373 let full_desc = nt_payload_desc(&nt);
1374 let pv_req_fields = decode_pv_request_fields(&payload.body, is_be);
1375 let desc = match &pv_req_fields {
1376 Some(fields) => filter_structure_desc(&full_desc, fields),
1377 None => full_desc,
1378 };
1379 conn_state.ioid_to_desc.insert(ioid, desc.clone());
1380 conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1381 let pipeline_enabled = (payload.subcmd & 0x80) != 0;
1382 let mut nfree = 0u32;
1383 if pipeline_enabled && payload.body.len() >= 4 {
1384 let start = payload.body.len() - 4;
1385 nfree = if is_be {
1386 u32::from_be_bytes([
1387 payload.body[start],
1388 payload.body[start + 1],
1389 payload.body[start + 2],
1390 payload.body[start + 3],
1391 ])
1392 } else {
1393 u32::from_le_bytes([
1394 payload.body[start],
1395 payload.body[start + 1],
1396 payload.body[start + 2],
1397 payload.body[start + 3],
1398 ])
1399 };
1400 }
1401 let resp = encode_op_init_response_desc(
1402 payload.command,
1403 ioid,
1404 0x08,
1405 &desc,
1406 version,
1407 is_be,
1408 );
1409 state.registry.send_msg(conn_id, resp).await;
1410 conn_state.ioid_to_monitor.insert(
1411 ioid,
1412 MonitorState {
1413 running: false,
1414 pipeline_enabled,
1415 nfree,
1416 },
1417 );
1418 {
1419 let mut monitors = state.registry.monitors.lock().await;
1420 monitors
1421 .entry(pv_name.clone())
1422 .or_default()
1423 .push(MonitorSub {
1424 conn_id,
1425 ioid,
1426 version,
1427 is_be,
1428 running: false,
1429 pipeline_enabled,
1430 nfree,
1431 filtered_desc: conn_state.ioid_to_desc.get(&ioid).cloned(),
1432 });
1433 }
1434 info!(
1435 "Conn {}: monitor init pv='{}' ioid={}",
1436 conn_id, pv_name, ioid
1437 );
1438 } else if (payload.subcmd & 0x10) != 0 {
1439 if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1441 let resp = encode_monitor_data_response_payload(
1442 ioid, 0x10, &nt, version, is_be,
1443 );
1444 state.registry.send_msg(conn_id, resp).await;
1445 }
1446 state
1447 .registry
1448 .remove_monitor_subscription(conn_id, ioid, &pv_name)
1449 .await;
1450 conn_state.ioid_to_monitor.remove(&ioid);
1451 conn_state.ioid_to_pv.remove(&ioid);
1452 conn_state.ioid_to_desc.remove(&ioid);
1453 info!("Conn {}: monitor end ioid={}", conn_id, ioid);
1454 } else if (payload.subcmd & 0x04) != 0 || (payload.subcmd & 0x80) != 0 {
1455 let start = (payload.subcmd & 0x44) == 0x44;
1457 let stop = (payload.subcmd & 0x44) == 0x04;
1458 let pipeline_ack = (payload.subcmd & 0x80) != 0;
1459 let mut nfree = None;
1460 if pipeline_ack && payload.body.len() >= 4 {
1461 let v = if is_be {
1462 u32::from_be_bytes([
1463 payload.body[0],
1464 payload.body[1],
1465 payload.body[2],
1466 payload.body[3],
1467 ])
1468 } else {
1469 u32::from_le_bytes([
1470 payload.body[0],
1471 payload.body[1],
1472 payload.body[2],
1473 payload.body[3],
1474 ])
1475 };
1476 nfree = Some(v);
1477 }
1478 let running = if start {
1479 true
1480 } else if stop {
1481 false
1482 } else {
1483 conn_state
1484 .ioid_to_monitor
1485 .get(&ioid)
1486 .map(|m| m.running)
1487 .unwrap_or(true)
1488 };
1489 state
1490 .registry
1491 .update_monitor_subscription(
1492 conn_id,
1493 ioid,
1494 &pv_name,
1495 running,
1496 nfree,
1497 Some(pipeline_ack),
1498 )
1499 .await;
1500 if let Some(mon) = conn_state.ioid_to_monitor.get_mut(&ioid) {
1501 mon.running = running;
1502 if pipeline_ack {
1503 mon.pipeline_enabled = true;
1504 }
1505 if let Some(v) = nfree {
1506 if pipeline_ack {
1507 mon.nfree = mon.nfree.saturating_add(v);
1508 } else {
1509 mon.nfree = v;
1510 }
1511 }
1512 }
1513 info!(
1514 "Conn {}: monitor {} ioid={} ack={} nfree={:?}",
1515 conn_id,
1516 if start {
1517 "start"
1518 } else if stop {
1519 "stop"
1520 } else {
1521 "ack"
1522 },
1523 ioid,
1524 pipeline_ack,
1525 nfree
1526 );
1527 if start {
1528 if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1529 state
1530 .registry
1531 .send_monitor_update_for(&pv_name, conn_id, ioid, &nt)
1532 .await;
1533 }
1534 }
1535 }
1536 }
1537 20 => {
1538 if is_server_rpc_pv(&pv_name) {
1540 handle_server_rpc(
1541 &state,
1542 conn_id,
1543 ioid,
1544 payload.subcmd,
1545 version,
1546 is_be,
1547 )
1548 .await;
1549 } else {
1550 state
1551 .registry
1552 .send_msg(
1553 conn_id,
1554 encode_op_error(
1555 payload.command,
1556 payload.subcmd,
1557 ioid,
1558 "Operation not supported",
1559 version,
1560 is_be,
1561 ),
1562 )
1563 .await;
1564 }
1565 }
1566 14 | 16 => {
1567 state
1568 .registry
1569 .send_msg(
1570 conn_id,
1571 encode_op_error(
1572 payload.command,
1573 payload.subcmd,
1574 ioid,
1575 "Operation not supported",
1576 version,
1577 is_be,
1578 ),
1579 )
1580 .await;
1581 }
1582 _ => {
1583 state
1584 .registry
1585 .send_msg(
1586 conn_id,
1587 encode_op_error(
1588 payload.command,
1589 payload.subcmd,
1590 ioid,
1591 "Operation not supported",
1592 version,
1593 is_be,
1594 ),
1595 )
1596 .await;
1597 }
1598 }
1599 }
1600 PvaPacketCommand::DestroyChannel(payload) => {
1601 let sid = payload.sid;
1602 let cid = payload.cid;
1603 conn_state.cid_to_sid.remove(&cid);
1604 conn_state.sid_to_pv.remove(&sid);
1605 info!(
1606 "Conn {}: channel destroyed sid={} cid={}",
1607 conn_id, sid, cid
1608 );
1609 }
1610 PvaPacketCommand::DestroyRequest(payload) => {
1611 let ioid = payload.request_id;
1612 if let Some(pv_name) = conn_state.ioid_to_pv.remove(&ioid) {
1613 state
1614 .registry
1615 .remove_monitor_subscription(conn_id, ioid, &pv_name)
1616 .await;
1617 conn_state.ioid_to_desc.remove(&ioid);
1618 conn_state.ioid_to_monitor.remove(&ioid);
1619 info!("Conn {}: monitor unsubscribed ioid={}", conn_id, ioid);
1620 }
1621 }
1622 PvaPacketCommand::AuthNZ(_) => {
1623 let resp = encode_message_error("AUTHNZ command is not supported", version, is_be);
1624 state.registry.send_msg(conn_id, resp).await;
1625 }
1626 PvaPacketCommand::AclChange(_) => {
1627 let resp =
1628 encode_message_error("ACL_CHANGE command is not supported", version, is_be);
1629 state.registry.send_msg(conn_id, resp).await;
1630 }
1631 PvaPacketCommand::GetField(payload) => {
1632 handle_get_field_request(&state, &conn_state, conn_id, payload, version, is_be)
1633 .await;
1634 }
1635 PvaPacketCommand::Echo(payload_bytes) => {
1636 let mut resp =
1637 encode_header(true, is_be, false, version, 2, payload_bytes.len() as u32);
1638 resp.extend_from_slice(&payload_bytes);
1639 state.registry.send_msg(conn_id, resp).await;
1640 }
1641 PvaPacketCommand::Message(_) => {
1642 let resp = encode_message_error("MESSAGE command is not supported", version, is_be);
1643 state.registry.send_msg(conn_id, resp).await;
1644 }
1645 PvaPacketCommand::MultipleData(_) => {
1646 let resp =
1647 encode_message_error("MULTIPLE_DATA command is not supported", version, is_be);
1648 state.registry.send_msg(conn_id, resp).await;
1649 }
1650 PvaPacketCommand::CancelRequest(_) => {
1651 let resp =
1652 encode_message_error("CANCEL_REQUEST command is not supported", version, is_be);
1653 state.registry.send_msg(conn_id, resp).await;
1654 }
1655 PvaPacketCommand::OriginTag(_) => {
1656 let resp =
1657 encode_message_error("ORIGIN_TAG command is not supported", version, is_be);
1658 state.registry.send_msg(conn_id, resp).await;
1659 }
1660 PvaPacketCommand::Search(_)
1661 | PvaPacketCommand::SearchResponse(_)
1662 | PvaPacketCommand::Beacon(_) => {
1663 let resp =
1664 encode_message_error("Unexpected command for server endpoint", version, is_be);
1665 state.registry.send_msg(conn_id, resp).await;
1666 }
1667 PvaPacketCommand::Unknown(payload) => {
1668 let resp = encode_message_error(
1669 &format!("Unknown command {}", payload.command),
1670 version,
1671 is_be,
1672 );
1673 state.registry.send_msg(conn_id, resp).await;
1674 }
1675 }
1676 }
1677
1678 state.registry.cleanup_connection(conn_id).await;
1679 let _ = writer_task.await;
1680 Ok(())
1681}