Skip to main content

pg_logstats/
correlation.rs

1//! Correlation layer for normalized PostgreSQL events.
2//!
3//! The first correlation slice pairs statement events with following duration
4//! events from the same backend process. This is intentionally modeled as a
5//! strategy so structured log implementations can later use stronger keys such
6//! as session ID and per-session line number.
7
8use crate::{EventKind, NormalizedEvent, Query, SessionIdentity, SourceReference, StatementEvent};
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12
13/// A deterministic grouping key for related executions of the same query shape.
14#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
15pub struct QueryFamilyIdentity {
16    pub family_id: String,
17    pub normalized_sql: String,
18    pub database: Option<String>,
19    pub user: Option<String>,
20    pub application_name: Option<String>,
21    pub queryid: Option<String>,
22}
23
24impl QueryFamilyIdentity {
25    pub fn new(normalized_sql: String, session: &SessionIdentity, queryid: Option<String>) -> Self {
26        let family_id = format!(
27            "queryid={}|db={}|user={}|app={}|sql={}",
28            queryid.as_deref().unwrap_or(""),
29            session.database.as_deref().unwrap_or(""),
30            session.user.as_deref().unwrap_or(""),
31            session.application_name.as_deref().unwrap_or(""),
32            normalized_sql
33        );
34
35        Self {
36            family_id,
37            normalized_sql,
38            database: session.database.clone(),
39            user: session.user.clone(),
40            application_name: session.application_name.clone(),
41            queryid,
42        }
43    }
44}
45
46/// How confidently the execution was reconstructed from raw events.
47#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
48pub enum CorrelationConfidence {
49    Exact,
50    StatementOnly,
51}
52
53/// A correlated query execution suitable for analytics and findings.
54#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct QueryExecution {
56    pub execution_id: String,
57    pub timestamp: DateTime<Utc>,
58    pub session: SessionIdentity,
59    pub statement: String,
60    pub queries: Vec<Query>,
61    pub query_family: QueryFamilyIdentity,
62    pub duration_ms: Option<f64>,
63    pub evidence: Vec<SourceReference>,
64    pub confidence: CorrelationConfidence,
65}
66
67#[derive(Debug, Clone)]
68struct PendingStatement {
69    event_id: String,
70    timestamp: DateTime<Utc>,
71    source: SourceReference,
72    session: SessionIdentity,
73    queryid: Option<String>,
74    statement: StatementEvent,
75}
76
77/// Strategy interface for reconstructing higher-level query executions.
78pub trait Correlator {
79    fn correlate(&self, events: &[NormalizedEvent]) -> Vec<QueryExecution>;
80}
81
/// Correlator that pairs statement and duration events by process ID, relying
/// on the order in which events appear in the stream.
///
/// It matches how stderr logs are parsed today and remains the fallback for
/// structured logs that lack session line numbers. Future csvlog, jsonlog,
/// RDS and GCP correlators should use session-aware keys where available.
#[derive(Clone, Copy, Debug, Default)]
pub struct ProcessOrderCorrelator;
89
90impl Correlator for ProcessOrderCorrelator {
91    fn correlate(&self, events: &[NormalizedEvent]) -> Vec<QueryExecution> {
92        correlate_by_process_order(events)
93    }
94}
95
96/// Pair statement and duration events into query executions using the default
97/// process-order strategy.
98pub fn correlate_query_executions(events: &[NormalizedEvent]) -> Vec<QueryExecution> {
99    ProcessOrderCorrelator.correlate(events)
100}
101
102fn correlate_by_process_order(events: &[NormalizedEvent]) -> Vec<QueryExecution> {
103    let mut executions = Vec::new();
104    let mut pending_by_process: HashMap<String, PendingStatement> = HashMap::new();
105
106    for event in events {
107        match &event.kind {
108            EventKind::Statement(statement) => {
109                if let Some(previous) = pending_by_process.remove(&event.session.process_id) {
110                    executions.push(execution_from_pending(
111                        previous,
112                        None,
113                        None,
114                        CorrelationConfidence::StatementOnly,
115                    ));
116                }
117
118                if let Some(duration_ms) = statement.duration_ms {
119                    executions.push(execution_from_statement_event(
120                        event,
121                        statement,
122                        Some(duration_ms),
123                        vec![event.source.clone()],
124                        CorrelationConfidence::Exact,
125                    ));
126                } else {
127                    pending_by_process.insert(
128                        event.session.process_id.clone(),
129                        PendingStatement {
130                            event_id: event.event_id.clone(),
131                            timestamp: event.timestamp,
132                            source: event.source.clone(),
133                            session: event.session.clone(),
134                            queryid: event.queryid.clone(),
135                            statement: statement.clone(),
136                        },
137                    );
138                }
139            }
140            EventKind::Duration(duration) => {
141                if let Some(pending) = pending_by_process.remove(&event.session.process_id) {
142                    if event.timestamp >= pending.timestamp {
143                        executions.push(execution_from_pending(
144                            pending,
145                            Some(duration.duration_ms),
146                            Some(event.source.clone()),
147                            CorrelationConfidence::Exact,
148                        ));
149                    } else {
150                        pending_by_process.insert(event.session.process_id.clone(), pending);
151                    }
152                }
153            }
154            _ => {}
155        }
156    }
157
158    let mut remaining: Vec<_> = pending_by_process.into_values().collect();
159    remaining.sort_by_key(|pending| pending.timestamp);
160    for pending in remaining {
161        executions.push(execution_from_pending(
162            pending,
163            None,
164            None,
165            CorrelationConfidence::StatementOnly,
166        ));
167    }
168
169    executions.sort_by_key(|execution| execution.timestamp);
170    executions
171}
172
173fn execution_from_pending(
174    pending: PendingStatement,
175    duration_ms: Option<f64>,
176    duration_source: Option<SourceReference>,
177    confidence: CorrelationConfidence,
178) -> QueryExecution {
179    let mut evidence = vec![pending.source];
180    if let Some(duration_source) = duration_source {
181        evidence.push(duration_source);
182    }
183
184    let normalized_sql = normalized_sql(&pending.statement);
185    let query_family = QueryFamilyIdentity::new(normalized_sql, &pending.session, pending.queryid);
186
187    QueryExecution {
188        execution_id: pending.event_id,
189        timestamp: pending.timestamp,
190        session: pending.session,
191        statement: pending.statement.statement,
192        queries: pending.statement.queries,
193        query_family,
194        duration_ms,
195        evidence,
196        confidence,
197    }
198}
199
200fn execution_from_statement_event(
201    event: &NormalizedEvent,
202    statement: &StatementEvent,
203    duration_ms: Option<f64>,
204    evidence: Vec<SourceReference>,
205    confidence: CorrelationConfidence,
206) -> QueryExecution {
207    let normalized_sql = normalized_sql(statement);
208    let query_family =
209        QueryFamilyIdentity::new(normalized_sql, &event.session, event.queryid.clone());
210
211    QueryExecution {
212        execution_id: event.event_id.clone(),
213        timestamp: event.timestamp,
214        session: event.session.clone(),
215        statement: statement.statement.clone(),
216        queries: statement.queries.clone(),
217        query_family,
218        duration_ms,
219        evidence,
220        confidence,
221    }
222}
223
224fn normalized_sql(statement: &StatementEvent) -> String {
225    if statement.queries.is_empty() {
226        statement.statement.clone()
227    } else {
228        statement
229            .queries
230            .iter()
231            .map(|query| query.normalized_query.clone())
232            .collect::<Vec<_>>()
233            .join(";")
234    }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        DurationEvent, EventKind, EventSourceKind, NormalizedEvent, Query, SourceReference,
    };
    use chrono::{Duration, TimeZone};

    const USERS_SQL: &str = "SELECT * FROM users WHERE id = 1";

    /// Session for `process_id` on `database`, as user `postgres` via `psql`.
    fn session(process_id: &str, database: &str) -> SessionIdentity {
        SessionIdentity {
            process_id: process_id.to_string(),
            user: Some("postgres".to_string()),
            database: Some(database.to_string()),
            client_host: None,
            application_name: Some("psql".to_string()),
        }
    }

    /// Builds a stderr-sourced event for record `index`. The timestamp advances
    /// one millisecond per index, so stream order and time order agree unless a
    /// test deliberately passes indices out of order.
    fn event(index: usize, process_id: &str, kind: EventKind) -> NormalizedEvent {
        NormalizedEvent {
            event_id: format!("stderr:{index}"),
            timestamp: Utc.with_ymd_and_hms(2024, 8, 15, 10, 30, 0).unwrap()
                + Duration::milliseconds(index as i64),
            source: SourceReference {
                source_kind: EventSourceKind::Stderr,
                record_index: index,
            },
            session: session(process_id, "testdb"),
            queryid: None,
            kind,
        }
    }

    /// Statement payload for `sql`, optionally carrying an inline duration.
    fn statement_kind(sql: &str, duration_ms: Option<f64>) -> EventKind {
        EventKind::Statement(StatementEvent {
            statement: sql.to_string(),
            queries: Query::from_sql(sql).expect("test SQL should parse"),
            duration_ms,
        })
    }

    fn statement_event(index: usize, process_id: &str, sql: &str) -> NormalizedEvent {
        event(index, process_id, statement_kind(sql, None))
    }

    fn duration_event(index: usize, process_id: &str, duration_ms: f64) -> NormalizedEvent {
        event(index, process_id, EventKind::Duration(DurationEvent { duration_ms }))
    }

    #[test]
    fn pairs_statement_with_following_duration_on_same_process() {
        let events = vec![
            statement_event(0, "12345", USERS_SQL),
            duration_event(1, "12345", 42.5),
        ];

        let executions = ProcessOrderCorrelator.correlate(&events);

        assert_eq!(executions.len(), 1);
        assert_eq!(executions[0].duration_ms, Some(42.5));
        assert_eq!(executions[0].confidence, CorrelationConfidence::Exact);
        assert_eq!(executions[0].evidence.len(), 2);
        assert_eq!(executions[0].evidence[0].record_index, 0);
        assert_eq!(executions[0].evidence[1].record_index, 1);
    }

    #[test]
    fn default_correlation_function_uses_process_order_strategy() {
        let events = vec![
            statement_event(0, "12345", USERS_SQL),
            duration_event(1, "12345", 42.5),
        ];

        let via_function = correlate_query_executions(&events);
        let via_strategy = ProcessOrderCorrelator.correlate(&events);

        assert_eq!(via_function.len(), via_strategy.len());
        assert_eq!(via_function[0].duration_ms, via_strategy[0].duration_ms);
        assert_eq!(via_function[0].evidence, via_strategy[0].evidence);
    }

    #[test]
    fn does_not_pair_duration_from_another_process() {
        let events = vec![
            statement_event(0, "11111", USERS_SQL),
            duration_event(1, "22222", 42.5),
        ];

        let executions = correlate_query_executions(&events);

        assert_eq!(executions.len(), 1);
        assert_eq!(executions[0].duration_ms, None);
        assert_eq!(
            executions[0].confidence,
            CorrelationConfidence::StatementOnly
        );
        assert_eq!(executions[0].evidence.len(), 1);
    }

    #[test]
    fn flushes_previous_pending_statement_when_same_process_starts_new_statement() {
        let events = vec![
            statement_event(0, "12345", USERS_SQL),
            statement_event(1, "12345", "SELECT * FROM posts WHERE id = 2"),
            duration_event(2, "12345", 12.0),
        ];

        let executions = correlate_query_executions(&events);

        assert_eq!(executions.len(), 2);
        assert_eq!(executions[0].duration_ms, None);
        assert_eq!(
            executions[0].confidence,
            CorrelationConfidence::StatementOnly
        );
        assert_eq!(executions[1].duration_ms, Some(12.0));
        assert_eq!(executions[1].confidence, CorrelationConfidence::Exact);
    }

    #[test]
    fn statement_with_inline_duration_is_exact_without_duration_event() {
        let inline = event(0, "12345", statement_kind(USERS_SQL, Some(3.5)));

        let executions = correlate_query_executions(&[inline]);

        assert_eq!(executions.len(), 1);
        assert_eq!(executions[0].duration_ms, Some(3.5));
        assert_eq!(executions[0].confidence, CorrelationConfidence::Exact);
        assert_eq!(executions[0].evidence.len(), 1);
    }

    #[test]
    fn ignores_duration_logged_before_pending_statement() {
        let events = vec![
            statement_event(5, "12345", USERS_SQL),
            duration_event(1, "12345", 9.0),
        ];

        let executions = correlate_query_executions(&events);

        assert_eq!(executions.len(), 1);
        assert_eq!(executions[0].duration_ms, None);
        assert_eq!(
            executions[0].confidence,
            CorrelationConfidence::StatementOnly
        );
    }

    #[test]
    fn drops_duration_without_pending_statement() {
        let executions = correlate_query_executions(&[duration_event(0, "12345", 1.0)]);

        assert!(executions.is_empty());
    }

    #[test]
    fn query_family_identity_includes_normalized_sql_and_metadata() {
        let mut first = statement_event(0, "12345", USERS_SQL);
        first.session.database = Some("analytics".to_string());
        first.session.user = Some("reporter".to_string());
        first.session.application_name = Some("dashboard".to_string());
        let events = vec![first, duration_event(1, "12345", 5.0)];

        let executions = correlate_query_executions(&events);
        let family = &executions[0].query_family;

        assert_eq!(family.normalized_sql, "SELECT * FROM users WHERE id = ?");
        assert_eq!(family.database.as_deref(), Some("analytics"));
        assert_eq!(family.user.as_deref(), Some("reporter"));
        assert_eq!(family.application_name.as_deref(), Some("dashboard"));
        assert_eq!(
            family.family_id,
            "queryid=|db=analytics|user=reporter|app=dashboard|sql=SELECT * FROM users WHERE id = ?"
        );
    }
}