1extern crate mio;
149extern crate ringbuf;
150
151
152use std::io::{Write, Read, Error, ErrorKind};
153use std::sync::{Arc, atomic::{fence, AtomicBool, Ordering}};
154
155use mio::{Evented, Poll, Token, Ready, PollOpt, Registration, SetReadiness};
156
157use ringbuf::{
158 RingBuffer,
159 Producer as RbProducer, Consumer as RbConsumer,
160 PushSliceError, PopSliceError,
161 WriteIntoError, ReadFromError,
162};
163
/// Error produced by the `write_transmit` / `read_transmit` operations.
///
/// The variant tells the caller which side of the transfer failed, so that a
/// `WouldBlock` coming from the ring buffer can be told apart from an error of
/// the external stream that was passed in.
#[derive(Debug)]
pub enum TransmitError {
    /// The failure originated in this FIFO endpoint: the ring buffer was
    /// full/empty, the opposite half was dropped, or signalling readiness failed.
    This(Error),
    /// The failure was reported by the external reader/writer handed to the call.
    Other(Error),
}

impl std::fmt::Display for TransmitError {
    /// Prefixes the wrapped I/O error with the side it came from.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TransmitError::This(err) => write!(f, "ring buffer endpoint error: {}", err),
            TransmitError::Other(err) => write!(f, "external stream error: {}", err),
        }
    }
}

