ash_flare/macros.rs
1//! Convenience macros for building supervision trees
2
/// Implement the `Worker` trait for a type by supplying only the body of `run`.
///
/// The invocation has the shape `Type, ErrorType => { body }` and expands to an
/// `#[async_trait::async_trait] impl $crate::Worker for Type` in which
/// `type Error = ErrorType` and the block becomes the body of
/// `async fn run(&mut self) -> Result<(), Self::Error>`. The block must
/// therefore evaluate to that `Result`.
///
/// The attribute is referenced through the `async_trait::` path, so the
/// invoking crate needs `async_trait` among its own dependencies.
///
/// The block is written at the call site, so macro hygiene keeps it from
/// referring to the generated method's `self`. Use `impl_worker_stateful!`
/// when the worker needs access to its own fields.
///
/// # Examples
///
/// ```
/// use ash_flare::impl_worker;
/// use std::time::Duration;
///
/// struct MyWorker;
///
/// impl_worker! {
///     MyWorker, std::io::Error => {
///         // do work
///         tokio::time::sleep(Duration::from_millis(1)).await;
///         Ok(())
///     }
/// }
/// ```
#[macro_export]
macro_rules! impl_worker {
    ($worker:ty, $error:ty => $run_body:block) => {
        #[async_trait::async_trait]
        impl $crate::Worker for $worker {
            type Error = $error;

            // `Result` is spelled with an absolute path because item names in a
            // `macro_rules!` expansion resolve at the call site; a caller with
            // e.g. `use std::io::Result;` in scope would otherwise break this.
            async fn run(&mut self) -> ::std::result::Result<(), Self::Error> $run_body
        }
    };
}
32
/// Implement the `Worker` trait for a stateful type whose `run` body needs `self`.
///
/// The invocation has the shape `Type, ErrorType => |self| { body }`. The
/// identifier between the bars is used as the receiver of the generated
/// `async fn run(&mut self) -> Result<(), Self::Error>`, which is what lets the
/// block read and mutate the worker's fields. The block must evaluate to that
/// `Result`.
///
/// The expansion is an `#[async_trait::async_trait] impl $crate::Worker for Type`
/// with `type Error = ErrorType`. The attribute is referenced through the
/// `async_trait::` path, so the invoking crate needs `async_trait` among its
/// own dependencies.
///
/// # Examples
///
/// ```
/// use ash_flare::impl_worker_stateful;
/// use std::time::Duration;
///
/// struct MyWorker {
///     counter: usize,
/// }
///
/// impl_worker_stateful! {
///     MyWorker, std::io::Error => |self| {
///         self.counter += 1;
///         tokio::time::sleep(Duration::from_millis(1)).await;
///         Ok(())
///     }
/// }
/// ```
#[macro_export]
macro_rules! impl_worker_stateful {
    ($worker:ty, $error:ty => |$self:ident| $run_body:block) => {
        #[async_trait::async_trait]
        impl $crate::Worker for $worker {
            type Error = $error;

            // Absolute path: item names in the expansion resolve at the call
            // site, where `Result` may be shadowed by a one-parameter alias.
            async fn run(&mut $self) -> ::std::result::Result<(), Self::Error> $run_body
        }
    };
}
64
/// Implement the `Worker` trait with a mailbox for message-passing workers.
///
/// This macro creates a worker that receives string messages from a mailbox.
/// The invocation has the shape `Type, ErrorType => |self, msg| { body }`.
/// The generated `run` repeatedly awaits `self.mailbox.recv()` and executes the
/// block once for every `Some(msg)` it yields; as soon as `recv` yields `None`
/// (the mailbox is closed) the loop ends and `run` returns `Ok(())`.
///
/// Requirements on the invoking side:
///
/// * The worker type must have a field named `mailbox` whose `recv()` can be
///   awaited and yields an `Option`.
/// * The block is used as the loop body, so it should not evaluate to a
///   `Result` - just process the message.
/// * The expansion references `async_trait::async_trait` by path, so the
///   invoking crate needs `async_trait` among its own dependencies.
///
/// # Examples
///
/// ```
/// use ash_flare::impl_worker_mailbox;
///
/// struct MessageWorker {
///     worker_id: usize,
///     mailbox: ash_flare::Mailbox,
/// }
///
/// impl_worker_mailbox! {
///     MessageWorker, std::io::Error => |self, msg| {
///         println!("[Worker {}] Received: {}", self.worker_id, msg);
///     }
/// }
/// ```
#[macro_export]
macro_rules! impl_worker_mailbox {
    ($worker:ty, $error:ty => |$self:ident, $msg:ident| $process_body:block) => {
        #[async_trait::async_trait]
        impl $crate::Worker for $worker {
            type Error = $error;

            // `Result`, `Some` and `Ok` use absolute paths because item names
            // in the expansion resolve at the call site, where they could be
            // shadowed by the caller's own imports.
            async fn run(&mut $self) -> ::std::result::Result<(), Self::Error> {
                while let ::std::option::Option::Some($msg) = $self.mailbox.recv().await $process_body
                ::std::result::Result::Ok(())
            }
        }
    };
}
101
/// Declaratively assemble a `StatefulSupervisorSpec`, the supervision tree
/// variant with a shared in-memory key-value store.
///
/// Fields must appear in this order:
///
/// * `name:` - expression handed to `StatefulSupervisorSpec::new`.
/// * `strategy:` - a `RestartStrategy` variant name (e.g. `OneForOne`).
/// * `intensity: (max_restarts, within_seconds)` - optional; when present it
///   is turned into a `RestartIntensity` and applied with
///   `with_restart_intensity`. When omitted that builder call is skipped.
/// * `workers: [ (id, factory, Policy), ... ]` - one `with_worker` call per
///   tuple, where `Policy` is a `RestartPolicy` variant name.
/// * `supervisors: [ spec, ... ]` - one `with_supervisor` call per entry.
///
/// Workers are registered before child supervisors, each group in the order
/// written. Both lists accept a trailing comma. The macro evaluates to the
/// finished spec.
///
/// # Examples
///
/// ```
/// use ash_flare::{stateful_supervision_tree, Worker, WorkerContext};
/// use async_trait::async_trait;
/// use std::sync::Arc;
///
/// struct MyWorker {
///     ctx: Arc<WorkerContext>,
/// }
///
/// #[async_trait]
/// impl Worker for MyWorker {
///     type Error = std::io::Error;
///     async fn run(&mut self) -> Result<(), Self::Error> {
///         self.ctx.set("key", serde_json::json!("value"));
///         Ok(())
///     }
/// }
///
/// let spec = stateful_supervision_tree! {
///     name: "app",
///     strategy: OneForOne,
///     intensity: (5, 10),
///     workers: [
///         ("worker-1", |ctx| MyWorker { ctx }, Permanent),
///         ("worker-2", |ctx| MyWorker { ctx }, Transient),
///     ],
///     supervisors: []
/// };
/// ```
#[macro_export]
macro_rules! stateful_supervision_tree {
    // Form with an explicit restart intensity.
    (
        name: $tree_name:expr,
        strategy: $restart_strategy:ident,
        intensity: ($max_restarts:expr, $within_seconds:expr),
        workers: [ $(($worker_id:expr, $make_worker:expr, $restart_policy:ident)),* $(,)? ],
        supervisors: [ $($child:expr),* $(,)? ]
    ) => {{
        let tree = $crate::StatefulSupervisorSpec::new($tree_name)
            .with_restart_strategy($crate::RestartStrategy::$restart_strategy)
            .with_restart_intensity($crate::RestartIntensity {
                max_restarts: $max_restarts,
                within_seconds: $within_seconds,
            })
            $( .with_worker($worker_id, $make_worker, $crate::RestartPolicy::$restart_policy) )*
            $( .with_supervisor($child) )*;
        tree
    }};
    // Form without `intensity:`; `with_restart_intensity` is never called.
    (
        name: $tree_name:expr,
        strategy: $restart_strategy:ident,
        workers: [ $(($worker_id:expr, $make_worker:expr, $restart_policy:ident)),* $(,)? ],
        supervisors: [ $($child:expr),* $(,)? ]
    ) => {{
        let tree = $crate::StatefulSupervisorSpec::new($tree_name)
            .with_restart_strategy($crate::RestartStrategy::$restart_strategy)
            $( .with_worker($worker_id, $make_worker, $crate::RestartPolicy::$restart_policy) )*
            $( .with_supervisor($child) )*;
        tree
    }};
}
179
/// Declaratively assemble a `SupervisorSpec`.
///
/// Fields must appear in this order:
///
/// * `name:` - expression handed to `SupervisorSpec::new`.
/// * `strategy:` - a `RestartStrategy` variant name (e.g. `OneForOne`).
/// * `intensity: (max_restarts, within_seconds)` - optional; when present it
///   is turned into a `RestartIntensity` and applied with
///   `with_restart_intensity`. When omitted that builder call is skipped.
/// * `workers: [ (id, factory, Policy), ... ]` - one `with_worker` call per
///   tuple, where `Policy` is a `RestartPolicy` variant name.
/// * `supervisors: [ spec, ... ]` - one `with_supervisor` call per entry.
///
/// Workers are registered before child supervisors, each group in the order
/// written. Both lists accept a trailing comma. The macro evaluates to the
/// finished spec.
///
/// # Examples
///
/// ```
/// use ash_flare::{supervision_tree, Worker};
/// use async_trait::async_trait;
///
/// struct MyWorker;
///
/// #[async_trait]
/// impl Worker for MyWorker {
///     type Error = std::io::Error;
///     async fn run(&mut self) -> Result<(), Self::Error> { Ok(()) }
/// }
///
/// impl MyWorker {
///     fn new() -> Self { Self }
/// }
///
/// let spec = supervision_tree! {
///     name: "app",
///     strategy: OneForOne,
///     intensity: (5, 10), // max_restarts, within_seconds
///     workers: [
///         ("worker-1", || MyWorker::new(), Permanent),
///         ("worker-2", || MyWorker::new(), Transient),
///     ],
///     supervisors: []
/// };
/// ```
#[macro_export]
macro_rules! supervision_tree {
    // Form with an explicit restart intensity.
    (
        name: $tree_name:expr,
        strategy: $restart_strategy:ident,
        intensity: ($max_restarts:expr, $within_seconds:expr),
        workers: [ $(($worker_id:expr, $make_worker:expr, $restart_policy:ident)),* $(,)? ],
        supervisors: [ $($child:expr),* $(,)? ]
    ) => {{
        let tree = $crate::SupervisorSpec::new($tree_name)
            .with_restart_strategy($crate::RestartStrategy::$restart_strategy)
            .with_restart_intensity($crate::RestartIntensity {
                max_restarts: $max_restarts,
                within_seconds: $within_seconds,
            })
            $( .with_worker($worker_id, $make_worker, $crate::RestartPolicy::$restart_policy) )*
            $( .with_supervisor($child) )*;
        tree
    }};
    // Form without `intensity:`; `with_restart_intensity` is never called.
    (
        name: $tree_name:expr,
        strategy: $restart_strategy:ident,
        workers: [ $(($worker_id:expr, $make_worker:expr, $restart_policy:ident)),* $(,)? ],
        supervisors: [ $($child:expr),* $(,)? ]
    ) => {{
        let tree = $crate::SupervisorSpec::new($tree_name)
            .with_restart_strategy($crate::RestartStrategy::$restart_strategy)
            $( .with_worker($worker_id, $make_worker, $crate::RestartPolicy::$restart_policy) )*
            $( .with_supervisor($child) )*;
        tree
    }};
}
255
/// Expose a supervisor handle over TCP or a Unix socket from a background task.
///
/// The first token picks the transport (`tcp` or `unix`), followed by the
/// supervisor handle and the address or socket path. The handle is wrapped in
/// a `distributed::SupervisorServer`, which is moved into a task created with
/// `tokio::spawn`; that task awaits `listen_tcp` / `listen_unix`. The macro
/// evaluates to whatever `tokio::spawn` returns.
///
/// The address/path expression is evaluated inside the spawned `async move`
/// block, so anything it names is moved into the task. `tokio` is referenced
/// by path and must be a dependency of the invoking crate.
///
/// # Examples
///
/// ```ignore
/// // Ignored due to requiring actual network binding
/// use ash_flare::serve_supervisor;
///
/// // TCP server
/// serve_supervisor!(tcp, handle, "127.0.0.1:8080");
///
/// // Unix socket server (Unix only)
/// serve_supervisor!(unix, handle, "/tmp/supervisor.sock");
/// ```
#[macro_export]
macro_rules! serve_supervisor {
    (tcp, $sup_handle:expr, $bind_addr:expr) => {{
        let listener = $crate::distributed::SupervisorServer::new($sup_handle);
        tokio::spawn(async move {
            listener.listen_tcp($bind_addr).await
        })
    }};
    (unix, $sup_handle:expr, $socket_path:expr) => {{
        let listener = $crate::distributed::SupervisorServer::new($sup_handle);
        tokio::spawn(async move {
            listener.listen_unix($socket_path).await
        })
    }};
}
281
/// Open a client connection to a remote supervisor over TCP or a Unix socket.
///
/// The first token picks the transport (`tcp` or `unix`), followed by the
/// address or socket path. The macro is a thin wrapper: it expands to a call
/// to `distributed::RemoteSupervisorHandle::connect_tcp` or `connect_unix`
/// with the given expression and does not `.await` the result itself - the
/// caller does, as shown below.
///
/// # Examples
///
/// ```ignore
/// // Ignored due to requiring actual network connection
/// use ash_flare::connect_supervisor;
///
/// // Connect via TCP
/// let remote = connect_supervisor!(tcp, "127.0.0.1:8080").await?;
///
/// // Connect via Unix socket (Unix only)
/// let remote = connect_supervisor!(unix, "/tmp/supervisor.sock").await?;
/// ```
#[macro_export]
macro_rules! connect_supervisor {
    (tcp, $remote_addr:expr) => {
        $crate::distributed::RemoteSupervisorHandle::connect_tcp($remote_addr)
    };
    (unix, $socket_path:expr) => {
        $crate::distributed::RemoteSupervisorHandle::connect_unix($socket_path)
    };
}
305
/// Build a supervision tree, start it, and serve it over TCP or a Unix socket.
///
/// The invocation has the shape
/// `server: <tcp|unix> @ <address or path> => { name, strategy, workers, supervisors }`.
/// The braced section uses the same field syntax as `supervision_tree!`,
/// except that `intensity:` is not accepted here.
///
/// The expansion performs these steps:
///
/// 1. Builds the spec with `supervision_tree!`.
/// 2. Starts it with `SupervisorHandle::start`.
/// 3. Wraps a clone of the handle in a `distributed::SupervisorServer`.
/// 4. Moves the server into a task created with `tokio::spawn`, which awaits
///    `listen_tcp` / `listen_unix`.
///
/// The macro evaluates to the tuple `(handle, server_task)`: the local
/// supervisor handle and the value returned by `tokio::spawn`.
///
/// The address/path expression is evaluated inside the spawned `async move`
/// block. `tokio` is referenced by path and must be a dependency of the
/// invoking crate; `tokio::spawn` also needs an active Tokio runtime.
///
/// # Examples
///
/// ```ignore
/// // Ignored due to requiring actual network binding
/// use ash_flare::distributed_system;
///
/// distributed_system! {
///     server: tcp @ "127.0.0.1:8080" => {
///         name: "remote-app",
///         strategy: OneForOne,
///         workers: [
///             ("worker-1", || MyWorker::new(), Permanent),
///         ],
///         supervisors: []
///     }
/// }
/// ```
#[macro_export]
macro_rules! distributed_system {
    (
        server: tcp @ $addr:expr => {
            name: $name:expr,
            strategy: $strategy:ident,
            workers: [ $(($id:expr, $factory:expr, $policy:ident)),* $(,)? ],
            supervisors: [ $($sup:expr),* $(,)? ]
        }
    ) => {
        {
            // Invoke the nested macro through `$crate::` so that callers who
            // import only `distributed_system` do not also need
            // `supervision_tree` in scope.
            let spec = $crate::supervision_tree! {
                name: $name,
                strategy: $strategy,
                workers: [ $(($id, $factory, $policy)),* ],
                supervisors: [ $($sup),* ]
            };
            let handle = $crate::SupervisorHandle::start(spec);
            // The server gets its own clone; the original handle is returned.
            let server = $crate::distributed::SupervisorServer::new(handle.clone());
            let server_task = tokio::spawn(async move {
                server.listen_tcp($addr).await
            });
            (handle, server_task)
        }
    };
    (
        server: unix @ $path:expr => {
            name: $name:expr,
            strategy: $strategy:ident,
            workers: [ $(($id:expr, $factory:expr, $policy:ident)),* $(,)? ],
            supervisors: [ $($sup:expr),* $(,)? ]
        }
    ) => {
        {
            // Invoke the nested macro through `$crate::` so that callers who
            // import only `distributed_system` do not also need
            // `supervision_tree` in scope.
            let spec = $crate::supervision_tree! {
                name: $name,
                strategy: $strategy,
                workers: [ $(($id, $factory, $policy)),* ],
                supervisors: [ $($sup),* ]
            };
            let handle = $crate::SupervisorHandle::start(spec);
            // The server gets its own clone; the original handle is returned.
            let server = $crate::distributed::SupervisorServer::new(handle.clone());
            let server_task = tokio::spawn(async move {
                server.listen_unix($path).await
            });
            (handle, server_task)
        }
    };
}
373}