Skip to main content

daemon8_ingest/
normalize.rs

1// SPDX-License-Identifier: LicenseRef-FCL-1.0-ALv2
2// Copyright (c) 2026 Havy.tech, LLC
3
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde_json::{Map, Value};
7
8use std::sync::Arc;
9
10use daemon8_types::{Observation, ObservationKind, Origin, Severity, SourceLocation};
11
12/// Normalize arbitrary JSON into an Observation. Extracts recognized meta
13/// fields (kind, channel, severity, file, line, function, app, data) and
14/// maps the remainder into the observation payload.
15pub fn normalize(mut raw: Value) -> Observation {
16    let obj = raw.as_object_mut();
17
18    let (kind_str, channel_str, severity_str, file, line, function, app, explicit_data, meta) =
19        match obj {
20            Some(map) => extract_meta(map),
21            None => (
22                None,
23                None,
24                None,
25                None,
26                None,
27                None,
28                None,
29                None,
30                ObsMeta::default(),
31            ),
32        };
33
34    let severity = parse_severity(severity_str.as_deref());
35
36    let source_location = file.map(|f| SourceLocation {
37        file: f,
38        line: line.unwrap_or(0),
39        function,
40    });
41
42    let origin = Origin::Application {
43        name: app.map(Into::into).unwrap_or_else(|| "unknown".into()),
44    };
45
46    let data = explicit_data.unwrap_or(raw);
47    let kind = resolve_kind(kind_str.as_deref(), channel_str, &data);
48
49    let timestamp_ns = SystemTime::now()
50        .duration_since(UNIX_EPOCH)
51        .unwrap_or_default()
52        .as_nanos() as u64;
53
54    Observation {
55        id: 0,
56        origin,
57        kind,
58        data,
59        severity,
60        source_location,
61        timestamp_ns,
62        correlation_id: meta.correlation_id.map(Arc::from),
63        parent_id: meta.parent_id,
64        tags: meta.tags,
65        session_id: meta.session_id.map(Arc::from),
66        node_id: meta.node_id.map(Arc::from),
67        debug_session_id: None,
68        checkpoint_id: None,
69        error_hash: None,
70    }
71}
72
/// Optional correlation metadata lifted out of the raw JSON object by
/// `extract_meta`. Every field is `None` in the default value.
#[derive(Default)]
struct ObsMeta {
    /// String value of the `correlation_id` key.
    correlation_id: Option<String>,
    /// Unsigned integer value of the `parent_id` key.
    parent_id: Option<u64>,
    /// String entries of the `tags` array.
    tags: Option<Vec<String>>,
    /// String value of the `session_id` key.
    session_id: Option<String>,
    /// String value of the `node_id` key.
    node_id: Option<String>,
}
81
82type MetaFields = (
83    Option<String>,
84    Option<String>,
85    Option<String>,
86    Option<String>,
87    Option<u32>,
88    Option<String>,
89    Option<String>,
90    Option<Value>,
91    ObsMeta,
92);
93
94fn extract_meta(map: &mut Map<String, Value>) -> MetaFields {
95    let kind = map
96        .remove("kind")
97        .and_then(|v| v.as_str().map(String::from));
98    let channel = map
99        .remove("channel")
100        .and_then(|v| v.as_str().map(String::from));
101    let severity = map
102        .remove("severity")
103        .and_then(|v| v.as_str().map(String::from));
104    let file = map
105        .remove("file")
106        .and_then(|v| v.as_str().map(String::from));
107    let line = map
108        .remove("line")
109        .and_then(|v| v.as_u64().map(|n| n as u32));
110    let function = map
111        .remove("function")
112        .and_then(|v| v.as_str().map(String::from));
113    let app = map.remove("app").and_then(|v| v.as_str().map(String::from));
114    let data = map.remove("data");
115
116    let correlation_id = map
117        .remove("correlation_id")
118        .and_then(|v| v.as_str().map(String::from));
119    let parent_id = map.remove("parent_id").and_then(|v| v.as_u64());
120    let tags = map.remove("tags").and_then(|v| {
121        v.as_array().map(|arr| {
122            arr.iter()
123                .filter_map(|item| item.as_str().map(String::from))
124                .collect()
125        })
126    });
127    let session_id = map
128        .remove("session_id")
129        .and_then(|v| v.as_str().map(String::from));
130    let node_id = map
131        .remove("node_id")
132        .and_then(|v| v.as_str().map(String::from));
133
134    let meta = ObsMeta {
135        correlation_id,
136        parent_id,
137        tags,
138        session_id,
139        node_id,
140    };
141
142    (
143        kind, channel, severity, file, line, function, app, data, meta,
144    )
145}
146
147pub fn parse_severity(s: Option<&str>) -> Severity {
148    match s {
149        Some(v) => match v.to_ascii_lowercase().as_str() {
150            "trace" => Severity::Trace,
151            "debug" => Severity::Debug,
152            "info" => Severity::Info,
153            "warn" | "warning" => Severity::Warn,
154            "error" => Severity::Error,
155            _ => Severity::Debug,
156        },
157        None => Severity::Debug,
158    }
159}
160
161fn resolve_kind(kind_str: Option<&str>, channel: Option<String>, data: &Value) -> ObservationKind {
162    match kind_str {
163        Some(k) => match k.to_ascii_lowercase().as_str() {
164            "log" => ObservationKind::Log,
165            "query" => try_query(data),
166            "http" => try_http(data),
167            "exception" => try_exception(data),
168            "metric" => try_metric(data),
169            "state_snapshot" => try_state_snapshot(data),
170            "tool_call" => try_tool_call(data),
171            "custom" => ObservationKind::Custom {
172                channel: channel.unwrap_or_else(|| "custom".into()),
173            },
174            _ => ObservationKind::Custom {
175                channel: channel.unwrap_or_else(|| k.to_string()),
176            },
177        },
178        None => match channel {
179            Some(ch) => ObservationKind::Custom { channel: ch },
180            None => ObservationKind::Log,
181        },
182    }
183}
184
185fn try_query(data: &Value) -> ObservationKind {
186    let sql = data.get("sql").and_then(Value::as_str);
187    let duration_ms = data.get("duration_ms").and_then(Value::as_f64);
188    match (sql, duration_ms) {
189        (Some(sql), Some(ms)) => ObservationKind::Query {
190            sql: sql.to_string(),
191            duration_ms: ms,
192        },
193        _ => ObservationKind::Custom {
194            channel: "query".into(),
195        },
196    }
197}
198
199fn try_http(data: &Value) -> ObservationKind {
200    let method = data.get("method").and_then(Value::as_str);
201    let url = data.get("url").and_then(Value::as_str);
202    match (method, url) {
203        (Some(m), Some(u)) => ObservationKind::HttpExchange {
204            method: m.to_string(),
205            url: u.to_string(),
206            status: data.get("status").and_then(Value::as_u64).map(|s| s as u16),
207            duration_ms: data.get("duration_ms").and_then(Value::as_f64),
208        },
209        _ => ObservationKind::Custom {
210            channel: "http".into(),
211        },
212    }
213}
214
215fn try_exception(data: &Value) -> ObservationKind {
216    let message = data.get("message").and_then(Value::as_str);
217    match message {
218        Some(msg) => ObservationKind::Exception {
219            message: msg.to_string(),
220            trace: data.get("trace").and_then(Value::as_str).map(String::from),
221        },
222        None => ObservationKind::Custom {
223            channel: "exception".into(),
224        },
225    }
226}
227
228fn try_tool_call(data: &Value) -> ObservationKind {
229    let tool = data
230        .get("tool")
231        .and_then(Value::as_str)
232        .unwrap_or("unknown")
233        .to_string();
234    let input = data.get("input").cloned().unwrap_or(Value::Null);
235    let output = data.get("output").cloned();
236    let exit_code = data
237        .get("exit_code")
238        .and_then(Value::as_i64)
239        .map(|n| n as i32);
240    let duration_ms = data.get("duration_ms").and_then(Value::as_f64);
241    ObservationKind::ToolCall {
242        tool,
243        input,
244        output,
245        exit_code,
246        duration_ms,
247    }
248}
249
250fn try_metric(data: &Value) -> ObservationKind {
251    let name = data.get("name").and_then(Value::as_str);
252    let value = data.get("value").and_then(Value::as_f64);
253    match (name, value) {
254        (Some(n), Some(v)) => ObservationKind::Metric {
255            name: n.to_string(),
256            value: v,
257        },
258        _ => ObservationKind::Custom {
259            channel: "metric".into(),
260        },
261    }
262}
263
264fn try_state_snapshot(data: &Value) -> ObservationKind {
265    let label = data.get("label").and_then(Value::as_str);
266    match label {
267        Some(l) => ObservationKind::StateSnapshot {
268            label: l.to_string(),
269        },
270        None => ObservationKind::Custom {
271            channel: "state_snapshot".into(),
272        },
273    }
274}