1use std::collections::HashMap;
7use std::net::Ipv4Addr;
8#[cfg(feature = "ipv6")]
9use std::net::Ipv6Addr;
10use std::sync::Arc;
11use std::time::Instant;
12
13use bytes::{Bytes, BytesMut};
14use tokio::sync::{broadcast, mpsc, Mutex};
15use tokio::task::JoinHandle;
16use tokio::time::{timeout, Duration};
17use tracing::{debug, warn};
18
19use bacnet_encoding::apdu::{
20 self, encode_apdu, AbortPdu, Apdu, ConfirmedRequest as ConfirmedRequestPdu,
21 SegmentAck as SegmentAckPdu, SimpleAck,
22};
23use bacnet_encoding::npdu::NpduAddress;
24use bacnet_network::layer::NetworkLayer;
25use bacnet_services::cov::COVNotificationRequest;
26use bacnet_transport::bip::BipTransport;
27#[cfg(feature = "ipv6")]
28use bacnet_transport::bip6::Bip6Transport;
29use bacnet_transport::port::TransportPort;
30use bacnet_types::enums::{ConfirmedServiceChoice, NetworkPriority, UnconfirmedServiceChoice};
31use bacnet_types::error::Error;
32use bacnet_types::MacAddr;
33
34use crate::discovery::{DeviceTable, DiscoveredDevice};
35use crate::segmentation::{max_segment_payload, split_payload, SegmentReceiver, SegmentedPduType};
36use crate::tsm::{Tsm, TsmConfig, TsmResponse};
37
/// Tunable parameters for a [`BACnetClient`].
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Local IPv4 address handed to the BACnet/IP transport.
    pub interface: Ipv4Addr,
    /// UDP port handed to the transport.
    pub port: u16,
    /// IPv4 broadcast address handed to the BACnet/IP transport.
    pub broadcast_address: Ipv4Addr,
    /// How long to wait for a reply to one transmission, in milliseconds.
    pub apdu_timeout_ms: u64,
    /// Number of retransmissions attempted after the first timeout.
    pub apdu_retries: u8,
    /// Largest APDU advertised in outgoing requests; clamped to the
    /// transport's own maximum when the client starts.
    pub max_apdu_length: u16,
    /// `max_segments` value advertised in outgoing confirmed requests.
    pub max_segments: Option<u8>,
    /// Whether segmented ComplexAcks are accepted; when `false` they are
    /// answered with an Abort.
    pub segmented_response_accepted: bool,
    /// Window size proposed when sending segmented requests.
    pub proposed_window_size: u8,
}

impl Default for ClientConfig {
    /// Builds the stock configuration: unspecified interface, port `0xBAC0`,
    /// limited broadcast, 6 s timeout with 3 retries, 1476-byte APDUs,
    /// segmented responses accepted and a window size of 1.
    fn default() -> Self {
        ClientConfig {
            // Addressing.
            interface: Ipv4Addr::UNSPECIFIED,
            broadcast_address: Ipv4Addr::BROADCAST,
            port: 0xBAC0,
            // Timing.
            apdu_retries: 3,
            apdu_timeout_ms: 6000,
            // APDU sizing and segmentation.
            max_apdu_length: 1476,
            proposed_window_size: 1,
            segmented_response_accepted: true,
            max_segments: None,
        }
    }
}
76
/// Builder for a [`BACnetClient`] over a caller-supplied transport.
pub struct ClientBuilder<T: TransportPort> {
    /// Configuration accumulated by the setter methods.
    config: ClientConfig,
    /// Transport to run on; `build` fails if this is still `None`.
    transport: Option<T>,
}
82
83impl<T: TransportPort + 'static> ClientBuilder<T> {
84 pub fn transport(mut self, transport: T) -> Self {
86 self.transport = Some(transport);
87 self
88 }
89
90 pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
92 self.config.apdu_timeout_ms = ms;
93 self
94 }
95
96 pub fn max_apdu_length(mut self, len: u16) -> Self {
98 self.config.max_apdu_length = len;
99 self
100 }
101
102 pub async fn build(self) -> Result<BACnetClient<T>, Error> {
104 let transport = self
105 .transport
106 .ok_or_else(|| Error::Encoding("transport not set on ClientBuilder".into()))?;
107 BACnetClient::start(self.config, transport).await
108 }
109}
110
/// Builder for a [`BACnetClient`] running over a `BipTransport`.
pub struct BipClientBuilder {
    /// Configuration accumulated by the setter methods; its interface, port
    /// and broadcast address are used to construct the transport in `build`.
    config: ClientConfig,
}
115
116impl BipClientBuilder {
117 pub fn interface(mut self, ip: Ipv4Addr) -> Self {
119 self.config.interface = ip;
120 self
121 }
122
123 pub fn port(mut self, port: u16) -> Self {
125 self.config.port = port;
126 self
127 }
128
129 pub fn broadcast_address(mut self, addr: Ipv4Addr) -> Self {
131 self.config.broadcast_address = addr;
132 self
133 }
134
135 pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
137 self.config.apdu_timeout_ms = ms;
138 self
139 }
140
141 pub fn max_apdu_length(mut self, len: u16) -> Self {
143 self.config.max_apdu_length = len;
144 self
145 }
146
147 pub async fn build(self) -> Result<BACnetClient<BipTransport>, Error> {
149 let transport = BipTransport::new(
150 self.config.interface,
151 self.config.port,
152 self.config.broadcast_address,
153 );
154 BACnetClient::start(self.config, transport).await
155 }
156}
157
// NOTE(review): presumably the default number of in-flight requests used by
// the multi-device batch helpers; its use site is outside this view — confirm.
const DEFAULT_BATCH_CONCURRENCY: usize = 32;
164
/// A single property read addressed to a device by instance number.
#[derive(Debug, Clone)]
pub struct DeviceReadRequest {
    /// Instance number of the device to query.
    pub device_instance: u32,
    /// Object to read from.
    pub object_identifier: bacnet_types::primitives::ObjectIdentifier,
    /// Property to read.
    pub property_identifier: bacnet_types::enums::PropertyIdentifier,
    /// Optional array index within the property.
    pub property_array_index: Option<u32>,
}
177
/// Outcome of a property read, tagged with the device it belongs to.
#[derive(Debug)]
pub struct DeviceReadResult {
    /// Instance number of the device this result is for.
    pub device_instance: u32,
    /// The decoded ReadProperty acknowledgement, or the error encountered.
    pub result: Result<bacnet_services::read_property::ReadPropertyACK, Error>,
}
186
/// A ReadPropertyMultiple request addressed to a device by instance number.
#[derive(Debug, Clone)]
pub struct DeviceRpmRequest {
    /// Instance number of the device to query.
    pub device_instance: u32,
    /// Read access specifications to include in the request.
    pub specs: Vec<bacnet_services::rpm::ReadAccessSpecification>,
}
195
/// Outcome of a ReadPropertyMultiple request, tagged with its device.
#[derive(Debug)]
pub struct DeviceRpmResult {
    /// Instance number of the device this result is for.
    pub device_instance: u32,
    /// The decoded ReadPropertyMultiple acknowledgement, or the error encountered.
    pub result: Result<bacnet_services::rpm::ReadPropertyMultipleACK, Error>,
}
204
/// A single property write addressed to a device by instance number.
#[derive(Debug, Clone)]
pub struct DeviceWriteRequest {
    /// Instance number of the device to write to.
    pub device_instance: u32,
    /// Object to write to.
    pub object_identifier: bacnet_types::primitives::ObjectIdentifier,
    /// Property to write.
    pub property_identifier: bacnet_types::enums::PropertyIdentifier,
    /// Optional array index within the property.
    pub property_array_index: Option<u32>,
    /// Raw bytes of the value to write.
    /// NOTE(review): presumably already BACnet-encoded application data — confirm against the caller.
    pub property_value: Vec<u8>,
    /// Optional write priority.
    pub priority: Option<u8>,
}
221
/// Outcome of a property write, tagged with the device it belongs to.
#[derive(Debug)]
pub struct DeviceWriteResult {
    /// Instance number of the device this result is for.
    pub device_instance: u32,
    /// `Ok(())` on success, otherwise the error encountered.
    pub result: Result<(), Error>,
}
230
/// Reassembly state for one in-progress segmented ComplexAck.
struct SegmentedReceiveState {
    /// Accumulates the segments received so far.
    receiver: SegmentReceiver,
    /// Sequence number the next in-order segment must carry.
    expected_next_seq: u8,
    /// Time the most recent segment for this session arrived; sessions idle
    /// for `SEG_RECEIVER_TIMEOUT` are dropped by the dispatch task.
    last_activity: Instant,
    /// Number of segments accepted since the last SegmentAck was sent.
    window_position: u8,
    /// Window size taken from the first segment seen for this session.
    proposed_window_size: u8,
}
243
/// Idle time after which the dispatch task discards a segmented-receive
/// session and sends an Abort to the peer.
const SEG_RECEIVER_TIMEOUT: Duration = Duration::from_secs(4);
246
/// Key identifying a segmented exchange: (peer MAC address, invoke ID).
type SegKey = (MacAddr, u8);
249
/// BACnet client bound to a transport of type `T`.
pub struct BACnetClient<T: TransportPort> {
    /// Effective configuration (APDU length already clamped to the transport maximum).
    config: ClientConfig,
    /// Network layer, shared with the dispatch task.
    network: Arc<NetworkLayer<T>>,
    /// Transaction state machine, shared with the dispatch task.
    tsm: Arc<Mutex<Tsm>>,
    /// Devices learned from received I-Am messages.
    device_table: Arc<Mutex<DeviceTable>>,
    /// Broadcast channel on which decoded COV notifications are published.
    cov_tx: broadcast::Sender<COVNotificationRequest>,
    /// Handle of the spawned receive/dispatch task.
    /// NOTE(review): presumably taken and aborted on shutdown — that code is outside this view.
    dispatch_task: Option<JoinHandle<()>>,
    /// Per-transaction channels through which the dispatch task forwards
    /// SegmentAcks to in-flight segmented requests.
    seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
    /// Local MAC address reported by the network layer at startup.
    local_mac: MacAddr,
}
261
262impl BACnetClient<BipTransport> {
263 pub fn bip_builder() -> BipClientBuilder {
265 BipClientBuilder {
266 config: ClientConfig::default(),
267 }
268 }
269
270 pub fn builder() -> BipClientBuilder {
271 Self::bip_builder()
272 }
273
274 pub async fn read_bdt(
276 &self,
277 target: &[u8],
278 ) -> Result<Vec<bacnet_transport::bbmd::BdtEntry>, Error> {
279 self.network.transport().read_bdt(target).await
280 }
281
282 pub async fn write_bdt(
284 &self,
285 target: &[u8],
286 entries: &[bacnet_transport::bbmd::BdtEntry],
287 ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
288 self.network.transport().write_bdt(target, entries).await
289 }
290
291 pub async fn read_fdt(
293 &self,
294 target: &[u8],
295 ) -> Result<Vec<bacnet_transport::bbmd::FdtEntryWire>, Error> {
296 self.network.transport().read_fdt(target).await
297 }
298
299 pub async fn delete_fdt_entry(
301 &self,
302 target: &[u8],
303 ip: [u8; 4],
304 port: u16,
305 ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
306 self.network
307 .transport()
308 .delete_fdt_entry(target, ip, port)
309 .await
310 }
311
312 pub async fn register_foreign_device_bvlc(
314 &self,
315 target: &[u8],
316 ttl: u16,
317 ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
318 self.network
319 .transport()
320 .register_foreign_device_bvlc(target, ttl)
321 .await
322 }
323}
324
#[cfg(feature = "ipv6")]
impl BACnetClient<Bip6Transport> {
    /// Returns a builder for a BACnet/IPv6 client: default configuration,
    /// unspecified interface and no device instance.
    pub fn bip6_builder() -> Bip6ClientBuilder {
        let config = ClientConfig::default();
        let interface = Ipv6Addr::UNSPECIFIED;
        Bip6ClientBuilder {
            config,
            interface,
            device_instance: None,
        }
    }
}
336
/// Builder for a [`BACnetClient`] running over a `Bip6Transport`.
#[cfg(feature = "ipv6")]
pub struct Bip6ClientBuilder {
    /// Configuration accumulated by the setter methods; only its port is
    /// passed to the transport constructor.
    config: ClientConfig,
    /// Local IPv6 interface passed to the transport constructor.
    interface: Ipv6Addr,
    /// Optional device instance passed to the transport constructor.
    device_instance: Option<u32>,
}
344
#[cfg(feature = "ipv6")]
impl Bip6ClientBuilder {
    /// Sets the local IPv6 interface passed to the transport.
    pub fn interface(self, ip: Ipv6Addr) -> Self {
        Self {
            interface: ip,
            ..self
        }
    }

