datafusion_ffi/execution/
task_ctx_provider.rs1use std::ffi::c_void;
19use std::sync::{Arc, Weak};
20
21use datafusion_common::{DataFusionError, ffi_datafusion_err};
22use datafusion_execution::{TaskContext, TaskContextProvider};
23
24use crate::execution::task_ctx::FFI_TaskContext;
25use crate::util::FFI_Result;
26use crate::{df_result, sresult};
27
28#[repr(C)]
34#[derive(Debug)]
35pub struct FFI_TaskContextProvider {
36 pub task_ctx: unsafe extern "C" fn(&Self) -> FFI_Result<FFI_TaskContext>,
41
42 pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
45
46 pub release: unsafe extern "C" fn(arg: &mut Self),
48
49 pub private_data: *mut c_void,
52
53 pub library_marker_id: extern "C" fn() -> usize,
57}
58
59unsafe impl Send for FFI_TaskContextProvider {}
60unsafe impl Sync for FFI_TaskContextProvider {}
61
62struct TaskContextProviderPrivateData {
63 ctx: Weak<dyn TaskContextProvider>,
64}
65
66impl FFI_TaskContextProvider {
67 unsafe fn inner(&self) -> Option<Arc<TaskContext>> {
68 unsafe {
69 let private_data = self.private_data as *const TaskContextProviderPrivateData;
70 (*private_data).ctx.upgrade().map(|ctx| ctx.task_ctx())
71 }
72 }
73}
74
75unsafe extern "C" fn task_ctx_fn_wrapper(
76 ctx_provider: &FFI_TaskContextProvider,
77) -> FFI_Result<FFI_TaskContext> {
78 unsafe {
79 sresult!(
80 ctx_provider
81 .inner()
82 .map(FFI_TaskContext::from)
83 .ok_or_else(|| {
84 ffi_datafusion_err!(
85 "TaskContextProvider went out of scope over FFI boundary."
86 )
87 })
88 )
89 }
90}
91
92unsafe extern "C" fn clone_fn_wrapper(
93 provider: &FFI_TaskContextProvider,
94) -> FFI_TaskContextProvider {
95 unsafe {
96 let private_data = provider.private_data as *const TaskContextProviderPrivateData;
97 let ctx = Weak::clone(&(*private_data).ctx);
98
99 let private_data = Box::new(TaskContextProviderPrivateData { ctx });
100
101 FFI_TaskContextProvider {
102 task_ctx: task_ctx_fn_wrapper,
103 release: release_fn_wrapper,
104 clone: clone_fn_wrapper,
105 private_data: Box::into_raw(private_data) as *mut c_void,
106 library_marker_id: crate::get_library_marker_id,
107 }
108 }
109}
110unsafe extern "C" fn release_fn_wrapper(ctx: &mut FFI_TaskContextProvider) {
111 unsafe {
112 let private_data =
113 Box::from_raw(ctx.private_data as *mut TaskContextProviderPrivateData);
114 drop(private_data);
115 }
116}
117impl Drop for FFI_TaskContextProvider {
118 fn drop(&mut self) {
119 unsafe { (self.release)(self) }
120 }
121}
122
123impl Clone for FFI_TaskContextProvider {
124 fn clone(&self) -> Self {
125 unsafe { (self.clone)(self) }
126 }
127}
128
129impl From<&Arc<dyn TaskContextProvider>> for FFI_TaskContextProvider {
130 fn from(ctx: &Arc<dyn TaskContextProvider>) -> Self {
131 let ctx = Arc::downgrade(ctx);
132 let private_data = Box::new(TaskContextProviderPrivateData { ctx });
133
134 FFI_TaskContextProvider {
135 task_ctx: task_ctx_fn_wrapper,
136 clone: clone_fn_wrapper,
137 release: release_fn_wrapper,
138 private_data: Box::into_raw(private_data) as *mut c_void,
139 library_marker_id: crate::get_library_marker_id,
140 }
141 }
142}
143
144impl TryFrom<&FFI_TaskContextProvider> for Arc<TaskContext> {
145 type Error = DataFusionError;
146 fn try_from(ffi_ctx: &FFI_TaskContextProvider) -> Result<Self, Self::Error> {
147 unsafe {
148 if (ffi_ctx.library_marker_id)() == crate::get_library_marker_id() {
149 return ffi_ctx.inner().ok_or_else(|| {
150 ffi_datafusion_err!(
151 "TaskContextProvider went out of scope over FFI boundary."
152 )
153 });
154 }
155
156 df_result!((ffi_ctx.task_ctx)(ffi_ctx)).map(Into::into)
157 }
158 }
159}
160
161#[cfg(test)]
162mod tests {
163 use std::sync::Arc;
164
165 use datafusion_common::{DataFusionError, Result};
166 use datafusion_execution::{TaskContext, TaskContextProvider};
167
168 use crate::execution::FFI_TaskContextProvider;
169
170 #[derive(Default)]
171 struct TestCtxProvider {
172 ctx: Arc<TaskContext>,
173 }
174
175 impl TaskContextProvider for TestCtxProvider {
176 fn task_ctx(&self) -> Arc<TaskContext> {
177 Arc::clone(&self.ctx)
178 }
179 }
180
181 #[test]
182 fn ffi_task_context_provider_round_trip() -> Result<()> {
183 let ctx = Arc::new(TestCtxProvider::default()) as Arc<dyn TaskContextProvider>;
184 let mut ffi_ctx_provider: FFI_TaskContextProvider = (&Arc::clone(&ctx)).into();
185 ffi_ctx_provider.library_marker_id = crate::mock_foreign_marker_id;
186
187 let foreign_task_ctx: Arc<TaskContext> = (&ffi_ctx_provider).try_into()?;
188
189 assert_eq!(
190 format!("{foreign_task_ctx:?}"),
191 format!("{:?}", ctx.task_ctx())
192 );
193
194 Ok(())
195 }
196
197 #[test]
198 fn ffi_task_context_provider_clone() -> Result<()> {
199 let ctx = Arc::new(TestCtxProvider::default()) as Arc<dyn TaskContextProvider>;
200 let first_provider: FFI_TaskContextProvider = (&ctx).into();
201
202 let second_provider = first_provider.clone();
203
204 let first_ctx: Arc<TaskContext> = (&first_provider).try_into()?;
205 let second_ctx: Arc<TaskContext> = (&second_provider).try_into()?;
206
207 assert!(Arc::ptr_eq(&first_ctx, &second_ctx));
208
209 Ok(())
210 }
211
212 #[test]
213 fn ffi_task_context_provider_out_of_scope() {
214 fn create_ffi_out_of_scope() -> FFI_TaskContextProvider {
215 let ctx =
216 Arc::new(TestCtxProvider::default()) as Arc<dyn TaskContextProvider>;
217 (&ctx).into()
218 }
219
220 let provider = create_ffi_out_of_scope();
221 let failed_ctx = <Arc<TaskContext>>::try_from(&provider);
222
223 let Err(DataFusionError::Ffi(_)) = failed_ctx else {
224 panic!("Expected out of scope error")
225 };
226 }
227}