1use std::collections::HashMap;
8use std::net::Ipv4Addr;
9use std::sync::atomic::{AtomicU8, Ordering};
10use std::sync::Arc;
11use std::time::Instant;
12
13use bytes::{Bytes, BytesMut};
14use tokio::sync::{mpsc, Mutex, RwLock, Semaphore};
15use tokio::task::JoinHandle;
16use tokio::time::Duration;
17use tracing::{debug, warn};
18
19use bacnet_encoding::apdu::{
20 self, encode_apdu, AbortPdu, Apdu, ComplexAck, ConfirmedRequest as ConfirmedRequestPdu,
21 ErrorPdu, RejectPdu, SegmentAck as SegmentAckPdu, SimpleAck,
22 UnconfirmedRequest as UnconfirmedRequestPdu,
23};
24use bacnet_encoding::primitives::encode_property_value;
25use bacnet_encoding::segmentation::{
26 max_segment_payload, split_payload, SegmentReceiver, SegmentedPduType,
27};
28use bacnet_network::layer::NetworkLayer;
29use bacnet_objects::database::ObjectDatabase;
30use bacnet_objects::notification_class::get_notification_recipients;
31use bacnet_services::alarm_event::EventNotificationRequest;
32use bacnet_services::common::BACnetPropertyValue;
33use bacnet_services::cov::COVNotificationRequest;
34use bacnet_services::who_is::{IAmRequest, WhoIsRequest};
35use bacnet_transport::bip::BipTransport;
36use bacnet_transport::port::TransportPort;
37use bacnet_types::enums::{
38 AbortReason, ConfirmedServiceChoice, ErrorClass, ErrorCode, NetworkPriority, NotifyType,
39 ObjectType, PropertyIdentifier, RejectReason, Segmentation, UnconfirmedServiceChoice,
40};
41use bacnet_types::error::Error;
42use bacnet_types::primitives::{BACnetTimeStamp, ObjectIdentifier, PropertyValue, Time};
43use bacnet_types::MacAddr;
44
45use crate::cov::CovSubscriptionTable;
46use crate::handlers;
47
48const MAX_SEG_RECEIVERS: usize = 128;
50
51const SEG_RECEIVER_TIMEOUT: Duration = Duration::from_secs(4);
53
54const MAX_NEG_SEGMENT_ACK_RETRIES: u8 = 3;
56
57const DEFAULT_APDU_RETRIES: u8 = 3;
59
60#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub enum CovAckResult {
67 Ack,
69 Error,
71}
72
73pub struct ServerTsm {
80 next_invoke_id: u8,
81 pending: HashMap<u8, CovAckResult>,
83}
84
85impl ServerTsm {
86 fn new() -> Self {
87 Self {
88 next_invoke_id: 0,
89 pending: HashMap::new(),
90 }
91 }
92
93 fn allocate(&mut self) -> u8 {
96 let id = self.next_invoke_id;
97 self.next_invoke_id = self.next_invoke_id.wrapping_add(1);
98 id
99 }
100
101 fn record_result(&mut self, invoke_id: u8, result: CovAckResult) {
103 self.pending.insert(invoke_id, result);
104 }
105
106 fn take_result(&mut self, invoke_id: u8) -> Option<CovAckResult> {
108 self.pending.remove(&invoke_id)
109 }
110
111 fn remove(&mut self, invoke_id: u8) {
113 self.pending.remove(&invoke_id);
114 }
115}
116
117#[derive(Debug, Clone)]
119pub struct TimeSyncData {
120 pub raw_service_data: Bytes,
122 pub is_utc: bool,
124}
125
126#[derive(Clone)]
128pub struct ServerConfig {
129 pub interface: Ipv4Addr,
131 pub port: u16,
133 pub broadcast_address: Ipv4Addr,
135 pub max_apdu_length: u32,
137 pub segmentation_supported: Segmentation,
139 pub vendor_id: u16,
141 pub cov_retry_timeout_ms: u64,
143 pub on_time_sync: Option<Arc<dyn Fn(TimeSyncData) + Send + Sync>>,
145 pub dcc_password: Option<String>,
147 pub reinit_password: Option<String>,
149 pub enable_fault_detection: bool,
153}
154
155impl std::fmt::Debug for ServerConfig {
156 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157 f.debug_struct("ServerConfig")
158 .field("interface", &self.interface)
159 .field("port", &self.port)
160 .field("broadcast_address", &self.broadcast_address)
161 .field("max_apdu_length", &self.max_apdu_length)
162 .field("segmentation_supported", &self.segmentation_supported)
163 .field("vendor_id", &self.vendor_id)
164 .field("cov_retry_timeout_ms", &self.cov_retry_timeout_ms)
165 .field(
166 "on_time_sync",
167 &self.on_time_sync.as_ref().map(|_| "<callback>"),
168 )
169 .field("dcc_password", &self.dcc_password.as_ref().map(|_| "***"))
170 .field(
171 "reinit_password",
172 &self.reinit_password.as_ref().map(|_| "***"),
173 )
174 .field("enable_fault_detection", &self.enable_fault_detection)
175 .finish()
176 }
177}
178
179impl Default for ServerConfig {
180 fn default() -> Self {
181 Self {
182 interface: Ipv4Addr::UNSPECIFIED,
183 port: 0xBAC0,
184 broadcast_address: Ipv4Addr::BROADCAST,
185 max_apdu_length: 1476,
186 segmentation_supported: Segmentation::NONE,
187 vendor_id: 0,
188 cov_retry_timeout_ms: 3000,
189 on_time_sync: None,
190 dcc_password: None,
191 reinit_password: None,
192 enable_fault_detection: false,
193 }
194 }
195}
196
197pub struct ServerBuilder<T: TransportPort> {
199 config: ServerConfig,
200 db: ObjectDatabase,
201 transport: Option<T>,
202}
203
204impl<T: TransportPort + 'static> ServerBuilder<T> {
205 pub fn database(mut self, db: ObjectDatabase) -> Self {
207 self.db = db;
208 self
209 }
210
211 pub fn transport(mut self, transport: T) -> Self {
213 self.transport = Some(transport);
214 self
215 }
216
217 pub fn dcc_password(mut self, password: impl Into<String>) -> Self {
219 self.config.dcc_password = Some(password.into());
220 self
221 }
222
223 pub fn reinit_password(mut self, password: impl Into<String>) -> Self {
225 self.config.reinit_password = Some(password.into());
226 self
227 }
228
229 pub fn enable_fault_detection(mut self, enabled: bool) -> Self {
231 self.config.enable_fault_detection = enabled;
232 self
233 }
234
235 pub async fn build(self) -> Result<BACnetServer<T>, Error> {
237 let transport = self
238 .transport
239 .ok_or_else(|| Error::Encoding("transport not set on ServerBuilder".into()))?;
240 BACnetServer::start(self.config, self.db, transport).await
241 }
242}
243
244pub struct BipServerBuilder {
246 config: ServerConfig,
247 db: ObjectDatabase,
248}
249
250impl BipServerBuilder {
251 pub fn interface(mut self, ip: Ipv4Addr) -> Self {
253 self.config.interface = ip;
254 self
255 }
256
257 pub fn port(mut self, port: u16) -> Self {
259 self.config.port = port;
260 self
261 }
262
263 pub fn broadcast_address(mut self, addr: Ipv4Addr) -> Self {
265 self.config.broadcast_address = addr;
266 self
267 }
268
269 pub fn database(mut self, db: ObjectDatabase) -> Self {
271 self.db = db;
272 self
273 }
274
275 pub fn dcc_password(mut self, password: impl Into<String>) -> Self {
277 self.config.dcc_password = Some(password.into());
278 self
279 }
280
281 pub fn reinit_password(mut self, password: impl Into<String>) -> Self {
283 self.config.reinit_password = Some(password.into());
284 self
285 }
286
287 pub fn enable_fault_detection(mut self, enabled: bool) -> Self {
289 self.config.enable_fault_detection = enabled;
290 self
291 }
292
293 pub async fn build(self) -> Result<BACnetServer<BipTransport>, Error> {
295 let transport = BipTransport::new(
296 self.config.interface,
297 self.config.port,
298 self.config.broadcast_address,
299 );
300 BACnetServer::start(self.config, self.db, transport).await
301 }
302}
303
304type SegKey = (MacAddr, u8);
306
307pub struct BACnetServer<T: TransportPort> {
309 #[allow(dead_code)]
310 config: ServerConfig,
311 #[allow(dead_code)]
313 network: Arc<NetworkLayer<T>>,
314 db: Arc<RwLock<ObjectDatabase>>,
316 #[allow(dead_code)]
318 cov_table: Arc<RwLock<CovSubscriptionTable>>,
319 #[allow(dead_code)]
321 seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
322 #[allow(dead_code)]
325 cov_in_flight: Arc<Semaphore>,
326 #[allow(dead_code)]
328 server_tsm: Arc<Mutex<ServerTsm>>,
329 comm_state: Arc<AtomicU8>,
332 #[allow(dead_code)]
337 dcc_timer: Arc<Mutex<Option<JoinHandle<()>>>>,
338 dispatch_task: Option<JoinHandle<()>>,
339 cov_purge_task: Option<JoinHandle<()>>,
340 fault_detection_task: Option<JoinHandle<()>>,
341 event_enrollment_task: Option<JoinHandle<()>>,
342 trend_log_task: Option<JoinHandle<()>>,
343 schedule_tick_task: Option<JoinHandle<()>>,
344 local_mac: MacAddr,
345}
346
347impl BACnetServer<BipTransport> {
348 pub fn bip_builder() -> BipServerBuilder {
350 BipServerBuilder {
351 config: ServerConfig::default(),
352 db: ObjectDatabase::new(),
353 }
354 }
355
356 pub fn builder() -> BipServerBuilder {
358 Self::bip_builder()
359 }
360}
361
362#[cfg(feature = "sc-tls")]
363impl BACnetServer<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>> {
364 pub fn sc_builder() -> ScServerBuilder {
366 ScServerBuilder {
367 config: ServerConfig::default(),
368 db: ObjectDatabase::new(),
369 hub_url: String::new(),
370 tls_config: None,
371 vmac: [0; 6],
372 heartbeat_interval_ms: 30_000,
373 heartbeat_timeout_ms: 60_000,
374 reconnect: None,
375 }
376 }
377}
378
379#[cfg(feature = "sc-tls")]
383pub struct ScServerBuilder {
384 config: ServerConfig,
385 db: ObjectDatabase,
386 hub_url: String,
387 tls_config: Option<std::sync::Arc<tokio_rustls::rustls::ClientConfig>>,
388 vmac: bacnet_transport::sc_frame::Vmac,
389 heartbeat_interval_ms: u64,
390 heartbeat_timeout_ms: u64,
391 reconnect: Option<bacnet_transport::sc::ScReconnectConfig>,
392}
393
394#[cfg(feature = "sc-tls")]
395impl ScServerBuilder {
396 pub fn hub_url(mut self, url: &str) -> Self {
398 self.hub_url = url.to_string();
399 self
400 }
401
402 pub fn tls_config(
404 mut self,
405 config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
406 ) -> Self {
407 self.tls_config = Some(config);
408 self
409 }
410
411 pub fn vmac(mut self, vmac: [u8; 6]) -> Self {
413 self.vmac = vmac;
414 self
415 }
416
417 pub fn database(mut self, db: ObjectDatabase) -> Self {
419 self.db = db;
420 self
421 }
422
423 pub fn heartbeat_interval_ms(mut self, ms: u64) -> Self {
425 self.heartbeat_interval_ms = ms;
426 self
427 }
428
429 pub fn heartbeat_timeout_ms(mut self, ms: u64) -> Self {
431 self.heartbeat_timeout_ms = ms;
432 self
433 }
434
435 pub fn reconnect(mut self, config: bacnet_transport::sc::ScReconnectConfig) -> Self {
437 self.reconnect = Some(config);
438 self
439 }
440
441 pub fn dcc_password(mut self, password: impl Into<String>) -> Self {
443 self.config.dcc_password = Some(password.into());
444 self
445 }
446
447 pub fn reinit_password(mut self, password: impl Into<String>) -> Self {
449 self.config.reinit_password = Some(password.into());
450 self
451 }
452
453 pub fn enable_fault_detection(mut self, enabled: bool) -> Self {
455 self.config.enable_fault_detection = enabled;
456 self
457 }
458
459 pub async fn build(
461 self,
462 ) -> Result<
463 BACnetServer<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>>,
464 Error,
465 > {
466 let tls_config = self
467 .tls_config
468 .ok_or_else(|| Error::Encoding("SC server builder: tls_config is required".into()))?;
469
470 let ws = bacnet_transport::sc_tls::TlsWebSocket::connect(&self.hub_url, tls_config).await?;
471
472 let mut transport = bacnet_transport::sc::ScTransport::new(ws, self.vmac)
473 .with_heartbeat_interval_ms(self.heartbeat_interval_ms)
474 .with_heartbeat_timeout_ms(self.heartbeat_timeout_ms);
475 if let Some(rc) = self.reconnect {
476 #[allow(deprecated)]
477 {
478 transport = transport.with_reconnect(rc);
479 }
480 }
481
482 BACnetServer::start(self.config, self.db, transport).await
483 }
484}
485
486impl<T: TransportPort + 'static> BACnetServer<T> {
487 pub fn generic_builder() -> ServerBuilder<T> {
489 ServerBuilder {
490 config: ServerConfig::default(),
491 db: ObjectDatabase::new(),
492 transport: None,
493 }
494 }
495
496 pub async fn start(
498 mut config: ServerConfig,
499 db: ObjectDatabase,
500 transport: T,
501 ) -> Result<Self, Error> {
502 let transport_max = transport.max_apdu_length() as u32;
504 config.max_apdu_length = config.max_apdu_length.min(transport_max);
505
506 if config.vendor_id == 0 {
507 warn!("vendor_id is 0 (ASHRAE reserved); set a valid vendor ID for production use");
508 }
509
510 let mut network = NetworkLayer::new(transport);
511 let apdu_rx = network.start().await?;
512 let local_mac = MacAddr::from_slice(network.local_mac());
513
514 let network = Arc::new(network);
515 let db = Arc::new(RwLock::new(db));
516 let cov_table = Arc::new(RwLock::new(CovSubscriptionTable::new()));
517 let seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>> =
518 Arc::new(Mutex::new(HashMap::new()));
519
520 let cov_in_flight = Arc::new(Semaphore::new(255));
521 let server_tsm = Arc::new(Mutex::new(ServerTsm::new()));
522 let comm_state = Arc::new(AtomicU8::new(0)); let dcc_timer: Arc<Mutex<Option<JoinHandle<()>>>> = Arc::new(Mutex::new(None));
524
525 let network_dispatch = Arc::clone(&network);
526 let db_dispatch = Arc::clone(&db);
527 let cov_dispatch = Arc::clone(&cov_table);
528 let seg_ack_dispatch = Arc::clone(&seg_ack_senders);
529 let cov_in_flight_dispatch = Arc::clone(&cov_in_flight);
530 let server_tsm_dispatch = Arc::clone(&server_tsm);
531 let comm_state_dispatch = Arc::clone(&comm_state);
532 let dcc_timer_dispatch = Arc::clone(&dcc_timer);
533 let config_dispatch = config.clone();
534
535 let dispatch_task = tokio::spawn(async move {
536 let mut apdu_rx = apdu_rx;
537 let mut seg_receivers: HashMap<
541 SegKey,
542 (
543 SegmentReceiver,
544 bacnet_encoding::apdu::ConfirmedRequest,
545 Instant,
546 ),
547 > = HashMap::new();
548
549 while let Some(received) = apdu_rx.recv().await {
550 let now = Instant::now();
552 seg_receivers.retain(|_key, (_rx, _req, last_activity)| {
553 now.duration_since(*last_activity) < SEG_RECEIVER_TIMEOUT
554 });
555
556 match apdu::decode_apdu(received.apdu.clone()) {
557 Ok(decoded) => {
558 let source_mac = received.source_mac.clone();
559 let mut received = Some(received);
560 let handled = if let Apdu::ConfirmedRequest(ref req) = decoded {
562 if req.segmented {
563 let seq = req.sequence_number.unwrap_or(0);
564 let key: SegKey = (source_mac.clone(), req.invoke_id);
565
566 if seq == 0 {
567 if seg_receivers.len() >= MAX_SEG_RECEIVERS {
569 warn!("Too many concurrent segmented sessions ({}), rejecting", seg_receivers.len());
570 let abort_pdu = Apdu::Abort(AbortPdu {
571 sent_by_server: true,
572 invoke_id: req.invoke_id,
573 abort_reason: AbortReason::BUFFER_OVERFLOW,
574 });
575 let mut abort_buf = BytesMut::new();
576 encode_apdu(&mut abort_buf, &abort_pdu);
577 let _ = network_dispatch
578 .send_apdu(
579 &abort_buf,
580 &source_mac,
581 false,
582 NetworkPriority::NORMAL,
583 )
584 .await;
585 continue;
586 }
587 let mut receiver = SegmentReceiver::new();
590 if let Err(e) =
591 receiver.receive(seq, req.service_request.clone())
592 {
593 warn!(error = %e, "Rejecting oversized segment");
594 continue;
595 }
596 seg_receivers.insert(
597 key.clone(),
598 (receiver, req.clone(), Instant::now()),
599 );
600 } else if let Some((receiver, _, last_activity)) =
601 seg_receivers.get_mut(&key)
602 {
603 if let Err(e) =
604 receiver.receive(seq, req.service_request.clone())
605 {
606 warn!(error = %e, "Rejecting oversized segment");
607 continue;
608 }
609 *last_activity = Instant::now();
610 } else {
611 warn!(
614 invoke_id = req.invoke_id,
615 seq = seq,
616 "Received non-initial segment without \
617 prior segment 0, aborting"
618 );
619 let abort_pdu = Apdu::Abort(AbortPdu {
620 sent_by_server: true,
621 invoke_id: req.invoke_id,
622 abort_reason: AbortReason::INVALID_APDU_IN_THIS_STATE,
623 });
624 let mut abort_buf = BytesMut::new();
625 encode_apdu(&mut abort_buf, &abort_pdu);
626 let _ = network_dispatch
627 .send_apdu(
628 &abort_buf,
629 &source_mac,
630 false,
631 NetworkPriority::NORMAL,
632 )
633 .await;
634 continue;
635 }
636
637 let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
639 negative_ack: false,
640 sent_by_server: true,
641 invoke_id: req.invoke_id,
642 sequence_number: seq,
643 actual_window_size: 1,
644 });
645 let mut ack_buf = BytesMut::new();
646 encode_apdu(&mut ack_buf, &seg_ack);
647 if let Err(e) = network_dispatch
648 .send_apdu(
649 &ack_buf,
650 &source_mac,
651 false,
652 NetworkPriority::NORMAL,
653 )
654 .await
655 {
656 warn!(
657 error = %e,
658 "Failed to send SegmentAck for \
659 segmented request"
660 );
661 }
662
663 if !req.more_follows {
665 if let Some((receiver, first_req, _)) =
666 seg_receivers.remove(&key)
667 {
668 let total = receiver.received_count();
669 match receiver.reassemble(total) {
670 Ok(full_data) => {
671 let reassembled =
672 bacnet_encoding::apdu::ConfirmedRequest {
673 segmented: false,
674 more_follows: false,
675 sequence_number: None,
676 proposed_window_size: None,
677 service_request: Bytes::from(full_data),
678 invoke_id: first_req.invoke_id,
679 service_choice: first_req.service_choice,
680 max_apdu_length: first_req.max_apdu_length,
681 segmented_response_accepted: first_req
682 .segmented_response_accepted,
683 max_segments: first_req.max_segments,
684 };
685 debug!(
686 invoke_id = reassembled.invoke_id,
687 segments = total,
688 payload_len = reassembled.service_request.len(),
689 "Reassembled segmented ConfirmedRequest"
690 );
691 Self::dispatch(
692 &db_dispatch,
693 &network_dispatch,
694 &cov_dispatch,
695 &seg_ack_dispatch,
696 &cov_in_flight_dispatch,
697 &server_tsm_dispatch,
698 &comm_state_dispatch,
699 &dcc_timer_dispatch,
700 &config_dispatch,
701 &source_mac,
702 Apdu::ConfirmedRequest(reassembled),
703 received
704 .take()
705 .expect("received consumed twice"),
706 )
707 .await;
708 }
709 Err(e) => {
710 warn!(
711 error = %e,
712 "Failed to reassemble \
713 segmented request"
714 );
715 }
716 }
717 }
718 }
719
720 true } else {
722 false }
724 } else {
725 false };
727
728 if !handled {
729 Self::dispatch(
730 &db_dispatch,
731 &network_dispatch,
732 &cov_dispatch,
733 &seg_ack_dispatch,
734 &cov_in_flight_dispatch,
735 &server_tsm_dispatch,
736 &comm_state_dispatch,
737 &dcc_timer_dispatch,
738 &config_dispatch,
739 &source_mac,
740 decoded,
741 received.take().expect("received consumed twice"),
742 )
743 .await;
744 }
745 }
746 Err(e) => {
747 warn!(error = %e, "Server failed to decode received APDU");
748 }
749 }
750 }
751 });
752
753 let cov_table_for_purge = Arc::clone(&cov_table);
754 let cov_purge_task = tokio::spawn(async move {
755 let mut interval = tokio::time::interval(Duration::from_secs(30));
756 loop {
757 interval.tick().await;
758 let mut table = cov_table_for_purge.write().await;
759 let purged = table.purge_expired();
760 if purged > 0 {
761 debug!(purged, "Purged expired COV subscriptions");
762 }
763 }
764 });
765
766 let fault_detection_task = if config.enable_fault_detection {
767 let db_fault = Arc::clone(&db);
768 Some(tokio::spawn(async move {
769 let detector = crate::fault_detection::FaultDetector::default();
770 let mut interval = tokio::time::interval(Duration::from_secs(10));
771 loop {
772 interval.tick().await;
773 let mut db_guard = db_fault.write().await;
774 let changes = detector.evaluate(&mut db_guard);
775 for change in &changes {
776 debug!(
777 object = %change.object_id,
778 old = change.old_reliability,
779 new = change.new_reliability,
780 "Fault detection: reliability changed"
781 );
782 }
783 }
784 }))
785 } else {
786 None
787 };
788
789 let event_enrollment_task = if config.enable_fault_detection {
792 let db_ee = Arc::clone(&db);
793 Some(tokio::spawn(async move {
794 let mut interval = tokio::time::interval(Duration::from_secs(10));
795 loop {
796 interval.tick().await;
797 let mut db_guard = db_ee.write().await;
798 let transitions =
799 crate::event_enrollment::evaluate_event_enrollments(&mut db_guard);
800 for t in &transitions {
801 debug!(
802 enrollment = %t.enrollment_oid,
803 monitored = %t.monitored_oid,
804 from = ?t.change.from,
805 to = ?t.change.to,
806 "Event enrollment: state changed"
807 );
808 }
809 }
810 }))
811 } else {
812 None
813 };
814
815 let db_trend = Arc::clone(&db);
817 let trend_log_task = Some(tokio::spawn(async move {
818 let mut interval = tokio::time::interval(Duration::from_secs(1));
819 loop {
820 interval.tick().await;
821 crate::trend_log::poll_trend_logs(&db_trend).await;
822 }
823 }));
824
825 let db_schedule = Arc::clone(&db);
828 let schedule_tick_task = Some(tokio::spawn(async move {
829 let mut interval = tokio::time::interval(Duration::from_secs(60));
830 loop {
831 interval.tick().await;
832 crate::schedule::tick_schedules(&db_schedule).await;
833 }
834 }));
835
836 Ok(Self {
837 config,
838 network,
839 db,
840 cov_table,
841 seg_ack_senders,
842 cov_in_flight,
843 server_tsm,
844 comm_state,
845 dcc_timer,
846 dispatch_task: Some(dispatch_task),
847 cov_purge_task: Some(cov_purge_task),
848 fault_detection_task,
849 event_enrollment_task,
850 trend_log_task,
851 schedule_tick_task,
852 local_mac,
853 })
854 }
855
856 pub fn local_mac(&self) -> &[u8] {
858 &self.local_mac
859 }
860
861 pub fn database(&self) -> &Arc<RwLock<ObjectDatabase>> {
863 &self.db
864 }
865
866 pub fn comm_state(&self) -> u8 {
870 self.comm_state.load(Ordering::Acquire)
871 }
872
873 pub async fn generate_pics(&self, pics_config: &crate::pics::PicsConfig) -> crate::pics::Pics {
878 let db = self.db.read().await;
879 crate::pics::PicsGenerator::new(&db, &self.config, pics_config).generate()
880 }
881
882 pub async fn stop(&mut self) -> Result<(), Error> {
884 if let Some(task) = self.fault_detection_task.take() {
885 task.abort();
886 let _ = task.await;
887 }
888 if let Some(task) = self.event_enrollment_task.take() {
889 task.abort();
890 let _ = task.await;
891 }
892 if let Some(task) = self.trend_log_task.take() {
893 task.abort();
894 let _ = task.await;
895 }
896 if let Some(task) = self.schedule_tick_task.take() {
897 task.abort();
898 let _ = task.await;
899 }
900 if let Some(task) = self.cov_purge_task.take() {
901 task.abort();
902 let _ = task.await;
903 }
904 if let Some(task) = self.dispatch_task.take() {
905 task.abort();
906 let _ = task.await;
907 }
908 Ok(())
910 }
911
912 #[allow(clippy::too_many_arguments)]
914 async fn dispatch(
915 db: &Arc<RwLock<ObjectDatabase>>,
916 network: &Arc<NetworkLayer<T>>,
917 cov_table: &Arc<RwLock<CovSubscriptionTable>>,
918 seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
919 cov_in_flight: &Arc<Semaphore>,
920 server_tsm: &Arc<Mutex<ServerTsm>>,
921 comm_state: &Arc<AtomicU8>,
922 dcc_timer: &Arc<Mutex<Option<JoinHandle<()>>>>,
923 config: &ServerConfig,
924 source_mac: &[u8],
925 apdu: Apdu,
926 mut received: bacnet_network::layer::ReceivedApdu,
927 ) {
928 match apdu {
929 Apdu::ConfirmedRequest(req) => {
930 let reply_tx = received.reply_tx.take();
931 Self::handle_confirmed_request(
932 db,
933 network,
934 cov_table,
935 seg_ack_senders,
936 cov_in_flight,
937 server_tsm,
938 comm_state,
939 dcc_timer,
940 config,
941 source_mac,
942 req,
943 reply_tx,
944 )
945 .await;
946 }
947 Apdu::UnconfirmedRequest(req) => {
948 Self::handle_unconfirmed_request(db, network, config, comm_state, req, &received)
949 .await;
950 }
951 Apdu::SimpleAck(sa) => {
952 let mut tsm = server_tsm.lock().await;
954 tsm.record_result(sa.invoke_id, CovAckResult::Ack);
955 debug!(
956 invoke_id = sa.invoke_id,
957 "SimpleAck received for outgoing confirmed notification"
958 );
959 }
960 Apdu::Error(err) => {
961 let mut tsm = server_tsm.lock().await;
963 tsm.record_result(err.invoke_id, CovAckResult::Error);
964 debug!(
965 invoke_id = err.invoke_id,
966 error_class = err.error_class.to_raw(),
967 error_code = err.error_code.to_raw(),
968 "Error received for outgoing confirmed notification"
969 );
970 }
971 Apdu::Reject(rej) => {
972 let mut tsm = server_tsm.lock().await;
974 tsm.record_result(rej.invoke_id, CovAckResult::Error);
975 debug!(
976 invoke_id = rej.invoke_id,
977 "Reject received for outgoing confirmed notification"
978 );
979 }
980 Apdu::Abort(abort) if !abort.sent_by_server => {
981 let mut tsm = server_tsm.lock().await;
983 tsm.record_result(abort.invoke_id, CovAckResult::Error);
984 debug!(
985 invoke_id = abort.invoke_id,
986 "Abort received for outgoing confirmed notification"
987 );
988 }
989 Apdu::SegmentAck(sa) => {
990 let key = (MacAddr::from_slice(source_mac), sa.invoke_id);
991 let senders = seg_ack_senders.lock().await;
992 if let Some(tx) = senders.get(&key) {
993 let _ = tx.try_send(sa);
994 } else {
995 debug!(
996 invoke_id = sa.invoke_id,
997 "Server ignoring SegmentAck for unknown transaction"
998 );
999 }
1000 }
1001 _ => {
1002 debug!("Server ignoring unhandled APDU type");
1003 }
1004 }
1005 }
1006
1007 #[allow(clippy::too_many_arguments)]
1009 async fn handle_confirmed_request(
1010 db: &Arc<RwLock<ObjectDatabase>>,
1011 network: &Arc<NetworkLayer<T>>,
1012 cov_table: &Arc<RwLock<CovSubscriptionTable>>,
1013 seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
1014 cov_in_flight: &Arc<Semaphore>,
1015 server_tsm: &Arc<Mutex<ServerTsm>>,
1016 comm_state: &Arc<AtomicU8>,
1017 dcc_timer: &Arc<Mutex<Option<JoinHandle<()>>>>,
1018 config: &ServerConfig,
1019 source_mac: &[u8],
1020 req: bacnet_encoding::apdu::ConfirmedRequest,
1021 reply_tx: Option<tokio::sync::oneshot::Sender<Bytes>>,
1022 ) {
1023 let invoke_id = req.invoke_id;
1024 let service_choice = req.service_choice;
1025 let client_max_apdu = req.max_apdu_length;
1026 let client_accepts_segmented = req.segmented_response_accepted;
1027 let mut written_oids: Vec<ObjectIdentifier> = Vec::new();
1028
1029 let state = comm_state.load(Ordering::Acquire);
1033 if state == 1
1034 && service_choice != ConfirmedServiceChoice::DEVICE_COMMUNICATION_CONTROL
1035 && service_choice != ConfirmedServiceChoice::REINITIALIZE_DEVICE
1036 {
1037 debug!(
1038 service = service_choice.to_raw(),
1039 "DCC DISABLE: dropping confirmed request"
1040 );
1041 return;
1042 }
1043
1044 let complex_ack = |ack_buf: BytesMut| -> Apdu {
1046 Apdu::ComplexAck(ComplexAck {
1047 segmented: false,
1048 more_follows: false,
1049 invoke_id,
1050 sequence_number: None,
1051 proposed_window_size: None,
1052 service_choice,
1053 service_ack: ack_buf.freeze(),
1054 })
1055 };
1056 let simple_ack = || -> Apdu {
1057 Apdu::SimpleAck(SimpleAck {
1058 invoke_id,
1059 service_choice,
1060 })
1061 };
1062
1063 let mut ack_buf = BytesMut::with_capacity(512);
1064 let response = match service_choice {
1065 s if s == ConfirmedServiceChoice::READ_PROPERTY => {
1066 let db = db.read().await;
1067 match handlers::handle_read_property(&db, &req.service_request, &mut ack_buf) {
1068 Ok(()) => complex_ack(ack_buf),
1069 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1070 }
1071 }
1072 s if s == ConfirmedServiceChoice::WRITE_PROPERTY => {
1073 let result = {
1074 let mut db = db.write().await;
1075 handlers::handle_write_property(&mut db, &req.service_request)
1076 };
1077 match result {
1078 Ok(oid) => {
1079 written_oids.push(oid);
1080 simple_ack()
1081 }
1082 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1083 }
1084 }
1085 s if s == ConfirmedServiceChoice::READ_PROPERTY_MULTIPLE => {
1086 let db = db.read().await;
1087 match handlers::handle_read_property_multiple(
1088 &db,
1089 &req.service_request,
1090 &mut ack_buf,
1091 ) {
1092 Ok(()) => complex_ack(ack_buf),
1093 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1094 }
1095 }
1096 s if s == ConfirmedServiceChoice::WRITE_PROPERTY_MULTIPLE => {
1097 let result = {
1098 let mut db = db.write().await;
1099 handlers::handle_write_property_multiple(&mut db, &req.service_request)
1100 };
1101 match result {
1102 Ok(oids) => {
1103 written_oids = oids;
1104 simple_ack()
1105 }
1106 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1107 }
1108 }
1109 s if s == ConfirmedServiceChoice::SUBSCRIBE_COV => {
1110 let db = db.read().await;
1111 let mut table = cov_table.write().await;
1112 match handlers::handle_subscribe_cov(
1113 &mut table,
1114 &db,
1115 source_mac,
1116 &req.service_request,
1117 ) {
1118 Ok(()) => simple_ack(),
1119 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1120 }
1121 }
1122 s if s == ConfirmedServiceChoice::SUBSCRIBE_COV_PROPERTY => {
1123 let db = db.read().await;
1124 let mut table = cov_table.write().await;
1125 match handlers::handle_subscribe_cov_property(
1126 &mut table,
1127 &db,
1128 source_mac,
1129 &req.service_request,
1130 ) {
1131 Ok(()) => simple_ack(),
1132 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1133 }
1134 }
1135 s if s == ConfirmedServiceChoice::CREATE_OBJECT => {
1136 let result = {
1137 let mut db = db.write().await;
1138 handlers::handle_create_object(&mut db, &req.service_request, &mut ack_buf)
1139 };
1140 match result {
1141 Ok(()) => complex_ack(ack_buf),
1142 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1143 }
1144 }
1145 s if s == ConfirmedServiceChoice::DELETE_OBJECT => {
1146 let result = {
1147 let mut db = db.write().await;
1148 handlers::handle_delete_object(&mut db, &req.service_request)
1149 };
1150 match result {
1151 Ok(()) => simple_ack(),
1152 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1153 }
1154 }
1155 s if s == ConfirmedServiceChoice::DEVICE_COMMUNICATION_CONTROL => {
1156 match handlers::handle_device_communication_control(
1157 &req.service_request,
1158 comm_state,
1159 &config.dcc_password,
1160 ) {
1161 Ok((_state, duration)) => {
1162 if let Some(prev) = dcc_timer.lock().await.take() {
1164 prev.abort();
1165 }
1166 if let Some(minutes) = duration {
1169 let comm = Arc::clone(comm_state);
1170 let handle = tokio::spawn(async move {
1171 tokio::time::sleep(std::time::Duration::from_secs(
1172 minutes as u64 * 60,
1173 ))
1174 .await;
1175 comm.store(0, Ordering::Release);
1176 tracing::debug!(
1177 "DCC timer expired after {} min, state reverted to ENABLE",
1178 minutes
1179 );
1180 });
1181 *dcc_timer.lock().await = Some(handle);
1182 }
1183 simple_ack()
1184 }
1185 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1186 }
1187 }
1188 s if s == ConfirmedServiceChoice::REINITIALIZE_DEVICE => {
1189 match handlers::handle_reinitialize_device(
1190 &req.service_request,
1191 &config.reinit_password,
1192 ) {
1193 Ok(()) => simple_ack(),
1194 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1195 }
1196 }
1197 s if s == ConfirmedServiceChoice::GET_EVENT_INFORMATION => {
1198 let db = db.read().await;
1199 match handlers::handle_get_event_information(
1200 &db,
1201 &req.service_request,
1202 &mut ack_buf,
1203 ) {
1204 Ok(()) => complex_ack(ack_buf),
1205 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1206 }
1207 }
1208 s if s == ConfirmedServiceChoice::ACKNOWLEDGE_ALARM => {
1209 let mut db = db.write().await;
1210 match handlers::handle_acknowledge_alarm(&mut db, &req.service_request) {
1211 Ok(()) => simple_ack(),
1212 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1213 }
1214 }
1215 s if s == ConfirmedServiceChoice::READ_RANGE => {
1216 let db = db.read().await;
1217 match handlers::handle_read_range(&db, &req.service_request, &mut ack_buf) {
1218 Ok(()) => complex_ack(ack_buf),
1219 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1220 }
1221 }
1222 s if s == ConfirmedServiceChoice::ATOMIC_READ_FILE => {
1223 let db = db.read().await;
1224 match handlers::handle_atomic_read_file(&db, &req.service_request, &mut ack_buf) {
1225 Ok(()) => complex_ack(ack_buf),
1226 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1227 }
1228 }
1229 s if s == ConfirmedServiceChoice::ATOMIC_WRITE_FILE => {
1230 let result = {
1231 let mut db = db.write().await;
1232 handlers::handle_atomic_write_file(&mut db, &req.service_request, &mut ack_buf)
1233 };
1234 match result {
1235 Ok(()) => complex_ack(ack_buf),
1236 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1237 }
1238 }
1239 s if s == ConfirmedServiceChoice::ADD_LIST_ELEMENT => {
1240 let mut db = db.write().await;
1241 match handlers::handle_add_list_element(&mut db, &req.service_request) {
1242 Ok(()) => simple_ack(),
1243 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1244 }
1245 }
1246 s if s == ConfirmedServiceChoice::REMOVE_LIST_ELEMENT => {
1247 let mut db = db.write().await;
1248 match handlers::handle_remove_list_element(&mut db, &req.service_request) {
1249 Ok(()) => simple_ack(),
1250 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1251 }
1252 }
1253 _ => {
1254 debug!(
1255 service = service_choice.to_raw(),
1256 "Unsupported confirmed service"
1257 );
1258 Apdu::Reject(RejectPdu {
1259 invoke_id,
1260 reject_reason: RejectReason::UNRECOGNIZED_SERVICE,
1261 })
1262 }
1263 };
1264
1265 if let Apdu::ComplexAck(ref ack) = response {
1267 let mut full_buf = BytesMut::new();
1269 encode_apdu(&mut full_buf, &response);
1270
1271 if full_buf.len() > client_max_apdu as usize {
1272 if !client_accepts_segmented {
1274 let abort = Apdu::Abort(AbortPdu {
1276 sent_by_server: true,
1277 invoke_id,
1278 abort_reason: AbortReason::SEGMENTATION_NOT_SUPPORTED,
1279 });
1280 let mut buf = BytesMut::new();
1281 encode_apdu(&mut buf, &abort);
1282 if let Err(e) = network
1283 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
1284 .await
1285 {
1286 warn!(error = %e, "Failed to send Abort for segmentation-not-supported");
1287 }
1288 } else {
1289 let network = Arc::clone(network);
1293 let seg_ack_senders = Arc::clone(seg_ack_senders);
1294 let source_mac = MacAddr::from_slice(source_mac);
1295 let service_ack_data = ack.service_ack.clone();
1296 tokio::spawn(async move {
1297 Self::send_segmented_complex_ack(
1298 &network,
1299 &seg_ack_senders,
1300 &source_mac,
1301 invoke_id,
1302 service_choice,
1303 &service_ack_data,
1304 client_max_apdu,
1305 )
1306 .await;
1307 });
1308 }
1309
1310 for oid in &written_oids {
1312 Self::fire_event_notifications(db, network, comm_state, oid).await;
1313 }
1314 for oid in &written_oids {
1315 Self::fire_cov_notifications(
1316 db,
1317 network,
1318 cov_table,
1319 cov_in_flight,
1320 server_tsm,
1321 comm_state,
1322 config,
1323 oid,
1324 )
1325 .await;
1326 }
1327 return;
1328 }
1329 }
1330
1331 let mut buf = BytesMut::new();
1336 encode_apdu(&mut buf, &response);
1337
1338 if let Some(tx) = reply_tx {
1339 use bacnet_encoding::npdu::{encode_npdu, Npdu};
1340 let apdu_bytes = buf.freeze();
1341 let npdu = Npdu {
1342 is_network_message: false,
1343 expecting_reply: false,
1344 priority: NetworkPriority::NORMAL,
1345 destination: None,
1346 source: None,
1347 payload: apdu_bytes.clone(),
1348 ..Npdu::default()
1349 };
1350 let mut npdu_buf = BytesMut::with_capacity(2 + apdu_bytes.len());
1351 match encode_npdu(&mut npdu_buf, &npdu) {
1352 Ok(()) => {
1353 let _ = tx.send(npdu_buf.freeze());
1354 }
1355 Err(e) => {
1356 warn!(error = %e, "Failed to encode NPDU for MS/TP reply");
1357 if let Err(e) = network
1360 .send_apdu(&apdu_bytes, source_mac, false, NetworkPriority::NORMAL)
1361 .await
1362 {
1363 warn!(error = %e, "Failed to send response");
1364 }
1365 }
1366 }
1367 } else if let Err(e) = network
1368 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
1369 .await
1370 {
1371 warn!(error = %e, "Failed to send response");
1372 }
1373
1374 for oid in &written_oids {
1376 Self::fire_event_notifications(db, network, comm_state, oid).await;
1377 }
1378
1379 for oid in &written_oids {
1381 Self::fire_cov_notifications(
1382 db,
1383 network,
1384 cov_table,
1385 cov_in_flight,
1386 server_tsm,
1387 comm_state,
1388 config,
1389 oid,
1390 )
1391 .await;
1392 }
1393 }
1394
1395 async fn send_segmented_complex_ack(
1401 network: &Arc<NetworkLayer<T>>,
1402 seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
1403 source_mac: &[u8],
1404 invoke_id: u8,
1405 service_choice: ConfirmedServiceChoice,
1406 service_ack_data: &[u8],
1407 client_max_apdu: u16,
1408 ) {
1409 let max_seg_size = max_segment_payload(client_max_apdu, SegmentedPduType::ComplexAck);
1410 let segments = split_payload(service_ack_data, max_seg_size);
1411 let total_segments = segments.len();
1412
1413 if total_segments > 255 {
1414 warn!(
1415 total_segments,
1416 "Response requires too many segments, aborting"
1417 );
1418 let abort = Apdu::Abort(AbortPdu {
1419 sent_by_server: true,
1420 invoke_id,
1421 abort_reason: AbortReason::BUFFER_OVERFLOW,
1422 });
1423 let mut buf = BytesMut::new();
1424 encode_apdu(&mut buf, &abort);
1425 let _ = network
1426 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
1427 .await;
1428 return;
1429 }
1430
1431 debug!(
1432 total_segments,
1433 max_seg_size,
1434 payload_len = service_ack_data.len(),
1435 "Starting segmented ComplexAck send"
1436 );
1437
1438 let (seg_ack_tx, mut seg_ack_rx) = mpsc::channel(16);
1440 let key = (MacAddr::from_slice(source_mac), invoke_id);
1441 {
1442 seg_ack_senders.lock().await.insert(key.clone(), seg_ack_tx);
1443 }
1444
1445 let seg_timeout = Duration::from_secs(5);
1446 let mut seg_idx: usize = 0;
1447 let mut neg_ack_retries: u8 = 0;
1448
1449 while seg_idx < total_segments {
1450 let is_last = seg_idx == total_segments - 1;
1451
1452 let pdu = Apdu::ComplexAck(ComplexAck {
1453 segmented: true,
1454 more_follows: !is_last,
1455 invoke_id,
1456 sequence_number: Some(seg_idx as u8),
1457 proposed_window_size: Some(1),
1458 service_choice,
1459 service_ack: segments[seg_idx].clone(),
1460 });
1461
1462 let mut buf = BytesMut::with_capacity(client_max_apdu as usize);
1463 encode_apdu(&mut buf, &pdu);
1464
1465 if let Err(e) = network
1466 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
1467 .await
1468 {
1469 warn!(error = %e, seq = seg_idx, "Failed to send segment");
1470 break;
1471 }
1472
1473 debug!(seq = seg_idx, is_last, "Sent ComplexAck segment");
1474
1475 if !is_last {
1477 match tokio::time::timeout(seg_timeout, seg_ack_rx.recv()).await {
1478 Ok(Some(ack)) => {
1479 debug!(
1480 seq = ack.sequence_number,
1481 negative = ack.negative_ack,
1482 "Received SegmentAck for ComplexAck"
1483 );
1484 if ack.negative_ack {
1485 neg_ack_retries += 1;
1486 if neg_ack_retries > MAX_NEG_SEGMENT_ACK_RETRIES {
1487 warn!(
1488 invoke_id,
1489 retries = neg_ack_retries,
1490 "Too many negative SegmentAck retries, aborting segmented send"
1491 );
1492 let abort = Apdu::Abort(AbortPdu {
1493 sent_by_server: true,
1494 invoke_id,
1495 abort_reason: AbortReason::TSM_TIMEOUT,
1496 });
1497 let mut abort_buf = BytesMut::new();
1498 encode_apdu(&mut abort_buf, &abort);
1499 let _ = network
1500 .send_apdu(
1501 &abort_buf,
1502 source_mac,
1503 false,
1504 NetworkPriority::NORMAL,
1505 )
1506 .await;
1507 break;
1508 }
1509 let requested = ack.sequence_number as usize;
1511 if requested >= total_segments {
1512 tracing::warn!(
1513 seq = requested,
1514 total = total_segments,
1515 "negative SegmentAck requests out-of-range sequence, aborting"
1516 );
1517 break;
1518 }
1519 debug!(
1520 seq = ack.sequence_number,
1521 "Negative SegmentAck — retransmitting from requested sequence"
1522 );
1523 seg_idx = requested;
1524 continue;
1525 }
1526 }
1528 Ok(None) => {
1529 warn!("SegmentAck channel closed during segmented send");
1530 break;
1531 }
1532 Err(_) => {
1533 warn!(
1534 seq = seg_idx,
1535 "Timeout waiting for SegmentAck, aborting segmented send"
1536 );
1537 let abort = Apdu::Abort(AbortPdu {
1538 sent_by_server: true,
1539 invoke_id,
1540 abort_reason: AbortReason::TSM_TIMEOUT,
1541 });
1542 let mut abort_buf = BytesMut::new();
1543 encode_apdu(&mut abort_buf, &abort);
1544 let _ = network
1545 .send_apdu(&abort_buf, source_mac, false, NetworkPriority::NORMAL)
1546 .await;
1547 break;
1548 }
1549 }
1550 }
1551
1552 seg_idx += 1;
1553 }
1554
1555 match tokio::time::timeout(seg_timeout, seg_ack_rx.recv()).await {
1557 Ok(Some(_ack)) => {
1558 debug!("Received final SegmentAck for ComplexAck");
1559 }
1560 _ => {
1561 warn!("No final SegmentAck received for ComplexAck");
1562 }
1563 }
1564
1565 seg_ack_senders.lock().await.remove(&key);
1567 }
1568
1569 async fn handle_unconfirmed_request(
1571 db: &Arc<RwLock<ObjectDatabase>>,
1572 network: &Arc<NetworkLayer<T>>,
1573 config: &ServerConfig,
1574 comm_state: &Arc<AtomicU8>,
1575 req: UnconfirmedRequestPdu,
1576 received: &bacnet_network::layer::ReceivedApdu,
1577 ) {
1578 let comm = comm_state.load(Ordering::Acquire);
1579 if comm == 1 {
1580 tracing::debug!("Dropping unconfirmed service: DCC is DISABLE");
1581 return;
1582 }
1583
1584 if req.service_choice == UnconfirmedServiceChoice::WHO_IS {
1585 let who_is = match WhoIsRequest::decode(&req.service_request) {
1586 Ok(r) => r,
1587 Err(e) => {
1588 warn!(error = %e, "Failed to decode WhoIs");
1589 return;
1590 }
1591 };
1592
1593 let db = db.read().await;
1594 let device_oid = db
1595 .list_objects()
1596 .into_iter()
1597 .find(|oid| oid.object_type() == ObjectType::DEVICE);
1598
1599 if let Some(device_oid) = device_oid {
1600 let instance = device_oid.instance_number();
1601
1602 let in_range = match (who_is.low_limit, who_is.high_limit) {
1603 (Some(low), Some(high)) => instance >= low && instance <= high,
1604 _ => true,
1605 };
1606
1607 if in_range {
1608 let i_am = IAmRequest {
1609 object_identifier: device_oid,
1610 max_apdu_length: config.max_apdu_length,
1611 segmentation_supported: config.segmentation_supported,
1612 vendor_id: config.vendor_id,
1613 };
1614
1615 let mut service_buf = BytesMut::new();
1616 i_am.encode(&mut service_buf);
1617
1618 let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
1619 service_choice: UnconfirmedServiceChoice::I_AM,
1620 service_request: Bytes::from(service_buf.to_vec()),
1621 });
1622
1623 let mut buf = BytesMut::new();
1624 encode_apdu(&mut buf, &pdu);
1625
1626 if let Some(ref source_net) = received.source_network {
1630 if let Err(e) = network
1631 .send_apdu_routed(
1632 &buf,
1633 source_net.network,
1634 &source_net.mac_address,
1635 &received.source_mac,
1636 false,
1637 NetworkPriority::NORMAL,
1638 )
1639 .await
1640 {
1641 warn!(error = %e, "Failed to route IAm back to remote requester");
1642 }
1643 } else if let Err(e) = network
1644 .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1645 .await
1646 {
1647 warn!(error = %e, "Failed to send IAm broadcast");
1648 }
1649 }
1650 }
1651 } else if req.service_choice == UnconfirmedServiceChoice::WHO_HAS {
1652 let db = db.read().await;
1653 let device_oid = db
1654 .list_objects()
1655 .into_iter()
1656 .find(|oid| oid.object_type() == ObjectType::DEVICE);
1657
1658 if let Some(device_oid) = device_oid {
1659 match handlers::handle_who_has(&db, &req.service_request, device_oid) {
1660 Ok(Some(i_have)) => {
1661 let mut service_buf = BytesMut::new();
1662 if let Err(e) = i_have.encode(&mut service_buf) {
1663 warn!(error = %e, "Failed to encode IHave");
1664 } else {
1665 let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
1666 service_choice: UnconfirmedServiceChoice::I_HAVE,
1667 service_request: Bytes::from(service_buf.to_vec()),
1668 });
1669
1670 let mut buf = BytesMut::new();
1671 encode_apdu(&mut buf, &pdu);
1672
1673 if let Err(e) = network
1674 .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1675 .await
1676 {
1677 warn!(error = %e, "Failed to send IHave broadcast");
1678 }
1679 }
1680 }
1681 Ok(None) => { }
1682 Err(e) => {
1683 warn!(error = %e, "Failed to decode WhoHas");
1684 }
1685 }
1686 }
1687 } else if req.service_choice == UnconfirmedServiceChoice::TIME_SYNCHRONIZATION
1688 || req.service_choice == UnconfirmedServiceChoice::UTC_TIME_SYNCHRONIZATION
1689 {
1690 debug!("Received time synchronization request");
1691 if let Some(ref callback) = config.on_time_sync {
1692 let data = TimeSyncData {
1693 raw_service_data: req.service_request.clone(),
1694 is_utc: req.service_choice
1695 == UnconfirmedServiceChoice::UTC_TIME_SYNCHRONIZATION,
1696 };
1697 callback(data);
1698 }
1699 } else {
1700 debug!(
1701 service = req.service_choice.to_raw(),
1702 "Ignoring unsupported unconfirmed service"
1703 );
1704 }
1705 }
1706
1707 async fn fire_event_notifications(
1716 db: &Arc<RwLock<ObjectDatabase>>,
1717 network: &Arc<NetworkLayer<T>>,
1718 comm_state: &Arc<AtomicU8>,
1719 oid: &ObjectIdentifier,
1720 ) {
1721 if comm_state.load(Ordering::Acquire) >= 1 {
1722 return;
1723 }
1724
1725 let now = std::time::SystemTime::now()
1727 .duration_since(std::time::UNIX_EPOCH)
1728 .unwrap_or_default();
1729 let total_secs = now.as_secs();
1730 let dow = ((total_secs / 86400 + 4) % 7) as u8;
1732 let today_bit = 1u8 << dow;
1733 let day_secs = (total_secs % 86400) as u32;
1734 let current_time = Time {
1735 hour: (day_secs / 3600) as u8,
1736 minute: ((day_secs % 3600) / 60) as u8,
1737 second: (day_secs % 60) as u8,
1738 hundredths: (now.subsec_millis() / 10) as u8,
1739 };
1740
1741 let (notification, recipients) = {
1742 let mut db = db.write().await;
1743
1744 let device_oid = db
1745 .list_objects()
1746 .into_iter()
1747 .find(|o| o.object_type() == ObjectType::DEVICE)
1748 .unwrap_or_else(|| ObjectIdentifier::new(ObjectType::DEVICE, 0).unwrap());
1749
1750 let object = match db.get_mut(oid) {
1751 Some(o) => o,
1752 None => return,
1753 };
1754
1755 let change = match object.evaluate_intrinsic_reporting() {
1756 Some(c) => c,
1757 None => return,
1758 };
1759
1760 let notification_class = object
1762 .read_property(PropertyIdentifier::NOTIFICATION_CLASS, None)
1763 .ok()
1764 .and_then(|v| match v {
1765 PropertyValue::Unsigned(n) => Some(n as u32),
1766 _ => None,
1767 })
1768 .unwrap_or(0);
1769
1770 let notify_type = object
1771 .read_property(PropertyIdentifier::NOTIFY_TYPE, None)
1772 .ok()
1773 .and_then(|v| match v {
1774 PropertyValue::Enumerated(n) => Some(n),
1775 _ => None,
1776 })
1777 .unwrap_or(NotifyType::ALARM.to_raw());
1778
1779 let priority = if change.to == bacnet_types::enums::EventState::NORMAL {
1781 200u8
1782 } else {
1783 100u8
1784 };
1785
1786 let transition = change.transition();
1787
1788 let base_notification = EventNotificationRequest {
1789 process_identifier: 0,
1790 initiating_device_identifier: device_oid,
1791 event_object_identifier: *oid,
1792 timestamp: BACnetTimeStamp::SequenceNumber(total_secs),
1793 notification_class,
1794 priority,
1795 event_type: change.event_type().to_raw(),
1796 notify_type,
1797 ack_required: notify_type != NotifyType::ACK_NOTIFICATION.to_raw(),
1798 from_state: change.from.to_raw(),
1799 to_state: change.to.to_raw(),
1800 event_values: None,
1801 };
1802
1803 let recipients = get_notification_recipients(
1805 &db,
1806 notification_class,
1807 transition,
1808 today_bit,
1809 ¤t_time,
1810 );
1811
1812 (base_notification, recipients)
1813 };
1814
1815 if recipients.is_empty() {
1816 let mut service_buf = BytesMut::new();
1818 if let Err(e) = notification.encode(&mut service_buf) {
1819 warn!(error = %e, "Failed to encode EventNotification");
1820 return;
1821 }
1822
1823 let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
1824 service_choice: UnconfirmedServiceChoice::UNCONFIRMED_EVENT_NOTIFICATION,
1825 service_request: Bytes::from(service_buf.to_vec()),
1826 });
1827
1828 let mut buf = BytesMut::new();
1829 encode_apdu(&mut buf, &pdu);
1830
1831 if let Err(e) = network
1832 .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1833 .await
1834 {
1835 warn!(error = %e, "Failed to broadcast EventNotification");
1836 }
1837 } else {
1838 for (recipient, process_id, confirmed) in &recipients {
1840 let mut targeted = notification.clone();
1841 targeted.process_identifier = *process_id;
1842
1843 let mut service_buf = BytesMut::new();
1844 if let Err(e) = targeted.encode(&mut service_buf) {
1845 warn!(error = %e, "Failed to encode EventNotification");
1846 continue;
1847 }
1848
1849 let service_bytes = Bytes::from(service_buf.to_vec());
1850
1851 if *confirmed {
1852 let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
1853 segmented: false,
1854 more_follows: false,
1855 segmented_response_accepted: false,
1856 max_segments: None,
1857 max_apdu_length: 1476,
1858 invoke_id: 0, sequence_number: None,
1860 proposed_window_size: None,
1861 service_choice: ConfirmedServiceChoice::CONFIRMED_EVENT_NOTIFICATION,
1862 service_request: service_bytes,
1863 });
1864
1865 let mut buf = BytesMut::new();
1866 encode_apdu(&mut buf, &pdu);
1867
1868 match recipient {
1869 bacnet_types::constructed::BACnetRecipient::Address(addr) => {
1870 if let Err(e) = network
1871 .send_apdu(&buf, &addr.mac_address, true, NetworkPriority::NORMAL)
1872 .await
1873 {
1874 warn!(error = %e, "Failed to send confirmed EventNotification");
1875 }
1876 }
1877 bacnet_types::constructed::BACnetRecipient::Device(_) => {
1878 if let Err(e) = network
1880 .broadcast_apdu(&buf, true, NetworkPriority::NORMAL)
1881 .await
1882 {
1883 warn!(error = %e, "Failed to broadcast confirmed EventNotification");
1884 }
1885 }
1886 }
1887 } else {
1888 let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
1889 service_choice: UnconfirmedServiceChoice::UNCONFIRMED_EVENT_NOTIFICATION,
1890 service_request: service_bytes,
1891 });
1892
1893 let mut buf = BytesMut::new();
1894 encode_apdu(&mut buf, &pdu);
1895
1896 match recipient {
1897 bacnet_types::constructed::BACnetRecipient::Address(addr) => {
1898 if let Err(e) = network
1899 .send_apdu(&buf, &addr.mac_address, false, NetworkPriority::NORMAL)
1900 .await
1901 {
1902 warn!(
1903 error = %e,
1904 "Failed to send unconfirmed EventNotification"
1905 );
1906 }
1907 }
1908 bacnet_types::constructed::BACnetRecipient::Device(_) => {
1909 if let Err(e) = network
1911 .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1912 .await
1913 {
1914 warn!(
1915 error = %e,
1916 "Failed to broadcast unconfirmed EventNotification"
1917 );
1918 }
1919 }
1920 }
1921 }
1922 }
1923 }
1924 }
1925
1926 #[allow(clippy::too_many_arguments)]
1935 async fn fire_cov_notifications(
1936 db: &Arc<RwLock<ObjectDatabase>>,
1937 network: &Arc<NetworkLayer<T>>,
1938 cov_table: &Arc<RwLock<CovSubscriptionTable>>,
1939 cov_in_flight: &Arc<Semaphore>,
1940 server_tsm: &Arc<Mutex<ServerTsm>>,
1941 comm_state: &Arc<AtomicU8>,
1942 config: &ServerConfig,
1943 oid: &ObjectIdentifier,
1944 ) {
1945 if comm_state.load(Ordering::Acquire) >= 1 {
1946 return;
1947 }
1948 let subs: Vec<crate::cov::CovSubscription> = {
1950 let mut table = cov_table.write().await;
1951 table.subscriptions_for(oid).into_iter().cloned().collect()
1952 };
1953
1954 if subs.is_empty() {
1955 return;
1956 }
1957
1958 let (device_oid, values, current_pv, cov_increment) = {
1960 let db = db.read().await;
1961 let object = match db.get(oid) {
1962 Some(o) => o,
1963 None => return,
1964 };
1965
1966 let cov_increment = object.cov_increment();
1967
1968 let mut current_pv: Option<f32> = None;
1969 let mut values = Vec::new();
1970 if let Ok(pv) = object.read_property(PropertyIdentifier::PRESENT_VALUE, None) {
1971 if let PropertyValue::Real(v) = &pv {
1972 current_pv = Some(*v);
1973 }
1974 let mut buf = BytesMut::new();
1975 if encode_property_value(&mut buf, &pv).is_ok() {
1976 values.push(BACnetPropertyValue {
1977 property_identifier: PropertyIdentifier::PRESENT_VALUE,
1978 property_array_index: None,
1979 value: buf.to_vec(),
1980 priority: None,
1981 });
1982 }
1983 }
1984 if let Ok(sf) = object.read_property(PropertyIdentifier::STATUS_FLAGS, None) {
1985 let mut buf = BytesMut::new();
1986 if encode_property_value(&mut buf, &sf).is_ok() {
1987 values.push(BACnetPropertyValue {
1988 property_identifier: PropertyIdentifier::STATUS_FLAGS,
1989 property_array_index: None,
1990 value: buf.to_vec(),
1991 priority: None,
1992 });
1993 }
1994 }
1995
1996 let device_oid = db
1997 .list_objects()
1998 .into_iter()
1999 .find(|o| o.object_type() == ObjectType::DEVICE)
2000 .unwrap_or_else(|| ObjectIdentifier::new(ObjectType::DEVICE, 0).unwrap());
2001
2002 (device_oid, values, current_pv, cov_increment)
2003 };
2004
2005 if values.is_empty() {
2006 return;
2007 }
2008
2009 for sub in &subs {
2011 if !CovSubscriptionTable::should_notify(sub, current_pv, cov_increment) {
2012 continue;
2013 }
2014 let time_remaining = sub.expires_at.map_or(0, |exp| {
2015 exp.saturating_duration_since(Instant::now()).as_secs() as u32
2016 });
2017
2018 let notification = COVNotificationRequest {
2019 subscriber_process_identifier: sub.subscriber_process_identifier,
2020 initiating_device_identifier: device_oid,
2021 monitored_object_identifier: *oid,
2022 time_remaining,
2023 list_of_values: values.clone(),
2024 };
2025
2026 let mut service_buf = BytesMut::new();
2027 notification.encode(&mut service_buf);
2028
2029 if sub.issue_confirmed_notifications {
2030 let permit = match cov_in_flight.clone().try_acquire_owned() {
2033 Ok(permit) => permit,
2034 Err(_) => {
2035 warn!(
2036 object = ?oid,
2037 "255 confirmed COV notifications in-flight, skipping notification"
2038 );
2039 continue;
2040 }
2041 };
2042
2043 let id = {
2045 let mut tsm = server_tsm.lock().await;
2046 tsm.allocate()
2047 };
2048
2049 let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
2050 segmented: false,
2051 more_follows: false,
2052 segmented_response_accepted: false,
2053 max_segments: None,
2054 max_apdu_length: config.max_apdu_length as u16,
2055 invoke_id: id,
2056 sequence_number: None,
2057 proposed_window_size: None,
2058 service_choice: ConfirmedServiceChoice::CONFIRMED_COV_NOTIFICATION,
2059 service_request: Bytes::from(service_buf.to_vec()),
2060 });
2061
2062 let mut buf = BytesMut::new();
2063 encode_apdu(&mut buf, &pdu);
2064
2065 if let Some(pv) = current_pv {
2067 let mut table = cov_table.write().await;
2068 table.set_last_notified_value(
2069 &sub.subscriber_mac,
2070 sub.subscriber_process_identifier,
2071 sub.monitored_object_identifier,
2072 pv,
2073 );
2074 }
2075
2076 let network = Arc::clone(network);
2077 let mac = sub.subscriber_mac.clone();
2078 let apdu_timeout = Duration::from_millis(config.cov_retry_timeout_ms);
2079 let tsm = Arc::clone(server_tsm);
2080 let apdu_retries = DEFAULT_APDU_RETRIES;
2081 tokio::spawn(async move {
2084 let _permit = permit; for attempt in 0..=apdu_retries {
2087 if let Err(e) = network
2089 .send_apdu(&buf, &mac, true, NetworkPriority::NORMAL)
2090 .await
2091 {
2092 warn!(error = %e, attempt, "COV notification send failed");
2093 } else {
2095 debug!(invoke_id = id, attempt, "Confirmed COV notification sent");
2096 }
2097
2098 let poll_interval = Duration::from_millis(50);
2101 let result = tokio::time::timeout(apdu_timeout, async {
2102 loop {
2103 tokio::time::sleep(poll_interval).await;
2104 let mut tsm = tsm.lock().await;
2105 if let Some(r) = tsm.take_result(id) {
2106 return r;
2107 }
2108 }
2109 })
2110 .await;
2111
2112 match result {
2113 Ok(CovAckResult::Ack) => {
2114 debug!(invoke_id = id, "COV notification acknowledged");
2115 return;
2116 }
2117 Ok(CovAckResult::Error) => {
2118 warn!(invoke_id = id, "COV notification rejected by subscriber");
2119 return;
2120 }
2121 Err(_) => {
2122 if attempt < apdu_retries {
2124 debug!(
2125 invoke_id = id,
2126 attempt, "COV notification timeout, retrying"
2127 );
2128 } else {
2129 warn!(
2130 invoke_id = id,
2131 "COV notification failed after {} retries", apdu_retries
2132 );
2133 }
2134 }
2135 }
2136 }
2137
2138 let mut tsm = tsm.lock().await;
2140 tsm.remove(id);
2141 });
2142 } else {
2143 let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
2144 service_choice: UnconfirmedServiceChoice::UNCONFIRMED_COV_NOTIFICATION,
2145 service_request: Bytes::from(service_buf.to_vec()),
2146 });
2147
2148 let mut buf = BytesMut::new();
2149 encode_apdu(&mut buf, &pdu);
2150
2151 if let Err(e) = network
2152 .send_apdu(&buf, &sub.subscriber_mac, false, NetworkPriority::NORMAL)
2153 .await
2154 {
2155 warn!(error = %e, "Failed to send COV notification");
2156 } else if let Some(pv) = current_pv {
2157 let mut table = cov_table.write().await;
2159 table.set_last_notified_value(
2160 &sub.subscriber_mac,
2161 sub.subscriber_process_identifier,
2162 sub.monitored_object_identifier,
2163 pv,
2164 );
2165 }
2166 }
2167 }
2168 }
2169
2170 fn error_apdu_from_error(
2172 invoke_id: u8,
2173 service_choice: ConfirmedServiceChoice,
2174 error: &Error,
2175 ) -> Apdu {
2176 let (class, code) = match error {
2177 Error::Protocol { class, code } => (*class, *code),
2178 _ => (
2179 ErrorClass::SERVICES.to_raw() as u32,
2180 ErrorCode::OTHER.to_raw() as u32,
2181 ),
2182 };
2183 Apdu::Error(ErrorPdu {
2184 invoke_id,
2185 service_choice,
2186 error_class: ErrorClass::from_raw(class as u16),
2187 error_code: ErrorCode::from_raw(code as u16),
2188 error_data: Bytes::new(),
2189 })
2190 }
2191}
2192
2193#[cfg(test)]
2194mod tests {
2195 use super::*;
2196
2197 #[test]
2198 fn server_config_cov_retry_timeout_default() {
2199 let config = ServerConfig::default();
2200 assert_eq!(config.cov_retry_timeout_ms, 3000);
2201 }
2202
2203 #[test]
2204 fn server_config_time_sync_callback_default_is_none() {
2205 let config = ServerConfig::default();
2206 assert!(config.on_time_sync.is_none());
2207 }
2208
2209 #[test]
2214 fn server_tsm_allocate_increments() {
2215 let mut tsm = ServerTsm::new();
2216 assert_eq!(tsm.allocate(), 0);
2217 assert_eq!(tsm.allocate(), 1);
2218 assert_eq!(tsm.allocate(), 2);
2219 }
2220
2221 #[test]
2222 fn server_tsm_allocate_wraps_at_255() {
2223 let mut tsm = ServerTsm::new();
2224 tsm.next_invoke_id = 255;
2225 assert_eq!(tsm.allocate(), 255);
2226 assert_eq!(tsm.allocate(), 0); }
2228
2229 #[test]
2230 fn server_tsm_record_and_take_ack() {
2231 let mut tsm = ServerTsm::new();
2232 tsm.record_result(42, CovAckResult::Ack);
2233 assert_eq!(tsm.take_result(42), Some(CovAckResult::Ack));
2234 assert_eq!(tsm.take_result(42), None);
2236 }
2237
2238 #[test]
2239 fn server_tsm_record_and_take_error() {
2240 let mut tsm = ServerTsm::new();
2241 tsm.record_result(7, CovAckResult::Error);
2242 assert_eq!(tsm.take_result(7), Some(CovAckResult::Error));
2243 }
2244
2245 #[test]
2246 fn server_tsm_take_nonexistent_returns_none() {
2247 let mut tsm = ServerTsm::new();
2248 assert_eq!(tsm.take_result(99), None);
2249 }
2250
2251 #[test]
2252 fn server_tsm_remove_cleans_up() {
2253 let mut tsm = ServerTsm::new();
2254 tsm.record_result(10, CovAckResult::Ack);
2255 tsm.remove(10);
2256 assert_eq!(tsm.take_result(10), None);
2257 }
2258
2259 #[test]
2260 fn server_tsm_multiple_pending() {
2261 let mut tsm = ServerTsm::new();
2262 tsm.record_result(1, CovAckResult::Ack);
2263 tsm.record_result(2, CovAckResult::Error);
2264 tsm.record_result(3, CovAckResult::Ack);
2265
2266 assert_eq!(tsm.take_result(2), Some(CovAckResult::Error));
2267 assert_eq!(tsm.take_result(1), Some(CovAckResult::Ack));
2268 assert_eq!(tsm.take_result(3), Some(CovAckResult::Ack));
2269 }
2270
2271 #[test]
2272 fn server_tsm_overwrite_result() {
2273 let mut tsm = ServerTsm::new();
2274 tsm.record_result(5, CovAckResult::Ack);
2275 tsm.record_result(5, CovAckResult::Error);
2277 assert_eq!(tsm.take_result(5), Some(CovAckResult::Error));
2278 }
2279
2280 #[test]
2281 fn cov_ack_result_debug_and_eq() {
2282 assert_eq!(CovAckResult::Ack, CovAckResult::Ack);
2284 assert_ne!(CovAckResult::Ack, CovAckResult::Error);
2285 let _debug = format!("{:?}", CovAckResult::Ack);
2286 }
2287
2288 #[test]
2289 fn default_apdu_retries_constant() {
2290 assert_eq!(DEFAULT_APDU_RETRIES, 3);
2291 }
2292
2293 #[test]
2294 fn seg_receiver_timeout_is_4s() {
2295 assert_eq!(SEG_RECEIVER_TIMEOUT, Duration::from_secs(4));
2296 }
2297
2298 #[test]
2299 fn max_neg_segment_ack_retries_constant() {
2300 assert_eq!(MAX_NEG_SEGMENT_ACK_RETRIES, 3);
2301 }
2302}