Skip to main content

mabi_bacnet/server/
bacnet_server.rs

1//! BACnet/IP server implementation.
2//!
3//! Provides a complete BACnet/IP server with service handling, COV support,
4//! and device discovery.
5
6use std::collections::hash_map::DefaultHasher;
7use std::hash::{Hash, Hasher};
8use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12
13use parking_lot::Mutex;
14use tokio::sync::{broadcast, mpsc};
15use tracing::{debug, error, info, warn};
16
17use mabi_core::RELEASE_VERSION;
18
19use crate::apdu::encoding::ApduEncoder;
20use crate::apdu::segmentation::{
21    AssemblyResult, Segment, SegmentAssembler, SegmentTransmitter, DEFAULT_WINDOW_SIZE,
22};
23use crate::apdu::types::{
24    AbortReason, ApduType, ConfirmedService, ErrorClass, ErrorCode, RejectReason,
25    UnconfirmedService,
26};
27use crate::error::{BacnetError, BacnetResult};
28use crate::network::bbmd::{Bbmd, BbmdConfig};
29use crate::network::bvlc::{BvlcFunction, BvlcMessage};
30use crate::network::npdu::Npdu;
31use crate::network::udp::{BACnetNetwork, IncomingPacket, NetworkConfig, NetworkHandle};
32use crate::object::device::{DeviceObject, DeviceObjectConfig};
33use crate::object::property::SegmentationSupport;
34use crate::object::registry::ObjectRegistry;
35use crate::object::traits::CovSupport;
36use crate::service::cov::{CovManager, CovNotification};
37use crate::service::discovery::WhoIsHandler;
38use crate::service::handler::{ServiceContext, ServiceRegistry, ServiceResult};
39use crate::service::property::{ReadPropertyHandler, WritePropertyHandler};
40use crate::service::property_multiple::{
41    ReadPropertyMultipleHandler, WritePropertyMultipleHandler,
42};
43use crate::service::subscribe_cov::SubscribeCovHandler;
44use crate::service::tsm::{ServerTsm, TransactionKey, TsmConfig};
45
46use super::metrics::{LatencyTimer, ServerMetrics};
47
/// Server configuration.
///
/// Plain settings consumed when a `BACnetServer` is constructed and run.
/// Stock values come from the `Default` implementation.
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Network bind address; handed to the network layer when the server starts running.
    pub bind_addr: SocketAddr,
    /// Broadcast address; handed to the network layer together with `bind_addr`.
    pub broadcast_addr: SocketAddr,
    /// Device instance number, used for the Device object, the COV manager,
    /// the Who-Is handler and every per-request service context.
    pub device_instance: u32,
    /// Device name reported by the Device object.
    pub device_name: String,
    /// Vendor ID reported by the Device object and the Who-Is handler.
    pub vendor_id: u16,
    /// Model name reported by the Device object.
    pub model_name: String,
    /// Maximum APDU length. Also bounds the payload of each outgoing segment
    /// (this value minus the 5-byte segmented ComplexACK header).
    pub max_apdu_length: u16,
    /// Maximum COV subscriptions accepted by the COV manager.
    pub max_cov_subscriptions: usize,
    /// COV check interval: period of the change-detection poll in the main loop.
    pub cov_check_interval: Duration,
    /// Shutdown timeout: upper bound on waiting for background tasks to finish
    /// once the main loop has exited.
    pub shutdown_timeout: Duration,
}
72
73impl Default for ServerConfig {
74    fn default() -> Self {
75        Self {
76            bind_addr: "0.0.0.0:47808".parse().unwrap(),
77            broadcast_addr: "255.255.255.255:47808".parse().unwrap(),
78            device_instance: 1234,
79            device_name: "BACnet Simulator".to_string(),
80            vendor_id: 0,
81            model_name: "OTSIM".to_string(),
82            max_apdu_length: 1476,
83            max_cov_subscriptions: 1000,
84            cov_check_interval: Duration::from_secs(1),
85            shutdown_timeout: Duration::from_secs(30),
86        }
87    }
88}
89
90impl ServerConfig {
91    /// Create a new config with the specified device instance.
92    pub fn new(device_instance: u32) -> Self {
93        Self {
94            device_instance,
95            ..Default::default()
96        }
97    }
98
99    /// Set the bind address.
100    pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
101        self.bind_addr = addr;
102        self
103    }
104
105    /// Set the device name.
106    pub fn with_device_name(mut self, name: impl Into<String>) -> Self {
107        self.device_name = name.into();
108        self
109    }
110
111    /// Set the vendor ID.
112    pub fn with_vendor_id(mut self, vendor_id: u16) -> Self {
113        self.vendor_id = vendor_id;
114        self
115    }
116}
117
/// Server events.
///
/// Delivered over the broadcast channel returned by `BACnetServer::subscribe`.
#[derive(Debug, Clone)]
pub enum ServerEvent {
    /// Server started; `address` is the local address the network layer
    /// reported after binding.
    Started { address: SocketAddr },
    /// Server stopped; sent as the last step of `BACnetServer::run`.
    Stopped,
    /// Device discovered (I-Am received).
    // NOTE(review): no emitter for this variant is visible in this part of the
    // file — confirm where it is produced.
    DeviceDiscovered {
        device_instance: u32,
        address: SocketAddr,
    },
    /// Error occurred.
    // NOTE(review): no emitter for this variant is visible in this part of the
    // file — confirm where it is produced.
    Error { message: String },
}
133
/// Processed response from a confirmed or unconfirmed request.
///
/// May be a single unsegmented APDU, a series of segmented APDUs,
/// or no response at all. `process_packet` matches on this value to decide
/// which send routine to call.
enum ProcessedResponse {
    /// Single unsegmented APDU response: the encoded APDU bytes and the
    /// address to send them to.
    Single(Vec<u8>, SocketAddr),
    /// Segmented response: multiple APDU segments to send sequentially.
    Segmented {
        /// The raw service data segments (without headers — headers are built at send time).
        segments: Vec<Vec<u8>>,
        /// Address the segments are sent to.
        dest: SocketAddr,
        /// Invoke ID forwarded to the segmented send routine.
        invoke_id: u8,
        /// Service choice forwarded to the segmented send routine.
        service_choice: u8,
    },
    /// No response needed.
    None,
}
152
/// BACnet/IP server.
///
/// Built by `BACnetServer::new`, optionally customised through the `with_*`
/// methods, then driven by `run` until `shutdown` is called.
pub struct BACnetServer {
    /// Settings the server was created with.
    config: ServerConfig,
    /// Object registry; the Device object is registered into it by `new`.
    objects: Arc<ObjectRegistry>,
    /// Confirmed and unconfirmed service handlers.
    services: Arc<ServiceRegistry>,
    /// Request, byte, error and latency counters.
    metrics: Arc<ServerMetrics>,
    /// COV subscription manager, shared with the SubscribeCOV handlers.
    cov_manager: Arc<CovManager>,
    /// Server-side transaction state machine.
    tsm: Arc<ServerTsm>,
    /// Segment assembler for incoming segmented requests (receive side).
    segment_assembler: Mutex<SegmentAssembler>,
    /// Segment transmitter for outgoing segmented responses (send side).
    segment_transmitter: SegmentTransmitter,
    /// BBMD for cross-subnet broadcast management.
    bbmd: Arc<Bbmd>,
    /// Receiving end of the COV notification channel created with the COV
    /// manager. `run` swaps it out for a dummy receiver, so only the first
    /// `run` call sees real notifications.
    cov_rx: tokio::sync::Mutex<mpsc::Receiver<CovNotification>>,
    /// Set once shutdown has been requested; polled by the background tasks.
    shutdown: Arc<AtomicBool>,
    /// Broadcasts the shutdown signal to the main loop.
    shutdown_tx: broadcast::Sender<()>,
    /// Broadcasts `ServerEvent`s to subscribers.
    event_tx: broadcast::Sender<ServerEvent>,
}
172
173impl BACnetServer {
174    /// Create a new BACnet/IP server.
175    pub fn new(config: ServerConfig, objects: ObjectRegistry) -> Self {
176        let (shutdown_tx, _) = broadcast::channel(1);
177        let (event_tx, _) = broadcast::channel(64);
178
179        let objects = Arc::new(objects);
180
181        // Create and register the Device object (mandatory per ASHRAE 135)
182        let device_config = DeviceObjectConfig {
183            device_instance: config.device_instance,
184            device_name: config.device_name.clone(),
185            vendor_name: "OTSIM".into(),
186            vendor_id: config.vendor_id,
187            model_name: config.model_name.clone(),
188            firmware_revision: RELEASE_VERSION.into(),
189            application_software_version: RELEASE_VERSION.into(),
190            description: String::new(),
191            location: String::new(),
192            max_apdu_length: config.max_apdu_length,
193            segmentation_supported: SegmentationSupport::Both,
194            apdu_timeout: 3000,
195            number_of_apdu_retries: 3,
196        };
197        let device_object = Arc::new(DeviceObject::new(device_config, objects.clone()));
198        objects.register(device_object.clone());
199
200        // Create COV manager early so the SubscribeCOV handler can use it
201        let (cov_manager, cov_rx) =
202            CovManager::new(config.device_instance, config.max_cov_subscriptions);
203        let cov_manager = Arc::new(cov_manager);
204
205        // Create service registry with default handlers
206        let mut services = ServiceRegistry::new();
207        // Property services
208        services.register_confirmed(Arc::new(ReadPropertyHandler));
209        services.register_confirmed(Arc::new(WritePropertyHandler));
210        // Property multiple services (batch operations)
211        services.register_confirmed(Arc::new(ReadPropertyMultipleHandler::new()));
212        services.register_confirmed(Arc::new(WritePropertyMultipleHandler::new()));
213        // COV services
214        services.register_confirmed(Arc::new(SubscribeCovHandler::new(cov_manager.clone())));
215        services.register_confirmed(Arc::new(
216            crate::service::subscribe_cov::SubscribeCovPropertyHandler::new(cov_manager.clone()),
217        ));
218        // Log services (ReadRange for TrendLog)
219        services.register_confirmed(Arc::new(crate::service::read_range::ReadRangeHandler::new()));
220        // File access services (AtomicReadFile/AtomicWriteFile)
221        services.register_confirmed(Arc::new(
222            crate::service::file_access::AtomicReadFileHandler::new(),
223        ));
224        services.register_confirmed(Arc::new(
225            crate::service::file_access::AtomicWriteFileHandler::new(),
226        ));
227        // Alarm and event services
228        services.register_confirmed(Arc::new(
229            crate::service::alarm::AcknowledgeAlarmHandler::new(),
230        ));
231        services.register_confirmed(Arc::new(
232            crate::service::alarm::GetAlarmSummaryHandler::new(),
233        ));
234        services.register_confirmed(Arc::new(
235            crate::service::alarm::GetEnrollmentSummaryHandler::new(),
236        ));
237        services.register_confirmed(Arc::new(
238            crate::service::alarm::GetEventInformationHandler::new(),
239        ));
240        services.register_confirmed(Arc::new(
241            crate::service::alarm::ConfirmedEventNotificationHandler::new(),
242        ));
243        // Object management services
244        services.register_confirmed(Arc::new(
245            crate::service::create_delete::CreateObjectHandler::new(),
246        ));
247        services.register_confirmed(Arc::new(
248            crate::service::create_delete::DeleteObjectHandler::new(),
249        ));
250        // Device control services
251        services.register_confirmed(Arc::new(
252            crate::service::device_control::DeviceCommunicationControlHandler::new(),
253        ));
254        services.register_confirmed(Arc::new(
255            crate::service::device_control::ReinitializeDeviceHandler::new(),
256        ));
257        // Discovery services
258        services.register_unconfirmed(Arc::new(WhoIsHandler::new(
259            config.device_instance,
260            config.max_apdu_length,
261            SegmentationSupport::Both,
262            config.vendor_id,
263        )));
264        // Time synchronization services
265        services.register_unconfirmed(Arc::new(
266            crate::service::device_control::TimeSynchronizationHandler::new(),
267        ));
268        services.register_unconfirmed(Arc::new(
269            crate::service::device_control::UtcTimeSynchronizationHandler::new(),
270        ));
271
272        // Update Device object with registered services and object types
273        let confirmed_choices = services.supported_confirmed_services();
274        let unconfirmed_choices = services.supported_unconfirmed_services();
275        device_object.update_services_supported(&confirmed_choices, &unconfirmed_choices);
276        device_object.update_object_types_supported();
277
278        // Segmentation: the max segment size for outgoing responses is the
279        // max_apdu_length minus the segmented ComplexACK header overhead (5 bytes:
280        // pdu_type, invoke_id, sequence_number, window_size, service_choice).
281        let segment_header_overhead = 5;
282        let max_segment_data =
283            (config.max_apdu_length as usize).saturating_sub(segment_header_overhead);
284        let segment_transmitter =
285            SegmentTransmitter::new(max_segment_data).with_window_size(DEFAULT_WINDOW_SIZE);
286        let segment_assembler = SegmentAssembler::default();
287
288        Self {
289            config,
290            objects,
291            services: Arc::new(services),
292            metrics: Arc::new(ServerMetrics::new()),
293            cov_manager,
294            tsm: Arc::new(ServerTsm::new()),
295            segment_assembler: Mutex::new(segment_assembler),
296            segment_transmitter,
297            bbmd: Arc::new(Bbmd::default()),
298            cov_rx: tokio::sync::Mutex::new(cov_rx),
299            shutdown: Arc::new(AtomicBool::new(false)),
300            shutdown_tx,
301            event_tx,
302        }
303    }
304
305    /// Create with custom service registry.
306    pub fn with_services(mut self, services: ServiceRegistry) -> Self {
307        self.services = Arc::new(services);
308        self
309    }
310
311    /// Create with custom TSM configuration.
312    pub fn with_tsm_config(mut self, config: TsmConfig) -> Self {
313        self.tsm = Arc::new(ServerTsm::with_config(config));
314        self
315    }
316
317    /// Create with custom BBMD configuration.
318    pub fn with_bbmd_config(mut self, config: BbmdConfig) -> Self {
319        self.bbmd = Arc::new(Bbmd::new(config));
320        self
321    }
322
323    /// Get a reference to the TSM.
324    pub fn tsm(&self) -> &Arc<ServerTsm> {
325        &self.tsm
326    }
327
328    /// Get a reference to the BBMD.
329    pub fn bbmd(&self) -> &Arc<Bbmd> {
330        &self.bbmd
331    }
332
333    /// Get the object registry.
334    pub fn objects(&self) -> &Arc<ObjectRegistry> {
335        &self.objects
336    }
337
338    /// Get server metrics.
339    pub fn metrics(&self) -> &Arc<ServerMetrics> {
340        &self.metrics
341    }
342
343    /// Subscribe to server events.
344    pub fn subscribe(&self) -> broadcast::Receiver<ServerEvent> {
345        self.event_tx.subscribe()
346    }
347
348    /// Request server shutdown.
349    pub fn shutdown(&self) {
350        if !self.shutdown.swap(true, Ordering::SeqCst) {
351            info!("Shutdown requested");
352            let _ = self.shutdown_tx.send(());
353        }
354    }
355
356    /// Check if shutdown has been requested.
357    pub fn is_shutdown(&self) -> bool {
358        self.shutdown.load(Ordering::SeqCst)
359    }
360
361    /// Run the server.
362    pub async fn run(&self) -> BacnetResult<()> {
363        // Bind network
364        let network_config = NetworkConfig::default()
365            .with_bind_addr(self.config.bind_addr)
366            .with_broadcast_addr(self.config.broadcast_addr);
367
368        let (network, mut recv_rx) = BACnetNetwork::bind(network_config).await?;
369        let network_handle = network.handle();
370
371        let local_addr = network.local_addr()?;
372        info!(address = %local_addr, "BACnet/IP server started");
373
374        let _ = self.event_tx.send(ServerEvent::Started {
375            address: local_addr,
376        });
377
378        // Take the COV receiver out of the server (can only run once)
379        let cov_manager = self.cov_manager.clone();
380        let mut cov_rx = {
381            let mut guard = self.cov_rx.lock().await;
382            // Replace with a dummy channel — the original receiver is consumed
383            let (_dummy_tx, dummy_rx) = mpsc::channel(1);
384            std::mem::replace(&mut *guard, dummy_rx)
385        };
386
387        // Spawn network receive loop
388        let shutdown_clone = self.shutdown.clone();
389        let network_shutdown = Arc::new(AtomicBool::new(false));
390        let network_shutdown_clone = network_shutdown.clone();
391        let network_task = tokio::spawn(async move {
392            while !shutdown_clone.load(Ordering::SeqCst) {
393                if let Err(e) = network.run_receive_loop().await {
394                    error!(error = %e, "Network receive loop error");
395                    break;
396                }
397            }
398            network_shutdown_clone.store(true, Ordering::SeqCst);
399        });
400
401        // Spawn COV notification sender
402        let cov_network = network_handle.clone();
403        let metrics_clone = self.metrics.clone();
404        let shutdown_cov = self.shutdown.clone();
405        let cov_task = tokio::spawn(async move {
406            while !shutdown_cov.load(Ordering::SeqCst) {
407                tokio::select! {
408                    Some(notification) = cov_rx.recv() => {
409                        if let Err(e) = send_cov_notification(&cov_network, notification).await {
410                            warn!(error = %e, "Failed to send COV notification");
411                        } else {
412                            metrics_clone.record_cov_notification_sent();
413                        }
414                    }
415                    _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
416                        // Check shutdown periodically
417                    }
418                }
419            }
420        });
421
422        // Periodic cleanup for BBMD FDT expiration and stale segment assemblies.
423        // Runs inline in the main select! loop on a 30-second interval.
424        let bbmd_clone = self.bbmd.clone();
425        let bbmd_cleanup_enabled = self.bbmd.is_enabled();
426        let cleanup_interval = Duration::from_secs(30);
427
428        // COV change detection polling interval
429        let cov_objects_registry = self.objects.clone();
430        let cov_manager_poll = cov_manager.clone();
431        let cov_check_interval = self.config.cov_check_interval;
432        let mut cov_ticker = tokio::time::interval(cov_check_interval);
433        cov_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
434
435        let mut shutdown_rx = self.shutdown_tx.subscribe();
436        let mut cleanup_ticker = tokio::time::interval(cleanup_interval);
437        cleanup_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
438
439        // Main processing loop
440        loop {
441            tokio::select! {
442                // Process incoming packets
443                Some(packet) = recv_rx.recv() => {
444                    self.metrics.record_request();
445                    self.metrics.record_bytes_received(packet.data.len() as u64);
446
447                    if let Err(e) = self.process_packet(&packet, &network_handle, &cov_manager).await {
448                        debug!(error = %e, "Error processing packet");
449                        self.metrics.record_error();
450                    }
451                }
452
453                // Periodic cleanup: FDT expiration + stale segment assemblies
454                _ = cleanup_ticker.tick() => {
455                    if bbmd_cleanup_enabled {
456                        let expired = bbmd_clone.cleanup();
457                        if expired > 0 {
458                            debug!(expired, "BBMD FDT cleanup completed");
459                        }
460                    }
461
462                    // Clean up stale segment assembly entries
463                    let stale = {
464                        let mut assembler = self.segment_assembler.lock();
465                        assembler.cleanup()
466                    };
467                    if stale > 0 {
468                        debug!(stale, "Segment assembler cleanup completed");
469                    }
470                }
471
472                // COV change detection polling
473                _ = cov_ticker.tick() => {
474                    // Check all objects for COV changes and send notifications
475                    for obj in cov_objects_registry.iter() {
476                        if let Some(cov_obj) = obj.as_any().downcast_ref::<crate::object::standard::AnalogInput>() {
477                            if cov_obj.check_cov() {
478                                let values = cov_obj.cov_values();
479                                cov_obj.reset_cov();
480                                let _ = cov_manager_poll.notify_change(obj.object_identifier(), values).await;
481                            }
482                        } else if let Some(cov_obj) = obj.as_any().downcast_ref::<crate::object::standard::AnalogOutput>() {
483                            if cov_obj.check_cov() {
484                                let values = cov_obj.cov_values();
485                                cov_obj.reset_cov();
486                                let _ = cov_manager_poll.notify_change(obj.object_identifier(), values).await;
487                            }
488                        } else if let Some(cov_obj) = obj.as_any().downcast_ref::<crate::object::standard::AnalogValue>() {
489                            if cov_obj.check_cov() {
490                                let values = cov_obj.cov_values();
491                                cov_obj.reset_cov();
492                                let _ = cov_manager_poll.notify_change(obj.object_identifier(), values).await;
493                            }
494                        } else if let Some(cov_obj) = obj.as_any().downcast_ref::<crate::object::standard::BinaryInput>() {
495                            if cov_obj.check_cov() {
496                                let values = cov_obj.cov_values();
497                                cov_obj.reset_cov();
498                                let _ = cov_manager_poll.notify_change(obj.object_identifier(), values).await;
499                            }
500                        } else if let Some(cov_obj) = obj.as_any().downcast_ref::<crate::object::standard::BinaryOutput>() {
501                            if cov_obj.check_cov() {
502                                let values = cov_obj.cov_values();
503                                cov_obj.reset_cov();
504                                let _ = cov_manager_poll.notify_change(obj.object_identifier(), values).await;
505                            }
506                        } else if let Some(cov_obj) = obj.as_any().downcast_ref::<crate::object::standard::BinaryValue>() {
507                            if cov_obj.check_cov() {
508                                let values = cov_obj.cov_values();
509                                cov_obj.reset_cov();
510                                let _ = cov_manager_poll.notify_change(obj.object_identifier(), values).await;
511                            }
512                        } else if let Some(cov_obj) = obj.as_any().downcast_ref::<crate::object::standard::MultiStateInput>() {
513                            if cov_obj.check_cov() {
514                                let values = cov_obj.cov_values();
515                                cov_obj.reset_cov();
516                                let _ = cov_manager_poll.notify_change(obj.object_identifier(), values).await;
517                            }
518                        } else if let Some(cov_obj) = obj.as_any().downcast_ref::<crate::object::standard::MultiStateOutput>() {
519                            if cov_obj.check_cov() {
520                                let values = cov_obj.cov_values();
521                                cov_obj.reset_cov();
522                                let _ = cov_manager_poll.notify_change(obj.object_identifier(), values).await;
523                            }
524                        } else if let Some(cov_obj) = obj.as_any().downcast_ref::<crate::object::standard::MultiStateValue>() {
525                            if cov_obj.check_cov() {
526                                let values = cov_obj.cov_values();
527                                cov_obj.reset_cov();
528                                let _ = cov_manager_poll.notify_change(obj.object_identifier(), values).await;
529                            }
530                        }
531                    }
532
533                    // Also cleanup expired subscriptions
534                    cov_manager_poll.cleanup_expired();
535                }
536
537                // Handle shutdown
538                _ = shutdown_rx.recv() => {
539                    info!("Shutdown signal received");
540                    break;
541                }
542            }
543        }
544
545        // Graceful shutdown - set network shutdown flag
546        network_shutdown.store(true, Ordering::SeqCst);
547
548        // Wait for tasks to finish
549        let _ = tokio::time::timeout(self.config.shutdown_timeout, async {
550            let _ = network_task.await;
551            let _ = cov_task.await;
552        })
553        .await;
554
555        let _ = self.event_tx.send(ServerEvent::Stopped);
556        info!("BACnet/IP server stopped");
557
558        Ok(())
559    }
560
561    /// Process an incoming packet.
562    async fn process_packet(
563        &self,
564        packet: &IncomingPacket,
565        network: &NetworkHandle,
566        _cov_manager: &Arc<CovManager>,
567    ) -> BacnetResult<()> {
568        let timer = LatencyTimer::start();
569
570        // Parse BVLC message
571        let bvlc = match &packet.bvlc {
572            Some(msg) => msg,
573            None => {
574                debug!(source = %packet.source, "Invalid BVLC message");
575                return Err(BacnetError::Protocol("Invalid BVLC message".into()));
576            }
577        };
578
579        // ── BBMD BVLC-level handling ──
580        // Intercept BBMD-specific BVLC functions before NPDU/APDU processing.
581        // These are layer-2 management functions that operate at the BVLC level.
582        if self.bbmd.is_enabled() {
583            match bvlc.header.function {
584                BvlcFunction::RegisterForeignDevice
585                | BvlcFunction::ReadForeignDeviceTable
586                | BvlcFunction::DeleteForeignDeviceTableEntry
587                | BvlcFunction::ReadBroadcastDistributionTable
588                | BvlcFunction::WriteBroadcastDistributionTable => {
589                    // Convert source to SocketAddrV4 for BBMD handler
590                    let source_v4 = match packet.source {
591                        SocketAddr::V4(v4) => v4,
592                        SocketAddr::V6(_) => {
593                            debug!("BBMD does not support IPv6 sources");
594                            return Ok(());
595                        }
596                    };
597
598                    if let Some(response) = self.bbmd.handle_message(bvlc, source_v4) {
599                        // Track foreign device registration metrics
600                        if bvlc.header.function == BvlcFunction::RegisterForeignDevice {
601                            self.metrics.record_bbmd_foreign_registration();
602                        }
603
604                        let response_bytes = response.encode();
605                        self.metrics.record_bytes_sent(response_bytes.len() as u64);
606                        network.send_to(&response_bytes, packet.source).await?;
607                    }
608
609                    let latency = timer.elapsed_us();
610                    self.metrics.record_success(latency);
611                    return Ok(());
612                }
613                BvlcFunction::DistributeBroadcastToNetwork => {
614                    // Client requests this BBMD to distribute a broadcast.
615                    // Forward to all BDT peers and registered foreign devices,
616                    // then process the NPDU locally (fall through to normal processing).
617                    let source_v4 = match packet.source {
618                        SocketAddr::V4(v4) => v4,
619                        _ => SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0),
620                    };
621
622                    let forward_addrs = self.bbmd.get_forward_addresses(Some(&source_v4));
623                    if !forward_addrs.is_empty() {
624                        // Re-wrap as Forwarded-NPDU for each recipient
625                        let forwarded = BvlcMessage::forwarded_npdu(bvlc.npdu.clone(), source_v4);
626                        let forwarded_bytes = forwarded.encode();
627
628                        for addr in &forward_addrs {
629                            let dest = SocketAddr::V4(*addr);
630                            if let Err(e) = network.send_to(&forwarded_bytes, dest).await {
631                                warn!(dest = %dest, error = %e, "Failed to forward broadcast");
632                            } else {
633                                self.metrics.record_bbmd_forwarded();
634                                self.metrics.record_bytes_sent(forwarded_bytes.len() as u64);
635                            }
636                        }
637
638                        debug!(
639                            source = %packet.source,
640                            forwarded_to = forward_addrs.len(),
641                            "Distributed broadcast to network"
642                        );
643                    }
644                    // Fall through to process the NPDU locally
645                }
646                BvlcFunction::OriginalBroadcastNpdu => {
647                    // Local broadcast: forward to BDT peers and foreign devices
648                    let source_v4 = match packet.source {
649                        SocketAddr::V4(v4) => v4,
650                        _ => SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0),
651                    };
652
653                    let forward_addrs = self.bbmd.get_forward_addresses(Some(&source_v4));
654                    if !forward_addrs.is_empty() {
655                        let forwarded = BvlcMessage::forwarded_npdu(bvlc.npdu.clone(), source_v4);
656                        let forwarded_bytes = forwarded.encode();
657
658                        for addr in &forward_addrs {
659                            let dest = SocketAddr::V4(*addr);
660                            if let Err(e) = network.send_to(&forwarded_bytes, dest).await {
661                                warn!(dest = %dest, error = %e, "Failed to forward broadcast");
662                            } else {
663                                self.metrics.record_bbmd_forwarded();
664                                self.metrics.record_bytes_sent(forwarded_bytes.len() as u64);
665                            }
666                        }
667                    }
668                    // Fall through to process locally
669                }
670                _ => {
671                    // Other BVLC functions — continue to normal NPDU/APDU processing
672                }
673            }
674        }
675
676        // Get NPDU from BVLC
677        let npdu_data = match bvlc.npdu() {
678            Some(data) => data,
679            None => {
680                debug!(function = ?bvlc.header.function, "No NPDU in BVLC message");
681                return Ok(()); // Not an error, just a BVLC-only message
682            }
683        };
684
685        // Parse NPDU
686        let npdu = Npdu::decode(npdu_data).map_err(|e| BacnetError::Protocol(e.to_string()))?;
687
688        // Skip network layer messages for now
689        if npdu.is_network_message() {
690            debug!("Network layer message, skipping");
691            return Ok(());
692        }
693
694        // Get APDU data
695        let apdu = npdu.apdu();
696        if apdu.is_empty() {
697            return Err(BacnetError::Protocol("Empty APDU".into()));
698        }
699
700        // Parse APDU type
701        let apdu_type_byte = (apdu[0] >> 4) & 0x0F;
702        let apdu_type = ApduType::from_nibble(apdu_type_byte).ok_or_else(|| {
703            BacnetError::Protocol(format!("Unknown APDU type: {}", apdu_type_byte))
704        })?;
705
706        // Create service context with packet source address for COV subscriptions
707        let ctx = ServiceContext::new(self.objects.clone(), self.config.device_instance)
708            .with_source_address(packet.source);
709
710        // Process based on APDU type
711        let response = match apdu_type {
712            ApduType::ConfirmedRequest => {
713                self.metrics.record_confirmed_request();
714                self.process_confirmed_request(apdu, &ctx, packet.source)?
715            }
716            ApduType::UnconfirmedRequest => {
717                self.metrics.record_unconfirmed_request();
718                self.process_unconfirmed_request(apdu, &ctx, packet.source)?
719            }
720            ApduType::SegmentAck => {
721                // SegmentACK received from a client acknowledging our segmented response.
722                // Per ASHRAE 135 Clause 5.4.5.2, we would track outstanding segments
723                // and send the next window. For a synchronous server simulator we record
724                // the metric and acknowledge; the actual segment sending happens inline
725                // in dispatch_and_respond (see below).
726                self.metrics.record_segment_ack_received();
727                debug!(source = %packet.source, "SegmentACK received");
728                return Ok(());
729            }
730            _ => {
731                debug!(apdu_type = ?apdu_type, "Unsupported APDU type");
732                return Ok(());
733            }
734        };
735
736        // Send response(s) — may be multiple APDUs if response is segmented
737        match response {
738            ProcessedResponse::Single(response_apdu, dest) => {
739                self.send_response(&response_apdu, dest, &npdu, bvlc, network)
740                    .await?;
741            }
742            ProcessedResponse::Segmented {
743                segments,
744                dest,
745                invoke_id,
746                service_choice,
747            } => {
748                self.send_segmented_response(
749                    &segments,
750                    dest,
751                    invoke_id,
752                    service_choice,
753                    &npdu,
754                    bvlc,
755                    network,
756                )
757                .await?;
758            }
759            ProcessedResponse::None => {}
760        }
761
762        let latency = timer.elapsed_us();
763        self.metrics.record_success(latency);
764
765        Ok(())
766    }
767
768    /// Process a confirmed request.
769    ///
770    /// Implements the 3-tier BACnet error response protocol:
771    /// - **Error PDU** (0x50): Service-level errors (known service, operation failed)
772    /// - **Reject PDU** (0x60): Protocol violations in the request PDU
773    /// - **Abort PDU** (0x70): Server-side inability to process (resources, segmentation)
774    ///
775    /// Supports reassembly of incoming segmented requests via `SegmentAssembler`.
776    /// When a segmented request arrives, intermediate segments are acknowledged with
777    /// SegmentACK and the service handler is only invoked once the full message is assembled.
778    fn process_confirmed_request(
779        &self,
780        apdu: &[u8],
781        ctx: &ServiceContext,
782        source: SocketAddr,
783    ) -> BacnetResult<ProcessedResponse> {
784        // Too short to even contain a valid header — cannot extract invoke_id,
785        // so we must silently discard per ASHRAE 135 Clause 6.1.
786        if apdu.len() < 3 {
787            debug!(
788                "Confirmed request too short ({} bytes), discarding",
789                apdu.len()
790            );
791            self.metrics.record_error();
792            return Ok(ProcessedResponse::None);
793        }
794
795        // Parse confirmed request header per ASHRAE 135, Clause 20.1.2:
796        // Byte 0: PDU type (bits 7-4) | segmented (bit 3) | more_follows (bit 2) | SA (bit 1)
797        // Byte 1: max-segments-accepted (bits 6-4) | max-APDU-length-accepted (bits 3-0)
798        // Byte 2: invoke-id
799        // Byte 3: sequence-number (only if segmented)
800        // Byte 4: proposed-window-size (only if segmented)
801        // Then: service-choice | service-data
802        let pdu_type_byte = apdu[0];
803        let segmented = (pdu_type_byte & 0x08) != 0;
804        let more_follows = (pdu_type_byte & 0x04) != 0;
805        let _segmented_response_accepted = (pdu_type_byte & 0x02) != 0;
806
807        let invoke_id = apdu[2];
808
809        // Handle segmented requests — reassemble via SegmentAssembler
810        if segmented {
811            if apdu.len() < 5 {
812                let abort = build_abort_apdu(invoke_id, AbortReason::Other);
813                self.metrics.record_error();
814                return Ok(ProcessedResponse::Single(abort, source));
815            }
816
817            let sequence_number = apdu[3];
818            let _proposed_window = apdu[4];
819
820            // Extract service choice from the first segment only
821            let (service_choice_opt, segment_data) = if sequence_number == 0 {
822                // First segment: service-choice is at byte 5
823                if apdu.len() < 6 {
824                    let reject =
825                        build_reject_apdu(invoke_id, RejectReason::MissingRequiredParameter as u8);
826                    self.metrics.record_error();
827                    return Ok(ProcessedResponse::Single(reject, source));
828                }
829                (Some(apdu[5]), apdu[6..].to_vec())
830            } else {
831                // Subsequent segments: no service-choice header, all bytes are data
832                (None, apdu[5..].to_vec())
833            };
834
835            // Build a Segment struct for the assembler
836            let mut segment = Segment::new(sequence_number, more_follows, invoke_id, segment_data);
837            if let Some(sc) = service_choice_opt {
838                segment = segment.with_service_choice(sc);
839            }
840
841            // Hash the source address for the assembler key
842            let source_hash = hash_socket_addr(&source);
843
844            self.metrics.record_segment_received();
845
846            let assembly_result = {
847                let mut assembler = self.segment_assembler.lock();
848                assembler.process_segment(source_hash, &segment)
849            };
850
851            match assembly_result {
852                Ok(AssemblyResult::NeedAck(ack_seq)) => {
853                    // Intermediate segment — send SegmentACK and wait for more
854                    debug!(
855                        invoke_id = invoke_id,
856                        sequence = ack_seq,
857                        "Sending SegmentACK for intermediate segment"
858                    );
859                    let ack = build_segment_ack_apdu(invoke_id, ack_seq, DEFAULT_WINDOW_SIZE, true);
860                    self.metrics.record_segment_ack_sent();
861                    return Ok(ProcessedResponse::Single(ack, source));
862                }
863                Ok(AssemblyResult::Complete) => {
864                    // All segments received — assemble and dispatch
865                    let (assembled_data, assembled_service_choice) = {
866                        let mut assembler = self.segment_assembler.lock();
867                        match assembler.get_complete(source_hash, invoke_id) {
868                            Some(result) => result,
869                            None => {
870                                // Should not happen if assembler returned Complete
871                                let abort = build_abort_apdu(invoke_id, AbortReason::Other);
872                                self.metrics.record_error();
873                                return Ok(ProcessedResponse::Single(abort, source));
874                            }
875                        }
876                    };
877
878                    self.metrics.record_segmented_request_reassembled();
879
880                    let service_choice = assembled_service_choice.unwrap_or(0);
881                    debug!(
882                        invoke_id = invoke_id,
883                        service_choice = service_choice,
884                        assembled_size = assembled_data.len(),
885                        "Segmented request fully reassembled"
886                    );
887
888                    return self.dispatch_and_respond(
889                        invoke_id,
890                        service_choice,
891                        &assembled_data,
892                        ctx,
893                        source,
894                    );
895                }
896                Ok(AssemblyResult::Duplicate) => {
897                    debug!(invoke_id = invoke_id, "Duplicate segment ignored");
898                    return Ok(ProcessedResponse::None);
899                }
900                Err(e) => {
901                    // Assembly error — abort the transaction
902                    warn!(
903                        invoke_id = invoke_id,
904                        error = %e,
905                        "Segment assembly error"
906                    );
907                    let abort = build_abort_apdu(invoke_id, AbortReason::Other);
908                    self.metrics.record_error();
909                    return Ok(ProcessedResponse::Single(abort, source));
910                }
911            }
912        }
913
914        // Non-segmented: need at least 4 bytes (pdu_type, max_info, invoke_id, service_choice)
915        if apdu.len() < 4 {
916            let reject = build_reject_apdu(invoke_id, RejectReason::MissingRequiredParameter as u8);
917            self.metrics.record_error();
918            return Ok(ProcessedResponse::Single(reject, source));
919        }
920
921        let service_choice = apdu[3];
922        let service_data = &apdu[4..];
923
924        self.dispatch_and_respond(invoke_id, service_choice, service_data, ctx, source)
925    }
926
927    /// Dispatch a confirmed service and build the response APDU.
928    ///
929    /// Integrates with the server TSM for duplicate detection and chaos testing.
930    /// Supports segmented outgoing responses when the ComplexACK exceeds max_apdu_length.
931    ///
932    /// Flow:
933    /// 1. Check TSM for duplicate requests (return cached response if available).
934    /// 2. Dispatch to the service handler.
935    /// 3. Build response — segmenting if needed.
936    /// 4. Cache the response in the TSM.
937    /// 5. Apply chaos drop/delay if configured.
938    fn dispatch_and_respond(
939        &self,
940        invoke_id: u8,
941        service_choice: u8,
942        service_data: &[u8],
943        ctx: &ServiceContext,
944        source: SocketAddr,
945    ) -> BacnetResult<ProcessedResponse> {
946        let tsm_key = TransactionKey::new(source, invoke_id);
947
948        // Step 1: TSM duplicate detection
949        match self.tsm.begin_transaction(tsm_key, service_choice) {
950            Ok(Some(cached_response)) => {
951                // Duplicate with cached response — return it directly
952                debug!(
953                    invoke_id = invoke_id,
954                    source = %source,
955                    "Duplicate request, returning cached response"
956                );
957                return Ok(ProcessedResponse::Single(cached_response, source));
958            }
959            Ok(None) => {
960                // New transaction — proceed with handler dispatch
961            }
962            Err(crate::service::tsm::TsmError::DuplicateInProgress) => {
963                // Duplicate while still processing — silently ignore
964                debug!(
965                    invoke_id = invoke_id,
966                    source = %source,
967                    "Duplicate request while processing, ignoring"
968                );
969                return Ok(ProcessedResponse::None);
970            }
971            Err(crate::service::tsm::TsmError::AtCapacity) => {
972                // TSM at capacity — abort
973                warn!("TSM at capacity, aborting request");
974                let abort = build_abort_apdu(invoke_id, AbortReason::OutOfResources);
975                return Ok(ProcessedResponse::Single(abort, source));
976            }
977        }
978
979        // Step 2: Track specific services for metrics
980        if let Some(service) = ConfirmedService::from_u8(service_choice) {
981            match service {
982                ConfirmedService::ReadProperty | ConfirmedService::ReadPropertyMultiple => {
983                    self.metrics.record_read_property();
984                }
985                ConfirmedService::WriteProperty | ConfirmedService::WritePropertyMultiple => {
986                    self.metrics.record_write_property();
987                }
988                ConfirmedService::SubscribeCov | ConfirmedService::SubscribeCovProperty => {
989                    self.metrics.record_cov_subscription();
990                }
991                _ => {}
992            }
993        }
994
995        // Step 3: Dispatch to handler
996        let ctx_with_invoke = ctx.clone().with_invoke_id(invoke_id);
997        let result =
998            self.services
999                .dispatch_confirmed(service_choice, service_data, &ctx_with_invoke);
1000
1001        // Step 4: Build response APDU using the 3-tier protocol
1002        let response = match result {
1003            ServiceResult::SimpleAck => {
1004                let apdu = vec![0x20, invoke_id, service_choice];
1005                let should_send = self.tsm.complete_transaction(&tsm_key, apdu.clone());
1006                if !should_send {
1007                    debug!(invoke_id, "Response intentionally dropped (chaos testing)");
1008                    return Ok(ProcessedResponse::None);
1009                }
1010                ProcessedResponse::Single(apdu, source)
1011            }
1012            ServiceResult::ComplexAck(data) => {
1013                // Non-segmented ComplexACK header is 3 bytes (type, invoke_id, service_choice)
1014                let unsegmented_len = 3 + data.len();
1015
1016                if unsegmented_len <= self.config.max_apdu_length as usize {
1017                    // Fits in a single APDU — unsegmented ComplexACK
1018                    let mut apdu = vec![0x30, invoke_id, service_choice];
1019                    apdu.extend_from_slice(&data);
1020
1021                    let should_send = self.tsm.complete_transaction(&tsm_key, apdu.clone());
1022                    if !should_send {
1023                        debug!(invoke_id, "Response intentionally dropped (chaos testing)");
1024                        return Ok(ProcessedResponse::None);
1025                    }
1026                    ProcessedResponse::Single(apdu, source)
1027                } else if self.segment_transmitter.needs_segmentation(data.len()) {
1028                    // Needs segmentation — split the service data into segments.
1029                    // Each segment gets a 5-byte segmented ComplexACK header added at send time.
1030                    let segments = self.segment_transmitter.segment(&data, invoke_id);
1031                    let segment_count = segments.len();
1032
1033                    debug!(
1034                        invoke_id = invoke_id,
1035                        total_size = data.len(),
1036                        segment_count = segment_count,
1037                        "ComplexACK requires segmentation"
1038                    );
1039
1040                    // Cache the full unsegmented response in TSM for potential retransmission
1041                    let mut full_response = vec![0x30, invoke_id, service_choice];
1042                    full_response.extend_from_slice(&data);
1043                    let should_send = self.tsm.complete_transaction(&tsm_key, full_response);
1044                    if !should_send {
1045                        debug!(invoke_id, "Response intentionally dropped (chaos testing)");
1046                        return Ok(ProcessedResponse::None);
1047                    }
1048
1049                    self.metrics.record_segmented_response_transmitted();
1050                    self.metrics.record_segments_sent(segment_count as u64);
1051
1052                    // Extract raw segment data
1053                    let segment_data: Vec<Vec<u8>> = segments.into_iter().map(|s| s.data).collect();
1054
1055                    ProcessedResponse::Segmented {
1056                        segments: segment_data,
1057                        dest: source,
1058                        invoke_id,
1059                        service_choice,
1060                    }
1061                } else {
1062                    // Data fits in one segment but somehow exceeds unsegmented limit
1063                    // (edge case — shouldn't normally happen, but handle gracefully)
1064                    let mut apdu = vec![0x30, invoke_id, service_choice];
1065                    apdu.extend_from_slice(&data);
1066                    let should_send = self.tsm.complete_transaction(&tsm_key, apdu.clone());
1067                    if !should_send {
1068                        debug!(invoke_id, "Response intentionally dropped (chaos testing)");
1069                        return Ok(ProcessedResponse::None);
1070                    }
1071                    ProcessedResponse::Single(apdu, source)
1072                }
1073            }
1074            ServiceResult::Error {
1075                error_class,
1076                error_code,
1077            } => {
1078                let apdu = build_error_apdu(invoke_id, service_choice, error_class, error_code);
1079                let should_send = self.tsm.complete_transaction(&tsm_key, apdu.clone());
1080                if !should_send {
1081                    debug!(invoke_id, "Response intentionally dropped (chaos testing)");
1082                    return Ok(ProcessedResponse::None);
1083                }
1084                ProcessedResponse::Single(apdu, source)
1085            }
1086            ServiceResult::Reject(reason) => {
1087                let apdu = build_reject_apdu(invoke_id, reason);
1088                let should_send = self.tsm.complete_transaction(&tsm_key, apdu.clone());
1089                if !should_send {
1090                    debug!(invoke_id, "Response intentionally dropped (chaos testing)");
1091                    return Ok(ProcessedResponse::None);
1092                }
1093                ProcessedResponse::Single(apdu, source)
1094            }
1095            ServiceResult::Abort(reason) => {
1096                let apdu = build_abort_apdu(invoke_id, reason);
1097                let should_send = self.tsm.complete_transaction(&tsm_key, apdu.clone());
1098                if !should_send {
1099                    debug!(invoke_id, "Response intentionally dropped (chaos testing)");
1100                    return Ok(ProcessedResponse::None);
1101                }
1102                ProcessedResponse::Single(apdu, source)
1103            }
1104            ServiceResult::NoResponse | ServiceResult::Broadcast(_) => {
1105                return Ok(ProcessedResponse::None);
1106            }
1107        };
1108
1109        Ok(response)
1110    }
1111
1112    /// Process an unconfirmed request.
1113    fn process_unconfirmed_request(
1114        &self,
1115        apdu: &[u8],
1116        ctx: &ServiceContext,
1117        source: SocketAddr,
1118    ) -> BacnetResult<ProcessedResponse> {
1119        if apdu.len() < 2 {
1120            return Err(BacnetError::Protocol(
1121                "Unconfirmed request too short".into(),
1122            ));
1123        }
1124
1125        // Parse unconfirmed request
1126        let _pdu_type = apdu[0]; // Should be 0x10
1127        let service_choice = apdu[1];
1128        let service_data = &apdu[2..];
1129
1130        // Track Who-Is
1131        if service_choice == UnconfirmedService::WhoIs as u8 {
1132            self.metrics.record_who_is();
1133        }
1134
1135        // Dispatch to handler
1136        let result = self
1137            .services
1138            .dispatch_unconfirmed(service_choice, service_data, ctx);
1139
1140        match result {
1141            ServiceResult::Broadcast(data) => {
1142                // For I-Am responses, broadcast back
1143                self.metrics.record_i_am_sent();
1144
1145                // Build unconfirmed APDU: type (0x10) | service (I-Am = 0) | data
1146                let mut apdu = vec![0x10, UnconfirmedService::IAm as u8];
1147                apdu.extend_from_slice(&data);
1148
1149                // Return with broadcast or unicast destination
1150                Ok(ProcessedResponse::Single(apdu, source))
1151            }
1152            ServiceResult::NoResponse => Ok(ProcessedResponse::None),
1153            _ => Ok(ProcessedResponse::None),
1154        }
1155    }
1156
1157    /// Send a single unsegmented APDU response.
1158    async fn send_response(
1159        &self,
1160        response_apdu: &[u8],
1161        dest: SocketAddr,
1162        npdu: &Npdu,
1163        bvlc: &BvlcMessage,
1164        network: &NetworkHandle,
1165    ) -> BacnetResult<()> {
1166        let response_npdu = if npdu.expects_reply() {
1167            Npdu::simple(response_apdu.to_vec())
1168        } else {
1169            Npdu::no_reply(response_apdu.to_vec())
1170        };
1171
1172        let response_bvlc = if bvlc.is_broadcast() {
1173            BvlcMessage::original_broadcast(response_npdu.encode())
1174        } else {
1175            BvlcMessage::original_unicast(response_npdu.encode())
1176        };
1177
1178        let response_bytes = response_bvlc.encode();
1179        self.metrics.record_bytes_sent(response_bytes.len() as u64);
1180
1181        network.send_to(&response_bytes, dest).await
1182    }
1183
1184    /// Send a segmented ComplexACK response as multiple individual APDU packets.
1185    ///
1186    /// Per ASHRAE 135, Clause 5.4.5.2, we send segments within the proposed
1187    /// window size. For simplicity in this simulator, we send all segments
1188    /// sequentially without waiting for SegmentACK between windows (window=1).
1189    /// This matches common BACnet/IP behavior where the underlying transport
1190    /// (UDP) is reliable on a LAN.
1191    async fn send_segmented_response(
1192        &self,
1193        segments: &[Vec<u8>],
1194        dest: SocketAddr,
1195        invoke_id: u8,
1196        service_choice: u8,
1197        npdu: &Npdu,
1198        bvlc: &BvlcMessage,
1199        network: &NetworkHandle,
1200    ) -> BacnetResult<()> {
1201        let total = segments.len();
1202
1203        for (i, segment_data) in segments.iter().enumerate() {
1204            let more_follows = i < total - 1;
1205            let sequence_number = i as u8;
1206
1207            // Build the segmented ComplexACK APDU for this segment
1208            let mut encoder = ApduEncoder::new();
1209            encoder.encode_segmented_complex_ack_header(
1210                invoke_id,
1211                sequence_number,
1212                DEFAULT_WINDOW_SIZE,
1213                more_follows,
1214                service_choice,
1215            );
1216            encoder.put_bytes(segment_data);
1217            let segment_apdu = encoder.into_bytes();
1218
1219            // Wrap in NPDU/BVLC and send
1220            let response_npdu = if npdu.expects_reply() {
1221                Npdu::simple(segment_apdu)
1222            } else {
1223                Npdu::no_reply(segment_apdu)
1224            };
1225
1226            let response_bvlc = if bvlc.is_broadcast() {
1227                BvlcMessage::original_broadcast(response_npdu.encode())
1228            } else {
1229                BvlcMessage::original_unicast(response_npdu.encode())
1230            };
1231
1232            let response_bytes = response_bvlc.encode();
1233            self.metrics.record_bytes_sent(response_bytes.len() as u64);
1234
1235            network.send_to(&response_bytes, dest).await?;
1236
1237            debug!(
1238                invoke_id = invoke_id,
1239                sequence = sequence_number,
1240                more = more_follows,
1241                size = segment_data.len(),
1242                "Sent segment {}/{}",
1243                i + 1,
1244                total
1245            );
1246        }
1247
1248        debug!(
1249            invoke_id = invoke_id,
1250            total_segments = total,
1251            "Segmented ComplexACK fully transmitted"
1252        );
1253
1254        Ok(())
1255    }
1256
    /// Borrow the mutex-guarded segment assembler (for testing / cleanup).
    ///
    /// This is the same assembler that `process_confirmed_request` locks to
    /// reassemble incoming segmented requests; callers must lock it themselves.
    pub fn segment_assembler(&self) -> &Mutex<SegmentAssembler> {
        &self.segment_assembler
    }
