1use std::collections::VecDeque;
6use std::fs::OpenOptions;
7use std::io::{self, Read, Write};
8use std::net::Shutdown;
9use std::os::unix::fs::FileTypeExt;
10use std::os::unix::io::AsRawFd;
11use std::os::unix::net::{UnixListener, UnixStream};
12use std::path::{Path, PathBuf};
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::{Arc, Condvar, Mutex};
15use std::thread;
16use std::time::{Duration, SystemTime, UNIX_EPOCH};
17
/// Status code an application callback returns to ask the server to shut
/// down; `serve_one_connection` compares the callback's result against it.
18pub const SIMPLE_IPC_QUIT: i32 = -2;
20
/// Largest payload, in bytes, that `write_packet` accepts for one packet.
/// `write_packetized_from_buf` splits longer buffers into pieces of this size.
21const LARGE_PACKET_DATA_MAX: usize = 65516;
23
/// Accumulated retry sleep, in milliseconds, after which the connect helpers
/// stop retrying.
24const CONNECT_TIMEOUT_MS: i32 = 1000;
/// Sleep between two connect attempts, in milliseconds.
25const WAIT_STEP_MS: u64 = 50;
26
/// Reports whether this build can use the simple IPC transport.
///
/// The answer is fixed at compile time: it is `true` exactly when the target
/// belongs to the Unix family.
#[must_use]
pub fn supports_simple_ipc() -> bool {
    cfg!(target_family = "unix")
}
32
/// Outcome of probing an IPC socket path, as produced by
/// `ipc_get_active_state` and `connect_with_retry`.
33#[derive(Clone, Copy, Debug, PartialEq, Eq)]
34pub enum IpcActiveState {
/// A connection to the socket succeeded.
35 Listening,
/// The connection was refused, or (in `ipc_get_active_state`) nothing exists
/// at the path.
36 NotListening,
/// The path exists but is not a socket, or its metadata could not be read.
37 InvalidPath,
/// Connecting failed with `ENOENT`.
38 PathNotFound,
/// Connecting failed for any other reason.
39 OtherError,
40}
41
42fn packet_hex_len(payload_len: usize) -> io::Result<[u8; 4]> {
43 let packet_size = payload_len + 4;
44 if packet_size > 0xffff {
45 return Err(io::Error::new(
46 io::ErrorKind::InvalidInput,
47 "packet exceeds max size",
48 ));
49 }
50 Ok(hex4_upper(packet_size))
51}
52
/// Renders the low 16 bits of `n` as four ASCII hexadecimal digits, most
/// significant nibble first.
///
/// Despite the `_upper` suffix, the digit table is lowercase (`a`-`f`).
/// Bits above the low 16 are ignored.
fn hex4_upper(n: usize) -> [u8; 4] {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    // Output slot `i` holds nibble `3 - i`, counted from the least
    // significant end of `n`.
    std::array::from_fn(|i| DIGITS[(n >> (4 * (3 - i))) & 0xf])
}
62
63fn write_packet(w: &mut dyn Write, buf: &[u8]) -> io::Result<()> {
64 if buf.len() > LARGE_PACKET_DATA_MAX {
65 return Err(io::Error::new(
66 io::ErrorKind::InvalidInput,
67 "packet exceeds max size",
68 ));
69 }
70 let hdr = packet_hex_len(buf.len())?;
71 w.write_all(&hdr)?;
72 w.write_all(buf)?;
73 Ok(())
74}
75
76fn write_packetized_from_buf(w: &mut dyn Write, mut data: &[u8]) -> io::Result<()> {
77 while !data.is_empty() {
78 let n = data.len().min(LARGE_PACKET_DATA_MAX);
79 write_packet(w, &data[..n])?;
80 data = &data[n..];
81 }
82 Ok(())
83}
84
/// Emits the flush packet (`0000`) and then flushes the writer.
///
/// Either step's I/O error is returned to the caller; the flush is skipped
/// when writing the packet fails.
fn packet_flush_gently(w: &mut dyn Write) -> io::Result<()> {
    w.write_all(b"0000").and_then(|()| w.flush())
}
90
/// Reads one length-prefixed packet from `r`, appending its payload (if any)
/// to `buf`.
///
/// # Returns
///
/// * `Ok(None)` when the stream ends before a complete four-byte header
///   arrives, or when the header is `0000` (flush).
/// * `Ok(Some(()))` after a data packet was appended to `buf`, or for the
///   payload-less headers `0001` and `0002`.
///
/// # Errors
///
/// * [`io::ErrorKind::InvalidData`] when the header is not exactly four hex
///   digits, or encodes the impossible length 3.
/// * Any error from the underlying reader, including
///   [`io::ErrorKind::UnexpectedEof`] for a truncated payload. In that case
///   `buf` is restored to the length it had on entry.
fn read_one_packet<R: Read>(r: &mut R, buf: &mut Vec<u8>) -> io::Result<Option<()>> {
    let mut header = [0u8; 4];
    match r.read_exact(&mut header) {
        Ok(()) => {}
        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
        Err(e) => return Err(e),
    }

    // Decode digit by digit: `usize::from_str_radix` would also accept a
    // leading '+', which is not a valid length header.
    let mut len = 0usize;
    for &digit in &header {
        let nibble = char::from(digit).to_digit(16).ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                format!(
                    "invalid pkt-line length: {:?}",
                    String::from_utf8_lossy(&header)
                ),
            )
        })?;
        len = (len << 4) | nibble as usize;
    }

    match len {
        0 => Ok(None),
        1 | 2 => Ok(Some(())),
        3 => Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("bad pkt-line length {len}"),
        )),
        n => {
            let payload = n - 4;
            let start = buf.len();
            buf.resize(start + payload, 0);
            if let Err(e) = r.read_exact(&mut buf[start..]) {
                // Do not leave zero padding or a partial payload behind.
                buf.truncate(start);
                return Err(e);
            }
            Ok(Some(()))
        }
    }
}
126
127fn read_packetized_to_end<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
128 let mut out = Vec::new();
129 loop {
130 if read_one_packet(r, &mut out)?.is_none() {
131 break;
132 }
133 }
134 Ok(out)
135}
136
/// Opens a client connection to the Unix domain socket at `path`.
///
/// `_disallow_chdir` is accepted for interface compatibility but is not used
/// by this implementation.
fn unix_stream_connect(path: &Path, _disallow_chdir: bool) -> io::Result<UnixStream> {
    let stream = UnixStream::connect(path)?;
    Ok(stream)
}
141
/// Probes the socket at `path` and classifies the result.
///
/// A successful connection is closed immediately and reported as
/// [`IpcActiveState::Listening`]. On failure the attempt is repeated after a
/// `WAIT_STEP_MS` sleep while retrying is enabled for that error and the
/// accumulated sleep is still below `CONNECT_TIMEOUT_MS`:
///
/// * `ENOENT` is retried when `wait_if_not_found` is set;
/// * `ECONNREFUSED` and `ETIMEDOUT` are retried when `wait_if_busy` is set;
/// * every other error ends the probe at once.
///
/// The final failure maps `ENOENT` to `PathNotFound`, `ECONNREFUSED` to
/// `NotListening`, and anything else to `OtherError`.
fn connect_with_retry(path: &Path, wait_if_busy: bool, wait_if_not_found: bool) -> IpcActiveState {
    let mut waited_ms: i32 = 0;
    loop {
        let errno = match unix_stream_connect(path, false) {
            // The probe connection is only needed as evidence; it is dropped here.
            Ok(_probe) => return IpcActiveState::Listening,
            Err(e) => e.raw_os_error(),
        };

        let worth_retrying = match errno {
            Some(libc::ENOENT) => wait_if_not_found,
            Some(libc::ECONNREFUSED | libc::ETIMEDOUT) => wait_if_busy,
            _ => false,
        };
        if worth_retrying && waited_ms < CONNECT_TIMEOUT_MS {
            thread::sleep(Duration::from_millis(WAIT_STEP_MS));
            waited_ms += WAIT_STEP_MS as i32;
            continue;
        }

        return match errno {
            Some(libc::ENOENT) => IpcActiveState::PathNotFound,
            Some(libc::ECONNREFUSED) => IpcActiveState::NotListening,
            _ => IpcActiveState::OtherError,
        };
    }
}
170
171pub fn ipc_get_active_state(path: &Path) -> IpcActiveState {
173 let meta = match std::fs::symlink_metadata(path) {
174 Ok(m) => m,
175 Err(e) if e.kind() == io::ErrorKind::NotFound => return IpcActiveState::NotListening,
176 Err(_) => return IpcActiveState::InvalidPath,
177 };
178 if !meta.file_type().is_socket() {
179 return IpcActiveState::InvalidPath;
180 }
181 connect_with_retry(path, false, false)
182}
183
/// Retry policy for client connections made by `connect_for_client`.
///
/// All flags default to `false`. The struct is a plain bundle of booleans,
/// so it is cheap to copy, compare and print.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct IpcClientConnectOptions {
    /// Keep retrying while the connection attempt fails with `ECONNREFUSED`
    /// or `ETIMEDOUT`.
    pub wait_if_busy: bool,
    /// Keep retrying while the connection attempt fails with `ENOENT`.
    pub wait_if_not_found: bool,
    /// Forwarded to `unix_stream_connect`, which currently ignores it.
    pub uds_disallow_chdir: bool,
}
190
/// Connects to the server socket at `path`, retrying according to `options`.
///
/// After a failed attempt the function sleeps `WAIT_STEP_MS` and tries again
/// as long as the error is one the options allow waiting for (`ENOENT` with
/// `wait_if_not_found`; `ECONNREFUSED` or `ETIMEDOUT` with `wait_if_busy`)
/// and the accumulated sleep is below `CONNECT_TIMEOUT_MS`.
///
/// # Errors
///
/// Returns the error of the last connection attempt.
fn connect_for_client(path: &Path, options: &IpcClientConnectOptions) -> io::Result<UnixStream> {
    let mut waited_ms: i32 = 0;
    loop {
        let failure = match unix_stream_connect(path, options.uds_disallow_chdir) {
            Ok(stream) => return Ok(stream),
            Err(failure) => failure,
        };

        let worth_retrying = match failure.raw_os_error() {
            Some(libc::ENOENT) => options.wait_if_not_found,
            Some(libc::ECONNREFUSED | libc::ETIMEDOUT) => options.wait_if_busy,
            _ => false,
        };
        if !worth_retrying || waited_ms >= CONNECT_TIMEOUT_MS {
            return Err(failure);
        }

        thread::sleep(Duration::from_millis(WAIT_STEP_MS));
        waited_ms += WAIT_STEP_MS as i32;
    }
}
212
213pub fn ipc_client_send_command(
215 path: &Path,
216 options: &IpcClientConnectOptions,
217 message: &[u8],
218) -> io::Result<Vec<u8>> {
219 let mut stream = connect_for_client(path, options)?;
220 write_packetized_from_buf(&mut stream, message)?;
221 packet_flush_gently(&mut stream)?;
222 read_packetized_to_end(&mut stream)
223}
224
225fn block_sigpipe() {
226 unsafe {
227 let mut set: libc::sigset_t = std::mem::zeroed();
228 libc::sigemptyset(&mut set);
229 libc::sigaddset(&mut set, libc::SIGPIPE);
230 libc::pthread_sigmask(libc::SIG_BLOCK, &set, std::ptr::null_mut());
231 }
232}
233
234fn wait_for_io_start(stream: &UnixStream, server_shutdown: &AtomicBool) -> io::Result<()> {
235 let fd = stream.as_raw_fd();
236 loop {
237 if server_shutdown.load(Ordering::SeqCst) {
238 return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "shutdown"));
239 }
240 let mut pfd = libc::pollfd {
241 fd,
242 events: libc::POLLIN,
243 revents: 0,
244 };
245 let n = unsafe { libc::poll(&mut pfd, 1, 10) };
246 if n < 0 {
247 let err = io::Error::last_os_error();
248 if err.raw_os_error() == Some(libc::EINTR) {
249 continue;
250 }
251 return Err(err);
252 }
253 if pfd.revents & libc::POLLHUP != 0 {
254 return Err(io::Error::new(
255 io::ErrorKind::ConnectionAborted,
256 "client hangup",
257 ));
258 }
259 if pfd.revents & libc::POLLIN != 0 {
260 return Ok(());
261 }
262 }
263}
264
/// Shared, thread-safe application callback. It receives the request bytes
/// and a writer for the reply, and returns a status code;
/// `serve_one_connection` treats `SIMPLE_IPC_QUIT` as a request to shut the
/// server down.
265type AppCb = Arc<dyn Fn(&[u8], &mut dyn Write) -> i32 + Send + Sync + 'static>;
266
/// Bounded FIFO of accepted connections, filled by the accept loop and
/// drained by worker threads.
struct WorkQueue {
    /// Pending connections, oldest first.
    fifo: Mutex<VecDeque<UnixStream>>,
    /// Signalled when a connection is queued and when the queue is stopped.
    cv: Condvar,
    /// Set by `stop`; once set, `enqueue` rejects and `dequeue` drains out.
    shutdown_requested: AtomicBool,
    /// Maximum number of connections held at once.
    capacity: usize,
}

