mio_byte_fifo/
lib.rs

1//! Concurrent non-blocking byte FIFO buffer intended for use in [`Mio`] poll
2//!
3//! # Simple example
4//!
5//! ```rust
6//! # extern crate mio_byte_fifo;
7//! 
8//! use std::io::{Read, Write};
9//! 
10//! # fn main() {
11//! let (mut producer, mut consumer) = mio_byte_fifo::create(16);
12//! 
13//! let data = [0, 1, 254, 255];
14//! let n = producer.write(&data).unwrap();
15//! println!("written {} bytes: {:?}", n, data);
16//! 
17//! let mut buf = [0; 8];
18//! let n = consumer.read(&mut buf).unwrap();
19//! println!("read    {} bytes: {:?}", n, &buf[0..n]);
20//! 
21//! assert_eq!(data, buf[0..n]);
22//! # }
23//! ```
24//!
25//! # More complicated example
26//!
27//! ```rust
28//! # extern crate mio;
29//! # extern crate mio_byte_fifo;
30//! 
31//! use std::io::{Read, Write, ErrorKind};
32//! use std::thread;
33//! 
34//! use mio::{Poll, Events, Token, Ready, PollOpt};
35//! 
36//! use mio_byte_fifo::{Producer, Consumer};
37//! 
38//! 
39//! # fn main() {
40//! const FIFO_SIZE: usize = 16;
41//! const READ_BUF_SIZE: usize = 7;
42//! const EVENTS_CAPACITY: usize = 4;
43//! 
44//! let (mut producer, mut consumer) = mio_byte_fifo::create(FIFO_SIZE);
45//! let message = "The quick brown fox jumps over the lazy dog";
46//! 
47//! println!("sending message: '{}'", message);
48//! 
49//! let producer_thread = thread::spawn(move || {
50//!     let poll = Poll::new().unwrap();
51//!     let mut events = Events::with_capacity(EVENTS_CAPACITY);
52//!     let data = message.as_bytes();
53//!     let mut pos = 0;
54//! 
55//!     let write_data_part = |producer: &mut Producer, pos: &mut usize| {
56//!         loop {
57//!             match producer.write(&data[*pos..]) {
58//!                 Ok(n) => {
59//!                     println!(
60//!                         "sent     {} bytes: '{}'", n,
61//!                         std::str::from_utf8(&data[*pos..(*pos + n)]).unwrap()
62//!                     );
63//!                     *pos += n;
64//!                     if *pos >= data.len() {
65//!                         break false;
66//!                     }
67//!                 },
68//!                 Err(err) => match err.kind() {
69//!                     ErrorKind::WouldBlock => break true,
70//!                     _ => panic!("{:?}", err),
71//!                 }
72//!             }
73//!         }
74//!     };
75//! 
76//!     poll.register(&producer, Token(0), Ready::writable(), PollOpt::edge()).unwrap();
77//!     
78//!     if !write_data_part(&mut producer, &mut pos) {
79//!         return;
80//!     }
81//! 
82//!     'outer: loop {
83//!         poll.poll(&mut events, None).unwrap();
84//! 
85//!         for event in events.iter() {
86//!             assert_eq!(event.token(), Token(0));
87//!             assert!(event.readiness().is_writable());
88//! 
89//!             if !write_data_part(&mut producer, &mut pos) {
90//!                 break 'outer;
91//!             }
92//!         }
93//!     }
94//! });
95//! 
96//! let consumer_thread = thread::spawn(move || {
97//!     let poll = Poll::new().unwrap();
98//!     let mut events = Events::with_capacity(EVENTS_CAPACITY);
99//!     let mut data = String::new();
100//!     let mut buf = [0; READ_BUF_SIZE];
101//! 
102//!     let mut read_data_part = |consumer: &mut Consumer, data: &mut String| {
103//!         loop {
104//!             match consumer.read(&mut buf) {
105//!                 Ok(n) => {
106//!                     let str_part = std::str::from_utf8(&buf[0..n]).unwrap();
107//!                     println!("received {} bytes: '{}'", n, str_part);
108//!                     data.push_str(str_part);
109//!                 },
110//!                 Err(err) => {
111//!                     match err.kind() {
112//!                         ErrorKind::BrokenPipe => break false,
113//!                         ErrorKind::WouldBlock => break true,
114//!                         _ => panic!("{:?}", err),
115//!                     }
116//!                 }
117//!             }
118//!         }
119//!     };
120//! 
121//!     poll.register(&consumer, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
122//!     
123//!     'outer: loop {
124//!         poll.poll(&mut events, None).unwrap();
125//! 
126//!         for event in events.iter() {
127//!             assert_eq!(event.token(), Token(0));
128//!             assert!(event.readiness().is_readable());
129//!             if !read_data_part(&mut consumer, &mut data) {
130//!                 break 'outer;
131//!             }
132//!         }
133//!     }
134//!     data
135//! });
136//! 
137//! producer_thread.join().unwrap();
138//! let received_message = consumer_thread.join().unwrap();
139//! 
140//! println!("received message: '{}'", received_message);
141//! assert_eq!(message, received_message);
142//! # }
143//! ```
144//!
145//! [`Mio`]: https://docs.rs/mio/
146//!
147
148extern crate mio;
149extern crate ringbuf;
150
151
152use std::io::{Write, Read, Error, ErrorKind};
153use std::sync::{Arc, atomic::{fence, AtomicBool, Ordering}};
154
155use mio::{Evented, Poll, Token, Ready, PollOpt, Registration, SetReadiness};
156
157use ringbuf::{
158    RingBuffer,
159    Producer as RbProducer, Consumer as RbConsumer,
160    PushSliceError, PopSliceError,
161    WriteIntoError, ReadFromError,
162};
163
164#[derive(Debug)]
165pub enum TransmitError {
166    This(Error),
167    Other(Error),
168}
169
170pub struct Producer {
171    reg: Registration,
172    src: SetReadiness,
173    rbp: RbProducer<u8>,
174    cls: Arc<AtomicBool>,
175}
176
177pub struct Consumer {
178    reg: Registration,
179    srp: SetReadiness,
180    rbc: RbConsumer<u8>,
181    cls: Arc<AtomicBool>,
182}
183
184pub fn create(capacity: usize) -> (Producer, Consumer) {
185    let flag = Arc::new(AtomicBool::new(true));
186
187    let rb = RingBuffer::<u8>::new(capacity);
188
189    let (regp, srp) = Registration::new2();
190    let (regc, src) = Registration::new2();
191
192    let (rbp, rbc) = rb.split();
193
194    let prod = Producer { reg: regp, src, rbp, cls: flag.clone() };
195    let cons = Consumer { reg: regc, srp, rbc, cls: flag };
196
197    (prod, cons)
198}
199
200impl Evented for Producer {
201    fn register(&self, poll: &Poll, token: Token, interest: Ready, poll_opt: PollOpt) -> Result<(), Error> {
202        poll.register(&self.reg, token, interest, poll_opt)
203    }
204
205    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, poll_opt: PollOpt) -> Result<(), Error> {
206        poll.reregister(&self.reg, token, interest, poll_opt)
207    }
208
209    fn deregister(&self, poll: &Poll) -> Result<(), Error> {
210        poll.deregister(&self.reg)
211    }
212}
213
214impl Evented for Consumer {
215    fn register(&self, poll: &Poll, token: Token, interest: Ready, poll_opt: PollOpt) -> Result<(), Error> {
216        poll.register(&self.reg, token, interest, poll_opt)
217    }
218
219    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, poll_opt: PollOpt) -> Result<(), Error> {
220        poll.reregister(&self.reg, token, interest, poll_opt)
221    }
222
223    fn deregister(&self, poll: &Poll) -> Result<(), Error> {
224        poll.deregister(&self.reg)
225    }
226}
227
228impl Drop for Producer {
229    fn drop(&mut self) {
230        self.cls.store(false, Ordering::SeqCst);
231        self.src.set_readiness(Ready::all()).unwrap();
232        fence(Ordering::SeqCst);
233    }
234}
235
236impl Drop for Consumer {
237    fn drop(&mut self) {
238        self.cls.store(false, Ordering::SeqCst);
239        self.srp.set_readiness(Ready::all()).unwrap();
240        fence(Ordering::SeqCst);
241    }
242}
243
244impl Write for Producer {
245    fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
246        if !self.cls.load(Ordering::SeqCst) {
247            return Err(Error::new(
248                ErrorKind::BrokenPipe,
249                "Consumer was closed",
250            ))
251        }
252
253        let empty = self.rbp.is_empty();
254        match self.rbp.push_slice(buf) {
255            Ok(num) => {
256                if num > 0 && empty {
257                    let res = self.src.set_readiness(Ready::readable());
258                    fence(Ordering::SeqCst);
259                    res
260                } else {
261                    Ok(())
262                }.and(Ok(num))
263            },
264            Err(err) => match err {
265                PushSliceError::Full => Err(Error::new(
266                    ErrorKind::WouldBlock,
267                    "Ring buffer is full",
268                )),
269            }
270        }
271    }
272
273    fn flush(&mut self) -> Result<(), Error> {
274        Ok(())
275    }
276}
277
278impl Read for Consumer {
279    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
280        let full = self.rbc.is_full();
281        match self.rbc.pop_slice(buf) {
282            Ok(num) => {
283                if num > 0 && full {
284                    let res = self.srp.set_readiness(Ready::writable());
285                    fence(Ordering::SeqCst);
286                    res
287                } else {
288                    Ok(())
289                }.and(Ok(num))
290            },
291            Err(err) => match err {
292                PopSliceError::Empty => Err({
293                    if !self.cls.load(Ordering::SeqCst) {
294                        Error::new(
295                            ErrorKind::BrokenPipe,
296                            "Producer was closed",
297                        )
298                    } else {
299                        Error::new(
300                            ErrorKind::WouldBlock,
301                            "Ring buffer is empty",
302                        )
303                    }
304                }),
305            }
306        }
307    }
308}
309
310pub trait WriteTransmit {
311    fn write_transmit(&mut self, other: &mut dyn Read, count: Option<usize>)
312    -> Result<usize, TransmitError>;
313}
314
315pub trait ReadTransmit {
316    fn read_transmit(&mut self, other: &mut dyn Write, count: Option<usize>)
317    -> Result<usize, TransmitError>;
318}
319
320impl WriteTransmit for Producer {
321    fn write_transmit(&mut self, other: &mut dyn Read, count: Option<usize>)
322    -> Result<usize, TransmitError> {
323        if !self.cls.load(Ordering::SeqCst) {
324            return Err(TransmitError::This(Error::new(
325                ErrorKind::BrokenPipe, "Consumer was closed",
326            )))
327        }
328
329        let empty = self.rbp.is_empty();
330        match self.rbp.read_from(other, count) {
331            Ok(num) => {
332                if num > 0 && empty {
333                    let res = self.src.set_readiness(Ready::readable());
334                    fence(Ordering::SeqCst);
335                    res
336                } else {
337                    Ok(())
338                }.and(Ok(num)).or_else(|e| {
339                    Err(TransmitError::This(e))
340                })
341            },
342            Err(err) => Err(match err {
343                ReadFromError::Read(e) => TransmitError::Other(e),
344                ReadFromError::RbFull => TransmitError::This(Error::new(
345                    ErrorKind::WouldBlock, "Ring buffer is full",
346                )),
347            }),
348        }
349    }
350}
351
352impl ReadTransmit for Consumer {
353    fn read_transmit(&mut self, other: &mut dyn Write, count: Option<usize>)
354    -> Result<usize, TransmitError> {
355        let full = self.rbc.is_full();
356        match self.rbc.write_into(other, count) {
357            Ok(num) => {
358                if num > 0 && full {
359                    let res = self.srp.set_readiness(Ready::writable());
360                    fence(Ordering::SeqCst);
361                    res
362                } else {
363                    Ok(())
364                }.and(Ok(num)).or_else(|e| {
365                    Err(TransmitError::This(e))
366                })
367            },
368            Err(err) => Err(match err {
369                WriteIntoError::Write(e) => TransmitError::Other(e),
370                WriteIntoError::RbEmpty => TransmitError::This({
371                    if !self.cls.load(Ordering::SeqCst) {
372                        Error::new(
373                            ErrorKind::BrokenPipe, "Producer was closed",
374                        )
375                    } else {
376                        Error::new(
377                            ErrorKind::WouldBlock, "Ring buffer is empty",
378                        )
379                    }
380                }),
381            }),
382        }
383    }
384}
385
386
387#[cfg(test)]
388mod test {
389    use super::*;
390
391    use std::thread;
392    use std::time::{Duration};
393
394    use mio::{Events};
395
396
397    #[test]
398    fn reg_set_r() {
399        let (reg, sr) = Registration::new2();
400        let poll = Poll::new().unwrap();
401        let mut events = Events::with_capacity(16);
402
403        let jh = thread::spawn(move || {
404            thread::sleep(Duration::from_millis(10));
405            sr.set_readiness(Ready::readable()).unwrap();
406        });
407
408        poll.register(&reg, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
409
410        poll.poll(&mut events, Some(Duration::from_secs(1))).unwrap();
411        let mut hdl = false;
412        for e in events.iter() {
413            assert_eq!(e.token().0, 0);
414            assert!(e.readiness().is_readable());
415            hdl = true;
416        }
417        assert!(hdl);
418
419        poll.deregister(&reg).unwrap();
420
421        jh.join().unwrap();
422    }
423
424    #[test]
425    fn reg_set_w() {
426        let (reg, sr) = Registration::new2();
427        let poll = Poll::new().unwrap();
428        let mut events = Events::with_capacity(16);
429
430        let jh = thread::spawn(move || {
431            thread::sleep(Duration::from_millis(10));
432            sr.set_readiness(Ready::writable()).unwrap();
433        });
434
435        poll.register(&reg, Token(0), Ready::writable(), PollOpt::edge()).unwrap();
436
437        poll.poll(&mut events, Some(Duration::from_secs(1))).unwrap();
438        let mut hdl = false;
439        for e in events.iter() {
440            assert_eq!(e.token().0, 0);
441            assert!(e.readiness().is_writable());
442            hdl = true;
443        }
444        assert!(hdl);
445
446        poll.deregister(&reg).unwrap();
447
448        jh.join().unwrap();
449    }
450
451    #[test]
452    fn reg_set_twice() {
453        let (reg, sr) = Registration::new2();
454        let poll = Poll::new().unwrap();
455        let mut events = Events::with_capacity(16);
456
457        let jh = thread::spawn(move || {
458            thread::sleep(Duration::from_millis(10));
459            sr.set_readiness(Ready::readable()).unwrap();
460
461            thread::sleep(Duration::from_millis(10));
462            sr.set_readiness(Ready::readable()).unwrap();
463        });
464
465        poll.register(&reg, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
466
467        poll.poll(&mut events, Some(Duration::from_secs(1))).unwrap();
468        let mut hdl = false;
469        for e in events.iter() {
470            assert_eq!(e.token().0, 0);
471            assert!(e.readiness().is_readable());
472            hdl = true;
473        }
474        assert!(hdl);
475
476        poll.poll(&mut events, Some(Duration::from_secs(1))).unwrap();
477        let mut hdl = false;
478        for e in events.iter() {
479            assert_eq!(e.token().0, 0);
480            assert!(e.readiness().is_readable());
481            hdl = true;
482        }
483        assert!(hdl);
484
485        poll.deregister(&reg).unwrap();
486
487        jh.join().unwrap();
488    }
489
490    #[test]
491    fn reg_drop() {
492        let (reg, sr) = Registration::new2();
493
494        let jh = thread::spawn(move || {
495            let _ = reg;
496        });
497
498        thread::sleep(Duration::from_millis(10));
499        sr.set_readiness(Ready::readable()).unwrap();
500
501        jh.join().unwrap();
502    }
503
504    #[test]
505    fn dereg_reg() {
506        let (reg, sr) = Registration::new2();
507        let poll = Poll::new().unwrap();
508        let mut events = Events::with_capacity(16);
509
510        let jh = thread::spawn(move || {
511            thread::sleep(Duration::from_millis(10));
512            sr.set_readiness(Ready::readable()).unwrap();
513
514            thread::sleep(Duration::from_millis(10));
515            sr.set_readiness(Ready::readable()).unwrap();
516        });
517
518        poll.register(&reg, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
519
520        poll.poll(&mut events, Some(Duration::from_secs(1))).unwrap();
521        let mut hdl = false;
522        for e in events.iter() {
523            assert_eq!(e.token().0, 0);
524            assert!(e.readiness().is_readable());
525            hdl = true;
526        }
527        assert!(hdl);
528
529        poll.deregister(&reg).unwrap();
530
531        poll.register(&reg, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
532
533        poll.poll(&mut events, Some(Duration::from_secs(1))).unwrap();
534        let mut hdl = false;
535        for e in events.iter() {
536            assert_eq!(e.token().0, 1);
537            assert!(e.readiness().is_readable());
538            hdl = true;
539        }
540        assert!(hdl);
541
542        poll.deregister(&reg).unwrap();
543
544        jh.join().unwrap();
545    }
546    
547    #[test]
548    fn write_read() {
549        let (mut p, mut c) = create(16);
550
551        assert_eq!(p.write(b"abcdef").unwrap(), 6);
552
553        let mut buf = [0; 6];
554        assert_eq!(c.read(&mut buf).unwrap(), 6);
555        assert_eq!(&buf, b"abcdef");
556    }
557
558    #[test]
559    fn write_read_concat() {
560        let (mut p, mut c) = create(16);
561
562        assert_eq!(p.write(b"abc").unwrap(), 3);
563        assert_eq!(p.write(b"def").unwrap(), 3);
564
565        let mut buf = [0; 6];
566        assert_eq!(c.read(&mut buf).unwrap(), 6);
567        assert_eq!(&buf, b"abcdef");
568    }
569
570    #[test]
571    fn write_read_split() {
572        let (mut p, mut c) = create(16);
573
574        assert_eq!(p.write(b"abcdef").unwrap(), 6);
575
576        let mut buf = [0; 3];
577        assert_eq!(c.read(&mut buf).unwrap(), 3);
578        assert_eq!(&buf, b"abc");
579        assert_eq!(c.read(&mut buf).unwrap(), 3);
580        assert_eq!(&buf, b"def");
581    }
582
583    #[test]
584    fn write_read_empty() {
585        let (mut p, mut c) = create(16);
586
587        let mut buf = [0; 6];
588        
589        assert_eq!(p.write(b"abc").unwrap(), 3);
590        assert_eq!(c.read(&mut buf).unwrap(), 3);
591        assert_eq!(&buf, b"abc\0\0\0");
592
593        assert_eq!(p.write(b"def").unwrap(), 3);
594        assert_eq!(c.read(&mut buf).unwrap(), 3);
595        assert_eq!(&buf, b"def\0\0\0");
596    }
597
598    #[test]
599    fn write_read_full() {
600        let (mut p, mut c) = create(8);
601
602        let range: Vec<u8> = (0..8).collect();
603        let mut buf = [0; 6];
604        
605        assert_eq!(p.write(&range).unwrap(), 8);
606
607        assert_eq!(c.read(&mut buf[0..3]).unwrap(), 3);
608        assert_eq!(&buf[0..3], &[0,1,2]);
609
610        assert_eq!(p.write(b"abcdef").unwrap(), 3);
611
612        assert_eq!(c.read(&mut buf[0..3]).unwrap(), 3);
613        assert_eq!(&buf[0..3], &[3,4,5]);
614
615        assert_eq!(c.read(&mut buf).unwrap(), 5);
616        assert_eq!(&buf[0..5], &[6,7,b'a',b'b',b'c']);
617    }
618
619    #[test]
620    fn read_block() {
621        let (_p, mut c) = create(16);
622
623        let mut buf = [0; 4];
624        match c.read(&mut buf) {
625            Ok(_) => panic!(),
626            Err(err) => {
627                assert_eq!(err.kind(), ErrorKind::WouldBlock);
628                assert_eq!(err.get_ref().unwrap().description(), "Ring buffer is empty");
629            }
630        }
631    }
632
633    #[test]
634    fn write_block() {
635        const SIZE: usize = 16;
636        let (mut p, _c) = create(SIZE);
637
638        assert_eq!(p.write(&[0; SIZE]).unwrap(), SIZE);
639        match p.write(b"abc") {
640            Ok(_) => panic!(),
641            Err(err) => {
642                assert_eq!(err.kind(), ErrorKind::WouldBlock);
643                assert_eq!(err.get_ref().unwrap().description(), "Ring buffer is full");
644            }
645        }
646    }
647
648    #[test]
649    fn write_read_transmit() {
650        let (mut p, mut c) = create(16);
651
652        assert_eq!(p.write_transmit(&mut (&b"abcdef"[..]), None).unwrap(), 6);
653
654        let mut buf = vec!();
655        assert_eq!(c.read_transmit(&mut buf, None).unwrap(), 6);
656        assert_eq!(&buf, b"abcdef");
657    }
658
659    #[test]
660    fn write_read_transmit_concat() {
661        let (mut p, mut c) = create(16);
662
663        assert_eq!(p.write_transmit(&mut (&b"abc"[..]), None).unwrap(), 3);
664        assert_eq!(p.write_transmit(&mut (&b"def"[..]), None).unwrap(), 3);
665
666        let mut buf = vec!();
667        assert_eq!(c.read_transmit(&mut buf, None).unwrap(), 6);
668        assert_eq!(&buf, b"abcdef");
669    }
670
671    #[test]
672    fn write_read_transmit_split() {
673        let (mut p, mut c) = create(16);
674
675        assert_eq!(p.write_transmit(&mut (&b"abcdef"[..]), None).unwrap(), 6);
676
677        let mut buf = vec!();
678        assert_eq!(c.read_transmit(&mut buf, Some(3)).unwrap(), 3);
679        assert_eq!(&buf, b"abc");
680        assert_eq!(c.read_transmit(&mut buf, None).unwrap(), 3);
681        assert_eq!(&buf, b"abcdef");
682    }
683
684    #[test]
685    fn read_transmit_block() {
686        let (_p, mut c) = create(16);
687
688        let mut buf = vec!();
689        match c.read_transmit(&mut buf, None) {
690            Ok(_) => panic!(),
691            Err(err) => match err {
692                TransmitError::This(e) => {
693                    assert_eq!(e.kind(), ErrorKind::WouldBlock);
694                    assert_eq!(e.get_ref().unwrap().description(), "Ring buffer is empty");
695                },
696                other => panic!("{:?}", other),
697            }
698        }
699    }
700
701    #[test]
702    fn write_transmit_block() {
703        const SIZE: usize = 16;
704        let (mut p, _c) = create(SIZE);
705
706        assert_eq!(p.write(&[0; SIZE]).unwrap(), SIZE);
707        match p.write_transmit(&mut (&b"abc"[..]), None) {
708            Ok(_) => panic!(),
709            Err(err) => match err {
710                TransmitError::This(e) => {
711                    assert_eq!(e.kind(), ErrorKind::WouldBlock);
712                    assert_eq!(e.get_ref().unwrap().description(), "Ring buffer is full");
713                },
714                other => panic!("{:?}", other),
715            }
716        }
717    }
718
719    #[test]
720    fn close_cons() {
721        let (mut p, c) = create(16);
722
723        assert_eq!(p.write(b"abc").unwrap(), 3);
724
725        (move || {
726            let _ = c;
727        })();
728
729        match p.write(b"def") {
730            Ok(_) => panic!(),
731            Err(err) => {
732                assert_eq!(err.kind(), ErrorKind::BrokenPipe);
733                assert_eq!(err.get_ref().unwrap().description(), "Consumer was closed");
734            }
735        }
736    }
737
738    #[test]
739    fn close_prod() {
740        let (mut p, mut c) = create(16);
741        let mut buf = [0; 6];
742
743        assert_eq!(p.write(b"abcdef").unwrap(), 6);
744
745        assert_eq!(c.read(&mut buf[0..3]).unwrap(), 3);
746        assert_eq!(&buf[0..3], b"abc");
747
748        (move || {
749            let _ = p;
750        })();
751
752        assert_eq!(c.read(&mut buf).unwrap(), 3);
753        assert_eq!(&buf[0..3], b"def");
754
755        match c.read(&mut buf) {
756            Ok(_) => panic!(),
757            Err(err) => {
758                assert_eq!(err.kind(), ErrorKind::BrokenPipe);
759                assert_eq!(err.get_ref().unwrap().description(), "Producer was closed");
760            }
761        }
762    }
763
764    #[test]
765    fn poll_cons() {
766        let (mut p, mut c) = create(16);
767        let poll = Poll::new().unwrap();
768        let mut events = Events::with_capacity(16);
769        let mut buf = [0; 6];
770
771        poll.register(&c, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
772
773        let jh = thread::spawn(move || {
774            thread::sleep(Duration::from_millis(10));
775            assert_eq!(p.write(b"abc").unwrap(), 3);
776            assert_eq!(p.write(b"def").unwrap(), 3);
777            p
778        });
779
780        poll.poll(&mut events, Some(Duration::from_secs(10))).unwrap();
781        thread::sleep(Duration::from_millis(10));
782
783        {
784            let mut eiter = events.iter();
785
786            let event = eiter.next().unwrap();
787            assert_eq!(event.token().0, 0);
788            assert!(event.readiness().is_readable());
789            assert_eq!(c.read(&mut buf).unwrap(), 6);
790            assert_eq!(&buf, b"abcdef");
791
792            assert!(eiter.next().is_none());
793        }
794
795        poll.poll(&mut events, Some(Duration::from_millis(10))).unwrap();
796        assert!(events.iter().next().is_none());
797
798        jh.join().unwrap();
799    }
800
801    #[test]
802    fn poll_prod() {
803        const SIZE: usize = 16;
804        let (mut p, mut c) = create(SIZE);
805        let poll = Poll::new().unwrap();
806        let mut events = Events::with_capacity(16);
807
808        poll.register(&p, Token(0), Ready::writable(), PollOpt::edge()).unwrap();
809
810        assert_eq!(p.write(&[0; SIZE]).unwrap(), SIZE);
811
812        let jh = thread::spawn(move || {
813            let mut buf = [0; 3];
814            thread::sleep(Duration::from_millis(10));
815            assert_eq!(c.read(&mut buf).unwrap(), 3);
816            assert_eq!(c.read(&mut buf).unwrap(), 3);
817            c
818        });
819
820        poll.poll(&mut events, Some(Duration::from_secs(10))).unwrap();
821        thread::sleep(Duration::from_millis(10));
822
823        {
824            let mut eiter = events.iter();
825
826            let event = eiter.next().unwrap();
827            assert_eq!(event.token().0, 0);
828
829            assert!(event.readiness().is_writable());
830
831            assert_eq!(p.write(b"abcdefghi").unwrap(), 6);
832
833            assert!(eiter.next().is_none());
834        }
835
836        poll.poll(&mut events, Some(Duration::from_millis(10))).unwrap();
837        assert!(events.iter().next().is_none());
838
839        jh.join().unwrap();
840    }
841
842    #[test]
843    fn poll_cons_close() {
844        let (p, mut c) = create(16);
845        let poll = Poll::new().unwrap();
846        let mut events = Events::with_capacity(16);
847        let mut buf = [0; 3];
848
849        poll.register(&c, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
850
851        let jh = thread::spawn(move || {
852            thread::sleep(Duration::from_millis(10));
853            let _ = p;
854        });
855
856        'outer: for _ in 0..2 {
857            poll.poll(&mut events, Some(Duration::from_secs(10))).unwrap();
858
859            for event in events.iter() {
860                assert_eq!(event.token().0, 0);
861                assert!(event.readiness().is_readable());
862                match c.read(&mut buf) {
863                    Ok(_) => panic!(),
864                    Err(err) => {
865                        match err.kind() {
866                            ErrorKind::BrokenPipe => break 'outer,
867                            ErrorKind::WouldBlock => (),
868                            _ => panic!("{:?}", err),
869                        }
870                    }
871                }
872            }
873        }
874
875        jh.join().unwrap();
876    }
877
878    #[test]
879    fn poll_prod_close() {
880        const SIZE: usize = 16;
881        let (mut p, c) = create(SIZE);
882        let poll = Poll::new().unwrap();
883        let mut events = Events::with_capacity(16);
884
885        poll.register(&p, Token(0), Ready::writable(), PollOpt::edge()).unwrap();
886
887        assert_eq!(p.write(&[0; SIZE]).unwrap(), SIZE);
888
889        let jh = thread::spawn(move || {
890            thread::sleep(Duration::from_millis(10));
891            let _ = c;
892        });
893
894        'outer: loop {
895            poll.poll(&mut events, Some(Duration::from_secs(10))).unwrap();
896
897            for event in events.iter() {
898                assert_eq!(event.token().0, 0);
899                assert!(event.readiness().is_writable());
900                match p.write(b"def") {
901                    Ok(_) => panic!(),
902                    Err(err) => {
903                        match err.kind() {
904                            ErrorKind::BrokenPipe => break 'outer,
905                            ErrorKind::WouldBlock => (),
906                            _ => panic!("{:?}", err),
907                        }
908                    }
909                }
910            }
911        }
912
913        jh.join().unwrap();
914    }
915
916    #[test]
917    fn poll_prod_cons() {
918        const SIZE: usize = 16;
919        let (mut p, mut c) = create(SIZE);
920
921        let cjh = thread::spawn(move || {
922            let poll = Poll::new().unwrap();
923            let mut events = Events::with_capacity(16);
924            let mut buf = [0; SIZE/2];
925
926            poll.register(&c, Token(0), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
927            
928            let mut i = 0;
929            'outer: loop {
930                poll.poll(&mut events, Some(Duration::from_secs(10))).unwrap();
931                for event in events.iter() {
932                    assert_eq!(event.token().0, 0);
933                    assert!(event.readiness().is_readable());
934                    'inner: loop {
935                        match c.read(&mut buf) {
936                            Ok(n) => {
937                                assert_eq!(n, SIZE/2);
938                                assert_eq!(&buf, &[i/2; SIZE/2]);
939                                i += 1;
940                            },
941                            Err(err) => {
942                                match err.kind() {
943                                    ErrorKind::BrokenPipe => break 'outer,
944                                    ErrorKind::WouldBlock => break 'inner,
945                                    _ => panic!("{:?}", err),
946                                }
947                            }
948                        }
949                    }
950                }
951                poll.reregister(&c, Token(0), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
952            }
953            assert_eq!(i, 3);
954        });
955
956        let pjh = thread::spawn(move || {
957            let poll = Poll::new().unwrap();
958            let mut events = Events::with_capacity(16);
959
960            assert_eq!(p.write(&[0; SIZE]).unwrap(), SIZE);
961            poll.register(&p, Token(0), Ready::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
962            poll.poll(&mut events, Some(Duration::from_secs(10))).unwrap();
963
964            let event = events.iter().next().unwrap();
965            assert_eq!(event.token().0, 0);
966            assert!(event.readiness().is_writable());
967            assert_eq!(p.write(&[1; SIZE/2]).unwrap(), SIZE/2);
968            poll.reregister(&p, Token(0), Ready::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
969            poll.poll(&mut events, Some(Duration::from_secs(10))).unwrap();
970
971            thread::sleep(Duration::from_millis(10));
972        });
973
974        pjh.join().unwrap();
975        cjh.join().unwrap();
976    }
977}