// Source: coapum_senml/normalize.rs

//! SenML normalization - converting packs to resolved form

use crate::{Result, SenMLError, SenMLPack, SenMLRecord, SenMLValue};
use serde::{Deserialize, Serialize};

6/// A normalized SenML pack where all base values have been resolved into individual records
7#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
8pub struct NormalizedPack {
9    /// All records in resolved form (no base values except bver)
10    pub records: Vec<NormalizedRecord>,
11    /// Version information (only base value preserved)
12    pub version: Option<i32>,
13}
14
15/// A fully resolved SenML record with all base values applied
16#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
17pub struct NormalizedRecord {
18    /// Full resolved name (base name + record name)
19    pub name: String,
20    /// Resolved unit (base unit or record unit)
21    pub unit: Option<String>,
22    /// Resolved numeric value (base value + record value)
23    pub value: Option<f64>,
24    /// String value (unchanged)
25    pub string_value: Option<String>,
26    /// Boolean value (unchanged)
27    pub bool_value: Option<bool>,
28    /// Data value (unchanged)  
29    pub data_value: Option<Vec<u8>>,
30    /// Resolved sum (base sum + record sum)
31    pub sum: Option<f64>,
32    /// Resolved timestamp (base time + record time)
33    pub time: Option<f64>,
34    /// Update time (unchanged)
35    pub update_time: Option<f64>,
36}
37
38impl NormalizedPack {
39    /// Create a normalized pack from a regular SenML pack
40    pub fn from_pack(pack: &SenMLPack) -> Self {
41        let mut records = Vec::new();
42
43        if pack.records.is_empty() {
44            return Self {
45                records,
46                version: None,
47            };
48        }
49
50        // Determine if we have base values by checking the first record
51        let first_record = &pack.records[0];
52        let has_base_name = first_record.n.as_ref().map_or(false, |n| n.ends_with('/'));
53        let has_only_base_values = !first_record.has_value() && first_record.s.is_none();
54        let is_base_record = has_base_name || has_only_base_values;
55
56        let (base_name, base_time, base_unit, base_value, base_sum) = if is_base_record {
57            // Extract base values from the base record
58            (
59                first_record.n.clone().unwrap_or_default(),
60                first_record.t.unwrap_or(0.0),
61                first_record.u.clone(),
62                first_record.v.unwrap_or(0.0),
63                first_record.s.unwrap_or(0.0),
64            )
65        } else {
66            // No base record - use empty base values
67            (String::new(), 0.0, None, 0.0, 0.0)
68        };
69
70        let start_index = if is_base_record { 1 } else { 0 };
71
72        // Process each record (skip first if it's a base record)
73        for record in &pack.records[start_index..] {
74            if let Ok(normalized) = Self::normalize_record(
75                record, &base_name, base_time, &base_unit, base_value, base_sum,
76            ) {
77                records.push(normalized);
78            }
79        }
80
81        // Handle the base record itself if it has values beyond just base values
82        // Don't include base records that only contain base values (like base_value=20.0)
83        if is_base_record && first_record.has_value() {
84            // Check if this is a pure base record (name ends with '/') with only base values
85            let is_pure_base = first_record.n.as_ref().map_or(false, |n| n.ends_with('/'));
86
87            // Only include if it's not a pure base record OR if it has sum values
88            if !is_pure_base || first_record.s.is_some() {
89                if let Ok(normalized) = Self::normalize_record(
90                    first_record,
91                    "",    // No base name for base record itself
92                    0.0,   // No base time
93                    &None, // No base unit
94                    0.0,   // No base value
95                    0.0,   // No base sum
96                ) {
97                    records.insert(0, normalized);
98                }
99            }
100        }
101
102        Self {
103            records,
104            version: None, // TODO: Extract from bver if present
105        }
106    }
107
108    /// Normalize a single record with given base values
109    fn normalize_record(
110        record: &SenMLRecord,
111        base_name: &str,
112        base_time: f64,
113        base_unit: &Option<String>,
114        base_value: f64,
115        base_sum: f64,
116    ) -> Result<NormalizedRecord> {
117        // Resolve name
118        let name = match &record.n {
119            Some(n) if !base_name.is_empty() => format!("{}{}", base_name, n),
120            Some(n) => n.clone(),
121            None if !base_name.is_empty() => base_name.to_string(),
122            None => return Err(SenMLError::normalization("Record must have a name")),
123        };
124
125        // Resolve unit (record unit takes precedence)
126        let unit = record.u.clone().or_else(|| base_unit.clone());
127
128        // Resolve numeric value (add base value if both present)
129        let value = match (record.v, base_value != 0.0) {
130            (Some(v), true) => Some(v + base_value),
131            (Some(v), false) => Some(v),
132            (None, _) => None,
133        };
134
135        // Resolve sum (add base sum if both present)
136        let sum = match (record.s, base_sum != 0.0) {
137            (Some(s), true) => Some(s + base_sum),
138            (Some(s), false) => Some(s),
139            (None, _) => None,
140        };
141
142        // Resolve time (add base time if record time is relative)
143        let time = match (record.t, base_time != 0.0) {
144            (Some(t), true) => Some(base_time + t),
145            (Some(t), false) => Some(t),
146            (None, true) => Some(base_time),
147            (None, false) => None,
148        };
149
150        // String, boolean, and data values are not affected by base values
151        let string_value = record.vs.clone();
152        let bool_value = record.vb;
153        let data_value = record.vd.as_ref().and_then(|vd| {
154            // Decode base64 to actual bytes - ignore errors for now
155            base64_decode(vd).ok()
156        });
157
158        Ok(NormalizedRecord {
159            name,
160            unit,
161            value,
162            string_value,
163            bool_value,
164            data_value,
165            sum,
166            time,
167            update_time: record.ut,
168        })
169    }
170
171    /// Convert back to a SenML pack (may not preserve original base structure)
172    pub fn to_pack(&self) -> SenMLPack {
173        let records: Vec<SenMLRecord> = self
174            .records
175            .iter()
176            .map(|nr| {
177                let mut record = SenMLRecord::default();
178                record.n = Some(nr.name.clone());
179                record.u = nr.unit.clone();
180                record.v = nr.value;
181                record.vs = nr.string_value.clone();
182                record.vb = nr.bool_value;
183                record.vd = nr.data_value.as_ref().map(|data| base64_encode(data));
184                record.s = nr.sum;
185                record.t = nr.time;
186                record.ut = nr.update_time;
187                record
188            })
189            .collect();
190
191        SenMLPack { records }
192    }
193
194    /// Get all records with a specific name pattern
195    pub fn records_matching(&self, pattern: &str) -> Vec<&NormalizedRecord> {
196        self.records
197            .iter()
198            .filter(|record| record.name.contains(pattern))
199            .collect()
200    }
201
202    /// Get all records within a time range
203    pub fn records_in_time_range(&self, start: f64, end: f64) -> Vec<&NormalizedRecord> {
204        self.records
205            .iter()
206            .filter(|record| {
207                if let Some(time) = record.time {
208                    time >= start && time <= end
209                } else {
210                    false
211                }
212            })
213            .collect()
214    }
215
216    /// Get the time range of this pack
217    pub fn time_range(&self) -> Option<(f64, f64)> {
218        let times: Vec<f64> = self.records.iter().filter_map(|r| r.time).collect();
219
220        if times.is_empty() {
221            None
222        } else {
223            let min_time = times.iter().fold(f64::INFINITY, |a, &b| a.min(b));
224            let max_time = times.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
225            Some((min_time, max_time))
226        }
227    }
228
229    /// Group records by name prefix
230    pub fn group_by_prefix(&self) -> std::collections::HashMap<String, Vec<&NormalizedRecord>> {
231        let mut groups = std::collections::HashMap::new();
232
233        for record in &self.records {
234            // Extract prefix (everything before the last '/')
235            let prefix = if let Some(pos) = record.name.rfind('/') {
236                record.name[..pos].to_string()
237            } else {
238                "".to_string()
239            };
240
241            groups.entry(prefix).or_insert_with(Vec::new).push(record);
242        }
243
244        groups
245    }
246
247    /// Validate the normalized pack
248    pub fn validate(&self) -> Result<()> {
249        for (i, record) in self.records.iter().enumerate() {
250            record.validate().map_err(|e| {
251                SenMLError::validation(format!("Invalid normalized record at index {}: {}", i, e))
252            })?;
253        }
254        Ok(())
255    }
256}
257
258impl NormalizedRecord {
259    /// Get the primary value from this record
260    pub fn primary_value(&self) -> Option<SenMLValue> {
261        if let Some(v) = self.value {
262            Some(SenMLValue::Number(v))
263        } else if let Some(ref vs) = self.string_value {
264            Some(SenMLValue::String(vs.clone()))
265        } else if let Some(vb) = self.bool_value {
266            Some(SenMLValue::Boolean(vb))
267        } else if let Some(ref vd) = self.data_value {
268            Some(SenMLValue::Data(vd.clone()))
269        } else {
270            None
271        }
272    }
273
274    /// Check if this record has any value
275    pub fn has_value(&self) -> bool {
276        self.value.is_some()
277            || self.string_value.is_some()
278            || self.bool_value.is_some()
279            || self.data_value.is_some()
280    }
281
282    /// Get the base name (everything up to last '/')
283    pub fn base_name(&self) -> Option<&str> {
284        self.name.rfind('/').map(|pos| &self.name[..pos + 1])
285    }
286
287    /// Get the local name (everything after last '/')
288    pub fn local_name(&self) -> &str {
289        if let Some(pos) = self.name.rfind('/') {
290            &self.name[pos + 1..]
291        } else {
292            &self.name
293        }
294    }
295
296    /// Validate this normalized record
297    pub fn validate(&self) -> Result<()> {
298        // Must have a name
299        if self.name.is_empty() {
300            return Err(SenMLError::validation("Normalized record must have a name"));
301        }
302
303        // Must have at least one value or sum
304        if !self.has_value() && self.sum.is_none() {
305            return Err(SenMLError::validation(
306                "Normalized record must have at least one value field",
307            ));
308        }
309
310        // Validate numeric values
311        if let Some(v) = self.value {
312            if !v.is_finite() {
313                return Err(SenMLError::invalid_field_value("value", &v.to_string()));
314            }
315        }
316
317        if let Some(s) = self.sum {
318            if !s.is_finite() {
319                return Err(SenMLError::invalid_field_value("sum", &s.to_string()));
320            }
321        }
322
323        if let Some(t) = self.time {
324            if !t.is_finite() {
325                return Err(SenMLError::invalid_field_value("time", &t.to_string()));
326            }
327        }
328
329        if let Some(ut) = self.update_time {
330            if !ut.is_finite() || ut < 0.0 {
331                return Err(SenMLError::invalid_field_value(
332                    "update_time",
333                    &ut.to_string(),
334                ));
335            }
336        }
337
338        Ok(())
339    }
340}
341
// Helper functions for base64 encoding/decoding (reused from record.rs)

