1use std::collections::HashMap;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use chrono::Utc;
10use parking_lot::RwLock;
11use tokio::sync::broadcast;
12
13use mabi_core::{
14 Device, DeviceInfo, DeviceState, Protocol, Result as CoreResult, Value,
15 types::{DataPoint, DataPointDef, DataPointId, DataType, AccessMode},
16 device::DeviceStatistics,
17};
18
19use crate::address::{GroupAddress, IndividualAddress};
20use crate::config::KnxDeviceConfig;
21use crate::dpt::{DptId, DptRegistry, DptValue};
22use crate::error::KnxResult;
23use crate::group::GroupObjectTable;
24
25pub struct KnxDevice {
27 info: RwLock<DeviceInfo>,
28 config: KnxDeviceConfig,
29 group_objects: Arc<GroupObjectTable>,
30 dpt_registry: Arc<DptRegistry>,
31 point_definitions: RwLock<HashMap<String, DataPointDef>>,
32 statistics: RwLock<DeviceStatistics>,
33 change_tx: broadcast::Sender<DataPoint>,
34}
35
36impl KnxDevice {
37 pub fn new(config: KnxDeviceConfig) -> Self {
39 let (change_tx, _) = broadcast::channel(1000);
40
41 Self {
42 info: RwLock::new(
43 DeviceInfo::new(&config.id, &config.name, Protocol::KnxIp)
44 .with_description(&config.description),
45 ),
46 config,
47 group_objects: Arc::new(GroupObjectTable::new()),
48 dpt_registry: Arc::new(DptRegistry::new()),
49 point_definitions: RwLock::new(HashMap::new()),
50 statistics: RwLock::new(DeviceStatistics::default()),
51 change_tx,
52 }
53 }
54
55 pub fn with_dpt_registry(mut self, registry: Arc<DptRegistry>) -> Self {
57 self.dpt_registry = registry;
58 self.group_objects = Arc::new(GroupObjectTable::with_registry(self.dpt_registry.clone()));
59 self
60 }
61
62 pub fn with_group_objects(mut self, table: Arc<GroupObjectTable>) -> Self {
64 self.group_objects = table;
65 self
66 }
67
68 pub fn id(&self) -> String {
70 self.info.read().id.clone()
71 }
72
73 pub fn individual_address(&self) -> IndividualAddress {
75 self.config.individual_address
76 }
77
78 pub fn group_objects(&self) -> Arc<GroupObjectTable> {
80 self.group_objects.clone()
81 }
82
83 pub fn add_group_object(
85 &self,
86 address: GroupAddress,
87 name: impl Into<String>,
88 dpt_id: &DptId,
89 ) -> KnxResult<()> {
90 let name_str = name.into();
91 let obj = self.group_objects.create(address, &name_str, dpt_id)?;
92
93 let point_id = format!("{}_{}", address, name_str);
95 let data_type = dpt_to_data_type(dpt_id);
96 let def = DataPointDef::new(&point_id, &name_str, data_type)
97 .with_access(if obj.can_write() {
98 AccessMode::ReadWrite
99 } else {
100 AccessMode::ReadOnly
101 });
102
103 self.point_definitions.write().insert(point_id, def);
104 self.update_point_count();
105
106 Ok(())
107 }
108
109 pub fn read_group(&self, address: &GroupAddress) -> KnxResult<DptValue> {
111 self.group_objects.read_value(address)
112 }
113
114 pub fn write_group(&self, address: &GroupAddress, value: &DptValue) -> KnxResult<()> {
116 self.group_objects.write_value(address, value, Some(self.id()))
117 }
118
119 fn update_point_count(&self) {
120 self.info.write().point_count = self.point_definitions.read().len();
121 }
122
123 fn set_state(&self, state: DeviceState) {
124 self.info.write().state = state;
125 self.info.write().updated_at = Utc::now();
126 }
127
128 fn dpt_to_value(dpt_value: &DptValue) -> Value {
130 match dpt_value {
131 DptValue::Bool(v) => Value::Bool(*v),
132 DptValue::U8(v) => Value::U8(*v),
133 DptValue::I8(v) => Value::I8(*v),
134 DptValue::U16(v) => Value::U16(*v),
135 DptValue::I16(v) => Value::I16(*v),
136 DptValue::U32(v) => Value::U32(*v),
137 DptValue::I32(v) => Value::I32(*v),
138 DptValue::F16(v) => Value::F32(*v),
139 DptValue::F32(v) => Value::F32(*v),
140 DptValue::String(v) => Value::String(v.clone()),
141 DptValue::Raw(v) => Value::Bytes(v.clone()),
142 _ => Value::Null, }
144 }
145
146 fn value_to_dpt(value: &Value) -> DptValue {
148 match value {
149 Value::Bool(v) => DptValue::Bool(*v),
150 Value::I8(v) => DptValue::I8(*v),
151 Value::U8(v) => DptValue::U8(*v),
152 Value::I16(v) => DptValue::I16(*v),
153 Value::U16(v) => DptValue::U16(*v),
154 Value::I32(v) => DptValue::I32(*v),
155 Value::U32(v) => DptValue::U32(*v),
156 Value::I64(v) => DptValue::I32(*v as i32),
157 Value::U64(v) => DptValue::U32(*v as u32),
158 Value::F32(v) => DptValue::F32(*v),
159 Value::F64(v) => DptValue::F32(*v as f32),
160 Value::String(v) => DptValue::String(v.clone()),
161 Value::Bytes(v) => DptValue::Raw(v.clone()),
162 _ => DptValue::Raw(vec![]),
163 }
164 }
165}
166
167fn dpt_to_data_type(dpt_id: &DptId) -> DataType {
169 match dpt_id.main {
170 1 => DataType::Bool,
171 5 => DataType::UInt8,
172 6 => DataType::Int8,
173 7 => DataType::UInt16,
174 8 => DataType::Int16,
175 9 | 14 => DataType::Float32,
176 12 => DataType::UInt32,
177 13 => DataType::Int32,
178 16 => DataType::String,
179 _ => DataType::ByteString,
180 }
181}
182
183#[async_trait]
184impl Device for KnxDevice {
185 fn info(&self) -> &DeviceInfo {
186 unsafe {
191 let ptr = self.info.data_ptr();
192 &*ptr
193 }
194 }
195
196 async fn initialize(&mut self) -> CoreResult<()> {
197 self.set_state(DeviceState::Initializing);
198
199 for go_config in &self.config.group_objects {
201 let address: GroupAddress = go_config
202 .address
203 .parse()
204 .map_err(|e: crate::error::KnxError| mabi_core::Error::Protocol(e.to_string()))?;
205
206 let dpt_id: DptId = go_config
207 .dpt
208 .parse()
209 .map_err(|e: crate::error::KnxError| mabi_core::Error::Protocol(e.to_string()))?;
210
211 self.add_group_object(address, &go_config.name, &dpt_id)
212 .map_err(|e| mabi_core::Error::Protocol(e.to_string()))?;
213 }
214
215 self.set_state(DeviceState::Uninitialized);
216 Ok(())
217 }
218
219 async fn start(&mut self) -> CoreResult<()> {
220 self.set_state(DeviceState::Online);
221 Ok(())
222 }
223
224 async fn stop(&mut self) -> CoreResult<()> {
225 self.set_state(DeviceState::Offline);
226 Ok(())
227 }
228
229 async fn tick(&mut self) -> CoreResult<()> {
230 let start = std::time::Instant::now();
231
232 let duration = start.elapsed();
236 self.statistics
237 .write()
238 .record_tick(duration.as_micros() as u64);
239
240 Ok(())
241 }
242
243 fn point_definitions(&self) -> Vec<&DataPointDef> {
244 vec![]
247 }
248
249 fn point_definition(&self, _point_id: &str) -> Option<&DataPointDef> {
250 None
252 }
253
254 async fn read(&self, point_id: &str) -> CoreResult<DataPoint> {
255 self.statistics.write().record_read();
256
257 let address_str = point_id.split('_').next().unwrap_or(point_id);
260 let address: GroupAddress = address_str
261 .parse()
262 .map_err(|e: crate::error::KnxError| mabi_core::Error::Protocol(e.to_string()))?;
263
264 let dpt_value = self
265 .group_objects
266 .read_value(&address)
267 .map_err(|e| mabi_core::Error::Protocol(e.to_string()))?;
268
269 let value = Self::dpt_to_value(&dpt_value);
270 let device_id = self.id();
271
272 Ok(DataPoint::new(
273 DataPointId::new(&device_id, point_id),
274 value,
275 ))
276 }
277
278 async fn write(&mut self, point_id: &str, value: Value) -> CoreResult<()> {
279 self.statistics.write().record_write();
280
281 let address_str = point_id.split('_').next().unwrap_or(point_id);
282 let address: GroupAddress = address_str
283 .parse()
284 .map_err(|e: crate::error::KnxError| mabi_core::Error::Protocol(e.to_string()))?;
285
286 let dpt_value = Self::value_to_dpt(&value);
287
288 self.group_objects
289 .write_value(&address, &dpt_value, Some(self.id()))
290 .map_err(|e| mabi_core::Error::Protocol(e.to_string()))?;
291
292 let device_id = self.id();
294 let data_point = DataPoint::new(
295 DataPointId::new(&device_id, point_id),
296 value,
297 );
298 let _ = self.change_tx.send(data_point);
299
300 Ok(())
301 }
302
303 fn subscribe(&self) -> Option<broadcast::Receiver<DataPoint>> {
304 Some(self.change_tx.subscribe())
305 }
306
307 fn statistics(&self) -> DeviceStatistics {
308 self.statistics.read().clone()
309 }
310}
311
312pub struct KnxDeviceBuilder {
318 config: KnxDeviceConfig,
319 dpt_registry: Option<Arc<DptRegistry>>,
320 group_objects: Vec<(GroupAddress, String, DptId, Option<DptValue>)>,
321}
322
323impl KnxDeviceBuilder {
324 pub fn new(id: impl Into<String>, name: impl Into<String>) -> Self {
326 Self {
327 config: KnxDeviceConfig::new(id, name),
328 dpt_registry: None,
329 group_objects: Vec::new(),
330 }
331 }
332
333 pub fn individual_address(mut self, address: IndividualAddress) -> Self {
335 self.config.individual_address = address;
336 self
337 }
338
339 pub fn description(mut self, description: impl Into<String>) -> Self {
341 self.config.description = description.into();
342 self
343 }
344
345 pub fn dpt_registry(mut self, registry: Arc<DptRegistry>) -> Self {
347 self.dpt_registry = Some(registry);
348 self
349 }
350
351 pub fn group_object(
353 mut self,
354 address: GroupAddress,
355 name: impl Into<String>,
356 dpt_id: DptId,
357 ) -> Self {
358 self.group_objects.push((address, name.into(), dpt_id, None));
359 self
360 }
361
362 pub fn group_object_with_value(
364 mut self,
365 address: GroupAddress,
366 name: impl Into<String>,
367 dpt_id: DptId,
368 value: DptValue,
369 ) -> Self {
370 self.group_objects
371 .push((address, name.into(), dpt_id, Some(value)));
372 self
373 }
374
375 pub fn build(self) -> KnxResult<KnxDevice> {
377 let registry = self.dpt_registry.unwrap_or_else(|| Arc::new(DptRegistry::new()));
378 let table = Arc::new(GroupObjectTable::with_registry(registry.clone()));
379
380 for (address, name, dpt_id, value) in self.group_objects {
382 let obj = table.create(address, &name, &dpt_id)?;
383 if let Some(v) = value {
384 obj.write_value(&v)?;
385 }
386 }
387
388 let device = KnxDevice::new(self.config)
389 .with_dpt_registry(registry)
390 .with_group_objects(table);
391
392 Ok(device)
393 }
394}
395
396#[cfg(test)]
397mod tests {
398 use super::*;
399
400 #[tokio::test]
401 async fn test_knx_device_builder() {
402 let device = KnxDeviceBuilder::new("knx-1", "KNX Test Device")
403 .individual_address(IndividualAddress::new(1, 2, 3))
404 .group_object(
405 GroupAddress::three_level(1, 0, 1),
406 "Light",
407 DptId::new(1, 1),
408 )
409 .group_object_with_value(
410 GroupAddress::three_level(1, 0, 2),
411 "Temperature",
412 DptId::new(9, 1),
413 DptValue::F16(22.5),
414 )
415 .build()
416 .unwrap();
417
418 assert_eq!(device.individual_address().to_string(), "1.2.3");
419
420 let temp = device
421 .read_group(&GroupAddress::three_level(1, 0, 2))
422 .unwrap();
423 if let DptValue::F16(v) = temp {
424 assert!((v - 22.5).abs() < 0.1);
425 }
426 }
427
428 #[tokio::test]
429 async fn test_knx_device_read_write() {
430 let device = KnxDeviceBuilder::new("knx-test", "Test")
431 .group_object(
432 GroupAddress::three_level(1, 0, 1),
433 "Switch",
434 DptId::new(1, 1),
435 )
436 .build()
437 .unwrap();
438
439 device
441 .write_group(&GroupAddress::three_level(1, 0, 1), &DptValue::Bool(true))
442 .unwrap();
443
444 let value = device
446 .read_group(&GroupAddress::three_level(1, 0, 1))
447 .unwrap();
448 assert_eq!(value, DptValue::Bool(true));
449 }
450}