#![deny(unsafe_code)]
#![warn(clippy::redundant_closure)]
#![warn(clippy::implicit_clone)]
#![warn(clippy::uninlined_format_args)]
#![warn(missing_docs)]
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc};
#[cfg(feature = "custom-protocol")]
pub use snapcast_proto::CustomMessage;
#[cfg(feature = "encryption")]
pub use snapcast_proto::DEFAULT_ENCRYPTION_PSK;
pub use snapcast_proto::SampleFormat;
pub use snapcast_proto::{DEFAULT_SAMPLE_FORMAT, DEFAULT_STREAM_PORT};
/// Capacity of the bounded `ServerEvent` channel created in `SnapServer::new`.
const EVENT_CHANNEL_SIZE: usize = 256;
/// Capacity of the bounded `ServerCommand` channel created in `SnapServer::new`.
const COMMAND_CHANNEL_SIZE: usize = 64;
/// Capacity of each per-stream `AudioFrame` channel created in
/// `SnapServer::add_stream_with_config`.
const AUDIO_CHANNEL_SIZE: usize = 256;
/// Audio payload carried by an [`AudioFrame`] and handed to the stream's encoder.
#[derive(Debug, Clone)]
pub enum AudioData {
/// Samples as 32-bit floats.
// NOTE(review): channel layout (interleaved or planar) is not visible here — confirm with the encoders.
F32(Vec<f32>),
/// Samples as raw bytes.
// NOTE(review): presumably PCM laid out per the stream's sample format — confirm with the encoders.
Pcm(Vec<u8>),
}
/// One unit of audio pushed into a stream through the sender returned by
/// `SnapServer::add_stream`.
#[derive(Debug, Clone)]
pub struct AudioFrame {
/// The samples to encode.
pub data: AudioData,
/// Timestamp of the frame (microseconds, going by the field name). It is copied
/// unchanged onto the [`WireChunkData`] produced from this frame.
pub timestamp_usec: i64,
}
/// An encoded audio chunk, broadcast from a stream's encoder thread to the
/// session server.
#[derive(Debug, Clone)]
pub struct WireChunkData {
/// Name of the stream this chunk belongs to (the name passed to `add_stream`).
pub stream_id: String,
/// Timestamp copied from the originating [`AudioFrame`].
pub timestamp_usec: i64,
/// Encoder output bytes for the frame.
pub data: Vec<u8>,
}
// Provides `auth::AuthValidator`, the trait object accepted by `ServerConfig::auth`.
pub mod auth;
// Only compiled with the `encryption` feature.
#[cfg(feature = "encryption")]
pub mod crypto;
// Crate-internal encoders: `SnapServer::run` uses `encoder::create`,
// `encoder::EncoderConfig` and the `encoder::Encoder` trait.
pub(crate) mod encoder;
// Only compiled with the `mdns` feature; provides `mdns::MdnsAdvertiser`.
#[cfg(feature = "mdns")]
pub mod mdns;
// Provides `session::SessionServer`, which `SnapServer::run` spawns.
pub mod session;
// Provides `state::ServerState`, the client/group model mutated by `ServerCommand`s.
pub mod state;
// NOTE(review): not referenced from this file; purpose not visible here.
pub(crate) mod stream;
// NOTE(review): not referenced from this file; purpose not visible here.
pub mod time;
/// Settings snapshot that `SnapServer::run` hands to the session server
/// (`push_settings`) when a client's volume or latency is changed.
#[derive(Debug, Clone)]
pub struct ClientSettingsUpdate {
/// Identifier of the client the settings are for.
pub client_id: String,
/// Buffer size, taken from `ServerConfig::buffer_ms`.
pub buffer_ms: i32,
/// Client latency value.
// NOTE(review): unit is presumably milliseconds — confirm against the protocol crate.
pub latency: i32,
/// Volume, stored in server state as `volume.percent`.
pub volume: u16,
/// Whether the client is muted.
pub muted: bool,
}
/// Notifications delivered on the receiver returned by `SnapServer::new`.
///
/// Events sent from `SnapServer::run` use `try_send`, so they are dropped when
/// the channel is full or the receiver is gone.
#[derive(Debug)]
pub enum ServerEvent {
/// A client connected.
// NOTE(review): not emitted in this file; presumably sent by the session server,
// which receives a clone of the event sender.
ClientConnected {
/// Client identifier.
id: String,
/// Client name.
name: String,
/// Client MAC address.
mac: String,
},
/// A client disconnected.
// NOTE(review): not emitted in this file; presumably sent by the session server.
ClientDisconnected {
/// Client identifier.
id: String,
},
/// Emitted after a `ServerCommand::SetClientVolume` has been processed.
ClientVolumeChanged {
/// Client identifier from the command.
client_id: String,
/// New volume.
volume: u16,
/// New mute flag.
muted: bool,
},
/// Emitted after a `ServerCommand::SetClientLatency` has been processed.
ClientLatencyChanged {
/// Client identifier from the command.
client_id: String,
/// New latency.
latency: i32,
},
/// Emitted after a `ServerCommand::SetClientName` has been processed.
ClientNameChanged {
/// Client identifier from the command.
client_id: String,
/// New name.
name: String,
},
/// Emitted after a `ServerCommand::SetGroupStream` has been processed.
GroupStreamChanged {
/// Group identifier from the command.
group_id: String,
/// Stream now assigned to the group.
stream_id: String,
},
/// Emitted after a `ServerCommand::SetGroupMute` has been processed.
GroupMuteChanged {
/// Group identifier from the command.
group_id: String,
/// New mute flag.
muted: bool,
},
/// Status of a stream.
// NOTE(review): not emitted in this file; the set of `status` values is not visible here.
StreamStatus {
/// Stream identifier.
stream_id: String,
/// Status text.
status: String,
},
/// Emitted after a `ServerCommand::SetGroupName` has been processed.
GroupNameChanged {
/// Group identifier from the command.
group_id: String,
/// New name.
name: String,
},
/// Emitted after `ServerCommand::SetGroupClients` or
/// `ServerCommand::DeleteClient` has been processed.
ServerUpdated,
/// A custom protocol message associated with a client (feature `custom-protocol`).
// NOTE(review): not emitted in this file; presumably forwarded by the session server.
#[cfg(feature = "custom-protocol")]
CustomMessage {
/// Client identifier.
client_id: String,
/// The custom message.
message: snapcast_proto::CustomMessage,
},
}
/// Commands accepted by `SnapServer::run` through the sender returned by
/// `SnapServer::command_sender`.
///
/// The matching [`ServerEvent`] is emitted even when the referenced client or
/// group is not present in the server state.
#[derive(Debug)]
pub enum ServerCommand {
/// Store a client's volume and mute flag, push the settings to the client's
/// session and emit `ServerEvent::ClientVolumeChanged`.
SetClientVolume {
/// Target client.
client_id: String,
/// New volume.
volume: u16,
/// New mute flag.
muted: bool,
},
/// Store a client's latency, push the settings to the client's session (only
/// when the client is known) and emit `ServerEvent::ClientLatencyChanged`.
SetClientLatency {
/// Target client.
client_id: String,
/// New latency.
latency: i32,
},
/// Store a client's name and emit `ServerEvent::ClientNameChanged`.
SetClientName {
/// Target client.
client_id: String,
/// New name.
name: String,
},
/// Assign a stream to a group and emit `ServerEvent::GroupStreamChanged`.
SetGroupStream {
/// Target group.
group_id: String,
/// Stream to assign.
stream_id: String,
},
/// Store a group's mute flag and emit `ServerEvent::GroupMuteChanged`.
SetGroupMute {
/// Target group.
group_id: String,
/// New mute flag.
muted: bool,
},
/// Store a group's name and emit `ServerEvent::GroupNameChanged`.
SetGroupName {
/// Target group.
group_id: String,
/// New name.
name: String,
},
/// Remove the listed clients from whatever groups they are in, replace the
/// target group's client list with them (when the group exists), and emit
/// `ServerEvent::ServerUpdated`.
SetGroupClients {
/// Target group.
group_id: String,
/// Client identifiers that should make up the group.
clients: Vec<String>,
},
/// Remove a client from its groups and from the server state, then emit
/// `ServerEvent::ServerUpdated`.
DeleteClient {
/// Client to remove.
client_id: String,
},
/// Request the server state as JSON.
GetStatus {
/// Receives the result of `ServerState::to_status_json`.
response_tx: tokio::sync::oneshot::Sender<serde_json::Value>,
},
/// Send a custom protocol message to a client (feature `custom-protocol`).
#[cfg(feature = "custom-protocol")]
SendToClient {
/// Target client.
client_id: String,
/// Message whose `type_id` and `payload` are forwarded to the session server.
message: snapcast_proto::CustomMessage,
},
/// Abort the session server task and make `SnapServer::run` return `Ok(())`.
Stop,
}
/// Picks the codec name used by `ServerConfig::default`, based on the
/// enabled cargo features.
///
/// Preference order: `"flac"` when the `flac` feature is on, otherwise
/// `"f32lz4"` when the `f32lz4` feature is on, otherwise `"pcm"`.
fn default_codec() -> &'static str {
    // `cfg!` is resolved at compile time, so only one branch survives.
    if cfg!(feature = "flac") {
        "flac"
    } else if cfg!(feature = "f32lz4") {
        "f32lz4"
    } else {
        "pcm"
    }
}
/// Configuration for a [`SnapServer`]. See the `Default` impl for default values.
pub struct ServerConfig {
/// Port handed to the session server (and advertised via mDNS when the
/// `mdns` feature is enabled).
pub stream_port: u16,
/// Buffer size in milliseconds, passed to the session server and included in
/// every client settings update.
pub buffer_ms: u32,
/// Name of the default codec used for streams that do not override it.
pub codec: String,
/// Default sample format as a string, e.g. `48000:16:2`. It is parsed into
/// `snapcast_proto::SampleFormat` in `SnapServer::run`; an unparsable value
/// falls back to `DEFAULT_SAMPLE_FORMAT`.
// NOTE(review): the string presumably reads rate:bits:channels — confirm in snapcast_proto.
pub sample_format: String,
/// Service type passed to the mDNS advertiser (used only with the `mdns` feature).
pub mdns_service_type: String,
/// Optional validator passed to the session server; `None` by default.
pub auth: Option<std::sync::Arc<dyn auth::AuthValidator>>,
/// Optional pre-shared key passed to the encoders (feature `encryption`).
#[cfg(feature = "encryption")]
pub encryption_psk: Option<String>,
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
stream_port: snapcast_proto::DEFAULT_STREAM_PORT,
buffer_ms: 1000,
codec: default_codec().into(),
sample_format: "48000:16:2".into(),
mdns_service_type: "_snapcast._tcp.local.".into(),
auth: None,
#[cfg(feature = "encryption")]
encryption_psk: None,
}
}
}
/// Per-stream overrides for `SnapServer::add_stream_with_config`.
///
/// When both fields are `None` the stream uses the server's default encoder
/// configuration.
#[derive(Debug, Clone, Default)]
pub struct StreamConfig {
/// Codec name for this stream; `None` uses `ServerConfig::codec`.
pub codec: Option<String>,
/// Sample format string for this stream; `None` (or a value that fails to
/// parse) uses the server's sample format.
pub sample_format: Option<String>,
}
/// The server: owns the configuration, the registered streams and the
/// channels that connect the caller, the encoder threads and the session server.
pub struct SnapServer {
// Configuration supplied to `new`.
config: ServerConfig,
// Sending half of the event channel; the receiving half is returned by `new`.
event_tx: mpsc::Sender<ServerEvent>,
// Sending half of the command channel; cloned out by `command_sender`.
command_tx: mpsc::Sender<ServerCommand>,
// Receiving half of the command channel; taken (left as `None`) by `run`.
command_rx: Option<mpsc::Receiver<ServerCommand>>,
// Streams registered so far: (name, overrides, audio receiver). Drained by `run`.
streams: Vec<(String, StreamConfig, mpsc::Receiver<AudioFrame>)>,
// Broadcast sender shared by the encoder threads and the session server.
chunk_tx: broadcast::Sender<WireChunkData>,
}
/// Spawns a dedicated OS thread that encodes one stream.
///
/// The thread receives [`AudioFrame`]s from `rx`, feeds `frame.data` to `enc`
/// and broadcasts every non-empty encoder output on `chunk_tx` as a
/// [`WireChunkData`] tagged with `stream_id` and the frame's timestamp.
/// It exits once every sender for `rx` has been dropped.
///
/// * An empty encoder output is skipped without sending anything.
/// * An encode error is logged as a warning and the loop continues.
/// * A failed broadcast send is ignored.
fn spawn_stream_encoder(
    stream_id: String,
    mut rx: mpsc::Receiver<AudioFrame>,
    mut enc: Box<dyn encoder::Encoder>,
    chunk_tx: broadcast::Sender<WireChunkData>,
) {
    std::thread::spawn(move || {
        // This is a plain thread, so block on the channel directly instead of
        // building a per-stream Tokio runtime (whose construction could panic)
        // just to await `recv()`.
        while let Some(frame) = rx.blocking_recv() {
            match enc.encode(&frame.data) {
                Ok(encoded) if encoded.data.is_empty() => {}
                Ok(encoded) => {
                    // Best effort: the send result is deliberately discarded.
                    let _ = chunk_tx.send(WireChunkData {
                        stream_id: stream_id.clone(),
                        timestamp_usec: frame.timestamp_usec,
                        data: encoded.data,
                    });
                }
                Err(e) => {
                    tracing::warn!(stream = %stream_id, error = %e, "Encode failed");
                }
            }
        }
    });
}
impl SnapServer {
    /// Creates a server from `config`.
    ///
    /// Returns the server together with the receiving half of the event
    /// channel on which [`ServerEvent`]s are delivered.
    pub fn new(config: ServerConfig) -> (Self, mpsc::Receiver<ServerEvent>) {
        /// Capacity of the broadcast channel carrying encoded chunks.
        const CHUNK_CHANNEL_SIZE: usize = 256;

        let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
        let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
        // The initial receiver is dropped; receivers subscribe from the sender.
        let (chunk_tx, _) = broadcast::channel(CHUNK_CHANNEL_SIZE);
        let server = Self {
            config,
            event_tx,
            command_tx,
            command_rx: Some(command_rx),
            streams: Vec::new(),
            chunk_tx,
        };
        (server, event_rx)
    }

