mlxrs 0.1.0

Safe Rust bindings for Apple's MLX array framework, with LM, VLM, audio, and embeddings support
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
//! [`AudioPlayer`] — cpal-backed device playback, the mlxrs port of
//! `mlx-audio-swift`'s
//! [`MLXAudioCore.AudioPlayer.startStreaming(sampleRate:)`][swift-ap] +
//! `scheduleAudioChunk(_:withCrossfade:)` streaming path.
//!
//! ## API mirror
//!
//! Swift `AudioPlayer` exposes a roughly six-method streaming surface
//! over `AVAudioEngine` + `AVAudioPlayerNode`:
//!
//! | Swift                                       | mlxrs                                              |
//! | ------------------------------------------- | -------------------------------------------------- |
//! | `startStreaming(sampleRate:)`               | [`AudioPlayer::new`] / [`AudioPlayer::with_device`] + [`AudioPlayer::start`] |
//! | `scheduleAudioChunk(_:withCrossfade:)`      | [`AudioPlayer::write_samples`] (via [`super::output_stream::AudioOutputStream`]) |
//! | `pause()` (streaming branch)                | [`AudioPlayer::pause`]                              |
//! | `togglePlayPause()` (streaming branch)      | [`AudioPlayer::resume`]                             |
//! | `stopStreaming()` / `stop()`                | [`AudioPlayer::stop`] / `Drop`                      |
//! | `isPlaying` / `isStreamingMode`             | [`AudioPlayer::is_running`]                         |
//! | `finishStreamingInput()`                    | [`super::output_stream::AudioOutputStream::flush`]  |
//!
//! Swift's volume is read off `AVAudioPlayerNode.volume`; mlxrs
//! exposes the equivalent via [`AudioPlayer::store_volume`] +
//! [`AudioPlayer::volume`] backed by an `AtomicU32` (f32 bits) the
//! cpal callback reads each invocation.
//!
//! ## Cpal callback + buffer-queue plumbing
//!
//! The Swift path is buffer-by-buffer (`AVAudioPlayerNode.scheduleBuffer`
//! per `[Float]` chunk, each carrying its own completion handler).
//! The cpal equivalent inverts the polarity: cpal owns the I/O
//! thread, calls back into us with a pre-sized `&mut [f32]` to fill,
//! and we pull samples from a thread-safe queue. Concretely:
//!
//! ```text
//! producer thread (e.g. STS pipeline)            cpal I/O thread
//! ───────────────────────────────────            ─────────────────
//!   write_samples(&[f32]) ──┐                          │
//!                           ▼                          │
//!                 SampleQueue::push                    │
//!                           │                          │
//!                           ▼                          ▼
//!                       Arc<Mutex<VecDeque<f32>>> ── callback fills &mut [f32]
//!                           │                          │
//!                           │                          ▼
//!                           │              for s in out: s = pop_or_zero() * volume
//!                           ▼
//!                  AudioPlayer::buffer_depth
//! ```
//!
//! - **Producer side.** [`AudioPlayer::write_samples`] locks the
//!   shared `VecDeque<f32>` (capped at
//!   [`super::config::PlaybackConfig::queue_capacity_frames`] × channel
//!   count). Returns `Err` on overflow (a recoverable
//!   [`crate::error::Error::CapExceeded`] — no producer surprise OOM).
//!   This is the cpal-equivalent of `AVAudioPlayerNode.scheduleBuffer`
//!   returning even though the underlying scheduling chain is
//!   bounded.
//! - **Cpal callback.** Runs on cpal's audio I/O thread; takes the
//!   queue lock with `try_lock` (see "Concurrency" below) for a short
//!   critical section — only `pop_front` calls under the lock — reads
//!   the current volume from the `AtomicU32`, and writes
//!   `pop * volume` per sample. On underrun (queue empty) the
//!   callback writes `0.0` — silence — instead of panicking or
//!   blocking. This matches the Swift behavior: the player node
//!   sits idle if no buffer is scheduled.
//! - **State.** Stored as `Arc<AtomicU8>` so both producer and cpal
//!   callback can observe transitions without holding the queue
//!   lock; values [`STATE_STOPPED`], [`STATE_RUNNING`],
//!   [`STATE_PAUSED`] map to Swift's `isPlaying` /
//!   `isStreamingMode` distinction (we collapse them into a single
//!   tri-state to make the cpal-side check a single atomic load).
//!
//! ## Concurrency
//!
//! The cpal callback runs on a real-time audio I/O thread with a
//! hard deadline: missing the device's callback period yields an
//! audible underrun even if the silence-fill path is correct
//! (CoreAudio fills with whatever was left in the buffer). The
//! callback **must not block** on producer-held mutexes.
//!
//! Concrete contract:
//!
//! - **Callback uses `try_lock`, not `lock`.** If the producer is
//!   mid-extend on the queue mutex, the callback emits silence for
//!   the current period instead of blocking past the device
//!   deadline. The cost (one underrun period) is bounded; blocking
//!   the audio thread is not.
//! - **Producer chunks large writes.** [`AudioPlayer::write_samples`]
//!   splits writes larger than [`WRITE_CHUNK_MAX`] into per-chunk
//!   lock acquisitions, so the callback's `try_lock` window is
//!   bounded by the duration of one chunk's `extend` (microseconds
//!   at 4096 f32 samples) rather than the duration of the full
//!   producer payload (which can be hundreds of milliseconds for a
//!   multi-second TTS chunk).
//! - **Future migration.** If audible underruns persist under
//!   profiling, swap the `Mutex<VecDeque<f32>>` for a lock-free
//!   `crossbeam-queue::ArrayQueue<f32>` (no new dep today; the
//!   `try_lock` + chunking pattern stays within the existing
//!   surface).
//!
//! ## Scope cuts (explicit)
//!
//! The Swift `AudioPlayer` exposes a few capabilities this player
//! deliberately does NOT port; each is a separate follow-up issue:
//!
//! - **Audio input / recording.** This player is playback-only.
//!   `AVAudioPlayer` / `AVAudioPlayerDelegate` are not mirrored.
//! - **File I/O (`loadAudio(from: URL)`).** This player plays raw PCM.
//!   WAV / MP3 / FLAC loading already lives in [`crate::audio::io`]; a
//!   caller that wants to play a file decodes there and pipes the
//!   resulting samples through [`AudioPlayer::write_samples`].
//! - **Format conversion (`PCMStreamConverter`).** This player expects
//!   the caller to supply samples at the configured
//!   [`super::config::PlaybackConfig::sample_rate`] /
//!   [`super::config::PlaybackConfig::channels`]. Resampling +
//!   format-conversion is a separate concern (already partially
//!   covered by [`crate::audio::io::load_audio`]'s resampling, fully
//!   covered by a future polyphase resampler follow-up).
//! - **Crossfade / fade-in (`scheduleAudioChunk(_:withCrossfade:)`'s
//!   `withCrossfade: true` branch).** Crossfade is an
//!   application-level concern; this player plays exactly the samples
//!   the caller pushes. A future helper module can wrap `AudioPlayer`
//!   with a fade-in/crossfade transform without touching the
//!   playback core.
//! - **Per-buffer completion callbacks.** Swift schedules each
//!   buffer with a `completionCallbackType: .dataConsumed` to track
//!   queued-buffer drain; cpal has no per-buffer-completion hook —
//!   instead [`super::output_stream::AudioOutputStream::flush`]
//!   blocks until [`AudioPlayer::buffer_depth`] reaches zero, which
//!   is the same end-state contract (`onDidFinishStreaming` fires
//!   when `queuedBuffers == 0`).
//! - **Timer-driven `currentTime` publishing.** Swift uses
//!   `Timer.scheduledTimer` (every 100ms) + Combine to publish
//!   `currentTime` for UI binding. mlxrs is a Rust library, not a
//!   SwiftUI ObservableObject; no `@Published` properties / no
//!   Combine equivalent. Callers that want positional readback can
//!   maintain their own sample counter against
//!   [`AudioPlayer::buffer_depth`].
//!
//! [swift-ap]: https://github.com/fintit-ai/mlx-audio-swift/blob/main/Sources/MLXAudioCore/AudioPlayer.swift

use std::{
  collections::VecDeque,
  sync::{
    Arc, Mutex,
    atomic::{AtomicBool, AtomicU8, AtomicU32, Ordering},
  },
  thread,
  time::{Duration, Instant},
};

