devalang_wasm/engine/audio/playback/
live.rs

1use std::fs::File;
2use std::io::BufReader;
3use std::path::PathBuf;
4use std::sync::mpsc;
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use anyhow::{Context, Result, bail};
10use rodio::{Decoder, OutputStream, OutputStreamHandle, Sink};
11use tokio::time::sleep;
12
13use crate::engine::audio::settings::{AudioBitDepth, AudioChannels, AudioFormat, ResampleQuality};
14use crate::tools::logger::Logger;
15
16#[derive(Clone)]
17pub struct LivePlaybackEngine {
18    inner: Arc<LivePlaybackInner>,
19}
20
21struct LivePlaybackInner {
22    logger: Arc<Logger>,
23    _stream: OutputStream,
24    handle: OutputStreamHandle,
25}
26
27impl LivePlaybackEngine {
28    pub fn new(logger: Arc<Logger>) -> Result<Self> {
29        let (stream, handle) =
30            OutputStream::try_default().context("failed to access default audio output stream")?;
31        Ok(Self {
32            inner: Arc::new(LivePlaybackInner {
33                logger,
34                _stream: stream,
35                handle,
36            }),
37        })
38    }
39
40    pub fn logger(&self) -> &Logger {
41        &self.inner.logger
42    }
43
44    fn handle(&self) -> &OutputStreamHandle {
45        &self.inner.handle
46    }
47
48    fn create_sink(&self, source: &LiveAudioSource) -> Result<Sink> {
49        create_sink_with_handle(self.handle(), source)
50    }
51
52    pub async fn play_once(&self, source: LiveAudioSource, volume: f32) -> Result<()> {
53        let volume_display = if volume == 0.0 {
54            " [MUTED]".to_string()
55        } else if volume < 1.0 {
56            format!(" [volume: {:.0}%]", volume * 100.0)
57        } else {
58            String::new()
59        };
60
61        self.logger().action(format!(
62            "Playing {} ({:?}, {}-bit, {} ch, {}, {} Hz, length {}){}",
63            source.path.display(),
64            source.format,
65            source.bit_depth.bits(),
66            source.channels.count(),
67            source.resample_quality,
68            source.sample_rate,
69            format_duration_short(source.length),
70            volume_display
71        ));
72        let sink = Arc::new(self.create_sink(&source)?);
73        sink.set_volume(volume);
74
75        // Attempt to load scheduled print events sidecar (module.printlog) for this audio
76        let mut scheduled_logs: Vec<(f32, String)> = Vec::new();
77        if let Some(stem) = source.path.file_stem().and_then(|s| s.to_str()) {
78            let log_path = source.path.with_file_name(format!("{}.printlog", stem));
79            if let Ok(contents) = std::fs::read_to_string(&log_path) {
80                for line in contents.lines() {
81                    if let Some((t, msg)) = line.split_once('\t') {
82                        if let Ok(secs) = t.parse::<f32>() {
83                            scheduled_logs.push((secs, msg.to_string()));
84                        }
85                    }
86                }
87                scheduled_logs
88                    .sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
89            }
90        }
91
92        let sink_clone = Arc::clone(&sink);
93        let wait_handle = std::thread::spawn(move || {
94            sink_clone.sleep_until_end();
95        });
96
97        // Track playback start time so we can schedule print events
98        let start_instant = std::time::Instant::now();
99        let mut next_log_idx: usize = 0;
100
101        // Poll loop: while playback is ongoing emit scheduled prints
102        let poll_interval = std::time::Duration::from_millis(25);
103        loop {
104            if wait_handle.is_finished() {
105                let _ = wait_handle.join();
106                break;
107            }
108
109            if !scheduled_logs.is_empty() {
110                let elapsed = start_instant.elapsed().as_secs_f32();
111                while next_log_idx < scheduled_logs.len()
112                    && scheduled_logs[next_log_idx].0 <= elapsed
113                {
114                    let msg = &scheduled_logs[next_log_idx].1;
115                    self.logger().print(msg.clone());
116                    next_log_idx += 1;
117                }
118            }
119
120            std::thread::sleep(poll_interval);
121        }
122
123        sink.stop();
124        self.logger().success("Playback completed.");
125        Ok(())
126    }
127
128    pub async fn start_live_session(
129        &self,
130        source: LiveAudioSource,
131        options: LivePlaybackOptions,
132        background_event_rx: Option<
133            std::sync::Arc<
134                std::sync::Mutex<
135                    std::sync::mpsc::Receiver<crate::engine::audio::events::AudioEventList>,
136                >,
137            >,
138        >,
139    ) -> Result<LivePlaybackSession> {
140        let volume = options.volume();
141        let volume_display = if volume == 0.0 {
142            " [MUTED]".to_string()
143        } else if volume < 1.0 {
144            format!(" [volume: {:.0}%]", volume * 100.0)
145        } else {
146            String::new()
147        };
148
149        self.logger().action(format!(
150            "Starting live session from {} ({:?}, {}-bit, {} ch, {}, {} Hz, loop {}){}",
151            source.path.display(),
152            source.format,
153            source.bit_depth.bits(),
154            source.channels.count(),
155            source.resample_quality,
156            source.sample_rate,
157            format_duration_short(source.length),
158            volume_display
159        ));
160        let (tx, rx) = mpsc::channel();
161        let last_update = Arc::new(Mutex::new(Instant::now()));
162        let logger = Arc::clone(&self.inner.logger);
163        let handle_clone = self.handle().clone();
164        let options_clone = options.clone();
165        let source_clone = source.clone();
166        let last_update_for_thread = Arc::clone(&last_update);
167        let handle = thread::spawn(move || {
168            run_loop(
169                logger,
170                handle_clone,
171                source_clone,
172                options_clone,
173                rx,
174                last_update_for_thread,
175            )
176        });
177
178        Ok(LivePlaybackSession::new(
179            self.clone(),
180            tx,
181            handle,
182            last_update,
183            options,
184            background_event_rx,
185        ))
186    }
187}
188
189fn create_sink_with_handle(handle: &OutputStreamHandle, source: &LiveAudioSource) -> Result<Sink> {
190    let file = File::open(&source.path)
191        .with_context(|| format!("unable to open audio file: {}", source.path.display()))?;
192    let reader = BufReader::new(file);
193    let decoder = Decoder::new(reader)
194        .with_context(|| format!("failed to decode audio file: {}", source.path.display()))?;
195    let sink = Sink::try_new(handle).context("failed to create audio sink")?;
196    sink.append(decoder);
197    sink.set_volume(1.0);
198    Ok(sink)
199}
200
201fn run_loop(
202    logger: Arc<Logger>,
203    handle: OutputStreamHandle,
204    initial: LiveAudioSource,
205    options: LivePlaybackOptions,
206    rx: mpsc::Receiver<PlaybackCommand>,
207    last_update: Arc<Mutex<Instant>>,
208) -> Result<()> {
209    let mut current = initial;
210    let mut pending: Option<LiveAudioSource> = None;
211    let poll_interval = options.poll_interval().max(Duration::from_millis(25));
212
213    loop {
214        logger.watch(format!(
215            "Looping {} (~{})",
216            current.path.display(),
217            format_duration_short(current.length)
218        ));
219        if let Ok(mut guard) = last_update.lock() {
220            *guard = Instant::now();
221        }
222
223        let sink = match create_sink_with_handle(&handle, &current) {
224            Ok(sink) => {
225                sink.set_volume(options.volume());
226                Arc::new(sink)
227            }
228            Err(err) => {
229                logger.error(format!("Failed to prepare live buffer: {err}"));
230                match rx.recv() {
231                    Ok(PlaybackCommand::Queue(next)) => {
232                        pending = Some(next);
233                        continue;
234                    }
235                    Ok(PlaybackCommand::Stop) | Err(_) => break,
236                }
237            }
238        };
239
240        // Attempt to load scheduled print events sidecar (module.printlog) for this audio
241        let mut scheduled_logs: Vec<(f32, String)> = Vec::new();
242        if let Some(stem) = current.path.file_stem().and_then(|s| s.to_str()) {
243            let log_path = current.path.with_file_name(format!("{}.printlog", stem));
244            if let Ok(contents) = std::fs::read_to_string(&log_path) {
245                for line in contents.lines() {
246                    if let Some((t, msg)) = line.split_once('\t') {
247                        if let Ok(secs) = t.parse::<f32>() {
248                            scheduled_logs.push((secs, msg.to_string()));
249                        }
250                    }
251                }
252                scheduled_logs
253                    .sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
254            }
255        }
256
257        let sink_clone = Arc::clone(&sink);
258        let wait_handle = thread::spawn(move || {
259            sink_clone.sleep_until_end();
260        });
261
262        // Track playback start time so we can schedule print events
263        let start_instant = Instant::now();
264        let mut next_log_idx: usize = 0;
265
266        let mut stop_requested = false;
267
268        loop {
269            if wait_handle.is_finished() {
270                let _ = wait_handle.join();
271                break;
272            }
273            // Emit scheduled prints at the correct playback time
274            if !scheduled_logs.is_empty() {
275                let elapsed = start_instant.elapsed().as_secs_f32();
276                while next_log_idx < scheduled_logs.len()
277                    && scheduled_logs[next_log_idx].0 <= elapsed
278                {
279                    let msg = &scheduled_logs[next_log_idx].1;
280                    // Emit using the engine logger so print messages use the [PRINT] format
281                    logger.print(msg.clone());
282                    next_log_idx += 1;
283                }
284            }
285
286            match rx.recv_timeout(poll_interval) {
287                Ok(PlaybackCommand::Queue(next)) => {
288                    pending = Some(next);
289                }
290                Ok(PlaybackCommand::Stop) => {
291                    stop_requested = true;
292                    sink.stop();
293                    let _ = wait_handle.join();
294                    break;
295                }
296                Err(mpsc::RecvTimeoutError::Timeout) => {
297                    continue;
298                }
299                Err(mpsc::RecvTimeoutError::Disconnected) => {
300                    stop_requested = true;
301                    sink.stop();
302                    let _ = wait_handle.join();
303                    break;
304                }
305            }
306        }
307
308        if stop_requested {
309            break;
310        }
311
312        while let Ok(cmd) = rx.try_recv() {
313            match cmd {
314                PlaybackCommand::Queue(next) => pending = Some(next),
315                PlaybackCommand::Stop => {
316                    stop_requested = true;
317                    break;
318                }
319            }
320        }
321
322        if stop_requested {
323            break;
324        }
325
326        if let Some(next) = pending.take() {
327            logger.success(format!(
328                "Next build ready -> {} (~{}). Switching after current loop.",
329                next.path.display(),
330                format_duration_short(next.length)
331            ));
332            current = next;
333        } else {
334            logger.info("Replaying current loop (no pending build).");
335        }
336    }
337
338    logger.info("Live playback loop stopped.");
339    Ok(())
340}
341
/// Renders a duration compactly for log output.
///
/// * one second or more: seconds with two decimals, e.g. `1.50s`
/// * 100 ms up to one second: whole milliseconds, e.g. `250ms`
/// * below 100 ms: milliseconds with one decimal, e.g. `5.0ms`
fn format_duration_short(duration: Duration) -> String {
    let seconds = duration.as_secs_f64();
    if duration.as_secs() > 0 {
        return format!("{:.2}s", seconds);
    }

    let millis = seconds * 1000.0;
    // Short durations keep one decimal so they do not collapse to "0ms".
    let decimals: usize = if millis < 100.0 { 1 } else { 0 };
    format!("{:.*}ms", decimals, millis)
}
354
355#[derive(Clone)]
356pub struct LiveAudioSource {
357    pub path: PathBuf,
358    pub format: AudioFormat,
359    pub bit_depth: AudioBitDepth,
360    pub channels: AudioChannels,
361    pub sample_rate: u32,
362    pub resample_quality: ResampleQuality,
363    pub length: Duration,
364}
365
366impl LiveAudioSource {
367    pub fn with_path(
368        path: PathBuf,
369        format: AudioFormat,
370        bit_depth: AudioBitDepth,
371        channels: AudioChannels,
372        sample_rate: u32,
373        resample_quality: ResampleQuality,
374        length: Duration,
375    ) -> Self {
376        Self {
377            path,
378            format,
379            bit_depth,
380            channels,
381            sample_rate,
382            resample_quality,
383            length,
384        }
385    }
386}
387
/// Tunable settings for a live playback session.
#[derive(Clone, Debug)]
pub struct LivePlaybackOptions {
    /// Requested delay between two polls of the session.
    poll_interval: Duration,
    /// Playback volume, kept within `0.0..=1.0`.
    volume: f32,
}

