devalang_wasm/services/live/play/
mod.rs

1#![cfg(feature = "cli")]
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::{Context, Result};
7use tokio::select;
8
9use crate::engine::audio::playback::live::{
10    LiveAudioSource, LivePlaybackEngine, LivePlaybackOptions,
11};
12use crate::services::build::pipeline::{BuildArtifacts, BuildRequest, ProjectBuilder};
13use crate::services::watch::file::{FileWatcher, WatchOptions};
14use crate::tools::logger::Logger;
15
16#[derive(Debug, Clone)]
17pub struct LivePlayRequest {
18    pub build: BuildRequest,
19    pub live_mode: bool,
20    pub crossfade_ms: u64,
21    pub volume: f32,
22}
23
24pub struct LivePlayService {
25    logger: Arc<Logger>,
26    playback: LivePlaybackEngine,
27    builder: ProjectBuilder,
28    /// Guard clone to keep bg_rx alive for the duration of a live session
29    bg_rx_guard: std::sync::Mutex<
30        Option<
31            std::sync::Arc<
32                std::sync::Mutex<
33                    std::sync::mpsc::Receiver<crate::engine::audio::events::AudioEventList>,
34                >,
35            >,
36        >,
37    >,
38}
39
40impl LivePlayService {
41    pub fn new(logger: Arc<Logger>, builder: ProjectBuilder) -> Result<Self> {
42        let playback = LivePlaybackEngine::new(logger.clone())
43            .context("failed to initialise audio playback engine")?;
44        Ok(Self {
45            logger,
46            playback,
47            builder,
48            bg_rx_guard: std::sync::Mutex::new(None),
49        })
50    }
51
52    pub async fn run(&self, request: LivePlayRequest) -> Result<()> {
53        if request.live_mode {
54            self.run_live(request).await
55        } else {
56            self.run_offline(request).await
57        }
58    }
59
60    async fn run_offline(&self, request: LivePlayRequest) -> Result<()> {
61        let artifacts = self.builder.build(&request.build)?;
62        self.logger
63            .debug(format!("Build RMS: {:.4}", artifacts.rms));
64        self.logger.watch(format!(
65            "Audio regenerated in {} (total build {})",
66            format_duration(artifacts.audio_render_time),
67            format_duration(artifacts.total_duration)
68        ));
69        self.logger.info(format!(
70            "Loop length ≈ {}",
71            format_duration(artifacts.audio_length)
72        ));
73        self.logger.success(format!(
74            "Artifacts written: AST={}, audio={}",
75            artifacts.ast_path.display(),
76            artifacts.primary_audio_path.display()
77        ));
78
79        let source = LiveAudioSource::from_artifacts(&artifacts);
80        self.playback.play_once(source, request.volume).await?;
81        self.logger.info("Playback finished.");
82        Ok(())
83    }
84
85    async fn run_live(&self, request: LivePlayRequest) -> Result<()> {
86        let mut artifacts = match self.builder.build(&request.build) {
87            Ok(artifacts) => artifacts,
88            Err(err) => {
89                self.logger.error(format!("Initial build failed: {err}"));
90                return Err(err);
91            }
92        };
93        self.logger
94            .debug(format!("Build RMS: {:.4}", artifacts.rms));
95        self.logger.watch(format!(
96            "Audio regenerated in {} (total build {})",
97            format_duration(artifacts.audio_render_time),
98            format_duration(artifacts.total_duration)
99        ));
100        self.logger.info(format!(
101            "Loop length ≈ {}",
102            format_duration(artifacts.audio_length)
103        ));
104        let poll = Duration::from_millis(request.crossfade_ms.max(10));
105        let options = LivePlaybackOptions::new(poll).with_volume(request.volume);
106
107        let initial_source = LiveAudioSource::from_artifacts(&artifacts);
108
109        // Spawn a persistent interpreter thread to keep "loop pass" background workers
110        // alive and to print realtime messages while the live session is active.
111        use crate::engine::audio::interpreter::driver::AudioInterpreter;
112        use std::sync::mpsc as std_mpsc;
113        use std::sync::{Arc as StdArc, Mutex as StdMutex};
114
115        // Create a single background channel for the entire live session and wrap the
116        // receiver in an Arc<Mutex<...>> so the receiver's lifetime can be owned by the
117        // LivePlaybackSession and shared (by reference) with the persistent thread.
118        let (bg_tx, bg_rx_std) =
119            std_mpsc::channel::<crate::engine::audio::events::AudioEventList>();
120        let bg_rx = StdArc::new(StdMutex::new(bg_rx_std));
121        // Also emit via the structured logger so it appears in normal log outputs
122        self.logger.debug(format!(
123            "[LIVE] created bg channel; bg_rx strong_count={}",
124            StdArc::strong_count(&bg_rx)
125        ));
126
127        // Keep a guardian clone in the service to avoid accidental drop during live session
128        if let Ok(mut g) = self.bg_rx_guard.lock() {
129            *g = Some(bg_rx.clone());
130        }
131
132        let mut persistent_stop_tx: Option<std_mpsc::Sender<()>>;
133        let mut persistent_handle: Option<std::thread::JoinHandle<()>>;
134
135        // Helper closure to spawn a persistent interpreter that will run collect_events once
136        // to register groups and spawn background workers, then wait until signalled to stop.
137        // It accepts the bg_tx and an Arc<Mutex<Receiver>> owned by the session.
138        let spawn_persistent = |stmts: Vec<crate::language::syntax::ast::Statement>,
139                                sample_rate: u32,
140                                bg_tx: std_mpsc::Sender<
141            crate::engine::audio::events::AudioEventList,
142        >,
143                                bg_rx: StdArc<
144            StdMutex<std::sync::mpsc::Receiver<crate::engine::audio::events::AudioEventList>>,
145        >,
146                                logger: Arc<Logger>| {
147            // Stop signal channel for the persistent thread
148            let (stop_tx, stop_rx) = std_mpsc::channel::<()>();
149
150            let handle = std::thread::spawn(move || {
151                use std::time::Duration as StdDuration;
152                // Run the persistent interpreter inside a panic-catch so we can
153                // log if it exits unexpectedly (panics will otherwise be silent
154                // inside spawned threads and lead to the bg receiver being
155                // dropped, causing worker sends to fail).
156                let r = std::panic::catch_unwind(|| {
157                    logger.debug(format!(
158                        "[PERSISTENT] starting persistent interpreter (sample_rate={})",
159                        sample_rate
160                    ));
161                    logger.debug(format!(
162                        "[PERSISTENT] bg_rx strong_count at thread start={}",
163                        StdArc::strong_count(&bg_rx)
164                    ));
165                    let mut interp = AudioInterpreter::new(sample_rate);
166                    // Suppress immediate prints in the persistent interpreter so that
167                    // the live playback engine (which replays the build's .printlog)
168                    // is the single source of truth for timed PRINT output. This
169                    // avoids duplicate prints (once from the interpreter and once
170                    // from the playback sidecar) while keeping scheduled logs in
171                    // the merged events for rendering.
172                    interp.suppress_print = true;
173                    interp.suppress_beat_emit = true;
174
175                    // Install the sender so that loop pass workers will reuse it instead of
176                    // creating their own channel and receiver owned by the interpreter.
177                    interp.background_event_tx = Some(bg_tx.clone());
178
179                    // Collect events once to register groups and spawn background 'pass' workers
180                    let _ = interp.collect_events(&stmts);
181                    logger.debug(format!(
182                        "[PERSISTENT] collect_events() returned; background_event_tx present={}",
183                        interp.background_event_tx.is_some()
184                    ));
185
186                    // Drain and merge background worker batches continuously until stop signal.
187                    loop {
188                        // Check for stop signal without blocking forever
189                        match stop_rx.try_recv() {
190                            Ok(_) | Err(std_mpsc::TryRecvError::Disconnected) => break,
191                            Err(std_mpsc::TryRecvError::Empty) => {}
192                        }
193
194                        // Receive one batch from the bg_rx with timeout and merge
195                        logger.debug(format!(
196                            "[PERSISTENT] bg_rx strong_count before recv_timeout={}",
197                            StdArc::strong_count(&bg_rx)
198                        ));
199                        match bg_rx
200                            .lock()
201                            .expect("bg_rx lock")
202                            .recv_timeout(StdDuration::from_millis(200))
203                        {
204                            Ok(events) => {
205                                // Debug: report merge and earliest time
206                                let cnt = events.events.len();
207                                let mut times: Vec<f32> = events
208                                    .events
209                                    .iter()
210                                    .map(|e| match e {
211                                        crate::engine::audio::events::AudioEvent::Note {
212                                            start_time,
213                                            ..
214                                        } => *start_time,
215                                        crate::engine::audio::events::AudioEvent::Chord {
216                                            start_time,
217                                            ..
218                                        } => *start_time,
219                                        crate::engine::audio::events::AudioEvent::Sample {
220                                            start_time,
221                                            ..
222                                        } => *start_time,
223                                        crate::engine::audio::events::AudioEvent::Stop {
224                                            time,
225                                            ..
226                                        } => *time,
227                                    })
228                                    .collect();
229                                for (t, _m) in &events.logs {
230                                    times.push(*t);
231                                }
232                                let earliest = times.into_iter().min_by(|a, b| {
233                                    a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
234                                });
235                                if let Some(t) = earliest {
236                                    logger.debug(format!(
237                                        "[PERSISTENT] merging {} bg events, earliest_start_time={}",
238                                        cnt, t
239                                    ));
240                                } else {
241                                    logger.debug(format!("[PERSISTENT] merging {} bg events", cnt));
242                                }
243                                // Merge produced events into persistent interpreter's event list
244                                interp.events.merge(events);
245                            }
246                            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
247                                // nothing to merge now, continue loop
248                            }
249                            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
250                                // workers are gone
251                                break;
252                            }
253                        }
254                    }
255                    logger.debug(
256                        "[PERSISTENT] persistent interpreter exiting (dropping interp)".to_string(),
257                    );
258                    // dropping interp will stop background workers
259                });
260
261                // If the persistent interpreter panicked, log the payload so we can
262                // diagnose unexpected thread exits that would drop the bg receiver.
263                if let Err(err) = r {
264                    logger.error(format!(
265                        "[PERSISTENT] persistent thread panicked: {:?}",
266                        err
267                    ));
268                }
269            });
270
271            // Return the stop sender so callers can request the thread to stop, and the join handle
272            (stop_tx, handle)
273        };
274
275        // Start persistent interpreter for initial artifacts
276        {
277            let (tx, handle) = spawn_persistent(
278                artifacts.statements.clone(),
279                artifacts.sample_rate,
280                bg_tx.clone(),
281                bg_rx.clone(),
282                self.logger.clone(),
283            );
284            persistent_stop_tx = Some(tx);
285            persistent_handle = Some(handle);
286        }
287
288        let session = self
289            .playback
290            .start_live_session(initial_source, options, Some(bg_rx.clone()))
291            .await?;
292        let mut best_audio_render_time = artifacts.audio_render_time;
293
294        self.logger.watch(format!(
295            "Live mode watching {}",
296            request.build.entry_path.display()
297        ));
298        let watcher = FileWatcher::new(self.logger.clone());
299        let mut stream = watcher
300            .watch(request.build.entry_path.clone(), WatchOptions::default())
301            .await
302            .context("failed to initialise file watcher")?;
303
304        loop {
305            select! {
306                change = stream.next_change() => {
307                    match change {
308                        Some(path) => {
309                            self.logger.watch(format!("Rebuilding after change at {}", path.display()));
310                            match self.builder.build(&request.build) {
311                                Ok(new_artifacts) => {
312                                    self.logger
313                                        .debug(format!("Build RMS: {:.4}", new_artifacts.rms));
314                                    self.logger.watch(format!(
315                                        "Audio regenerated in {} (total build {})",
316                                        format_duration(new_artifacts.audio_render_time),
317                                        format_duration(new_artifacts.total_duration)
318                                    ));
319                                    self.logger.info(format!(
320                                        "Loop length ≈ {}",
321                                        format_duration(new_artifacts.audio_length)
322                                    ));
323                                    if new_artifacts.audio_render_time < best_audio_render_time {
324                                        best_audio_render_time = new_artifacts.audio_render_time;
325                                        self.logger.success(format!(
326                                            "⏱️ New best audio regen time: {}",
327                                            format_duration(best_audio_render_time)
328                                        ));
329                                    } else {
330                                        self.logger.info(format!(
331                                            "Best audio regen time so far: {}",
332                                            format_duration(best_audio_render_time)
333                                        ));
334                                    }
335                                    // Stop previous persistent interpreter (if any) and spawn a new one for updated statements
336                                    if let Some(tx) = persistent_stop_tx.take() {
337                                        let _ = tx.send(());
338                                    }
339                                    if let Some(handle) = persistent_handle.take() {
340                                        let _ = handle.join();
341                                    }
342
343                                    artifacts = new_artifacts;
344                                    let (tx, handle) = spawn_persistent(artifacts.statements.clone(), artifacts.sample_rate, bg_tx.clone(), bg_rx.clone(), self.logger.clone());
345                                    persistent_stop_tx = Some(tx);
346                                    persistent_handle = Some(handle);
347
348                                    let next_source = LiveAudioSource::from_artifacts(&artifacts);
349                                    if let Err(err) = session.queue_source(next_source) {
350                                        self.logger.error(format!("Failed to queue live buffer: {err}"));
351                                    }
352                                }
353                                Err(err) => {
354                                    self.logger.error(format!("Build failed after change: {err}"));
355                                }
356                            }
357                        }
358                        None => {
359                            self.logger.warn("Watch stream ended; shutting down live playback");
360                            break;
361                        }
362                    }
363                }
364                _ = session.heartbeat() => {}
365            }
366        }
367
368        // Wait for session completion, then clear our guardian clone so the receiver may be dropped
369        let res = session.finish().await;
370        // clear guard
371        if let Ok(mut g) = self.bg_rx_guard.lock() {
372            let _ = g.take();
373        }
374        res
375    }
376}
377
378impl LiveAudioSource {
379    fn from_artifacts(artifacts: &BuildArtifacts) -> Self {
380        LiveAudioSource::with_path(
381            artifacts.primary_audio_path.clone(),
382            artifacts.primary_format,
383            artifacts.bit_depth,
384            artifacts.channels,
385            artifacts.sample_rate,
386            artifacts.resample_quality,
387            artifacts.audio_length,
388        )
389    }
390}
391
/// Renders a duration for log output.
///
/// Durations of one second or more are shown in seconds with two decimals
/// (`1.50s`). Shorter durations are shown in milliseconds: without decimals
/// from 100 ms upwards (`250ms`), with one decimal below that (`1.5ms`).
fn format_duration(duration: Duration) -> String {
    let seconds = duration.as_secs_f64();
    if duration.as_secs() >= 1 {
        return format!("{seconds:.2}s");
    }

    let millis = seconds * 1000.0;
    let precision: usize = if millis >= 100.0 { 0 } else { 1 };
    format!("{millis:.precision$}ms")
}