1#![deny(unsafe_code)]
2#![warn(clippy::redundant_closure)]
3#![warn(clippy::implicit_clone)]
4#![warn(clippy::uninlined_format_args)]
5#![warn(missing_docs)]
6
7use std::sync::Arc;
46
47use tokio::sync::{broadcast, mpsc};
48
49#[cfg(feature = "custom-protocol")]
51pub use snapcast_proto::CustomMessage;
52#[cfg(feature = "encryption")]
53pub use snapcast_proto::DEFAULT_ENCRYPTION_PSK;
54pub use snapcast_proto::SampleFormat;
55pub use snapcast_proto::{DEFAULT_SAMPLE_FORMAT, DEFAULT_STREAM_PORT};
56
57const EVENT_CHANNEL_SIZE: usize = 256;
58const COMMAND_CHANNEL_SIZE: usize = 64;
59const AUDIO_CHANNEL_SIZE: usize = 256;
60
61#[derive(Debug, Clone)]
63pub enum AudioData {
64 F32(Vec<f32>),
67 Pcm(Vec<u8>),
70}
71
72#[derive(Debug, Clone)]
74pub struct AudioFrame {
75 pub data: AudioData,
77 pub timestamp_usec: i64,
79}
80
81#[derive(Debug, Clone)]
83pub struct WireChunkData {
84 pub stream_id: String,
86 pub timestamp_usec: i64,
88 pub data: Vec<u8>,
90}
91
92pub mod auth;
93#[cfg(feature = "encryption")]
94pub mod crypto;
95pub(crate) mod encoder;
96#[cfg(feature = "mdns")]
97pub mod mdns;
98pub mod session;
99pub mod state;
100pub(crate) mod stream;
101pub mod time;
102
103#[derive(Debug, Clone)]
105pub struct ClientSettingsUpdate {
106 pub client_id: String,
108 pub buffer_ms: i32,
110 pub latency: i32,
112 pub volume: u16,
114 pub muted: bool,
116}
117
118#[derive(Debug)]
120pub enum ServerEvent {
121 ClientConnected {
123 id: String,
125 name: String,
127 mac: String,
129 },
130 ClientDisconnected {
132 id: String,
134 },
135 ClientVolumeChanged {
137 client_id: String,
139 volume: u16,
141 muted: bool,
143 },
144 ClientLatencyChanged {
146 client_id: String,
148 latency: i32,
150 },
151 ClientNameChanged {
153 client_id: String,
155 name: String,
157 },
158 GroupStreamChanged {
160 group_id: String,
162 stream_id: String,
164 },
165 GroupMuteChanged {
167 group_id: String,
169 muted: bool,
171 },
172 StreamStatus {
174 stream_id: String,
176 status: String,
178 },
179 GroupNameChanged {
181 group_id: String,
183 name: String,
185 },
186 ServerUpdated,
192 #[cfg(feature = "custom-protocol")]
194 CustomMessage {
195 client_id: String,
197 message: snapcast_proto::CustomMessage,
199 },
200}
201
202#[derive(Debug)]
204pub enum ServerCommand {
205 SetClientVolume {
207 client_id: String,
209 volume: u16,
211 muted: bool,
213 },
214 SetClientLatency {
216 client_id: String,
218 latency: i32,
220 },
221 SetClientName {
223 client_id: String,
225 name: String,
227 },
228 SetGroupStream {
230 group_id: String,
232 stream_id: String,
234 },
235 SetGroupMute {
237 group_id: String,
239 muted: bool,
241 },
242 SetGroupName {
244 group_id: String,
246 name: String,
248 },
249 SetGroupClients {
251 group_id: String,
253 clients: Vec<String>,
255 },
256 DeleteClient {
258 client_id: String,
260 },
261 GetStatus {
263 response_tx: tokio::sync::oneshot::Sender<serde_json::Value>,
265 },
266 #[cfg(feature = "custom-protocol")]
268 SendToClient {
269 client_id: String,
271 message: snapcast_proto::CustomMessage,
273 },
274 Stop,
276}
277
278fn default_codec() -> &'static str {
280 #[cfg(feature = "flac")]
281 return "flac";
282 #[cfg(all(feature = "f32lz4", not(feature = "flac")))]
283 return "f32lz4";
284 #[cfg(not(any(feature = "flac", feature = "f32lz4")))]
285 return "pcm";
286}
287
288pub struct ServerConfig {
290 pub stream_port: u16,
292 pub buffer_ms: u32,
294 pub codec: String,
296 pub sample_format: String,
298 pub mdns_service_type: String,
300 pub auth: Option<std::sync::Arc<dyn auth::AuthValidator>>,
302 #[cfg(feature = "encryption")]
304 pub encryption_psk: Option<String>,
305}
306
307impl Default for ServerConfig {
308 fn default() -> Self {
309 Self {
310 stream_port: snapcast_proto::DEFAULT_STREAM_PORT,
311 buffer_ms: 1000,
312 codec: default_codec().into(),
313 sample_format: "48000:16:2".into(),
314 mdns_service_type: "_snapcast._tcp.local.".into(),
315 auth: None,
316 #[cfg(feature = "encryption")]
317 encryption_psk: None,
318 }
319 }
320}
321
322#[derive(Debug, Clone, Default)]
324pub struct StreamConfig {
325 pub codec: Option<String>,
327 pub sample_format: Option<String>,
329}
330
331pub struct SnapServer {
333 config: ServerConfig,
334 event_tx: mpsc::Sender<ServerEvent>,
335 command_tx: mpsc::Sender<ServerCommand>,
336 command_rx: Option<mpsc::Receiver<ServerCommand>>,
337 streams: Vec<(String, StreamConfig, mpsc::Receiver<AudioFrame>)>,
339 chunk_tx: broadcast::Sender<WireChunkData>,
341}
342
343fn spawn_stream_encoder(
348 stream_id: String,
349 mut rx: mpsc::Receiver<AudioFrame>,
350 mut enc: Box<dyn encoder::Encoder>,
351 chunk_tx: broadcast::Sender<WireChunkData>,
352) {
353 std::thread::spawn(move || {
354 let rt = tokio::runtime::Builder::new_current_thread()
355 .enable_all()
356 .build()
357 .expect("encoder runtime");
358
359 rt.block_on(async {
360 while let Some(frame) = rx.recv().await {
361 match enc.encode(&frame.data) {
362 Ok(encoded) if !encoded.data.is_empty() => {
363 let _ = chunk_tx.send(WireChunkData {
364 stream_id: stream_id.clone(),
365 timestamp_usec: frame.timestamp_usec,
366 data: encoded.data,
367 });
368 }
369 Err(e) => {
370 tracing::warn!(stream = %stream_id, error = %e, "Encode failed");
371 }
372 _ => {} }
374 }
375 });
376 });
377}
378
379impl SnapServer {
381 pub fn new(config: ServerConfig) -> (Self, mpsc::Receiver<ServerEvent>) {
383 let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
384 let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
385 let (chunk_tx, _) = broadcast::channel(256);
386 let server = Self {
387 config,
388 event_tx,
389 command_tx,
390 command_rx: Some(command_rx),
391 streams: Vec::new(),
392 chunk_tx,
393 };
394 (server, event_rx)
395 }
396
397 pub fn add_stream(&mut self, name: &str) -> mpsc::Sender<AudioFrame> {
401 self.add_stream_with_config(name, StreamConfig::default())
402 }
403
404 pub fn add_stream_with_config(
406 &mut self,
407 name: &str,
408 config: StreamConfig,
409 ) -> mpsc::Sender<AudioFrame> {
410 let (tx, rx) = mpsc::channel(AUDIO_CHANNEL_SIZE);
411 self.streams.push((name.to_string(), config, rx));
412 tx
413 }
414
415 pub fn command_sender(&self) -> mpsc::Sender<ServerCommand> {
417 self.command_tx.clone()
418 }
419
420 pub fn config(&self) -> &ServerConfig {
422 &self.config
423 }
424
425 pub async fn run(&mut self) -> anyhow::Result<()> {
427 let mut command_rx = self
428 .command_rx
429 .take()
430 .ok_or_else(|| anyhow::anyhow!("run() already called"))?;
431
432 let event_tx = self.event_tx.clone();
433
434 let sample_format: snapcast_proto::SampleFormat = self
435 .config
436 .sample_format
437 .parse()
438 .unwrap_or(snapcast_proto::DEFAULT_SAMPLE_FORMAT);
439
440 anyhow::ensure!(
441 !self.streams.is_empty(),
442 "No streams configured — call add_stream() before run()"
443 );
444
445 tracing::info!(stream_port = self.config.stream_port, "Snapserver starting");
446
447 #[cfg(feature = "mdns")]
449 let _mdns =
450 mdns::MdnsAdvertiser::new(self.config.stream_port, &self.config.mdns_service_type)
451 .map_err(|e| tracing::warn!(error = %e, "mDNS advertisement failed"))
452 .ok();
453
454 let default_enc_config = encoder::EncoderConfig {
456 codec: self.config.codec.clone(),
457 format: sample_format,
458 options: String::new(),
459 #[cfg(feature = "encryption")]
460 encryption_psk: self.config.encryption_psk.clone(),
461 };
462 let default_enc = encoder::create(&default_enc_config)?;
463 let codec = default_enc.name().to_string();
464 let codec_header = default_enc.header().to_vec();
465
466 let chunk_tx = self.chunk_tx.clone();
468 let streams = std::mem::take(&mut self.streams);
469 let mut default_enc = Some(default_enc);
470 for (name, stream_cfg, rx) in streams {
471 let enc = if stream_cfg.codec.is_none() && stream_cfg.sample_format.is_none() {
472 if let Some(enc) = default_enc.take() {
473 enc
474 } else {
475 encoder::create(&default_enc_config)?
476 }
477 } else {
478 let stream_codec = stream_cfg.codec.as_deref().unwrap_or(&self.config.codec);
479 let stream_format: snapcast_proto::SampleFormat = stream_cfg
480 .sample_format
481 .as_deref()
482 .and_then(|s| s.parse().ok())
483 .unwrap_or(sample_format);
484 encoder::create(&encoder::EncoderConfig {
485 codec: stream_codec.to_string(),
486 format: stream_format,
487 options: String::new(),
488 #[cfg(feature = "encryption")]
489 encryption_psk: self.config.encryption_psk.clone(),
490 })?
491 };
492 tracing::info!(stream = %name, codec = enc.name(), %sample_format, "Stream registered");
493 spawn_stream_encoder(name, rx, enc, chunk_tx.clone());
494 }
495
496 let session_srv = Arc::new(session::SessionServer::new(
498 self.config.stream_port,
499 self.config.buffer_ms as i32,
500 self.config.auth.clone(),
501 ));
502 let session_for_run = Arc::clone(&session_srv);
503 let session_event_tx = event_tx.clone();
504 let session_chunk_tx = self.chunk_tx.clone();
505 let session_handle = tokio::spawn(async move {
506 if let Err(e) = session_for_run
507 .run(session_chunk_tx, codec, codec_header, session_event_tx)
508 .await
509 {
510 tracing::error!(error = %e, "Session server error");
511 }
512 });
513
514 let shared_state = Arc::new(tokio::sync::Mutex::new(state::ServerState::default()));
516
517 loop {
519 tokio::select! {
520 cmd = command_rx.recv() => {
521 match cmd {
522 Some(ServerCommand::Stop) | None => {
523 tracing::info!("Server stopped");
524 session_handle.abort();
525 return Ok(());
526 }
527 Some(ServerCommand::SetClientVolume { client_id, volume, muted }) => {
528 let mut s = shared_state.lock().await;
529 if let Some(c) = s.clients.get_mut(&client_id) {
530 c.config.volume.percent = volume;
531 c.config.volume.muted = muted;
532 }
533 session_srv.push_settings(ClientSettingsUpdate {
534 client_id: client_id.clone(),
535 buffer_ms: self.config.buffer_ms as i32,
536 latency: 0, volume, muted,
537 }).await;
538 let _ = event_tx.try_send(ServerEvent::ClientVolumeChanged { client_id, volume, muted });
539 }
540 Some(ServerCommand::SetClientLatency { client_id, latency }) => {
541 let mut s = shared_state.lock().await;
542 if let Some(c) = s.clients.get_mut(&client_id) {
543 c.config.latency = latency;
544 session_srv.push_settings(ClientSettingsUpdate {
545 client_id: client_id.clone(),
546 buffer_ms: self.config.buffer_ms as i32,
547 latency,
548 volume: c.config.volume.percent,
549 muted: c.config.volume.muted,
550 }).await;
551 }
552 let _ = event_tx.try_send(ServerEvent::ClientLatencyChanged { client_id, latency });
553 }
554 Some(ServerCommand::SetClientName { client_id, name }) => {
555 let mut s = shared_state.lock().await;
556 if let Some(c) = s.clients.get_mut(&client_id) {
557 c.config.name = name.clone();
558 }
559 let _ = event_tx.try_send(ServerEvent::ClientNameChanged { client_id, name });
560 }
561 Some(ServerCommand::SetGroupStream { group_id, stream_id }) => {
562 shared_state.lock().await.set_group_stream(&group_id, &stream_id);
563 let _ = event_tx.try_send(ServerEvent::GroupStreamChanged { group_id, stream_id });
564 }
565 Some(ServerCommand::SetGroupMute { group_id, muted }) => {
566 let mut s = shared_state.lock().await;
567 if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
568 g.muted = muted;
569 }
570 let _ = event_tx.try_send(ServerEvent::GroupMuteChanged { group_id, muted });
571 }
572 Some(ServerCommand::SetGroupName { group_id, name }) => {
573 let mut s = shared_state.lock().await;
574 if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
575 g.name = name.clone();
576 }
577 let _ = event_tx.try_send(ServerEvent::GroupNameChanged { group_id, name });
578 }
579 Some(ServerCommand::SetGroupClients { group_id, clients }) => {
580 let mut s = shared_state.lock().await;
581 for cid in &clients {
582 s.remove_client_from_groups(cid);
583 }
584 if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
585 g.clients = clients;
586 }
587 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
589 }
590 Some(ServerCommand::DeleteClient { client_id }) => {
591 let mut s = shared_state.lock().await;
592 s.remove_client_from_groups(&client_id);
593 s.clients.remove(&client_id);
594 drop(s);
595 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
596 }
597 Some(ServerCommand::GetStatus { response_tx }) => {
598 let s = shared_state.lock().await;
599 let _ = response_tx.send(s.to_status_json());
600 }
601 #[cfg(feature = "custom-protocol")]
602 Some(ServerCommand::SendToClient { client_id, message }) => {
603 session_srv.send_custom(&client_id, message.type_id, message.payload).await;
604 }
605 }
606 }
607 }
608 }
609 }
610}