1261}
1262
1263impl ServiceContext {
1264    /// Clone with invoke ID.
1265    fn clone(&self) -> Self {
1266        Self {
1267            objects: self.objects.clone(),
1268            device_instance: self.device_instance,
1269            invoke_id: self.invoke_id,
1270            max_apdu_length: self.max_apdu_length,
1271            source_address: self.source_address,
1272        }
1273    }
1274}
1275
1276/// Build an error APDU using the standard ApduEncoder.
1277///
1278/// Produces a BACnet-Error-PDU per ASHRAE 135, Clause 21.8, with
1279/// error-class and error-code encoded as ENUMERATED (Application Tag 9).
1280fn build_error_apdu(
1281    invoke_id: u8,
1282    service_choice: u8,
1283    error_class: ErrorClass,
1284    error_code: ErrorCode,
1285) -> Vec<u8> {
1286    let mut encoder = ApduEncoder::new();
1287    encoder.encode_error_pdu(
1288        invoke_id,
1289        service_choice,
1290        error_class as u32,
1291        error_code as u32,
1292    );
1293    encoder.into_bytes()
1294}
1295
1296/// Build a Reject APDU.
1297///
1298/// Produces a BACnet-Reject-PDU per ASHRAE 135, Clause 21.6.
1299/// Sent when the server detects a protocol violation in the request PDU
1300/// (invalid tags, missing parameters, unrecognized service, etc.).
1301fn build_reject_apdu(invoke_id: u8, reject_reason: u8) -> Vec<u8> {
1302    let mut encoder = ApduEncoder::new();
1303    encoder.encode_reject_pdu(invoke_id, reject_reason);
1304    encoder.into_bytes()
1305}
1306
1307/// Build an Abort APDU.
1308///
1309/// Produces a BACnet-Abort-PDU per ASHRAE 135, Clause 21.7.
1310/// Sent when the server cannot process the request due to server-side issues
1311/// (resource exhaustion, segmentation failure, buffer overflow, etc.).
1312/// The `sent_by_server` bit is always set to true since this is server-generated.
1313fn build_abort_apdu(invoke_id: u8, abort_reason: AbortReason) -> Vec<u8> {
1314    let mut encoder = ApduEncoder::new();
1315    encoder.encode_abort_pdu(invoke_id, abort_reason as u8, true);
1316    encoder.into_bytes()
1317}
1318
1319/// Build a SegmentACK APDU.
1320///
1321/// Produces a BACnet-SegmentACK-PDU per ASHRAE 135, Clause 21.5.
1322/// Sent to acknowledge receipt of segment(s) during a segmented transfer.
1323fn build_segment_ack_apdu(
1324    invoke_id: u8,
1325    sequence_number: u8,
1326    actual_window_size: u8,
1327    sent_by_server: bool,
1328) -> Vec<u8> {
1329    let mut encoder = ApduEncoder::new();
1330    encoder.encode_segment_ack_pdu(
1331        invoke_id,
1332        sequence_number,
1333        actual_window_size,
1334        sent_by_server,
1335        false, // not a NAK
1336    );
1337    encoder.into_bytes()
1338}
1339
/// Reduce a `SocketAddr` to a 64-bit key for the segment assembler.
///
/// The assembler identifies a segmented transfer by `(source_hash, invoke_id)`,
/// which keeps concurrent transfers from different clients apart. The value is
/// produced by a freshly constructed `DefaultHasher`, so equal addresses always
/// map to the same key within a process.
fn hash_socket_addr(addr: &SocketAddr) -> u64 {
    let mut state = DefaultHasher::new();
    Hash::hash(addr, &mut state);
    Hasher::finish(&state)
}
1349
/// Invoke ID counter for confirmed COV notifications.
///
/// A single process-wide counter: `send_cov_notification` takes the next ID with
/// `fetch_add(1, Ordering::Relaxed)`. Atomic addition wraps on overflow, so the
/// IDs cycle through 0..=255.
static COV_INVOKE_ID: std::sync::atomic::AtomicU8 = std::sync::atomic::AtomicU8::new(0);
1352
1353/// Send a COV notification.
1354async fn send_cov_notification(
1355    network: &NetworkHandle,
1356    notification: CovNotification,
1357) -> BacnetResult<()> {
1358    let apdu = if notification.confirmed {
1359        // Confirmed COV notification — proper ConfirmedRequest APDU
1360        let invoke_id = COV_INVOKE_ID.fetch_add(1, Ordering::Relaxed);
1361        notification.encode_confirmed(invoke_id)
1362    } else {
1363        // Unconfirmed COV notification
1364        let mut apdu = vec![0x10, UnconfirmedService::UnconfirmedCovNotification as u8];
1365        apdu.extend_from_slice(&notification.encode_unconfirmed());
1366        apdu
1367    };
1368
1369    let mut npdu = Npdu::no_reply(apdu);
1370    if notification.confirmed {
1371        npdu.control.expecting_reply = true;
1372    }
1373    let bvlc = BvlcMessage::original_unicast(npdu.encode());
1374
1375    network
1376        .send_to(&bvlc.encode(), notification.destination)
1377        .await
1378}
1379
1380#[cfg(test)]
1381mod tests {
1382    use super::*;
1383
1384    #[test]
1385    fn test_server_config_default() {
1386        let config = ServerConfig::default();
1387        assert_eq!(config.bind_addr.port(), 47808);
1388        assert_eq!(config.max_apdu_length, 1476);
1389    }
1390
1391    #[test]
1392    fn test_server_config_builder() {
1393        let config = ServerConfig::new(5678)
1394            .with_device_name("Test Device")
1395            .with_vendor_id(123);
1396
1397        assert_eq!(config.device_instance, 5678);
1398        assert_eq!(config.device_name, "Test Device");
1399        assert_eq!(config.vendor_id, 123);
1400    }
1401
1402    #[test]
1403    fn test_build_error_apdu() {
1404        let apdu = build_error_apdu(1, 12, ErrorClass::Property, ErrorCode::UnknownProperty);
1405
1406        // Error PDU header
1407        assert_eq!(apdu[0], 0x50); // Error PDU type
1408        assert_eq!(apdu[1], 1); // Invoke ID
1409        assert_eq!(apdu[2], 12); // Service choice (ReadProperty)
1410
1411        // Error class = Property (2): Enumerated Tag 9, length 1 → 0x91
1412        assert_eq!(apdu[3], 0x91);
1413        assert_eq!(apdu[4], 2); // ErrorClass::Property = 2
1414
1415        // Error code = UnknownProperty (32): Enumerated Tag 9, length 1 → 0x91
1416        assert_eq!(apdu[5], 0x91);
1417        assert_eq!(apdu[6], 32); // ErrorCode::UnknownProperty = 32
1418
1419        assert_eq!(apdu.len(), 7);
1420    }
1421
1422    #[test]
1423    fn test_build_error_apdu_unknown_object() {
1424        let apdu = build_error_apdu(3, 12, ErrorClass::Object, ErrorCode::UnknownObject);
1425
1426        assert_eq!(apdu[0], 0x50);
1427        assert_eq!(apdu[1], 3);
1428        assert_eq!(apdu[2], 12);
1429
1430        // ErrorClass::Object = 1
1431        assert_eq!(apdu[3], 0x91);
1432        assert_eq!(apdu[4], 1);
1433
1434        // ErrorCode::UnknownObject = 31
1435        assert_eq!(apdu[5], 0x91);
1436        assert_eq!(apdu[6], 31);
1437    }
1438
1439    #[test]
1440    fn test_build_reject_apdu() {
1441        let apdu = build_reject_apdu(7, RejectReason::UnrecognizedService as u8);
1442
1443        assert_eq!(apdu.len(), 3);
1444        assert_eq!(apdu[0], 0x60); // Reject PDU type
1445        assert_eq!(apdu[1], 7); // Invoke ID
1446        assert_eq!(apdu[2], 9); // UnrecognizedService = 9
1447    }
1448
1449    #[test]
1450    fn test_build_reject_apdu_missing_param() {
1451        let apdu = build_reject_apdu(2, RejectReason::MissingRequiredParameter as u8);
1452
1453        assert_eq!(apdu[0], 0x60);
1454        assert_eq!(apdu[1], 2);
1455        assert_eq!(apdu[2], 5); // MissingRequiredParameter = 5
1456    }
1457
1458    #[test]
1459    fn test_build_abort_apdu_segmentation() {
1460        let apdu = build_abort_apdu(10, AbortReason::SegmentationNotSupported);
1461
1462        assert_eq!(apdu.len(), 3);
1463        assert_eq!(apdu[0], 0x71); // Abort PDU type (0x70) + server bit (0x01)
1464        assert_eq!(apdu[1], 10); // Invoke ID
1465        assert_eq!(apdu[2], 4); // SegmentationNotSupported = 4
1466    }
1467
1468    #[test]
1469    fn test_build_abort_apdu_buffer_overflow() {
1470        let apdu = build_abort_apdu(5, AbortReason::BufferOverflow);
1471
1472        assert_eq!(apdu[0], 0x71); // Server-sent abort
1473        assert_eq!(apdu[1], 5);
1474        assert_eq!(apdu[2], 1); // BufferOverflow = 1
1475    }
1476
1477    #[test]
1478    fn test_build_abort_apdu_too_long() {
1479        let apdu = build_abort_apdu(3, AbortReason::ApduTooLong);
1480
1481        assert_eq!(apdu[0], 0x71);
1482        assert_eq!(apdu[1], 3);
1483        assert_eq!(apdu[2], 11); // ApduTooLong = 11
1484    }
1485
1486    #[test]
1487    fn test_build_abort_apdu_out_of_resources() {
1488        let apdu = build_abort_apdu(1, AbortReason::OutOfResources);
1489
1490        assert_eq!(apdu[0], 0x71);
1491        assert_eq!(apdu[1], 1);
1492        assert_eq!(apdu[2], 9); // OutOfResources = 9
1493    }
1494
1495    #[test]
1496    fn test_build_abort_other() {
1497        let apdu = build_abort_apdu(0, AbortReason::Other);
1498
1499        assert_eq!(apdu[0], 0x71);
1500        assert_eq!(apdu[1], 0);
1501        assert_eq!(apdu[2], 0); // Other = 0
1502    }
1503
1504    // ===== SegmentACK PDU tests =====
1505
1506    #[test]
1507    fn test_build_segment_ack_apdu_server() {
1508        let apdu = build_segment_ack_apdu(42, 3, 1, true);
1509
1510        assert_eq!(apdu.len(), 4);
1511        assert_eq!(apdu[0], 0x41); // SegmentACK (0x40) + server bit (0x01)
1512        assert_eq!(apdu[1], 42); // Invoke ID
1513        assert_eq!(apdu[2], 3); // Sequence number
1514        assert_eq!(apdu[3], 1); // Window size
1515    }
1516
1517    #[test]
1518    fn test_build_segment_ack_apdu_client() {
1519        let apdu = build_segment_ack_apdu(10, 0, 4, false);
1520
1521        assert_eq!(apdu.len(), 4);
1522        assert_eq!(apdu[0], 0x40); // SegmentACK (0x40), no server bit
1523        assert_eq!(apdu[1], 10);
1524        assert_eq!(apdu[2], 0);
1525        assert_eq!(apdu[3], 4);
1526    }
1527
1528    // ===== hash_socket_addr tests =====
1529
1530    #[test]
1531    fn test_hash_socket_addr_deterministic() {
1532        let addr1: SocketAddr = "192.168.1.100:47808".parse().unwrap();
1533        let h1 = hash_socket_addr(&addr1);
1534        let h2 = hash_socket_addr(&addr1);
1535        assert_eq!(h1, h2);
1536    }
1537
1538    #[test]
1539    fn test_hash_socket_addr_different_addresses() {
1540        let addr1: SocketAddr = "192.168.1.100:47808".parse().unwrap();
1541        let addr2: SocketAddr = "192.168.1.101:47808".parse().unwrap();
1542        let h1 = hash_socket_addr(&addr1);
1543        let h2 = hash_socket_addr(&addr2);
1544        assert_ne!(h1, h2);
1545    }
1546
1547    // ===== Segmented ComplexACK header tests =====
1548
1549    #[test]
1550    fn test_segmented_complex_ack_header_first_segment() {
1551        let mut encoder = ApduEncoder::new();
1552        encoder.encode_segmented_complex_ack_header(
1553            1,    // invoke_id
1554            0,    // sequence 0 (first)
1555            1,    // window_size
1556            true, // more follows
1557            12,   // ReadProperty service
1558        );
1559        let bytes = encoder.into_bytes();
1560
1561        assert_eq!(bytes.len(), 5);
1562        // PDU type byte: ComplexACK (0x30) | segmented (0x08) | more_follows (0x04) = 0x3C
1563        assert_eq!(bytes[0], 0x3C);
1564        assert_eq!(bytes[1], 1); // invoke_id
1565        assert_eq!(bytes[2], 0); // sequence_number
1566        assert_eq!(bytes[3], 1); // window_size
1567        assert_eq!(bytes[4], 12); // service_choice
1568    }
1569
1570    #[test]
1571    fn test_segmented_complex_ack_header_last_segment() {
1572        let mut encoder = ApduEncoder::new();
1573        encoder.encode_segmented_complex_ack_header(
1574            5,     // invoke_id
1575            3,     // sequence 3 (last)
1576            1,     // window_size
1577            false, // no more
1578            14,    // ReadPropertyMultiple
1579        );
1580        let bytes = encoder.into_bytes();
1581
1582        assert_eq!(bytes.len(), 5);
1583        // PDU type byte: ComplexACK (0x30) | segmented (0x08) = 0x38 (no more-follows)
1584        assert_eq!(bytes[0], 0x38);
1585        assert_eq!(bytes[1], 5);
1586        assert_eq!(bytes[2], 3);
1587        assert_eq!(bytes[3], 1);
1588        assert_eq!(bytes[4], 14);
1589    }
1590
1591    // ===== Segment transmitter integration tests =====
1592
1593    #[test]
1594    fn test_segment_transmitter_sizing() {
1595        // With max_apdu = 1476 and header overhead = 5, max segment data = 1471
1596        let max_apdu: u16 = 1476;
1597        let header_overhead = 5;
1598        let max_seg = (max_apdu as usize) - header_overhead;
1599        let transmitter = SegmentTransmitter::new(max_seg);
1600
1601        // Data that fits in one segment (1471 bytes or less)
1602        assert!(!transmitter.needs_segmentation(1471));
1603        assert!(!transmitter.needs_segmentation(1000));
1604
1605        // Data that needs segmentation
1606        assert!(transmitter.needs_segmentation(1472));
1607        assert!(transmitter.needs_segmentation(3000));
1608
1609        // Verify segment count
1610        assert_eq!(transmitter.calculate_segment_count(1471), 1);
1611        assert_eq!(transmitter.calculate_segment_count(1472), 2);
1612        assert_eq!(transmitter.calculate_segment_count(2942), 2);
1613        assert_eq!(transmitter.calculate_segment_count(2943), 3);
1614    }
1615
1616    #[test]
1617    fn test_segment_transmitter_round_trip_with_headers() {
1618        // Simulate the server's segmentation of a large response
1619        let max_apdu: u16 = 100;
1620        let header_overhead = 5;
1621        let max_seg = (max_apdu as usize) - header_overhead; // 95 bytes per segment
1622        let transmitter = SegmentTransmitter::new(max_seg);
1623
1624        let invoke_id = 7;
1625        let service_choice = 14; // ReadPropertyMultiple
1626        let original_data: Vec<u8> = (0..300).map(|i| (i % 256) as u8).collect();
1627
1628        let segments = transmitter.segment(&original_data, invoke_id);
1629        assert!(segments.len() > 1);
1630
1631        // Build full segmented APDU packets (as the server would)
1632        let mut packets: Vec<Vec<u8>> = Vec::new();
1633        let total = segments.len();
1634        for (i, seg) in segments.iter().enumerate() {
1635            let more = i < total - 1;
1636            let mut enc = ApduEncoder::new();
1637            enc.encode_segmented_complex_ack_header(invoke_id, i as u8, 1, more, service_choice);
1638            enc.put_bytes(&seg.data);
1639            packets.push(enc.into_bytes());
1640        }
1641
1642        // Verify each packet structure
1643        for (i, packet) in packets.iter().enumerate() {
1644            let more = i < total - 1;
1645            // Byte 0: type flags
1646            if more {
1647                assert_eq!(packet[0] & 0x0C, 0x0C); // segmented + more_follows
1648            } else {
1649                assert_eq!(packet[0] & 0x0C, 0x08); // segmented, no more_follows
1650            }
1651            assert_eq!(packet[1], invoke_id);
1652            assert_eq!(packet[2], i as u8); // sequence
1653            assert_eq!(packet[3], 1); // window_size
1654            assert_eq!(packet[4], service_choice);
1655        }
1656
1657        // Reassemble from segments and verify data integrity
1658        let mut reassembled = Vec::new();
1659        for packet in &packets {
1660            reassembled.extend_from_slice(&packet[5..]); // skip 5-byte header
1661        }
1662        assert_eq!(reassembled, original_data);
1663    }
1664
1665    // ===== Segment assembler integration tests =====
1666
1667    #[test]
1668    fn test_segment_assembler_reassembly_flow() {
1669        let mut assembler = SegmentAssembler::default();
1670        let source_hash: u64 = 12345;
1671        let invoke_id = 1;
1672
1673        // Simulate 3 segments arriving
1674        let seg1 = Segment::new(0, true, invoke_id, vec![1, 2, 3]).with_service_choice(12);
1675        let result = assembler.process_segment(source_hash, &seg1).unwrap();
1676        assert!(matches!(result, AssemblyResult::NeedAck(0)));
1677
1678        let seg2 = Segment::new(1, true, invoke_id, vec![4, 5, 6]);
1679        let result = assembler.process_segment(source_hash, &seg2).unwrap();
1680        assert!(matches!(result, AssemblyResult::NeedAck(1)));
1681
1682        let seg3 = Segment::new(2, false, invoke_id, vec![7, 8, 9]);
1683        let result = assembler.process_segment(source_hash, &seg3).unwrap();
1684        assert!(matches!(result, AssemblyResult::Complete));
1685
1686        // Retrieve and verify
1687        let (data, service) = assembler.get_complete(source_hash, invoke_id).unwrap();
1688        assert_eq!(data, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
1689        assert_eq!(service, Some(12));
1690    }
1691
1692    // ===== Metrics tests =====
1693
1694    #[test]
1695    fn test_segmentation_metrics() {
1696        let metrics = ServerMetrics::new();
1697
1698        metrics.record_segmented_request_reassembled();
1699        metrics.record_segmented_response_transmitted();
1700        metrics.record_segments_sent(5);
1701        metrics.record_segment_received();
1702        metrics.record_segment_received();
1703        metrics.record_segment_ack_sent();
1704        metrics.record_segment_ack_received();
1705
1706        let snapshot = metrics.snapshot();
1707        assert_eq!(snapshot.segmented_requests_reassembled, 1);
1708        assert_eq!(snapshot.segmented_responses_transmitted, 1);
1709        assert_eq!(snapshot.segments_sent, 5);
1710        assert_eq!(snapshot.segments_received, 2);
1711        assert_eq!(snapshot.segment_acks_sent, 1);
1712        assert_eq!(snapshot.segment_acks_received, 1);
1713    }
1714
1715    // ===== SegmentACK encoder edge cases =====
1716
1717    #[test]
1718    fn test_segment_ack_with_nak() {
1719        let mut encoder = ApduEncoder::new();
1720        encoder.encode_segment_ack_pdu(
1721            10, 5, 2, true, // server
1722            true, // NAK
1723        );
1724        let bytes = encoder.into_bytes();
1725
1726        assert_eq!(bytes.len(), 4);
1727        // 0x40 | 0x02 (NAK) | 0x01 (server) = 0x43
1728        assert_eq!(bytes[0], 0x43);
1729        assert_eq!(bytes[1], 10);
1730        assert_eq!(bytes[2], 5);
1731        assert_eq!(bytes[3], 2);
1732    }
1733
1734    #[test]
1735    fn test_segment_ack_client_no_nak() {
1736        let mut encoder = ApduEncoder::new();
1737        encoder.encode_segment_ack_pdu(
1738            255, 0, 1, false, // client
1739            false, // no NAK
1740        );
1741        let bytes = encoder.into_bytes();
1742
1743        assert_eq!(bytes[0], 0x40); // Plain SegmentACK
1744        assert_eq!(bytes[1], 255); // Max invoke_id
1745        assert_eq!(bytes[2], 0); // Sequence 0
1746        assert_eq!(bytes[3], 1); // Window 1
1747    }
1748
1749    // ===== BBMD server integration tests =====
1750
1751    #[test]
1752    fn test_server_with_bbmd_config() {
1753        let config = ServerConfig::new(1234);
1754        let registry = ObjectRegistry::new();
1755        let server = BACnetServer::new(config, registry).with_bbmd_config(BbmdConfig::enabled());
1756
1757        assert!(server.bbmd().is_enabled());
1758    }
1759
1760    #[test]
1761    fn test_server_bbmd_disabled_by_default() {
1762        let config = ServerConfig::new(1234);
1763        let registry = ObjectRegistry::new();
1764        let server = BACnetServer::new(config, registry);
1765
1766        assert!(!server.bbmd().is_enabled());
1767    }
1768
1769    #[test]
1770    fn test_server_bbmd_foreign_device_registration() {
1771        let config = ServerConfig::new(1234);
1772        let registry = ObjectRegistry::new();
1773        let server = BACnetServer::new(config, registry).with_bbmd_config(BbmdConfig::enabled());
1774
1775        let source = SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 200), 47808);
1776        let ttl_data = [0x00, 0x3C]; // TTL = 60 seconds
1777
1778        let msg = BvlcMessage {
1779            header: crate::network::bvlc::BvlcHeader::new(BvlcFunction::RegisterForeignDevice, 6),
1780            npdu: ttl_data.to_vec(),
1781            original_source: None,
1782            result_code: None,
1783        };
1784
1785        let response = server.bbmd().handle_message(&msg, source);
1786        assert!(response.is_some());
1787
1788        // Verify device is registered
1789        assert!(server.bbmd().fdt().is_registered(&source));
1790    }
1791
1792    #[test]
1793    fn test_server_bbmd_forward_addresses() {
1794        let config = ServerConfig::new(1234);
1795        let registry = ObjectRegistry::new();
1796        let server = BACnetServer::new(config, registry).with_bbmd_config(BbmdConfig::enabled());
1797
1798        // Add a BDT peer
1799        let peer = SocketAddrV4::new(Ipv4Addr::new(10, 0, 0, 1), 47808);
1800        server
1801            .bbmd()
1802            .bdt()
1803            .add(crate::network::bbmd::BdtEntry::new(
1804                peer,
1805                Ipv4Addr::new(255, 0, 0, 0),
1806            ))
1807            .unwrap();
1808
1809        // Register a foreign device
1810        let foreign = SocketAddrV4::new(Ipv4Addr::new(172, 16, 0, 50), 47808);
1811        server.bbmd().fdt().register(foreign, 120).unwrap();
1812
1813        let addrs = server.bbmd().get_forward_addresses(None);
1814        assert_eq!(addrs.len(), 2);
1815        assert!(addrs.contains(&peer));
1816        assert!(addrs.contains(&foreign));
1817    }
1818
1819    #[test]
1820    fn test_server_bbmd_forward_excludes_source() {
1821        let config = ServerConfig::new(1234);
1822        let registry = ObjectRegistry::new();
1823        let server = BACnetServer::new(config, registry).with_bbmd_config(BbmdConfig::enabled());
1824
1825        let peer1 = SocketAddrV4::new(Ipv4Addr::new(10, 0, 0, 1), 47808);
1826        let peer2 = SocketAddrV4::new(Ipv4Addr::new(10, 0, 0, 2), 47808);
1827        server
1828            .bbmd()
1829            .bdt()
1830            .add(crate::network::bbmd::BdtEntry::new(
1831                peer1,
1832                Ipv4Addr::new(255, 0, 0, 0),
1833            ))
1834            .unwrap();
1835        server
1836            .bbmd()
1837            .bdt()
1838            .add(crate::network::bbmd::BdtEntry::new(
1839                peer2,
1840                Ipv4Addr::new(255, 0, 0, 0),
1841            ))
1842            .unwrap();
1843
1844        // Exclude peer1 (the source)
1845        let addrs = server.bbmd().get_forward_addresses(Some(&peer1));
1846        assert_eq!(addrs.len(), 1);
1847        assert!(addrs.contains(&peer2));
1848    }
1849
1850    #[test]
1851    fn test_server_bbmd_bdt_write_and_read() {
1852        let config = ServerConfig::new(1234);
1853        let registry = ObjectRegistry::new();
1854        let server = BACnetServer::new(config, registry).with_bbmd_config(BbmdConfig::enabled());
1855
1856        // Write BDT with 2 entries (10 bytes each)
1857        let mut bdt_data = Vec::new();
1858        // Entry 1: 192.168.1.100:47808, mask 255.255.255.0
1859        bdt_data.extend_from_slice(&[192, 168, 1, 100]);
1860        bdt_data.extend_from_slice(&47808u16.to_be_bytes());
1861        bdt_data.extend_from_slice(&[255, 255, 255, 0]);
1862        // Entry 2: 10.0.0.1:47808, mask 255.0.0.0
1863        bdt_data.extend_from_slice(&[10, 0, 0, 1]);
1864        bdt_data.extend_from_slice(&47808u16.to_be_bytes());
1865        bdt_data.extend_from_slice(&[255, 0, 0, 0]);
1866
1867        let write_msg = BvlcMessage {
1868            header: crate::network::bvlc::BvlcHeader::new(
1869                BvlcFunction::WriteBroadcastDistributionTable,
1870                (4 + bdt_data.len()) as u16,
1871            ),
1872            npdu: bdt_data,
1873            original_source: None,
1874            result_code: None,
1875        };
1876
1877        let source = SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 1), 47808);
1878        let response = server.bbmd().handle_message(&write_msg, source);
1879        assert!(response.is_some());
1880        assert_eq!(server.bbmd().bdt().len(), 2);
1881
1882        // Read BDT
1883        let read_msg = BvlcMessage {
1884            header: crate::network::bvlc::BvlcHeader::new(
1885                BvlcFunction::ReadBroadcastDistributionTable,
1886                4,
1887            ),
1888            npdu: vec![],
1889            original_source: None,
1890            result_code: None,
1891        };
1892
1893        let response = server.bbmd().handle_message(&read_msg, source);
1894        assert!(response.is_some());
1895        let response = response.unwrap();
1896        assert_eq!(
1897            response.header.function,
1898            BvlcFunction::ReadBroadcastDistributionTableAck
1899        );
1900        // 2 entries * 10 bytes
1901        assert_eq!(response.npdu.len(), 20);
1902    }
1903
1904    #[test]
1905    fn test_server_bbmd_fdt_cleanup() {
1906        let config = ServerConfig::new(1234);
1907        let registry = ObjectRegistry::new();
1908        let server = BACnetServer::new(config, registry).with_bbmd_config(BbmdConfig::enabled());
1909
1910        // Register with TTL=0 (immediately expired)
1911        let foreign = SocketAddrV4::new(Ipv4Addr::new(172, 16, 0, 50), 47808);
1912        server.bbmd().fdt().register(foreign, 0).unwrap();
1913        assert_eq!(server.bbmd().fdt().len(), 1);
1914
1915        // Wait a tiny bit to ensure expiry
1916        std::thread::sleep(std::time::Duration::from_millis(10));
1917
1918        // Cleanup
1919        let cleaned = server.bbmd().cleanup();
1920        assert_eq!(cleaned, 1);
1921        assert_eq!(server.bbmd().fdt().len(), 0);
1922    }
1923
1924    #[test]
1925    fn test_bbmd_metrics() {
1926        let metrics = ServerMetrics::new();
1927
1928        metrics.record_bbmd_forwarded();
1929        metrics.record_bbmd_forwarded();
1930        metrics.record_bbmd_forwarded();
1931        metrics.record_bbmd_foreign_registration();
1932        metrics.record_bbmd_foreign_registration();
1933
1934        let snapshot = metrics.snapshot();
1935        assert_eq!(snapshot.bbmd_forwarded, 3);
1936        assert_eq!(snapshot.bbmd_foreign_registrations, 2);
1937    }
1938
1939    #[test]
1940    fn test_server_bbmd_disabled_no_response() {
1941        let config = ServerConfig::new(1234);
1942        let registry = ObjectRegistry::new();
1943        let server = BACnetServer::new(config, registry);
1944        // BBMD is disabled by default
1945
1946        let source = SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 200), 47808);
1947        let msg = BvlcMessage {
1948            header: crate::network::bvlc::BvlcHeader::new(BvlcFunction::RegisterForeignDevice, 6),
1949            npdu: vec![0x00, 0x3C],
1950            original_source: None,
1951            result_code: None,
1952        };
1953
1954        // When disabled, handle_message returns None
1955        let response = server.bbmd().handle_message(&msg, source);
1956        assert!(response.is_none());
1957    }
1958
1959    #[test]
1960    fn test_server_bbmd_reject_foreign_devices() {
1961        let config = ServerConfig::new(1234);
1962        let registry = ObjectRegistry::new();
1963        let server = BACnetServer::new(config, registry)
1964            .with_bbmd_config(BbmdConfig::enabled().with_accept_foreign_devices(false));
1965
1966        let source = SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 200), 47808);
1967        let msg = BvlcMessage {
1968            header: crate::network::bvlc::BvlcHeader::new(BvlcFunction::RegisterForeignDevice, 6),
1969            npdu: vec![0x00, 0x3C],
1970            original_source: None,
1971            result_code: None,
1972        };
1973
1974        let response = server.bbmd().handle_message(&msg, source);
1975        assert!(response.is_some());
1976        let response = response.unwrap();
1977        // Should be NAK
1978        assert_eq!(
1979            response.result_code,
1980            Some(crate::network::bvlc::BvlcResultCode::RegisterForeignDeviceNak)
1981        );
1982    }
1983
1984    #[test]
1985    fn test_server_bbmd_delete_fdt_entry() {
1986        let config = ServerConfig::new(1234);
1987        let registry = ObjectRegistry::new();
1988        let server = BACnetServer::new(config, registry).with_bbmd_config(BbmdConfig::enabled());
1989
1990        // Register a foreign device first
1991        let foreign = SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 200), 47808);
1992        server.bbmd().fdt().register(foreign, 60).unwrap();
1993        assert!(server.bbmd().fdt().is_registered(&foreign));
1994
1995        // Delete it via BVLC message
1996        let mut delete_data = Vec::new();
1997        delete_data.extend_from_slice(&[192, 168, 1, 200]);
1998        delete_data.extend_from_slice(&47808u16.to_be_bytes());
1999
2000        let msg = BvlcMessage {
2001            header: crate::network::bvlc::BvlcHeader::new(
2002                BvlcFunction::DeleteForeignDeviceTableEntry,
2003                (4 + delete_data.len()) as u16,
2004            ),
2005            npdu: delete_data,
2006            original_source: None,
2007            result_code: None,
2008        };
2009
2010        let source = SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 1), 47808);
2011        let response = server.bbmd().handle_message(&msg, source);
2012        assert!(response.is_some());
2013        let response = response.unwrap();
2014        assert_eq!(
2015            response.result_code,
2016            Some(crate::network::bvlc::BvlcResultCode::Success)
2017        );
2018        assert!(!server.bbmd().fdt().is_registered(&foreign));
2019    }
2020
2021    #[test]
2022    fn test_segment_assembler_cleanup() {
2023        let config = ServerConfig::new(1234);
2024        let registry = ObjectRegistry::new();
2025        let server = BACnetServer::new(config, registry);
2026
2027        // Feed a partial segment (incomplete assembly)
2028        {
2029            let mut assembler = server.segment_assembler().lock();
2030            let seg = Segment::new(0, true, 1, vec![1, 2, 3]).with_service_choice(12);
2031            let _ = assembler.process_segment(12345, &seg);
2032            assert_eq!(assembler.active_count(), 1);
2033        }
2034
2035        // The cleanup won't remove it yet (it's not stale — timeout is 10s default).
2036        // But we can verify the cleanup interface works.
2037        {
2038            let mut assembler = server.segment_assembler().lock();
2039            let cleaned = assembler.cleanup();
2040            assert_eq!(cleaned, 0); // Not stale yet
2041            assert_eq!(assembler.active_count(), 1);
2042        }
2043    }
2044}