wavecraft_dev_server/reload/
rebuild.rs1use anyhow::{Context, Result};
11use console::style;
12use std::any::Any;
13use std::future::Future;
14use std::path::{Path, PathBuf};
15use std::pin::Pin;
16use std::process::Stdio;
17use std::sync::Arc;
18use tokio::io::AsyncReadExt;
19use tokio::process::Command;
20use tokio::sync::watch;
21use wavecraft_bridge::ParameterHost;
22use wavecraft_protocol::{ParameterInfo, ProcessorInfo};
23
24use crate::host::DevServerHost;
25use crate::reload::guard::BuildGuard;
26use crate::ws::WsServer;
27
/// Shared, thread-safe async callback that loads parameter metadata.
///
/// The callback receives a `PathBuf` (the rebuild pipeline passes its engine
/// directory) and returns a boxed `Send` future that resolves to the loaded
/// [`ParameterInfo`] list, or to an error if loading failed.
pub type ParamLoaderFn = Arc<
    dyn Fn(PathBuf) -> Pin<Box<dyn Future<Output = Result<Vec<ParameterInfo>>> + Send>>
        + Send
        + Sync,
>;
38
/// Shared, thread-safe async callback that loads processor metadata.
///
/// Same shape as the parameter loader: it receives a `PathBuf` (the rebuild
/// pipeline passes its engine directory) and returns a boxed `Send` future that
/// resolves to the loaded [`ProcessorInfo`] list, or to an error.
pub type ProcessorLoaderFn = Arc<
    dyn Fn(PathBuf) -> Pin<Box<dyn Future<Output = Result<Vec<ProcessorInfo>>> + Send>>
        + Send
        + Sync,
