1use std::collections::HashMap;
7use std::net::{Ipv4Addr, Ipv6Addr};
8use std::sync::Arc;
9use std::time::Instant;
10
11use bytes::{Bytes, BytesMut};
12use tokio::sync::{broadcast, mpsc, Mutex};
13use tokio::task::JoinHandle;
14use tokio::time::{timeout, Duration};
15use tracing::{debug, warn};
16
17use bacnet_encoding::apdu::{
18 self, encode_apdu, Apdu, ConfirmedRequest as ConfirmedRequestPdu, SegmentAck as SegmentAckPdu,
19 SimpleAck,
20};
21use bacnet_network::layer::NetworkLayer;
22use bacnet_services::cov::COVNotificationRequest;
23use bacnet_transport::bip::BipTransport;
24use bacnet_transport::bip6::Bip6Transport;
25use bacnet_transport::port::TransportPort;
26use bacnet_types::enums::{ConfirmedServiceChoice, NetworkPriority, UnconfirmedServiceChoice};
27use bacnet_types::error::Error;
28use bacnet_types::MacAddr;
29
30use crate::discovery::{DeviceTable, DiscoveredDevice};
31use crate::segmentation::{max_segment_payload, split_payload, SegmentReceiver, SegmentedPduType};
32use crate::tsm::{Tsm, TsmConfig, TsmResponse};
33
/// Settings used when starting a [`BACnetClient`].
///
/// `interface`, `port` and `broadcast_address` are consumed by the BACnet/IP
/// builders when they construct the transport; the remaining fields control
/// how APDUs are exchanged once the client is running.
#[derive(Debug, Clone)]
pub struct ClientConfig {
    /// Local IPv4 address handed to the BACnet/IP transport.
    pub interface: Ipv4Addr,
    /// Port handed to the transport.
    pub port: u16,
    /// IPv4 broadcast address handed to the BACnet/IP transport.
    pub broadcast_address: Ipv4Addr,
    /// How long to wait for a response (or SegmentAck) before retrying, in milliseconds.
    pub apdu_timeout_ms: u64,
    /// Number of retransmissions attempted after the first timeout.
    pub apdu_retries: u8,
    /// Largest APDU this client advertises; clamped to the transport limit at start-up.
    pub max_apdu_length: u16,
    /// Value placed in the `max_segments` field of outgoing confirmed requests.
    pub max_segments: Option<u8>,
    /// Whether outgoing confirmed requests announce that segmented responses are accepted.
    pub segmented_response_accepted: bool,
    /// Window size proposed when sending a segmented request (treated as at least 1).
    pub proposed_window_size: u8,
}

impl Default for ClientConfig {
    /// Returns the stock configuration: any interface, port `0xBAC0`,
    /// limited broadcast, 6 s timeout with 3 retries, 1476-byte APDUs,
    /// segmented responses accepted and a window size of 1.
    fn default() -> Self {
        // 0xBAC0 == 47808.
        const DEFAULT_PORT: u16 = 47_808;
        const DEFAULT_APDU_TIMEOUT_MS: u64 = 6_000;
        const DEFAULT_APDU_RETRIES: u8 = 3;
        const DEFAULT_MAX_APDU_LENGTH: u16 = 1_476;
        const DEFAULT_WINDOW_SIZE: u8 = 1;

        ClientConfig {
            interface: Ipv4Addr::UNSPECIFIED,
            broadcast_address: Ipv4Addr::BROADCAST,
            port: DEFAULT_PORT,
            apdu_timeout_ms: DEFAULT_APDU_TIMEOUT_MS,
            apdu_retries: DEFAULT_APDU_RETRIES,
            max_apdu_length: DEFAULT_MAX_APDU_LENGTH,
            max_segments: None,
            segmented_response_accepted: true,
            proposed_window_size: DEFAULT_WINDOW_SIZE,
        }
    }
}
72
/// Builder for a [`BACnetClient`] running over a caller-supplied transport.
pub struct ClientBuilder<T: TransportPort> {
    /// Settings handed to `BACnetClient::start` when `build` is called.
    config: ClientConfig,
    /// Transport to run on; `build` returns an error while this is `None`.
    transport: Option<T>,
}
78
79impl<T: TransportPort + 'static> ClientBuilder<T> {
80 pub fn transport(mut self, transport: T) -> Self {
82 self.transport = Some(transport);
83 self
84 }
85
86 pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
88 self.config.apdu_timeout_ms = ms;
89 self
90 }
91
92 pub fn max_apdu_length(mut self, len: u16) -> Self {
94 self.config.max_apdu_length = len;
95 self
96 }
97
98 pub async fn build(self) -> Result<BACnetClient<T>, Error> {
100 let transport = self
101 .transport
102 .ok_or_else(|| Error::Encoding("transport not set on ClientBuilder".into()))?;
103 BACnetClient::start(self.config, transport).await
104 }
105}
106
/// Builder for a [`BACnetClient`] over BACnet/IP (`BipTransport`).
pub struct BipClientBuilder {
    /// Settings for both the transport (interface, port, broadcast address)
    /// and the client itself.
    config: ClientConfig,
}
111
112impl BipClientBuilder {
113 pub fn interface(mut self, ip: Ipv4Addr) -> Self {
115 self.config.interface = ip;
116 self
117 }
118
119 pub fn port(mut self, port: u16) -> Self {
121 self.config.port = port;
122 self
123 }
124
125 pub fn broadcast_address(mut self, addr: Ipv4Addr) -> Self {
127 self.config.broadcast_address = addr;
128 self
129 }
130
131 pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
133 self.config.apdu_timeout_ms = ms;
134 self
135 }
136
137 pub fn max_apdu_length(mut self, len: u16) -> Self {
139 self.config.max_apdu_length = len;
140 self
141 }
142
143 pub async fn build(self) -> Result<BACnetClient<BipTransport>, Error> {
145 let transport = BipTransport::new(
146 self.config.interface,
147 self.config.port,
148 self.config.broadcast_address,
149 );
150 BACnetClient::start(self.config, transport).await
151 }
152}
153
/// Reassembly bookkeeping for one in-progress segmented ComplexAck.
struct SegmentedReceiveState {
    /// Accumulates segment payloads until the final segment arrives.
    receiver: SegmentReceiver,
    /// Sequence number the next in-order segment must carry.
    expected_next_seq: u8,
    /// When a segment for this session was last handled; idle sessions are
    /// discarded by the dispatch loop.
    last_activity: Instant,
}
162
/// Idle time after which an unfinished segmented-receive session is dropped
/// by the dispatch loop.
const SEG_RECEIVER_TIMEOUT: Duration = Duration::from_secs(4);