    /// Sets the UDP port passed to the transport.
    pub fn port(mut self, port: u16) -> Self {
        let cfg = &mut self.config;
        cfg.port = port;
        self
    }

    /// Sets the device instance passed to the transport.
    pub fn device_instance(self, instance: u32) -> Self {
        Self {
            device_instance: Some(instance),
            ..self
        }
    }

    /// Overrides the APDU timeout, in milliseconds.
    pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
        let cfg = &mut self.config;
        cfg.apdu_timeout_ms = ms;
        self
    }

    /// Overrides the maximum APDU length.
    pub fn max_apdu_length(mut self, len: u16) -> Self {
        let cfg = &mut self.config;
        cfg.max_apdu_length = len;
        self
    }

    /// Creates the `Bip6Transport` from the configured interface, port and
    /// device instance, then starts the client on it.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`BACnetClient::start`].
    pub async fn build(self) -> Result<BACnetClient<Bip6Transport>, Error> {
        let Self {
            config,
            interface,
            device_instance,
        } = self;
        let transport = Bip6Transport::new(interface, config.port, device_instance);
        BACnetClient::start(config, transport).await
    }
}
383
#[cfg(feature = "sc-tls")]
impl BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>> {
    /// Returns a builder for a secure-connect client: default configuration,
    /// empty hub URL, no TLS configuration, all-zero VMAC, 30 s heartbeat
    /// interval, 60 s heartbeat timeout and no reconnect policy.
    pub fn sc_builder() -> ScClientBuilder {
        let config = ClientConfig::default();
        ScClientBuilder {
            config,
            hub_url: String::new(),
            tls_config: None,
            vmac: [0; 6],
            heartbeat_interval_ms: 30_000,
            heartbeat_timeout_ms: 60_000,
            reconnect: None,
        }
    }
}
399
/// Builder for a [`BACnetClient`] running over a TLS WebSocket `ScTransport`.
#[cfg(feature = "sc-tls")]
pub struct ScClientBuilder {
    /// Client configuration accumulated by the setter methods.
    config: ClientConfig,
    /// URL of the hub to connect to.
    hub_url: String,
    /// TLS client configuration; `build` fails if this is still `None`.
    tls_config: Option<std::sync::Arc<tokio_rustls::rustls::ClientConfig>>,
    /// VMAC passed to the transport constructor.
    vmac: bacnet_transport::sc_frame::Vmac,
    /// Heartbeat interval passed to the transport, in milliseconds.
    heartbeat_interval_ms: u64,
    /// Heartbeat timeout passed to the transport, in milliseconds.
    heartbeat_timeout_ms: u64,
    /// Optional reconnect configuration applied to the transport.
    reconnect: Option<bacnet_transport::sc::ScReconnectConfig>,
}
413
#[cfg(feature = "sc-tls")]
impl ScClientBuilder {
    /// Sets the hub URL to connect to.
    pub fn hub_url(self, url: &str) -> Self {
        Self {
            hub_url: url.to_string(),
            ..self
        }
    }

    /// Sets the TLS client configuration (required).
    pub fn tls_config(
        self,
        config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
    ) -> Self {
        Self {
            tls_config: Some(config),
            ..self
        }
    }

    /// Sets the VMAC passed to the transport.
    pub fn vmac(self, vmac: [u8; 6]) -> Self {
        Self { vmac, ..self }
    }

    /// Overrides the APDU timeout, in milliseconds.
    pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
        let cfg = &mut self.config;
        cfg.apdu_timeout_ms = ms;
        self
    }

    /// Sets the heartbeat interval, in milliseconds.
    pub fn heartbeat_interval_ms(self, ms: u64) -> Self {
        Self {
            heartbeat_interval_ms: ms,
            ..self
        }
    }

    /// Sets the heartbeat timeout, in milliseconds.
    pub fn heartbeat_timeout_ms(self, ms: u64) -> Self {
        Self {
            heartbeat_timeout_ms: ms,
            ..self
        }
    }

    /// Sets the reconnect configuration applied to the transport.
    pub fn reconnect(self, config: bacnet_transport::sc::ScReconnectConfig) -> Self {
        Self {
            reconnect: Some(config),
            ..self
        }
    }

    /// Connects the TLS WebSocket to the hub, wraps it in an `ScTransport`
    /// configured with the heartbeat and reconnect settings, and starts the
    /// client on it.
    ///
    /// # Errors
    ///
    /// Returns `Error::Encoding` when no TLS configuration was supplied, and
    /// propagates connection and [`BACnetClient::start`] failures.
    pub async fn build(
        self,
    ) -> Result<
        BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>>,
        Error,
    > {
        let Self {
            config,
            hub_url,
            tls_config,
            vmac,
            heartbeat_interval_ms,
            heartbeat_timeout_ms,
            reconnect,
        } = self;

        // Validate before any network activity takes place.
        let Some(tls_config) = tls_config else {
            return Err(Error::Encoding(
                "SC client builder: tls_config is required".into(),
            ));
        };

        let ws = bacnet_transport::sc_tls::TlsWebSocket::connect(&hub_url, tls_config).await?;

        let base = bacnet_transport::sc::ScTransport::new(ws, vmac)
            .with_heartbeat_interval_ms(heartbeat_interval_ms)
            .with_heartbeat_timeout_ms(heartbeat_timeout_ms);
        let transport = match reconnect {
            Some(rc) => {
                // `with_reconnect` is deprecated upstream but still the hook used here.
                #[allow(deprecated)]
                {
                    base.with_reconnect(rc)
                }
            }
            None => base,
        };

        BACnetClient::start(config, transport).await
    }
}
487
/// Destination of a confirmed request.
enum ConfirmedTarget<'a> {
    /// A device reachable directly at `mac`.
    Local {
        mac: &'a [u8],
    },
    /// A device on `dest_network` with address `dest_mac`, reached through
    /// the router at `router_mac`.
    Routed {
        router_mac: &'a [u8],
        dest_network: u16,
        dest_mac: &'a [u8],
    },
}

