streamkit_core/state.rs
1// SPDX-FileCopyrightText: © 2025 StreamKit Contributors
2//
3// SPDX-License-Identifier: MPL-2.0
4
5//! Node state management and lifecycle tracking.
6//!
7//! This module defines the state machine for node execution and provides
8//! helper functions for emitting state updates.
9//!
10//! ## State Machine
11//!
12//! Nodes transition through these states during their lifecycle:
13//!
14//! ```text
//! Initializing
//!      ↓
//!    Ready ──────────┐
//!      ↓             │
//!   Running ←──┐     │
//!      ↓       │     │
//! Recovering ──┘     │
//!      ↓             │
//!  Degraded          │
//!      ↓             │
//!   Failed ←─────────┘
//!      ↓
//!   Stopped
28//! ```
29
30use serde::de::Deserializer;
31use serde::{Deserialize, Serialize};
32use std::time::SystemTime;
33use ts_rs::TS;
34
35/// Why a node entered the `Stopped` state.
36///
37/// Serialized as a snake_case string for ergonomic client handling.
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, TS)]
39#[ts(export)]
40#[serde(rename_all = "snake_case")]
41pub enum StopReason {
42 /// Expected end of a finite stream (typical for stateless/oneshot pipelines).
43 Completed,
44 /// Upstream closed, no more data to process.
45 InputClosed,
46 /// Downstream closed, cannot deliver outputs.
47 OutputClosed,
48 /// Shutdown was requested (user action or coordinated cancellation).
49 Shutdown,
50 /// Node cannot proceed due to missing required inputs.
51 NoInputs,
52 /// A reason not recognized by this client/version.
53 Unknown,
54}
55
56impl<'de> Deserialize<'de> for StopReason {
57 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
58 where
59 D: Deserializer<'de>,
60 {
61 let value = String::deserialize(deserializer)?;
62 Ok(Self::from(value.as_str()))
63 }
64}
65
66impl From<&str> for StopReason {
67 fn from(value: &str) -> Self {
68 match value {
69 "completed" => Self::Completed,
70 "input_closed" => Self::InputClosed,
71 "output_closed" => Self::OutputClosed,
72 "shutdown" | "shutdown_requested" => Self::Shutdown,
73 "no_inputs" => Self::NoInputs,
74 _ => Self::Unknown,
75 }
76 }
77}
78
79impl From<String> for StopReason {
80 fn from(value: String) -> Self {
81 Self::from(value.as_str())
82 }
83}
84
85/// Represents the runtime state of a node in the pipeline.
86///
87/// ## State Machine
88///
89/// Nodes transition through these states during their lifecycle:
90///
91/// ```text
92/// Initializing
93/// ↓
94/// Ready ──────────┐
95/// ↓ │
96/// Running ←──┐ │
97/// ↓ │ │
98/// Recovering ──┘ │
99/// ↓ │
100/// Degraded │
101/// ↓ │
102/// Failed ←─────────┘
103/// ↓
104/// Stopped
105/// ```
106///
107/// ### Valid Transitions:
108/// - `Initializing` → `Ready` (source nodes) or `Running` (processing nodes)
109/// - `Ready` → `Running` (when pipeline is ready)
110/// - `Running` → `Recovering` (temporary issues, will retry)
111/// - `Running` → `Degraded` (persistent issues, no retry)
112/// - `Running` → `Failed` (fatal error)
113/// - `Running` → `Stopped` (graceful shutdown)
114/// - `Recovering` → `Running` (recovery succeeded)
115/// - `Recovering` → `Degraded` (recovery partially succeeded, quality reduced)
116/// - `Recovering` → `Failed` (recovery exhausted, giving up)
117/// - `Degraded` → `Failed` (conditions worsened)
118/// - `Ready` → `Failed` (initialization timeout or external failure)
119/// - Any state → `Stopped` (external shutdown request)
120#[derive(Debug, Clone, Serialize, Deserialize, TS)]
121#[ts(export)]
122pub enum NodeState {
123 /// Node is starting up and performing initialization.
124 /// Examples: Opening connections, loading resources, validating configuration.
125 Initializing,
126
127 /// Node has completed initialization and is ready to process data.
128 /// Source nodes (nodes with no inputs) wait in this state until all downstream
129 /// nodes are also ready, preventing packet loss during pipeline startup.
130 /// Non-source nodes typically skip this state and go directly to Running.
131 Ready,
132
133 /// Node is operating normally and processing data.
134 /// This is the expected steady state for a healthy node.
135 Running,
136
137 /// Node encountered an issue but is actively attempting to recover automatically.
138 /// The node is still running but may not be processing data during recovery.
139 ///
140 /// Examples:
141 /// - Transport node reconnecting after connection loss
142 /// - Decoder resyncing after corrupted data
143 /// - Node waiting for stalled input to resume
144 ///
145 /// The `reason` field provides a human-readable explanation.
146 /// The optional `details` field can contain node-specific structured information
147 /// (e.g., retry attempt numbers, affected resources).
148 Recovering {
149 reason: String,
150 #[ts(type = "JsonValue")]
151 details: Option<serde_json::Value>,
152 },
153
154 /// Node is operational but experiencing persistent issues that affect quality or performance.
155 /// Unlike `Recovering`, the node is not actively attempting automatic recovery.
156 ///
157 /// Examples:
158 /// - High latency or packet loss in transport
159 /// - Resource constraints (CPU, memory pressure)
160 /// - Partial functionality (some features unavailable)
161 ///
162 /// The node continues processing but users should be aware of reduced quality.
163 Degraded { reason: String },
164
165 /// Node has encountered a fatal error and stopped processing.
166 /// Manual intervention is required to restart the node.
167 ///
168 /// Examples:
169 /// - Max reconnection attempts exhausted
170 /// - Invalid configuration detected at runtime
171 /// - Unrecoverable protocol error
172 Failed { reason: String },
173
174 /// Node has stopped processing and shut down.
175 /// The `reason` field indicates why the node stopped:
176 /// - "completed" - Expected end of finite data stream (stateless pipelines)
177 /// - "input_closed" - Upstream node closed, no more data to process
178 /// - "shutdown" - Graceful shutdown was requested
179 ///
180 /// In live/dynamic pipelines, this state often indicates an issue (unexpected stop).
181 /// In stateless pipelines, "completed" is the expected end state.
182 Stopped { reason: StopReason },
183}
184
185/// A state update message sent by a node to report its current state.
186/// These updates are used for monitoring, debugging, and UI visualization.
187#[derive(Debug, Clone)]
188pub struct NodeStateUpdate {
189 /// The unique identifier of the node reporting the state
190 pub node_id: String,
191 /// The new state of the node
192 pub state: NodeState,
193 /// When this state change occurred
194 pub timestamp: SystemTime,
195}
196
197impl NodeStateUpdate {
198 /// Creates a new state update with the current timestamp.
199 #[inline]
200 pub fn new(node_id: String, state: NodeState) -> Self {
201 Self { node_id, state, timestamp: SystemTime::now() }
202 }
203}
204
205/// Helper functions for emitting node state updates.
206/// These functions reduce boilerplate when sending state updates from nodes.
207pub mod state_helpers {
208 use super::{NodeState, NodeStateUpdate, StopReason};
209 use tokio::sync::mpsc;
210
211 /// Emits a state update to the provided channel.
212 /// Failures are silently ignored as state tracking is best-effort.
213 #[inline]
214 pub fn emit_state(state_tx: &mpsc::Sender<NodeStateUpdate>, node_id: &str, state: NodeState) {
215 let _ = state_tx.try_send(NodeStateUpdate::new(node_id.to_string(), state));
216 }
217
218 /// Emits an Initializing state.
219 #[inline]
220 pub fn emit_initializing(state_tx: &mpsc::Sender<NodeStateUpdate>, node_id: &str) {
221 emit_state(state_tx, node_id, NodeState::Initializing);
222 }
223
224 /// Emits a Ready state.
225 #[inline]
226 pub fn emit_ready(state_tx: &mpsc::Sender<NodeStateUpdate>, node_id: &str) {
227 emit_state(state_tx, node_id, NodeState::Ready);
228 }
229
230 /// Emits a Running state.
231 #[inline]
232 pub fn emit_running(state_tx: &mpsc::Sender<NodeStateUpdate>, node_id: &str) {
233 emit_state(state_tx, node_id, NodeState::Running);
234 }
235
236 /// Emits a Stopped state with the given reason.
237 #[inline]
238 pub fn emit_stopped(
239 state_tx: &mpsc::Sender<NodeStateUpdate>,
240 node_id: &str,
241 reason: impl Into<StopReason>,
242 ) {
243 emit_state(state_tx, node_id, NodeState::Stopped { reason: reason.into() });
244 }
245
246 /// Emits a Failed state with the given error.
247 #[inline]
248 pub fn emit_failed(
249 state_tx: &mpsc::Sender<NodeStateUpdate>,
250 node_id: &str,
251 error: impl Into<String>,
252 ) {
253 emit_state(state_tx, node_id, NodeState::Failed { reason: error.into() });
254 }
255
256 /// Emits a Recovering state with the given reason and optional details.
257 #[inline]
258 pub fn emit_recovering(
259 state_tx: &mpsc::Sender<NodeStateUpdate>,
260 node_name: &str,
261 reason: impl Into<String>,
262 details: Option<serde_json::Value>,
263 ) {
264 emit_state(state_tx, node_name, NodeState::Recovering { reason: reason.into(), details });
265 }
266
267 /// Emits a Recovering state with retry attempt tracking.
268 ///
269 /// This is a convenience helper for nodes implementing retry logic.
270 /// The attempt count and max attempts are included in the details field
271 /// for monitoring and debugging.
272 ///
273 /// # Example
274 /// ```no_run
275 /// # use streamkit_core::state::state_helpers::emit_recovering_with_retry;
276 /// # use tokio::sync::mpsc;
277 /// # let state_tx = mpsc::channel(1).0;
278 /// emit_recovering_with_retry(
279 /// &state_tx,
280 /// "websocket_client",
281 /// "Connection lost, reconnecting",
282 /// 2,
283 /// 5
284 /// );
285 /// // Emits: Recovering { reason: "Connection lost, reconnecting",
286 /// // details: { "attempt": 2, "max_attempts": 5 } }
287 /// ```
288 #[inline]
289 pub fn emit_recovering_with_retry(
290 state_tx: &mpsc::Sender<NodeStateUpdate>,
291 node_name: &str,
292 reason: impl Into<String>,
293 attempt: u32,
294 max_attempts: u32,
295 ) {
296 let details = serde_json::json!({
297 "attempt": attempt,
298 "max_attempts": max_attempts,
299 });
300 emit_recovering(state_tx, node_name, reason, Some(details));
301 }
302
303 /// Emits a Degraded state with the given reason.
304 #[inline]
305 pub fn emit_degraded(
306 state_tx: &mpsc::Sender<NodeStateUpdate>,
307 node_name: &str,
308 reason: impl Into<String>,
309 ) {
310 emit_state(state_tx, node_name, NodeState::Degraded { reason: reason.into() });
311 }
312}