impl std::error::Error for TransmitError {
    /// Exposes the wrapped `std::io::Error` as the underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            TransmitError::This(err) | TransmitError::Other(err) => Some(err),
        }
    }
}
169
170pub struct Producer {
171 reg: Registration,
172 src: SetReadiness,
173 rbp: RbProducer<u8>,
174 cls: Arc<AtomicBool>,
175}
176
177pub struct Consumer {
178 reg: Registration,
179 srp: SetReadiness,
180 rbc: RbConsumer<u8>,
181 cls: Arc<AtomicBool>,
182}
183
184pub fn create(capacity: usize) -> (Producer, Consumer) {
185 let flag = Arc::new(AtomicBool::new(true));
186
187 let rb = RingBuffer::<u8>::new(capacity);
188
189 let (regp, srp) = Registration::new2();
190 let (regc, src) = Registration::new2();
191
192 let (rbp, rbc) = rb.split();
193
194 let prod = Producer { reg: regp, src, rbp, cls: flag.clone() };
195 let cons = Consumer { reg: regc, srp, rbc, cls: flag };
196
197 (prod, cons)
198}
199
200impl Evented for Producer {
201 fn register(&self, poll: &Poll, token: Token, interest: Ready, poll_opt: PollOpt) -> Result<(), Error> {
202 poll.register(&self.reg, token, interest, poll_opt)
203 }
204
205 fn reregister(&self, poll: &Poll, token: Token, interest: Ready, poll_opt: PollOpt) -> Result<(), Error> {
206 poll.reregister(&self.reg, token, interest, poll_opt)
207 }
208
209 fn deregister(&self, poll: &Poll) -> Result<(), Error> {
210 poll.deregister(&self.reg)
211 }
212}
213
214impl Evented for Consumer {
215 fn register(&self, poll: &Poll, token: Token, interest: Ready, poll_opt: PollOpt) -> Result<(), Error> {
216 poll.register(&self.reg, token, interest, poll_opt)
217 }
218
219 fn reregister(&self, poll: &Poll, token: Token, interest: Ready, poll_opt: PollOpt) -> Result<(), Error> {
220 poll.reregister(&self.reg, token, interest, poll_opt)
221 }
222
223 fn deregister(&self, poll: &Poll) -> Result<(), Error> {
224 poll.deregister(&self.reg)
225 }
226}
227
228impl Drop for Producer {
229 fn drop(&mut self) {
230 self.cls.store(false, Ordering::SeqCst);
231 self.src.set_readiness(Ready::all()).unwrap();
232 fence(Ordering::SeqCst);
233 }
234}
235
236impl Drop for Consumer {
237 fn drop(&mut self) {
238 self.cls.store(false, Ordering::SeqCst);
239 self.srp.set_readiness(Ready::all()).unwrap();
240 fence(Ordering::SeqCst);
241 }
242}
243
244impl Write for Producer {
245 fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
246 if !self.cls.load(Ordering::SeqCst) {
247 return Err(Error::new(
248 ErrorKind::BrokenPipe,
249 "Consumer was closed",
250 ))
251 }
252
253 let empty = self.rbp.is_empty();
254 match self.rbp.push_slice(buf) {
255 Ok(num) => {
256 if num > 0 && empty {
257 let res = self.src.set_readiness(Ready::readable());
258 fence(Ordering::SeqCst);
259 res
260 } else {
261 Ok(())
262 }.and(Ok(num))
263 },
264 Err(err) => match err {
265 PushSliceError::Full => Err(Error::new(
266 ErrorKind::WouldBlock,
267 "Ring buffer is full",
268 )),
269 }
270 }
271 }
272
273 fn flush(&mut self) -> Result<(), Error> {
274 Ok(())
275 }
276}
277
278impl Read for Consumer {
279 fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
280 let full = self.rbc.is_full();
281 match self.rbc.pop_slice(buf) {
282 Ok(num) => {
283 if num > 0 && full {
284 let res = self.srp.set_readiness(Ready::writable());
285 fence(Ordering::SeqCst);
286 res
287 } else {
288 Ok(())
289 }.and(Ok(num))
290 },
291 Err(err) => match err {
292 PopSliceError::Empty => Err({
293 if !self.cls.load(Ordering::SeqCst) {
294 Error::new(
295 ErrorKind::BrokenPipe,
296 "Producer was closed",
297 )
298 } else {
299 Error::new(
300 ErrorKind::WouldBlock,
301 "Ring buffer is empty",
302 )
303 }
304 }),
305 }
306 }
307 }
308}
309
310pub trait WriteTransmit {
311 fn write_transmit(&mut self, other: &mut dyn Read, count: Option<usize>)
312 -> Result<usize, TransmitError>;
313}
314
315pub trait ReadTransmit {
316 fn read_transmit(&mut self, other: &mut dyn Write, count: Option<usize>)
317 -> Result<usize, TransmitError>;
318}
319
320impl WriteTransmit for Producer {
321 fn write_transmit(&mut self, other: &mut dyn Read, count: Option<usize>)
322 -> Result<usize, TransmitError> {
323 if !self.cls.load(Ordering::SeqCst) {
324 return Err(TransmitError::This(Error::new(
325 ErrorKind::BrokenPipe, "Consumer was closed",
326 )))
327 }
328
329 let empty = self.rbp.is_empty();
330 match self.rbp.read_from(other, count) {
331 Ok(num) => {
332 if num > 0 && empty {
333 let res = self.src.set_readiness(Ready::readable());
334 fence(Ordering::SeqCst);
335 res
336 } else {
337 Ok(())
338 }.and(Ok(num)).or_else(|e| {
339 Err(TransmitError::This(e))
340 })
341 },
342 Err(err) => Err(match err {
343 ReadFromError::Read(e) => TransmitError::Other(e),
344 ReadFromError::RbFull => TransmitError::This(Error::new(
345 ErrorKind::WouldBlock, "Ring buffer is full",
346 )),
347 }),
348 }
349 }
350}
351
352impl ReadTransmit for Consumer {
353 fn read_transmit(&mut self, other: &mut dyn Write, count: Option<usize>)
354 -> Result<usize, TransmitError> {
355 let full = self.rbc.is_full();
356 match self.rbc.write_into(other, count) {
357 Ok(num) => {
358 if num > 0 && full {
359 let res = self.srp.set_readiness(Ready::writable());
360 fence(Ordering::SeqCst);
361 res
362 } else {
363 Ok(())
364 }.and(Ok(num)).or_else(|e| {
365 Err(TransmitError::This(e))
366 })
367 },
368 Err(err) => Err(match err {
369 WriteIntoError::Write(e) => TransmitError::Other(e),
370 WriteIntoError::RbEmpty => TransmitError::This({
371 if !self.cls.load(Ordering::SeqCst) {
372 Error::new(
373 ErrorKind::BrokenPipe, "Producer was closed",
374 )
375 } else {
376 Error::new(
377 ErrorKind::WouldBlock, "Ring buffer is empty",
378 )
379 }
380 }),
381 }),
382 }
383 }
384}
385
386
#[cfg(test)]
mod test {
    //! Unit tests: the first group pins down the `Registration` / `SetReadiness`
    //! behaviour the FIFO relies on, the rest exercise the FIFO itself.

