1#![deny(unsafe_code)]
2#![warn(clippy::redundant_closure)]
3#![warn(clippy::implicit_clone)]
4#![warn(clippy::uninlined_format_args)]
5#![warn(missing_docs)]
6
7use std::sync::Arc;
46
47use tokio::sync::{broadcast, mpsc};
48
49#[cfg(feature = "custom-protocol")]
51pub use snapcast_proto::CustomMessage;
52#[cfg(feature = "encryption")]
53pub use snapcast_proto::DEFAULT_ENCRYPTION_PSK;
54pub use snapcast_proto::SampleFormat;
55pub use snapcast_proto::message::hello::Hello;
56pub use snapcast_proto::{DEFAULT_SAMPLE_FORMAT, DEFAULT_STREAM_PORT};
57
58const EVENT_CHANNEL_SIZE: usize = 256;
59const COMMAND_CHANNEL_SIZE: usize = 64;
60const AUDIO_CHANNEL_SIZE: usize = 256;
61
62const F32_CHANNEL_SIZE: usize = 1;
64
65#[derive(Debug, Clone)]
67pub enum AudioData {
68 F32(Vec<f32>),
71 Pcm(Vec<u8>),
74}
75
76#[derive(Debug, Clone)]
78pub struct AudioFrame {
79 pub data: AudioData,
81 pub timestamp_usec: i64,
83}
84
85pub struct F32AudioSender {
92 tx: mpsc::Sender<AudioFrame>,
93 buf: Vec<f32>,
94 chunk_samples: usize,
95 channels: u16,
96 sample_rate: u32,
97 ts: Option<time::ChunkTimestamper>,
98 last_send: std::time::Instant,
99}
100
101impl F32AudioSender {
102 fn new(tx: mpsc::Sender<AudioFrame>, sample_rate: u32, channels: u16) -> Self {
103 let chunk_samples = (sample_rate as usize * 20 / 1000) * channels as usize;
104 Self {
105 tx,
106 buf: Vec::with_capacity(chunk_samples * 2),
107 chunk_samples,
108 channels,
109 sample_rate,
110 ts: None,
111 last_send: std::time::Instant::now(),
112 }
113 }
114
115 pub async fn send(
118 &mut self,
119 samples: &[f32],
120 ) -> Result<(), mpsc::error::SendError<AudioFrame>> {
121 let now = std::time::Instant::now();
122 if now.duration_since(self.last_send) > std::time::Duration::from_millis(500) {
123 self.ts = None;
124 self.buf.clear();
125 }
126 self.last_send = now;
127
128 self.buf.extend_from_slice(samples);
129 let ch = self.channels.max(1) as usize;
130 while self.buf.len() >= self.chunk_samples {
131 let chunk: Vec<f32> = self.buf.drain(..self.chunk_samples).collect();
132 let frames = (self.chunk_samples / ch) as u32;
133 let ts = self
134 .ts
135 .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
136 let timestamp_usec = ts.next(frames);
137 self.tx
138 .send(AudioFrame {
139 data: AudioData::F32(chunk),
140 timestamp_usec,
141 })
142 .await?;
143 }
144 Ok(())
145 }
146
147 pub async fn flush(&mut self) -> Result<(), mpsc::error::SendError<AudioFrame>> {
150 if self.buf.is_empty() {
151 return Ok(());
152 }
153 let chunk: Vec<f32> = self.buf.drain(..).collect();
154 let ch = self.channels.max(1) as usize;
155 let frames = (chunk.len() / ch) as u32;
156 let ts = self
157 .ts
158 .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
159 let timestamp_usec = ts.next(frames);
160 self.tx
161 .send(AudioFrame {
162 data: AudioData::F32(chunk),
163 timestamp_usec,
164 })
165 .await
166 }
167}
168
169#[derive(Debug, Clone)]
171pub struct WireChunkData {
172 pub stream_id: String,
174 pub timestamp_usec: i64,
176 pub data: Vec<u8>,
178}
179
180pub mod auth;
181#[cfg(feature = "encryption")]
182pub(crate) mod crypto;
183pub(crate) mod encoder;
184#[cfg(feature = "mdns")]
185pub(crate) mod mdns;
186pub(crate) mod session;
187pub(crate) mod state;
188pub mod status;
189pub(crate) mod stream;
190pub mod time;
191
192#[derive(Debug, Clone)]
194pub struct ClientSettingsUpdate {
195 pub client_id: String,
197 pub buffer_ms: i32,
199 pub latency: i32,
201 pub volume: u16,
203 pub muted: bool,
205}
206
207#[derive(Debug)]
209#[non_exhaustive]
210pub enum ServerEvent {
211 ClientConnected {
213 id: String,
215 hello: snapcast_proto::message::hello::Hello,
217 },
218 ClientDisconnected {
220 id: String,
222 },
223 ClientVolumeChanged {
225 client_id: String,
227 volume: u16,
229 muted: bool,
231 },
232 ClientLatencyChanged {
234 client_id: String,
236 latency: i32,
238 },
239 ClientNameChanged {
241 client_id: String,
243 name: String,
245 },
246 GroupStreamChanged {
248 group_id: String,
250 stream_id: String,
252 },
253 GroupMuteChanged {
255 group_id: String,
257 muted: bool,
259 },
260 StreamStatus {
262 stream_id: String,
264 status: String,
266 },
267 StreamMetaChanged {
269 stream_id: String,
271 metadata: std::collections::HashMap<String, serde_json::Value>,
273 },
274 GroupNameChanged {
276 group_id: String,
278 name: String,
280 },
281 ServerUpdated,
287 StreamControl {
291 stream_id: String,
293 command: String,
295 params: serde_json::Value,
297 },
298 #[cfg(feature = "custom-protocol")]
300 CustomMessage {
301 client_id: String,
303 message: snapcast_proto::CustomMessage,
305 },
306}
307
308#[derive(Debug)]
310#[non_exhaustive]
311pub enum ServerCommand {
312 SetClientVolume {
314 client_id: String,
316 volume: u16,
318 muted: bool,
320 },
321 SetClientLatency {
323 client_id: String,
325 latency: i32,
327 },
328 SetClientName {
330 client_id: String,
332 name: String,
334 },
335 SetGroupStream {
337 group_id: String,
339 stream_id: String,
341 },
342 SetGroupMute {
344 group_id: String,
346 muted: bool,
348 },
349 SetGroupName {
351 group_id: String,
353 name: String,
355 },
356 SetGroupClients {
358 group_id: String,
360 clients: Vec<String>,
362 },
363 DeleteClient {
365 client_id: String,
367 },
368 SetStreamMeta {
370 stream_id: String,
372 metadata: std::collections::HashMap<String, serde_json::Value>,
374 },
375 AddStream {
377 uri: String,
379 response_tx: tokio::sync::oneshot::Sender<Result<String, String>>,
381 },
382 RemoveStream {
384 stream_id: String,
386 },
387 StreamControl {
389 stream_id: String,
391 command: String,
393 params: serde_json::Value,
395 },
396 GetStatus {
398 response_tx: tokio::sync::oneshot::Sender<status::ServerStatus>,
400 },
401 #[cfg(feature = "custom-protocol")]
403 SendToClient {
404 client_id: String,
406 message: snapcast_proto::CustomMessage,
408 },
409 Stop,
411}
412
413fn default_codec() -> &'static str {
415 #[cfg(feature = "flac")]
416 return "flac";
417 #[cfg(all(feature = "f32lz4", not(feature = "flac")))]
418 return "f32lz4";
419 #[cfg(not(any(feature = "flac", feature = "f32lz4")))]
420 return "pcm";
421}
422
423pub struct ServerConfig {
425 pub stream_port: u16,
427 pub buffer_ms: u32,
429 pub codec: String,
431 pub sample_format: String,
433 #[cfg(feature = "mdns")]
435 pub mdns_service_type: String,
436 #[cfg(feature = "mdns")]
438 pub mdns_enabled: bool,
439 #[cfg(feature = "mdns")]
441 pub mdns_name: String,
442 pub auth: Option<std::sync::Arc<dyn auth::AuthValidator>>,
444 #[cfg(feature = "encryption")]
446 pub encryption_psk: Option<String>,
447 pub state_file: Option<std::path::PathBuf>,
449 pub send_audio_to_muted: bool,
451}
452
453impl Default for ServerConfig {
454 fn default() -> Self {
455 Self {
456 stream_port: snapcast_proto::DEFAULT_STREAM_PORT,
457 buffer_ms: 1000,
458 codec: default_codec().into(),
459 sample_format: "48000:16:2".into(),
460 #[cfg(feature = "mdns")]
461 mdns_service_type: "_snapcast._tcp.local.".into(),
462 #[cfg(feature = "mdns")]
463 mdns_enabled: true,
464 #[cfg(feature = "mdns")]
465 mdns_name: "Snapserver".into(),
466 auth: None,
467 #[cfg(feature = "encryption")]
468 encryption_psk: None,
469 state_file: None,
470 send_audio_to_muted: false,
471 }
472 }
473}
474
475#[derive(Debug, Clone, Default)]
477pub struct StreamConfig {
478 pub codec: Option<String>,
480 pub sample_format: Option<String>,
482}
483
484pub struct SnapServer {
486 config: ServerConfig,
487 event_tx: mpsc::Sender<ServerEvent>,
488 command_tx: mpsc::Sender<ServerCommand>,
489 command_rx: Option<mpsc::Receiver<ServerCommand>>,
490 streams: Vec<(String, StreamConfig, mpsc::Receiver<AudioFrame>)>,
492 chunk_tx: broadcast::Sender<WireChunkData>,
494}
495
496fn spawn_stream_encoder(
501 stream_id: String,
502 mut rx: mpsc::Receiver<AudioFrame>,
503 mut enc: Box<dyn encoder::Encoder>,
504 chunk_tx: broadcast::Sender<WireChunkData>,
505 sample_rate: u32,
506 channels: u16,
507) {
508 std::thread::spawn(move || {
509 let rt = tokio::runtime::Builder::new_current_thread()
510 .enable_time()
511 .build()
512 .expect("encoder runtime");
513
514 rt.block_on(async {
515 let mut next_tick: Option<tokio::time::Instant> = None;
516 while let Some(frame) = rx.recv().await {
517 if let AudioData::F32(ref samples) = frame.data {
519 let num_frames = samples.len() / channels.max(1) as usize;
520 let chunk_dur = std::time::Duration::from_micros(
521 (num_frames as u64 * 1_000_000) / sample_rate as u64,
522 );
523 let now = tokio::time::Instant::now();
524 let tick = next_tick.get_or_insert(now);
525 if now.checked_duration_since(*tick + chunk_dur)
527 > Some(std::time::Duration::from_millis(500))
528 {
529 *tick = now;
530 }
531 *tick += chunk_dur;
532 tokio::time::sleep_until(*tick).await;
533 }
534 match enc.encode(&frame.data) {
535 Ok(encoded) if !encoded.data.is_empty() => {
536 let _ = chunk_tx.send(WireChunkData {
537 stream_id: stream_id.clone(),
538 timestamp_usec: frame.timestamp_usec,
539 data: encoded.data,
540 });
541 }
542 Err(e) => {
543 tracing::warn!(stream = %stream_id, error = %e, "Encode failed");
544 }
545 _ => {} }
547 }
548 });
549 });
550}
551
552impl SnapServer {
554 pub fn new(config: ServerConfig) -> (Self, mpsc::Receiver<ServerEvent>) {
556 let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
557 let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
558 let (chunk_tx, _) = broadcast::channel(256);
559 let server = Self {
560 config,
561 event_tx,
562 command_tx,
563 command_rx: Some(command_rx),
564 streams: Vec::new(),
565 chunk_tx,
566 };
567 (server, event_rx)
568 }
569
570 pub fn add_stream(&mut self, name: &str) -> mpsc::Sender<AudioFrame> {
574 self.add_stream_with_config(name, StreamConfig::default())
575 }
576
577 pub fn add_f32_stream(&mut self, name: &str) -> Result<F32AudioSender, String> {
585 let sf: SampleFormat =
586 self.config.sample_format.parse().map_err(|e| {
587 format!("invalid sample_format '{}': {e}", self.config.sample_format)
588 })?;
589 let (tx, rx) = mpsc::channel(F32_CHANNEL_SIZE);
590 self.streams
591 .push((name.to_string(), StreamConfig::default(), rx));
592 Ok(F32AudioSender::new(tx, sf.rate(), sf.channels()))
593 }
594
595 pub fn add_stream_with_config(
597 &mut self,
598 name: &str,
599 config: StreamConfig,
600 ) -> mpsc::Sender<AudioFrame> {
601 let (tx, rx) = mpsc::channel(AUDIO_CHANNEL_SIZE);
602 self.streams.push((name.to_string(), config, rx));
603 tx
604 }
605
606 pub fn command_sender(&self) -> mpsc::Sender<ServerCommand> {
608 self.command_tx.clone()
609 }
610
611 pub fn config(&self) -> &ServerConfig {
613 &self.config
614 }
615
616 pub async fn run(&mut self) -> anyhow::Result<()> {
618 let mut command_rx = self
619 .command_rx
620 .take()
621 .ok_or_else(|| anyhow::anyhow!("run() already called"))?;
622
623 let event_tx = self.event_tx.clone();
624
625 let sample_format: snapcast_proto::SampleFormat = self
626 .config
627 .sample_format
628 .parse()
629 .unwrap_or(snapcast_proto::DEFAULT_SAMPLE_FORMAT);
630
631 anyhow::ensure!(
632 !self.streams.is_empty(),
633 "No streams configured — call add_stream() before run()"
634 );
635
636 tracing::info!(stream_port = self.config.stream_port, "Snapserver starting");
637
638 #[cfg(feature = "mdns")]
640 let _mdns = if self.config.mdns_enabled {
641 mdns::MdnsAdvertiser::new(
642 self.config.stream_port,
643 &self.config.mdns_service_type,
644 &self.config.mdns_name,
645 )
646 .map_err(|e| tracing::warn!(error = %e, "mDNS advertisement failed"))
647 .ok()
648 } else {
649 None
650 };
651
652 let default_enc_config = encoder::EncoderConfig {
654 codec: self.config.codec.clone(),
655 format: sample_format,
656 options: String::new(),
657 #[cfg(feature = "encryption")]
658 encryption_psk: self.config.encryption_psk.clone(),
659 };
660 let default_enc = encoder::create(&default_enc_config)?;
661
662 let chunk_tx = self.chunk_tx.clone();
664 let streams = std::mem::take(&mut self.streams);
665 let mut default_enc = Some(default_enc);
666
667 let initial_state = self
669 .config
670 .state_file
671 .as_ref()
672 .map(|p| state::ServerState::load(p))
673 .unwrap_or_default();
674 let shared_state = Arc::new(tokio::sync::Mutex::new(initial_state));
675
676 let first_name = streams
679 .first()
680 .map(|(n, _, _)| n.clone())
681 .unwrap_or_default();
682 let session_srv = Arc::new(session::SessionServer::new(
683 self.config.stream_port,
684 self.config.buffer_ms as i32,
685 self.config.auth.clone(),
686 Arc::clone(&shared_state),
687 first_name.clone(),
688 self.config.send_audio_to_muted,
689 ));
690
691 for (name, stream_cfg, rx) in streams {
692 {
693 let mut s = shared_state.lock().await;
694 s.streams.push(state::StreamInfo {
695 id: name.clone(),
696 status: "idle".into(),
697 uri: String::new(),
698 properties: Default::default(),
699 });
700 }
701 let enc = if stream_cfg.codec.is_none() && stream_cfg.sample_format.is_none() {
702 if let Some(enc) = default_enc.take() {
703 enc
704 } else {
705 encoder::create(&default_enc_config)?
706 }
707 } else {
708 let stream_codec = stream_cfg.codec.as_deref().unwrap_or(&self.config.codec);
709 let stream_format: snapcast_proto::SampleFormat = stream_cfg
710 .sample_format
711 .as_deref()
712 .and_then(|s| s.parse().ok())
713 .unwrap_or(sample_format);
714 encoder::create(&encoder::EncoderConfig {
715 codec: stream_codec.to_string(),
716 format: stream_format,
717 options: String::new(),
718 #[cfg(feature = "encryption")]
719 encryption_psk: self.config.encryption_psk.clone(),
720 })?
721 };
722 tracing::info!(stream = %name, codec = enc.name(), %sample_format, "Stream registered");
723 session_srv
724 .register_stream_codec(&name, enc.name(), enc.header())
725 .await;
726 spawn_stream_encoder(
727 name,
728 rx,
729 enc,
730 chunk_tx.clone(),
731 sample_format.rate(),
732 sample_format.channels(),
733 );
734 }
735
736 let session_for_run = Arc::clone(&session_srv);
737 let session_event_tx = event_tx.clone();
738 let session_chunk_tx = self.chunk_tx.clone();
739 let session_handle = tokio::spawn(async move {
740 if let Err(e) = session_for_run
741 .run(session_chunk_tx, session_event_tx)
742 .await
743 {
744 tracing::error!(error = %e, "Session server error");
745 }
746 });
747
748 let state_file = self.config.state_file.clone();
749 let save_state = |s: &state::ServerState| {
750 if let Some(ref path) = state_file {
751 let _ = s
752 .save(path)
753 .map_err(|e| tracing::warn!(error = %e, "Failed to save state"));
754 }
755 };
756
757 loop {
759 tokio::select! {
760 cmd = command_rx.recv() => {
761 match cmd {
762 Some(ServerCommand::Stop) | None => {
763 tracing::info!("Server stopped");
764 session_handle.abort();
765 return Ok(());
766 }
767 Some(ServerCommand::SetClientVolume { client_id, volume, muted }) => {
768 let mut s = shared_state.lock().await;
769 if let Some(c) = s.clients.get_mut(&client_id) {
770 c.config.volume.percent = volume;
771 c.config.volume.muted = muted;
772 }
773 let latency = s.clients.get(&client_id).map(|c| c.config.latency).unwrap_or(0);
774 save_state(&s);
775 drop(s);
776 session_srv.push_settings(ClientSettingsUpdate {
777 client_id: client_id.clone(),
778 buffer_ms: self.config.buffer_ms as i32,
779 latency, volume, muted,
780 }).await;
781 let _ = event_tx.try_send(ServerEvent::ClientVolumeChanged { client_id: client_id.clone(), volume, muted });
782 session_srv.update_routing_for_client(&client_id).await;
783 }
784 Some(ServerCommand::SetClientLatency { client_id, latency }) => {
785 let mut s = shared_state.lock().await;
786 if let Some(c) = s.clients.get_mut(&client_id) {
787 c.config.latency = latency;
788 session_srv.push_settings(ClientSettingsUpdate {
789 client_id: client_id.clone(),
790 buffer_ms: self.config.buffer_ms as i32,
791 latency,
792 volume: c.config.volume.percent,
793 muted: c.config.volume.muted,
794 }).await;
795 }
796 save_state(&s);
797 drop(s);
798 let _ = event_tx.try_send(ServerEvent::ClientLatencyChanged { client_id, latency });
799 }
800 Some(ServerCommand::SetClientName { client_id, name }) => {
801 let mut s = shared_state.lock().await;
802 if let Some(c) = s.clients.get_mut(&client_id) {
803 c.config.name = name.clone();
804 }
805 save_state(&s);
806 drop(s);
807 let _ = event_tx.try_send(ServerEvent::ClientNameChanged { client_id, name });
808 }
809 Some(ServerCommand::SetGroupStream { group_id, stream_id }) => {
810 let mut s = shared_state.lock().await;
811 s.set_group_stream(&group_id, &stream_id);
812 save_state(&s);
813 drop(s);
814 let _ = event_tx.try_send(ServerEvent::GroupStreamChanged { group_id: group_id.clone(), stream_id });
815 session_srv.update_routing_for_group(&group_id).await;
816 }
817 Some(ServerCommand::SetGroupMute { group_id, muted }) => {
818 let mut s = shared_state.lock().await;
819 if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
820 g.muted = muted;
821 }
822 save_state(&s);
823 drop(s);
824 let _ = event_tx.try_send(ServerEvent::GroupMuteChanged { group_id: group_id.clone(), muted });
825 session_srv.update_routing_for_group(&group_id).await;
826 }
827 Some(ServerCommand::SetGroupName { group_id, name }) => {
828 let mut s = shared_state.lock().await;
829 if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
830 g.name = name.clone();
831 }
832 save_state(&s);
833 drop(s);
834 let _ = event_tx.try_send(ServerEvent::GroupNameChanged { group_id, name });
835 }
836 Some(ServerCommand::SetGroupClients { group_id, clients }) => {
837 let mut s = shared_state.lock().await;
838 s.set_group_clients(&group_id, &clients);
839 save_state(&s);
840 drop(s);
841 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
843 session_srv.update_routing_all().await;
844 }
845 Some(ServerCommand::DeleteClient { client_id }) => {
846 let mut s = shared_state.lock().await;
847 s.remove_client_from_groups(&client_id);
848 s.clients.remove(&client_id);
849 save_state(&s);
850 drop(s);
851 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
852 session_srv.update_routing_all().await;
853 }
854 Some(ServerCommand::SetStreamMeta { stream_id, metadata }) => {
855 let mut s = shared_state.lock().await;
856 if let Some(stream) = s.streams.iter_mut().find(|st| st.id == stream_id) {
857 stream.properties = metadata.clone();
858 }
859 drop(s);
860 let _ = event_tx.try_send(ServerEvent::StreamMetaChanged { stream_id, metadata });
861 }
862 Some(ServerCommand::AddStream { uri, response_tx }) => {
863 let name = uri.split("name=").nth(1)
865 .and_then(|s| s.split('&').next())
866 .unwrap_or("dynamic")
867 .to_string();
868 let mut s = shared_state.lock().await;
869 if s.streams.iter().any(|st| st.id == name) {
870 let _ = response_tx.send(Err(format!("Stream '{name}' already exists")));
871 } else {
872 s.streams.push(state::StreamInfo {
873 id: name.clone(),
874 status: "idle".into(),
875 uri: uri.clone(),
876 properties: Default::default(),
877 });
878 save_state(&s);
879 drop(s);
880 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
881 let _ = response_tx.send(Ok(name));
882 }
883 }
884 Some(ServerCommand::RemoveStream { stream_id }) => {
885 let mut s = shared_state.lock().await;
886 s.streams.retain(|st| st.id != stream_id);
887 for g in &mut s.groups {
889 if g.stream_id == stream_id {
890 g.stream_id.clear();
891 }
892 }
893 save_state(&s);
894 drop(s);
895 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
896 session_srv.update_routing_all().await;
897 }
898 Some(ServerCommand::StreamControl { stream_id, command, params }) => {
899 tracing::debug!(stream_id, command, ?params, "Stream control forwarded");
900 let _ = event_tx.try_send(ServerEvent::StreamControl { stream_id, command, params });
902 }
903 Some(ServerCommand::GetStatus { response_tx }) => {
904 let s = shared_state.lock().await;
905 let _ = response_tx.send(s.to_status());
906 }
907 #[cfg(feature = "custom-protocol")]
908 Some(ServerCommand::SendToClient { client_id, message }) => {
909 session_srv.send_custom(&client_id, message.type_id, message.payload).await;
910 }
911 }
912 }
913 }
914 }
915 }
916}