gust_runtime/lib.rs
1#![warn(missing_docs)]
2//! # Gust Runtime
3//!
4//! The runtime support library for compiled Gust state machine programs.
5//!
6//! When the Gust compiler generates Rust code from `.gu` source files, the
7//! generated code depends on this crate for core traits and utilities.
8//! Generated modules typically begin with `use gust_runtime::prelude::*;`
9//! to bring all necessary items into scope.
10//!
11//! ## What this crate provides
12//!
13//! - **[`prelude::Machine`]** -- the base trait every generated state machine implements,
14//! providing state inspection and JSON serialization.
15//! - **[`prelude::Supervisor`]** and **[`prelude::SupervisorRuntime`]** -- structured concurrency
16//! primitives for running multiple child machines under a supervision tree,
17//! with configurable restart strategies.
18//! - **[`prelude::Envelope`]** -- a typed message wrapper for cross-machine communication.
19//! - **[`prelude::SupervisorAction`]** and **[`prelude::RestartStrategy`]** -- enums that control
20//! how failures propagate and which children are restarted.
21//!
22//! ## Re-exports
23//!
//! The prelude also re-exports `serde`'s [`Serialize`](serde::Serialize) and
//! [`Deserialize`](serde::Deserialize) traits, plus the [`serde_json`] and
//! [`thiserror`] crates, so that generated code does not need to declare
//! these as direct dependencies.
26//!
27//! ## Usage
28//!
29//! You normally do **not** depend on this crate directly. Instead, the Gust
30//! compiler adds it to your generated code automatically. If you need to
31//! interact with generated machines from hand-written Rust, add the dependency
32//! to your `Cargo.toml`:
33//!
34//! ```toml
35//! [dependencies]
36//! gust-runtime = "0.1"
37//! ```
38
/// Runtime traits and types that generated Gust code relies on.
///
/// Generated `.g.rs` files `use gust_runtime::prelude::*;` to bring the
/// machine and supervision items (`Machine`, `Supervisor`, `SupervisorAction`,
/// `SupervisorRuntime`, `RestartStrategy`, `ChildHandle`, `Envelope`) and the
/// third-party re-exports (`serde`'s `Serialize`/`Deserialize`, `serde_json`,
/// `thiserror`) into scope.
43pub mod prelude {
44 use std::future::Future;
45 use std::sync::Arc;
46 use std::sync::atomic::{AtomicUsize, Ordering};
47 use tokio::sync::Mutex;
48 use tokio::task::JoinSet;
49
50 pub use serde::{Deserialize, Serialize};
51 pub use serde_json;
52 pub use thiserror;
53
54 /// The base trait for all Gust state machines.
55 ///
56 /// Every machine generated by the Gust compiler implements `Machine`,
57 /// which provides state inspection and JSON round-trip serialization.
58 /// The trait requires [`Serialize`] and [`Deserialize`] so that machine
59 /// instances can be persisted, transferred, or logged as JSON.
60 ///
61 /// # Associated types
62 ///
63 /// * `State` -- the enum of possible states for this machine. Must be
64 /// `Debug + Clone + Serialize + Deserialize`.
65 ///
66 /// # Examples
67 ///
68 /// ```rust,ignore
69 /// use gust_runtime::prelude::*;
70 ///
71 /// // Given a generated TrafficLight machine:
72 /// let light = TrafficLight::new();
73 /// println!("Current state: {:?}", light.current_state());
74 ///
75 /// // Serialize to JSON for persistence
76 /// let json = light.to_json().unwrap();
77 ///
78 /// // Restore from JSON
79 /// let restored = TrafficLight::from_json(&json).unwrap();
80 /// ```
81 pub trait Machine: Serialize + for<'de> Deserialize<'de> {
82 /// The enum type representing all possible states of this machine.
83 type State: std::fmt::Debug + Clone + Serialize + for<'de> Deserialize<'de>;
84
85 /// Returns a reference to the machine's current state.
86 fn current_state(&self) -> &Self::State;
87
88 /// Serializes the entire machine to a pretty-printed JSON string.
89 ///
90 /// This captures both the current state and any associated data,
91 /// allowing the machine to be persisted or transmitted.
92 fn to_json(&self) -> Result<String, serde_json::Error> {
93 serde_json::to_string_pretty(self)
94 }
95
96 /// Deserializes a machine from a JSON string.
97 ///
98 /// The JSON must have been produced by [`to_json`](Machine::to_json)
99 /// (or an equivalent serializer) for the same machine type.
100 fn from_json(json: &str) -> Result<Self, serde_json::Error>
101 where
102 Self: Sized,
103 {
104 serde_json::from_str(json)
105 }
106 }
107
108 /// A trait for supervised machine groups (structured concurrency).
109 ///
110 /// Supervisors monitor a set of child machines and react when any child
111 /// enters a failure state. The supervisor decides whether to restart the
112 /// child, escalate the error, or ignore it by returning a
113 /// [`SupervisorAction`].
114 ///
115 /// # Examples
116 ///
117 /// ```rust,ignore
118 /// use gust_runtime::prelude::*;
119 ///
120 /// struct MyGroup;
121 ///
122 /// impl Supervisor for MyGroup {
123 /// type Error = String;
124 ///
125 /// fn on_child_failure(&mut self, child_id: &str, error: &String) -> SupervisorAction {
126 /// eprintln!("Child {child_id} failed: {error}");
127 /// SupervisorAction::Restart
128 /// }
129 /// }
130 /// ```
131 pub trait Supervisor {
132 /// The error type reported by child machines.
133 type Error: std::fmt::Debug;
134
135 /// Called when a child machine enters a failure state.
136 ///
137 /// The supervisor inspects the `child_id` and `error` and returns
138 /// a [`SupervisorAction`] that determines how the runtime should
139 /// respond.
140 fn on_child_failure(&mut self, child_id: &str, error: &Self::Error) -> SupervisorAction;
141 }
142
143 /// The action a [`Supervisor`] takes when a child machine fails.
144 #[derive(Debug, Clone)]
145 pub enum SupervisorAction {
146 /// Restart the failed child machine from its initial state.
147 ///
148 /// The child is re-initialized and begins executing again. Other
149 /// children are unaffected (unless [`RestartStrategy::OneForAll`]
150 /// or [`RestartStrategy::RestForOne`] is in effect).
151 Restart,
152
153 /// Stop the failed child and propagate the error up the supervision tree.
154 ///
155 /// Use this when a child failure is unrecoverable and the parent
156 /// supervisor (or the top-level caller) should handle it.
157 Escalate,
158
159 /// Ignore the failure and let the child remain in its failed state.
160 ///
161 /// The child is not restarted. This is appropriate for non-critical
162 /// background tasks whose failure does not affect the overall system.
163 Ignore,
164 }
165
166 /// A typed message envelope for cross-machine communication.
167 ///
168 /// `Envelope` wraps a payload of type `T` with routing metadata
169 /// (`source`, `target`) and an optional `correlation_id` for
170 /// request-response patterns. It is serializable so it can be
171 /// transmitted across process or network boundaries.
172 ///
173 /// # Examples
174 ///
175 /// ```rust
176 /// use gust_runtime::prelude::*;
177 ///
178 /// let envelope = Envelope::new("order-service", "payment-service", 42u64)
179 /// .with_correlation("req-001");
180 ///
181 /// assert_eq!(envelope.source, "order-service");
182 /// assert_eq!(envelope.target, "payment-service");
183 /// assert_eq!(envelope.payload, 42u64);
184 /// assert_eq!(envelope.correlation_id.as_deref(), Some("req-001"));
185 /// ```
186 #[derive(Debug, Clone, Serialize, Deserialize)]
187 pub struct Envelope<T: Serialize> {
188 /// The identifier of the sending machine or service.
189 pub source: String,
190 /// The identifier of the intended recipient machine or service.
191 pub target: String,
192 /// The message payload.
193 pub payload: T,
194 /// An optional correlation identifier for matching requests to responses.
195 pub correlation_id: Option<String>,
196 }
197
198 impl<T: Serialize> Envelope<T> {
199 /// Creates a new envelope with the given source, target, and payload.
200 ///
201 /// The `correlation_id` is initially `None`. Use
202 /// [`with_correlation`](Envelope::with_correlation) to set it.
203 pub fn new(source: impl Into<String>, target: impl Into<String>, payload: T) -> Self {
204 Self {
205 source: source.into(),
206 target: target.into(),
207 payload,
208 correlation_id: None,
209 }
210 }
211
212 /// Sets the correlation identifier on this envelope (builder pattern).
213 ///
214 /// Correlation IDs are useful for matching asynchronous responses
215 /// back to the original request.
216 pub fn with_correlation(mut self, id: impl Into<String>) -> Self {
217 self.correlation_id = Some(id.into());
218 self
219 }
220 }
221
222 /// A handle to a child task spawned by a [`SupervisorRuntime`].
223 ///
224 /// Contains the logical identifier of the child, which is used for
225 /// logging and passed to [`Supervisor::on_child_failure`] when the
226 /// child reports an error.
227 #[derive(Debug, Clone)]
228 pub struct ChildHandle {
229 /// The logical identifier of this child task.
230 pub id: String,
231 }
232
233 /// Strategy that determines which children are restarted when one fails.
234 ///
235 /// These strategies are modeled after Erlang/OTP supervisor behaviors
236 /// and are used by [`SupervisorRuntime::restart_scope`] to compute
237 /// the range of children to restart.
238 ///
239 /// # Variants
240 ///
241 /// Given children `[A, B, C, D, E]` where `C` (index 2) fails:
242 ///
243 /// | Strategy | Restarted children |
244 /// |---|---|
245 /// | `OneForOne` | `[C]` |
246 /// | `OneForAll` | `[A, B, C, D, E]` |
247 /// | `RestForOne` | `[C, D, E]` |
248 #[derive(Debug, Clone, Copy, Default)]
249 pub enum RestartStrategy {
250 /// Restart only the failed child. Other children are unaffected.
251 ///
252 /// This is the default strategy and is appropriate when children
253 /// are independent of each other.
254 #[default]
255 OneForOne,
256
257 /// Restart **all** children when any single child fails.
258 ///
259 /// Use this when children have interdependencies and a consistent
260 /// group state is required.
261 OneForAll,
262
263 /// Restart the failed child and all children started **after** it.
264 ///
265 /// This is useful when later children depend on earlier ones:
266 /// restarting from the failure point forward re-establishes the
267 /// dependency chain without disturbing earlier, independent children.
268 RestForOne,
269 }
270
271 /// An async runtime for supervising a group of child tasks.
272 ///
273 /// `SupervisorRuntime` manages a [`JoinSet`] of async tasks and applies
274 /// a [`RestartStrategy`] to determine which children to restart when a
275 /// failure occurs. It handles the concurrency details of spawning,
276 /// joining, and tracking child tasks.
277 ///
278 /// # Examples
279 ///
280 /// ```rust,ignore
281 /// use gust_runtime::prelude::*;
282 ///
283 /// let runtime = SupervisorRuntime::with_strategy(RestartStrategy::OneForAll);
284 /// let handle = runtime.spawn_named("worker-1", async {
285 /// // ... child machine logic ...
286 /// Ok(())
287 /// });
288 ///
289 /// // Wait for the next child to complete
290 /// if let Some(result) = runtime.join_next().await {
291 /// match result {
292 /// Ok(()) => println!("child completed successfully"),
293 /// Err(e) => println!("child failed: {e}"),
294 /// }
295 /// }
296 /// ```
297 #[derive(Debug, Clone)]
298 pub struct SupervisorRuntime {
299 /// The set of currently running child tasks.
300 tasks: Arc<Mutex<JoinSet<Result<(), String>>>>,
301 /// Counter for spawns that are waiting on the lock.
302 pending_spawns: Arc<AtomicUsize>,
303 /// The restart strategy governing failure responses.
304 strategy: RestartStrategy,
305 }
306
307 impl Default for SupervisorRuntime {
308 fn default() -> Self {
309 Self::new()
310 }
311 }
312
313 impl SupervisorRuntime {
314 /// Creates a new `SupervisorRuntime` with the default
315 /// [`RestartStrategy::OneForOne`] strategy.
316 pub fn new() -> Self {
317 Self::with_strategy(RestartStrategy::OneForOne)
318 }
319
320 /// Creates a new `SupervisorRuntime` with the specified restart strategy.
321 pub fn with_strategy(strategy: RestartStrategy) -> Self {
322 Self {
323 tasks: Arc::new(Mutex::new(JoinSet::new())),
324 pending_spawns: Arc::new(AtomicUsize::new(0)),
325 strategy,
326 }
327 }
328
329 /// Spawns a named child task and returns a [`ChildHandle`].
330 ///
331 /// The future `fut` is spawned onto the Tokio runtime. If the
332 /// internal task set lock is immediately available the spawn happens
333 /// synchronously; otherwise a helper task is used to avoid blocking.
334 ///
335 /// The `id` is stored in the returned [`ChildHandle`] for
336 /// identification purposes.
337 pub fn spawn_named<F>(&self, id: impl Into<String>, fut: F) -> ChildHandle
338 where
339 F: Future<Output = Result<(), String>> + Send + 'static,
340 {
341 let id = id.into();
342 let task_id = id.clone();
343 match self.tasks.try_lock() {
344 Ok(mut tasks) => {
345 tasks.spawn(fut);
346 }
347 _ => {
348 self.pending_spawns.fetch_add(1, Ordering::SeqCst);
349 let tasks = self.tasks.clone();
350 let pending_spawns = self.pending_spawns.clone();
351 tokio::spawn(async move {
352 tasks.lock().await.spawn(fut);
353 pending_spawns.fetch_sub(1, Ordering::SeqCst);
354 });
355 }
356 }
357 ChildHandle { id: task_id }
358 }
359
360 /// Waits for the next child task to complete and returns its result.
361 ///
362 /// Returns `Some(Ok(()))` when a child succeeds, `Some(Err(..))` when
363 /// a child fails, or `None` when all children have completed and no
364 /// pending spawns remain.
365 ///
366 /// This method will yield cooperatively if there are pending spawns
367 /// that have not yet been added to the task set.
368 pub async fn join_next(&self) -> Option<Result<(), String>> {
369 loop {
370 let next = self.tasks.lock().await.join_next().await;
371 match next {
372 Some(Ok(inner)) => return Some(inner),
373 Some(Err(join_err)) => {
374 return Some(Err(format!("task join error: {join_err}")));
375 }
376 None if self.pending_spawns.load(Ordering::SeqCst) == 0 => return None,
377 None => tokio::task::yield_now().await,
378 }
379 }
380 }
381
382 /// Returns the [`RestartStrategy`] configured for this runtime.
383 pub fn strategy(&self) -> RestartStrategy {
384 self.strategy
385 }
386
387 /// Computes the range of child indices that should be restarted
388 /// when the child at `failed_child_index` fails.
389 ///
390 /// The returned range depends on the configured [`RestartStrategy`]:
391 ///
392 /// - **OneForOne**: only the failed child (`index..index+1`)
393 /// - **OneForAll**: all children (`0..child_count`)
394 /// - **RestForOne**: the failed child and all that follow (`index..child_count`)
395 pub fn restart_scope(
396 &self,
397 failed_child_index: usize,
398 child_count: usize,
399 ) -> std::ops::Range<usize> {
400 match self.strategy {
401 RestartStrategy::OneForOne => {
402 failed_child_index..failed_child_index.saturating_add(1)
403 }
404 RestartStrategy::OneForAll => 0..child_count,
405 RestartStrategy::RestForOne => failed_child_index..child_count,
406 }
407 }
408 }
409}
410
#[cfg(test)]
mod tests {
    use super::prelude::{RestartStrategy, SupervisorRuntime};
    use std::time::Duration;

