use std::collections::HashMap;
use anyhow::Result;
use snapcast_proto::SampleFormat;
use tokio::sync::{broadcast, mpsc};
use tokio::task::JoinHandle;
use crate::encoder::{self, Encoder};
use crate::stream;
/// An encoded chunk tagged with the stream it belongs to.
///
/// This is the item type carried on the `StreamManager` broadcast channel.
#[derive(Debug, Clone)]
pub struct WireChunkData {
/// Identifier of the stream that produced this chunk.
pub stream_id: String,
/// Timestamp associated with the chunk, in microseconds (per the field name).
pub timestamp_usec: i64,
/// Encoded payload bytes.
pub data: Vec<u8>,
}
/// Per-stream bookkeeping stored by `StreamManager` for each registered stream.
struct ManagedStream {
/// Sample format copied from the encoder configuration the stream was added with.
format: SampleFormat,
/// Header bytes reported by the encoder when the stream was added.
header: Vec<u8>,
/// Encoder name reported by the encoder when the stream was added.
codec: String,
/// Handle of the Tokio task that waits for the encode thread's completion signal.
/// Held only to keep the handle with the stream; it is never read.
_encode_handle: JoinHandle<()>,
}
/// Registry of streams plus the broadcast channel their encoded chunks are published on.
pub struct StreamManager {
/// Registered streams, keyed by the name they were added under.
streams: HashMap<String, ManagedStream>,
/// Sending half of the broadcast channel shared by every stream's encode loop.
chunk_tx: broadcast::Sender<WireChunkData>,
}
impl Default for StreamManager {
fn default() -> Self {
Self::new()
}
}
impl StreamManager {
pub fn new() -> Self {
let (chunk_tx, _) = broadcast::channel(256);
Self {
streams: HashMap::new(),
chunk_tx,
}
}
pub fn chunk_sender(&self) -> broadcast::Sender<WireChunkData> {
self.chunk_tx.clone()
}
pub fn subscribe(&self) -> broadcast::Receiver<WireChunkData> {
self.chunk_tx.subscribe()
}
pub fn add_stream_from_receiver(
&mut self,
name: &str,
encoder_config: encoder::EncoderConfig,
reader_rx: mpsc::Receiver<super::PcmChunk>,
) -> Result<()> {
let enc = encoder::create(&encoder_config)?;
let header = enc.header().to_vec();
let codec_name = enc.name().to_string();
let format = encoder_config.format;
drop(enc);
let stream_id = name.to_string();
let chunk_tx = self.chunk_tx.clone();
let encode_handle = {
let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>();
let enc_config = encoder_config.clone();
std::thread::spawn(move || {
let Ok(enc) = encoder::create(&enc_config) else {
return;
};
encode_loop(enc, reader_rx, &chunk_tx, &stream_id);
let _ = done_tx.send(());
});
tokio::spawn(async move {
let _ = done_rx.await;
})
};
self.streams.insert(
name.to_string(),
ManagedStream {
format,
header,
codec: codec_name.clone(),
_encode_handle: encode_handle,
},
);
tracing::info!(name, %format, codec = codec_name, "Stream added");
Ok(())
}
pub fn header(&self, stream_id: &str) -> Option<(&str, &[u8], SampleFormat)> {
self.streams
.get(stream_id)
.map(|s| (s.codec.as_str(), s.header.as_slice(), s.format))
}
pub fn stream_ids(&self) -> Vec<String> {
self.streams.keys().cloned().collect()
}
}
/// Blocking loop that encodes PCM chunks from `rx` and publishes the results on `tx`.
///
/// Runs until `rx.blocking_recv()` returns `None`. Each emitted [`WireChunkData`] carries
/// `stream_id` and the timestamp of the first PCM chunk received since the previous
/// emission (or since the previous encode error). Empty encoder output emits nothing
/// and keeps that timestamp for the next chunk. Encode errors are logged and reset the
/// remembered timestamp. Send errors on `tx` are ignored.
fn encode_loop(
    mut enc: Box<dyn Encoder>,
    mut rx: mpsc::Receiver<stream::PcmChunk>,
    tx: &broadcast::Sender<WireChunkData>,
    stream_id: &str,
) {
    // Timestamp of the earliest PCM chunk not yet covered by an emitted wire chunk.
    let mut first_unsent_ts: Option<i64> = None;

    loop {
        let Some(pcm) = rx.blocking_recv() else {
            break;
        };

        // Remember this chunk's timestamp only if nothing is pending already.
        let chunk_ts = *first_unsent_ts.get_or_insert(pcm.timestamp_usec);

        let encoded = match enc.encode(&pcm.data) {
            Ok(encoded) => encoded,
            Err(e) => {
                tracing::warn!(stream_id, error = %e, "Encode failed");
                first_unsent_ts = None;
                continue;
            }
        };

        // No output for this input: keep the pending timestamp for the next chunk.
        if encoded.data.is_empty() {
            continue;
        }

        first_unsent_ts = None;
        let _ = tx.send(WireChunkData {
            stream_id: stream_id.to_owned(),
            timestamp_usec: chunk_ts,
            data: encoded.data,
        });
    }
}