Skip to main content

snapcast_server/
lib.rs

1#![deny(unsafe_code)]
2#![warn(clippy::redundant_closure)]
3#![warn(clippy::implicit_clone)]
4#![warn(clippy::uninlined_format_args)]
5#![warn(missing_docs)]
6
7//! Snapcast server library — embeddable synchronized multiroom audio server.
8//!
9//! See also: [`snapcast-client`](https://docs.rs/snapcast-client) for the client library.
10//! # Architecture
11//!
12//! The server is built around a channel-based API matching `snapcast-client`:
13//!
14//! - [`SnapServer`] is the main entry point
15//! - [`ServerEvent`] flows from server → consumer (client connected, stream status, custom messages)
16//! - [`ServerCommand`] flows from consumer → server (typed mutations, custom messages, stop)
17//!
18//! # Example
19//!
20//! ```no_run
21//! use snapcast_server::{SnapServer, ServerConfig, ServerEvent, ServerCommand};
22//!
23//! # async fn example() -> anyhow::Result<()> {
24//! let config = ServerConfig::default();
25//! let (mut server, mut events) = SnapServer::new(config);
26//! let _audio_tx = server.add_stream("default");
27//! let cmd = server.command_sender();
28//!
29//! tokio::spawn(async move {
30//!     while let Some(event) = events.recv().await {
31//!         match event {
32//!             ServerEvent::ClientConnected { id, ref hello } => {
33//!                 tracing::info!(id, name = hello.host_name, "Client connected");
34//!             }
35//!             _ => {}
36//!         }
37//!     }
38//! });
39//!
40//! server.run().await?;
41//! # Ok(())
42//! # }
43//! ```
44
45use std::sync::Arc;
46
47use tokio::sync::{broadcast, mpsc};
48
49// Re-export proto types that embedders need
50#[cfg(feature = "custom-protocol")]
51pub use snapcast_proto::CustomMessage;
52#[cfg(feature = "encryption")]
53pub use snapcast_proto::DEFAULT_ENCRYPTION_PSK;
54pub use snapcast_proto::SampleFormat;
55pub use snapcast_proto::message::hello::Hello;
56pub use snapcast_proto::{DEFAULT_SAMPLE_FORMAT, DEFAULT_STREAM_PORT};
57
58const EVENT_CHANNEL_SIZE: usize = 256;
59const COMMAND_CHANNEL_SIZE: usize = 64;
60const AUDIO_CHANNEL_SIZE: usize = 256;
61
62/// Channel size for F32 embedded sources — backpressure from encoder pacing.
63const F32_CHANNEL_SIZE: usize = 1;
64
65/// Audio data pushed by the consumer — either f32 or raw PCM.
66#[derive(Debug, Clone)]
67pub enum AudioData {
68    /// Interleaved f32 samples (from DSP, EQ, AirPlay receivers).
69    /// Range: -1.0 to 1.0.
70    F32(Vec<f32>),
71    /// Raw interleaved PCM bytes at the stream's configured sample format
72    /// (from pipe/file/process readers). Byte order: little-endian.
73    Pcm(Vec<u8>),
74}
75
76/// A timestamped audio frame for server input.
77#[derive(Debug, Clone)]
78pub struct AudioFrame {
79    /// Audio samples.
80    pub data: AudioData,
81    /// Timestamp in microseconds (server time).
82    pub timestamp_usec: i64,
83}
84
85/// Buffered sender for F32 audio that handles chunking, timestamping, and gap detection.
86///
87/// Accumulates variable-size F32 sample buffers and emits fixed-size 20ms chunks
88/// with monotonic timestamps. Automatically resets on playback gaps (>500ms).
89///
90/// Created by [`SnapServer::add_f32_stream`].
91pub struct F32AudioSender {
92    tx: mpsc::Sender<AudioFrame>,
93    buf: Vec<f32>,
94    chunk_samples: usize,
95    channels: u16,
96    sample_rate: u32,
97    ts: Option<time::ChunkTimestamper>,
98    last_send: std::time::Instant,
99}
100
101impl F32AudioSender {
102    fn new(tx: mpsc::Sender<AudioFrame>, sample_rate: u32, channels: u16) -> Self {
103        let chunk_samples = (sample_rate as usize * 20 / 1000) * channels as usize;
104        Self {
105            tx,
106            buf: Vec::with_capacity(chunk_samples * 2),
107            chunk_samples,
108            channels,
109            sample_rate,
110            ts: None,
111            last_send: std::time::Instant::now(),
112        }
113    }
114
115    /// Push interleaved F32 samples. Variable-size input is accumulated and
116    /// emitted as fixed 20ms chunks. Returns when all complete chunks are sent.
117    pub async fn send(
118        &mut self,
119        samples: &[f32],
120    ) -> Result<(), mpsc::error::SendError<AudioFrame>> {
121        let now = std::time::Instant::now();
122        if now.duration_since(self.last_send) > std::time::Duration::from_millis(500) {
123            self.ts = None;
124            self.buf.clear();
125        }
126        self.last_send = now;
127
128        self.buf.extend_from_slice(samples);
129        let ch = self.channels.max(1) as usize;
130        while self.buf.len() >= self.chunk_samples {
131            let chunk: Vec<f32> = self.buf.drain(..self.chunk_samples).collect();
132            let frames = (self.chunk_samples / ch) as u32;
133            let ts = self
134                .ts
135                .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
136            let timestamp_usec = ts.next(frames);
137            self.tx
138                .send(AudioFrame {
139                    data: AudioData::F32(chunk),
140                    timestamp_usec,
141                })
142                .await?;
143        }
144        Ok(())
145    }
146
147    /// Flush remaining samples (< 20ms) as a final short chunk.
148    /// Call at end-of-track to avoid losing the last few milliseconds.
149    pub async fn flush(&mut self) -> Result<(), mpsc::error::SendError<AudioFrame>> {
150        if self.buf.is_empty() {
151            return Ok(());
152        }
153        let chunk: Vec<f32> = self.buf.drain(..).collect();
154        let ch = self.channels.max(1) as usize;
155        let frames = (chunk.len() / ch) as u32;
156        let ts = self
157            .ts
158            .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
159        let timestamp_usec = ts.next(frames);
160        self.tx
161            .send(AudioFrame {
162                data: AudioData::F32(chunk),
163                timestamp_usec,
164            })
165            .await
166    }
167}
168
169/// An encoded audio chunk ready to be sent to clients.
170#[derive(Debug, Clone)]
171pub struct WireChunkData {
172    /// Stream ID this chunk belongs to.
173    pub stream_id: String,
174    /// Server timestamp in microseconds.
175    pub timestamp_usec: i64,
176    /// Encoded audio data.
177    pub data: Vec<u8>,
178}
179
180pub mod auth;
181#[cfg(feature = "encryption")]
182pub(crate) mod crypto;
183pub(crate) mod encoder;
184#[cfg(feature = "mdns")]
185pub(crate) mod mdns;
186pub(crate) mod session;
187pub(crate) mod state;
188pub mod status;
189pub(crate) mod stream;
190pub mod time;
191
192/// Settings update pushed to a streaming client via binary protocol.
193#[derive(Debug, Clone)]
194pub struct ClientSettingsUpdate {
195    /// Target client ID.
196    pub client_id: String,
197    /// Buffer size in ms.
198    pub buffer_ms: i32,
199    /// Latency offset in ms.
200    pub latency: i32,
201    /// Volume (0–100).
202    pub volume: u16,
203    /// Mute state.
204    pub muted: bool,
205}
206
207/// Events emitted by the server to the consumer.
208#[derive(Debug)]
209#[non_exhaustive]
210pub enum ServerEvent {
211    /// A client connected via the binary protocol.
212    ClientConnected {
213        /// Unique client identifier.
214        id: String,
215        /// The client's Hello message with all connection metadata.
216        hello: snapcast_proto::message::hello::Hello,
217    },
218    /// A client disconnected.
219    ClientDisconnected {
220        /// Unique client identifier.
221        id: String,
222    },
223    /// A client's volume changed.
224    ClientVolumeChanged {
225        /// Client ID.
226        client_id: String,
227        /// New volume (0–100).
228        volume: u16,
229        /// Mute state.
230        muted: bool,
231    },
232    /// A client's latency changed.
233    ClientLatencyChanged {
234        /// Client ID.
235        client_id: String,
236        /// New latency in ms.
237        latency: i32,
238    },
239    /// A client's name changed.
240    ClientNameChanged {
241        /// Client ID.
242        client_id: String,
243        /// New name.
244        name: String,
245    },
246    /// A group's stream assignment changed.
247    GroupStreamChanged {
248        /// Group ID.
249        group_id: String,
250        /// New stream ID.
251        stream_id: String,
252    },
253    /// A group's mute state changed.
254    GroupMuteChanged {
255        /// Group ID.
256        group_id: String,
257        /// Mute state.
258        muted: bool,
259    },
260    /// A stream's status changed (playing, idle, unknown).
261    StreamStatus {
262        /// Stream identifier.
263        stream_id: String,
264        /// New status.
265        status: String,
266    },
267    /// Stream metadata/properties changed.
268    StreamMetaChanged {
269        /// Stream identifier.
270        stream_id: String,
271        /// Updated properties.
272        metadata: std::collections::HashMap<String, serde_json::Value>,
273    },
274    /// A group's name changed.
275    GroupNameChanged {
276        /// Group ID.
277        group_id: String,
278        /// New name.
279        name: String,
280    },
281    /// Server state changed — groups were reorganized (created, deleted, merged).
282    ///
283    /// Emitted after structural changes like `SetGroupClients` or `DeleteClient`
284    /// when the group topology changes. Mirrors `Server.OnUpdate` in the C++ snapserver.
285    /// The consumer should re-read server status via `GetStatus`.
286    ServerUpdated,
287    /// A stream control command was received (play, pause, next, seek, etc.).
288    ///
289    /// The library forwards this to the embedder since it doesn't own stream readers.
290    StreamControl {
291        /// Stream ID.
292        stream_id: String,
293        /// Command name.
294        command: String,
295        /// Optional parameters.
296        params: serde_json::Value,
297    },
298    /// Custom binary protocol message from a streaming client.
299    #[cfg(feature = "custom-protocol")]
300    CustomMessage {
301        /// Client ID.
302        client_id: String,
303        /// The custom message.
304        message: snapcast_proto::CustomMessage,
305    },
306}
307
308/// Commands the consumer sends to the server.
309#[derive(Debug)]
310#[non_exhaustive]
311pub enum ServerCommand {
312    /// Set a client's volume.
313    SetClientVolume {
314        /// Client ID.
315        client_id: String,
316        /// Volume (0–100).
317        volume: u16,
318        /// Mute state.
319        muted: bool,
320    },
321    /// Set a client's latency offset.
322    SetClientLatency {
323        /// Client ID.
324        client_id: String,
325        /// Latency in milliseconds.
326        latency: i32,
327    },
328    /// Set a client's display name.
329    SetClientName {
330        /// Client ID.
331        client_id: String,
332        /// New name.
333        name: String,
334    },
335    /// Assign a stream to a group.
336    SetGroupStream {
337        /// Group ID.
338        group_id: String,
339        /// Stream ID.
340        stream_id: String,
341    },
342    /// Mute/unmute a group.
343    SetGroupMute {
344        /// Group ID.
345        group_id: String,
346        /// Mute state.
347        muted: bool,
348    },
349    /// Set a group's display name.
350    SetGroupName {
351        /// Group ID.
352        group_id: String,
353        /// New name.
354        name: String,
355    },
356    /// Move clients to a group.
357    SetGroupClients {
358        /// Group ID.
359        group_id: String,
360        /// Client IDs.
361        clients: Vec<String>,
362    },
363    /// Delete a client from the server.
364    DeleteClient {
365        /// Client ID.
366        client_id: String,
367    },
368    /// Set stream metadata (artist, title, album, etc.).
369    SetStreamMeta {
370        /// Stream ID.
371        stream_id: String,
372        /// Metadata key-value pairs.
373        metadata: std::collections::HashMap<String, serde_json::Value>,
374    },
375    /// Dynamically add a stream source.
376    AddStream {
377        /// Stream source URI (e.g. `pipe:///tmp/snapfifo?name=default`).
378        uri: String,
379        /// Response: the stream ID assigned.
380        response_tx: tokio::sync::oneshot::Sender<Result<String, String>>,
381    },
382    /// Remove a stream source.
383    RemoveStream {
384        /// Stream ID to remove.
385        stream_id: String,
386    },
387    /// Forward a control command to a stream (play, pause, next, etc.).
388    StreamControl {
389        /// Stream ID.
390        stream_id: String,
391        /// Command name (e.g. "next", "previous", "pause", "seek").
392        command: String,
393        /// Optional command parameter (e.g. seek position).
394        params: serde_json::Value,
395    },
396    /// Get full server status.
397    GetStatus {
398        /// Response channel.
399        response_tx: tokio::sync::oneshot::Sender<status::ServerStatus>,
400    },
401    /// Send a custom binary protocol message to a streaming client.
402    #[cfg(feature = "custom-protocol")]
403    SendToClient {
404        /// Target client ID.
405        client_id: String,
406        /// The custom message.
407        message: snapcast_proto::CustomMessage,
408    },
409    /// Stop the server gracefully.
410    Stop,
411}
412
413/// Default codec based on compiled features.
414fn default_codec() -> &'static str {
415    #[cfg(feature = "flac")]
416    return "flac";
417    #[cfg(all(feature = "f32lz4", not(feature = "flac")))]
418    return "f32lz4";
419    #[cfg(not(any(feature = "flac", feature = "f32lz4")))]
420    return "pcm";
421}
422
423/// Server configuration for the embeddable library.
424pub struct ServerConfig {
425    /// TCP port for binary protocol (client connections). Default: 1704.
426    pub stream_port: u16,
427    /// Audio buffer size in milliseconds. Default: 1000.
428    pub buffer_ms: u32,
429    /// Default codec: "f32lz4", "pcm", "opus", "ogg". Default: "f32lz4".
430    pub codec: String,
431    /// Default sample format. Default: 48000:16:2.
432    pub sample_format: String,
433    /// mDNS service type. Default: "_snapcast._tcp.local.".
434    #[cfg(feature = "mdns")]
435    pub mdns_service_type: String,
436    /// Enable mDNS advertisement. Default: true (when mdns feature is compiled in).
437    #[cfg(feature = "mdns")]
438    pub mdns_enabled: bool,
439    /// mDNS service name. Default: "Snapserver".
440    #[cfg(feature = "mdns")]
441    pub mdns_name: String,
442    /// Auth validator for streaming clients. `None` = no auth required.
443    pub auth: Option<std::sync::Arc<dyn auth::AuthValidator>>,
444    /// Client filter — called after Hello to accept/reject connections.
445    /// `None` = accept all clients.
446    pub client_filter: Option<std::sync::Arc<dyn auth::ClientFilter>>,
447    /// Pre-shared key for f32lz4 encryption. `None` = no encryption.
448    #[cfg(feature = "encryption")]
449    pub encryption_psk: Option<String>,
450    /// Path to persist server state (clients, groups). `None` = no persistence.
451    pub state_file: Option<std::path::PathBuf>,
452    /// Send audio data to muted clients. Default: false (skip muted, saves bandwidth).
453    pub send_audio_to_muted: bool,
454}
455
456impl Default for ServerConfig {
457    fn default() -> Self {
458        Self {
459            stream_port: snapcast_proto::DEFAULT_STREAM_PORT,
460            buffer_ms: 1000,
461            codec: default_codec().into(),
462            sample_format: "48000:16:2".into(),
463            #[cfg(feature = "mdns")]
464            mdns_service_type: "_snapcast._tcp.local.".into(),
465            #[cfg(feature = "mdns")]
466            mdns_enabled: true,
467            #[cfg(feature = "mdns")]
468            mdns_name: "Snapserver".into(),
469            auth: None,
470            client_filter: None,
471            #[cfg(feature = "encryption")]
472            encryption_psk: None,
473            state_file: None,
474            send_audio_to_muted: false,
475        }
476    }
477}
478
479/// Per-stream configuration. If `None`, inherits from [`ServerConfig`].
480#[derive(Debug, Clone, Default)]
481pub struct StreamConfig {
482    /// Codec override (e.g. "flac", "f32lz4", "opus", "ogg", "pcm").
483    pub codec: Option<String>,
484    /// Sample format override (e.g. "48000:16:2").
485    pub sample_format: Option<String>,
486}
487
488/// The embeddable Snapcast server.
489pub struct SnapServer {
490    config: ServerConfig,
491    event_tx: mpsc::Sender<ServerEvent>,
492    command_tx: mpsc::Sender<ServerCommand>,
493    command_rx: Option<mpsc::Receiver<ServerCommand>>,
494    /// Named audio streams — each gets its own encoder at run().
495    streams: Vec<(String, StreamConfig, mpsc::Receiver<AudioFrame>)>,
496    /// Broadcast channel for encoded chunks → sessions.
497    chunk_tx: broadcast::Sender<WireChunkData>,
498}
499
500/// Spawn a per-stream encode loop on a dedicated thread.
501///
502/// Receives `AudioFrame`, passes `AudioData` directly to the encoder,
503/// and broadcasts encoded `WireChunkData` to sessions.
504fn spawn_stream_encoder(
505    stream_id: String,
506    mut rx: mpsc::Receiver<AudioFrame>,
507    mut enc: Box<dyn encoder::Encoder>,
508    chunk_tx: broadcast::Sender<WireChunkData>,
509    sample_rate: u32,
510    channels: u16,
511) {
512    std::thread::spawn(move || {
513        let rt = tokio::runtime::Builder::new_current_thread()
514            .enable_time()
515            .build()
516            .expect("encoder runtime");
517
518        rt.block_on(async {
519            let mut next_tick: Option<tokio::time::Instant> = None;
520            while let Some(frame) = rx.recv().await {
521                // Pace F32 sources to realtime (pipe sources pace naturally via blocking read)
522                if let AudioData::F32(ref samples) = frame.data {
523                    let num_frames = samples.len() / channels.max(1) as usize;
524                    let chunk_dur = std::time::Duration::from_micros(
525                        (num_frames as u64 * 1_000_000) / sample_rate as u64,
526                    );
527                    let now = tokio::time::Instant::now();
528                    let tick = next_tick.get_or_insert(now);
529                    // Reset on gap (>500ms behind wall clock)
530                    if now.checked_duration_since(*tick + chunk_dur)
531                        > Some(std::time::Duration::from_millis(500))
532                    {
533                        *tick = now;
534                    }
535                    *tick += chunk_dur;
536                    tokio::time::sleep_until(*tick).await;
537                }
538                match enc.encode(&frame.data) {
539                    Ok(encoded) if !encoded.data.is_empty() => {
540                        let _ = chunk_tx.send(WireChunkData {
541                            stream_id: stream_id.clone(),
542                            timestamp_usec: frame.timestamp_usec,
543                            data: encoded.data,
544                        });
545                    }
546                    Err(e) => {
547                        tracing::warn!(stream = %stream_id, error = %e, "Encode failed");
548                    }
549                    _ => {} // encoder buffering
550                }
551            }
552        });
553    });
554}
555
556/// Convert f32 samples to PCM bytes at the given bit depth.
557impl SnapServer {
558    /// Create a new server. Returns the server and event receiver.
559    pub fn new(config: ServerConfig) -> (Self, mpsc::Receiver<ServerEvent>) {
560        let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
561        let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
562        let (chunk_tx, _) = broadcast::channel(256);
563        let server = Self {
564            config,
565            event_tx,
566            command_tx,
567            command_rx: Some(command_rx),
568            streams: Vec::new(),
569            chunk_tx,
570        };
571        (server, event_rx)
572    }
573
574    /// Add a named audio stream. Returns a sender for pushing audio frames.
575    ///
576    /// Uses the server's default codec and sample format.
577    pub fn add_stream(&mut self, name: &str) -> mpsc::Sender<AudioFrame> {
578        self.add_stream_with_config(name, StreamConfig::default())
579    }
580
581    /// Add a named F32 audio stream with automatic chunking and timestamping.
582    ///
583    /// Returns an [`F32AudioSender`] that accepts variable-size F32 sample buffers
584    /// and handles 20ms chunking, monotonic timestamps, and gap detection internally.
585    ///
586    /// # Errors
587    /// Returns an error if the server's `sample_format` cannot be parsed.
588    pub fn add_f32_stream(&mut self, name: &str) -> Result<F32AudioSender, String> {
589        let sf: SampleFormat =
590            self.config.sample_format.parse().map_err(|e| {
591                format!("invalid sample_format '{}': {e}", self.config.sample_format)
592            })?;
593        let (tx, rx) = mpsc::channel(F32_CHANNEL_SIZE);
594        self.streams
595            .push((name.to_string(), StreamConfig::default(), rx));
596        Ok(F32AudioSender::new(tx, sf.rate(), sf.channels()))
597    }
598
599    /// Add a named audio stream with per-stream codec/format overrides.
600    pub fn add_stream_with_config(
601        &mut self,
602        name: &str,
603        config: StreamConfig,
604    ) -> mpsc::Sender<AudioFrame> {
605        let (tx, rx) = mpsc::channel(AUDIO_CHANNEL_SIZE);
606        self.streams.push((name.to_string(), config, rx));
607        tx
608    }
609
610    /// Get a cloneable command sender.
611    pub fn command_sender(&self) -> mpsc::Sender<ServerCommand> {
612        self.command_tx.clone()
613    }
614
615    /// Access the server configuration.
616    pub fn config(&self) -> &ServerConfig {
617        &self.config
618    }
619
620    /// Run the server. Blocks until stopped or a fatal error occurs.
621    pub async fn run(&mut self) -> anyhow::Result<()> {
622        let mut command_rx = self
623            .command_rx
624            .take()
625            .ok_or_else(|| anyhow::anyhow!("run() already called"))?;
626
627        let event_tx = self.event_tx.clone();
628
629        let sample_format: snapcast_proto::SampleFormat = self
630            .config
631            .sample_format
632            .parse()
633            .unwrap_or(snapcast_proto::DEFAULT_SAMPLE_FORMAT);
634
635        anyhow::ensure!(
636            !self.streams.is_empty(),
637            "No streams configured — call add_stream() before run()"
638        );
639
640        tracing::info!(stream_port = self.config.stream_port, "Snapserver starting");
641
642        // Advertise via mDNS (protocol-level discovery)
643        #[cfg(feature = "mdns")]
644        let _mdns = if self.config.mdns_enabled {
645            mdns::MdnsAdvertiser::new(
646                self.config.stream_port,
647                &self.config.mdns_service_type,
648                &self.config.mdns_name,
649            )
650            .map_err(|e| tracing::warn!(error = %e, "mDNS advertisement failed"))
651            .ok()
652        } else {
653            None
654        };
655
656        // Create default encoder — used for codec header and first default stream
657        let default_enc_config = encoder::EncoderConfig {
658            codec: self.config.codec.clone(),
659            format: sample_format,
660            options: String::new(),
661            #[cfg(feature = "encryption")]
662            encryption_psk: self.config.encryption_psk.clone(),
663        };
664        let default_enc = encoder::create(&default_enc_config)?;
665
666        // Spawn per-stream encode loops — reuse default_enc for first default stream
667        let chunk_tx = self.chunk_tx.clone();
668        let streams = std::mem::take(&mut self.streams);
669        let mut default_enc = Some(default_enc);
670
671        // Shared state for command handlers
672        let initial_state = self
673            .config
674            .state_file
675            .as_ref()
676            .map(|p| state::ServerState::load(p))
677            .unwrap_or_default();
678        let shared_state = Arc::new(tokio::sync::Mutex::new(initial_state));
679
680        // Create session server before stream registration
681        // (first_stream_name set in loop below, but SessionServer only needs it for default routing)
682        let first_name = streams
683            .first()
684            .map(|(n, _, _)| n.clone())
685            .unwrap_or_default();
686        let session_srv = Arc::new(session::SessionServer::new(
687            self.config.stream_port,
688            self.config.buffer_ms as i32,
689            self.config.auth.clone(),
690            self.config.client_filter.clone(),
691            Arc::clone(&shared_state),
692            first_name.clone(),
693            self.config.send_audio_to_muted,
694        ));
695
696        for (name, stream_cfg, rx) in streams {
697            {
698                let mut s = shared_state.lock().await;
699                s.streams.push(state::StreamInfo {
700                    id: name.clone(),
701                    status: "idle".into(),
702                    uri: String::new(),
703                    properties: Default::default(),
704                });
705            }
706            let enc = if stream_cfg.codec.is_none() && stream_cfg.sample_format.is_none() {
707                if let Some(enc) = default_enc.take() {
708                    enc
709                } else {
710                    encoder::create(&default_enc_config)?
711                }
712            } else {
713                let stream_codec = stream_cfg.codec.as_deref().unwrap_or(&self.config.codec);
714                let stream_format: snapcast_proto::SampleFormat = stream_cfg
715                    .sample_format
716                    .as_deref()
717                    .and_then(|s| s.parse().ok())
718                    .unwrap_or(sample_format);
719                encoder::create(&encoder::EncoderConfig {
720                    codec: stream_codec.to_string(),
721                    format: stream_format,
722                    options: String::new(),
723                    #[cfg(feature = "encryption")]
724                    encryption_psk: self.config.encryption_psk.clone(),
725                })?
726            };
727            tracing::info!(stream = %name, codec = enc.name(), %sample_format, "Stream registered");
728            session_srv
729                .register_stream_codec(&name, enc.name(), enc.header())
730                .await;
731            spawn_stream_encoder(
732                name,
733                rx,
734                enc,
735                chunk_tx.clone(),
736                sample_format.rate(),
737                sample_format.channels(),
738            );
739        }
740
741        let session_for_run = Arc::clone(&session_srv);
742        let session_event_tx = event_tx.clone();
743        let session_chunk_tx = self.chunk_tx.clone();
744        let session_handle = tokio::spawn(async move {
745            if let Err(e) = session_for_run
746                .run(session_chunk_tx, session_event_tx)
747                .await
748            {
749                tracing::error!(error = %e, "Session server error");
750            }
751        });
752
753        let state_file = self.config.state_file.clone();
754        let save_state = |s: &state::ServerState| {
755            if let Some(ref path) = state_file {
756                let _ = s
757                    .save(path)
758                    .map_err(|e| tracing::warn!(error = %e, "Failed to save state"));
759            }
760        };
761
762        // Main loop
763        loop {
764            tokio::select! {
765                cmd = command_rx.recv() => {
766                    match cmd {
767                        Some(ServerCommand::Stop) | None => {
768                            tracing::info!("Server stopped");
769                            session_handle.abort();
770                            return Ok(());
771                        }
772                        Some(ServerCommand::SetClientVolume { client_id, volume, muted }) => {
773                            let mut s = shared_state.lock().await;
774                            if let Some(c) = s.clients.get_mut(&client_id) {
775                                c.config.volume.percent = volume;
776                                c.config.volume.muted = muted;
777                            }
778                            let latency = s.clients.get(&client_id).map(|c| c.config.latency).unwrap_or(0);
779                            save_state(&s);
780                            drop(s);
781                            session_srv.push_settings(ClientSettingsUpdate {
782                                client_id: client_id.clone(),
783                                buffer_ms: self.config.buffer_ms as i32,
784                                latency, volume, muted,
785                            }).await;
786                            let _ = event_tx.try_send(ServerEvent::ClientVolumeChanged { client_id: client_id.clone(), volume, muted });
787                            session_srv.update_routing_for_client(&client_id).await;
788                        }
789                        Some(ServerCommand::SetClientLatency { client_id, latency }) => {
790                            let mut s = shared_state.lock().await;
791                            if let Some(c) = s.clients.get_mut(&client_id) {
792                                c.config.latency = latency;
793                                session_srv.push_settings(ClientSettingsUpdate {
794                                    client_id: client_id.clone(),
795                                    buffer_ms: self.config.buffer_ms as i32,
796                                    latency,
797                                    volume: c.config.volume.percent,
798                                    muted: c.config.volume.muted,
799                                }).await;
800                            }
801                            save_state(&s);
802                            drop(s);
803                            let _ = event_tx.try_send(ServerEvent::ClientLatencyChanged { client_id, latency });
804                        }
805                        Some(ServerCommand::SetClientName { client_id, name }) => {
806                            let mut s = shared_state.lock().await;
807                            if let Some(c) = s.clients.get_mut(&client_id) {
808                                c.config.name = name.clone();
809                            }
810                            save_state(&s);
811                            drop(s);
812                            let _ = event_tx.try_send(ServerEvent::ClientNameChanged { client_id, name });
813                        }
814                        Some(ServerCommand::SetGroupStream { group_id, stream_id }) => {
815                            let mut s = shared_state.lock().await;
816                            s.set_group_stream(&group_id, &stream_id);
817                            save_state(&s);
818                            drop(s);
819                            let _ = event_tx.try_send(ServerEvent::GroupStreamChanged { group_id: group_id.clone(), stream_id });
820                            session_srv.update_routing_for_group(&group_id).await;
821                        }
822                        Some(ServerCommand::SetGroupMute { group_id, muted }) => {
823                            let mut s = shared_state.lock().await;
824                            if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
825                                g.muted = muted;
826                            }
827                            save_state(&s);
828                            drop(s);
829                            let _ = event_tx.try_send(ServerEvent::GroupMuteChanged { group_id: group_id.clone(), muted });
830                            session_srv.update_routing_for_group(&group_id).await;
831                        }
832                        Some(ServerCommand::SetGroupName { group_id, name }) => {
833                            let mut s = shared_state.lock().await;
834                            if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
835                                g.name = name.clone();
836                            }
837                            save_state(&s);
838                            drop(s);
839                            let _ = event_tx.try_send(ServerEvent::GroupNameChanged { group_id, name });
840                        }
841                        Some(ServerCommand::SetGroupClients { group_id, clients }) => {
842                            let mut s = shared_state.lock().await;
843                            s.set_group_clients(&group_id, &clients);
844                            save_state(&s);
845                            drop(s);
846                            // Structural change — mirrors Server.OnUpdate in C++ snapserver
847                            let _ = event_tx.try_send(ServerEvent::ServerUpdated);
848                            session_srv.update_routing_all().await;
849                        }
850                        Some(ServerCommand::DeleteClient { client_id }) => {
851                            let mut s = shared_state.lock().await;
852                            s.remove_client_from_groups(&client_id);
853                            s.clients.remove(&client_id);
854                            save_state(&s);
855                            drop(s);
856                            let _ = event_tx.try_send(ServerEvent::ServerUpdated);
857                            session_srv.update_routing_all().await;
858                        }
859                        Some(ServerCommand::SetStreamMeta { stream_id, metadata }) => {
860                            let mut s = shared_state.lock().await;
861                            if let Some(stream) = s.streams.iter_mut().find(|st| st.id == stream_id) {
862                                stream.properties = metadata.clone();
863                            }
864                            drop(s);
865                            let _ = event_tx.try_send(ServerEvent::StreamMetaChanged { stream_id, metadata });
866                        }
867                        Some(ServerCommand::AddStream { uri, response_tx }) => {
868                            // Parse stream name from URI query param, or use the URI as ID
869                            let name = uri.split("name=").nth(1)
870                                .and_then(|s| s.split('&').next())
871                                .unwrap_or("dynamic")
872                                .to_string();
873                            let mut s = shared_state.lock().await;
874                            if s.streams.iter().any(|st| st.id == name) {
875                                let _ = response_tx.send(Err(format!("Stream '{name}' already exists")));
876                            } else {
877                                s.streams.push(state::StreamInfo {
878                                    id: name.clone(),
879                                    status: "idle".into(),
880                                    uri: uri.clone(),
881                                    properties: Default::default(),
882                                });
883                                save_state(&s);
884                                drop(s);
885                                let _ = event_tx.try_send(ServerEvent::ServerUpdated);
886                                let _ = response_tx.send(Ok(name));
887                            }
888                        }
889                        Some(ServerCommand::RemoveStream { stream_id }) => {
890                            let mut s = shared_state.lock().await;
891                            s.streams.retain(|st| st.id != stream_id);
892                            // Clear stream_id on groups that referenced this stream
893                            for g in &mut s.groups {
894                                if g.stream_id == stream_id {
895                                    g.stream_id.clear();
896                                }
897                            }
898                            save_state(&s);
899                            drop(s);
900                            let _ = event_tx.try_send(ServerEvent::ServerUpdated);
901                            session_srv.update_routing_all().await;
902                        }
903                        Some(ServerCommand::StreamControl { stream_id, command, params }) => {
904                            tracing::debug!(stream_id, command, ?params, "Stream control forwarded");
905                            // Forward to embedder via event — the library doesn't own stream readers
906                            let _ = event_tx.try_send(ServerEvent::StreamControl { stream_id, command, params });
907                        }
908                        Some(ServerCommand::GetStatus { response_tx }) => {
909                            let s = shared_state.lock().await;
910                            let _ = response_tx.send(s.to_status());
911                        }
912                        #[cfg(feature = "custom-protocol")]
913                        Some(ServerCommand::SendToClient { client_id, message }) => {
914                            session_srv.send_custom(&client_id, message.type_id, message.payload).await;
915                        }
916                    }
917                }
918            }
919        }
920    }
921}