gust-runtime 0.2.0

Runtime support library (traits, envelopes, supervisors) for programs compiled from Gust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
#![warn(missing_docs)]
//! # Gust Runtime
//!
//! The runtime support library for compiled Gust state machine programs.
//!
//! When the Gust compiler generates Rust code from `.gu` source files, the
//! generated code depends on this crate for core traits and utilities.
//! Generated modules typically begin with `use gust_runtime::prelude::*;`
//! to bring all necessary items into scope.
//!
//! ## What this crate provides
//!
//! - **[`prelude::Machine`]** -- the base trait every generated state machine implements,
//!   providing state inspection and JSON serialization.
//! - **[`prelude::Supervisor`]** and **[`prelude::SupervisorRuntime`]** -- structured concurrency
//!   primitives for running multiple child machines under a supervision tree,
//!   with configurable restart strategies.
//! - **[`prelude::Envelope`]** -- a typed message wrapper for cross-machine communication.
//! - **[`prelude::SupervisorAction`]** and **[`prelude::RestartStrategy`]** -- enums that control
//!   how failures propagate and which children are restarted.
//!
//! ## Re-exports
//!
//! The prelude also re-exports [`serde`], [`serde_json`], and [`thiserror`] so
//! that generated code does not need to declare these as direct dependencies.
//!
//! ## Usage
//!
//! You normally do **not** depend on this crate directly. Instead, the Gust
//! compiler adds it to your generated code automatically. If you need to
//! interact with generated machines from hand-written Rust, add the dependency
//! to your `Cargo.toml`:
//!
//! ```toml
//! [dependencies]
//! gust-runtime = "0.2"
//! ```

/// Re-exports of the runtime traits and types that generated Gust code
/// relies on. Generated `.g.rs` files `use gust_runtime::prelude::*;` to
/// bring `Machine`, `Supervisor`, `Envelope`, `RestartStrategy`, and
/// third-party re-exports (`serde`, `serde_json`, `thiserror`) into scope.
pub mod prelude {
    use std::future::Future;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::sync::Mutex;
    use tokio::task::JoinSet;

    pub use serde::{Deserialize, Serialize};
    pub use serde_json;
    pub use thiserror;

    /// The base trait for all Gust state machines.
    ///
    /// Every machine generated by the Gust compiler implements `Machine`,
    /// which provides state inspection and JSON round-trip serialization.
    /// The trait requires [`Serialize`] and [`Deserialize`] so that machine
    /// instances can be persisted, transferred, or logged as JSON.
    ///
    /// Implementors only have to supply [`State`](Machine::State) and
    /// [`current_state`](Machine::current_state); the JSON helpers have
    /// default implementations built on `serde_json`.
    ///
    /// # Associated types
    ///
    /// * `State` -- the enum of possible states for this machine. Must be
    ///   `Debug + Clone + Serialize + Deserialize`.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use gust_runtime::prelude::*;
    ///
    /// // Given a generated TrafficLight machine:
    /// let light = TrafficLight::new();
    /// println!("Current state: {:?}", light.current_state());
    ///
    /// // Serialize to JSON for persistence
    /// let json = light.to_json().unwrap();
    ///
    /// // Restore from JSON
    /// let restored = TrafficLight::from_json(&json).unwrap();
    /// ```
    pub trait Machine: Serialize + for<'de> Deserialize<'de> {
        /// The enum type representing all possible states of this machine.
        type State: std::fmt::Debug + Clone + Serialize + for<'de> Deserialize<'de>;

        /// Returns a reference to the machine's current state.
        fn current_state(&self) -> &Self::State;

        /// Serializes the entire machine to a pretty-printed JSON string.
        ///
        /// This captures both the current state and any associated data,
        /// allowing the machine to be persisted or transmitted.
        ///
        /// # Errors
        ///
        /// Returns the [`serde_json::Error`] produced by
        /// `serde_json::to_string_pretty` if the machine cannot be serialized.
        fn to_json(&self) -> Result<String, serde_json::Error> {
            serde_json::to_string_pretty(self)
        }

