1use crate::{EventKind, NormalizedEvent, Query, SessionIdentity, SourceReference, StatementEvent};
9use chrono::{DateTime, Utc};
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12
13#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
15pub struct QueryFamilyIdentity {
16 pub family_id: String,
17 pub normalized_sql: String,
18 pub database: Option<String>,
19 pub user: Option<String>,
20 pub application_name: Option<String>,
21 pub queryid: Option<String>,
22}
23
24impl QueryFamilyIdentity {
25 pub fn new(normalized_sql: String, session: &SessionIdentity, queryid: Option<String>) -> Self {
26 let family_id = format!(
27 "queryid={}|db={}|user={}|app={}|sql={}",
28 queryid.as_deref().unwrap_or(""),
29 session.database.as_deref().unwrap_or(""),
30 session.user.as_deref().unwrap_or(""),
31 session.application_name.as_deref().unwrap_or(""),
32 normalized_sql
33 );
34
35 Self {
36 family_id,
37 normalized_sql,
38 database: session.database.clone(),
39 user: session.user.clone(),
40 application_name: session.application_name.clone(),
41 queryid,
42 }
43 }
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
48pub enum CorrelationConfidence {
49 Exact,
50 StatementOnly,
51}
52
53#[derive(Debug, Clone, Serialize, Deserialize)]
55pub struct QueryExecution {
56 pub execution_id: String,
57 pub timestamp: DateTime<Utc>,
58 pub session: SessionIdentity,
59 pub statement: String,
60 pub queries: Vec<Query>,
61 pub query_family: QueryFamilyIdentity,
62 pub duration_ms: Option<f64>,
63 pub evidence: Vec<SourceReference>,
64 pub confidence: CorrelationConfidence,
65}
66
67#[derive(Debug, Clone)]
68struct PendingStatement {
69 event_id: String,
70 timestamp: DateTime<Utc>,
71 source: SourceReference,
72 session: SessionIdentity,
73 queryid: Option<String>,
74 statement: StatementEvent,
75}
76
77pub trait Correlator {
79 fn correlate(&self, events: &[NormalizedEvent]) -> Vec<QueryExecution>;
80}
81
#[derive(Debug, Clone, Copy, Default)]
/// Correlator that pairs a statement with the next duration event seen for the
/// same process id, walking the events in the order they are given.
pub struct ProcessOrderCorrelator;
89
90impl Correlator for ProcessOrderCorrelator {
91 fn correlate(&self, events: &[NormalizedEvent]) -> Vec<QueryExecution> {
92 correlate_by_process_order(events)
93 }
94}
95
96pub fn correlate_query_executions(events: &[NormalizedEvent]) -> Vec<QueryExecution> {
99 ProcessOrderCorrelator.correlate(events)
100}
101
102fn correlate_by_process_order(events: &[NormalizedEvent]) -> Vec<QueryExecution> {
103 let mut executions = Vec::new();
104 let mut pending_by_process: HashMap<String, PendingStatement> = HashMap::new();
105
106 for event in events {
107 match &event.kind {
108 EventKind::Statement(statement) => {
109 if let Some(previous) = pending_by_process.remove(&event.session.process_id) {
110 executions.push(execution_from_pending(
111 previous,
112 None,
113 None,
114 CorrelationConfidence::StatementOnly,
115 ));
116 }
117
118 if let Some(duration_ms) = statement.duration_ms {
119 executions.push(execution_from_statement_event(
120 event,
121 statement,
122 Some(duration_ms),
123 vec![event.source.clone()],
124 CorrelationConfidence::Exact,
125 ));
126 } else {
127 pending_by_process.insert(
128 event.session.process_id.clone(),
129 PendingStatement {
130 event_id: event.event_id.clone(),
131 timestamp: event.timestamp,
132 source: event.source.clone(),
133 session: event.session.clone(),
134 queryid: event.queryid.clone(),
135 statement: statement.clone(),
136 },
137 );
138 }
139 }
140 EventKind::Duration(duration) => {
141 if let Some(pending) = pending_by_process.remove(&event.session.process_id) {
142 if event.timestamp >= pending.timestamp {
143 executions.push(execution_from_pending(
144 pending,
145 Some(duration.duration_ms),
146 Some(event.source.clone()),
147 CorrelationConfidence::Exact,
148 ));
149 } else {
150 pending_by_process.insert(event.session.process_id.clone(), pending);
151 }
152 }
153 }
154 _ => {}
155 }
156 }
157
158 let mut remaining: Vec<_> = pending_by_process.into_values().collect();
159 remaining.sort_by_key(|pending| pending.timestamp);
160 for pending in remaining {
161 executions.push(execution_from_pending(
162 pending,
163 None,
164 None,
165 CorrelationConfidence::StatementOnly,
166 ));
167 }
168
169 executions.sort_by_key(|execution| execution.timestamp);
170 executions
171}
172
173fn execution_from_pending(
174 pending: PendingStatement,
175 duration_ms: Option<f64>,
176 duration_source: Option<SourceReference>,
177 confidence: CorrelationConfidence,
178) -> QueryExecution {
179 let mut evidence = vec![pending.source];
180 if let Some(duration_source) = duration_source {
181 evidence.push(duration_source);
182 }
183
184 let normalized_sql = normalized_sql(&pending.statement);
185 let query_family = QueryFamilyIdentity::new(normalized_sql, &pending.session, pending.queryid);
186
187 QueryExecution {
188 execution_id: pending.event_id,
189 timestamp: pending.timestamp,
190 session: pending.session,
191 statement: pending.statement.statement,
192 queries: pending.statement.queries,
193 query_family,
194 duration_ms,
195 evidence,
196 confidence,
197 }
198}
199
200fn execution_from_statement_event(
201 event: &NormalizedEvent,
202 statement: &StatementEvent,
203 duration_ms: Option<f64>,
204 evidence: Vec<SourceReference>,
205 confidence: CorrelationConfidence,
206) -> QueryExecution {
207 let normalized_sql = normalized_sql(statement);
208 let query_family =
209 QueryFamilyIdentity::new(normalized_sql, &event.session, event.queryid.clone());
210
211 QueryExecution {
212 execution_id: event.event_id.clone(),
213 timestamp: event.timestamp,
214 session: event.session.clone(),
215 statement: statement.statement.clone(),
216 queries: statement.queries.clone(),
217 query_family,
218 duration_ms,
219 evidence,
220 confidence,
221 }
222}
223
224fn normalized_sql(statement: &StatementEvent) -> String {
225 if statement.queries.is_empty() {
226 statement.statement.clone()
227 } else {
228 statement
229 .queries
230 .iter()
231 .map(|query| query.normalized_query.clone())
232 .collect::<Vec<_>>()
233 .join(";")
234 }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        DurationEvent, EventKind, EventSourceKind, NormalizedEvent, Query, SourceReference,
    };
    use chrono::{Duration, TimeZone};

    /// SQL used by most tests as the "first" statement.
    const USERS_SQL: &str = "SELECT * FROM users WHERE id = 1";

    /// Session fixture with fixed user and application name.
    fn session(process_id: &str, database: &str) -> SessionIdentity {
        SessionIdentity {
            process_id: process_id.to_string(),
            user: Some("postgres".to_string()),
            database: Some(database.to_string()),
            client_host: None,
            application_name: Some("psql".to_string()),
        }
    }

    /// Fixed base instant plus `index` milliseconds, so a larger index means a later event.
    fn timestamp_at(index: usize) -> DateTime<Utc> {
        let base = Utc.with_ymd_and_hms(2024, 8, 15, 10, 30, 0).unwrap();
        base + Duration::milliseconds(index as i64)
    }

    /// Event envelope shared by the statement and duration fixtures.
    fn stderr_event(index: usize, process_id: &str, kind: EventKind) -> NormalizedEvent {
        NormalizedEvent {
            event_id: format!("stderr:{index}"),
            timestamp: timestamp_at(index),
            source: SourceReference {
                source_kind: EventSourceKind::Stderr,
                record_index: index,
            },
            session: session(process_id, "testdb"),
            queryid: None,
            kind,
        }
    }

    /// Statement event without an inline duration.
    fn statement_event(index: usize, process_id: &str, sql: &str) -> NormalizedEvent {
        let payload = StatementEvent {
            statement: sql.to_string(),
            queries: Query::from_sql(sql).unwrap(),
            duration_ms: None,
        };
        stderr_event(index, process_id, EventKind::Statement(payload))
    }

    /// Stand-alone duration event.
    fn duration_event(index: usize, process_id: &str, duration_ms: f64) -> NormalizedEvent {
        stderr_event(
            index,
            process_id,
            EventKind::Duration(DurationEvent { duration_ms }),
        )
    }

    #[test]
    fn pairs_statement_with_following_duration_on_same_process() {
        let events = vec![
            statement_event(0, "12345", USERS_SQL),
            duration_event(1, "12345", 42.5),
        ];

        let executions = ProcessOrderCorrelator.correlate(&events);

        assert_eq!(executions.len(), 1);
        let paired = &executions[0];
        assert_eq!(paired.duration_ms, Some(42.5));
        assert_eq!(paired.confidence, CorrelationConfidence::Exact);
        assert_eq!(paired.evidence.len(), 2);
        assert_eq!(paired.evidence[0].record_index, 0);
        assert_eq!(paired.evidence[1].record_index, 1);
    }

    #[test]
    fn default_correlation_function_uses_process_order_strategy() {
        let events = vec![
            statement_event(0, "12345", USERS_SQL),
            duration_event(1, "12345", 42.5),
        ];

        let via_function = correlate_query_executions(&events);
        let via_strategy = ProcessOrderCorrelator.correlate(&events);

        assert_eq!(via_function.len(), via_strategy.len());
        assert_eq!(via_function[0].duration_ms, via_strategy[0].duration_ms);
        assert_eq!(via_function[0].evidence, via_strategy[0].evidence);
    }

    #[test]
    fn does_not_pair_duration_from_another_process() {
        let events = vec![
            statement_event(0, "11111", USERS_SQL),
            duration_event(1, "22222", 42.5),
        ];

        let executions = correlate_query_executions(&events);

        assert_eq!(executions.len(), 1);
        let unpaired = &executions[0];
        assert_eq!(unpaired.duration_ms, None);
        assert_eq!(unpaired.confidence, CorrelationConfidence::StatementOnly);
        assert_eq!(unpaired.evidence.len(), 1);
    }

    #[test]
    fn flushes_previous_pending_statement_when_same_process_starts_new_statement() {
        let events = vec![
            statement_event(0, "12345", USERS_SQL),
            statement_event(1, "12345", "SELECT * FROM posts WHERE id = 2"),
            duration_event(2, "12345", 12.0),
        ];

        let executions = correlate_query_executions(&events);

        assert_eq!(executions.len(), 2);

        // The first statement was superseded before any duration arrived.
        let flushed = &executions[0];
        assert_eq!(flushed.duration_ms, None);
        assert_eq!(flushed.confidence, CorrelationConfidence::StatementOnly);

        // The second statement receives the duration that followed it.
        let paired = &executions[1];
        assert_eq!(paired.duration_ms, Some(12.0));
        assert_eq!(paired.confidence, CorrelationConfidence::Exact);
    }

    #[test]
    fn query_family_identity_includes_normalized_sql_and_metadata() {
        let mut statement = statement_event(0, "12345", USERS_SQL);
        statement.session.database = Some("analytics".to_string());
        statement.session.user = Some("reporter".to_string());
        statement.session.application_name = Some("dashboard".to_string());
        let events = vec![statement, duration_event(1, "12345", 5.0)];

        let executions = correlate_query_executions(&events);
        let family = &executions[0].query_family;

        assert_eq!(family.normalized_sql, "SELECT * FROM users WHERE id = ?");
        assert_eq!(family.database.as_deref(), Some("analytics"));
        assert_eq!(family.user.as_deref(), Some("reporter"));
        assert_eq!(family.application_name.as_deref(), Some("dashboard"));
        assert_eq!(
            family.family_id,
            "queryid=|db=analytics|user=reporter|app=dashboard|sql=SELECT * FROM users WHERE id = ?"
        );
    }
}