Skip to main content

wavecraft_dev_server/reload/
rebuild.rs

1//! Hot-reload rebuild pipeline
2//!
3//! This module provides automatic rebuilding when Rust source files change.
4//! The pipeline watches for file changes, triggers a Cargo build, and updates
5//! the development server's parameter state without dropping WebSocket connections.
6//!
7//! CLI-specific functions (dylib discovery, subprocess param extraction, sidecar
8//! caching) are injected via [`RebuildCallbacks`] to keep this crate CLI-agnostic.
9
10use anyhow::{Context, Result};
11use console::style;
12use std::any::Any;
13use std::future::Future;
14use std::path::{Path, PathBuf};
15use std::pin::Pin;
16use std::process::Stdio;
17use std::sync::Arc;
18use tokio::io::AsyncReadExt;
19use tokio::process::Command;
20use tokio::sync::watch;
21use wavecraft_bridge::ParameterHost;
22use wavecraft_protocol::{ParameterInfo, ProcessorInfo};
23
24use crate::host::DevServerHost;
25use crate::reload::guard::BuildGuard;
26use crate::ws::WsServer;
27
28/// Callback type for loading parameters from a rebuilt dylib.
29///
30/// The closure receives the engine directory and must return the loaded
31/// parameters. This is injected by the CLI to handle dylib discovery,
32/// temp copying, and subprocess extraction.
33pub type ParamLoaderFn = Arc<
34    dyn Fn(PathBuf) -> Pin<Box<dyn Future<Output = Result<Vec<ParameterInfo>>> + Send>>
35        + Send
36        + Sync,
37>;
38
39/// Callback type for loading processors from a rebuilt dylib.
40pub type ProcessorLoaderFn = Arc<
41    dyn Fn(PathBuf) -> Pin<Box<dyn Future<Output = Result<Vec<ProcessorInfo>>> + Send>>
42        + Send
43        + Sync,
44>;
45
46/// Callback type for writing parameter cache to a sidecar file.
47pub type SidecarWriterFn = Arc<dyn Fn(&Path, &[ParameterInfo]) -> Result<()> + Send + Sync>;
48
49/// Callback type for writing generated TypeScript parameter ID types.
50pub type TsTypesWriterFn = Arc<dyn Fn(&[ParameterInfo]) -> Result<()> + Send + Sync>;
51
52/// Callback type for writing generated TypeScript processor ID types.
53pub type ProcessorTsTypesWriterFn = Arc<dyn Fn(&[ProcessorInfo]) -> Result<()> + Send + Sync>;
54
55/// Callbacks for CLI-specific operations.
56///
57/// The rebuild pipeline needs to perform operations that depend on CLI
58/// infrastructure (dylib discovery, subprocess parameter extraction,
59/// sidecar caching). These are injected as callbacks to keep the
60/// dev-server crate independent of CLI internals.
61pub struct RebuildCallbacks {
62    /// Engine package name for `cargo build --package` flag.
63    /// `None` means no `--package` flag (single-crate project).
64    pub package_name: Option<String>,
65    /// Optional sidecar cache writer (writes params to JSON file).
66    pub write_sidecar: Option<SidecarWriterFn>,
67    /// Optional TypeScript types writer (writes generated parameter ID typings).
68    pub write_ts_types: Option<TsTypesWriterFn>,
69    /// Optional TypeScript types writer for processor IDs.
70    pub write_processor_ts_types: Option<ProcessorTsTypesWriterFn>,
71    /// Loads parameters from the rebuilt dylib (async).
72    /// Receives the engine directory and returns parsed parameters.
73    pub param_loader: ParamLoaderFn,
74    /// Loads processors from the rebuilt dylib (async).
75    pub processor_loader: Option<ProcessorLoaderFn>,
76}
77
78/// Rebuild pipeline for hot-reload.
79///
80/// Coordinates Cargo builds, parameter reloading, and WebSocket notifications.
81pub struct RebuildPipeline {
82    guard: Arc<BuildGuard>,
83    engine_dir: PathBuf,
84    host: Arc<DevServerHost>,
85    ws_server: Arc<WsServer<Arc<DevServerHost>>>,
86    shutdown_rx: watch::Receiver<bool>,
87    callbacks: RebuildCallbacks,
88    #[cfg(feature = "audio")]
89    audio_reload_tx: Option<tokio::sync::mpsc::UnboundedSender<Vec<ParameterInfo>>>,
90    /// Channel for canceling the current parameter extraction when superseded.
91    cancel_param_load_tx: watch::Sender<bool>,
92    cancel_param_load_rx: watch::Receiver<bool>,
93}
94
95impl RebuildPipeline {
96    /// Create a new rebuild pipeline.
97    #[allow(clippy::too_many_arguments)]
98    pub fn new(
99        guard: Arc<BuildGuard>,
100        engine_dir: PathBuf,
101        host: Arc<DevServerHost>,
102        ws_server: Arc<WsServer<Arc<DevServerHost>>>,
103        shutdown_rx: watch::Receiver<bool>,
104        callbacks: RebuildCallbacks,
105        #[cfg(feature = "audio")] audio_reload_tx: Option<
106            tokio::sync::mpsc::UnboundedSender<Vec<ParameterInfo>>,
107        >,
108    ) -> Self {
109        let (cancel_param_load_tx, cancel_param_load_rx) = watch::channel(false);
110        Self {
111            guard,
112            engine_dir,
113            host,
114            ws_server,
115            shutdown_rx,
116            callbacks,
117            #[cfg(feature = "audio")]
118            audio_reload_tx,
119            cancel_param_load_tx,
120            cancel_param_load_rx,
121        }
122    }
123
124    /// Handle a file change event. Triggers rebuild if not already running.
125    pub async fn handle_change(&self) -> Result<()> {
126        if !self.guard.try_start() {
127            self.guard.mark_pending();
128            // Cancel any ongoing parameter extraction - it will be superseded
129            let _ = self.cancel_param_load_tx.send(true);
130            println!(
131                "  {} Build already in progress, queuing rebuild...",
132                style("→").dim()
133            );
134            return Ok(());
135        }
136
137        // Reset cancellation flag at the start of a new build cycle
138        let _ = self.cancel_param_load_tx.send(false);
139
140        loop {
141            let result = self.do_build().await;
142
143            match result {
144                Ok((params, param_count_change)) => {
145                    let mut reload_ok = true;
146                    let mut ts_types_ok = true;
147                    let mut processor_types_ok = true;
148
149                    let processors = if let Some(ref loader) = self.callbacks.processor_loader {
150                        match loader(self.engine_dir.clone()).await {
151                            Ok(processors) => Some(processors),
152                            Err(e) => {
153                                processor_types_ok = false;
154                                println!(
155                                    "  {} Failed to load processor metadata after rebuild: {}",
156                                    style("⚠").yellow(),
157                                    e
158                                );
159                                println!(
160                                    "  {} Save a Rust source file to retry, or restart the dev server",
161                                    style("  ↳").dim(),
162                                );
163                                None
164                            }
165                        }
166                    } else {
167                        None
168                    };
169
170                    if let Some(ref writer) = self.callbacks.write_sidecar
171                        && let Err(e) = writer(&self.engine_dir, &params)
172                    {
173                        println!("  Warning: failed to update param cache: {}", e);
174                    }
175
176                    if let Some(ref writer) = self.callbacks.write_ts_types
177                        && let Err(e) = writer(&params)
178                    {
179                        ts_types_ok = false;
180                        log_regeneration_failure("TypeScript parameter types", &e);
181                    }
182
183                    if let (Some(processors), Some(writer)) = (
184                        processors.as_deref(),
185                        self.callbacks.write_processor_ts_types.as_ref(),
186                    ) && let Err(e) = writer(processors)
187                    {
188                        processor_types_ok = false;
189                        log_regeneration_failure("TypeScript processor types", &e);
190                    }
191
192                    println!("  {} Updating parameter host...", style("→").dim());
193                    let replace_result =
194                        std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
195                            self.host.replace_parameters(params.clone())
196                        }));
197
198                    match replace_result {
199                        Ok(Ok(())) => {
200                            println!("  {} Updated {} parameters", style("→").dim(), params.len());
201                        }
202                        Ok(Err(e)) => {
203                            reload_ok = false;
204                            println!(
205                                "  {} Failed to replace parameters: {:#}",
206                                style("✗").red(),
207                                e
208                            );
209                        }
210                        Err(panic_payload) => {
211                            reload_ok = false;
212                            println!(
213                                "  {} Panic while replacing parameters: {}",
214                                style("✗").red(),
215                                panic_message(panic_payload)
216                            );
217                        }
218                    }
219
220                    if reload_ok {
221                        println!("  {} Notifying UI clients...", style("→").dim());
222                        let broadcast_result = tokio::spawn({
223                            let ws_server = Arc::clone(&self.ws_server);
224                            async move { ws_server.broadcast_parameters_changed().await }
225                        })
226                        .await;
227
228                        match broadcast_result {
229                            Ok(Ok(())) => {
230                                println!("  {} UI notified", style("→").dim());
231                            }
232                            Ok(Err(e)) => {
233                                reload_ok = false;
234                                println!(
235                                    "  {} Failed to notify UI clients: {:#}",
236                                    style("✗").red(),
237                                    e
238                                );
239                            }
240                            Err(join_err) => {
241                                reload_ok = false;
242                                println!(
243                                    "  {} Panic while notifying UI clients: {}",
244                                    style("✗").red(),
245                                    join_err
246                                );
247                            }
248                        }
249                    }
250
251                    if reload_ok {
252                        let change_info = if param_count_change > 0 {
253                            format!(" (+{} new)", param_count_change)
254                        } else if param_count_change < 0 {
255                            format!(" ({} removed)", -param_count_change)
256                        } else {
257                            String::new()
258                        };
259
260                        let param_types_status = if ts_types_ok {
261                            ""
262                        } else {
263                            " (⚠ TypeScript parameter types stale)"
264                        };
265
266                        let processor_types_status = if processor_types_ok {
267                            ""
268                        } else {
269                            " (⚠ TypeScript processor types stale)"
270                        };
271
272                        println!(
273                            "  {} Hot-reload complete — {} parameters{}{}{}",
274                            style("✓").green(),
275                            params.len(),
276                            change_info,
277                            param_types_status,
278                            processor_types_status,
279                        );
280
281                        // Trigger audio reload if audio is enabled
282                        #[cfg(feature = "audio")]
283                        if let Some(ref tx) = self.audio_reload_tx {
284                            let _ = tx.send(params);
285                        }
286                    } else {
287                        println!(
288                            "  {} Hot-reload aborted — parameters not fully applied",
289                            style("✗").red()
290                        );
291                    }
292                }
293                Err(e) => {
294                    println!("  {} Build failed:\n{}", style("✗").red(), e);
295                    // Preserve old state, don't update parameters
296                }
297            }
298
299            if !self.guard.complete() {
300                break; // No pending rebuild
301            }
302            println!(
303                "  {} Pending changes detected, rebuilding...",
304                style("→").cyan()
305            );
306        }
307
308        Ok(())
309    }
310
311    /// Execute a Cargo build and load new parameters on success.
312    async fn do_build(&self) -> Result<(Vec<ParameterInfo>, i32)> {
313        if *self.shutdown_rx.borrow() {
314            anyhow::bail!("Build cancelled due to shutdown");
315        }
316
317        println!("  {} Rebuilding plugin...", style("🔄").cyan());
318        let start = std::time::Instant::now();
319
320        // Get old parameter count for change reporting
321        let old_count = self.host.get_all_parameters().len() as i32;
322
323        // Build command with optional --package flag
324        let mut cmd = Command::new("cargo");
325        cmd.args([
326            "build",
327            "--lib",
328            "--features",
329            "_param-discovery",
330            "--message-format=json",
331        ]);
332
333        if let Some(ref package_name) = self.callbacks.package_name {
334            cmd.args(["--package", package_name]);
335        }
336
337        let mut child = cmd
338            .current_dir(&self.engine_dir)
339            .stdout(Stdio::piped())
340            .stderr(Stdio::piped())
341            .spawn()
342            .context("Failed to spawn cargo build")?;
343
344        let stdout = child
345            .stdout
346            .take()
347            .context("Failed to capture cargo stdout")?;
348        let stderr = child
349            .stderr
350            .take()
351            .context("Failed to capture cargo stderr")?;
352
353        let stdout_handle = tokio::spawn(read_to_end(stdout));
354        let stderr_handle = tokio::spawn(read_to_end(stderr));
355
356        let mut shutdown_rx = self.shutdown_rx.clone();
357        let status = tokio::select! {
358            status = child.wait() => status.context("Failed to wait for cargo build")?,
359            _ = shutdown_rx.changed() => {
360                self.kill_build_process(&mut child).await?;
361                let _ = stdout_handle.await;
362                let _ = stderr_handle.await;
363                anyhow::bail!("Build cancelled due to shutdown");
364            }
365        };
366
367        let stdout = stdout_handle
368            .await
369            .context("Failed to join cargo stdout task")??;
370        let stderr = stderr_handle
371            .await
372            .context("Failed to join cargo stderr task")??;
373
374        let elapsed = start.elapsed();
375
376        if !status.success() {
377            // Parse JSON output for errors
378            let stderr = String::from_utf8_lossy(&stderr);
379            let stdout = String::from_utf8_lossy(&stdout);
380
381            // Try to extract compiler errors from JSON
382            let mut error_lines = Vec::new();
383            for line in stdout.lines().chain(stderr.lines()) {
384                if let Ok(json) = serde_json::from_str::<serde_json::Value>(line)
385                    && json["reason"] == "compiler-message"
386                    && let Some(message) = json["message"]["rendered"].as_str()
387                {
388                    error_lines.push(message.to_string());
389                }
390            }
391
392            if error_lines.is_empty() {
393                error_lines.push(stderr.to_string());
394            }
395
396            anyhow::bail!("{}", error_lines.join("\n"));
397        }
398
399        println!(
400            "  {} Build succeeded in {:.1}s",
401            style("✓").green(),
402            elapsed.as_secs_f64()
403        );
404
405        // Load parameters via the injected callback, but race against cancellation
406        let loader = Arc::clone(&self.callbacks.param_loader);
407        let engine_dir = self.engine_dir.clone();
408        let mut cancel_rx = self.cancel_param_load_rx.clone();
409
410        let params = tokio::select! {
411            result = loader(engine_dir) => {
412                result.context("Failed to load parameters from rebuilt dylib")?
413            }
414            _ = cancel_rx.changed() => {
415                if *cancel_rx.borrow_and_update() {
416                    println!(
417                        "  {} Parameter extraction cancelled — superseded by newer change",
418                        style("⚠").yellow()
419                    );
420                    anyhow::bail!("Parameter extraction cancelled due to newer file change");
421                }
422                // If cancellation was reset to false, continue waiting
423                loader(self.engine_dir.clone())
424                    .await
425                    .context("Failed to load parameters from rebuilt dylib")?
426            }
427        };
428
429        let param_count_change = params.len() as i32 - old_count;
430
431        Ok((params, param_count_change))
432    }
433
434    async fn kill_build_process(&self, child: &mut tokio::process::Child) -> Result<()> {
435        #[cfg(unix)]
436        {
437            use nix::sys::signal::{Signal, kill};
438            use nix::unistd::Pid;
439
440            if let Some(pid) = child.id() {
441                let _ = kill(Pid::from_raw(-(pid as i32)), Signal::SIGTERM);
442            }
443        }
444
445        let _ = child.kill().await;
446        Ok(())
447    }
448}
449
450async fn read_to_end(mut reader: impl tokio::io::AsyncRead + Unpin) -> Result<Vec<u8>> {
451    let mut buffer = Vec::new();
452    reader
453        .read_to_end(&mut buffer)
454        .await
455        .context("Failed to read cargo output")?;
456    Ok(buffer)
457}
458
459fn panic_message(payload: Box<dyn Any + Send>) -> String {
460    if let Some(msg) = payload.downcast_ref::<String>() {
461        msg.clone()
462    } else if let Some(msg) = payload.downcast_ref::<&str>() {
463        msg.to_string()
464    } else {
465        "Unknown panic".to_string()
466    }
467}
468
469fn log_regeneration_failure(target: &str, error: &anyhow::Error) {
470    println!(
471        "  {} Failed to regenerate {}: {}",
472        style("⚠").yellow(),
473        target,
474        error
475    );
476    println!(
477        "  {} Save a Rust source file to retry, or restart the dev server",
478        style("  ↳").dim(),
479    );
480}