        /// Deserializes a machine from a JSON string.
        ///
        /// The JSON must have been produced by [`to_json`](Machine::to_json)
        /// (or an equivalent serializer) for the same machine type.
        ///
        /// # Errors
        ///
        /// Returns the [`serde_json::Error`] produced by `serde_json::from_str`
        /// if `json` cannot be deserialized into `Self`.
        fn from_json(json: &str) -> Result<Self, serde_json::Error>
        where
            Self: Sized,
        {
            serde_json::from_str(json)
        }
    }

    /// A trait for supervised machine groups (structured concurrency).
    ///
    /// Supervisors monitor a set of child machines and react when any child
    /// enters a failure state. The supervisor decides whether to restart the
    /// child, escalate the error, or ignore it by returning a
    /// [`SupervisorAction`].
    ///
    /// The trait itself only defines the decision callback; it does not spawn
    /// or restart anything on its own.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use gust_runtime::prelude::*;
    ///
    /// struct MyGroup;
    ///
    /// impl Supervisor for MyGroup {
    ///     type Error = String;
    ///
    ///     fn on_child_failure(&mut self, child_id: &str, error: &String) -> SupervisorAction {
    ///         eprintln!("Child {child_id} failed: {error}");
    ///         SupervisorAction::Restart
    ///     }
    /// }
    /// ```
    pub trait Supervisor {
        /// The error type reported by child machines.
        type Error: std::fmt::Debug;

        /// Called when a child machine enters a failure state.
        ///
        /// The supervisor inspects the `child_id` and `error` and returns
        /// a [`SupervisorAction`] that determines how the runtime should
        /// respond.
        ///
        /// The method takes `&mut self`, so an implementation may update its
        /// own bookkeeping (for example a failure counter) while deciding.
        fn on_child_failure(&mut self, child_id: &str, error: &Self::Error) -> SupervisorAction;
    }

    /// The action a [`Supervisor`] takes when a child machine fails.
    ///
    /// Returned from [`Supervisor::on_child_failure`]. The enum is comparable
    /// (`PartialEq`/`Eq`) so that callers and tests can check a decision with
    /// `==` or `assert_eq!` instead of pattern matching.
    #[derive(Debug, Clone, PartialEq, Eq)]
    pub enum SupervisorAction {
        /// Restart the failed child machine from its initial state.
        ///
        /// The child is re-initialized and begins executing again. Other
        /// children are unaffected (unless [`RestartStrategy::OneForAll`]
        /// or [`RestartStrategy::RestForOne`] is in effect).
        Restart,

        /// Stop the failed child and propagate the error up the supervision tree.
        ///
        /// Use this when a child failure is unrecoverable and the parent
        /// supervisor (or the top-level caller) should handle it.
        Escalate,

        /// Ignore the failure and let the child remain in its failed state.
        ///
        /// The child is not restarted. This is appropriate for non-critical
        /// background tasks whose failure does not affect the overall system.
        Ignore,
    }

    /// A typed message envelope for cross-machine communication.
    ///
    /// `Envelope` wraps a payload of type `T` with routing metadata
    /// (`source`, `target`) and an optional `correlation_id` for
    /// request-response patterns. It is serializable so it can be
    /// transmitted across process or network boundaries.
    ///
    /// All fields are public, so an envelope can be inspected or built
    /// directly as well as through [`Envelope::new`].
    ///
    /// # Examples
    ///
    /// ```rust
    /// use gust_runtime::prelude::*;
    ///
    /// let envelope = Envelope::new("order-service", "payment-service", 42u64)
    ///     .with_correlation("req-001");
    ///
    /// assert_eq!(envelope.source, "order-service");
    /// assert_eq!(envelope.target, "payment-service");
    /// assert_eq!(envelope.payload, 42u64);
    /// assert_eq!(envelope.correlation_id.as_deref(), Some("req-001"));
    /// ```
    #[derive(Debug, Clone, Serialize, Deserialize)]
    pub struct Envelope<T: Serialize> {
        /// The identifier of the sending machine or service.
        pub source: String,
        /// The identifier of the intended recipient machine or service.
        pub target: String,
        /// The message payload.
        pub payload: T,
        /// An optional correlation identifier for matching requests to responses.
        ///
        /// `None` means no correlation identifier has been attached.
        pub correlation_id: Option<String>,
    }

