Skip to main content

fluxbench_cli/
supervisor.rs

1//! Supervisor Process
2//!
3//! Manages worker processes and aggregates results via IPC.
4//!
5//! On Unix, creates dynamically-allocated pipe pairs and passes their fd
6//! numbers to the worker via the `FLUX_IPC_FD` environment variable,
7//! leaving stdout/stderr free for user benchmark code. On non-Unix
8//! platforms, falls back to stdin/stdout pipes (user `println!` may
9//! corrupt the protocol stream in this mode).
10//!
11//! **Timeout behavior:** On Unix, sends SIGTERM for graceful shutdown then
12//! drains pending samples (500ms window) before SIGKILL. On non-Unix,
13//! kills immediately without draining — partial samples are lost.
14
15use fluxbench_core::BenchmarkDef;
16use fluxbench_ipc::{
17    BenchmarkConfig, FrameError, FrameReader, FrameWriter, Sample, SupervisorCommand,
18    WorkerCapabilities, WorkerMessage,
19};
20use rayon::ThreadPoolBuilder;
21use rayon::prelude::*;
22use std::env;
23use std::process::{Child, Command, Stdio};
24use std::time::{Duration, Instant};
25use thiserror::Error;
26
27#[cfg(unix)]
28use std::os::unix::io::{FromRawFd, RawFd};
29#[cfg(unix)]
30use std::os::unix::process::CommandExt;
31
32#[derive(Debug, Error)]
33#[non_exhaustive]
34pub enum SupervisorError {
35    #[error("Failed to spawn worker: {0}")]
36    SpawnFailed(#[from] std::io::Error),
37
38    #[error("IPC error: {0}")]
39    IpcError(String),
40
41    #[error("Worker crashed: {0}")]
42    WorkerCrashed(String),
43
44    #[error("Timeout waiting for worker")]
45    Timeout,
46
47    #[error("Benchmark not found: {0}")]
48    BenchmarkNotFound(String),
49
50    #[error("Worker protocol error: expected {expected}, got {got}")]
51    ProtocolError { expected: String, got: String },
52}
53
54impl From<FrameError> for SupervisorError {
55    fn from(e: FrameError) -> Self {
56        SupervisorError::IpcError(e.to_string())
57    }
58}
59
60/// Result from a benchmark run via IPC
61#[derive(Debug)]
62pub struct IpcBenchmarkResult {
63    pub bench_id: String,
64    pub samples: Vec<Sample>,
65    pub total_iterations: u64,
66    pub total_duration_nanos: u64,
67    pub status: IpcBenchmarkStatus,
68}
69
70#[derive(Debug, Clone)]
71pub enum IpcBenchmarkStatus {
72    Success,
73    Failed {
74        message: String,
75        kind: String,
76        backtrace: Option<String>,
77    },
78    Crashed {
79        message: String,
80        kind: String,
81        backtrace: Option<String>,
82    },
83}
84
85// ─── Platform-specific poll ──────────────────────────────────────────────────
86
87/// Result of polling for data
88#[derive(Debug)]
89enum PollResult {
90    DataAvailable,
91    Timeout,
92    PipeClosed,
93    Error(std::io::Error),
94}
95
96/// Wait for data to be available on a raw fd with timeout (Unix: `poll(2)`).
97#[cfg(unix)]
98fn wait_for_data_fd(fd: i32, timeout_ms: i32) -> PollResult {
99    let mut pollfd = libc::pollfd {
100        fd,
101        events: libc::POLLIN,
102        revents: 0,
103    };
104
105    let result = unsafe { libc::poll(&mut pollfd, 1, timeout_ms) };
106
107    if result < 0 {
108        PollResult::Error(std::io::Error::last_os_error())
109    } else if result == 0 {
110        PollResult::Timeout
111    } else if pollfd.revents & libc::POLLIN != 0 {
112        PollResult::DataAvailable
113    } else if pollfd.revents & (libc::POLLERR | libc::POLLHUP | libc::POLLNVAL) != 0 {
114        PollResult::PipeClosed
115    } else {
116        PollResult::Timeout
117    }
118}
119
120// ─── Platform-specific pipe/fd helpers (Unix only) ───────────────────────────
121
122#[cfg(unix)]
123fn create_pipe() -> Result<(RawFd, RawFd), std::io::Error> {
124    let mut fds = [0 as RawFd; 2];
125    let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
126    if ret != 0 {
127        return Err(std::io::Error::last_os_error());
128    }
129    for &fd in &fds {
130        unsafe {
131            let flags = libc::fcntl(fd, libc::F_GETFD);
132            libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC);
133        }
134    }
135    Ok((fds[0], fds[1]))
136}
137
138#[cfg(unix)]
139fn close_fd(fd: RawFd) {
140    unsafe {
141        libc::close(fd);
142    }
143}
144
145/// Send SIGTERM to a process. Returns `Err` if the signal could not be delivered.
146#[cfg(unix)]
147fn send_sigterm(pid: u32) -> Result<(), std::io::Error> {
148    let ret = unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM) };
149    if ret == -1 {
150        Err(std::io::Error::last_os_error())
151    } else {
152        Ok(())
153    }
154}
155
156// ─── WorkerHandle ────────────────────────────────────────────────────────────
157
158/// Worker process handle.
159///
160/// Wraps the IPC reader/writer as trait objects so both the Unix (File-backed)
161/// and non-Unix (ChildStdin/ChildStdout-backed) paths share the same struct.
162/// Trait objects are bounded by `Send` because the handle may be moved across
163/// threads in the supervisor's `rayon` thread pool.
164pub struct WorkerHandle {
165    child: Child,
166    reader: FrameReader<Box<dyn std::io::Read + Send>>,
167    writer: FrameWriter<Box<dyn std::io::Write + Send>>,
168    capabilities: Option<WorkerCapabilities>,
169    timeout: Duration,
170    /// Raw fd for `poll(2)` (Unix). On non-Unix this is unused (-1).
171    poll_fd: i32,
172}
173
174impl WorkerHandle {
175    /// Spawn a new worker process.
176    pub fn spawn(timeout: Duration) -> Result<Self, SupervisorError> {
177        let binary = env::current_exe().map_err(SupervisorError::SpawnFailed)?;
178        Self::spawn_impl(&binary, timeout)
179    }
180
181    /// Spawn a worker for a specific binary (for testing).
182    pub fn spawn_binary(binary: &str, timeout: Duration) -> Result<Self, SupervisorError> {
183        Self::spawn_impl(binary.as_ref(), timeout)
184    }
185
186    // ── Unix spawn: dedicated pipe fds, stdout free for user code ────────
187
188    #[cfg(unix)]
189    fn spawn_impl(binary: &std::path::Path, timeout: Duration) -> Result<Self, SupervisorError> {
190        // cmd_pipe: supervisor writes commands → worker reads from cmd_read
191        let (cmd_read, cmd_write) = create_pipe()?;
192        // msg_pipe: worker writes messages from msg_write → supervisor reads from msg_read
193        let (msg_read, msg_write) = match create_pipe() {
194            Ok(fds) => fds,
195            Err(e) => {
196                close_fd(cmd_read);
197                close_fd(cmd_write);
198                return Err(SupervisorError::SpawnFailed(e));
199            }
200        };
201
202        // Pass the actual pipe fd numbers to the worker — no dup2 to hardcoded
203        // fds 3/4 which may already be in use by library static initializers.
204        let mut command = Command::new(binary);
205        command
206            .arg("--flux-worker")
207            .env("FLUX_IPC_FD", format!("{},{}", cmd_read, msg_write))
208            .stdin(Stdio::null())
209            .stdout(Stdio::null())
210            .stderr(Stdio::inherit());
211
212        // In the child: clear CLOEXEC on the child-side fds so they survive exec,
213        // and close the parent-side fds.
214        unsafe {
215            command.pre_exec(move || {
216                // Keep child-side fds open across exec
217                let flags = libc::fcntl(cmd_read, libc::F_GETFD);
218                if flags == -1 {
219                    return Err(std::io::Error::last_os_error());
220                }
221                if libc::fcntl(cmd_read, libc::F_SETFD, flags & !libc::FD_CLOEXEC) == -1 {
222                    return Err(std::io::Error::last_os_error());
223                }
224
225                let flags = libc::fcntl(msg_write, libc::F_GETFD);
226                if flags == -1 {
227                    return Err(std::io::Error::last_os_error());
228                }
229                if libc::fcntl(msg_write, libc::F_SETFD, flags & !libc::FD_CLOEXEC) == -1 {
230                    return Err(std::io::Error::last_os_error());
231                }
232
233                // Close parent-side ends that leaked into the child
234                libc::close(cmd_write);
235                libc::close(msg_read);
236
237                Ok(())
238            });
239        }
240
241        let child = match command.spawn() {
242            Ok(c) => c,
243            Err(e) => {
244                close_fd(cmd_read);
245                close_fd(cmd_write);
246                close_fd(msg_read);
247                close_fd(msg_write);
248                return Err(SupervisorError::SpawnFailed(e));
249            }
250        };
251
252        // Close child-side ends in the parent
253        close_fd(cmd_read);
254        close_fd(msg_write);
255
256        // SAFETY: msg_read and cmd_write are valid fds whose child-side
257        // counterparts have been closed above. from_raw_fd takes ownership
258        // and will close them on drop.
259        let reader_file = unsafe { std::fs::File::from_raw_fd(msg_read) };
260        let writer_file = unsafe { std::fs::File::from_raw_fd(cmd_write) };
261
262        // Duplicate the fd for poll(2) so we have an independent handle that
263        // doesn't alias the File-owned fd. Closed in WorkerHandle::drop().
264        let poll_fd = unsafe { libc::dup(msg_read) };
265        if poll_fd < 0 {
266            return Err(SupervisorError::SpawnFailed(std::io::Error::last_os_error()));
267        }
268
269        let mut handle = Self {
270            child,
271            reader: FrameReader::new(Box::new(reader_file) as Box<dyn std::io::Read + Send>),
272            writer: FrameWriter::new(Box::new(writer_file) as Box<dyn std::io::Write + Send>),
273            capabilities: None,
274            timeout,
275            poll_fd,
276        };
277
278        handle.wait_for_hello()?;
279        Ok(handle)
280    }
281
282    // ── Non-Unix spawn: stdin/stdout pipes (fallback) ────────────────────
283
284    #[cfg(not(unix))]
285    fn spawn_impl(binary: &std::path::Path, timeout: Duration) -> Result<Self, SupervisorError> {
286        let mut child = Command::new(binary)
287            .arg("--flux-worker")
288            .stdin(Stdio::piped())
289            .stdout(Stdio::piped())
290            .stderr(Stdio::inherit())
291            .spawn()?;
292
293        let stdin = child.stdin.take().expect("stdin should be available");
294        let stdout = child.stdout.take().expect("stdout should be available");
295
296        let mut handle = Self {
297            child,
298            reader: FrameReader::new(Box::new(stdout) as Box<dyn std::io::Read + Send>),
299            writer: FrameWriter::new(Box::new(stdin) as Box<dyn std::io::Write + Send>),
300            capabilities: None,
301            timeout,
302            poll_fd: -1,
303        };
304
305        handle.wait_for_hello()?;
306        Ok(handle)
307    }
308
309    // ── Shared logic ─────────────────────────────────────────────────────
310
311    /// Wait for Hello message from worker and validate protocol version
312    fn wait_for_hello(&mut self) -> Result<(), SupervisorError> {
313        let msg: WorkerMessage = self.reader.read()?;
314
315        match msg {
316            WorkerMessage::Hello(caps) => {
317                if caps.protocol_version != fluxbench_ipc::PROTOCOL_VERSION {
318                    return Err(SupervisorError::ProtocolError {
319                        expected: format!("protocol version {}", fluxbench_ipc::PROTOCOL_VERSION),
320                        got: format!("protocol version {}", caps.protocol_version),
321                    });
322                }
323                self.capabilities = Some(caps);
324                Ok(())
325            }
326            other => Err(SupervisorError::ProtocolError {
327                expected: "Hello".to_string(),
328                got: format!("{:?}", other),
329            }),
330        }
331    }
332
333    /// Get worker capabilities
334    pub fn capabilities(&self) -> Option<&WorkerCapabilities> {
335        self.capabilities.as_ref()
336    }
337
338    /// Run a benchmark on this worker
339    pub fn run_benchmark(
340        &mut self,
341        bench_id: &str,
342        config: &BenchmarkConfig,
343    ) -> Result<IpcBenchmarkResult, SupervisorError> {
344        self.writer.write(&SupervisorCommand::Run {
345            bench_id: bench_id.to_string(),
346            config: config.clone(),
347        })?;
348
349        let mut all_samples = Vec::new();
350        let start = Instant::now();
351
352        loop {
353            let remaining = self.timeout.saturating_sub(start.elapsed());
354            if remaining.is_zero() {
355                return self.handle_timeout(all_samples);
356            }
357
358            // Check if there's buffered data, or poll for new data.
359            // Even with buffered data we verify the worker is alive — the buffer
360            // might hold an incomplete frame that will never be completed.
361            if self.reader.has_buffered_data() {
362                if !self.is_alive() {
363                    return Err(SupervisorError::WorkerCrashed(
364                        "Worker process crashed with partial data buffered".to_string(),
365                    ));
366                }
367            } else {
368                self.wait_for_worker_data(remaining)?;
369            }
370
371            // Read next message (blocking — poll/sleep above confirmed data is available)
372            let msg: WorkerMessage = match self.reader.read::<WorkerMessage>() {
373                Ok(msg) => msg,
374                Err(FrameError::EndOfStream) => {
375                    return Err(SupervisorError::WorkerCrashed(
376                        "Worker closed connection unexpectedly".to_string(),
377                    ));
378                }
379                Err(e) => {
380                    if !self.is_alive() {
381                        return Err(SupervisorError::WorkerCrashed(
382                            "Worker crashed during read".to_string(),
383                        ));
384                    }
385                    return Err(SupervisorError::IpcError(e.to_string()));
386                }
387            };
388
389            match msg {
390                WorkerMessage::SampleBatch(batch) => {
391                    all_samples.extend(batch.samples);
392                }
393                WorkerMessage::WarmupComplete { .. } | WorkerMessage::Progress { .. } => {
394                    continue;
395                }
396                WorkerMessage::Complete {
397                    total_iterations,
398                    total_duration_nanos,
399                } => {
400                    return Ok(IpcBenchmarkResult {
401                        bench_id: bench_id.to_string(),
402                        samples: all_samples,
403                        total_iterations,
404                        total_duration_nanos,
405                        status: IpcBenchmarkStatus::Success,
406                    });
407                }
408                WorkerMessage::Failure {
409                    kind,
410                    message,
411                    backtrace,
412                } => {
413                    let kind_str = match kind {
414                        fluxbench_ipc::FailureKind::Panic => "panic",
415                        fluxbench_ipc::FailureKind::Timeout => "timeout",
416                        fluxbench_ipc::FailureKind::Assertion => "assertion",
417                        fluxbench_ipc::FailureKind::AllocationLimit => "allocation_limit",
418                        fluxbench_ipc::FailureKind::Signal => "signal",
419                        fluxbench_ipc::FailureKind::Unknown => "unknown",
420                    }
421                    .to_string();
422                    return Ok(IpcBenchmarkResult {
423                        bench_id: bench_id.to_string(),
424                        samples: all_samples,
425                        total_iterations: 0,
426                        total_duration_nanos: 0,
427                        status: match kind {
428                            fluxbench_ipc::FailureKind::Panic => IpcBenchmarkStatus::Crashed {
429                                message,
430                                kind: kind_str,
431                                backtrace,
432                            },
433                            _ => IpcBenchmarkStatus::Failed {
434                                message,
435                                kind: kind_str,
436                                backtrace,
437                            },
438                        },
439                    });
440                }
441                WorkerMessage::Hello(_) => {
442                    return Err(SupervisorError::ProtocolError {
443                        expected: "SampleBatch/Complete/Failure".to_string(),
444                        got: "Hello".to_string(),
445                    });
446                }
447            }
448        }
449    }
450
451    /// Wait for data from the worker, checking liveness periodically.
452    ///
453    /// On Unix this uses `poll(2)` on the message fd.
454    /// On non-Unix this uses a polling sleep loop (less efficient but portable).
455    #[cfg(unix)]
456    fn wait_for_worker_data(&mut self, remaining: Duration) -> Result<(), SupervisorError> {
457        let poll_timeout = remaining.min(Duration::from_millis(100));
458        match wait_for_data_fd(self.poll_fd, poll_timeout.as_millis() as i32) {
459            PollResult::DataAvailable => {
460                if !self.is_alive() {
461                    return Err(SupervisorError::WorkerCrashed(
462                        "Worker process crashed with data in pipe".to_string(),
463                    ));
464                }
465                Ok(())
466            }
467            PollResult::Timeout => {
468                if !self.is_alive() {
469                    return Err(SupervisorError::WorkerCrashed(
470                        "Worker process exited unexpectedly".to_string(),
471                    ));
472                }
473                // Signal caller to re-loop (no data yet)
474                Ok(())
475            }
476            PollResult::PipeClosed => Err(SupervisorError::WorkerCrashed(
477                "Worker pipe closed unexpectedly".to_string(),
478            )),
479            PollResult::Error(e) => {
480                Err(SupervisorError::WorkerCrashed(format!("Pipe error: {}", e)))
481            }
482        }
483    }
484
485    #[cfg(not(unix))]
486    fn wait_for_worker_data(&mut self, _remaining: Duration) -> Result<(), SupervisorError> {
487        // Without poll(2), sleep briefly then check liveness.
488        // The subsequent blocking read will pick up data.
489        std::thread::sleep(Duration::from_millis(10));
490        if !self.is_alive() {
491            return Err(SupervisorError::WorkerCrashed(
492                "Worker process exited unexpectedly".to_string(),
493            ));
494        }
495        Ok(())
496    }
497
498    /// Handle timeout: on Unix send SIGTERM → drain 500ms → SIGKILL.
499    /// On non-Unix just kill immediately.
500    #[cfg(unix)]
501    fn handle_timeout(
502        &mut self,
503        mut samples: Vec<Sample>,
504    ) -> Result<IpcBenchmarkResult, SupervisorError> {
505        // Send SIGTERM for graceful shutdown (ignore error — worker may already be dead)
506        let _ = send_sigterm(self.child.id());
507
508        // Drain any messages the worker flushes in response to SIGTERM (500ms window)
509        let drain_deadline = Instant::now() + Duration::from_millis(500);
510        loop {
511            let remaining = drain_deadline.saturating_duration_since(Instant::now());
512            if remaining.is_zero() {
513                break;
514            }
515
516            match wait_for_data_fd(self.poll_fd, remaining.as_millis() as i32) {
517                PollResult::DataAvailable => match self.reader.read::<WorkerMessage>() {
518                    Ok(WorkerMessage::SampleBatch(batch)) => {
519                        samples.extend(batch.samples);
520                    }
521                    Ok(WorkerMessage::Complete { .. }) => break,
522                    _ => break,
523                },
524                PollResult::PipeClosed => break,
525                _ => break,
526            }
527        }
528
529        if self.is_alive() {
530            let _ = self.child.kill();
531            let _ = self.child.wait();
532        }
533
534        Err(SupervisorError::Timeout)
535    }
536
537    #[cfg(not(unix))]
538    fn handle_timeout(
539        &mut self,
540        _samples: Vec<Sample>,
541    ) -> Result<IpcBenchmarkResult, SupervisorError> {
542        // No SIGTERM on non-Unix — just kill immediately
543        if self.is_alive() {
544            let _ = self.child.kill();
545            let _ = self.child.wait();
546        }
547        Err(SupervisorError::Timeout)
548    }
549
550    /// Ping the worker to check if it's alive
551    pub fn ping(&mut self) -> Result<bool, SupervisorError> {
552        self.writer.write(&SupervisorCommand::Ping)?;
553        Ok(true)
554    }
555
556    /// Abort the current benchmark
557    pub fn abort(&mut self) -> Result<(), SupervisorError> {
558        self.writer.write(&SupervisorCommand::Abort)?;
559        Ok(())
560    }
561
562    /// Shutdown the worker gracefully
563    pub fn shutdown(mut self) -> Result<(), SupervisorError> {
564        self.writer.write(&SupervisorCommand::Shutdown)?;
565        let _ = self.child.wait();
566        Ok(())
567    }
568
569    /// Check if worker process is still running
570    pub fn is_alive(&mut self) -> bool {
571        match self.child.try_wait() {
572            Ok(Some(_)) => false,
573            Ok(None) => true,
574            Err(_) => false,
575        }
576    }
577
578    /// Kill the worker process forcefully
579    pub fn kill(&mut self) -> Result<(), SupervisorError> {
580        self.child.kill().map_err(SupervisorError::SpawnFailed)?;
581        let _ = self.child.wait();
582        Ok(())
583    }
584}
585
586impl Drop for WorkerHandle {
587    fn drop(&mut self) {
588        if self.is_alive() {
589            #[cfg(unix)]
590            {
591                // Graceful: SIGTERM first, brief wait, then SIGKILL
592                let _ = send_sigterm(self.child.id());
593                std::thread::sleep(Duration::from_millis(50));
594            }
595            if self.is_alive() {
596                let _ = self.child.kill();
597            }
598            let _ = self.child.wait();
599        }
600
601        // Close the duplicated poll fd (Unix only; -1 on non-Unix is a no-op)
602        #[cfg(unix)]
603        if self.poll_fd >= 0 {
604            close_fd(self.poll_fd);
605        }
606    }
607}
608
609// ─── Supervisor ──────────────────────────────────────────────────────────────
610
611/// Supervisor that manages worker pool and distributes benchmarks
612pub struct Supervisor {
613    config: BenchmarkConfig,
614    timeout: Duration,
615    num_workers: usize,
616}
617
618impl Supervisor {
619    /// Create a new supervisor
620    pub fn new(config: BenchmarkConfig, timeout: Duration, num_workers: usize) -> Self {
621        Self {
622            config,
623            timeout,
624            num_workers: num_workers.max(1),
625        }
626    }
627
628    /// Run all benchmarks with process isolation using the default config.
629    ///
630    /// For per-benchmark configuration, use [`run_all_configs`](Self::run_all_configs).
631    pub fn run_all(
632        &self,
633        benchmarks: &[&BenchmarkDef],
634    ) -> Result<Vec<IpcBenchmarkResult>, SupervisorError> {
635        let configs: Vec<_> = benchmarks.iter().map(|_| self.config.clone()).collect();
636        self.run_all_configs(benchmarks, &configs)
637    }
638
639    /// Run all benchmarks with per-benchmark configs
640    pub fn run_all_configs(
641        &self,
642        benchmarks: &[&BenchmarkDef],
643        configs: &[BenchmarkConfig],
644    ) -> Result<Vec<IpcBenchmarkResult>, SupervisorError> {
645        if benchmarks.is_empty() {
646            return Ok(Vec::new());
647        }
648
649        if self.num_workers == 1 || benchmarks.len() == 1 {
650            let mut results = Vec::with_capacity(benchmarks.len());
651            for (bench, cfg) in benchmarks.iter().zip(configs.iter()) {
652                results.push(self.run_isolated(bench, cfg)?);
653            }
654            return Ok(results);
655        }
656
657        let worker_count = self.num_workers.min(benchmarks.len());
658        let pool = ThreadPoolBuilder::new()
659            .num_threads(worker_count)
660            .build()
661            .map_err(|e| {
662                SupervisorError::IpcError(format!("Failed to build worker pool: {}", e))
663            })?;
664
665        let pairs: Vec<_> = benchmarks.iter().zip(configs.iter()).collect();
666        let outcomes: Vec<Result<IpcBenchmarkResult, SupervisorError>> = pool.install(|| {
667            pairs
668                .par_iter()
669                .map(|(bench, cfg)| self.run_isolated(bench, cfg))
670                .collect()
671        });
672
673        let mut results = Vec::with_capacity(outcomes.len());
674        for outcome in outcomes {
675            results.push(outcome?);
676        }
677        Ok(results)
678    }
679
680    /// Run a single benchmark in an isolated worker process
681    fn run_isolated(
682        &self,
683        bench: &BenchmarkDef,
684        config: &BenchmarkConfig,
685    ) -> Result<IpcBenchmarkResult, SupervisorError> {
686        let mut worker = WorkerHandle::spawn(self.timeout)?;
687        let result = worker.run_benchmark(bench.id, config);
688        let _ = worker.shutdown();
689        result
690    }
691
692    fn crashed_result(bench: &BenchmarkDef, message: String) -> IpcBenchmarkResult {
693        IpcBenchmarkResult {
694            bench_id: bench.id.to_string(),
695            samples: Vec::new(),
696            total_iterations: 0,
697            total_duration_nanos: 0,
698            status: IpcBenchmarkStatus::Crashed {
699                message,
700                kind: "crashed".to_string(),
701                backtrace: None,
702            },
703        }
704    }
705
706    fn run_with_reuse_indexed(
707        &self,
708        benchmarks: &[(usize, &BenchmarkDef, &BenchmarkConfig)],
709    ) -> Vec<(usize, IpcBenchmarkResult)> {
710        let mut results = Vec::with_capacity(benchmarks.len());
711        if benchmarks.is_empty() {
712            return results;
713        }
714
715        let mut worker = match WorkerHandle::spawn(self.timeout) {
716            Ok(worker) => Some(worker),
717            Err(e) => {
718                let message = e.to_string();
719                for &(index, bench, _) in benchmarks {
720                    results.push((index, Self::crashed_result(bench, message.clone())));
721                }
722                return results;
723            }
724        };
725
726        for &(index, bench, cfg) in benchmarks {
727            if worker.is_none() {
728                match WorkerHandle::spawn(self.timeout) {
729                    Ok(new_worker) => worker = Some(new_worker),
730                    Err(e) => {
731                        results.push((index, Self::crashed_result(bench, e.to_string())));
732                        continue;
733                    }
734                }
735            }
736
737            let run_result = match worker.as_mut() {
738                Some(worker) => worker.run_benchmark(bench.id, cfg),
739                None => unreachable!("worker should exist after spawn check"),
740            };
741
742            match run_result {
743                Ok(result) => results.push((index, result)),
744                Err(e) => {
745                    let worker_is_alive = worker.as_mut().map(|w| w.is_alive()).unwrap_or(false);
746                    if !worker_is_alive {
747                        if let Some(mut dead_worker) = worker.take() {
748                            let _ = dead_worker.kill();
749                        }
750                    }
751                    results.push((index, Self::crashed_result(bench, e.to_string())));
752                }
753            }
754        }
755
756        if let Some(worker) = worker {
757            let _ = worker.shutdown();
758        }
759
760        results
761    }
762
763    /// Run benchmarks with worker reuse using the default config.
764    ///
765    /// For per-benchmark configuration, use [`run_with_reuse_configs`](Self::run_with_reuse_configs).
766    pub fn run_with_reuse(
767        &self,
768        benchmarks: &[&BenchmarkDef],
769    ) -> Result<Vec<IpcBenchmarkResult>, SupervisorError> {
770        let configs: Vec<_> = benchmarks.iter().map(|_| self.config.clone()).collect();
771        self.run_with_reuse_configs(benchmarks, &configs)
772    }
773
774    /// Run benchmarks with worker reuse and per-benchmark configs
775    pub fn run_with_reuse_configs(
776        &self,
777        benchmarks: &[&BenchmarkDef],
778        configs: &[BenchmarkConfig],
779    ) -> Result<Vec<IpcBenchmarkResult>, SupervisorError> {
780        if benchmarks.is_empty() {
781            return Ok(Vec::new());
782        }
783
784        let indexed_benchmarks: Vec<(usize, &BenchmarkDef, &BenchmarkConfig)> = benchmarks
785            .iter()
786            .zip(configs.iter())
787            .enumerate()
788            .map(|(index, (bench, cfg))| (index, *bench, cfg))
789            .collect();
790
791        let mut indexed_results = if self.num_workers == 1 || benchmarks.len() == 1 {
792            self.run_with_reuse_indexed(&indexed_benchmarks)
793        } else {
794            let worker_count = self.num_workers.min(indexed_benchmarks.len());
795            let mut shards: Vec<Vec<(usize, &BenchmarkDef, &BenchmarkConfig)>> =
796                vec![Vec::new(); worker_count];
797            for (position, entry) in indexed_benchmarks.into_iter().enumerate() {
798                shards[position % worker_count].push(entry);
799            }
800
801            let pool = ThreadPoolBuilder::new()
802                .num_threads(worker_count)
803                .build()
804                .map_err(|e| {
805                    SupervisorError::IpcError(format!("Failed to build worker pool: {}", e))
806                })?;
807
808            let shard_results: Vec<Vec<(usize, IpcBenchmarkResult)>> = pool.install(|| {
809                shards
810                    .into_par_iter()
811                    .map(|shard| self.run_with_reuse_indexed(&shard))
812                    .collect()
813            });
814
815            shard_results.into_iter().flatten().collect()
816        };
817
818        indexed_results.sort_by_key(|(index, _)| *index);
819        if indexed_results.len() != benchmarks.len() {
820            return Err(SupervisorError::IpcError(format!(
821                "Internal error: expected {} results, got {}",
822                benchmarks.len(),
823                indexed_results.len()
824            )));
825        }
826
827        Ok(indexed_results
828            .into_iter()
829            .map(|(_, result)| result)
830            .collect())
831    }
832}
833
834#[cfg(test)]
835mod tests {
836    use super::*;
837
838    #[test]
839    #[ignore] // Requires built binary
840    fn test_supervisor_spawn() {
841        let timeout = Duration::from_secs(30);
842        let config = BenchmarkConfig::default();
843        let supervisor = Supervisor::new(config, timeout, 1);
844        assert_eq!(supervisor.num_workers, 1);
845    }
846}