1use std::collections::HashMap;
7use std::net::{Ipv4Addr, Ipv6Addr};
8use std::sync::Arc;
9use std::time::Instant;
10
11use bytes::{Bytes, BytesMut};
12use tokio::sync::{broadcast, mpsc, Mutex};
13use tokio::task::JoinHandle;
14use tokio::time::{timeout, Duration};
15use tracing::{debug, warn};
16
17use bacnet_encoding::apdu::{
18 self, encode_apdu, Apdu, ConfirmedRequest as ConfirmedRequestPdu, SegmentAck as SegmentAckPdu,
19 SimpleAck,
20};
21use bacnet_network::layer::NetworkLayer;
22use bacnet_services::cov::COVNotificationRequest;
23use bacnet_transport::bip::BipTransport;
24use bacnet_transport::bip6::Bip6Transport;
25use bacnet_transport::port::TransportPort;
26use bacnet_types::enums::{ConfirmedServiceChoice, NetworkPriority, UnconfirmedServiceChoice};
27use bacnet_types::error::Error;
28use bacnet_types::MacAddr;
29
30use crate::discovery::{DeviceTable, DiscoveredDevice};
31use crate::segmentation::{max_segment_payload, split_payload, SegmentReceiver, SegmentedPduType};
32use crate::tsm::{Tsm, TsmConfig, TsmResponse};
33
34#[derive(Debug, Clone)]
36pub struct ClientConfig {
37 pub interface: Ipv4Addr,
39 pub port: u16,
41 pub broadcast_address: Ipv4Addr,
43 pub apdu_timeout_ms: u64,
45 pub apdu_retries: u8,
47 pub max_apdu_length: u16,
49 pub max_segments: Option<u8>,
51 pub segmented_response_accepted: bool,
53 pub proposed_window_size: u8,
55}
56
57impl Default for ClientConfig {
58 fn default() -> Self {
59 Self {
60 interface: Ipv4Addr::UNSPECIFIED,
61 port: 0xBAC0,
62 broadcast_address: Ipv4Addr::BROADCAST,
63 apdu_timeout_ms: 6000,
64 apdu_retries: 3,
65 max_apdu_length: 1476,
66 max_segments: None,
67 segmented_response_accepted: true,
68 proposed_window_size: 1,
69 }
70 }
71}
72
73pub struct ClientBuilder<T: TransportPort> {
75 config: ClientConfig,
76 transport: Option<T>,
77}
78
79impl<T: TransportPort + 'static> ClientBuilder<T> {
80 pub fn transport(mut self, transport: T) -> Self {
82 self.transport = Some(transport);
83 self
84 }
85
86 pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
88 self.config.apdu_timeout_ms = ms;
89 self
90 }
91
92 pub fn max_apdu_length(mut self, len: u16) -> Self {
94 self.config.max_apdu_length = len;
95 self
96 }
97
98 pub async fn build(self) -> Result<BACnetClient<T>, Error> {
100 let transport = self
101 .transport
102 .ok_or_else(|| Error::Encoding("transport not set on ClientBuilder".into()))?;
103 BACnetClient::start(self.config, transport).await
104 }
105}
106
107pub struct BipClientBuilder {
109 config: ClientConfig,
110}
111
112impl BipClientBuilder {
113 pub fn interface(mut self, ip: Ipv4Addr) -> Self {
115 self.config.interface = ip;
116 self
117 }
118
119 pub fn port(mut self, port: u16) -> Self {
121 self.config.port = port;
122 self
123 }
124
125 pub fn broadcast_address(mut self, addr: Ipv4Addr) -> Self {
127 self.config.broadcast_address = addr;
128 self
129 }
130
131 pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
133 self.config.apdu_timeout_ms = ms;
134 self
135 }
136
137 pub fn max_apdu_length(mut self, len: u16) -> Self {
139 self.config.max_apdu_length = len;
140 self
141 }
142
143 pub async fn build(self) -> Result<BACnetClient<BipTransport>, Error> {
145 let transport = BipTransport::new(
146 self.config.interface,
147 self.config.port,
148 self.config.broadcast_address,
149 );
150 BACnetClient::start(self.config, transport).await
151 }
152}
153
154struct SegmentedReceiveState {
156 receiver: SegmentReceiver,
157 expected_next_seq: u8,
159 last_activity: Instant,
161}
162
163const SEG_RECEIVER_TIMEOUT: Duration = Duration::from_secs(4);
165
166type SegKey = (MacAddr, u8);
168
169pub struct BACnetClient<T: TransportPort> {
171 config: ClientConfig,
172 network: Arc<NetworkLayer<T>>,
173 tsm: Arc<Mutex<Tsm>>,
174 device_table: Arc<Mutex<DeviceTable>>,
175 cov_tx: broadcast::Sender<COVNotificationRequest>,
176 dispatch_task: Option<JoinHandle<()>>,
177 seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
178 local_mac: MacAddr,
179}
180
181impl BACnetClient<BipTransport> {
182 pub fn bip_builder() -> BipClientBuilder {
184 BipClientBuilder {
185 config: ClientConfig::default(),
186 }
187 }
188
189 pub fn builder() -> BipClientBuilder {
191 Self::bip_builder()
192 }
193
194 pub async fn read_bdt(
196 &self,
197 target: &[u8],
198 ) -> Result<Vec<bacnet_transport::bbmd::BdtEntry>, Error> {
199 self.network.transport().read_bdt(target).await
200 }
201
202 pub async fn write_bdt(
204 &self,
205 target: &[u8],
206 entries: &[bacnet_transport::bbmd::BdtEntry],
207 ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
208 self.network.transport().write_bdt(target, entries).await
209 }
210
211 pub async fn read_fdt(
213 &self,
214 target: &[u8],
215 ) -> Result<Vec<bacnet_transport::bbmd::FdtEntryWire>, Error> {
216 self.network.transport().read_fdt(target).await
217 }
218
219 pub async fn delete_fdt_entry(
221 &self,
222 target: &[u8],
223 ip: [u8; 4],
224 port: u16,
225 ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
226 self.network
227 .transport()
228 .delete_fdt_entry(target, ip, port)
229 .await
230 }
231
232 pub async fn register_foreign_device_bvlc(
234 &self,
235 target: &[u8],
236 ttl: u16,
237 ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
238 self.network
239 .transport()
240 .register_foreign_device(target, ttl)
241 .await
242 }
243}
244
245impl BACnetClient<Bip6Transport> {
246 pub fn bip6_builder() -> Bip6ClientBuilder {
248 Bip6ClientBuilder {
249 config: ClientConfig::default(),
250 interface: Ipv6Addr::UNSPECIFIED,
251 device_instance: None,
252 }
253 }
254}
255
256pub struct Bip6ClientBuilder {
258 config: ClientConfig,
259 interface: Ipv6Addr,
260 device_instance: Option<u32>,
261}
262
263impl Bip6ClientBuilder {
264 pub fn interface(mut self, ip: Ipv6Addr) -> Self {
266 self.interface = ip;
267 self
268 }
269
270 pub fn port(mut self, port: u16) -> Self {
272 self.config.port = port;
273 self
274 }
275
276 pub fn device_instance(mut self, instance: u32) -> Self {
278 self.device_instance = Some(instance);
279 self
280 }
281
282 pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
284 self.config.apdu_timeout_ms = ms;
285 self
286 }
287
288 pub fn max_apdu_length(mut self, len: u16) -> Self {
290 self.config.max_apdu_length = len;
291 self
292 }
293
294 pub async fn build(self) -> Result<BACnetClient<Bip6Transport>, Error> {
296 let transport = Bip6Transport::new(self.interface, self.config.port, self.device_instance);
297 BACnetClient::start(self.config, transport).await
298 }
299}
300
301#[cfg(feature = "sc-tls")]
302impl BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>> {
303 pub fn sc_builder() -> ScClientBuilder {
305 ScClientBuilder {
306 config: ClientConfig::default(),
307 hub_url: String::new(),
308 tls_config: None,
309 vmac: [0; 6],
310 heartbeat_interval_ms: 30_000,
311 heartbeat_timeout_ms: 60_000,
312 reconnect: None,
313 }
314 }
315}
316
317#[cfg(feature = "sc-tls")]
321pub struct ScClientBuilder {
322 config: ClientConfig,
323 hub_url: String,
324 tls_config: Option<std::sync::Arc<tokio_rustls::rustls::ClientConfig>>,
325 vmac: bacnet_transport::sc_frame::Vmac,
326 heartbeat_interval_ms: u64,
327 heartbeat_timeout_ms: u64,
328 reconnect: Option<bacnet_transport::sc::ScReconnectConfig>,
329}
330
331#[cfg(feature = "sc-tls")]
332impl ScClientBuilder {
333 pub fn hub_url(mut self, url: &str) -> Self {
335 self.hub_url = url.to_string();
336 self
337 }
338
339 pub fn tls_config(
341 mut self,
342 config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
343 ) -> Self {
344 self.tls_config = Some(config);
345 self
346 }
347
348 pub fn vmac(mut self, vmac: [u8; 6]) -> Self {
350 self.vmac = vmac;
351 self
352 }
353
354 pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
356 self.config.apdu_timeout_ms = ms;
357 self
358 }
359
360 pub fn heartbeat_interval_ms(mut self, ms: u64) -> Self {
362 self.heartbeat_interval_ms = ms;
363 self
364 }
365
366 pub fn heartbeat_timeout_ms(mut self, ms: u64) -> Self {
368 self.heartbeat_timeout_ms = ms;
369 self
370 }
371
372 pub fn reconnect(mut self, config: bacnet_transport::sc::ScReconnectConfig) -> Self {
374 self.reconnect = Some(config);
375 self
376 }
377
378 pub async fn build(
380 self,
381 ) -> Result<
382 BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>>,
383 Error,
384 > {
385 let tls_config = self
386 .tls_config
387 .ok_or_else(|| Error::Encoding("SC client builder: tls_config is required".into()))?;
388
389 let ws = bacnet_transport::sc_tls::TlsWebSocket::connect(&self.hub_url, tls_config).await?;
390
391 let mut transport = bacnet_transport::sc::ScTransport::new(ws, self.vmac)
392 .with_heartbeat_interval_ms(self.heartbeat_interval_ms)
393 .with_heartbeat_timeout_ms(self.heartbeat_timeout_ms);
394 if let Some(rc) = self.reconnect {
395 #[allow(deprecated)]
396 {
397 transport = transport.with_reconnect(rc);
398 }
399 }
400
401 BACnetClient::start(self.config, transport).await
402 }
403}
404
405impl<T: TransportPort + 'static> BACnetClient<T> {
406 pub fn generic_builder() -> ClientBuilder<T> {
408 ClientBuilder {
409 config: ClientConfig::default(),
410 transport: None,
411 }
412 }
413
414 pub async fn start(mut config: ClientConfig, transport: T) -> Result<Self, Error> {
416 let transport_max = transport.max_apdu_length();
418 config.max_apdu_length = config.max_apdu_length.min(transport_max);
419
420 let mut network = NetworkLayer::new(transport);
421 let mut apdu_rx = network.start().await?;
422 let local_mac = MacAddr::from_slice(network.local_mac());
423
424 let network = Arc::new(network);
425
426 let tsm_config = TsmConfig {
427 apdu_timeout_ms: config.apdu_timeout_ms,
428 apdu_retries: config.apdu_retries,
429 };
430 let tsm = Arc::new(Mutex::new(Tsm::new(tsm_config)));
431 let tsm_dispatch = Arc::clone(&tsm);
432 let device_table = Arc::new(Mutex::new(DeviceTable::new()));
433 let device_table_dispatch = Arc::clone(&device_table);
434 let network_dispatch = Arc::clone(&network);
435 let (cov_tx, _) = broadcast::channel::<COVNotificationRequest>(64);
436 let cov_tx_dispatch = cov_tx.clone();
437 let seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>> =
438 Arc::new(Mutex::new(HashMap::new()));
439 let seg_ack_senders_dispatch = Arc::clone(&seg_ack_senders);
440
441 let dispatch_task = tokio::spawn(async move {
443 let mut seg_state: HashMap<SegKey, SegmentedReceiveState> = HashMap::new();
445
446 while let Some(received) = apdu_rx.recv().await {
447 let now = Instant::now();
449 seg_state.retain(|_key, state| {
450 now.duration_since(state.last_activity) < SEG_RECEIVER_TIMEOUT
451 });
452
453 match apdu::decode_apdu(received.apdu.clone()) {
454 Ok(decoded) => {
455 Self::dispatch_apdu(
456 &tsm_dispatch,
457 &device_table_dispatch,
458 &network_dispatch,
459 &cov_tx_dispatch,
460 &mut seg_state,
461 &seg_ack_senders_dispatch,
462 &received.source_mac,
463 decoded,
464 )
465 .await;
466 }
467 Err(e) => {
468 warn!(error = %e, "Failed to decode received APDU");
469 }
470 }
471 }
472 });
473
474 Ok(Self {
475 config,
476 network,
477 tsm,
478 device_table,
479 cov_tx,
480 dispatch_task: Some(dispatch_task),
481 seg_ack_senders,
482 local_mac,
483 })
484 }
485
486 #[allow(clippy::too_many_arguments)]
488 async fn dispatch_apdu(
489 tsm: &Arc<Mutex<Tsm>>,
490 device_table: &Arc<Mutex<DeviceTable>>,
491 network: &Arc<NetworkLayer<T>>,
492 cov_tx: &broadcast::Sender<COVNotificationRequest>,
493 seg_state: &mut HashMap<SegKey, SegmentedReceiveState>,
494 seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
495 source_mac: &[u8],
496 apdu: Apdu,
497 ) {
498 match apdu {
499 Apdu::SimpleAck(ack) => {
500 debug!(invoke_id = ack.invoke_id, "Received SimpleAck");
501 let mut tsm = tsm.lock().await;
502 tsm.complete_transaction(source_mac, ack.invoke_id, TsmResponse::SimpleAck);
503 }
504 Apdu::ComplexAck(ack) => {
505 if ack.segmented {
506 Self::handle_segmented_complex_ack(tsm, network, seg_state, source_mac, ack)
507 .await;
508 } else {
509 debug!(invoke_id = ack.invoke_id, "Received ComplexAck");
510 let mut tsm = tsm.lock().await;
511 tsm.complete_transaction(
512 source_mac,
513 ack.invoke_id,
514 TsmResponse::ComplexAck {
515 service_data: ack.service_ack,
516 },
517 );
518 }
519 }
520 Apdu::Error(err) => {
521 debug!(invoke_id = err.invoke_id, "Received Error PDU");
522 let mut tsm = tsm.lock().await;
523 tsm.complete_transaction(
524 source_mac,
525 err.invoke_id,
526 TsmResponse::Error {
527 class: err.error_class.to_raw() as u32,
528 code: err.error_code.to_raw() as u32,
529 },
530 );
531 }
532 Apdu::Reject(rej) => {
533 debug!(invoke_id = rej.invoke_id, "Received Reject PDU");
534 let mut tsm = tsm.lock().await;
535 tsm.complete_transaction(
536 source_mac,
537 rej.invoke_id,
538 TsmResponse::Reject {
539 reason: rej.reject_reason.to_raw(),
540 },
541 );
542 }
543 Apdu::Abort(abt) => {
544 debug!(invoke_id = abt.invoke_id, "Received Abort PDU");
545 let mut tsm = tsm.lock().await;
546 tsm.complete_transaction(
547 source_mac,
548 abt.invoke_id,
549 TsmResponse::Abort {
550 reason: abt.abort_reason.to_raw(),
551 },
552 );
553 }
554 Apdu::ConfirmedRequest(req) => {
555 if req.service_choice == ConfirmedServiceChoice::CONFIRMED_COV_NOTIFICATION {
557 match COVNotificationRequest::decode(&req.service_request) {
558 Ok(notification) => {
559 debug!(
560 object = ?notification.monitored_object_identifier,
561 "Received ConfirmedCOVNotification"
562 );
563 let _ = cov_tx.send(notification);
564
565 let ack = Apdu::SimpleAck(SimpleAck {
567 invoke_id: req.invoke_id,
568 service_choice: req.service_choice,
569 });
570 let mut buf = BytesMut::with_capacity(4);
571 encode_apdu(&mut buf, &ack);
572 if let Err(e) = network
573 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
574 .await
575 {
576 warn!(error = %e, "Failed to send SimpleAck for COV notification");
577 }
578 }
579 Err(e) => {
580 warn!(error = %e, "Failed to decode ConfirmedCOVNotification");
581 }
582 }
583 } else {
584 debug!(
585 service = req.service_choice.to_raw(),
586 "Ignoring ConfirmedRequest (client mode)"
587 );
588 }
589 }
590 Apdu::UnconfirmedRequest(req) => {
591 if req.service_choice == UnconfirmedServiceChoice::I_AM {
592 match bacnet_services::who_is::IAmRequest::decode(&req.service_request) {
593 Ok(i_am) => {
594 debug!(
595 device = i_am.object_identifier.instance_number(),
596 vendor = i_am.vendor_id,
597 "Received IAm"
598 );
599 let device = DiscoveredDevice {
600 object_identifier: i_am.object_identifier,
601 mac_address: MacAddr::from_slice(source_mac),
602 max_apdu_length: i_am.max_apdu_length,
603 segmentation_supported: i_am.segmentation_supported,
604 max_segments_accepted: None,
605 vendor_id: i_am.vendor_id,
606 last_seen: std::time::Instant::now(),
607 };
608 device_table.lock().await.upsert(device);
609 }
610 Err(e) => {
611 warn!(error = %e, "Failed to decode IAm");
612 }
613 }
614 } else if req.service_choice
615 == UnconfirmedServiceChoice::UNCONFIRMED_COV_NOTIFICATION
616 {
617 match COVNotificationRequest::decode(&req.service_request) {
618 Ok(notification) => {
619 debug!(
620 object = ?notification.monitored_object_identifier,
621 "Received UnconfirmedCOVNotification"
622 );
623 let _ = cov_tx.send(notification);
624 }
625 Err(e) => {
626 warn!(error = %e, "Failed to decode UnconfirmedCOVNotification");
627 }
628 }
629 } else {
630 debug!(
631 service = req.service_choice.to_raw(),
632 "Ignoring unconfirmed service in client dispatch"
633 );
634 }
635 }
636 Apdu::SegmentAck(sa) => {
637 let key = (MacAddr::from_slice(source_mac), sa.invoke_id);
639 let senders = seg_ack_senders.lock().await;
640 if let Some(tx) = senders.get(&key) {
641 let _ = tx.try_send(sa);
642 } else {
643 debug!(
644 invoke_id = sa.invoke_id,
645 "Ignoring SegmentAck for unknown transaction"
646 );
647 }
648 }
649 }
650 }
651
652 async fn handle_segmented_complex_ack(
655 tsm: &Arc<Mutex<Tsm>>,
656 network: &Arc<NetworkLayer<T>>,
657 seg_state: &mut HashMap<SegKey, SegmentedReceiveState>,
658 source_mac: &[u8],
659 ack: bacnet_encoding::apdu::ComplexAck,
660 ) {
661 let seq = ack.sequence_number.unwrap_or(0);
662 let key = (MacAddr::from_slice(source_mac), ack.invoke_id);
663
664 debug!(
665 invoke_id = ack.invoke_id,
666 seq = seq,
667 more = ack.more_follows,
668 "Received segmented ComplexAck"
669 );
670
671 const MAX_CONCURRENT_SEG_SESSIONS: usize = 64;
673 if !seg_state.contains_key(&key) && seg_state.len() >= MAX_CONCURRENT_SEG_SESSIONS {
674 warn!(
675 invoke_id = ack.invoke_id,
676 sessions = seg_state.len(),
677 "Max concurrent segmented sessions reached, dropping segment"
678 );
679 return;
680 }
681
682 let state = seg_state
684 .entry(key.clone())
685 .or_insert_with(|| SegmentedReceiveState {
686 receiver: SegmentReceiver::new(),
687 expected_next_seq: 0,
688 last_activity: Instant::now(),
689 });
690
691 state.last_activity = Instant::now();
693
694 if seq != state.expected_next_seq {
698 warn!(
699 invoke_id = ack.invoke_id,
700 expected = state.expected_next_seq,
701 received = seq,
702 "Segment gap detected, sending negative SegmentAck"
703 );
704 let neg_ack = Apdu::SegmentAck(SegmentAckPdu {
705 negative_ack: true,
706 sent_by_server: false,
707 invoke_id: ack.invoke_id,
708 sequence_number: state.expected_next_seq,
709 actual_window_size: ack.proposed_window_size.unwrap_or(1),
710 });
711 let mut buf = BytesMut::with_capacity(4);
712 encode_apdu(&mut buf, &neg_ack);
713 if let Err(e) = network
714 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
715 .await
716 {
717 warn!(error = %e, "Failed to send negative SegmentAck");
718 }
719 return;
720 }
721
722 if let Err(e) = state.receiver.receive(seq, ack.service_ack) {
724 warn!(error = %e, "Rejecting oversized segment");
725 return;
726 }
727 state.expected_next_seq = seq.wrapping_add(1);
728
729 let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
731 negative_ack: false,
732 sent_by_server: false,
733 invoke_id: ack.invoke_id,
734 sequence_number: seq,
735 actual_window_size: ack.proposed_window_size.unwrap_or(1),
736 });
737 let mut buf = BytesMut::with_capacity(4);
738 encode_apdu(&mut buf, &seg_ack);
739 if let Err(e) = network
740 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
741 .await
742 {
743 warn!(error = %e, "Failed to send SegmentAck");
744 }
745
746 if !ack.more_follows {
750 let state = seg_state.remove(&key).unwrap();
751 let total = state.receiver.received_count();
752 match state.receiver.reassemble(total) {
753 Ok(service_data) => {
754 debug!(
755 invoke_id = ack.invoke_id,
756 segments = total,
757 bytes = service_data.len(),
758 "Reassembled segmented ComplexAck"
759 );
760 let mut tsm = tsm.lock().await;
761 tsm.complete_transaction(
762 source_mac,
763 ack.invoke_id,
764 TsmResponse::ComplexAck {
765 service_data: Bytes::from(service_data),
766 },
767 );
768 }
769 Err(e) => {
770 warn!(error = %e, "Failed to reassemble segmented ComplexAck");
771 }
772 }
773 }
774 }
775
776 pub fn local_mac(&self) -> &[u8] {
778 &self.local_mac
779 }
780
781 pub async fn confirmed_request(
793 &self,
794 destination_mac: &[u8],
795 service_choice: ConfirmedServiceChoice,
796 service_data: &[u8],
797 ) -> Result<Bytes, Error> {
798 let unsegmented_apdu_size = 4 + service_data.len();
801 let (remote_max_apdu, remote_max_segments) = {
802 let dt = self.device_table.lock().await;
803 let device = dt.get_by_mac(destination_mac);
804 let max_apdu = device
805 .map(|d| d.max_apdu_length as u16)
806 .unwrap_or(self.config.max_apdu_length);
807 let max_seg = device.and_then(|d| d.max_segments_accepted);
808 (max_apdu, max_seg)
809 };
810 if unsegmented_apdu_size > remote_max_apdu as usize {
811 return self
812 .segmented_confirmed_request(
813 destination_mac,
814 service_choice,
815 service_data,
816 remote_max_apdu,
817 remote_max_segments,
818 )
819 .await;
820 }
821
822 let (invoke_id, rx) = {
824 let mut tsm = self.tsm.lock().await;
825 let invoke_id = tsm.allocate_invoke_id(destination_mac).ok_or_else(|| {
826 Error::Encoding("all invoke IDs exhausted for destination".into())
827 })?;
828 let rx = tsm.register_transaction(MacAddr::from_slice(destination_mac), invoke_id);
829 (invoke_id, rx)
830 };
831
832 let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
834 segmented: false,
835 more_follows: false,
836 segmented_response_accepted: self.config.segmented_response_accepted,
837 max_segments: self.config.max_segments,
838 max_apdu_length: self.config.max_apdu_length,
839 invoke_id,
840 sequence_number: None,
841 proposed_window_size: None,
842 service_choice,
843 service_request: Bytes::copy_from_slice(service_data),
844 });
845
846 let mut buf = BytesMut::with_capacity(6 + service_data.len());
847 encode_apdu(&mut buf, &pdu);
848
849 let timeout_duration = Duration::from_millis(self.config.apdu_timeout_ms);
853 let max_retries = self.config.apdu_retries;
854 let mut attempts: u8 = 0;
855 let mut rx = rx;
856
857 loop {
858 if let Err(e) = self
859 .network
860 .send_apdu(&buf, destination_mac, true, NetworkPriority::NORMAL)
861 .await
862 {
863 let mut tsm = self.tsm.lock().await;
865 tsm.cancel_transaction(destination_mac, invoke_id);
866 return Err(e);
867 }
868
869 match timeout(timeout_duration, &mut rx).await {
870 Ok(Ok(response)) => {
871 return match response {
873 TsmResponse::SimpleAck => Ok(Bytes::new()),
874 TsmResponse::ComplexAck { service_data } => Ok(service_data),
875 TsmResponse::Error { class, code } => Err(Error::Protocol { class, code }),
876 TsmResponse::Reject { reason } => Err(Error::Reject { reason }),
877 TsmResponse::Abort { reason } => Err(Error::Abort { reason }),
878 };
879 }
880 Ok(Err(_)) => {
881 return Err(Error::Encoding("TSM response channel closed".into()));
883 }
884 Err(_timeout) => {
885 attempts += 1;
886 if attempts > max_retries {
887 let mut tsm = self.tsm.lock().await;
889 tsm.cancel_transaction(destination_mac, invoke_id);
890 return Err(Error::Timeout(timeout_duration));
891 }
892 debug!(
893 invoke_id,
894 attempt = attempts,
895 max_retries,
896 "APDU timeout, retrying confirmed request"
897 );
898 }
899 }
900 }
901 }
902
903 async fn segmented_confirmed_request(
908 &self,
909 destination_mac: &[u8],
910 service_choice: ConfirmedServiceChoice,
911 service_data: &[u8],
912 remote_max_apdu: u16,
913 remote_max_segments: Option<u32>,
914 ) -> Result<Bytes, Error> {
915 let max_seg_size = max_segment_payload(remote_max_apdu, SegmentedPduType::ConfirmedRequest);
916 let segments = split_payload(service_data, max_seg_size);
917 let total_segments = segments.len();
918
919 if total_segments > 255 {
920 return Err(Error::Segmentation(format!(
921 "payload requires {} segments, maximum is 255",
922 total_segments
923 )));
924 }
925
926 if let Some(max_seg) = remote_max_segments {
927 if total_segments > max_seg as usize {
928 return Err(Error::Segmentation(format!(
929 "request requires {} segments but remote accepts at most {}",
930 total_segments, max_seg
931 )));
932 }
933 }
934
935 debug!(
936 total_segments,
937 max_seg_size,
938 service_data_len = service_data.len(),
939 "Starting segmented confirmed request"
940 );
941
942 let (invoke_id, rx) = {
944 let mut tsm = self.tsm.lock().await;
945 let invoke_id = tsm.allocate_invoke_id(destination_mac).ok_or_else(|| {
946 Error::Encoding("all invoke IDs exhausted for destination".into())
947 })?;
948 let rx = tsm.register_transaction(MacAddr::from_slice(destination_mac), invoke_id);
949 (invoke_id, rx)
950 };
951
952 let (seg_ack_tx, mut seg_ack_rx) = mpsc::channel(16);
954 {
955 let key = (MacAddr::from_slice(destination_mac), invoke_id);
956 self.seg_ack_senders.lock().await.insert(key, seg_ack_tx);
957 }
958
959 let timeout_duration = Duration::from_millis(self.config.apdu_timeout_ms);
960 let max_ack_retries = self.config.apdu_retries;
961 let mut window_size = self.config.proposed_window_size.max(1) as usize;
962 let mut next_seq: usize = 0;
963 let mut neg_ack_retries: u32 = 0;
964 const MAX_NEG_ACK_RETRIES: u32 = 10;
965
966 let result = async {
968 while next_seq < total_segments {
969 let window_end = (next_seq + window_size).min(total_segments);
970
971 for (seq, segment_data) in segments[next_seq..window_end]
972 .iter()
973 .enumerate()
974 .map(|(i, s)| (next_seq + i, s))
975 {
976 let is_last = seq == total_segments - 1;
977 let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
978 segmented: true,
979 more_follows: !is_last,
980 segmented_response_accepted: self.config.segmented_response_accepted,
981 max_segments: self.config.max_segments,
982 max_apdu_length: self.config.max_apdu_length,
983 invoke_id,
984 sequence_number: Some(seq as u8),
985 proposed_window_size: Some(self.config.proposed_window_size.max(1)),
986 service_choice,
987 service_request: segment_data.clone(),
988 });
989
990 let mut buf = BytesMut::with_capacity(remote_max_apdu as usize);
991 encode_apdu(&mut buf, &pdu);
992
993 self.network
994 .send_apdu(&buf, destination_mac, true, NetworkPriority::NORMAL)
995 .await?;
996
997 debug!(seq, is_last, "Sent segment");
998 }
999
1000 let ack = {
1002 let mut ack_retries: u8 = 0;
1003 loop {
1004 match timeout(timeout_duration, seg_ack_rx.recv()).await {
1005 Ok(Some(ack)) => break ack,
1006 Ok(None) => {
1007 return Err(Error::Encoding("SegmentAck channel closed".into()));
1008 }
1009 Err(_timeout) => {
1010 ack_retries += 1;
1011 if ack_retries > max_ack_retries {
1012 return Err(Error::Timeout(timeout_duration));
1013 }
1014 warn!(
1015 attempt = ack_retries,
1016 "Retransmitting segmented request window"
1017 );
1018 for (seq, segment_data) in segments[next_seq..window_end]
1020 .iter()
1021 .enumerate()
1022 .map(|(i, s)| (next_seq + i, s))
1023 {
1024 let is_last = seq == total_segments - 1;
1025 let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
1026 segmented: true,
1027 more_follows: !is_last,
1028 segmented_response_accepted: self
1029 .config
1030 .segmented_response_accepted,
1031 max_segments: self.config.max_segments,
1032 max_apdu_length: self.config.max_apdu_length,
1033 invoke_id,
1034 sequence_number: Some(seq as u8),
1035 proposed_window_size: Some(
1036 self.config.proposed_window_size.max(1),
1037 ),
1038 service_choice,
1039 service_request: segment_data.clone(),
1040 });
1041
1042 let mut buf = BytesMut::with_capacity(remote_max_apdu as usize);
1043 encode_apdu(&mut buf, &pdu);
1044
1045 self.network
1046 .send_apdu(
1047 &buf,
1048 destination_mac,
1049 true,
1050 NetworkPriority::NORMAL,
1051 )
1052 .await?;
1053 }
1054 }
1055 }
1056 }
1057 };
1058
1059 debug!(
1060 seq = ack.sequence_number,
1061 negative = ack.negative_ack,
1062 window = ack.actual_window_size,
1063 "Received SegmentAck"
1064 );
1065
1066 window_size = ack.actual_window_size.max(1) as usize;
1068
1069 if ack.negative_ack {
1070 neg_ack_retries += 1;
1071 if neg_ack_retries > MAX_NEG_ACK_RETRIES {
1072 return Err(Error::Segmentation(
1073 "too many negative SegmentAck retransmissions".into(),
1074 ));
1075 }
1076 next_seq = ack.sequence_number as usize;
1078 } else {
1079 neg_ack_retries = 0;
1080 next_seq = ack.sequence_number as usize + 1;
1082 }
1083 }
1084
1085 timeout(timeout_duration, rx)
1087 .await
1088 .map_err(|_| Error::Timeout(timeout_duration))?
1089 .map_err(|_| Error::Encoding("TSM response channel closed".into()))
1090 }
1091 .await;
1092
1093 {
1095 let key = (MacAddr::from_slice(destination_mac), invoke_id);
1096 self.seg_ack_senders.lock().await.remove(&key);
1097 }
1098
1099 let response = match result {
1101 Ok(response) => response,
1102 Err(e) => {
1103 let mut tsm = self.tsm.lock().await;
1104 tsm.cancel_transaction(destination_mac, invoke_id);
1105 return Err(e);
1106 }
1107 };
1108
1109 match response {
1110 TsmResponse::SimpleAck => Ok(Bytes::new()),
1111 TsmResponse::ComplexAck { service_data } => Ok(service_data),
1112 TsmResponse::Error { class, code } => Err(Error::Protocol { class, code }),
1113 TsmResponse::Reject { reason } => Err(Error::Reject { reason }),
1114 TsmResponse::Abort { reason } => Err(Error::Abort { reason }),
1115 }
1116 }
1117
1118 pub async fn unconfirmed_request(
1120 &self,
1121 destination_mac: &[u8],
1122 service_choice: UnconfirmedServiceChoice,
1123 service_data: &[u8],
1124 ) -> Result<(), Error> {
1125 let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1126 service_choice,
1127 service_request: Bytes::copy_from_slice(service_data),
1128 });
1129
1130 let mut buf = BytesMut::with_capacity(2 + service_data.len());
1131 encode_apdu(&mut buf, &pdu);
1132
1133 self.network
1134 .send_apdu(&buf, destination_mac, false, NetworkPriority::NORMAL)
1135 .await
1136 }
1137
1138 pub async fn broadcast_unconfirmed(
1140 &self,
1141 service_choice: UnconfirmedServiceChoice,
1142 service_data: &[u8],
1143 ) -> Result<(), Error> {
1144 let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1145 service_choice,
1146 service_request: Bytes::copy_from_slice(service_data),
1147 });
1148
1149 let mut buf = BytesMut::with_capacity(2 + service_data.len());
1150 encode_apdu(&mut buf, &pdu);
1151
1152 self.network
1153 .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1154 .await
1155 }
1156
1157 pub async fn broadcast_global_unconfirmed(
1162 &self,
1163 service_choice: UnconfirmedServiceChoice,
1164 service_data: &[u8],
1165 ) -> Result<(), Error> {
1166 let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1167 service_choice,
1168 service_request: Bytes::copy_from_slice(service_data),
1169 });
1170
1171 let mut buf = BytesMut::with_capacity(2 + service_data.len());
1172 encode_apdu(&mut buf, &pdu);
1173
1174 self.network
1175 .broadcast_global_apdu(&buf, false, NetworkPriority::NORMAL)
1176 .await
1177 }
1178
1179 pub async fn broadcast_network_unconfirmed(
1181 &self,
1182 service_choice: UnconfirmedServiceChoice,
1183 service_data: &[u8],
1184 dest_network: u16,
1185 ) -> Result<(), Error> {
1186 let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1187 service_choice,
1188 service_request: Bytes::copy_from_slice(service_data),
1189 });
1190
1191 let mut buf = BytesMut::with_capacity(2 + service_data.len());
1192 encode_apdu(&mut buf, &pdu);
1193
1194 self.network
1195 .broadcast_to_network(&buf, dest_network, false, NetworkPriority::NORMAL)
1196 .await
1197 }
1198
1199 pub async fn read_property(
1205 &self,
1206 destination_mac: &[u8],
1207 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1208 property_identifier: bacnet_types::enums::PropertyIdentifier,
1209 property_array_index: Option<u32>,
1210 ) -> Result<bacnet_services::read_property::ReadPropertyACK, Error> {
1211 use bacnet_services::read_property::ReadPropertyRequest;
1212
1213 let request = ReadPropertyRequest {
1214 object_identifier,
1215 property_identifier,
1216 property_array_index,
1217 };
1218 let mut buf = BytesMut::new();
1219 request.encode(&mut buf);
1220
1221 let response_data = self
1222 .confirmed_request(destination_mac, ConfirmedServiceChoice::READ_PROPERTY, &buf)
1223 .await?;
1224
1225 bacnet_services::read_property::ReadPropertyACK::decode(&response_data)
1226 }
1227
1228 pub async fn write_property(
1230 &self,
1231 destination_mac: &[u8],
1232 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1233 property_identifier: bacnet_types::enums::PropertyIdentifier,
1234 property_array_index: Option<u32>,
1235 property_value: Vec<u8>,
1236 priority: Option<u8>,
1237 ) -> Result<(), Error> {
1238 use bacnet_services::write_property::WritePropertyRequest;
1239
1240 let request = WritePropertyRequest {
1241 object_identifier,
1242 property_identifier,
1243 property_array_index,
1244 property_value,
1245 priority,
1246 };
1247 let mut buf = BytesMut::new();
1248 request.encode(&mut buf);
1249
1250 let _ = self
1251 .confirmed_request(
1252 destination_mac,
1253 ConfirmedServiceChoice::WRITE_PROPERTY,
1254 &buf,
1255 )
1256 .await?;
1257
1258 Ok(())
1259 }
1260
1261 pub async fn read_property_multiple(
1263 &self,
1264 destination_mac: &[u8],
1265 specs: Vec<bacnet_services::rpm::ReadAccessSpecification>,
1266 ) -> Result<bacnet_services::rpm::ReadPropertyMultipleACK, Error> {
1267 use bacnet_services::rpm::{ReadPropertyMultipleACK, ReadPropertyMultipleRequest};
1268
1269 let request = ReadPropertyMultipleRequest {
1270 list_of_read_access_specs: specs,
1271 };
1272 let mut buf = BytesMut::new();
1273 request.encode(&mut buf);
1274
1275 let response_data = self
1276 .confirmed_request(
1277 destination_mac,
1278 ConfirmedServiceChoice::READ_PROPERTY_MULTIPLE,
1279 &buf,
1280 )
1281 .await?;
1282
1283 ReadPropertyMultipleACK::decode(&response_data)
1284 }
1285
1286 pub async fn write_property_multiple(
1288 &self,
1289 destination_mac: &[u8],
1290 specs: Vec<bacnet_services::wpm::WriteAccessSpecification>,
1291 ) -> Result<(), Error> {
1292 use bacnet_services::wpm::WritePropertyMultipleRequest;
1293
1294 let request = WritePropertyMultipleRequest {
1295 list_of_write_access_specs: specs,
1296 };
1297 let mut buf = BytesMut::new();
1298 request.encode(&mut buf);
1299
1300 let _ = self
1301 .confirmed_request(
1302 destination_mac,
1303 ConfirmedServiceChoice::WRITE_PROPERTY_MULTIPLE,
1304 &buf,
1305 )
1306 .await?;
1307
1308 Ok(())
1309 }
1310
1311 pub async fn who_is(
1313 &self,
1314 low_limit: Option<u32>,
1315 high_limit: Option<u32>,
1316 ) -> Result<(), Error> {
1317 use bacnet_services::who_is::WhoIsRequest;
1318
1319 let request = WhoIsRequest {
1320 low_limit,
1321 high_limit,
1322 };
1323 let mut buf = BytesMut::new();
1324 request.encode(&mut buf);
1325
1326 self.broadcast_global_unconfirmed(UnconfirmedServiceChoice::WHO_IS, &buf)
1327 .await
1328 }
1329
1330 pub async fn who_is_directed(
1332 &self,
1333 destination_mac: &[u8],
1334 low_limit: Option<u32>,
1335 high_limit: Option<u32>,
1336 ) -> Result<(), Error> {
1337 use bacnet_services::who_is::WhoIsRequest;
1338
1339 let request = WhoIsRequest {
1340 low_limit,
1341 high_limit,
1342 };
1343 let mut buf = BytesMut::new();
1344 request.encode(&mut buf);
1345
1346 self.unconfirmed_request(destination_mac, UnconfirmedServiceChoice::WHO_IS, &buf)
1347 .await
1348 }
1349
1350 pub async fn who_is_network(
1355 &self,
1356 dest_network: u16,
1357 low_limit: Option<u32>,
1358 high_limit: Option<u32>,
1359 ) -> Result<(), Error> {
1360 use bacnet_services::who_is::WhoIsRequest;
1361
1362 let request = WhoIsRequest {
1363 low_limit,
1364 high_limit,
1365 };
1366 let mut buf = BytesMut::new();
1367 request.encode(&mut buf);
1368
1369 self.broadcast_network_unconfirmed(UnconfirmedServiceChoice::WHO_IS, &buf, dest_network)
1370 .await
1371 }
1372
1373 pub async fn who_has(
1375 &self,
1376 object: bacnet_services::who_has::WhoHasObject,
1377 low_limit: Option<u32>,
1378 high_limit: Option<u32>,
1379 ) -> Result<(), Error> {
1380 use bacnet_services::who_has::WhoHasRequest;
1381
1382 let request = WhoHasRequest {
1383 low_limit,
1384 high_limit,
1385 object,
1386 };
1387 let mut buf = BytesMut::new();
1388 request.encode(&mut buf)?;
1389
1390 self.broadcast_unconfirmed(UnconfirmedServiceChoice::WHO_HAS, &buf)
1391 .await
1392 }
1393
1394 pub async fn subscribe_cov(
1396 &self,
1397 destination_mac: &[u8],
1398 subscriber_process_identifier: u32,
1399 monitored_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1400 confirmed: bool,
1401 lifetime: Option<u32>,
1402 ) -> Result<(), Error> {
1403 use bacnet_services::cov::SubscribeCOVRequest;
1404
1405 let request = SubscribeCOVRequest {
1406 subscriber_process_identifier,
1407 monitored_object_identifier,
1408 issue_confirmed_notifications: Some(confirmed),
1409 lifetime,
1410 };
1411 let mut buf = BytesMut::new();
1412 request.encode(&mut buf);
1413
1414 let _ = self
1415 .confirmed_request(destination_mac, ConfirmedServiceChoice::SUBSCRIBE_COV, &buf)
1416 .await?;
1417
1418 Ok(())
1419 }
1420
1421 pub async fn unsubscribe_cov(
1423 &self,
1424 destination_mac: &[u8],
1425 subscriber_process_identifier: u32,
1426 monitored_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1427 ) -> Result<(), Error> {
1428 use bacnet_services::cov::SubscribeCOVRequest;
1429
1430 let request = SubscribeCOVRequest {
1431 subscriber_process_identifier,
1432 monitored_object_identifier,
1433 issue_confirmed_notifications: None,
1434 lifetime: None,
1435 };
1436 let mut buf = BytesMut::new();
1437 request.encode(&mut buf);
1438
1439 let _ = self
1440 .confirmed_request(destination_mac, ConfirmedServiceChoice::SUBSCRIBE_COV, &buf)
1441 .await?;
1442
1443 Ok(())
1444 }
1445
1446 pub async fn delete_object(
1448 &self,
1449 destination_mac: &[u8],
1450 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1451 ) -> Result<(), Error> {
1452 use bacnet_services::object_mgmt::DeleteObjectRequest;
1453
1454 let request = DeleteObjectRequest { object_identifier };
1455 let mut buf = BytesMut::new();
1456 request.encode(&mut buf);
1457
1458 let _ = self
1459 .confirmed_request(destination_mac, ConfirmedServiceChoice::DELETE_OBJECT, &buf)
1460 .await?;
1461
1462 Ok(())
1463 }
1464
1465 pub async fn create_object(
1467 &self,
1468 destination_mac: &[u8],
1469 object_specifier: bacnet_services::object_mgmt::ObjectSpecifier,
1470 initial_values: Vec<bacnet_services::common::BACnetPropertyValue>,
1471 ) -> Result<Bytes, Error> {
1472 use bacnet_services::object_mgmt::CreateObjectRequest;
1473
1474 let request = CreateObjectRequest {
1475 object_specifier,
1476 list_of_initial_values: initial_values,
1477 };
1478 let mut buf = BytesMut::new();
1479 request.encode(&mut buf);
1480
1481 self.confirmed_request(destination_mac, ConfirmedServiceChoice::CREATE_OBJECT, &buf)
1482 .await
1483 }
1484
1485 pub async fn device_communication_control(
1487 &self,
1488 destination_mac: &[u8],
1489 enable_disable: bacnet_types::enums::EnableDisable,
1490 time_duration: Option<u16>,
1491 password: Option<String>,
1492 ) -> Result<(), Error> {
1493 use bacnet_services::device_mgmt::DeviceCommunicationControlRequest;
1494
1495 let request = DeviceCommunicationControlRequest {
1496 time_duration,
1497 enable_disable,
1498 password,
1499 };
1500 let mut buf = BytesMut::new();
1501 request.encode(&mut buf)?;
1502
1503 let _ = self
1504 .confirmed_request(
1505 destination_mac,
1506 ConfirmedServiceChoice::DEVICE_COMMUNICATION_CONTROL,
1507 &buf,
1508 )
1509 .await?;
1510
1511 Ok(())
1512 }
1513
1514 pub async fn reinitialize_device(
1516 &self,
1517 destination_mac: &[u8],
1518 reinitialized_state: bacnet_types::enums::ReinitializedState,
1519 password: Option<String>,
1520 ) -> Result<(), Error> {
1521 use bacnet_services::device_mgmt::ReinitializeDeviceRequest;
1522
1523 let request = ReinitializeDeviceRequest {
1524 reinitialized_state,
1525 password,
1526 };
1527 let mut buf = BytesMut::new();
1528 request.encode(&mut buf)?;
1529
1530 let _ = self
1531 .confirmed_request(
1532 destination_mac,
1533 ConfirmedServiceChoice::REINITIALIZE_DEVICE,
1534 &buf,
1535 )
1536 .await?;
1537
1538 Ok(())
1539 }
1540
1541 pub async fn get_event_information(
1543 &self,
1544 destination_mac: &[u8],
1545 last_received_object_identifier: Option<bacnet_types::primitives::ObjectIdentifier>,
1546 ) -> Result<Bytes, Error> {
1547 use bacnet_services::alarm_event::GetEventInformationRequest;
1548
1549 let request = GetEventInformationRequest {
1550 last_received_object_identifier,
1551 };
1552 let mut buf = BytesMut::new();
1553 request.encode(&mut buf);
1554
1555 self.confirmed_request(
1556 destination_mac,
1557 ConfirmedServiceChoice::GET_EVENT_INFORMATION,
1558 &buf,
1559 )
1560 .await
1561 }
1562
1563 pub async fn acknowledge_alarm(
1565 &self,
1566 destination_mac: &[u8],
1567 acknowledging_process_identifier: u32,
1568 event_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1569 event_state_acknowledged: u32,
1570 acknowledgment_source: &str,
1571 ) -> Result<(), Error> {
1572 use bacnet_services::alarm_event::AcknowledgeAlarmRequest;
1573
1574 let request = AcknowledgeAlarmRequest {
1575 acknowledging_process_identifier,
1576 event_object_identifier,
1577 event_state_acknowledged,
1578 timestamp: bacnet_types::primitives::BACnetTimeStamp::SequenceNumber(0),
1579 acknowledgment_source: acknowledgment_source.to_string(),
1580 };
1581 let mut buf = BytesMut::new();
1582 request.encode(&mut buf)?;
1583
1584 let _ = self
1585 .confirmed_request(
1586 destination_mac,
1587 ConfirmedServiceChoice::ACKNOWLEDGE_ALARM,
1588 &buf,
1589 )
1590 .await?;
1591
1592 Ok(())
1593 }
1594
1595 pub async fn read_range(
1597 &self,
1598 destination_mac: &[u8],
1599 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1600 property_identifier: bacnet_types::enums::PropertyIdentifier,
1601 property_array_index: Option<u32>,
1602 range: Option<bacnet_services::read_range::RangeSpec>,
1603 ) -> Result<bacnet_services::read_range::ReadRangeAck, Error> {
1604 use bacnet_services::read_range::{ReadRangeAck, ReadRangeRequest};
1605
1606 let request = ReadRangeRequest {
1607 object_identifier,
1608 property_identifier,
1609 property_array_index,
1610 range,
1611 };
1612 let mut buf = BytesMut::new();
1613 request.encode(&mut buf);
1614
1615 let response_data = self
1616 .confirmed_request(destination_mac, ConfirmedServiceChoice::READ_RANGE, &buf)
1617 .await?;
1618
1619 ReadRangeAck::decode(&response_data)
1620 }
1621
1622 pub async fn atomic_read_file(
1624 &self,
1625 destination_mac: &[u8],
1626 file_identifier: bacnet_types::primitives::ObjectIdentifier,
1627 access: bacnet_services::file::FileAccessMethod,
1628 ) -> Result<Bytes, Error> {
1629 use bacnet_services::file::AtomicReadFileRequest;
1630
1631 let request = AtomicReadFileRequest {
1632 file_identifier,
1633 access,
1634 };
1635 let mut buf = BytesMut::new();
1636 request.encode(&mut buf);
1637
1638 self.confirmed_request(
1639 destination_mac,
1640 ConfirmedServiceChoice::ATOMIC_READ_FILE,
1641 &buf,
1642 )
1643 .await
1644 }
1645
1646 pub async fn atomic_write_file(
1648 &self,
1649 destination_mac: &[u8],
1650 file_identifier: bacnet_types::primitives::ObjectIdentifier,
1651 access: bacnet_services::file::FileWriteAccessMethod,
1652 ) -> Result<Bytes, Error> {
1653 use bacnet_services::file::AtomicWriteFileRequest;
1654
1655 let request = AtomicWriteFileRequest {
1656 file_identifier,
1657 access,
1658 };
1659 let mut buf = BytesMut::new();
1660 request.encode(&mut buf);
1661
1662 self.confirmed_request(
1663 destination_mac,
1664 ConfirmedServiceChoice::ATOMIC_WRITE_FILE,
1665 &buf,
1666 )
1667 .await
1668 }
1669
1670 pub async fn add_list_element(
1672 &self,
1673 destination_mac: &[u8],
1674 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1675 property_identifier: bacnet_types::enums::PropertyIdentifier,
1676 property_array_index: Option<u32>,
1677 list_of_elements: Vec<u8>,
1678 ) -> Result<(), Error> {
1679 use bacnet_services::list_manipulation::ListElementRequest;
1680
1681 let request = ListElementRequest {
1682 object_identifier,
1683 property_identifier,
1684 property_array_index,
1685 list_of_elements,
1686 };
1687 let mut buf = BytesMut::new();
1688 request.encode(&mut buf);
1689
1690 let _ = self
1691 .confirmed_request(
1692 destination_mac,
1693 ConfirmedServiceChoice::ADD_LIST_ELEMENT,
1694 &buf,
1695 )
1696 .await?;
1697
1698 Ok(())
1699 }
1700
1701 pub async fn remove_list_element(
1703 &self,
1704 destination_mac: &[u8],
1705 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1706 property_identifier: bacnet_types::enums::PropertyIdentifier,
1707 property_array_index: Option<u32>,
1708 list_of_elements: Vec<u8>,
1709 ) -> Result<(), Error> {
1710 use bacnet_services::list_manipulation::ListElementRequest;
1711
1712 let request = ListElementRequest {
1713 object_identifier,
1714 property_identifier,
1715 property_array_index,
1716 list_of_elements,
1717 };
1718 let mut buf = BytesMut::new();
1719 request.encode(&mut buf);
1720
1721 let _ = self
1722 .confirmed_request(
1723 destination_mac,
1724 ConfirmedServiceChoice::REMOVE_LIST_ELEMENT,
1725 &buf,
1726 )
1727 .await?;
1728
1729 Ok(())
1730 }
1731
1732 pub async fn time_synchronization(
1736 &self,
1737 destination_mac: &[u8],
1738 date: bacnet_types::primitives::Date,
1739 time: bacnet_types::primitives::Time,
1740 ) -> Result<(), Error> {
1741 use bacnet_services::device_mgmt::TimeSynchronizationRequest;
1742
1743 let request = TimeSynchronizationRequest { date, time };
1744 let mut buf = BytesMut::new();
1745 request.encode(&mut buf);
1746
1747 self.unconfirmed_request(
1748 destination_mac,
1749 UnconfirmedServiceChoice::TIME_SYNCHRONIZATION,
1750 &buf,
1751 )
1752 .await
1753 }
1754
1755 pub async fn utc_time_synchronization(
1759 &self,
1760 destination_mac: &[u8],
1761 date: bacnet_types::primitives::Date,
1762 time: bacnet_types::primitives::Time,
1763 ) -> Result<(), Error> {
1764 use bacnet_services::device_mgmt::TimeSynchronizationRequest;
1765
1766 let request = TimeSynchronizationRequest { date, time };
1767 let mut buf = BytesMut::new();
1768 request.encode(&mut buf);
1769
1770 self.unconfirmed_request(
1771 destination_mac,
1772 UnconfirmedServiceChoice::UTC_TIME_SYNCHRONIZATION,
1773 &buf,
1774 )
1775 .await
1776 }
1777
1778 pub fn cov_notifications(&self) -> broadcast::Receiver<COVNotificationRequest> {
1783 self.cov_tx.subscribe()
1784 }
1785
1786 pub async fn discovered_devices(&self) -> Vec<DiscoveredDevice> {
1792 self.device_table.lock().await.all()
1793 }
1794
1795 pub async fn get_device(&self, instance: u32) -> Option<DiscoveredDevice> {
1797 self.device_table.lock().await.get(instance).cloned()
1798 }
1799
1800 pub async fn clear_devices(&self) {
1802 self.device_table.lock().await.clear();
1803 }
1804
1805 pub async fn stop(&mut self) -> Result<(), Error> {
1807 if let Some(task) = self.dispatch_task.take() {
1808 task.abort();
1809 let _ = task.await;
1810 }
1811 Ok(())
1813 }
1814}
1815
1816#[cfg(test)]
1817mod tests {
1818 use super::*;
1819 use bacnet_encoding::apdu::{ComplexAck, SimpleAck};
1820 use std::net::Ipv4Addr;
1821 use tokio::time::Duration;
1822
1823 async fn make_client() -> BACnetClient<BipTransport> {
1825 BACnetClient::builder()
1826 .interface(Ipv4Addr::LOCALHOST)
1827 .port(0)
1828 .apdu_timeout_ms(2000)
1829 .build()
1830 .await
1831 .unwrap()
1832 }
1833
1834 #[tokio::test]
1835 async fn client_start_stop() {
1836 let mut client = make_client().await;
1837 assert!(!client.local_mac().is_empty());
1838 client.stop().await.unwrap();
1839 }
1840
1841 #[tokio::test]
1842 async fn confirmed_request_simple_ack() {
1843 let mut client_a = make_client().await;
1844
1845 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1847 let mut net_b = NetworkLayer::new(transport_b);
1848 let mut rx_b = net_b.start().await.unwrap();
1849 let b_mac = net_b.local_mac().to_vec();
1850
1851 let b_handle = tokio::spawn(async move {
1853 let received = timeout(Duration::from_secs(2), rx_b.recv())
1854 .await
1855 .expect("B timed out")
1856 .expect("B channel closed");
1857
1858 let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
1859 if let Apdu::ConfirmedRequest(req) = decoded {
1860 let ack = Apdu::SimpleAck(SimpleAck {
1861 invoke_id: req.invoke_id,
1862 service_choice: req.service_choice,
1863 });
1864 let mut buf = BytesMut::new();
1865 encode_apdu(&mut buf, &ack);
1866 net_b
1867 .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
1868 .await
1869 .unwrap();
1870 }
1871 net_b.stop().await.unwrap();
1872 });
1873
1874 let result = client_a
1875 .confirmed_request(
1876 &b_mac,
1877 ConfirmedServiceChoice::WRITE_PROPERTY,
1878 &[0x01, 0x02],
1879 )
1880 .await;
1881
1882 assert!(result.is_ok());
1883 let response = result.unwrap();
1884 assert!(response.is_empty()); b_handle.await.unwrap();
1887 client_a.stop().await.unwrap();
1888 }
1889
1890 #[tokio::test]
1891 async fn confirmed_request_complex_ack() {
1892 let mut client_a = make_client().await;
1893
1894 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1895 let mut net_b = NetworkLayer::new(transport_b);
1896 let mut rx_b = net_b.start().await.unwrap();
1897 let b_mac = net_b.local_mac().to_vec();
1898
1899 let b_handle = tokio::spawn(async move {
1900 let received = timeout(Duration::from_secs(2), rx_b.recv())
1901 .await
1902 .unwrap()
1903 .unwrap();
1904
1905 let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
1906 if let Apdu::ConfirmedRequest(req) = decoded {
1907 let ack = Apdu::ComplexAck(ComplexAck {
1908 segmented: false,
1909 more_follows: false,
1910 invoke_id: req.invoke_id,
1911 sequence_number: None,
1912 proposed_window_size: None,
1913 service_choice: req.service_choice,
1914 service_ack: Bytes::from_static(&[0xDE, 0xAD, 0xBE, 0xEF]),
1915 });
1916 let mut buf = BytesMut::new();
1917 encode_apdu(&mut buf, &ack);
1918 net_b
1919 .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
1920 .await
1921 .unwrap();
1922 }
1923 net_b.stop().await.unwrap();
1924 });
1925
1926 let result = client_a
1927 .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
1928 .await;
1929
1930 assert!(result.is_ok());
1931 assert_eq!(result.unwrap(), vec![0xDE, 0xAD, 0xBE, 0xEF]);
1932
1933 b_handle.await.unwrap();
1934 client_a.stop().await.unwrap();
1935 }
1936
1937 #[tokio::test]
1938 async fn confirmed_request_timeout() {
1939 let mut client = make_client().await;
1940 let fake_mac = vec![10, 99, 99, 99, 0xBA, 0xC0];
1942 let result = client
1943 .confirmed_request(&fake_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
1944 .await;
1945 assert!(result.is_err());
1946 client.stop().await.unwrap();
1947 }
1948
1949 #[tokio::test]
1950 async fn segmented_complex_ack_reassembly() {
1951 let mut client = make_client().await;
1952
1953 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1954 let mut net_b = NetworkLayer::new(transport_b);
1955 let mut rx_b = net_b.start().await.unwrap();
1956 let b_mac = net_b.local_mac().to_vec();
1957
1958 let b_handle = tokio::spawn(async move {
1961 let received = timeout(Duration::from_secs(2), rx_b.recv())
1963 .await
1964 .unwrap()
1965 .unwrap();
1966
1967 let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
1968 let invoke_id = if let Apdu::ConfirmedRequest(req) = decoded {
1969 req.invoke_id
1970 } else {
1971 panic!("Expected ConfirmedRequest");
1972 };
1973
1974 let service_choice = ConfirmedServiceChoice::READ_PROPERTY;
1975 let segments: Vec<Bytes> = vec![
1976 Bytes::from_static(&[0x01, 0x02, 0x03]),
1977 Bytes::from_static(&[0x04, 0x05, 0x06]),
1978 Bytes::from_static(&[0x07, 0x08]),
1979 ];
1980
1981 for (i, seg) in segments.iter().enumerate() {
1982 let is_last = i == segments.len() - 1;
1983 let ack = Apdu::ComplexAck(ComplexAck {
1984 segmented: true,
1985 more_follows: !is_last,
1986 invoke_id,
1987 sequence_number: Some(i as u8),
1988 proposed_window_size: Some(1),
1989 service_choice,
1990 service_ack: seg.clone(),
1991 });
1992 let mut buf = BytesMut::new();
1993 encode_apdu(&mut buf, &ack);
1994 net_b
1995 .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
1996 .await
1997 .unwrap();
1998
1999 let seg_ack_msg = timeout(Duration::from_secs(2), rx_b.recv())
2001 .await
2002 .unwrap()
2003 .unwrap();
2004 let decoded = apdu::decode_apdu(seg_ack_msg.apdu.clone()).unwrap();
2005 if let Apdu::SegmentAck(sa) = decoded {
2006 assert_eq!(sa.invoke_id, invoke_id);
2007 assert_eq!(sa.sequence_number, i as u8);
2008 } else {
2009 panic!("Expected SegmentAck, got {:?}", decoded);
2010 }
2011 }
2012
2013 net_b.stop().await.unwrap();
2014 });
2015
2016 let result = client
2018 .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
2019 .await;
2020
2021 assert!(result.is_ok());
2022 assert_eq!(
2023 result.unwrap(),
2024 vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]
2025 );
2026
2027 b_handle.await.unwrap();
2028 client.stop().await.unwrap();
2029 }
2030
2031 #[tokio::test]
2032 async fn segmented_confirmed_request_sends_segments() {
2033 let mut client = BACnetClient::builder()
2036 .interface(Ipv4Addr::LOCALHOST)
2037 .port(0)
2038 .apdu_timeout_ms(5000)
2039 .max_apdu_length(50)
2040 .build()
2041 .await
2042 .unwrap();
2043
2044 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
2045 let mut net_b = NetworkLayer::new(transport_b);
2046 let mut rx_b = net_b.start().await.unwrap();
2047 let b_mac = net_b.local_mac().to_vec();
2048
2049 let service_data: Vec<u8> = (0u8..100).collect();
2051 let expected_data = service_data.clone();
2052
2053 let b_handle = tokio::spawn(async move {
2054 let mut all_service_data = Vec::new();
2055 let mut client_mac;
2056 let mut invoke_id;
2057
2058 loop {
2059 let received = timeout(Duration::from_secs(3), rx_b.recv())
2060 .await
2061 .expect("server timed out waiting for segment")
2062 .expect("server channel closed");
2063
2064 let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
2065 if let Apdu::ConfirmedRequest(req) = decoded {
2066 assert!(req.segmented, "expected segmented request");
2067 invoke_id = req.invoke_id;
2068 client_mac = received.source_mac.clone();
2069 let seq = req.sequence_number.unwrap();
2070 all_service_data.extend_from_slice(&req.service_request);
2071
2072 let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
2074 negative_ack: false,
2075 sent_by_server: true,
2076 invoke_id,
2077 sequence_number: seq,
2078 actual_window_size: 1,
2079 });
2080 let mut buf = BytesMut::new();
2081 encode_apdu(&mut buf, &seg_ack);
2082 net_b
2083 .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
2084 .await
2085 .unwrap();
2086
2087 if !req.more_follows {
2088 break;
2089 }
2090 } else {
2091 panic!("Expected ConfirmedRequest, got {:?}", decoded);
2092 }
2093 }
2094
2095 let ack = Apdu::SimpleAck(SimpleAck {
2097 invoke_id,
2098 service_choice: ConfirmedServiceChoice::WRITE_PROPERTY,
2099 });
2100 let mut buf = BytesMut::new();
2101 encode_apdu(&mut buf, &ack);
2102 net_b
2103 .send_apdu(&buf, &client_mac, false, NetworkPriority::NORMAL)
2104 .await
2105 .unwrap();
2106
2107 net_b.stop().await.unwrap();
2108 all_service_data
2109 });
2110
2111 let result = client
2112 .confirmed_request(
2113 &b_mac,
2114 ConfirmedServiceChoice::WRITE_PROPERTY,
2115 &service_data,
2116 )
2117 .await;
2118
2119 assert!(result.is_ok());
2120 assert!(result.unwrap().is_empty()); let received_data = b_handle.await.unwrap();
2124 assert_eq!(received_data, expected_data);
2125
2126 client.stop().await.unwrap();
2127 }
2128
2129 #[tokio::test]
2130 async fn segmented_request_with_complex_ack_response() {
2131 let mut client = BACnetClient::builder()
2132 .interface(Ipv4Addr::LOCALHOST)
2133 .port(0)
2134 .apdu_timeout_ms(5000)
2135 .max_apdu_length(50)
2136 .build()
2137 .await
2138 .unwrap();
2139
2140 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
2141 let mut net_b = NetworkLayer::new(transport_b);
2142 let mut rx_b = net_b.start().await.unwrap();
2143 let b_mac = net_b.local_mac().to_vec();
2144
2145 let service_data: Vec<u8> = (0u8..60).collect();
2147
2148 let b_handle = tokio::spawn(async move {
2149 let mut client_mac;
2150 let mut invoke_id;
2151
2152 loop {
2153 let received = timeout(Duration::from_secs(3), rx_b.recv())
2154 .await
2155 .unwrap()
2156 .unwrap();
2157
2158 let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
2159 if let Apdu::ConfirmedRequest(req) = decoded {
2160 invoke_id = req.invoke_id;
2161 client_mac = received.source_mac.clone();
2162 let seq = req.sequence_number.unwrap();
2163
2164 let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
2165 negative_ack: false,
2166 sent_by_server: true,
2167 invoke_id,
2168 sequence_number: seq,
2169 actual_window_size: 1,
2170 });
2171 let mut buf = BytesMut::new();
2172 encode_apdu(&mut buf, &seg_ack);
2173 net_b
2174 .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
2175 .await
2176 .unwrap();
2177
2178 if !req.more_follows {
2179 break;
2180 }
2181 }
2182 }
2183
2184 let ack = Apdu::ComplexAck(ComplexAck {
2186 segmented: false,
2187 more_follows: false,
2188 invoke_id,
2189 sequence_number: None,
2190 proposed_window_size: None,
2191 service_choice: ConfirmedServiceChoice::READ_PROPERTY,
2192 service_ack: Bytes::from_static(&[0xCA, 0xFE]),
2193 });
2194 let mut buf = BytesMut::new();
2195 encode_apdu(&mut buf, &ack);
2196 net_b
2197 .send_apdu(&buf, &client_mac, false, NetworkPriority::NORMAL)
2198 .await
2199 .unwrap();
2200
2201 net_b.stop().await.unwrap();
2202 });
2203
2204 let result = client
2205 .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &service_data)
2206 .await;
2207
2208 assert!(result.is_ok());
2209 assert_eq!(result.unwrap(), vec![0xCA, 0xFE]);
2210
2211 b_handle.await.unwrap();
2212 client.stop().await.unwrap();
2213 }
2214
2215 #[tokio::test]
2216 async fn segment_overflow_guard() {
2217 let mut client = BACnetClient::builder()
2219 .interface(Ipv4Addr::LOCALHOST)
2220 .port(0)
2221 .apdu_timeout_ms(2000)
2222 .max_apdu_length(50)
2223 .build()
2224 .await
2225 .unwrap();
2226
2227 let huge_payload = vec![0u8; 256 * 44];
2230
2231 let fake_mac = vec![10, 99, 99, 99, 0xBA, 0xC0];
2234
2235 let result = client
2236 .confirmed_request(
2237 &fake_mac,
2238 ConfirmedServiceChoice::READ_PROPERTY,
2239 &huge_payload,
2240 )
2241 .await;
2242
2243 assert!(result.is_err());
2244 let err_msg = result.unwrap_err().to_string();
2245 assert!(
2246 err_msg.contains("256 segments"),
2247 "expected segment overflow error, got: {}",
2248 err_msg
2249 );
2250
2251 client.stop().await.unwrap();
2252 }
2253
2254 #[test]
2255 fn seg_receiver_timeout_is_4s() {
2256 assert_eq!(SEG_RECEIVER_TIMEOUT, Duration::from_secs(4));
2257 }
2258}