// tightbeam/colony/worker/mod.rs
#[cfg(not(feature = "std"))]
20extern crate alloc;
21#[cfg(not(feature = "std"))]
22use alloc::boxed::Box;
23#[cfg(not(feature = "std"))]
24use alloc::sync::Arc;
25
26#[cfg(feature = "std")]
27use std::sync::Arc;
28
29use core::future::Future;
30use core::pin::Pin;
31
32#[cfg(feature = "derive")]
33use crate::Errorizable;
34
35use crate::policy::{ReceptorPolicy, TransitStatus};
36use crate::trace::TraceCollector;
37use crate::Message;
38
39pub mod macros;
40
41pub mod worker_runtime {
43 pub mod rt {
44 pub use crate::runtime::rt::*;
45
46 pub type QueueSender<T> = crate::runtime::rt::Sender<T>;
48
49 pub type QueueReceiver<T> = crate::runtime::rt::Receiver<T>;
51 }
52}
53
54pub struct WorkerRequest<I: Send, O> {
55 pub message: Arc<I>,
56 pub respond_to: worker_runtime::rt::OneshotSender<Result<O, TransitStatus>>,
57 pub trace: Arc<TraceCollector>,
58}
59
60#[cfg_attr(feature = "derive", derive(Errorizable))]
61#[derive(Debug)]
62pub enum WorkerRelayError {
63 #[cfg_attr(feature = "derive", error("Worker queue closed"))]
64 QueueClosed,
65 #[cfg_attr(feature = "derive", error("Worker response channel dropped"))]
66 ResponseDropped,
67 #[cfg_attr(feature = "derive", error("Message rejected with status {:?}"))]
68 #[cfg_attr(feature = "derive", from)]
69 Rejected(TransitStatus),
70}
71
72#[cfg(not(feature = "derive"))]
73impl core::fmt::Display for WorkerRelayError {
74 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
75 match self {
76 Self::QueueClosed => f.write_str("worker queue closed"),
77 Self::ResponseDropped => f.write_str("worker response channel dropped"),
78 Self::Rejected(status) => write!(f, "message rejected with status {:?}", status),
79 }
80 }
81}
82
83#[cfg(not(feature = "derive"))]
84impl std::error::Error for WorkerRelayError {}
85
86pub type WorkerRelayFuture<O> = Pin<Box<dyn Future<Output = Result<O, WorkerRelayError>> + Send + 'static>>;
87pub type WorkerStartFuture<W> = Pin<Box<dyn Future<Output = Result<W, crate::error::TightBeamError>> + Send>>;
88
/// Drives `future` to completion on a Tokio runtime and returns its output.
///
/// When a Tokio runtime handle is available in the current context the future
/// is blocked on through that handle. Otherwise a temporary current-thread
/// runtime with all drivers enabled is built for this single call.
///
/// # Errors
///
/// Returns the `std::io::Error` raised when the fallback runtime cannot be
/// built.
///
/// NOTE(review): Tokio documents that `Handle::block_on` panics when invoked
/// from inside an asynchronous execution context -- confirm that callers only
/// reach this function from synchronous code.
#[cfg(feature = "tokio")]
#[allow(dead_code)]
pub fn block_on_worker_future<F, T>(future: F) -> Result<T, std::io::Error>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    // Prefer the runtime that is already active for this context.
    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        return Ok(handle.block_on(future));
    }

    // No ambient runtime: spin up a throwaway one just for this future.
    let fallback = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()?;
    Ok(fallback.block_on(future))
}
105
/// Drives `future` to completion by delegating to
/// `worker_runtime::rt::block_on` and returns its output.
///
/// Compiled only when `std` is enabled and `tokio` is not. Unlike the Tokio
/// variant of this function, the output is returned directly rather than
/// wrapped in a `Result`.
#[cfg(all(not(feature = "tokio"), feature = "std"))]
#[allow(dead_code)]
pub fn block_on_worker_future<F, T>(future: F) -> T
where
    T: Send + 'static,
    F: Future<Output = T> + Send + 'static,
{
    worker_runtime::rt::block_on(future)
}
115
116pub trait Worker: Send + Sync + Sized {
117 type Input: Send + Sync + 'static;
118 type Output: Send + 'static;
119 type Config: Send + Sync + 'static;
120
121 fn new(config: Self::Config) -> Self;
122
123 fn start(self, trace: Arc<TraceCollector>) -> WorkerStartFuture<Self>;
124
125 fn kill(self) -> ::core::result::Result<(), std::io::Error>;
126
127 fn relay(&self, message: Arc<Self::Input>) -> WorkerRelayFuture<Self::Output>;
128
129 fn queue_capacity(&self) -> usize;
130}
131
/// Static, type-level metadata about a worker.
pub trait WorkerMetadata {
    /// Returns the worker's name as a `'static` string; no instance is needed.
    fn name() -> &'static str;
}
137
138pub struct WorkerPolicies<I: Send> {
139 #[allow(dead_code)]
140 pub(crate) receptor_gates: Vec<Arc<dyn ReceptorPolicy<I> + Send + Sync>>,
141}
142
143impl<I: Send> WorkerPolicies<I> {
144 pub fn receptor_gates(&self) -> &[Arc<dyn ReceptorPolicy<I> + Send + Sync>] {
145 &self.receptor_gates
146 }
147}
148
149impl<I: Send> Default for WorkerPolicies<I> {
150 fn default() -> Self {
151 Self { receptor_gates: Vec::new() }
152 }
153}
154
155pub struct WorkerPolicyBuilder<I: Send> {
156 receptor_gates: Vec<Arc<dyn ReceptorPolicy<I> + Send + Sync>>,
157}
158
159impl<I: Send> Default for WorkerPolicyBuilder<I> {
160 fn default() -> Self {
161 Self { receptor_gates: Vec::new() }
162 }
163}
164
165impl<I: Message + Send> WorkerPolicyBuilder<I> {
166 pub fn build(self) -> WorkerPolicies<I> {
167 WorkerPolicies { receptor_gates: self.receptor_gates }
168 }
169
170 pub fn with_receptor_gate<R, const N: usize>(mut self, gates: [R; N]) -> Self
171 where
172 R: ReceptorPolicy<I> + Send + Sync + 'static,
173 {
174 self.receptor_gates.extend(
175 gates
176 .into_iter()
177 .map(|gate| Arc::new(gate) as Arc<dyn ReceptorPolicy<I> + Send + Sync>),
178 );
179 self
180 }
181}
182
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::WorkerRelayError;
    use crate::der::Sequence;
    use crate::policy::{ReceptorPolicy, TransitStatus};
    use crate::worker;
    use crate::Beamable;

    // Input message shared by both test workers.
    #[derive(Beamable, Clone, Debug, PartialEq, Sequence)]
    pub struct RequestMessage {
        content: String,
        lucky_number: u32,
    }

    // Reply produced by `PingPongWorker`.
    #[derive(Sequence, Beamable, Clone, Debug, PartialEq)]
    pub struct PongMessage {
        result: String,
    }

    // Receptor gate that only lets messages whose content is exactly "PING" through.
    #[derive(Default)]
    struct PingGate;

    impl ReceptorPolicy<RequestMessage> for PingGate {
        fn evaluate(&self, candidate: &RequestMessage) -> TransitStatus {
            match candidate.content.as_str() {
                "PING" => TransitStatus::Accepted,
                _ => TransitStatus::Forbidden,
            }
        }
    }

    // Worker with configuration: answers whether the message's lucky number
    // equals the configured lotto number.
    worker! {
        name: LuckyNumberDeterminer<RequestMessage, bool>,
        config: {
            lotto_number: u32,
        },
        handle: |message, _trace, config| async move {
            config.lotto_number == message.lucky_number
        }
    }

    // Worker guarded by `PingGate`: every message that passes the gate is
    // answered with "PONG".
    worker! {
        name: PingPongWorker<RequestMessage, PongMessage>,
        policies: {
            with_receptor_gate: [PingGate]
        },
        handle: |_message, _trace| async move {
            PongMessage {
                result: String::from("PONG"),
            }
        }
    }

    #[cfg(feature = "std")]
    crate::test_worker! {
        name: lucky_number_worker_checks_winner,
        setup: || {
            LuckyNumberDeterminer::new(LuckyNumberDeterminerConf { lotto_number: 42 })
        },
        assertions: |worker| async move {
            assert_eq!(worker.queue_capacity(), 64);

            // A matching lucky number is reported as a win.
            let winning_ticket = Arc::new(RequestMessage {
                content: String::from("PING"),
                lucky_number: 42,
            });
            let is_winner = worker.relay(winning_ticket).await?;
            assert!(is_winner);

            // Any other number is reported as a loss.
            let losing_ticket = Arc::new(RequestMessage {
                content: String::from("PING"),
                lucky_number: 7,
            });
            let is_loser_winning = worker.relay(losing_ticket).await?;
            assert!(!is_loser_winning);

            Ok(())
        }
    }

    #[cfg(feature = "std")]
    crate::test_worker! {
        name: test_ping_pong_worker,
        setup: || {
            PingPongWorker::new(())
        },
        assertions: |worker| async move {
            // "PING" passes the gate and is answered with "PONG".
            let ping = Arc::new(RequestMessage {
                content: String::from("PING"),
                lucky_number: 42,
            });
            let reply = worker.relay(ping).await?;
            assert_eq!(reply, PongMessage { result: String::from("PONG") });

            // Anything else is stopped by the gate and surfaces as `Rejected`.
            let not_a_ping = Arc::new(RequestMessage {
                content: String::from("PONG"),
                lucky_number: 42,
            });
            let outcome = worker.relay(not_a_ping).await;
            assert!(matches!(outcome, Err(WorkerRelayError::Rejected(_))));

            Ok(())
        }
    }
}