1use std::sync::Arc;
7
8use dashmap::DashMap;
9use parking_lot::RwLock;
10use tokio::sync::broadcast;
11
12use crate::address::GroupAddress;
13use crate::dpt::{BoxedDptCodec, DptCodec, DptId, DptRegistry, DptValue};
14use crate::error::{KnxError, KnxResult};
15
/// Communication flags attached to a group object.
///
/// Every flag is an independent boolean; [`Default`] yields a value with all
/// flags cleared. The type is `Copy` and comparable so flag sets can be
/// checked with `==` / `assert_eq!`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct GroupObjectFlags {
    /// Master switch: `GroupObject::can_read` and `GroupObject::can_write`
    /// both require it.
    pub communication: bool,
    /// Permits reading the object's value.
    pub read: bool,
    /// Permits writing the object's value.
    pub write: bool,
    /// Not consulted by the code visible in this file.
    // NOTE(review): presumably mirrors the KNX "T" flag — confirm.
    pub transmit: bool,
    /// Not consulted by the code visible in this file.
    // NOTE(review): presumably mirrors the KNX "U" flag — confirm.
    pub update: bool,
}

impl GroupObjectFlags {
    /// Returns a flag set with all five flags enabled.
    pub fn read_write() -> Self {
        Self {
            communication: true,
            read: true,
            write: true,
            transmit: true,
            update: true,
        }
    }

    /// Returns the [`read_write`](Self::read_write) preset with `write` and
    /// `update` cleared.
    pub fn read_only() -> Self {
        Self {
            write: false,
            update: false,
            ..Self::read_write()
        }
    }

    /// Returns the [`read_write`](Self::read_write) preset with `read` and
    /// `transmit` cleared.
    pub fn write_only() -> Self {
        Self {
            read: false,
            transmit: false,
            ..Self::read_write()
        }
    }