use cpal::{
  Stream, StreamError,
  traits::{DeviceTrait, HostTrait, StreamTrait},
};

use super::{
  config::{PlaybackConfig, SampleFormat},
  output_stream::AudioOutputStream,
};
use smol_str::format_smolstr;

use crate::error::{
  AllocFailurePayload, ArithmeticOverflowPayload, CapExceededPayload, Error, ExternalOpPayload,
  InvariantViolationPayload, OutOfRangePayload, Result,
};

/// Stopped — the cpal stream is built but not playing (Swift's
/// `!isStreaming && !isPlaying`).
///
/// This is the value `SharedState::new` stores in the shared
/// `AtomicU8` state, i.e. the state every freshly built player
/// starts in.
pub const STATE_STOPPED: u8 = 0;
/// Running — the cpal stream is `play()`ing and producer writes are
/// accepted (Swift's `isStreaming && isPlaying`).
pub const STATE_RUNNING: u8 = 1;
/// Paused — the cpal stream is `pause()`d but the queue retains its
/// contents (Swift's `playerNode.pause()` branch of `pause()`).
pub const STATE_PAUSED: u8 = 2;

/// Poll granularity for [`AudioPlayer::flush`]: 10 ms between
/// successive checks while waiting for the queue to drain.
///
/// NOTE(review): this comment previously said the value was picked to
/// match Swift's `Timer.scheduledTimer(withTimeInterval: 0.1, ...)`
/// poll cadence. That Swift timer fires every 100 ms, which is ten
/// times coarser than the 10 ms configured here — confirm whether
/// the finer interval is deliberate or the constant should be
/// `from_millis(100)`.
const FLUSH_POLL_INTERVAL: Duration = Duration::from_millis(10);

/// Default [`AudioPlayer::flush`] timeout (30 seconds). Defensive cap
/// so a stalled cpal device doesn't block the producer forever; long
/// enough that a realistic 4-second queue (the [`PlaybackConfig`]
/// default) can drain at real-time playback speeds with safety margin.
const FLUSH_TIMEOUT: Duration = Duration::from_secs(30);

/// Maximum number of f32 samples the producer extends into the
/// shared queue per lock acquisition. Bounds how long the producer
/// holds the queue mutex for a single chunk (~microseconds at 4096
/// samples on a modern CPU), and therefore how long the cpal
/// callback's `try_lock` can keep failing if it races the producer —
/// so the callback's silence-on-contention fallback never lingers
/// across multiple device periods.
///
/// Picked at 4096 f32 samples. The queue holds interleaved samples,
/// so that is ~170 ms of audio at 24 kHz mono and ~43 ms at 48 kHz
/// stereo (2048 frames) — larger than any realistic cpal callback
/// buffer (typically 64–1024 frames) but small enough that the
/// producer's per-chunk `extend` finishes well inside one device
/// period.
pub const WRITE_CHUNK_MAX: usize = 4096;

/// Map an arbitrary caller-supplied gain value onto the supported
/// `[0.0, 1.0]` volume range.
///
/// Exposed as a free function so the policy is testable without
/// building an [`AudioPlayer`] (construction opens a cpal device,
/// which CI hosts do not have). [`AudioPlayer::store_volume`] relies
/// on it.
///
/// Policy:
/// - **`NaN` and `±∞` become `0.0`.** `f32::clamp` hands a NaN input
///   straight back, and a NaN volume would turn every
///   `sample * volume` product in the cpal callback into NaN PCM
///   (audible as full-scale noise on most DACs), so non-finite input
///   is muted instead.
/// - **Finite values are clamped to `[0.0, 1.0]`**, the documented
///   range of `AVAudioPlayerNode.volume`.
#[must_use]
pub fn sanitize_volume(vol: f32) -> f32 {
  // Guard first: anything non-finite is treated as "mute".
  if !vol.is_finite() {
    return 0.0;
  }
  vol.clamp(0.0, 1.0)
}

/// Thread-shared callback context. Lives behind an `Arc` so the cpal
/// stream's callback (which gets a `'static` closure) and the
/// producer-side [`AudioPlayer`] can both read/write the same state.
///
/// Kept as a dedicated struct (rather than separate sibling `Arc<…>`
/// fields on `AudioPlayer`) so the `Drop` impl on [`AudioPlayer`]
/// can drop the cpal stream first (which joins the callback thread)
/// without an interleaved-drop hazard on the queue / state /
/// volume atomics.
struct SharedState {
  /// Producer-consumer queue of interleaved f32 samples. Bounded at
  /// `PlaybackConfig::queue_capacity_frames * channels` total
  /// samples; the cap is enforced in [`AudioPlayer::write_samples`].
  /// The backing storage is reserved up front by
  /// [`SharedState::new`].
  ///
  /// `Mutex` (not `parking_lot::Mutex`, not lock-free `ringbuf`) is
  /// chosen here because:
  /// - cpal's audio thread takes the lock for a single
  ///   `pop_front`-loop per callback (microseconds at typical 64-1024
  ///   frame callback buffers),
  /// - the producer holds the lock only across `extend` +
  ///   capacity-check arithmetic,
  /// - a future migration to `ringbuf` (one of the cpal docs'
  ///   recommended low-latency choices) is a local refactor behind
  ///   the same trait surface if profiling shows the lock matters.
  queue: Mutex<VecDeque<f32>>,
  /// Upper bound on the number of samples `queue` may hold at once
  /// (samples, not frames). Computed once from
  /// `PlaybackConfig::queue_capacity_frames * channels.count()` so
  /// the producer doesn't recompute it per `write_samples` call;
  /// also the amount of capacity [`SharedState::new`] reserves.
  queue_capacity_samples: usize,
  /// Current state — one of [`STATE_STOPPED`], [`STATE_RUNNING`],
  /// [`STATE_PAUSED`]; initialised to [`STATE_STOPPED`]. Loaded by
  /// the cpal callback on every invocation (single atomic load is
  /// the lightweight check that gates the pop loop); written by the
  /// producer (`start`, `pause`, `resume`, `stop`).
  state: AtomicU8,
  /// **One-way terminal latch**, set by [`AudioPlayer::stop`] and
  /// never cleared. Independent of [`SharedState::state`] (which is a
  /// tri-state playback flag the cpal callback gates on every
  /// invocation) because the terminal contract is asymmetric: once
  /// `stop()` returns, NO subsequent `start()` / `pause()` /
  /// `resume()` / `write_samples()` may "rehydrate" the player to a
  /// live state and accept further producer writes. Without this
  /// separate latch, `start()` storing `STATE_RUNNING` after `stop()`
  /// would mask the terminated condition from the producer-side
  /// `STATE_STOPPED` gate in `write_samples()` and let post-stop
  /// chunks accumulate + play on the re-started cpal stream — a
  /// silent violation of [`super::output_stream::AudioOutputStream::stop`]'s
  /// "MUST NOT silently accept post-stop writes" clause.
  terminated: AtomicBool,
  /// Current volume scalar, stored as `f32::to_bits` in an
  /// `AtomicU32` and decoded by [`SharedState::load_volume`]. Read
  /// by the cpal callback; written by [`AudioPlayer::store_volume`].
  /// Default is 1.0 (unity gain) — matches Swift's
  /// `AVAudioPlayerNode.volume` default.
  ///
  /// NOTE(review): this comment used to say the callback reads the
  /// volume "every sample", while the module docs say once per
  /// callback invocation — confirm against the callback body.
  volume_bits: AtomicU32,
  /// Captured first error from the cpal stream's `err_fn`. The
  /// callback can't bubble up `Result`, so we stash it here and
  /// surface it on the next producer call (`write_samples`, `flush`,
  /// `pause`, `resume`).
  ///
  /// `Mutex<Option<StreamError>>` — store the cpal error TYPED (not
  /// stringified) so `take_callback_error` can surface it as the typed
  /// [`crate::Error::ExternalOp`] variant with `cpal::StreamError` boxed
  /// into the source chain. If multiple device events fire we keep the
  /// first one. Reset to `None` by [`SharedState::stop_cleanup`],
  /// which [`AudioPlayer::stop`] calls.
  callback_error: Mutex<Option<StreamError>>,
}

