commonware_runtime/utils/
cell.rs1use crate::{signal, Error, Handle, SinkOf, StreamOf};
2use commonware_parallel::ThreadPool;
3use governor::clock::{Clock as GClock, ReasonablyRealtime};
4use prometheus_client::registry::Metric;
5use rand::{CryptoRng, RngCore};
6use rayon::ThreadPoolBuildError;
7use std::{
8 future::Future,
9 net::SocketAddr,
10 num::NonZeroUsize,
11 ops::RangeInclusive,
12 time::{Duration, SystemTime},
13};
14
15const MISSING_CONTEXT: &str = "runtime context missing";
16const DUPLICATE_CONTEXT: &str = "runtime context already present";
17
18#[macro_export]
25macro_rules! spawn_cell {
26 ($cell:expr, $body:expr $(,)?) => {{
27 let __commonware_context = $cell.take();
28 __commonware_context.spawn(move |context| async move {
29 $cell.restore(context);
30 $body
31 })
32 }};
33}
34
35#[derive(Clone, Debug)]
39pub enum Cell<C> {
40 Present(C),
42 Missing,
44}
45
46impl<C> Cell<C> {
47 pub const fn new(context: C) -> Self {
49 Self::Present(context)
50 }
51
52 pub fn take(&mut self) -> C {
54 match std::mem::replace(self, Self::Missing) {
55 Self::Present(context) => context,
56 Self::Missing => panic!("{}", MISSING_CONTEXT),
57 }
58 }
59
60 pub fn restore(&mut self, context: C) {
62 match self {
63 Self::Present(_) => panic!("{}", DUPLICATE_CONTEXT),
64 Self::Missing => {
65 *self = Self::Present(context);
66 }
67 }
68 }
69
70 pub fn as_present(&self) -> &C {
76 match self {
77 Self::Present(context) => context,
78 Self::Missing => panic!("{}", MISSING_CONTEXT),
79 }
80 }
81
82 pub fn as_present_mut(&mut self) -> &mut C {
88 match self {
89 Self::Present(context) => context,
90 Self::Missing => panic!("{}", MISSING_CONTEXT),
91 }
92 }
93
94 pub fn into_present(self) -> C {
100 match self {
101 Self::Present(context) => context,
102 Self::Missing => panic!("{}", MISSING_CONTEXT),
103 }
104 }
105}
106
107impl<C> crate::Spawner for Cell<C>
108where
109 C: crate::Spawner,
110{
111 fn dedicated(self) -> Self {
112 Self::Present(self.into_present().dedicated())
113 }
114
115 fn shared(self, blocking: bool) -> Self {
116 Self::Present(self.into_present().shared(blocking))
117 }
118
119 fn instrumented(self) -> Self {
120 Self::Present(self.into_present().instrumented())
121 }
122
123 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
124 where
125 F: FnOnce(Self) -> Fut + Send + 'static,
126 Fut: Future<Output = T> + Send + 'static,
127 T: Send + 'static,
128 {
129 self.into_present()
130 .spawn(move |context| f(Self::Present(context)))
131 }
132
133 fn stop(
134 self,
135 value: i32,
136 timeout: Option<Duration>,
137 ) -> impl Future<Output = Result<(), Error>> + Send {
138 self.into_present().stop(value, timeout)
139 }
140
141 fn stopped(&self) -> signal::Signal {
142 self.as_present().stopped()
143 }
144}
145
146impl<C> crate::RayonPoolSpawner for Cell<C>
147where
148 C: crate::RayonPoolSpawner,
149{
150 fn create_pool(&self, concurrency: NonZeroUsize) -> Result<ThreadPool, ThreadPoolBuildError> {
151 self.as_present().create_pool(concurrency)
152 }
153}
154
155impl<C> crate::Metrics for Cell<C>
156where
157 C: crate::Metrics,
158{
159 fn label(&self) -> String {
160 self.as_present().label()
161 }
162
163 fn with_label(&self, label: &str) -> Self {
164 Self::Present(self.as_present().with_label(label))
165 }
166
167 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
168 self.as_present().register(name, help, metric)
169 }
170
171 fn encode(&self) -> String {
172 self.as_present().encode()
173 }
174}
175
176impl<C> crate::Clock for Cell<C>
177where
178 C: crate::Clock,
179{
180 fn current(&self) -> SystemTime {
181 self.as_present().current()
182 }
183
184 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
185 self.as_present().sleep(duration)
186 }
187
188 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
189 self.as_present().sleep_until(deadline)
190 }
191}
192
193#[cfg(feature = "external")]
194impl<C> crate::Pacer for Cell<C>
195where
196 C: crate::Pacer,
197{
198 fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
199 where
200 F: Future<Output = T> + Send + 'a,
201 T: Send + 'a,
202 {
203 self.as_present().pace(latency, future)
204 }
205}
206
207impl<C> crate::Network for Cell<C>
208where
209 C: crate::Network,
210{
211 type Listener = <C as crate::Network>::Listener;
212
213 fn bind(
214 &self,
215 socket: SocketAddr,
216 ) -> impl Future<Output = Result<Self::Listener, Error>> + Send {
217 self.as_present().bind(socket)
218 }
219
220 fn dial(
221 &self,
222 socket: SocketAddr,
223 ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send {
224 self.as_present().dial(socket)
225 }
226}
227
228impl<C> crate::Storage for Cell<C>
229where
230 C: crate::Storage,
231{
232 type Blob = <C as crate::Storage>::Blob;
233
234 fn open_versioned(
235 &self,
236 partition: &str,
237 name: &[u8],
238 versions: RangeInclusive<u16>,
239 ) -> impl Future<Output = Result<(Self::Blob, u64, u16), Error>> + Send {
240 self.as_present().open_versioned(partition, name, versions)
241 }
242
243 fn remove(
244 &self,
245 partition: &str,
246 name: Option<&[u8]>,
247 ) -> impl Future<Output = Result<(), Error>> + Send {
248 self.as_present().remove(partition, name)
249 }
250
251 fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send {
252 self.as_present().scan(partition)
253 }
254}
255
256impl<C> RngCore for Cell<C>
257where
258 C: RngCore,
259{
260 fn next_u32(&mut self) -> u32 {
261 self.as_present_mut().next_u32()
262 }
263
264 fn next_u64(&mut self) -> u64 {
265 self.as_present_mut().next_u64()
266 }
267
268 fn fill_bytes(&mut self, dest: &mut [u8]) {
269 self.as_present_mut().fill_bytes(dest)
270 }
271
272 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
273 self.as_present_mut().try_fill_bytes(dest)
274 }
275}
276
277impl<C> CryptoRng for Cell<C> where C: CryptoRng {}
278
279impl<C> GClock for Cell<C>
280where
281 C: GClock,
282{
283 type Instant = <C as GClock>::Instant;
284
285 fn now(&self) -> Self::Instant {
286 self.as_present().now()
287 }
288}
289
290impl<C> ReasonablyRealtime for Cell<C> where C: ReasonablyRealtime {}
291
292impl<C> crate::Resolver for Cell<C>
293where
294 C: crate::Resolver,
295{
296 fn resolve(
297 &self,
298 host: &str,
299 ) -> impl Future<Output = Result<Vec<std::net::IpAddr>, crate::Error>> + Send {
300 self.as_present().resolve(host)
301 }
302}