1use std::{any::Any, ffi::c_void, sync::Arc};
19
20use abi_stable::{
21 std_types::{ROption, RResult, RString, RVec},
22 StableAbi,
23};
24use async_ffi::{FfiFuture, FutureExt};
25use async_trait::async_trait;
26use datafusion::{
27 catalog::{SchemaProvider, TableProvider},
28 error::DataFusionError,
29};
30use tokio::runtime::Handle;
31
32use crate::{
33 df_result, rresult_return,
34 table_provider::{FFI_TableProvider, ForeignTableProvider},
35};
36
37use datafusion::error::Result;
38
/// A `#[repr(C)]`, `StableAbi` table of function pointers that exposes a
/// [`SchemaProvider`] to code on the other side of an FFI boundary.
///
/// Every callback takes the struct itself as its first argument and is
/// `unsafe` to call; [`ForeignSchemaProvider`] wraps those calls behind the
/// safe [`SchemaProvider`] trait.
#[repr(C)]
#[derive(Debug, StableAbi)]
#[allow(non_camel_case_types)]
pub struct FFI_SchemaProvider {
    /// Owner name of the schema, captured from the wrapped provider when the
    /// struct is constructed.
    pub owner_name: ROption<RString>,

    /// Returns the names of the tables in this schema.
    pub table_names: unsafe extern "C" fn(provider: &Self) -> RVec<RString>,

    /// Asynchronously looks up the table called `name`. The future resolves to
    /// an optional table on success, or to an error message.
    pub table: unsafe extern "C" fn(
        provider: &Self,
        name: RString,
    ) -> FfiFuture<
        RResult<ROption<FFI_TableProvider>, RString>,
    >,

    /// Registers `table` under `name`. On success, yields whatever optional
    /// table the underlying provider's `register_table` returned; on failure,
    /// yields an error message.
    pub register_table:
        unsafe extern "C" fn(
            provider: &Self,
            name: RString,
            table: FFI_TableProvider,
        ) -> RResult<ROption<FFI_TableProvider>, RString>,

    /// Deregisters the table called `name`. On success, yields whatever
    /// optional table the underlying provider's `deregister_table` returned;
    /// on failure, yields an error message.
    pub deregister_table:
        unsafe extern "C" fn(
            provider: &Self,
            name: RString,
        ) -> RResult<ROption<FFI_TableProvider>, RString>,

    /// Reports whether a table called `name` exists in this schema.
    pub table_exist: unsafe extern "C" fn(provider: &Self, name: RString) -> bool,

    /// Produces a new `FFI_SchemaProvider` from this one. Invoked by the
    /// `Clone` implementation.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Releases the resources held through `private_data`. Invoked by the
    /// `Drop` implementation.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Returns a version number.
    /// NOTE(review): presumably the version of the library that produced this
    /// struct, used for compatibility checks — confirm against `super::version`.
    pub version: unsafe extern "C" fn() -> u64,

