use std::any::Any;
use std::ffi::c_void;
use std::sync::Arc;
use abi_stable::StableAbi;
use abi_stable::std_types::{ROption, RResult, RString, RVec};
use async_ffi::{FfiFuture, FutureExt};
use async_trait::async_trait;
use datafusion_catalog::{SchemaProvider, TableProvider};
use datafusion_common::error::{DataFusionError, Result};
use datafusion_proto::logical_plan::{
DefaultLogicalExtensionCodec, LogicalExtensionCodec,
};
use tokio::runtime::Handle;
use crate::execution::FFI_TaskContextProvider;
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
use crate::table_provider::{FFI_TableProvider, ForeignTableProvider};
use crate::util::FFIResult;
use crate::{df_result, rresult_return};
/// FFI-stable description of a [`SchemaProvider`]: a `#[repr(C)]` struct made of
/// `extern "C"` function pointers plus an opaque pointer to producer-owned state.
///
/// Every callback takes the struct itself as its first argument so that the
/// implementation can reach `private_data`. Because the struct is `#[repr(C)]`,
/// the order of the fields is part of its layout and must not be changed.
#[repr(C)]
#[derive(Debug, StableAbi)]
pub struct FFI_SchemaProvider {
/// Owner name reported by the wrapped provider, if it has one.
pub owner_name: ROption<RString>,
/// Returns the names of the tables known to the schema.
pub table_names: unsafe extern "C" fn(provider: &Self) -> RVec<RString>,
/// Asynchronously looks up the table called `name`. The future resolves to an
/// optional FFI table provider, or to an error.
pub table: unsafe extern "C" fn(
provider: &Self,
name: RString,
)
-> FfiFuture<FFIResult<ROption<FFI_TableProvider>>>,
/// Registers `table` under `name` and passes back the optional table returned
/// by the underlying provider, or an error.
pub register_table: unsafe extern "C" fn(
provider: &Self,
name: RString,
table: FFI_TableProvider,
)
-> FFIResult<ROption<FFI_TableProvider>>,
/// Deregisters the table called `name` and passes back the optional table
/// returned by the underlying provider, or an error.
pub deregister_table: unsafe extern "C" fn(
provider: &Self,
name: RString,
)
-> FFIResult<ROption<FFI_TableProvider>>,
/// Reports whether a table called `name` exists in the schema.
pub table_exist: unsafe extern "C" fn(provider: &Self, name: RString) -> bool,
/// Logical extension codec that is handed to every `FFI_TableProvider` created
/// through this schema provider.
pub logical_codec: FFI_LogicalExtensionCodec,
/// Produces a new `FFI_SchemaProvider` backed by the same underlying provider.
/// This is what the [`Clone`] implementation calls.
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
/// Frees the state behind `private_data`. This is what the [`Drop`]
/// implementation calls.
pub release: unsafe extern "C" fn(arg: &mut Self),
/// Returns a version number for the producing library.
/// NOTE(review): populated from `super::version`; presumably the DataFusion
/// major version — confirm against that function.
pub version: unsafe extern "C" fn() -> u64,
/// Opaque pointer to producer-owned state. For providers created by this
/// library it points at a boxed `ProviderPrivateData`; only the callbacks
/// stored in this struct are expected to dereference it.
pub private_data: *mut c_void,
/// Returns an identifier for the library that created this struct. It is
/// compared with `crate::get_library_marker_id()` to detect structs that were
/// created locally, in which case the wrapped provider is used directly.
pub library_marker_id: extern "C" fn() -> usize,
}
// The struct holds a raw pointer (`private_data`), so `Send` and `Sync` are not
// implemented automatically and are asserted manually here.
// NOTE(review): soundness relies on the state behind `private_data` being safe to
// move and share across threads — confirm for every producer of this struct.
unsafe impl Send for FFI_SchemaProvider {}
unsafe impl Sync for FFI_SchemaProvider {}
/// State stored behind `FFI_SchemaProvider::private_data` for providers that were
/// created by this library.
struct ProviderPrivateData {
/// The wrapped schema provider that the `extern "C"` callbacks forward to.
provider: Arc<dyn SchemaProvider + Send>,
/// Optional Tokio runtime handle; it is passed on to every `FFI_TableProvider`
/// that the callbacks create.
runtime: Option<Handle>,
}
impl FFI_SchemaProvider {
    /// Borrows the wrapped schema provider stored behind `private_data`.
    ///
    /// # Safety
    ///
    /// `self.private_data` must point to a live `ProviderPrivateData`, i.e. the
    /// struct must have been created by this library and not yet released.
    unsafe fn inner(&self) -> &Arc<dyn SchemaProvider + Send> {
        let state_ptr = self.private_data.cast::<ProviderPrivateData>();
        // SAFETY: the caller guarantees that the pointer refers to a live
        // `ProviderPrivateData` (see `# Safety`).
        let state: &ProviderPrivateData = unsafe { &*state_ptr };
        &state.provider
    }

