devalang_wasm/engine/audio/playback/
live.rs1use std::fs::File;
2use std::io::BufReader;
3use std::path::PathBuf;
4use std::sync::mpsc;
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use anyhow::{Context, Result, bail};
10use rodio::{Decoder, OutputStream, OutputStreamHandle, Sink};
11use tokio::task::spawn_blocking;
12use tokio::time::sleep;
13
14use crate::engine::audio::settings::{AudioBitDepth, AudioChannels, AudioFormat, ResampleQuality};
15use crate::tools::logger::Logger;
16
17#[derive(Clone)]
18pub struct LivePlaybackEngine {
19 inner: Arc<LivePlaybackInner>,
20}
21
22struct LivePlaybackInner {
23 logger: Arc<Logger>,
24 _stream: OutputStream,
25 handle: OutputStreamHandle,
26}
27
28impl LivePlaybackEngine {
29 pub fn new(logger: Arc<Logger>) -> Result<Self> {
30 let (stream, handle) =
31 OutputStream::try_default().context("failed to access default audio output stream")?;
32 Ok(Self {
33 inner: Arc::new(LivePlaybackInner {
34 logger,
35 _stream: stream,
36 handle,
37 }),
38 })
39 }
40
41 pub fn logger(&self) -> &Logger {
42 &self.inner.logger
43 }
44
45 fn handle(&self) -> &OutputStreamHandle {
46 &self.inner.handle
47 }
48
49 fn create_sink(&self, source: &LiveAudioSource) -> Result<Sink> {
50 create_sink_with_handle(self.handle(), source)
51 }
52
53 pub async fn play_once(&self, source: LiveAudioSource, volume: f32) -> Result<()> {
54 let volume_display = if volume == 0.0 {
55 " [MUTED]".to_string()
56 } else if volume < 1.0 {
57 format!(" [volume: {:.0}%]", volume * 100.0)
58 } else {
59 String::new()
60 };
61
62 self.logger().action(format!(
63 "Playing {} ({:?}, {}-bit, {} ch, {}, {} Hz, length {}){}",
64 source.path.display(),
65 source.format,
66 source.bit_depth.bits(),
67 source.channels.count(),
68 source.resample_quality,
69 source.sample_rate,
70 format_duration_short(source.length),
71 volume_display
72 ));
73 let sink = Arc::new(self.create_sink(&source)?);
74 sink.set_volume(volume);
75 let sink_clone = Arc::clone(&sink);
76 spawn_blocking(move || {
77 sink_clone.sleep_until_end();
78 })
79 .await
80 .context("audio playback worker panicked")?;
81 sink.stop();
82 self.logger().success("Playback completed.");
83 Ok(())
84 }
85
86 pub async fn start_live_session(
87 &self,
88 source: LiveAudioSource,
89 options: LivePlaybackOptions,
90 ) -> Result<LivePlaybackSession> {
91 let volume = options.volume();
92 let volume_display = if volume == 0.0 {
93 " [MUTED]".to_string()
94 } else if volume < 1.0 {
95 format!(" [volume: {:.0}%]", volume * 100.0)
96 } else {
97 String::new()
98 };
99
100 self.logger().action(format!(
101 "Starting live session from {} ({:?}, {}-bit, {} ch, {}, {} Hz, loop {}){}",
102 source.path.display(),
103 source.format,
104 source.bit_depth.bits(),
105 source.channels.count(),
106 source.resample_quality,
107 source.sample_rate,
108 format_duration_short(source.length),
109 volume_display
110 ));
111 let (tx, rx) = mpsc::channel();
112 let last_update = Arc::new(Mutex::new(Instant::now()));
113 let logger = Arc::clone(&self.inner.logger);
114 let handle_clone = self.handle().clone();
115 let options_clone = options.clone();
116 let source_clone = source.clone();
117 let last_update_for_thread = Arc::clone(&last_update);
118 let handle = thread::spawn(move || {
119 run_loop(
120 logger,
121 handle_clone,
122 source_clone,
123 options_clone,
124 rx,
125 last_update_for_thread,
126 )
127 });
128
129 Ok(LivePlaybackSession::new(
130 self.clone(),
131 tx,
132 handle,
133 last_update,
134 options,
135 ))
136 }
137}
138
139fn create_sink_with_handle(handle: &OutputStreamHandle, source: &LiveAudioSource) -> Result<Sink> {
140 let file = File::open(&source.path)
141 .with_context(|| format!("unable to open audio file: {}", source.path.display()))?;
142 let reader = BufReader::new(file);
143 let decoder = Decoder::new(reader)
144 .with_context(|| format!("failed to decode audio file: {}", source.path.display()))?;
145 let sink = Sink::try_new(handle).context("failed to create audio sink")?;
146 sink.append(decoder);
147 sink.set_volume(1.0);
148 Ok(sink)
149}
150
151fn run_loop(
152 logger: Arc<Logger>,
153 handle: OutputStreamHandle,
154 initial: LiveAudioSource,
155 options: LivePlaybackOptions,
156 rx: mpsc::Receiver<PlaybackCommand>,
157 last_update: Arc<Mutex<Instant>>,
158) -> Result<()> {
159 let mut current = initial;
160 let mut pending: Option<LiveAudioSource> = None;
161 let poll_interval = options.poll_interval().max(Duration::from_millis(25));
162
163 loop {
164 logger.watch(format!(
165 "Looping {} (~{})",
166 current.path.display(),
167 format_duration_short(current.length)
168 ));
169 if let Ok(mut guard) = last_update.lock() {
170 *guard = Instant::now();
171 }
172
173 let sink = match create_sink_with_handle(&handle, ¤t) {
174 Ok(sink) => {
175 sink.set_volume(options.volume());
176 Arc::new(sink)
177 }
178 Err(err) => {
179 logger.error(format!("Failed to prepare live buffer: {err}"));
180 match rx.recv() {
181 Ok(PlaybackCommand::Queue(next)) => {
182 pending = Some(next);
183 continue;
184 }
185 Ok(PlaybackCommand::Stop) | Err(_) => break,
186 }
187 }
188 };
189
190 let sink_clone = Arc::clone(&sink);
191 let wait_handle = thread::spawn(move || {
192 sink_clone.sleep_until_end();
193 });
194
195 let mut stop_requested = false;
196
197 loop {
198 if wait_handle.is_finished() {
199 let _ = wait_handle.join();
200 break;
201 }
202
203 match rx.recv_timeout(poll_interval) {
204 Ok(PlaybackCommand::Queue(next)) => {
205 pending = Some(next);
206 }
207 Ok(PlaybackCommand::Stop) => {
208 stop_requested = true;
209 sink.stop();
210 let _ = wait_handle.join();
211 break;
212 }
213 Err(mpsc::RecvTimeoutError::Timeout) => {
214 continue;
215 }
216 Err(mpsc::RecvTimeoutError::Disconnected) => {
217 stop_requested = true;
218 sink.stop();
219 let _ = wait_handle.join();
220 break;
221 }
222 }
223 }
224
225 if stop_requested {
226 break;
227 }
228
229 while let Ok(cmd) = rx.try_recv() {
230 match cmd {
231 PlaybackCommand::Queue(next) => pending = Some(next),
232 PlaybackCommand::Stop => {
233 stop_requested = true;
234 break;
235 }
236 }
237 }
238
239 if stop_requested {
240 break;
241 }
242
243 if let Some(next) = pending.take() {
244 logger.success(format!(
245 "Next build ready -> {} (~{}). Switching after current loop.",
246 next.path.display(),
247 format_duration_short(next.length)
248 ));
249 current = next;
250 } else {
251 logger.info("Replaying current loop (no pending build).");
252 }
253 }
254
255 logger.info("Live playback loop stopped.");
256 Ok(())
257}
258
/// Renders `duration` compactly for log output.
///
/// * one second or more: seconds with two decimals, e.g. `1.50s`
/// * 100 ms up to one second: whole milliseconds, e.g. `250ms`
/// * below 100 ms: milliseconds with one decimal, e.g. `12.5ms`
fn format_duration_short(duration: Duration) -> String {
    let seconds = duration.as_secs_f64();
    if duration.as_secs() >= 1 {
        return format!("{seconds:.2}s");
    }

    match seconds * 1000.0 {
        millis if millis >= 100.0 => format!("{millis:.0}ms"),
        millis => format!("{millis:.1}ms"),
    }
}
271
272#[derive(Clone)]
273pub struct LiveAudioSource {
274 pub path: PathBuf,
275 pub format: AudioFormat,
276 pub bit_depth: AudioBitDepth,
277 pub channels: AudioChannels,
278 pub sample_rate: u32,
279 pub resample_quality: ResampleQuality,
280 pub length: Duration,
281}
282
283impl LiveAudioSource {
284 pub fn with_path(
285 path: PathBuf,
286 format: AudioFormat,
287 bit_depth: AudioBitDepth,
288 channels: AudioChannels,
289 sample_rate: u32,
290 resample_quality: ResampleQuality,
291 length: Duration,
292 ) -> Self {
293 Self {
294 path,
295 format,
296 bit_depth,
297 channels,
298 sample_rate,
299 resample_quality,
300 length,
301 }
302 }
303}
304
/// Tunable settings for a live playback session.
#[derive(Clone)]
pub struct LivePlaybackOptions {
    /// How often the session and the playback loop poll for changes.
    poll_interval: Duration,
    /// Playback volume, kept within `0.0..=1.0`.
    volume: f32,
}

