commonware_runtime/utils/
cell.rs

1use crate::{signal, Error, Handle, SinkOf, StreamOf};
2use governor::clock::{Clock as GClock, ReasonablyRealtime};
3use prometheus_client::registry::Metric;
4use rand::{CryptoRng, RngCore};
5use std::{
6    future::Future,
7    net::SocketAddr,
8    time::{Duration, SystemTime},
9};
10
/// Panic message used by the accessors in this file (`take`, `into`, `as_ref`, `as_mut`)
/// when the cell no longer holds a context.
const MISSING_CONTEXT: &str = "runtime context missing";
/// Panic message used by `restore` when the cell already holds a context.
const DUPLICATE_CONTEXT: &str = "runtime context already present";
13
/// Spawn a task using a [`Cell`] by taking its context, spawning with it, and restoring
/// the context handed to the task before the provided body is evaluated.
///
/// The expansion performs these steps in order:
/// 1. `$cell.take()` moves the context out of the cell (this panics if the context is
///    missing, see [`Cell::take`]).
/// 2. `spawn` is called on that context with a `move` closure returning an `async move` block.
/// 3. Inside the async block, the context passed to the closure is put back with
///    `$cell.restore(..)`, and only then is `$body` evaluated as the block's result.
///
/// The macro evaluates to whatever `spawn` returns.
///
/// Note that `$cell` is expanded twice, and the second use sits inside a `move` closure:
/// whatever `$cell` names is moved into the spawned task, so pass a plain place expression
/// (for example a field access) rather than an expression with side effects.
///
/// The macro uses the context's default spawn configuration (supervised, shared executor with
/// `blocking == false`). If you need to mark the task as blocking or request a dedicated thread,
/// take the context via [`Cell::take`] and call the appropriate [`crate::Spawner`] methods before spawning.
#[macro_export]
macro_rules! spawn_cell {
    ($cell:expr, $body:expr $(,)?) => {{
        // Move the context out of the cell so it can be consumed by `spawn`.
        let __commonware_context = $cell.take();
        __commonware_context.spawn(move |context| async move {
            // Put the task's context back first so `$body` can use the cell again.
            $cell.restore(context);
            $body
        })
    }};
}
30
/// A wrapper around context that allows it to be taken and returned without requiring
/// all interactions to unwrap (as with `Option<C>`).
///
/// A cell is either [`Cell::Present`] (it holds a context) or [`Cell::Missing`] (the context
/// has been moved out). The accessors defined in this file panic instead of returning an
/// `Option` when the context is missing.
// TODO(#1833): Remove `Clone`
#[derive(Clone, Debug)]
pub enum Cell<C> {
    /// A context available for use.
    Present(C),
    /// The context has been taken elsewhere.
    Missing,
}
41
42impl<C> Cell<C> {
43    /// Create a new slot containing `context`.
44    pub fn new(context: C) -> Self {
45        Self::Present(context)
46    }
47
48    /// Remove the context from the slot, panicking if it is missing.
49    pub fn take(&mut self) -> C {
50        match std::mem::replace(self, Self::Missing) {
51            Self::Present(context) => context,
52            Self::Missing => panic!("{}", MISSING_CONTEXT),
53        }
54    }
55
56    /// Return a context to the slot, panicking if one is already present.
57    pub fn restore(&mut self, context: C) {
58        match self {
59            Self::Present(_) => panic!("{}", DUPLICATE_CONTEXT),
60            Self::Missing => {
61                *self = Self::Present(context);
62            }
63        }
64    }
65
66    /// Consume the slot, returning the context and panicking if it is missing.
67    pub fn into(self) -> C {
68        match self {
69            Self::Present(context) => context,
70            Self::Missing => panic!("{}", MISSING_CONTEXT),
71        }
72    }
73}
74
75impl<C> AsRef<C> for Cell<C> {
76    fn as_ref(&self) -> &C {
77        match self {
78            Self::Present(context) => context,
79            Self::Missing => panic!("{}", MISSING_CONTEXT),
80        }
81    }
82}
83
84impl<C> AsMut<C> for Cell<C> {
85    fn as_mut(&mut self) -> &mut C {
86        match self {
87            Self::Present(context) => context,
88            Self::Missing => panic!("{}", MISSING_CONTEXT),
89        }
90    }
91}
92
93impl<C> crate::Spawner for Cell<C>
94where
95    C: crate::Spawner,
96{
97    fn dedicated(self) -> Self {
98        Self::Present(self.into().dedicated())
99    }
100
101    fn shared(self, blocking: bool) -> Self {
102        Self::Present(self.into().shared(blocking))
103    }
104
105    fn instrumented(self) -> Self {
106        Self::Present(self.into().instrumented())
107    }
108
109    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
110    where
111        F: FnOnce(Self) -> Fut + Send + 'static,
112        Fut: Future<Output = T> + Send + 'static,
113        T: Send + 'static,
114    {
115        self.into().spawn(move |context| f(Self::Present(context)))
116    }
117
118    fn stop(
119        self,
120        value: i32,
121        timeout: Option<Duration>,
122    ) -> impl Future<Output = Result<(), Error>> + Send {
123        self.into().stop(value, timeout)
124    }
125
126    fn stopped(&self) -> signal::Signal {
127        self.as_ref().stopped()
128    }
129}
130
131impl<C> crate::Metrics for Cell<C>
132where
133    C: crate::Metrics,
134{
135    fn label(&self) -> String {
136        self.as_ref().label()
137    }
138
139    fn with_label(&self, label: &str) -> Self {
140        Self::Present(self.as_ref().with_label(label))
141    }
142
143    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
144        self.as_ref().register(name, help, metric)
145    }
146
147    fn encode(&self) -> String {
148        self.as_ref().encode()
149    }
150}
151
152impl<C> crate::Clock for Cell<C>
153where
154    C: crate::Clock,
155{
156    fn current(&self) -> SystemTime {
157        self.as_ref().current()
158    }
159
160    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
161        self.as_ref().sleep(duration)
162    }
163
164    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
165        self.as_ref().sleep_until(deadline)
166    }
167}
168
/// Forwards [`crate::Pacer`] to the wrapped context (only built with the `external` feature).
#[cfg(feature = "external")]
impl<C> crate::Pacer for Cell<C>
where
    C: crate::Pacer,
{
    /// Hand `latency` and `future` to the wrapped context's `pace`.
    ///
    /// Panics if the context is missing.
    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        <C as crate::Pacer>::pace(self.as_ref(), latency, future)
    }
}
182
183impl<C> crate::Network for Cell<C>
184where
185    C: crate::Network,
186{
187    type Listener = <C as crate::Network>::Listener;
188
189    fn bind(
190        &self,
191        socket: SocketAddr,
192    ) -> impl Future<Output = Result<Self::Listener, Error>> + Send {
193        self.as_ref().bind(socket)
194    }
195
196    fn dial(
197        &self,
198        socket: SocketAddr,
199    ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send {
200        self.as_ref().dial(socket)
201    }
202}
203
204impl<C> crate::Storage for Cell<C>
205where
206    C: crate::Storage,
207{
208    type Blob = <C as crate::Storage>::Blob;
209
210    fn open(
211        &self,
212        partition: &str,
213        name: &[u8],
214    ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send {
215        self.as_ref().open(partition, name)
216    }
217
218    fn remove(
219        &self,
220        partition: &str,
221        name: Option<&[u8]>,
222    ) -> impl Future<Output = Result<(), Error>> + Send {
223        self.as_ref().remove(partition, name)
224    }
225
226    fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send {
227        self.as_ref().scan(partition)
228    }
229}
230
231impl<C> RngCore for Cell<C>
232where
233    C: RngCore,
234{
235    fn next_u32(&mut self) -> u32 {
236        self.as_mut().next_u32()
237    }
238
239    fn next_u64(&mut self) -> u64 {
240        self.as_mut().next_u64()
241    }
242
243    fn fill_bytes(&mut self, dest: &mut [u8]) {
244        self.as_mut().fill_bytes(dest)
245    }
246
247    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
248        self.as_mut().try_fill_bytes(dest)
249    }
250}
251
252impl<C> CryptoRng for Cell<C> where C: CryptoRng {}
253
254impl<C> GClock for Cell<C>
255where
256    C: GClock,
257{
258    type Instant = <C as GClock>::Instant;
259
260    fn now(&self) -> Self::Instant {
261        self.as_ref().now()
262    }
263}
264
265impl<C> ReasonablyRealtime for Cell<C> where C: ReasonablyRealtime {}