impl SharedState {
  /// Construct the shared context with the sample queue's backing
  /// storage reserved up front for `queue_capacity_samples` entries.
  ///
  /// Reserving here — instead of letting the queue grow on the first
  /// producer write — means a producer that respects the cap never
  /// has to reallocate while holding the queue lock, so allocator
  /// time cannot stretch the window in which the cpal callback's
  /// `try_lock` fails.
  ///
  /// The remaining fields start at their idle defaults: state
  /// [`STATE_STOPPED`], terminal latch unset, volume `1.0`, and no
  /// captured callback error.
  ///
  /// # Errors
  /// - [`Error::AllocFailure`] when `try_reserve_exact` cannot obtain
  ///   the requested capacity (e.g. a tiny-RAM or fragmented-heap
  ///   host).
  fn new(queue_capacity_samples: usize) -> Result<Self> {
    let mut samples = VecDeque::new();
    let reservation = samples.try_reserve_exact(queue_capacity_samples);
    reservation.map_err(|reserve_err| {
      Error::AllocFailure(AllocFailurePayload::new(
        "AudioPlayer::with_device: pre-allocate queue capacity",
        "samples",
        queue_capacity_samples as u64,
        reserve_err,
      ))
    })?;

    let shared = Self {
      queue: Mutex::new(samples),
      queue_capacity_samples,
      state: AtomicU8::new(STATE_STOPPED),
      terminated: AtomicBool::new(false),
      volume_bits: AtomicU32::new(1.0_f32.to_bits()),
      callback_error: Mutex::new(None),
    };
    Ok(shared)
  }

  /// Decode the current volume scalar from its `AtomicU32` bit
  /// pattern using a relaxed load.
  #[inline(always)]
  fn load_volume(&self) -> f32 {
    let bits = self.volume_bits.load(Ordering::Relaxed);
    f32::from_bits(bits)
  }

  /// Empty the sample queue and forget any captured callback error,
  /// regardless of lock poisoning.
  ///
  /// Called by [`AudioPlayer::stop`] AFTER the latch + state writes
  /// and AFTER the cpal `Stream::pause()` attempt (whose result is
  /// captured separately) — see the comment on `stop()` for why this
  /// must run even when pause fails.
  ///
  /// Both locks are recovered with `into_inner` when poisoned: a
  /// panicked cpal callback could have poisoned either one, and stop
  /// MUST still be able to clear them so a later observer (e.g. the
  /// integration test's `buffer_depth()`) sees an empty queue and
  /// the post-stop producer-call gate doesn't surface a stale
  /// captured error.
  fn stop_cleanup(&self) {
    // Queue first; the guard is released at the end of this scope,
    // before the error slot's lock is taken.
    {
      let mut pending = self
        .queue
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
      pending.clear();
    }
    {
      let mut captured = self
        .callback_error
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner());
      *captured = None;
    }
  }
}

/// Cpal-backed device player.
///
/// See the module-level docs for the cpal-callback + buffer-queue
/// plumbing diagram and the explicit list of Swift-side capabilities
/// this player scopes out (input, file I/O, format conversion,
/// crossfade, per-buffer completions, `@Published` properties).
pub struct AudioPlayer {
  /// The cpal output stream. `None` only between
  /// [`AudioPlayer::stop`] + `Drop` (we tear the stream down on
  /// `stop` so we can rebuild a fresh one on a subsequent `start`,
  /// matching Swift's `startStreaming` ↔ `stopStreaming` lifecycle).
  ///
  /// The current implementation builds the stream once at
  /// construction time and re-uses it for the full lifetime of the
  /// player (cpal Streams support `play()` / `pause()`); we still
  /// keep this `Option` so `Drop` can take + drop it explicitly
  /// before the `SharedState` so the cpal callback thread is joined
  /// while the queue + atomics are still live.
  ///
  /// NOTE(review): the first paragraph ("tear the stream down on
  /// `stop` … rebuild on a subsequent `start`") disagrees with both
  /// the paragraph above and the one-way `terminated` latch
  /// documented on `SharedState`, which says a stopped player must
  /// not be restarted. It presumably describes an earlier design —
  /// confirm against `stop()` / `Drop` and remove whichever
  /// description is stale.
  ///
  /// `cpal::Stream` is `Send + Sync` (per cpal 0.17.x docs) so the
  /// `AudioPlayer` can cross thread boundaries — the playback pipeline
  /// can drive a player from any thread.
  stream: Option<Stream>,
  /// Shared callback + producer state. See [`SharedState`].
  shared: Arc<SharedState>,
  /// Stored config; consulted by [`AudioPlayer::config`] introspection
  /// + the [`AudioOutputStream`] impl.
  config: PlaybackConfig,
}

impl AudioPlayer {
  /// Build an [`AudioPlayer`] bound to the default output device on
  /// the default cpal host. Mirrors the Swift
  /// `AudioPlayer.startStreaming(sampleRate:)` entry point (which
  /// implicitly uses `AVAudioEngine`'s default output node).
  ///
  /// The cpal stream is **built but not started** — call
  /// [`AudioPlayer::start`] before pushing samples. This matches the
  /// Swift split between `startStreaming` (engine prep) and
  /// `playerNode.play()` (actual playback).
  ///
  /// # Errors
  /// - [`Error::ExternalOp`] if cpal has no default output device, or the cpal
  ///   stream build fails (CoreAudio init failure, unsupported sample rate, etc.).
  /// - [`Error::InvariantViolation`] / [`Error::OutOfRange`] if config validation
  ///   fails (e.g. zero channels, or a non-F32 sample format) — delegated to
  ///   [`AudioPlayer::with_device`] / [`PlaybackConfig::cpal_config`].
  pub fn new(config: PlaybackConfig) -> Result<Self> {
    let host = cpal::default_host();
    let device = host.default_output_device().ok_or_else(|| {
      Error::ExternalOp(ExternalOpPayload::new(
        "AudioPlayer: no default cpal output device available",
        "cpal default_output_device",
        std::io::Error::other("host returned None"),
      ))
    })?;
    Self::with_device(&device, config)
  }

  /// Build an [`AudioPlayer`] bound to an explicit cpal device.
  /// Useful when the caller has already enumerated cpal devices and
  /// wants to target a specific one (the Swift API has no direct
  /// analog — `AVAudioEngine` always uses the system default — but
  /// cpal's multi-device support is a natural extension here).
  ///
  /// # Errors
  /// - [`Error::InvariantViolation`] / [`Error::OutOfRange`] if [`PlaybackConfig::cpal_config`]
  ///   rejects the config (zero channels / unsupported format), or [`Error::ExternalOp`]
  ///   if the cpal stream build fails.
  pub fn with_device(device: &cpal::Device, config: PlaybackConfig) -> Result<Self> {
    if !matches!(config.sample_format(), SampleFormat::F32) {
      return Err(Error::OutOfRange(OutOfRangePayload::new(
        "AudioPlayer: sample_format (non-F32 device negotiation reserved for a follow-up)",
        "must be SampleFormat::F32",
        format_smolstr!("{:?}", config.sample_format()),
      )));
    }

    let stream_config = config.cpal_config()?;

    // Defer error construction (`.ok_or_else` not `.ok_or`) so the
    // success path doesn't pay the allocation; preserve both operands
    // (queue_capacity_frames, channels) via `with_operands` so the
    // diagnostic identifies which input drove the overflow.
    let queue_frames = config.queue_capacity_frames();
    let channels = usize::from(config.channels().count());
    let queue_capacity_samples = queue_frames.checked_mul(channels).ok_or_else(|| {
      Error::ArithmeticOverflow(ArithmeticOverflowPayload::with_operands(
        "AudioPlayer: queue_capacity_frames * channels",
        "usize",
        [
          ("queue_capacity_frames", queue_frames as u64),
          ("channels", channels as u64),
        ],
      ))
    })?;

    let shared = Arc::new(SharedState::new(queue_capacity_samples)?);

    // cpal callback (audio I/O thread). Pulls from the queue, scales
    // by current volume, writes silence on underrun. Cloned `Arc`
    // moved into the `'static` closure cpal requires.
    let cb_shared = Arc::clone(&shared);
    let data_callback = move |out: &mut [f32], _: &cpal::OutputCallbackInfo| {
      let state = cb_shared.state.load(Ordering::Acquire);
      if state != STATE_RUNNING {
        // Paused / stopped — emit silence. (Cpal pauses the
        // callback on `Stream::pause()`, but the producer may also
        // toggle our `state` flag; the dual gate is intentional.)
        for s in out.iter_mut() {
          *s = 0.0;
        }
        return;
      }
      let volume = cb_shared.load_volume();
      // Single short critical section: drain into the cpal buffer.
      // We don't hold the lock across `*s = ...` arithmetic outside
      // this scope.
      //
      // `try_lock` (not blocking `lock`): the callback runs on the
      // real-time audio I/O thread and MUST NOT block on a
      // producer-held mutex past the device callback deadline. On
      // contention, emit silence for this period — the cost is one
      // underrun, the alternative (blocking) is unbounded latency
      // that compounds across device periods. See the module-level
      // `## Concurrency` doc-comment for the full contract.
      let mut q = match cb_shared.queue.try_lock() {
        Ok(g) => g,
        Err(std::sync::TryLockError::WouldBlock) => {
          // Producer holds the lock; emit silence rather than block
          // past the device deadline.
          for s in out.iter_mut() {
            *s = 0.0;
          }
          return;
        }
        Err(std::sync::TryLockError::Poisoned(poisoned)) => poisoned.into_inner(),
      };
      let drain_n = out.len().min(q.len());
      for slot in out.iter_mut().take(drain_n) {
        // pop_front is O(1) for VecDeque; the loop is the cpal
        // equivalent of the Swift `AVAudioPCMBuffer` per-buffer copy.
        let sample = q.pop_front().unwrap_or(0.0);
        *slot = sample * volume;
      }
      // Drop the lock before zeroing the tail — silence-on-underrun
      // doesn't need the queue.
      drop(q);
      for slot in out.iter_mut().skip(drain_n) {
        *slot = 0.0;
      }
    };

    // cpal `err_fn`. Stash the first TYPED `StreamError`; surface it
    // on the next producer call as `Error::ExternalOp` so the original
    // cpal error chain is preserved (no `format!`-stringification —
    // callers branching on `payload.inner().downcast_ref::<StreamError>()`
    // can recover the original device-backend variant). We don't have a
    // logger dep in mlxrs, so silent capture is the chosen behavior
    // (the producer will see it).
    let err_shared = Arc::clone(&shared);
    let err_callback = move |err: StreamError| {
      let mut slot = match err_shared.callback_error.lock() {
        Ok(g) => g,
        Err(poisoned) => poisoned.into_inner(),
      };
      if slot.is_none() {
        *slot = Some(err);
      }
    };

    let stream = device
      .build_output_stream(&stream_config, data_callback, err_callback, None)
      .map_err(|e| {
        Error::ExternalOp(ExternalOpPayload::new(
          "AudioPlayer: cpal build_output_stream failed",
          "cpal stream config",
          e,
        ))
      })?;

    Ok(Self {
      stream: Some(stream),
      shared,
      config,
    })
  }

