streamkit_core/helpers.rs
1// SPDX-FileCopyrightText: © 2025 StreamKit Contributors
2//
3// SPDX-License-Identifier: MPL-2.0
4
5//! Utility functions for node configuration and packet processing.
6//!
7//! This module provides helper functions that simplify common tasks:
//! - [`config_helpers`]: Parse node configuration from JSON values
9//! - [`packet_helpers`]: Batch packet processing utilities
10
11use crate::error::StreamKitError;
12use crate::types::Packet;
13
14/// Helper functions for parsing node configuration from JSON values.
15pub mod config_helpers {
16 use super::StreamKitError;
17 use serde::Deserialize;
18
19 /// Parses configuration from an optional JSON value, using defaults if not provided.
20 /// This is the preferred approach for nodes with sensible defaults.
21 ///
22 /// # Errors
23 ///
24 /// This function always returns `Ok` in practice, as it uses `Default` when parsing fails.
25 /// The `Result` return type is maintained for API consistency with other config helpers.
26 pub fn parse_config_optional<T>(params: Option<&serde_json::Value>) -> Result<T, StreamKitError>
27 where
28 T: for<'de> Deserialize<'de> + Default,
29 {
30 Ok(serde_json::from_value(params.unwrap_or(&serde_json::Value::Null).clone())
31 .unwrap_or_default())
32 }
33
34 /// Parses configuration from an optional JSON value, returning an error if not provided.
35 /// Use this for nodes that require explicit configuration.
36 ///
37 /// # Errors
38 ///
39 /// Returns `StreamKitError::Configuration` if `params` is `None` or if deserialization fails.
40 pub fn parse_config_required<T>(params: Option<&serde_json::Value>) -> Result<T, StreamKitError>
41 where
42 T: for<'de> Deserialize<'de>,
43 {
44 let value = params
45 .ok_or_else(|| StreamKitError::Configuration("Configuration required".to_string()))?
46 .clone();
47 serde_json::from_value(value)
48 .map_err(|e| StreamKitError::Configuration(format!("Failed to parse config: {e}")))
49 }
50
51 /// Parses configuration with detailed error messages.
52 /// Use this when you want to provide context about what failed to parse.
53 ///
54 /// # Errors
55 ///
56 /// Returns `StreamKitError::Configuration` if `params` is `None` or if deserialization fails.
57 pub fn parse_config_with_context<T>(
58 params: Option<&serde_json::Value>,
59 context: &str,
60 ) -> Result<T, StreamKitError>
61 where
62 T: for<'de> Deserialize<'de>,
63 {
64 params.map_or_else(
65 || Err(StreamKitError::Configuration(format!("{context} configuration required"))),
66 |p| {
67 serde_json::from_value(p.clone()).map_err(|e| {
68 StreamKitError::Configuration(format!("Failed to parse {context}: {e}"))
69 })
70 },
71 )
72 }
73}
74
75/// Helper functions for common packet processing patterns.
76pub mod packet_helpers {
77 use super::Packet;
78 use smallvec::SmallVec;
79 use tokio::sync::mpsc;
80
81 /// Default batch size for stack-allocated SmallVec.
82 ///
83 /// 32 packets fits typical batch processing while avoiding heap allocation.
84 /// Each Packet is ~40 bytes (enum discriminant + largest variant), so 32 packets = ~1.3KB on stack.
85 pub const DEFAULT_BATCH_CAPACITY: usize = 32;
86
87 /// A batch of packets that uses stack allocation for small batches.
88 /// Falls back to heap allocation only if more than DEFAULT_BATCH_CAPACITY packets are collected.
89 pub type PacketBatch = SmallVec<[Packet; DEFAULT_BATCH_CAPACITY]>;
90
91 /// Greedily collects a batch of packets from a receiver.
92 /// Starts with the given first packet, then attempts to drain up to `batch_size - 1`
93 /// additional packets without blocking.
94 ///
95 /// This is useful for processing packets in batches to amortize processing overhead.
96 ///
97 /// # Performance
98 ///
99 /// Uses SmallVec to avoid heap allocation for batches up to 32 packets.
100 /// For most real-time audio processing, batches are small (1-8 packets), so this
101 /// avoids allocation in the common case.
102 pub fn batch_packets_greedy(
103 first_packet: Packet,
104 rx: &mut mpsc::Receiver<Packet>,
105 batch_size: usize,
106 ) -> PacketBatch {
107 let mut batch = PacketBatch::new();
108 batch.push(first_packet);
109
110 for _ in 0..batch_size.saturating_sub(1) {
111 match rx.try_recv() {
112 Ok(packet) => batch.push(packet),
113 Err(_) => break,
114 }
115 }
116 batch
117 }
118}