Skip to main content

bacnet_server/
server.rs

1//! BACnetServer: builder, APDU dispatch, and lifecycle management.
2//!
3//! The server wraps a NetworkLayer behind Arc (shared with the dispatch task),
4//! owns an ObjectDatabase via Arc<Mutex>, and spawns a dispatch task that
5//! routes incoming APDUs to service handlers.
6
7use std::collections::HashMap;
8use std::net::Ipv4Addr;
9use std::sync::atomic::{AtomicU8, Ordering};
10use std::sync::Arc;
11use std::time::Instant;
12
13use bytes::{Bytes, BytesMut};
14use tokio::sync::{mpsc, oneshot, Mutex, RwLock, Semaphore};
15use tokio::task::JoinHandle;
16use tokio::time::Duration;
17use tracing::{debug, warn};
18
19use bacnet_encoding::apdu::{
20    self, encode_apdu, AbortPdu, Apdu, ComplexAck, ConfirmedRequest as ConfirmedRequestPdu,
21    ErrorPdu, RejectPdu, SegmentAck as SegmentAckPdu, SimpleAck,
22    UnconfirmedRequest as UnconfirmedRequestPdu,
23};
24use bacnet_encoding::primitives::encode_property_value;
25use bacnet_encoding::segmentation::{
26    max_segment_payload, split_payload, SegmentReceiver, SegmentedPduType,
27};
28use bacnet_network::layer::NetworkLayer;
29use bacnet_objects::database::ObjectDatabase;
30use bacnet_objects::notification_class::get_notification_recipients;
31use bacnet_services::alarm_event::EventNotificationRequest;
32use bacnet_services::common::BACnetPropertyValue;
33use bacnet_services::cov::COVNotificationRequest;
34use bacnet_services::who_is::{IAmRequest, WhoIsRequest};
35use bacnet_transport::bip::BipTransport;
36use bacnet_transport::port::TransportPort;
37use bacnet_types::enums::{
38    AbortReason, ConfirmedServiceChoice, ErrorClass, ErrorCode, NetworkPriority, NotifyType,
39    ObjectType, PropertyIdentifier, RejectReason, Segmentation, UnconfirmedServiceChoice,
40};
41use bacnet_types::error::Error;
42use bacnet_types::primitives::{BACnetTimeStamp, ObjectIdentifier, PropertyValue, Time};
43use bacnet_types::MacAddr;
44
45use crate::cov::CovSubscriptionTable;
46use crate::handlers;
47
48/// Maximum number of concurrent segmented reassembly sessions.
49const MAX_SEG_RECEIVERS: usize = 128;
50
51/// Timeout for idle segmented reassembly sessions.
52const SEG_RECEIVER_TIMEOUT: Duration = Duration::from_secs(4);
53
54/// Maximum negative SegmentAck retries during segmented response send.
55const MAX_NEG_SEGMENT_ACK_RETRIES: u8 = 3;
56
57/// Default number of APDU retries for confirmed COV notifications.
58const DEFAULT_APDU_RETRIES: u8 = 3;
59
60// ---------------------------------------------------------------------------
61// Server-side Transaction State Machine (TSM) for outgoing confirmed requests
62// ---------------------------------------------------------------------------
63
64/// Result of a confirmed COV notification from the subscriber's perspective.
65#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub enum CovAckResult {
67    /// SimpleAck received — subscriber accepted the notification.
68    Ack,
69    /// Error or Reject/Abort received — subscriber rejected the notification.
70    Error,
71}
72
73/// Lightweight TSM for tracking outgoing confirmed COV notifications.
74///
75/// The server allocates an invoke ID for each confirmed notification and the
76/// dispatch loop writes the result into a shared map when a SimpleAck, Error,
77/// Reject, or Abort is received.  The per-subscriber retry task polls the map
78/// after each timeout to decide whether to resend.
79pub struct ServerTsm {
80    next_invoke_id: u8,
81    /// Oneshot senders keyed by invoke ID. When a result arrives from the
82    /// dispatch loop, we send it directly — no polling needed.
83    pending: HashMap<u8, oneshot::Sender<CovAckResult>>,
84}
85
86impl ServerTsm {
87    fn new() -> Self {
88        Self {
89            next_invoke_id: 0,
90            pending: HashMap::new(),
91        }
92    }
93
94    /// Allocate the next invoke ID and register a oneshot channel for the result.
95    /// Returns (invoke_id, receiver).
96    fn allocate(&mut self) -> (u8, oneshot::Receiver<CovAckResult>) {
97        let id = self.next_invoke_id;
98        self.next_invoke_id = self.next_invoke_id.wrapping_add(1);
99        let (tx, rx) = oneshot::channel();
100        self.pending.insert(id, tx);
101        (id, rx)
102    }
103
104    /// Record a result from the dispatch loop (SimpleAck, Error, etc.).
105    /// Sends immediately through the oneshot channel.
106    fn record_result(&mut self, invoke_id: u8, result: CovAckResult) {
107        if let Some(tx) = self.pending.remove(&invoke_id) {
108            let _ = tx.send(result);
109        }
110    }
111
112    /// Remove a pending entry (cleanup on completion or exhaustion).
113    fn remove(&mut self, invoke_id: u8) {
114        self.pending.remove(&invoke_id);
115    }
116}
117
118/// Data from a TimeSynchronization request.
119#[derive(Debug, Clone)]
120pub struct TimeSyncData {
121    /// Raw service request bytes (caller can decode if needed).
122    pub raw_service_data: Bytes,
123    /// Whether this was a UTC time sync (vs. local).
124    pub is_utc: bool,
125}
126
127/// Server configuration.
128#[derive(Clone)]
129pub struct ServerConfig {
130    /// Local interface to bind.
131    pub interface: Ipv4Addr,
132    /// UDP port (default 0xBAC0 = 47808).
133    pub port: u16,
134    /// Directed broadcast address.
135    pub broadcast_address: Ipv4Addr,
136    /// Maximum APDU length accepted.
137    pub max_apdu_length: u32,
138    /// Segmentation support level.
139    pub segmentation_supported: Segmentation,
140    /// Vendor identifier.
141    pub vendor_id: u16,
142    /// Timeout in ms before retrying a failed confirmed COV notification send (default 3000ms).
143    pub cov_retry_timeout_ms: u64,
144    /// Optional callback invoked when a TimeSynchronization request is received.
145    pub on_time_sync: Option<Arc<dyn Fn(TimeSyncData) + Send + Sync>>,
146    /// Optional password required for DeviceCommunicationControl.
147    pub dcc_password: Option<String>,
148    /// Optional password required for ReinitializeDevice.
149    pub reinit_password: Option<String>,
150    /// Enable periodic fault detection / reliability evaluation.
151    /// When true, the server evaluates analog objects every 10 s for
152    /// OVER_RANGE / UNDER_RANGE faults.
153    pub enable_fault_detection: bool,
154}
155
156impl std::fmt::Debug for ServerConfig {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        f.debug_struct("ServerConfig")
159            .field("interface", &self.interface)
160            .field("port", &self.port)
161            .field("broadcast_address", &self.broadcast_address)
162            .field("max_apdu_length", &self.max_apdu_length)
163            .field("segmentation_supported", &self.segmentation_supported)
164            .field("vendor_id", &self.vendor_id)
165            .field("cov_retry_timeout_ms", &self.cov_retry_timeout_ms)
166            .field(
167                "on_time_sync",
168                &self.on_time_sync.as_ref().map(|_| "<callback>"),
169            )
170            .field("dcc_password", &self.dcc_password.as_ref().map(|_| "***"))
171            .field(
172                "reinit_password",
173                &self.reinit_password.as_ref().map(|_| "***"),
174            )
175            .field("enable_fault_detection", &self.enable_fault_detection)
176            .finish()
177    }
178}
179
180impl Default for ServerConfig {
181    fn default() -> Self {
182        Self {
183            interface: Ipv4Addr::UNSPECIFIED,
184            port: 0xBAC0,
185            broadcast_address: Ipv4Addr::BROADCAST,
186            max_apdu_length: 1476,
187            segmentation_supported: Segmentation::NONE,
188            vendor_id: 0,
189            cov_retry_timeout_ms: 3000,
190            on_time_sync: None,
191            dcc_password: None,
192            reinit_password: None,
193            enable_fault_detection: false,
194        }
195    }
196}
197
198/// Generic builder for BACnetServer with a pre-built transport.
199pub struct ServerBuilder<T: TransportPort> {
200    config: ServerConfig,
201    db: ObjectDatabase,
202    transport: Option<T>,
203}
204
205impl<T: TransportPort + 'static> ServerBuilder<T> {
206    /// Set the object database (transfers ownership).
207    pub fn database(mut self, db: ObjectDatabase) -> Self {
208        self.db = db;
209        self
210    }
211
212    /// Set the pre-built transport.
213    pub fn transport(mut self, transport: T) -> Self {
214        self.transport = Some(transport);
215        self
216    }
217
218    /// Set the password required for DeviceCommunicationControl requests.
219    pub fn dcc_password(mut self, password: impl Into<String>) -> Self {
220        self.config.dcc_password = Some(password.into());
221        self
222    }
223
224    /// Set the password required for ReinitializeDevice requests.
225    pub fn reinit_password(mut self, password: impl Into<String>) -> Self {
226        self.config.reinit_password = Some(password.into());
227        self
228    }
229
230    /// Enable periodic fault detection / reliability evaluation.
231    pub fn enable_fault_detection(mut self, enabled: bool) -> Self {
232        self.config.enable_fault_detection = enabled;
233        self
234    }
235
236    /// Set the vendor identifier (used in IAm responses and protocol operations).
237    pub fn vendor_id(mut self, id: u16) -> Self {
238        self.config.vendor_id = id;
239        self
240    }
241
242    /// Build and start the server.
243    pub async fn build(self) -> Result<BACnetServer<T>, Error> {
244        let transport = self
245            .transport
246            .ok_or_else(|| Error::Encoding("transport not set on ServerBuilder".into()))?;
247        BACnetServer::start(self.config, self.db, transport).await
248    }
249}
250
251/// BIP-specific builder that constructs `BipTransport` from interface/port/broadcast fields.
252pub struct BipServerBuilder {
253    config: ServerConfig,
254    db: ObjectDatabase,
255}
256
257impl BipServerBuilder {
258    /// Set the local interface IP.
259    pub fn interface(mut self, ip: Ipv4Addr) -> Self {
260        self.config.interface = ip;
261        self
262    }
263
264    /// Set the UDP port.
265    pub fn port(mut self, port: u16) -> Self {
266        self.config.port = port;
267        self
268    }
269
270    /// Set the directed broadcast address.
271    pub fn broadcast_address(mut self, addr: Ipv4Addr) -> Self {
272        self.config.broadcast_address = addr;
273        self
274    }
275
276    /// Set the object database (transfers ownership).
277    pub fn database(mut self, db: ObjectDatabase) -> Self {
278        self.db = db;
279        self
280    }
281
282    /// Set the password required for DeviceCommunicationControl requests.
283    pub fn dcc_password(mut self, password: impl Into<String>) -> Self {
284        self.config.dcc_password = Some(password.into());
285        self
286    }
287
288    /// Set the password required for ReinitializeDevice requests.
289    pub fn reinit_password(mut self, password: impl Into<String>) -> Self {
290        self.config.reinit_password = Some(password.into());
291        self
292    }
293
294    /// Enable periodic fault detection / reliability evaluation.
295    pub fn enable_fault_detection(mut self, enabled: bool) -> Self {
296        self.config.enable_fault_detection = enabled;
297        self
298    }
299
300    /// Build and start the server, constructing a BipTransport from the config.
301    pub async fn build(self) -> Result<BACnetServer<BipTransport>, Error> {
302        let transport = BipTransport::new(
303            self.config.interface,
304            self.config.port,
305            self.config.broadcast_address,
306        );
307        BACnetServer::start(self.config, self.db, transport).await
308    }
309}
310
311/// Key for tracking in-progress segmented sends: (source_mac, invoke_id).
312type SegKey = (MacAddr, u8);
313
314/// BACnet server with APDU dispatch and service handling.
315pub struct BACnetServer<T: TransportPort> {
316    #[allow(dead_code)]
317    config: ServerConfig,
318    /// Shared network layer (also held by dispatch task).
319    #[allow(dead_code)]
320    network: Arc<NetworkLayer<T>>,
321    /// Shared object database.
322    db: Arc<RwLock<ObjectDatabase>>,
323    /// COV subscription table (held to keep Arc alive for dispatch task).
324    #[allow(dead_code)]
325    cov_table: Arc<RwLock<CovSubscriptionTable>>,
326    /// Channels for routing SegmentAck PDUs to in-progress segmented sends.
327    #[allow(dead_code)]
328    seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
329    /// Semaphore that caps confirmed COV notifications at 255 in-flight
330    /// to prevent invoke ID reuse (invoke IDs are u8 = 0..255).
331    #[allow(dead_code)]
332    cov_in_flight: Arc<Semaphore>,
333    /// Server-side TSM for outgoing confirmed COV notifications.
334    #[allow(dead_code)]
335    server_tsm: Arc<Mutex<ServerTsm>>,
336    /// Communication state: 0 = Enable, 1 = Disable, 2 = DisableInitiation.
337    comm_state: Arc<AtomicU8>,
338    /// Handle for the DCC auto-re-enable timer. A new DCC request aborts
339    /// any previous timer.
340    #[allow(dead_code)]
341    dcc_timer: Arc<Mutex<Option<JoinHandle<()>>>>,
342    dispatch_task: Option<JoinHandle<()>>,
343    cov_purge_task: Option<JoinHandle<()>>,
344    fault_detection_task: Option<JoinHandle<()>>,
345    event_enrollment_task: Option<JoinHandle<()>>,
346    trend_log_task: Option<JoinHandle<()>>,
347    schedule_tick_task: Option<JoinHandle<()>>,
348    local_mac: MacAddr,
349}
350
351impl BACnetServer<BipTransport> {
352    /// Create a BIP-specific builder with interface/port/broadcast fields.
353    pub fn bip_builder() -> BipServerBuilder {
354        BipServerBuilder {
355            config: ServerConfig::default(),
356            db: ObjectDatabase::new(),
357        }
358    }
359
360    /// Create a BIP-specific builder (alias for backward compatibility).
361    pub fn builder() -> BipServerBuilder {
362        Self::bip_builder()
363    }
364}
365
366#[cfg(feature = "sc-tls")]
367impl BACnetServer<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>> {
368    /// Create an SC-specific builder that connects to a BACnet/SC hub.
369    pub fn sc_builder() -> ScServerBuilder {
370        ScServerBuilder {
371            config: ServerConfig::default(),
372            db: ObjectDatabase::new(),
373            hub_url: String::new(),
374            tls_config: None,
375            vmac: [0; 6],
376            heartbeat_interval_ms: 30_000,
377            heartbeat_timeout_ms: 60_000,
378            reconnect: None,
379        }
380    }
381}
382
383/// SC-specific server builder.
384///
385/// Created by [`BACnetServer::sc_builder()`].  Requires the `sc-tls` feature.
386#[cfg(feature = "sc-tls")]
387pub struct ScServerBuilder {
388    config: ServerConfig,
389    db: ObjectDatabase,
390    hub_url: String,
391    tls_config: Option<std::sync::Arc<tokio_rustls::rustls::ClientConfig>>,
392    vmac: bacnet_transport::sc_frame::Vmac,
393    heartbeat_interval_ms: u64,
394    heartbeat_timeout_ms: u64,
395    reconnect: Option<bacnet_transport::sc::ScReconnectConfig>,
396}
397
398#[cfg(feature = "sc-tls")]
399impl ScServerBuilder {
400    /// Set the hub WebSocket URL (e.g. `wss://hub.example.com/bacnet`).
401    pub fn hub_url(mut self, url: &str) -> Self {
402        self.hub_url = url.to_string();
403        self
404    }
405
406    /// Set the TLS client configuration.
407    pub fn tls_config(
408        mut self,
409        config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
410    ) -> Self {
411        self.tls_config = Some(config);
412        self
413    }
414
415    /// Set the local VMAC address.
416    pub fn vmac(mut self, vmac: [u8; 6]) -> Self {
417        self.vmac = vmac;
418        self
419    }
420
421    /// Set the object database (transfers ownership).
422    pub fn database(mut self, db: ObjectDatabase) -> Self {
423        self.db = db;
424        self
425    }
426
427    /// Set the heartbeat interval in milliseconds (default 30 000).
428    pub fn heartbeat_interval_ms(mut self, ms: u64) -> Self {
429        self.heartbeat_interval_ms = ms;
430        self
431    }
432
433    /// Set the heartbeat timeout in milliseconds (default 60 000).
434    pub fn heartbeat_timeout_ms(mut self, ms: u64) -> Self {
435        self.heartbeat_timeout_ms = ms;
436        self
437    }
438
439    /// Enable automatic reconnection with the given configuration.
440    pub fn reconnect(mut self, config: bacnet_transport::sc::ScReconnectConfig) -> Self {
441        self.reconnect = Some(config);
442        self
443    }
444
445    /// Set the password required for DeviceCommunicationControl requests.
446    pub fn dcc_password(mut self, password: impl Into<String>) -> Self {
447        self.config.dcc_password = Some(password.into());
448        self
449    }
450
451    /// Set the password required for ReinitializeDevice requests.
452    pub fn reinit_password(mut self, password: impl Into<String>) -> Self {
453        self.config.reinit_password = Some(password.into());
454        self
455    }
456
457    /// Enable periodic fault detection / reliability evaluation.
458    pub fn enable_fault_detection(mut self, enabled: bool) -> Self {
459        self.config.enable_fault_detection = enabled;
460        self
461    }
462
463    /// Connect to the hub and start the server.
464    pub async fn build(
465        self,
466    ) -> Result<
467        BACnetServer<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>>,
468        Error,
469    > {
470        let tls_config = self
471            .tls_config
472            .ok_or_else(|| Error::Encoding("SC server builder: tls_config is required".into()))?;
473
474        let ws = bacnet_transport::sc_tls::TlsWebSocket::connect(&self.hub_url, tls_config).await?;
475
476        let mut transport = bacnet_transport::sc::ScTransport::new(ws, self.vmac)
477            .with_heartbeat_interval_ms(self.heartbeat_interval_ms)
478            .with_heartbeat_timeout_ms(self.heartbeat_timeout_ms);
479        if let Some(rc) = self.reconnect {
480            #[allow(deprecated)]
481            {
482                transport = transport.with_reconnect(rc);
483            }
484        }
485
486        BACnetServer::start(self.config, self.db, transport).await
487    }
488}
489
490impl<T: TransportPort + 'static> BACnetServer<T> {
491    /// Create a generic builder that accepts a pre-built transport.
492    pub fn generic_builder() -> ServerBuilder<T> {
493        ServerBuilder {
494            config: ServerConfig::default(),
495            db: ObjectDatabase::new(),
496            transport: None,
497        }
498    }
499
500    /// Start the server with a pre-built transport.
501    pub async fn start(
502        mut config: ServerConfig,
503        db: ObjectDatabase,
504        transport: T,
505    ) -> Result<Self, Error> {
506        let transport_max = transport.max_apdu_length() as u32;
507        config.max_apdu_length = config.max_apdu_length.min(transport_max);
508
509        if config.vendor_id == 0 {
510            warn!("vendor_id is 0 (ASHRAE reserved); set a valid vendor ID for production use");
511        }
512
513        let mut network = NetworkLayer::new(transport);
514        let apdu_rx = network.start().await?;
515        let local_mac = MacAddr::from_slice(network.local_mac());
516
517        let network = Arc::new(network);
518        let db = Arc::new(RwLock::new(db));
519        let cov_table = Arc::new(RwLock::new(CovSubscriptionTable::new()));
520        let seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>> =
521            Arc::new(Mutex::new(HashMap::new()));
522
523        let cov_in_flight = Arc::new(Semaphore::new(255));
524        let server_tsm = Arc::new(Mutex::new(ServerTsm::new()));
525        let comm_state = Arc::new(AtomicU8::new(0)); // 0 = Enable (default)
526        let dcc_timer: Arc<Mutex<Option<JoinHandle<()>>>> = Arc::new(Mutex::new(None));
527
528        let network_dispatch = Arc::clone(&network);
529        let db_dispatch = Arc::clone(&db);
530        let cov_dispatch = Arc::clone(&cov_table);
531        let seg_ack_dispatch = Arc::clone(&seg_ack_senders);
532        let cov_in_flight_dispatch = Arc::clone(&cov_in_flight);
533        let server_tsm_dispatch = Arc::clone(&server_tsm);
534        let comm_state_dispatch = Arc::clone(&comm_state);
535        let dcc_timer_dispatch = Arc::clone(&dcc_timer);
536        let config_dispatch = Arc::new(config.clone());
537
538        let dispatch_task = tokio::spawn(async move {
539            let mut apdu_rx = apdu_rx;
540            let mut seg_receivers: HashMap<
541                SegKey,
542                (
543                    SegmentReceiver,
544                    bacnet_encoding::apdu::ConfirmedRequest,
545                    Instant,
546                ),
547            > = HashMap::new();
548
549            while let Some(received) = apdu_rx.recv().await {
550                let now = Instant::now();
551                seg_receivers.retain(|_key, (_rx, _req, last_activity)| {
552                    now.duration_since(*last_activity) < SEG_RECEIVER_TIMEOUT
553                });
554
555                match apdu::decode_apdu(received.apdu.clone()) {
556                    Ok(decoded) => {
557                        let source_mac = received.source_mac.clone();
558                        let mut received = Some(received);
559                        let handled = if let Apdu::ConfirmedRequest(ref req) = decoded {
560                            if req.segmented {
561                                let seq = req.sequence_number.unwrap_or(0);
562                                let key: SegKey = (source_mac.clone(), req.invoke_id);
563
564                                if seq == 0 {
565                                    // Reject if too many concurrent segmented sessions
566                                    if seg_receivers.len() >= MAX_SEG_RECEIVERS {
567                                        let abort_pdu = Apdu::Abort(AbortPdu {
568                                            sent_by_server: true,
569                                            invoke_id: req.invoke_id,
570                                            abort_reason: AbortReason::BUFFER_OVERFLOW,
571                                        });
572                                        let mut abort_buf = BytesMut::new();
573                                        encode_apdu(&mut abort_buf, &abort_pdu);
574                                        let _ = network_dispatch
575                                            .send_apdu(
576                                                &abort_buf,
577                                                &source_mac,
578                                                false,
579                                                NetworkPriority::NORMAL,
580                                            )
581                                            .await;
582                                        continue;
583                                    }
584                                    let mut receiver = SegmentReceiver::new();
585                                    if let Err(e) =
586                                        receiver.receive(seq, req.service_request.clone())
587                                    {
588                                        warn!(error = %e, "Rejecting oversized segment");
589                                        continue;
590                                    }
591                                    seg_receivers.insert(
592                                        key.clone(),
593                                        (receiver, req.clone(), Instant::now()),
594                                    );
595                                } else if let Some((receiver, _, last_activity)) =
596                                    seg_receivers.get_mut(&key)
597                                {
598                                    if let Err(e) =
599                                        receiver.receive(seq, req.service_request.clone())
600                                    {
601                                        warn!(error = %e, "Rejecting oversized segment");
602                                        continue;
603                                    }
604                                    *last_activity = Instant::now();
605                                } else {
606                                    warn!(
607                                        invoke_id = req.invoke_id,
608                                        seq = seq,
609                                        "Received non-initial segment without \
610                                         prior segment 0, aborting"
611                                    );
612                                    let abort_pdu = Apdu::Abort(AbortPdu {
613                                        sent_by_server: true,
614                                        invoke_id: req.invoke_id,
615                                        abort_reason: AbortReason::INVALID_APDU_IN_THIS_STATE,
616                                    });
617                                    let mut abort_buf = BytesMut::new();
618                                    encode_apdu(&mut abort_buf, &abort_pdu);
619                                    let _ = network_dispatch
620                                        .send_apdu(
621                                            &abort_buf,
622                                            &source_mac,
623                                            false,
624                                            NetworkPriority::NORMAL,
625                                        )
626                                        .await;
627                                    continue;
628                                }
629
630                                let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
631                                    negative_ack: false,
632                                    sent_by_server: true,
633                                    invoke_id: req.invoke_id,
634                                    sequence_number: seq,
635                                    actual_window_size: 1,
636                                });
637                                let mut ack_buf = BytesMut::new();
638                                encode_apdu(&mut ack_buf, &seg_ack);
639                                if let Err(e) = network_dispatch
640                                    .send_apdu(
641                                        &ack_buf,
642                                        &source_mac,
643                                        false,
644                                        NetworkPriority::NORMAL,
645                                    )
646                                    .await
647                                {
648                                    warn!(
649                                        error = %e,
650                                        "Failed to send SegmentAck for \
651                                         segmented request"
652                                    );
653                                }
654
655                                if !req.more_follows {
656                                    if let Some((receiver, first_req, _)) =
657                                        seg_receivers.remove(&key)
658                                    {
659                                        let total = receiver.received_count();
660                                        match receiver.reassemble(total) {
661                                            Ok(full_data) => {
662                                                let reassembled =
663                                                    bacnet_encoding::apdu::ConfirmedRequest {
664                                                        segmented: false,
665                                                        more_follows: false,
666                                                        sequence_number: None,
667                                                        proposed_window_size: None,
668                                                        service_request: Bytes::from(full_data),
669                                                        invoke_id: first_req.invoke_id,
670                                                        service_choice: first_req.service_choice,
671                                                        max_apdu_length: first_req.max_apdu_length,
672                                                        segmented_response_accepted: first_req
673                                                            .segmented_response_accepted,
674                                                        max_segments: first_req.max_segments,
675                                                    };
676                                                debug!(
677                                                    invoke_id = reassembled.invoke_id,
678                                                    segments = total,
679                                                    payload_len = reassembled.service_request.len(),
680                                                    "Reassembled segmented ConfirmedRequest"
681                                                );
682                                                Self::dispatch(
683                                                    &db_dispatch,
684                                                    &network_dispatch,
685                                                    &cov_dispatch,
686                                                    &seg_ack_dispatch,
687                                                    &cov_in_flight_dispatch,
688                                                    &server_tsm_dispatch,
689                                                    &comm_state_dispatch,
690                                                    &dcc_timer_dispatch,
691                                                    &config_dispatch,
692                                                    &source_mac,
693                                                    Apdu::ConfirmedRequest(reassembled),
694                                                    received
695                                                        .take()
696                                                        .unwrap_or_else(|| {
697                                                        warn!("received consumed twice — using empty fallback");
698                                                        bacnet_network::layer::ReceivedApdu {
699                                                            apdu: bytes::Bytes::new(),
700                                                            source_mac: bacnet_types::MacAddr::new(),
701                                                            source_network: None,
702                                                            reply_tx: None,
703                                                        }
704                                                    }),
705                                                )
706                                                .await;
707                                            }
708                                            Err(e) => {
709                                                warn!(
710                                                    error = %e,
711                                                    "Failed to reassemble \
712                                                     segmented request"
713                                                );
714                                            }
715                                        }
716                                    }
717                                }
718
719                                true
720                            } else {
721                                false
722                            }
723                        } else {
724                            false
725                        };
726
727                        if !handled {
728                            Self::dispatch(
729                                &db_dispatch,
730                                &network_dispatch,
731                                &cov_dispatch,
732                                &seg_ack_dispatch,
733                                &cov_in_flight_dispatch,
734                                &server_tsm_dispatch,
735                                &comm_state_dispatch,
736                                &dcc_timer_dispatch,
737                                &config_dispatch,
738                                &source_mac,
739                                decoded,
740                                received.take().unwrap_or_else(|| {
741                                    warn!("received consumed twice — using empty fallback");
742                                    bacnet_network::layer::ReceivedApdu {
743                                        apdu: bytes::Bytes::new(),
744                                        source_mac: bacnet_types::MacAddr::new(),
745                                        source_network: None,
746                                        reply_tx: None,
747                                    }
748                                }),
749                            )
750                            .await;
751                        }
752                    }
753                    Err(e) => {
754                        warn!(error = %e, "Server failed to decode received APDU");
755                    }
756                }
757            }
758        });
759
760        let cov_table_for_purge = Arc::clone(&cov_table);
761        let cov_purge_task = tokio::spawn(async move {
762            let mut interval = tokio::time::interval(Duration::from_secs(30));
763            loop {
764                interval.tick().await;
765                let mut table = cov_table_for_purge.write().await;
766                let purged = table.purge_expired();
767                if purged > 0 {
768                    debug!(purged, "Purged expired COV subscriptions");
769                }
770            }
771        });
772
773        let fault_detection_task = if config.enable_fault_detection {
774            let db_fault = Arc::clone(&db);
775            Some(tokio::spawn(async move {
776                let detector = crate::fault_detection::FaultDetector::default();
777                let mut interval = tokio::time::interval(Duration::from_secs(10));
778                loop {
779                    interval.tick().await;
780                    let mut db_guard = db_fault.write().await;
781                    let changes = detector.evaluate(&mut db_guard);
782                    for change in &changes {
783                        debug!(
784                            object = %change.object_id,
785                            old = change.old_reliability,
786                            new = change.new_reliability,
787                            "Fault detection: reliability changed"
788                        );
789                    }
790                }
791            }))
792        } else {
793            None
794        };
795
796        let event_enrollment_task = if config.enable_fault_detection {
797            let db_ee = Arc::clone(&db);
798            Some(tokio::spawn(async move {
799                let mut interval = tokio::time::interval(Duration::from_secs(10));
800                loop {
801                    interval.tick().await;
802                    let mut db_guard = db_ee.write().await;
803                    let transitions =
804                        crate::event_enrollment::evaluate_event_enrollments(&mut db_guard);
805                    for t in &transitions {
806                        debug!(
807                            enrollment = %t.enrollment_oid,
808                            monitored = %t.monitored_oid,
809                            from = ?t.change.from,
810                            to = ?t.change.to,
811                            "Event enrollment: state changed"
812                        );
813                    }
814                }
815            }))
816        } else {
817            None
818        };
819
820        let db_trend = Arc::clone(&db);
821        let trend_log_state: crate::trend_log::TrendLogState =
822            Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new()));
823        let trend_log_task = Some(tokio::spawn(async move {
824            let mut interval = tokio::time::interval(Duration::from_secs(1));
825            loop {
826                interval.tick().await;
827                crate::trend_log::poll_trend_logs(&db_trend, &trend_log_state).await;
828            }
829        }));
830
831        let db_schedule = Arc::clone(&db);
832        let schedule_tick_task = Some(tokio::spawn(async move {
833            let mut interval = tokio::time::interval(Duration::from_secs(60));
834            loop {
835                interval.tick().await;
836                // TODO: Read UTC_Offset from Device object for local time
837                crate::schedule::tick_schedules(&db_schedule, 0).await;
838            }
839        }));
840
841        Ok(Self {
842            config,
843            network,
844            db,
845            cov_table,
846            seg_ack_senders,
847            cov_in_flight,
848            server_tsm,
849            comm_state,
850            dcc_timer,
851            dispatch_task: Some(dispatch_task),
852            cov_purge_task: Some(cov_purge_task),
853            fault_detection_task,
854            event_enrollment_task,
855            trend_log_task,
856            schedule_tick_task,
857            local_mac,
858        })
859    }
860
861    /// Get the server's local MAC address.
862    pub fn local_mac(&self) -> &[u8] {
863        &self.local_mac
864    }
865
866    /// Get a reference to the shared object database.
867    pub fn database(&self) -> &Arc<RwLock<ObjectDatabase>> {
868        &self.db
869    }
870
871    /// Get the communication state per DeviceCommunicationControl.
872    ///
873    /// Returns 0 (Enable), 1 (Disable), or 2 (DisableInitiation).
874    pub fn comm_state(&self) -> u8 {
875        self.comm_state.load(Ordering::Acquire)
876    }
877
878    /// Generate a PICS document from the current object database and server configuration.
879    ///
880    /// The caller must supply a [`PicsConfig`] for fields not available from the server
881    /// (vendor name, model, firmware revision, etc.).
882    pub async fn generate_pics(&self, pics_config: &crate::pics::PicsConfig) -> crate::pics::Pics {
883        let db = self.db.read().await;
884        crate::pics::PicsGenerator::new(&db, &self.config, pics_config).generate()
885    }
886
887    /// Stop the server.
888    pub async fn stop(&mut self) -> Result<(), Error> {
889        if let Some(task) = self.fault_detection_task.take() {
890            task.abort();
891            let _ = task.await;
892        }
893        if let Some(task) = self.event_enrollment_task.take() {
894            task.abort();
895            let _ = task.await;
896        }
897        if let Some(task) = self.trend_log_task.take() {
898            task.abort();
899            let _ = task.await;
900        }
901        if let Some(task) = self.schedule_tick_task.take() {
902            task.abort();
903            let _ = task.await;
904        }
905        if let Some(task) = self.cov_purge_task.take() {
906            task.abort();
907            let _ = task.await;
908        }
909        if let Some(task) = self.dispatch_task.take() {
910            task.abort();
911            let _ = task.await;
912        }
913        Ok(())
914    }
915
916    /// Dispatch a received APDU.
917    ///
918    /// ConfirmedRequest and UnconfirmedRequest handlers are spawned as
919    /// independent tasks so the dispatch loop can immediately process the
920    /// next incoming APDU.  This allows the server to handle multiple
921    /// client requests concurrently (e.g. concurrent ReadProperty via
922    /// the RwLock on ObjectDatabase).
923    ///
924    /// Fast-path APDU types (SimpleAck, Error, Reject, Abort, SegmentAck)
925    /// remain inline since they are sub-microsecond TSM lookups.
926    #[allow(clippy::too_many_arguments)]
927    async fn dispatch(
928        db: &Arc<RwLock<ObjectDatabase>>,
929        network: &Arc<NetworkLayer<T>>,
930        cov_table: &Arc<RwLock<CovSubscriptionTable>>,
931        seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
932        cov_in_flight: &Arc<Semaphore>,
933        server_tsm: &Arc<Mutex<ServerTsm>>,
934        comm_state: &Arc<AtomicU8>,
935        dcc_timer: &Arc<Mutex<Option<JoinHandle<()>>>>,
936        config: &Arc<ServerConfig>,
937        source_mac: &[u8],
938        apdu: Apdu,
939        mut received: bacnet_network::layer::ReceivedApdu,
940    ) {
941        match apdu {
942            Apdu::ConfirmedRequest(req) => {
943                let reply_tx = received.reply_tx.take();
944                let db = Arc::clone(db);
945                let network = Arc::clone(network);
946                let cov_table = Arc::clone(cov_table);
947                let seg_ack_senders = Arc::clone(seg_ack_senders);
948                let cov_in_flight = Arc::clone(cov_in_flight);
949                let server_tsm = Arc::clone(server_tsm);
950                let comm_state = Arc::clone(comm_state);
951                let dcc_timer = Arc::clone(dcc_timer);
952                let config = Arc::clone(config);
953                let source_mac = MacAddr::from_slice(source_mac);
954                tokio::spawn(async move {
955                    Self::handle_confirmed_request(
956                        &db,
957                        &network,
958                        &cov_table,
959                        &seg_ack_senders,
960                        &cov_in_flight,
961                        &server_tsm,
962                        &comm_state,
963                        &dcc_timer,
964                        &config,
965                        &source_mac,
966                        req,
967                        reply_tx,
968                    )
969                    .await;
970                });
971            }
972            Apdu::UnconfirmedRequest(req) => {
973                let db = Arc::clone(db);
974                let network = Arc::clone(network);
975                let config = Arc::clone(config);
976                let comm_state = Arc::clone(comm_state);
977                tokio::spawn(async move {
978                    Self::handle_unconfirmed_request(
979                        &db,
980                        &network,
981                        &config,
982                        &comm_state,
983                        req,
984                        &received,
985                    )
986                    .await;
987                });
988            }
989            // Fast paths — remain inline (sub-microsecond TSM lookups)
990            Apdu::SimpleAck(sa) => {
991                let mut tsm = server_tsm.lock().await;
992                tsm.record_result(sa.invoke_id, CovAckResult::Ack);
993                debug!(
994                    invoke_id = sa.invoke_id,
995                    "SimpleAck received for outgoing confirmed notification"
996                );
997            }
998            Apdu::Error(err) => {
999                let mut tsm = server_tsm.lock().await;
1000                tsm.record_result(err.invoke_id, CovAckResult::Error);
1001                debug!(
1002                    invoke_id = err.invoke_id,
1003                    error_class = err.error_class.to_raw(),
1004                    error_code = err.error_code.to_raw(),
1005                    "Error received for outgoing confirmed notification"
1006                );
1007            }
1008            Apdu::Reject(rej) => {
1009                let mut tsm = server_tsm.lock().await;
1010                tsm.record_result(rej.invoke_id, CovAckResult::Error);
1011                debug!(
1012                    invoke_id = rej.invoke_id,
1013                    "Reject received for outgoing confirmed notification"
1014                );
1015            }
1016            Apdu::Abort(abort) if !abort.sent_by_server => {
1017                let mut tsm = server_tsm.lock().await;
1018                tsm.record_result(abort.invoke_id, CovAckResult::Error);
1019                debug!(
1020                    invoke_id = abort.invoke_id,
1021                    "Abort received for outgoing confirmed notification"
1022                );
1023            }
1024            Apdu::SegmentAck(sa) => {
1025                let key = (MacAddr::from_slice(source_mac), sa.invoke_id);
1026                let senders = seg_ack_senders.lock().await;
1027                if let Some(tx) = senders.get(&key) {
1028                    let _ = tx.try_send(sa);
1029                } else {
1030                    debug!(
1031                        invoke_id = sa.invoke_id,
1032                        "Server ignoring SegmentAck for unknown transaction"
1033                    );
1034                }
1035            }
1036            _ => {
1037                debug!("Server ignoring unhandled APDU type");
1038            }
1039        }
1040    }
1041
1042    /// Handle a confirmed request.
1043    #[allow(clippy::too_many_arguments)]
1044    async fn handle_confirmed_request(
1045        db: &Arc<RwLock<ObjectDatabase>>,
1046        network: &Arc<NetworkLayer<T>>,
1047        cov_table: &Arc<RwLock<CovSubscriptionTable>>,
1048        seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
1049        cov_in_flight: &Arc<Semaphore>,
1050        server_tsm: &Arc<Mutex<ServerTsm>>,
1051        comm_state: &Arc<AtomicU8>,
1052        dcc_timer: &Arc<Mutex<Option<JoinHandle<()>>>>,
1053        config: &ServerConfig,
1054        source_mac: &[u8],
1055        req: bacnet_encoding::apdu::ConfirmedRequest,
1056        reply_tx: Option<tokio::sync::oneshot::Sender<Bytes>>,
1057    ) {
1058        let invoke_id = req.invoke_id;
1059        let service_choice = req.service_choice;
1060        let client_max_apdu = req.max_apdu_length;
1061        let client_accepts_segmented = req.segmented_response_accepted;
1062        let mut written_oids: Vec<ObjectIdentifier> = Vec::new();
1063
1064        let state = comm_state.load(Ordering::Acquire);
1065        if state == 1
1066            && service_choice != ConfirmedServiceChoice::DEVICE_COMMUNICATION_CONTROL
1067            && service_choice != ConfirmedServiceChoice::REINITIALIZE_DEVICE
1068        {
1069            debug!(
1070                service = service_choice.to_raw(),
1071                "DCC DISABLE: dropping confirmed request"
1072            );
1073            return;
1074        }
1075
1076        let complex_ack = |ack_buf: BytesMut| -> Apdu {
1077            Apdu::ComplexAck(ComplexAck {
1078                segmented: false,
1079                more_follows: false,
1080                invoke_id,
1081                sequence_number: None,
1082                proposed_window_size: None,
1083                service_choice,
1084                service_ack: ack_buf.freeze(),
1085            })
1086        };
1087        let simple_ack = || -> Apdu {
1088            Apdu::SimpleAck(SimpleAck {
1089                invoke_id,
1090                service_choice,
1091            })
1092        };
1093
1094        let mut ack_buf = BytesMut::with_capacity(512);
1095        let response = match service_choice {
1096            s if s == ConfirmedServiceChoice::READ_PROPERTY => {
1097                let db = db.read().await;
1098                match handlers::handle_read_property(&db, &req.service_request, &mut ack_buf) {
1099                    Ok(()) => complex_ack(ack_buf),
1100                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1101                }
1102            }
1103            s if s == ConfirmedServiceChoice::WRITE_PROPERTY => {
1104                let result = {
1105                    let mut db = db.write().await;
1106                    handlers::handle_write_property(&mut db, &req.service_request)
1107                };
1108                match result {
1109                    Ok(oid) => {
1110                        written_oids.push(oid);
1111                        simple_ack()
1112                    }
1113                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1114                }
1115            }
1116            s if s == ConfirmedServiceChoice::READ_PROPERTY_MULTIPLE => {
1117                let db = db.read().await;
1118                match handlers::handle_read_property_multiple(
1119                    &db,
1120                    &req.service_request,
1121                    &mut ack_buf,
1122                ) {
1123                    Ok(()) => complex_ack(ack_buf),
1124                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1125                }
1126            }
1127            s if s == ConfirmedServiceChoice::WRITE_PROPERTY_MULTIPLE => {
1128                let result = {
1129                    let mut db = db.write().await;
1130                    handlers::handle_write_property_multiple(&mut db, &req.service_request)
1131                };
1132                match result {
1133                    Ok(oids) => {
1134                        written_oids = oids;
1135                        simple_ack()
1136                    }
1137                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1138                }
1139            }
1140            s if s == ConfirmedServiceChoice::SUBSCRIBE_COV => {
1141                let db = db.read().await;
1142                let mut table = cov_table.write().await;
1143                match handlers::handle_subscribe_cov(
1144                    &mut table,
1145                    &db,
1146                    source_mac,
1147                    &req.service_request,
1148                ) {
1149                    Ok(()) => simple_ack(),
1150                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1151                }
1152            }
1153            s if s == ConfirmedServiceChoice::SUBSCRIBE_COV_PROPERTY => {
1154                let db = db.read().await;
1155                let mut table = cov_table.write().await;
1156                match handlers::handle_subscribe_cov_property(
1157                    &mut table,
1158                    &db,
1159                    source_mac,
1160                    &req.service_request,
1161                ) {
1162                    Ok(()) => simple_ack(),
1163                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1164                }
1165            }
1166            s if s == ConfirmedServiceChoice::CREATE_OBJECT => {
1167                let result = {
1168                    let mut db = db.write().await;
1169                    handlers::handle_create_object(&mut db, &req.service_request, &mut ack_buf)
1170                };
1171                match result {
1172                    Ok(()) => complex_ack(ack_buf),
1173                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1174                }
1175            }
1176            s if s == ConfirmedServiceChoice::DELETE_OBJECT => {
1177                let deleted_oid =
1178                    bacnet_services::object_mgmt::DeleteObjectRequest::decode(&req.service_request)
1179                        .ok()
1180                        .map(|r| r.object_identifier);
1181
1182                let result = {
1183                    let mut db = db.write().await;
1184                    handlers::handle_delete_object(&mut db, &req.service_request)
1185                };
1186                match result {
1187                    Ok(()) => {
1188                        // Clean up COV subscriptions for the deleted object
1189                        if let Some(oid) = deleted_oid {
1190                            let mut table = cov_table.write().await;
1191                            table.remove_for_object(oid);
1192                        }
1193                        simple_ack()
1194                    }
1195                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1196                }
1197            }
1198            s if s == ConfirmedServiceChoice::DEVICE_COMMUNICATION_CONTROL => {
1199                match handlers::handle_device_communication_control(
1200                    &req.service_request,
1201                    comm_state,
1202                    &config.dcc_password,
1203                ) {
1204                    Ok((_state, duration)) => {
1205                        if let Some(prev) = dcc_timer.lock().await.take() {
1206                            prev.abort();
1207                        }
1208                        if let Some(minutes) = duration {
1209                            let comm = Arc::clone(comm_state);
1210                            let handle = tokio::spawn(async move {
1211                                tokio::time::sleep(std::time::Duration::from_secs(
1212                                    minutes as u64 * 60,
1213                                ))
1214                                .await;
1215                                comm.store(0, Ordering::Release);
1216                                tracing::debug!(
1217                                    "DCC timer expired after {} min, state reverted to ENABLE",
1218                                    minutes
1219                                );
1220                            });
1221                            *dcc_timer.lock().await = Some(handle);
1222                        }
1223                        simple_ack()
1224                    }
1225                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1226                }
1227            }
1228            s if s == ConfirmedServiceChoice::REINITIALIZE_DEVICE => {
1229                match handlers::handle_reinitialize_device(
1230                    &req.service_request,
1231                    &config.reinit_password,
1232                ) {
1233                    Ok(()) => simple_ack(),
1234                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1235                }
1236            }
1237            s if s == ConfirmedServiceChoice::GET_EVENT_INFORMATION => {
1238                let db = db.read().await;
1239                match handlers::handle_get_event_information(
1240                    &db,
1241                    &req.service_request,
1242                    &mut ack_buf,
1243                ) {
1244                    Ok(()) => complex_ack(ack_buf),
1245                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1246                }
1247            }
1248            s if s == ConfirmedServiceChoice::ACKNOWLEDGE_ALARM => {
1249                let mut db = db.write().await;
1250                match handlers::handle_acknowledge_alarm(&mut db, &req.service_request) {
1251                    Ok(()) => simple_ack(),
1252                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1253                }
1254            }
1255            s if s == ConfirmedServiceChoice::READ_RANGE => {
1256                let db = db.read().await;
1257                match handlers::handle_read_range(&db, &req.service_request, &mut ack_buf) {
1258                    Ok(()) => complex_ack(ack_buf),
1259                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1260                }
1261            }
1262            s if s == ConfirmedServiceChoice::ATOMIC_READ_FILE => {
1263                let db = db.read().await;
1264                match handlers::handle_atomic_read_file(&db, &req.service_request, &mut ack_buf) {
1265                    Ok(()) => complex_ack(ack_buf),
1266                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1267                }
1268            }
1269            s if s == ConfirmedServiceChoice::ATOMIC_WRITE_FILE => {
1270                let result = {
1271                    let mut db = db.write().await;
1272                    handlers::handle_atomic_write_file(&mut db, &req.service_request, &mut ack_buf)
1273                };
1274                match result {
1275                    Ok(()) => complex_ack(ack_buf),
1276                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1277                }
1278            }
1279            s if s == ConfirmedServiceChoice::ADD_LIST_ELEMENT => {
1280                let mut db = db.write().await;
1281                match handlers::handle_add_list_element(&mut db, &req.service_request) {
1282                    Ok(()) => simple_ack(),
1283                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1284                }
1285            }
1286            s if s == ConfirmedServiceChoice::REMOVE_LIST_ELEMENT => {
1287                let mut db = db.write().await;
1288                match handlers::handle_remove_list_element(&mut db, &req.service_request) {
1289                    Ok(()) => simple_ack(),
1290                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1291                }
1292            }
1293            s if s == ConfirmedServiceChoice::GET_ALARM_SUMMARY => {
1294                let mut buf = BytesMut::new();
1295                let db = db.read().await;
1296                match handlers::handle_get_alarm_summary(&db, &mut buf) {
1297                    Ok(()) => complex_ack(buf),
1298                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1299                }
1300            }
1301            s if s == ConfirmedServiceChoice::GET_ENROLLMENT_SUMMARY => {
1302                let mut buf = BytesMut::new();
1303                let db = db.read().await;
1304                match handlers::handle_get_enrollment_summary(&db, &req.service_request, &mut buf) {
1305                    Ok(()) => complex_ack(buf),
1306                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1307                }
1308            }
1309            s if s == ConfirmedServiceChoice::CONFIRMED_TEXT_MESSAGE => {
1310                match handlers::handle_text_message(&req.service_request) {
1311                    Ok(_msg) => simple_ack(),
1312                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1313                }
1314            }
1315            s if s == ConfirmedServiceChoice::LIFE_SAFETY_OPERATION => {
1316                match handlers::handle_life_safety_operation(&req.service_request) {
1317                    Ok(()) => simple_ack(),
1318                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1319                }
1320            }
1321            s if s == ConfirmedServiceChoice::SUBSCRIBE_COV_PROPERTY_MULTIPLE => {
1322                let db = db.read().await;
1323                let mut table = cov_table.write().await;
1324                match handlers::handle_subscribe_cov_property_multiple(
1325                    &mut table,
1326                    &db,
1327                    source_mac,
1328                    &req.service_request,
1329                ) {
1330                    Ok(()) => simple_ack(),
1331                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1332                }
1333            }
1334            _ => {
1335                debug!(
1336                    service = service_choice.to_raw(),
1337                    "Unsupported confirmed service"
1338                );
1339                Apdu::Reject(RejectPdu {
1340                    invoke_id,
1341                    reject_reason: RejectReason::UNRECOGNIZED_SERVICE,
1342                })
1343            }
1344        };
1345
1346        if let Apdu::ComplexAck(ref ack) = response {
1347            let mut full_buf = BytesMut::new();
1348            encode_apdu(&mut full_buf, &response);
1349
1350            if full_buf.len() > client_max_apdu as usize {
1351                if !client_accepts_segmented {
1352                    let abort = Apdu::Abort(AbortPdu {
1353                        sent_by_server: true,
1354                        invoke_id,
1355                        abort_reason: AbortReason::SEGMENTATION_NOT_SUPPORTED,
1356                    });
1357                    let mut buf = BytesMut::new();
1358                    encode_apdu(&mut buf, &abort);
1359                    if let Err(e) = network
1360                        .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
1361                        .await
1362                    {
1363                        warn!(error = %e, "Failed to send Abort for segmentation-not-supported");
1364                    }
1365                } else {
1366                    let network = Arc::clone(network);
1367                    let seg_ack_senders = Arc::clone(seg_ack_senders);
1368                    let source_mac = MacAddr::from_slice(source_mac);
1369                    let service_ack_data = ack.service_ack.clone();
1370                    tokio::spawn(async move {
1371                        Self::send_segmented_complex_ack(
1372                            &network,
1373                            &seg_ack_senders,
1374                            &source_mac,
1375                            invoke_id,
1376                            service_choice,
1377                            &service_ack_data,
1378                            client_max_apdu,
1379                        )
1380                        .await;
1381                    });
1382                }
1383
1384                for oid in &written_oids {
1385                    Self::fire_event_notifications(db, network, comm_state, server_tsm, oid).await;
1386                }
1387                for oid in &written_oids {
1388                    Self::fire_cov_notifications(
1389                        db,
1390                        network,
1391                        cov_table,
1392                        cov_in_flight,
1393                        server_tsm,
1394                        comm_state,
1395                        config,
1396                        oid,
1397                    )
1398                    .await;
1399                }
1400                return;
1401            }
1402        }
1403
1404        let mut buf = BytesMut::new();
1405        encode_apdu(&mut buf, &response);
1406
1407        if let Some(tx) = reply_tx {
1408            use bacnet_encoding::npdu::{encode_npdu, Npdu};
1409            let apdu_bytes = buf.freeze();
1410            let npdu = Npdu {
1411                is_network_message: false,
1412                expecting_reply: false,
1413                priority: NetworkPriority::NORMAL,
1414                destination: None,
1415                source: None,
1416                payload: apdu_bytes.clone(),
1417                ..Npdu::default()
1418            };
1419            let mut npdu_buf = BytesMut::with_capacity(2 + apdu_bytes.len());
1420            match encode_npdu(&mut npdu_buf, &npdu) {
1421                Ok(()) => {
1422                    let _ = tx.send(npdu_buf.freeze());
1423                }
1424                Err(e) => {
1425                    warn!(error = %e, "Failed to encode NPDU for MS/TP reply");
1426                    if let Err(e) = network
1427                        .send_apdu(&apdu_bytes, source_mac, false, NetworkPriority::NORMAL)
1428                        .await
1429                    {
1430                        warn!(error = %e, "Failed to send response");
1431                    }
1432                }
1433            }
1434        } else if let Err(e) = network
1435            .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
1436            .await
1437        {
1438            warn!(error = %e, "Failed to send response");
1439        }
1440
1441        for oid in &written_oids {
1442            Self::fire_event_notifications(db, network, comm_state, server_tsm, oid).await;
1443        }
1444
1445        for oid in &written_oids {
1446            Self::fire_cov_notifications(
1447                db,
1448                network,
1449                cov_table,
1450                cov_in_flight,
1451                server_tsm,
1452                comm_state,
1453                config,
1454                oid,
1455            )
1456            .await;
1457        }
1458    }
1459
1460    /// Send a ComplexAck response using segmented transfer.
1461    ///
1462    /// Splits the service ack data into segments that fit within the client's
1463    /// max APDU length, sends each segment, and waits for SegmentAck from
1464    /// the client before sending the next (window size 1).
1465    async fn send_segmented_complex_ack(
1466        network: &Arc<NetworkLayer<T>>,
1467        seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
1468        source_mac: &[u8],
1469        invoke_id: u8,
1470        service_choice: ConfirmedServiceChoice,
1471        service_ack_data: &[u8],
1472        client_max_apdu: u16,
1473    ) {
1474        let max_seg_size = max_segment_payload(client_max_apdu, SegmentedPduType::ComplexAck);
1475        let segments = split_payload(service_ack_data, max_seg_size);
1476        let total_segments = segments.len();
1477
1478        if total_segments > 255 {
1479            warn!(
1480                total_segments,
1481                "Response requires too many segments, aborting"
1482            );
1483            let abort = Apdu::Abort(AbortPdu {
1484                sent_by_server: true,
1485                invoke_id,
1486                abort_reason: AbortReason::BUFFER_OVERFLOW,
1487            });
1488            let mut buf = BytesMut::new();
1489            encode_apdu(&mut buf, &abort);
1490            let _ = network
1491                .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
1492                .await;
1493            return;
1494        }
1495
1496        debug!(
1497            total_segments,
1498            max_seg_size,
1499            payload_len = service_ack_data.len(),
1500            "Starting segmented ComplexAck send"
1501        );
1502
1503        let (seg_ack_tx, mut seg_ack_rx) = mpsc::channel(16);
1504        let key = (MacAddr::from_slice(source_mac), invoke_id);
1505        {
1506            seg_ack_senders.lock().await.insert(key.clone(), seg_ack_tx);
1507        }
1508
1509        let seg_timeout = Duration::from_secs(5);
1510        let mut seg_idx: usize = 0;
1511        let mut neg_ack_retries: u8 = 0;
1512
1513        while seg_idx < total_segments {
1514            let is_last = seg_idx == total_segments - 1;
1515
1516            let pdu = Apdu::ComplexAck(ComplexAck {
1517                segmented: true,
1518                more_follows: !is_last,
1519                invoke_id,
1520                sequence_number: Some(seg_idx as u8),
1521                proposed_window_size: Some(1),
1522                service_choice,
1523                service_ack: segments[seg_idx].clone(),
1524            });
1525
1526            let mut buf = BytesMut::with_capacity(client_max_apdu as usize);
1527            encode_apdu(&mut buf, &pdu);
1528
1529            if let Err(e) = network
1530                .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
1531                .await
1532            {
1533                warn!(error = %e, seq = seg_idx, "Failed to send segment");
1534                break;
1535            }
1536
1537            debug!(seq = seg_idx, is_last, "Sent ComplexAck segment");
1538
1539            if !is_last {
1540                match tokio::time::timeout(seg_timeout, seg_ack_rx.recv()).await {
1541                    Ok(Some(ack)) => {
1542                        debug!(
1543                            seq = ack.sequence_number,
1544                            negative = ack.negative_ack,
1545                            "Received SegmentAck for ComplexAck"
1546                        );
1547                        if ack.negative_ack {
1548                            neg_ack_retries += 1;
1549                            if neg_ack_retries > MAX_NEG_SEGMENT_ACK_RETRIES {
1550                                warn!(
1551                                    invoke_id,
1552                                    retries = neg_ack_retries,
1553                                    "Too many negative SegmentAck retries, aborting segmented send"
1554                                );
1555                                let abort = Apdu::Abort(AbortPdu {
1556                                    sent_by_server: true,
1557                                    invoke_id,
1558                                    abort_reason: AbortReason::TSM_TIMEOUT,
1559                                });
1560                                let mut abort_buf = BytesMut::new();
1561                                encode_apdu(&mut abort_buf, &abort);
1562                                let _ = network
1563                                    .send_apdu(
1564                                        &abort_buf,
1565                                        source_mac,
1566                                        false,
1567                                        NetworkPriority::NORMAL,
1568                                    )
1569                                    .await;
1570                                break;
1571                            }
1572                            let requested = ack.sequence_number as usize;
1573                            if requested >= total_segments {
1574                                tracing::warn!(
1575                                    seq = requested,
1576                                    total = total_segments,
1577                                    "negative SegmentAck requests out-of-range sequence, aborting"
1578                                );
1579                                break;
1580                            }
1581                            debug!(
1582                                seq = ack.sequence_number,
1583                                "Negative SegmentAck — retransmitting from requested sequence"
1584                            );
1585                            seg_idx = requested;
1586                            continue;
1587                        }
1588                    }
1589                    Ok(None) => {
1590                        warn!("SegmentAck channel closed during segmented send");
1591                        break;
1592                    }
1593                    Err(_) => {
1594                        warn!(
1595                            seq = seg_idx,
1596                            "Timeout waiting for SegmentAck, aborting segmented send"
1597                        );
1598                        let abort = Apdu::Abort(AbortPdu {
1599                            sent_by_server: true,
1600                            invoke_id,
1601                            abort_reason: AbortReason::TSM_TIMEOUT,
1602                        });
1603                        let mut abort_buf = BytesMut::new();
1604                        encode_apdu(&mut abort_buf, &abort);
1605                        let _ = network
1606                            .send_apdu(&abort_buf, source_mac, false, NetworkPriority::NORMAL)
1607                            .await;
1608                        break;
1609                    }
1610                }
1611            }
1612
1613            seg_idx += 1;
1614        }
1615
1616        match tokio::time::timeout(seg_timeout, seg_ack_rx.recv()).await {
1617            Ok(Some(_ack)) => {
1618                debug!("Received final SegmentAck for ComplexAck");
1619            }
1620            _ => {
1621                warn!("No final SegmentAck received for ComplexAck");
1622            }
1623        }
1624
1625        seg_ack_senders.lock().await.remove(&key);
1626    }
1627
1628    /// Handle an unconfirmed request (e.g., WhoIs).
1629    async fn handle_unconfirmed_request(
1630        db: &Arc<RwLock<ObjectDatabase>>,
1631        network: &Arc<NetworkLayer<T>>,
1632        config: &ServerConfig,
1633        comm_state: &Arc<AtomicU8>,
1634        req: UnconfirmedRequestPdu,
1635        received: &bacnet_network::layer::ReceivedApdu,
1636    ) {
1637        let comm = comm_state.load(Ordering::Acquire);
1638        if comm == 1 {
1639            tracing::debug!("Dropping unconfirmed service: DCC is DISABLE");
1640            return;
1641        }
1642
1643        if req.service_choice == UnconfirmedServiceChoice::WHO_IS {
1644            let who_is = match WhoIsRequest::decode(&req.service_request) {
1645                Ok(r) => r,
1646                Err(e) => {
1647                    warn!(error = %e, "Failed to decode WhoIs");
1648                    return;
1649                }
1650            };
1651
1652            let db = db.read().await;
1653            let device_oid = db
1654                .list_objects()
1655                .into_iter()
1656                .find(|oid| oid.object_type() == ObjectType::DEVICE);
1657
1658            if let Some(device_oid) = device_oid {
1659                let instance = device_oid.instance_number();
1660
1661                let in_range = match (who_is.low_limit, who_is.high_limit) {
1662                    (Some(low), Some(high)) => instance >= low && instance <= high,
1663                    _ => true,
1664                };
1665
1666                if in_range {
1667                    let i_am = IAmRequest {
1668                        object_identifier: device_oid,
1669                        max_apdu_length: config.max_apdu_length,
1670                        segmentation_supported: config.segmentation_supported,
1671                        vendor_id: config.vendor_id,
1672                    };
1673
1674                    let mut service_buf = BytesMut::new();
1675                    i_am.encode(&mut service_buf);
1676
1677                    let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
1678                        service_choice: UnconfirmedServiceChoice::I_AM,
1679                        service_request: Bytes::from(service_buf.to_vec()),
1680                    });
1681
1682                    let mut buf = BytesMut::new();
1683                    encode_apdu(&mut buf, &pdu);
1684
1685                    if let Some(ref source_net) = received.source_network {
1686                        if let Err(e) = network
1687                            .send_apdu_routed(
1688                                &buf,
1689                                source_net.network,
1690                                &source_net.mac_address,
1691                                &received.source_mac,
1692                                false,
1693                                NetworkPriority::NORMAL,
1694                            )
1695                            .await
1696                        {
1697                            warn!(error = %e, "Failed to route IAm back to remote requester");
1698                        }
1699                    } else if let Err(e) = network
1700                        .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1701                        .await
1702                    {
1703                        warn!(error = %e, "Failed to send IAm broadcast");
1704                    }
1705                }
1706            }
1707        } else if req.service_choice == UnconfirmedServiceChoice::WHO_HAS {
1708            let db = db.read().await;
1709            let device_oid = db
1710                .list_objects()
1711                .into_iter()
1712                .find(|oid| oid.object_type() == ObjectType::DEVICE);
1713
1714            if let Some(device_oid) = device_oid {
1715                match handlers::handle_who_has(&db, &req.service_request, device_oid) {
1716                    Ok(Some(i_have)) => {
1717                        let mut service_buf = BytesMut::new();
1718                        if let Err(e) = i_have.encode(&mut service_buf) {
1719                            warn!(error = %e, "Failed to encode IHave");
1720                        } else {
1721                            let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
1722                                service_choice: UnconfirmedServiceChoice::I_HAVE,
1723                                service_request: Bytes::from(service_buf.to_vec()),
1724                            });
1725
1726                            let mut buf = BytesMut::new();
1727                            encode_apdu(&mut buf, &pdu);
1728
1729                            if let Err(e) = network
1730                                .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1731                                .await
1732                            {
1733                                warn!(error = %e, "Failed to send IHave broadcast");
1734                            }
1735                        }
1736                    }
1737                    Ok(None) => {}
1738                    Err(e) => {
1739                        warn!(error = %e, "Failed to decode WhoHas");
1740                    }
1741                }
1742            }
1743        } else if req.service_choice == UnconfirmedServiceChoice::TIME_SYNCHRONIZATION
1744            || req.service_choice == UnconfirmedServiceChoice::UTC_TIME_SYNCHRONIZATION
1745        {
1746            debug!("Received time synchronization request");
1747            if let Some(ref callback) = config.on_time_sync {
1748                let data = TimeSyncData {
1749                    raw_service_data: req.service_request.clone(),
1750                    is_utc: req.service_choice
1751                        == UnconfirmedServiceChoice::UTC_TIME_SYNCHRONIZATION,
1752                };
1753                callback(data);
1754            }
1755        } else if req.service_choice == UnconfirmedServiceChoice::WRITE_GROUP {
1756            match handlers::handle_write_group(&req.service_request) {
1757                Ok(write_group) => {
1758                    debug!(
1759                        group = write_group.group_number,
1760                        priority = write_group.write_priority,
1761                        values = write_group.change_list.len(),
1762                        "WriteGroup received"
1763                    );
1764                }
1765                Err(e) => {
1766                    debug!(error = %e, "WriteGroup decode failed");
1767                }
1768            }
1769        } else if req.service_choice == UnconfirmedServiceChoice::UNCONFIRMED_TEXT_MESSAGE {
1770            match handlers::handle_text_message(&req.service_request) {
1771                Ok(msg) => {
1772                    debug!(
1773                        source = ?msg.source_device,
1774                        priority = ?msg.message_priority,
1775                        "UnconfirmedTextMessage: {}",
1776                        msg.message
1777                    );
1778                }
1779                Err(e) => {
1780                    debug!(error = %e, "UnconfirmedTextMessage decode failed");
1781                }
1782            }
1783        } else {
1784            debug!(
1785                service = req.service_choice.to_raw(),
1786                "Ignoring unsupported unconfirmed service"
1787            );
1788        }
1789    }
1790
1791    /// Evaluate intrinsic reporting on an object and send event notifications
1792    /// to NotificationClass recipients (or broadcast if none configured).
1793    /// Skipped when DCC is active (comm_state >= 1).
1794    async fn fire_event_notifications(
1795        db: &Arc<RwLock<ObjectDatabase>>,
1796        network: &Arc<NetworkLayer<T>>,
1797        comm_state: &Arc<AtomicU8>,
1798        server_tsm: &Arc<Mutex<ServerTsm>>,
1799        oid: &ObjectIdentifier,
1800    ) {
1801        if comm_state.load(Ordering::Acquire) >= 1 {
1802            return;
1803        }
1804
1805        let now = std::time::SystemTime::now()
1806            .duration_since(std::time::UNIX_EPOCH)
1807            .unwrap_or_default();
1808        let total_secs = now.as_secs();
1809        let dow = ((total_secs / 86400 + 3) % 7) as u8;
1810        let today_bit = 1u8 << dow;
1811        let day_secs = (total_secs % 86400) as u32;
1812        let current_time = Time {
1813            hour: (day_secs / 3600) as u8,
1814            minute: ((day_secs % 3600) / 60) as u8,
1815            second: (day_secs % 60) as u8,
1816            hundredths: (now.subsec_millis() / 10) as u8,
1817        };
1818
1819        let (notification, recipients) = {
1820            let mut db = db.write().await;
1821
1822            let device_oid = db
1823                .list_objects()
1824                .into_iter()
1825                .find(|o| o.object_type() == ObjectType::DEVICE)
1826                .unwrap_or_else(|| ObjectIdentifier::new(ObjectType::DEVICE, 0).unwrap());
1827
1828            let object = match db.get_mut(oid) {
1829                Some(o) => o,
1830                None => return,
1831            };
1832
1833            let change = match object.evaluate_intrinsic_reporting() {
1834                Some(c) => c,
1835                None => return,
1836            };
1837
1838            let notification_class = object
1839                .read_property(PropertyIdentifier::NOTIFICATION_CLASS, None)
1840                .ok()
1841                .and_then(|v| match v {
1842                    PropertyValue::Unsigned(n) => Some(n as u32),
1843                    _ => None,
1844                })
1845                .unwrap_or(0);
1846
1847            let notify_type = object
1848                .read_property(PropertyIdentifier::NOTIFY_TYPE, None)
1849                .ok()
1850                .and_then(|v| match v {
1851                    PropertyValue::Enumerated(n) => Some(n),
1852                    _ => None,
1853                })
1854                .unwrap_or(NotifyType::ALARM.to_raw());
1855
1856            let priority = if change.to == bacnet_types::enums::EventState::NORMAL {
1857                200u8
1858            } else {
1859                100u8
1860            };
1861
1862            let transition = change.transition();
1863
1864            let base_notification = EventNotificationRequest {
1865                process_identifier: 0,
1866                initiating_device_identifier: device_oid,
1867                event_object_identifier: *oid,
1868                timestamp: BACnetTimeStamp::SequenceNumber(total_secs),
1869                notification_class,
1870                priority,
1871                event_type: change.event_type().to_raw(),
1872                message_text: None,
1873                notify_type,
1874                ack_required: notify_type == NotifyType::ALARM.to_raw(),
1875                from_state: change.from.to_raw(),
1876                to_state: change.to.to_raw(),
1877                event_values: None,
1878            };
1879
1880            let recipients = get_notification_recipients(
1881                &db,
1882                notification_class,
1883                transition,
1884                today_bit,
1885                &current_time,
1886            );
1887
1888            (base_notification, recipients)
1889        };
1890
1891        if recipients.is_empty() {
1892            let mut service_buf = BytesMut::new();
1893            if let Err(e) = notification.encode(&mut service_buf) {
1894                warn!(error = %e, "Failed to encode EventNotification");
1895                return;
1896            }
1897
1898            let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
1899                service_choice: UnconfirmedServiceChoice::UNCONFIRMED_EVENT_NOTIFICATION,
1900                service_request: Bytes::from(service_buf.to_vec()),
1901            });
1902
1903            let mut buf = BytesMut::new();
1904            encode_apdu(&mut buf, &pdu);
1905
1906            if let Err(e) = network
1907                .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1908                .await
1909            {
1910                warn!(error = %e, "Failed to broadcast EventNotification");
1911            }
1912        } else {
1913            for (recipient, process_id, confirmed) in &recipients {
1914                let mut targeted = notification.clone();
1915                targeted.process_identifier = *process_id;
1916
1917                let mut service_buf = BytesMut::new();
1918                if let Err(e) = targeted.encode(&mut service_buf) {
1919                    warn!(error = %e, "Failed to encode EventNotification");
1920                    continue;
1921                }
1922
1923                let service_bytes = Bytes::from(service_buf.to_vec());
1924
1925                if *confirmed {
1926                    let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
1927                        segmented: false,
1928                        more_follows: false,
1929                        segmented_response_accepted: false,
1930                        max_segments: None,
1931                        max_apdu_length: 1476,
1932                        invoke_id: server_tsm.lock().await.allocate().0,
1933                        sequence_number: None,
1934                        proposed_window_size: None,
1935                        service_choice: ConfirmedServiceChoice::CONFIRMED_EVENT_NOTIFICATION,
1936                        service_request: service_bytes,
1937                    });
1938
1939                    let mut buf = BytesMut::new();
1940                    encode_apdu(&mut buf, &pdu);
1941
1942                    match recipient {
1943                        bacnet_types::constructed::BACnetRecipient::Address(addr) => {
1944                            if let Err(e) = network
1945                                .send_apdu(&buf, &addr.mac_address, true, NetworkPriority::NORMAL)
1946                                .await
1947                            {
1948                                warn!(error = %e, "Failed to send confirmed EventNotification");
1949                            }
1950                        }
1951                        bacnet_types::constructed::BACnetRecipient::Device(_) => {
1952                            if let Err(e) = network
1953                                .broadcast_apdu(&buf, true, NetworkPriority::NORMAL)
1954                                .await
1955                            {
1956                                warn!(error = %e, "Failed to broadcast confirmed EventNotification");
1957                            }
1958                        }
1959                    }
1960                } else {
1961                    let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
1962                        service_choice: UnconfirmedServiceChoice::UNCONFIRMED_EVENT_NOTIFICATION,
1963                        service_request: service_bytes,
1964                    });
1965
1966                    let mut buf = BytesMut::new();
1967                    encode_apdu(&mut buf, &pdu);
1968
1969                    match recipient {
1970                        bacnet_types::constructed::BACnetRecipient::Address(addr) => {
1971                            if let Err(e) = network
1972                                .send_apdu(&buf, &addr.mac_address, false, NetworkPriority::NORMAL)
1973                                .await
1974                            {
1975                                warn!(
1976                                    error = %e,
1977                                    "Failed to send unconfirmed EventNotification"
1978                                );
1979                            }
1980                        }
1981                        bacnet_types::constructed::BACnetRecipient::Device(_) => {
1982                            if let Err(e) = network
1983                                .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1984                                .await
1985                            {
1986                                warn!(
1987                                    error = %e,
1988                                    "Failed to broadcast unconfirmed EventNotification"
1989                                );
1990                            }
1991                        }
1992                    }
1993                }
1994            }
1995        }
1996    }
1997
1998    /// Fire COV notifications for all active subscriptions on the given object.
1999    /// Skipped when DCC is active (comm_state >= 1).
2000    #[allow(clippy::too_many_arguments)]
2001    async fn fire_cov_notifications(
2002        db: &Arc<RwLock<ObjectDatabase>>,
2003        network: &Arc<NetworkLayer<T>>,
2004        cov_table: &Arc<RwLock<CovSubscriptionTable>>,
2005        cov_in_flight: &Arc<Semaphore>,
2006        server_tsm: &Arc<Mutex<ServerTsm>>,
2007        comm_state: &Arc<AtomicU8>,
2008        config: &ServerConfig,
2009        oid: &ObjectIdentifier,
2010    ) {
2011        if comm_state.load(Ordering::Acquire) >= 1 {
2012            return;
2013        }
2014        let subs: Vec<crate::cov::CovSubscription> = {
2015            let mut table = cov_table.write().await;
2016            table.subscriptions_for(oid).into_iter().cloned().collect()
2017        };
2018
2019        if subs.is_empty() {
2020            return;
2021        }
2022
2023        let (device_oid, values, current_pv, cov_increment) = {
2024            let db = db.read().await;
2025            let object = match db.get(oid) {
2026                Some(o) => o,
2027                None => return,
2028            };
2029
2030            let cov_increment = object.cov_increment();
2031
2032            let mut current_pv: Option<f32> = None;
2033            let mut values = Vec::new();
2034            if let Ok(pv) = object.read_property(PropertyIdentifier::PRESENT_VALUE, None) {
2035                if let PropertyValue::Real(v) = &pv {
2036                    current_pv = Some(*v);
2037                }
2038                let mut buf = BytesMut::new();
2039                if encode_property_value(&mut buf, &pv).is_ok() {
2040                    values.push(BACnetPropertyValue {
2041                        property_identifier: PropertyIdentifier::PRESENT_VALUE,
2042                        property_array_index: None,
2043                        value: buf.to_vec(),
2044                        priority: None,
2045                    });
2046                }
2047            }
2048            if let Ok(sf) = object.read_property(PropertyIdentifier::STATUS_FLAGS, None) {
2049                let mut buf = BytesMut::new();
2050                if encode_property_value(&mut buf, &sf).is_ok() {
2051                    values.push(BACnetPropertyValue {
2052                        property_identifier: PropertyIdentifier::STATUS_FLAGS,
2053                        property_array_index: None,
2054                        value: buf.to_vec(),
2055                        priority: None,
2056                    });
2057                }
2058            }
2059
2060            let device_oid = db
2061                .list_objects()
2062                .into_iter()
2063                .find(|o| o.object_type() == ObjectType::DEVICE)
2064                .unwrap_or_else(|| ObjectIdentifier::new(ObjectType::DEVICE, 0).unwrap());
2065
2066            (device_oid, values, current_pv, cov_increment)
2067        };
2068
2069        if values.is_empty() {
2070            return;
2071        }
2072
2073        for sub in &subs {
2074            if !CovSubscriptionTable::should_notify(sub, current_pv, cov_increment) {
2075                continue;
2076            }
2077            let time_remaining = sub.expires_at.map_or(0, |exp| {
2078                exp.saturating_duration_since(Instant::now()).as_secs() as u32
2079            });
2080
2081            let notification_values = if let Some(prop) = sub.monitored_property {
2082                let db = db.read().await;
2083                if let Some(object) = db.get(oid) {
2084                    if let Ok(pv) = object.read_property(prop, sub.monitored_property_array_index) {
2085                        let mut buf = BytesMut::new();
2086                        if encode_property_value(&mut buf, &pv).is_ok() {
2087                            vec![BACnetPropertyValue {
2088                                property_identifier: prop,
2089                                property_array_index: sub.monitored_property_array_index,
2090                                value: buf.to_vec(),
2091                                priority: None,
2092                            }]
2093                        } else {
2094                            values.clone()
2095                        }
2096                    } else {
2097                        values.clone()
2098                    }
2099                } else {
2100                    values.clone()
2101                }
2102            } else {
2103                values.clone()
2104            };
2105
2106            let notification = COVNotificationRequest {
2107                subscriber_process_identifier: sub.subscriber_process_identifier,
2108                initiating_device_identifier: device_oid,
2109                monitored_object_identifier: *oid,
2110                time_remaining,
2111                list_of_values: notification_values,
2112            };
2113
2114            let mut service_buf = BytesMut::new();
2115            notification.encode(&mut service_buf);
2116
2117            if sub.issue_confirmed_notifications {
2118                let permit = match cov_in_flight.clone().try_acquire_owned() {
2119                    Ok(permit) => permit,
2120                    Err(_) => {
2121                        warn!(
2122                            object = ?oid,
2123                            "255 confirmed COV notifications in-flight, skipping notification"
2124                        );
2125                        continue;
2126                    }
2127                };
2128
2129                let (id, result_rx) = {
2130                    let mut tsm = server_tsm.lock().await;
2131                    tsm.allocate()
2132                };
2133
2134                let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
2135                    segmented: false,
2136                    more_follows: false,
2137                    segmented_response_accepted: false,
2138                    max_segments: None,
2139                    max_apdu_length: config.max_apdu_length as u16,
2140                    invoke_id: id,
2141                    sequence_number: None,
2142                    proposed_window_size: None,
2143                    service_choice: ConfirmedServiceChoice::CONFIRMED_COV_NOTIFICATION,
2144                    service_request: Bytes::from(service_buf.to_vec()),
2145                });
2146
2147                let mut buf = BytesMut::new();
2148                encode_apdu(&mut buf, &pdu);
2149
2150                if let Some(pv) = current_pv {
2151                    let mut table = cov_table.write().await;
2152                    table.set_last_notified_value(
2153                        &sub.subscriber_mac,
2154                        sub.subscriber_process_identifier,
2155                        sub.monitored_object_identifier,
2156                        sub.monitored_property,
2157                        pv,
2158                    );
2159                }
2160
2161                let network = Arc::clone(network);
2162                let mac = sub.subscriber_mac.clone();
2163                let apdu_timeout = Duration::from_millis(config.cov_retry_timeout_ms);
2164                let tsm = Arc::clone(server_tsm);
2165                let apdu_retries = DEFAULT_APDU_RETRIES;
2166                tokio::spawn(async move {
2167                    let _permit = permit;
2168                    let mut pending_rx: Option<oneshot::Receiver<CovAckResult>> = Some(result_rx);
2169
2170                    for attempt in 0..=apdu_retries {
2171                        if let Err(e) = network
2172                            .send_apdu(&buf, &mac, true, NetworkPriority::NORMAL)
2173                            .await
2174                        {
2175                            warn!(error = %e, attempt, "COV notification send failed");
2176                        } else {
2177                            debug!(invoke_id = id, attempt, "Confirmed COV notification sent");
2178                        }
2179
2180                        let rx = pending_rx
2181                            .take()
2182                            .expect("receiver always set for each attempt");
2183                        let result = match tokio::time::timeout(apdu_timeout, rx).await {
2184                            Ok(Ok(r)) => Ok(r),
2185                            Ok(Err(_)) => Err(()), // channel closed
2186                            Err(_) => Err(()),     // timeout
2187                        };
2188
2189                        if result.is_err() && attempt < apdu_retries {
2190                            let (tx, new_rx) = oneshot::channel();
2191                            tsm.lock().await.pending.insert(id, tx);
2192                            pending_rx = Some(new_rx);
2193                        }
2194
2195                        match result {
2196                            Ok(CovAckResult::Ack) => {
2197                                debug!(invoke_id = id, "COV notification acknowledged");
2198                                return;
2199                            }
2200                            Ok(CovAckResult::Error) => {
2201                                warn!(invoke_id = id, "COV notification rejected by subscriber");
2202                                return;
2203                            }
2204                            Err(_) => {
2205                                if attempt < apdu_retries {
2206                                    debug!(
2207                                        invoke_id = id,
2208                                        attempt, "COV notification timeout, retrying"
2209                                    );
2210                                } else {
2211                                    warn!(
2212                                        invoke_id = id,
2213                                        "COV notification failed after {} retries", apdu_retries
2214                                    );
2215                                }
2216                            }
2217                        }
2218                    }
2219
2220                    let mut tsm = tsm.lock().await;
2221                    tsm.remove(id);
2222                });
2223            } else {
2224                let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
2225                    service_choice: UnconfirmedServiceChoice::UNCONFIRMED_COV_NOTIFICATION,
2226                    service_request: Bytes::from(service_buf.to_vec()),
2227                });
2228
2229                let mut buf = BytesMut::new();
2230                encode_apdu(&mut buf, &pdu);
2231
2232                if let Err(e) = network
2233                    .send_apdu(&buf, &sub.subscriber_mac, false, NetworkPriority::NORMAL)
2234                    .await
2235                {
2236                    warn!(error = %e, "Failed to send COV notification");
2237                } else if let Some(pv) = current_pv {
2238                    let mut table = cov_table.write().await;
2239                    table.set_last_notified_value(
2240                        &sub.subscriber_mac,
2241                        sub.subscriber_process_identifier,
2242                        sub.monitored_object_identifier,
2243                        sub.monitored_property,
2244                        pv,
2245                    );
2246                }
2247            }
2248        }
2249    }
2250
2251    /// Convert an Error into an Error APDU.
2252    fn error_apdu_from_error(
2253        invoke_id: u8,
2254        service_choice: ConfirmedServiceChoice,
2255        error: &Error,
2256    ) -> Apdu {
2257        let (class, code) = match error {
2258            Error::Protocol { class, code } => (*class, *code),
2259            _ => (
2260                ErrorClass::SERVICES.to_raw() as u32,
2261                ErrorCode::OTHER.to_raw() as u32,
2262            ),
2263        };
2264        Apdu::Error(ErrorPdu {
2265            invoke_id,
2266            service_choice,
2267            error_class: ErrorClass::from_raw(class as u16),
2268            error_code: ErrorCode::from_raw(code as u16),
2269            error_data: Bytes::new(),
2270        })
2271    }
2272}
2273
2274#[cfg(test)]
2275mod tests {
2276    use super::*;
2277
2278    #[test]
2279    fn server_config_cov_retry_timeout_default() {
2280        let config = ServerConfig::default();
2281        assert_eq!(config.cov_retry_timeout_ms, 3000);
2282    }
2283
2284    #[test]
2285    fn server_config_time_sync_callback_default_is_none() {
2286        let config = ServerConfig::default();
2287        assert!(config.on_time_sync.is_none());
2288    }
2289
2290    // -----------------------------------------------------------------------
2291    // ServerTsm unit tests
2292    // -----------------------------------------------------------------------
2293
2294    #[test]
2295    fn server_tsm_allocate_increments() {
2296        let mut tsm = ServerTsm::new();
2297        assert_eq!(tsm.allocate().0, 0);
2298        assert_eq!(tsm.allocate().0, 1);
2299        assert_eq!(tsm.allocate().0, 2);
2300    }
2301
2302    #[test]
2303    fn server_tsm_allocate_wraps_at_255() {
2304        let mut tsm = ServerTsm::new();
2305        tsm.next_invoke_id = 255;
2306        assert_eq!(tsm.allocate().0, 255);
2307        assert_eq!(tsm.allocate().0, 0); // wraps
2308    }
2309
2310    #[test]
2311    fn server_tsm_record_and_take_ack() {
2312        let mut tsm = ServerTsm::new();
2313        let (_id, rx) = tsm.allocate();
2314        tsm.record_result(_id, CovAckResult::Ack);
2315        // Result should be delivered via the oneshot channel
2316        assert_eq!(rx.blocking_recv(), Ok(CovAckResult::Ack));
2317    }
2318
2319    #[test]
2320    fn server_tsm_record_and_take_error() {
2321        let mut tsm = ServerTsm::new();
2322        let (id, rx) = tsm.allocate();
2323        tsm.record_result(id, CovAckResult::Error);
2324        // Oneshot delivers immediately
2325        assert_eq!(rx.blocking_recv(), Ok(CovAckResult::Error));
2326    }
2327
2328    #[test]
2329    fn server_tsm_record_nonexistent_is_noop() {
2330        let mut tsm = ServerTsm::new();
2331        // Recording a result for an ID with no receiver is a no-op
2332        tsm.record_result(99, CovAckResult::Ack);
2333        assert!(tsm.pending.is_empty());
2334    }
2335
2336    #[test]
2337    fn server_tsm_remove_cleans_up() {
2338        let mut tsm = ServerTsm::new();
2339        let (id, _rx) = tsm.allocate();
2340        tsm.remove(id);
2341        assert!(!tsm.pending.contains_key(&id));
2342    }
2343
2344    #[test]
2345    fn server_tsm_multiple_pending() {
2346        let mut tsm = ServerTsm::new();
2347        let (id1, rx1) = tsm.allocate();
2348        let (id2, rx2) = tsm.allocate();
2349        let (id3, rx3) = tsm.allocate();
2350
2351        tsm.record_result(id2, CovAckResult::Error);
2352        tsm.record_result(id1, CovAckResult::Ack);
2353        tsm.record_result(id3, CovAckResult::Ack);
2354
2355        assert_eq!(rx2.blocking_recv(), Ok(CovAckResult::Error));
2356        assert_eq!(rx1.blocking_recv(), Ok(CovAckResult::Ack));
2357        assert_eq!(rx3.blocking_recv(), Ok(CovAckResult::Ack));
2358    }
2359
2360    #[test]
2361    fn cov_ack_result_debug_and_eq() {
2362        // Ensure derived traits work.
2363        assert_eq!(CovAckResult::Ack, CovAckResult::Ack);
2364        assert_ne!(CovAckResult::Ack, CovAckResult::Error);
2365        let _debug = format!("{:?}", CovAckResult::Ack);
2366    }
2367
2368    #[test]
2369    fn default_apdu_retries_constant() {
2370        assert_eq!(DEFAULT_APDU_RETRIES, 3);
2371    }
2372
2373    #[test]
2374    fn seg_receiver_timeout_is_4s() {
2375        assert_eq!(SEG_RECEIVER_TIMEOUT, Duration::from_secs(4));
2376    }
2377
2378    #[test]
2379    fn max_neg_segment_ack_retries_constant() {
2380        assert_eq!(MAX_NEG_SEGMENT_ACK_RETRIES, 3);
2381    }
2382}