impl LivePlaybackOptions {
    /// Creates options with the given poll interval and full volume (`1.0`).
    pub fn new(poll_interval: Duration) -> Self {
        Self {
            poll_interval,
            volume: 1.0,
        }
    }

    /// Returns the options with `volume` clamped into `0.0..=1.0`.
    ///
    /// A NaN `volume` is ignored and the previous volume is kept: `f32::clamp`
    /// would propagate NaN, so the stored volume would no longer be a usable
    /// number.
    pub fn with_volume(mut self, volume: f32) -> Self {
        if !volume.is_nan() {
            self.volume = volume.clamp(0.0, 1.0);
        }
        self
    }

    /// Requested delay between two polls.
    pub fn poll_interval(&self) -> Duration {
        self.poll_interval
    }

    /// Playback volume in `0.0..=1.0`.
    pub fn volume(&self) -> f32 {
        self.volume
    }
}
415
416enum PlaybackCommand {
417    Queue(LiveAudioSource),
418    Stop,
419}
420
421pub struct LivePlaybackSession {
422    engine: LivePlaybackEngine,
423    commands: mpsc::Sender<PlaybackCommand>,
424    handle: Option<thread::JoinHandle<Result<()>>>,
425    last_update: Arc<Mutex<Instant>>,
426    options: LivePlaybackOptions,
427    background_event_rx: Option<
428        std::sync::Arc<
429            std::sync::Mutex<
430                std::sync::mpsc::Receiver<crate::engine::audio::events::AudioEventList>,
431            >,
432        >,
433    >,
434}
435
436impl LivePlaybackSession {
437    fn new(
438        engine: LivePlaybackEngine,
439        commands: mpsc::Sender<PlaybackCommand>,
440        handle: thread::JoinHandle<Result<()>>,
441        last_update: Arc<Mutex<Instant>>,
442        options: LivePlaybackOptions,
443        background_event_rx: Option<
444            std::sync::Arc<
445                std::sync::Mutex<
446                    std::sync::mpsc::Receiver<crate::engine::audio::events::AudioEventList>,
447                >,
448            >,
449        >,
450    ) -> Self {
451        Self {
452            engine,
453            commands,
454            handle: Some(handle),
455            last_update,
456            options,
457            background_event_rx,
458        }
459    }
460
461    pub fn queue_source(&self, next: LiveAudioSource) -> Result<()> {
462        self.commands
463            .send(PlaybackCommand::Queue(next))
464            .context("failed to queue next live buffer")
465    }
466
467    pub async fn heartbeat(&self) {
468        sleep(self.options.poll_interval()).await;
469    }
470
471    pub async fn finish(mut self) -> Result<()> {
472        let _ = self.commands.send(PlaybackCommand::Stop);
473        if let Some(handle) = self.handle.take() {
474            match handle.join() {
475                Ok(result) => result?,
476                Err(err) => bail!("live playback thread panicked: {err:?}"),
477            }
478        }
479        self.engine
480            .logger()
481            .info("Live session finished; awaiting next command.");
482        Ok(())
483    }
484
485    #[allow(dead_code)]
486    pub fn last_update(&self) -> Instant {
487        *self.last_update.lock().expect("last_update poisoned")
488    }
489}