Skip to main content

spvirit_server/
handler.rs

1//! PVA protocol handler — the core TCP connection processor.
2//!
3//! [`handle_connection`] is generic over [`PvStore`], allowing any backend to
4//! serve PVs over the EPICS PVAccess protocol.
5
6use std::net::{IpAddr, Ipv4Addr, SocketAddr};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU16, AtomicU32, Ordering};
9use std::time::{Duration, Instant, SystemTime};
10
11use regex::Regex;
12use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::net::{TcpListener, TcpStream, UdpSocket};
14use tokio::sync::mpsc;
15use tracing::{debug, error, info};
16
17use spvirit_codec::epics_decode::{PvaHeader, PvaPacket, PvaPacketCommand};
18use spvirit_codec::spvd_decode::{StructureDesc, extract_subfield_desc};
19use spvirit_codec::spvd_encode::{
20    decode_pv_request_fields, filter_structure_desc, nt_payload_desc,
21};
22use spvirit_codec::spvirit_encode::{
23    encode_connection_validation, encode_control_message, encode_create_channel_error,
24    encode_create_channel_response, encode_get_field_error, encode_get_field_response,
25    encode_header, encode_message_error, encode_monitor_data_response_payload,
26    encode_op_data_response_filtered, encode_op_error, encode_op_get_data_response_payload,
27    encode_op_init_response_desc, encode_op_put_get_data_error_response,
28    encode_op_put_get_data_response_payload, encode_op_put_get_init_error_response,
29    encode_op_put_get_init_response, encode_op_put_getput_response_payload, encode_op_put_response,
30    encode_op_put_status_response, encode_op_rpc_data_response_payload,
31    encode_op_status_error_response, encode_op_status_response, encode_search_response,
32    ip_from_bytes, ip_to_bytes,
33};
34
35use spvirit_types::{NtPayload, NtScalar, NtScalarArray, ScalarArrayValue, ScalarValue};
36
37use crate::decode::{assemble_segmented_message, decode_put_body};
38use crate::monitor::MonitorRegistry;
39use crate::pvstore::PvStore;
40use crate::state::{ConnState, MonitorState, MonitorSub};
41
42// ---------------------------------------------------------------------------
43// PvListMode — controls virtual PV listing behaviour
44// ---------------------------------------------------------------------------
45
/// Selects how much of the PV directory the server exposes to clients.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PvListMode {
    /// No PV listing at all.
    Off,
    /// Respond to UDP search for known PVs only; no GET_FIELD listing.
    Discover,
    /// Full pvlist & server-RPC listing support.
    List,
}

