Skip to main content

snapcast_server/
lib.rs

1#![deny(unsafe_code)]
2#![warn(clippy::redundant_closure)]
3#![warn(clippy::implicit_clone)]
4#![warn(clippy::uninlined_format_args)]
5#![warn(missing_docs)]
6
7//! Snapcast server library — embeddable synchronized multiroom audio server.
8//!
9//! See also: [`snapcast-client`](https://docs.rs/snapcast-client) for the client library.
10//! # Architecture
11//!
12//! The server is built around a channel-based API matching `snapcast-client`:
13//!
14//! - [`SnapServer`] is the main entry point
15//! - [`ServerEvent`] flows from server → consumer (client connected, stream status, custom messages)
16//! - [`ServerCommand`] flows from consumer → server (typed mutations, custom messages, stop)
17//!
18//! # Example
19//!
20//! ```no_run
21//! use snapcast_server::{SnapServer, ServerConfig, ServerEvent, ServerCommand};
22//!
23//! # async fn example() -> anyhow::Result<()> {
24//! let config = ServerConfig::default();
25//! let (mut server, mut events) = SnapServer::new(config);
26//! let _audio_tx = server.add_stream("default");
27//! let cmd = server.command_sender();
28//!
29//! tokio::spawn(async move {
30//!     while let Some(event) = events.recv().await {
31//!         match event {
32//!             ServerEvent::ClientConnected { id, ref hello } => {
33//!                 tracing::info!(id, name = hello.host_name, "Client connected");
34//!             }
35//!             _ => {}
36//!         }
37//!     }
38//! });
39//!
40//! server.run().await?;
41//! # Ok(())
42//! # }
43//! ```
44
45use std::sync::Arc;
46
47use tokio::sync::{broadcast, mpsc};
48
49// Re-export proto types that embedders need
50#[cfg(feature = "custom-protocol")]
51pub use snapcast_proto::CustomMessage;
52#[cfg(feature = "encryption")]
53pub use snapcast_proto::DEFAULT_ENCRYPTION_PSK;
54pub use snapcast_proto::SampleFormat;
55pub use snapcast_proto::message::hello::Hello;
56pub use snapcast_proto::{DEFAULT_SAMPLE_FORMAT, DEFAULT_STREAM_PORT};
57
/// Capacity of the `mpsc` channel that carries [`ServerEvent`]s to the consumer.
const EVENT_CHANNEL_SIZE: usize = 256;
/// Capacity of the `mpsc` channel that carries [`ServerCommand`]s to the server.
const COMMAND_CHANNEL_SIZE: usize = 64;
/// Capacity of the per-stream `mpsc` channel handed out for pushing [`AudioFrame`]s.
const AUDIO_CHANNEL_SIZE: usize = 256;

