Skip to main content

spvirit_server/
handler.rs

1//! PVA protocol handler — the core TCP connection processor.
2//!
3//! [`handle_connection`] is generic over [`PvStore`], allowing any backend to
4//! serve PVs over the EPICS PVAccess protocol.
5
6use std::net::{IpAddr, Ipv4Addr, SocketAddr};
7use std::sync::atomic::{AtomicU16, AtomicU32, Ordering};
8use std::sync::Arc;
9use std::time::{Duration, Instant, SystemTime};
10
11use regex::Regex;
12use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::net::{TcpListener, TcpStream, UdpSocket};
14use tokio::sync::mpsc;
15use tracing::{debug, error, info};
16
17use spvirit_codec::epics_decode::{PvaHeader, PvaPacket, PvaPacketCommand};
18use spvirit_codec::spvirit_encode::{
19    encode_connection_validation, encode_control_message,
20    encode_create_channel_error, encode_create_channel_response, encode_get_field_error,
21    encode_get_field_response, encode_header, encode_message_error,
22    encode_monitor_data_response_payload, encode_op_error, encode_op_get_data_response_payload,
23    encode_op_init_response_desc, encode_op_put_get_data_error_response,
24    encode_op_put_get_data_response_payload, encode_op_put_get_init_error_response,
25    encode_op_put_get_init_response, encode_op_put_getput_response_payload, encode_op_put_response,
26    encode_op_put_status_response, encode_op_rpc_data_response_payload,
27    encode_op_status_error_response, encode_op_status_response, encode_search_response,
28    ip_from_bytes, ip_to_bytes,
29};
30use spvirit_codec::spvd_decode::{extract_subfield_desc, StructureDesc};
31use spvirit_codec::spvd_encode::nt_payload_desc;
32
33use spvirit_types::{NtPayload, NtScalar, NtScalarArray, ScalarArrayValue, ScalarValue};
34
35use crate::decode::{assemble_segmented_message, decode_put_body};
36use crate::monitor::MonitorRegistry;
37use crate::pvstore::PvStore;
38use crate::state::{ConnState, MonitorState, MonitorSub};
39
40// ---------------------------------------------------------------------------
41// PvListMode — controls virtual PV listing behaviour
42// ---------------------------------------------------------------------------
43
44/// Controls how the server exposes its PV directory.
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub enum PvListMode {
47    /// No PV listing at all.
48    Off,
49    /// Respond to UDP search for known PVs only; no GET_FIELD listing.
50    Discover,
51    /// Full pvlist & server-RPC listing support.
52    List,
53}
54
55impl PvListMode {
56    pub fn parse(raw: &str) -> Result<Self, String> {
57        match raw.trim().to_ascii_lowercase().as_str() {
58            "off" => Ok(Self::Off),
59            "discover" => Ok(Self::Discover),
60            "list" => Ok(Self::List),
61            other => Err(format!(
62                "Invalid pvlist-mode '{}'; expected off|discover|list",
63                other
64            )),
65        }
66    }
67}
68
69// ---------------------------------------------------------------------------
70// Server shared state — generic over PvStore
71// ---------------------------------------------------------------------------
72
73/// Shared server state that is passed to every connection handler.
74pub struct ServerState<S: PvStore> {
75    pub store: Arc<S>,
76    pub registry: Arc<MonitorRegistry>,
77    pub sid_counter: AtomicU32,
78    pub beacon_change: Arc<AtomicU16>,
79    pub compute_alarms: bool,
80    pub pvlist_mode: PvListMode,
81    pub pvlist_max: usize,
82    pub pvlist_allow_pattern: Option<Regex>,
83}
84
85impl<S: PvStore> ServerState<S> {
86    pub fn new(
87        store: Arc<S>,
88        registry: Arc<MonitorRegistry>,
89        compute_alarms: bool,
90        pvlist_mode: PvListMode,
91        pvlist_max: usize,
92        pvlist_allow_pattern: Option<Regex>,
93    ) -> Self {
94        Self {
95            store,
96            registry,
97            sid_counter: AtomicU32::new(1),
98            beacon_change: Arc::new(AtomicU16::new(0)),
99            compute_alarms,
100            pvlist_mode,
101            pvlist_max,
102            pvlist_allow_pattern,
103        }
104    }
105}
106
107// ---------------------------------------------------------------------------
108// Virtual PV helpers
109// ---------------------------------------------------------------------------
110
111pub fn is_pvlist_virtual_pv(pv_name: &str) -> bool {
112    pv_name == "__pvlist"
113}
114
115pub fn is_server_rpc_pv(pv_name: &str) -> bool {
116    pv_name == "server"
117}
118
119pub fn is_virtual_event_pv(pv_name: &str) -> bool {
120    pv_name.starts_with("__event:")
121}
122
123pub fn virtual_event_nt(pv_name: &str) -> NtPayload {
124    NtPayload::Scalar(
125        NtScalar::from_value(ScalarValue::Bool(false))
126            .with_description(format!("Virtual event trigger for {}", pv_name)),
127    )
128}
129
130pub fn virtual_pvlist_nt(entries: Vec<String>) -> NtPayload {
131    NtPayload::ScalarArray(NtScalarArray::from_value(ScalarArrayValue::Str(entries)))
132}
133
134// ---------------------------------------------------------------------------
135// Pattern / wildcard utilities
136// ---------------------------------------------------------------------------
137
138pub fn is_pattern_query(raw: &str) -> bool {
139    raw.contains('*') || raw.contains('?')
140}
141
142pub fn wildcard_match(pattern: &str, text: &str) -> bool {
143    let p = pattern.as_bytes();
144    let t = text.as_bytes();
145    let mut i = 0usize;
146    let mut j = 0usize;
147    let mut star: Option<usize> = None;
148    let mut match_j = 0usize;
149
150    while j < t.len() {
151        if i < p.len() && (p[i] == b'?' || p[i] == t[j]) {
152            i += 1;
153            j += 1;
154        } else if i < p.len() && p[i] == b'*' {
155            star = Some(i);
156            i += 1;
157            match_j = j;
158        } else if let Some(star_idx) = star {
159            i = star_idx + 1;
160            match_j += 1;
161            j = match_j;
162        } else {
163            return false;
164        }
165    }
166
167    while i < p.len() && p[i] == b'*' {
168        i += 1;
169    }
170    i == p.len()
171}
172
173pub fn collect_visible_pv_names(
174    all_names: &[String],
175    mode: PvListMode,
176    allow_pattern: Option<&Regex>,
177    max_items: usize,
178) -> Vec<String> {
179    let mut names: Vec<String> = all_names
180        .iter()
181        .filter(|name| {
182            allow_pattern
183                .as_ref()
184                .map(|re| re.is_match(name))
185                .unwrap_or(true)
186        })
187        .cloned()
188        .collect();
189    names.sort();
190    if names.len() > max_items {
191        names.truncate(max_items);
192    }
193    if mode == PvListMode::List && names.len() < max_items {
194        names.push("__pvlist".to_string());
195    }
196    names
197}
198
199fn build_pvlist_structure(names: &[String]) -> StructureDesc {
200    use spvirit_codec::spvd_decode::{FieldDesc, FieldType, TypeCode};
201    StructureDesc {
202        struct_id: Some("epics:pva/pvlist:1.0".to_string()),
203        fields: names
204            .iter()
205            .map(|name| FieldDesc {
206                name: name.clone(),
207                field_type: FieldType::Scalar(TypeCode::Boolean),
208            })
209            .collect(),
210    }
211}
212
213fn requested_pvlist_pattern(field_name: Option<&str>) -> Option<&str> {
214    let raw = field_name.map(str::trim).unwrap_or("");
215    if raw.is_empty() || raw == "*" || raw == "__pvlist" || raw.eq_ignore_ascii_case("pvlist") {
216        return Some("*");
217    }
218    if is_pattern_query(raw) {
219        return Some(raw);
220    }
221    None
222}
223
224// ---------------------------------------------------------------------------
225// Network helpers
226// ---------------------------------------------------------------------------
227
228pub fn search_reply_target(addr: &[u8; 16], port: u16, peer: SocketAddr) -> SocketAddr {
229    let target_port = if port != 0 { port } else { peer.port() };
230    let target_ip = ip_from_bytes(addr)
231        .filter(|ip| !ip.is_unspecified())
232        .unwrap_or_else(|| peer.ip());
233    SocketAddr::new(target_ip, target_port)
234}
235
236pub fn infer_udp_response_ip(peer: SocketAddr) -> Option<IpAddr> {
237    let bind_addr = if peer.is_ipv4() {
238        "0.0.0.0:0"
239    } else {
240        "[::]:0"
241    };
242    let sock = std::net::UdpSocket::bind(bind_addr).ok()?;
243    sock.connect(peer).ok()?;
244    let local = sock.local_addr().ok()?;
245    if local.ip().is_unspecified() {
246        None
247    } else {
248        Some(local.ip())
249    }
250}
251
252pub fn rand_guid() -> [u8; 12] {
253    let now = SystemTime::now()
254        .duration_since(SystemTime::UNIX_EPOCH)
255        .unwrap_or_default();
256    let mut guid = [0u8; 12];
257    let bytes = now.as_nanos().to_le_bytes();
258    guid.copy_from_slice(&bytes[0..12]);
259    guid
260}
261
262// ---------------------------------------------------------------------------
263// Debug utilities
264// ---------------------------------------------------------------------------
265
266pub fn validate_encoded_packet(conn_id: u64, label: &str, bytes: &[u8]) {
267    let mut pkt = PvaPacket::new(bytes);
268    let decoded = pkt.decode_payload();
269    match decoded {
270        Some(PvaPacketCommand::ConnectionValidation(payload)) => {
271            debug!(
272                "Conn {}: {} decoded as cmd=1 buffer_size={} qos={} authz={:?}",
273                conn_id, label, payload.buffer_size, payload.qos, payload.authz
274            );
275        }
276        Some(PvaPacketCommand::ConnectionValidated(_)) => {
277            debug!("Conn {}: {} decoded as cmd=9", conn_id, label);
278        }
279        Some(other) => {
280            debug!("Conn {}: {} decoded as {:?}", conn_id, label, other);
281        }
282        None => {
283            debug!("Conn {}: {} failed to decode", conn_id, label);
284        }
285    }
286}
287
288pub fn dump_hex_packet(
289    conn_id: u64,
290    dir: &str,
291    label: &str,
292    version: u8,
293    is_be: bool,
294    bytes: &[u8],
295) {
296    debug!(
297        "Conn {}: {} {} ver={} be={} len={}",
298        conn_id,
299        dir,
300        label,
301        version,
302        is_be,
303        bytes.len()
304    );
305    let mut offset = 0usize;
306    while offset < bytes.len() {
307        let end = usize::min(offset + 16, bytes.len());
308        let chunk = &bytes[offset..end];
309        let mut line = String::new();
310        for (i, b) in chunk.iter().enumerate() {
311            if i > 0 {
312                line.push(' ');
313            }
314            line.push_str(&format!("{:02x}", b));
315        }
316        debug!("Conn {}: {:04x} {}", conn_id, offset, line);
317        offset += 16;
318    }
319}
320
321// ---------------------------------------------------------------------------
322// Store-based snapshot/writable helpers (delegate to PvStore + virtual PVs)
323// ---------------------------------------------------------------------------
324
325async fn get_nt_snapshot<S: PvStore>(state: &ServerState<S>, pv_name: &str) -> Option<NtPayload> {
326    if is_pvlist_virtual_pv(pv_name) {
327        if state.pvlist_mode != PvListMode::List {
328            return None;
329        }
330        let all_names = state.store.list_pvs().await;
331        let names = collect_visible_pv_names(
332            &all_names,
333            state.pvlist_mode,
334            state.pvlist_allow_pattern.as_ref(),
335            state.pvlist_max,
336        );
337        return Some(virtual_pvlist_nt(names));
338    }
339    if is_virtual_event_pv(pv_name) {
340        return Some(virtual_event_nt(pv_name));
341    }
342    state.store.get_snapshot(pv_name).await
343}
344
345async fn is_writable_pv<S: PvStore>(state: &ServerState<S>, pv_name: &str) -> bool {
346    if is_virtual_event_pv(pv_name) {
347        return true;
348    }
349    state.store.is_writable(pv_name).await
350}
351
352async fn has_pv<S: PvStore>(state: &ServerState<S>, pv_name: &str) -> bool {
353    state.store.has_pv(pv_name).await
354        || is_virtual_event_pv(pv_name)
355        || (is_pvlist_virtual_pv(pv_name) && state.pvlist_mode == PvListMode::List)
356        || (is_server_rpc_pv(pv_name) && state.pvlist_mode != PvListMode::Off)
357}
358
359// ---------------------------------------------------------------------------
360// Notify helpers
361// ---------------------------------------------------------------------------
362
363async fn notify_changed_records<S: PvStore>(
364    state: &ServerState<S>,
365    changed: Vec<(String, NtPayload)>,
366) {
367    for (name, payload) in changed {
368        state.beacon_change.fetch_add(1, Ordering::SeqCst);
369        state.registry.notify_monitors(&name, &payload).await;
370    }
371}
372
373// ---------------------------------------------------------------------------
374// GET_FIELD handler
375// ---------------------------------------------------------------------------
376
377async fn handle_get_field_request<S: PvStore>(
378    state: &ServerState<S>,
379    conn_state: &ConnState,
380    conn_id: u64,
381    payload: spvirit_codec::epics_decode::PvaGetFieldPayload,
382    version: u8,
383    is_be: bool,
384) {
385    if payload.is_server {
386        let resp = encode_get_field_error(
387            payload.cid,
388            "Unexpected server GET_FIELD payload",
389            version,
390            is_be,
391        );
392        state.registry.send_msg(conn_id, resp).await;
393        return;
394    }
395
396    let request_id = payload.ioid.unwrap_or(payload.cid);
397
398    let sid = payload
399        .sid
400        .or_else(|| conn_state.cid_to_sid.get(&payload.cid).copied())
401        .or_else(|| {
402            conn_state
403                .sid_to_pv
404                .contains_key(&payload.cid)
405                .then_some(payload.cid)
406        })
407        .or_else(|| {
408            (payload.cid == 0 && conn_state.sid_to_pv.len() == 1)
409                .then(|| conn_state.sid_to_pv.keys().copied().next())
410                .flatten()
411        });
412
413    if let Some(sid) = sid {
414        if let Some(pv_name) = conn_state.sid_to_pv.get(&sid) {
415            if let Some(nt) = get_nt_snapshot(state, pv_name).await {
416                let full_desc = nt_payload_desc(&nt);
417                let sub = payload
418                    .field_name
419                    .as_deref()
420                    .filter(|s| !s.is_empty());
421                let desc = if let Some(field_path) = sub {
422                    match extract_subfield_desc(&full_desc, field_path) {
423                        Some(sub_desc) => sub_desc,
424                        None => {
425                            let resp = encode_get_field_error(
426                                request_id,
427                                &format!("sub-field '{}' not found", field_path),
428                                version,
429                                is_be,
430                            );
431                            state.registry.send_msg(conn_id, resp).await;
432                            return;
433                        }
434                    }
435                } else {
436                    full_desc
437                };
438                let resp = encode_get_field_response(request_id, &desc, version, is_be);
439                dump_hex_packet(conn_id, "tx", "cmd=17 get_field", version, is_be, &resp);
440                state.registry.send_msg(conn_id, resp).await;
441                debug!(
442                    "Conn {}: get_field cid={} sid={:?} ioid={:?} resolved_sid={} pv='{}' field={:?}",
443                    conn_id, payload.cid, payload.sid, payload.ioid, sid, pv_name, payload.field_name
444                );
445                return;
446            }
447            let resp = encode_get_field_error(request_id, "PV not found", version, is_be);
448            state.registry.send_msg(conn_id, resp).await;
449            return;
450        }
451    }
452
453    if state.pvlist_mode != PvListMode::List {
454        let resp = encode_get_field_error(
455            request_id,
456            "GET_FIELD listing is disabled (set --pvlist-mode=list)",
457            version,
458            is_be,
459        );
460        state.registry.send_msg(conn_id, resp).await;
461        return;
462    }
463
464    let Some(pattern) = requested_pvlist_pattern(payload.field_name.as_deref()) else {
465        let resp = encode_get_field_error(
466            request_id,
467            "GET_FIELD requires a valid list pattern",
468            version,
469            is_be,
470        );
471        state.registry.send_msg(conn_id, resp).await;
472        return;
473    };
474
475    let all_names = state.store.list_pvs().await;
476    let mut names = collect_visible_pv_names(
477        &all_names,
478        state.pvlist_mode,
479        state.pvlist_allow_pattern.as_ref(),
480        state.pvlist_max,
481    );
482    if pattern != "*" {
483        names.retain(|name| wildcard_match(pattern, name));
484    }
485    if names.is_empty() {
486        let resp = encode_get_field_error(
487            request_id,
488            "No PVs matched list request",
489            version,
490            is_be,
491        );
492        state.registry.send_msg(conn_id, resp).await;
493        return;
494    }
495    let desc = build_pvlist_structure(&names);
496    let resp = encode_get_field_response(request_id, &desc, version, is_be);
497    dump_hex_packet(
498        conn_id,
499        "tx",
500        "cmd=17 get_field_list",
501        version,
502        is_be,
503        &resp,
504    );
505    state.registry.send_msg(conn_id, resp).await;
506    debug!(
507        "Conn {}: get_field list pattern='{}' returned {} entries",
508        conn_id,
509        pattern,
510        names.len()
511    );
512}
513
514// ---------------------------------------------------------------------------
515// Server RPC handler
516// ---------------------------------------------------------------------------
517
518async fn handle_server_rpc<S: PvStore>(
519    state: &ServerState<S>,
520    conn_id: u64,
521    ioid: u32,
522    subcmd: u8,
523    version: u8,
524    is_be: bool,
525) {
526    if state.pvlist_mode != PvListMode::List {
527        let resp = encode_op_status_error_response(
528            20,
529            ioid,
530            subcmd,
531            "RPC list endpoint disabled (set --pvlist-mode=list)",
532            version,
533            is_be,
534        );
535        state.registry.send_msg(conn_id, resp).await;
536        return;
537    }
538
539    let all_names = state.store.list_pvs().await;
540    let names = collect_visible_pv_names(
541        &all_names,
542        state.pvlist_mode,
543        state.pvlist_allow_pattern.as_ref(),
544        state.pvlist_max,
545    );
546    let payload = NtPayload::ScalarArray(NtScalarArray::from_value(ScalarArrayValue::Str(names)));
547
548    let is_init = (subcmd & 0x08) != 0;
549    if is_init {
550        let resp = encode_op_status_response(20, ioid, subcmd, version, is_be);
551        state.registry.send_msg(conn_id, resp).await;
552        return;
553    }
554
555    let resp = encode_op_rpc_data_response_payload(ioid, subcmd, &payload, version, is_be);
556    state.registry.send_msg(conn_id, resp).await;
557}
558
559// ---------------------------------------------------------------------------
560// Control message handler (inside segmented stream)
561// ---------------------------------------------------------------------------
562
563async fn handle_control_message<S: PvStore>(
564    state: &ServerState<S>,
565    conn_id: u64,
566    header: &PvaHeader,
567) {
568    debug!(
569        "Conn {}: control (segmented) cmd={} data={}",
570        conn_id, header.command, header.payload_length
571    );
572    if header.command == 3 {
573        let resp = encode_control_message(
574            true,
575            header.flags.is_msb,
576            header.version,
577            4,
578            header.payload_length,
579        );
580        state.registry.send_msg(conn_id, resp).await;
581    }
582}
583
584// ---------------------------------------------------------------------------
585// UDP search handler
586// ---------------------------------------------------------------------------
587
588/// Run the UDP search responder.
589pub async fn run_udp_search<S: PvStore>(
590    state: Arc<ServerState<S>>,
591    addr: SocketAddr,
592    tcp_port: u16,
593    guid: [u8; 12],
594    advertise_ip: Option<IpAddr>,
595) -> Result<(), Box<dyn std::error::Error>> {
596    let socket = UdpSocket::bind(addr).await?;
597    socket.set_broadcast(true)?;
598    let mut buf = vec![0u8; 4096];
599
600    loop {
601        let (len, peer) = socket.recv_from(&mut buf).await?;
602        let data = &buf[..len];
603        let header = PvaHeader::new(data);
604        if header.flags.is_control || header.command != 3 {
605            continue;
606        }
607        let mut pkt = PvaPacket::new(data);
608        let Some(cmd) = pkt.decode_payload() else {
609            continue;
610        };
611        let version = pkt.header.version;
612        let is_be = pkt.header.flags.is_msb;
613        match cmd {
614            PvaPacketCommand::Search(payload) => {
615                debug!(
616                    "UDP search from {}: pv_count={} mask=0x{:02x}",
617                    peer,
618                    payload.pv_requests.len(),
619                    payload.mask
620                );
621                let accepts_tcp = payload.protocols.is_empty()
622                    || payload
623                        .protocols
624                        .iter()
625                        .any(|p| p.eq_ignore_ascii_case("tcp"));
626                if !accepts_tcp {
627                    debug!("UDP search: no compatible protocol (tcp not accepted)");
628                    continue;
629                }
630                let all_names = state.store.list_pvs().await;
631                let visible_names = collect_visible_pv_names(
632                    &all_names,
633                    state.pvlist_mode,
634                    state.pvlist_allow_pattern.as_ref(),
635                    state.pvlist_max,
636                );
637                let mut cids = Vec::new();
638                for (cid, name) in &payload.pv_requests {
639                    if state.store.has_pv(name).await
640                        || is_virtual_event_pv(name)
641                        || (is_pvlist_virtual_pv(name)
642                            && state.pvlist_mode == PvListMode::List)
643                        || (is_server_rpc_pv(name) && state.pvlist_mode != PvListMode::Off)
644                    {
645                        cids.push(*cid);
646                        continue;
647                    }
648                    if state.pvlist_mode != PvListMode::Off
649                        && is_pattern_query(name)
650                        && visible_names.iter().any(|pv| wildcard_match(name, pv))
651                    {
652                        cids.push(*cid);
653                    }
654                }
655                let response_required = (payload.mask & 0x01) != 0;
656                let server_discovery_ping = payload.pv_requests.is_empty();
657                let found = server_discovery_ping || !cids.is_empty();
658                if !found && !response_required {
659                    debug!("UDP search: no matches and response not required");
660                    continue;
661                }
662                let resp_ip = if let Some(ip) = advertise_ip {
663                    ip
664                } else if !addr.ip().is_unspecified() {
665                    addr.ip()
666                } else if let Some(ip) = infer_udp_response_ip(peer) {
667                    debug!("UDP search: inferred response address {}", ip);
668                    ip
669                } else {
670                    IpAddr::V4(Ipv4Addr::UNSPECIFIED)
671                };
672                let addr_bytes = if resp_ip.is_unspecified() {
673                    debug!(
674                        "UDP search: responding with zero address (unspecified listen)"
675                    );
676                    [0u8; 16]
677                } else {
678                    ip_to_bytes(resp_ip)
679                };
680                let response = encode_search_response(
681                    guid,
682                    payload.seq,
683                    addr_bytes,
684                    tcp_port,
685                    "tcp",
686                    found,
687                    &cids,
688                    version,
689                    is_be,
690                );
691                let reply_target = search_reply_target(&payload.addr, payload.port, peer);
692                if let Err(e) = socket.send_to(&response, reply_target).await {
693                    debug!(
694                        "UDP search: failed sending {} matches to {}: {}",
695                        cids.len(),
696                        reply_target,
697                        e
698                    );
699                    continue;
700                }
701                debug!(
702                    "UDP search: responded found={} with {} matches to {}",
703                    found,
704                    cids.len(),
705                    reply_target
706                );
707            }
708            _ => {}
709        }
710    }
711}
712
713// ---------------------------------------------------------------------------
714// TCP server
715// ---------------------------------------------------------------------------
716
717/// Accept TCP connections and spawn a handler for each.
718pub async fn run_tcp_server<S: PvStore>(
719    state: Arc<ServerState<S>>,
720    addr: SocketAddr,
721    conn_timeout: Duration,
722) -> Result<(), Box<dyn std::error::Error>> {
723    let listener = TcpListener::bind(addr).await?;
724    let conn_id = Arc::new(std::sync::atomic::AtomicU64::new(1));
725
726    loop {
727        let (stream, peer) = listener.accept().await?;
728        let id = conn_id.fetch_add(1, Ordering::SeqCst);
729        info!("TCP connection {} from {}", id, peer);
730        let state_clone = state.clone();
731        tokio::spawn(async move {
732            if let Err(e) = handle_connection(state_clone, stream, id, conn_timeout).await {
733                error!("Connection {} error: {}", id, e);
734            }
735        });
736    }
737}
738
739// ---------------------------------------------------------------------------
740// Core TCP connection handler
741// ---------------------------------------------------------------------------
742
743/// Handle a single PVA TCP connection.
744///
745/// This is the main protocol loop: handshake, then dispatch each command
746/// (CreateChannel, GET, PUT, PUT_GET, MONITOR, RPC, etc.) using the
747/// [`PvStore`] abstraction.
748pub async fn handle_connection<S: PvStore>(
749    state: Arc<ServerState<S>>,
750    stream: TcpStream,
751    conn_id: u64,
752    conn_timeout: Duration,
753) -> Result<(), Box<dyn std::error::Error>> {
754    let (mut reader, mut writer) = stream.into_split();
755    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(128);
756
757    {
758        let mut conns = state.registry.conns.lock().await;
759        conns.insert(conn_id, tx);
760    }
761
762    let writer_task = tokio::spawn(async move {
763        while let Some(msg) = rx.recv().await {
764            if writer.write_all(&msg).await.is_err() {
765                break;
766            }
767        }
768    });
769
770    let mut conn_state = ConnState::default();
771
772    // Per EPICS PVA protocol: send SET_BYTE_ORDER control message before validation.
773    let set_byte_order = encode_control_message(true, false, 2, 2, 0);
774    validate_encoded_packet(conn_id, "set_byte_order", &set_byte_order);
775    dump_hex_packet(
776        conn_id,
777        "tx",
778        "ctrl=2 set_byte_order",
779        2,
780        false,
781        &set_byte_order,
782    );
783    state.registry.send_msg(conn_id, set_byte_order).await;
784
785    // Server sends Connection Validation (cmd=1) next.
786    let server_validation = encode_connection_validation(16_384, 512, 0, "anonymous", 2, false);
787    validate_encoded_packet(conn_id, "server_validation_init", &server_validation);
788    dump_hex_packet(
789        conn_id,
790        "tx",
791        "cmd=1 server_validation_init",
792        2,
793        false,
794        &server_validation,
795    );
796    state.registry.send_msg(conn_id, server_validation).await;
797
798    let mut last_activity = Instant::now();
799
800    loop {
801        let mut header = [0u8; 8];
802        let elapsed = last_activity.elapsed();
803        if elapsed >= conn_timeout {
804            info!("Conn {} idle timeout", conn_id);
805            break;
806        }
807        let remaining = conn_timeout - elapsed;
808        let read_header = tokio::time::timeout(remaining, reader.read_exact(&mut header)).await;
809        match read_header {
810            Ok(Ok(_)) => {}
811            Ok(Err(_)) => break,
812            Err(_) => {
813                info!("Conn {} idle timeout", conn_id);
814                break;
815            }
816        }
817        let header_pkt = PvaPacket::new(&header);
818        let payload_len = if header_pkt.header.flags.is_control {
819            0usize
820        } else {
821            header_pkt.header.payload_length as usize
822        };
823        let mut payload = vec![0u8; payload_len];
824        if payload_len > 0 {
825            let elapsed = last_activity.elapsed();
826            if elapsed >= conn_timeout {
827                info!("Conn {} idle timeout", conn_id);
828                break;
829            }
830            let remaining = conn_timeout - elapsed;
831            let read_payload =
832                tokio::time::timeout(remaining, reader.read_exact(&mut payload)).await;
833            match read_payload {
834                Ok(Ok(_)) => {}
835                Ok(Err(_)) => break,
836                Err(_) => {
837                    info!("Conn {} idle timeout", conn_id);
838                    break;
839                }
840            }
841        }
842        last_activity = Instant::now();
843        let mut full = header.to_vec();
844        full.extend_from_slice(&payload);
845
846        // Segmented message reassembly
847        if header_pkt.header.flags.is_segmented != 0 && !header_pkt.header.flags.is_control {
848            debug!(
849                "Conn {}: segmented message cmd={} seg=0x{:02x}",
850                conn_id, header_pkt.header.command, header_pkt.header.flags.is_segmented
851            );
852            let mut payloads = vec![payload];
853            let mut seg_flags = header_pkt.header.flags;
854            while !seg_flags.is_last_segment {
855                let mut seg_header = [0u8; 8];
856                let elapsed = last_activity.elapsed();
857                if elapsed >= conn_timeout {
858                    info!("Conn {} idle timeout", conn_id);
859                    break;
860                }
861                let remaining = conn_timeout - elapsed;
862                let read_header =
863                    tokio::time::timeout(remaining, reader.read_exact(&mut seg_header)).await;
864                match read_header {
865                    Ok(Ok(_)) => {}
866                    Ok(Err(_)) => break,
867                    Err(_) => {
868                        info!("Conn {} idle timeout", conn_id);
869                        break;
870                    }
871                }
872
873                let seg_header_pkt = PvaPacket::new(&seg_header);
874                let seg_payload_len = if seg_header_pkt.header.flags.is_control {
875                    0usize
876                } else {
877                    seg_header_pkt.header.payload_length as usize
878                };
879                let mut seg_payload = vec![0u8; seg_payload_len];
880                if seg_payload_len > 0 {
881                    let elapsed = last_activity.elapsed();
882                    if elapsed >= conn_timeout {
883                        info!("Conn {} idle timeout", conn_id);
884                        break;
885                    }
886                    let remaining = conn_timeout - elapsed;
887                    let read_payload =
888                        tokio::time::timeout(remaining, reader.read_exact(&mut seg_payload)).await;
889                    match read_payload {
890                        Ok(Ok(_)) => {}
891                        Ok(Err(_)) => break,
892                        Err(_) => {
893                            info!("Conn {} idle timeout", conn_id);
894                            break;
895                        }
896                    }
897                }
898                last_activity = Instant::now();
899
900                if seg_header_pkt.header.flags.is_control {
901                    handle_control_message(&state, conn_id, &seg_header_pkt.header).await;
902                    continue;
903                }
904                if seg_header_pkt.header.flags.is_segmented == 0 {
905                    debug!(
906                        "Conn {}: segmented message interrupted by non-segmented cmd={}",
907                        conn_id, seg_header_pkt.header.command
908                    );
909                    break;
910                }
911                payloads.push(seg_payload);
912                seg_flags = seg_header_pkt.header.flags;
913            }
914            full = assemble_segmented_message(header, payloads);
915        }
916
917        let mut pkt = PvaPacket::new(&full);
918        let Some(cmd) = pkt.decode_payload() else {
919            continue;
920        };
921        let version = pkt.header.version;
922        let is_be = pkt.header.flags.is_msb;
923        let cmd_code = pkt.header.command;
924        let payload_slice = if full.len() >= 8 { &full[8..] } else { &[] };
925
926        // Connection Validation (cmd=1): respond with CONNECTION_VALIDATED (cmd=9).
927        if cmd_code == 1 {
928            dump_hex_packet(conn_id, "rx", "cmd=1 validation", version, is_be, &full);
929            let validation =
930                spvirit_codec::epics_decode::PvaConnectionValidationPayload::new(
931                    payload_slice,
932                    is_be,
933                    false,
934                );
935            if let Some(val) = validation {
936                debug!(
937                    "Conn {}: validation request (cmd=1) ver={} be={} buf={} qos={} authz={:?}",
938                    conn_id, version, is_be, val.buffer_size, val.qos, val.authz
939                );
940                let resp = spvirit_codec::spvirit_encode::encode_connection_validated(
941                    true, version, is_be,
942                );
943                validate_encoded_packet(conn_id, "conn_validated_resp", &resp);
944                dump_hex_packet(
945                    conn_id,
946                    "tx",
947                    "cmd=9 connection_validated",
948                    version,
949                    is_be,
950                    &resp,
951                );
952                state.registry.send_msg(conn_id, resp).await;
953                continue;
954            }
955        }
956        if cmd_code == 17 {
957            dump_hex_packet(conn_id, "rx", "cmd=17 get_field", version, is_be, &full);
958        }
959
960        match cmd {
961            PvaPacketCommand::Control(payload) => {
962                debug!("Conn {}: control {}", conn_id, payload);
963                if payload.command == 3 {
964                    let resp =
965                        encode_control_message(true, is_be, version, 4, payload.data);
966                    state.registry.send_msg(conn_id, resp).await;
967                }
968                continue;
969            }
970            PvaPacketCommand::ConnectionValidation(_) => {
971                debug!("Conn {}: validation request (decoded)", conn_id);
972            }
973            PvaPacketCommand::ConnectionValidated(_) => {
974                debug!("Conn {}: validation confirmed (decoded)", conn_id);
975            }
976            PvaPacketCommand::CreateChannel(payload) => {
977                debug!(
978                    "Conn {}: create_channel count={}",
979                    conn_id,
980                    payload.channels.len()
981                );
982                for (cid, pv_name) in payload.channels {
983                    if has_pv(&state, &pv_name).await {
984                        let sid = state.sid_counter.fetch_add(1, Ordering::SeqCst);
985                        conn_state.cid_to_sid.insert(cid, sid);
986                        conn_state.sid_to_pv.insert(sid, pv_name.clone());
987                        let resp =
988                            encode_create_channel_response(cid, sid, version, is_be);
989                        state.registry.send_msg(conn_id, resp).await;
990                        info!(
991                            "Conn {}: channel '{}' cid={} sid={}",
992                            conn_id, pv_name, cid, sid
993                        );
994                    } else {
995                        let resp =
996                            encode_create_channel_error(cid, "PV not found", version, is_be);
997                        state.registry.send_msg(conn_id, resp).await;
998                        info!(
999                            "Conn {}: channel '{}' not found (cid={})",
1000                            conn_id, pv_name, cid
1001                        );
1002                    }
1003                }
1004            }
1005            PvaPacketCommand::Op(payload) => {
1006                if payload.is_server {
1007                    continue;
1008                }
1009                let sid = payload.sid_or_cid;
1010                let ioid = payload.ioid;
1011                debug!(
1012                    "Conn {}: op cmd={} ioid={} sid={} sub=0x{:02x} body_len={}",
1013                    conn_id,
1014                    payload.command,
1015                    ioid,
1016                    sid,
1017                    payload.subcmd,
1018                    payload.body.len()
1019                );
1020                let Some(pv_name) = conn_state.sid_to_pv.get(&sid).cloned() else {
1021                    state
1022                        .registry
1023                        .send_msg(
1024                            conn_id,
1025                            encode_op_error(
1026                                payload.command,
1027                                ioid,
1028                                "Unknown SID",
1029                                version,
1030                                is_be,
1031                            ),
1032                        )
1033                        .await;
1034                    continue;
1035                };
1036
1037                let is_init = (payload.subcmd & 0x08) != 0;
1038
1039                match payload.command {
1040                    10 => {
1041                        // GET
1042                        let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1043                            state
1044                                .registry
1045                                .send_msg(
1046                                    conn_id,
1047                                    encode_op_error(
1048                                        payload.command,
1049                                        ioid,
1050                                        "PV not found",
1051                                        version,
1052                                        is_be,
1053                                    ),
1054                                )
1055                                .await;
1056                            continue;
1057                        };
1058                        if is_init {
1059                            let desc = nt_payload_desc(&nt);
1060                            conn_state.ioid_to_desc.insert(ioid, desc.clone());
1061                            conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1062                            let resp = encode_op_init_response_desc(
1063                                payload.command,
1064                                ioid,
1065                                0x08,
1066                                &desc,
1067                                version,
1068                                is_be,
1069                            );
1070                            state.registry.send_msg(conn_id, resp).await;
1071                            info!(
1072                                "Conn {}: get init pv='{}' ioid={}",
1073                                conn_id, pv_name, ioid
1074                            );
1075                        } else {
1076                            let resp = encode_op_get_data_response_payload(
1077                                ioid, &nt, version, is_be,
1078                            );
1079                            state.registry.send_msg(conn_id, resp).await;
1080                            debug!(
1081                                "Conn {}: get data pv='{}' ioid={}",
1082                                conn_id, pv_name, ioid
1083                            );
1084                        }
1085                    }
1086                    11 => {
1087                        // PUT
1088                        if is_init {
1089                            let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1090                                state
1091                                    .registry
1092                                    .send_msg(
1093                                        conn_id,
1094                                        encode_op_error(
1095                                            payload.command,
1096                                            ioid,
1097                                            "PV not found",
1098                                            version,
1099                                            is_be,
1100                                        ),
1101                                    )
1102                                    .await;
1103                                continue;
1104                            };
1105                            if !is_virtual_event_pv(&pv_name)
1106                                && !is_writable_pv(&state, &pv_name).await
1107                            {
1108                                let resp = encode_op_put_status_response(
1109                                    ioid,
1110                                    0x08,
1111                                    "Write access denied",
1112                                    version,
1113                                    is_be,
1114                                );
1115                                state.registry.send_msg(conn_id, resp).await;
1116                                continue;
1117                            }
1118                            let desc = nt_payload_desc(&nt);
1119                            conn_state.ioid_to_desc.insert(ioid, desc.clone());
1120                            conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1121                            let resp = encode_op_init_response_desc(
1122                                payload.command,
1123                                ioid,
1124                                0x08,
1125                                &desc,
1126                                version,
1127                                is_be,
1128                            );
1129                            state.registry.send_msg(conn_id, resp).await;
1130                            info!(
1131                                "Conn {}: put init pv='{}' ioid={}",
1132                                conn_id, pv_name, ioid
1133                            );
1134                        } else {
1135                            if (payload.subcmd & 0x40) != 0 {
1136                                if !is_virtual_event_pv(&pv_name)
1137                                    && !is_writable_pv(&state, &pv_name).await
1138                                {
1139                                    let resp = encode_op_put_status_response(
1140                                        ioid,
1141                                        0x40,
1142                                        "Write access denied",
1143                                        version,
1144                                        is_be,
1145                                    );
1146                                    state.registry.send_msg(conn_id, resp).await;
1147                                    continue;
1148                                }
1149                                if let Some(nt) =
1150                                    get_nt_snapshot(&state, &pv_name).await
1151                                {
1152                                    let resp = encode_op_put_getput_response_payload(
1153                                        ioid, &nt, version, is_be,
1154                                    );
1155                                    state.registry.send_msg(conn_id, resp).await;
1156                                    debug!(
1157                                        "Conn {}: put get-put pv='{}' ioid={}",
1158                                        conn_id, pv_name, ioid
1159                                    );
1160                                } else {
1161                                    state
1162                                        .registry
1163                                        .send_msg(
1164                                            conn_id,
1165                                            encode_op_error(
1166                                                payload.command,
1167                                                ioid,
1168                                                "PV not found",
1169                                                version,
1170                                                is_be,
1171                                            ),
1172                                        )
1173                                        .await;
1174                                }
1175                                continue;
1176                            }
1177                            let desc = match conn_state.ioid_to_desc.get(&ioid) {
1178                                Some(d) => d.clone(),
1179                                None => {
1180                                    state
1181                                        .registry
1182                                        .send_msg(
1183                                            conn_id,
1184                                            encode_op_error(
1185                                                payload.command,
1186                                                ioid,
1187                                                "PUT without init",
1188                                                version,
1189                                                is_be,
1190                                            ),
1191                                        )
1192                                        .await;
1193                                    continue;
1194                                }
1195                            };
1196                            let decoded = decode_put_body(&payload.body, &desc, is_be);
1197                            if let Some(value) = decoded.as_ref() {
1198                                match state.store.put_value(&pv_name, value).await {
1199                                    Ok(changed) => {
1200                                        notify_changed_records(&state, changed).await;
1201                                    }
1202                                    Err(msg) => {
1203                                        let resp = encode_op_put_status_response(
1204                                            ioid, 0x00, &msg, version, is_be,
1205                                        );
1206                                        state.registry.send_msg(conn_id, resp).await;
1207                                        continue;
1208                                    }
1209                                }
1210                            } else {
1211                                debug!(
1212                                    "Conn {}: put decode failed ioid={} body_len={}",
1213                                    conn_id,
1214                                    ioid,
1215                                    payload.body.len()
1216                                );
1217                            }
1218                            let resp = encode_op_put_response(ioid, version, is_be);
1219                            state.registry.send_msg(conn_id, resp).await;
1220                            debug!(
1221                                "Conn {}: put data pv='{}' ioid={}",
1222                                conn_id, pv_name, ioid
1223                            );
1224                        }
1225                    }
1226                    12 => {
1227                        // PUT_GET
1228                        if is_init {
1229                            let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1230                                state
1231                                    .registry
1232                                    .send_msg(
1233                                        conn_id,
1234                                        encode_op_error(
1235                                            payload.command,
1236                                            ioid,
1237                                            "PV not found",
1238                                            version,
1239                                            is_be,
1240                                        ),
1241                                    )
1242                                    .await;
1243                                continue;
1244                            };
1245                            if !is_virtual_event_pv(&pv_name)
1246                                && !is_writable_pv(&state, &pv_name).await
1247                            {
1248                                let resp = encode_op_put_get_init_error_response(
1249                                    ioid,
1250                                    "Write access denied",
1251                                    version,
1252                                    is_be,
1253                                );
1254                                state.registry.send_msg(conn_id, resp).await;
1255                                continue;
1256                            }
1257                            let desc = nt_payload_desc(&nt);
1258                            conn_state.ioid_to_desc.insert(ioid, desc.clone());
1259                            conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1260                            let resp = encode_op_put_get_init_response(
1261                                ioid, &desc, &desc, version, is_be,
1262                            );
1263                            state.registry.send_msg(conn_id, resp).await;
1264                            info!(
1265                                "Conn {}: put_get init pv='{}' ioid={}",
1266                                conn_id, pv_name, ioid
1267                            );
1268                        } else {
1269                            let desc = match conn_state.ioid_to_desc.get(&ioid) {
1270                                Some(d) => d.clone(),
1271                                None => {
1272                                    state
1273                                        .registry
1274                                        .send_msg(
1275                                            conn_id,
1276                                            encode_op_error(
1277                                                payload.command,
1278                                                ioid,
1279                                                "PUT_GET without init",
1280                                                version,
1281                                                is_be,
1282                                            ),
1283                                        )
1284                                        .await;
1285                                    continue;
1286                                }
1287                            };
1288                            let decoded = decode_put_body(&payload.body, &desc, is_be);
1289                            if let Some(value) = decoded.as_ref() {
1290                                match state.store.put_value(&pv_name, value).await {
1291                                    Ok(changed) => {
1292                                        notify_changed_records(&state, changed).await;
1293                                    }
1294                                    Err(msg) => {
1295                                        let resp = encode_op_put_get_data_error_response(
1296                                            ioid, &msg, version, is_be,
1297                                        );
1298                                        state.registry.send_msg(conn_id, resp).await;
1299                                        continue;
1300                                    }
1301                                }
1302                            } else {
1303                                debug!(
1304                                    "Conn {}: put_get decode failed ioid={} body_len={}",
1305                                    conn_id,
1306                                    ioid,
1307                                    payload.body.len()
1308                                );
1309                            }
1310                            if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1311                                let resp = encode_op_put_get_data_response_payload(
1312                                    ioid, &nt, version, is_be,
1313                                );
1314                                state.registry.send_msg(conn_id, resp).await;
1315                            } else {
1316                                state
1317                                    .registry
1318                                    .send_msg(
1319                                        conn_id,
1320                                        encode_op_error(
1321                                            payload.command,
1322                                            ioid,
1323                                            "PV not found",
1324                                            version,
1325                                            is_be,
1326                                        ),
1327                                    )
1328                                    .await;
1329                            }
1330                            debug!(
1331                                "Conn {}: put_get data pv='{}' ioid={}",
1332                                conn_id, pv_name, ioid
1333                            );
1334                        }
1335                    }
1336                    13 => {
1337                        // MONITOR
1338                        if is_init {
1339                            let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1340                                state
1341                                    .registry
1342                                    .send_msg(
1343                                        conn_id,
1344                                        encode_op_error(
1345                                            payload.command,
1346                                            ioid,
1347                                            "PV not found",
1348                                            version,
1349                                            is_be,
1350                                        ),
1351                                    )
1352                                    .await;
1353                                continue;
1354                            };
1355                            let desc = nt_payload_desc(&nt);
1356                            conn_state.ioid_to_desc.insert(ioid, desc.clone());
1357                            conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1358                            let pipeline_enabled = (payload.subcmd & 0x80) != 0;
1359                            let mut nfree = 0u32;
1360                            if pipeline_enabled && payload.body.len() >= 4 {
1361                                let start = payload.body.len() - 4;
1362                                nfree = if is_be {
1363                                    u32::from_be_bytes([
1364                                        payload.body[start],
1365                                        payload.body[start + 1],
1366                                        payload.body[start + 2],
1367                                        payload.body[start + 3],
1368                                    ])
1369                                } else {
1370                                    u32::from_le_bytes([
1371                                        payload.body[start],
1372                                        payload.body[start + 1],
1373                                        payload.body[start + 2],
1374                                        payload.body[start + 3],
1375                                    ])
1376                                };
1377                            }
1378                            let resp = encode_op_init_response_desc(
1379                                payload.command,
1380                                ioid,
1381                                0x08,
1382                                &desc,
1383                                version,
1384                                is_be,
1385                            );
1386                            state.registry.send_msg(conn_id, resp).await;
1387                            conn_state.ioid_to_monitor.insert(
1388                                ioid,
1389                                MonitorState {
1390                                    running: false,
1391                                    pipeline_enabled,
1392                                    nfree,
1393                                },
1394                            );
1395                            {
1396                                let mut monitors = state.registry.monitors.lock().await;
1397                                monitors
1398                                    .entry(pv_name.clone())
1399                                    .or_default()
1400                                    .push(MonitorSub {
1401                                        conn_id,
1402                                        ioid,
1403                                        version,
1404                                        is_be,
1405                                        running: false,
1406                                        pipeline_enabled,
1407                                        nfree,
1408                                    });
1409                            }
1410                            info!(
1411                                "Conn {}: monitor init pv='{}' ioid={}",
1412                                conn_id, pv_name, ioid
1413                            );
1414                        } else if (payload.subcmd & 0x10) != 0 {
1415                            // Monitor destroy
1416                            if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1417                                let resp = encode_monitor_data_response_payload(
1418                                    ioid, 0x10, &nt, version, is_be,
1419                                );
1420                                state.registry.send_msg(conn_id, resp).await;
1421                            }
1422                            state
1423                                .registry
1424                                .remove_monitor_subscription(conn_id, ioid, &pv_name)
1425                                .await;
1426                            conn_state.ioid_to_monitor.remove(&ioid);
1427                            conn_state.ioid_to_pv.remove(&ioid);
1428                            conn_state.ioid_to_desc.remove(&ioid);
1429                            info!("Conn {}: monitor end ioid={}", conn_id, ioid);
1430                        } else if (payload.subcmd & 0x04) != 0
1431                            || (payload.subcmd & 0x80) != 0
1432                        {
1433                            // Monitor start/stop/pipeline-ack
1434                            let start = (payload.subcmd & 0x44) == 0x44;
1435                            let stop = (payload.subcmd & 0x44) == 0x04;
1436                            let pipeline_ack = (payload.subcmd & 0x80) != 0;
1437                            let mut nfree = None;
1438                            if pipeline_ack && payload.body.len() >= 4 {
1439                                let v = if is_be {
1440                                    u32::from_be_bytes([
1441                                        payload.body[0],
1442                                        payload.body[1],
1443                                        payload.body[2],
1444                                        payload.body[3],
1445                                    ])
1446                                } else {
1447                                    u32::from_le_bytes([
1448                                        payload.body[0],
1449                                        payload.body[1],
1450                                        payload.body[2],
1451                                        payload.body[3],
1452                                    ])
1453                                };
1454                                nfree = Some(v);
1455                            }
1456                            let running = if start {
1457                                true
1458                            } else if stop {
1459                                false
1460                            } else {
1461                                conn_state
1462                                    .ioid_to_monitor
1463                                    .get(&ioid)
1464                                    .map(|m| m.running)
1465                                    .unwrap_or(true)
1466                            };
1467                            state
1468                                .registry
1469                                .update_monitor_subscription(
1470                                    conn_id,
1471                                    ioid,
1472                                    &pv_name,
1473                                    running,
1474                                    nfree,
1475                                    Some(pipeline_ack),
1476                                )
1477                                .await;
1478                            if let Some(mon) = conn_state.ioid_to_monitor.get_mut(&ioid) {
1479                                mon.running = running;
1480                                if pipeline_ack {
1481                                    mon.pipeline_enabled = true;
1482                                }
1483                                if let Some(v) = nfree {
1484                                    if pipeline_ack {
1485                                        mon.nfree = mon.nfree.saturating_add(v);
1486                                    } else {
1487                                        mon.nfree = v;
1488                                    }
1489                                }
1490                            }
1491                            info!(
1492                                "Conn {}: monitor {} ioid={} ack={} nfree={:?}",
1493                                conn_id,
1494                                if start {
1495                                    "start"
1496                                } else if stop {
1497                                    "stop"
1498                                } else {
1499                                    "ack"
1500                                },
1501                                ioid,
1502                                pipeline_ack,
1503                                nfree
1504                            );
1505                            if start {
1506                                if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1507                                    state
1508                                        .registry
1509                                        .send_monitor_update_for(
1510                                            &pv_name, conn_id, ioid, &nt,
1511                                        )
1512                                        .await;
1513                                }
1514                            }
1515                        }
1516                    }
1517                    20 => {
1518                        // RPC
1519                        if is_server_rpc_pv(&pv_name) {
1520                            handle_server_rpc(
1521                                &state,
1522                                conn_id,
1523                                ioid,
1524                                payload.subcmd,
1525                                version,
1526                                is_be,
1527                            )
1528                            .await;
1529                        } else {
1530                            state
1531                                .registry
1532                                .send_msg(
1533                                    conn_id,
1534                                    encode_op_error(
1535                                        payload.command,
1536                                        ioid,
1537                                        "Operation not supported",
1538                                        version,
1539                                        is_be,
1540                                    ),
1541                                )
1542                                .await;
1543                        }
1544                    }
1545                    14 | 16 => {
1546                        state
1547                            .registry
1548                            .send_msg(
1549                                conn_id,
1550                                encode_op_error(
1551                                    payload.command,
1552                                    ioid,
1553                                    "Operation not supported",
1554                                    version,
1555                                    is_be,
1556                                ),
1557                            )
1558                            .await;
1559                    }
1560                    _ => {
1561                        state
1562                            .registry
1563                            .send_msg(
1564                                conn_id,
1565                                encode_op_error(
1566                                    payload.command,
1567                                    ioid,
1568                                    "Operation not supported",
1569                                    version,
1570                                    is_be,
1571                                ),
1572                            )
1573                            .await;
1574                    }
1575                }
1576            }
1577            PvaPacketCommand::DestroyChannel(payload) => {
1578                let sid = payload.sid;
1579                let cid = payload.cid;
1580                conn_state.cid_to_sid.remove(&cid);
1581                conn_state.sid_to_pv.remove(&sid);
1582                info!(
1583                    "Conn {}: channel destroyed sid={} cid={}",
1584                    conn_id, sid, cid
1585                );
1586            }
1587            PvaPacketCommand::DestroyRequest(payload) => {
1588                let ioid = payload.request_id;
1589                if let Some(pv_name) = conn_state.ioid_to_pv.remove(&ioid) {
1590                    state
1591                        .registry
1592                        .remove_monitor_subscription(conn_id, ioid, &pv_name)
1593                        .await;
1594                    conn_state.ioid_to_desc.remove(&ioid);
1595                    conn_state.ioid_to_monitor.remove(&ioid);
1596                    info!("Conn {}: monitor unsubscribed ioid={}", conn_id, ioid);
1597                }
1598            }
1599            PvaPacketCommand::AuthNZ(_) => {
1600                let resp =
1601                    encode_message_error("AUTHNZ command is not supported", version, is_be);
1602                state.registry.send_msg(conn_id, resp).await;
1603            }
1604            PvaPacketCommand::AclChange(_) => {
1605                let resp =
1606                    encode_message_error("ACL_CHANGE command is not supported", version, is_be);
1607                state.registry.send_msg(conn_id, resp).await;
1608            }
1609            PvaPacketCommand::GetField(payload) => {
1610                handle_get_field_request(
1611                    &state,
1612                    &conn_state,
1613                    conn_id,
1614                    payload,
1615                    version,
1616                    is_be,
1617                )
1618                .await;
1619            }
1620            PvaPacketCommand::Echo(payload_bytes) => {
1621                let mut resp = encode_header(
1622                    true,
1623                    is_be,
1624                    false,
1625                    version,
1626                    2,
1627                    payload_bytes.len() as u32,
1628                );
1629                resp.extend_from_slice(&payload_bytes);
1630                state.registry.send_msg(conn_id, resp).await;
1631            }
1632            PvaPacketCommand::Message(_) => {
1633                let resp =
1634                    encode_message_error("MESSAGE command is not supported", version, is_be);
1635                state.registry.send_msg(conn_id, resp).await;
1636            }
1637            PvaPacketCommand::MultipleData(_) => {
1638                let resp = encode_message_error(
1639                    "MULTIPLE_DATA command is not supported",
1640                    version,
1641                    is_be,
1642                );
1643                state.registry.send_msg(conn_id, resp).await;
1644            }
1645            PvaPacketCommand::CancelRequest(_) => {
1646                let resp = encode_message_error(
1647                    "CANCEL_REQUEST command is not supported",
1648                    version,
1649                    is_be,
1650                );
1651                state.registry.send_msg(conn_id, resp).await;
1652            }
1653            PvaPacketCommand::OriginTag(_) => {
1654                let resp = encode_message_error(
1655                    "ORIGIN_TAG command is not supported",
1656                    version,
1657                    is_be,
1658                );
1659                state.registry.send_msg(conn_id, resp).await;
1660            }
1661            PvaPacketCommand::Search(_)
1662            | PvaPacketCommand::SearchResponse(_)
1663            | PvaPacketCommand::Beacon(_) => {
1664                let resp = encode_message_error(
1665                    "Unexpected command for server endpoint",
1666                    version,
1667                    is_be,
1668                );
1669                state.registry.send_msg(conn_id, resp).await;
1670            }
1671            PvaPacketCommand::Unknown(payload) => {
1672                let resp = encode_message_error(
1673                    &format!("Unknown command {}", payload.command),
1674                    version,
1675                    is_be,
1676                );
1677                state.registry.send_msg(conn_id, resp).await;
1678            }
1679        }
1680    }
1681
1682    state.registry.cleanup_connection(conn_id).await;
1683    let _ = writer_task.await;
1684    Ok(())
1685}