1#![deny(unsafe_code)]
2#![warn(clippy::redundant_closure)]
3#![warn(clippy::implicit_clone)]
4#![warn(clippy::uninlined_format_args)]
5#![warn(missing_docs)]
6
7use std::sync::Arc;
46
47use tokio::sync::{broadcast, mpsc};
48
49#[cfg(feature = "custom-protocol")]
51pub use snapcast_proto::CustomMessage;
52#[cfg(feature = "encryption")]
53pub use snapcast_proto::DEFAULT_ENCRYPTION_PSK;
54pub use snapcast_proto::SampleFormat;
55pub use snapcast_proto::message::hello::Hello;
56pub use snapcast_proto::{DEFAULT_SAMPLE_FORMAT, DEFAULT_STREAM_PORT};
57
58const EVENT_CHANNEL_SIZE: usize = 256;
59const COMMAND_CHANNEL_SIZE: usize = 64;
60const AUDIO_CHANNEL_SIZE: usize = 256;
61
62const F32_CHANNEL_SIZE: usize = 1;
64
65#[derive(Debug, Clone)]
67pub enum AudioData {
68 F32(Vec<f32>),
71 Pcm(Vec<u8>),
74}
75
76#[derive(Debug, Clone)]
78pub struct AudioFrame {
79 pub data: AudioData,
81 pub timestamp_usec: i64,
83}
84
85pub struct F32AudioSender {
92 tx: mpsc::Sender<AudioFrame>,
93 buf: Vec<f32>,
94 chunk_samples: usize,
95 channels: u16,
96 sample_rate: u32,
97 ts: Option<time::ChunkTimestamper>,
98 last_send: std::time::Instant,
99}
100
101impl F32AudioSender {
102 fn new(tx: mpsc::Sender<AudioFrame>, sample_rate: u32, channels: u16) -> Self {
103 let chunk_samples = (sample_rate as usize * 20 / 1000) * channels as usize;
104 Self {
105 tx,
106 buf: Vec::with_capacity(chunk_samples * 2),
107 chunk_samples,
108 channels,
109 sample_rate,
110 ts: None,
111 last_send: std::time::Instant::now(),
112 }
113 }
114
115 pub async fn send(
118 &mut self,
119 samples: &[f32],
120 ) -> Result<(), mpsc::error::SendError<AudioFrame>> {
121 let now = std::time::Instant::now();
122 if now.duration_since(self.last_send) > std::time::Duration::from_millis(500) {
123 self.ts = None;
124 self.buf.clear();
125 }
126 self.last_send = now;
127
128 self.buf.extend_from_slice(samples);
129 let ch = self.channels.max(1) as usize;
130 while self.buf.len() >= self.chunk_samples {
131 let chunk: Vec<f32> = self.buf.drain(..self.chunk_samples).collect();
132 let frames = (self.chunk_samples / ch) as u32;
133 let ts = self
134 .ts
135 .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
136 let timestamp_usec = ts.next(frames);
137 self.tx
138 .send(AudioFrame {
139 data: AudioData::F32(chunk),
140 timestamp_usec,
141 })
142 .await?;
143 }
144 Ok(())
145 }
146
147 pub async fn flush(&mut self) -> Result<(), mpsc::error::SendError<AudioFrame>> {
150 if self.buf.is_empty() {
151 return Ok(());
152 }
153 let chunk: Vec<f32> = self.buf.drain(..).collect();
154 let ch = self.channels.max(1) as usize;
155 let frames = (chunk.len() / ch) as u32;
156 let ts = self
157 .ts
158 .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
159 let timestamp_usec = ts.next(frames);
160 self.tx
161 .send(AudioFrame {
162 data: AudioData::F32(chunk),
163 timestamp_usec,
164 })
165 .await
166 }
167}
168
169#[derive(Debug, Clone)]
171pub struct WireChunkData {
172 pub stream_id: String,
174 pub timestamp_usec: i64,
176 pub data: Vec<u8>,
178}
179
180pub mod auth;
181#[cfg(feature = "encryption")]
182pub(crate) mod crypto;
183pub(crate) mod encoder;
184pub(crate) mod session;
185pub(crate) mod state;
186pub mod status;
187pub(crate) mod stream;
188pub mod time;
189
190#[derive(Debug, Clone)]
192pub struct ClientSettingsUpdate {
193 pub client_id: String,
195 pub buffer_ms: i32,
197 pub latency: i32,
199 pub volume: u16,
201 pub muted: bool,
203}
204
205#[derive(Debug)]
207#[non_exhaustive]
208pub enum ServerEvent {
209 ClientConnected {
211 id: String,
213 hello: snapcast_proto::message::hello::Hello,
215 },
216 ClientDisconnected {
218 id: String,
220 },
221 ClientVolumeChanged {
223 client_id: String,
225 volume: u16,
227 muted: bool,
229 },
230 ClientLatencyChanged {
232 client_id: String,
234 latency: i32,
236 },
237 ClientNameChanged {
239 client_id: String,
241 name: String,
243 },
244 GroupStreamChanged {
246 group_id: String,
248 stream_id: String,
250 },
251 GroupMuteChanged {
253 group_id: String,
255 muted: bool,
257 },
258 StreamStatus {
260 stream_id: String,
262 status: String,
264 },
265 StreamMetaChanged {
267 stream_id: String,
269 metadata: std::collections::HashMap<String, serde_json::Value>,
271 },
272 GroupNameChanged {
274 group_id: String,
276 name: String,
278 },
279 ServerUpdated,
285 StreamControl {
289 stream_id: String,
291 command: String,
293 params: serde_json::Value,
295 },
296 #[cfg(feature = "custom-protocol")]
298 CustomMessage {
299 client_id: String,
301 message: snapcast_proto::CustomMessage,
303 },
304}
305
306#[derive(Debug)]
308#[non_exhaustive]
309pub enum ServerCommand {
310 SetClientVolume {
312 client_id: String,
314 volume: u16,
316 muted: bool,
318 },
319 SetClientLatency {
321 client_id: String,
323 latency: i32,
325 },
326 SetClientName {
328 client_id: String,
330 name: String,
332 },
333 SetGroupStream {
335 group_id: String,
337 stream_id: String,
339 },
340 SetGroupMute {
342 group_id: String,
344 muted: bool,
346 },
347 SetGroupName {
349 group_id: String,
351 name: String,
353 },
354 SetGroupClients {
356 group_id: String,
358 clients: Vec<String>,
360 },
361 DeleteClient {
363 client_id: String,
365 },
366 SetStreamMeta {
368 stream_id: String,
370 metadata: std::collections::HashMap<String, serde_json::Value>,
372 },
373 AddStream {
379 uri: String,
381 response_tx: tokio::sync::oneshot::Sender<Result<String, String>>,
383 },
384 RemoveStream {
386 stream_id: String,
388 },
389 StreamControl {
391 stream_id: String,
393 command: String,
395 params: serde_json::Value,
397 },
398 GetStatus {
400 response_tx: tokio::sync::oneshot::Sender<status::ServerStatus>,
402 },
403 #[cfg(feature = "custom-protocol")]
405 SendToClient {
406 client_id: String,
408 message: snapcast_proto::CustomMessage,
410 },
411 Stop,
413}
414
415fn default_codec() -> &'static str {
417 #[cfg(feature = "flac")]
418 return snapcast_proto::CODEC_FLAC;
419 #[cfg(all(feature = "f32lz4", not(feature = "flac")))]
420 return snapcast_proto::CODEC_F32LZ4;
421 #[cfg(not(any(feature = "flac", feature = "f32lz4")))]
422 return snapcast_proto::CODEC_PCM;
423}
424
425pub struct ServerConfig {
427 pub stream_bind_address: String,
429 pub stream_port: u16,
431 pub buffer_ms: u32,
433 pub codec: String,
435 pub sample_format: String,
437
438 pub auth: Option<std::sync::Arc<dyn auth::AuthValidator>>,
440 pub client_filter: Option<std::sync::Arc<dyn auth::ClientFilter>>,
443 #[cfg(feature = "encryption")]
445 pub encryption_psk: Option<String>,
446 pub state_file: Option<std::path::PathBuf>,
448 pub send_audio_to_muted: bool,
450}
451
452impl Default for ServerConfig {
453 fn default() -> Self {
454 Self {
455 stream_bind_address: snapcast_proto::DEFAULT_BIND_ADDRESS.into(),
456 stream_port: snapcast_proto::DEFAULT_STREAM_PORT,
457 buffer_ms: snapcast_proto::DEFAULT_BUFFER_MS,
458 codec: default_codec().into(),
459 sample_format: snapcast_proto::DEFAULT_SAMPLE_FORMAT_STRING.into(),
460
461 auth: None,
462 client_filter: None,
463 #[cfg(feature = "encryption")]
464 encryption_psk: None,
465 state_file: None,
466 send_audio_to_muted: false,
467 }
468 }
469}
470
471#[derive(Debug, Clone, Default)]
473pub struct StreamConfig {
474 pub codec: Option<String>,
476 pub sample_format: Option<String>,
478}
479
480pub struct SnapServer {
482 config: ServerConfig,
483 event_tx: mpsc::Sender<ServerEvent>,
484 command_tx: mpsc::Sender<ServerCommand>,
485 command_rx: Option<mpsc::Receiver<ServerCommand>>,
486 streams: Vec<(String, StreamConfig, mpsc::Receiver<AudioFrame>)>,
488 chunk_tx: broadcast::Sender<WireChunkData>,
490}
491
492fn spawn_stream_encoder(
497 stream_id: String,
498 mut rx: mpsc::Receiver<AudioFrame>,
499 mut enc: Box<dyn encoder::Encoder>,
500 chunk_tx: broadcast::Sender<WireChunkData>,
501 sample_rate: u32,
502 channels: u16,
503) {
504 std::thread::spawn(move || {
505 let rt = tokio::runtime::Builder::new_current_thread()
506 .enable_time()
507 .build()
508 .expect("encoder runtime");
509
510 rt.block_on(async {
511 let mut next_tick: Option<tokio::time::Instant> = None;
512 while let Some(frame) = rx.recv().await {
513 if let AudioData::F32(ref samples) = frame.data {
515 let num_frames = samples.len() / channels.max(1) as usize;
516 let chunk_dur = std::time::Duration::from_micros(
517 (num_frames as u64 * 1_000_000) / sample_rate as u64,
518 );
519 let now = tokio::time::Instant::now();
520 let tick = next_tick.get_or_insert(now);
521 if now.checked_duration_since(*tick + chunk_dur)
523 > Some(std::time::Duration::from_millis(500))
524 {
525 *tick = now;
526 }
527 *tick += chunk_dur;
528 tokio::time::sleep_until(*tick).await;
529 }
530 match enc.encode(&frame.data) {
531 Ok(encoded) if !encoded.data.is_empty() => {
532 let _ = chunk_tx.send(WireChunkData {
533 stream_id: stream_id.clone(),
534 timestamp_usec: frame.timestamp_usec,
535 data: encoded.data,
536 });
537 }
538 Err(e) => {
539 tracing::warn!(stream = %stream_id, error = %e, "Encode failed");
540 }
541 _ => {} }
543 }
544 });
545 });
546}
547
548impl SnapServer {
550 pub fn new(config: ServerConfig) -> (Self, mpsc::Receiver<ServerEvent>) {
552 let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
553 let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
554 let (chunk_tx, _) = broadcast::channel(256);
555 let server = Self {
556 config,
557 event_tx,
558 command_tx,
559 command_rx: Some(command_rx),
560 streams: Vec::new(),
561 chunk_tx,
562 };
563 (server, event_rx)
564 }
565
566 pub fn add_stream(&mut self, name: &str) -> mpsc::Sender<AudioFrame> {
570 self.add_stream_with_config(name, StreamConfig::default())
571 }
572
573 pub fn add_f32_stream(&mut self, name: &str) -> Result<F32AudioSender, String> {
581 let sf: SampleFormat =
582 self.config.sample_format.parse().map_err(|e| {
583 format!("invalid sample_format '{}': {e}", self.config.sample_format)
584 })?;
585 let (tx, rx) = mpsc::channel(F32_CHANNEL_SIZE);
586 self.streams
587 .push((name.to_string(), StreamConfig::default(), rx));
588 Ok(F32AudioSender::new(tx, sf.rate(), sf.channels()))
589 }
590
591 pub fn add_stream_with_config(
593 &mut self,
594 name: &str,
595 config: StreamConfig,
596 ) -> mpsc::Sender<AudioFrame> {
597 let (tx, rx) = mpsc::channel(AUDIO_CHANNEL_SIZE);
598 self.streams.push((name.to_string(), config, rx));
599 tx
600 }
601
602 pub fn command_sender(&self) -> mpsc::Sender<ServerCommand> {
604 self.command_tx.clone()
605 }
606
607 pub fn config(&self) -> &ServerConfig {
609 &self.config
610 }
611
612 pub async fn run(&mut self) -> anyhow::Result<()> {
614 let mut command_rx = self
615 .command_rx
616 .take()
617 .ok_or_else(|| anyhow::anyhow!("run() already called"))?;
618
619 let event_tx = self.event_tx.clone();
620
621 let sample_format: snapcast_proto::SampleFormat = self
622 .config
623 .sample_format
624 .parse()
625 .unwrap_or(snapcast_proto::DEFAULT_SAMPLE_FORMAT);
626
627 anyhow::ensure!(
628 !self.streams.is_empty(),
629 "No streams configured — call add_stream() before run()"
630 );
631
632 tracing::info!(
633 bind_address = %self.config.stream_bind_address,
634 stream_port = self.config.stream_port,
635 "Snapserver starting"
636 );
637
638 let default_enc_config = encoder::EncoderConfig {
640 codec: self.config.codec.clone(),
641 format: sample_format,
642 options: String::new(),
643 #[cfg(feature = "encryption")]
644 encryption_psk: self.config.encryption_psk.clone(),
645 };
646 let default_enc = encoder::create(&default_enc_config)?;
647
648 let chunk_tx = self.chunk_tx.clone();
650 let streams = std::mem::take(&mut self.streams);
651 let mut default_enc = Some(default_enc);
652
653 let initial_state = self
655 .config
656 .state_file
657 .as_ref()
658 .map(|p| state::ServerState::load(p))
659 .unwrap_or_default();
660 let shared_state = Arc::new(tokio::sync::Mutex::new(initial_state));
661
662 let first_name = streams
665 .first()
666 .map(|(n, _, _)| n.clone())
667 .unwrap_or_default();
668 let session_srv = Arc::new(session::SessionServer::new(session::SessionServerConfig {
669 bind_address: self.config.stream_bind_address.clone(),
670 port: self.config.stream_port,
671 buffer_ms: self.config.buffer_ms as i32,
672 auth: self.config.auth.clone(),
673 client_filter: self.config.client_filter.clone(),
674 shared_state: Arc::clone(&shared_state),
675 default_stream: first_name.clone(),
676 send_audio_to_muted: self.config.send_audio_to_muted,
677 }));
678
679 for (name, stream_cfg, rx) in streams {
680 {
681 let mut s = shared_state.lock().await;
682 if !s.streams.iter().any(|existing| existing.id == name) {
683 s.streams.push(state::StreamInfo {
684 id: name.clone(),
685 status: "idle".into(),
686 uri: String::new(),
687 properties: Default::default(),
688 });
689 }
690 }
691 let mut active_format = sample_format;
692 let enc = if stream_cfg.codec.is_none() && stream_cfg.sample_format.is_none() {
693 if let Some(enc) = default_enc.take() {
694 enc
695 } else {
696 encoder::create(&default_enc_config)?
697 }
698 } else {
699 let stream_codec = stream_cfg.codec.as_deref().unwrap_or(&self.config.codec);
700 let stream_format: snapcast_proto::SampleFormat = stream_cfg
701 .sample_format
702 .as_deref()
703 .and_then(|s| s.parse().ok())
704 .unwrap_or(sample_format);
705 active_format = stream_format;
706 encoder::create(&encoder::EncoderConfig {
707 codec: stream_codec.to_string(),
708 format: stream_format,
709 options: String::new(),
710 #[cfg(feature = "encryption")]
711 encryption_psk: self.config.encryption_psk.clone(),
712 })?
713 };
714 tracing::info!(stream = %name, codec = enc.name(), format = %active_format, "Stream registered");
715 session_srv
716 .register_stream_codec(&name, enc.name(), enc.header())
717 .await;
718 spawn_stream_encoder(
719 name,
720 rx,
721 enc,
722 chunk_tx.clone(),
723 active_format.rate(),
724 active_format.channels(),
725 );
726 }
727
728 let session_for_run = Arc::clone(&session_srv);
729 let session_event_tx = event_tx.clone();
730 let session_chunk_tx = self.chunk_tx.clone();
731 let session_handle = tokio::spawn(async move {
732 if let Err(e) = session_for_run
733 .run(session_chunk_tx, session_event_tx)
734 .await
735 {
736 tracing::error!(error = %e, "Session server error");
737 }
738 });
739
740 let state_file = self.config.state_file.clone();
741 let save_state = |s: &state::ServerState| {
742 if let Some(ref path) = state_file {
743 let _ = s
744 .save(path)
745 .map_err(|e| tracing::warn!(error = %e, "Failed to save state"));
746 }
747 };
748
749 loop {
751 tokio::select! {
752 cmd = command_rx.recv() => {
753 match cmd {
754 Some(ServerCommand::Stop) | None => {
755 tracing::info!("Server stopped");
756 session_handle.abort();
757 return Ok(());
758 }
759 Some(ServerCommand::SetClientVolume { client_id, volume, muted }) => {
760 let mut s = shared_state.lock().await;
761 if let Some(c) = s.clients.get_mut(&client_id) {
762 c.config.volume.percent = volume;
763 c.config.volume.muted = muted;
764 }
765 let latency = s.clients.get(&client_id).map(|c| c.config.latency).unwrap_or(0);
766 save_state(&s);
767 drop(s);
768 session_srv.push_settings(ClientSettingsUpdate {
769 client_id: client_id.clone(),
770 buffer_ms: self.config.buffer_ms as i32,
771 latency, volume, muted,
772 }).await;
773 let _ = event_tx.try_send(ServerEvent::ClientVolumeChanged { client_id: client_id.clone(), volume, muted });
774 session_srv.update_routing_for_client(&client_id).await;
775 }
776 Some(ServerCommand::SetClientLatency { client_id, latency }) => {
777 let mut settings_update = None;
778 let mut s = shared_state.lock().await;
779 if let Some(c) = s.clients.get_mut(&client_id) {
780 c.config.latency = latency;
781 settings_update = Some(ClientSettingsUpdate {
782 client_id: client_id.clone(),
783 buffer_ms: self.config.buffer_ms as i32,
784 latency,
785 volume: c.config.volume.percent,
786 muted: c.config.volume.muted,
787 });
788 }
789 save_state(&s);
790 drop(s);
791 if let Some(update) = settings_update {
792 session_srv.push_settings(update).await;
793 }
794 let _ = event_tx.try_send(ServerEvent::ClientLatencyChanged { client_id, latency });
795 }
796 Some(ServerCommand::SetClientName { client_id, name }) => {
797 let mut s = shared_state.lock().await;
798 if let Some(c) = s.clients.get_mut(&client_id) {
799 c.config.name = name.clone();
800 }
801 save_state(&s);
802 drop(s);
803 let _ = event_tx.try_send(ServerEvent::ClientNameChanged { client_id, name });
804 }
805 Some(ServerCommand::SetGroupStream { group_id, stream_id }) => {
806 let mut s = shared_state.lock().await;
807 s.set_group_stream(&group_id, &stream_id);
808 save_state(&s);
809 drop(s);
810 let _ = event_tx.try_send(ServerEvent::GroupStreamChanged { group_id: group_id.clone(), stream_id });
811 session_srv.update_routing_for_group(&group_id).await;
812 }
813 Some(ServerCommand::SetGroupMute { group_id, muted }) => {
814 let mut s = shared_state.lock().await;
815 if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
816 g.muted = muted;
817 }
818 save_state(&s);
819 drop(s);
820 let _ = event_tx.try_send(ServerEvent::GroupMuteChanged { group_id: group_id.clone(), muted });
821 session_srv.update_routing_for_group(&group_id).await;
822 }
823 Some(ServerCommand::SetGroupName { group_id, name }) => {
824 let mut s = shared_state.lock().await;
825 if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
826 g.name = name.clone();
827 }
828 save_state(&s);
829 drop(s);
830 let _ = event_tx.try_send(ServerEvent::GroupNameChanged { group_id, name });
831 }
832 Some(ServerCommand::SetGroupClients { group_id, clients }) => {
833 let mut s = shared_state.lock().await;
834 s.set_group_clients(&group_id, &clients);
835 save_state(&s);
836 drop(s);
837 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
839 session_srv.update_routing_all().await;
840 }
841 Some(ServerCommand::DeleteClient { client_id }) => {
842 let mut s = shared_state.lock().await;
843 s.remove_client_from_groups(&client_id);
844 s.clients.remove(&client_id);
845 save_state(&s);
846 drop(s);
847 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
848 session_srv.update_routing_all().await;
849 }
850 Some(ServerCommand::SetStreamMeta { stream_id, metadata }) => {
851 let mut s = shared_state.lock().await;
852 if let Some(stream) = s.streams.iter_mut().find(|st| st.id == stream_id) {
853 stream.properties = metadata.clone();
854 }
855 drop(s);
856 let _ = event_tx.try_send(ServerEvent::StreamMetaChanged { stream_id, metadata });
857 }
858 Some(ServerCommand::AddStream { uri, response_tx }) => {
859 tracing::warn!(uri, "Dynamic stream addition requires application-owned stream orchestration");
860 let _ = response_tx.send(Err(
861 "dynamic Stream.AddStream is not supported by the embeddable server after run(); create streams before run()".into(),
862 ));
863 }
864 Some(ServerCommand::RemoveStream { stream_id }) => {
865 let mut s = shared_state.lock().await;
866 s.streams.retain(|st| st.id != stream_id);
867 for g in &mut s.groups {
869 if g.stream_id == stream_id {
870 g.stream_id.clear();
871 }
872 }
873 save_state(&s);
874 drop(s);
875 let _ = event_tx.try_send(ServerEvent::ServerUpdated);
876 session_srv.update_routing_all().await;
877 }
878 Some(ServerCommand::StreamControl { stream_id, command, params }) => {
879 tracing::debug!(stream_id, command, ?params, "Stream control forwarded");
880 let _ = event_tx.try_send(ServerEvent::StreamControl { stream_id, command, params });
882 }
883 Some(ServerCommand::GetStatus { response_tx }) => {
884 let s = shared_state.lock().await;
885 let _ = response_tx.send(s.to_status());
886 }
887 #[cfg(feature = "custom-protocol")]
888 Some(ServerCommand::SendToClient { client_id, message }) => {
889 session_srv.send_custom(&client_id, message.type_id, message.payload).await;
890 }
891 }
892 }
893 }
894 }
895 }
896}