    impl<T: Serialize> Envelope<T> {
        /// Builds an envelope that routes `payload` from `source` to `target`.
        ///
        /// Both identifiers accept anything convertible into a `String`.
        /// No correlation identifier is attached; chain
        /// [`with_correlation`](Envelope::with_correlation) to add one.
        pub fn new(source: impl Into<String>, target: impl Into<String>, payload: T) -> Self {
            let source = source.into();
            let target = target.into();
            Self {
                source,
                target,
                payload,
                correlation_id: None,
            }
        }

        /// Returns this envelope with its correlation identifier set to `id`.
        ///
        /// Consumes and returns `self` so it can be chained after
        /// [`new`](Envelope::new). Any previously set identifier is replaced.
        /// Correlation identifiers let an asynchronous response be matched
        /// back to the request that triggered it.
        pub fn with_correlation(self, id: impl Into<String>) -> Self {
            Self {
                correlation_id: Some(id.into()),
                ..self
            }
        }
    }

    /// A handle to a child task spawned by a [`SupervisorRuntime`].
    ///
    /// Contains the logical identifier of the child, which is used for
    /// logging and passed to [`Supervisor::on_child_failure`] when the
    /// child reports an error.
    ///
    /// The handle stores only the identifier; it holds no reference to the
    /// spawned task itself.
    #[derive(Debug, Clone)]
    pub struct ChildHandle {
        /// The logical identifier of this child task.
        pub id: String,
    }

    /// Strategy that determines which children are restarted when one fails.
    ///
    /// These strategies are modeled after Erlang/OTP supervisor behaviors
    /// and are used by [`SupervisorRuntime::restart_scope`] to compute
    /// the range of children to restart.
    ///
    /// The enum is `Copy` and comparable (`PartialEq`/`Eq`), so a configured
    /// strategy can be checked with `==` or `assert_eq!`.
    ///
    /// # Variants
    ///
    /// Given children `[A, B, C, D, E]` where `C` (index 2) fails:
    ///
    /// | Strategy | Restarted children |
    /// |---|---|
    /// | `OneForOne` | `[C]` |
    /// | `OneForAll` | `[A, B, C, D, E]` |
    /// | `RestForOne` | `[C, D, E]` |
    #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
    pub enum RestartStrategy {
        /// Restart only the failed child. Other children are unaffected.
        ///
        /// This is the default strategy and is appropriate when children
        /// are independent of each other.
        #[default]
        OneForOne,

        /// Restart **all** children when any single child fails.
        ///
        /// Use this when children have interdependencies and a consistent
        /// group state is required.
        OneForAll,

        /// Restart the failed child and all children started **after** it.
        ///
        /// This is useful when later children depend on earlier ones:
        /// restarting from the failure point forward re-establishes the
        /// dependency chain without disturbing earlier, independent children.
        RestForOne,
    }

