#![deny(unsafe_code)]
#![warn(clippy::redundant_closure)]
#![warn(clippy::implicit_clone)]
#![warn(clippy::uninlined_format_args)]
#![warn(missing_docs)]
use std::sync::Arc;
use tokio::sync::mpsc;
#[cfg(feature = "custom-protocol")]
pub use snapcast_proto::CustomMessage;
pub use snapcast_proto::SampleFormat;
pub use snapcast_proto::{DEFAULT_SAMPLE_FORMAT, DEFAULT_STREAM_PORT};
const EVENT_CHANNEL_SIZE: usize = 256;
const COMMAND_CHANNEL_SIZE: usize = 64;
const AUDIO_CHANNEL_SIZE: usize = 256;
#[derive(Debug)]
pub struct AudioFrame {
pub samples: Vec<f32>,
pub sample_rate: u32,
pub channels: u16,
pub timestamp_usec: i64,
}
pub mod auth;
#[cfg(feature = "encryption")]
pub mod crypto;
pub mod encoder;
#[cfg(feature = "mdns")]
pub mod mdns;
pub mod session;
pub mod state;
pub mod stream;
pub mod time;
#[derive(Debug, Clone)]
pub struct ClientSettingsUpdate {
pub client_id: String,
pub buffer_ms: i32,
pub latency: i32,
pub volume: u16,
pub muted: bool,
}
#[derive(Debug)]
pub enum ServerEvent {
ClientConnected {
id: String,
name: String,
},
ClientDisconnected {
id: String,
},
ClientVolumeChanged {
client_id: String,
volume: u16,
muted: bool,
},
ClientLatencyChanged {
client_id: String,
latency: i32,
},
ClientNameChanged {
client_id: String,
name: String,
},
GroupStreamChanged {
group_id: String,
stream_id: String,
},
GroupMuteChanged {
group_id: String,
muted: bool,
},
StreamStatus {
stream_id: String,
status: String,
},
#[cfg(feature = "custom-protocol")]
CustomMessage {
client_id: String,
message: snapcast_proto::CustomMessage,
},
}
#[derive(Debug)]
pub enum ServerCommand {
SetClientVolume {
client_id: String,
volume: u16,
muted: bool,
},
SetClientLatency {
client_id: String,
latency: i32,
},
SetClientName {
client_id: String,
name: String,
},
SetGroupStream {
group_id: String,
stream_id: String,
},
SetGroupMute {
group_id: String,
muted: bool,
},
SetGroupName {
group_id: String,
name: String,
},
SetGroupClients {
group_id: String,
clients: Vec<String>,
},
DeleteClient {
client_id: String,
},
GetStatus {
response_tx: tokio::sync::oneshot::Sender<serde_json::Value>,
},
#[cfg(feature = "custom-protocol")]
SendToClient {
client_id: String,
message: snapcast_proto::CustomMessage,
},
Stop,
}
fn default_codec() -> &'static str {
#[cfg(feature = "flac")]
return "flac";
#[cfg(all(feature = "f32lz4", not(feature = "flac")))]
return "f32lz4";
#[cfg(not(any(feature = "flac", feature = "f32lz4")))]
return "pcm";
}
pub struct ServerConfig {
pub stream_port: u16,
pub buffer_ms: u32,
pub codec: String,
pub sample_format: String,
pub mdns_service_type: String,
pub auth: Option<std::sync::Arc<dyn auth::AuthValidator>>,
#[cfg(feature = "encryption")]
pub encryption_psk: Option<String>,
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
stream_port: snapcast_proto::DEFAULT_STREAM_PORT,
buffer_ms: 1000,
codec: default_codec().into(),
sample_format: "48000:16:2".into(),
mdns_service_type: "_snapcast._tcp.local.".into(),
auth: None,
#[cfg(feature = "encryption")]
encryption_psk: None,
}
}
}
pub struct SnapServer {
config: ServerConfig,
event_tx: mpsc::Sender<ServerEvent>,
command_tx: mpsc::Sender<ServerCommand>,
command_rx: Option<mpsc::Receiver<ServerCommand>>,
audio_rx: Option<mpsc::Receiver<crate::AudioFrame>>,
manager: Option<stream::manager::StreamManager>,
}
impl SnapServer {
pub fn new(
config: ServerConfig,
) -> (
Self,
mpsc::Receiver<ServerEvent>,
mpsc::Sender<crate::AudioFrame>,
) {
let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
let (audio_tx, audio_rx) = mpsc::channel(AUDIO_CHANNEL_SIZE);
let server = Self {
config,
event_tx,
command_tx,
command_rx: Some(command_rx),
audio_rx: Some(audio_rx),
manager: None,
};
(server, event_rx, audio_tx)
}
pub fn set_manager(&mut self, manager: stream::manager::StreamManager) {
self.manager = Some(manager);
}
pub fn command_sender(&self) -> mpsc::Sender<ServerCommand> {
self.command_tx.clone()
}
pub fn config(&self) -> &ServerConfig {
&self.config
}
pub async fn run(&mut self) -> anyhow::Result<()> {
let mut command_rx = self
.command_rx
.take()
.ok_or_else(|| anyhow::anyhow!("run() already called"))?;
let mut audio_rx = self
.audio_rx
.take()
.ok_or_else(|| anyhow::anyhow!("run() already called"))?;
let event_tx = self.event_tx.clone();
tracing::info!(stream_port = self.config.stream_port, "Snapserver starting");
#[cfg(feature = "mdns")]
let _mdns =
mdns::MdnsAdvertiser::new(self.config.stream_port, &self.config.mdns_service_type)
.map_err(|e| tracing::warn!(error = %e, "mDNS advertisement failed"))
.ok();
let manager = self.manager.take().unwrap_or_default();
let default_format = snapcast_proto::DEFAULT_SAMPLE_FORMAT;
let first_stream = manager.stream_ids().into_iter().next().unwrap_or_default();
let (codec, header) = if let Some((c, h, _)) = manager.header(&first_stream) {
(c.to_string(), h.to_vec())
} else {
let enc_config = encoder::EncoderConfig {
codec: self.config.codec.clone(),
format: default_format,
options: String::new(),
#[cfg(feature = "encryption")]
encryption_psk: self.config.encryption_psk.clone(),
};
let enc = encoder::create(&enc_config)?;
(self.config.codec.clone(), enc.header().to_vec())
};
let chunk_sender = manager.chunk_sender();
let audio_chunk_sender = chunk_sender.clone();
let session_srv = Arc::new(session::SessionServer::new(
self.config.stream_port,
self.config.buffer_ms as i32,
self.config.auth.clone(),
));
let session_for_run = Arc::clone(&session_srv);
let session_event_tx = event_tx.clone();
let session_handle = tokio::spawn(async move {
if let Err(e) = session_for_run
.run(chunk_sender, codec, header, session_event_tx)
.await
{
tracing::error!(error = %e, "Session server error");
}
});
let shared_state = Arc::new(tokio::sync::Mutex::new(state::ServerState::default()));
loop {
tokio::select! {
cmd = command_rx.recv() => {
match cmd {
Some(ServerCommand::Stop) | None => {
tracing::info!("Server stopped");
session_handle.abort();
return Ok(());
}
Some(ServerCommand::SetClientVolume { client_id, volume, muted }) => {
let mut s = shared_state.lock().await;
if let Some(c) = s.clients.get_mut(&client_id) {
c.config.volume.percent = volume;
c.config.volume.muted = muted;
}
session_srv.push_settings(ClientSettingsUpdate {
client_id: client_id.clone(),
buffer_ms: self.config.buffer_ms as i32,
latency: 0, volume, muted,
}).await;
let _ = event_tx.try_send(ServerEvent::ClientVolumeChanged { client_id, volume, muted });
}
Some(ServerCommand::SetClientLatency { client_id, latency }) => {
let mut s = shared_state.lock().await;
if let Some(c) = s.clients.get_mut(&client_id) {
c.config.latency = latency;
session_srv.push_settings(ClientSettingsUpdate {
client_id: client_id.clone(),
buffer_ms: self.config.buffer_ms as i32,
latency,
volume: c.config.volume.percent,
muted: c.config.volume.muted,
}).await;
}
let _ = event_tx.try_send(ServerEvent::ClientLatencyChanged { client_id, latency });
}
Some(ServerCommand::SetClientName { client_id, name }) => {
let mut s = shared_state.lock().await;
if let Some(c) = s.clients.get_mut(&client_id) {
c.config.name = name.clone();
}
let _ = event_tx.try_send(ServerEvent::ClientNameChanged { client_id, name });
}
Some(ServerCommand::SetGroupStream { group_id, stream_id }) => {
shared_state.lock().await.set_group_stream(&group_id, &stream_id);
let _ = event_tx.try_send(ServerEvent::GroupStreamChanged { group_id, stream_id });
}
Some(ServerCommand::SetGroupMute { group_id, muted }) => {
let mut s = shared_state.lock().await;
if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
g.muted = muted;
}
let _ = event_tx.try_send(ServerEvent::GroupMuteChanged { group_id, muted });
}
Some(ServerCommand::SetGroupName { group_id, name }) => {
let mut s = shared_state.lock().await;
if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
g.name = name;
}
}
Some(ServerCommand::SetGroupClients { group_id, clients }) => {
let mut s = shared_state.lock().await;
for cid in &clients {
s.remove_client_from_groups(cid);
}
if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
g.clients = clients;
}
}
Some(ServerCommand::DeleteClient { client_id }) => {
let mut s = shared_state.lock().await;
s.remove_client_from_groups(&client_id);
s.clients.remove(&client_id);
}
Some(ServerCommand::GetStatus { response_tx }) => {
let s = shared_state.lock().await;
let _ = response_tx.send(s.to_status_json());
}
#[cfg(feature = "custom-protocol")]
Some(ServerCommand::SendToClient { client_id, message }) => {
session_srv.send_custom(&client_id, message.type_id, message.payload).await;
}
}
}
frame = audio_rx.recv() => {
if let Some(frame) = frame {
let data = if self.config.codec == "f32lz4" {
frame.samples.iter().flat_map(|s| s.to_le_bytes()).collect()
} else {
let mut pcm = Vec::with_capacity(frame.samples.len() * 2);
for &s in &frame.samples {
let i = (s.clamp(-1.0, 1.0) * i16::MAX as f32) as i16;
pcm.extend_from_slice(&i.to_le_bytes());
}
pcm
};
let wire = stream::manager::WireChunkData {
stream_id: "external".into(),
timestamp_usec: frame.timestamp_usec,
data,
};
let _ = audio_chunk_sender.send(wire);
}
}
}
}
}
}