Skip to main content

commonware_runtime/utils/
cell.rs

1use crate::{signal, Error, Handle};
2use governor::clock::{Clock as GClock, ReasonablyRealtime};
3use prometheus_client::registry::Metric;
4use rand::{CryptoRng, RngCore};
5use std::{
6    future::Future,
7    net::SocketAddr,
8    num::NonZeroUsize,
9    ops::RangeInclusive,
10    time::{Duration, SystemTime},
11};
12
/// Panic message used when a [`Cell`] is accessed while its context is missing.
const MISSING_CONTEXT: &str = "runtime context missing";
/// Panic message used when a context is restored into a [`Cell`] that already holds one.
const DUPLICATE_CONTEXT: &str = "runtime context already present";
15
/// Spawn a task from a [`Cell`]: the context is taken out of the cell, the task is spawned
/// on it, and the context handed to the task is put back into the cell before the provided
/// body is evaluated.
///
/// The `$cell` expression is expanded twice: once outside the task (for `take`) and once
/// inside the spawned `async move` block (for `restore`), so whatever it names is moved
/// into the task.
///
/// The macro uses the context's default spawn configuration (supervised, shared executor with
/// `blocking == false`). If you need to mark the task as blocking or request a dedicated thread,
/// take the context via [`Cell::take`] and call the appropriate [`crate::Spawner`] methods before spawning.
#[macro_export]
macro_rules! spawn_cell {
    ($cell:expr, $body:expr $(,)?) => {{
        // Pull the context out first so the cell itself can be moved into the task.
        let __commonware_taken = $cell.take();
        __commonware_taken.spawn(move |__commonware_restored| async move {
            // Refill the cell before running the body so the body sees a present context.
            $cell.restore(__commonware_restored);
            $body
        })
    }};
}
32
/// A slot for a context that can be taken out and later put back.
///
/// Unlike `Option<C>`, users of the slot do not have to unwrap on every interaction:
/// the accessors on [`Cell`] panic when the context is missing.
// TODO(#1833): Remove `Clone`
#[derive(Debug, Clone)]
pub enum Cell<C> {
    /// The slot currently holds a context.
    Present(C),
    /// The slot is empty because the context has been taken elsewhere.
    Missing,
}
43
44impl<C> Cell<C> {
45    /// Create a new slot containing `context`.
46    pub const fn new(context: C) -> Self {
47        Self::Present(context)
48    }
49
50    /// Remove the context from the slot, panicking if it is missing.
51    pub fn take(&mut self) -> C {
52        match std::mem::replace(self, Self::Missing) {
53            Self::Present(context) => context,
54            Self::Missing => panic!("{}", MISSING_CONTEXT),
55        }
56    }
57
58    /// Return a context to the slot, panicking if one is already present.
59    pub fn restore(&mut self, context: C) {
60        match self {
61            Self::Present(_) => panic!("{}", DUPLICATE_CONTEXT),
62            Self::Missing => {
63                *self = Self::Present(context);
64            }
65        }
66    }
67
68    /// Returns a reference to the context.
69    ///
70    /// # Panics
71    ///
72    /// Panics if the context is missing.
73    pub fn as_present(&self) -> &C {
74        match self {
75            Self::Present(context) => context,
76            Self::Missing => panic!("{}", MISSING_CONTEXT),
77        }
78    }
79
80    /// Returns a mutable reference to the context.
81    ///
82    /// # Panics
83    ///
84    /// Panics if the context is missing.
85    pub fn as_present_mut(&mut self) -> &mut C {
86        match self {
87            Self::Present(context) => context,
88            Self::Missing => panic!("{}", MISSING_CONTEXT),
89        }
90    }
91
92    /// Consume the slot, returning the context.
93    ///
94    /// # Panics
95    ///
96    /// Panics if the context is missing.
97    pub fn into_present(self) -> C {
98        match self {
99            Self::Present(context) => context,
100            Self::Missing => panic!("{}", MISSING_CONTEXT),
101        }
102    }
103}
104
105impl<C> crate::Spawner for Cell<C>
106where
107    C: crate::Spawner,
108{
109    fn dedicated(self) -> Self {
110        Self::Present(self.into_present().dedicated())
111    }
112
113    fn shared(self, blocking: bool) -> Self {
114        Self::Present(self.into_present().shared(blocking))
115    }
116
117    fn instrumented(self) -> Self {
118        Self::Present(self.into_present().instrumented())
119    }
120
121    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
122    where
123        F: FnOnce(Self) -> Fut + Send + 'static,
124        Fut: Future<Output = T> + Send + 'static,
125        T: Send + 'static,
126    {
127        self.into_present()
128            .spawn(move |context| f(Self::Present(context)))
129    }
130
131    fn stop(
132        self,
133        value: i32,
134        timeout: Option<Duration>,
135    ) -> impl Future<Output = Result<(), Error>> + Send {
136        self.into_present().stop(value, timeout)
137    }
138
139    fn stopped(&self) -> signal::Signal {
140        self.as_present().stopped()
141    }
142}
143
144impl<C> crate::Metrics for Cell<C>
145where
146    C: crate::Metrics,
147{
148    fn label(&self) -> String {
149        self.as_present().label()
150    }
151
152    fn with_label(&self, label: &str) -> Self {
153        Self::Present(self.as_present().with_label(label))
154    }
155
156    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
157        self.as_present().register(name, help, metric)
158    }
159
160    fn encode(&self) -> String {
161        self.as_present().encode()
162    }
163
164    fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self {
165        Self::Present(self.as_present().with_attribute(key, value))
166    }
167
168    fn with_scope(&self) -> Self {
169        Self::Present(self.as_present().with_scope())
170    }
171}
172
173impl<C> crate::Clock for Cell<C>
174where
175    C: crate::Clock,
176{
177    fn current(&self) -> SystemTime {
178        self.as_present().current()
179    }
180
181    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
182        self.as_present().sleep(duration)
183    }
184
185    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
186        self.as_present().sleep_until(deadline)
187    }
188}
189
/// Forwards [`crate::Pacer`] to the wrapped context (only with the `external` feature).
///
/// Panics if the context is missing.
#[cfg(feature = "external")]
impl<C> crate::Pacer for Cell<C>
where
    C: crate::Pacer,
{
    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        let context = self.as_present();
        context.pace(latency, future)
    }
}
203
204commonware_macros::stability_scope!(BETA {
205    use crate::{SinkOf, StreamOf};
206    use commonware_parallel::ThreadPool;
207    use rayon::ThreadPoolBuildError;
208
209    impl<C> crate::ThreadPooler for Cell<C>
210    where
211        C: crate::ThreadPooler,
212    {
213        fn create_thread_pool(
214            &self,
215            concurrency: NonZeroUsize,
216        ) -> Result<ThreadPool, ThreadPoolBuildError> {
217            self.as_present().create_thread_pool(concurrency)
218        }
219    }
220
221    impl<C> crate::Network for Cell<C>
222    where
223        C: crate::Network,
224    {
225        type Listener = <C as crate::Network>::Listener;
226
227        fn bind(
228            &self,
229            socket: SocketAddr,
230        ) -> impl Future<Output = Result<Self::Listener, Error>> + Send {
231            self.as_present().bind(socket)
232        }
233
234        fn dial(
235            &self,
236            socket: SocketAddr,
237        ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send {
238            self.as_present().dial(socket)
239        }
240    }
241
242    impl<C> crate::Storage for Cell<C>
243    where
244        C: crate::Storage,
245    {
246        type Blob = <C as crate::Storage>::Blob;
247
248        fn open_versioned(
249            &self,
250            partition: &str,
251            name: &[u8],
252            versions: RangeInclusive<u16>,
253        ) -> impl Future<Output = Result<(Self::Blob, u64, u16), Error>> + Send {
254            self.as_present().open_versioned(partition, name, versions)
255        }
256
257        fn remove(
258            &self,
259            partition: &str,
260            name: Option<&[u8]>,
261        ) -> impl Future<Output = Result<(), Error>> + Send {
262            self.as_present().remove(partition, name)
263        }
264
265        fn scan(
266            &self,
267            partition: &str,
268        ) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send {
269            self.as_present().scan(partition)
270        }
271    }
272
273    impl<C> crate::Resolver for Cell<C>
274    where
275        C: crate::Resolver,
276    {
277        fn resolve(
278            &self,
279            host: &str,
280        ) -> impl Future<Output = Result<Vec<std::net::IpAddr>, crate::Error>> + Send {
281            self.as_present().resolve(host)
282        }
283    }
284});
285
286impl<C> RngCore for Cell<C>
287where
288    C: RngCore,
289{
290    fn next_u32(&mut self) -> u32 {
291        self.as_present_mut().next_u32()
292    }
293
294    fn next_u64(&mut self) -> u64 {
295        self.as_present_mut().next_u64()
296    }
297
298    fn fill_bytes(&mut self, dest: &mut [u8]) {
299        self.as_present_mut().fill_bytes(dest)
300    }
301
302    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
303        self.as_present_mut().try_fill_bytes(dest)
304    }
305}
306
307impl<C> CryptoRng for Cell<C> where C: CryptoRng {}
308
309impl<C> GClock for Cell<C>
310where
311    C: GClock,
312{
313    type Instant = <C as GClock>::Instant;
314
315    fn now(&self) -> Self::Instant {
316        self.as_present().now()
317    }
318}
319
320impl<C> ReasonablyRealtime for Cell<C> where C: ReasonablyRealtime {}
321
322impl<C> crate::BufferPooler for Cell<C>
323where
324    C: crate::BufferPooler,
325{
326    fn network_buffer_pool(&self) -> &crate::BufferPool {
327        self.as_present().network_buffer_pool()
328    }
329
330    fn storage_buffer_pool(&self) -> &crate::BufferPool {
331        self.as_present().storage_buffer_pool()
332    }
333}