1use std::collections::HashMap;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use chrono::Utc;
10use parking_lot::RwLock;
11use tokio::sync::broadcast;
12
13use mabi_core::{
14 device::DeviceStatistics,
15 types::{AccessMode, DataPoint, DataPointDef, DataPointId, DataType},
16 Device, DeviceInfo, DeviceState, Protocol, Result as CoreResult, Value,
17};
18
19use crate::address::{GroupAddress, IndividualAddress};
20use crate::config::KnxDeviceConfig;
21use crate::dpt::{DptId, DptRegistry, DptValue};
22use crate::error::KnxResult;
23use crate::group::GroupObjectTable;
24
25pub struct KnxDevice {
27 info: RwLock<DeviceInfo>,
28 config: KnxDeviceConfig,
29 group_objects: Arc<GroupObjectTable>,
30 dpt_registry: Arc<DptRegistry>,
31 point_definitions: RwLock<HashMap<String, DataPointDef>>,
32 statistics: RwLock<DeviceStatistics>,
33 change_tx: broadcast::Sender<DataPoint>,
34}
35
36impl KnxDevice {
37 pub fn new(config: KnxDeviceConfig) -> Self {
39 let (change_tx, _) = broadcast::channel(1000);
40
41 Self {
42 info: RwLock::new(
43 DeviceInfo::new(&config.id, &config.name, Protocol::KnxIp)
44 .with_description(&config.description),
45 ),
46 config,
47 group_objects: Arc::new(GroupObjectTable::new()),
48 dpt_registry: Arc::new(DptRegistry::new()),
49 point_definitions: RwLock::new(HashMap::new()),
50 statistics: RwLock::new(DeviceStatistics::default()),
51 change_tx,
52 }
53 }
54
55 pub fn with_dpt_registry(mut self, registry: Arc<DptRegistry>) -> Self {
57 self.dpt_registry = registry;
58 self.group_objects = Arc::new(GroupObjectTable::with_registry(self.dpt_registry.clone()));
59 self
60 }
61
62 pub fn with_group_objects(mut self, table: Arc<GroupObjectTable>) -> Self {
64 self.group_objects = table;
65 self
66 }
67
68 pub fn id(&self) -> String {
70 self.info.read().id.clone()
71 }
72
73 pub fn individual_address(&self) -> IndividualAddress {
75 self.config.individual_address
76 }
77
78 pub fn group_objects(&self) -> Arc<GroupObjectTable> {
80 self.group_objects.clone()
81 }
82
83 pub fn add_group_object(
85 &self,
86 address: GroupAddress,
87 name: impl Into<String>,
88 dpt_id: &DptId,
89 ) -> KnxResult<()> {
90 let name_str = name.into();
91 let obj = self.group_objects.create(address, &name_str, dpt_id)?;
92
93 let point_id = format!("{}_{}", address, name_str);
95 let data_type = dpt_to_data_type(dpt_id);
96 let def =
97 DataPointDef::new(&point_id, &name_str, data_type).with_access(if obj.can_write() {
98 AccessMode::ReadWrite
99 } else {
100 AccessMode::ReadOnly
101 });
102
103 self.point_definitions.write().insert(point_id, def);
104 self.update_point_count();
105
106 Ok(())
107 }
108
109 pub fn read_group(&self, address: &GroupAddress) -> KnxResult<DptValue> {
111 self.group_objects.read_value(address)
112 }
113
114 pub fn write_group(&self, address: &GroupAddress, value: &DptValue) -> KnxResult<()> {
116 self.group_objects
117 .write_value(address, value, Some(self.id()))
118 }
119
120 fn update_point_count(&self) {
121 self.info.write().point_count = self.point_definitions.read().len();
122 }
123
124 fn set_state(&self, state: DeviceState) {
125 self.info.write().state = state;
126 self.info.write().updated_at = Utc::now();
127 }
128
129 fn dpt_to_value(dpt_value: &DptValue) -> Value {
131 match dpt_value {
132 DptValue::Bool(v) => Value::Bool(*v),
133 DptValue::U8(v) => Value::U8(*v),
134 DptValue::I8(v) => Value::I8(*v),
135 DptValue::U16(v) => Value::U16(*v),
136 DptValue::I16(v) => Value::I16(*v),
137 DptValue::U32(v) => Value::U32(*v),
138 DptValue::I32(v) => Value::I32(*v),
139 DptValue::F16(v) => Value::F32(*v),
140 DptValue::F32(v) => Value::F32(*v),
141 DptValue::String(v) => Value::String(v.clone()),
142 DptValue::Raw(v) => Value::Bytes(v.clone()),
143 _ => Value::Null, }
145 }
146
147 fn value_to_dpt(value: &Value) -> DptValue {
149 match value {
150 Value::Bool(v) => DptValue::Bool(*v),
151 Value::I8(v) => DptValue::I8(*v),
152 Value::U8(v) => DptValue::U8(*v),
153 Value::I16(v) => DptValue::I16(*v),
154 Value::U16(v) => DptValue::U16(*v),
155 Value::I32(v) => DptValue::I32(*v),
156 Value::U32(v) => DptValue::U32(*v),
157 Value::I64(v) => DptValue::I32(*v as i32),
158 Value::U64(v) => DptValue::U32(*v as u32),
159 Value::F32(v) => DptValue::F32(*v),
160 Value::F64(v) => DptValue::F32(*v as f32),
161 Value::String(v) => DptValue::String(v.clone()),
162 Value::Bytes(v) => DptValue::Raw(v.clone()),
163 _ => DptValue::Raw(vec![]),
164 }
165 }
166}
167
168fn dpt_to_data_type(dpt_id: &DptId) -> DataType {
170 match dpt_id.main {
171 1 => DataType::Bool,
172 5 => DataType::UInt8,
173 6 => DataType::Int8,
174 7 => DataType::UInt16,
175 8 => DataType::Int16,
176 9 | 14 => DataType::Float32,
177 12 => DataType::UInt32,
178 13 => DataType::Int32,
179 16 => DataType::String,
180 _ => DataType::ByteString,
181 }
182}
183
184#[async_trait]
185impl Device for KnxDevice {
186 fn info(&self) -> &DeviceInfo {
187 unsafe {
192 let ptr = self.info.data_ptr();
193 &*ptr
194 }
195 }
196
197 async fn initialize(&mut self) -> CoreResult<()> {
198 self.set_state(DeviceState::Initializing);
199
200 for go_config in &self.config.group_objects {
202 let address: GroupAddress = go_config
203 .address
204 .parse()
205 .map_err(|e: crate::error::KnxError| mabi_core::Error::Protocol(e.to_string()))?;
206
207 let dpt_id: DptId = go_config
208 .dpt
209 .parse()
210 .map_err(|e: crate::error::KnxError| mabi_core::Error::Protocol(e.to_string()))?;
211
212 self.add_group_object(address, &go_config.name, &dpt_id)
213 .map_err(|e| mabi_core::Error::Protocol(e.to_string()))?;
214 }
215
216 self.set_state(DeviceState::Uninitialized);
217 Ok(())
218 }
219
220 async fn start(&mut self) -> CoreResult<()> {
221 self.set_state(DeviceState::Online);
222 Ok(())
223 }
224
225 async fn stop(&mut self) -> CoreResult<()> {
226 self.set_state(DeviceState::Offline);
227 Ok(())
228 }
229
230 async fn tick(&mut self) -> CoreResult<()> {
231 let start = std::time::Instant::now();
232
233 let duration = start.elapsed();
237 self.statistics
238 .write()
239 .record_tick(duration.as_micros() as u64);
240
241 Ok(())
242 }
243
244 fn point_definitions(&self) -> Vec<&DataPointDef> {
245 vec![]
248 }
249
250 fn point_definition(&self, _point_id: &str) -> Option<&DataPointDef> {
251 None
253 }
254
255 async fn read(&self, point_id: &str) -> CoreResult<DataPoint> {
256 self.statistics.write().record_read();
257
258 let address_str = point_id.split('_').next().unwrap_or(point_id);
261 let address: GroupAddress = address_str
262 .parse()
263 .map_err(|e: crate::error::KnxError| mabi_core::Error::Protocol(e.to_string()))?;
264
265 let dpt_value = self
266 .group_objects
267 .read_value(&address)
268 .map_err(|e| mabi_core::Error::Protocol(e.to_string()))?;
269
270 let value = Self::dpt_to_value(&dpt_value);
271 let device_id = self.id();
272
273 Ok(DataPoint::new(
274 DataPointId::new(&device_id, point_id),
275 value,
276 ))
277 }
278
279 async fn write(&mut self, point_id: &str, value: Value) -> CoreResult<()> {
280 self.statistics.write().record_write();
281
282 let address_str = point_id.split('_').next().unwrap_or(point_id);
283 let address: GroupAddress = address_str
284 .parse()
285 .map_err(|e: crate::error::KnxError| mabi_core::Error::Protocol(e.to_string()))?;
286
287 let dpt_value = Self::value_to_dpt(&value);
288
289 self.group_objects
290 .write_value(&address, &dpt_value, Some(self.id()))
291 .map_err(|e| mabi_core::Error::Protocol(e.to_string()))?;
292
293 let device_id = self.id();
295 let data_point = DataPoint::new(DataPointId::new(&device_id, point_id), value);
296 let _ = self.change_tx.send(data_point);
297
298 Ok(())
299 }
300
301 fn subscribe(&self) -> Option<broadcast::Receiver<DataPoint>> {
302 Some(self.change_tx.subscribe())
303 }
304
305 fn statistics(&self) -> DeviceStatistics {
306 self.statistics.read().clone()
307 }
308}
309
310pub struct KnxDeviceBuilder {
316 config: KnxDeviceConfig,
317 dpt_registry: Option<Arc<DptRegistry>>,
318 group_objects: Vec<(GroupAddress, String, DptId, Option<DptValue>)>,
319}
320
321impl KnxDeviceBuilder {
322 pub fn new(id: impl Into<String>, name: impl Into<String>) -> Self {
324 Self {
325 config: KnxDeviceConfig::new(id, name),
326 dpt_registry: None,
327 group_objects: Vec::new(),
328 }
329 }
330
331 pub fn individual_address(mut self, address: IndividualAddress) -> Self {
333 self.config.individual_address = address;
334 self
335 }
336
337 pub fn description(mut self, description: impl Into<String>) -> Self {
339 self.config.description = description.into();
340 self
341 }
342
343 pub fn dpt_registry(mut self, registry: Arc<DptRegistry>) -> Self {
345 self.dpt_registry = Some(registry);
346 self
347 }
348
349 pub fn group_object(
351 mut self,
352 address: GroupAddress,
353 name: impl Into<String>,
354 dpt_id: DptId,
355 ) -> Self {
356 self.group_objects
357 .push((address, name.into(), dpt_id, None));
358 self
359 }
360
361 pub fn group_object_with_value(
363 mut self,
364 address: GroupAddress,
365 name: impl Into<String>,
366 dpt_id: DptId,
367 value: DptValue,
368 ) -> Self {
369 self.group_objects
370 .push((address, name.into(), dpt_id, Some(value)));
371 self
372 }
373
374 pub fn build(self) -> KnxResult<KnxDevice> {
376 let registry = self
377 .dpt_registry
378 .unwrap_or_else(|| Arc::new(DptRegistry::new()));
379 let table = Arc::new(GroupObjectTable::with_registry(registry.clone()));
380
381 for (address, name, dpt_id, value) in self.group_objects {
383 let obj = table.create(address, &name, &dpt_id)?;
384 if let Some(v) = value {
385 obj.write_value(&v)?;
386 }
387 }
388
389 let device = KnxDevice::new(self.config)
390 .with_dpt_registry(registry)
391 .with_group_objects(table);
392
393 Ok(device)
394 }
395}
396
397#[cfg(test)]
398mod tests {
399 use super::*;
400
401 #[tokio::test]
402 async fn test_knx_device_builder() {
403 let device = KnxDeviceBuilder::new("knx-1", "KNX Test Device")
404 .individual_address(IndividualAddress::new(1, 2, 3))
405 .group_object(
406 GroupAddress::three_level(1, 0, 1),
407 "Light",
408 DptId::new(1, 1),
409 )
410 .group_object_with_value(
411 GroupAddress::three_level(1, 0, 2),
412 "Temperature",
413 DptId::new(9, 1),
414 DptValue::F16(22.5),
415 )
416 .build()
417 .unwrap();
418
419 assert_eq!(device.individual_address().to_string(), "1.2.3");
420
421 let temp = device
422 .read_group(&GroupAddress::three_level(1, 0, 2))
423 .unwrap();
424 if let DptValue::F16(v) = temp {
425 assert!((v - 22.5).abs() < 0.1);
426 }
427 }
428
429 #[tokio::test]
430 async fn test_knx_device_read_write() {
431 let device = KnxDeviceBuilder::new("knx-test", "Test")
432 .group_object(
433 GroupAddress::three_level(1, 0, 1),
434 "Switch",
435 DptId::new(1, 1),
436 )
437 .build()
438 .unwrap();
439
440 device
442 .write_group(&GroupAddress::three_level(1, 0, 1), &DptValue::Bool(true))
443 .unwrap();
444
445 let value = device
447 .read_group(&GroupAddress::three_level(1, 0, 1))
448 .unwrap();
449 assert_eq!(value, DptValue::Bool(true));
450 }
451}