Skip to main content

wavecraft_dev_server/reload/
rebuild.rs

//! Hot-reload rebuild pipeline.
//!
//! This module provides automatic rebuilding when Rust source files change.
//! The pipeline watches for file changes, triggers a Cargo build, and updates
//! the development server's parameter state without dropping WebSocket connections.
//!
//! CLI-specific functions (dylib discovery, subprocess param extraction, sidecar
//! caching) are injected via [`RebuildCallbacks`] to keep this crate CLI-agnostic.
9
use anyhow::{Context, Result};
use console::style;
use std::any::Any;
use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::process::Stdio;
use std::sync::Arc;
use tokio::io::AsyncReadExt;
use tokio::process::Command;
use tokio::sync::watch;
use wavecraft_bridge::ParameterHost;
use wavecraft_protocol::{ParameterInfo, ProcessorInfo};

use crate::host::DevServerHost;
use crate::reload::guard::BuildGuard;
use crate::ws::WsServer;
27
28/// Callback type for loading parameters from a rebuilt dylib.
29///
30/// The closure receives the engine directory and must return the loaded
31/// parameters. This is injected by the CLI to handle dylib discovery,
32/// temp copying, and subprocess extraction.
33pub type ParamLoaderFn = Arc<
34    dyn Fn(PathBuf) -> Pin<Box<dyn Future<Output = Result<Vec<ParameterInfo>>> + Send>>
35        + Send
36        + Sync,
37>;
38
39/// Callback type for loading processors from a rebuilt dylib.
40pub type ProcessorLoaderFn = Arc<
41    dyn Fn(PathBuf) -> Pin<Box<dyn Future<Output = Result<Vec<ProcessorInfo>>> + Send>>
42        + Send
43        + Sync,
44>;
45
46/// Callback type for writing parameter cache to a sidecar file.
47pub type SidecarWriterFn = Arc<dyn Fn(&Path, &[ParameterInfo]) -> Result<()> + Send + Sync>;
48
49/// Callback type for writing generated TypeScript parameter ID types.
50pub type TsTypesWriterFn = Arc<dyn Fn(&[ParameterInfo]) -> Result<()> + Send + Sync>;
51
52/// Callback type for writing generated TypeScript processor ID types.
53pub type ProcessorTsTypesWriterFn = Arc<dyn Fn(&[ProcessorInfo]) -> Result<()> + Send + Sync>;
54
55/// Callbacks for CLI-specific operations.
56///
57/// The rebuild pipeline needs to perform operations that depend on CLI
58/// infrastructure (dylib discovery, subprocess parameter extraction,
59/// sidecar caching). These are injected as callbacks to keep the
60/// dev-server crate independent of CLI internals.
61pub struct RebuildCallbacks {
62    /// Engine package name for `cargo build --package` flag.
63    /// `None` means no `--package` flag (single-crate project).
64    pub package_name: Option<String>,
65    /// Optional sidecar cache writer (writes params to JSON file).
66    pub write_sidecar: Option<SidecarWriterFn>,
67    /// Optional TypeScript types writer (writes generated parameter ID typings).
68    pub write_ts_types: Option<TsTypesWriterFn>,
69    /// Optional TypeScript types writer for processor IDs.
70    pub write_processor_ts_types: Option<ProcessorTsTypesWriterFn>,
71    /// Loads parameters from the rebuilt dylib (async).
72    /// Receives the engine directory and returns parsed parameters.
73    pub param_loader: ParamLoaderFn,
74    /// Loads processors from the rebuilt dylib (async).
75    pub processor_loader: Option<ProcessorLoaderFn>,
76}
77
78/// Rebuild pipeline for hot-reload.
79///
80/// Coordinates Cargo builds, parameter reloading, and WebSocket notifications.
81pub struct RebuildPipeline {
82    guard: Arc<BuildGuard>,
83    engine_dir: PathBuf,
84    host: Arc<DevServerHost>,
85    ws_server: Arc<WsServer<Arc<DevServerHost>>>,
86    shutdown_rx: watch::Receiver<bool>,
87    callbacks: RebuildCallbacks,
88    #[cfg(feature = "audio")]
89    audio_reload_tx: Option<tokio::sync::mpsc::UnboundedSender<Vec<ParameterInfo>>>,
90    /// Channel for canceling the current parameter extraction when superseded.
91    cancel_param_load_tx: watch::Sender<bool>,
92    cancel_param_load_rx: watch::Receiver<bool>,
93}
94
95impl RebuildPipeline {
96    /// Create a new rebuild pipeline.
97    #[allow(clippy::too_many_arguments)]
98    pub fn new(
99        guard: Arc<BuildGuard>,
100        engine_dir: PathBuf,
101        host: Arc<DevServerHost>,
102        ws_server: Arc<WsServer<Arc<DevServerHost>>>,
103        shutdown_rx: watch::Receiver<bool>,
104        callbacks: RebuildCallbacks,
105        #[cfg(feature = "audio")] audio_reload_tx: Option<
106            tokio::sync::mpsc::UnboundedSender<Vec<ParameterInfo>>,
107        >,
108    ) -> Self {
109        let (cancel_param_load_tx, cancel_param_load_rx) = watch::channel(false);
110        Self {
111            guard,
112            engine_dir,
113            host,
114            ws_server,
115            shutdown_rx,
116            callbacks,
117            #[cfg(feature = "audio")]
118            audio_reload_tx,
119            cancel_param_load_tx,
120            cancel_param_load_rx,
121        }
122    }
123
124    /// Handle a file change event. Triggers rebuild if not already running.
125    pub async fn handle_change(&self) -> Result<()> {
126        if !self.guard.try_start() {
127            self.guard.mark_pending();
128            // Cancel any ongoing parameter extraction - it will be superseded
129            let _ = self.cancel_param_load_tx.send(true);
130            println!(
131                "  {} Build already in progress, queuing rebuild...",
132                style("→").dim()
133            );
134            return Ok(());
135        }
136
137        // Reset cancellation flag at the start of a new build cycle
138        let _ = self.cancel_param_load_tx.send(false);
139
140        loop {
141            let result = self.do_build().await;
142
143            match result {
144                Ok((params, param_count_change)) => {
145                    let mut reload_ok = true;
146                    let mut ts_types_ok = true;
147                    let mut processor_types_ok = true;
148
149                    let processors = if let Some(ref loader) = self.callbacks.processor_loader {
150                        match loader(self.engine_dir.clone()).await {
151                            Ok(processors) => Some(processors),
152                            Err(e) => {
153                                processor_types_ok = false;
154                                println!(
155                                    "  {} Failed to load processor metadata after rebuild: {}",
156                                    style("⚠").yellow(),
157                                    e
158                                );
159                                println!(
160                                    "  {} Save a Rust source file to retry, or restart the dev server",
161                                    style("  ↳").dim(),
162                                );
163                                None
164                            }
165                        }
166                    } else {
167                        None
168                    };
169
170                    if let Some(ref writer) = self.callbacks.write_sidecar
171                        && let Err(e) = writer(&self.engine_dir, &params)
172                    {
173                        println!("  Warning: failed to update param cache: {}", e);
174                    }
175
176                    if let Some(ref writer) = self.callbacks.write_ts_types
177                        && let Err(e) = writer(&params)
178                    {
179                        ts_types_ok = false;
180                        println!(
181                            "  {} Failed to regenerate TypeScript parameter types: {}",
182                            style("⚠").yellow(),
183                            e
184                        );
185                        println!(
186                            "  {} Save a Rust source file to retry, or restart the dev server",
187                            style("  ↳").dim(),
188                        );
189                    }
190
191                    if let (Some(processors), Some(writer)) = (
192                        processors.as_deref(),
193                        self.callbacks.write_processor_ts_types.as_ref(),
194                    ) && let Err(e) = writer(processors)
195                    {
196                        processor_types_ok = false;
197                        println!(
198                            "  {} Failed to regenerate TypeScript processor types: {}",
199                            style("⚠").yellow(),
200                            e
201                        );
202                        println!(
203                            "  {} Save a Rust source file to retry, or restart the dev server",
204                            style("  ↳").dim(),
205                        );
206                    }
207
208                    println!("  {} Updating parameter host...", style("→").dim());
209                    let replace_result =
210                        std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
211                            self.host.replace_parameters(params.clone())
212                        }));
213
214                    match replace_result {
215                        Ok(Ok(())) => {
216                            println!("  {} Updated {} parameters", style("→").dim(), params.len());
217                        }
218                        Ok(Err(e)) => {
219                            reload_ok = false;
220                            println!(
221                                "  {} Failed to replace parameters: {:#}",
222                                style("✗").red(),
223                                e
224                            );
225                        }
226                        Err(panic_payload) => {
227                            reload_ok = false;
228                            println!(
229                                "  {} Panic while replacing parameters: {}",
230                                style("✗").red(),
231                                panic_message(panic_payload)
232                            );
233                        }
234                    }
235
236                    if reload_ok {
237                        println!("  {} Notifying UI clients...", style("→").dim());
238                        let broadcast_result = tokio::spawn({
239                            let ws_server = Arc::clone(&self.ws_server);
240                            async move { ws_server.broadcast_parameters_changed().await }
241                        })
242                        .await;
243
244                        match broadcast_result {
245                            Ok(Ok(())) => {
246                                println!("  {} UI notified", style("→").dim());
247                            }
248                            Ok(Err(e)) => {
249                                reload_ok = false;
250                                println!(
251                                    "  {} Failed to notify UI clients: {:#}",
252                                    style("✗").red(),
253                                    e
254                                );
255                            }
256                            Err(join_err) => {
257                                reload_ok = false;
258                                println!(
259                                    "  {} Panic while notifying UI clients: {}",
260                                    style("✗").red(),
261                                    join_err
262                                );
263                            }
264                        }
265                    }
266
267                    if reload_ok {
268                        let change_info = if param_count_change > 0 {
269                            format!(" (+{} new)", param_count_change)
270                        } else if param_count_change < 0 {
271                            format!(" ({} removed)", -param_count_change)
272                        } else {
273                            String::new()
274                        };
275
276                        let param_types_status = if ts_types_ok {
277                            ""
278                        } else {
279                            " (⚠ TypeScript parameter types stale)"
280                        };
281
282                        let processor_types_status = if processor_types_ok {
283                            ""
284                        } else {
285                            " (⚠ TypeScript processor types stale)"
286                        };
287
288                        println!(
289                            "  {} Hot-reload complete — {} parameters{}{}{}",
290                            style("✓").green(),
291                            params.len(),
292                            change_info,
293                            param_types_status,
294                            processor_types_status,
295                        );
296
297                        // Trigger audio reload if audio is enabled
298                        #[cfg(feature = "audio")]
299                        if let Some(ref tx) = self.audio_reload_tx {
300                            let _ = tx.send(params);
301                        }
302                    } else {
303                        println!(
304                            "  {} Hot-reload aborted — parameters not fully applied",
305                            style("✗").red()
306                        );
307                    }
308                }
309                Err(e) => {
310                    println!("  {} Build failed:\n{}", style("✗").red(), e);
311                    // Preserve old state, don't update parameters
312                }
313            }
314
315            if !self.guard.complete() {
316                break; // No pending rebuild
317            }
318            println!(
319                "  {} Pending changes detected, rebuilding...",
320                style("→").cyan()
321            );
322        }
323
324        Ok(())
325    }
326
327    /// Execute a Cargo build and load new parameters on success.
328    async fn do_build(&self) -> Result<(Vec<ParameterInfo>, i32)> {
329        if *self.shutdown_rx.borrow() {
330            anyhow::bail!("Build cancelled due to shutdown");
331        }
332
333        println!("  {} Rebuilding plugin...", style("🔄").cyan());
334        let start = std::time::Instant::now();
335
336        // Get old parameter count for change reporting
337        let old_count = self.host.get_all_parameters().len() as i32;
338
339        // Build command with optional --package flag
340        let mut cmd = Command::new("cargo");
341        cmd.args([
342            "build",
343            "--lib",
344            "--features",
345            "_param-discovery",
346            "--message-format=json",
347        ]);
348
349        if let Some(ref package_name) = self.callbacks.package_name {
350            cmd.args(["--package", package_name]);
351        }
352
353        let mut child = cmd
354            .current_dir(&self.engine_dir)
355            .stdout(Stdio::piped())
356            .stderr(Stdio::piped())
357            .spawn()
358            .context("Failed to spawn cargo build")?;
359
360        let stdout = child
361            .stdout
362            .take()
363            .context("Failed to capture cargo stdout")?;
364        let stderr = child
365            .stderr
366            .take()
367            .context("Failed to capture cargo stderr")?;
368
369        let stdout_handle = tokio::spawn(read_to_end(stdout));
370        let stderr_handle = tokio::spawn(read_to_end(stderr));
371
372        let mut shutdown_rx = self.shutdown_rx.clone();
373        let status = tokio::select! {
374            status = child.wait() => status.context("Failed to wait for cargo build")?,
375            _ = shutdown_rx.changed() => {
376                self.kill_build_process(&mut child).await?;
377                let _ = stdout_handle.await;
378                let _ = stderr_handle.await;
379                anyhow::bail!("Build cancelled due to shutdown");
380            }
381        };
382
383        let stdout = stdout_handle
384            .await
385            .context("Failed to join cargo stdout task")??;
386        let stderr = stderr_handle
387            .await
388            .context("Failed to join cargo stderr task")??;
389
390        let elapsed = start.elapsed();
391
392        if !status.success() {
393            // Parse JSON output for errors
394            let stderr = String::from_utf8_lossy(&stderr);
395            let stdout = String::from_utf8_lossy(&stdout);
396
397            // Try to extract compiler errors from JSON
398            let mut error_lines = Vec::new();
399            for line in stdout.lines().chain(stderr.lines()) {
400                if let Ok(json) = serde_json::from_str::<serde_json::Value>(line)
401                    && json["reason"] == "compiler-message"
402                    && let Some(message) = json["message"]["rendered"].as_str()
403                {
404                    error_lines.push(message.to_string());
405                }
406            }
407
408            if error_lines.is_empty() {
409                error_lines.push(stderr.to_string());
410            }
411
412            anyhow::bail!("{}", error_lines.join("\n"));
413        }
414
415        println!(
416            "  {} Build succeeded in {:.1}s",
417            style("✓").green(),
418            elapsed.as_secs_f64()
419        );
420
421        // Load parameters via the injected callback, but race against cancellation
422        let loader = Arc::clone(&self.callbacks.param_loader);
423        let engine_dir = self.engine_dir.clone();
424        let mut cancel_rx = self.cancel_param_load_rx.clone();
425
426        let params = tokio::select! {
427            result = loader(engine_dir) => {
428                result.context("Failed to load parameters from rebuilt dylib")?
429            }
430            _ = cancel_rx.changed() => {
431                if *cancel_rx.borrow_and_update() {
432                    println!(
433                        "  {} Parameter extraction cancelled — superseded by newer change",
434                        style("⚠").yellow()
435                    );
436                    anyhow::bail!("Parameter extraction cancelled due to newer file change");
437                }
438                // If cancellation was reset to false, continue waiting
439                loader(self.engine_dir.clone())
440                    .await
441                    .context("Failed to load parameters from rebuilt dylib")?
442            }
443        };
444
445        let param_count_change = params.len() as i32 - old_count;
446
447        Ok((params, param_count_change))
448    }
449
450    async fn kill_build_process(&self, child: &mut tokio::process::Child) -> Result<()> {
451        #[cfg(unix)]
452        {
453            use nix::sys::signal::{Signal, kill};
454            use nix::unistd::Pid;
455
456            if let Some(pid) = child.id() {
457                let _ = kill(Pid::from_raw(-(pid as i32)), Signal::SIGTERM);
458            }
459        }
460
461        let _ = child.kill().await;
462        Ok(())
463    }
464}
465
466async fn read_to_end(mut reader: impl tokio::io::AsyncRead + Unpin) -> Result<Vec<u8>> {
467    let mut buffer = Vec::new();
468    reader
469        .read_to_end(&mut buffer)
470        .await
471        .context("Failed to read cargo output")?;
472    Ok(buffer)
473}
474
/// Extract a human-readable message from a panic payload.
///
/// Payloads of type `String` and `&'static str` are returned as text.
/// Any other payload type yields `"Unknown panic"`.
fn panic_message(payload: Box<dyn Any + Send>) -> String {
    // Downcast by value so an owned `String` payload is moved out, not cloned.
    match payload.downcast::<String>() {
        Ok(msg) => *msg,
        Err(payload) => match payload.downcast::<&'static str>() {
            Ok(msg) => (*msg).to_owned(),
            Err(_) => "Unknown panic".to_owned(),
        },
    }
}
483}