1use fluxbench_core::BenchmarkDef;
16use fluxbench_ipc::{
17 BenchmarkConfig, FrameError, FrameReader, FrameWriter, Sample, SupervisorCommand,
18 WorkerCapabilities, WorkerMessage,
19};
20use rayon::ThreadPoolBuilder;
21use rayon::prelude::*;
22use std::env;
23use std::process::{Child, Command, Stdio};
24use std::time::{Duration, Instant};
25use thiserror::Error;
26
27#[cfg(unix)]
28use std::os::unix::io::{FromRawFd, RawFd};
29#[cfg(unix)]
30use std::os::unix::process::CommandExt;
31
32#[derive(Debug, Error)]
34#[non_exhaustive]
35pub enum SupervisorError {
36 #[error("Failed to spawn worker: {0}")]
38 SpawnFailed(#[from] std::io::Error),
39
40 #[error("IPC error: {0}")]
42 IpcError(String),
43
44 #[error("Worker crashed: {0}")]
46 WorkerCrashed(String),
47
48 #[error("Timeout waiting for worker")]
50 Timeout,
51
52 #[error("Benchmark not found: {0}")]
54 BenchmarkNotFound(String),
55
56 #[error("Worker protocol error: expected {expected}, got {got}")]
58 ProtocolError {
59 expected: String,
61 got: String,
63 },
64}
65
66impl From<FrameError> for SupervisorError {
67 fn from(e: FrameError) -> Self {
68 SupervisorError::IpcError(e.to_string())
69 }
70}
71
72#[derive(Debug)]
74pub struct IpcBenchmarkResult {
75 pub bench_id: String,
77 pub samples: Vec<Sample>,
79 pub total_iterations: u64,
81 pub total_duration_nanos: u64,
83 pub status: IpcBenchmarkStatus,
85}
86
/// Final state of a benchmark run as seen by the supervisor.
#[derive(Debug, Clone)]
pub enum IpcBenchmarkStatus {
    /// The worker reported normal completion.
    Success,
    /// The worker reported a failure whose kind is anything other than a panic.
    Failed {
        /// Human-readable failure description supplied by the worker.
        message: String,
        /// Short lowercase label of the failure kind (for example `"timeout"`).
        kind: String,
        /// Backtrace captured by the worker, if any.
        backtrace: Option<String>,
    },
    /// The worker reported a panic, or the supervisor synthesised a crash result.
    Crashed {
        /// Human-readable crash description.
        message: String,
        /// Short lowercase label (`"panic"` or `"crashed"` in this file).
        kind: String,
        /// Backtrace captured by the worker, if any.
        backtrace: Option<String>,
    },
}
111
/// What a readiness check on the worker's message pipe found.
#[derive(Debug)]
enum PollResult {
    /// The descriptor is readable.
    DataAvailable,
    /// Nothing became readable within the requested wait.
    Timeout,
    /// The descriptor reported an error, hang-up, or invalid-descriptor condition.
    PipeClosed,
    /// The readiness call itself failed with this OS error.
    Error(std::io::Error),
}
122
123#[cfg(unix)]
125fn wait_for_data_fd(fd: i32, timeout_ms: i32) -> PollResult {
126 let mut pollfd = libc::pollfd {
127 fd,
128 events: libc::POLLIN,
129 revents: 0,
130 };
131
132 let result = unsafe { libc::poll(&mut pollfd, 1, timeout_ms) };
133
134 if result < 0 {
135 PollResult::Error(std::io::Error::last_os_error())
136 } else if result == 0 {
137 PollResult::Timeout
138 } else if pollfd.revents & libc::POLLIN != 0 {
139 PollResult::DataAvailable
140 } else if pollfd.revents & (libc::POLLERR | libc::POLLHUP | libc::POLLNVAL) != 0 {
141 PollResult::PipeClosed
142 } else {
143 PollResult::Timeout
144 }
145}
146
147#[cfg(unix)]
150fn create_pipe() -> Result<(RawFd, RawFd), std::io::Error> {
151 let mut fds = [0 as RawFd; 2];
152 let ret = unsafe { libc::pipe(fds.as_mut_ptr()) };
153 if ret != 0 {
154 return Err(std::io::Error::last_os_error());
155 }
156 for &fd in &fds {
157 unsafe {
158 let flags = libc::fcntl(fd, libc::F_GETFD);
159 libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC);
160 }
161 }
162 Ok((fds[0], fds[1]))
163}
164
165#[cfg(unix)]
166fn close_fd(fd: RawFd) {
167 unsafe {
168 libc::close(fd);
169 }
170}
171
172#[cfg(unix)]
174fn send_sigterm(pid: u32) -> Result<(), std::io::Error> {
175 let ret = unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM) };
176 if ret == -1 {
177 Err(std::io::Error::last_os_error())
178 } else {
179 Ok(())
180 }
181}
182
183pub struct WorkerHandle {
192 child: Child,
193 reader: FrameReader<Box<dyn std::io::Read + Send>>,
194 writer: FrameWriter<Box<dyn std::io::Write + Send>>,
195 capabilities: Option<WorkerCapabilities>,
196 timeout: Duration,
197 poll_fd: i32,
199}
200
201impl WorkerHandle {
202 pub fn spawn(timeout: Duration) -> Result<Self, SupervisorError> {
204 let binary = env::current_exe().map_err(SupervisorError::SpawnFailed)?;
205 Self::spawn_impl(&binary, timeout)
206 }
207
208 pub fn spawn_binary(binary: &str, timeout: Duration) -> Result<Self, SupervisorError> {
210 Self::spawn_impl(binary.as_ref(), timeout)
211 }
212
213 #[cfg(unix)]
216 fn spawn_impl(binary: &std::path::Path, timeout: Duration) -> Result<Self, SupervisorError> {
217 let (cmd_read, cmd_write) = create_pipe()?;
219 let (msg_read, msg_write) = match create_pipe() {
221 Ok(fds) => fds,
222 Err(e) => {
223 close_fd(cmd_read);
224 close_fd(cmd_write);
225 return Err(SupervisorError::SpawnFailed(e));
226 }
227 };
228
229 let mut command = Command::new(binary);
232 command
233 .arg("--flux-worker")
234 .env("FLUX_IPC_FD", format!("{},{}", cmd_read, msg_write))
235 .stdin(Stdio::null())
236 .stdout(Stdio::null())
237 .stderr(Stdio::inherit());
238
239 unsafe {
242 command.pre_exec(move || {
243 let flags = libc::fcntl(cmd_read, libc::F_GETFD);
245 if flags == -1 {
246 return Err(std::io::Error::last_os_error());
247 }
248 if libc::fcntl(cmd_read, libc::F_SETFD, flags & !libc::FD_CLOEXEC) == -1 {
249 return Err(std::io::Error::last_os_error());
250 }
251
252 let flags = libc::fcntl(msg_write, libc::F_GETFD);
253 if flags == -1 {
254 return Err(std::io::Error::last_os_error());
255 }
256 if libc::fcntl(msg_write, libc::F_SETFD, flags & !libc::FD_CLOEXEC) == -1 {
257 return Err(std::io::Error::last_os_error());
258 }
259
260 libc::close(cmd_write);
262 libc::close(msg_read);
263
264 Ok(())
265 });
266 }
267
268 let child = match command.spawn() {
269 Ok(c) => c,
270 Err(e) => {
271 close_fd(cmd_read);
272 close_fd(cmd_write);
273 close_fd(msg_read);
274 close_fd(msg_write);
275 return Err(SupervisorError::SpawnFailed(e));
276 }
277 };
278
279 close_fd(cmd_read);
281 close_fd(msg_write);
282
283 let reader_file = unsafe { std::fs::File::from_raw_fd(msg_read) };
287 let writer_file = unsafe { std::fs::File::from_raw_fd(cmd_write) };
288
289 let poll_fd = unsafe { libc::dup(msg_read) };
292 if poll_fd < 0 {
293 return Err(SupervisorError::SpawnFailed(std::io::Error::last_os_error()));
294 }
295
296 let mut handle = Self {
297 child,
298 reader: FrameReader::new(Box::new(reader_file) as Box<dyn std::io::Read + Send>),
299 writer: FrameWriter::new(Box::new(writer_file) as Box<dyn std::io::Write + Send>),
300 capabilities: None,
301 timeout,
302 poll_fd,
303 };
304
305 handle.wait_for_hello()?;
306 Ok(handle)
307 }
308
/// Non-Unix implementation: launches `binary --flux-worker` and talks to it over the
/// child's stdin/stdout, with stderr inherited.
///
/// `poll_fd` is set to `-1` because no readiness polling is available on this path.
///
/// # Panics
///
/// Panics if the piped stdin or stdout handle is missing after a successful spawn.
#[cfg(not(unix))]
fn spawn_impl(binary: &std::path::Path, timeout: Duration) -> Result<Self, SupervisorError> {
    let mut command = Command::new(binary);
    command
        .arg("--flux-worker")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::inherit());
    let mut child = command.spawn()?;

    let worker_stdin = child.stdin.take().expect("stdin should be available");
    let worker_stdout = child.stdout.take().expect("stdout should be available");

    let reader: Box<dyn std::io::Read + Send> = Box::new(worker_stdout);
    let writer: Box<dyn std::io::Write + Send> = Box::new(worker_stdin);

    let mut handle = Self {
        child,
        reader: FrameReader::new(reader),
        writer: FrameWriter::new(writer),
        capabilities: None,
        timeout,
        poll_fd: -1,
    };

    handle.wait_for_hello()?;
    Ok(handle)
}
335
336 fn wait_for_hello(&mut self) -> Result<(), SupervisorError> {
340 let msg: WorkerMessage = self.reader.read()?;
341
342 match msg {
343 WorkerMessage::Hello(caps) => {
344 if caps.protocol_version != fluxbench_ipc::PROTOCOL_VERSION {
345 return Err(SupervisorError::ProtocolError {
346 expected: format!("protocol version {}", fluxbench_ipc::PROTOCOL_VERSION),
347 got: format!("protocol version {}", caps.protocol_version),
348 });
349 }
350 self.capabilities = Some(caps);
351 Ok(())
352 }
353 other => Err(SupervisorError::ProtocolError {
354 expected: "Hello".to_string(),
355 got: format!("{:?}", other),
356 }),
357 }
358 }
359
360 pub fn capabilities(&self) -> Option<&WorkerCapabilities> {
362 self.capabilities.as_ref()
363 }
364
365 pub fn run_benchmark(
367 &mut self,
368 bench_id: &str,
369 config: &BenchmarkConfig,
370 ) -> Result<IpcBenchmarkResult, SupervisorError> {
371 self.writer.write(&SupervisorCommand::Run {
372 bench_id: bench_id.to_string(),
373 config: config.clone(),
374 })?;
375
376 let mut all_samples = Vec::new();
377 let start = Instant::now();
378
379 loop {
380 let remaining = self.timeout.saturating_sub(start.elapsed());
381 if remaining.is_zero() {
382 return self.handle_timeout(all_samples);
383 }
384
385 if self.reader.has_buffered_data() {
389 if !self.is_alive() {
390 return Err(SupervisorError::WorkerCrashed(
391 "Worker process crashed with partial data buffered".to_string(),
392 ));
393 }
394 } else {
395 self.wait_for_worker_data(remaining)?;
396 }
397
398 let msg: WorkerMessage = match self.reader.read::<WorkerMessage>() {
400 Ok(msg) => msg,
401 Err(FrameError::EndOfStream) => {
402 return Err(SupervisorError::WorkerCrashed(
403 "Worker closed connection unexpectedly".to_string(),
404 ));
405 }
406 Err(e) => {
407 if !self.is_alive() {
408 return Err(SupervisorError::WorkerCrashed(
409 "Worker crashed during read".to_string(),
410 ));
411 }
412 return Err(SupervisorError::IpcError(e.to_string()));
413 }
414 };
415
416 match msg {
417 WorkerMessage::SampleBatch(batch) => {
418 all_samples.extend(batch.samples);
419 }
420 WorkerMessage::WarmupComplete { .. } | WorkerMessage::Progress { .. } => {
421 continue;
422 }
423 WorkerMessage::Complete {
424 total_iterations,
425 total_duration_nanos,
426 } => {
427 return Ok(IpcBenchmarkResult {
428 bench_id: bench_id.to_string(),
429 samples: all_samples,
430 total_iterations,
431 total_duration_nanos,
432 status: IpcBenchmarkStatus::Success,
433 });
434 }
435 WorkerMessage::Failure {
436 kind,
437 message,
438 backtrace,
439 } => {
440 let kind_str = match kind {
441 fluxbench_ipc::FailureKind::Panic => "panic",
442 fluxbench_ipc::FailureKind::Timeout => "timeout",
443 fluxbench_ipc::FailureKind::Assertion => "assertion",
444 fluxbench_ipc::FailureKind::AllocationLimit => "allocation_limit",
445 fluxbench_ipc::FailureKind::Signal => "signal",
446 fluxbench_ipc::FailureKind::Unknown => "unknown",
447 }
448 .to_string();
449 return Ok(IpcBenchmarkResult {
450 bench_id: bench_id.to_string(),
451 samples: all_samples,
452 total_iterations: 0,
453 total_duration_nanos: 0,
454 status: match kind {
455 fluxbench_ipc::FailureKind::Panic => IpcBenchmarkStatus::Crashed {
456 message,
457 kind: kind_str,
458 backtrace,
459 },
460 _ => IpcBenchmarkStatus::Failed {
461 message,
462 kind: kind_str,
463 backtrace,
464 },
465 },
466 });
467 }
468 WorkerMessage::Hello(_) => {
469 return Err(SupervisorError::ProtocolError {
470 expected: "SampleBatch/Complete/Failure".to_string(),
471 got: "Hello".to_string(),
472 });
473 }
474 }
475 }
476 }
477
478 #[cfg(unix)]
483 fn wait_for_worker_data(&mut self, remaining: Duration) -> Result<(), SupervisorError> {
484 let poll_timeout = remaining.min(Duration::from_millis(100));
485 match wait_for_data_fd(self.poll_fd, poll_timeout.as_millis() as i32) {
486 PollResult::DataAvailable => {
487 if !self.is_alive() {
488 return Err(SupervisorError::WorkerCrashed(
489 "Worker process crashed with data in pipe".to_string(),
490 ));
491 }
492 Ok(())
493 }
494 PollResult::Timeout => {
495 if !self.is_alive() {
496 return Err(SupervisorError::WorkerCrashed(
497 "Worker process exited unexpectedly".to_string(),
498 ));
499 }
500 Ok(())
502 }
503 PollResult::PipeClosed => Err(SupervisorError::WorkerCrashed(
504 "Worker pipe closed unexpectedly".to_string(),
505 )),
506 PollResult::Error(e) => {
507 Err(SupervisorError::WorkerCrashed(format!("Pipe error: {}", e)))
508 }
509 }
510 }
511
/// Non-Unix: sleeps for 10 ms and then checks that the worker is still running.
///
/// No readiness polling happens on this path; `_remaining` is unused.
///
/// # Errors
///
/// [`SupervisorError::WorkerCrashed`] when the worker has exited.
#[cfg(not(unix))]
fn wait_for_worker_data(&mut self, _remaining: Duration) -> Result<(), SupervisorError> {
    std::thread::sleep(Duration::from_millis(10));

    if self.is_alive() {
        Ok(())
    } else {
        Err(SupervisorError::WorkerCrashed(
            "Worker process exited unexpectedly".to_string(),
        ))
    }
}
524
525 #[cfg(unix)]
528 fn handle_timeout(
529 &mut self,
530 mut samples: Vec<Sample>,
531 ) -> Result<IpcBenchmarkResult, SupervisorError> {
532 let _ = send_sigterm(self.child.id());
534
535 let drain_deadline = Instant::now() + Duration::from_millis(500);
537 loop {
538 let remaining = drain_deadline.saturating_duration_since(Instant::now());
539 if remaining.is_zero() {
540 break;
541 }
542
543 match wait_for_data_fd(self.poll_fd, remaining.as_millis() as i32) {
544 PollResult::DataAvailable => match self.reader.read::<WorkerMessage>() {
545 Ok(WorkerMessage::SampleBatch(batch)) => {
546 samples.extend(batch.samples);
547 }
548 Ok(WorkerMessage::Complete { .. }) => break,
549 _ => break,
550 },
551 PollResult::PipeClosed => break,
552 _ => break,
553 }
554 }
555
556 if self.is_alive() {
557 let _ = self.child.kill();
558 let _ = self.child.wait();
559 }
560
561 Err(SupervisorError::Timeout)
562 }
563
/// Non-Unix timeout path: kills and reaps the worker if it is still running, then reports
/// a timeout. Samples collected so far are discarded.
#[cfg(not(unix))]
fn handle_timeout(
    &mut self,
    _samples: Vec<Sample>,
) -> Result<IpcBenchmarkResult, SupervisorError> {
    let still_running = self.is_alive();
    if still_running {
        let _ = self.child.kill();
        let _ = self.child.wait();
    }

    Err(SupervisorError::Timeout)
}
576
577 pub fn ping(&mut self) -> Result<bool, SupervisorError> {
579 self.writer.write(&SupervisorCommand::Ping)?;
580 Ok(true)
581 }
582
583 pub fn abort(&mut self) -> Result<(), SupervisorError> {
585 self.writer.write(&SupervisorCommand::Abort)?;
586 Ok(())
587 }
588
589 pub fn shutdown(mut self) -> Result<(), SupervisorError> {
591 self.writer.write(&SupervisorCommand::Shutdown)?;
592 let _ = self.child.wait();
593 Ok(())
594 }
595
596 pub fn is_alive(&mut self) -> bool {
598 match self.child.try_wait() {
599 Ok(Some(_)) => false,
600 Ok(None) => true,
601 Err(_) => false,
602 }
603 }
604
605 pub fn kill(&mut self) -> Result<(), SupervisorError> {
607 self.child.kill().map_err(SupervisorError::SpawnFailed)?;
608 let _ = self.child.wait();
609 Ok(())
610 }
611}
612
613impl Drop for WorkerHandle {
614 fn drop(&mut self) {
615 if self.is_alive() {
616 #[cfg(unix)]
617 {
618 let _ = send_sigterm(self.child.id());
620 std::thread::sleep(Duration::from_millis(50));
621 }
622 if self.is_alive() {
623 let _ = self.child.kill();
624 }
625 let _ = self.child.wait();
626 }
627
628 #[cfg(unix)]
630 if self.poll_fd >= 0 {
631 close_fd(self.poll_fd);
632 }
633 }
634}
635
636pub struct Supervisor {
640 config: BenchmarkConfig,
641 timeout: Duration,
642 num_workers: usize,
643}
644
645impl Supervisor {
646 pub fn new(config: BenchmarkConfig, timeout: Duration, num_workers: usize) -> Self {
648 Self {
649 config,
650 timeout,
651 num_workers: num_workers.max(1),
652 }
653 }
654
655 pub fn run_all(
659 &self,
660 benchmarks: &[&BenchmarkDef],
661 ) -> Result<Vec<IpcBenchmarkResult>, SupervisorError> {
662 let configs: Vec<_> = benchmarks.iter().map(|_| self.config.clone()).collect();
663 self.run_all_configs(benchmarks, &configs)
664 }
665
666 pub fn run_all_configs(
668 &self,
669 benchmarks: &[&BenchmarkDef],
670 configs: &[BenchmarkConfig],
671 ) -> Result<Vec<IpcBenchmarkResult>, SupervisorError> {
672 if benchmarks.is_empty() {
673 return Ok(Vec::new());
674 }
675
676 if self.num_workers == 1 || benchmarks.len() == 1 {
677 let mut results = Vec::with_capacity(benchmarks.len());
678 for (bench, cfg) in benchmarks.iter().zip(configs.iter()) {
679 results.push(self.run_isolated(bench, cfg)?);
680 }
681 return Ok(results);
682 }
683
684 let worker_count = self.num_workers.min(benchmarks.len());
685 let pool = ThreadPoolBuilder::new()
686 .num_threads(worker_count)
687 .build()
688 .map_err(|e| {
689 SupervisorError::IpcError(format!("Failed to build worker pool: {}", e))
690 })?;
691
692 let pairs: Vec<_> = benchmarks.iter().zip(configs.iter()).collect();
693 let outcomes: Vec<Result<IpcBenchmarkResult, SupervisorError>> = pool.install(|| {
694 pairs
695 .par_iter()
696 .map(|(bench, cfg)| self.run_isolated(bench, cfg))
697 .collect()
698 });
699
700 let mut results = Vec::with_capacity(outcomes.len());
701 for outcome in outcomes {
702 results.push(outcome?);
703 }
704 Ok(results)
705 }
706
707 fn run_isolated(
709 &self,
710 bench: &BenchmarkDef,
711 config: &BenchmarkConfig,
712 ) -> Result<IpcBenchmarkResult, SupervisorError> {
713 let mut worker = WorkerHandle::spawn(self.timeout)?;
714 let result = worker.run_benchmark(bench.id, config);
715 let _ = worker.shutdown();
716 result
717 }
718
719 fn crashed_result(bench: &BenchmarkDef, message: String) -> IpcBenchmarkResult {
720 IpcBenchmarkResult {
721 bench_id: bench.id.to_string(),
722 samples: Vec::new(),
723 total_iterations: 0,
724 total_duration_nanos: 0,
725 status: IpcBenchmarkStatus::Crashed {
726 message,
727 kind: "crashed".to_string(),
728 backtrace: None,
729 },
730 }
731 }
732
733 fn run_with_reuse_indexed(
734 &self,
735 benchmarks: &[(usize, &BenchmarkDef, &BenchmarkConfig)],
736 ) -> Vec<(usize, IpcBenchmarkResult)> {
737 let mut results = Vec::with_capacity(benchmarks.len());
738 if benchmarks.is_empty() {
739 return results;
740 }
741
742 let mut worker = match WorkerHandle::spawn(self.timeout) {
743 Ok(worker) => Some(worker),
744 Err(e) => {
745 let message = e.to_string();
746 for &(index, bench, _) in benchmarks {
747 results.push((index, Self::crashed_result(bench, message.clone())));
748 }
749 return results;
750 }
751 };
752
753 for &(index, bench, cfg) in benchmarks {
754 if worker.is_none() {
755 match WorkerHandle::spawn(self.timeout) {
756 Ok(new_worker) => worker = Some(new_worker),
757 Err(e) => {
758 results.push((index, Self::crashed_result(bench, e.to_string())));
759 continue;
760 }
761 }
762 }
763
764 let run_result = match worker.as_mut() {
765 Some(worker) => worker.run_benchmark(bench.id, cfg),
766 None => Err(SupervisorError::IpcError(
767 "worker unexpectedly absent after spawn".to_string(),
768 )),
769 };
770
771 match run_result {
772 Ok(result) => results.push((index, result)),
773 Err(e) => {
774 let worker_is_alive = worker.as_mut().map(|w| w.is_alive()).unwrap_or(false);
775 if !worker_is_alive {
776 if let Some(mut dead_worker) = worker.take() {
777 let _ = dead_worker.kill();
778 }
779 }
780 results.push((index, Self::crashed_result(bench, e.to_string())));
781 }
782 }
783 }
784
785 if let Some(worker) = worker {
786 let _ = worker.shutdown();
787 }
788
789 results
790 }
791
792 pub fn run_with_reuse(
796 &self,
797 benchmarks: &[&BenchmarkDef],
798 ) -> Result<Vec<IpcBenchmarkResult>, SupervisorError> {
799 let configs: Vec<_> = benchmarks.iter().map(|_| self.config.clone()).collect();
800 self.run_with_reuse_configs(benchmarks, &configs)
801 }
802
803 pub fn run_with_reuse_configs(
805 &self,
806 benchmarks: &[&BenchmarkDef],
807 configs: &[BenchmarkConfig],
808 ) -> Result<Vec<IpcBenchmarkResult>, SupervisorError> {
809 if benchmarks.is_empty() {
810 return Ok(Vec::new());
811 }
812
813 let indexed_benchmarks: Vec<(usize, &BenchmarkDef, &BenchmarkConfig)> = benchmarks
814 .iter()
815 .zip(configs.iter())
816 .enumerate()
817 .map(|(index, (bench, cfg))| (index, *bench, cfg))
818 .collect();
819
820 let mut indexed_results = if self.num_workers == 1 || benchmarks.len() == 1 {
821 self.run_with_reuse_indexed(&indexed_benchmarks)
822 } else {
823 let worker_count = self.num_workers.min(indexed_benchmarks.len());
824 let mut shards: Vec<Vec<(usize, &BenchmarkDef, &BenchmarkConfig)>> =
825 vec![Vec::new(); worker_count];
826 for (position, entry) in indexed_benchmarks.into_iter().enumerate() {
827 shards[position % worker_count].push(entry);
828 }
829
830 let pool = ThreadPoolBuilder::new()
831 .num_threads(worker_count)
832 .build()
833 .map_err(|e| {
834 SupervisorError::IpcError(format!("Failed to build worker pool: {}", e))
835 })?;
836
837 let shard_results: Vec<Vec<(usize, IpcBenchmarkResult)>> = pool.install(|| {
838 shards
839 .into_par_iter()
840 .map(|shard| self.run_with_reuse_indexed(&shard))
841 .collect()
842 });
843
844 shard_results.into_iter().flatten().collect()
845 };
846
847 indexed_results.sort_by_key(|(index, _)| *index);
848 if indexed_results.len() != benchmarks.len() {
849 return Err(SupervisorError::IpcError(format!(
850 "Internal error: expected {} results, got {}",
851 benchmarks.len(),
852 indexed_results.len()
853 )));
854 }
855
856 Ok(indexed_results
857 .into_iter()
858 .map(|(_, result)| result)
859 .collect())
860 }
861}
862
#[cfg(test)]
mod tests {
    use super::*;

    /// Constructing a supervisor with one worker keeps the worker count at one.
    /// Ignored by default, so it only runs when requested explicitly.
    #[test]
    #[ignore]
    fn test_supervisor_spawn() {
        let supervisor = Supervisor::new(
            BenchmarkConfig::default(),
            Duration::from_secs(30),
            1,
        );

        assert_eq!(supervisor.num_workers, 1);
    }
}
875}