Skip to main content

pg_logstats/
events.rs

//! Normalized event model for PostgreSQL log analysis.
//!
//! This layer sits above the raw parser output so that workflows and analytics
//! do not depend directly on the legacy `LogEntry` structure.
5
6use crate::{LogEntry, LogLevel, Query};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9
10/// The parser/source format that produced an event.
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
12pub enum EventSourceKind {
13    Stderr,
14    Csvlog,
15    Jsonlog,
16}
17
18/// Stable pointer back to the raw source record that produced an event.
19#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
20pub struct SourceReference {
21    pub source_kind: EventSourceKind,
22    pub record_index: usize,
23}
24
25/// Identity metadata carried across related PostgreSQL events.
26#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
27pub struct SessionIdentity {
28    pub process_id: String,
29    pub user: Option<String>,
30    pub database: Option<String>,
31    pub client_host: Option<String>,
32    pub application_name: Option<String>,
33}
34
35/// Structured statement payload.
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct StatementEvent {
38    pub statement: String,
39    pub queries: Vec<Query>,
40    pub duration_ms: Option<f64>,
41}
42
43/// Structured duration payload.
44#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
45pub struct DurationEvent {
46    pub duration_ms: f64,
47}
48
49/// Structured error payload.
50#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
51pub struct ErrorEvent {
52    pub message: String,
53    pub sqlstate: Option<String>,
54}
55
56/// Normalized event kinds for investigation workflows.
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub enum EventKind {
59    Statement(StatementEvent),
60    Duration(DurationEvent),
61    Error(ErrorEvent),
62    Log { level: LogLevel, message: String },
63}
64
65/// Normalized PostgreSQL event.
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct NormalizedEvent {
68    pub event_id: String,
69    pub timestamp: DateTime<Utc>,
70    pub source: SourceReference,
71    pub session: SessionIdentity,
72    pub queryid: Option<String>,
73    pub kind: EventKind,
74}
75
76impl NormalizedEvent {
77    pub fn from_log_entry(
78        entry: &LogEntry,
79        source_kind: EventSourceKind,
80        record_index: usize,
81    ) -> Self {
82        let source = SourceReference {
83            source_kind,
84            record_index,
85        };
86
87        let session = SessionIdentity {
88            process_id: entry.process_id.clone(),
89            user: entry.user.clone(),
90            database: entry.database.clone(),
91            client_host: entry.client_host.clone(),
92            application_name: entry.application_name.clone(),
93        };
94
95        let kind = if entry.is_query() {
96            EventKind::Statement(StatementEvent {
97                statement: entry
98                    .message
99                    .strip_prefix("statement: ")
100                    .unwrap_or(&entry.message)
101                    .to_string(),
102                queries: entry.queries.clone().unwrap_or_default(),
103                duration_ms: entry.duration,
104            })
105        } else if entry.is_duration() {
106            EventKind::Duration(DurationEvent {
107                duration_ms: entry.duration.unwrap_or(0.0),
108            })
109        } else if entry.is_error() {
110            EventKind::Error(ErrorEvent {
111                message: entry.message.clone(),
112                sqlstate: None,
113            })
114        } else {
115            EventKind::Log {
116                level: entry.message_type.clone(),
117                message: entry.message.clone(),
118            }
119        };
120
121        Self {
122            event_id: format!(
123                "{}:{}",
124                match source_kind {
125                    EventSourceKind::Stderr => "stderr",
126                    EventSourceKind::Csvlog => "csvlog",
127                    EventSourceKind::Jsonlog => "jsonlog",
128                },
129                record_index
130            ),
131            timestamp: entry.timestamp,
132            source,
133            session,
134            queryid: None,
135            kind,
136        }
137    }
138
139    pub fn is_query(&self) -> bool {
140        matches!(self.kind, EventKind::Statement(_))
141    }
142
143    pub fn is_error(&self) -> bool {
144        matches!(self.kind, EventKind::Error(_))
145    }
146
147    pub fn duration_ms(&self) -> Option<f64> {
148        match &self.kind {
149            EventKind::Statement(statement) => statement.duration_ms,
150            EventKind::Duration(duration) => Some(duration.duration_ms),
151            _ => None,
152        }
153    }
154
155    pub fn queries(&self) -> Option<&[Query]> {
156        match &self.kind {
157            EventKind::Statement(statement) => Some(&statement.queries),
158            _ => None,
159        }
160    }
161
162    pub fn normalized_query(&self) -> Option<String> {
163        let queries = self.queries()?;
164        if queries.is_empty() {
165            return None;
166        }
167
168        Some(
169            queries
170                .iter()
171                .map(|query| query.normalized_query.clone())
172                .collect::<Vec<_>>()
173                .join(";"),
174        )
175    }
176
177    pub fn message(&self) -> &str {
178        match &self.kind {
179            EventKind::Statement(statement) => &statement.statement,
180            EventKind::Duration(_) => "",
181            EventKind::Error(error) => &error.message,
182            EventKind::Log { message, .. } => message,
183        }
184    }
185}
186
187pub fn normalize_log_entries(
188    entries: &[LogEntry],
189    source_kind: EventSourceKind,
190) -> Vec<NormalizedEvent> {
191    entries
192        .iter()
193        .enumerate()
194        .map(|(record_index, entry)| {
195            NormalizedEvent::from_log_entry(entry, source_kind, record_index)
196        })
197        .collect()
198}
199
200#[cfg(test)]
201mod tests {
202    use super::*;
203    use crate::{LogEntry, LogLevel};
204    use chrono::TimeZone;
205
206    fn entry(
207        message_type: LogLevel,
208        message: &str,
209        duration: Option<f64>,
210        queries: Option<Vec<Query>>,
211    ) -> LogEntry {
212        LogEntry {
213            timestamp: Utc.with_ymd_and_hms(2024, 8, 15, 10, 30, 0).unwrap(),
214            process_id: "12345".to_string(),
215            user: Some("postgres".to_string()),
216            database: Some("testdb".to_string()),
217            client_host: Some("10.0.0.10".to_string()),
218            application_name: Some("psql".to_string()),
219            message_type,
220            message: message.to_string(),
221            queries,
222            duration,
223        }
224    }
225
226    #[test]
227    fn converts_statement_entries_into_normalized_events() {
228        let entry = entry(
229            LogLevel::Statement,
230            "statement: SELECT * FROM users WHERE id = 1",
231            Some(42.0),
232            crate::Query::from_sql("SELECT * FROM users WHERE id = 1").ok(),
233        );
234
235        let event = NormalizedEvent::from_log_entry(&entry, EventSourceKind::Stderr, 7);
236
237        assert_eq!(event.event_id, "stderr:7");
238        assert_eq!(event.source.record_index, 7);
239        assert_eq!(event.source.source_kind, EventSourceKind::Stderr);
240        assert_eq!(event.session.process_id, "12345");
241        assert_eq!(event.session.user.as_deref(), Some("postgres"));
242        assert_eq!(event.session.database.as_deref(), Some("testdb"));
243        assert_eq!(event.session.client_host.as_deref(), Some("10.0.0.10"));
244        assert_eq!(event.session.application_name.as_deref(), Some("psql"));
245        assert!(event.is_query());
246        assert_eq!(event.duration_ms(), Some(42.0));
247        assert_eq!(event.message(), "SELECT * FROM users WHERE id = 1");
248        assert_eq!(
249            event.normalized_query().as_deref(),
250            Some("SELECT * FROM users WHERE id = ?")
251        );
252    }
253
254    #[test]
255    fn converts_duration_entries_into_duration_events() {
256        let entry = entry(
257            LogLevel::Duration,
258            "duration: 15.234 ms",
259            Some(15.234),
260            None,
261        );
262
263        let event = NormalizedEvent::from_log_entry(&entry, EventSourceKind::Stderr, 1);
264
265        assert_eq!(event.event_id, "stderr:1");
266        assert!(!event.is_query());
267        assert_eq!(event.duration_ms(), Some(15.234));
268        assert!(matches!(
269            event.kind,
270            EventKind::Duration(DurationEvent {
271                duration_ms: 15.234
272            })
273        ));
274    }
275
276    #[test]
277    fn converts_error_entries_into_error_events() {
278        let entry = entry(
279            LogLevel::Error,
280            "relation \"missing_table\" does not exist",
281            None,
282            None,
283        );
284
285        let event = NormalizedEvent::from_log_entry(&entry, EventSourceKind::Stderr, 2);
286
287        assert!(event.is_error());
288        assert_eq!(event.message(), "relation \"missing_table\" does not exist");
289        match event.kind {
290            EventKind::Error(error) => {
291                assert_eq!(error.message, "relation \"missing_table\" does not exist");
292                assert_eq!(error.sqlstate, None);
293            }
294            other => panic!("expected error event, got {other:?}"),
295        }
296    }
297
298    #[test]
299    fn converts_non_special_entries_into_log_events() {
300        let entry = entry(
301            LogLevel::Warning,
302            "there is no transaction in progress",
303            None,
304            None,
305        );
306
307        let event = NormalizedEvent::from_log_entry(&entry, EventSourceKind::Stderr, 3);
308
309        assert!(!event.is_query());
310        assert!(!event.is_error());
311        assert_eq!(event.message(), "there is no transaction in progress");
312        match event.kind {
313            EventKind::Log { level, message } => {
314                assert_eq!(level, LogLevel::Warning);
315                assert_eq!(message, "there is no transaction in progress");
316            }
317            other => panic!("expected log event, got {other:?}"),
318        }
319    }
320
321    #[test]
322    fn normalizes_log_entries_with_stable_source_references() {
323        let entries = vec![
324            entry(LogLevel::Log, "connection received", None, None),
325            entry(
326                LogLevel::Statement,
327                "statement: SELECT 1",
328                Some(1.5),
329                crate::Query::from_sql("SELECT 1").ok(),
330            ),
331            entry(LogLevel::Duration, "duration: 1.5 ms", Some(1.5), None),
332        ];
333
334        let events = normalize_log_entries(&entries, EventSourceKind::Stderr);
335
336        assert_eq!(events.len(), 3);
337        assert_eq!(events[0].event_id, "stderr:0");
338        assert_eq!(events[1].event_id, "stderr:1");
339        assert_eq!(events[2].event_id, "stderr:2");
340        assert_eq!(events[0].source.record_index, 0);
341        assert_eq!(events[1].source.record_index, 1);
342        assert_eq!(events[2].source.record_index, 2);
343    }
344
345    #[test]
346    fn event_ids_include_source_kind_prefixes() {
347        let entry = entry(LogLevel::Log, "checkpoint complete", None, None);
348
349        let csv_event = NormalizedEvent::from_log_entry(&entry, EventSourceKind::Csvlog, 4);
350        let json_event = NormalizedEvent::from_log_entry(&entry, EventSourceKind::Jsonlog, 5);
351
352        assert_eq!(csv_event.event_id, "csvlog:4");
353        assert_eq!(json_event.event_id, "jsonlog:5");
354    }
355}