    /// Upper bound for any single `join_next` call in these tests.
    const JOIN_TIMEOUT: Duration = Duration::from_secs(1);

    /// Awaits `runtime.join_next()`, failing the test instead of hanging.
    async fn join_next_or_fail(runtime: &SupervisorRuntime) -> Option<Result<(), String>> {
        tokio::time::timeout(JOIN_TIMEOUT, runtime.join_next())
            .await
            .expect("join_next should not hang")
    }

    #[test]
    fn restart_scope_matches_strategy() {
        let one_for_one = SupervisorRuntime::with_strategy(RestartStrategy::OneForOne);
        assert_eq!(one_for_one.restart_scope(2, 5), 2..3);

        let one_for_all = SupervisorRuntime::with_strategy(RestartStrategy::OneForAll);
        assert_eq!(one_for_all.restart_scope(2, 5), 0..5);

        let rest_for_one = SupervisorRuntime::with_strategy(RestartStrategy::RestForOne);
        assert_eq!(rest_for_one.restart_scope(2, 5), 2..5);
    }

    #[tokio::test]
    async fn join_next_observes_immediately_spawned_task() {
        let runtime = SupervisorRuntime::new();
        runtime.spawn_named("worker-1", async { Ok::<(), String>(()) });

        assert!(matches!(join_next_or_fail(&runtime).await, Some(Ok(()))));
    }

    #[tokio::test]
    async fn join_next_on_empty_runtime_returns_none() {
        let runtime = SupervisorRuntime::new();
        assert!(join_next_or_fail(&runtime).await.is_none());
    }

    #[tokio::test]
    async fn join_next_reports_child_error() {
        let runtime = SupervisorRuntime::new();
        runtime.spawn_named("failing", async { Err::<(), String>("boom".to_string()) });

        assert_eq!(join_next_or_fail(&runtime).await, Some(Err("boom".to_string())));
        assert!(join_next_or_fail(&runtime).await.is_none());
    }

    #[tokio::test]
    async fn spawn_during_join_is_deferred_not_lost() {
        let runtime = SupervisorRuntime::new();
        let inner = runtime.clone();
        runtime.spawn_named("parent", async move {
            // `join_next` typically holds the task-set lock while awaiting this
            // child, so this spawn exercises the deferred path.
            inner.spawn_named("child", async { Ok::<(), String>(()) });
            Ok::<(), String>(())
        });

        assert!(matches!(join_next_or_fail(&runtime).await, Some(Ok(()))));
        assert!(matches!(join_next_or_fail(&runtime).await, Some(Ok(()))));
        assert!(join_next_or_fail(&runtime).await.is_none());
    }
}