devalang_wasm/engine/audio/playback/
live.rs

1use std::fs::File;
2use std::io::BufReader;
3use std::path::PathBuf;
4use std::sync::mpsc;
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use anyhow::{Context, Result, bail};
10use rodio::{Decoder, OutputStream, OutputStreamHandle, Sink};
11use tokio::task::spawn_blocking;
12use tokio::time::sleep;
13
14use crate::engine::audio::settings::{AudioBitDepth, AudioChannels, AudioFormat, ResampleQuality};
15use crate::tools::logger::Logger;
16
17#[derive(Clone)]
18pub struct LivePlaybackEngine {
19    inner: Arc<LivePlaybackInner>,
20}
21
22struct LivePlaybackInner {
23    logger: Arc<Logger>,
24    _stream: OutputStream,
25    handle: OutputStreamHandle,
26}
27
28impl LivePlaybackEngine {
29    pub fn new(logger: Arc<Logger>) -> Result<Self> {
30        let (stream, handle) =
31            OutputStream::try_default().context("failed to access default audio output stream")?;
32        Ok(Self {
33            inner: Arc::new(LivePlaybackInner {
34                logger,
35                _stream: stream,
36                handle,
37            }),
38        })
39    }
40
41    pub fn logger(&self) -> &Logger {
42        &self.inner.logger
43    }
44
45    fn handle(&self) -> &OutputStreamHandle {
46        &self.inner.handle
47    }
48
49    fn create_sink(&self, source: &LiveAudioSource) -> Result<Sink> {
50        create_sink_with_handle(self.handle(), source)
51    }
52
53    pub async fn play_once(&self, source: LiveAudioSource, volume: f32) -> Result<()> {
54        let volume_display = if volume == 0.0 {
55            " [MUTED]".to_string()
56        } else if volume < 1.0 {
57            format!(" [volume: {:.0}%]", volume * 100.0)
58        } else {
59            String::new()
60        };
61
62        self.logger().action(format!(
63            "Playing {} ({:?}, {}-bit, {} ch, {}, {} Hz, length {}){}",
64            source.path.display(),
65            source.format,
66            source.bit_depth.bits(),
67            source.channels.count(),
68            source.resample_quality,
69            source.sample_rate,
70            format_duration_short(source.length),
71            volume_display
72        ));
73        let sink = Arc::new(self.create_sink(&source)?);
74        sink.set_volume(volume);
75        let sink_clone = Arc::clone(&sink);
76        spawn_blocking(move || {
77            sink_clone.sleep_until_end();
78        })
79        .await
80        .context("audio playback worker panicked")?;
81        sink.stop();
82        self.logger().success("Playback completed.");
83        Ok(())
84    }
85
86    pub async fn start_live_session(
87        &self,
88        source: LiveAudioSource,
89        options: LivePlaybackOptions,
90    ) -> Result<LivePlaybackSession> {
91        let volume = options.volume();
92        let volume_display = if volume == 0.0 {
93            " [MUTED]".to_string()
94        } else if volume < 1.0 {
95            format!(" [volume: {:.0}%]", volume * 100.0)
96        } else {
97            String::new()
98        };
99
100        self.logger().action(format!(
101            "Starting live session from {} ({:?}, {}-bit, {} ch, {}, {} Hz, loop {}){}",
102            source.path.display(),
103            source.format,
104            source.bit_depth.bits(),
105            source.channels.count(),
106            source.resample_quality,
107            source.sample_rate,
108            format_duration_short(source.length),
109            volume_display
110        ));
111        let (tx, rx) = mpsc::channel();
112        let last_update = Arc::new(Mutex::new(Instant::now()));
113        let logger = Arc::clone(&self.inner.logger);
114        let handle_clone = self.handle().clone();
115        let options_clone = options.clone();
116        let source_clone = source.clone();
117        let last_update_for_thread = Arc::clone(&last_update);
118        let handle = thread::spawn(move || {
119            run_loop(
120                logger,
121                handle_clone,
122                source_clone,
123                options_clone,
124                rx,
125                last_update_for_thread,
126            )
127        });
128
129        Ok(LivePlaybackSession::new(
130            self.clone(),
131            tx,
132            handle,
133            last_update,
134            options,
135        ))
136    }
137}
138
139fn create_sink_with_handle(handle: &OutputStreamHandle, source: &LiveAudioSource) -> Result<Sink> {
140    let file = File::open(&source.path)
141        .with_context(|| format!("unable to open audio file: {}", source.path.display()))?;
142    let reader = BufReader::new(file);
143    let decoder = Decoder::new(reader)
144        .with_context(|| format!("failed to decode audio file: {}", source.path.display()))?;
145    let sink = Sink::try_new(handle).context("failed to create audio sink")?;
146    sink.append(decoder);
147    sink.set_volume(1.0);
148    Ok(sink)
149}
150
151fn run_loop(
152    logger: Arc<Logger>,
153    handle: OutputStreamHandle,
154    initial: LiveAudioSource,
155    options: LivePlaybackOptions,
156    rx: mpsc::Receiver<PlaybackCommand>,
157    last_update: Arc<Mutex<Instant>>,
158) -> Result<()> {
159    let mut current = initial;
160    let mut pending: Option<LiveAudioSource> = None;
161    let poll_interval = options.poll_interval().max(Duration::from_millis(25));
162
163    loop {
164        logger.watch(format!(
165            "Looping {} (~{})",
166            current.path.display(),
167            format_duration_short(current.length)
168        ));
169        if let Ok(mut guard) = last_update.lock() {
170            *guard = Instant::now();
171        }
172
173        let sink = match create_sink_with_handle(&handle, &current) {
174            Ok(sink) => {
175                sink.set_volume(options.volume());
176                Arc::new(sink)
177            }
178            Err(err) => {
179                logger.error(format!("Failed to prepare live buffer: {err}"));
180                match rx.recv() {
181                    Ok(PlaybackCommand::Queue(next)) => {
182                        pending = Some(next);
183                        continue;
184                    }
185                    Ok(PlaybackCommand::Stop) | Err(_) => break,
186                }
187            }
188        };
189
190        let sink_clone = Arc::clone(&sink);
191        let wait_handle = thread::spawn(move || {
192            sink_clone.sleep_until_end();
193        });
194
195        let mut stop_requested = false;
196
197        loop {
198            if wait_handle.is_finished() {
199                let _ = wait_handle.join();
200                break;
201            }
202
203            match rx.recv_timeout(poll_interval) {
204                Ok(PlaybackCommand::Queue(next)) => {
205                    pending = Some(next);
206                }
207                Ok(PlaybackCommand::Stop) => {
208                    stop_requested = true;
209                    sink.stop();
210                    let _ = wait_handle.join();
211                    break;
212                }
213                Err(mpsc::RecvTimeoutError::Timeout) => {
214                    continue;
215                }
216                Err(mpsc::RecvTimeoutError::Disconnected) => {
217                    stop_requested = true;
218                    sink.stop();
219                    let _ = wait_handle.join();
220                    break;
221                }
222            }
223        }
224
225        if stop_requested {
226            break;
227        }
228
229        while let Ok(cmd) = rx.try_recv() {
230            match cmd {
231                PlaybackCommand::Queue(next) => pending = Some(next),
232                PlaybackCommand::Stop => {
233                    stop_requested = true;
234                    break;
235                }
236            }
237        }
238
239        if stop_requested {
240            break;
241        }
242
243        if let Some(next) = pending.take() {
244            logger.success(format!(
245                "Next build ready -> {} (~{}). Switching after current loop.",
246                next.path.display(),
247                format_duration_short(next.length)
248            ));
249            current = next;
250        } else {
251            logger.info("Replaying current loop (no pending build).");
252        }
253    }
254
255    logger.info("Live playback loop stopped.");
256    Ok(())
257}
258
/// Renders `duration` compactly for log lines.
///
/// * one second or more: seconds with two decimals, e.g. `1.50s`
/// * 100 ms up to one second: whole milliseconds, e.g. `250ms`
/// * below 100 ms: milliseconds with one decimal, e.g. `12.3ms`
///
/// The unit and precision are chosen from the value as it will be printed, so a
/// duration that rounds up to a boundary is shown in the larger form (`1.00s`,
/// `100ms`) rather than as `1000ms` or `100.0ms`.
fn format_duration_short(duration: Duration) -> String {
    // Smallest nanosecond counts that print as "1000" with zero decimals and as
    // "100.0" with one decimal, respectively.
    const ROUNDS_TO_ONE_SECOND_NS: u128 = 999_500_000;
    const ROUNDS_TO_HUNDRED_MS_NS: u128 = 99_950_000;

    let nanos = duration.as_nanos();
    let secs = duration.as_secs_f64();

    if nanos >= ROUNDS_TO_ONE_SECOND_NS {
        format!("{:.2}s", secs)
    } else {
        let ms = secs * 1000.0;
        if nanos >= ROUNDS_TO_HUNDRED_MS_NS {
            format!("{:.0}ms", ms)
        } else {
            format!("{:.1}ms", ms)
        }
    }
}
271
272#[derive(Clone)]
273pub struct LiveAudioSource {
274    pub path: PathBuf,
275    pub format: AudioFormat,
276    pub bit_depth: AudioBitDepth,
277    pub channels: AudioChannels,
278    pub sample_rate: u32,
279    pub resample_quality: ResampleQuality,
280    pub length: Duration,
281}
282
283impl LiveAudioSource {
284    pub fn with_path(
285        path: PathBuf,
286        format: AudioFormat,
287        bit_depth: AudioBitDepth,
288        channels: AudioChannels,
289        sample_rate: u32,
290        resample_quality: ResampleQuality,
291        length: Duration,
292    ) -> Self {
293        Self {
294            path,
295            format,
296            bit_depth,
297            channels,
298            sample_rate,
299            resample_quality,
300            length,
301        }
302    }
303}
304
/// Tunables for a live playback session.
#[derive(Clone)]
pub struct LivePlaybackOptions {
    /// How often the session checks for new commands / sleeps in `heartbeat`.
    poll_interval: Duration,
    /// Playback volume, always within `0.0..=1.0`.
    volume: f32,
}

