1use std::collections::HashMap;
8use std::net::Ipv4Addr;
9use std::sync::atomic::{AtomicU8, Ordering};
10use std::sync::Arc;
11use std::time::Instant;
12
13use bytes::{Bytes, BytesMut};
14use tokio::sync::{mpsc, oneshot, Mutex, RwLock, Semaphore};
15use tokio::task::JoinHandle;
16use tokio::time::Duration;
17use tracing::{debug, warn};
18
19use bacnet_encoding::apdu::{
20 self, encode_apdu, AbortPdu, Apdu, ComplexAck, ConfirmedRequest as ConfirmedRequestPdu,
21 ErrorPdu, RejectPdu, SegmentAck as SegmentAckPdu, SimpleAck,
22 UnconfirmedRequest as UnconfirmedRequestPdu,
23};
24use bacnet_encoding::primitives::encode_property_value;
25use bacnet_encoding::segmentation::{
26 max_segment_payload, split_payload, SegmentReceiver, SegmentedPduType,
27};
28use bacnet_network::layer::NetworkLayer;
29use bacnet_objects::database::ObjectDatabase;
30use bacnet_objects::notification_class::get_notification_recipients;
31use bacnet_services::alarm_event::EventNotificationRequest;
32use bacnet_services::common::BACnetPropertyValue;
33use bacnet_services::cov::COVNotificationRequest;
34use bacnet_services::who_is::{IAmRequest, WhoIsRequest};
35use bacnet_transport::bip::BipTransport;
36use bacnet_transport::port::TransportPort;
37use bacnet_types::enums::{
38 AbortReason, ConfirmedServiceChoice, ErrorClass, ErrorCode, NetworkPriority, NotifyType,
39 ObjectType, PropertyIdentifier, RejectReason, Segmentation, UnconfirmedServiceChoice,
40};
41use bacnet_types::error::Error;
42use bacnet_types::primitives::{BACnetTimeStamp, ObjectIdentifier, PropertyValue, Time};
43use bacnet_types::MacAddr;
44
45use crate::cov::CovSubscriptionTable;
46use crate::handlers;
47
48const MAX_SEG_RECEIVERS: usize = 128;
50
51const SEG_RECEIVER_TIMEOUT: Duration = Duration::from_secs(4);
53
54const MAX_NEG_SEGMENT_ACK_RETRIES: u8 = 3;
56
57const DEFAULT_APDU_RETRIES: u8 = 3;
59
60#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub enum CovAckResult {
67 Ack,
69 Error,
71}
72
73pub struct ServerTsm {
80 next_invoke_id: u8,
81 pending: HashMap<u8, oneshot::Sender<CovAckResult>>,
84}
85
86impl ServerTsm {
87 fn new() -> Self {
88 Self {
89 next_invoke_id: 0,
90 pending: HashMap::new(),
91 }
92 }
93
94 fn allocate(&mut self) -> (u8, oneshot::Receiver<CovAckResult>) {
97 let id = self.next_invoke_id;
98 self.next_invoke_id = self.next_invoke_id.wrapping_add(1);
99 let (tx, rx) = oneshot::channel();
100 self.pending.insert(id, tx);
101 (id, rx)
102 }
103
104 fn record_result(&mut self, invoke_id: u8, result: CovAckResult) {
107 if let Some(tx) = self.pending.remove(&invoke_id) {
108 let _ = tx.send(result);
109 }
110 }
111
112 fn remove(&mut self, invoke_id: u8) {
114 self.pending.remove(&invoke_id);
115 }
116}
117
118#[derive(Debug, Clone)]
120pub struct TimeSyncData {
121 pub raw_service_data: Bytes,
123 pub is_utc: bool,
125}
126
127#[derive(Clone)]
129pub struct ServerConfig {
130 pub interface: Ipv4Addr,
132 pub port: u16,
134 pub broadcast_address: Ipv4Addr,
136 pub max_apdu_length: u32,
138 pub segmentation_supported: Segmentation,
140 pub vendor_id: u16,
142 pub cov_retry_timeout_ms: u64,
144 pub on_time_sync: Option<Arc<dyn Fn(TimeSyncData) + Send + Sync>>,
146 pub dcc_password: Option<String>,
148 pub reinit_password: Option<String>,
150 pub enable_fault_detection: bool,
154}
155
156impl std::fmt::Debug for ServerConfig {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.debug_struct("ServerConfig")
159 .field("interface", &self.interface)
160 .field("port", &self.port)
161 .field("broadcast_address", &self.broadcast_address)
162 .field("max_apdu_length", &self.max_apdu_length)
163 .field("segmentation_supported", &self.segmentation_supported)
164 .field("vendor_id", &self.vendor_id)
165 .field("cov_retry_timeout_ms", &self.cov_retry_timeout_ms)
166 .field(
167 "on_time_sync",
168 &self.on_time_sync.as_ref().map(|_| "<callback>"),
169 )
170 .field("dcc_password", &self.dcc_password.as_ref().map(|_| "***"))
171 .field(
172 "reinit_password",
173 &self.reinit_password.as_ref().map(|_| "***"),
174 )
175 .field("enable_fault_detection", &self.enable_fault_detection)
176 .finish()
177 }
178}
179
180impl Default for ServerConfig {
181 fn default() -> Self {
182 Self {
183 interface: Ipv4Addr::UNSPECIFIED,
184 port: 0xBAC0,
185 broadcast_address: Ipv4Addr::BROADCAST,
186 max_apdu_length: 1476,
187 segmentation_supported: Segmentation::NONE,
188 vendor_id: 0,
189 cov_retry_timeout_ms: 3000,
190 on_time_sync: None,
191 dcc_password: None,
192 reinit_password: None,
193 enable_fault_detection: false,
194 }
195 }
196}
197
198pub struct ServerBuilder<T: TransportPort> {
200 config: ServerConfig,
201 db: ObjectDatabase,
202 transport: Option<T>,
203}
204
205impl<T: TransportPort + 'static> ServerBuilder<T> {
206 pub fn database(mut self, db: ObjectDatabase) -> Self {
208 self.db = db;
209 self
210 }
211
212 pub fn transport(mut self, transport: T) -> Self {
214 self.transport = Some(transport);
215 self
216 }
217
218 pub fn dcc_password(mut self, password: impl Into<String>) -> Self {
220 self.config.dcc_password = Some(password.into());
221 self
222 }
223
224 pub fn reinit_password(mut self, password: impl Into<String>) -> Self {
226 self.config.reinit_password = Some(password.into());
227 self
228 }
229
230 pub fn enable_fault_detection(mut self, enabled: bool) -> Self {
232 self.config.enable_fault_detection = enabled;
233 self
234 }
235
236 pub fn vendor_id(mut self, id: u16) -> Self {
238 self.config.vendor_id = id;
239 self
240 }
241
242 pub async fn build(self) -> Result<BACnetServer<T>, Error> {
244 let transport = self
245 .transport
246 .ok_or_else(|| Error::Encoding("transport not set on ServerBuilder".into()))?;
247 BACnetServer::start(self.config, self.db, transport).await
248 }
249}
250
251pub struct BipServerBuilder {
253 config: ServerConfig,
254 db: ObjectDatabase,
255}
256
257impl BipServerBuilder {
258 pub fn interface(mut self, ip: Ipv4Addr) -> Self {
260 self.config.interface = ip;
261 self
262 }
263
264 pub fn port(mut self, port: u16) -> Self {
266 self.config.port = port;
267 self
268 }
269
270 pub fn broadcast_address(mut self, addr: Ipv4Addr) -> Self {
272 self.config.broadcast_address = addr;
273 self
274 }
275
276 pub fn database(mut self, db: ObjectDatabase) -> Self {
278 self.db = db;
279 self
280 }
281
282 pub fn dcc_password(mut self, password: impl Into<String>) -> Self {
284 self.config.dcc_password = Some(password.into());
285 self
286 }
287
288 pub fn reinit_password(mut self, password: impl Into<String>) -> Self {
290 self.config.reinit_password = Some(password.into());
291 self
292 }
293
294 pub fn enable_fault_detection(mut self, enabled: bool) -> Self {
296 self.config.enable_fault_detection = enabled;
297 self
298 }
299
300 pub async fn build(self) -> Result<BACnetServer<BipTransport>, Error> {
302 let transport = BipTransport::new(
303 self.config.interface,
304 self.config.port,
305 self.config.broadcast_address,
306 );
307 BACnetServer::start(self.config, self.db, transport).await
308 }
309}
310
311type SegKey = (MacAddr, u8);
313
314pub struct BACnetServer<T: TransportPort> {
316 #[allow(dead_code)]
317 config: ServerConfig,
318 #[allow(dead_code)]
320 network: Arc<NetworkLayer<T>>,
321 db: Arc<RwLock<ObjectDatabase>>,
323 #[allow(dead_code)]
325 cov_table: Arc<RwLock<CovSubscriptionTable>>,
326 #[allow(dead_code)]
328 seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
329 #[allow(dead_code)]
332 cov_in_flight: Arc<Semaphore>,
333 #[allow(dead_code)]
335 server_tsm: Arc<Mutex<ServerTsm>>,
336 comm_state: Arc<AtomicU8>,
338 #[allow(dead_code)]
341 dcc_timer: Arc<Mutex<Option<JoinHandle<()>>>>,
342 dispatch_task: Option<JoinHandle<()>>,
343 cov_purge_task: Option<JoinHandle<()>>,
344 fault_detection_task: Option<JoinHandle<()>>,
345 event_enrollment_task: Option<JoinHandle<()>>,
346 trend_log_task: Option<JoinHandle<()>>,
347 schedule_tick_task: Option<JoinHandle<()>>,
348 local_mac: MacAddr,
349}
350
351impl BACnetServer<BipTransport> {
352 pub fn bip_builder() -> BipServerBuilder {
354 BipServerBuilder {
355 config: ServerConfig::default(),
356 db: ObjectDatabase::new(),
357 }
358 }
359
360 pub fn builder() -> BipServerBuilder {
362 Self::bip_builder()
363 }
364}
365
366#[cfg(feature = "sc-tls")]
367impl BACnetServer<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>> {
368 pub fn sc_builder() -> ScServerBuilder {
370 ScServerBuilder {
371 config: ServerConfig::default(),
372 db: ObjectDatabase::new(),
373 hub_url: String::new(),
374 tls_config: None,
375 vmac: [0; 6],
376 heartbeat_interval_ms: 30_000,
377 heartbeat_timeout_ms: 60_000,
378 reconnect: None,
379 }
380 }
381}
382
383#[cfg(feature = "sc-tls")]
387pub struct ScServerBuilder {
388 config: ServerConfig,
389 db: ObjectDatabase,
390 hub_url: String,
391 tls_config: Option<std::sync::Arc<tokio_rustls::rustls::ClientConfig>>,
392 vmac: bacnet_transport::sc_frame::Vmac,
393 heartbeat_interval_ms: u64,
394 heartbeat_timeout_ms: u64,
395 reconnect: Option<bacnet_transport::sc::ScReconnectConfig>,
396}
397
398#[cfg(feature = "sc-tls")]
399impl ScServerBuilder {
400 pub fn hub_url(mut self, url: &str) -> Self {
402 self.hub_url = url.to_string();
403 self
404 }
405
406 pub fn tls_config(
408 mut self,
409 config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
410 ) -> Self {
411 self.tls_config = Some(config);
412 self
413 }
414
415 pub fn vmac(mut self, vmac: [u8; 6]) -> Self {
417 self.vmac = vmac;
418 self
419 }
420
421 pub fn database(mut self, db: ObjectDatabase) -> Self {
423 self.db = db;
424 self
425 }
426
427 pub fn heartbeat_interval_ms(mut self, ms: u64) -> Self {
429 self.heartbeat_interval_ms = ms;
430 self
431 }
432
433 pub fn heartbeat_timeout_ms(mut self, ms: u64) -> Self {
435 self.heartbeat_timeout_ms = ms;
436 self
437 }
438
439 pub fn reconnect(mut self, config: bacnet_transport::sc::ScReconnectConfig) -> Self {
441 self.reconnect = Some(config);
442 self
443 }
444
445 pub fn dcc_password(mut self, password: impl Into<String>) -> Self {
447 self.config.dcc_password = Some(password.into());
448 self
449 }
450
451 pub fn reinit_password(mut self, password: impl Into<String>) -> Self {
453 self.config.reinit_password = Some(password.into());
454 self
455 }
456
457 pub fn enable_fault_detection(mut self, enabled: bool) -> Self {
459 self.config.enable_fault_detection = enabled;
460 self
461 }
462
463 pub async fn build(
465 self,
466 ) -> Result<
467 BACnetServer<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>>,
468 Error,
469 > {
470 let tls_config = self
471 .tls_config
472 .ok_or_else(|| Error::Encoding("SC server builder: tls_config is required".into()))?;
473
474 let ws = bacnet_transport::sc_tls::TlsWebSocket::connect(&self.hub_url, tls_config).await?;
475
476 let mut transport = bacnet_transport::sc::ScTransport::new(ws, self.vmac)
477 .with_heartbeat_interval_ms(self.heartbeat_interval_ms)
478 .with_heartbeat_timeout_ms(self.heartbeat_timeout_ms);
479 if let Some(rc) = self.reconnect {
480 #[allow(deprecated)]
481 {
482 transport = transport.with_reconnect(rc);
483 }
484 }
485
486 BACnetServer::start(self.config, self.db, transport).await
487 }
488}
489
490impl<T: TransportPort + 'static> BACnetServer<T> {
491 pub fn generic_builder() -> ServerBuilder<T> {
493 ServerBuilder {
494 config: ServerConfig::default(),
495 db: ObjectDatabase::new(),
496 transport: None,
497 }
498 }
499
500 pub async fn start(
502 mut config: ServerConfig,
503 db: ObjectDatabase,
504 transport: T,
505 ) -> Result<Self, Error> {
506 let transport_max = transport.max_apdu_length() as u32;
507 config.max_apdu_length = config.max_apdu_length.min(transport_max);
508
509 if config.vendor_id == 0 {
510 warn!("vendor_id is 0 (ASHRAE reserved); set a valid vendor ID for production use");
511 }
512
513 let mut network = NetworkLayer::new(transport);
514 let apdu_rx = network.start().await?;
515 let local_mac = MacAddr::from_slice(network.local_mac());
516
517 let network = Arc::new(network);
518 let db = Arc::new(RwLock::new(db));
519 let cov_table = Arc::new(RwLock::new(CovSubscriptionTable::new()));
520 let seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>> =
521 Arc::new(Mutex::new(HashMap::new()));
522
523 let cov_in_flight = Arc::new(Semaphore::new(255));
524 let server_tsm = Arc::new(Mutex::new(ServerTsm::new()));
525 let comm_state = Arc::new(AtomicU8::new(0)); let dcc_timer: Arc<Mutex<Option<JoinHandle<()>>>> = Arc::new(Mutex::new(None));
527
528 let network_dispatch = Arc::clone(&network);
529 let db_dispatch = Arc::clone(&db);
530 let cov_dispatch = Arc::clone(&cov_table);
531 let seg_ack_dispatch = Arc::clone(&seg_ack_senders);
532 let cov_in_flight_dispatch = Arc::clone(&cov_in_flight);
533 let server_tsm_dispatch = Arc::clone(&server_tsm);
534 let comm_state_dispatch = Arc::clone(&comm_state);
535 let dcc_timer_dispatch = Arc::clone(&dcc_timer);
536 let config_dispatch = Arc::new(config.clone());
537
538 let dispatch_task = tokio::spawn(async move {
539 let mut apdu_rx = apdu_rx;
540 let mut seg_receivers: HashMap<
541 SegKey,
542 (
543 SegmentReceiver,
544 bacnet_encoding::apdu::ConfirmedRequest,
545 Instant,
546 ),
547 > = HashMap::new();
548
549 while let Some(received) = apdu_rx.recv().await {
550 let now = Instant::now();
551 seg_receivers.retain(|_key, (_rx, _req, last_activity)| {
552 now.duration_since(*last_activity) < SEG_RECEIVER_TIMEOUT
553 });
554
555 match apdu::decode_apdu(received.apdu.clone()) {
556 Ok(decoded) => {
557 let source_mac = received.source_mac.clone();
558 let mut received = Some(received);
559 let handled = if let Apdu::ConfirmedRequest(ref req) = decoded {
560 if req.segmented {
561 let seq = req.sequence_number.unwrap_or(0);
562 let key: SegKey = (source_mac.clone(), req.invoke_id);
563
564 if seq == 0 {
565 if seg_receivers.len() >= MAX_SEG_RECEIVERS {
567 let abort_pdu = Apdu::Abort(AbortPdu {
568 sent_by_server: true,
569 invoke_id: req.invoke_id,
570 abort_reason: AbortReason::BUFFER_OVERFLOW,
571 });
572 let mut abort_buf = BytesMut::new();
573 encode_apdu(&mut abort_buf, &abort_pdu);
574 let _ = network_dispatch
575 .send_apdu(
576 &abort_buf,
577 &source_mac,
578 false,
579 NetworkPriority::NORMAL,
580 )
581 .await;
582 continue;
583 }
584 let mut receiver = SegmentReceiver::new();
585 if let Err(e) =
586 receiver.receive(seq, req.service_request.clone())
587 {
588 warn!(error = %e, "Rejecting oversized segment");
589 continue;
590 }
591 seg_receivers.insert(
592 key.clone(),
593 (receiver, req.clone(), Instant::now()),
594 );
595 } else if let Some((receiver, _, last_activity)) =
596 seg_receivers.get_mut(&key)
597 {
598 if let Err(e) =
599 receiver.receive(seq, req.service_request.clone())
600 {
601 warn!(error = %e, "Rejecting oversized segment");
602 continue;
603 }
604 *last_activity = Instant::now();
605 } else {
606 warn!(
607 invoke_id = req.invoke_id,
608 seq = seq,
609 "Received non-initial segment without \
610 prior segment 0, aborting"
611 );
612 let abort_pdu = Apdu::Abort(AbortPdu {
613 sent_by_server: true,
614 invoke_id: req.invoke_id,
615 abort_reason: AbortReason::INVALID_APDU_IN_THIS_STATE,
616 });
617 let mut abort_buf = BytesMut::new();
618 encode_apdu(&mut abort_buf, &abort_pdu);
619 let _ = network_dispatch
620 .send_apdu(
621 &abort_buf,
622 &source_mac,
623 false,
624 NetworkPriority::NORMAL,
625 )
626 .await;
627 continue;
628 }
629
630 let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
631 negative_ack: false,
632 sent_by_server: true,
633 invoke_id: req.invoke_id,
634 sequence_number: seq,
635 actual_window_size: 1,
636 });
637 let mut ack_buf = BytesMut::new();
638 encode_apdu(&mut ack_buf, &seg_ack);
639 if let Err(e) = network_dispatch
640 .send_apdu(
641 &ack_buf,
642 &source_mac,
643 false,
644 NetworkPriority::NORMAL,
645 )
646 .await
647 {
648 warn!(
649 error = %e,
650 "Failed to send SegmentAck for \
651 segmented request"
652 );
653 }
654
655 if !req.more_follows {
656 if let Some((receiver, first_req, _)) =
657 seg_receivers.remove(&key)
658 {
659 let total = receiver.received_count();
660 match receiver.reassemble(total) {
661 Ok(full_data) => {
662 let reassembled =
663 bacnet_encoding::apdu::ConfirmedRequest {
664 segmented: false,
665 more_follows: false,
666 sequence_number: None,
667 proposed_window_size: None,
668 service_request: Bytes::from(full_data),
669 invoke_id: first_req.invoke_id,
670 service_choice: first_req.service_choice,
671 max_apdu_length: first_req.max_apdu_length,
672 segmented_response_accepted: first_req
673 .segmented_response_accepted,
674 max_segments: first_req.max_segments,
675 };
676 debug!(
677 invoke_id = reassembled.invoke_id,
678 segments = total,
679 payload_len = reassembled.service_request.len(),
680 "Reassembled segmented ConfirmedRequest"
681 );
682 Self::dispatch(
683 &db_dispatch,
684 &network_dispatch,
685 &cov_dispatch,
686 &seg_ack_dispatch,
687 &cov_in_flight_dispatch,
688 &server_tsm_dispatch,
689 &comm_state_dispatch,
690 &dcc_timer_dispatch,
691 &config_dispatch,
692 &source_mac,
693 Apdu::ConfirmedRequest(reassembled),
694 received
695 .take()
696 .unwrap_or_else(|| {
697 warn!("received consumed twice — using empty fallback");
698 bacnet_network::layer::ReceivedApdu {
699 apdu: bytes::Bytes::new(),
700 source_mac: bacnet_types::MacAddr::new(),
701 source_network: None,
702 reply_tx: None,
703 }
704 }),
705 )
706 .await;
707 }
708 Err(e) => {
709 warn!(
710 error = %e,
711 "Failed to reassemble \
712 segmented request"
713 );
714 }
715 }
716 }
717 }
718
719 true
720 } else {
721 false
722 }
723 } else {
724 false
725 };
726
727 if !handled {
728 Self::dispatch(
729 &db_dispatch,
730 &network_dispatch,
731 &cov_dispatch,
732 &seg_ack_dispatch,
733 &cov_in_flight_dispatch,
734 &server_tsm_dispatch,
735 &comm_state_dispatch,
736 &dcc_timer_dispatch,
737 &config_dispatch,
738 &source_mac,
739 decoded,
740 received.take().unwrap_or_else(|| {
741 warn!("received consumed twice — using empty fallback");
742 bacnet_network::layer::ReceivedApdu {
743 apdu: bytes::Bytes::new(),
744 source_mac: bacnet_types::MacAddr::new(),
745 source_network: None,
746 reply_tx: None,
747 }
748 }),
749 )
750 .await;
751 }
752 }
753 Err(e) => {
754 warn!(error = %e, "Server failed to decode received APDU");
755 }
756 }
757 }
758 });
759
760 let cov_table_for_purge = Arc::clone(&cov_table);
761 let cov_purge_task = tokio::spawn(async move {
762 let mut interval = tokio::time::interval(Duration::from_secs(30));
763 loop {
764 interval.tick().await;
765 let mut table = cov_table_for_purge.write().await;
766 let purged = table.purge_expired();
767 if purged > 0 {
768 debug!(purged, "Purged expired COV subscriptions");
769 }
770 }
771 });
772
773 let fault_detection_task = if config.enable_fault_detection {
774 let db_fault = Arc::clone(&db);
775 Some(tokio::spawn(async move {
776 let detector = crate::fault_detection::FaultDetector::default();
777 let mut interval = tokio::time::interval(Duration::from_secs(10));
778 loop {
779 interval.tick().await;
780 let mut db_guard = db_fault.write().await;
781 let changes = detector.evaluate(&mut db_guard);
782 for change in &changes {
783 debug!(
784 object = %change.object_id,
785 old = change.old_reliability,
786 new = change.new_reliability,
787 "Fault detection: reliability changed"
788 );
789 }
790 }
791 }))
792 } else {
793 None
794 };
795
796 let event_enrollment_task = if config.enable_fault_detection {
797 let db_ee = Arc::clone(&db);
798 Some(tokio::spawn(async move {
799 let mut interval = tokio::time::interval(Duration::from_secs(10));
800 loop {
801 interval.tick().await;
802 let mut db_guard = db_ee.write().await;
803 let transitions =
804 crate::event_enrollment::evaluate_event_enrollments(&mut db_guard);
805 for t in &transitions {
806 debug!(
807 enrollment = %t.enrollment_oid,
808 monitored = %t.monitored_oid,
809 from = ?t.change.from,
810 to = ?t.change.to,
811 "Event enrollment: state changed"
812 );
813 }
814 }
815 }))
816 } else {
817 None
818 };
819
820 let db_trend = Arc::clone(&db);
821 let trend_log_state: crate::trend_log::TrendLogState =
822 Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new()));
823 let trend_log_task = Some(tokio::spawn(async move {
824 let mut interval = tokio::time::interval(Duration::from_secs(1));
825 loop {
826 interval.tick().await;
827 crate::trend_log::poll_trend_logs(&db_trend, &trend_log_state).await;
828 }
829 }));
830
831 let db_schedule = Arc::clone(&db);
832 let schedule_tick_task = Some(tokio::spawn(async move {
833 let mut interval = tokio::time::interval(Duration::from_secs(60));
834 loop {
835 interval.tick().await;
836 crate::schedule::tick_schedules(&db_schedule, 0).await;
838 }
839 }));
840
841 Ok(Self {
842 config,
843 network,
844 db,
845 cov_table,
846 seg_ack_senders,
847 cov_in_flight,
848 server_tsm,
849 comm_state,
850 dcc_timer,
851 dispatch_task: Some(dispatch_task),
852 cov_purge_task: Some(cov_purge_task),
853 fault_detection_task,
854 event_enrollment_task,
855 trend_log_task,
856 schedule_tick_task,
857 local_mac,
858 })
859 }
860
861 pub fn local_mac(&self) -> &[u8] {
863 &self.local_mac
864 }
865
866 pub fn database(&self) -> &Arc<RwLock<ObjectDatabase>> {
868 &self.db
869 }
870
871 pub fn comm_state(&self) -> u8 {
875 self.comm_state.load(Ordering::Acquire)
876 }
877
878 pub async fn generate_pics(&self, pics_config: &crate::pics::PicsConfig) -> crate::pics::Pics {
883 let db = self.db.read().await;
884 crate::pics::PicsGenerator::new(&db, &self.config, pics_config).generate()
885 }
886
887 pub async fn stop(&mut self) -> Result<(), Error> {
889 if let Some(task) = self.fault_detection_task.take() {
890 task.abort();
891 let _ = task.await;
892 }
893 if let Some(task) = self.event_enrollment_task.take() {
894 task.abort();
895 let _ = task.await;
896 }
897 if let Some(task) = self.trend_log_task.take() {
898 task.abort();
899 let _ = task.await;
900 }
901 if let Some(task) = self.schedule_tick_task.take() {
902 task.abort();
903 let _ = task.await;
904 }
905 if let Some(task) = self.cov_purge_task.take() {
906 task.abort();
907 let _ = task.await;
908 }
909 if let Some(task) = self.dispatch_task.take() {
910 task.abort();
911 let _ = task.await;
912 }
913 Ok(())
914 }
915
916 #[allow(clippy::too_many_arguments)]
927 async fn dispatch(
928 db: &Arc<RwLock<ObjectDatabase>>,
929 network: &Arc<NetworkLayer<T>>,
930 cov_table: &Arc<RwLock<CovSubscriptionTable>>,
931 seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
932 cov_in_flight: &Arc<Semaphore>,
933 server_tsm: &Arc<Mutex<ServerTsm>>,
934 comm_state: &Arc<AtomicU8>,
935 dcc_timer: &Arc<Mutex<Option<JoinHandle<()>>>>,
936 config: &Arc<ServerConfig>,
937 source_mac: &[u8],
938 apdu: Apdu,
939 mut received: bacnet_network::layer::ReceivedApdu,
940 ) {
941 match apdu {
942 Apdu::ConfirmedRequest(req) => {
943 let reply_tx = received.reply_tx.take();
944 let db = Arc::clone(db);
945 let network = Arc::clone(network);
946 let cov_table = Arc::clone(cov_table);
947 let seg_ack_senders = Arc::clone(seg_ack_senders);
948 let cov_in_flight = Arc::clone(cov_in_flight);
949 let server_tsm = Arc::clone(server_tsm);
950 let comm_state = Arc::clone(comm_state);
951 let dcc_timer = Arc::clone(dcc_timer);
952 let config = Arc::clone(config);
953 let source_mac = MacAddr::from_slice(source_mac);
954 tokio::spawn(async move {
955 Self::handle_confirmed_request(
956 &db,
957 &network,
958 &cov_table,
959 &seg_ack_senders,
960 &cov_in_flight,
961 &server_tsm,
962 &comm_state,
963 &dcc_timer,
964 &config,
965 &source_mac,
966 req,
967 reply_tx,
968 )
969 .await;
970 });
971 }
972 Apdu::UnconfirmedRequest(req) => {
973 let db = Arc::clone(db);
974 let network = Arc::clone(network);
975 let config = Arc::clone(config);
976 let comm_state = Arc::clone(comm_state);
977 tokio::spawn(async move {
978 Self::handle_unconfirmed_request(
979 &db,
980 &network,
981 &config,
982 &comm_state,
983 req,
984 &received,
985 )
986 .await;
987 });
988 }
989 Apdu::SimpleAck(sa) => {
991 let mut tsm = server_tsm.lock().await;
992 tsm.record_result(sa.invoke_id, CovAckResult::Ack);
993 debug!(
994 invoke_id = sa.invoke_id,
995 "SimpleAck received for outgoing confirmed notification"
996 );
997 }
998 Apdu::Error(err) => {
999 let mut tsm = server_tsm.lock().await;
1000 tsm.record_result(err.invoke_id, CovAckResult::Error);
1001 debug!(
1002 invoke_id = err.invoke_id,
1003 error_class = err.error_class.to_raw(),
1004 error_code = err.error_code.to_raw(),
1005 "Error received for outgoing confirmed notification"
1006 );
1007 }
1008 Apdu::Reject(rej) => {
1009 let mut tsm = server_tsm.lock().await;
1010 tsm.record_result(rej.invoke_id, CovAckResult::Error);
1011 debug!(
1012 invoke_id = rej.invoke_id,
1013 "Reject received for outgoing confirmed notification"
1014 );
1015 }
1016 Apdu::Abort(abort) if !abort.sent_by_server => {
1017 let mut tsm = server_tsm.lock().await;
1018 tsm.record_result(abort.invoke_id, CovAckResult::Error);
1019 debug!(
1020 invoke_id = abort.invoke_id,
1021 "Abort received for outgoing confirmed notification"
1022 );
1023 }
1024 Apdu::SegmentAck(sa) => {
1025 let key = (MacAddr::from_slice(source_mac), sa.invoke_id);
1026 let senders = seg_ack_senders.lock().await;
1027 if let Some(tx) = senders.get(&key) {
1028 let _ = tx.try_send(sa);
1029 } else {
1030 debug!(
1031 invoke_id = sa.invoke_id,
1032 "Server ignoring SegmentAck for unknown transaction"
1033 );
1034 }
1035 }
1036 _ => {
1037 debug!("Server ignoring unhandled APDU type");
1038 }
1039 }
1040 }
1041
1042 #[allow(clippy::too_many_arguments)]
1044 async fn handle_confirmed_request(
1045 db: &Arc<RwLock<ObjectDatabase>>,
1046 network: &Arc<NetworkLayer<T>>,
1047 cov_table: &Arc<RwLock<CovSubscriptionTable>>,
1048 seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
1049 cov_in_flight: &Arc<Semaphore>,
1050 server_tsm: &Arc<Mutex<ServerTsm>>,
1051 comm_state: &Arc<AtomicU8>,
1052 dcc_timer: &Arc<Mutex<Option<JoinHandle<()>>>>,
1053 config: &ServerConfig,
1054 source_mac: &[u8],
1055 req: bacnet_encoding::apdu::ConfirmedRequest,
1056 reply_tx: Option<tokio::sync::oneshot::Sender<Bytes>>,
1057 ) {
1058 let invoke_id = req.invoke_id;
1059 let service_choice = req.service_choice;
1060 let client_max_apdu = req.max_apdu_length;
1061 let client_accepts_segmented = req.segmented_response_accepted;
1062 let mut written_oids: Vec<ObjectIdentifier> = Vec::new();
1063
1064 let state = comm_state.load(Ordering::Acquire);
1065 if state == 1
1066 && service_choice != ConfirmedServiceChoice::DEVICE_COMMUNICATION_CONTROL
1067 && service_choice != ConfirmedServiceChoice::REINITIALIZE_DEVICE
1068 {
1069 debug!(
1070 service = service_choice.to_raw(),
1071 "DCC DISABLE: dropping confirmed request"
1072 );
1073 return;
1074 }
1075
1076 let complex_ack = |ack_buf: BytesMut| -> Apdu {
1077 Apdu::ComplexAck(ComplexAck {
1078 segmented: false,
1079 more_follows: false,
1080 invoke_id,
1081 sequence_number: None,
1082 proposed_window_size: None,
1083 service_choice,
1084 service_ack: ack_buf.freeze(),
1085 })
1086 };
1087 let simple_ack = || -> Apdu {
1088 Apdu::SimpleAck(SimpleAck {
1089 invoke_id,
1090 service_choice,
1091 })
1092 };
1093
1094 let mut ack_buf = BytesMut::with_capacity(512);
1095 let response = match service_choice {
1096 s if s == ConfirmedServiceChoice::READ_PROPERTY => {
1097 let db = db.read().await;
1098 match handlers::handle_read_property(&db, &req.service_request, &mut ack_buf) {
1099 Ok(()) => complex_ack(ack_buf),
1100 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1101 }
1102 }
1103 s if s == ConfirmedServiceChoice::WRITE_PROPERTY => {
1104 let result = {
1105 let mut db = db.write().await;
1106 handlers::handle_write_property(&mut db, &req.service_request)
1107 };
1108 match result {
1109 Ok(oid) => {
1110 written_oids.push(oid);
1111 simple_ack()
1112 }
1113 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1114 }
1115 }
1116 s if s == ConfirmedServiceChoice::READ_PROPERTY_MULTIPLE => {
1117 let db = db.read().await;
1118 match handlers::handle_read_property_multiple(
1119 &db,
1120 &req.service_request,
1121 &mut ack_buf,
1122 ) {
1123 Ok(()) => complex_ack(ack_buf),
1124 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1125 }
1126 }
1127 s if s == ConfirmedServiceChoice::WRITE_PROPERTY_MULTIPLE => {
1128 let result = {
1129 let mut db = db.write().await;
1130 handlers::handle_write_property_multiple(&mut db, &req.service_request)
1131 };
1132 match result {
1133 Ok(oids) => {
1134 written_oids = oids;
1135 simple_ack()
1136 }
1137 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1138 }
1139 }
1140 s if s == ConfirmedServiceChoice::SUBSCRIBE_COV => {
1141 let db = db.read().await;
1142 let mut table = cov_table.write().await;
1143 match handlers::handle_subscribe_cov(
1144 &mut table,
1145 &db,
1146 source_mac,
1147 &req.service_request,
1148 ) {
1149 Ok(()) => simple_ack(),
1150 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1151 }
1152 }
1153 s if s == ConfirmedServiceChoice::SUBSCRIBE_COV_PROPERTY => {
1154 let db = db.read().await;
1155 let mut table = cov_table.write().await;
1156 match handlers::handle_subscribe_cov_property(
1157 &mut table,
1158 &db,
1159 source_mac,
1160 &req.service_request,
1161 ) {
1162 Ok(()) => simple_ack(),
1163 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1164 }
1165 }
1166 s if s == ConfirmedServiceChoice::CREATE_OBJECT => {
1167 let result = {
1168 let mut db = db.write().await;
1169 handlers::handle_create_object(&mut db, &req.service_request, &mut ack_buf)
1170 };
1171 match result {
1172 Ok(()) => complex_ack(ack_buf),
1173 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1174 }
1175 }
1176 s if s == ConfirmedServiceChoice::DELETE_OBJECT => {
1177 let deleted_oid =
1178 bacnet_services::object_mgmt::DeleteObjectRequest::decode(&req.service_request)
1179 .ok()
1180 .map(|r| r.object_identifier);
1181
1182 let result = {
1183 let mut db = db.write().await;
1184 handlers::handle_delete_object(&mut db, &req.service_request)
1185 };
1186 match result {
1187 Ok(()) => {
1188 if let Some(oid) = deleted_oid {
1190 let mut table = cov_table.write().await;
1191 table.remove_for_object(oid);
1192 }
1193 simple_ack()
1194 }
1195 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1196 }
1197 }
1198 s if s == ConfirmedServiceChoice::DEVICE_COMMUNICATION_CONTROL => {
1199 match handlers::handle_device_communication_control(
1200 &req.service_request,
1201 comm_state,
1202 &config.dcc_password,
1203 ) {
1204 Ok((_state, duration)) => {
1205 if let Some(prev) = dcc_timer.lock().await.take() {
1206 prev.abort();
1207 }
1208 if let Some(minutes) = duration {
1209 let comm = Arc::clone(comm_state);
1210 let handle = tokio::spawn(async move {
1211 tokio::time::sleep(std::time::Duration::from_secs(
1212 minutes as u64 * 60,
1213 ))
1214 .await;
1215 comm.store(0, Ordering::Release);
1216 tracing::debug!(
1217 "DCC timer expired after {} min, state reverted to ENABLE",
1218 minutes
1219 );
1220 });
1221 *dcc_timer.lock().await = Some(handle);
1222 }
1223 simple_ack()
1224 }
1225 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1226 }
1227 }
1228 s if s == ConfirmedServiceChoice::REINITIALIZE_DEVICE => {
1229 match handlers::handle_reinitialize_device(
1230 &req.service_request,
1231 &config.reinit_password,
1232 ) {
1233 Ok(()) => simple_ack(),
1234 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1235 }
1236 }
1237 s if s == ConfirmedServiceChoice::GET_EVENT_INFORMATION => {
1238 let db = db.read().await;
1239 match handlers::handle_get_event_information(
1240 &db,
1241 &req.service_request,
1242 &mut ack_buf,
1243 ) {
1244 Ok(()) => complex_ack(ack_buf),
1245 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1246 }
1247 }
1248 s if s == ConfirmedServiceChoice::ACKNOWLEDGE_ALARM => {
1249 let mut db = db.write().await;
1250 match handlers::handle_acknowledge_alarm(&mut db, &req.service_request) {
1251 Ok(()) => simple_ack(),
1252 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1253 }
1254 }
1255 s if s == ConfirmedServiceChoice::READ_RANGE => {
1256 let db = db.read().await;
1257 match handlers::handle_read_range(&db, &req.service_request, &mut ack_buf) {
1258 Ok(()) => complex_ack(ack_buf),
1259 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1260 }
1261 }
1262 s if s == ConfirmedServiceChoice::ATOMIC_READ_FILE => {
1263 let db = db.read().await;
1264 match handlers::handle_atomic_read_file(&db, &req.service_request, &mut ack_buf) {
1265 Ok(()) => complex_ack(ack_buf),
1266 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1267 }
1268 }
1269 s if s == ConfirmedServiceChoice::ATOMIC_WRITE_FILE => {
1270 let result = {
1271 let mut db = db.write().await;
1272 handlers::handle_atomic_write_file(&mut db, &req.service_request, &mut ack_buf)
1273 };
1274 match result {
1275 Ok(()) => complex_ack(ack_buf),
1276 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1277 }
1278 }
1279 s if s == ConfirmedServiceChoice::ADD_LIST_ELEMENT => {
1280 let mut db = db.write().await;
1281 match handlers::handle_add_list_element(&mut db, &req.service_request) {
1282 Ok(()) => simple_ack(),
1283 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1284 }
1285 }
1286 s if s == ConfirmedServiceChoice::REMOVE_LIST_ELEMENT => {
1287 let mut db = db.write().await;
1288 match handlers::handle_remove_list_element(&mut db, &req.service_request) {
1289 Ok(()) => simple_ack(),
1290 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1291 }
1292 }
1293 s if s == ConfirmedServiceChoice::GET_ALARM_SUMMARY => {
1294 let mut buf = BytesMut::new();
1295 let db = db.read().await;
1296 match handlers::handle_get_alarm_summary(&db, &mut buf) {
1297 Ok(()) => complex_ack(buf),
1298 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1299 }
1300 }
1301 s if s == ConfirmedServiceChoice::GET_ENROLLMENT_SUMMARY => {
1302 let mut buf = BytesMut::new();
1303 let db = db.read().await;
1304 match handlers::handle_get_enrollment_summary(&db, &req.service_request, &mut buf) {
1305 Ok(()) => complex_ack(buf),
1306 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1307 }
1308 }
1309 s if s == ConfirmedServiceChoice::CONFIRMED_TEXT_MESSAGE => {
1310 match handlers::handle_text_message(&req.service_request) {
1311 Ok(_msg) => simple_ack(),
1312 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1313 }
1314 }
1315 s if s == ConfirmedServiceChoice::LIFE_SAFETY_OPERATION => {
1316 match handlers::handle_life_safety_operation(&req.service_request) {
1317 Ok(()) => simple_ack(),
1318 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1319 }
1320 }
1321 s if s == ConfirmedServiceChoice::SUBSCRIBE_COV_PROPERTY_MULTIPLE => {
1322 let db = db.read().await;
1323 let mut table = cov_table.write().await;
1324 match handlers::handle_subscribe_cov_property_multiple(
1325 &mut table,
1326 &db,
1327 source_mac,
1328 &req.service_request,
1329 ) {
1330 Ok(()) => simple_ack(),
1331 Err(e) => Self::error_apdu_from_error(invoke_id, service_choice, &e),
1332 }
1333 }
1334 _ => {
1335 debug!(
1336 service = service_choice.to_raw(),
1337 "Unsupported confirmed service"
1338 );
1339 Apdu::Reject(RejectPdu {
1340 invoke_id,
1341 reject_reason: RejectReason::UNRECOGNIZED_SERVICE,
1342 })
1343 }
1344 };
1345
1346 if let Apdu::ComplexAck(ref ack) = response {
1347 let mut full_buf = BytesMut::new();
1348 encode_apdu(&mut full_buf, &response);
1349
1350 if full_buf.len() > client_max_apdu as usize {
1351 if !client_accepts_segmented {
1352 let abort = Apdu::Abort(AbortPdu {
1353 sent_by_server: true,
1354 invoke_id,
1355 abort_reason: AbortReason::SEGMENTATION_NOT_SUPPORTED,
1356 });
1357 let mut buf = BytesMut::new();
1358 encode_apdu(&mut buf, &abort);
1359 if let Err(e) = network
1360 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
1361 .await
1362 {
1363 warn!(error = %e, "Failed to send Abort for segmentation-not-supported");
1364 }
1365 } else {
1366 let network = Arc::clone(network);
1367 let seg_ack_senders = Arc::clone(seg_ack_senders);
1368 let source_mac = MacAddr::from_slice(source_mac);
1369 let service_ack_data = ack.service_ack.clone();
1370 tokio::spawn(async move {
1371 Self::send_segmented_complex_ack(
1372 &network,
1373 &seg_ack_senders,
1374 &source_mac,
1375 invoke_id,
1376 service_choice,
1377 &service_ack_data,
1378 client_max_apdu,
1379 )
1380 .await;
1381 });
1382 }
1383
1384 for oid in &written_oids {
1385 Self::fire_event_notifications(db, network, comm_state, server_tsm, oid).await;
1386 }
1387 for oid in &written_oids {
1388 Self::fire_cov_notifications(
1389 db,
1390 network,
1391 cov_table,
1392 cov_in_flight,
1393 server_tsm,
1394 comm_state,
1395 config,
1396 oid,
1397 )
1398 .await;
1399 }
1400 return;
1401 }
1402 }
1403
1404 let mut buf = BytesMut::new();
1405 encode_apdu(&mut buf, &response);
1406
1407 if let Some(tx) = reply_tx {
1408 use bacnet_encoding::npdu::{encode_npdu, Npdu};
1409 let apdu_bytes = buf.freeze();
1410 let npdu = Npdu {
1411 is_network_message: false,
1412 expecting_reply: false,
1413 priority: NetworkPriority::NORMAL,
1414 destination: None,
1415 source: None,
1416 payload: apdu_bytes.clone(),
1417 ..Npdu::default()
1418 };
1419 let mut npdu_buf = BytesMut::with_capacity(2 + apdu_bytes.len());
1420 match encode_npdu(&mut npdu_buf, &npdu) {
1421 Ok(()) => {
1422 let _ = tx.send(npdu_buf.freeze());
1423 }
1424 Err(e) => {
1425 warn!(error = %e, "Failed to encode NPDU for MS/TP reply");
1426 if let Err(e) = network
1427 .send_apdu(&apdu_bytes, source_mac, false, NetworkPriority::NORMAL)
1428 .await
1429 {
1430 warn!(error = %e, "Failed to send response");
1431 }
1432 }
1433 }
1434 } else if let Err(e) = network
1435 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
1436 .await
1437 {
1438 warn!(error = %e, "Failed to send response");
1439 }
1440
1441 for oid in &written_oids {
1442 Self::fire_event_notifications(db, network, comm_state, server_tsm, oid).await;
1443 }
1444
1445 for oid in &written_oids {
1446 Self::fire_cov_notifications(
1447 db,
1448 network,
1449 cov_table,
1450 cov_in_flight,
1451 server_tsm,
1452 comm_state,
1453 config,
1454 oid,
1455 )
1456 .await;
1457 }
1458 }
1459
1460 async fn send_segmented_complex_ack(
1466 network: &Arc<NetworkLayer<T>>,
1467 seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
1468 source_mac: &[u8],
1469 invoke_id: u8,
1470 service_choice: ConfirmedServiceChoice,
1471 service_ack_data: &[u8],
1472 client_max_apdu: u16,
1473 ) {
1474 let max_seg_size = max_segment_payload(client_max_apdu, SegmentedPduType::ComplexAck);
1475 let segments = split_payload(service_ack_data, max_seg_size);
1476 let total_segments = segments.len();
1477
1478 if total_segments > 255 {
1479 warn!(
1480 total_segments,
1481 "Response requires too many segments, aborting"
1482 );
1483 let abort = Apdu::Abort(AbortPdu {
1484 sent_by_server: true,
1485 invoke_id,
1486 abort_reason: AbortReason::BUFFER_OVERFLOW,
1487 });
1488 let mut buf = BytesMut::new();
1489 encode_apdu(&mut buf, &abort);
1490 let _ = network
1491 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
1492 .await;
1493 return;
1494 }
1495
1496 debug!(
1497 total_segments,
1498 max_seg_size,
1499 payload_len = service_ack_data.len(),
1500 "Starting segmented ComplexAck send"
1501 );
1502
1503 let (seg_ack_tx, mut seg_ack_rx) = mpsc::channel(16);
1504 let key = (MacAddr::from_slice(source_mac), invoke_id);
1505 {
1506 seg_ack_senders.lock().await.insert(key.clone(), seg_ack_tx);
1507 }
1508
1509 let seg_timeout = Duration::from_secs(5);
1510 let mut seg_idx: usize = 0;
1511 let mut neg_ack_retries: u8 = 0;
1512
1513 while seg_idx < total_segments {
1514 let is_last = seg_idx == total_segments - 1;
1515
1516 let pdu = Apdu::ComplexAck(ComplexAck {
1517 segmented: true,
1518 more_follows: !is_last,
1519 invoke_id,
1520 sequence_number: Some(seg_idx as u8),
1521 proposed_window_size: Some(1),
1522 service_choice,
1523 service_ack: segments[seg_idx].clone(),
1524 });
1525
1526 let mut buf = BytesMut::with_capacity(client_max_apdu as usize);
1527 encode_apdu(&mut buf, &pdu);
1528
1529 if let Err(e) = network
1530 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
1531 .await
1532 {
1533 warn!(error = %e, seq = seg_idx, "Failed to send segment");
1534 break;
1535 }
1536
1537 debug!(seq = seg_idx, is_last, "Sent ComplexAck segment");
1538
1539 if !is_last {
1540 match tokio::time::timeout(seg_timeout, seg_ack_rx.recv()).await {
1541 Ok(Some(ack)) => {
1542 debug!(
1543 seq = ack.sequence_number,
1544 negative = ack.negative_ack,
1545 "Received SegmentAck for ComplexAck"
1546 );
1547 if ack.negative_ack {
1548 neg_ack_retries += 1;
1549 if neg_ack_retries > MAX_NEG_SEGMENT_ACK_RETRIES {
1550 warn!(
1551 invoke_id,
1552 retries = neg_ack_retries,
1553 "Too many negative SegmentAck retries, aborting segmented send"
1554 );
1555 let abort = Apdu::Abort(AbortPdu {
1556 sent_by_server: true,
1557 invoke_id,
1558 abort_reason: AbortReason::TSM_TIMEOUT,
1559 });
1560 let mut abort_buf = BytesMut::new();
1561 encode_apdu(&mut abort_buf, &abort);
1562 let _ = network
1563 .send_apdu(
1564 &abort_buf,
1565 source_mac,
1566 false,
1567 NetworkPriority::NORMAL,
1568 )
1569 .await;
1570 break;
1571 }
1572 let requested = ack.sequence_number as usize;
1573 if requested >= total_segments {
1574 tracing::warn!(
1575 seq = requested,
1576 total = total_segments,
1577 "negative SegmentAck requests out-of-range sequence, aborting"
1578 );
1579 break;
1580 }
1581 debug!(
1582 seq = ack.sequence_number,
1583 "Negative SegmentAck — retransmitting from requested sequence"
1584 );
1585 seg_idx = requested;
1586 continue;
1587 }
1588 }
1589 Ok(None) => {
1590 warn!("SegmentAck channel closed during segmented send");
1591 break;
1592 }
1593 Err(_) => {
1594 warn!(
1595 seq = seg_idx,
1596 "Timeout waiting for SegmentAck, aborting segmented send"
1597 );
1598 let abort = Apdu::Abort(AbortPdu {
1599 sent_by_server: true,
1600 invoke_id,
1601 abort_reason: AbortReason::TSM_TIMEOUT,
1602 });
1603 let mut abort_buf = BytesMut::new();
1604 encode_apdu(&mut abort_buf, &abort);
1605 let _ = network
1606 .send_apdu(&abort_buf, source_mac, false, NetworkPriority::NORMAL)
1607 .await;
1608 break;
1609 }
1610 }
1611 }
1612
1613 seg_idx += 1;
1614 }
1615
1616 match tokio::time::timeout(seg_timeout, seg_ack_rx.recv()).await {
1617 Ok(Some(_ack)) => {
1618 debug!("Received final SegmentAck for ComplexAck");
1619 }
1620 _ => {
1621 warn!("No final SegmentAck received for ComplexAck");
1622 }
1623 }
1624
1625 seg_ack_senders.lock().await.remove(&key);
1626 }
1627
1628 async fn handle_unconfirmed_request(
1630 db: &Arc<RwLock<ObjectDatabase>>,
1631 network: &Arc<NetworkLayer<T>>,
1632 config: &ServerConfig,
1633 comm_state: &Arc<AtomicU8>,
1634 req: UnconfirmedRequestPdu,
1635 received: &bacnet_network::layer::ReceivedApdu,
1636 ) {
1637 let comm = comm_state.load(Ordering::Acquire);
1638 if comm == 1 {
1639 tracing::debug!("Dropping unconfirmed service: DCC is DISABLE");
1640 return;
1641 }
1642
1643 if req.service_choice == UnconfirmedServiceChoice::WHO_IS {
1644 let who_is = match WhoIsRequest::decode(&req.service_request) {
1645 Ok(r) => r,
1646 Err(e) => {
1647 warn!(error = %e, "Failed to decode WhoIs");
1648 return;
1649 }
1650 };
1651
1652 let db = db.read().await;
1653 let device_oid = db
1654 .list_objects()
1655 .into_iter()
1656 .find(|oid| oid.object_type() == ObjectType::DEVICE);
1657
1658 if let Some(device_oid) = device_oid {
1659 let instance = device_oid.instance_number();
1660
1661 let in_range = match (who_is.low_limit, who_is.high_limit) {
1662 (Some(low), Some(high)) => instance >= low && instance <= high,
1663 _ => true,
1664 };
1665
1666 if in_range {
1667 let i_am = IAmRequest {
1668 object_identifier: device_oid,
1669 max_apdu_length: config.max_apdu_length,
1670 segmentation_supported: config.segmentation_supported,
1671 vendor_id: config.vendor_id,
1672 };
1673
1674 let mut service_buf = BytesMut::new();
1675 i_am.encode(&mut service_buf);
1676
1677 let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
1678 service_choice: UnconfirmedServiceChoice::I_AM,
1679 service_request: Bytes::from(service_buf.to_vec()),
1680 });
1681
1682 let mut buf = BytesMut::new();
1683 encode_apdu(&mut buf, &pdu);
1684
1685 if let Some(ref source_net) = received.source_network {
1686 if let Err(e) = network
1687 .send_apdu_routed(
1688 &buf,
1689 source_net.network,
1690 &source_net.mac_address,
1691 &received.source_mac,
1692 false,
1693 NetworkPriority::NORMAL,
1694 )
1695 .await
1696 {
1697 warn!(error = %e, "Failed to route IAm back to remote requester");
1698 }
1699 } else if let Err(e) = network
1700 .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1701 .await
1702 {
1703 warn!(error = %e, "Failed to send IAm broadcast");
1704 }
1705 }
1706 }
1707 } else if req.service_choice == UnconfirmedServiceChoice::WHO_HAS {
1708 let db = db.read().await;
1709 let device_oid = db
1710 .list_objects()
1711 .into_iter()
1712 .find(|oid| oid.object_type() == ObjectType::DEVICE);
1713
1714 if let Some(device_oid) = device_oid {
1715 match handlers::handle_who_has(&db, &req.service_request, device_oid) {
1716 Ok(Some(i_have)) => {
1717 let mut service_buf = BytesMut::new();
1718 if let Err(e) = i_have.encode(&mut service_buf) {
1719 warn!(error = %e, "Failed to encode IHave");
1720 } else {
1721 let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
1722 service_choice: UnconfirmedServiceChoice::I_HAVE,
1723 service_request: Bytes::from(service_buf.to_vec()),
1724 });
1725
1726 let mut buf = BytesMut::new();
1727 encode_apdu(&mut buf, &pdu);
1728
1729 if let Err(e) = network
1730 .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1731 .await
1732 {
1733 warn!(error = %e, "Failed to send IHave broadcast");
1734 }
1735 }
1736 }
1737 Ok(None) => {}
1738 Err(e) => {
1739 warn!(error = %e, "Failed to decode WhoHas");
1740 }
1741 }
1742 }
1743 } else if req.service_choice == UnconfirmedServiceChoice::TIME_SYNCHRONIZATION
1744 || req.service_choice == UnconfirmedServiceChoice::UTC_TIME_SYNCHRONIZATION
1745 {
1746 debug!("Received time synchronization request");
1747 if let Some(ref callback) = config.on_time_sync {
1748 let data = TimeSyncData {
1749 raw_service_data: req.service_request.clone(),
1750 is_utc: req.service_choice
1751 == UnconfirmedServiceChoice::UTC_TIME_SYNCHRONIZATION,
1752 };
1753 callback(data);
1754 }
1755 } else if req.service_choice == UnconfirmedServiceChoice::WRITE_GROUP {
1756 match handlers::handle_write_group(&req.service_request) {
1757 Ok(write_group) => {
1758 debug!(
1759 group = write_group.group_number,
1760 priority = write_group.write_priority,
1761 values = write_group.change_list.len(),
1762 "WriteGroup received"
1763 );
1764 }
1765 Err(e) => {
1766 debug!(error = %e, "WriteGroup decode failed");
1767 }
1768 }
1769 } else if req.service_choice == UnconfirmedServiceChoice::UNCONFIRMED_TEXT_MESSAGE {
1770 match handlers::handle_text_message(&req.service_request) {
1771 Ok(msg) => {
1772 debug!(
1773 source = ?msg.source_device,
1774 priority = ?msg.message_priority,
1775 "UnconfirmedTextMessage: {}",
1776 msg.message
1777 );
1778 }
1779 Err(e) => {
1780 debug!(error = %e, "UnconfirmedTextMessage decode failed");
1781 }
1782 }
1783 } else {
1784 debug!(
1785 service = req.service_choice.to_raw(),
1786 "Ignoring unsupported unconfirmed service"
1787 );
1788 }
1789 }
1790
1791 async fn fire_event_notifications(
1795 db: &Arc<RwLock<ObjectDatabase>>,
1796 network: &Arc<NetworkLayer<T>>,
1797 comm_state: &Arc<AtomicU8>,
1798 server_tsm: &Arc<Mutex<ServerTsm>>,
1799 oid: &ObjectIdentifier,
1800 ) {
1801 if comm_state.load(Ordering::Acquire) >= 1 {
1802 return;
1803 }
1804
1805 let now = std::time::SystemTime::now()
1806 .duration_since(std::time::UNIX_EPOCH)
1807 .unwrap_or_default();
1808 let total_secs = now.as_secs();
1809 let dow = ((total_secs / 86400 + 3) % 7) as u8;
1810 let today_bit = 1u8 << dow;
1811 let day_secs = (total_secs % 86400) as u32;
1812 let current_time = Time {
1813 hour: (day_secs / 3600) as u8,
1814 minute: ((day_secs % 3600) / 60) as u8,
1815 second: (day_secs % 60) as u8,
1816 hundredths: (now.subsec_millis() / 10) as u8,
1817 };
1818
1819 let (notification, recipients) = {
1820 let mut db = db.write().await;
1821
1822 let device_oid = db
1823 .list_objects()
1824 .into_iter()
1825 .find(|o| o.object_type() == ObjectType::DEVICE)
1826 .unwrap_or_else(|| ObjectIdentifier::new(ObjectType::DEVICE, 0).unwrap());
1827
1828 let object = match db.get_mut(oid) {
1829 Some(o) => o,
1830 None => return,
1831 };
1832
1833 let change = match object.evaluate_intrinsic_reporting() {
1834 Some(c) => c,
1835 None => return,
1836 };
1837
1838 let notification_class = object
1839 .read_property(PropertyIdentifier::NOTIFICATION_CLASS, None)
1840 .ok()
1841 .and_then(|v| match v {
1842 PropertyValue::Unsigned(n) => Some(n as u32),
1843 _ => None,
1844 })
1845 .unwrap_or(0);
1846
1847 let notify_type = object
1848 .read_property(PropertyIdentifier::NOTIFY_TYPE, None)
1849 .ok()
1850 .and_then(|v| match v {
1851 PropertyValue::Enumerated(n) => Some(n),
1852 _ => None,
1853 })
1854 .unwrap_or(NotifyType::ALARM.to_raw());
1855
1856 let priority = if change.to == bacnet_types::enums::EventState::NORMAL {
1857 200u8
1858 } else {
1859 100u8
1860 };
1861
1862 let transition = change.transition();
1863
1864 let base_notification = EventNotificationRequest {
1865 process_identifier: 0,
1866 initiating_device_identifier: device_oid,
1867 event_object_identifier: *oid,
1868 timestamp: BACnetTimeStamp::SequenceNumber(total_secs),
1869 notification_class,
1870 priority,
1871 event_type: change.event_type().to_raw(),
1872 message_text: None,
1873 notify_type,
1874 ack_required: notify_type == NotifyType::ALARM.to_raw(),
1875 from_state: change.from.to_raw(),
1876 to_state: change.to.to_raw(),
1877 event_values: None,
1878 };
1879
1880 let recipients = get_notification_recipients(
1881 &db,
1882 notification_class,
1883 transition,
1884 today_bit,
1885 ¤t_time,
1886 );
1887
1888 (base_notification, recipients)
1889 };
1890
1891 if recipients.is_empty() {
1892 let mut service_buf = BytesMut::new();
1893 if let Err(e) = notification.encode(&mut service_buf) {
1894 warn!(error = %e, "Failed to encode EventNotification");
1895 return;
1896 }
1897
1898 let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
1899 service_choice: UnconfirmedServiceChoice::UNCONFIRMED_EVENT_NOTIFICATION,
1900 service_request: Bytes::from(service_buf.to_vec()),
1901 });
1902
1903 let mut buf = BytesMut::new();
1904 encode_apdu(&mut buf, &pdu);
1905
1906 if let Err(e) = network
1907 .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1908 .await
1909 {
1910 warn!(error = %e, "Failed to broadcast EventNotification");
1911 }
1912 } else {
1913 for (recipient, process_id, confirmed) in &recipients {
1914 let mut targeted = notification.clone();
1915 targeted.process_identifier = *process_id;
1916
1917 let mut service_buf = BytesMut::new();
1918 if let Err(e) = targeted.encode(&mut service_buf) {
1919 warn!(error = %e, "Failed to encode EventNotification");
1920 continue;
1921 }
1922
1923 let service_bytes = Bytes::from(service_buf.to_vec());
1924
1925 if *confirmed {
1926 let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
1927 segmented: false,
1928 more_follows: false,
1929 segmented_response_accepted: false,
1930 max_segments: None,
1931 max_apdu_length: 1476,
1932 invoke_id: server_tsm.lock().await.allocate().0,
1933 sequence_number: None,
1934 proposed_window_size: None,
1935 service_choice: ConfirmedServiceChoice::CONFIRMED_EVENT_NOTIFICATION,
1936 service_request: service_bytes,
1937 });
1938
1939 let mut buf = BytesMut::new();
1940 encode_apdu(&mut buf, &pdu);
1941
1942 match recipient {
1943 bacnet_types::constructed::BACnetRecipient::Address(addr) => {
1944 if let Err(e) = network
1945 .send_apdu(&buf, &addr.mac_address, true, NetworkPriority::NORMAL)
1946 .await
1947 {
1948 warn!(error = %e, "Failed to send confirmed EventNotification");
1949 }
1950 }
1951 bacnet_types::constructed::BACnetRecipient::Device(_) => {
1952 if let Err(e) = network
1953 .broadcast_apdu(&buf, true, NetworkPriority::NORMAL)
1954 .await
1955 {
1956 warn!(error = %e, "Failed to broadcast confirmed EventNotification");
1957 }
1958 }
1959 }
1960 } else {
1961 let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
1962 service_choice: UnconfirmedServiceChoice::UNCONFIRMED_EVENT_NOTIFICATION,
1963 service_request: service_bytes,
1964 });
1965
1966 let mut buf = BytesMut::new();
1967 encode_apdu(&mut buf, &pdu);
1968
1969 match recipient {
1970 bacnet_types::constructed::BACnetRecipient::Address(addr) => {
1971 if let Err(e) = network
1972 .send_apdu(&buf, &addr.mac_address, false, NetworkPriority::NORMAL)
1973 .await
1974 {
1975 warn!(
1976 error = %e,
1977 "Failed to send unconfirmed EventNotification"
1978 );
1979 }
1980 }
1981 bacnet_types::constructed::BACnetRecipient::Device(_) => {
1982 if let Err(e) = network
1983 .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1984 .await
1985 {
1986 warn!(
1987 error = %e,
1988 "Failed to broadcast unconfirmed EventNotification"
1989 );
1990 }
1991 }
1992 }
1993 }
1994 }
1995 }
1996 }
1997
1998 #[allow(clippy::too_many_arguments)]
2001 async fn fire_cov_notifications(
2002 db: &Arc<RwLock<ObjectDatabase>>,
2003 network: &Arc<NetworkLayer<T>>,
2004 cov_table: &Arc<RwLock<CovSubscriptionTable>>,
2005 cov_in_flight: &Arc<Semaphore>,
2006 server_tsm: &Arc<Mutex<ServerTsm>>,
2007 comm_state: &Arc<AtomicU8>,
2008 config: &ServerConfig,
2009 oid: &ObjectIdentifier,
2010 ) {
2011 if comm_state.load(Ordering::Acquire) >= 1 {
2012 return;
2013 }
2014 let subs: Vec<crate::cov::CovSubscription> = {
2015 let mut table = cov_table.write().await;
2016 table.subscriptions_for(oid).into_iter().cloned().collect()
2017 };
2018
2019 if subs.is_empty() {
2020 return;
2021 }
2022
2023 let (device_oid, values, current_pv, cov_increment) = {
2024 let db = db.read().await;
2025 let object = match db.get(oid) {
2026 Some(o) => o,
2027 None => return,
2028 };
2029
2030 let cov_increment = object.cov_increment();
2031
2032 let mut current_pv: Option<f32> = None;
2033 let mut values = Vec::new();
2034 if let Ok(pv) = object.read_property(PropertyIdentifier::PRESENT_VALUE, None) {
2035 if let PropertyValue::Real(v) = &pv {
2036 current_pv = Some(*v);
2037 }
2038 let mut buf = BytesMut::new();
2039 if encode_property_value(&mut buf, &pv).is_ok() {
2040 values.push(BACnetPropertyValue {
2041 property_identifier: PropertyIdentifier::PRESENT_VALUE,
2042 property_array_index: None,
2043 value: buf.to_vec(),
2044 priority: None,
2045 });
2046 }
2047 }
2048 if let Ok(sf) = object.read_property(PropertyIdentifier::STATUS_FLAGS, None) {
2049 let mut buf = BytesMut::new();
2050 if encode_property_value(&mut buf, &sf).is_ok() {
2051 values.push(BACnetPropertyValue {
2052 property_identifier: PropertyIdentifier::STATUS_FLAGS,
2053 property_array_index: None,
2054 value: buf.to_vec(),
2055 priority: None,
2056 });
2057 }
2058 }
2059
2060 let device_oid = db
2061 .list_objects()
2062 .into_iter()
2063 .find(|o| o.object_type() == ObjectType::DEVICE)
2064 .unwrap_or_else(|| ObjectIdentifier::new(ObjectType::DEVICE, 0).unwrap());
2065
2066 (device_oid, values, current_pv, cov_increment)
2067 };
2068
2069 if values.is_empty() {
2070 return;
2071 }
2072
2073 for sub in &subs {
2074 if !CovSubscriptionTable::should_notify(sub, current_pv, cov_increment) {
2075 continue;
2076 }
2077 let time_remaining = sub.expires_at.map_or(0, |exp| {
2078 exp.saturating_duration_since(Instant::now()).as_secs() as u32
2079 });
2080
2081 let notification_values = if let Some(prop) = sub.monitored_property {
2082 let db = db.read().await;
2083 if let Some(object) = db.get(oid) {
2084 if let Ok(pv) = object.read_property(prop, sub.monitored_property_array_index) {
2085 let mut buf = BytesMut::new();
2086 if encode_property_value(&mut buf, &pv).is_ok() {
2087 vec![BACnetPropertyValue {
2088 property_identifier: prop,
2089 property_array_index: sub.monitored_property_array_index,
2090 value: buf.to_vec(),
2091 priority: None,
2092 }]
2093 } else {
2094 values.clone()
2095 }
2096 } else {
2097 values.clone()
2098 }
2099 } else {
2100 values.clone()
2101 }
2102 } else {
2103 values.clone()
2104 };
2105
2106 let notification = COVNotificationRequest {
2107 subscriber_process_identifier: sub.subscriber_process_identifier,
2108 initiating_device_identifier: device_oid,
2109 monitored_object_identifier: *oid,
2110 time_remaining,
2111 list_of_values: notification_values,
2112 };
2113
2114 let mut service_buf = BytesMut::new();
2115 notification.encode(&mut service_buf);
2116
2117 if sub.issue_confirmed_notifications {
2118 let permit = match cov_in_flight.clone().try_acquire_owned() {
2119 Ok(permit) => permit,
2120 Err(_) => {
2121 warn!(
2122 object = ?oid,
2123 "255 confirmed COV notifications in-flight, skipping notification"
2124 );
2125 continue;
2126 }
2127 };
2128
2129 let (id, result_rx) = {
2130 let mut tsm = server_tsm.lock().await;
2131 tsm.allocate()
2132 };
2133
2134 let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
2135 segmented: false,
2136 more_follows: false,
2137 segmented_response_accepted: false,
2138 max_segments: None,
2139 max_apdu_length: config.max_apdu_length as u16,
2140 invoke_id: id,
2141 sequence_number: None,
2142 proposed_window_size: None,
2143 service_choice: ConfirmedServiceChoice::CONFIRMED_COV_NOTIFICATION,
2144 service_request: Bytes::from(service_buf.to_vec()),
2145 });
2146
2147 let mut buf = BytesMut::new();
2148 encode_apdu(&mut buf, &pdu);
2149
2150 if let Some(pv) = current_pv {
2151 let mut table = cov_table.write().await;
2152 table.set_last_notified_value(
2153 &sub.subscriber_mac,
2154 sub.subscriber_process_identifier,
2155 sub.monitored_object_identifier,
2156 sub.monitored_property,
2157 pv,
2158 );
2159 }
2160
2161 let network = Arc::clone(network);
2162 let mac = sub.subscriber_mac.clone();
2163 let apdu_timeout = Duration::from_millis(config.cov_retry_timeout_ms);
2164 let tsm = Arc::clone(server_tsm);
2165 let apdu_retries = DEFAULT_APDU_RETRIES;
2166 tokio::spawn(async move {
2167 let _permit = permit;
2168 let mut pending_rx: Option<oneshot::Receiver<CovAckResult>> = Some(result_rx);
2169
2170 for attempt in 0..=apdu_retries {
2171 if let Err(e) = network
2172 .send_apdu(&buf, &mac, true, NetworkPriority::NORMAL)
2173 .await
2174 {
2175 warn!(error = %e, attempt, "COV notification send failed");
2176 } else {
2177 debug!(invoke_id = id, attempt, "Confirmed COV notification sent");
2178 }
2179
2180 let rx = pending_rx
2181 .take()
2182 .expect("receiver always set for each attempt");
2183 let result = match tokio::time::timeout(apdu_timeout, rx).await {
2184 Ok(Ok(r)) => Ok(r),
2185 Ok(Err(_)) => Err(()), Err(_) => Err(()), };
2188
2189 if result.is_err() && attempt < apdu_retries {
2190 let (tx, new_rx) = oneshot::channel();
2191 tsm.lock().await.pending.insert(id, tx);
2192 pending_rx = Some(new_rx);
2193 }
2194
2195 match result {
2196 Ok(CovAckResult::Ack) => {
2197 debug!(invoke_id = id, "COV notification acknowledged");
2198 return;
2199 }
2200 Ok(CovAckResult::Error) => {
2201 warn!(invoke_id = id, "COV notification rejected by subscriber");
2202 return;
2203 }
2204 Err(_) => {
2205 if attempt < apdu_retries {
2206 debug!(
2207 invoke_id = id,
2208 attempt, "COV notification timeout, retrying"
2209 );
2210 } else {
2211 warn!(
2212 invoke_id = id,
2213 "COV notification failed after {} retries", apdu_retries
2214 );
2215 }
2216 }
2217 }
2218 }
2219
2220 let mut tsm = tsm.lock().await;
2221 tsm.remove(id);
2222 });
2223 } else {
2224 let pdu = Apdu::UnconfirmedRequest(UnconfirmedRequestPdu {
2225 service_choice: UnconfirmedServiceChoice::UNCONFIRMED_COV_NOTIFICATION,
2226 service_request: Bytes::from(service_buf.to_vec()),
2227 });
2228
2229 let mut buf = BytesMut::new();
2230 encode_apdu(&mut buf, &pdu);
2231
2232 if let Err(e) = network
2233 .send_apdu(&buf, &sub.subscriber_mac, false, NetworkPriority::NORMAL)
2234 .await
2235 {
2236 warn!(error = %e, "Failed to send COV notification");
2237 } else if let Some(pv) = current_pv {
2238 let mut table = cov_table.write().await;
2239 table.set_last_notified_value(
2240 &sub.subscriber_mac,
2241 sub.subscriber_process_identifier,
2242 sub.monitored_object_identifier,
2243 sub.monitored_property,
2244 pv,
2245 );
2246 }
2247 }
2248 }
2249 }
2250
2251 fn error_apdu_from_error(
2253 invoke_id: u8,
2254 service_choice: ConfirmedServiceChoice,
2255 error: &Error,
2256 ) -> Apdu {
2257 let (class, code) = match error {
2258 Error::Protocol { class, code } => (*class, *code),
2259 _ => (
2260 ErrorClass::SERVICES.to_raw() as u32,
2261 ErrorCode::OTHER.to_raw() as u32,
2262 ),
2263 };
2264 Apdu::Error(ErrorPdu {
2265 invoke_id,
2266 service_choice,
2267 error_class: ErrorClass::from_raw(class as u16),
2268 error_code: ErrorCode::from_raw(code as u16),
2269 error_data: Bytes::new(),
2270 })
2271 }
2272}
2273
2274#[cfg(test)]
2275mod tests {
2276 use super::*;
2277
2278 #[test]
2279 fn server_config_cov_retry_timeout_default() {
2280 let config = ServerConfig::default();
2281 assert_eq!(config.cov_retry_timeout_ms, 3000);
2282 }
2283
2284 #[test]
2285 fn server_config_time_sync_callback_default_is_none() {
2286 let config = ServerConfig::default();
2287 assert!(config.on_time_sync.is_none());
2288 }
2289
2290 #[test]
2295 fn server_tsm_allocate_increments() {
2296 let mut tsm = ServerTsm::new();
2297 assert_eq!(tsm.allocate().0, 0);
2298 assert_eq!(tsm.allocate().0, 1);
2299 assert_eq!(tsm.allocate().0, 2);
2300 }
2301
2302 #[test]
2303 fn server_tsm_allocate_wraps_at_255() {
2304 let mut tsm = ServerTsm::new();
2305 tsm.next_invoke_id = 255;
2306 assert_eq!(tsm.allocate().0, 255);
2307 assert_eq!(tsm.allocate().0, 0); }
2309
2310 #[test]
2311 fn server_tsm_record_and_take_ack() {
2312 let mut tsm = ServerTsm::new();
2313 let (_id, rx) = tsm.allocate();
2314 tsm.record_result(_id, CovAckResult::Ack);
2315 assert_eq!(rx.blocking_recv(), Ok(CovAckResult::Ack));
2317 }
2318
2319 #[test]
2320 fn server_tsm_record_and_take_error() {
2321 let mut tsm = ServerTsm::new();
2322 let (id, rx) = tsm.allocate();
2323 tsm.record_result(id, CovAckResult::Error);
2324 assert_eq!(rx.blocking_recv(), Ok(CovAckResult::Error));
2326 }
2327
2328 #[test]
2329 fn server_tsm_record_nonexistent_is_noop() {
2330 let mut tsm = ServerTsm::new();
2331 tsm.record_result(99, CovAckResult::Ack);
2333 assert!(tsm.pending.is_empty());
2334 }
2335
2336 #[test]
2337 fn server_tsm_remove_cleans_up() {
2338 let mut tsm = ServerTsm::new();
2339 let (id, _rx) = tsm.allocate();
2340 tsm.remove(id);
2341 assert!(!tsm.pending.contains_key(&id));
2342 }
2343
2344 #[test]
2345 fn server_tsm_multiple_pending() {
2346 let mut tsm = ServerTsm::new();
2347 let (id1, rx1) = tsm.allocate();
2348 let (id2, rx2) = tsm.allocate();
2349 let (id3, rx3) = tsm.allocate();
2350
2351 tsm.record_result(id2, CovAckResult::Error);
2352 tsm.record_result(id1, CovAckResult::Ack);
2353 tsm.record_result(id3, CovAckResult::Ack);
2354
2355 assert_eq!(rx2.blocking_recv(), Ok(CovAckResult::Error));
2356 assert_eq!(rx1.blocking_recv(), Ok(CovAckResult::Ack));
2357 assert_eq!(rx3.blocking_recv(), Ok(CovAckResult::Ack));
2358 }
2359
2360 #[test]
2361 fn cov_ack_result_debug_and_eq() {
2362 assert_eq!(CovAckResult::Ack, CovAckResult::Ack);
2364 assert_ne!(CovAckResult::Ack, CovAckResult::Error);
2365 let _debug = format!("{:?}", CovAckResult::Ack);
2366 }
2367
2368 #[test]
2369 fn default_apdu_retries_constant() {
2370 assert_eq!(DEFAULT_APDU_RETRIES, 3);
2371 }
2372
2373 #[test]
2374 fn seg_receiver_timeout_is_4s() {
2375 assert_eq!(SEG_RECEIVER_TIMEOUT, Duration::from_secs(4));
2376 }
2377
2378 #[test]
2379 fn max_neg_segment_ack_retries_constant() {
2380 assert_eq!(MAX_NEG_SEGMENT_ACK_RETRIES, 3);
2381 }
2382}