    /// An async runtime for supervising a group of child tasks.
    ///
    /// `SupervisorRuntime` manages a [`JoinSet`] of async tasks and applies
    /// a [`RestartStrategy`] to determine which children to restart when a
    /// failure occurs. It handles the concurrency details of spawning,
    /// joining, and tracking child tasks.
    ///
    /// The task set and the pending-spawn counter are both held behind
    /// [`Arc`], so a clone of the runtime operates on the same set of
    /// children as the original.
    ///
    /// # Examples
    ///
    /// ```rust,ignore
    /// use gust_runtime::prelude::*;
    ///
    /// let runtime = SupervisorRuntime::with_strategy(RestartStrategy::OneForAll);
    /// let handle = runtime.spawn_named("worker-1", async {
    ///     // ... child machine logic ...
    ///     Ok(())
    /// });
    ///
    /// // Wait for the next child to complete
    /// if let Some(result) = runtime.join_next().await {
    ///     match result {
    ///         Ok(()) => println!("child completed successfully"),
    ///         Err(e) => println!("child failed: {e}"),
    ///     }
    /// }
    /// ```
    #[derive(Debug, Clone)]
    pub struct SupervisorRuntime {
        /// The set of currently running child tasks.
        ///
        /// Each child resolves to `Ok(())` on success or `Err(String)` with
        /// a failure message.
        tasks: Arc<Mutex<JoinSet<Result<(), String>>>>,
        /// Counter for spawns that are waiting on the lock.
        ///
        /// Incremented before a deferred spawn is scheduled and decremented
        /// once the child has been added to `tasks`.
        pending_spawns: Arc<AtomicUsize>,
        /// The restart strategy governing failure responses.
        strategy: RestartStrategy,
    }

    /// The default runtime is the one produced by [`SupervisorRuntime::new`].
    impl Default for SupervisorRuntime {
        fn default() -> Self {
            Self::new()
        }
    }

    impl SupervisorRuntime {
        /// Creates a new `SupervisorRuntime` with the default
        /// [`RestartStrategy::OneForOne`] strategy.
        ///
        /// Equivalent to calling
        /// [`with_strategy`](SupervisorRuntime::with_strategy) with
        /// `RestartStrategy::OneForOne`.
        pub fn new() -> Self {
            Self::with_strategy(RestartStrategy::OneForOne)
        }

        /// Builds a `SupervisorRuntime` that applies `strategy` when computing
        /// restart scopes.
        ///
        /// The runtime starts with an empty task set and no pending spawns.
        pub fn with_strategy(strategy: RestartStrategy) -> Self {
            let tasks = Arc::new(Mutex::new(JoinSet::new()));
            let pending_spawns = Arc::new(AtomicUsize::new(0));
            Self {
                tasks,
                pending_spawns,
                strategy,
            }
        }

        /// Spawns a named child task and returns a [`ChildHandle`].
        ///
        /// The future `fut` is spawned onto the Tokio runtime. If the
        /// internal task set lock is immediately available the spawn happens
        /// synchronously; otherwise a helper task is used to avoid blocking.
        /// In the deferred case the child is added to the task set only once
        /// the helper task acquires the lock, and
        /// [`join_next`](SupervisorRuntime::join_next) accounts for it through
        /// the pending-spawn counter.
        ///
        /// The `id` is stored in the returned [`ChildHandle`] for
        /// identification purposes; it is not attached to the spawned task.
        ///
        /// # Panics
        ///
        /// Spawning relies on `JoinSet::spawn` / `tokio::spawn`, which panic
        /// when called outside of a Tokio runtime context.
        pub fn spawn_named<F>(&self, id: impl Into<String>, fut: F) -> ChildHandle
        where
            F: Future<Output = Result<(), String>> + Send + 'static,
        {
            let id = id.into();
            match self.tasks.try_lock() {
                Ok(mut tasks) => {
                    tasks.spawn(fut);
                }
                Err(_) => {
                    // Register the deferred spawn *before* scheduling it so a
                    // concurrent `join_next` never observes an empty set with
                    // a zero counter while this child is still in flight.
                    self.pending_spawns.fetch_add(1, Ordering::SeqCst);
                    let tasks = self.tasks.clone();
                    let pending_spawns = self.pending_spawns.clone();
                    tokio::spawn(async move {
                        tasks.lock().await.spawn(fut);
                        pending_spawns.fetch_sub(1, Ordering::SeqCst);
                    });
                }
            }
            ChildHandle { id }
        }