impl WorkQueue {
    /// Creates an empty, running queue that holds at most `capacity`
    /// connections.
    fn new(capacity: usize) -> Self {
        WorkQueue {
            capacity,
            shutdown_requested: AtomicBool::new(false),
            cv: Condvar::new(),
            fifo: Mutex::new(VecDeque::new()),
        }
    }

    /// Locks the FIFO, taking over the guard if an earlier holder panicked.
    fn lock_fifo(&self) -> std::sync::MutexGuard<'_, VecDeque<UnixStream>> {
        self.fifo
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Whether `stop` has been called.
    fn is_stopping(&self) -> bool {
        self.shutdown_requested.load(Ordering::SeqCst)
    }

    /// Queues `stream` and wakes one waiting worker.
    ///
    /// The stream is dropped instead (closing the connection) when the queue
    /// is stopping or already holds `capacity` entries.
    fn enqueue(&self, stream: UnixStream) {
        let mut pending = self.lock_fifo();
        let has_room = !self.is_stopping() && pending.len() < self.capacity;
        if has_room {
            pending.push_back(stream);
            self.cv.notify_one();
        }
    }

    /// Takes the oldest queued connection, blocking while the queue is empty.
    ///
    /// Returns `None` once the queue is both empty and stopping.
    fn dequeue(&self) -> Option<UnixStream> {
        let mut pending = self
            .cv
            .wait_while(self.lock_fifo(), |fifo| {
                fifo.is_empty() && !self.is_stopping()
            })
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        pending.pop_front()
    }

