Skip to main content

tightbeam/colony/worker/
mod.rs

1//! Worker framework for message processing
2//!
3//! Workers are the fundamental processing units in the colony architecture.
4//! They receive messages, apply policies, and produce responses.
5//!
6//! # Example
7//!
8//! ```ignore
9//! worker! {
10//!     name: MyWorker<RequestMessage, ResponseMessage>,
11//!     config: { threshold: u32 },
12//!     policies: { with_receptor_gate: [MyGate] },
13//!     handle: |message, trace, config| async move {
14//!         ResponseMessage { value: message.value + config.threshold }
15//!     }
16//! }
17//! ```
18
19#[cfg(not(feature = "std"))]
20extern crate alloc;
21#[cfg(not(feature = "std"))]
22use alloc::boxed::Box;
23#[cfg(not(feature = "std"))]
24use alloc::sync::Arc;
25
26#[cfg(feature = "std")]
27use std::sync::Arc;
28
29use core::future::Future;
30use core::pin::Pin;
31
32#[cfg(feature = "derive")]
33use crate::Errorizable;
34
35use crate::policy::{ReceptorPolicy, TransitStatus};
36use crate::trace::TraceCollector;
37use crate::Message;
38
39pub mod macros;
40
41/// Re-export unified runtime primitives with worker-specific type aliases
42pub mod worker_runtime {
43	pub mod rt {
44		pub use crate::runtime::rt::*;
45
46		/// Queue sender type alias (for backwards compatibility)
47		pub type QueueSender<T> = crate::runtime::rt::Sender<T>;
48
49		/// Queue receiver type alias (for backwards compatibility)
50		pub type QueueReceiver<T> = crate::runtime::rt::Receiver<T>;
51	}
52}
53
54pub struct WorkerRequest<I: Send, O> {
55	pub message: Arc<I>,
56	pub respond_to: worker_runtime::rt::OneshotSender<Result<O, TransitStatus>>,
57	pub trace: Arc<TraceCollector>,
58}
59
60#[cfg_attr(feature = "derive", derive(Errorizable))]
61#[derive(Debug)]
62pub enum WorkerRelayError {
63	#[cfg_attr(feature = "derive", error("Worker queue closed"))]
64	QueueClosed,
65	#[cfg_attr(feature = "derive", error("Worker response channel dropped"))]
66	ResponseDropped,
67	#[cfg_attr(feature = "derive", error("Message rejected with status {:?}"))]
68	#[cfg_attr(feature = "derive", from)]
69	Rejected(TransitStatus),
70}
71
72#[cfg(not(feature = "derive"))]
73impl core::fmt::Display for WorkerRelayError {
74	fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
75		match self {
76			Self::QueueClosed => f.write_str("worker queue closed"),
77			Self::ResponseDropped => f.write_str("worker response channel dropped"),
78			Self::Rejected(status) => write!(f, "message rejected with status {:?}", status),
79		}
80	}
81}
82
83#[cfg(not(feature = "derive"))]
84impl std::error::Error for WorkerRelayError {}
85
86pub type WorkerRelayFuture<O> = Pin<Box<dyn Future<Output = Result<O, WorkerRelayError>> + Send + 'static>>;
87pub type WorkerStartFuture<W> = Pin<Box<dyn Future<Output = Result<W, crate::error::TightBeamError>> + Send>>;
88
89#[cfg(feature = "tokio")]
90#[allow(dead_code)]
91pub fn block_on_worker_future<F, T>(future: F) -> Result<T, std::io::Error>
92where
93	F: Future<Output = T> + Send + 'static,
94	T: Send + 'static,
95{
96	// Try to use current runtime if available, otherwise create a new one
97	match tokio::runtime::Handle::try_current() {
98		Ok(handle) => Ok(handle.block_on(future)),
99		Err(_) => {
100			let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build()?;
101			Ok(runtime.block_on(future))
102		}
103	}
104}
105
106#[cfg(all(not(feature = "tokio"), feature = "std"))]
107#[allow(dead_code)]
108pub fn block_on_worker_future<F, T>(future: F) -> T
109where
110	F: Future<Output = T> + Send + 'static,
111	T: Send + 'static,
112{
113	worker_runtime::rt::block_on(future)
114}
115
116pub trait Worker: Send + Sync + Sized {
117	type Input: Send + Sync + 'static;
118	type Output: Send + 'static;
119	type Config: Send + Sync + 'static;
120
121	fn new(config: Self::Config) -> Self;
122
123	fn start(self, trace: Arc<TraceCollector>) -> WorkerStartFuture<Self>;
124
125	fn kill(self) -> ::core::result::Result<(), std::io::Error>;
126
127	fn relay(&self, message: Arc<Self::Input>) -> WorkerRelayFuture<Self::Output>;
128
129	fn queue_capacity(&self) -> usize;
130}
131
132/// Provides static metadata about a worker type
133pub trait WorkerMetadata {
134	/// Returns the registration name for this worker
135	fn name() -> &'static str;
136}
137
138pub struct WorkerPolicies<I: Send> {
139	#[allow(dead_code)]
140	pub(crate) receptor_gates: Vec<Arc<dyn ReceptorPolicy<I> + Send + Sync>>,
141}
142
143impl<I: Send> WorkerPolicies<I> {
144	pub fn receptor_gates(&self) -> &[Arc<dyn ReceptorPolicy<I> + Send + Sync>] {
145		&self.receptor_gates
146	}
147}
148
149impl<I: Send> Default for WorkerPolicies<I> {
150	fn default() -> Self {
151		Self { receptor_gates: Vec::new() }
152	}
153}
154
155pub struct WorkerPolicyBuilder<I: Send> {
156	receptor_gates: Vec<Arc<dyn ReceptorPolicy<I> + Send + Sync>>,
157}
158
159impl<I: Send> Default for WorkerPolicyBuilder<I> {
160	fn default() -> Self {
161		Self { receptor_gates: Vec::new() }
162	}
163}
164
165impl<I: Message + Send> WorkerPolicyBuilder<I> {
166	pub fn build(self) -> WorkerPolicies<I> {
167		WorkerPolicies { receptor_gates: self.receptor_gates }
168	}
169
170	pub fn with_receptor_gate<R, const N: usize>(mut self, gates: [R; N]) -> Self
171	where
172		R: ReceptorPolicy<I> + Send + Sync + 'static,
173	{
174		self.receptor_gates.extend(
175			gates
176				.into_iter()
177				.map(|gate| Arc::new(gate) as Arc<dyn ReceptorPolicy<I> + Send + Sync>),
178		);
179		self
180	}
181}
182
183#[cfg(test)]
184mod tests {
185	use std::sync::Arc;
186
187	use super::WorkerRelayError;
188	use crate::der::Sequence;
189	use crate::policy::{ReceptorPolicy, TransitStatus};
190	use crate::worker;
191	use crate::Beamable;
192
193	#[derive(Beamable, Clone, Debug, PartialEq, Sequence)]
194	pub struct RequestMessage {
195		content: String,
196		lucky_number: u32,
197	}
198
199	#[derive(Sequence, Beamable, Clone, Debug, PartialEq)]
200	pub struct PongMessage {
201		result: String,
202	}
203
204	#[derive(Default)]
205	struct PingGate;
206
207	impl ReceptorPolicy<RequestMessage> for PingGate {
208		fn evaluate(&self, maybe_ping: &RequestMessage) -> TransitStatus {
209			if maybe_ping.content == "PING" {
210				TransitStatus::Accepted
211			} else {
212				TransitStatus::Forbidden
213			}
214		}
215	}
216
217	worker! {
218		name: LuckyNumberDeterminer<RequestMessage, bool>,
219		config: {
220			lotto_number: u32,
221		},
222		handle: |message, _trace, config| async move {
223			message.lucky_number == config.lotto_number
224		}
225	}
226
227	worker! {
228		name: PingPongWorker<RequestMessage, PongMessage>,
229		policies: {
230			with_receptor_gate: [PingGate]
231		},
232		handle: |_message, _trace| async move {
233			PongMessage {
234				result: "PONG".to_string(),
235			}
236		}
237	}
238
239	#[cfg(feature = "std")]
240	crate::test_worker! {
241		name: lucky_number_worker_checks_winner,
242		setup: || {
243			LuckyNumberDeterminer::new(LuckyNumberDeterminerConf { lotto_number: 42 })
244		},
245		assertions: |worker| async move {
246			assert_eq!(worker.queue_capacity(), 64);
247
248			let winner = worker.relay(Arc::new(RequestMessage {
249				content: "PING".to_string(),
250				lucky_number: 42,
251			})).await?;
252			assert!(winner);
253
254			let loser = worker.relay(Arc::new(RequestMessage {
255				content: "PING".to_string(),
256				lucky_number: 7,
257			})).await?;
258			assert!(!loser);
259
260			Ok(())
261		}
262	}
263
264	#[cfg(feature = "std")]
265	crate::test_worker! {
266		name: test_ping_pong_worker,
267		setup: || {
268			PingPongWorker::new(())
269		},
270		assertions: |worker| async move {
271			// Test accepted message
272			let ping_msg = RequestMessage {
273				content: "PING".to_string(),
274				lucky_number: 42,
275			};
276			let response = worker.relay(Arc::new(ping_msg)).await?;
277			assert_eq!(response, PongMessage { result: "PONG".to_string() });
278
279			// Test rejected message
280			let pong_msg = RequestMessage {
281				content: "PONG".to_string(),
282				lucky_number: 42,
283			};
284
285			let result = worker.relay(Arc::new(pong_msg)).await;
286			assert!(matches!(result, Err(WorkerRelayError::Rejected(_))));
287
288			Ok(())
289		}
290	}
291}