Skip to main content

sql_stream/
engine.rs

//! Query engine implementation using Apache DataFusion
//!
//! This module provides the core query execution engine built on Apache DataFusion,
//! with support for registering CSV and newline-delimited JSON files as tables and
//! executing SQL queries against them, returning results as DataFusion `DataFrame`s.
6
7use crate::error::{Result, SqlStreamError};
8use datafusion::arrow::util::pretty::print_batches;
9use datafusion::prelude::*;
10use std::path::Path;
11use tracing::{debug, info, instrument};
12
13/// High-performance SQL query engine powered by Apache DataFusion
14///
15/// The `QueryEngine` manages a DataFusion `SessionContext` and provides
16/// methods for registering data files and executing SQL queries with
17/// zero-copy streaming.
18pub struct QueryEngine {
19    ctx: SessionContext,
20}
21
22impl QueryEngine {
23    /// Create a new query engine with default configuration
24    ///
25    /// # Errors
26    ///
27    /// Returns an error if the session context cannot be initialized
28    #[instrument]
29    pub fn new() -> Result<Self> {
30        info!("Initializing query engine");
31        let ctx = SessionContext::new();
32        Ok(Self { ctx })
33    }
34
35    /// Register a CSV or JSON file as a table in the query engine
36    ///
37    /// The file format is automatically detected from the file extension.
38    /// Supported formats: `.csv`, `.json`
39    ///
40    /// # Arguments
41    ///
42    /// * `file_path` - Path to the data file
43    /// * `table_name` - Name to use for the table in SQL queries
44    ///
45    /// # Errors
46    ///
47    /// Returns an error if:
48    /// - The file does not exist
49    /// - The file format is unsupported
50    /// - Schema inference fails
51    /// - Table registration fails
52    #[instrument(skip(self))]
53    pub async fn register_file(&mut self, file_path: &str, table_name: &str) -> Result<()> {
54        let path = Path::new(file_path);
55
56        // Check if file exists
57        if !path.exists() {
58            return Err(SqlStreamError::FileNotFound(path.to_path_buf()));
59        }
60
61        info!("Registering file: {} as table: {}", file_path, table_name);
62
63        // Detect file format from extension
64        let extension = path
65            .extension()
66            .and_then(|ext| ext.to_str())
67            .ok_or_else(|| SqlStreamError::UnsupportedFormat(path.to_string_lossy().to_string()))?;
68
69        match extension.to_lowercase().as_str() {
70            "csv" => {
71                debug!("Detected CSV format");
72                self.ctx
73                    .register_csv(table_name, file_path, CsvReadOptions::new())
74                    .await
75                    .map_err(|e| {
76                        SqlStreamError::TableRegistration(table_name.to_string(), e.to_string())
77                    })?;
78            }
79            "json" => {
80                debug!("Detected JSON format");
81                self.ctx
82                    .register_json(table_name, file_path, NdJsonReadOptions::default())
83                    .await
84                    .map_err(|e| {
85                        SqlStreamError::TableRegistration(table_name.to_string(), e.to_string())
86                    })?;
87            }
88            _ => {
89                return Err(SqlStreamError::UnsupportedFormat(extension.to_string()));
90            }
91        }
92
93        info!("Successfully registered table: {}", table_name);
94        Ok(())
95    }
96
97    /// Execute a SQL query and return the results as a DataFrame
98    ///
99    /// # Arguments
100    ///
101    /// * `sql` - SQL query string to execute
102    ///
103    /// # Errors
104    ///
105    /// Returns an error if query parsing or execution fails
106    #[instrument(skip(self))]
107    pub async fn execute_query(&self, sql: &str) -> Result<DataFrame> {
108        info!("Executing SQL query");
109        debug!("Query: {}", sql);
110
111        let df = self
112            .ctx
113            .sql(sql)
114            .await
115            .map_err(|e| SqlStreamError::QueryExecution(e.to_string()))?;
116
117        Ok(df)
118    }
119
120    /// Execute a SQL query and print the results to stdout
121    ///
122    /// Uses Arrow's pretty printer for formatted table output with
123    /// streaming to handle large result sets efficiently.
124    ///
125    /// # Arguments
126    ///
127    /// * `dataframe` - The DataFrame to print
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if result collection or printing fails
132    #[instrument(skip(self, dataframe))]
133    pub async fn print_results(&self, dataframe: DataFrame) -> Result<()> {
134        info!("Collecting and printing results");
135
136        // Collect results as RecordBatches
137        let batches = dataframe.collect().await?;
138
139        // Print using Arrow's pretty printer
140        print_batches(&batches).map_err(|e| {
141            SqlStreamError::QueryExecution(format!("Failed to print results: {}", e))
142        })?;
143
144        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
145        info!("Query returned {} rows", total_rows);
146
147        Ok(())
148    }
149}
150
151impl Default for QueryEngine {
152    fn default() -> Self {
153        Self::new().expect("Failed to create default QueryEngine")
154    }
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Temporary file in the system temp directory that is removed on drop,
    /// including when an assertion fails. The process id in the name keeps
    /// concurrent test runs from colliding.
    struct TempFile(PathBuf);

    impl TempFile {
        fn new(name: &str, contents: &str) -> Self {
            let path = std::env::temp_dir()
                .join(format!("sql_stream_{}_{}", std::process::id(), name));
            std::fs::write(&path, contents).expect("failed to write temp file");
            Self(path)
        }

        fn path_str(&self) -> &str {
            self.0.to_str().expect("temp file path is not valid UTF-8")
        }
    }

    impl Drop for TempFile {
        fn drop(&mut self) {
            // Best-effort cleanup; a leftover temp file must not fail the test.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    #[tokio::test]
    async fn test_engine_creation() {
        let engine = QueryEngine::new();
        assert!(engine.is_ok());
    }

    #[tokio::test]
    async fn test_file_not_found() {
        let mut engine = QueryEngine::new().unwrap();
        let result = engine.register_file("nonexistent.csv", "test").await;
        assert!(matches!(result, Err(SqlStreamError::FileNotFound(_))));
    }

    #[tokio::test]
    async fn test_unsupported_extension() {
        let file = TempFile::new("unsupported.txt", "hello\n");
        let mut engine = QueryEngine::new().unwrap();
        let result = engine.register_file(file.path_str(), "test").await;
        assert!(matches!(result, Err(SqlStreamError::UnsupportedFormat(_))));
    }

    #[tokio::test]
    async fn test_missing_extension() {
        let file = TempFile::new("no_extension", "hello\n");
        let mut engine = QueryEngine::new().unwrap();
        let result = engine.register_file(file.path_str(), "test").await;
        assert!(matches!(result, Err(SqlStreamError::UnsupportedFormat(_))));
    }

    #[tokio::test]
    async fn test_register_and_query_csv() {
        let file = TempFile::new("people.csv", "id,name\n1,alice\n2,bob\n");
        let mut engine = QueryEngine::new().unwrap();
        engine
            .register_file(file.path_str(), "people")
            .await
            .expect("CSV registration should succeed");

        let df = engine
            .execute_query("SELECT id, name FROM people")
            .await
            .expect("query should plan");
        let batches = df.collect().await.expect("query should execute");

        let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(rows, 2);
    }
}