Skip to main content

sync_fd/
channel.rs

1use std::io::{self};
2use std::mem::MaybeUninit;
3use std::os::fd::BorrowedFd;
4use std::os::unix::io::{AsRawFd, RawFd};
5use std::sync::mpsc::{self, Receiver, Sender};
6use std::time::{Duration, Instant};
7
8use libc::FD_ZERO;
9
/// Error returned by [`MpscFdSender::blocking_send`].
///
/// The message that could not be delivered is handed back to the caller
/// inside the error.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SendError<T> {
    /// The send failed; the payload is the undelivered message.
    Disconnected(T),
}
14
/// Error returned by [`MpscFdSender::try_send`].
///
/// Both variants hand the unsent message back to the caller.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TrySendError<T> {
    /// The pipe could not take another wake-up byte without blocking.
    Full(T),
    /// The message could not be delivered to the receiving half.
    Disconnected(T),
}
20
21impl<T> From<mpsc::SendError<T>> for SendError<T> {
22    fn from(err: mpsc::SendError<T>) -> Self {
23        SendError::Disconnected(err.0)
24    }
25}
26
/// Error returned by [`MpscFdReceiver::recv`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RecvError {
    /// No message could be received and none will arrive.
    Disconnected,
}
31
32impl From<mpsc::RecvError> for RecvError {
33    fn from(_: mpsc::RecvError) -> Self {
34        RecvError::Disconnected
35    }
36}
37
/// Error returned by [`MpscFdReceiver::try_recv`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TryRecvError {
    /// No message is available right now.
    Empty,
    /// No message is available and none will arrive.
    Disconnected,
}
43
44impl From<mpsc::TryRecvError> for TryRecvError {
45    fn from(err: mpsc::TryRecvError) -> Self {
46        match err {
47            mpsc::TryRecvError::Empty => TryRecvError::Empty,
48            mpsc::TryRecvError::Disconnected => TryRecvError::Disconnected,
49        }
50    }
51}
52
/// Error returned by [`MpscFdReceiver::recv_timeout`].
///
/// Derives the same traits as the other error enums in this module so it can
/// be compared, copied and printed (e.g. in `assert_eq!` or `unwrap_err()`).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum RecvTimeoutError {
    /// The timeout elapsed before a message could be received.
    Timeout,
    /// No message could be received and none will arrive.
    Disconnected,
}
57
58impl From<mpsc::RecvTimeoutError> for RecvTimeoutError {
59    fn from(err: mpsc::RecvTimeoutError) -> Self {
60        match err {
61            mpsc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
62            mpsc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
63        }
64    }
65}
66
/// Sender half of an MpscFd pair.
///
/// Pairs a `std::sync::mpsc` sender with the write end of a pipe; the send
/// methods write one byte to the pipe for every message they queue.
pub struct MpscFdSender<T> {
    /// Channel half that carries the actual messages.
    sender: Sender<T>,
    /// Write end of the pipe used to signal the receiver's file descriptor.
    pipe_write: std::io::PipeWriter,
}
72
/// Receiver half of an MpscFd pair.
///
/// Pairs a `std::sync::mpsc` receiver with the read end of a pipe; the receive
/// methods read bytes from the pipe as they take messages from the channel.
pub struct MpscFdReceiver<T> {
    /// Channel half that yields the actual messages.
    receiver: Receiver<T>,
    /// Initialized to `true` by `mpsc_fd_pair`.
    // NOTE(review): no code visible in this file reads this field — confirm
    // whether it is still needed.
    blocking: bool,
    /// Read end of the pipe; this is the descriptor exposed for polling.
    pipe_read: std::io::PipeReader,
}
79
80/// Create a new MpscFd pair (sender and receiver). This acts like a
81/// `std::sync::mpsc` channel, but with a file descriptor associated with each
82/// end. The file descriptor may be polled with `libc::poll` or `libc::select`
83/// to wait for data to be available alongside other file descriptors.
84///
85/// One byte is written to the pipe for each message sent. One byte is read from
86/// the pipe for each message received.
87///
88/// The underlying implementation currently uses mpsc channels and pipes, but
89/// this may change in the future.
90///
91/// The underlying file descriptor is useful for the receiver
92pub fn mpsc_fd_pair<T>() -> io::Result<(MpscFdSender<T>, MpscFdReceiver<T>)> {
93    let (sender, receiver) = mpsc::channel();
94    let (pipe_read, pipe_write) = std::io::pipe()?;
95
96    set_nonblocking(pipe_write.as_raw_fd())?;
97    set_nonblocking(pipe_read.as_raw_fd())?;
98
99    Ok((
100        MpscFdSender { sender, pipe_write },
101        MpscFdReceiver {
102            receiver,
103            blocking: true,
104            pipe_read,
105        },
106    ))
107}
108
109fn set_nonblocking(fd: RawFd) -> io::Result<()> {
110    let flags = libc_fcntl(fd, libc::F_GETFL)?;
111    let _ = libc_fcntl_int(fd, libc::F_SETFL, flags | libc::O_NONBLOCK)?;
112    Ok(())
113}
114
115fn set_blocking(fd: RawFd) -> io::Result<()> {
116    let flags = libc_fcntl(fd, libc::F_GETFL)?;
117    let _ = libc_fcntl_int(fd, libc::F_SETFL, flags & !libc::O_NONBLOCK)?;
118    Ok(())
119}
120
121fn libc_fcntl(fd: RawFd, request: std::ffi::c_int) -> io::Result<std::ffi::c_int> {
122    loop {
123        let result = unsafe { libc::fcntl(fd, request) };
124        if result == -1 {
125            let err = io::Error::last_os_error();
126            if err.kind() == io::ErrorKind::Interrupted {
127                continue;
128            }
129            break Err(err);
130        }
131        break Ok(result);
132    }
133}
134
135fn libc_fcntl_int(fd: RawFd, request: std::ffi::c_int, arg: std::ffi::c_int) -> io::Result<()> {
136    loop {
137        let result = unsafe { libc::fcntl(fd, request, arg) };
138        if result == -1 {
139            let err = io::Error::last_os_error();
140            if err.kind() == io::ErrorKind::Interrupted {
141                continue;
142            }
143            break Err(err);
144        }
145        break Ok(());
146    }
147}
148
149fn libc_read(fd: RawFd, buf: &mut [u8]) -> io::Result<usize> {
150    loop {
151        let result = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) };
152        if result <= -1 {
153            let err = io::Error::last_os_error();
154            if err.kind() == io::ErrorKind::Interrupted {
155                continue;
156            }
157            break Err(err);
158        }
159        break Ok(result as usize);
160    }
161}
162
163fn libc_write(fd: RawFd, buf: &[u8]) -> io::Result<usize> {
164    loop {
165        let result = unsafe { libc::write(fd, buf.as_ptr() as *const _, buf.len()) };
166        if result <= -1 {
167            let err = io::Error::last_os_error();
168            if err.kind() == io::ErrorKind::Interrupted {
169                continue;
170            }
171            break Err(err);
172        }
173        break Ok(result as usize);
174    }
175}
176
/// A `libc::fd_set` together with the `nfds` value to pass to `select()`.
struct FdSet {
    /// The descriptor set itself; zeroed with `FD_ZERO` in `FdSet::new`.
    fdset: MaybeUninit<libc::fd_set>,
    /// One more than the highest descriptor added so far (0 when empty).
    max: RawFd,
}
181
182impl FdSet {
183    fn new() -> Self {
184        let mut fdset = MaybeUninit::uninit();
185        unsafe {
186            FD_ZERO(fdset.as_mut_ptr());
187        }
188
189        Self { fdset, max: 0 }
190    }
191
192    unsafe fn copy(&self, fds: *mut libc::fd_set) {
193        unsafe {
194            libc::memcpy(
195                fds as *mut _,
196                self.fdset.as_ptr() as *const _,
197                std::mem::size_of::<libc::fd_set>(),
198            )
199        };
200    }
201
202    fn set(&mut self, fd: RawFd) {
203        self.max = self.max.max(fd + 1);
204        unsafe {
205            libc::FD_SET(fd, self.fdset.as_mut_ptr());
206        }
207    }
208
209    fn is_set(&self, fd: RawFd) -> bool {
210        unsafe { libc::FD_ISSET(fd, self.fdset.as_ptr()) }
211    }
212}
213
214/// `select()` returns the number of ready descriptors that are contained in the
215/// descriptor sets,.  If the time limit expires, `select()` returns 0.
216fn libc_select(
217    read: Option<&FdSet>,
218    write: Option<&FdSet>,
219    error: Option<&FdSet>,
220    timeout: std::time::Duration,
221) -> io::Result<usize> {
222    let mut readfds = read.map(|_| MaybeUninit::uninit());
223    let read_ptr = readfds
224        .as_mut()
225        .map(|r| r.as_mut_ptr())
226        .unwrap_or(std::ptr::null_mut());
227    let mut writefds = write.map(|_| MaybeUninit::uninit());
228    let write_ptr = writefds
229        .as_mut()
230        .map(|w| w.as_mut_ptr())
231        .unwrap_or(std::ptr::null_mut());
232    let mut errorfds = error.map(|_| MaybeUninit::uninit());
233    let error_ptr = errorfds
234        .as_mut()
235        .map(|e| e.as_mut_ptr())
236        .unwrap_or(std::ptr::null_mut());
237    let max = read
238        .map(|r| r.max)
239        .max(write.map(|w| w.max))
240        .max(error.map(|e| e.max))
241        .unwrap_or(0);
242
243    loop {
244        unsafe {
245            if let Some(read) = read {
246                read.copy(read_ptr);
247            }
248            if let Some(write) = write {
249                write.copy(write_ptr);
250            }
251            if let Some(error) = error {
252                error.copy(error_ptr);
253            }
254
255            let mut timeout = libc::timeval {
256                tv_sec: timeout.as_secs() as _,
257                tv_usec: timeout.subsec_micros() as _,
258            };
259
260            let result = libc::select(max, read_ptr, write_ptr, error_ptr, &mut timeout);
261            if result <= -1 {
262                let err = io::Error::last_os_error();
263                if err.kind() == io::ErrorKind::Interrupted {
264                    continue;
265                }
266                return Err(io::Error::last_os_error());
267            }
268            break Ok(result as usize);
269        }
270    }
271}
272
273impl<T> MpscFdSender<T> {
274    /// Send a message through the mpsc channel and write a byte to the pipe.
275    /// Fails immediately if the pipe is full.
276    pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
277        // Write a byte to the pipe using libc
278        let fd = self.pipe_write.as_raw_fd();
279        let buf = [0u8; 1];
280        loop {
281            let result = unsafe { libc::write(fd, buf.as_ptr() as *const _, 1) };
282            if result == -1 {
283                let err = io::Error::last_os_error();
284                if err.kind() == io::ErrorKind::WouldBlock {
285                    return Err(TrySendError::Full(msg));
286                }
287                if err.kind() == io::ErrorKind::Interrupted {
288                    continue;
289                }
290            }
291            if result != 1 {
292                return Err(TrySendError::Full(msg));
293            }
294
295            // If this fails, the receiver has hung up. We don't care about the
296            // pipe, it'll be closed.
297            self.sender
298                .send(msg)
299                .map_err(|msg| TrySendError::Disconnected(msg.0))?;
300            break;
301        }
302
303        Ok(())
304    }
305
306    /// Send a message through the mpsc channel and block until the message is
307    /// sent.
308    ///
309    /// This may not be fair in the case of multiple senders.
310    pub fn blocking_send(&mut self, msg: T) -> Result<(), SendError<T>> {
311        let fd = self.pipe_write.as_raw_fd();
312        let mut writefds = FdSet::new();
313        let mut errorfds = FdSet::new();
314
315        writefds.set(fd);
316        errorfds.set(fd);
317
318        loop {
319            // Wait for the pipe to be writable (or error)
320            match libc_select(
321                None,
322                Some(&writefds),
323                Some(&errorfds),
324                std::time::Duration::from_secs(60),
325            ) {
326                Ok(0) => continue,
327                Ok(_) => {}
328                Err(_) => return Err(SendError::Disconnected(msg)),
329            };
330
331            // We only have one FD so it's either writable or errored.
332
333            // Write a byte to the pipe using libc
334            let buf = [0u8; 1];
335            match libc_write(fd, &buf) {
336                Ok(1) => {}
337                Ok(_) => unreachable!("Should only write 1 byte"),
338                Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
339                Err(_) => return Err(SendError::Disconnected(msg)),
340            };
341
342            self.sender.send(msg)?;
343            break Ok(());
344        }
345    }
346
347    pub fn try_clone(&self) -> io::Result<Self> {
348        Ok(MpscFdSender {
349            sender: self.sender.clone(),
350            pipe_write: self.pipe_write.try_clone()?,
351        })
352    }
353}
354
impl<T> MpscFdReceiver<T> {
    /// Receive a message from the mpsc channel and read a byte from the pipe.
    ///
    /// Blocks on the channel first, then consumes the byte that accompanies
    /// the message, waiting on the pipe if that byte is not yet readable.
    ///
    /// # Errors
    ///
    /// Returns [`RecvError::Disconnected`] if the channel is disconnected, or
    /// if reading from or waiting on the pipe fails.
    pub fn recv(&mut self) -> Result<T, RecvError> {
        let msg = self.receiver.recv()?;

        // If we got a byte, or the pipe is closed, return this message
        if !matches!(
            self.read_byte_nonblocking()
                .map_err(|_| RecvError::Disconnected)?,
            ReadByteResult::WouldBlock
        ) {
            return Ok(msg);
        }

        loop {
            // Keep waiting for the pipe to be readable or failed
            self.select_timeout(Duration::from_secs(60 * 60))
                .map_err(|_| RecvError::Disconnected)?;
            if matches!(
                self.read_byte_nonblocking()
                    .map_err(|_| RecvError::Disconnected)?,
                ReadByteResult::WouldBlock
            ) {
                continue;
            }

            break Ok(msg);
        }
    }

