Skip to main content

snapcast_server/
lib.rs

1#![deny(unsafe_code)]
2#![warn(clippy::redundant_closure)]
3#![warn(clippy::implicit_clone)]
4#![warn(clippy::uninlined_format_args)]
5#![warn(missing_docs)]
6
7//! Snapcast server library — embeddable synchronized multiroom audio server.
8//!
9//! See also: [`snapcast-client`](https://docs.rs/snapcast-client) for the client library.
10//! # Architecture
11//!
12//! The server is built around a channel-based API matching `snapcast-client`:
13//!
14//! - [`SnapServer`] is the main entry point
15//! - [`ServerEvent`] flows from server → consumer (client connected, stream status, custom messages)
16//! - [`ServerCommand`] flows from consumer → server (typed mutations, custom messages, stop)
17//!
18//! # Example
19//!
20//! ```no_run
21//! use snapcast_server::{SnapServer, ServerConfig, ServerEvent, ServerCommand};
22//!
23//! # async fn example() -> anyhow::Result<()> {
24//! let config = ServerConfig::default();
25//! let (mut server, mut events, _audio_tx) = SnapServer::new(config);
26//! let cmd = server.command_sender();
27//!
28//! tokio::spawn(async move {
29//!     while let Some(event) = events.recv().await {
30//!         match event {
31//!             ServerEvent::ClientConnected { id, name } => {
32//!                 tracing::info!(id, name, "Client connected");
33//!             }
34//!             _ => {}
35//!         }
36//!     }
37//! });
38//!
39//! server.run().await?;
40//! # Ok(())
41//! # }
42//! ```
43
44use std::sync::Arc;
45
46use tokio::sync::mpsc;
47
48// Re-export proto types that embedders need
49#[cfg(feature = "custom-protocol")]
50pub use snapcast_proto::CustomMessage;
51pub use snapcast_proto::SampleFormat;
52pub use snapcast_proto::{DEFAULT_SAMPLE_FORMAT, DEFAULT_STREAM_PORT};
53
54const EVENT_CHANNEL_SIZE: usize = 256;
55const COMMAND_CHANNEL_SIZE: usize = 64;
56const AUDIO_CHANNEL_SIZE: usize = 256;
57
58/// Interleaved f32 audio frame for server input.
59#[derive(Debug)]
60pub struct AudioFrame {
61    /// Interleaved f32 samples.
62    pub samples: Vec<f32>,
63    /// Sample rate in Hz.
64    pub sample_rate: u32,
65    /// Number of channels.
66    pub channels: u16,
67    /// Timestamp in microseconds.
68    pub timestamp_usec: i64,
69}
70
71pub mod auth;
72#[cfg(feature = "encryption")]
73pub mod crypto;
74pub mod encoder;
75#[cfg(feature = "mdns")]
76pub mod mdns;
77pub mod session;
78pub mod state;
79pub mod stream;
80pub mod time;
81
82/// Settings update pushed to a streaming client via binary protocol.
83#[derive(Debug, Clone)]
84pub struct ClientSettingsUpdate {
85    /// Target client ID.
86    pub client_id: String,
87    /// Buffer size in ms.
88    pub buffer_ms: i32,
89    /// Latency offset in ms.
90    pub latency: i32,
91    /// Volume (0–100).
92    pub volume: u16,
93    /// Mute state.
94    pub muted: bool,
95}
96
97/// Events emitted by the server to the consumer.
98#[derive(Debug)]
99pub enum ServerEvent {
100    /// A client connected via the binary protocol.
101    ClientConnected {
102        /// Unique client identifier.
103        id: String,
104        /// Client hostname.
105        name: String,
106    },
107    /// A client disconnected.
108    ClientDisconnected {
109        /// Unique client identifier.
110        id: String,
111    },
112    /// A client's volume changed.
113    ClientVolumeChanged {
114        /// Client ID.
115        client_id: String,
116        /// New volume (0–100).
117        volume: u16,
118        /// Mute state.
119        muted: bool,
120    },
121    /// A client's latency changed.
122    ClientLatencyChanged {
123        /// Client ID.
124        client_id: String,
125        /// New latency in ms.
126        latency: i32,
127    },
128    /// A client's name changed.
129    ClientNameChanged {
130        /// Client ID.
131        client_id: String,
132        /// New name.
133        name: String,
134    },
135    /// A group's stream assignment changed.
136    GroupStreamChanged {
137        /// Group ID.
138        group_id: String,
139        /// New stream ID.
140        stream_id: String,
141    },
142    /// A group's mute state changed.
143    GroupMuteChanged {
144        /// Group ID.
145        group_id: String,
146        /// Mute state.
147        muted: bool,
148    },
149    /// A stream's status changed (playing, idle, unknown).
150    StreamStatus {
151        /// Stream identifier.
152        stream_id: String,
153        /// New status.
154        status: String,
155    },
156    /// Custom binary protocol message from a streaming client.
157    #[cfg(feature = "custom-protocol")]
158    CustomMessage {
159        /// Client ID.
160        client_id: String,
161        /// The custom message.
162        message: snapcast_proto::CustomMessage,
163    },
164}
165
166/// Commands the consumer sends to the server.
167#[derive(Debug)]
168pub enum ServerCommand {
169    /// Set a client's volume.
170    SetClientVolume {
171        /// Client ID.
172        client_id: String,
173        /// Volume (0–100).
174        volume: u16,
175        /// Mute state.
176        muted: bool,
177    },
178    /// Set a client's latency offset.
179    SetClientLatency {
180        /// Client ID.
181        client_id: String,
182        /// Latency in milliseconds.
183        latency: i32,
184    },
185    /// Set a client's display name.
186    SetClientName {
187        /// Client ID.
188        client_id: String,
189        /// New name.
190        name: String,
191    },
192    /// Assign a stream to a group.
193    SetGroupStream {
194        /// Group ID.
195        group_id: String,
196        /// Stream ID.
197        stream_id: String,
198    },
199    /// Mute/unmute a group.
200    SetGroupMute {
201        /// Group ID.
202        group_id: String,
203        /// Mute state.
204        muted: bool,
205    },
206    /// Set a group's display name.
207    SetGroupName {
208        /// Group ID.
209        group_id: String,
210        /// New name.
211        name: String,
212    },
213    /// Move clients to a group.
214    SetGroupClients {
215        /// Group ID.
216        group_id: String,
217        /// Client IDs.
218        clients: Vec<String>,
219    },
220    /// Delete a client from the server.
221    DeleteClient {
222        /// Client ID.
223        client_id: String,
224    },
225    /// Get full server status.
226    GetStatus {
227        /// Response channel.
228        response_tx: tokio::sync::oneshot::Sender<serde_json::Value>,
229    },
230    /// Send a custom binary protocol message to a streaming client.
231    #[cfg(feature = "custom-protocol")]
232    SendToClient {
233        /// Target client ID.
234        client_id: String,
235        /// The custom message.
236        message: snapcast_proto::CustomMessage,
237    },
238    /// Stop the server gracefully.
239    Stop,
240}
241
242/// Default codec based on compiled features.
243fn default_codec() -> &'static str {
244    #[cfg(feature = "flac")]
245    return "flac";
246    #[cfg(all(feature = "f32lz4", not(feature = "flac")))]
247    return "f32lz4";
248    #[cfg(not(any(feature = "flac", feature = "f32lz4")))]
249    return "pcm";
250}
251
252/// Server configuration for the embeddable library.
253pub struct ServerConfig {
254    /// TCP port for binary protocol (client connections). Default: 1704.
255    pub stream_port: u16,
256    /// Audio buffer size in milliseconds. Default: 1000.
257    pub buffer_ms: u32,
258    /// Default codec: "f32lz4", "pcm", "opus", "ogg". Default: "f32lz4".
259    pub codec: String,
260    /// Default sample format. Default: 48000:16:2.
261    pub sample_format: String,
262    /// mDNS service type. Default: "_snapcast._tcp.local.".
263    pub mdns_service_type: String,
264    /// Auth validator for streaming clients. `None` = no auth required.
265    pub auth: Option<std::sync::Arc<dyn auth::AuthValidator>>,
266    /// Pre-shared key for f32lz4 encryption. `None` = no encryption.
267    #[cfg(feature = "encryption")]
268    pub encryption_psk: Option<String>,
269}
270
271impl Default for ServerConfig {
272    fn default() -> Self {
273        Self {
274            stream_port: snapcast_proto::DEFAULT_STREAM_PORT,
275            buffer_ms: 1000,
276            codec: default_codec().into(),
277            sample_format: "48000:16:2".into(),
278            mdns_service_type: "_snapcast._tcp.local.".into(),
279            auth: None,
280            #[cfg(feature = "encryption")]
281            encryption_psk: None,
282        }
283    }
284}
285
286/// The embeddable Snapcast server.
287pub struct SnapServer {
288    config: ServerConfig,
289    event_tx: mpsc::Sender<ServerEvent>,
290    command_tx: mpsc::Sender<ServerCommand>,
291    command_rx: Option<mpsc::Receiver<ServerCommand>>,
292    audio_rx: Option<mpsc::Receiver<crate::AudioFrame>>,
293    manager: Option<stream::manager::StreamManager>,
294}
295
296impl SnapServer {
297    /// Create a new server. Returns the server, event receiver, and audio input sender.
298    ///
299    /// The `audio_tx` sender allows the embedding app to push PCM audio directly
300    /// into the server as an alternative to pipe/file/process stream readers.
301    pub fn new(
302        config: ServerConfig,
303    ) -> (
304        Self,
305        mpsc::Receiver<ServerEvent>,
306        mpsc::Sender<crate::AudioFrame>,
307    ) {
308        let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
309        let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
310        let (audio_tx, audio_rx) = mpsc::channel(AUDIO_CHANNEL_SIZE);
311        let server = Self {
312            config,
313            event_tx,
314            command_tx,
315            command_rx: Some(command_rx),
316            audio_rx: Some(audio_rx),
317            manager: None,
318        };
319        (server, event_rx, audio_tx)
320    }
321
322    /// Set the stream manager (configured by the binary with stream readers).
323    pub fn set_manager(&mut self, manager: stream::manager::StreamManager) {
324        self.manager = Some(manager);
325    }
326
327    /// Get a cloneable command sender.
328    pub fn command_sender(&self) -> mpsc::Sender<ServerCommand> {
329        self.command_tx.clone()
330    }
331
332    /// Access the server configuration.
333    pub fn config(&self) -> &ServerConfig {
334        &self.config
335    }
336
337    /// Run the server. Blocks until stopped or a fatal error occurs.
338    pub async fn run(&mut self) -> anyhow::Result<()> {
339        let mut command_rx = self
340            .command_rx
341            .take()
342            .ok_or_else(|| anyhow::anyhow!("run() already called"))?;
343
344        let mut audio_rx = self
345            .audio_rx
346            .take()
347            .ok_or_else(|| anyhow::anyhow!("run() already called"))?;
348
349        let event_tx = self.event_tx.clone();
350
351        tracing::info!(stream_port = self.config.stream_port, "Snapserver starting");
352
353        // Advertise via mDNS (protocol-level discovery)
354        #[cfg(feature = "mdns")]
355        let _mdns =
356            mdns::MdnsAdvertiser::new(self.config.stream_port, &self.config.mdns_service_type)
357                .map_err(|e| tracing::warn!(error = %e, "mDNS advertisement failed"))
358                .ok();
359
360        let manager = self.manager.take().unwrap_or_default();
361
362        let default_format = snapcast_proto::DEFAULT_SAMPLE_FORMAT;
363        let first_stream = manager.stream_ids().into_iter().next().unwrap_or_default();
364        let (codec, header) = if let Some((c, h, _)) = manager.header(&first_stream) {
365            (c.to_string(), h.to_vec())
366        } else {
367            let enc_config = encoder::EncoderConfig {
368                codec: self.config.codec.clone(),
369                format: default_format,
370                options: String::new(),
371                #[cfg(feature = "encryption")]
372                encryption_psk: self.config.encryption_psk.clone(),
373            };
374            let enc = encoder::create(&enc_config)?;
375            (self.config.codec.clone(), enc.header().to_vec())
376        };
377
378        let chunk_sender = manager.chunk_sender();
379        let audio_chunk_sender = chunk_sender.clone();
380
381        // Start session server
382        let session_srv = Arc::new(session::SessionServer::new(
383            self.config.stream_port,
384            self.config.buffer_ms as i32,
385            self.config.auth.clone(),
386        ));
387        let session_for_run = Arc::clone(&session_srv);
388        let session_event_tx = event_tx.clone();
389        let session_handle = tokio::spawn(async move {
390            if let Err(e) = session_for_run
391                .run(chunk_sender, codec, header, session_event_tx)
392                .await
393            {
394                tracing::error!(error = %e, "Session server error");
395            }
396        });
397
398        // Shared state for command handlers
399        let shared_state = Arc::new(tokio::sync::Mutex::new(state::ServerState::default()));
400
401        // Main loop
402        loop {
403            tokio::select! {
404                cmd = command_rx.recv() => {
405                    match cmd {
406                        Some(ServerCommand::Stop) | None => {
407                            tracing::info!("Server stopped");
408                            session_handle.abort();
409                            return Ok(());
410                        }
411                        Some(ServerCommand::SetClientVolume { client_id, volume, muted }) => {
412                            let mut s = shared_state.lock().await;
413                            if let Some(c) = s.clients.get_mut(&client_id) {
414                                c.config.volume.percent = volume;
415                                c.config.volume.muted = muted;
416                            }
417                            session_srv.push_settings(ClientSettingsUpdate {
418                                client_id: client_id.clone(),
419                                buffer_ms: self.config.buffer_ms as i32,
420                                latency: 0, volume, muted,
421                            }).await;
422                            let _ = event_tx.try_send(ServerEvent::ClientVolumeChanged { client_id, volume, muted });
423                        }
424                        Some(ServerCommand::SetClientLatency { client_id, latency }) => {
425                            let mut s = shared_state.lock().await;
426                            if let Some(c) = s.clients.get_mut(&client_id) {
427                                c.config.latency = latency;
428                                session_srv.push_settings(ClientSettingsUpdate {
429                                    client_id: client_id.clone(),
430                                    buffer_ms: self.config.buffer_ms as i32,
431                                    latency,
432                                    volume: c.config.volume.percent,
433                                    muted: c.config.volume.muted,
434                                }).await;
435                            }
436                            let _ = event_tx.try_send(ServerEvent::ClientLatencyChanged { client_id, latency });
437                        }
438                        Some(ServerCommand::SetClientName { client_id, name }) => {
439                            let mut s = shared_state.lock().await;
440                            if let Some(c) = s.clients.get_mut(&client_id) {
441                                c.config.name = name.clone();
442                            }
443                            let _ = event_tx.try_send(ServerEvent::ClientNameChanged { client_id, name });
444                        }
445                        Some(ServerCommand::SetGroupStream { group_id, stream_id }) => {
446                            shared_state.lock().await.set_group_stream(&group_id, &stream_id);
447                            let _ = event_tx.try_send(ServerEvent::GroupStreamChanged { group_id, stream_id });
448                        }
449                        Some(ServerCommand::SetGroupMute { group_id, muted }) => {
450                            let mut s = shared_state.lock().await;
451                            if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
452                                g.muted = muted;
453                            }
454                            let _ = event_tx.try_send(ServerEvent::GroupMuteChanged { group_id, muted });
455                        }
456                        Some(ServerCommand::SetGroupName { group_id, name }) => {
457                            let mut s = shared_state.lock().await;
458                            if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
459                                g.name = name;
460                            }
461                        }
462                        Some(ServerCommand::SetGroupClients { group_id, clients }) => {
463                            let mut s = shared_state.lock().await;
464                            for cid in &clients {
465                                s.remove_client_from_groups(cid);
466                            }
467                            if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
468                                g.clients = clients;
469                            }
470                        }
471                        Some(ServerCommand::DeleteClient { client_id }) => {
472                            let mut s = shared_state.lock().await;
473                            s.remove_client_from_groups(&client_id);
474                            s.clients.remove(&client_id);
475                        }
476                        Some(ServerCommand::GetStatus { response_tx }) => {
477                            let s = shared_state.lock().await;
478                            let _ = response_tx.send(s.to_status_json());
479                        }
480                        #[cfg(feature = "custom-protocol")]
481                        Some(ServerCommand::SendToClient { client_id, message }) => {
482                            session_srv.send_custom(&client_id, message.type_id, message.payload).await;
483                        }
484                    }
485                }
486                frame = audio_rx.recv() => {
487                    if let Some(frame) = frame {
488                        let data = if self.config.codec == "f32lz4" {
489                            frame.samples.iter().flat_map(|s| s.to_le_bytes()).collect()
490                        } else {
491                            let mut pcm = Vec::with_capacity(frame.samples.len() * 2);
492                            for &s in &frame.samples {
493                                let i = (s.clamp(-1.0, 1.0) * i16::MAX as f32) as i16;
494                                pcm.extend_from_slice(&i.to_le_bytes());
495                            }
496                            pcm
497                        };
498                        let wire = stream::manager::WireChunkData {
499                            stream_id: "external".into(),
500                            timestamp_usec: frame.timestamp_usec,
501                            data,
502                        };
503                        let _ = audio_chunk_sender.send(wire);
504                    }
505                }
506            }
507        }
508    }
509}