    /// Reports whether the `communication` flag is set.
    pub fn is_enabled(&self) -> bool {
        self.communication
    }
}
74
75pub struct GroupObject {
81 address: GroupAddress,
83 name: String,
85 description: String,
87 codec: Arc<BoxedDptCodec>,
89 flags: GroupObjectFlags,
91 value: RwLock<Vec<u8>>,
93 last_update: RwLock<std::time::Instant>,
95}
96
97impl std::fmt::Debug for GroupObject {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 f.debug_struct("GroupObject")
100 .field("address", &self.address)
101 .field("name", &self.name)
102 .field("description", &self.description)
103 .field("dpt_id", &self.codec.id())
104 .field("flags", &self.flags)
105 .field("value", &self.value.read().as_slice())
106 .finish()
107 }
108}
109
110impl GroupObject {
111 pub fn new(address: GroupAddress, name: impl Into<String>, codec: Arc<BoxedDptCodec>) -> Self {
113 let default_value = codec.default_value();
114 let encoded = codec.encode(&default_value).unwrap_or_default();
115
116 Self {
117 address,
118 name: name.into(),
119 description: String::new(),
120 codec,
121 flags: GroupObjectFlags::read_write(),
122 value: RwLock::new(encoded),
123 last_update: RwLock::new(std::time::Instant::now()),
124 }
125 }
126
127 pub fn with_description(mut self, description: impl Into<String>) -> Self {
129 self.description = description.into();
130 self
131 }
132
133 pub fn with_flags(mut self, flags: GroupObjectFlags) -> Self {
135 self.flags = flags;
136 self
137 }
138
139 pub fn with_value(self, value: &DptValue) -> KnxResult<Self> {
141 self.write_value(value)?;
142 Ok(self)
143 }
144
145 pub fn address(&self) -> GroupAddress {
147 self.address
148 }
149
150 pub fn name(&self) -> &str {
152 &self.name
153 }
154
155 pub fn description(&self) -> &str {
157 &self.description
158 }
159
160 pub fn dpt_id(&self) -> DptId {
162 self.codec.id()
163 }
164
165 pub fn flags(&self) -> GroupObjectFlags {
167 self.flags
168 }
169
170 pub fn read_raw(&self) -> Vec<u8> {
172 self.value.read().clone()
173 }
174
175 pub fn read_value(&self) -> KnxResult<DptValue> {
177 let raw = self.value.read();
178 self.codec.decode(&raw)
179 }
180
181 pub fn write_raw(&self, data: &[u8]) -> KnxResult<()> {
183 if !self.flags.write {
184 return Err(KnxError::GroupObjectWriteNotAllowed(self.name.clone()));
185 }
186
187 *self.value.write() = data.to_vec();
188 *self.last_update.write() = std::time::Instant::now();
189 Ok(())
190 }
191
192 pub fn write_value(&self, value: &DptValue) -> KnxResult<()> {
194 if !self.flags.write {
195 return Err(KnxError::GroupObjectWriteNotAllowed(self.name.clone()));
196 }
197
198 let encoded = self.codec.encode(value)?;
199 *self.value.write() = encoded;
200 *self.last_update.write() = std::time::Instant::now();
201 Ok(())
202 }
203
204 pub fn can_read(&self) -> bool {
206 self.flags.communication && self.flags.read
207 }
208
209 pub fn can_write(&self) -> bool {
211 self.flags.communication && self.flags.write
212 }
213
214 pub fn last_update(&self) -> std::time::Instant {
216 *self.last_update.read()
217 }
218
219 pub fn age(&self) -> std::time::Duration {
221 self.last_update.read().elapsed()
222 }
223}
224
225#[derive(Debug, Clone)]
231pub enum GroupEvent {
232 ValueWrite {
234 address: GroupAddress,
235 value: Vec<u8>,
236 source: Option<String>,
237 },
238 ReadRequest {
240 address: GroupAddress,
241 source: Option<String>,
242 },
243 ReadResponse {
245 address: GroupAddress,
246 value: Vec<u8>,
247 },
248 ObjectCreated { address: GroupAddress },
250 ObjectRemoved { address: GroupAddress },
252}
253
254impl GroupEvent {
255 pub fn address(&self) -> GroupAddress {
257 match self {
258 Self::ValueWrite { address, .. } => *address,
259 Self::ReadRequest { address, .. } => *address,
260 Self::ReadResponse { address, .. } => *address,
261 Self::ObjectCreated { address } => *address,
262 Self::ObjectRemoved { address } => *address,
263 }
264 }
265}
266
267pub struct GroupObjectTable {
273 objects: DashMap<GroupAddress, Arc<GroupObject>>,
275 event_tx: broadcast::Sender<GroupEvent>,
277 dpt_registry: Arc<DptRegistry>,
279}
280
281impl GroupObjectTable {
282 pub fn new() -> Self {
284 Self::with_registry(Arc::new(DptRegistry::new()))
285 }
286
287 pub fn with_registry(registry: Arc<DptRegistry>) -> Self {
289 let (tx, _) = broadcast::channel(1000);
290 Self {
291 objects: DashMap::new(),
292 event_tx: tx,
293 dpt_registry: registry,
294 }
295 }
296
297 pub fn subscribe(&self) -> broadcast::Receiver<GroupEvent> {
299 self.event_tx.subscribe()
300 }
301
302 pub fn add(&self, object: GroupObject) {
304 let address = object.address;
305 self.objects.insert(address, Arc::new(object));
306 let _ = self.event_tx.send(GroupEvent::ObjectCreated { address });
307 }
308
309 pub fn create(
311 &self,
312 address: GroupAddress,
313 name: impl Into<String>,
314 dpt_id: &DptId,
315 ) -> KnxResult<Arc<GroupObject>> {
316 let codec = self
317 .dpt_registry
318 .get(dpt_id)
319 .ok_or_else(|| KnxError::InvalidDpt(format!("Unknown DPT: {}", dpt_id)))?;
320
321 let object = Arc::new(GroupObject::new(address, name, codec));
322 self.objects.insert(address, object.clone());
323 let _ = self.event_tx.send(GroupEvent::ObjectCreated { address });
324
325 Ok(object)
326 }
327
328 pub fn remove(&self, address: &GroupAddress) -> Option<Arc<GroupObject>> {
330 let removed = self.objects.remove(address).map(|(_, v)| v);
331 if removed.is_some() {
332 let _ = self.event_tx.send(GroupEvent::ObjectRemoved { address: *address });
333 }
334 removed
335 }
336
337 pub fn get(&self, address: &GroupAddress) -> Option<Arc<GroupObject>> {
339 self.objects.get(address).map(|v| v.clone())
340 }
341
342 pub fn contains(&self, address: &GroupAddress) -> bool {
344 self.objects.contains_key(address)
345 }
346
347 pub fn addresses(&self) -> Vec<GroupAddress> {
349 self.objects.iter().map(|r| *r.key()).collect()
350 }
351
352 pub fn len(&self) -> usize {
354 self.objects.len()
355 }
356
357 pub fn is_empty(&self) -> bool {
359 self.objects.is_empty()
360 }
361
362 pub fn read(&self, address: &GroupAddress) -> KnxResult<Vec<u8>> {
364 let obj = self
365 .get(address)
366 .ok_or_else(|| KnxError::GroupObjectNotFound(address.to_string()))?;
367
368 if !obj.can_read() {
369 return Err(KnxError::GroupObjectReadNotAllowed(obj.name().to_string()));
370 }
371
372 Ok(obj.read_raw())
373 }
374
375 pub fn read_value(&self, address: &GroupAddress) -> KnxResult<DptValue> {
377 let obj = self
378 .get(address)
379 .ok_or_else(|| KnxError::GroupObjectNotFound(address.to_string()))?;
380
381 if !obj.can_read() {
382 return Err(KnxError::GroupObjectReadNotAllowed(obj.name().to_string()));
383 }
384
385 obj.read_value()
386 }
387
388 pub fn write(&self, address: &GroupAddress, data: &[u8], source: Option<String>) -> KnxResult<()> {
390 let obj = self
391 .get(address)
392 .ok_or_else(|| KnxError::GroupObjectNotFound(address.to_string()))?;
393
394 obj.write_raw(data)?;
395
396 let _ = self.event_tx.send(GroupEvent::ValueWrite {
397 address: *address,
398 value: data.to_vec(),
399 source,
400 });
401
402 Ok(())
403 }
404
405 pub fn write_value(
407 &self,
408 address: &GroupAddress,
409 value: &DptValue,
410 source: Option<String>,
411 ) -> KnxResult<()> {
412 let obj = self
413 .get(address)
414 .ok_or_else(|| KnxError::GroupObjectNotFound(address.to_string()))?;
415
416 obj.write_value(value)?;
417
418 let _ = self.event_tx.send(GroupEvent::ValueWrite {
419 address: *address,
420 value: obj.read_raw(),
421 source,
422 });
423
424 Ok(())
425 }
426
427 pub fn handle_read_request(
429 &self,
430 address: &GroupAddress,
431 source: Option<String>,
432 ) -> KnxResult<Vec<u8>> {
433 let _ = self.event_tx.send(GroupEvent::ReadRequest {
434 address: *address,
435 source,
436 });
437
438 let value = self.read(address)?;
439
440 let _ = self.event_tx.send(GroupEvent::ReadResponse {
441 address: *address,
442 value: value.clone(),
443 });
444
445 Ok(value)
446 }
447
448 pub fn iter(&self) -> impl Iterator<Item = Arc<GroupObject>> + '_ {
450 self.objects.iter().map(|r| r.value().clone())
451 }
452
453 pub fn filter<F>(&self, predicate: F) -> Vec<Arc<GroupObject>>
455 where
456 F: Fn(&GroupObject) -> bool,
457 {
458 self.objects
459 .iter()
460 .filter(|r| predicate(r.value()))
461 .map(|r| r.value().clone())
462 .collect()
463 }
464
465 pub fn range(&self, start: GroupAddress, end: GroupAddress) -> Vec<Arc<GroupObject>> {
467 self.objects
468 .iter()
469 .filter(|r| {
470 let addr = r.key().raw();
471 addr >= start.raw() && addr <= end.raw()
472 })
473 .map(|r| r.value().clone())
474 .collect()
475 }
476}
477
478impl Default for GroupObjectTable {
479 fn default() -> Self {
480 Self::new()
481 }
482}
483
484pub struct GroupObjectBuilder {
490 address: GroupAddress,
491 name: String,
492 description: String,
493 dpt_id: DptId,
494 flags: GroupObjectFlags,
495 initial_value: Option<DptValue>,
496}
497
498impl GroupObjectBuilder {
499 pub fn new(address: GroupAddress, dpt_id: DptId) -> Self {
501 Self {
502 address,
503 name: format!("Group {}", address),
504 description: String::new(),
505 dpt_id,
506 flags: GroupObjectFlags::read_write(),
507 initial_value: None,
508 }
509 }
510
511 pub fn name(mut self, name: impl Into<String>) -> Self {
513 self.name = name.into();
514 self
515 }
516
517 pub fn description(mut self, description: impl Into<String>) -> Self {
519 self.description = description.into();
520 self
521 }
522
523 pub fn flags(mut self, flags: GroupObjectFlags) -> Self {
525 self.flags = flags;
526 self
527 }
528
529 pub fn read_only(mut self) -> Self {
531 self.flags = GroupObjectFlags::read_only();
532 self
533 }
534
535 pub fn write_only(mut self) -> Self {
537 self.flags = GroupObjectFlags::write_only();
538 self
539 }
540
541 pub fn initial_value(mut self, value: DptValue) -> Self {
543 self.initial_value = Some(value);
544 self
545 }
546
547 pub fn build(self, registry: &DptRegistry) -> KnxResult<GroupObject> {
549 let codec = registry
550 .get(&self.dpt_id)
551 .ok_or_else(|| KnxError::InvalidDpt(format!("Unknown DPT: {}", self.dpt_id)))?;
552
553 let mut obj = GroupObject::new(self.address, self.name, codec)
554 .with_description(self.description)
555 .with_flags(self.flags);
556
557 if let Some(value) = self.initial_value {
558 obj = obj.with_value(&value)?;
559 }
560
561 Ok(obj)
562 }
563}
564
#[cfg(test)]
mod tests {
    use super::*;
    use crate::dpt::DptId;

