containerd_shimkit/sandbox/
sync.rs

1//! Synchronization primitives (e.g. `WaitableCell`) for the sandbox.
2
3use std::cell::OnceCell;
4use std::sync::Arc;
5
6use tokio::sync::Notify;
7
8/// A cell where we can wait (with timeout) for
9/// a value to be set
10pub struct WaitableCell<T> {
11    inner: Arc<WaitableCellImpl<T>>,
12}
13
14struct WaitableCellImpl<T> {
15    cell: OnceCell<T>,
16    notify: Notify,
17}
18
19// this is safe because access to cell guarded by the mutex
20unsafe impl<T> Send for WaitableCell<T> {}
21unsafe impl<T> Sync for WaitableCell<T> {}
22
23impl<T> Default for WaitableCell<T> {
24    fn default() -> Self {
25        Self {
26            inner: Arc::new(WaitableCellImpl {
27                cell: OnceCell::default(),
28                notify: Notify::new(),
29            }),
30        }
31    }
32}
33
34impl<T> Clone for WaitableCell<T> {
35    fn clone(&self) -> Self {
36        let inner = self.inner.clone();
37        Self { inner }
38    }
39}
40
41impl<T> WaitableCell<T> {
42    /// Creates an empty WaitableCell.
43    pub fn new() -> Self {
44        Self::default()
45    }
46
47    /// Sets a value to the WaitableCell.
48    /// This method has no effect if the WaitableCell already has a value.
49    pub fn set(&self, val: impl Into<T>) -> Result<(), T> {
50        self.inner.cell.set(val.into())?;
51        self.inner.notify.notify_waiters();
52        Ok(())
53    }
54
55    /// If the `WaitableCell` is empty when this guard is dropped, the cell will be set to result of `f`.
56    /// ```
57    /// # use containerd_shimkit::sandbox::sync::WaitableCell;
58    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
59    /// let cell = WaitableCell::<i32>::new();
60    /// {
61    ///     let _guard = cell.set_guard_with(|| 42);
62    /// }
63    /// assert_eq!(&42, cell.wait().await);
64    /// # })
65    /// ```
66    ///
67    /// The operation is a no-op if the cell conbtains a value before the guard is dropped.
68    /// ```
69    /// # use containerd_shimkit::sandbox::sync::WaitableCell;
70    /// # tokio::runtime::Runtime::new().unwrap().block_on(async {
71    /// let cell = WaitableCell::<i32>::new();
72    /// {
73    ///     let _guard = cell.set_guard_with(|| 42);
74    ///     let _ = cell.set(24);
75    /// }
76    /// assert_eq!(&24, cell.wait().await);
77    /// # })
78    /// ```
79    ///
80    /// The function `f` will always be called, regardless of whether the `WaitableCell` has a value or not.
81    /// The `WaitableCell` is going to be set even in the case of an unwind. In this case, ff the function `f`
82    /// panics it will cause an abort, so it's recommended to avoid any panics in `f`.
83    pub fn set_guard_with<R: Into<T>, F: FnOnce() -> R>(&self, f: F) -> impl Drop + use<F, T, R> {
84        let cell = (*self).clone();
85        WaitableCellSetGuard { f: Some(f), cell }
86    }
87
88    /// Wait for the WaitableCell to be set a value.
89    pub async fn wait(&self) -> &T {
90        let notified = self.inner.notify.notified();
91        if let Some(val) = self.inner.cell.get() {
92            return val;
93        }
94        notified.await;
95        // safe because we've been notified, which can only happen
96        // a call to self.inner.cell.set(..)
97        unsafe { self.inner.cell.get().unwrap_unchecked() }
98    }
99}
100
101// This is the type returned by `WaitableCell::set_guard_with`.
102// The public API has no visibility over this type, other than it implements `Drop`
103// If the `WaitableCell` `cell`` is empty when this guard is dropped, it will set it's value with the result of `f`.
104struct WaitableCellSetGuard<T, R: Into<T>, F: FnOnce() -> R> {
105    f: Option<F>,
106    cell: WaitableCell<T>,
107}
108
109impl<T, R: Into<T>, F: FnOnce() -> R> Drop for WaitableCellSetGuard<T, R, F> {
110    fn drop(&mut self) {
111        let _ = self.cell.set(self.f.take().unwrap()());
112    }
113}
114
115#[cfg(test)]
116mod test {
117    use std::time::Duration;
118
119    use futures::FutureExt;
120    use tokio::time::{sleep, timeout};
121
122    use super::WaitableCell;
123
124    #[tokio::test]
125    async fn basic() {
126        let cell = WaitableCell::<i32>::new();
127        cell.set(42).unwrap();
128        assert_eq!(&42, cell.wait().await);
129    }
130
131    #[tokio::test]
132    async fn basic_timeout_zero() {
133        let cell = WaitableCell::<i32>::new();
134        cell.set(42).unwrap();
135        assert_eq!(Some(&42), cell.wait().now_or_never());
136    }
137
138    #[tokio::test]
139    async fn unset_timeout_zero() {
140        let cell = WaitableCell::<i32>::new();
141        assert_eq!(None, cell.wait().now_or_never());
142    }
143
144    #[tokio::test]
145    async fn unset_timeout_1ms() {
146        let cell = WaitableCell::<i32>::new();
147        assert_eq!(
148            None,
149            timeout(Duration::from_millis(1), cell.wait()).await.ok()
150        );
151    }
152
153    #[tokio::test]
154    async fn clone() {
155        let cell = WaitableCell::<i32>::new();
156        let cloned = cell.clone();
157        let _ = cloned.set(42);
158        assert_eq!(&42, cell.wait().await);
159    }
160
161    #[tokio::test]
162    async fn basic_threaded() {
163        let cell = WaitableCell::<i32>::new();
164        tokio::spawn({
165            let cell = cell.clone();
166            async move {
167                sleep(Duration::from_millis(1)).await;
168                let _ = cell.set(42);
169            }
170        });
171        assert_eq!(&42, cell.wait().await);
172    }
173
174    #[tokio::test]
175    async fn basic_double_set() {
176        let cell = WaitableCell::<i32>::new();
177        assert_eq!(Ok(()), cell.set(42));
178        assert_eq!(Err(24), cell.set(24));
179    }
180
181    #[tokio::test]
182    async fn guard() {
183        let cell = WaitableCell::<i32>::new();
184        {
185            let _guard = cell.set_guard_with(|| 42);
186        }
187        assert_eq!(&42, cell.wait().await);
188    }
189
190    #[tokio::test]
191    async fn guard_no_op() {
192        let cell = WaitableCell::<i32>::new();
193        {
194            let _guard = cell.set_guard_with(|| 42);
195            let _ = cell.set(24);
196        }
197        assert_eq!(&24, cell.wait().await);
198    }
199}