streamkit_core/node.rs
1// SPDX-FileCopyrightText: © 2025 StreamKit Contributors
2//
3// SPDX-License-Identifier: MPL-2.0
4
5//! Core node abstractions and ProcessorNode trait.
6//!
7//! This module defines the fundamental interface for processing nodes:
8//! - [`ProcessorNode`]: The core trait that all nodes must implement
9//! - [`NodeContext`]: Runtime context passed to nodes during execution
10//! - [`InitContext`]: Context for asynchronous initialization
11//! - [`OutputSender`]: Handle for sending packets to downstream nodes
12
13use crate::control::NodeControlMessage;
14use crate::error::StreamKitError;
15use crate::pins::{InputPin, OutputPin, PinManagementMessage, PinUpdate};
16use crate::state::NodeStateUpdate;
17use crate::stats::NodeStatsUpdate;
18use crate::telemetry::TelemetryEvent;
19use crate::types::Packet;
20use crate::AudioFramePool;
21use async_trait::async_trait;
22use std::collections::HashMap;
23use std::sync::Arc;
24use thiserror::Error;
25use tokio::sync::mpsc;
26
27/// Message type for routed packet delivery.
28/// Uses `Arc<str>` for node and pin names to avoid heap allocations on every send.
29pub type RoutedPacketMessage = (Arc<str>, Arc<str>, Packet);
30
31/// An enum representing the two ways a node's output can be routed.
32#[derive(Clone)]
33pub enum OutputRouting {
34 /// Packets are sent directly to the input channels of downstream nodes.
35 Direct(HashMap<String, mpsc::Sender<Packet>>),
36 /// Packets are sent to a central engine actor for routing.
37 /// Uses Arc<str> for node/pin names to avoid heap allocations on every packet.
38 Routed(mpsc::Sender<RoutedPacketMessage>),
39}
40
41/// A handle given to a node to send its output packets.
42#[derive(Clone)]
43pub struct OutputSender {
44 /// Node name as Arc<str> to avoid cloning allocations
45 node_name: Arc<str>,
46 routing: OutputRouting,
47 /// Cached pin names as Arc<str> to avoid repeated allocations
48 pin_name_cache: HashMap<String, Arc<str>>,
49}
50
51/// Error returned by [`OutputSender::send`] when a packet cannot be delivered.
52#[derive(Debug, Error, Clone, PartialEq, Eq)]
53pub enum OutputSendError {
54 /// The requested output pin does not exist on this node.
55 #[error("unknown output pin '{pin_name}' on node '{node_name}'")]
56 PinNotFound { node_name: String, pin_name: String },
57
58 /// The downstream channel (direct) or engine channel (routed) is closed.
59 #[error("output channel closed for pin '{pin_name}' on node '{node_name}'")]
60 ChannelClosed { node_name: String, pin_name: String },
61}
62
63impl OutputSender {
64 /// Creates a new OutputSender.
65 /// Note: The node_name String is converted to Arc<str> for efficient cloning on the hot path.
66 pub fn new(node_name: String, routing: OutputRouting) -> Self {
67 Self { node_name: Arc::from(node_name), routing, pin_name_cache: HashMap::new() }
68 }
69
70 /// Returns the node's name.
71 #[must_use]
72 pub fn node_name(&self) -> &str {
73 &self.node_name
74 }
75
76 /// Get or cache the pin name as Arc<str> to avoid repeated allocations.
77 fn get_cached_pin_name(&mut self, pin_name: &str) -> Arc<str> {
78 if let Some(cached) = self.pin_name_cache.get(pin_name) {
79 cached.clone() // O(1) Arc clone
80 } else {
81 let arc_name: Arc<str> = Arc::from(pin_name);
82 self.pin_name_cache.insert(pin_name.to_string(), arc_name.clone());
83 arc_name
84 }
85 }
86
87 /// Sends a packet from a specific output pin of this node.
88 /// Returns `Ok(())` if sent successfully.
89 ///
90 /// Nodes should stop processing when this returns an error, as it indicates
91 /// either a programming mistake (unknown pin) or that the pipeline is shutting down.
92 ///
93 /// # Errors
94 ///
95 /// Returns [`OutputSendError::PinNotFound`] if the pin doesn't exist, or
96 /// [`OutputSendError::ChannelClosed`] if the receiving channel is closed.
97 pub async fn send(&mut self, pin_name: &str, packet: Packet) -> Result<(), OutputSendError> {
98 use tokio::sync::mpsc::error::TrySendError;
99
100 match &self.routing {
101 OutputRouting::Direct(senders) => {
102 if let Some(sender) = senders.get(pin_name) {
103 // Fast path: avoid allocating/awaiting a future if the channel has capacity.
104 match sender.try_send(packet) {
105 Ok(()) => {},
106 Err(TrySendError::Full(packet)) => {
107 if sender.send(packet).await.is_err() {
108 // This is expected during cancellation/shutdown, so use debug level
109 tracing::debug!(
110 "Directly connected channel for pin '{}' is closed.",
111 pin_name
112 );
113 return Err(OutputSendError::ChannelClosed {
114 node_name: self.node_name.to_string(),
115 pin_name: pin_name.to_string(),
116 });
117 }
118 },
119 Err(TrySendError::Closed(_packet)) => {
120 // This is expected during cancellation/shutdown, so use debug level
121 tracing::debug!(
122 "Directly connected channel for pin '{}' is closed.",
123 pin_name
124 );
125 return Err(OutputSendError::ChannelClosed {
126 node_name: self.node_name.to_string(),
127 pin_name: pin_name.to_string(),
128 });
129 },
130 }
131 } else {
132 // Pin not found - this is a programming error, log warning and return error
133 tracing::warn!(
134 "OutputSender::send() called with unknown pin '{}' on node '{}'. \
135 Available pins: {:?}. Packet dropped.",
136 pin_name,
137 self.node_name,
138 senders.keys().collect::<Vec<_>>()
139 );
140 return Err(OutputSendError::PinNotFound {
141 node_name: self.node_name.to_string(),
142 pin_name: pin_name.to_string(),
143 });
144 }
145 },
146 OutputRouting::Routed(engine_tx) => {
147 // Clone engine_tx first to release the immutable borrow on self,
148 // allowing us to call get_cached_pin_name() which needs &mut self
149 let engine_tx = engine_tx.clone();
150
151 // Use cached Arc<str> for node and pin names to avoid heap allocations
152 let cached_pin = self.get_cached_pin_name(pin_name);
153 let message = (self.node_name.clone(), cached_pin, packet);
154 match engine_tx.try_send(message) {
155 Ok(()) => {},
156 Err(TrySendError::Full(message)) => {
157 if engine_tx.send(message).await.is_err() {
158 tracing::warn!("Engine channel is closed. Cannot send packet.");
159 return Err(OutputSendError::ChannelClosed {
160 node_name: self.node_name.to_string(),
161 pin_name: pin_name.to_string(),
162 });
163 }
164 },
165 Err(TrySendError::Closed(_message)) => {
166 tracing::warn!("Engine channel is closed. Cannot send packet.");
167 return Err(OutputSendError::ChannelClosed {
168 node_name: self.node_name.to_string(),
169 pin_name: pin_name.to_string(),
170 });
171 },
172 }
173 },
174 }
175 Ok(())
176 }
177}
178
179/// Context provided to nodes during initialization.
180///
181/// This allows nodes to perform async operations (like probing external resources)
182/// before the pipeline starts executing.
183pub struct InitContext {
184 /// The node's unique identifier in the pipeline
185 pub node_id: String,
186 /// Channel to report state changes during initialization
187 pub state_tx: tokio::sync::mpsc::Sender<NodeStateUpdate>,
188}
189
190/// The context provided by the engine to a node when it is run.
191pub struct NodeContext {
192 pub inputs: HashMap<String, mpsc::Receiver<Packet>>,
193 pub control_rx: mpsc::Receiver<NodeControlMessage>,
194 pub output_sender: OutputSender,
195 pub batch_size: usize,
196 /// Channel for the node to report state changes.
197 /// Nodes should send updates when transitioning between states to enable
198 /// monitoring and debugging. It's acceptable if sends fail (e.g., in stateless
199 /// pipelines where state tracking may not be enabled).
200 pub state_tx: mpsc::Sender<NodeStateUpdate>,
201 /// Channel for the node to report statistics updates.
202 /// Nodes should throttle these updates (e.g., every 10s or 1000 packets)
203 /// to prevent overloading the monitoring system. Like state_tx, it's
204 /// acceptable if sends fail.
205 pub stats_tx: Option<mpsc::Sender<NodeStatsUpdate>>,
206 /// Channel for the node to emit telemetry events.
207 /// Telemetry is best-effort and should never block audio processing.
208 /// Nodes should use `try_send()` or the `TelemetryEmitter` helper which
209 /// handles rate limiting and drop accounting automatically.
210 pub telemetry_tx: Option<mpsc::Sender<TelemetryEvent>>,
211 /// Session ID for gateway registration and routing (if applicable)
212 pub session_id: Option<String>,
213 /// Cancellation token for coordinated shutdown of pipeline tasks.
214 /// When this token is cancelled, nodes should stop processing and exit gracefully.
215 /// This is primarily used in stateless pipelines to abort processing when the
216 /// client disconnects or the request is interrupted.
217 pub cancellation_token: Option<tokio_util::sync::CancellationToken>,
218 /// Channel for runtime pin management messages (Tier 2).
219 /// Only provided for nodes that support dynamic pins.
220 pub pin_management_rx: Option<mpsc::Receiver<PinManagementMessage>>,
221 /// Optional per-pipeline audio buffer pool for hot-path allocations.
222 ///
223 /// Nodes that produce audio frames (decoders, resamplers, mixers) may use this to
224 /// amortize `Vec<f32>` allocations. If `None`, nodes should fall back to allocating.
225 pub audio_pool: Option<Arc<AudioFramePool>>,
226}
227
228impl NodeContext {
229 /// Retrieves an input pin receiver by name, returning an error if not found.
230 /// This is a convenience method to avoid repeated error handling boilerplate.
231 ///
232 /// # Errors
233 ///
234 /// Returns `StreamKitError::Runtime` if the requested input pin doesn't exist.
235 pub fn take_input(&mut self, pin_name: &str) -> Result<mpsc::Receiver<Packet>, StreamKitError> {
236 self.inputs.remove(pin_name).ok_or_else(|| {
237 StreamKitError::Runtime(format!("Engine did not provide '{pin_name}' pin receiver"))
238 })
239 }
240
241 /// Receives a packet from the given receiver, respecting the cancellation token if present.
242 /// Returns None if cancelled or if the channel is closed.
243 ///
244 /// This is a convenience method that should be used in node loops instead of calling recv()
245 /// directly, as it automatically handles cancellation for stateless pipelines.
246 pub async fn recv_with_cancellation(&self, rx: &mut mpsc::Receiver<Packet>) -> Option<Packet> {
247 if let Some(token) = &self.cancellation_token {
248 tokio::select! {
249 () = token.cancelled() => None,
250 packet = rx.recv() => packet,
251 }
252 } else {
253 rx.recv().await
254 }
255 }
256}
257
258/// The fundamental trait for any processing node, designed as an actor.
259#[async_trait]
260pub trait ProcessorNode: Send + Sync {
261 /// Returns the input pins for this specific node instance.
262 fn input_pins(&self) -> Vec<InputPin>;
263
264 /// Returns the output pins for this specific node instance.
265 fn output_pins(&self) -> Vec<OutputPin>;
266
267 /// For nodes that produce a final, self-contained file format, this method
268 /// should return the appropriate MIME type string.
269 fn content_type(&self) -> Option<String> {
270 None // Default implementation for nodes that don't produce a final format.
271 }
272
273 /// Tier 1: Initialization-time discovery.
274 ///
275 /// Called after instantiation but before pipeline execution.
276 /// Allows nodes to probe external resources and finalize pin definitions.
277 ///
278 /// Default implementation does nothing (static pins).
279 ///
280 /// # Example
281 /// ```ignore
282 /// async fn initialize(&mut self, ctx: &InitContext) -> Result<PinUpdate, StreamKitError> {
283 /// // Probe external resource
284 /// let tracks = probe_broadcast(&self.url).await?;
285 ///
286 /// // Update pins based on discovery
287 /// self.tracks = tracks;
288 /// Ok(PinUpdate::Updated {
289 /// inputs: self.input_pins(),
290 /// outputs: self.output_pins(),
291 /// })
292 /// }
293 /// ```
294 async fn initialize(&mut self, _ctx: &InitContext) -> Result<PinUpdate, StreamKitError> {
295 Ok(PinUpdate::NoChange)
296 }
297
298 /// Tier 2: Runtime pin management capability.
299 ///
300 /// Returns true if this node supports adding/removing pins while running.
301 /// Nodes that return true must handle PinManagementMessage messages.
302 ///
303 /// Default implementation returns false (static pins after init).
304 fn supports_dynamic_pins(&self) -> bool {
305 false
306 }
307
308 /// The main actor loop for the node. The engine will spawn this method as a task.
309 async fn run(self: Box<Self>, context: NodeContext) -> Result<(), StreamKitError>;
310}
311
312/// A factory function that creates a new instance of a node, accepting optional configuration.
313/// Wrapped in an Arc to make it cloneable.
314pub type NodeFactory = Arc<
315 dyn Fn(Option<&serde_json::Value>) -> Result<Box<dyn ProcessorNode>, StreamKitError>
316 + Send
317 + Sync,
318>;
319
320/// A factory function that computes a hash of parameters for resource caching.
321///
322/// Given parameters, returns a deterministic hash string used as part of the ResourceKey.
323/// Plugins should hash only the parameters that affect resource initialization (e.g., model path, GPU settings).
324pub type ResourceKeyHasher = Arc<dyn Fn(Option<&serde_json::Value>) -> String + Send + Sync>;