dsmr_parse/
telegram.rs

1use core::str;
2use std::io::Read;
3use std::str::FromStr;
4
5pub use error::Error;
6use log::{trace, warn};
7
8use crate::Tst;
9use crate::line_reader::LineReader;
10use crate::unit_value::UnitValue;
11
12mod error;
13
14/// Valid telegram
15#[derive(Debug, PartialOrd, PartialEq, Clone, Default)]
16pub struct Telegram {
17	pub ident: String,
18	pub version: Option<String>,
19	pub electricity_date: Option<Tst>,
20	pub electricity_equipment_id: Option<String>,
21	pub electricity_consumed_tariff_1: Option<UnitValue<f64>>,
22	pub electricity_consumed_tariff_2: Option<UnitValue<f64>>,
23	pub electricity_generated_tariff_1: Option<UnitValue<f64>>,
24	pub electricity_generated_tariff_2: Option<UnitValue<f64>>,
25	pub current_tariff: Option<String>,
26	pub power: Option<UnitValue<f64>>,
27	pub return_power: Option<UnitValue<f64>>,
28	pub power_failure_count: Option<u32>,
29	pub long_power_failure_count: Option<u32>,
30	pub power_failure_log: Vec<PowerFailureEntry>,
31	pub voltage_sag_l1_count: Option<u32>,
32	pub voltage_sag_l2_count: Option<u32>,
33	pub voltage_sag_l3_count: Option<u32>,
34	pub voltage_swell_l1_count: Option<u32>,
35	pub voltage_swell_l2_count: Option<u32>,
36	pub voltage_swell_l3_count: Option<u32>,
37	pub message: Option<String>,
38	pub voltage_l1: Option<UnitValue<f64>>,
39	pub voltage_l2: Option<UnitValue<f64>>,
40	pub voltage_l3: Option<UnitValue<f64>>,
41	pub current_l1: Option<UnitValue<u16>>,
42	pub current_l2: Option<UnitValue<u16>>,
43	pub current_l3: Option<UnitValue<u16>>,
44	pub power_l1: Option<UnitValue<f64>>,
45	pub power_l2: Option<UnitValue<f64>>,
46	pub power_l3: Option<UnitValue<f64>>,
47	pub return_power_l1: Option<UnitValue<f64>>,
48	pub return_power_l2: Option<UnitValue<f64>>,
49	pub return_power_l3: Option<UnitValue<f64>>,
50	pub device_type: Option<String>,
51	pub gas_equipment_id: Option<String>,
52	pub gas_date: Option<Tst>,
53	pub gas_consumed: Option<UnitValue<f64>>,
54}
55
56impl Telegram {
57	/// Try to read a single telegram from a [Read] source
58	///
59	/// See [crate-level](crate) documentation for more details.
60	pub fn read_from(src: impl Read) -> Result<Option<Self>, Error> {
61		enum ParserState {
62			WaitingForHeader,
63			ReadingHeader,
64			ReadingMessage,
65		}
66
67		const CRLF: &[u8] = b"\r\n";
68		let mut parser_state = ParserState::WaitingForHeader;
69		let mut out = Self::default();
70		let mut crc = crc16::State::<crc16::ARC>::new();
71		let mut expected_crc = None;
72		#[expect(clippy::unbuffered_bytes)]
73		let lines = LineReader::new(src.bytes());
74		for line in lines {
75			let mut line = line?;
76			trace!("Got line: {}", String::from_utf8_lossy(&line));
77			match parser_state {
78				ParserState::WaitingForHeader => {
79					if line.starts_with(b"/") {
80						crc.update(&line);
81						crc.update(CRLF);
82						line.remove(0);
83						out.ident = String::from_utf8(line)?;
84						parser_state = ParserState::ReadingHeader;
85					}
86				}
87				ParserState::ReadingHeader => {
88					if line.is_empty() {
89						crc.update(CRLF);
90						parser_state = ParserState::ReadingMessage;
91					} else {
92						parser_state = ParserState::WaitingForHeader;
93					}
94				}
95				ParserState::ReadingMessage => {
96					const CRC_PREFIX: &[u8] = b"!";
97					if let Some(crc_str) = line.strip_prefix(CRC_PREFIX) {
98						crc.update(CRC_PREFIX);
99						expected_crc = Some(u16::from_str_radix(str::from_utf8(crc_str)?, 16)?);
100						break;
101					} else {
102						crc.update(&line);
103						crc.update(CRLF);
104						if let Some(line) = ParsedLine::parse(&line) {
105							match line.obis.obis {
106								b"0.2.8" => out.version = Some(line.value_str().to_string()),
107								b"1.0.0" => out.electricity_date = Tst::try_from_bytes(line.value),
108								b"96.1.1" => out.electricity_equipment_id = parse_octet_string(line.value),
109								b"1.8.1" => out.electricity_consumed_tariff_1 = Some(line.value_str().parse()?),
110								b"1.8.2" => out.electricity_consumed_tariff_2 = Some(line.value_str().parse()?),
111								b"2.8.1" => out.electricity_generated_tariff_1 = Some(line.value_str().parse()?),
112								b"2.8.2" => out.electricity_generated_tariff_2 = Some(line.value_str().parse()?),
113								b"96.14.0" => out.current_tariff = parse_octet_string(line.value),
114								b"1.7.0" => out.power = Some(line.value_str().parse()?),
115								b"2.7.0" => out.return_power = Some(line.value_str().parse()?),
116								b"96.7.21" => out.power_failure_count = Some(line.value_str().parse()?),
117								b"96.7.9" => out.long_power_failure_count = Some(line.value_str().parse()?),
118								b"99.97.0" => {
119									out.power_failure_log = parse_buffer(line.value_str())
120										.into_iter()
121										.map(|(end_date, duration)| PowerFailureEntry { end_date, duration })
122										.collect()
123								}
124								b"32.32.0" => out.voltage_sag_l1_count = Some(line.value_str().parse()?),
125								b"52.32.0" => out.voltage_sag_l2_count = Some(line.value_str().parse()?),
126								b"72.32.0" => out.voltage_sag_l3_count = Some(line.value_str().parse()?),
127								b"32.36.0" => out.voltage_swell_l1_count = Some(line.value_str().parse()?),
128								b"52.36.0" => out.voltage_swell_l2_count = Some(line.value_str().parse()?),
129								b"72.36.0" => out.voltage_swell_l3_count = Some(line.value_str().parse()?),
130								b"96.13.0" => out.message = parse_octet_string(line.value),
131								b"32.7.0" => out.voltage_l1 = Some(line.value_str().parse()?),
132								b"52.7.0" => out.voltage_l2 = Some(line.value_str().parse()?),
133								b"72.7.0" => out.voltage_l3 = Some(line.value_str().parse()?),
134								b"31.7.0" => out.current_l1 = Some(line.value_str().parse()?),
135								b"51.7.0" => out.current_l2 = Some(line.value_str().parse()?),
136								b"71.7.0" => out.current_l3 = Some(line.value_str().parse()?),
137								b"21.7.0" => out.power_l1 = Some(line.value_str().parse()?),
138								b"41.7.0" => out.power_l2 = Some(line.value_str().parse()?),
139								b"61.7.0" => out.power_l3 = Some(line.value_str().parse()?),
140								b"22.7.0" => out.return_power_l1 = Some(line.value_str().parse()?),
141								b"42.7.0" => out.return_power_l2 = Some(line.value_str().parse()?),
142								b"62.7.0" => out.return_power_l3 = Some(line.value_str().parse()?),
143								b"24.1.0" => out.device_type = Some(line.value_str().to_string()),
144								b"96.1.0" => out.gas_equipment_id = parse_octet_string(line.value),
145								b"24.2.1" => {
146									let (gas_date, gas_consumed) = parse_mbus_value(line.value_str());
147									out.gas_date = Tst::try_from_bytes(gas_date.as_bytes());
148									if let Some(gas_consumed) = gas_consumed {
149										out.gas_consumed = Some(gas_consumed.parse()?);
150									}
151								}
152								_ => warn!(
153									"Unknown OBIS: {}-{}-{} with value: {}",
154									line.obis.medium,
155									line.obis.channel,
156									String::from_utf8_lossy(line.obis.obis),
157									String::from_utf8_lossy(line.value),
158								),
159							}
160						}
161					}
162				}
163			}
164		}
165
166		if let Some(expected_crc) = expected_crc {
167			let actual_crc = crc.get();
168			if actual_crc == expected_crc {
169				Ok(Some(out))
170			} else {
171				Err(Error::CrcMismatch(actual_crc, expected_crc))
172			}
173		} else {
174			Ok(None)
175		}
176	}
177}
178
179#[derive(Debug, PartialOrd, PartialEq, Clone)]
180pub struct PowerFailureEntry {
181	pub end_date: Tst,
182	pub duration: UnitValue<u64>,
183}
184
/// Splits a two-part value at the first `)(` separator
///
/// Returns the text before the separator and, when a separator exists, the text after it. Without
/// a separator the whole input is returned as the first part.
fn parse_mbus_value(value: &str) -> (&str, Option<&str>) {
	match value.split_once(")(") {
		Some((first, second)) => (first, Some(second)),
		None => (value, None),
	}
}
190
191fn parse_buffer(value: &str) -> Vec<(Tst, UnitValue<u64>)> {
192	let mut out = vec![];
193	let mut parts = value.split(")(");
194	'extract: {
195		let Some(count) = parts.next() else {
196			break 'extract;
197		};
198		let Ok(count) = u8::from_str(count) else {
199			break 'extract;
200		};
201		out.reserve(usize::from(count));
202		let Some(obis) = parts.next() else {
203			break 'extract;
204		};
205		let Some((obis, _)) = ParsedObis::parse(obis.as_bytes()) else {
206			break 'extract;
207		};
208		if obis.obis != b"96.7.19" {
209			break 'extract;
210		}
211		while let Some((date, value)) = parts.next().zip(parts.next()) {
212			let Some(date) = Tst::try_from_bytes(date.as_bytes()) else {
213				continue;
214			};
215			let Ok(value) = UnitValue::from_str(value) else {
216				continue;
217			};
218			out.push((date, value));
219		}
220	}
221	out
222}
223
/// Decodes a hex-encoded octet string into text
///
/// Every pair of hexadecimal digits (upper or lower case) is one byte, and each byte becomes the
/// character with the same code point (`char::from(u8)`). An empty input decodes to an empty string.
///
/// Returns `None` when the input has an odd number of digits or contains anything that is not a
/// hexadecimal digit (including sign characters).
fn parse_octet_string(value: &[u8]) -> Option<String> {
	// Value of a single hexadecimal digit; signs and any other byte are rejected.
	fn hex_digit(byte: u8) -> Option<u8> {
		match byte {
			b'0'..=b'9' => Some(byte - b'0'),
			b'a'..=b'f' => Some(byte - b'a' + 10),
			b'A'..=b'F' => Some(byte - b'A' + 10),
			_ => None,
		}
	}

	let pairs = value.chunks_exact(2);
	// A dangling nibble means the string is truncated; do not invent a character for it.
	if !pairs.remainder().is_empty() {
		return None;
	}
	pairs
		.map(|pair| match *pair {
			[high, low] => Some(char::from((hex_digit(high)? << 4) | hex_digit(low)?)),
			_ => None,
		})
		.collect()
}
231
232struct ParsedLine<'l> {
233	pub obis: ParsedObis<'l>,
234	pub value: &'l [u8],
235}
236
237impl ParsedLine<'_> {
238	fn parse(line: &[u8]) -> Option<ParsedLine> {
239		#[expect(clippy::enum_variant_names)]
240		enum State {
241			WaitingForValue,
242			ReadingValue,
243			WaitingForAnotherValue,
244		}
245
246		let (obis, line) = ParsedObis::parse(line)?;
247		let mut out = ParsedLine { obis, value: &[] };
248		let mut state = State::WaitingForValue;
249		let mut start_offset = 0;
250		for (offset, byte) in line.iter().enumerate() {
251			let c = char::from(*byte);
252			state = match state {
253				State::WaitingForValue => match c {
254					'(' => {
255						start_offset = offset + 1;
256						State::ReadingValue
257					}
258					_ => return None,
259				},
260				State::ReadingValue => match c {
261					')' => {
262						out.value = line.get(start_offset..offset)?;
263						State::WaitingForAnotherValue
264					}
265					_ => State::ReadingValue,
266				},
267				State::WaitingForAnotherValue => match c {
268					'(' => State::ReadingValue,
269					_ => return None,
270				},
271			};
272		}
273		if matches!(state, State::WaitingForAnotherValue) {
274			Some(out)
275		} else {
276			None
277		}
278	}
279
280	pub fn value_str(&self) -> &str {
281		str::from_utf8(self.value).unwrap_or("")
282	}
283}
284
/// OBIS reference of the form `A-B:C.D.E`
#[derive(Debug)]
struct ParsedObis<'l> {
	/// Group A: the medium of the object (0 = abstract objects, 1 = electricity, 7 = gas, etc.)
	pub medium: u8,
	/// Group B: the channel (useful, for example, when a data concentrator is connected to several meters)
	pub channel: u8,
	/// Groups C, D and E, kept together as the raw bytes after the `:`.
	/// Group C: the physical value (current, voltage, energy, etc.).
	/// Group D: types, or the result of processing the physical quantities identified by groups A and C.
	/// Group E: further processing or classification of the quantities identified by groups A to D.
	pub obis: &'l [u8],
}

