devalang_wasm/services/live/play/
mod.rs1#![cfg(feature = "cli")]
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use tokio::select;
8
9use crate::engine::audio::playback::live::{
10 LiveAudioSource, LivePlaybackEngine, LivePlaybackOptions,
11};
12use crate::services::build::pipeline::{BuildArtifacts, BuildRequest, ProjectBuilder};
13use crate::services::watch::file::{FileWatcher, WatchOptions};
14use crate::tools::logger::Logger;
15
16#[derive(Debug, Clone)]
17pub struct LivePlayRequest {
18 pub build: BuildRequest,
19 pub live_mode: bool,
20 pub crossfade_ms: u64,
21 pub volume: f32,
22}
23
24pub struct LivePlayService {
25 logger: Arc<Logger>,
26 playback: LivePlaybackEngine,
27 builder: ProjectBuilder,
28 bg_rx_guard: std::sync::Mutex<
30 Option<
31 std::sync::Arc<
32 std::sync::Mutex<
33 std::sync::mpsc::Receiver<crate::engine::audio::events::AudioEventList>,
34 >,
35 >,
36 >,
37 >,
38}
39
40impl LivePlayService {
41 pub fn new(logger: Arc<Logger>, builder: ProjectBuilder) -> Result<Self> {
42 let playback = LivePlaybackEngine::new(logger.clone())
43 .context("failed to initialise audio playback engine")?;
44 Ok(Self {
45 logger,
46 playback,
47 builder,
48 bg_rx_guard: std::sync::Mutex::new(None),
49 })
50 }
51
52 pub async fn run(&self, request: LivePlayRequest) -> Result<()> {
53 if request.live_mode {
54 self.run_live(request).await
55 } else {
56 self.run_offline(request).await
57 }
58 }
59
60 async fn run_offline(&self, request: LivePlayRequest) -> Result<()> {
61 let artifacts = self.builder.build(&request.build)?;
62 self.logger
63 .debug(format!("Build RMS: {:.4}", artifacts.rms));
64 self.logger.watch(format!(
65 "Audio regenerated in {} (total build {})",
66 format_duration(artifacts.audio_render_time),
67 format_duration(artifacts.total_duration)
68 ));
69 self.logger.info(format!(
70 "Loop length ≈ {}",
71 format_duration(artifacts.audio_length)
72 ));
73 self.logger.success(format!(
74 "Artifacts written: AST={}, audio={}",
75 artifacts.ast_path.display(),
76 artifacts.primary_audio_path.display()
77 ));
78
79 let source = LiveAudioSource::from_artifacts(&artifacts);
80 self.playback.play_once(source, request.volume).await?;
81 self.logger.info("Playback finished.");
82 Ok(())
83 }
84
85 async fn run_live(&self, request: LivePlayRequest) -> Result<()> {
86 let mut artifacts = match self.builder.build(&request.build) {
87 Ok(artifacts) => artifacts,
88 Err(err) => {
89 self.logger.error(format!("Initial build failed: {err}"));
90 return Err(err);
91 }
92 };
93 self.logger
94 .debug(format!("Build RMS: {:.4}", artifacts.rms));
95 self.logger.watch(format!(
96 "Audio regenerated in {} (total build {})",
97 format_duration(artifacts.audio_render_time),
98 format_duration(artifacts.total_duration)
99 ));
100 self.logger.info(format!(
101 "Loop length ≈ {}",
102 format_duration(artifacts.audio_length)
103 ));
104 let poll = Duration::from_millis(request.crossfade_ms.max(10));
105 let options = LivePlaybackOptions::new(poll).with_volume(request.volume);
106
107 let initial_source = LiveAudioSource::from_artifacts(&artifacts);
108
109 use crate::engine::audio::interpreter::driver::AudioInterpreter;
112 use std::sync::mpsc as std_mpsc;
113 use std::sync::{Arc as StdArc, Mutex as StdMutex};
114
115 let (bg_tx, bg_rx_std) =
119 std_mpsc::channel::<crate::engine::audio::events::AudioEventList>();
120 let bg_rx = StdArc::new(StdMutex::new(bg_rx_std));
121 self.logger.debug(format!(
123 "[LIVE] created bg channel; bg_rx strong_count={}",
124 StdArc::strong_count(&bg_rx)
125 ));
126
127 if let Ok(mut g) = self.bg_rx_guard.lock() {
129 *g = Some(bg_rx.clone());
130 }
131
132 let mut persistent_stop_tx: Option<std_mpsc::Sender<()>>;
133 let mut persistent_handle: Option<std::thread::JoinHandle<()>>;
134
135 let spawn_persistent = |stmts: Vec<crate::language::syntax::ast::Statement>,
139 sample_rate: u32,
140 bg_tx: std_mpsc::Sender<
141 crate::engine::audio::events::AudioEventList,
142 >,
143 bg_rx: StdArc<
144 StdMutex<std::sync::mpsc::Receiver<crate::engine::audio::events::AudioEventList>>,
145 >,
146 logger: Arc<Logger>| {
147 let (stop_tx, stop_rx) = std_mpsc::channel::<()>();
149
150 let handle = std::thread::spawn(move || {
151 use std::time::Duration as StdDuration;
152 let r = std::panic::catch_unwind(|| {
157 logger.debug(format!(
158 "[PERSISTENT] starting persistent interpreter (sample_rate={})",
159 sample_rate
160 ));
161 logger.debug(format!(
162 "[PERSISTENT] bg_rx strong_count at thread start={}",
163 StdArc::strong_count(&bg_rx)
164 ));
165 let mut interp = AudioInterpreter::new(sample_rate);
166 interp.suppress_print = true;
173 interp.suppress_beat_emit = true;
174
175 interp.background_event_tx = Some(bg_tx.clone());
178
179 let _ = interp.collect_events(&stmts);
181 logger.debug(format!(
182 "[PERSISTENT] collect_events() returned; background_event_tx present={}",
183 interp.background_event_tx.is_some()
184 ));
185
186 loop {
188 match stop_rx.try_recv() {
190 Ok(_) | Err(std_mpsc::TryRecvError::Disconnected) => break,
191 Err(std_mpsc::TryRecvError::Empty) => {}
192 }
193
194 logger.debug(format!(
196 "[PERSISTENT] bg_rx strong_count before recv_timeout={}",
197 StdArc::strong_count(&bg_rx)
198 ));
199 match bg_rx
200 .lock()
201 .expect("bg_rx lock")
202 .recv_timeout(StdDuration::from_millis(200))
203 {
204 Ok(events) => {
205 let cnt = events.events.len();
207 let mut times: Vec<f32> = events
208 .events
209 .iter()
210 .map(|e| match e {
211 crate::engine::audio::events::AudioEvent::Note {
212 start_time,
213 ..
214 } => *start_time,
215 crate::engine::audio::events::AudioEvent::Chord {
216 start_time,
217 ..
218 } => *start_time,
219 crate::engine::audio::events::AudioEvent::Sample {
220 start_time,
221 ..
222 } => *start_time,
223 })
224 .collect();
225 for (t, _m) in &events.logs {
226 times.push(*t);
227 }
228 let earliest = times.into_iter().min_by(|a, b| {
229 a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
230 });
231 if let Some(t) = earliest {
232 logger.debug(format!(
233 "[PERSISTENT] merging {} bg events, earliest_start_time={}",
234 cnt, t
235 ));
236 } else {
237 logger.debug(format!("[PERSISTENT] merging {} bg events", cnt));
238 }
239 interp.events.merge(events);
241 }
242 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
243 }
245 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
246 break;
248 }
249 }
250 }
251 logger.debug(
252 "[PERSISTENT] persistent interpreter exiting (dropping interp)".to_string(),
253 );
254 });
256
257 if let Err(err) = r {
260 logger.error(format!(
261 "[PERSISTENT] persistent thread panicked: {:?}",
262 err
263 ));
264 }
265 });
266
267 (stop_tx, handle)
269 };
270
271 {
273 let (tx, handle) = spawn_persistent(
274 artifacts.statements.clone(),
275 artifacts.sample_rate,
276 bg_tx.clone(),
277 bg_rx.clone(),
278 self.logger.clone(),
279 );
280 persistent_stop_tx = Some(tx);
281 persistent_handle = Some(handle);
282 }
283
284 let session = self
285 .playback
286 .start_live_session(initial_source, options, Some(bg_rx.clone()))
287 .await?;
288 let mut best_audio_render_time = artifacts.audio_render_time;
289
290 self.logger.watch(format!(
291 "Live mode watching {}",
292 request.build.entry_path.display()
293 ));
294 let watcher = FileWatcher::new(self.logger.clone());
295 let mut stream = watcher
296 .watch(request.build.entry_path.clone(), WatchOptions::default())
297 .await
298 .context("failed to initialise file watcher")?;
299
300 loop {
301 select! {
302 change = stream.next_change() => {
303 match change {
304 Some(path) => {
305 self.logger.watch(format!("Rebuilding after change at {}", path.display()));
306 match self.builder.build(&request.build) {
307 Ok(new_artifacts) => {
308 self.logger
309 .debug(format!("Build RMS: {:.4}", new_artifacts.rms));
310 self.logger.watch(format!(
311 "Audio regenerated in {} (total build {})",
312 format_duration(new_artifacts.audio_render_time),
313 format_duration(new_artifacts.total_duration)
314 ));
315 self.logger.info(format!(
316 "Loop length ≈ {}",
317 format_duration(new_artifacts.audio_length)
318 ));
319 if new_artifacts.audio_render_time < best_audio_render_time {
320 best_audio_render_time = new_artifacts.audio_render_time;
321 self.logger.success(format!(
322 "⏱️ New best audio regen time: {}",
323 format_duration(best_audio_render_time)
324 ));
325 } else {
326 self.logger.info(format!(
327 "Best audio regen time so far: {}",
328 format_duration(best_audio_render_time)
329 ));
330 }
331 if let Some(tx) = persistent_stop_tx.take() {
333 let _ = tx.send(());
334 }
335 if let Some(handle) = persistent_handle.take() {
336 let _ = handle.join();
337 }
338
339 artifacts = new_artifacts;
340 let (tx, handle) = spawn_persistent(artifacts.statements.clone(), artifacts.sample_rate, bg_tx.clone(), bg_rx.clone(), self.logger.clone());
341 persistent_stop_tx = Some(tx);
342 persistent_handle = Some(handle);
343
344 let next_source = LiveAudioSource::from_artifacts(&artifacts);
345 if let Err(err) = session.queue_source(next_source) {
346 self.logger.error(format!("Failed to queue live buffer: {err}"));
347 }
348 }
349 Err(err) => {
350 self.logger.error(format!("Build failed after change: {err}"));
351 }
352 }
353 }
354 None => {
355 self.logger.warn("Watch stream ended; shutting down live playback");
356 break;
357 }
358 }
359 }
360 _ = session.heartbeat() => {}
361 }
362 }
363
364 let res = session.finish().await;
366 if let Ok(mut g) = self.bg_rx_guard.lock() {
368 let _ = g.take();
369 }
370 res
371 }
372}
373
374impl LiveAudioSource {
375 fn from_artifacts(artifacts: &BuildArtifacts) -> Self {
376 LiveAudioSource::with_path(
377 artifacts.primary_audio_path.clone(),
378 artifacts.primary_format,
379 artifacts.bit_depth,
380 artifacts.channels,
381 artifacts.sample_rate,
382 artifacts.resample_quality,
383 artifacts.audio_length,
384 )
385 }
386}
387
388fn format_duration(duration: Duration) -> String {
389 if duration.as_secs() >= 1 {
390 format!("{:.2}s", duration.as_secs_f64())
391 } else {
392 let ms = duration.as_secs_f64() * 1000.0;
393 if ms >= 100.0 {
394 format!("{:.0}ms", ms)
395 } else {
396 format!("{:.1}ms", ms)
397 }
398 }
399}