Skip to main content

snapcast_server/
lib.rs

1#![deny(unsafe_code)]
2#![warn(clippy::redundant_closure)]
3#![warn(clippy::implicit_clone)]
4#![warn(clippy::uninlined_format_args)]
5#![warn(missing_docs)]
6
7//! Snapcast server library — embeddable synchronized multiroom audio server.
8//!
9//! See also: [`snapcast-client`](https://docs.rs/snapcast-client) for the client library.
10//! # Architecture
11//!
12//! The server is built around a channel-based API matching `snapcast-client`:
13//!
14//! - [`SnapServer`] is the main entry point
15//! - [`ServerEvent`] flows from server → consumer (client connected, stream status, custom messages)
16//! - [`ServerCommand`] flows from consumer → server (typed mutations, custom messages, stop)
17//!
18//! # Example
19//!
20//! ```no_run
21//! use snapcast_server::{SnapServer, ServerConfig, ServerEvent, ServerCommand};
22//!
23//! # async fn example() -> anyhow::Result<()> {
24//! let config = ServerConfig::default();
25//! let (mut server, mut events) = SnapServer::new(config);
26//! let _audio_tx = server.add_stream("default");
27//! let cmd = server.command_sender();
28//!
29//! tokio::spawn(async move {
30//!     while let Some(event) = events.recv().await {
31//!         match event {
32//!             ServerEvent::ClientConnected { id, name, .. } => {
33//!                 tracing::info!(id, name, "Client connected");
34//!             }
35//!             _ => {}
36//!         }
37//!     }
38//! });
39//!
40//! server.run().await?;
41//! # Ok(())
42//! # }
43//! ```
44
45use std::sync::Arc;
46
47use tokio::sync::{broadcast, mpsc};
48
49// Re-export proto types that embedders need
50#[cfg(feature = "custom-protocol")]
51pub use snapcast_proto::CustomMessage;
52#[cfg(feature = "encryption")]
53pub use snapcast_proto::DEFAULT_ENCRYPTION_PSK;
54pub use snapcast_proto::SampleFormat;
55pub use snapcast_proto::{DEFAULT_SAMPLE_FORMAT, DEFAULT_STREAM_PORT};
56
57const EVENT_CHANNEL_SIZE: usize = 256;
58const COMMAND_CHANNEL_SIZE: usize = 64;
59const AUDIO_CHANNEL_SIZE: usize = 256;
60
61/// Channel size for F32 embedded sources — backpressure from encoder pacing.
62const F32_CHANNEL_SIZE: usize = 1;
63
/// Audio data pushed by the consumer — either f32 or raw PCM.
///
/// Carried inside an [`AudioFrame`] and handed to the stream's encoder as-is.
/// The per-stream encode loop paces `F32` frames to realtime before encoding;
/// `Pcm` frames are encoded as soon as they are received.
#[derive(Debug, Clone)]
pub enum AudioData {
    /// Interleaved f32 samples (from DSP, EQ, AirPlay receivers).
    /// Range: -1.0 to 1.0.
    F32(Vec<f32>),
    /// Raw interleaved PCM bytes at the stream's configured sample format
    /// (from pipe/file/process readers). Byte order: little-endian.
    Pcm(Vec<u8>),
}
74
/// A timestamped audio frame for server input.
///
/// Pushed through the `mpsc::Sender` returned by [`SnapServer::add_stream`]
/// (or produced internally by [`F32AudioSender`]).
#[derive(Debug, Clone)]
pub struct AudioFrame {
    /// Audio samples.
    pub data: AudioData,
    /// Timestamp in microseconds (server time).
    /// The encode loop copies this value unchanged into the outgoing
    /// [`WireChunkData`].
    pub timestamp_usec: i64,
}
83
/// Buffered sender for F32 audio that handles chunking, timestamping, and gap detection.
///
/// Accumulates variable-size F32 sample buffers and emits fixed-size 20ms chunks
/// with monotonic timestamps. Automatically resets on playback gaps (>500ms).
///
/// Created by [`SnapServer::add_f32_stream`].
pub struct F32AudioSender {
    /// Channel into the stream's encode loop.
    tx: mpsc::Sender<AudioFrame>,
    /// Samples received but not yet emitted as a chunk.
    buf: Vec<f32>,
    /// Number of interleaved samples (frames × channels) in one 20ms chunk.
    chunk_samples: usize,
    /// Channel count of the stream's sample format.
    channels: u16,
    /// Sample rate in Hz; used to create the timestamper.
    sample_rate: u32,
    /// Timestamp generator; created lazily on the first emitted chunk and
    /// reset to `None` when a gap is detected.
    ts: Option<time::ChunkTimestamper>,
    /// Time of the most recent `send` call, used for gap detection.
    last_send: std::time::Instant,
}
99
100impl F32AudioSender {
101    fn new(tx: mpsc::Sender<AudioFrame>, sample_rate: u32, channels: u16) -> Self {
102        let chunk_samples = (sample_rate as usize * 20 / 1000) * channels as usize;
103        Self {
104            tx,
105            buf: Vec::with_capacity(chunk_samples * 2),
106            chunk_samples,
107            channels,
108            sample_rate,
109            ts: None,
110            last_send: std::time::Instant::now(),
111        }
112    }
113
114    /// Push interleaved F32 samples. Variable-size input is accumulated and
115    /// emitted as fixed 20ms chunks. Returns when all complete chunks are sent.
116    pub async fn send(
117        &mut self,
118        samples: &[f32],
119    ) -> Result<(), mpsc::error::SendError<AudioFrame>> {
120        let now = std::time::Instant::now();
121        if now.duration_since(self.last_send) > std::time::Duration::from_millis(500) {
122            self.ts = None;
123            self.buf.clear();
124        }
125        self.last_send = now;
126
127        self.buf.extend_from_slice(samples);
128        let ch = self.channels.max(1) as usize;
129        while self.buf.len() >= self.chunk_samples {
130            let chunk: Vec<f32> = self.buf.drain(..self.chunk_samples).collect();
131            let frames = (self.chunk_samples / ch) as u32;
132            let ts = self
133                .ts
134                .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
135            let timestamp_usec = ts.next(frames);
136            self.tx
137                .send(AudioFrame {
138                    data: AudioData::F32(chunk),
139                    timestamp_usec,
140                })
141                .await?;
142        }
143        Ok(())
144    }
145
146    /// Flush remaining samples (< 20ms) as a final short chunk.
147    /// Call at end-of-track to avoid losing the last few milliseconds.
148    pub async fn flush(&mut self) -> Result<(), mpsc::error::SendError<AudioFrame>> {
149        if self.buf.is_empty() {
150            return Ok(());
151        }
152        let chunk: Vec<f32> = self.buf.drain(..).collect();
153        let ch = self.channels.max(1) as usize;
154        let frames = (chunk.len() / ch) as u32;
155        let ts = self
156            .ts
157            .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
158        let timestamp_usec = ts.next(frames);
159        self.tx
160            .send(AudioFrame {
161                data: AudioData::F32(chunk),
162                timestamp_usec,
163            })
164            .await
165    }
166}
167
/// An encoded audio chunk ready to be sent to clients.
///
/// Produced by the per-stream encode loop and published on the server's
/// broadcast channel.
#[derive(Debug, Clone)]
pub struct WireChunkData {
    /// Stream ID this chunk belongs to.
    pub stream_id: String,
    /// Server timestamp in microseconds.
    /// Copied from the [`AudioFrame`] that was passed to the encoder.
    pub timestamp_usec: i64,
    /// Encoded audio data.
    pub data: Vec<u8>,
}
178
179pub mod auth;
180#[cfg(feature = "encryption")]
181pub(crate) mod crypto;
182pub(crate) mod encoder;
183#[cfg(feature = "mdns")]
184pub(crate) mod mdns;
185pub(crate) mod session;
186pub(crate) mod state;
187pub mod status;
188pub(crate) mod stream;
189pub mod time;
190
/// Settings update pushed to a streaming client via binary protocol.
///
/// The run loop builds one of these when handling
/// [`ServerCommand::SetClientVolume`] and [`ServerCommand::SetClientLatency`].
#[derive(Debug, Clone)]
pub struct ClientSettingsUpdate {
    /// Target client ID.
    pub client_id: String,
    /// Buffer size in ms.
    /// The run loop fills this from [`ServerConfig::buffer_ms`].
    pub buffer_ms: i32,
    /// Latency offset in ms.
    pub latency: i32,
    /// Volume (0–100).
    pub volume: u16,
    /// Mute state.
    pub muted: bool,
}
205
/// Events emitted by the server to the consumer.
///
/// Received on the `mpsc::Receiver` returned by [`SnapServer::new`].
/// The enum is `#[non_exhaustive]`, so a `match` on it needs a wildcard arm.
///
/// Events triggered by commands are delivered by the run loop with `try_send`
/// and the result is ignored, so such an event is dropped when the event
/// channel is full.
#[derive(Debug)]
#[non_exhaustive]
pub enum ServerEvent {
    /// A client connected via the binary protocol.
    ClientConnected {
        /// Unique client identifier.
        id: String,
        /// Client hostname.
        name: String,
        /// Client MAC address.
        mac: String,
    },
    /// A client disconnected.
    ClientDisconnected {
        /// Unique client identifier.
        id: String,
    },
    /// A client's volume changed.
    ///
    /// Sent by the run loop after handling [`ServerCommand::SetClientVolume`].
    ClientVolumeChanged {
        /// Client ID.
        client_id: String,
        /// New volume (0–100).
        volume: u16,
        /// Mute state.
        muted: bool,
    },
    /// A client's latency changed.
    ///
    /// Sent by the run loop after handling [`ServerCommand::SetClientLatency`].
    ClientLatencyChanged {
        /// Client ID.
        client_id: String,
        /// New latency in ms.
        latency: i32,
    },
    /// A client's name changed.
    ///
    /// Sent by the run loop after handling [`ServerCommand::SetClientName`].
    ClientNameChanged {
        /// Client ID.
        client_id: String,
        /// New name.
        name: String,
    },
    /// A group's stream assignment changed.
    ///
    /// Sent by the run loop after handling [`ServerCommand::SetGroupStream`].
    GroupStreamChanged {
        /// Group ID.
        group_id: String,
        /// New stream ID.
        stream_id: String,
    },
    /// A group's mute state changed.
    ///
    /// Sent by the run loop after handling [`ServerCommand::SetGroupMute`].
    GroupMuteChanged {
        /// Group ID.
        group_id: String,
        /// Mute state.
        muted: bool,
    },
    /// A stream's status changed (playing, idle, unknown).
    StreamStatus {
        /// Stream identifier.
        stream_id: String,
        /// New status, as a string.
        status: String,
    },
    /// Stream metadata/properties changed.
    StreamMetaChanged {
        /// Stream identifier.
        stream_id: String,
        /// Updated properties.
        metadata: std::collections::HashMap<String, serde_json::Value>,
    },
    /// A group's name changed.
    ///
    /// Sent by the run loop after handling [`ServerCommand::SetGroupName`].
    GroupNameChanged {
        /// Group ID.
        group_id: String,
        /// New name.
        name: String,
    },
    /// Server state changed — groups were reorganized (created, deleted, merged).
    ///
    /// Emitted after structural changes like `SetGroupClients` or `DeleteClient`
    /// when the group topology changes. Mirrors `Server.OnUpdate` in the C++ snapserver.
    /// The consumer should re-read server status via `GetStatus`.
    ServerUpdated,
    /// A stream control command was received (play, pause, next, seek, etc.).
    ///
    /// The library forwards this to the embedder since it doesn't own stream readers.
    StreamControl {
        /// Stream ID.
        stream_id: String,
        /// Command name.
        command: String,
        /// Optional parameters.
        params: serde_json::Value,
    },
    /// Custom binary protocol message from a streaming client.
    ///
    /// Only available with the `custom-protocol` feature.
    #[cfg(feature = "custom-protocol")]
    CustomMessage {
        /// Client ID.
        client_id: String,
        /// The custom message.
        message: snapcast_proto::CustomMessage,
    },
}
308
/// Commands the consumer sends to the server.
///
/// Sent through the sender obtained from [`SnapServer::command_sender`] and
/// processed one at a time by the run loop. The enum is `#[non_exhaustive]`.
#[derive(Debug)]
#[non_exhaustive]
pub enum ServerCommand {
    /// Set a client's volume.
    ///
    /// The run loop updates the stored client (if known), saves state, pushes a
    /// [`ClientSettingsUpdate`], and emits [`ServerEvent::ClientVolumeChanged`].
    SetClientVolume {
        /// Client ID.
        client_id: String,
        /// Volume (0–100).
        volume: u16,
        /// Mute state.
        muted: bool,
    },
    /// Set a client's latency offset.
    ///
    /// Settings are pushed to the client only when the client ID is known;
    /// [`ServerEvent::ClientLatencyChanged`] is emitted either way.
    SetClientLatency {
        /// Client ID.
        client_id: String,
        /// Latency in milliseconds.
        latency: i32,
    },
    /// Set a client's display name.
    ///
    /// Emits [`ServerEvent::ClientNameChanged`].
    SetClientName {
        /// Client ID.
        client_id: String,
        /// New name.
        name: String,
    },
    /// Assign a stream to a group.
    ///
    /// Emits [`ServerEvent::GroupStreamChanged`] and refreshes routing for the group.
    SetGroupStream {
        /// Group ID.
        group_id: String,
        /// Stream ID.
        stream_id: String,
    },
    /// Mute/unmute a group.
    ///
    /// Emits [`ServerEvent::GroupMuteChanged`] and refreshes routing for the group.
    SetGroupMute {
        /// Group ID.
        group_id: String,
        /// Mute state.
        muted: bool,
    },
    /// Set a group's display name.
    ///
    /// Emits [`ServerEvent::GroupNameChanged`].
    SetGroupName {
        /// Group ID.
        group_id: String,
        /// New name.
        name: String,
    },
    /// Move clients to a group.
    ///
    /// Emits [`ServerEvent::ServerUpdated`] and refreshes routing for all clients.
    SetGroupClients {
        /// Group ID.
        group_id: String,
        /// Client IDs.
        clients: Vec<String>,
    },
    /// Delete a client from the server.
    ///
    /// Removes the client from its groups and from the client table, then emits
    /// [`ServerEvent::ServerUpdated`].
    DeleteClient {
        /// Client ID.
        client_id: String,
    },
    /// Set stream metadata (artist, title, album, etc.).
    SetStreamMeta {
        /// Stream ID.
        stream_id: String,
        /// Metadata key-value pairs.
        metadata: std::collections::HashMap<String, serde_json::Value>,
    },
    /// Dynamically add a stream source.
    AddStream {
        /// Stream source URI (e.g. `pipe:///tmp/snapfifo?name=default`).
        uri: String,
        /// Response: the stream ID assigned.
        /// `Err` carries a `String` (presumably an error description — confirm
        /// against the handler).
        response_tx: tokio::sync::oneshot::Sender<Result<String, String>>,
    },
    /// Remove a stream source.
    RemoveStream {
        /// Stream ID to remove.
        stream_id: String,
    },
    /// Forward a control command to a stream (play, pause, next, etc.).
    StreamControl {
        /// Stream ID.
        stream_id: String,
        /// Command name (e.g. "next", "previous", "pause", "seek").
        command: String,
        /// Optional command parameter (e.g. seek position).
        params: serde_json::Value,
    },
    /// Get full server status.
    GetStatus {
        /// Response channel.
        response_tx: tokio::sync::oneshot::Sender<status::ServerStatus>,
    },
    /// Send a custom binary protocol message to a streaming client.
    ///
    /// Only available with the `custom-protocol` feature.
    #[cfg(feature = "custom-protocol")]
    SendToClient {
        /// Target client ID.
        client_id: String,
        /// The custom message.
        message: snapcast_proto::CustomMessage,
    },
    /// Stop the server gracefully.
    ///
    /// The run loop aborts the session task and `run()` returns `Ok(())`.
    Stop,
}
413
/// Pick the codec name used when the configuration does not override it.
///
/// The choice is fixed at compile time by the enabled features: `flac` wins,
/// then `f32lz4`, and `pcm` is the fallback when neither feature is enabled.
fn default_codec() -> &'static str {
    if cfg!(feature = "flac") {
        "flac"
    } else if cfg!(feature = "f32lz4") {
        "f32lz4"
    } else {
        "pcm"
    }
}
423
/// Server configuration for the embeddable library.
///
/// Use [`ServerConfig::default`] and override individual fields as needed.
pub struct ServerConfig {
    /// TCP port for binary protocol (client connections). Default: 1704.
    pub stream_port: u16,
    /// Audio buffer size in milliseconds. Default: 1000.
    pub buffer_ms: u32,
    /// Default codec name, e.g. "flac", "f32lz4", "pcm", "opus", "ogg".
    /// Default: chosen at compile time — "flac" when the `flac` feature is
    /// enabled, otherwise "f32lz4" when that feature is enabled, otherwise "pcm".
    pub codec: String,
    /// Default sample format. Default: 48000:16:2.
    pub sample_format: String,
    /// mDNS service type. Default: "_snapcast._tcp.local.".
    #[cfg(feature = "mdns")]
    pub mdns_service_type: String,
    /// Enable mDNS advertisement. Default: true (when mdns feature is compiled in).
    #[cfg(feature = "mdns")]
    pub mdns_enabled: bool,
    /// mDNS service name. Default: "Snapserver".
    #[cfg(feature = "mdns")]
    pub mdns_name: String,
    /// Auth validator for streaming clients. `None` = no auth required.
    pub auth: Option<std::sync::Arc<dyn auth::AuthValidator>>,
    /// Pre-shared key for f32lz4 encryption. `None` = no encryption.
    #[cfg(feature = "encryption")]
    pub encryption_psk: Option<String>,
    /// Path to persist server state (clients, groups). `None` = no persistence.
    pub state_file: Option<std::path::PathBuf>,
    /// Send audio data to muted clients. Default: false (skip muted, saves bandwidth).
    pub send_audio_to_muted: bool,
}
453
454impl Default for ServerConfig {
455    fn default() -> Self {
456        Self {
457            stream_port: snapcast_proto::DEFAULT_STREAM_PORT,
458            buffer_ms: 1000,
459            codec: default_codec().into(),
460            sample_format: "48000:16:2".into(),
461            #[cfg(feature = "mdns")]
462            mdns_service_type: "_snapcast._tcp.local.".into(),
463            #[cfg(feature = "mdns")]
464            mdns_enabled: true,
465            #[cfg(feature = "mdns")]
466            mdns_name: "Snapserver".into(),
467            auth: None,
468            #[cfg(feature = "encryption")]
469            encryption_psk: None,
470            state_file: None,
471            send_audio_to_muted: false,
472        }
473    }
474}
475
/// Per-stream configuration. If `None`, inherits from [`ServerConfig`].
///
/// Pass to [`SnapServer::add_stream_with_config`]. The `Default` value leaves
/// both fields `None`, i.e. the stream uses the server-wide settings.
#[derive(Debug, Clone, Default)]
pub struct StreamConfig {
    /// Codec override (e.g. "flac", "f32lz4", "opus", "ogg", "pcm").
    pub codec: Option<String>,
    /// Sample format override (e.g. "48000:16:2").
    /// An override that fails to parse is ignored at `run()` and the server's
    /// sample format is used instead.
    pub sample_format: Option<String>,
}
484
/// The embeddable Snapcast server.
///
/// Create with [`SnapServer::new`], register at least one stream, then call
/// `run()`.
pub struct SnapServer {
    /// Server-wide configuration supplied at construction.
    config: ServerConfig,
    /// Sender side of the event channel whose receiver is returned by `new`.
    event_tx: mpsc::Sender<ServerEvent>,
    /// Sender side of the command channel; cloned by `command_sender`.
    command_tx: mpsc::Sender<ServerCommand>,
    /// Receiver side of the command channel; taken by `run()`, so it is `None`
    /// once `run()` has been called.
    command_rx: Option<mpsc::Receiver<ServerCommand>>,
    /// Named audio streams — each gets its own encoder at run().
    streams: Vec<(String, StreamConfig, mpsc::Receiver<AudioFrame>)>,
    /// Broadcast channel for encoded chunks → sessions.
    chunk_tx: broadcast::Sender<WireChunkData>,
}
496
497/// Spawn a per-stream encode loop on a dedicated thread.
498///
499/// Receives `AudioFrame`, passes `AudioData` directly to the encoder,
500/// and broadcasts encoded `WireChunkData` to sessions.
501fn spawn_stream_encoder(
502    stream_id: String,
503    mut rx: mpsc::Receiver<AudioFrame>,
504    mut enc: Box<dyn encoder::Encoder>,
505    chunk_tx: broadcast::Sender<WireChunkData>,
506    sample_rate: u32,
507    channels: u16,
508) {
509    std::thread::spawn(move || {
510        let rt = tokio::runtime::Builder::new_current_thread()
511            .enable_time()
512            .build()
513            .expect("encoder runtime");
514
515        rt.block_on(async {
516            let mut next_tick: Option<tokio::time::Instant> = None;
517            while let Some(frame) = rx.recv().await {
518                // Pace F32 sources to realtime (pipe sources pace naturally via blocking read)
519                if let AudioData::F32(ref samples) = frame.data {
520                    let num_frames = samples.len() / channels.max(1) as usize;
521                    let chunk_dur = std::time::Duration::from_micros(
522                        (num_frames as u64 * 1_000_000) / sample_rate as u64,
523                    );
524                    let now = tokio::time::Instant::now();
525                    let tick = next_tick.get_or_insert(now);
526                    // Reset on gap (>500ms behind wall clock)
527                    if now.checked_duration_since(*tick + chunk_dur)
528                        > Some(std::time::Duration::from_millis(500))
529                    {
530                        *tick = now;
531                    }
532                    *tick += chunk_dur;
533                    tokio::time::sleep_until(*tick).await;
534                }
535                match enc.encode(&frame.data) {
536                    Ok(encoded) if !encoded.data.is_empty() => {
537                        let _ = chunk_tx.send(WireChunkData {
538                            stream_id: stream_id.clone(),
539                            timestamp_usec: frame.timestamp_usec,
540                            data: encoded.data,
541                        });
542                    }
543                    Err(e) => {
544                        tracing::warn!(stream = %stream_id, error = %e, "Encode failed");
545                    }
546                    _ => {} // encoder buffering
547                }
548            }
549        });
550    });
551}
552
/// Construction, stream registration, and the main run loop.
554impl SnapServer {
555    /// Create a new server. Returns the server and event receiver.
556    pub fn new(config: ServerConfig) -> (Self, mpsc::Receiver<ServerEvent>) {
557        let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
558        let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
559        let (chunk_tx, _) = broadcast::channel(256);
560        let server = Self {
561            config,
562            event_tx,
563            command_tx,
564            command_rx: Some(command_rx),
565            streams: Vec::new(),
566            chunk_tx,
567        };
568        (server, event_rx)
569    }
570
571    /// Add a named audio stream. Returns a sender for pushing audio frames.
572    ///
573    /// Uses the server's default codec and sample format.
574    pub fn add_stream(&mut self, name: &str) -> mpsc::Sender<AudioFrame> {
575        self.add_stream_with_config(name, StreamConfig::default())
576    }
577
578    /// Add a named F32 audio stream with automatic chunking and timestamping.
579    ///
580    /// Returns an [`F32AudioSender`] that accepts variable-size F32 sample buffers
581    /// and handles 20ms chunking, monotonic timestamps, and gap detection internally.
582    ///
583    /// # Errors
584    /// Returns an error if the server's `sample_format` cannot be parsed.
585    pub fn add_f32_stream(&mut self, name: &str) -> Result<F32AudioSender, String> {
586        let sf: SampleFormat =
587            self.config.sample_format.parse().map_err(|e| {
588                format!("invalid sample_format '{}': {e}", self.config.sample_format)
589            })?;
590        let (tx, rx) = mpsc::channel(F32_CHANNEL_SIZE);
591        self.streams
592            .push((name.to_string(), StreamConfig::default(), rx));
593        Ok(F32AudioSender::new(tx, sf.rate(), sf.channels()))
594    }
595
596    /// Add a named audio stream with per-stream codec/format overrides.
597    pub fn add_stream_with_config(
598        &mut self,
599        name: &str,
600        config: StreamConfig,
601    ) -> mpsc::Sender<AudioFrame> {
602        let (tx, rx) = mpsc::channel(AUDIO_CHANNEL_SIZE);
603        self.streams.push((name.to_string(), config, rx));
604        tx
605    }
606
607    /// Get a cloneable command sender.
608    pub fn command_sender(&self) -> mpsc::Sender<ServerCommand> {
609        self.command_tx.clone()
610    }
611
612    /// Access the server configuration.
613    pub fn config(&self) -> &ServerConfig {
614        &self.config
615    }
616
617    /// Run the server. Blocks until stopped or a fatal error occurs.
618    pub async fn run(&mut self) -> anyhow::Result<()> {
619        let mut command_rx = self
620            .command_rx
621            .take()
622            .ok_or_else(|| anyhow::anyhow!("run() already called"))?;
623
624        let event_tx = self.event_tx.clone();
625
626        let sample_format: snapcast_proto::SampleFormat = self
627            .config
628            .sample_format
629            .parse()
630            .unwrap_or(snapcast_proto::DEFAULT_SAMPLE_FORMAT);
631
632        anyhow::ensure!(
633            !self.streams.is_empty(),
634            "No streams configured — call add_stream() before run()"
635        );
636
637        tracing::info!(stream_port = self.config.stream_port, "Snapserver starting");
638
639        // Advertise via mDNS (protocol-level discovery)
640        #[cfg(feature = "mdns")]
641        let _mdns = if self.config.mdns_enabled {
642            mdns::MdnsAdvertiser::new(
643                self.config.stream_port,
644                &self.config.mdns_service_type,
645                &self.config.mdns_name,
646            )
647            .map_err(|e| tracing::warn!(error = %e, "mDNS advertisement failed"))
648            .ok()
649        } else {
650            None
651        };
652
653        // Create default encoder — used for codec header and first default stream
654        let default_enc_config = encoder::EncoderConfig {
655            codec: self.config.codec.clone(),
656            format: sample_format,
657            options: String::new(),
658            #[cfg(feature = "encryption")]
659            encryption_psk: self.config.encryption_psk.clone(),
660        };
661        let default_enc = encoder::create(&default_enc_config)?;
662
663        // Spawn per-stream encode loops — reuse default_enc for first default stream
664        let chunk_tx = self.chunk_tx.clone();
665        let streams = std::mem::take(&mut self.streams);
666        let mut default_enc = Some(default_enc);
667
668        // Shared state for command handlers
669        let initial_state = self
670            .config
671            .state_file
672            .as_ref()
673            .map(|p| state::ServerState::load(p))
674            .unwrap_or_default();
675        let shared_state = Arc::new(tokio::sync::Mutex::new(initial_state));
676
677        // Create session server before stream registration
678        // (first_stream_name set in loop below, but SessionServer only needs it for default routing)
679        let first_name = streams
680            .first()
681            .map(|(n, _, _)| n.clone())
682            .unwrap_or_default();
683        let session_srv = Arc::new(session::SessionServer::new(
684            self.config.stream_port,
685            self.config.buffer_ms as i32,
686            self.config.auth.clone(),
687            Arc::clone(&shared_state),
688            first_name.clone(),
689            self.config.send_audio_to_muted,
690        ));
691
692        for (name, stream_cfg, rx) in streams {
693            {
694                let mut s = shared_state.lock().await;
695                s.streams.push(state::StreamInfo {
696                    id: name.clone(),
697                    status: "idle".into(),
698                    uri: String::new(),
699                    properties: Default::default(),
700                });
701            }
702            let enc = if stream_cfg.codec.is_none() && stream_cfg.sample_format.is_none() {
703                if let Some(enc) = default_enc.take() {
704                    enc
705                } else {
706                    encoder::create(&default_enc_config)?
707                }
708            } else {
709                let stream_codec = stream_cfg.codec.as_deref().unwrap_or(&self.config.codec);
710                let stream_format: snapcast_proto::SampleFormat = stream_cfg
711                    .sample_format
712                    .as_deref()
713                    .and_then(|s| s.parse().ok())
714                    .unwrap_or(sample_format);
715                encoder::create(&encoder::EncoderConfig {
716                    codec: stream_codec.to_string(),
717                    format: stream_format,
718                    options: String::new(),
719                    #[cfg(feature = "encryption")]
720                    encryption_psk: self.config.encryption_psk.clone(),
721                })?
722            };
723            tracing::info!(stream = %name, codec = enc.name(), %sample_format, "Stream registered");
724            session_srv
725                .register_stream_codec(&name, enc.name(), enc.header())
726                .await;
727            spawn_stream_encoder(
728                name,
729                rx,
730                enc,
731                chunk_tx.clone(),
732                sample_format.rate(),
733                sample_format.channels(),
734            );
735        }
736
737        let session_for_run = Arc::clone(&session_srv);
738        let session_event_tx = event_tx.clone();
739        let session_chunk_tx = self.chunk_tx.clone();
740        let session_handle = tokio::spawn(async move {
741            if let Err(e) = session_for_run
742                .run(session_chunk_tx, session_event_tx)
743                .await
744            {
745                tracing::error!(error = %e, "Session server error");
746            }
747        });
748
749        let state_file = self.config.state_file.clone();
750        let save_state = |s: &state::ServerState| {
751            if let Some(ref path) = state_file {
752                let _ = s
753                    .save(path)
754                    .map_err(|e| tracing::warn!(error = %e, "Failed to save state"));
755            }
756        };
757
758        // Main loop
759        loop {
760            tokio::select! {
761                cmd = command_rx.recv() => {
762                    match cmd {
763                        Some(ServerCommand::Stop) | None => {
764                            tracing::info!("Server stopped");
765                            session_handle.abort();
766                            return Ok(());
767                        }
768                        Some(ServerCommand::SetClientVolume { client_id, volume, muted }) => {
769                            let mut s = shared_state.lock().await;
770                            if let Some(c) = s.clients.get_mut(&client_id) {
771                                c.config.volume.percent = volume;
772                                c.config.volume.muted = muted;
773                            }
774                            let latency = s.clients.get(&client_id).map(|c| c.config.latency).unwrap_or(0);
775                            save_state(&s);
776                            drop(s);
777                            session_srv.push_settings(ClientSettingsUpdate {
778                                client_id: client_id.clone(),
779                                buffer_ms: self.config.buffer_ms as i32,
780                                latency, volume, muted,
781                            }).await;
782                            let _ = event_tx.try_send(ServerEvent::ClientVolumeChanged { client_id: client_id.clone(), volume, muted });
783                            session_srv.update_routing_for_client(&client_id).await;
784                        }
785                        Some(ServerCommand::SetClientLatency { client_id, latency }) => {
786                            let mut s = shared_state.lock().await;
787                            if let Some(c) = s.clients.get_mut(&client_id) {
788                                c.config.latency = latency;
789                                session_srv.push_settings(ClientSettingsUpdate {
790                                    client_id: client_id.clone(),
791                                    buffer_ms: self.config.buffer_ms as i32,
792                                    latency,
793                                    volume: c.config.volume.percent,
794                                    muted: c.config.volume.muted,
795                                }).await;
796                            }
797                            save_state(&s);
798                            drop(s);
799                            let _ = event_tx.try_send(ServerEvent::ClientLatencyChanged { client_id, latency });
800                        }
801                        Some(ServerCommand::SetClientName { client_id, name }) => {
802                            let mut s = shared_state.lock().await;
803                            if let Some(c) = s.clients.get_mut(&client_id) {
804                                c.config.name = name.clone();
805                            }
806                            save_state(&s);
807                            drop(s);
808                            let _ = event_tx.try_send(ServerEvent::ClientNameChanged { client_id, name });
809                        }
810                        Some(ServerCommand::SetGroupStream { group_id, stream_id }) => {
811                            let mut s = shared_state.lock().await;
812                            s.set_group_stream(&group_id, &stream_id);
813                            save_state(&s);
814                            drop(s);
815                            let _ = event_tx.try_send(ServerEvent::GroupStreamChanged { group_id: group_id.clone(), stream_id });
816                            session_srv.update_routing_for_group(&group_id).await;
817                        }
818                        Some(ServerCommand::SetGroupMute { group_id, muted }) => {
819                            let mut s = shared_state.lock().await;
820                            if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
821                                g.muted = muted;
822                            }
823                            save_state(&s);
824                            drop(s);
825                            let _ = event_tx.try_send(ServerEvent::GroupMuteChanged { group_id: group_id.clone(), muted });
826                            session_srv.update_routing_for_group(&group_id).await;
827                        }
828                        Some(ServerCommand::SetGroupName { group_id, name }) => {
829                            let mut s = shared_state.lock().await;
830                            if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
831                                g.name = name.clone();
832                            }
833                            save_state(&s);
834                            drop(s);
835                            let _ = event_tx.try_send(ServerEvent::GroupNameChanged { group_id, name });
836                        }
837                        Some(ServerCommand::SetGroupClients { group_id, clients }) => {
838                            let mut s = shared_state.lock().await;
839                            s.set_group_clients(&group_id, &clients);
840                            save_state(&s);
841                            drop(s);
842                            // Structural change — mirrors Server.OnUpdate in C++ snapserver
843                            let _ = event_tx.try_send(ServerEvent::ServerUpdated);
844                            session_srv.update_routing_all().await;
845                        }
846                        Some(ServerCommand::DeleteClient { client_id }) => {
847                            let mut s = shared_state.lock().await;
848                            s.remove_client_from_groups(&client_id);
849                            s.clients.remove(&client_id);
850                            save_state(&s);
851                            drop(s);
852                            let _ = event_tx.try_send(ServerEvent::ServerUpdated);
853                            session_srv.update_routing_all().await;
854                        }
855                        Some(ServerCommand::SetStreamMeta { stream_id, metadata }) => {
856                            let mut s = shared_state.lock().await;
857                            if let Some(stream) = s.streams.iter_mut().find(|st| st.id == stream_id) {
858                                stream.properties = metadata.clone();
859                            }
860                            drop(s);
861                            let _ = event_tx.try_send(ServerEvent::StreamMetaChanged { stream_id, metadata });
862                        }
863                        Some(ServerCommand::AddStream { uri, response_tx }) => {
864                            // Take the stream name from the text after the first `name=` in the URI (up to the next `&`); default to "dynamic" if absent
865                            let name = uri.split("name=").nth(1)
866                                .and_then(|s| s.split('&').next())
867                                .unwrap_or("dynamic")
868                                .to_string();
869                            let mut s = shared_state.lock().await;
870                            if s.streams.iter().any(|st| st.id == name) {
871                                let _ = response_tx.send(Err(format!("Stream '{name}' already exists")));
872                            } else {
873                                s.streams.push(state::StreamInfo {
874                                    id: name.clone(),
875                                    status: "idle".into(),
876                                    uri: uri.clone(),
877                                    properties: Default::default(),
878                                });
879                                save_state(&s);
880                                drop(s);
881                                let _ = event_tx.try_send(ServerEvent::ServerUpdated);
882                                let _ = response_tx.send(Ok(name));
883                            }
884                        }
885                        Some(ServerCommand::RemoveStream { stream_id }) => {
886                            let mut s = shared_state.lock().await;
887                            s.streams.retain(|st| st.id != stream_id);
888                            // Clear stream_id on groups that referenced this stream
889                            for g in &mut s.groups {
890                                if g.stream_id == stream_id {
891                                    g.stream_id.clear();
892                                }
893                            }
894                            save_state(&s);
895                            drop(s);
896                            let _ = event_tx.try_send(ServerEvent::ServerUpdated);
897                            session_srv.update_routing_all().await;
898                        }
899                        Some(ServerCommand::StreamControl { stream_id, command, params }) => {
900                            tracing::debug!(stream_id, command, ?params, "Stream control forwarded");
901                            // Forward to embedder via event — the library doesn't own stream readers
902                            let _ = event_tx.try_send(ServerEvent::StreamControl { stream_id, command, params });
903                        }
904                        Some(ServerCommand::GetStatus { response_tx }) => {
905                            let s = shared_state.lock().await;
906                            let _ = response_tx.send(s.to_status());
907                        }
908                        #[cfg(feature = "custom-protocol")]
909                        Some(ServerCommand::SendToClient { client_id, message }) => {
910                            session_srv.send_custom(&client_id, message.type_id, message.payload).await;
911                        }
912                    }
913                }
914            }
915        }
916    }
917}