#![deny(unsafe_code)]
#![warn(clippy::redundant_closure)]
#![warn(clippy::implicit_clone)]
#![warn(clippy::uninlined_format_args)]
#![warn(missing_docs)]
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
#[cfg(feature = "custom-protocol")]
pub use snapcast_proto::CustomMessage;
#[cfg(feature = "encryption")]
pub use snapcast_proto::DEFAULT_ENCRYPTION_PSK;
pub use snapcast_proto::SampleFormat;
pub use snapcast_proto::{DEFAULT_SAMPLE_FORMAT, DEFAULT_STREAM_PORT};
const EVENT_CHANNEL_SIZE: usize = 256;
const COMMAND_CHANNEL_SIZE: usize = 64;
const AUDIO_CHANNEL_SIZE: usize = 256;
const F32_CHANNEL_SIZE: usize = 1;
#[derive(Debug, Clone)]
pub enum AudioData {
F32(Vec<f32>),
Pcm(Vec<u8>),
}
#[derive(Debug, Clone)]
pub struct AudioFrame {
pub data: AudioData,
pub timestamp_usec: i64,
}
pub struct F32AudioSender {
tx: mpsc::Sender<AudioFrame>,
buf: Vec<f32>,
chunk_samples: usize,
channels: u16,
sample_rate: u32,
ts: Option<time::ChunkTimestamper>,
last_send: std::time::Instant,
}
impl F32AudioSender {
fn new(tx: mpsc::Sender<AudioFrame>, sample_rate: u32, channels: u16) -> Self {
let chunk_samples = (sample_rate as usize * 20 / 1000) * channels as usize;
Self {
tx,
buf: Vec::with_capacity(chunk_samples * 2),
chunk_samples,
channels,
sample_rate,
ts: None,
last_send: std::time::Instant::now(),
}
}
pub async fn send(
&mut self,
samples: &[f32],
) -> Result<(), mpsc::error::SendError<AudioFrame>> {
let now = std::time::Instant::now();
if now.duration_since(self.last_send) > std::time::Duration::from_millis(500) {
self.ts = None;
self.buf.clear();
}
self.last_send = now;
self.buf.extend_from_slice(samples);
let ch = self.channels.max(1) as usize;
while self.buf.len() >= self.chunk_samples {
let chunk: Vec<f32> = self.buf.drain(..self.chunk_samples).collect();
let frames = (self.chunk_samples / ch) as u32;
let ts = self
.ts
.get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
let timestamp_usec = ts.next(frames);
self.tx
.send(AudioFrame {
data: AudioData::F32(chunk),
timestamp_usec,
})
.await?;
}
Ok(())
}
pub async fn flush(&mut self) -> Result<(), mpsc::error::SendError<AudioFrame>> {
if self.buf.is_empty() {
return Ok(());
}
let chunk: Vec<f32> = self.buf.drain(..).collect();
let ch = self.channels.max(1) as usize;
let frames = (chunk.len() / ch) as u32;
let ts = self
.ts
.get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
let timestamp_usec = ts.next(frames);
self.tx
.send(AudioFrame {
data: AudioData::F32(chunk),
timestamp_usec,
})
.await
}
}
#[derive(Debug, Clone)]
pub struct WireChunkData {
pub stream_id: String,
pub timestamp_usec: i64,
pub data: Vec<u8>,
}
pub mod auth;
#[cfg(feature = "encryption")]
pub(crate) mod crypto;
pub(crate) mod encoder;
#[cfg(feature = "mdns")]
pub(crate) mod mdns;
pub(crate) mod session;
pub(crate) mod state;
pub mod status;
pub(crate) mod stream;
pub mod time;
#[derive(Debug, Clone)]
pub struct ClientSettingsUpdate {
pub client_id: String,
pub buffer_ms: i32,
pub latency: i32,
pub volume: u16,
pub muted: bool,
}
#[derive(Debug)]
#[non_exhaustive]
pub enum ServerEvent {
ClientConnected {
id: String,
name: String,
mac: String,
},
ClientDisconnected {
id: String,
},
ClientVolumeChanged {
client_id: String,
volume: u16,
muted: bool,
},
ClientLatencyChanged {
client_id: String,
latency: i32,
},
ClientNameChanged {
client_id: String,
name: String,
},
GroupStreamChanged {
group_id: String,
stream_id: String,
},
GroupMuteChanged {
group_id: String,
muted: bool,
},
StreamStatus {
stream_id: String,
status: String,
},
StreamMetaChanged {
stream_id: String,
metadata: std::collections::HashMap<String, serde_json::Value>,
},
GroupNameChanged {
group_id: String,
name: String,
},
ServerUpdated,
StreamControl {
stream_id: String,
command: String,
params: serde_json::Value,
},
#[cfg(feature = "custom-protocol")]
CustomMessage {
client_id: String,
message: snapcast_proto::CustomMessage,
},
}
#[derive(Debug)]
#[non_exhaustive]
pub enum ServerCommand {
SetClientVolume {
client_id: String,
volume: u16,
muted: bool,
},
SetClientLatency {
client_id: String,
latency: i32,
},
SetClientName {
client_id: String,
name: String,
},
SetGroupStream {
group_id: String,
stream_id: String,
},
SetGroupMute {
group_id: String,
muted: bool,
},
SetGroupName {
group_id: String,
name: String,
},
SetGroupClients {
group_id: String,
clients: Vec<String>,
},
DeleteClient {
client_id: String,
},
SetStreamMeta {
stream_id: String,
metadata: std::collections::HashMap<String, serde_json::Value>,
},
AddStream {
uri: String,
response_tx: tokio::sync::oneshot::Sender<Result<String, String>>,
},
RemoveStream {
stream_id: String,
},
StreamControl {
stream_id: String,
command: String,
params: serde_json::Value,
},
GetStatus {
response_tx: tokio::sync::oneshot::Sender<status::ServerStatus>,
},
#[cfg(feature = "custom-protocol")]
SendToClient {
client_id: String,
message: snapcast_proto::CustomMessage,
},
Stop,
}
fn default_codec() -> &'static str {
#[cfg(feature = "flac")]
return "flac";
#[cfg(all(feature = "f32lz4", not(feature = "flac")))]
return "f32lz4";
#[cfg(not(any(feature = "flac", feature = "f32lz4")))]
return "pcm";
}
pub struct ServerConfig {
pub stream_port: u16,
pub buffer_ms: u32,
pub codec: String,
pub sample_format: String,
#[cfg(feature = "mdns")]
pub mdns_service_type: String,
#[cfg(feature = "mdns")]
pub mdns_enabled: bool,
#[cfg(feature = "mdns")]
pub mdns_name: String,
pub auth: Option<std::sync::Arc<dyn auth::AuthValidator>>,
#[cfg(feature = "encryption")]
pub encryption_psk: Option<String>,
pub state_file: Option<std::path::PathBuf>,
pub send_audio_to_muted: bool,
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
stream_port: snapcast_proto::DEFAULT_STREAM_PORT,
buffer_ms: 1000,
codec: default_codec().into(),
sample_format: "48000:16:2".into(),
#[cfg(feature = "mdns")]
mdns_service_type: "_snapcast._tcp.local.".into(),
#[cfg(feature = "mdns")]
mdns_enabled: true,
#[cfg(feature = "mdns")]
mdns_name: "Snapserver".into(),
auth: None,
#[cfg(feature = "encryption")]
encryption_psk: None,
state_file: None,
send_audio_to_muted: false,
}
}
}
#[derive(Debug, Clone, Default)]
pub struct StreamConfig {
pub codec: Option<String>,
pub sample_format: Option<String>,
}
pub struct SnapServer {
config: ServerConfig,
event_tx: mpsc::Sender<ServerEvent>,
command_tx: mpsc::Sender<ServerCommand>,
command_rx: Option<mpsc::Receiver<ServerCommand>>,
streams: Vec<(String, StreamConfig, mpsc::Receiver<AudioFrame>)>,
chunk_tx: broadcast::Sender<WireChunkData>,
}
fn spawn_stream_encoder(
stream_id: String,
mut rx: mpsc::Receiver<AudioFrame>,
mut enc: Box<dyn encoder::Encoder>,
chunk_tx: broadcast::Sender<WireChunkData>,
sample_rate: u32,
channels: u16,
) {
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()
.expect("encoder runtime");
rt.block_on(async {
let mut next_tick: Option<tokio::time::Instant> = None;
while let Some(frame) = rx.recv().await {
if let AudioData::F32(ref samples) = frame.data {
let num_frames = samples.len() / channels.max(1) as usize;
let chunk_dur = std::time::Duration::from_micros(
(num_frames as u64 * 1_000_000) / sample_rate as u64,
);
let now = tokio::time::Instant::now();
let tick = next_tick.get_or_insert(now);
if now.checked_duration_since(*tick + chunk_dur)
> Some(std::time::Duration::from_millis(500))
{
*tick = now;
}
*tick += chunk_dur;
tokio::time::sleep_until(*tick).await;
}
match enc.encode(&frame.data) {
Ok(encoded) if !encoded.data.is_empty() => {
let _ = chunk_tx.send(WireChunkData {
stream_id: stream_id.clone(),
timestamp_usec: frame.timestamp_usec,
data: encoded.data,
});
}
Err(e) => {
tracing::warn!(stream = %stream_id, error = %e, "Encode failed");
}
_ => {} }
}
});
});
}
impl SnapServer {
pub fn new(config: ServerConfig) -> (Self, mpsc::Receiver<ServerEvent>) {
let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
let (chunk_tx, _) = broadcast::channel(256);
let server = Self {
config,
event_tx,
command_tx,
command_rx: Some(command_rx),
streams: Vec::new(),
chunk_tx,
};
(server, event_rx)
}
pub fn add_stream(&mut self, name: &str) -> mpsc::Sender<AudioFrame> {
self.add_stream_with_config(name, StreamConfig::default())
}
pub fn add_f32_stream(&mut self, name: &str) -> Result<F32AudioSender, String> {
let sf: SampleFormat =
self.config.sample_format.parse().map_err(|e| {
format!("invalid sample_format '{}': {e}", self.config.sample_format)
})?;
let (tx, rx) = mpsc::channel(F32_CHANNEL_SIZE);
self.streams
.push((name.to_string(), StreamConfig::default(), rx));
Ok(F32AudioSender::new(tx, sf.rate(), sf.channels()))
}
pub fn add_stream_with_config(
&mut self,
name: &str,
config: StreamConfig,
) -> mpsc::Sender<AudioFrame> {
let (tx, rx) = mpsc::channel(AUDIO_CHANNEL_SIZE);
self.streams.push((name.to_string(), config, rx));
tx
}
pub fn command_sender(&self) -> mpsc::Sender<ServerCommand> {
self.command_tx.clone()
}
pub fn config(&self) -> &ServerConfig {
&self.config
}
pub async fn run(&mut self) -> anyhow::Result<()> {
let mut command_rx = self
.command_rx
.take()
.ok_or_else(|| anyhow::anyhow!("run() already called"))?;
let event_tx = self.event_tx.clone();
let sample_format: snapcast_proto::SampleFormat = self
.config
.sample_format
.parse()
.unwrap_or(snapcast_proto::DEFAULT_SAMPLE_FORMAT);
anyhow::ensure!(
!self.streams.is_empty(),
"No streams configured — call add_stream() before run()"
);
tracing::info!(stream_port = self.config.stream_port, "Snapserver starting");
#[cfg(feature = "mdns")]
let _mdns = if self.config.mdns_enabled {
mdns::MdnsAdvertiser::new(
self.config.stream_port,
&self.config.mdns_service_type,
&self.config.mdns_name,
)
.map_err(|e| tracing::warn!(error = %e, "mDNS advertisement failed"))
.ok()
} else {
None
};
let default_enc_config = encoder::EncoderConfig {
codec: self.config.codec.clone(),
format: sample_format,
options: String::new(),
#[cfg(feature = "encryption")]
encryption_psk: self.config.encryption_psk.clone(),
};
let default_enc = encoder::create(&default_enc_config)?;
let chunk_tx = self.chunk_tx.clone();
let streams = std::mem::take(&mut self.streams);
let mut default_enc = Some(default_enc);
let initial_state = self
.config
.state_file
.as_ref()
.map(|p| state::ServerState::load(p))
.unwrap_or_default();
let shared_state = Arc::new(tokio::sync::Mutex::new(initial_state));
let first_name = streams
.first()
.map(|(n, _, _)| n.clone())
.unwrap_or_default();
let session_srv = Arc::new(session::SessionServer::new(
self.config.stream_port,
self.config.buffer_ms as i32,
self.config.auth.clone(),
Arc::clone(&shared_state),
first_name.clone(),
self.config.send_audio_to_muted,
));
for (name, stream_cfg, rx) in streams {
{
let mut s = shared_state.lock().await;
s.streams.push(state::StreamInfo {
id: name.clone(),
status: "idle".into(),
uri: String::new(),
properties: Default::default(),
});
}
let enc = if stream_cfg.codec.is_none() && stream_cfg.sample_format.is_none() {
if let Some(enc) = default_enc.take() {
enc
} else {
encoder::create(&default_enc_config)?
}
} else {
let stream_codec = stream_cfg.codec.as_deref().unwrap_or(&self.config.codec);
let stream_format: snapcast_proto::SampleFormat = stream_cfg
.sample_format
.as_deref()
.and_then(|s| s.parse().ok())
.unwrap_or(sample_format);
encoder::create(&encoder::EncoderConfig {
codec: stream_codec.to_string(),
format: stream_format,
options: String::new(),
#[cfg(feature = "encryption")]
encryption_psk: self.config.encryption_psk.clone(),
})?
};
tracing::info!(stream = %name, codec = enc.name(), %sample_format, "Stream registered");
session_srv
.register_stream_codec(&name, enc.name(), enc.header())
.await;
spawn_stream_encoder(
name,
rx,
enc,
chunk_tx.clone(),
sample_format.rate(),
sample_format.channels(),
);
}
let session_for_run = Arc::clone(&session_srv);
let session_event_tx = event_tx.clone();
let session_chunk_tx = self.chunk_tx.clone();
let session_handle = tokio::spawn(async move {
if let Err(e) = session_for_run
.run(session_chunk_tx, session_event_tx)
.await
{
tracing::error!(error = %e, "Session server error");
}
});
let state_file = self.config.state_file.clone();
let save_state = |s: &state::ServerState| {
if let Some(ref path) = state_file {
let _ = s
.save(path)
.map_err(|e| tracing::warn!(error = %e, "Failed to save state"));
}
};
loop {
tokio::select! {
cmd = command_rx.recv() => {
match cmd {
Some(ServerCommand::Stop) | None => {
tracing::info!("Server stopped");
session_handle.abort();
return Ok(());
}
Some(ServerCommand::SetClientVolume { client_id, volume, muted }) => {
let mut s = shared_state.lock().await;
if let Some(c) = s.clients.get_mut(&client_id) {
c.config.volume.percent = volume;
c.config.volume.muted = muted;
}
let latency = s.clients.get(&client_id).map(|c| c.config.latency).unwrap_or(0);
save_state(&s);
drop(s);
session_srv.push_settings(ClientSettingsUpdate {
client_id: client_id.clone(),
buffer_ms: self.config.buffer_ms as i32,
latency, volume, muted,
}).await;
let _ = event_tx.try_send(ServerEvent::ClientVolumeChanged { client_id: client_id.clone(), volume, muted });
session_srv.update_routing_for_client(&client_id).await;
}
Some(ServerCommand::SetClientLatency { client_id, latency }) => {
let mut s = shared_state.lock().await;
if let Some(c) = s.clients.get_mut(&client_id) {
c.config.latency = latency;
session_srv.push_settings(ClientSettingsUpdate {
client_id: client_id.clone(),
buffer_ms: self.config.buffer_ms as i32,
latency,
volume: c.config.volume.percent,
muted: c.config.volume.muted,
}).await;
}
save_state(&s);
drop(s);
let _ = event_tx.try_send(ServerEvent::ClientLatencyChanged { client_id, latency });
}
Some(ServerCommand::SetClientName { client_id, name }) => {
let mut s = shared_state.lock().await;
if let Some(c) = s.clients.get_mut(&client_id) {
c.config.name = name.clone();
}
save_state(&s);
drop(s);
let _ = event_tx.try_send(ServerEvent::ClientNameChanged { client_id, name });
}
Some(ServerCommand::SetGroupStream { group_id, stream_id }) => {
let mut s = shared_state.lock().await;
s.set_group_stream(&group_id, &stream_id);
save_state(&s);
drop(s);
let _ = event_tx.try_send(ServerEvent::GroupStreamChanged { group_id: group_id.clone(), stream_id });
session_srv.update_routing_for_group(&group_id).await;
}
Some(ServerCommand::SetGroupMute { group_id, muted }) => {
let mut s = shared_state.lock().await;
if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
g.muted = muted;
}
save_state(&s);
drop(s);
let _ = event_tx.try_send(ServerEvent::GroupMuteChanged { group_id: group_id.clone(), muted });
session_srv.update_routing_for_group(&group_id).await;
}
Some(ServerCommand::SetGroupName { group_id, name }) => {
let mut s = shared_state.lock().await;
if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
g.name = name.clone();
}
save_state(&s);
drop(s);
let _ = event_tx.try_send(ServerEvent::GroupNameChanged { group_id, name });
}
Some(ServerCommand::SetGroupClients { group_id, clients }) => {
let mut s = shared_state.lock().await;
s.set_group_clients(&group_id, &clients);
save_state(&s);
drop(s);
let _ = event_tx.try_send(ServerEvent::ServerUpdated);
session_srv.update_routing_all().await;
}
Some(ServerCommand::DeleteClient { client_id }) => {
let mut s = shared_state.lock().await;
s.remove_client_from_groups(&client_id);
s.clients.remove(&client_id);
save_state(&s);
drop(s);
let _ = event_tx.try_send(ServerEvent::ServerUpdated);
session_srv.update_routing_all().await;
}
Some(ServerCommand::SetStreamMeta { stream_id, metadata }) => {
let mut s = shared_state.lock().await;
if let Some(stream) = s.streams.iter_mut().find(|st| st.id == stream_id) {
stream.properties = metadata.clone();
}
drop(s);
let _ = event_tx.try_send(ServerEvent::StreamMetaChanged { stream_id, metadata });
}
Some(ServerCommand::AddStream { uri, response_tx }) => {
let name = uri.split("name=").nth(1)
.and_then(|s| s.split('&').next())
.unwrap_or("dynamic")
.to_string();
let mut s = shared_state.lock().await;
if s.streams.iter().any(|st| st.id == name) {
let _ = response_tx.send(Err(format!("Stream '{name}' already exists")));
} else {
s.streams.push(state::StreamInfo {
id: name.clone(),
status: "idle".into(),
uri: uri.clone(),
properties: Default::default(),
});
save_state(&s);
drop(s);
let _ = event_tx.try_send(ServerEvent::ServerUpdated);
let _ = response_tx.send(Ok(name));
}
}
Some(ServerCommand::RemoveStream { stream_id }) => {
let mut s = shared_state.lock().await;
s.streams.retain(|st| st.id != stream_id);
for g in &mut s.groups {
if g.stream_id == stream_id {
g.stream_id.clear();
}
}
save_state(&s);
drop(s);
let _ = event_tx.try_send(ServerEvent::ServerUpdated);
session_srv.update_routing_all().await;
}
Some(ServerCommand::StreamControl { stream_id, command, params }) => {
tracing::debug!(stream_id, command, ?params, "Stream control forwarded");
let _ = event_tx.try_send(ServerEvent::StreamControl { stream_id, command, params });
}
Some(ServerCommand::GetStatus { response_tx }) => {
let s = shared_state.lock().await;
let _ = response_tx.send(s.to_status());
}
#[cfg(feature = "custom-protocol")]
Some(ServerCommand::SendToClient { client_id, message }) => {
session_srv.send_custom(&client_id, message.type_id, message.payload).await;
}
}
}
}
}
}
}