datafusion_table_providers/sqlite/sql_table.rs

1use crate::sql::db_connection_pool::DbConnectionPool;
2use async_trait::async_trait;
3use datafusion::catalog::Session;
4use datafusion::sql::unparser::dialect::SqliteDialect;
5use futures::TryStreamExt;
6use std::fmt::Display;
7use std::{any::Any, fmt, sync::Arc};
8
9use crate::sql::sql_provider_datafusion::{
10    get_stream, to_execution_error, Result as SqlResult, SqlExec, SqlTable,
11};
12use datafusion::{
13    arrow::datatypes::SchemaRef,
14    datasource::TableProvider,
15    error::Result as DataFusionResult,
16    execution::TaskContext,
17    logical_expr::{Expr, TableProviderFilterPushDown, TableType},
18    physical_plan::{
19        stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionPlan,
20        PlanProperties, SendableRecordBatchStream,
21    },
22    sql::TableReference,
23};
24
25pub struct SQLiteTable<T: 'static, P: 'static> {
26    pub(crate) base_table: SqlTable<T, P>,
27}
28
29impl<T, P> std::fmt::Debug for SQLiteTable<T, P> {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        f.debug_struct("SQLiteTable")
32            .field("base_table", &self.base_table)
33            .finish()
34    }
35}
36
37impl<T, P> SQLiteTable<T, P> {
38    pub fn new_with_schema(
39        pool: &Arc<dyn DbConnectionPool<T, P> + Send + Sync>,
40        schema: impl Into<SchemaRef>,
41        table_reference: impl Into<TableReference>,
42    ) -> Self {
43        let base_table = SqlTable::new_with_schema("sqlite", pool, schema, table_reference)
44            .with_dialect(Arc::new(SqliteDialect {}));
45
46        Self { base_table }
47    }
48
49    fn create_physical_plan(
50        &self,
51        projection: Option<&Vec<usize>>,
52        schema: &SchemaRef,
53        sql: String,
54    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
55        Ok(Arc::new(SQLiteSqlExec::new(
56            projection,
57            schema,
58            self.base_table.clone_pool(),
59            sql,
60        )?))
61    }
62}
63
64#[async_trait]
65impl<T, P> TableProvider for SQLiteTable<T, P> {
66    fn as_any(&self) -> &dyn Any {
67        self
68    }
69
70    fn schema(&self) -> SchemaRef {
71        self.base_table.schema()
72    }
73
74    fn table_type(&self) -> TableType {
75        self.base_table.table_type()
76    }
77
78    fn supports_filters_pushdown(
79        &self,
80        filters: &[&Expr],
81    ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
82        self.base_table.supports_filters_pushdown(filters)
83    }
84
85    async fn scan(
86        &self,
87        _state: &dyn Session,
88        projection: Option<&Vec<usize>>,
89        filters: &[Expr],
90        limit: Option<usize>,
91    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
92        let sql = self.base_table.scan_to_sql(projection, filters, limit)?;
93        return self.create_physical_plan(projection, &self.schema(), sql);
94    }
95}
96
97impl<T, P> Display for SQLiteTable<T, P> {
98    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99        write!(f, "SQLiteTable {}", self.base_table.name())
100    }
101}
102
103#[derive(Clone)]
104struct SQLiteSqlExec<T, P> {
105    base_exec: SqlExec<T, P>,
106}
107
108impl<T, P> SQLiteSqlExec<T, P> {
109    fn new(
110        projection: Option<&Vec<usize>>,
111        schema: &SchemaRef,
112        pool: Arc<dyn DbConnectionPool<T, P> + Send + Sync>,
113        sql: String,
114    ) -> DataFusionResult<Self> {
115        let base_exec = SqlExec::new(projection, schema, pool, sql)?;
116
117        Ok(Self { base_exec })
118    }
119
120    fn sql(&self) -> SqlResult<String> {
121        self.base_exec.sql()
122    }
123}
124
125impl<T, P> std::fmt::Debug for SQLiteSqlExec<T, P> {
126    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
127        let sql = self.sql().unwrap_or_default();
128        write!(f, "SQLiteSqlExec sql={sql}")
129    }
130}
131
132impl<T, P> DisplayAs for SQLiteSqlExec<T, P> {
133    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result {
134        let sql = self.sql().unwrap_or_default();
135        write!(f, "SQLiteSqlExec sql={sql}")
136    }
137}
138
139impl<T: 'static, P: 'static> ExecutionPlan for SQLiteSqlExec<T, P> {
140    fn name(&self) -> &'static str {
141        "SQLiteSqlExec"
142    }
143
144    fn as_any(&self) -> &dyn Any {
145        self
146    }
147
148    fn schema(&self) -> SchemaRef {
149        self.base_exec.schema()
150    }
151
152    fn properties(&self) -> &PlanProperties {
153        self.base_exec.properties()
154    }
155
156    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
157        self.base_exec.children()
158    }
159
160    fn with_new_children(
161        self: Arc<Self>,
162        _children: Vec<Arc<dyn ExecutionPlan>>,
163    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
164        Ok(self)
165    }
166
167    fn execute(
168        &self,
169        _partition: usize,
170        _context: Arc<TaskContext>,
171    ) -> DataFusionResult<SendableRecordBatchStream> {
172        let sql = self.sql().map_err(to_execution_error)?;
173        tracing::debug!("SQLiteSqlExec sql: {sql}");
174
175        let fut = get_stream(self.base_exec.clone_pool(), sql, Arc::clone(&self.schema()));
176
177        let stream = futures::stream::once(fut).try_flatten();
178        let schema = Arc::clone(&self.schema());
179        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
180    }
181}