1use crate::{LogEntry, LogLevel, Query};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
12pub enum EventSourceKind {
13 Stderr,
14 Csvlog,
15 Jsonlog,
16}
17
18#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
20pub struct SourceReference {
21 pub source_kind: EventSourceKind,
22 pub record_index: usize,
23}
24
25#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
27pub struct SessionIdentity {
28 pub process_id: String,
29 pub user: Option<String>,
30 pub database: Option<String>,
31 pub client_host: Option<String>,
32 pub application_name: Option<String>,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct StatementEvent {
38 pub statement: String,
39 pub queries: Vec<Query>,
40 pub duration_ms: Option<f64>,
41}
42
43#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
45pub struct DurationEvent {
46 pub duration_ms: f64,
47}
48
49#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
51pub struct ErrorEvent {
52 pub message: String,
53 pub sqlstate: Option<String>,
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
58pub enum EventKind {
59 Statement(StatementEvent),
60 Duration(DurationEvent),
61 Error(ErrorEvent),
62 Log { level: LogLevel, message: String },
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct NormalizedEvent {
68 pub event_id: String,
69 pub timestamp: DateTime<Utc>,
70 pub source: SourceReference,
71 pub session: SessionIdentity,
72 pub queryid: Option<String>,
73 pub kind: EventKind,
74}
75
76impl NormalizedEvent {
77 pub fn from_log_entry(
78 entry: &LogEntry,
79 source_kind: EventSourceKind,
80 record_index: usize,
81 ) -> Self {
82 let source = SourceReference {
83 source_kind,
84 record_index,
85 };
86
87 let session = SessionIdentity {
88 process_id: entry.process_id.clone(),
89 user: entry.user.clone(),
90 database: entry.database.clone(),
91 client_host: entry.client_host.clone(),
92 application_name: entry.application_name.clone(),
93 };
94
95 let kind = if entry.is_query() {
96 EventKind::Statement(StatementEvent {
97 statement: entry
98 .message
99 .strip_prefix("statement: ")
100 .unwrap_or(&entry.message)
101 .to_string(),
102 queries: entry.queries.clone().unwrap_or_default(),
103 duration_ms: entry.duration,
104 })
105 } else if entry.is_duration() {
106 EventKind::Duration(DurationEvent {
107 duration_ms: entry.duration.unwrap_or(0.0),
108 })
109 } else if entry.is_error() {
110 EventKind::Error(ErrorEvent {
111 message: entry.message.clone(),
112 sqlstate: None,
113 })
114 } else {
115 EventKind::Log {
116 level: entry.message_type.clone(),
117 message: entry.message.clone(),
118 }
119 };
120
121 Self {
122 event_id: format!(
123 "{}:{}",
124 match source_kind {
125 EventSourceKind::Stderr => "stderr",
126 EventSourceKind::Csvlog => "csvlog",
127 EventSourceKind::Jsonlog => "jsonlog",
128 },
129 record_index
130 ),
131 timestamp: entry.timestamp,
132 source,
133 session,
134 queryid: None,
135 kind,
136 }
137 }
138
139 pub fn is_query(&self) -> bool {
140 matches!(self.kind, EventKind::Statement(_))
141 }
142
143 pub fn is_error(&self) -> bool {
144 matches!(self.kind, EventKind::Error(_))
145 }
146
147 pub fn duration_ms(&self) -> Option<f64> {
148 match &self.kind {
149 EventKind::Statement(statement) => statement.duration_ms,
150 EventKind::Duration(duration) => Some(duration.duration_ms),
151 _ => None,
152 }
153 }
154
155 pub fn queries(&self) -> Option<&[Query]> {
156 match &self.kind {
157 EventKind::Statement(statement) => Some(&statement.queries),
158 _ => None,
159 }
160 }
161
162 pub fn normalized_query(&self) -> Option<String> {
163 let queries = self.queries()?;
164 if queries.is_empty() {
165 return None;
166 }
167
168 Some(
169 queries
170 .iter()
171 .map(|query| query.normalized_query.clone())
172 .collect::<Vec<_>>()
173 .join(";"),
174 )
175 }
176
177 pub fn message(&self) -> &str {
178 match &self.kind {
179 EventKind::Statement(statement) => &statement.statement,
180 EventKind::Duration(_) => "",
181 EventKind::Error(error) => &error.message,
182 EventKind::Log { message, .. } => message,
183 }
184 }
185}
186
187pub fn normalize_log_entries(
188 entries: &[LogEntry],
189 source_kind: EventSourceKind,
190) -> Vec<NormalizedEvent> {
191 entries
192 .iter()
193 .enumerate()
194 .map(|(record_index, entry)| {
195 NormalizedEvent::from_log_entry(entry, source_kind, record_index)
196 })
197 .collect()
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{LogEntry, LogLevel};
    use chrono::TimeZone;

    /// Builds a `LogEntry` with a fixed timestamp and fixed session fields;
    /// only the level, message, duration and queries vary per test.
    fn make_entry(
        level: LogLevel,
        text: &str,
        duration: Option<f64>,
        queries: Option<Vec<Query>>,
    ) -> LogEntry {
        let timestamp = Utc.with_ymd_and_hms(2024, 8, 15, 10, 30, 0).unwrap();
        LogEntry {
            timestamp,
            process_id: String::from("12345"),
            user: Some(String::from("postgres")),
            database: Some(String::from("testdb")),
            client_host: Some(String::from("10.0.0.10")),
            application_name: Some(String::from("psql")),
            message_type: level,
            message: text.to_owned(),
            queries,
            duration,
        }
    }

    #[test]
    fn converts_statement_entries_into_normalized_events() {
        let log_entry = make_entry(
            LogLevel::Statement,
            "statement: SELECT * FROM users WHERE id = 1",
            Some(42.0),
            crate::Query::from_sql("SELECT * FROM users WHERE id = 1").ok(),
        );

        let event = NormalizedEvent::from_log_entry(&log_entry, EventSourceKind::Stderr, 7);

        assert_eq!(event.event_id, "stderr:7");
        // Whole-struct comparisons cover every source and session field at once.
        assert_eq!(
            event.source,
            SourceReference {
                source_kind: EventSourceKind::Stderr,
                record_index: 7,
            }
        );
        assert_eq!(
            event.session,
            SessionIdentity {
                process_id: "12345".to_string(),
                user: Some("postgres".to_string()),
                database: Some("testdb".to_string()),
                client_host: Some("10.0.0.10".to_string()),
                application_name: Some("psql".to_string()),
            }
        );
        assert!(event.is_query());
        assert_eq!(event.duration_ms(), Some(42.0));
        assert_eq!(event.message(), "SELECT * FROM users WHERE id = 1");

        let normalized = event.normalized_query();
        assert_eq!(
            normalized.as_deref(),
            Some("SELECT * FROM users WHERE id = ?")
        );
    }

    #[test]
    fn converts_duration_entries_into_duration_events() {
        let log_entry = make_entry(LogLevel::Duration, "duration: 15.234 ms", Some(15.234), None);

        let event = NormalizedEvent::from_log_entry(&log_entry, EventSourceKind::Stderr, 1);

        assert_eq!(event.event_id, "stderr:1");
        assert!(!event.is_query());
        assert_eq!(event.duration_ms(), Some(15.234));
        match &event.kind {
            EventKind::Duration(duration) => {
                assert_eq!(*duration, DurationEvent { duration_ms: 15.234 });
            }
            other => panic!("expected duration event, got {other:?}"),
        }
    }

    #[test]
    fn converts_error_entries_into_error_events() {
        let log_entry = make_entry(
            LogLevel::Error,
            "relation \"missing_table\" does not exist",
            None,
            None,
        );

        let event = NormalizedEvent::from_log_entry(&log_entry, EventSourceKind::Stderr, 2);

        assert!(event.is_error());
        assert_eq!(event.message(), "relation \"missing_table\" does not exist");
        match &event.kind {
            EventKind::Error(error) => {
                assert_eq!(error.message, "relation \"missing_table\" does not exist");
                assert_eq!(error.sqlstate, None);
            }
            other => panic!("expected error event, got {other:?}"),
        }
    }

    #[test]
    fn converts_non_special_entries_into_log_events() {
        let log_entry = make_entry(
            LogLevel::Warning,
            "there is no transaction in progress",
            None,
            None,
        );

        let event = NormalizedEvent::from_log_entry(&log_entry, EventSourceKind::Stderr, 3);

        assert!(!event.is_query());
        assert!(!event.is_error());
        assert_eq!(event.message(), "there is no transaction in progress");
        match event.kind {
            EventKind::Log { level, message } => {
                assert_eq!(level, LogLevel::Warning);
                assert_eq!(message, "there is no transaction in progress");
            }
            other => panic!("expected log event, got {other:?}"),
        }
    }

    #[test]
    fn normalizes_log_entries_with_stable_source_references() {
        let entries = vec![
            make_entry(LogLevel::Log, "connection received", None, None),
            make_entry(
                LogLevel::Statement,
                "statement: SELECT 1",
                Some(1.5),
                crate::Query::from_sql("SELECT 1").ok(),
            ),
            make_entry(LogLevel::Duration, "duration: 1.5 ms", Some(1.5), None),
        ];

        let events = normalize_log_entries(&entries, EventSourceKind::Stderr);

        assert_eq!(events.len(), 3);
        // Ids and record indices must follow the input positions 0, 1, 2.
        for (index, event) in events.iter().enumerate() {
            assert_eq!(event.event_id, format!("stderr:{index}"));
            assert_eq!(event.source.record_index, index);
        }
    }

    #[test]
    fn event_ids_include_source_kind_prefixes() {
        let log_entry = make_entry(LogLevel::Log, "checkpoint complete", None, None);

        let cases = [
            (EventSourceKind::Csvlog, 4, "csvlog:4"),
            (EventSourceKind::Jsonlog, 5, "jsonlog:5"),
        ];
        for (source_kind, record_index, expected_id) in cases {
            let event = NormalizedEvent::from_log_entry(&log_entry, source_kind, record_index);
            assert_eq!(event.event_id, expected_id);
        }
    }
}