Skip to main content

grit_lib/
simple_ipc.rs

1//! Git-compatible "simple IPC" over Unix domain sockets using pkt-line framing.
2//!
3//! Used by `test-tool simple-ipc` (`t0052-simple-ipc.sh`).
4
5use std::collections::VecDeque;
6use std::fs::OpenOptions;
7use std::io::{self, Read, Write};
8use std::net::Shutdown;
9use std::os::unix::fs::FileTypeExt;
10use std::os::unix::net::{UnixListener, UnixStream};
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::{Arc, Condvar, Mutex};
14use std::thread;
15use std::time::{Duration, SystemTime, UNIX_EPOCH};
16
17/// Application callback return value that requests server shutdown (no reply).
18pub const SIMPLE_IPC_QUIT: i32 = -2;
19
20/// Git `LARGE_PACKET_DATA_MAX` (`65520 - 4`).
21const LARGE_PACKET_DATA_MAX: usize = 65516;
22
23const CONNECT_TIMEOUT_MS: i32 = 1000;
24const WAIT_STEP_MS: u64 = 50;
25
26/// Whether simple IPC is supported (Unix only).
27#[must_use]
28pub fn supports_simple_ipc() -> bool {
29    cfg!(unix)
30}
31
32#[derive(Clone, Copy, Debug, PartialEq, Eq)]
33pub enum IpcActiveState {
34    Listening,
35    NotListening,
36    InvalidPath,
37    PathNotFound,
38    OtherError,
39}
40
41fn packet_hex_len(payload_len: usize) -> io::Result<[u8; 4]> {
42    let packet_size = payload_len + 4;
43    if packet_size > 0xffff {
44        return Err(io::Error::new(
45            io::ErrorKind::InvalidInput,
46            "packet exceeds max size",
47        ));
48    }
49    Ok(hex4_upper(packet_size))
50}
51
52fn hex4_upper(mut n: usize) -> [u8; 4] {
53    const HEX: &[u8; 16] = b"0123456789abcdef";
54    let mut out = [0u8; 4];
55    for i in (0..4).rev() {
56        out[i] = HEX[n & 0xf];
57        n >>= 4;
58    }
59    out
60}
61
62fn write_packet(w: &mut dyn Write, buf: &[u8]) -> io::Result<()> {
63    if buf.len() > LARGE_PACKET_DATA_MAX {
64        return Err(io::Error::new(
65            io::ErrorKind::InvalidInput,
66            "packet exceeds max size",
67        ));
68    }
69    let hdr = packet_hex_len(buf.len())?;
70    w.write_all(&hdr)?;
71    w.write_all(buf)?;
72    Ok(())
73}
74
75fn write_packetized_from_buf(w: &mut dyn Write, mut data: &[u8]) -> io::Result<()> {
76    while !data.is_empty() {
77        let n = data.len().min(LARGE_PACKET_DATA_MAX);
78        write_packet(w, &data[..n])?;
79        data = &data[n..];
80    }
81    Ok(())
82}
83
84fn packet_flush_gently(w: &mut dyn Write) -> io::Result<()> {
85    w.write_all(b"0000")?;
86    w.flush()?;
87    Ok(())
88}
89
90fn read_one_packet<R: Read>(r: &mut R, buf: &mut Vec<u8>) -> io::Result<Option<()>> {
91    let mut linelen = [0u8; 4];
92    match r.read_exact(&mut linelen) {
93        Ok(()) => {}
94        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
95        Err(e) => return Err(e),
96    }
97    let len_str = std::str::from_utf8(&linelen).map_err(|e| {
98        io::Error::new(
99            io::ErrorKind::InvalidData,
100            format!("invalid pkt-line length encoding: {e}"),
101        )
102    })?;
103    let len = usize::from_str_radix(len_str, 16).map_err(|e| {
104        io::Error::new(
105            io::ErrorKind::InvalidData,
106            format!("invalid pkt-line length: {e}"),
107        )
108    })?;
109    match len {
110        0 => Ok(None),
111        1 | 2 => Ok(Some(())),
112        n if n < 4 => Err(io::Error::new(
113            io::ErrorKind::InvalidData,
114            format!("bad pkt-line length {n}"),
115        )),
116        n => {
117            let payload = n - 4;
118            let start = buf.len();
119            buf.resize(start + payload, 0);
120            r.read_exact(&mut buf[start..])?;
121            Ok(Some(()))
122        }
123    }
124}
125
126fn read_packetized_to_end<R: Read>(r: &mut R) -> io::Result<Vec<u8>> {
127    let mut out = Vec::new();
128    loop {
129        if read_one_packet(r, &mut out)?.is_none() {
130            break;
131        }
132    }
133    Ok(out)
134}
135
136fn unix_stream_connect(path: &Path, _disallow_chdir: bool) -> io::Result<UnixStream> {
137    // Git may chdir for overlong paths; harness paths (e.g. `ipc-test`) fit in `sun_path`.
138    UnixStream::connect(path)
139}
140
141fn connect_with_retry(path: &Path, wait_if_busy: bool, wait_if_not_found: bool) -> IpcActiveState {
142    let mut elapsed: i32 = 0;
143    loop {
144        match unix_stream_connect(path, false) {
145            Ok(s) => {
146                drop(s);
147                return IpcActiveState::Listening;
148            }
149            Err(e) => {
150                let code = e.raw_os_error();
151                let retry = match code {
152                    Some(libc::ENOENT) => wait_if_not_found,
153                    Some(libc::ECONNREFUSED) | Some(libc::ETIMEDOUT) => wait_if_busy,
154                    _ => false,
155                };
156                if !retry || elapsed >= CONNECT_TIMEOUT_MS {
157                    return match code {
158                        Some(libc::ENOENT) => IpcActiveState::PathNotFound,
159                        Some(libc::ECONNREFUSED) => IpcActiveState::NotListening,
160                        _ => IpcActiveState::OtherError,
161                    };
162                }
163                thread::sleep(Duration::from_millis(WAIT_STEP_MS));
164                elapsed += WAIT_STEP_MS as i32;
165            }
166        }
167    }
168}
169
170/// Probe whether a server is accepting connections at `path`.
171pub fn ipc_get_active_state(path: &Path) -> IpcActiveState {
172    let meta = match std::fs::symlink_metadata(path) {
173        Ok(m) => m,
174        Err(e) if e.kind() == io::ErrorKind::NotFound => return IpcActiveState::NotListening,
175        Err(_) => return IpcActiveState::InvalidPath,
176    };
177    if !meta.file_type().is_socket() {
178        return IpcActiveState::InvalidPath;
179    }
180    connect_with_retry(path, false, false)
181}
182
183#[derive(Default)]
184pub struct IpcClientConnectOptions {
185    pub wait_if_busy: bool,
186    pub wait_if_not_found: bool,
187    pub uds_disallow_chdir: bool,
188}
189
190fn connect_for_client(path: &Path, options: &IpcClientConnectOptions) -> io::Result<UnixStream> {
191    let mut elapsed: i32 = 0;
192    loop {
193        match unix_stream_connect(path, options.uds_disallow_chdir) {
194            Ok(s) => return Ok(s),
195            Err(e) => {
196                let code = e.raw_os_error();
197                let retry = match code {
198                    Some(libc::ENOENT) => options.wait_if_not_found,
199                    Some(libc::ECONNREFUSED) | Some(libc::ETIMEDOUT) => options.wait_if_busy,
200                    _ => false,
201                };
202                if !retry || elapsed >= CONNECT_TIMEOUT_MS {
203                    return Err(e);
204                }
205                thread::sleep(Duration::from_millis(WAIT_STEP_MS));
206                elapsed += WAIT_STEP_MS as i32;
207            }
208        }
209    }
210}
211
212/// Connect, send pkt-line message + flush, read full response (until flush).
213pub fn ipc_client_send_command(
214    path: &Path,
215    options: &IpcClientConnectOptions,
216    message: &[u8],
217) -> io::Result<Vec<u8>> {
218    let mut stream = connect_for_client(path, options)?;
219    write_packetized_from_buf(&mut stream, message)?;
220    packet_flush_gently(&mut stream)?;
221    read_packetized_to_end(&mut stream)
222}
223
224fn block_sigpipe() {
225    use nix::sys::signal::{pthread_sigmask, SigSet, SigmaskHow, Signal};
226    let mut set = SigSet::empty();
227    set.add(Signal::SIGPIPE);
228    let _ = pthread_sigmask(SigmaskHow::SIG_BLOCK, Some(&set), None);
229}
230
231fn wait_for_io_start(stream: &UnixStream, server_shutdown: &AtomicBool) -> io::Result<()> {
232    use nix::poll::{poll, PollFd, PollFlags};
233    use std::os::fd::AsFd;
234    loop {
235        if server_shutdown.load(Ordering::SeqCst) {
236            return Err(io::Error::new(io::ErrorKind::ConnectionAborted, "shutdown"));
237        }
238        let mut fds = [PollFd::new(stream.as_fd(), PollFlags::POLLIN)];
239        match poll(&mut fds, 10u16) {
240            Ok(_) => {}
241            Err(nix::errno::Errno::EINTR) => continue,
242            Err(e) => return Err(io::Error::from_raw_os_error(e as i32)),
243        }
244        let revents = fds[0].revents().unwrap_or_else(PollFlags::empty);
245        if revents.contains(PollFlags::POLLHUP) {
246            return Err(io::Error::new(
247                io::ErrorKind::ConnectionAborted,
248                "client hangup",
249            ));
250        }
251        if revents.contains(PollFlags::POLLIN) {
252            return Ok(());
253        }
254    }
255}
256
257type AppCb = Arc<dyn Fn(&[u8], &mut dyn Write) -> i32 + Send + Sync + 'static>;
258
259struct WorkQueue {
260    fifo: Mutex<VecDeque<UnixStream>>,
261    cv: Condvar,
262    shutdown_requested: AtomicBool,
263    capacity: usize,
264}
265
266impl WorkQueue {
267    fn new(capacity: usize) -> Self {
268        Self {
269            fifo: Mutex::new(VecDeque::new()),
270            cv: Condvar::new(),
271            shutdown_requested: AtomicBool::new(false),
272            capacity,
273        }
274    }
275
276    fn enqueue(&self, stream: UnixStream) {
277        let mut guard = self.fifo.lock().unwrap_or_else(|e| e.into_inner());
278        if self.shutdown_requested.load(Ordering::SeqCst) {
279            return;
280        }
281        if guard.len() >= self.capacity {
282            return;
283        }
284        guard.push_back(stream);
285        self.cv.notify_one();
286    }
287
288    fn dequeue(&self) -> Option<UnixStream> {
289        let mut guard = self.fifo.lock().unwrap_or_else(|e| e.into_inner());
290        loop {
291            if let Some(s) = guard.pop_front() {
292                return Some(s);
293            }
294            if self.shutdown_requested.load(Ordering::SeqCst) {
295                return None;
296            }
297            guard = self.cv.wait(guard).unwrap_or_else(|e| e.into_inner());
298        }
299    }
300
301    fn stop(&self) {
302        self.shutdown_requested.store(true, Ordering::SeqCst);
303        let mut guard = self.fifo.lock().unwrap_or_else(|e| e.into_inner());
304        guard.clear();
305        drop(guard);
306        self.cv.notify_all();
307    }
308}
309
310fn serve_one_connection(
311    mut stream: UnixStream,
312    app: AppCb,
313    server_shutdown: Arc<AtomicBool>,
314    wake: Arc<Mutex<UnixStream>>,
315    queue: Arc<WorkQueue>,
316) {
317    if wait_for_io_start(&stream, &server_shutdown).is_err() {
318        let _ = stream.shutdown(Shutdown::Both);
319        return;
320    }
321    let request = match read_packetized_to_end(&mut stream) {
322        Ok(r) => r,
323        Err(_) => {
324            let _ = stream.shutdown(Shutdown::Both);
325            return;
326        }
327    };
328    let ret = app(&request, &mut stream);
329    let _ = packet_flush_gently(&mut stream);
330    let _ = stream.shutdown(Shutdown::Both);
331    if ret == SIMPLE_IPC_QUIT {
332        server_shutdown.store(true, Ordering::SeqCst);
333        queue.stop();
334        if let Ok(mut tx) = wake.lock() {
335            let _ = tx.write_all(b"Q");
336        }
337    }
338}
339
340#[derive(Debug)]
341pub enum ServerRunError {
342    Io(io::Error),
343    AddressInUse,
344}
345
346impl std::fmt::Display for ServerRunError {
347    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
348        match self {
349            ServerRunError::Io(e) => write!(f, "{e}"),
350            ServerRunError::AddressInUse => write!(f, "socket path already in use"),
351        }
352    }
353}
354
355impl std::error::Error for ServerRunError {
356    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
357        match self {
358            ServerRunError::Io(e) => Some(e),
359            ServerRunError::AddressInUse => None,
360        }
361    }
362}
363
364fn try_bind_server(path: &Path) -> io::Result<()> {
365    let lock_path = path.with_extension("lock");
366    let _ = OpenOptions::new()
367        .write(true)
368        .create(true)
369        .truncate(true)
370        .open(&lock_path);
371
372    if path.exists() {
373        if is_socket(path) && unix_stream_connect(path, false).is_ok() {
374            let _ = std::fs::remove_file(&lock_path);
375            return Err(io::Error::new(
376                io::ErrorKind::AddrInUse,
377                "another server is listening",
378            ));
379        }
380        let _ = std::fs::remove_file(path);
381    }
382
383    let _ = std::fs::remove_file(&lock_path);
384    Ok(())
385}
386
387fn is_socket(path: &Path) -> bool {
388    std::fs::symlink_metadata(path)
389        .ok()
390        .is_some_and(|m| m.file_type().is_socket())
391}
392
393/// Blocking IPC server (`run-daemon`).
394pub fn ipc_server_run(path: &Path, nr_threads: usize, app: AppCb) -> Result<(), ServerRunError> {
395    try_bind_server(path).map_err(|e| {
396        if e.kind() == io::ErrorKind::AddrInUse {
397            ServerRunError::AddressInUse
398        } else {
399            ServerRunError::Io(e)
400        }
401    })?;
402
403    let listener = UnixListener::bind(path).map_err(ServerRunError::Io)?;
404    listener.set_nonblocking(true).map_err(ServerRunError::Io)?;
405
406    let nr_threads = nr_threads.max(1);
407    let capacity = nr_threads.saturating_mul(100).max(1);
408    let queue = Arc::new(WorkQueue::new(capacity));
409    let server_shutdown = Arc::new(AtomicBool::new(false));
410
411    let (shutdown_tx, shutdown_rx) = UnixStream::pair().map_err(ServerRunError::Io)?;
412    shutdown_rx
413        .set_nonblocking(true)
414        .map_err(ServerRunError::Io)?;
415    let wake = Arc::new(Mutex::new(shutdown_tx));
416
417    let mut worker_handles = Vec::new();
418    for _ in 0..nr_threads {
419        let q = Arc::clone(&queue);
420        let app_w = Arc::clone(&app);
421        let shut = Arc::clone(&server_shutdown);
422        let wake_w = Arc::clone(&wake);
423        let q_for_worker = Arc::clone(&queue);
424        worker_handles.push(thread::spawn(move || {
425            block_sigpipe();
426            while let Some(stream) = q.dequeue() {
427                serve_one_connection(
428                    stream,
429                    app_w.clone(),
430                    shut.clone(),
431                    wake_w.clone(),
432                    q_for_worker.clone(),
433                );
434            }
435        }));
436    }
437
438    block_sigpipe();
439    use nix::poll::{poll, PollFd, PollFlags};
440    use std::os::fd::AsFd;
441
442    loop {
443        if server_shutdown.load(Ordering::SeqCst) {
444            break;
445        }
446        // fds[0] = shutdown pipe, fds[1] = listening socket.
447        let mut fds = [
448            PollFd::new(shutdown_rx.as_fd(), PollFlags::POLLIN),
449            PollFd::new(listener.as_fd(), PollFlags::POLLIN),
450        ];
451        match poll(&mut fds, 60_000u16) {
452            Ok(_) => {}
453            Err(nix::errno::Errno::EINTR) => continue,
454            Err(_) => break,
455        }
456        let revents0 = fds[0].revents().unwrap_or_else(PollFlags::empty);
457        let revents1 = fds[1].revents().unwrap_or_else(PollFlags::empty);
458        if revents0.contains(PollFlags::POLLIN) {
459            break;
460        }
461        if revents1.contains(PollFlags::POLLIN) {
462            loop {
463                match listener.accept() {
464                    Ok((stream, _)) => {
465                        let _ = stream.set_nonblocking(false);
466                        queue.enqueue(stream);
467                    }
468                    Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
469                    Err(_) => break,
470                }
471            }
472        }
473    }
474
475    queue.stop();
476    drop(listener);
477    for h in worker_handles {
478        let _ = h.join();
479    }
480    let _ = std::fs::remove_file(path);
481    Ok(())
482}
483
484/// Test daemon command handler providing Git-compatible test-IPC behavior.
485#[must_use]
486pub fn test_app_callback() -> AppCb {
487    Arc::new(|request: &[u8], reply: &mut dyn Write| {
488        if request == b"quit" {
489            return SIMPLE_IPC_QUIT;
490        }
491        if request == b"ping" {
492            let _ = write_packetized_from_buf(reply, b"pong");
493            return 0;
494        }
495        if request == b"big" {
496            let mut line = Vec::with_capacity(84);
497            for row in 0..10_000 {
498                line.clear();
499                use std::io::Write as _;
500                let _ = writeln!(&mut line, "big: {:075}", row);
501                let _ = write_packetized_from_buf(reply, &line);
502            }
503            return 0;
504        }
505        if request == b"chunk" {
506            let mut line = Vec::with_capacity(84);
507            for row in 0..10_000 {
508                line.clear();
509                use std::io::Write as _;
510                let _ = writeln!(&mut line, "big: {:075}", row);
511                let _ = write_packet(reply, &line);
512            }
513            return 0;
514        }
515        if request == b"slow" {
516            let mut line = Vec::with_capacity(84);
517            for row in 0..1000 {
518                line.clear();
519                use std::io::Write as _;
520                let _ = writeln!(&mut line, "big: {:075}", row);
521                let _ = write_packet(reply, &line);
522                thread::sleep(Duration::from_millis(10));
523            }
524            return 0;
525        }
526        if request.len() >= 10 && request.starts_with(b"sendbytes ") {
527            return handle_sendbytes(request, reply);
528        }
529        let msg = format!("unhandled command: {}", String::from_utf8_lossy(request));
530        let _ = write_packetized_from_buf(reply, msg.as_bytes());
531        0
532    })
533}
534
535fn handle_sendbytes(request: &[u8], reply: &mut dyn Write) -> i32 {
536    let rest = &request[b"sendbytes ".len()..];
537    if rest.is_empty() {
538        return 0;
539    }
540    let b0 = rest[0];
541    let mut errs = 0usize;
542    for &b in &rest[1..] {
543        if b != b0 {
544            errs += 1;
545        }
546    }
547    if errs > 0 {
548        let msg = format!("errs:{errs}\n");
549        let _ = write_packetized_from_buf(reply, msg.as_bytes());
550    } else {
551        let msg = format!("rcvd:{}{:08}\n", char::from(b0), rest.len());
552        let _ = write_packetized_from_buf(reply, msg.as_bytes());
553    }
554    0
555}
556
557/// `test-tool simple-ipc` entry (Unix). Returns exit code.
558pub fn run_simple_ipc_tool(args: &[String]) -> i32 {
559    if args.first().map(|s| s.as_str()) == Some("SUPPORTS_SIMPLE_IPC") {
560        return 0;
561    }
562    if args.is_empty() {
563        eprintln!("usage: test-tool simple-ipc <subcommand> ...");
564        return 1;
565    }
566
567    let mut path = PathBuf::from("ipc-test");
568    let mut nr_threads = 5usize;
569    let mut max_wait_sec = 60u64;
570    let mut bytecount = 1024usize;
571    let mut batchsize = 10usize;
572    let mut token: Option<String> = None;
573    let mut bytevalue: u8 = b'x';
574
575    let sub = args[0].clone();
576    let mut i = 1usize;
577    while i < args.len() {
578        let a = args[i].as_str();
579        if let Some(v) = a.strip_prefix("--name=") {
580            path = PathBuf::from(v);
581        } else if let Some(v) = a.strip_prefix("--threads=") {
582            nr_threads = v.parse().unwrap_or(1).max(1);
583        } else if let Some(v) = a.strip_prefix("--max-wait=") {
584            max_wait_sec = v.parse().unwrap_or(0);
585        } else if let Some(v) = a.strip_prefix("--bytecount=") {
586            bytecount = v.parse().unwrap_or(1).max(1);
587        } else if let Some(v) = a.strip_prefix("--batchsize=") {
588            batchsize = v.parse().unwrap_or(1).max(1);
589        } else if let Some(v) = a.strip_prefix("--token=") {
590            token = Some(v.to_string());
591        } else if let Some(v) = a.strip_prefix("--byte=") {
592            if let Some(c) = v.as_bytes().first() {
593                bytevalue = *c;
594            }
595        }
596        i += 1;
597    }
598
599    match sub.as_str() {
600        "is-active" => match ipc_get_active_state(&path) {
601            IpcActiveState::Listening => 0,
602            IpcActiveState::NotListening => {
603                eprintln!("no server listening at '{}'", path.display());
604                1
605            }
606            IpcActiveState::PathNotFound => {
607                eprintln!("path not found '{}'", path.display());
608                1
609            }
610            IpcActiveState::InvalidPath => {
611                eprintln!("invalid pipe/socket name '{}'", path.display());
612                1
613            }
614            IpcActiveState::OtherError => {
615                eprintln!("other error for '{}'", path.display());
616                1
617            }
618        },
619        "run-daemon" => {
620            let app = test_app_callback();
621            match ipc_server_run(&path, nr_threads, app) {
622                Ok(()) => 0,
623                Err(ServerRunError::AddressInUse) => {
624                    eprintln!("socket/pipe already in use: '{}'", path.display());
625                    1
626                }
627                Err(ServerRunError::Io(e)) => {
628                    eprintln!("could not start server on '{}': {e}", path.display());
629                    1
630                }
631            }
632        }
633        "start-daemon" => match spawn_daemon(&path, nr_threads, max_wait_sec) {
634            Ok(()) => 0,
635            Err(e) => {
636                eprintln!("{e}");
637                1
638            }
639        },
640        "stop-daemon" => {
641            if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
642                eprintln!("no server listening at '{}'", path.display());
643                return 1;
644            }
645            let opts = IpcClientConnectOptions {
646                wait_if_busy: true,
647                wait_if_not_found: false,
648                uds_disallow_chdir: false,
649            };
650            if ipc_client_send_command(&path, &opts, b"quit").is_err() {
651                return 1;
652            }
653            let deadline = SystemTime::now()
654                .duration_since(UNIX_EPOCH)
655                .unwrap_or_default()
656                .as_secs()
657                + max_wait_sec;
658            loop {
659                if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
660                    return 0;
661                }
662                let now = SystemTime::now()
663                    .duration_since(UNIX_EPOCH)
664                    .unwrap_or_default()
665                    .as_secs();
666                if now > deadline {
667                    eprintln!("daemon has not shutdown yet");
668                    return 1;
669                }
670                thread::sleep(Duration::from_millis(100));
671            }
672        }
673        "send" => {
674            if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
675                eprintln!("no server listening at '{}'", path.display());
676                return 1;
677            }
678            let cmd = token.as_deref().unwrap_or("(no-command)");
679            let opts = IpcClientConnectOptions {
680                wait_if_busy: true,
681                wait_if_not_found: false,
682                uds_disallow_chdir: false,
683            };
684            match ipc_client_send_command(&path, &opts, cmd.as_bytes()) {
685                Ok(resp) => {
686                    if !resp.is_empty() {
687                        println!("{}", String::from_utf8_lossy(&resp).trim_end());
688                    }
689                    0
690                }
691                Err(_) => {
692                    eprintln!("failed to send '{cmd}' to '{}'", path.display());
693                    1
694                }
695            }
696        }
697        "sendbytes" => {
698            if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
699                eprintln!("no server listening at '{}'", path.display());
700                return 1;
701            }
702            let mut msg = b"sendbytes ".to_vec();
703            msg.extend(std::iter::repeat_n(bytevalue, bytecount));
704            let opts = IpcClientConnectOptions {
705                wait_if_busy: true,
706                wait_if_not_found: false,
707                uds_disallow_chdir: false,
708            };
709            match ipc_client_send_command(&path, &opts, &msg) {
710                Ok(resp) => {
711                    let tail = String::from_utf8_lossy(&resp);
712                    let tail = tail.trim_end();
713                    println!("sent:{}{:08} {tail}", char::from(bytevalue), bytecount);
714                    0
715                }
716                Err(_) => 1,
717            }
718        }
719        "multiple" => {
720            if !matches!(ipc_get_active_state(&path), IpcActiveState::Listening) {
721                eprintln!("no server listening at '{}'", path.display());
722                return 1;
723            }
724            run_multiple(&path, nr_threads, bytecount, batchsize)
725        }
726        _ => {
727            eprintln!("Unhandled subcommand: '{sub}'");
728            1
729        }
730    }
731}
732
733fn spawn_daemon(path: &Path, nr_threads: usize, max_wait_sec: u64) -> Result<(), String> {
734    use std::process::{Command, Stdio};
735    let exe = std::env::current_exe().map_err(|e| e.to_string())?;
736    let mut cmd = Command::new(exe);
737    cmd.arg("test-tool")
738        .arg("simple-ipc")
739        .arg("run-daemon")
740        .arg(format!("--name={}", path.display()))
741        .arg(format!("--threads={nr_threads}"))
742        .stdin(Stdio::null())
743        .stdout(Stdio::null())
744        .stderr(Stdio::null());
745    cmd.spawn().map_err(|e| e.to_string())?;
746    let deadline = SystemTime::now()
747        .duration_since(UNIX_EPOCH)
748        .map_err(|e| e.to_string())?
749        .as_secs()
750        + max_wait_sec.max(1);
751    loop {
752        if matches!(ipc_get_active_state(path), IpcActiveState::Listening) {
753            return Ok(());
754        }
755        let now = SystemTime::now()
756            .duration_since(UNIX_EPOCH)
757            .map_err(|e| e.to_string())?
758            .as_secs();
759        if now > deadline {
760            return Err("daemon not online yet".to_string());
761        }
762        thread::sleep(Duration::from_millis(50));
763    }
764}
765
766fn run_multiple(path: &Path, nr_threads: usize, bytecount: usize, batchsize: usize) -> i32 {
767    use std::sync::atomic::{AtomicUsize, Ordering as AOrd};
768    let sum_errors = Arc::new(AtomicUsize::new(0));
769    let sum_good = Arc::new(AtomicUsize::new(0));
770    let sum_join_errors = Arc::new(AtomicUsize::new(0));
771    let mut handles = Vec::new();
772    for k in 0..nr_threads {
773        let p = path.to_path_buf();
774        let letter = (b'A' + (k % 26) as u8) as char;
775        let base_count = bytecount + batchsize * (k / 26);
776        let batch = batchsize;
777        let sg = Arc::clone(&sum_good);
778        let se = Arc::clone(&sum_errors);
779        handles.push(thread::spawn(move || {
780            for t in 0..batch {
781                let n = base_count + t;
782                let mut msg = b"sendbytes ".to_vec();
783                msg.extend(std::iter::repeat_n(letter as u8, n));
784                let opts = IpcClientConnectOptions {
785                    wait_if_busy: true,
786                    wait_if_not_found: false,
787                    uds_disallow_chdir: true,
788                };
789                match ipc_client_send_command(&p, &opts, &msg) {
790                    Ok(resp) => {
791                        let tail = String::from_utf8_lossy(&resp);
792                        let tail = tail.trim_end();
793                        println!("sent:{}{:08} {tail}", letter, n);
794                        sg.fetch_add(1, AOrd::SeqCst);
795                    }
796                    Err(_) => {
797                        se.fetch_add(1, AOrd::SeqCst);
798                    }
799                }
800            }
801        }));
802    }
803    for h in handles {
804        if h.join().is_err() {
805            sum_join_errors.fetch_add(1, AOrd::SeqCst);
806        }
807    }
808    let good = sum_good.load(AOrd::SeqCst);
809    let je = sum_join_errors.load(AOrd::SeqCst);
810    let err = sum_errors.load(AOrd::SeqCst);
811    println!("client (good {good}) (join {je}), (errors {err})");
812    if je + err > 0 {
813        1
814    } else {
815        0
816    }
817}