commonware_runtime/utils/
cell.rs1use crate::{signal, Error, Handle, SinkOf, StreamOf};
2use governor::clock::{Clock as GClock, ReasonablyRealtime};
3use prometheus_client::registry::Metric;
4use rand::{CryptoRng, RngCore};
5use std::{
6 future::Future,
7 net::SocketAddr,
8 time::{Duration, SystemTime},
9};
10
11const MISSING_CONTEXT: &str = "runtime context missing";
12const DUPLICATE_CONTEXT: &str = "runtime context already present";
13
14#[macro_export]
21macro_rules! spawn_cell {
22 ($cell:expr, $body:expr $(,)?) => {{
23 let __commonware_context = $cell.take();
24 __commonware_context.spawn(move |context| async move {
25 $cell.restore(context);
26 $body
27 })
28 }};
29}
30
31#[derive(Clone, Debug)]
35pub enum Cell<C> {
36 Present(C),
38 Missing,
40}
41
42impl<C> Cell<C> {
43 pub const fn new(context: C) -> Self {
45 Self::Present(context)
46 }
47
48 pub fn take(&mut self) -> C {
50 match std::mem::replace(self, Self::Missing) {
51 Self::Present(context) => context,
52 Self::Missing => panic!("{}", MISSING_CONTEXT),
53 }
54 }
55
56 pub fn restore(&mut self, context: C) {
58 match self {
59 Self::Present(_) => panic!("{}", DUPLICATE_CONTEXT),
60 Self::Missing => {
61 *self = Self::Present(context);
62 }
63 }
64 }
65
66 pub fn as_present(&self) -> &C {
72 match self {
73 Self::Present(context) => context,
74 Self::Missing => panic!("{}", MISSING_CONTEXT),
75 }
76 }
77
78 pub fn as_present_mut(&mut self) -> &mut C {
84 match self {
85 Self::Present(context) => context,
86 Self::Missing => panic!("{}", MISSING_CONTEXT),
87 }
88 }
89
90 pub fn into_present(self) -> C {
96 match self {
97 Self::Present(context) => context,
98 Self::Missing => panic!("{}", MISSING_CONTEXT),
99 }
100 }
101}
102
103impl<C> crate::Spawner for Cell<C>
104where
105 C: crate::Spawner,
106{
107 fn dedicated(self) -> Self {
108 Self::Present(self.into_present().dedicated())
109 }
110
111 fn shared(self, blocking: bool) -> Self {
112 Self::Present(self.into_present().shared(blocking))
113 }
114
115 fn instrumented(self) -> Self {
116 Self::Present(self.into_present().instrumented())
117 }
118
119 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
120 where
121 F: FnOnce(Self) -> Fut + Send + 'static,
122 Fut: Future<Output = T> + Send + 'static,
123 T: Send + 'static,
124 {
125 self.into_present()
126 .spawn(move |context| f(Self::Present(context)))
127 }
128
129 fn stop(
130 self,
131 value: i32,
132 timeout: Option<Duration>,
133 ) -> impl Future<Output = Result<(), Error>> + Send {
134 self.into_present().stop(value, timeout)
135 }
136
137 fn stopped(&self) -> signal::Signal {
138 self.as_present().stopped()
139 }
140}
141
142impl<C> crate::Metrics for Cell<C>
143where
144 C: crate::Metrics,
145{
146 fn label(&self) -> String {
147 self.as_present().label()
148 }
149
150 fn with_label(&self, label: &str) -> Self {
151 Self::Present(self.as_present().with_label(label))
152 }
153
154 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
155 self.as_present().register(name, help, metric)
156 }
157
158 fn encode(&self) -> String {
159 self.as_present().encode()
160 }
161}
162
163impl<C> crate::Clock for Cell<C>
164where
165 C: crate::Clock,
166{
167 fn current(&self) -> SystemTime {
168 self.as_present().current()
169 }
170
171 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
172 self.as_present().sleep(duration)
173 }
174
175 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
176 self.as_present().sleep_until(deadline)
177 }
178}
179
180#[cfg(feature = "external")]
181impl<C> crate::Pacer for Cell<C>
182where
183 C: crate::Pacer,
184{
185 fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
186 where
187 F: Future<Output = T> + Send + 'a,
188 T: Send + 'a,
189 {
190 self.as_present().pace(latency, future)
191 }
192}
193
194impl<C> crate::Network for Cell<C>
195where
196 C: crate::Network,
197{
198 type Listener = <C as crate::Network>::Listener;
199
200 fn bind(
201 &self,
202 socket: SocketAddr,
203 ) -> impl Future<Output = Result<Self::Listener, Error>> + Send {
204 self.as_present().bind(socket)
205 }
206
207 fn dial(
208 &self,
209 socket: SocketAddr,
210 ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send {
211 self.as_present().dial(socket)
212 }
213}
214
215impl<C> crate::Storage for Cell<C>
216where
217 C: crate::Storage,
218{
219 type Blob = <C as crate::Storage>::Blob;
220
221 fn open(
222 &self,
223 partition: &str,
224 name: &[u8],
225 ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send {
226 self.as_present().open(partition, name)
227 }
228
229 fn remove(
230 &self,
231 partition: &str,
232 name: Option<&[u8]>,
233 ) -> impl Future<Output = Result<(), Error>> + Send {
234 self.as_present().remove(partition, name)
235 }
236
237 fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send {
238 self.as_present().scan(partition)
239 }
240}
241
242impl<C> RngCore for Cell<C>
243where
244 C: RngCore,
245{
246 fn next_u32(&mut self) -> u32 {
247 self.as_present_mut().next_u32()
248 }
249
250 fn next_u64(&mut self) -> u64 {
251 self.as_present_mut().next_u64()
252 }
253
254 fn fill_bytes(&mut self, dest: &mut [u8]) {
255 self.as_present_mut().fill_bytes(dest)
256 }
257
258 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
259 self.as_present_mut().try_fill_bytes(dest)
260 }
261}
262
263impl<C> CryptoRng for Cell<C> where C: CryptoRng {}
264
265impl<C> GClock for Cell<C>
266where
267 C: GClock,
268{
269 type Instant = <C as GClock>::Instant;
270
271 fn now(&self) -> Self::Instant {
272 self.as_present().now()
273 }
274}
275
276impl<C> ReasonablyRealtime for Cell<C> where C: ReasonablyRealtime {}
277
278impl<C> crate::Resolver for Cell<C>
279where
280 C: crate::Resolver,
281{
282 fn resolve(
283 &self,
284 host: &str,
285 ) -> impl Future<Output = Result<Vec<std::net::IpAddr>, crate::Error>> + Send {
286 self.as_present().resolve(host)
287 }
288}