impl PvListMode {
    /// Parses a mode name, ignoring surrounding whitespace and ASCII case.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message (quoting the trimmed, lower-cased
    /// input) when the value is not one of `off`, `discover` or `list`.
    pub fn parse(raw: &str) -> Result<Self, String> {
        let normalized = raw.trim().to_ascii_lowercase();
        if normalized == "off" {
            return Ok(Self::Off);
        }
        if normalized == "discover" {
            return Ok(Self::Discover);
        }
        if normalized == "list" {
            return Ok(Self::List);
        }
        Err(format!(
            "Invalid pvlist-mode '{}'; expected off|discover|list",
            normalized
        ))
    }
}
70
71// ---------------------------------------------------------------------------
72// Server shared state — generic over PvStore
73// ---------------------------------------------------------------------------
74
75/// Shared server state that is passed to every connection handler.
76pub struct ServerState<S: PvStore> {
77    pub store: Arc<S>,
78    pub registry: Arc<MonitorRegistry>,
79    pub sid_counter: AtomicU32,
80    pub beacon_change: Arc<AtomicU16>,
81    pub compute_alarms: bool,
82    pub pvlist_mode: PvListMode,
83    pub pvlist_max: usize,
84    pub pvlist_allow_pattern: Option<Regex>,
85    pub guid: [u8; 12],
86    pub tcp_port: u16,
87    pub advertise_ip: Option<IpAddr>,
88    pub listen_ip: IpAddr,
89}
90
91impl<S: PvStore> ServerState<S> {
92    pub fn new(
93        store: Arc<S>,
94        registry: Arc<MonitorRegistry>,
95        compute_alarms: bool,
96        pvlist_mode: PvListMode,
97        pvlist_max: usize,
98        pvlist_allow_pattern: Option<Regex>,
99        guid: [u8; 12],
100        tcp_port: u16,
101        advertise_ip: Option<IpAddr>,
102        listen_ip: IpAddr,
103    ) -> Self {
104        Self {
105            store,
106            registry,
107            sid_counter: AtomicU32::new(1),
108            beacon_change: Arc::new(AtomicU16::new(0)),
109            compute_alarms,
110            pvlist_mode,
111            pvlist_max,
112            pvlist_allow_pattern,
113            guid,
114            tcp_port,
115            advertise_ip,
116            listen_ip,
117        }
118    }
119}
120
121// ---------------------------------------------------------------------------
122// Virtual PV helpers
123// ---------------------------------------------------------------------------
124
/// Returns `true` when `pv_name` is exactly the virtual listing PV `__pvlist`.
pub fn is_pvlist_virtual_pv(pv_name: &str) -> bool {
    matches!(pv_name, "__pvlist")
}
128
/// Returns `true` when `pv_name` is exactly the server-RPC channel name `server`.
pub fn is_server_rpc_pv(pv_name: &str) -> bool {
    matches!(pv_name, "server")
}
132
/// Returns `true` when `pv_name` begins with the virtual-event prefix `__event:`.
pub fn is_virtual_event_pv(pv_name: &str) -> bool {
    pv_name.strip_prefix("__event:").is_some()
}
136
137pub fn virtual_event_nt(pv_name: &str) -> NtPayload {
138    NtPayload::Scalar(
139        NtScalar::from_value(ScalarValue::Bool(false))
140            .with_description(format!("Virtual event trigger for {}", pv_name)),
141    )
142}
143
144pub fn virtual_pvlist_nt(entries: Vec<String>) -> NtPayload {
145    NtPayload::ScalarArray(NtScalarArray::from_value(ScalarArrayValue::Str(entries)))
146}
147
148// ---------------------------------------------------------------------------
149// Pattern / wildcard utilities
150// ---------------------------------------------------------------------------
151
/// Returns `true` when `raw` contains at least one wildcard character
/// (`*` or `?`).
pub fn is_pattern_query(raw: &str) -> bool {
    raw.chars().any(|c| matches!(c, '*' | '?'))
}
155
/// Glob-style match of `text` against `pattern`.
///
/// * `*` matches any run of characters, including the empty run.
/// * `?` matches exactly one character (a full Unicode scalar value, not a
///   single UTF-8 byte).
/// * Every other character matches itself.
///
/// The whole of `text` must be consumed for the match to succeed. There is no
/// escape syntax, so `*` and `?` in the pattern are always wildcards.
pub fn wildcard_match(pattern: &str, text: &str) -> bool {
    let p: Vec<char> = pattern.chars().collect();
    let t: Vec<char> = text.chars().collect();
    let mut pi = 0usize;
    let mut ti = 0usize;
    // After seeing a `*`: (pattern index just past it, number of text chars
    // it currently swallows up to). Used to retry with a longer star match.
    let mut backtrack: Option<(usize, usize)> = None;

    while ti < t.len() {
        match p.get(pi).copied() {
            // The star must be handled before the literal comparison;
            // otherwise a literal `*` in `text` would consume the wildcard as
            // a plain character and make backtracking impossible.
            Some('*') => {
                backtrack = Some((pi + 1, ti));
                pi += 1;
            }
            Some(c) if c == '?' || c == t[ti] => {
                pi += 1;
                ti += 1;
            }
            _ => match backtrack {
                // Mismatch: let the most recent star absorb one more char.
                Some((resume_pi, star_end)) => {
                    pi = resume_pi;
                    ti = star_end + 1;
                    backtrack = Some((resume_pi, star_end + 1));
                }
                None => return false,
            },
        }
    }

    // Text exhausted: only trailing stars may remain in the pattern.
    p[pi..].iter().all(|&c| c == '*')
}
186
187pub fn collect_visible_pv_names(
188    all_names: &[String],
189    mode: PvListMode,
190    allow_pattern: Option<&Regex>,
191    max_items: usize,
192) -> Vec<String> {
193    let mut names: Vec<String> = all_names
194        .iter()
195        .filter(|name| {
196            allow_pattern
197                .as_ref()
198                .map(|re| re.is_match(name))
199                .unwrap_or(true)
200        })
201        .cloned()
202        .collect();
203    names.sort();
204    if names.len() > max_items {
205        names.truncate(max_items);
206    }
207    if mode == PvListMode::List && names.len() < max_items {
208        names.push("__pvlist".to_string());
209    }
210    names
211}
212
213fn build_pvlist_structure(names: &[String]) -> StructureDesc {
214    use spvirit_codec::spvd_decode::{FieldDesc, FieldType, TypeCode};
215    StructureDesc {
216        struct_id: Some("epics:pva/pvlist:1.0".to_string()),
217        fields: names
218            .iter()
219            .map(|name| FieldDesc {
220                name: name.clone(),
221                field_type: FieldType::Scalar(TypeCode::Boolean),
222            })
223            .collect(),
224    }
225}
226
227fn requested_pvlist_pattern(field_name: Option<&str>) -> Option<&str> {
228    let raw = field_name.map(str::trim).unwrap_or("");
229    if raw.is_empty() || raw == "*" || raw == "__pvlist" || raw.eq_ignore_ascii_case("pvlist") {
230        return Some("*");
231    }
232    if is_pattern_query(raw) {
233        return Some(raw);
234    }
235    None
236}
237
238// ---------------------------------------------------------------------------
239// Network helpers
240// ---------------------------------------------------------------------------
241
242pub fn search_reply_target(addr: &[u8; 16], port: u16, peer: SocketAddr) -> SocketAddr {
243    let target_port = if port != 0 { port } else { peer.port() };
244    let target_ip = ip_from_bytes(addr)
245        .filter(|ip| !ip.is_unspecified())
246        .unwrap_or_else(|| peer.ip());
247    SocketAddr::new(target_ip, target_port)
248}
249
/// Determines which local address the OS would use to reach `peer`.
///
/// A throw-away UDP socket is bound to the wildcard address of the peer's
/// family and connected to `peer`; its local address is then read back.
/// Returns `None` if any socket call fails or the local address is still
/// unspecified.
pub fn infer_udp_response_ip(peer: SocketAddr) -> Option<IpAddr> {
    let wildcard = match peer {
        SocketAddr::V4(_) => "0.0.0.0:0",
        SocketAddr::V6(_) => "[::]:0",
    };
    let probe = std::net::UdpSocket::bind(wildcard).ok()?;
    probe.connect(peer).ok()?;
    let local_ip = probe.local_addr().ok()?.ip();
    (!local_ip.is_unspecified()).then_some(local_ip)
}
265
/// Generates a 12-byte server GUID.
///
/// The GUID combines:
/// * 8 bytes from a randomly keyed hash (`RandomState`) over the current wall
///   clock and the process id, and
/// * 4 bytes holding the sub-second nanoseconds of the wall clock.
///
/// The wall clock alone is not enough: its upper bytes are always zero, and
/// two servers started on the same clock tick would share a GUID.
///
/// If the system clock is before the Unix epoch, the time contribution is
/// zero; the random hash key still makes the result unique.
pub fn rand_guid() -> [u8; 12] {
    let now = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default();

    // Each `RandomState` carries fresh random keys, so the hash differs
    // between processes and between calls even for identical inputs.
    let random_state = std::collections::hash_map::RandomState::new();
    let mut hasher = std::hash::BuildHasher::build_hasher(&random_state);
    std::hash::Hasher::write_u128(&mut hasher, now.as_nanos());
    std::hash::Hasher::write_u32(&mut hasher, std::process::id());
    let entropy = std::hash::Hasher::finish(&hasher);

    let mut guid = [0u8; 12];
    guid[..8].copy_from_slice(&entropy.to_le_bytes());
    guid[8..].copy_from_slice(&now.subsec_nanos().to_le_bytes());
    guid
}
275
276// ---------------------------------------------------------------------------
277// Debug utilities
278// ---------------------------------------------------------------------------
279
280pub fn validate_encoded_packet(conn_id: u64, label: &str, bytes: &[u8]) {
281    let mut pkt = PvaPacket::new(bytes);
282    let decoded = pkt.decode_payload();
283    match decoded {
284        Some(PvaPacketCommand::ConnectionValidation(payload)) => {
285            debug!(
286                "Conn {}: {} decoded as cmd=1 buffer_size={} qos={} authz={:?}",
287                conn_id, label, payload.buffer_size, payload.qos, payload.authz
288            );
289        }
290        Some(PvaPacketCommand::ConnectionValidated(_)) => {
291            debug!("Conn {}: {} decoded as cmd=9", conn_id, label);
292        }
293        Some(other) => {
294            debug!("Conn {}: {} decoded as {:?}", conn_id, label, other);
295        }
296        None => {
297            debug!("Conn {}: {} failed to decode", conn_id, label);
298        }
299    }
300}
301
302pub fn dump_hex_packet(
303    conn_id: u64,
304    dir: &str,
305    label: &str,
306    version: u8,
307    is_be: bool,
308    bytes: &[u8],
309) {
310    debug!(
311        "Conn {}: {} {} ver={} be={} len={}",
312        conn_id,
313        dir,
314        label,
315        version,
316        is_be,
317        bytes.len()
318    );
319    let mut offset = 0usize;
320    while offset < bytes.len() {
321        let end = usize::min(offset + 16, bytes.len());
322        let chunk = &bytes[offset..end];
323        let mut line = String::new();
324        for (i, b) in chunk.iter().enumerate() {
325            if i > 0 {
326                line.push(' ');
327            }
328            line.push_str(&format!("{:02x}", b));
329        }
330        debug!("Conn {}: {:04x} {}", conn_id, offset, line);
331        offset += 16;
332    }
333}
334
335// ---------------------------------------------------------------------------
336// Store-based snapshot/writable helpers (delegate to PvStore + virtual PVs)
337// ---------------------------------------------------------------------------
338
339async fn get_nt_snapshot<S: PvStore>(state: &ServerState<S>, pv_name: &str) -> Option<NtPayload> {
340    if is_pvlist_virtual_pv(pv_name) {
341        if state.pvlist_mode != PvListMode::List {
342            return None;
343        }
344        let all_names = state.store.list_pvs().await;
345        let names = collect_visible_pv_names(
346            &all_names,
347            state.pvlist_mode,
348            state.pvlist_allow_pattern.as_ref(),
349            state.pvlist_max,
350        );
351        return Some(virtual_pvlist_nt(names));
352    }
353    if is_virtual_event_pv(pv_name) {
354        return Some(virtual_event_nt(pv_name));
355    }
356    state.store.get_snapshot(pv_name).await
357}
358
359async fn is_writable_pv<S: PvStore>(state: &ServerState<S>, pv_name: &str) -> bool {
360    if is_virtual_event_pv(pv_name) {
361        return true;
362    }
363    state.store.is_writable(pv_name).await
364}
365
366async fn has_pv<S: PvStore>(state: &ServerState<S>, pv_name: &str) -> bool {
367    state.store.has_pv(pv_name).await
368        || is_virtual_event_pv(pv_name)
369        || (is_pvlist_virtual_pv(pv_name) && state.pvlist_mode == PvListMode::List)
370        || (is_server_rpc_pv(pv_name) && state.pvlist_mode != PvListMode::Off)
371}
372
373// ---------------------------------------------------------------------------
374// Notify helpers
375// ---------------------------------------------------------------------------
376
377async fn notify_changed_records<S: PvStore>(
378    state: &ServerState<S>,
379    changed: Vec<(String, NtPayload)>,
380) {
381    for (name, payload) in changed {
382        state.beacon_change.fetch_add(1, Ordering::SeqCst);
383        state.registry.notify_monitors(&name, &payload).await;
384    }
385}
386
387// ---------------------------------------------------------------------------
388// GET_FIELD handler
389// ---------------------------------------------------------------------------
390
391async fn handle_get_field_request<S: PvStore>(
392    state: &ServerState<S>,
393    conn_state: &ConnState,
394    conn_id: u64,
395    payload: spvirit_codec::epics_decode::PvaGetFieldPayload,
396    version: u8,
397    is_be: bool,
398) {
399    if payload.is_server {
400        let resp = encode_get_field_error(
401            payload.cid,
402            "Unexpected server GET_FIELD payload",
403            version,
404            is_be,
405        );
406        state.registry.send_msg(conn_id, resp).await;
407        return;
408    }
409
410    let request_id = payload.ioid.unwrap_or(payload.cid);
411
412    let sid = payload
413        .sid
414        .or_else(|| conn_state.cid_to_sid.get(&payload.cid).copied())
415        .or_else(|| {
416            conn_state
417                .sid_to_pv
418                .contains_key(&payload.cid)
419                .then_some(payload.cid)
420        })
421        .or_else(|| {
422            (payload.cid == 0 && conn_state.sid_to_pv.len() == 1)
423                .then(|| conn_state.sid_to_pv.keys().copied().next())
424                .flatten()
425        });
426
427    if let Some(sid) = sid {
428        if let Some(pv_name) = conn_state.sid_to_pv.get(&sid) {
429            if let Some(nt) = get_nt_snapshot(state, pv_name).await {
430                let full_desc = nt_payload_desc(&nt);
431                let sub = payload.field_name.as_deref().filter(|s| !s.is_empty());
432                let desc = if let Some(field_path) = sub {
433                    match extract_subfield_desc(&full_desc, field_path) {
434                        Some(sub_desc) => sub_desc,
435                        None => {
436                            let resp = encode_get_field_error(
437                                request_id,
438                                &format!("sub-field '{}' not found", field_path),
439                                version,
440                                is_be,
441                            );
442                            state.registry.send_msg(conn_id, resp).await;
443                            return;
444                        }
445                    }
446                } else {
447                    full_desc
448                };
449                let resp = encode_get_field_response(request_id, &desc, version, is_be);
450                dump_hex_packet(conn_id, "tx", "cmd=17 get_field", version, is_be, &resp);
451                state.registry.send_msg(conn_id, resp).await;
452                debug!(
453                    "Conn {}: get_field cid={} sid={:?} ioid={:?} resolved_sid={} pv='{}' field={:?}",
454                    conn_id,
455                    payload.cid,
456                    payload.sid,
457                    payload.ioid,
458                    sid,
459                    pv_name,
460                    payload.field_name
461                );
462                return;
463            }
464            let resp = encode_get_field_error(request_id, "PV not found", version, is_be);
465            state.registry.send_msg(conn_id, resp).await;
466            return;
467        }
468    }
469
470    if state.pvlist_mode != PvListMode::List {
471        let resp = encode_get_field_error(
472            request_id,
473            "GET_FIELD listing is disabled (set --pvlist-mode=list)",
474            version,
475            is_be,
476        );
477        state.registry.send_msg(conn_id, resp).await;
478        return;
479    }
480
481    let Some(pattern) = requested_pvlist_pattern(payload.field_name.as_deref()) else {
482        let resp = encode_get_field_error(
483            request_id,
484            "GET_FIELD requires a valid list pattern",
485            version,
486            is_be,
487        );
488        state.registry.send_msg(conn_id, resp).await;
489        return;
490    };
491
492    let all_names = state.store.list_pvs().await;
493    let mut names = collect_visible_pv_names(
494        &all_names,
495        state.pvlist_mode,
496        state.pvlist_allow_pattern.as_ref(),
497        state.pvlist_max,
498    );
499    if pattern != "*" {
500        names.retain(|name| wildcard_match(pattern, name));
501    }
502    if names.is_empty() {
503        let resp =
504            encode_get_field_error(request_id, "No PVs matched list request", version, is_be);
505        state.registry.send_msg(conn_id, resp).await;
506        return;
507    }
508    let desc = build_pvlist_structure(&names);
509    let resp = encode_get_field_response(request_id, &desc, version, is_be);
510    dump_hex_packet(
511        conn_id,
512        "tx",
513        "cmd=17 get_field_list",
514        version,
515        is_be,
516        &resp,
517    );
518    state.registry.send_msg(conn_id, resp).await;
519    debug!(
520        "Conn {}: get_field list pattern='{}' returned {} entries",
521        conn_id,
522        pattern,
523        names.len()
524    );
525}
526
527// ---------------------------------------------------------------------------
528// Server RPC handler
529// ---------------------------------------------------------------------------
530
531async fn handle_server_rpc<S: PvStore>(
532    state: &ServerState<S>,
533    conn_id: u64,
534    ioid: u32,
535    subcmd: u8,
536    version: u8,
537    is_be: bool,
538) {
539    if state.pvlist_mode != PvListMode::List {
540        let resp = encode_op_status_error_response(
541            20,
542            ioid,
543            subcmd,
544            "RPC list endpoint disabled (set --pvlist-mode=list)",
545            version,
546            is_be,
547        );
548        state.registry.send_msg(conn_id, resp).await;
549        return;
550    }
551
552    let all_names = state.store.list_pvs().await;
553    let names = collect_visible_pv_names(
554        &all_names,
555        state.pvlist_mode,
556        state.pvlist_allow_pattern.as_ref(),
557        state.pvlist_max,
558    );
559    let payload = NtPayload::ScalarArray(NtScalarArray::from_value(ScalarArrayValue::Str(names)));
560
561    let is_init = (subcmd & 0x08) != 0;
562    if is_init {
563        let resp = encode_op_status_response(20, ioid, subcmd, version, is_be);
564        state.registry.send_msg(conn_id, resp).await;
565        return;
566    }
567
568    let resp = encode_op_rpc_data_response_payload(ioid, subcmd, &payload, version, is_be);
569    state.registry.send_msg(conn_id, resp).await;
570}
571
572// ---------------------------------------------------------------------------
573// Control message handler (inside segmented stream)
574// ---------------------------------------------------------------------------
575
576async fn handle_control_message<S: PvStore>(
577    state: &ServerState<S>,
578    conn_id: u64,
579    header: &PvaHeader,
580) {
581    debug!(
582        "Conn {}: control (segmented) cmd={} data={}",
583        conn_id, header.command, header.payload_length
584    );
585    if header.command == 3 {
586        let resp = encode_control_message(
587            true,
588            header.flags.is_msb,
589            header.version,
590            4,
591            header.payload_length,
592        );
593        state.registry.send_msg(conn_id, resp).await;
594    }
595}
596
597// ---------------------------------------------------------------------------
598// UDP search handler
599// ---------------------------------------------------------------------------
600
601/// Run the UDP search responder.
602pub async fn run_udp_search<S: PvStore>(
603    state: Arc<ServerState<S>>,
604    addr: SocketAddr,
605    tcp_port: u16,
606    guid: [u8; 12],
607    advertise_ip: Option<IpAddr>,
608) -> Result<(), Box<dyn std::error::Error>> {
609    let socket = UdpSocket::bind(addr).await?;
610    socket.set_broadcast(true)?;
611    let mut buf = vec![0u8; 4096];
612
613    loop {
614        let (len, peer) = socket.recv_from(&mut buf).await?;
615        let data = &buf[..len];
616        let header = PvaHeader::new(data);
617        if header.flags.is_control || header.command != 3 {
618            continue;
619        }
620        let mut pkt = PvaPacket::new(data);
621        let Some(cmd) = pkt.decode_payload() else {
622            continue;
623        };
624        let version = pkt.header.version;
625        let is_be = pkt.header.flags.is_msb;
626        match cmd {
627            PvaPacketCommand::Search(payload) => {
628                debug!(
629                    "UDP search from {}: pv_count={} mask=0x{:02x}",
630                    peer,
631                    payload.pv_requests.len(),
632                    payload.mask
633                );
634                let accepts_tcp = payload.protocols.is_empty()
635                    || payload
636                        .protocols
637                        .iter()
638                        .any(|p| p.eq_ignore_ascii_case("tcp"));
639                if !accepts_tcp {
640                    debug!("UDP search: no compatible protocol (tcp not accepted)");
641                    continue;
642                }
643                let all_names = state.store.list_pvs().await;
644                let visible_names = collect_visible_pv_names(
645                    &all_names,
646                    state.pvlist_mode,
647                    state.pvlist_allow_pattern.as_ref(),
648                    state.pvlist_max,
649                );
650                let mut cids = Vec::new();
651                for (cid, name) in &payload.pv_requests {
652                    if state.store.has_pv(name).await
653                        || is_virtual_event_pv(name)
654                        || (is_pvlist_virtual_pv(name) && state.pvlist_mode == PvListMode::List)
655                        || (is_server_rpc_pv(name) && state.pvlist_mode != PvListMode::Off)
656                    {
657                        cids.push(*cid);
658                        continue;
659                    }
660                    if state.pvlist_mode != PvListMode::Off
661                        && is_pattern_query(name)
662                        && visible_names.iter().any(|pv| wildcard_match(name, pv))
663                    {
664                        cids.push(*cid);
665                    }
666                }
667                let response_required = (payload.mask & 0x01) != 0;
668                let server_discovery_ping = payload.pv_requests.is_empty();
669                let found = server_discovery_ping || !cids.is_empty();
670                if !found && !response_required {
671                    debug!("UDP search: no matches and response not required");
672                    continue;
673                }
674                let resp_ip = if let Some(ip) = advertise_ip {
675                    ip
676                } else if !addr.ip().is_unspecified() {
677                    addr.ip()
678                } else if let Some(ip) = infer_udp_response_ip(peer) {
679                    debug!("UDP search: inferred response address {}", ip);
680                    ip
681                } else {
682                    IpAddr::V4(Ipv4Addr::UNSPECIFIED)
683                };
684                let addr_bytes = if resp_ip.is_unspecified() {
685                    debug!("UDP search: responding with zero address (unspecified listen)");
686                    [0u8; 16]
687                } else {
688                    ip_to_bytes(resp_ip)
689                };
690                let response = encode_search_response(
691                    guid,
692                    payload.seq,
693                    addr_bytes,
694                    tcp_port,
695                    "tcp",
696                    found,
697                    &cids,
698                    version,
699                    is_be,
700                );
701                let reply_target = search_reply_target(&payload.addr, payload.port, peer);
702                if let Err(e) = socket.send_to(&response, reply_target).await {
703                    debug!(
704                        "UDP search: failed sending {} matches to {}: {}",
705                        cids.len(),
706                        reply_target,
707                        e
708                    );
709                    continue;
710                }
711                debug!(
712                    "UDP search: responded found={} with {} matches to {}",
713                    found,
714                    cids.len(),
715                    reply_target
716                );
717            }
718            _ => {}
719        }
720    }
721}
722
723// ---------------------------------------------------------------------------
724// TCP server
725// ---------------------------------------------------------------------------
726
727/// Accept TCP connections and spawn a handler for each.
728pub async fn run_tcp_server<S: PvStore>(
729    state: Arc<ServerState<S>>,
730    addr: SocketAddr,
731    conn_timeout: Duration,
732) -> Result<(), Box<dyn std::error::Error>> {
733    let listener = TcpListener::bind(addr).await?;
734    let conn_id = Arc::new(std::sync::atomic::AtomicU64::new(1));
735
736    loop {
737        let (stream, peer) = listener.accept().await?;
738        let id = conn_id.fetch_add(1, Ordering::SeqCst);
739        info!("TCP connection {} from {}", id, peer);
740        let state_clone = state.clone();
741        tokio::spawn(async move {
742            if let Err(e) = handle_connection(state_clone, stream, id, conn_timeout).await {
743                error!("Connection {} error: {}", id, e);
744            }
745        });
746    }
747}
748
749// ---------------------------------------------------------------------------
750// Core TCP connection handler
751// ---------------------------------------------------------------------------
752
753/// Handle a single PVA TCP connection.
754///
755/// This is the main protocol loop: handshake, then dispatch each command
756/// (CreateChannel, GET, PUT, PUT_GET, MONITOR, RPC, etc.) using the
757/// [`PvStore`] abstraction.
758pub async fn handle_connection<S: PvStore>(
759    state: Arc<ServerState<S>>,
760    stream: TcpStream,
761    conn_id: u64,
762    conn_timeout: Duration,
763) -> Result<(), Box<dyn std::error::Error>> {
764    let (mut reader, mut writer) = stream.into_split();
765    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(128);
766
767    {
768        let mut conns = state.registry.conns.lock().await;
769        conns.insert(conn_id, tx);
770    }
771
772    let writer_task = tokio::spawn(async move {
773        while let Some(msg) = rx.recv().await {
774            if writer.write_all(&msg).await.is_err() {
775                break;
776            }
777        }
778    });
779
780    let mut conn_state = ConnState::default();
781
782    // Per EPICS PVA protocol: send SET_BYTE_ORDER control message before validation.
783    let set_byte_order = encode_control_message(true, false, 2, 2, 0);
784    validate_encoded_packet(conn_id, "set_byte_order", &set_byte_order);
785    dump_hex_packet(
786        conn_id,
787        "tx",
788        "ctrl=2 set_byte_order",
789        2,
790        false,
791        &set_byte_order,
792    );
793    state.registry.send_msg(conn_id, set_byte_order).await;
794
795    // Server sends Connection Validation (cmd=1) next.
796    let server_validation = encode_connection_validation(16_384, 512, &["anonymous", "ca"], 2, false);
797    validate_encoded_packet(conn_id, "server_validation_init", &server_validation);
798    dump_hex_packet(
799        conn_id,
800        "tx",
801        "cmd=1 server_validation_init",
802        2,
803        false,
804        &server_validation,
805    );
806    state.registry.send_msg(conn_id, server_validation).await;
807
808    let mut last_activity = Instant::now();
809
810    loop {
811        let mut header = [0u8; 8];
812        let elapsed = last_activity.elapsed();
813        if elapsed >= conn_timeout {
814            info!("Conn {} idle timeout", conn_id);
815            break;
816        }
817        let remaining = conn_timeout - elapsed;
818        let read_header = tokio::time::timeout(remaining, reader.read_exact(&mut header)).await;
819        match read_header {
820            Ok(Ok(_)) => {}
821            Ok(Err(_)) => break,
822            Err(_) => {
823                info!("Conn {} idle timeout", conn_id);
824                break;
825            }
826        }
827        let header_pkt = PvaPacket::new(&header);
828        let payload_len = if header_pkt.header.flags.is_control {
829            0usize
830        } else {
831            header_pkt.header.payload_length as usize
832        };
833        let mut payload = vec![0u8; payload_len];
834        if payload_len > 0 {
835            let elapsed = last_activity.elapsed();
836            if elapsed >= conn_timeout {
837                info!("Conn {} idle timeout", conn_id);
838                break;
839            }
840            let remaining = conn_timeout - elapsed;
841            let read_payload =
842                tokio::time::timeout(remaining, reader.read_exact(&mut payload)).await;
843            match read_payload {
844                Ok(Ok(_)) => {}
845                Ok(Err(_)) => break,
846                Err(_) => {
847                    info!("Conn {} idle timeout", conn_id);
848                    break;
849                }
850            }
851        }
852        last_activity = Instant::now();
853        let mut full = header.to_vec();
854        full.extend_from_slice(&payload);
855
856        // Segmented message reassembly
857        if header_pkt.header.flags.is_segmented != 0 && !header_pkt.header.flags.is_control {
858            debug!(
859                "Conn {}: segmented message cmd={} seg=0x{:02x}",
860                conn_id, header_pkt.header.command, header_pkt.header.flags.is_segmented
861            );
862            let mut payloads = vec![payload];
863            let mut seg_flags = header_pkt.header.flags;
864            while !seg_flags.is_last_segment {
865                let mut seg_header = [0u8; 8];
866                let elapsed = last_activity.elapsed();
867                if elapsed >= conn_timeout {
868                    info!("Conn {} idle timeout", conn_id);
869                    break;
870                }
871                let remaining = conn_timeout - elapsed;
872                let read_header =
873                    tokio::time::timeout(remaining, reader.read_exact(&mut seg_header)).await;
874                match read_header {
875                    Ok(Ok(_)) => {}
876                    Ok(Err(_)) => break,
877                    Err(_) => {
878                        info!("Conn {} idle timeout", conn_id);
879                        break;
880                    }
881                }
882
883                let seg_header_pkt = PvaPacket::new(&seg_header);
884                let seg_payload_len = if seg_header_pkt.header.flags.is_control {
885                    0usize
886                } else {
887                    seg_header_pkt.header.payload_length as usize
888                };
889                let mut seg_payload = vec![0u8; seg_payload_len];
890                if seg_payload_len > 0 {
891                    let elapsed = last_activity.elapsed();
892                    if elapsed >= conn_timeout {
893                        info!("Conn {} idle timeout", conn_id);
894                        break;
895                    }
896                    let remaining = conn_timeout - elapsed;
897                    let read_payload =
898                        tokio::time::timeout(remaining, reader.read_exact(&mut seg_payload)).await;
899                    match read_payload {
900                        Ok(Ok(_)) => {}
901                        Ok(Err(_)) => break,
902                        Err(_) => {
903                            info!("Conn {} idle timeout", conn_id);
904                            break;
905                        }
906                    }
907                }
908                last_activity = Instant::now();
909
910                if seg_header_pkt.header.flags.is_control {
911                    handle_control_message(&state, conn_id, &seg_header_pkt.header).await;
912                    continue;
913                }
914                if seg_header_pkt.header.flags.is_segmented == 0 {
915                    debug!(
916                        "Conn {}: segmented message interrupted by non-segmented cmd={}",
917                        conn_id, seg_header_pkt.header.command
918                    );
919                    break;
920                }
921                payloads.push(seg_payload);
922                seg_flags = seg_header_pkt.header.flags;
923            }
924            full = assemble_segmented_message(header, payloads);
925        }
926
927        let mut pkt = PvaPacket::new(&full);
928        let Some(cmd) = pkt.decode_payload() else {
929            continue;
930        };
931        let version = pkt.header.version;
932        let is_be = pkt.header.flags.is_msb;
933        let cmd_code = pkt.header.command;
934        let payload_slice = if full.len() >= 8 { &full[8..] } else { &[] };
935
936        // Connection Validation (cmd=1): respond with CONNECTION_VALIDATED (cmd=9).
937        if cmd_code == 1 {
938            dump_hex_packet(conn_id, "rx", "cmd=1 validation", version, is_be, &full);
939            let validation = spvirit_codec::epics_decode::PvaConnectionValidationPayload::new(
940                payload_slice,
941                is_be,
942                false,
943            );
944            if let Some(val) = validation {
945                debug!(
946                    "Conn {}: validation request (cmd=1) ver={} be={} buf={} qos={} authz={:?}",
947                    conn_id, version, is_be, val.buffer_size, val.qos, val.authz
948                );
949                let resp = spvirit_codec::spvirit_encode::encode_connection_validated(
950                    true, version, is_be,
951                );
952                validate_encoded_packet(conn_id, "conn_validated_resp", &resp);
953                dump_hex_packet(
954                    conn_id,
955                    "tx",
956                    "cmd=9 connection_validated",
957                    version,
958                    is_be,
959                    &resp,
960                );
961                state.registry.send_msg(conn_id, resp).await;
962                continue;
963            }
964        }
965        if cmd_code == 17 {
966            dump_hex_packet(conn_id, "rx", "cmd=17 get_field", version, is_be, &full);
967        }
968
969        match cmd {
970            PvaPacketCommand::Control(payload) => {
971                debug!("Conn {}: control {}", conn_id, payload);
972                if payload.command == 3 {
973                    let resp = encode_control_message(true, is_be, version, 4, payload.data);
974                    state.registry.send_msg(conn_id, resp).await;
975                }
976                continue;
977            }
978            PvaPacketCommand::ConnectionValidation(_) => {
979                debug!("Conn {}: validation request (decoded)", conn_id);
980            }
981            PvaPacketCommand::ConnectionValidated(_) => {
982                debug!("Conn {}: validation confirmed (decoded)", conn_id);
983            }
984            PvaPacketCommand::CreateChannel(payload) => {
985                debug!(
986                    "Conn {}: create_channel count={}",
987                    conn_id,
988                    payload.channels.len()
989                );
990                for (cid, pv_name) in payload.channels {
991                    if has_pv(&state, &pv_name).await {
992                        let sid = state.sid_counter.fetch_add(1, Ordering::SeqCst);
993                        conn_state.cid_to_sid.insert(cid, sid);
994                        conn_state.sid_to_pv.insert(sid, pv_name.clone());
995                        let resp = encode_create_channel_response(cid, sid, version, is_be);
996                        state.registry.send_msg(conn_id, resp).await;
997                        info!(
998                            "Conn {}: channel '{}' cid={} sid={}",
999                            conn_id, pv_name, cid, sid
1000                        );
1001                    } else {
1002                        let resp = encode_create_channel_error(cid, "PV not found", version, is_be);
1003                        state.registry.send_msg(conn_id, resp).await;
1004                        info!(
1005                            "Conn {}: channel '{}' not found (cid={})",
1006                            conn_id, pv_name, cid
1007                        );
1008                    }
1009                }
1010            }
1011            PvaPacketCommand::Op(payload) => {
1012                if payload.is_server {
1013                    continue;
1014                }
1015                let sid = payload.sid_or_cid;
1016                let ioid = payload.ioid;
1017                debug!(
1018                    "Conn {}: op cmd={} ioid={} sid={} sub=0x{:02x} body_len={}",
1019                    conn_id,
1020                    payload.command,
1021                    ioid,
1022                    sid,
1023                    payload.subcmd,
1024                    payload.body.len()
1025                );
1026                let Some(pv_name) = conn_state.sid_to_pv.get(&sid).cloned() else {
1027                    state
1028                        .registry
1029                        .send_msg(
1030                            conn_id,
1031                            encode_op_error(
1032                                payload.command,
1033                                payload.subcmd,
1034                                ioid,
1035                                "Unknown SID",
1036                                version,
1037                                is_be,
1038                            ),
1039                        )
1040                        .await;
1041                    continue;
1042                };
1043
1044                let is_init = (payload.subcmd & 0x08) != 0;
1045
1046                match payload.command {
1047                    10 => {
1048                        // GET
1049                        if is_init {
1050                            // Init only needs the type descriptor, not the data.
1051                            // Use get_descriptor first; fall back to snapshot.
1052                            let full_desc = if let Some(desc) =
1053                                state.store.get_descriptor(&pv_name).await
1054                            {
1055                                desc
1056                            } else if let Some(nt) =
1057                                get_nt_snapshot(&state, &pv_name).await
1058                            {
1059                                nt_payload_desc(&nt)
1060                            } else {
1061                                state
1062                                    .registry
1063                                    .send_msg(
1064                                        conn_id,
1065                                        encode_op_error(
1066                                            payload.command,
1067                                            payload.subcmd,
1068                                            ioid,
1069                                            "PV not found",
1070                                            version,
1071                                            is_be,
1072                                        ),
1073                                    )
1074                                    .await;
1075                                continue;
1076                            };
1077                            let pv_req_fields = decode_pv_request_fields(&payload.body, is_be);
1078                            let desc = match &pv_req_fields {
1079                                Some(fields) => filter_structure_desc(&full_desc, fields),
1080                                None => full_desc,
1081                            };
1082                            conn_state.ioid_to_desc.insert(ioid, desc.clone());
1083                            conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1084                            let resp = encode_op_init_response_desc(
1085                                payload.command,
1086                                ioid,
1087                                0x08,
1088                                &desc,
1089                                version,
1090                                is_be,
1091                            );
1092                            state.registry.send_msg(conn_id, resp).await;
1093                            info!("Conn {}: get init pv='{}' ioid={}", conn_id, pv_name, ioid);
1094                        } else {
1095                            let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1096                                state
1097                                    .registry
1098                                    .send_msg(
1099                                        conn_id,
1100                                        encode_op_error(
1101                                            payload.command,
1102                                            payload.subcmd,
1103                                            ioid,
1104                                            "PV has no data yet",
1105                                            version,
1106                                            is_be,
1107                                        ),
1108                                    )
1109                                    .await;
1110                                continue;
1111                            };
1112                            let resp = if let Some(desc) = conn_state.ioid_to_desc.get(&ioid) {
1113                                encode_op_data_response_filtered(
1114                                    10, ioid, &nt, desc, version, is_be,
1115                                )
1116                            } else {
1117                                encode_op_get_data_response_payload(ioid, &nt, version, is_be)
1118                            };
1119                            state.registry.send_msg(conn_id, resp).await;
1120                            debug!("Conn {}: get data pv='{}' ioid={}", conn_id, pv_name, ioid);
1121                        }
1122                    }
1123                    11 => {
1124                        // PUT
1125                        if is_init {
1126                            let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1127                                state
1128                                    .registry
1129                                    .send_msg(
1130                                        conn_id,
1131                                        encode_op_error(
1132                                            payload.command,
1133                                            payload.subcmd,
1134                                            ioid,
1135                                            "PV not found",
1136                                            version,
1137                                            is_be,
1138                                        ),
1139                                    )
1140                                    .await;
1141                                continue;
1142                            };
1143                            if !is_virtual_event_pv(&pv_name)
1144                                && !is_writable_pv(&state, &pv_name).await
1145                            {
1146                                let resp = encode_op_put_status_response(
1147                                    ioid,
1148                                    0x08,
1149                                    "Write access denied",
1150                                    version,
1151                                    is_be,
1152                                );
1153                                state.registry.send_msg(conn_id, resp).await;
1154                                continue;
1155                            }
1156                            let desc = nt_payload_desc(&nt);
1157                            conn_state.ioid_to_desc.insert(ioid, desc.clone());
1158                            conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1159                            let resp = encode_op_init_response_desc(
1160                                payload.command,
1161                                ioid,
1162                                0x08,
1163                                &desc,
1164                                version,
1165                                is_be,
1166                            );
1167                            state.registry.send_msg(conn_id, resp).await;
1168                            info!("Conn {}: put init pv='{}' ioid={}", conn_id, pv_name, ioid);
1169                        } else {
1170                            if (payload.subcmd & 0x40) != 0 {
1171                                if !is_virtual_event_pv(&pv_name)
1172                                    && !is_writable_pv(&state, &pv_name).await
1173                                {
1174                                    let resp = encode_op_put_status_response(
1175                                        ioid,
1176                                        0x40,
1177                                        "Write access denied",
1178                                        version,
1179                                        is_be,
1180                                    );
1181                                    state.registry.send_msg(conn_id, resp).await;
1182                                    continue;
1183                                }
1184                                if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1185                                    let resp = encode_op_put_getput_response_payload(
1186                                        ioid, &nt, version, is_be,
1187                                    );
1188                                    state.registry.send_msg(conn_id, resp).await;
1189                                    debug!(
1190                                        "Conn {}: put get-put pv='{}' ioid={}",
1191                                        conn_id, pv_name, ioid
1192                                    );
1193                                } else {
1194                                    state
1195                                        .registry
1196                                        .send_msg(
1197                                            conn_id,
1198                                            encode_op_error(
1199                                                payload.command,
1200                                                payload.subcmd,
1201                                                ioid,
1202                                                "PV not found",
1203                                                version,
1204                                                is_be,
1205                                            ),
1206                                        )
1207                                        .await;
1208                                }
1209                                continue;
1210                            }
1211                            let desc = match conn_state.ioid_to_desc.get(&ioid) {
1212                                Some(d) => d.clone(),
1213                                None => {
1214                                    state
1215                                        .registry
1216                                        .send_msg(
1217                                            conn_id,
1218                                            encode_op_error(
1219                                                payload.command,
1220                                                payload.subcmd,
1221                                                ioid,
1222                                                "PUT without init",
1223                                                version,
1224                                                is_be,
1225                                            ),
1226                                        )
1227                                        .await;
1228                                    continue;
1229                                }
1230                            };
1231                            let decoded = decode_put_body(&payload.body, &desc, is_be);
1232                            if let Some(value) = decoded.as_ref() {
1233                                match state.store.put_value(&pv_name, value).await {
1234                                    Ok(changed) => {
1235                                        notify_changed_records(&state, changed).await;
1236                                    }
1237                                    Err(msg) => {
1238                                        let resp = encode_op_put_status_response(
1239                                            ioid,
1240                                            payload.subcmd,
1241                                            &msg,
1242                                            version,
1243                                            is_be,
1244                                        );
1245                                        state.registry.send_msg(conn_id, resp).await;
1246                                        continue;
1247                                    }
1248                                }
1249                            } else {
1250                                debug!(
1251                                    "Conn {}: put decode failed ioid={} body_len={}",
1252                                    conn_id,
1253                                    ioid,
1254                                    payload.body.len()
1255                                );
1256                                let resp = encode_op_put_status_response(
1257                                    ioid,
1258                                    payload.subcmd,
1259                                    "cannot decode PUT body",
1260                                    version,
1261                                    is_be,
1262                                );
1263                                state.registry.send_msg(conn_id, resp).await;
1264                                continue;
1265                            }
1266                            let resp = encode_op_put_response(ioid, payload.subcmd, version, is_be);
1267                            state.registry.send_msg(conn_id, resp).await;
1268                            debug!("Conn {}: put data pv='{}' ioid={}", conn_id, pv_name, ioid);
1269                        }
1270                    }
1271                    12 => {
1272                        // PUT_GET
1273                        if is_init {
1274                            let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1275                                state
1276                                    .registry
1277                                    .send_msg(
1278                                        conn_id,
1279                                        encode_op_error(
1280                                            payload.command,
1281                                            payload.subcmd,
1282                                            ioid,
1283                                            "PV not found",
1284                                            version,
1285                                            is_be,
1286                                        ),
1287                                    )
1288                                    .await;
1289                                continue;
1290                            };
1291                            if !is_virtual_event_pv(&pv_name)
1292                                && !is_writable_pv(&state, &pv_name).await
1293                            {
1294                                let resp = encode_op_put_get_init_error_response(
1295                                    ioid,
1296                                    "Write access denied",
1297                                    version,
1298                                    is_be,
1299                                );
1300                                state.registry.send_msg(conn_id, resp).await;
1301                                continue;
1302                            }
1303                            let desc = nt_payload_desc(&nt);
1304                            conn_state.ioid_to_desc.insert(ioid, desc.clone());
1305                            conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1306                            let resp =
1307                                encode_op_put_get_init_response(ioid, &desc, &desc, version, is_be);
1308                            state.registry.send_msg(conn_id, resp).await;
1309                            info!(
1310                                "Conn {}: put_get init pv='{}' ioid={}",
1311                                conn_id, pv_name, ioid
1312                            );
1313                        } else {
1314                            let desc = match conn_state.ioid_to_desc.get(&ioid) {
1315                                Some(d) => d.clone(),
1316                                None => {
1317                                    state
1318                                        .registry
1319                                        .send_msg(
1320                                            conn_id,
1321                                            encode_op_error(
1322                                                payload.command,
1323                                                payload.subcmd,
1324                                                ioid,
1325                                                "PUT_GET without init",
1326                                                version,
1327                                                is_be,
1328                                            ),
1329                                        )
1330                                        .await;
1331                                    continue;
1332                                }
1333                            };
1334                            let decoded = decode_put_body(&payload.body, &desc, is_be);
1335                            if let Some(value) = decoded.as_ref() {
1336                                match state.store.put_value(&pv_name, value).await {
1337                                    Ok(changed) => {
1338                                        notify_changed_records(&state, changed).await;
1339                                    }
1340                                    Err(msg) => {
1341                                        let resp = encode_op_put_get_data_error_response(
1342                                            ioid, &msg, version, is_be,
1343                                        );
1344                                        state.registry.send_msg(conn_id, resp).await;
1345                                        continue;
1346                                    }
1347                                }
1348                            } else {
1349                                debug!(
1350                                    "Conn {}: put_get decode failed ioid={} body_len={}",
1351                                    conn_id,
1352                                    ioid,
1353                                    payload.body.len()
1354                                );
1355                                let resp = encode_op_put_get_data_error_response(
1356                                    ioid,
1357                                    "cannot decode PUT body",
1358                                    version,
1359                                    is_be,
1360                                );
1361                                state.registry.send_msg(conn_id, resp).await;
1362                                continue;
1363                            }
1364                            if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1365                                let resp = encode_op_put_get_data_response_payload(
1366                                    ioid, &nt, version, is_be,
1367                                );
1368                                state.registry.send_msg(conn_id, resp).await;
1369                            } else {
1370                                state
1371                                    .registry
1372                                    .send_msg(
1373                                        conn_id,
1374                                        encode_op_error(
1375                                            payload.command,
1376                                            payload.subcmd,
1377                                            ioid,
1378                                            "PV not found",
1379                                            version,
1380                                            is_be,
1381                                        ),
1382                                    )
1383                                    .await;
1384                            }
1385                            debug!(
1386                                "Conn {}: put_get data pv='{}' ioid={}",
1387                                conn_id, pv_name, ioid
1388                            );
1389                        }
1390                    }
1391                    13 => {
1392                        // MONITOR
1393                        if is_init {
1394                            let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1395                                state
1396                                    .registry
1397                                    .send_msg(
1398                                        conn_id,
1399                                        encode_op_error(
1400                                            payload.command,
1401                                            payload.subcmd,
1402                                            ioid,
1403                                            "PV not found",
1404                                            version,
1405                                            is_be,
1406                                        ),
1407                                    )
1408                                    .await;
1409                                continue;
1410                            };
1411                            let full_desc = nt_payload_desc(&nt);
1412                            let pv_req_fields = decode_pv_request_fields(&payload.body, is_be);
1413                            let desc = match &pv_req_fields {
1414                                Some(fields) => filter_structure_desc(&full_desc, fields),
1415                                None => full_desc,
1416                            };
1417                            conn_state.ioid_to_desc.insert(ioid, desc.clone());
1418                            conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1419                            let pipeline_enabled = (payload.subcmd & 0x80) != 0;
1420                            let mut nfree = 0u32;
1421                            if pipeline_enabled && payload.body.len() >= 4 {
1422                                let start = payload.body.len() - 4;
1423                                nfree = if is_be {
1424                                    u32::from_be_bytes([
1425                                        payload.body[start],
1426                                        payload.body[start + 1],
1427                                        payload.body[start + 2],
1428                                        payload.body[start + 3],
1429                                    ])
1430                                } else {
1431                                    u32::from_le_bytes([
1432                                        payload.body[start],
1433                                        payload.body[start + 1],
1434                                        payload.body[start + 2],
1435                                        payload.body[start + 3],
1436                                    ])
1437                                };
1438                            }
1439                            let resp = encode_op_init_response_desc(
1440                                payload.command,
1441                                ioid,
1442                                0x08,
1443                                &desc,
1444                                version,
1445                                is_be,
1446                            );
1447                            state.registry.send_msg(conn_id, resp).await;
1448                            conn_state.ioid_to_monitor.insert(
1449                                ioid,
1450                                MonitorState {
1451                                    running: false,
1452                                    pipeline_enabled,
1453                                    nfree,
1454                                },
1455                            );
1456                            {
1457                                let mut monitors = state.registry.monitors.lock().await;
1458                                monitors
1459                                    .entry(pv_name.clone())
1460                                    .or_default()
1461                                    .push(MonitorSub {
1462                                        conn_id,
1463                                        ioid,
1464                                        version,
1465                                        is_be,
1466                                        running: false,
1467                                        pipeline_enabled,
1468                                        nfree,
1469                                        filtered_desc: conn_state.ioid_to_desc.get(&ioid).cloned(),
1470                                    });
1471                            }
1472                            info!(
1473                                "Conn {}: monitor init pv='{}' ioid={}",
1474                                conn_id, pv_name, ioid
1475                            );
1476                        } else if (payload.subcmd & 0x10) != 0 {
1477                            // Monitor destroy
1478                            if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1479                                let resp = encode_monitor_data_response_payload(
1480                                    ioid, 0x10, &nt, version, is_be,
1481                                );
1482                                state.registry.send_msg(conn_id, resp).await;
1483                            }
1484                            state
1485                                .registry
1486                                .remove_monitor_subscription(conn_id, ioid, &pv_name)
1487                                .await;
1488                            conn_state.ioid_to_monitor.remove(&ioid);
1489                            conn_state.ioid_to_pv.remove(&ioid);
1490                            conn_state.ioid_to_desc.remove(&ioid);
1491                            info!("Conn {}: monitor end ioid={}", conn_id, ioid);
1492                        } else if (payload.subcmd & 0x04) != 0 || (payload.subcmd & 0x80) != 0 {
1493                            // Monitor start/stop/pipeline-ack
1494                            let start = (payload.subcmd & 0x44) == 0x44;
1495                            let stop = (payload.subcmd & 0x44) == 0x04;
1496                            let pipeline_ack = (payload.subcmd & 0x80) != 0;
1497                            let mut nfree = None;
1498                            if pipeline_ack && payload.body.len() >= 4 {
1499                                let v = if is_be {
1500                                    u32::from_be_bytes([
1501                                        payload.body[0],
1502                                        payload.body[1],
1503                                        payload.body[2],
1504                                        payload.body[3],
1505                                    ])
1506                                } else {
1507                                    u32::from_le_bytes([
1508                                        payload.body[0],
1509                                        payload.body[1],
1510                                        payload.body[2],
1511                                        payload.body[3],
1512                                    ])
1513                                };
1514                                nfree = Some(v);
1515                            }
1516                            let running = if start {
1517                                true
1518                            } else if stop {
1519                                false
1520                            } else {
1521                                conn_state
1522                                    .ioid_to_monitor
1523                                    .get(&ioid)
1524                                    .map(|m| m.running)
1525                                    .unwrap_or(true)
1526                            };
1527                            state
1528                                .registry
1529                                .update_monitor_subscription(
1530                                    conn_id,
1531                                    ioid,
1532                                    &pv_name,
1533                                    running,
1534                                    nfree,
1535                                    Some(pipeline_ack),
1536                                )
1537                                .await;
1538                            if let Some(mon) = conn_state.ioid_to_monitor.get_mut(&ioid) {
1539                                mon.running = running;
1540                                if pipeline_ack {
1541                                    mon.pipeline_enabled = true;
1542                                }
1543                                if let Some(v) = nfree {
1544                                    if pipeline_ack {
1545                                        mon.nfree = mon.nfree.saturating_add(v);
1546                                    } else {
1547                                        mon.nfree = v;
1548                                    }
1549                                }
1550                            }
1551                            info!(
1552                                "Conn {}: monitor {} ioid={} ack={} nfree={:?}",
1553                                conn_id,
1554                                if start {
1555                                    "start"
1556                                } else if stop {
1557                                    "stop"
1558                                } else {
1559                                    "ack"
1560                                },
1561                                ioid,
1562                                pipeline_ack,
1563                                nfree
1564                            );
1565                            if start {
1566                                if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1567                                    state
1568                                        .registry
1569                                        .send_monitor_update_for(&pv_name, conn_id, ioid, &nt)
1570                                        .await;
1571                                }
1572                            }
1573                        }
1574                    }
1575                    20 => {
1576                        // RPC
1577                        if is_server_rpc_pv(&pv_name) {
1578                            handle_server_rpc(
1579                                &state,
1580                                conn_id,
1581                                ioid,
1582                                payload.subcmd,
1583                                version,
1584                                is_be,
1585                            )
1586                            .await;
1587                        } else {
1588                            state
1589                                .registry
1590                                .send_msg(
1591                                    conn_id,
1592                                    encode_op_error(
1593                                        payload.command,
1594                                        payload.subcmd,
1595                                        ioid,
1596                                        "Operation not supported",
1597                                        version,
1598                                        is_be,
1599                                    ),
1600                                )
1601                                .await;
1602                        }
1603                    }
1604                    14 | 16 => {
1605                        state
1606                            .registry
1607                            .send_msg(
1608                                conn_id,
1609                                encode_op_error(
1610                                    payload.command,
1611                                    payload.subcmd,
1612                                    ioid,
1613                                    "Operation not supported",
1614                                    version,
1615                                    is_be,
1616                                ),
1617                            )
1618                            .await;
1619                    }
1620                    _ => {
1621                        state
1622                            .registry
1623                            .send_msg(
1624                                conn_id,
1625                                encode_op_error(
1626                                    payload.command,
1627                                    payload.subcmd,
1628                                    ioid,
1629                                    "Operation not supported",
1630                                    version,
1631                                    is_be,
1632                                ),
1633                            )
1634                            .await;
1635                    }
1636                }
1637            }
1638            PvaPacketCommand::DestroyChannel(payload) => {
1639                let sid = payload.sid;
1640                let cid = payload.cid;
1641                conn_state.cid_to_sid.remove(&cid);
1642                conn_state.sid_to_pv.remove(&sid);
1643                info!(
1644                    "Conn {}: channel destroyed sid={} cid={}",
1645                    conn_id, sid, cid
1646                );
1647            }
1648            PvaPacketCommand::DestroyRequest(payload) => {
1649                let ioid = payload.request_id;
1650                if let Some(pv_name) = conn_state.ioid_to_pv.remove(&ioid) {
1651                    state
1652                        .registry
1653                        .remove_monitor_subscription(conn_id, ioid, &pv_name)
1654                        .await;
1655                    conn_state.ioid_to_desc.remove(&ioid);
1656                    conn_state.ioid_to_monitor.remove(&ioid);
1657                    info!("Conn {}: monitor unsubscribed ioid={}", conn_id, ioid);
1658                }
1659            }
1660            PvaPacketCommand::AuthNZ(_) => {
1661                // Silently accept — pvxs and pvAccessCPP ignore AUTHNZ.
1662                debug!("Conn {}: ignoring AUTHNZ", conn_id);
1663            }
1664            PvaPacketCommand::AclChange(_) => {
1665                let resp =
1666                    encode_message_error("ACL_CHANGE command is not supported", version, is_be);
1667                state.registry.send_msg(conn_id, resp).await;
1668            }
1669            PvaPacketCommand::GetField(payload) => {
1670                handle_get_field_request(&state, &conn_state, conn_id, payload, version, is_be)
1671                    .await;
1672            }
1673            PvaPacketCommand::Echo(payload_bytes) => {
1674                let mut resp =
1675                    encode_header(true, is_be, false, version, 2, payload_bytes.len() as u32);
1676                resp.extend_from_slice(&payload_bytes);
1677                state.registry.send_msg(conn_id, resp).await;
1678            }
1679            PvaPacketCommand::Message(_) => {
1680                let resp = encode_message_error("MESSAGE command is not supported", version, is_be);
1681                state.registry.send_msg(conn_id, resp).await;
1682            }
1683            PvaPacketCommand::MultipleData(_) => {
1684                let resp =
1685                    encode_message_error("MULTIPLE_DATA command is not supported", version, is_be);
1686                state.registry.send_msg(conn_id, resp).await;
1687            }
1688            PvaPacketCommand::CancelRequest(_) => {
1689                let resp =
1690                    encode_message_error("CANCEL_REQUEST command is not supported", version, is_be);
1691                state.registry.send_msg(conn_id, resp).await;
1692            }
1693            PvaPacketCommand::OriginTag(_) => {
1694                let resp =
1695                    encode_message_error("ORIGIN_TAG command is not supported", version, is_be);
1696                state.registry.send_msg(conn_id, resp).await;
1697            }
1698            PvaPacketCommand::Search(payload) => {
1699                debug!(
1700                    "Conn {}: TCP search: pv_count={} mask=0x{:02x}",
1701                    conn_id,
1702                    payload.pv_requests.len(),
1703                    payload.mask
1704                );
1705                let accepts_tcp = payload.protocols.is_empty()
1706                    || payload
1707                        .protocols
1708                        .iter()
1709                        .any(|p| p.eq_ignore_ascii_case("tcp"));
1710                if accepts_tcp {
1711                    let all_names = state.store.list_pvs().await;
1712                    let visible_names = collect_visible_pv_names(
1713                        &all_names,
1714                        state.pvlist_mode,
1715                        state.pvlist_allow_pattern.as_ref(),
1716                        state.pvlist_max,
1717                    );
1718                    let mut cids = Vec::new();
1719                    for (cid, name) in &payload.pv_requests {
1720                        if state.store.has_pv(name).await
1721                            || is_virtual_event_pv(name)
1722                            || (is_pvlist_virtual_pv(name)
1723                                && state.pvlist_mode == PvListMode::List)
1724                            || (is_server_rpc_pv(name) && state.pvlist_mode != PvListMode::Off)
1725                        {
1726                            cids.push(*cid);
1727                            continue;
1728                        }
1729                        if state.pvlist_mode != PvListMode::Off
1730                            && is_pattern_query(name)
1731                            && visible_names.iter().any(|pv| wildcard_match(name, pv))
1732                        {
1733                            cids.push(*cid);
1734                        }
1735                    }
1736                    let server_discovery_ping = payload.pv_requests.is_empty();
1737                    let found = server_discovery_ping || !cids.is_empty();
1738                    let resp_ip = state.advertise_ip.unwrap_or(state.listen_ip);
1739                    let addr_bytes = if resp_ip.is_unspecified() {
1740                        [0u8; 16]
1741                    } else {
1742                        ip_to_bytes(resp_ip)
1743                    };
1744                    let response = encode_search_response(
1745                        state.guid,
1746                        payload.seq,
1747                        addr_bytes,
1748                        state.tcp_port,
1749                        "tcp",
1750                        found,
1751                        &cids,
1752                        version,
1753                        is_be,
1754                    );
1755                    state.registry.send_msg(conn_id, response).await;
1756                    debug!(
1757                        "Conn {}: TCP search responded found={} matches={}",
1758                        conn_id,
1759                        found,
1760                        cids.len()
1761                    );
1762                } else {
1763                    debug!("Conn {}: TCP search: no compatible protocol", conn_id);
1764                }
1765            }
1766            PvaPacketCommand::SearchResponse(_)
1767            | PvaPacketCommand::Beacon(_) => {
1768                let resp =
1769                    encode_message_error("Unexpected command for server endpoint", version, is_be);
1770                state.registry.send_msg(conn_id, resp).await;
1771            }
1772            PvaPacketCommand::Unknown(payload) => {
1773                let resp = encode_message_error(
1774                    &format!("Unknown command {}", payload.command),
1775                    version,
1776                    is_be,
1777                );
1778                state.registry.send_msg(conn_id, resp).await;
1779            }
1780        }
1781    }
1782
1783    state.registry.cleanup_connection(conn_id).await;
1784    let _ = writer_task.await;
1785    Ok(())
1786}