devalang_wasm/services/live/play/
mod.rs1#![cfg(feature = "cli")]
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use tokio::select;
8
9use crate::engine::audio::playback::live::{
10 LiveAudioSource, LivePlaybackEngine, LivePlaybackOptions,
11};
12use crate::services::build::pipeline::{BuildArtifacts, BuildRequest, ProjectBuilder};
13use crate::services::watch::file::{FileWatcher, WatchOptions};
14use crate::tools::logger::Logger;
15
16#[derive(Debug, Clone)]
17pub struct LivePlayRequest {
18 pub build: BuildRequest,
19 pub live_mode: bool,
20 pub crossfade_ms: u64,
21 pub volume: f32,
22}
23
24pub struct LivePlayService {
25 logger: Arc<Logger>,
26 playback: LivePlaybackEngine,
27 builder: ProjectBuilder,
28 bg_rx_guard: std::sync::Mutex<
30 Option<
31 std::sync::Arc<
32 std::sync::Mutex<
33 std::sync::mpsc::Receiver<crate::engine::audio::events::AudioEventList>,
34 >,
35 >,
36 >,
37 >,
38}
39
40impl LivePlayService {
41 pub fn new(logger: Arc<Logger>, builder: ProjectBuilder) -> Result<Self> {
42 let playback = LivePlaybackEngine::new(logger.clone())
43 .context("failed to initialise audio playback engine")?;
44 Ok(Self {
45 logger,
46 playback,
47 builder,
48 bg_rx_guard: std::sync::Mutex::new(None),
49 })
50 }
51
52 pub async fn run(&self, request: LivePlayRequest) -> Result<()> {
53 if request.live_mode {
54 self.run_live(request).await
55 } else {
56 self.run_offline(request).await
57 }
58 }
59
60 async fn run_offline(&self, request: LivePlayRequest) -> Result<()> {
61 let artifacts = self.builder.build(&request.build)?;
62 self.logger
63 .debug(format!("Build RMS: {:.4}", artifacts.rms));
64 self.logger.watch(format!(
65 "Audio regenerated in {} (total build {})",
66 format_duration(artifacts.audio_render_time),
67 format_duration(artifacts.total_duration)
68 ));
69 self.logger.info(format!(
70 "Loop length ≈ {}",
71 format_duration(artifacts.audio_length)
72 ));
73 self.logger.success(format!(
74 "Artifacts written: AST={}, audio={}",
75 artifacts.ast_path.display(),
76 artifacts.primary_audio_path.display()
77 ));
78
79 let source = LiveAudioSource::from_artifacts(&artifacts);
80 self.playback.play_once(source, request.volume).await?;
81 self.logger.info("Playback finished.");
82 Ok(())
83 }
84
85 async fn run_live(&self, request: LivePlayRequest) -> Result<()> {
86 let mut artifacts = match self.builder.build(&request.build) {
87 Ok(artifacts) => artifacts,
88 Err(err) => {
89 self.logger.error(format!("Initial build failed: {err}"));
90 return Err(err);
91 }
92 };
93 self.logger
94 .debug(format!("Build RMS: {:.4}", artifacts.rms));
95 self.logger.watch(format!(
96 "Audio regenerated in {} (total build {})",
97 format_duration(artifacts.audio_render_time),
98 format_duration(artifacts.total_duration)
99 ));
100 self.logger.info(format!(
101 "Loop length ≈ {}",
102 format_duration(artifacts.audio_length)
103 ));
104 let poll = Duration::from_millis(request.crossfade_ms.max(10));
105 let options = LivePlaybackOptions::new(poll).with_volume(request.volume);
106
107 let initial_source = LiveAudioSource::from_artifacts(&artifacts);
108
109 use crate::engine::audio::interpreter::driver::AudioInterpreter;
112 use std::sync::mpsc as std_mpsc;
113 use std::sync::{Arc as StdArc, Mutex as StdMutex};
114
115 let (bg_tx, bg_rx_std) =
119 std_mpsc::channel::<crate::engine::audio::events::AudioEventList>();
120 let bg_rx = StdArc::new(StdMutex::new(bg_rx_std));
121 self.logger.debug(format!(
123 "[LIVE] created bg channel; bg_rx strong_count={}",
124 StdArc::strong_count(&bg_rx)
125 ));
126
127 if let Ok(mut g) = self.bg_rx_guard.lock() {
129 *g = Some(bg_rx.clone());
130 }
131
132 let mut persistent_stop_tx: Option<std_mpsc::Sender<()>>;
133 let mut persistent_handle: Option<std::thread::JoinHandle<()>>;
134
135 let spawn_persistent = |stmts: Vec<crate::language::syntax::ast::Statement>,
139 sample_rate: u32,
140 bg_tx: std_mpsc::Sender<
141 crate::engine::audio::events::AudioEventList,
142 >,
143 bg_rx: StdArc<
144 StdMutex<std::sync::mpsc::Receiver<crate::engine::audio::events::AudioEventList>>,
145 >,
146 logger: Arc<Logger>| {
147 let (stop_tx, stop_rx) = std_mpsc::channel::<()>();
149
150 let handle = std::thread::spawn(move || {
151 use std::time::Duration as StdDuration;
152 let r = std::panic::catch_unwind(|| {
157 logger.debug(format!(
158 "[PERSISTENT] starting persistent interpreter (sample_rate={})",
159 sample_rate
160 ));
161 logger.debug(format!(
162 "[PERSISTENT] bg_rx strong_count at thread start={}",
163 StdArc::strong_count(&bg_rx)
164 ));
165 let mut interp = AudioInterpreter::new(sample_rate);
166 interp.suppress_print = true;
173 interp.suppress_beat_emit = true;
174
175 interp.background_event_tx = Some(bg_tx.clone());
178
179 let _ = interp.collect_events(&stmts);
181 logger.debug(format!(
182 "[PERSISTENT] collect_events() returned; background_event_tx present={}",
183 interp.background_event_tx.is_some()
184 ));
185
186 loop {
188 match stop_rx.try_recv() {
190 Ok(_) | Err(std_mpsc::TryRecvError::Disconnected) => break,
191 Err(std_mpsc::TryRecvError::Empty) => {}
192 }
193
194 logger.debug(format!(
196 "[PERSISTENT] bg_rx strong_count before recv_timeout={}",
197 StdArc::strong_count(&bg_rx)
198 ));
199 match bg_rx
200 .lock()
201 .expect("bg_rx lock")
202 .recv_timeout(StdDuration::from_millis(200))
203 {
204 Ok(events) => {
205 let cnt = events.events.len();
207 let mut times: Vec<f32> = events
208 .events
209 .iter()
210 .map(|e| match e {
211 crate::engine::audio::events::AudioEvent::Note {
212 start_time,
213 ..
214 } => *start_time,
215 crate::engine::audio::events::AudioEvent::Chord {
216 start_time,
217 ..
218 } => *start_time,
219 crate::engine::audio::events::AudioEvent::Sample {
220 start_time,
221 ..
222 } => *start_time,
223 crate::engine::audio::events::AudioEvent::Stop {
224 time,
225 ..
226 } => *time,
227 })
228 .collect();
229 for (t, _m) in &events.logs {
230 times.push(*t);
231 }
232 let earliest = times.into_iter().min_by(|a, b| {
233 a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
234 });
235 if let Some(t) = earliest {
236 logger.debug(format!(
237 "[PERSISTENT] merging {} bg events, earliest_start_time={}",
238 cnt, t
239 ));
240 } else {
241 logger.debug(format!("[PERSISTENT] merging {} bg events", cnt));
242 }
243 interp.events.merge(events);
245 }
246 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
247 }
249 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
250 break;
252 }
253 }
254 }
255 logger.debug(
256 "[PERSISTENT] persistent interpreter exiting (dropping interp)".to_string(),
257 );
258 });
260
261 if let Err(err) = r {
264 logger.error(format!(
265 "[PERSISTENT] persistent thread panicked: {:?}",
266 err
267 ));
268 }
269 });
270
271 (stop_tx, handle)
273 };
274
275 {
277 let (tx, handle) = spawn_persistent(
278 artifacts.statements.clone(),
279 artifacts.sample_rate,
280 bg_tx.clone(),
281 bg_rx.clone(),
282 self.logger.clone(),
283 );
284 persistent_stop_tx = Some(tx);
285 persistent_handle = Some(handle);
286 }
287
288 let session = self
289 .playback
290 .start_live_session(initial_source, options, Some(bg_rx.clone()))
291 .await?;
292 let mut best_audio_render_time = artifacts.audio_render_time;
293
294 self.logger.watch(format!(
295 "Live mode watching {}",
296 request.build.entry_path.display()
297 ));
298 let watcher = FileWatcher::new(self.logger.clone());
299 let mut stream = watcher
300 .watch(request.build.entry_path.clone(), WatchOptions::default())
301 .await
302 .context("failed to initialise file watcher")?;
303
304 loop {
305 select! {
306 change = stream.next_change() => {
307 match change {
308 Some(path) => {
309 self.logger.watch(format!("Rebuilding after change at {}", path.display()));
310 match self.builder.build(&request.build) {
311 Ok(new_artifacts) => {
312 self.logger
313 .debug(format!("Build RMS: {:.4}", new_artifacts.rms));
314 self.logger.watch(format!(
315 "Audio regenerated in {} (total build {})",
316 format_duration(new_artifacts.audio_render_time),
317 format_duration(new_artifacts.total_duration)
318 ));
319 self.logger.info(format!(
320 "Loop length ≈ {}",
321 format_duration(new_artifacts.audio_length)
322 ));
323 if new_artifacts.audio_render_time < best_audio_render_time {
324 best_audio_render_time = new_artifacts.audio_render_time;
325 self.logger.success(format!(
326 "⏱️ New best audio regen time: {}",
327 format_duration(best_audio_render_time)
328 ));
329 } else {
330 self.logger.info(format!(
331 "Best audio regen time so far: {}",
332 format_duration(best_audio_render_time)
333 ));
334 }
335 if let Some(tx) = persistent_stop_tx.take() {
337 let _ = tx.send(());
338 }
339 if let Some(handle) = persistent_handle.take() {
340 let _ = handle.join();
341 }
342
343 artifacts = new_artifacts;
344 let (tx, handle) = spawn_persistent(artifacts.statements.clone(), artifacts.sample_rate, bg_tx.clone(), bg_rx.clone(), self.logger.clone());
345 persistent_stop_tx = Some(tx);
346 persistent_handle = Some(handle);
347
348 let next_source = LiveAudioSource::from_artifacts(&artifacts);
349 if let Err(err) = session.queue_source(next_source) {
350 self.logger.error(format!("Failed to queue live buffer: {err}"));
351 }
352 }
353 Err(err) => {
354 self.logger.error(format!("Build failed after change: {err}"));
355 }
356 }
357 }
358 None => {
359 self.logger.warn("Watch stream ended; shutting down live playback");
360 break;
361 }
362 }
363 }
364 _ = session.heartbeat() => {}
365 }
366 }
367
368 let res = session.finish().await;
370 if let Ok(mut g) = self.bg_rx_guard.lock() {
372 let _ = g.take();
373 }
374 res
375 }
376}
377
378impl LiveAudioSource {
379 fn from_artifacts(artifacts: &BuildArtifacts) -> Self {
380 LiveAudioSource::with_path(
381 artifacts.primary_audio_path.clone(),
382 artifacts.primary_format,
383 artifacts.bit_depth,
384 artifacts.channels,
385 artifacts.sample_rate,
386 artifacts.resample_quality,
387 artifacts.audio_length,
388 )
389 }
390}
391
392fn format_duration(duration: Duration) -> String {
393 if duration.as_secs() >= 1 {
394 format!("{:.2}s", duration.as_secs_f64())
395 } else {
396 let ms = duration.as_secs_f64() * 1000.0;
397 if ms >= 100.0 {
398 format!("{:.0}ms", ms)
399 } else {
400 format!("{:.1}ms", ms)
401 }
402 }
403}