  /// The [`PlaybackConfig`] this player was constructed with (the value
  /// stored by [`AudioPlayer::with_device`]). Returned by value — the
  /// config is `Copy`, so callers get an independent copy.
  #[inline(always)]
  #[must_use]
  pub fn config(&self) -> PlaybackConfig {
    self.config
  }

  /// How many samples are currently waiting in the playback queue.
  ///
  /// This is the cpal equivalent of Swift's
  /// `queuedBuffers * buffer.frameLength` sum, counted in samples rather
  /// than frames. Takes the queue lock briefly; a poisoned lock is
  /// recovered and its length reported rather than treated as an error.
  #[inline(always)]
  #[must_use]
  pub fn buffer_depth(&self) -> usize {
    self
      .shared
      .queue
      .lock()
      .unwrap_or_else(|poisoned| poisoned.into_inner())
      .len()
  }

  /// Whether the player is currently in `STATE_RUNNING`, i.e.
  /// [`AudioPlayer::start`] has been called and neither
  /// [`AudioPlayer::pause`] nor [`AudioPlayer::stop`] has run since.
  /// Mirrors the Swift `isPlaying` getter on the streaming branch.
  #[inline(always)]
  #[must_use]
  pub fn is_running(&self) -> bool {
    let current = self.shared.state.load(Ordering::Acquire);
    current == STATE_RUNNING
  }

  /// Whether the player is currently in [`STATE_PAUSED`]: the cpal
  /// stream has been `pause()`d and the queue keeps its samples. This is
  /// the `playerNode.pause()` branch of Swift's `AudioPlayer.pause()`.
  #[inline(always)]
  #[must_use]
  pub fn is_paused(&self) -> bool {
    let current = self.shared.state.load(Ordering::Acquire);
    current == STATE_PAUSED
  }

  /// Current output volume, default 1.0. Mirrors
  /// `AVAudioPlayerNode.volume`.
  ///
  /// Reads the value through `SharedState::load_volume`, the same
  /// accessor the cpal data callback uses when scaling samples, so the
  /// returned value is what the callback will apply.
  #[inline(always)]
  #[must_use]
  pub fn volume(&self) -> f32 {
    self.shared.load_volume()
  }

  /// Atomically publish a new output volume.
  ///
  /// The input is first passed through `sanitize_volume`: values outside
  /// `[0.0, 1.0]` are clamped silently (matching the documented
  /// `AVAudioPlayerNode.volume` 0..1 range), and **non-finite inputs
  /// (NaN, ±∞) become `0.0`** instead of being propagated. A plain
  /// `f32::clamp` would keep NaN, and the callback's `sample * volume`
  /// would then emit NaN samples (audible as full-scale noise on most
  /// DACs); the silent safe default matches the unsigned-clamp idiom
  /// used elsewhere in mlxrs for user-supplied scalars.
  ///
  /// The name is `store_volume` rather than `set_volume` to signal
  /// atomic-write semantics: this is a side effect on shared player
  /// state, not a fluent builder or a `&mut self` field setter.
  ///
  /// It takes `&self` so the volume can be adjusted without needing the
  /// exclusive borrow that [`AudioPlayer::write_samples`] requires —
  /// useful when a UI thread tweaks volume while a worker thread is
  /// pumping samples.
  pub fn store_volume(&self, vol: f32) {
    // The volume travels as raw `f32` bits inside an atomic integer.
    let bits = sanitize_volume(vol).to_bits();
    self.shared.volume_bits.store(bits, Ordering::Release);
  }

  /// Start the cpal stream — samples written via
  /// [`AudioPlayer::write_samples`] start flowing to the device.
  /// Mirrors the Swift `playerNode.play()` call inside
  /// `startStreaming`.
  ///
  /// Idempotent: calling `start` on a running player is a no-op
  /// (returns `Ok(())`). Calling `start` after `pause` resumes
  /// playback (equivalent to [`AudioPlayer::resume`]).
  ///
  /// **Terminal-state contract.** [`AudioPlayer::stop`] is a
  /// one-way terminal transition; once `stop()` returns, `start()`
  /// rejects with [`Error::InvariantViolation`] ("...called on terminated
  /// player...") rather than re-arming the producer surface. The
  /// caller MUST construct a fresh [`AudioPlayer`] to resume
  /// playback. The cpal stream is preserved across `stop()` only so
  /// `Drop` can join the I/O thread cleanly — it is NOT a hook for
  /// restarting the producer pipeline.
  ///
  /// # Errors
  /// - [`Error::InvariantViolation`] if [`AudioPlayer::stop`] has already been
  ///   called on this player (one-way terminal latch) or the stream is dropped.
  /// - [`Error::ExternalOp`] if the cpal `Stream::play()` call fails.
  pub fn start(&mut self) -> Result<()> {
    // One-way terminal latch. Checked FIRST so a post-stop
    // `start()` doesn't re-arm `state = STATE_RUNNING` and let
    // subsequent `write_samples()` slip past its `STATE_STOPPED`
    // gate. Acquire-load pairs with the Release-store in `stop()`.
    if self.shared.terminated.load(Ordering::Acquire) {
      return Err(Error::InvariantViolation(InvariantViolationPayload::new(
        "AudioPlayer::start",
        "called on terminated player — construct a new AudioPlayer",
      )));
    }
    self.take_callback_error()?;
    let stream = self.stream.as_ref().ok_or_else(|| {
      Error::InvariantViolation(InvariantViolationPayload::new(
        "AudioPlayer::start",
        "stream has been dropped (post-stop)",
      ))
    })?;
    stream.play().map_err(|e| {
      Error::ExternalOp(ExternalOpPayload::new(
        "AudioPlayer::start: cpal play() failed",
        "cpal stream",
        e,
      ))
    })?;
    self.shared.state.store(STATE_RUNNING, Ordering::Release);
    Ok(())
  }