impl<'a> ConfirmedTarget<'a> {
    /// MAC address under which the transaction is tracked: the device's own
    /// MAC for a local target, the router's MAC for a routed one.
    fn tsm_mac(&self) -> &[u8] {
        match *self {
            ConfirmedTarget::Routed { router_mac, .. } => router_mac,
            ConfirmedTarget::Local { mac } => mac,
        }
    }
}
509
510impl<T: TransportPort + 'static> BACnetClient<T> {
511 pub fn generic_builder() -> ClientBuilder<T> {
513 ClientBuilder {
514 config: ClientConfig::default(),
515 transport: None,
516 }
517 }
518
519 pub async fn start(mut config: ClientConfig, transport: T) -> Result<Self, Error> {
521 let transport_max = transport.max_apdu_length();
522 config.max_apdu_length = config.max_apdu_length.min(transport_max);
523
524 let mut network = NetworkLayer::new(transport);
525 let mut apdu_rx = network.start().await?;
526 let local_mac = MacAddr::from_slice(network.local_mac());
527
528 let network = Arc::new(network);
529
530 let tsm_config = TsmConfig {
531 apdu_timeout_ms: config.apdu_timeout_ms,
532 apdu_segment_timeout_ms: config.apdu_timeout_ms,
533 apdu_retries: config.apdu_retries,
534 };
535 let tsm = Arc::new(Mutex::new(Tsm::new(tsm_config)));
536 let tsm_dispatch = Arc::clone(&tsm);
537 let device_table = Arc::new(Mutex::new(DeviceTable::new()));
538 let device_table_dispatch = Arc::clone(&device_table);
539 let network_dispatch = Arc::clone(&network);
540 let (cov_tx, _) = broadcast::channel::<COVNotificationRequest>(64);
541 let cov_tx_dispatch = cov_tx.clone();
542 let seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>> =
543 Arc::new(Mutex::new(HashMap::new()));
544 let seg_ack_senders_dispatch = Arc::clone(&seg_ack_senders);
545 let segmented_response_accepted = config.segmented_response_accepted;
546
547 let dispatch_task = tokio::spawn(async move {
548 let mut seg_state: HashMap<SegKey, SegmentedReceiveState> = HashMap::new();
549 let mut last_device_purge = Instant::now();
550 const DEVICE_PURGE_INTERVAL: Duration = Duration::from_secs(300);
551 const DEVICE_MAX_AGE: Duration = Duration::from_secs(600);
552
553 while let Some(received) = apdu_rx.recv().await {
554 let now = Instant::now();
555
556 if now.duration_since(last_device_purge) >= DEVICE_PURGE_INTERVAL {
558 device_table_dispatch
559 .lock()
560 .await
561 .purge_stale(DEVICE_MAX_AGE);
562 last_device_purge = now;
563 }
564 let stale_keys: Vec<SegKey> = seg_state
566 .iter()
567 .filter(|(_, state)| {
568 now.duration_since(state.last_activity) >= SEG_RECEIVER_TIMEOUT
569 })
570 .map(|(key, _)| key.clone())
571 .collect();
572 for key in &stale_keys {
573 if let Some(_state) = seg_state.remove(key) {
574 let abort = Apdu::Abort(AbortPdu {
575 sent_by_server: false,
576 invoke_id: key.1,
577 abort_reason: bacnet_types::enums::AbortReason::TSM_TIMEOUT,
578 });
579 let mut buf = BytesMut::with_capacity(4);
580 encode_apdu(&mut buf, &abort);
581 let _ = network_dispatch
582 .send_apdu(&buf, &key.0, false, NetworkPriority::NORMAL)
583 .await;
584 }
585 }
586
587 match apdu::decode_apdu(received.apdu.clone()) {
588 Ok(decoded) => {
589 Self::dispatch_apdu(
590 &tsm_dispatch,
591 &device_table_dispatch,
592 &network_dispatch,
593 &cov_tx_dispatch,
594 &mut seg_state,
595 &seg_ack_senders_dispatch,
596 &received.source_mac,
597 &received.source_network,
598 decoded,
599 segmented_response_accepted,
600 )
601 .await;
602 }
603 Err(e) => {
604 warn!(error = %e, "Failed to decode received APDU");
605 }
606 }
607 }
608 });
609
610 Ok(Self {
611 config,
612 network,
613 tsm,
614 device_table,
615 cov_tx,
616 dispatch_task: Some(dispatch_task),
617 seg_ack_senders,
618 local_mac,
619 })
620 }
621
622 #[allow(clippy::too_many_arguments)]
624 async fn dispatch_apdu(
625 tsm: &Arc<Mutex<Tsm>>,
626 device_table: &Arc<Mutex<DeviceTable>>,
627 network: &Arc<NetworkLayer<T>>,
628 cov_tx: &broadcast::Sender<COVNotificationRequest>,
629 seg_state: &mut HashMap<SegKey, SegmentedReceiveState>,
630 seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
631 source_mac: &[u8],
632 source_network: &Option<NpduAddress>,
633 apdu: Apdu,
634 segmented_response_accepted: bool,
635 ) {
636 match apdu {
637 Apdu::SimpleAck(ack) => {
638 debug!(invoke_id = ack.invoke_id, "Received SimpleAck");
639 let mut tsm = tsm.lock().await;
640 tsm.complete_transaction(source_mac, ack.invoke_id, TsmResponse::SimpleAck);
641 }
642 Apdu::ComplexAck(ack) => {
643 if ack.segmented {
644 Self::handle_segmented_complex_ack(
645 tsm,
646 network,
647 seg_state,
648 source_mac,
649 ack,
650 segmented_response_accepted,
651 )
652 .await;
653 } else {
654 debug!(invoke_id = ack.invoke_id, "Received ComplexAck");
655 let mut tsm = tsm.lock().await;
656 tsm.complete_transaction(
657 source_mac,
658 ack.invoke_id,
659 TsmResponse::ComplexAck {
660 service_data: ack.service_ack,
661 },
662 );
663 }
664 }
665 Apdu::Error(err) => {
666 debug!(invoke_id = err.invoke_id, "Received Error PDU");
667 let mut tsm = tsm.lock().await;
668 tsm.complete_transaction(
669 source_mac,
670 err.invoke_id,
671 TsmResponse::Error {
672 class: err.error_class.to_raw() as u32,
673 code: err.error_code.to_raw() as u32,
674 },
675 );
676 }
677 Apdu::Reject(rej) => {
678 debug!(invoke_id = rej.invoke_id, "Received Reject PDU");
679 let mut tsm = tsm.lock().await;
680 tsm.complete_transaction(
681 source_mac,
682 rej.invoke_id,
683 TsmResponse::Reject {
684 reason: rej.reject_reason.to_raw(),
685 },
686 );
687 }
688 Apdu::Abort(abt) => {
689 debug!(invoke_id = abt.invoke_id, "Received Abort PDU");
690 let mut tsm = tsm.lock().await;
691 tsm.complete_transaction(
692 source_mac,
693 abt.invoke_id,
694 TsmResponse::Abort {
695 reason: abt.abort_reason.to_raw(),
696 },
697 );
698 }
699 Apdu::ConfirmedRequest(req) => {
700 if req.service_choice == ConfirmedServiceChoice::CONFIRMED_COV_NOTIFICATION {
701 match COVNotificationRequest::decode(&req.service_request) {
702 Ok(notification) => {
703 debug!(
704 object = ?notification.monitored_object_identifier,
705 "Received ConfirmedCOVNotification"
706 );
707 let _ = cov_tx.send(notification);
708
709 let ack = Apdu::SimpleAck(SimpleAck {
710 invoke_id: req.invoke_id,
711 service_choice: req.service_choice,
712 });
713 let mut buf = BytesMut::with_capacity(4);
714 encode_apdu(&mut buf, &ack);
715 if let Err(e) = network
716 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
717 .await
718 {
719 warn!(error = %e, "Failed to send SimpleAck for COV notification");
720 }
721 }
722 Err(e) => {
723 warn!(error = %e, "Failed to decode ConfirmedCOVNotification");
724 }
725 }
726 } else {
727 debug!(
728 service = req.service_choice.to_raw(),
729 "Ignoring ConfirmedRequest (client mode)"
730 );
731 }
732 }
733 Apdu::UnconfirmedRequest(req) => {
734 if req.service_choice == UnconfirmedServiceChoice::I_AM {
735 match bacnet_services::who_is::IAmRequest::decode(&req.service_request) {
736 Ok(i_am) => {
737 debug!(
738 device = i_am.object_identifier.instance_number(),
739 vendor = i_am.vendor_id,
740 "Received IAm"
741 );
742 let (src_net, src_addr) = match source_network {
743 Some(npdu_addr) if !npdu_addr.mac_address.is_empty() => {
744 (Some(npdu_addr.network), Some(npdu_addr.mac_address.clone()))
745 }
746 _ => (None, None),
747 };
748 let device = DiscoveredDevice {
749 object_identifier: i_am.object_identifier,
750 mac_address: MacAddr::from_slice(source_mac),
751 max_apdu_length: i_am.max_apdu_length,
752 segmentation_supported: i_am.segmentation_supported,
753 max_segments_accepted: None,
754 vendor_id: i_am.vendor_id,
755 last_seen: std::time::Instant::now(),
756 source_network: src_net,
757 source_address: src_addr,
758 };
759 device_table.lock().await.upsert(device);
760 }
761 Err(e) => {
762 warn!(error = %e, "Failed to decode IAm");
763 }
764 }
765 } else if req.service_choice
766 == UnconfirmedServiceChoice::UNCONFIRMED_COV_NOTIFICATION
767 {
768 match COVNotificationRequest::decode(&req.service_request) {
769 Ok(notification) => {
770 debug!(
771 object = ?notification.monitored_object_identifier,
772 "Received UnconfirmedCOVNotification"
773 );
774 let _ = cov_tx.send(notification);
775 }
776 Err(e) => {
777 warn!(error = %e, "Failed to decode UnconfirmedCOVNotification");
778 }
779 }
780 } else {
781 debug!(
782 service = req.service_choice.to_raw(),
783 "Ignoring unconfirmed service in client dispatch"
784 );
785 }
786 }
787 Apdu::SegmentAck(sa) => {
788 let key = (MacAddr::from_slice(source_mac), sa.invoke_id);
789 let senders = seg_ack_senders.lock().await;
790 if let Some(tx) = senders.get(&key) {
791 let _ = tx.try_send(sa);
792 } else {
793 debug!(
794 invoke_id = sa.invoke_id,
795 "Ignoring SegmentAck for unknown transaction"
796 );
797 }
798 }
799 }
800 }
801
802 async fn handle_segmented_complex_ack(
805 tsm: &Arc<Mutex<Tsm>>,
806 network: &Arc<NetworkLayer<T>>,
807 seg_state: &mut HashMap<SegKey, SegmentedReceiveState>,
808 source_mac: &[u8],
809 ack: bacnet_encoding::apdu::ComplexAck,
810 segmented_response_accepted: bool,
811 ) {
812 let seq = ack.sequence_number.unwrap_or(0);
813 let key = (MacAddr::from_slice(source_mac), ack.invoke_id);
814
815 debug!(
816 invoke_id = ack.invoke_id,
817 seq = seq,
818 more = ack.more_follows,
819 "Received segmented ComplexAck"
820 );
821
822 if !segmented_response_accepted {
824 let abort = Apdu::Abort(AbortPdu {
825 sent_by_server: false,
826 invoke_id: ack.invoke_id,
827 abort_reason: bacnet_types::enums::AbortReason::SEGMENTATION_NOT_SUPPORTED,
828 });
829 let mut buf = BytesMut::with_capacity(4);
830 encode_apdu(&mut buf, &abort);
831 let _ = network
832 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
833 .await;
834 return;
835 }
836
837 const MAX_CONCURRENT_SEG_SESSIONS: usize = 64;
838 if !seg_state.contains_key(&key) && seg_state.len() >= MAX_CONCURRENT_SEG_SESSIONS {
839 warn!(
840 invoke_id = ack.invoke_id,
841 sessions = seg_state.len(),
842 "Max concurrent segmented sessions reached, dropping segment"
843 );
844 return;
845 }
846
847 let proposed_ws = ack.proposed_window_size.unwrap_or(1);
848 let state = seg_state
849 .entry(key.clone())
850 .or_insert_with(|| SegmentedReceiveState {
851 receiver: SegmentReceiver::new(),
852 expected_next_seq: 0,
853 last_activity: Instant::now(),
854 window_position: 0,
855 proposed_window_size: proposed_ws,
856 });
857
858 state.last_activity = Instant::now();
859
860 if seq != state.expected_next_seq {
861 if seq < state.expected_next_seq {
863 debug!(
865 invoke_id = ack.invoke_id,
866 seq, "Discarding duplicate segment"
867 );
868 } else {
869 warn!(
871 invoke_id = ack.invoke_id,
872 expected = state.expected_next_seq,
873 received = seq,
874 "Segment gap detected, sending negative SegmentAck"
875 );
876 }
877 let neg_ack = Apdu::SegmentAck(SegmentAckPdu {
878 negative_ack: seq >= state.expected_next_seq,
879 sent_by_server: false,
880 invoke_id: ack.invoke_id,
881 sequence_number: if state.expected_next_seq > 0 {
883 state.expected_next_seq.wrapping_sub(1)
884 } else {
885 0
886 },
887 actual_window_size: proposed_ws,
888 });
889 let mut buf = BytesMut::with_capacity(4);
890 encode_apdu(&mut buf, &neg_ack);
891 if let Err(e) = network
892 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
893 .await
894 {
895 warn!(error = %e, "Failed to send SegmentAck");
896 }
897 return;
898 }
899
900 if let Err(e) = state.receiver.receive(seq, ack.service_ack) {
901 warn!(error = %e, "Rejecting oversized segment");
902 return;
903 }
904 state.expected_next_seq = seq.wrapping_add(1);
905 state.window_position += 1;
906
907 let should_ack = !ack.more_follows || state.window_position >= state.proposed_window_size;
909
910 if should_ack {
911 state.window_position = 0;
912 let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
913 negative_ack: false,
914 sent_by_server: false,
915 invoke_id: ack.invoke_id,
916 sequence_number: seq,
917 actual_window_size: proposed_ws,
918 });
919 let mut buf = BytesMut::with_capacity(4);
920 encode_apdu(&mut buf, &seg_ack);
921 if let Err(e) = network
922 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
923 .await
924 {
925 warn!(error = %e, "Failed to send SegmentAck");
926 }
927 }
928
929 if !ack.more_follows {
930 let state = seg_state.remove(&key).unwrap();
931 let total = state.receiver.received_count();
932 match state.receiver.reassemble(total) {
933 Ok(service_data) => {
934 debug!(
935 invoke_id = ack.invoke_id,
936 segments = total,
937 bytes = service_data.len(),
938 "Reassembled segmented ComplexAck"
939 );
940 let mut tsm = tsm.lock().await;
941 tsm.complete_transaction(
942 source_mac,
943 ack.invoke_id,
944 TsmResponse::ComplexAck {
945 service_data: Bytes::from(service_data),
946 },
947 );
948 }
949 Err(e) => {
950 warn!(error = %e, "Failed to reassemble segmented ComplexAck");
951 }
952 }
953 }
954 }
955
956 pub fn local_mac(&self) -> &[u8] {
958 &self.local_mac
959 }
960
961 pub async fn confirmed_request(
967 &self,
968 destination_mac: &[u8],
969 service_choice: ConfirmedServiceChoice,
970 service_data: &[u8],
971 ) -> Result<Bytes, Error> {
972 self.confirmed_request_inner(
973 ConfirmedTarget::Local {
974 mac: destination_mac,
975 },
976 service_choice,
977 service_data,
978 )
979 .await
980 }
981
982 pub async fn confirmed_request_routed(
987 &self,
988 router_mac: &[u8],
989 dest_network: u16,
990 dest_mac: &[u8],
991 service_choice: ConfirmedServiceChoice,
992 service_data: &[u8],
993 ) -> Result<Bytes, Error> {
994 self.confirmed_request_inner(
995 ConfirmedTarget::Routed {
996 router_mac,
997 dest_network,
998 dest_mac,
999 },
1000 service_choice,
1001 service_data,
1002 )
1003 .await
1004 }
1005
1006 async fn confirmed_request_inner(
1007 &self,
1008 target: ConfirmedTarget<'_>,
1009 service_choice: ConfirmedServiceChoice,
1010 service_data: &[u8],
1011 ) -> Result<Bytes, Error> {
1012 let tsm_mac = target.tsm_mac();
1013
1014 if let ConfirmedTarget::Local { mac } = &target {
1015 let unsegmented_apdu_size = 4 + service_data.len();
1016 let (remote_max_apdu, remote_max_segments) = {
1017 let dt = self.device_table.lock().await;
1018 let device = dt.get_by_mac(mac);
1019 let max_apdu = device
1020 .map(|d| d.max_apdu_length as u16)
1021 .unwrap_or(self.config.max_apdu_length);
1022 let max_seg = device.and_then(|d| d.max_segments_accepted);
1023 (max_apdu, max_seg)
1024 };
1025 if unsegmented_apdu_size > remote_max_apdu as usize {
1026 return self
1027 .segmented_confirmed_request(
1028 mac,
1029 service_choice,
1030 service_data,
1031 remote_max_apdu,
1032 remote_max_segments,
1033 )
1034 .await;
1035 }
1036 }
1037
1038 let (invoke_id, rx) = {
1039 let mut tsm = self.tsm.lock().await;
1040 let invoke_id = tsm.allocate_invoke_id(tsm_mac).ok_or_else(|| {
1041 Error::Encoding("all invoke IDs exhausted for destination".into())
1042 })?;
1043 let rx = tsm.register_transaction(MacAddr::from_slice(tsm_mac), invoke_id);
1044 (invoke_id, rx)
1045 };
1046
1047 let mut guard = crate::tsm::TsmGuard::new(
1049 std::sync::Arc::clone(&self.tsm),
1050 MacAddr::from_slice(tsm_mac),
1051 invoke_id,
1052 );
1053
1054 let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
1055 segmented: false,
1056 more_follows: false,
1057 segmented_response_accepted: self.config.segmented_response_accepted,
1058 max_segments: self.config.max_segments,
1059 max_apdu_length: self.config.max_apdu_length,
1060 invoke_id,
1061 sequence_number: None,
1062 proposed_window_size: None,
1063 service_choice,
1064 service_request: Bytes::copy_from_slice(service_data),
1065 });
1066
1067 let mut buf = BytesMut::with_capacity(6 + service_data.len());
1068 encode_apdu(&mut buf, &pdu);
1069
1070 let timeout_duration = Duration::from_millis(self.config.apdu_timeout_ms);
1071 let max_retries = self.config.apdu_retries;
1072 let mut attempts: u8 = 0;
1073 let mut rx = rx;
1074
1075 loop {
1076 let send_result = match &target {
1077 ConfirmedTarget::Local { mac } => {
1078 self.network
1079 .send_apdu(&buf, mac, true, NetworkPriority::NORMAL)
1080 .await
1081 }
1082 ConfirmedTarget::Routed {
1083 router_mac,
1084 dest_network,
1085 dest_mac,
1086 } => {
1087 self.network
1088 .send_apdu_routed(
1089 &buf,
1090 *dest_network,
1091 dest_mac,
1092 router_mac,
1093 true,
1094 NetworkPriority::NORMAL,
1095 )
1096 .await
1097 }
1098 };
1099 if let Err(e) = send_result {
1100 guard.mark_completed();
1101 let mut tsm = self.tsm.lock().await;
1102 tsm.cancel_transaction(tsm_mac, invoke_id);
1103 return Err(e);
1104 }
1105
1106 match timeout(timeout_duration, &mut rx).await {
1107 Ok(Ok(response)) => {
1108 guard.mark_completed();
1109 return match response {
1110 TsmResponse::SimpleAck => Ok(Bytes::new()),
1111 TsmResponse::ComplexAck { service_data } => Ok(service_data),
1112 TsmResponse::Error { class, code } => Err(Error::Protocol { class, code }),
1113 TsmResponse::Reject { reason } => Err(Error::Reject { reason }),
1114 TsmResponse::Abort { reason } => Err(Error::Abort { reason }),
1115 };
1116 }
1117 Ok(Err(_)) => {
1118 guard.mark_completed();
1119 return Err(Error::Encoding("TSM response channel closed".into()));
1120 }
1121 Err(_timeout) => {
1122 attempts += 1;
1123 if attempts > max_retries {
1124 guard.mark_completed();
1125 let mut tsm = self.tsm.lock().await;
1126 tsm.cancel_transaction(tsm_mac, invoke_id);
1127 return Err(Error::Timeout(timeout_duration));
1128 }
1129 debug!(
1130 invoke_id,
1131 attempt = attempts,
1132 max_retries,
1133 "APDU timeout, retrying confirmed request"
1134 );
1135 }
1136 }
1137 }
1138 }
1139
1140 async fn segmented_confirmed_request(
1142 &self,
1143 destination_mac: &[u8],
1144 service_choice: ConfirmedServiceChoice,
1145 service_data: &[u8],
1146 remote_max_apdu: u16,
1147 remote_max_segments: Option<u32>,
1148 ) -> Result<Bytes, Error> {
1149 let max_seg_size = max_segment_payload(remote_max_apdu, SegmentedPduType::ConfirmedRequest);
1150 let segments = split_payload(service_data, max_seg_size);
1151 let total_segments = segments.len();
1152
1153 if total_segments > 256 {
1154 return Err(Error::Segmentation(format!(
1155 "payload requires {} segments, maximum is 256",
1156 total_segments
1157 )));
1158 }
1159
1160 if let Some(max_seg) = remote_max_segments {
1161 if total_segments > max_seg as usize {
1162 return Err(Error::Segmentation(format!(
1163 "request requires {} segments but remote accepts at most {}",
1164 total_segments, max_seg
1165 )));
1166 }
1167 }
1168
1169 debug!(
1170 total_segments,
1171 max_seg_size,
1172 service_data_len = service_data.len(),
1173 "Starting segmented confirmed request"
1174 );
1175
1176 let (invoke_id, rx) = {
1177 let mut tsm = self.tsm.lock().await;
1178 let invoke_id = tsm.allocate_invoke_id(destination_mac).ok_or_else(|| {
1179 Error::Encoding("all invoke IDs exhausted for destination".into())
1180 })?;
1181 let rx = tsm.register_transaction(MacAddr::from_slice(destination_mac), invoke_id);
1182 (invoke_id, rx)
1183 };
1184
1185 let (seg_ack_tx, mut seg_ack_rx) = mpsc::channel(16);
1186 {
1187 let key = (MacAddr::from_slice(destination_mac), invoke_id);
1188 self.seg_ack_senders.lock().await.insert(key, seg_ack_tx);
1189 }
1190
1191 let timeout_duration = Duration::from_millis(self.config.apdu_timeout_ms);
1193 let max_ack_retries = self.config.apdu_retries;
1194 let mut window_size = self.config.proposed_window_size.max(1) as usize;
1195 let mut next_seq: usize = 0;
1196 let mut neg_ack_retries: u32 = 0;
1197 const MAX_NEG_ACK_RETRIES: u32 = 10;
1198
1199 let result = async {
1200 while next_seq < total_segments {
1201 let window_end = (next_seq + window_size).min(total_segments);
1202
1203 for (seq, segment_data) in segments[next_seq..window_end]
1204 .iter()
1205 .enumerate()
1206 .map(|(i, s)| (next_seq + i, s))
1207 {
1208 let is_last = seq == total_segments - 1;
1209 let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
1210 segmented: true,
1211 more_follows: !is_last,
1212 segmented_response_accepted: self.config.segmented_response_accepted,
1213 max_segments: self.config.max_segments,
1214 max_apdu_length: self.config.max_apdu_length,
1215 invoke_id,
1216 sequence_number: Some(seq as u8),
1217 proposed_window_size: Some(self.config.proposed_window_size.max(1)),
1218 service_choice,
1219 service_request: segment_data.clone(),
1220 });
1221
1222 let mut buf = BytesMut::with_capacity(remote_max_apdu as usize);
1223 encode_apdu(&mut buf, &pdu);
1224
1225 self.network
1226 .send_apdu(&buf, destination_mac, true, NetworkPriority::NORMAL)
1227 .await?;
1228
1229 debug!(seq, is_last, "Sent segment");
1230 }
1231
1232 let ack = {
1233 let mut ack_retries: u8 = 0;
1234 loop {
1235 match timeout(timeout_duration, seg_ack_rx.recv()).await {
1236 Ok(Some(ack)) => break ack,
1237 Ok(None) => {
1238 return Err(Error::Encoding("SegmentAck channel closed".into()));
1239 }
1240 Err(_timeout) => {
1241 ack_retries += 1;
1242 if ack_retries > max_ack_retries {
1243 return Err(Error::Timeout(timeout_duration));
1244 }
1245 warn!(
1246 attempt = ack_retries,
1247 "Retransmitting segmented request window"
1248 );
1249 for (seq, segment_data) in segments[next_seq..window_end]
1250 .iter()
1251 .enumerate()
1252 .map(|(i, s)| (next_seq + i, s))
1253 {
1254 let is_last = seq == total_segments - 1;
1255 let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
1256 segmented: true,
1257 more_follows: !is_last,
1258 segmented_response_accepted: self
1259 .config
1260 .segmented_response_accepted,
1261 max_segments: self.config.max_segments,
1262 max_apdu_length: self.config.max_apdu_length,
1263 invoke_id,
1264 sequence_number: Some(seq as u8),
1265 proposed_window_size: Some(
1266 self.config.proposed_window_size.max(1),
1267 ),
1268 service_choice,
1269 service_request: segment_data.clone(),
1270 });
1271
1272 let mut buf = BytesMut::with_capacity(remote_max_apdu as usize);
1273 encode_apdu(&mut buf, &pdu);
1274
1275 self.network
1276 .send_apdu(
1277 &buf,
1278 destination_mac,
1279 true,
1280 NetworkPriority::NORMAL,
1281 )
1282 .await?;
1283 }
1284 }
1285 }
1286 }
1287 };
1288
1289 debug!(
1290 seq = ack.sequence_number,
1291 negative = ack.negative_ack,
1292 window = ack.actual_window_size,
1293 "Received SegmentAck"
1294 );
1295
1296 window_size = ack.actual_window_size.max(1) as usize;
1297
1298 let ack_seq = ack.sequence_number as usize;
1299 if ack_seq >= total_segments {
1300 return Err(Error::Segmentation(format!(
1301 "SegmentAck sequence {} out of range (total {})",
1302 ack_seq, total_segments
1303 )));
1304 }
1305
1306 if ack.negative_ack {
1307 neg_ack_retries += 1;
1308 if neg_ack_retries > MAX_NEG_ACK_RETRIES {
1309 return Err(Error::Segmentation(
1310 "too many negative SegmentAck retransmissions".into(),
1311 ));
1312 }
1313 next_seq = ack_seq;
1314 } else {
1315 neg_ack_retries = 0;
1316 next_seq = ack_seq + 1;
1317 }
1318 }
1319
1320 timeout(timeout_duration, rx)
1321 .await
1322 .map_err(|_| Error::Timeout(timeout_duration))?
1323 .map_err(|_| Error::Encoding("TSM response channel closed".into()))
1324 }
1325 .await;
1326
1327 {
1328 let key = (MacAddr::from_slice(destination_mac), invoke_id);
1329 self.seg_ack_senders.lock().await.remove(&key);
1330 }
1331
1332 let response = match result {
1333 Ok(response) => response,
1334 Err(e) => {
1335 let mut tsm = self.tsm.lock().await;
1336 tsm.cancel_transaction(destination_mac, invoke_id);
1337 return Err(e);
1338 }
1339 };
1340
1341 match response {
1342 TsmResponse::SimpleAck => Ok(Bytes::new()),
1343 TsmResponse::ComplexAck { service_data } => Ok(service_data),
1344 TsmResponse::Error { class, code } => Err(Error::Protocol { class, code }),
1345 TsmResponse::Reject { reason } => Err(Error::Reject { reason }),
1346 TsmResponse::Abort { reason } => Err(Error::Abort { reason }),
1347 }
1348 }
1349
1350 pub async fn unconfirmed_request(
1352 &self,
1353 destination_mac: &[u8],
1354 service_choice: UnconfirmedServiceChoice,
1355 service_data: &[u8],
1356 ) -> Result<(), Error> {
1357 let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1358 service_choice,
1359 service_request: Bytes::copy_from_slice(service_data),
1360 });
1361
1362 let mut buf = BytesMut::with_capacity(2 + service_data.len());
1363 encode_apdu(&mut buf, &pdu);
1364
1365 self.network
1366 .send_apdu(&buf, destination_mac, false, NetworkPriority::NORMAL)
1367 .await
1368 }
1369
1370 pub async fn broadcast_unconfirmed(
1372 &self,
1373 service_choice: UnconfirmedServiceChoice,
1374 service_data: &[u8],
1375 ) -> Result<(), Error> {
1376 let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1377 service_choice,
1378 service_request: Bytes::copy_from_slice(service_data),
1379 });
1380
1381 let mut buf = BytesMut::with_capacity(2 + service_data.len());
1382 encode_apdu(&mut buf, &pdu);
1383
1384 self.network
1385 .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1386 .await
1387 }
1388
1389 pub async fn broadcast_global_unconfirmed(
1391 &self,
1392 service_choice: UnconfirmedServiceChoice,
1393 service_data: &[u8],
1394 ) -> Result<(), Error> {
1395 let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1396 service_choice,
1397 service_request: Bytes::copy_from_slice(service_data),
1398 });
1399
1400 let mut buf = BytesMut::with_capacity(2 + service_data.len());
1401 encode_apdu(&mut buf, &pdu);
1402
1403 self.network
1404 .broadcast_global_apdu(&buf, false, NetworkPriority::NORMAL)
1405 .await
1406 }
1407
1408 pub async fn broadcast_network_unconfirmed(
1410 &self,
1411 service_choice: UnconfirmedServiceChoice,
1412 service_data: &[u8],
1413 dest_network: u16,
1414 ) -> Result<(), Error> {
1415 let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1416 service_choice,
1417 service_request: Bytes::copy_from_slice(service_data),
1418 });
1419
1420 let mut buf = BytesMut::with_capacity(2 + service_data.len());
1421 encode_apdu(&mut buf, &pdu);
1422
1423 self.network
1424 .broadcast_to_network(&buf, dest_network, false, NetworkPriority::NORMAL)
1425 .await
1426 }
1427
1428 pub async fn read_property(
1430 &self,
1431 destination_mac: &[u8],
1432 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1433 property_identifier: bacnet_types::enums::PropertyIdentifier,
1434 property_array_index: Option<u32>,
1435 ) -> Result<bacnet_services::read_property::ReadPropertyACK, Error> {
1436 use bacnet_services::read_property::ReadPropertyRequest;
1437
1438 let request = ReadPropertyRequest {
1439 object_identifier,
1440 property_identifier,
1441 property_array_index,
1442 };
1443 let mut buf = BytesMut::new();
1444 request.encode(&mut buf);
1445
1446 let response_data = self
1447 .confirmed_request(destination_mac, ConfirmedServiceChoice::READ_PROPERTY, &buf)
1448 .await?;
1449
1450 bacnet_services::read_property::ReadPropertyACK::decode(&response_data)
1451 }
1452
1453 pub async fn read_property_from_device(
1455 &self,
1456 device_instance: u32,
1457 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1458 property_identifier: bacnet_types::enums::PropertyIdentifier,
1459 property_array_index: Option<u32>,
1460 ) -> Result<bacnet_services::read_property::ReadPropertyACK, Error> {
1461 let (mac, routing) = {
1462 let dt = self.device_table.lock().await;
1463 let device = dt.get(device_instance).ok_or_else(|| {
1464 Error::Encoding(format!("device {device_instance} not in device table"))
1465 })?;
1466 let routing = match (&device.source_network, &device.source_address) {
1467 (Some(snet), Some(sadr)) => Some((*snet, sadr.to_vec())),
1468 _ => None,
1469 };
1470 (device.mac_address.to_vec(), routing)
1471 };
1472
1473 if let Some((dnet, dadr)) = routing {
1474 self.read_property_routed(
1475 &mac,
1476 dnet,
1477 &dadr,
1478 object_identifier,
1479 property_identifier,
1480 property_array_index,
1481 )
1482 .await
1483 } else {
1484 self.read_property(
1485 &mac,
1486 object_identifier,
1487 property_identifier,
1488 property_array_index,
1489 )
1490 .await
1491 }
1492 }
1493
1494 pub async fn read_property_routed(
1496 &self,
1497 router_mac: &[u8],
1498 dest_network: u16,
1499 dest_mac: &[u8],
1500 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1501 property_identifier: bacnet_types::enums::PropertyIdentifier,
1502 property_array_index: Option<u32>,
1503 ) -> Result<bacnet_services::read_property::ReadPropertyACK, Error> {
1504 use bacnet_services::read_property::ReadPropertyRequest;
1505
1506 let request = ReadPropertyRequest {
1507 object_identifier,
1508 property_identifier,
1509 property_array_index,
1510 };
1511 let mut buf = BytesMut::new();
1512 request.encode(&mut buf);
1513
1514 let response_data = self
1515 .confirmed_request_routed(
1516 router_mac,
1517 dest_network,
1518 dest_mac,
1519 ConfirmedServiceChoice::READ_PROPERTY,
1520 &buf,
1521 )
1522 .await?;
1523
1524 bacnet_services::read_property::ReadPropertyACK::decode(&response_data)
1525 }
1526
1527 pub async fn write_property(
1529 &self,
1530 destination_mac: &[u8],
1531 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1532 property_identifier: bacnet_types::enums::PropertyIdentifier,
1533 property_array_index: Option<u32>,
1534 property_value: Vec<u8>,
1535 priority: Option<u8>,
1536 ) -> Result<(), Error> {
1537 use bacnet_services::write_property::WritePropertyRequest;
1538
1539 let request = WritePropertyRequest {
1540 object_identifier,
1541 property_identifier,
1542 property_array_index,
1543 property_value,
1544 priority,
1545 };
1546 let mut buf = BytesMut::new();
1547 request.encode(&mut buf);
1548
1549 let _ = self
1550 .confirmed_request(
1551 destination_mac,
1552 ConfirmedServiceChoice::WRITE_PROPERTY,
1553 &buf,
1554 )
1555 .await?;
1556
1557 Ok(())
1558 }
1559
1560 pub async fn read_property_multiple(
1562 &self,
1563 destination_mac: &[u8],
1564 specs: Vec<bacnet_services::rpm::ReadAccessSpecification>,
1565 ) -> Result<bacnet_services::rpm::ReadPropertyMultipleACK, Error> {
1566 use bacnet_services::rpm::{ReadPropertyMultipleACK, ReadPropertyMultipleRequest};
1567
1568 let request = ReadPropertyMultipleRequest {
1569 list_of_read_access_specs: specs,
1570 };
1571 let mut buf = BytesMut::new();
1572 request.encode(&mut buf);
1573
1574 let response_data = self
1575 .confirmed_request(
1576 destination_mac,
1577 ConfirmedServiceChoice::READ_PROPERTY_MULTIPLE,
1578 &buf,
1579 )
1580 .await?;
1581
1582 ReadPropertyMultipleACK::decode(&response_data)
1583 }
1584
1585 pub async fn write_property_multiple(
1587 &self,
1588 destination_mac: &[u8],
1589 specs: Vec<bacnet_services::wpm::WriteAccessSpecification>,
1590 ) -> Result<(), Error> {
1591 use bacnet_services::wpm::WritePropertyMultipleRequest;
1592
1593 let request = WritePropertyMultipleRequest {
1594 list_of_write_access_specs: specs,
1595 };
1596 let mut buf = BytesMut::new();
1597 request.encode(&mut buf);
1598
1599 let _ = self
1600 .confirmed_request(
1601 destination_mac,
1602 ConfirmedServiceChoice::WRITE_PROPERTY_MULTIPLE,
1603 &buf,
1604 )
1605 .await?;
1606
1607 Ok(())
1608 }
1609
1610 pub async fn read_property_multiple_from_device(
1616 &self,
1617 device_instance: u32,
1618 specs: Vec<bacnet_services::rpm::ReadAccessSpecification>,
1619 ) -> Result<bacnet_services::rpm::ReadPropertyMultipleACK, Error> {
1620 let (mac, routing) = self.resolve_device(device_instance).await?;
1621
1622 if let Some((dnet, dadr)) = routing {
1623 use bacnet_services::rpm::{ReadPropertyMultipleACK, ReadPropertyMultipleRequest};
1624
1625 let request = ReadPropertyMultipleRequest {
1626 list_of_read_access_specs: specs,
1627 };
1628 let mut buf = BytesMut::new();
1629 request.encode(&mut buf);
1630
1631 let response_data = self
1632 .confirmed_request_routed(
1633 &mac,
1634 dnet,
1635 &dadr,
1636 ConfirmedServiceChoice::READ_PROPERTY_MULTIPLE,
1637 &buf,
1638 )
1639 .await?;
1640
1641 ReadPropertyMultipleACK::decode(&response_data)
1642 } else {
1643 self.read_property_multiple(&mac, specs).await
1644 }
1645 }
1646
1647 pub async fn write_property_to_device(
1649 &self,
1650 device_instance: u32,
1651 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1652 property_identifier: bacnet_types::enums::PropertyIdentifier,
1653 property_array_index: Option<u32>,
1654 property_value: Vec<u8>,
1655 priority: Option<u8>,
1656 ) -> Result<(), Error> {
1657 let (mac, routing) = self.resolve_device(device_instance).await?;
1658
1659 if let Some((dnet, dadr)) = routing {
1660 use bacnet_services::write_property::WritePropertyRequest;
1661
1662 let request = WritePropertyRequest {
1663 object_identifier,
1664 property_identifier,
1665 property_array_index,
1666 property_value,
1667 priority,
1668 };
1669 let mut buf = BytesMut::new();
1670 request.encode(&mut buf);
1671
1672 let _ = self
1673 .confirmed_request_routed(
1674 &mac,
1675 dnet,
1676 &dadr,
1677 ConfirmedServiceChoice::WRITE_PROPERTY,
1678 &buf,
1679 )
1680 .await?;
1681 Ok(())
1682 } else {
1683 self.write_property(
1684 &mac,
1685 object_identifier,
1686 property_identifier,
1687 property_array_index,
1688 property_value,
1689 priority,
1690 )
1691 .await
1692 }
1693 }
1694
1695 pub async fn write_property_multiple_to_device(
1697 &self,
1698 device_instance: u32,
1699 specs: Vec<bacnet_services::wpm::WriteAccessSpecification>,
1700 ) -> Result<(), Error> {
1701 let (mac, routing) = self.resolve_device(device_instance).await?;
1702
1703 if let Some((dnet, dadr)) = routing {
1704 use bacnet_services::wpm::WritePropertyMultipleRequest;
1705
1706 let request = WritePropertyMultipleRequest {
1707 list_of_write_access_specs: specs,
1708 };
1709 let mut buf = BytesMut::new();
1710 request.encode(&mut buf);
1711
1712 let _ = self
1713 .confirmed_request_routed(
1714 &mac,
1715 dnet,
1716 &dadr,
1717 ConfirmedServiceChoice::WRITE_PROPERTY_MULTIPLE,
1718 &buf,
1719 )
1720 .await?;
1721 Ok(())
1722 } else {
1723 self.write_property_multiple(&mac, specs).await
1724 }
1725 }
1726
1727 async fn resolve_device(
1729 &self,
1730 device_instance: u32,
1731 ) -> Result<(Vec<u8>, Option<(u16, Vec<u8>)>), Error> {
1732 let dt = self.device_table.lock().await;
1733 let device = dt.get(device_instance).ok_or_else(|| {
1734 Error::Encoding(format!("device {device_instance} not in device table"))
1735 })?;
1736 let routing = match (&device.source_network, &device.source_address) {
1737 (Some(snet), Some(sadr)) => Some((*snet, sadr.to_vec())),
1738 _ => None,
1739 };
1740 Ok((device.mac_address.to_vec(), routing))
1741 }
1742
1743 pub async fn read_property_from_devices(
1753 &self,
1754 requests: Vec<DeviceReadRequest>,
1755 max_concurrent: Option<usize>,
1756 ) -> Vec<DeviceReadResult> {
1757 use futures_util::stream::{self, StreamExt};
1758
1759 let concurrency = max_concurrent.unwrap_or(DEFAULT_BATCH_CONCURRENCY);
1760
1761 stream::iter(requests)
1762 .map(|req| async move {
1763 let result = self
1764 .read_property_from_device(
1765 req.device_instance,
1766 req.object_identifier,
1767 req.property_identifier,
1768 req.property_array_index,
1769 )
1770 .await;
1771 DeviceReadResult {
1772 device_instance: req.device_instance,
1773 result,
1774 }
1775 })
1776 .buffer_unordered(concurrency)
1777 .collect()
1778 .await
1779 }
1780
1781 pub async fn read_property_multiple_from_devices(
1787 &self,
1788 requests: Vec<DeviceRpmRequest>,
1789 max_concurrent: Option<usize>,
1790 ) -> Vec<DeviceRpmResult> {
1791 use futures_util::stream::{self, StreamExt};
1792
1793 let concurrency = max_concurrent.unwrap_or(DEFAULT_BATCH_CONCURRENCY);
1794
1795 stream::iter(requests)
1796 .map(|req| async move {
1797 let result = self
1798 .read_property_multiple_from_device(req.device_instance, req.specs)
1799 .await;
1800 DeviceRpmResult {
1801 device_instance: req.device_instance,
1802 result,
1803 }
1804 })
1805 .buffer_unordered(concurrency)
1806 .collect()
1807 .await
1808 }
1809
1810 pub async fn write_property_to_devices(
1815 &self,
1816 requests: Vec<DeviceWriteRequest>,
1817 max_concurrent: Option<usize>,
1818 ) -> Vec<DeviceWriteResult> {
1819 use futures_util::stream::{self, StreamExt};
1820
1821 let concurrency = max_concurrent.unwrap_or(DEFAULT_BATCH_CONCURRENCY);
1822
1823 stream::iter(requests)
1824 .map(|req| async move {
1825 let result = self
1826 .write_property_to_device(
1827 req.device_instance,
1828 req.object_identifier,
1829 req.property_identifier,
1830 req.property_array_index,
1831 req.property_value,
1832 req.priority,
1833 )
1834 .await;
1835 DeviceWriteResult {
1836 device_instance: req.device_instance,
1837 result,
1838 }
1839 })
1840 .buffer_unordered(concurrency)
1841 .collect()
1842 .await
1843 }
1844
1845 pub async fn who_is(
1847 &self,
1848 low_limit: Option<u32>,
1849 high_limit: Option<u32>,
1850 ) -> Result<(), Error> {
1851 use bacnet_services::who_is::WhoIsRequest;
1852
1853 let request = WhoIsRequest {
1854 low_limit,
1855 high_limit,
1856 };
1857 let mut buf = BytesMut::new();
1858 request.encode(&mut buf);
1859
1860 self.broadcast_global_unconfirmed(UnconfirmedServiceChoice::WHO_IS, &buf)
1861 .await
1862 }
1863
1864 pub async fn who_is_directed(
1866 &self,
1867 destination_mac: &[u8],
1868 low_limit: Option<u32>,
1869 high_limit: Option<u32>,
1870 ) -> Result<(), Error> {
1871 use bacnet_services::who_is::WhoIsRequest;
1872
1873 let request = WhoIsRequest {
1874 low_limit,
1875 high_limit,
1876 };
1877 let mut buf = BytesMut::new();
1878 request.encode(&mut buf);
1879
1880 self.unconfirmed_request(destination_mac, UnconfirmedServiceChoice::WHO_IS, &buf)
1881 .await
1882 }
1883
1884 pub async fn who_is_network(
1886 &self,
1887 dest_network: u16,
1888 low_limit: Option<u32>,
1889 high_limit: Option<u32>,
1890 ) -> Result<(), Error> {
1891 use bacnet_services::who_is::WhoIsRequest;
1892
1893 let request = WhoIsRequest {
1894 low_limit,
1895 high_limit,
1896 };
1897 let mut buf = BytesMut::new();
1898 request.encode(&mut buf);
1899
1900 self.broadcast_network_unconfirmed(UnconfirmedServiceChoice::WHO_IS, &buf, dest_network)
1901 .await
1902 }
1903
1904 pub async fn who_has(
1906 &self,
1907 object: bacnet_services::who_has::WhoHasObject,
1908 low_limit: Option<u32>,
1909 high_limit: Option<u32>,
1910 ) -> Result<(), Error> {
1911 use bacnet_services::who_has::WhoHasRequest;
1912
1913 let request = WhoHasRequest {
1914 low_limit,
1915 high_limit,
1916 object,
1917 };
1918 let mut buf = BytesMut::new();
1919 request.encode(&mut buf)?;
1920
1921 self.broadcast_unconfirmed(UnconfirmedServiceChoice::WHO_HAS, &buf)
1922 .await
1923 }
1924
1925 pub async fn subscribe_cov(
1927 &self,
1928 destination_mac: &[u8],
1929 subscriber_process_identifier: u32,
1930 monitored_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1931 confirmed: bool,
1932 lifetime: Option<u32>,
1933 ) -> Result<(), Error> {
1934 use bacnet_services::cov::SubscribeCOVRequest;
1935
1936 let request = SubscribeCOVRequest {
1937 subscriber_process_identifier,
1938 monitored_object_identifier,
1939 issue_confirmed_notifications: Some(confirmed),
1940 lifetime,
1941 };
1942 let mut buf = BytesMut::new();
1943 request.encode(&mut buf);
1944
1945 let _ = self
1946 .confirmed_request(destination_mac, ConfirmedServiceChoice::SUBSCRIBE_COV, &buf)
1947 .await?;
1948
1949 Ok(())
1950 }
1951
1952 pub async fn unsubscribe_cov(
1954 &self,
1955 destination_mac: &[u8],
1956 subscriber_process_identifier: u32,
1957 monitored_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1958 ) -> Result<(), Error> {
1959 use bacnet_services::cov::SubscribeCOVRequest;
1960
1961 let request = SubscribeCOVRequest {
1962 subscriber_process_identifier,
1963 monitored_object_identifier,
1964 issue_confirmed_notifications: None,
1965 lifetime: None,
1966 };
1967 let mut buf = BytesMut::new();
1968 request.encode(&mut buf);
1969
1970 let _ = self
1971 .confirmed_request(destination_mac, ConfirmedServiceChoice::SUBSCRIBE_COV, &buf)
1972 .await?;
1973
1974 Ok(())
1975 }
1976
1977 pub async fn delete_object(
1979 &self,
1980 destination_mac: &[u8],
1981 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1982 ) -> Result<(), Error> {
1983 use bacnet_services::object_mgmt::DeleteObjectRequest;
1984
1985 let request = DeleteObjectRequest { object_identifier };
1986 let mut buf = BytesMut::new();
1987 request.encode(&mut buf);
1988
1989 let _ = self
1990 .confirmed_request(destination_mac, ConfirmedServiceChoice::DELETE_OBJECT, &buf)
1991 .await?;
1992
1993 Ok(())
1994 }
1995
1996 pub async fn create_object(
1998 &self,
1999 destination_mac: &[u8],
2000 object_specifier: bacnet_services::object_mgmt::ObjectSpecifier,
2001 initial_values: Vec<bacnet_services::common::BACnetPropertyValue>,
2002 ) -> Result<Bytes, Error> {
2003 use bacnet_services::object_mgmt::CreateObjectRequest;
2004
2005 let request = CreateObjectRequest {
2006 object_specifier,
2007 list_of_initial_values: initial_values,
2008 };
2009 let mut buf = BytesMut::new();
2010 request.encode(&mut buf);
2011
2012 self.confirmed_request(destination_mac, ConfirmedServiceChoice::CREATE_OBJECT, &buf)
2013 .await
2014 }
2015
2016 pub async fn device_communication_control(
2018 &self,
2019 destination_mac: &[u8],
2020 enable_disable: bacnet_types::enums::EnableDisable,
2021 time_duration: Option<u16>,
2022 password: Option<String>,
2023 ) -> Result<(), Error> {
2024 use bacnet_services::device_mgmt::DeviceCommunicationControlRequest;
2025
2026 let request = DeviceCommunicationControlRequest {
2027 time_duration,
2028 enable_disable,
2029 password,
2030 };
2031 let mut buf = BytesMut::new();
2032 request.encode(&mut buf)?;
2033
2034 let _ = self
2035 .confirmed_request(
2036 destination_mac,
2037 ConfirmedServiceChoice::DEVICE_COMMUNICATION_CONTROL,
2038 &buf,
2039 )
2040 .await?;
2041
2042 Ok(())
2043 }
2044
2045 pub async fn reinitialize_device(
2047 &self,
2048 destination_mac: &[u8],
2049 reinitialized_state: bacnet_types::enums::ReinitializedState,
2050 password: Option<String>,
2051 ) -> Result<(), Error> {
2052 use bacnet_services::device_mgmt::ReinitializeDeviceRequest;
2053
2054 let request = ReinitializeDeviceRequest {
2055 reinitialized_state,
2056 password,
2057 };
2058 let mut buf = BytesMut::new();
2059 request.encode(&mut buf)?;
2060
2061 let _ = self
2062 .confirmed_request(
2063 destination_mac,
2064 ConfirmedServiceChoice::REINITIALIZE_DEVICE,
2065 &buf,
2066 )
2067 .await?;
2068
2069 Ok(())
2070 }
2071
2072 pub async fn get_event_information(
2074 &self,
2075 destination_mac: &[u8],
2076 last_received_object_identifier: Option<bacnet_types::primitives::ObjectIdentifier>,
2077 ) -> Result<Bytes, Error> {
2078 use bacnet_services::alarm_event::GetEventInformationRequest;
2079
2080 let request = GetEventInformationRequest {
2081 last_received_object_identifier,
2082 };
2083 let mut buf = BytesMut::new();
2084 request.encode(&mut buf);
2085
2086 self.confirmed_request(
2087 destination_mac,
2088 ConfirmedServiceChoice::GET_EVENT_INFORMATION,
2089 &buf,
2090 )
2091 .await
2092 }
2093
2094 pub async fn acknowledge_alarm(
2096 &self,
2097 destination_mac: &[u8],
2098 acknowledging_process_identifier: u32,
2099 event_object_identifier: bacnet_types::primitives::ObjectIdentifier,
2100 event_state_acknowledged: u32,
2101 acknowledgment_source: &str,
2102 ) -> Result<(), Error> {
2103 use bacnet_services::alarm_event::AcknowledgeAlarmRequest;
2104
2105 let request = AcknowledgeAlarmRequest {
2106 acknowledging_process_identifier,
2107 event_object_identifier,
2108 event_state_acknowledged,
2109 timestamp: bacnet_types::primitives::BACnetTimeStamp::SequenceNumber(0),
2110 acknowledgment_source: acknowledgment_source.to_string(),
2111 time_of_acknowledgment: bacnet_types::primitives::BACnetTimeStamp::SequenceNumber(0),
2112 };
2113 let mut buf = BytesMut::new();
2114 request.encode(&mut buf)?;
2115
2116 let _ = self
2117 .confirmed_request(
2118 destination_mac,
2119 ConfirmedServiceChoice::ACKNOWLEDGE_ALARM,
2120 &buf,
2121 )
2122 .await?;
2123
2124 Ok(())
2125 }
2126
2127 pub async fn read_range(
2129 &self,
2130 destination_mac: &[u8],
2131 object_identifier: bacnet_types::primitives::ObjectIdentifier,
2132 property_identifier: bacnet_types::enums::PropertyIdentifier,
2133 property_array_index: Option<u32>,
2134 range: Option<bacnet_services::read_range::RangeSpec>,
2135 ) -> Result<bacnet_services::read_range::ReadRangeAck, Error> {
2136 use bacnet_services::read_range::{ReadRangeAck, ReadRangeRequest};
2137
2138 let request = ReadRangeRequest {
2139 object_identifier,
2140 property_identifier,
2141 property_array_index,
2142 range,
2143 };
2144 let mut buf = BytesMut::new();
2145 request.encode(&mut buf);
2146
2147 let response_data = self
2148 .confirmed_request(destination_mac, ConfirmedServiceChoice::READ_RANGE, &buf)
2149 .await?;
2150
2151 ReadRangeAck::decode(&response_data)
2152 }
2153
2154 pub async fn atomic_read_file(
2156 &self,
2157 destination_mac: &[u8],
2158 file_identifier: bacnet_types::primitives::ObjectIdentifier,
2159 access: bacnet_services::file::FileAccessMethod,
2160 ) -> Result<Bytes, Error> {
2161 use bacnet_services::file::AtomicReadFileRequest;
2162
2163 let request = AtomicReadFileRequest {
2164 file_identifier,
2165 access,
2166 };
2167 let mut buf = BytesMut::new();
2168 request.encode(&mut buf);
2169
2170 self.confirmed_request(
2171 destination_mac,
2172 ConfirmedServiceChoice::ATOMIC_READ_FILE,
2173 &buf,
2174 )
2175 .await
2176 }
2177
2178 pub async fn atomic_write_file(
2180 &self,
2181 destination_mac: &[u8],
2182 file_identifier: bacnet_types::primitives::ObjectIdentifier,
2183 access: bacnet_services::file::FileWriteAccessMethod,
2184 ) -> Result<Bytes, Error> {
2185 use bacnet_services::file::AtomicWriteFileRequest;
2186
2187 let request = AtomicWriteFileRequest {
2188 file_identifier,
2189 access,
2190 };
2191 let mut buf = BytesMut::new();
2192 request.encode(&mut buf);
2193
2194 self.confirmed_request(
2195 destination_mac,
2196 ConfirmedServiceChoice::ATOMIC_WRITE_FILE,
2197 &buf,
2198 )
2199 .await
2200 }
2201
2202 pub async fn add_list_element(
2204 &self,
2205 destination_mac: &[u8],
2206 object_identifier: bacnet_types::primitives::ObjectIdentifier,
2207 property_identifier: bacnet_types::enums::PropertyIdentifier,
2208 property_array_index: Option<u32>,
2209 list_of_elements: Vec<u8>,
2210 ) -> Result<(), Error> {
2211 use bacnet_services::list_manipulation::ListElementRequest;
2212
2213 let request = ListElementRequest {
2214 object_identifier,
2215 property_identifier,
2216 property_array_index,
2217 list_of_elements,
2218 };
2219 let mut buf = BytesMut::new();
2220 request.encode(&mut buf);
2221
2222 let _ = self
2223 .confirmed_request(
2224 destination_mac,
2225 ConfirmedServiceChoice::ADD_LIST_ELEMENT,
2226 &buf,
2227 )
2228 .await?;
2229
2230 Ok(())
2231 }
2232
2233 pub async fn remove_list_element(
2235 &self,
2236 destination_mac: &[u8],
2237 object_identifier: bacnet_types::primitives::ObjectIdentifier,
2238 property_identifier: bacnet_types::enums::PropertyIdentifier,
2239 property_array_index: Option<u32>,
2240 list_of_elements: Vec<u8>,
2241 ) -> Result<(), Error> {
2242 use bacnet_services::list_manipulation::ListElementRequest;
2243
2244 let request = ListElementRequest {
2245 object_identifier,
2246 property_identifier,
2247 property_array_index,
2248 list_of_elements,
2249 };
2250 let mut buf = BytesMut::new();
2251 request.encode(&mut buf);
2252
2253 let _ = self
2254 .confirmed_request(
2255 destination_mac,
2256 ConfirmedServiceChoice::REMOVE_LIST_ELEMENT,
2257 &buf,
2258 )
2259 .await?;
2260
2261 Ok(())
2262 }
2263
2264 pub async fn time_synchronization(
2266 &self,
2267 destination_mac: &[u8],
2268 date: bacnet_types::primitives::Date,
2269 time: bacnet_types::primitives::Time,
2270 ) -> Result<(), Error> {
2271 use bacnet_services::device_mgmt::TimeSynchronizationRequest;
2272
2273 let request = TimeSynchronizationRequest { date, time };
2274 let mut buf = BytesMut::new();
2275 request.encode(&mut buf);
2276
2277 self.unconfirmed_request(
2278 destination_mac,
2279 UnconfirmedServiceChoice::TIME_SYNCHRONIZATION,
2280 &buf,
2281 )
2282 .await
2283 }
2284
2285 pub async fn utc_time_synchronization(
2287 &self,
2288 destination_mac: &[u8],
2289 date: bacnet_types::primitives::Date,
2290 time: bacnet_types::primitives::Time,
2291 ) -> Result<(), Error> {
2292 use bacnet_services::device_mgmt::TimeSynchronizationRequest;
2293
2294 let request = TimeSynchronizationRequest { date, time };
2295 let mut buf = BytesMut::new();
2296 request.encode(&mut buf);
2297
2298 self.unconfirmed_request(
2299 destination_mac,
2300 UnconfirmedServiceChoice::UTC_TIME_SYNCHRONIZATION,
2301 &buf,
2302 )
2303 .await
2304 }
2305
2306 pub fn cov_notifications(&self) -> broadcast::Receiver<COVNotificationRequest> {
2309 self.cov_tx.subscribe()
2310 }
2311
2312 pub async fn discovered_devices(&self) -> Vec<DiscoveredDevice> {
2314 self.device_table.lock().await.all()
2315 }
2316
2317 pub async fn get_device(&self, instance: u32) -> Option<DiscoveredDevice> {
2319 self.device_table.lock().await.get(instance).cloned()
2320 }
2321
2322 pub async fn clear_devices(&self) {
2324 self.device_table.lock().await.clear();
2325 }
2326
2327 pub async fn add_device(&self, instance: u32, mac: &[u8]) -> Result<(), Error> {
2333 let oid = bacnet_types::primitives::ObjectIdentifier::new(
2334 bacnet_types::enums::ObjectType::DEVICE,
2335 instance,
2336 )?;
2337 let device = DiscoveredDevice {
2338 object_identifier: oid,
2339 mac_address: MacAddr::from_slice(mac),
2340 max_apdu_length: 1476,
2341 segmentation_supported: bacnet_types::enums::Segmentation::NONE,
2342 max_segments_accepted: None,
2343 vendor_id: 0,
2344 last_seen: std::time::Instant::now(),
2345 source_network: None,
2346 source_address: None,
2347 };
2348 self.device_table.lock().await.upsert(device);
2349 Ok(())
2350 }
2351
2352 pub async fn stop(&mut self) -> Result<(), Error> {
2354 if let Some(task) = self.dispatch_task.take() {
2355 task.abort();
2356 let _ = task.await;
2357 }
2358 Ok(())
2359 }
2360}
2361
2362#[cfg(test)]
2363mod tests {
2364 use super::*;
2365 use bacnet_encoding::apdu::{ComplexAck, SimpleAck};
2366 use std::net::Ipv4Addr;
2367 use tokio::time::Duration;
2368
2369 async fn make_client() -> BACnetClient<BipTransport> {
2370 BACnetClient::builder()
2371 .interface(Ipv4Addr::LOCALHOST)
2372 .port(0)
2373 .apdu_timeout_ms(2000)
2374 .build()
2375 .await
2376 .unwrap()
2377 }
2378
2379 #[tokio::test]
2380 async fn client_start_stop() {
2381 let mut client = make_client().await;
2382 assert!(!client.local_mac().is_empty());
2383 client.stop().await.unwrap();
2384 }
2385
2386 #[tokio::test]
2387 async fn confirmed_request_simple_ack() {
2388 let mut client_a = make_client().await;
2389
2390 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
2391 let mut net_b = NetworkLayer::new(transport_b);
2392 let mut rx_b = net_b.start().await.unwrap();
2393 let b_mac = net_b.local_mac().to_vec();
2394
2395 let b_handle = tokio::spawn(async move {
2396 let received = timeout(Duration::from_secs(2), rx_b.recv())
2397 .await
2398 .expect("B timed out")
2399 .expect("B channel closed");
2400
2401 let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
2402 if let Apdu::ConfirmedRequest(req) = decoded {
2403 let ack = Apdu::SimpleAck(SimpleAck {
2404 invoke_id: req.invoke_id,
2405 service_choice: req.service_choice,
2406 });
2407 let mut buf = BytesMut::new();
2408 encode_apdu(&mut buf, &ack);
2409 net_b
2410 .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
2411 .await
2412 .unwrap();
2413 }
2414 net_b.stop().await.unwrap();
2415 });
2416
2417 let result = client_a
2418 .confirmed_request(
2419 &b_mac,
2420 ConfirmedServiceChoice::WRITE_PROPERTY,
2421 &[0x01, 0x02],
2422 )
2423 .await;
2424
2425 assert!(result.is_ok());
2426 let response = result.unwrap();
2427 assert!(response.is_empty());
2428
2429 b_handle.await.unwrap();
2430 client_a.stop().await.unwrap();
2431 }
2432
2433 #[tokio::test]
2434 async fn confirmed_request_complex_ack() {
2435 let mut client_a = make_client().await;
2436
2437 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
2438 let mut net_b = NetworkLayer::new(transport_b);
2439 let mut rx_b = net_b.start().await.unwrap();
2440 let b_mac = net_b.local_mac().to_vec();
2441
2442 let b_handle = tokio::spawn(async move {
2443 let received = timeout(Duration::from_secs(2), rx_b.recv())
2444 .await
2445 .unwrap()
2446 .unwrap();
2447
2448 let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
2449 if let Apdu::ConfirmedRequest(req) = decoded {
2450 let ack = Apdu::ComplexAck(ComplexAck {
2451 segmented: false,
2452 more_follows: false,
2453 invoke_id: req.invoke_id,
2454 sequence_number: None,
2455 proposed_window_size: None,
2456 service_choice: req.service_choice,
2457 service_ack: Bytes::from_static(&[0xDE, 0xAD, 0xBE, 0xEF]),
2458 });
2459 let mut buf = BytesMut::new();
2460 encode_apdu(&mut buf, &ack);
2461 net_b
2462 .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
2463 .await
2464 .unwrap();
2465 }
2466 net_b.stop().await.unwrap();
2467 });
2468
2469 let result = client_a
2470 .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
2471 .await;
2472
2473 assert!(result.is_ok());
2474 assert_eq!(result.unwrap(), vec![0xDE, 0xAD, 0xBE, 0xEF]);
2475
2476 b_handle.await.unwrap();
2477 client_a.stop().await.unwrap();
2478 }
2479
2480 #[tokio::test]
2481 async fn confirmed_request_timeout() {
2482 let mut client = make_client().await;
2483 let fake_mac = vec![10, 99, 99, 99, 0xBA, 0xC0];
2484 let result = client
2485 .confirmed_request(&fake_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
2486 .await;
2487 assert!(result.is_err());
2488 client.stop().await.unwrap();
2489 }
2490
2491 #[tokio::test]
2492 async fn segmented_complex_ack_reassembly() {
2493 let mut client = make_client().await;
2494
2495 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
2496 let mut net_b = NetworkLayer::new(transport_b);
2497 let mut rx_b = net_b.start().await.unwrap();
2498 let b_mac = net_b.local_mac().to_vec();
2499
2500 let b_handle = tokio::spawn(async move {
2501 let received = timeout(Duration::from_secs(2), rx_b.recv())
2502 .await
2503 .unwrap()
2504 .unwrap();
2505
2506 let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
2507 let invoke_id = if let Apdu::ConfirmedRequest(req) = decoded {
2508 req.invoke_id
2509 } else {
2510 panic!("Expected ConfirmedRequest");
2511 };
2512
2513 let service_choice = ConfirmedServiceChoice::READ_PROPERTY;
2514 let segments: Vec<Bytes> = vec![
2515 Bytes::from_static(&[0x01, 0x02, 0x03]),
2516 Bytes::from_static(&[0x04, 0x05, 0x06]),
2517 Bytes::from_static(&[0x07, 0x08]),
2518 ];
2519
2520 for (i, seg) in segments.iter().enumerate() {
2521 let is_last = i == segments.len() - 1;
2522 let ack = Apdu::ComplexAck(ComplexAck {
2523 segmented: true,
2524 more_follows: !is_last,
2525 invoke_id,
2526 sequence_number: Some(i as u8),
2527 proposed_window_size: Some(1),
2528 service_choice,
2529 service_ack: seg.clone(),
2530 });
2531 let mut buf = BytesMut::new();
2532 encode_apdu(&mut buf, &ack);
2533 net_b
2534 .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
2535 .await
2536 .unwrap();
2537
2538 let seg_ack_msg = timeout(Duration::from_secs(2), rx_b.recv())
2539 .await
2540 .unwrap()
2541 .unwrap();
2542 let decoded = apdu::decode_apdu(seg_ack_msg.apdu.clone()).unwrap();
2543 if let Apdu::SegmentAck(sa) = decoded {
2544 assert_eq!(sa.invoke_id, invoke_id);
2545 assert_eq!(sa.sequence_number, i as u8);
2546 } else {
2547 panic!("Expected SegmentAck, got {:?}", decoded);
2548 }
2549 }
2550
2551 net_b.stop().await.unwrap();
2552 });
2553
2554 let result = client
2555 .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
2556 .await;
2557
2558 assert!(result.is_ok());
2559 assert_eq!(
2560 result.unwrap(),
2561 vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]
2562 );
2563
2564 b_handle.await.unwrap();
2565 client.stop().await.unwrap();
2566 }
2567
2568 #[tokio::test]
2569 async fn segmented_confirmed_request_sends_segments() {
2570 let mut client = BACnetClient::builder()
2571 .interface(Ipv4Addr::LOCALHOST)
2572 .port(0)
2573 .apdu_timeout_ms(5000)
2574 .max_apdu_length(50)
2575 .build()
2576 .await
2577 .unwrap();
2578
2579 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
2580 let mut net_b = NetworkLayer::new(transport_b);
2581 let mut rx_b = net_b.start().await.unwrap();
2582 let b_mac = net_b.local_mac().to_vec();
2583
2584 let service_data: Vec<u8> = (0u8..100).collect();
2585 let expected_data = service_data.clone();
2586
2587 let b_handle = tokio::spawn(async move {
2588 let mut all_service_data = Vec::new();
2589 let mut client_mac;
2590 let mut invoke_id;
2591
2592 loop {
2593 let received = timeout(Duration::from_secs(3), rx_b.recv())
2594 .await
2595 .expect("server timed out waiting for segment")
2596 .expect("server channel closed");
2597
2598 let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
2599 if let Apdu::ConfirmedRequest(req) = decoded {
2600 assert!(req.segmented, "expected segmented request");
2601 invoke_id = req.invoke_id;
2602 client_mac = received.source_mac.clone();
2603 let seq = req.sequence_number.unwrap();
2604 all_service_data.extend_from_slice(&req.service_request);
2605
2606 let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
2607 negative_ack: false,
2608 sent_by_server: true,
2609 invoke_id,
2610 sequence_number: seq,
2611 actual_window_size: 1,
2612 });
2613 let mut buf = BytesMut::new();
2614 encode_apdu(&mut buf, &seg_ack);
2615 net_b
2616 .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
2617 .await
2618 .unwrap();
2619
2620 if !req.more_follows {
2621 break;
2622 }
2623 } else {
2624 panic!("Expected ConfirmedRequest, got {:?}", decoded);
2625 }
2626 }
2627
2628 let ack = Apdu::SimpleAck(SimpleAck {
2629 invoke_id,
2630 service_choice: ConfirmedServiceChoice::WRITE_PROPERTY,
2631 });
2632 let mut buf = BytesMut::new();
2633 encode_apdu(&mut buf, &ack);
2634 net_b
2635 .send_apdu(&buf, &client_mac, false, NetworkPriority::NORMAL)
2636 .await
2637 .unwrap();
2638
2639 net_b.stop().await.unwrap();
2640 all_service_data
2641 });
2642
2643 let result = client
2644 .confirmed_request(
2645 &b_mac,
2646 ConfirmedServiceChoice::WRITE_PROPERTY,
2647 &service_data,
2648 )
2649 .await;
2650
2651 assert!(result.is_ok());
2652 assert!(result.unwrap().is_empty());
2653
2654 let received_data = b_handle.await.unwrap();
2655 assert_eq!(received_data, expected_data);
2656
2657 client.stop().await.unwrap();
2658 }
2659
2660 #[tokio::test]
2661 async fn segmented_request_with_complex_ack_response() {
2662 let mut client = BACnetClient::builder()
2663 .interface(Ipv4Addr::LOCALHOST)
2664 .port(0)
2665 .apdu_timeout_ms(5000)
2666 .max_apdu_length(50)
2667 .build()
2668 .await
2669 .unwrap();
2670
2671 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
2672 let mut net_b = NetworkLayer::new(transport_b);
2673 let mut rx_b = net_b.start().await.unwrap();
2674 let b_mac = net_b.local_mac().to_vec();
2675
2676 let service_data: Vec<u8> = (0u8..60).collect();
2677
2678 let b_handle = tokio::spawn(async move {
2679 let mut client_mac;
2680 let mut invoke_id;
2681
2682 loop {
2683 let received = timeout(Duration::from_secs(3), rx_b.recv())
2684 .await
2685 .unwrap()
2686 .unwrap();
2687
2688 let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
2689 if let Apdu::ConfirmedRequest(req) = decoded {
2690 invoke_id = req.invoke_id;
2691 client_mac = received.source_mac.clone();
2692 let seq = req.sequence_number.unwrap();
2693
2694 let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
2695 negative_ack: false,
2696 sent_by_server: true,
2697 invoke_id,
2698 sequence_number: seq,
2699 actual_window_size: 1,
2700 });
2701 let mut buf = BytesMut::new();
2702 encode_apdu(&mut buf, &seg_ack);
2703 net_b
2704 .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
2705 .await
2706 .unwrap();
2707
2708 if !req.more_follows {
2709 break;
2710 }
2711 }
2712 }
2713
2714 let ack = Apdu::ComplexAck(ComplexAck {
2715 segmented: false,
2716 more_follows: false,
2717 invoke_id,
2718 sequence_number: None,
2719 proposed_window_size: None,
2720 service_choice: ConfirmedServiceChoice::READ_PROPERTY,
2721 service_ack: Bytes::from_static(&[0xCA, 0xFE]),
2722 });
2723 let mut buf = BytesMut::new();
2724 encode_apdu(&mut buf, &ack);
2725 net_b
2726 .send_apdu(&buf, &client_mac, false, NetworkPriority::NORMAL)
2727 .await
2728 .unwrap();
2729
2730 net_b.stop().await.unwrap();
2731 });
2732
2733 let result = client
2734 .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &service_data)
2735 .await;
2736
2737 assert!(result.is_ok());
2738 assert_eq!(result.unwrap(), vec![0xCA, 0xFE]);
2739
2740 b_handle.await.unwrap();
2741 client.stop().await.unwrap();
2742 }
2743
2744 #[tokio::test]
2745 async fn segment_overflow_guard() {
2746 let mut client = BACnetClient::builder()
2747 .interface(Ipv4Addr::LOCALHOST)
2748 .port(0)
2749 .apdu_timeout_ms(2000)
2750 .max_apdu_length(50)
2751 .build()
2752 .await
2753 .unwrap();
2754
2755 let huge_payload = vec![0u8; 257 * 44];
2756 let fake_mac = vec![10, 99, 99, 99, 0xBA, 0xC0];
2757
2758 let result = client
2759 .confirmed_request(
2760 &fake_mac,
2761 ConfirmedServiceChoice::READ_PROPERTY,
2762 &huge_payload,
2763 )
2764 .await;
2765
2766 assert!(
2767 result.is_err(),
2768 "expected error for oversized payload, got success"
2769 );
2770
2771 client.stop().await.unwrap();
2772 }
2773
/// Pins `SEG_RECEIVER_TIMEOUT` to four seconds so a change is a deliberate one.
#[test]
fn seg_receiver_timeout_is_4s() {
    let four_seconds = Duration::from_secs(4);
    assert_eq!(SEG_RECEIVER_TIMEOUT, four_seconds);
}
2778}