wavecraft_dev_server/reload/
rebuild.rs1use anyhow::{Context, Result};
11use console::style;
12use std::any::Any;
13use std::future::Future;
14use std::path::{Path, PathBuf};
15use std::pin::Pin;
16use std::process::Stdio;
17use std::sync::Arc;
18use tokio::io::AsyncReadExt;
19use tokio::process::Command;
20use tokio::sync::watch;
21use wavecraft_bridge::ParameterHost;
22use wavecraft_protocol::{ParameterInfo, ProcessorInfo};
23
24use crate::host::DevServerHost;
25use crate::reload::guard::BuildGuard;
26use crate::ws::WsServer;
27
28pub type ParamLoaderFn = Arc<
34 dyn Fn(PathBuf) -> Pin<Box<dyn Future<Output = Result<Vec<ParameterInfo>>> + Send>>
35 + Send
36 + Sync,
37>;
38
39pub type ProcessorLoaderFn = Arc<
41 dyn Fn(PathBuf) -> Pin<Box<dyn Future<Output = Result<Vec<ProcessorInfo>>> + Send>>
42 + Send
43 + Sync,
44>;
45
46pub type SidecarWriterFn = Arc<dyn Fn(&Path, &[ParameterInfo]) -> Result<()> + Send + Sync>;
48
49pub type TsTypesWriterFn = Arc<dyn Fn(&[ParameterInfo]) -> Result<()> + Send + Sync>;
51
52pub type ProcessorTsTypesWriterFn = Arc<dyn Fn(&[ProcessorInfo]) -> Result<()> + Send + Sync>;
54
55pub struct RebuildCallbacks {
62 pub package_name: Option<String>,
65 pub write_sidecar: Option<SidecarWriterFn>,
67 pub write_ts_types: Option<TsTypesWriterFn>,
69 pub write_processor_ts_types: Option<ProcessorTsTypesWriterFn>,
71 pub param_loader: ParamLoaderFn,
74 pub processor_loader: Option<ProcessorLoaderFn>,
76}
77
78pub struct RebuildPipeline {
82 guard: Arc<BuildGuard>,
83 engine_dir: PathBuf,
84 host: Arc<DevServerHost>,
85 ws_server: Arc<WsServer<Arc<DevServerHost>>>,
86 shutdown_rx: watch::Receiver<bool>,
87 callbacks: RebuildCallbacks,
88 #[cfg(feature = "audio")]
89 audio_reload_tx: Option<tokio::sync::mpsc::UnboundedSender<Vec<ParameterInfo>>>,
90 cancel_param_load_tx: watch::Sender<bool>,
92 cancel_param_load_rx: watch::Receiver<bool>,
93}
94
95impl RebuildPipeline {
96 #[allow(clippy::too_many_arguments)]
98 pub fn new(
99 guard: Arc<BuildGuard>,
100 engine_dir: PathBuf,
101 host: Arc<DevServerHost>,
102 ws_server: Arc<WsServer<Arc<DevServerHost>>>,
103 shutdown_rx: watch::Receiver<bool>,
104 callbacks: RebuildCallbacks,
105 #[cfg(feature = "audio")] audio_reload_tx: Option<
106 tokio::sync::mpsc::UnboundedSender<Vec<ParameterInfo>>,
107 >,
108 ) -> Self {
109 let (cancel_param_load_tx, cancel_param_load_rx) = watch::channel(false);
110 Self {
111 guard,
112 engine_dir,
113 host,
114 ws_server,
115 shutdown_rx,
116 callbacks,
117 #[cfg(feature = "audio")]
118 audio_reload_tx,
119 cancel_param_load_tx,
120 cancel_param_load_rx,
121 }
122 }
123
124 pub async fn handle_change(&self) -> Result<()> {
126 if !self.guard.try_start() {
127 self.guard.mark_pending();
128 let _ = self.cancel_param_load_tx.send(true);
130 println!(
131 " {} Build already in progress, queuing rebuild...",
132 style("→").dim()
133 );
134 return Ok(());
135 }
136
137 let _ = self.cancel_param_load_tx.send(false);
139
140 loop {
141 let result = self.do_build().await;
142
143 match result {
144 Ok((params, param_count_change)) => {
145 let mut reload_ok = true;
146 let mut ts_types_ok = true;
147 let mut processor_types_ok = true;
148
149 let processors = if let Some(ref loader) = self.callbacks.processor_loader {
150 match loader(self.engine_dir.clone()).await {
151 Ok(processors) => Some(processors),
152 Err(e) => {
153 processor_types_ok = false;
154 println!(
155 " {} Failed to load processor metadata after rebuild: {}",
156 style("⚠").yellow(),
157 e
158 );
159 println!(
160 " {} Save a Rust source file to retry, or restart the dev server",
161 style(" ↳").dim(),
162 );
163 None
164 }
165 }
166 } else {
167 None
168 };
169
170 if let Some(ref writer) = self.callbacks.write_sidecar
171 && let Err(e) = writer(&self.engine_dir, ¶ms)
172 {
173 println!(" Warning: failed to update param cache: {}", e);
174 }
175
176 if let Some(ref writer) = self.callbacks.write_ts_types
177 && let Err(e) = writer(¶ms)
178 {
179 ts_types_ok = false;
180 log_regeneration_failure("TypeScript parameter types", &e);
181 }
182
183 if let (Some(processors), Some(writer)) = (
184 processors.as_deref(),
185 self.callbacks.write_processor_ts_types.as_ref(),
186 ) && let Err(e) = writer(processors)
187 {
188 processor_types_ok = false;
189 log_regeneration_failure("TypeScript processor types", &e);
190 }
191
192 println!(" {} Updating parameter host...", style("→").dim());
193 let replace_result =
194 std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
195 self.host.replace_parameters(params.clone())
196 }));
197
198 match replace_result {
199 Ok(Ok(())) => {
200 println!(" {} Updated {} parameters", style("→").dim(), params.len());
201 }
202 Ok(Err(e)) => {
203 reload_ok = false;
204 println!(
205 " {} Failed to replace parameters: {:#}",
206 style("✗").red(),
207 e
208 );
209 }
210 Err(panic_payload) => {
211 reload_ok = false;
212 println!(
213 " {} Panic while replacing parameters: {}",
214 style("✗").red(),
215 panic_message(panic_payload)
216 );
217 }
218 }
219
220 if reload_ok {
221 println!(" {} Notifying UI clients...", style("→").dim());
222 let broadcast_result = tokio::spawn({
223 let ws_server = Arc::clone(&self.ws_server);
224 async move { ws_server.broadcast_parameters_changed().await }
225 })
226 .await;
227
228 match broadcast_result {
229 Ok(Ok(())) => {
230 println!(" {} UI notified", style("→").dim());
231 }
232 Ok(Err(e)) => {
233 reload_ok = false;
234 println!(
235 " {} Failed to notify UI clients: {:#}",
236 style("✗").red(),
237 e
238 );
239 }
240 Err(join_err) => {
241 reload_ok = false;
242 println!(
243 " {} Panic while notifying UI clients: {}",
244 style("✗").red(),
245 join_err
246 );
247 }
248 }
249 }
250
251 if reload_ok {
252 let change_info = if param_count_change > 0 {
253 format!(" (+{} new)", param_count_change)
254 } else if param_count_change < 0 {
255 format!(" ({} removed)", -param_count_change)
256 } else {
257 String::new()
258 };
259
260 let param_types_status = if ts_types_ok {
261 ""
262 } else {
263 " (⚠ TypeScript parameter types stale)"
264 };
265
266 let processor_types_status = if processor_types_ok {
267 ""
268 } else {
269 " (⚠ TypeScript processor types stale)"
270 };
271
272 println!(
273 " {} Hot-reload complete — {} parameters{}{}{}",
274 style("✓").green(),
275 params.len(),
276 change_info,
277 param_types_status,
278 processor_types_status,
279 );
280
281 #[cfg(feature = "audio")]
283 if let Some(ref tx) = self.audio_reload_tx {
284 let _ = tx.send(params);
285 }
286 } else {
287 println!(
288 " {} Hot-reload aborted — parameters not fully applied",
289 style("✗").red()
290 );
291 }
292 }
293 Err(e) => {
294 println!(" {} Build failed:\n{}", style("✗").red(), e);
295 }
297 }
298
299 if !self.guard.complete() {
300 break; }
302 println!(
303 " {} Pending changes detected, rebuilding...",
304 style("→").cyan()
305 );
306 }
307
308 Ok(())
309 }
310
311 async fn do_build(&self) -> Result<(Vec<ParameterInfo>, i32)> {
313 if *self.shutdown_rx.borrow() {
314 anyhow::bail!("Build cancelled due to shutdown");
315 }
316
317 println!(" {} Rebuilding plugin...", style("🔄").cyan());
318 let start = std::time::Instant::now();
319
320 let old_count = self.host.get_all_parameters().len() as i32;
322
323 let mut cmd = Command::new("cargo");
325 cmd.args([
326 "build",
327 "--lib",
328 "--features",
329 "_param-discovery",
330 "--message-format=json",
331 ]);
332
333 if let Some(ref package_name) = self.callbacks.package_name {
334 cmd.args(["--package", package_name]);
335 }
336
337 let mut child = cmd
338 .current_dir(&self.engine_dir)
339 .stdout(Stdio::piped())
340 .stderr(Stdio::piped())
341 .spawn()
342 .context("Failed to spawn cargo build")?;
343
344 let stdout = child
345 .stdout
346 .take()
347 .context("Failed to capture cargo stdout")?;
348 let stderr = child
349 .stderr
350 .take()
351 .context("Failed to capture cargo stderr")?;
352
353 let stdout_handle = tokio::spawn(read_to_end(stdout));
354 let stderr_handle = tokio::spawn(read_to_end(stderr));
355
356 let mut shutdown_rx = self.shutdown_rx.clone();
357 let status = tokio::select! {
358 status = child.wait() => status.context("Failed to wait for cargo build")?,
359 _ = shutdown_rx.changed() => {
360 self.kill_build_process(&mut child).await?;
361 let _ = stdout_handle.await;
362 let _ = stderr_handle.await;
363 anyhow::bail!("Build cancelled due to shutdown");
364 }
365 };
366
367 let stdout = stdout_handle
368 .await
369 .context("Failed to join cargo stdout task")??;
370 let stderr = stderr_handle
371 .await
372 .context("Failed to join cargo stderr task")??;
373
374 let elapsed = start.elapsed();
375
376 if !status.success() {
377 let stderr = String::from_utf8_lossy(&stderr);
379 let stdout = String::from_utf8_lossy(&stdout);
380
381 let mut error_lines = Vec::new();
383 for line in stdout.lines().chain(stderr.lines()) {
384 if let Ok(json) = serde_json::from_str::<serde_json::Value>(line)
385 && json["reason"] == "compiler-message"
386 && let Some(message) = json["message"]["rendered"].as_str()
387 {
388 error_lines.push(message.to_string());
389 }
390 }
391
392 if error_lines.is_empty() {
393 error_lines.push(stderr.to_string());
394 }
395
396 anyhow::bail!("{}", error_lines.join("\n"));
397 }
398
399 println!(
400 " {} Build succeeded in {:.1}s",
401 style("✓").green(),
402 elapsed.as_secs_f64()
403 );
404
405 let loader = Arc::clone(&self.callbacks.param_loader);
407 let engine_dir = self.engine_dir.clone();
408 let mut cancel_rx = self.cancel_param_load_rx.clone();
409
410 let params = tokio::select! {
411 result = loader(engine_dir) => {
412 result.context("Failed to load parameters from rebuilt dylib")?
413 }
414 _ = cancel_rx.changed() => {
415 if *cancel_rx.borrow_and_update() {
416 println!(
417 " {} Parameter extraction cancelled — superseded by newer change",
418 style("⚠").yellow()
419 );
420 anyhow::bail!("Parameter extraction cancelled due to newer file change");
421 }
422 loader(self.engine_dir.clone())
424 .await
425 .context("Failed to load parameters from rebuilt dylib")?
426 }
427 };
428
429 let param_count_change = params.len() as i32 - old_count;
430
431 Ok((params, param_count_change))
432 }
433
434 async fn kill_build_process(&self, child: &mut tokio::process::Child) -> Result<()> {
435 #[cfg(unix)]
436 {
437 use nix::sys::signal::{Signal, kill};
438 use nix::unistd::Pid;
439
440 if let Some(pid) = child.id() {
441 let _ = kill(Pid::from_raw(-(pid as i32)), Signal::SIGTERM);
442 }
443 }
444
445 let _ = child.kill().await;
446 Ok(())
447 }
448}
449
450async fn read_to_end(mut reader: impl tokio::io::AsyncRead + Unpin) -> Result<Vec<u8>> {
451 let mut buffer = Vec::new();
452 reader
453 .read_to_end(&mut buffer)
454 .await
455 .context("Failed to read cargo output")?;
456 Ok(buffer)
457}
458
/// Turns a panic payload into a printable message.
///
/// `String` and `&str` payloads are returned as text; any other payload type
/// yields `"Unknown panic"`.
fn panic_message(payload: Box<dyn Any + Send>) -> String {
    payload
        .downcast_ref::<String>()
        .cloned()
        .or_else(|| payload.downcast_ref::<&str>().map(|text| (*text).to_string()))
        .unwrap_or_else(|| "Unknown panic".to_string())
}
468
469fn log_regeneration_failure(target: &str, error: &anyhow::Error) {
470 println!(
471 " {} Failed to regenerate {}: {}",
472 style("⚠").yellow(),
473 target,
474 error
475 );
476 println!(
477 " {} Save a Rust source file to retry, or restart the dev server",
478 style(" ↳").dim(),
479 );
480}