Skip to main content

ash_flare/
macros.rs

//! Convenience macros for building supervision trees
2
/// Generate a `Worker` implementation from a type, an error type and a `run` body.
///
/// The supplied block is used verbatim as the body of
/// `async fn run(&mut self) -> Result<(), Self::Error>`, so it must evaluate to
/// that `Result`. Macro hygiene prevents the block from naming `self`; use
/// `impl_worker_stateful!` when the body needs access to the worker's fields.
///
/// The expansion applies `#[async_trait::async_trait]`, so the `async_trait`
/// crate has to be resolvable at the call site.
///
/// # Examples
///
/// ```
/// use ash_flare::impl_worker;
/// use std::time::Duration;
///
/// struct MyWorker;
///
/// impl_worker! {
///     MyWorker, std::io::Error => {
///         // do work
///         tokio::time::sleep(Duration::from_millis(1)).await;
///         Ok(())
///     }
/// }
/// ```
#[macro_export]
macro_rules! impl_worker {
    ($ty:ty, $err:ty => $body:block) => {
        #[async_trait::async_trait]
        impl $crate::Worker for $ty {
            type Error = $err;

            // The caller's block is spliced in as the whole function body.
            async fn run(&mut self) -> Result<(), Self::Error> $body
        }
    };
}
32
/// Generate a `Worker` implementation whose `run` body can use the worker's state.
///
/// The receiver is named by the caller (`|self|`) and spliced into the
/// signature as `&mut self`, which makes `self` visible inside the supplied
/// block. The block becomes the body of `run` and must evaluate to
/// `Result<(), Self::Error>`.
///
/// The expansion applies `#[async_trait::async_trait]`, so the `async_trait`
/// crate has to be resolvable at the call site.
///
/// # Examples
///
/// ```
/// use ash_flare::impl_worker_stateful;
/// use std::time::Duration;
///
/// struct MyWorker {
///     counter: usize,
/// }
///
/// impl_worker_stateful! {
///     MyWorker, std::io::Error => |self| {
///         self.counter += 1;
///         tokio::time::sleep(Duration::from_millis(1)).await;
///         Ok(())
///     }
/// }
/// ```
#[macro_export]
macro_rules! impl_worker_stateful {
    ($ty:ty, $err:ty => |$this:ident| $body:block) => {
        #[async_trait::async_trait]
        impl $crate::Worker for $ty {
            type Error = $err;

            // `$this` is the caller's own `self` token, so the body can refer to it.
            async fn run(&mut $this) -> Result<(), Self::Error> $body
        }
    };
}
64
/// Generate a `Worker` implementation that drains a mailbox, one message at a time.
///
/// The worker type must have a field named `mailbox`. The generated `run`
/// awaits `self.mailbox.recv()` in a loop, binds each `Some(..)` payload to the
/// caller-chosen name and executes the handler block; once `recv()` yields
/// `None` the loop ends and `run` returns `Ok(())`.
///
/// The handler block is the loop body, so it should not produce a `Result` -
/// it only processes the message.
///
/// The expansion applies `#[async_trait::async_trait]`, so the `async_trait`
/// crate has to be resolvable at the call site.
///
/// # Examples
///
/// ```
/// use ash_flare::impl_worker_mailbox;
///
/// struct MessageWorker {
///     worker_id: usize,
///     mailbox: ash_flare::Mailbox,
/// }
///
/// impl_worker_mailbox! {
///     MessageWorker, std::io::Error => |self, msg| {
///         println!("[Worker {}] Received: {}", self.worker_id, msg);
///     }
/// }
/// ```
#[macro_export]
macro_rules! impl_worker_mailbox {
    ($ty:ty, $err:ty => |$this:ident, $message:ident| $handler:block) => {
        #[async_trait::async_trait]
        impl $crate::Worker for $ty {
            type Error = $err;

            async fn run(&mut $this) -> Result<(), Self::Error> {
                // Keep handling messages until `recv()` reports `None`.
                while let Some($message) = $this.mailbox.recv().await $handler
                Ok(())
            }
        }
    };
}
101
/// Declaratively build a `StatefulSupervisorSpec`.
///
/// The macro expands to a builder chain on `StatefulSupervisorSpec::new(name)`:
///
/// * `strategy:` names a `RestartStrategy` variant.
/// * `intensity: (max_restarts, within_seconds),` is optional; when present it
///   is passed to `with_restart_intensity` as a `RestartIntensity`.
/// * each `workers:` entry `(id, factory, Policy)` becomes one `with_worker`
///   call, with `Policy` naming a `RestartPolicy` variant. In the example the
///   factories take a context argument used as an `Arc<WorkerContext>`.
/// * each `supervisors:` entry becomes one `with_supervisor` call.
///
/// Trailing commas are accepted inside both lists. The macro evaluates to the
/// finished spec.
///
/// # Examples
///
/// ```
/// use ash_flare::{stateful_supervision_tree, Worker, WorkerContext};
/// use async_trait::async_trait;
/// use std::sync::Arc;
///
/// struct MyWorker {
///     ctx: Arc<WorkerContext>,
/// }
///
/// #[async_trait]
/// impl Worker for MyWorker {
///     type Error = std::io::Error;
///     async fn run(&mut self) -> Result<(), Self::Error> {
///         self.ctx.set("key", serde_json::json!("value"));
///         Ok(())
///     }
/// }
///
/// let spec = stateful_supervision_tree! {
///     name: "app",
///     strategy: OneForOne,
///     intensity: (5, 10),
///     workers: [
///         ("worker-1", |ctx| MyWorker { ctx }, Permanent),
///         ("worker-2", |ctx| MyWorker { ctx }, Transient),
///     ],
///     supervisors: []
/// };
/// ```
#[macro_export]
macro_rules! stateful_supervision_tree {
    (
        name: $tree_name:expr,
        strategy: $restart_strategy:ident,
        $( intensity: ($max_restarts:expr, $within_seconds:expr), )?
        workers: [
            $( ($worker_id:expr, $make_worker:expr, $restart_policy:ident) ),* $(,)?
        ],
        supervisors: [ $( $child_spec:expr ),* $(,)? ]
    ) => {{
        let tree = $crate::StatefulSupervisorSpec::new($tree_name)
            .with_restart_strategy($crate::RestartStrategy::$restart_strategy)
            // Emitted only when the caller supplied an `intensity:` entry.
            $(
                .with_restart_intensity($crate::RestartIntensity {
                    max_restarts: $max_restarts,
                    within_seconds: $within_seconds,
                })
            )?
            $(
                .with_worker(
                    $worker_id,
                    $make_worker,
                    $crate::RestartPolicy::$restart_policy,
                )
            )*
            $( .with_supervisor($child_spec) )*;
        tree
    }};
}
179
/// Declaratively build a `SupervisorSpec`.
///
/// The macro expands to a builder chain on `SupervisorSpec::new(name)`:
///
/// * `strategy:` names a `RestartStrategy` variant.
/// * `intensity: (max_restarts, within_seconds),` is optional; when present it
///   is passed to `with_restart_intensity` as a `RestartIntensity`.
/// * each `workers:` entry `(id, factory, Policy)` becomes one `with_worker`
///   call, with `Policy` naming a `RestartPolicy` variant.
/// * each `supervisors:` entry becomes one `with_supervisor` call.
///
/// Trailing commas are accepted inside both lists. The macro evaluates to the
/// finished spec.
///
/// # Examples
///
/// ```
/// use ash_flare::{supervision_tree, Worker};
/// use async_trait::async_trait;
///
/// struct MyWorker;
///
/// #[async_trait]
/// impl Worker for MyWorker {
///     type Error = std::io::Error;
///     async fn run(&mut self) -> Result<(), Self::Error> { Ok(()) }
/// }
///
/// impl MyWorker {
///     fn new() -> Self { Self }
/// }
///
/// let spec = supervision_tree! {
///     name: "app",
///     strategy: OneForOne,
///     intensity: (5, 10), // max_restarts, within_seconds
///     workers: [
///         ("worker-1", || MyWorker::new(), Permanent),
///         ("worker-2", || MyWorker::new(), Transient),
///     ],
///     supervisors: []
/// };
/// ```
#[macro_export]
macro_rules! supervision_tree {
    (
        name: $tree_name:expr,
        strategy: $restart_strategy:ident,
        $( intensity: ($max_restarts:expr, $within_seconds:expr), )?
        workers: [
            $( ($worker_id:expr, $make_worker:expr, $restart_policy:ident) ),* $(,)?
        ],
        supervisors: [ $( $child_spec:expr ),* $(,)? ]
    ) => {{
        let tree = $crate::SupervisorSpec::new($tree_name)
            .with_restart_strategy($crate::RestartStrategy::$restart_strategy)
            // Emitted only when the caller supplied an `intensity:` entry.
            $(
                .with_restart_intensity($crate::RestartIntensity {
                    max_restarts: $max_restarts,
                    within_seconds: $within_seconds,
                })
            )?
            $(
                .with_worker(
                    $worker_id,
                    $make_worker,
                    $crate::RestartPolicy::$restart_policy,
                )
            )*
            $( .with_supervisor($child_spec) )*;
        tree
    }};
}
255
/// Expose a supervisor over TCP or a Unix socket in a background task.
///
/// The first token selects the transport (`tcp` or `unix`). The macro wraps
/// the given handle in a `distributed::SupervisorServer`, moves it into a task
/// started with `tokio::spawn`, and awaits `listen_tcp` / `listen_unix` there.
/// It evaluates to the value returned by `tokio::spawn`.
///
/// The address or path expression is evaluated inside the spawned `async move`
/// block, and `tokio` must be resolvable at the call site.
///
/// # Examples
///
/// ```ignore
/// // Ignored due to requiring actual network binding
/// use ash_flare::serve_supervisor;
///
/// // TCP server
/// serve_supervisor!(tcp, handle, "127.0.0.1:8080");
///
/// // Unix socket server (Unix only)
/// serve_supervisor!(unix, handle, "/tmp/supervisor.sock");
/// ```
#[macro_export]
macro_rules! serve_supervisor {
    (tcp, $supervisor:expr, $bind_addr:expr) => {{
        let listener = $crate::distributed::SupervisorServer::new($supervisor);
        tokio::spawn(async move { listener.listen_tcp($bind_addr).await })
    }};
    (unix, $supervisor:expr, $socket_path:expr) => {{
        let listener = $crate::distributed::SupervisorServer::new($supervisor);
        tokio::spawn(async move { listener.listen_unix($socket_path).await })
    }};
}
281
/// Open a connection to a remote supervisor over TCP or a Unix socket.
///
/// The first token selects the transport. The macro expands to a plain call to
/// `distributed::RemoteSupervisorHandle::connect_tcp` or `connect_unix` with
/// the given argument; as the examples show, the caller awaits the result.
///
/// # Examples
///
/// ```ignore
/// // Ignored due to requiring actual network connection
/// use ash_flare::connect_supervisor;
///
/// // Connect via TCP
/// let remote = connect_supervisor!(tcp, "127.0.0.1:8080").await?;
///
/// // Connect via Unix socket (Unix only)
/// let remote = connect_supervisor!(unix, "/tmp/supervisor.sock").await?;
/// ```
#[macro_export]
macro_rules! connect_supervisor {
    (tcp, $remote_addr:expr) => {
        $crate::distributed::RemoteSupervisorHandle::connect_tcp($remote_addr)
    };
    (unix, $socket_path:expr) => {
        $crate::distributed::RemoteSupervisorHandle::connect_unix($socket_path)
    };
}
305
/// Build a supervision tree, start it, and serve it over TCP or a Unix socket.
///
/// The block after `=>` uses the same syntax as `supervision_tree!` (including
/// the optional `intensity: (max_restarts, within_seconds),` entry) and is
/// forwarded to it. The resulting spec is started with
/// `SupervisorHandle::start`, a clone of the handle is wrapped in a
/// `distributed::SupervisorServer`, and the server's `listen_tcp` /
/// `listen_unix` future is run in a task started with `tokio::spawn`.
///
/// The macro evaluates to the tuple `(handle, server_task)`, where
/// `server_task` is the value returned by `tokio::spawn`.
///
/// The nested macro is invoked as `$crate::supervision_tree!`, so importing
/// `distributed_system` alone is sufficient. `tokio` must be resolvable at the
/// call site.
///
/// # Examples
///
/// ```ignore
/// // Ignored due to requiring actual network binding
/// use ash_flare::distributed_system;
///
/// distributed_system! {
///     server: tcp @ "127.0.0.1:8080" => {
///         name: "remote-app",
///         strategy: OneForOne,
///         workers: [
///             ("worker-1", || MyWorker::new(), Permanent),
///         ],
///         supervisors: []
///     }
/// }
/// ```
#[macro_export]
macro_rules! distributed_system {
    (
        server: tcp @ $addr:expr => {
            name: $name:expr,
            strategy: $strategy:ident,
            $( intensity: ($max:expr, $secs:expr), )?
            workers: [ $(($id:expr, $factory:expr, $policy:ident)),* $(,)? ],
            supervisors: [ $($sup:expr),* $(,)? ]
        }
    ) => {
        {
            // Path-qualified so the call resolves even when the caller has not
            // imported `supervision_tree` themselves.
            let spec = $crate::supervision_tree! {
                name: $name,
                strategy: $strategy,
                $( intensity: ($max, $secs), )?
                workers: [ $(($id, $factory, $policy)),* ],
                supervisors: [ $($sup),* ]
            };
            let handle = $crate::SupervisorHandle::start(spec);
            // The server gets its own clone; the original handle is returned.
            let server = $crate::distributed::SupervisorServer::new(handle.clone());
            let server_task = tokio::spawn(async move {
                server.listen_tcp($addr).await
            });
            (handle, server_task)
        }
    };
    (
        server: unix @ $path:expr => {
            name: $name:expr,
            strategy: $strategy:ident,
            $( intensity: ($max:expr, $secs:expr), )?
            workers: [ $(($id:expr, $factory:expr, $policy:ident)),* $(,)? ],
            supervisors: [ $($sup:expr),* $(,)? ]
        }
    ) => {
        {
            // Path-qualified so the call resolves even when the caller has not
            // imported `supervision_tree` themselves.
            let spec = $crate::supervision_tree! {
                name: $name,
                strategy: $strategy,
                $( intensity: ($max, $secs), )?
                workers: [ $(($id, $factory, $policy)),* ],
                supervisors: [ $($sup),* ]
            };
            let handle = $crate::SupervisorHandle::start(spec);
            // The server gets its own clone; the original handle is returned.
            let server = $crate::distributed::SupervisorServer::new(handle.clone());
            let server_task = tokio::spawn(async move {
                server.listen_unix($path).await
            });
            (handle, server_task)
        }
    };
}