/// Identifies a segmented exchange: peer MAC address plus invoke ID.
type SegKey = (MacAddr, u8);
168
/// BACnet client bound to a transport `T`.
///
/// Instances are created by `BACnetClient::start` (normally through one of
/// the builders), which also spawns the background task that decodes and
/// dispatches incoming APDUs.
pub struct BACnetClient<T: TransportPort> {
    /// Effective configuration; `max_apdu_length` has been clamped to the
    /// transport's limit.
    config: ClientConfig,
    /// Network layer used for every outgoing APDU.
    network: Arc<NetworkLayer<T>>,
    /// Transaction state machine that matches responses to outstanding
    /// confirmed requests.
    tsm: Arc<Mutex<Tsm>>,
    /// Devices learned from received I-Am requests.
    device_table: Arc<Mutex<DeviceTable>>,
    /// Broadcast channel on which decoded COV notifications are published.
    cov_tx: broadcast::Sender<COVNotificationRequest>,
    /// Handle of the background dispatch task spawned by `start`.
    dispatch_task: Option<JoinHandle<()>>,
    /// Per-transaction channels that deliver incoming SegmentAck PDUs to the
    /// segmented request waiting for them.
    seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
    /// MAC address reported by the network layer at start-up.
    local_mac: MacAddr,
}
180
181impl BACnetClient<BipTransport> {
182 pub fn bip_builder() -> BipClientBuilder {
184 BipClientBuilder {
185 config: ClientConfig::default(),
186 }
187 }
188
189 pub fn builder() -> BipClientBuilder {
191 Self::bip_builder()
192 }
193
194 pub async fn read_bdt(
196 &self,
197 target: &[u8],
198 ) -> Result<Vec<bacnet_transport::bbmd::BdtEntry>, Error> {
199 self.network.transport().read_bdt(target).await
200 }
201
202 pub async fn write_bdt(
204 &self,
205 target: &[u8],
206 entries: &[bacnet_transport::bbmd::BdtEntry],
207 ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
208 self.network.transport().write_bdt(target, entries).await
209 }
210
211 pub async fn read_fdt(
213 &self,
214 target: &[u8],
215 ) -> Result<Vec<bacnet_transport::bbmd::FdtEntryWire>, Error> {
216 self.network.transport().read_fdt(target).await
217 }
218
219 pub async fn delete_fdt_entry(
221 &self,
222 target: &[u8],
223 ip: [u8; 4],
224 port: u16,
225 ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
226 self.network
227 .transport()
228 .delete_fdt_entry(target, ip, port)
229 .await
230 }
231
232 pub async fn register_foreign_device_bvlc(
234 &self,
235 target: &[u8],
236 ttl: u16,
237 ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
238 self.network
239 .transport()
240 .register_foreign_device(target, ttl)
241 .await
242 }
243}
244
245impl BACnetClient<Bip6Transport> {
246 pub fn bip6_builder() -> Bip6ClientBuilder {
248 Bip6ClientBuilder {
249 config: ClientConfig::default(),
250 interface: Ipv6Addr::UNSPECIFIED,
251 device_instance: None,
252 }
253 }
254}
255
/// Builder for a [`BACnetClient`] over BACnet/IPv6 (`Bip6Transport`).
pub struct Bip6ClientBuilder {
    /// Client settings; `port` is also passed to the transport.
    config: ClientConfig,
    /// Local IPv6 interface address passed to the transport.
    interface: Ipv6Addr,
    /// Optional device instance passed to the transport.
    device_instance: Option<u32>,
}
262
263impl Bip6ClientBuilder {
264 pub fn interface(mut self, ip: Ipv6Addr) -> Self {
266 self.interface = ip;
267 self
268 }
269
270 pub fn port(mut self, port: u16) -> Self {
272 self.config.port = port;
273 self
274 }
275
276 pub fn device_instance(mut self, instance: u32) -> Self {
278 self.device_instance = Some(instance);
279 self
280 }
281
282 pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
284 self.config.apdu_timeout_ms = ms;
285 self
286 }
287
288 pub fn max_apdu_length(mut self, len: u16) -> Self {
290 self.config.max_apdu_length = len;
291 self
292 }
293
294 pub async fn build(self) -> Result<BACnetClient<Bip6Transport>, Error> {
296 let transport = Bip6Transport::new(self.interface, self.config.port, self.device_instance);
297 BACnetClient::start(self.config, transport).await
298 }
299}
300
#[cfg(feature = "sc-tls")]
impl BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>> {
    /// Returns a builder for a BACnet/SC client with the default
    /// configuration, an empty hub URL, no TLS configuration, an all-zero
    /// VMAC, a 30 s heartbeat interval, a 60 s heartbeat timeout and no
    /// reconnect policy.
    pub fn sc_builder() -> ScClientBuilder {
        let config = ClientConfig::default();
        ScClientBuilder {
            hub_url: String::new(),
            tls_config: None,
            vmac: [0; 6],
            heartbeat_interval_ms: 30_000,
            heartbeat_timeout_ms: 60_000,
            reconnect: None,
            config,
        }
    }
}
316
/// Builder for a [`BACnetClient`] over BACnet/SC (TLS WebSocket transport).
#[cfg(feature = "sc-tls")]
pub struct ScClientBuilder {
    /// Client settings handed to `BACnetClient::start`.
    config: ClientConfig,
    /// URL of the hub the WebSocket connects to.
    hub_url: String,
    /// TLS client configuration; required, `build` fails while this is `None`.
    tls_config: Option<std::sync::Arc<tokio_rustls::rustls::ClientConfig>>,
    /// VMAC passed to the SC transport.
    vmac: bacnet_transport::sc_frame::Vmac,
    /// Heartbeat interval passed to the transport, in milliseconds.
    heartbeat_interval_ms: u64,
    /// Heartbeat timeout passed to the transport, in milliseconds.
    heartbeat_timeout_ms: u64,
    /// Optional reconnect settings passed to the transport.
    reconnect: Option<bacnet_transport::sc::ScReconnectConfig>,
}
330
#[cfg(feature = "sc-tls")]
impl ScClientBuilder {
    /// Sets the URL of the hub to connect to.
    pub fn hub_url(self, url: &str) -> Self {
        Self {
            hub_url: String::from(url),
            ..self
        }
    }

    /// Sets the TLS client configuration (required before `build`).
    pub fn tls_config(
        self,
        config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
    ) -> Self {
        Self {
            tls_config: Some(config),
            ..self
        }
    }

    /// Sets the VMAC passed to the SC transport.
    pub fn vmac(self, vmac: [u8; 6]) -> Self {
        Self { vmac, ..self }
    }

    /// Overrides the APDU timeout, in milliseconds.
    pub fn apdu_timeout_ms(self, ms: u64) -> Self {
        let config = ClientConfig {
            apdu_timeout_ms: ms,
            ..self.config
        };
        Self { config, ..self }
    }

    /// Sets the heartbeat interval, in milliseconds.
    pub fn heartbeat_interval_ms(self, ms: u64) -> Self {
        Self {
            heartbeat_interval_ms: ms,
            ..self
        }
    }

    /// Sets the heartbeat timeout, in milliseconds.
    pub fn heartbeat_timeout_ms(self, ms: u64) -> Self {
        Self {
            heartbeat_timeout_ms: ms,
            ..self
        }
    }

    /// Sets the reconnect configuration passed to the transport.
    pub fn reconnect(self, config: bacnet_transport::sc::ScReconnectConfig) -> Self {
        Self {
            reconnect: Some(config),
            ..self
        }
    }

