1use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde_json::{Map, Value};
7
8use std::sync::Arc;
9
10use daemon8_types::{Observation, ObservationKind, Origin, Severity, SourceLocation};
11
12pub fn normalize(mut raw: Value) -> Observation {
16 let obj = raw.as_object_mut();
17
18 let (kind_str, channel_str, severity_str, file, line, function, app, explicit_data, meta) =
19 match obj {
20 Some(map) => extract_meta(map),
21 None => (
22 None,
23 None,
24 None,
25 None,
26 None,
27 None,
28 None,
29 None,
30 ObsMeta::default(),
31 ),
32 };
33
34 let severity = parse_severity(severity_str.as_deref());
35
36 let source_location = file.map(|f| SourceLocation {
37 file: f,
38 line: line.unwrap_or(0),
39 function,
40 });
41
42 let origin = Origin::Application {
43 name: app.map(Into::into).unwrap_or_else(|| "unknown".into()),
44 };
45
46 let data = explicit_data.unwrap_or(raw);
47 let kind = resolve_kind(kind_str.as_deref(), channel_str, &data);
48
49 let timestamp_ns = SystemTime::now()
50 .duration_since(UNIX_EPOCH)
51 .unwrap_or_default()
52 .as_nanos() as u64;
53
54 Observation {
55 id: 0,
56 origin,
57 kind,
58 data,
59 severity,
60 source_location,
61 timestamp_ns,
62 correlation_id: meta.correlation_id.map(Arc::from),
63 parent_id: meta.parent_id,
64 tags: meta.tags,
65 session_id: meta.session_id.map(Arc::from),
66 node_id: meta.node_id.map(Arc::from),
67 }
68}
69
/// Optional metadata lifted out of an incoming payload by `extract_meta`.
///
/// Every field is `None` when the corresponding key was absent or did not hold
/// a value of the expected JSON type.
#[derive(Default)]
struct ObsMeta {
    /// Value of the `correlation_id` key.
    correlation_id: Option<String>,
    /// Value of the `parent_id` key.
    parent_id: Option<u64>,
    /// String entries of the `tags` array.
    tags: Option<Vec<String>>,
    /// Value of the `session_id` key.
    session_id: Option<String>,
    /// Value of the `node_id` key.
    node_id: Option<String>,
}
78
79type MetaFields = (
80 Option<String>,
81 Option<String>,
82 Option<String>,
83 Option<String>,
84 Option<u32>,
85 Option<String>,
86 Option<String>,
87 Option<Value>,
88 ObsMeta,
89);
90
91fn extract_meta(map: &mut Map<String, Value>) -> MetaFields {
92 let kind = map
93 .remove("kind")
94 .and_then(|v| v.as_str().map(String::from));
95 let channel = map
96 .remove("channel")
97 .and_then(|v| v.as_str().map(String::from));
98 let severity = map
99 .remove("severity")
100 .and_then(|v| v.as_str().map(String::from));
101 let file = map
102 .remove("file")
103 .and_then(|v| v.as_str().map(String::from));
104 let line = map
105 .remove("line")
106 .and_then(|v| v.as_u64().map(|n| n as u32));
107 let function = map
108 .remove("function")
109 .and_then(|v| v.as_str().map(String::from));
110 let app = map.remove("app").and_then(|v| v.as_str().map(String::from));
111 let data = map.remove("data");
112
113 let correlation_id = map
114 .remove("correlation_id")
115 .and_then(|v| v.as_str().map(String::from));
116 let parent_id = map.remove("parent_id").and_then(|v| v.as_u64());
117 let tags = map.remove("tags").and_then(|v| {
118 v.as_array().map(|arr| {
119 arr.iter()
120 .filter_map(|item| item.as_str().map(String::from))
121 .collect()
122 })
123 });
124 let session_id = map
125 .remove("session_id")
126 .and_then(|v| v.as_str().map(String::from));
127 let node_id = map
128 .remove("node_id")
129 .and_then(|v| v.as_str().map(String::from));
130
131 let meta = ObsMeta {
132 correlation_id,
133 parent_id,
134 tags,
135 session_id,
136 node_id,
137 };
138
139 (
140 kind, channel, severity, file, line, function, app, data, meta,
141 )
142}
143
144pub fn parse_severity(s: Option<&str>) -> Severity {
145 match s {
146 Some(v) => match v.to_ascii_lowercase().as_str() {
147 "trace" => Severity::Trace,
148 "debug" => Severity::Debug,
149 "info" => Severity::Info,
150 "warn" | "warning" => Severity::Warn,
151 "error" => Severity::Error,
152 _ => Severity::Debug,
153 },
154 None => Severity::Debug,
155 }
156}
157
158fn resolve_kind(kind_str: Option<&str>, channel: Option<String>, data: &Value) -> ObservationKind {
159 match kind_str {
160 Some(k) => match k.to_ascii_lowercase().as_str() {
161 "log" => ObservationKind::Log,
162 "query" => try_query(data),
163 "http" => try_http(data),
164 "exception" => try_exception(data),
165 "metric" => try_metric(data),
166 "state_snapshot" => try_state_snapshot(data),
167 "custom" => ObservationKind::Custom {
168 channel: channel.unwrap_or_else(|| "custom".into()),
169 },
170 _ => ObservationKind::Custom {
171 channel: channel.unwrap_or_else(|| k.to_string()),
172 },
173 },
174 None => match channel {
175 Some(ch) => ObservationKind::Custom { channel: ch },
176 None => ObservationKind::Log,
177 },
178 }
179}
180
181fn try_query(data: &Value) -> ObservationKind {
182 let sql = data.get("sql").and_then(Value::as_str);
183 let duration_ms = data.get("duration_ms").and_then(Value::as_f64);
184 match (sql, duration_ms) {
185 (Some(sql), Some(ms)) => ObservationKind::Query {
186 sql: sql.to_string(),
187 duration_ms: ms,
188 },
189 _ => ObservationKind::Custom {
190 channel: "query".into(),
191 },
192 }
193}
194
195fn try_http(data: &Value) -> ObservationKind {
196 let method = data.get("method").and_then(Value::as_str);
197 let url = data.get("url").and_then(Value::as_str);
198 match (method, url) {
199 (Some(m), Some(u)) => ObservationKind::HttpExchange {
200 method: m.to_string(),
201 url: u.to_string(),
202 status: data.get("status").and_then(Value::as_u64).map(|s| s as u16),
203 duration_ms: data.get("duration_ms").and_then(Value::as_f64),
204 },
205 _ => ObservationKind::Custom {
206 channel: "http".into(),
207 },
208 }
209}
210
211fn try_exception(data: &Value) -> ObservationKind {
212 let message = data.get("message").and_then(Value::as_str);
213 match message {
214 Some(msg) => ObservationKind::Exception {
215 message: msg.to_string(),
216 trace: data.get("trace").and_then(Value::as_str).map(String::from),
217 },
218 None => ObservationKind::Custom {
219 channel: "exception".into(),
220 },
221 }
222}
223
224fn try_metric(data: &Value) -> ObservationKind {
225 let name = data.get("name").and_then(Value::as_str);
226 let value = data.get("value").and_then(Value::as_f64);
227 match (name, value) {
228 (Some(n), Some(v)) => ObservationKind::Metric {
229 name: n.to_string(),
230 value: v,
231 },
232 _ => ObservationKind::Custom {
233 channel: "metric".into(),
234 },
235 }
236}
237
238fn try_state_snapshot(data: &Value) -> ObservationKind {
239 let label = data.get("label").and_then(Value::as_str);
240 match label {
241 Some(l) => ObservationKind::StateSnapshot {
242 label: l.to_string(),
243 },
244 None => ObservationKind::Custom {
245 channel: "state_snapshot".into(),
246 },
247 }
248}