datafusion_table_providers/sqlite/
sql_table.rs1use crate::sql::db_connection_pool::DbConnectionPool;
2use async_trait::async_trait;
3use datafusion::catalog::Session;
4use datafusion::sql::unparser::dialect::SqliteDialect;
5use futures::TryStreamExt;
6use std::fmt::Display;
7use std::{any::Any, fmt, sync::Arc};
8
9use crate::sql::sql_provider_datafusion::{
10 get_stream, to_execution_error, Result as SqlResult, SqlExec, SqlTable,
11};
12use datafusion::{
13 arrow::datatypes::SchemaRef,
14 datasource::TableProvider,
15 error::Result as DataFusionResult,
16 execution::TaskContext,
17 logical_expr::{Expr, TableProviderFilterPushDown, TableType},
18 physical_plan::{
19 stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionPlan,
20 PlanProperties, SendableRecordBatchStream,
21 },
22 sql::TableReference,
23};
24
25pub struct SQLiteTable<T: 'static, P: 'static> {
26 pub(crate) base_table: SqlTable<T, P>,
27}
28
29impl<T, P> std::fmt::Debug for SQLiteTable<T, P> {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 f.debug_struct("SQLiteTable")
32 .field("base_table", &self.base_table)
33 .finish()
34 }
35}
36
37impl<T, P> SQLiteTable<T, P> {
38 pub fn new_with_schema(
39 pool: &Arc<dyn DbConnectionPool<T, P> + Send + Sync>,
40 schema: impl Into<SchemaRef>,
41 table_reference: impl Into<TableReference>,
42 ) -> Self {
43 let base_table = SqlTable::new_with_schema("sqlite", pool, schema, table_reference)
44 .with_dialect(Arc::new(SqliteDialect {}));
45
46 Self { base_table }
47 }
48
49 fn create_physical_plan(
50 &self,
51 projection: Option<&Vec<usize>>,
52 schema: &SchemaRef,
53 sql: String,
54 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
55 Ok(Arc::new(SQLiteSqlExec::new(
56 projection,
57 schema,
58 self.base_table.clone_pool(),
59 sql,
60 )?))
61 }
62}
63
64#[async_trait]
65impl<T, P> TableProvider for SQLiteTable<T, P> {
66 fn as_any(&self) -> &dyn Any {
67 self
68 }
69
70 fn schema(&self) -> SchemaRef {
71 self.base_table.schema()
72 }
73
74 fn table_type(&self) -> TableType {
75 self.base_table.table_type()
76 }
77
78 fn supports_filters_pushdown(
79 &self,
80 filters: &[&Expr],
81 ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
82 self.base_table.supports_filters_pushdown(filters)
83 }
84
85 async fn scan(
86 &self,
87 _state: &dyn Session,
88 projection: Option<&Vec<usize>>,
89 filters: &[Expr],
90 limit: Option<usize>,
91 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
92 let sql = self.base_table.scan_to_sql(projection, filters, limit)?;
93 return self.create_physical_plan(projection, &self.schema(), sql);
94 }
95}
96
97impl<T, P> Display for SQLiteTable<T, P> {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 write!(f, "SQLiteTable {}", self.base_table.name())
100 }
101}
102
103#[derive(Clone)]
104struct SQLiteSqlExec<T, P> {
105 base_exec: SqlExec<T, P>,
106}
107
108impl<T, P> SQLiteSqlExec<T, P> {
109 fn new(
110 projection: Option<&Vec<usize>>,
111 schema: &SchemaRef,
112 pool: Arc<dyn DbConnectionPool<T, P> + Send + Sync>,
113 sql: String,
114 ) -> DataFusionResult<Self> {
115 let base_exec = SqlExec::new(projection, schema, pool, sql)?;
116
117 Ok(Self { base_exec })
118 }
119
120 fn sql(&self) -> SqlResult<String> {
121 self.base_exec.sql()
122 }
123}
124
125impl<T, P> std::fmt::Debug for SQLiteSqlExec<T, P> {
126 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
127 let sql = self.sql().unwrap_or_default();
128 write!(f, "SQLiteSqlExec sql={sql}")
129 }
130}
131
132impl<T, P> DisplayAs for SQLiteSqlExec<T, P> {
133 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result {
134 let sql = self.sql().unwrap_or_default();
135 write!(f, "SQLiteSqlExec sql={sql}")
136 }
137}
138
139impl<T: 'static, P: 'static> ExecutionPlan for SQLiteSqlExec<T, P> {
140 fn name(&self) -> &'static str {
141 "SQLiteSqlExec"
142 }
143
144 fn as_any(&self) -> &dyn Any {
145 self
146 }
147
148 fn schema(&self) -> SchemaRef {
149 self.base_exec.schema()
150 }
151
152 fn properties(&self) -> &PlanProperties {
153 self.base_exec.properties()
154 }
155
156 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
157 self.base_exec.children()
158 }
159
160 fn with_new_children(
161 self: Arc<Self>,
162 _children: Vec<Arc<dyn ExecutionPlan>>,
163 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
164 Ok(self)
165 }
166
167 fn execute(
168 &self,
169 _partition: usize,
170 _context: Arc<TaskContext>,
171 ) -> DataFusionResult<SendableRecordBatchStream> {
172 let sql = self.sql().map_err(to_execution_error)?;
173 tracing::debug!("SQLiteSqlExec sql: {sql}");
174
175 let fut = get_stream(self.base_exec.clone_pool(), sql, Arc::clone(&self.schema()));
176
177 let stream = futures::stream::once(fut).try_flatten();
178 let schema = Arc::clone(&self.schema());
179 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
180 }
181}