Skip to main content

gust_runtime/
lib.rs

1#![warn(missing_docs)]
2//! # Gust Runtime
3//!
4//! The runtime support library for compiled Gust state machine programs.
5//!
6//! When the Gust compiler generates Rust code from `.gu` source files, the
7//! generated code depends on this crate for core traits and utilities.
8//! Generated modules typically begin with `use gust_runtime::prelude::*;`
9//! to bring all necessary items into scope.
10//!
11//! ## What this crate provides
12//!
13//! - **[`prelude::Machine`]** -- the base trait every generated state machine implements,
14//!   providing state inspection and JSON serialization.
15//! - **[`prelude::Supervisor`]** and **[`prelude::SupervisorRuntime`]** -- structured concurrency
16//!   primitives for running multiple child machines under a supervision tree,
17//!   with configurable restart strategies.
18//! - **[`prelude::Envelope`]** -- a typed message wrapper for cross-machine communication.
19//! - **[`prelude::SupervisorAction`]** and **[`prelude::RestartStrategy`]** -- enums that control
20//!   how failures propagate and which children are restarted.
21//!
22//! ## Re-exports
23//!
24//! The prelude also re-exports [`serde`], [`serde_json`], and [`thiserror`] so
25//! that generated code does not need to declare these as direct dependencies.
26//!
27//! ## Usage
28//!
29//! You normally do **not** depend on this crate directly. Instead, the Gust
30//! compiler adds it to your generated code automatically. If you need to
31//! interact with generated machines from hand-written Rust, add the dependency
32//! to your `Cargo.toml`:
33//!
34//! ```toml
35//! [dependencies]
36//! gust-runtime = "0.1"
37//! ```
38
39/// Re-exports of the runtime traits and types that generated Gust code
40/// relies on. Generated `.g.rs` files `use gust_runtime::prelude::*;` to
41/// bring `Machine`, `Supervisor`, `Envelope`, `RestartStrategy`, and
42/// third-party re-exports (`serde`, `serde_json`, `thiserror`) into scope.
43pub mod prelude {
44    use std::future::Future;
45    use std::sync::Arc;
46    use std::sync::atomic::{AtomicUsize, Ordering};
47    use tokio::sync::Mutex;
48    use tokio::task::JoinSet;
49
50    pub use serde::{Deserialize, Serialize};
51    pub use serde_json;
52    pub use thiserror;
53
54    /// The base trait for all Gust state machines.
55    ///
56    /// Every machine generated by the Gust compiler implements `Machine`,
57    /// which provides state inspection and JSON round-trip serialization.
58    /// The trait requires [`Serialize`] and [`Deserialize`] so that machine
59    /// instances can be persisted, transferred, or logged as JSON.
60    ///
61    /// # Associated types
62    ///
63    /// * `State` -- the enum of possible states for this machine. Must be
64    ///   `Debug + Clone + Serialize + Deserialize`.
65    ///
66    /// # Examples
67    ///
68    /// ```rust,ignore
69    /// use gust_runtime::prelude::*;
70    ///
71    /// // Given a generated TrafficLight machine:
72    /// let light = TrafficLight::new();
73    /// println!("Current state: {:?}", light.current_state());
74    ///
75    /// // Serialize to JSON for persistence
76    /// let json = light.to_json().unwrap();
77    ///
78    /// // Restore from JSON
79    /// let restored = TrafficLight::from_json(&json).unwrap();
80    /// ```
81    pub trait Machine: Serialize + for<'de> Deserialize<'de> {
82        /// The enum type representing all possible states of this machine.
83        type State: std::fmt::Debug + Clone + Serialize + for<'de> Deserialize<'de>;
84
85        /// Returns a reference to the machine's current state.
86        fn current_state(&self) -> &Self::State;
87
88        /// Serializes the entire machine to a pretty-printed JSON string.
89        ///
90        /// This captures both the current state and any associated data,
91        /// allowing the machine to be persisted or transmitted.
92        fn to_json(&self) -> Result<String, serde_json::Error> {
93            serde_json::to_string_pretty(self)
94        }
95
96        /// Deserializes a machine from a JSON string.
97        ///
98        /// The JSON must have been produced by [`to_json`](Machine::to_json)
99        /// (or an equivalent serializer) for the same machine type.
100        fn from_json(json: &str) -> Result<Self, serde_json::Error>
101        where
102            Self: Sized,
103        {
104            serde_json::from_str(json)
105        }
106    }
107
    /// A trait for supervised machine groups (structured concurrency).
    ///
    /// A supervisor is consulted whenever one of its child machines enters a
    /// failure state. It answers with a [`SupervisorAction`]: restart the
    /// child, escalate the error, or ignore it.
    ///
    /// NOTE(review): nothing in this crate calls `on_child_failure` (in
    /// particular [`SupervisorRuntime`] does not). Presumably Gust-generated
    /// supervisor code invokes it and acts on the returned action. Confirm
    /// against the code generator.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use gust_runtime::prelude::*;
    ///
    /// struct MyGroup;
    ///
    /// impl Supervisor for MyGroup {
    ///     type Error = String;
    ///
    ///     fn on_child_failure(&mut self, child_id: &str, error: &String) -> SupervisorAction {
    ///         eprintln!("Child {child_id} failed: {error}");
    ///         SupervisorAction::Restart
    ///     }
    /// }
    /// ```
    pub trait Supervisor {
        /// The error type reported by child machines. Only `Debug` is
        /// required of it.
        type Error: std::fmt::Debug;

        /// Called when a child machine enters a failure state.
        ///
        /// `child_id` identifies the failed child and `error` is the failure
        /// it reported. The method takes `&mut self`, so an implementation
        /// may update its own bookkeeping (for example, restart counters)
        /// while deciding. The returned [`SupervisorAction`] tells the caller
        /// how to respond.
        fn on_child_failure(&mut self, child_id: &str, error: &Self::Error) -> SupervisorAction;
    }
142
143    /// The action a [`Supervisor`] takes when a child machine fails.
144    #[derive(Debug, Clone)]
145    pub enum SupervisorAction {
146        /// Restart the failed child machine from its initial state.
147        ///
148        /// The child is re-initialized and begins executing again. Other
149        /// children are unaffected (unless [`RestartStrategy::OneForAll`]
150        /// or [`RestartStrategy::RestForOne`] is in effect).
151        Restart,
152
153        /// Stop the failed child and propagate the error up the supervision tree.
154        ///
155        /// Use this when a child failure is unrecoverable and the parent
156        /// supervisor (or the top-level caller) should handle it.
157        Escalate,
158
159        /// Ignore the failure and let the child remain in its failed state.
160        ///
161        /// The child is not restarted. This is appropriate for non-critical
162        /// background tasks whose failure does not affect the overall system.
163        Ignore,
164    }
165
166    /// A typed message envelope for cross-machine communication.
167    ///
168    /// `Envelope` wraps a payload of type `T` with routing metadata
169    /// (`source`, `target`) and an optional `correlation_id` for
170    /// request-response patterns. It is serializable so it can be
171    /// transmitted across process or network boundaries.
172    ///
173    /// # Examples
174    ///
175    /// ```rust
176    /// use gust_runtime::prelude::*;
177    ///
178    /// let envelope = Envelope::new("order-service", "payment-service", 42u64)
179    ///     .with_correlation("req-001");
180    ///
181    /// assert_eq!(envelope.source, "order-service");
182    /// assert_eq!(envelope.target, "payment-service");
183    /// assert_eq!(envelope.payload, 42u64);
184    /// assert_eq!(envelope.correlation_id.as_deref(), Some("req-001"));
185    /// ```
186    #[derive(Debug, Clone, Serialize, Deserialize)]
187    pub struct Envelope<T: Serialize> {
188        /// The identifier of the sending machine or service.
189        pub source: String,
190        /// The identifier of the intended recipient machine or service.
191        pub target: String,
192        /// The message payload.
193        pub payload: T,
194        /// An optional correlation identifier for matching requests to responses.
195        pub correlation_id: Option<String>,
196    }
197
198    impl<T: Serialize> Envelope<T> {
199        /// Creates a new envelope with the given source, target, and payload.
200        ///
201        /// The `correlation_id` is initially `None`. Use
202        /// [`with_correlation`](Envelope::with_correlation) to set it.
203        pub fn new(source: impl Into<String>, target: impl Into<String>, payload: T) -> Self {
204            Self {
205                source: source.into(),
206                target: target.into(),
207                payload,
208                correlation_id: None,
209            }
210        }
211
212        /// Sets the correlation identifier on this envelope (builder pattern).
213        ///
214        /// Correlation IDs are useful for matching asynchronous responses
215        /// back to the original request.
216        pub fn with_correlation(mut self, id: impl Into<String>) -> Self {
217            self.correlation_id = Some(id.into());
218            self
219        }
220    }
221
222    /// A handle to a child task spawned by a [`SupervisorRuntime`].
223    ///
224    /// Contains the logical identifier of the child, which is used for
225    /// logging and passed to [`Supervisor::on_child_failure`] when the
226    /// child reports an error.
227    #[derive(Debug, Clone)]
228    pub struct ChildHandle {
229        /// The logical identifier of this child task.
230        pub id: String,
231    }
232
233    /// Strategy that determines which children are restarted when one fails.
234    ///
235    /// These strategies are modeled after Erlang/OTP supervisor behaviors
236    /// and are used by [`SupervisorRuntime::restart_scope`] to compute
237    /// the range of children to restart.
238    ///
239    /// # Variants
240    ///
241    /// Given children `[A, B, C, D, E]` where `C` (index 2) fails:
242    ///
243    /// | Strategy | Restarted children |
244    /// |---|---|
245    /// | `OneForOne` | `[C]` |
246    /// | `OneForAll` | `[A, B, C, D, E]` |
247    /// | `RestForOne` | `[C, D, E]` |
248    #[derive(Debug, Clone, Copy, Default)]
249    pub enum RestartStrategy {
250        /// Restart only the failed child. Other children are unaffected.
251        ///
252        /// This is the default strategy and is appropriate when children
253        /// are independent of each other.
254        #[default]
255        OneForOne,
256
257        /// Restart **all** children when any single child fails.
258        ///
259        /// Use this when children have interdependencies and a consistent
260        /// group state is required.
261        OneForAll,
262
263        /// Restart the failed child and all children started **after** it.
264        ///
265        /// This is useful when later children depend on earlier ones:
266        /// restarting from the failure point forward re-establishes the
267        /// dependency chain without disturbing earlier, independent children.
268        RestForOne,
269    }
270
    /// An async runtime for supervising a group of child tasks.
    ///
    /// `SupervisorRuntime` owns a [`JoinSet`] of child futures behind a
    /// shared Tokio mutex. It also carries a [`RestartStrategy`], which
    /// [`restart_scope`](SupervisorRuntime::restart_scope) uses to compute
    /// which children to restart after a failure. It handles the concurrency
    /// details of spawning, joining, and tracking child tasks. It does not
    /// itself perform restarts.
    ///
    /// Cloning a `SupervisorRuntime` is cheap. The task set and the
    /// pending-spawn counter are reference-counted (`Arc`), so every clone
    /// sees and joins the same children.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use gust_runtime::prelude::*;
    ///
    /// let runtime = SupervisorRuntime::with_strategy(RestartStrategy::OneForAll);
    /// let handle = runtime.spawn_named("worker-1", async {
    ///     // ... child machine logic ...
    ///     Ok(())
    /// });
    ///
    /// // Wait for the next child to complete
    /// if let Some(result) = runtime.join_next().await {
    ///     match result {
    ///         Ok(()) => println!("child completed successfully"),
    ///         Err(e) => println!("child failed: {e}"),
    ///     }
    /// }
    /// ```
    #[derive(Debug, Clone)]
    pub struct SupervisorRuntime {
        /// The set of currently running child tasks. Each child resolves to
        /// `Ok(())` on success or `Err(message)` on failure.
        tasks: Arc<Mutex<JoinSet<Result<(), String>>>>,
        /// Number of spawns handed to a helper task because the task-set lock
        /// was busy and that have not yet been inserted into `tasks`.
        /// `join_next` consults it before reporting that no children remain.
        pending_spawns: Arc<AtomicUsize>,
        /// The restart strategy governing failure responses.
        strategy: RestartStrategy,
    }
306
307    impl Default for SupervisorRuntime {
308        fn default() -> Self {
309            Self::new()
310        }
311    }
312
313    impl SupervisorRuntime {
314        /// Creates a new `SupervisorRuntime` with the default
315        /// [`RestartStrategy::OneForOne`] strategy.
316        pub fn new() -> Self {
317            Self::with_strategy(RestartStrategy::OneForOne)
318        }
319
320        /// Creates a new `SupervisorRuntime` with the specified restart strategy.
321        pub fn with_strategy(strategy: RestartStrategy) -> Self {
322            Self {
323                tasks: Arc::new(Mutex::new(JoinSet::new())),
324                pending_spawns: Arc::new(AtomicUsize::new(0)),
325                strategy,
326            }
327        }
328
329        /// Spawns a named child task and returns a [`ChildHandle`].
330        ///
331        /// The future `fut` is spawned onto the Tokio runtime. If the
332        /// internal task set lock is immediately available the spawn happens
333        /// synchronously; otherwise a helper task is used to avoid blocking.
334        ///
335        /// The `id` is stored in the returned [`ChildHandle`] for
336        /// identification purposes.
337        pub fn spawn_named<F>(&self, id: impl Into<String>, fut: F) -> ChildHandle
338        where
339            F: Future<Output = Result<(), String>> + Send + 'static,
340        {
341            let id = id.into();
342            let task_id = id.clone();
343            match self.tasks.try_lock() {
344                Ok(mut tasks) => {
345                    tasks.spawn(fut);
346                }
347                _ => {
348                    self.pending_spawns.fetch_add(1, Ordering::SeqCst);
349                    let tasks = self.tasks.clone();
350                    let pending_spawns = self.pending_spawns.clone();
351                    tokio::spawn(async move {
352                        tasks.lock().await.spawn(fut);
353                        pending_spawns.fetch_sub(1, Ordering::SeqCst);
354                    });
355                }
356            }
357            ChildHandle { id: task_id }
358        }
359
360        /// Waits for the next child task to complete and returns its result.
361        ///
362        /// Returns `Some(Ok(()))` when a child succeeds, `Some(Err(..))` when
363        /// a child fails, or `None` when all children have completed and no
364        /// pending spawns remain.
365        ///
366        /// This method will yield cooperatively if there are pending spawns
367        /// that have not yet been added to the task set.
368        pub async fn join_next(&self) -> Option<Result<(), String>> {
369            loop {
370                let next = self.tasks.lock().await.join_next().await;
371                match next {
372                    Some(Ok(inner)) => return Some(inner),
373                    Some(Err(join_err)) => {
374                        return Some(Err(format!("task join error: {join_err}")));
375                    }
376                    None if self.pending_spawns.load(Ordering::SeqCst) == 0 => return None,
377                    None => tokio::task::yield_now().await,
378                }
379            }
380        }
381
382        /// Returns the [`RestartStrategy`] configured for this runtime.
383        pub fn strategy(&self) -> RestartStrategy {
384            self.strategy
385        }
386
387        /// Computes the range of child indices that should be restarted
388        /// when the child at `failed_child_index` fails.
389        ///
390        /// The returned range depends on the configured [`RestartStrategy`]:
391        ///
392        /// - **OneForOne**: only the failed child (`index..index+1`)
393        /// - **OneForAll**: all children (`0..child_count`)
394        /// - **RestForOne**: the failed child and all that follow (`index..child_count`)
395        pub fn restart_scope(
396            &self,
397            failed_child_index: usize,
398            child_count: usize,
399        ) -> std::ops::Range<usize> {
400            match self.strategy {
401                RestartStrategy::OneForOne => {
402                    failed_child_index..failed_child_index.saturating_add(1)
403                }
404                RestartStrategy::OneForAll => 0..child_count,
405                RestartStrategy::RestForOne => failed_child_index..child_count,
406            }
407        }
408    }
409}
410
#[cfg(test)]
mod tests {
    use super::prelude::{RestartStrategy, SupervisorRuntime};
    use std::time::Duration;

