Skip to main content

snapcast_server/
lib.rs

1#![deny(unsafe_code)]
2#![warn(clippy::redundant_closure)]
3#![warn(clippy::implicit_clone)]
4#![warn(clippy::uninlined_format_args)]
5#![warn(missing_docs)]
6
7//! Snapcast server library — embeddable synchronized multiroom audio server.
8//!
9//! See also: [`snapcast-client`](https://docs.rs/snapcast-client) for the client library.
10//! # Architecture
11//!
12//! The server is built around a channel-based API matching `snapcast-client`:
13//!
14//! - [`SnapServer`] is the main entry point
15//! - [`ServerEvent`] flows from server → consumer (client connected, stream status, custom messages)
16//! - [`ServerCommand`] flows from consumer → server (typed mutations, custom messages, stop)
17//!
18//! # Example
19//!
20//! ```no_run
21//! use snapcast_server::{SnapServer, ServerConfig, ServerEvent, ServerCommand};
22//!
23//! # async fn example() -> anyhow::Result<()> {
24//! let config = ServerConfig::default();
25//! let (mut server, mut events) = SnapServer::new(config);
26//! let _audio_tx = server.add_stream("default");
27//! let cmd = server.command_sender();
28//!
29//! tokio::spawn(async move {
30//!     while let Some(event) = events.recv().await {
31//!         match event {
32//!             ServerEvent::ClientConnected { id, name, .. } => {
33//!                 tracing::info!(id, name, "Client connected");
34//!             }
35//!             _ => {}
36//!         }
37//!     }
38//! });
39//!
40//! server.run().await?;
41//! # Ok(())
42//! # }
43//! ```
44
45use std::sync::Arc;
46
47use tokio::sync::{broadcast, mpsc};
48
49// Re-export proto types that embedders need
50#[cfg(feature = "custom-protocol")]
51pub use snapcast_proto::CustomMessage;
52#[cfg(feature = "encryption")]
53pub use snapcast_proto::DEFAULT_ENCRYPTION_PSK;
54pub use snapcast_proto::SampleFormat;
55pub use snapcast_proto::{DEFAULT_SAMPLE_FORMAT, DEFAULT_STREAM_PORT};
56
57const EVENT_CHANNEL_SIZE: usize = 256;
58const COMMAND_CHANNEL_SIZE: usize = 64;
59const AUDIO_CHANNEL_SIZE: usize = 256;
60
61/// Audio data pushed by the consumer — either f32 or raw PCM.
62#[derive(Debug, Clone)]
63pub enum AudioData {
64    /// Interleaved f32 samples (from DSP, EQ, AirPlay receivers).
65    /// Range: -1.0 to 1.0.
66    F32(Vec<f32>),
67    /// Raw interleaved PCM bytes at the stream's configured sample format
68    /// (from pipe/file/process readers). Byte order: little-endian.
69    Pcm(Vec<u8>),
70}
71
72/// A timestamped audio frame for server input.
73#[derive(Debug, Clone)]
74pub struct AudioFrame {
75    /// Audio samples.
76    pub data: AudioData,
77    /// Timestamp in microseconds (server time).
78    pub timestamp_usec: i64,
79}
80
81/// An encoded audio chunk ready to be sent to clients.
82#[derive(Debug, Clone)]
83pub struct WireChunkData {
84    /// Stream ID this chunk belongs to.
85    pub stream_id: String,
86    /// Server timestamp in microseconds.
87    pub timestamp_usec: i64,
88    /// Encoded audio data.
89    pub data: Vec<u8>,
90}
91
92pub mod auth;
93#[cfg(feature = "encryption")]
94pub mod crypto;
95pub(crate) mod encoder;
96#[cfg(feature = "mdns")]
97pub mod mdns;
98pub mod session;
99pub mod state;
100pub(crate) mod stream;
101pub mod time;
102
103/// Settings update pushed to a streaming client via binary protocol.
104#[derive(Debug, Clone)]
105pub struct ClientSettingsUpdate {
106    /// Target client ID.
107    pub client_id: String,
108    /// Buffer size in ms.
109    pub buffer_ms: i32,
110    /// Latency offset in ms.
111    pub latency: i32,
112    /// Volume (0–100).
113    pub volume: u16,
114    /// Mute state.
115    pub muted: bool,
116}
117
118/// Events emitted by the server to the consumer.
119#[derive(Debug)]
120pub enum ServerEvent {
121    /// A client connected via the binary protocol.
122    ClientConnected {
123        /// Unique client identifier.
124        id: String,
125        /// Client hostname.
126        name: String,
127        /// Client MAC address.
128        mac: String,
129    },
130    /// A client disconnected.
131    ClientDisconnected {
132        /// Unique client identifier.
133        id: String,
134    },
135    /// A client's volume changed.
136    ClientVolumeChanged {
137        /// Client ID.
138        client_id: String,
139        /// New volume (0–100).
140        volume: u16,
141        /// Mute state.
142        muted: bool,
143    },
144    /// A client's latency changed.
145    ClientLatencyChanged {
146        /// Client ID.
147        client_id: String,
148        /// New latency in ms.
149        latency: i32,
150    },
151    /// A client's name changed.
152    ClientNameChanged {
153        /// Client ID.
154        client_id: String,
155        /// New name.
156        name: String,
157    },
158    /// A group's stream assignment changed.
159    GroupStreamChanged {
160        /// Group ID.
161        group_id: String,
162        /// New stream ID.
163        stream_id: String,
164    },
165    /// A group's mute state changed.
166    GroupMuteChanged {
167        /// Group ID.
168        group_id: String,
169        /// Mute state.
170        muted: bool,
171    },
172    /// A stream's status changed (playing, idle, unknown).
173    StreamStatus {
174        /// Stream identifier.
175        stream_id: String,
176        /// New status.
177        status: String,
178    },
179    /// A group's name changed.
180    GroupNameChanged {
181        /// Group ID.
182        group_id: String,
183        /// New name.
184        name: String,
185    },
186    /// Server state changed — groups were reorganized (created, deleted, merged).
187    ///
188    /// Emitted after structural changes like `SetGroupClients` or `DeleteClient`
189    /// when the group topology changes. Mirrors `Server.OnUpdate` in the C++ snapserver.
190    /// The consumer should re-read server status via `GetStatus`.
191    ServerUpdated,
192    /// Custom binary protocol message from a streaming client.
193    #[cfg(feature = "custom-protocol")]
194    CustomMessage {
195        /// Client ID.
196        client_id: String,
197        /// The custom message.
198        message: snapcast_proto::CustomMessage,
199    },
200}
201
202/// Commands the consumer sends to the server.
203#[derive(Debug)]
204pub enum ServerCommand {
205    /// Set a client's volume.
206    SetClientVolume {
207        /// Client ID.
208        client_id: String,
209        /// Volume (0–100).
210        volume: u16,
211        /// Mute state.
212        muted: bool,
213    },
214    /// Set a client's latency offset.
215    SetClientLatency {
216        /// Client ID.
217        client_id: String,
218        /// Latency in milliseconds.
219        latency: i32,
220    },
221    /// Set a client's display name.
222    SetClientName {
223        /// Client ID.
224        client_id: String,
225        /// New name.
226        name: String,
227    },
228    /// Assign a stream to a group.
229    SetGroupStream {
230        /// Group ID.
231        group_id: String,
232        /// Stream ID.
233        stream_id: String,
234    },
235    /// Mute/unmute a group.
236    SetGroupMute {
237        /// Group ID.
238        group_id: String,
239        /// Mute state.
240        muted: bool,
241    },
242    /// Set a group's display name.
243    SetGroupName {
244        /// Group ID.
245        group_id: String,
246        /// New name.
247        name: String,
248    },
249    /// Move clients to a group.
250    SetGroupClients {
251        /// Group ID.
252        group_id: String,
253        /// Client IDs.
254        clients: Vec<String>,
255    },
256    /// Delete a client from the server.
257    DeleteClient {
258        /// Client ID.
259        client_id: String,
260    },
261    /// Get full server status.
262    GetStatus {
263        /// Response channel.
264        response_tx: tokio::sync::oneshot::Sender<serde_json::Value>,
265    },
266    /// Send a custom binary protocol message to a streaming client.
267    #[cfg(feature = "custom-protocol")]
268    SendToClient {
269        /// Target client ID.
270        client_id: String,
271        /// The custom message.
272        message: snapcast_proto::CustomMessage,
273    },
274    /// Stop the server gracefully.
275    Stop,
276}
277
278/// Default codec based on compiled features.
279fn default_codec() -> &'static str {
280    #[cfg(feature = "flac")]
281    return "flac";
282    #[cfg(all(feature = "f32lz4", not(feature = "flac")))]
283    return "f32lz4";
284    #[cfg(not(any(feature = "flac", feature = "f32lz4")))]
285    return "pcm";
286}
287
288/// Server configuration for the embeddable library.
289pub struct ServerConfig {
290    /// TCP port for binary protocol (client connections). Default: 1704.
291    pub stream_port: u16,
292    /// Audio buffer size in milliseconds. Default: 1000.
293    pub buffer_ms: u32,
294    /// Default codec: "f32lz4", "pcm", "opus", "ogg". Default: "f32lz4".
295    pub codec: String,
296    /// Default sample format. Default: 48000:16:2.
297    pub sample_format: String,
298    /// mDNS service type. Default: "_snapcast._tcp.local.".
299    pub mdns_service_type: String,
300    /// Auth validator for streaming clients. `None` = no auth required.
301    pub auth: Option<std::sync::Arc<dyn auth::AuthValidator>>,
302    /// Pre-shared key for f32lz4 encryption. `None` = no encryption.
303    #[cfg(feature = "encryption")]
304    pub encryption_psk: Option<String>,
305}
306
307impl Default for ServerConfig {
308    fn default() -> Self {
309        Self {
310            stream_port: snapcast_proto::DEFAULT_STREAM_PORT,
311            buffer_ms: 1000,
312            codec: default_codec().into(),
313            sample_format: "48000:16:2".into(),
314            mdns_service_type: "_snapcast._tcp.local.".into(),
315            auth: None,
316            #[cfg(feature = "encryption")]
317            encryption_psk: None,
318        }
319    }
320}
321
322/// Per-stream configuration. If `None`, inherits from [`ServerConfig`].
323#[derive(Debug, Clone, Default)]
324pub struct StreamConfig {
325    /// Codec override (e.g. "flac", "f32lz4", "opus", "ogg", "pcm").
326    pub codec: Option<String>,
327    /// Sample format override (e.g. "48000:16:2").
328    pub sample_format: Option<String>,
329}
330
331/// The embeddable Snapcast server.
332pub struct SnapServer {
333    config: ServerConfig,
334    event_tx: mpsc::Sender<ServerEvent>,
335    command_tx: mpsc::Sender<ServerCommand>,
336    command_rx: Option<mpsc::Receiver<ServerCommand>>,
337    /// Named audio streams — each gets its own encoder at run().
338    streams: Vec<(String, StreamConfig, mpsc::Receiver<AudioFrame>)>,
339    /// Broadcast channel for encoded chunks → sessions.
340    chunk_tx: broadcast::Sender<WireChunkData>,
341}
342
343/// Spawn a per-stream encode loop on a dedicated thread.
344///
345/// Receives `AudioFrame`, passes `AudioData` directly to the encoder,
346/// and broadcasts encoded `WireChunkData` to sessions.
347fn spawn_stream_encoder(
348    stream_id: String,
349    mut rx: mpsc::Receiver<AudioFrame>,
350    mut enc: Box<dyn encoder::Encoder>,
351    chunk_tx: broadcast::Sender<WireChunkData>,
352) {
353    std::thread::spawn(move || {
354        let rt = tokio::runtime::Builder::new_current_thread()
355            .enable_all()
356            .build()
357            .expect("encoder runtime");
358
359        rt.block_on(async {
360            while let Some(frame) = rx.recv().await {
361                match enc.encode(&frame.data) {
362                    Ok(encoded) if !encoded.data.is_empty() => {
363                        let _ = chunk_tx.send(WireChunkData {
364                            stream_id: stream_id.clone(),
365                            timestamp_usec: frame.timestamp_usec,
366                            data: encoded.data,
367                        });
368                    }
369                    Err(e) => {
370                        tracing::warn!(stream = %stream_id, error = %e, "Encode failed");
371                    }
372                    _ => {} // encoder buffering
373                }
374            }
375        });
376    });
377}
378
379/// Convert f32 samples to PCM bytes at the given bit depth.
380impl SnapServer {
381    /// Create a new server. Returns the server and event receiver.
382    pub fn new(config: ServerConfig) -> (Self, mpsc::Receiver<ServerEvent>) {
383        let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
384        let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
385        let (chunk_tx, _) = broadcast::channel(256);
386        let server = Self {
387            config,
388            event_tx,
389            command_tx,
390            command_rx: Some(command_rx),
391            streams: Vec::new(),
392            chunk_tx,
393        };
394        (server, event_rx)
395    }
396
397    /// Add a named audio stream. Returns a sender for pushing audio frames.
398    ///
399    /// Uses the server's default codec and sample format.
400    pub fn add_stream(&mut self, name: &str) -> mpsc::Sender<AudioFrame> {
401        self.add_stream_with_config(name, StreamConfig::default())
402    }
403
404    /// Add a named audio stream with per-stream codec/format overrides.
405    pub fn add_stream_with_config(
406        &mut self,
407        name: &str,
408        config: StreamConfig,
409    ) -> mpsc::Sender<AudioFrame> {
410        let (tx, rx) = mpsc::channel(AUDIO_CHANNEL_SIZE);
411        self.streams.push((name.to_string(), config, rx));
412        tx
413    }
414
415    /// Get a cloneable command sender.
416    pub fn command_sender(&self) -> mpsc::Sender<ServerCommand> {
417        self.command_tx.clone()
418    }
419
420    /// Access the server configuration.
421    pub fn config(&self) -> &ServerConfig {
422        &self.config
423    }
424
425    /// Run the server. Blocks until stopped or a fatal error occurs.
426    pub async fn run(&mut self) -> anyhow::Result<()> {
427        let mut command_rx = self
428            .command_rx
429            .take()
430            .ok_or_else(|| anyhow::anyhow!("run() already called"))?;
431
432        let event_tx = self.event_tx.clone();
433
434        let sample_format: snapcast_proto::SampleFormat = self
435            .config
436            .sample_format
437            .parse()
438            .unwrap_or(snapcast_proto::DEFAULT_SAMPLE_FORMAT);
439
440        anyhow::ensure!(
441            !self.streams.is_empty(),
442            "No streams configured — call add_stream() before run()"
443        );
444
445        tracing::info!(stream_port = self.config.stream_port, "Snapserver starting");
446
447        // Advertise via mDNS (protocol-level discovery)
448        #[cfg(feature = "mdns")]
449        let _mdns =
450            mdns::MdnsAdvertiser::new(self.config.stream_port, &self.config.mdns_service_type)
451                .map_err(|e| tracing::warn!(error = %e, "mDNS advertisement failed"))
452                .ok();
453
454        // Create default encoder — used for codec header and first default stream
455        let default_enc_config = encoder::EncoderConfig {
456            codec: self.config.codec.clone(),
457            format: sample_format,
458            options: String::new(),
459            #[cfg(feature = "encryption")]
460            encryption_psk: self.config.encryption_psk.clone(),
461        };
462        let default_enc = encoder::create(&default_enc_config)?;
463        let codec = default_enc.name().to_string();
464        let codec_header = default_enc.header().to_vec();
465
466        // Spawn per-stream encode loops — reuse default_enc for first default stream
467        let chunk_tx = self.chunk_tx.clone();
468        let streams = std::mem::take(&mut self.streams);
469        let mut default_enc = Some(default_enc);
470        for (name, stream_cfg, rx) in streams {
471            let enc = if stream_cfg.codec.is_none() && stream_cfg.sample_format.is_none() {
472                if let Some(enc) = default_enc.take() {
473                    enc
474                } else {
475                    encoder::create(&default_enc_config)?
476                }
477            } else {
478                let stream_codec = stream_cfg.codec.as_deref().unwrap_or(&self.config.codec);
479                let stream_format: snapcast_proto::SampleFormat = stream_cfg
480                    .sample_format
481                    .as_deref()
482                    .and_then(|s| s.parse().ok())
483                    .unwrap_or(sample_format);
484                encoder::create(&encoder::EncoderConfig {
485                    codec: stream_codec.to_string(),
486                    format: stream_format,
487                    options: String::new(),
488                    #[cfg(feature = "encryption")]
489                    encryption_psk: self.config.encryption_psk.clone(),
490                })?
491            };
492            tracing::info!(stream = %name, codec = enc.name(), %sample_format, "Stream registered");
493            spawn_stream_encoder(name, rx, enc, chunk_tx.clone());
494        }
495
496        // Start session server
497        let session_srv = Arc::new(session::SessionServer::new(
498            self.config.stream_port,
499            self.config.buffer_ms as i32,
500            self.config.auth.clone(),
501        ));
502        let session_for_run = Arc::clone(&session_srv);
503        let session_event_tx = event_tx.clone();
504        let session_chunk_tx = self.chunk_tx.clone();
505        let session_handle = tokio::spawn(async move {
506            if let Err(e) = session_for_run
507                .run(session_chunk_tx, codec, codec_header, session_event_tx)
508                .await
509            {
510                tracing::error!(error = %e, "Session server error");
511            }
512        });
513
514        // Shared state for command handlers
515        let shared_state = Arc::new(tokio::sync::Mutex::new(state::ServerState::default()));
516
517        // Main loop
518        loop {
519            tokio::select! {
520                cmd = command_rx.recv() => {
521                    match cmd {
522                        Some(ServerCommand::Stop) | None => {
523                            tracing::info!("Server stopped");
524                            session_handle.abort();
525                            return Ok(());
526                        }
527                        Some(ServerCommand::SetClientVolume { client_id, volume, muted }) => {
528                            let mut s = shared_state.lock().await;
529                            if let Some(c) = s.clients.get_mut(&client_id) {
530                                c.config.volume.percent = volume;
531                                c.config.volume.muted = muted;
532                            }
533                            session_srv.push_settings(ClientSettingsUpdate {
534                                client_id: client_id.clone(),
535                                buffer_ms: self.config.buffer_ms as i32,
536                                latency: 0, volume, muted,
537                            }).await;
538                            let _ = event_tx.try_send(ServerEvent::ClientVolumeChanged { client_id, volume, muted });
539                        }
540                        Some(ServerCommand::SetClientLatency { client_id, latency }) => {
541                            let mut s = shared_state.lock().await;
542                            if let Some(c) = s.clients.get_mut(&client_id) {
543                                c.config.latency = latency;
544                                session_srv.push_settings(ClientSettingsUpdate {
545                                    client_id: client_id.clone(),
546                                    buffer_ms: self.config.buffer_ms as i32,
547                                    latency,
548                                    volume: c.config.volume.percent,
549                                    muted: c.config.volume.muted,
550                                }).await;
551                            }
552                            let _ = event_tx.try_send(ServerEvent::ClientLatencyChanged { client_id, latency });
553                        }
554                        Some(ServerCommand::SetClientName { client_id, name }) => {
555                            let mut s = shared_state.lock().await;
556                            if let Some(c) = s.clients.get_mut(&client_id) {
557                                c.config.name = name.clone();
558                            }
559                            let _ = event_tx.try_send(ServerEvent::ClientNameChanged { client_id, name });
560                        }
561                        Some(ServerCommand::SetGroupStream { group_id, stream_id }) => {
562                            shared_state.lock().await.set_group_stream(&group_id, &stream_id);
563                            let _ = event_tx.try_send(ServerEvent::GroupStreamChanged { group_id, stream_id });
564                        }
565                        Some(ServerCommand::SetGroupMute { group_id, muted }) => {
566                            let mut s = shared_state.lock().await;
567                            if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
568                                g.muted = muted;
569                            }
570                            let _ = event_tx.try_send(ServerEvent::GroupMuteChanged { group_id, muted });
571                        }
572                        Some(ServerCommand::SetGroupName { group_id, name }) => {
573                            let mut s = shared_state.lock().await;
574                            if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
575                                g.name = name.clone();
576                            }
577                            let _ = event_tx.try_send(ServerEvent::GroupNameChanged { group_id, name });
578                        }
579                        Some(ServerCommand::SetGroupClients { group_id, clients }) => {
580                            let mut s = shared_state.lock().await;
581                            for cid in &clients {
582                                s.remove_client_from_groups(cid);
583                            }
584                            if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
585                                g.clients = clients;
586                            }
587                            // Structural change — mirrors Server.OnUpdate in C++ snapserver
588                            let _ = event_tx.try_send(ServerEvent::ServerUpdated);
589                        }
590                        Some(ServerCommand::DeleteClient { client_id }) => {
591                            let mut s = shared_state.lock().await;
592                            s.remove_client_from_groups(&client_id);
593                            s.clients.remove(&client_id);
594                            drop(s);
595                            let _ = event_tx.try_send(ServerEvent::ServerUpdated);
596                        }
597                        Some(ServerCommand::GetStatus { response_tx }) => {
598                            let s = shared_state.lock().await;
599                            let _ = response_tx.send(s.to_status_json());
600                        }
601                        #[cfg(feature = "custom-protocol")]
602                        Some(ServerCommand::SendToClient { client_id, message }) => {
603                            session_srv.send_custom(&client_id, message.type_id, message.payload).await;
604                        }
605                    }
606                }
607            }
608        }
609    }
610}