    /// Opaque pointer to producer-owned state. Only the callbacks stored in
    /// this struct should interpret it; for structs built by
    /// [`FFI_SchemaProvider::new`] it points at a boxed `ProviderPrivateData`.
    pub private_data: *mut c_void,
}
84
// The raw `private_data` pointer prevents the compiler from deriving `Send`
// and `Sync`, so they are asserted manually here.
// NOTE(review): soundness presumably relies on the pointee only holding
// thread-safe state (an `Arc`'d provider and an optional runtime handle) —
// confirm against the bounds on `SchemaProvider`.
unsafe impl Send for FFI_SchemaProvider {}
unsafe impl Sync for FFI_SchemaProvider {}
87
/// State stored behind `FFI_SchemaProvider::private_data` for providers
/// created in this module.
struct ProviderPrivateData {
    /// The wrapped schema provider that the callbacks delegate to.
    provider: Arc<dyn SchemaProvider + Send>,
    /// Optional tokio runtime handle, forwarded to every `FFI_TableProvider`
    /// that the callbacks create.
    runtime: Option<Handle>,
}
92
93impl FFI_SchemaProvider {
94 unsafe fn inner(&self) -> &Arc<dyn SchemaProvider + Send> {
95 let private_data = self.private_data as *const ProviderPrivateData;
96 &(*private_data).provider
97 }
98
99 unsafe fn runtime(&self) -> Option<Handle> {
100 let private_data = self.private_data as *const ProviderPrivateData;
101 (*private_data).runtime.clone()
102 }
103}
104
105unsafe extern "C" fn table_names_fn_wrapper(
106 provider: &FFI_SchemaProvider,
107) -> RVec<RString> {
108 let provider = provider.inner();
109
110 let table_names = provider.table_names();
111 table_names.into_iter().map(|s| s.into()).collect()
112}
113
114unsafe extern "C" fn table_fn_wrapper(
115 provider: &FFI_SchemaProvider,
116 name: RString,
117) -> FfiFuture<RResult<ROption<FFI_TableProvider>, RString>> {
118 let runtime = provider.runtime();
119 let provider = Arc::clone(provider.inner());
120
121 async move {
122 let table = rresult_return!(provider.table(name.as_str()).await)
123 .map(|t| FFI_TableProvider::new(t, true, runtime))
124 .into();
125
126 RResult::ROk(table)
127 }
128 .into_ffi()
129}
130
131unsafe extern "C" fn register_table_fn_wrapper(
132 provider: &FFI_SchemaProvider,
133 name: RString,
134 table: FFI_TableProvider,
135) -> RResult<ROption<FFI_TableProvider>, RString> {
136 let runtime = provider.runtime();
137 let provider = provider.inner();
138
139 let table = Arc::new(ForeignTableProvider(table));
140
141 let returned_table = rresult_return!(provider.register_table(name.into(), table))
142 .map(|t| FFI_TableProvider::new(t, true, runtime));
143
144 RResult::ROk(returned_table.into())
145}
146
147unsafe extern "C" fn deregister_table_fn_wrapper(
148 provider: &FFI_SchemaProvider,
149 name: RString,
150) -> RResult<ROption<FFI_TableProvider>, RString> {
151 let runtime = provider.runtime();
152 let provider = provider.inner();
153
154 let returned_table = rresult_return!(provider.deregister_table(name.as_str()))
155 .map(|t| FFI_TableProvider::new(t, true, runtime));
156
157 RResult::ROk(returned_table.into())
158}
159
160unsafe extern "C" fn table_exist_fn_wrapper(
161 provider: &FFI_SchemaProvider,
162 name: RString,
163) -> bool {
164 provider.inner().table_exist(name.as_str())
165}
166
167unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_SchemaProvider) {
168 let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData);
169 drop(private_data);
170}
171
172unsafe extern "C" fn clone_fn_wrapper(
173 provider: &FFI_SchemaProvider,
174) -> FFI_SchemaProvider {
175 let old_private_data = provider.private_data as *const ProviderPrivateData;
176 let runtime = (*old_private_data).runtime.clone();
177
178 let private_data = Box::into_raw(Box::new(ProviderPrivateData {
179 provider: Arc::clone(&(*old_private_data).provider),
180 runtime,
181 })) as *mut c_void;
182
183 FFI_SchemaProvider {
184 owner_name: provider.owner_name.clone(),
185 table_names: table_names_fn_wrapper,
186 clone: clone_fn_wrapper,
187 release: release_fn_wrapper,
188 version: super::version,
189 private_data,
190 table: table_fn_wrapper,
191 register_table: register_table_fn_wrapper,
192 deregister_table: deregister_table_fn_wrapper,
193 table_exist: table_exist_fn_wrapper,
194 }
195}
196
197impl Drop for FFI_SchemaProvider {
198 fn drop(&mut self) {
199 unsafe { (self.release)(self) }
200 }
201}
202
203impl FFI_SchemaProvider {
204 pub fn new(
206 provider: Arc<dyn SchemaProvider + Send>,
207 runtime: Option<Handle>,
208 ) -> Self {
209 let owner_name = provider.owner_name().map(|s| s.into()).into();
210 let private_data = Box::new(ProviderPrivateData { provider, runtime });
211
212 Self {
213 owner_name,
214 table_names: table_names_fn_wrapper,
215 clone: clone_fn_wrapper,
216 release: release_fn_wrapper,
217 version: super::version,
218 private_data: Box::into_raw(private_data) as *mut c_void,
219 table: table_fn_wrapper,
220 register_table: register_table_fn_wrapper,
221 deregister_table: deregister_table_fn_wrapper,
222 table_exist: table_exist_fn_wrapper,
223 }
224 }
225}
226
/// Wrapper around an [`FFI_SchemaProvider`] that implements
/// [`SchemaProvider`] by calling through the struct's function pointers.
#[derive(Debug)]
pub struct ForeignSchemaProvider(pub FFI_SchemaProvider);
233
// NOTE(review): these manual impls presumably mirror the `Send`/`Sync`
// assertions on the wrapped `FFI_SchemaProvider` — confirm that the producer of
// the FFI struct guarantees its callbacks are safe to call from any thread.
unsafe impl Send for ForeignSchemaProvider {}
unsafe impl Sync for ForeignSchemaProvider {}
236
237impl From<&FFI_SchemaProvider> for ForeignSchemaProvider {
238 fn from(provider: &FFI_SchemaProvider) -> Self {
239 Self(provider.clone())
240 }
241}
242
243impl Clone for FFI_SchemaProvider {
244 fn clone(&self) -> Self {
245 unsafe { (self.clone)(self) }
246 }
247}
248
249#[async_trait]
250impl SchemaProvider for ForeignSchemaProvider {
251 fn as_any(&self) -> &dyn Any {
252 self
253 }
254
255 fn owner_name(&self) -> Option<&str> {
256 let name: Option<&RString> = self.0.owner_name.as_ref().into();
257 name.map(|s| s.as_str())
258 }
259
260 fn table_names(&self) -> Vec<String> {
261 unsafe {
262 (self.0.table_names)(&self.0)
263 .into_iter()
264 .map(|s| s.into())
265 .collect()
266 }
267 }
268
269 async fn table(
270 &self,
271 name: &str,
272 ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
273 unsafe {
274 let table: Option<FFI_TableProvider> =
275 df_result!((self.0.table)(&self.0, name.into()).await)?.into();
276
277 let table = table.as_ref().map(|t| {
278 Arc::new(ForeignTableProvider::from(t)) as Arc<dyn TableProvider>
279 });
280
281 Ok(table)
282 }
283 }
284
285 fn register_table(
286 &self,
287 name: String,
288 table: Arc<dyn TableProvider>,
289 ) -> Result<Option<Arc<dyn TableProvider>>> {
290 unsafe {
291 let ffi_table = match table.as_any().downcast_ref::<ForeignTableProvider>() {
292 Some(t) => t.0.clone(),
293 None => FFI_TableProvider::new(table, true, None),
294 };
295
296 let returned_provider: Option<FFI_TableProvider> =
297 df_result!((self.0.register_table)(&self.0, name.into(), ffi_table))?
298 .into();
299
300 Ok(returned_provider
301 .map(|t| Arc::new(ForeignTableProvider(t)) as Arc<dyn TableProvider>))
302 }
303 }
304
305 fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
306 let returned_provider: Option<FFI_TableProvider> = unsafe {
307 df_result!((self.0.deregister_table)(&self.0, name.into()))?.into()
308 };
309
310 Ok(returned_provider
311 .map(|t| Arc::new(ForeignTableProvider(t)) as Arc<dyn TableProvider>))
312 }
313
314 fn table_exist(&self, name: &str) -> bool {
316 unsafe { (self.0.table_exist)(&self.0, name.into()) }
317 }
318}
319
#[cfg(test)]
mod tests {
    use arrow::datatypes::Schema;
    use datafusion::{catalog::MemorySchemaProvider, datasource::empty::EmptyTable};