    /// Returns a clone of the optional runtime handle stored behind `private_data`.
    ///
    /// # Safety
    ///
    /// Same requirement as [`Self::inner`]: `self.private_data` must point to a
    /// live `ProviderPrivateData`.
    unsafe fn runtime(&self) -> Option<Handle> {
        let state_ptr = self.private_data.cast::<ProviderPrivateData>();
        // SAFETY: the caller guarantees that the pointer refers to a live
        // `ProviderPrivateData` (see `# Safety`).
        let state: &ProviderPrivateData = unsafe { &*state_ptr };
        state.runtime.clone()
    }
}
unsafe extern "C" fn table_names_fn_wrapper(
provider: &FFI_SchemaProvider,
) -> RVec<RString> {
unsafe {
let provider = provider.inner();
let table_names = provider.table_names();
table_names.into_iter().map(|s| s.into()).collect()
}
}
/// Callback stored in [`FFI_SchemaProvider::table`]: looks up `name` on the wrapped
/// provider and, if a table is found, wraps it in an [`FFI_TableProvider`] that
/// carries this provider's runtime handle and logical codec.
///
/// A lookup error makes `rresult_return!` return early from the future.
///
/// # Safety
///
/// `provider.private_data` must point to a live `ProviderPrivateData`.
unsafe extern "C" fn table_fn_wrapper(
    provider: &FFI_SchemaProvider,
    name: RString,
) -> FfiFuture<FFIResult<ROption<FFI_TableProvider>>> {
    // SAFETY: guaranteed by the caller, see `# Safety`.
    let (runtime, schema) =
        unsafe { (provider.runtime(), Arc::clone(provider.inner())) };
    let codec = provider.logical_codec.clone();

    // The future owns everything it needs, so it does not borrow from `provider`.
    let lookup = async move {
        let found = rresult_return!(schema.table(name.as_str()).await);
        let exported = found.map(|table| {
            FFI_TableProvider::new_with_ffi_codec(table, true, runtime, codec)
        });
        RResult::ROk(exported.into())
    };
    lookup.into_ffi()
}
/// Callback stored in [`FFI_SchemaProvider::register_table`]: wraps the incoming FFI
/// table in a [`ForeignTableProvider`], registers it with the wrapped provider and
/// converts whatever table the provider hands back into an [`FFI_TableProvider`].
///
/// A registration error makes `rresult_return!` return early.
///
/// # Safety
///
/// `provider.private_data` must point to a live `ProviderPrivateData`.
unsafe extern "C" fn register_table_fn_wrapper(
    provider: &FFI_SchemaProvider,
    name: RString,
    table: FFI_TableProvider,
) -> FFIResult<ROption<FFI_TableProvider>> {
    // SAFETY: guaranteed by the caller, see `# Safety`.
    let (runtime, schema) = unsafe { (provider.runtime(), provider.inner()) };
    let codec = provider.logical_codec.clone();

    let incoming = Arc::new(ForeignTableProvider(table));
    let handed_back = rresult_return!(schema.register_table(name.into(), incoming));
    let exported = handed_back
        .map(|t| FFI_TableProvider::new_with_ffi_codec(t, true, runtime, codec));

    RResult::ROk(exported.into())
}
/// Callback stored in [`FFI_SchemaProvider::deregister_table`]: deregisters `name`
/// on the wrapped provider and converts whatever table the provider hands back into
/// an [`FFI_TableProvider`].
///
/// A deregistration error makes `rresult_return!` return early.
///
/// # Safety
///
/// `provider.private_data` must point to a live `ProviderPrivateData`.
unsafe extern "C" fn deregister_table_fn_wrapper(
    provider: &FFI_SchemaProvider,
    name: RString,
) -> FFIResult<ROption<FFI_TableProvider>> {
    // SAFETY: guaranteed by the caller, see `# Safety`.
    let (runtime, schema) = unsafe { (provider.runtime(), provider.inner()) };
    let codec = provider.logical_codec.clone();

    let handed_back = rresult_return!(schema.deregister_table(name.as_str()));
    let exported = handed_back
        .map(|t| FFI_TableProvider::new_with_ffi_codec(t, true, runtime, codec));

    RResult::ROk(exported.into())
}
/// Callback stored in [`FFI_SchemaProvider::table_exist`]: forwards the existence
/// check for `name` to the wrapped provider.
///
/// # Safety
///
/// `provider.private_data` must point to a live `ProviderPrivateData`.
unsafe extern "C" fn table_exist_fn_wrapper(
    provider: &FFI_SchemaProvider,
    name: RString,
) -> bool {
    // SAFETY: guaranteed by the caller, see `# Safety`.
    let schema = unsafe { provider.inner() };
    schema.table_exist(name.as_str())
}
/// Callback stored in [`FFI_SchemaProvider::release`]: frees the boxed
/// `ProviderPrivateData` behind `private_data` and resets the pointer to null.
///
/// Releasing an already-released provider is a caller bug and still trips the debug
/// assertion. In builds without debug assertions the call is a no-op instead of
/// handing a null pointer to `Box::from_raw`, which would be undefined behaviour.
///
/// # Safety
///
/// `provider.private_data` must be null or a pointer obtained from `Box::into_raw`
/// on a `ProviderPrivateData` that has not been freed yet.
unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_SchemaProvider) {
    debug_assert!(!provider.private_data.is_null());
    if provider.private_data.is_null() {
        // Already released: nothing left to free.
        return;
    }

    let state_ptr = provider.private_data as *mut ProviderPrivateData;
    // SAFETY: the pointer is non-null and, per `# Safety`, was produced by
    // `Box::into_raw` and has not been freed.
    let state = unsafe { Box::from_raw(state_ptr) };
    drop(state);
    // Mark the struct as released so that a second call cannot free the state again.
    provider.private_data = std::ptr::null_mut();
}
/// Callback stored in [`FFI_SchemaProvider::clone`]: builds a new struct whose
/// private state shares the wrapped provider (via `Arc::clone`) and carries a clone
/// of the runtime handle. The owner name and logical codec are cloned as well.
///
/// # Safety
///
/// `provider.private_data` must point to a live `ProviderPrivateData`.
unsafe extern "C" fn clone_fn_wrapper(
    provider: &FFI_SchemaProvider,
) -> FFI_SchemaProvider {
    // SAFETY: guaranteed by the caller, see `# Safety`.
    let state = unsafe {
        ProviderPrivateData {
            runtime: provider.runtime(),
            provider: Arc::clone(provider.inner()),
        }
    };
    // The new struct owns its own boxed state; `release` frees it again.
    let private_data = Box::into_raw(Box::new(state)).cast::<c_void>();

    FFI_SchemaProvider {
        owner_name: provider.owner_name.clone(),
        table_names: table_names_fn_wrapper,
        table: table_fn_wrapper,
        register_table: register_table_fn_wrapper,
        deregister_table: deregister_table_fn_wrapper,
        table_exist: table_exist_fn_wrapper,
        logical_codec: provider.logical_codec.clone(),
        clone: clone_fn_wrapper,
        release: release_fn_wrapper,
        version: super::version,
        private_data,
        library_marker_id: crate::get_library_marker_id,
    }
}
impl Drop for FFI_SchemaProvider {
    /// Hands the struct to its own `release` callback so that the library which
    /// created it frees the private state.
    fn drop(&mut self) {
        let release = self.release;
        // SAFETY: the callback is invoked on the struct it is stored in.
        // NOTE(review): this assumes `release` and `private_data` were set up
        // together by the producing library.
        unsafe { release(self) }
    }
}
impl FFI_SchemaProvider {
pub fn new(
provider: Arc<dyn SchemaProvider + Send>,
runtime: Option<Handle>,
task_ctx_provider: impl Into<FFI_TaskContextProvider>,
logical_codec: Option<Arc<dyn LogicalExtensionCodec>>,
) -> Self {
let task_ctx_provider = task_ctx_provider.into();
let logical_codec =
logical_codec.unwrap_or_else(|| Arc::new(DefaultLogicalExtensionCodec {}));
let logical_codec = FFI_LogicalExtensionCodec::new(
logical_codec,
runtime.clone(),
task_ctx_provider.clone(),
);
Self::new_with_ffi_codec(provider, runtime, logical_codec)
}
pub fn new_with_ffi_codec(
provider: Arc<dyn SchemaProvider + Send>,
runtime: Option<Handle>,
logical_codec: FFI_LogicalExtensionCodec,
) -> Self {
if let Some(provider) = provider.as_any().downcast_ref::<ForeignSchemaProvider>()
{
return provider.0.clone();
}
let owner_name = provider.owner_name().map(|s| s.into()).into();
let private_data = Box::new(ProviderPrivateData { provider, runtime });
Self {
owner_name,
table_names: table_names_fn_wrapper,
table: table_fn_wrapper,
register_table: register_table_fn_wrapper,
deregister_table: deregister_table_fn_wrapper,
table_exist: table_exist_fn_wrapper,
logical_codec,
clone: clone_fn_wrapper,
release: release_fn_wrapper,
version: super::version,
private_data: Box::into_raw(private_data) as *mut c_void,
library_marker_id: crate::get_library_marker_id,
}
}
}
/// Wrapper around an [`FFI_SchemaProvider`] that implements [`SchemaProvider`] by
/// calling the function pointers stored in the FFI struct. It never touches
/// `private_data` itself.
#[derive(Debug)]
pub struct ForeignSchemaProvider(pub FFI_SchemaProvider);
// Manual `Send`/`Sync` assertions for the wrapper around `FFI_SchemaProvider`.
// NOTE(review): soundness depends on the wrapped FFI struct's callbacks and state
// being usable from any thread — confirm for every producer of that struct.
unsafe impl Send for ForeignSchemaProvider {}
unsafe impl Sync for ForeignSchemaProvider {}
impl From<&FFI_SchemaProvider> for Arc<dyn SchemaProvider + Send> {
    /// Turns an FFI struct back into a usable [`SchemaProvider`].
    ///
    /// When the struct's library marker matches this library's marker, the wrapped
    /// provider is returned directly. Otherwise the struct is cloned and wrapped in
    /// a [`ForeignSchemaProvider`] that goes through the FFI callbacks.
    fn from(provider: &FFI_SchemaProvider) -> Self {
        let created_locally =
            (provider.library_marker_id)() == crate::get_library_marker_id();

        if created_locally {
            // SAFETY: a matching marker is taken to mean that this library created
            // the struct, so `private_data` points to a `ProviderPrivateData`.
            Arc::clone(unsafe { provider.inner() })
        } else {
            Arc::new(ForeignSchemaProvider(provider.clone()))
                as Arc<dyn SchemaProvider + Send>
        }
    }
}
impl Clone for FFI_SchemaProvider {
    /// Delegates to the `clone` callback stored in the struct, so the library that
    /// created the struct is the one that duplicates its private state.
    fn clone(&self) -> Self {
        let clone_fn = self.clone;
        // SAFETY: the callback is invoked on the struct it is stored in.
        // NOTE(review): this assumes `clone` and `private_data` were set up together
        // by the producing library.
        unsafe { clone_fn(self) }
    }
}
/// [`SchemaProvider`] implementation that forwards every operation to the callbacks
/// of the wrapped [`FFI_SchemaProvider`].
///
/// All `unsafe` blocks below call a callback with the struct it is stored in.
/// NOTE(review): this assumes the producing library set up the callbacks and
/// `private_data` consistently.
#[async_trait]
impl SchemaProvider for ForeignSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the owner name recorded in the FFI struct, if any.
    fn owner_name(&self) -> Option<&str> {
        let owner: Option<&RString> = self.0.owner_name.as_ref().into();
        Some(owner?.as_str())
    }

    /// Fetches the table names through the `table_names` callback.
    fn table_names(&self) -> Vec<String> {
        // SAFETY: see the note on this impl block.
        let names = unsafe { (self.0.table_names)(&self.0) };
        names.into_iter().map(|name| name.into()).collect()
    }

    /// Looks up `name` through the `table` callback and converts a returned FFI
    /// table into an `Arc<dyn TableProvider>`.
    async fn table(
        &self,
        name: &str,
    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
        // SAFETY: see the note on this impl block.
        let pending = unsafe { (self.0.table)(&self.0, name.into()) };
        let found: Option<FFI_TableProvider> = df_result!(pending.await)?.into();

        Ok(found.as_ref().map(<Arc<dyn TableProvider>>::from))
    }

    /// Registers `table` through the `register_table` callback.
    ///
    /// A table that already is a [`ForeignTableProvider`] is passed on by cloning
    /// its FFI struct; any other table is wrapped in a new [`FFI_TableProvider`]
    /// that uses this provider's logical codec and no runtime handle.
    fn register_table(
        &self,
        name: String,
        table: Arc<dyn TableProvider>,
    ) -> Result<Option<Arc<dyn TableProvider>>> {
        let outgoing = if let Some(foreign) =
            table.as_any().downcast_ref::<ForeignTableProvider>()
        {
            foreign.0.clone()
        } else {
            FFI_TableProvider::new_with_ffi_codec(
                table,
                true,
                None,
                self.0.logical_codec.clone(),
            )
        };

        // SAFETY: see the note on this impl block.
        let response =
            unsafe { (self.0.register_table)(&self.0, name.into(), outgoing) };
        let handed_back: Option<FFI_TableProvider> = df_result!(response)?.into();

        Ok(handed_back
            .map(|t| Arc::new(ForeignTableProvider(t)) as Arc<dyn TableProvider>))
    }

    /// Deregisters `name` through the `deregister_table` callback and wraps a
    /// returned FFI table in a [`ForeignTableProvider`].
    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
        // SAFETY: see the note on this impl block.
        let response = unsafe { (self.0.deregister_table)(&self.0, name.into()) };
        let handed_back: Option<FFI_TableProvider> = df_result!(response)?.into();

        Ok(handed_back
            .map(|t| Arc::new(ForeignTableProvider(t)) as Arc<dyn TableProvider>))
    }

    /// Checks for `name` through the `table_exist` callback.
    fn table_exist(&self, name: &str) -> bool {
        let exists_fn = self.0.table_exist;
        // SAFETY: see the note on this impl block.
        unsafe { exists_fn(&self.0, name.into()) }
    }
}
#[cfg(test)]
mod tests {
    use arrow::datatypes::Schema;
    use datafusion::catalog::MemorySchemaProvider;
    use datafusion::datasource::empty::EmptyTable;

