commonware_runtime/utils/
cell.rs

1use crate::{signal, Error, Handle, SinkOf, StreamOf};
2use governor::clock::{Clock as GClock, ReasonablyRealtime};
3use prometheus_client::registry::Metric;
4use rand::{CryptoRng, RngCore};
5use std::{
6    future::Future,
7    net::SocketAddr,
8    time::{Duration, SystemTime},
9};
10
11const MISSING_CONTEXT: &str = "runtime context missing";
12const DUPLICATE_CONTEXT: &str = "runtime context already present";
13
14/// Spawn a task using a [`Cell`] by taking its context, executing the provided
15/// async block, and restoring the context before the block completes.
16///
17/// The macro uses the context's default spawn configuration (supervised, shared executor with
18/// `blocking == false`). If you need to mark the task as blocking or request a dedicated thread,
19/// take the context via [`Cell::take`] and call the appropriate [`crate::Spawner`] methods before spawning.
20#[macro_export]
21macro_rules! spawn_cell {
22    ($cell:expr, $body:expr $(,)?) => {{
23        let __commonware_context = $cell.take();
24        __commonware_context.spawn(move |context| async move {
25            $cell.restore(context);
26            $body
27        })
28    }};
29}
30
31/// A wrapper around context that allows it to be taken and returned without requiring
32/// all interactions to unwrap (as with `Option<C>`).
33// TODO(#1833): Remove `Clone`
34#[derive(Clone, Debug)]
35pub enum Cell<C> {
36    /// A context available for use.
37    Present(C),
38    /// The context has been taken elsewhere.
39    Missing,
40}
41
42impl<C> Cell<C> {
43    /// Create a new slot containing `context`.
44    pub const fn new(context: C) -> Self {
45        Self::Present(context)
46    }
47
48    /// Remove the context from the slot, panicking if it is missing.
49    pub fn take(&mut self) -> C {
50        match std::mem::replace(self, Self::Missing) {
51            Self::Present(context) => context,
52            Self::Missing => panic!("{}", MISSING_CONTEXT),
53        }
54    }
55
56    /// Return a context to the slot, panicking if one is already present.
57    pub fn restore(&mut self, context: C) {
58        match self {
59            Self::Present(_) => panic!("{}", DUPLICATE_CONTEXT),
60            Self::Missing => {
61                *self = Self::Present(context);
62            }
63        }
64    }
65
66    /// Returns a reference to the context.
67    ///
68    /// # Panics
69    ///
70    /// Panics if the context is missing.
71    pub fn as_present(&self) -> &C {
72        match self {
73            Self::Present(context) => context,
74            Self::Missing => panic!("{}", MISSING_CONTEXT),
75        }
76    }
77
78    /// Returns a mutable reference to the context.
79    ///
80    /// # Panics
81    ///
82    /// Panics if the context is missing.
83    pub fn as_present_mut(&mut self) -> &mut C {
84        match self {
85            Self::Present(context) => context,
86            Self::Missing => panic!("{}", MISSING_CONTEXT),
87        }
88    }
89
90    /// Consume the slot, returning the context.
91    ///
92    /// # Panics
93    ///
94    /// Panics if the context is missing.
95    pub fn into_present(self) -> C {
96        match self {
97            Self::Present(context) => context,
98            Self::Missing => panic!("{}", MISSING_CONTEXT),
99        }
100    }
101}
102
103impl<C> crate::Spawner for Cell<C>
104where
105    C: crate::Spawner,
106{
107    fn dedicated(self) -> Self {
108        Self::Present(self.into_present().dedicated())
109    }
110
111    fn shared(self, blocking: bool) -> Self {
112        Self::Present(self.into_present().shared(blocking))
113    }
114
115    fn instrumented(self) -> Self {
116        Self::Present(self.into_present().instrumented())
117    }
118
119    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
120    where
121        F: FnOnce(Self) -> Fut + Send + 'static,
122        Fut: Future<Output = T> + Send + 'static,
123        T: Send + 'static,
124    {
125        self.into_present()
126            .spawn(move |context| f(Self::Present(context)))
127    }
128
129    fn stop(
130        self,
131        value: i32,
132        timeout: Option<Duration>,
133    ) -> impl Future<Output = Result<(), Error>> + Send {
134        self.into_present().stop(value, timeout)
135    }
136
137    fn stopped(&self) -> signal::Signal {
138        self.as_present().stopped()
139    }
140}
141
142impl<C> crate::Metrics for Cell<C>
143where
144    C: crate::Metrics,
145{
146    fn label(&self) -> String {
147        self.as_present().label()
148    }
149
150    fn with_label(&self, label: &str) -> Self {
151        Self::Present(self.as_present().with_label(label))
152    }
153
154    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
155        self.as_present().register(name, help, metric)
156    }
157
158    fn encode(&self) -> String {
159        self.as_present().encode()
160    }
161}
162
163impl<C> crate::Clock for Cell<C>
164where
165    C: crate::Clock,
166{
167    fn current(&self) -> SystemTime {
168        self.as_present().current()
169    }
170
171    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
172        self.as_present().sleep(duration)
173    }
174
175    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
176        self.as_present().sleep_until(deadline)
177    }
178}
179
180#[cfg(feature = "external")]
181impl<C> crate::Pacer for Cell<C>
182where
183    C: crate::Pacer,
184{
185    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
186    where
187        F: Future<Output = T> + Send + 'a,
188        T: Send + 'a,
189    {
190        self.as_present().pace(latency, future)
191    }
192}
193
194impl<C> crate::Network for Cell<C>
195where
196    C: crate::Network,
197{
198    type Listener = <C as crate::Network>::Listener;
199
200    fn bind(
201        &self,
202        socket: SocketAddr,
203    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send {
204        self.as_present().bind(socket)
205    }
206
207    fn dial(
208        &self,
209        socket: SocketAddr,
210    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send {
211        self.as_present().dial(socket)
212    }
213}
214
215impl<C> crate::Storage for Cell<C>
216where
217    C: crate::Storage,
218{
219    type Blob = <C as crate::Storage>::Blob;
220
221    fn open(
222        &self,
223        partition: &str,
224        name: &[u8],
225    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send {
226        self.as_present().open(partition, name)
227    }
228
229    fn remove(
230        &self,
231        partition: &str,
232        name: Option<&[u8]>,
233    ) -> impl Future<Output = Result<(), Error>> + Send {
234        self.as_present().remove(partition, name)
235    }
236
237    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send {
238        self.as_present().scan(partition)
239    }
240}
241
242impl<C> RngCore for Cell<C>
243where
244    C: RngCore,
245{
246    fn next_u32(&mut self) -> u32 {
247        self.as_present_mut().next_u32()
248    }
249
250    fn next_u64(&mut self) -> u64 {
251        self.as_present_mut().next_u64()
252    }
253
254    fn fill_bytes(&mut self, dest: &mut [u8]) {
255        self.as_present_mut().fill_bytes(dest)
256    }
257
258    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
259        self.as_present_mut().try_fill_bytes(dest)
260    }
261}
262
263impl<C> CryptoRng for Cell<C> where C: CryptoRng {}
264
265impl<C> GClock for Cell<C>
266where
267    C: GClock,
268{
269    type Instant = <C as GClock>::Instant;
270
271    fn now(&self) -> Self::Instant {
272        self.as_present().now()
273    }
274}
275
276impl<C> ReasonablyRealtime for Cell<C> where C: ReasonablyRealtime {}
277
278impl<C> crate::Resolver for Cell<C>
279where
280    C: crate::Resolver,
281{
282    fn resolve(
283        &self,
284        host: &str,
285    ) -> impl Future<Output = Result<Vec<std::net::IpAddr>, crate::Error>> + Send {
286        self.as_present().resolve(host)
287    }
288}