1use std::any::Any;
19use std::ffi::c_void;
20use std::sync::Arc;
21
22use abi_stable::StableAbi;
23use abi_stable::std_types::{ROption, RResult, RString, RVec};
24use async_ffi::{FfiFuture, FutureExt};
25use async_trait::async_trait;
26use datafusion_catalog::{SchemaProvider, TableProvider};
27use datafusion_common::error::{DataFusionError, Result};
28use datafusion_proto::logical_plan::{
29 DefaultLogicalExtensionCodec, LogicalExtensionCodec,
30};
31use tokio::runtime::Handle;
32
33use crate::execution::FFI_TaskContextProvider;
34use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
35use crate::table_provider::{FFI_TableProvider, ForeignTableProvider};
36use crate::util::FFIResult;
37use crate::{df_result, rresult_return};
38
/// FFI-stable view of a [`SchemaProvider`]: a table of `extern "C"` function
/// pointers plus the state they operate on.
///
/// Every operation takes `&Self`, so the side holding this struct calls back
/// into the code that populated the function pointers. The layout is
/// `#[repr(C)]`, so the field order is part of the ABI and must not change.
#[repr(C)]
#[derive(Debug, StableAbi)]
pub struct FFI_SchemaProvider {
    /// Owner name reported by the wrapped provider when this struct was built.
    pub owner_name: ROption<RString>,

    /// Returns the table names reported by the wrapped provider.
    pub table_names: unsafe extern "C" fn(provider: &Self) -> RVec<RString>,

    /// Asynchronously looks up the table called `name`; the result is optional
    /// and may carry an error from the wrapped provider.
    pub table: unsafe extern "C" fn(
        provider: &Self,
        name: RString,
    )
        -> FfiFuture<FFIResult<ROption<FFI_TableProvider>>>,

    /// Registers `table` under `name` on the wrapped provider and returns
    /// whatever table that call hands back, if any.
    pub register_table: unsafe extern "C" fn(
        provider: &Self,
        name: RString,
        table: FFI_TableProvider,
    )
        -> FFIResult<ROption<FFI_TableProvider>>,

    /// Deregisters the table called `name` on the wrapped provider and returns
    /// whatever table that call hands back, if any.
    pub deregister_table: unsafe extern "C" fn(
        provider: &Self,
        name: RString,
    )
        -> FFIResult<ROption<FFI_TableProvider>>,

    /// Reports whether the wrapped provider has a table called `name`.
    pub table_exist: unsafe extern "C" fn(provider: &Self, name: RString) -> bool,

    /// Codec passed on to every `FFI_TableProvider` created through this
    /// schema provider.
    pub logical_codec: FFI_LogicalExtensionCodec,

    /// Creates another `FFI_SchemaProvider` that shares the same underlying
    /// provider. Used to implement [`Clone`].
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Frees the state behind `private_data`. Invoked from [`Drop`].
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Version function; structs built in this module store `super::version`.
    pub version: unsafe extern "C" fn() -> u64,

    /// Opaque pointer to the producer's state. For structs built in this
    /// module it points at a boxed `ProviderPrivateData`.
    pub private_data: *mut c_void,

