streamkit_core/
helpers.rs

1// SPDX-FileCopyrightText: © 2025 StreamKit Contributors
2//
3// SPDX-License-Identifier: MPL-2.0
4
5//! Utility functions for node configuration and packet processing.
6//!
7//! This module provides helper functions that simplify common tasks:
8//! - [`config_helpers`]: Parse node configuration from YAML
9//! - [`packet_helpers`]: Batch packet processing utilities
10
11use crate::error::StreamKitError;
12use crate::types::Packet;
13
14/// Helper functions for parsing node configuration from JSON values.
15pub mod config_helpers {
16    use super::StreamKitError;
17    use serde::Deserialize;
18
19    /// Parses configuration from an optional JSON value, using defaults if not provided.
20    /// This is the preferred approach for nodes with sensible defaults.
21    ///
22    /// # Errors
23    ///
24    /// This function always returns `Ok` in practice, as it uses `Default` when parsing fails.
25    /// The `Result` return type is maintained for API consistency with other config helpers.
26    pub fn parse_config_optional<T>(params: Option<&serde_json::Value>) -> Result<T, StreamKitError>
27    where
28        T: for<'de> Deserialize<'de> + Default,
29    {
30        Ok(serde_json::from_value(params.unwrap_or(&serde_json::Value::Null).clone())
31            .unwrap_or_default())
32    }
33
34    /// Parses configuration from an optional JSON value, returning an error if not provided.
35    /// Use this for nodes that require explicit configuration.
36    ///
37    /// # Errors
38    ///
39    /// Returns `StreamKitError::Configuration` if `params` is `None` or if deserialization fails.
40    pub fn parse_config_required<T>(params: Option<&serde_json::Value>) -> Result<T, StreamKitError>
41    where
42        T: for<'de> Deserialize<'de>,
43    {
44        let value = params
45            .ok_or_else(|| StreamKitError::Configuration("Configuration required".to_string()))?
46            .clone();
47        serde_json::from_value(value)
48            .map_err(|e| StreamKitError::Configuration(format!("Failed to parse config: {e}")))
49    }
50
51    /// Parses configuration with detailed error messages.
52    /// Use this when you want to provide context about what failed to parse.
53    ///
54    /// # Errors
55    ///
56    /// Returns `StreamKitError::Configuration` if `params` is `None` or if deserialization fails.
57    pub fn parse_config_with_context<T>(
58        params: Option<&serde_json::Value>,
59        context: &str,
60    ) -> Result<T, StreamKitError>
61    where
62        T: for<'de> Deserialize<'de>,
63    {
64        params.map_or_else(
65            || Err(StreamKitError::Configuration(format!("{context} configuration required"))),
66            |p| {
67                serde_json::from_value(p.clone()).map_err(|e| {
68                    StreamKitError::Configuration(format!("Failed to parse {context}: {e}"))
69                })
70            },
71        )
72    }
73}
74
75/// Helper functions for common packet processing patterns.
76pub mod packet_helpers {
77    use super::Packet;
78    use smallvec::SmallVec;
79    use tokio::sync::mpsc;
80
81    /// Default batch size for stack-allocated SmallVec.
82    ///
83    /// 32 packets fits typical batch processing while avoiding heap allocation.
84    /// Each Packet is ~40 bytes (enum discriminant + largest variant), so 32 packets = ~1.3KB on stack.
85    pub const DEFAULT_BATCH_CAPACITY: usize = 32;
86
87    /// A batch of packets that uses stack allocation for small batches.
88    /// Falls back to heap allocation only if more than DEFAULT_BATCH_CAPACITY packets are collected.
89    pub type PacketBatch = SmallVec<[Packet; DEFAULT_BATCH_CAPACITY]>;
90
91    /// Greedily collects a batch of packets from a receiver.
92    /// Starts with the given first packet, then attempts to drain up to `batch_size - 1`
93    /// additional packets without blocking.
94    ///
95    /// This is useful for processing packets in batches to amortize processing overhead.
96    ///
97    /// # Performance
98    ///
99    /// Uses SmallVec to avoid heap allocation for batches up to 32 packets.
100    /// For most real-time audio processing, batches are small (1-8 packets), so this
101    /// avoids allocation in the common case.
102    pub fn batch_packets_greedy(
103        first_packet: Packet,
104        rx: &mut mpsc::Receiver<Packet>,
105        batch_size: usize,
106    ) -> PacketBatch {
107        let mut batch = PacketBatch::new();
108        batch.push(first_packet);
109
110        for _ in 0..batch_size.saturating_sub(1) {
111            match rx.try_recv() {
112                Ok(packet) => batch.push(packet),
113                Err(_) => break,
114            }
115        }
116        batch
117    }
118}