Skip to main content

snapcast_server/stream/
manager.rs

1//! Stream manager — owns stream readers, encodes PCM, distributes to clients.
2
3use std::collections::HashMap;
4
5use anyhow::Result;
6use snapcast_proto::SampleFormat;
7use tokio::sync::{broadcast, mpsc};
8use tokio::task::JoinHandle;
9
10use crate::encoder::{self, Encoder};
11use crate::stream;
12
/// One encoded audio chunk, tagged with the stream it came from.
///
/// Values of this type are what the stream manager publishes on its
/// broadcast channel.
#[derive(Clone, Debug)]
pub struct WireChunkData {
    /// Identifier of the stream that produced this chunk.
    pub stream_id: String,
    /// Server-side timestamp of the chunk, in microseconds.
    pub timestamp_usec: i64,
    /// The encoded audio payload.
    pub data: Vec<u8>,
}
23
24/// Info about a managed stream.
25struct ManagedStream {
26    format: SampleFormat,
27    header: Vec<u8>,
28    codec: String,
29    _encode_handle: JoinHandle<()>,
30}
31
32/// Manages all audio streams, encoding, and distribution.
33pub struct StreamManager {
34    streams: HashMap<String, ManagedStream>,
35    /// Broadcast sender for encoded chunks — client sessions subscribe.
36    chunk_tx: broadcast::Sender<WireChunkData>,
37}
38
39impl Default for StreamManager {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
45impl StreamManager {
46    /// Create a new stream manager.
47    pub fn new() -> Self {
48        let (chunk_tx, _) = broadcast::channel(256);
49        Self {
50            streams: HashMap::new(),
51            chunk_tx,
52        }
53    }
54
55    /// Get the broadcast sender (for passing to SessionServer).
56    pub fn chunk_sender(&self) -> broadcast::Sender<WireChunkData> {
57        self.chunk_tx.clone()
58    }
59
60    /// Get a broadcast receiver for encoded chunks.
61    pub fn subscribe(&self) -> broadcast::Receiver<WireChunkData> {
62        self.chunk_tx.subscribe()
63    }
64
65    /// Add a stream from a PCM chunk receiver. The caller owns the reader.
66    pub fn add_stream_from_receiver(
67        &mut self,
68        name: &str,
69        encoder_config: encoder::EncoderConfig,
70        reader_rx: mpsc::Receiver<super::PcmChunk>,
71    ) -> Result<()> {
72        let enc = encoder::create(&encoder_config)?;
73        let header = enc.header().to_vec();
74        let codec_name = enc.name().to_string();
75        let format = encoder_config.format;
76        drop(enc);
77
78        let stream_id = name.to_string();
79        let chunk_tx = self.chunk_tx.clone();
80
81        let encode_handle = {
82            let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>();
83            let enc_config = encoder_config.clone();
84            std::thread::spawn(move || {
85                let Ok(enc) = encoder::create(&enc_config) else {
86                    return;
87                };
88                encode_loop(enc, reader_rx, &chunk_tx, &stream_id);
89                let _ = done_tx.send(());
90            });
91            tokio::spawn(async move {
92                let _ = done_rx.await;
93            })
94        };
95
96        self.streams.insert(
97            name.to_string(),
98            ManagedStream {
99                format,
100                header,
101                codec: codec_name.clone(),
102                _encode_handle: encode_handle,
103            },
104        );
105
106        tracing::info!(name, %format, codec = codec_name, "Stream added");
107        Ok(())
108    }
109
110    /// Get codec header for a stream: (codec_name, header_bytes, format).
111    pub fn header(&self, stream_id: &str) -> Option<(&str, &[u8], SampleFormat)> {
112        self.streams
113            .get(stream_id)
114            .map(|s| (s.codec.as_str(), s.header.as_slice(), s.format))
115    }
116
117    /// List all stream IDs.
118    pub fn stream_ids(&self) -> Vec<String> {
119        self.streams.keys().cloned().collect()
120    }
121}
122
123/// Blocking encode loop — runs on a dedicated thread via spawn_blocking.
124fn encode_loop(
125    mut enc: Box<dyn Encoder>,
126    mut rx: mpsc::Receiver<stream::PcmChunk>,
127    tx: &broadcast::Sender<WireChunkData>,
128    stream_id: &str,
129) {
130    // Track timestamp of first buffered chunk (for codecs that buffer internally)
131    let mut pending_timestamp: Option<i64> = None;
132
133    while let Some(pcm) = rx.blocking_recv() {
134        // Save timestamp of first chunk fed to encoder
135        if pending_timestamp.is_none() {
136            pending_timestamp = Some(pcm.timestamp_usec);
137        }
138
139        match enc.encode(&pcm.data) {
140            Ok(encoded) => {
141                if encoded.data.is_empty() {
142                    // Encoder is buffering — keep the pending timestamp
143                    continue;
144                }
145                let wire = WireChunkData {
146                    stream_id: stream_id.to_string(),
147                    timestamp_usec: pending_timestamp.take().unwrap_or(pcm.timestamp_usec),
148                    data: encoded.data,
149                };
150                let _ = tx.send(wire);
151                // Reset for next frame
152                pending_timestamp = None;
153            }
154            Err(e) => {
155                tracing::warn!(stream_id, error = %e, "Encode failed");
156                pending_timestamp = None;
157            }
158        }
159    }
160}