datafusion_table_providers/mysql/
sql_table.rs1use crate::sql::db_connection_pool::mysqlpool::MySQLConnectionPool;
2use crate::sql::db_connection_pool::DbConnectionPool;
3use async_trait::async_trait;
4use datafusion::catalog::Session;
5use datafusion::sql::unparser::dialect::MySqlDialect;
6use futures::TryStreamExt;
7use mysql_async::prelude::ToValue;
8use std::fmt::Display;
9use std::{any::Any, fmt, sync::Arc};
10
11use crate::sql::sql_provider_datafusion::{
12 self, get_stream, to_execution_error, Result as SqlResult, SqlExec, SqlTable,
13};
14use datafusion::{
15 arrow::datatypes::SchemaRef,
16 datasource::TableProvider,
17 error::Result as DataFusionResult,
18 execution::TaskContext,
19 logical_expr::{Expr, TableProviderFilterPushDown, TableType},
20 physical_plan::{
21 stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionPlan,
22 PlanProperties, SendableRecordBatchStream,
23 },
24 sql::TableReference,
25};
26
27pub struct MySQLTable {
28 pool: Arc<MySQLConnectionPool>,
29 pub(crate) base_table: SqlTable<mysql_async::Conn, &'static (dyn ToValue + Sync)>,
30}
31
32impl std::fmt::Debug for MySQLTable {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 f.debug_struct("MySQLTable")
35 .field("base_table", &self.base_table)
36 .finish()
37 }
38}
39
40impl MySQLTable {
41 pub async fn new(
42 pool: &Arc<MySQLConnectionPool>,
43 table_reference: impl Into<TableReference>,
44 ) -> Result<Self, sql_provider_datafusion::Error> {
45 let dyn_pool = Arc::clone(pool)
46 as Arc<
47 dyn DbConnectionPool<mysql_async::Conn, &'static (dyn ToValue + Sync)>
48 + Send
49 + Sync,
50 >;
51 let base_table = SqlTable::new("mysql", &dyn_pool, table_reference)
52 .await?
53 .with_dialect(Arc::new(MySqlDialect {}));
54
55 Ok(Self {
56 pool: Arc::clone(pool),
57 base_table,
58 })
59 }
60
61 fn create_physical_plan(
62 &self,
63 projections: Option<&Vec<usize>>,
64 schema: &SchemaRef,
65 filters: &[Expr],
66 limit: Option<usize>,
67 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
68 let sql = self.base_table.scan_to_sql(projections, filters, limit)?;
69 Ok(Arc::new(MySQLSQLExec::new(
70 projections,
71 schema,
72 Arc::clone(&self.pool),
73 sql,
74 )?))
75 }
76}
77
78#[async_trait]
79impl TableProvider for MySQLTable {
80 fn as_any(&self) -> &dyn Any {
81 self
82 }
83
84 fn schema(&self) -> SchemaRef {
85 self.base_table.schema()
86 }
87
88 fn table_type(&self) -> TableType {
89 self.base_table.table_type()
90 }
91
92 fn supports_filters_pushdown(
93 &self,
94 filters: &[&Expr],
95 ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
96 self.base_table.supports_filters_pushdown(filters)
97 }
98
99 async fn scan(
100 &self,
101 _state: &dyn Session,
102 projection: Option<&Vec<usize>>,
103 filters: &[Expr],
104 limit: Option<usize>,
105 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
106 return self.create_physical_plan(projection, &self.schema(), filters, limit);
107 }
108}
109
110impl Display for MySQLTable {
111 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112 write!(f, "MySQLTable {}", self.base_table.name())
113 }
114}
115
116struct MySQLSQLExec {
117 base_exec: SqlExec<mysql_async::Conn, &'static (dyn ToValue + Sync)>,
118}
119
120impl MySQLSQLExec {
121 fn new(
122 projections: Option<&Vec<usize>>,
123 schema: &SchemaRef,
124 pool: Arc<MySQLConnectionPool>,
125 sql: String,
126 ) -> DataFusionResult<Self> {
127 let base_exec = SqlExec::new(projections, schema, pool, sql)?;
128
129 Ok(Self { base_exec })
130 }
131
132 fn sql(&self) -> SqlResult<String> {
133 self.base_exec.sql()
134 }
135}
136
137impl std::fmt::Debug for MySQLSQLExec {
138 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
139 let sql = self.sql().unwrap_or_default();
140 write!(f, "MySQLSQLExec sql={sql}")
141 }
142}
143
144impl DisplayAs for MySQLSQLExec {
145 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result {
146 let sql = self.sql().unwrap_or_default();
147 write!(f, "MySQLSQLExec sql={sql}")
148 }
149}
150
151impl ExecutionPlan for MySQLSQLExec {
152 fn name(&self) -> &'static str {
153 "MySQLSQLExec"
154 }
155
156 fn as_any(&self) -> &dyn Any {
157 self
158 }
159
160 fn schema(&self) -> SchemaRef {
161 self.base_exec.schema()
162 }
163
164 fn properties(&self) -> &PlanProperties {
165 self.base_exec.properties()
166 }
167
168 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
169 self.base_exec.children()
170 }
171
172 fn with_new_children(
173 self: Arc<Self>,
174 _children: Vec<Arc<dyn ExecutionPlan>>,
175 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
176 Ok(self)
177 }
178
179 fn execute(
180 &self,
181 _partition: usize,
182 _context: Arc<TaskContext>,
183 ) -> DataFusionResult<SendableRecordBatchStream> {
184 let sql = self.sql().map_err(to_execution_error)?;
185 tracing::debug!("MySQLSQLExec sql: {sql}");
186
187 let fut = get_stream(self.base_exec.clone_pool(), sql, Arc::clone(&self.schema()));
188
189 let stream = futures::stream::once(fut).try_flatten();
190 let schema = Arc::clone(&self.schema());
191 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
192 }
193}