Skip to main content

daemon8_ingest/
normalize.rs

1// SPDX-License-Identifier: LicenseRef-FCL-1.0-ALv2
2// Copyright (c) 2026 Havy.tech, LLC
3
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde_json::{Map, Value};
7
8use std::sync::Arc;
9
10use daemon8_types::{Observation, ObservationKind, Origin, Severity, SourceLocation};
11
12/// Normalize arbitrary JSON into an Observation. Extracts recognized meta
13/// fields (kind, channel, severity, file, line, function, app, data) and
14/// maps the remainder into the observation payload.
15pub fn normalize(mut raw: Value) -> Observation {
16    let obj = raw.as_object_mut();
17
18    let (kind_str, channel_str, severity_str, file, line, function, app, explicit_data, meta) =
19        match obj {
20            Some(map) => extract_meta(map),
21            None => (
22                None,
23                None,
24                None,
25                None,
26                None,
27                None,
28                None,
29                None,
30                ObsMeta::default(),
31            ),
32        };
33
34    let severity = parse_severity(severity_str.as_deref());
35
36    let source_location = file.map(|f| SourceLocation {
37        file: f,
38        line: line.unwrap_or(0),
39        function,
40    });
41
42    let origin = Origin::Application {
43        name: app.map(Into::into).unwrap_or_else(|| "unknown".into()),
44    };
45
46    let data = explicit_data.unwrap_or(raw);
47    let kind = resolve_kind(kind_str.as_deref(), channel_str, &data);
48
49    let timestamp_ns = SystemTime::now()
50        .duration_since(UNIX_EPOCH)
51        .unwrap_or_default()
52        .as_nanos() as u64;
53
54    Observation {
55        id: 0,
56        origin,
57        kind,
58        data,
59        severity,
60        source_location,
61        timestamp_ns,
62        correlation_id: meta.correlation_id.map(Arc::from),
63        parent_id: meta.parent_id,
64        tags: meta.tags,
65        session_id: meta.session_id.map(Arc::from),
66        node_id: meta.node_id.map(Arc::from),
67    }
68}
69
/// Optional identifiers and tags that `extract_meta` lifts out of the
/// incoming JSON object. Every field defaults to `None`.
#[derive(Default)]
struct ObsMeta {
    /// Value of the `correlation_id` key, when it was a JSON string.
    correlation_id: Option<String>,
    /// Value of the `parent_id` key, when it was an unsigned integer.
    parent_id: Option<u64>,
    /// String elements of the `tags` array; non-string elements are skipped.
    tags: Option<Vec<String>>,
    /// Value of the `session_id` key, when it was a JSON string.
    session_id: Option<String>,
    /// Value of the `node_id` key, when it was a JSON string.
    node_id: Option<String>,
}
78
79type MetaFields = (
80    Option<String>,
81    Option<String>,
82    Option<String>,
83    Option<String>,
84    Option<u32>,
85    Option<String>,
86    Option<String>,
87    Option<Value>,
88    ObsMeta,
89);
90
91fn extract_meta(map: &mut Map<String, Value>) -> MetaFields {
92    let kind = map
93        .remove("kind")
94        .and_then(|v| v.as_str().map(String::from));
95    let channel = map
96        .remove("channel")
97        .and_then(|v| v.as_str().map(String::from));
98    let severity = map
99        .remove("severity")
100        .and_then(|v| v.as_str().map(String::from));
101    let file = map
102        .remove("file")
103        .and_then(|v| v.as_str().map(String::from));
104    let line = map
105        .remove("line")
106        .and_then(|v| v.as_u64().map(|n| n as u32));
107    let function = map
108        .remove("function")
109        .and_then(|v| v.as_str().map(String::from));
110    let app = map.remove("app").and_then(|v| v.as_str().map(String::from));
111    let data = map.remove("data");
112
113    let correlation_id = map
114        .remove("correlation_id")
115        .and_then(|v| v.as_str().map(String::from));
116    let parent_id = map.remove("parent_id").and_then(|v| v.as_u64());
117    let tags = map.remove("tags").and_then(|v| {
118        v.as_array().map(|arr| {
119            arr.iter()
120                .filter_map(|item| item.as_str().map(String::from))
121                .collect()
122        })
123    });
124    let session_id = map
125        .remove("session_id")
126        .and_then(|v| v.as_str().map(String::from));
127    let node_id = map
128        .remove("node_id")
129        .and_then(|v| v.as_str().map(String::from));
130
131    let meta = ObsMeta {
132        correlation_id,
133        parent_id,
134        tags,
135        session_id,
136        node_id,
137    };
138
139    (
140        kind, channel, severity, file, line, function, app, data, meta,
141    )
142}
143
144pub fn parse_severity(s: Option<&str>) -> Severity {
145    match s {
146        Some(v) => match v.to_ascii_lowercase().as_str() {
147            "trace" => Severity::Trace,
148            "debug" => Severity::Debug,
149            "info" => Severity::Info,
150            "warn" | "warning" => Severity::Warn,
151            "error" => Severity::Error,
152            _ => Severity::Debug,
153        },
154        None => Severity::Debug,
155    }
156}
157
158fn resolve_kind(kind_str: Option<&str>, channel: Option<String>, data: &Value) -> ObservationKind {
159    match kind_str {
160        Some(k) => match k.to_ascii_lowercase().as_str() {
161            "log" => ObservationKind::Log,
162            "query" => try_query(data),
163            "http" => try_http(data),
164            "exception" => try_exception(data),
165            "metric" => try_metric(data),
166            "state_snapshot" => try_state_snapshot(data),
167            "custom" => ObservationKind::Custom {
168                channel: channel.unwrap_or_else(|| "custom".into()),
169            },
170            _ => ObservationKind::Custom {
171                channel: channel.unwrap_or_else(|| k.to_string()),
172            },
173        },
174        None => match channel {
175            Some(ch) => ObservationKind::Custom { channel: ch },
176            None => ObservationKind::Log,
177        },
178    }
179}
180
181fn try_query(data: &Value) -> ObservationKind {
182    let sql = data.get("sql").and_then(Value::as_str);
183    let duration_ms = data.get("duration_ms").and_then(Value::as_f64);
184    match (sql, duration_ms) {
185        (Some(sql), Some(ms)) => ObservationKind::Query {
186            sql: sql.to_string(),
187            duration_ms: ms,
188        },
189        _ => ObservationKind::Custom {
190            channel: "query".into(),
191        },
192    }
193}
194
195fn try_http(data: &Value) -> ObservationKind {
196    let method = data.get("method").and_then(Value::as_str);
197    let url = data.get("url").and_then(Value::as_str);
198    match (method, url) {
199        (Some(m), Some(u)) => ObservationKind::HttpExchange {
200            method: m.to_string(),
201            url: u.to_string(),
202            status: data.get("status").and_then(Value::as_u64).map(|s| s as u16),
203            duration_ms: data.get("duration_ms").and_then(Value::as_f64),
204        },
205        _ => ObservationKind::Custom {
206            channel: "http".into(),
207        },
208    }
209}
210
211fn try_exception(data: &Value) -> ObservationKind {
212    let message = data.get("message").and_then(Value::as_str);
213    match message {
214        Some(msg) => ObservationKind::Exception {
215            message: msg.to_string(),
216            trace: data.get("trace").and_then(Value::as_str).map(String::from),
217        },
218        None => ObservationKind::Custom {
219            channel: "exception".into(),
220        },
221    }
222}
223
224fn try_metric(data: &Value) -> ObservationKind {
225    let name = data.get("name").and_then(Value::as_str);
226    let value = data.get("value").and_then(Value::as_f64);
227    match (name, value) {
228        (Some(n), Some(v)) => ObservationKind::Metric {
229            name: n.to_string(),
230            value: v,
231        },
232        _ => ObservationKind::Custom {
233            channel: "metric".into(),
234        },
235    }
236}
237
238fn try_state_snapshot(data: &Value) -> ObservationKind {
239    let label = data.get("label").and_then(Value::as_str);
240    match label {
241        Some(l) => ObservationKind::StateSnapshot {
242            label: l.to_string(),
243        },
244        None => ObservationKind::Custom {
245            channel: "state_snapshot".into(),
246        },
247    }
248}