    /// Connects the TLS WebSocket to the hub, wraps it in an SC transport
    /// configured with the heartbeat and reconnect settings, and starts a
    /// client on it.
    ///
    /// # Errors
    ///
    /// Returns `Error::Encoding` when no TLS configuration was supplied
    /// (checked before any connection attempt), and otherwise propagates
    /// errors from the WebSocket connect and from `BACnetClient::start`.
    pub async fn build(
        self,
    ) -> Result<
        BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>>,
        Error,
    > {
        let Self {
            config,
            hub_url,
            tls_config,
            vmac,
            heartbeat_interval_ms,
            heartbeat_timeout_ms,
            reconnect,
        } = self;

        let tls_config = match tls_config {
            Some(tls) => tls,
            None => {
                return Err(Error::Encoding(
                    "SC client builder: tls_config is required".into(),
                ));
            }
        };

        let ws = bacnet_transport::sc_tls::TlsWebSocket::connect(&hub_url, tls_config).await?;

        let mut transport = bacnet_transport::sc::ScTransport::new(ws, vmac)
            .with_heartbeat_interval_ms(heartbeat_interval_ms)
            .with_heartbeat_timeout_ms(heartbeat_timeout_ms);
        if let Some(rc) = reconnect {
            // `with_reconnect` is deprecated upstream; the builder still
            // forwards to it, so the lint is silenced for this call only.
            #[allow(deprecated)]
            {
                transport = transport.with_reconnect(rc);
            }
        }

        BACnetClient::start(config, transport).await
    }
}
404
405impl<T: TransportPort + 'static> BACnetClient<T> {
406 pub fn generic_builder() -> ClientBuilder<T> {
408 ClientBuilder {
409 config: ClientConfig::default(),
410 transport: None,
411 }
412 }
413
414 pub async fn start(mut config: ClientConfig, transport: T) -> Result<Self, Error> {
416 let transport_max = transport.max_apdu_length();
418 config.max_apdu_length = config.max_apdu_length.min(transport_max);
419
420 let mut network = NetworkLayer::new(transport);
421 let mut apdu_rx = network.start().await?;
422 let local_mac = MacAddr::from_slice(network.local_mac());
423
424 let network = Arc::new(network);
425
426 let tsm_config = TsmConfig {
427 apdu_timeout_ms: config.apdu_timeout_ms,
428 apdu_retries: config.apdu_retries,
429 };
430 let tsm = Arc::new(Mutex::new(Tsm::new(tsm_config)));
431 let tsm_dispatch = Arc::clone(&tsm);
432 let device_table = Arc::new(Mutex::new(DeviceTable::new()));
433 let device_table_dispatch = Arc::clone(&device_table);
434 let network_dispatch = Arc::clone(&network);
435 let (cov_tx, _) = broadcast::channel::<COVNotificationRequest>(64);
436 let cov_tx_dispatch = cov_tx.clone();
437 let seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>> =
438 Arc::new(Mutex::new(HashMap::new()));
439 let seg_ack_senders_dispatch = Arc::clone(&seg_ack_senders);
440
441 let dispatch_task = tokio::spawn(async move {
443 let mut seg_state: HashMap<SegKey, SegmentedReceiveState> = HashMap::new();
445
446 while let Some(received) = apdu_rx.recv().await {
447 let now = Instant::now();
449 seg_state.retain(|_key, state| {
450 now.duration_since(state.last_activity) < SEG_RECEIVER_TIMEOUT
451 });
452
453 match apdu::decode_apdu(received.apdu.clone()) {
454 Ok(decoded) => {
455 Self::dispatch_apdu(
456 &tsm_dispatch,
457 &device_table_dispatch,
458 &network_dispatch,
459 &cov_tx_dispatch,
460 &mut seg_state,
461 &seg_ack_senders_dispatch,
462 &received.source_mac,
463 decoded,
464 )
465 .await;
466 }
467 Err(e) => {
468 warn!(error = %e, "Failed to decode received APDU");
469 }
470 }
471 }
472 });
473
474 Ok(Self {
475 config,
476 network,
477 tsm,
478 device_table,
479 cov_tx,
480 dispatch_task: Some(dispatch_task),
481 seg_ack_senders,
482 local_mac,
483 })
484 }
485
486 #[allow(clippy::too_many_arguments)]
488 async fn dispatch_apdu(
489 tsm: &Arc<Mutex<Tsm>>,
490 device_table: &Arc<Mutex<DeviceTable>>,
491 network: &Arc<NetworkLayer<T>>,
492 cov_tx: &broadcast::Sender<COVNotificationRequest>,
493 seg_state: &mut HashMap<SegKey, SegmentedReceiveState>,
494 seg_ack_senders: &Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
495 source_mac: &[u8],
496 apdu: Apdu,
497 ) {
498 match apdu {
499 Apdu::SimpleAck(ack) => {
500 debug!(invoke_id = ack.invoke_id, "Received SimpleAck");
501 let mut tsm = tsm.lock().await;
502 tsm.complete_transaction(source_mac, ack.invoke_id, TsmResponse::SimpleAck);
503 }
504 Apdu::ComplexAck(ack) => {
505 if ack.segmented {
506 Self::handle_segmented_complex_ack(tsm, network, seg_state, source_mac, ack)
507 .await;
508 } else {
509 debug!(invoke_id = ack.invoke_id, "Received ComplexAck");
510 let mut tsm = tsm.lock().await;
511 tsm.complete_transaction(
512 source_mac,
513 ack.invoke_id,
514 TsmResponse::ComplexAck {
515 service_data: ack.service_ack,
516 },
517 );
518 }
519 }
520 Apdu::Error(err) => {
521 debug!(invoke_id = err.invoke_id, "Received Error PDU");
522 let mut tsm = tsm.lock().await;
523 tsm.complete_transaction(
524 source_mac,
525 err.invoke_id,
526 TsmResponse::Error {
527 class: err.error_class.to_raw() as u32,
528 code: err.error_code.to_raw() as u32,
529 },
530 );
531 }
532 Apdu::Reject(rej) => {
533 debug!(invoke_id = rej.invoke_id, "Received Reject PDU");
534 let mut tsm = tsm.lock().await;
535 tsm.complete_transaction(
536 source_mac,
537 rej.invoke_id,
538 TsmResponse::Reject {
539 reason: rej.reject_reason.to_raw(),
540 },
541 );
542 }
543 Apdu::Abort(abt) => {
544 debug!(invoke_id = abt.invoke_id, "Received Abort PDU");
545 let mut tsm = tsm.lock().await;
546 tsm.complete_transaction(
547 source_mac,
548 abt.invoke_id,
549 TsmResponse::Abort {
550 reason: abt.abort_reason.to_raw(),
551 },
552 );
553 }
554 Apdu::ConfirmedRequest(req) => {
555 if req.service_choice == ConfirmedServiceChoice::CONFIRMED_COV_NOTIFICATION {
557 match COVNotificationRequest::decode(&req.service_request) {
558 Ok(notification) => {
559 debug!(
560 object = ?notification.monitored_object_identifier,
561 "Received ConfirmedCOVNotification"
562 );
563 let _ = cov_tx.send(notification);
564
565 let ack = Apdu::SimpleAck(SimpleAck {
567 invoke_id: req.invoke_id,
568 service_choice: req.service_choice,
569 });
570 let mut buf = BytesMut::with_capacity(4);
571 encode_apdu(&mut buf, &ack);
572 if let Err(e) = network
573 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
574 .await
575 {
576 warn!(error = %e, "Failed to send SimpleAck for COV notification");
577 }
578 }
579 Err(e) => {
580 warn!(error = %e, "Failed to decode ConfirmedCOVNotification");
581 }
582 }
583 } else {
584 debug!(
585 service = req.service_choice.to_raw(),
586 "Ignoring ConfirmedRequest (client mode)"
587 );
588 }
589 }
590 Apdu::UnconfirmedRequest(req) => {
591 if req.service_choice == UnconfirmedServiceChoice::I_AM {
592 match bacnet_services::who_is::IAmRequest::decode(&req.service_request) {
593 Ok(i_am) => {
594 debug!(
595 device = i_am.object_identifier.instance_number(),
596 vendor = i_am.vendor_id,
597 "Received IAm"
598 );
599 let device = DiscoveredDevice {
600 object_identifier: i_am.object_identifier,
601 mac_address: MacAddr::from_slice(source_mac),
602 max_apdu_length: i_am.max_apdu_length,
603 segmentation_supported: i_am.segmentation_supported,
604 max_segments_accepted: None,
605 vendor_id: i_am.vendor_id,
606 last_seen: std::time::Instant::now(),
607 };
608 device_table.lock().await.upsert(device);
609 }
610 Err(e) => {
611 warn!(error = %e, "Failed to decode IAm");
612 }
613 }
614 } else if req.service_choice
615 == UnconfirmedServiceChoice::UNCONFIRMED_COV_NOTIFICATION
616 {
617 match COVNotificationRequest::decode(&req.service_request) {
618 Ok(notification) => {
619 debug!(
620 object = ?notification.monitored_object_identifier,
621 "Received UnconfirmedCOVNotification"
622 );
623 let _ = cov_tx.send(notification);
624 }
625 Err(e) => {
626 warn!(error = %e, "Failed to decode UnconfirmedCOVNotification");
627 }
628 }
629 } else {
630 debug!(
631 service = req.service_choice.to_raw(),
632 "Ignoring unconfirmed service in client dispatch"
633 );
634 }
635 }
636 Apdu::SegmentAck(sa) => {
637 let key = (MacAddr::from_slice(source_mac), sa.invoke_id);
639 let senders = seg_ack_senders.lock().await;
640 if let Some(tx) = senders.get(&key) {
641 let _ = tx.try_send(sa);
642 } else {
643 debug!(
644 invoke_id = sa.invoke_id,
645 "Ignoring SegmentAck for unknown transaction"
646 );
647 }
648 }
649 }
650 }
651
652 async fn handle_segmented_complex_ack(
655 tsm: &Arc<Mutex<Tsm>>,
656 network: &Arc<NetworkLayer<T>>,
657 seg_state: &mut HashMap<SegKey, SegmentedReceiveState>,
658 source_mac: &[u8],
659 ack: bacnet_encoding::apdu::ComplexAck,
660 ) {
661 let seq = ack.sequence_number.unwrap_or(0);
662 let key = (MacAddr::from_slice(source_mac), ack.invoke_id);
663
664 debug!(
665 invoke_id = ack.invoke_id,
666 seq = seq,
667 more = ack.more_follows,
668 "Received segmented ComplexAck"
669 );
670
671 const MAX_CONCURRENT_SEG_SESSIONS: usize = 64;
673 if !seg_state.contains_key(&key) && seg_state.len() >= MAX_CONCURRENT_SEG_SESSIONS {
674 warn!(
675 invoke_id = ack.invoke_id,
676 sessions = seg_state.len(),
677 "Max concurrent segmented sessions reached, dropping segment"
678 );
679 return;
680 }
681
682 let state = seg_state
684 .entry(key.clone())
685 .or_insert_with(|| SegmentedReceiveState {
686 receiver: SegmentReceiver::new(),
687 expected_next_seq: 0,
688 last_activity: Instant::now(),
689 });
690
691 state.last_activity = Instant::now();
693
694 if seq != state.expected_next_seq {
698 warn!(
699 invoke_id = ack.invoke_id,
700 expected = state.expected_next_seq,
701 received = seq,
702 "Segment gap detected, sending negative SegmentAck"
703 );
704 let neg_ack = Apdu::SegmentAck(SegmentAckPdu {
705 negative_ack: true,
706 sent_by_server: false,
707 invoke_id: ack.invoke_id,
708 sequence_number: state.expected_next_seq,
709 actual_window_size: ack.proposed_window_size.unwrap_or(1),
710 });
711 let mut buf = BytesMut::with_capacity(4);
712 encode_apdu(&mut buf, &neg_ack);
713 if let Err(e) = network
714 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
715 .await
716 {
717 warn!(error = %e, "Failed to send negative SegmentAck");
718 }
719 return;
720 }
721
722 if let Err(e) = state.receiver.receive(seq, ack.service_ack) {
724 warn!(error = %e, "Rejecting oversized segment");
725 return;
726 }
727 state.expected_next_seq = seq.wrapping_add(1);
728
729 let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
731 negative_ack: false,
732 sent_by_server: false,
733 invoke_id: ack.invoke_id,
734 sequence_number: seq,
735 actual_window_size: ack.proposed_window_size.unwrap_or(1),
736 });
737 let mut buf = BytesMut::with_capacity(4);
738 encode_apdu(&mut buf, &seg_ack);
739 if let Err(e) = network
740 .send_apdu(&buf, source_mac, false, NetworkPriority::NORMAL)
741 .await
742 {
743 warn!(error = %e, "Failed to send SegmentAck");
744 }
745
746 if !ack.more_follows {
750 let state = seg_state.remove(&key).unwrap();
751 let total = state.receiver.received_count();
752 match state.receiver.reassemble(total) {
753 Ok(service_data) => {
754 debug!(
755 invoke_id = ack.invoke_id,
756 segments = total,
757 bytes = service_data.len(),
758 "Reassembled segmented ComplexAck"
759 );
760 let mut tsm = tsm.lock().await;
761 tsm.complete_transaction(
762 source_mac,
763 ack.invoke_id,
764 TsmResponse::ComplexAck {
765 service_data: Bytes::from(service_data),
766 },
767 );
768 }
769 Err(e) => {
770 warn!(error = %e, "Failed to reassemble segmented ComplexAck");
771 }
772 }
773 }
774 }
775
776 pub fn local_mac(&self) -> &[u8] {
778 &self.local_mac
779 }
780
781 pub async fn confirmed_request(
793 &self,
794 destination_mac: &[u8],
795 service_choice: ConfirmedServiceChoice,
796 service_data: &[u8],
797 ) -> Result<Bytes, Error> {
798 let unsegmented_apdu_size = 4 + service_data.len();
801 let (remote_max_apdu, remote_max_segments) = {
802 let dt = self.device_table.lock().await;
803 let device = dt.get_by_mac(destination_mac);
804 let max_apdu = device
805 .map(|d| d.max_apdu_length as u16)
806 .unwrap_or(self.config.max_apdu_length);
807 let max_seg = device.and_then(|d| d.max_segments_accepted);
808 (max_apdu, max_seg)
809 };
810 if unsegmented_apdu_size > remote_max_apdu as usize {
811 return self
812 .segmented_confirmed_request(
813 destination_mac,
814 service_choice,
815 service_data,
816 remote_max_apdu,
817 remote_max_segments,
818 )
819 .await;
820 }
821
822 let (invoke_id, rx) = {
824 let mut tsm = self.tsm.lock().await;
825 let invoke_id = tsm.allocate_invoke_id(destination_mac).ok_or_else(|| {
826 Error::Encoding("all invoke IDs exhausted for destination".into())
827 })?;
828 let rx = tsm.register_transaction(MacAddr::from_slice(destination_mac), invoke_id);
829 (invoke_id, rx)
830 };
831
832 let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
834 segmented: false,
835 more_follows: false,
836 segmented_response_accepted: self.config.segmented_response_accepted,
837 max_segments: self.config.max_segments,
838 max_apdu_length: self.config.max_apdu_length,
839 invoke_id,
840 sequence_number: None,
841 proposed_window_size: None,
842 service_choice,
843 service_request: Bytes::copy_from_slice(service_data),
844 });
845
846 let mut buf = BytesMut::with_capacity(6 + service_data.len());
847 encode_apdu(&mut buf, &pdu);
848
849 let timeout_duration = Duration::from_millis(self.config.apdu_timeout_ms);
853 let max_retries = self.config.apdu_retries;
854 let mut attempts: u8 = 0;
855 let mut rx = rx;
856
857 loop {
858 if let Err(e) = self
859 .network
860 .send_apdu(&buf, destination_mac, true, NetworkPriority::NORMAL)
861 .await
862 {
863 let mut tsm = self.tsm.lock().await;
865 tsm.cancel_transaction(destination_mac, invoke_id);
866 return Err(e);
867 }
868
869 match timeout(timeout_duration, &mut rx).await {
870 Ok(Ok(response)) => {
871 return match response {
873 TsmResponse::SimpleAck => Ok(Bytes::new()),
874 TsmResponse::ComplexAck { service_data } => Ok(service_data),
875 TsmResponse::Error { class, code } => Err(Error::Protocol { class, code }),
876 TsmResponse::Reject { reason } => Err(Error::Reject { reason }),
877 TsmResponse::Abort { reason } => Err(Error::Abort { reason }),
878 };
879 }
880 Ok(Err(_)) => {
881 return Err(Error::Encoding("TSM response channel closed".into()));
883 }
884 Err(_timeout) => {
885 attempts += 1;
886 if attempts > max_retries {
887 let mut tsm = self.tsm.lock().await;
889 tsm.cancel_transaction(destination_mac, invoke_id);
890 return Err(Error::Timeout(timeout_duration));
891 }
892 debug!(
893 invoke_id,
894 attempt = attempts,
895 max_retries,
896 "APDU timeout, retrying confirmed request"
897 );
898 }
899 }
900 }
901 }
902
903 async fn segmented_confirmed_request(
908 &self,
909 destination_mac: &[u8],
910 service_choice: ConfirmedServiceChoice,
911 service_data: &[u8],
912 remote_max_apdu: u16,
913 remote_max_segments: Option<u32>,
914 ) -> Result<Bytes, Error> {
915 let max_seg_size = max_segment_payload(remote_max_apdu, SegmentedPduType::ConfirmedRequest);
916 let segments = split_payload(service_data, max_seg_size);
917 let total_segments = segments.len();
918
919 if total_segments > 255 {
920 return Err(Error::Segmentation(format!(
921 "payload requires {} segments, maximum is 255",
922 total_segments
923 )));
924 }
925
926 if let Some(max_seg) = remote_max_segments {
927 if total_segments > max_seg as usize {
928 return Err(Error::Segmentation(format!(
929 "request requires {} segments but remote accepts at most {}",
930 total_segments, max_seg
931 )));
932 }
933 }
934
935 debug!(
936 total_segments,
937 max_seg_size,
938 service_data_len = service_data.len(),
939 "Starting segmented confirmed request"
940 );
941
942 let (invoke_id, rx) = {
944 let mut tsm = self.tsm.lock().await;
945 let invoke_id = tsm.allocate_invoke_id(destination_mac).ok_or_else(|| {
946 Error::Encoding("all invoke IDs exhausted for destination".into())
947 })?;
948 let rx = tsm.register_transaction(MacAddr::from_slice(destination_mac), invoke_id);
949 (invoke_id, rx)
950 };
951
952 let (seg_ack_tx, mut seg_ack_rx) = mpsc::channel(16);
954 {
955 let key = (MacAddr::from_slice(destination_mac), invoke_id);
956 self.seg_ack_senders.lock().await.insert(key, seg_ack_tx);
957 }
958
959 let timeout_duration = Duration::from_millis(self.config.apdu_timeout_ms);
960 let max_ack_retries = self.config.apdu_retries;
961 let mut window_size = self.config.proposed_window_size.max(1) as usize;
962 let mut next_seq: usize = 0;
963 let mut neg_ack_retries: u32 = 0;
964 const MAX_NEG_ACK_RETRIES: u32 = 10;
965
966 let result = async {
968 while next_seq < total_segments {
969 let window_end = (next_seq + window_size).min(total_segments);
970
971 for (seq, segment_data) in segments[next_seq..window_end]
972 .iter()
973 .enumerate()
974 .map(|(i, s)| (next_seq + i, s))
975 {
976 let is_last = seq == total_segments - 1;
977 let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
978 segmented: true,
979 more_follows: !is_last,
980 segmented_response_accepted: self.config.segmented_response_accepted,
981 max_segments: self.config.max_segments,
982 max_apdu_length: self.config.max_apdu_length,
983 invoke_id,
984 sequence_number: Some(seq as u8),
985 proposed_window_size: Some(self.config.proposed_window_size.max(1)),
986 service_choice,
987 service_request: segment_data.clone(),
988 });
989
990 let mut buf = BytesMut::with_capacity(remote_max_apdu as usize);
991 encode_apdu(&mut buf, &pdu);
992
993 self.network
994 .send_apdu(&buf, destination_mac, true, NetworkPriority::NORMAL)
995 .await?;
996
997 debug!(seq, is_last, "Sent segment");
998 }
999
1000 let ack = {
1002 let mut ack_retries: u8 = 0;
1003 loop {
1004 match timeout(timeout_duration, seg_ack_rx.recv()).await {
1005 Ok(Some(ack)) => break ack,
1006 Ok(None) => {
1007 return Err(Error::Encoding("SegmentAck channel closed".into()));
1008 }
1009 Err(_timeout) => {
1010 ack_retries += 1;
1011 if ack_retries > max_ack_retries {
1012 return Err(Error::Timeout(timeout_duration));
1013 }
1014 warn!(
1015 attempt = ack_retries,
1016 "Retransmitting segmented request window"
1017 );
1018 for (seq, segment_data) in segments[next_seq..window_end]
1020 .iter()
1021 .enumerate()
1022 .map(|(i, s)| (next_seq + i, s))
1023 {
1024 let is_last = seq == total_segments - 1;
1025 let pdu = Apdu::ConfirmedRequest(ConfirmedRequestPdu {
1026 segmented: true,
1027 more_follows: !is_last,
1028 segmented_response_accepted: self
1029 .config
1030 .segmented_response_accepted,
1031 max_segments: self.config.max_segments,
1032 max_apdu_length: self.config.max_apdu_length,
1033 invoke_id,
1034 sequence_number: Some(seq as u8),
1035 proposed_window_size: Some(
1036 self.config.proposed_window_size.max(1),
1037 ),
1038 service_choice,
1039 service_request: segment_data.clone(),
1040 });
1041
1042 let mut buf = BytesMut::with_capacity(remote_max_apdu as usize);
1043 encode_apdu(&mut buf, &pdu);
1044
1045 self.network
1046 .send_apdu(
1047 &buf,
1048 destination_mac,
1049 true,
1050 NetworkPriority::NORMAL,
1051 )
1052 .await?;
1053 }
1054 }
1055 }
1056 }
1057 };
1058
1059 debug!(
1060 seq = ack.sequence_number,
1061 negative = ack.negative_ack,
1062 window = ack.actual_window_size,
1063 "Received SegmentAck"
1064 );
1065
1066 window_size = ack.actual_window_size.max(1) as usize;
1068
1069 if ack.negative_ack {
1070 neg_ack_retries += 1;
1071 if neg_ack_retries > MAX_NEG_ACK_RETRIES {
1072 return Err(Error::Segmentation(
1073 "too many negative SegmentAck retransmissions".into(),
1074 ));
1075 }
1076 next_seq = ack.sequence_number as usize;
1078 } else {
1079 neg_ack_retries = 0;
1080 next_seq = ack.sequence_number as usize + 1;
1082 }
1083 }
1084
1085 timeout(timeout_duration, rx)
1087 .await
1088 .map_err(|_| Error::Timeout(timeout_duration))?
1089 .map_err(|_| Error::Encoding("TSM response channel closed".into()))
1090 }
1091 .await;
1092
1093 {
1095 let key = (MacAddr::from_slice(destination_mac), invoke_id);
1096 self.seg_ack_senders.lock().await.remove(&key);
1097 }
1098
1099 let response = match result {
1101 Ok(response) => response,
1102 Err(e) => {
1103 let mut tsm = self.tsm.lock().await;
1104 tsm.cancel_transaction(destination_mac, invoke_id);
1105 return Err(e);
1106 }
1107 };
1108
1109 match response {
1110 TsmResponse::SimpleAck => Ok(Bytes::new()),
1111 TsmResponse::ComplexAck { service_data } => Ok(service_data),
1112 TsmResponse::Error { class, code } => Err(Error::Protocol { class, code }),
1113 TsmResponse::Reject { reason } => Err(Error::Reject { reason }),
1114 TsmResponse::Abort { reason } => Err(Error::Abort { reason }),
1115 }
1116 }
1117
1118 pub async fn unconfirmed_request(
1120 &self,
1121 destination_mac: &[u8],
1122 service_choice: UnconfirmedServiceChoice,
1123 service_data: &[u8],
1124 ) -> Result<(), Error> {
1125 let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1126 service_choice,
1127 service_request: Bytes::copy_from_slice(service_data),
1128 });
1129
1130 let mut buf = BytesMut::with_capacity(2 + service_data.len());
1131 encode_apdu(&mut buf, &pdu);
1132
1133 self.network
1134 .send_apdu(&buf, destination_mac, false, NetworkPriority::NORMAL)
1135 .await
1136 }
1137
1138 pub async fn broadcast_unconfirmed(
1140 &self,
1141 service_choice: UnconfirmedServiceChoice,
1142 service_data: &[u8],
1143 ) -> Result<(), Error> {
1144 let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1145 service_choice,
1146 service_request: Bytes::copy_from_slice(service_data),
1147 });
1148
1149 let mut buf = BytesMut::with_capacity(2 + service_data.len());
1150 encode_apdu(&mut buf, &pdu);
1151
1152 self.network
1153 .broadcast_apdu(&buf, false, NetworkPriority::NORMAL)
1154 .await
1155 }
1156
1157 pub async fn broadcast_global_unconfirmed(
1162 &self,
1163 service_choice: UnconfirmedServiceChoice,
1164 service_data: &[u8],
1165 ) -> Result<(), Error> {
1166 let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1167 service_choice,
1168 service_request: Bytes::copy_from_slice(service_data),
1169 });
1170
1171 let mut buf = BytesMut::with_capacity(2 + service_data.len());
1172 encode_apdu(&mut buf, &pdu);
1173
1174 self.network
1175 .broadcast_global_apdu(&buf, false, NetworkPriority::NORMAL)
1176 .await
1177 }
1178
1179 pub async fn read_property(
1185 &self,
1186 destination_mac: &[u8],
1187 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1188 property_identifier: bacnet_types::enums::PropertyIdentifier,
1189 property_array_index: Option<u32>,
1190 ) -> Result<bacnet_services::read_property::ReadPropertyACK, Error> {
1191 use bacnet_services::read_property::ReadPropertyRequest;
1192
1193 let request = ReadPropertyRequest {
1194 object_identifier,
1195 property_identifier,
1196 property_array_index,
1197 };
1198 let mut buf = BytesMut::new();
1199 request.encode(&mut buf);
1200
1201 let response_data = self
1202 .confirmed_request(destination_mac, ConfirmedServiceChoice::READ_PROPERTY, &buf)
1203 .await?;
1204
1205 bacnet_services::read_property::ReadPropertyACK::decode(&response_data)
1206 }
1207
1208 pub async fn write_property(
1210 &self,
1211 destination_mac: &[u8],
1212 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1213 property_identifier: bacnet_types::enums::PropertyIdentifier,
1214 property_array_index: Option<u32>,
1215 property_value: Vec<u8>,
1216 priority: Option<u8>,
1217 ) -> Result<(), Error> {
1218 use bacnet_services::write_property::WritePropertyRequest;
1219
1220 let request = WritePropertyRequest {
1221 object_identifier,
1222 property_identifier,
1223 property_array_index,
1224 property_value,
1225 priority,
1226 };
1227 let mut buf = BytesMut::new();
1228 request.encode(&mut buf);
1229
1230 let _ = self
1231 .confirmed_request(
1232 destination_mac,
1233 ConfirmedServiceChoice::WRITE_PROPERTY,
1234 &buf,
1235 )
1236 .await?;
1237
1238 Ok(())
1239 }
1240
1241 pub async fn read_property_multiple(
1243 &self,
1244 destination_mac: &[u8],
1245 specs: Vec<bacnet_services::rpm::ReadAccessSpecification>,
1246 ) -> Result<bacnet_services::rpm::ReadPropertyMultipleACK, Error> {
1247 use bacnet_services::rpm::{ReadPropertyMultipleACK, ReadPropertyMultipleRequest};
1248
1249 let request = ReadPropertyMultipleRequest {
1250 list_of_read_access_specs: specs,
1251 };
1252 let mut buf = BytesMut::new();
1253 request.encode(&mut buf);
1254
1255 let response_data = self
1256 .confirmed_request(
1257 destination_mac,
1258 ConfirmedServiceChoice::READ_PROPERTY_MULTIPLE,
1259 &buf,
1260 )
1261 .await?;
1262
1263 ReadPropertyMultipleACK::decode(&response_data)
1264 }
1265
1266 pub async fn write_property_multiple(
1268 &self,
1269 destination_mac: &[u8],
1270 specs: Vec<bacnet_services::wpm::WriteAccessSpecification>,
1271 ) -> Result<(), Error> {
1272 use bacnet_services::wpm::WritePropertyMultipleRequest;
1273
1274 let request = WritePropertyMultipleRequest {
1275 list_of_write_access_specs: specs,
1276 };
1277 let mut buf = BytesMut::new();
1278 request.encode(&mut buf);
1279
1280 let _ = self
1281 .confirmed_request(
1282 destination_mac,
1283 ConfirmedServiceChoice::WRITE_PROPERTY_MULTIPLE,
1284 &buf,
1285 )
1286 .await?;
1287
1288 Ok(())
1289 }
1290
1291 pub async fn who_is(
1293 &self,
1294 low_limit: Option<u32>,
1295 high_limit: Option<u32>,
1296 ) -> Result<(), Error> {
1297 use bacnet_services::who_is::WhoIsRequest;
1298
1299 let request = WhoIsRequest {
1300 low_limit,
1301 high_limit,
1302 };
1303 let mut buf = BytesMut::new();
1304 request.encode(&mut buf);
1305
1306 self.broadcast_global_unconfirmed(UnconfirmedServiceChoice::WHO_IS, &buf)
1307 .await
1308 }
1309
1310 pub async fn who_is_directed(
1312 &self,
1313 destination_mac: &[u8],
1314 low_limit: Option<u32>,
1315 high_limit: Option<u32>,
1316 ) -> Result<(), Error> {
1317 use bacnet_services::who_is::WhoIsRequest;
1318
1319 let request = WhoIsRequest {
1320 low_limit,
1321 high_limit,
1322 };
1323 let mut buf = BytesMut::new();
1324 request.encode(&mut buf);
1325
1326 let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1327 service_choice: UnconfirmedServiceChoice::WHO_IS,
1328 service_request: Bytes::copy_from_slice(&buf),
1329 });
1330
1331 let mut apdu_buf = BytesMut::with_capacity(2 + buf.len());
1332 encode_apdu(&mut apdu_buf, &pdu);
1333
1334 self.network
1335 .send_apdu(&apdu_buf, destination_mac, false, NetworkPriority::NORMAL)
1336 .await
1337 }
1338
1339 pub async fn who_is_network(
1344 &self,
1345 dest_network: u16,
1346 low_limit: Option<u32>,
1347 high_limit: Option<u32>,
1348 ) -> Result<(), Error> {
1349 use bacnet_services::who_is::WhoIsRequest;
1350
1351 let request = WhoIsRequest {
1352 low_limit,
1353 high_limit,
1354 };
1355 let mut buf = BytesMut::new();
1356 request.encode(&mut buf);
1357
1358 let pdu = Apdu::UnconfirmedRequest(bacnet_encoding::apdu::UnconfirmedRequest {
1359 service_choice: UnconfirmedServiceChoice::WHO_IS,
1360 service_request: Bytes::copy_from_slice(&buf),
1361 });
1362
1363 let mut apdu_buf = BytesMut::with_capacity(2 + buf.len());
1364 encode_apdu(&mut apdu_buf, &pdu);
1365
1366 self.network
1367 .broadcast_to_network(&apdu_buf, dest_network, false, NetworkPriority::NORMAL)
1368 .await
1369 }
1370
1371 pub async fn who_has(
1373 &self,
1374 object: bacnet_services::who_has::WhoHasObject,
1375 low_limit: Option<u32>,
1376 high_limit: Option<u32>,
1377 ) -> Result<(), Error> {
1378 use bacnet_services::who_has::WhoHasRequest;
1379
1380 let request = WhoHasRequest {
1381 low_limit,
1382 high_limit,
1383 object,
1384 };
1385 let mut buf = BytesMut::new();
1386 request.encode(&mut buf)?;
1387
1388 self.broadcast_unconfirmed(UnconfirmedServiceChoice::WHO_HAS, &buf)
1389 .await
1390 }
1391
1392 pub async fn subscribe_cov(
1394 &self,
1395 destination_mac: &[u8],
1396 subscriber_process_identifier: u32,
1397 monitored_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1398 confirmed: bool,
1399 lifetime: Option<u32>,
1400 ) -> Result<(), Error> {
1401 use bacnet_services::cov::SubscribeCOVRequest;
1402
1403 let request = SubscribeCOVRequest {
1404 subscriber_process_identifier,
1405 monitored_object_identifier,
1406 issue_confirmed_notifications: Some(confirmed),
1407 lifetime,
1408 };
1409 let mut buf = BytesMut::new();
1410 request.encode(&mut buf);
1411
1412 let _ = self
1413 .confirmed_request(destination_mac, ConfirmedServiceChoice::SUBSCRIBE_COV, &buf)
1414 .await?;
1415
1416 Ok(())
1417 }
1418
1419 pub async fn unsubscribe_cov(
1421 &self,
1422 destination_mac: &[u8],
1423 subscriber_process_identifier: u32,
1424 monitored_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1425 ) -> Result<(), Error> {
1426 use bacnet_services::cov::SubscribeCOVRequest;
1427
1428 let request = SubscribeCOVRequest {
1429 subscriber_process_identifier,
1430 monitored_object_identifier,
1431 issue_confirmed_notifications: None,
1432 lifetime: None,
1433 };
1434 let mut buf = BytesMut::new();
1435 request.encode(&mut buf);
1436
1437 let _ = self
1438 .confirmed_request(destination_mac, ConfirmedServiceChoice::SUBSCRIBE_COV, &buf)
1439 .await?;
1440
1441 Ok(())
1442 }
1443
1444 pub async fn delete_object(
1446 &self,
1447 destination_mac: &[u8],
1448 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1449 ) -> Result<(), Error> {
1450 use bacnet_services::object_mgmt::DeleteObjectRequest;
1451
1452 let request = DeleteObjectRequest { object_identifier };
1453 let mut buf = BytesMut::new();
1454 request.encode(&mut buf);
1455
1456 let _ = self
1457 .confirmed_request(destination_mac, ConfirmedServiceChoice::DELETE_OBJECT, &buf)
1458 .await?;
1459
1460 Ok(())
1461 }
1462
1463 pub async fn create_object(
1465 &self,
1466 destination_mac: &[u8],
1467 object_specifier: bacnet_services::object_mgmt::ObjectSpecifier,
1468 initial_values: Vec<bacnet_services::common::BACnetPropertyValue>,
1469 ) -> Result<Bytes, Error> {
1470 use bacnet_services::object_mgmt::CreateObjectRequest;
1471
1472 let request = CreateObjectRequest {
1473 object_specifier,
1474 list_of_initial_values: initial_values,
1475 };
1476 let mut buf = BytesMut::new();
1477 request.encode(&mut buf);
1478
1479 self.confirmed_request(destination_mac, ConfirmedServiceChoice::CREATE_OBJECT, &buf)
1480 .await
1481 }
1482
1483 pub async fn device_communication_control(
1485 &self,
1486 destination_mac: &[u8],
1487 enable_disable: bacnet_types::enums::EnableDisable,
1488 time_duration: Option<u16>,
1489 password: Option<String>,
1490 ) -> Result<(), Error> {
1491 use bacnet_services::device_mgmt::DeviceCommunicationControlRequest;
1492
1493 let request = DeviceCommunicationControlRequest {
1494 time_duration,
1495 enable_disable,
1496 password,
1497 };
1498 let mut buf = BytesMut::new();
1499 request.encode(&mut buf)?;
1500
1501 let _ = self
1502 .confirmed_request(
1503 destination_mac,
1504 ConfirmedServiceChoice::DEVICE_COMMUNICATION_CONTROL,
1505 &buf,
1506 )
1507 .await?;
1508
1509 Ok(())
1510 }
1511
1512 pub async fn reinitialize_device(
1514 &self,
1515 destination_mac: &[u8],
1516 reinitialized_state: bacnet_types::enums::ReinitializedState,
1517 password: Option<String>,
1518 ) -> Result<(), Error> {
1519 use bacnet_services::device_mgmt::ReinitializeDeviceRequest;
1520
1521 let request = ReinitializeDeviceRequest {
1522 reinitialized_state,
1523 password,
1524 };
1525 let mut buf = BytesMut::new();
1526 request.encode(&mut buf)?;
1527
1528 let _ = self
1529 .confirmed_request(
1530 destination_mac,
1531 ConfirmedServiceChoice::REINITIALIZE_DEVICE,
1532 &buf,
1533 )
1534 .await?;
1535
1536 Ok(())
1537 }
1538
1539 pub async fn get_event_information(
1541 &self,
1542 destination_mac: &[u8],
1543 last_received_object_identifier: Option<bacnet_types::primitives::ObjectIdentifier>,
1544 ) -> Result<Bytes, Error> {
1545 use bacnet_services::alarm_event::GetEventInformationRequest;
1546
1547 let request = GetEventInformationRequest {
1548 last_received_object_identifier,
1549 };
1550 let mut buf = BytesMut::new();
1551 request.encode(&mut buf);
1552
1553 self.confirmed_request(
1554 destination_mac,
1555 ConfirmedServiceChoice::GET_EVENT_INFORMATION,
1556 &buf,
1557 )
1558 .await
1559 }
1560
1561 pub async fn acknowledge_alarm(
1563 &self,
1564 destination_mac: &[u8],
1565 acknowledging_process_identifier: u32,
1566 event_object_identifier: bacnet_types::primitives::ObjectIdentifier,
1567 event_state_acknowledged: u32,
1568 acknowledgment_source: &str,
1569 ) -> Result<(), Error> {
1570 use bacnet_services::alarm_event::AcknowledgeAlarmRequest;
1571
1572 let request = AcknowledgeAlarmRequest {
1573 acknowledging_process_identifier,
1574 event_object_identifier,
1575 event_state_acknowledged,
1576 timestamp: bacnet_types::primitives::BACnetTimeStamp::SequenceNumber(0),
1577 acknowledgment_source: acknowledgment_source.to_string(),
1578 };
1579 let mut buf = BytesMut::new();
1580 request.encode(&mut buf)?;
1581
1582 let _ = self
1583 .confirmed_request(
1584 destination_mac,
1585 ConfirmedServiceChoice::ACKNOWLEDGE_ALARM,
1586 &buf,
1587 )
1588 .await?;
1589
1590 Ok(())
1591 }
1592
1593 pub async fn read_range(
1595 &self,
1596 destination_mac: &[u8],
1597 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1598 property_identifier: bacnet_types::enums::PropertyIdentifier,
1599 property_array_index: Option<u32>,
1600 range: Option<bacnet_services::read_range::RangeSpec>,
1601 ) -> Result<bacnet_services::read_range::ReadRangeAck, Error> {
1602 use bacnet_services::read_range::{ReadRangeAck, ReadRangeRequest};
1603
1604 let request = ReadRangeRequest {
1605 object_identifier,
1606 property_identifier,
1607 property_array_index,
1608 range,
1609 };
1610 let mut buf = BytesMut::new();
1611 request.encode(&mut buf);
1612
1613 let response_data = self
1614 .confirmed_request(destination_mac, ConfirmedServiceChoice::READ_RANGE, &buf)
1615 .await?;
1616
1617 ReadRangeAck::decode(&response_data)
1618 }
1619
1620 pub async fn atomic_read_file(
1622 &self,
1623 destination_mac: &[u8],
1624 file_identifier: bacnet_types::primitives::ObjectIdentifier,
1625 access: bacnet_services::file::FileAccessMethod,
1626 ) -> Result<Bytes, Error> {
1627 use bacnet_services::file::AtomicReadFileRequest;
1628
1629 let request = AtomicReadFileRequest {
1630 file_identifier,
1631 access,
1632 };
1633 let mut buf = BytesMut::new();
1634 request.encode(&mut buf);
1635
1636 self.confirmed_request(
1637 destination_mac,
1638 ConfirmedServiceChoice::ATOMIC_READ_FILE,
1639 &buf,
1640 )
1641 .await
1642 }
1643
1644 pub async fn atomic_write_file(
1646 &self,
1647 destination_mac: &[u8],
1648 file_identifier: bacnet_types::primitives::ObjectIdentifier,
1649 access: bacnet_services::file::FileWriteAccessMethod,
1650 ) -> Result<Bytes, Error> {
1651 use bacnet_services::file::AtomicWriteFileRequest;
1652
1653 let request = AtomicWriteFileRequest {
1654 file_identifier,
1655 access,
1656 };
1657 let mut buf = BytesMut::new();
1658 request.encode(&mut buf);
1659
1660 self.confirmed_request(
1661 destination_mac,
1662 ConfirmedServiceChoice::ATOMIC_WRITE_FILE,
1663 &buf,
1664 )
1665 .await
1666 }
1667
1668 pub async fn add_list_element(
1670 &self,
1671 destination_mac: &[u8],
1672 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1673 property_identifier: bacnet_types::enums::PropertyIdentifier,
1674 property_array_index: Option<u32>,
1675 list_of_elements: Vec<u8>,
1676 ) -> Result<(), Error> {
1677 use bacnet_services::list_manipulation::ListElementRequest;
1678
1679 let request = ListElementRequest {
1680 object_identifier,
1681 property_identifier,
1682 property_array_index,
1683 list_of_elements,
1684 };
1685 let mut buf = BytesMut::new();
1686 request.encode(&mut buf);
1687
1688 let _ = self
1689 .confirmed_request(
1690 destination_mac,
1691 ConfirmedServiceChoice::ADD_LIST_ELEMENT,
1692 &buf,
1693 )
1694 .await?;
1695
1696 Ok(())
1697 }
1698
1699 pub async fn remove_list_element(
1701 &self,
1702 destination_mac: &[u8],
1703 object_identifier: bacnet_types::primitives::ObjectIdentifier,
1704 property_identifier: bacnet_types::enums::PropertyIdentifier,
1705 property_array_index: Option<u32>,
1706 list_of_elements: Vec<u8>,
1707 ) -> Result<(), Error> {
1708 use bacnet_services::list_manipulation::ListElementRequest;
1709
1710 let request = ListElementRequest {
1711 object_identifier,
1712 property_identifier,
1713 property_array_index,
1714 list_of_elements,
1715 };
1716 let mut buf = BytesMut::new();
1717 request.encode(&mut buf);
1718
1719 let _ = self
1720 .confirmed_request(
1721 destination_mac,
1722 ConfirmedServiceChoice::REMOVE_LIST_ELEMENT,
1723 &buf,
1724 )
1725 .await?;
1726
1727 Ok(())
1728 }
1729
1730 pub async fn time_synchronization(
1734 &self,
1735 destination_mac: &[u8],
1736 date: bacnet_types::primitives::Date,
1737 time: bacnet_types::primitives::Time,
1738 ) -> Result<(), Error> {
1739 use bacnet_services::device_mgmt::TimeSynchronizationRequest;
1740
1741 let request = TimeSynchronizationRequest { date, time };
1742 let mut buf = BytesMut::new();
1743 request.encode(&mut buf);
1744
1745 self.unconfirmed_request(
1746 destination_mac,
1747 UnconfirmedServiceChoice::TIME_SYNCHRONIZATION,
1748 &buf,
1749 )
1750 .await
1751 }
1752
1753 pub async fn utc_time_synchronization(
1757 &self,
1758 destination_mac: &[u8],
1759 date: bacnet_types::primitives::Date,
1760 time: bacnet_types::primitives::Time,
1761 ) -> Result<(), Error> {
1762 use bacnet_services::device_mgmt::TimeSynchronizationRequest;
1763
1764 let request = TimeSynchronizationRequest { date, time };
1765 let mut buf = BytesMut::new();
1766 request.encode(&mut buf);
1767
1768 self.unconfirmed_request(
1769 destination_mac,
1770 UnconfirmedServiceChoice::UTC_TIME_SYNCHRONIZATION,
1771 &buf,
1772 )
1773 .await
1774 }
1775
    /// Obtain a new receiver for COV notifications.
    ///
    /// Each call subscribes afresh to the client's `cov_tx` broadcast channel
    /// and returns the resulting `broadcast::Receiver`.
    pub fn cov_notifications(&self) -> broadcast::Receiver<COVNotificationRequest> {
        self.cov_tx.subscribe()
    }
