commonware_runtime/utils/
cell.rs

1use crate::{signal, Error, Handle, SinkOf, StreamOf};
2use commonware_parallel::ThreadPool;
3use governor::clock::{Clock as GClock, ReasonablyRealtime};
4use prometheus_client::registry::Metric;
5use rand::{CryptoRng, RngCore};
6use rayon::ThreadPoolBuildError;
7use std::{
8    future::Future,
9    net::SocketAddr,
10    num::NonZeroUsize,
11    ops::RangeInclusive,
12    time::{Duration, SystemTime},
13};
14
/// Panic message used when a [`Cell`] is accessed or taken while its context is missing.
const MISSING_CONTEXT: &str = "runtime context missing";
/// Panic message used when a context is restored into a [`Cell`] that already holds one.
const DUPLICATE_CONTEXT: &str = "runtime context already present";
17
/// Spawn a task from a context held in a [`Cell`].
///
/// The macro takes the context out of `$cell`, spawns a task with it, and, as the first
/// step inside the spawned task, restores the task's context into `$cell` before `$body`
/// is evaluated. `$body` is placed inside an `async move` block, so it may `.await`.
///
/// The macro uses the context's default spawn configuration (supervised, shared executor with
/// `blocking == false`). If you need to mark the task as blocking or request a dedicated thread,
/// take the context via [`Cell::take`] and call the appropriate [`crate::Spawner`] methods before spawning.
///
/// `$cell` is expanded twice (once to take the context and once, inside the spawned task, to
/// restore it), so it should be a plain place expression such as `self.context`.
#[macro_export]
macro_rules! spawn_cell {
    ($cell:expr, $body:expr $(,)?) => {{
        // Pull the context out first so the slot can be moved into the task below.
        let taken = $cell.take();
        taken.spawn(move |restored| async move {
            // Put the task's context back before any user code runs.
            $cell.restore(restored);
            $body
        })
    }};
}
34
35/// A wrapper around context that allows it to be taken and returned without requiring
36/// all interactions to unwrap (as with `Option<C>`).
37// TODO(#1833): Remove `Clone`
38#[derive(Clone, Debug)]
39pub enum Cell<C> {
40    /// A context available for use.
41    Present(C),
42    /// The context has been taken elsewhere.
43    Missing,
44}
45
46impl<C> Cell<C> {
47    /// Create a new slot containing `context`.
48    pub const fn new(context: C) -> Self {
49        Self::Present(context)
50    }
51
52    /// Remove the context from the slot, panicking if it is missing.
53    pub fn take(&mut self) -> C {
54        match std::mem::replace(self, Self::Missing) {
55            Self::Present(context) => context,
56            Self::Missing => panic!("{}", MISSING_CONTEXT),
57        }
58    }
59
60    /// Return a context to the slot, panicking if one is already present.
61    pub fn restore(&mut self, context: C) {
62        match self {
63            Self::Present(_) => panic!("{}", DUPLICATE_CONTEXT),
64            Self::Missing => {
65                *self = Self::Present(context);
66            }
67        }
68    }
69
70    /// Returns a reference to the context.
71    ///
72    /// # Panics
73    ///
74    /// Panics if the context is missing.
75    pub fn as_present(&self) -> &C {
76        match self {
77            Self::Present(context) => context,
78            Self::Missing => panic!("{}", MISSING_CONTEXT),
79        }
80    }
81
82    /// Returns a mutable reference to the context.
83    ///
84    /// # Panics
85    ///
86    /// Panics if the context is missing.
87    pub fn as_present_mut(&mut self) -> &mut C {
88        match self {
89            Self::Present(context) => context,
90            Self::Missing => panic!("{}", MISSING_CONTEXT),
91        }
92    }
93
94    /// Consume the slot, returning the context.
95    ///
96    /// # Panics
97    ///
98    /// Panics if the context is missing.
99    pub fn into_present(self) -> C {
100        match self {
101            Self::Present(context) => context,
102            Self::Missing => panic!("{}", MISSING_CONTEXT),
103        }
104    }
105}
106
107impl<C> crate::Spawner for Cell<C>
108where
109    C: crate::Spawner,
110{
111    fn dedicated(self) -> Self {
112        Self::Present(self.into_present().dedicated())
113    }
114
115    fn shared(self, blocking: bool) -> Self {
116        Self::Present(self.into_present().shared(blocking))
117    }
118
119    fn instrumented(self) -> Self {
120        Self::Present(self.into_present().instrumented())
121    }
122
123    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
124    where
125        F: FnOnce(Self) -> Fut + Send + 'static,
126        Fut: Future<Output = T> + Send + 'static,
127        T: Send + 'static,
128    {
129        self.into_present()
130            .spawn(move |context| f(Self::Present(context)))
131    }
132
133    fn stop(
134        self,
135        value: i32,
136        timeout: Option<Duration>,
137    ) -> impl Future<Output = Result<(), Error>> + Send {
138        self.into_present().stop(value, timeout)
139    }
140
141    fn stopped(&self) -> signal::Signal {
142        self.as_present().stopped()
143    }
144}
145
146impl<C> crate::RayonPoolSpawner for Cell<C>
147where
148    C: crate::RayonPoolSpawner,
149{
150    fn create_pool(&self, concurrency: NonZeroUsize) -> Result<ThreadPool, ThreadPoolBuildError> {
151        self.as_present().create_pool(concurrency)
152    }
153}
154
155impl<C> crate::Metrics for Cell<C>
156where
157    C: crate::Metrics,
158{
159    fn label(&self) -> String {
160        self.as_present().label()
161    }
162
163    fn with_label(&self, label: &str) -> Self {
164        Self::Present(self.as_present().with_label(label))
165    }
166
167    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
168        self.as_present().register(name, help, metric)
169    }
170
171    fn encode(&self) -> String {
172        self.as_present().encode()
173    }
174}
175
176impl<C> crate::Clock for Cell<C>
177where
178    C: crate::Clock,
179{
180    fn current(&self) -> SystemTime {
181        self.as_present().current()
182    }
183
184    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
185        self.as_present().sleep(duration)
186    }
187
188    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
189        self.as_present().sleep_until(deadline)
190    }
191}
192
/// Forwards [`crate::Pacer`] to the wrapped context, panicking if it is missing.
///
/// Only compiled when the `external` feature is enabled.
#[cfg(feature = "external")]
impl<C> crate::Pacer for Cell<C>
where
    C: crate::Pacer,
{
    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        let inner = self.as_present();
        inner.pace(latency, future)
    }
}
206
207impl<C> crate::Network for Cell<C>
208where
209    C: crate::Network,
210{
211    type Listener = <C as crate::Network>::Listener;
212
213    fn bind(
214        &self,
215        socket: SocketAddr,
216    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send {
217        self.as_present().bind(socket)
218    }
219
220    fn dial(
221        &self,
222        socket: SocketAddr,
223    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send {
224        self.as_present().dial(socket)
225    }
226}
227
228impl<C> crate::Storage for Cell<C>
229where
230    C: crate::Storage,
231{
232    type Blob = <C as crate::Storage>::Blob;
233
234    fn open_versioned(
235        &self,
236        partition: &str,
237        name: &[u8],
238        versions: RangeInclusive<u16>,
239    ) -> impl Future<Output = Result<(Self::Blob, u64, u16), Error>> + Send {
240        self.as_present().open_versioned(partition, name, versions)
241    }
242
243    fn remove(
244        &self,
245        partition: &str,
246        name: Option<&[u8]>,
247    ) -> impl Future<Output = Result<(), Error>> + Send {
248        self.as_present().remove(partition, name)
249    }
250
251    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send {
252        self.as_present().scan(partition)
253    }
254}
255
256impl<C> RngCore for Cell<C>
257where
258    C: RngCore,
259{
260    fn next_u32(&mut self) -> u32 {
261        self.as_present_mut().next_u32()
262    }
263
264    fn next_u64(&mut self) -> u64 {
265        self.as_present_mut().next_u64()
266    }
267
268    fn fill_bytes(&mut self, dest: &mut [u8]) {
269        self.as_present_mut().fill_bytes(dest)
270    }
271
272    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
273        self.as_present_mut().try_fill_bytes(dest)
274    }
275}
276
277impl<C> CryptoRng for Cell<C> where C: CryptoRng {}
278
279impl<C> GClock for Cell<C>
280where
281    C: GClock,
282{
283    type Instant = <C as GClock>::Instant;
284
285    fn now(&self) -> Self::Instant {
286        self.as_present().now()
287    }
288}
289
290impl<C> ReasonablyRealtime for Cell<C> where C: ReasonablyRealtime {}
291
292impl<C> crate::Resolver for Cell<C>
293where
294    C: crate::Resolver,
295{
296    fn resolve(
297        &self,
298        host: &str,
299    ) -> impl Future<Output = Result<Vec<std::net::IpAddr>, crate::Error>> + Send {
300        self.as_present().resolve(host)
301    }
302}