  /// Pause playback. The cpal stream is `pause()`d and the queue
  /// retains its samples; subsequent [`AudioPlayer::write_samples`]
  /// calls still buffer into the queue but no audio is emitted.
  /// Mirrors `MLXAudioCore.AudioPlayer.pause()` (streaming branch).
  ///
  /// **Terminal-state contract.** Rejects on a terminated player
  /// (see [`AudioPlayer::start`] / [`AudioPlayer::stop`]).
  ///
  /// # Errors
  /// - [`Error::InvariantViolation`] if [`AudioPlayer::stop`] has already been
  ///   called on this player (one-way terminal latch).
  /// - [`Error::ExternalOp`] if the cpal `Stream::pause()` call fails.
  pub fn pause(&mut self) -> Result<()> {
    if self.shared.terminated.load(Ordering::Acquire) {
      return Err(Error::InvariantViolation(InvariantViolationPayload::new(
        "AudioPlayer::pause",
        "called on terminated player — construct a new AudioPlayer",
      )));
    }
    self.take_callback_error()?;
    let stream = self.stream.as_ref().ok_or_else(|| {
      Error::InvariantViolation(InvariantViolationPayload::new(
        "AudioPlayer::pause",
        "stream has been dropped (post-stop)",
      ))
    })?;
    stream.pause().map_err(|e| {
      Error::ExternalOp(ExternalOpPayload::new(
        "AudioPlayer::pause: cpal pause() failed",
        "cpal stream",
        e,
      ))
    })?;
    self.shared.state.store(STATE_PAUSED, Ordering::Release);
    Ok(())
  }

  /// Resume from [`AudioPlayer::pause`]. Mirrors Swift's
  /// `togglePlayPause()` resuming branch (`playerNode.play()` +
  /// `isPlaying = true`).
  ///
  /// **Terminal-state contract.** Rejects on a terminated player
  /// (see [`AudioPlayer::start`] / [`AudioPlayer::stop`]). The
  /// dedicated `resume`-named error message keeps the call-site
  /// signal clear (the producer that called `resume()` after a
  /// stop got the same one-way-latch rejection `start()` would
  /// have surfaced).
  ///
  /// # Errors
  /// - [`Error::InvariantViolation`] if [`AudioPlayer::stop`] has already been
  ///   called on this player (one-way terminal latch).
  /// - [`Error::ExternalOp`] if the cpal `Stream::play()` call fails.
  pub fn resume(&mut self) -> Result<()> {
    if self.shared.terminated.load(Ordering::Acquire) {
      return Err(Error::InvariantViolation(InvariantViolationPayload::new(
        "AudioPlayer::resume",
        "called on terminated player — construct a new AudioPlayer",
      )));
    }
    self.start()
  }

  /// Halt playback immediately: latch the player as terminated, pause
  /// the cpal stream, discard every queued sample, and clear any
  /// captured callback error. Mirrors `stopStreaming()`.
  ///
  /// **Terminal-state contract.** `stop()` is a **one-way terminal
  /// transition**: after it returns, every subsequent producer-side
  /// method ([`AudioPlayer::start`], [`AudioPlayer::pause`],
  /// [`AudioPlayer::resume`], [`AudioPlayer::write_samples`])
  /// rejects with [`Error::InvariantViolation`] containing
  /// "terminated"/"after stop()", so late producer chunks cannot
  /// accumulate silently and replay on a later `start()` — honoring
  /// the [`super::output_stream::AudioOutputStream::stop`] contract.
  /// Those methods check the latch (`SharedState::terminated`) BEFORE
  /// `state`, so a `start(); stop(); start();` sequence cannot re-arm
  /// `state = STATE_RUNNING` and slip post-stop writes past the
  /// producer gate.
  ///
  /// The cpal stream is kept across `stop()` only so `Drop` can shut
  /// the I/O thread down cleanly — it is NOT a restart hook. To play
  /// again the caller MUST construct a fresh [`AudioPlayer`]. Use
  /// [`AudioPlayer::pause`] for a reversible halt (pause-state writes
  /// still buffer for [`AudioPlayer::resume`]).
  ///
  /// `stop()` is idempotent and ALWAYS moves the player to the
  /// terminated state: the latch is set first, so even when the
  /// underlying cpal `Stream::pause()` fails, re-entry on a
  /// half-stopped player is consistently rejected and the queue +
  /// error slot have still been cleared.
  ///
  /// # Errors
  /// - [`Error::ExternalOp`] if the cpal `Stream::pause()` call fails.
  ///   The terminal latch is already set and cleanup has already run by
  ///   the time this error is returned, so subsequent producer calls
  ///   still reject correctly.
  pub fn stop(&mut self) -> Result<()> {
    // Step 1 — latch, unconditionally and before anything fallible.
    // These Release stores pair with the Acquire loads in `start` /
    // `pause` / `resume` / `write_samples`.
    self.shared.terminated.store(true, Ordering::Release);
    self.shared.state.store(STATE_STOPPED, Ordering::Release);

    // Step 2 — ask cpal to pause, but hold on to the outcome instead of
    // returning early with `?`: an early return would skip the cleanup
    // in step 3 and leave queued samples behind on a terminated player.
    let pause_outcome = match self.stream.as_ref() {
      Some(stream) => stream.pause().map_err(|e| {
        Error::ExternalOp(ExternalOpPayload::new(
          "AudioPlayer::stop: cpal pause() failed",
          "cpal stream",
          e,
        ))
      }),
      None => Ok(()),
    };

    // Step 3 — clear the queue and the captured-error slot no matter
    // what step 2 returned. `stop_cleanup` recovers poisoned locks, so
    // a panicked callback cannot leave stale samples / errors behind.
    self.shared.stop_cleanup();

    // Step 4 — only now report the pause failure, if there was one.
    pause_outcome
  }

  /// Push interleaved PCM samples into the playback queue. Returns
  /// the number of samples accepted (`= samples.len()` on success).
  ///
  /// Surfaces a pending callback error (cpal `err_fn` capture) if
  /// one is queued — the next producer call after a device error
  /// receives the error report.
  ///
  /// Internally splits `samples` into [`WRITE_CHUNK_MAX`]-sized
  /// inner-loop chunks, each taking the queue lock for its own
  /// `extend`. This bounds the duration the audio callback's
  /// `try_lock` would have to wait on the producer to ~one chunk's
  /// extend (microseconds), so a multi-second producer payload
  /// can't stall the cpal callback past the device deadline. See
  /// the module-level `## Concurrency` doc-comment.
  ///
  /// # Errors
  /// - [`Error::InvariantViolation`] if [`AudioPlayer::stop`] has been called
  ///   on this player (one-way terminal latch — writes after stop are rejected)
  ///   or if the player is in `STATE_STOPPED` (call `start()` first).
  /// - [`Error::CapExceeded`] if the queue would overflow
  ///   [`PlaybackConfig::queue_capacity_frames`] × channel count.
  ///   The write is rejected wholesale — no partial accept on
  ///   overflow.
  /// - [`Error::ExternalOp`] if a prior cpal callback error was captured.
  pub fn write_samples(&mut self, samples: &[f32]) -> Result<usize> {
    // One-way terminal latch FIRST, before reading `state`.
    // A naive `state == STATE_STOPPED` gate is insufficient because
    // `start()` unconditionally re-arms `state = STATE_RUNNING`; the
    // sequence `start(); stop(); start(); write_samples(...)` would
    // otherwise slip past the gate and silently replay post-stop
    // chunks on the re-armed stream. Acquire-load pairs with the
    // Release-store in `stop()`.
    if self.shared.terminated.load(Ordering::Acquire) {
      return Err(Error::InvariantViolation(InvariantViolationPayload::new(
        "AudioPlayer::write_samples",
        "called after stop() — player is terminated",
      )));
    }
    self.take_callback_error()?;

    // Pre-`start()` writes are also rejected — the cpal stream isn't
    // play()ing yet, so accumulating samples here would replay on
    // the first `start()` rather than start cleanly.
    let state = self.shared.state.load(Ordering::Acquire);
    if state == STATE_STOPPED {
      return Err(Error::InvariantViolation(InvariantViolationPayload::new(
        "AudioPlayer::write_samples",
        "called on stopped player (STATE_STOPPED); call start() before writing samples",
      )));
    }

    // Whole-payload overflow check up front (one lock acquisition).
    // Chunking is purely a concurrency-shaping concern; it must not
    // weaken the cap.
    {
      let q = match self.shared.queue.lock() {
        Ok(g) => g,
        Err(poisoned) => poisoned.into_inner(),
      };
      let q_len = q.len();
      let s_len = samples.len();
      let projected_len = q_len.checked_add(s_len).ok_or_else(|| {
        Error::ArithmeticOverflow(ArithmeticOverflowPayload::with_operands(
          "AudioPlayer::write_samples: queue length + new samples",
          "usize",
          [("queue_len", q_len as u64), ("samples_len", s_len as u64)],
        ))
      })?;
      if projected_len > self.shared.queue_capacity_samples {
        return Err(Error::CapExceeded(CapExceededPayload::new(
          "AudioPlayer::write_samples: queue overflow",
          "queue_capacity_samples",
          self.shared.queue_capacity_samples as u64,
          projected_len as u64,
        )));
      }
    }

    // Per-chunk lock acquisition with NO per-chunk
    // `reserve_exact`. The queue is pre-allocated to
    // `queue_capacity_samples` at construction (see
    // `SharedState::new` via `try_reserve_exact`), and the
    // whole-payload overflow check above guarantees
    // `q.len() + samples.len() <= queue_capacity_samples` — so
    // `VecDeque::extend` here is a pure O(chunk.len()) memcpy with
    // NO realloc possible. Allocator time CANNOT inflate the lock
    // window and the cpal callback's `try_lock` window stays
    // bounded by the per-chunk `extend` duration (microseconds at
    // WRITE_CHUNK_MAX = 4096 samples).
    for chunk in samples.chunks(WRITE_CHUNK_MAX) {
      let mut q = match self.shared.queue.lock() {
        Ok(g) => g,
        Err(poisoned) => poisoned.into_inner(),
      };
      q.extend(chunk.iter().copied());
    }
    Ok(samples.len())
  }

