Skip to main content

snapcast_server/
lib.rs

1#![deny(unsafe_code)]
2#![warn(clippy::redundant_closure)]
3#![warn(clippy::implicit_clone)]
4#![warn(clippy::uninlined_format_args)]
5#![warn(missing_docs)]
6
7//! Snapcast server library — embeddable synchronized multiroom audio server.
8//!
9//! See also: [`snapcast-client`](https://docs.rs/snapcast-client) for the client library.
10//! # Architecture
11//!
12//! The server is built around a channel-based API matching `snapcast-client`:
13//!
14//! - [`SnapServer`] is the main entry point
15//! - [`ServerEvent`] flows from server → consumer (client connected, stream status, custom messages)
16//! - [`ServerCommand`] flows from consumer → server (typed mutations, custom messages, stop)
17//!
18//! # Example
19//!
20//! ```no_run
21//! use snapcast_server::{SnapServer, ServerConfig, ServerEvent, ServerCommand};
22//!
23//! # async fn example() -> anyhow::Result<()> {
24//! let config = ServerConfig::default();
25//! let (mut server, mut events) = SnapServer::new(config);
26//! let _audio_tx = server.add_stream("default");
27//! let cmd = server.command_sender();
28//!
29//! tokio::spawn(async move {
30//!     while let Some(event) = events.recv().await {
31//!         match event {
32//!             ServerEvent::ClientConnected { id, ref hello } => {
33//!                 tracing::info!(id, name = hello.host_name, "Client connected");
34//!             }
35//!             _ => {}
36//!         }
37//!     }
38//! });
39//!
40//! server.run().await?;
41//! # Ok(())
42//! # }
43//! ```
44
45use std::sync::Arc;
46
47use tokio::sync::{broadcast, mpsc};
48
49// Re-export proto types that embedders need
50#[cfg(feature = "custom-protocol")]
51pub use snapcast_proto::CustomMessage;
52#[cfg(feature = "encryption")]
53pub use snapcast_proto::DEFAULT_ENCRYPTION_PSK;
54pub use snapcast_proto::SampleFormat;
55pub use snapcast_proto::message::hello::Hello;
56pub use snapcast_proto::{DEFAULT_SAMPLE_FORMAT, DEFAULT_STREAM_PORT};
57
/// Capacity of the server → consumer event channel.
const EVENT_CHANNEL_SIZE: usize = 256;
/// Capacity of the consumer → server command channel.
const COMMAND_CHANNEL_SIZE: usize = 64;
/// Capacity of the per-stream audio frame channel returned by `add_stream`.
const AUDIO_CHANNEL_SIZE: usize = 256;

