Skip to main content

fluxbench_cli/
supervisor.rs

1//! Supervisor Process
2//!
3//! Manages worker processes and aggregates results via IPC.
4//!
5//! On Unix, creates dynamically-allocated pipe pairs and passes their fd
6//! numbers to the worker via the `FLUX_IPC_FD` environment variable,
7//! leaving stdout/stderr free for user benchmark code. On non-Unix
8//! platforms, falls back to stdin/stdout pipes (user `println!` may
9//! corrupt the protocol stream in this mode).
10//!
11//! **Timeout behavior:** On Unix, sends SIGTERM for graceful shutdown then
12//! drains pending samples (500ms window) before SIGKILL. On non-Unix,
13//! kills immediately without draining — partial samples are lost.
14
15use fluxbench_core::BenchmarkDef;
16use fluxbench_ipc::{
17    BenchmarkConfig, FrameError, FrameReader, FrameWriter, Sample, SupervisorCommand,
18    WorkerCapabilities, WorkerMessage,
19};
20use rayon::ThreadPoolBuilder;
21use rayon::prelude::*;
22use std::env;
23use std::process::{Child, Command, Stdio};
24use std::time::{Duration, Instant};
25use thiserror::Error;
26
27#[cfg(unix)]
28use std::os::unix::io::{FromRawFd, RawFd};
29#[cfg(unix)]
30use std::os::unix::process::CommandExt;
31
32/// Errors that can occur during supervisor operations
33#[derive(Debug, Error)]
34#[non_exhaustive]
35pub enum SupervisorError {
36    /// Failed to spawn worker process.
37    #[error("Failed to spawn worker: {0}")]
38    SpawnFailed(#[from] std::io::Error),
39
40    /// IPC communication error.
41    #[error("IPC error: {0}")]
42    IpcError(String),
43
44    /// Worker process crashed or exited unexpectedly.
45    #[error("Worker crashed: {0}")]
46    WorkerCrashed(String),
47
48    /// Worker did not respond within timeout.
49    #[error("Timeout waiting for worker")]
50    Timeout,
51
52    /// Benchmark with the given ID not found.
53    #[error("Benchmark not found: {0}")]
54    BenchmarkNotFound(String),
55
56    /// Worker protocol version or message type mismatch.
57    #[error("Worker protocol error: expected {expected}, got {got}")]
58    ProtocolError {
59        /// Expected message or protocol version.
60        expected: String,
61        /// Actual message or protocol version received.
62        got: String,
63    },
64}
65
66impl From<FrameError> for SupervisorError {
67    fn from(e: FrameError) -> Self {
68        SupervisorError::IpcError(e.to_string())
69    }
70}
71
72/// Result from a benchmark run via IPC
73#[derive(Debug)]
74pub struct IpcBenchmarkResult {
75    /// Benchmark identifier.
76    pub bench_id: String,
77    /// Timing samples collected during measurement phase.
78    pub samples: Vec<Sample>,
79    /// Total number of iterations performed.
80    pub total_iterations: u64,
81    /// Total benchmark duration in nanoseconds.
82    pub total_duration_nanos: u64,
83    /// Status of the benchmark execution.
84    pub status: IpcBenchmarkStatus,
85}
86
/// How a benchmark run ended.
#[derive(Clone, Debug)]
pub enum IpcBenchmarkStatus {
    /// The worker reported normal completion.
    Success,
    /// The benchmark failed with an error that was not a panic.
    Failed {
        /// Human-readable error message.
        message: String,
        /// Short category string for the failure.
        kind: String,
        /// Backtrace attached to the failure, when one was provided.
        backtrace: Option<String>,
    },
    /// The benchmark crashed (panic or signal).
    Crashed {
        /// Human-readable error message.
        message: String,
        /// Short category string for the crash.
        kind: String,
        /// Backtrace attached to the crash, when one was provided.
        backtrace: Option<String>,
    },
}
111
112// ─── Platform-specific poll ──────────────────────────────────────────────────
113
/// Outcome of waiting for readable data on the worker's message pipe.
#[derive(Debug)]
enum PollResult {
    /// The fd has data ready to read.
    DataAvailable,
    /// Nothing became readable before the wait ended.
    Timeout,
    /// The fd reported an error, hang-up, or invalid-fd condition with no data.
    PipeClosed,
    /// The wait itself failed with the contained OS error.
    Error(std::io::Error),
}
122
123/// Wait for data to be available on a raw fd with timeout (Unix: `poll(2)`).
124#[cfg(unix)]
125fn wait_for_data_fd(fd: i32, timeout_ms: i32) -> PollResult {
126    let mut pollfd = libc::pollfd {
127        fd,
128        events: libc::POLLIN,
129        revents: 0,
130    };
131
132    let result = unsafe { libc::poll(&mut pollfd, 1, timeout_ms) };
133
134    if result < 0 {
135        PollResult::Error(std::io::Error::last_os_error())
136    } else if result == 0 {
137        PollResult::Timeout
138    } else if pollfd.revents & libc::POLLIN != 0 {
139        PollResult::DataAvailable
140    } else if pollfd.revents & (libc::POLLERR | libc::POLLHUP | libc::POLLNVAL) != 0 {
141        PollResult::PipeClosed
142    } else {
143        PollResult::Timeout
144    }
145}
146
147// ─── Platform-specific pipe/fd helpers (Unix only) ───────────────────────────
148
149#[cfg(unix)]
150fn create_pipe() -> Result<(RawFd, RawFd), std::io::Error> {
151    let mut fds = [0 as RawFd; 2];
152    let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
153    if ret != 0 {
154        return Err(std::io::Error::last_os_error());
155    }
156    for &fd in &fds {
157        unsafe {
158            let flags = libc::fcntl(fd, libc::F_GETFD);
159            libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC);
160        }
161    }
162    Ok((fds[0], fds[1]))
163}
164
165#[cfg(unix)]
166fn close_fd(fd: RawFd) {
167    unsafe {
168        libc::close(fd);
169    }
170}
171
172/// Send SIGTERM to a process. Returns `Err` if the signal could not be delivered.
173#[cfg(unix)]
174fn send_sigterm(pid: u32) -> Result<(), std::io::Error> {
175    let ret = unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM) };
176    if ret == -1 {
177        Err(std::io::Error::last_os_error())
178    } else {
179        Ok(())
180    }
181}
182
183// ─── WorkerHandle ────────────────────────────────────────────────────────────
184
185/// Worker process handle.
186///
187/// Wraps the IPC reader/writer as trait objects so both the Unix (File-backed)
188/// and non-Unix (ChildStdin/ChildStdout-backed) paths share the same struct.
189/// Trait objects are bounded by `Send` because the handle may be moved across
190/// threads in the supervisor's `rayon` thread pool.
191pub struct WorkerHandle {
192    child: Child,
193    reader: FrameReader<Box<dyn std::io::Read + Send>>,
194    writer: FrameWriter<Box<dyn std::io::Write + Send>>,
195    capabilities: Option<WorkerCapabilities>,
196    timeout: Duration,
197    /// Raw fd for `poll(2)` (Unix). On non-Unix this is unused (-1).
198    poll_fd: i32,
199}
200
201impl WorkerHandle {
202    /// Spawn a new worker process.
203    pub fn spawn(timeout: Duration) -> Result<Self, SupervisorError> {
204        let binary = env::current_exe().map_err(SupervisorError::SpawnFailed)?;
205        Self::spawn_impl(&binary, timeout)
206    }
207
208    /// Spawn a worker for a specific binary (for testing).
209    pub fn spawn_binary(binary: &str, timeout: Duration) -> Result<Self, SupervisorError> {
210        Self::spawn_impl(binary.as_ref(), timeout)
211    }
212
213    // ── Unix spawn: dedicated pipe fds, stdout free for user code ────────
214
215    #[cfg(unix)]
216    fn spawn_impl(binary: &std::path::Path, timeout: Duration) -> Result<Self, SupervisorError> {
217        // cmd_pipe: supervisor writes commands → worker reads from cmd_read
218        let (cmd_read, cmd_write) = create_pipe()?;
219        // msg_pipe: worker writes messages from msg_write → supervisor reads from msg_read
220        let (msg_read, msg_write) = match create_pipe() {
221            Ok(fds) => fds,
222            Err(e) => {
223                close_fd(cmd_read);
224                close_fd(cmd_write);
225                return Err(SupervisorError::SpawnFailed(e));
226            }
227        };
228
229        // Pass the actual pipe fd numbers to the worker — no dup2 to hardcoded
230        // fds 3/4 which may already be in use by library static initializers.
231        let mut command = Command::new(binary);
232        command
233            .arg("--flux-worker")
234            .env("FLUX_IPC_FD", format!("{},{}", cmd_read, msg_write))
235            .stdin(Stdio::null())
236            .stdout(Stdio::null())
237            .stderr(Stdio::inherit());
238
239        // In the child: clear CLOEXEC on the child-side fds so they survive exec,
240        // and close the parent-side fds.
241        unsafe {
242            command.pre_exec(move || {
243                // Keep child-side fds open across exec
244                let flags = libc::fcntl(cmd_read, libc::F_GETFD);
245                if flags == -1 {
246                    return Err(std::io::Error::last_os_error());
247                }
248                if libc::fcntl(cmd_read, libc::F_SETFD, flags & !libc::FD_CLOEXEC) == -1 {
249                    return Err(std::io::Error::last_os_error());
250                }
251
252                let flags = libc::fcntl(msg_write, libc::F_GETFD);
253                if flags == -1 {
254                    return Err(std::io::Error::last_os_error());
255                }
256                if libc::fcntl(msg_write, libc::F_SETFD, flags & !libc::FD_CLOEXEC) == -1 {
257                    return Err(std::io::Error::last_os_error());
258                }
259
260                // Close parent-side ends that leaked into the child
261                libc::close(cmd_write);
262                libc::close(msg_read);
263
264                Ok(())
265            });
266        }
267
268        let child = match command.spawn() {
269            Ok(c) => c,
270            Err(e) => {
271                close_fd(cmd_read);
272                close_fd(cmd_write);
273                close_fd(msg_read);
274                close_fd(msg_write);
275                return Err(SupervisorError::SpawnFailed(e));
276            }
277        };
278
279        // Close child-side ends in the parent
280        close_fd(cmd_read);
281        close_fd(msg_write);
282
283        // SAFETY: msg_read and cmd_write are valid fds whose child-side
284        // counterparts have been closed above. from_raw_fd takes ownership
285        // and will close them on drop.
286        let reader_file = unsafe { std::fs::File::from_raw_fd(msg_read) };
287        let writer_file = unsafe { std::fs::File::from_raw_fd(cmd_write) };
288
289        // Duplicate the fd for poll(2) so we have an independent handle that
290        // doesn't alias the File-owned fd. Closed in WorkerHandle::drop().
291        let poll_fd = unsafe { libc::dup(msg_read) };
292        if poll_fd < 0 {
293            return Err(SupervisorError::SpawnFailed(std::io::Error::last_os_error()));
294        }
295
296        let mut handle = Self {
297            child,
298            reader: FrameReader::new(Box::new(reader_file) as Box<dyn std::io::Read + Send>),
299            writer: FrameWriter::new(Box::new(writer_file) as Box<dyn std::io::Write + Send>),
300            capabilities: None,
301            timeout,
302            poll_fd,
303        };
304
305        handle.wait_for_hello()?;
306        Ok(handle)
307    }
308
309    // ── Non-Unix spawn: stdin/stdout pipes (fallback) ────────────────────
310
311    #[cfg(not(unix))]
312    fn spawn_impl(binary: &std::path::Path, timeout: Duration) -> Result<Self, SupervisorError> {
313        let mut child = Command::new(binary)
314            .arg("--flux-worker")
315            .stdin(Stdio::piped())
316            .stdout(Stdio::piped())
317            .stderr(Stdio::inherit())
318            .spawn()?;
319
320        let stdin = child.stdin.take().expect("stdin should be available");
321        let stdout = child.stdout.take().expect("stdout should be available");
322
323        let mut handle = Self {
324            child,
325            reader: FrameReader::new(Box::new(stdout) as Box<dyn std::io::Read + Send>),
326            writer: FrameWriter::new(Box::new(stdin) as Box<dyn std::io::Write + Send>),
327            capabilities: None,
328            timeout,
329            poll_fd: -1,
330        };
331
332        handle.wait_for_hello()?;
333        Ok(handle)
334    }
335
336    // ── Shared logic ─────────────────────────────────────────────────────
337
338    /// Wait for Hello message from worker and validate protocol version
339    fn wait_for_hello(&mut self) -> Result<(), SupervisorError> {
340        let msg: WorkerMessage = self.reader.read()?;
341
342        match msg {
343            WorkerMessage::Hello(caps) => {
344                if caps.protocol_version != fluxbench_ipc::PROTOCOL_VERSION {
345                    return Err(SupervisorError::ProtocolError {
346                        expected: format!("protocol version {}", fluxbench_ipc::PROTOCOL_VERSION),
347                        got: format!("protocol version {}", caps.protocol_version),
348                    });
349                }
350                self.capabilities = Some(caps);
351                Ok(())
352            }
353            other => Err(SupervisorError::ProtocolError {
354                expected: "Hello".to_string(),
355                got: format!("{:?}", other),
356            }),
357        }
358    }
359
360    /// Get worker capabilities
361    pub fn capabilities(&self) -> Option<&WorkerCapabilities> {
362        self.capabilities.as_ref()
363    }
364
365    /// Run a benchmark on this worker
366    pub fn run_benchmark(
367        &mut self,
368        bench_id: &str,
369        config: &BenchmarkConfig,
370    ) -> Result<IpcBenchmarkResult, SupervisorError> {
371        self.writer.write(&SupervisorCommand::Run {
372            bench_id: bench_id.to_string(),
373            config: config.clone(),
374        })?;
375
376        let mut all_samples = Vec::new();
377        let start = Instant::now();
378
379        loop {
380            let remaining = self.timeout.saturating_sub(start.elapsed());
381            if remaining.is_zero() {
382                return self.handle_timeout(all_samples);
383            }
384
385            // Check if there's buffered data, or poll for new data.
386            // Even with buffered data we verify the worker is alive — the buffer
387            // might hold an incomplete frame that will never be completed.
388            if self.reader.has_buffered_data() {
389                if !self.is_alive() {
390                    return Err(SupervisorError::WorkerCrashed(
391                        "Worker process crashed with partial data buffered".to_string(),
392                    ));
393                }
394            } else {
395                self.wait_for_worker_data(remaining)?;
396            }
397
398            // Read next message (blocking — poll/sleep above confirmed data is available)
399            let msg: WorkerMessage = match self.reader.read::<WorkerMessage>() {
400                Ok(msg) => msg,
401                Err(FrameError::EndOfStream) => {
402                    return Err(SupervisorError::WorkerCrashed(
403                        "Worker closed connection unexpectedly".to_string(),
404                    ));
405                }
406                Err(e) => {
407                    if !self.is_alive() {
408                        return Err(SupervisorError::WorkerCrashed(
409                            "Worker crashed during read".to_string(),
410                        ));
411                    }
412                    return Err(SupervisorError::IpcError(e.to_string()));
413                }
414            };
415
416            match msg {
417                WorkerMessage::SampleBatch(batch) => {
418                    all_samples.extend(batch.samples);
419                }
420                WorkerMessage::WarmupComplete { .. } | WorkerMessage::Progress { .. } => {
421                    continue;
422                }
423                WorkerMessage::Complete {
424                    total_iterations,
425                    total_duration_nanos,
426                } => {
427                    return Ok(IpcBenchmarkResult {
428                        bench_id: bench_id.to_string(),
429                        samples: all_samples,
430                        total_iterations,
431                        total_duration_nanos,
432                        status: IpcBenchmarkStatus::Success,
433                    });
434                }
435                WorkerMessage::Failure {
436                    kind,
437                    message,
438                    backtrace,
439                } => {
440                    let kind_str = match kind {
441                        fluxbench_ipc::FailureKind::Panic => "panic",
442                        fluxbench_ipc::FailureKind::Timeout => "timeout",
443                        fluxbench_ipc::FailureKind::Assertion => "assertion",
444                        fluxbench_ipc::FailureKind::AllocationLimit => "allocation_limit",
445                        fluxbench_ipc::FailureKind::Signal => "signal",
446                        fluxbench_ipc::FailureKind::Unknown => "unknown",
447                    }
448                    .to_string();
449                    return Ok(IpcBenchmarkResult {
450                        bench_id: bench_id.to_string(),
451                        samples: all_samples,
452                        total_iterations: 0,
453                        total_duration_nanos: 0,
454                        status: match kind {
455                            fluxbench_ipc::FailureKind::Panic => IpcBenchmarkStatus::Crashed {
456                                message,
457                                kind: kind_str,
458                                backtrace,
459                            },
460                            _ => IpcBenchmarkStatus::Failed {
461                                message,
462                                kind: kind_str,
463                                backtrace,
464                            },
465                        },
466                    });
467                }
468                WorkerMessage::Hello(_) => {
469                    return Err(SupervisorError::ProtocolError {
470                        expected: "SampleBatch/Complete/Failure".to_string(),
471                        got: "Hello".to_string(),
472                    });
473                }
474            }
475        }
476    }
477
478    /// Wait for data from the worker, checking liveness periodically.
479    ///
480    /// On Unix this uses `poll(2)` on the message fd.
481    /// On non-Unix this uses a polling sleep loop (less efficient but portable).
482    #[cfg(unix)]
483    fn wait_for_worker_data(&mut self, remaining: Duration) -> Result<(), SupervisorError> {
484        let poll_timeout = remaining.min(Duration::from_millis(100));
485        match wait_for_data_fd(self.poll_fd, poll_timeout.as_millis() as i32) {
486            PollResult::DataAvailable => {
487                if !self.is_alive() {
488                    return Err(SupervisorError::WorkerCrashed(
489                        "Worker process crashed with data in pipe".to_string(),
490                    ));
491                }
492                Ok(())
493            }
494            PollResult::Timeout => {
495                if !self.is_alive() {
496                    return Err(SupervisorError::WorkerCrashed(
497                        "Worker process exited unexpectedly".to_string(),
498                    ));
499                }
500                // Signal caller to re-loop (no data yet)
501                Ok(())
502            }
503            PollResult::PipeClosed => Err(SupervisorError::WorkerCrashed(
504                "Worker pipe closed unexpectedly".to_string(),
505            )),
506            PollResult::Error(e) => {
507                Err(SupervisorError::WorkerCrashed(format!("Pipe error: {}", e)))
508            }
509        }
510    }
511
512    #[cfg(not(unix))]
513    fn wait_for_worker_data(&mut self, _remaining: Duration) -> Result<(), SupervisorError> {
514        // Without poll(2), sleep briefly then check liveness.
515        // The subsequent blocking read will pick up data.
516        std::thread::sleep(Duration::from_millis(10));
517        if !self.is_alive() {
518            return Err(SupervisorError::WorkerCrashed(
519                "Worker process exited unexpectedly".to_string(),
520            ));
521        }
522        Ok(())
523    }
524
525    /// Handle timeout: on Unix send SIGTERM → drain 500ms → SIGKILL.
526    /// On non-Unix just kill immediately.
527    #[cfg(unix)]
528    fn handle_timeout(
529        &mut self,
530        mut samples: Vec<Sample>,
531    ) -> Result<IpcBenchmarkResult, SupervisorError> {
532        // Send SIGTERM for graceful shutdown (ignore error — worker may already be dead)
533        let _ = send_sigterm(self.child.id());
534
535        // Drain any messages the worker flushes in response to SIGTERM (500ms window)
536        let drain_deadline = Instant::now() + Duration::from_millis(500);
537        loop {
538            let remaining = drain_deadline.saturating_duration_since(Instant::now());
539            if remaining.is_zero() {
540                break;
541            }
542
543            match wait_for_data_fd(self.poll_fd, remaining.as_millis() as i32) {
544                PollResult::DataAvailable => match self.reader.read::<WorkerMessage>() {
545                    Ok(WorkerMessage::SampleBatch(batch)) => {
546                        samples.extend(batch.samples);
547                    }
548                    Ok(WorkerMessage::Complete { .. }) => break,
549                    _ => break,
550                },
551                PollResult::PipeClosed => break,
552                _ => break,
553            }
554        }
555
556        if self.is_alive() {
557            let _ = self.child.kill();
558            let _ = self.child.wait();
559        }
560
561        Err(SupervisorError::Timeout)
562    }
563
564    #[cfg(not(unix))]
565    fn handle_timeout(
566        &mut self,
567        _samples: Vec<Sample>,
568    ) -> Result<IpcBenchmarkResult, SupervisorError> {
569        // No SIGTERM on non-Unix — just kill immediately
570        if self.is_alive() {
571            let _ = self.child.kill();
572            let _ = self.child.wait();
573        }
574        Err(SupervisorError::Timeout)
575    }
576
577    /// Ping the worker to check if it's alive
578    pub fn ping(&mut self) -> Result<bool, SupervisorError> {
579        self.writer.write(&SupervisorCommand::Ping)?;
580        Ok(true)
581    }
582
583    /// Abort the current benchmark
584    pub fn abort(&mut self) -> Result<(), SupervisorError> {
585        self.writer.write(&SupervisorCommand::Abort)?;
586        Ok(())
587    }
588
589    /// Shutdown the worker gracefully
590    pub fn shutdown(mut self) -> Result<(), SupervisorError> {
591        self.writer.write(&SupervisorCommand::Shutdown)?;
592        let _ = self.child.wait();
593        Ok(())
594    }
595
596    /// Check if worker process is still running
597    pub fn is_alive(&mut self) -> bool {
598        match self.child.try_wait() {
599            Ok(Some(_)) => false,
600            Ok(None) => true,
601            Err(_) => false,
602        }
603    }
604
605    /// Kill the worker process forcefully
606    pub fn kill(&mut self) -> Result<(), SupervisorError> {
607        self.child.kill().map_err(SupervisorError::SpawnFailed)?;
608        let _ = self.child.wait();
609        Ok(())
610    }
611}
612
613impl Drop for WorkerHandle {
614    fn drop(&mut self) {
615        if self.is_alive() {
616            #[cfg(unix)]
617            {
618                // Graceful: SIGTERM first, brief wait, then SIGKILL
619                let _ = send_sigterm(self.child.id());
620                std::thread::sleep(Duration::from_millis(50));
621            }
622            if self.is_alive() {
623                let _ = self.child.kill();
624            }
625            let _ = self.child.wait();
626        }
627
628        // Close the duplicated poll fd (Unix only; -1 on non-Unix is a no-op)
629        #[cfg(unix)]
630        if self.poll_fd >= 0 {
631            close_fd(self.poll_fd);
632        }
633    }
634}
635
636// ─── Supervisor ──────────────────────────────────────────────────────────────
637
638/// Supervisor that manages worker pool and distributes benchmarks
639pub struct Supervisor {
640    config: BenchmarkConfig,
641    timeout: Duration,
642    num_workers: usize,
643}
644
645impl Supervisor {
646    /// Create a new supervisor
647    pub fn new(config: BenchmarkConfig, timeout: Duration, num_workers: usize) -> Self {
648        Self {
649            config,
650            timeout,
651            num_workers: num_workers.max(1),
652        }
653    }
654
655    /// Run all benchmarks with process isolation using the default config.
656    ///
657    /// For per-benchmark configuration, use [`run_all_configs`](Self::run_all_configs).
658    pub fn run_all(
659        &self,
660        benchmarks: &[&BenchmarkDef],
661    ) -> Result<Vec<IpcBenchmarkResult>, SupervisorError> {
662        let configs: Vec<_> = benchmarks.iter().map(|_| self.config.clone()).collect();
663        self.run_all_configs(benchmarks, &configs)
664    }
665
666    /// Run all benchmarks with per-benchmark configs
667    pub fn run_all_configs(
668        &self,
669        benchmarks: &[&BenchmarkDef],
670        configs: &[BenchmarkConfig],
671    ) -> Result<Vec<IpcBenchmarkResult>, SupervisorError> {
672        if benchmarks.is_empty() {
673            return Ok(Vec::new());
674        }
675
676        if self.num_workers == 1 || benchmarks.len() == 1 {
677            let mut results = Vec::with_capacity(benchmarks.len());
678            for (bench, cfg) in benchmarks.iter().zip(configs.iter()) {
679                results.push(self.run_isolated(bench, cfg)?);
680            }
681            return Ok(results);
682        }
683
684        let worker_count = self.num_workers.min(benchmarks.len());
685        let pool = ThreadPoolBuilder::new()
686            .num_threads(worker_count)
687            .build()
688            .map_err(|e| {
689                SupervisorError::IpcError(format!("Failed to build worker pool: {}", e))
690            })?;
691
692        let pairs: Vec<_> = benchmarks.iter().zip(configs.iter()).collect();
693        let outcomes: Vec<Result<IpcBenchmarkResult, SupervisorError>> = pool.install(|| {
694            pairs
695                .par_iter()
696                .map(|(bench, cfg)| self.run_isolated(bench, cfg))
697                .collect()
698        });
699
700        let mut results = Vec::with_capacity(outcomes.len());
701        for outcome in outcomes {
702            results.push(outcome?);
703        }
704        Ok(results)
705    }
706
707    /// Run a single benchmark in an isolated worker process
708    fn run_isolated(
709        &self,
710        bench: &BenchmarkDef,
711        config: &BenchmarkConfig,
712    ) -> Result<IpcBenchmarkResult, SupervisorError> {
713        let mut worker = WorkerHandle::spawn(self.timeout)?;
714        let result = worker.run_benchmark(bench.id, config);
715        let _ = worker.shutdown();
716        result
717    }
718
719    fn crashed_result(bench: &BenchmarkDef, message: String) -> IpcBenchmarkResult {
720        IpcBenchmarkResult {
721            bench_id: bench.id.to_string(),
722            samples: Vec::new(),
723            total_iterations: 0,
724            total_duration_nanos: 0,
725            status: IpcBenchmarkStatus::Crashed {
726                message,
727                kind: "crashed".to_string(),
728                backtrace: None,
729            },
730        }
731    }
732
733    fn run_with_reuse_indexed(
734        &self,
735        benchmarks: &[(usize, &BenchmarkDef, &BenchmarkConfig)],
736    ) -> Vec<(usize, IpcBenchmarkResult)> {
737        let mut results = Vec::with_capacity(benchmarks.len());
738        if benchmarks.is_empty() {
739            return results;
740        }
741
742        let mut worker = match WorkerHandle::spawn(self.timeout) {
743            Ok(worker) => Some(worker),
744            Err(e) => {
745                let message = e.to_string();
746                for &(index, bench, _) in benchmarks {
747                    results.push((index, Self::crashed_result(bench, message.clone())));
748                }
749                return results;
750            }
751        };
752
753        for &(index, bench, cfg) in benchmarks {
754            if worker.is_none() {
755                match WorkerHandle::spawn(self.timeout) {
756                    Ok(new_worker) => worker = Some(new_worker),
757                    Err(e) => {
758                        results.push((index, Self::crashed_result(bench, e.to_string())));
759                        continue;
760                    }
761                }
762            }
763
764            let run_result = match worker.as_mut() {
765                Some(worker) => worker.run_benchmark(bench.id, cfg),
766                None => Err(SupervisorError::IpcError(
767                    "worker unexpectedly absent after spawn".to_string(),
768                )),
769            };
770
771            match run_result {
772                Ok(result) => results.push((index, result)),
773                Err(e) => {
774                    let worker_is_alive = worker.as_mut().map(|w| w.is_alive()).unwrap_or(false);
775                    if !worker_is_alive {
776                        if let Some(mut dead_worker) = worker.take() {
777                            let _ = dead_worker.kill();
778                        }
779                    }
780                    results.push((index, Self::crashed_result(bench, e.to_string())));
781                }
782            }
783        }
784
785        if let Some(worker) = worker {
786            let _ = worker.shutdown();
787        }
788
789        results
790    }
791
792    /// Run benchmarks with worker reuse using the default config.
793    ///
794    /// For per-benchmark configuration, use [`run_with_reuse_configs`](Self::run_with_reuse_configs).
795    pub fn run_with_reuse(
796        &self,
797        benchmarks: &[&BenchmarkDef],
798    ) -> Result<Vec<IpcBenchmarkResult>, SupervisorError> {
799        let configs: Vec<_> = benchmarks.iter().map(|_| self.config.clone()).collect();
800        self.run_with_reuse_configs(benchmarks, &configs)
801    }
802
803    /// Run benchmarks with worker reuse and per-benchmark configs
804    pub fn run_with_reuse_configs(
805        &self,
806        benchmarks: &[&BenchmarkDef],
807        configs: &[BenchmarkConfig],
808    ) -> Result<Vec<IpcBenchmarkResult>, SupervisorError> {
809        if benchmarks.is_empty() {
810            return Ok(Vec::new());
811        }
812
813        let indexed_benchmarks: Vec<(usize, &BenchmarkDef, &BenchmarkConfig)> = benchmarks
814            .iter()
815            .zip(configs.iter())
816            .enumerate()
817            .map(|(index, (bench, cfg))| (index, *bench, cfg))
818            .collect();
819
820        let mut indexed_results = if self.num_workers == 1 || benchmarks.len() == 1 {
821            self.run_with_reuse_indexed(&indexed_benchmarks)
822        } else {
823            let worker_count = self.num_workers.min(indexed_benchmarks.len());
824            let mut shards: Vec<Vec<(usize, &BenchmarkDef, &BenchmarkConfig)>> =
825                vec![Vec::new(); worker_count];
826            for (position, entry) in indexed_benchmarks.into_iter().enumerate() {
827                shards[position % worker_count].push(entry);
828            }
829
830            let pool = ThreadPoolBuilder::new()
831                .num_threads(worker_count)
832                .build()
833                .map_err(|e| {
834                    SupervisorError::IpcError(format!("Failed to build worker pool: {}", e))
835                })?;
836
837            let shard_results: Vec<Vec<(usize, IpcBenchmarkResult)>> = pool.install(|| {
838                shards
839                    .into_par_iter()
840                    .map(|shard| self.run_with_reuse_indexed(&shard))
841                    .collect()
842            });
843
844            shard_results.into_iter().flatten().collect()
845        };
846
847        indexed_results.sort_by_key(|(index, _)| *index);
848        if indexed_results.len() != benchmarks.len() {
849            return Err(SupervisorError::IpcError(format!(
850                "Internal error: expected {} results, got {}",
851                benchmarks.len(),
852                indexed_results.len()
853            )));
854        }
855
856        Ok(indexed_results
857            .into_iter()
858            .map(|(_, result)| result)
859            .collect())
860    }
861}
862
#[cfg(test)]
mod tests {
    use super::*;

    /// A supervisor built with one worker reports a worker count of one.
    ///
    /// Only constructs the supervisor; no worker process is spawned here.
    #[test]
    #[ignore] // Requires built binary
    fn test_supervisor_spawn() {
        let supervisor =
            Supervisor::new(BenchmarkConfig::default(), Duration::from_secs(30), 1);
        assert_eq!(supervisor.num_workers, 1);
    }
}
875}