datafusion_table_providers/mysql/
sql_table.rs

1use crate::sql::db_connection_pool::mysqlpool::MySQLConnectionPool;
2use crate::sql::db_connection_pool::DbConnectionPool;
3use async_trait::async_trait;
4use datafusion::catalog::Session;
5use datafusion::sql::unparser::dialect::MySqlDialect;
6use futures::TryStreamExt;
7use mysql_async::prelude::ToValue;
8use std::fmt::Display;
9use std::{any::Any, fmt, sync::Arc};
10
11use crate::sql::sql_provider_datafusion::{
12    self, get_stream, to_execution_error, Result as SqlResult, SqlExec, SqlTable,
13};
14use datafusion::{
15    arrow::datatypes::SchemaRef,
16    datasource::TableProvider,
17    error::Result as DataFusionResult,
18    execution::TaskContext,
19    logical_expr::{Expr, TableProviderFilterPushDown, TableType},
20    physical_plan::{
21        stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionPlan,
22        PlanProperties, SendableRecordBatchStream,
23    },
24    sql::TableReference,
25};
26
27pub struct MySQLTable {
28    pool: Arc<MySQLConnectionPool>,
29    pub(crate) base_table: SqlTable<mysql_async::Conn, &'static (dyn ToValue + Sync)>,
30}
31
32impl std::fmt::Debug for MySQLTable {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        f.debug_struct("MySQLTable")
35            .field("base_table", &self.base_table)
36            .finish()
37    }
38}
39
40impl MySQLTable {
41    pub async fn new(
42        pool: &Arc<MySQLConnectionPool>,
43        table_reference: impl Into<TableReference>,
44    ) -> Result<Self, sql_provider_datafusion::Error> {
45        let dyn_pool = Arc::clone(pool)
46            as Arc<
47                dyn DbConnectionPool<mysql_async::Conn, &'static (dyn ToValue + Sync)>
48                    + Send
49                    + Sync,
50            >;
51        let base_table = SqlTable::new("mysql", &dyn_pool, table_reference)
52            .await?
53            .with_dialect(Arc::new(MySqlDialect {}));
54
55        Ok(Self {
56            pool: Arc::clone(pool),
57            base_table,
58        })
59    }
60
61    fn create_physical_plan(
62        &self,
63        projections: Option<&Vec<usize>>,
64        schema: &SchemaRef,
65        filters: &[Expr],
66        limit: Option<usize>,
67    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
68        let sql = self.base_table.scan_to_sql(projections, filters, limit)?;
69        Ok(Arc::new(MySQLSQLExec::new(
70            projections,
71            schema,
72            Arc::clone(&self.pool),
73            sql,
74        )?))
75    }
76}
77
78#[async_trait]
79impl TableProvider for MySQLTable {
80    fn as_any(&self) -> &dyn Any {
81        self
82    }
83
84    fn schema(&self) -> SchemaRef {
85        self.base_table.schema()
86    }
87
88    fn table_type(&self) -> TableType {
89        self.base_table.table_type()
90    }
91
92    fn supports_filters_pushdown(
93        &self,
94        filters: &[&Expr],
95    ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
96        self.base_table.supports_filters_pushdown(filters)
97    }
98
99    async fn scan(
100        &self,
101        _state: &dyn Session,
102        projection: Option<&Vec<usize>>,
103        filters: &[Expr],
104        limit: Option<usize>,
105    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
106        return self.create_physical_plan(projection, &self.schema(), filters, limit);
107    }
108}
109
110impl Display for MySQLTable {
111    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
112        write!(f, "MySQLTable {}", self.base_table.name())
113    }
114}
115
116struct MySQLSQLExec {
117    base_exec: SqlExec<mysql_async::Conn, &'static (dyn ToValue + Sync)>,
118}
119
120impl MySQLSQLExec {
121    fn new(
122        projections: Option<&Vec<usize>>,
123        schema: &SchemaRef,
124        pool: Arc<MySQLConnectionPool>,
125        sql: String,
126    ) -> DataFusionResult<Self> {
127        let base_exec = SqlExec::new(projections, schema, pool, sql)?;
128
129        Ok(Self { base_exec })
130    }
131
132    fn sql(&self) -> SqlResult<String> {
133        self.base_exec.sql()
134    }
135}
136
137impl std::fmt::Debug for MySQLSQLExec {
138    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
139        let sql = self.sql().unwrap_or_default();
140        write!(f, "MySQLSQLExec sql={sql}")
141    }
142}
143
144impl DisplayAs for MySQLSQLExec {
145    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result {
146        let sql = self.sql().unwrap_or_default();
147        write!(f, "MySQLSQLExec sql={sql}")
148    }
149}
150
151impl ExecutionPlan for MySQLSQLExec {
152    fn name(&self) -> &'static str {
153        "MySQLSQLExec"
154    }
155
156    fn as_any(&self) -> &dyn Any {
157        self
158    }
159
160    fn schema(&self) -> SchemaRef {
161        self.base_exec.schema()
162    }
163
164    fn properties(&self) -> &PlanProperties {
165        self.base_exec.properties()
166    }
167
168    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
169        self.base_exec.children()
170    }
171
172    fn with_new_children(
173        self: Arc<Self>,
174        _children: Vec<Arc<dyn ExecutionPlan>>,
175    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
176        Ok(self)
177    }
178
179    fn execute(
180        &self,
181        _partition: usize,
182        _context: Arc<TaskContext>,
183    ) -> DataFusionResult<SendableRecordBatchStream> {
184        let sql = self.sql().map_err(to_execution_error)?;
185        tracing::debug!("MySQLSQLExec sql: {sql}");
186
187        let fut = get_stream(self.base_exec.clone_pool(), sql, Arc::clone(&self.schema()));
188
189        let stream = futures::stream::once(fut).try_flatten();
190        let schema = Arc::clone(&self.schema());
191        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
192    }
193}