  /// Wait for the playback queue to drain. The cpal callback keeps
  /// consuming samples while this method polls; it returns `Ok(())` as
  /// soon as [`AudioPlayer::buffer_depth`] reports zero. Mirrors Swift's
  /// `finishStreamingInput()` → `finishStreamIfDrained()` path.
  ///
  /// The wait is a bounded poll loop: sleep `FLUSH_POLL_INTERVAL`
  /// between checks and give up once `FLUSH_TIMEOUT` has elapsed. cpal
  /// has no per-buffer-completion hook, so there is nothing to park a
  /// condvar on; the timeout prevents an indefinite block on a stalled
  /// device.
  ///
  /// If samples are queued but the player is not [`STATE_RUNNING`]
  /// (stopped or paused), the method fails immediately with
  /// [`Error::OutOfRange`] — the callback does not drain unless
  /// running, so waiting could only end in the timeout. An empty queue
  /// returns `Ok(())` regardless of state.
  ///
  /// # Errors
  /// - [`Error::OutOfRange`] if the flush times out or the player is not
  ///   running with a non-empty queue.
  /// - [`Error::ExternalOp`] if a cpal callback error was pending on entry
  ///   or surfaced mid-drain.
  pub fn flush(&mut self) -> Result<()> {
    self.take_callback_error()?;

    let began = Instant::now();
    let mut depth = self.buffer_depth();
    while depth != 0 {
      // A non-running player will never drain — fail fast.
      let state = self.shared.state.load(Ordering::Acquire);
      if state != STATE_RUNNING {
        return Err(Error::OutOfRange(OutOfRangePayload::new(
          "AudioPlayer::flush: player state with samples queued (call start() before flush())",
          "must be STATE_RUNNING when queue is non-empty",
          format_smolstr!("state={state}, depth={depth}"),
        )));
      }

      if began.elapsed() > FLUSH_TIMEOUT {
        return Err(Error::OutOfRange(OutOfRangePayload::new(
          "AudioPlayer::flush: drain timeout (samples still queued past FLUSH_TIMEOUT)",
          "FLUSH_TIMEOUT must elapse with queue depth reaching 0",
          format_smolstr!("FLUSH_TIMEOUT={FLUSH_TIMEOUT:?}, depth={depth}"),
        )));
      }

      thread::sleep(FLUSH_POLL_INTERVAL);
      // A device error during the wait aborts the flush.
      self.take_callback_error()?;
      depth = self.buffer_depth();
    }
    Ok(())
  }

  /// Pull the captured cpal `err_fn` message (if any) and surface it
  /// as a [`Error::ExternalOp`]. Called at the head of every public
  /// producer method.
  fn take_callback_error(&self) -> Result<()> {
    let mut slot = match self.shared.callback_error.lock() {
      Ok(g) => g,
      Err(poisoned) => poisoned.into_inner(),
    };
    if let Some(err) = slot.take() {
      return Err(Error::ExternalOp(ExternalOpPayload::new(
        "AudioPlayer: cpal stream callback (async) error",
        "cpal stream",
        err,
      )));
    }
    Ok(())
  }
}

/// [`AudioOutputStream`] adapter for [`AudioPlayer`].
///
/// Every trait method forwards to the inherent method of the same name.
/// The calls are spelled `AudioPlayer::method(self, ..)` on purpose: a
/// type-qualified path resolves to the inherent associated function
/// ahead of the trait method, so each call lands on the inherent
/// implementation rather than recursing back into this impl.
impl AudioOutputStream for AudioPlayer {
  // Forwards to the inherent `AudioPlayer::write_samples`.
  fn write_samples(&mut self, samples: &[f32]) -> Result<usize> {
    AudioPlayer::write_samples(self, samples)
  }

  // Forwards to the inherent `AudioPlayer::flush`.
  fn flush(&mut self) -> Result<()> {
    AudioPlayer::flush(self)
  }

  // Forwards to the inherent `AudioPlayer::stop` (one-way terminal latch).
  fn stop(&mut self) -> Result<()> {
    AudioPlayer::stop(self)
  }

  // Forwards to the inherent `AudioPlayer::is_running`.
  fn is_running(&self) -> bool {
    AudioPlayer::is_running(self)
  }
}

/// Teardown for [`AudioPlayer`]: publish `STATE_STOPPED`, then pause and
/// drop the owned stream (if it is still present).
impl Drop for AudioPlayer {
  fn drop(&mut self) {
    // Publish STOPPED before touching the stream, so that a callback
    // invocation that still happens is meant to observe the stopped state
    // rather than keep draining.
    self.shared.state.store(STATE_STOPPED, Ordering::Release);

    // Take the stream out of `self` and drop it here, explicitly and ahead
    // of the remaining fields. The intent (per the original author's note
    // on `cpal::Stream`'s `Drop`) is that the data callback is gone before
    // `SharedState` can be torn down.
    if let Some(live_stream) = self.stream.take() {
      // Best-effort: a pause failure is deliberately ignored because the
      // stream is dropped on the next line regardless. The original note
      // says pausing first saves one extra callback hit on macOS CoreAudio.
      let _ = live_stream.pause();
      drop(live_stream);
    }
  }
}

#[cfg(test)]
mod tests {
  //! In-crate unit tests for `AudioPlayer` invariants that need
  //! access to private state (`SharedState::queue`,
  //! `SharedState::queue_capacity_samples`, `SharedState::terminated`,
  //! `SharedState::callback_error`).
  //!
  //! These were previously reachable from `tests/audio_playback.rs`
  //! via `pub #[doc(hidden)] _test_*` accessors on `AudioPlayer`,
  //! which leaked into the crate's public API surface (downstream
  //! crates could call them in release builds, and removing them
  //! later would be a SemVer break). Moving the tests here keeps the
  //! private invariant under in-crate test coverage without any
  //! `pub` accessor.
  //!
  //! Device-touching tests (those that call `AudioPlayer::new`,
  //! which opens a cpal output stream) remain
  //! `#[ignore = "requires real default audio output device"]` so CI
  //! without an audio device still passes — they run locally via
  //! `cargo test --features audio -- --ignored`.
  use super::*;
  use crate::audio::playback::config::{ChannelLayout, PlaybackConfig, SampleFormat};