    /// A value written through the codec reads back (within float tolerance).
    #[test]
    fn test_group_object_read_write() {
        let registry = DptRegistry::new();
        let codec = registry.get(&DptId::new(9, 1)).unwrap();

        let obj = GroupObject::new(GroupAddress::three_level(1, 2, 3), "Temperature", codec);

        obj.write_value(&DptValue::F16(25.5)).unwrap();

        let value = obj.read_value().unwrap();
        if let DptValue::F16(v) = value {
            assert!((v - 25.5).abs() < 0.1);
        } else {
            panic!("Expected F16");
        }
    }

    /// Read-only flags allow reads and reject writes.
    #[test]
    fn test_group_object_flags() {
        let registry = DptRegistry::new();
        let codec = registry.get(&DptId::new(1, 1)).unwrap();

        let obj = GroupObject::new(GroupAddress::three_level(1, 0, 1), "Switch", codec)
            .with_flags(GroupObjectFlags::read_only());

        assert!(obj.can_read());
        assert!(!obj.can_write());

        let result = obj.write_value(&DptValue::Bool(true));
        assert!(result.is_err());
    }

    /// Objects created in the table can be counted, found, written and read.
    #[test]
    fn test_group_object_table() {
        let table = GroupObjectTable::new();

        let addr1 = GroupAddress::three_level(1, 0, 1);
        let addr2 = GroupAddress::three_level(1, 0, 2);

        table.create(addr1, "Light 1", &DptId::new(1, 1)).unwrap();
        table.create(addr2, "Light 2", &DptId::new(1, 1)).unwrap();

        assert_eq!(table.len(), 2);
        assert!(table.contains(&addr1));

        table.write(&addr1, &[1], None).unwrap();
        let value = table.read(&addr1).unwrap();
        assert_eq!(value, vec![1]);
    }

