1use std::collections::HashMap;
7use std::net::Ipv4Addr;
8#[cfg(feature = "ipv6")]
9use std::net::Ipv6Addr;
10use std::sync::Arc;
11use std::time::Instant;
12
13use bytes::{Bytes, BytesMut};
14use tokio::sync::{broadcast, mpsc, Mutex};
15use tokio::task::JoinHandle;
16use tokio::time::{timeout, Duration};
17use tracing::{debug, warn};
18
19use bacnet_encoding::apdu::{
20 self, encode_apdu, validate_max_apdu_length, AbortPdu, Apdu,
21 ConfirmedRequest as ConfirmedRequestPdu, SegmentAck as SegmentAckPdu, SimpleAck,
22};
23use bacnet_encoding::npdu::NpduAddress;
24use bacnet_network::layer::NetworkLayer;
25use bacnet_services::cov::COVNotificationRequest;
26use bacnet_transport::bip::BipTransport;
27#[cfg(feature = "ipv6")]
28use bacnet_transport::bip6::Bip6Transport;
29use bacnet_transport::port::TransportPort;
30use bacnet_types::enums::{ConfirmedServiceChoice, NetworkPriority, UnconfirmedServiceChoice};
31use bacnet_types::error::Error;
32use bacnet_types::MacAddr;
33
34use crate::discovery::{DeviceTable, DiscoveredDevice};
35use crate::segmentation::{max_segment_payload, split_payload, SegmentReceiver, SegmentedPduType};
36use crate::tsm::{Tsm, TsmConfig, TsmResponse};
37
38#[derive(Debug, Clone)]
40pub struct ClientConfig {
41 pub interface: Ipv4Addr,
43 pub port: u16,
45 pub broadcast_address: Ipv4Addr,
47 pub apdu_timeout_ms: u64,
49 pub apdu_retries: u8,
51 pub max_apdu_length: u16,
53 pub max_segments: Option<u8>,
55 pub segmented_response_accepted: bool,
57 pub proposed_window_size: u8,
59}
60
61impl Default for ClientConfig {
62 fn default() -> Self {
63 Self {
64 interface: Ipv4Addr::UNSPECIFIED,
65 port: 0xBAC0,
66 broadcast_address: Ipv4Addr::BROADCAST,
67 apdu_timeout_ms: 6000,
68 apdu_retries: 3,
69 max_apdu_length: 1476,
70 max_segments: None,
71 segmented_response_accepted: true,
72 proposed_window_size: 1,
73 }
74 }
75}
76
77pub struct ClientBuilder<T: TransportPort> {
79 config: ClientConfig,
80 transport: Option<T>,
81}
82
83impl<T: TransportPort + 'static> ClientBuilder<T> {
84 pub fn transport(mut self, transport: T) -> Self {
86 self.transport = Some(transport);
87 self
88 }
89
90 pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
92 self.config.apdu_timeout_ms = ms;
93 self
94 }
95
96 pub fn max_apdu_length(mut self, len: u16) -> Self {
98 self.config.max_apdu_length = len;
99 self
100 }
101
102 pub async fn build(self) -> Result<BACnetClient<T>, Error> {
104 let transport = self
105 .transport
106 .ok_or_else(|| Error::Encoding("transport not set on ClientBuilder".into()))?;
107 BACnetClient::start(self.config, transport).await
108 }
109}
110
111pub struct BipClientBuilder {
113 config: ClientConfig,
114}
115
116impl BipClientBuilder {
117 pub fn interface(mut self, ip: Ipv4Addr) -> Self {
119 self.config.interface = ip;
120 self
121 }
122
123 pub fn port(mut self, port: u16) -> Self {
125 self.config.port = port;
126 self
127 }
128
129 pub fn broadcast_address(mut self, addr: Ipv4Addr) -> Self {
131 self.config.broadcast_address = addr;
132 self
133 }
134
135 pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
137 self.config.apdu_timeout_ms = ms;
138 self
139 }
140
141 pub fn max_apdu_length(mut self, len: u16) -> Self {
143 self.config.max_apdu_length = len;
144 self
145 }
146
147 pub async fn build(self) -> Result<BACnetClient<BipTransport>, Error> {
149 let transport = BipTransport::new(
150 self.config.interface,
151 self.config.port,
152 self.config.broadcast_address,
153 );
154 BACnetClient::start(self.config, transport).await
155 }
156}
157
158const DEFAULT_BATCH_CONCURRENCY: usize = 32;
164
165#[derive(Debug, Clone)]
167pub struct DeviceReadRequest {
168 pub device_instance: u32,
170 pub object_identifier: bacnet_types::primitives::ObjectIdentifier,
172 pub property_identifier: bacnet_types::enums::PropertyIdentifier,
174 pub property_array_index: Option<u32>,
176}
177
178#[derive(Debug)]
180pub struct DeviceReadResult {
181 pub device_instance: u32,
183 pub result: Result<bacnet_services::read_property::ReadPropertyACK, Error>,
185}
186
187#[derive(Debug, Clone)]
189pub struct DeviceRpmRequest {
190 pub device_instance: u32,
192 pub specs: Vec<bacnet_services::rpm::ReadAccessSpecification>,
194}
195
196#[derive(Debug)]
198pub struct DeviceRpmResult {
199 pub device_instance: u32,
201 pub result: Result<bacnet_services::rpm::ReadPropertyMultipleACK, Error>,
203}
204
205#[derive(Debug, Clone)]
207pub struct DeviceWriteRequest {
208 pub device_instance: u32,
210 pub object_identifier: bacnet_types::primitives::ObjectIdentifier,
212 pub property_identifier: bacnet_types::enums::PropertyIdentifier,
214 pub property_array_index: Option<u32>,
216 pub property_value: Vec<u8>,
218 pub priority: Option<u8>,
220}
221
222#[derive(Debug)]
224pub struct DeviceWriteResult {
225 pub device_instance: u32,
227 pub result: Result<(), Error>,
229}
230
231struct SegmentedReceiveState {
233 receiver: SegmentReceiver,
234 reply_mac: MacAddr,
236 expected_next_seq: u8,
238 last_activity: Instant,
240 window_position: u8,
242 proposed_window_size: u8,
244}
245
246const SEG_RECEIVER_TIMEOUT: Duration = Duration::from_secs(4);
248
249type SegKey = (MacAddr, u8);
251
252pub struct BACnetClient<T: TransportPort> {
254 config: ClientConfig,
255 network: Arc<NetworkLayer<T>>,
256 tsm: Arc<Mutex<Tsm>>,
257 device_table: Arc<Mutex<DeviceTable>>,
258 cov_tx: broadcast::Sender<COVNotificationRequest>,
259 dispatch_task: Option<JoinHandle<()>>,
260 seg_ack_senders: Arc<Mutex<HashMap<SegKey, mpsc::Sender<SegmentAckPdu>>>>,
261 local_mac: MacAddr,
262}
263
264impl BACnetClient<BipTransport> {
265 pub fn bip_builder() -> BipClientBuilder {
267 BipClientBuilder {
268 config: ClientConfig::default(),
269 }
270 }
271
272 pub fn builder() -> BipClientBuilder {
273 Self::bip_builder()
274 }
275
276 pub async fn read_bdt(
278 &self,
279 target: &[u8],
280 ) -> Result<Vec<bacnet_transport::bbmd::BdtEntry>, Error> {
281 self.network.transport().read_bdt(target).await
282 }
283
284 pub async fn write_bdt(
286 &self,
287 target: &[u8],
288 entries: &[bacnet_transport::bbmd::BdtEntry],
289 ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
290 self.network.transport().write_bdt(target, entries).await
291 }
292
293 pub async fn read_fdt(
295 &self,
296 target: &[u8],
297 ) -> Result<Vec<bacnet_transport::bbmd::FdtEntryWire>, Error> {
298 self.network.transport().read_fdt(target).await
299 }
300
301 pub async fn delete_fdt_entry(
303 &self,
304 target: &[u8],
305 ip: [u8; 4],
306 port: u16,
307 ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
308 self.network
309 .transport()
310 .delete_fdt_entry(target, ip, port)
311 .await
312 }
313
314 pub async fn register_foreign_device_bvlc(
316 &self,
317 target: &[u8],
318 ttl: u16,
319 ) -> Result<bacnet_types::enums::BvlcResultCode, Error> {
320 self.network
321 .transport()
322 .register_foreign_device_bvlc(target, ttl)
323 .await
324 }
325}
326
327#[cfg(feature = "ipv6")]
328impl BACnetClient<Bip6Transport> {
329 pub fn bip6_builder() -> Bip6ClientBuilder {
331 Bip6ClientBuilder {
332 config: ClientConfig::default(),
333 interface: Ipv6Addr::UNSPECIFIED,
334 device_instance: None,
335 }
336 }
337}
338
339#[cfg(feature = "ipv6")]
341pub struct Bip6ClientBuilder {
342 config: ClientConfig,
343 interface: Ipv6Addr,
344 device_instance: Option<u32>,
345}
346
347#[cfg(feature = "ipv6")]
348impl Bip6ClientBuilder {
349 pub fn interface(mut self, ip: Ipv6Addr) -> Self {
351 self.interface = ip;
352 self
353 }
354
355 pub fn port(mut self, port: u16) -> Self {
357 self.config.port = port;
358 self
359 }
360
361 pub fn device_instance(mut self, instance: u32) -> Self {
363 self.device_instance = Some(instance);
364 self
365 }
366
367 pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
369 self.config.apdu_timeout_ms = ms;
370 self
371 }
372
373 pub fn max_apdu_length(mut self, len: u16) -> Self {
375 self.config.max_apdu_length = len;
376 self
377 }
378
379 pub async fn build(self) -> Result<BACnetClient<Bip6Transport>, Error> {
381 let transport = Bip6Transport::new(self.interface, self.config.port, self.device_instance);
382 BACnetClient::start(self.config, transport).await
383 }
384}
385
386#[cfg(feature = "sc-tls")]
387impl BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>> {
388 pub fn sc_builder() -> ScClientBuilder {
390 ScClientBuilder {
391 config: ClientConfig::default(),
392 hub_url: String::new(),
393 tls_config: None,
394 vmac: [0; 6],
395 heartbeat_interval_ms: 30_000,
396 heartbeat_timeout_ms: 60_000,
397 reconnect: None,
398 }
399 }
400}
401
402#[cfg(feature = "sc-tls")]
406pub struct ScClientBuilder {
407 config: ClientConfig,
408 hub_url: String,
409 tls_config: Option<std::sync::Arc<tokio_rustls::rustls::ClientConfig>>,
410 vmac: bacnet_transport::sc_frame::Vmac,
411 heartbeat_interval_ms: u64,
412 heartbeat_timeout_ms: u64,
413 reconnect: Option<bacnet_transport::sc::ScReconnectConfig>,
414}
415
416#[cfg(feature = "sc-tls")]
417impl ScClientBuilder {
418 pub fn hub_url(mut self, url: &str) -> Self {
420 self.hub_url = url.to_string();
421 self
422 }
423
424 pub fn tls_config(
426 mut self,
427 config: std::sync::Arc<tokio_rustls::rustls::ClientConfig>,
428 ) -> Self {
429 self.tls_config = Some(config);
430 self
431 }
432
433 pub fn vmac(mut self, vmac: [u8; 6]) -> Self {
435 self.vmac = vmac;
436 self
437 }
438
439 pub fn apdu_timeout_ms(mut self, ms: u64) -> Self {
441 self.config.apdu_timeout_ms = ms;
442 self
443 }
444
445 pub fn heartbeat_interval_ms(mut self, ms: u64) -> Self {
447 self.heartbeat_interval_ms = ms;
448 self
449 }
450
451 pub fn heartbeat_timeout_ms(mut self, ms: u64) -> Self {
453 self.heartbeat_timeout_ms = ms;
454 self
455 }
456
457 pub fn reconnect(mut self, config: bacnet_transport::sc::ScReconnectConfig) -> Self {
459 self.reconnect = Some(config);
460 self
461 }
462
463 pub async fn build(
465 self,
466 ) -> Result<
467 BACnetClient<bacnet_transport::sc::ScTransport<bacnet_transport::sc_tls::TlsWebSocket>>,
468 Error,
469 > {
470 let tls_config = self
471 .tls_config
472 .ok_or_else(|| Error::Encoding("SC client builder: tls_config is required".into()))?;
473
474 let ws = bacnet_transport::sc_tls::TlsWebSocket::connect(&self.hub_url, tls_config).await?;
475
476 let mut transport = bacnet_transport::sc::ScTransport::new(ws, self.vmac)
477 .with_heartbeat_interval_ms(self.heartbeat_interval_ms)
478 .with_heartbeat_timeout_ms(self.heartbeat_timeout_ms);
479 if let Some(rc) = self.reconnect {
480 #[allow(deprecated)]
481 {
482 transport = transport.with_reconnect(rc);
483 }
484 }
485
486 BACnetClient::start(self.config, transport).await
487 }
488}
489
490#[derive(Clone, Copy)]
492enum ConfirmedTarget<'a> {
493 Local {
494 mac: &'a [u8],
495 },
496 Routed {
497 router_mac: &'a [u8],
498 dest_network: u16,
499 dest_mac: &'a [u8],
500 },
501}
502
503impl<'a> ConfirmedTarget<'a> {
504 fn tsm_mac(&self) -> MacAddr {
506 match self {
507 Self::Local { mac } => MacAddr::from_slice(mac),
508 Self::Routed {
509 dest_network,
510 dest_mac,
511 ..
512 } => routed_tsm_mac(*dest_network, dest_mac),
513 }
514 }
515}
516
517fn routed_tsm_mac(network: u16, mac: &[u8]) -> MacAddr {
518 let mut key = MacAddr::new();
519 key.extend_from_slice(&[0xFF, b'R']);
520 key.extend_from_slice(&network.to_be_bytes());
521 key.push(mac.len() as u8);
522 key.extend_from_slice(mac);
523 key
524}
525
526fn response_tsm_mac(source_mac: &[u8], source_network: &Option<NpduAddress>) -> MacAddr {
527 match source_network {
528 Some(address) if !address.mac_address.is_empty() => {
529 routed_tsm_mac(address.network, &address.mac_address)
530 }
531 _ => MacAddr::from_slice(source_mac),
532 }
533}
534
535mod cov;
536mod device_mgmt;
537mod discovery;
538mod dispatch;
539mod file_list;
540mod lifecycle;
541mod object_mgmt;
542mod property;
543mod requests;
544mod segmentation;
545
546#[cfg(test)]
547mod tests;
548
549impl<T: TransportPort + 'static> BACnetClient<T> {
550 pub fn generic_builder() -> ClientBuilder<T> {
552 ClientBuilder {
553 config: ClientConfig::default(),
554 transport: None,
555 }
556 }
557}