  /// `SharedState::stop_cleanup` must empty a non-empty queue and reset a
  /// captured callback error to `None`, however it was reached. This is the
  /// cleanup step `AudioPlayer::stop` is documented to run AFTER capturing
  /// the cpal pause result, so it still runs when pause fails (the `Result`
  /// is returned at the end, not via `?`).
  ///
  /// The test builds a bare `SharedState` — no cpal stream — so it runs in
  /// CI without an audio device. This "struct mock" route is used because
  /// injecting a genuine cpal pause failure is impractical: per the original
  /// note, `Stream::pause()` on macOS CoreAudio does not fail on a healthy
  /// device and there is no public hook for swapping in a faulty stream.
  #[test]
  fn shared_state_stop_cleanup_drains_queue_and_clears_error_unconditionally() {
    let fixture = SharedState::new(4096).expect("pre-allocate test queue");

    // Seed both slots so the effect of the cleanup is observable. Each
    // guard is a temporary and is released at the end of its statement.
    fixture.queue.lock().unwrap().extend([0.1_f32, 0.2, 0.3, 0.4]);
    // The slot holds an `Option<StreamError>`, so a real `StreamError`
    // variant is stored.
    *fixture.callback_error.lock().unwrap() = Some(StreamError::DeviceNotAvailable);

    // Same ordering as `AudioPlayer::stop`: latch, then state, then the
    // unconditional cleanup (stop() captures the pause result in between).
    // Only the cleanup step is under test here.
    fixture.terminated.store(true, Ordering::Release);
    fixture.state.store(STATE_STOPPED, Ordering::Release);
    fixture.stop_cleanup();

    // Snapshot everything the assertions below look at.
    let latched = fixture.terminated.load(Ordering::Acquire);
    let state_after = fixture.state.load(Ordering::Acquire);
    let samples_left = fixture.queue.lock().unwrap().len();
    let error_cleared = fixture.callback_error.lock().unwrap().is_none();

    // The latch and state were written before cleanup and must survive it.
    assert!(
      latched,
      "terminated latch must be set (stop ordering: latch -> state -> cleanup)"
    );
    assert_eq!(
      state_after,
      STATE_STOPPED,
      "state must be STOPPED post-stop"
    );

    // Both slots must be empty afterwards; these are what the producer-side
    // gates read (`buffer_depth()` and `take_callback_error()`).
    assert_eq!(
      samples_left,
      0,
      "stop_cleanup must drain the queue unconditionally (this branch \
       runs even when cpal pause errs — an early `?` on pause would \
       skip this and leave samples lingering until Drop)"
    );
    assert!(
      error_cleared,
      "stop_cleanup must clear captured callback_error unconditionally"
    );
  }

