snapcast-client 0.12.0

Snapcast client library — embeddable synchronized multiroom audio
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
//! Connection layer.
//!
//! TCP is the supported Snapcast audio transport. The WebSocket modules are
//! kept feature-gated for future interoperability work, but they are not
//! selected by [`SnapConnection::new`] until the server and client can speak a
//! verified binary audio-streaming WebSocket contract.

#[cfg(feature = "websocket")]
pub mod ws;
#[cfg(feature = "tls")]
pub mod wss;

use std::collections::HashMap;
use std::time::Duration;

use anyhow::{Context, Result};
use snapcast_proto::MessageType;
use snapcast_proto::message::base::BaseMessage;
use snapcast_proto::message::factory::{self, MessagePayload, TypedMessage};
use snapcast_proto::types::Timeval;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::oneshot;

/// Read a complete frame (header + payload) from an async reader.
async fn read_frame<R: AsyncReadExt + Unpin>(reader: &mut R) -> Result<TypedMessage> {
    // Read 26-byte header
    let mut header_buf = [0u8; BaseMessage::HEADER_SIZE];
    reader
        .read_exact(&mut header_buf)
        .await
        .context("reading base message header")?;

    let mut base = BaseMessage::read_from(&mut &header_buf[..])
        .map_err(|e| anyhow::anyhow!("parsing header: {e}"))?;

    // Stamp received time using steady clock (matching C++ steadytimeofday)
    base.received = steady_time_of_day();
    ensure_payload_size(base.size)?;

    // Read payload
    let mut payload_buf = vec![0u8; base.size as usize];
    if !payload_buf.is_empty() {
        reader
            .read_exact(&mut payload_buf)
            .await
            .context("reading payload")?;
    }

    factory::deserialize(base, &payload_buf).map_err(|e| anyhow::anyhow!("deserializing: {e}"))
}

pub(crate) fn ensure_payload_size(size: u32) -> Result<()> {
    anyhow::ensure!(
        size <= snapcast_proto::DEFAULT_MAX_PAYLOAD_SIZE,
        "payload too large: {size} bytes"
    );
    Ok(())
}

/// Write a complete frame (header + payload) to an async writer.
async fn write_frame<W: AsyncWriteExt + Unpin>(
    writer: &mut W,
    base: &mut BaseMessage,
    payload: &MessagePayload,
) -> Result<()> {
    let frame =
        factory::serialize(base, payload).map_err(|e| anyhow::anyhow!("serializing: {e}"))?;
    writer.write_all(&frame).await.context("writing frame")?;
    Ok(())
}

/// Pending request waiting for a response.
struct PendingRequest {
    tx: oneshot::Sender<TypedMessage>,
}

/// TCP connection to a snapserver.
pub struct TcpConnection {
    stream: Option<TcpStream>,
    host: String,
    port: u16,
    pending: HashMap<u16, PendingRequest>,
    next_id: u16,
}

/// Unified connection over supported transports.
pub enum SnapConnection {
    /// Plain TCP connection.
    Tcp(TcpConnection),
    #[cfg(feature = "websocket")]
    /// WebSocket (non-secure) connection.
    Ws(ws::WsConnection),
    #[cfg(feature = "tls")]
    /// WebSocket over TLS (secure) connection.
    Wss(wss::WssConnection),
}

impl SnapConnection {
    /// Create a new connection based on the scheme.
    pub fn new(scheme: &str, host: &str, port: u16) -> Result<Self> {
        match scheme {
            snapcast_proto::SCHEME_TCP => Ok(Self::Tcp(TcpConnection::new(host, port))),
            snapcast_proto::SCHEME_WS | snapcast_proto::SCHEME_WSS => anyhow::bail!(
                "websocket audio transport is not supported yet; use tcp:// for Snapcast audio"
            ),
            _ => anyhow::bail!("unsupported scheme: {scheme}"),
        }
    }

    /// Establish the connection.
    pub async fn connect(&mut self) -> Result<()> {
        match self {
            Self::Tcp(c) => c.connect().await,
            #[cfg(feature = "websocket")]
            Self::Ws(c) => c.connect().await,
            #[cfg(feature = "tls")]
            Self::Wss(c) => c.connect().await,
        }
    }

    /// Close the connection.
    pub fn disconnect(&mut self) {
        match self {
            Self::Tcp(c) => c.disconnect(),
            #[cfg(feature = "websocket")]
            Self::Ws(c) => c.disconnect(),
            #[cfg(feature = "tls")]
            Self::Wss(c) => c.disconnect(),
        }
    }

    /// Send a message.
    pub async fn send(&mut self, msg_type: MessageType, payload: &MessagePayload) -> Result<()> {
        match self {
            Self::Tcp(c) => c.send(msg_type, payload).await,
            #[cfg(feature = "websocket")]
            Self::Ws(c) => c.send(msg_type, payload).await,
            #[cfg(feature = "tls")]
            Self::Wss(c) => c.send(msg_type, payload).await,
        }
    }

