1use std::collections::VecDeque;
6use std::fs::OpenOptions;
7use std::io::{self, Read, Write};
8use std::net::Shutdown;
9use std::os::unix::fs::FileTypeExt;
10use std::os::unix::net::{UnixListener, UnixStream};
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::{Arc, Condvar, Mutex};
14use std::thread;
15use std::time::{Duration, SystemTime, UNIX_EPOCH};
16
17pub const SIMPLE_IPC_QUIT: i32 = -2;
19
20const LARGE_PACKET_DATA_MAX: usize = 65516;
22
23const CONNECT_TIMEOUT_MS: i32 = 1000;
24const WAIT_STEP_MS: u64 = 50;
25
26#[must_use]
28pub fn supports_simple_ipc() -> bool {
29 cfg!(unix)
30}
31
32#[derive(Clone, Copy, Debug, PartialEq, Eq)]
33pub enum IpcActiveState {
34 Listening,
35 NotListening,
36 InvalidPath,
37 PathNotFound,
38 OtherError,
39}
40
41fn packet_hex_len(payload_len: usize) -> io::Result<[u8; 4]> {
42 let packet_size = payload_len + 4;
43 if packet_size > 0xffff {
44 return Err(io::Error::new(
45 io::ErrorKind::InvalidInput,
46 "packet exceeds max size",
47 ));
48 }
49 Ok(hex4_upper(packet_size))
50}
51
52fn hex4_upper(mut n: usize) -> [u8; 4] {
53 const HEX: &[u8; 16] = b"0123456789abcdef";
54 let mut out = [0u8; 4];
55 for i in (0..4).rev() {
56 out[i] = HEX[n & 0xf];
57 n >>= 4;
58 }
59 out
60}
61
62fn write_packet(w: &mut dyn Write, buf: &[u8]) -> io::Result<()> {
63 if buf.len() > LARGE_PACKET_DATA_MAX {
64 return Err(io::Error::new(
65 io::ErrorKind::InvalidInput,
66 "packet exceeds max size",
67 ));
68 }
69 let hdr = packet_hex_len(buf.len())?;
70 w.write_all(&hdr)?;
71 w.write_all(buf)?;
72 Ok(())
73}
74
75fn write_packetized_from_buf(w: &mut dyn Write, mut data: &[u8]) -> io::Result<()> {
76 while !data.is_empty() {
77 let n = data.len().min(LARGE_PACKET_DATA_MAX);
78 write_packet(w, &data[..n])?;
79 data = &data[n..];
80 }
81 Ok(())
82}
83
84fn packet_flush_gently(w: &mut dyn Write) -> io::Result<()> {
85 w.write_all(b"0000")?;
86 w.flush()?;
87 Ok(())
88}
89
90fn read_one_packet<R: Read>(r: &mut R, buf: &mut Vec<u8>) -> io::Result<Option<()>> {
91 let mut linelen = [0u8; 4];
92 match r.read_exact(&mut linelen) {
93 Ok(()) => {}
94 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
95 Err(e) => return Err(e),
96 }
97 let len_str = std::str::from_utf8(&linelen).map_err(|e| {
98 io::Error::new(
99 io::ErrorKind::InvalidData,
100 format!("invalid pkt-line length encoding: {e}"),
101 )
102 })?;
103 let len = usize::from_str_radix(len_str, 16).map_err(|e| {
104 io::Error::new(
105 io::ErrorKind::InvalidData,
106 format!("invalid pkt-line length: {e}"),
107 )
108 })?;
109 match len {
110 0 => Ok(None),
111 1 | 2 => Ok(Some(())),
112 n if n < 4 => Err(io::Error::new(
113 io::ErrorKind::InvalidData,
114 format!("bad pkt-line length {n}"),
115 )),
116 n => {
117 let payload = n - 4;
118 let start = buf.len();
119 buf.resize(start + payload, 0);
120 r.read_exact(&mut buf[start..])?;
121 Ok(Some(()))
122 }
123 }
124}
125
126fn read_packetized_to_end<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
127 let mut out = Vec::new();
128 loop {
129 if read_one_packet(r, &mut out)?.is_none() {
130 break;
131 }
132 }
133 Ok(out)
134}
135
136fn unix_stream_connect(path: &Path, _disallow_chdir: bool) -> io::Result<UnixStream> {
137 UnixStream::connect(path)
139}
140
141fn connect_with_retry(path: &Path, wait_if_busy: bool, wait_if_not_found: bool) -> IpcActiveState {
142 let mut elapsed: i32 = 0;
143 loop {
144 match unix_stream_connect(path, false) {
145 Ok(s) => {
146 drop(s);
147 return IpcActiveState::Listening;
148 }
149 Err(e) => {
150 let code = e.raw_os_error();
151 let retry = match code {
152 Some(libc::ENOENT) => wait_if_not_found,
153 Some(libc::ECONNREFUSED) | Some(libc::ETIMEDOUT) => wait_if_busy,
154 _ => false,
155 };
156 if !retry || elapsed >= CONNECT_TIMEOUT_MS {
157 return match code {
158 Some(libc::ENOENT) => IpcActiveState::PathNotFound,
159 Some(libc::ECONNREFUSED) => IpcActiveState::NotListening,
160 _ => IpcActiveState::OtherError,
161 };
162 }
163 thread::sleep(Duration::from_millis(WAIT_STEP_MS));
164 elapsed += WAIT_STEP_MS as i32;
165 }
166 }
167 }
168}
169
170pub fn ipc_get_active_state(path: &Path) -> IpcActiveState {
172 let meta = match std::fs::symlink_metadata(path) {
173 Ok(m) => m,
174 Err(e) if e.kind() == io::ErrorKind::NotFound => return IpcActiveState::NotListening,
175 Err(_) => return IpcActiveState::InvalidPath,
176 };
177 if !meta.file_type().is_socket() {
178 return IpcActiveState::InvalidPath;
179 }
180 connect_with_retry(path, false, false)
181}
182
183#[derive(Default)]
184pub struct IpcClientConnectOptions {
185 pub wait_if_busy: bool,
186 pub wait_if_not_found: bool,
187 pub uds_disallow_chdir: bool,
188}
189
190fn connect_for_client(path: &Path, options: &IpcClientConnectOptions) -> io::Result<UnixStream> {
191 let mut elapsed: i32 = 0;
192 loop {
193 match unix_stream_connect(path, options.uds_disallow_chdir) {
194 Ok(s) => return Ok(s),
195 Err(e) => {
196 let code = e.raw_os_error();
197 let retry = match code {
198 Some(libc::ENOENT) => options.wait_if_not_found,
199 Some(libc::ECONNREFUSED) | Some(libc::ETIMEDOUT) => options.wait_if_busy,
200 _ => false,
201 };
202 if !retry || elapsed >= CONNECT_TIMEOUT_MS {
203 return Err(e);
204 }
205 thread::sleep(Duration::from_millis(WAIT_STEP_MS));
206 elapsed += WAIT_STEP_MS as i32;
207 }
208 }
209 }
210}
211
212pub fn ipc_client_send_command(
214 path: &Path,
215 options: &IpcClientConnectOptions,
216 message: &[u8],
217) -> io::Result<Vec<u8>> {
218 let mut stream = connect_for_client(path, options)?;
219 write_packetized_from_buf(&mut stream, message)?;
220 packet_flush_gently(&mut stream)?;
221 read_packetized_to_end(&mut stream)
222}
223
224fn block_sigpipe() {
225 use nix::sys::signal::{pthread_sigmask, SigSet, SigmaskHow, Signal};
226 let mut set = SigSet::empty();
227 set.add(Signal::SIGPIPE);
228 let _ = pthread_sigmask(SigmaskHow::SIG_BLOCK, Some(&set), None);
229}
230
231fn wait_for_io_start(stream: &UnixStream, server_shutdown: &AtomicBool) -> io::Result<()> {
232 use nix::poll::{poll, PollFd, PollFlags};
233 use std::os::fd::AsFd;
234 loop {
235 if server_shutdown.load(Ordering::SeqCst) {
236 return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "shutdown"));
237 }
238 let mut fds = [PollFd::new(stream.as_fd(), PollFlags::POLLIN)];
239 match poll(&mut fds, 10u16) {
240 Ok(_) => {}
241 Err(nix::errno::Errno::EINTR) => continue,
242 Err(e) => return Err(io::Error::from_raw_os_error(e as i32)),
243 }
244 let revents = fds[0].revents().unwrap_or_else(PollFlags::empty);
245 if revents.contains(PollFlags::POLLHUP) {
246 return Err(io::Error::new(
247 io::ErrorKind::ConnectionAborted,
248 "client hangup",
249 ));
250 }
251 if revents.contains(PollFlags::POLLIN) {
252 return Ok(());
253 }
254 }
255}
256
257type AppCb = Arc<dyn Fn(&[u8], &mut dyn Write) -> i32 + Send + Sync + 'static>;
258
259struct WorkQueue {
260 fifo: Mutex<VecDeque<UnixStream>>,
261 cv: Condvar,
262 shutdown_requested: AtomicBool,
263 capacity: usize,
264}
265
266impl WorkQueue {
267 fn new(capacity: usize) -> Self {
268 Self {
269 fifo: Mutex::new(VecDeque::new()),
270 cv: Condvar::new(),
271 shutdown_requested: AtomicBool::new(false),
272 capacity,
273 }
274 }
275
276 fn enqueue(&self, stream: UnixStream) {
277 let mut guard = self.fifo.lock().unwrap_or_else(|e| e.into_inner());
278 if self.shutdown_requested.load(Ordering::SeqCst) {
279 return;
280 }
281 if guard.len() >= self.capacity {
282 return;
283 }
284 guard.push_back(stream);
285 self.cv.notify_one();
286 }
287
288 fn dequeue(&self) -> Option<UnixStream> {
289 let mut guard = self.fifo.lock().unwrap_or_else(|e| e.into_inner());
290 loop {
291 if let Some(s) = guard.pop_front() {
292 return Some(s);
293 }
294 if self.shutdown_requested.load(Ordering::SeqCst) {
295 return None;
296 }
297 guard = self.cv.wait(guard).unwrap_or_else(|e| e.into_inner());
298 }
299 }
300
301 fn stop(&self) {
302 self.shutdown_requested.store(true, Ordering::SeqCst);
303 let mut guard = self.fifo.lock().unwrap_or_else(|e| e.into_inner());
304 guard.clear();
305 drop(guard);
306 self.cv.notify_all();
307 }
308}
309
310fn serve_one_connection(
311 mut stream: UnixStream,
312 app: AppCb,
313 server_shutdown: Arc<AtomicBool>,
314 wake: Arc<Mutex<UnixStream>>,
315 queue: Arc<WorkQueue>,
316) {
317 if wait_for_io_start(&stream, &server_shutdown).is_err() {
318 let _ = stream.shutdown(Shutdown::Both);
319 return;
320 }
321 let request = match read_packetized_to_end(&mut stream) {
322 Ok(r) => r,
323 Err(_) => {
324 let _ = stream.shutdown(Shutdown::Both);
325 return;
326 }
327 };
328 let ret = app(&request, &mut stream);
329 let _ = packet_flush_gently(&mut stream);
330 let _ = stream.shutdown(Shutdown::Both);
331 if ret == SIMPLE_IPC_QUIT {
332 server_shutdown.store(true, Ordering::SeqCst);
333 queue.stop();
334 if let Ok(mut tx) = wake.lock() {
335 let _ = tx.write_all(b"Q");
336 }
337 }
338}
339
340#[derive(Debug)]
341pub enum ServerRunError {
342 Io(io::Error),
343 AddressInUse,
344}
345
346impl std::fmt::Display for ServerRunError {
347 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
348 match self {
349 ServerRunError::Io(e) => write!(f, "{e}"),
350 ServerRunError::AddressInUse => write!(f, "socket path already in use"),
351 }
352 }
353}
354
355impl std::error::Error for ServerRunError {
356 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
357 match self {
358 ServerRunError::Io(e) => Some(e),
359 ServerRunError::AddressInUse => None,
360 }
361 }
362}
363
364fn try_bind_server(path: &Path) -> io::Result<()> {
365 let lock_path = path.with_extension("lock");
366 let _ = OpenOptions::new()
367 .write(true)
368 .create(true)
369 .truncate(true)
370 .open(&lock_path);
371
372 if path.exists() {
373 if is_socket(path) && unix_stream_connect(path, false).is_ok() {
374 let _ = std::fs::remove_file(&lock_path);
375 return Err(io::Error::new(
376 io::ErrorKind::AddrInUse,
377 "another server is listening",
378 ));
379 }
380 let _ = std::fs::remove_file(path);
381 }
382
383 let _ = std::fs::remove_file(&lock_path);
384 Ok(())
385}
386
387fn is_socket(path: &Path) -> bool {
388 std::fs::symlink_metadata(path)
389 .ok()
390 .is_some_and(|m| m.file_type().is_socket())
391}
392
393pub fn ipc_server_run(path: &Path, nr_threads: usize, app: AppCb) -> Result<(), ServerRunError> {
395 try_bind_server(path).map_err(|e| {
396 if e.kind() == io::ErrorKind::AddrInUse {
397 ServerRunError::AddressInUse
398 } else {
399 ServerRunError::Io(e)
400 }
401 })?;
402
403 let listener = UnixListener::bind(path).map_err(ServerRunError::Io)?;
404 listener.set_nonblocking(true).map_err(ServerRunError::Io)?;
405
406 let nr_threads = nr_threads.max(1);
407 let capacity = nr_threads.saturating_mul(100).max(1);
408 let queue = Arc::new(WorkQueue::new(capacity));
409 let server_shutdown = Arc::new(AtomicBool::new(false));
410
411 let (shutdown_tx, shutdown_rx) = UnixStream::pair().map_err(ServerRunError::Io)?;
412 shutdown_rx
413 .set_nonblocking(true)
414 .map_err(ServerRunError::Io)?;
415 let wake = Arc::new(Mutex::new(shutdown_tx));
416
417 let mut worker_handles = Vec::new();
418 for _ in 0..nr_threads {
419 let q = Arc::clone(&queue);
420 let app_w = Arc::clone(&app);
421 let shut = Arc::clone(&server_shutdown);
422 let wake_w = Arc::clone(&wake);
423 let q_for_worker = Arc::clone(&queue);
424 worker_handles.push(thread::spawn(move || {
425 block_sigpipe();
426 while let Some(stream) = q.dequeue() {
427 serve_one_connection(
428 stream,
429 app_w.clone(),
430 shut.clone(),
431 wake_w.clone(),
432 q_for_worker.clone(),
433 );
434 }
435 }));
436 }
437
438 block_sigpipe();
439 use nix::poll::{poll, PollFd, PollFlags};
440 use std::os::fd::AsFd;
441
442 loop {
443 if server_shutdown.load(Ordering::SeqCst) {
444 break;
445 }
446 let mut fds = [
448 PollFd::new(shutdown_rx.as_fd(), PollFlags::POLLIN),
449 PollFd::new(listener.as_fd(), PollFlags::POLLIN),
450 ];
451 match poll(&mut fds, 60_000u16) {
452 Ok(_) => {}
453 Err(nix::errno::Errno::EINTR) => continue,
454 Err(_) => break,
455 }
456 let revents0 = fds[0].revents().unwrap_or_else(PollFlags::empty);
457 let revents1 = fds[1].revents().unwrap_or_else(PollFlags::empty);
458 if revents0.contains(PollFlags::POLLIN) {
459 break;
460 }
461 if revents1.contains(PollFlags::POLLIN) {
462 loop {
463 match listener.accept() {
464 Ok((stream, _)) => {
465 let _ = stream.set_nonblocking(false);
466 queue.enqueue(stream);
467 }
468 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
469 Err(_) => break,
470 }
471 }
472 }
473 }
474
475 queue.stop();
476 drop(listener);
477 for h in worker_handles {
478 let _ = h.join();
479 }
480 let _ = std::fs::remove_file(path);
481 Ok(())
482}
483
484#[must_use]
486pub fn test_app_callback() -> AppCb {
487 Arc::new(|request: &[u8], reply: &mut dyn Write| {
488 if request == b"quit" {
489 return SIMPLE_IPC_QUIT;
490 }
491 if request == b"ping" {
492 let _ = write_packetized_from_buf(reply, b"pong");
493 return 0;
494 }
495 if request == b"big" {
496 let mut line = Vec::with_capacity(84);
497 for row in 0..10_000 {
498 line.clear();
499 use std::io::Write as _;
500 let _ = writeln!(&mut line, "big: {:075}", row);
501 let _ = write_packetized_from_buf(reply, &line);
502 }
503 return 0;
504 }
505 if request == b"chunk" {
506 let mut line = Vec::with_capacity(84);
507 for row in 0..10_000 {
508 line.clear();
509 use std::io::Write as _;
510 let _ = writeln!(&mut line, "big: {:075}", row);
511 let _ = write_packet(reply, &line);
512 }
513 return 0;
514 }
515 if request == b"slow" {
516 let mut line = Vec::with_capacity(84);
517 for row in 0..1000 {
518 line.clear();
519 use std::io::Write as _;
520 let _ = writeln!(&mut line, "big: {:075}", row);
521 let _ = write_packet(reply, &line);
522 thread::sleep(Duration::from_millis(10));
523 }
524 return 0;
525 }
526 if request.len() >= 10 && request.starts_with(b"sendbytes ") {
527 return handle_sendbytes(request, reply);
528 }
529 let msg = format!("unhandled command: {}", String::from_utf8_lossy(request));
530 let _ = write_packetized_from_buf(reply, msg.as_bytes());
531 0
532 })
533}
534
535fn handle_sendbytes(request: &[u8], reply: &mut dyn Write) -> i32 {
536 let rest = &request[b"sendbytes ".len()..];
537 if rest.is_empty() {
538 return 0;
539 }
540 let b0 = rest[0];
541 let mut errs = 0usize;
542 for &b in &rest[1..] {
543 if b != b0 {
544 errs += 1;
545 }
546 }
547 if errs > 0 {
548 let msg = format!("errs:{errs}\n");
549 let _ = write_packetized_from_buf(reply, msg.as_bytes());
550 } else {
551 let msg = format!("rcvd:{}{:08}\n", char::from(b0), rest.len());
552 let _ = write_packetized_from_buf(reply, msg.as_bytes());
553 }
554 0
555}
556
557pub fn run_simple_ipc_tool(args: &[String]) -> i32 {
559 if args.first().map(|s| s.as_str()) == Some("SUPPORTS_SIMPLE_IPC") {
560 return 0;
561 }
562 if args.is_empty() {
563 eprintln!("usage: test-tool simple-ipc <subcommand> ...");
564 return 1;
565 }
566
567 let mut path = PathBuf::from("ipc-test");
568 let mut nr_threads = 5usize;
569 let mut max_wait_sec = 60u64;
570 let mut bytecount = 1024usize;
571 let mut batchsize = 10usize;
572 let mut token: Option<String> = None;
573 let mut bytevalue: u8 = b'x';
574
575 let sub = args[0].clone();
576 let mut i = 1usize;
577 while i < args.len() {
578 let a = args[i].as_str();
579 if let Some(v) = a.strip_prefix("--name=") {
580 path = PathBuf::from(v);
581 } else if let Some(v) = a.strip_prefix("--threads=") {
582 nr_threads = v.parse().unwrap_or(1).max(1);
583 } else if let Some(v) = a.strip_prefix("--max-wait=") {
584 max_wait_sec = v.parse().unwrap_or(0);
585 } else if let Some(v) = a.strip_prefix("--bytecount=") {
586 bytecount = v.parse().unwrap_or(1).max(1);
587 } else if let Some(v) = a.strip_prefix("--batchsize=") {
588 batchsize = v.parse().unwrap_or(1).max(1);
589 } else if let Some(v) = a.strip_prefix("--token=") {
590 token = Some(v.to_string());
591 } else if let Some(v) = a.strip_prefix("--byte=") {
592 if let Some(c) = v.as_bytes().first() {
593 bytevalue = *c;
594 }
595 }
596 i += 1;
597 }
598
599 match sub.as_str() {
600 "is-active" => match ipc_get_active_state(&path) {
601 IpcActiveState::Listening => 0,
602 IpcActiveState::NotListening => {
603 eprintln!("no server listening at '{}'", path.display());
604 1
605 }
606 IpcActiveState::PathNotFound => {
607 eprintln!("path not found '{}'", path.display());
608 1
609 }
610 IpcActiveState::InvalidPath => {
611 eprintln!("invalid pipe/socket name '{}'", path.display());
612 1
613 }
614 IpcActiveState::OtherError => {
615 eprintln!("other error for '{}'", path.display());
616 1
617 }
618 },
619 "run-daemon" => {
620 let app = test_app_callback();
621 match ipc_server_run(&path, nr_threads, app) {
622 Ok(()) => 0,
623 Err(ServerRunError::AddressInUse) => {
624 eprintln!("socket/pipe already in use: '{}'", path.display());
625 1
626 }
627 Err(ServerRunError::Io(e)) => {
628 eprintln!("could not start server on '{}': {e}", path.display());
629 1
630 }
631 }
632 }
633 "start-daemon" => match spawn_daemon(&path, nr_threads, max_wait_sec) {
634 Ok(()) => 0,
635 Err(e) => {
636 eprintln!("{e}");
637 1
638 }
639 },
640 "stop-daemon" => {
641 if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
642 eprintln!("no server listening at '{}'", path.display());
643 return 1;
644 }
645 let opts = IpcClientConnectOptions {
646 wait_if_busy: true,
647 wait_if_not_found: false,
648 uds_disallow_chdir: false,
649 };
650 if ipc_client_send_command(&path, &opts, b"quit").is_err() {
651 return 1;
652 }
653 let deadline = SystemTime::now()
654 .duration_since(UNIX_EPOCH)
655 .unwrap_or_default()
656 .as_secs()
657 + max_wait_sec;
658 loop {
659 if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
660 return 0;
661 }
662 let now = SystemTime::now()
663 .duration_since(UNIX_EPOCH)
664 .unwrap_or_default()
665 .as_secs();
666 if now > deadline {
667 eprintln!("daemon has not shutdown yet");
668 return 1;
669 }
670 thread::sleep(Duration::from_millis(100));
671 }
672 }
673 "send" => {
674 if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
675 eprintln!("no server listening at '{}'", path.display());
676 return 1;
677 }
678 let cmd = token.as_deref().unwrap_or("(no-command)");
679 let opts = IpcClientConnectOptions {
680 wait_if_busy: true,
681 wait_if_not_found: false,
682 uds_disallow_chdir: false,
683 };
684 match ipc_client_send_command(&path, &opts, cmd.as_bytes()) {
685 Ok(resp) => {
686 if !resp.is_empty() {
687 println!("{}", String::from_utf8_lossy(&resp).trim_end());
688 }
689 0
690 }
691 Err(_) => {
692 eprintln!("failed to send '{cmd}' to '{}'", path.display());
693 1
694 }
695 }
696 }
697 "sendbytes" => {
698 if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
699 eprintln!("no server listening at '{}'", path.display());
700 return 1;
701 }
702 let mut msg = b"sendbytes ".to_vec();
703 msg.extend(std::iter::repeat_n(bytevalue, bytecount));
704 let opts = IpcClientConnectOptions {
705 wait_if_busy: true,
706 wait_if_not_found: false,
707 uds_disallow_chdir: false,
708 };
709 match ipc_client_send_command(&path, &opts, &msg) {
710 Ok(resp) => {
711 let tail = String::from_utf8_lossy(&resp);
712 let tail = tail.trim_end();
713 println!("sent:{}{:08} {tail}", char::from(bytevalue), bytecount);
714 0
715 }
716 Err(_) => 1,
717 }
718 }
719 "multiple" => {
720 if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
721 eprintln!("no server listening at '{}'", path.display());
722 return 1;
723 }
724 run_multiple(&path, nr_threads, bytecount, batchsize)
725 }
726 _ => {
727 eprintln!("Unhandled subcommand: '{sub}'");
728 1
729 }
730 }
731}
732
733fn spawn_daemon(path: &Path, nr_threads: usize, max_wait_sec: u64) -> Result<(), String> {
734 use std::process::{Command, Stdio};
735 let exe = std::env::current_exe().map_err(|e| e.to_string())?;
736 let mut cmd = Command::new(exe);
737 cmd.arg("test-tool")
738 .arg("simple-ipc")
739 .arg("run-daemon")
740 .arg(format!("--name={}", path.display()))
741 .arg(format!("--threads={nr_threads}"))
742 .stdin(Stdio::null())
743 .stdout(Stdio::null())
744 .stderr(Stdio::null());
745 cmd.spawn().map_err(|e| e.to_string())?;
746 let deadline = SystemTime::now()
747 .duration_since(UNIX_EPOCH)
748 .map_err(|e| e.to_string())?
749 .as_secs()
750 + max_wait_sec.max(1);
751 loop {
752 if matches!(ipc_get_active_state(path), IpcActiveState::Listening) {
753 return Ok(());
754 }
755 let now = SystemTime::now()
756 .duration_since(UNIX_EPOCH)
757 .map_err(|e| e.to_string())?
758 .as_secs();
759 if now > deadline {
760 return Err("daemon not online yet".to_string());
761 }
762 thread::sleep(Duration::from_millis(50));
763 }
764}
765
766fn run_multiple(path: &Path, nr_threads: usize, bytecount: usize, batchsize: usize) -> i32 {
767 use std::sync::atomic::{AtomicUsize, Ordering as AOrd};
768 let sum_errors = Arc::new(AtomicUsize::new(0));
769 let sum_good = Arc::new(AtomicUsize::new(0));
770 let sum_join_errors = Arc::new(AtomicUsize::new(0));
771 let mut handles = Vec::new();
772 for k in 0..nr_threads {
773 let p = path.to_path_buf();
774 let letter = (b'A' + (k % 26) as u8) as char;
775 let base_count = bytecount + batchsize * (k / 26);
776 let batch = batchsize;
777 let sg = Arc::clone(&sum_good);
778 let se = Arc::clone(&sum_errors);
779 handles.push(thread::spawn(move || {
780 for t in 0..batch {
781 let n = base_count + t;
782 let mut msg = b"sendbytes ".to_vec();
783 msg.extend(std::iter::repeat_n(letter as u8, n));
784 let opts = IpcClientConnectOptions {
785 wait_if_busy: true,
786 wait_if_not_found: false,
787 uds_disallow_chdir: true,
788 };
789 match ipc_client_send_command(&p, &opts, &msg) {
790 Ok(resp) => {
791 let tail = String::from_utf8_lossy(&resp);
792 let tail = tail.trim_end();
793 println!("sent:{}{:08} {tail}", letter, n);
794 sg.fetch_add(1, AOrd::SeqCst);
795 }
796 Err(_) => {
797 se.fetch_add(1, AOrd::SeqCst);
798 }
799 }
800 }
801 }));
802 }
803 for h in handles {
804 if h.join().is_err() {
805 sum_join_errors.fetch_add(1, AOrd::SeqCst);
806 }
807 }
808 let good = sum_good.load(AOrd::SeqCst);
809 let je = sum_join_errors.load(AOrd::SeqCst);
810 let err = sum_errors.load(AOrd::SeqCst);
811 println!("client (good {good}) (join {je}), (errors {err})");
812 if je + err > 0 {
813 1
814 } else {
815 0
816 }
817}