streamkit_core/
node.rs

1// SPDX-FileCopyrightText: © 2025 StreamKit Contributors
2//
3// SPDX-License-Identifier: MPL-2.0
4
5//! Core node abstractions and ProcessorNode trait.
6//!
7//! This module defines the fundamental interface for processing nodes:
8//! - [`ProcessorNode`]: The core trait that all nodes must implement
9//! - [`NodeContext`]: Runtime context passed to nodes during execution
10//! - [`InitContext`]: Context for asynchronous initialization
11//! - [`OutputSender`]: Handle for sending packets to downstream nodes
12
13use crate::control::NodeControlMessage;
14use crate::error::StreamKitError;
15use crate::pins::{InputPin, OutputPin, PinManagementMessage, PinUpdate};
16use crate::state::NodeStateUpdate;
17use crate::stats::NodeStatsUpdate;
18use crate::telemetry::TelemetryEvent;
19use crate::types::Packet;
20use crate::AudioFramePool;
21use async_trait::async_trait;
22use std::collections::HashMap;
23use std::sync::Arc;
24use thiserror::Error;
25use tokio::sync::mpsc;
26
27/// Message type for routed packet delivery.
28/// Uses `Arc<str>` for node and pin names to avoid heap allocations on every send.
29pub type RoutedPacketMessage = (Arc<str>, Arc<str>, Packet);
30
31/// An enum representing the two ways a node's output can be routed.
32#[derive(Clone)]
33pub enum OutputRouting {
34    /// Packets are sent directly to the input channels of downstream nodes.
35    Direct(HashMap<String, mpsc::Sender<Packet>>),
36    /// Packets are sent to a central engine actor for routing.
37    /// Uses Arc<str> for node/pin names to avoid heap allocations on every packet.
38    Routed(mpsc::Sender<RoutedPacketMessage>),
39}
40
41/// A handle given to a node to send its output packets.
42#[derive(Clone)]
43pub struct OutputSender {
44    /// Node name as Arc<str> to avoid cloning allocations
45    node_name: Arc<str>,
46    routing: OutputRouting,
47    /// Cached pin names as Arc<str> to avoid repeated allocations
48    pin_name_cache: HashMap<String, Arc<str>>,
49}
50
51/// Error returned by [`OutputSender::send`] when a packet cannot be delivered.
52#[derive(Debug, Error, Clone, PartialEq, Eq)]
53pub enum OutputSendError {
54    /// The requested output pin does not exist on this node.
55    #[error("unknown output pin '{pin_name}' on node '{node_name}'")]
56    PinNotFound { node_name: String, pin_name: String },
57
58    /// The downstream channel (direct) or engine channel (routed) is closed.
59    #[error("output channel closed for pin '{pin_name}' on node '{node_name}'")]
60    ChannelClosed { node_name: String, pin_name: String },
61}
62
63impl OutputSender {
64    /// Creates a new OutputSender.
65    /// Note: The node_name String is converted to Arc<str> for efficient cloning on the hot path.
66    pub fn new(node_name: String, routing: OutputRouting) -> Self {
67        Self { node_name: Arc::from(node_name), routing, pin_name_cache: HashMap::new() }
68    }
69
70    /// Returns the node's name.
71    #[must_use]
72    pub fn node_name(&self) -> &str {
73        &self.node_name
74    }
75
76    /// Get or cache the pin name as Arc<str> to avoid repeated allocations.
77    fn get_cached_pin_name(&mut self, pin_name: &str) -> Arc<str> {
78        if let Some(cached) = self.pin_name_cache.get(pin_name) {
79            cached.clone() // O(1) Arc clone
80        } else {
81            let arc_name: Arc<str> = Arc::from(pin_name);
82            self.pin_name_cache.insert(pin_name.to_string(), arc_name.clone());
83            arc_name
84        }
85    }
86
87    /// Sends a packet from a specific output pin of this node.
88    /// Returns `Ok(())` if sent successfully.
89    ///
90    /// Nodes should stop processing when this returns an error, as it indicates
91    /// either a programming mistake (unknown pin) or that the pipeline is shutting down.
92    ///
93    /// # Errors
94    ///
95    /// Returns [`OutputSendError::PinNotFound`] if the pin doesn't exist, or
96    /// [`OutputSendError::ChannelClosed`] if the receiving channel is closed.
97    pub async fn send(&mut self, pin_name: &str, packet: Packet) -> Result<(), OutputSendError> {
98        use tokio::sync::mpsc::error::TrySendError;
99
100        match &self.routing {
101            OutputRouting::Direct(senders) => {
102                if let Some(sender) = senders.get(pin_name) {
103                    // Fast path: avoid allocating/awaiting a future if the channel has capacity.
104                    match sender.try_send(packet) {
105                        Ok(()) => {},
106                        Err(TrySendError::Full(packet)) => {
107                            if sender.send(packet).await.is_err() {
108                                // This is expected during cancellation/shutdown, so use debug level
109                                tracing::debug!(
110                                    "Directly connected channel for pin '{}' is closed.",
111                                    pin_name
112                                );
113                                return Err(OutputSendError::ChannelClosed {
114                                    node_name: self.node_name.to_string(),
115                                    pin_name: pin_name.to_string(),
116                                });
117                            }
118                        },
119                        Err(TrySendError::Closed(_packet)) => {
120                            // This is expected during cancellation/shutdown, so use debug level
121                            tracing::debug!(
122                                "Directly connected channel for pin '{}' is closed.",
123                                pin_name
124                            );
125                            return Err(OutputSendError::ChannelClosed {
126                                node_name: self.node_name.to_string(),
127                                pin_name: pin_name.to_string(),
128                            });
129                        },
130                    }
131                } else {
132                    // Pin not found - this is a programming error, log warning and return error
133                    tracing::warn!(
134                        "OutputSender::send() called with unknown pin '{}' on node '{}'. \
135                         Available pins: {:?}. Packet dropped.",
136                        pin_name,
137                        self.node_name,
138                        senders.keys().collect::<Vec<_>>()
139                    );
140                    return Err(OutputSendError::PinNotFound {
141                        node_name: self.node_name.to_string(),
142                        pin_name: pin_name.to_string(),
143                    });
144                }
145            },
146            OutputRouting::Routed(engine_tx) => {
147                // Clone engine_tx first to release the immutable borrow on self,
148                // allowing us to call get_cached_pin_name() which needs &mut self
149                let engine_tx = engine_tx.clone();
150
151                // Use cached Arc<str> for node and pin names to avoid heap allocations
152                let cached_pin = self.get_cached_pin_name(pin_name);
153                let message = (self.node_name.clone(), cached_pin, packet);
154                match engine_tx.try_send(message) {
155                    Ok(()) => {},
156                    Err(TrySendError::Full(message)) => {
157                        if engine_tx.send(message).await.is_err() {
158                            tracing::warn!("Engine channel is closed. Cannot send packet.");
159                            return Err(OutputSendError::ChannelClosed {
160                                node_name: self.node_name.to_string(),
161                                pin_name: pin_name.to_string(),
162                            });
163                        }
164                    },
165                    Err(TrySendError::Closed(_message)) => {
166                        tracing::warn!("Engine channel is closed. Cannot send packet.");
167                        return Err(OutputSendError::ChannelClosed {
168                            node_name: self.node_name.to_string(),
169                            pin_name: pin_name.to_string(),
170                        });
171                    },
172                }
173            },
174        }
175        Ok(())
176    }
177}
178
179/// Context provided to nodes during initialization.
180///
181/// This allows nodes to perform async operations (like probing external resources)
182/// before the pipeline starts executing.
183pub struct InitContext {
184    /// The node's unique identifier in the pipeline
185    pub node_id: String,
186    /// Channel to report state changes during initialization
187    pub state_tx: tokio::sync::mpsc::Sender<NodeStateUpdate>,
188}
189
190/// The context provided by the engine to a node when it is run.
191pub struct NodeContext {
192    pub inputs: HashMap<String, mpsc::Receiver<Packet>>,
193    pub control_rx: mpsc::Receiver<NodeControlMessage>,
194    pub output_sender: OutputSender,
195    pub batch_size: usize,
196    /// Channel for the node to report state changes.
197    /// Nodes should send updates when transitioning between states to enable
198    /// monitoring and debugging. It's acceptable if sends fail (e.g., in stateless
199    /// pipelines where state tracking may not be enabled).
200    pub state_tx: mpsc::Sender<NodeStateUpdate>,
201    /// Channel for the node to report statistics updates.
202    /// Nodes should throttle these updates (e.g., every 10s or 1000 packets)
203    /// to prevent overloading the monitoring system. Like state_tx, it's
204    /// acceptable if sends fail.
205    pub stats_tx: Option<mpsc::Sender<NodeStatsUpdate>>,
206    /// Channel for the node to emit telemetry events.
207    /// Telemetry is best-effort and should never block audio processing.
208    /// Nodes should use `try_send()` or the `TelemetryEmitter` helper which
209    /// handles rate limiting and drop accounting automatically.
210    pub telemetry_tx: Option<mpsc::Sender<TelemetryEvent>>,
211    /// Session ID for gateway registration and routing (if applicable)
212    pub session_id: Option<String>,
213    /// Cancellation token for coordinated shutdown of pipeline tasks.
214    /// When this token is cancelled, nodes should stop processing and exit gracefully.
215    /// This is primarily used in stateless pipelines to abort processing when the
216    /// client disconnects or the request is interrupted.
217    pub cancellation_token: Option<tokio_util::sync::CancellationToken>,
218    /// Channel for runtime pin management messages (Tier 2).
219    /// Only provided for nodes that support dynamic pins.
220    pub pin_management_rx: Option<mpsc::Receiver<PinManagementMessage>>,
221    /// Optional per-pipeline audio buffer pool for hot-path allocations.
222    ///
223    /// Nodes that produce audio frames (decoders, resamplers, mixers) may use this to
224    /// amortize `Vec<f32>` allocations. If `None`, nodes should fall back to allocating.
225    pub audio_pool: Option<Arc<AudioFramePool>>,
226}
227
228impl NodeContext {
229    /// Retrieves an input pin receiver by name, returning an error if not found.
230    /// This is a convenience method to avoid repeated error handling boilerplate.
231    ///
232    /// # Errors
233    ///
234    /// Returns `StreamKitError::Runtime` if the requested input pin doesn't exist.
235    pub fn take_input(&mut self, pin_name: &str) -> Result<mpsc::Receiver<Packet>, StreamKitError> {
236        self.inputs.remove(pin_name).ok_or_else(|| {
237            StreamKitError::Runtime(format!("Engine did not provide '{pin_name}' pin receiver"))
238        })
239    }
240
241    /// Receives a packet from the given receiver, respecting the cancellation token if present.
242    /// Returns None if cancelled or if the channel is closed.
243    ///
244    /// This is a convenience method that should be used in node loops instead of calling recv()
245    /// directly, as it automatically handles cancellation for stateless pipelines.
246    pub async fn recv_with_cancellation(&self, rx: &mut mpsc::Receiver<Packet>) -> Option<Packet> {
247        if let Some(token) = &self.cancellation_token {
248            tokio::select! {
249                () = token.cancelled() => None,
250                packet = rx.recv() => packet,
251            }
252        } else {
253            rx.recv().await
254        }
255    }
256}
257
258/// The fundamental trait for any processing node, designed as an actor.
259#[async_trait]
260pub trait ProcessorNode: Send + Sync {
261    /// Returns the input pins for this specific node instance.
262    fn input_pins(&self) -> Vec<InputPin>;
263
264    /// Returns the output pins for this specific node instance.
265    fn output_pins(&self) -> Vec<OutputPin>;
266
267    /// For nodes that produce a final, self-contained file format, this method
268    /// should return the appropriate MIME type string.
269    fn content_type(&self) -> Option<String> {
270        None // Default implementation for nodes that don't produce a final format.
271    }
272
273    /// Tier 1: Initialization-time discovery.
274    ///
275    /// Called after instantiation but before pipeline execution.
276    /// Allows nodes to probe external resources and finalize pin definitions.
277    ///
278    /// Default implementation does nothing (static pins).
279    ///
280    /// # Example
281    /// ```ignore
282    /// async fn initialize(&mut self, ctx: &InitContext) -> Result<PinUpdate, StreamKitError> {
283    ///     // Probe external resource
284    ///     let tracks = probe_broadcast(&self.url).await?;
285    ///
286    ///     // Update pins based on discovery
287    ///     self.tracks = tracks;
288    ///     Ok(PinUpdate::Updated {
289    ///         inputs: self.input_pins(),
290    ///         outputs: self.output_pins(),
291    ///     })
292    /// }
293    /// ```
294    async fn initialize(&mut self, _ctx: &InitContext) -> Result<PinUpdate, StreamKitError> {
295        Ok(PinUpdate::NoChange)
296    }
297
298    /// Tier 2: Runtime pin management capability.
299    ///
300    /// Returns true if this node supports adding/removing pins while running.
301    /// Nodes that return true must handle PinManagementMessage messages.
302    ///
303    /// Default implementation returns false (static pins after init).
304    fn supports_dynamic_pins(&self) -> bool {
305        false
306    }
307
308    /// The main actor loop for the node. The engine will spawn this method as a task.
309    async fn run(self: Box<Self>, context: NodeContext) -> Result<(), StreamKitError>;
310}
311
312/// A factory function that creates a new instance of a node, accepting optional configuration.
313/// Wrapped in an Arc to make it cloneable.
314pub type NodeFactory = Arc<
315    dyn Fn(Option<&serde_json::Value>) -> Result<Box<dyn ProcessorNode>, StreamKitError>
316        + Send
317        + Sync,
318>;
319
320/// A factory function that computes a hash of parameters for resource caching.
321///
322/// Given parameters, returns a deterministic hash string used as part of the ResourceKey.
323/// Plugins should hash only the parameters that affect resource initialization (e.g., model path, GPU settings).
324pub type ResourceKeyHasher = Arc<dyn Fn(Option<&serde_json::Value>) -> String + Send + Sync>;