  /// Corollary: `stop_cleanup` is also poison-safe — a panicked callback
  /// that poisoned the queue or callback_error lock must not prevent stop
  /// from draining.
  ///
  /// Poisoning is simulated by threads that panic while holding each lock.
  /// Each thread first writes data through its guard, so the locks are
  /// poisoned AND non-empty. The test then checks both halves of the
  /// contract: `stop_cleanup` does not panic, and it actually empties the
  /// queue and the error slot behind the poisoned locks.
  #[test]
  fn shared_state_stop_cleanup_recovers_from_poisoned_locks() {
    use std::{panic, sync::Arc};

    let shared = Arc::new(SharedState::new(64).expect("pre-allocate test queue"));

    // Poison the queue lock from another thread by panicking while
    // holding it. Samples are queued first so the post-cleanup drain
    // assertion is meaningful. The Mutex transitions to Poisoned state;
    // subsequent `lock()` returns `Err(PoisonError)`.
    let queue_poisoner = Arc::clone(&shared);
    let _ = std::thread::spawn(move || {
      let mut g = queue_poisoner.queue.lock().unwrap();
      g.extend([0.1_f32, 0.2, 0.3, 0.4]);
      panic!("simulated callback panic poisoning queue lock");
    })
    .join();

    // Poison the callback_error lock similarly, leaving an error behind.
    let err_poisoner = Arc::clone(&shared);
    let _ = std::thread::spawn(move || {
      let mut g = err_poisoner.callback_error.lock().unwrap();
      *g = Some(StreamError::DeviceNotAvailable);
      panic!("simulated callback panic poisoning callback_error lock");
    })
    .join();

    assert!(
      shared.queue.is_poisoned(),
      "test setup: queue lock should be poisoned"
    );
    assert!(
      shared.callback_error.is_poisoned(),
      "test setup: callback_error lock should be poisoned"
    );

    // Now exercise stop_cleanup: it MUST NOT propagate the
    // PoisonError (no `unwrap`) and MUST still drain.
    let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
      shared.stop_cleanup();
    }));
    assert!(
      result.is_ok(),
      "stop_cleanup must not panic on poisoned locks (poison-recover via into_inner)"
    );

    // Verify the drain itself. The locks may still report poisoned, so
    // read through the recovered guard instead of unwrapping.
    let samples_left = match shared.queue.lock() {
      Ok(g) => g.len(),
      Err(poisoned) => poisoned.into_inner().len(),
    };
    assert_eq!(
      samples_left, 0,
      "stop_cleanup must drain the queue even when the queue lock is poisoned"
    );
    let error_cleared = match shared.callback_error.lock() {
      Ok(g) => g.is_none(),
      Err(poisoned) => poisoned.into_inner().is_none(),
    };
    assert!(
      error_cleared,
      "stop_cleanup must clear callback_error even when its lock is poisoned"
    );
  }

  /// The queue is pre-allocated to its full `queue_capacity_samples` bound
  /// at construction time, NOT on the first producer write. The capacity
  /// check is `>=` because `try_reserve_exact` only promises AT LEAST the
  /// requested additional capacity.
  ///
  /// In-crate so it can read the private `shared.queue` capacity and
  /// `shared.queue_capacity_samples` directly, with no `pub _test_*`
  /// accessor. Device-gated: `AudioPlayer::new` opens a cpal output stream.
  #[cfg(target_os = "macos")]
  #[test]
  #[ignore = "requires real default audio output device"]
  fn audio_player_pre_allocates_queue_capacity_at_construction() {
    let player = AudioPlayer::new(
      PlaybackConfig::new(16_000, ChannelLayout::Mono, SampleFormat::F32)
        .with_queue_capacity_frames(4096),
    )
    .unwrap();

    // Nothing has been written yet.
    assert_eq!(player.buffer_depth(), 0);

    // Mono, so the sample bound equals the frame bound.
    let cap_samples = player.shared.queue_capacity_samples;
    assert_eq!(
      cap_samples, 4096,
      "queue cap = frames * channels = 4096 * 1"
    );

    // Read the real allocation, recovering the guard if the lock is poisoned.
    let underlying = player
      .shared
      .queue
      .lock()
      .unwrap_or_else(|poisoned| poisoned.into_inner())
      .capacity();
    assert!(
      underlying >= cap_samples,
      "VecDeque underlying capacity ({underlying}) must be >= bounded cap ({cap_samples}) per \
       try_reserve_exact contract"
    );
  }

  /// A producer-side `write_samples` must leave the queue's underlying
  /// capacity unchanged. The bound is reserved at construction, so a write
  /// that fits under it should not reallocate while the producer holds the
  /// lock (keeping allocator time out of the window the cpal callback's
  /// `try_lock` competes with).
  #[cfg(target_os = "macos")]
  #[test]
  #[ignore = "requires real default audio output device"]
  fn audio_player_write_samples_does_not_grow_queue_capacity_during_playback() {
    // Capacity of the private queue, tolerating a poisoned lock. Takes the
    // player as an argument so it holds no borrow across `write_samples`.
    let queue_capacity = |p: &AudioPlayer| match p.shared.queue.lock() {
      Ok(guard) => guard.capacity(),
      Err(poisoned) => poisoned.into_inner().capacity(),
    };

    let mut player = AudioPlayer::new(
      PlaybackConfig::new(16_000, ChannelLayout::Mono, SampleFormat::F32)
        .with_queue_capacity_frames(4096),
    )
    .unwrap();
    player.start().unwrap();
    // Pause right away so the callback does not drain the samples pushed
    // below and race with the post-write capacity read.
    player.pause().unwrap();

    let cap_before = queue_capacity(&player);
    let cap_samples = player.shared.queue_capacity_samples;

    // 1024 samples is well under the 4096 bound, so no growth is expected.
    player.write_samples(&[0.25_f32; 1024]).unwrap();

    let cap_after = queue_capacity(&player);
    assert_eq!(
      cap_after, cap_before,
      "queue capacity grew during write_samples (before={cap_before}, after={cap_after}) — \
       producer-loop `extend` must not realloc because the queue is pre-allocated to \
       queue_capacity_samples ({cap_samples}) at construction"
    );

    let _ = player.stop();
  }

  // ── SharedState::new — error + success field invariants ────────────────
  //
  // Device-free: `SharedState::new` is the queue-preallocation step
  // `AudioPlayer::with_device` runs BEFORE building the cpal stream, so
  // both its `try_reserve_exact` error branch and its success-path field
  // initialization are exercisable without opening an audio device.

  /// `SharedState::new` must report a failed `try_reserve_exact` as a typed
  /// [`Error::AllocFailure`] instead of panicking or aborting.
  ///
  /// A requested capacity of `usize::MAX` overflows the byte-size
  /// computation (`usize::MAX * size_of::<f32>()`) inside
  /// `VecDeque::try_reserve_exact`, which returns a `CapacityOverflow`
  /// `TryReserveError` before any allocation is attempted. That makes the
  /// test deterministic on any host.
  ///
  /// The expected context / item / count are written out as literals here
  /// rather than derived from the function under test.
  #[test]
  fn shared_state_new_allocfailure_on_overflowing_capacity() {
    let failure = SharedState::new(usize::MAX)
      .err()
      .expect("usize::MAX sample capacity must overflow try_reserve_exact -> AllocFailure");

    // Any other variant is a test failure; keep only the payload.
    let payload = match failure {
      Error::AllocFailure(p) => p,
      other => panic!("expected Error::AllocFailure, got {other:?}"),
    };

    assert_eq!(
      payload.context(),
      "AudioPlayer::with_device: pre-allocate queue capacity",
      "AllocFailure context must name the queue-preallocation site"
    );
    assert_eq!(
      payload.item(),
      "samples",
      "AllocFailure item must be \"samples\""
    );
    assert_eq!(
      payload.count(),
      usize::MAX as u64,
      "AllocFailure count must echo the overflowing capacity argument"
    );
  }

  /// On success, `SharedState::new` stores the bounded sample cap and
  /// leaves every other field at its documented default:
  ///
  /// - `state` = `STATE_STOPPED`
  /// - `terminated` = `false`
  /// - `volume_bits` decodes to `1.0` (unity gain; the original note ties
  ///   this to Swift's `AVAudioPlayerNode.volume` default)
  /// - `callback_error` = `None`
  /// - the queue is empty
  ///
  /// Expected values are the documented constants, not values read back
  /// from the constructor.
  #[test]
  fn shared_state_new_success_initializes_default_fields() {
    let fresh = SharedState::new(2048).expect("small bounded capacity reserves fine");

    assert_eq!(
      fresh.queue_capacity_samples, 2048,
      "queue_capacity_samples must be stored verbatim from the constructor argument"
    );

    let initial_state = fresh.state.load(Ordering::Acquire);
    assert_eq!(
      initial_state,
      STATE_STOPPED,
      "freshly-built SharedState must start STATE_STOPPED (built-but-not-playing)"
    );

    let already_terminated = fresh.terminated.load(Ordering::Acquire);
    assert!(
      !already_terminated,
      "terminated latch must start cleared (not yet stopped)"
    );

    // The gain is stored as raw f32 bits inside an atomic; decode it the
    // same way before comparing against the known constant 1.0.
    let default_gain = f32::from_bits(fresh.volume_bits.load(Ordering::Relaxed));
    assert_eq!(
      default_gain,
      1.0_f32,
      "volume_bits default must decode to unity gain (1.0)"
    );

    let no_error = fresh.callback_error.lock().unwrap().is_none();
    assert!(
      no_error,
      "callback_error must start empty (no captured device error)"
    );

    let queued = fresh.queue.lock().unwrap().len();
    assert_eq!(
      queued,
      0,
      "queue must start empty regardless of reserved capacity"
    );
  }

  /// `SharedState::load_volume` decodes the f32 bit pattern held in the
  /// `volume_bits` atomic. The test writes known values with a raw
  /// `volume_bits.store(..)` — a plain atomic write, not the function under
  /// test — and expects `load_volume` to return each one bit-exact.
  ///
  /// The probe values (`0.375 = 3/8` and `0.0`) are exactly representable
  /// as f32, so `==` is safe without an epsilon.
  #[test]
  fn shared_state_load_volume_decodes_stored_bits() {
    let shared = SharedState::new(16).expect("tiny capacity reserves fine");

    // Nothing stored yet: the default is unity gain.
    assert_eq!(
      shared.load_volume(),
      1.0_f32,
      "load_volume must decode the unity-gain default"
    );

    // Round-trip each probe through the raw atomic. The 0.0 case is the
    // silence gain (noted in the original as the sanitized sink value for
    // non-finite / negative volumes).
    let probes = [
      (
        0.375_f32,
        "load_volume must bit-exactly decode a stored f32 (0.375 is exactly representable)",
      ),
      (
        0.0_f32,
        "load_volume must decode a stored 0.0 (silence gain)",
      ),
    ];
    for (gain, why) in probes {
      shared.volume_bits.store(gain.to_bits(), Ordering::Relaxed);
      assert_eq!(shared.load_volume(), gain, "{why}");
    }
  }

  // ── Device-reachable producer-gate error paths ─────────────────────────
  //
  // These two error branches need a constructed `AudioPlayer` (hence a
  // real cpal output device) but ARE reachable on a healthy device — the
  // CI-skipped, locally-runnable `#[ignore]` gate matches the existing
  // real-device tests in this module. They cover the pre-`start()`
  // write rejection and the flush-while-not-RUNNING-with-queued-samples
  // rejection that the happy-path real-device tests don't hit.

  /// Calling `write_samples` before `start()` — state still
  /// `STATE_STOPPED`, terminated latch clear — must fail with an
  /// [`Error::InvariantViolation`] whose requirement names the stopped
  /// state. Per the original rationale, accepted pre-start writes would
  /// pile up and replay on the first `start()`.
  ///
  /// The expected context and requirement text are literals taken from the
  /// gate, independent of the call being tested.
  #[cfg(target_os = "macos")]
  #[test]
  #[ignore = "requires real default audio output device"]
  fn audio_player_write_before_start_rejected_state_stopped() {
    let mut player = AudioPlayer::new(
      PlaybackConfig::new(16_000, ChannelLayout::Mono, SampleFormat::F32)
        .with_queue_capacity_frames(1024),
    )
    .unwrap();

    // Deliberately no start(): with the latch clear, the STATE_STOPPED
    // branch (not the terminated branch) is the one expected to fire.
    let rejection = player.write_samples(&[0.5_f32; 16]).unwrap_err();
    let payload = match rejection {
      Error::InvariantViolation(p) => p,
      other => panic!("expected InvariantViolation (STATE_STOPPED), got {other:?}"),
    };

    assert_eq!(payload.context(), "AudioPlayer::write_samples");
    assert!(
      payload.requirement().contains("STATE_STOPPED"),
      "pre-start write rejection must name STATE_STOPPED, got: {}",
      payload.requirement()
    );

    let _ = player.stop();
  }

  /// `flush` on a paused player that still has samples queued must fail
  /// straight away with an [`Error::OutOfRange`] instead of polling until
  /// the flush timeout. The callback is described as draining only while
  /// `STATE_RUNNING`, so a paused queue would never reach depth 0.
  ///
  /// The non-RUNNING-with-queue branch is reached via `start(); pause();
  /// write_samples(...)`. Checks: the queue depth equals the known pushed
  /// length, and the error's requirement names `STATE_RUNNING`.
  #[cfg(target_os = "macos")]
  #[test]
  #[ignore = "requires real default audio output device"]
  fn audio_player_flush_on_paused_with_queued_samples_errs() {
    let mut player = AudioPlayer::new(
      PlaybackConfig::new(16_000, ChannelLayout::Mono, SampleFormat::F32)
        .with_queue_capacity_frames(1024),
    )
    .unwrap();
    player.start().unwrap();
    player.pause().unwrap();

    // While paused, writes are still buffered but nothing drains them.
    player.write_samples(&[0.25_f32; 64]).unwrap();
    assert_eq!(
      player.buffer_depth(),
      64,
      "paused player must retain the 64 pushed samples (callback drains only when RUNNING)"
    );

    let rejection = player.flush().unwrap_err();
    let payload = match rejection {
      Error::OutOfRange(p) => p,
      other => panic!("expected OutOfRange (flush on paused with queue), got {other:?}"),
    };
    assert!(
      payload.requirement().contains("STATE_RUNNING"),
      "flush-on-non-running requirement must name STATE_RUNNING, got: {}",
      payload.requirement()
    );

    let _ = player.stop();
  }
}