impl LivePlaybackOptions {
    /// Creates options with the given poll interval and full volume (`1.0`).
    pub fn new(poll_interval: Duration) -> Self {
        Self {
            poll_interval,
            volume: 1.0,
        }
    }

    /// Sets the playback volume, clamped to `0.0..=1.0`.
    ///
    /// A NaN `volume` is ignored and the previously configured volume is kept:
    /// `f32::clamp` returns NaN for a NaN input, which would otherwise be
    /// stored and later handed to the audio sink.
    pub fn with_volume(mut self, volume: f32) -> Self {
        if !volume.is_nan() {
            self.volume = volume.clamp(0.0, 1.0);
        }
        self
    }

    /// Returns the configured poll interval.
    pub fn poll_interval(&self) -> Duration {
        self.poll_interval
    }

    /// Returns the configured volume, always within `0.0..=1.0`.
    pub fn volume(&self) -> f32 {
        self.volume
    }
}
332
333enum PlaybackCommand {
334 Queue(LiveAudioSource),
335 Stop,
336}
337
338pub struct LivePlaybackSession {
339 engine: LivePlaybackEngine,
340 commands: mpsc::Sender<PlaybackCommand>,
341 handle: Option<thread::JoinHandle<Result<()>>>,
342 last_update: Arc<Mutex<Instant>>,
343 options: LivePlaybackOptions,
344}
345
346impl LivePlaybackSession {
347 fn new(
348 engine: LivePlaybackEngine,
349 commands: mpsc::Sender<PlaybackCommand>,
350 handle: thread::JoinHandle<Result<()>>,
351 last_update: Arc<Mutex<Instant>>,
352 options: LivePlaybackOptions,
353 ) -> Self {
354 Self {
355 engine,
356 commands,
357 handle: Some(handle),
358 last_update,
359 options,
360 }
361 }
362
363 pub fn queue_source(&self, next: LiveAudioSource) -> Result<()> {
364 self.commands
365 .send(PlaybackCommand::Queue(next))
366 .context("failed to queue next live buffer")
367 }
368
369 pub async fn heartbeat(&self) {
370 sleep(self.options.poll_interval()).await;
371 }
372
373 pub async fn finish(mut self) -> Result<()> {
374 let _ = self.commands.send(PlaybackCommand::Stop);
375 if let Some(handle) = self.handle.take() {
376 match handle.join() {
377 Ok(result) => result?,
378 Err(err) => bail!("live playback thread panicked: {err:?}"),
379 }
380 }
381 self.engine
382 .logger()
383 .info("Live session finished; awaiting next command.");
384 Ok(())
385 }
386
387 #[allow(dead_code)]
388 pub fn last_update(&self) -> Instant {
389 *self.last_update.lock().expect("last_update poisoned")
390 }
391}