    /// Try to receive a message without blocking.
    ///
    /// Readability of the fd is a hint that this will likely return a value, however
    /// this may return Empty and leave the fd readable in certain cases.
    ///
    /// # Errors
    ///
    /// Returns [`TryRecvError::Empty`] if no message is available and
    /// [`TryRecvError::Disconnected`] if the channel is disconnected.
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        // If we got a byte, we block on the channel
        if matches!(self.read_byte_nonblocking(), Ok(ReadByteResult::Success)) {
            // This may block slightly if the pipe was written but the task was
            // unscheduled.
            return self.receiver.recv().map_err(|_| TryRecvError::Disconnected);
        }

        // Otherwise it's Eof or WouldBlock (or a read error), so delegate to
        // try_recv on the channel
        Ok(self.receiver.try_recv()?)
    }

    /// Receive with timeout. Note that this may block slightly more than `timeout`
    /// if the pipe was written but the channel was not.
    ///
    /// Readability of the fd is a hint that this will likely return a value, however
    /// this may time out and leave the fd readable in certain cases.
    ///
    /// # Errors
    ///
    /// Returns [`RecvTimeoutError::Timeout`] if no message arrived in time and
    /// [`RecvTimeoutError::Disconnected`] if the channel is disconnected or
    /// the pipe could not be waited on or read.
    pub fn recv_timeout(&mut self, mut timeout: Duration) -> Result<T, RecvTimeoutError> {
        loop {
            // Measured per iteration: `timeout` is reduced by the time each
            // pass took before looping again.
            let start = Instant::now();
            if !self
                .select_timeout(timeout)
                .map_err(|_| RecvTimeoutError::Disconnected)?
            {
                return Err(RecvTimeoutError::Timeout);
            }

            match self.read_byte_nonblocking() {
                Ok(ReadByteResult::Success) => {
                    // This may block slightly more than `timeout` if the pipe was
                    // written but the channel was not due to task scheduling.
                    break self
                        .receiver
                        .recv()
                        .map_err(|_| RecvTimeoutError::Disconnected);
                }
                Ok(ReadByteResult::Eof) => {
                    // The pipe was closed, so delegate to the channel's recv_timeout
                    break Ok(self.receiver.recv_timeout(
                        timeout
                            .checked_sub(start.elapsed())
                            .unwrap_or(Duration::from_secs(0)),
                    )?);
                }
                Ok(ReadByteResult::WouldBlock) => {
                    // select() reported the fd ready but no byte could be
                    // read; retry with whatever time is left.
                    timeout = timeout
                        .checked_sub(start.elapsed())
                        .unwrap_or(Duration::from_secs(0));
                    if timeout == Duration::from_secs(0) {
                        return Err(RecvTimeoutError::Timeout);
                    }
                }
                Err(_) => return Err(RecvTimeoutError::Disconnected),
            }
        }
    }

    /// Waits up to `timeout` for the pipe's read end to be reported by
    /// `select()` in its read or error set.
    ///
    /// Returns `Ok(true)` if `select()` reported at least one ready
    /// descriptor, `Ok(false)` if it timed out, and
    /// `Err(TryRecvError::Disconnected)` if `select()` itself failed.
    fn select_timeout(&mut self, timeout: Duration) -> Result<bool, TryRecvError> {
        let mut readfds = FdSet::new();
        let mut errorfds = FdSet::new();

        let fd = self.pipe_read.as_raw_fd();
        readfds.set(fd);
        errorfds.set(fd);

        match libc_select(Some(&readfds), None, Some(&errorfds), timeout) {
            Ok(0) => Ok(false),
            Ok(_) => Ok(true),
            Err(_) => Err(TryRecvError::Disconnected),
        }
    }

    /// Attempts to read a single byte from the pipe and classifies the result.
    ///
    /// `Success` means one byte was read, `Eof` means the read returned zero
    /// bytes or failed with `BrokenPipe`, and `WouldBlock` means the read
    /// failed with `WouldBlock`. Any other read error is returned as `Err`.
    fn read_byte_nonblocking(&mut self) -> io::Result<ReadByteResult> {
        // Try to read a byte from the pipe
        let fd = self.pipe_read.as_raw_fd();
        let mut buf = [0u8; 1];
        Ok(match libc_read(fd, &mut buf) {
            // If the pipe is closed, we'll continue reading out of the channel
            Ok(1) => ReadByteResult::Success,
            Ok(0) => ReadByteResult::Eof,
            Err(e) if e.kind() == io::ErrorKind::BrokenPipe => ReadByteResult::Eof,

            Ok(_) => unreachable!("Should only read at most 1 byte"),
            Err(e) if e.kind() == io::ErrorKind::WouldBlock => ReadByteResult::WouldBlock,
            Err(e) => return Err(e),
        })
    }
}
477
/// Outcome of one attempt to read a single byte from the pipe, as produced by
/// `MpscFdReceiver::read_byte_nonblocking`.
enum ReadByteResult {
    /// One byte was read.
    Success,
    /// The read returned zero bytes or failed with `BrokenPipe`.
    Eof,
    /// The read failed with `WouldBlock`.
    WouldBlock,
}
483
484#[cfg(unix)]
485impl<T> AsRawFd for MpscFdReceiver<T> {
486    fn as_raw_fd(&self) -> RawFd {
487        self.pipe_read.as_raw_fd()
488    }
489}
490
491#[cfg(unix)]
492impl<T> AsRawFd for MpscFdSender<T> {
493    fn as_raw_fd(&self) -> RawFd {
494        self.pipe_write.as_raw_fd()
495    }
496}
497
498#[cfg(unix)]
499impl<'a, T> Into<BorrowedFd<'a>> for &'a MpscFdReceiver<T> {
500    fn into(self) -> BorrowedFd<'a> {
501        unsafe { BorrowedFd::borrow_raw(self.pipe_read.as_raw_fd() as _) }
502    }
503}
504
505#[cfg(unix)]
506impl<'a, T> Into<BorrowedFd<'a>> for &'a MpscFdSender<T> {
507    fn into(self) -> BorrowedFd<'a> {
508        unsafe { BorrowedFd::borrow_raw(self.pipe_write.as_raw_fd() as _) }
509    }
510}
511
512#[cfg(test)]
513mod tests {
514    use super::*;
515    use std::sync::{Arc, Mutex};
516    use std::thread;
517    use std::time::{Duration, Instant};
518
519    #[test]
520    fn test_mpsc_fd_pair() {
521        let (mut sender, mut receiver) = mpsc_fd_pair::<u32>().unwrap();
522
523        sender.blocking_send(1).unwrap();
524        assert_eq!(receiver.recv().unwrap(), 1);
525
526        sender.blocking_send(2).unwrap();
527        assert_eq!(receiver.recv().unwrap(), 2);
528
529        sender.blocking_send(3).unwrap();
530        assert_eq!(receiver.recv().unwrap(), 3);
531
532        drop(sender);
533        assert_eq!(receiver.recv().unwrap_err(), RecvError::Disconnected);
534    }
535
536    /// Torture test 1: Rapid switching between blocking and non-blocking receive
537    /// with multiple senders and receivers, testing edge cases around pipe buffer limits
538    #[test]
539    fn torture_test_rapid_mode_switching() {
540        let (mut sender1, mut receiver) = mpsc_fd_pair::<u32>().unwrap();
541        let mut sender2 = sender1.try_clone().unwrap();
542        let mut sender3 = sender1.try_clone().unwrap();
543
544        let start_time = Instant::now();
545
546        // Spawn receiver thread that rapidly switches between modes
547        let receiver_handle = thread::spawn(move || {
548            let mut local_received = Vec::new();
549            let mut mode_switch_count = 0;
550
551            while start_time.elapsed() < Duration::from_secs(5) {
552                // Switch modes every few iterations
553                if mode_switch_count % 10 == 0 {
554                    // Non-blocking mode
555                    loop {
556                        match receiver.try_recv() {
557                            Ok(msg) => local_received.push(msg),
558                            Err(TryRecvError::Empty) => break,
559                            Err(TryRecvError::Disconnected) => return local_received,
560                        }
561                    }
562                } else if mode_switch_count % 10 == 5 {
563                    // Timeout mode with very short timeout
564                    match receiver.recv_timeout(Duration::from_millis(1)) {
565                        Ok(msg) => local_received.push(msg),
566                        Err(RecvTimeoutError::Timeout) => {}
567                        Err(RecvTimeoutError::Disconnected) => return local_received,
568                    }
569                } else {
570                    // Blocking mode
571                    match receiver.recv() {
572                        Ok(msg) => local_received.push(msg),
573                        Err(RecvError::Disconnected) => return local_received,
574                    }
575                }
576                mode_switch_count += 1;
577            }
578            local_received
579        });
580
581        // Spawn multiple sender threads with different patterns
582        let sender1_handle = thread::spawn(move || {
583            for i in 0..1000 {
584                if let Err(TrySendError::Full(_)) = sender1.try_send(i) {
585                    // If pipe is full, use blocking send
586                    sender1.blocking_send(i).unwrap();
587                }
588                thread::sleep(Duration::from_micros(100));
589            }
590        });
591
592        let sender2_handle = thread::spawn(move || {
593            for i in 1000..2000 {
594                sender2.blocking_send(i).unwrap();
595                thread::sleep(Duration::from_micros(50));
596            }
597        });
598
599        let sender3_handle = thread::spawn(move || {
600            for i in 2000..3000 {
601                // Rapid fire with try_send
602                for j in 0..10 {
603                    let msg = i * 10 + j;
604                    if msg >= 3000 {
605                        break; // Don't send messages outside the expected range
606                    }
607                    if let Err(TrySendError::Full(_)) = sender3.try_send(msg) {
608                        break;
609                    }
610                }
611                thread::sleep(Duration::from_millis(1));
612            }
613        });
614
615        // Wait for all threads to complete
616        sender1_handle.join().unwrap();
617        sender2_handle.join().unwrap();
618        sender3_handle.join().unwrap();
619
620        let received_messages = receiver_handle.join().unwrap();
621
622        // Verify we received a substantial number of messages
623        assert!(
624            received_messages.len() > 100,
625            "Should receive many messages, got {}",
626            received_messages.len()
627        );
628
629        // Verify no duplicates and all messages are in expected ranges
630        let mut sorted = received_messages.clone();
631        sorted.sort();
632        sorted.dedup();
633        assert_eq!(
634            sorted.len(),
635            received_messages.len(),
636            "No duplicates should exist"
637        );
638
639        // Check that messages are from expected ranges
640        for &msg in &received_messages {
641            assert!(msg < 3000, "Message {} is out of expected range", msg);
642        }
643    }
644
    /// Torture test 2: Stress test with pipe buffer exhaustion and recovery
    /// Tests the channel under extreme pressure with rapid send/receive cycles
    ///
    /// One sender thread sends `iterations` regular messages with
    /// `blocking_send`, plus a small `try_send` burst every 100th iteration.
    /// One receiver thread cycles through `try_recv`, `recv_timeout` and
    /// `recv` until the sender disconnects. Every message counted as sent must
    /// be received.
    #[test]
    fn torture_test_pipe_buffer_stress() {
        let (mut sender, mut receiver) = mpsc_fd_pair::<String>().unwrap();

        let iterations = 1000; // Reduced for more reliable testing

        // Track sent messages to ensure all are sent
        let sent_count = Arc::new(Mutex::new(0));
        let sent_count_clone = sent_count.clone();
        let burst_count = Arc::new(Mutex::new(0));
        let burst_count_clone = burst_count.clone();

        // Receiver thread that switches between all three receive modes
        let receiver_handle = thread::spawn(move || {
            let mut local_received = Vec::new();
            let mut consecutive_empty = 0;

            // Keep receiving until sender disconnects
            loop {
                // Cycle through different receive modes; the mode is picked
                // from the number of messages received so far, so it only
                // changes when a message actually arrives.
                match local_received.len() % 3 {
                    0 => {
                        // Non-blocking mode
                        match receiver.try_recv() {
                            Ok(msg) => {
                                local_received.push(msg);
                                consecutive_empty = 0;
                            }
                            Err(TryRecvError::Empty) => {
                                consecutive_empty += 1;
                                if consecutive_empty > 100 {
                                    // Switch to blocking mode if too many empty reads
                                    match receiver.recv() {
                                        Ok(msg) => {
                                            local_received.push(msg);
                                            consecutive_empty = 0;
                                        }
                                        Err(RecvError::Disconnected) => break,
                                    }
                                }
                            }
                            Err(TryRecvError::Disconnected) => break,
                        }
                    }
                    1 => {
                        // Timeout mode with varying timeouts
                        let timeout = if local_received.len() % 100 == 0 {
                            Duration::from_millis(10)
                        } else {
                            Duration::from_micros(100)
                        };
                        match receiver.recv_timeout(timeout) {
                            Ok(msg) => {
                                local_received.push(msg);
                                consecutive_empty = 0;
                            }
                            Err(RecvTimeoutError::Timeout) => {
                                consecutive_empty += 1;
                            }
                            Err(RecvTimeoutError::Disconnected) => break,
                        }
                    }
                    2 => {
                        // Blocking mode
                        match receiver.recv() {
                            Ok(msg) => {
                                local_received.push(msg);
                                consecutive_empty = 0;
                            }
                            Err(RecvError::Disconnected) => break,
                        }
                    }
                    _ => unreachable!(),
                }
            }
            local_received
        });

        // Sender thread that ensures all messages are sent. The sender is
        // moved into the closure, so it is dropped when this thread finishes.
        let sender_handle = thread::spawn(move || {
            for i in 0..iterations {
                let msg = format!("message-{}", i);

                // Always use blocking send to ensure message is sent
                match sender.blocking_send(msg) {
                    Ok(_) => {
                        *sent_count_clone.lock().unwrap() += 1;
                    }
                    Err(SendError::Disconnected(_)) => {
                        break; // Receiver disconnected, stop sending
                    }
                }

                // Occasionally create small bursts to stress the buffer
                if i % 100 == 0 {
                    for j in 0..5 {
                        // Reduced burst size
                        let burst_msg = format!("burst-{}-{}", i, j);
                        if let Err(TrySendError::Full(_)) = sender.try_send(burst_msg) {
                            // If pipe is full, skip this burst
                            break;
                        } else {
                            // Counted for every result other than `Full`.
                            *burst_count_clone.lock().unwrap() += 1;
                        }
                    }
                }
            }
        });

        sender_handle.join().unwrap();
        let received_messages = receiver_handle.join().unwrap();
        let final_sent_count = *sent_count.lock().unwrap();
        let final_burst_count = *burst_count.lock().unwrap();
        let total_sent = final_sent_count + final_burst_count;

        // Verify all messages were sent
        assert_eq!(
            final_sent_count, iterations,
            "All regular messages should be sent, sent: {}",
            final_sent_count
        );

        // Verify we received all sent messages
        assert_eq!(
            received_messages.len(),
            total_sent,
            "Should receive all sent messages, sent: {}, received: {}",
            total_sent,
            received_messages.len()
        );

        // Verify message integrity - check that we have the expected message types
        let mut message_count = 0;
        for msg in received_messages.iter() {
            if msg.starts_with("message-") {
                message_count += 1;
            }
        }

        // We should have received all regular messages
        assert_eq!(
            message_count, iterations,
            "Should receive all regular messages, got {}",
            message_count
        );
    }
793
794    /// Torture test 3: Concurrent access with multiple senders and single receiver
795    /// Tests race conditions and edge cases in multi-threaded scenarios
796    #[test]
797    fn torture_test_concurrent_access() {
798        let (sender, mut receiver) = mpsc_fd_pair::<u64>().unwrap();
799        let sender = Arc::new(Mutex::new(sender));
800
801        let num_senders = 5;
802        let messages_per_sender = 100; // Reduced for more reliable testing
803        let total_messages = num_senders * messages_per_sender;
804
805        // Track sent messages to ensure all are sent
806        let sent_count = Arc::new(Mutex::new(0));
807        let sent_count_clone = sent_count.clone();
808
809        // Spawn multiple sender threads
810        let sender_handles: Vec<_> = (0..num_senders)
811            .map(|sender_id| {
812                let sender = sender.clone();
813                let local_sent_count = sent_count_clone.clone();
814
815                thread::spawn(move || {
816                    let mut local_sender = sender.lock().unwrap().try_clone().unwrap();
817
818                    for i in 0..messages_per_sender {
819                        let msg = (sender_id as u64) * 10000 + i;
820
821                        // Always use blocking send to ensure message is sent
822                        match local_sender.blocking_send(msg) {
823                            Ok(_) => {
824                                *local_sent_count.lock().unwrap() += 1;
825                            }
826                            Err(SendError::Disconnected(_)) => {
827                                break; // Receiver disconnected, stop sending
828                            }
829                        }
830
831                        // Small delay to create interleaving
832                        thread::sleep(Duration::from_micros(10));
833                    }
834                })
835            })
836            .collect();
837
838        // Single receiver thread that switches between different receive modes
839        let receiver_handle = thread::spawn(move || {
840            let mut all_received = Vec::new();
841            let mut mode_counter = 0;
842            let start_time = Instant::now();
843
844            // Keep receiving until all senders disconnect or timeout
845            while start_time.elapsed() < Duration::from_secs(10) {
846                // Cycle through different receive modes
847                match mode_counter % 4 {
848                    0 => {
849                        // Non-blocking mode
850                        match receiver.try_recv() {
851                            Ok(msg) => all_received.push(msg),
852                            Err(TryRecvError::Empty) => {
853                                thread::sleep(Duration::from_micros(100));
854                            }
855                            Err(TryRecvError::Disconnected) => break,
856                        }
857                    }
858                    1 => {
859                        // Short timeout mode
860                        match receiver.recv_timeout(Duration::from_millis(1)) {
861                            Ok(msg) => all_received.push(msg),
862                            Err(RecvTimeoutError::Timeout) => {}
863                            Err(RecvTimeoutError::Disconnected) => break,
864                        }
865                    }
866                    2 => {
867                        // Longer timeout mode
868                        match receiver.recv_timeout(Duration::from_millis(10)) {
869                            Ok(msg) => all_received.push(msg),
870                            Err(RecvTimeoutError::Timeout) => {}
871                            Err(RecvTimeoutError::Disconnected) => break,
872                        }
873                    }
874                    3 => {
875                        // Blocking mode with short timeout
876                        match receiver.recv_timeout(Duration::from_millis(100)) {
877                            Ok(msg) => all_received.push(msg),
878                            Err(RecvTimeoutError::Timeout) => {}
879                            Err(RecvTimeoutError::Disconnected) => break,
880                        }
881                    }
882                    _ => unreachable!(),
883                }
884                mode_counter += 1;
885            }
886
887            all_received
888        });
889
890        // Wait for all threads to complete
891        for handle in sender_handles {
892            handle.join().unwrap();
893        }
894
895        let all_received = receiver_handle.join().unwrap();
896        let final_sent_count = *sent_count.lock().unwrap();
897
898        // Verify all messages were sent
899        assert_eq!(
900            final_sent_count, total_messages as usize,
901            "All messages should be sent, sent: {}",
902            final_sent_count
903        );
904
905        // Verify we received all sent messages
906        assert_eq!(
907            all_received.len(),
908            final_sent_count,
909            "Should receive all sent messages, sent: {}, received: {}",
910            final_sent_count,
911            all_received.len()
912        );
913
914        // Verify message ranges
915        for &msg in all_received.iter() {
916            let sender_id = msg / 10000;
917            let msg_id = msg % 10000;
918            assert!(
919                sender_id < num_senders as u64,
920                "Invalid sender ID: {}",
921                sender_id
922            );
923            assert!(
924                msg_id < messages_per_sender as u64,
925                "Invalid message ID: {}",
926                msg_id
927            );
928        }
929
930        // Check for duplicates
931        let mut sorted = all_received.clone();
932        sorted.sort();
933        sorted.dedup();
934        assert_eq!(
935            sorted.len(),
936            all_received.len(),
937            "No duplicates should exist"
938        );
939    }
940}