dsmr_parse/
telegram.rs

1use core::str;
2use std::io::Read;
3use std::str::FromStr;
4
5pub use error::Error;
6use log::{trace, warn};
7
8use crate::Tst;
9use crate::cosem_value::UnitValue;
10use crate::line_reader::LineReader;
11
12mod error;
13
14/// Valid telegram
15#[derive(Debug, PartialOrd, PartialEq, Clone, Default)]
16pub struct Telegram {
17	pub ident: String,
18	pub version: Option<String>,
19	pub electricity_date: Option<Tst>,
20	pub electricity_equipment_id: Option<String>,
21	pub electricity_consumed_tariff_1: Option<UnitValue<f64>>,
22	pub electricity_consumed_tariff_2: Option<UnitValue<f64>>,
23	pub electricity_generated_tariff_1: Option<UnitValue<f64>>,
24	pub electricity_generated_tariff_2: Option<UnitValue<f64>>,
25	pub current_tariff: Option<String>,
26	pub power: Option<UnitValue<f64>>,
27	pub return_power: Option<UnitValue<f64>>,
28	pub power_failure_count: Option<u32>,
29	pub long_power_failure_count: Option<u32>,
30	pub power_failure_log: Vec<PowerFailureEntry>,
31	pub voltage_sag_l1_count: Option<u32>,
32	pub voltage_sag_l2_count: Option<u32>,
33	pub voltage_sag_l3_count: Option<u32>,
34	pub voltage_swell_l1_count: Option<u32>,
35	pub voltage_swell_l2_count: Option<u32>,
36	pub voltage_swell_l3_count: Option<u32>,
37	pub message: Option<String>,
38	pub voltage_l1: Option<UnitValue<f64>>,
39	pub voltage_l2: Option<UnitValue<f64>>,
40	pub voltage_l3: Option<UnitValue<f64>>,
41	pub current_l1: Option<UnitValue<u16>>,
42	pub current_l2: Option<UnitValue<u16>>,
43	pub current_l3: Option<UnitValue<u16>>,
44	pub power_l1: Option<UnitValue<f64>>,
45	pub power_l2: Option<UnitValue<f64>>,
46	pub power_l3: Option<UnitValue<f64>>,
47	pub return_power_l1: Option<UnitValue<f64>>,
48	pub return_power_l2: Option<UnitValue<f64>>,
49	pub return_power_l3: Option<UnitValue<f64>>,
50	pub device_type: Option<String>,
51	pub gas_equipment_id: Option<String>,
52	pub gas_date: Option<Tst>,
53	pub gas_consumed: Option<UnitValue<f64>>,
54}
55
56impl Telegram {
57	pub fn read_from(src: impl Read) -> Result<Option<Self>, Error> {
58		enum ParserState {
59			WaitingForHeader,
60			ReadingHeader,
61			ReadingMessage,
62		}
63
64		const CRLF: &[u8] = b"\r\n";
65		let mut parser_state = ParserState::WaitingForHeader;
66		let mut out = Self::default();
67		let mut crc = crc16::State::<crc16::ARC>::new();
68		let mut expected_crc = None;
69		#[expect(clippy::unbuffered_bytes)]
70		let lines = LineReader::new(src.bytes());
71		for line in lines {
72			let mut line = line?;
73			trace!("Got line: {}", String::from_utf8_lossy(&line));
74			match parser_state {
75				ParserState::WaitingForHeader => {
76					if line.starts_with(b"/") {
77						crc.update(&line);
78						crc.update(CRLF);
79						line.remove(0);
80						out.ident = String::from_utf8(line)?;
81						parser_state = ParserState::ReadingHeader;
82					}
83				}
84				ParserState::ReadingHeader => {
85					if line.is_empty() {
86						crc.update(CRLF);
87						parser_state = ParserState::ReadingMessage;
88					} else {
89						parser_state = ParserState::WaitingForHeader;
90					}
91				}
92				ParserState::ReadingMessage => {
93					const CRC_PREFIX: &[u8] = b"!";
94					if let Some(crc_str) = line.strip_prefix(CRC_PREFIX) {
95						crc.update(CRC_PREFIX);
96						expected_crc = Some(u16::from_str_radix(str::from_utf8(crc_str)?, 16)?);
97						break;
98					} else {
99						crc.update(&line);
100						crc.update(CRLF);
101						if let Some(line) = ParsedLine::parse(&line) {
102							match line.obis.obis {
103								b"0.2.8" => out.version = Some(line.value_str().to_string()),
104								b"1.0.0" => out.electricity_date = Tst::try_from_bytes(line.value),
105								b"96.1.1" => out.electricity_equipment_id = parse_octet_string(line.value),
106								b"1.8.1" => out.electricity_consumed_tariff_1 = Some(line.value_str().parse()?),
107								b"1.8.2" => out.electricity_consumed_tariff_2 = Some(line.value_str().parse()?),
108								b"2.8.1" => out.electricity_generated_tariff_1 = Some(line.value_str().parse()?),
109								b"2.8.2" => out.electricity_generated_tariff_2 = Some(line.value_str().parse()?),
110								b"96.14.0" => out.current_tariff = parse_octet_string(line.value),
111								b"1.7.0" => out.power = Some(line.value_str().parse()?),
112								b"2.7.0" => out.return_power = Some(line.value_str().parse()?),
113								b"96.7.21" => out.power_failure_count = Some(line.value_str().parse()?),
114								b"96.7.9" => out.long_power_failure_count = Some(line.value_str().parse()?),
115								b"99.97.0" => {
116									out.power_failure_log = parse_buffer(line.value_str())
117										.into_iter()
118										.map(|(end_date, duration)| PowerFailureEntry { end_date, duration })
119										.collect()
120								}
121								b"32.32.0" => out.voltage_sag_l1_count = Some(line.value_str().parse()?),
122								b"52.32.0" => out.voltage_sag_l2_count = Some(line.value_str().parse()?),
123								b"72.32.0" => out.voltage_sag_l3_count = Some(line.value_str().parse()?),
124								b"32.36.0" => out.voltage_swell_l1_count = Some(line.value_str().parse()?),
125								b"52.36.0" => out.voltage_swell_l2_count = Some(line.value_str().parse()?),
126								b"72.36.0" => out.voltage_swell_l3_count = Some(line.value_str().parse()?),
127								b"96.13.0" => out.message = parse_octet_string(line.value),
128								b"32.7.0" => out.voltage_l1 = Some(line.value_str().parse()?),
129								b"52.7.0" => out.voltage_l2 = Some(line.value_str().parse()?),
130								b"72.7.0" => out.voltage_l3 = Some(line.value_str().parse()?),
131								b"31.7.0" => out.current_l1 = Some(line.value_str().parse()?),
132								b"51.7.0" => out.current_l2 = Some(line.value_str().parse()?),
133								b"71.7.0" => out.current_l3 = Some(line.value_str().parse()?),
134								b"21.7.0" => out.power_l1 = Some(line.value_str().parse()?),
135								b"41.7.0" => out.power_l2 = Some(line.value_str().parse()?),
136								b"61.7.0" => out.power_l3 = Some(line.value_str().parse()?),
137								b"22.7.0" => out.return_power_l1 = Some(line.value_str().parse()?),
138								b"42.7.0" => out.return_power_l2 = Some(line.value_str().parse()?),
139								b"62.7.0" => out.return_power_l3 = Some(line.value_str().parse()?),
140								b"24.1.0" => out.device_type = Some(line.value_str().to_string()),
141								b"96.1.0" => out.gas_equipment_id = parse_octet_string(line.value),
142								b"24.2.1" => {
143									let (gas_date, gas_consumed) = parse_mbus_value(line.value_str());
144									out.gas_date = Tst::try_from_bytes(gas_date.as_bytes());
145									if let Some(gas_consumed) = gas_consumed {
146										out.gas_consumed = Some(gas_consumed.parse()?);
147									}
148								}
149								_ => warn!(
150									"Unknown OBIS: {}-{}-{} with value: {}",
151									line.obis.medium,
152									line.obis.channel,
153									String::from_utf8_lossy(line.obis.obis),
154									String::from_utf8_lossy(line.value),
155								),
156							}
157						}
158					}
159				}
160			}
161		}
162
163		if let Some(expected_crc) = expected_crc {
164			let actual_crc = crc.get();
165			if actual_crc == expected_crc {
166				Ok(Some(out))
167			} else {
168				Err(Error::CrcMismatch(actual_crc, expected_crc))
169			}
170		} else {
171			Ok(None)
172		}
173	}
174}
175
176#[derive(Debug, PartialOrd, PartialEq, Clone)]
177pub struct PowerFailureEntry {
178	pub end_date: Tst,
179	pub duration: UnitValue<u64>,
180}
181
182fn parse_mbus_value(value: &str) -> (&str, Option<&str>) {
183	value
184		.split_once(")(")
185		.map_or((value, None), |(val1, val2)| (val1, Some(val2)))
186}
187
188fn parse_buffer(value: &str) -> Vec<(Tst, UnitValue<u64>)> {
189	let mut out = vec![];
190	let mut parts = value.split(")(");
191	'extract: {
192		let Some(count) = parts.next() else {
193			break 'extract;
194		};
195		let Ok(count) = u8::from_str(count) else {
196			break 'extract;
197		};
198		out.reserve(usize::from(count));
199		let Some(obis) = parts.next() else {
200			break 'extract;
201		};
202		let Some((obis, _)) = ParsedObis::parse(obis.as_bytes()) else {
203			break 'extract;
204		};
205		if obis.obis != b"96.7.19" {
206			break 'extract;
207		}
208		while let Some((date, value)) = parts.next().zip(parts.next()) {
209			let Some(date) = Tst::try_from_bytes(date.as_bytes()) else {
210				continue;
211			};
212			let Ok(value) = UnitValue::from_str(value) else {
213				continue;
214			};
215			out.push((date, value));
216		}
217	}
218	out
219}
220
221fn parse_octet_string(value: &[u8]) -> Option<String> {
222	let mut out = String::with_capacity(value.len() / 2);
223	for bytes in value.chunks(2) {
224		out.push(char::from(u8::from_str_radix(str::from_utf8(bytes).ok()?, 16).ok()?));
225	}
226	Some(out)
227}
228
229struct ParsedLine<'l> {
230	pub obis: ParsedObis<'l>,
231	pub value: &'l [u8],
232}
233
234impl ParsedLine<'_> {
235	fn parse(line: &[u8]) -> Option<ParsedLine> {
236		#[expect(clippy::enum_variant_names)]
237		enum State {
238			WaitingForValue,
239			ReadingValue,
240			WaitingForAnotherValue,
241		}
242
243		let (obis, line) = ParsedObis::parse(line)?;
244		let mut out = ParsedLine { obis, value: &[] };
245		let mut state = State::WaitingForValue;
246		let mut start_offset = 0;
247		for (offset, byte) in line.iter().enumerate() {
248			let c = char::from(*byte);
249			state = match state {
250				State::WaitingForValue => match c {
251					'(' => {
252						start_offset = offset + 1;
253						State::ReadingValue
254					}
255					_ => return None,
256				},
257				State::ReadingValue => match c {
258					')' => {
259						out.value = line.get(start_offset..offset)?;
260						State::WaitingForAnotherValue
261					}
262					_ => State::ReadingValue,
263				},
264				State::WaitingForAnotherValue => match c {
265					'(' => State::ReadingValue,
266					_ => return None,
267				},
268			};
269		}
270		if matches!(state, State::WaitingForAnotherValue) {
271			Some(out)
272		} else {
273			None
274		}
275	}
276
277	pub fn value_str(&self) -> &str {
278		str::from_utf8(self.value).unwrap_or("")
279	}
280}
281
282#[derive(Debug)]
283struct ParsedObis<'l> {
284	/// Group A: Specifies the medium of the object (0= Abstract Objects, 1=Electricity, 7=gas, Etc.)
285	pub medium: u8,
286	/// Group B: Specifies the channel (useful, for example, when a data concentrator is connected to several meters).
287	pub channel: u8,
288	/// Group C: Specifies the Physical Value (Current, Voltage, Energy, etc.).
289	/// Group D: Identifies types, or the result of the processing of physical quantities identified by values in value groups A and C, according to various specific algorithms.
290	/// Group E: Identifies further processing or classification of quantities identified by values in value groups A to D.
291	pub obis: &'l [u8],
292}
293
294impl ParsedObis<'_> {
295	fn parse(line: &[u8]) -> Option<(ParsedObis, &[u8])> {
296		enum State {
297			WaitingForObisMedium,
298			ReadingObisMedium,
299			WaitingForObisChannel,
300			ReadingObisChannel,
301			WaitingForObis,
302			ReadingObis,
303			Done,
304		}
305
306		let mut out = ParsedObis {
307			medium: 0,
308			channel: 0,
309			obis: &[],
310		};
311		let mut state = State::WaitingForObisMedium;
312		let mut start_offset = 0;
313		// todo: more careful parsing of obis sub-values
314		for (offset, byte) in line.iter().enumerate() {
315			let c = char::from(*byte);
316			state = match state {
317				State::WaitingForObisMedium => match c {
318					c if c.is_ascii_digit() => {
319						start_offset = offset;
320						State::ReadingObisMedium
321					}
322					_ => return None,
323				},
324				State::ReadingObisMedium => match c {
325					c if c.is_ascii_digit() => State::ReadingObisMedium,
326					'-' => {
327						out.medium = str::from_utf8(line.get(start_offset..offset)?).ok()?.parse().ok()?;
328						State::WaitingForObisChannel
329					}
330					_ => return None,
331				},
332				State::WaitingForObisChannel => match c {
333					c if c.is_ascii_digit() => {
334						start_offset = offset;
335						State::ReadingObisChannel
336					}
337					_ => return None,
338				},
339				State::ReadingObisChannel => match c {
340					c if c.is_ascii_digit() => State::ReadingObisChannel,
341					':' => {
342						out.channel = str::from_utf8(line.get(start_offset..offset)?).ok()?.parse().ok()?;
343						State::WaitingForObis
344					}
345					_ => return None,
346				},
347				State::WaitingForObis => match c {
348					c if c.is_ascii_digit() => {
349						start_offset = offset;
350						State::ReadingObis
351					}
352					_ => return None,
353				},
354				State::ReadingObis => match c {
355					c if c.is_ascii_digit() || c == '.' => State::ReadingObis,
356					'(' => {
357						out.obis = line.get(start_offset..offset)?;
358						start_offset = offset;
359						State::Done
360					}
361					_ => return None,
362				},
363				State::Done => break,
364			};
365		}
366		match state {
367			State::ReadingObis => {
368				out.obis = line.get(start_offset..)?;
369				Some((out, &[]))
370			}
371			State::Done => Some((out, line.get(start_offset..)?)),
372			_ => None,
373		}
374	}
375}