datafusion_ffi/execution/
task_ctx_provider.rs1use std::ffi::c_void;
19use std::sync::{Arc, Weak};
20
21use abi_stable::StableAbi;
22use datafusion_common::{DataFusionError, ffi_datafusion_err};
23use datafusion_execution::{TaskContext, TaskContextProvider};
24
25use crate::execution::task_ctx::FFI_TaskContext;
26use crate::util::FFIResult;
27use crate::{df_result, rresult};
28
29#[repr(C)]
35#[derive(Debug, StableAbi)]
36pub struct FFI_TaskContextProvider {
37 pub task_ctx: unsafe extern "C" fn(&Self) -> FFIResult<FFI_TaskContext>,
42
43 pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
46
47 pub release: unsafe extern "C" fn(arg: &mut Self),
49
50 pub private_data: *mut c_void,
53
54 pub library_marker_id: extern "C" fn() -> usize,
58}
59
60unsafe impl Send for FFI_TaskContextProvider {}
61unsafe impl Sync for FFI_TaskContextProvider {}
62
63struct TaskContextProviderPrivateData {
64 ctx: Weak<dyn TaskContextProvider>,
65}
66
67impl FFI_TaskContextProvider {
68 unsafe fn inner(&self) -> Option<Arc<TaskContext>> {
69 unsafe {
70 let private_data = self.private_data as *const TaskContextProviderPrivateData;
71 (*private_data).ctx.upgrade().map(|ctx| ctx.task_ctx())
72 }
73 }
74}
75
76unsafe extern "C" fn task_ctx_fn_wrapper(
77 ctx_provider: &FFI_TaskContextProvider,
78) -> FFIResult<FFI_TaskContext> {
79 unsafe {
80 rresult!(
81 ctx_provider
82 .inner()
83 .map(FFI_TaskContext::from)
84 .ok_or_else(|| {
85 ffi_datafusion_err!(
86 "TaskContextProvider went out of scope over FFI boundary."
87 )
88 })
89 )
90 }
91}
92
93unsafe extern "C" fn clone_fn_wrapper(
94 provider: &FFI_TaskContextProvider,
95) -> FFI_TaskContextProvider {
96 unsafe {
97 let private_data = provider.private_data as *const TaskContextProviderPrivateData;
98 let ctx = Weak::clone(&(*private_data).ctx);
99
100 let private_data = Box::new(TaskContextProviderPrivateData { ctx });
101
102 FFI_TaskContextProvider {
103 task_ctx: task_ctx_fn_wrapper,
104 release: release_fn_wrapper,
105 clone: clone_fn_wrapper,
106 private_data: Box::into_raw(private_data) as *mut c_void,
107 library_marker_id: crate::get_library_marker_id,
108 }
109 }
110}
111unsafe extern "C" fn release_fn_wrapper(ctx: &mut FFI_TaskContextProvider) {
112 unsafe {
113 let private_data =
114 Box::from_raw(ctx.private_data as *mut TaskContextProviderPrivateData);
115 drop(private_data);
116 }
117}
118impl Drop for FFI_TaskContextProvider {
119 fn drop(&mut self) {
120 unsafe { (self.release)(self) }
121 }
122}
123
124impl Clone for FFI_TaskContextProvider {
125 fn clone(&self) -> Self {
126 unsafe { (self.clone)(self) }
127 }
128}
129
130impl From<&Arc<dyn TaskContextProvider>> for FFI_TaskContextProvider {
131 fn from(ctx: &Arc<dyn TaskContextProvider>) -> Self {
132 let ctx = Arc::downgrade(ctx);
133 let private_data = Box::new(TaskContextProviderPrivateData { ctx });
134
135 FFI_TaskContextProvider {
136 task_ctx: task_ctx_fn_wrapper,
137 clone: clone_fn_wrapper,
138 release: release_fn_wrapper,
139 private_data: Box::into_raw(private_data) as *mut c_void,
140 library_marker_id: crate::get_library_marker_id,
141 }
142 }
143}
144
145impl TryFrom<&FFI_TaskContextProvider> for Arc<TaskContext> {
146 type Error = DataFusionError;
147 fn try_from(ffi_ctx: &FFI_TaskContextProvider) -> Result<Self, Self::Error> {
148 unsafe {
149 if (ffi_ctx.library_marker_id)() == crate::get_library_marker_id() {
150 return ffi_ctx.inner().ok_or_else(|| {
151 ffi_datafusion_err!(
152 "TaskContextProvider went out of scope over FFI boundary."
153 )
154 });
155 }
156
157 df_result!((ffi_ctx.task_ctx)(ffi_ctx)).map(Into::into)
158 }
159 }
160}
161
162#[cfg(test)]
163mod tests {
164 use std::sync::Arc;
165
166 use datafusion_common::{DataFusionError, Result};
167 use datafusion_execution::{TaskContext, TaskContextProvider};
168
169 use crate::execution::FFI_TaskContextProvider;
170
171 #[derive(Default)]
172 struct TestCtxProvider {
173 ctx: Arc<TaskContext>,
174 }
175
176 impl TaskContextProvider for TestCtxProvider {
177 fn task_ctx(&self) -> Arc<TaskContext> {
178 Arc::clone(&self.ctx)
179 }
180 }
181
182 #[test]
183 fn ffi_task_context_provider_round_trip() -> Result<()> {
184 let ctx = Arc::new(TestCtxProvider::default()) as Arc<dyn TaskContextProvider>;
185 let mut ffi_ctx_provider: FFI_TaskContextProvider = (&Arc::clone(&ctx)).into();
186 ffi_ctx_provider.library_marker_id = crate::mock_foreign_marker_id;
187
188 let foreign_task_ctx: Arc<TaskContext> = (&ffi_ctx_provider).try_into()?;
189
190 assert_eq!(
191 format!("{foreign_task_ctx:?}"),
192 format!("{:?}", ctx.task_ctx())
193 );
194
195 Ok(())
196 }
197
198 #[test]
199 fn ffi_task_context_provider_clone() -> Result<()> {
200 let ctx = Arc::new(TestCtxProvider::default()) as Arc<dyn TaskContextProvider>;
201 let first_provider: FFI_TaskContextProvider = (&ctx).into();
202
203 let second_provider = first_provider.clone();
204
205 let first_ctx: Arc<TaskContext> = (&first_provider).try_into()?;
206 let second_ctx: Arc<TaskContext> = (&second_provider).try_into()?;
207
208 assert!(Arc::ptr_eq(&first_ctx, &second_ctx));
209
210 Ok(())
211 }
212
213 #[test]
214 fn ffi_task_context_provider_out_of_scope() {
215 fn create_ffi_out_of_scope() -> FFI_TaskContextProvider {
216 let ctx =
217 Arc::new(TestCtxProvider::default()) as Arc<dyn TaskContextProvider>;
218 (&ctx).into()
219 }
220
221 let provider = create_ffi_out_of_scope();
222 let failed_ctx = <Arc<TaskContext>>::try_from(&provider);
223
224 let Err(DataFusionError::Ffi(_)) = failed_ctx else {
225 panic!("Expected out of scope error")
226 };
227 }
228}