    /// Receive the next message.
    pub async fn recv(&mut self) -> Result<TypedMessage> {
        match self {
            Self::Tcp(c) => c.recv().await,
            #[cfg(feature = "websocket")]
            Self::Ws(c) => c.recv().await,
            #[cfg(feature = "tls")]
            Self::Wss(c) => c.recv().await,
        }
    }
}

impl TcpConnection {
    /// Create a new connection to the given host and port.
    pub fn new(host: &str, port: u16) -> Self {
        Self {
            stream: None,
            host: host.to_string(),
            port,
            pending: HashMap::new(),
            next_id: 1,
        }
    }

    /// Establish the TCP connection.
    pub async fn connect(&mut self) -> Result<()> {
        let addr = format!("{}:{}", self.host, self.port);
        let stream = TcpStream::connect(&addr)
            .await
            .with_context(|| format!("connecting to {addr}"))?;
        self.stream = Some(stream);
        self.pending.clear();
        self.next_id = 1;
        Ok(())
    }

    /// Close the connection.
    pub fn disconnect(&mut self) {
        self.stream = None;
        self.pending.clear();
    }

    fn stream_mut(&mut self) -> Result<&mut TcpStream> {
        self.stream.as_mut().context("not connected")
    }

    /// Send a message without waiting for a response.
    pub async fn send(&mut self, msg_type: MessageType, payload: &MessagePayload) -> Result<()> {
        let stream = self.stream_mut()?;
        let mut base = BaseMessage {
            msg_type,
            id: 0,
            refers_to: 0,
            sent: Timeval::default(),
            received: Timeval::default(),
            size: 0,
        };
        stamp_sent(&mut base);
        write_frame(stream, &mut base, payload).await
    }

    /// Send a request and wait for the response (matched by `refersTo`).
    pub async fn send_request(
        &mut self,
        msg_type: MessageType,
        payload: &MessagePayload,
        timeout: Duration,
    ) -> Result<TypedMessage> {
        let id = self.next_id;
        self.next_id = self.next_id.wrapping_add(1);

        let (tx, rx) = oneshot::channel();
        self.pending.insert(id, PendingRequest { tx });

        let stream = self.stream_mut()?;
        let mut base = BaseMessage {
            msg_type,
            id,
            refers_to: 0,
            sent: Timeval::default(),
            received: Timeval::default(),
            size: 0,
        };
        stamp_sent(&mut base);
        write_frame(stream, &mut base, payload).await?;

        tokio::time::timeout(timeout, rx)
            .await
            .context("request timed out")?
            .context("response channel closed")
    }

    /// Receive the next message. If it's a response to a pending request,
    /// deliver it to the waiting caller and receive again.
    pub async fn recv(&mut self) -> Result<TypedMessage> {
        loop {
            let stream = self.stream_mut()?;
            let msg = read_frame(stream).await?;

            if msg.base.refers_to != 0
                && let Some(pending) = self.pending.remove(&msg.base.refers_to)
            {
                let _ = pending.tx.send(msg);
                continue;
            }
            return Ok(msg);
        }
    }
}

pub(super) fn stamp_sent(base: &mut BaseMessage) {
    let tv = steady_time_of_day();
    base.sent = tv;
}

/// Matches the C++ `chronos::steadytimeofday` — monotonic clock time.
/// On macOS/Linux, `Instant` is based on `CLOCK_MONOTONIC` which counts
/// seconds since boot, matching the C++ snapserver's clock domain.
pub(super) fn steady_time_of_day() -> Timeval {
    // Instant::now().duration_since(EPOCH) gives time since first call.
    // We need time since boot. On Unix, Instant uses CLOCK_MONOTONIC
    // which starts at boot. We can get this via the elapsed time from
    // a known-early Instant.
    let usec = monotonic_usec();
    Timeval {
        sec: (usec / 1_000_000) as i32,
        usec: (usec % 1_000_000) as i32,
    }
}

/// Microseconds since boot (monotonic clock).
/// Uses the same clock source as C++ std::chrono::steady_clock.
#[allow(unsafe_code)] // FFI: mach_continuous_time (macOS), clock_gettime (Linux)
fn monotonic_usec() -> i64 {
    #[cfg(target_os = "macos")]
    {
        // macOS: C++ steady_clock uses mach_continuous_time, not CLOCK_MONOTONIC.
        // These differ by ~2s on macOS. We must match the server's clock exactly.
        unsafe extern "C" {
            fn mach_continuous_time() -> u64;
            fn mach_timebase_info(info: *mut MachTimebaseInfo) -> i32;
        }
        #[repr(C)]
        struct MachTimebaseInfo {
            numer: u32,
            denom: u32,
        }
        static TIMEBASE: std::sync::OnceLock<(u32, u32)> = std::sync::OnceLock::new();
        let (numer, denom) = *TIMEBASE.get_or_init(|| {
            let mut info = MachTimebaseInfo { numer: 0, denom: 0 };
            unsafe {
                mach_timebase_info(&mut info);
            }
            (info.numer, info.denom)
        });
        let ticks = unsafe { mach_continuous_time() };
        let nanos = ticks as i128 * numer as i128 / denom as i128;
        (nanos / 1_000) as i64
    }
    #[cfg(all(unix, not(target_os = "macos")))]
    {
        let mut ts = libc::timespec {
            tv_sec: 0,
            tv_nsec: 0,
        };
        // SAFETY: clock_gettime with CLOCK_MONOTONIC is always safe
        unsafe {
            libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts);
        }
        ts.tv_sec * 1_000_000 + ts.tv_nsec / 1_000
    }
    #[cfg(not(unix))]
    {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default();
        now.as_micros() as i64
    }
}

