commonware_runtime/utils/
cell.rs1use crate::{signal, Error, Handle, SinkOf, StreamOf};
2use governor::clock::{Clock as GClock, ReasonablyRealtime};
3use prometheus_client::registry::Metric;
4use rand::{CryptoRng, RngCore};
5use std::{
6 future::Future,
7 net::SocketAddr,
8 time::{Duration, SystemTime},
9};
10
/// Panic message used when a [`Cell`] is accessed while it holds no context.
const MISSING_CONTEXT: &str = "runtime context missing";
/// Panic message used when a context is restored into a [`Cell`] that already holds one.
const DUPLICATE_CONTEXT: &str = "runtime context already present";
13
/// Spawns a task using the context stored in a cell-like slot.
///
/// The expansion performs these steps in order:
///
/// 1. `$cell.take()` moves the context out of the slot.
/// 2. `spawn` is called on the taken context with a `move` closure.
/// 3. Inside the spawned future, the context handed to the closure is first put back with
///    `$cell.restore(..)`.
/// 4. `$body` is evaluated, so `$body` can rely on `$cell` holding a context again.
///
/// The macro evaluates to whatever `spawn` returns. A trailing comma after `$body` is accepted.
///
/// `$cell` is expanded twice (once for `take`, once for `restore` inside the `move` closure), so
/// pass a plain place expression such as `self.context` rather than an expression with side
/// effects. Because the closure is `move`, whatever `$cell` and `$body` name is moved into the
/// spawned task.
///
/// # Panics
///
/// Any panic raised by `take` or `restore` propagates. For `Cell`, `take` panics when the context
/// is missing and `restore` panics when a context is already present.
#[macro_export]
macro_rules! spawn_cell {
    ($cell:expr, $body:expr $(,)?) => {{
        let __commonware_context = $cell.take();
        __commonware_context.spawn(move |context| async move {
            // Hand the context back before running the caller's body.
            $cell.restore(context);
            $body
        })
    }};
}
30
/// Holder for a context `C` that can be emptied and refilled.
///
/// A cell is either [`Cell::Present`], storing a context, or [`Cell::Missing`], storing nothing.
#[derive(Clone, Debug)]
pub enum Cell<C> {
    /// A context is stored in the cell.
    Present(C),
    /// No context is stored in the cell.
    Missing,
}
41
42impl<C> Cell<C> {
43 pub fn new(context: C) -> Self {
45 Self::Present(context)
46 }
47
48 pub fn take(&mut self) -> C {
50 match std::mem::replace(self, Self::Missing) {
51 Self::Present(context) => context,
52 Self::Missing => panic!("{}", MISSING_CONTEXT),
53 }
54 }
55
56 pub fn restore(&mut self, context: C) {
58 match self {
59 Self::Present(_) => panic!("{}", DUPLICATE_CONTEXT),
60 Self::Missing => {
61 *self = Self::Present(context);
62 }
63 }
64 }
65
66 pub fn into(self) -> C {
68 match self {
69 Self::Present(context) => context,
70 Self::Missing => panic!("{}", MISSING_CONTEXT),
71 }
72 }
73}
74
75impl<C> AsRef<C> for Cell<C> {
76 fn as_ref(&self) -> &C {
77 match self {
78 Self::Present(context) => context,
79 Self::Missing => panic!("{}", MISSING_CONTEXT),
80 }
81 }
82}
83
84impl<C> AsMut<C> for Cell<C> {
85 fn as_mut(&mut self) -> &mut C {
86 match self {
87 Self::Present(context) => context,
88 Self::Missing => panic!("{}", MISSING_CONTEXT),
89 }
90 }
91}
92
93impl<C> crate::Spawner for Cell<C>
94where
95 C: crate::Spawner,
96{
97 fn dedicated(self) -> Self {
98 Self::Present(self.into().dedicated())
99 }
100
101 fn shared(self, blocking: bool) -> Self {
102 Self::Present(self.into().shared(blocking))
103 }
104
105 fn instrumented(self) -> Self {
106 Self::Present(self.into().instrumented())
107 }
108
109 fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
110 where
111 F: FnOnce(Self) -> Fut + Send + 'static,
112 Fut: Future<Output = T> + Send + 'static,
113 T: Send + 'static,
114 {
115 self.into().spawn(move |context| f(Self::Present(context)))
116 }
117
118 fn stop(
119 self,
120 value: i32,
121 timeout: Option<Duration>,
122 ) -> impl Future<Output = Result<(), Error>> + Send {
123 self.into().stop(value, timeout)
124 }
125
126 fn stopped(&self) -> signal::Signal {
127 self.as_ref().stopped()
128 }
129}
130
131impl<C> crate::Metrics for Cell<C>
132where
133 C: crate::Metrics,
134{
135 fn label(&self) -> String {
136 self.as_ref().label()
137 }
138
139 fn with_label(&self, label: &str) -> Self {
140 Self::Present(self.as_ref().with_label(label))
141 }
142
143 fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
144 self.as_ref().register(name, help, metric)
145 }
146
147 fn encode(&self) -> String {
148 self.as_ref().encode()
149 }
150}
151
152impl<C> crate::Clock for Cell<C>
153where
154 C: crate::Clock,
155{
156 fn current(&self) -> SystemTime {
157 self.as_ref().current()
158 }
159
160 fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
161 self.as_ref().sleep(duration)
162 }
163
164 fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
165 self.as_ref().sleep_until(deadline)
166 }
167}
168
/// Implements [`crate::Pacer`] for [`Cell`] by forwarding to the stored context.
///
/// Only compiled when the `external` feature is enabled.
#[cfg(feature = "external")]
impl<C> crate::Pacer for Cell<C>
where
    C: crate::Pacer,
{
    /// Forwards `latency` and `future` to the stored context's `pace`.
    ///
    /// # Panics
    ///
    /// Panics with "runtime context missing" if the cell holds no context.
    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        self.as_ref().pace(latency, future)
    }
}
182
183impl<C> crate::Network for Cell<C>
184where
185 C: crate::Network,
186{
187 type Listener = <C as crate::Network>::Listener;
188
189 fn bind(
190 &self,
191 socket: SocketAddr,
192 ) -> impl Future<Output = Result<Self::Listener, Error>> + Send {
193 self.as_ref().bind(socket)
194 }
195
196 fn dial(
197 &self,
198 socket: SocketAddr,
199 ) -> impl Future<Output = Result<(SinkOf<Self>, StreamOf<Self>), Error>> + Send {
200 self.as_ref().dial(socket)
201 }
202}
203
204impl<C> crate::Storage for Cell<C>
205where
206 C: crate::Storage,
207{
208 type Blob = <C as crate::Storage>::Blob;
209
210 fn open(
211 &self,
212 partition: &str,
213 name: &[u8],
214 ) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send {
215 self.as_ref().open(partition, name)
216 }
217
218 fn remove(
219 &self,
220 partition: &str,
221 name: Option<&[u8]>,
222 ) -> impl Future<Output = Result<(), Error>> + Send {
223 self.as_ref().remove(partition, name)
224 }
225
226 fn scan(&self, partition: &str) -> impl Future<Output = Result<Vec<Vec<u8>>, Error>> + Send {
227 self.as_ref().scan(partition)
228 }
229}
230
231impl<C> RngCore for Cell<C>
232where
233 C: RngCore,
234{
235 fn next_u32(&mut self) -> u32 {
236 self.as_mut().next_u32()
237 }
238
239 fn next_u64(&mut self) -> u64 {
240 self.as_mut().next_u64()
241 }
242
243 fn fill_bytes(&mut self, dest: &mut [u8]) {
244 self.as_mut().fill_bytes(dest)
245 }
246
247 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
248 self.as_mut().try_fill_bytes(dest)
249 }
250}
251
252impl<C> CryptoRng for Cell<C> where C: CryptoRng {}
253
254impl<C> GClock for Cell<C>
255where
256 C: GClock,
257{
258 type Instant = <C as GClock>::Instant;
259
260 fn now(&self) -> Self::Instant {
261 self.as_ref().now()
262 }
263}
264
265impl<C> ReasonablyRealtime for Cell<C> where C: ReasonablyRealtime {}