    /// Identifies the library that built this struct. It is compared with
    /// `crate::get_library_marker_id()` to decide whether `private_data` can
    /// be accessed directly instead of going through the function pointers.
    pub library_marker_id: extern "C" fn() -> usize,
}
89
// The raw `private_data` pointer makes the struct `!Send`/`!Sync` by default,
// hence the manual impls.
// NOTE(review): soundness presumably relies on the boxed provider being safe
// to use from any thread — confirm against the bounds of `SchemaProvider`.
unsafe impl Send for FFI_SchemaProvider {}
unsafe impl Sync for FFI_SchemaProvider {}
92
/// State boxed behind `FFI_SchemaProvider::private_data` for structs built in
/// this module.
struct ProviderPrivateData {
    /// The wrapped provider that the `*_fn_wrapper` functions forward to.
    provider: Arc<dyn SchemaProvider + Send>,
    /// Optional Tokio runtime handle, passed on to every `FFI_TableProvider`
    /// the wrapper functions create.
    runtime: Option<Handle>,
}
97
98impl FFI_SchemaProvider {
99 unsafe fn inner(&self) -> &Arc<dyn SchemaProvider + Send> {
100 unsafe {
101 let private_data = self.private_data as *const ProviderPrivateData;
102 &(*private_data).provider
103 }
104 }
105
106 unsafe fn runtime(&self) -> Option<Handle> {
107 unsafe {
108 let private_data = self.private_data as *const ProviderPrivateData;
109 (*private_data).runtime.clone()
110 }
111 }
112}
113
114unsafe extern "C" fn table_names_fn_wrapper(
115 provider: &FFI_SchemaProvider,
116) -> RVec<RString> {
117 unsafe {
118 let provider = provider.inner();
119
120 let table_names = provider.table_names();
121 table_names.into_iter().map(|s| s.into()).collect()
122 }
123}
124
125unsafe extern "C" fn table_fn_wrapper(
126 provider: &FFI_SchemaProvider,
127 name: RString,
128) -> FfiFuture<FFIResult<ROption<FFI_TableProvider>>> {
129 unsafe {
130 let runtime = provider.runtime();
131 let logical_codec = provider.logical_codec.clone();
132 let provider = Arc::clone(provider.inner());
133
134 async move {
135 let table = rresult_return!(provider.table(name.as_str()).await)
136 .map(|t| {
137 FFI_TableProvider::new_with_ffi_codec(t, true, runtime, logical_codec)
138 })
139 .into();
140
141 RResult::ROk(table)
142 }
143 .into_ffi()
144 }
145}
146
147unsafe extern "C" fn register_table_fn_wrapper(
148 provider: &FFI_SchemaProvider,
149 name: RString,
150 table: FFI_TableProvider,
151) -> FFIResult<ROption<FFI_TableProvider>> {
152 unsafe {
153 let runtime = provider.runtime();
154 let logical_codec = provider.logical_codec.clone();
155 let provider = provider.inner();
156
157 let table = Arc::new(ForeignTableProvider(table));
158
159 let returned_table = rresult_return!(provider.register_table(name.into(), table))
160 .map(|t| {
161 FFI_TableProvider::new_with_ffi_codec(t, true, runtime, logical_codec)
162 });
163
164 RResult::ROk(returned_table.into())
165 }
166}
167
168unsafe extern "C" fn deregister_table_fn_wrapper(
169 provider: &FFI_SchemaProvider,
170 name: RString,
171) -> FFIResult<ROption<FFI_TableProvider>> {
172 unsafe {
173 let runtime = provider.runtime();
174 let logical_codec = provider.logical_codec.clone();
175 let provider = provider.inner();
176
177 let returned_table = rresult_return!(provider.deregister_table(name.as_str()))
178 .map(|t| {
179 FFI_TableProvider::new_with_ffi_codec(t, true, runtime, logical_codec)
180 });
181
182 RResult::ROk(returned_table.into())
183 }
184}
185
186unsafe extern "C" fn table_exist_fn_wrapper(
187 provider: &FFI_SchemaProvider,
188 name: RString,
189) -> bool {
190 unsafe { provider.inner().table_exist(name.as_str()) }
191}
192
193unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_SchemaProvider) {
194 unsafe {
195 debug_assert!(!provider.private_data.is_null());
196 let private_data =
197 Box::from_raw(provider.private_data as *mut ProviderPrivateData);
198 drop(private_data);
199 provider.private_data = std::ptr::null_mut();
200 }
201}
202
203unsafe extern "C" fn clone_fn_wrapper(
204 provider: &FFI_SchemaProvider,
205) -> FFI_SchemaProvider {
206 unsafe {
207 let old_private_data = provider.private_data as *const ProviderPrivateData;
208 let runtime = (*old_private_data).runtime.clone();
209
210 let private_data = Box::into_raw(Box::new(ProviderPrivateData {
211 provider: Arc::clone(&(*old_private_data).provider),
212 runtime,
213 })) as *mut c_void;
214
215 FFI_SchemaProvider {
216 owner_name: provider.owner_name.clone(),
217 table_names: table_names_fn_wrapper,
218 table: table_fn_wrapper,
219 register_table: register_table_fn_wrapper,
220 deregister_table: deregister_table_fn_wrapper,
221 table_exist: table_exist_fn_wrapper,
222 logical_codec: provider.logical_codec.clone(),
223 clone: clone_fn_wrapper,
224 release: release_fn_wrapper,
225 version: super::version,
226 private_data,
227 library_marker_id: crate::get_library_marker_id,
228 }
229 }
230}
231
232impl Drop for FFI_SchemaProvider {
233 fn drop(&mut self) {
234 unsafe { (self.release)(self) }
235 }
236}
237
238impl FFI_SchemaProvider {
239 pub fn new(
241 provider: Arc<dyn SchemaProvider + Send>,
242 runtime: Option<Handle>,
243 task_ctx_provider: impl Into<FFI_TaskContextProvider>,
244 logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
245 ) -> Self {
246 let task_ctx_provider = task_ctx_provider.into();
247 let logical_codec =
248 logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {}));
249 let logical_codec = FFI_LogicalExtensionCodec::new(
250 logical_codec,
251 runtime.clone(),
252 task_ctx_provider.clone(),
253 );
254 Self::new_with_ffi_codec(provider, runtime, logical_codec)
255 }
256
257 pub fn new_with_ffi_codec(
258 provider: Arc<dyn SchemaProvider + Send>,
259 runtime: Option<Handle>,
260 logical_codec: FFI_LogicalExtensionCodec,
261 ) -> Self {
262 if let Some(provider) = provider.as_any().downcast_ref::<ForeignSchemaProvider>()
263 {
264 return provider.0.clone();
265 }
266
267 let owner_name = provider.owner_name().map(|s| s.into()).into();
268 let private_data = Box::new(ProviderPrivateData { provider, runtime });
269
270 Self {
271 owner_name,
272 table_names: table_names_fn_wrapper,
273 table: table_fn_wrapper,
274 register_table: register_table_fn_wrapper,
275 deregister_table: deregister_table_fn_wrapper,
276 table_exist: table_exist_fn_wrapper,
277 logical_codec,
278 clone: clone_fn_wrapper,
279 release: release_fn_wrapper,
280 version: super::version,
281 private_data: Box::into_raw(private_data) as *mut c_void,
282 library_marker_id: crate::get_library_marker_id,
283 }
284 }
285}
286
/// Wrapper around an [`FFI_SchemaProvider`] that implements [`SchemaProvider`]
/// by calling the function pointers stored in the wrapped struct.
#[derive(Debug)]
pub struct ForeignSchemaProvider(pub FFI_SchemaProvider);
293
// NOTE(review): the only field is an `FFI_SchemaProvider`, which this file
// already marks `Send`/`Sync`, so these impls restate what the auto traits
// would presumably derive — confirm whether they are still needed.
unsafe impl Send for ForeignSchemaProvider {}
unsafe impl Sync for ForeignSchemaProvider {}
296
297impl From<&FFI_SchemaProvider> for Arc<dyn SchemaProvider + Send> {
298 fn from(provider: &FFI_SchemaProvider) -> Self {
299 if (provider.library_marker_id)() == crate::get_library_marker_id() {
300 return Arc::clone(unsafe { provider.inner() });
301 }
302
303 Arc::new(ForeignSchemaProvider(provider.clone()))
304 as Arc<dyn SchemaProvider + Send>
305 }
306}
307
308impl Clone for FFI_SchemaProvider {
309 fn clone(&self) -> Self {
310 unsafe { (self.clone)(self) }
311 }
312}
313
314#[async_trait]
315impl SchemaProvider for ForeignSchemaProvider {
316 fn as_any(&self) -> &dyn Any {
317 self
318 }
319
320 fn owner_name(&self) -> Option<&str> {
321 let name: Option<&RString> = self.0.owner_name.as_ref().into();
322 name.map(|s| s.as_str())
323 }
324
325 fn table_names(&self) -> Vec<String> {
326 unsafe {
327 (self.0.table_names)(&self.0)
328 .into_iter()
329 .map(|s| s.into())
330 .collect()
331 }
332 }
333
334 async fn table(
335 &self,
336 name: &str,
337 ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
338 unsafe {
339 let table: Option<FFI_TableProvider> =
340 df_result!((self.0.table)(&self.0, name.into()).await)?.into();
341
342 let table = table.as_ref().map(<Arc<dyn TableProvider>>::from);
343
344 Ok(table)
345 }
346 }
347
348 fn register_table(
349 &self,
350 name: String,
351 table: Arc<dyn TableProvider>,
352 ) -> Result<Option<Arc<dyn TableProvider>>> {
353 unsafe {
354 let ffi_table = match table.as_any().downcast_ref::<ForeignTableProvider>() {
355 Some(t) => t.0.clone(),
356 None => FFI_TableProvider::new_with_ffi_codec(
357 table,
358 true,
359 None,
360 self.0.logical_codec.clone(),
361 ),
362 };
363
364 let returned_provider: Option<FFI_TableProvider> =
365 df_result!((self.0.register_table)(&self.0, name.into(), ffi_table))?
366 .into();
367
368 Ok(returned_provider
369 .map(|t| Arc::new(ForeignTableProvider(t)) as Arc<dyn TableProvider>))
370 }
371 }
372
373 fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
374 let returned_provider: Option<FFI_TableProvider> = unsafe {
375 df_result!((self.0.deregister_table)(&self.0, name.into()))?.into()
376 };
377
378 Ok(returned_provider
379 .map(|t| Arc::new(ForeignTableProvider(t)) as Arc<dyn TableProvider>))
380 }
381
382 fn table_exist(&self, name: &str) -> bool {
384 unsafe { (self.0.table_exist)(&self.0, name.into()) }
385 }
386}
387
#[cfg(test)]
mod tests {
    use arrow::datatypes::Schema;
    use datafusion::catalog::MemorySchemaProvider;
    use datafusion::datasource::empty::EmptyTable;