    /// Registers a stream named `name` with the server's default codec and
    /// sample format, returning the sender used to push [`AudioFrame`]s into it.
    pub fn add_stream(&mut self, name: &str) -> mpsc::Sender<AudioFrame> {
        self.add_stream_with_config(name, StreamConfig::default())
    }

    /// Registers a stream named `name` with per-stream overrides, returning
    /// the sender used to push [`AudioFrame`]s into it.
    pub fn add_stream_with_config(
        &mut self,
        name: &str,
        config: StreamConfig,
    ) -> mpsc::Sender<AudioFrame> {
        let (tx, rx) = mpsc::channel(AUDIO_CHANNEL_SIZE);
        self.streams.push((name.to_string(), config, rx));
        tx
    }

    /// Returns a new sender for issuing [`ServerCommand`]s to a running server.
    pub fn command_sender(&self) -> mpsc::Sender<ServerCommand> {
        self.command_tx.clone()
    }

    /// Returns the configuration this server was created with.
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }

    /// Runs the server until [`ServerCommand::Stop`] is received or every
    /// command sender has been dropped.
    ///
    /// Creates one encoder per registered stream, spawns an encoder thread for
    /// each, spawns the session server task, and then processes commands.
    ///
    /// # Errors
    ///
    /// * `run()` was already started once on this server.
    /// * No stream has been registered.
    /// * An encoder could not be created.
    ///
    /// Nothing is consumed or spawned before all of these checks have passed,
    /// so a failed call can be retried after fixing the configuration.
    pub async fn run(&mut self) -> anyhow::Result<()> {
        // Validate before consuming anything so an early failure is retriable.
        anyhow::ensure!(self.command_rx.is_some(), "run() already called");
        anyhow::ensure!(
            !self.streams.is_empty(),
            "No streams configured — call add_stream() before run()"
        );

        let event_tx = self.event_tx.clone();
        let sample_format = match self
            .config
            .sample_format
            .parse::<snapcast_proto::SampleFormat>()
        {
            Ok(format) => format,
            Err(_) => {
                // Keep the lenient fallback, but make it visible.
                tracing::warn!(
                    sample_format = %self.config.sample_format,
                    "Invalid sample format in config, using default"
                );
                snapcast_proto::DEFAULT_SAMPLE_FORMAT
            }
        };

        tracing::info!(stream_port = self.config.stream_port, "Snapserver starting");

        // Kept alive in a binding for the rest of `run()`; failure is non-fatal.
        #[cfg(feature = "mdns")]
        let _mdns =
            mdns::MdnsAdvertiser::new(self.config.stream_port, &self.config.mdns_service_type)
                .map_err(|e| tracing::warn!(error = %e, "mDNS advertisement failed"))
                .ok();

        let default_enc_config = encoder::EncoderConfig {
            codec: self.config.codec.clone(),
            format: sample_format,
            options: String::new(),
            #[cfg(feature = "encryption")]
            encryption_psk: self.config.encryption_psk.clone(),
        };
        let default_enc = encoder::create(&default_enc_config)?;
        // NOTE(review): the session server only receives the default encoder's
        // codec name and header, even when streams override the codec.
        let codec = default_enc.name().to_string();
        let codec_header = default_enc.header().to_vec();

        // Build every encoder up front: all fallible work happens before any
        // stream is consumed or any thread is spawned.
        let mut default_enc = Some(default_enc);
        let mut encoders = Vec::with_capacity(self.streams.len());
        for (name, stream_cfg, _) in &self.streams {
            let uses_defaults = stream_cfg.codec.is_none() && stream_cfg.sample_format.is_none();
            let (enc, stream_format) = if uses_defaults {
                // The first default stream reuses the encoder created above.
                let enc = match default_enc.take() {
                    Some(enc) => enc,
                    None => encoder::create(&default_enc_config)?,
                };
                (enc, sample_format)
            } else {
                let stream_codec = stream_cfg.codec.as_deref().unwrap_or(&self.config.codec);
                let stream_format = match stream_cfg.sample_format.as_deref() {
                    None => sample_format,
                    Some(raw) => raw
                        .parse::<snapcast_proto::SampleFormat>()
                        .unwrap_or_else(|_| {
                            tracing::warn!(
                                stream = %name,
                                sample_format = raw,
                                "Invalid stream sample format, using server default"
                            );
                            sample_format
                        }),
                };
                let enc = encoder::create(&encoder::EncoderConfig {
                    codec: stream_codec.to_string(),
                    format: stream_format,
                    options: String::new(),
                    #[cfg(feature = "encryption")]
                    encryption_psk: self.config.encryption_psk.clone(),
                })?;
                (enc, stream_format)
            };
            encoders.push((enc, stream_format));
        }

        // Point of no return: consume the command receiver and the streams.
        let mut command_rx = self
            .command_rx
            .take()
            .ok_or_else(|| anyhow::anyhow!("run() already called"))?;
        let chunk_tx = self.chunk_tx.clone();
        let streams = std::mem::take(&mut self.streams);
        for ((name, _stream_cfg, rx), (enc, stream_format)) in streams.into_iter().zip(encoders) {
            // Log the format this stream actually uses, not the server default.
            tracing::info!(
                stream = %name,
                codec = enc.name(),
                sample_format = %stream_format,
                "Stream registered"
            );
            spawn_stream_encoder(name, rx, enc, chunk_tx.clone());
        }

        // Saturate instead of wrapping negative for values above `i32::MAX`.
        let buffer_ms = self.config.buffer_ms.min(i32::MAX as u32) as i32;

        let session_srv = Arc::new(session::SessionServer::new(
            self.config.stream_port,
            buffer_ms,
            self.config.auth.clone(),
        ));
        let session_for_run = Arc::clone(&session_srv);
        let session_event_tx = event_tx.clone();
        let session_chunk_tx = self.chunk_tx.clone();
        let session_handle = tokio::spawn(async move {
            if let Err(e) = session_for_run
                .run(session_chunk_tx, codec, codec_header, session_event_tx)
                .await
            {
                tracing::error!(error = %e, "Session server error");
            }
        });

        let shared_state = Arc::new(tokio::sync::Mutex::new(state::ServerState::default()));

        // Events are best effort: `try_send` results are deliberately ignored.
        loop {
            match command_rx.recv().await {
                Some(ServerCommand::Stop) | None => {
                    tracing::info!("Server stopped");
                    session_handle.abort();
                    return Ok(());
                }
                Some(ServerCommand::SetClientVolume { client_id, volume, muted }) => {
                    let mut s = shared_state.lock().await;
                    // Forward the stored latency so a volume change does not
                    // reset it on the client; unknown clients keep the old 0.
                    let latency = match s.clients.get_mut(&client_id) {
                        Some(c) => {
                            c.config.volume.percent = volume;
                            c.config.volume.muted = muted;
                            c.config.latency
                        }
                        None => 0,
                    };
                    // Release the state lock before awaiting the session server.
                    drop(s);
                    session_srv
                        .push_settings(ClientSettingsUpdate {
                            client_id: client_id.clone(),
                            buffer_ms,
                            latency,
                            volume,
                            muted,
                        })
                        .await;
                    let _ = event_tx.try_send(ServerEvent::ClientVolumeChanged {
                        client_id,
                        volume,
                        muted,
                    });
                }
                Some(ServerCommand::SetClientLatency { client_id, latency }) => {
                    let mut s = shared_state.lock().await;
                    let current_volume = s.clients.get_mut(&client_id).map(|c| {
                        c.config.latency = latency;
                        (c.config.volume.percent, c.config.volume.muted)
                    });
                    // Release the state lock before awaiting the session server.
                    drop(s);
                    // Settings are pushed only for clients present in the state.
                    if let Some((volume, muted)) = current_volume {
                        session_srv
                            .push_settings(ClientSettingsUpdate {
                                client_id: client_id.clone(),
                                buffer_ms,
                                latency,
                                volume,
                                muted,
                            })
                            .await;
                    }
                    let _ = event_tx
                        .try_send(ServerEvent::ClientLatencyChanged { client_id, latency });
                }
                Some(ServerCommand::SetClientName { client_id, name }) => {
                    let mut s = shared_state.lock().await;
                    if let Some(c) = s.clients.get_mut(&client_id) {
                        c.config.name = name.clone();
                    }
                    let _ = event_tx.try_send(ServerEvent::ClientNameChanged { client_id, name });
                }
                Some(ServerCommand::SetGroupStream { group_id, stream_id }) => {
                    shared_state
                        .lock()
                        .await
                        .set_group_stream(&group_id, &stream_id);
                    let _ = event_tx
                        .try_send(ServerEvent::GroupStreamChanged { group_id, stream_id });
                }
                Some(ServerCommand::SetGroupMute { group_id, muted }) => {
                    let mut s = shared_state.lock().await;
                    if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
                        g.muted = muted;
                    }
                    let _ = event_tx.try_send(ServerEvent::GroupMuteChanged { group_id, muted });
                }
                Some(ServerCommand::SetGroupName { group_id, name }) => {
                    let mut s = shared_state.lock().await;
                    if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
                        g.name = name.clone();
                    }
                    let _ = event_tx.try_send(ServerEvent::GroupNameChanged { group_id, name });
                }
                Some(ServerCommand::SetGroupClients { group_id, clients }) => {
                    let mut s = shared_state.lock().await;
                    // NOTE(review): clients are detached from their groups even
                    // when `group_id` turns out not to exist — confirm intended.
                    for cid in &clients {
                        s.remove_client_from_groups(cid);
                    }
                    if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
                        g.clients = clients;
                    }
                    let _ = event_tx.try_send(ServerEvent::ServerUpdated);
                }
                Some(ServerCommand::DeleteClient { client_id }) => {
                    let mut s = shared_state.lock().await;
                    s.remove_client_from_groups(&client_id);
                    s.clients.remove(&client_id);
                    drop(s);
                    let _ = event_tx.try_send(ServerEvent::ServerUpdated);
                }
                Some(ServerCommand::GetStatus { response_tx }) => {
                    let s = shared_state.lock().await;
                    // The requester may already have gone away; ignore that.
                    let _ = response_tx.send(s.to_status_json());
                }
                #[cfg(feature = "custom-protocol")]
                Some(ServerCommand::SendToClient { client_id, message }) => {
                    session_srv
                        .send_custom(&client_id, message.type_id, message.payload)
                        .await;
                }
            }
        }
    }
}