    use super::*;

    use std::thread;
    use std::time::Duration;

    use mio::Events;

    /// Text of the custom payload of an `io::Error` built with `Error::new`.
    ///
    /// Uses `Display` rather than the deprecated `Error::description`.
    fn message(err: &Error) -> String {
        err.get_ref()
            .expect("error carries no custom payload")
            .to_string()
    }

    // A readable signal from another thread produces one readable event.
    #[test]
    fn reg_set_r() {
        let (reg, sr) = Registration::new2();
        let poll = Poll::new().unwrap();
        let mut events = Events::with_capacity(16);

        let jh = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            sr.set_readiness(Ready::readable()).unwrap();
        });

        poll.register(&reg, Token(0), Ready::readable(), PollOpt::edge()).unwrap();

        poll.poll(&mut events, Some(Duration::from_secs(1))).unwrap();
        let mut hdl = false;
        for e in events.iter() {
            assert_eq!(e.token().0, 0);
            assert!(e.readiness().is_readable());
            hdl = true;
        }
        assert!(hdl);

        poll.deregister(&reg).unwrap();

        jh.join().unwrap();
    }

    // A writable signal from another thread produces one writable event.
    #[test]
    fn reg_set_w() {
        let (reg, sr) = Registration::new2();
        let poll = Poll::new().unwrap();
        let mut events = Events::with_capacity(16);

        let jh = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            sr.set_readiness(Ready::writable()).unwrap();
        });

        poll.register(&reg, Token(0), Ready::writable(), PollOpt::edge()).unwrap();

        poll.poll(&mut events, Some(Duration::from_secs(1))).unwrap();
        let mut hdl = false;
        for e in events.iter() {
            assert_eq!(e.token().0, 0);
            assert!(e.readiness().is_writable());
            hdl = true;
        }
        assert!(hdl);

        poll.deregister(&reg).unwrap();

        jh.join().unwrap();
    }

    // Two readable signals spaced apart are each observed by their own poll.
    #[test]
    fn reg_set_twice() {
        let (reg, sr) = Registration::new2();
        let poll = Poll::new().unwrap();
        let mut events = Events::with_capacity(16);

        let jh = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            sr.set_readiness(Ready::readable()).unwrap();

            thread::sleep(Duration::from_millis(10));
            sr.set_readiness(Ready::readable()).unwrap();
        });

        poll.register(&reg, Token(0), Ready::readable(), PollOpt::edge()).unwrap();

        poll.poll(&mut events, Some(Duration::from_secs(1))).unwrap();
        let mut hdl = false;
        for e in events.iter() {
            assert_eq!(e.token().0, 0);
            assert!(e.readiness().is_readable());
            hdl = true;
        }
        assert!(hdl);

        poll.poll(&mut events, Some(Duration::from_secs(1))).unwrap();
        let mut hdl = false;
        for e in events.iter() {
            assert_eq!(e.token().0, 0);
            assert!(e.readiness().is_readable());
            hdl = true;
        }
        assert!(hdl);

        poll.deregister(&reg).unwrap();

        jh.join().unwrap();
    }

    // Signalling readiness after the registration was dropped must not fail.
    #[test]
    fn reg_drop() {
        let (reg, sr) = Registration::new2();

        let jh = thread::spawn(move || {
            // Explicit drop: `let _ = reg;` only drops through whole-variable
            // closure capture, which Rust 2021 no longer performs.
            drop(reg);
        });

        thread::sleep(Duration::from_millis(10));
        sr.set_readiness(Ready::readable()).unwrap();

        jh.join().unwrap();
    }

    // A registration can be deregistered and registered again under a new token.
    #[test]
    fn dereg_reg() {
        let (reg, sr) = Registration::new2();
        let poll = Poll::new().unwrap();
        let mut events = Events::with_capacity(16);

        let jh = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            sr.set_readiness(Ready::readable()).unwrap();

            thread::sleep(Duration::from_millis(10));
            sr.set_readiness(Ready::readable()).unwrap();
        });

        poll.register(&reg, Token(0), Ready::readable(), PollOpt::edge()).unwrap();

        poll.poll(&mut events, Some(Duration::from_secs(1))).unwrap();
        let mut hdl = false;
        for e in events.iter() {
            assert_eq!(e.token().0, 0);
            assert!(e.readiness().is_readable());
            hdl = true;
        }
        assert!(hdl);

        poll.deregister(&reg).unwrap();

        poll.register(&reg, Token(1), Ready::readable(), PollOpt::edge()).unwrap();

        poll.poll(&mut events, Some(Duration::from_secs(1))).unwrap();
        let mut hdl = false;
        for e in events.iter() {
            assert_eq!(e.token().0, 1);
            assert!(e.readiness().is_readable());
            hdl = true;
        }
        assert!(hdl);

        poll.deregister(&reg).unwrap();

        jh.join().unwrap();
    }

    // Bytes written by the producer come out of the consumer unchanged.
    #[test]
    fn write_read() {
        let (mut p, mut c) = create(16);

        assert_eq!(p.write(b"abcdef").unwrap(), 6);

        let mut buf = [0; 6];
        assert_eq!(c.read(&mut buf).unwrap(), 6);
        assert_eq!(&buf, b"abcdef");
    }

    // Two writes are delivered by a single read, in order.
    #[test]
    fn write_read_concat() {
        let (mut p, mut c) = create(16);

        assert_eq!(p.write(b"abc").unwrap(), 3);
        assert_eq!(p.write(b"def").unwrap(), 3);

        let mut buf = [0; 6];
        assert_eq!(c.read(&mut buf).unwrap(), 6);
        assert_eq!(&buf, b"abcdef");
    }

    // One write can be consumed by several smaller reads.
    #[test]
    fn write_read_split() {
        let (mut p, mut c) = create(16);

        assert_eq!(p.write(b"abcdef").unwrap(), 6);

        let mut buf = [0; 3];
        assert_eq!(c.read(&mut buf).unwrap(), 3);
        assert_eq!(&buf, b"abc");
        assert_eq!(c.read(&mut buf).unwrap(), 3);
        assert_eq!(&buf, b"def");
    }

    // A read into a larger buffer returns only what is available.
    #[test]
    fn write_read_empty() {
        let (mut p, mut c) = create(16);

        let mut buf = [0; 6];

        assert_eq!(p.write(b"abc").unwrap(), 3);
        assert_eq!(c.read(&mut buf).unwrap(), 3);
        assert_eq!(&buf, b"abc\0\0\0");

        assert_eq!(p.write(b"def").unwrap(), 3);
        assert_eq!(c.read(&mut buf).unwrap(), 3);
        assert_eq!(&buf, b"def\0\0\0");
    }

    // A write into a nearly full buffer is truncated to the free space.
    #[test]
    fn write_read_full() {
        let (mut p, mut c) = create(8);

        let range: Vec<u8> = (0..8).collect();
        let mut buf = [0; 6];

        assert_eq!(p.write(&range).unwrap(), 8);

        assert_eq!(c.read(&mut buf[0..3]).unwrap(), 3);
        assert_eq!(&buf[0..3], &[0, 1, 2]);

        assert_eq!(p.write(b"abcdef").unwrap(), 3);

        assert_eq!(c.read(&mut buf[0..3]).unwrap(), 3);
        assert_eq!(&buf[0..3], &[3, 4, 5]);

        assert_eq!(c.read(&mut buf).unwrap(), 5);
        assert_eq!(&buf[0..5], &[6, 7, b'a', b'b', b'c']);
    }

    // Reading from an empty, still-open FIFO reports `WouldBlock`.
    #[test]
    fn read_block() {
        let (_p, mut c) = create(16);

        let mut buf = [0; 4];
        match c.read(&mut buf) {
            Ok(_) => panic!(),
            Err(err) => {
                assert_eq!(err.kind(), ErrorKind::WouldBlock);
                assert_eq!(message(&err), "Ring buffer is empty");
            }
        }
    }

    // Writing into a full, still-open FIFO reports `WouldBlock`.
    #[test]
    fn write_block() {
        const SIZE: usize = 16;
        let (mut p, _c) = create(SIZE);

        assert_eq!(p.write(&[0; SIZE]).unwrap(), SIZE);
        match p.write(b"abc") {
            Ok(_) => panic!(),
            Err(err) => {
                assert_eq!(err.kind(), ErrorKind::WouldBlock);
                assert_eq!(message(&err), "Ring buffer is full");
            }
        }
    }

    // `write_transmit` / `read_transmit` move bytes between external streams.
    #[test]
    fn write_read_transmit() {
        let (mut p, mut c) = create(16);

        assert_eq!(p.write_transmit(&mut (&b"abcdef"[..]), None).unwrap(), 6);

        let mut buf = vec!();
        assert_eq!(c.read_transmit(&mut buf, None).unwrap(), 6);
        assert_eq!(&buf, b"abcdef");
    }

    // Two `write_transmit` calls are delivered by one `read_transmit`.
    #[test]
    fn write_read_transmit_concat() {
        let (mut p, mut c) = create(16);

        assert_eq!(p.write_transmit(&mut (&b"abc"[..]), None).unwrap(), 3);
        assert_eq!(p.write_transmit(&mut (&b"def"[..]), None).unwrap(), 3);

        let mut buf = vec!();
        assert_eq!(c.read_transmit(&mut buf, None).unwrap(), 6);
        assert_eq!(&buf, b"abcdef");
    }

    // `count` limits how many bytes a single `read_transmit` moves.
    #[test]
    fn write_read_transmit_split() {
        let (mut p, mut c) = create(16);

        assert_eq!(p.write_transmit(&mut (&b"abcdef"[..]), None).unwrap(), 6);

        let mut buf = vec!();
        assert_eq!(c.read_transmit(&mut buf, Some(3)).unwrap(), 3);
        assert_eq!(&buf, b"abc");
        assert_eq!(c.read_transmit(&mut buf, None).unwrap(), 3);
        assert_eq!(&buf, b"abcdef");
    }

    // `read_transmit` on an empty, still-open FIFO reports `This(WouldBlock)`.
    #[test]
    fn read_transmit_block() {
        let (_p, mut c) = create(16);

        let mut buf = vec!();
        match c.read_transmit(&mut buf, None) {
            Ok(_) => panic!(),
            Err(err) => match err {
                TransmitError::This(e) => {
                    assert_eq!(e.kind(), ErrorKind::WouldBlock);
                    assert_eq!(message(&e), "Ring buffer is empty");
                },
                other => panic!("{:?}", other),
            }
        }
    }

    // `write_transmit` into a full, still-open FIFO reports `This(WouldBlock)`.
    #[test]
    fn write_transmit_block() {
        const SIZE: usize = 16;
        let (mut p, _c) = create(SIZE);

        assert_eq!(p.write(&[0; SIZE]).unwrap(), SIZE);
        match p.write_transmit(&mut (&b"abc"[..]), None) {
            Ok(_) => panic!(),
            Err(err) => match err {
                TransmitError::This(e) => {
                    assert_eq!(e.kind(), ErrorKind::WouldBlock);
                    assert_eq!(message(&e), "Ring buffer is full");
                },
                other => panic!("{:?}", other),
            }
        }
    }

    // After the consumer is dropped, writes fail with `BrokenPipe`.
    #[test]
    fn close_cons() {
        let (mut p, c) = create(16);

        assert_eq!(p.write(b"abc").unwrap(), 3);

        // Explicit drop instead of moving into a closure with `let _ = c;`,
        // which does not drop the value under Rust 2021 capture rules.
        drop(c);

        match p.write(b"def") {
            Ok(_) => panic!(),
            Err(err) => {
                assert_eq!(err.kind(), ErrorKind::BrokenPipe);
                assert_eq!(message(&err), "Consumer was closed");
            }
        }
    }

    // After the producer is dropped, buffered data is still readable and only
    // then does the consumer see `BrokenPipe`.
    #[test]
    fn close_prod() {
        let (mut p, mut c) = create(16);
        let mut buf = [0; 6];

        assert_eq!(p.write(b"abcdef").unwrap(), 6);

        assert_eq!(c.read(&mut buf[0..3]).unwrap(), 3);
        assert_eq!(&buf[0..3], b"abc");

        // Explicit drop; see `close_cons`.
        drop(p);

        assert_eq!(c.read(&mut buf).unwrap(), 3);
        assert_eq!(&buf[0..3], b"def");

        match c.read(&mut buf) {
            Ok(_) => panic!(),
            Err(err) => {
                assert_eq!(err.kind(), ErrorKind::BrokenPipe);
                assert_eq!(message(&err), "Producer was closed");
            }
        }
    }

    // Only the write into an empty buffer wakes the consumer: two back-to-back
    // writes yield exactly one readable event.
    #[test]
    fn poll_cons() {
        let (mut p, mut c) = create(16);
        let poll = Poll::new().unwrap();
        let mut events = Events::with_capacity(16);
        let mut buf = [0; 6];

        poll.register(&c, Token(0), Ready::readable(), PollOpt::edge()).unwrap();

        let jh = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            assert_eq!(p.write(b"abc").unwrap(), 3);
            assert_eq!(p.write(b"def").unwrap(), 3);
            // Hand the producer back so it is not dropped (and the FIFO not
            // closed) inside the thread.
            p
        });

        poll.poll(&mut events, Some(Duration::from_secs(10))).unwrap();
        thread::sleep(Duration::from_millis(10));

        {
            let mut eiter = events.iter();

            let event = eiter.next().unwrap();
            assert_eq!(event.token().0, 0);
            assert!(event.readiness().is_readable());
            assert_eq!(c.read(&mut buf).unwrap(), 6);
            assert_eq!(&buf, b"abcdef");

            assert!(eiter.next().is_none());
        }

        poll.poll(&mut events, Some(Duration::from_millis(10))).unwrap();
        assert!(events.iter().next().is_none());

        jh.join().unwrap();
    }

    // Only the read from a full buffer wakes the producer: two back-to-back
    // reads yield exactly one writable event.
    #[test]
    fn poll_prod() {
        const SIZE: usize = 16;
        let (mut p, mut c) = create(SIZE);
        let poll = Poll::new().unwrap();
        let mut events = Events::with_capacity(16);

        poll.register(&p, Token(0), Ready::writable(), PollOpt::edge()).unwrap();

        assert_eq!(p.write(&[0; SIZE]).unwrap(), SIZE);

        let jh = thread::spawn(move || {
            let mut buf = [0; 3];
            thread::sleep(Duration::from_millis(10));
            assert_eq!(c.read(&mut buf).unwrap(), 3);
            assert_eq!(c.read(&mut buf).unwrap(), 3);
            // Hand the consumer back so it is not dropped inside the thread.
            c
        });

        poll.poll(&mut events, Some(Duration::from_secs(10))).unwrap();
        thread::sleep(Duration::from_millis(10));

        {
            let mut eiter = events.iter();

            let event = eiter.next().unwrap();
            assert_eq!(event.token().0, 0);

            assert!(event.readiness().is_writable());

            assert_eq!(p.write(b"abcdefghi").unwrap(), 6);

            assert!(eiter.next().is_none());
        }

        poll.poll(&mut events, Some(Duration::from_millis(10))).unwrap();
        assert!(events.iter().next().is_none());

        jh.join().unwrap();
    }

    // Dropping the producer wakes a polling consumer, which then reads `BrokenPipe`.
    #[test]
    fn poll_cons_close() {
        let (p, mut c) = create(16);
        let poll = Poll::new().unwrap();
        let mut events = Events::with_capacity(16);
        let mut buf = [0; 3];

        poll.register(&c, Token(0), Ready::readable(), PollOpt::edge()).unwrap();

        let jh = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            // Explicit drop; `let _ = p;` relies on pre-2021 closure capture.
            drop(p);
        });

        'outer: for _ in 0..2 {
            poll.poll(&mut events, Some(Duration::from_secs(10))).unwrap();

            for event in events.iter() {
                assert_eq!(event.token().0, 0);
                assert!(event.readiness().is_readable());
                match c.read(&mut buf) {
                    Ok(_) => panic!(),
                    Err(err) => {
                        match err.kind() {
                            ErrorKind::BrokenPipe => break 'outer,
                            ErrorKind::WouldBlock => (),
                            _ => panic!("{:?}", err),
                        }
                    }
                }
            }
        }

        jh.join().unwrap();
    }

    // Dropping the consumer wakes a polling producer, which then gets `BrokenPipe`.
    #[test]
    fn poll_prod_close() {
        const SIZE: usize = 16;
        let (mut p, c) = create(SIZE);
        let poll = Poll::new().unwrap();
        let mut events = Events::with_capacity(16);

        poll.register(&p, Token(0), Ready::writable(), PollOpt::edge()).unwrap();

        assert_eq!(p.write(&[0; SIZE]).unwrap(), SIZE);

        let jh = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            // Explicit drop; `let _ = c;` relies on pre-2021 closure capture.
            drop(c);
        });

        'outer: loop {
            poll.poll(&mut events, Some(Duration::from_secs(10))).unwrap();

            for event in events.iter() {
                assert_eq!(event.token().0, 0);
                assert!(event.readiness().is_writable());
                match p.write(b"def") {
                    Ok(_) => panic!(),
                    Err(err) => {
                        match err.kind() {
                            ErrorKind::BrokenPipe => break 'outer,
                            ErrorKind::WouldBlock => (),
                            _ => panic!("{:?}", err),
                        }
                    }
                }
            }
        }

        jh.join().unwrap();
    }

    // Producer and consumer each run their own poll loop on separate threads;
    // the consumer must receive exactly three half-buffer chunks before the
    // producer closes.
    #[test]
    fn poll_prod_cons() {
        const SIZE: usize = 16;
        let (mut p, mut c) = create(SIZE);

        let cjh = thread::spawn(move || {
            let poll = Poll::new().unwrap();
            let mut events = Events::with_capacity(16);
            let mut buf = [0; SIZE/2];

            poll.register(&c, Token(0), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();

            let mut i = 0;
            'outer: loop {
                poll.poll(&mut events, Some(Duration::from_secs(10))).unwrap();
                for event in events.iter() {
                    assert_eq!(event.token().0, 0);
                    assert!(event.readiness().is_readable());
                    // Drain until the FIFO reports empty or closed.
                    'inner: loop {
                        match c.read(&mut buf) {
                            Ok(n) => {
                                assert_eq!(n, SIZE/2);
                                // Chunks 0 and 1 hold zeros, chunk 2 holds ones.
                                assert_eq!(&buf, &[i/2; SIZE/2]);
                                i += 1;
                            },
                            Err(err) => {
                                match err.kind() {
                                    ErrorKind::BrokenPipe => break 'outer,
                                    ErrorKind::WouldBlock => break 'inner,
                                    _ => panic!("{:?}", err),
                                }
                            }
                        }
                    }
                }
                // One-shot registrations must be re-armed after every event.
                poll.reregister(&c, Token(0), Ready::readable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
            }
            assert_eq!(i, 3);
        });

        let pjh = thread::spawn(move || {
            let poll = Poll::new().unwrap();
            let mut events = Events::with_capacity(16);

            assert_eq!(p.write(&[0; SIZE]).unwrap(), SIZE);
            poll.register(&p, Token(0), Ready::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
            poll.poll(&mut events, Some(Duration::from_secs(10))).unwrap();

            let event = events.iter().next().unwrap();
            assert_eq!(event.token().0, 0);
            assert!(event.readiness().is_writable());
            assert_eq!(p.write(&[1; SIZE/2]).unwrap(), SIZE/2);
            poll.reregister(&p, Token(0), Ready::writable(), PollOpt::edge() | PollOpt::oneshot()).unwrap();
            poll.poll(&mut events, Some(Duration::from_secs(10))).unwrap();

            thread::sleep(Duration::from_millis(10));
        });

        pjh.join().unwrap();
        cjh.join().unwrap();
    }
}