1use std::collections::hash_map::DefaultHasher;
7use std::hash::{Hash, Hasher};
8use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11use std::time::Duration;
12
13use parking_lot::Mutex;
14use tokio::sync::{broadcast, mpsc};
15use tracing::{debug, error, info, warn};
16
17use mabi_core::RELEASE_VERSION;
18
19use crate::apdu::encoding::ApduEncoder;
20use crate::apdu::segmentation::{
21 AssemblyResult, Segment, SegmentAssembler, SegmentTransmitter, DEFAULT_WINDOW_SIZE,
22};
23use crate::apdu::types::{
24 AbortReason, ApduType, ConfirmedService, ErrorClass, ErrorCode, RejectReason,
25 UnconfirmedService,
26};
27use crate::error::{BacnetError, BacnetResult};
28use crate::network::bbmd::{Bbmd, BbmdConfig};
29use crate::network::bvlc::{BvlcFunction, BvlcMessage};
30use crate::network::npdu::Npdu;
31use crate::network::udp::{BACnetNetwork, IncomingPacket, NetworkConfig, NetworkHandle};
32use crate::object::device::{DeviceObject, DeviceObjectConfig};
33use crate::object::property::SegmentationSupport;
34use crate::object::registry::ObjectRegistry;
35use crate::object::traits::CovSupport;
36use crate::service::cov::{CovManager, CovNotification};
37use crate::service::discovery::WhoIsHandler;
38use crate::service::handler::{ServiceContext, ServiceRegistry, ServiceResult};
39use crate::service::property::{ReadPropertyHandler, WritePropertyHandler};
40use crate::service::property_multiple::{
41 ReadPropertyMultipleHandler, WritePropertyMultipleHandler,
42};
43use crate::service::subscribe_cov::SubscribeCovHandler;
44use crate::service::tsm::{ServerTsm, TransactionKey, TsmConfig};
45
46use super::metrics::{LatencyTimer, ServerMetrics};
47
48#[derive(Debug, Clone)]
50pub struct ServerConfig {
51 pub bind_addr: SocketAddr,
53 pub broadcast_addr: SocketAddr,
55 pub device_instance: u32,
57 pub device_name: String,
59 pub vendor_id: u16,
61 pub model_name: String,
63 pub max_apdu_length: u16,
65 pub max_cov_subscriptions: usize,
67 pub cov_check_interval: Duration,
69 pub shutdown_timeout: Duration,
71}
72
73impl Default for ServerConfig {
74 fn default() -> Self {
75 Self {
76 bind_addr: "0.0.0.0:47808".parse().unwrap(),
77 broadcast_addr: "255.255.255.255:47808".parse().unwrap(),
78 device_instance: 1234,
79 device_name: "BACnet Simulator".to_string(),
80 vendor_id: 0,
81 model_name: "OTSIM".to_string(),
82 max_apdu_length: 1476,
83 max_cov_subscriptions: 1000,
84 cov_check_interval: Duration::from_secs(1),
85 shutdown_timeout: Duration::from_secs(30),
86 }
87 }
88}
89
90impl ServerConfig {
91 pub fn new(device_instance: u32) -> Self {
93 Self {
94 device_instance,
95 ..Default::default()
96 }
97 }
98
99 pub fn with_bind_addr(mut self, addr: SocketAddr) -> Self {
101 self.bind_addr = addr;
102 self
103 }
104
105 pub fn with_device_name(mut self, name: impl Into<String>) -> Self {
107 self.device_name = name.into();
108 self
109 }
110
111 pub fn with_vendor_id(mut self, vendor_id: u16) -> Self {
113 self.vendor_id = vendor_id;
114 self
115 }
116}
117
118#[derive(Debug, Clone)]
120pub enum ServerEvent {
121 Started { address: SocketAddr },
123 Stopped,
125 DeviceDiscovered {
127 device_instance: u32,
128 address: SocketAddr,
129 },
130 Error { message: String },
132}
133
134enum ProcessedResponse {
139 Single(Vec<u8>, SocketAddr),
141 Segmented {
143 segments: Vec<Vec<u8>>,
145 dest: SocketAddr,
146 invoke_id: u8,
147 service_choice: u8,
148 },
149 None,
151}
152
153pub struct BACnetServer {
155 config: ServerConfig,
156 objects: Arc<ObjectRegistry>,
157 services: Arc<ServiceRegistry>,
158 metrics: Arc<ServerMetrics>,
159 cov_manager: Arc<CovManager>,
160 tsm: Arc<ServerTsm>,
161 segment_assembler: Mutex<SegmentAssembler>,
163 segment_transmitter: SegmentTransmitter,
165 bbmd: Arc<Bbmd>,
167 cov_rx: tokio::sync::Mutex<mpsc::Receiver<CovNotification>>,
168 shutdown: Arc<AtomicBool>,
169 shutdown_tx: broadcast::Sender<()>,
170 event_tx: broadcast::Sender<ServerEvent>,
171}
172
173impl BACnetServer {
174 pub fn new(config: ServerConfig, objects: ObjectRegistry) -> Self {
176 let (shutdown_tx, _) = broadcast::channel(1);
177 let (event_tx, _) = broadcast::channel(64);
178
179 let objects = Arc::new(objects);
180
181 let device_config = DeviceObjectConfig {
183 device_instance: config.device_instance,
184 device_name: config.device_name.clone(),
185 vendor_name: "OTSIM".into(),
186 vendor_id: config.vendor_id,
187 model_name: config.model_name.clone(),
188 firmware_revision: RELEASE_VERSION.into(),
189 application_software_version: RELEASE_VERSION.into(),
190 description: String::new(),
191 location: String::new(),
192 max_apdu_length: config.max_apdu_length,
193 segmentation_supported: SegmentationSupport::Both,
194 apdu_timeout: 3000,
195 number_of_apdu_retries: 3,
196 };
197 let device_object = Arc::new(DeviceObject::new(device_config, objects.clone()));
198 objects.register(device_object.clone());
199
200 let (cov_manager, cov_rx) =
202 CovManager::new(config.device_instance, config.max_cov_subscriptions);
203 let cov_manager = Arc::new(cov_manager);
204
205 let mut services = ServiceRegistry::new();
207 services.register_confirmed(Arc::new(ReadPropertyHandler));
209 services.register_confirmed(Arc::new(WritePropertyHandler));
210 services.register_confirmed(Arc::new(ReadPropertyMultipleHandler::new()));
212 services.register_confirmed(Arc::new(WritePropertyMultipleHandler::new()));
213 services.register_confirmed(Arc::new(SubscribeCovHandler::new(cov_manager.clone())));
215 services.register_confirmed(Arc::new(
216 crate::service::subscribe_cov::SubscribeCovPropertyHandler::new(cov_manager.clone()),
217 ));
218 services.register_confirmed(Arc::new(crate::service::read_range::ReadRangeHandler::new()));
220 services.register_confirmed(Arc::new(
222 crate::service::file_access::AtomicReadFileHandler::new(),
223 ));
224 services.register_confirmed(Arc::new(
225 crate::service::file_access::AtomicWriteFileHandler::new(),
226 ));
227 services.register_confirmed(Arc::new(
229 crate::service::alarm::AcknowledgeAlarmHandler::new(),
230 ));
231 services.register_confirmed(Arc::new(
232 crate::service::alarm::GetAlarmSummaryHandler::new(),
233 ));
234 services.register_confirmed(Arc::new(
235 crate::service::alarm::GetEnrollmentSummaryHandler::new(),
236 ));
237 services.register_confirmed(Arc::new(
238 crate::service::alarm::GetEventInformationHandler::new(),
239 ));
240 services.register_confirmed(Arc::new(
241 crate::service::alarm::ConfirmedEventNotificationHandler::new(),
242 ));
243 services.register_confirmed(Arc::new(
245 crate::service::create_delete::CreateObjectHandler::new(),
246 ));
247 services.register_confirmed(Arc::new(
248 crate::service::create_delete::DeleteObjectHandler::new(),
249 ));
250 services.register_confirmed(Arc::new(
252 crate::service::device_control::DeviceCommunicationControlHandler::new(),
253 ));
254 services.register_confirmed(Arc::new(
255 crate::service::device_control::ReinitializeDeviceHandler::new(),
256 ));
257 services.register_unconfirmed(Arc::new(WhoIsHandler::new(
259 config.device_instance,
260 config.max_apdu_length,
261 SegmentationSupport::Both,
262 config.vendor_id,
263 )));
264 services.register_unconfirmed(Arc::new(
266 crate::service::device_control::TimeSynchronizationHandler::new(),
267 ));
268 services.register_unconfirmed(Arc::new(
269 crate::service::device_control::UtcTimeSynchronizationHandler::new(),
270 ));
271
272 let confirmed_choices = services.supported_confirmed_services();
274 let unconfirmed_choices = services.supported_unconfirmed_services();
275 device_object.update_services_supported(&confirmed_choices, &unconfirmed_choices);
276 device_object.update_object_types_supported();
277
278 let segment_header_overhead = 5;
282 let max_segment_data =
283 (config.max_apdu_length as usize).saturating_sub(segment_header_overhead);
284 let segment_transmitter =
285 SegmentTransmitter::new(max_segment_data).with_window_size(DEFAULT_WINDOW_SIZE);
286 let segment_assembler = SegmentAssembler::default();
287
288 Self {
289 config,
290 objects,
291 services: Arc::new(services),
292 metrics: Arc::new(ServerMetrics::new()),
293 cov_manager,
294 tsm: Arc::new(ServerTsm::new()),
295 segment_assembler: Mutex::new(segment_assembler),
296 segment_transmitter,
297 bbmd: Arc::new(Bbmd::default()),
298 cov_rx: tokio::sync::Mutex::new(cov_rx),
299 shutdown: Arc::new(AtomicBool::new(false)),
300 shutdown_tx,
301 event_tx,
302 }
303 }
304
305 pub fn with_services(mut self, services: ServiceRegistry) -> Self {
307 self.services = Arc::new(services);
308 self
309 }
310
311 pub fn with_tsm_config(mut self, config: TsmConfig) -> Self {
313 self.tsm = Arc::new(ServerTsm::with_config(config));
314 self
315 }
316
317 pub fn with_bbmd_config(mut self, config: BbmdConfig) -> Self {
319 self.bbmd = Arc::new(Bbmd::new(config));
320 self
321 }
322
323 pub fn tsm(&self) -> &Arc<ServerTsm> {
325 &self.tsm
326 }
327
328 pub fn bbmd(&self) -> &Arc<Bbmd> {
330 &self.bbmd
331 }
332
333 pub fn objects(&self) -> &Arc<ObjectRegistry> {
335 &self.objects
336 }
337
338 pub fn metrics(&self) -> &Arc<ServerMetrics> {
340 &self.metrics
341 }
342
343 pub fn subscribe(&self) -> broadcast::Receiver<ServerEvent> {
345 self.event_tx.subscribe()
346 }
347
348 pub fn shutdown(&self) {
350 if !self.shutdown.swap(true, Ordering::SeqCst) {
351 info!("Shutdown requested");
352 let _ = self.shutdown_tx.send(());
353 }
354 }
355
356 pub fn is_shutdown(&self) -> bool {
358 self.shutdown.load(Ordering::SeqCst)
359 }
360
361 pub async fn run(&self) -> BacnetResult<()> {
363 let network_config = NetworkConfig::default()
365 .with_bind_addr(self.config.bind_addr)
366 .with_broadcast_addr(self.config.broadcast_addr);
367
368 let (network, mut recv_rx) = BACnetNetwork::bind(network_config).await?;
369 let network_handle = network.handle();
370
371 let local_addr = network.local_addr()?;
372 info!(address = %local_addr, "BACnet/IP server started");
373
374 let _ = self.event_tx.send(ServerEvent::Started {
375 address: local_addr,
376 });
377
378 let cov_manager = self.cov_manager.clone();
380 let mut cov_rx = {
381 let mut guard = self.cov_rx.lock().await;
382 let (_dummy_tx, dummy_rx) = mpsc::channel(1);
384 std::mem::replace(&mut *guard, dummy_rx)
385 };
386
387 let shutdown_clone = self.shutdown.clone();
389 let network_shutdown = Arc::new(AtomicBool::new(false));
390 let network_shutdown_clone = network_shutdown.clone();
391 let network_task = tokio::spawn(async move {
392 while !shutdown_clone.load(Ordering::SeqCst) {
393 if let Err(e) = network.run_receive_loop().await {
394 error!(error = %e, "Network receive loop error");
395 break;
396 }
397 }
398 network_shutdown_clone.store(true, Ordering::SeqCst);
399 });
400
401 let cov_network = network_handle.clone();
403 let metrics_clone = self.metrics.clone();
404 let shutdown_cov = self.shutdown.clone();
405 let cov_task = tokio::spawn(async move {
406 while !shutdown_cov.load(Ordering::SeqCst) {
407 tokio::select! {
408 Some(notification) = cov_rx.recv() => {
409 if let Err(e) = send_cov_notification(&cov_network, notification).await {
410 warn!(error = %e, "Failed to send COV notification");
411 } else {
412 metrics_clone.record_cov_notification_sent();
413 }
414 }
415 _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
416 }
418 }
419 }
420 });
421
422 let bbmd_clone = self.bbmd.clone();
425 let bbmd_cleanup_enabled = self.bbmd.is_enabled();
426 let cleanup_interval = Duration::from_secs(30);
427
428 let cov_objects_registry = self.objects.clone();
430 let cov_manager_poll = cov_manager.clone();
431 let cov_check_interval = self.config.cov_check_interval;
432 let mut cov_ticker = tokio::time::interval(cov_check_interval);
433 cov_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
434
435 let mut shutdown_rx = self.shutdown_tx.subscribe();
436 let mut cleanup_ticker = tokio::time::interval(cleanup_interval);
437 cleanup_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
438
439 loop {
441 tokio::select! {
442 Some(packet) = recv_rx.recv() => {
444 self.metrics.record_request();
445 self.metrics.record_bytes_received(packet.data.len() as u64);
446
447 if let Err(e) = self.process_packet(&packet, &network_handle, &cov_manager).await {
448 debug!(error = %e, "Error processing packet");
449 self.metrics.record_error();
450 }
451 }
452
453 _ = cleanup_ticker.tick() => {
455 if bbmd_cleanup_enabled {
456 let expired = bbmd_clone.cleanup();
457 if expired > 0 {
458 debug!(expired, "BBMD FDT cleanup completed");
459 }
460 }
461
462 let stale = {
464 let mut assembler = self.segment_assembler.lock();
465 assembler.cleanup()
466 };
467 if stale > 0 {
468 debug!(stale, "Segment assembler cleanup completed");
469 }
470 }
471
472 _ = cov_ticker.tick() => {
474 for obj in cov_objects_registry.iter() {
476 if let Some(cov_obj) = obj.as_any().downcast_ref::<crate::object::standard::AnalogInput>() {
477 if cov_obj.check_cov() {
478 let values = cov_obj.cov_values();
479 cov_obj.reset_cov();
480 let _ = cov_manager_poll.notify_change(obj.object_identifier(), values).await;
481 }
482 } else if let Some(cov_obj) = obj.as_any().downcast_ref::<crate::object::standard::AnalogOutput>() {
483 if cov_obj.check_cov() {
484 let values = cov_obj.cov_values();
485 cov_obj.reset_cov();
486 let _ = cov_manager_poll.notify_change(obj.object_identifier(), values).await;
487 }
488 } else if let Some(cov_obj) = obj.as_any().downcast_ref::<crate::object::standard::AnalogValue>() {
489 if cov_obj.check_cov() {
490 let values = cov_obj.cov_values();
491 cov_obj.reset_cov();
492 let _ = cov_manager_poll.notify_change(obj.object_identifier(), values).await;
493 }
494 } else if let Some(cov_obj) = obj.as_any().downcast_ref::<crate::object::standard::BinaryInput>() {
495 if cov_obj.check_cov() {
496 let values = cov_obj.cov_values();
497 cov_obj.reset_cov();
498 let _ = cov_manager_poll.notify_change(obj.object_identifier(), values).await;
499 }
500 } else if let Some(cov_obj) = obj.as_any().downcast_ref::<crate::object::standard::BinaryOutput>() {
501 if cov_obj.check_cov() {
502 let values = cov_obj.cov_values();
503 cov_obj.reset_cov();
504 let _ = cov_manager_poll.notify_change(obj.object_identifier(), values).await;
505 }
506 } else if let Some(cov_obj) = obj.as_any().downcast_ref::<crate::object::standard::BinaryValue>() {
507 if cov_obj.check_cov() {
508 let values = cov_obj.cov_values();
509 cov_obj.reset_cov();
510 let _ = cov_manager_poll.notify_change(obj.object_identifier(), values).await;
511 }
512 } else if let Some(cov_obj) = obj.as_any().downcast_ref::<crate::object::standard::MultiStateInput>() {
513 if cov_obj.check_cov() {
514 let values = cov_obj.cov_values();
515 cov_obj.reset_cov();
516 let _ = cov_manager_poll.notify_change(obj.object_identifier(), values).await;
517 }
518 } else if let Some(cov_obj) = obj.as_any().downcast_ref::<crate::object::standard::MultiStateOutput>() {
519 if cov_obj.check_cov() {
520 let values = cov_obj.cov_values();
521 cov_obj.reset_cov();
522 let _ = cov_manager_poll.notify_change(obj.object_identifier(), values).await;
523 }
524 } else if let Some(cov_obj) = obj.as_any().downcast_ref::<crate::object::standard::MultiStateValue>() {
525 if cov_obj.check_cov() {
526 let values = cov_obj.cov_values();
527 cov_obj.reset_cov();
528 let _ = cov_manager_poll.notify_change(obj.object_identifier(), values).await;
529 }
530 }
531 }
532
533 cov_manager_poll.cleanup_expired();
535 }
536
537 _ = shutdown_rx.recv() => {
539 info!("Shutdown signal received");
540 break;
541 }
542 }
543 }
544
545 network_shutdown.store(true, Ordering::SeqCst);
547
548 let _ = tokio::time::timeout(self.config.shutdown_timeout, async {
550 let _ = network_task.await;
551 let _ = cov_task.await;
552 })
553 .await;
554
555 let _ = self.event_tx.send(ServerEvent::Stopped);
556 info!("BACnet/IP server stopped");
557
558 Ok(())
559 }
560
561 async fn process_packet(
563 &self,
564 packet: &IncomingPacket,
565 network: &NetworkHandle,
566 _cov_manager: &Arc<CovManager>,
567 ) -> BacnetResult<()> {
568 let timer = LatencyTimer::start();
569
570 let bvlc = match &packet.bvlc {
572 Some(msg) => msg,
573 None => {
574 debug!(source = %packet.source, "Invalid BVLC message");
575 return Err(BacnetError::Protocol("Invalid BVLC message".into()));
576 }
577 };
578
579 if self.bbmd.is_enabled() {
583 match bvlc.header.function {
584 BvlcFunction::RegisterForeignDevice
585 | BvlcFunction::ReadForeignDeviceTable
586 | BvlcFunction::DeleteForeignDeviceTableEntry
587 | BvlcFunction::ReadBroadcastDistributionTable
588 | BvlcFunction::WriteBroadcastDistributionTable => {
589 let source_v4 = match packet.source {
591 SocketAddr::V4(v4) => v4,
592 SocketAddr::V6(_) => {
593 debug!("BBMD does not support IPv6 sources");
594 return Ok(());
595 }
596 };
597
598 if let Some(response) = self.bbmd.handle_message(bvlc, source_v4) {
599 if bvlc.header.function == BvlcFunction::RegisterForeignDevice {
601 self.metrics.record_bbmd_foreign_registration();
602 }
603
604 let response_bytes = response.encode();
605 self.metrics.record_bytes_sent(response_bytes.len() as u64);
606 network.send_to(&response_bytes, packet.source).await?;
607 }
608
609 let latency = timer.elapsed_us();
610 self.metrics.record_success(latency);
611 return Ok(());
612 }
613 BvlcFunction::DistributeBroadcastToNetwork => {
614 let source_v4 = match packet.source {
618 SocketAddr::V4(v4) => v4,
619 _ => SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0),
620 };
621
622 let forward_addrs = self.bbmd.get_forward_addresses(Some(&source_v4));
623 if !forward_addrs.is_empty() {
624 let forwarded = BvlcMessage::forwarded_npdu(bvlc.npdu.clone(), source_v4);
626 let forwarded_bytes = forwarded.encode();
627
628 for addr in &forward_addrs {
629 let dest = SocketAddr::V4(*addr);
630 if let Err(e) = network.send_to(&forwarded_bytes, dest).await {
631 warn!(dest = %dest, error = %e, "Failed to forward broadcast");
632 } else {
633 self.metrics.record_bbmd_forwarded();
634 self.metrics.record_bytes_sent(forwarded_bytes.len() as u64);
635 }
636 }
637
638 debug!(
639 source = %packet.source,
640 forwarded_to = forward_addrs.len(),
641 "Distributed broadcast to network"
642 );
643 }
644 }
646 BvlcFunction::OriginalBroadcastNpdu => {
647 let source_v4 = match packet.source {
649 SocketAddr::V4(v4) => v4,
650 _ => SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0),
651 };
652
653 let forward_addrs = self.bbmd.get_forward_addresses(Some(&source_v4));
654 if !forward_addrs.is_empty() {
655 let forwarded = BvlcMessage::forwarded_npdu(bvlc.npdu.clone(), source_v4);
656 let forwarded_bytes = forwarded.encode();
657
658 for addr in &forward_addrs {
659 let dest = SocketAddr::V4(*addr);
660 if let Err(e) = network.send_to(&forwarded_bytes, dest).await {
661 warn!(dest = %dest, error = %e, "Failed to forward broadcast");
662 } else {
663 self.metrics.record_bbmd_forwarded();
664 self.metrics.record_bytes_sent(forwarded_bytes.len() as u64);
665 }
666 }
667 }
668 }
670 _ => {
671 }
673 }
674 }
675
676 let npdu_data = match bvlc.npdu() {
678 Some(data) => data,
679 None => {
680 debug!(function = ?bvlc.header.function, "No NPDU in BVLC message");
681 return Ok(()); }
683 };
684
685 let npdu = Npdu::decode(npdu_data).map_err(|e| BacnetError::Protocol(e.to_string()))?;
687
688 if npdu.is_network_message() {
690 debug!("Network layer message, skipping");
691 return Ok(());
692 }
693
694 let apdu = npdu.apdu();
696 if apdu.is_empty() {
697 return Err(BacnetError::Protocol("Empty APDU".into()));
698 }
699
700 let apdu_type_byte = (apdu[0] >> 4) & 0x0F;
702 let apdu_type = ApduType::from_nibble(apdu_type_byte).ok_or_else(|| {
703 BacnetError::Protocol(format!("Unknown APDU type: {}", apdu_type_byte))
704 })?;
705
706 let ctx = ServiceContext::new(self.objects.clone(), self.config.device_instance)
708 .with_source_address(packet.source);
709
710 let response = match apdu_type {
712 ApduType::ConfirmedRequest => {
713 self.metrics.record_confirmed_request();
714 self.process_confirmed_request(apdu, &ctx, packet.source)?
715 }
716 ApduType::UnconfirmedRequest => {
717 self.metrics.record_unconfirmed_request();
718 self.process_unconfirmed_request(apdu, &ctx, packet.source)?
719 }
720 ApduType::SegmentAck => {
721 self.metrics.record_segment_ack_received();
727 debug!(source = %packet.source, "SegmentACK received");
728 return Ok(());
729 }
730 _ => {
731 debug!(apdu_type = ?apdu_type, "Unsupported APDU type");
732 return Ok(());
733 }
734 };
735
736 match response {
738 ProcessedResponse::Single(response_apdu, dest) => {
739 self.send_response(&response_apdu, dest, &npdu, bvlc, network)
740 .await?;
741 }
742 ProcessedResponse::Segmented {
743 segments,
744 dest,
745 invoke_id,
746 service_choice,
747 } => {
748 self.send_segmented_response(
749 &segments,
750 dest,
751 invoke_id,
752 service_choice,
753 &npdu,
754 bvlc,
755 network,
756 )
757 .await?;
758 }
759 ProcessedResponse::None => {}
760 }
761
762 let latency = timer.elapsed_us();
763 self.metrics.record_success(latency);
764
765 Ok(())
766 }
767
768 fn process_confirmed_request(
779 &self,
780 apdu: &[u8],
781 ctx: &ServiceContext,
782 source: SocketAddr,
783 ) -> BacnetResult<ProcessedResponse> {
784 if apdu.len() < 3 {
787 debug!(
788 "Confirmed request too short ({} bytes), discarding",
789 apdu.len()
790 );
791 self.metrics.record_error();
792 return Ok(ProcessedResponse::None);
793 }
794
795 let pdu_type_byte = apdu[0];
803 let segmented = (pdu_type_byte & 0x08) != 0;
804 let more_follows = (pdu_type_byte & 0x04) != 0;
805 let _segmented_response_accepted = (pdu_type_byte & 0x02) != 0;
806
807 let invoke_id = apdu[2];
808
809 if segmented {
811 if apdu.len() < 5 {
812 let abort = build_abort_apdu(invoke_id, AbortReason::Other);
813 self.metrics.record_error();
814 return Ok(ProcessedResponse::Single(abort, source));
815 }
816
817 let sequence_number = apdu[3];
818 let _proposed_window = apdu[4];
819
820 let (service_choice_opt, segment_data) = if sequence_number == 0 {
822 if apdu.len() < 6 {
824 let reject =
825 build_reject_apdu(invoke_id, RejectReason::MissingRequiredParameter as u8);
826 self.metrics.record_error();
827 return Ok(ProcessedResponse::Single(reject, source));
828 }
829 (Some(apdu[5]), apdu[6..].to_vec())
830 } else {
831 (None, apdu[5..].to_vec())
833 };
834
835 let mut segment = Segment::new(sequence_number, more_follows, invoke_id, segment_data);
837 if let Some(sc) = service_choice_opt {
838 segment = segment.with_service_choice(sc);
839 }
840
841 let source_hash = hash_socket_addr(&source);
843
844 self.metrics.record_segment_received();
845
846 let assembly_result = {
847 let mut assembler = self.segment_assembler.lock();
848 assembler.process_segment(source_hash, &segment)
849 };
850
851 match assembly_result {
852 Ok(AssemblyResult::NeedAck(ack_seq)) => {
853 debug!(
855 invoke_id = invoke_id,
856 sequence = ack_seq,
857 "Sending SegmentACK for intermediate segment"
858 );
859 let ack = build_segment_ack_apdu(invoke_id, ack_seq, DEFAULT_WINDOW_SIZE, true);
860 self.metrics.record_segment_ack_sent();
861 return Ok(ProcessedResponse::Single(ack, source));
862 }
863 Ok(AssemblyResult::Complete) => {
864 let (assembled_data, assembled_service_choice) = {
866 let mut assembler = self.segment_assembler.lock();
867 match assembler.get_complete(source_hash, invoke_id) {
868 Some(result) => result,
869 None => {
870 let abort = build_abort_apdu(invoke_id, AbortReason::Other);
872 self.metrics.record_error();
873 return Ok(ProcessedResponse::Single(abort, source));
874 }
875 }
876 };
877
878 self.metrics.record_segmented_request_reassembled();
879
880 let service_choice = assembled_service_choice.unwrap_or(0);
881 debug!(
882 invoke_id = invoke_id,
883 service_choice = service_choice,
884 assembled_size = assembled_data.len(),
885 "Segmented request fully reassembled"
886 );
887
888 return self.dispatch_and_respond(
889 invoke_id,
890 service_choice,
891 &assembled_data,
892 ctx,
893 source,
894 );
895 }
896 Ok(AssemblyResult::Duplicate) => {
897 debug!(invoke_id = invoke_id, "Duplicate segment ignored");
898 return Ok(ProcessedResponse::None);
899 }
900 Err(e) => {
901 warn!(
903 invoke_id = invoke_id,
904 error = %e,
905 "Segment assembly error"
906 );
907 let abort = build_abort_apdu(invoke_id, AbortReason::Other);
908 self.metrics.record_error();
909 return Ok(ProcessedResponse::Single(abort, source));
910 }
911 }
912 }
913
914 if apdu.len() < 4 {
916 let reject = build_reject_apdu(invoke_id, RejectReason::MissingRequiredParameter as u8);
917 self.metrics.record_error();
918 return Ok(ProcessedResponse::Single(reject, source));
919 }
920
921 let service_choice = apdu[3];
922 let service_data = &apdu[4..];
923
924 self.dispatch_and_respond(invoke_id, service_choice, service_data, ctx, source)
925 }
926
927 fn dispatch_and_respond(
939 &self,
940 invoke_id: u8,
941 service_choice: u8,
942 service_data: &[u8],
943 ctx: &ServiceContext,
944 source: SocketAddr,
945 ) -> BacnetResult<ProcessedResponse> {
946 let tsm_key = TransactionKey::new(source, invoke_id);
947
948 match self.tsm.begin_transaction(tsm_key, service_choice) {
950 Ok(Some(cached_response)) => {
951 debug!(
953 invoke_id = invoke_id,
954 source = %source,
955 "Duplicate request, returning cached response"
956 );
957 return Ok(ProcessedResponse::Single(cached_response, source));
958 }
959 Ok(None) => {
960 }
962 Err(crate::service::tsm::TsmError::DuplicateInProgress) => {
963 debug!(
965 invoke_id = invoke_id,
966 source = %source,
967 "Duplicate request while processing, ignoring"
968 );
969 return Ok(ProcessedResponse::None);
970 }
971 Err(crate::service::tsm::TsmError::AtCapacity) => {
972 warn!("TSM at capacity, aborting request");
974 let abort = build_abort_apdu(invoke_id, AbortReason::OutOfResources);
975 return Ok(ProcessedResponse::Single(abort, source));
976 }
977 }
978
979 if let Some(service) = ConfirmedService::from_u8(service_choice) {
981 match service {
982 ConfirmedService::ReadProperty | ConfirmedService::ReadPropertyMultiple => {
983 self.metrics.record_read_property();
984 }
985 ConfirmedService::WriteProperty | ConfirmedService::WritePropertyMultiple => {
986 self.metrics.record_write_property();
987 }
988 ConfirmedService::SubscribeCov | ConfirmedService::SubscribeCovProperty => {
989 self.metrics.record_cov_subscription();
990 }
991 _ => {}
992 }
993 }
994
995 let ctx_with_invoke = ctx.clone().with_invoke_id(invoke_id);
997 let result =
998 self.services
999 .dispatch_confirmed(service_choice, service_data, &ctx_with_invoke);
1000
1001 let response = match result {
1003 ServiceResult::SimpleAck => {
1004 let apdu = vec![0x20, invoke_id, service_choice];
1005 let should_send = self.tsm.complete_transaction(&tsm_key, apdu.clone());
1006 if !should_send {
1007 debug!(invoke_id, "Response intentionally dropped (chaos testing)");
1008 return Ok(ProcessedResponse::None);
1009 }
1010 ProcessedResponse::Single(apdu, source)
1011 }
1012 ServiceResult::ComplexAck(data) => {
1013 let unsegmented_len = 3 + data.len();
1015
1016 if unsegmented_len <= self.config.max_apdu_length as usize {
1017 let mut apdu = vec![0x30, invoke_id, service_choice];
1019 apdu.extend_from_slice(&data);
1020
1021 let should_send = self.tsm.complete_transaction(&tsm_key, apdu.clone());
1022 if !should_send {
1023 debug!(invoke_id, "Response intentionally dropped (chaos testing)");
1024 return Ok(ProcessedResponse::None);
1025 }
1026 ProcessedResponse::Single(apdu, source)
1027 } else if self.segment_transmitter.needs_segmentation(data.len()) {
1028 let segments = self.segment_transmitter.segment(&data, invoke_id);
1031 let segment_count = segments.len();
1032
1033 debug!(
1034 invoke_id = invoke_id,
1035 total_size = data.len(),
1036 segment_count = segment_count,
1037 "ComplexACK requires segmentation"
1038 );
1039
1040 let mut full_response = vec![0x30, invoke_id, service_choice];
1042 full_response.extend_from_slice(&data);
1043 let should_send = self.tsm.complete_transaction(&tsm_key, full_response);
1044 if !should_send {
1045 debug!(invoke_id, "Response intentionally dropped (chaos testing)");
1046 return Ok(ProcessedResponse::None);
1047 }
1048
1049 self.metrics.record_segmented_response_transmitted();
1050 self.metrics.record_segments_sent(segment_count as u64);
1051
1052 let segment_data: Vec<Vec<u8>> = segments.into_iter().map(|s| s.data).collect();
1054
1055 ProcessedResponse::Segmented {
1056 segments: segment_data,
1057 dest: source,
1058 invoke_id,
1059 service_choice,
1060 }
1061 } else {
1062 let mut apdu = vec![0x30, invoke_id, service_choice];
1065 apdu.extend_from_slice(&data);
1066 let should_send = self.tsm.complete_transaction(&tsm_key, apdu.clone());
1067 if !should_send {
1068 debug!(invoke_id, "Response intentionally dropped (chaos testing)");
1069 return Ok(ProcessedResponse::None);
1070 }
1071 ProcessedResponse::Single(apdu, source)
1072 }
1073 }
1074 ServiceResult::Error {
1075 error_class,
1076 error_code,
1077 } => {
1078 let apdu = build_error_apdu(invoke_id, service_choice, error_class, error_code);
1079 let should_send = self.tsm.complete_transaction(&tsm_key, apdu.clone());
1080 if !should_send {
1081 debug!(invoke_id, "Response intentionally dropped (chaos testing)");
1082 return Ok(ProcessedResponse::None);
1083 }
1084 ProcessedResponse::Single(apdu, source)
1085 }
1086 ServiceResult::Reject(reason) => {
1087 let apdu = build_reject_apdu(invoke_id, reason);
1088 let should_send = self.tsm.complete_transaction(&tsm_key, apdu.clone());
1089 if !should_send {
1090 debug!(invoke_id, "Response intentionally dropped (chaos testing)");
1091 return Ok(ProcessedResponse::None);
1092 }
1093 ProcessedResponse::Single(apdu, source)
1094 }
1095 ServiceResult::Abort(reason) => {
1096 let apdu = build_abort_apdu(invoke_id, reason);
1097 let should_send = self.tsm.complete_transaction(&tsm_key, apdu.clone());
1098 if !should_send {
1099 debug!(invoke_id, "Response intentionally dropped (chaos testing)");
1100 return Ok(ProcessedResponse::None);
1101 }
1102 ProcessedResponse::Single(apdu, source)
1103 }
1104 ServiceResult::NoResponse | ServiceResult::Broadcast(_) => {
1105 return Ok(ProcessedResponse::None);
1106 }
1107 };
1108
1109 Ok(response)
1110 }
1111
1112 fn process_unconfirmed_request(
1114 &self,
1115 apdu: &[u8],
1116 ctx: &ServiceContext,
1117 source: SocketAddr,
1118 ) -> BacnetResult<ProcessedResponse> {
1119 if apdu.len() < 2 {
1120 return Err(BacnetError::Protocol(
1121 "Unconfirmed request too short".into(),
1122 ));
1123 }
1124
1125 let _pdu_type = apdu[0]; let service_choice = apdu[1];
1128 let service_data = &apdu[2..];
1129
1130 if service_choice == UnconfirmedService::WhoIs as u8 {
1132 self.metrics.record_who_is();
1133 }
1134
1135 let result = self
1137 .services
1138 .dispatch_unconfirmed(service_choice, service_data, ctx);
1139
1140 match result {
1141 ServiceResult::Broadcast(data) => {
1142 self.metrics.record_i_am_sent();
1144
1145 let mut apdu = vec![0x10, UnconfirmedService::IAm as u8];
1147 apdu.extend_from_slice(&data);
1148
1149 Ok(ProcessedResponse::Single(apdu, source))
1151 }
1152 ServiceResult::NoResponse => Ok(ProcessedResponse::None),
1153 _ => Ok(ProcessedResponse::None),
1154 }
1155 }
1156
1157 async fn send_response(
1159 &self,
1160 response_apdu: &[u8],
1161 dest: SocketAddr,
1162 npdu: &Npdu,
1163 bvlc: &BvlcMessage,
1164 network: &NetworkHandle,
1165 ) -> BacnetResult<()> {
1166 let response_npdu = if npdu.expects_reply() {
1167 Npdu::simple(response_apdu.to_vec())
1168 } else {
1169 Npdu::no_reply(response_apdu.to_vec())
1170 };
1171
1172 let response_bvlc = if bvlc.is_broadcast() {
1173 BvlcMessage::original_broadcast(response_npdu.encode())
1174 } else {
1175 BvlcMessage::original_unicast(response_npdu.encode())
1176 };
1177
1178 let response_bytes = response_bvlc.encode();
1179 self.metrics.record_bytes_sent(response_bytes.len() as u64);
1180
1181 network.send_to(&response_bytes, dest).await
1182 }
1183
1184 async fn send_segmented_response(
1192 &self,
1193 segments: &[Vec<u8>],
1194 dest: SocketAddr,
1195 invoke_id: u8,
1196 service_choice: u8,
1197 npdu: &Npdu,
1198 bvlc: &BvlcMessage,
1199 network: &NetworkHandle,
1200 ) -> BacnetResult<()> {
1201 let total = segments.len();
1202
1203 for (i, segment_data) in segments.iter().enumerate() {
1204 let more_follows = i < total - 1;
1205 let sequence_number = i as u8;
1206
1207 let mut encoder = ApduEncoder::new();
1209 encoder.encode_segmented_complex_ack_header(
1210 invoke_id,
1211 sequence_number,
1212 DEFAULT_WINDOW_SIZE,
1213 more_follows,
1214 service_choice,
1215 );
1216 encoder.put_bytes(segment_data);
1217 let segment_apdu = encoder.into_bytes();
1218
1219 let response_npdu = if npdu.expects_reply() {
1221 Npdu::simple(segment_apdu)
1222 } else {
1223 Npdu::no_reply(segment_apdu)
1224 };
1225
1226 let response_bvlc = if bvlc.is_broadcast() {
1227 BvlcMessage::original_broadcast(response_npdu.encode())
1228 } else {
1229 BvlcMessage::original_unicast(response_npdu.encode())
1230 };
1231
1232 let response_bytes = response_bvlc.encode();
1233 self.metrics.record_bytes_sent(response_bytes.len() as u64);
1234
1235 network.send_to(&response_bytes, dest).await?;
1236
1237 debug!(
1238 invoke_id = invoke_id,
1239 sequence = sequence_number,
1240 more = more_follows,
1241 size = segment_data.len(),
1242 "Sent segment {}/{}",
1243 i + 1,
1244 total
1245 );
1246 }
1247
1248 debug!(
1249 invoke_id = invoke_id,
1250 total_segments = total,
1251 "Segmented ComplexACK fully transmitted"
1252 );
1253
1254 Ok(())
1255 }
1256
1257 pub fn segment_assembler(&self) -> &Mutex<SegmentAssembler> {
1259 &self.segment_assembler
1260 }
1261}
1262
1263impl ServiceContext {
1264 fn clone(&self) -> Self {
1266 Self {
1267 objects: self.objects.clone(),
1268 device_instance: self.device_instance,
1269 invoke_id: self.invoke_id,
1270 max_apdu_length: self.max_apdu_length,
1271 source_address: self.source_address,
1272 }
1273 }
1274}
1275
1276fn build_error_apdu(
1281 invoke_id: u8,
1282 service_choice: u8,
1283 error_class: ErrorClass,
1284 error_code: ErrorCode,
1285) -> Vec<u8> {
1286 let mut encoder = ApduEncoder::new();
1287 encoder.encode_error_pdu(
1288 invoke_id,
1289 service_choice,
1290 error_class as u32,
1291 error_code as u32,
1292 );
1293 encoder.into_bytes()
1294}
1295
1296fn build_reject_apdu(invoke_id: u8, reject_reason: u8) -> Vec<u8> {
1302 let mut encoder = ApduEncoder::new();
1303 encoder.encode_reject_pdu(invoke_id, reject_reason);
1304 encoder.into_bytes()
1305}
1306
1307fn build_abort_apdu(invoke_id: u8, abort_reason: AbortReason) -> Vec<u8> {
1314 let mut encoder = ApduEncoder::new();
1315 encoder.encode_abort_pdu(invoke_id, abort_reason as u8, true);
1316 encoder.into_bytes()
1317}
1318
1319fn build_segment_ack_apdu(
1324 invoke_id: u8,
1325 sequence_number: u8,
1326 actual_window_size: u8,
1327 sent_by_server: bool,
1328) -> Vec<u8> {
1329 let mut encoder = ApduEncoder::new();
1330 encoder.encode_segment_ack_pdu(
1331 invoke_id,
1332 sequence_number,
1333 actual_window_size,
1334 sent_by_server,
1335 false, );
1337 encoder.into_bytes()
1338}
1339
1340fn hash_socket_addr(addr: &SocketAddr) -> u64 {
1345 let mut hasher = DefaultHasher::new();
1346 addr.hash(&mut hasher);
1347 hasher.finish()
1348}
1349
1350static COV_INVOKE_ID: std::sync::atomic::AtomicU8 = std::sync::atomic::AtomicU8::new(0);
1352
1353async fn send_cov_notification(
1355 network: &NetworkHandle,
1356 notification: CovNotification,
1357) -> BacnetResult<()> {
1358 let apdu = if notification.confirmed {
1359 let invoke_id = COV_INVOKE_ID.fetch_add(1, Ordering::Relaxed);
1361 notification.encode_confirmed(invoke_id)
1362 } else {
1363 let mut apdu = vec![0x10, UnconfirmedService::UnconfirmedCovNotification as u8];
1365 apdu.extend_from_slice(¬ification.encode_unconfirmed());
1366 apdu
1367 };
1368
1369 let mut npdu = Npdu::no_reply(apdu);
1370 if notification.confirmed {
1371 npdu.control.expecting_reply = true;
1372 }
1373 let bvlc = BvlcMessage::original_unicast(npdu.encode());
1374
1375 network
1376 .send_to(&bvlc.encode(), notification.destination)
1377 .await
1378}
1379
1380#[cfg(test)]
1381mod tests {
1382 use super::*;
1383
1384 #[test]
1385 fn test_server_config_default() {
1386 let config = ServerConfig::default();
1387 assert_eq!(config.bind_addr.port(), 47808);
1388 assert_eq!(config.max_apdu_length, 1476);
1389 }
1390
1391 #[test]
1392 fn test_server_config_builder() {
1393 let config = ServerConfig::new(5678)
1394 .with_device_name("Test Device")
1395 .with_vendor_id(123);
1396
1397 assert_eq!(config.device_instance, 5678);
1398 assert_eq!(config.device_name, "Test Device");
1399 assert_eq!(config.vendor_id, 123);
1400 }
1401
1402 #[test]
1403 fn test_build_error_apdu() {
1404 let apdu = build_error_apdu(1, 12, ErrorClass::Property, ErrorCode::UnknownProperty);
1405
1406 assert_eq!(apdu[0], 0x50); assert_eq!(apdu[1], 1); assert_eq!(apdu[2], 12); assert_eq!(apdu[3], 0x91);
1413 assert_eq!(apdu[4], 2); assert_eq!(apdu[5], 0x91);
1417 assert_eq!(apdu[6], 32); assert_eq!(apdu.len(), 7);
1420 }
1421
1422 #[test]
1423 fn test_build_error_apdu_unknown_object() {
1424 let apdu = build_error_apdu(3, 12, ErrorClass::Object, ErrorCode::UnknownObject);
1425
1426 assert_eq!(apdu[0], 0x50);
1427 assert_eq!(apdu[1], 3);
1428 assert_eq!(apdu[2], 12);
1429
1430 assert_eq!(apdu[3], 0x91);
1432 assert_eq!(apdu[4], 1);
1433
1434 assert_eq!(apdu[5], 0x91);
1436 assert_eq!(apdu[6], 31);
1437 }
1438
1439 #[test]
1440 fn test_build_reject_apdu() {
1441 let apdu = build_reject_apdu(7, RejectReason::UnrecognizedService as u8);
1442
1443 assert_eq!(apdu.len(), 3);
1444 assert_eq!(apdu[0], 0x60); assert_eq!(apdu[1], 7); assert_eq!(apdu[2], 9); }
1448
1449 #[test]
1450 fn test_build_reject_apdu_missing_param() {
1451 let apdu = build_reject_apdu(2, RejectReason::MissingRequiredParameter as u8);
1452
1453 assert_eq!(apdu[0], 0x60);
1454 assert_eq!(apdu[1], 2);
1455 assert_eq!(apdu[2], 5); }
1457
1458 #[test]
1459 fn test_build_abort_apdu_segmentation() {
1460 let apdu = build_abort_apdu(10, AbortReason::SegmentationNotSupported);
1461
1462 assert_eq!(apdu.len(), 3);
1463 assert_eq!(apdu[0], 0x71); assert_eq!(apdu[1], 10); assert_eq!(apdu[2], 4); }
1467
1468 #[test]
1469 fn test_build_abort_apdu_buffer_overflow() {
1470 let apdu = build_abort_apdu(5, AbortReason::BufferOverflow);
1471
1472 assert_eq!(apdu[0], 0x71); assert_eq!(apdu[1], 5);
1474 assert_eq!(apdu[2], 1); }
1476
1477 #[test]
1478 fn test_build_abort_apdu_too_long() {
1479 let apdu = build_abort_apdu(3, AbortReason::ApduTooLong);
1480
1481 assert_eq!(apdu[0], 0x71);
1482 assert_eq!(apdu[1], 3);
1483 assert_eq!(apdu[2], 11); }
1485
1486 #[test]
1487 fn test_build_abort_apdu_out_of_resources() {
1488 let apdu = build_abort_apdu(1, AbortReason::OutOfResources);
1489
1490 assert_eq!(apdu[0], 0x71);
1491 assert_eq!(apdu[1], 1);
1492 assert_eq!(apdu[2], 9); }
1494
1495 #[test]
1496 fn test_build_abort_other() {
1497 let apdu = build_abort_apdu(0, AbortReason::Other);
1498
1499 assert_eq!(apdu[0], 0x71);
1500 assert_eq!(apdu[1], 0);
1501 assert_eq!(apdu[2], 0); }
1503
1504 #[test]
1507 fn test_build_segment_ack_apdu_server() {
1508 let apdu = build_segment_ack_apdu(42, 3, 1, true);
1509
1510 assert_eq!(apdu.len(), 4);
1511 assert_eq!(apdu[0], 0x41); assert_eq!(apdu[1], 42); assert_eq!(apdu[2], 3); assert_eq!(apdu[3], 1); }
1516
1517 #[test]
1518 fn test_build_segment_ack_apdu_client() {
1519 let apdu = build_segment_ack_apdu(10, 0, 4, false);
1520
1521 assert_eq!(apdu.len(), 4);
1522 assert_eq!(apdu[0], 0x40); assert_eq!(apdu[1], 10);
1524 assert_eq!(apdu[2], 0);
1525 assert_eq!(apdu[3], 4);
1526 }
1527
1528 #[test]
1531 fn test_hash_socket_addr_deterministic() {
1532 let addr1: SocketAddr = "192.168.1.100:47808".parse().unwrap();
1533 let h1 = hash_socket_addr(&addr1);
1534 let h2 = hash_socket_addr(&addr1);
1535 assert_eq!(h1, h2);
1536 }
1537
1538 #[test]
1539 fn test_hash_socket_addr_different_addresses() {
1540 let addr1: SocketAddr = "192.168.1.100:47808".parse().unwrap();
1541 let addr2: SocketAddr = "192.168.1.101:47808".parse().unwrap();
1542 let h1 = hash_socket_addr(&addr1);
1543 let h2 = hash_socket_addr(&addr2);
1544 assert_ne!(h1, h2);
1545 }
1546
1547 #[test]
1550 fn test_segmented_complex_ack_header_first_segment() {
1551 let mut encoder = ApduEncoder::new();
1552 encoder.encode_segmented_complex_ack_header(
1553 1, 0, 1, true, 12, );
1559 let bytes = encoder.into_bytes();
1560
1561 assert_eq!(bytes.len(), 5);
1562 assert_eq!(bytes[0], 0x3C);
1564 assert_eq!(bytes[1], 1); assert_eq!(bytes[2], 0); assert_eq!(bytes[3], 1); assert_eq!(bytes[4], 12); }
1569
1570 #[test]
1571 fn test_segmented_complex_ack_header_last_segment() {
1572 let mut encoder = ApduEncoder::new();
1573 encoder.encode_segmented_complex_ack_header(
1574 5, 3, 1, false, 14, );
1580 let bytes = encoder.into_bytes();
1581
1582 assert_eq!(bytes.len(), 5);
1583 assert_eq!(bytes[0], 0x38);
1585 assert_eq!(bytes[1], 5);
1586 assert_eq!(bytes[2], 3);
1587 assert_eq!(bytes[3], 1);
1588 assert_eq!(bytes[4], 14);
1589 }
1590
1591 #[test]
1594 fn test_segment_transmitter_sizing() {
1595 let max_apdu: u16 = 1476;
1597 let header_overhead = 5;
1598 let max_seg = (max_apdu as usize) - header_overhead;
1599 let transmitter = SegmentTransmitter::new(max_seg);
1600
1601 assert!(!transmitter.needs_segmentation(1471));
1603 assert!(!transmitter.needs_segmentation(1000));
1604
1605 assert!(transmitter.needs_segmentation(1472));
1607 assert!(transmitter.needs_segmentation(3000));
1608
1609 assert_eq!(transmitter.calculate_segment_count(1471), 1);
1611 assert_eq!(transmitter.calculate_segment_count(1472), 2);
1612 assert_eq!(transmitter.calculate_segment_count(2942), 2);
1613 assert_eq!(transmitter.calculate_segment_count(2943), 3);
1614 }
1615
1616 #[test]
1617 fn test_segment_transmitter_round_trip_with_headers() {
1618 let max_apdu: u16 = 100;
1620 let header_overhead = 5;
1621 let max_seg = (max_apdu as usize) - header_overhead; let transmitter = SegmentTransmitter::new(max_seg);
1623
1624 let invoke_id = 7;
1625 let service_choice = 14; let original_data: Vec<u8> = (0..300).map(|i| (i % 256) as u8).collect();
1627
1628 let segments = transmitter.segment(&original_data, invoke_id);
1629 assert!(segments.len() > 1);
1630
1631 let mut packets: Vec<Vec<u8>> = Vec::new();
1633 let total = segments.len();
1634 for (i, seg) in segments.iter().enumerate() {
1635 let more = i < total - 1;
1636 let mut enc = ApduEncoder::new();
1637 enc.encode_segmented_complex_ack_header(invoke_id, i as u8, 1, more, service_choice);
1638 enc.put_bytes(&seg.data);
1639 packets.push(enc.into_bytes());
1640 }
1641
1642 for (i, packet) in packets.iter().enumerate() {
1644 let more = i < total - 1;
1645 if more {
1647 assert_eq!(packet[0] & 0x0C, 0x0C); } else {
1649 assert_eq!(packet[0] & 0x0C, 0x08); }
1651 assert_eq!(packet[1], invoke_id);
1652 assert_eq!(packet[2], i as u8); assert_eq!(packet[3], 1); assert_eq!(packet[4], service_choice);
1655 }
1656
1657 let mut reassembled = Vec::new();
1659 for packet in &packets {
1660 reassembled.extend_from_slice(&packet[5..]); }
1662 assert_eq!(reassembled, original_data);
1663 }
1664
1665 #[test]
1668 fn test_segment_assembler_reassembly_flow() {
1669 let mut assembler = SegmentAssembler::default();
1670 let source_hash: u64 = 12345;
1671 let invoke_id = 1;
1672
1673 let seg1 = Segment::new(0, true, invoke_id, vec![1, 2, 3]).with_service_choice(12);
1675 let result = assembler.process_segment(source_hash, &seg1).unwrap();
1676 assert!(matches!(result, AssemblyResult::NeedAck(0)));
1677
1678 let seg2 = Segment::new(1, true, invoke_id, vec![4, 5, 6]);
1679 let result = assembler.process_segment(source_hash, &seg2).unwrap();
1680 assert!(matches!(result, AssemblyResult::NeedAck(1)));
1681
1682 let seg3 = Segment::new(2, false, invoke_id, vec![7, 8, 9]);
1683 let result = assembler.process_segment(source_hash, &seg3).unwrap();
1684 assert!(matches!(result, AssemblyResult::Complete));
1685
1686 let (data, service) = assembler.get_complete(source_hash, invoke_id).unwrap();
1688 assert_eq!(data, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
1689 assert_eq!(service, Some(12));
1690 }
1691
1692 #[test]
1695 fn test_segmentation_metrics() {
1696 let metrics = ServerMetrics::new();
1697
1698 metrics.record_segmented_request_reassembled();
1699 metrics.record_segmented_response_transmitted();
1700 metrics.record_segments_sent(5);
1701 metrics.record_segment_received();
1702 metrics.record_segment_received();
1703 metrics.record_segment_ack_sent();
1704 metrics.record_segment_ack_received();
1705
1706 let snapshot = metrics.snapshot();
1707 assert_eq!(snapshot.segmented_requests_reassembled, 1);
1708 assert_eq!(snapshot.segmented_responses_transmitted, 1);
1709 assert_eq!(snapshot.segments_sent, 5);
1710 assert_eq!(snapshot.segments_received, 2);
1711 assert_eq!(snapshot.segment_acks_sent, 1);
1712 assert_eq!(snapshot.segment_acks_received, 1);
1713 }
1714
1715 #[test]
1718 fn test_segment_ack_with_nak() {
1719 let mut encoder = ApduEncoder::new();
1720 encoder.encode_segment_ack_pdu(
1721 10, 5, 2, true, true, );
1724 let bytes = encoder.into_bytes();
1725
1726 assert_eq!(bytes.len(), 4);
1727 assert_eq!(bytes[0], 0x43);
1729 assert_eq!(bytes[1], 10);
1730 assert_eq!(bytes[2], 5);
1731 assert_eq!(bytes[3], 2);
1732 }
1733
1734 #[test]
1735 fn test_segment_ack_client_no_nak() {
1736 let mut encoder = ApduEncoder::new();
1737 encoder.encode_segment_ack_pdu(
1738 255, 0, 1, false, false, );
1741 let bytes = encoder.into_bytes();
1742
1743 assert_eq!(bytes[0], 0x40); assert_eq!(bytes[1], 255); assert_eq!(bytes[2], 0); assert_eq!(bytes[3], 1); }
1748
1749 #[test]
1752 fn test_server_with_bbmd_config() {
1753 let config = ServerConfig::new(1234);
1754 let registry = ObjectRegistry::new();
1755 let server = BACnetServer::new(config, registry).with_bbmd_config(BbmdConfig::enabled());
1756
1757 assert!(server.bbmd().is_enabled());
1758 }
1759
1760 #[test]
1761 fn test_server_bbmd_disabled_by_default() {
1762 let config = ServerConfig::new(1234);
1763 let registry = ObjectRegistry::new();
1764 let server = BACnetServer::new(config, registry);
1765
1766 assert!(!server.bbmd().is_enabled());
1767 }
1768
1769 #[test]
1770 fn test_server_bbmd_foreign_device_registration() {
1771 let config = ServerConfig::new(1234);
1772 let registry = ObjectRegistry::new();
1773 let server = BACnetServer::new(config, registry).with_bbmd_config(BbmdConfig::enabled());
1774
1775 let source = SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 200), 47808);
1776 let ttl_data = [0x00, 0x3C]; let msg = BvlcMessage {
1779 header: crate::network::bvlc::BvlcHeader::new(BvlcFunction::RegisterForeignDevice, 6),
1780 npdu: ttl_data.to_vec(),
1781 original_source: None,
1782 result_code: None,
1783 };
1784
1785 let response = server.bbmd().handle_message(&msg, source);
1786 assert!(response.is_some());
1787
1788 assert!(server.bbmd().fdt().is_registered(&source));
1790 }
1791
1792 #[test]
1793 fn test_server_bbmd_forward_addresses() {
1794 let config = ServerConfig::new(1234);
1795 let registry = ObjectRegistry::new();
1796 let server = BACnetServer::new(config, registry).with_bbmd_config(BbmdConfig::enabled());
1797
1798 let peer = SocketAddrV4::new(Ipv4Addr::new(10, 0, 0, 1), 47808);
1800 server
1801 .bbmd()
1802 .bdt()
1803 .add(crate::network::bbmd::BdtEntry::new(
1804 peer,
1805 Ipv4Addr::new(255, 0, 0, 0),
1806 ))
1807 .unwrap();
1808
1809 let foreign = SocketAddrV4::new(Ipv4Addr::new(172, 16, 0, 50), 47808);
1811 server.bbmd().fdt().register(foreign, 120).unwrap();
1812
1813 let addrs = server.bbmd().get_forward_addresses(None);
1814 assert_eq!(addrs.len(), 2);
1815 assert!(addrs.contains(&peer));
1816 assert!(addrs.contains(&foreign));
1817 }
1818
1819 #[test]
1820 fn test_server_bbmd_forward_excludes_source() {
1821 let config = ServerConfig::new(1234);
1822 let registry = ObjectRegistry::new();
1823 let server = BACnetServer::new(config, registry).with_bbmd_config(BbmdConfig::enabled());
1824
1825 let peer1 = SocketAddrV4::new(Ipv4Addr::new(10, 0, 0, 1), 47808);
1826 let peer2 = SocketAddrV4::new(Ipv4Addr::new(10, 0, 0, 2), 47808);
1827 server
1828 .bbmd()
1829 .bdt()
1830 .add(crate::network::bbmd::BdtEntry::new(
1831 peer1,
1832 Ipv4Addr::new(255, 0, 0, 0),
1833 ))
1834 .unwrap();
1835 server
1836 .bbmd()
1837 .bdt()
1838 .add(crate::network::bbmd::BdtEntry::new(
1839 peer2,
1840 Ipv4Addr::new(255, 0, 0, 0),
1841 ))
1842 .unwrap();
1843
1844 let addrs = server.bbmd().get_forward_addresses(Some(&peer1));
1846 assert_eq!(addrs.len(), 1);
1847 assert!(addrs.contains(&peer2));
1848 }
1849
1850 #[test]
1851 fn test_server_bbmd_bdt_write_and_read() {
1852 let config = ServerConfig::new(1234);
1853 let registry = ObjectRegistry::new();
1854 let server = BACnetServer::new(config, registry).with_bbmd_config(BbmdConfig::enabled());
1855
1856 let mut bdt_data = Vec::new();
1858 bdt_data.extend_from_slice(&[192, 168, 1, 100]);
1860 bdt_data.extend_from_slice(&47808u16.to_be_bytes());
1861 bdt_data.extend_from_slice(&[255, 255, 255, 0]);
1862 bdt_data.extend_from_slice(&[10, 0, 0, 1]);
1864 bdt_data.extend_from_slice(&47808u16.to_be_bytes());
1865 bdt_data.extend_from_slice(&[255, 0, 0, 0]);
1866
1867 let write_msg = BvlcMessage {
1868 header: crate::network::bvlc::BvlcHeader::new(
1869 BvlcFunction::WriteBroadcastDistributionTable,
1870 (4 + bdt_data.len()) as u16,
1871 ),
1872 npdu: bdt_data,
1873 original_source: None,
1874 result_code: None,
1875 };
1876
1877 let source = SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 1), 47808);
1878 let response = server.bbmd().handle_message(&write_msg, source);
1879 assert!(response.is_some());
1880 assert_eq!(server.bbmd().bdt().len(), 2);
1881
1882 let read_msg = BvlcMessage {
1884 header: crate::network::bvlc::BvlcHeader::new(
1885 BvlcFunction::ReadBroadcastDistributionTable,
1886 4,
1887 ),
1888 npdu: vec![],
1889 original_source: None,
1890 result_code: None,
1891 };
1892
1893 let response = server.bbmd().handle_message(&read_msg, source);
1894 assert!(response.is_some());
1895 let response = response.unwrap();
1896 assert_eq!(
1897 response.header.function,
1898 BvlcFunction::ReadBroadcastDistributionTableAck
1899 );
1900 assert_eq!(response.npdu.len(), 20);
1902 }
1903
1904 #[test]
1905 fn test_server_bbmd_fdt_cleanup() {
1906 let config = ServerConfig::new(1234);
1907 let registry = ObjectRegistry::new();
1908 let server = BACnetServer::new(config, registry).with_bbmd_config(BbmdConfig::enabled());
1909
1910 let foreign = SocketAddrV4::new(Ipv4Addr::new(172, 16, 0, 50), 47808);
1912 server.bbmd().fdt().register(foreign, 0).unwrap();
1913 assert_eq!(server.bbmd().fdt().len(), 1);
1914
1915 std::thread::sleep(std::time::Duration::from_millis(10));
1917
1918 let cleaned = server.bbmd().cleanup();
1920 assert_eq!(cleaned, 1);
1921 assert_eq!(server.bbmd().fdt().len(), 0);
1922 }
1923
1924 #[test]
1925 fn test_bbmd_metrics() {
1926 let metrics = ServerMetrics::new();
1927
1928 metrics.record_bbmd_forwarded();
1929 metrics.record_bbmd_forwarded();
1930 metrics.record_bbmd_forwarded();
1931 metrics.record_bbmd_foreign_registration();
1932 metrics.record_bbmd_foreign_registration();
1933
1934 let snapshot = metrics.snapshot();
1935 assert_eq!(snapshot.bbmd_forwarded, 3);
1936 assert_eq!(snapshot.bbmd_foreign_registrations, 2);
1937 }
1938
1939 #[test]
1940 fn test_server_bbmd_disabled_no_response() {
1941 let config = ServerConfig::new(1234);
1942 let registry = ObjectRegistry::new();
1943 let server = BACnetServer::new(config, registry);
1944 let source = SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 200), 47808);
1947 let msg = BvlcMessage {
1948 header: crate::network::bvlc::BvlcHeader::new(BvlcFunction::RegisterForeignDevice, 6),
1949 npdu: vec![0x00, 0x3C],
1950 original_source: None,
1951 result_code: None,
1952 };
1953
1954 let response = server.bbmd().handle_message(&msg, source);
1956 assert!(response.is_none());
1957 }
1958
1959 #[test]
1960 fn test_server_bbmd_reject_foreign_devices() {
1961 let config = ServerConfig::new(1234);
1962 let registry = ObjectRegistry::new();
1963 let server = BACnetServer::new(config, registry)
1964 .with_bbmd_config(BbmdConfig::enabled().with_accept_foreign_devices(false));
1965
1966 let source = SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 200), 47808);
1967 let msg = BvlcMessage {
1968 header: crate::network::bvlc::BvlcHeader::new(BvlcFunction::RegisterForeignDevice, 6),
1969 npdu: vec![0x00, 0x3C],
1970 original_source: None,
1971 result_code: None,
1972 };
1973
1974 let response = server.bbmd().handle_message(&msg, source);
1975 assert!(response.is_some());
1976 let response = response.unwrap();
1977 assert_eq!(
1979 response.result_code,
1980 Some(crate::network::bvlc::BvlcResultCode::RegisterForeignDeviceNak)
1981 );
1982 }
1983
1984 #[test]
1985 fn test_server_bbmd_delete_fdt_entry() {
1986 let config = ServerConfig::new(1234);
1987 let registry = ObjectRegistry::new();
1988 let server = BACnetServer::new(config, registry).with_bbmd_config(BbmdConfig::enabled());
1989
1990 let foreign = SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 200), 47808);
1992 server.bbmd().fdt().register(foreign, 60).unwrap();
1993 assert!(server.bbmd().fdt().is_registered(&foreign));
1994
1995 let mut delete_data = Vec::new();
1997 delete_data.extend_from_slice(&[192, 168, 1, 200]);
1998 delete_data.extend_from_slice(&47808u16.to_be_bytes());
1999
2000 let msg = BvlcMessage {
2001 header: crate::network::bvlc::BvlcHeader::new(
2002 BvlcFunction::DeleteForeignDeviceTableEntry,
2003 (4 + delete_data.len()) as u16,
2004 ),
2005 npdu: delete_data,
2006 original_source: None,
2007 result_code: None,
2008 };
2009
2010 let source = SocketAddrV4::new(Ipv4Addr::new(192, 168, 1, 1), 47808);
2011 let response = server.bbmd().handle_message(&msg, source);
2012 assert!(response.is_some());
2013 let response = response.unwrap();
2014 assert_eq!(
2015 response.result_code,
2016 Some(crate::network::bvlc::BvlcResultCode::Success)
2017 );
2018 assert!(!server.bbmd().fdt().is_registered(&foreign));
2019 }
2020
2021 #[test]
2022 fn test_segment_assembler_cleanup() {
2023 let config = ServerConfig::new(1234);
2024 let registry = ObjectRegistry::new();
2025 let server = BACnetServer::new(config, registry);
2026
2027 {
2029 let mut assembler = server.segment_assembler().lock();
2030 let seg = Segment::new(0, true, 1, vec![1, 2, 3]).with_service_choice(12);
2031 let _ = assembler.process_segment(12345, &seg);
2032 assert_eq!(assembler.active_count(), 1);
2033 }
2034
2035 {
2038 let mut assembler = server.segment_assembler().lock();
2039 let cleaned = assembler.cleanup();
2040 assert_eq!(cleaned, 0); assert_eq!(assembler.active_count(), 1);
2042 }
2043 }
2044}