impl LivePlaybackOptions {
    /// Creates options with the given poll interval and full volume (`1.0`).
    pub fn new(poll_interval: Duration) -> Self {
        Self {
            poll_interval,
            volume: 1.0,
        }
    }

    /// Returns the options with the volume replaced by `volume`, clamped to
    /// `0.0..=1.0`.
    ///
    /// A NaN `volume` is ignored and the previous value is kept: `f32::clamp`
    /// passes NaN through, and a NaN volume must never reach the audio sink.
    pub fn with_volume(mut self, volume: f32) -> Self {
        if !volume.is_nan() {
            self.volume = volume.clamp(0.0, 1.0);
        }
        self
    }

    /// Interval between command polls.
    pub fn poll_interval(&self) -> Duration {
        self.poll_interval
    }

    /// Configured volume in `0.0..=1.0`.
    pub fn volume(&self) -> f32 {
        self.volume
    }
}
332
333enum PlaybackCommand {
334    Queue(LiveAudioSource),
335    Stop,
336}
337
338pub struct LivePlaybackSession {
339    engine: LivePlaybackEngine,
340    commands: mpsc::Sender<PlaybackCommand>,
341    handle: Option<thread::JoinHandle<Result<()>>>,
342    last_update: Arc<Mutex<Instant>>,
343    options: LivePlaybackOptions,
344}
345
346impl LivePlaybackSession {
347    fn new(
348        engine: LivePlaybackEngine,
349        commands: mpsc::Sender<PlaybackCommand>,
350        handle: thread::JoinHandle<Result<()>>,
351        last_update: Arc<Mutex<Instant>>,
352        options: LivePlaybackOptions,
353    ) -> Self {
354        Self {
355            engine,
356            commands,
357            handle: Some(handle),
358            last_update,
359            options,
360        }
361    }
362
363    pub fn queue_source(&self, next: LiveAudioSource) -> Result<()> {
364        self.commands
365            .send(PlaybackCommand::Queue(next))
366            .context("failed to queue next live buffer")
367    }
368
369    pub async fn heartbeat(&self) {
370        sleep(self.options.poll_interval()).await;
371    }
372
373    pub async fn finish(mut self) -> Result<()> {
374        let _ = self.commands.send(PlaybackCommand::Stop);
375        if let Some(handle) = self.handle.take() {
376            match handle.join() {
377                Ok(result) => result?,
378                Err(err) => bail!("live playback thread panicked: {err:?}"),
379            }
380        }
381        self.engine
382            .logger()
383            .info("Live session finished; awaiting next command.");
384        Ok(())
385    }
386
387    #[allow(dead_code)]
388    pub fn last_update(&self) -> Instant {
389        *self.last_update.lock().expect("last_update poisoned")
390    }
391}