>;
45
/// Synchronous callback invoked with a directory path and the freshly loaded
/// parameters after a successful rebuild.
///
/// The pipeline passes its engine directory and reports a failure as
/// "failed to update param cache", so this presumably persists a parameter
/// cache next to the engine — confirm against the implementation that is
/// injected by the caller.
pub type SidecarWriterFn = Arc<dyn Fn(&Path, &[ParameterInfo]) -> Result<()> + Send + Sync>;
48
/// Synchronous callback that regenerates the TypeScript parameter types from the
/// freshly loaded parameters; an `Err` is reported by the pipeline as stale
/// TypeScript parameter types.
pub type TsTypesWriterFn = Arc<dyn Fn(&[ParameterInfo]) -> Result<()> + Send + Sync>;
51
/// Synchronous callback that regenerates the TypeScript processor types from the
/// freshly loaded processor metadata; an `Err` is reported by the pipeline as
/// stale TypeScript processor types.
pub type ProcessorTsTypesWriterFn = Arc<dyn Fn(&[ProcessorInfo]) -> Result<()> + Send + Sync>;
54
/// Caller-supplied hooks and settings used by the rebuild pipeline.
///
/// Only `param_loader` is mandatory; every other hook is skipped when `None`.
pub struct RebuildCallbacks {
    /// When set, forwarded to `cargo build` as `--package <name>`.
    pub package_name: Option<String>,
    /// Called with the engine directory and the new parameters after a
    /// successful build; a failure only produces a warning.
    pub write_sidecar: Option<SidecarWriterFn>,
    /// Called with the new parameters to regenerate TypeScript parameter types.
    pub write_ts_types: Option<TsTypesWriterFn>,
    /// Called with the loaded processor metadata to regenerate TypeScript
    /// processor types; only runs when `processor_loader` succeeded.
    pub write_processor_ts_types: Option<ProcessorTsTypesWriterFn>,
    /// Loads the parameter list after a successful build.
    pub param_loader: ParamLoaderFn,
    /// Loads processor metadata after a successful build.
    pub processor_loader: Option<ProcessorLoaderFn>,
}
77
/// Rebuilds the plugin with `cargo build`, reloads its parameters into the dev
/// server host and notifies connected UI clients.
pub struct RebuildPipeline {
    /// Serialises builds: `try_start` / `mark_pending` / `complete`.
    guard: Arc<BuildGuard>,
    /// Working directory for `cargo build`; also handed to the loader callbacks.
    engine_dir: PathBuf,
    /// Parameter host whose parameter set is replaced after a rebuild.
    host: Arc<DevServerHost>,
    /// WebSocket server used to broadcast the "parameters changed" notification.
    ws_server: Arc<WsServer<Arc<DevServerHost>>>,
    /// Shutdown signal; a `true` value or a change aborts the build.
    shutdown_rx: watch::Receiver<bool>,
    /// Caller-supplied hooks (loaders, writers, optional package name).
    callbacks: RebuildCallbacks,
    /// Optional channel that receives the new parameters after a fully
    /// successful hot-reload (only with the `audio` feature).
    #[cfg(feature = "audio")]
    audio_reload_tx: Option<tokio::sync::mpsc::UnboundedSender<Vec<ParameterInfo>>>,
    /// Sender side of the "cancel parameter loading" flag; set to `true` when a
    /// change arrives while a build is already running.
    cancel_param_load_tx: watch::Sender<bool>,
    /// Receiver template for the cancel flag; cloned for each parameter load.
    cancel_param_load_rx: watch::Receiver<bool>,
}
94
95impl RebuildPipeline {
96 #[allow(clippy::too_many_arguments)]
98 pub fn new(
99 guard: Arc<BuildGuard>,
100 engine_dir: PathBuf,
101 host: Arc<DevServerHost>,
102 ws_server: Arc<WsServer<Arc<DevServerHost>>>,
103 shutdown_rx: watch::Receiver<bool>,
104 callbacks: RebuildCallbacks,
105 #[cfg(feature = "audio")] audio_reload_tx: Option<
106 tokio::sync::mpsc::UnboundedSender<Vec<ParameterInfo>>,
107 >,
108 ) -> Self {
109 let (cancel_param_load_tx, cancel_param_load_rx) = watch::channel(false);
110 Self {
111 guard,
112 engine_dir,
113 host,
114 ws_server,
115 shutdown_rx,
116 callbacks,
117 #[cfg(feature = "audio")]
118 audio_reload_tx,
119 cancel_param_load_tx,
120 cancel_param_load_rx,
121 }
122 }
123
124 pub async fn handle_change(&self) -> Result<()> {
126 if !self.guard.try_start() {
127 self.guard.mark_pending();
128 let _ = self.cancel_param_load_tx.send(true);
130 println!(
131 " {} Build already in progress, queuing rebuild...",
132 style("→").dim()
133 );
134 return Ok(());
135 }
136
137 let _ = self.cancel_param_load_tx.send(false);
139
140 loop {
141 let result = self.do_build().await;
142
143 match result {
144 Ok((params, param_count_change)) => {
145 let mut reload_ok = true;
146 let mut ts_types_ok = true;
147 let mut processor_types_ok = true;
148
149 let processors = if let Some(ref loader) = self.callbacks.processor_loader {
150 match loader(self.engine_dir.clone()).await {
151 Ok(processors) => Some(processors),
152 Err(e) => {
153 processor_types_ok = false;
154 println!(
155 " {} Failed to load processor metadata after rebuild: {}",
156 style("⚠").yellow(),
157 e
158 );
159 println!(
160 " {} Save a Rust source file to retry, or restart the dev server",
161 style(" ↳").dim(),
162 );
163 None
164 }
165 }
166 } else {
167 None
168 };
169
170 if let Some(ref writer) = self.callbacks.write_sidecar
171 && let Err(e) = writer(&self.engine_dir, ¶ms)
172 {
173 println!(" Warning: failed to update param cache: {}", e);
174 }
175
176 if let Some(ref writer) = self.callbacks.write_ts_types
177 && let Err(e) = writer(¶ms)
178 {
179 ts_types_ok = false;
180 println!(
181 " {} Failed to regenerate TypeScript parameter types: {}",
182 style("⚠").yellow(),
183 e
184 );
185 println!(
186 " {} Save a Rust source file to retry, or restart the dev server",
187 style(" ↳").dim(),
188 );
189 }
190
191 if let (Some(processors), Some(writer)) = (
192 processors.as_deref(),
193 self.callbacks.write_processor_ts_types.as_ref(),
194 ) && let Err(e) = writer(processors)
195 {
196 processor_types_ok = false;
197 println!(
198 " {} Failed to regenerate TypeScript processor types: {}",
199 style("⚠").yellow(),
200 e
201 );
202 println!(
203 " {} Save a Rust source file to retry, or restart the dev server",
204 style(" ↳").dim(),
205 );
206 }
207
208 println!(" {} Updating parameter host...", style("→").dim());
209 let replace_result =
210 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
211 self.host.replace_parameters(params.clone())
212 }));
213
214 match replace_result {
215 Ok(Ok(())) => {
216 println!(" {} Updated {} parameters", style("→").dim(), params.len());
217 }
218 Ok(Err(e)) => {
219 reload_ok = false;
220 println!(
221 " {} Failed to replace parameters: {:#}",
222 style("✗").red(),
223 e
224 );
225 }
226 Err(panic_payload) => {
227 reload_ok = false;
228 println!(
229 " {} Panic while replacing parameters: {}",
230 style("✗").red(),
231 panic_message(panic_payload)
232 );
233 }
234 }
235
236 if reload_ok {
237 println!(" {} Notifying UI clients...", style("→").dim());
238 let broadcast_result = tokio::spawn({
239 let ws_server = Arc::clone(&self.ws_server);
240 async move { ws_server.broadcast_parameters_changed().await }
241 })
242 .await;
243
244 match broadcast_result {
245 Ok(Ok(())) => {
246 println!(" {} UI notified", style("→").dim());
247 }
248 Ok(Err(e)) => {
249 reload_ok = false;
250 println!(
251 " {} Failed to notify UI clients: {:#}",
252 style("✗").red(),
253 e
254 );
255 }
256 Err(join_err) => {
257 reload_ok = false;
258 println!(
259 " {} Panic while notifying UI clients: {}",
260 style("✗").red(),
261 join_err
262 );
263 }
264 }
265 }
266
267 if reload_ok {
268 let change_info = if param_count_change > 0 {
269 format!(" (+{} new)", param_count_change)
270 } else if param_count_change < 0 {
271 format!(" ({} removed)", -param_count_change)
272 } else {
273 String::new()
274 };
275
276 let param_types_status = if ts_types_ok {
277 ""
278 } else {
279 " (⚠ TypeScript parameter types stale)"
280 };
281
282 let processor_types_status = if processor_types_ok {
283 ""
284 } else {
285 " (⚠ TypeScript processor types stale)"
286 };
287
288 println!(
289 " {} Hot-reload complete — {} parameters{}{}{}",
290 style("✓").green(),
291 params.len(),
292 change_info,
293 param_types_status,
294 processor_types_status,
295 );
296
297 #[cfg(feature = "audio")]
299 if let Some(ref tx) = self.audio_reload_tx {
300 let _ = tx.send(params);
301 }
302 } else {
303 println!(
304 " {} Hot-reload aborted — parameters not fully applied",
305 style("✗").red()
306 );
307 }
308 }
309 Err(e) => {
310 println!(" {} Build failed:\n{}", style("✗").red(), e);
311 }
313 }
314
315 if !self.guard.complete() {
316 break; }
318 println!(
319 " {} Pending changes detected, rebuilding...",
320 style("→").cyan()
321 );
322 }
323
324 Ok(())
325 }
326
327 async fn do_build(&self) -> Result<(Vec<ParameterInfo>, i32)> {
329 if *self.shutdown_rx.borrow() {
330 anyhow::bail!("Build cancelled due to shutdown");
331 }
332
333 println!(" {} Rebuilding plugin...", style("🔄").cyan());
334 let start = std::time::Instant::now();
335
336 let old_count = self.host.get_all_parameters().len() as i32;
338
339 let mut cmd = Command::new("cargo");
341 cmd.args([
342 "build",
343 "--lib",
344 "--features",
345 "_param-discovery",
346 "--message-format=json",
347 ]);
348
349 if let Some(ref package_name) = self.callbacks.package_name {
350 cmd.args(["--package", package_name]);
351 }
352
353 let mut child = cmd
354 .current_dir(&self.engine_dir)
355 .stdout(Stdio::piped())
356 .stderr(Stdio::piped())
357 .spawn()
358 .context("Failed to spawn cargo build")?;
359
360 let stdout = child
361 .stdout
362 .take()
363 .context("Failed to capture cargo stdout")?;
364 let stderr = child
365 .stderr
366 .take()
367 .context("Failed to capture cargo stderr")?;
368
369 let stdout_handle = tokio::spawn(read_to_end(stdout));
370 let stderr_handle = tokio::spawn(read_to_end(stderr));
371
372 let mut shutdown_rx = self.shutdown_rx.clone();
373 let status = tokio::select! {
374 status = child.wait() => status.context("Failed to wait for cargo build")?,
375 _ = shutdown_rx.changed() => {
376 self.kill_build_process(&mut child).await?;
377 let _ = stdout_handle.await;
378 let _ = stderr_handle.await;
379 anyhow::bail!("Build cancelled due to shutdown");
380 }
381 };
382
383 let stdout = stdout_handle
384 .await
385 .context("Failed to join cargo stdout task")??;
386 let stderr = stderr_handle
387 .await
388 .context("Failed to join cargo stderr task")??;
389
390 let elapsed = start.elapsed();
391
392 if !status.success() {
393 let stderr = String::from_utf8_lossy(&stderr);
395 let stdout = String::from_utf8_lossy(&stdout);
396
397 let mut error_lines = Vec::new();
399 for line in stdout.lines().chain(stderr.lines()) {
400 if let Ok(json) = serde_json::from_str::<serde_json::Value>(line)
401 && json["reason"] == "compiler-message"
402 && let Some(message) = json["message"]["rendered"].as_str()
403 {
404 error_lines.push(message.to_string());
405 }
406 }
407
408 if error_lines.is_empty() {
409 error_lines.push(stderr.to_string());
410 }
411
412 anyhow::bail!("{}", error_lines.join("\n"));
413 }
414
415 println!(
416 " {} Build succeeded in {:.1}s",
417 style("✓").green(),
418 elapsed.as_secs_f64()
419 );
420
421 let loader = Arc::clone(&self.callbacks.param_loader);
423 let engine_dir = self.engine_dir.clone();
424 let mut cancel_rx = self.cancel_param_load_rx.clone();
425
426 let params = tokio::select! {
427 result = loader(engine_dir) => {
428 result.context("Failed to load parameters from rebuilt dylib")?
429 }
430 _ = cancel_rx.changed() => {
431 if *cancel_rx.borrow_and_update() {
432 println!(
433 " {} Parameter extraction cancelled — superseded by newer change",
434 style("⚠").yellow()
435 );
436 anyhow::bail!("Parameter extraction cancelled due to newer file change");
437 }
438 loader(self.engine_dir.clone())
440 .await
441 .context("Failed to load parameters from rebuilt dylib")?
442 }
443 };
444
445 let param_count_change = params.len() as i32 - old_count;
446
447 Ok((params, param_count_change))
448 }
449
450 async fn kill_build_process(&self, child: &mut tokio::process::Child) -> Result<()> {
451 #[cfg(unix)]
452 {
453 use nix::sys::signal::{Signal, kill};
454 use nix::unistd::Pid;
455
456 if let Some(pid) = child.id() {
457 let _ = kill(Pid::from_raw(-(pid as i32)), Signal::SIGTERM);
458 }
459 }
460
461 let _ = child.kill().await;
462 Ok(())
463 }
464}
465
466async fn read_to_end(mut reader: impl tokio::io::AsyncRead + Unpin) -> Result<Vec<u8>> {
467 let mut buffer = Vec::new();
468 reader
469 .read_to_end(&mut buffer)
470 .await
471 .context("Failed to read cargo output")?;
472 Ok(buffer)
473}
474
/// Extracts a printable message from a panic payload.
///
/// Payloads holding a `String` or a `&str` yield their text; any other payload
/// type yields `"Unknown panic"`.
fn panic_message(payload: Box<dyn Any + Send>) -> String {
    payload
        .downcast_ref::<String>()
        .cloned()
        .or_else(|| {
            payload
                .downcast_ref::<&str>()
                .map(|text| (*text).to_owned())
        })
        .unwrap_or_else(|| "Unknown panic".to_string())
}