Skip to main content

spvirit_server/
handler.rs

1//! PVA protocol handler — the core TCP connection processor.
2//!
3//! [`handle_connection`] is generic over [`PvStore`], allowing any backend to
4//! serve PVs over the EPICS PVAccess protocol.
5
6use std::net::{IpAddr, Ipv4Addr, SocketAddr};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU16, AtomicU32, Ordering};
9use std::time::{Duration, Instant, SystemTime};
10
11use regex::Regex;
12use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::net::{TcpListener, TcpStream, UdpSocket};
14use tokio::sync::mpsc;
15use tracing::{debug, error, info};
16
17use spvirit_codec::epics_decode::{PvaHeader, PvaPacket, PvaPacketCommand};
18use spvirit_codec::spvd_decode::{StructureDesc, extract_subfield_desc};
19use spvirit_codec::spvd_encode::{
20    decode_pv_request_fields, filter_structure_desc, nt_payload_desc,
21};
22use spvirit_codec::spvirit_encode::{
23    encode_connection_validation, encode_control_message, encode_create_channel_error,
24    encode_create_channel_response, encode_get_field_error, encode_get_field_response,
25    encode_header, encode_message_error, encode_monitor_data_response_payload,
26    encode_op_data_response_filtered, encode_op_error, encode_op_get_data_response_payload,
27    encode_op_init_response_desc, encode_op_put_get_data_error_response,
28    encode_op_put_get_data_response_payload, encode_op_put_get_init_error_response,
29    encode_op_put_get_init_response, encode_op_put_getput_response_payload, encode_op_put_response,
30    encode_op_put_status_response, encode_op_rpc_data_response_payload,
31    encode_op_status_error_response, encode_op_status_response, encode_search_response,
32    ip_from_bytes, ip_to_bytes,
33};
34
35use spvirit_types::{NtPayload, NtScalar, NtScalarArray, ScalarArrayValue, ScalarValue};
36
37use crate::decode::{assemble_segmented_message, decode_put_body};
38use crate::monitor::MonitorRegistry;
39use crate::pvstore::PvStore;
40use crate::state::{ConnState, MonitorState, MonitorSub};
41
42// ---------------------------------------------------------------------------
43// PvListMode — controls virtual PV listing behaviour
44// ---------------------------------------------------------------------------
45
/// Controls how the server exposes its PV directory.
///
/// The mode can be obtained from user input either through
/// [`PvListMode::parse`] or through the standard [`std::str::FromStr`]
/// machinery (`"list".parse::<PvListMode>()`); both accept the same spellings.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PvListMode {
    /// No PV listing at all.
    Off,
    /// Respond to UDP search for known PVs only; no GET_FIELD listing.
    Discover,
    /// Full pvlist & server-RPC listing support.
    List,
}

impl PvListMode {
    /// Parses a mode name, ignoring surrounding whitespace and ASCII case.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when `raw` is not one of `off`,
    /// `discover` or `list`. The message quotes the normalised (trimmed,
    /// lower-cased) input rather than the raw text.
    pub fn parse(raw: &str) -> Result<Self, String> {
        match raw.trim().to_ascii_lowercase().as_str() {
            "off" => Ok(Self::Off),
            "discover" => Ok(Self::Discover),
            "list" => Ok(Self::List),
            other => Err(format!(
                "Invalid pvlist-mode '{}'; expected off|discover|list",
                other
            )),
        }
    }
}

impl std::str::FromStr for PvListMode {
    type Err = String;

