1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
use std::sync::mpsc::Sender;
use once_cell::sync::Lazy;
use crate::error::Error;
use crate::runtime::execution::RUNTIME;
use crate::runtime::work::Work;
thread_local! {
/// Thread-local runtime delegate.
///
/// This object serves as the per-thread reference to the [`RUNTIME`] that can be used to
/// enqueue work on the runtime thread.
///
/// # Usage
///
/// ```ignore
/// assert!(
/// RUNTIME_THREAD_LOCAL.with(|runtime|
/// runtime.enqueue(Work::new(|| ()))
/// ).is_ok()
/// )
/// ```
pub(super) static RUNTIME_THREAD_LOCAL: Lazy<RuntimeThreadLocal> = Lazy::new(|| {
RUNTIME.lock().unwrap().thread_local()
});
}
/// Per-thread delegate for global runtime.
pub struct RuntimeThreadLocal(Sender<Work>);
impl RuntimeThreadLocal {
/// Initialize [`RuntimeThreadLocal`] from [`Sender`] that allows the delegate to send work to
/// the actual [`crate::runtime::execution::Runtime`].
///
/// # Arguments
///
/// * `sender` - Sender through which work can be sent to runtime.
pub(super) fn from_sender(sender: Sender<Work>) -> Self {
RuntimeThreadLocal(sender)
}
/// Enqueue work on runtime.
///
/// # Arguments
///
/// * `function` - Unit of work in function closure to enqueue.
pub(super) fn enqueue(&self, function: Work) -> Result<(), Error> {
self.0.send(function).map_err(|_| Error::Runtime)
}
}
/// Enqueue work on the runtime without caring about the return value. This is useful in situations
/// where work must be performed but the result does not matter. For example, when destorying CUDA
/// object as part of dropping an object.
///
/// # Arguments
///
/// * `f` - Function closure to execute on runtime.
///
/// # Example
///
/// ```ignore
/// enqueue_decoupled(move || {
/// // ...
/// });
/// ```
#[inline]
pub fn enqueue_decoupled(f: impl FnOnce() + Send + 'static) {
let f = Box::new(f);
RUNTIME_THREAD_LOCAL
.with(|runtime| runtime.enqueue(Work::new(f)))
.expect("runtime broken")
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_enqueue_works() {
let (tx, rx) = std::sync::mpsc::channel();
assert!(RUNTIME_THREAD_LOCAL
.with(|runtime| {
runtime.enqueue(Work::new(move || {
assert!(tx.send(true).is_ok());
}))
})
.is_ok());
assert!(matches!(
rx.recv_timeout(std::time::Duration::from_millis(100)),
Ok(true),
));
}
#[test]
fn test_enqueue_decoupled_works() {
let (tx, rx) = std::sync::mpsc::channel();
enqueue_decoupled(move || {
assert!(tx.send(true).is_ok());
});
assert!(matches!(
rx.recv_timeout(std::time::Duration::from_millis(100)),
Ok(true),
));
}
}