commonware_runtime/utils/
cell.rs1use crate::{signal, Error, Handle};
2use governor::clock::{Clock as GClock, ReasonablyRealtime};
3use prometheus_client::registry::Metric;
4use rand::{CryptoRng, RngCore};
5use std::{
6 future::Future,
7 net::SocketAddr,
8 num::NonZeroUsize,
9 ops::RangeInclusive,
10 time::{Duration, SystemTime},
11};
12
13const MISSING_CONTEXT: &str = "runtime context missing";
14const DUPLICATE_CONTEXT: &str = "runtime context already present";
15
16#[macro_export]
23macro_rules! spawn_cell {
24 ($cell:expr, $body:expr $(,)?) => {{
25 let __commonware_context = $cell.take();
26 __commonware_context.spawn(move |context| async move {
27 $cell.restore(context);
28 $body
29 })
30 }};
31}
32
33#[derive(Clone, Debug)]
37pub enum Cell<C> {
38 Present(C),
40 Missing,
42}
43
44impl<C> Cell<C> {
45 pub const fn new(context: C) -> Self {
47 Self::Present(context)
48 }
49
50 pub fn take(&mut self) -> C {
52 match std::mem::replace(self, Self::Missing) {
53 Self::Present(context) => context,
54 Self::Missing => panic!("{}", MISSING_CONTEXT),
55 }
56 }
57
58 pub fn restore(&mut self, context: C) {
60 match self {
61 Self::Present(_) => panic!("{}", DUPLICATE_CONTEXT),
62 Self::Missing => {
63 *self = Self::Present(context);
64 }
65 }
66 }
67
68 pub fn as_present(&self) -> &C {
74 match self {
75 Self::Present(context) => context,
76 Self::Missing => panic!("{}", MISSING_CONTEXT),
77 }
78 }
79
80 pub fn as_present_mut(&mut self) -> &mut C {
86 match self {
87 Self::Present(context) => context,
88 Self::Missing => panic!("{}", MISSING_CONTEXT),
89 }
90 }
91
92 pub fn into_present(self) -> C {
98 match self {
99 Self::Present(context) => context,
100 Self::Missing => panic!("{}", MISSING_CONTEXT),
101 }
102 }
103}
104
105impl<C> crate::Spawner for Cell<C>
106where
107 C: crate::Spawner,
108{
109 fn dedicated(self) -> Self {
110 Self::Present(self.into_present().dedicated())
111 }
112
113 fn shared(self, blocking: bool) -> Self {
114 Self::Present(self.into_present().shared(blocking))
115 }
116
117 fn instrumented(self) -> Self {
118 Self::Present(self.into_present().instrumented())
119 }
120
121 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
122 where
123 F: FnOnce(Self) -> Fut + Send + 'static,
124 Fut: Future<Output = T> + Send + 'static,
125 T: Send + 'static,
126 {
127 self.into_present()
128 .spawn(move |context| f(Self::Present(context)))
129 }
130
131 fn stop(
132 self,
133 value: i32,
134 timeout: Option<Duration>,
135 ) -> impl Future<Output = Result<(), Error>> + Send {
136 self.into_present().stop(value, timeout)
137 }
138
139 fn stopped(&self) -> signal::Signal {
140 self.as_present().stopped()
141 }
142}
143
144impl<C> crate::Metrics for Cell<C>
145where
146 C: crate::Metrics,
147{
148 fn label(&self) -> String {
149 self.as_present().label()
150 }
151
152 fn with_label(&self, label: &str) -> Self {
153 Self::Present(self.as_present().with_label(label))
154 }
155
156 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
157 self.as_present().register(name, help, metric)
158 }
159
160 fn encode(&self) -> String {
161 self.as_present().encode()
162 }
163
164 fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self {
165 Self::Present(self.as_present().with_attribute(key, value))
166 }
167
168 fn with_scope(&self) -> Self {
169 Self::Present(self.as_present().with_scope())
170 }
171}
172
173impl<C> crate::Clock for Cell<C>
174where
175 C: crate::Clock,
176{
177 fn current(&self) -> SystemTime {
178 self.as_present().current()
179 }
180
181 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
182 self.as_present().sleep(duration)
183 }
184
185 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
186 self.as_present().sleep_until(deadline)
187 }
188}
189
190#[cfg(feature = "external")]
191impl<C> crate::Pacer for Cell<C>
192where
193 C: crate::Pacer,
194{
195 fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
196 where
197 F: Future<Output = T> + Send + 'a,
198 T: Send + 'a,
199 {
200 self.as_present().pace(latency, future)
201 }
202}
203
204commonware_macros::stability_scope!(BETA {
205 use crate::{SinkOf, StreamOf};
206 use commonware_parallel::ThreadPool;
207 use rayon::ThreadPoolBuildError;
208
209 impl<C> crate::ThreadPooler for Cell<C>
210 where
211 C: crate::ThreadPooler,
212 {
213 fn create_thread_pool(
214 &self,
215 concurrency: NonZeroUsize,
216 ) -> Result<ThreadPool, ThreadPoolBuildError> {
217 self.as_present().create_thread_pool(concurrency)
218 }
219 }
220
221 impl<C> crate::Network for Cell<C>
222 where
223 C: crate::Network,
224 {
225 type Listener = <C as crate::Network>::Listener;
226
227 fn bind(
228 &self,
229 socket: SocketAddr,
230 ) -> impl Future<Output = Result<Self::Listener, Error>> + Send {
231 self.as_present().bind(socket)
232 }
233
234 fn dial(
235 &self,
236 socket: SocketAddr,
237 ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send {
238 self.as_present().dial(socket)
239 }
240 }
241
242 impl<C> crate::Storage for Cell<C>
243 where
244 C: crate::Storage,
245 {
246 type Blob = <C as crate::Storage>::Blob;
247
248 fn open_versioned(
249 &self,
250 partition: &str,
251 name: &[u8],
252 versions: RangeInclusive<u16>,
253 ) -> impl Future<Output = Result<(Self::Blob, u64, u16), Error>> + Send {
254 self.as_present().open_versioned(partition, name, versions)
255 }
256
257 fn remove(
258 &self,
259 partition: &str,
260 name: Option<&[u8]>,
261 ) -> impl Future<Output = Result<(), Error>> + Send {
262 self.as_present().remove(partition, name)
263 }
264
265 fn scan(
266 &self,
267 partition: &str,
268 ) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send {
269 self.as_present().scan(partition)
270 }
271 }
272
273 impl<C> crate::Resolver for Cell<C>
274 where
275 C: crate::Resolver,
276 {
277 fn resolve(
278 &self,
279 host: &str,
280 ) -> impl Future<Output = Result<Vec<std::net::IpAddr>, crate::Error>> + Send {
281 self.as_present().resolve(host)
282 }
283 }
284});
285
286impl<C> RngCore for Cell<C>
287where
288 C: RngCore,
289{
290 fn next_u32(&mut self) -> u32 {
291 self.as_present_mut().next_u32()
292 }
293
294 fn next_u64(&mut self) -> u64 {
295 self.as_present_mut().next_u64()
296 }
297
298 fn fill_bytes(&mut self, dest: &mut [u8]) {
299 self.as_present_mut().fill_bytes(dest)
300 }
301
302 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
303 self.as_present_mut().try_fill_bytes(dest)
304 }
305}
306
307impl<C> CryptoRng for Cell<C> where C: CryptoRng {}
308
309impl<C> GClock for Cell<C>
310where
311 C: GClock,
312{
313 type Instant = <C as GClock>::Instant;
314
315 fn now(&self) -> Self::Instant {
316 self.as_present().now()
317 }
318}
319
320impl<C> ReasonablyRealtime for Cell<C> where C: ReasonablyRealtime {}
321
322impl<C> crate::BufferPooler for Cell<C>
323where
324 C: crate::BufferPooler,
325{
326 fn network_buffer_pool(&self) -> &crate::BufferPool {
327 self.as_present().network_buffer_pool()
328 }
329
330 fn storage_buffer_pool(&self) -> &crate::BufferPool {
331 self.as_present().storage_buffer_pool()
332 }
333}