Skip to main content

bacnet_server/
server.rs

1//! BACnetServer: builder, APDU dispatch, and lifecycle management.
2//!
3//! The server wraps a NetworkLayer behind Arc (shared with the dispatch task),
4//! owns an ObjectDatabase via Arc<Mutex>, and spawns a dispatch task that
5//! routes incoming APDUs to service handlers.
6
7use std::collections::HashMap;
8use std::net::Ipv4Addr;
9use std::sync::atomic::{AtomicU8, Ordering};
10use std::sync::Arc;
11use std::time::Instant;
12
13use bytes::{Bytes, BytesMut};
14use tokio::sync::{mpsc, Mutex, RwLock, Semaphore};
15use tokio::task::JoinHandle;
16use tokio::time::Duration;
17use tracing::{debug, warn};
18
19use bacnet_encoding::apdu::{
20    self, encode_apdu, AbortPdu, Apdu, ComplexAck, ConfirmedRequest as ConfirmedRequestPdu,
21    ErrorPdu, RejectPdu, SegmentAck as SegmentAckPdu, SimpleAck,
22    UnconfirmedRequest as UnconfirmedRequestPdu,
23};
24use bacnet_encoding::primitives::encode_property_value;
25use bacnet_encoding::segmentation::{
26    max_segment_payload, split_payload, SegmentReceiver, SegmentedPduType,
27};
28use bacnet_network::layer::NetworkLayer;
29use bacnet_objects::database::ObjectDatabase;
30use bacnet_objects::notification_class::get_notification_recipients;
31use bacnet_services::alarm_event::EventNotificationRequest;
32use bacnet_services::common::BACnetPropertyValue;
33use bacnet_services::cov::COVNotificationRequest;
34use bacnet_services::who_is::{IAmRequest, WhoIsRequest};
35use bacnet_transport::bip::BipTransport;
36use bacnet_transport::port::TransportPort;
37use bacnet_types::enums::{
38    AbortReason, ConfirmedServiceChoice, ErrorClass, ErrorCode, NetworkPriority, NotifyType,
39    ObjectType, PropertyIdentifier, RejectReason, Segmentation, UnconfirmedServiceChoice,
40};
41use bacnet_types::error::Error;
42use bacnet_types::primitives::{BACnetTimeStamp, ObjectIdentifier, PropertyValue, Time};
43use bacnet_types::MacAddr;
44
45use crate::cov::CovSubscriptionTable;
46use crate::handlers;
47
48/// Maximum number of concurrent segmented reassembly sessions.
49const MAX_SEG_RECEIVERS: usize = 128;
50
51/// Timeout for idle segmented reassembly sessions (Clause 9.1.6).
52const SEG_RECEIVER_TIMEOUT: Duration = Duration::from_secs(4);
53
54/// Maximum negative SegmentAck retries during segmented response send.
55const MAX_NEG_SEGMENT_ACK_RETRIES: u8 = 3;
56
57/// Default number of APDU retries for confirmed COV notifications (Clause 10.6.3).
58const DEFAULT_APDU_RETRIES: u8 = 3;
59
60// ---------------------------------------------------------------------------
61// Server-side Transaction State Machine (TSM) for outgoing confirmed requests
62// ---------------------------------------------------------------------------
63
64/// Result of a confirmed COV notification from the subscriber's perspective.
65#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub enum CovAckResult {
67    /// SimpleAck received — subscriber accepted the notification.
68    Ack,
69    /// Error or Reject/Abort received — subscriber rejected the notification.
70    Error,
71}
72
73/// Lightweight TSM for tracking outgoing confirmed COV notifications.
74///
75/// The server allocates an invoke ID for each confirmed notification and the
76/// dispatch loop writes the result into a shared map when a SimpleAck, Error,
77/// Reject, or Abort is received.  The per-subscriber retry task polls the map
78/// after each timeout to decide whether to resend.
79pub struct ServerTsm {
80    next_invoke_id: u8,
81    /// Results written by the dispatch loop, read by retry tasks.
82    pending: HashMap<u8, CovAckResult>,
83}
84
85impl ServerTsm {
86    fn new() -> Self {
87        Self {
88            next_invoke_id: 0,
89            pending: HashMap::new(),
90        }
91    }
92
93    /// Allocate the next invoke ID.  The semaphore guarantees at most 255
94    /// concurrent callers, so wrapping is safe.
95    fn allocate(&mut self) -> u8 {
96        let id = self.next_invoke_id;
97        self.next_invoke_id = self.next_invoke_id.wrapping_add(1);
98        id
99    }
100
101    /// Record a result from the dispatch loop (SimpleAck, Error, etc.).
102    fn record_result(&mut self, invoke_id: u8, result: CovAckResult) {
103        self.pending.insert(invoke_id, result);
104    }
105
106    /// Take the result for a given invoke ID (returns and removes it).
107    fn take_result(&mut self, invoke_id: u8) -> Option<CovAckResult> {
108        self.pending.remove(&invoke_id)
109    }
110
111    /// Remove a pending entry (cleanup on completion or exhaustion).
112    fn remove(&mut self, invoke_id: u8) {
113        self.pending.remove(&invoke_id);
114    }
115}
116
117/// Data from a TimeSynchronization request.
118#[derive(Debug, Clone)]
119pub struct TimeSyncData {
120    /// Raw service request bytes (caller can decode if needed).
121    pub raw_service_data: Bytes,
122    /// Whether this was a UTC time sync (vs. local).
123    pub is_utc: bool,
124}
125
126/// Server configuration.
127#[derive(Clone)]
128pub struct ServerConfig {
129    /// Local interface to bind.
130    pub interface: Ipv4Addr,
131    /// UDP port (default 0xBAC0 = 47808).
132    pub port: u16,
133    /// Directed broadcast address.
134    pub broadcast_address: Ipv4Addr,
135    /// Maximum APDU length accepted.
136    pub max_apdu_length: u32,
137    /// Segmentation support level.
138    pub segmentation_supported: Segmentation,
139    /// Vendor identifier.
140    pub vendor_id: u16,
141    /// Timeout in ms before retrying a failed confirmed COV notification send (default 3000ms).
142    pub cov_retry_timeout_ms: u64,
143    /// Optional callback invoked when a TimeSynchronization request is received.
144    pub on_time_sync: Option<Arc<dyn Fn(TimeSyncData) + Send + Sync>>,
145    /// Optional password required for DeviceCommunicationControl (Clause 16.4.1).
146    pub dcc_password: Option<String>,
147    /// Optional password required for ReinitializeDevice (Clause 16.4.2).
148    pub reinit_password: Option<String>,
149    /// Enable periodic fault detection / reliability evaluation (Clause 12).
150    /// When true, the server evaluates analog objects every 10 s for
151    /// OVER_RANGE / UNDER_RANGE faults.
152    pub enable_fault_detection: bool,
153}
154
155impl std::fmt::Debug for ServerConfig {
156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157        f.debug_struct("ServerConfig")
158            .field("interface", &self.interface)
159            .field("port", &self.port)
160            .field("broadcast_address", &self.broadcast_address)
161            .field("max_apdu_length", &self.max_apdu_length)
162            .field("segmentation_supported", &self.segmentation_supported)
163            .field("vendor_id", &self.vendor_id)
164            .field("cov_retry_timeout_ms", &self.cov_retry_timeout_ms)
165            .field(
166                "on_time_sync",
167                &self.on_time_sync.as_ref().map(|_| "<callback>"),
168            )
169            .field("dcc_password", &self.dcc_password.as_ref().map(|_| "***"))
170            .field(
171                "reinit_password",
172                &self.reinit_password.as_ref().map(|_| "***"),
173            )
174            .field("enable_fault_detection", &self.enable_fault_detection)
175            .finish()
176    }
177}
178
179impl Default for ServerConfig {
180    fn default() -> Self {
181        Self {
182            interface: Ipv4Addr::UNSPECIFIED,
183            port: 0xBAC0,
184            broadcast_address: Ipv4Addr::BROADCAST,
185            max_apdu_length: 1476,
186            segmentation_supported: Segmentation::NONE,
187            vendor_id: 0,
188            cov_retry_timeout_ms: 3000,
189            on_time_sync: None,
190            dcc_password: None,
191            reinit_password: None,
192            enable_fault_detection: false,
193        }
194    }
195}
196
197/// Generic builder for BACnetServer with a pre-built transport.
198pub struct ServerBuilder<T: TransportPort> {
199    config: ServerConfig,
200    db: ObjectDatabase,
201    transport: Option<T>,
202}
203
204impl<T: TransportPort + 'static> ServerBuilder<T> {
205    /// Set the object database (transfers ownership).
206    pub fn database(mut self, db: ObjectDatabase) -> Self {
207        self.db = db;
208        self
209    }
210
211    /// Set the pre-built transport.
212    pub fn transport(mut self, transport: T) -> Self {
213        self.transport = Some(transport);
214        self
215    }
216
217    /// Set the password required for DeviceCommunicationControl requests.
218    pub fn dcc_password(mut self, password: impl Into<String>) -> Self {
219        self.config.dcc_password = Some(password.into());
220        self
221    }
222
223    /// Set the password required for ReinitializeDevice requests.
224    pub fn reinit_password(mut self, password: impl Into<String>) -> Self {
225        self.config.reinit_password = Some(password.into());
226        self
227    }
228
229    /// Enable periodic fault detection / reliability evaluation.
230    pub fn enable_fault_detection(mut self, enabled: bool) -> Self {
231        self.config.enable_fault_detection = enabled;
232        self
233    }
234
235    /// Build and start the server.
236    pub async fn build(self) -> Result<BACnetServer<T>, Error> {
237        let transport = self
238            .transport
239            .ok_or_else(|| Error::Encoding("transport not set on ServerBuilder".into()))?;
240        BACnetServer::start(self.config, self.db, transport).await
241    }
242}
243
244/// BIP-specific builder that constructs `BipTransport` from interface/port/broadcast fields.
245pub struct BipServerBuilder {
246    config: ServerConfig,
247    db: ObjectDatabase,
248}
249
250impl BipServerBuilder {
251    /// Set the local interface IP.
252    pub fn interface(mut self, ip: Ipv4Addr) -> Self {
253        self.config.interface = ip;
254        self
255    }
256
257    /// Set the UDP port.
258    pub fn port(mut self, port: u16) -> Self {
259        self.config.port = port;
260        self
261    }
262
263    /// Set the directed broadcast address.
264    pub fn broadcast_address(mut self, addr: Ipv4Addr) -> Self {
265        self.config.broadcast_address = addr;
266        self
267    }
268
269    /// Set the object database (transfers ownership).
270    pub fn database(mut self, db: ObjectDatabase) -> Self {
271        self.db = db;
272        self
273    }
274
275    /// Set the password required for DeviceCommunicationControl requests.
276    pub fn dcc_password(mut self, password: impl Into<String>) -> Self {
277        self.config.dcc_password = Some(password.into());
278        self
279    }
280
281    /// Set the password required for ReinitializeDevice requests.
282    pub fn reinit_password(mut self, password: impl Into<String>) -> Self {
283        self.config.reinit_password = Some(password.into());
284        self
285    }
286
287    /// Enable periodic fault detection / reliability evaluation.
288    pub fn enable_fault_detection(mut self, enabled: bool) -> Self {
289        self.config.enable_fault_detection = enabled;
290        self
291    }
292
293    /// Build and start the server, constructing a BipTransport from the config.
294    pub async fn build(self) -> Result<BACnetServer<BipTransport>, Error> {
295        let transport = BipTransport::new(
296            self.config.interface,
297            self.config.port,
298            self.config.broadcast_address,
299        );
300        BACnetServer::start(self.config, self.db, transport).await
301    }
302}
303
304/// Key for tracking in-progress segmented sends: (source_mac, invoke_id).
305type SegKey = (MacAddr, u8);
306
307/// BACnet server with APDU dispatch and service handling.
308pub struct BACnetServer<T: TransportPort> {
309    #[allow(dead_code)]
310    config: ServerConfig,
311    /// Shared network layer (also held by dispatch task).
312    #[allow(dead_code)]
313    network: Arc<NetworkLayer<T>>,
314    /// Shared object database.
315    db: Arc<RwLock<ObjectDatabase>>,
316    /// COV subscription table (held to keep Arc alive for dispatch task).
317    #[allow(dead_code)]
318    cov_table: Arc<RwLock<CovSubscriptionTable>>,
319    /// Channels for routing SegmentAck PDUs to in-progress segmented sends.
320    #[allow(dead_code)]
321    seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
322    /// Semaphore that caps confirmed COV notifications at 255 in-flight
323    /// to prevent invoke ID reuse (invoke IDs are u8 = 0..255).
324    #[allow(dead_code)]
325    cov_in_flight: Arc<Semaphore>,
326    /// Server-side TSM for outgoing confirmed COV notifications (Clause 10.6.3).
327    #[allow(dead_code)]
328    server_tsm: Arc<Mutex<ServerTsm>>,
329    /// Communication state per DeviceCommunicationControl (Clause 16.4.3).
330    /// 0 = Enable, 1 = Disable, 2 = DisableInitiation.
331    comm_state: Arc<AtomicU8>,
332    /// Handle for the DCC auto-re-enable timer (Clause 16.4.3).
333    /// When DCC DISABLE/DISABLE_INITIATION is received with a time_duration,
334    /// a timer task is spawned to revert comm_state to ENABLE after the duration.
335    /// Previous timers are aborted when a new DCC request arrives.
336    #[allow(dead_code)]
337    dcc_timer: Arc<Mutex<Option<JoinHandle<()>>>>,
338    dispatch_task: Option<JoinHandle<()>>,
339    cov_purge_task: Option<JoinHandle<()>>,
340    fault_detection_task: Option<JoinHandle<()>>,
341    event_enrollment_task: Option<JoinHandle<()>>,
342    trend_log_task: Option<JoinHandle<()>>,
343    schedule_tick_task: Option<JoinHandle<()>>,
344    local_mac: MacAddr,
345}
346
347impl BACnetServer<BipTransport> {
348    /// Create a BIP-specific builder with interface/port/broadcast fields.
349    pub fn bip_builder() -> BipServerBuilder {
350        BipServerBuilder {
351            config: ServerConfig::default(),
352            db: ObjectDatabase::new(),
353        }
354    }
355
356    /// Create a BIP-specific builder (alias for backward compatibility).
357    pub fn builder() -> BipServerBuilder {
358        Self::bip_builder()
359    }
360}
361
362#[cfg(feature = "sc-tls")]
363impl BACnetServer<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>> {
364    /// Create an SC-specific builder that connects to a BACnet/SC hub.
365    pub fn sc_builder() -> ScServerBuilder {
366        ScServerBuilder {
367            config: ServerConfig::default(),
368            db: ObjectDatabase::new(),
369            hub_url: String::new(),
370            tls_config: None,
371            vmac: [0; 6],
372            heartbeat_interval_ms: 30_000,
373            heartbeat_timeout_ms: 60_000,
374            reconnect: None,
375        }
376    }
377}
378
379/// SC-specific server builder.
380///
381/// Created by [`BACnetServer::sc_builder()`].  Requires the `sc-tls` feature.
382#[cfg(feature = "sc-tls")]
383pub struct ScServerBuilder {
384    config: ServerConfig,
385    db: ObjectDatabase,
386    hub_url: String,
387    tls_config: Option<std::sync::Arc<tokio_rustls::rustls::ClientConfig>>,
388    vmac: bacnet_transport::sc_frame::Vmac,
389    heartbeat_interval_ms: u64,
390    heartbeat_timeout_ms: u64,
391    reconnect: Option<bacnet_transport::sc::ScReconnectConfig>,
392}
393
394#[cfg(feature = "sc-tls")]
395impl ScServerBuilder {
396    /// Set the hub WebSocket URL (e.g. `wss://hub.example.com/bacnet`).
397    pub fn hub_url(mut self, url: &str) -> Self {
398        self.hub_url = url.to_string();
399        self
400    }
401
402    /// Set the TLS client configuration.
403    pub fn tls_config(
404        mut self,
405        config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
406    ) -> Self {
407        self.tls_config = Some(config);
408        self
409    }
410
411    /// Set the local VMAC address.
412    pub fn vmac(mut self, vmac: [u8; 6]) -> Self {
413        self.vmac = vmac;
414        self
415    }
416
417    /// Set the object database (transfers ownership).
418    pub fn database(mut self, db: ObjectDatabase) -> Self {
419        self.db = db;
420        self
421    }
422
423    /// Set the heartbeat interval in milliseconds (default 30 000).
424    pub fn heartbeat_interval_ms(mut self, ms: u64) -> Self {
425        self.heartbeat_interval_ms = ms;
426        self
427    }
428
429    /// Set the heartbeat timeout in milliseconds (default 60 000).
430    pub fn heartbeat_timeout_ms(mut self, ms: u64) -> Self {
431        self.heartbeat_timeout_ms = ms;
432        self
433    }
434
435    /// Enable automatic reconnection with the given configuration.
436    pub fn reconnect(mut self, config: bacnet_transport::sc::ScReconnectConfig) -> Self {
437        self.reconnect = Some(config);
438        self
439    }
440
441    /// Set the password required for DeviceCommunicationControl requests.
442    pub fn dcc_password(mut self, password: impl Into<String>) -> Self {
443        self.config.dcc_password = Some(password.into());
444        self
445    }
446
447    /// Set the password required for ReinitializeDevice requests.
448    pub fn reinit_password(mut self, password: impl Into<String>) -> Self {
449        self.config.reinit_password = Some(password.into());
450        self
451    }
452
453    /// Enable periodic fault detection / reliability evaluation.
454    pub fn enable_fault_detection(mut self, enabled: bool) -> Self {
455        self.config.enable_fault_detection = enabled;
456        self
457    }
458
459    /// Connect to the hub and start the server.
460    pub async fn build(
461        self,
462    ) -> Result<
463        BACnetServer<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>>,
464        Error,
465    > {
466        let tls_config = self
467            .tls_config
468            .ok_or_else(|| Error::Encoding("SC server builder: tls_config is required".into()))?;
469
470        let ws = bacnet_transport::sc_tls::TlsWebSocket::connect(&self.hub_url, tls_config).await?;
471
472        let mut transport = bacnet_transport::sc::ScTransport::new(ws, self.vmac)
473            .with_heartbeat_interval_ms(self.heartbeat_interval_ms)
474            .with_heartbeat_timeout_ms(self.heartbeat_timeout_ms);
475        if let Some(rc) = self.reconnect {
476            #[allow(deprecated)]
477            {
478                transport = transport.with_reconnect(rc);
479            }
480        }
481
482        BACnetServer::start(self.config, self.db, transport).await
483    }
484}
485
486impl<T: TransportPort + 'static> BACnetServer<T> {
487    /// Create a generic builder that accepts a pre-built transport.
488    pub fn generic_builder() -> ServerBuilder<T> {
489        ServerBuilder {
490            config: ServerConfig::default(),
491            db: ObjectDatabase::new(),
492            transport: None,
493        }
494    }
495
496    /// Start the server with a pre-built transport.
497    pub async fn start(
498        mut config: ServerConfig,
499        db: ObjectDatabase,
500        transport: T,
501    ) -> Result<Self, Error> {
502        // Clamp max_apdu_length to the transport's physical limit.
503        let transport_max = transport.max_apdu_length() as u32;
504        config.max_apdu_length = config.max_apdu_length.min(transport_max);
505
506        if config.vendor_id == 0 {
507            warn!("vendor_id is 0 (ASHRAE reserved); set a valid vendor ID for production use");
508        }
509
510        let mut network = NetworkLayer::new(transport);
511        let apdu_rx = network.start().await?;
512        let local_mac = MacAddr::from_slice(network.local_mac());
513
514        let network = Arc::new(network);
515        let db = Arc::new(RwLock::new(db));
516        let cov_table = Arc::new(RwLock::new(CovSubscriptionTable::new()));
517        let seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>> =
518            Arc::new(Mutex::new(HashMap::new()));
519
520        let cov_in_flight = Arc::new(Semaphore::new(255));
521        let server_tsm = Arc::new(Mutex::new(ServerTsm::new()));
522        let comm_state = Arc::new(AtomicU8::new(0)); // 0 = Enable (default)
523        let dcc_timer: Arc<Mutex<Option<JoinHandle<()>>>> = Arc::new(Mutex::new(None));
524
525        let network_dispatch = Arc::clone(&network);
526        let db_dispatch = Arc::clone(&db);
527        let cov_dispatch = Arc::clone(&cov_table);
528        let seg_ack_dispatch = Arc::clone(&seg_ack_senders);
529        let cov_in_flight_dispatch = Arc::clone(&cov_in_flight);
530        let server_tsm_dispatch = Arc::clone(&server_tsm);
531        let comm_state_dispatch = Arc::clone(&comm_state);
532        let dcc_timer_dispatch = Arc::clone(&dcc_timer);
533        let config_dispatch = config.clone();
534
535        let dispatch_task = tokio::spawn(async move {
536            let mut apdu_rx = apdu_rx;
537            // State for reassembling segmented ConfirmedRequests from clients.
538            // Key: (source_mac, invoke_id).
539            // Value: (receiver, first segment's request with all metadata).
540            let mut seg_receivers: HashMap<
541                SegKey,
542                (
543                    SegmentReceiver,
544                    bacnet_encoding::apdu::ConfirmedRequest,
545                    Instant,
546                ),
547            > = HashMap::new();
548
549            while let Some(received) = apdu_rx.recv().await {
550                // Reap timed-out segmented reassembly sessions (Clause 9.1.6).
551                let now = Instant::now();
552                seg_receivers.retain(|_key, (_rx, _req, last_activity)| {
553                    now.duration_since(*last_activity) < SEG_RECEIVER_TIMEOUT
554                });
555
556                match apdu::decode_apdu(received.apdu.clone()) {
557                    Ok(decoded) => {
558                        let source_mac = received.source_mac.clone();
559                        let mut received = Some(received);
560                        // Intercept segmented ConfirmedRequests for reassembly.
561                        let handled = if let Apdu::ConfirmedRequest(ref req) = decoded {
562                            if req.segmented {
563                                let seq = req.sequence_number.unwrap_or(0);
564                                let key: SegKey = (source_mac.clone(), req.invoke_id);
565
566                                if seq == 0 {
567                                    // Reject if too many concurrent segmented sessions
568                                    if seg_receivers.len() >= MAX_SEG_RECEIVERS {
569                                        warn!("Too many concurrent segmented sessions ({}), rejecting", seg_receivers.len());
570                                        let abort_pdu = Apdu::Abort(AbortPdu {
571                                            sent_by_server: true,
572                                            invoke_id: req.invoke_id,
573                                            abort_reason: AbortReason::BUFFER_OVERFLOW,
574                                        });
575                                        let mut abort_buf = BytesMut::new();
576                                        encode_apdu(&mut abort_buf, &abort_pdu);
577                                        let _ = network_dispatch
578                                            .send_apdu(
579                                                &abort_buf,
580                                                &source_mac,
581                                                false,
582                                                NetworkPriority::NORMAL,
583                                            )
584                                            .await;
585                                        continue;
586                                    }
587                                    // First segment: create a new receiver and
588                                    // store the request metadata.
589                                    let mut receiver = SegmentReceiver::new();
590                                    if let Err(e) =
591                                        receiver.receive(seq, req.service_request.clone())
592                                    {
593                                        warn!(error = %e, "Rejecting oversized segment");
594                                        continue;
595                                    }
596                                    seg_receivers.insert(
597                                        key.clone(),
598                                        (receiver, req.clone(), Instant::now()),
599                                    );
600                                } else if let Some((receiver, _, last_activity)) =
601                                    seg_receivers.get_mut(&key)
602                                {
603                                    if let Err(e) =
604                                        receiver.receive(seq, req.service_request.clone())
605                                    {
606                                        warn!(error = %e, "Rejecting oversized segment");
607                                        continue;
608                                    }
609                                    *last_activity = Instant::now();
610                                } else {
611                                    // Non-initial segment without prior segment 0:
612                                    // send Abort PDU per Clause 9.20.1.7.
613                                    warn!(
614                                        invoke_id = req.invoke_id,
615                                        seq = seq,
616                                        "Received non-initial segment without \
617                                         prior segment 0, aborting"
618                                    );
619                                    let abort_pdu = Apdu::Abort(AbortPdu {
620                                        sent_by_server: true,
621                                        invoke_id: req.invoke_id,
622                                        abort_reason: AbortReason::INVALID_APDU_IN_THIS_STATE,
623                                    });
624                                    let mut abort_buf = BytesMut::new();
625                                    encode_apdu(&mut abort_buf, &abort_pdu);
626                                    let _ = network_dispatch
627                                        .send_apdu(
628                                            &abort_buf,
629                                            &source_mac,
630                                            false,
631                                            NetworkPriority::NORMAL,
632                                        )
633                                        .await;
634                                    continue;
635                                }
636
637                                // Send SegmentAck back to the client.
638                                let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
639                                    negative_ack: false,
640                                    sent_by_server: true,
641                                    invoke_id: req.invoke_id,
642                                    sequence_number: seq,
643                                    actual_window_size: 1,
644                                });
645                                let mut ack_buf = BytesMut::new();
646                                encode_apdu(&mut ack_buf, &seg_ack);
647                                if let Err(e) = network_dispatch
648                                    .send_apdu(
649                                        &ack_buf,
650                                        &source_mac,
651                                        false,
652                                        NetworkPriority::NORMAL,
653                                    )
654                                    .await
655                                {
656                                    warn!(
657                                        error = %e,
658                                        "Failed to send SegmentAck for \
659                                         segmented request"
660                                    );
661                                }
662
663                                // Last segment: reassemble and dispatch.
664                                if !req.more_follows {
665                                    if let Some((receiver, first_req, _)) =
666                                        seg_receivers.remove(&key)
667                                    {
668                                        let total = receiver.received_count();
669                                        match receiver.reassemble(total) {
670                                            Ok(full_data) => {
671                                                let reassembled =
672                                                    bacnet_encoding::apdu::ConfirmedRequest {
673                                                        segmented: false,
674                                                        more_follows: false,
675                                                        sequence_number: None,
676                                                        proposed_window_size: None,
677                                                        service_request: Bytes::from(full_data),
678                                                        invoke_id: first_req.invoke_id,
679                                                        service_choice: first_req.service_choice,
680                                                        max_apdu_length: first_req.max_apdu_length,
681                                                        segmented_response_accepted: first_req
682                                                            .segmented_response_accepted,
683                                                        max_segments: first_req.max_segments,
684                                                    };
685                                                debug!(
686                                                    invoke_id = reassembled.invoke_id,
687                                                    segments = total,
688                                                    payload_len = reassembled.service_request.len(),
689                                                    "Reassembled segmented ConfirmedRequest"
690                                                );
691                                                Self::dispatch(
692                                                    &db_dispatch,
693                                                    &network_dispatch,
694                                                    &cov_dispatch,
695                                                    &seg_ack_dispatch,
696                                                    &cov_in_flight_dispatch,
697                                                    &server_tsm_dispatch,
698                                                    &comm_state_dispatch,
699                                                    &dcc_timer_dispatch,
700                                                    &config_dispatch,
701                                                    &source_mac,
702                                                    Apdu::ConfirmedRequest(reassembled),
703                                                    received
704                                                        .take()
705                                                        .expect("received consumed twice"),
706                                                )
707                                                .await;
708                                            }
709                                            Err(e) => {
710                                                warn!(
711                                                    error = %e,
712                                                    "Failed to reassemble \
713                                                     segmented request"
714                                                );
715                                            }
716                                        }
717                                    }
718                                }
719
720                                true // Already handled
721                            } else {
722                                false // Not segmented, dispatch normally
723                            }
724                        } else {
725                            false // Not a ConfirmedRequest, dispatch normally
726                        };
727
728                        if !handled {
729                            Self::dispatch(
730                                &db_dispatch,
731                                &network_dispatch,
732                                &cov_dispatch,
733                                &seg_ack_dispatch,
734                                &cov_in_flight_dispatch,
735                                &server_tsm_dispatch,
736                                &comm_state_dispatch,
737                                &dcc_timer_dispatch,
738                                &config_dispatch,
739                                &source_mac,
740                                decoded,
741                                received.take().expect("received consumed twice"),
742                            )
743                            .await;
744                        }
745                    }
746                    Err(e) => {
747                        warn!(error = %e, "Server failed to decode received APDU");
748                    }
749                }
750            }
751        });
752
753        let cov_table_for_purge = Arc::clone(&cov_table);
754        let cov_purge_task = tokio::spawn(async move {
755            let mut interval = tokio::time::interval(Duration::from_secs(30));
756            loop {
757                interval.tick().await;
758                let mut table = cov_table_for_purge.write().await;
759                let purged = table.purge_expired();
760                if purged > 0 {
761                    debug!(purged, "Purged expired COV subscriptions");
762                }
763            }
764        });
765
766        let fault_detection_task = if config.enable_fault_detection {
767            let db_fault = Arc::clone(&db);
768            Some(tokio::spawn(async move {
769                let detector = crate::fault_detection::FaultDetector::default();
770                let mut interval = tokio::time::interval(Duration::from_secs(10));
771                loop {
772                    interval.tick().await;
773                    let mut db_guard = db_fault.write().await;
774                    let changes = detector.evaluate(&mut db_guard);
775                    for change in &changes {
776                        debug!(
777                            object = %change.object_id,
778                            old = change.old_reliability,
779                            new = change.new_reliability,
780                            "Fault detection: reliability changed"
781                        );
782                    }
783                }
784            }))
785        } else {
786            None
787        };
788
789        // Event Enrollment evaluation task (Clause 13.4): evaluates EventEnrollment
790        // objects every 10 seconds alongside fault detection.
791        let event_enrollment_task = if config.enable_fault_detection {
792            let db_ee = Arc::clone(&db);
793            Some(tokio::spawn(async move {
794                let mut interval = tokio::time::interval(Duration::from_secs(10));
795                loop {
796                    interval.tick().await;
797                    let mut db_guard = db_ee.write().await;
798                    let transitions =
799                        crate::event_enrollment::evaluate_event_enrollments(&mut db_guard);
800                    for t in &transitions {
801                        debug!(
802                            enrollment = %t.enrollment_oid,
803                            monitored = %t.monitored_oid,
804                            from = ?t.change.from,
805                            to = ?t.change.to,
806                            "Event enrollment: state changed"
807                        );
808                    }
809                }
810            }))
811        } else {
812            None
813        };
814
815        // Trend log polling task: polls TrendLog objects whose log_interval > 0.
816        let db_trend = Arc::clone(&db);
817        let trend_log_task = Some(tokio::spawn(async move {
818            let mut interval = tokio::time::interval(Duration::from_secs(1));
819            loop {
820                interval.tick().await;
821                crate::trend_log::poll_trend_logs(&db_trend).await;
822            }
823        }));
824
825        // Schedule execution task (Clause 12.24): evaluates Schedule objects
826        // every 60 seconds and writes to controlled properties.
827        let db_schedule = Arc::clone(&db);
828        let schedule_tick_task = Some(tokio::spawn(async move {
829            let mut interval = tokio::time::interval(Duration::from_secs(60));
830            loop {
831                interval.tick().await;
832                crate::schedule::tick_schedules(&db_schedule).await;
833            }
834        }));
835
836        Ok(Self {
837            config,
838            network,
839            db,
840            cov_table,
841            seg_ack_senders,
842            cov_in_flight,
843            server_tsm,
844            comm_state,
845            dcc_timer,
846            dispatch_task: Some(dispatch_task),
847            cov_purge_task: Some(cov_purge_task),
848            fault_detection_task,
849            event_enrollment_task,
850            trend_log_task,
851            schedule_tick_task,
852            local_mac,
853        })
854    }
855
856    /// Get the server's local MAC address.
857    pub fn local_mac(&self) -> &[u8] {
858        &self.local_mac
859    }
860
861    /// Get a reference to the shared object database.
862    pub fn database(&self) -> &Arc<RwLock<ObjectDatabase>> {
863        &self.db
864    }
865
866    /// Get the communication state per DeviceCommunicationControl.
867    ///
868    /// Returns 0 (Enable), 1 (Disable), or 2 (DisableInitiation).
869    pub fn comm_state(&self) -> u8 {
870        self.comm_state.load(Ordering::Acquire)
871    }
872
873    /// Generate a PICS document from the current object database and server configuration.
874    ///
875    /// The caller must supply a [`PicsConfig`] for fields not available from the server
876    /// (vendor name, model, firmware revision, etc.).
877    pub async fn generate_pics(&self, pics_config: &crate::pics::PicsConfig) -> crate::pics::Pics {
878        let db = self.db.read().await;
879        crate::pics::PicsGenerator::new(&db, &self.config, pics_config).generate()
880    }
881
882    /// Stop the server.
883    pub async fn stop(&mut self) -> Result<(), Error> {
884        if let Some(task) = self.fault_detection_task.take() {
885            task.abort();
886            let _ = task.await;
887        }
888        if let Some(task) = self.event_enrollment_task.take() {
889            task.abort();
890            let _ = task.await;
891        }
892        if let Some(task) = self.trend_log_task.take() {
893            task.abort();
894            let _ = task.await;
895        }
896        if let Some(task) = self.schedule_tick_task.take() {
897            task.abort();
898            let _ = task.await;
899        }
900        if let Some(task) = self.cov_purge_task.take() {
901            task.abort();
902            let _ = task.await;
903        }
904        if let Some(task) = self.dispatch_task.take() {
905            task.abort();
906            let _ = task.await;
907        }
908        // Network cleanup happens when Arc is dropped (socket close on drop).
909        Ok(())
910    }
911
912    /// Dispatch a received APDU.
913    #[allow(clippy::too_many_arguments)]
914    async fn dispatch(
915        db: &Arc<RwLock<ObjectDatabase>>,
916        network: &Arc<NetworkLayer<T>>,
917        cov_table: &Arc<RwLock<CovSubscriptionTable>>,
918        seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
919        cov_in_flight: &Arc<Semaphore>,
920        server_tsm: &Arc<Mutex<ServerTsm>>,
921        comm_state: &Arc<AtomicU8>,
922        dcc_timer: &Arc<Mutex<Option<JoinHandle<()>>>>,
923        config: &ServerConfig,
924        source_mac: &[u8],
925        apdu: Apdu,
926        mut received: bacnet_network::layer::ReceivedApdu,
927    ) {
928        match apdu {
929            Apdu::ConfirmedRequest(req) => {
930                let reply_tx = received.reply_tx.take();
931                Self::handle_confirmed_request(
932                    db,
933                    network,
934                    cov_table,
935                    seg_ack_senders,
936                    cov_in_flight,
937                    server_tsm,
938                    comm_state,
939                    dcc_timer,
940                    config,
941                    source_mac,
942                    req,
943                    reply_tx,
944                )
945                .await;
946            }
947            Apdu::UnconfirmedRequest(req) => {
948                Self::handle_unconfirmed_request(db, network, config, comm_state, req, &received)
949                    .await;
950            }
951            Apdu::SimpleAck(sa) => {
952                // Correlate with pending confirmed COV notification.
953                let mut tsm = server_tsm.lock().await;
954                tsm.record_result(sa.invoke_id, CovAckResult::Ack);
955                debug!(
956                    invoke_id = sa.invoke_id,
957                    "SimpleAck received for outgoing confirmed notification"
958                );
959            }
960            Apdu::Error(err) => {
961                // Correlate with pending confirmed COV notification.
962                let mut tsm = server_tsm.lock().await;
963                tsm.record_result(err.invoke_id, CovAckResult::Error);
964                debug!(
965                    invoke_id = err.invoke_id,
966                    error_class = err.error_class.to_raw(),
967                    error_code = err.error_code.to_raw(),
968                    "Error received for outgoing confirmed notification"
969                );
970            }
971            Apdu::Reject(rej) => {
972                // Treat Reject like Error for COV TSM purposes.
973                let mut tsm = server_tsm.lock().await;
974                tsm.record_result(rej.invoke_id, CovAckResult::Error);
975                debug!(
976                    invoke_id = rej.invoke_id,
977                    "Reject received for outgoing confirmed notification"
978                );
979            }
980            Apdu::Abort(abort) if !abort.sent_by_server => {
981                // Client-sent Abort for our outgoing request.
982                let mut tsm = server_tsm.lock().await;
983                tsm.record_result(abort.invoke_id, CovAckResult::Error);
984                debug!(
985                    invoke_id = abort.invoke_id,
986                    "Abort received for outgoing confirmed notification"
987                );
988            }
989            Apdu::SegmentAck(sa) => {
990                let key = (MacAddr::from_slice(source_mac), sa.invoke_id);
991                let senders = seg_ack_senders.lock().await;
992                if let Some(tx) = senders.get(&key) {
993                    let _ = tx.try_send(sa);
994                } else {
995                    debug!(
996                        invoke_id = sa.invoke_id,
997                        "Server ignoring SegmentAck for unknown transaction"
998                    );
999                }
1000            }
1001            _ => {
1002                debug!("Server ignoring unhandled APDU type");
1003            }
1004        }
1005    }
1006
1007    /// Handle a confirmed request.
1008    #[allow(clippy::too_many_arguments)]
1009    async fn handle_confirmed_request(
1010        db: &Arc<RwLock<ObjectDatabase>>,
1011        network: &Arc<NetworkLayer<T>>,
1012        cov_table: &Arc<RwLock<CovSubscriptionTable>>,
1013        seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
1014        cov_in_flight: &Arc<Semaphore>,
1015        server_tsm: &Arc<Mutex<ServerTsm>>,
1016        comm_state: &Arc<AtomicU8>,
1017        dcc_timer: &Arc<Mutex<Option<JoinHandle<()>>>>,
1018        config: &ServerConfig,
1019        source_mac: &[u8],
1020        req: bacnet_encoding::apdu::ConfirmedRequest,
1021        reply_tx: Option<tokio::sync::oneshot::Sender<Bytes>>,
1022    ) {
1023        let invoke_id = req.invoke_id;
1024        let service_choice = req.service_choice;
1025        let client_max_apdu = req.max_apdu_length;
1026        let client_accepts_segmented = req.segmented_response_accepted;
1027        let mut written_oids: Vec<ObjectIdentifier> = Vec::new();
1028
1029        // DCC DISABLE enforcement (Clause 16.4.3):
1030        // When state == 1 (DISABLE), only DeviceCommunicationControl and
1031        // ReinitializeDevice are permitted; all other requests are silently dropped.
1032        let state = comm_state.load(Ordering::Acquire);
1033        if state == 1
1034            && service_choice != ConfirmedServiceChoice::DEVICE_COMMUNICATION_CONTROL
1035            && service_choice != ConfirmedServiceChoice::REINITIALIZE_DEVICE
1036        {
1037            debug!(
1038                service = service_choice.to_raw(),
1039                "DCC DISABLE: dropping confirmed request"
1040            );
1041            return;
1042        }
1043
1044        // Helper closures for common response patterns
1045        let complex_ack = |ack_buf: BytesMut| -> Apdu {
1046            Apdu::ComplexAck(ComplexAck {
1047                segmented: false,
1048                more_follows: false,
1049                invoke_id,
1050                sequence_number: None,
1051                proposed_window_size: None,
1052                service_choice,
1053                service_ack: ack_buf.freeze(),
1054            })
1055        };
1056        let simple_ack = || -> Apdu {
1057            Apdu::SimpleAck(SimpleAck {
1058                invoke_id,
1059                service_choice,
1060            })
1061        };
1062
1063        let mut ack_buf = BytesMut::with_capacity(512);
1064        let response = match service_choice {
1065            s if s == ConfirmedServiceChoice::READ_PROPERTY => {
1066                let db = db.read().await;
1067                match handlers::handle_read_property(&db, &req.service_request, &mut ack_buf) {
1068                    Ok(()) => complex_ack(ack_buf),
1069                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1070                }
1071            }
1072            s if s == ConfirmedServiceChoice::WRITE_PROPERTY => {
1073                let result = {
1074                    let mut db = db.write().await;
1075                    handlers::handle_write_property(&mut db, &req.service_request)
1076                };
1077                match result {
1078                    Ok(oid) => {
1079                        written_oids.push(oid);
1080                        simple_ack()
1081                    }
1082                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1083                }
1084            }
1085            s if s == ConfirmedServiceChoice::READ_PROPERTY_MULTIPLE => {
1086                let db = db.read().await;
1087                match handlers::handle_read_property_multiple(
1088                    &db,
1089                    &req.service_request,
1090                    &mut ack_buf,
1091                ) {
1092                    Ok(()) => complex_ack(ack_buf),
1093                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1094                }
1095            }
1096            s if s == ConfirmedServiceChoice::WRITE_PROPERTY_MULTIPLE => {
1097                let result = {
1098                    let mut db = db.write().await;
1099                    handlers::handle_write_property_multiple(&mut db, &req.service_request)
1100                };
1101                match result {
1102                    Ok(oids) => {
1103                        written_oids = oids;
1104                        simple_ack()
1105                    }
1106                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1107                }
1108            }
1109            s if s == ConfirmedServiceChoice::SUBSCRIBE_COV => {
1110                let db = db.read().await;
1111                let mut table = cov_table.write().await;
1112                match handlers::handle_subscribe_cov(
1113                    &mut table,
1114                    &db,
1115                    source_mac,
1116                    &req.service_request,
1117                ) {
1118                    Ok(()) => simple_ack(),
1119                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1120                }
1121            }
1122            s if s == ConfirmedServiceChoice::SUBSCRIBE_COV_PROPERTY => {
1123                let db = db.read().await;
1124                let mut table = cov_table.write().await;
1125                match handlers::handle_subscribe_cov_property(
1126                    &mut table,
1127                    &db,
1128                    source_mac,
1129                    &req.service_request,
1130                ) {
1131                    Ok(()) => simple_ack(),
1132                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1133                }
1134            }
1135            s if s == ConfirmedServiceChoice::CREATE_OBJECT => {
1136                let result = {
1137                    let mut db = db.write().await;
1138                    handlers::handle_create_object(&mut db, &req.service_request, &mut ack_buf)
1139                };
1140                match result {
1141                    Ok(()) => complex_ack(ack_buf),
1142                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1143                }
1144            }
1145            s if s == ConfirmedServiceChoice::DELETE_OBJECT => {
1146                let result = {
1147                    let mut db = db.write().await;
1148                    handlers::handle_delete_object(&mut db, &req.service_request)
1149                };
1150                match result {
1151                    Ok(()) => simple_ack(),
1152                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1153                }
1154            }
1155            s if s == ConfirmedServiceChoice::DEVICE_COMMUNICATION_CONTROL => {
1156                match handlers::handle_device_communication_control(
1157                    &req.service_request,
1158                    comm_state,
1159                    &config.dcc_password,
1160                ) {
1161                    Ok((_state, duration)) => {
1162                        // Abort any previous DCC timer.
1163                        if let Some(prev) = dcc_timer.lock().await.take() {
1164                            prev.abort();
1165                        }
1166                        // If duration is specified for DISABLE/DISABLE_INITIATION,
1167                        // spawn a timer to auto-revert to ENABLE (Clause 16.4.3).
1168                        if let Some(minutes) = duration {
1169                            let comm = Arc::clone(comm_state);
1170                            let handle = tokio::spawn(async move {
1171                                tokio::time::sleep(std::time::Duration::from_secs(
1172                                    minutes as u64 * 60,
1173                                ))
1174                                .await;
1175                                comm.store(0, Ordering::Release);
1176                                tracing::debug!(
1177                                    "DCC timer expired after {} min, state reverted to ENABLE",
1178                                    minutes
1179                                );
1180                            });
1181                            *dcc_timer.lock().await = Some(handle);
1182                        }
1183                        simple_ack()
1184                    }
1185                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1186                }
1187            }
1188            s if s == ConfirmedServiceChoice::REINITIALIZE_DEVICE => {
1189                match handlers::handle_reinitialize_device(
1190                    &req.service_request,
1191                    &config.reinit_password,
1192                ) {
1193                    Ok(()) => simple_ack(),
1194                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1195                }
1196            }
1197            s if s == ConfirmedServiceChoice::GET_EVENT_INFORMATION => {
1198                let db = db.read().await;
1199                match handlers::handle_get_event_information(
1200                    &db,
1201                    &req.service_request,
1202                    &mut ack_buf,
1203                ) {
1204                    Ok(()) => complex_ack(ack_buf),
1205                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1206                }
1207            }
1208            s if s == ConfirmedServiceChoice::ACKNOWLEDGE_ALARM => {
1209                let mut db = db.write().await;
1210                match handlers::handle_acknowledge_alarm(&mut db, &req.service_request) {
1211                    Ok(()) => simple_ack(),
1212                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1213                }
1214            }
1215            s if s == ConfirmedServiceChoice::READ_RANGE => {
1216                let db = db.read().await;
1217                match handlers::handle_read_range(&db, &req.service_request, &mut ack_buf) {
1218                    Ok(()) => complex_ack(ack_buf),
1219                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1220                }
1221            }
1222            s if s == ConfirmedServiceChoice::ATOMIC_READ_FILE => {
1223                let db = db.read().await;
1224                match handlers::handle_atomic_read_file(&db, &req.service_request, &mut ack_buf) {
1225                    Ok(()) => complex_ack(ack_buf),
1226                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1227                }
1228            }
1229            s if s == ConfirmedServiceChoice::ATOMIC_WRITE_FILE => {
1230                let result = {
1231                    let mut db = db.write().await;
1232                    handlers::handle_atomic_write_file(&mut db, &req.service_request, &mut ack_buf)
1233                };
1234                match result {
1235                    Ok(()) => complex_ack(ack_buf),
1236                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1237                }
1238            }
1239            s if s == ConfirmedServiceChoice::ADD_LIST_ELEMENT => {
1240                let mut db = db.write().await;
1241                match handlers::handle_add_list_element(&mut db, &req.service_request) {
1242                    Ok(()) => simple_ack(),
1243                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1244                }
1245            }
1246            s if s == ConfirmedServiceChoice::REMOVE_LIST_ELEMENT => {
1247                let mut db = db.write().await;
1248                match handlers::handle_remove_list_element(&mut db, &req.service_request) {
1249                    Ok(()) => simple_ack(),
1250                    Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1251                }
1252            }
1253            _ => {
1254                debug!(
1255                    service = service_choice.to_raw(),
1256                    "Unsupported confirmed service"
1257                );
1258                Apdu::Reject(RejectPdu {
1259                    invoke_id,
1260                    reject_reason: RejectReason::UNRECOGNIZED_SERVICE,
1261                })
1262            }
1263        };
1264
1265        // Check if segmentation is needed for ComplexAck responses.
1266        if let Apdu::ComplexAck(ref ack) = response {
1267            // Encode the full unsegmented response to check its size.
1268            let mut full_buf = BytesMut::new();
1269            encode_apdu(&mut full_buf, &response);
1270
1271            if full_buf.len() > client_max_apdu as usize {
1272                // Response exceeds the client's max APDU length — segmentation required.
1273                if !client_accepts_segmented {
1274                    // Client does not accept segmented responses — send Abort.
1275                    let abort = Apdu::Abort(AbortPdu {
1276                        sent_by_server: true,
1277                        invoke_id,
1278                        abort_reason: AbortReason::SEGMENTATION_NOT_SUPPORTED,
1279                    });
1280                    let mut buf = BytesMut::new();
1281                    encode_apdu(&mut buf, &abort);
1282                    if let Err(e) = network
1283                        .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
1284                        .await
1285                    {
1286                        warn!(error = %e, "Failed to send Abort for segmentation-not-supported");
1287                    }
1288                } else {
1289                    // Spawn the segmented send as a background task so the
1290                    // dispatch loop can continue processing incoming SegmentAck
1291                    // PDUs from the client.
1292                    let network = Arc::clone(network);
1293                    let seg_ack_senders = Arc::clone(seg_ack_senders);
1294                    let source_mac = MacAddr::from_slice(source_mac);
1295                    let service_ack_data = ack.service_ack.clone();
1296                    tokio::spawn(async move {
1297                        Self::send_segmented_complex_ack(
1298                            &network,
1299                            &seg_ack_senders,
1300                            &source_mac,
1301                            invoke_id,
1302                            service_choice,
1303                            &service_ack_data,
1304                            client_max_apdu,
1305                        )
1306                        .await;
1307                    });
1308                }
1309
1310                // Fire post-write notifications even for segmented responses.
1311                for oid in &written_oids {
1312                    Self::fire_event_notifications(db, network, comm_state, oid).await;
1313                }
1314                for oid in &written_oids {
1315                    Self::fire_cov_notifications(
1316                        db,
1317                        network,
1318                        cov_table,
1319                        cov_in_flight,
1320                        server_tsm,
1321                        comm_state,
1322                        config,
1323                        oid,
1324                    )
1325                    .await;
1326                }
1327                return;
1328            }
1329        }
1330
1331        // Non-segmented path: send the response.
1332        // If a reply_tx channel is available (MS/TP DataExpectingReply), encode as
1333        // NPDU and send through the channel for a fast in-window reply. Otherwise
1334        // fall back to the normal network send_apdu path.
1335        let mut buf = BytesMut::new();
1336        encode_apdu(&mut buf, &response);
1337
1338        if let Some(tx) = reply_tx {
1339            use bacnet_encoding::npdu::{encode_npdu, Npdu};
1340            let apdu_bytes = buf.freeze();
1341            let npdu = Npdu {
1342                is_network_message: false,
1343                expecting_reply: false,
1344                priority: NetworkPriority::NORMAL,
1345                destination: None,
1346                source: None,
1347                payload: apdu_bytes.clone(),
1348                ..Npdu::default()
1349            };
1350            let mut npdu_buf = BytesMut::with_capacity(2 + apdu_bytes.len());
1351            match encode_npdu(&mut npdu_buf, &npdu) {
1352                Ok(()) => {
1353                    let _ = tx.send(npdu_buf.freeze());
1354                }
1355                Err(e) => {
1356                    warn!(error = %e, "Failed to encode NPDU for MS/TP reply");
1357                    // Fallback: try normal path (reply_tx consumed, so this will go
1358                    // through the transport as a separate frame)
1359                    if let Err(e) = network
1360                        .send_apdu(&apdu_bytes, source_mac, false, NetworkPriority::NORMAL)
1361                        .await
1362                    {
1363                        warn!(error = %e, "Failed to send response");
1364                    }
1365                }
1366            }
1367        } else if let Err(e) = network
1368            .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
1369            .await
1370        {
1371            warn!(error = %e, "Failed to send response");
1372        }
1373
1374        // Evaluate intrinsic reporting and fire event notifications
1375        for oid in &written_oids {
1376            Self::fire_event_notifications(db, network, comm_state, oid).await;
1377        }
1378
1379        // Fire COV notifications for any written objects
1380        for oid in &written_oids {
1381            Self::fire_cov_notifications(
1382                db,
1383                network,
1384                cov_table,
1385                cov_in_flight,
1386                server_tsm,
1387                comm_state,
1388                config,
1389                oid,
1390            )
1391            .await;
1392        }
1393    }
1394
1395    /// Send a ComplexAck response using segmented transfer.
1396    ///
1397    /// Splits the service ack data into segments that fit within the client's
1398    /// max APDU length, sends each segment, and waits for SegmentAck from
1399    /// the client before sending the next (window size 1).
1400    async fn send_segmented_complex_ack(
1401        network: &Arc<NetworkLayer<T>>,
1402        seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
1403        source_mac: &[u8],
1404        invoke_id: u8,
1405        service_choice: ConfirmedServiceChoice,
1406        service_ack_data: &[u8],
1407        client_max_apdu: u16,
1408    ) {
1409        let max_seg_size = max_segment_payload(client_max_apdu, SegmentedPduType::ComplexAck);
1410        let segments = split_payload(service_ack_data, max_seg_size);
1411        let total_segments = segments.len();
1412
1413        if total_segments > 255 {
1414            warn!(
1415                total_segments,
1416                "Response requires too many segments, aborting"
1417            );
1418            let abort = Apdu::Abort(AbortPdu {
1419                sent_by_server: true,
1420                invoke_id,
1421                abort_reason: AbortReason::BUFFER_OVERFLOW,
1422            });
1423            let mut buf = BytesMut::new();
1424            encode_apdu(&mut buf, &abort);
1425            let _ = network
1426                .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
1427                .await;
1428            return;
1429        }
1430
1431        debug!(
1432            total_segments,
1433            max_seg_size,
1434            payload_len = service_ack_data.len(),
1435            "Starting segmented ComplexAck send"
1436        );
1437
1438        // Register a channel for receiving SegmentAck PDUs during the send.
1439        let (seg_ack_tx, mut seg_ack_rx) = mpsc::channel(16);
1440        let key = (MacAddr::from_slice(source_mac), invoke_id);
1441        {
1442            seg_ack_senders.lock().await.insert(key.clone(), seg_ack_tx);
1443        }
1444
1445        let seg_timeout = Duration::from_secs(5);
1446        let mut seg_idx: usize = 0;
1447        let mut neg_ack_retries: u8 = 0;
1448
1449        while seg_idx < total_segments {
1450            let is_last = seg_idx == total_segments - 1;
1451
1452            let pdu = Apdu::ComplexAck(ComplexAck {
1453                segmented: true,
1454                more_follows: !is_last,
1455                invoke_id,
1456                sequence_number: Some(seg_idx as u8),
1457                proposed_window_size: Some(1),
1458                service_choice,
1459                service_ack: segments[seg_idx].clone(),
1460            });
1461
1462            let mut buf = BytesMut::with_capacity(client_max_apdu as usize);
1463            encode_apdu(&mut buf, &pdu);
1464
1465            if let Err(e) = network
1466                .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
1467                .await
1468            {
1469                warn!(error = %e, seq = seg_idx, "Failed to send segment");
1470                break;
1471            }
1472
1473            debug!(seq = seg_idx, is_last, "Sent ComplexAck segment");
1474
1475            // Wait for SegmentAck from the client before sending the next segment.
1476            if !is_last {
1477                match tokio::time::timeout(seg_timeout, seg_ack_rx.recv()).await {
1478                    Ok(Some(ack)) => {
1479                        debug!(
1480                            seq = ack.sequence_number,
1481                            negative = ack.negative_ack,
1482                            "Received SegmentAck for ComplexAck"
1483                        );
1484                        if ack.negative_ack {
1485                            neg_ack_retries += 1;
1486                            if neg_ack_retries > MAX_NEG_SEGMENT_ACK_RETRIES {
1487                                warn!(
1488                                    invoke_id,
1489                                    retries = neg_ack_retries,
1490                                    "Too many negative SegmentAck retries, aborting segmented send"
1491                                );
1492                                let abort = Apdu::Abort(AbortPdu {
1493                                    sent_by_server: true,
1494                                    invoke_id,
1495                                    abort_reason: AbortReason::TSM_TIMEOUT,
1496                                });
1497                                let mut abort_buf = BytesMut::new();
1498                                encode_apdu(&mut abort_buf, &abort);
1499                                let _ = network
1500                                    .send_apdu(
1501                                        &abort_buf,
1502                                        source_mac,
1503                                        false,
1504                                        NetworkPriority::NORMAL,
1505                                    )
1506                                    .await;
1507                                break;
1508                            }
1509                            // Retransmit from the requested sequence number
1510                            let requested = ack.sequence_number as usize;
1511                            if requested >= total_segments {
1512                                tracing::warn!(
1513                                    seq = requested,
1514                                    total = total_segments,
1515                                    "negative SegmentAck requests out-of-range sequence, aborting"
1516                                );
1517                                break;
1518                            }
1519                            debug!(
1520                                seq = ack.sequence_number,
1521                                "Negative SegmentAck — retransmitting from requested sequence"
1522                            );
1523                            seg_idx = requested;
1524                            continue;
1525                        }
1526                        // Positive ack — proceed to next segment
1527                    }
1528                    Ok(None) => {
1529                        warn!("SegmentAck channel closed during segmented send");
1530                        break;
1531                    }
1532                    Err(_) => {
1533                        warn!(
1534                            seq = seg_idx,
1535                            "Timeout waiting for SegmentAck, aborting segmented send"
1536                        );
1537                        let abort = Apdu::Abort(AbortPdu {
1538                            sent_by_server: true,
1539                            invoke_id,
1540                            abort_reason: AbortReason::TSM_TIMEOUT,
1541                        });
1542                        let mut abort_buf = BytesMut::new();
1543                        encode_apdu(&mut abort_buf, &abort);
1544                        let _ = network
1545                            .send_apdu(&abort_buf, source_mac, false, NetworkPriority::NORMAL)
1546                            .await;
1547                        break;
1548                    }
1549                }
1550            }
1551
1552            seg_idx += 1;
1553        }
1554
1555        // Wait for final SegmentAck (best-effort, per Clause 9.22)
1556        match tokio::time::timeout(seg_timeout, seg_ack_rx.recv()).await {
1557            Ok(Some(_ack)) => {
1558                debug!("Received final SegmentAck for ComplexAck");
1559            }
1560            _ => {
1561                warn!("No final SegmentAck received for ComplexAck");
1562            }
1563        }
1564
1565        // Clean up the seg_ack channel.
1566        seg_ack_senders.lock().await.remove(&key);
1567    }
1568
1569    /// Handle an unconfirmed request (e.g., WhoIs).
1570    async fn handle_unconfirmed_request(
1571        db: &Arc<RwLock<ObjectDatabase>>,
1572        network: &Arc<NetworkLayer<T>>,
1573        config: &ServerConfig,
1574        comm_state: &Arc<AtomicU8>,
1575        req: UnconfirmedRequestPdu,
1576        received: &bacnet_network::layer::ReceivedApdu,
1577    ) {
1578        let comm = comm_state.load(Ordering::Acquire);
1579        if comm == 1 {
1580            tracing::debug!("Dropping unconfirmed service: DCC is DISABLE");
1581            return;
1582        }
1583
1584        if req.service_choice == UnconfirmedServiceChoice::WHO_IS {
1585            let who_is = match WhoIsRequest::decode(&req.service_request) {
1586                Ok(r) => r,
1587                Err(e) => {
1588                    warn!(error = %e, "Failed to decode WhoIs");
1589                    return;
1590                }
1591            };
1592
1593            let db = db.read().await;
1594            let device_oid = db
1595                .list_objects()
1596                .into_iter()
1597                .find(|oid| oid.object_type() == ObjectType::DEVICE);
1598
1599            if let Some(device_oid) = device_oid {
1600                let instance = device_oid.instance_number();
1601
1602                let in_range = match (who_is.low_limit, who_is.high_limit) {
1603                    (Some(low), Some(high)) => instance >= low && instance <= high,
1604                    _ => true,
1605                };
1606
1607                if in_range {
1608                    let i_am = IAmRequest {
1609                        object_identifier: device_oid,
1610                        max_apdu_length: config.max_apdu_length,
1611                        segmentation_supported: config.segmentation_supported,
1612                        vendor_id: config.vendor_id,
1613                    };
1614
1615                    let mut service_buf = BytesMut::new();
1616                    i_am.encode(&mut service_buf);
1617
1618                    let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
1619                        service_choice: UnconfirmedServiceChoice::I_AM,
1620                        service_request: Bytes::from(service_buf.to_vec()),
1621                    });
1622
1623                    let mut buf = BytesMut::new();
1624                    encode_apdu(&mut buf, &pdu);
1625
1626                    // Per Clause 16.10: if the WhoIs came from a remote network
1627                    // (NPDU has SNET/SADR), route the IAm back through the local
1628                    // router. Otherwise broadcast locally.
1629                    if let Some(ref source_net) = received.source_network {
1630                        if let Err(e) = network
1631                            .send_apdu_routed(
1632                                &buf,
1633                                source_net.network,
1634                                &source_net.mac_address,
1635                                &received.source_mac,
1636                                false,
1637                                NetworkPriority::NORMAL,
1638                            )
1639                            .await
1640                        {
1641                            warn!(error = %e, "Failed to route IAm back to remote requester");
1642                        }
1643                    } else if let Err(e) = network
1644                        .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1645                        .await
1646                    {
1647                        warn!(error = %e, "Failed to send IAm broadcast");
1648                    }
1649                }
1650            }
1651        } else if req.service_choice == UnconfirmedServiceChoice::WHO_HAS {
1652            let db = db.read().await;
1653            let device_oid = db
1654                .list_objects()
1655                .into_iter()
1656                .find(|oid| oid.object_type() == ObjectType::DEVICE);
1657
1658            if let Some(device_oid) = device_oid {
1659                match handlers::handle_who_has(&db, &req.service_request, device_oid) {
1660                    Ok(Some(i_have)) => {
1661                        let mut service_buf = BytesMut::new();
1662                        if let Err(e) = i_have.encode(&mut service_buf) {
1663                            warn!(error = %e, "Failed to encode IHave");
1664                        } else {
1665                            let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
1666                                service_choice: UnconfirmedServiceChoice::I_HAVE,
1667                                service_request: Bytes::from(service_buf.to_vec()),
1668                            });
1669
1670                            let mut buf = BytesMut::new();
1671                            encode_apdu(&mut buf, &pdu);
1672
1673                            if let Err(e) = network
1674                                .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1675                                .await
1676                            {
1677                                warn!(error = %e, "Failed to send IHave broadcast");
1678                            }
1679                        }
1680                    }
1681                    Ok(None) => { /* Object not found or not in range — no response */ }
1682                    Err(e) => {
1683                        warn!(error = %e, "Failed to decode WhoHas");
1684                    }
1685                }
1686            }
1687        } else if req.service_choice == UnconfirmedServiceChoice::TIME_SYNCHRONIZATION
1688            || req.service_choice == UnconfirmedServiceChoice::UTC_TIME_SYNCHRONIZATION
1689        {
1690            debug!("Received time synchronization request");
1691            if let Some(ref callback) = config.on_time_sync {
1692                let data = TimeSyncData {
1693                    raw_service_data: req.service_request.clone(),
1694                    is_utc: req.service_choice
1695                        == UnconfirmedServiceChoice::UTC_TIME_SYNCHRONIZATION,
1696                };
1697                callback(data);
1698            }
1699        } else {
1700            debug!(
1701                service = req.service_choice.to_raw(),
1702                "Ignoring unsupported unconfirmed service"
1703            );
1704        }
1705    }
1706
1707    /// Evaluate intrinsic reporting on an object and send event notifications
1708    /// to NotificationClass recipients (or broadcast if none configured).
1709    ///
1710    /// Called after a successful write to present_value (or any property
1711    /// that might affect event evaluation).
1712    ///
1713    /// Skipped when `comm_state >= 1` (DISABLE or DISABLE_INITIATION) per
1714    /// BACnet Clause 16.4.3 — the server must not initiate notifications.
1715    async fn fire_event_notifications(
1716        db: &Arc<RwLock<ObjectDatabase>>,
1717        network: &Arc<NetworkLayer<T>>,
1718        comm_state: &Arc<AtomicU8>,
1719        oid: &ObjectIdentifier,
1720    ) {
1721        if comm_state.load(Ordering::Acquire) >= 1 {
1722            return;
1723        }
1724
1725        // Compute current day-of-week bit and time (UTC) for recipient filtering.
1726        let now = std::time::SystemTime::now()
1727            .duration_since(std::time::UNIX_EPOCH)
1728            .unwrap_or_default();
1729        let total_secs = now.as_secs();
1730        // Jan 1, 1970 = Thursday; 0=Sunday, 1=Monday, ..., 6=Saturday
1731        let dow = ((total_secs / 86400 + 4) % 7) as u8;
1732        let today_bit = 1u8 << dow;
1733        let day_secs = (total_secs % 86400) as u32;
1734        let current_time = Time {
1735            hour: (day_secs / 3600) as u8,
1736            minute: ((day_secs % 3600) / 60) as u8,
1737            second: (day_secs % 60) as u8,
1738            hundredths: (now.subsec_millis() / 10) as u8,
1739        };
1740
1741        let (notification, recipients) = {
1742            let mut db = db.write().await;
1743
1744            let device_oid = db
1745                .list_objects()
1746                .into_iter()
1747                .find(|o| o.object_type() == ObjectType::DEVICE)
1748                .unwrap_or_else(|| ObjectIdentifier::new(ObjectType::DEVICE, 0).unwrap());
1749
1750            let object = match db.get_mut(oid) {
1751                Some(o) => o,
1752                None => return,
1753            };
1754
1755            let change = match object.evaluate_intrinsic_reporting() {
1756                Some(c) => c,
1757                None => return,
1758            };
1759
1760            // Read notification parameters from the object
1761            let notification_class = object
1762                .read_property(PropertyIdentifier::NOTIFICATION_CLASS, None)
1763                .ok()
1764                .and_then(|v| match v {
1765                    PropertyValue::Unsigned(n) => Some(n as u32),
1766                    _ => None,
1767                })
1768                .unwrap_or(0);
1769
1770            let notify_type = object
1771                .read_property(PropertyIdentifier::NOTIFY_TYPE, None)
1772                .ok()
1773                .and_then(|v| match v {
1774                    PropertyValue::Enumerated(n) => Some(n),
1775                    _ => None,
1776                })
1777                .unwrap_or(NotifyType::ALARM.to_raw());
1778
1779            // Priority: HIGH_LIMIT/LOW_LIMIT -> high priority (100), NORMAL -> low (200)
1780            let priority = if change.to == bacnet_types::enums::EventState::NORMAL {
1781                200u8
1782            } else {
1783                100u8
1784            };
1785
1786            let transition = change.transition();
1787
1788            let base_notification = EventNotificationRequest {
1789                process_identifier: 0,
1790                initiating_device_identifier: device_oid,
1791                event_object_identifier: *oid,
1792                timestamp: BACnetTimeStamp::SequenceNumber(total_secs),
1793                notification_class,
1794                priority,
1795                event_type: change.event_type().to_raw(),
1796                notify_type,
1797                ack_required: notify_type != NotifyType::ACK_NOTIFICATION.to_raw(),
1798                from_state: change.from.to_raw(),
1799                to_state: change.to.to_raw(),
1800                event_values: None,
1801            };
1802
1803            // Look up recipients from the NotificationClass object
1804            let recipients = get_notification_recipients(
1805                &db,
1806                notification_class,
1807                transition,
1808                today_bit,
1809                &current_time,
1810            );
1811
1812            (base_notification, recipients)
1813        };
1814
1815        if recipients.is_empty() {
1816            // No matching recipients — fall back to broadcast (backward compatible)
1817            let mut service_buf = BytesMut::new();
1818            if let Err(e) = notification.encode(&mut service_buf) {
1819                warn!(error = %e, "Failed to encode EventNotification");
1820                return;
1821            }
1822
1823            let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
1824                service_choice: UnconfirmedServiceChoice::UNCONFIRMED_EVENT_NOTIFICATION,
1825                service_request: Bytes::from(service_buf.to_vec()),
1826            });
1827
1828            let mut buf = BytesMut::new();
1829            encode_apdu(&mut buf, &pdu);
1830
1831            if let Err(e) = network
1832                .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1833                .await
1834            {
1835                warn!(error = %e, "Failed to broadcast EventNotification");
1836            }
1837        } else {
1838            // Send to each matching recipient
1839            for (recipient, process_id, confirmed) in &recipients {
1840                let mut targeted = notification.clone();
1841                targeted.process_identifier = *process_id;
1842
1843                let mut service_buf = BytesMut::new();
1844                if let Err(e) = targeted.encode(&mut service_buf) {
1845                    warn!(error = %e, "Failed to encode EventNotification");
1846                    continue;
1847                }
1848
1849                let service_bytes = Bytes::from(service_buf.to_vec());
1850
1851                if *confirmed {
1852                    let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
1853                        segmented: false,
1854                        more_follows: false,
1855                        segmented_response_accepted: false,
1856                        max_segments: None,
1857                        max_apdu_length: 1476,
1858                        invoke_id: 0, // simplified: no TSM for event notifications yet
1859                        sequence_number: None,
1860                        proposed_window_size: None,
1861                        service_choice: ConfirmedServiceChoice::CONFIRMED_EVENT_NOTIFICATION,
1862                        service_request: service_bytes,
1863                    });
1864
1865                    let mut buf = BytesMut::new();
1866                    encode_apdu(&mut buf, &pdu);
1867
1868                    match recipient {
1869                        bacnet_types::constructed::BACnetRecipient::Address(addr) => {
1870                            if let Err(e) = network
1871                                .send_apdu(&buf, &addr.mac_address, true, NetworkPriority::NORMAL)
1872                                .await
1873                            {
1874                                warn!(error = %e, "Failed to send confirmed EventNotification");
1875                            }
1876                        }
1877                        bacnet_types::constructed::BACnetRecipient::Device(_) => {
1878                            // Cannot resolve Device OID to MAC; broadcast as fallback
1879                            if let Err(e) = network
1880                                .broadcast_apdu(&buf, true, NetworkPriority::NORMAL)
1881                                .await
1882                            {
1883                                warn!(error = %e, "Failed to broadcast confirmed EventNotification");
1884                            }
1885                        }
1886                    }
1887                } else {
1888                    let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
1889                        service_choice: UnconfirmedServiceChoice::UNCONFIRMED_EVENT_NOTIFICATION,
1890                        service_request: service_bytes,
1891                    });
1892
1893                    let mut buf = BytesMut::new();
1894                    encode_apdu(&mut buf, &pdu);
1895
1896                    match recipient {
1897                        bacnet_types::constructed::BACnetRecipient::Address(addr) => {
1898                            if let Err(e) = network
1899                                .send_apdu(&buf, &addr.mac_address, false, NetworkPriority::NORMAL)
1900                                .await
1901                            {
1902                                warn!(
1903                                    error = %e,
1904                                    "Failed to send unconfirmed EventNotification"
1905                                );
1906                            }
1907                        }
1908                        bacnet_types::constructed::BACnetRecipient::Device(_) => {
1909                            // Cannot resolve Device OID to MAC; broadcast as fallback
1910                            if let Err(e) = network
1911                                .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1912                                .await
1913                            {
1914                                warn!(
1915                                    error = %e,
1916                                    "Failed to broadcast unconfirmed EventNotification"
1917                                );
1918                            }
1919                        }
1920                    }
1921                }
1922            }
1923        }
1924    }
1925
1926    /// Fire COV notifications for all active subscriptions on the given object.
1927    ///
1928    /// Called after a successful write. Reads the object's Present_Value and
1929    /// Status_Flags, checks COV_Increment filtering, and sends notifications
1930    /// to subscribers whose change threshold is met.
1931    ///
1932    /// Skipped when `comm_state >= 1` (DISABLE or DISABLE_INITIATION) per
1933    /// BACnet Clause 16.4.3 — the server must not initiate notifications.
1934    #[allow(clippy::too_many_arguments)]
1935    async fn fire_cov_notifications(
1936        db: &Arc<RwLock<ObjectDatabase>>,
1937        network: &Arc<NetworkLayer<T>>,
1938        cov_table: &Arc<RwLock<CovSubscriptionTable>>,
1939        cov_in_flight: &Arc<Semaphore>,
1940        server_tsm: &Arc<Mutex<ServerTsm>>,
1941        comm_state: &Arc<AtomicU8>,
1942        config: &ServerConfig,
1943        oid: &ObjectIdentifier,
1944    ) {
1945        if comm_state.load(Ordering::Acquire) >= 1 {
1946            return;
1947        }
1948        // 1. Get active subscriptions for this object
1949        let subs: Vec<crate::cov::CovSubscription> = {
1950            let mut table = cov_table.write().await;
1951            table.subscriptions_for(oid).into_iter().cloned().collect()
1952        };
1953
1954        if subs.is_empty() {
1955            return;
1956        }
1957
1958        // 2. Read COV-relevant properties and COV_Increment from the object
1959        let (device_oid, values, current_pv, cov_increment) = {
1960            let db = db.read().await;
1961            let object = match db.get(oid) {
1962                Some(o) => o,
1963                None => return,
1964            };
1965
1966            let cov_increment = object.cov_increment();
1967
1968            let mut current_pv: Option<f32> = None;
1969            let mut values = Vec::new();
1970            if let Ok(pv) = object.read_property(PropertyIdentifier::PRESENT_VALUE, None) {
1971                if let PropertyValue::Real(v) = &pv {
1972                    current_pv = Some(*v);
1973                }
1974                let mut buf = BytesMut::new();
1975                if encode_property_value(&mut buf, &pv).is_ok() {
1976                    values.push(BACnetPropertyValue {
1977                        property_identifier: PropertyIdentifier::PRESENT_VALUE,
1978                        property_array_index: None,
1979                        value: buf.to_vec(),
1980                        priority: None,
1981                    });
1982                }
1983            }
1984            if let Ok(sf) = object.read_property(PropertyIdentifier::STATUS_FLAGS, None) {
1985                let mut buf = BytesMut::new();
1986                if encode_property_value(&mut buf, &sf).is_ok() {
1987                    values.push(BACnetPropertyValue {
1988                        property_identifier: PropertyIdentifier::STATUS_FLAGS,
1989                        property_array_index: None,
1990                        value: buf.to_vec(),
1991                        priority: None,
1992                    });
1993                }
1994            }
1995
1996            let device_oid = db
1997                .list_objects()
1998                .into_iter()
1999                .find(|o| o.object_type() == ObjectType::DEVICE)
2000                .unwrap_or_else(|| ObjectIdentifier::new(ObjectType::DEVICE, 0).unwrap());
2001
2002            (device_oid, values, current_pv, cov_increment)
2003        };
2004
2005        if values.is_empty() {
2006            return;
2007        }
2008
2009        // 3. Send a notification to each subscriber (if change exceeds COV_Increment)
2010        for sub in &subs {
2011            if !CovSubscriptionTable::should_notify(sub, current_pv, cov_increment) {
2012                continue;
2013            }
2014            let time_remaining = sub.expires_at.map_or(0, |exp| {
2015                exp.saturating_duration_since(Instant::now()).as_secs() as u32
2016            });
2017
2018            let notification = COVNotificationRequest {
2019                subscriber_process_identifier: sub.subscriber_process_identifier,
2020                initiating_device_identifier: device_oid,
2021                monitored_object_identifier: *oid,
2022                time_remaining,
2023                list_of_values: values.clone(),
2024            };
2025
2026            let mut service_buf = BytesMut::new();
2027            notification.encode(&mut service_buf);
2028
2029            if sub.issue_confirmed_notifications {
2030                // Acquire a permit from the semaphore to cap in-flight confirmed
2031                // COV notifications at 255, preventing invoke ID reuse.
2032                let permit = match cov_in_flight.clone().try_acquire_owned() {
2033                    Ok(permit) => permit,
2034                    Err(_) => {
2035                        warn!(
2036                            object = ?oid,
2037                            "255 confirmed COV notifications in-flight, skipping notification"
2038                        );
2039                        continue;
2040                    }
2041                };
2042
2043                // Allocate invoke ID from the server TSM.
2044                let id = {
2045                    let mut tsm = server_tsm.lock().await;
2046                    tsm.allocate()
2047                };
2048
2049                let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
2050                    segmented: false,
2051                    more_follows: false,
2052                    segmented_response_accepted: false,
2053                    max_segments: None,
2054                    max_apdu_length: config.max_apdu_length as u16,
2055                    invoke_id: id,
2056                    sequence_number: None,
2057                    proposed_window_size: None,
2058                    service_choice: ConfirmedServiceChoice::CONFIRMED_COV_NOTIFICATION,
2059                    service_request: Bytes::from(service_buf.to_vec()),
2060                });
2061
2062                let mut buf = BytesMut::new();
2063                encode_apdu(&mut buf, &pdu);
2064
2065                // Update last-notified value optimistically for confirmed sends
2066                if let Some(pv) = current_pv {
2067                    let mut table = cov_table.write().await;
2068                    table.set_last_notified_value(
2069                        &sub.subscriber_mac,
2070                        sub.subscriber_process_identifier,
2071                        sub.monitored_object_identifier,
2072                        pv,
2073                    );
2074                }
2075
2076                let network = Arc::clone(network);
2077                let mac = sub.subscriber_mac.clone();
2078                let apdu_timeout = Duration::from_millis(config.cov_retry_timeout_ms);
2079                let tsm = Arc::clone(server_tsm);
2080                let apdu_retries = DEFAULT_APDU_RETRIES;
2081                // The permit is moved into the spawned task; it is automatically
2082                // released when the task completes (ACK, error, or max retries).
2083                tokio::spawn(async move {
2084                    let _permit = permit; // hold until task completes
2085
2086                    for attempt in 0..=apdu_retries {
2087                        // Send (or resend) the ConfirmedCOVNotification.
2088                        if let Err(e) = network
2089                            .send_apdu(&buf, &mac, true, NetworkPriority::NORMAL)
2090                            .await
2091                        {
2092                            warn!(error = %e, attempt, "COV notification send failed");
2093                            // Network-level failure: still retry after timeout
2094                        } else {
2095                            debug!(invoke_id = id, attempt, "Confirmed COV notification sent");
2096                        }
2097
2098                        // Wait up to apdu_timeout for the dispatch loop to
2099                        // record a result, checking at short intervals.
2100                        let poll_interval = Duration::from_millis(50);
2101                        let result = tokio::time::timeout(apdu_timeout, async {
2102                            loop {
2103                                tokio::time::sleep(poll_interval).await;
2104                                let mut tsm = tsm.lock().await;
2105                                if let Some(r) = tsm.take_result(id) {
2106                                    return r;
2107                                }
2108                            }
2109                        })
2110                        .await;
2111
2112                        match result {
2113                            Ok(CovAckResult::Ack) => {
2114                                debug!(invoke_id = id, "COV notification acknowledged");
2115                                return;
2116                            }
2117                            Ok(CovAckResult::Error) => {
2118                                warn!(invoke_id = id, "COV notification rejected by subscriber");
2119                                return;
2120                            }
2121                            Err(_) => {
2122                                // Timeout — no response yet.
2123                                if attempt < apdu_retries {
2124                                    debug!(
2125                                        invoke_id = id,
2126                                        attempt, "COV notification timeout, retrying"
2127                                    );
2128                                } else {
2129                                    warn!(
2130                                        invoke_id = id,
2131                                        "COV notification failed after {} retries", apdu_retries
2132                                    );
2133                                }
2134                            }
2135                        }
2136                    }
2137
2138                    // Final cleanup: ensure no stale entry in the TSM map.
2139                    let mut tsm = tsm.lock().await;
2140                    tsm.remove(id);
2141                });
2142            } else {
2143                let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
2144                    service_choice: UnconfirmedServiceChoice::UNCONFIRMED_COV_NOTIFICATION,
2145                    service_request: Bytes::from(service_buf.to_vec()),
2146                });
2147
2148                let mut buf = BytesMut::new();
2149                encode_apdu(&mut buf, &pdu);
2150
2151                if let Err(e) = network
2152                    .send_apdu(&buf, &sub.subscriber_mac, false, NetworkPriority::NORMAL)
2153                    .await
2154                {
2155                    warn!(error = %e, "Failed to send COV notification");
2156                } else if let Some(pv) = current_pv {
2157                    // Update last-notified value on successful send
2158                    let mut table = cov_table.write().await;
2159                    table.set_last_notified_value(
2160                        &sub.subscriber_mac,
2161                        sub.subscriber_process_identifier,
2162                        sub.monitored_object_identifier,
2163                        pv,
2164                    );
2165                }
2166            }
2167        }
2168    }
2169
2170    /// Convert an Error into an Error APDU.
2171    fn error_apdu_from_error(
2172        invoke_id: u8,
2173        service_choice: ConfirmedServiceChoice,
2174        error: &Error,
2175    ) -> Apdu {
2176        let (class, code) = match error {
2177            Error::Protocol { class, code } => (*class, *code),
2178            _ => (
2179                ErrorClass::SERVICES.to_raw() as u32,
2180                ErrorCode::OTHER.to_raw() as u32,
2181            ),
2182        };
2183        Apdu::Error(ErrorPdu {
2184            invoke_id,
2185            service_choice,
2186            error_class: ErrorClass::from_raw(class as u16),
2187            error_code: ErrorCode::from_raw(code as u16),
2188            error_data: Bytes::new(),
2189        })
2190    }
2191}
2192
2193#[cfg(test)]
2194mod tests {
2195    use super::*;
2196
2197    #[test]
2198    fn server_config_cov_retry_timeout_default() {
2199        let config = ServerConfig::default();
2200        assert_eq!(config.cov_retry_timeout_ms, 3000);
2201    }
2202
2203    #[test]
2204    fn server_config_time_sync_callback_default_is_none() {
2205        let config = ServerConfig::default();
2206        assert!(config.on_time_sync.is_none());
2207    }
2208
2209    // -----------------------------------------------------------------------
2210    // ServerTsm unit tests
2211    // -----------------------------------------------------------------------
2212
2213    #[test]
2214    fn server_tsm_allocate_increments() {
2215        let mut tsm = ServerTsm::new();
2216        assert_eq!(tsm.allocate(), 0);
2217        assert_eq!(tsm.allocate(), 1);
2218        assert_eq!(tsm.allocate(), 2);
2219    }
2220
2221    #[test]
2222    fn server_tsm_allocate_wraps_at_255() {
2223        let mut tsm = ServerTsm::new();
2224        tsm.next_invoke_id = 255;
2225        assert_eq!(tsm.allocate(), 255);
2226        assert_eq!(tsm.allocate(), 0); // wraps
2227    }
2228
2229    #[test]
2230    fn server_tsm_record_and_take_ack() {
2231        let mut tsm = ServerTsm::new();
2232        tsm.record_result(42, CovAckResult::Ack);
2233        assert_eq!(tsm.take_result(42), Some(CovAckResult::Ack));
2234        // Second take returns None (already consumed)
2235        assert_eq!(tsm.take_result(42), None);
2236    }
2237
2238    #[test]
2239    fn server_tsm_record_and_take_error() {
2240        let mut tsm = ServerTsm::new();
2241        tsm.record_result(7, CovAckResult::Error);
2242        assert_eq!(tsm.take_result(7), Some(CovAckResult::Error));
2243    }
2244
2245    #[test]
2246    fn server_tsm_take_nonexistent_returns_none() {
2247        let mut tsm = ServerTsm::new();
2248        assert_eq!(tsm.take_result(99), None);
2249    }
2250
2251    #[test]
2252    fn server_tsm_remove_cleans_up() {
2253        let mut tsm = ServerTsm::new();
2254        tsm.record_result(10, CovAckResult::Ack);
2255        tsm.remove(10);
2256        assert_eq!(tsm.take_result(10), None);
2257    }
2258
2259    #[test]
2260    fn server_tsm_multiple_pending() {
2261        let mut tsm = ServerTsm::new();
2262        tsm.record_result(1, CovAckResult::Ack);
2263        tsm.record_result(2, CovAckResult::Error);
2264        tsm.record_result(3, CovAckResult::Ack);
2265
2266        assert_eq!(tsm.take_result(2), Some(CovAckResult::Error));
2267        assert_eq!(tsm.take_result(1), Some(CovAckResult::Ack));
2268        assert_eq!(tsm.take_result(3), Some(CovAckResult::Ack));
2269    }
2270
2271    #[test]
2272    fn server_tsm_overwrite_result() {
2273        let mut tsm = ServerTsm::new();
2274        tsm.record_result(5, CovAckResult::Ack);
2275        // Overwrite with Error (e.g., duplicate response)
2276        tsm.record_result(5, CovAckResult::Error);
2277        assert_eq!(tsm.take_result(5), Some(CovAckResult::Error));
2278    }
2279
2280    #[test]
2281    fn cov_ack_result_debug_and_eq() {
2282        // Ensure derived traits work.
2283        assert_eq!(CovAckResult::Ack, CovAckResult::Ack);
2284        assert_ne!(CovAckResult::Ack, CovAckResult::Error);
2285        let _debug = format!("{:?}", CovAckResult::Ack);
2286    }
2287
2288    #[test]
2289    fn default_apdu_retries_constant() {
2290        assert_eq!(DEFAULT_APDU_RETRIES, 3);
2291    }
2292
2293    #[test]
2294    fn seg_receiver_timeout_is_4s() {
2295        assert_eq!(SEG_RECEIVER_TIMEOUT, Duration::from_secs(4));
2296    }
2297
2298    #[test]
2299    fn max_neg_segment_ack_retries_constant() {
2300        assert_eq!(MAX_NEG_SEGMENT_ACK_RETRIES, 3);
2301    }
2302}