/// Channel size for F32 embedded sources — backpressure from encoder pacing.
const F32_CHANNEL_SIZE: usize = 1;
64
/// Audio data pushed by the consumer — either f32 or raw PCM.
///
/// This is the payload of an [`AudioFrame`]; the per-stream encode loop passes
/// it to the encoder unchanged.
#[derive(Debug, Clone)]
pub enum AudioData {
    /// Interleaved f32 samples (from DSP, EQ, AirPlay receivers).
    /// Range: -1.0 to 1.0.
    F32(Vec<f32>),
    /// Raw interleaved PCM bytes at the stream's configured sample format
    /// (from pipe/file/process readers). Byte order: little-endian.
    Pcm(Vec<u8>),
}
75
/// A timestamped audio frame for server input.
///
/// The timestamp is copied verbatim onto the encoded [`WireChunkData`] that
/// results from this frame.
#[derive(Debug, Clone)]
pub struct AudioFrame {
    /// Audio samples.
    pub data: AudioData,
    /// Timestamp in microseconds (server time).
    pub timestamp_usec: i64,
}
84
/// Buffered sender for F32 audio that handles chunking, timestamping, and gap detection.
///
/// Accumulates variable-size F32 sample buffers and emits fixed-size 20ms chunks
/// with monotonic timestamps. Automatically resets on playback gaps (>500ms).
///
/// Created by [`SnapServer::add_f32_stream`].
pub struct F32AudioSender {
    /// Channel into the stream's encode loop.
    tx: mpsc::Sender<AudioFrame>,
    /// Samples received but not yet emitted as a chunk.
    buf: Vec<f32>,
    /// Number of interleaved samples (all channels) in one 20 ms chunk.
    chunk_samples: usize,
    /// Channel count of the stream's sample format.
    channels: u16,
    /// Sample rate of the stream's sample format, in Hz.
    sample_rate: u32,
    /// Timestamp generator; created lazily on the first emitted chunk and
    /// dropped again when a gap is detected.
    ts: Option<time::ChunkTimestamper>,
    /// When `send` was last called; used for gap detection.
    last_send: std::time::Instant,
}
100
101impl F32AudioSender {
102    fn new(tx: mpsc::Sender<AudioFrame>, sample_rate: u32, channels: u16) -> Self {
103        let chunk_samples = (sample_rate as usize * 20 / 1000) * channels as usize;
104        Self {
105            tx,
106            buf: Vec::with_capacity(chunk_samples * 2),
107            chunk_samples,
108            channels,
109            sample_rate,
110            ts: None,
111            last_send: std::time::Instant::now(),
112        }
113    }
114
115    /// Push interleaved F32 samples. Variable-size input is accumulated and
116    /// emitted as fixed 20ms chunks. Returns when all complete chunks are sent.
117    pub async fn send(
118        &mut self,
119        samples: &[f32],
120    ) -> Result<(), mpsc::error::SendError<AudioFrame>> {
121        let now = std::time::Instant::now();
122        if now.duration_since(self.last_send) > std::time::Duration::from_millis(500) {
123            self.ts = None;
124            self.buf.clear();
125        }
126        self.last_send = now;
127
128        self.buf.extend_from_slice(samples);
129        let ch = self.channels.max(1) as usize;
130        while self.buf.len() >= self.chunk_samples {
131            let chunk: Vec<f32> = self.buf.drain(..self.chunk_samples).collect();
132            let frames = (self.chunk_samples / ch) as u32;
133            let ts = self
134                .ts
135                .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
136            let timestamp_usec = ts.next(frames);
137            self.tx
138                .send(AudioFrame {
139                    data: AudioData::F32(chunk),
140                    timestamp_usec,
141                })
142                .await?;
143        }
144        Ok(())
145    }
146
147    /// Flush remaining samples (< 20ms) as a final short chunk.
148    /// Call at end-of-track to avoid losing the last few milliseconds.
149    pub async fn flush(&mut self) -> Result<(), mpsc::error::SendError<AudioFrame>> {
150        if self.buf.is_empty() {
151            return Ok(());
152        }
153        let chunk: Vec<f32> = self.buf.drain(..).collect();
154        let ch = self.channels.max(1) as usize;
155        let frames = (chunk.len() / ch) as u32;
156        let ts = self
157            .ts
158            .get_or_insert_with(|| time::ChunkTimestamper::new(self.sample_rate));
159        let timestamp_usec = ts.next(frames);
160        self.tx
161            .send(AudioFrame {
162                data: AudioData::F32(chunk),
163                timestamp_usec,
164            })
165            .await
166    }
167}
168
/// An encoded audio chunk ready to be sent to clients.
///
/// Produced by the per-stream encode loop and distributed to sessions over a
/// broadcast channel.
#[derive(Debug, Clone)]
pub struct WireChunkData {
    /// Stream ID this chunk belongs to.
    pub stream_id: String,
    /// Server timestamp in microseconds (taken from the source [`AudioFrame`]).
    pub timestamp_usec: i64,
    /// Encoded audio data.
    pub data: Vec<u8>,
}
179
180pub mod auth;
181#[cfg(feature = "encryption")]
182pub(crate) mod crypto;
183pub(crate) mod encoder;
184pub(crate) mod session;
185pub(crate) mod state;
186pub mod status;
187pub(crate) mod stream;
188pub mod time;
189
/// Settings update pushed to a streaming client via binary protocol.
///
/// Carries the complete set of settings, not a delta: the command handlers
/// fill every field even when only one of them changed.
#[derive(Debug, Clone)]
pub struct ClientSettingsUpdate {
    /// Target client ID.
    pub client_id: String,
    /// Buffer size in ms.
    pub buffer_ms: i32,
    /// Latency offset in ms.
    pub latency: i32,
    /// Volume (0–100).
    pub volume: u16,
    /// Mute state.
    pub muted: bool,
}
204
/// Events emitted by the server to the consumer.
///
/// Received through the `mpsc::Receiver` returned by [`SnapServer::new`].
/// The command handlers publish events with `try_send`, so an event is
/// dropped rather than awaited when the channel is full; consumers should
/// keep draining the receiver.
#[derive(Debug)]
#[non_exhaustive]
pub enum ServerEvent {
    /// A client connected via the binary protocol.
    ClientConnected {
        /// Unique client identifier.
        id: String,
        /// The client's Hello message with all connection metadata.
        hello: snapcast_proto::message::hello::Hello,
    },
    /// A client disconnected.
    ClientDisconnected {
        /// Unique client identifier.
        id: String,
    },
    /// A client's volume changed.
    ClientVolumeChanged {
        /// Client ID.
        client_id: String,
        /// New volume (0–100).
        volume: u16,
        /// Mute state.
        muted: bool,
    },
    /// A client's latency changed.
    ClientLatencyChanged {
        /// Client ID.
        client_id: String,
        /// New latency in ms.
        latency: i32,
    },
    /// A client's name changed.
    ClientNameChanged {
        /// Client ID.
        client_id: String,
        /// New name.
        name: String,
    },
    /// A group's stream assignment changed.
    GroupStreamChanged {
        /// Group ID.
        group_id: String,
        /// New stream ID.
        stream_id: String,
    },
    /// A group's mute state changed.
    GroupMuteChanged {
        /// Group ID.
        group_id: String,
        /// Mute state.
        muted: bool,
    },
    /// A stream's status changed (playing, idle, unknown).
    StreamStatus {
        /// Stream identifier.
        stream_id: String,
        /// New status.
        status: String,
    },
    /// Stream metadata/properties changed.
    StreamMetaChanged {
        /// Stream identifier.
        stream_id: String,
        /// Updated properties.
        metadata: std::collections::HashMap<String, serde_json::Value>,
    },
    /// A group's name changed.
    GroupNameChanged {
        /// Group ID.
        group_id: String,
        /// New name.
        name: String,
    },
    /// Server state changed — groups were reorganized (created, deleted, merged).
    ///
    /// Emitted after structural changes like `SetGroupClients` or `DeleteClient`
    /// when the group topology changes. Mirrors `Server.OnUpdate` in the C++ snapserver.
    /// The consumer should re-read server status via `GetStatus`.
    ServerUpdated,
    /// A stream control command was received (play, pause, next, seek, etc.).
    ///
    /// The library forwards this to the embedder since it doesn't own stream readers.
    StreamControl {
        /// Stream ID.
        stream_id: String,
        /// Command name.
        command: String,
        /// Optional parameters.
        params: serde_json::Value,
    },
    /// Custom binary protocol message from a streaming client.
    #[cfg(feature = "custom-protocol")]
    CustomMessage {
        /// Client ID.
        client_id: String,
        /// The custom message.
        message: snapcast_proto::CustomMessage,
    },
}
305
/// Commands the consumer sends to the server.
///
/// Submit these through the sender returned by [`SnapServer::command_sender`];
/// they are processed one at a time by the loop inside [`SnapServer::run`].
#[derive(Debug)]
#[non_exhaustive]
pub enum ServerCommand {
    /// Set a client's volume.
    SetClientVolume {
        /// Client ID.
        client_id: String,
        /// Volume (0–100).
        volume: u16,
        /// Mute state.
        muted: bool,
    },
    /// Set a client's latency offset.
    SetClientLatency {
        /// Client ID.
        client_id: String,
        /// Latency in milliseconds.
        latency: i32,
    },
    /// Set a client's display name.
    SetClientName {
        /// Client ID.
        client_id: String,
        /// New name.
        name: String,
    },
    /// Assign a stream to a group.
    SetGroupStream {
        /// Group ID.
        group_id: String,
        /// Stream ID.
        stream_id: String,
    },
    /// Mute/unmute a group.
    SetGroupMute {
        /// Group ID.
        group_id: String,
        /// Mute state.
        muted: bool,
    },
    /// Set a group's display name.
    SetGroupName {
        /// Group ID.
        group_id: String,
        /// New name.
        name: String,
    },
    /// Move clients to a group.
    SetGroupClients {
        /// Group ID.
        group_id: String,
        /// Client IDs.
        clients: Vec<String>,
    },
    /// Delete a client from the server.
    DeleteClient {
        /// Client ID.
        client_id: String,
    },
    /// Set stream metadata (artist, title, album, etc.).
    SetStreamMeta {
        /// Stream ID.
        stream_id: String,
        /// Metadata key-value pairs.
        metadata: std::collections::HashMap<String, serde_json::Value>,
    },
    /// Request dynamic stream addition from an application shell.
    ///
    /// The embeddable library does not own stream readers. Binaries or embedders
    /// must create streams before [`SnapServer::run`] or implement their own
    /// orchestration around this command.
    AddStream {
        /// Stream source URI (e.g. `pipe:///tmp/snapfifo?name=default`).
        uri: String,
        /// Response: the stream ID assigned.
        response_tx: tokio::sync::oneshot::Sender<Result<String, String>>,
    },
    /// Remove a stream source.
    RemoveStream {
        /// Stream ID to remove.
        stream_id: String,
    },
    /// Forward a control command to a stream (play, pause, next, etc.).
    StreamControl {
        /// Stream ID.
        stream_id: String,
        /// Command name (e.g. "next", "previous", "pause", "seek").
        command: String,
        /// Optional command parameter (e.g. seek position).
        params: serde_json::Value,
    },
    /// Get full server status.
    GetStatus {
        /// Response channel.
        response_tx: tokio::sync::oneshot::Sender<status::ServerStatus>,
    },
    /// Send a custom binary protocol message to a streaming client.
    #[cfg(feature = "custom-protocol")]
    SendToClient {
        /// Target client ID.
        client_id: String,
        /// The custom message.
        message: snapcast_proto::CustomMessage,
    },
    /// Stop the server gracefully.
    ///
    /// Makes [`SnapServer::run`] abort the session task and return `Ok(())`.
    Stop,
}
414
415/// Default codec based on compiled features.
416fn default_codec() -> &'static str {
417    #[cfg(feature = "flac")]
418    return snapcast_proto::CODEC_FLAC;
419    #[cfg(all(feature = "f32lz4", not(feature = "flac")))]
420    return snapcast_proto::CODEC_F32LZ4;
421    #[cfg(not(any(feature = "flac", feature = "f32lz4")))]
422    return snapcast_proto::CODEC_PCM;
423}
424
/// Server configuration for the embeddable library.
///
/// Start from [`ServerConfig::default`] and override individual fields.
pub struct ServerConfig {
    /// Bind address for binary protocol client connections. Default: 0.0.0.0.
    pub stream_bind_address: String,
    /// TCP port for binary protocol (client connections). Default: 1704.
    pub stream_port: u16,
    /// Audio buffer size in milliseconds. Default: 1000.
    pub buffer_ms: u32,
    /// Default codec, e.g. "flac", "f32lz4", "pcm", "opus", "ogg".
    ///
    /// The default depends on the compiled features: FLAC if the `flac`
    /// feature is enabled, otherwise f32lz4 if `f32lz4` is enabled,
    /// otherwise PCM.
    pub codec: String,
    /// Default sample format. Default: 48000:16:2.
    ///
    /// [`SnapServer::run`] silently falls back to the protocol default when
    /// this string cannot be parsed, whereas [`SnapServer::add_f32_stream`]
    /// returns an error for the same input.
    pub sample_format: String,

