snapcast_server/stream/
manager.rs1use std::collections::HashMap;
4
5use anyhow::Result;
6use snapcast_proto::SampleFormat;
7use tokio::sync::{broadcast, mpsc};
8use tokio::task::JoinHandle;
9
10use crate::encoder::{self, Encoder};
11use crate::stream;
12
/// One encoded audio chunk, tagged with the stream it came from, as published
/// on the manager's broadcast channel.
#[derive(Debug, Clone)]
pub struct WireChunkData {
    /// Identifier of the stream that produced this chunk.
    pub stream_id: String,
    /// Timestamp in microseconds (per the field name). The encode loop copies
    /// it from the first PCM chunk that contributed to this output.
    pub timestamp_usec: i64,
    /// Encoder output bytes for this chunk.
    pub data: Vec<u8>,
}
23
24struct ManagedStream {
26 format: SampleFormat,
27 header: Vec<u8>,
28 codec: String,
29 _encode_handle: JoinHandle<()>,
30}
31
32pub struct StreamManager {
34 streams: HashMap<String, ManagedStream>,
35 chunk_tx: broadcast::Sender<WireChunkData>,
37}
38
39impl Default for StreamManager {
40 fn default() -> Self {
41 Self::new()
42 }
43}
44
45impl StreamManager {
46 pub fn new() -> Self {
48 let (chunk_tx, _) = broadcast::channel(256);
49 Self {
50 streams: HashMap::new(),
51 chunk_tx,
52 }
53 }
54
55 pub fn chunk_sender(&self) -> broadcast::Sender<WireChunkData> {
57 self.chunk_tx.clone()
58 }
59
60 pub fn subscribe(&self) -> broadcast::Receiver<WireChunkData> {
62 self.chunk_tx.subscribe()
63 }
64
65 pub fn add_stream_from_receiver(
67 &mut self,
68 name: &str,
69 encoder_config: encoder::EncoderConfig,
70 reader_rx: mpsc::Receiver<super::PcmChunk>,
71 ) -> Result<()> {
72 let enc = encoder::create(&encoder_config)?;
73 let header = enc.header().to_vec();
74 let codec_name = enc.name().to_string();
75 let format = encoder_config.format;
76 drop(enc);
77
78 let stream_id = name.to_string();
79 let chunk_tx = self.chunk_tx.clone();
80
81 let encode_handle = {
82 let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>();
83 let enc_config = encoder_config.clone();
84 std::thread::spawn(move || {
85 let Ok(enc) = encoder::create(&enc_config) else {
86 return;
87 };
88 encode_loop(enc, reader_rx, &chunk_tx, &stream_id);
89 let _ = done_tx.send(());
90 });
91 tokio::spawn(async move {
92 let _ = done_rx.await;
93 })
94 };
95
96 self.streams.insert(
97 name.to_string(),
98 ManagedStream {
99 format,
100 header,
101 codec: codec_name.clone(),
102 _encode_handle: encode_handle,
103 },
104 );
105
106 tracing::info!(name, %format, codec = codec_name, "Stream added");
107 Ok(())
108 }
109
110 pub fn header(&self, stream_id: &str) -> Option<(&str, &[u8], SampleFormat)> {
112 self.streams
113 .get(stream_id)
114 .map(|s| (s.codec.as_str(), s.header.as_slice(), s.format))
115 }
116
117 pub fn stream_ids(&self) -> Vec<String> {
119 self.streams.keys().cloned().collect()
120 }
121}
122
123fn encode_loop(
125 mut enc: Box<dyn Encoder>,
126 mut rx: mpsc::Receiver<stream::PcmChunk>,
127 tx: &broadcast::Sender<WireChunkData>,
128 stream_id: &str,
129) {
130 let mut pending_timestamp: Option<i64> = None;
132
133 while let Some(pcm) = rx.blocking_recv() {
134 if pending_timestamp.is_none() {
136 pending_timestamp = Some(pcm.timestamp_usec);
137 }
138
139 match enc.encode(&pcm.data) {
140 Ok(encoded) => {
141 if encoded.data.is_empty() {
142 continue;
144 }
145 let wire = WireChunkData {
146 stream_id: stream_id.to_string(),
147 timestamp_usec: pending_timestamp.take().unwrap_or(pcm.timestamp_usec),
148 data: encoded.data,
149 };
150 let _ = tx.send(wire);
151 pending_timestamp = None;
153 }
154 Err(e) => {
155 tracing::warn!(stream_id, error = %e, "Encode failed");
156 pending_timestamp = None;
157 }
158 }
159 }
160}