impl ParsedObis<'_> {
	/// Parses an OBIS reference from the start of `line`
	///
	/// Accepts `<digits>-<digits>:<digit><digits or dots>`, optionally followed by `(`. Returns the
	/// reference together with the rest of the line starting at that `(`, or an empty rest when the
	/// line ends right after the reference. Returns `None` for anything else, including medium or
	/// channel numbers that do not fit in a `u8`.
	fn parse(line: &[u8]) -> Option<(ParsedObis, &[u8])> {
		// Reads a non-empty run of digits that must be terminated by `separator`; yields the number
		// and the bytes following the separator.
		fn leading_number(input: &[u8], separator: u8) -> Option<(u8, &[u8])> {
			let digit_count = input.iter().take_while(|byte| byte.is_ascii_digit()).count();
			if digit_count == 0 || input.get(digit_count) != Some(&separator) {
				return None;
			}
			let (digits, tail) = input.split_at(digit_count);
			let number: u8 = str::from_utf8(digits).ok()?.parse().ok()?;
			Some((number, tail.get(1..)?))
		}

		let (medium, after_medium) = leading_number(line, b'-')?;
		let (channel, after_channel) = leading_number(after_medium, b':')?;

		// todo: more careful parsing of obis sub-values
		if !after_channel.first()?.is_ascii_digit() {
			return None;
		}
		let code_len = after_channel
			.iter()
			.take_while(|byte| byte.is_ascii_digit() || **byte == b'.')
			.count();
		let (code, rest) = after_channel.split_at(code_len);

		match rest.first() {
			None | Some(b'(') => Some((
				ParsedObis {
					medium,
					channel,
					obis: code,
				},
				rest,
			)),
			Some(_) => None,
		}
	}
}