datafusion_ffi/execution/
task_ctx_provider.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ffi::c_void;
19use std::sync::{Arc, Weak};
20
21use abi_stable::StableAbi;
22use datafusion_common::{DataFusionError, ffi_datafusion_err};
23use datafusion_execution::{TaskContext, TaskContextProvider};
24
25use crate::execution::task_ctx::FFI_TaskContext;
26use crate::util::FFIResult;
27use crate::{df_result, rresult};
28
29/// Struct for accessing the [`TaskContext`]. This method contains a weak
30/// reference, so there are no guarantees that the [`TaskContext`] remains
31/// valid. This is used primarily for protobuf encoding and decoding of
32/// data passed across the FFI boundary. See the crate README for
33/// additional information.
34#[repr(C)]
35#[derive(Debug, StableAbi)]
36pub struct FFI_TaskContextProvider {
37    /// Retrieve the current [`TaskContext`] provided the provider has not
38    /// gone out of scope. This function will return an error if the weakly
39    /// held reference to the underlying [`TaskContextProvider`] is no longer
40    /// available.
41    pub task_ctx: unsafe extern "C" fn(&Self) -> FFIResult<FFI_TaskContext>,
42
43    /// Used to create a clone on the task context accessor. This should
44    /// only need to be called by the receiver of the plan.
45    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
46
47    /// Release the memory of the private data when it is no longer being used.
48    pub release: unsafe extern "C" fn(arg: &mut Self),
49
50    /// Internal data. This is only to be accessed by the provider of the plan.
51    /// The foreign library should never attempt to access this data.
52    pub private_data: *mut c_void,
53
54    /// Utility to identify when FFI objects are accessed locally through
55    /// the foreign interface. See [`crate::get_library_marker_id`] and
56    /// the crate's `README.md` for more information.
57    pub library_marker_id: extern "C" fn() -> usize,
58}
59
60unsafe impl Send for FFI_TaskContextProvider {}
61unsafe impl Sync for FFI_TaskContextProvider {}
62
63struct TaskContextProviderPrivateData {
64    ctx: Weak<dyn TaskContextProvider>,
65}
66
67impl FFI_TaskContextProvider {
68    unsafe fn inner(&self) -> Option<Arc<TaskContext>> {
69        unsafe {
70            let private_data = self.private_data as *const TaskContextProviderPrivateData;
71            (*private_data).ctx.upgrade().map(|ctx| ctx.task_ctx())
72        }
73    }
74}
75
76unsafe extern "C" fn task_ctx_fn_wrapper(
77    ctx_provider: &FFI_TaskContextProvider,
78) -> FFIResult<FFI_TaskContext> {
79    unsafe {
80        rresult!(
81            ctx_provider
82                .inner()
83                .map(FFI_TaskContext::from)
84                .ok_or_else(|| {
85                    ffi_datafusion_err!(
86                        "TaskContextProvider went out of scope over FFI boundary."
87                    )
88                })
89        )
90    }
91}
92
93unsafe extern "C" fn clone_fn_wrapper(
94    provider: &FFI_TaskContextProvider,
95) -> FFI_TaskContextProvider {
96    unsafe {
97        let private_data = provider.private_data as *const TaskContextProviderPrivateData;
98        let ctx = Weak::clone(&(*private_data).ctx);
99
100        let private_data = Box::new(TaskContextProviderPrivateData { ctx });
101
102        FFI_TaskContextProvider {
103            task_ctx: task_ctx_fn_wrapper,
104            release: release_fn_wrapper,
105            clone: clone_fn_wrapper,
106            private_data: Box::into_raw(private_data) as *mut c_void,
107            library_marker_id: crate::get_library_marker_id,
108        }
109    }
110}
111unsafe extern "C" fn release_fn_wrapper(ctx: &mut FFI_TaskContextProvider) {
112    unsafe {
113        let private_data =
114            Box::from_raw(ctx.private_data as *mut TaskContextProviderPrivateData);
115        drop(private_data);
116    }
117}
118impl Drop for FFI_TaskContextProvider {
119    fn drop(&mut self) {
120        unsafe { (self.release)(self) }
121    }
122}
123
124impl Clone for FFI_TaskContextProvider {
125    fn clone(&self) -> Self {
126        unsafe { (self.clone)(self) }
127    }
128}
129
130impl From<&Arc<dyn TaskContextProvider>> for FFI_TaskContextProvider {
131    fn from(ctx: &Arc<dyn TaskContextProvider>) -> Self {
132        let ctx = Arc::downgrade(ctx);
133        let private_data = Box::new(TaskContextProviderPrivateData { ctx });
134
135        FFI_TaskContextProvider {
136            task_ctx: task_ctx_fn_wrapper,
137            clone: clone_fn_wrapper,
138            release: release_fn_wrapper,
139            private_data: Box::into_raw(private_data) as *mut c_void,
140            library_marker_id: crate::get_library_marker_id,
141        }
142    }
143}
144
145impl TryFrom<&FFI_TaskContextProvider> for Arc<TaskContext> {
146    type Error = DataFusionError;
147    fn try_from(ffi_ctx: &FFI_TaskContextProvider) -> Result<Self, Self::Error> {
148        unsafe {
149            if (ffi_ctx.library_marker_id)() == crate::get_library_marker_id() {
150                return ffi_ctx.inner().ok_or_else(|| {
151                    ffi_datafusion_err!(
152                        "TaskContextProvider went out of scope over FFI boundary."
153                    )
154                });
155            }
156
157            df_result!((ffi_ctx.task_ctx)(ffi_ctx)).map(Into::into)
158        }
159    }
160}
161
162#[cfg(test)]
163mod tests {
164    use std::sync::Arc;
165
166    use datafusion_common::{DataFusionError, Result};
167    use datafusion_execution::{TaskContext, TaskContextProvider};
168
169    use crate::execution::FFI_TaskContextProvider;
170
171    #[derive(Default)]
172    struct TestCtxProvider {
173        ctx: Arc<TaskContext>,
174    }
175
176    impl TaskContextProvider for TestCtxProvider {
177        fn task_ctx(&self) -> Arc<TaskContext> {
178            Arc::clone(&self.ctx)
179        }
180    }
181
182    #[test]
183    fn ffi_task_context_provider_round_trip() -> Result<()> {
184        let ctx = Arc::new(TestCtxProvider::default()) as Arc<dyn TaskContextProvider>;
185        let mut ffi_ctx_provider: FFI_TaskContextProvider = (&Arc::clone(&ctx)).into();
186        ffi_ctx_provider.library_marker_id = crate::mock_foreign_marker_id;
187
188        let foreign_task_ctx: Arc<TaskContext> = (&ffi_ctx_provider).try_into()?;
189
190        assert_eq!(
191            format!("{foreign_task_ctx:?}"),
192            format!("{:?}", ctx.task_ctx())
193        );
194
195        Ok(())
196    }
197
198    #[test]
199    fn ffi_task_context_provider_clone() -> Result<()> {
200        let ctx = Arc::new(TestCtxProvider::default()) as Arc<dyn TaskContextProvider>;
201        let first_provider: FFI_TaskContextProvider = (&ctx).into();
202
203        let second_provider = first_provider.clone();
204
205        let first_ctx: Arc<TaskContext> = (&first_provider).try_into()?;
206        let second_ctx: Arc<TaskContext> = (&second_provider).try_into()?;
207
208        assert!(Arc::ptr_eq(&first_ctx, &second_ctx));
209
210        Ok(())
211    }
212
213    #[test]
214    fn ffi_task_context_provider_out_of_scope() {
215        fn create_ffi_out_of_scope() -> FFI_TaskContextProvider {
216            let ctx =
217                Arc::new(TestCtxProvider::default()) as Arc<dyn TaskContextProvider>;
218            (&ctx).into()
219        }
220
221        let provider = create_ffi_out_of_scope();
222        let failed_ctx = <Arc<TaskContext>>::try_from(&provider);
223
224        let Err(DataFusionError::Ffi(_)) = failed_ctx else {
225            panic!("Expected out of scope error")
226        };
227    }
228}