1#![deny(unsafe_code)]
2#![warn(clippy::redundant_closure)]
3#![warn(clippy::implicit_clone)]
4#![warn(clippy::uninlined_format_args)]
5#![warn(missing_docs)]
6
7use std::sync::Arc;
46
47use tokio::sync::{broadcast, mpsc};
48
49#[cfg(feature = "custom-protocol")]
51pub use snapcast_proto::CustomMessage;
52#[cfg(feature = "encryption")]
53pub use snapcast_proto::DEFAULT_ENCRYPTION_PSK;
54pub use snapcast_proto::SampleFormat;
55pub use snapcast_proto::message::hello::Hello;
56pub use snapcast_proto::{DEFAULT_SAMPLE_FORMAT, DEFAULT_STREAM_PORT};
57
// Capacity of the bounded `ServerEvent` channel created in `SnapServer::new`.
const EVENT_CHANNEL_SIZE: usize = 256;
// Capacity of the bounded `ServerCommand` channel created in `SnapServer::new`.
const COMMAND_CHANNEL_SIZE: usize = 64;
// Capacity of each per-stream `AudioFrame` channel created by
// `SnapServer::add_stream_with_config`.
const AUDIO_CHANNEL_SIZE: usize = 256;

// Capacity of the `AudioFrame` channel behind an `F32AudioSender`
// (see `SnapServer::add_f32_stream`). With a single slot, `F32AudioSender::send`
// waits whenever the previous chunk has not been consumed yet.
const F32_CHANNEL_SIZE: usize = 1;
64
/// Audio payload carried by an [`AudioFrame`].
#[derive(Debug, Clone)]
pub enum AudioData {
    /// Samples as 32-bit floats.
    ///
    /// NOTE(review): presumably interleaved by channel — the code in this file
    /// derives the frame count as `len / channels`; confirm against the encoder.
    F32(Vec<f32>),
    /// Audio as raw bytes.
    ///
    /// NOTE(review): presumably PCM in the stream's sample format — confirm
    /// against the encoder implementation.
    Pcm(Vec<u8>),
}
75
/// One unit of audio handed to the server through a stream's input channel.
#[derive(Debug, Clone)]
pub struct AudioFrame {
    /// The audio payload.
    pub data: AudioData,
    /// Timestamp associated with this frame, in microseconds.
    ///
    /// The value is copied unchanged into the encoded [`WireChunkData`].
    /// NOTE(review): the reference clock/epoch is not visible in this file — confirm.
    pub timestamp_usec: i64,
}
84
/// Buffering sender for `f32` samples.
///
/// Samples passed to [`F32AudioSender::send`] are accumulated and forwarded as
/// fixed-size [`AudioFrame`]s; timestamps are generated internally. Obtained
/// from [`SnapServer::add_f32_stream`].
pub struct F32AudioSender {
    // Channel into the stream's encoder.
    tx: mpsc::Sender<AudioFrame>,
    // Samples received but not yet emitted as a full chunk.
    buf: Vec<f32>,
    // Number of samples (frames * channels) per emitted chunk.
    chunk_samples: usize,
    // Channel count used to convert sample counts into frame counts.
    channels: u16,
    // Sample rate handed to the timestamper when it is (re)created.
    sample_rate: u32,
    // Timestamp generator; `None` until the first chunk and after a long gap.
    ts: Option<time::ChunkTimestamper>,
    // Time of the most recent `send` call, used to detect gaps.
    last_send: std::time::Instant,
}
100
101impl F32AudioSender {
102 fn new(tx: mpsc::Sender<AudioFrame>, sample_rate: u32, channels: u16) -> Self {
103 let chunk_samples = (sample_rate as usize * 20 / 1000) * channels as usize;
104 Self {
105 tx,
106 buf: Vec::with_capacity(chunk_samples * 2),
107 chunk_samples,
108 channels,
109 sample_rate,
110 ts: None,
111 last_send: std::time::Instant::now(),
112 }
113 }
114
115 pub async fn send(
118 &mut self,
119 samples: &[f32],
120 ) -> Result<(), mpsc::error::SendError<AudioFrame>> {
121 let now = std::time::Instant::now();
122 if now.duration_since(self.last_send) > std::time::Duration::from_millis(500) {
123 self.ts = None;
124 self.buf.clear();
125 }
126 self.last_send = now;
127
128 self.buf.extend_from_slice(samples);
129 let ch = self.channels.max(1) as usize;
130 while self.buf.len() >= self.chunk_samples {
131 let chunk: Vec<f32> = self.buf.drain(..self.chunk_samples).collect();
132 let frames = (self.chunk_samples / ch) as u32;
133 let ts = self
134 .ts
135 .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
136 let timestamp_usec = ts.next(frames);
137 self.tx
138 .send(AudioFrame {
139 data: AudioData::F32(chunk),
140 timestamp_usec,
141 })
142 .await?;
143 }
144 Ok(())
145 }
146
147 pub async fn flush(&mut self) -> Result<(), mpsc::error::SendError<AudioFrame>> {
150 if self.buf.is_empty() {
151 return Ok(());
152 }
153 let chunk: Vec<f32> = self.buf.drain(..).collect();
154 let ch = self.channels.max(1) as usize;
155 let frames = (chunk.len() / ch) as u32;
156 let ts = self
157 .ts
158 .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
159 let timestamp_usec = ts.next(frames);
160 self.tx
161 .send(AudioFrame {
162 data: AudioData::F32(chunk),
163 timestamp_usec,
164 })
165 .await
166 }
167}
168
/// An encoded audio chunk, broadcast by a stream's encoder thread.
#[derive(Debug, Clone)]
pub struct WireChunkData {
    /// Identifier (name) of the stream that produced this chunk.
    pub stream_id: String,
    /// Timestamp copied from the source [`AudioFrame`], in microseconds.
    pub timestamp_usec: i64,
    /// Encoder output bytes for this chunk.
    pub data: Vec<u8>,
}
179
// Public module providing the `AuthValidator` and `ClientFilter` traits
// referenced by `ServerConfig`.
pub mod auth;
// Crate-internal modules. `crypto` and `mdns` are only compiled when their
// respective cargo features are enabled.
#[cfg(feature = "encryption")]
pub(crate) mod crypto;
pub(crate) mod encoder;
#[cfg(feature = "mdns")]
pub(crate) mod mdns;
pub(crate) mod session;
pub(crate) mod state;
// Public module providing `ServerStatus` (returned by `ServerCommand::GetStatus`).
pub mod status;
pub(crate) mod stream;
// Public module providing `ChunkTimestamper` (used by `F32AudioSender`).
pub mod time;
191
/// Settings snapshot pushed to a client's session when its volume or latency
/// is changed through a [`ServerCommand`].
#[derive(Debug, Clone)]
pub struct ClientSettingsUpdate {
    /// Identifier of the client the settings apply to.
    pub client_id: String,
    /// Buffer size in milliseconds (taken from [`ServerConfig::buffer_ms`]).
    pub buffer_ms: i32,
    /// Client latency setting. NOTE(review): unit is presumably milliseconds — confirm.
    pub latency: i32,
    /// Client volume. NOTE(review): stored as a "percent" in server state; valid range not visible here.
    pub volume: u16,
    /// Whether the client is muted.
    pub muted: bool,
}
206
/// Notifications delivered on the receiver returned by [`SnapServer::new`].
///
/// Events produced by [`SnapServer::run`] are sent with `try_send`, so they are
/// dropped when the event channel is full. Variants that `run` does not emit
/// itself are presumably produced by the session server — confirm there.
#[derive(Debug)]
#[non_exhaustive]
pub enum ServerEvent {
    /// A client connected. NOTE(review): not emitted in this file.
    ClientConnected {
        /// Identifier of the client.
        id: String,
        /// The `Hello` message associated with the connection.
        hello: snapcast_proto::message::hello::Hello,
    },
    /// A client disconnected. NOTE(review): not emitted in this file.
    ClientDisconnected {
        /// Identifier of the client.
        id: String,
    },
    /// Emitted after a [`ServerCommand::SetClientVolume`] has been processed.
    ClientVolumeChanged {
        /// Identifier of the client.
        client_id: String,
        /// The requested volume.
        volume: u16,
        /// The requested mute flag.
        muted: bool,
    },
    /// Emitted after a [`ServerCommand::SetClientLatency`] has been processed.
    ClientLatencyChanged {
        /// Identifier of the client.
        client_id: String,
        /// The requested latency.
        latency: i32,
    },
    /// Emitted after a [`ServerCommand::SetClientName`] has been processed.
    ClientNameChanged {
        /// Identifier of the client.
        client_id: String,
        /// The requested name.
        name: String,
    },
    /// Emitted after a [`ServerCommand::SetGroupStream`] has been processed.
    GroupStreamChanged {
        /// Identifier of the group.
        group_id: String,
        /// Identifier of the stream assigned to the group.
        stream_id: String,
    },
    /// Emitted after a [`ServerCommand::SetGroupMute`] has been processed.
    GroupMuteChanged {
        /// Identifier of the group.
        group_id: String,
        /// The requested mute flag.
        muted: bool,
    },
    /// A stream's status changed. NOTE(review): not emitted in this file.
    StreamStatus {
        /// Identifier of the stream.
        stream_id: String,
        /// Status text (streams are registered with status `"idle"`).
        status: String,
    },
    /// Emitted after a [`ServerCommand::SetStreamMeta`] has been processed.
    StreamMetaChanged {
        /// Identifier of the stream.
        stream_id: String,
        /// The metadata supplied with the command.
        metadata: std::collections::HashMap<String, serde_json::Value>,
    },
    /// Emitted after a [`ServerCommand::SetGroupName`] has been processed.
    GroupNameChanged {
        /// Identifier of the group.
        group_id: String,
        /// The requested name.
        name: String,
    },
    /// Emitted after structural changes: group membership changes, client
    /// deletion, and stream addition/removal.
    ServerUpdated,
    /// A [`ServerCommand::StreamControl`] forwarded unchanged to the embedder.
    StreamControl {
        /// Identifier of the stream.
        stream_id: String,
        /// Command name.
        command: String,
        /// Command parameters.
        params: serde_json::Value,
    },
    /// A custom protocol message. NOTE(review): not emitted in this file;
    /// presumably received from a client.
    #[cfg(feature = "custom-protocol")]
    CustomMessage {
        /// Identifier of the client.
        client_id: String,
        /// The custom message.
        message: snapcast_proto::CustomMessage,
    },
}
307
/// Commands accepted by [`SnapServer::run`] through the sender returned by
/// [`SnapServer::command_sender`].
#[derive(Debug)]
#[non_exhaustive]
pub enum ServerCommand {
    /// Sets a client's volume and mute flag, pushes the new settings to the
    /// client session, and refreshes routing for that client.
    SetClientVolume {
        /// Identifier of the client.
        client_id: String,
        /// New volume.
        volume: u16,
        /// New mute flag.
        muted: bool,
    },
    /// Sets a client's latency and, if the client is known, pushes the new
    /// settings to the client session.
    SetClientLatency {
        /// Identifier of the client.
        client_id: String,
        /// New latency.
        latency: i32,
    },
    /// Renames a client.
    SetClientName {
        /// Identifier of the client.
        client_id: String,
        /// New name.
        name: String,
    },
    /// Assigns a stream to a group and refreshes routing for that group.
    SetGroupStream {
        /// Identifier of the group.
        group_id: String,
        /// Identifier of the stream.
        stream_id: String,
    },
    /// Sets a group's mute flag and refreshes routing for that group.
    SetGroupMute {
        /// Identifier of the group.
        group_id: String,
        /// New mute flag.
        muted: bool,
    },
    /// Renames a group.
    SetGroupName {
        /// Identifier of the group.
        group_id: String,
        /// New name.
        name: String,
    },
    /// Sets the clients of a group and refreshes all routing.
    SetGroupClients {
        /// Identifier of the group.
        group_id: String,
        /// Identifiers of the clients for the group.
        clients: Vec<String>,
    },
    /// Removes a client from its groups and from server state, then refreshes
    /// all routing.
    DeleteClient {
        /// Identifier of the client.
        client_id: String,
    },
    /// Replaces a stream's properties with `metadata` (not persisted to the
    /// state file by `run`).
    SetStreamMeta {
        /// Identifier of the stream.
        stream_id: String,
        /// New metadata.
        metadata: std::collections::HashMap<String, serde_json::Value>,
    },
    /// Registers a stream entry in server state.
    ///
    /// The stream name is the text following the first `name=` in `uri` up to
    /// the next `&`, or `"dynamic"` if `name=` is absent.
    AddStream {
        /// Stream URI.
        uri: String,
        /// Receives `Ok(name)` on success, or `Err` if a stream with that
        /// name already exists.
        response_tx: tokio::sync::oneshot::Sender<Result<String, String>>,
    },
    /// Removes a stream from server state, clears it from any group that
    /// referenced it, and refreshes all routing.
    RemoveStream {
        /// Identifier of the stream.
        stream_id: String,
    },
    /// Forwarded to the embedder as [`ServerEvent::StreamControl`].
    StreamControl {
        /// Identifier of the stream.
        stream_id: String,
        /// Command name.
        command: String,
        /// Command parameters.
        params: serde_json::Value,
    },
    /// Requests a snapshot of the current server status.
    GetStatus {
        /// Receives the status snapshot.
        response_tx: tokio::sync::oneshot::Sender<status::ServerStatus>,
    },
    /// Sends a custom protocol message to a client.
    #[cfg(feature = "custom-protocol")]
    SendToClient {
        /// Identifier of the client.
        client_id: String,
        /// The message to send.
        message: snapcast_proto::CustomMessage,
    },
    /// Stops [`SnapServer::run`]: the session task is aborted and `run` returns `Ok(())`.
    Stop,
}
412
/// Returns the default codec name for the enabled cargo features.
///
/// Preference order: `"flac"` when the `flac` feature is enabled, otherwise
/// `"f32lz4"` when the `f32lz4` feature is enabled, otherwise `"pcm"`.
fn default_codec() -> &'static str {
    if cfg!(feature = "flac") {
        "flac"
    } else if cfg!(feature = "f32lz4") {
        "f32lz4"
    } else {
        "pcm"
    }
}
422
/// Configuration for a [`SnapServer`]. See the `Default` impl for default values.
pub struct ServerConfig {
    /// Port handed to the session server (and advertised over mDNS when enabled).
    /// Defaults to `DEFAULT_STREAM_PORT`.
    pub stream_port: u16,
    /// Buffer size in milliseconds, passed to the session server and included
    /// in client settings updates. Defaults to `1000`.
    pub buffer_ms: u32,
    /// Codec name used for streams that do not override it. The default
    /// depends on the enabled cargo features (`"flac"`, `"f32lz4"` or `"pcm"`).
    pub codec: String,
    /// Sample format string, parsed into a `SampleFormat`. Defaults to `"48000:16:2"`.
    ///
    /// NOTE(review): presumably `rate:bits:channels` — confirm against `SampleFormat`'s parser.
    pub sample_format: String,
    /// mDNS service type to advertise. Defaults to `"_snapcast._tcp.local."`.
    #[cfg(feature = "mdns")]
    pub mdns_service_type: String,
    /// Whether to advertise the server over mDNS. Defaults to `true`.
    #[cfg(feature = "mdns")]
    pub mdns_enabled: bool,
    /// Name used for the mDNS advertisement. Defaults to `"Snapserver"`.
    #[cfg(feature = "mdns")]
    pub mdns_name: String,
    /// Optional authentication validator passed to the session server.
    pub auth: Option<std::sync::Arc<dyn auth::AuthValidator>>,
    /// Optional client filter passed to the session server.
    pub client_filter: Option<std::sync::Arc<dyn auth::ClientFilter>>,
    /// Optional pre-shared key passed to the encoder configuration.
    #[cfg(feature = "encryption")]
    pub encryption_psk: Option<String>,
    /// Optional path of a state file: loaded when `run` starts and saved after
    /// state-changing commands.
    pub state_file: Option<std::path::PathBuf>,
    /// Passed to the session server. Defaults to `false`.
    ///
    /// NOTE(review): presumably controls whether muted clients still receive audio — confirm.
    pub send_audio_to_muted: bool,
}
455
456impl Default for ServerConfig {
457 fn default() -> Self {
458 Self {
459 stream_port: snapcast_proto::DEFAULT_STREAM_PORT,
460 buffer_ms: 1000,
461 codec: default_codec().into(),
462 sample_format: "48000:16:2".into(),
463 #[cfg(feature = "mdns")]
464 mdns_service_type: "_snapcast._tcp.local.".into(),
465 #[cfg(feature = "mdns")]
466 mdns_enabled: true,
467 #[cfg(feature = "mdns")]
468 mdns_name: "Snapserver".into(),
469 auth: None,
470 client_filter: None,
471 #[cfg(feature = "encryption")]
472 encryption_psk: None,
473 state_file: None,
474 send_audio_to_muted: false,
475 }
476 }
477}
478
/// Per-stream overrides of the server-wide encoder settings.
///
/// When both fields are `None`, the stream uses the server's default encoder
/// configuration.
#[derive(Debug, Clone, Default)]
pub struct StreamConfig {
    /// Codec name for this stream; falls back to [`ServerConfig::codec`] when `None`.
    pub codec: Option<String>,
    /// Sample format string for this stream; falls back to the server's
    /// sample format when `None` or when it cannot be parsed.
    pub sample_format: Option<String>,
}
487
/// The server: owns the configuration, the command/event channels and the
/// streams registered before [`SnapServer::run`] is called.
pub struct SnapServer {
    // Server configuration supplied to `new`.
    config: ServerConfig,
    // Sending half of the event channel; the receiver is returned by `new`.
    event_tx: mpsc::Sender<ServerEvent>,
    // Sending half of the command channel, cloned out by `command_sender`.
    command_tx: mpsc::Sender<ServerCommand>,
    // Receiving half of the command channel; taken (left `None`) by `run`.
    command_rx: Option<mpsc::Receiver<ServerCommand>>,
    // Registered streams: (name, per-stream config, audio input receiver).
    streams: Vec<(String, StreamConfig, mpsc::Receiver<AudioFrame>)>,
    // Broadcast channel carrying encoded chunks from encoder threads to the
    // session server.
    chunk_tx: broadcast::Sender<WireChunkData>,
}
499
500fn spawn_stream_encoder(
505 stream_id: String,
506 mut rx: mpsc::Receiver<AudioFrame>,
507 mut enc: Box<dyn encoder::Encoder>,
508 chunk_tx: broadcast::Sender<WireChunkData>,
509 sample_rate: u32,
510 channels: u16,
511) {
512 std::thread::spawn(move || {
513 let rt = tokio::runtime::Builder::new_current_thread()
514 .enable_time()
515 .build()
516 .expect("encoder runtime");
517
518 rt.block_on(async {
519 let mut next_tick: Option<tokio::time::Instant> = None;
520 while let Some(frame) = rx.recv().await {
521 if let AudioData::F32(ref samples) = frame.data {
523 let num_frames = samples.len() / channels.max(1) as usize;
524 let chunk_dur = std::time::Duration::from_micros(
525 (num_frames as u64 * 1_000_000) / sample_rate as u64,
526 );
527 let now = tokio::time::Instant::now();
528 let tick = next_tick.get_or_insert(now);
529 if now.checked_duration_since(*tick + chunk_dur)
531 > Some(std::time::Duration::from_millis(500))
532 {
533 *tick = now;
534 }
535 *tick += chunk_dur;
536 tokio::time::sleep_until(*tick).await;
537 }
538 match enc.encode(&frame.data) {
539 Ok(encoded) if !encoded.data.is_empty() => {
540 let _ = chunk_tx.send(WireChunkData {
541 stream_id: stream_id.clone(),
542 timestamp_usec: frame.timestamp_usec,
543 data: encoded.data,
544 });
545 }
546 Err(e) => {
547 tracing::warn!(stream = %stream_id, error = %e, "Encode failed");
548 }
549 _ => {} }
551 }
552 });
553 });
554}
555
556impl SnapServer {
558 pub fn new(config: ServerConfig) -> (Self, mpsc::Receiver<ServerEvent>) {
560 let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
561 let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
562 let (chunk_tx, _) = broadcast::channel(256);
563 let server = Self {
564 config,
565 event_tx,
566 command_tx,
567 command_rx: Some(command_rx),
568 streams: Vec::new(),
569 chunk_tx,
570 };
571 (server, event_rx)
572 }
573
574 pub fn add_stream(&mut self, name: &str) -> mpsc::Sender<AudioFrame> {
578 self.add_stream_with_config(name, StreamConfig::default())
579 }
580
581 pub fn add_f32_stream(&mut self, name: &str) -> Result<F32AudioSender, String> {
589 let sf: SampleFormat =
590 self.config.sample_format.parse().map_err(|e| {
591 format!("invalid sample_format '{}': {e}", self.config.sample_format)
592 })?;
593 let (tx, rx) = mpsc::channel(F32_CHANNEL_SIZE);
594 self.streams
595 .push((name.to_string(), StreamConfig::default(), rx));
596 Ok(F32AudioSender::new(tx, sf.rate(), sf.channels()))
597 }
598
599 pub fn add_stream_with_config(
601 &mut self,
602 name: &str,
603 config: StreamConfig,
604 ) -> mpsc::Sender<AudioFrame> {
605 let (tx, rx) = mpsc::channel(AUDIO_CHANNEL_SIZE);
606 self.streams.push((name.to_string(), config, rx));
607 tx
608 }
609
610 pub fn command_sender(&self) -> mpsc::Sender<ServerCommand> {
612 self.command_tx.clone()
613 }
614
615 pub fn config(&self) -> &ServerConfig {
617 &self.config
618 }
619
620 pub async fn run(&mut self) -> anyhow::Result<()> {
622 let mut command_rx = self
623 .command_rx
624 .take()
625 .ok_or_else(|| anyhow::anyhow!("run() already called"))?;
626
627 let event_tx = self.event_tx.clone();
628
629 let sample_format: snapcast_proto::SampleFormat = self
630 .config
631 .sample_format
632 .parse()
633 .unwrap_or(snapcast_proto::DEFAULT_SAMPLE_FORMAT);
634
635 anyhow::ensure!(
636 !self.streams.is_empty(),
637 "No streams configured — call add_stream() before run()"
638 );
639
640 tracing::info!(stream_port = self.config.stream_port, "Snapserver starting");
641
642 #[cfg(feature = "mdns")]
644 let _mdns = if self.config.mdns_enabled {
645 mdns::MdnsAdvertiser::new(
646 self.config.stream_port,
647 &self.config.mdns_service_type,
648 &self.config.mdns_name,
649 )
650 .map_err(|e| tracing::warn!(error = %e, "mDNS advertisement failed"))
651 .ok()
652 } else {
653 None
654 };
655
656 let default_enc_config = encoder::EncoderConfig {
658 codec: self.config.codec.clone(),
659 format: sample_format,
660 options: String::new(),
661 #[cfg(feature = "encryption")]
662 encryption_psk: self.config.encryption_psk.clone(),
663 };
664 let default_enc = encoder::create(&default_enc_config)?;
665
666 let chunk_tx = self.chunk_tx.clone();
668 let streams = std::mem::take(&mut self.streams);
669 let mut default_enc = Some(default_enc);
670
671 let initial_state = self
673 .config
674 .state_file
675 .as_ref()
676 .map(|p| state::ServerState::load(p))
677 .unwrap_or_default();
678 let shared_state = Arc::new(tokio::sync::Mutex::new(initial_state));
679
680 let first_name = streams
683 .first()
684 .map(|(n, _, _)| n.clone())
685 .unwrap_or_default();
686 let session_srv = Arc::new(session::SessionServer::new(
687 self.config.stream_port,
688 self.config.buffer_ms as i32,
689 self.config.auth.clone(),
690 self.config.client_filter.clone(),
691 Arc::clone(&shared_state),
692 first_name.clone(),
693 self.config.send_audio_to_muted,
694 ));
695
696 for (name, stream_cfg, rx) in streams {
697 {
698 let mut s = shared_state.lock().await;
699 if !s.streams.iter().any(|existing| existing.id == name) {
700 s.streams.push(state::StreamInfo {
701 id: name.clone(),
702 status: "idle".into(),
703 uri: String::new(),
704 properties: Default::default(),
705 });
706 }
707 }
708 let enc = if stream_cfg.codec.is_none() && stream_cfg.sample_format.is_none() {
709 if let Some(enc) = default_enc.take() {
710 enc
711 } else {
712 encoder::create(&default_enc_config)?
713 }
714 } else {
715 let stream_codec = stream_cfg.codec.as_deref().unwrap_or(&self.config.codec);
716 let stream_format: snapcast_proto::SampleFormat = stream_cfg
717 .sample_format
718 .as_deref()
719 .and_then(|s| s.parse().ok())
720 .unwrap_or(sample_format);
721 encoder::create(&encoder::EncoderConfig {
722 codec: stream_codec.to_string(),
723 format: stream_format,
724 options: String::new(),
725 #[cfg(feature = "encryption")]
726 encryption_psk: self.config.encryption_psk.clone(),
727 })?
728 };
729 tracing::info!(stream = %name, codec = enc.name(), %sample_format, "Stream registered");
730 session_srv
731 .register_stream_codec(&name, enc.name(), enc.header())
732 .await;
733 spawn_stream_encoder(
734 name,
735 rx,
736 enc,
737 chunk_tx.clone(),
738 sample_format.rate(),
739 sample_format.channels(),
740 );
741 }
742
743 let session_for_run = Arc::clone(&session_srv);
744 let session_event_tx = event_tx.clone();
745 let session_chunk_tx = self.chunk_tx.clone();
746 let session_handle = tokio::spawn(async move {
747 if let Err(e) = session_for_run
748 .run(session_chunk_tx, session_event_tx)
749 .await
750 {
751 tracing::error!(error = %e, "Session server error");
752 }
753 });
754
755 let state_file = self.config.state_file.clone();
756 let save_state = |s: &state::ServerState| {
757 if let Some(ref path) = state_file {
758 let _ = s
759 .save(path)
760 .map_err(|e| tracing::warn!(error = %e, "Failed to save state"));
761 }
762 };
763
764 loop {
766 tokio::select! {
767 cmd = command_rx.recv() => {
768 match cmd {
769 Some(ServerCommand::Stop) | None => {
770 tracing::info!("Server stopped");
771 session_handle.abort();
772 return Ok(());
773 }
774 Some(ServerCommand::SetClientVolume { client_id, volume, muted }) => {
775 let mut s = shared_state.lock().await;
776 if let Some(c) = s.clients.get_mut(&client_id) {
777 c.config.volume.percent = volume;
778 c.config.volume.muted = muted;
779 }
780 let latency = s.clients.get(&client_id).map(|c| c.config.latency).unwrap_or(0);
781 save_state(&s);
782 drop(s);
783 session_srv.push_settings(ClientSettingsUpdate {
784 client_id: client_id.clone(),
785 buffer_ms: self.config.buffer_ms as i32,
786 latency, volume, muted,
787 }).await;
788 let _ = event_tx.try_send(ServerEvent::ClientVolumeChanged { client_id: client_id.clone(), volume, muted });
789 session_srv.update_routing_for_client(&client_id).await;
790 }
791 Some(ServerCommand::SetClientLatency { client_id, latency }) => {
792 let mut s = shared_state.lock().await;
793 if let Some(c) = s.clients.get_mut(&client_id) {
794 c.config.latency = latency;
795 session_srv.push_settings(ClientSettingsUpdate {
796 client_id: client_id.clone(),
797 buffer_ms: self.config.buffer_ms as i32,
798 latency,
799 volume: c.config.volume.percent,
800 muted: c.config.volume.muted,
801 }).await;
802 }
803 save_state(&s);
804 drop(s);
805 let _ = event_tx.try_send(ServerEvent::ClientLatencyChanged { client_id, latency });
806 }
807 Some(ServerCommand::SetClientName { client_id, name }) => {
808 let mut s = shared_state.lock().await;
809 if let Some(c) = s.clients.get_mut(&client_id) {
810 c.config.name = name.clone();
811 }
812 save_state(&s);
813 drop(s);
814 let _ = event_tx.try_send(ServerEvent::ClientNameChanged { client_id, name });
815 }
816 Some(ServerCommand::SetGroupStream { group_id, stream_id }) => {
817 let mut s = shared_state.lock().await;
818 s.set_group_stream(&group_id, &stream_id);
819 save_state(&s);
820 drop(s);
821 let _ = event_tx.try_send(ServerEvent::GroupStreamChanged { group_id: group_id.clone(), stream_id });
822 session_srv.update_routing_for_group(&group_id).await;
823 }
824 Some(ServerCommand::SetGroupMute { group_id, muted }) => {
825 let mut s = shared_state.lock().await;
826 if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
827 g.muted = muted;
828 }
829 save_state(&s);
830 drop(s);
831 let _ = event_tx.try_send(ServerEvent::GroupMuteChanged { group_id: group_id.clone(), muted });
832 session_srv.update_routing_for_group(&group_id).await;
833 }
834 Some(ServerCommand::SetGroupName { group_id, name }) => {
835 let mut s = shared_state.lock().await;
836 if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
837 g.name = name.clone();
838 }
839 save_state(&s);
840 drop(s);
841 let _ = event_tx.try_send(ServerEvent::GroupNameChanged { group_id, name });
842 }
843 Some(ServerCommand::SetGroupClients { group_id, clients }) => {
844 let mut s = shared_state.lock().await;
845 s.set_group_clients(&group_id, &clients);
846 save_state(&s);
847 drop(s);
848 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
850 session_srv.update_routing_all().await;
851 }
852 Some(ServerCommand::DeleteClient { client_id }) => {
853 let mut s = shared_state.lock().await;
854 s.remove_client_from_groups(&client_id);
855 s.clients.remove(&client_id);
856 save_state(&s);
857 drop(s);
858 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
859 session_srv.update_routing_all().await;
860 }
861 Some(ServerCommand::SetStreamMeta { stream_id, metadata }) => {
862 let mut s = shared_state.lock().await;
863 if let Some(stream) = s.streams.iter_mut().find(|st| st.id == stream_id) {
864 stream.properties = metadata.clone();
865 }
866 drop(s);
867 let _ = event_tx.try_send(ServerEvent::StreamMetaChanged { stream_id, metadata });
868 }
869 Some(ServerCommand::AddStream { uri, response_tx }) => {
870 let name = uri.split("name=").nth(1)
872 .and_then(|s| s.split('&').next())
873 .unwrap_or("dynamic")
874 .to_string();
875 let mut s = shared_state.lock().await;
876 if s.streams.iter().any(|st| st.id == name) {
877 let _ = response_tx.send(Err(format!("Stream '{name}' already exists")));
878 } else {
879 s.streams.push(state::StreamInfo {
880 id: name.clone(),
881 status: "idle".into(),
882 uri: uri.clone(),
883 properties: Default::default(),
884 });
885 save_state(&s);
886 drop(s);
887 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
888 let _ = response_tx.send(Ok(name));
889 }
890 }
891 Some(ServerCommand::RemoveStream { stream_id }) => {
892 let mut s = shared_state.lock().await;
893 s.streams.retain(|st| st.id != stream_id);
894 for g in &mut s.groups {
896 if g.stream_id == stream_id {
897 g.stream_id.clear();
898 }
899 }
900 save_state(&s);
901 drop(s);
902 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
903 session_srv.update_routing_all().await;
904 }
905 Some(ServerCommand::StreamControl { stream_id, command, params }) => {
906 tracing::debug!(stream_id, command, ?params, "Stream control forwarded");
907 let _ = event_tx.try_send(ServerEvent::StreamControl { stream_id, command, params });
909 }
910 Some(ServerCommand::GetStatus { response_tx }) => {
911 let s = shared_state.lock().await;
912 let _ = response_tx.send(s.to_status());
913 }
914 #[cfg(feature = "custom-protocol")]
915 Some(ServerCommand::SendToClient { client_id, message }) => {
916 session_srv.send_custom(&client_id, message.type_id, message.payload).await;
917 }
918 }
919 }
920 }
921 }
922 }
923}