Skip to main content

grit_lib/
simple_ipc.rs

1//! Git-compatible "simple IPC" over Unix domain sockets using pkt-line framing.
2//!
3//! Used by `test-tool simple-ipc` (`t0052-simple-ipc.sh`).
4
5use std::collections::VecDeque;
6use std::fs::OpenOptions;
7use std::io::{self, Read, Write};
8use std::net::Shutdown;
9use std::os::unix::fs::FileTypeExt;
10use std::os::unix::io::AsRawFd;
11use std::os::unix::net::{UnixListener, UnixStream};
12use std::path::{Path, PathBuf};
13use std::sync::atomic::{AtomicBool, Ordering};
14use std::sync::{Arc, Condvar, Mutex};
15use std::thread;
16use std::time::{Duration, SystemTime, UNIX_EPOCH};
17
18/// Application callback return value that requests server shutdown (no reply).
19pub const SIMPLE_IPC_QUIT: i32 = -2;
20
21/// Git `LARGE_PACKET_DATA_MAX` (`65520 - 4`).
22const LARGE_PACKET_DATA_MAX: usize = 65516;
23
24const CONNECT_TIMEOUT_MS: i32 = 1000;
25const WAIT_STEP_MS: u64 = 50;
26
27/// Whether simple IPC is supported (Unix only).
28#[must_use]
29pub fn supports_simple_ipc() -> bool {
30    cfg!(unix)
31}
32
33#[derive(Clone, Copy, Debug, PartialEq, Eq)]
34pub enum IpcActiveState {
35    Listening,
36    NotListening,
37    InvalidPath,
38    PathNotFound,
39    OtherError,
40}
41
42fn packet_hex_len(payload_len: usize) -> io::Result<[u8; 4]> {
43    let packet_size = payload_len + 4;
44    if packet_size > 0xffff {
45        return Err(io::Error::new(
46            io::ErrorKind::InvalidInput,
47            "packet exceeds max size",
48        ));
49    }
50    Ok(hex4_upper(packet_size))
51}
52
53fn hex4_upper(mut n: usize) -> [u8; 4] {
54    const HEX: &[u8; 16] = b"0123456789abcdef";
55    let mut out = [0u8; 4];
56    for i in (0..4).rev() {
57        out[i] = HEX[n & 0xf];
58        n >>= 4;
59    }
60    out
61}
62
63fn write_packet(w: &mut dyn Write, buf: &[u8]) -> io::Result<()> {
64    if buf.len() > LARGE_PACKET_DATA_MAX {
65        return Err(io::Error::new(
66            io::ErrorKind::InvalidInput,
67            "packet exceeds max size",
68        ));
69    }
70    let hdr = packet_hex_len(buf.len())?;
71    w.write_all(&hdr)?;
72    w.write_all(buf)?;
73    Ok(())
74}
75
76fn write_packetized_from_buf(w: &mut dyn Write, mut data: &[u8]) -> io::Result<()> {
77    while !data.is_empty() {
78        let n = data.len().min(LARGE_PACKET_DATA_MAX);
79        write_packet(w, &data[..n])?;
80        data = &data[n..];
81    }
82    Ok(())
83}
84
85fn packet_flush_gently(w: &mut dyn Write) -> io::Result<()> {
86    w.write_all(b"0000")?;
87    w.flush()?;
88    Ok(())
89}
90
91fn read_one_packet<R: Read>(r: &mut R, buf: &mut Vec<u8>) -> io::Result<Option<()>> {
92    let mut linelen = [0u8; 4];
93    match r.read_exact(&mut linelen) {
94        Ok(()) => {}
95        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
96        Err(e) => return Err(e),
97    }
98    let len_str = std::str::from_utf8(&linelen).map_err(|e| {
99        io::Error::new(
100            io::ErrorKind::InvalidData,
101            format!("invalid pkt-line length encoding: {e}"),
102        )
103    })?;
104    let len = usize::from_str_radix(len_str, 16).map_err(|e| {
105        io::Error::new(
106            io::ErrorKind::InvalidData,
107            format!("invalid pkt-line length: {e}"),
108        )
109    })?;
110    match len {
111        0 => Ok(None),
112        1 | 2 => Ok(Some(())),
113        n if n < 4 => Err(io::Error::new(
114            io::ErrorKind::InvalidData,
115            format!("bad pkt-line length {n}"),
116        )),
117        n => {
118            let payload = n - 4;
119            let start = buf.len();
120            buf.resize(start + payload, 0);
121            r.read_exact(&mut buf[start..])?;
122            Ok(Some(()))
123        }
124    }
125}
126
127fn read_packetized_to_end<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
128    let mut out = Vec::new();
129    loop {
130        if read_one_packet(r, &mut out)?.is_none() {
131            break;
132        }
133    }
134    Ok(out)
135}
136
137fn unix_stream_connect(path: &Path, _disallow_chdir: bool) -> io::Result<UnixStream> {
138    // Git may chdir for overlong paths; harness paths (e.g. `ipc-test`) fit in `sun_path`.
139    UnixStream::connect(path)
140}
141
142fn connect_with_retry(path: &Path, wait_if_busy: bool, wait_if_not_found: bool) -> IpcActiveState {
143    let mut elapsed: i32 = 0;
144    loop {
145        match unix_stream_connect(path, false) {
146            Ok(s) => {
147                drop(s);
148                return IpcActiveState::Listening;
149            }
150            Err(e) => {
151                let code = e.raw_os_error();
152                let retry = match code {
153                    Some(libc::ENOENT) => wait_if_not_found,
154                    Some(libc::ECONNREFUSED) | Some(libc::ETIMEDOUT) => wait_if_busy,
155                    _ => false,
156                };
157                if !retry || elapsed >= CONNECT_TIMEOUT_MS {
158                    return match code {
159                        Some(libc::ENOENT) => IpcActiveState::PathNotFound,
160                        Some(libc::ECONNREFUSED) => IpcActiveState::NotListening,
161                        _ => IpcActiveState::OtherError,
162                    };
163                }
164                thread::sleep(Duration::from_millis(WAIT_STEP_MS));
165                elapsed += WAIT_STEP_MS as i32;
166            }
167        }
168    }
169}
170
171/// Probe whether a server is accepting connections at `path`.
172pub fn ipc_get_active_state(path: &Path) -> IpcActiveState {
173    let meta = match std::fs::symlink_metadata(path) {
174        Ok(m) => m,
175        Err(e) if e.kind() == io::ErrorKind::NotFound => return IpcActiveState::NotListening,
176        Err(_) => return IpcActiveState::InvalidPath,
177    };
178    if !meta.file_type().is_socket() {
179        return IpcActiveState::InvalidPath;
180    }
181    connect_with_retry(path, false, false)
182}
183
184#[derive(Default)]
185pub struct IpcClientConnectOptions {
186    pub wait_if_busy: bool,
187    pub wait_if_not_found: bool,
188    pub uds_disallow_chdir: bool,
189}
190
191fn connect_for_client(path: &Path, options: &IpcClientConnectOptions) -> io::Result<UnixStream> {
192    let mut elapsed: i32 = 0;
193    loop {
194        match unix_stream_connect(path, options.uds_disallow_chdir) {
195            Ok(s) => return Ok(s),
196            Err(e) => {
197                let code = e.raw_os_error();
198                let retry = match code {
199                    Some(libc::ENOENT) => options.wait_if_not_found,
200                    Some(libc::ECONNREFUSED) | Some(libc::ETIMEDOUT) => options.wait_if_busy,
201                    _ => false,
202                };
203                if !retry || elapsed >= CONNECT_TIMEOUT_MS {
204                    return Err(e);
205                }
206                thread::sleep(Duration::from_millis(WAIT_STEP_MS));
207                elapsed += WAIT_STEP_MS as i32;
208            }
209        }
210    }
211}
212
213/// Connect, send pkt-line message + flush, read full response (until flush).
214pub fn ipc_client_send_command(
215    path: &Path,
216    options: &IpcClientConnectOptions,
217    message: &[u8],
218) -> io::Result<Vec<u8>> {
219    let mut stream = connect_for_client(path, options)?;
220    write_packetized_from_buf(&mut stream, message)?;
221    packet_flush_gently(&mut stream)?;
222    read_packetized_to_end(&mut stream)
223}
224
225fn block_sigpipe() {
226    unsafe {
227        let mut set: libc::sigset_t = std::mem::zeroed();
228        libc::sigemptyset(&mut set);
229        libc::sigaddset(&mut set, libc::SIGPIPE);
230        libc::pthread_sigmask(libc::SIG_BLOCK, &set, std::ptr::null_mut());
231    }
232}
233
234fn wait_for_io_start(stream: &UnixStream, server_shutdown: &AtomicBool) -> io::Result<()> {
235    let fd = stream.as_raw_fd();
236    loop {
237        if server_shutdown.load(Ordering::SeqCst) {
238            return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "shutdown"));
239        }
240        let mut pfd = libc::pollfd {
241            fd,
242            events: libc::POLLIN,
243            revents: 0,
244        };
245        let n = unsafe { libc::poll(&mut pfd, 1, 10) };
246        if n < 0 {
247            let err = io::Error::last_os_error();
248            if err.raw_os_error() == Some(libc::EINTR) {
249                continue;
250            }
251            return Err(err);
252        }
253        if pfd.revents & libc::POLLHUP != 0 {
254            return Err(io::Error::new(
255                io::ErrorKind::ConnectionAborted,
256                "client hangup",
257            ));
258        }
259        if pfd.revents & libc::POLLIN != 0 {
260            return Ok(());
261        }
262    }
263}
264
265type AppCb = Arc<dyn Fn(&[u8], &mut dyn Write) -> i32 + Send + Sync + 'static>;
266
267struct WorkQueue {
268    fifo: Mutex<VecDeque<UnixStream>>,
269    cv: Condvar,
270    shutdown_requested: AtomicBool,
271    capacity: usize,
272}
273
274impl WorkQueue {
275    fn new(capacity: usize) -> Self {
276        Self {
277            fifo: Mutex::new(VecDeque::new()),
278            cv: Condvar::new(),
279            shutdown_requested: AtomicBool::new(false),
280            capacity,
281        }
282    }
283
284    fn enqueue(&self, stream: UnixStream) {
285        let mut guard = self.fifo.lock().unwrap_or_else(|e| e.into_inner());
286        if self.shutdown_requested.load(Ordering::SeqCst) {
287            return;
288        }
289        if guard.len() >= self.capacity {
290            return;
291        }
292        guard.push_back(stream);
293        self.cv.notify_one();
294    }
295
296    fn dequeue(&self) -> Option<UnixStream> {
297        let mut guard = self.fifo.lock().unwrap_or_else(|e| e.into_inner());
298        loop {
299            if let Some(s) = guard.pop_front() {
300                return Some(s);
301            }
302            if self.shutdown_requested.load(Ordering::SeqCst) {
303                return None;
304            }
305            guard = self.cv.wait(guard).unwrap_or_else(|e| e.into_inner());
306        }
307    }
308
309    fn stop(&self) {
310        self.shutdown_requested.store(true, Ordering::SeqCst);
311        let mut guard = self.fifo.lock().unwrap_or_else(|e| e.into_inner());
312        guard.clear();
313        drop(guard);
314        self.cv.notify_all();
315    }
316}
317
318fn serve_one_connection(
319    mut stream: UnixStream,
320    app: AppCb,
321    server_shutdown: Arc<AtomicBool>,
322    wake: Arc<Mutex<UnixStream>>,
323    queue: Arc<WorkQueue>,
324) {
325    if wait_for_io_start(&stream, &server_shutdown).is_err() {
326        let _ = stream.shutdown(Shutdown::Both);
327        return;
328    }
329    let request = match read_packetized_to_end(&mut stream) {
330        Ok(r) => r,
331        Err(_) => {
332            let _ = stream.shutdown(Shutdown::Both);
333            return;
334        }
335    };
336    let ret = app(&request, &mut stream);
337    let _ = packet_flush_gently(&mut stream);
338    let _ = stream.shutdown(Shutdown::Both);
339    if ret == SIMPLE_IPC_QUIT {
340        server_shutdown.store(true, Ordering::SeqCst);
341        queue.stop();
342        if let Ok(mut tx) = wake.lock() {
343            let _ = tx.write_all(b"Q");
344        }
345    }
346}
347
348#[derive(Debug)]
349pub enum ServerRunError {
350    Io(io::Error),
351    AddressInUse,
352}
353
354impl std::fmt::Display for ServerRunError {
355    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
356        match self {
357            ServerRunError::Io(e) => write!(f, "{e}"),
358            ServerRunError::AddressInUse => write!(f, "socket path already in use"),
359        }
360    }
361}
362
363impl std::error::Error for ServerRunError {
364    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
365        match self {
366            ServerRunError::Io(e) => Some(e),
367            ServerRunError::AddressInUse => None,
368        }
369    }
370}
371
372fn try_bind_server(path: &Path) -> io::Result<()> {
373    let lock_path = path.with_extension("lock");
374    let _ = OpenOptions::new()
375        .write(true)
376        .create(true)
377        .truncate(true)
378        .open(&lock_path);
379
380    if path.exists() {
381        if is_socket(path) && unix_stream_connect(path, false).is_ok() {
382            let _ = std::fs::remove_file(&lock_path);
383            return Err(io::Error::new(
384                io::ErrorKind::AddrInUse,
385                "another server is listening",
386            ));
387        }
388        let _ = std::fs::remove_file(path);
389    }
390
391    let _ = std::fs::remove_file(&lock_path);
392    Ok(())
393}
394
395fn is_socket(path: &Path) -> bool {
396    std::fs::symlink_metadata(path)
397        .ok()
398        .is_some_and(|m| m.file_type().is_socket())
399}
400
401/// Blocking IPC server (`run-daemon`).
402pub fn ipc_server_run(path: &Path, nr_threads: usize, app: AppCb) -> Result<(), ServerRunError> {
403    try_bind_server(path).map_err(|e| {
404        if e.kind() == io::ErrorKind::AddrInUse {
405            ServerRunError::AddressInUse
406        } else {
407            ServerRunError::Io(e)
408        }
409    })?;
410
411    let listener = UnixListener::bind(path).map_err(ServerRunError::Io)?;
412    listener.set_nonblocking(true).map_err(ServerRunError::Io)?;
413
414    let nr_threads = nr_threads.max(1);
415    let capacity = nr_threads.saturating_mul(100).max(1);
416    let queue = Arc::new(WorkQueue::new(capacity));
417    let server_shutdown = Arc::new(AtomicBool::new(false));
418
419    let (shutdown_tx, shutdown_rx) = UnixStream::pair().map_err(ServerRunError::Io)?;
420    shutdown_rx
421        .set_nonblocking(true)
422        .map_err(ServerRunError::Io)?;
423    let wake = Arc::new(Mutex::new(shutdown_tx));
424
425    let mut worker_handles = Vec::new();
426    for _ in 0..nr_threads {
427        let q = Arc::clone(&queue);
428        let app_w = Arc::clone(&app);
429        let shut = Arc::clone(&server_shutdown);
430        let wake_w = Arc::clone(&wake);
431        let q_for_worker = Arc::clone(&queue);
432        worker_handles.push(thread::spawn(move || {
433            block_sigpipe();
434            while let Some(stream) = q.dequeue() {
435                serve_one_connection(
436                    stream,
437                    app_w.clone(),
438                    shut.clone(),
439                    wake_w.clone(),
440                    q_for_worker.clone(),
441                );
442            }
443        }));
444    }
445
446    block_sigpipe();
447    let l_fd = listener.as_raw_fd();
448    let s_fd = shutdown_rx.as_raw_fd();
449
450    loop {
451        if server_shutdown.load(Ordering::SeqCst) {
452            break;
453        }
454        let mut fds = [
455            libc::pollfd {
456                fd: s_fd,
457                events: libc::POLLIN,
458                revents: 0,
459            },
460            libc::pollfd {
461                fd: l_fd,
462                events: libc::POLLIN,
463                revents: 0,
464            },
465        ];
466        let n = unsafe { libc::poll(fds.as_mut_ptr(), 2, 60_000) };
467        if n < 0 {
468            let err = io::Error::last_os_error();
469            if err.raw_os_error() == Some(libc::EINTR) {
470                continue;
471            }
472            break;
473        }
474        if fds[0].revents & libc::POLLIN != 0 {
475            break;
476        }
477        if fds[1].revents & libc::POLLIN != 0 {
478            loop {
479                match listener.accept() {
480                    Ok((stream, _)) => {
481                        let _ = stream.set_nonblocking(false);
482                        queue.enqueue(stream);
483                    }
484                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
485                    Err(_) => break,
486                }
487            }
488        }
489    }
490
491    queue.stop();
492    drop(listener);
493    for h in worker_handles {
494        let _ = h.join();
495    }
496    let _ = std::fs::remove_file(path);
497    Ok(())
498}
499
500/// Test daemon command handler (mirrors Git `test-simple-ipc.c`).
501#[must_use]
502pub fn test_app_callback() -> AppCb {
503    Arc::new(|request: &[u8], reply: &mut dyn Write| {
504        if request == b"quit" {
505            return SIMPLE_IPC_QUIT;
506        }
507        if request == b"ping" {
508            let _ = write_packetized_from_buf(reply, b"pong");
509            return 0;
510        }
511        if request == b"big" {
512            let mut line = Vec::with_capacity(84);
513            for row in 0..10_000 {
514                line.clear();
515                use std::io::Write as _;
516                let _ = writeln!(&mut line, "big: {:075}", row);
517                let _ = write_packetized_from_buf(reply, &line);
518            }
519            return 0;
520        }
521        if request == b"chunk" {
522            let mut line = Vec::with_capacity(84);
523            for row in 0..10_000 {
524                line.clear();
525                use std::io::Write as _;
526                let _ = writeln!(&mut line, "big: {:075}", row);
527                let _ = write_packet(reply, &line);
528            }
529            return 0;
530        }
531        if request == b"slow" {
532            let mut line = Vec::with_capacity(84);
533            for row in 0..1000 {
534                line.clear();
535                use std::io::Write as _;
536                let _ = writeln!(&mut line, "big: {:075}", row);
537                let _ = write_packet(reply, &line);
538                thread::sleep(Duration::from_millis(10));
539            }
540            return 0;
541        }
542        if request.len() >= 10 && request.starts_with(b"sendbytes ") {
543            return handle_sendbytes(request, reply);
544        }
545        let msg = format!("unhandled command: {}", String::from_utf8_lossy(request));
546        let _ = write_packetized_from_buf(reply, msg.as_bytes());
547        0
548    })
549}
550
551fn handle_sendbytes(request: &[u8], reply: &mut dyn Write) -> i32 {
552    let rest = &request[b"sendbytes ".len()..];
553    if rest.is_empty() {
554        return 0;
555    }
556    let b0 = rest[0];
557    let mut errs = 0usize;
558    for &b in &rest[1..] {
559        if b != b0 {
560            errs += 1;
561        }
562    }
563    if errs > 0 {
564        let msg = format!("errs:{errs}\n");
565        let _ = write_packetized_from_buf(reply, msg.as_bytes());
566    } else {
567        let msg = format!("rcvd:{}{:08}\n", char::from(b0), rest.len());
568        let _ = write_packetized_from_buf(reply, msg.as_bytes());
569    }
570    0
571}
572
573/// `test-tool simple-ipc` entry (Unix). Returns exit code.
574pub fn run_simple_ipc_tool(args: &[String]) -> i32 {
575    if args.first().map(|s| s.as_str()) == Some("SUPPORTS_SIMPLE_IPC") {
576        return 0;
577    }
578    if args.is_empty() {
579        eprintln!("usage: test-tool simple-ipc <subcommand> ...");
580        return 1;
581    }
582
583    let mut path = PathBuf::from("ipc-test");
584    let mut nr_threads = 5usize;
585    let mut max_wait_sec = 60u64;
586    let mut bytecount = 1024usize;
587    let mut batchsize = 10usize;
588    let mut token: Option<String> = None;
589    let mut bytevalue: u8 = b'x';
590
591    let sub = args[0].clone();
592    let mut i = 1usize;
593    while i < args.len() {
594        let a = args[i].as_str();
595        if let Some(v) = a.strip_prefix("--name=") {
596            path = PathBuf::from(v);
597        } else if let Some(v) = a.strip_prefix("--threads=") {
598            nr_threads = v.parse().unwrap_or(1).max(1);
599        } else if let Some(v) = a.strip_prefix("--max-wait=") {
600            max_wait_sec = v.parse().unwrap_or(0);
601        } else if let Some(v) = a.strip_prefix("--bytecount=") {
602            bytecount = v.parse().unwrap_or(1).max(1);
603        } else if let Some(v) = a.strip_prefix("--batchsize=") {
604            batchsize = v.parse().unwrap_or(1).max(1);
605        } else if let Some(v) = a.strip_prefix("--token=") {
606            token = Some(v.to_string());
607        } else if let Some(v) = a.strip_prefix("--byte=") {
608            if let Some(c) = v.as_bytes().first() {
609                bytevalue = *c;
610            }
611        }
612        i += 1;
613    }
614
615    match sub.as_str() {
616        "is-active" => match ipc_get_active_state(&path) {
617            IpcActiveState::Listening => 0,
618            IpcActiveState::NotListening => {
619                eprintln!("no server listening at '{}'", path.display());
620                1
621            }
622            IpcActiveState::PathNotFound => {
623                eprintln!("path not found '{}'", path.display());
624                1
625            }
626            IpcActiveState::InvalidPath => {
627                eprintln!("invalid pipe/socket name '{}'", path.display());
628                1
629            }
630            IpcActiveState::OtherError => {
631                eprintln!("other error for '{}'", path.display());
632                1
633            }
634        },
635        "run-daemon" => {
636            let app = test_app_callback();
637            match ipc_server_run(&path, nr_threads, app) {
638                Ok(()) => 0,
639                Err(ServerRunError::AddressInUse) => {
640                    eprintln!("socket/pipe already in use: '{}'", path.display());
641                    1
642                }
643                Err(ServerRunError::Io(e)) => {
644                    eprintln!("could not start server on '{}': {e}", path.display());
645                    1
646                }
647            }
648        }
649        "start-daemon" => match spawn_daemon(&path, nr_threads, max_wait_sec) {
650            Ok(()) => 0,
651            Err(e) => {
652                eprintln!("{e}");
653                1
654            }
655        },
656        "stop-daemon" => {
657            if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
658                eprintln!("no server listening at '{}'", path.display());
659                return 1;
660            }
661            let opts = IpcClientConnectOptions {
662                wait_if_busy: true,
663                wait_if_not_found: false,
664                uds_disallow_chdir: false,
665            };
666            if ipc_client_send_command(&path, &opts, b"quit").is_err() {
667                return 1;
668            }
669            let deadline = SystemTime::now()
670                .duration_since(UNIX_EPOCH)
671                .unwrap_or_default()
672                .as_secs()
673                + max_wait_sec;
674            loop {
675                if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
676                    return 0;
677                }
678                let now = SystemTime::now()
679                    .duration_since(UNIX_EPOCH)
680                    .unwrap_or_default()
681                    .as_secs();
682                if now > deadline {
683                    eprintln!("daemon has not shutdown yet");
684                    return 1;
685                }
686                thread::sleep(Duration::from_millis(100));
687            }
688        }
689        "send" => {
690            if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
691                eprintln!("no server listening at '{}'", path.display());
692                return 1;
693            }
694            let cmd = token.as_deref().unwrap_or("(no-command)");
695            let opts = IpcClientConnectOptions {
696                wait_if_busy: true,
697                wait_if_not_found: false,
698                uds_disallow_chdir: false,
699            };
700            match ipc_client_send_command(&path, &opts, cmd.as_bytes()) {
701                Ok(resp) => {
702                    if !resp.is_empty() {
703                        println!("{}", String::from_utf8_lossy(&resp).trim_end());
704                    }
705                    0
706                }
707                Err(_) => {
708                    eprintln!("failed to send '{cmd}' to '{}'", path.display());
709                    1
710                }
711            }
712        }
713        "sendbytes" => {
714            if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
715                eprintln!("no server listening at '{}'", path.display());
716                return 1;
717            }
718            let mut msg = b"sendbytes ".to_vec();
719            msg.extend(std::iter::repeat_n(bytevalue, bytecount));
720            let opts = IpcClientConnectOptions {
721                wait_if_busy: true,
722                wait_if_not_found: false,
723                uds_disallow_chdir: false,
724            };
725            match ipc_client_send_command(&path, &opts, &msg) {
726                Ok(resp) => {
727                    let tail = String::from_utf8_lossy(&resp);
728                    let tail = tail.trim_end();
729                    println!("sent:{}{:08} {tail}", char::from(bytevalue), bytecount);
730                    0
731                }
732                Err(_) => 1,
733            }
734        }
735        "multiple" => {
736            if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
737                eprintln!("no server listening at '{}'", path.display());
738                return 1;
739            }
740            run_multiple(&path, nr_threads, bytecount, batchsize)
741        }
742        _ => {
743            eprintln!("Unhandled subcommand: '{sub}'");
744            1
745        }
746    }
747}
748
749fn spawn_daemon(path: &Path, nr_threads: usize, max_wait_sec: u64) -> Result<(), String> {
750    use std::process::{Command, Stdio};
751    let exe = std::env::current_exe().map_err(|e| e.to_string())?;
752    let mut cmd = Command::new(exe);
753    cmd.arg("test-tool")
754        .arg("simple-ipc")
755        .arg("run-daemon")
756        .arg(format!("--name={}", path.display()))
757        .arg(format!("--threads={nr_threads}"))
758        .stdin(Stdio::null())
759        .stdout(Stdio::null())
760        .stderr(Stdio::null());
761    cmd.spawn().map_err(|e| e.to_string())?;
762    let deadline = SystemTime::now()
763        .duration_since(UNIX_EPOCH)
764        .map_err(|e| e.to_string())?
765        .as_secs()
766        + max_wait_sec.max(1);
767    loop {
768        if matches!(ipc_get_active_state(path), IpcActiveState::Listening) {
769            return Ok(());
770        }
771        let now = SystemTime::now()
772            .duration_since(UNIX_EPOCH)
773            .map_err(|e| e.to_string())?
774            .as_secs();
775        if now > deadline {
776            return Err("daemon not online yet".to_string());
777        }
778        thread::sleep(Duration::from_millis(50));
779    }
780}
781
782fn run_multiple(path: &Path, nr_threads: usize, bytecount: usize, batchsize: usize) -> i32 {
783    use std::sync::atomic::{AtomicUsize, Ordering as AOrd};
784    let sum_errors = Arc::new(AtomicUsize::new(0));
785    let sum_good = Arc::new(AtomicUsize::new(0));
786    let sum_join_errors = Arc::new(AtomicUsize::new(0));
787    let mut handles = Vec::new();
788    for k in 0..nr_threads {
789        let p = path.to_path_buf();
790        let letter = (b'A' + (k % 26) as u8) as char;
791        let base_count = bytecount + batchsize * (k / 26);
792        let batch = batchsize;
793        let sg = Arc::clone(&sum_good);
794        let se = Arc::clone(&sum_errors);
795        handles.push(thread::spawn(move || {
796            for t in 0..batch {
797                let n = base_count + t;
798                let mut msg = b"sendbytes ".to_vec();
799                msg.extend(std::iter::repeat_n(letter as u8, n));
800                let opts = IpcClientConnectOptions {
801                    wait_if_busy: true,
802                    wait_if_not_found: false,
803                    uds_disallow_chdir: true,
804                };
805                match ipc_client_send_command(&p, &opts, &msg) {
806                    Ok(resp) => {
807                        let tail = String::from_utf8_lossy(&resp);
808                        let tail = tail.trim_end();
809                        println!("sent:{}{:08} {tail}", letter, n);
810                        sg.fetch_add(1, AOrd::SeqCst);
811                    }
812                    Err(_) => {
813                        se.fetch_add(1, AOrd::SeqCst);
814                    }
815                }
816            }
817        }));
818    }
819    for h in handles {
820        if h.join().is_err() {
821            sum_join_errors.fetch_add(1, AOrd::SeqCst);
822        }
823    }
824    let good = sum_good.load(AOrd::SeqCst);
825    let je = sum_join_errors.load(AOrd::SeqCst);
826    let err = sum_errors.load(AOrd::SeqCst);
827    println!("client (good {good}) (join {je}), (errors {err})");
828    if je + err > 0 {
829        1
830    } else {
831        0
832    }
833}