/// Current time in microseconds using the steady clock.
pub fn now_usec() -> i64 {
    monotonic_usec()
}

#[cfg(test)]
mod tests {
    use super::*;
    use snapcast_proto::message::time::Time;

    /// Test frame read/write with in-memory buffers (no network needed).
    #[tokio::test]
    async fn write_and_read_frame() {
        let payload = MessagePayload::Time(Time {
            latency: Timeval { sec: 0, usec: 1234 },
        });
        let mut base = BaseMessage {
            msg_type: MessageType::Time,
            id: 42,
            refers_to: 0,
            sent: Timeval { sec: 1, usec: 0 },
            received: Timeval::default(),
            size: 0,
        };

        // Write to buffer
        let mut buf = Vec::new();
        write_frame(&mut buf, &mut base, &payload).await.unwrap();

        // Size should be header + payload
        assert_eq!(buf.len(), BaseMessage::HEADER_SIZE + Time::SIZE as usize);

        // Read back
        let mut cursor = std::io::Cursor::new(&buf);
        let msg = read_frame(&mut cursor).await.unwrap();
        assert_eq!(msg.base.msg_type, MessageType::Time);
        assert_eq!(msg.base.id, 42);
        match msg.payload {
            MessagePayload::Time(t) => assert_eq!(t.latency.usec, 1234),
            _ => panic!("expected Time"),
        }
    }

    #[tokio::test]
    async fn write_and_read_error_frame() {
        use snapcast_proto::message::error::Error;

        let payload = MessagePayload::Error(Error {
            code: 401,
            error: "Unauthorized".into(),
            message: "bad auth".into(),
        });
        let mut base = BaseMessage {
            msg_type: MessageType::Error,
            id: 0,
            refers_to: 7,
            sent: Timeval::default(),
            received: Timeval::default(),
            size: 0,
        };

        let mut buf = Vec::new();
        write_frame(&mut buf, &mut base, &payload).await.unwrap();

        let mut cursor = std::io::Cursor::new(&buf);
        let msg = read_frame(&mut cursor).await.unwrap();
        assert_eq!(msg.base.refers_to, 7);
        match msg.payload {
            MessagePayload::Error(e) => {
                assert_eq!(e.code, 401);
                assert_eq!(e.error, "Unauthorized");
            }
            _ => panic!("expected Error"),
        }
    }

    #[tokio::test]
    async fn write_and_read_multiple_frames() {
        let frames: Vec<(MessageType, MessagePayload)> = vec![
            (MessageType::Time, MessagePayload::Time(Time::default())),
            (
                MessageType::ClientInfo,
                MessagePayload::ClientInfo(snapcast_proto::message::client_info::ClientInfo {
                    volume: 80,
                    muted: false,
                }),
            ),
        ];

        let mut buf = Vec::new();
        for (mt, payload) in &frames {
            let mut base = BaseMessage {
                msg_type: *mt,
                id: 0,
                refers_to: 0,
                sent: Timeval::default(),
                received: Timeval::default(),
                size: 0,
            };
            write_frame(&mut buf, &mut base, payload).await.unwrap();
        }

        // Read both back
        let mut cursor = std::io::Cursor::new(&buf);
        let msg1 = read_frame(&mut cursor).await.unwrap();
        assert_eq!(msg1.base.msg_type, MessageType::Time);
        let msg2 = read_frame(&mut cursor).await.unwrap();
        assert_eq!(msg2.base.msg_type, MessageType::ClientInfo);
    }

    #[test]
    fn tcp_connection_new() {
        let conn = TcpConnection::new("localhost", 1704);
        assert!(conn.stream.is_none());
        assert_eq!(conn.host, "localhost");
        assert_eq!(conn.port, 1704);
    }

    #[test]
    fn rejects_oversized_payload() {
        let too_large = snapcast_proto::DEFAULT_MAX_PAYLOAD_SIZE + 1;
        assert!(ensure_payload_size(too_large).is_err());
    }

    #[test]
    fn rejects_websocket_audio_scheme() {
        assert!(SnapConnection::new("ws", "localhost", 1780).is_err());
        assert!(SnapConnection::new("wss", "localhost", 1788).is_err());
    }
}