use fluxbench_core::BenchmarkDef;
use fluxbench_ipc::{
    BenchmarkConfig, FrameError, FrameReader, FrameWriter, Sample, SupervisorCommand,
    WorkerCapabilities, WorkerMessage,
};
use rayon::ThreadPoolBuilder;
use rayon::prelude::*;
use std::env;
use std::process::{Child, Command, Stdio};
use std::time::{Duration, Instant};
use thiserror::Error;

#[cfg(unix)]
use std::os::unix::io::{FromRawFd, RawFd};
#[cfg(unix)]
use std::os::unix::process::CommandExt;

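/// Errors that can occur while supervising worker processes.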
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum SupervisorError {
    #[error("Failed to spawn worker: {0}")]
    SpawnFailed(#[from] std::io::Error),

    #[error("IPC error: {0}")]
    IpcError(String),

    #[error("Worker crashed: {0}")]
    WorkerCrashed(String),

    #[error("Timeout waiting for worker")]
    Timeout,

    #[error("Benchmark not found: {0}")]
    BenchmarkNotFound(String),

    #[error("Worker protocol error: expected {expected}, got {got}")]
    ProtocolError { expected: String, got: String },
}

impl From<FrameError> for SupervisorError {
    fn from(e: FrameError) -> Self {
        SupervisorError::IpcError(e.to_string())
    }
}

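/// The outcome of running a single benchmark in a worker process.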
#[derive(Debug)]
pub struct IpcBenchmarkResult {
    pub bench_id: String,
    pub samples: Vec<Sample>,
    pub total_iterations: u64,
    pub total_duration_nanos: u64,
    pub status: IpcBenchmarkStatus,
}

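/// Terminal status of a worker run: clean success, a failure the worker
/// reported (e.g. timeout or assertion), or a crash such as a panic.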
#[derive(Debug, Clone)]
pub enum IpcBenchmarkStatus {
    Success,
    Failed {
        message: String,
        kind: String,
        backtrace: Option<String>,
    },
    Crashed {
        message: String,
        kind: String,
        backtrace: Option<String>,
    },
}

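/// Result of polling the worker's message pipe for readable data.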
#[derive(Debug)]
enum PollResult {
    DataAvailable,
    Timeout,
    PipeClosed,
    Error(std::io::Error),
}

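/// Polls `fd` for readable data via `libc::poll`, waiting at most
/// `timeout_ms` milliseconds.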
#[cfg(unix)]
fn wait_for_data_fd(fd: i32, timeout_ms: i32) -> PollResult {
    let mut pollfd = libc::pollfd {
        fd,
        events: libc::POLLIN,
        revents: 0,
    };

    let result = unsafe { libc::poll(&mut pollfd, 1, timeout_ms) };

    if result < 0 {
        PollResult::Error(std::io::Error::last_os_error())
    } else if result == 0 {
        PollResult::Timeout
    } else if pollfd.revents & libc::POLLIN != 0 {
        PollResult::DataAvailable
    } else if pollfd.revents & (libc::POLLERR | libc::POLLHUP | libc::POLLNVAL) != 0 {
        PollResult::PipeClosed
    } else {
        PollResult::Timeout
    }
}

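/// Creates an anonymous pipe and marks both ends close-on-exec, returning
/// `(read_fd, write_fd)`.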
#[cfg(unix)]
fn create_pipe() -> Result<(RawFd, RawFd), std::io::Error> {
    let mut fds = [0 as RawFd; 2];
    let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
    if ret != 0 {
        return Err(std::io::Error::last_os_error());
    }
    for &fd in &fds {
        unsafe {
            let flags = libc::fcntl(fd, libc::F_GETFD);
            libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC);
        }
    }
    Ok((fds[0], fds[1]))
}

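/// Closes a raw file descriptor, ignoring any error.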
#[cfg(unix)]
fn close_fd(fd: RawFd) {
    unsafe {
        libc::close(fd);
    }
}

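/// Sends SIGTERM to the given process id.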
#[cfg(unix)]
fn send_sigterm(pid: u32) -> Result<(), std::io::Error> {
    let ret = unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM) };
    if ret == -1 {
        Err(std::io::Error::last_os_error())
    } else {
        Ok(())
    }
}

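/// A handle to a spawned worker process.
///
/// On Unix the supervisor talks to the worker over two anonymous pipes: a
/// command pipe (supervisor -> worker) and a message pipe (worker ->
/// supervisor). `poll_fd` is a dup of the message pipe's read end so the
/// supervisor can poll for readable data without disturbing the buffered
/// `FrameReader`; it is -1 on platforms without `poll`.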
pub struct WorkerHandle {
    child: Child,
    reader: FrameReader<Box<dyn std::io::Read + Send>>,
    writer: FrameWriter<Box<dyn std::io::Write + Send>>,
    capabilities: Option<WorkerCapabilities>,
    timeout: Duration,
    poll_fd: i32,
}

impl WorkerHandle {
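    /// Spawns a worker by re-executing the current binary in worker mode.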
    pub fn spawn(timeout: Duration) -> Result<Self, SupervisorError> {
        let binary = env::current_exe().map_err(SupervisorError::SpawnFailed)?;
        Self::spawn_impl(&binary, timeout)
    }

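    /// Spawns a worker from an explicit binary path.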
    pub fn spawn_binary(binary: &str, timeout: Duration) -> Result<Self, SupervisorError> {
        Self::spawn_impl(binary.as_ref(), timeout)
    }

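    /// Unix implementation: sets up the command and message pipes, spawns
    /// the worker with its inherited fd numbers in `FLUX_IPC_FD`, then waits
    /// for the worker's `Hello` handshake.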
    #[cfg(unix)]
    fn spawn_impl(binary: &std::path::Path, timeout: Duration) -> Result<Self, SupervisorError> {
        let (cmd_read, cmd_write) = create_pipe()?;
        let (msg_read, msg_write) = match create_pipe() {
            Ok(fds) => fds,
            Err(e) => {
                close_fd(cmd_read);
                close_fd(cmd_write);
                return Err(SupervisorError::SpawnFailed(e));
            }
        };

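        // Build the worker command; the fd numbers for the worker's ends of
        // the pipes travel in FLUX_IPC_FD as "cmd_read,msg_write".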
        let mut command = Command::new(binary);
        command
            .arg("--flux-worker")
            .env("FLUX_IPC_FD", format!("{},{}", cmd_read, msg_write))
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::inherit());

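        // pre_exec runs in the forked child just before exec: clear
        // FD_CLOEXEC on the two fds the worker must inherit, and close the
        // supervisor's ends of both pipes.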
        unsafe {
            command.pre_exec(move || {
                let flags = libc::fcntl(cmd_read, libc::F_GETFD);
                if flags == -1 {
                    return Err(std::io::Error::last_os_error());
                }
                if libc::fcntl(cmd_read, libc::F_SETFD, flags & !libc::FD_CLOEXEC) == -1 {
                    return Err(std::io::Error::last_os_error());
                }

                let flags = libc::fcntl(msg_write, libc::F_GETFD);
                if flags == -1 {
                    return Err(std::io::Error::last_os_error());
                }
                if libc::fcntl(msg_write, libc::F_SETFD, flags & !libc::FD_CLOEXEC) == -1 {
                    return Err(std::io::Error::last_os_error());
                }

                libc::close(cmd_write);
                libc::close(msg_read);

                Ok(())
            });
        }

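        // If the spawn fails, reclaim all four pipe fds before returning.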
        let child = match command.spawn() {
            Ok(c) => c,
            Err(e) => {
                close_fd(cmd_read);
                close_fd(cmd_write);
                close_fd(msg_read);
                close_fd(msg_write);
                return Err(SupervisorError::SpawnFailed(e));
            }
        };

        close_fd(cmd_read);
        close_fd(msg_write);

        let reader_file = unsafe { std::fs::File::from_raw_fd(msg_read) };
        let writer_file = unsafe { std::fs::File::from_raw_fd(cmd_write) };

        let poll_fd = unsafe { libc::dup(msg_read) };
        if poll_fd < 0 {
            return Err(SupervisorError::SpawnFailed(std::io::Error::last_os_error()));
        }

        let mut handle = Self {
            child,
            reader: FrameReader::new(Box::new(reader_file) as Box<dyn std::io::Read + Send>),
            writer: FrameWriter::new(Box::new(writer_file) as Box<dyn std::io::Write + Send>),
            capabilities: None,
            timeout,
            poll_fd,
        };

        handle.wait_for_hello()?;
        Ok(handle)
    }

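    /// Fallback implementation for non-Unix platforms: frames messages over
    /// the worker's stdin/stdout instead of dedicated pipes.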
    #[cfg(not(unix))]
    fn spawn_impl(binary: &std::path::Path, timeout: Duration) -> Result<Self, SupervisorError> {
        let mut child = Command::new(binary)
            .arg("--flux-worker")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()?;

        let stdin = child.stdin.take().expect("stdin should be available");
        let stdout = child.stdout.take().expect("stdout should be available");

        let mut handle = Self {
            child,
            reader: FrameReader::new(Box::new(stdout) as Box<dyn std::io::Read + Send>),
            writer: FrameWriter::new(Box::new(stdin) as Box<dyn std::io::Write + Send>),
            capabilities: None,
            timeout,
            poll_fd: -1,
        };

        handle.wait_for_hello()?;
        Ok(handle)
    }

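    /// Waits for the worker's `Hello` message and verifies the protocol
    /// version before any commands are sent.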
    fn wait_for_hello(&mut self) -> Result<(), SupervisorError> {
        let msg: WorkerMessage = self.reader.read()?;

        match msg {
            WorkerMessage::Hello(caps) => {
                if caps.protocol_version != fluxbench_ipc::PROTOCOL_VERSION {
                    return Err(SupervisorError::ProtocolError {
                        expected: format!("protocol version {}", fluxbench_ipc::PROTOCOL_VERSION),
                        got: format!("protocol version {}", caps.protocol_version),
                    });
                }
                self.capabilities = Some(caps);
                Ok(())
            }
            other => Err(SupervisorError::ProtocolError {
                expected: "Hello".to_string(),
                got: format!("{:?}", other),
            }),
        }
    }

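    /// Returns the capabilities the worker reported, if the handshake
    /// completed.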
    pub fn capabilities(&self) -> Option<&WorkerCapabilities> {
        self.capabilities.as_ref()
    }

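    /// Runs a single benchmark to completion, collecting sample batches
    /// until the worker reports `Complete` or `Failure`, or the timeout
    /// elapses.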
    pub fn run_benchmark(
        &mut self,
        bench_id: &str,
        config: &BenchmarkConfig,
    ) -> Result<IpcBenchmarkResult, SupervisorError> {
        self.writer.write(&SupervisorCommand::Run {
            bench_id: bench_id.to_string(),
            config: config.clone(),
        })?;

        let mut all_samples = Vec::new();
        let start = Instant::now();

        loop {
            let remaining = self.timeout.saturating_sub(start.elapsed());
            if remaining.is_zero() {
                return self.handle_timeout(all_samples);
            }

            if self.reader.has_buffered_data() {
                if !self.is_alive() {
                    return Err(SupervisorError::WorkerCrashed(
                        "Worker process crashed with partial data buffered".to_string(),
                    ));
                }
            } else if !self.wait_for_worker_data(remaining)? {
                // No data arrived within this poll interval; re-check the
                // deadline instead of blocking on a read that could hang
                // past the timeout.
                continue;
            }

            let msg: WorkerMessage = match self.reader.read::<WorkerMessage>() {
                Ok(msg) => msg,
                Err(FrameError::EndOfStream) => {
                    return Err(SupervisorError::WorkerCrashed(
                        "Worker closed connection unexpectedly".to_string(),
                    ));
                }
                Err(e) => {
                    if !self.is_alive() {
                        return Err(SupervisorError::WorkerCrashed(
                            "Worker crashed during read".to_string(),
                        ));
                    }
                    return Err(SupervisorError::IpcError(e.to_string()));
                }
            };

            match msg {
                WorkerMessage::SampleBatch(batch) => {
                    all_samples.extend(batch.samples);
                }
                WorkerMessage::WarmupComplete { .. } | WorkerMessage::Progress { .. } => {
                    continue;
                }
                WorkerMessage::Complete {
                    total_iterations,
                    total_duration_nanos,
                } => {
                    return Ok(IpcBenchmarkResult {
                        bench_id: bench_id.to_string(),
                        samples: all_samples,
                        total_iterations,
                        total_duration_nanos,
                        status: IpcBenchmarkStatus::Success,
                    });
                }
                WorkerMessage::Failure {
                    kind,
                    message,
                    backtrace,
                } => {
                    let kind_str = match kind {
                        fluxbench_ipc::FailureKind::Panic => "panic",
                        fluxbench_ipc::FailureKind::Timeout => "timeout",
                        fluxbench_ipc::FailureKind::Assertion => "assertion",
                        fluxbench_ipc::FailureKind::AllocationLimit => "allocation_limit",
                        fluxbench_ipc::FailureKind::Signal => "signal",
                        fluxbench_ipc::FailureKind::Unknown => "unknown",
                    }
                    .to_string();
                    return Ok(IpcBenchmarkResult {
                        bench_id: bench_id.to_string(),
                        samples: all_samples,
                        total_iterations: 0,
                        total_duration_nanos: 0,
                        status: match kind {
                            fluxbench_ipc::FailureKind::Panic => IpcBenchmarkStatus::Crashed {
                                message,
                                kind: kind_str,
                                backtrace,
                            },
                            _ => IpcBenchmarkStatus::Failed {
                                message,
                                kind: kind_str,
                                backtrace,
                            },
                        },
                    });
                }
                WorkerMessage::Hello(_) => {
                    return Err(SupervisorError::ProtocolError {
                        expected: "SampleBatch/Complete/Failure".to_string(),
                        got: "Hello".to_string(),
                    });
                }
            }
        }
    }

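    /// Polls the worker's message pipe, returning `Ok(true)` when data is
    /// ready to read and `Ok(false)` when the poll timed out with the worker
    /// still alive. Each poll is capped at 100ms so the caller can re-check
    /// its deadline.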
    #[cfg(unix)]
    fn wait_for_worker_data(&mut self, remaining: Duration) -> Result<bool, SupervisorError> {
        let poll_timeout = remaining.min(Duration::from_millis(100));
        match wait_for_data_fd(self.poll_fd, poll_timeout.as_millis() as i32) {
            PollResult::DataAvailable => {
                if !self.is_alive() {
                    return Err(SupervisorError::WorkerCrashed(
                        "Worker process crashed with data in pipe".to_string(),
                    ));
                }
                Ok(true)
            }
            PollResult::Timeout => {
                if !self.is_alive() {
                    return Err(SupervisorError::WorkerCrashed(
                        "Worker process exited unexpectedly".to_string(),
                    ));
                }
                Ok(false)
            }
            PollResult::PipeClosed => Err(SupervisorError::WorkerCrashed(
                "Worker pipe closed unexpectedly".to_string(),
            )),
            PollResult::Error(e) => {
                Err(SupervisorError::WorkerCrashed(format!("Pipe error: {}", e)))
            }
        }
    }

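    /// Non-Unix fallback: there is no `poll`, so after a short sleep this
    /// always reports data as ready and the caller falls through to a
    /// blocking read, which may overshoot the deadline on these platforms.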
    #[cfg(not(unix))]
    fn wait_for_worker_data(&mut self, _remaining: Duration) -> Result<bool, SupervisorError> {
        std::thread::sleep(Duration::from_millis(10));
        if !self.is_alive() {
            return Err(SupervisorError::WorkerCrashed(
                "Worker process exited unexpectedly".to_string(),
            ));
        }
        Ok(true)
    }

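    /// Handles an overall benchmark timeout: asks the worker to terminate,
    /// briefly drains any remaining sample batches from the pipe, then kills
    /// the process and reports `SupervisorError::Timeout`.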
    #[cfg(unix)]
    fn handle_timeout(
        &mut self,
        mut samples: Vec<Sample>,
    ) -> Result<IpcBenchmarkResult, SupervisorError> {
        let _ = send_sigterm(self.child.id());

        let drain_deadline = Instant::now() + Duration::from_millis(500);
        loop {
            let remaining = drain_deadline.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                break;
            }

            match wait_for_data_fd(self.poll_fd, remaining.as_millis() as i32) {
                PollResult::DataAvailable => match self.reader.read::<WorkerMessage>() {
                    Ok(WorkerMessage::SampleBatch(batch)) => {
                        samples.extend(batch.samples);
                    }
                    Ok(WorkerMessage::Complete { .. }) => break,
                    _ => break,
                },
                PollResult::PipeClosed => break,
                _ => break,
            }
        }

        if self.is_alive() {
            let _ = self.child.kill();
            let _ = self.child.wait();
        }

        Err(SupervisorError::Timeout)
    }

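    /// Non-Unix fallback: no pipe to drain; just kill the worker and report
    /// the timeout.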
    #[cfg(not(unix))]
    fn handle_timeout(
        &mut self,
        _samples: Vec<Sample>,
    ) -> Result<IpcBenchmarkResult, SupervisorError> {
        if self.is_alive() {
            let _ = self.child.kill();
            let _ = self.child.wait();
        }
        Err(SupervisorError::Timeout)
    }

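    /// Sends a `Ping` command. This only confirms the command was written;
    /// it does not wait for a reply from the worker.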
    pub fn ping(&mut self) -> Result<bool, SupervisorError> {
        self.writer.write(&SupervisorCommand::Ping)?;
        Ok(true)
    }

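    /// Asks the worker to abort the benchmark currently in progress.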
    pub fn abort(&mut self) -> Result<(), SupervisorError> {
        self.writer.write(&SupervisorCommand::Abort)?;
        Ok(())
    }

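    /// Sends `Shutdown` and waits for the worker process to exit.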
    pub fn shutdown(mut self) -> Result<(), SupervisorError> {
        self.writer.write(&SupervisorCommand::Shutdown)?;
        let _ = self.child.wait();
        Ok(())
    }

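    /// Returns true if the worker process is still running.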
    pub fn is_alive(&mut self) -> bool {
        match self.child.try_wait() {
            Ok(Some(_)) => false,
            Ok(None) => true,
            Err(_) => false,
        }
    }

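    /// Forcibly kills the worker process and reaps it.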
    pub fn kill(&mut self) -> Result<(), SupervisorError> {
        self.child.kill().map_err(SupervisorError::SpawnFailed)?;
        let _ = self.child.wait();
        Ok(())
    }
}

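// Best-effort cleanup: try a graceful SIGTERM first, then force-kill, and
// release the dup'd poll fd.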
impl Drop for WorkerHandle {
    fn drop(&mut self) {
        if self.is_alive() {
            #[cfg(unix)]
            {
                let _ = send_sigterm(self.child.id());
                std::thread::sleep(Duration::from_millis(50));
            }
            if self.is_alive() {
                let _ = self.child.kill();
            }
            let _ = self.child.wait();
        }

        #[cfg(unix)]
        if self.poll_fd >= 0 {
            close_fd(self.poll_fd);
        }
    }
}

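/// Orchestrates benchmark runs across one or more worker processes, either
/// spawning a fresh worker per benchmark (`run_all`) or reusing workers
/// across benchmarks (`run_with_reuse`).
///
/// A minimal usage sketch (illustrative, not a doctest; assumes `benchmarks`
/// has already been collected as a `Vec<&BenchmarkDef>` elsewhere):
///
/// ```ignore
/// use std::time::Duration;
/// use fluxbench_ipc::BenchmarkConfig;
///
/// let supervisor = Supervisor::new(BenchmarkConfig::default(), Duration::from_secs(30), 4);
/// // One fresh worker process per benchmark, up to 4 running in parallel:
/// let results = supervisor.run_all(&benchmarks)?;
/// for r in &results {
///     println!("{}: {} samples", r.bench_id, r.samples.len());
/// }
/// ```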
pub struct Supervisor {
    config: BenchmarkConfig,
    timeout: Duration,
    num_workers: usize,
}

impl Supervisor {
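    /// Creates a supervisor; `num_workers` is clamped to at least 1.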
    pub fn new(config: BenchmarkConfig, timeout: Duration, num_workers: usize) -> Self {
        Self {
            config,
            timeout,
            num_workers: num_workers.max(1),
        }
    }

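    /// Runs each benchmark in its own worker process, applying the
    /// supervisor's default config to every benchmark.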
    pub fn run_all(
        &self,
        benchmarks: &[&BenchmarkDef],
    ) -> Result<Vec<IpcBenchmarkResult>, SupervisorError> {
        let configs: Vec<_> = benchmarks.iter().map(|_| self.config.clone()).collect();
        self.run_all_configs(benchmarks, &configs)
    }

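    /// Runs each benchmark in its own worker process with a per-benchmark
    /// config. Benchmarks run sequentially when there is a single worker,
    /// otherwise in parallel on a rayon pool.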
    pub fn run_all_configs(
        &self,
        benchmarks: &[&BenchmarkDef],
        configs: &[BenchmarkConfig],
    ) -> Result<Vec<IpcBenchmarkResult>, SupervisorError> {
        if benchmarks.is_empty() {
            return Ok(Vec::new());
        }

        if self.num_workers == 1 || benchmarks.len() == 1 {
            let mut results = Vec::with_capacity(benchmarks.len());
            for (bench, cfg) in benchmarks.iter().zip(configs.iter()) {
                results.push(self.run_isolated(bench, cfg)?);
            }
            return Ok(results);
        }

        let worker_count = self.num_workers.min(benchmarks.len());
        let pool = ThreadPoolBuilder::new()
            .num_threads(worker_count)
            .build()
            .map_err(|e| {
                SupervisorError::IpcError(format!("Failed to build worker pool: {}", e))
            })?;

        let pairs: Vec<_> = benchmarks.iter().zip(configs.iter()).collect();
        let outcomes: Vec<Result<IpcBenchmarkResult, SupervisorError>> = pool.install(|| {
            pairs
                .par_iter()
                .map(|(bench, cfg)| self.run_isolated(bench, cfg))
                .collect()
        });

        let mut results = Vec::with_capacity(outcomes.len());
        for outcome in outcomes {
            results.push(outcome?);
        }
        Ok(results)
    }

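    /// Spawns a fresh worker, runs one benchmark, and shuts the worker down.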
    fn run_isolated(
        &self,
        bench: &BenchmarkDef,
        config: &BenchmarkConfig,
    ) -> Result<IpcBenchmarkResult, SupervisorError> {
        let mut worker = WorkerHandle::spawn(self.timeout)?;
        let result = worker.run_benchmark(bench.id, config);
        let _ = worker.shutdown();
        result
    }

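    /// Builds a `Crashed` result for a benchmark whose worker could not run
    /// it.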
    fn crashed_result(bench: &BenchmarkDef, message: String) -> IpcBenchmarkResult {
        IpcBenchmarkResult {
            bench_id: bench.id.to_string(),
            samples: Vec::new(),
            total_iterations: 0,
            total_duration_nanos: 0,
            status: IpcBenchmarkStatus::Crashed {
                message,
                kind: "crashed".to_string(),
                backtrace: None,
            },
        }
    }

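    /// Runs a shard of index-tagged benchmarks on a single reusable worker,
    /// respawning the worker if it dies and recording spawn or run errors as
    /// `Crashed` results instead of aborting the rest of the shard.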
    fn run_with_reuse_indexed(
        &self,
        benchmarks: &[(usize, &BenchmarkDef, &BenchmarkConfig)],
    ) -> Vec<(usize, IpcBenchmarkResult)> {
        let mut results = Vec::with_capacity(benchmarks.len());
        if benchmarks.is_empty() {
            return results;
        }

        let mut worker = match WorkerHandle::spawn(self.timeout) {
            Ok(worker) => Some(worker),
            Err(e) => {
                let message = e.to_string();
                for &(index, bench, _) in benchmarks {
                    results.push((index, Self::crashed_result(bench, message.clone())));
                }
                return results;
            }
        };

        for &(index, bench, cfg) in benchmarks {
            if worker.is_none() {
                match WorkerHandle::spawn(self.timeout) {
                    Ok(new_worker) => worker = Some(new_worker),
                    Err(e) => {
                        results.push((index, Self::crashed_result(bench, e.to_string())));
                        continue;
                    }
                }
            }

            let run_result = match worker.as_mut() {
                Some(worker) => worker.run_benchmark(bench.id, cfg),
                None => unreachable!("worker should exist after spawn check"),
            };

            match run_result {
                Ok(result) => results.push((index, result)),
                Err(e) => {
                    let worker_is_alive = worker.as_mut().map(|w| w.is_alive()).unwrap_or(false);
                    if !worker_is_alive {
                        if let Some(mut dead_worker) = worker.take() {
                            let _ = dead_worker.kill();
                        }
                    }
                    results.push((index, Self::crashed_result(bench, e.to_string())));
                }
            }
        }

        if let Some(worker) = worker {
            let _ = worker.shutdown();
        }

        results
    }

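    /// Like `run_all`, but reuses worker processes across benchmarks.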
    pub fn run_with_reuse(
        &self,
        benchmarks: &[&BenchmarkDef],
    ) -> Result<Vec<IpcBenchmarkResult>, SupervisorError> {
        let configs: Vec<_> = benchmarks.iter().map(|_| self.config.clone()).collect();
        self.run_with_reuse_configs(benchmarks, &configs)
    }

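    /// Worker-reuse variant with per-benchmark configs: benchmarks are dealt
    /// round-robin into one shard per worker, shards run in parallel, and
    /// results are re-sorted into input order.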
    pub fn run_with_reuse_configs(
        &self,
        benchmarks: &[&BenchmarkDef],
        configs: &[BenchmarkConfig],
    ) -> Result<Vec<IpcBenchmarkResult>, SupervisorError> {
        if benchmarks.is_empty() {
            return Ok(Vec::new());
        }

        let indexed_benchmarks: Vec<(usize, &BenchmarkDef, &BenchmarkConfig)> = benchmarks
            .iter()
            .zip(configs.iter())
            .enumerate()
            .map(|(index, (bench, cfg))| (index, *bench, cfg))
            .collect();

        let mut indexed_results = if self.num_workers == 1 || benchmarks.len() == 1 {
            self.run_with_reuse_indexed(&indexed_benchmarks)
        } else {
            let worker_count = self.num_workers.min(indexed_benchmarks.len());
            let mut shards: Vec<Vec<(usize, &BenchmarkDef, &BenchmarkConfig)>> =
                vec![Vec::new(); worker_count];
            for (position, entry) in indexed_benchmarks.into_iter().enumerate() {
                shards[position % worker_count].push(entry);
            }

            let pool = ThreadPoolBuilder::new()
                .num_threads(worker_count)
                .build()
                .map_err(|e| {
                    SupervisorError::IpcError(format!("Failed to build worker pool: {}", e))
                })?;

            let shard_results: Vec<Vec<(usize, IpcBenchmarkResult)>> = pool.install(|| {
                shards
                    .into_par_iter()
                    .map(|shard| self.run_with_reuse_indexed(&shard))
                    .collect()
            });

            shard_results.into_iter().flatten().collect()
        };

        indexed_results.sort_by_key(|(index, _)| *index);
        if indexed_results.len() != benchmarks.len() {
            return Err(SupervisorError::IpcError(format!(
                "Internal error: expected {} results, got {}",
                benchmarks.len(),
                indexed_results.len()
            )));
        }

        Ok(indexed_results
            .into_iter()
            .map(|(_, result)| result)
            .collect())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    #[ignore]
    fn test_supervisor_spawn() {
        let timeout = Duration::from_secs(30);
        let config = BenchmarkConfig::default();
        let supervisor = Supervisor::new(config, timeout, 1);
        assert_eq!(supervisor.num_workers, 1);
    }
}