Skip to main content

datafusion_ffi/execution/
task_ctx_provider.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ffi::c_void;
19use std::sync::{Arc, Weak};
20
21use datafusion_common::{DataFusionError, ffi_datafusion_err};
22use datafusion_execution::{TaskContext, TaskContextProvider};
23
24use crate::execution::task_ctx::FFI_TaskContext;
25use crate::util::FFI_Result;
26use crate::{df_result, sresult};
27
28/// Struct for accessing the [`TaskContext`]. This method contains a weak
29/// reference, so there are no guarantees that the [`TaskContext`] remains
30/// valid. This is used primarily for protobuf encoding and decoding of
31/// data passed across the FFI boundary. See the crate README for
32/// additional information.
33#[repr(C)]
34#[derive(Debug)]
35pub struct FFI_TaskContextProvider {
36    /// Retrieve the current [`TaskContext`] provided the provider has not
37    /// gone out of scope. This function will return an error if the weakly
38    /// held reference to the underlying [`TaskContextProvider`] is no longer
39    /// available.
40    pub task_ctx: unsafe extern "C" fn(&Self) -> FFI_Result<FFI_TaskContext>,
41
42    /// Used to create a clone on the task context accessor. This should
43    /// only need to be called by the receiver of the plan.
44    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
45
46    /// Release the memory of the private data when it is no longer being used.
47    pub release: unsafe extern "C" fn(arg: &mut Self),
48
49    /// Internal data. This is only to be accessed by the provider of the plan.
50    /// The foreign library should never attempt to access this data.
51    pub private_data: *mut c_void,
52
53    /// Utility to identify when FFI objects are accessed locally through
54    /// the foreign interface. See [`crate::get_library_marker_id`] and
55    /// the crate's `README.md` for more information.
56    pub library_marker_id: extern "C" fn() -> usize,
57}
58
59unsafe impl Send for FFI_TaskContextProvider {}
60unsafe impl Sync for FFI_TaskContextProvider {}
61
62struct TaskContextProviderPrivateData {
63    ctx: Weak<dyn TaskContextProvider>,
64}
65
66impl FFI_TaskContextProvider {
67    unsafe fn inner(&self) -> Option<Arc<TaskContext>> {
68        unsafe {
69            let private_data = self.private_data as *const TaskContextProviderPrivateData;
70            (*private_data).ctx.upgrade().map(|ctx| ctx.task_ctx())
71        }
72    }
73}
74
75unsafe extern "C" fn task_ctx_fn_wrapper(
76    ctx_provider: &FFI_TaskContextProvider,
77) -> FFI_Result<FFI_TaskContext> {
78    unsafe {
79        sresult!(
80            ctx_provider
81                .inner()
82                .map(FFI_TaskContext::from)
83                .ok_or_else(|| {
84                    ffi_datafusion_err!(
85                        "TaskContextProvider went out of scope over FFI boundary."
86                    )
87                })
88        )
89    }
90}
91
92unsafe extern "C" fn clone_fn_wrapper(
93    provider: &FFI_TaskContextProvider,
94) -> FFI_TaskContextProvider {
95    unsafe {
96        let private_data = provider.private_data as *const TaskContextProviderPrivateData;
97        let ctx = Weak::clone(&(*private_data).ctx);
98
99        let private_data = Box::new(TaskContextProviderPrivateData { ctx });
100
101        FFI_TaskContextProvider {
102            task_ctx: task_ctx_fn_wrapper,
103            release: release_fn_wrapper,
104            clone: clone_fn_wrapper,
105            private_data: Box::into_raw(private_data) as *mut c_void,
106            library_marker_id: crate::get_library_marker_id,
107        }
108    }
109}
110unsafe extern "C" fn release_fn_wrapper(ctx: &mut FFI_TaskContextProvider) {
111    unsafe {
112        let private_data =
113            Box::from_raw(ctx.private_data as *mut TaskContextProviderPrivateData);
114        drop(private_data);
115    }
116}
117impl Drop for FFI_TaskContextProvider {
118    fn drop(&mut self) {
119        unsafe { (self.release)(self) }
120    }
121}
122
123impl Clone for FFI_TaskContextProvider {
124    fn clone(&self) -> Self {
125        unsafe { (self.clone)(self) }
126    }
127}
128
129impl From<&Arc<dyn TaskContextProvider>> for FFI_TaskContextProvider {
130    fn from(ctx: &Arc<dyn TaskContextProvider>) -> Self {
131        let ctx = Arc::downgrade(ctx);
132        let private_data = Box::new(TaskContextProviderPrivateData { ctx });
133
134        FFI_TaskContextProvider {
135            task_ctx: task_ctx_fn_wrapper,
136            clone: clone_fn_wrapper,
137            release: release_fn_wrapper,
138            private_data: Box::into_raw(private_data) as *mut c_void,
139            library_marker_id: crate::get_library_marker_id,
140        }
141    }
142}
143
144impl TryFrom<&FFI_TaskContextProvider> for Arc<TaskContext> {
145    type Error = DataFusionError;
146    fn try_from(ffi_ctx: &FFI_TaskContextProvider) -> Result<Self, Self::Error> {
147        unsafe {
148            if (ffi_ctx.library_marker_id)() == crate::get_library_marker_id() {
149                return ffi_ctx.inner().ok_or_else(|| {
150                    ffi_datafusion_err!(
151                        "TaskContextProvider went out of scope over FFI boundary."
152                    )
153                });
154            }
155
156            df_result!((ffi_ctx.task_ctx)(ffi_ctx)).map(Into::into)
157        }
158    }
159}
160
161#[cfg(test)]
162mod tests {
163    use std::sync::Arc;
164
165    use datafusion_common::{DataFusionError, Result};
166    use datafusion_execution::{TaskContext, TaskContextProvider};
167
168    use crate::execution::FFI_TaskContextProvider;
169
170    #[derive(Default)]
171    struct TestCtxProvider {
172        ctx: Arc<TaskContext>,
173    }
174
175    impl TaskContextProvider for TestCtxProvider {
176        fn task_ctx(&self) -> Arc<TaskContext> {
177            Arc::clone(&self.ctx)
178        }
179    }
180
181    #[test]
182    fn ffi_task_context_provider_round_trip() -> Result<()> {
183        let ctx = Arc::new(TestCtxProvider::default()) as Arc<dyn TaskContextProvider>;
184        let mut ffi_ctx_provider: FFI_TaskContextProvider = (&Arc::clone(&ctx)).into();
185        ffi_ctx_provider.library_marker_id = crate::mock_foreign_marker_id;
186
187        let foreign_task_ctx: Arc<TaskContext> = (&ffi_ctx_provider).try_into()?;
188
189        assert_eq!(
190            format!("{foreign_task_ctx:?}"),
191            format!("{:?}", ctx.task_ctx())
192        );
193
194        Ok(())
195    }
196
197    #[test]
198    fn ffi_task_context_provider_clone() -> Result<()> {
199        let ctx = Arc::new(TestCtxProvider::default()) as Arc<dyn TaskContextProvider>;
200        let first_provider: FFI_TaskContextProvider = (&ctx).into();
201
202        let second_provider = first_provider.clone();
203
204        let first_ctx: Arc<TaskContext> = (&first_provider).try_into()?;
205        let second_ctx: Arc<TaskContext> = (&second_provider).try_into()?;
206
207        assert!(Arc::ptr_eq(&first_ctx, &second_ctx));
208
209        Ok(())
210    }
211
212    #[test]
213    fn ffi_task_context_provider_out_of_scope() {
214        fn create_ffi_out_of_scope() -> FFI_TaskContextProvider {
215            let ctx =
216                Arc::new(TestCtxProvider::default()) as Arc<dyn TaskContextProvider>;
217            (&ctx).into()
218        }
219
220        let provider = create_ffi_out_of_scope();
221        let failed_ctx = <Arc<TaskContext>>::try_from(&provider);
222
223        let Err(DataFusionError::Ffi(_)) = failed_ctx else {
224            panic!("Expected out of scope error")
225        };
226    }
227}