containerd_shimkit/sandbox/
sync.rs1use std::cell::OnceCell;
4use std::sync::Arc;
5
6use tokio::sync::Notify;
7
8pub struct WaitableCell<T> {
11 inner: Arc<WaitableCellImpl<T>>,
12}
13
14struct WaitableCellImpl<T> {
15 cell: OnceCell<T>,
16 notify: Notify,
17}
18
19unsafe impl<T> Send for WaitableCell<T> {}
21unsafe impl<T> Sync for WaitableCell<T> {}
22
23impl<T> Default for WaitableCell<T> {
24 fn default() -> Self {
25 Self {
26 inner: Arc::new(WaitableCellImpl {
27 cell: OnceCell::default(),
28 notify: Notify::new(),
29 }),
30 }
31 }
32}
33
34impl<T> Clone for WaitableCell<T> {
35 fn clone(&self) -> Self {
36 let inner = self.inner.clone();
37 Self { inner }
38 }
39}
40
41impl<T> WaitableCell<T> {
42 pub fn new() -> Self {
44 Self::default()
45 }
46
47 pub fn set(&self, val: impl Into<T>) -> Result<(), T> {
50 self.inner.cell.set(val.into())?;
51 self.inner.notify.notify_waiters();
52 Ok(())
53 }
54
55 pub fn set_guard_with<R: Into<T>, F: FnOnce() -> R>(&self, f: F) -> impl Drop + use<F, T, R> {
84 let cell = (*self).clone();
85 WaitableCellSetGuard { f: Some(f), cell }
86 }
87
88 pub async fn wait(&self) -> &T {
90 let notified = self.inner.notify.notified();
91 if let Some(val) = self.inner.cell.get() {
92 return val;
93 }
94 notified.await;
95 unsafe { self.inner.cell.get().unwrap_unchecked() }
98 }
99}
100
101struct WaitableCellSetGuard<T, R: Into<T>, F: FnOnce() -> R> {
105 f: Option<F>,
106 cell: WaitableCell<T>,
107}
108
109impl<T, R: Into<T>, F: FnOnce() -> R> Drop for WaitableCellSetGuard<T, R, F> {
110 fn drop(&mut self) {
111 let _ = self.cell.set(self.f.take().unwrap()());
112 }
113}
114
115#[cfg(test)]
116mod test {
117 use std::time::Duration;
118
119 use futures::FutureExt;
120 use tokio::time::{sleep, timeout};
121
122 use super::WaitableCell;
123
124 #[tokio::test]
125 async fn basic() {
126 let cell = WaitableCell::<i32>::new();
127 cell.set(42).unwrap();
128 assert_eq!(&42, cell.wait().await);
129 }
130
131 #[tokio::test]
132 async fn basic_timeout_zero() {
133 let cell = WaitableCell::<i32>::new();
134 cell.set(42).unwrap();
135 assert_eq!(Some(&42), cell.wait().now_or_never());
136 }
137
138 #[tokio::test]
139 async fn unset_timeout_zero() {
140 let cell = WaitableCell::<i32>::new();
141 assert_eq!(None, cell.wait().now_or_never());
142 }
143
144 #[tokio::test]
145 async fn unset_timeout_1ms() {
146 let cell = WaitableCell::<i32>::new();
147 assert_eq!(
148 None,
149 timeout(Duration::from_millis(1), cell.wait()).await.ok()
150 );
151 }
152
153 #[tokio::test]
154 async fn clone() {
155 let cell = WaitableCell::<i32>::new();
156 let cloned = cell.clone();
157 let _ = cloned.set(42);
158 assert_eq!(&42, cell.wait().await);
159 }
160
161 #[tokio::test]
162 async fn basic_threaded() {
163 let cell = WaitableCell::<i32>::new();
164 tokio::spawn({
165 let cell = cell.clone();
166 async move {
167 sleep(Duration::from_millis(1)).await;
168 let _ = cell.set(42);
169 }
170 });
171 assert_eq!(&42, cell.wait().await);
172 }
173
174 #[tokio::test]
175 async fn basic_double_set() {
176 let cell = WaitableCell::<i32>::new();
177 assert_eq!(Ok(()), cell.set(42));
178 assert_eq!(Err(24), cell.set(24));
179 }
180
181 #[tokio::test]
182 async fn guard() {
183 let cell = WaitableCell::<i32>::new();
184 {
185 let _guard = cell.set_guard_with(|| 42);
186 }
187 assert_eq!(&42, cell.wait().await);
188 }
189
190 #[tokio::test]
191 async fn guard_no_op() {
192 let cell = WaitableCell::<i32>::new();
193 {
194 let _guard = cell.set_guard_with(|| 42);
195 let _ = cell.set(24);
196 }
197 assert_eq!(&24, cell.wait().await);
198 }
199}