wavecraft_dev_server/reload/
rebuild.rs1use anyhow::{Context, Result};
11use console::style;
12use std::any::Any;
13use std::future::Future;
14use std::path::{Path, PathBuf};
15use std::pin::Pin;
16use std::process::Stdio;
17use std::sync::Arc;
18use tokio::io::AsyncReadExt;
19use tokio::process::Command;
20use tokio::sync::watch;
21use wavecraft_bridge::ParameterHost;
22use wavecraft_protocol::{ParameterInfo, ProcessorInfo};
23
24use crate::host::DevServerHost;
25use crate::reload::guard::BuildGuard;
26use crate::ws::WsServer;
27
28pub type ParamLoaderFn = Arc<
34 dyn Fn(PathBuf) -> Pin<Box<dyn Future<Output = Result<Vec<ParameterInfo>>> + Send>>
35 + Send
36 + Sync,
37>;
38
39pub type ProcessorLoaderFn = Arc<
41 dyn Fn(PathBuf) -> Pin<Box<dyn Future<Output = Result<Vec<ProcessorInfo>>> + Send>>
42 + Send
43 + Sync,
44>;
45
46pub type SidecarWriterFn = Arc<dyn Fn(&Path, &[ParameterInfo]) -> Result<()> + Send + Sync>;
48
49pub type TsTypesWriterFn = Arc<dyn Fn(&[ParameterInfo]) -> Result<()> + Send + Sync>;
51
52pub type ProcessorTsTypesWriterFn = Arc<dyn Fn(&[ProcessorInfo]) -> Result<()> + Send + Sync>;
54
55pub struct RebuildCallbacks {
62 pub package_name: Option<String>,
65 pub write_sidecar: Option<SidecarWriterFn>,
67 pub write_ts_types: Option<TsTypesWriterFn>,
69 pub write_processor_ts_types: Option<ProcessorTsTypesWriterFn>,
71 pub param_loader: ParamLoaderFn,
74 pub processor_loader: Option<ProcessorLoaderFn>,
76 pub additional_watch_paths: Vec<PathBuf>,
81}
82
83pub struct RebuildPipeline {
87 guard: Arc<BuildGuard>,
88 engine_dir: PathBuf,
89 host: Arc<DevServerHost>,
90 ws_server: Arc<WsServer<Arc<DevServerHost>>>,
91 shutdown_rx: watch::Receiver<bool>,
92 callbacks: RebuildCallbacks,
93 #[cfg(feature = "audio")]
94 audio_reload_tx: Option<tokio::sync::mpsc::UnboundedSender<Vec<ParameterInfo>>>,
95 cancel_param_load_tx: watch::Sender<bool>,
97 cancel_param_load_rx: watch::Receiver<bool>,
98}
99
100impl RebuildPipeline {
101 #[allow(clippy::too_many_arguments)]
103 pub fn new(
104 guard: Arc<BuildGuard>,
105 engine_dir: PathBuf,
106 host: Arc<DevServerHost>,
107 ws_server: Arc<WsServer<Arc<DevServerHost>>>,
108 shutdown_rx: watch::Receiver<bool>,
109 callbacks: RebuildCallbacks,
110 #[cfg(feature = "audio")] audio_reload_tx: Option<
111 tokio::sync::mpsc::UnboundedSender<Vec<ParameterInfo>>,
112 >,
113 ) -> Self {
114 let (cancel_param_load_tx, cancel_param_load_rx) = watch::channel(false);
115 Self {
116 guard,
117 engine_dir,
118 host,
119 ws_server,
120 shutdown_rx,
121 callbacks,
122 #[cfg(feature = "audio")]
123 audio_reload_tx,
124 cancel_param_load_tx,
125 cancel_param_load_rx,
126 }
127 }
128
129 pub async fn handle_change(&self) -> Result<()> {
131 if !self.guard.try_start() {
132 self.guard.mark_pending();
133 let _ = self.cancel_param_load_tx.send(true);
135 println!(
136 " {} Build already in progress, queuing rebuild...",
137 style("→").dim()
138 );
139 return Ok(());
140 }
141
142 let _ = self.cancel_param_load_tx.send(false);
144
145 loop {
146 let result = self.do_build().await;
147
148 match result {
149 Ok((params, param_count_change)) => {
150 let mut reload_ok = true;
151 let mut ts_types_ok = true;
152 let mut processor_types_ok = true;
153
154 let processors = if let Some(ref loader) = self.callbacks.processor_loader {
155 match loader(self.engine_dir.clone()).await {
156 Ok(processors) => Some(processors),
157 Err(e) => {
158 processor_types_ok = false;
159 println!(
160 " {} Failed to load processor metadata after rebuild: {}",
161 style("⚠").yellow(),
162 e
163 );
164 println!(
165 " {} Save a Rust source file to retry, or restart the dev server",
166 style(" ↳").dim(),
167 );
168 None
169 }
170 }
171 } else {
172 None
173 };
174
175 if let Some(ref writer) = self.callbacks.write_sidecar
176 && let Err(e) = writer(&self.engine_dir, ¶ms)
177 {
178 println!(" Warning: failed to update param cache: {}", e);
179 }
180
181 if let Some(ref writer) = self.callbacks.write_ts_types
182 && let Err(e) = writer(¶ms)
183 {
184 ts_types_ok = false;
185 log_regeneration_failure("TypeScript parameter types", &e);
186 }
187
188 if let (Some(processors), Some(writer)) = (
189 processors.as_deref(),
190 self.callbacks.write_processor_ts_types.as_ref(),
191 ) && let Err(e) = writer(processors)
192 {
193 processor_types_ok = false;
194 log_regeneration_failure("TypeScript processor types", &e);
195 }
196
197 println!(" {} Updating parameter host...", style("→").dim());
198 let replace_result =
199 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
200 self.host.replace_parameters(params.clone())
201 }));
202
203 match replace_result {
204 Ok(Ok(())) => {
205 println!(" {} Updated {} parameters", style("→").dim(), params.len());
206 }
207 Ok(Err(e)) => {
208 reload_ok = false;
209 println!(
210 " {} Failed to replace parameters: {:#}",
211 style("✗").red(),
212 e
213 );
214 }
215 Err(panic_payload) => {
216 reload_ok = false;
217 println!(
218 " {} Panic while replacing parameters: {}",
219 style("✗").red(),
220 panic_message(panic_payload)
221 );
222 }
223 }
224
225 if reload_ok {
226 println!(" {} Notifying UI clients...", style("→").dim());
227 let broadcast_result = tokio::spawn({
228 let ws_server = Arc::clone(&self.ws_server);
229 async move { ws_server.broadcast_parameters_changed().await }
230 })
231 .await;
232
233 match broadcast_result {
234 Ok(Ok(())) => {
235 println!(" {} UI notified", style("→").dim());
236 }
237 Ok(Err(e)) => {
238 reload_ok = false;
239 println!(
240 " {} Failed to notify UI clients: {:#}",
241 style("✗").red(),
242 e
243 );
244 }
245 Err(join_err) => {
246 reload_ok = false;
247 println!(
248 " {} Panic while notifying UI clients: {}",
249 style("✗").red(),
250 join_err
251 );
252 }
253 }
254 }
255
256 if reload_ok {
257 let change_info = if param_count_change > 0 {
258 format!(" (+{} new)", param_count_change)
259 } else if param_count_change < 0 {
260 format!(" ({} removed)", -param_count_change)
261 } else {
262 String::new()
263 };
264
265 let param_types_status = if ts_types_ok {
266 ""
267 } else {
268 " (⚠ TypeScript parameter types stale)"
269 };
270
271 let processor_types_status = if processor_types_ok {
272 ""
273 } else {
274 " (⚠ TypeScript processor types stale)"
275 };
276
277 println!(
278 " {} Hot-reload complete — {} parameters{}{}{}",
279 style("✓").green(),
280 params.len(),
281 change_info,
282 param_types_status,
283 processor_types_status,
284 );
285
286 #[cfg(feature = "audio")]
288 if let Some(ref tx) = self.audio_reload_tx {
289 let _ = tx.send(params);
290 }
291 } else {
292 println!(
293 " {} Hot-reload aborted — parameters not fully applied",
294 style("✗").red()
295 );
296 }
297 }
298 Err(e) => {
299 println!(" {} Build failed:\n{}", style("✗").red(), e);
300 }
302 }
303
304 if !self.guard.complete() {
305 break; }
307 println!(
308 " {} Pending changes detected, rebuilding...",
309 style("→").cyan()
310 );
311 }
312
313 Ok(())
314 }
315
316 async fn do_build(&self) -> Result<(Vec<ParameterInfo>, i32)> {
318 if *self.shutdown_rx.borrow() {
319 anyhow::bail!("Build cancelled due to shutdown");
320 }
321
322 println!(" {} Rebuilding plugin...", style("🔄").cyan());
323 let start = std::time::Instant::now();
324
325 let old_count = self.host.get_all_parameters().len() as i32;
327
328 let mut cmd = Command::new("cargo");
330 cmd.args([
331 "build",
332 "--lib",
333 "--features",
334 "_param-discovery",
335 "--message-format=json",
336 ]);
337
338 if let Some(ref package_name) = self.callbacks.package_name {
339 cmd.args(["--package", package_name]);
340 }
341
342 let mut child = cmd
343 .current_dir(&self.engine_dir)
344 .stdout(Stdio::piped())
345 .stderr(Stdio::piped())
346 .spawn()
347 .context("Failed to spawn cargo build")?;
348
349 let stdout = child
350 .stdout
351 .take()
352 .context("Failed to capture cargo stdout")?;
353 let stderr = child
354 .stderr
355 .take()
356 .context("Failed to capture cargo stderr")?;
357
358 let stdout_handle = tokio::spawn(read_to_end(stdout));
359 let stderr_handle = tokio::spawn(read_to_end(stderr));
360
361 let mut shutdown_rx = self.shutdown_rx.clone();
362 let status = tokio::select! {
363 status = child.wait() => status.context("Failed to wait for cargo build")?,
364 _ = shutdown_rx.changed() => {
365 self.kill_build_process(&mut child).await?;
366 let _ = stdout_handle.await;
367 let _ = stderr_handle.await;
368 anyhow::bail!("Build cancelled due to shutdown");
369 }
370 };
371
372 let stdout = stdout_handle
373 .await
374 .context("Failed to join cargo stdout task")??;
375 let stderr = stderr_handle
376 .await
377 .context("Failed to join cargo stderr task")??;
378
379 let elapsed = start.elapsed();
380
381 if !status.success() {
382 let stderr = String::from_utf8_lossy(&stderr);
384 let stdout = String::from_utf8_lossy(&stdout);
385
386 let mut error_lines = Vec::new();
388 for line in stdout.lines().chain(stderr.lines()) {
389 if let Ok(json) = serde_json::from_str::<serde_json::Value>(line)
390 && json["reason"] == "compiler-message"
391 && let Some(message) = json["message"]["rendered"].as_str()
392 {
393 error_lines.push(message.to_string());
394 }
395 }
396
397 if error_lines.is_empty() {
398 error_lines.push(stderr.to_string());
399 }
400
401 anyhow::bail!("{}", error_lines.join("\n"));
402 }
403
404 println!(
405 " {} Build succeeded in {:.1}s",
406 style("✓").green(),
407 elapsed.as_secs_f64()
408 );
409
410 let loader = Arc::clone(&self.callbacks.param_loader);
412 let engine_dir = self.engine_dir.clone();
413 let mut cancel_rx = self.cancel_param_load_rx.clone();
414
415 let params = tokio::select! {
416 result = loader(engine_dir) => {
417 result.context("Failed to load parameters from rebuilt dylib")?
418 }
419 _ = cancel_rx.changed() => {
420 if *cancel_rx.borrow_and_update() {
421 println!(
422 " {} Parameter extraction cancelled — superseded by newer change",
423 style("⚠").yellow()
424 );
425 anyhow::bail!("Parameter extraction cancelled due to newer file change");
426 }
427 loader(self.engine_dir.clone())
429 .await
430 .context("Failed to load parameters from rebuilt dylib")?
431 }
432 };
433
434 let param_count_change = params.len() as i32 - old_count;
435
436 Ok((params, param_count_change))
437 }
438
439 async fn kill_build_process(&self, child: &mut tokio::process::Child) -> Result<()> {
440 #[cfg(unix)]
441 {
442 use nix::sys::signal::{Signal, kill};
443 use nix::unistd::Pid;
444
445 if let Some(pid) = child.id() {
446 let _ = kill(Pid::from_raw(-(pid as i32)), Signal::SIGTERM);
447 }
448 }
449
450 let _ = child.kill().await;
451 Ok(())
452 }
453}
454
455async fn read_to_end(mut reader: impl tokio::io::AsyncRead + Unpin) -> Result<Vec<u8>> {
456 let mut buffer = Vec::new();
457 reader
458 .read_to_end(&mut buffer)
459 .await
460 .context("Failed to read cargo output")?;
461 Ok(buffer)
462}
463
464fn panic_message(payload: Box<dyn Any + Send>) -> String {
465 if let Some(msg) = payload.downcast_ref::<String>() {
466 msg.clone()
467 } else if let Some(msg) = payload.downcast_ref::<&str>() {
468 msg.to_string()
469 } else {
470 "Unknown panic".to_string()
471 }
472}
473
474fn log_regeneration_failure(target: &str, error: &anyhow::Error) {
475 println!(
476 " {} Failed to regenerate {}: {}",
477 style("⚠").yellow(),
478 target,
479 error
480 );
481 println!(
482 " {} Save a Rust source file to retry, or restart the dev server",
483 style(" ↳").dim(),
484 );
485}