Skip to main content

snapcast_server/
lib.rs

1#![deny(unsafe_code)]
2#![warn(clippy::redundant_closure)]
3#![warn(clippy::implicit_clone)]
4#![warn(clippy::uninlined_format_args)]
5#![warn(missing_docs)]
6
7//! Snapcast server library — embeddable synchronized multiroom audio server.
8//!
9//! See also: [`snapcast-client`](https://docs.rs/snapcast-client) for the client library.
10//! # Architecture
11//!
12//! The server is built around a channel-based API matching `snapcast-client`:
13//!
14//! - [`SnapServer`] is the main entry point
15//! - [`ServerEvent`] flows from server → consumer (client connected, stream status, custom messages)
16//! - [`ServerCommand`] flows from consumer → server (typed mutations, custom messages, stop)
17//!
18//! # Example
19//!
20//! ```no_run
21//! use snapcast_server::{SnapServer, ServerConfig, ServerEvent, ServerCommand};
22//!
23//! # async fn example() -> anyhow::Result<()> {
24//! let config = ServerConfig::default();
25//! let (mut server, mut events) = SnapServer::new(config);
26//! let _audio_tx = server.add_stream("default");
27//! let cmd = server.command_sender();
28//!
29//! tokio::spawn(async move {
30//!     while let Some(event) = events.recv().await {
31//!         match event {
32//!             ServerEvent::ClientConnected { id, ref hello } => {
33//!                 tracing::info!(id, name = hello.host_name, "Client connected");
34//!             }
35//!             _ => {}
36//!         }
37//!     }
38//! });
39//!
40//! server.run().await?;
41//! # Ok(())
42//! # }
43//! ```
44
45use std::sync::Arc;
46
47use tokio::sync::{broadcast, mpsc};
48
49// Re-export proto types that embedders need
50#[cfg(feature = "custom-protocol")]
51pub use snapcast_proto::CustomMessage;
52#[cfg(feature = "encryption")]
53pub use snapcast_proto::DEFAULT_ENCRYPTION_PSK;
54pub use snapcast_proto::SampleFormat;
55pub use snapcast_proto::message::hello::Hello;
56pub use snapcast_proto::{DEFAULT_SAMPLE_FORMAT, DEFAULT_STREAM_PORT};
57
58const EVENT_CHANNEL_SIZE: usize = 256;
59const COMMAND_CHANNEL_SIZE: usize = 64;
60const AUDIO_CHANNEL_SIZE: usize = 256;
61
62/// Channel size for F32 embedded sources — backpressure from encoder pacing.
63const F32_CHANNEL_SIZE: usize = 1;
64
65/// Audio data pushed by the consumer — either f32 or raw PCM.
66#[derive(Debug, Clone)]
67pub enum AudioData {
68    /// Interleaved f32 samples (from DSP, EQ, AirPlay receivers).
69    /// Range: -1.0 to 1.0.
70    F32(Vec<f32>),
71    /// Raw interleaved PCM bytes at the stream's configured sample format
72    /// (from pipe/file/process readers). Byte order: little-endian.
73    Pcm(Vec<u8>),
74}
75
76/// A timestamped audio frame for server input.
77#[derive(Debug, Clone)]
78pub struct AudioFrame {
79    /// Audio samples.
80    pub data: AudioData,
81    /// Timestamp in microseconds (server time).
82    pub timestamp_usec: i64,
83}
84
85/// Buffered sender for F32 audio that handles chunking, timestamping, and gap detection.
86///
87/// Accumulates variable-size F32 sample buffers and emits fixed-size 20ms chunks
88/// with monotonic timestamps. Automatically resets on playback gaps (>500ms).
89///
90/// Created by [`SnapServer::add_f32_stream`].
91pub struct F32AudioSender {
92    tx: mpsc::Sender<AudioFrame>,
93    buf: Vec<f32>,
94    chunk_samples: usize,
95    channels: u16,
96    sample_rate: u32,
97    ts: Option<time::ChunkTimestamper>,
98    last_send: std::time::Instant,
99}
100
101impl F32AudioSender {
102    fn new(tx: mpsc::Sender<AudioFrame>, sample_rate: u32, channels: u16) -> Self {
103        let chunk_samples = (sample_rate as usize * 20 / 1000) * channels as usize;
104        Self {
105            tx,
106            buf: Vec::with_capacity(chunk_samples * 2),
107            chunk_samples,
108            channels,
109            sample_rate,
110            ts: None,
111            last_send: std::time::Instant::now(),
112        }
113    }
114
115    /// Push interleaved F32 samples. Variable-size input is accumulated and
116    /// emitted as fixed 20ms chunks. Returns when all complete chunks are sent.
117    pub async fn send(
118        &mut self,
119        samples: &[f32],
120    ) -> Result<(), mpsc::error::SendError<AudioFrame>> {
121        let now = std::time::Instant::now();
122        if now.duration_since(self.last_send) > std::time::Duration::from_millis(500) {
123            self.ts = None;
124            self.buf.clear();
125        }
126        self.last_send = now;
127
128        self.buf.extend_from_slice(samples);
129        let ch = self.channels.max(1) as usize;
130        while self.buf.len() >= self.chunk_samples {
131            let chunk: Vec<f32> = self.buf.drain(..self.chunk_samples).collect();
132            let frames = (self.chunk_samples / ch) as u32;
133            let ts = self
134                .ts
135                .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
136            let timestamp_usec = ts.next(frames);
137            self.tx
138                .send(AudioFrame {
139                    data: AudioData::F32(chunk),
140                    timestamp_usec,
141                })
142                .await?;
143        }
144        Ok(())
145    }
146
147    /// Flush remaining samples (< 20ms) as a final short chunk.
148    /// Call at end-of-track to avoid losing the last few milliseconds.
149    pub async fn flush(&mut self) -> Result<(), mpsc::error::SendError<AudioFrame>> {
150        if self.buf.is_empty() {
151            return Ok(());
152        }
153        let chunk: Vec<f32> = self.buf.drain(..).collect();
154        let ch = self.channels.max(1) as usize;
155        let frames = (chunk.len() / ch) as u32;
156        let ts = self
157            .ts
158            .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
159        let timestamp_usec = ts.next(frames);
160        self.tx
161            .send(AudioFrame {
162                data: AudioData::F32(chunk),
163                timestamp_usec,
164            })
165            .await
166    }
167}
168
169/// An encoded audio chunk ready to be sent to clients.
170#[derive(Debug, Clone)]
171pub struct WireChunkData {
172    /// Stream ID this chunk belongs to.
173    pub stream_id: String,
174    /// Server timestamp in microseconds.
175    pub timestamp_usec: i64,
176    /// Encoded audio data.
177    pub data: Vec<u8>,
178}
179
180pub mod auth;
181#[cfg(feature = "encryption")]
182pub(crate) mod crypto;
183pub(crate) mod encoder;
184#[cfg(feature = "mdns")]
185pub(crate) mod mdns;
186pub(crate) mod session;
187pub(crate) mod state;
188pub mod status;
189pub(crate) mod stream;
190pub mod time;
191
192/// Settings update pushed to a streaming client via binary protocol.
193#[derive(Debug, Clone)]
194pub struct ClientSettingsUpdate {
195    /// Target client ID.
196    pub client_id: String,
197    /// Buffer size in ms.
198    pub buffer_ms: i32,
199    /// Latency offset in ms.
200    pub latency: i32,
201    /// Volume (0–100).
202    pub volume: u16,
203    /// Mute state.
204    pub muted: bool,
205}
206
207/// Events emitted by the server to the consumer.
208#[derive(Debug)]
209#[non_exhaustive]
210pub enum ServerEvent {
211    /// A client connected via the binary protocol.
212    ClientConnected {
213        /// Unique client identifier.
214        id: String,
215        /// The client's Hello message with all connection metadata.
216        hello: snapcast_proto::message::hello::Hello,
217    },
218    /// A client disconnected.
219    ClientDisconnected {
220        /// Unique client identifier.
221        id: String,
222    },
223    /// A client's volume changed.
224    ClientVolumeChanged {
225        /// Client ID.
226        client_id: String,
227        /// New volume (0–100).
228        volume: u16,
229        /// Mute state.
230        muted: bool,
231    },
232    /// A client's latency changed.
233    ClientLatencyChanged {
234        /// Client ID.
235        client_id: String,
236        /// New latency in ms.
237        latency: i32,
238    },
239    /// A client's name changed.
240    ClientNameChanged {
241        /// Client ID.
242        client_id: String,
243        /// New name.
244        name: String,
245    },
246    /// A group's stream assignment changed.
247    GroupStreamChanged {
248        /// Group ID.
249        group_id: String,
250        /// New stream ID.
251        stream_id: String,
252    },
253    /// A group's mute state changed.
254    GroupMuteChanged {
255        /// Group ID.
256        group_id: String,
257        /// Mute state.
258        muted: bool,
259    },
260    /// A stream's status changed (playing, idle, unknown).
261    StreamStatus {
262        /// Stream identifier.
263        stream_id: String,
264        /// New status.
265        status: String,
266    },
267    /// Stream metadata/properties changed.
268    StreamMetaChanged {
269        /// Stream identifier.
270        stream_id: String,
271        /// Updated properties.
272        metadata: std::collections::HashMap<String, serde_json::Value>,
273    },
274    /// A group's name changed.
275    GroupNameChanged {
276        /// Group ID.
277        group_id: String,
278        /// New name.
279        name: String,
280    },
281    /// Server state changed — groups were reorganized (created, deleted, merged).
282    ///
283    /// Emitted after structural changes like `SetGroupClients` or `DeleteClient`
284    /// when the group topology changes. Mirrors `Server.OnUpdate` in the C++ snapserver.
285    /// The consumer should re-read server status via `GetStatus`.
286    ServerUpdated,
287    /// A stream control command was received (play, pause, next, seek, etc.).
288    ///
289    /// The library forwards this to the embedder since it doesn't own stream readers.
290    StreamControl {
291        /// Stream ID.
292        stream_id: String,
293        /// Command name.
294        command: String,
295        /// Optional parameters.
296        params: serde_json::Value,
297    },
298    /// Custom binary protocol message from a streaming client.
299    #[cfg(feature = "custom-protocol")]
300    CustomMessage {
301        /// Client ID.
302        client_id: String,
303        /// The custom message.
304        message: snapcast_proto::CustomMessage,
305    },
306}
307
308/// Commands the consumer sends to the server.
309#[derive(Debug)]
310#[non_exhaustive]
311pub enum ServerCommand {
312    /// Set a client's volume.
313    SetClientVolume {
314        /// Client ID.
315        client_id: String,
316        /// Volume (0–100).
317        volume: u16,
318        /// Mute state.
319        muted: bool,
320    },
321    /// Set a client's latency offset.
322    SetClientLatency {
323        /// Client ID.
324        client_id: String,
325        /// Latency in milliseconds.
326        latency: i32,
327    },
328    /// Set a client's display name.
329    SetClientName {
330        /// Client ID.
331        client_id: String,
332        /// New name.
333        name: String,
334    },
335    /// Assign a stream to a group.
336    SetGroupStream {
337        /// Group ID.
338        group_id: String,
339        /// Stream ID.
340        stream_id: String,
341    },
342    /// Mute/unmute a group.
343    SetGroupMute {
344        /// Group ID.
345        group_id: String,
346        /// Mute state.
347        muted: bool,
348    },
349    /// Set a group's display name.
350    SetGroupName {
351        /// Group ID.
352        group_id: String,
353        /// New name.
354        name: String,
355    },
356    /// Move clients to a group.
357    SetGroupClients {
358        /// Group ID.
359        group_id: String,
360        /// Client IDs.
361        clients: Vec<String>,
362    },
363    /// Delete a client from the server.
364    DeleteClient {
365        /// Client ID.
366        client_id: String,
367    },
368    /// Set stream metadata (artist, title, album, etc.).
369    SetStreamMeta {
370        /// Stream ID.
371        stream_id: String,
372        /// Metadata key-value pairs.
373        metadata: std::collections::HashMap<String, serde_json::Value>,
374    },
375    /// Request dynamic stream addition from an application shell.
376    ///
377    /// The embeddable library does not own stream readers. Binaries or embedders
378    /// must create streams before [`SnapServer::run`] or implement their own
379    /// orchestration around this command.
380    AddStream {
381        /// Stream source URI (e.g. `pipe:///tmp/snapfifo?name=default`).
382        uri: String,
383        /// Response: the stream ID assigned.
384        response_tx: tokio::sync::oneshot::Sender<Result<String, String>>,
385    },
386    /// Remove a stream source.
387    RemoveStream {
388        /// Stream ID to remove.
389        stream_id: String,
390    },
391    /// Forward a control command to a stream (play, pause, next, etc.).
392    StreamControl {
393        /// Stream ID.
394        stream_id: String,
395        /// Command name (e.g. "next", "previous", "pause", "seek").
396        command: String,
397        /// Optional command parameter (e.g. seek position).
398        params: serde_json::Value,
399    },
400    /// Get full server status.
401    GetStatus {
402        /// Response channel.
403        response_tx: tokio::sync::oneshot::Sender<status::ServerStatus>,
404    },
405    /// Send a custom binary protocol message to a streaming client.
406    #[cfg(feature = "custom-protocol")]
407    SendToClient {
408        /// Target client ID.
409        client_id: String,
410        /// The custom message.
411        message: snapcast_proto::CustomMessage,
412    },
413    /// Stop the server gracefully.
414    Stop,
415}
416
417/// Default codec based on compiled features.
418fn default_codec() -> &'static str {
419    #[cfg(feature = "flac")]
420    return snapcast_proto::CODEC_FLAC;
421    #[cfg(all(feature = "f32lz4", not(feature = "flac")))]
422    return snapcast_proto::CODEC_F32LZ4;
423    #[cfg(not(any(feature = "flac", feature = "f32lz4")))]
424    return snapcast_proto::CODEC_PCM;
425}
426
427/// Server configuration for the embeddable library.
428pub struct ServerConfig {
429    /// Bind address for binary protocol client connections. Default: 0.0.0.0.
430    pub stream_bind_address: String,
431    /// TCP port for binary protocol (client connections). Default: 1704.
432    pub stream_port: u16,
433    /// Audio buffer size in milliseconds. Default: 1000.
434    pub buffer_ms: u32,
435    /// Default codec: "f32lz4", "pcm", "opus", "ogg". Default: "f32lz4".
436    pub codec: String,
437    /// Default sample format. Default: 48000:16:2.
438    pub sample_format: String,
439    /// mDNS service type. Default: "_snapcast._tcp.local.".
440    #[cfg(feature = "mdns")]
441    pub mdns_service_type: String,
442    /// Enable mDNS advertisement. Default: true (when mdns feature is compiled in).
443    #[cfg(feature = "mdns")]
444    pub mdns_enabled: bool,
445    /// mDNS service name. Default: "Snapserver".
446    #[cfg(feature = "mdns")]
447    pub mdns_name: String,
448    /// Auth validator for streaming clients. `None` = no auth required.
449    pub auth: Option<std::sync::Arc<dyn auth::AuthValidator>>,
450    /// Client filter — called after Hello to accept/reject connections.
451    /// `None` = accept all clients.
452    pub client_filter: Option<std::sync::Arc<dyn auth::ClientFilter>>,
453    /// Pre-shared key for f32lz4 encryption. `None` = no encryption.
454    #[cfg(feature = "encryption")]
455    pub encryption_psk: Option<String>,
456    /// Path to persist server state (clients, groups). `None` = no persistence.
457    pub state_file: Option<std::path::PathBuf>,
458    /// Send audio data to muted clients. Default: false (skip muted, saves bandwidth).
459    pub send_audio_to_muted: bool,
460}
461
462impl Default for ServerConfig {
463    fn default() -> Self {
464        Self {
465            stream_bind_address: snapcast_proto::DEFAULT_BIND_ADDRESS.into(),
466            stream_port: snapcast_proto::DEFAULT_STREAM_PORT,
467            buffer_ms: snapcast_proto::DEFAULT_BUFFER_MS,
468            codec: default_codec().into(),
469            sample_format: snapcast_proto::DEFAULT_SAMPLE_FORMAT_STRING.into(),
470            #[cfg(feature = "mdns")]
471            mdns_service_type: snapcast_proto::DEFAULT_MDNS_SERVICE_TYPE.into(),
472            #[cfg(feature = "mdns")]
473            mdns_enabled: true,
474            #[cfg(feature = "mdns")]
475            mdns_name: snapcast_proto::DEFAULT_SERVER_NAME.into(),
476            auth: None,
477            client_filter: None,
478            #[cfg(feature = "encryption")]
479            encryption_psk: None,
480            state_file: None,
481            send_audio_to_muted: false,
482        }
483    }
484}
485
486/// Per-stream configuration. If `None`, inherits from [`ServerConfig`].
487#[derive(Debug, Clone, Default)]
488pub struct StreamConfig {
489    /// Codec override (e.g. "flac", "f32lz4", "opus", "ogg", "pcm").
490    pub codec: Option<String>,
491    /// Sample format override (e.g. "48000:16:2").
492    pub sample_format: Option<String>,
493}
494
495/// The embeddable Snapcast server.
496pub struct SnapServer {
497    config: ServerConfig,
498    event_tx: mpsc::Sender<ServerEvent>,
499    command_tx: mpsc::Sender<ServerCommand>,
500    command_rx: Option<mpsc::Receiver<ServerCommand>>,
501    /// Named audio streams — each gets its own encoder at run().
502    streams: Vec<(String, StreamConfig, mpsc::Receiver<AudioFrame>)>,
503    /// Broadcast channel for encoded chunks → sessions.
504    chunk_tx: broadcast::Sender<WireChunkData>,
505}
506
507/// Spawn a per-stream encode loop on a dedicated thread.
508///
509/// Receives `AudioFrame`, passes `AudioData` directly to the encoder,
510/// and broadcasts encoded `WireChunkData` to sessions.
511fn spawn_stream_encoder(
512    stream_id: String,
513    mut rx: mpsc::Receiver<AudioFrame>,
514    mut enc: Box<dyn encoder::Encoder>,
515    chunk_tx: broadcast::Sender<WireChunkData>,
516    sample_rate: u32,
517    channels: u16,
518) {
519    std::thread::spawn(move || {
520        let rt = tokio::runtime::Builder::new_current_thread()
521            .enable_time()
522            .build()
523            .expect("encoder runtime");
524
525        rt.block_on(async {
526            let mut next_tick: Option<tokio::time::Instant> = None;
527            while let Some(frame) = rx.recv().await {
528                // Pace F32 sources to realtime (pipe sources pace naturally via blocking read)
529                if let AudioData::F32(ref samples) = frame.data {
530                    let num_frames = samples.len() / channels.max(1) as usize;
531                    let chunk_dur = std::time::Duration::from_micros(
532                        (num_frames as u64 * 1_000_000) / sample_rate as u64,
533                    );
534                    let now = tokio::time::Instant::now();
535                    let tick = next_tick.get_or_insert(now);
536                    // Reset on gap (>500ms behind wall clock)
537                    if now.checked_duration_since(*tick + chunk_dur)
538                        > Some(std::time::Duration::from_millis(500))
539                    {
540                        *tick = now;
541                    }
542                    *tick += chunk_dur;
543                    tokio::time::sleep_until(*tick).await;
544                }
545                match enc.encode(&frame.data) {
546                    Ok(encoded) if !encoded.data.is_empty() => {
547                        let _ = chunk_tx.send(WireChunkData {
548                            stream_id: stream_id.clone(),
549                            timestamp_usec: frame.timestamp_usec,
550                            data: encoded.data,
551                        });
552                    }
553                    Err(e) => {
554                        tracing::warn!(stream = %stream_id, error = %e, "Encode failed");
555                    }
556                    _ => {} // encoder buffering
557                }
558            }
559        });
560    });
561}
562
563/// Convert f32 samples to PCM bytes at the given bit depth.
564impl SnapServer {
565    /// Create a new server. Returns the server and event receiver.
566    pub fn new(config: ServerConfig) -> (Self, mpsc::Receiver<ServerEvent>) {
567        let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
568        let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
569        let (chunk_tx, _) = broadcast::channel(256);
570        let server = Self {
571            config,
572            event_tx,
573            command_tx,
574            command_rx: Some(command_rx),
575            streams: Vec::new(),
576            chunk_tx,
577        };
578        (server, event_rx)
579    }
580
581    /// Add a named audio stream. Returns a sender for pushing audio frames.
582    ///
583    /// Uses the server's default codec and sample format.
584    pub fn add_stream(&mut self, name: &str) -> mpsc::Sender<AudioFrame> {
585        self.add_stream_with_config(name, StreamConfig::default())
586    }
587
588    /// Add a named F32 audio stream with automatic chunking and timestamping.
589    ///
590    /// Returns an [`F32AudioSender`] that accepts variable-size F32 sample buffers
591    /// and handles 20ms chunking, monotonic timestamps, and gap detection internally.
592    ///
593    /// # Errors
594    /// Returns an error if the server's `sample_format` cannot be parsed.
595    pub fn add_f32_stream(&mut self, name: &str) -> Result<F32AudioSender, String> {
596        let sf: SampleFormat =
597            self.config.sample_format.parse().map_err(|e| {
598                format!("invalid sample_format '{}': {e}", self.config.sample_format)
599            })?;
600        let (tx, rx) = mpsc::channel(F32_CHANNEL_SIZE);
601        self.streams
602            .push((name.to_string(), StreamConfig::default(), rx));
603        Ok(F32AudioSender::new(tx, sf.rate(), sf.channels()))
604    }
605
606    /// Add a named audio stream with per-stream codec/format overrides.
607    pub fn add_stream_with_config(
608        &mut self,
609        name: &str,
610        config: StreamConfig,
611    ) -> mpsc::Sender<AudioFrame> {
612        let (tx, rx) = mpsc::channel(AUDIO_CHANNEL_SIZE);
613        self.streams.push((name.to_string(), config, rx));
614        tx
615    }
616
617    /// Get a cloneable command sender.
618    pub fn command_sender(&self) -> mpsc::Sender<ServerCommand> {
619        self.command_tx.clone()
620    }
621
622    /// Access the server configuration.
623    pub fn config(&self) -> &ServerConfig {
624        &self.config
625    }
626
627    /// Run the server. Blocks until stopped or a fatal error occurs.
628    pub async fn run(&mut self) -> anyhow::Result<()> {
629        let mut command_rx = self
630            .command_rx
631            .take()
632            .ok_or_else(|| anyhow::anyhow!("run() already called"))?;
633
634        let event_tx = self.event_tx.clone();
635
636        let sample_format: snapcast_proto::SampleFormat = self
637            .config
638            .sample_format
639            .parse()
640            .unwrap_or(snapcast_proto::DEFAULT_SAMPLE_FORMAT);
641
642        anyhow::ensure!(
643            !self.streams.is_empty(),
644            "No streams configured — call add_stream() before run()"
645        );
646
647        tracing::info!(
648            bind_address = %self.config.stream_bind_address,
649            stream_port = self.config.stream_port,
650            "Snapserver starting"
651        );
652
653        // Advertise via mDNS (protocol-level discovery)
654        #[cfg(feature = "mdns")]
655        let _mdns = if self.config.mdns_enabled {
656            mdns::MdnsAdvertiser::new(
657                self.config.stream_port,
658                &self.config.mdns_service_type,
659                &self.config.mdns_name,
660            )
661            .map_err(|e| tracing::warn!(error = %e, "mDNS advertisement failed"))
662            .ok()
663        } else {
664            None
665        };
666
667        // Create default encoder — used for codec header and first default stream
668        let default_enc_config = encoder::EncoderConfig {
669            codec: self.config.codec.clone(),
670            format: sample_format,
671            options: String::new(),
672            #[cfg(feature = "encryption")]
673            encryption_psk: self.config.encryption_psk.clone(),
674        };
675        let default_enc = encoder::create(&default_enc_config)?;
676
677        // Spawn per-stream encode loops — reuse default_enc for first default stream
678        let chunk_tx = self.chunk_tx.clone();
679        let streams = std::mem::take(&mut self.streams);
680        let mut default_enc = Some(default_enc);
681
682        // Shared state for command handlers
683        let initial_state = self
684            .config
685            .state_file
686            .as_ref()
687            .map(|p| state::ServerState::load(p))
688            .unwrap_or_default();
689        let shared_state = Arc::new(tokio::sync::Mutex::new(initial_state));
690
691        // Create session server before stream registration
692        // (first_stream_name set in loop below, but SessionServer only needs it for default routing)
693        let first_name = streams
694            .first()
695            .map(|(n, _, _)| n.clone())
696            .unwrap_or_default();
697        let session_srv = Arc::new(session::SessionServer::new(session::SessionServerConfig {
698            bind_address: self.config.stream_bind_address.clone(),
699            port: self.config.stream_port,
700            buffer_ms: self.config.buffer_ms as i32,
701            auth: self.config.auth.clone(),
702            client_filter: self.config.client_filter.clone(),
703            shared_state: Arc::clone(&shared_state),
704            default_stream: first_name.clone(),
705            send_audio_to_muted: self.config.send_audio_to_muted,
706        }));
707
708        for (name, stream_cfg, rx) in streams {
709            {
710                let mut s = shared_state.lock().await;
711                if !s.streams.iter().any(|existing| existing.id == name) {
712                    s.streams.push(state::StreamInfo {
713                        id: name.clone(),
714                        status: "idle".into(),
715                        uri: String::new(),
716                        properties: Default::default(),
717                    });
718                }
719            }
720            let mut active_format = sample_format;
721            let enc = if stream_cfg.codec.is_none() && stream_cfg.sample_format.is_none() {
722                if let Some(enc) = default_enc.take() {
723                    enc
724                } else {
725                    encoder::create(&default_enc_config)?
726                }
727            } else {
728                let stream_codec = stream_cfg.codec.as_deref().unwrap_or(&self.config.codec);
729                let stream_format: snapcast_proto::SampleFormat = stream_cfg
730                    .sample_format
731                    .as_deref()
732                    .and_then(|s| s.parse().ok())
733                    .unwrap_or(sample_format);
734                active_format = stream_format;
735                encoder::create(&encoder::EncoderConfig {
736                    codec: stream_codec.to_string(),
737                    format: stream_format,
738                    options: String::new(),
739                    #[cfg(feature = "encryption")]
740                    encryption_psk: self.config.encryption_psk.clone(),
741                })?
742            };
743            tracing::info!(stream = %name, codec = enc.name(), format = %active_format, "Stream registered");
744            session_srv
745                .register_stream_codec(&name, enc.name(), enc.header())
746                .await;
747            spawn_stream_encoder(
748                name,
749                rx,
750                enc,
751                chunk_tx.clone(),
752                active_format.rate(),
753                active_format.channels(),
754            );
755        }
756
757        let session_for_run = Arc::clone(&session_srv);
758        let session_event_tx = event_tx.clone();
759        let session_chunk_tx = self.chunk_tx.clone();
760        let session_handle = tokio::spawn(async move {
761            if let Err(e) = session_for_run
762                .run(session_chunk_tx, session_event_tx)
763                .await
764            {
765                tracing::error!(error = %e, "Session server error");
766            }
767        });
768
769        let state_file = self.config.state_file.clone();
770        let save_state = |s: &state::ServerState| {
771            if let Some(ref path) = state_file {
772                let _ = s
773                    .save(path)
774                    .map_err(|e| tracing::warn!(error = %e, "Failed to save state"));
775            }
776        };
777
778        // Main loop
779        loop {
780            tokio::select! {
781                cmd = command_rx.recv() => {
782                    match cmd {
783                        Some(ServerCommand::Stop) | None => {
784                            tracing::info!("Server stopped");
785                            session_handle.abort();
786                            return Ok(());
787                        }
788                        Some(ServerCommand::SetClientVolume { client_id, volume, muted }) => {
789                            let mut s = shared_state.lock().await;
790                            if let Some(c) = s.clients.get_mut(&client_id) {
791                                c.config.volume.percent = volume;
792                                c.config.volume.muted = muted;
793                            }
794                            let latency = s.clients.get(&client_id).map(|c| c.config.latency).unwrap_or(0);
795                            save_state(&s);
796                            drop(s);
797                            session_srv.push_settings(ClientSettingsUpdate {
798                                client_id: client_id.clone(),
799                                buffer_ms: self.config.buffer_ms as i32,
800                                latency, volume, muted,
801                            }).await;
802                            let _ = event_tx.try_send(ServerEvent::ClientVolumeChanged { client_id: client_id.clone(), volume, muted });
803                            session_srv.update_routing_for_client(&client_id).await;
804                        }
805                        Some(ServerCommand::SetClientLatency { client_id, latency }) => {
806                            let mut settings_update = None;
807                            let mut s = shared_state.lock().await;
808                            if let Some(c) = s.clients.get_mut(&client_id) {
809                                c.config.latency = latency;
810                                settings_update = Some(ClientSettingsUpdate {
811                                    client_id: client_id.clone(),
812                                    buffer_ms: self.config.buffer_ms as i32,
813                                    latency,
814                                    volume: c.config.volume.percent,
815                                    muted: c.config.volume.muted,
816                                });
817                            }
818                            save_state(&s);
819                            drop(s);
820                            if let Some(update) = settings_update {
821                                session_srv.push_settings(update).await;
822                            }
823                            let _ = event_tx.try_send(ServerEvent::ClientLatencyChanged { client_id, latency });
824                        }
825                        Some(ServerCommand::SetClientName { client_id, name }) => {
826                            let mut s = shared_state.lock().await;
827                            if let Some(c) = s.clients.get_mut(&client_id) {
828                                c.config.name = name.clone();
829                            }
830                            save_state(&s);
831                            drop(s);
832                            let _ = event_tx.try_send(ServerEvent::ClientNameChanged { client_id, name });
833                        }
834                        Some(ServerCommand::SetGroupStream { group_id, stream_id }) => {
835                            let mut s = shared_state.lock().await;
836                            s.set_group_stream(&group_id, &stream_id);
837                            save_state(&s);
838                            drop(s);
839                            let _ = event_tx.try_send(ServerEvent::GroupStreamChanged { group_id: group_id.clone(), stream_id });
840                            session_srv.update_routing_for_group(&group_id).await;
841                        }
842                        Some(ServerCommand::SetGroupMute { group_id, muted }) => {
843                            let mut s = shared_state.lock().await;
844                            if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
845                                g.muted = muted;
846                            }
847                            save_state(&s);
848                            drop(s);
849                            let _ = event_tx.try_send(ServerEvent::GroupMuteChanged { group_id: group_id.clone(), muted });
850                            session_srv.update_routing_for_group(&group_id).await;
851                        }
852                        Some(ServerCommand::SetGroupName { group_id, name }) => {
853                            let mut s = shared_state.lock().await;
854                            if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
855                                g.name = name.clone();
856                            }
857                            save_state(&s);
858                            drop(s);
859                            let _ = event_tx.try_send(ServerEvent::GroupNameChanged { group_id, name });
860                        }
861                        Some(ServerCommand::SetGroupClients { group_id, clients }) => {
862                            let mut s = shared_state.lock().await;
863                            s.set_group_clients(&group_id, &clients);
864                            save_state(&s);
865                            drop(s);
866                            // Structural change — mirrors Server.OnUpdate in C++ snapserver
867                            let _ = event_tx.try_send(ServerEvent::ServerUpdated);
868                            session_srv.update_routing_all().await;
869                        }
870                        Some(ServerCommand::DeleteClient { client_id }) => {
871                            let mut s = shared_state.lock().await;
872                            s.remove_client_from_groups(&client_id);
873                            s.clients.remove(&client_id);
874                            save_state(&s);
875                            drop(s);
876                            let _ = event_tx.try_send(ServerEvent::ServerUpdated);
877                            session_srv.update_routing_all().await;
878                        }
879                        Some(ServerCommand::SetStreamMeta { stream_id, metadata }) => {
880                            let mut s = shared_state.lock().await;
881                            if let Some(stream) = s.streams.iter_mut().find(|st| st.id == stream_id) {
882                                stream.properties = metadata.clone();
883                            }
884                            drop(s);
885                            let _ = event_tx.try_send(ServerEvent::StreamMetaChanged { stream_id, metadata });
886                        }
887                        Some(ServerCommand::AddStream { uri, response_tx }) => {
888                            tracing::warn!(uri, "Dynamic stream addition requires application-owned stream orchestration");
889                            let _ = response_tx.send(Err(
890                                "dynamic Stream.AddStream is not supported by the embeddable server after run(); create streams before run()".into(),
891                            ));
892                        }
893                        Some(ServerCommand::RemoveStream { stream_id }) => {
894                            let mut s = shared_state.lock().await;
895                            s.streams.retain(|st| st.id != stream_id);
896                            // Clear stream_id on groups that referenced this stream
897                            for g in &mut s.groups {
898                                if g.stream_id == stream_id {
899                                    g.stream_id.clear();
900                                }
901                            }
902                            save_state(&s);
903                            drop(s);
904                            let _ = event_tx.try_send(ServerEvent::ServerUpdated);
905                            session_srv.update_routing_all().await;
906                        }
907                        Some(ServerCommand::StreamControl { stream_id, command, params }) => {
908                            tracing::debug!(stream_id, command, ?params, "Stream control forwarded");
909                            // Forward to embedder via event — the library doesn't own stream readers
910                            let _ = event_tx.try_send(ServerEvent::StreamControl { stream_id, command, params });
911                        }
912                        Some(ServerCommand::GetStatus { response_tx }) => {
913                            let s = shared_state.lock().await;
914                            let _ = response_tx.send(s.to_status());
915                        }
916                        #[cfg(feature = "custom-protocol")]
917                        Some(ServerCommand::SendToClient { client_id, message }) => {
918                            session_srv.send_custom(&client_id, message.type_id, message.payload).await;
919                        }
920                    }
921                }
922            }
923        }
924    }
925}