use async_trait::async_trait;
use datafusion::arrow::array::{
    Array, ArrayRef, Int64Array, StringArray, TimestampMillisecondArray,
};
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionConfig;
use datafusion::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{debug, instrument};

use super::{MetricsQuery, MetricsRepository, ResultKey};
use crate::analyzers::context::AnalyzerContext;
use crate::error::{Result, TermError};

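/// Executes repository queries by loading `(ResultKey, AnalyzerContext)` pairs
/// into an in-memory Arrow table and filtering them with DataFusion SQL.
///
/// A minimal usage sketch (the data vector is assumed to come from a
/// `MetricsRepository`; see the tests below for concrete examples):
///
/// ```ignore
/// let results = DataFusionQueryExecutor::execute_optimized_query(
///     data,             // Vec<(ResultKey, AnalyzerContext)>
///     None,             // before
///     None,             // after
///     &HashMap::new(),  // tag filters
///     &None,            // analyzer filter
///     Some(10),         // limit
///     None,             // offset
///     false,            // ascending
/// )
/// .await?;
/// ```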
pub struct DataFusionQueryExecutor;

impl DataFusionQueryExecutor {
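    /// Builds a `SessionContext` tuned for these in-memory queries: the
    /// information schema is enabled, the default catalog/schema is set to
    /// `term.repository`, and target partitions match the available CPU cores.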
    fn create_optimized_context() -> SessionContext {
        let config = SessionConfig::new()
            .with_information_schema(true)
            .with_default_catalog_and_schema("term", "repository")
            .with_target_partitions(num_cpus::get())
            .with_batch_size(8192);

        SessionContext::new_with_config(config)
    }

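    /// Runs a filtered, sorted, and paginated query over the supplied data.
    ///
    /// The data is converted to an Arrow `RecordBatch`, registered as an
    /// in-memory table, queried with generated SQL, and then mapped back to
    /// the original `(ResultKey, AnalyzerContext)` pairs via a row index.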
    #[instrument(skip(data, tags, analyzers), fields(
        data_size = data.len(),
        filter_count = tags.len() + analyzers.as_ref().map(|a| a.len()).unwrap_or(0),
        time_range = format_args!("{:?}-{:?}", after, before),
        limit = limit,
        offset = offset
    ))]
    #[allow(clippy::too_many_arguments)]
    pub async fn execute_optimized_query(
        data: Vec<(ResultKey, AnalyzerContext)>,
        before: Option<i64>,
        after: Option<i64>,
        tags: &HashMap<String, String>,
        analyzers: &Option<Vec<String>>,
        limit: Option<usize>,
        offset: Option<usize>,
        ascending: bool,
    ) -> Result<Vec<(ResultKey, AnalyzerContext)>> {
        if data.is_empty() {
            debug!("No data to query, returning empty results");
            return Ok(vec![]);
        }

        let ctx = Self::create_optimized_context();

        let record_batch = Self::create_record_batch(&data).map_err(|e| {
            TermError::repository_with_source(
                "datafusion",
                "execute_query",
                "Failed to convert repository data to Arrow format",
                Box::new(e),
            )
        })?;
        let schema = record_batch.schema();

        let table = MemTable::try_new(schema, vec![vec![record_batch]]).map_err(|e| {
            TermError::repository_with_source(
                "datafusion",
                "execute_query",
                "Failed to create DataFusion table from Arrow data",
                Box::new(e),
            )
        })?;

        let table_name = "metrics_data";
        ctx.register_table(table_name, Arc::new(table))
            .map_err(|e| {
                TermError::repository_with_source(
                    "datafusion",
                    "execute_query",
                    "Failed to register table with DataFusion context",
                    Box::new(e),
                )
            })?;

        let sql = Self::build_optimized_sql(
            table_name, before, after, tags, analyzers, limit, offset, ascending,
        )?;

        debug!("Executing DataFusion query: {}", sql);

        let df = ctx.sql(&sql).await.map_err(|e| {
            TermError::repository_with_source(
                "datafusion",
                "execute_query",
                format!("Failed to parse SQL query: {sql}"),
                Box::new(e),
            )
        })?;

        let results = df.collect().await.map_err(|e| {
            TermError::repository_with_source(
                "datafusion",
                "execute_query",
                "Failed to execute DataFusion query",
                Box::new(e),
            )
        })?;

        Self::convert_results_back(&data, results).await
    }

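    /// Converts the repository data into a single Arrow `RecordBatch`.
    ///
    /// The batch carries the timestamp, a `row_index` column pointing back
    /// into the original slice, a `has_metrics` flag, and one nullable
    /// `tag_<key>` string column per tag key seen anywhere in the data.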
    #[instrument(skip(data), fields(data_size = data.len()))]
    fn create_record_batch(data: &[(ResultKey, AnalyzerContext)]) -> Result<RecordBatch> {
        let len = data.len();

        let timestamps: Vec<i64> = data.iter().map(|(key, _)| key.timestamp).collect();
        let timestamp_array = TimestampMillisecondArray::from(timestamps);

        let indices: Vec<i64> = (0..len as i64).collect();
        let index_array = Int64Array::from(indices);

        let mut all_tag_keys: std::collections::HashSet<String> = std::collections::HashSet::new();
        for (key, _) in data {
            for tag_key in key.tags.keys() {
                all_tag_keys.insert(tag_key.clone());
            }
        }

        let mut tag_arrays: Vec<(String, ArrayRef)> = Vec::new();
        for tag_key in &all_tag_keys {
            let tag_values: Vec<Option<String>> = data
                .iter()
                .map(|(key, _)| key.tags.get(tag_key).cloned())
                .collect();
            let tag_array = StringArray::from(tag_values);
            tag_arrays.push((format!("tag_{tag_key}"), Arc::new(tag_array) as ArrayRef));
        }

        let has_metrics: Vec<bool> = data
            .iter()
            .map(|(_, ctx)| !ctx.all_metrics().is_empty())
            .collect();
        let metrics_array = Arc::new(
            has_metrics
                .iter()
                .map(|&has| if has { Some("true") } else { Some("false") })
                .collect::<StringArray>(),
        ) as ArrayRef;

        let mut fields = vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("row_index", DataType::Int64, false),
            Field::new("has_metrics", DataType::Utf8, true),
        ];

        for (tag_key, _) in &tag_arrays {
            fields.push(Field::new(tag_key, DataType::Utf8, true));
        }

        let schema = Arc::new(Schema::new(fields));

        let mut columns: Vec<ArrayRef> = vec![
            Arc::new(timestamp_array) as ArrayRef,
            Arc::new(index_array) as ArrayRef,
            metrics_array,
        ];

        for (_, array) in tag_arrays {
            columns.push(array);
        }

        RecordBatch::try_new(schema, columns).map_err(|e| {
            TermError::repository_with_source(
                "datafusion",
                "create_record_batch",
                format!("Failed to create Arrow RecordBatch for {len} rows"),
                Box::new(e),
            )
        })
    }

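    /// Builds the SQL statement that applies the time-range, tag, and
    /// analyzer filters, the `ORDER BY timestamp`, and LIMIT/OFFSET
    /// pagination. With a single `env=prod` tag filter, a limit of 10, and
    /// descending order, the generated statement is roughly
    /// `SELECT * FROM metrics_data WHERE 1=1 AND tag_env = 'prod' ORDER BY timestamp DESC LIMIT 10`.
    /// Tag keys and values are lightly sanitized by replacing quote
    /// characters with `_` before interpolation.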
    #[instrument(skip(tags, analyzers), fields(
        has_time_filter = before.is_some() || after.is_some(),
        tag_filter_count = tags.len(),
        has_analyzer_filter = analyzers.is_some()
    ))]
    #[allow(clippy::too_many_arguments)]
    fn build_optimized_sql(
        table_name: &str,
        before: Option<i64>,
        after: Option<i64>,
        tags: &HashMap<String, String>,
        analyzers: &Option<Vec<String>>,
        limit: Option<usize>,
        offset: Option<usize>,
        ascending: bool,
    ) -> Result<String> {
        let mut sql = format!("SELECT * FROM {table_name} WHERE 1=1");

        if let Some(before_ts) = before {
            sql.push_str(&format!(
                " AND timestamp < TIMESTAMP '{}'",
                chrono::DateTime::from_timestamp_millis(before_ts)
                    .unwrap_or_else(chrono::Utc::now)
                    .format("%Y-%m-%d %H:%M:%S%.3f")
            ));
        }
        if let Some(after_ts) = after {
            sql.push_str(&format!(
                " AND timestamp >= TIMESTAMP '{}'",
                chrono::DateTime::from_timestamp_millis(after_ts)
                    .unwrap_or_else(chrono::Utc::now)
                    .format("%Y-%m-%d %H:%M:%S%.3f")
            ));
        }

        for (tag_key, tag_value) in tags {
            let safe_key = tag_key.replace(['\'', '"'], "_");
            let safe_value = tag_value.replace(['\'', '"'], "_");
            sql.push_str(&format!(" AND tag_{safe_key} = '{safe_value}'"));
        }

        if let Some(_analyzer_list) = analyzers {
            sql.push_str(" AND has_metrics = 'true'");
        }

        let sort_direction = if ascending { "ASC" } else { "DESC" };
        sql.push_str(&format!(" ORDER BY timestamp {sort_direction}"));

        if let Some(limit_val) = limit {
            sql.push_str(&format!(" LIMIT {limit_val}"));
            if let Some(offset_val) = offset {
                sql.push_str(&format!(" OFFSET {offset_val}"));
            }
        }

        Ok(sql)
    }

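    /// Maps the filtered DataFusion rows back to the original
    /// `(ResultKey, AnalyzerContext)` pairs using the `row_index` column,
    /// preserving the order produced by the query.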
    #[instrument(skip(original_data, results))]
    async fn convert_results_back(
        original_data: &[(ResultKey, AnalyzerContext)],
        results: Vec<RecordBatch>,
    ) -> Result<Vec<(ResultKey, AnalyzerContext)>> {
        let mut output = Vec::new();

        for batch in results {
            let row_indices = batch
                .column_by_name("row_index")
                .ok_or_else(|| TermError::Internal("Missing row_index column".to_string()))?
                .as_any()
                .downcast_ref::<Int64Array>()
                .ok_or_else(|| TermError::Internal("Invalid row_index column type".to_string()))?;

            for row_idx in 0..batch.num_rows() {
                let original_idx = row_indices.value(row_idx) as usize;
                if let Some((key, context)) = original_data.get(original_idx) {
                    output.push((key.clone(), context.clone()));
                }
            }
        }

        debug!(
            "Converted {} DataFusion results back to original format",
            output.len()
        );
        Ok(output)
    }
}

impl Default for DataFusionQueryExecutor {
    fn default() -> Self {
        Self
    }
}

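/// Extension trait that adds DataFusion-backed querying to any
/// `MetricsRepository`. The default implementation loads every stored key,
/// fetches its `AnalyzerContext`, and delegates to
/// `DataFusionQueryExecutor::execute_optimized_query`.
///
/// A rough usage sketch (assuming `repo` is some `MetricsRepository` and
/// `query` is a `MetricsQuery` built elsewhere):
///
/// ```ignore
/// let results = repo.execute_datafusion_query(query).await?;
/// ```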
#[async_trait]
pub trait DataFusionQueryExecutorExt: MetricsRepository {
    #[instrument(skip(self, query))]
    async fn execute_datafusion_query(
        &self,
        query: MetricsQuery,
    ) -> Result<Vec<(ResultKey, AnalyzerContext)>> {
        let all_keys = self.list_keys().await?;
        let mut data = Vec::with_capacity(all_keys.len());

        for key in all_keys {
            if let Ok(Some(context)) = self.get(&key).await {
                data.push((key, context));
            }
        }

        DataFusionQueryExecutor::execute_optimized_query(
            data,
            query.get_before(),
            query.get_after(),
            query.get_tags(),
            query.get_analyzers(),
            query.get_limit(),
            query.get_offset(),
            query.is_ascending(),
        )
        .await
    }
}

impl<T: MetricsRepository + ?Sized> DataFusionQueryExecutorExt for T {}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::analyzers::types::MetricValue;

    fn create_test_data() -> Vec<(ResultKey, AnalyzerContext)> {
        let mut data = Vec::new();

        for i in 0..100 {
            let key = ResultKey::new(i * 1000)
                .with_tag("env", if i % 2 == 0 { "prod" } else { "staging" })
                .with_tag("region", if i % 3 == 0 { "us-east-1" } else { "us-west-2" })
                .with_tag("version", format!("v{}.0.0", i % 5));

            let mut context = AnalyzerContext::new();
            context.store_metric("row_count", MetricValue::Long(i * 100));
            context.store_metric(
                "completeness",
                MetricValue::Double(0.95 + (i as f64 * 0.001)),
            );

            data.push((key, context));
        }

        data
    }

    #[tokio::test]
    async fn test_datafusion_query_executor_basic() {
        let data = create_test_data();

        let results = DataFusionQueryExecutor::execute_optimized_query(
            data.clone(),
            None,
            None,
            &HashMap::new(),
            &None,
            Some(10),
            None,
            false,
        )
        .await
        .unwrap();

        assert_eq!(results.len(), 10);
        assert!(results[0].0.timestamp >= results[1].0.timestamp);
    }

    #[tokio::test]
    async fn test_datafusion_query_executor_time_filter() {
        let data = create_test_data();

        let results = DataFusionQueryExecutor::execute_optimized_query(
            data,
            Some(50000),
            Some(10000),
            &HashMap::new(),
            &None,
            None,
            None,
            true,
        )
        .await
        .unwrap();

        assert_eq!(results.len(), 40);
        assert!(results[0].0.timestamp >= 10000);
        assert!(results[0].0.timestamp < 50000);

        assert!(results[0].0.timestamp <= results[1].0.timestamp);
    }

    #[tokio::test]
    async fn test_datafusion_query_executor_tag_filter() {
        let data = create_test_data();

        let mut tags = HashMap::new();
        tags.insert("env".to_string(), "prod".to_string());

        let results = DataFusionQueryExecutor::execute_optimized_query(
            data, None, None, &tags, &None, None, None, false,
        )
        .await
        .unwrap();

        assert_eq!(results.len(), 50);
        for (key, _) in results {
            assert_eq!(key.get_tag("env"), Some("prod"));
        }
    }

    #[tokio::test]
    async fn test_datafusion_query_executor_pagination() {
        let data = create_test_data();

        let page1 = DataFusionQueryExecutor::execute_optimized_query(
            data.clone(),
            None,
            None,
            &HashMap::new(),
            &None,
            Some(20),
            Some(0),
            true,
        )
        .await
        .unwrap();

        let page2 = DataFusionQueryExecutor::execute_optimized_query(
            data,
            None,
            None,
            &HashMap::new(),
            &None,
            Some(20),
            Some(20),
            true,
        )
        .await
        .unwrap();

        assert_eq!(page1.len(), 20);
        assert_eq!(page2.len(), 20);

        assert_ne!(page1[0].0.timestamp, page2[0].0.timestamp);

        assert!(page1[19].0.timestamp < page2[0].0.timestamp);
    }

    #[tokio::test]
    async fn test_record_batch_creation() {
        let data = create_test_data();

        let batch = DataFusionQueryExecutor::create_record_batch(&data[0..10]).unwrap();

        assert_eq!(batch.num_rows(), 10);
        assert!(batch.num_columns() >= 5);

        let timestamps = batch.column_by_name("timestamp").unwrap();
        assert_eq!(
            timestamps.data_type(),
            &DataType::Timestamp(TimeUnit::Millisecond, None)
        );

        let indices = batch.column_by_name("row_index").unwrap();
        assert_eq!(indices.data_type(), &DataType::Int64);
    }
}