    /// The builder applies name, flags and initial value.
    #[test]
    fn test_group_object_builder() {
        let registry = DptRegistry::new();

        let obj = GroupObjectBuilder::new(GroupAddress::three_level(2, 0, 1), DptId::new(9, 1))
            .name("Room Temperature")
            .description("Living room temperature sensor")
            .read_only()
            .build(&registry)
            .unwrap();

        assert_eq!(obj.name(), "Room Temperature");
        assert!(obj.can_read());
        assert!(!obj.can_write());

        let obj2 = GroupObjectBuilder::new(GroupAddress::three_level(2, 0, 2), DptId::new(9, 1))
            .name("Setpoint")
            .initial_value(DptValue::F16(22.0))
            .build(&registry)
            .unwrap();

        assert_eq!(obj2.name(), "Setpoint");
        assert!(obj2.can_read());
        assert!(obj2.can_write());
    }

    /// Creating an object delivers an `ObjectCreated` event to subscribers.
    #[test]
    fn test_group_event_subscription() {
        let table = GroupObjectTable::new();
        let mut rx = table.subscribe();

        let addr = GroupAddress::three_level(1, 0, 1);
        table.create(addr, "Test", &DptId::new(1, 1)).unwrap();

        let event = rx.try_recv().unwrap();
        assert!(matches!(event, GroupEvent::ObjectCreated { .. }));
    }
}