1use std::net::{IpAddr, Ipv4Addr, SocketAddr};
7use std::sync::atomic::{AtomicU16, AtomicU32, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant, SystemTime};
10
11use regex::Regex;
12use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::net::{TcpListener, TcpStream, UdpSocket};
14use tokio::sync::mpsc;
15use tracing::{debug, error, info};
16
17use spvirit_codec::epics_decode::{PvaHeader, PvaPacket, PvaPacketCommand};
18use spvirit_codec::spvirit_encode::{
19 encode_connection_validation, encode_control_message,
20 encode_create_channel_error, encode_create_channel_response, encode_get_field_error,
21 encode_get_field_response, encode_header, encode_message_error,
22 encode_monitor_data_response_payload, encode_op_error, encode_op_get_data_response_payload,
23 encode_op_init_response_desc, encode_op_put_get_data_error_response,
24 encode_op_put_get_data_response_payload, encode_op_put_get_init_error_response,
25 encode_op_put_get_init_response, encode_op_put_getput_response_payload, encode_op_put_response,
26 encode_op_put_status_response, encode_op_rpc_data_response_payload,
27 encode_op_status_error_response, encode_op_status_response, encode_search_response,
28 ip_from_bytes, ip_to_bytes,
29};
30use spvirit_codec::spvd_decode::{extract_subfield_desc, StructureDesc};
31use spvirit_codec::spvd_encode::nt_payload_desc;
32
33use spvirit_types::{NtPayload, NtScalar, NtScalarArray, ScalarArrayValue, ScalarValue};
34
35use crate::decode::{assemble_segmented_message, decode_put_body};
36use crate::monitor::MonitorRegistry;
37use crate::pvstore::PvStore;
38use crate::state::{ConnState, MonitorState, MonitorSub};
39
40#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub enum PvListMode {
47 Off,
49 Discover,
51 List,
53}
54
55impl PvListMode {
56 pub fn parse(raw: &str) -> Result<Self, String> {
57 match raw.trim().to_ascii_lowercase().as_str() {
58 "off" => Ok(Self::Off),
59 "discover" => Ok(Self::Discover),
60 "list" => Ok(Self::List),
61 other => Err(format!(
62 "Invalid pvlist-mode '{}'; expected off|discover|list",
63 other
64 )),
65 }
66 }
67}
68
69pub struct ServerState<S: PvStore> {
75 pub store: Arc<S>,
76 pub registry: Arc<MonitorRegistry>,
77 pub sid_counter: AtomicU32,
78 pub beacon_change: Arc<AtomicU16>,
79 pub compute_alarms: bool,
80 pub pvlist_mode: PvListMode,
81 pub pvlist_max: usize,
82 pub pvlist_allow_pattern: Option<Regex>,
83}
84
85impl<S: PvStore> ServerState<S> {
86 pub fn new(
87 store: Arc<S>,
88 registry: Arc<MonitorRegistry>,
89 compute_alarms: bool,
90 pvlist_mode: PvListMode,
91 pvlist_max: usize,
92 pvlist_allow_pattern: Option<Regex>,
93 ) -> Self {
94 Self {
95 store,
96 registry,
97 sid_counter: AtomicU32::new(1),
98 beacon_change: Arc::new(AtomicU16::new(0)),
99 compute_alarms,
100 pvlist_mode,
101 pvlist_max,
102 pvlist_allow_pattern,
103 }
104 }
105}
106
107pub fn is_pvlist_virtual_pv(pv_name: &str) -> bool {
112 pv_name == "__pvlist"
113}
114
115pub fn is_server_rpc_pv(pv_name: &str) -> bool {
116 pv_name == "server"
117}
118
119pub fn is_virtual_event_pv(pv_name: &str) -> bool {
120 pv_name.starts_with("__event:")
121}
122
123pub fn virtual_event_nt(pv_name: &str) -> NtPayload {
124 NtPayload::Scalar(
125 NtScalar::from_value(ScalarValue::Bool(false))
126 .with_description(format!("Virtual event trigger for {}", pv_name)),
127 )
128}
129
130pub fn virtual_pvlist_nt(entries: Vec<String>) -> NtPayload {
131 NtPayload::ScalarArray(NtScalarArray::from_value(ScalarArrayValue::Str(entries)))
132}
133
134pub fn is_pattern_query(raw: &str) -> bool {
139 raw.contains('*') || raw.contains('?')
140}
141
142pub fn wildcard_match(pattern: &str, text: &str) -> bool {
143 let p = pattern.as_bytes();
144 let t = text.as_bytes();
145 let mut i = 0usize;
146 let mut j = 0usize;
147 let mut star: Option<usize> = None;
148 let mut match_j = 0usize;
149
150 while j < t.len() {
151 if i < p.len() && (p[i] == b'?' || p[i] == t[j]) {
152 i += 1;
153 j += 1;
154 } else if i < p.len() && p[i] == b'*' {
155 star = Some(i);
156 i += 1;
157 match_j = j;
158 } else if let Some(star_idx) = star {
159 i = star_idx + 1;
160 match_j += 1;
161 j = match_j;
162 } else {
163 return false;
164 }
165 }
166
167 while i < p.len() && p[i] == b'*' {
168 i += 1;
169 }
170 i == p.len()
171}
172
173pub fn collect_visible_pv_names(
174 all_names: &[String],
175 mode: PvListMode,
176 allow_pattern: Option<&Regex>,
177 max_items: usize,
178) -> Vec<String> {
179 let mut names: Vec<String> = all_names
180 .iter()
181 .filter(|name| {
182 allow_pattern
183 .as_ref()
184 .map(|re| re.is_match(name))
185 .unwrap_or(true)
186 })
187 .cloned()
188 .collect();
189 names.sort();
190 if names.len() > max_items {
191 names.truncate(max_items);
192 }
193 if mode == PvListMode::List && names.len() < max_items {
194 names.push("__pvlist".to_string());
195 }
196 names
197}
198
199fn build_pvlist_structure(names: &[String]) -> StructureDesc {
200 use spvirit_codec::spvd_decode::{FieldDesc, FieldType, TypeCode};
201 StructureDesc {
202 struct_id: Some("epics:pva/pvlist:1.0".to_string()),
203 fields: names
204 .iter()
205 .map(|name| FieldDesc {
206 name: name.clone(),
207 field_type: FieldType::Scalar(TypeCode::Boolean),
208 })
209 .collect(),
210 }
211}
212
213fn requested_pvlist_pattern(field_name: Option<&str>) -> Option<&str> {
214 let raw = field_name.map(str::trim).unwrap_or("");
215 if raw.is_empty() || raw == "*" || raw == "__pvlist" || raw.eq_ignore_ascii_case("pvlist") {
216 return Some("*");
217 }
218 if is_pattern_query(raw) {
219 return Some(raw);
220 }
221 None
222}
223
224pub fn search_reply_target(addr: &[u8; 16], port: u16, peer: SocketAddr) -> SocketAddr {
229 let target_port = if port != 0 { port } else { peer.port() };
230 let target_ip = ip_from_bytes(addr)
231 .filter(|ip| !ip.is_unspecified())
232 .unwrap_or_else(|| peer.ip());
233 SocketAddr::new(target_ip, target_port)
234}
235
236pub fn infer_udp_response_ip(peer: SocketAddr) -> Option<IpAddr> {
237 let bind_addr = if peer.is_ipv4() {
238 "0.0.0.0:0"
239 } else {
240 "[::]:0"
241 };
242 let sock = std::net::UdpSocket::bind(bind_addr).ok()?;
243 sock.connect(peer).ok()?;
244 let local = sock.local_addr().ok()?;
245 if local.ip().is_unspecified() {
246 None
247 } else {
248 Some(local.ip())
249 }
250}
251
252pub fn rand_guid() -> [u8; 12] {
253 let now = SystemTime::now()
254 .duration_since(SystemTime::UNIX_EPOCH)
255 .unwrap_or_default();
256 let mut guid = [0u8; 12];
257 let bytes = now.as_nanos().to_le_bytes();
258 guid.copy_from_slice(&bytes[0..12]);
259 guid
260}
261
262pub fn validate_encoded_packet(conn_id: u64, label: &str, bytes: &[u8]) {
267 let mut pkt = PvaPacket::new(bytes);
268 let decoded = pkt.decode_payload();
269 match decoded {
270 Some(PvaPacketCommand::ConnectionValidation(payload)) => {
271 debug!(
272 "Conn {}: {} decoded as cmd=1 buffer_size={} qos={} authz={:?}",
273 conn_id, label, payload.buffer_size, payload.qos, payload.authz
274 );
275 }
276 Some(PvaPacketCommand::ConnectionValidated(_)) => {
277 debug!("Conn {}: {} decoded as cmd=9", conn_id, label);
278 }
279 Some(other) => {
280 debug!("Conn {}: {} decoded as {:?}", conn_id, label, other);
281 }
282 None => {
283 debug!("Conn {}: {} failed to decode", conn_id, label);
284 }
285 }
286}
287
288pub fn dump_hex_packet(
289 conn_id: u64,
290 dir: &str,
291 label: &str,
292 version: u8,
293 is_be: bool,
294 bytes: &[u8],
295) {
296 debug!(
297 "Conn {}: {} {} ver={} be={} len={}",
298 conn_id,
299 dir,
300 label,
301 version,
302 is_be,
303 bytes.len()
304 );
305 let mut offset = 0usize;
306 while offset < bytes.len() {
307 let end = usize::min(offset + 16, bytes.len());
308 let chunk = &bytes[offset..end];
309 let mut line = String::new();
310 for (i, b) in chunk.iter().enumerate() {
311 if i > 0 {
312 line.push(' ');
313 }
314 line.push_str(&format!("{:02x}", b));
315 }
316 debug!("Conn {}: {:04x} {}", conn_id, offset, line);
317 offset += 16;
318 }
319}
320
321async fn get_nt_snapshot<S: PvStore>(state: &ServerState<S>, pv_name: &str) -> Option<NtPayload> {
326 if is_pvlist_virtual_pv(pv_name) {
327 if state.pvlist_mode != PvListMode::List {
328 return None;
329 }
330 let all_names = state.store.list_pvs().await;
331 let names = collect_visible_pv_names(
332 &all_names,
333 state.pvlist_mode,
334 state.pvlist_allow_pattern.as_ref(),
335 state.pvlist_max,
336 );
337 return Some(virtual_pvlist_nt(names));
338 }
339 if is_virtual_event_pv(pv_name) {
340 return Some(virtual_event_nt(pv_name));
341 }
342 state.store.get_snapshot(pv_name).await
343}
344
345async fn is_writable_pv<S: PvStore>(state: &ServerState<S>, pv_name: &str) -> bool {
346 if is_virtual_event_pv(pv_name) {
347 return true;
348 }
349 state.store.is_writable(pv_name).await
350}
351
352async fn has_pv<S: PvStore>(state: &ServerState<S>, pv_name: &str) -> bool {
353 state.store.has_pv(pv_name).await
354 || is_virtual_event_pv(pv_name)
355 || (is_pvlist_virtual_pv(pv_name) && state.pvlist_mode == PvListMode::List)
356 || (is_server_rpc_pv(pv_name) && state.pvlist_mode != PvListMode::Off)
357}
358
359async fn notify_changed_records<S: PvStore>(
364 state: &ServerState<S>,
365 changed: Vec<(String, NtPayload)>,
366) {
367 for (name, payload) in changed {
368 state.beacon_change.fetch_add(1, Ordering::SeqCst);
369 state.registry.notify_monitors(&name, &payload).await;
370 }
371}
372
373async fn handle_get_field_request<S: PvStore>(
378 state: &ServerState<S>,
379 conn_state: &ConnState,
380 conn_id: u64,
381 payload: spvirit_codec::epics_decode::PvaGetFieldPayload,
382 version: u8,
383 is_be: bool,
384) {
385 if payload.is_server {
386 let resp = encode_get_field_error(
387 payload.cid,
388 "Unexpected server GET_FIELD payload",
389 version,
390 is_be,
391 );
392 state.registry.send_msg(conn_id, resp).await;
393 return;
394 }
395
396 let request_id = payload.ioid.unwrap_or(payload.cid);
397
398 let sid = payload
399 .sid
400 .or_else(|| conn_state.cid_to_sid.get(&payload.cid).copied())
401 .or_else(|| {
402 conn_state
403 .sid_to_pv
404 .contains_key(&payload.cid)
405 .then_some(payload.cid)
406 })
407 .or_else(|| {
408 (payload.cid == 0 && conn_state.sid_to_pv.len() == 1)
409 .then(|| conn_state.sid_to_pv.keys().copied().next())
410 .flatten()
411 });
412
413 if let Some(sid) = sid {
414 if let Some(pv_name) = conn_state.sid_to_pv.get(&sid) {
415 if let Some(nt) = get_nt_snapshot(state, pv_name).await {
416 let full_desc = nt_payload_desc(&nt);
417 let sub = payload
418 .field_name
419 .as_deref()
420 .filter(|s| !s.is_empty());
421 let desc = if let Some(field_path) = sub {
422 match extract_subfield_desc(&full_desc, field_path) {
423 Some(sub_desc) => sub_desc,
424 None => {
425 let resp = encode_get_field_error(
426 request_id,
427 &format!("sub-field '{}' not found", field_path),
428 version,
429 is_be,
430 );
431 state.registry.send_msg(conn_id, resp).await;
432 return;
433 }
434 }
435 } else {
436 full_desc
437 };
438 let resp = encode_get_field_response(request_id, &desc, version, is_be);
439 dump_hex_packet(conn_id, "tx", "cmd=17 get_field", version, is_be, &resp);
440 state.registry.send_msg(conn_id, resp).await;
441 debug!(
442 "Conn {}: get_field cid={} sid={:?} ioid={:?} resolved_sid={} pv='{}' field={:?}",
443 conn_id, payload.cid, payload.sid, payload.ioid, sid, pv_name, payload.field_name
444 );
445 return;
446 }
447 let resp = encode_get_field_error(request_id, "PV not found", version, is_be);
448 state.registry.send_msg(conn_id, resp).await;
449 return;
450 }
451 }
452
453 if state.pvlist_mode != PvListMode::List {
454 let resp = encode_get_field_error(
455 request_id,
456 "GET_FIELD listing is disabled (set --pvlist-mode=list)",
457 version,
458 is_be,
459 );
460 state.registry.send_msg(conn_id, resp).await;
461 return;
462 }
463
464 let Some(pattern) = requested_pvlist_pattern(payload.field_name.as_deref()) else {
465 let resp = encode_get_field_error(
466 request_id,
467 "GET_FIELD requires a valid list pattern",
468 version,
469 is_be,
470 );
471 state.registry.send_msg(conn_id, resp).await;
472 return;
473 };
474
475 let all_names = state.store.list_pvs().await;
476 let mut names = collect_visible_pv_names(
477 &all_names,
478 state.pvlist_mode,
479 state.pvlist_allow_pattern.as_ref(),
480 state.pvlist_max,
481 );
482 if pattern != "*" {
483 names.retain(|name| wildcard_match(pattern, name));
484 }
485 if names.is_empty() {
486 let resp = encode_get_field_error(
487 request_id,
488 "No PVs matched list request",
489 version,
490 is_be,
491 );
492 state.registry.send_msg(conn_id, resp).await;
493 return;
494 }
495 let desc = build_pvlist_structure(&names);
496 let resp = encode_get_field_response(request_id, &desc, version, is_be);
497 dump_hex_packet(
498 conn_id,
499 "tx",
500 "cmd=17 get_field_list",
501 version,
502 is_be,
503 &resp,
504 );
505 state.registry.send_msg(conn_id, resp).await;
506 debug!(
507 "Conn {}: get_field list pattern='{}' returned {} entries",
508 conn_id,
509 pattern,
510 names.len()
511 );
512}
513
514async fn handle_server_rpc<S: PvStore>(
519 state: &ServerState<S>,
520 conn_id: u64,
521 ioid: u32,
522 subcmd: u8,
523 version: u8,
524 is_be: bool,
525) {
526 if state.pvlist_mode != PvListMode::List {
527 let resp = encode_op_status_error_response(
528 20,
529 ioid,
530 subcmd,
531 "RPC list endpoint disabled (set --pvlist-mode=list)",
532 version,
533 is_be,
534 );
535 state.registry.send_msg(conn_id, resp).await;
536 return;
537 }
538
539 let all_names = state.store.list_pvs().await;
540 let names = collect_visible_pv_names(
541 &all_names,
542 state.pvlist_mode,
543 state.pvlist_allow_pattern.as_ref(),
544 state.pvlist_max,
545 );
546 let payload = NtPayload::ScalarArray(NtScalarArray::from_value(ScalarArrayValue::Str(names)));
547
548 let is_init = (subcmd & 0x08) != 0;
549 if is_init {
550 let resp = encode_op_status_response(20, ioid, subcmd, version, is_be);
551 state.registry.send_msg(conn_id, resp).await;
552 return;
553 }
554
555 let resp = encode_op_rpc_data_response_payload(ioid, subcmd, &payload, version, is_be);
556 state.registry.send_msg(conn_id, resp).await;
557}
558
559async fn handle_control_message<S: PvStore>(
564 state: &ServerState<S>,
565 conn_id: u64,
566 header: &PvaHeader,
567) {
568 debug!(
569 "Conn {}: control (segmented) cmd={} data={}",
570 conn_id, header.command, header.payload_length
571 );
572 if header.command == 3 {
573 let resp = encode_control_message(
574 true,
575 header.flags.is_msb,
576 header.version,
577 4,
578 header.payload_length,
579 );
580 state.registry.send_msg(conn_id, resp).await;
581 }
582}
583
584pub async fn run_udp_search<S: PvStore>(
590 state: Arc<ServerState<S>>,
591 addr: SocketAddr,
592 tcp_port: u16,
593 guid: [u8; 12],
594 advertise_ip: Option<IpAddr>,
595) -> Result<(), Box<dyn std::error::Error>> {
596 let socket = UdpSocket::bind(addr).await?;
597 socket.set_broadcast(true)?;
598 let mut buf = vec![0u8; 4096];
599
600 loop {
601 let (len, peer) = socket.recv_from(&mut buf).await?;
602 let data = &buf[..len];
603 let header = PvaHeader::new(data);
604 if header.flags.is_control || header.command != 3 {
605 continue;
606 }
607 let mut pkt = PvaPacket::new(data);
608 let Some(cmd) = pkt.decode_payload() else {
609 continue;
610 };
611 let version = pkt.header.version;
612 let is_be = pkt.header.flags.is_msb;
613 match cmd {
614 PvaPacketCommand::Search(payload) => {
615 debug!(
616 "UDP search from {}: pv_count={} mask=0x{:02x}",
617 peer,
618 payload.pv_requests.len(),
619 payload.mask
620 );
621 let accepts_tcp = payload.protocols.is_empty()
622 || payload
623 .protocols
624 .iter()
625 .any(|p| p.eq_ignore_ascii_case("tcp"));
626 if !accepts_tcp {
627 debug!("UDP search: no compatible protocol (tcp not accepted)");
628 continue;
629 }
630 let all_names = state.store.list_pvs().await;
631 let visible_names = collect_visible_pv_names(
632 &all_names,
633 state.pvlist_mode,
634 state.pvlist_allow_pattern.as_ref(),
635 state.pvlist_max,
636 );
637 let mut cids = Vec::new();
638 for (cid, name) in &payload.pv_requests {
639 if state.store.has_pv(name).await
640 || is_virtual_event_pv(name)
641 || (is_pvlist_virtual_pv(name)
642 && state.pvlist_mode == PvListMode::List)
643 || (is_server_rpc_pv(name) && state.pvlist_mode != PvListMode::Off)
644 {
645 cids.push(*cid);
646 continue;
647 }
648 if state.pvlist_mode != PvListMode::Off
649 && is_pattern_query(name)
650 && visible_names.iter().any(|pv| wildcard_match(name, pv))
651 {
652 cids.push(*cid);
653 }
654 }
655 let response_required = (payload.mask & 0x01) != 0;
656 let server_discovery_ping = payload.pv_requests.is_empty();
657 let found = server_discovery_ping || !cids.is_empty();
658 if !found && !response_required {
659 debug!("UDP search: no matches and response not required");
660 continue;
661 }
662 let resp_ip = if let Some(ip) = advertise_ip {
663 ip
664 } else if !addr.ip().is_unspecified() {
665 addr.ip()
666 } else if let Some(ip) = infer_udp_response_ip(peer) {
667 debug!("UDP search: inferred response address {}", ip);
668 ip
669 } else {
670 IpAddr::V4(Ipv4Addr::UNSPECIFIED)
671 };
672 let addr_bytes = if resp_ip.is_unspecified() {
673 debug!(
674 "UDP search: responding with zero address (unspecified listen)"
675 );
676 [0u8; 16]
677 } else {
678 ip_to_bytes(resp_ip)
679 };
680 let response = encode_search_response(
681 guid,
682 payload.seq,
683 addr_bytes,
684 tcp_port,
685 "tcp",
686 found,
687 &cids,
688 version,
689 is_be,
690 );
691 let reply_target = search_reply_target(&payload.addr, payload.port, peer);
692 if let Err(e) = socket.send_to(&response, reply_target).await {
693 debug!(
694 "UDP search: failed sending {} matches to {}: {}",
695 cids.len(),
696 reply_target,
697 e
698 );
699 continue;
700 }
701 debug!(
702 "UDP search: responded found={} with {} matches to {}",
703 found,
704 cids.len(),
705 reply_target
706 );
707 }
708 _ => {}
709 }
710 }
711}
712
713pub async fn run_tcp_server<S: PvStore>(
719 state: Arc<ServerState<S>>,
720 addr: SocketAddr,
721 conn_timeout: Duration,
722) -> Result<(), Box<dyn std::error::Error>> {
723 let listener = TcpListener::bind(addr).await?;
724 let conn_id = Arc::new(std::sync::atomic::AtomicU64::new(1));
725
726 loop {
727 let (stream, peer) = listener.accept().await?;
728 let id = conn_id.fetch_add(1, Ordering::SeqCst);
729 info!("TCP connection {} from {}", id, peer);
730 let state_clone = state.clone();
731 tokio::spawn(async move {
732 if let Err(e) = handle_connection(state_clone, stream, id, conn_timeout).await {
733 error!("Connection {} error: {}", id, e);
734 }
735 });
736 }
737}
738
739pub async fn handle_connection<S: PvStore>(
749 state: Arc<ServerState<S>>,
750 stream: TcpStream,
751 conn_id: u64,
752 conn_timeout: Duration,
753) -> Result<(), Box<dyn std::error::Error>> {
754 let (mut reader, mut writer) = stream.into_split();
755 let (tx, mut rx) = mpsc::channel::<Vec<u8>>(128);
756
757 {
758 let mut conns = state.registry.conns.lock().await;
759 conns.insert(conn_id, tx);
760 }
761
762 let writer_task = tokio::spawn(async move {
763 while let Some(msg) = rx.recv().await {
764 if writer.write_all(&msg).await.is_err() {
765 break;
766 }
767 }
768 });
769
770 let mut conn_state = ConnState::default();
771
772 let set_byte_order = encode_control_message(true, false, 2, 2, 0);
774 validate_encoded_packet(conn_id, "set_byte_order", &set_byte_order);
775 dump_hex_packet(
776 conn_id,
777 "tx",
778 "ctrl=2 set_byte_order",
779 2,
780 false,
781 &set_byte_order,
782 );
783 state.registry.send_msg(conn_id, set_byte_order).await;
784
785 let server_validation = encode_connection_validation(16_384, 512, 0, "anonymous", 2, false);
787 validate_encoded_packet(conn_id, "server_validation_init", &server_validation);
788 dump_hex_packet(
789 conn_id,
790 "tx",
791 "cmd=1 server_validation_init",
792 2,
793 false,
794 &server_validation,
795 );
796 state.registry.send_msg(conn_id, server_validation).await;
797
798 let mut last_activity = Instant::now();
799
800 loop {
801 let mut header = [0u8; 8];
802 let elapsed = last_activity.elapsed();
803 if elapsed >= conn_timeout {
804 info!("Conn {} idle timeout", conn_id);
805 break;
806 }
807 let remaining = conn_timeout - elapsed;
808 let read_header = tokio::time::timeout(remaining, reader.read_exact(&mut header)).await;
809 match read_header {
810 Ok(Ok(_)) => {}
811 Ok(Err(_)) => break,
812 Err(_) => {
813 info!("Conn {} idle timeout", conn_id);
814 break;
815 }
816 }
817 let header_pkt = PvaPacket::new(&header);
818 let payload_len = if header_pkt.header.flags.is_control {
819 0usize
820 } else {
821 header_pkt.header.payload_length as usize
822 };
823 let mut payload = vec![0u8; payload_len];
824 if payload_len > 0 {
825 let elapsed = last_activity.elapsed();
826 if elapsed >= conn_timeout {
827 info!("Conn {} idle timeout", conn_id);
828 break;
829 }
830 let remaining = conn_timeout - elapsed;
831 let read_payload =
832 tokio::time::timeout(remaining, reader.read_exact(&mut payload)).await;
833 match read_payload {
834 Ok(Ok(_)) => {}
835 Ok(Err(_)) => break,
836 Err(_) => {
837 info!("Conn {} idle timeout", conn_id);
838 break;
839 }
840 }
841 }
842 last_activity = Instant::now();
843 let mut full = header.to_vec();
844 full.extend_from_slice(&payload);
845
846 if header_pkt.header.flags.is_segmented != 0 && !header_pkt.header.flags.is_control {
848 debug!(
849 "Conn {}: segmented message cmd={} seg=0x{:02x}",
850 conn_id, header_pkt.header.command, header_pkt.header.flags.is_segmented
851 );
852 let mut payloads = vec![payload];
853 let mut seg_flags = header_pkt.header.flags;
854 while !seg_flags.is_last_segment {
855 let mut seg_header = [0u8; 8];
856 let elapsed = last_activity.elapsed();
857 if elapsed >= conn_timeout {
858 info!("Conn {} idle timeout", conn_id);
859 break;
860 }
861 let remaining = conn_timeout - elapsed;
862 let read_header =
863 tokio::time::timeout(remaining, reader.read_exact(&mut seg_header)).await;
864 match read_header {
865 Ok(Ok(_)) => {}
866 Ok(Err(_)) => break,
867 Err(_) => {
868 info!("Conn {} idle timeout", conn_id);
869 break;
870 }
871 }
872
873 let seg_header_pkt = PvaPacket::new(&seg_header);
874 let seg_payload_len = if seg_header_pkt.header.flags.is_control {
875 0usize
876 } else {
877 seg_header_pkt.header.payload_length as usize
878 };
879 let mut seg_payload = vec![0u8; seg_payload_len];
880 if seg_payload_len > 0 {
881 let elapsed = last_activity.elapsed();
882 if elapsed >= conn_timeout {
883 info!("Conn {} idle timeout", conn_id);
884 break;
885 }
886 let remaining = conn_timeout - elapsed;
887 let read_payload =
888 tokio::time::timeout(remaining, reader.read_exact(&mut seg_payload)).await;
889 match read_payload {
890 Ok(Ok(_)) => {}
891 Ok(Err(_)) => break,
892 Err(_) => {
893 info!("Conn {} idle timeout", conn_id);
894 break;
895 }
896 }
897 }
898 last_activity = Instant::now();
899
900 if seg_header_pkt.header.flags.is_control {
901 handle_control_message(&state, conn_id, &seg_header_pkt.header).await;
902 continue;
903 }
904 if seg_header_pkt.header.flags.is_segmented == 0 {
905 debug!(
906 "Conn {}: segmented message interrupted by non-segmented cmd={}",
907 conn_id, seg_header_pkt.header.command
908 );
909 break;
910 }
911 payloads.push(seg_payload);
912 seg_flags = seg_header_pkt.header.flags;
913 }
914 full = assemble_segmented_message(header, payloads);
915 }
916
917 let mut pkt = PvaPacket::new(&full);
918 let Some(cmd) = pkt.decode_payload() else {
919 continue;
920 };
921 let version = pkt.header.version;
922 let is_be = pkt.header.flags.is_msb;
923 let cmd_code = pkt.header.command;
924 let payload_slice = if full.len() >= 8 { &full[8..] } else { &[] };
925
926 if cmd_code == 1 {
928 dump_hex_packet(conn_id, "rx", "cmd=1 validation", version, is_be, &full);
929 let validation =
930 spvirit_codec::epics_decode::PvaConnectionValidationPayload::new(
931 payload_slice,
932 is_be,
933 false,
934 );
935 if let Some(val) = validation {
936 debug!(
937 "Conn {}: validation request (cmd=1) ver={} be={} buf={} qos={} authz={:?}",
938 conn_id, version, is_be, val.buffer_size, val.qos, val.authz
939 );
940 let resp = spvirit_codec::spvirit_encode::encode_connection_validated(
941 true, version, is_be,
942 );
943 validate_encoded_packet(conn_id, "conn_validated_resp", &resp);
944 dump_hex_packet(
945 conn_id,
946 "tx",
947 "cmd=9 connection_validated",
948 version,
949 is_be,
950 &resp,
951 );
952 state.registry.send_msg(conn_id, resp).await;
953 continue;
954 }
955 }
956 if cmd_code == 17 {
957 dump_hex_packet(conn_id, "rx", "cmd=17 get_field", version, is_be, &full);
958 }
959
960 match cmd {
961 PvaPacketCommand::Control(payload) => {
962 debug!("Conn {}: control {}", conn_id, payload);
963 if payload.command == 3 {
964 let resp =
965 encode_control_message(true, is_be, version, 4, payload.data);
966 state.registry.send_msg(conn_id, resp).await;
967 }
968 continue;
969 }
970 PvaPacketCommand::ConnectionValidation(_) => {
971 debug!("Conn {}: validation request (decoded)", conn_id);
972 }
973 PvaPacketCommand::ConnectionValidated(_) => {
974 debug!("Conn {}: validation confirmed (decoded)", conn_id);
975 }
976 PvaPacketCommand::CreateChannel(payload) => {
977 debug!(
978 "Conn {}: create_channel count={}",
979 conn_id,
980 payload.channels.len()
981 );
982 for (cid, pv_name) in payload.channels {
983 if has_pv(&state, &pv_name).await {
984 let sid = state.sid_counter.fetch_add(1, Ordering::SeqCst);
985 conn_state.cid_to_sid.insert(cid, sid);
986 conn_state.sid_to_pv.insert(sid, pv_name.clone());
987 let resp =
988 encode_create_channel_response(cid, sid, version, is_be);
989 state.registry.send_msg(conn_id, resp).await;
990 info!(
991 "Conn {}: channel '{}' cid={} sid={}",
992 conn_id, pv_name, cid, sid
993 );
994 } else {
995 let resp =
996 encode_create_channel_error(cid, "PV not found", version, is_be);
997 state.registry.send_msg(conn_id, resp).await;
998 info!(
999 "Conn {}: channel '{}' not found (cid={})",
1000 conn_id, pv_name, cid
1001 );
1002 }
1003 }
1004 }
1005 PvaPacketCommand::Op(payload) => {
1006 if payload.is_server {
1007 continue;
1008 }
1009 let sid = payload.sid_or_cid;
1010 let ioid = payload.ioid;
1011 debug!(
1012 "Conn {}: op cmd={} ioid={} sid={} sub=0x{:02x} body_len={}",
1013 conn_id,
1014 payload.command,
1015 ioid,
1016 sid,
1017 payload.subcmd,
1018 payload.body.len()
1019 );
1020 let Some(pv_name) = conn_state.sid_to_pv.get(&sid).cloned() else {
1021 state
1022 .registry
1023 .send_msg(
1024 conn_id,
1025 encode_op_error(
1026 payload.command,
1027 ioid,
1028 "Unknown SID",
1029 version,
1030 is_be,
1031 ),
1032 )
1033 .await;
1034 continue;
1035 };
1036
1037 let is_init = (payload.subcmd & 0x08) != 0;
1038
1039 match payload.command {
1040 10 => {
1041 let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1043 state
1044 .registry
1045 .send_msg(
1046 conn_id,
1047 encode_op_error(
1048 payload.command,
1049 ioid,
1050 "PV not found",
1051 version,
1052 is_be,
1053 ),
1054 )
1055 .await;
1056 continue;
1057 };
1058 if is_init {
1059 let desc = nt_payload_desc(&nt);
1060 conn_state.ioid_to_desc.insert(ioid, desc.clone());
1061 conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1062 let resp = encode_op_init_response_desc(
1063 payload.command,
1064 ioid,
1065 0x08,
1066 &desc,
1067 version,
1068 is_be,
1069 );
1070 state.registry.send_msg(conn_id, resp).await;
1071 info!(
1072 "Conn {}: get init pv='{}' ioid={}",
1073 conn_id, pv_name, ioid
1074 );
1075 } else {
1076 let resp = encode_op_get_data_response_payload(
1077 ioid, &nt, version, is_be,
1078 );
1079 state.registry.send_msg(conn_id, resp).await;
1080 debug!(
1081 "Conn {}: get data pv='{}' ioid={}",
1082 conn_id, pv_name, ioid
1083 );
1084 }
1085 }
1086 11 => {
1087 if is_init {
1089 let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1090 state
1091 .registry
1092 .send_msg(
1093 conn_id,
1094 encode_op_error(
1095 payload.command,
1096 ioid,
1097 "PV not found",
1098 version,
1099 is_be,
1100 ),
1101 )
1102 .await;
1103 continue;
1104 };
1105 if !is_virtual_event_pv(&pv_name)
1106 && !is_writable_pv(&state, &pv_name).await
1107 {
1108 let resp = encode_op_put_status_response(
1109 ioid,
1110 0x08,
1111 "Write access denied",
1112 version,
1113 is_be,
1114 );
1115 state.registry.send_msg(conn_id, resp).await;
1116 continue;
1117 }
1118 let desc = nt_payload_desc(&nt);
1119 conn_state.ioid_to_desc.insert(ioid, desc.clone());
1120 conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1121 let resp = encode_op_init_response_desc(
1122 payload.command,
1123 ioid,
1124 0x08,
1125 &desc,
1126 version,
1127 is_be,
1128 );
1129 state.registry.send_msg(conn_id, resp).await;
1130 info!(
1131 "Conn {}: put init pv='{}' ioid={}",
1132 conn_id, pv_name, ioid
1133 );
1134 } else {
1135 if (payload.subcmd & 0x40) != 0 {
1136 if !is_virtual_event_pv(&pv_name)
1137 && !is_writable_pv(&state, &pv_name).await
1138 {
1139 let resp = encode_op_put_status_response(
1140 ioid,
1141 0x40,
1142 "Write access denied",
1143 version,
1144 is_be,
1145 );
1146 state.registry.send_msg(conn_id, resp).await;
1147 continue;
1148 }
1149 if let Some(nt) =
1150 get_nt_snapshot(&state, &pv_name).await
1151 {
1152 let resp = encode_op_put_getput_response_payload(
1153 ioid, &nt, version, is_be,
1154 );
1155 state.registry.send_msg(conn_id, resp).await;
1156 debug!(
1157 "Conn {}: put get-put pv='{}' ioid={}",
1158 conn_id, pv_name, ioid
1159 );
1160 } else {
1161 state
1162 .registry
1163 .send_msg(
1164 conn_id,
1165 encode_op_error(
1166 payload.command,
1167 ioid,
1168 "PV not found",
1169 version,
1170 is_be,
1171 ),
1172 )
1173 .await;
1174 }
1175 continue;
1176 }
1177 let desc = match conn_state.ioid_to_desc.get(&ioid) {
1178 Some(d) => d.clone(),
1179 None => {
1180 state
1181 .registry
1182 .send_msg(
1183 conn_id,
1184 encode_op_error(
1185 payload.command,
1186 ioid,
1187 "PUT without init",
1188 version,
1189 is_be,
1190 ),
1191 )
1192 .await;
1193 continue;
1194 }
1195 };
1196 let decoded = decode_put_body(&payload.body, &desc, is_be);
1197 if let Some(value) = decoded.as_ref() {
1198 match state.store.put_value(&pv_name, value).await {
1199 Ok(changed) => {
1200 notify_changed_records(&state, changed).await;
1201 }
1202 Err(msg) => {
1203 let resp = encode_op_put_status_response(
1204 ioid, 0x00, &msg, version, is_be,
1205 );
1206 state.registry.send_msg(conn_id, resp).await;
1207 continue;
1208 }
1209 }
1210 } else {
1211 debug!(
1212 "Conn {}: put decode failed ioid={} body_len={}",
1213 conn_id,
1214 ioid,
1215 payload.body.len()
1216 );
1217 }
1218 let resp = encode_op_put_response(ioid, version, is_be);
1219 state.registry.send_msg(conn_id, resp).await;
1220 debug!(
1221 "Conn {}: put data pv='{}' ioid={}",
1222 conn_id, pv_name, ioid
1223 );
1224 }
1225 }
1226 12 => {
1227 if is_init {
1229 let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1230 state
1231 .registry
1232 .send_msg(
1233 conn_id,
1234 encode_op_error(
1235 payload.command,
1236 ioid,
1237 "PV not found",
1238 version,
1239 is_be,
1240 ),
1241 )
1242 .await;
1243 continue;
1244 };
1245 if !is_virtual_event_pv(&pv_name)
1246 && !is_writable_pv(&state, &pv_name).await
1247 {
1248 let resp = encode_op_put_get_init_error_response(
1249 ioid,
1250 "Write access denied",
1251 version,
1252 is_be,
1253 );
1254 state.registry.send_msg(conn_id, resp).await;
1255 continue;
1256 }
1257 let desc = nt_payload_desc(&nt);
1258 conn_state.ioid_to_desc.insert(ioid, desc.clone());
1259 conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1260 let resp = encode_op_put_get_init_response(
1261 ioid, &desc, &desc, version, is_be,
1262 );
1263 state.registry.send_msg(conn_id, resp).await;
1264 info!(
1265 "Conn {}: put_get init pv='{}' ioid={}",
1266 conn_id, pv_name, ioid
1267 );
1268 } else {
1269 let desc = match conn_state.ioid_to_desc.get(&ioid) {
1270 Some(d) => d.clone(),
1271 None => {
1272 state
1273 .registry
1274 .send_msg(
1275 conn_id,
1276 encode_op_error(
1277 payload.command,
1278 ioid,
1279 "PUT_GET without init",
1280 version,
1281 is_be,
1282 ),
1283 )
1284 .await;
1285 continue;
1286 }
1287 };
1288 let decoded = decode_put_body(&payload.body, &desc, is_be);
1289 if let Some(value) = decoded.as_ref() {
1290 match state.store.put_value(&pv_name, value).await {
1291 Ok(changed) => {
1292 notify_changed_records(&state, changed).await;
1293 }
1294 Err(msg) => {
1295 let resp = encode_op_put_get_data_error_response(
1296 ioid, &msg, version, is_be,
1297 );
1298 state.registry.send_msg(conn_id, resp).await;
1299 continue;
1300 }
1301 }
1302 } else {
1303 debug!(
1304 "Conn {}: put_get decode failed ioid={} body_len={}",
1305 conn_id,
1306 ioid,
1307 payload.body.len()
1308 );
1309 }
1310 if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1311 let resp = encode_op_put_get_data_response_payload(
1312 ioid, &nt, version, is_be,
1313 );
1314 state.registry.send_msg(conn_id, resp).await;
1315 } else {
1316 state
1317 .registry
1318 .send_msg(
1319 conn_id,
1320 encode_op_error(
1321 payload.command,
1322 ioid,
1323 "PV not found",
1324 version,
1325 is_be,
1326 ),
1327 )
1328 .await;
1329 }
1330 debug!(
1331 "Conn {}: put_get data pv='{}' ioid={}",
1332 conn_id, pv_name, ioid
1333 );
1334 }
1335 }
1336 13 => {
1337 if is_init {
1339 let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1340 state
1341 .registry
1342 .send_msg(
1343 conn_id,
1344 encode_op_error(
1345 payload.command,
1346 ioid,
1347 "PV not found",
1348 version,
1349 is_be,
1350 ),
1351 )
1352 .await;
1353 continue;
1354 };
1355 let desc = nt_payload_desc(&nt);
1356 conn_state.ioid_to_desc.insert(ioid, desc.clone());
1357 conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1358 let pipeline_enabled = (payload.subcmd & 0x80) != 0;
1359 let mut nfree = 0u32;
1360 if pipeline_enabled && payload.body.len() >= 4 {
1361 let start = payload.body.len() - 4;
1362 nfree = if is_be {
1363 u32::from_be_bytes([
1364 payload.body[start],
1365 payload.body[start + 1],
1366 payload.body[start + 2],
1367 payload.body[start + 3],
1368 ])
1369 } else {
1370 u32::from_le_bytes([
1371 payload.body[start],
1372 payload.body[start + 1],
1373 payload.body[start + 2],
1374 payload.body[start + 3],
1375 ])
1376 };
1377 }
1378 let resp = encode_op_init_response_desc(
1379 payload.command,
1380 ioid,
1381 0x08,
1382 &desc,
1383 version,
1384 is_be,
1385 );
1386 state.registry.send_msg(conn_id, resp).await;
1387 conn_state.ioid_to_monitor.insert(
1388 ioid,
1389 MonitorState {
1390 running: false,
1391 pipeline_enabled,
1392 nfree,
1393 },
1394 );
1395 {
1396 let mut monitors = state.registry.monitors.lock().await;
1397 monitors
1398 .entry(pv_name.clone())
1399 .or_default()
1400 .push(MonitorSub {
1401 conn_id,
1402 ioid,
1403 version,
1404 is_be,
1405 running: false,
1406 pipeline_enabled,
1407 nfree,
1408 });
1409 }
1410 info!(
1411 "Conn {}: monitor init pv='{}' ioid={}",
1412 conn_id, pv_name, ioid
1413 );
1414 } else if (payload.subcmd & 0x10) != 0 {
1415 if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1417 let resp = encode_monitor_data_response_payload(
1418 ioid, 0x10, &nt, version, is_be,
1419 );
1420 state.registry.send_msg(conn_id, resp).await;
1421 }
1422 state
1423 .registry
1424 .remove_monitor_subscription(conn_id, ioid, &pv_name)
1425 .await;
1426 conn_state.ioid_to_monitor.remove(&ioid);
1427 conn_state.ioid_to_pv.remove(&ioid);
1428 conn_state.ioid_to_desc.remove(&ioid);
1429 info!("Conn {}: monitor end ioid={}", conn_id, ioid);
1430 } else if (payload.subcmd & 0x04) != 0
1431 || (payload.subcmd & 0x80) != 0
1432 {
1433 let start = (payload.subcmd & 0x44) == 0x44;
1435 let stop = (payload.subcmd & 0x44) == 0x04;
1436 let pipeline_ack = (payload.subcmd & 0x80) != 0;
1437 let mut nfree = None;
1438 if pipeline_ack && payload.body.len() >= 4 {
1439 let v = if is_be {
1440 u32::from_be_bytes([
1441 payload.body[0],
1442 payload.body[1],
1443 payload.body[2],
1444 payload.body[3],
1445 ])
1446 } else {
1447 u32::from_le_bytes([
1448 payload.body[0],
1449 payload.body[1],
1450 payload.body[2],
1451 payload.body[3],
1452 ])
1453 };
1454 nfree = Some(v);
1455 }
1456 let running = if start {
1457 true
1458 } else if stop {
1459 false
1460 } else {
1461 conn_state
1462 .ioid_to_monitor
1463 .get(&ioid)
1464 .map(|m| m.running)
1465 .unwrap_or(true)
1466 };
1467 state
1468 .registry
1469 .update_monitor_subscription(
1470 conn_id,
1471 ioid,
1472 &pv_name,
1473 running,
1474 nfree,
1475 Some(pipeline_ack),
1476 )
1477 .await;
1478 if let Some(mon) = conn_state.ioid_to_monitor.get_mut(&ioid) {
1479 mon.running = running;
1480 if pipeline_ack {
1481 mon.pipeline_enabled = true;
1482 }
1483 if let Some(v) = nfree {
1484 if pipeline_ack {
1485 mon.nfree = mon.nfree.saturating_add(v);
1486 } else {
1487 mon.nfree = v;
1488 }
1489 }
1490 }
1491 info!(
1492 "Conn {}: monitor {} ioid={} ack={} nfree={:?}",
1493 conn_id,
1494 if start {
1495 "start"
1496 } else if stop {
1497 "stop"
1498 } else {
1499 "ack"
1500 },
1501 ioid,
1502 pipeline_ack,
1503 nfree
1504 );
1505 if start {
1506 if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1507 state
1508 .registry
1509 .send_monitor_update_for(
1510 &pv_name, conn_id, ioid, &nt,
1511 )
1512 .await;
1513 }
1514 }
1515 }
1516 }
1517 20 => {
1518 if is_server_rpc_pv(&pv_name) {
1520 handle_server_rpc(
1521 &state,
1522 conn_id,
1523 ioid,
1524 payload.subcmd,
1525 version,
1526 is_be,
1527 )
1528 .await;
1529 } else {
1530 state
1531 .registry
1532 .send_msg(
1533 conn_id,
1534 encode_op_error(
1535 payload.command,
1536 ioid,
1537 "Operation not supported",
1538 version,
1539 is_be,
1540 ),
1541 )
1542 .await;
1543 }
1544 }
1545 14 | 16 => {
1546 state
1547 .registry
1548 .send_msg(
1549 conn_id,
1550 encode_op_error(
1551 payload.command,
1552 ioid,
1553 "Operation not supported",
1554 version,
1555 is_be,
1556 ),
1557 )
1558 .await;
1559 }
1560 _ => {
1561 state
1562 .registry
1563 .send_msg(
1564 conn_id,
1565 encode_op_error(
1566 payload.command,
1567 ioid,
1568 "Operation not supported",
1569 version,
1570 is_be,
1571 ),
1572 )
1573 .await;
1574 }
1575 }
1576 }
1577 PvaPacketCommand::DestroyChannel(payload) => {
1578 let sid = payload.sid;
1579 let cid = payload.cid;
1580 conn_state.cid_to_sid.remove(&cid);
1581 conn_state.sid_to_pv.remove(&sid);
1582 info!(
1583 "Conn {}: channel destroyed sid={} cid={}",
1584 conn_id, sid, cid
1585 );
1586 }
1587 PvaPacketCommand::DestroyRequest(payload) => {
1588 let ioid = payload.request_id;
1589 if let Some(pv_name) = conn_state.ioid_to_pv.remove(&ioid) {
1590 state
1591 .registry
1592 .remove_monitor_subscription(conn_id, ioid, &pv_name)
1593 .await;
1594 conn_state.ioid_to_desc.remove(&ioid);
1595 conn_state.ioid_to_monitor.remove(&ioid);
1596 info!("Conn {}: monitor unsubscribed ioid={}", conn_id, ioid);
1597 }
1598 }
1599 PvaPacketCommand::AuthNZ(_) => {
1600 let resp =
1601 encode_message_error("AUTHNZ command is not supported", version, is_be);
1602 state.registry.send_msg(conn_id, resp).await;
1603 }
1604 PvaPacketCommand::AclChange(_) => {
1605 let resp =
1606 encode_message_error("ACL_CHANGE command is not supported", version, is_be);
1607 state.registry.send_msg(conn_id, resp).await;
1608 }
1609 PvaPacketCommand::GetField(payload) => {
1610 handle_get_field_request(
1611 &state,
1612 &conn_state,
1613 conn_id,
1614 payload,
1615 version,
1616 is_be,
1617 )
1618 .await;
1619 }
1620 PvaPacketCommand::Echo(payload_bytes) => {
1621 let mut resp = encode_header(
1622 true,
1623 is_be,
1624 false,
1625 version,
1626 2,
1627 payload_bytes.len() as u32,
1628 );
1629 resp.extend_from_slice(&payload_bytes);
1630 state.registry.send_msg(conn_id, resp).await;
1631 }
1632 PvaPacketCommand::Message(_) => {
1633 let resp =
1634 encode_message_error("MESSAGE command is not supported", version, is_be);
1635 state.registry.send_msg(conn_id, resp).await;
1636 }
1637 PvaPacketCommand::MultipleData(_) => {
1638 let resp = encode_message_error(
1639 "MULTIPLE_DATA command is not supported",
1640 version,
1641 is_be,
1642 );
1643 state.registry.send_msg(conn_id, resp).await;
1644 }
1645 PvaPacketCommand::CancelRequest(_) => {
1646 let resp = encode_message_error(
1647 "CANCEL_REQUEST command is not supported",
1648 version,
1649 is_be,
1650 );
1651 state.registry.send_msg(conn_id, resp).await;
1652 }
1653 PvaPacketCommand::OriginTag(_) => {
1654 let resp = encode_message_error(
1655 "ORIGIN_TAG command is not supported",
1656 version,
1657 is_be,
1658 );
1659 state.registry.send_msg(conn_id, resp).await;
1660 }
1661 PvaPacketCommand::Search(_)
1662 | PvaPacketCommand::SearchResponse(_)
1663 | PvaPacketCommand::Beacon(_) => {
1664 let resp = encode_message_error(
1665 "Unexpected command for server endpoint",
1666 version,
1667 is_be,
1668 );
1669 state.registry.send_msg(conn_id, resp).await;
1670 }
1671 PvaPacketCommand::Unknown(payload) => {
1672 let resp = encode_message_error(
1673 &format!("Unknown command {}", payload.command),
1674 version,
1675 is_be,
1676 );
1677 state.registry.send_msg(conn_id, resp).await;
1678 }
1679 }
1680 }
1681
1682 state.registry.cleanup_connection(conn_id).await;
1683 let _ = writer_task.await;
1684 Ok(())
1685}