1#![deny(unsafe_code)]
2#![warn(clippy::redundant_closure)]
3#![warn(clippy::implicit_clone)]
4#![warn(clippy::uninlined_format_args)]
5#![warn(missing_docs)]
6
7use std::sync::Arc;
45
46use tokio::sync::mpsc;
47
48#[cfg(feature = "custom-protocol")]
50pub use snapcast_proto::CustomMessage;
51pub use snapcast_proto::SampleFormat;
52pub use snapcast_proto::{DEFAULT_SAMPLE_FORMAT, DEFAULT_STREAM_PORT};
53
/// Capacity of the bounded channel that delivers [`ServerEvent`]s to the embedder.
const EVENT_CHANNEL_SIZE: usize = 256;

/// Capacity of the bounded channel that carries [`ServerCommand`]s into the server.
const COMMAND_CHANNEL_SIZE: usize = 64;

/// Capacity of the bounded channel that carries [`AudioFrame`]s into the server.
const AUDIO_CHANNEL_SIZE: usize = 256;
57
/// A block of audio samples handed to the server through the audio channel.
///
/// `Clone` and `PartialEq` are derived so a frame can be duplicated for
/// several consumers and compared in tests.
#[derive(Debug, Clone, PartialEq)]
pub struct AudioFrame {
    /// Floating-point samples. For codecs other than `f32lz4` the server
    /// clamps each sample to `[-1.0, 1.0]` before converting it to 16-bit PCM,
    /// so values are expected to be in that nominal range.
    pub samples: Vec<f32>,
    /// Sample rate of `samples` (presumably in Hz — confirm against producers).
    pub sample_rate: u32,
    /// Number of audio channels represented in `samples`.
    pub channels: u16,
    /// Timestamp of the frame in microseconds; forwarded unchanged with the
    /// encoded chunk.
    pub timestamp_usec: i64,
}
70
71pub mod auth;
72#[cfg(feature = "encryption")]
73pub mod crypto;
74pub mod encoder;
75#[cfg(feature = "mdns")]
76pub mod mdns;
77pub mod session;
78pub mod state;
79pub mod stream;
80pub mod time;
81
/// Settings snapshot pushed to a single connected client.
#[derive(Debug, Clone)]
pub struct ClientSettingsUpdate {
    /// Identifier of the client the update is addressed to.
    pub client_id: String,
    /// Buffer size in milliseconds; the server fills this from its configured
    /// `buffer_ms`.
    pub buffer_ms: i32,
    /// Client latency value (unit presumably milliseconds — confirm against
    /// the protocol definition).
    pub latency: i32,
    /// Volume as a percentage.
    pub volume: u16,
    /// Whether the client is muted.
    pub muted: bool,
}
96
/// Notifications emitted by the server on its event channel.
#[derive(Debug)]
pub enum ServerEvent {
    /// A client connected.
    ClientConnected {
        /// Identifier of the client.
        id: String,
        /// Name reported for the client.
        name: String,
    },
    /// A client disconnected.
    ClientDisconnected {
        /// Identifier of the client.
        id: String,
    },
    /// A client's volume or mute flag was changed.
    ClientVolumeChanged {
        /// Identifier of the client.
        client_id: String,
        /// New volume as a percentage.
        volume: u16,
        /// New mute flag.
        muted: bool,
    },
    /// A client's latency was changed.
    ClientLatencyChanged {
        /// Identifier of the client.
        client_id: String,
        /// New latency value.
        latency: i32,
    },
    /// A client was renamed.
    ClientNameChanged {
        /// Identifier of the client.
        client_id: String,
        /// New name.
        name: String,
    },
    /// A group was assigned to a different stream.
    GroupStreamChanged {
        /// Identifier of the group.
        group_id: String,
        /// Identifier of the stream now assigned to the group.
        stream_id: String,
    },
    /// A group's mute flag was changed.
    GroupMuteChanged {
        /// Identifier of the group.
        group_id: String,
        /// New mute flag.
        muted: bool,
    },
    /// Status report for a stream.
    StreamStatus {
        /// Identifier of the stream.
        stream_id: String,
        /// Status text for the stream.
        status: String,
    },
    /// A custom protocol message associated with a client.
    #[cfg(feature = "custom-protocol")]
    CustomMessage {
        /// Identifier of the client.
        client_id: String,
        /// The custom message itself.
        message: snapcast_proto::CustomMessage,
    },
}
165
166#[derive(Debug)]
168pub enum ServerCommand {
169 SetClientVolume {
171 client_id: String,
173 volume: u16,
175 muted: bool,
177 },
178 SetClientLatency {
180 client_id: String,
182 latency: i32,
184 },
185 SetClientName {
187 client_id: String,
189 name: String,
191 },
192 SetGroupStream {
194 group_id: String,
196 stream_id: String,
198 },
199 SetGroupMute {
201 group_id: String,
203 muted: bool,
205 },
206 SetGroupName {
208 group_id: String,
210 name: String,
212 },
213 SetGroupClients {
215 group_id: String,
217 clients: Vec<String>,
219 },
220 DeleteClient {
222 client_id: String,
224 },
225 GetStatus {
227 response_tx: tokio::sync::oneshot::Sender<serde_json::Value>,
229 },
230 #[cfg(feature = "custom-protocol")]
232 SendToClient {
233 client_id: String,
235 message: snapcast_proto::CustomMessage,
237 },
238 Stop,
240}
241
/// Picks the default codec name from the enabled cargo features.
///
/// Preference order: `"flac"` when the `flac` feature is on, otherwise
/// `"f32lz4"` when the `f32lz4` feature is on, otherwise `"pcm"`.
fn default_codec() -> &'static str {
    if cfg!(feature = "flac") {
        "flac"
    } else if cfg!(feature = "f32lz4") {
        "f32lz4"
    } else {
        "pcm"
    }
}
251
252pub struct ServerConfig {
254 pub stream_port: u16,
256 pub buffer_ms: u32,
258 pub codec: String,
260 pub sample_format: String,
262 pub mdns_service_type: String,
264 pub auth: Option<std::sync::Arc<dyn auth::AuthValidator>>,
266 #[cfg(feature = "encryption")]
268 pub encryption_psk: Option<String>,
269}
270
271impl Default for ServerConfig {
272 fn default() -> Self {
273 Self {
274 stream_port: snapcast_proto::DEFAULT_STREAM_PORT,
275 buffer_ms: 1000,
276 codec: default_codec().into(),
277 sample_format: "48000:16:2".into(),
278 mdns_service_type: "_snapcast._tcp.local.".into(),
279 auth: None,
280 #[cfg(feature = "encryption")]
281 encryption_psk: None,
282 }
283 }
284}
285
286pub struct SnapServer {
288 config: ServerConfig,
289 event_tx: mpsc::Sender<ServerEvent>,
290 command_tx: mpsc::Sender<ServerCommand>,
291 command_rx: Option<mpsc::Receiver<ServerCommand>>,
292 audio_rx: Option<mpsc::Receiver<crate::AudioFrame>>,
293 manager: Option<stream::manager::StreamManager>,
294}
295
296impl SnapServer {
297 pub fn new(
302 config: ServerConfig,
303 ) -> (
304 Self,
305 mpsc::Receiver<ServerEvent>,
306 mpsc::Sender<crate::AudioFrame>,
307 ) {
308 let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
309 let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
310 let (audio_tx, audio_rx) = mpsc::channel(AUDIO_CHANNEL_SIZE);
311 let server = Self {
312 config,
313 event_tx,
314 command_tx,
315 command_rx: Some(command_rx),
316 audio_rx: Some(audio_rx),
317 manager: None,
318 };
319 (server, event_rx, audio_tx)
320 }
321
322 pub fn set_manager(&mut self, manager: stream::manager::StreamManager) {
324 self.manager = Some(manager);
325 }
326
327 pub fn command_sender(&self) -> mpsc::Sender<ServerCommand> {
329 self.command_tx.clone()
330 }
331
332 pub fn config(&self) -> &ServerConfig {
334 &self.config
335 }
336
337 pub async fn run(&mut self) -> anyhow::Result<()> {
339 let mut command_rx = self
340 .command_rx
341 .take()
342 .ok_or_else(|| anyhow::anyhow!("run() already called"))?;
343
344 let mut audio_rx = self
345 .audio_rx
346 .take()
347 .ok_or_else(|| anyhow::anyhow!("run() already called"))?;
348
349 let event_tx = self.event_tx.clone();
350
351 tracing::info!(stream_port = self.config.stream_port, "Snapserver starting");
352
353 #[cfg(feature = "mdns")]
355 let _mdns =
356 mdns::MdnsAdvertiser::new(self.config.stream_port, &self.config.mdns_service_type)
357 .map_err(|e| tracing::warn!(error = %e, "mDNS advertisement failed"))
358 .ok();
359
360 let manager = self.manager.take().unwrap_or_default();
361
362 let default_format = snapcast_proto::DEFAULT_SAMPLE_FORMAT;
363 let first_stream = manager.stream_ids().into_iter().next().unwrap_or_default();
364 let (codec, header) = if let Some((c, h, _)) = manager.header(&first_stream) {
365 (c.to_string(), h.to_vec())
366 } else {
367 let enc_config = encoder::EncoderConfig {
368 codec: self.config.codec.clone(),
369 format: default_format,
370 options: String::new(),
371 #[cfg(feature = "encryption")]
372 encryption_psk: self.config.encryption_psk.clone(),
373 };
374 let enc = encoder::create(&enc_config)?;
375 (self.config.codec.clone(), enc.header().to_vec())
376 };
377
378 let chunk_sender = manager.chunk_sender();
379 let audio_chunk_sender = chunk_sender.clone();
380
381 let session_srv = Arc::new(session::SessionServer::new(
383 self.config.stream_port,
384 self.config.buffer_ms as i32,
385 self.config.auth.clone(),
386 ));
387 let session_for_run = Arc::clone(&session_srv);
388 let session_event_tx = event_tx.clone();
389 let session_handle = tokio::spawn(async move {
390 if let Err(e) = session_for_run
391 .run(chunk_sender, codec, header, session_event_tx)
392 .await
393 {
394 tracing::error!(error = %e, "Session server error");
395 }
396 });
397
398 let shared_state = Arc::new(tokio::sync::Mutex::new(state::ServerState::default()));
400
401 loop {
403 tokio::select! {
404 cmd = command_rx.recv() => {
405 match cmd {
406 Some(ServerCommand::Stop) | None => {
407 tracing::info!("Server stopped");
408 session_handle.abort();
409 return Ok(());
410 }
411 Some(ServerCommand::SetClientVolume { client_id, volume, muted }) => {
412 let mut s = shared_state.lock().await;
413 if let Some(c) = s.clients.get_mut(&client_id) {
414 c.config.volume.percent = volume;
415 c.config.volume.muted = muted;
416 }
417 session_srv.push_settings(ClientSettingsUpdate {
418 client_id: client_id.clone(),
419 buffer_ms: self.config.buffer_ms as i32,
420 latency: 0, volume, muted,
421 }).await;
422 let _ = event_tx.try_send(ServerEvent::ClientVolumeChanged { client_id, volume, muted });
423 }
424 Some(ServerCommand::SetClientLatency { client_id, latency }) => {
425 let mut s = shared_state.lock().await;
426 if let Some(c) = s.clients.get_mut(&client_id) {
427 c.config.latency = latency;
428 session_srv.push_settings(ClientSettingsUpdate {
429 client_id: client_id.clone(),
430 buffer_ms: self.config.buffer_ms as i32,
431 latency,
432 volume: c.config.volume.percent,
433 muted: c.config.volume.muted,
434 }).await;
435 }
436 let _ = event_tx.try_send(ServerEvent::ClientLatencyChanged { client_id, latency });
437 }
438 Some(ServerCommand::SetClientName { client_id, name }) => {
439 let mut s = shared_state.lock().await;
440 if let Some(c) = s.clients.get_mut(&client_id) {
441 c.config.name = name.clone();
442 }
443 let _ = event_tx.try_send(ServerEvent::ClientNameChanged { client_id, name });
444 }
445 Some(ServerCommand::SetGroupStream { group_id, stream_id }) => {
446 shared_state.lock().await.set_group_stream(&group_id, &stream_id);
447 let _ = event_tx.try_send(ServerEvent::GroupStreamChanged { group_id, stream_id });
448 }
449 Some(ServerCommand::SetGroupMute { group_id, muted }) => {
450 let mut s = shared_state.lock().await;
451 if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
452 g.muted = muted;
453 }
454 let _ = event_tx.try_send(ServerEvent::GroupMuteChanged { group_id, muted });
455 }
456 Some(ServerCommand::SetGroupName { group_id, name }) => {
457 let mut s = shared_state.lock().await;
458 if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
459 g.name = name;
460 }
461 }
462 Some(ServerCommand::SetGroupClients { group_id, clients }) => {
463 let mut s = shared_state.lock().await;
464 for cid in &clients {
465 s.remove_client_from_groups(cid);
466 }
467 if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
468 g.clients = clients;
469 }
470 }
471 Some(ServerCommand::DeleteClient { client_id }) => {
472 let mut s = shared_state.lock().await;
473 s.remove_client_from_groups(&client_id);
474 s.clients.remove(&client_id);
475 }
476 Some(ServerCommand::GetStatus { response_tx }) => {
477 let s = shared_state.lock().await;
478 let _ = response_tx.send(s.to_status_json());
479 }
480 #[cfg(feature = "custom-protocol")]
481 Some(ServerCommand::SendToClient { client_id, message }) => {
482 session_srv.send_custom(&client_id, message.type_id, message.payload).await;
483 }
484 }
485 }
486 frame = audio_rx.recv() => {
487 if let Some(frame) = frame {
488 let data = if self.config.codec == "f32lz4" {
489 frame.samples.iter().flat_map(|s| s.to_le_bytes()).collect()
490 } else {
491 let mut pcm = Vec::with_capacity(frame.samples.len() * 2);
492 for &s in &frame.samples {
493 let i = (s.clamp(-1.0, 1.0) * i16::MAX as f32) as i16;
494 pcm.extend_from_slice(&i.to_le_bytes());
495 }
496 pcm
497 };
498 let wire = stream::manager::WireChunkData {
499 stream_id: "external".into(),
500 timestamp_usec: frame.timestamp_usec,
501 data,
502 };
503 let _ = audio_chunk_sender.send(wire);
504 }
505 }
506 }
507 }
508 }
509}