1783
1784 pub async fn discovered_devices(&self) -> Vec<DiscoveredDevice> {
1790 self.device_table.lock().await.all()
1791 }
1792
1793 pub async fn get_device(&self, instance: u32) -> Option<DiscoveredDevice> {
1795 self.device_table.lock().await.get(instance).cloned()
1796 }
1797
1798 pub async fn clear_devices(&self) {
1800 self.device_table.lock().await.clear();
1801 }
1802
1803 pub async fn stop(&mut self) -> Result<(), Error> {
1805 if let Some(task) = self.dispatch_task.take() {
1806 task.abort();
1807 let _ = task.await;
1808 }
1809 Ok(())
1811 }
1812}
1813
1814#[cfg(test)]
1815mod tests {
1816 use super::*;
1817 use bacnet_encoding::apdu::{ComplexAck, SimpleAck};
1818 use std::net::Ipv4Addr;
1819 use tokio::time::Duration;
1820
1821 async fn make_client() -> BACnetClient<BipTransport> {
1823 BACnetClient::builder()
1824 .interface(Ipv4Addr::LOCALHOST)
1825 .port(0)
1826 .apdu_timeout_ms(2000)
1827 .build()
1828 .await
1829 .unwrap()
1830 }
1831
1832 #[tokio::test]
1833 async fn client_start_stop() {
1834 let mut client = make_client().await;
1835 assert!(!client.local_mac().is_empty());
1836 client.stop().await.unwrap();
1837 }
1838
1839 #[tokio::test]
1840 async fn confirmed_request_simple_ack() {
1841 let mut client_a = make_client().await;
1842
1843 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1845 let mut net_b = NetworkLayer::new(transport_b);
1846 let mut rx_b = net_b.start().await.unwrap();
1847 let b_mac = net_b.local_mac().to_vec();
1848
1849 let b_handle = tokio::spawn(async move {
1851 let received = timeout(Duration::from_secs(2), rx_b.recv())
1852 .await
1853 .expect("B timed out")
1854 .expect("B channel closed");
1855
1856 let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
1857 if let Apdu::ConfirmedRequest(req) = decoded {
1858 let ack = Apdu::SimpleAck(SimpleAck {
1859 invoke_id: req.invoke_id,
1860 service_choice: req.service_choice,
1861 });
1862 let mut buf = BytesMut::new();
1863 encode_apdu(&mut buf, &ack);
1864 net_b
1865 .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
1866 .await
1867 .unwrap();
1868 }
1869 net_b.stop().await.unwrap();
1870 });
1871
1872 let result = client_a
1873 .confirmed_request(
1874 &b_mac,
1875 ConfirmedServiceChoice::WRITE_PROPERTY,
1876 &[0x01, 0x02],
1877 )
1878 .await;
1879
1880 assert!(result.is_ok());
1881 let response = result.unwrap();
1882 assert!(response.is_empty()); b_handle.await.unwrap();
1885 client_a.stop().await.unwrap();
1886 }
1887
1888 #[tokio::test]
1889 async fn confirmed_request_complex_ack() {
1890 let mut client_a = make_client().await;
1891
1892 let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
1893 let mut net_b = NetworkLayer::new(transport_b);
1894 let mut rx_b = net_b.start().await.unwrap();
1895 let b_mac = net_b.local_mac().to_vec();
1896
1897 let b_handle = tokio::spawn(async move {
1898 let received = timeout(Duration::from_secs(2), rx_b.recv())
1899 .await
1900 .unwrap()
1901 .unwrap();
1902
1903 let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
1904 if let Apdu::ConfirmedRequest(req) = decoded {
1905 let ack = Apdu::ComplexAck(ComplexAck {
1906 segmented: false,
1907 more_follows: false,
1908 invoke_id: req.invoke_id,
1909 sequence_number: None,
1910 proposed_window_size: None,
1911 service_choice: req.service_choice,
1912 service_ack: Bytes::from_static(&[0xDE, 0xAD, 0xBE, 0xEF]),
1913 });
1914 let mut buf = BytesMut::new();
1915 encode_apdu(&mut buf, &ack);
1916 net_b
1917 .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
1918 .await
1919 .unwrap();
1920 }
1921 net_b.stop().await.unwrap();
1922 });
1923
1924 let result = client_a
1925 .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
1926 .await;
1927
1928 assert!(result.is_ok());
1929 assert_eq!(result.unwrap(), vec![0xDE, 0xAD, 0xBE, 0xEF]);
1930
1931 b_handle.await.unwrap();
1932 client_a.stop().await.unwrap();
1933 }
1934
1935 #[tokio::test]
1936 async fn confirmed_request_timeout() {
1937 let mut client = make_client().await;
1938 let fake_mac = vec![10, 99, 99, 99, 0xBA, 0xC0];
1940 let result = client
1941 .confirmed_request(&fake_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
1942 .await;
1943 assert!(result.is_err());
1944 client.stop().await.unwrap();
1945 }
1946
    /// The client reassembles a ComplexAck delivered in three segments.
    ///
    /// Peer B answers one confirmed request with three segmented ComplexAck
    /// PDUs and, after each, expects a SegmentAck carrying the same invoke id
    /// and that segment's sequence number. The client must return the three
    /// payloads concatenated in order.
    #[tokio::test]
    async fn segmented_complex_ack_reassembly() {
        let mut client = make_client().await;

        // Peer B: a raw network layer on localhost, port 0.
        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
        let mut net_b = NetworkLayer::new(transport_b);
        let mut rx_b = net_b.start().await.unwrap();
        let b_mac = net_b.local_mac().to_vec();

        let b_handle = tokio::spawn(async move {
            // Wait for the client's request and remember its invoke id.
            let received = timeout(Duration::from_secs(2), rx_b.recv())
                .await
                .unwrap()
                .unwrap();

            let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
            let invoke_id = if let Apdu::ConfirmedRequest(req) = decoded {
                req.invoke_id
            } else {
                panic!("Expected ConfirmedRequest");
            };

            let service_choice = ConfirmedServiceChoice::READ_PROPERTY;
            // Three payload chunks: 3 + 3 + 2 bytes.
            let segments: Vec<Bytes> = vec![
                Bytes::from_static(&[0x01, 0x02, 0x03]),
                Bytes::from_static(&[0x04, 0x05, 0x06]),
                Bytes::from_static(&[0x07, 0x08]),
            ];

            for (i, seg) in segments.iter().enumerate() {
                // Only the final segment clears `more_follows`.
                let is_last = i == segments.len() - 1;
                let ack = Apdu::ComplexAck(ComplexAck {
                    segmented: true,
                    more_follows: !is_last,
                    invoke_id,
                    sequence_number: Some(i as u8),
                    proposed_window_size: Some(1),
                    service_choice,
                    service_ack: seg.clone(),
                });
                let mut buf = BytesMut::new();
                encode_apdu(&mut buf, &ack);
                net_b
                    .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
                    .await
                    .unwrap();

                // Each segment must be acknowledged before the next is sent.
                let seg_ack_msg = timeout(Duration::from_secs(2), rx_b.recv())
                    .await
                    .unwrap()
                    .unwrap();
                let decoded = apdu::decode_apdu(seg_ack_msg.apdu.clone()).unwrap();
                if let Apdu::SegmentAck(sa) = decoded {
                    assert_eq!(sa.invoke_id, invoke_id);
                    assert_eq!(sa.sequence_number, i as u8);
                } else {
                    panic!("Expected SegmentAck, got {:?}", decoded);
                }
            }

            net_b.stop().await.unwrap();
        });

        let result = client
            .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &[0x01])
            .await;

        // The caller sees one contiguous payload.
        assert!(result.is_ok());
        assert_eq!(
            result.unwrap(),
            vec![0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]
        );

        b_handle.await.unwrap();
        client.stop().await.unwrap();
    }
2028
    /// A request larger than the configured max APDU length goes out as
    /// segments, and the peer can rebuild the original payload from them.
    ///
    /// The client is built with `max_apdu_length(50)` and sends 100 bytes of
    /// service data. Peer B asserts every request PDU is segmented, answers
    /// each with a SegmentAck, and replies with a SimpleAck after the final
    /// segment.
    #[tokio::test]
    async fn segmented_confirmed_request_sends_segments() {
        let mut client = BACnetClient::builder()
            .interface(Ipv4Addr::LOCALHOST)
            .port(0)
            .apdu_timeout_ms(5000)
            .max_apdu_length(50)
            .build()
            .await
            .unwrap();

        // Peer B: a raw network layer on localhost, port 0.
        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
        let mut net_b = NetworkLayer::new(transport_b);
        let mut rx_b = net_b.start().await.unwrap();
        let b_mac = net_b.local_mac().to_vec();

        // 100 distinct bytes, so reordering or loss would be detectable.
        let service_data: Vec<u8> = (0u8..100).collect();
        let expected_data = service_data.clone();

        let b_handle = tokio::spawn(async move {
            let mut all_service_data = Vec::new();
            // Both are assigned on every received segment inside the loop.
            let mut client_mac;
            let mut invoke_id;

            loop {
                let received = timeout(Duration::from_secs(3), rx_b.recv())
                    .await
                    .expect("server timed out waiting for segment")
                    .expect("server channel closed");

                let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
                if let Apdu::ConfirmedRequest(req) = decoded {
                    assert!(req.segmented, "expected segmented request");
                    invoke_id = req.invoke_id;
                    client_mac = received.source_mac.clone();
                    let seq = req.sequence_number.unwrap();
                    all_service_data.extend_from_slice(&req.service_request);

                    // Acknowledge this segment so the client can continue.
                    let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
                        negative_ack: false,
                        sent_by_server: true,
                        invoke_id,
                        sequence_number: seq,
                        actual_window_size: 1,
                    });
                    let mut buf = BytesMut::new();
                    encode_apdu(&mut buf, &seg_ack);
                    net_b
                        .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
                        .await
                        .unwrap();

                    // The last segment has `more_follows` cleared.
                    if !req.more_follows {
                        break;
                    }
                } else {
                    panic!("Expected ConfirmedRequest, got {:?}", decoded);
                }
            }

            // Final reply for the whole request.
            let ack = Apdu::SimpleAck(SimpleAck {
                invoke_id,
                service_choice: ConfirmedServiceChoice::WRITE_PROPERTY,
            });
            let mut buf = BytesMut::new();
            encode_apdu(&mut buf, &ack);
            net_b
                .send_apdu(&buf, &client_mac, false, NetworkPriority::NORMAL)
                .await
                .unwrap();

            net_b.stop().await.unwrap();
            // Hand the reassembled payload back to the test body.
            all_service_data
        });

        let result = client
            .confirmed_request(
                &b_mac,
                ConfirmedServiceChoice::WRITE_PROPERTY,
                &service_data,
            )
            .await;

        assert!(result.is_ok());
        // A SimpleAck carries no payload; then collect what the peer received.
        assert!(result.unwrap().is_empty()); let received_data = b_handle.await.unwrap();
        assert_eq!(received_data, expected_data);

        client.stop().await.unwrap();
    }