    /// Marks the queue as stopping, discards every queued connection, and
    /// wakes all waiting workers.
    fn stop(&self) {
        self.shutdown_requested.store(true, Ordering::SeqCst);
        // The temporary guard is released at the end of this statement, so
        // woken workers are not immediately blocked on the mutex.
        self.lock_fifo().clear();
        self.cv.notify_all();
    }
}
317
318fn serve_one_connection(
319 mut stream: UnixStream,
320 app: AppCb,
321 server_shutdown: Arc<AtomicBool>,
322 wake: Arc<Mutex<UnixStream>>,
323 queue: Arc<WorkQueue>,
324) {
325 if wait_for_io_start(&stream, &server_shutdown).is_err() {
326 let _ = stream.shutdown(Shutdown::Both);
327 return;
328 }
329 let request = match read_packetized_to_end(&mut stream) {
330 Ok(r) => r,
331 Err(_) => {
332 let _ = stream.shutdown(Shutdown::Both);
333 return;
334 }
335 };
336 let ret = app(&request, &mut stream);
337 let _ = packet_flush_gently(&mut stream);
338 let _ = stream.shutdown(Shutdown::Both);
339 if ret == SIMPLE_IPC_QUIT {
340 server_shutdown.store(true, Ordering::SeqCst);
341 queue.stop();
342 if let Ok(mut tx) = wake.lock() {
343 let _ = tx.write_all(b"Q");
344 }
345 }
346}
347
/// Reasons `ipc_server_run` can fail.
#[derive(Debug)]
pub enum ServerRunError {
    /// An underlying I/O operation failed.
    Io(io::Error),
    /// Another server is already listening on the socket path.
    AddressInUse,
}

