1use std::net::{IpAddr, Ipv4Addr, SocketAddr};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU16, AtomicU32, Ordering};
9use std::time::{Duration, Instant, SystemTime};
10
11use regex::Regex;
12use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::net::{TcpListener, TcpStream, UdpSocket};
14use tokio::sync::mpsc;
15use tracing::{debug, error, info};
16
17use spvirit_codec::epics_decode::{PvaHeader, PvaPacket, PvaPacketCommand};
18use spvirit_codec::spvd_decode::{StructureDesc, extract_subfield_desc};
19use spvirit_codec::spvd_encode::{
20 decode_pv_request_fields, filter_structure_desc, nt_payload_desc,
21};
22use spvirit_codec::spvirit_encode::{
23 encode_connection_validation, encode_control_message, encode_create_channel_error,
24 encode_create_channel_response, encode_get_field_error, encode_get_field_response,
25 encode_header, encode_message_error, encode_monitor_data_response_payload,
26 encode_op_data_response_filtered, encode_op_error, encode_op_get_data_response_payload,
27 encode_op_init_response_desc, encode_op_put_get_data_error_response,
28 encode_op_put_get_data_response_payload, encode_op_put_get_init_error_response,
29 encode_op_put_get_init_response, encode_op_put_getput_response_payload, encode_op_put_response,
30 encode_op_put_status_response, encode_op_rpc_data_response_payload,
31 encode_op_status_error_response, encode_op_status_response, encode_search_response,
32 ip_from_bytes, ip_to_bytes,
33};
34
35use spvirit_types::{NtPayload, NtScalar, NtScalarArray, ScalarArrayValue, ScalarValue};
36
37use crate::decode::{assemble_segmented_message, decode_put_body};
38use crate::monitor::MonitorRegistry;
39use crate::pvstore::PvStore;
40use crate::state::{ConnState, MonitorState, MonitorSub};
41
/// Selects how much of the PV namespace this server reveals to clients.
///
/// As used elsewhere in this file:
/// * `Off` – wildcard searches and the `server` RPC channel are not answered.
/// * `Discover` – wildcard searches and the `server` channel are accepted, but
///   the listing endpoints still report that listing is disabled.
/// * `List` – additionally enables the `__pvlist` virtual PV, GET_FIELD
///   listing and the RPC list endpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PvListMode {
    /// No discovery or listing support.
    Off,
    /// Wildcard discovery only.
    Discover,
    /// Discovery plus full listing.
    List,
}