    /// Auth validator for streaming clients. `None` = no auth required.
    pub auth: Option<std::sync::Arc<dyn auth::AuthValidator>>,
    /// Client filter — called after Hello to accept/reject connections.
    /// `None` = accept all clients.
    pub client_filter: Option<std::sync::Arc<dyn auth::ClientFilter>>,
    /// Pre-shared key for f32lz4 encryption. `None` = no encryption.
    #[cfg(feature = "encryption")]
    pub encryption_psk: Option<String>,
    /// Path to persist server state (clients, groups). `None` = no persistence.
    pub state_file: Option<std::path::PathBuf>,
    /// Send audio data to muted clients. Default: false (skip muted, saves bandwidth).
    pub send_audio_to_muted: bool,
}
451
452impl Default for ServerConfig {
453    fn default() -> Self {
454        Self {
455            stream_bind_address: snapcast_proto::DEFAULT_BIND_ADDRESS.into(),
456            stream_port: snapcast_proto::DEFAULT_STREAM_PORT,
457            buffer_ms: snapcast_proto::DEFAULT_BUFFER_MS,
458            codec: default_codec().into(),
459            sample_format: snapcast_proto::DEFAULT_SAMPLE_FORMAT_STRING.into(),
460
461            auth: None,
462            client_filter: None,
463            #[cfg(feature = "encryption")]
464            encryption_psk: None,
465            state_file: None,
466            send_audio_to_muted: false,
467        }
468    }
469}
470
/// Per-stream configuration. If `None`, inherits from [`ServerConfig`].
///
/// A stream with both fields `None` uses the server-wide encoder settings.
#[derive(Debug, Clone, Default)]
pub struct StreamConfig {
    /// Codec override (e.g. "flac", "f32lz4", "opus", "ogg", "pcm").
    pub codec: Option<String>,
    /// Sample format override (e.g. "48000:16:2").
    ///
    /// An override that fails to parse is ignored and the server's sample
    /// format is used instead.
    pub sample_format: Option<String>,
}
479
/// The embeddable Snapcast server.
///
/// Create it with [`SnapServer::new`], register at least one stream, then
/// drive it with [`SnapServer::run`].
pub struct SnapServer {
    /// Configuration supplied at construction.
    config: ServerConfig,
    /// Sending half of the event channel; the receiver is returned by `new`.
    event_tx: mpsc::Sender<ServerEvent>,
    /// Sending half of the command channel; cloned by `command_sender`.
    command_tx: mpsc::Sender<ServerCommand>,
    /// Receiving half of the command channel; taken by `run`, so a second
    /// call to `run` fails.
    command_rx: Option<mpsc::Receiver<ServerCommand>>,
    /// Named audio streams — each gets its own encoder at run().
    streams: Vec<(String, StreamConfig, mpsc::Receiver<AudioFrame>)>,
    /// Broadcast channel for encoded chunks → sessions.
    chunk_tx: broadcast::Sender<WireChunkData>,
}
491
492/// Spawn a per-stream encode loop on a dedicated thread.
493///
494/// Receives `AudioFrame`, passes `AudioData` directly to the encoder,
495/// and broadcasts encoded `WireChunkData` to sessions.
496fn spawn_stream_encoder(
497    stream_id: String,
498    mut rx: mpsc::Receiver<AudioFrame>,
499    mut enc: Box<dyn encoder::Encoder>,
500    chunk_tx: broadcast::Sender<WireChunkData>,
501    sample_rate: u32,
502    channels: u16,
503) {
504    std::thread::spawn(move || {
505        let rt = tokio::runtime::Builder::new_current_thread()
506            .enable_time()
507            .build()
508            .expect("encoder runtime");
509
510        rt.block_on(async {
511            let mut next_tick: Option<tokio::time::Instant> = None;
512            while let Some(frame) = rx.recv().await {
513                // Pace F32 sources to realtime (pipe sources pace naturally via blocking read)
514                if let AudioData::F32(ref samples) = frame.data {
515                    let num_frames = samples.len() / channels.max(1) as usize;
516                    let chunk_dur = std::time::Duration::from_micros(
517                        (num_frames as u64 * 1_000_000) / sample_rate as u64,
518                    );
519                    let now = tokio::time::Instant::now();
520                    let tick = next_tick.get_or_insert(now);
521                    // Reset on gap (>500ms behind wall clock)
522                    if now.checked_duration_since(*tick + chunk_dur)
523                        > Some(std::time::Duration::from_millis(500))
524                    {
525                        *tick = now;
526                    }
527                    *tick += chunk_dur;
528                    tokio::time::sleep_until(*tick).await;
529                }
530                match enc.encode(&frame.data) {
531                    Ok(encoded) if !encoded.data.is_empty() => {
532                        let _ = chunk_tx.send(WireChunkData {
533                            stream_id: stream_id.clone(),
534                            timestamp_usec: frame.timestamp_usec,
535                            data: encoded.data,
536                        });
537                    }
538                    Err(e) => {
539                        tracing::warn!(stream = %stream_id, error = %e, "Encode failed");
540                    }
541                    _ => {} // encoder buffering
542                }
543            }
544        });
545    });
546}
547
/// Construction, stream registration, and the run loop of the embeddable server.
549impl SnapServer {
550    /// Create a new server. Returns the server and event receiver.
551    pub fn new(config: ServerConfig) -> (Self, mpsc::Receiver<ServerEvent>) {
552        let (event_tx, event_rx) = mpsc::channel(EVENT_CHANNEL_SIZE);
553        let (command_tx, command_rx) = mpsc::channel(COMMAND_CHANNEL_SIZE);
554        let (chunk_tx, _) = broadcast::channel(256);
555        let server = Self {
556            config,
557            event_tx,
558            command_tx,
559            command_rx: Some(command_rx),
560            streams: Vec::new(),
561            chunk_tx,
562        };
563        (server, event_rx)
564    }
565
566    /// Add a named audio stream. Returns a sender for pushing audio frames.
567    ///
568    /// Uses the server's default codec and sample format.
569    pub fn add_stream(&mut self, name: &str) -> mpsc::Sender<AudioFrame> {
570        self.add_stream_with_config(name, StreamConfig::default())
571    }
572
573    /// Add a named F32 audio stream with automatic chunking and timestamping.
574    ///
575    /// Returns an [`F32AudioSender`] that accepts variable-size F32 sample buffers
576    /// and handles 20ms chunking, monotonic timestamps, and gap detection internally.
577    ///
578    /// # Errors
579    /// Returns an error if the server's `sample_format` cannot be parsed.
580    pub fn add_f32_stream(&mut self, name: &str) -> Result<F32AudioSender, String> {
581        let sf: SampleFormat =
582            self.config.sample_format.parse().map_err(|e| {
583                format!("invalid sample_format '{}': {e}", self.config.sample_format)
584            })?;
585        let (tx, rx) = mpsc::channel(F32_CHANNEL_SIZE);
586        self.streams
587            .push((name.to_string(), StreamConfig::default(), rx));
588        Ok(F32AudioSender::new(tx, sf.rate(), sf.channels()))
589    }
590
591    /// Add a named audio stream with per-stream codec/format overrides.
592    pub fn add_stream_with_config(
593        &mut self,
594        name: &str,
595        config: StreamConfig,
596    ) -> mpsc::Sender<AudioFrame> {
597        let (tx, rx) = mpsc::channel(AUDIO_CHANNEL_SIZE);
598        self.streams.push((name.to_string(), config, rx));
599        tx
600    }
601
602    /// Get a cloneable command sender.
603    pub fn command_sender(&self) -> mpsc::Sender<ServerCommand> {
604        self.command_tx.clone()
605    }
606
607    /// Access the server configuration.
608    pub fn config(&self) -> &ServerConfig {
609        &self.config
610    }
611
612    /// Run the server. Blocks until stopped or a fatal error occurs.
613    pub async fn run(&mut self) -> anyhow::Result<()> {
614        let mut command_rx = self
615            .command_rx
616            .take()
617            .ok_or_else(|| anyhow::anyhow!("run() already called"))?;
618
619        let event_tx = self.event_tx.clone();
620
621        let sample_format: snapcast_proto::SampleFormat = self
622            .config
623            .sample_format
624            .parse()
625            .unwrap_or(snapcast_proto::DEFAULT_SAMPLE_FORMAT);
626
627        anyhow::ensure!(
628            !self.streams.is_empty(),
629            "No streams configured — call add_stream() before run()"
630        );
631
632        tracing::info!(
633            bind_address = %self.config.stream_bind_address,
634            stream_port = self.config.stream_port,
635            "Snapserver starting"
636        );
637
638        // Create default encoder — used for codec header and first default stream
639        let default_enc_config = encoder::EncoderConfig {
640            codec: self.config.codec.clone(),
641            format: sample_format,
642            options: String::new(),
643            #[cfg(feature = "encryption")]
644            encryption_psk: self.config.encryption_psk.clone(),
645        };
646        let default_enc = encoder::create(&default_enc_config)?;
647
648        // Spawn per-stream encode loops — reuse default_enc for first default stream
649        let chunk_tx = self.chunk_tx.clone();
650        let streams = std::mem::take(&mut self.streams);
651        let mut default_enc = Some(default_enc);
652
653        // Shared state for command handlers
654        let initial_state = self
655            .config
656            .state_file
657            .as_ref()
658            .map(|p| state::ServerState::load(p))
659            .unwrap_or_default();
660        let shared_state = Arc::new(tokio::sync::Mutex::new(initial_state));
661
662        // Create session server before stream registration
663        // (first_stream_name set in loop below, but SessionServer only needs it for default routing)
664        let first_name = streams
665            .first()
666            .map(|(n, _, _)| n.clone())
667            .unwrap_or_default();
668        let session_srv = Arc::new(session::SessionServer::new(session::SessionServerConfig {
669            bind_address: self.config.stream_bind_address.clone(),
670            port: self.config.stream_port,
671            buffer_ms: self.config.buffer_ms as i32,
672            auth: self.config.auth.clone(),
673            client_filter: self.config.client_filter.clone(),
674            shared_state: Arc::clone(&shared_state),
675            default_stream: first_name.clone(),
676            send_audio_to_muted: self.config.send_audio_to_muted,
677        }));
678
679        for (name, stream_cfg, rx) in streams {
680            {
681                let mut s = shared_state.lock().await;
682                if !s.streams.iter().any(|existing| existing.id == name) {
683                    s.streams.push(state::StreamInfo {
684                        id: name.clone(),
685                        status: "idle".into(),
686                        uri: String::new(),
687                        properties: Default::default(),
688                    });
689                }
690            }
691            let mut active_format = sample_format;
692            let enc = if stream_cfg.codec.is_none() && stream_cfg.sample_format.is_none() {
693                if let Some(enc) = default_enc.take() {
694                    enc
695                } else {
696                    encoder::create(&default_enc_config)?
697                }
698            } else {
699                let stream_codec = stream_cfg.codec.as_deref().unwrap_or(&self.config.codec);
700                let stream_format: snapcast_proto::SampleFormat = stream_cfg
701                    .sample_format
702                    .as_deref()
703                    .and_then(|s| s.parse().ok())
704                    .unwrap_or(sample_format);
705                active_format = stream_format;
706                encoder::create(&encoder::EncoderConfig {
707                    codec: stream_codec.to_string(),
708                    format: stream_format,
709                    options: String::new(),
710                    #[cfg(feature = "encryption")]
711                    encryption_psk: self.config.encryption_psk.clone(),
712                })?
713            };
714            tracing::info!(stream = %name, codec = enc.name(), format = %active_format, "Stream registered");
715            session_srv
716                .register_stream_codec(&name, enc.name(), enc.header())
717                .await;
718            spawn_stream_encoder(
719                name,
720                rx,
721                enc,
722                chunk_tx.clone(),
723                active_format.rate(),
724                active_format.channels(),
725            );
726        }
727
728        let session_for_run = Arc::clone(&session_srv);
729        let session_event_tx = event_tx.clone();
730        let session_chunk_tx = self.chunk_tx.clone();
731        let session_handle = tokio::spawn(async move {
732            if let Err(e) = session_for_run
733                .run(session_chunk_tx, session_event_tx)
734                .await
735            {
736                tracing::error!(error = %e, "Session server error");
737            }
738        });
739
740        let state_file = self.config.state_file.clone();
741        let save_state = |s: &state::ServerState| {
742            if let Some(ref path) = state_file {
743                let _ = s
744                    .save(path)
745                    .map_err(|e| tracing::warn!(error = %e, "Failed to save state"));
746            }
747        };
748
749        // Main loop
750        loop {
751            tokio::select! {
752                cmd = command_rx.recv() => {
753                    match cmd {
754                        Some(ServerCommand::Stop) | None => {
755                            tracing::info!("Server stopped");
756                            session_handle.abort();
757                            return Ok(());
758                        }
759                        Some(ServerCommand::SetClientVolume { client_id, volume, muted }) => {
760                            let mut s = shared_state.lock().await;
761                            if let Some(c) = s.clients.get_mut(&client_id) {
762                                c.config.volume.percent = volume;
763                                c.config.volume.muted = muted;
764                            }
765                            let latency = s.clients.get(&client_id).map(|c| c.config.latency).unwrap_or(0);
766                            save_state(&s);
767                            drop(s);
768                            session_srv.push_settings(ClientSettingsUpdate {
769                                client_id: client_id.clone(),
770                                buffer_ms: self.config.buffer_ms as i32,
771                                latency, volume, muted,
772                            }).await;
773                            let _ = event_tx.try_send(ServerEvent::ClientVolumeChanged { client_id: client_id.clone(), volume, muted });
774                            session_srv.update_routing_for_client(&client_id).await;
775                        }
776                        Some(ServerCommand::SetClientLatency { client_id, latency }) => {
777                            let mut settings_update = None;
778                            let mut s = shared_state.lock().await;
779                            if let Some(c) = s.clients.get_mut(&client_id) {
780                                c.config.latency = latency;
781                                settings_update = Some(ClientSettingsUpdate {
782                                    client_id: client_id.clone(),
783                                    buffer_ms: self.config.buffer_ms as i32,
784                                    latency,
785                                    volume: c.config.volume.percent,
786                                    muted: c.config.volume.muted,
787                                });
788                            }
789                            save_state(&s);
790                            drop(s);
791                            if let Some(update) = settings_update {
792                                session_srv.push_settings(update).await;
793                            }
794                            let _ = event_tx.try_send(ServerEvent::ClientLatencyChanged { client_id, latency });
795                        }
796                        Some(ServerCommand::SetClientName { client_id, name }) => {
797                            let mut s = shared_state.lock().await;
798                            if let Some(c) = s.clients.get_mut(&client_id) {
799                                c.config.name = name.clone();
800                            }
801                            save_state(&s);
802                            drop(s);
803                            let _ = event_tx.try_send(ServerEvent::ClientNameChanged { client_id, name });
804                        }
805                        Some(ServerCommand::SetGroupStream { group_id, stream_id }) => {
806                            let mut s = shared_state.lock().await;
807                            s.set_group_stream(&group_id, &stream_id);
808                            save_state(&s);
809                            drop(s);
810                            let _ = event_tx.try_send(ServerEvent::GroupStreamChanged { group_id: group_id.clone(), stream_id });
811                            session_srv.update_routing_for_group(&group_id).await;
812                        }
813                        Some(ServerCommand::SetGroupMute { group_id, muted }) => {
814                            let mut s = shared_state.lock().await;
815                            if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
816                                g.muted = muted;
817                            }
818                            save_state(&s);
819                            drop(s);
820                            let _ = event_tx.try_send(ServerEvent::GroupMuteChanged { group_id: group_id.clone(), muted });
821                            session_srv.update_routing_for_group(&group_id).await;
822                        }
823                        Some(ServerCommand::SetGroupName { group_id, name }) => {
824                            let mut s = shared_state.lock().await;
825                            if let Some(g) = s.groups.iter_mut().find(|g| g.id == group_id) {
826                                g.name = name.clone();
827                            }
828                            save_state(&s);
829                            drop(s);
830                            let _ = event_tx.try_send(ServerEvent::GroupNameChanged { group_id, name });
831                        }
832                        Some(ServerCommand::SetGroupClients { group_id, clients }) => {
833                            let mut s = shared_state.lock().await;
834                            s.set_group_clients(&group_id, &clients);
835                            save_state(&s);
836                            drop(s);
837                            // Structural change — mirrors Server.OnUpdate in C++ snapserver
838                            let _ = event_tx.try_send(ServerEvent::ServerUpdated);
839                            session_srv.update_routing_all().await;
840                        }
841                        Some(ServerCommand::DeleteClient { client_id }) => {
842                            let mut s = shared_state.lock().await;
843                            s.remove_client_from_groups(&client_id);
844                            s.clients.remove(&client_id);
845                            save_state(&s);
846                            drop(s);
847                            let _ = event_tx.try_send(ServerEvent::ServerUpdated);
848                            session_srv.update_routing_all().await;
849                        }
850                        Some(ServerCommand::SetStreamMeta { stream_id, metadata }) => {
851                            let mut s = shared_state.lock().await;
852                            if let Some(stream) = s.streams.iter_mut().find(|st| st.id == stream_id) {
853                                stream.properties = metadata.clone();
854                            }
855                            drop(s);
856                            let _ = event_tx.try_send(ServerEvent::StreamMetaChanged { stream_id, metadata });
857                        }
858                        Some(ServerCommand::AddStream { uri, response_tx }) => {
859                            tracing::warn!(uri, "Dynamic stream addition requires application-owned stream orchestration");
860                            let _ = response_tx.send(Err(
861                                "dynamic Stream.AddStream is not supported by the embeddable server after run(); create streams before run()".into(),
862                            ));
863                        }
864                        Some(ServerCommand::RemoveStream { stream_id }) => {
865                            let mut s = shared_state.lock().await;
866                            s.streams.retain(|st| st.id != stream_id);
867                            // Clear stream_id on groups that referenced this stream
868                            for g in &mut s.groups {
869                                if g.stream_id == stream_id {
870                                    g.stream_id.clear();
871                                }
872                            }
873                            save_state(&s);
874                            drop(s);
875                            let _ = event_tx.try_send(ServerEvent::ServerUpdated);
876                            session_srv.update_routing_all().await;
877                        }
878                        Some(ServerCommand::StreamControl { stream_id, command, params }) => {
879                            tracing::debug!(stream_id, command, ?params, "Stream control forwarded");
880                            // Forward to embedder via event — the library doesn't own stream readers
881                            let _ = event_tx.try_send(ServerEvent::StreamControl { stream_id, command, params });
882                        }
883                        Some(ServerCommand::GetStatus { response_tx }) => {
884                            let s = shared_state.lock().await;
885                            let _ = response_tx.send(s.to_status());
886                        }
887                        #[cfg(feature = "custom-protocol")]
888                        Some(ServerCommand::SendToClient { client_id, message }) => {
889                            session_srv.send_custom(&client_id, message.type_id, message.payload).await;
890                        }
891                    }
892                }
893            }
894        }
895    }
896}