1#![deny(unsafe_code)]
2#![warn(clippy::redundant_closure)]
3#![warn(clippy::implicit_clone)]
4#![warn(clippy::uninlined_format_args)]
5#![warn(missing_docs)]
6
7use std::sync::Arc;
46
47use tokio::sync::{broadcast, mpsc};
48
49#[cfg(feature = "custom-protocol")]
51pub use snapcast_proto::CustomMessage;
52#[cfg(feature = "encryption")]
53pub use snapcast_proto::DEFAULT_ENCRYPTION_PSK;
54pub use snapcast_proto::SampleFormat;
55pub use snapcast_proto::message::hello::Hello;
56pub use snapcast_proto::{DEFAULT_SAMPLE_FORMAT, DEFAULT_STREAM_PORT};
57
58const EVENT_CHANNEL_SIZE: usize = 256;
59const COMMAND_CHANNEL_SIZE: usize = 64;
60const AUDIO_CHANNEL_SIZE: usize = 256;
61
62const F32_CHANNEL_SIZE: usize = 1;
64
65#[derive(Debug, Clone)]
67pub enum AudioData {
68 F32(Vec<f32>),
71 Pcm(Vec<u8>),
74}
75
76#[derive(Debug, Clone)]
78pub struct AudioFrame {
79 pub data: AudioData,
81 pub timestamp_usec: i64,
83}
84
85pub struct F32AudioSender {
92 tx: mpsc::Sender<AudioFrame>,
93 buf: Vec<f32>,
94 chunk_samples: usize,
95 channels: u16,
96 sample_rate: u32,
97 ts: Option<time::ChunkTimestamper>,
98 last_send: std::time::Instant,
99}
100
101impl F32AudioSender {
102 fn new(tx: mpsc::Sender<AudioFrame>, sample_rate: u32, channels: u16) -> Self {
103 let chunk_samples = (sample_rate as usize * 20 / 1000) * channels as usize;
104 Self {
105 tx,
106 buf: Vec::with_capacity(chunk_samples * 2),
107 chunk_samples,
108 channels,
109 sample_rate,
110 ts: None,
111 last_send: std::time::Instant::now(),
112 }
113 }
114
115 pub async fn send(
118 &mut self,
119 samples: &[f32],
120 ) -> Result<(), mpsc::error::SendError<AudioFrame>> {
121 let now = std::time::Instant::now();
122 if now.duration_since(self.last_send) > std::time::Duration::from_millis(500) {
123 self.ts = None;
124 self.buf.clear();
125 }
126 self.last_send = now;
127
128 self.buf.extend_from_slice(samples);
129 let ch = self.channels.max(1) as usize;
130 while self.buf.len() >= self.chunk_samples {
131 let chunk: Vec<f32> = self.buf.drain(..self.chunk_samples).collect();
132 let frames = (self.chunk_samples / ch) as u32;
133 let ts = self
134 .ts
135 .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
136 let timestamp_usec = ts.next(frames);
137 self.tx
138 .send(AudioFrame {
139 data: AudioData::F32(chunk),
140 timestamp_usec,
141 })
142 .await?;
143 }
144 Ok(())
145 }
146
147 pub async fn flush(&mut self) -> Result<(), mpsc::error::SendError<AudioFrame>> {
150 if self.buf.is_empty() {
151 return Ok(());
152 }
153 let chunk: Vec<f32> = self.buf.drain(..).collect();
154 let ch = self.channels.max(1) as usize;
155 let frames = (chunk.len() / ch) as u32;
156 let ts = self
157 .ts
158 .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
159 let timestamp_usec = ts.next(frames);
160 self.tx
161 .send(AudioFrame {
162 data: AudioData::F32(chunk),
163 timestamp_usec,
164 })
165 .await
166 }
167}
168
169#[derive(Debug, Clone)]
171pub struct WireChunkData {
172 pub stream_id: String,
174 pub timestamp_usec: i64,
176 pub data: Vec<u8>,
178}
179
180pub mod auth;
181#[cfg(feature = "encryption")]
182pub(crate) mod crypto;
183pub(crate) mod encoder;
184#[cfg(feature = "mdns")]
185pub(crate) mod mdns;
186pub(crate) mod session;
187pub(crate) mod state;
188pub mod status;
189pub(crate) mod stream;
190pub mod time;
191
192#[derive(Debug, Clone)]
194pub struct ClientSettingsUpdate {
195 pub client_id: String,
197 pub buffer_ms: i32,
199 pub latency: i32,
201 pub volume: u16,
203 pub muted: bool,
205}
206
207#[derive(Debug)]
209#[non_exhaustive]
210pub enum ServerEvent {
211 ClientConnected {
213 id: String,
215 hello: snapcast_proto::message::hello::Hello,
217 },
218 ClientDisconnected {
220 id: String,
222 },
223 ClientVolumeChanged {
225 client_id: String,
227 volume: u16,
229 muted: bool,
231 },
232 ClientLatencyChanged {
234 client_id: String,
236 latency: i32,
238 },
239 ClientNameChanged {
241 client_id: String,
243 name: String,
245 },
246 GroupStreamChanged {
248 group_id: String,
250 stream_id: String,
252 },
253 GroupMuteChanged {
255 group_id: String,
257 muted: bool,
259 },
260 StreamStatus {
262 stream_id: String,
264 status: String,
266 },
267 StreamMetaChanged {
269 stream_id: String,
271 metadata: std::collections::HashMap<String, serde_json::Value>,
273 },
274 GroupNameChanged {
276 group_id: String,
278 name: String,
280 },
281 ServerUpdated,
287 StreamControl {
291 stream_id: String,
293 command: String,
295 params: serde_json::Value,
297 },
298 #[cfg(feature = "custom-protocol")]
300 CustomMessage {
301 client_id: String,
303 message: snapcast_proto::CustomMessage,
305 },
306}
307
308#[derive(Debug)]
310#[non_exhaustive]
311pub enum ServerCommand {
312 SetClientVolume {
314 client_id: String,
316 volume: u16,
318 muted: bool,
320 },
321 SetClientLatency {
323 client_id: String,
325 latency: i32,
327 },
328 SetClientName {
330 client_id: String,
332 name: String,
334 },
335 SetGroupStream {
337 group_id: String,
339 stream_id: String,
341 },
342 SetGroupMute {
344 group_id: String,
346 muted: bool,
348 },
349 SetGroupName {
351 group_id: String,
353 name: String,
355 },
356 SetGroupClients {
358 group_id: String,
360 clients: Vec<String>,
362 },
363 DeleteClient {
365 client_id: String,
367 },
368 SetStreamMeta {
370 stream_id: String,
372 metadata: std::collections::HashMap<String, serde_json::Value>,
374 },
375 AddStream {
377 uri: String,
379 response_tx: tokio::sync::oneshot::Sender<Result<String, String>>,
381 },
382 RemoveStream {
384 stream_id: String,
386 },
387 StreamControl {
389 stream_id: String,
391 command: String,
393 params: serde_json::Value,
395 },
396 GetStatus {
398 response_tx: tokio::sync::oneshot::Sender<status::ServerStatus>,
400 },
401 #[cfg(feature = "custom-protocol")]
403 SendToClient {
404 client_id: String,
406 message: snapcast_proto::CustomMessage,
408 },
409 Stop,
411}
412
413fn default_codec() -> &'static str {
415 #[cfg(feature = "flac")]
416 return "flac";
417 #[cfg(all(feature = "f32lz4", not(feature = "flac")))]
418 return "f32lz4";
419 #[cfg(not(any(feature = "flac", feature = "f32lz4")))]
420 return "pcm";
421}
422
423pub struct ServerConfig {
425 pub stream_port: u16,
427 pub buffer_ms: u32,
429 pub codec: String,
431 pub sample_format: String,
433 #[cfg(feature = "mdns")]
435 pub mdns_service_type: String,
436 #[cfg(feature = "mdns")]
438 pub mdns_enabled: bool,
439 #[cfg(feature = "mdns")]
441 pub mdns_name: String,
442 pub auth: Option<std::sync::Arc<dyn auth::AuthValidator>>,
444 pub client_filter: Option<std::sync::Arc<dyn auth::ClientFilter>>,
447 #[cfg(feature = "encryption")]
449 pub encryption_psk: Option<String>,
450 pub state_file: Option<std::path::PathBuf>,
452 pub send_audio_to_muted: bool,
454}
455
456impl Default for ServerConfig {
457 fn default() -> Self {
458 Self {
459 stream_port: snapcast_proto::DEFAULT_STREAM_PORT,
460 buffer_ms: 1000,
461 codec: default_codec().into(),
462 sample_format: "48000:16:2".into(),
463 #[cfg(feature = "mdns")]
464 mdns_service_type: "_snapcast._tcp.local.".into(),
465 #[cfg(feature = "mdns")]
466 mdns_enabled: true,
467 #[cfg(feature = "mdns")]
468 mdns_name: "Snapserver".into(),
469 auth: None,
470 client_filter: None,
471 #[cfg(feature = "encryption")]
472 encryption_psk: None,
473 state_file: None,
474 send_audio_to_muted: false,
475 }
476 }
477}
478
479#[derive(Debug, Clone, Default)]
481pub struct StreamConfig {
482 pub codec: Option<String>,
484 pub sample_format: Option<String>,
486}
487
488pub struct SnapServer {
490 config: ServerConfig,
491 event_tx: mpsc::Sender<ServerEvent>,
492 command_tx: mpsc::Sender<ServerCommand>,
493 command_rx: Option<mpsc::Receiver<ServerCommand>>,
494 streams: Vec<(String, StreamConfig, mpsc::Receiver<AudioFrame>)>,
496 chunk_tx: broadcast::Sender<WireChunkData>,
498}
499
500fn spawn_stream_encoder(
505 stream_id: String,
506 mut rx: mpsc::Receiver<AudioFrame>,
507 mut enc: Box<dyn encoder::Encoder>,
508 chunk_tx: broadcast::Sender<WireChunkData>,
509 sample_rate: u32,
510 channels: u16,
511) {
512 std::thread::spawn(move || {
513 let rt = tokio::runtime::Builder::new_current_thread()
514 .enable_time()
515 .build()
516 .expect("encoder runtime");
517
518 rt.block_on(async {
519 let mut next_tick: Option<tokio::time::Instant> = None;
520 while let Some(frame) = rx.recv().await {
521 if let AudioData::F32(ref samples) = frame.data {
523 let num_frames = samples.len() / channels.max(1) as usize;
524 let chunk_dur = std::time::Duration::from_micros(
525 (num_frames as u64 * 1_000_000) / sample_rate as u64,
526 );
527 let now = tokio::time::Instant::now();
528 let tick = next_tick.get_or_insert(now);
529 if now.checked_duration_since(*tick + chunk_dur)
531 > Some(std::time::Duration::from_millis(500))
532 {
533 *tick = now;
534 }
535 *tick += chunk_dur;
536 tokio::time::sleep_until(*tick).await;
537 }
538 match enc.encode(&frame.data) {
539 Ok(encoded) if !encoded.data.is_empty() => {
540 let _ = chunk_tx.send(WireChunkData {
541 stream_id: stream_id.clone(),
542 timestamp_usec: frame.timestamp_usec,
543 data: encoded.data,
544 });
545 }
546 Err(e) => {
547 tracing::warn!(stream = %stream_id, error = %e, "Encode failed");
548 }
549 _ => {} }
551 }
552 });
553 });
554}
555
556impl SnapServer {
558 pub fn new(config: ServerConfig) -> (Self, mpsc::Receiver<ServerEvent>) {
560 let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
561 let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
562 let (chunk_tx, _) = broadcast::channel(256);
563 let server = Self {
564 config,
565 event_tx,
566 command_tx,
567 command_rx: Some(command_rx),
568 streams: Vec::new(),
569 chunk_tx,
570 };
571 (server, event_rx)
572 }
573
574 pub fn add_stream(&mut self, name: &str) -> mpsc::Sender<AudioFrame> {
578 self.add_stream_with_config(name, StreamConfig::default())
579 }
580
581 pub fn add_f32_stream(&mut self, name: &str) -> Result<F32AudioSender, String> {
589 let sf: SampleFormat =
590 self.config.sample_format.parse().map_err(|e| {
591 format!("invalid sample_format '{}': {e}", self.config.sample_format)
592 })?;
593 let (tx, rx) = mpsc::channel(F32_CHANNEL_SIZE);
594 self.streams
595 .push((name.to_string(), StreamConfig::default(), rx));
596 Ok(F32AudioSender::new(tx, sf.rate(), sf.channels()))
597 }
598
599 pub fn add_stream_with_config(
601 &mut self,
602 name: &str,
603 config: StreamConfig,
604 ) -> mpsc::Sender<AudioFrame> {
605 let (tx, rx) = mpsc::channel(AUDIO_CHANNEL_SIZE);
606 self.streams.push((name.to_string(), config, rx));
607 tx
608 }
609
610 pub fn command_sender(&self) -> mpsc::Sender<ServerCommand> {
612 self.command_tx.clone()
613 }
614
615 pub fn config(&self) -> &ServerConfig {
617 &self.config
618 }
619
620 pub async fn run(&mut self) -> anyhow::Result<()> {
622 let mut command_rx = self
623 .command_rx
624 .take()
625 .ok_or_else(|| anyhow::anyhow!("run() already called"))?;
626
627 let event_tx = self.event_tx.clone();
628
629 let sample_format: snapcast_proto::SampleFormat = self
630 .config
631 .sample_format
632 .parse()
633 .unwrap_or(snapcast_proto::DEFAULT_SAMPLE_FORMAT);
634
635 anyhow::ensure!(
636 !self.streams.is_empty(),
637 "No streams configured — call add_stream() before run()"
638 );
639
640 tracing::info!(stream_port = self.config.stream_port, "Snapserver starting");
641
642 #[cfg(feature = "mdns")]
644 let _mdns = if self.config.mdns_enabled {
645 mdns::MdnsAdvertiser::new(
646 self.config.stream_port,
647 &self.config.mdns_service_type,
648 &self.config.mdns_name,
649 )
650 .map_err(|e| tracing::warn!(error = %e, "mDNS advertisement failed"))
651 .ok()
652 } else {
653 None
654 };
655
656 let default_enc_config = encoder::EncoderConfig {
658 codec: self.config.codec.clone(),
659 format: sample_format,
660 options: String::new(),
661 #[cfg(feature = "encryption")]
662 encryption_psk: self.config.encryption_psk.clone(),
663 };
664 let default_enc = encoder::create(&default_enc_config)?;
665
666 let chunk_tx = self.chunk_tx.clone();
668 let streams = std::mem::take(&mut self.streams);
669 let mut default_enc = Some(default_enc);
670
671 let initial_state = self
673 .config
674 .state_file
675 .as_ref()
676 .map(|p| state::ServerState::load(p))
677 .unwrap_or_default();
678 let shared_state = Arc::new(tokio::sync::Mutex::new(initial_state));
679
680 let first_name = streams
683 .first()
684 .map(|(n, _, _)| n.clone())
685 .unwrap_or_default();
686 let session_srv = Arc::new(session::SessionServer::new(
687 self.config.stream_port,
688 self.config.buffer_ms as i32,
689 self.config.auth.clone(),
690 self.config.client_filter.clone(),
691 Arc::clone(&shared_state),
692 first_name.clone(),
693 self.config.send_audio_to_muted,
694 ));
695
696 for (name, stream_cfg, rx) in streams {
697 {
698 let mut s = shared_state.lock().await;
699 s.streams.push(state::StreamInfo {
700 id: name.clone(),
701 status: "idle".into(),
702 uri: String::new(),
703 properties: Default::default(),
704 });
705 }
706 let enc = if stream_cfg.codec.is_none() && stream_cfg.sample_format.is_none() {
707 if let Some(enc) = default_enc.take() {
708 enc
709 } else {
710 encoder::create(&default_enc_config)?
711 }
712 } else {
713 let stream_codec = stream_cfg.codec.as_deref().unwrap_or(&self.config.codec);
714 let stream_format: snapcast_proto::SampleFormat = stream_cfg
715 .sample_format
716 .as_deref()
717 .and_then(|s| s.parse().ok())
718 .unwrap_or(sample_format);
719 encoder::create(&encoder::EncoderConfig {
720 codec: stream_codec.to_string(),
721 format: stream_format,
722 options: String::new(),
723 #[cfg(feature = "encryption")]
724 encryption_psk: self.config.encryption_psk.clone(),
725 })?
726 };
727 tracing::info!(stream = %name, codec = enc.name(), %sample_format, "Stream registered");
728 session_srv
729 .register_stream_codec(&name, enc.name(), enc.header())
730 .await;
731 spawn_stream_encoder(
732 name,
733 rx,
734 enc,
735 chunk_tx.clone(),
736 sample_format.rate(),
737 sample_format.channels(),
738 );
739 }
740
741 let session_for_run = Arc::clone(&session_srv);
742 let session_event_tx = event_tx.clone();
743 let session_chunk_tx = self.chunk_tx.clone();
744 let session_handle = tokio::spawn(async move {
745 if let Err(e) = session_for_run
746 .run(session_chunk_tx, session_event_tx)
747 .await
748 {
749 tracing::error!(error = %e, "Session server error");
750 }
751 });
752
753 let state_file = self.config.state_file.clone();
754 let save_state = |s: &state::ServerState| {
755 if let Some(ref path) = state_file {
756 let _ = s
757 .save(path)
758 .map_err(|e| tracing::warn!(error = %e, "Failed to save state"));
759 }
760 };
761
762 loop {
764 tokio::select! {
765 cmd = command_rx.recv() => {
766 match cmd {
767 Some(ServerCommand::Stop) | None => {
768 tracing::info!("Server stopped");
769 session_handle.abort();
770 return Ok(());
771 }
772 Some(ServerCommand::SetClientVolume { client_id, volume, muted }) => {
773 let mut s = shared_state.lock().await;
774 if let Some(c) = s.clients.get_mut(&client_id) {
775 c.config.volume.percent = volume;
776 c.config.volume.muted = muted;
777 }
778 let latency = s.clients.get(&client_id).map(|c| c.config.latency).unwrap_or(0);
779 save_state(&s);
780 drop(s);
781 session_srv.push_settings(ClientSettingsUpdate {
782 client_id: client_id.clone(),
783 buffer_ms: self.config.buffer_ms as i32,
784 latency, volume, muted,
785 }).await;
786 let _ = event_tx.try_send(ServerEvent::ClientVolumeChanged { client_id: client_id.clone(), volume, muted });
787 session_srv.update_routing_for_client(&client_id).await;
788 }
789 Some(ServerCommand::SetClientLatency { client_id, latency }) => {
790 let mut s = shared_state.lock().await;
791 if let Some(c) = s.clients.get_mut(&client_id) {
792 c.config.latency = latency;
793 session_srv.push_settings(ClientSettingsUpdate {
794 client_id: client_id.clone(),
795 buffer_ms: self.config.buffer_ms as i32,
796 latency,
797 volume: c.config.volume.percent,
798 muted: c.config.volume.muted,
799 }).await;
800 }
801 save_state(&s);
802 drop(s);
803 let _ = event_tx.try_send(ServerEvent::ClientLatencyChanged { client_id, latency });
804 }
805 Some(ServerCommand::SetClientName { client_id, name }) => {
806 let mut s = shared_state.lock().await;
807 if let Some(c) = s.clients.get_mut(&client_id) {
808 c.config.name = name.clone();
809 }
810 save_state(&s);
811 drop(s);
812 let _ = event_tx.try_send(ServerEvent::ClientNameChanged { client_id, name });
813 }
814 Some(ServerCommand::SetGroupStream { group_id, stream_id }) => {
815 let mut s = shared_state.lock().await;
816 s.set_group_stream(&group_id, &stream_id);
817 save_state(&s);
818 drop(s);
819 let _ = event_tx.try_send(ServerEvent::GroupStreamChanged { group_id: group_id.clone(), stream_id });
820 session_srv.update_routing_for_group(&group_id).await;
821 }
822 Some(ServerCommand::SetGroupMute { group_id, muted }) => {
823 let mut s = shared_state.lock().await;
824 if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
825 g.muted = muted;
826 }
827 save_state(&s);
828 drop(s);
829 let _ = event_tx.try_send(ServerEvent::GroupMuteChanged { group_id: group_id.clone(), muted });
830 session_srv.update_routing_for_group(&group_id).await;
831 }
832 Some(ServerCommand::SetGroupName { group_id, name }) => {
833 let mut s = shared_state.lock().await;
834 if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
835 g.name = name.clone();
836 }
837 save_state(&s);
838 drop(s);
839 let _ = event_tx.try_send(ServerEvent::GroupNameChanged { group_id, name });
840 }
841 Some(ServerCommand::SetGroupClients { group_id, clients }) => {
842 let mut s = shared_state.lock().await;
843 s.set_group_clients(&group_id, &clients);
844 save_state(&s);
845 drop(s);
846 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
848 session_srv.update_routing_all().await;
849 }
850 Some(ServerCommand::DeleteClient { client_id }) => {
851 let mut s = shared_state.lock().await;
852 s.remove_client_from_groups(&client_id);
853 s.clients.remove(&client_id);
854 save_state(&s);
855 drop(s);
856 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
857 session_srv.update_routing_all().await;
858 }
859 Some(ServerCommand::SetStreamMeta { stream_id, metadata }) => {
860 let mut s = shared_state.lock().await;
861 if let Some(stream) = s.streams.iter_mut().find(|st| st.id == stream_id) {
862 stream.properties = metadata.clone();
863 }
864 drop(s);
865 let _ = event_tx.try_send(ServerEvent::StreamMetaChanged { stream_id, metadata });
866 }
867 Some(ServerCommand::AddStream { uri, response_tx }) => {
868 let name = uri.split("name=").nth(1)
870 .and_then(|s| s.split('&').next())
871 .unwrap_or("dynamic")
872 .to_string();
873 let mut s = shared_state.lock().await;
874 if s.streams.iter().any(|st| st.id == name) {
875 let _ = response_tx.send(Err(format!("Stream '{name}' already exists")));
876 } else {
877 s.streams.push(state::StreamInfo {
878 id: name.clone(),
879 status: "idle".into(),
880 uri: uri.clone(),
881 properties: Default::default(),
882 });
883 save_state(&s);
884 drop(s);
885 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
886 let _ = response_tx.send(Ok(name));
887 }
888 }
889 Some(ServerCommand::RemoveStream { stream_id }) => {
890 let mut s = shared_state.lock().await;
891 s.streams.retain(|st| st.id != stream_id);
892 for g in &mut s.groups {
894 if g.stream_id == stream_id {
895 g.stream_id.clear();
896 }
897 }
898 save_state(&s);
899 drop(s);
900 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
901 session_srv.update_routing_all().await;
902 }
903 Some(ServerCommand::StreamControl { stream_id, command, params }) => {
904 tracing::debug!(stream_id, command, ?params, "Stream control forwarded");
905 let _ = event_tx.try_send(ServerEvent::StreamControl { stream_id, command, params });
907 }
908 Some(ServerCommand::GetStatus { response_tx }) => {
909 let s = shared_state.lock().await;
910 let _ = response_tx.send(s.to_status());
911 }
912 #[cfg(feature = "custom-protocol")]
913 Some(ServerCommand::SendToClient { client_id, message }) => {
914 session_srv.send_custom(&client_id, message.type_id, message.payload).await;
915 }
916 }
917 }
918 }
919 }
920 }
921}