    /// Upper bound on how long any single `join_next` call may take in these tests.
    const JOIN_TIMEOUT: Duration = Duration::from_secs(1);

    #[test]
    fn restart_scope_matches_strategy() {
        let one_for_one = SupervisorRuntime::with_strategy(RestartStrategy::OneForOne);
        assert_eq!(one_for_one.restart_scope(2, 5), 2..3);

        let one_for_all = SupervisorRuntime::with_strategy(RestartStrategy::OneForAll);
        assert_eq!(one_for_all.restart_scope(2, 5), 0..5);

        let rest_for_one = SupervisorRuntime::with_strategy(RestartStrategy::RestForOne);
        assert_eq!(rest_for_one.restart_scope(2, 5), 2..5);
    }

    #[test]
    fn default_runtime_uses_one_for_one() {
        let runtime = SupervisorRuntime::default();
        assert!(matches!(runtime.strategy(), RestartStrategy::OneForOne));
    }

    #[tokio::test]
    async fn spawn_named_returns_handle_with_given_id() {
        let runtime = SupervisorRuntime::new();
        let handle = runtime.spawn_named("worker-7", async { Ok::<(), String>(()) });
        assert_eq!(handle.id, "worker-7");
    }

    #[tokio::test]
    async fn join_next_observes_immediately_spawned_task() {
        let runtime = SupervisorRuntime::new();
        runtime.spawn_named("worker-1", async { Ok::<(), String>(()) });

        let joined = tokio::time::timeout(JOIN_TIMEOUT, runtime.join_next())
            .await
            .expect("join_next should not hang");

        assert!(matches!(joined, Some(Ok(()))));
    }

    #[tokio::test]
    async fn join_next_surfaces_child_error() {
        let runtime = SupervisorRuntime::new();
        runtime.spawn_named("worker-err", async { Err::<(), String>("boom".to_string()) });

        let joined = tokio::time::timeout(JOIN_TIMEOUT, runtime.join_next())
            .await
            .expect("join_next should not hang");

        assert_eq!(joined, Some(Err("boom".to_string())));
    }

    #[tokio::test]
    async fn join_next_returns_none_once_drained() {
        let runtime = SupervisorRuntime::new();
        runtime.spawn_named("worker-a", async { Ok::<(), String>(()) });
        runtime.spawn_named("worker-b", async { Ok::<(), String>(()) });

        for _ in 0..2 {
            let joined = tokio::time::timeout(JOIN_TIMEOUT, runtime.join_next())
                .await
                .expect("join_next should not hang");
            assert!(matches!(joined, Some(Ok(()))));
        }

        let drained = tokio::time::timeout(JOIN_TIMEOUT, runtime.join_next())
            .await
            .expect("join_next should not hang once drained");
        assert!(drained.is_none());
    }
}