/// Encode `data` as base64 text using the standard alphabet (`+`, `/`) with `=` padding.
///
/// Every 3 input bytes become 4 output characters; a trailing group of 1 or 2 bytes is
/// padded to 4 characters with `==` or `=`. Empty input yields an empty string.
///
/// NOTE(review): RFC 8428 specifies the URL-safe alphabet without padding for the SenML
/// `vd` field; this helper emits the standard padded form. Confirm against record.rs
/// before changing the output format.
fn base64_encode(data: &[u8]) -> String {
    const ALPHABET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // Alphabet symbol for the 6-bit group that sits `shift` bits up inside `word`.
    let symbol = |word: u32, shift: u32| ALPHABET[((word >> shift) & 0x3F) as usize] as char;

    let mut encoded = String::new();

    for group in data.chunks(3) {
        match *group {
            [first, second, third] => {
                let word = (u32::from(first) << 16) | (u32::from(second) << 8) | u32::from(third);
                encoded.push(symbol(word, 18));
                encoded.push(symbol(word, 12));
                encoded.push(symbol(word, 6));
                encoded.push(symbol(word, 0));
            }
            [first, second] => {
                let word = (u32::from(first) << 16) | (u32::from(second) << 8);
                encoded.push(symbol(word, 18));
                encoded.push(symbol(word, 12));
                encoded.push(symbol(word, 6));
                encoded.push('=');
            }
            [first] => {
                let word = u32::from(first) << 16;
                encoded.push(symbol(word, 18));
                encoded.push(symbol(word, 12));
                encoded.push_str("==");
            }
            // `chunks(3)` only yields groups of 1 to 3 bytes.
            _ => {}
        }
    }

    encoded
}