impl PvListMode {
    /// Parses a mode name, ignoring surrounding whitespace and ASCII case.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message naming the (trimmed, lower-cased)
    /// input when it is not one of `off`, `discover` or `list`.
    pub fn parse(raw: &str) -> Result<Self, String> {
        let normalized = raw.trim().to_ascii_lowercase();
        let mode = match normalized.as_str() {
            "off" => Self::Off,
            "discover" => Self::Discover,
            "list" => Self::List,
            _ => {
                return Err(format!(
                    "Invalid pvlist-mode '{}'; expected off|discover|list",
                    normalized
                ));
            }
        };
        Ok(mode)
    }
}
70
71pub struct ServerState<S: PvStore> {
77 pub store: Arc<S>,
78 pub registry: Arc<MonitorRegistry>,
79 pub sid_counter: AtomicU32,
80 pub beacon_change: Arc<AtomicU16>,
81 pub compute_alarms: bool,
82 pub pvlist_mode: PvListMode,
83 pub pvlist_max: usize,
84 pub pvlist_allow_pattern: Option<Regex>,
85 pub guid: [u8; 12],
86 pub tcp_port: u16,
87 pub advertise_ip: Option<IpAddr>,
88 pub listen_ip: IpAddr,
89}
90
91impl<S: PvStore> ServerState<S> {
92 pub fn new(
93 store: Arc<S>,
94 registry: Arc<MonitorRegistry>,
95 compute_alarms: bool,
96 pvlist_mode: PvListMode,
97 pvlist_max: usize,
98 pvlist_allow_pattern: Option<Regex>,
99 guid: [u8; 12],
100 tcp_port: u16,
101 advertise_ip: Option<IpAddr>,
102 listen_ip: IpAddr,
103 ) -> Self {
104 Self {
105 store,
106 registry,
107 sid_counter: AtomicU32::new(1),
108 beacon_change: Arc::new(AtomicU16::new(0)),
109 compute_alarms,
110 pvlist_mode,
111 pvlist_max,
112 pvlist_allow_pattern,
113 guid,
114 tcp_port,
115 advertise_ip,
116 listen_ip,
117 }
118 }
119}
120
/// Returns `true` when `pv_name` is exactly the reserved listing PV `__pvlist`.
pub fn is_pvlist_virtual_pv(pv_name: &str) -> bool {
    matches!(pv_name, "__pvlist")
}
128
/// Returns `true` when `pv_name` is exactly the reserved RPC channel name `server`.
pub fn is_server_rpc_pv(pv_name: &str) -> bool {
    matches!(pv_name, "server")
}
132
/// Returns `true` when `pv_name` carries the `__event:` prefix used for
/// virtual event PVs.
pub fn is_virtual_event_pv(pv_name: &str) -> bool {
    pv_name.strip_prefix("__event:").is_some()
}
136
137pub fn virtual_event_nt(pv_name: &str) -> NtPayload {
138 NtPayload::Scalar(
139 NtScalar::from_value(ScalarValue::Bool(false))
140 .with_description(format!("Virtual event trigger for {}", pv_name)),
141 )
142}
143
144pub fn virtual_pvlist_nt(entries: Vec<String>) -> NtPayload {
145 NtPayload::ScalarArray(NtScalarArray::from_value(ScalarArrayValue::Str(entries)))
146}
147
/// Returns `true` when `raw` contains at least one wildcard character
/// (`*` or `?`).
pub fn is_pattern_query(raw: &str) -> bool {
    raw.chars().any(|c| matches!(c, '*' | '?'))
}
155
/// Glob-style match of `text` against `pattern`.
///
/// `*` matches any run of bytes (including none) and `?` matches exactly one
/// byte; every other pattern byte must equal the text byte. Matching is done
/// on the UTF-8 bytes, so `?` consumes a single byte rather than a full
/// multi-byte character.
///
/// The wildcard `*` is checked *before* the literal comparison. Otherwise a
/// `*` in the pattern facing a literal `*` in the text would be consumed as a
/// plain character and never recorded as a backtrack point, which made e.g.
/// `wildcard_match("*", "*a")` fail.
pub fn wildcard_match(pattern: &str, text: &str) -> bool {
    let p = pattern.as_bytes();
    let t = text.as_bytes();
    let mut i = 0usize;
    let mut j = 0usize;
    // For the most recent `*`: (pattern index just after it, text index up to
    // which that `*` has been stretched so far).
    let mut backtrack: Option<(usize, usize)> = None;

    while j < t.len() {
        match p.get(i) {
            Some(b'*') => {
                // Start by letting the star match nothing; widen on demand.
                backtrack = Some((i + 1, j));
                i += 1;
            }
            Some(&c) if c == b'?' || c == t[j] => {
                i += 1;
                j += 1;
            }
            _ => match backtrack {
                Some((resume_i, covered_j)) => {
                    // Mismatch: let the last star swallow one more byte and retry.
                    let next_j = covered_j + 1;
                    backtrack = Some((resume_i, next_j));
                    i = resume_i;
                    j = next_j;
                }
                None => return false,
            },
        }
    }

    // Text is exhausted; only trailing stars may remain in the pattern.
    p[i..].iter().all(|&c| c == b'*')
}
186
187pub fn collect_visible_pv_names(
188 all_names: &[String],
189 mode: PvListMode,
190 allow_pattern: Option<&Regex>,
191 max_items: usize,
192) -> Vec<String> {
193 let mut names: Vec<String> = all_names
194 .iter()
195 .filter(|name| {
196 allow_pattern
197 .as_ref()
198 .map(|re| re.is_match(name))
199 .unwrap_or(true)
200 })
201 .cloned()
202 .collect();
203 names.sort();
204 if names.len() > max_items {
205 names.truncate(max_items);
206 }
207 if mode == PvListMode::List && names.len() < max_items {
208 names.push("__pvlist".to_string());
209 }
210 names
211}
212
213fn build_pvlist_structure(names: &[String]) -> StructureDesc {
214 use spvirit_codec::spvd_decode::{FieldDesc, FieldType, TypeCode};
215 StructureDesc {
216 struct_id: Some("epics:pva/pvlist:1.0".to_string()),
217 fields: names
218 .iter()
219 .map(|name| FieldDesc {
220 name: name.clone(),
221 field_type: FieldType::Scalar(TypeCode::Boolean),
222 })
223 .collect(),
224 }
225}
226
227fn requested_pvlist_pattern(field_name: Option<&str>) -> Option<&str> {
228 let raw = field_name.map(str::trim).unwrap_or("");
229 if raw.is_empty() || raw == "*" || raw == "__pvlist" || raw.eq_ignore_ascii_case("pvlist") {
230 return Some("*");
231 }
232 if is_pattern_query(raw) {
233 return Some(raw);
234 }
235 None
236}
237
238pub fn search_reply_target(addr: &[u8; 16], port: u16, peer: SocketAddr) -> SocketAddr {
243 let target_port = if port != 0 { port } else { peer.port() };
244 let target_ip = ip_from_bytes(addr)
245 .filter(|ip| !ip.is_unspecified())
246 .unwrap_or_else(|| peer.ip());
247 SocketAddr::new(target_ip, target_port)
248}
249
/// Determines which local address would be used to reach `peer`.
///
/// A throw-away UDP socket is bound to the wildcard address of the peer's
/// address family and connected to `peer`; the socket's resulting local
/// address is the answer. Returns `None` if any step fails or the local
/// address is still unspecified.
pub fn infer_udp_response_ip(peer: SocketAddr) -> Option<IpAddr> {
    let wildcard = match peer {
        SocketAddr::V4(_) => "0.0.0.0:0",
        SocketAddr::V6(_) => "[::]:0",
    };
    let probe = std::net::UdpSocket::bind(wildcard).ok()?;
    probe.connect(peer).ok()?;
    let local_ip = probe.local_addr().ok()?.ip();
    (!local_ip.is_unspecified()).then_some(local_ip)
}
265
/// Derives a 12-byte identifier from the current wall-clock time.
///
/// The value is the low 12 bytes (little-endian) of the number of
/// nanoseconds since the Unix epoch; a clock set before the epoch yields all
/// zeroes. Despite the name, no random source is involved.
pub fn rand_guid() -> [u8; 12] {
    let nanos = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let le = nanos.to_le_bytes();
    let mut guid = [0u8; 12];
    // `zip` stops after the 12 destination bytes, dropping the top 4 source bytes.
    for (dst, src) in guid.iter_mut().zip(le.iter()) {
        *dst = *src;
    }
    guid
}
275
276pub fn validate_encoded_packet(conn_id: u64, label: &str, bytes: &[u8]) {
281 let mut pkt = PvaPacket::new(bytes);
282 let decoded = pkt.decode_payload();
283 match decoded {
284 Some(PvaPacketCommand::ConnectionValidation(payload)) => {
285 debug!(
286 "Conn {}: {} decoded as cmd=1 buffer_size={} qos={} authz={:?}",
287 conn_id, label, payload.buffer_size, payload.qos, payload.authz
288 );
289 }
290 Some(PvaPacketCommand::ConnectionValidated(_)) => {
291 debug!("Conn {}: {} decoded as cmd=9", conn_id, label);
292 }
293 Some(other) => {
294 debug!("Conn {}: {} decoded as {:?}", conn_id, label, other);
295 }
296 None => {
297 debug!("Conn {}: {} failed to decode", conn_id, label);
298 }
299 }
300}
301
302pub fn dump_hex_packet(
303 conn_id: u64,
304 dir: &str,
305 label: &str,
306 version: u8,
307 is_be: bool,
308 bytes: &[u8],
309) {
310 debug!(
311 "Conn {}: {} {} ver={} be={} len={}",
312 conn_id,
313 dir,
314 label,
315 version,
316 is_be,
317 bytes.len()
318 );
319 let mut offset = 0usize;
320 while offset < bytes.len() {
321 let end = usize::min(offset + 16, bytes.len());
322 let chunk = &bytes[offset..end];
323 let mut line = String::new();
324 for (i, b) in chunk.iter().enumerate() {
325 if i > 0 {
326 line.push(' ');
327 }
328 line.push_str(&format!("{:02x}", b));
329 }
330 debug!("Conn {}: {:04x} {}", conn_id, offset, line);
331 offset += 16;
332 }
333}
334
335async fn get_nt_snapshot<S: PvStore>(state: &ServerState<S>, pv_name: &str) -> Option<NtPayload> {
340 if is_pvlist_virtual_pv(pv_name) {
341 if state.pvlist_mode != PvListMode::List {
342 return None;
343 }
344 let all_names = state.store.list_pvs().await;
345 let names = collect_visible_pv_names(
346 &all_names,
347 state.pvlist_mode,
348 state.pvlist_allow_pattern.as_ref(),
349 state.pvlist_max,
350 );
351 return Some(virtual_pvlist_nt(names));
352 }
353 if is_virtual_event_pv(pv_name) {
354 return Some(virtual_event_nt(pv_name));
355 }
356 state.store.get_snapshot(pv_name).await
357}
358
359async fn is_writable_pv<S: PvStore>(state: &ServerState<S>, pv_name: &str) -> bool {
360 if is_virtual_event_pv(pv_name) {
361 return true;
362 }
363 state.store.is_writable(pv_name).await
364}
365
366async fn has_pv<S: PvStore>(state: &ServerState<S>, pv_name: &str) -> bool {
367 state.store.has_pv(pv_name).await
368 || is_virtual_event_pv(pv_name)
369 || (is_pvlist_virtual_pv(pv_name) && state.pvlist_mode == PvListMode::List)
370 || (is_server_rpc_pv(pv_name) && state.pvlist_mode != PvListMode::Off)
371}
372
373async fn notify_changed_records<S: PvStore>(
378 state: &ServerState<S>,
379 changed: Vec<(String, NtPayload)>,
380) {
381 for (name, payload) in changed {
382 state.beacon_change.fetch_add(1, Ordering::SeqCst);
383 state.registry.notify_monitors(&name, &payload).await;
384 }
385}
386
387async fn handle_get_field_request<S: PvStore>(
392 state: &ServerState<S>,
393 conn_state: &ConnState,
394 conn_id: u64,
395 payload: spvirit_codec::epics_decode::PvaGetFieldPayload,
396 version: u8,
397 is_be: bool,
398) {
399 if payload.is_server {
400 let resp = encode_get_field_error(
401 payload.cid,
402 "Unexpected server GET_FIELD payload",
403 version,
404 is_be,
405 );
406 state.registry.send_msg(conn_id, resp).await;
407 return;
408 }
409
410 let request_id = payload.ioid.unwrap_or(payload.cid);
411
412 let sid = payload
413 .sid
414 .or_else(|| conn_state.cid_to_sid.get(&payload.cid).copied())
415 .or_else(|| {
416 conn_state
417 .sid_to_pv
418 .contains_key(&payload.cid)
419 .then_some(payload.cid)
420 })
421 .or_else(|| {
422 (payload.cid == 0 && conn_state.sid_to_pv.len() == 1)
423 .then(|| conn_state.sid_to_pv.keys().copied().next())
424 .flatten()
425 });
426
427 if let Some(sid) = sid {
428 if let Some(pv_name) = conn_state.sid_to_pv.get(&sid) {
429 if let Some(nt) = get_nt_snapshot(state, pv_name).await {
430 let full_desc = nt_payload_desc(&nt);
431 let sub = payload.field_name.as_deref().filter(|s| !s.is_empty());
432 let desc = if let Some(field_path) = sub {
433 match extract_subfield_desc(&full_desc, field_path) {
434 Some(sub_desc) => sub_desc,
435 None => {
436 let resp = encode_get_field_error(
437 request_id,
438 &format!("sub-field '{}' not found", field_path),
439 version,
440 is_be,
441 );
442 state.registry.send_msg(conn_id, resp).await;
443 return;
444 }
445 }
446 } else {
447 full_desc
448 };
449 let resp = encode_get_field_response(request_id, &desc, version, is_be);
450 dump_hex_packet(conn_id, "tx", "cmd=17 get_field", version, is_be, &resp);
451 state.registry.send_msg(conn_id, resp).await;
452 debug!(
453 "Conn {}: get_field cid={} sid={:?} ioid={:?} resolved_sid={} pv='{}' field={:?}",
454 conn_id,
455 payload.cid,
456 payload.sid,
457 payload.ioid,
458 sid,
459 pv_name,
460 payload.field_name
461 );
462 return;
463 }
464 let resp = encode_get_field_error(request_id, "PV not found", version, is_be);
465 state.registry.send_msg(conn_id, resp).await;
466 return;
467 }
468 }
469
470 if state.pvlist_mode != PvListMode::List {
471 let resp = encode_get_field_error(
472 request_id,
473 "GET_FIELD listing is disabled (set --pvlist-mode=list)",
474 version,
475 is_be,
476 );
477 state.registry.send_msg(conn_id, resp).await;
478 return;
479 }
480
481 let Some(pattern) = requested_pvlist_pattern(payload.field_name.as_deref()) else {
482 let resp = encode_get_field_error(
483 request_id,
484 "GET_FIELD requires a valid list pattern",
485 version,
486 is_be,
487 );
488 state.registry.send_msg(conn_id, resp).await;
489 return;
490 };
491
492 let all_names = state.store.list_pvs().await;
493 let mut names = collect_visible_pv_names(
494 &all_names,
495 state.pvlist_mode,
496 state.pvlist_allow_pattern.as_ref(),
497 state.pvlist_max,
498 );
499 if pattern != "*" {
500 names.retain(|name| wildcard_match(pattern, name));
501 }
502 if names.is_empty() {
503 let resp =
504 encode_get_field_error(request_id, "No PVs matched list request", version, is_be);
505 state.registry.send_msg(conn_id, resp).await;
506 return;
507 }
508 let desc = build_pvlist_structure(&names);
509 let resp = encode_get_field_response(request_id, &desc, version, is_be);
510 dump_hex_packet(
511 conn_id,
512 "tx",
513 "cmd=17 get_field_list",
514 version,
515 is_be,
516 &resp,
517 );
518 state.registry.send_msg(conn_id, resp).await;
519 debug!(
520 "Conn {}: get_field list pattern='{}' returned {} entries",
521 conn_id,
522 pattern,
523 names.len()
524 );
525}
526
527async fn handle_server_rpc<S: PvStore>(
532 state: &ServerState<S>,
533 conn_id: u64,
534 ioid: u32,
535 subcmd: u8,
536 version: u8,
537 is_be: bool,
538) {
539 if state.pvlist_mode != PvListMode::List {
540 let resp = encode_op_status_error_response(
541 20,
542 ioid,
543 subcmd,
544 "RPC list endpoint disabled (set --pvlist-mode=list)",
545 version,
546 is_be,
547 );
548 state.registry.send_msg(conn_id, resp).await;
549 return;
550 }
551
552 let all_names = state.store.list_pvs().await;
553 let names = collect_visible_pv_names(
554 &all_names,
555 state.pvlist_mode,
556 state.pvlist_allow_pattern.as_ref(),
557 state.pvlist_max,
558 );
559 let payload = NtPayload::ScalarArray(NtScalarArray::from_value(ScalarArrayValue::Str(names)));
560
561 let is_init = (subcmd & 0x08) != 0;
562 if is_init {
563 let resp = encode_op_status_response(20, ioid, subcmd, version, is_be);
564 state.registry.send_msg(conn_id, resp).await;
565 return;
566 }
567
568 let resp = encode_op_rpc_data_response_payload(ioid, subcmd, &payload, version, is_be);
569 state.registry.send_msg(conn_id, resp).await;
570}
571
572async fn handle_control_message<S: PvStore>(
577 state: &ServerState<S>,
578 conn_id: u64,
579 header: &PvaHeader,
580) {
581 debug!(
582 "Conn {}: control (segmented) cmd={} data={}",
583 conn_id, header.command, header.payload_length
584 );
585 if header.command == 3 {
586 let resp = encode_control_message(
587 true,
588 header.flags.is_msb,
589 header.version,
590 4,
591 header.payload_length,
592 );
593 state.registry.send_msg(conn_id, resp).await;
594 }
595}
596
597pub async fn run_udp_search<S: PvStore>(
603 state: Arc<ServerState<S>>,
604 addr: SocketAddr,
605 tcp_port: u16,
606 guid: [u8; 12],
607 advertise_ip: Option<IpAddr>,
608) -> Result<(), Box<dyn std::error::Error>> {
609 let socket = UdpSocket::bind(addr).await?;
610 socket.set_broadcast(true)?;
611 let mut buf = vec![0u8; 4096];
612
613 loop {
614 let (len, peer) = socket.recv_from(&mut buf).await?;
615 let data = &buf[..len];
616 let header = PvaHeader::new(data);
617 if header.flags.is_control || header.command != 3 {
618 continue;
619 }
620 let mut pkt = PvaPacket::new(data);
621 let Some(cmd) = pkt.decode_payload() else {
622 continue;
623 };
624 let version = pkt.header.version;
625 let is_be = pkt.header.flags.is_msb;
626 match cmd {
627 PvaPacketCommand::Search(payload) => {
628 debug!(
629 "UDP search from {}: pv_count={} mask=0x{:02x}",
630 peer,
631 payload.pv_requests.len(),
632 payload.mask
633 );
634 let accepts_tcp = payload.protocols.is_empty()
635 || payload
636 .protocols
637 .iter()
638 .any(|p| p.eq_ignore_ascii_case("tcp"));
639 if !accepts_tcp {
640 debug!("UDP search: no compatible protocol (tcp not accepted)");
641 continue;
642 }
643 let all_names = state.store.list_pvs().await;
644 let visible_names = collect_visible_pv_names(
645 &all_names,
646 state.pvlist_mode,
647 state.pvlist_allow_pattern.as_ref(),
648 state.pvlist_max,
649 );
650 let mut cids = Vec::new();
651 for (cid, name) in &payload.pv_requests {
652 if state.store.has_pv(name).await
653 || is_virtual_event_pv(name)
654 || (is_pvlist_virtual_pv(name) && state.pvlist_mode == PvListMode::List)
655 || (is_server_rpc_pv(name) && state.pvlist_mode != PvListMode::Off)
656 {
657 cids.push(*cid);
658 continue;
659 }
660 if state.pvlist_mode != PvListMode::Off
661 && is_pattern_query(name)
662 && visible_names.iter().any(|pv| wildcard_match(name, pv))
663 {
664 cids.push(*cid);
665 }
666 }
667 let response_required = (payload.mask & 0x01) != 0;
668 let server_discovery_ping = payload.pv_requests.is_empty();
669 let found = server_discovery_ping || !cids.is_empty();
670 if !found && !response_required {
671 debug!("UDP search: no matches and response not required");
672 continue;
673 }
674 let resp_ip = if let Some(ip) = advertise_ip {
675 ip
676 } else if !addr.ip().is_unspecified() {
677 addr.ip()
678 } else if let Some(ip) = infer_udp_response_ip(peer) {
679 debug!("UDP search: inferred response address {}", ip);
680 ip
681 } else {
682 IpAddr::V4(Ipv4Addr::UNSPECIFIED)
683 };
684 let addr_bytes = if resp_ip.is_unspecified() {
685 debug!("UDP search: responding with zero address (unspecified listen)");
686 [0u8; 16]
687 } else {
688 ip_to_bytes(resp_ip)
689 };
690 let response = encode_search_response(
691 guid,
692 payload.seq,
693 addr_bytes,
694 tcp_port,
695 "tcp",
696 found,
697 &cids,
698 version,
699 is_be,
700 );
701 let reply_target = search_reply_target(&payload.addr, payload.port, peer);
702 if let Err(e) = socket.send_to(&response, reply_target).await {
703 debug!(
704 "UDP search: failed sending {} matches to {}: {}",
705 cids.len(),
706 reply_target,
707 e
708 );
709 continue;
710 }
711 debug!(
712 "UDP search: responded found={} with {} matches to {}",
713 found,
714 cids.len(),
715 reply_target
716 );
717 }
718 _ => {}
719 }
720 }
721}
722
723pub async fn run_tcp_server<S: PvStore>(
729 state: Arc<ServerState<S>>,
730 addr: SocketAddr,
731 conn_timeout: Duration,
732) -> Result<(), Box<dyn std::error::Error>> {
733 let listener = TcpListener::bind(addr).await?;
734 let conn_id = Arc::new(std::sync::atomic::AtomicU64::new(1));
735
736 loop {
737 let (stream, peer) = listener.accept().await?;
738 let id = conn_id.fetch_add(1, Ordering::SeqCst);
739 info!("TCP connection {} from {}", id, peer);
740 let state_clone = state.clone();
741 tokio::spawn(async move {
742 if let Err(e) = handle_connection(state_clone, stream, id, conn_timeout).await {
743 error!("Connection {} error: {}", id, e);
744 }
745 });
746 }
747}
748
749pub async fn handle_connection<S: PvStore>(
759 state: Arc<ServerState<S>>,
760 stream: TcpStream,
761 conn_id: u64,
762 conn_timeout: Duration,
763) -> Result<(), Box<dyn std::error::Error>> {
764 let (mut reader, mut writer) = stream.into_split();
765 let (tx, mut rx) = mpsc::channel::<Vec<u8>>(128);
766
767 {
768 let mut conns = state.registry.conns.lock().await;
769 conns.insert(conn_id, tx);
770 }
771
772 let writer_task = tokio::spawn(async move {
773 while let Some(msg) = rx.recv().await {
774 if writer.write_all(&msg).await.is_err() {
775 break;
776 }
777 }
778 });
779
780 let mut conn_state = ConnState::default();
781
782 let set_byte_order = encode_control_message(true, false, 2, 2, 0);
784 validate_encoded_packet(conn_id, "set_byte_order", &set_byte_order);
785 dump_hex_packet(
786 conn_id,
787 "tx",
788 "ctrl=2 set_byte_order",
789 2,
790 false,
791 &set_byte_order,
792 );
793 state.registry.send_msg(conn_id, set_byte_order).await;
794
795 let server_validation = encode_connection_validation(16_384, 512, &["anonymous", "ca"], 2, false);
797 validate_encoded_packet(conn_id, "server_validation_init", &server_validation);
798 dump_hex_packet(
799 conn_id,
800 "tx",
801 "cmd=1 server_validation_init",
802 2,
803 false,
804 &server_validation,
805 );
806 state.registry.send_msg(conn_id, server_validation).await;
807
808 let mut last_activity = Instant::now();
809
810 loop {
811 let mut header = [0u8; 8];
812 let elapsed = last_activity.elapsed();
813 if elapsed >= conn_timeout {
814 info!("Conn {} idle timeout", conn_id);
815 break;
816 }
817 let remaining = conn_timeout - elapsed;
818 let read_header = tokio::time::timeout(remaining, reader.read_exact(&mut header)).await;
819 match read_header {
820 Ok(Ok(_)) => {}
821 Ok(Err(_)) => break,
822 Err(_) => {
823 info!("Conn {} idle timeout", conn_id);
824 break;
825 }
826 }
827 let header_pkt = PvaPacket::new(&header);
828 let payload_len = if header_pkt.header.flags.is_control {
829 0usize
830 } else {
831 header_pkt.header.payload_length as usize
832 };
833 let mut payload = vec![0u8; payload_len];
834 if payload_len > 0 {
835 let elapsed = last_activity.elapsed();
836 if elapsed >= conn_timeout {
837 info!("Conn {} idle timeout", conn_id);
838 break;
839 }
840 let remaining = conn_timeout - elapsed;
841 let read_payload =
842 tokio::time::timeout(remaining, reader.read_exact(&mut payload)).await;
843 match read_payload {
844 Ok(Ok(_)) => {}
845 Ok(Err(_)) => break,
846 Err(_) => {
847 info!("Conn {} idle timeout", conn_id);
848 break;
849 }
850 }
851 }
852 last_activity = Instant::now();
853 let mut full = header.to_vec();
854 full.extend_from_slice(&payload);
855
856 if header_pkt.header.flags.is_segmented != 0 && !header_pkt.header.flags.is_control {
858 debug!(
859 "Conn {}: segmented message cmd={} seg=0x{:02x}",
860 conn_id, header_pkt.header.command, header_pkt.header.flags.is_segmented
861 );
862 let mut payloads = vec![payload];
863 let mut seg_flags = header_pkt.header.flags;
864 while !seg_flags.is_last_segment {
865 let mut seg_header = [0u8; 8];
866 let elapsed = last_activity.elapsed();
867 if elapsed >= conn_timeout {
868 info!("Conn {} idle timeout", conn_id);
869 break;
870 }
871 let remaining = conn_timeout - elapsed;
872 let read_header =
873 tokio::time::timeout(remaining, reader.read_exact(&mut seg_header)).await;
874 match read_header {
875 Ok(Ok(_)) => {}
876 Ok(Err(_)) => break,
877 Err(_) => {
878 info!("Conn {} idle timeout", conn_id);
879 break;
880 }
881 }
882
883 let seg_header_pkt = PvaPacket::new(&seg_header);
884 let seg_payload_len = if seg_header_pkt.header.flags.is_control {
885 0usize
886 } else {
887 seg_header_pkt.header.payload_length as usize
888 };
889 let mut seg_payload = vec![0u8; seg_payload_len];
890 if seg_payload_len > 0 {
891 let elapsed = last_activity.elapsed();
892 if elapsed >= conn_timeout {
893 info!("Conn {} idle timeout", conn_id);
894 break;
895 }
896 let remaining = conn_timeout - elapsed;
897 let read_payload =
898 tokio::time::timeout(remaining, reader.read_exact(&mut seg_payload)).await;
899 match read_payload {
900 Ok(Ok(_)) => {}
901 Ok(Err(_)) => break,
902 Err(_) => {
903 info!("Conn {} idle timeout", conn_id);
904 break;
905 }
906 }
907 }
908 last_activity = Instant::now();
909
910 if seg_header_pkt.header.flags.is_control {
911 handle_control_message(&state, conn_id, &seg_header_pkt.header).await;
912 continue;
913 }
914 if seg_header_pkt.header.flags.is_segmented == 0 {
915 debug!(
916 "Conn {}: segmented message interrupted by non-segmented cmd={}",
917 conn_id, seg_header_pkt.header.command
918 );
919 break;
920 }
921 payloads.push(seg_payload);
922 seg_flags = seg_header_pkt.header.flags;
923 }
924 full = assemble_segmented_message(header, payloads);
925 }
926
927 let mut pkt = PvaPacket::new(&full);
928 let Some(cmd) = pkt.decode_payload() else {
929 continue;
930 };
931 let version = pkt.header.version;
932 let is_be = pkt.header.flags.is_msb;
933 let cmd_code = pkt.header.command;
934 let payload_slice = if full.len() >= 8 { &full[8..] } else { &[] };
935
936 if cmd_code == 1 {
938 dump_hex_packet(conn_id, "rx", "cmd=1 validation", version, is_be, &full);
939 let validation = spvirit_codec::epics_decode::PvaConnectionValidationPayload::new(
940 payload_slice,
941 is_be,
942 false,
943 );
944 if let Some(val) = validation {
945 debug!(
946 "Conn {}: validation request (cmd=1) ver={} be={} buf={} qos={} authz={:?}",
947 conn_id, version, is_be, val.buffer_size, val.qos, val.authz
948 );
949 let resp = spvirit_codec::spvirit_encode::encode_connection_validated(
950 true, version, is_be,
951 );
952 validate_encoded_packet(conn_id, "conn_validated_resp", &resp);
953 dump_hex_packet(
954 conn_id,
955 "tx",
956 "cmd=9 connection_validated",
957 version,
958 is_be,
959 &resp,
960 );
961 state.registry.send_msg(conn_id, resp).await;
962 continue;
963 }
964 }
965 if cmd_code == 17 {
966 dump_hex_packet(conn_id, "rx", "cmd=17 get_field", version, is_be, &full);
967 }
968
969 match cmd {
970 PvaPacketCommand::Control(payload) => {
971 debug!("Conn {}: control {}", conn_id, payload);
972 if payload.command == 3 {
973 let resp = encode_control_message(true, is_be, version, 4, payload.data);
974 state.registry.send_msg(conn_id, resp).await;
975 }
976 continue;
977 }
978 PvaPacketCommand::ConnectionValidation(_) => {
979 debug!("Conn {}: validation request (decoded)", conn_id);
980 }
981 PvaPacketCommand::ConnectionValidated(_) => {
982 debug!("Conn {}: validation confirmed (decoded)", conn_id);
983 }
984 PvaPacketCommand::CreateChannel(payload) => {
985 debug!(
986 "Conn {}: create_channel count={}",
987 conn_id,
988 payload.channels.len()
989 );
990 for (cid, pv_name) in payload.channels {
991 if has_pv(&state, &pv_name).await {
992 let sid = state.sid_counter.fetch_add(1, Ordering::SeqCst);
993 conn_state.cid_to_sid.insert(cid, sid);
994 conn_state.sid_to_pv.insert(sid, pv_name.clone());
995 let resp = encode_create_channel_response(cid, sid, version, is_be);
996 state.registry.send_msg(conn_id, resp).await;
997 info!(
998 "Conn {}: channel '{}' cid={} sid={}",
999 conn_id, pv_name, cid, sid
1000 );
1001 } else {
1002 let resp = encode_create_channel_error(cid, "PV not found", version, is_be);
1003 state.registry.send_msg(conn_id, resp).await;
1004 info!(
1005 "Conn {}: channel '{}' not found (cid={})",
1006 conn_id, pv_name, cid
1007 );
1008 }
1009 }
1010 }
1011 PvaPacketCommand::Op(payload) => {
1012 if payload.is_server {
1013 continue;
1014 }
1015 let sid = payload.sid_or_cid;
1016 let ioid = payload.ioid;
1017 debug!(
1018 "Conn {}: op cmd={} ioid={} sid={} sub=0x{:02x} body_len={}",
1019 conn_id,
1020 payload.command,
1021 ioid,
1022 sid,
1023 payload.subcmd,
1024 payload.body.len()
1025 );
1026 let Some(pv_name) = conn_state.sid_to_pv.get(&sid).cloned() else {
1027 state
1028 .registry
1029 .send_msg(
1030 conn_id,
1031 encode_op_error(
1032 payload.command,
1033 payload.subcmd,
1034 ioid,
1035 "Unknown SID",
1036 version,
1037 is_be,
1038 ),
1039 )
1040 .await;
1041 continue;
1042 };
1043
1044 let is_init = (payload.subcmd & 0x08) != 0;
1045
1046 match payload.command {
1047 10 => {
1048 if is_init {
1050 let full_desc = if let Some(desc) =
1053 state.store.get_descriptor(&pv_name).await
1054 {
1055 desc
1056 } else if let Some(nt) =
1057 get_nt_snapshot(&state, &pv_name).await
1058 {
1059 nt_payload_desc(&nt)
1060 } else {
1061 state
1062 .registry
1063 .send_msg(
1064 conn_id,
1065 encode_op_error(
1066 payload.command,
1067 payload.subcmd,
1068 ioid,
1069 "PV not found",
1070 version,
1071 is_be,
1072 ),
1073 )
1074 .await;
1075 continue;
1076 };
1077 let pv_req_fields = decode_pv_request_fields(&payload.body, is_be);
1078 let desc = match &pv_req_fields {
1079 Some(fields) => filter_structure_desc(&full_desc, fields),
1080 None => full_desc,
1081 };
1082 conn_state.ioid_to_desc.insert(ioid, desc.clone());
1083 conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1084 let resp = encode_op_init_response_desc(
1085 payload.command,
1086 ioid,
1087 0x08,
1088 &desc,
1089 version,
1090 is_be,
1091 );
1092 state.registry.send_msg(conn_id, resp).await;
1093 info!("Conn {}: get init pv='{}' ioid={}", conn_id, pv_name, ioid);
1094 } else {
1095 let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1096 state
1097 .registry
1098 .send_msg(
1099 conn_id,
1100 encode_op_error(
1101 payload.command,
1102 payload.subcmd,
1103 ioid,
1104 "PV has no data yet",
1105 version,
1106 is_be,
1107 ),
1108 )
1109 .await;
1110 continue;
1111 };
1112 let resp = if let Some(desc) = conn_state.ioid_to_desc.get(&ioid) {
1113 encode_op_data_response_filtered(
1114 10, ioid, &nt, desc, version, is_be,
1115 )
1116 } else {
1117 encode_op_get_data_response_payload(ioid, &nt, version, is_be)
1118 };
1119 state.registry.send_msg(conn_id, resp).await;
1120 debug!("Conn {}: get data pv='{}' ioid={}", conn_id, pv_name, ioid);
1121 }
1122 }
1123 11 => {
1124 if is_init {
1126 let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1127 state
1128 .registry
1129 .send_msg(
1130 conn_id,
1131 encode_op_error(
1132 payload.command,
1133 payload.subcmd,
1134 ioid,
1135 "PV not found",
1136 version,
1137 is_be,
1138 ),
1139 )
1140 .await;
1141 continue;
1142 };
1143 if !is_virtual_event_pv(&pv_name)
1144 && !is_writable_pv(&state, &pv_name).await
1145 {
1146 let resp = encode_op_put_status_response(
1147 ioid,
1148 0x08,
1149 "Write access denied",
1150 version,
1151 is_be,
1152 );
1153 state.registry.send_msg(conn_id, resp).await;
1154 continue;
1155 }
1156 let desc = nt_payload_desc(&nt);
1157 conn_state.ioid_to_desc.insert(ioid, desc.clone());
1158 conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1159 let resp = encode_op_init_response_desc(
1160 payload.command,
1161 ioid,
1162 0x08,
1163 &desc,
1164 version,
1165 is_be,
1166 );
1167 state.registry.send_msg(conn_id, resp).await;
1168 info!("Conn {}: put init pv='{}' ioid={}", conn_id, pv_name, ioid);
1169 } else {
1170 if (payload.subcmd & 0x40) != 0 {
1171 if !is_virtual_event_pv(&pv_name)
1172 && !is_writable_pv(&state, &pv_name).await
1173 {
1174 let resp = encode_op_put_status_response(
1175 ioid,
1176 0x40,
1177 "Write access denied",
1178 version,
1179 is_be,
1180 );
1181 state.registry.send_msg(conn_id, resp).await;
1182 continue;
1183 }
1184 if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1185 let resp = encode_op_put_getput_response_payload(
1186 ioid, &nt, version, is_be,
1187 );
1188 state.registry.send_msg(conn_id, resp).await;
1189 debug!(
1190 "Conn {}: put get-put pv='{}' ioid={}",
1191 conn_id, pv_name, ioid
1192 );
1193 } else {
1194 state
1195 .registry
1196 .send_msg(
1197 conn_id,
1198 encode_op_error(
1199 payload.command,
1200 payload.subcmd,
1201 ioid,
1202 "PV not found",
1203 version,
1204 is_be,
1205 ),
1206 )
1207 .await;
1208 }
1209 continue;
1210 }
1211 let desc = match conn_state.ioid_to_desc.get(&ioid) {
1212 Some(d) => d.clone(),
1213 None => {
1214 state
1215 .registry
1216 .send_msg(
1217 conn_id,
1218 encode_op_error(
1219 payload.command,
1220 payload.subcmd,
1221 ioid,
1222 "PUT without init",
1223 version,
1224 is_be,
1225 ),
1226 )
1227 .await;
1228 continue;
1229 }
1230 };
1231 let decoded = decode_put_body(&payload.body, &desc, is_be);
1232 if let Some(value) = decoded.as_ref() {
1233 match state.store.put_value(&pv_name, value).await {
1234 Ok(changed) => {
1235 notify_changed_records(&state, changed).await;
1236 }
1237 Err(msg) => {
1238 let resp = encode_op_put_status_response(
1239 ioid,
1240 payload.subcmd,
1241 &msg,
1242 version,
1243 is_be,
1244 );
1245 state.registry.send_msg(conn_id, resp).await;
1246 continue;
1247 }
1248 }
1249 } else {
1250 debug!(
1251 "Conn {}: put decode failed ioid={} body_len={}",
1252 conn_id,
1253 ioid,
1254 payload.body.len()
1255 );
1256 let resp = encode_op_put_status_response(
1257 ioid,
1258 payload.subcmd,
1259 "cannot decode PUT body",
1260 version,
1261 is_be,
1262 );
1263 state.registry.send_msg(conn_id, resp).await;
1264 continue;
1265 }
1266 let resp = encode_op_put_response(ioid, payload.subcmd, version, is_be);
1267 state.registry.send_msg(conn_id, resp).await;
1268 debug!("Conn {}: put data pv='{}' ioid={}", conn_id, pv_name, ioid);
1269 }
1270 }
1271 12 => {
1272 if is_init {
1274 let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1275 state
1276 .registry
1277 .send_msg(
1278 conn_id,
1279 encode_op_error(
1280 payload.command,
1281 payload.subcmd,
1282 ioid,
1283 "PV not found",
1284 version,
1285 is_be,
1286 ),
1287 )
1288 .await;
1289 continue;
1290 };
1291 if !is_virtual_event_pv(&pv_name)
1292 && !is_writable_pv(&state, &pv_name).await
1293 {
1294 let resp = encode_op_put_get_init_error_response(
1295 ioid,
1296 "Write access denied",
1297 version,
1298 is_be,
1299 );
1300 state.registry.send_msg(conn_id, resp).await;
1301 continue;
1302 }
1303 let desc = nt_payload_desc(&nt);
1304 conn_state.ioid_to_desc.insert(ioid, desc.clone());
1305 conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1306 let resp =
1307 encode_op_put_get_init_response(ioid, &desc, &desc, version, is_be);
1308 state.registry.send_msg(conn_id, resp).await;
1309 info!(
1310 "Conn {}: put_get init pv='{}' ioid={}",
1311 conn_id, pv_name, ioid
1312 );
1313 } else {
1314 let desc = match conn_state.ioid_to_desc.get(&ioid) {
1315 Some(d) => d.clone(),
1316 None => {
1317 state
1318 .registry
1319 .send_msg(
1320 conn_id,
1321 encode_op_error(
1322 payload.command,
1323 payload.subcmd,
1324 ioid,
1325 "PUT_GET without init",
1326 version,
1327 is_be,
1328 ),
1329 )
1330 .await;
1331 continue;
1332 }
1333 };
1334 let decoded = decode_put_body(&payload.body, &desc, is_be);
1335 if let Some(value) = decoded.as_ref() {
1336 match state.store.put_value(&pv_name, value).await {
1337 Ok(changed) => {
1338 notify_changed_records(&state, changed).await;
1339 }
1340 Err(msg) => {
1341 let resp = encode_op_put_get_data_error_response(
1342 ioid, &msg, version, is_be,
1343 );
1344 state.registry.send_msg(conn_id, resp).await;
1345 continue;
1346 }
1347 }
1348 } else {
1349 debug!(
1350 "Conn {}: put_get decode failed ioid={} body_len={}",
1351 conn_id,
1352 ioid,
1353 payload.body.len()
1354 );
1355 let resp = encode_op_put_get_data_error_response(
1356 ioid,
1357 "cannot decode PUT body",
1358 version,
1359 is_be,
1360 );
1361 state.registry.send_msg(conn_id, resp).await;
1362 continue;
1363 }
1364 if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1365 let resp = encode_op_put_get_data_response_payload(
1366 ioid, &nt, version, is_be,
1367 );
1368 state.registry.send_msg(conn_id, resp).await;
1369 } else {
1370 state
1371 .registry
1372 .send_msg(
1373 conn_id,
1374 encode_op_error(
1375 payload.command,
1376 payload.subcmd,
1377 ioid,
1378 "PV not found",
1379 version,
1380 is_be,
1381 ),
1382 )
1383 .await;
1384 }
1385 debug!(
1386 "Conn {}: put_get data pv='{}' ioid={}",
1387 conn_id, pv_name, ioid
1388 );
1389 }
1390 }
1391 13 => {
1392 if is_init {
1394 let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1395 state
1396 .registry
1397 .send_msg(
1398 conn_id,
1399 encode_op_error(
1400 payload.command,
1401 payload.subcmd,
1402 ioid,
1403 "PV not found",
1404 version,
1405 is_be,
1406 ),
1407 )
1408 .await;
1409 continue;
1410 };
1411 let full_desc = nt_payload_desc(&nt);
1412 let pv_req_fields = decode_pv_request_fields(&payload.body, is_be);
1413 let desc = match &pv_req_fields {
1414 Some(fields) => filter_structure_desc(&full_desc, fields),
1415 None => full_desc,
1416 };
1417 conn_state.ioid_to_desc.insert(ioid, desc.clone());
1418 conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1419 let pipeline_enabled = (payload.subcmd & 0x80) != 0;
1420 let mut nfree = 0u32;
1421 if pipeline_enabled && payload.body.len() >= 4 {
1422 let start = payload.body.len() - 4;
1423 nfree = if is_be {
1424 u32::from_be_bytes([
1425 payload.body[start],
1426 payload.body[start + 1],
1427 payload.body[start + 2],
1428 payload.body[start + 3],
1429 ])
1430 } else {
1431 u32::from_le_bytes([
1432 payload.body[start],
1433 payload.body[start + 1],
1434 payload.body[start + 2],
1435 payload.body[start + 3],
1436 ])
1437 };
1438 }
1439 let resp = encode_op_init_response_desc(
1440 payload.command,
1441 ioid,
1442 0x08,
1443 &desc,
1444 version,
1445 is_be,
1446 );
1447 state.registry.send_msg(conn_id, resp).await;
1448 conn_state.ioid_to_monitor.insert(
1449 ioid,
1450 MonitorState {
1451 running: false,
1452 pipeline_enabled,
1453 nfree,
1454 },
1455 );
1456 {
1457 let mut monitors = state.registry.monitors.lock().await;
1458 monitors
1459 .entry(pv_name.clone())
1460 .or_default()
1461 .push(MonitorSub {
1462 conn_id,
1463 ioid,
1464 version,
1465 is_be,
1466 running: false,
1467 pipeline_enabled,
1468 nfree,
1469 filtered_desc: conn_state.ioid_to_desc.get(&ioid).cloned(),
1470 });
1471 }
1472 info!(
1473 "Conn {}: monitor init pv='{}' ioid={}",
1474 conn_id, pv_name, ioid
1475 );
1476 } else if (payload.subcmd & 0x10) != 0 {
1477 if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1479 let resp = encode_monitor_data_response_payload(
1480 ioid, 0x10, &nt, version, is_be,
1481 );
1482 state.registry.send_msg(conn_id, resp).await;
1483 }
1484 state
1485 .registry
1486 .remove_monitor_subscription(conn_id, ioid, &pv_name)
1487 .await;
1488 conn_state.ioid_to_monitor.remove(&ioid);
1489 conn_state.ioid_to_pv.remove(&ioid);
1490 conn_state.ioid_to_desc.remove(&ioid);
1491 info!("Conn {}: monitor end ioid={}", conn_id, ioid);
1492 } else if (payload.subcmd & 0x04) != 0 || (payload.subcmd & 0x80) != 0 {
1493 let start = (payload.subcmd & 0x44) == 0x44;
1495 let stop = (payload.subcmd & 0x44) == 0x04;
1496 let pipeline_ack = (payload.subcmd & 0x80) != 0;
1497 let mut nfree = None;
1498 if pipeline_ack && payload.body.len() >= 4 {
1499 let v = if is_be {
1500 u32::from_be_bytes([
1501 payload.body[0],
1502 payload.body[1],
1503 payload.body[2],
1504 payload.body[3],
1505 ])
1506 } else {
1507 u32::from_le_bytes([
1508 payload.body[0],
1509 payload.body[1],
1510 payload.body[2],
1511 payload.body[3],
1512 ])
1513 };
1514 nfree = Some(v);
1515 }
1516 let running = if start {
1517 true
1518 } else if stop {
1519 false
1520 } else {
1521 conn_state
1522 .ioid_to_monitor
1523 .get(&ioid)
1524 .map(|m| m.running)
1525 .unwrap_or(true)
1526 };
1527 state
1528 .registry
1529 .update_monitor_subscription(
1530 conn_id,
1531 ioid,
1532 &pv_name,
1533 running,
1534 nfree,
1535 Some(pipeline_ack),
1536 )
1537 .await;
1538 if let Some(mon) = conn_state.ioid_to_monitor.get_mut(&ioid) {
1539 mon.running = running;
1540 if pipeline_ack {
1541 mon.pipeline_enabled = true;
1542 }
1543 if let Some(v) = nfree {
1544 if pipeline_ack {
1545 mon.nfree = mon.nfree.saturating_add(v);
1546 } else {
1547 mon.nfree = v;
1548 }
1549 }
1550 }
1551 info!(
1552 "Conn {}: monitor {} ioid={} ack={} nfree={:?}",
1553 conn_id,
1554 if start {
1555 "start"
1556 } else if stop {
1557 "stop"
1558 } else {
1559 "ack"
1560 },
1561 ioid,
1562 pipeline_ack,
1563 nfree
1564 );
1565 if start {
1566 if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1567 state
1568 .registry
1569 .send_monitor_update_for(&pv_name, conn_id, ioid, &nt)
1570 .await;
1571 }
1572 }
1573 }
1574 }
1575 20 => {
1576 if is_server_rpc_pv(&pv_name) {
1578 handle_server_rpc(
1579 &state,
1580 conn_id,
1581 ioid,
1582 payload.subcmd,
1583 version,
1584 is_be,
1585 )
1586 .await;
1587 } else {
1588 state
1589 .registry
1590 .send_msg(
1591 conn_id,
1592 encode_op_error(
1593 payload.command,
1594 payload.subcmd,
1595 ioid,
1596 "Operation not supported",
1597 version,
1598 is_be,
1599 ),
1600 )
1601 .await;
1602 }
1603 }
1604 14 | 16 => {
1605 state
1606 .registry
1607 .send_msg(
1608 conn_id,
1609 encode_op_error(
1610 payload.command,
1611 payload.subcmd,
1612 ioid,
1613 "Operation not supported",
1614 version,
1615 is_be,
1616 ),
1617 )
1618 .await;
1619 }
1620 _ => {
1621 state
1622 .registry
1623 .send_msg(
1624 conn_id,
1625 encode_op_error(
1626 payload.command,
1627 payload.subcmd,
1628 ioid,
1629 "Operation not supported",
1630 version,
1631 is_be,
1632 ),
1633 )
1634 .await;
1635 }
1636 }
1637 }
1638 PvaPacketCommand::DestroyChannel(payload) => {
1639 let sid = payload.sid;
1640 let cid = payload.cid;
1641 conn_state.cid_to_sid.remove(&cid);
1642 conn_state.sid_to_pv.remove(&sid);
1643 info!(
1644 "Conn {}: channel destroyed sid={} cid={}",
1645 conn_id, sid, cid
1646 );
1647 }
1648 PvaPacketCommand::DestroyRequest(payload) => {
1649 let ioid = payload.request_id;
1650 if let Some(pv_name) = conn_state.ioid_to_pv.remove(&ioid) {
1651 state
1652 .registry
1653 .remove_monitor_subscription(conn_id, ioid, &pv_name)
1654 .await;
1655 conn_state.ioid_to_desc.remove(&ioid);
1656 conn_state.ioid_to_monitor.remove(&ioid);
1657 info!("Conn {}: monitor unsubscribed ioid={}", conn_id, ioid);
1658 }
1659 }
1660 PvaPacketCommand::AuthNZ(_) => {
1661 debug!("Conn {}: ignoring AUTHNZ", conn_id);
1663 }
1664 PvaPacketCommand::AclChange(_) => {
1665 let resp =
1666 encode_message_error("ACL_CHANGE command is not supported", version, is_be);
1667 state.registry.send_msg(conn_id, resp).await;
1668 }
1669 PvaPacketCommand::GetField(payload) => {
1670 handle_get_field_request(&state, &conn_state, conn_id, payload, version, is_be)
1671 .await;
1672 }
1673 PvaPacketCommand::Echo(payload_bytes) => {
1674 let mut resp =
1675 encode_header(true, is_be, false, version, 2, payload_bytes.len() as u32);
1676 resp.extend_from_slice(&payload_bytes);
1677 state.registry.send_msg(conn_id, resp).await;
1678 }
1679 PvaPacketCommand::Message(_) => {
1680 let resp = encode_message_error("MESSAGE command is not supported", version, is_be);
1681 state.registry.send_msg(conn_id, resp).await;
1682 }
1683 PvaPacketCommand::MultipleData(_) => {
1684 let resp =
1685 encode_message_error("MULTIPLE_DATA command is not supported", version, is_be);
1686 state.registry.send_msg(conn_id, resp).await;
1687 }
1688 PvaPacketCommand::CancelRequest(_) => {
1689 let resp =
1690 encode_message_error("CANCEL_REQUEST command is not supported", version, is_be);
1691 state.registry.send_msg(conn_id, resp).await;
1692 }
1693 PvaPacketCommand::OriginTag(_) => {
1694 let resp =
1695 encode_message_error("ORIGIN_TAG command is not supported", version, is_be);
1696 state.registry.send_msg(conn_id, resp).await;
1697 }
1698 PvaPacketCommand::Search(payload) => {
1699 debug!(
1700 "Conn {}: TCP search: pv_count={} mask=0x{:02x}",
1701 conn_id,
1702 payload.pv_requests.len(),
1703 payload.mask
1704 );
1705 let accepts_tcp = payload.protocols.is_empty()
1706 || payload
1707 .protocols
1708 .iter()
1709 .any(|p| p.eq_ignore_ascii_case("tcp"));
1710 if accepts_tcp {
1711 let all_names = state.store.list_pvs().await;
1712 let visible_names = collect_visible_pv_names(
1713 &all_names,
1714 state.pvlist_mode,
1715 state.pvlist_allow_pattern.as_ref(),
1716 state.pvlist_max,
1717 );
1718 let mut cids = Vec::new();
1719 for (cid, name) in &payload.pv_requests {
1720 if state.store.has_pv(name).await
1721 || is_virtual_event_pv(name)
1722 || (is_pvlist_virtual_pv(name)
1723 && state.pvlist_mode == PvListMode::List)
1724 || (is_server_rpc_pv(name) && state.pvlist_mode != PvListMode::Off)
1725 {
1726 cids.push(*cid);
1727 continue;
1728 }
1729 if state.pvlist_mode != PvListMode::Off
1730 && is_pattern_query(name)
1731 && visible_names.iter().any(|pv| wildcard_match(name, pv))
1732 {
1733 cids.push(*cid);
1734 }
1735 }
1736 let server_discovery_ping = payload.pv_requests.is_empty();
1737 let found = server_discovery_ping || !cids.is_empty();
1738 let resp_ip = state.advertise_ip.unwrap_or(state.listen_ip);
1739 let addr_bytes = if resp_ip.is_unspecified() {
1740 [0u8; 16]
1741 } else {
1742 ip_to_bytes(resp_ip)
1743 };
1744 let response = encode_search_response(
1745 state.guid,
1746 payload.seq,
1747 addr_bytes,
1748 state.tcp_port,
1749 "tcp",
1750 found,
1751 &cids,
1752 version,
1753 is_be,
1754 );
1755 state.registry.send_msg(conn_id, response).await;
1756 debug!(
1757 "Conn {}: TCP search responded found={} matches={}",
1758 conn_id,
1759 found,
1760 cids.len()
1761 );
1762 } else {
1763 debug!("Conn {}: TCP search: no compatible protocol", conn_id);
1764 }
1765 }
1766 PvaPacketCommand::SearchResponse(_)
1767 | PvaPacketCommand::Beacon(_) => {
1768 let resp =
1769 encode_message_error("Unexpected command for server endpoint", version, is_be);
1770 state.registry.send_msg(conn_id, resp).await;
1771 }
1772 PvaPacketCommand::Unknown(payload) => {
1773 let resp = encode_message_error(
1774 &format!("Unknown command {}", payload.command),
1775 version,
1776 is_be,
1777 );
1778 state.registry.send_msg(conn_id, resp).await;
1779 }
1780 }
1781 }
1782
1783 state.registry.cleanup_connection(conn_id).await;
1784 let _ = writer_task.await;
1785 Ok(())
1786}