1#![deny(unsafe_code)]
2#![warn(clippy::redundant_closure)]
3#![warn(clippy::implicit_clone)]
4#![warn(clippy::uninlined_format_args)]
5#![warn(missing_docs)]
6
7use std::sync::Arc;
46
47use tokio::sync::{broadcast, mpsc};
48
49#[cfg(feature = "custom-protocol")]
51pub use snapcast_proto::CustomMessage;
52#[cfg(feature = "encryption")]
53pub use snapcast_proto::DEFAULT_ENCRYPTION_PSK;
54pub use snapcast_proto::SampleFormat;
55pub use snapcast_proto::{DEFAULT_SAMPLE_FORMAT, DEFAULT_STREAM_PORT};
56
57const EVENT_CHANNEL_SIZE: usize = 256;
58const COMMAND_CHANNEL_SIZE: usize = 64;
59const AUDIO_CHANNEL_SIZE: usize = 256;
60
61const F32_CHANNEL_SIZE: usize = 1;
63
64#[derive(Debug, Clone)]
66pub enum AudioData {
67 F32(Vec<f32>),
70 Pcm(Vec<u8>),
73}
74
75#[derive(Debug, Clone)]
77pub struct AudioFrame {
78 pub data: AudioData,
80 pub timestamp_usec: i64,
82}
83
84pub struct F32AudioSender {
91 tx: mpsc::Sender<AudioFrame>,
92 buf: Vec<f32>,
93 chunk_samples: usize,
94 channels: u16,
95 sample_rate: u32,
96 ts: Option<time::ChunkTimestamper>,
97 last_send: std::time::Instant,
98}
99
100impl F32AudioSender {
101 fn new(tx: mpsc::Sender<AudioFrame>, sample_rate: u32, channels: u16) -> Self {
102 let chunk_samples = (sample_rate as usize * 20 / 1000) * channels as usize;
103 Self {
104 tx,
105 buf: Vec::with_capacity(chunk_samples * 2),
106 chunk_samples,
107 channels,
108 sample_rate,
109 ts: None,
110 last_send: std::time::Instant::now(),
111 }
112 }
113
114 pub async fn send(
117 &mut self,
118 samples: &[f32],
119 ) -> Result<(), mpsc::error::SendError<AudioFrame>> {
120 let now = std::time::Instant::now();
121 if now.duration_since(self.last_send) > std::time::Duration::from_millis(500) {
122 self.ts = None;
123 self.buf.clear();
124 }
125 self.last_send = now;
126
127 self.buf.extend_from_slice(samples);
128 let ch = self.channels.max(1) as usize;
129 while self.buf.len() >= self.chunk_samples {
130 let chunk: Vec<f32> = self.buf.drain(..self.chunk_samples).collect();
131 let frames = (self.chunk_samples / ch) as u32;
132 let ts = self
133 .ts
134 .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
135 let timestamp_usec = ts.next(frames);
136 self.tx
137 .send(AudioFrame {
138 data: AudioData::F32(chunk),
139 timestamp_usec,
140 })
141 .await?;
142 }
143 Ok(())
144 }
145
146 pub async fn flush(&mut self) -> Result<(), mpsc::error::SendError<AudioFrame>> {
149 if self.buf.is_empty() {
150 return Ok(());
151 }
152 let chunk: Vec<f32> = self.buf.drain(..).collect();
153 let ch = self.channels.max(1) as usize;
154 let frames = (chunk.len() / ch) as u32;
155 let ts = self
156 .ts
157 .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
158 let timestamp_usec = ts.next(frames);
159 self.tx
160 .send(AudioFrame {
161 data: AudioData::F32(chunk),
162 timestamp_usec,
163 })
164 .await
165 }
166}
167
168#[derive(Debug, Clone)]
170pub struct WireChunkData {
171 pub stream_id: String,
173 pub timestamp_usec: i64,
175 pub data: Vec<u8>,
177}
178
179pub mod auth;
180#[cfg(feature = "encryption")]
181pub(crate) mod crypto;
182pub(crate) mod encoder;
183#[cfg(feature = "mdns")]
184pub(crate) mod mdns;
185pub(crate) mod session;
186pub(crate) mod state;
187pub mod status;
188pub(crate) mod stream;
189pub mod time;
190
191#[derive(Debug, Clone)]
193pub struct ClientSettingsUpdate {
194 pub client_id: String,
196 pub buffer_ms: i32,
198 pub latency: i32,
200 pub volume: u16,
202 pub muted: bool,
204}
205
206#[derive(Debug)]
208#[non_exhaustive]
209pub enum ServerEvent {
210 ClientConnected {
212 id: String,
214 name: String,
216 mac: String,
218 },
219 ClientDisconnected {
221 id: String,
223 },
224 ClientVolumeChanged {
226 client_id: String,
228 volume: u16,
230 muted: bool,
232 },
233 ClientLatencyChanged {
235 client_id: String,
237 latency: i32,
239 },
240 ClientNameChanged {
242 client_id: String,
244 name: String,
246 },
247 GroupStreamChanged {
249 group_id: String,
251 stream_id: String,
253 },
254 GroupMuteChanged {
256 group_id: String,
258 muted: bool,
260 },
261 StreamStatus {
263 stream_id: String,
265 status: String,
267 },
268 StreamMetaChanged {
270 stream_id: String,
272 metadata: std::collections::HashMap<String, serde_json::Value>,
274 },
275 GroupNameChanged {
277 group_id: String,
279 name: String,
281 },
282 ServerUpdated,
288 StreamControl {
292 stream_id: String,
294 command: String,
296 params: serde_json::Value,
298 },
299 #[cfg(feature = "custom-protocol")]
301 CustomMessage {
302 client_id: String,
304 message: snapcast_proto::CustomMessage,
306 },
307}
308
309#[derive(Debug)]
311#[non_exhaustive]
312pub enum ServerCommand {
313 SetClientVolume {
315 client_id: String,
317 volume: u16,
319 muted: bool,
321 },
322 SetClientLatency {
324 client_id: String,
326 latency: i32,
328 },
329 SetClientName {
331 client_id: String,
333 name: String,
335 },
336 SetGroupStream {
338 group_id: String,
340 stream_id: String,
342 },
343 SetGroupMute {
345 group_id: String,
347 muted: bool,
349 },
350 SetGroupName {
352 group_id: String,
354 name: String,
356 },
357 SetGroupClients {
359 group_id: String,
361 clients: Vec<String>,
363 },
364 DeleteClient {
366 client_id: String,
368 },
369 SetStreamMeta {
371 stream_id: String,
373 metadata: std::collections::HashMap<String, serde_json::Value>,
375 },
376 AddStream {
378 uri: String,
380 response_tx: tokio::sync::oneshot::Sender<Result<String, String>>,
382 },
383 RemoveStream {
385 stream_id: String,
387 },
388 StreamControl {
390 stream_id: String,
392 command: String,
394 params: serde_json::Value,
396 },
397 GetStatus {
399 response_tx: tokio::sync::oneshot::Sender<status::ServerStatus>,
401 },
402 #[cfg(feature = "custom-protocol")]
404 SendToClient {
405 client_id: String,
407 message: snapcast_proto::CustomMessage,
409 },
410 Stop,
412}
413
414fn default_codec() -> &'static str {
416 #[cfg(feature = "flac")]
417 return "flac";
418 #[cfg(all(feature = "f32lz4", not(feature = "flac")))]
419 return "f32lz4";
420 #[cfg(not(any(feature = "flac", feature = "f32lz4")))]
421 return "pcm";
422}
423
424pub struct ServerConfig {
426 pub stream_port: u16,
428 pub buffer_ms: u32,
430 pub codec: String,
432 pub sample_format: String,
434 #[cfg(feature = "mdns")]
436 pub mdns_service_type: String,
437 #[cfg(feature = "mdns")]
439 pub mdns_enabled: bool,
440 #[cfg(feature = "mdns")]
442 pub mdns_name: String,
443 pub auth: Option<std::sync::Arc<dyn auth::AuthValidator>>,
445 #[cfg(feature = "encryption")]
447 pub encryption_psk: Option<String>,
448 pub state_file: Option<std::path::PathBuf>,
450 pub send_audio_to_muted: bool,
452}
453
454impl Default for ServerConfig {
455 fn default() -> Self {
456 Self {
457 stream_port: snapcast_proto::DEFAULT_STREAM_PORT,
458 buffer_ms: 1000,
459 codec: default_codec().into(),
460 sample_format: "48000:16:2".into(),
461 #[cfg(feature = "mdns")]
462 mdns_service_type: "_snapcast._tcp.local.".into(),
463 #[cfg(feature = "mdns")]
464 mdns_enabled: true,
465 #[cfg(feature = "mdns")]
466 mdns_name: "Snapserver".into(),
467 auth: None,
468 #[cfg(feature = "encryption")]
469 encryption_psk: None,
470 state_file: None,
471 send_audio_to_muted: false,
472 }
473 }
474}
475
476#[derive(Debug, Clone, Default)]
478pub struct StreamConfig {
479 pub codec: Option<String>,
481 pub sample_format: Option<String>,
483}
484
485pub struct SnapServer {
487 config: ServerConfig,
488 event_tx: mpsc::Sender<ServerEvent>,
489 command_tx: mpsc::Sender<ServerCommand>,
490 command_rx: Option<mpsc::Receiver<ServerCommand>>,
491 streams: Vec<(String, StreamConfig, mpsc::Receiver<AudioFrame>)>,
493 chunk_tx: broadcast::Sender<WireChunkData>,
495}
496
497fn spawn_stream_encoder(
502 stream_id: String,
503 mut rx: mpsc::Receiver<AudioFrame>,
504 mut enc: Box<dyn encoder::Encoder>,
505 chunk_tx: broadcast::Sender<WireChunkData>,
506 sample_rate: u32,
507 channels: u16,
508) {
509 std::thread::spawn(move || {
510 let rt = tokio::runtime::Builder::new_current_thread()
511 .enable_time()
512 .build()
513 .expect("encoder runtime");
514
515 rt.block_on(async {
516 let mut next_tick: Option<tokio::time::Instant> = None;
517 while let Some(frame) = rx.recv().await {
518 if let AudioData::F32(ref samples) = frame.data {
520 let num_frames = samples.len() / channels.max(1) as usize;
521 let chunk_dur = std::time::Duration::from_micros(
522 (num_frames as u64 * 1_000_000) / sample_rate as u64,
523 );
524 let now = tokio::time::Instant::now();
525 let tick = next_tick.get_or_insert(now);
526 if now.checked_duration_since(*tick + chunk_dur)
528 > Some(std::time::Duration::from_millis(500))
529 {
530 *tick = now;
531 }
532 *tick += chunk_dur;
533 tokio::time::sleep_until(*tick).await;
534 }
535 match enc.encode(&frame.data) {
536 Ok(encoded) if !encoded.data.is_empty() => {
537 let _ = chunk_tx.send(WireChunkData {
538 stream_id: stream_id.clone(),
539 timestamp_usec: frame.timestamp_usec,
540 data: encoded.data,
541 });
542 }
543 Err(e) => {
544 tracing::warn!(stream = %stream_id, error = %e, "Encode failed");
545 }
546 _ => {} }
548 }
549 });
550 });
551}
552
553impl SnapServer {
555 pub fn new(config: ServerConfig) -> (Self, mpsc::Receiver<ServerEvent>) {
557 let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
558 let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
559 let (chunk_tx, _) = broadcast::channel(256);
560 let server = Self {
561 config,
562 event_tx,
563 command_tx,
564 command_rx: Some(command_rx),
565 streams: Vec::new(),
566 chunk_tx,
567 };
568 (server, event_rx)
569 }
570
571 pub fn add_stream(&mut self, name: &str) -> mpsc::Sender<AudioFrame> {
575 self.add_stream_with_config(name, StreamConfig::default())
576 }
577
578 pub fn add_f32_stream(&mut self, name: &str) -> Result<F32AudioSender, String> {
586 let sf: SampleFormat =
587 self.config.sample_format.parse().map_err(|e| {
588 format!("invalid sample_format '{}': {e}", self.config.sample_format)
589 })?;
590 let (tx, rx) = mpsc::channel(F32_CHANNEL_SIZE);
591 self.streams
592 .push((name.to_string(), StreamConfig::default(), rx));
593 Ok(F32AudioSender::new(tx, sf.rate(), sf.channels()))
594 }
595
596 pub fn add_stream_with_config(
598 &mut self,
599 name: &str,
600 config: StreamConfig,
601 ) -> mpsc::Sender<AudioFrame> {
602 let (tx, rx) = mpsc::channel(AUDIO_CHANNEL_SIZE);
603 self.streams.push((name.to_string(), config, rx));
604 tx
605 }
606
607 pub fn command_sender(&self) -> mpsc::Sender<ServerCommand> {
609 self.command_tx.clone()
610 }
611
612 pub fn config(&self) -> &ServerConfig {
614 &self.config
615 }
616
617 pub async fn run(&mut self) -> anyhow::Result<()> {
619 let mut command_rx = self
620 .command_rx
621 .take()
622 .ok_or_else(|| anyhow::anyhow!("run() already called"))?;
623
624 let event_tx = self.event_tx.clone();
625
626 let sample_format: snapcast_proto::SampleFormat = self
627 .config
628 .sample_format
629 .parse()
630 .unwrap_or(snapcast_proto::DEFAULT_SAMPLE_FORMAT);
631
632 anyhow::ensure!(
633 !self.streams.is_empty(),
634 "No streams configured — call add_stream() before run()"
635 );
636
637 tracing::info!(stream_port = self.config.stream_port, "Snapserver starting");
638
639 #[cfg(feature = "mdns")]
641 let _mdns = if self.config.mdns_enabled {
642 mdns::MdnsAdvertiser::new(
643 self.config.stream_port,
644 &self.config.mdns_service_type,
645 &self.config.mdns_name,
646 )
647 .map_err(|e| tracing::warn!(error = %e, "mDNS advertisement failed"))
648 .ok()
649 } else {
650 None
651 };
652
653 let default_enc_config = encoder::EncoderConfig {
655 codec: self.config.codec.clone(),
656 format: sample_format,
657 options: String::new(),
658 #[cfg(feature = "encryption")]
659 encryption_psk: self.config.encryption_psk.clone(),
660 };
661 let default_enc = encoder::create(&default_enc_config)?;
662
663 let chunk_tx = self.chunk_tx.clone();
665 let streams = std::mem::take(&mut self.streams);
666 let mut default_enc = Some(default_enc);
667
668 let initial_state = self
670 .config
671 .state_file
672 .as_ref()
673 .map(|p| state::ServerState::load(p))
674 .unwrap_or_default();
675 let shared_state = Arc::new(tokio::sync::Mutex::new(initial_state));
676
677 let first_name = streams
680 .first()
681 .map(|(n, _, _)| n.clone())
682 .unwrap_or_default();
683 let session_srv = Arc::new(session::SessionServer::new(
684 self.config.stream_port,
685 self.config.buffer_ms as i32,
686 self.config.auth.clone(),
687 Arc::clone(&shared_state),
688 first_name.clone(),
689 self.config.send_audio_to_muted,
690 ));
691
692 for (name, stream_cfg, rx) in streams {
693 {
694 let mut s = shared_state.lock().await;
695 s.streams.push(state::StreamInfo {
696 id: name.clone(),
697 status: "idle".into(),
698 uri: String::new(),
699 properties: Default::default(),
700 });
701 }
702 let enc = if stream_cfg.codec.is_none() && stream_cfg.sample_format.is_none() {
703 if let Some(enc) = default_enc.take() {
704 enc
705 } else {
706 encoder::create(&default_enc_config)?
707 }
708 } else {
709 let stream_codec = stream_cfg.codec.as_deref().unwrap_or(&self.config.codec);
710 let stream_format: snapcast_proto::SampleFormat = stream_cfg
711 .sample_format
712 .as_deref()
713 .and_then(|s| s.parse().ok())
714 .unwrap_or(sample_format);
715 encoder::create(&encoder::EncoderConfig {
716 codec: stream_codec.to_string(),
717 format: stream_format,
718 options: String::new(),
719 #[cfg(feature = "encryption")]
720 encryption_psk: self.config.encryption_psk.clone(),
721 })?
722 };
723 tracing::info!(stream = %name, codec = enc.name(), %sample_format, "Stream registered");
724 session_srv
725 .register_stream_codec(&name, enc.name(), enc.header())
726 .await;
727 spawn_stream_encoder(
728 name,
729 rx,
730 enc,
731 chunk_tx.clone(),
732 sample_format.rate(),
733 sample_format.channels(),
734 );
735 }
736
737 let session_for_run = Arc::clone(&session_srv);
738 let session_event_tx = event_tx.clone();
739 let session_chunk_tx = self.chunk_tx.clone();
740 let session_handle = tokio::spawn(async move {
741 if let Err(e) = session_for_run
742 .run(session_chunk_tx, session_event_tx)
743 .await
744 {
745 tracing::error!(error = %e, "Session server error");
746 }
747 });
748
749 let state_file = self.config.state_file.clone();
750 let save_state = |s: &state::ServerState| {
751 if let Some(ref path) = state_file {
752 let _ = s
753 .save(path)
754 .map_err(|e| tracing::warn!(error = %e, "Failed to save state"));
755 }
756 };
757
758 loop {
760 tokio::select! {
761 cmd = command_rx.recv() => {
762 match cmd {
763 Some(ServerCommand::Stop) | None => {
764 tracing::info!("Server stopped");
765 session_handle.abort();
766 return Ok(());
767 }
768 Some(ServerCommand::SetClientVolume { client_id, volume, muted }) => {
769 let mut s = shared_state.lock().await;
770 if let Some(c) = s.clients.get_mut(&client_id) {
771 c.config.volume.percent = volume;
772 c.config.volume.muted = muted;
773 }
774 let latency = s.clients.get(&client_id).map(|c| c.config.latency).unwrap_or(0);
775 save_state(&s);
776 drop(s);
777 session_srv.push_settings(ClientSettingsUpdate {
778 client_id: client_id.clone(),
779 buffer_ms: self.config.buffer_ms as i32,
780 latency, volume, muted,
781 }).await;
782 let _ = event_tx.try_send(ServerEvent::ClientVolumeChanged { client_id: client_id.clone(), volume, muted });
783 session_srv.update_routing_for_client(&client_id).await;
784 }
785 Some(ServerCommand::SetClientLatency { client_id, latency }) => {
786 let mut s = shared_state.lock().await;
787 if let Some(c) = s.clients.get_mut(&client_id) {
788 c.config.latency = latency;
789 session_srv.push_settings(ClientSettingsUpdate {
790 client_id: client_id.clone(),
791 buffer_ms: self.config.buffer_ms as i32,
792 latency,
793 volume: c.config.volume.percent,
794 muted: c.config.volume.muted,
795 }).await;
796 }
797 save_state(&s);
798 drop(s);
799 let _ = event_tx.try_send(ServerEvent::ClientLatencyChanged { client_id, latency });
800 }
801 Some(ServerCommand::SetClientName { client_id, name }) => {
802 let mut s = shared_state.lock().await;
803 if let Some(c) = s.clients.get_mut(&client_id) {
804 c.config.name = name.clone();
805 }
806 save_state(&s);
807 drop(s);
808 let _ = event_tx.try_send(ServerEvent::ClientNameChanged { client_id, name });
809 }
810 Some(ServerCommand::SetGroupStream { group_id, stream_id }) => {
811 let mut s = shared_state.lock().await;
812 s.set_group_stream(&group_id, &stream_id);
813 save_state(&s);
814 drop(s);
815 let _ = event_tx.try_send(ServerEvent::GroupStreamChanged { group_id: group_id.clone(), stream_id });
816 session_srv.update_routing_for_group(&group_id).await;
817 }
818 Some(ServerCommand::SetGroupMute { group_id, muted }) => {
819 let mut s = shared_state.lock().await;
820 if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
821 g.muted = muted;
822 }
823 save_state(&s);
824 drop(s);
825 let _ = event_tx.try_send(ServerEvent::GroupMuteChanged { group_id: group_id.clone(), muted });
826 session_srv.update_routing_for_group(&group_id).await;
827 }
828 Some(ServerCommand::SetGroupName { group_id, name }) => {
829 let mut s = shared_state.lock().await;
830 if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
831 g.name = name.clone();
832 }
833 save_state(&s);
834 drop(s);
835 let _ = event_tx.try_send(ServerEvent::GroupNameChanged { group_id, name });
836 }
837 Some(ServerCommand::SetGroupClients { group_id, clients }) => {
838 let mut s = shared_state.lock().await;
839 s.set_group_clients(&group_id, &clients);
840 save_state(&s);
841 drop(s);
842 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
844 session_srv.update_routing_all().await;
845 }
846 Some(ServerCommand::DeleteClient { client_id }) => {
847 let mut s = shared_state.lock().await;
848 s.remove_client_from_groups(&client_id);
849 s.clients.remove(&client_id);
850 save_state(&s);
851 drop(s);
852 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
853 session_srv.update_routing_all().await;
854 }
855 Some(ServerCommand::SetStreamMeta { stream_id, metadata }) => {
856 let mut s = shared_state.lock().await;
857 if let Some(stream) = s.streams.iter_mut().find(|st| st.id == stream_id) {
858 stream.properties = metadata.clone();
859 }
860 drop(s);
861 let _ = event_tx.try_send(ServerEvent::StreamMetaChanged { stream_id, metadata });
862 }
863 Some(ServerCommand::AddStream { uri, response_tx }) => {
864 let name = uri.split("name=").nth(1)
866 .and_then(|s| s.split('&').next())
867 .unwrap_or("dynamic")
868 .to_string();
869 let mut s = shared_state.lock().await;
870 if s.streams.iter().any(|st| st.id == name) {
871 let _ = response_tx.send(Err(format!("Stream '{name}' already exists")));
872 } else {
873 s.streams.push(state::StreamInfo {
874 id: name.clone(),
875 status: "idle".into(),
876 uri: uri.clone(),
877 properties: Default::default(),
878 });
879 save_state(&s);
880 drop(s);
881 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
882 let _ = response_tx.send(Ok(name));
883 }
884 }
885 Some(ServerCommand::RemoveStream { stream_id }) => {
886 let mut s = shared_state.lock().await;
887 s.streams.retain(|st| st.id != stream_id);
888 for g in &mut s.groups {
890 if g.stream_id == stream_id {
891 g.stream_id.clear();
892 }
893 }
894 save_state(&s);
895 drop(s);
896 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
897 session_srv.update_routing_all().await;
898 }
899 Some(ServerCommand::StreamControl { stream_id, command, params }) => {
900 tracing::debug!(stream_id, command, ?params, "Stream control forwarded");
901 let _ = event_tx.try_send(ServerEvent::StreamControl { stream_id, command, params });
903 }
904 Some(ServerCommand::GetStatus { response_tx }) => {
905 let s = shared_state.lock().await;
906 let _ = response_tx.send(s.to_status());
907 }
908 #[cfg(feature = "custom-protocol")]
909 Some(ServerCommand::SendToClient { client_id, message }) => {
910 session_srv.send_custom(&client_id, message.type_id, message.payload).await;
911 }
912 }
913 }
914 }
915 }
916 }
917}