/// Capacity of the audio frame channel for F32 embedded sources.
/// Kept at a single slot so the sender feels backpressure from encoder pacing.
const F32_CHANNEL_SIZE: usize = 1;
64
/// Sample payload supplied by the consumer, as either f32 samples or raw PCM bytes.
#[derive(Debug, Clone)]
pub enum AudioData {
    /// Interleaved `f32` samples in the range -1.0 to 1.0
    /// (e.g. produced by DSP, EQ, or AirPlay receivers).
    F32(Vec<f32>),
    /// Raw interleaved little-endian PCM bytes in the stream's configured
    /// sample format (e.g. produced by pipe, file, or process readers).
    Pcm(Vec<u8>),
}
75
76/// A timestamped audio frame for server input.
77#[derive(Debug, Clone)]
78pub struct AudioFrame {
79    /// Audio samples.
80    pub data: AudioData,
81    /// Timestamp in microseconds (server time).
82    pub timestamp_usec: i64,
83}
84
85/// Buffered sender for F32 audio that handles chunking, timestamping, and gap detection.
86///
87/// Accumulates variable-size F32 sample buffers and emits fixed-size 20ms chunks
88/// with monotonic timestamps. Automatically resets on playback gaps (>500ms).
89///
90/// Created by [`SnapServer::add_f32_stream`].
91pub struct F32AudioSender {
92    tx: mpsc::Sender<AudioFrame>,
93    buf: Vec<f32>,
94    chunk_samples: usize,
95    channels: u16,
96    sample_rate: u32,
97    ts: Option<time::ChunkTimestamper>,
98    last_send: std::time::Instant,
99}
100
101impl F32AudioSender {
102    fn new(tx: mpsc::Sender<AudioFrame>, sample_rate: u32, channels: u16) -> Self {
103        let chunk_samples = (sample_rate as usize * 20 / 1000) * channels as usize;
104        Self {
105            tx,
106            buf: Vec::with_capacity(chunk_samples * 2),
107            chunk_samples,
108            channels,
109            sample_rate,
110            ts: None,
111            last_send: std::time::Instant::now(),
112        }
113    }
114
115    /// Push interleaved F32 samples. Variable-size input is accumulated and
116    /// emitted as fixed 20ms chunks. Returns when all complete chunks are sent.
117    pub async fn send(
118        &mut self,
119        samples: &[f32],
120    ) -> Result<(), mpsc::error::SendError<AudioFrame>> {
121        let now = std::time::Instant::now();
122        if now.duration_since(self.last_send) > std::time::Duration::from_millis(500) {
123            self.ts = None;
124            self.buf.clear();
125        }
126        self.last_send = now;
127
128        self.buf.extend_from_slice(samples);
129        let ch = self.channels.max(1) as usize;
130        while self.buf.len() >= self.chunk_samples {
131            let chunk: Vec<f32> = self.buf.drain(..self.chunk_samples).collect();
132            let frames = (self.chunk_samples / ch) as u32;
133            let ts = self
134                .ts
135                .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
136            let timestamp_usec = ts.next(frames);
137            self.tx
138                .send(AudioFrame {
139                    data: AudioData::F32(chunk),
140                    timestamp_usec,
141                })
142                .await?;
143        }
144        Ok(())
145    }
146
147    /// Flush remaining samples (< 20ms) as a final short chunk.
148    /// Call at end-of-track to avoid losing the last few milliseconds.
149    pub async fn flush(&mut self) -> Result<(), mpsc::error::SendError<AudioFrame>> {
150        if self.buf.is_empty() {
151            return Ok(());
152        }
153        let chunk: Vec<f32> = self.buf.drain(..).collect();
154        let ch = self.channels.max(1) as usize;
155        let frames = (chunk.len() / ch) as u32;
156        let ts = self
157            .ts
158            .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
159        let timestamp_usec = ts.next(frames);
160        self.tx
161            .send(AudioFrame {
162                data: AudioData::F32(chunk),
163                timestamp_usec,
164            })
165            .await
166    }
167}
168
/// Encoded audio for one stream, ready for delivery to clients.
#[derive(Debug, Clone)]
pub struct WireChunkData {
    /// ID of the stream that produced this chunk.
    pub stream_id: String,
    /// Server-time timestamp of the chunk, in microseconds.
    pub timestamp_usec: i64,
    /// The encoded audio bytes.
    pub data: Vec<u8>,
}
179
180pub mod auth;
181#[cfg(feature = "encryption")]
182pub(crate) mod crypto;
183pub(crate) mod encoder;
184#[cfg(feature = "mdns")]
185pub(crate) mod mdns;
186pub(crate) mod session;
187pub(crate) mod state;
188pub mod status;
189pub(crate) mod stream;
190pub mod time;
191
/// Settings pushed to a streaming client over the binary protocol.
#[derive(Debug, Clone)]
pub struct ClientSettingsUpdate {
    /// ID of the client that receives the update.
    pub client_id: String,
    /// Buffer size, in milliseconds.
    pub buffer_ms: i32,
    /// Latency offset, in milliseconds.
    pub latency: i32,
    /// Volume level (0–100).
    pub volume: u16,
    /// Whether the client is muted.
    pub muted: bool,
}
206
207/// Events emitted by the server to the consumer.
208#[derive(Debug)]
209#[non_exhaustive]
210pub enum ServerEvent {
211    /// A client connected via the binary protocol.
212    ClientConnected {
213        /// Unique client identifier.
214        id: String,
215        /// The client's Hello message with all connection metadata.
216        hello: snapcast_proto::message::hello::Hello,
217    },
218    /// A client disconnected.
219    ClientDisconnected {
220        /// Unique client identifier.
221        id: String,
222    },
223    /// A client's volume changed.
224    ClientVolumeChanged {
225        /// Client ID.
226        client_id: String,
227        /// New volume (0–100).
228        volume: u16,
229        /// Mute state.
230        muted: bool,
231    },
232    /// A client's latency changed.
233    ClientLatencyChanged {
234        /// Client ID.
235        client_id: String,
236        /// New latency in ms.
237        latency: i32,
238    },
239    /// A client's name changed.
240    ClientNameChanged {
241        /// Client ID.
242        client_id: String,
243        /// New name.
244        name: String,
245    },
246    /// A group's stream assignment changed.
247    GroupStreamChanged {
248        /// Group ID.
249        group_id: String,
250        /// New stream ID.
251        stream_id: String,
252    },
253    /// A group's mute state changed.
254    GroupMuteChanged {
255        /// Group ID.
256        group_id: String,
257        /// Mute state.
258        muted: bool,
259    },
260    /// A stream's status changed (playing, idle, unknown).
261    StreamStatus {
262        /// Stream identifier.
263        stream_id: String,
264        /// New status.
265        status: String,
266    },
267    /// Stream metadata/properties changed.
268    StreamMetaChanged {
269        /// Stream identifier.
270        stream_id: String,
271        /// Updated properties.
272        metadata: std::collections::HashMap<String, serde_json::Value>,
273    },
274    /// A group's name changed.
275    GroupNameChanged {
276        /// Group ID.
277        group_id: String,
278        /// New name.
279        name: String,
280    },
281    /// Server state changed — groups were reorganized (created, deleted, merged).
282    ///
283    /// Emitted after structural changes like `SetGroupClients` or `DeleteClient`
284    /// when the group topology changes. Mirrors `Server.OnUpdate` in the C++ snapserver.
285    /// The consumer should re-read server status via `GetStatus`.
286    ServerUpdated,
287    /// A stream control command was received (play, pause, next, seek, etc.).
288    ///
289    /// The library forwards this to the embedder since it doesn't own stream readers.
290    StreamControl {
291        /// Stream ID.
292        stream_id: String,
293        /// Command name.
294        command: String,
295        /// Optional parameters.
296        params: serde_json::Value,
297    },
298    /// Custom binary protocol message from a streaming client.
299    #[cfg(feature = "custom-protocol")]
300    CustomMessage {
301        /// Client ID.
302        client_id: String,
303        /// The custom message.
304        message: snapcast_proto::CustomMessage,
305    },
306}
307
308/// Commands the consumer sends to the server.
309#[derive(Debug)]
310#[non_exhaustive]
311pub enum ServerCommand {
312    /// Set a client's volume.
313    SetClientVolume {
314        /// Client ID.
315        client_id: String,
316        /// Volume (0–100).
317        volume: u16,
318        /// Mute state.
319        muted: bool,
320    },
321    /// Set a client's latency offset.
322    SetClientLatency {
323        /// Client ID.
324        client_id: String,
325        /// Latency in milliseconds.
326        latency: i32,
327    },
328    /// Set a client's display name.
329    SetClientName {
330        /// Client ID.
331        client_id: String,
332        /// New name.
333        name: String,
334    },
335    /// Assign a stream to a group.
336    SetGroupStream {
337        /// Group ID.
338        group_id: String,
339        /// Stream ID.
340        stream_id: String,
341    },
342    /// Mute/unmute a group.
343    SetGroupMute {
344        /// Group ID.
345        group_id: String,
346        /// Mute state.
347        muted: bool,
348    },
349    /// Set a group's display name.
350    SetGroupName {
351        /// Group ID.
352        group_id: String,
353        /// New name.
354        name: String,
355    },
356    /// Move clients to a group.
357    SetGroupClients {
358        /// Group ID.
359        group_id: String,
360        /// Client IDs.
361        clients: Vec<String>,
362    },
363    /// Delete a client from the server.
364    DeleteClient {
365        /// Client ID.
366        client_id: String,
367    },
368    /// Set stream metadata (artist, title, album, etc.).
369    SetStreamMeta {
370        /// Stream ID.
371        stream_id: String,
372        /// Metadata key-value pairs.
373        metadata: std::collections::HashMap<String, serde_json::Value>,
374    },
375    /// Dynamically add a stream source.
376    AddStream {
377        /// Stream source URI (e.g. `pipe:///tmp/snapfifo?name=default`).
378        uri: String,
379        /// Response: the stream ID assigned.
380        response_tx: tokio::sync::oneshot::Sender<Result<String, String>>,
381    },
382    /// Remove a stream source.
383    RemoveStream {
384        /// Stream ID to remove.
385        stream_id: String,
386    },
387    /// Forward a control command to a stream (play, pause, next, etc.).
388    StreamControl {
389        /// Stream ID.
390        stream_id: String,
391        /// Command name (e.g. "next", "previous", "pause", "seek").
392        command: String,
393        /// Optional command parameter (e.g. seek position).
394        params: serde_json::Value,
395    },
396    /// Get full server status.
397    GetStatus {
398        /// Response channel.
399        response_tx: tokio::sync::oneshot::Sender<status::ServerStatus>,
400    },
401    /// Send a custom binary protocol message to a streaming client.
402    #[cfg(feature = "custom-protocol")]
403    SendToClient {
404        /// Target client ID.
405        client_id: String,
406        /// The custom message.
407        message: snapcast_proto::CustomMessage,
408    },
409    /// Stop the server gracefully.
410    Stop,
411}
412
/// Pick the default codec name from the compiled-in features.
///
/// Preference order: "flac" when the `flac` feature is enabled, otherwise
/// "f32lz4" when the `f32lz4` feature is enabled, otherwise "pcm".
fn default_codec() -> &'static str {
    if cfg!(feature = "flac") {
        "flac"
    } else if cfg!(feature = "f32lz4") {
        "f32lz4"
    } else {
        "pcm"
    }
}
422
423/// Server configuration for the embeddable library.
424pub struct ServerConfig {
425    /// TCP port for binary protocol (client connections). Default: 1704.
426    pub stream_port: u16,
427    /// Audio buffer size in milliseconds. Default: 1000.
428    pub buffer_ms: u32,
429    /// Default codec: "f32lz4", "pcm", "opus", "ogg". Default: "f32lz4".
430    pub codec: String,
431    /// Default sample format. Default: 48000:16:2.
432    pub sample_format: String,
433    /// mDNS service type. Default: "_snapcast._tcp.local.".
434    #[cfg(feature = "mdns")]
435    pub mdns_service_type: String,
436    /// Enable mDNS advertisement. Default: true (when mdns feature is compiled in).
437    #[cfg(feature = "mdns")]
438    pub mdns_enabled: bool,
439    /// mDNS service name. Default: "Snapserver".
440    #[cfg(feature = "mdns")]
441    pub mdns_name: String,
442    /// Auth validator for streaming clients. `None` = no auth required.
443    pub auth: Option<std::sync::Arc<dyn auth::AuthValidator>>,
444    /// Pre-shared key for f32lz4 encryption. `None` = no encryption.
445    #[cfg(feature = "encryption")]
446    pub encryption_psk: Option<String>,
447    /// Path to persist server state (clients, groups). `None` = no persistence.
448    pub state_file: Option<std::path::PathBuf>,
449    /// Send audio data to muted clients. Default: false (skip muted, saves bandwidth).
450    pub send_audio_to_muted: bool,
451}
452
453impl Default for ServerConfig {
454    fn default() -> Self {
455        Self {
456            stream_port: snapcast_proto::DEFAULT_STREAM_PORT,
457            buffer_ms: 1000,
458            codec: default_codec().into(),
459            sample_format: "48000:16:2".into(),
460            #[cfg(feature = "mdns")]
461            mdns_service_type: "_snapcast._tcp.local.".into(),
462            #[cfg(feature = "mdns")]
463            mdns_enabled: true,
464            #[cfg(feature = "mdns")]
465            mdns_name: "Snapserver".into(),
466            auth: None,
467            #[cfg(feature = "encryption")]
468            encryption_psk: None,
469            state_file: None,
470            send_audio_to_muted: false,
471        }
472    }
473}
474
/// Per-stream overrides. A field left as `None` inherits the corresponding
/// value from [`ServerConfig`].
#[derive(Debug, Clone, Default)]
pub struct StreamConfig {
    /// Codec to use for this stream (e.g. "flac", "f32lz4", "opus", "ogg", "pcm").
    pub codec: Option<String>,
    /// Sample format to use for this stream (e.g. "48000:16:2").
    pub sample_format: Option<String>,
}
483
484/// The embeddable Snapcast server.
485pub struct SnapServer {
486    config: ServerConfig,
487    event_tx: mpsc::Sender<ServerEvent>,
488    command_tx: mpsc::Sender<ServerCommand>,
489    command_rx: Option<mpsc::Receiver<ServerCommand>>,
490    /// Named audio streams — each gets its own encoder at run().
491    streams: Vec<(String, StreamConfig, mpsc::Receiver<AudioFrame>)>,
492    /// Broadcast channel for encoded chunks → sessions.
493    chunk_tx: broadcast::Sender<WireChunkData>,
494}
495
496/// Spawn a per-stream encode loop on a dedicated thread.
497///
498/// Receives `AudioFrame`, passes `AudioData` directly to the encoder,
499/// and broadcasts encoded `WireChunkData` to sessions.
500fn spawn_stream_encoder(
501    stream_id: String,
502    mut rx: mpsc::Receiver<AudioFrame>,
503    mut enc: Box<dyn encoder::Encoder>,
504    chunk_tx: broadcast::Sender<WireChunkData>,
505    sample_rate: u32,
506    channels: u16,
507) {
508    std::thread::spawn(move || {
509        let rt = tokio::runtime::Builder::new_current_thread()
510            .enable_time()
511            .build()
512            .expect("encoder runtime");
513
514        rt.block_on(async {
515            let mut next_tick: Option<tokio::time::Instant> = None;
516            while let Some(frame) = rx.recv().await {
517                // Pace F32 sources to realtime (pipe sources pace naturally via blocking read)
518                if let AudioData::F32(ref samples) = frame.data {
519                    let num_frames = samples.len() / channels.max(1) as usize;
520                    let chunk_dur = std::time::Duration::from_micros(
521                        (num_frames as u64 * 1_000_000) / sample_rate as u64,
522                    );
523                    let now = tokio::time::Instant::now();
524                    let tick = next_tick.get_or_insert(now);
525                    // Reset on gap (>500ms behind wall clock)
526                    if now.checked_duration_since(*tick + chunk_dur)
527                        > Some(std::time::Duration::from_millis(500))
528                    {
529                        *tick = now;
530                    }
531                    *tick += chunk_dur;
532                    tokio::time::sleep_until(*tick).await;
533                }
534                match enc.encode(&frame.data) {
535                    Ok(encoded) if !encoded.data.is_empty() => {
536                        let _ = chunk_tx.send(WireChunkData {
537                            stream_id: stream_id.clone(),
538                            timestamp_usec: frame.timestamp_usec,
539                            data: encoded.data,
540                        });
541                    }
542                    Err(e) => {
543                        tracing::warn!(stream = %stream_id, error = %e, "Encode failed");
544                    }
545                    _ => {} // encoder buffering
546                }
547            }
548        });
549    });
550}
551
552/// Construction, stream registration, and the main run loop of [`SnapServer`].
553impl SnapServer {
554    /// Create a new server. Returns the server and event receiver.
555    pub fn new(config: ServerConfig) -> (Self, mpsc::Receiver<ServerEvent>) {
556        let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
557        let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
558        let (chunk_tx, _) = broadcast::channel(256);
559        let server = Self {
560            config,
561            event_tx,
562            command_tx,
563            command_rx: Some(command_rx),
564            streams: Vec::new(),
565            chunk_tx,
566        };
567        (server, event_rx)
568    }
569
570    /// Add a named audio stream. Returns a sender for pushing audio frames.
571    ///
572    /// Uses the server's default codec and sample format.
573    pub fn add_stream(&mut self, name: &str) -> mpsc::Sender<AudioFrame> {
574        self.add_stream_with_config(name, StreamConfig::default())
575    }
576
577    /// Add a named F32 audio stream with automatic chunking and timestamping.
578    ///
579    /// Returns an [`F32AudioSender`] that accepts variable-size F32 sample buffers
580    /// and handles 20ms chunking, monotonic timestamps, and gap detection internally.
581    ///
582    /// # Errors
583    /// Returns an error if the server's `sample_format` cannot be parsed.
584    pub fn add_f32_stream(&mut self, name: &str) -> Result<F32AudioSender, String> {
585        let sf: SampleFormat =
586            self.config.sample_format.parse().map_err(|e| {
587                format!("invalid sample_format '{}': {e}", self.config.sample_format)
588            })?;
589        let (tx, rx) = mpsc::channel(F32_CHANNEL_SIZE);
590        self.streams
591            .push((name.to_string(), StreamConfig::default(), rx));
592        Ok(F32AudioSender::new(tx, sf.rate(), sf.channels()))
593    }
594
595    /// Add a named audio stream with per-stream codec/format overrides.
596    pub fn add_stream_with_config(
597        &mut self,
598        name: &str,
599        config: StreamConfig,
600    ) -> mpsc::Sender<AudioFrame> {
601        let (tx, rx) = mpsc::channel(AUDIO_CHANNEL_SIZE);
602        self.streams.push((name.to_string(), config, rx));
603        tx
604    }
605
606    /// Get a cloneable command sender.
607    pub fn command_sender(&self) -> mpsc::Sender<ServerCommand> {
608        self.command_tx.clone()
609    }
610
611    /// Access the server configuration.
612    pub fn config(&self) -> &ServerConfig {
613        &self.config
614    }
615
616    /// Run the server. Blocks until stopped or a fatal error occurs.
617    pub async fn run(&mut self) -> anyhow::Result<()> {
618        let mut command_rx = self
619            .command_rx
620            .take()
621            .ok_or_else(|| anyhow::anyhow!("run() already called"))?;
622
623        let event_tx = self.event_tx.clone();
624
625        let sample_format: snapcast_proto::SampleFormat = self
626            .config
627            .sample_format
628            .parse()
629            .unwrap_or(snapcast_proto::DEFAULT_SAMPLE_FORMAT);
630
631        anyhow::ensure!(
632            !self.streams.is_empty(),
633            "No streams configured — call add_stream() before run()"
634        );
635
636        tracing::info!(stream_port = self.config.stream_port, "Snapserver starting");
637
638        // Advertise via mDNS (protocol-level discovery)
639        #[cfg(feature = "mdns")]
640        let _mdns = if self.config.mdns_enabled {
641            mdns::MdnsAdvertiser::new(
642                self.config.stream_port,
643                &self.config.mdns_service_type,
644                &self.config.mdns_name,
645            )
646            .map_err(|e| tracing::warn!(error = %e, "mDNS advertisement failed"))
647            .ok()
648        } else {
649            None
650        };
651
652        // Create default encoder — used for codec header and first default stream
653        let default_enc_config = encoder::EncoderConfig {
654            codec: self.config.codec.clone(),
655            format: sample_format,
656            options: String::new(),
657            #[cfg(feature = "encryption")]
658            encryption_psk: self.config.encryption_psk.clone(),
659        };
660        let default_enc = encoder::create(&default_enc_config)?;
661
662        // Spawn per-stream encode loops — reuse default_enc for first default stream
663        let chunk_tx = self.chunk_tx.clone();
664        let streams = std::mem::take(&mut self.streams);
665        let mut default_enc = Some(default_enc);
666
667        // Shared state for command handlers
668        let initial_state = self
669            .config
670            .state_file
671            .as_ref()
672            .map(|p| state::ServerState::load(p))
673            .unwrap_or_default();
674        let shared_state = Arc::new(tokio::sync::Mutex::new(initial_state));
675
676        // Create session server before stream registration
677        // (first_stream_name set in loop below, but SessionServer only needs it for default routing)
678        let first_name = streams
679            .first()
680            .map(|(n, _, _)| n.clone())
681            .unwrap_or_default();
682        let session_srv = Arc::new(session::SessionServer::new(
683            self.config.stream_port,
684            self.config.buffer_ms as i32,
685            self.config.auth.clone(),
686            Arc::clone(&shared_state),
687            first_name.clone(),
688            self.config.send_audio_to_muted,
689        ));
690
691        for (name, stream_cfg, rx) in streams {
692            {
693                let mut s = shared_state.lock().await;
694                s.streams.push(state::StreamInfo {
695                    id: name.clone(),
696                    status: "idle".into(),
697                    uri: String::new(),
698                    properties: Default::default(),
699                });
700            }
701            let enc = if stream_cfg.codec.is_none() && stream_cfg.sample_format.is_none() {
702                if let Some(enc) = default_enc.take() {
703                    enc
704                } else {
705                    encoder::create(&default_enc_config)?
706                }
707            } else {
708                let stream_codec = stream_cfg.codec.as_deref().unwrap_or(&self.config.codec);
709                let stream_format: snapcast_proto::SampleFormat = stream_cfg
710                    .sample_format
711                    .as_deref()
712                    .and_then(|s| s.parse().ok())
713                    .unwrap_or(sample_format);
714                encoder::create(&encoder::EncoderConfig {
715                    codec: stream_codec.to_string(),
716                    format: stream_format,
717                    options: String::new(),
718                    #[cfg(feature = "encryption")]
719                    encryption_psk: self.config.encryption_psk.clone(),
720                })?
721            };
722            tracing::info!(stream = %name, codec = enc.name(), %sample_format, "Stream registered");
723            session_srv
724                .register_stream_codec(&name, enc.name(), enc.header())
725                .await;
726            spawn_stream_encoder(
727                name,
728                rx,
729                enc,
730                chunk_tx.clone(),
731                sample_format.rate(),
732                sample_format.channels(),
733            );
734        }
735
736        let session_for_run = Arc::clone(&session_srv);
737        let session_event_tx = event_tx.clone();
738        let session_chunk_tx = self.chunk_tx.clone();
739        let session_handle = tokio::spawn(async move {
740            if let Err(e) = session_for_run
741                .run(session_chunk_tx, session_event_tx)
742                .await
743            {
744                tracing::error!(error = %e, "Session server error");
745            }
746        });
747
748        let state_file = self.config.state_file.clone();
749        let save_state = |s: &state::ServerState| {
750            if let Some(ref path) = state_file {
751                let _ = s
752                    .save(path)
753                    .map_err(|e| tracing::warn!(error = %e, "Failed to save state"));
754            }
755        };
756
757        // Main loop
758        loop {
759            tokio::select! {
760                cmd = command_rx.recv() => {
761                    match cmd {
762                        Some(ServerCommand::Stop) | None => {
763                            tracing::info!("Server stopped");
764                            session_handle.abort();
765                            return Ok(());
766                        }
767                        Some(ServerCommand::SetClientVolume { client_id, volume, muted }) => {
768                            let mut s = shared_state.lock().await;
769                            if let Some(c) = s.clients.get_mut(&client_id) {
770                                c.config.volume.percent = volume;
771                                c.config.volume.muted = muted;
772                            }
773                            let latency = s.clients.get(&client_id).map(|c| c.config.latency).unwrap_or(0);
774                            save_state(&s);
775                            drop(s);
776                            session_srv.push_settings(ClientSettingsUpdate {
777                                client_id: client_id.clone(),
778                                buffer_ms: self.config.buffer_ms as i32,
779                                latency, volume, muted,
780                            }).await;
781                            let _ = event_tx.try_send(ServerEvent::ClientVolumeChanged { client_id: client_id.clone(), volume, muted });
782                            session_srv.update_routing_for_client(&client_id).await;
783                        }
784                        Some(ServerCommand::SetClientLatency { client_id, latency }) => {
785                            let mut s = shared_state.lock().await;
786                            if let Some(c) = s.clients.get_mut(&client_id) {
787                                c.config.latency = latency;
788                                session_srv.push_settings(ClientSettingsUpdate {
789                                    client_id: client_id.clone(),
790                                    buffer_ms: self.config.buffer_ms as i32,
791                                    latency,
792                                    volume: c.config.volume.percent,
793                                    muted: c.config.volume.muted,
794                                }).await;
795                            }
796                            save_state(&s);
797                            drop(s);
798                            let _ = event_tx.try_send(ServerEvent::ClientLatencyChanged { client_id, latency });
799                        }
800                        Some(ServerCommand::SetClientName { client_id, name }) => {
801                            let mut s = shared_state.lock().await;
802                            if let Some(c) = s.clients.get_mut(&client_id) {
803                                c.config.name = name.clone();
804                            }
805                            save_state(&s);
806                            drop(s);
807                            let _ = event_tx.try_send(ServerEvent::ClientNameChanged { client_id, name });
808                        }
809                        Some(ServerCommand::SetGroupStream { group_id, stream_id }) => {
810                            let mut s = shared_state.lock().await;
811                            s.set_group_stream(&group_id, &stream_id);
812                            save_state(&s);
813                            drop(s);
814                            let _ = event_tx.try_send(ServerEvent::GroupStreamChanged { group_id: group_id.clone(), stream_id });
815                            session_srv.update_routing_for_group(&group_id).await;
816                        }
817                        Some(ServerCommand::SetGroupMute { group_id, muted }) => {
818                            let mut s = shared_state.lock().await;
819                            if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
820                                g.muted = muted;
821                            }
822                            save_state(&s);
823                            drop(s);
824                            let _ = event_tx.try_send(ServerEvent::GroupMuteChanged { group_id: group_id.clone(), muted });
825                            session_srv.update_routing_for_group(&group_id).await;
826                        }
827                        Some(ServerCommand::SetGroupName { group_id, name }) => {
828                            let mut s = shared_state.lock().await;
829                            if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
830                                g.name = name.clone();
831                            }
832                            save_state(&s);
833                            drop(s);
834                            let _ = event_tx.try_send(ServerEvent::GroupNameChanged { group_id, name });
835                        }
836                        Some(ServerCommand::SetGroupClients { group_id, clients }) => {
837                            let mut s = shared_state.lock().await;
838                            s.set_group_clients(&group_id, &clients);
839                            save_state(&s);
840                            drop(s);
841                            // Structural change — mirrors Server.OnUpdate in C++ snapserver
842                            let _ = event_tx.try_send(ServerEvent::ServerUpdated);
843                            session_srv.update_routing_all().await;
844                        }
845                        Some(ServerCommand::DeleteClient { client_id }) => {
846                            let mut s = shared_state.lock().await;
847                            s.remove_client_from_groups(&client_id);
848                            s.clients.remove(&client_id);
849                            save_state(&s);
850                            drop(s);
851                            let _ = event_tx.try_send(ServerEvent::ServerUpdated);
852                            session_srv.update_routing_all().await;
853                        }
854                        Some(ServerCommand::SetStreamMeta { stream_id, metadata }) => {
855                            let mut s = shared_state.lock().await;
856                            if let Some(stream) = s.streams.iter_mut().find(|st| st.id == stream_id) {
857                                stream.properties = metadata.clone();
858                            }
859                            drop(s);
860                            let _ = event_tx.try_send(ServerEvent::StreamMetaChanged { stream_id, metadata });
861                        }
862                        Some(ServerCommand::AddStream { uri, response_tx }) => {
863                            // Parse the stream name from the URI's `name=` query param; fall back to "dynamic" if absent
864                            let name = uri.split("name=").nth(1)
865                                .and_then(|s| s.split('&').next())
866                                .unwrap_or("dynamic")
867                                .to_string();
868                            let mut s = shared_state.lock().await;
869                            if s.streams.iter().any(|st| st.id == name) {
870                                let _ = response_tx.send(Err(format!("Stream '{name}' already exists")));
871                            } else {
872                                s.streams.push(state::StreamInfo {
873                                    id: name.clone(),
874                                    status: "idle".into(),
875                                    uri: uri.clone(),
876                                    properties: Default::default(),
877                                });
878                                save_state(&s);
879                                drop(s);
880                                let _ = event_tx.try_send(ServerEvent::ServerUpdated);
881                                let _ = response_tx.send(Ok(name));
882                            }
883                        }
884                        Some(ServerCommand::RemoveStream { stream_id }) => {
885                            let mut s = shared_state.lock().await;
886                            s.streams.retain(|st| st.id != stream_id);
887                            // Clear stream_id on groups that referenced this stream
888                            for g in &mut s.groups {
889                                if g.stream_id == stream_id {
890                                    g.stream_id.clear();
891                                }
892                            }
893                            save_state(&s);
894                            drop(s);
895                            let _ = event_tx.try_send(ServerEvent::ServerUpdated);
896                            session_srv.update_routing_all().await;
897                        }
898                        Some(ServerCommand::StreamControl { stream_id, command, params }) => {
899                            tracing::debug!(stream_id, command, ?params, "Stream control forwarded");
900                            // Forward to embedder via event — the library doesn't own stream readers
901                            let _ = event_tx.try_send(ServerEvent::StreamControl { stream_id, command, params });
902                        }
903                        Some(ServerCommand::GetStatus { response_tx }) => {
904                            let s = shared_state.lock().await;
905                            let _ = response_tx.send(s.to_status());
906                        }
907                        #[cfg(feature = "custom-protocol")]
908                        Some(ServerCommand::SendToClient { client_id, message }) => {
909                            session_srv.send_custom(&client_id, message.type_id, message.payload).await;
910                        }
911                    }
912                }
913            }
914        }
915    }
916}