impl std::fmt::Display for ServerRunError {
    /// Shows the wrapped I/O error's own message, or a fixed text for
    /// `AddressInUse`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::AddressInUse => f.write_str("socket path already in use"),
            Self::Io(cause) => write!(f, "{cause}"),
        }
    }
}

impl std::error::Error for ServerRunError {
    /// Exposes the wrapped I/O error as the source; `AddressInUse` has none.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        if let Self::Io(cause) = self {
            return Some(cause);
        }
        None
    }
}
371
372fn try_bind_server(path: &Path) -> io::Result<()> {
373 let lock_path = path.with_extension("lock");
374 let _ = OpenOptions::new()
375 .write(true)
376 .create(true)
377 .truncate(true)
378 .open(&lock_path);
379
380 if path.exists() {
381 if is_socket(path) && unix_stream_connect(path, false).is_ok() {
382 let _ = std::fs::remove_file(&lock_path);
383 return Err(io::Error::new(
384 io::ErrorKind::AddrInUse,
385 "another server is listening",
386 ));
387 }
388 let _ = std::fs::remove_file(path);
389 }
390
391 let _ = std::fs::remove_file(&lock_path);
392 Ok(())
393}
394
/// Tells whether `path` itself (symlinks are not followed) is a Unix domain
/// socket. Any failure to read its metadata counts as "no".
fn is_socket(path: &Path) -> bool {
    match std::fs::symlink_metadata(path) {
        Ok(meta) => meta.file_type().is_socket(),
        Err(_) => false,
    }
}
400
401pub fn ipc_server_run(path: &Path, nr_threads: usize, app: AppCb) -> Result<(), ServerRunError> {
403 try_bind_server(path).map_err(|e| {
404 if e.kind() == io::ErrorKind::AddrInUse {
405 ServerRunError::AddressInUse
406 } else {
407 ServerRunError::Io(e)
408 }
409 })?;
410
411 let listener = UnixListener::bind(path).map_err(ServerRunError::Io)?;
412 listener.set_nonblocking(true).map_err(ServerRunError::Io)?;
413
414 let nr_threads = nr_threads.max(1);
415 let capacity = nr_threads.saturating_mul(100).max(1);
416 let queue = Arc::new(WorkQueue::new(capacity));
417 let server_shutdown = Arc::new(AtomicBool::new(false));
418
419 let (shutdown_tx, shutdown_rx) = UnixStream::pair().map_err(ServerRunError::Io)?;
420 shutdown_rx
421 .set_nonblocking(true)
422 .map_err(ServerRunError::Io)?;
423 let wake = Arc::new(Mutex::new(shutdown_tx));
424
425 let mut worker_handles = Vec::new();
426 for _ in 0..nr_threads {
427 let q = Arc::clone(&queue);
428 let app_w = Arc::clone(&app);
429 let shut = Arc::clone(&server_shutdown);
430 let wake_w = Arc::clone(&wake);
431 let q_for_worker = Arc::clone(&queue);
432 worker_handles.push(thread::spawn(move || {
433 block_sigpipe();
434 while let Some(stream) = q.dequeue() {
435 serve_one_connection(
436 stream,
437 app_w.clone(),
438 shut.clone(),
439 wake_w.clone(),
440 q_for_worker.clone(),
441 );
442 }
443 }));
444 }
445
446 block_sigpipe();
447 let l_fd = listener.as_raw_fd();
448 let s_fd = shutdown_rx.as_raw_fd();
449
450 loop {
451 if server_shutdown.load(Ordering::SeqCst) {
452 break;
453 }
454 let mut fds = [
455 libc::pollfd {
456 fd: s_fd,
457 events: libc::POLLIN,
458 revents: 0,
459 },
460 libc::pollfd {
461 fd: l_fd,
462 events: libc::POLLIN,
463 revents: 0,
464 },
465 ];
466 let n = unsafe { libc::poll(fds.as_mut_ptr(), 2, 60_000) };
467 if n < 0 {
468 let err = io::Error::last_os_error();
469 if err.raw_os_error() == Some(libc::EINTR) {
470 continue;
471 }
472 break;
473 }
474 if fds[0].revents & libc::POLLIN != 0 {
475 break;
476 }
477 if fds[1].revents & libc::POLLIN != 0 {
478 loop {
479 match listener.accept() {
480 Ok((stream, _)) => {
481 let _ = stream.set_nonblocking(false);
482 queue.enqueue(stream);
483 }
484 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
485 Err(_) => break,
486 }
487 }
488 }
489 }
490
491 queue.stop();
492 drop(listener);
493 for h in worker_handles {
494 let _ = h.join();
495 }
496 let _ = std::fs::remove_file(path);
497 Ok(())
498}
499
500#[must_use]
502pub fn test_app_callback() -> AppCb {
503 Arc::new(|request: &[u8], reply: &mut dyn Write| {
504 if request == b"quit" {
505 return SIMPLE_IPC_QUIT;
506 }
507 if request == b"ping" {
508 let _ = write_packetized_from_buf(reply, b"pong");
509 return 0;
510 }
511 if request == b"big" {
512 let mut line = Vec::with_capacity(84);
513 for row in 0..10_000 {
514 line.clear();
515 use std::io::Write as _;
516 let _ = writeln!(&mut line, "big: {:075}", row);
517 let _ = write_packetized_from_buf(reply, &line);
518 }
519 return 0;
520 }
521 if request == b"chunk" {
522 let mut line = Vec::with_capacity(84);
523 for row in 0..10_000 {
524 line.clear();
525 use std::io::Write as _;
526 let _ = writeln!(&mut line, "big: {:075}", row);
527 let _ = write_packet(reply, &line);
528 }
529 return 0;
530 }
531 if request == b"slow" {
532 let mut line = Vec::with_capacity(84);
533 for row in 0..1000 {
534 line.clear();
535 use std::io::Write as _;
536 let _ = writeln!(&mut line, "big: {:075}", row);
537 let _ = write_packet(reply, &line);
538 thread::sleep(Duration::from_millis(10));
539 }
540 return 0;
541 }
542 if request.len() >= 10 && request.starts_with(b"sendbytes ") {
543 return handle_sendbytes(request, reply);
544 }
545 let msg = format!("unhandled command: {}", String::from_utf8_lossy(request));
546 let _ = write_packetized_from_buf(reply, msg.as_bytes());
547 0
548 })
549}
550
551fn handle_sendbytes(request: &[u8], reply: &mut dyn Write) -> i32 {
552 let rest = &request[b"sendbytes ".len()..];
553 if rest.is_empty() {
554 return 0;
555 }
556 let b0 = rest[0];
557 let mut errs = 0usize;
558 for &b in &rest[1..] {
559 if b != b0 {
560 errs += 1;
561 }
562 }
563 if errs > 0 {
564 let msg = format!("errs:{errs}\n");
565 let _ = write_packetized_from_buf(reply, msg.as_bytes());
566 } else {
567 let msg = format!("rcvd:{}{:08}\n", char::from(b0), rest.len());
568 let _ = write_packetized_from_buf(reply, msg.as_bytes());
569 }
570 0
571}
572
573pub fn run_simple_ipc_tool(args: &[String]) -> i32 {
575 if args.first().map(|s| s.as_str()) == Some("SUPPORTS_SIMPLE_IPC") {
576 return 0;
577 }
578 if args.is_empty() {
579 eprintln!("usage: test-tool simple-ipc <subcommand> ...");
580 return 1;
581 }
582
583 let mut path = PathBuf::from("ipc-test");
584 let mut nr_threads = 5usize;
585 let mut max_wait_sec = 60u64;
586 let mut bytecount = 1024usize;
587 let mut batchsize = 10usize;
588 let mut token: Option<String> = None;
589 let mut bytevalue: u8 = b'x';
590
591 let sub = args[0].clone();
592 let mut i = 1usize;
593 while i < args.len() {
594 let a = args[i].as_str();
595 if let Some(v) = a.strip_prefix("--name=") {
596 path = PathBuf::from(v);
597 } else if let Some(v) = a.strip_prefix("--threads=") {
598 nr_threads = v.parse().unwrap_or(1).max(1);
599 } else if let Some(v) = a.strip_prefix("--max-wait=") {
600 max_wait_sec = v.parse().unwrap_or(0);
601 } else if let Some(v) = a.strip_prefix("--bytecount=") {
602 bytecount = v.parse().unwrap_or(1).max(1);
603 } else if let Some(v) = a.strip_prefix("--batchsize=") {
604 batchsize = v.parse().unwrap_or(1).max(1);
605 } else if let Some(v) = a.strip_prefix("--token=") {
606 token = Some(v.to_string());
607 } else if let Some(v) = a.strip_prefix("--byte=") {
608 if let Some(c) = v.as_bytes().first() {
609 bytevalue = *c;
610 }
611 }
612 i += 1;
613 }
614
615 match sub.as_str() {
616 "is-active" => match ipc_get_active_state(&path) {
617 IpcActiveState::Listening => 0,
618 IpcActiveState::NotListening => {
619 eprintln!("no server listening at '{}'", path.display());
620 1
621 }
622 IpcActiveState::PathNotFound => {
623 eprintln!("path not found '{}'", path.display());
624 1
625 }
626 IpcActiveState::InvalidPath => {
627 eprintln!("invalid pipe/socket name '{}'", path.display());
628 1
629 }
630 IpcActiveState::OtherError => {
631 eprintln!("other error for '{}'", path.display());
632 1
633 }
634 },
635 "run-daemon" => {
636 let app = test_app_callback();
637 match ipc_server_run(&path, nr_threads, app) {
638 Ok(()) => 0,
639 Err(ServerRunError::AddressInUse) => {
640 eprintln!("socket/pipe already in use: '{}'", path.display());
641 1
642 }
643 Err(ServerRunError::Io(e)) => {
644 eprintln!("could not start server on '{}': {e}", path.display());
645 1
646 }
647 }
648 }
649 "start-daemon" => match spawn_daemon(&path, nr_threads, max_wait_sec) {
650 Ok(()) => 0,
651 Err(e) => {
652 eprintln!("{e}");
653 1
654 }
655 },
656 "stop-daemon" => {
657 if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
658 eprintln!("no server listening at '{}'", path.display());
659 return 1;
660 }
661 let opts = IpcClientConnectOptions {
662 wait_if_busy: true,
663 wait_if_not_found: false,
664 uds_disallow_chdir: false,
665 };
666 if ipc_client_send_command(&path, &opts, b"quit").is_err() {
667 return 1;
668 }
669 let deadline = SystemTime::now()
670 .duration_since(UNIX_EPOCH)
671 .unwrap_or_default()
672 .as_secs()
673 + max_wait_sec;
674 loop {
675 if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
676 return 0;
677 }
678 let now = SystemTime::now()
679 .duration_since(UNIX_EPOCH)
680 .unwrap_or_default()
681 .as_secs();
682 if now > deadline {
683 eprintln!("daemon has not shutdown yet");
684 return 1;
685 }
686 thread::sleep(Duration::from_millis(100));
687 }
688 }
689 "send" => {
690 if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
691 eprintln!("no server listening at '{}'", path.display());
692 return 1;
693 }
694 let cmd = token.as_deref().unwrap_or("(no-command)");
695 let opts = IpcClientConnectOptions {
696 wait_if_busy: true,
697 wait_if_not_found: false,
698 uds_disallow_chdir: false,
699 };
700 match ipc_client_send_command(&path, &opts, cmd.as_bytes()) {
701 Ok(resp) => {
702 if !resp.is_empty() {
703 println!("{}", String::from_utf8_lossy(&resp).trim_end());
704 }
705 0
706 }
707 Err(_) => {
708 eprintln!("failed to send '{cmd}' to '{}'", path.display());
709 1
710 }
711 }
712 }
713 "sendbytes" => {
714 if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
715 eprintln!("no server listening at '{}'", path.display());
716 return 1;
717 }
718 let mut msg = b"sendbytes ".to_vec();
719 msg.extend(std::iter::repeat_n(bytevalue, bytecount));
720 let opts = IpcClientConnectOptions {
721 wait_if_busy: true,
722 wait_if_not_found: false,
723 uds_disallow_chdir: false,
724 };
725 match ipc_client_send_command(&path, &opts, &msg) {
726 Ok(resp) => {
727 let tail = String::from_utf8_lossy(&resp);
728 let tail = tail.trim_end();
729 println!("sent:{}{:08} {tail}", char::from(bytevalue), bytecount);
730 0
731 }
732 Err(_) => 1,
733 }
734 }
735 "multiple" => {
736 if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
737 eprintln!("no server listening at '{}'", path.display());
738 return 1;
739 }
740 run_multiple(&path, nr_threads, bytecount, batchsize)
741 }
742 _ => {
743 eprintln!("Unhandled subcommand: '{sub}'");
744 1
745 }
746 }
747}
748
749fn spawn_daemon(path: &Path, nr_threads: usize, max_wait_sec: u64) -> Result<(), String> {
750 use std::process::{Command, Stdio};
751 let exe = std::env::current_exe().map_err(|e| e.to_string())?;
752 let mut cmd = Command::new(exe);
753 cmd.arg("test-tool")
754 .arg("simple-ipc")
755 .arg("run-daemon")
756 .arg(format!("--name={}", path.display()))
757 .arg(format!("--threads={nr_threads}"))
758 .stdin(Stdio::null())
759 .stdout(Stdio::null())
760 .stderr(Stdio::null());
761 cmd.spawn().map_err(|e| e.to_string())?;
762 let deadline = SystemTime::now()
763 .duration_since(UNIX_EPOCH)
764 .map_err(|e| e.to_string())?
765 .as_secs()
766 + max_wait_sec.max(1);
767 loop {
768 if matches!(ipc_get_active_state(path), IpcActiveState::Listening) {
769 return Ok(());
770 }
771 let now = SystemTime::now()
772 .duration_since(UNIX_EPOCH)
773 .map_err(|e| e.to_string())?
774 .as_secs();
775 if now > deadline {
776 return Err("daemon not online yet".to_string());
777 }
778 thread::sleep(Duration::from_millis(50));
779 }
780}
781
782fn run_multiple(path: &Path, nr_threads: usize, bytecount: usize, batchsize: usize) -> i32 {
783 use std::sync::atomic::{AtomicUsize, Ordering as AOrd};
784 let sum_errors = Arc::new(AtomicUsize::new(0));
785 let sum_good = Arc::new(AtomicUsize::new(0));
786 let sum_join_errors = Arc::new(AtomicUsize::new(0));
787 let mut handles = Vec::new();
788 for k in 0..nr_threads {
789 let p = path.to_path_buf();
790 let letter = (b'A' + (k % 26) as u8) as char;
791 let base_count = bytecount + batchsize * (k / 26);
792 let batch = batchsize;
793 let sg = Arc::clone(&sum_good);
794 let se = Arc::clone(&sum_errors);
795 handles.push(thread::spawn(move || {
796 for t in 0..batch {
797 let n = base_count + t;
798 let mut msg = b"sendbytes ".to_vec();
799 msg.extend(std::iter::repeat_n(letter as u8, n));
800 let opts = IpcClientConnectOptions {
801 wait_if_busy: true,
802 wait_if_not_found: false,
803 uds_disallow_chdir: true,
804 };
805 match ipc_client_send_command(&p, &opts, &msg) {
806 Ok(resp) => {
807 let tail = String::from_utf8_lossy(&resp);
808 let tail = tail.trim_end();
809 println!("sent:{}{:08} {tail}", letter, n);
810 sg.fetch_add(1, AOrd::SeqCst);
811 }
812 Err(_) => {
813 se.fetch_add(1, AOrd::SeqCst);
814 }
815 }
816 }
817 }));
818 }
819 for h in handles {
820 if h.join().is_err() {
821 sum_join_errors.fetch_add(1, AOrd::SeqCst);
822 }
823 }
824 let good = sum_good.load(AOrd::SeqCst);
825 let je = sum_join_errors.load(AOrd::SeqCst);
826 let err = sum_errors.load(AOrd::SeqCst);
827 println!("client (good {good}) (join {je}), (errors {err})");
828 if je + err > 0 {
829 1
830 } else {
831 0
832 }
833}