Skip to main content

rustbac_client/
server.rs

1//! BACnet server/responder implementation.
2//!
3//! [`BacnetServer`] binds a [`DataLink`] transport and dispatches incoming
4//! service requests to a user-supplied [`ServiceHandler`].  [`ObjectStore`]
5//! is a convenient thread-safe property store that implements
6//! [`ServiceHandler`] out of the box.
7
8use crate::ClientDataValue;
9use rustbac_core::apdu::{
10    ApduType, ComplexAckHeader, ConfirmedRequestHeader, SimpleAck, UnconfirmedRequestHeader,
11};
12use rustbac_core::encoding::{
13    primitives::{decode_unsigned, encode_ctx_unsigned},
14    reader::Reader,
15    tag::Tag,
16    writer::Writer,
17};
18use rustbac_core::npdu::Npdu;
19use rustbac_core::services::i_am::IAmRequest;
20use rustbac_core::services::read_property::SERVICE_READ_PROPERTY;
21use rustbac_core::services::read_property_multiple::SERVICE_READ_PROPERTY_MULTIPLE;
22use rustbac_core::services::value_codec::encode_application_data_value;
23use rustbac_core::services::write_property::SERVICE_WRITE_PROPERTY;
24use rustbac_core::types::{ObjectId, ObjectType, PropertyId};
25
26/// WritePropertyMultiple service choice (0x10).
27const SERVICE_WRITE_PROPERTY_MULTIPLE: u8 = 0x10;
28/// SubscribeCOV service choice (0x05).
29const SERVICE_SUBSCRIBE_COV: u8 = 0x05;
30/// CreateObject service choice (0x0A).
31const SERVICE_CREATE_OBJECT: u8 = 0x0A;
32/// DeleteObject service choice (0x0B).
33const SERVICE_DELETE_OBJECT: u8 = 0x0B;
34use rustbac_datalink::{DataLink, DataLinkAddress};
35use std::collections::HashMap;
36use std::sync::{Arc, Mutex};
37
38// ─────────────────────────────────────────────────────────────────────────────
39// BacnetServiceError
40// ─────────────────────────────────────────────────────────────────────────────
41
42/// Errors that a [`ServiceHandler`] may return.
43#[derive(Debug, Clone, Copy, PartialEq, Eq)]
44pub enum BacnetServiceError {
45    /// The addressed object does not exist.
46    UnknownObject,
47    /// The property does not exist on the object.
48    UnknownProperty,
49    /// The property is not writable.
50    WriteAccessDenied,
51    /// The supplied value is of the wrong type.
52    InvalidDataType,
53    /// The service is not supported by this server.
54    ServiceNotSupported,
55}
56
57impl BacnetServiceError {
58    /// Map the error to a (error_class, error_code) pair for the wire.
59    fn to_error_class_code(self) -> (u8, u8) {
60        match self {
61            // error-class: object (1), error-code: unknown-object (31)
62            BacnetServiceError::UnknownObject => (1, 31),
63            // error-class: property (2), error-code: unknown-property (32)
64            BacnetServiceError::UnknownProperty => (2, 32),
65            // error-class: property (2), error-code: write-access-denied (40)
66            BacnetServiceError::WriteAccessDenied => (2, 40),
67            // error-class: property (2), error-code: invalid-data-type (9)
68            BacnetServiceError::InvalidDataType => (2, 9),
69            // error-class: services (5), error-code: service-not-supported (53)
70            BacnetServiceError::ServiceNotSupported => (5, 53),
71        }
72    }
73}
74
75// ─────────────────────────────────────────────────────────────────────────────
76// ServiceHandler trait
77// ─────────────────────────────────────────────────────────────────────────────
78
79/// Handler trait that the server calls for each incoming service request.
80pub trait ServiceHandler: Send + Sync + 'static {
81    /// Called for a ReadProperty confirmed request.
82    fn read_property(
83        &self,
84        object_id: ObjectId,
85        property_id: PropertyId,
86        array_index: Option<u32>,
87    ) -> Result<ClientDataValue, BacnetServiceError>;
88
89    /// Called for a WriteProperty confirmed request.
90    fn write_property(
91        &self,
92        object_id: ObjectId,
93        property_id: PropertyId,
94        array_index: Option<u32>,
95        value: ClientDataValue,
96        priority: Option<u8>,
97    ) -> Result<(), BacnetServiceError>;
98
99    /// Called for a WritePropertyMultiple confirmed request.
100    ///
101    /// Each element of `specs` is `(object_id, vec_of_(property_id, array_index, value, priority))`.
102    /// The default implementation rejects with [`BacnetServiceError::WriteAccessDenied`].
103    #[allow(clippy::type_complexity)]
104    fn write_property_multiple(
105        &self,
106        _specs: &[(
107            ObjectId,
108            Vec<(PropertyId, Option<u32>, ClientDataValue, Option<u8>)>,
109        )],
110    ) -> Result<(), BacnetServiceError> {
111        Err(BacnetServiceError::WriteAccessDenied)
112    }
113
114    /// Called for a CreateObject confirmed request.
115    ///
116    /// The default implementation rejects with [`BacnetServiceError::WriteAccessDenied`].
117    fn create_object(
118        &self,
119        _object_type: rustbac_core::types::ObjectType,
120    ) -> Result<ObjectId, BacnetServiceError> {
121        Err(BacnetServiceError::WriteAccessDenied)
122    }
123
124    /// Called for a DeleteObject confirmed request.
125    ///
126    /// The default implementation rejects with [`BacnetServiceError::WriteAccessDenied`].
127    fn delete_object(&self, _object_id: ObjectId) -> Result<(), BacnetServiceError> {
128        Err(BacnetServiceError::WriteAccessDenied)
129    }
130
131    /// Called for a SubscribeCOV confirmed request.
132    ///
133    /// The default implementation rejects with [`BacnetServiceError::UnknownObject`].
134    fn subscribe_cov(
135        &self,
136        _subscriber_process_id: u32,
137        _monitored_object_id: ObjectId,
138        _issue_confirmed: bool,
139        _lifetime: Option<u32>,
140    ) -> Result<(), BacnetServiceError> {
141        Err(BacnetServiceError::UnknownObject)
142    }
143}
144
145// ─────────────────────────────────────────────────────────────────────────────
146// ObjectStore
147// ─────────────────────────────────────────────────────────────────────────────
148
149/// Thread-safe property store backed by `Mutex<HashMap<ObjectId, HashMap<PropertyId, ClientDataValue>>>`.
150pub struct ObjectStore {
151    inner: Mutex<HashMap<ObjectId, HashMap<PropertyId, ClientDataValue>>>,
152}
153
154impl ObjectStore {
155    /// Create an empty store.
156    pub fn new() -> Self {
157        Self {
158            inner: Mutex::new(HashMap::new()),
159        }
160    }
161
162    /// Insert or overwrite a property value.
163    pub fn set(&self, object_id: ObjectId, property_id: PropertyId, value: ClientDataValue) {
164        let mut map = self.inner.lock().expect("ObjectStore lock poisoned");
165        map.entry(object_id).or_default().insert(property_id, value);
166    }
167
168    /// Retrieve a property value, returning `None` if the object or property is absent.
169    pub fn get(&self, object_id: ObjectId, property_id: PropertyId) -> Option<ClientDataValue> {
170        let map = self.inner.lock().expect("ObjectStore lock poisoned");
171        map.get(&object_id)?.get(&property_id).cloned()
172    }
173
174    /// Remove all properties associated with an object.
175    pub fn remove_object(&self, object_id: ObjectId) {
176        let mut map = self.inner.lock().expect("ObjectStore lock poisoned");
177        map.remove(&object_id);
178    }
179
180    /// Return a snapshot of all known object identifiers.
181    pub fn object_ids(&self) -> Vec<ObjectId> {
182        let map = self.inner.lock().expect("ObjectStore lock poisoned");
183        map.keys().copied().collect()
184    }
185}
186
187impl Default for ObjectStore {
188    fn default() -> Self {
189        Self::new()
190    }
191}
192
193// ─────────────────────────────────────────────────────────────────────────────
194// ObjectStoreHandler
195// ─────────────────────────────────────────────────────────────────────────────
196
197/// A [`ServiceHandler`] that delegates directly to an [`Arc<ObjectStore>`].
198///
199/// ReadProperty returns the stored value or the appropriate error.
200/// WriteProperty always accepts writes (no write-protection logic).
201pub struct ObjectStoreHandler {
202    store: Arc<ObjectStore>,
203}
204
205impl ObjectStoreHandler {
206    /// Wrap an existing shared store.
207    pub fn new(store: Arc<ObjectStore>) -> Self {
208        Self { store }
209    }
210}
211
212impl ServiceHandler for ObjectStoreHandler {
213    fn read_property(
214        &self,
215        object_id: ObjectId,
216        property_id: PropertyId,
217        _array_index: Option<u32>,
218    ) -> Result<ClientDataValue, BacnetServiceError> {
219        // Check object existence first.
220        let map = self.store.inner.lock().expect("ObjectStore lock poisoned");
221        let props = map
222            .get(&object_id)
223            .ok_or(BacnetServiceError::UnknownObject)?;
224        props
225            .get(&property_id)
226            .cloned()
227            .ok_or(BacnetServiceError::UnknownProperty)
228    }
229
230    fn write_property(
231        &self,
232        object_id: ObjectId,
233        property_id: PropertyId,
234        _array_index: Option<u32>,
235        value: ClientDataValue,
236        _priority: Option<u8>,
237    ) -> Result<(), BacnetServiceError> {
238        let mut map = self.store.inner.lock().expect("ObjectStore lock poisoned");
239        // Write is only permitted if the object already exists.
240        let props = map
241            .get_mut(&object_id)
242            .ok_or(BacnetServiceError::UnknownObject)?;
243        props.insert(property_id, value);
244        Ok(())
245    }
246}
247
248// ─────────────────────────────────────────────────────────────────────────────
249// BacnetServer
250// ─────────────────────────────────────────────────────────────────────────────
251
252/// A server that binds a [`DataLink`] and dispatches incoming service requests.
253pub struct BacnetServer<D: DataLink> {
254    datalink: Arc<D>,
255    handler: Arc<dyn ServiceHandler>,
256    device_id: u32,
257    vendor_id: u16,
258    /// Stored for future use in I-Am responses and segmentation negotiation.
259    #[allow(dead_code)]
260    max_apdu: u8,
261}
262
263impl<D: DataLink> BacnetServer<D> {
264    /// Create a new server with the given datalink and device instance number.
265    pub fn new(datalink: D, device_id: u32, handler: impl ServiceHandler) -> Self {
266        Self {
267            datalink: Arc::new(datalink),
268            handler: Arc::new(handler),
269            device_id,
270            vendor_id: 0,
271            max_apdu: 5, // standard max APDU size index 5 → 1476 bytes
272        }
273    }
274
275    /// Override the vendor ID sent in I-Am responses (default: 0).
276    pub fn with_vendor_id(mut self, vendor_id: u16) -> Self {
277        self.vendor_id = vendor_id;
278        self
279    }
280
281    /// Run the serve loop.
282    ///
283    /// Receives frames, parses them, and dispatches:
284    /// - UnconfirmedRequest Who-Is (0x08) → I-Am; others ignored.
285    /// - ConfirmedRequest ReadProperty (0x0C) → ComplexAck or Error.
286    /// - ConfirmedRequest WriteProperty (0x0F) → SimpleAck or Error.
287    /// - ConfirmedRequest ReadPropertyMultiple (0x0E) → ComplexAck or Error.
288    /// - Any other confirmed service → Reject (UNRECOGNIZED_SERVICE = 0x08).
289    pub async fn serve(self) {
290        let mut buf = [0u8; 1500];
291        loop {
292            let result = self.datalink.recv(&mut buf).await;
293            match result {
294                Ok((n, source)) => {
295                    if let Err(e) = self.handle_frame(&buf[..n], source).await {
296                        log::debug!("server: error handling frame: {e:?}");
297                    }
298                }
299                Err(e) => {
300                    log::debug!("server: datalink recv error: {e:?}");
301                    // On persistent transport errors avoid a tight busy loop.
302                    tokio::task::yield_now().await;
303                }
304            }
305        }
306    }
307
308    // ── private helpers ──────────────────────────────────────────────────────
309
310    async fn handle_frame(
311        &self,
312        frame: &[u8],
313        source: DataLinkAddress,
314    ) -> Result<(), rustbac_core::DecodeError> {
315        let mut r = Reader::new(frame);
316        let _npdu = Npdu::decode(&mut r)?;
317
318        if r.is_empty() {
319            return Ok(());
320        }
321
322        let first = r.peek_u8()?;
323        let apdu_type = ApduType::from_u8(first >> 4);
324
325        match apdu_type {
326            Some(ApduType::UnconfirmedRequest) => {
327                let header = UnconfirmedRequestHeader::decode(&mut r)?;
328                if header.service_choice == 0x08 {
329                    // Who-Is — parse optional limits then respond.
330                    let limits = decode_who_is_limits(&mut r);
331                    if matches_who_is(self.device_id, limits) {
332                        self.send_i_am(source).await;
333                    }
334                }
335                // All other unconfirmed services are ignored.
336            }
337            Some(ApduType::ConfirmedRequest) => {
338                let header = ConfirmedRequestHeader::decode(&mut r)?;
339                let invoke_id = header.invoke_id;
340                match header.service_choice {
341                    SERVICE_READ_PROPERTY => {
342                        self.handle_read_property(&mut r, invoke_id, source).await;
343                    }
344                    SERVICE_WRITE_PROPERTY => {
345                        self.handle_write_property(&mut r, invoke_id, source).await;
346                    }
347                    SERVICE_READ_PROPERTY_MULTIPLE => {
348                        self.handle_read_property_multiple(&mut r, invoke_id, source)
349                            .await;
350                    }
351                    SERVICE_WRITE_PROPERTY_MULTIPLE => {
352                        self.handle_write_property_multiple(&mut r, invoke_id, source)
353                            .await;
354                    }
355                    SERVICE_SUBSCRIBE_COV => {
356                        self.handle_subscribe_cov(&mut r, invoke_id, source).await;
357                    }
358                    SERVICE_CREATE_OBJECT => {
359                        self.handle_create_object(&mut r, invoke_id, source).await;
360                    }
361                    SERVICE_DELETE_OBJECT => {
362                        self.handle_delete_object(&mut r, invoke_id, source).await;
363                    }
364                    _ => {
365                        // Unknown service — send Reject with UNRECOGNIZED_SERVICE.
366                        self.send_reject(invoke_id, 0x08, source).await;
367                    }
368                }
369            }
370            _ => {
371                // Not a request — ignore.
372            }
373        }
374
375        Ok(())
376    }
377
378    async fn send_i_am(&self, target: DataLinkAddress) {
379        let device_id_raw = rustbac_core::types::ObjectId::new(
380            rustbac_core::types::ObjectType::Device,
381            self.device_id,
382        );
383        let req = IAmRequest {
384            device_id: device_id_raw,
385            max_apdu: 1476,
386            segmentation: 3, // no-segmentation
387            vendor_id: self.vendor_id as u32,
388        };
389        let mut buf = [0u8; 128];
390        let mut w = Writer::new(&mut buf);
391        if Npdu::new(0).encode(&mut w).is_err() {
392            return;
393        }
394        if req.encode(&mut w).is_err() {
395            return;
396        }
397        let _ = self.datalink.send(target, w.as_written()).await;
398    }
399
400    async fn handle_read_property(
401        &self,
402        r: &mut Reader<'_>,
403        invoke_id: u8,
404        source: DataLinkAddress,
405    ) {
406        // Decode: object_id [0], property_id [1], optional array_index [2].
407        let object_id = match crate::decode_ctx_object_id(r) {
408            Ok(v) => v,
409            Err(_) => return,
410        };
411        let property_id_raw = match crate::decode_ctx_unsigned(r) {
412            Ok(v) => v,
413            Err(_) => return,
414        };
415        let property_id = PropertyId::from_u32(property_id_raw);
416
417        // Optional array index.
418        let array_index = decode_optional_array_index(r);
419
420        match self
421            .handler
422            .read_property(object_id, property_id, array_index)
423        {
424            Ok(value) => {
425                let borrowed = client_value_to_borrowed(&value);
426                let mut buf = [0u8; 1400];
427                let mut w = Writer::new(&mut buf);
428                if Npdu::new(0).encode(&mut w).is_err() {
429                    return;
430                }
431                if (ComplexAckHeader {
432                    segmented: false,
433                    more_follows: false,
434                    invoke_id,
435                    sequence_number: None,
436                    proposed_window_size: None,
437                    service_choice: SERVICE_READ_PROPERTY,
438                })
439                .encode(&mut w)
440                .is_err()
441                {
442                    return;
443                }
444                if encode_ctx_unsigned(&mut w, 0, object_id.raw()).is_err() {
445                    return;
446                }
447                if encode_ctx_unsigned(&mut w, 1, property_id.to_u32()).is_err() {
448                    return;
449                }
450                if (Tag::Opening { tag_num: 3 }).encode(&mut w).is_err() {
451                    return;
452                }
453                if encode_application_data_value(&mut w, &borrowed).is_err() {
454                    return;
455                }
456                if (Tag::Closing { tag_num: 3 }).encode(&mut w).is_err() {
457                    return;
458                }
459                let _ = self.datalink.send(source, w.as_written()).await;
460            }
461            Err(err) => {
462                self.send_error(invoke_id, SERVICE_READ_PROPERTY, err, source)
463                    .await;
464            }
465        }
466    }
467
468    async fn handle_write_property(
469        &self,
470        r: &mut Reader<'_>,
471        invoke_id: u8,
472        source: DataLinkAddress,
473    ) {
474        // Decode: object_id [0], property_id [1], optional array_index [2], value [3], optional priority [4].
475        let object_id = match crate::decode_ctx_object_id(r) {
476            Ok(v) => v,
477            Err(_) => return,
478        };
479        let property_id_raw = match crate::decode_ctx_unsigned(r) {
480            Ok(v) => v,
481            Err(_) => return,
482        };
483        let property_id = PropertyId::from_u32(property_id_raw);
484
485        // Optional array index [2].
486        let next_tag = match Tag::decode(r) {
487            Ok(t) => t,
488            Err(_) => return,
489        };
490        let (array_index, value_start_tag) = match next_tag {
491            Tag::Context { tag_num: 2, len } => {
492                let idx = match decode_unsigned(r, len as usize) {
493                    Ok(v) => v,
494                    Err(_) => return,
495                };
496                let vt = match Tag::decode(r) {
497                    Ok(t) => t,
498                    Err(_) => return,
499                };
500                (Some(idx), vt)
501            }
502            other => (None, other),
503        };
504
505        if value_start_tag != (Tag::Opening { tag_num: 3 }) {
506            return;
507        }
508
509        let val = match rustbac_core::services::value_codec::decode_application_data_value(r) {
510            Ok(v) => v,
511            Err(_) => return,
512        };
513
514        match Tag::decode(r) {
515            Ok(Tag::Closing { tag_num: 3 }) => {}
516            _ => return,
517        }
518
519        // Optional priority [4].
520        let priority = if !r.is_empty() {
521            match Tag::decode(r) {
522                Ok(Tag::Context { tag_num: 4, len }) => match decode_unsigned(r, len as usize) {
523                    Ok(p) => Some(p as u8),
524                    Err(_) => return,
525                },
526                _ => None,
527            }
528        } else {
529            None
530        };
531
532        let client_val = crate::data_value_to_client(val);
533
534        match self
535            .handler
536            .write_property(object_id, property_id, array_index, client_val, priority)
537        {
538            Ok(()) => {
539                let mut buf = [0u8; 32];
540                let mut w = Writer::new(&mut buf);
541                if Npdu::new(0).encode(&mut w).is_err() {
542                    return;
543                }
544                if (SimpleAck {
545                    invoke_id,
546                    service_choice: SERVICE_WRITE_PROPERTY,
547                })
548                .encode(&mut w)
549                .is_err()
550                {
551                    return;
552                }
553                let _ = self.datalink.send(source, w.as_written()).await;
554            }
555            Err(err) => {
556                self.send_error(invoke_id, SERVICE_WRITE_PROPERTY, err, source)
557                    .await;
558            }
559        }
560    }
561
562    async fn handle_read_property_multiple(
563        &self,
564        r: &mut Reader<'_>,
565        invoke_id: u8,
566        source: DataLinkAddress,
567    ) {
568        type PropRefs = Vec<(PropertyId, Option<u32>)>;
569        // Collect all (object_id, [(property_id, array_index)]) specs from the request.
570        let mut specs: Vec<(ObjectId, PropRefs)> = Vec::new();
571
572        while !r.is_empty() {
573            // object-identifier [0]
574            let object_id = match crate::decode_ctx_object_id(r) {
575                Ok(v) => v,
576                Err(_) => return,
577            };
578
579            // list-of-property-references [1] opening tag
580            match Tag::decode(r) {
581                Ok(Tag::Opening { tag_num: 1 }) => {}
582                _ => return,
583            }
584
585            let mut props: Vec<(PropertyId, Option<u32>)> = Vec::new();
586            loop {
587                // Each property reference: property-identifier [0], optional array-index [1].
588                let tag = match Tag::decode(r) {
589                    Ok(t) => t,
590                    Err(_) => return,
591                };
592                if tag == (Tag::Closing { tag_num: 1 }) {
593                    break;
594                }
595                let property_id = match tag {
596                    Tag::Context { tag_num: 0, len } => match decode_unsigned(r, len as usize) {
597                        Ok(v) => PropertyId::from_u32(v),
598                        Err(_) => return,
599                    },
600                    _ => return,
601                };
602
603                // Optional array index [1].
604                let array_index = if !r.is_empty() {
605                    // peek next tag without consuming
606                    match peek_context_tag(r, 1) {
607                        Some(len) => {
608                            // consume the tag byte(s) we already peeked
609                            match Tag::decode(r) {
610                                Ok(_) => {}
611                                Err(_) => return,
612                            }
613                            match decode_unsigned(r, len as usize) {
614                                Ok(idx) => Some(idx),
615                                Err(_) => return,
616                            }
617                        }
618                        None => None,
619                    }
620                } else {
621                    None
622                };
623
624                props.push((property_id, array_index));
625            }
626
627            specs.push((object_id, props));
628        }
629
630        // Build response buffer.
631        let mut buf = [0u8; 1400];
632        let mut w = Writer::new(&mut buf);
633        if Npdu::new(0).encode(&mut w).is_err() {
634            return;
635        }
636        if (ComplexAckHeader {
637            segmented: false,
638            more_follows: false,
639            invoke_id,
640            sequence_number: None,
641            proposed_window_size: None,
642            service_choice: SERVICE_READ_PROPERTY_MULTIPLE,
643        })
644        .encode(&mut w)
645        .is_err()
646        {
647            return;
648        }
649
650        for (object_id, props) in &specs {
651            // object-identifier [0]
652            if encode_ctx_unsigned(&mut w, 0, object_id.raw()).is_err() {
653                return;
654            }
655            // list-of-results [1] opening
656            if (Tag::Opening { tag_num: 1 }).encode(&mut w).is_err() {
657                return;
658            }
659
660            for (property_id, array_index) in props {
661                // property-identifier [2]
662                if encode_ctx_unsigned(&mut w, 2, property_id.to_u32()).is_err() {
663                    return;
664                }
665                // optional array-index [3]
666                if let Some(idx) = array_index {
667                    if encode_ctx_unsigned(&mut w, 3, *idx).is_err() {
668                        return;
669                    }
670                }
671
672                // property-access-result [4] opening
673                if (Tag::Opening { tag_num: 4 }).encode(&mut w).is_err() {
674                    return;
675                }
676
677                match self
678                    .handler
679                    .read_property(*object_id, *property_id, *array_index)
680                {
681                    Ok(value) => {
682                        let borrowed = client_value_to_borrowed(&value);
683                        if encode_application_data_value(&mut w, &borrowed).is_err() {
684                            return;
685                        }
686                    }
687                    Err(err) => {
688                        // Encode property-access-error [5] with errorClass [0] / errorCode [1].
689                        let (class, code) = err.to_error_class_code();
690                        if (Tag::Opening { tag_num: 5 }).encode(&mut w).is_err() {
691                            return;
692                        }
693                        if encode_ctx_unsigned(&mut w, 0, class as u32).is_err() {
694                            return;
695                        }
696                        if encode_ctx_unsigned(&mut w, 1, code as u32).is_err() {
697                            return;
698                        }
699                        if (Tag::Closing { tag_num: 5 }).encode(&mut w).is_err() {
700                            return;
701                        }
702                    }
703                }
704
705                // property-access-result [4] closing
706                if (Tag::Closing { tag_num: 4 }).encode(&mut w).is_err() {
707                    return;
708                }
709            }
710
711            // list-of-results [1] closing
712            if (Tag::Closing { tag_num: 1 }).encode(&mut w).is_err() {
713                return;
714            }
715        }
716
717        let _ = self.datalink.send(source, w.as_written()).await;
718    }
719
720    async fn handle_write_property_multiple(
721        &self,
722        r: &mut Reader<'_>,
723        invoke_id: u8,
724        source: DataLinkAddress,
725    ) {
726        // Parse write-access-specifications and call write_property for each property.
727        while !r.is_empty() {
728            let object_id = match crate::decode_ctx_object_id(r) {
729                Ok(v) => v,
730                Err(_) => return,
731            };
732            // Opening tag [1] — list of properties
733            match Tag::decode(r) {
734                Ok(Tag::Opening { tag_num: 1 }) => {}
735                _ => return,
736            }
737            loop {
738                // Check for closing tag [1]
739                let tag = match Tag::decode(r) {
740                    Ok(t) => t,
741                    Err(_) => return,
742                };
743                if tag == (Tag::Closing { tag_num: 1 }) {
744                    break;
745                }
746                // property-identifier [0]
747                let property_id = match tag {
748                    Tag::Context { tag_num: 0, len } => match decode_unsigned(r, len as usize) {
749                        Ok(v) => PropertyId::from_u32(v),
750                        Err(_) => return,
751                    },
752                    _ => return,
753                };
754                // optional array-index [1]
755                let array_index = if !r.is_empty() {
756                    match peek_context_tag(r, 1) {
757                        Some(len) => {
758                            let _ = Tag::decode(r);
759                            decode_unsigned(r, len as usize).ok()
760                        }
761                        None => None,
762                    }
763                } else {
764                    None
765                };
766                // property-value [2] opening
767                match Tag::decode(r) {
768                    Ok(Tag::Opening { tag_num: 2 }) => {}
769                    _ => return,
770                }
771                let val =
772                    match rustbac_core::services::value_codec::decode_application_data_value(r) {
773                        Ok(v) => v,
774                        Err(_) => return,
775                    };
776                match Tag::decode(r) {
777                    Ok(Tag::Closing { tag_num: 2 }) => {}
778                    _ => return,
779                }
780                // optional priority [3]
781                let priority = if !r.is_empty() {
782                    match peek_context_tag(r, 3) {
783                        Some(len) => {
784                            let _ = Tag::decode(r);
785                            decode_unsigned(r, len as usize).ok().map(|p| p as u8)
786                        }
787                        None => None,
788                    }
789                } else {
790                    None
791                };
792                let client_val = crate::data_value_to_client(val);
793                if let Err(err) = self.handler.write_property(
794                    object_id,
795                    property_id,
796                    array_index,
797                    client_val,
798                    priority,
799                ) {
800                    self.send_error(invoke_id, SERVICE_WRITE_PROPERTY_MULTIPLE, err, source)
801                        .await;
802                    return;
803                }
804            }
805        }
806        // All properties written successfully — send SimpleAck.
807        let mut buf = [0u8; 32];
808        let mut w = Writer::new(&mut buf);
809        if Npdu::new(0).encode(&mut w).is_err() {
810            return;
811        }
812        if (SimpleAck {
813            invoke_id,
814            service_choice: SERVICE_WRITE_PROPERTY_MULTIPLE,
815        })
816        .encode(&mut w)
817        .is_err()
818        {
819            return;
820        }
821        let _ = self.datalink.send(source, w.as_written()).await;
822    }
823
824    async fn handle_subscribe_cov(
825        &self,
826        r: &mut Reader<'_>,
827        invoke_id: u8,
828        source: DataLinkAddress,
829    ) {
830        // subscriberProcessIdentifier [0]
831        let subscriber_process_id = match Tag::decode(r) {
832            Ok(Tag::Context { tag_num: 0, len }) => match decode_unsigned(r, len as usize) {
833                Ok(v) => v,
834                Err(_) => return,
835            },
836            _ => return,
837        };
838        // monitoredObjectIdentifier [1]
839        let monitored_object_id = match crate::decode_ctx_object_id(r) {
840            Ok(v) => v,
841            Err(_) => return,
842        };
843        // issueConfirmedNotifications [2]
844        let issue_confirmed = match Tag::decode(r) {
845            Ok(Tag::Context { tag_num: 2, len }) => match decode_unsigned(r, len as usize) {
846                Ok(v) => v != 0,
847                Err(_) => return,
848            },
849            _ => return,
850        };
851        // optional lifetime [3]
852        let lifetime = if !r.is_empty() {
853            match peek_context_tag(r, 3) {
854                Some(len) => {
855                    let _ = Tag::decode(r);
856                    decode_unsigned(r, len as usize).ok()
857                }
858                None => None,
859            }
860        } else {
861            None
862        };
863
864        match self.handler.subscribe_cov(
865            subscriber_process_id,
866            monitored_object_id,
867            issue_confirmed,
868            lifetime,
869        ) {
870            Ok(()) => {
871                let mut buf = [0u8; 32];
872                let mut w = Writer::new(&mut buf);
873                if Npdu::new(0).encode(&mut w).is_err() {
874                    return;
875                }
876                if (SimpleAck {
877                    invoke_id,
878                    service_choice: SERVICE_SUBSCRIBE_COV,
879                })
880                .encode(&mut w)
881                .is_err()
882                {
883                    return;
884                }
885                let _ = self.datalink.send(source, w.as_written()).await;
886            }
887            Err(err) => {
888                self.send_error(invoke_id, SERVICE_SUBSCRIBE_COV, err, source)
889                    .await;
890            }
891        }
892    }
893
894    async fn handle_create_object(
895        &self,
896        r: &mut Reader<'_>,
897        invoke_id: u8,
898        source: DataLinkAddress,
899    ) {
900        // objectSpecifier [0] opening
901        match Tag::decode(r) {
902            Ok(Tag::Opening { tag_num: 0 }) => {}
903            _ => return,
904        }
905        // objectType [0] — context-tagged enumerated
906        let object_type_raw = match Tag::decode(r) {
907            Ok(Tag::Context { tag_num: 0, len }) => match decode_unsigned(r, len as usize) {
908                Ok(v) => v,
909                Err(_) => return,
910            },
911            _ => return,
912        };
913        let object_type = ObjectType::from_u16(object_type_raw as u16);
914        // objectSpecifier [0] closing
915        match Tag::decode(r) {
916            Ok(Tag::Closing { tag_num: 0 }) => {}
917            _ => return,
918        }
919
920        match self.handler.create_object(object_type) {
921            Ok(created_id) => {
922                let mut buf = [0u8; 64];
923                let mut w = Writer::new(&mut buf);
924                if Npdu::new(0).encode(&mut w).is_err() {
925                    return;
926                }
927                if (ComplexAckHeader {
928                    segmented: false,
929                    more_follows: false,
930                    invoke_id,
931                    sequence_number: None,
932                    proposed_window_size: None,
933                    service_choice: SERVICE_CREATE_OBJECT,
934                })
935                .encode(&mut w)
936                .is_err()
937                {
938                    return;
939                }
940                if encode_ctx_unsigned(&mut w, 0, created_id.raw()).is_err() {
941                    return;
942                }
943                let _ = self.datalink.send(source, w.as_written()).await;
944            }
945            Err(err) => {
946                self.send_error(invoke_id, SERVICE_CREATE_OBJECT, err, source)
947                    .await;
948            }
949        }
950    }
951
952    async fn handle_delete_object(
953        &self,
954        r: &mut Reader<'_>,
955        invoke_id: u8,
956        source: DataLinkAddress,
957    ) {
958        // objectIdentifier — application-tagged
959        let object_id = match crate::decode_ctx_object_id(r) {
960            Ok(v) => v,
961            Err(_) => return,
962        };
963
964        match self.handler.delete_object(object_id) {
965            Ok(()) => {
966                let mut buf = [0u8; 32];
967                let mut w = Writer::new(&mut buf);
968                if Npdu::new(0).encode(&mut w).is_err() {
969                    return;
970                }
971                if (SimpleAck {
972                    invoke_id,
973                    service_choice: SERVICE_DELETE_OBJECT,
974                })
975                .encode(&mut w)
976                .is_err()
977                {
978                    return;
979                }
980                let _ = self.datalink.send(source, w.as_written()).await;
981            }
982            Err(err) => {
983                self.send_error(invoke_id, SERVICE_DELETE_OBJECT, err, source)
984                    .await;
985            }
986        }
987    }
988
989    async fn send_error(
990        &self,
991        invoke_id: u8,
992        service_choice: u8,
993        err: BacnetServiceError,
994        target: DataLinkAddress,
995    ) {
996        let (class, code) = err.to_error_class_code();
997        let mut buf = [0u8; 64];
998        let mut w = Writer::new(&mut buf);
999        if Npdu::new(0).encode(&mut w).is_err() {
1000            return;
1001        }
1002        // Error PDU header: type=5 (Error)
1003        if w.write_u8(0x50).is_err() {
1004            return;
1005        }
1006        if w.write_u8(invoke_id).is_err() {
1007            return;
1008        }
1009        if w.write_u8(service_choice).is_err() {
1010            return;
1011        }
1012        // error-class: application Enumerated
1013        if (Tag::Application {
1014            tag: rustbac_core::encoding::tag::AppTag::Enumerated,
1015            len: 1,
1016        })
1017        .encode(&mut w)
1018        .is_err()
1019        {
1020            return;
1021        }
1022        if w.write_u8(class).is_err() {
1023            return;
1024        }
1025        // error-code: application Enumerated
1026        if (Tag::Application {
1027            tag: rustbac_core::encoding::tag::AppTag::Enumerated,
1028            len: 1,
1029        })
1030        .encode(&mut w)
1031        .is_err()
1032        {
1033            return;
1034        }
1035        if w.write_u8(code).is_err() {
1036            return;
1037        }
1038        let _ = self.datalink.send(target, w.as_written()).await;
1039    }
1040
1041    async fn send_reject(&self, invoke_id: u8, reason: u8, target: DataLinkAddress) {
1042        let mut buf = [0u8; 16];
1043        let mut w = Writer::new(&mut buf);
1044        if Npdu::new(0).encode(&mut w).is_err() {
1045            return;
1046        }
1047        // Reject PDU: type=6 (Reject)
1048        if w.write_u8(0x60).is_err() {
1049            return;
1050        }
1051        if w.write_u8(invoke_id).is_err() {
1052            return;
1053        }
1054        if w.write_u8(reason).is_err() {
1055            return;
1056        }
1057        let _ = self.datalink.send(target, w.as_written()).await;
1058    }
1059}
1060
1061// ─────────────────────────────────────────────────────────────────────────────
1062// Free-standing helpers
1063// ─────────────────────────────────────────────────────────────────────────────
1064
1065/// Decode Who-Is optional [0] low-limit and [1] high-limit.
1066fn decode_who_is_limits(r: &mut Reader<'_>) -> Option<(u32, u32)> {
1067    if r.is_empty() {
1068        return None;
1069    }
1070    let tag0 = Tag::decode(r).ok()?;
1071    let low = match tag0 {
1072        Tag::Context { tag_num: 0, len } => decode_unsigned(r, len as usize).ok()?,
1073        _ => return None,
1074    };
1075    let tag1 = Tag::decode(r).ok()?;
1076    let high = match tag1 {
1077        Tag::Context { tag_num: 1, len } => decode_unsigned(r, len as usize).ok()?,
1078        _ => return None,
1079    };
1080    Some((low, high))
1081}
1082
1083/// Return true when `device_id` falls within the Who-Is range (or it is a global Who-Is).
1084fn matches_who_is(device_id: u32, limits: Option<(u32, u32)>) -> bool {
1085    match limits {
1086        None => true,
1087        Some((low, high)) => device_id >= low && device_id <= high,
1088    }
1089}
1090
1091/// Decode an optional context-tagged array index from the current reader position.
1092///
1093/// This is a non-destructive peek: if the next tag is not a context tag with
1094/// tag_num == 2, `None` is returned and the reader is **not** advanced.
1095fn decode_optional_array_index(r: &mut Reader<'_>) -> Option<u32> {
1096    if r.is_empty() {
1097        return None;
1098    }
1099    // Peek the first byte to see if it could be context tag 2.
1100    let first = r.peek_u8().ok()?;
1101    // Context tag 2 with short form: upper nibble = (tag_num << 4) | 0x08 = 0x28, 0x29, 0x2A, 0x2B
1102    // The tag class bit (bit 3) = 1 for context tags; tag_num is bits 7-4.
1103    // For context tag 2: byte = (2 << 4) | 0x08 | len_byte where len_byte ∈ {1,2,3,4}
1104    // But we should decode properly and put it back if not matching.
1105    // Reader doesn't support un-consuming, so we use a clone.
1106    let tag = Tag::decode(r).ok()?;
1107    match tag {
1108        Tag::Context { tag_num: 2, len } => decode_unsigned(r, len as usize).ok(),
1109        _ => {
1110            // Not an array index tag — put it back by... we can't.
1111            // This function is only called when we know the array index is encoded
1112            // (i.e., right after property_id and before the closing tag in ReadProperty).
1113            // Since Reader has no unget, we rely on the caller to only call this
1114            // after property_id and only if the bytes represent an array index.
1115            // In practice the caller already consumed the property_id tag so any
1116            // remaining tag is either [2] array_index or end-of-frame.
1117            let _ = first; // silence unused warning
1118            None
1119        }
1120    }
1121}
1122
1123/// Peek whether the next tag in `r` is a context tag with the given `tag_num`.
1124/// Returns the `len` field if it matches, `None` otherwise.
1125/// Does NOT advance the reader.
1126fn peek_context_tag(r: &mut Reader<'_>, tag_num: u8) -> Option<u32> {
1127    let first = r.peek_u8().ok()?;
1128    // Short-form context tag: bit3=1 (context), bits 7-4 = tag_num, bits 2-0 = len (0-4).
1129    // Short form len encoding: 0-4 means length is that value (for context tags without extended len).
1130    // Byte layout: [tag_num(4) | class(1) | len(3)] where class bit set means context.
1131    // For context: byte = (tag_num << 4) | 0x08 | short_len  (short_len < 5)
1132    // Closing/Opening tags have short_len = 6 or 7.
1133    let is_context = (first & 0x08) != 0 && (first & 0x07) < 6;
1134    if !is_context {
1135        return None;
1136    }
1137    let this_tag_num = first >> 4;
1138    if this_tag_num != tag_num {
1139        return None;
1140    }
1141    let short_len = first & 0x07;
1142    if short_len < 5 {
1143        Some(short_len as u32)
1144    } else {
1145        // Extended length — not expected for small BACnet property indices, skip.
1146        None
1147    }
1148}
1149
1150/// Convert an owned [`ClientDataValue`] to a borrowed [`rustbac_core::types::DataValue`].
1151fn client_value_to_borrowed(val: &ClientDataValue) -> rustbac_core::types::DataValue<'_> {
1152    use rustbac_core::types::DataValue;
1153    match val {
1154        ClientDataValue::Null => DataValue::Null,
1155        ClientDataValue::Boolean(v) => DataValue::Boolean(*v),
1156        ClientDataValue::Unsigned(v) => DataValue::Unsigned(*v),
1157        ClientDataValue::Signed(v) => DataValue::Signed(*v),
1158        ClientDataValue::Real(v) => DataValue::Real(*v),
1159        ClientDataValue::Double(v) => DataValue::Double(*v),
1160        ClientDataValue::OctetString(v) => DataValue::OctetString(v),
1161        ClientDataValue::CharacterString(v) => DataValue::CharacterString(v),
1162        ClientDataValue::BitString { unused_bits, data } => {
1163            DataValue::BitString(rustbac_core::types::BitString {
1164                unused_bits: *unused_bits,
1165                data,
1166            })
1167        }
1168        ClientDataValue::Enumerated(v) => DataValue::Enumerated(*v),
1169        ClientDataValue::Date(v) => DataValue::Date(*v),
1170        ClientDataValue::Time(v) => DataValue::Time(*v),
1171        ClientDataValue::ObjectId(v) => DataValue::ObjectId(*v),
1172        ClientDataValue::Constructed { tag_num, values } => DataValue::Constructed {
1173            tag_num: *tag_num,
1174            values: values.iter().map(client_value_to_borrowed).collect(),
1175        },
1176    }
1177}
1178
1179// ─────────────────────────────────────────────────────────────────────────────
1180// Writer helper — expose write_u8 used above
1181// ─────────────────────────────────────────────────────────────────────────────
1182
1183/// Thin extension to allow calling `w.write_u8` inside this module.
1184#[allow(dead_code)]
1185trait WriterExt {
1186    fn write_u8(&mut self, b: u8) -> Result<(), rustbac_core::EncodeError>;
1187}
1188
1189impl WriterExt for Writer<'_> {
1190    fn write_u8(&mut self, b: u8) -> Result<(), rustbac_core::EncodeError> {
1191        Writer::write_u8(self, b)
1192    }
1193}
1194
1195// ─────────────────────────────────────────────────────────────────────────────
1196// COV Subscription Manager
1197// ─────────────────────────────────────────────────────────────────────────────
1198
1199/// Tracks active COV subscriptions and generates notifications on property changes.
1200pub struct CovSubscriptionManager {
1201    subscriptions: Mutex<Vec<CovSubscription>>,
1202}
1203
1204struct CovSubscription {
1205    subscriber_process_id: u32,
1206    monitored_object_id: ObjectId,
1207    subscriber_address: DataLinkAddress,
1208    issue_confirmed: bool,
1209    /// Absolute deadline (tokio::time::Instant). None = infinite lifetime.
1210    expires_at: Option<tokio::time::Instant>,
1211}
1212
1213impl CovSubscriptionManager {
1214    /// Create an empty subscription manager.
1215    pub fn new() -> Self {
1216        Self {
1217            subscriptions: Mutex::new(Vec::new()),
1218        }
1219    }
1220
1221    /// Add or renew a subscription. If a subscription with the same
1222    /// (process_id, object_id) already exists, it is renewed.
1223    pub fn subscribe(
1224        &self,
1225        subscriber_process_id: u32,
1226        monitored_object_id: ObjectId,
1227        subscriber_address: DataLinkAddress,
1228        issue_confirmed: bool,
1229        lifetime_seconds: Option<u32>,
1230    ) {
1231        let mut subs = self
1232            .subscriptions
1233            .lock()
1234            .expect("CovSubscriptionManager lock");
1235        // Remove existing subscription with same key
1236        subs.retain(|s| {
1237            !(s.subscriber_process_id == subscriber_process_id
1238                && s.monitored_object_id == monitored_object_id)
1239        });
1240        let expires_at = lifetime_seconds
1241            .map(|secs| tokio::time::Instant::now() + std::time::Duration::from_secs(secs as u64));
1242        subs.push(CovSubscription {
1243            subscriber_process_id,
1244            monitored_object_id,
1245            subscriber_address,
1246            issue_confirmed,
1247            expires_at,
1248        });
1249    }
1250
1251    /// Cancel a subscription identified by (process_id, object_id).
1252    pub fn cancel(&self, subscriber_process_id: u32, monitored_object_id: ObjectId) {
1253        let mut subs = self
1254            .subscriptions
1255            .lock()
1256            .expect("CovSubscriptionManager lock");
1257        subs.retain(|s| {
1258            !(s.subscriber_process_id == subscriber_process_id
1259                && s.monitored_object_id == monitored_object_id)
1260        });
1261    }
1262
1263    /// Remove expired subscriptions.
1264    pub fn purge_expired(&self) {
1265        let now = tokio::time::Instant::now();
1266        let mut subs = self
1267            .subscriptions
1268            .lock()
1269            .expect("CovSubscriptionManager lock");
1270        subs.retain(|s| s.expires_at.map_or(true, |exp| exp > now));
1271    }
1272
1273    /// Get all active subscribers for a given object.
1274    /// Returns (subscriber_address, subscriber_process_id, issue_confirmed).
1275    pub fn subscribers_for(&self, object_id: ObjectId) -> Vec<(DataLinkAddress, u32, bool)> {
1276        let now = tokio::time::Instant::now();
1277        let subs = self
1278            .subscriptions
1279            .lock()
1280            .expect("CovSubscriptionManager lock");
1281        subs.iter()
1282            .filter(|s| {
1283                s.monitored_object_id == object_id && s.expires_at.map_or(true, |exp| exp > now)
1284            })
1285            .map(|s| {
1286                (
1287                    s.subscriber_address,
1288                    s.subscriber_process_id,
1289                    s.issue_confirmed,
1290                )
1291            })
1292            .collect()
1293    }
1294
1295    /// Return the count of active (non-expired) subscriptions.
1296    pub fn active_count(&self) -> usize {
1297        let now = tokio::time::Instant::now();
1298        let subs = self
1299            .subscriptions
1300            .lock()
1301            .expect("CovSubscriptionManager lock");
1302        subs.iter()
1303            .filter(|s| s.expires_at.map_or(true, |exp| exp > now))
1304            .count()
1305    }
1306}
1307
1308impl Default for CovSubscriptionManager {
1309    fn default() -> Self {
1310        Self::new()
1311    }
1312}
1313
1314/// Encode an UnconfirmedCOVNotification PDU.
1315///
1316/// Returns the encoded bytes or `None` if the buffer is too small.
1317pub fn encode_unconfirmed_cov_notification(
1318    subscriber_process_id: u32,
1319    initiating_device_id: ObjectId,
1320    monitored_object_id: ObjectId,
1321    time_remaining: u32,
1322    values: &[(PropertyId, ClientDataValue)],
1323) -> Option<Vec<u8>> {
1324    let mut buf = [0u8; 1400];
1325    let mut w = Writer::new(&mut buf);
1326    Npdu::new(0).encode(&mut w).ok()?;
1327    // UnconfirmedRequest header: type=1, service=2 (UnconfirmedCOVNotification)
1328    UnconfirmedRequestHeader {
1329        service_choice: 0x02,
1330    }
1331    .encode(&mut w)
1332    .ok()?;
1333    // [0] subscriber-process-identifier
1334    encode_ctx_unsigned(&mut w, 0, subscriber_process_id).ok()?;
1335    // [1] initiating-device-identifier
1336    encode_ctx_unsigned(&mut w, 1, initiating_device_id.raw()).ok()?;
1337    // [2] monitored-object-identifier
1338    encode_ctx_unsigned(&mut w, 2, monitored_object_id.raw()).ok()?;
1339    // [3] time-remaining
1340    encode_ctx_unsigned(&mut w, 3, time_remaining).ok()?;
1341    // [4] list-of-values
1342    Tag::Opening { tag_num: 4 }.encode(&mut w).ok()?;
1343    for (prop_id, value) in values {
1344        // property-identifier [0]
1345        encode_ctx_unsigned(&mut w, 0, prop_id.to_u32()).ok()?;
1346        // property-value [2] (opening)
1347        Tag::Opening { tag_num: 2 }.encode(&mut w).ok()?;
1348        let borrowed = client_value_to_borrowed(value);
1349        encode_application_data_value(&mut w, &borrowed).ok()?;
1350        Tag::Closing { tag_num: 2 }.encode(&mut w).ok()?;
1351    }
1352    Tag::Closing { tag_num: 4 }.encode(&mut w).ok()?;
1353    Some(w.as_written().to_vec())
1354}
1355
1356#[cfg(test)]
1357mod tests {
1358    use super::*;
1359    use rustbac_core::apdu::{ComplexAckHeader, SimpleAck};
1360    use rustbac_core::encoding::{reader::Reader, writer::Writer};
1361    use rustbac_core::npdu::Npdu;
1362    use rustbac_core::services::read_property::SERVICE_READ_PROPERTY;
1363    use rustbac_core::services::write_property::SERVICE_WRITE_PROPERTY;
1364    use rustbac_core::types::{ObjectId, ObjectType, PropertyId};
1365    use rustbac_datalink::DataLinkAddress;
1366    use std::sync::{Arc, Mutex};
1367
1368    #[derive(Clone, Default)]
1369    struct MockDataLink {
1370        sent: Arc<Mutex<Vec<(DataLinkAddress, Vec<u8>)>>>,
1371    }
1372
1373    impl rustbac_datalink::DataLink for MockDataLink {
1374        async fn send(
1375            &self,
1376            address: DataLinkAddress,
1377            payload: &[u8],
1378        ) -> Result<(), rustbac_datalink::DataLinkError> {
1379            self.sent
1380                .lock()
1381                .expect("poisoned")
1382                .push((address, payload.to_vec()));
1383            Ok(())
1384        }
1385
1386        async fn recv(
1387            &self,
1388            _buf: &mut [u8],
1389        ) -> Result<(usize, DataLinkAddress), rustbac_datalink::DataLinkError> {
1390            Err(rustbac_datalink::DataLinkError::InvalidFrame)
1391        }
1392    }
1393
1394    fn make_server() -> (
1395        BacnetServer<MockDataLink>,
1396        Arc<Mutex<Vec<(DataLinkAddress, Vec<u8>)>>>,
1397        Arc<ObjectStore>,
1398    ) {
1399        let store = Arc::new(ObjectStore::new());
1400        let device_id = ObjectId::new(ObjectType::Device, 42);
1401        store.set(
1402            device_id,
1403            PropertyId::ObjectName,
1404            ClientDataValue::CharacterString("TestDevice".to_string()),
1405        );
1406        let handler = ObjectStoreHandler::new(store.clone());
1407        let dl = MockDataLink::default();
1408        let sent = dl.sent.clone();
1409        let server = BacnetServer::new(dl, 42, handler);
1410        (server, sent, store)
1411    }
1412
1413    fn source() -> DataLinkAddress {
1414        DataLinkAddress::Ip("127.0.0.1:47808".parse().unwrap())
1415    }
1416
1417    #[tokio::test]
1418    async fn object_store_set_get_remove() {
1419        let store = ObjectStore::new();
1420        let oid = ObjectId::new(ObjectType::AnalogValue, 1);
1421        store.set(oid, PropertyId::PresentValue, ClientDataValue::Real(3.14));
1422        assert_eq!(
1423            store.get(oid, PropertyId::PresentValue),
1424            Some(ClientDataValue::Real(3.14))
1425        );
1426        store.remove_object(oid);
1427        assert_eq!(store.get(oid, PropertyId::PresentValue), None);
1428    }
1429
1430    #[tokio::test]
1431    async fn object_store_object_ids() {
1432        let store = ObjectStore::new();
1433        let oid1 = ObjectId::new(ObjectType::AnalogValue, 1);
1434        let oid2 = ObjectId::new(ObjectType::AnalogValue, 2);
1435        store.set(oid1, PropertyId::PresentValue, ClientDataValue::Real(1.0));
1436        store.set(oid2, PropertyId::PresentValue, ClientDataValue::Real(2.0));
1437        let mut ids = store.object_ids();
1438        ids.sort_by_key(|id| id.raw());
1439        assert!(ids.contains(&oid1));
1440        assert!(ids.contains(&oid2));
1441    }
1442
1443    #[tokio::test]
1444    async fn read_property_known_returns_complex_ack() {
1445        let (server, sent, _store) = make_server();
1446        let device_id = ObjectId::new(ObjectType::Device, 42);
1447
1448        // Build a ReadProperty request frame.
1449        use rustbac_core::encoding::primitives::encode_ctx_unsigned;
1450        let mut req_buf = [0u8; 256];
1451        let mut w = Writer::new(&mut req_buf);
1452        Npdu::new(0).encode(&mut w).unwrap();
1453        rustbac_core::apdu::ConfirmedRequestHeader {
1454            segmented: false,
1455            more_follows: false,
1456            segmented_response_accepted: true,
1457            max_segments: 0,
1458            max_apdu: 5,
1459            invoke_id: 7,
1460            sequence_number: None,
1461            proposed_window_size: None,
1462            service_choice: SERVICE_READ_PROPERTY,
1463        }
1464        .encode(&mut w)
1465        .unwrap();
1466        encode_ctx_unsigned(&mut w, 0, device_id.raw()).unwrap();
1467        encode_ctx_unsigned(&mut w, 1, PropertyId::ObjectName.to_u32()).unwrap();
1468
1469        server.handle_frame(w.as_written(), source()).await.unwrap();
1470
1471        let sent = sent.lock().expect("poisoned");
1472        assert_eq!(sent.len(), 1);
1473        let mut r = Reader::new(&sent[0].1);
1474        let _npdu = Npdu::decode(&mut r).unwrap();
1475        let hdr = ComplexAckHeader::decode(&mut r).unwrap();
1476        assert_eq!(hdr.invoke_id, 7);
1477        assert_eq!(hdr.service_choice, SERVICE_READ_PROPERTY);
1478    }
1479
1480    #[tokio::test]
1481    async fn read_property_unknown_object_returns_error() {
1482        let (server, sent, _store) = make_server();
1483        let unknown = ObjectId::new(ObjectType::AnalogValue, 999);
1484
1485        use rustbac_core::encoding::primitives::encode_ctx_unsigned;
1486        let mut req_buf = [0u8; 256];
1487        let mut w = Writer::new(&mut req_buf);
1488        Npdu::new(0).encode(&mut w).unwrap();
1489        rustbac_core::apdu::ConfirmedRequestHeader {
1490            segmented: false,
1491            more_follows: false,
1492            segmented_response_accepted: true,
1493            max_segments: 0,
1494            max_apdu: 5,
1495            invoke_id: 3,
1496            sequence_number: None,
1497            proposed_window_size: None,
1498            service_choice: SERVICE_READ_PROPERTY,
1499        }
1500        .encode(&mut w)
1501        .unwrap();
1502        encode_ctx_unsigned(&mut w, 0, unknown.raw()).unwrap();
1503        encode_ctx_unsigned(&mut w, 1, PropertyId::PresentValue.to_u32()).unwrap();
1504
1505        server.handle_frame(w.as_written(), source()).await.unwrap();
1506
1507        let sent = sent.lock().expect("poisoned");
1508        assert_eq!(sent.len(), 1);
1509        // First byte after NPDU should be 0x50 (Error PDU).
1510        let mut r = Reader::new(&sent[0].1);
1511        let _npdu = Npdu::decode(&mut r).unwrap();
1512        let type_byte = r.read_u8().unwrap();
1513        assert_eq!(type_byte >> 4, 5, "expected Error PDU type");
1514    }
1515
1516    #[tokio::test]
1517    async fn write_property_updates_store() {
1518        let (server, sent, store) = make_server();
1519        let device_id = ObjectId::new(ObjectType::Device, 42);
1520
1521        use rustbac_core::encoding::primitives::encode_ctx_unsigned;
1522        use rustbac_core::services::value_codec::encode_application_data_value;
1523        use rustbac_core::types::DataValue;
1524
1525        let mut req_buf = [0u8; 256];
1526        let mut w = Writer::new(&mut req_buf);
1527        Npdu::new(0).encode(&mut w).unwrap();
1528        rustbac_core::apdu::ConfirmedRequestHeader {
1529            segmented: false,
1530            more_follows: false,
1531            segmented_response_accepted: false,
1532            max_segments: 0,
1533            max_apdu: 5,
1534            invoke_id: 11,
1535            sequence_number: None,
1536            proposed_window_size: None,
1537            service_choice: SERVICE_WRITE_PROPERTY,
1538        }
1539        .encode(&mut w)
1540        .unwrap();
1541        encode_ctx_unsigned(&mut w, 0, device_id.raw()).unwrap();
1542        encode_ctx_unsigned(&mut w, 1, PropertyId::ObjectName.to_u32()).unwrap();
1543        Tag::Opening { tag_num: 3 }.encode(&mut w).unwrap();
1544        encode_application_data_value(&mut w, &DataValue::CharacterString("NewName")).unwrap();
1545        Tag::Closing { tag_num: 3 }.encode(&mut w).unwrap();
1546
1547        server.handle_frame(w.as_written(), source()).await.unwrap();
1548
1549        // Verify SimpleAck was sent.
1550        let sent_frames = sent.lock().expect("poisoned");
1551        assert_eq!(sent_frames.len(), 1);
1552        let mut r = Reader::new(&sent_frames[0].1);
1553        let _npdu = Npdu::decode(&mut r).unwrap();
1554        let ack = SimpleAck::decode(&mut r).unwrap();
1555        assert_eq!(ack.invoke_id, 11);
1556        assert_eq!(ack.service_choice, SERVICE_WRITE_PROPERTY);
1557        drop(sent_frames);
1558
1559        // Verify store updated.
1560        assert_eq!(
1561            store.get(device_id, PropertyId::ObjectName),
1562            Some(ClientDataValue::CharacterString("NewName".to_string()))
1563        );
1564    }
1565
1566    #[tokio::test]
1567    async fn who_is_sends_i_am() {
1568        let (server, sent, _store) = make_server();
1569
1570        let mut req_buf = [0u8; 32];
1571        let mut w = Writer::new(&mut req_buf);
1572        Npdu::new(0).encode(&mut w).unwrap();
1573        // UnconfirmedRequest Who-Is (0x08) with no limits.
1574        rustbac_core::apdu::UnconfirmedRequestHeader {
1575            service_choice: 0x08,
1576        }
1577        .encode(&mut w)
1578        .unwrap();
1579
1580        server.handle_frame(w.as_written(), source()).await.unwrap();
1581
1582        let sent = sent.lock().expect("poisoned");
1583        assert_eq!(sent.len(), 1);
1584        let mut r = Reader::new(&sent[0].1);
1585        let _npdu = Npdu::decode(&mut r).unwrap();
1586        let _unconf_hdr = rustbac_core::apdu::UnconfirmedRequestHeader::decode(&mut r).unwrap();
1587        let iam = rustbac_core::services::i_am::IAmRequest::decode_after_header(&mut r).unwrap();
1588        assert_eq!(iam.device_id.instance(), 42);
1589    }
1590
1591    #[tokio::test]
1592    async fn unknown_service_sends_reject() {
1593        let (server, sent, _store) = make_server();
1594
1595        let mut req_buf = [0u8; 32];
1596        let mut w = Writer::new(&mut req_buf);
1597        Npdu::new(0).encode(&mut w).unwrap();
1598        rustbac_core::apdu::ConfirmedRequestHeader {
1599            segmented: false,
1600            more_follows: false,
1601            segmented_response_accepted: false,
1602            max_segments: 0,
1603            max_apdu: 5,
1604            invoke_id: 99,
1605            sequence_number: None,
1606            proposed_window_size: None,
1607            service_choice: 0x55, // unknown
1608        }
1609        .encode(&mut w)
1610        .unwrap();
1611
1612        server.handle_frame(w.as_written(), source()).await.unwrap();
1613
1614        let sent = sent.lock().expect("poisoned");
1615        assert_eq!(sent.len(), 1);
1616        let mut r = Reader::new(&sent[0].1);
1617        let _npdu = Npdu::decode(&mut r).unwrap();
1618        let type_byte = r.read_u8().unwrap();
1619        assert_eq!(type_byte >> 4, 6, "expected Reject PDU type");
1620        let id = r.read_u8().unwrap();
1621        let reason = r.read_u8().unwrap();
1622        assert_eq!(id, 99);
1623        assert_eq!(reason, 0x08); // UNRECOGNIZED_SERVICE
1624    }
1625
1626    #[tokio::test]
1627    async fn cov_subscription_manager_subscribe_and_cancel() {
1628        let mgr = CovSubscriptionManager::new();
1629        let obj = ObjectId::new(ObjectType::AnalogValue, 1);
1630        let addr = source();
1631
1632        mgr.subscribe(1, obj, addr, false, Some(300));
1633        assert_eq!(mgr.active_count(), 1);
1634        assert_eq!(mgr.subscribers_for(obj).len(), 1);
1635
1636        mgr.cancel(1, obj);
1637        assert_eq!(mgr.active_count(), 0);
1638        assert_eq!(mgr.subscribers_for(obj).len(), 0);
1639    }
1640
1641    #[test]
1642    fn encode_unconfirmed_cov_notification_produces_bytes() {
1643        let device_id = ObjectId::new(ObjectType::Device, 42);
1644        let object_id = ObjectId::new(ObjectType::AnalogValue, 1);
1645        let values = vec![(PropertyId::PresentValue, ClientDataValue::Real(72.5))];
1646        let result = encode_unconfirmed_cov_notification(1, device_id, object_id, 300, &values);
1647        assert!(result.is_some());
1648        let bytes = result.unwrap();
1649        assert!(bytes.len() > 10);
1650    }
1651}