1use std::time::{SystemTime, UNIX_EPOCH};
5
6use serde_json::{Map, Value};
7
8use std::sync::Arc;
9
10use daemon8_types::{Observation, ObservationKind, Origin, Severity, SourceLocation};
11
12pub fn normalize(mut raw: Value) -> Observation {
16 let obj = raw.as_object_mut();
17
18 let (kind_str, channel_str, severity_str, file, line, function, app, explicit_data, meta) =
19 match obj {
20 Some(map) => extract_meta(map),
21 None => (
22 None,
23 None,
24 None,
25 None,
26 None,
27 None,
28 None,
29 None,
30 ObsMeta::default(),
31 ),
32 };
33
34 let severity = parse_severity(severity_str.as_deref());
35
36 let source_location = file.map(|f| SourceLocation {
37 file: f,
38 line: line.unwrap_or(0),
39 function,
40 });
41
42 let origin = Origin::Application {
43 name: app.map(Into::into).unwrap_or_else(|| "unknown".into()),
44 };
45
46 let data = explicit_data.unwrap_or(raw);
47 let kind = resolve_kind(kind_str.as_deref(), channel_str, &data);
48
49 let timestamp_ns = SystemTime::now()
50 .duration_since(UNIX_EPOCH)
51 .unwrap_or_default()
52 .as_nanos() as u64;
53
54 Observation {
55 id: 0,
56 origin,
57 kind,
58 data,
59 severity,
60 source_location,
61 timestamp_ns,
62 correlation_id: meta.correlation_id.map(Arc::from),
63 parent_id: meta.parent_id,
64 tags: meta.tags,
65 session_id: meta.session_id.map(Arc::from),
66 node_id: meta.node_id.map(Arc::from),
67 debug_session_id: None,
68 checkpoint_id: None,
69 error_hash: None,
70 }
71}
72
/// Optional envelope metadata lifted out of a raw payload by `extract_meta`.
///
/// Every field defaults to `None`, which is what `normalize` uses when the payload is not
/// a JSON object.
#[derive(Default)]
struct ObsMeta {
    /// Value of the `"correlation_id"` key, when it was a JSON string.
    correlation_id: Option<String>,
    /// Value of the `"parent_id"` key, when it was representable as a `u64`.
    parent_id: Option<u64>,
    /// String elements of the `"tags"` array; non-string elements are skipped.
    tags: Option<Vec<String>>,
    /// Value of the `"session_id"` key, when it was a JSON string.
    session_id: Option<String>,
    /// Value of the `"node_id"` key, when it was a JSON string.
    node_id: Option<String>,
}
81
/// Positional result of `extract_meta`.
///
/// The slots are, in order: kind, channel, severity, file, line, function, app,
/// explicit data payload, and the remaining metadata bundled as `ObsMeta`.
type MetaFields = (
    Option<String>, // kind
    Option<String>, // channel
    Option<String>, // severity
    Option<String>, // file
    Option<u32>,    // line
    Option<String>, // function
    Option<String>, // app
    Option<Value>,  // data
    ObsMeta,        // correlation / parent / tags / session / node
);
93
94fn extract_meta(map: &mut Map<String, Value>) -> MetaFields {
95 let kind = map
96 .remove("kind")
97 .and_then(|v| v.as_str().map(String::from));
98 let channel = map
99 .remove("channel")
100 .and_then(|v| v.as_str().map(String::from));
101 let severity = map
102 .remove("severity")
103 .and_then(|v| v.as_str().map(String::from));
104 let file = map
105 .remove("file")
106 .and_then(|v| v.as_str().map(String::from));
107 let line = map
108 .remove("line")
109 .and_then(|v| v.as_u64().map(|n| n as u32));
110 let function = map
111 .remove("function")
112 .and_then(|v| v.as_str().map(String::from));
113 let app = map.remove("app").and_then(|v| v.as_str().map(String::from));
114 let data = map.remove("data");
115
116 let correlation_id = map
117 .remove("correlation_id")
118 .and_then(|v| v.as_str().map(String::from));
119 let parent_id = map.remove("parent_id").and_then(|v| v.as_u64());
120 let tags = map.remove("tags").and_then(|v| {
121 v.as_array().map(|arr| {
122 arr.iter()
123 .filter_map(|item| item.as_str().map(String::from))
124 .collect()
125 })
126 });
127 let session_id = map
128 .remove("session_id")
129 .and_then(|v| v.as_str().map(String::from));
130 let node_id = map
131 .remove("node_id")
132 .and_then(|v| v.as_str().map(String::from));
133
134 let meta = ObsMeta {
135 correlation_id,
136 parent_id,
137 tags,
138 session_id,
139 node_id,
140 };
141
142 (
143 kind, channel, severity, file, line, function, app, data, meta,
144 )
145}
146
147pub fn parse_severity(s: Option<&str>) -> Severity {
148 match s {
149 Some(v) => match v.to_ascii_lowercase().as_str() {
150 "trace" => Severity::Trace,
151 "debug" => Severity::Debug,
152 "info" => Severity::Info,
153 "warn" | "warning" => Severity::Warn,
154 "error" => Severity::Error,
155 _ => Severity::Debug,
156 },
157 None => Severity::Debug,
158 }
159}
160
161fn resolve_kind(kind_str: Option<&str>, channel: Option<String>, data: &Value) -> ObservationKind {
162 match kind_str {
163 Some(k) => match k.to_ascii_lowercase().as_str() {
164 "log" => ObservationKind::Log,
165 "query" => try_query(data),
166 "http" => try_http(data),
167 "exception" => try_exception(data),
168 "metric" => try_metric(data),
169 "state_snapshot" => try_state_snapshot(data),
170 "tool_call" => try_tool_call(data),
171 "custom" => ObservationKind::Custom {
172 channel: channel.unwrap_or_else(|| "custom".into()),
173 },
174 _ => ObservationKind::Custom {
175 channel: channel.unwrap_or_else(|| k.to_string()),
176 },
177 },
178 None => match channel {
179 Some(ch) => ObservationKind::Custom { channel: ch },
180 None => ObservationKind::Log,
181 },
182 }
183}
184
185fn try_query(data: &Value) -> ObservationKind {
186 let sql = data.get("sql").and_then(Value::as_str);
187 let duration_ms = data.get("duration_ms").and_then(Value::as_f64);
188 match (sql, duration_ms) {
189 (Some(sql), Some(ms)) => ObservationKind::Query {
190 sql: sql.to_string(),
191 duration_ms: ms,
192 },
193 _ => ObservationKind::Custom {
194 channel: "query".into(),
195 },
196 }
197}
198
199fn try_http(data: &Value) -> ObservationKind {
200 let method = data.get("method").and_then(Value::as_str);
201 let url = data.get("url").and_then(Value::as_str);
202 match (method, url) {
203 (Some(m), Some(u)) => ObservationKind::HttpExchange {
204 method: m.to_string(),
205 url: u.to_string(),
206 status: data.get("status").and_then(Value::as_u64).map(|s| s as u16),
207 duration_ms: data.get("duration_ms").and_then(Value::as_f64),
208 },
209 _ => ObservationKind::Custom {
210 channel: "http".into(),
211 },
212 }
213}
214
215fn try_exception(data: &Value) -> ObservationKind {
216 let message = data.get("message").and_then(Value::as_str);
217 match message {
218 Some(msg) => ObservationKind::Exception {
219 message: msg.to_string(),
220 trace: data.get("trace").and_then(Value::as_str).map(String::from),
221 },
222 None => ObservationKind::Custom {
223 channel: "exception".into(),
224 },
225 }
226}
227
228fn try_tool_call(data: &Value) -> ObservationKind {
229 let tool = data
230 .get("tool")
231 .and_then(Value::as_str)
232 .unwrap_or("unknown")
233 .to_string();
234 let input = data.get("input").cloned().unwrap_or(Value::Null);
235 let output = data.get("output").cloned();
236 let exit_code = data
237 .get("exit_code")
238 .and_then(Value::as_i64)
239 .map(|n| n as i32);
240 let duration_ms = data.get("duration_ms").and_then(Value::as_f64);
241 ObservationKind::ToolCall {
242 tool,
243 input,
244 output,
245 exit_code,
246 duration_ms,
247 }
248}
249
250fn try_metric(data: &Value) -> ObservationKind {
251 let name = data.get("name").and_then(Value::as_str);
252 let value = data.get("value").and_then(Value::as_f64);
253 match (name, value) {
254 (Some(n), Some(v)) => ObservationKind::Metric {
255 name: n.to_string(),
256 value: v,
257 },
258 _ => ObservationKind::Custom {
259 channel: "metric".into(),
260 },
261 }
262}
263
264fn try_state_snapshot(data: &Value) -> ObservationKind {
265 let label = data.get("label").and_then(Value::as_str);
266 match label {
267 Some(l) => ObservationKind::StateSnapshot {
268 label: l.to_string(),
269 },
270 None => ObservationKind::Custom {
271 channel: "state_snapshot".into(),
272 },
273 }
274}