1use std::io::{self};
2use std::mem::MaybeUninit;
3use std::os::fd::BorrowedFd;
4use std::os::unix::io::{AsRawFd, RawFd};
5use std::sync::mpsc::{self, Receiver, Sender};
6use std::time::{Duration, Instant};
7
8use libc::FD_ZERO;
9
10#[derive(Debug, PartialEq, Eq, Clone, Copy)]
11pub enum SendError<T> {
12 Disconnected(T),
13}
14
15#[derive(Debug, PartialEq, Eq, Clone, Copy)]
16pub enum TrySendError<T> {
17 Full(T),
18 Disconnected(T),
19}
20
21impl<T> From<mpsc::SendError<T>> for SendError<T> {
22 fn from(err: mpsc::SendError<T>) -> Self {
23 SendError::Disconnected(err.0)
24 }
25}
26
27#[derive(Debug, PartialEq, Eq, Clone, Copy)]
28pub enum RecvError {
29 Disconnected,
30}
31
32impl From<mpsc::RecvError> for RecvError {
33 fn from(_: mpsc::RecvError) -> Self {
34 RecvError::Disconnected
35 }
36}
37
38#[derive(Debug, PartialEq, Eq, Clone, Copy)]
39pub enum TryRecvError {
40 Empty,
41 Disconnected,
42}
43
44impl From<mpsc::TryRecvError> for TryRecvError {
45 fn from(err: mpsc::TryRecvError) -> Self {
46 match err {
47 mpsc::TryRecvError::Empty => TryRecvError::Empty,
48 mpsc::TryRecvError::Disconnected => TryRecvError::Disconnected,
49 }
50 }
51}
52
53pub enum RecvTimeoutError {
54 Timeout,
55 Disconnected,
56}
57
58impl From<mpsc::RecvTimeoutError> for RecvTimeoutError {
59 fn from(err: mpsc::RecvTimeoutError) -> Self {
60 match err {
61 mpsc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
62 mpsc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
63 }
64 }
65}
66
67pub struct MpscFdSender<T> {
69 sender: Sender<T>,
70 pipe_write: std::io::PipeWriter,
71}
72
73pub struct MpscFdReceiver<T> {
75 receiver: Receiver<T>,
76 blocking: bool,
77 pipe_read: std::io::PipeReader,
78}
79
80pub fn mpsc_fd_pair<T>() -> io::Result<(MpscFdSender<T>, MpscFdReceiver<T>)> {
93 let (sender, receiver) = mpsc::channel();
94 let (pipe_read, pipe_write) = std::io::pipe()?;
95
96 set_nonblocking(pipe_write.as_raw_fd())?;
97 set_nonblocking(pipe_read.as_raw_fd())?;
98
99 Ok((
100 MpscFdSender { sender, pipe_write },
101 MpscFdReceiver {
102 receiver,
103 blocking: true,
104 pipe_read,
105 },
106 ))
107}
108
109fn set_nonblocking(fd: RawFd) -> io::Result<()> {
110 let flags = libc_fcntl(fd, libc::F_GETFL)?;
111 let _ = libc_fcntl_int(fd, libc::F_SETFL, flags | libc::O_NONBLOCK)?;
112 Ok(())
113}
114
115fn set_blocking(fd: RawFd) -> io::Result<()> {
116 let flags = libc_fcntl(fd, libc::F_GETFL)?;
117 let _ = libc_fcntl_int(fd, libc::F_SETFL, flags & !libc::O_NONBLOCK)?;
118 Ok(())
119}
120
121fn libc_fcntl(fd: RawFd, request: std::ffi::c_int) -> io::Result<std::ffi::c_int> {
122 loop {
123 let result = unsafe { libc::fcntl(fd, request) };
124 if result == -1 {
125 let err = io::Error::last_os_error();
126 if err.kind() == io::ErrorKind::Interrupted {
127 continue;
128 }
129 break Err(err);
130 }
131 break Ok(result);
132 }
133}
134
135fn libc_fcntl_int(fd: RawFd, request: std::ffi::c_int, arg: std::ffi::c_int) -> io::Result<()> {
136 loop {
137 let result = unsafe { libc::fcntl(fd, request, arg) };
138 if result == -1 {
139 let err = io::Error::last_os_error();
140 if err.kind() == io::ErrorKind::Interrupted {
141 continue;
142 }
143 break Err(err);
144 }
145 break Ok(());
146 }
147}
148
149fn libc_read(fd: RawFd, buf: &mut [u8]) -> io::Result<usize> {
150 loop {
151 let result = unsafe { libc::read(fd, buf.as_mut_ptr() as *mut _, buf.len()) };
152 if result <= -1 {
153 let err = io::Error::last_os_error();
154 if err.kind() == io::ErrorKind::Interrupted {
155 continue;
156 }
157 break Err(err);
158 }
159 break Ok(result as usize);
160 }
161}
162
163fn libc_write(fd: RawFd, buf: &[u8]) -> io::Result<usize> {
164 loop {
165 let result = unsafe { libc::write(fd, buf.as_ptr() as *const _, buf.len()) };
166 if result <= -1 {
167 let err = io::Error::last_os_error();
168 if err.kind() == io::ErrorKind::Interrupted {
169 continue;
170 }
171 break Err(err);
172 }
173 break Ok(result as usize);
174 }
175}
176
177struct FdSet {
178 fdset: MaybeUninit<libc::fd_set>,
179 max: RawFd,
180}
181
182impl FdSet {
183 fn new() -> Self {
184 let mut fdset = MaybeUninit::uninit();
185 unsafe {
186 FD_ZERO(fdset.as_mut_ptr());
187 }
188
189 Self { fdset, max: 0 }
190 }
191
192 unsafe fn copy(&self, fds: *mut libc::fd_set) {
193 unsafe {
194 libc::memcpy(
195 fds as *mut _,
196 self.fdset.as_ptr() as *const _,
197 std::mem::size_of::<libc::fd_set>(),
198 )
199 };
200 }
201
202 fn set(&mut self, fd: RawFd) {
203 self.max = self.max.max(fd + 1);
204 unsafe {
205 libc::FD_SET(fd, self.fdset.as_mut_ptr());
206 }
207 }
208
209 fn is_set(&self, fd: RawFd) -> bool {
210 unsafe { libc::FD_ISSET(fd, self.fdset.as_ptr()) }
211 }
212}
213
214fn libc_select(
217 read: Option<&FdSet>,
218 write: Option<&FdSet>,
219 error: Option<&FdSet>,
220 timeout: std::time::Duration,
221) -> io::Result<usize> {
222 let mut readfds = read.map(|_| MaybeUninit::uninit());
223 let read_ptr = readfds
224 .as_mut()
225 .map(|r| r.as_mut_ptr())
226 .unwrap_or(std::ptr::null_mut());
227 let mut writefds = write.map(|_| MaybeUninit::uninit());
228 let write_ptr = writefds
229 .as_mut()
230 .map(|w| w.as_mut_ptr())
231 .unwrap_or(std::ptr::null_mut());
232 let mut errorfds = error.map(|_| MaybeUninit::uninit());
233 let error_ptr = errorfds
234 .as_mut()
235 .map(|e| e.as_mut_ptr())
236 .unwrap_or(std::ptr::null_mut());
237 let max = read
238 .map(|r| r.max)
239 .max(write.map(|w| w.max))
240 .max(error.map(|e| e.max))
241 .unwrap_or(0);
242
243 loop {
244 unsafe {
245 if let Some(read) = read {
246 read.copy(read_ptr);
247 }
248 if let Some(write) = write {
249 write.copy(write_ptr);
250 }
251 if let Some(error) = error {
252 error.copy(error_ptr);
253 }
254
255 let mut timeout = libc::timeval {
256 tv_sec: timeout.as_secs() as _,
257 tv_usec: timeout.subsec_micros() as _,
258 };
259
260 let result = libc::select(max, read_ptr, write_ptr, error_ptr, &mut timeout);
261 if result <= -1 {
262 let err = io::Error::last_os_error();
263 if err.kind() == io::ErrorKind::Interrupted {
264 continue;
265 }
266 return Err(io::Error::last_os_error());
267 }
268 break Ok(result as usize);
269 }
270 }
271}
272
273impl<T> MpscFdSender<T> {
274 pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
277 let fd = self.pipe_write.as_raw_fd();
279 let buf = [0u8; 1];
280 loop {
281 let result = unsafe { libc::write(fd, buf.as_ptr() as *const _, 1) };
282 if result == -1 {
283 let err = io::Error::last_os_error();
284 if err.kind() == io::ErrorKind::WouldBlock {
285 return Err(TrySendError::Full(msg));
286 }
287 if err.kind() == io::ErrorKind::Interrupted {
288 continue;
289 }
290 }
291 if result != 1 {
292 return Err(TrySendError::Full(msg));
293 }
294
295 self.sender
298 .send(msg)
299 .map_err(|msg| TrySendError::Disconnected(msg.0))?;
300 break;
301 }
302
303 Ok(())
304 }
305
306 pub fn blocking_send(&mut self, msg: T) -> Result<(), SendError<T>> {
311 let fd = self.pipe_write.as_raw_fd();
312 let mut writefds = FdSet::new();
313 let mut errorfds = FdSet::new();
314
315 writefds.set(fd);
316 errorfds.set(fd);
317
318 loop {
319 match libc_select(
321 None,
322 Some(&writefds),
323 Some(&errorfds),
324 std::time::Duration::from_secs(60),
325 ) {
326 Ok(0) => continue,
327 Ok(_) => {}
328 Err(_) => return Err(SendError::Disconnected(msg)),
329 };
330
331 let buf = [0u8; 1];
335 match libc_write(fd, &buf) {
336 Ok(1) => {}
337 Ok(_) => unreachable!("Should only write 1 byte"),
338 Err(e) if e.kind() == io::ErrorKind::WouldBlock => continue,
339 Err(_) => return Err(SendError::Disconnected(msg)),
340 };
341
342 self.sender.send(msg)?;
343 break Ok(());
344 }
345 }
346
347 pub fn try_clone(&self) -> io::Result<Self> {
348 Ok(MpscFdSender {
349 sender: self.sender.clone(),
350 pipe_write: self.pipe_write.try_clone()?,
351 })
352 }
353}
354
355impl<T> MpscFdReceiver<T> {
356 pub fn recv(&mut self) -> Result<T, RecvError> {
358 let msg = self.receiver.recv()?;
359
360 if !matches!(
362 self.read_byte_nonblocking()
363 .map_err(|_| RecvError::Disconnected)?,
364 ReadByteResult::WouldBlock
365 ) {
366 return Ok(msg);
367 }
368
369 loop {
370 self.select_timeout(Duration::from_secs(60 * 60))
372 .map_err(|_| RecvError::Disconnected)?;
373 if matches!(
374 self.read_byte_nonblocking()
375 .map_err(|_| RecvError::Disconnected)?,
376 ReadByteResult::WouldBlock
377 ) {
378 continue;
379 }
380
381 break Ok(msg);
382 }
383 }
384
385 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
390 if matches!(self.read_byte_nonblocking(), Ok(ReadByteResult::Success)) {
392 return self.receiver.recv().map_err(|_| TryRecvError::Disconnected);
395 }
396
397 Ok(self.receiver.try_recv()?)
399 }
400
401 pub fn recv_timeout(&mut self, mut timeout: Duration) -> Result<T, RecvTimeoutError> {
407 loop {
408 let start = Instant::now();
409 if !self
410 .select_timeout(timeout)
411 .map_err(|_| RecvTimeoutError::Disconnected)?
412 {
413 return Err(RecvTimeoutError::Timeout);
414 }
415
416 match self.read_byte_nonblocking() {
417 Ok(ReadByteResult::Success) => {
418 break self
421 .receiver
422 .recv()
423 .map_err(|_| RecvTimeoutError::Disconnected);
424 }
425 Ok(ReadByteResult::Eof) => {
426 break Ok(self.receiver.recv_timeout(
428 timeout
429 .checked_sub(start.elapsed())
430 .unwrap_or(Duration::from_secs(0)),
431 )?);
432 }
433 Ok(ReadByteResult::WouldBlock) => {
434 timeout = timeout
435 .checked_sub(start.elapsed())
436 .unwrap_or(Duration::from_secs(0));
437 if timeout == Duration::from_secs(0) {
438 return Err(RecvTimeoutError::Timeout);
439 }
440 }
441 Err(_) => return Err(RecvTimeoutError::Disconnected),
442 }
443 }
444 }
445
446 fn select_timeout(&mut self, timeout: Duration) -> Result<bool, TryRecvError> {
447 let mut readfds = FdSet::new();
448 let mut errorfds = FdSet::new();
449
450 let fd = self.pipe_read.as_raw_fd();
451 readfds.set(fd);
452 errorfds.set(fd);
453
454 match libc_select(Some(&readfds), None, Some(&errorfds), timeout) {
455 Ok(0) => Ok(false),
456 Ok(_) => Ok(true),
457 Err(_) => Err(TryRecvError::Disconnected),
458 }
459 }
460
461 fn read_byte_nonblocking(&mut self) -> io::Result<ReadByteResult> {
462 let fd = self.pipe_read.as_raw_fd();
464 let mut buf = [0u8; 1];
465 Ok(match libc_read(fd, &mut buf) {
466 Ok(1) => ReadByteResult::Success,
468 Ok(0) => ReadByteResult::Eof,
469 Err(e) if e.kind() == io::ErrorKind::BrokenPipe => ReadByteResult::Eof,
470
471 Ok(_) => unreachable!("Should only read at most 1 byte"),
472 Err(e) if e.kind() == io::ErrorKind::WouldBlock => ReadByteResult::WouldBlock,
473 Err(e) => return Err(e),
474 })
475 }
476}
477
478enum ReadByteResult {
479 Success,
480 Eof,
481 WouldBlock,
482}
483
484#[cfg(unix)]
485impl<T> AsRawFd for MpscFdReceiver<T> {
486 fn as_raw_fd(&self) -> RawFd {
487 self.pipe_read.as_raw_fd()
488 }
489}
490
491#[cfg(unix)]
492impl<T> AsRawFd for MpscFdSender<T> {
493 fn as_raw_fd(&self) -> RawFd {
494 self.pipe_write.as_raw_fd()
495 }
496}
497
498#[cfg(unix)]
499impl<'a, T> Into<BorrowedFd<'a>> for &'a MpscFdReceiver<T> {
500 fn into(self) -> BorrowedFd<'a> {
501 unsafe { BorrowedFd::borrow_raw(self.pipe_read.as_raw_fd() as _) }
502 }
503}
504
505#[cfg(unix)]
506impl<'a, T> Into<BorrowedFd<'a>> for &'a MpscFdSender<T> {
507 fn into(self) -> BorrowedFd<'a> {
508 unsafe { BorrowedFd::borrow_raw(self.pipe_write.as_raw_fd() as _) }
509 }
510}
511
512#[cfg(test)]
513mod tests {
514 use super::*;
515 use std::sync::{Arc, Mutex};
516 use std::thread;
517 use std::time::{Duration, Instant};
518
519 #[test]
520 fn test_mpsc_fd_pair() {
521 let (mut sender, mut receiver) = mpsc_fd_pair::<u32>().unwrap();
522
523 sender.blocking_send(1).unwrap();
524 assert_eq!(receiver.recv().unwrap(), 1);
525
526 sender.blocking_send(2).unwrap();
527 assert_eq!(receiver.recv().unwrap(), 2);
528
529 sender.blocking_send(3).unwrap();
530 assert_eq!(receiver.recv().unwrap(), 3);
531
532 drop(sender);
533 assert_eq!(receiver.recv().unwrap_err(), RecvError::Disconnected);
534 }
535
536 #[test]
539 fn torture_test_rapid_mode_switching() {
540 let (mut sender1, mut receiver) = mpsc_fd_pair::<u32>().unwrap();
541 let mut sender2 = sender1.try_clone().unwrap();
542 let mut sender3 = sender1.try_clone().unwrap();
543
544 let start_time = Instant::now();
545
546 let receiver_handle = thread::spawn(move || {
548 let mut local_received = Vec::new();
549 let mut mode_switch_count = 0;
550
551 while start_time.elapsed() < Duration::from_secs(5) {
552 if mode_switch_count % 10 == 0 {
554 loop {
556 match receiver.try_recv() {
557 Ok(msg) => local_received.push(msg),
558 Err(TryRecvError::Empty) => break,
559 Err(TryRecvError::Disconnected) => return local_received,
560 }
561 }
562 } else if mode_switch_count % 10 == 5 {
563 match receiver.recv_timeout(Duration::from_millis(1)) {
565 Ok(msg) => local_received.push(msg),
566 Err(RecvTimeoutError::Timeout) => {}
567 Err(RecvTimeoutError::Disconnected) => return local_received,
568 }
569 } else {
570 match receiver.recv() {
572 Ok(msg) => local_received.push(msg),
573 Err(RecvError::Disconnected) => return local_received,
574 }
575 }
576 mode_switch_count += 1;
577 }
578 local_received
579 });
580
581 let sender1_handle = thread::spawn(move || {
583 for i in 0..1000 {
584 if let Err(TrySendError::Full(_)) = sender1.try_send(i) {
585 sender1.blocking_send(i).unwrap();
587 }
588 thread::sleep(Duration::from_micros(100));
589 }
590 });
591
592 let sender2_handle = thread::spawn(move || {
593 for i in 1000..2000 {
594 sender2.blocking_send(i).unwrap();
595 thread::sleep(Duration::from_micros(50));
596 }
597 });
598
599 let sender3_handle = thread::spawn(move || {
600 for i in 2000..3000 {
601 for j in 0..10 {
603 let msg = i * 10 + j;
604 if msg >= 3000 {
605 break; }
607 if let Err(TrySendError::Full(_)) = sender3.try_send(msg) {
608 break;
609 }
610 }
611 thread::sleep(Duration::from_millis(1));
612 }
613 });
614
615 sender1_handle.join().unwrap();
617 sender2_handle.join().unwrap();
618 sender3_handle.join().unwrap();
619
620 let received_messages = receiver_handle.join().unwrap();
621
622 assert!(
624 received_messages.len() > 100,
625 "Should receive many messages, got {}",
626 received_messages.len()
627 );
628
629 let mut sorted = received_messages.clone();
631 sorted.sort();
632 sorted.dedup();
633 assert_eq!(
634 sorted.len(),
635 received_messages.len(),
636 "No duplicates should exist"
637 );
638
639 for &msg in &received_messages {
641 assert!(msg < 3000, "Message {} is out of expected range", msg);
642 }
643 }
644
645 #[test]
648 fn torture_test_pipe_buffer_stress() {
649 let (mut sender, mut receiver) = mpsc_fd_pair::<String>().unwrap();
650
651 let iterations = 1000; let sent_count = Arc::new(Mutex::new(0));
655 let sent_count_clone = sent_count.clone();
656 let burst_count = Arc::new(Mutex::new(0));
657 let burst_count_clone = burst_count.clone();
658
659 let receiver_handle = thread::spawn(move || {
661 let mut local_received = Vec::new();
662 let mut consecutive_empty = 0;
663
664 loop {
666 match local_received.len() % 3 {
668 0 => {
669 match receiver.try_recv() {
671 Ok(msg) => {
672 local_received.push(msg);
673 consecutive_empty = 0;
674 }
675 Err(TryRecvError::Empty) => {
676 consecutive_empty += 1;
677 if consecutive_empty > 100 {
678 match receiver.recv() {
680 Ok(msg) => {
681 local_received.push(msg);
682 consecutive_empty = 0;
683 }
684 Err(RecvError::Disconnected) => break,
685 }
686 }
687 }
688 Err(TryRecvError::Disconnected) => break,
689 }
690 }
691 1 => {
692 let timeout = if local_received.len() % 100 == 0 {
694 Duration::from_millis(10)
695 } else {
696 Duration::from_micros(100)
697 };
698 match receiver.recv_timeout(timeout) {
699 Ok(msg) => {
700 local_received.push(msg);
701 consecutive_empty = 0;
702 }
703 Err(RecvTimeoutError::Timeout) => {
704 consecutive_empty += 1;
705 }
706 Err(RecvTimeoutError::Disconnected) => break,
707 }
708 }
709 2 => {
710 match receiver.recv() {
712 Ok(msg) => {
713 local_received.push(msg);
714 consecutive_empty = 0;
715 }
716 Err(RecvError::Disconnected) => break,
717 }
718 }
719 _ => unreachable!(),
720 }
721 }
722 local_received
723 });
724
725 let sender_handle = thread::spawn(move || {
727 for i in 0..iterations {
728 let msg = format!("message-{}", i);
729
730 match sender.blocking_send(msg) {
732 Ok(_) => {
733 *sent_count_clone.lock().unwrap() += 1;
734 }
735 Err(SendError::Disconnected(_)) => {
736 break; }
738 }
739
740 if i % 100 == 0 {
742 for j in 0..5 {
743 let burst_msg = format!("burst-{}-{}", i, j);
745 if let Err(TrySendError::Full(_)) = sender.try_send(burst_msg) {
746 break;
748 } else {
749 *burst_count_clone.lock().unwrap() += 1;
750 }
751 }
752 }
753 }
754 });
755
756 sender_handle.join().unwrap();
757 let received_messages = receiver_handle.join().unwrap();
758 let final_sent_count = *sent_count.lock().unwrap();
759 let final_burst_count = *burst_count.lock().unwrap();
760 let total_sent = final_sent_count + final_burst_count;
761
762 assert_eq!(
764 final_sent_count, iterations,
765 "All regular messages should be sent, sent: {}",
766 final_sent_count
767 );
768
769 assert_eq!(
771 received_messages.len(),
772 total_sent,
773 "Should receive all sent messages, sent: {}, received: {}",
774 total_sent,
775 received_messages.len()
776 );
777
778 let mut message_count = 0;
780 for msg in received_messages.iter() {
781 if msg.starts_with("message-") {
782 message_count += 1;
783 }
784 }
785
786 assert_eq!(
788 message_count, iterations,
789 "Should receive all regular messages, got {}",
790 message_count
791 );
792 }
793
794 #[test]
797 fn torture_test_concurrent_access() {
798 let (sender, mut receiver) = mpsc_fd_pair::<u64>().unwrap();
799 let sender = Arc::new(Mutex::new(sender));
800
801 let num_senders = 5;
802 let messages_per_sender = 100; let total_messages = num_senders * messages_per_sender;
804
805 let sent_count = Arc::new(Mutex::new(0));
807 let sent_count_clone = sent_count.clone();
808
809 let sender_handles: Vec<_> = (0..num_senders)
811 .map(|sender_id| {
812 let sender = sender.clone();
813 let local_sent_count = sent_count_clone.clone();
814
815 thread::spawn(move || {
816 let mut local_sender = sender.lock().unwrap().try_clone().unwrap();
817
818 for i in 0..messages_per_sender {
819 let msg = (sender_id as u64) * 10000 + i;
820
821 match local_sender.blocking_send(msg) {
823 Ok(_) => {
824 *local_sent_count.lock().unwrap() += 1;
825 }
826 Err(SendError::Disconnected(_)) => {
827 break; }
829 }
830
831 thread::sleep(Duration::from_micros(10));
833 }
834 })
835 })
836 .collect();
837
838 let receiver_handle = thread::spawn(move || {
840 let mut all_received = Vec::new();
841 let mut mode_counter = 0;
842 let start_time = Instant::now();
843
844 while start_time.elapsed() < Duration::from_secs(10) {
846 match mode_counter % 4 {
848 0 => {
849 match receiver.try_recv() {
851 Ok(msg) => all_received.push(msg),
852 Err(TryRecvError::Empty) => {
853 thread::sleep(Duration::from_micros(100));
854 }
855 Err(TryRecvError::Disconnected) => break,
856 }
857 }
858 1 => {
859 match receiver.recv_timeout(Duration::from_millis(1)) {
861 Ok(msg) => all_received.push(msg),
862 Err(RecvTimeoutError::Timeout) => {}
863 Err(RecvTimeoutError::Disconnected) => break,
864 }
865 }
866 2 => {
867 match receiver.recv_timeout(Duration::from_millis(10)) {
869 Ok(msg) => all_received.push(msg),
870 Err(RecvTimeoutError::Timeout) => {}
871 Err(RecvTimeoutError::Disconnected) => break,
872 }
873 }
874 3 => {
875 match receiver.recv_timeout(Duration::from_millis(100)) {
877 Ok(msg) => all_received.push(msg),
878 Err(RecvTimeoutError::Timeout) => {}
879 Err(RecvTimeoutError::Disconnected) => break,
880 }
881 }
882 _ => unreachable!(),
883 }
884 mode_counter += 1;
885 }
886
887 all_received
888 });
889
890 for handle in sender_handles {
892 handle.join().unwrap();
893 }
894
895 let all_received = receiver_handle.join().unwrap();
896 let final_sent_count = *sent_count.lock().unwrap();
897
898 assert_eq!(
900 final_sent_count, total_messages as usize,
901 "All messages should be sent, sent: {}",
902 final_sent_count
903 );
904
905 assert_eq!(
907 all_received.len(),
908 final_sent_count,
909 "Should receive all sent messages, sent: {}, received: {}",
910 final_sent_count,
911 all_received.len()
912 );
913
914 for &msg in all_received.iter() {
916 let sender_id = msg / 10000;
917 let msg_id = msg % 10000;
918 assert!(
919 sender_id < num_senders as u64,
920 "Invalid sender ID: {}",
921 sender_id
922 );
923 assert!(
924 msg_id < messages_per_sender as u64,
925 "Invalid message ID: {}",
926 msg_id
927 );
928 }
929
930 let mut sorted = all_received.clone();
932 sorted.sort();
933 sorted.dedup();
934 assert_eq!(
935 sorted.len(),
936 all_received.len(),
937 "No duplicates should exist"
938 );
939 }
940}