1#![deny(unsafe_code)]
2#![warn(clippy::redundant_closure)]
3#![warn(clippy::implicit_clone)]
4#![warn(clippy::uninlined_format_args)]
5#![warn(missing_docs)]
6
7use std::sync::Arc;
46
47use tokio::sync::{broadcast, mpsc};
48
49#[cfg(feature = "custom-protocol")]
51pub use snapcast_proto::CustomMessage;
52#[cfg(feature = "encryption")]
53pub use snapcast_proto::DEFAULT_ENCRYPTION_PSK;
54pub use snapcast_proto::SampleFormat;
55pub use snapcast_proto::message::hello::Hello;
56pub use snapcast_proto::{DEFAULT_SAMPLE_FORMAT, DEFAULT_STREAM_PORT};
57
58const EVENT_CHANNEL_SIZE: usize = 256;
59const COMMAND_CHANNEL_SIZE: usize = 64;
60const AUDIO_CHANNEL_SIZE: usize = 256;
61
62const F32_CHANNEL_SIZE: usize = 1;
64
65#[derive(Debug, Clone)]
67pub enum AudioData {
68 F32(Vec<f32>),
71 Pcm(Vec<u8>),
74}
75
76#[derive(Debug, Clone)]
78pub struct AudioFrame {
79 pub data: AudioData,
81 pub timestamp_usec: i64,
83}
84
85pub struct F32AudioSender {
92 tx: mpsc::Sender<AudioFrame>,
93 buf: Vec<f32>,
94 chunk_samples: usize,
95 channels: u16,
96 sample_rate: u32,
97 ts: Option<time::ChunkTimestamper>,
98 last_send: std::time::Instant,
99}
100
101impl F32AudioSender {
102 fn new(tx: mpsc::Sender<AudioFrame>, sample_rate: u32, channels: u16) -> Self {
103 let chunk_samples = (sample_rate as usize * 20 / 1000) * channels as usize;
104 Self {
105 tx,
106 buf: Vec::with_capacity(chunk_samples * 2),
107 chunk_samples,
108 channels,
109 sample_rate,
110 ts: None,
111 last_send: std::time::Instant::now(),
112 }
113 }
114
115 pub async fn send(
118 &mut self,
119 samples: &[f32],
120 ) -> Result<(), mpsc::error::SendError<AudioFrame>> {
121 let now = std::time::Instant::now();
122 if now.duration_since(self.last_send) > std::time::Duration::from_millis(500) {
123 self.ts = None;
124 self.buf.clear();
125 }
126 self.last_send = now;
127
128 self.buf.extend_from_slice(samples);
129 let ch = self.channels.max(1) as usize;
130 while self.buf.len() >= self.chunk_samples {
131 let chunk: Vec<f32> = self.buf.drain(..self.chunk_samples).collect();
132 let frames = (self.chunk_samples / ch) as u32;
133 let ts = self
134 .ts
135 .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
136 let timestamp_usec = ts.next(frames);
137 self.tx
138 .send(AudioFrame {
139 data: AudioData::F32(chunk),
140 timestamp_usec,
141 })
142 .await?;
143 }
144 Ok(())
145 }
146
147 pub async fn flush(&mut self) -> Result<(), mpsc::error::SendError<AudioFrame>> {
150 if self.buf.is_empty() {
151 return Ok(());
152 }
153 let chunk: Vec<f32> = self.buf.drain(..).collect();
154 let ch = self.channels.max(1) as usize;
155 let frames = (chunk.len() / ch) as u32;
156 let ts = self
157 .ts
158 .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
159 let timestamp_usec = ts.next(frames);
160 self.tx
161 .send(AudioFrame {
162 data: AudioData::F32(chunk),
163 timestamp_usec,
164 })
165 .await
166 }
167}
168
169#[derive(Debug, Clone)]
171pub struct WireChunkData {
172 pub stream_id: String,
174 pub timestamp_usec: i64,
176 pub data: Vec<u8>,
178}
179
180pub mod auth;
181#[cfg(feature = "encryption")]
182pub(crate) mod crypto;
183pub(crate) mod encoder;
184#[cfg(feature = "mdns")]
185pub(crate) mod mdns;
186pub(crate) mod session;
187pub(crate) mod state;
188pub mod status;
189pub(crate) mod stream;
190pub mod time;
191
192#[derive(Debug, Clone)]
194pub struct ClientSettingsUpdate {
195 pub client_id: String,
197 pub buffer_ms: i32,
199 pub latency: i32,
201 pub volume: u16,
203 pub muted: bool,
205}
206
207#[derive(Debug)]
209#[non_exhaustive]
210pub enum ServerEvent {
211 ClientConnected {
213 id: String,
215 hello: snapcast_proto::message::hello::Hello,
217 },
218 ClientDisconnected {
220 id: String,
222 },
223 ClientVolumeChanged {
225 client_id: String,
227 volume: u16,
229 muted: bool,
231 },
232 ClientLatencyChanged {
234 client_id: String,
236 latency: i32,
238 },
239 ClientNameChanged {
241 client_id: String,
243 name: String,
245 },
246 GroupStreamChanged {
248 group_id: String,
250 stream_id: String,
252 },
253 GroupMuteChanged {
255 group_id: String,
257 muted: bool,
259 },
260 StreamStatus {
262 stream_id: String,
264 status: String,
266 },
267 StreamMetaChanged {
269 stream_id: String,
271 metadata: std::collections::HashMap<String, serde_json::Value>,
273 },
274 GroupNameChanged {
276 group_id: String,
278 name: String,
280 },
281 ServerUpdated,
287 StreamControl {
291 stream_id: String,
293 command: String,
295 params: serde_json::Value,
297 },
298 #[cfg(feature = "custom-protocol")]
300 CustomMessage {
301 client_id: String,
303 message: snapcast_proto::CustomMessage,
305 },
306}
307
308#[derive(Debug)]
310#[non_exhaustive]
311pub enum ServerCommand {
312 SetClientVolume {
314 client_id: String,
316 volume: u16,
318 muted: bool,
320 },
321 SetClientLatency {
323 client_id: String,
325 latency: i32,
327 },
328 SetClientName {
330 client_id: String,
332 name: String,
334 },
335 SetGroupStream {
337 group_id: String,
339 stream_id: String,
341 },
342 SetGroupMute {
344 group_id: String,
346 muted: bool,
348 },
349 SetGroupName {
351 group_id: String,
353 name: String,
355 },
356 SetGroupClients {
358 group_id: String,
360 clients: Vec<String>,
362 },
363 DeleteClient {
365 client_id: String,
367 },
368 SetStreamMeta {
370 stream_id: String,
372 metadata: std::collections::HashMap<String, serde_json::Value>,
374 },
375 AddStream {
381 uri: String,
383 response_tx: tokio::sync::oneshot::Sender<Result<String, String>>,
385 },
386 RemoveStream {
388 stream_id: String,
390 },
391 StreamControl {
393 stream_id: String,
395 command: String,
397 params: serde_json::Value,
399 },
400 GetStatus {
402 response_tx: tokio::sync::oneshot::Sender<status::ServerStatus>,
404 },
405 #[cfg(feature = "custom-protocol")]
407 SendToClient {
408 client_id: String,
410 message: snapcast_proto::CustomMessage,
412 },
413 Stop,
415}
416
417fn default_codec() -> &'static str {
419 #[cfg(feature = "flac")]
420 return snapcast_proto::CODEC_FLAC;
421 #[cfg(all(feature = "f32lz4", not(feature = "flac")))]
422 return snapcast_proto::CODEC_F32LZ4;
423 #[cfg(not(any(feature = "flac", feature = "f32lz4")))]
424 return snapcast_proto::CODEC_PCM;
425}
426
427pub struct ServerConfig {
429 pub stream_bind_address: String,
431 pub stream_port: u16,
433 pub buffer_ms: u32,
435 pub codec: String,
437 pub sample_format: String,
439 #[cfg(feature = "mdns")]
441 pub mdns_service_type: String,
442 #[cfg(feature = "mdns")]
444 pub mdns_enabled: bool,
445 #[cfg(feature = "mdns")]
447 pub mdns_name: String,
448 pub auth: Option<std::sync::Arc<dyn auth::AuthValidator>>,
450 pub client_filter: Option<std::sync::Arc<dyn auth::ClientFilter>>,
453 #[cfg(feature = "encryption")]
455 pub encryption_psk: Option<String>,
456 pub state_file: Option<std::path::PathBuf>,
458 pub send_audio_to_muted: bool,
460}
461
462impl Default for ServerConfig {
463 fn default() -> Self {
464 Self {
465 stream_bind_address: snapcast_proto::DEFAULT_BIND_ADDRESS.into(),
466 stream_port: snapcast_proto::DEFAULT_STREAM_PORT,
467 buffer_ms: snapcast_proto::DEFAULT_BUFFER_MS,
468 codec: default_codec().into(),
469 sample_format: snapcast_proto::DEFAULT_SAMPLE_FORMAT_STRING.into(),
470 #[cfg(feature = "mdns")]
471 mdns_service_type: snapcast_proto::DEFAULT_MDNS_SERVICE_TYPE.into(),
472 #[cfg(feature = "mdns")]
473 mdns_enabled: true,
474 #[cfg(feature = "mdns")]
475 mdns_name: snapcast_proto::DEFAULT_SERVER_NAME.into(),
476 auth: None,
477 client_filter: None,
478 #[cfg(feature = "encryption")]
479 encryption_psk: None,
480 state_file: None,
481 send_audio_to_muted: false,
482 }
483 }
484}
485
486#[derive(Debug, Clone, Default)]
488pub struct StreamConfig {
489 pub codec: Option<String>,
491 pub sample_format: Option<String>,
493}
494
495pub struct SnapServer {
497 config: ServerConfig,
498 event_tx: mpsc::Sender<ServerEvent>,
499 command_tx: mpsc::Sender<ServerCommand>,
500 command_rx: Option<mpsc::Receiver<ServerCommand>>,
501 streams: Vec<(String, StreamConfig, mpsc::Receiver<AudioFrame>)>,
503 chunk_tx: broadcast::Sender<WireChunkData>,
505}
506
507fn spawn_stream_encoder(
512 stream_id: String,
513 mut rx: mpsc::Receiver<AudioFrame>,
514 mut enc: Box<dyn encoder::Encoder>,
515 chunk_tx: broadcast::Sender<WireChunkData>,
516 sample_rate: u32,
517 channels: u16,
518) {
519 std::thread::spawn(move || {
520 let rt = tokio::runtime::Builder::new_current_thread()
521 .enable_time()
522 .build()
523 .expect("encoder runtime");
524
525 rt.block_on(async {
526 let mut next_tick: Option<tokio::time::Instant> = None;
527 while let Some(frame) = rx.recv().await {
528 if let AudioData::F32(ref samples) = frame.data {
530 let num_frames = samples.len() / channels.max(1) as usize;
531 let chunk_dur = std::time::Duration::from_micros(
532 (num_frames as u64 * 1_000_000) / sample_rate as u64,
533 );
534 let now = tokio::time::Instant::now();
535 let tick = next_tick.get_or_insert(now);
536 if now.checked_duration_since(*tick + chunk_dur)
538 > Some(std::time::Duration::from_millis(500))
539 {
540 *tick = now;
541 }
542 *tick += chunk_dur;
543 tokio::time::sleep_until(*tick).await;
544 }
545 match enc.encode(&frame.data) {
546 Ok(encoded) if !encoded.data.is_empty() => {
547 let _ = chunk_tx.send(WireChunkData {
548 stream_id: stream_id.clone(),
549 timestamp_usec: frame.timestamp_usec,
550 data: encoded.data,
551 });
552 }
553 Err(e) => {
554 tracing::warn!(stream = %stream_id, error = %e, "Encode failed");
555 }
556 _ => {} }
558 }
559 });
560 });
561}
562
563impl SnapServer {
565 pub fn new(config: ServerConfig) -> (Self, mpsc::Receiver<ServerEvent>) {
567 let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
568 let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
569 let (chunk_tx, _) = broadcast::channel(256);
570 let server = Self {
571 config,
572 event_tx,
573 command_tx,
574 command_rx: Some(command_rx),
575 streams: Vec::new(),
576 chunk_tx,
577 };
578 (server, event_rx)
579 }
580
581 pub fn add_stream(&mut self, name: &str) -> mpsc::Sender<AudioFrame> {
585 self.add_stream_with_config(name, StreamConfig::default())
586 }
587
588 pub fn add_f32_stream(&mut self, name: &str) -> Result<F32AudioSender, String> {
596 let sf: SampleFormat =
597 self.config.sample_format.parse().map_err(|e| {
598 format!("invalid sample_format '{}': {e}", self.config.sample_format)
599 })?;
600 let (tx, rx) = mpsc::channel(F32_CHANNEL_SIZE);
601 self.streams
602 .push((name.to_string(), StreamConfig::default(), rx));
603 Ok(F32AudioSender::new(tx, sf.rate(), sf.channels()))
604 }
605
606 pub fn add_stream_with_config(
608 &mut self,
609 name: &str,
610 config: StreamConfig,
611 ) -> mpsc::Sender<AudioFrame> {
612 let (tx, rx) = mpsc::channel(AUDIO_CHANNEL_SIZE);
613 self.streams.push((name.to_string(), config, rx));
614 tx
615 }
616
617 pub fn command_sender(&self) -> mpsc::Sender<ServerCommand> {
619 self.command_tx.clone()
620 }
621
622 pub fn config(&self) -> &ServerConfig {
624 &self.config
625 }
626
627 pub async fn run(&mut self) -> anyhow::Result<()> {
629 let mut command_rx = self
630 .command_rx
631 .take()
632 .ok_or_else(|| anyhow::anyhow!("run() already called"))?;
633
634 let event_tx = self.event_tx.clone();
635
636 let sample_format: snapcast_proto::SampleFormat = self
637 .config
638 .sample_format
639 .parse()
640 .unwrap_or(snapcast_proto::DEFAULT_SAMPLE_FORMAT);
641
642 anyhow::ensure!(
643 !self.streams.is_empty(),
644 "No streams configured — call add_stream() before run()"
645 );
646
647 tracing::info!(
648 bind_address = %self.config.stream_bind_address,
649 stream_port = self.config.stream_port,
650 "Snapserver starting"
651 );
652
653 #[cfg(feature = "mdns")]
655 let _mdns = if self.config.mdns_enabled {
656 mdns::MdnsAdvertiser::new(
657 self.config.stream_port,
658 &self.config.mdns_service_type,
659 &self.config.mdns_name,
660 )
661 .map_err(|e| tracing::warn!(error = %e, "mDNS advertisement failed"))
662 .ok()
663 } else {
664 None
665 };
666
667 let default_enc_config = encoder::EncoderConfig {
669 codec: self.config.codec.clone(),
670 format: sample_format,
671 options: String::new(),
672 #[cfg(feature = "encryption")]
673 encryption_psk: self.config.encryption_psk.clone(),
674 };
675 let default_enc = encoder::create(&default_enc_config)?;
676
677 let chunk_tx = self.chunk_tx.clone();
679 let streams = std::mem::take(&mut self.streams);
680 let mut default_enc = Some(default_enc);
681
682 let initial_state = self
684 .config
685 .state_file
686 .as_ref()
687 .map(|p| state::ServerState::load(p))
688 .unwrap_or_default();
689 let shared_state = Arc::new(tokio::sync::Mutex::new(initial_state));
690
691 let first_name = streams
694 .first()
695 .map(|(n, _, _)| n.clone())
696 .unwrap_or_default();
697 let session_srv = Arc::new(session::SessionServer::new(session::SessionServerConfig {
698 bind_address: self.config.stream_bind_address.clone(),
699 port: self.config.stream_port,
700 buffer_ms: self.config.buffer_ms as i32,
701 auth: self.config.auth.clone(),
702 client_filter: self.config.client_filter.clone(),
703 shared_state: Arc::clone(&shared_state),
704 default_stream: first_name.clone(),
705 send_audio_to_muted: self.config.send_audio_to_muted,
706 }));
707
708 for (name, stream_cfg, rx) in streams {
709 {
710 let mut s = shared_state.lock().await;
711 if !s.streams.iter().any(|existing| existing.id == name) {
712 s.streams.push(state::StreamInfo {
713 id: name.clone(),
714 status: "idle".into(),
715 uri: String::new(),
716 properties: Default::default(),
717 });
718 }
719 }
720 let mut active_format = sample_format;
721 let enc = if stream_cfg.codec.is_none() && stream_cfg.sample_format.is_none() {
722 if let Some(enc) = default_enc.take() {
723 enc
724 } else {
725 encoder::create(&default_enc_config)?
726 }
727 } else {
728 let stream_codec = stream_cfg.codec.as_deref().unwrap_or(&self.config.codec);
729 let stream_format: snapcast_proto::SampleFormat = stream_cfg
730 .sample_format
731 .as_deref()
732 .and_then(|s| s.parse().ok())
733 .unwrap_or(sample_format);
734 active_format = stream_format;
735 encoder::create(&encoder::EncoderConfig {
736 codec: stream_codec.to_string(),
737 format: stream_format,
738 options: String::new(),
739 #[cfg(feature = "encryption")]
740 encryption_psk: self.config.encryption_psk.clone(),
741 })?
742 };
743 tracing::info!(stream = %name, codec = enc.name(), format = %active_format, "Stream registered");
744 session_srv
745 .register_stream_codec(&name, enc.name(), enc.header())
746 .await;
747 spawn_stream_encoder(
748 name,
749 rx,
750 enc,
751 chunk_tx.clone(),
752 active_format.rate(),
753 active_format.channels(),
754 );
755 }
756
757 let session_for_run = Arc::clone(&session_srv);
758 let session_event_tx = event_tx.clone();
759 let session_chunk_tx = self.chunk_tx.clone();
760 let session_handle = tokio::spawn(async move {
761 if let Err(e) = session_for_run
762 .run(session_chunk_tx, session_event_tx)
763 .await
764 {
765 tracing::error!(error = %e, "Session server error");
766 }
767 });
768
769 let state_file = self.config.state_file.clone();
770 let save_state = |s: &state::ServerState| {
771 if let Some(ref path) = state_file {
772 let _ = s
773 .save(path)
774 .map_err(|e| tracing::warn!(error = %e, "Failed to save state"));
775 }
776 };
777
778 loop {
780 tokio::select! {
781 cmd = command_rx.recv() => {
782 match cmd {
783 Some(ServerCommand::Stop) | None => {
784 tracing::info!("Server stopped");
785 session_handle.abort();
786 return Ok(());
787 }
788 Some(ServerCommand::SetClientVolume { client_id, volume, muted }) => {
789 let mut s = shared_state.lock().await;
790 if let Some(c) = s.clients.get_mut(&client_id) {
791 c.config.volume.percent = volume;
792 c.config.volume.muted = muted;
793 }
794 let latency = s.clients.get(&client_id).map(|c| c.config.latency).unwrap_or(0);
795 save_state(&s);
796 drop(s);
797 session_srv.push_settings(ClientSettingsUpdate {
798 client_id: client_id.clone(),
799 buffer_ms: self.config.buffer_ms as i32,
800 latency, volume, muted,
801 }).await;
802 let _ = event_tx.try_send(ServerEvent::ClientVolumeChanged { client_id: client_id.clone(), volume, muted });
803 session_srv.update_routing_for_client(&client_id).await;
804 }
805 Some(ServerCommand::SetClientLatency { client_id, latency }) => {
806 let mut settings_update = None;
807 let mut s = shared_state.lock().await;
808 if let Some(c) = s.clients.get_mut(&client_id) {
809 c.config.latency = latency;
810 settings_update = Some(ClientSettingsUpdate {
811 client_id: client_id.clone(),
812 buffer_ms: self.config.buffer_ms as i32,
813 latency,
814 volume: c.config.volume.percent,
815 muted: c.config.volume.muted,
816 });
817 }
818 save_state(&s);
819 drop(s);
820 if let Some(update) = settings_update {
821 session_srv.push_settings(update).await;
822 }
823 let _ = event_tx.try_send(ServerEvent::ClientLatencyChanged { client_id, latency });
824 }
825 Some(ServerCommand::SetClientName { client_id, name }) => {
826 let mut s = shared_state.lock().await;
827 if let Some(c) = s.clients.get_mut(&client_id) {
828 c.config.name = name.clone();
829 }
830 save_state(&s);
831 drop(s);
832 let _ = event_tx.try_send(ServerEvent::ClientNameChanged { client_id, name });
833 }
834 Some(ServerCommand::SetGroupStream { group_id, stream_id }) => {
835 let mut s = shared_state.lock().await;
836 s.set_group_stream(&group_id, &stream_id);
837 save_state(&s);
838 drop(s);
839 let _ = event_tx.try_send(ServerEvent::GroupStreamChanged { group_id: group_id.clone(), stream_id });
840 session_srv.update_routing_for_group(&group_id).await;
841 }
842 Some(ServerCommand::SetGroupMute { group_id, muted }) => {
843 let mut s = shared_state.lock().await;
844 if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
845 g.muted = muted;
846 }
847 save_state(&s);
848 drop(s);
849 let _ = event_tx.try_send(ServerEvent::GroupMuteChanged { group_id: group_id.clone(), muted });
850 session_srv.update_routing_for_group(&group_id).await;
851 }
852 Some(ServerCommand::SetGroupName { group_id, name }) => {
853 let mut s = shared_state.lock().await;
854 if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
855 g.name = name.clone();
856 }
857 save_state(&s);
858 drop(s);
859 let _ = event_tx.try_send(ServerEvent::GroupNameChanged { group_id, name });
860 }
861 Some(ServerCommand::SetGroupClients { group_id, clients }) => {
862 let mut s = shared_state.lock().await;
863 s.set_group_clients(&group_id, &clients);
864 save_state(&s);
865 drop(s);
866 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
868 session_srv.update_routing_all().await;
869 }
870 Some(ServerCommand::DeleteClient { client_id }) => {
871 let mut s = shared_state.lock().await;
872 s.remove_client_from_groups(&client_id);
873 s.clients.remove(&client_id);
874 save_state(&s);
875 drop(s);
876 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
877 session_srv.update_routing_all().await;
878 }
879 Some(ServerCommand::SetStreamMeta { stream_id, metadata }) => {
880 let mut s = shared_state.lock().await;
881 if let Some(stream) = s.streams.iter_mut().find(|st| st.id == stream_id) {
882 stream.properties = metadata.clone();
883 }
884 drop(s);
885 let _ = event_tx.try_send(ServerEvent::StreamMetaChanged { stream_id, metadata });
886 }
887 Some(ServerCommand::AddStream { uri, response_tx }) => {
888 tracing::warn!(uri, "Dynamic stream addition requires application-owned stream orchestration");
889 let _ = response_tx.send(Err(
890 "dynamic Stream.AddStream is not supported by the embeddable server after run(); create streams before run()".into(),
891 ));
892 }
893 Some(ServerCommand::RemoveStream { stream_id }) => {
894 let mut s = shared_state.lock().await;
895 s.streams.retain(|st| st.id != stream_id);
896 for g in &mut s.groups {
898 if g.stream_id == stream_id {
899 g.stream_id.clear();
900 }
901 }
902 save_state(&s);
903 drop(s);
904 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
905 session_srv.update_routing_all().await;
906 }
907 Some(ServerCommand::StreamControl { stream_id, command, params }) => {
908 tracing::debug!(stream_id, command, ?params, "Stream control forwarded");
909 let _ = event_tx.try_send(ServerEvent::StreamControl { stream_id, command, params });
911 }
912 Some(ServerCommand::GetStatus { response_tx }) => {
913 let s = shared_state.lock().await;
914 let _ = response_tx.send(s.to_status());
915 }
916 #[cfg(feature = "custom-protocol")]
917 Some(ServerCommand::SendToClient { client_id, message }) => {
918 session_srv.send_custom(&client_id, message.type_id, message.payload).await;
919 }
920 }
921 }
922 }
923 }
924 }
925}