Skip to main content

wavecraft_dev_server/reload/
rebuild.rs

1//! Hot-reload rebuild pipeline
2//!
3//! This module provides automatic rebuilding when Rust source files change.
4//! The pipeline watches for file changes, triggers a Cargo build, and updates
5//! the development server's parameter state without dropping WebSocket connections.
6//!
7//! CLI-specific functions (dylib discovery, subprocess param extraction, sidecar
8//! caching) are injected via [`RebuildCallbacks`] to keep this crate CLI-agnostic.
9
10use anyhow::{Context, Result};
11use console::style;
12use std::any::Any;
13use std::future::Future;
14use std::path::{Path, PathBuf};
15use std::pin::Pin;
16use std::process::Stdio;
17use std::sync::Arc;
18use tokio::io::AsyncReadExt;
19use tokio::process::Command;
20use tokio::sync::watch;
21use wavecraft_bridge::ParameterHost;
22use wavecraft_protocol::{ParameterInfo, ProcessorInfo};
23
24use crate::host::DevServerHost;
25use crate::reload::guard::BuildGuard;
26use crate::ws::WsServer;
27
28/// Callback type for loading parameters from a rebuilt dylib.
29///
30/// The closure receives the engine directory and must return the loaded
31/// parameters. This is injected by the CLI to handle dylib discovery,
32/// temp copying, and subprocess extraction.
33pub type ParamLoaderFn = Arc<
34    dyn Fn(PathBuf) -> Pin<Box<dyn Future<Output = Result<Vec<ParameterInfo>>> + Send>>
35        + Send
36        + Sync,
37>;
38
39/// Callback type for loading processors from a rebuilt dylib.
40pub type ProcessorLoaderFn = Arc<
41    dyn Fn(PathBuf) -> Pin<Box<dyn Future<Output = Result<Vec<ProcessorInfo>>> + Send>>
42        + Send
43        + Sync,
44>;
45
46/// Callback type for writing parameter cache to a sidecar file.
47pub type SidecarWriterFn = Arc<dyn Fn(&Path, &[ParameterInfo]) -> Result<()> + Send + Sync>;
48
49/// Callback type for writing generated TypeScript parameter ID types.
50pub type TsTypesWriterFn = Arc<dyn Fn(&[ParameterInfo]) -> Result<()> + Send + Sync>;
51
52/// Callback type for writing generated TypeScript processor ID types.
53pub type ProcessorTsTypesWriterFn = Arc<dyn Fn(&[ProcessorInfo]) -> Result<()> + Send + Sync>;
54
55/// Callbacks for CLI-specific operations.
56///
57/// The rebuild pipeline needs to perform operations that depend on CLI
58/// infrastructure (dylib discovery, subprocess parameter extraction,
59/// sidecar caching). These are injected as callbacks to keep the
60/// dev-server crate independent of CLI internals.
61pub struct RebuildCallbacks {
62    /// Engine package name for `cargo build --package` flag.
63    /// `None` means no `--package` flag (single-crate project).
64    pub package_name: Option<String>,
65    /// Optional sidecar cache writer (writes params to JSON file).
66    pub write_sidecar: Option<SidecarWriterFn>,
67    /// Optional TypeScript types writer (writes generated parameter ID typings).
68    pub write_ts_types: Option<TsTypesWriterFn>,
69    /// Optional TypeScript types writer for processor IDs.
70    pub write_processor_ts_types: Option<ProcessorTsTypesWriterFn>,
71    /// Loads parameters from the rebuilt dylib (async).
72    /// Receives the engine directory and returns parsed parameters.
73    pub param_loader: ParamLoaderFn,
74    /// Loads processors from the rebuilt dylib (async).
75    pub processor_loader: Option<ProcessorLoaderFn>,
76    /// Additional Rust source paths to watch for hot-reload triggers.
77    ///
78    /// This is primarily used in SDK development mode where the active engine
79    /// depends on sibling workspace crates (e.g. `engine/crates/wavecraft-processors`).
80    pub additional_watch_paths: Vec<PathBuf>,
81}
82
83/// Rebuild pipeline for hot-reload.
84///
85/// Coordinates Cargo builds, parameter reloading, and WebSocket notifications.
86pub struct RebuildPipeline {
87    guard: Arc<BuildGuard>,
88    engine_dir: PathBuf,
89    host: Arc<DevServerHost>,
90    ws_server: Arc<WsServer<Arc<DevServerHost>>>,
91    shutdown_rx: watch::Receiver<bool>,
92    callbacks: RebuildCallbacks,
93    #[cfg(feature = "audio")]
94    audio_reload_tx: Option<tokio::sync::mpsc::UnboundedSender<Vec<ParameterInfo>>>,
95    /// Channel for canceling the current parameter extraction when superseded.
96    cancel_param_load_tx: watch::Sender<bool>,
97    cancel_param_load_rx: watch::Receiver<bool>,
98}
99
100impl RebuildPipeline {
101    /// Create a new rebuild pipeline.
102    #[allow(clippy::too_many_arguments)]
103    pub fn new(
104        guard: Arc<BuildGuard>,
105        engine_dir: PathBuf,
106        host: Arc<DevServerHost>,
107        ws_server: Arc<WsServer<Arc<DevServerHost>>>,
108        shutdown_rx: watch::Receiver<bool>,
109        callbacks: RebuildCallbacks,
110        #[cfg(feature = "audio")] audio_reload_tx: Option<
111            tokio::sync::mpsc::UnboundedSender<Vec<ParameterInfo>>,
112        >,
113    ) -> Self {
114        let (cancel_param_load_tx, cancel_param_load_rx) = watch::channel(false);
115        Self {
116            guard,
117            engine_dir,
118            host,
119            ws_server,
120            shutdown_rx,
121            callbacks,
122            #[cfg(feature = "audio")]
123            audio_reload_tx,
124            cancel_param_load_tx,
125            cancel_param_load_rx,
126        }
127    }
128
129    /// Handle a file change event. Triggers rebuild if not already running.
130    pub async fn handle_change(&self) -> Result<()> {
131        if !self.guard.try_start() {
132            self.guard.mark_pending();
133            // Cancel any ongoing parameter extraction - it will be superseded
134            let _ = self.cancel_param_load_tx.send(true);
135            println!(
136                "  {} Build already in progress, queuing rebuild...",
137                style("→").dim()
138            );
139            return Ok(());
140        }
141
142        // Reset cancellation flag at the start of a new build cycle
143        let _ = self.cancel_param_load_tx.send(false);
144
145        loop {
146            let result = self.do_build().await;
147
148            match result {
149                Ok((params, param_count_change)) => {
150                    let mut reload_ok = true;
151                    let mut ts_types_ok = true;
152                    let mut processor_types_ok = true;
153
154                    let processors = if let Some(ref loader) = self.callbacks.processor_loader {
155                        match loader(self.engine_dir.clone()).await {
156                            Ok(processors) => Some(processors),
157                            Err(e) => {
158                                processor_types_ok = false;
159                                println!(
160                                    "  {} Failed to load processor metadata after rebuild: {}",
161                                    style("⚠").yellow(),
162                                    e
163                                );
164                                println!(
165                                    "  {} Save a Rust source file to retry, or restart the dev server",
166                                    style("  ↳").dim(),
167                                );
168                                None
169                            }
170                        }
171                    } else {
172                        None
173                    };
174
175                    if let Some(ref writer) = self.callbacks.write_sidecar
176                        && let Err(e) = writer(&self.engine_dir, &params)
177                    {
178                        println!("  Warning: failed to update param cache: {}", e);
179                    }
180
181                    if let Some(ref writer) = self.callbacks.write_ts_types
182                        && let Err(e) = writer(&params)
183                    {
184                        ts_types_ok = false;
185                        log_regeneration_failure("TypeScript parameter types", &e);
186                    }
187
188                    if let (Some(processors), Some(writer)) = (
189                        processors.as_deref(),
190                        self.callbacks.write_processor_ts_types.as_ref(),
191                    ) && let Err(e) = writer(processors)
192                    {
193                        processor_types_ok = false;
194                        log_regeneration_failure("TypeScript processor types", &e);
195                    }
196
197                    println!("  {} Updating parameter host...", style("→").dim());
198                    let replace_result =
199                        std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
200                            self.host.replace_parameters(params.clone())
201                        }));
202
203                    match replace_result {
204                        Ok(Ok(())) => {
205                            println!("  {} Updated {} parameters", style("→").dim(), params.len());
206                        }
207                        Ok(Err(e)) => {
208                            reload_ok = false;
209                            println!(
210                                "  {} Failed to replace parameters: {:#}",
211                                style("✗").red(),
212                                e
213                            );
214                        }
215                        Err(panic_payload) => {
216                            reload_ok = false;
217                            println!(
218                                "  {} Panic while replacing parameters: {}",
219                                style("✗").red(),
220                                panic_message(panic_payload)
221                            );
222                        }
223                    }
224
225                    if reload_ok {
226                        println!("  {} Notifying UI clients...", style("→").dim());
227                        let broadcast_result = tokio::spawn({
228                            let ws_server = Arc::clone(&self.ws_server);
229                            async move { ws_server.broadcast_parameters_changed().await }
230                        })
231                        .await;
232
233                        match broadcast_result {
234                            Ok(Ok(())) => {
235                                println!("  {} UI notified", style("→").dim());
236                            }
237                            Ok(Err(e)) => {
238                                reload_ok = false;
239                                println!(
240                                    "  {} Failed to notify UI clients: {:#}",
241                                    style("✗").red(),
242                                    e
243                                );
244                            }
245                            Err(join_err) => {
246                                reload_ok = false;
247                                println!(
248                                    "  {} Panic while notifying UI clients: {}",
249                                    style("✗").red(),
250                                    join_err
251                                );
252                            }
253                        }
254                    }
255
256                    if reload_ok {
257                        let change_info = if param_count_change > 0 {
258                            format!(" (+{} new)", param_count_change)
259                        } else if param_count_change < 0 {
260                            format!(" ({} removed)", -param_count_change)
261                        } else {
262                            String::new()
263                        };
264
265                        let param_types_status = if ts_types_ok {
266                            ""
267                        } else {
268                            " (⚠ TypeScript parameter types stale)"
269                        };
270
271                        let processor_types_status = if processor_types_ok {
272                            ""
273                        } else {
274                            " (⚠ TypeScript processor types stale)"
275                        };
276
277                        println!(
278                            "  {} Hot-reload complete — {} parameters{}{}{}",
279                            style("✓").green(),
280                            params.len(),
281                            change_info,
282                            param_types_status,
283                            processor_types_status,
284                        );
285
286                        // Trigger audio reload if audio is enabled
287                        #[cfg(feature = "audio")]
288                        if let Some(ref tx) = self.audio_reload_tx {
289                            let _ = tx.send(params);
290                        }
291                    } else {
292                        println!(
293                            "  {} Hot-reload aborted — parameters not fully applied",
294                            style("✗").red()
295                        );
296                    }
297                }
298                Err(e) => {
299                    println!("  {} Build failed:\n{}", style("✗").red(), e);
300                    // Preserve old state, don't update parameters
301                }
302            }
303
304            if !self.guard.complete() {
305                break; // No pending rebuild
306            }
307            println!(
308                "  {} Pending changes detected, rebuilding...",
309                style("→").cyan()
310            );
311        }
312
313        Ok(())
314    }
315
316    /// Execute a Cargo build and load new parameters on success.
317    async fn do_build(&self) -> Result<(Vec<ParameterInfo>, i32)> {
318        if *self.shutdown_rx.borrow() {
319            anyhow::bail!("Build cancelled due to shutdown");
320        }
321
322        println!("  {} Rebuilding plugin...", style("🔄").cyan());
323        let start = std::time::Instant::now();
324
325        // Get old parameter count for change reporting
326        let old_count = self.host.get_all_parameters().len() as i32;
327
328        // Build command with optional --package flag
329        let mut cmd = Command::new("cargo");
330        cmd.args([
331            "build",
332            "--lib",
333            "--features",
334            "_param-discovery",
335            "--message-format=json",
336        ]);
337
338        if let Some(ref package_name) = self.callbacks.package_name {
339            cmd.args(["--package", package_name]);
340        }
341
342        let mut child = cmd
343            .current_dir(&self.engine_dir)
344            .stdout(Stdio::piped())
345            .stderr(Stdio::piped())
346            .spawn()
347            .context("Failed to spawn cargo build")?;
348
349        let stdout = child
350            .stdout
351            .take()
352            .context("Failed to capture cargo stdout")?;
353        let stderr = child
354            .stderr
355            .take()
356            .context("Failed to capture cargo stderr")?;
357
358        let stdout_handle = tokio::spawn(read_to_end(stdout));
359        let stderr_handle = tokio::spawn(read_to_end(stderr));
360
361        let mut shutdown_rx = self.shutdown_rx.clone();
362        let status = tokio::select! {
363            status = child.wait() => status.context("Failed to wait for cargo build")?,
364            _ = shutdown_rx.changed() => {
365                self.kill_build_process(&mut child).await?;
366                let _ = stdout_handle.await;
367                let _ = stderr_handle.await;
368                anyhow::bail!("Build cancelled due to shutdown");
369            }
370        };
371
372        let stdout = stdout_handle
373            .await
374            .context("Failed to join cargo stdout task")??;
375        let stderr = stderr_handle
376            .await
377            .context("Failed to join cargo stderr task")??;
378
379        let elapsed = start.elapsed();
380
381        if !status.success() {
382            // Parse JSON output for errors
383            let stderr = String::from_utf8_lossy(&stderr);
384            let stdout = String::from_utf8_lossy(&stdout);
385
386            // Try to extract compiler errors from JSON
387            let mut error_lines = Vec::new();
388            for line in stdout.lines().chain(stderr.lines()) {
389                if let Ok(json) = serde_json::from_str::<serde_json::Value>(line)
390                    && json["reason"] == "compiler-message"
391                    && let Some(message) = json["message"]["rendered"].as_str()
392                {
393                    error_lines.push(message.to_string());
394                }
395            }
396
397            if error_lines.is_empty() {
398                error_lines.push(stderr.to_string());
399            }
400
401            anyhow::bail!("{}", error_lines.join("\n"));
402        }
403
404        println!(
405            "  {} Build succeeded in {:.1}s",
406            style("✓").green(),
407            elapsed.as_secs_f64()
408        );
409
410        // Load parameters via the injected callback, but race against cancellation
411        let loader = Arc::clone(&self.callbacks.param_loader);
412        let engine_dir = self.engine_dir.clone();
413        let mut cancel_rx = self.cancel_param_load_rx.clone();
414
415        let params = tokio::select! {
416            result = loader(engine_dir) => {
417                result.context("Failed to load parameters from rebuilt dylib")?
418            }
419            _ = cancel_rx.changed() => {
420                if *cancel_rx.borrow_and_update() {
421                    println!(
422                        "  {} Parameter extraction cancelled — superseded by newer change",
423                        style("⚠").yellow()
424                    );
425                    anyhow::bail!("Parameter extraction cancelled due to newer file change");
426                }
427                // If cancellation was reset to false, continue waiting
428                loader(self.engine_dir.clone())
429                    .await
430                    .context("Failed to load parameters from rebuilt dylib")?
431            }
432        };
433
434        let param_count_change = params.len() as i32 - old_count;
435
436        Ok((params, param_count_change))
437    }
438
439    async fn kill_build_process(&self, child: &mut tokio::process::Child) -> Result<()> {
440        #[cfg(unix)]
441        {
442            use nix::sys::signal::{Signal, kill};
443            use nix::unistd::Pid;
444
445            if let Some(pid) = child.id() {
446                let _ = kill(Pid::from_raw(-(pid as i32)), Signal::SIGTERM);
447            }
448        }
449
450        let _ = child.kill().await;
451        Ok(())
452    }
453}
454
455async fn read_to_end(mut reader: impl tokio::io::AsyncRead + Unpin) -> Result<Vec<u8>> {
456    let mut buffer = Vec::new();
457    reader
458        .read_to_end(&mut buffer)
459        .await
460        .context("Failed to read cargo output")?;
461    Ok(buffer)
462}
463
464fn panic_message(payload: Box<dyn Any + Send>) -> String {
465    if let Some(msg) = payload.downcast_ref::<String>() {
466        msg.clone()
467    } else if let Some(msg) = payload.downcast_ref::<&str>() {
468        msg.to_string()
469    } else {
470        "Unknown panic".to_string()
471    }
472}
473
474fn log_regeneration_failure(target: &str, error: &anyhow::Error) {
475    println!(
476        "  {} Failed to regenerate {}: {}",
477        style("⚠").yellow(),
478        target,
479        error
480    );
481    println!(
482        "  {} Save a Rust source file to retry, or restart the dev server",
483        style("  ↳").dim(),
484    );
485}