/// Decode base64 text into raw bytes.
///
/// Both the standard alphabet (`+`, `/`) and the URL-safe alphabet (`-`, `_`) are
/// accepted, because RFC 8428 prescribes the URL-safe form without padding for SenML
/// data values while `base64_encode` in this file emits the standard padded form.
/// Trailing `=` padding is optional and ignored.
///
/// # Errors
///
/// Returns `Err` when the input contains a character outside both alphabets (including
/// `=` anywhere but at the end), or when the final group consists of a single symbol,
/// which cannot encode a whole byte.
fn base64_decode(s: &str) -> std::result::Result<Vec<u8>, &'static str> {
    // Padding is only legal at the end; an interior '=' is rejected as an invalid
    // character below instead of being dropped and silently shifting later bits.
    let symbols = s.trim_end_matches('=').as_bytes();
    let mut result = Vec::with_capacity(symbols.len() / 4 * 3 + 2);

    for chunk in symbols.chunks(4) {
        if chunk.len() < 2 {
            return Err("Invalid base64");
        }

        // Pack up to four 6-bit symbols into the top of a 24-bit word.
        let mut combined = 0u32;
        for (i, &symbol) in chunk.iter().enumerate() {
            let val = match symbol {
                b'A'..=b'Z' => u32::from(symbol - b'A'),
                b'a'..=b'z' => u32::from(symbol - b'a') + 26,
                b'0'..=b'9' => u32::from(symbol - b'0') + 52,
                b'+' | b'-' => 62,
                b'/' | b'_' => 63,
                _ => return Err("Invalid base64 character"),
            };
            combined |= val << (6 * (3 - i));
        }

        // 2 symbols carry 1 byte, 3 symbols carry 2 bytes, 4 symbols carry 3 bytes.
        result.push((combined >> 16) as u8);
        if chunk.len() > 2 {
            result.push((combined >> 8) as u8);
        }
        if chunk.len() > 3 {
            result.push(combined as u8);
        }
    }

    Ok(result)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{SenMLBuilder, SenMLRecord};

    /// Base name, time and unit from the builder are merged into the single value record.
    #[test]
    fn test_basic_normalization() {
        let normalized = SenMLBuilder::new()
            .base_name("device1/")
            .base_time(1640995200.0)
            .base_unit("Cel")
            .add_value("temp", 22.5)
            .build()
            .normalize();

        assert_eq!(normalized.records.len(), 1);

        let resolved = &normalized.records[0];
        assert_eq!(resolved.name, "device1/temp");
        assert_eq!(resolved.value, Some(22.5));
        assert_eq!(resolved.unit, Some("Cel".to_string()));
        assert_eq!(resolved.time, Some(1640995200.0));
    }

    /// Base value and base time are added to the record's own value and time.
    #[test]
    fn test_normalization_with_base_values() {
        let normalized = SenMLBuilder::new()
            .base_name("sensor/")
            .base_time(1000.0)
            .base_value(20.0)
            .add_measurement("temp", 2.5, 60.0) // Should become 22.5 at time 1060.0
            .build()
            .normalize();

        assert_eq!(normalized.records.len(), 1);

        let resolved = &normalized.records[0];
        assert_eq!(resolved.name, "sensor/temp");
        assert_eq!(resolved.value, Some(22.5)); // 20.0 + 2.5
        assert_eq!(resolved.time, Some(1060.0)); // 1000.0 + 60.0
    }

    /// A pack without a base record passes its records through unchanged.
    #[test]
    fn test_normalization_without_base_record() {
        let mut pack = SenMLPack::new();
        pack.add_record(SenMLRecord::with_value("standalone", 42.0));

        let normalized = pack.normalize();
        assert_eq!(normalized.records.len(), 1);

        let resolved = &normalized.records[0];
        assert_eq!(resolved.name, "standalone");
        assert_eq!(resolved.value, Some(42.0));
    }

    /// The time range spans the earliest and latest record times regardless of order.
    #[test]
    fn test_time_range() {
        let normalized = SenMLBuilder::new()
            .add_measurement("temp1", 20.0, 100.0)
            .add_measurement("temp2", 25.0, 200.0)
            .add_measurement("temp3", 30.0, 150.0)
            .build()
            .normalize();

        assert_eq!(normalized.time_range(), Some((100.0, 200.0)));
    }

    /// Only records whose time falls inside the window are returned.
    #[test]
    fn test_records_in_time_range() {
        let normalized = SenMLBuilder::new()
            .add_measurement("temp1", 20.0, 100.0)
            .add_measurement("temp2", 25.0, 200.0)
            .add_measurement("temp3", 30.0, 300.0)
            .build()
            .normalize();

        let in_window = normalized.records_in_time_range(150.0, 250.0);

        assert_eq!(in_window.len(), 1);
        assert_eq!(in_window[0].name, "temp2");
    }

    /// Records are bucketed by the part of the name before the last '/'.
    #[test]
    fn test_group_by_prefix() {
        let normalized = SenMLBuilder::new()
            .add_value("device1/temp", 20.0)
            .add_value("device1/humidity", 50.0)
            .add_value("device2/temp", 25.0)
            .build()
            .normalize();

        let by_prefix = normalized.group_by_prefix();

        assert_eq!(by_prefix.len(), 2);
        assert!(by_prefix.contains_key("device1"));
        assert!(by_prefix.contains_key("device2"));
        assert_eq!(by_prefix["device1"].len(), 2);
        assert_eq!(by_prefix["device2"].len(), 1);
    }

    /// `base_name` keeps the trailing slash, `local_name` is what follows it.
    #[test]
    fn test_local_and_base_name() {
        let mut pack = SenMLPack::new();
        pack.add_record(SenMLRecord::with_value("device1/sensor/temperature", 22.5));

        let normalized = pack.normalize();
        let resolved = &normalized.records[0];

        assert_eq!(resolved.base_name(), Some("device1/sensor/"));
        assert_eq!(resolved.local_name(), "temperature");
    }

    /// A simple named numeric record passes validation.
    #[test]
    fn test_normalization_validation() {
        let normalized = SenMLBuilder::new()
            .add_value("valid", 25.0)
            .build()
            .normalize();

        assert!(normalized.validate().is_ok());
    }

    /// Normalizing and converting back keeps the record count.
    #[test]
    fn test_roundtrip_normalization() {
        let original = SenMLBuilder::new()
            .add_value("temp", 22.5)
            .add_string_value("status", "OK")
            .build();

        let restored = original.normalize().to_pack();

        // Should have same number of records (though structure may differ)
        assert_eq!(restored.records.len(), original.records.len());
    }
}
555}