2126
    /// A segmented request can be answered with an unsegmented ComplexAck.
    ///
    /// The client is built with `max_apdu_length(50)` and sends 60 bytes of
    /// service data. Peer B acknowledges each request PDU with a SegmentAck
    /// and, after the one with `more_follows` cleared, replies with a
    /// ComplexAck carrying `[0xCA, 0xFE]`.
    #[tokio::test]
    async fn segmented_request_with_complex_ack_response() {
        let mut client = BACnetClient::builder()
            .interface(Ipv4Addr::LOCALHOST)
            .port(0)
            .apdu_timeout_ms(5000)
            .max_apdu_length(50)
            .build()
            .await
            .unwrap();

        // Peer B: a raw network layer on localhost, port 0.
        let transport_b = BipTransport::new(Ipv4Addr::LOCALHOST, 0, Ipv4Addr::BROADCAST);
        let mut net_b = NetworkLayer::new(transport_b);
        let mut rx_b = net_b.start().await.unwrap();
        let b_mac = net_b.local_mac().to_vec();

        let service_data: Vec<u8> = (0u8..60).collect();

        let b_handle = tokio::spawn(async move {
            // Both are assigned on every confirmed request seen in the loop.
            let mut client_mac;
            let mut invoke_id;

            loop {
                let received = timeout(Duration::from_secs(3), rx_b.recv())
                    .await
                    .unwrap()
                    .unwrap();

                let decoded = apdu::decode_apdu(received.apdu.clone()).unwrap();
                // PDUs of any other type are skipped; the loop keeps waiting.
                if let Apdu::ConfirmedRequest(req) = decoded {
                    invoke_id = req.invoke_id;
                    client_mac = received.source_mac.clone();
                    // Panics if the request carries no sequence number.
                    let seq = req.sequence_number.unwrap();

                    let seg_ack = Apdu::SegmentAck(SegmentAckPdu {
                        negative_ack: false,
                        sent_by_server: true,
                        invoke_id,
                        sequence_number: seq,
                        actual_window_size: 1,
                    });
                    let mut buf = BytesMut::new();
                    encode_apdu(&mut buf, &seg_ack);
                    net_b
                        .send_apdu(&buf, &received.source_mac, false, NetworkPriority::NORMAL)
                        .await
                        .unwrap();

                    // The last segment has `more_follows` cleared.
                    if !req.more_follows {
                        break;
                    }
                }
            }

            // Final reply: a single, unsegmented ComplexAck.
            let ack = Apdu::ComplexAck(ComplexAck {
                segmented: false,
                more_follows: false,
                invoke_id,
                sequence_number: None,
                proposed_window_size: None,
                service_choice: ConfirmedServiceChoice::READ_PROPERTY,
                service_ack: Bytes::from_static(&[0xCA, 0xFE]),
            });
            let mut buf = BytesMut::new();
            encode_apdu(&mut buf, &ack);
            net_b
                .send_apdu(&buf, &client_mac, false, NetworkPriority::NORMAL)
                .await
                .unwrap();

            net_b.stop().await.unwrap();
        });

        let result = client
            .confirmed_request(&b_mac, ConfirmedServiceChoice::READ_PROPERTY, &service_data)
            .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), vec![0xCA, 0xFE]);

        b_handle.await.unwrap();
        client.stop().await.unwrap();
    }
2212
2213 #[tokio::test]
2214 async fn segment_overflow_guard() {
2215 let mut client = BACnetClient::builder()
2217 .interface(Ipv4Addr::LOCALHOST)
2218 .port(0)
2219 .apdu_timeout_ms(2000)
2220 .max_apdu_length(50)
2221 .build()
2222 .await
2223 .unwrap();
2224
2225 let huge_payload = vec![0u8; 256 * 44];
2228
2229 let fake_mac = vec![10, 99, 99, 99, 0xBA, 0xC0];
2232
2233 let result = client
2234 .confirmed_request(
2235 &fake_mac,
2236 ConfirmedServiceChoice::READ_PROPERTY,
2237 &huge_payload,
2238 )
2239 .await;
2240
2241 assert!(result.is_err());
2242 let err_msg = result.unwrap_err().to_string();
2243 assert!(
2244 err_msg.contains("256 segments"),
2245 "expected segment overflow error, got: {}",
2246 err_msg
2247 );
2248
2249 client.stop().await.unwrap();
2250 }
2251
2252 #[test]
2253 fn seg_receiver_timeout_is_4s() {
2254 assert_eq!(SEG_RECEIVER_TIMEOUT, Duration::from_secs(4));
2255 }
2256}