    /// Delegates to [`PvListMode::parse`] so both entry points stay in sync.
    fn from_str(raw: &str) -> Result<Self, Self::Err> {
        Self::parse(raw)
    }
}
70
71// ---------------------------------------------------------------------------
72// Server shared state — generic over PvStore
73// ---------------------------------------------------------------------------
74
75/// Shared server state that is passed to every connection handler.
76pub struct ServerState<S: PvStore> {
77    pub store: Arc<S>,
78    pub registry: Arc<MonitorRegistry>,
79    pub sid_counter: AtomicU32,
80    pub beacon_change: Arc<AtomicU16>,
81    pub compute_alarms: bool,
82    pub pvlist_mode: PvListMode,
83    pub pvlist_max: usize,
84    pub pvlist_allow_pattern: Option<Regex>,
85}
86
87impl<S: PvStore> ServerState<S> {
88    pub fn new(
89        store: Arc<S>,
90        registry: Arc<MonitorRegistry>,
91        compute_alarms: bool,
92        pvlist_mode: PvListMode,
93        pvlist_max: usize,
94        pvlist_allow_pattern: Option<Regex>,
95    ) -> Self {
96        Self {
97            store,
98            registry,
99            sid_counter: AtomicU32::new(1),
100            beacon_change: Arc::new(AtomicU16::new(0)),
101            compute_alarms,
102            pvlist_mode,
103            pvlist_max,
104            pvlist_allow_pattern,
105        }
106    }
107}
108
109// ---------------------------------------------------------------------------
110// Virtual PV helpers
111// ---------------------------------------------------------------------------
112
/// Returns `true` when `pv_name` is exactly the virtual PV-list name `__pvlist`.
pub fn is_pvlist_virtual_pv(pv_name: &str) -> bool {
    matches!(pv_name, "__pvlist")
}
116
/// Returns `true` when `pv_name` is exactly the server RPC endpoint name `server`.
pub fn is_server_rpc_pv(pv_name: &str) -> bool {
    "server".eq(pv_name)
}
120
/// Returns `true` when `pv_name` carries the `__event:` prefix used for
/// virtual event PVs.
pub fn is_virtual_event_pv(pv_name: &str) -> bool {
    pv_name.strip_prefix("__event:").is_some()
}
124
125pub fn virtual_event_nt(pv_name: &str) -> NtPayload {
126    NtPayload::Scalar(
127        NtScalar::from_value(ScalarValue::Bool(false))
128            .with_description(format!("Virtual event trigger for {}", pv_name)),
129    )
130}
131
132pub fn virtual_pvlist_nt(entries: Vec<String>) -> NtPayload {
133    NtPayload::ScalarArray(NtScalarArray::from_value(ScalarArrayValue::Str(entries)))
134}
135
136// ---------------------------------------------------------------------------
137// Pattern / wildcard utilities
138// ---------------------------------------------------------------------------
139
/// Returns `true` when `raw` contains at least one wildcard character
/// (`*` or `?`).
pub fn is_pattern_query(raw: &str) -> bool {
    raw.chars().any(|c| matches!(c, '*' | '?'))
}
143
/// Glob-style match of `text` against `pattern`.
///
/// * `*` matches any run of characters, including the empty run.
/// * `?` matches exactly one character.
/// * every other character matches only itself.
///
/// The whole of `text` must be covered by `pattern`. Matching is done per
/// Unicode scalar value (`char`), so `?` consumes one character even when that
/// character is encoded as several UTF-8 bytes. Wildcards are always treated
/// as wildcards: a `*` in the pattern is never consumed as a literal just
/// because the text happens to contain `*` at the same position.
pub fn wildcard_match(pattern: &str, text: &str) -> bool {
    let pat: Vec<char> = pattern.chars().collect();
    let txt: Vec<char> = text.chars().collect();

    let mut p = 0usize;
    let mut t = 0usize;
    // Most recent `*` seen: (its index in `pat`, text index where the run it
    // absorbs currently ends). Used to retry with a longer run on mismatch.
    let mut backtrack: Option<(usize, usize)> = None;

    while t < txt.len() {
        match pat.get(p).copied() {
            // Check the star first so it can never be taken as a literal.
            Some('*') => {
                backtrack = Some((p, t));
                p += 1;
            }
            Some(c) if c == '?' || c == txt[t] => {
                p += 1;
                t += 1;
            }
            _ => {
                // Mismatch (or pattern exhausted): let the last star swallow
                // one more character and resume right after it.
                let Some((star_p, star_t)) = backtrack else {
                    return false;
                };
                p = star_p + 1;
                t = star_t + 1;
                backtrack = Some((star_p, t));
            }
        }
    }

    // Text is consumed; only trailing stars may remain in the pattern.
    pat[p..].iter().all(|&c| c == '*')
}
174
175pub fn collect_visible_pv_names(
176    all_names: &[String],
177    mode: PvListMode,
178    allow_pattern: Option<&Regex>,
179    max_items: usize,
180) -> Vec<String> {
181    let mut names: Vec<String> = all_names
182        .iter()
183        .filter(|name| {
184            allow_pattern
185                .as_ref()
186                .map(|re| re.is_match(name))
187                .unwrap_or(true)
188        })
189        .cloned()
190        .collect();
191    names.sort();
192    if names.len() > max_items {
193        names.truncate(max_items);
194    }
195    if mode == PvListMode::List && names.len() < max_items {
196        names.push("__pvlist".to_string());
197    }
198    names
199}
200
201fn build_pvlist_structure(names: &[String]) -> StructureDesc {
202    use spvirit_codec::spvd_decode::{FieldDesc, FieldType, TypeCode};
203    StructureDesc {
204        struct_id: Some("epics:pva/pvlist:1.0".to_string()),
205        fields: names
206            .iter()
207            .map(|name| FieldDesc {
208                name: name.clone(),
209                field_type: FieldType::Scalar(TypeCode::Boolean),
210            })
211            .collect(),
212    }
213}
214
215fn requested_pvlist_pattern(field_name: Option<&str>) -> Option<&str> {
216    let raw = field_name.map(str::trim).unwrap_or("");
217    if raw.is_empty() || raw == "*" || raw == "__pvlist" || raw.eq_ignore_ascii_case("pvlist") {
218        return Some("*");
219    }
220    if is_pattern_query(raw) {
221        return Some(raw);
222    }
223    None
224}
225
226// ---------------------------------------------------------------------------
227// Network helpers
228// ---------------------------------------------------------------------------
229
230pub fn search_reply_target(addr: &[u8; 16], port: u16, peer: SocketAddr) -> SocketAddr {
231    let target_port = if port != 0 { port } else { peer.port() };
232    let target_ip = ip_from_bytes(addr)
233        .filter(|ip| !ip.is_unspecified())
234        .unwrap_or_else(|| peer.ip());
235    SocketAddr::new(target_ip, target_port)
236}
237
/// Works out which local IP address would be used to reach `peer`.
///
/// Binds a throw-away UDP socket on the wildcard address of the matching
/// family, connects it to `peer`, and reads back the local address chosen for
/// it. Returns `None` when any step fails or the resulting address is still
/// unspecified.
pub fn infer_udp_response_ip(peer: SocketAddr) -> Option<IpAddr> {
    let wildcard = match peer {
        SocketAddr::V4(_) => "0.0.0.0:0",
        SocketAddr::V6(_) => "[::]:0",
    };
    let probe = std::net::UdpSocket::bind(wildcard).ok()?;
    probe.connect(peer).ok()?;
    let local_ip = probe.local_addr().ok()?.ip();
    (!local_ip.is_unspecified()).then_some(local_ip)
}
253
/// Generates the 12-byte GUID that identifies this server instance.
///
/// Layout (all little-endian):
/// * bytes `0..8`  — low 64 bits of the current time in nanoseconds since the
///   Unix epoch (zero if the clock reads before the epoch);
/// * bytes `8..12` — the operating-system process id.
///
/// The process id replaces the upper timestamp bits, which are zero for any
/// realistic date, so two servers started on the same clock tick on one host
/// still get distinct GUIDs. Despite the name the value is not
/// cryptographically random and must not be used as a secret.
pub fn rand_guid() -> [u8; 12] {
    let nanos = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();

    let mut guid = [0u8; 12];
    guid[..8].copy_from_slice(&nanos.to_le_bytes()[..8]);
    guid[8..].copy_from_slice(&std::process::id().to_le_bytes());
    guid
}
263
264// ---------------------------------------------------------------------------
265// Debug utilities
266// ---------------------------------------------------------------------------
267
268pub fn validate_encoded_packet(conn_id: u64, label: &str, bytes: &[u8]) {
269    let mut pkt = PvaPacket::new(bytes);
270    let decoded = pkt.decode_payload();
271    match decoded {
272        Some(PvaPacketCommand::ConnectionValidation(payload)) => {
273            debug!(
274                "Conn {}: {} decoded as cmd=1 buffer_size={} qos={} authz={:?}",
275                conn_id, label, payload.buffer_size, payload.qos, payload.authz
276            );
277        }
278        Some(PvaPacketCommand::ConnectionValidated(_)) => {
279            debug!("Conn {}: {} decoded as cmd=9", conn_id, label);
280        }
281        Some(other) => {
282            debug!("Conn {}: {} decoded as {:?}", conn_id, label, other);
283        }
284        None => {
285            debug!("Conn {}: {} failed to decode", conn_id, label);
286        }
287    }
288}
289
290pub fn dump_hex_packet(
291    conn_id: u64,
292    dir: &str,
293    label: &str,
294    version: u8,
295    is_be: bool,
296    bytes: &[u8],
297) {
298    debug!(
299        "Conn {}: {} {} ver={} be={} len={}",
300        conn_id,
301        dir,
302        label,
303        version,
304        is_be,
305        bytes.len()
306    );
307    let mut offset = 0usize;
308    while offset < bytes.len() {
309        let end = usize::min(offset + 16, bytes.len());
310        let chunk = &bytes[offset..end];
311        let mut line = String::new();
312        for (i, b) in chunk.iter().enumerate() {
313            if i > 0 {
314                line.push(' ');
315            }
316            line.push_str(&format!("{:02x}", b));
317        }
318        debug!("Conn {}: {:04x} {}", conn_id, offset, line);
319        offset += 16;
320    }
321}
322
323// ---------------------------------------------------------------------------
324// Store-based snapshot/writable helpers (delegate to PvStore + virtual PVs)
325// ---------------------------------------------------------------------------
326
327async fn get_nt_snapshot<S: PvStore>(state: &ServerState<S>, pv_name: &str) -> Option<NtPayload> {
328    if is_pvlist_virtual_pv(pv_name) {
329        if state.pvlist_mode != PvListMode::List {
330            return None;
331        }
332        let all_names = state.store.list_pvs().await;
333        let names = collect_visible_pv_names(
334            &all_names,
335            state.pvlist_mode,
336            state.pvlist_allow_pattern.as_ref(),
337            state.pvlist_max,
338        );
339        return Some(virtual_pvlist_nt(names));
340    }
341    if is_virtual_event_pv(pv_name) {
342        return Some(virtual_event_nt(pv_name));
343    }
344    state.store.get_snapshot(pv_name).await
345}
346
347async fn is_writable_pv<S: PvStore>(state: &ServerState<S>, pv_name: &str) -> bool {
348    if is_virtual_event_pv(pv_name) {
349        return true;
350    }
351    state.store.is_writable(pv_name).await
352}
353
354async fn has_pv<S: PvStore>(state: &ServerState<S>, pv_name: &str) -> bool {
355    state.store.has_pv(pv_name).await
356        || is_virtual_event_pv(pv_name)
357        || (is_pvlist_virtual_pv(pv_name) && state.pvlist_mode == PvListMode::List)
358        || (is_server_rpc_pv(pv_name) && state.pvlist_mode != PvListMode::Off)
359}
360
361// ---------------------------------------------------------------------------
362// Notify helpers
363// ---------------------------------------------------------------------------
364
365async fn notify_changed_records<S: PvStore>(
366    state: &ServerState<S>,
367    changed: Vec<(String, NtPayload)>,
368) {
369    for (name, payload) in changed {
370        state.beacon_change.fetch_add(1, Ordering::SeqCst);
371        state.registry.notify_monitors(&name, &payload).await;
372    }
373}
374
375// ---------------------------------------------------------------------------
376// GET_FIELD handler
377// ---------------------------------------------------------------------------
378
379async fn handle_get_field_request<S: PvStore>(
380    state: &ServerState<S>,
381    conn_state: &ConnState,
382    conn_id: u64,
383    payload: spvirit_codec::epics_decode::PvaGetFieldPayload,
384    version: u8,
385    is_be: bool,
386) {
387    if payload.is_server {
388        let resp = encode_get_field_error(
389            payload.cid,
390            "Unexpected server GET_FIELD payload",
391            version,
392            is_be,
393        );
394        state.registry.send_msg(conn_id, resp).await;
395        return;
396    }
397
398    let request_id = payload.ioid.unwrap_or(payload.cid);
399
400    let sid = payload
401        .sid
402        .or_else(|| conn_state.cid_to_sid.get(&payload.cid).copied())
403        .or_else(|| {
404            conn_state
405                .sid_to_pv
406                .contains_key(&payload.cid)
407                .then_some(payload.cid)
408        })
409        .or_else(|| {
410            (payload.cid == 0 && conn_state.sid_to_pv.len() == 1)
411                .then(|| conn_state.sid_to_pv.keys().copied().next())
412                .flatten()
413        });
414
415    if let Some(sid) = sid {
416        if let Some(pv_name) = conn_state.sid_to_pv.get(&sid) {
417            if let Some(nt) = get_nt_snapshot(state, pv_name).await {
418                let full_desc = nt_payload_desc(&nt);
419                let sub = payload.field_name.as_deref().filter(|s| !s.is_empty());
420                let desc = if let Some(field_path) = sub {
421                    match extract_subfield_desc(&full_desc, field_path) {
422                        Some(sub_desc) => sub_desc,
423                        None => {
424                            let resp = encode_get_field_error(
425                                request_id,
426                                &format!("sub-field '{}' not found", field_path),
427                                version,
428                                is_be,
429                            );
430                            state.registry.send_msg(conn_id, resp).await;
431                            return;
432                        }
433                    }
434                } else {
435                    full_desc
436                };
437                let resp = encode_get_field_response(request_id, &desc, version, is_be);
438                dump_hex_packet(conn_id, "tx", "cmd=17 get_field", version, is_be, &resp);
439                state.registry.send_msg(conn_id, resp).await;
440                debug!(
441                    "Conn {}: get_field cid={} sid={:?} ioid={:?} resolved_sid={} pv='{}' field={:?}",
442                    conn_id,
443                    payload.cid,
444                    payload.sid,
445                    payload.ioid,
446                    sid,
447                    pv_name,
448                    payload.field_name
449                );
450                return;
451            }
452            let resp = encode_get_field_error(request_id, "PV not found", version, is_be);
453            state.registry.send_msg(conn_id, resp).await;
454            return;
455        }
456    }
457
458    if state.pvlist_mode != PvListMode::List {
459        let resp = encode_get_field_error(
460            request_id,
461            "GET_FIELD listing is disabled (set --pvlist-mode=list)",
462            version,
463            is_be,
464        );
465        state.registry.send_msg(conn_id, resp).await;
466        return;
467    }
468
469    let Some(pattern) = requested_pvlist_pattern(payload.field_name.as_deref()) else {
470        let resp = encode_get_field_error(
471            request_id,
472            "GET_FIELD requires a valid list pattern",
473            version,
474            is_be,
475        );
476        state.registry.send_msg(conn_id, resp).await;
477        return;
478    };
479
480    let all_names = state.store.list_pvs().await;
481    let mut names = collect_visible_pv_names(
482        &all_names,
483        state.pvlist_mode,
484        state.pvlist_allow_pattern.as_ref(),
485        state.pvlist_max,
486    );
487    if pattern != "*" {
488        names.retain(|name| wildcard_match(pattern, name));
489    }
490    if names.is_empty() {
491        let resp =
492            encode_get_field_error(request_id, "No PVs matched list request", version, is_be);
493        state.registry.send_msg(conn_id, resp).await;
494        return;
495    }
496    let desc = build_pvlist_structure(&names);
497    let resp = encode_get_field_response(request_id, &desc, version, is_be);
498    dump_hex_packet(
499        conn_id,
500        "tx",
501        "cmd=17 get_field_list",
502        version,
503        is_be,
504        &resp,
505    );
506    state.registry.send_msg(conn_id, resp).await;
507    debug!(
508        "Conn {}: get_field list pattern='{}' returned {} entries",
509        conn_id,
510        pattern,
511        names.len()
512    );
513}
514
515// ---------------------------------------------------------------------------
516// Server RPC handler
517// ---------------------------------------------------------------------------
518
519async fn handle_server_rpc<S: PvStore>(
520    state: &ServerState<S>,
521    conn_id: u64,
522    ioid: u32,
523    subcmd: u8,
524    version: u8,
525    is_be: bool,
526) {
527    if state.pvlist_mode != PvListMode::List {
528        let resp = encode_op_status_error_response(
529            20,
530            ioid,
531            subcmd,
532            "RPC list endpoint disabled (set --pvlist-mode=list)",
533            version,
534            is_be,
535        );
536        state.registry.send_msg(conn_id, resp).await;
537        return;
538    }
539
540    let all_names = state.store.list_pvs().await;
541    let names = collect_visible_pv_names(
542        &all_names,
543        state.pvlist_mode,
544        state.pvlist_allow_pattern.as_ref(),
545        state.pvlist_max,
546    );
547    let payload = NtPayload::ScalarArray(NtScalarArray::from_value(ScalarArrayValue::Str(names)));
548
549    let is_init = (subcmd & 0x08) != 0;
550    if is_init {
551        let resp = encode_op_status_response(20, ioid, subcmd, version, is_be);
552        state.registry.send_msg(conn_id, resp).await;
553        return;
554    }
555
556    let resp = encode_op_rpc_data_response_payload(ioid, subcmd, &payload, version, is_be);
557    state.registry.send_msg(conn_id, resp).await;
558}
559
560// ---------------------------------------------------------------------------
561// Control message handler (inside segmented stream)
562// ---------------------------------------------------------------------------
563
564async fn handle_control_message<S: PvStore>(
565    state: &ServerState<S>,
566    conn_id: u64,
567    header: &PvaHeader,
568) {
569    debug!(
570        "Conn {}: control (segmented) cmd={} data={}",
571        conn_id, header.command, header.payload_length
572    );
573    if header.command == 3 {
574        let resp = encode_control_message(
575            true,
576            header.flags.is_msb,
577            header.version,
578            4,
579            header.payload_length,
580        );
581        state.registry.send_msg(conn_id, resp).await;
582    }
583}
584
585// ---------------------------------------------------------------------------
586// UDP search handler
587// ---------------------------------------------------------------------------
588
589/// Run the UDP search responder.
590pub async fn run_udp_search<S: PvStore>(
591    state: Arc<ServerState<S>>,
592    addr: SocketAddr,
593    tcp_port: u16,
594    guid: [u8; 12],
595    advertise_ip: Option<IpAddr>,
596) -> Result<(), Box<dyn std::error::Error>> {
597    let socket = UdpSocket::bind(addr).await?;
598    socket.set_broadcast(true)?;
599    let mut buf = vec![0u8; 4096];
600
601    loop {
602        let (len, peer) = socket.recv_from(&mut buf).await?;
603        let data = &buf[..len];
604        let header = PvaHeader::new(data);
605        if header.flags.is_control || header.command != 3 {
606            continue;
607        }
608        let mut pkt = PvaPacket::new(data);
609        let Some(cmd) = pkt.decode_payload() else {
610            continue;
611        };
612        let version = pkt.header.version;
613        let is_be = pkt.header.flags.is_msb;
614        match cmd {
615            PvaPacketCommand::Search(payload) => {
616                debug!(
617                    "UDP search from {}: pv_count={} mask=0x{:02x}",
618                    peer,
619                    payload.pv_requests.len(),
620                    payload.mask
621                );
622                let accepts_tcp = payload.protocols.is_empty()
623                    || payload
624                        .protocols
625                        .iter()
626                        .any(|p| p.eq_ignore_ascii_case("tcp"));
627                if !accepts_tcp {
628                    debug!("UDP search: no compatible protocol (tcp not accepted)");
629                    continue;
630                }
631                let all_names = state.store.list_pvs().await;
632                let visible_names = collect_visible_pv_names(
633                    &all_names,
634                    state.pvlist_mode,
635                    state.pvlist_allow_pattern.as_ref(),
636                    state.pvlist_max,
637                );
638                let mut cids = Vec::new();
639                for (cid, name) in &payload.pv_requests {
640                    if state.store.has_pv(name).await
641                        || is_virtual_event_pv(name)
642                        || (is_pvlist_virtual_pv(name) && state.pvlist_mode == PvListMode::List)
643                        || (is_server_rpc_pv(name) && state.pvlist_mode != PvListMode::Off)
644                    {
645                        cids.push(*cid);
646                        continue;
647                    }
648                    if state.pvlist_mode != PvListMode::Off
649                        && is_pattern_query(name)
650                        && visible_names.iter().any(|pv| wildcard_match(name, pv))
651                    {
652                        cids.push(*cid);
653                    }
654                }
655                let response_required = (payload.mask & 0x01) != 0;
656                let server_discovery_ping = payload.pv_requests.is_empty();
657                let found = server_discovery_ping || !cids.is_empty();
658                if !found && !response_required {
659                    debug!("UDP search: no matches and response not required");
660                    continue;
661                }
662                let resp_ip = if let Some(ip) = advertise_ip {
663                    ip
664                } else if !addr.ip().is_unspecified() {
665                    addr.ip()
666                } else if let Some(ip) = infer_udp_response_ip(peer) {
667                    debug!("UDP search: inferred response address {}", ip);
668                    ip
669                } else {
670                    IpAddr::V4(Ipv4Addr::UNSPECIFIED)
671                };
672                let addr_bytes = if resp_ip.is_unspecified() {
673                    debug!("UDP search: responding with zero address (unspecified listen)");
674                    [0u8; 16]
675                } else {
676                    ip_to_bytes(resp_ip)
677                };
678                let response = encode_search_response(
679                    guid,
680                    payload.seq,
681                    addr_bytes,
682                    tcp_port,
683                    "tcp",
684                    found,
685                    &cids,
686                    version,
687                    is_be,
688                );
689                let reply_target = search_reply_target(&payload.addr, payload.port, peer);
690                if let Err(e) = socket.send_to(&response, reply_target).await {
691                    debug!(
692                        "UDP search: failed sending {} matches to {}: {}",
693                        cids.len(),
694                        reply_target,
695                        e
696                    );
697                    continue;
698                }
699                debug!(
700                    "UDP search: responded found={} with {} matches to {}",
701                    found,
702                    cids.len(),
703                    reply_target
704                );
705            }
706            _ => {}
707        }
708    }
709}
710
711// ---------------------------------------------------------------------------
712// TCP server
713// ---------------------------------------------------------------------------
714
715/// Accept TCP connections and spawn a handler for each.
716pub async fn run_tcp_server<S: PvStore>(
717    state: Arc<ServerState<S>>,
718    addr: SocketAddr,
719    conn_timeout: Duration,
720) -> Result<(), Box<dyn std::error::Error>> {
721    let listener = TcpListener::bind(addr).await?;
722    let conn_id = Arc::new(std::sync::atomic::AtomicU64::new(1));
723
724    loop {
725        let (stream, peer) = listener.accept().await?;
726        let id = conn_id.fetch_add(1, Ordering::SeqCst);
727        info!("TCP connection {} from {}", id, peer);
728        let state_clone = state.clone();
729        tokio::spawn(async move {
730            if let Err(e) = handle_connection(state_clone, stream, id, conn_timeout).await {
731                error!("Connection {} error: {}", id, e);
732            }
733        });
734    }
735}
736
737// ---------------------------------------------------------------------------
738// Core TCP connection handler
739// ---------------------------------------------------------------------------
740
741/// Handle a single PVA TCP connection.
742///
743/// This is the main protocol loop: handshake, then dispatch each command
744/// (CreateChannel, GET, PUT, PUT_GET, MONITOR, RPC, etc.) using the
745/// [`PvStore`] abstraction.
746pub async fn handle_connection<S: PvStore>(
747    state: Arc<ServerState<S>>,
748    stream: TcpStream,
749    conn_id: u64,
750    conn_timeout: Duration,
751) -> Result<(), Box<dyn std::error::Error>> {
752    let (mut reader, mut writer) = stream.into_split();
753    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(128);
754
755    {
756        let mut conns = state.registry.conns.lock().await;
757        conns.insert(conn_id, tx);
758    }
759
760    let writer_task = tokio::spawn(async move {
761        while let Some(msg) = rx.recv().await {
762            if writer.write_all(&msg).await.is_err() {
763                break;
764            }
765        }
766    });
767
768    let mut conn_state = ConnState::default();
769
770    // Per EPICS PVA protocol: send SET_BYTE_ORDER control message before validation.
771    let set_byte_order = encode_control_message(true, false, 2, 2, 0);
772    validate_encoded_packet(conn_id, "set_byte_order", &set_byte_order);
773    dump_hex_packet(
774        conn_id,
775        "tx",
776        "ctrl=2 set_byte_order",
777        2,
778        false,
779        &set_byte_order,
780    );
781    state.registry.send_msg(conn_id, set_byte_order).await;
782
783    // Server sends Connection Validation (cmd=1) next.
784    let server_validation = encode_connection_validation(16_384, 512, 0, "anonymous", 2, false);
785    validate_encoded_packet(conn_id, "server_validation_init", &server_validation);
786    dump_hex_packet(
787        conn_id,
788        "tx",
789        "cmd=1 server_validation_init",
790        2,
791        false,
792        &server_validation,
793    );
794    state.registry.send_msg(conn_id, server_validation).await;
795
796    let mut last_activity = Instant::now();
797
798    loop {
799        let mut header = [0u8; 8];
800        let elapsed = last_activity.elapsed();
801        if elapsed >= conn_timeout {
802            info!("Conn {} idle timeout", conn_id);
803            break;
804        }
805        let remaining = conn_timeout - elapsed;
806        let read_header = tokio::time::timeout(remaining, reader.read_exact(&mut header)).await;
807        match read_header {
808            Ok(Ok(_)) => {}
809            Ok(Err(_)) => break,
810            Err(_) => {
811                info!("Conn {} idle timeout", conn_id);
812                break;
813            }
814        }
815        let header_pkt = PvaPacket::new(&header);
816        let payload_len = if header_pkt.header.flags.is_control {
817            0usize
818        } else {
819            header_pkt.header.payload_length as usize
820        };
821        let mut payload = vec![0u8; payload_len];
822        if payload_len > 0 {
823            let elapsed = last_activity.elapsed();
824            if elapsed >= conn_timeout {
825                info!("Conn {} idle timeout", conn_id);
826                break;
827            }
828            let remaining = conn_timeout - elapsed;
829            let read_payload =
830                tokio::time::timeout(remaining, reader.read_exact(&mut payload)).await;
831            match read_payload {
832                Ok(Ok(_)) => {}
833                Ok(Err(_)) => break,
834                Err(_) => {
835                    info!("Conn {} idle timeout", conn_id);
836                    break;
837                }
838            }
839        }
840        last_activity = Instant::now();
841        let mut full = header.to_vec();
842        full.extend_from_slice(&payload);
843
844        // Segmented message reassembly
845        if header_pkt.header.flags.is_segmented != 0 && !header_pkt.header.flags.is_control {
846            debug!(
847                "Conn {}: segmented message cmd={} seg=0x{:02x}",
848                conn_id, header_pkt.header.command, header_pkt.header.flags.is_segmented
849            );
850            let mut payloads = vec![payload];
851            let mut seg_flags = header_pkt.header.flags;
852            while !seg_flags.is_last_segment {
853                let mut seg_header = [0u8; 8];
854                let elapsed = last_activity.elapsed();
855                if elapsed >= conn_timeout {
856                    info!("Conn {} idle timeout", conn_id);
857                    break;
858                }
859                let remaining = conn_timeout - elapsed;
860                let read_header =
861                    tokio::time::timeout(remaining, reader.read_exact(&mut seg_header)).await;
862                match read_header {
863                    Ok(Ok(_)) => {}
864                    Ok(Err(_)) => break,
865                    Err(_) => {
866                        info!("Conn {} idle timeout", conn_id);
867                        break;
868                    }
869                }
870
871                let seg_header_pkt = PvaPacket::new(&seg_header);
872                let seg_payload_len = if seg_header_pkt.header.flags.is_control {
873                    0usize
874                } else {
875                    seg_header_pkt.header.payload_length as usize
876                };
877                let mut seg_payload = vec![0u8; seg_payload_len];
878                if seg_payload_len > 0 {
879                    let elapsed = last_activity.elapsed();
880                    if elapsed >= conn_timeout {
881                        info!("Conn {} idle timeout", conn_id);
882                        break;
883                    }
884                    let remaining = conn_timeout - elapsed;
885                    let read_payload =
886                        tokio::time::timeout(remaining, reader.read_exact(&mut seg_payload)).await;
887                    match read_payload {
888                        Ok(Ok(_)) => {}
889                        Ok(Err(_)) => break,
890                        Err(_) => {
891                            info!("Conn {} idle timeout", conn_id);
892                            break;
893                        }
894                    }
895                }
896                last_activity = Instant::now();
897
898                if seg_header_pkt.header.flags.is_control {
899                    handle_control_message(&state, conn_id, &seg_header_pkt.header).await;
900                    continue;
901                }
902                if seg_header_pkt.header.flags.is_segmented == 0 {
903                    debug!(
904                        "Conn {}: segmented message interrupted by non-segmented cmd={}",
905                        conn_id, seg_header_pkt.header.command
906                    );
907                    break;
908                }
909                payloads.push(seg_payload);
910                seg_flags = seg_header_pkt.header.flags;
911            }
912            full = assemble_segmented_message(header, payloads);
913        }
914
915        let mut pkt = PvaPacket::new(&full);
916        let Some(cmd) = pkt.decode_payload() else {
917            continue;
918        };
919        let version = pkt.header.version;
920        let is_be = pkt.header.flags.is_msb;
921        let cmd_code = pkt.header.command;
922        let payload_slice = if full.len() >= 8 { &full[8..] } else { &[] };
923
924        // Connection Validation (cmd=1): respond with CONNECTION_VALIDATED (cmd=9).
925        if cmd_code == 1 {
926            dump_hex_packet(conn_id, "rx", "cmd=1 validation", version, is_be, &full);
927            let validation = spvirit_codec::epics_decode::PvaConnectionValidationPayload::new(
928                payload_slice,
929                is_be,
930                false,
931            );
932            if let Some(val) = validation {
933                debug!(
934                    "Conn {}: validation request (cmd=1) ver={} be={} buf={} qos={} authz={:?}",
935                    conn_id, version, is_be, val.buffer_size, val.qos, val.authz
936                );
937                let resp = spvirit_codec::spvirit_encode::encode_connection_validated(
938                    true, version, is_be,
939                );
940                validate_encoded_packet(conn_id, "conn_validated_resp", &resp);
941                dump_hex_packet(
942                    conn_id,
943                    "tx",
944                    "cmd=9 connection_validated",
945                    version,
946                    is_be,
947                    &resp,
948                );
949                state.registry.send_msg(conn_id, resp).await;
950                continue;
951            }
952        }
953        if cmd_code == 17 {
954            dump_hex_packet(conn_id, "rx", "cmd=17 get_field", version, is_be, &full);
955        }
956
957        match cmd {
958            PvaPacketCommand::Control(payload) => {
959                debug!("Conn {}: control {}", conn_id, payload);
960                if payload.command == 3 {
961                    let resp = encode_control_message(true, is_be, version, 4, payload.data);
962                    state.registry.send_msg(conn_id, resp).await;
963                }
964                continue;
965            }
966            PvaPacketCommand::ConnectionValidation(_) => {
967                debug!("Conn {}: validation request (decoded)", conn_id);
968            }
969            PvaPacketCommand::ConnectionValidated(_) => {
970                debug!("Conn {}: validation confirmed (decoded)", conn_id);
971            }
972            PvaPacketCommand::CreateChannel(payload) => {
973                debug!(
974                    "Conn {}: create_channel count={}",
975                    conn_id,
976                    payload.channels.len()
977                );
978                for (cid, pv_name) in payload.channels {
979                    if has_pv(&state, &pv_name).await {
980                        let sid = state.sid_counter.fetch_add(1, Ordering::SeqCst);
981                        conn_state.cid_to_sid.insert(cid, sid);
982                        conn_state.sid_to_pv.insert(sid, pv_name.clone());
983                        let resp = encode_create_channel_response(cid, sid, version, is_be);
984                        state.registry.send_msg(conn_id, resp).await;
985                        info!(
986                            "Conn {}: channel '{}' cid={} sid={}",
987                            conn_id, pv_name, cid, sid
988                        );
989                    } else {
990                        let resp = encode_create_channel_error(cid, "PV not found", version, is_be);
991                        state.registry.send_msg(conn_id, resp).await;
992                        info!(
993                            "Conn {}: channel '{}' not found (cid={})",
994                            conn_id, pv_name, cid
995                        );
996                    }
997                }
998            }
999            PvaPacketCommand::Op(payload) => {
1000                if payload.is_server {
1001                    continue;
1002                }
1003                let sid = payload.sid_or_cid;
1004                let ioid = payload.ioid;
1005                debug!(
1006                    "Conn {}: op cmd={} ioid={} sid={} sub=0x{:02x} body_len={}",
1007                    conn_id,
1008                    payload.command,
1009                    ioid,
1010                    sid,
1011                    payload.subcmd,
1012                    payload.body.len()
1013                );
1014                let Some(pv_name) = conn_state.sid_to_pv.get(&sid).cloned() else {
1015                    state
1016                        .registry
1017                        .send_msg(
1018                            conn_id,
1019                            encode_op_error(
1020                                payload.command,
1021                                payload.subcmd,
1022                                ioid,
1023                                "Unknown SID",
1024                                version,
1025                                is_be,
1026                            ),
1027                        )
1028                        .await;
1029                    continue;
1030                };
1031
1032                let is_init = (payload.subcmd & 0x08) != 0;
1033
1034                match payload.command {
1035                    10 => {
1036                        // GET
1037                        let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1038                            state
1039                                .registry
1040                                .send_msg(
1041                                    conn_id,
1042                                    encode_op_error(
1043                                        payload.command,
1044                                        payload.subcmd,
1045                                        ioid,
1046                                        "PV not found",
1047                                        version,
1048                                        is_be,
1049                                    ),
1050                                )
1051                                .await;
1052                            continue;
1053                        };
1054                        if is_init {
1055                            let full_desc = nt_payload_desc(&nt);
1056                            let pv_req_fields = decode_pv_request_fields(&payload.body, is_be);
1057                            let desc = match &pv_req_fields {
1058                                Some(fields) => filter_structure_desc(&full_desc, fields),
1059                                None => full_desc,
1060                            };
1061                            conn_state.ioid_to_desc.insert(ioid, desc.clone());
1062                            conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1063                            let resp = encode_op_init_response_desc(
1064                                payload.command,
1065                                ioid,
1066                                0x08,
1067                                &desc,
1068                                version,
1069                                is_be,
1070                            );
1071                            state.registry.send_msg(conn_id, resp).await;
1072                            info!("Conn {}: get init pv='{}' ioid={}", conn_id, pv_name, ioid);
1073                        } else {
1074                            let resp = if let Some(desc) = conn_state.ioid_to_desc.get(&ioid) {
1075                                encode_op_data_response_filtered(
1076                                    10, ioid, &nt, desc, version, is_be,
1077                                )
1078                            } else {
1079                                encode_op_get_data_response_payload(ioid, &nt, version, is_be)
1080                            };
1081                            state.registry.send_msg(conn_id, resp).await;
1082                            debug!("Conn {}: get data pv='{}' ioid={}", conn_id, pv_name, ioid);
1083                        }
1084                    }
1085                    11 => {
1086                        // PUT
1087                        if is_init {
1088                            let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1089                                state
1090                                    .registry
1091                                    .send_msg(
1092                                        conn_id,
1093                                        encode_op_error(
1094                                            payload.command,
1095                                            payload.subcmd,
1096                                            ioid,
1097                                            "PV not found",
1098                                            version,
1099                                            is_be,
1100                                        ),
1101                                    )
1102                                    .await;
1103                                continue;
1104                            };
1105                            if !is_virtual_event_pv(&pv_name)
1106                                && !is_writable_pv(&state, &pv_name).await
1107                            {
1108                                let resp = encode_op_put_status_response(
1109                                    ioid,
1110                                    0x08,
1111                                    "Write access denied",
1112                                    version,
1113                                    is_be,
1114                                );
1115                                state.registry.send_msg(conn_id, resp).await;
1116                                continue;
1117                            }
1118                            let desc = nt_payload_desc(&nt);
1119                            conn_state.ioid_to_desc.insert(ioid, desc.clone());
1120                            conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1121                            let resp = encode_op_init_response_desc(
1122                                payload.command,
1123                                ioid,
1124                                0x08,
1125                                &desc,
1126                                version,
1127                                is_be,
1128                            );
1129                            state.registry.send_msg(conn_id, resp).await;
1130                            info!("Conn {}: put init pv='{}' ioid={}", conn_id, pv_name, ioid);
1131                        } else {
1132                            if (payload.subcmd & 0x40) != 0 {
1133                                if !is_virtual_event_pv(&pv_name)
1134                                    && !is_writable_pv(&state, &pv_name).await
1135                                {
1136                                    let resp = encode_op_put_status_response(
1137                                        ioid,
1138                                        0x40,
1139                                        "Write access denied",
1140                                        version,
1141                                        is_be,
1142                                    );
1143                                    state.registry.send_msg(conn_id, resp).await;
1144                                    continue;
1145                                }
1146                                if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1147                                    let resp = encode_op_put_getput_response_payload(
1148                                        ioid, &nt, version, is_be,
1149                                    );
1150                                    state.registry.send_msg(conn_id, resp).await;
1151                                    debug!(
1152                                        "Conn {}: put get-put pv='{}' ioid={}",
1153                                        conn_id, pv_name, ioid
1154                                    );
1155                                } else {
1156                                    state
1157                                        .registry
1158                                        .send_msg(
1159                                            conn_id,
1160                                            encode_op_error(
1161                                                payload.command,
1162                                                payload.subcmd,
1163                                                ioid,
1164                                                "PV not found",
1165                                                version,
1166                                                is_be,
1167                                            ),
1168                                        )
1169                                        .await;
1170                                }
1171                                continue;
1172                            }
1173                            let desc = match conn_state.ioid_to_desc.get(&ioid) {
1174                                Some(d) => d.clone(),
1175                                None => {
1176                                    state
1177                                        .registry
1178                                        .send_msg(
1179                                            conn_id,
1180                                            encode_op_error(
1181                                                payload.command,
1182                                                payload.subcmd,
1183                                                ioid,
1184                                                "PUT without init",
1185                                                version,
1186                                                is_be,
1187                                            ),
1188                                        )
1189                                        .await;
1190                                    continue;
1191                                }
1192                            };
1193                            let decoded = decode_put_body(&payload.body, &desc, is_be);
1194                            if let Some(value) = decoded.as_ref() {
1195                                match state.store.put_value(&pv_name, value).await {
1196                                    Ok(changed) => {
1197                                        notify_changed_records(&state, changed).await;
1198                                    }
1199                                    Err(msg) => {
1200                                        let resp = encode_op_put_status_response(
1201                                            ioid,
1202                                            payload.subcmd,
1203                                            &msg,
1204                                            version,
1205                                            is_be,
1206                                        );
1207                                        state.registry.send_msg(conn_id, resp).await;
1208                                        continue;
1209                                    }
1210                                }
1211                            } else {
1212                                debug!(
1213                                    "Conn {}: put decode failed ioid={} body_len={}",
1214                                    conn_id,
1215                                    ioid,
1216                                    payload.body.len()
1217                                );
1218                                let resp = encode_op_put_status_response(
1219                                    ioid,
1220                                    payload.subcmd,
1221                                    "cannot decode PUT body",
1222                                    version,
1223                                    is_be,
1224                                );
1225                                state.registry.send_msg(conn_id, resp).await;
1226                                continue;
1227                            }
1228                            let resp = encode_op_put_response(ioid, payload.subcmd, version, is_be);
1229                            state.registry.send_msg(conn_id, resp).await;
1230                            debug!("Conn {}: put data pv='{}' ioid={}", conn_id, pv_name, ioid);
1231                        }
1232                    }
1233                    12 => {
1234                        // PUT_GET
1235                        if is_init {
1236                            let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1237                                state
1238                                    .registry
1239                                    .send_msg(
1240                                        conn_id,
1241                                        encode_op_error(
1242                                            payload.command,
1243                                            payload.subcmd,
1244                                            ioid,
1245                                            "PV not found",
1246                                            version,
1247                                            is_be,
1248                                        ),
1249                                    )
1250                                    .await;
1251                                continue;
1252                            };
1253                            if !is_virtual_event_pv(&pv_name)
1254                                && !is_writable_pv(&state, &pv_name).await
1255                            {
1256                                let resp = encode_op_put_get_init_error_response(
1257                                    ioid,
1258                                    "Write access denied",
1259                                    version,
1260                                    is_be,
1261                                );
1262                                state.registry.send_msg(conn_id, resp).await;
1263                                continue;
1264                            }
1265                            let desc = nt_payload_desc(&nt);
1266                            conn_state.ioid_to_desc.insert(ioid, desc.clone());
1267                            conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1268                            let resp =
1269                                encode_op_put_get_init_response(ioid, &desc, &desc, version, is_be);
1270                            state.registry.send_msg(conn_id, resp).await;
1271                            info!(
1272                                "Conn {}: put_get init pv='{}' ioid={}",
1273                                conn_id, pv_name, ioid
1274                            );
1275                        } else {
1276                            let desc = match conn_state.ioid_to_desc.get(&ioid) {
1277                                Some(d) => d.clone(),
1278                                None => {
1279                                    state
1280                                        .registry
1281                                        .send_msg(
1282                                            conn_id,
1283                                            encode_op_error(
1284                                                payload.command,
1285                                                payload.subcmd,
1286                                                ioid,
1287                                                "PUT_GET without init",
1288                                                version,
1289                                                is_be,
1290                                            ),
1291                                        )
1292                                        .await;
1293                                    continue;
1294                                }
1295                            };
1296                            let decoded = decode_put_body(&payload.body, &desc, is_be);
1297                            if let Some(value) = decoded.as_ref() {
1298                                match state.store.put_value(&pv_name, value).await {
1299                                    Ok(changed) => {
1300                                        notify_changed_records(&state, changed).await;
1301                                    }
1302                                    Err(msg) => {
1303                                        let resp = encode_op_put_get_data_error_response(
1304                                            ioid, &msg, version, is_be,
1305                                        );
1306                                        state.registry.send_msg(conn_id, resp).await;
1307                                        continue;
1308                                    }
1309                                }
1310                            } else {
1311                                debug!(
1312                                    "Conn {}: put_get decode failed ioid={} body_len={}",
1313                                    conn_id,
1314                                    ioid,
1315                                    payload.body.len()
1316                                );
1317                                let resp = encode_op_put_get_data_error_response(
1318                                    ioid,
1319                                    "cannot decode PUT body",
1320                                    version,
1321                                    is_be,
1322                                );
1323                                state.registry.send_msg(conn_id, resp).await;
1324                                continue;
1325                            }
1326                            if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1327                                let resp = encode_op_put_get_data_response_payload(
1328                                    ioid, &nt, version, is_be,
1329                                );
1330                                state.registry.send_msg(conn_id, resp).await;
1331                            } else {
1332                                state
1333                                    .registry
1334                                    .send_msg(
1335                                        conn_id,
1336                                        encode_op_error(
1337                                            payload.command,
1338                                            payload.subcmd,
1339                                            ioid,
1340                                            "PV not found",
1341                                            version,
1342                                            is_be,
1343                                        ),
1344                                    )
1345                                    .await;
1346                            }
1347                            debug!(
1348                                "Conn {}: put_get data pv='{}' ioid={}",
1349                                conn_id, pv_name, ioid
1350                            );
1351                        }
1352                    }
1353                    13 => {
1354                        // MONITOR
1355                        if is_init {
1356                            let Some(nt) = get_nt_snapshot(&state, &pv_name).await else {
1357                                state
1358                                    .registry
1359                                    .send_msg(
1360                                        conn_id,
1361                                        encode_op_error(
1362                                            payload.command,
1363                                            payload.subcmd,
1364                                            ioid,
1365                                            "PV not found",
1366                                            version,
1367                                            is_be,
1368                                        ),
1369                                    )
1370                                    .await;
1371                                continue;
1372                            };
1373                            let full_desc = nt_payload_desc(&nt);
1374                            let pv_req_fields = decode_pv_request_fields(&payload.body, is_be);
1375                            let desc = match &pv_req_fields {
1376                                Some(fields) => filter_structure_desc(&full_desc, fields),
1377                                None => full_desc,
1378                            };
1379                            conn_state.ioid_to_desc.insert(ioid, desc.clone());
1380                            conn_state.ioid_to_pv.insert(ioid, pv_name.clone());
1381                            let pipeline_enabled = (payload.subcmd & 0x80) != 0;
1382                            let mut nfree = 0u32;
1383                            if pipeline_enabled && payload.body.len() >= 4 {
1384                                let start = payload.body.len() - 4;
1385                                nfree = if is_be {
1386                                    u32::from_be_bytes([
1387                                        payload.body[start],
1388                                        payload.body[start + 1],
1389                                        payload.body[start + 2],
1390                                        payload.body[start + 3],
1391                                    ])
1392                                } else {
1393                                    u32::from_le_bytes([
1394                                        payload.body[start],
1395                                        payload.body[start + 1],
1396                                        payload.body[start + 2],
1397                                        payload.body[start + 3],
1398                                    ])
1399                                };
1400                            }
1401                            let resp = encode_op_init_response_desc(
1402                                payload.command,
1403                                ioid,
1404                                0x08,
1405                                &desc,
1406                                version,
1407                                is_be,
1408                            );
1409                            state.registry.send_msg(conn_id, resp).await;
1410                            conn_state.ioid_to_monitor.insert(
1411                                ioid,
1412                                MonitorState {
1413                                    running: false,
1414                                    pipeline_enabled,
1415                                    nfree,
1416                                },
1417                            );
1418                            {
1419                                let mut monitors = state.registry.monitors.lock().await;
1420                                monitors
1421                                    .entry(pv_name.clone())
1422                                    .or_default()
1423                                    .push(MonitorSub {
1424                                        conn_id,
1425                                        ioid,
1426                                        version,
1427                                        is_be,
1428                                        running: false,
1429                                        pipeline_enabled,
1430                                        nfree,
1431                                        filtered_desc: conn_state.ioid_to_desc.get(&ioid).cloned(),
1432                                    });
1433                            }
1434                            info!(
1435                                "Conn {}: monitor init pv='{}' ioid={}",
1436                                conn_id, pv_name, ioid
1437                            );
1438                        } else if (payload.subcmd & 0x10) != 0 {
1439                            // Monitor destroy
1440                            if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1441                                let resp = encode_monitor_data_response_payload(
1442                                    ioid, 0x10, &nt, version, is_be,
1443                                );
1444                                state.registry.send_msg(conn_id, resp).await;
1445                            }
1446                            state
1447                                .registry
1448                                .remove_monitor_subscription(conn_id, ioid, &pv_name)
1449                                .await;
1450                            conn_state.ioid_to_monitor.remove(&ioid);
1451                            conn_state.ioid_to_pv.remove(&ioid);
1452                            conn_state.ioid_to_desc.remove(&ioid);
1453                            info!("Conn {}: monitor end ioid={}", conn_id, ioid);
1454                        } else if (payload.subcmd & 0x04) != 0 || (payload.subcmd & 0x80) != 0 {
1455                            // Monitor start/stop/pipeline-ack
1456                            let start = (payload.subcmd & 0x44) == 0x44;
1457                            let stop = (payload.subcmd & 0x44) == 0x04;
1458                            let pipeline_ack = (payload.subcmd & 0x80) != 0;
1459                            let mut nfree = None;
1460                            if pipeline_ack && payload.body.len() >= 4 {
1461                                let v = if is_be {
1462                                    u32::from_be_bytes([
1463                                        payload.body[0],
1464                                        payload.body[1],
1465                                        payload.body[2],
1466                                        payload.body[3],
1467                                    ])
1468                                } else {
1469                                    u32::from_le_bytes([
1470                                        payload.body[0],
1471                                        payload.body[1],
1472                                        payload.body[2],
1473                                        payload.body[3],
1474                                    ])
1475                                };
1476                                nfree = Some(v);
1477                            }
1478                            let running = if start {
1479                                true
1480                            } else if stop {
1481                                false
1482                            } else {
1483                                conn_state
1484                                    .ioid_to_monitor
1485                                    .get(&ioid)
1486                                    .map(|m| m.running)
1487                                    .unwrap_or(true)
1488                            };
1489                            state
1490                                .registry
1491                                .update_monitor_subscription(
1492                                    conn_id,
1493                                    ioid,
1494                                    &pv_name,
1495                                    running,
1496                                    nfree,
1497                                    Some(pipeline_ack),
1498                                )
1499                                .await;
1500                            if let Some(mon) = conn_state.ioid_to_monitor.get_mut(&ioid) {
1501                                mon.running = running;
1502                                if pipeline_ack {
1503                                    mon.pipeline_enabled = true;
1504                                }
1505                                if let Some(v) = nfree {
1506                                    if pipeline_ack {
1507                                        mon.nfree = mon.nfree.saturating_add(v);
1508                                    } else {
1509                                        mon.nfree = v;
1510                                    }
1511                                }
1512                            }
1513                            info!(
1514                                "Conn {}: monitor {} ioid={} ack={} nfree={:?}",
1515                                conn_id,
1516                                if start {
1517                                    "start"
1518                                } else if stop {
1519                                    "stop"
1520                                } else {
1521                                    "ack"
1522                                },
1523                                ioid,
1524                                pipeline_ack,
1525                                nfree
1526                            );
1527                            if start {
1528                                if let Some(nt) = get_nt_snapshot(&state, &pv_name).await {
1529                                    state
1530                                        .registry
1531                                        .send_monitor_update_for(&pv_name, conn_id, ioid, &nt)
1532                                        .await;
1533                                }
1534                            }
1535                        }
1536                    }
1537                    20 => {
1538                        // RPC
1539                        if is_server_rpc_pv(&pv_name) {
1540                            handle_server_rpc(
1541                                &state,
1542                                conn_id,
1543                                ioid,
1544                                payload.subcmd,
1545                                version,
1546                                is_be,
1547                            )
1548                            .await;
1549                        } else {
1550                            state
1551                                .registry
1552                                .send_msg(
1553                                    conn_id,
1554                                    encode_op_error(
1555                                        payload.command,
1556                                        payload.subcmd,
1557                                        ioid,
1558                                        "Operation not supported",
1559                                        version,
1560                                        is_be,
1561                                    ),
1562                                )
1563                                .await;
1564                        }
1565                    }
1566                    14 | 16 => {
1567                        state
1568                            .registry
1569                            .send_msg(
1570                                conn_id,
1571                                encode_op_error(
1572                                    payload.command,
1573                                    payload.subcmd,
1574                                    ioid,
1575                                    "Operation not supported",
1576                                    version,
1577                                    is_be,
1578                                ),
1579                            )
1580                            .await;
1581                    }
1582                    _ => {
1583                        state
1584                            .registry
1585                            .send_msg(
1586                                conn_id,
1587                                encode_op_error(
1588                                    payload.command,
1589                                    payload.subcmd,
1590                                    ioid,
1591                                    "Operation not supported",
1592                                    version,
1593                                    is_be,
1594                                ),
1595                            )
1596                            .await;
1597                    }
1598                }
1599            }
1600            PvaPacketCommand::DestroyChannel(payload) => {
1601                let sid = payload.sid;
1602                let cid = payload.cid;
1603                conn_state.cid_to_sid.remove(&cid);
1604                conn_state.sid_to_pv.remove(&sid);
1605                info!(
1606                    "Conn {}: channel destroyed sid={} cid={}",
1607                    conn_id, sid, cid
1608                );
1609            }
1610            PvaPacketCommand::DestroyRequest(payload) => {
1611                let ioid = payload.request_id;
1612                if let Some(pv_name) = conn_state.ioid_to_pv.remove(&ioid) {
1613                    state
1614                        .registry
1615                        .remove_monitor_subscription(conn_id, ioid, &pv_name)
1616                        .await;
1617                    conn_state.ioid_to_desc.remove(&ioid);
1618                    conn_state.ioid_to_monitor.remove(&ioid);
1619                    info!("Conn {}: monitor unsubscribed ioid={}", conn_id, ioid);
1620                }
1621            }
1622            PvaPacketCommand::AuthNZ(_) => {
1623                let resp = encode_message_error("AUTHNZ command is not supported", version, is_be);
1624                state.registry.send_msg(conn_id, resp).await;
1625            }
1626            PvaPacketCommand::AclChange(_) => {
1627                let resp =
1628                    encode_message_error("ACL_CHANGE command is not supported", version, is_be);
1629                state.registry.send_msg(conn_id, resp).await;
1630            }
1631            PvaPacketCommand::GetField(payload) => {
1632                handle_get_field_request(&state, &conn_state, conn_id, payload, version, is_be)
1633                    .await;
1634            }
1635            PvaPacketCommand::Echo(payload_bytes) => {
1636                let mut resp =
1637                    encode_header(true, is_be, false, version, 2, payload_bytes.len() as u32);
1638                resp.extend_from_slice(&payload_bytes);
1639                state.registry.send_msg(conn_id, resp).await;
1640            }
1641            PvaPacketCommand::Message(_) => {
1642                let resp = encode_message_error("MESSAGE command is not supported", version, is_be);
1643                state.registry.send_msg(conn_id, resp).await;
1644            }
1645            PvaPacketCommand::MultipleData(_) => {
1646                let resp =
1647                    encode_message_error("MULTIPLE_DATA command is not supported", version, is_be);
1648                state.registry.send_msg(conn_id, resp).await;
1649            }
1650            PvaPacketCommand::CancelRequest(_) => {
1651                let resp =
1652                    encode_message_error("CANCEL_REQUEST command is not supported", version, is_be);
1653                state.registry.send_msg(conn_id, resp).await;
1654            }
1655            PvaPacketCommand::OriginTag(_) => {
1656                let resp =
1657                    encode_message_error("ORIGIN_TAG command is not supported", version, is_be);
1658                state.registry.send_msg(conn_id, resp).await;
1659            }
1660            PvaPacketCommand::Search(_)
1661            | PvaPacketCommand::SearchResponse(_)
1662            | PvaPacketCommand::Beacon(_) => {
1663                let resp =
1664                    encode_message_error("Unexpected command for server endpoint", version, is_be);
1665                state.registry.send_msg(conn_id, resp).await;
1666            }
1667            PvaPacketCommand::Unknown(payload) => {
1668                let resp = encode_message_error(
1669                    &format!("Unknown command {}", payload.command),
1670                    version,
1671                    is_be,
1672                );
1673                state.registry.send_msg(conn_id, resp).await;
1674            }
1675        }
1676    }
1677
1678    state.registry.cleanup_connection(conn_id).await;
1679    let _ = writer_task.await;
1680    Ok(())
1681}