    use super::*;

    /// Builds a table provider with an empty schema to use as a test fixture.
    fn empty_table() -> Arc<dyn TableProvider> {
        let schema = Arc::new(Schema::empty());
        Arc::new(EmptyTable::new(schema))
    }

    /// Drives every `SchemaProvider` method through the FFI callbacks.
    #[tokio::test]
    async fn test_round_trip_ffi_schema_provider() {
        // Seed the local provider with one table before exporting it.
        let local = Arc::new(MemorySchemaProvider::new());
        let seeded = local
            .as_ref()
            .register_table("prior_table".to_string(), empty_table())
            .unwrap();
        assert!(seeded.is_none());

        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
        let mut ffi_provider =
            FFI_SchemaProvider::new(local, None, task_ctx_provider, None);
        // Swap the marker so the conversion below takes the foreign code path.
        ffi_provider.library_marker_id = crate::mock_foreign_marker_id;

        let foreign: Arc<dyn SchemaProvider + Send> = (&ffi_provider).into();

        // The seeded table is visible through the FFI boundary.
        assert_eq!(foreign.table_names(), vec!["prior_table".to_string()]);

        // Registering under an existing name fails and changes nothing.
        let duplicate = foreign.register_table("prior_table".to_string(), empty_table());
        assert!(duplicate.is_err());
        assert_eq!(foreign.table_names().len(), 1);

        // Registering under a new name succeeds.
        let added = foreign
            .register_table("second_table".to_string(), empty_table())
            .expect("Unable to register table");
        assert!(added.is_none());
        assert_eq!(foreign.table_names().len(), 2);

        // Deregistering hands the removed table back.
        let removed = foreign
            .deregister_table("prior_table")
            .expect("Unable to deregister table");
        assert!(removed.is_some());
        assert_eq!(foreign.table_names().len(), 1);

        // The removed table can no longer be found.
        let missing = foreign
            .table("prior_table")
            .await
            .expect("Unable to query table");
        assert!(missing.is_none());
        assert!(!foreign.table_exist("prior_table"));

        // The remaining table can still be found.
        let present = foreign
            .table("second_table")
            .await
            .expect("Unable to query table");
        assert!(present.is_some());
        assert!(foreign.table_exist("second_table"));
    }

    /// Checks that a locally created FFI struct converts back to the original
    /// provider, while a struct with a foreign marker gets the FFI wrapper.
    #[test]
    fn test_ffi_schema_provider_local_bypass() {
        let local = Arc::new(MemorySchemaProvider::new());
        let (_ctx, task_ctx_provider) = crate::util::tests::test_session_and_ctx();
        let mut ffi_provider =
            FFI_SchemaProvider::new(local, None, task_ctx_provider, None);

        // Same library marker: the wrapped provider comes back unchanged.
        let unwrapped: Arc<dyn SchemaProvider + Send> = (&ffi_provider).into();
        let as_memory = unwrapped.as_any().downcast_ref::<MemorySchemaProvider>();
        assert!(as_memory.is_some());

        // Foreign library marker: the struct is wrapped instead.
        ffi_provider.library_marker_id = crate::mock_foreign_marker_id;
        let wrapped: Arc<dyn SchemaProvider + Send> = (&ffi_provider).into();
        let as_foreign = wrapped.as_any().downcast_ref::<ForeignSchemaProvider>();
        assert!(as_foreign.is_some());
    }
}