    use super::*;

    /// Builds a table provider with an empty schema, used as a placeholder.
    fn empty_table() -> Arc<dyn TableProvider> {
        let schema = Arc::new(Schema::empty());
        Arc::new(EmptyTable::new(schema))
    }

    /// Exercises every `SchemaProvider` method through the FFI wrapper and
    /// back, against an in-memory schema provider.
    #[tokio::test]
    async fn test_round_trip_ffi_schema_provider() {
        // Seed the local provider with one table before it crosses the boundary.
        let local = Arc::new(MemorySchemaProvider::new());
        let seeded = local
            .as_ref()
            .register_table("prior_table".to_string(), empty_table())
            .unwrap();
        assert!(seeded.is_none());

        let ffi = FFI_SchemaProvider::new(local, None);
        let foreign: ForeignSchemaProvider = (&ffi).into();

        let initial_names = foreign.table_names();
        assert_eq!(initial_names.len(), 1);
        assert_eq!(initial_names[0], "prior_table");

        // Registering a name that already exists must be rejected.
        let duplicate = foreign.register_table("prior_table".to_string(), empty_table());
        assert!(duplicate.is_err());
        assert_eq!(foreign.table_names().len(), 1);

        // Registering a new name succeeds and returns no table.
        let registered = foreign
            .register_table("second_table".to_string(), empty_table())
            .expect("Unable to register table");
        assert!(registered.is_none());
        assert_eq!(foreign.table_names().len(), 2);

        // Deregistering hands back the removed table.
        let removed = foreign
            .deregister_table("prior_table")
            .expect("Unable to deregister table");
        assert!(removed.is_some());
        assert_eq!(foreign.table_names().len(), 1);

        // The removed table is no longer visible.
        let missing = foreign
            .table("prior_table")
            .await
            .expect("Unable to query table");
        assert!(missing.is_none());
        assert!(!foreign.table_exist("prior_table"));

        // The table registered through the wrapper is still visible.
        let present = foreign
            .table("second_table")
            .await
            .expect("Unable to query table");
        assert!(present.is_some());
        assert!(foreign.table_exist("second_table"));
    }
}