gio/
task.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{boxed::Box as Box_, future::Future, mem::transmute, panic, ptr};
4
5use glib::{
6    prelude::*,
7    signal::{connect_raw, SignalHandlerId},
8    translate::*,
9};
10
11use futures_channel::oneshot;
12
13use crate::{ffi, AsyncResult, Cancellable};
14
15glib::wrapper! {
16    // rustdoc-stripper-ignore-next
17    /// `LocalTask` provides idiomatic access to gio's `GTask` API, for
18    /// instance by being generic over their value type, while not completely departing
19    /// from the underlying C API. `LocalTask` does not require its value to be `Send`
20    /// and `Sync` and thus is useful to to implement gio style asynchronous
21    /// tasks that run in the glib main loop. If you need to run tasks in threads
22    /// see the `Task` type.
23    ///
24    /// The constructors of `LocalTask` and `Task` is marked as unsafe because this API does
25    /// not allow to automatically enforce all the invariants required to be a completely
26    /// safe abstraction. See the `Task` type for more details.
27    #[doc(alias = "GTask")]
28    pub struct LocalTask<V: ValueType>(Object<ffi::GTask, ffi::GTaskClass>) @implements AsyncResult;
29
30    match fn {
31        type_ => || ffi::g_task_get_type(),
32    }
33}
34
35glib::wrapper! {
36    // rustdoc-stripper-ignore-next
37    /// `Task` provides idiomatic access to gio's `GTask` API, for
38    /// instance by being generic over their value type, while not completely departing
39    /// from the underlying C API. `Task` is `Send` and `Sync` and requires its value to
40    /// also be `Send` and `Sync`, thus is useful to to implement gio style asynchronous
41    /// tasks that run in threads. If you need to only run tasks in glib main loop
42    /// see the `LocalTask` type.
43    ///
44    /// The constructors of `LocalTask` and `Task` is marked as unsafe because this API does
45    /// not allow to automatically enforce all the invariants required to be a completely
46    /// safe abstraction. The caller is responsible to ensure the following requirements
47    /// are satisfied
48    ///
49    /// * You should not create a `LocalTask`, upcast it to a `glib::Object` and then
50    ///   downcast it to a `Task`, as this will bypass the thread safety requirements
51    /// * You should ensure that the `return_result`, `return_error_if_cancelled` and
52    ///   `propagate()` methods are only called once.
53    #[doc(alias = "GTask")]
54    pub struct Task<V: ValueType + Send>(Object<ffi::GTask, ffi::GTaskClass>) @implements AsyncResult;
55
56    match fn {
57        type_ => || ffi::g_task_get_type(),
58    }
59}
60
61macro_rules! task_impl {
62    ($name:ident $(, @bound: $bound:tt)? $(, @safety: $safety:tt)?) => {
63        impl <V: Into<glib::Value> + ValueType $(+ $bound)?> $name<V> {
64            #[doc(alias = "g_task_new")]
65            #[allow(unused_unsafe)]
66            pub unsafe fn new<S, P, Q>(
67                source_object: Option<&S>,
68                cancellable: Option<&P>,
69                callback: Q,
70            ) -> Self
71            where
72                S: IsA<glib::Object> $(+ $bound)?,
73                P: IsA<Cancellable>,
74                Q: FnOnce($name<V>, Option<&S>) $(+ $bound)? + 'static,
75            {
76                let callback_data = Box_::new(callback);
77                unsafe extern "C" fn trampoline<
78                    S: IsA<glib::Object> $(+ $bound)?,
79                    V: ValueType $(+ $bound)?,
80                    Q: FnOnce($name<V>, Option<&S>) $(+ $bound)? + 'static,
81                >(
82                    source_object: *mut glib::gobject_ffi::GObject,
83                    res: *mut ffi::GAsyncResult,
84                    user_data: glib::ffi::gpointer,
85                ) {
86                    let callback: Box_<Q> = Box::from_raw(user_data as *mut _);
87                    let task = AsyncResult::from_glib_none(res)
88                        .downcast::<$name<V>>()
89                        .unwrap();
90                    let source_object = Option::<glib::Object>::from_glib_borrow(source_object);
91                    callback(
92                        task,
93                        source_object.as_ref().as_ref().map(|s| s.unsafe_cast_ref()),
94                    );
95                }
96                let callback = trampoline::<S, V, Q>;
97                unsafe {
98                    from_glib_full(ffi::g_task_new(
99                        source_object.map(|p| p.as_ref()).to_glib_none().0,
100                        cancellable.map(|p| p.as_ref()).to_glib_none().0,
101                        Some(callback),
102                        Box_::into_raw(callback_data) as *mut _,
103                    ))
104                }
105            }
106
107            #[doc(alias = "g_task_get_cancellable")]
108            #[doc(alias = "get_cancellable")]
109            pub fn cancellable(&self) -> Option<Cancellable> {
110                unsafe { from_glib_none(ffi::g_task_get_cancellable(self.to_glib_none().0)) }
111            }
112
113            #[doc(alias = "g_task_get_check_cancellable")]
114            #[doc(alias = "get_check_cancellable")]
115            pub fn is_check_cancellable(&self) -> bool {
116                unsafe { from_glib(ffi::g_task_get_check_cancellable(self.to_glib_none().0)) }
117            }
118
119            #[doc(alias = "g_task_set_check_cancellable")]
120            pub fn set_check_cancellable(&self, check_cancellable: bool) {
121                unsafe {
122                    ffi::g_task_set_check_cancellable(self.to_glib_none().0, check_cancellable.into_glib());
123                }
124            }
125
126            #[cfg(feature = "v2_60")]
127            #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
128            #[doc(alias = "g_task_set_name")]
129            pub fn set_name(&self, name: Option<&str>) {
130                unsafe {
131                    ffi::g_task_set_name(self.to_glib_none().0, name.to_glib_none().0);
132                }
133            }
134
135            #[doc(alias = "g_task_set_return_on_cancel")]
136            pub fn set_return_on_cancel(&self, return_on_cancel: bool) -> bool {
137                unsafe {
138                    from_glib(ffi::g_task_set_return_on_cancel(
139                        self.to_glib_none().0,
140                        return_on_cancel.into_glib(),
141                    ))
142                }
143            }
144
145            #[doc(alias = "g_task_is_valid")]
146            pub fn is_valid(
147                result: &impl IsA<AsyncResult>,
148                source_object: Option<&impl IsA<glib::Object>>,
149            ) -> bool {
150                unsafe {
151                    from_glib(ffi::g_task_is_valid(
152                        result.as_ref().to_glib_none().0,
153                        source_object.map(|p| p.as_ref()).to_glib_none().0,
154                    ))
155                }
156            }
157
158            #[doc(alias = "get_priority")]
159            #[doc(alias = "g_task_get_priority")]
160            pub fn priority(&self) -> glib::source::Priority {
161                unsafe { FromGlib::from_glib(ffi::g_task_get_priority(self.to_glib_none().0)) }
162            }
163
164            #[doc(alias = "g_task_set_priority")]
165            pub fn set_priority(&self, priority: glib::source::Priority) {
166                unsafe {
167                    ffi::g_task_set_priority(self.to_glib_none().0, priority.into_glib());
168                }
169            }
170
171            #[doc(alias = "g_task_get_completed")]
172            #[doc(alias = "get_completed")]
173            pub fn is_completed(&self) -> bool {
174                unsafe { from_glib(ffi::g_task_get_completed(self.to_glib_none().0)) }
175            }
176
177            #[doc(alias = "g_task_get_context")]
178            #[doc(alias = "get_context")]
179            pub fn context(&self) -> glib::MainContext {
180                unsafe { from_glib_none(ffi::g_task_get_context(self.to_glib_none().0)) }
181            }
182
183            #[cfg(feature = "v2_60")]
184            #[cfg_attr(docsrs, doc(cfg(feature = "v2_60")))]
185            #[doc(alias = "g_task_get_name")]
186            #[doc(alias = "get_name")]
187            pub fn name(&self) -> Option<glib::GString> {
188                unsafe { from_glib_none(ffi::g_task_get_name(self.to_glib_none().0)) }
189            }
190
191            #[doc(alias = "g_task_get_return_on_cancel")]
192            #[doc(alias = "get_return_on_cancel")]
193            pub fn is_return_on_cancel(&self) -> bool {
194                unsafe { from_glib(ffi::g_task_get_return_on_cancel(self.to_glib_none().0)) }
195            }
196
197            #[doc(alias = "g_task_had_error")]
198            pub fn had_error(&self) -> bool {
199                unsafe { from_glib(ffi::g_task_had_error(self.to_glib_none().0)) }
200            }
201
202            #[doc(alias = "completed")]
203            pub fn connect_completed_notify<F>(&self, f: F) -> SignalHandlerId
204            where
205                F: Fn(&$name<V>) $(+ $bound)? + 'static,
206            {
207                unsafe extern "C" fn notify_completed_trampoline<V, F>(
208                    this: *mut ffi::GTask,
209                    _param_spec: glib::ffi::gpointer,
210                    f: glib::ffi::gpointer,
211                ) where
212                    V: ValueType $(+ $bound)?,
213                    F: Fn(&$name<V>) + 'static,
214                {
215                    let f: &F = &*(f as *const F);
216                    f(&from_glib_borrow(this))
217                }
218                unsafe {
219                    let f: Box_<F> = Box_::new(f);
220                    connect_raw(
221                        self.as_ptr() as *mut _,
222                        b"notify::completed\0".as_ptr() as *const _,
223                        Some(transmute::<*const (), unsafe extern "C" fn()>(
224                            notify_completed_trampoline::<V, F> as *const (),
225                        )),
226                        Box_::into_raw(f),
227                    )
228                }
229            }
230
231            // the following functions are marked unsafe since they cannot be called
232            // more than once, but we have no way to enforce that since the task can be cloned
233
234            #[doc(alias = "g_task_return_error_if_cancelled")]
235            #[allow(unused_unsafe)]
236            pub $($safety)? fn return_error_if_cancelled(&self) -> bool {
237                unsafe { from_glib(ffi::g_task_return_error_if_cancelled(self.to_glib_none().0)) }
238            }
239
240            #[doc(alias = "g_task_return_value")]
241            #[doc(alias = "g_task_return_boolean")]
242            #[doc(alias = "g_task_return_int")]
243            #[doc(alias = "g_task_return_pointer")]
244            #[doc(alias = "g_task_return_error")]
245            #[allow(unused_unsafe)]
246            pub $($safety)? fn return_result(self, result: Result<V, glib::Error>) {
247                #[cfg(not(feature = "v2_64"))]
248                unsafe extern "C" fn value_free(value: *mut libc::c_void) {
249                    let _: glib::Value = from_glib_full(value as *mut glib::gobject_ffi::GValue);
250                }
251
252                match result {
253                    #[cfg(feature = "v2_64")]
254                    Ok(v) => unsafe {
255                        ffi::g_task_return_value(
256                            self.to_glib_none().0,
257                            v.to_value().to_glib_none().0 as *mut _,
258                        )
259                    },
260                    #[cfg(not(feature = "v2_64"))]
261                    Ok(v) => unsafe {
262                        let v: glib::Value = v.into();
263                        ffi::g_task_return_pointer(
264                            self.to_glib_none().0,
265                            <glib::Value as glib::translate::IntoGlibPtr::<*mut glib::gobject_ffi::GValue>>::into_glib_ptr(v) as glib::ffi::gpointer,
266                            Some(value_free),
267                        )
268                    },
269                    Err(e) => unsafe {
270                        ffi::g_task_return_error(self.to_glib_none().0, e.into_glib_ptr());
271                    },
272                }
273            }
274
275            #[doc(alias = "g_task_propagate_value")]
276            #[doc(alias = "g_task_propagate_boolean")]
277            #[doc(alias = "g_task_propagate_int")]
278            #[doc(alias = "g_task_propagate_pointer")]
279            #[allow(unused_unsafe)]
280            pub $($safety)? fn propagate(self) -> Result<V, glib::Error> {
281                let mut error = ptr::null_mut();
282
283                unsafe {
284                    #[cfg(feature = "v2_64")]
285                    {
286                        let mut value = glib::Value::uninitialized();
287                        ffi::g_task_propagate_value(
288                            self.to_glib_none().0,
289                            value.to_glib_none_mut().0,
290                            &mut error,
291                        );
292
293                        if error.is_null() {
294                            Ok(V::from_value(&value))
295                        } else {
296                            Err(from_glib_full(error))
297                        }
298                    }
299
300                    #[cfg(not(feature = "v2_64"))]
301                    {
302                        let value = ffi::g_task_propagate_pointer(self.to_glib_none().0, &mut error);
303
304                        if error.is_null() {
305                            let value = Option::<glib::Value>::from_glib_full(
306                                value as *mut glib::gobject_ffi::GValue,
307                            )
308                            .expect("Task::propagate() called before Task::return_result()");
309                            Ok(V::from_value(&value))
310                        } else {
311                            Err(from_glib_full(error))
312                        }
313                    }
314                }
315            }
316        }
317    }
318}
319
320task_impl!(LocalTask);
321task_impl!(Task, @bound: Send, @safety: unsafe);
322
323impl<V: ValueType + Send> Task<V> {
324    #[doc(alias = "g_task_run_in_thread")]
325    pub fn run_in_thread<S, Q>(&self, task_func: Q)
326    where
327        S: IsA<glib::Object> + Send,
328        Q: FnOnce(Self, Option<&S>, Option<&Cancellable>) + Send + 'static,
329    {
330        let task_func_data = Box_::new(task_func);
331
332        // We store the func pointer into the task data.
333        // We intentionally do not expose a way to set the task data in the bindings.
334        // If we detect that the task data is set, there is not much we can do, so we panic.
335        unsafe {
336            assert!(
337                ffi::g_task_get_task_data(self.to_glib_none().0).is_null(),
338                "Task data was manually set or the task was run thread multiple times"
339            );
340
341            ffi::g_task_set_task_data(
342                self.to_glib_none().0,
343                Box_::into_raw(task_func_data) as *mut _,
344                None,
345            );
346        }
347
348        unsafe extern "C" fn trampoline<V, S, Q>(
349            task: *mut ffi::GTask,
350            source_object: *mut glib::gobject_ffi::GObject,
351            user_data: glib::ffi::gpointer,
352            cancellable: *mut ffi::GCancellable,
353        ) where
354            V: ValueType + Send,
355            S: IsA<glib::Object> + Send,
356            Q: FnOnce(Task<V>, Option<&S>, Option<&Cancellable>) + Send + 'static,
357        {
358            let task = Task::from_glib_none(task);
359            let source_object = Option::<glib::Object>::from_glib_borrow(source_object);
360            let cancellable = Option::<Cancellable>::from_glib_borrow(cancellable);
361            let task_func: Box_<Q> = Box::from_raw(user_data as *mut _);
362            task_func(
363                task,
364                source_object.as_ref().as_ref().map(|s| s.unsafe_cast_ref()),
365                cancellable.as_ref().as_ref(),
366            );
367        }
368
369        let task_func = trampoline::<V, S, Q>;
370        unsafe {
371            ffi::g_task_run_in_thread(self.to_glib_none().0, Some(task_func));
372        }
373    }
374}
375
376unsafe impl<V: ValueType + Send> Send for Task<V> {}
377unsafe impl<V: ValueType + Send> Sync for Task<V> {}
378
379// rustdoc-stripper-ignore-next
380/// A handle to a task running on the I/O thread pool.
381///
382/// Like [`std::thread::JoinHandle`] for a blocking I/O task rather than a thread. The return value
383/// from the task can be retrieved by awaiting on this handle. Dropping the handle "detaches" the
384/// task, allowing it to complete but discarding the return value.
385#[derive(Debug)]
386pub struct JoinHandle<T> {
387    rx: oneshot::Receiver<std::thread::Result<T>>,
388}
389
390impl<T> JoinHandle<T> {
391    #[inline]
392    fn new() -> (Self, oneshot::Sender<std::thread::Result<T>>) {
393        let (tx, rx) = oneshot::channel();
394        (Self { rx }, tx)
395    }
396}
397
398impl<T> Future for JoinHandle<T> {
399    type Output = std::thread::Result<T>;
400    #[inline]
401    fn poll(
402        mut self: std::pin::Pin<&mut Self>,
403        cx: &mut std::task::Context<'_>,
404    ) -> std::task::Poll<Self::Output> {
405        std::pin::Pin::new(&mut self.rx)
406            .poll(cx)
407            .map(|r| r.unwrap())
408    }
409}
410
411impl<T> futures_core::FusedFuture for JoinHandle<T> {
412    #[inline]
413    fn is_terminated(&self) -> bool {
414        self.rx.is_terminated()
415    }
416}
417
418// rustdoc-stripper-ignore-next
419/// Runs a blocking I/O task on the I/O thread pool.
420///
421/// Calls `func` on the internal Gio thread pool for blocking I/O operations. The thread pool is
422/// shared with other Gio async I/O operations, and may rate-limit the tasks it receives. Callers
423/// may want to avoid blocking indefinitely by making sure blocking calls eventually time out.
424///
425/// This function should not be used to spawn async tasks. Instead, use
426/// [`glib::MainContext::spawn`] or [`glib::MainContext::spawn_local`] to run a future.
427pub fn spawn_blocking<T, F>(func: F) -> JoinHandle<T>
428where
429    T: Send + 'static,
430    F: FnOnce() -> T + Send + 'static,
431{
432    // use Cancellable::NONE as source obj to fulfill `Send` requirement
433    let task = unsafe { Task::<bool>::new(Cancellable::NONE, Cancellable::NONE, |_, _| {}) };
434    let (join, tx) = JoinHandle::new();
435    task.run_in_thread(move |task, _: Option<&Cancellable>, _| {
436        let res = panic::catch_unwind(panic::AssertUnwindSafe(func));
437        let _ = tx.send(res);
438        unsafe { ffi::g_task_return_pointer(task.to_glib_none().0, ptr::null_mut(), None) }
439    });
440
441    join
442}
443
444#[cfg(test)]
445mod test {
446    use super::*;
447    use crate::{prelude::*, test_util::run_async_local};
448
449    #[test]
450    fn test_int_async_result() {
451        let fut = run_async_local(|tx, l| {
452            let cancellable = crate::Cancellable::new();
453            let task = unsafe {
454                crate::LocalTask::new(
455                    None,
456                    Some(&cancellable),
457                    move |t: LocalTask<i32>, _b: Option<&glib::Object>| {
458                        tx.send(t.propagate()).unwrap();
459                        l.quit();
460                    },
461                )
462            };
463            task.return_result(Ok(100_i32));
464        });
465
466        match fut {
467            Err(_) => panic!(),
468            Ok(i) => assert_eq!(i, 100),
469        }
470    }
471
472    #[test]
473    fn test_object_async_result() {
474        use glib::subclass::prelude::*;
475        pub struct MySimpleObjectPrivate {
476            pub size: std::cell::RefCell<Option<i64>>,
477        }
478
479        #[glib::object_subclass]
480        impl ObjectSubclass for MySimpleObjectPrivate {
481            const NAME: &'static str = "MySimpleObjectPrivate";
482            type Type = MySimpleObject;
483
484            fn new() -> Self {
485                Self {
486                    size: std::cell::RefCell::new(Some(100)),
487                }
488            }
489        }
490
491        impl ObjectImpl for MySimpleObjectPrivate {}
492
493        glib::wrapper! {
494            pub struct MySimpleObject(ObjectSubclass<MySimpleObjectPrivate>);
495        }
496
497        impl MySimpleObject {
498            pub fn new() -> Self {
499                glib::Object::new()
500            }
501
502            #[doc(alias = "get_size")]
503            pub fn size(&self) -> Option<i64> {
504                *self.imp().size.borrow()
505            }
506
507            pub fn set_size(&self, size: i64) {
508                self.imp().size.borrow_mut().replace(size);
509            }
510        }
511
512        impl Default for MySimpleObject {
513            fn default() -> Self {
514                Self::new()
515            }
516        }
517
518        let fut = run_async_local(|tx, l| {
519            let cancellable = crate::Cancellable::new();
520            let task = unsafe {
521                crate::LocalTask::new(
522                    None,
523                    Some(&cancellable),
524                    move |t: LocalTask<glib::Object>, _b: Option<&glib::Object>| {
525                        tx.send(t.propagate()).unwrap();
526                        l.quit();
527                    },
528                )
529            };
530            let my_object = MySimpleObject::new();
531            my_object.set_size(100);
532            task.return_result(Ok(my_object.upcast::<glib::Object>()));
533        });
534
535        match fut {
536            Err(_) => panic!(),
537            Ok(o) => {
538                let o = o.downcast::<MySimpleObject>().unwrap();
539                assert_eq!(o.size(), Some(100));
540            }
541        }
542    }
543
544    #[test]
545    fn test_error() {
546        let fut = run_async_local(|tx, l| {
547            let cancellable = crate::Cancellable::new();
548            let task = unsafe {
549                crate::LocalTask::new(
550                    None,
551                    Some(&cancellable),
552                    move |t: LocalTask<i32>, _b: Option<&glib::Object>| {
553                        tx.send(t.propagate()).unwrap();
554                        l.quit();
555                    },
556                )
557            };
558            task.return_result(Err(glib::Error::new(
559                crate::IOErrorEnum::WouldBlock,
560                "WouldBlock",
561            )));
562        });
563
564        match fut {
565            Err(e) => match e.kind().unwrap() {
566                crate::IOErrorEnum::WouldBlock => {}
567                _ => panic!(),
568            },
569            Ok(_) => panic!(),
570        }
571    }
572
573    #[test]
574    fn test_cancelled() {
575        let fut = run_async_local(|tx, l| {
576            let cancellable = crate::Cancellable::new();
577            let task = unsafe {
578                crate::LocalTask::new(
579                    None,
580                    Some(&cancellable),
581                    move |t: LocalTask<i32>, _b: Option<&glib::Object>| {
582                        tx.send(t.propagate()).unwrap();
583                        l.quit();
584                    },
585                )
586            };
587            cancellable.cancel();
588            task.return_error_if_cancelled();
589        });
590
591        match fut {
592            Err(e) => match e.kind().unwrap() {
593                crate::IOErrorEnum::Cancelled => {}
594                _ => panic!(),
595            },
596            Ok(_) => panic!(),
597        }
598    }
599}