streamkit_core/
state.rs

// SPDX-FileCopyrightText: © 2025 StreamKit Contributors
//
// SPDX-License-Identifier: MPL-2.0

//! Node state management and lifecycle tracking.
//!
//! This module defines the state machine for node execution and provides
//! helper functions for emitting state updates.
//!
//! ## State Machine
//!
//! Nodes transition through these states during their lifecycle:
//!
//! ```text
//!     Initializing
//!          ↓
//!        Ready ──────────┐
//!          ↓             │
//!       Running ←──┐     │
//!          ↓       │     │
//!     Recovering ──┘     │
//!          ↓             │
//!       Degraded         │
//!          ↓             │
//!       Failed ←─────────┘
//!          ↓
//!       Stopped
//! ```
29
30use serde::de::Deserializer;
31use serde::{Deserialize, Serialize};
32use std::time::SystemTime;
33use ts_rs::TS;
34
35/// Why a node entered the `Stopped` state.
36///
37/// Serialized as a snake_case string for ergonomic client handling.
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, TS)]
39#[ts(export)]
40#[serde(rename_all = "snake_case")]
41pub enum StopReason {
42    /// Expected end of a finite stream (typical for stateless/oneshot pipelines).
43    Completed,
44    /// Upstream closed, no more data to process.
45    InputClosed,
46    /// Downstream closed, cannot deliver outputs.
47    OutputClosed,
48    /// Shutdown was requested (user action or coordinated cancellation).
49    Shutdown,
50    /// Node cannot proceed due to missing required inputs.
51    NoInputs,
52    /// A reason not recognized by this client/version.
53    Unknown,
54}
55
56impl<'de> Deserialize<'de> for StopReason {
57    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
58    where
59        D: Deserializer<'de>,
60    {
61        let value = String::deserialize(deserializer)?;
62        Ok(Self::from(value.as_str()))
63    }
64}
65
66impl From<&str> for StopReason {
67    fn from(value: &str) -> Self {
68        match value {
69            "completed" => Self::Completed,
70            "input_closed" => Self::InputClosed,
71            "output_closed" => Self::OutputClosed,
72            "shutdown" | "shutdown_requested" => Self::Shutdown,
73            "no_inputs" => Self::NoInputs,
74            _ => Self::Unknown,
75        }
76    }
77}
78
79impl From<String> for StopReason {
80    fn from(value: String) -> Self {
81        Self::from(value.as_str())
82    }
83}
84
85/// Represents the runtime state of a node in the pipeline.
86///
87/// ## State Machine
88///
89/// Nodes transition through these states during their lifecycle:
90///
91/// ```text
92///     Initializing
93///          ↓
94///        Ready ──────────┐
95///          ↓             │
96///       Running ←──┐     │
97///          ↓       │     │
98///     Recovering ──┘     │
99///          ↓             │
100///       Degraded         │
101///          ↓             │
102///       Failed ←─────────┘
103///          ↓
104///       Stopped
105/// ```
106///
107/// ### Valid Transitions:
108/// - `Initializing` → `Ready` (source nodes) or `Running` (processing nodes)
109/// - `Ready` → `Running` (when pipeline is ready)
110/// - `Running` → `Recovering` (temporary issues, will retry)
111/// - `Running` → `Degraded` (persistent issues, no retry)
112/// - `Running` → `Failed` (fatal error)
113/// - `Running` → `Stopped` (graceful shutdown)
114/// - `Recovering` → `Running` (recovery succeeded)
115/// - `Recovering` → `Degraded` (recovery partially succeeded, quality reduced)
116/// - `Recovering` → `Failed` (recovery exhausted, giving up)
117/// - `Degraded` → `Failed` (conditions worsened)
118/// - `Ready` → `Failed` (initialization timeout or external failure)
119/// - Any state → `Stopped` (external shutdown request)
120#[derive(Debug, Clone, Serialize, Deserialize, TS)]
121#[ts(export)]
122pub enum NodeState {
123    /// Node is starting up and performing initialization.
124    /// Examples: Opening connections, loading resources, validating configuration.
125    Initializing,
126
127    /// Node has completed initialization and is ready to process data.
128    /// Source nodes (nodes with no inputs) wait in this state until all downstream
129    /// nodes are also ready, preventing packet loss during pipeline startup.
130    /// Non-source nodes typically skip this state and go directly to Running.
131    Ready,
132
133    /// Node is operating normally and processing data.
134    /// This is the expected steady state for a healthy node.
135    Running,
136
137    /// Node encountered an issue but is actively attempting to recover automatically.
138    /// The node is still running but may not be processing data during recovery.
139    ///
140    /// Examples:
141    /// - Transport node reconnecting after connection loss
142    /// - Decoder resyncing after corrupted data
143    /// - Node waiting for stalled input to resume
144    ///
145    /// The `reason` field provides a human-readable explanation.
146    /// The optional `details` field can contain node-specific structured information
147    /// (e.g., retry attempt numbers, affected resources).
148    Recovering {
149        reason: String,
150        #[ts(type = "JsonValue")]
151        details: Option<serde_json::Value>,
152    },
153
154    /// Node is operational but experiencing persistent issues that affect quality or performance.
155    /// Unlike `Recovering`, the node is not actively attempting automatic recovery.
156    ///
157    /// Examples:
158    /// - High latency or packet loss in transport
159    /// - Resource constraints (CPU, memory pressure)
160    /// - Partial functionality (some features unavailable)
161    ///
162    /// The node continues processing but users should be aware of reduced quality.
163    Degraded { reason: String },
164
165    /// Node has encountered a fatal error and stopped processing.
166    /// Manual intervention is required to restart the node.
167    ///
168    /// Examples:
169    /// - Max reconnection attempts exhausted
170    /// - Invalid configuration detected at runtime
171    /// - Unrecoverable protocol error
172    Failed { reason: String },
173
174    /// Node has stopped processing and shut down.
175    /// The `reason` field indicates why the node stopped:
176    /// - "completed" - Expected end of finite data stream (stateless pipelines)
177    /// - "input_closed" - Upstream node closed, no more data to process
178    /// - "shutdown" - Graceful shutdown was requested
179    ///
180    /// In live/dynamic pipelines, this state often indicates an issue (unexpected stop).
181    /// In stateless pipelines, "completed" is the expected end state.
182    Stopped { reason: StopReason },
183}
184
185/// A state update message sent by a node to report its current state.
186/// These updates are used for monitoring, debugging, and UI visualization.
187#[derive(Debug, Clone)]
188pub struct NodeStateUpdate {
189    /// The unique identifier of the node reporting the state
190    pub node_id: String,
191    /// The new state of the node
192    pub state: NodeState,
193    /// When this state change occurred
194    pub timestamp: SystemTime,
195}
196
197impl NodeStateUpdate {
198    /// Creates a new state update with the current timestamp.
199    #[inline]
200    pub fn new(node_id: String, state: NodeState) -> Self {
201        Self { node_id, state, timestamp: SystemTime::now() }
202    }
203}
204
205/// Helper functions for emitting node state updates.
206/// These functions reduce boilerplate when sending state updates from nodes.
207pub mod state_helpers {
208    use super::{NodeState, NodeStateUpdate, StopReason};
209    use tokio::sync::mpsc;
210
211    /// Emits a state update to the provided channel.
212    /// Failures are silently ignored as state tracking is best-effort.
213    #[inline]
214    pub fn emit_state(state_tx: &mpsc::Sender<NodeStateUpdate>, node_id: &str, state: NodeState) {
215        let _ = state_tx.try_send(NodeStateUpdate::new(node_id.to_string(), state));
216    }
217
218    /// Emits an Initializing state.
219    #[inline]
220    pub fn emit_initializing(state_tx: &mpsc::Sender<NodeStateUpdate>, node_id: &str) {
221        emit_state(state_tx, node_id, NodeState::Initializing);
222    }
223
224    /// Emits a Ready state.
225    #[inline]
226    pub fn emit_ready(state_tx: &mpsc::Sender<NodeStateUpdate>, node_id: &str) {
227        emit_state(state_tx, node_id, NodeState::Ready);
228    }
229
230    /// Emits a Running state.
231    #[inline]
232    pub fn emit_running(state_tx: &mpsc::Sender<NodeStateUpdate>, node_id: &str) {
233        emit_state(state_tx, node_id, NodeState::Running);
234    }
235
236    /// Emits a Stopped state with the given reason.
237    #[inline]
238    pub fn emit_stopped(
239        state_tx: &mpsc::Sender<NodeStateUpdate>,
240        node_id: &str,
241        reason: impl Into<StopReason>,
242    ) {
243        emit_state(state_tx, node_id, NodeState::Stopped { reason: reason.into() });
244    }
245
246    /// Emits a Failed state with the given error.
247    #[inline]
248    pub fn emit_failed(
249        state_tx: &mpsc::Sender<NodeStateUpdate>,
250        node_id: &str,
251        error: impl Into<String>,
252    ) {
253        emit_state(state_tx, node_id, NodeState::Failed { reason: error.into() });
254    }
255
256    /// Emits a Recovering state with the given reason and optional details.
257    #[inline]
258    pub fn emit_recovering(
259        state_tx: &mpsc::Sender<NodeStateUpdate>,
260        node_name: &str,
261        reason: impl Into<String>,
262        details: Option<serde_json::Value>,
263    ) {
264        emit_state(state_tx, node_name, NodeState::Recovering { reason: reason.into(), details });
265    }
266
267    /// Emits a Recovering state with retry attempt tracking.
268    ///
269    /// This is a convenience helper for nodes implementing retry logic.
270    /// The attempt count and max attempts are included in the details field
271    /// for monitoring and debugging.
272    ///
273    /// # Example
274    /// ```no_run
275    /// # use streamkit_core::state::state_helpers::emit_recovering_with_retry;
276    /// # use tokio::sync::mpsc;
277    /// # let state_tx = mpsc::channel(1).0;
278    /// emit_recovering_with_retry(
279    ///     &state_tx,
280    ///     "websocket_client",
281    ///     "Connection lost, reconnecting",
282    ///     2,
283    ///     5
284    /// );
285    /// // Emits: Recovering { reason: "Connection lost, reconnecting",
286    /// //                     details: { "attempt": 2, "max_attempts": 5 } }
287    /// ```
288    #[inline]
289    pub fn emit_recovering_with_retry(
290        state_tx: &mpsc::Sender<NodeStateUpdate>,
291        node_name: &str,
292        reason: impl Into<String>,
293        attempt: u32,
294        max_attempts: u32,
295    ) {
296        let details = serde_json::json!({
297            "attempt": attempt,
298            "max_attempts": max_attempts,
299        });
300        emit_recovering(state_tx, node_name, reason, Some(details));
301    }
302
303    /// Emits a Degraded state with the given reason.
304    #[inline]
305    pub fn emit_degraded(
306        state_tx: &mpsc::Sender<NodeStateUpdate>,
307        node_name: &str,
308        reason: impl Into<String>,
309    ) {
310        emit_state(state_tx, node_name, NodeState::Degraded { reason: reason.into() });
311    }
312}