    use super::*;

    /// Builds a placeholder table provider with an empty schema.
    fn empty_table() -> Arc<dyn TableProvider> {
        let schema = Arc::new(Schema::empty());
        Arc::new(EmptyTable::new(schema))
    }

    /// Exercises every `SchemaProvider` operation through the FFI function
    /// pointers by pretending the struct came from another library.
    #[tokio::test]
    async fn test_round_trip_ffi_schema_provider() {
        let schema_provider = Arc::new(MemorySchemaProvider::new());
        let first_registration = schema_provider
            .as_ref()
            .register_table("prior_table".to_string(), empty_table())
            .unwrap();
        assert!(first_registration.is_none());

        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();

        let mut ffi_provider =
            FFI_SchemaProvider::new(schema_provider, None, task_ctx_provider, None);
        // A foreign marker forces the conversion below to take the FFI path.
        ffi_provider.library_marker_id = crate::mock_foreign_marker_id;

        let foreign: Arc<dyn SchemaProvider + Send> = (&ffi_provider).into();

        let names_at_start = foreign.table_names();
        assert_eq!(names_at_start.len(), 1);
        assert_eq!(names_at_start[0], "prior_table");

        // Registering a name that is already taken must fail.
        let duplicate = foreign.register_table("prior_table".to_string(), empty_table());
        assert!(duplicate.is_err());
        assert_eq!(foreign.table_names().len(), 1);

        // Registering a fresh name succeeds and hands nothing back.
        let fresh = foreign
            .register_table("second_table".to_string(), empty_table())
            .expect("Unable to register table");
        assert!(fresh.is_none());
        assert_eq!(foreign.table_names().len(), 2);

        // Deregistering hands the removed table back.
        let removed = foreign
            .deregister_table("prior_table")
            .expect("Unable to deregister table");
        assert!(removed.is_some());
        assert_eq!(foreign.table_names().len(), 1);

        // The removed table is gone.
        let missing = foreign
            .table("prior_table")
            .await
            .expect("Unable to query table");
        assert!(missing.is_none());
        assert!(!foreign.table_exist("prior_table"));

        // The remaining table is still reachable.
        let present = foreign
            .table("second_table")
            .await
            .expect("Unable to query table");
        assert!(present.is_some());
        assert!(foreign.table_exist("second_table"));
    }

    /// Checks that the library marker decides between returning the original
    /// provider and wrapping the struct in a `ForeignSchemaProvider`.
    #[test]
    fn test_ffi_schema_provider_local_bypass() {
        let schema_provider = Arc::new(MemorySchemaProvider::new());

        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
        let mut ffi_provider =
            FFI_SchemaProvider::new(schema_provider, None, task_ctx_provider, None);

        // Same library marker: the original provider comes back.
        let local_view: Arc<dyn SchemaProvider + Send> = (&ffi_provider).into();
        let as_memory = local_view.as_any().downcast_ref::<MemorySchemaProvider>();
        assert!(as_memory.is_some());

        // Foreign library marker: the struct is wrapped instead.
        ffi_provider.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_view: Arc<dyn SchemaProvider + Send> = (&ffi_provider).into();
        let as_foreign = foreign_view.as_any().downcast_ref::<ForeignSchemaProvider>();
        assert!(as_foreign.is_some());
    }
}