devalang_wasm/engine/audio/playback/
live.rs1use std::fs::File;
2use std::io::BufReader;
3use std::path::PathBuf;
4use std::sync::mpsc;
5use std::sync::{Arc, Mutex};
6use std::thread;
7use std::time::{Duration, Instant};
8
9use anyhow::{Context, Result, bail};
10use rodio::{Decoder, OutputStream, OutputStreamHandle, Sink};
11use tokio::time::sleep;
12
13use crate::engine::audio::settings::{AudioBitDepth, AudioChannels, AudioFormat, ResampleQuality};
14use crate::tools::logger::Logger;
15
16#[derive(Clone)]
17pub struct LivePlaybackEngine {
18 inner: Arc<LivePlaybackInner>,
19}
20
21struct LivePlaybackInner {
22 logger: Arc<Logger>,
23 _stream: OutputStream,
24 handle: OutputStreamHandle,
25}
26
27impl LivePlaybackEngine {
28 pub fn new(logger: Arc<Logger>) -> Result<Self> {
29 let (stream, handle) =
30 OutputStream::try_default().context("failed to access default audio output stream")?;
31 Ok(Self {
32 inner: Arc::new(LivePlaybackInner {
33 logger,
34 _stream: stream,
35 handle,
36 }),
37 })
38 }
39
40 pub fn logger(&self) -> &Logger {
41 &self.inner.logger
42 }
43
44 fn handle(&self) -> &OutputStreamHandle {
45 &self.inner.handle
46 }
47
48 fn create_sink(&self, source: &LiveAudioSource) -> Result<Sink> {
49 create_sink_with_handle(self.handle(), source)
50 }
51
52 pub async fn play_once(&self, source: LiveAudioSource, volume: f32) -> Result<()> {
53 let volume_display = if volume == 0.0 {
54 " [MUTED]".to_string()
55 } else if volume < 1.0 {
56 format!(" [volume: {:.0}%]", volume * 100.0)
57 } else {
58 String::new()
59 };
60
61 self.logger().action(format!(
62 "Playing {} ({:?}, {}-bit, {} ch, {}, {} Hz, length {}){}",
63 source.path.display(),
64 source.format,
65 source.bit_depth.bits(),
66 source.channels.count(),
67 source.resample_quality,
68 source.sample_rate,
69 format_duration_short(source.length),
70 volume_display
71 ));
72 let sink = Arc::new(self.create_sink(&source)?);
73 sink.set_volume(volume);
74
75 let mut scheduled_logs: Vec<(f32, String)> = Vec::new();
77 if let Some(stem) = source.path.file_stem().and_then(|s| s.to_str()) {
78 let log_path = source.path.with_file_name(format!("{}.printlog", stem));
79 if let Ok(contents) = std::fs::read_to_string(&log_path) {
80 for line in contents.lines() {
81 if let Some((t, msg)) = line.split_once('\t') {
82 if let Ok(secs) = t.parse::<f32>() {
83 scheduled_logs.push((secs, msg.to_string()));
84 }
85 }
86 }
87 scheduled_logs
88 .sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
89 }
90 }
91
92 let sink_clone = Arc::clone(&sink);
93 let wait_handle = std::thread::spawn(move || {
94 sink_clone.sleep_until_end();
95 });
96
97 let start_instant = std::time::Instant::now();
99 let mut next_log_idx: usize = 0;
100
101 let poll_interval = std::time::Duration::from_millis(25);
103 loop {
104 if wait_handle.is_finished() {
105 let _ = wait_handle.join();
106 break;
107 }
108
109 if !scheduled_logs.is_empty() {
110 let elapsed = start_instant.elapsed().as_secs_f32();
111 while next_log_idx < scheduled_logs.len()
112 && scheduled_logs[next_log_idx].0 <= elapsed
113 {
114 let msg = &scheduled_logs[next_log_idx].1;
115 self.logger().print(msg.clone());
116 next_log_idx += 1;
117 }
118 }
119
120 std::thread::sleep(poll_interval);
121 }
122
123 sink.stop();
124 self.logger().success("Playback completed.");
125 Ok(())
126 }
127
128 pub async fn start_live_session(
129 &self,
130 source: LiveAudioSource,
131 options: LivePlaybackOptions,
132 background_event_rx: Option<
133 std::sync::Arc<
134 std::sync::Mutex<
135 std::sync::mpsc::Receiver<crate::engine::audio::events::AudioEventList>,
136 >,
137 >,
138 >,
139 ) -> Result<LivePlaybackSession> {
140 let volume = options.volume();
141 let volume_display = if volume == 0.0 {
142 " [MUTED]".to_string()
143 } else if volume < 1.0 {
144 format!(" [volume: {:.0}%]", volume * 100.0)
145 } else {
146 String::new()
147 };
148
149 self.logger().action(format!(
150 "Starting live session from {} ({:?}, {}-bit, {} ch, {}, {} Hz, loop {}){}",
151 source.path.display(),
152 source.format,
153 source.bit_depth.bits(),
154 source.channels.count(),
155 source.resample_quality,
156 source.sample_rate,
157 format_duration_short(source.length),
158 volume_display
159 ));
160 let (tx, rx) = mpsc::channel();
161 let last_update = Arc::new(Mutex::new(Instant::now()));
162 let logger = Arc::clone(&self.inner.logger);
163 let handle_clone = self.handle().clone();
164 let options_clone = options.clone();
165 let source_clone = source.clone();
166 let last_update_for_thread = Arc::clone(&last_update);
167 let handle = thread::spawn(move || {
168 run_loop(
169 logger,
170 handle_clone,
171 source_clone,
172 options_clone,
173 rx,
174 last_update_for_thread,
175 )
176 });
177
178 Ok(LivePlaybackSession::new(
179 self.clone(),
180 tx,
181 handle,
182 last_update,
183 options,
184 background_event_rx,
185 ))
186 }
187}
188
189fn create_sink_with_handle(handle: &OutputStreamHandle, source: &LiveAudioSource) -> Result<Sink> {
190 let file = File::open(&source.path)
191 .with_context(|| format!("unable to open audio file: {}", source.path.display()))?;
192 let reader = BufReader::new(file);
193 let decoder = Decoder::new(reader)
194 .with_context(|| format!("failed to decode audio file: {}", source.path.display()))?;
195 let sink = Sink::try_new(handle).context("failed to create audio sink")?;
196 sink.append(decoder);
197 sink.set_volume(1.0);
198 Ok(sink)
199}
200
201fn run_loop(
202 logger: Arc<Logger>,
203 handle: OutputStreamHandle,
204 initial: LiveAudioSource,
205 options: LivePlaybackOptions,
206 rx: mpsc::Receiver<PlaybackCommand>,
207 last_update: Arc<Mutex<Instant>>,
208) -> Result<()> {
209 let mut current = initial;
210 let mut pending: Option<LiveAudioSource> = None;
211 let poll_interval = options.poll_interval().max(Duration::from_millis(25));
212
213 loop {
214 logger.watch(format!(
215 "Looping {} (~{})",
216 current.path.display(),
217 format_duration_short(current.length)
218 ));
219 if let Ok(mut guard) = last_update.lock() {
220 *guard = Instant::now();
221 }
222
223 let sink = match create_sink_with_handle(&handle, ¤t) {
224 Ok(sink) => {
225 sink.set_volume(options.volume());
226 Arc::new(sink)
227 }
228 Err(err) => {
229 logger.error(format!("Failed to prepare live buffer: {err}"));
230 match rx.recv() {
231 Ok(PlaybackCommand::Queue(next)) => {
232 pending = Some(next);
233 continue;
234 }
235 Ok(PlaybackCommand::Stop) | Err(_) => break,
236 }
237 }
238 };
239
240 let mut scheduled_logs: Vec<(f32, String)> = Vec::new();
242 if let Some(stem) = current.path.file_stem().and_then(|s| s.to_str()) {
243 let log_path = current.path.with_file_name(format!("{}.printlog", stem));
244 if let Ok(contents) = std::fs::read_to_string(&log_path) {
245 for line in contents.lines() {
246 if let Some((t, msg)) = line.split_once('\t') {
247 if let Ok(secs) = t.parse::<f32>() {
248 scheduled_logs.push((secs, msg.to_string()));
249 }
250 }
251 }
252 scheduled_logs
253 .sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));
254 }
255 }
256
257 let sink_clone = Arc::clone(&sink);
258 let wait_handle = thread::spawn(move || {
259 sink_clone.sleep_until_end();
260 });
261
262 let start_instant = Instant::now();
264 let mut next_log_idx: usize = 0;
265
266 let mut stop_requested = false;
267
268 loop {
269 if wait_handle.is_finished() {
270 let _ = wait_handle.join();
271 break;
272 }
273 if !scheduled_logs.is_empty() {
275 let elapsed = start_instant.elapsed().as_secs_f32();
276 while next_log_idx < scheduled_logs.len()
277 && scheduled_logs[next_log_idx].0 <= elapsed
278 {
279 let msg = &scheduled_logs[next_log_idx].1;
280 logger.print(msg.clone());
282 next_log_idx += 1;
283 }
284 }
285
286 match rx.recv_timeout(poll_interval) {
287 Ok(PlaybackCommand::Queue(next)) => {
288 pending = Some(next);
289 }
290 Ok(PlaybackCommand::Stop) => {
291 stop_requested = true;
292 sink.stop();
293 let _ = wait_handle.join();
294 break;
295 }
296 Err(mpsc::RecvTimeoutError::Timeout) => {
297 continue;
298 }
299 Err(mpsc::RecvTimeoutError::Disconnected) => {
300 stop_requested = true;
301 sink.stop();
302 let _ = wait_handle.join();
303 break;
304 }
305 }
306 }
307
308 if stop_requested {
309 break;
310 }
311
312 while let Ok(cmd) = rx.try_recv() {
313 match cmd {
314 PlaybackCommand::Queue(next) => pending = Some(next),
315 PlaybackCommand::Stop => {
316 stop_requested = true;
317 break;
318 }
319 }
320 }
321
322 if stop_requested {
323 break;
324 }
325
326 if let Some(next) = pending.take() {
327 logger.success(format!(
328 "Next build ready -> {} (~{}). Switching after current loop.",
329 next.path.display(),
330 format_duration_short(next.length)
331 ));
332 current = next;
333 } else {
334 logger.info("Replaying current loop (no pending build).");
335 }
336 }
337
338 logger.info("Live playback loop stopped.");
339 Ok(())
340}
341
342fn format_duration_short(duration: Duration) -> String {
343 if duration.as_secs() >= 1 {
344 format!("{:.2}s", duration.as_secs_f64())
345 } else {
346 let ms = duration.as_secs_f64() * 1000.0;
347 if ms >= 100.0 {
348 format!("{:.0}ms", ms)
349 } else {
350 format!("{:.1}ms", ms)
351 }
352 }
353}
354
355#[derive(Clone)]
356pub struct LiveAudioSource {
357 pub path: PathBuf,
358 pub format: AudioFormat,
359 pub bit_depth: AudioBitDepth,
360 pub channels: AudioChannels,
361 pub sample_rate: u32,
362 pub resample_quality: ResampleQuality,
363 pub length: Duration,
364}
365
366impl LiveAudioSource {
367 pub fn with_path(
368 path: PathBuf,
369 format: AudioFormat,
370 bit_depth: AudioBitDepth,
371 channels: AudioChannels,
372 sample_rate: u32,
373 resample_quality: ResampleQuality,
374 length: Duration,
375 ) -> Self {
376 Self {
377 path,
378 format,
379 bit_depth,
380 channels,
381 sample_rate,
382 resample_quality,
383 length,
384 }
385 }
386}
387
388#[derive(Clone)]
389pub struct LivePlaybackOptions {
390 poll_interval: Duration,
391 volume: f32,
392}
393
394impl LivePlaybackOptions {
395 pub fn new(poll_interval: Duration) -> Self {
396 Self {
397 poll_interval,
398 volume: 1.0,
399 }
400 }
401
402 pub fn with_volume(mut self, volume: f32) -> Self {
403 self.volume = volume.clamp(0.0, 1.0);
404 self
405 }
406
407 pub fn poll_interval(&self) -> Duration {
408 self.poll_interval
409 }
410
411 pub fn volume(&self) -> f32 {
412 self.volume
413 }
414}
415
416enum PlaybackCommand {
417 Queue(LiveAudioSource),
418 Stop,
419}
420
421pub struct LivePlaybackSession {
422 engine: LivePlaybackEngine,
423 commands: mpsc::Sender<PlaybackCommand>,
424 handle: Option<thread::JoinHandle<Result<()>>>,
425 last_update: Arc<Mutex<Instant>>,
426 options: LivePlaybackOptions,
427 background_event_rx: Option<
428 std::sync::Arc<
429 std::sync::Mutex<
430 std::sync::mpsc::Receiver<crate::engine::audio::events::AudioEventList>,
431 >,
432 >,
433 >,
434}
435
436impl LivePlaybackSession {
437 fn new(
438 engine: LivePlaybackEngine,
439 commands: mpsc::Sender<PlaybackCommand>,
440 handle: thread::JoinHandle<Result<()>>,
441 last_update: Arc<Mutex<Instant>>,
442 options: LivePlaybackOptions,
443 background_event_rx: Option<
444 std::sync::Arc<
445 std::sync::Mutex<
446 std::sync::mpsc::Receiver<crate::engine::audio::events::AudioEventList>,
447 >,
448 >,
449 >,
450 ) -> Self {
451 Self {
452 engine,
453 commands,
454 handle: Some(handle),
455 last_update,
456 options,
457 background_event_rx,
458 }
459 }
460
461 pub fn queue_source(&self, next: LiveAudioSource) -> Result<()> {
462 self.commands
463 .send(PlaybackCommand::Queue(next))
464 .context("failed to queue next live buffer")
465 }
466
467 pub async fn heartbeat(&self) {
468 sleep(self.options.poll_interval()).await;
469 }
470
471 pub async fn finish(mut self) -> Result<()> {
472 let _ = self.commands.send(PlaybackCommand::Stop);
473 if let Some(handle) = self.handle.take() {
474 match handle.join() {
475 Ok(result) => result?,
476 Err(err) => bail!("live playback thread panicked: {err:?}"),
477 }
478 }
479 self.engine
480 .logger()
481 .info("Live session finished; awaiting next command.");
482 Ok(())
483 }
484
485 #[allow(dead_code)]
486 pub fn last_update(&self) -> Instant {
487 *self.last_update.lock().expect("last_update poisoned")
488 }
489}