        /// Awaits the next child task to finish and yields its outcome.
        ///
        /// * `Some(Ok(()))` -- a child completed successfully.
        /// * `Some(Err(msg))` -- a child returned an error, or the task could
        ///   not be joined (reported as `"task join error: ..."`).
        /// * `None` -- the task set is empty and no deferred spawns are
        ///   outstanding.
        ///
        /// When the set is empty but deferred spawns are still pending, the
        /// method yields to the scheduler and checks again.
        ///
        /// NOTE(review): the task-set lock is held while waiting for a child
        /// to finish, so a concurrent
        /// [`spawn_named`](SupervisorRuntime::spawn_named) takes its deferred
        /// path and the new child is not added until this wait completes.
        pub async fn join_next(&self) -> Option<Result<(), String>> {
            loop {
                let finished = {
                    let mut task_set = self.tasks.lock().await;
                    task_set.join_next().await
                };

                if let Some(joined) = finished {
                    // Flatten a join failure into the child's own error channel.
                    let outcome = joined
                        .unwrap_or_else(|join_err| Err(format!("task join error: {join_err}")));
                    return Some(outcome);
                }

                // Empty set: done only if no deferred spawn is still in flight.
                if self.pending_spawns.load(Ordering::SeqCst) == 0 {
                    return None;
                }
                tokio::task::yield_now().await;
            }
        }

        /// Returns the [`RestartStrategy`] configured for this runtime.
        ///
        /// The strategy is returned by value; [`RestartStrategy`] is `Copy`.
        pub fn strategy(&self) -> RestartStrategy {
            self.strategy
        }

        /// Computes the range of child indices that should be restarted
        /// when the child at `failed_child_index` fails.
        ///
        /// The returned range depends on the configured [`RestartStrategy`]:
        ///
        /// - **OneForOne**: only the failed child (`index..index+1`)
        /// - **OneForAll**: all children (`0..child_count`)
        /// - **RestForOne**: the failed child and all that follow (`index..child_count`)
        ///
        /// The result is always a valid sub-range of `0..child_count`, so it
        /// can be used directly to slice a list of `child_count` children.
        /// If `failed_child_index` is out of bounds (`>= child_count`),
        /// `OneForOne` and `RestForOne` return the empty range
        /// `child_count..child_count`; `OneForAll` still returns every child.
        pub fn restart_scope(
            &self,
            failed_child_index: usize,
            child_count: usize,
        ) -> std::ops::Range<usize> {
            // Clamp so the range never extends past, or starts after, the end
            // of the child list.
            let start = failed_child_index.min(child_count);
            match self.strategy {
                RestartStrategy::OneForOne => start..start.saturating_add(1).min(child_count),
                RestartStrategy::OneForAll => 0..child_count,
                RestartStrategy::RestForOne => start..child_count,
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::prelude::{RestartStrategy, SupervisorRuntime};
    use std::time::Duration;

    /// Each strategy maps a failure at index 2 of 5 children to its documented range.
    #[test]
    fn restart_scope_matches_strategy() {
        let cases = [
            (RestartStrategy::OneForOne, 2..3),
            (RestartStrategy::OneForAll, 0..5),
            (RestartStrategy::RestForOne, 2..5),
        ];

        for (strategy, expected) in cases {
            let runtime = SupervisorRuntime::with_strategy(strategy);
            assert_eq!(runtime.restart_scope(2, 5), expected);
        }
    }

    /// A task spawned on the fast path is visible to `join_next` without hanging.
    #[tokio::test]
    async fn join_next_observes_immediately_spawned_task() {
        let runtime = SupervisorRuntime::new();
        runtime.spawn_named("worker-1", async { Ok::<(), String>(()) });

        let pending_join = runtime.join_next();
        let outcome = tokio::time::timeout(Duration::from_secs(1), pending_join)
            .await
            .expect("join_next should not hang");

        assert_eq!(outcome, Some(Ok(())));
    }
}