1use super::{CaughtSignals, Clock, Errno, Fcntl, Read, Result, Select, Write};
20use crate::io::Fd;
21use crate::waker::{ScheduledWakerQueue, WakerSet};
22use std::cell::{Cell, LazyCell, OnceCell, RefCell};
23use std::collections::HashMap;
24use std::future::poll_fn;
25use std::ops::{Deref, DerefMut};
26use std::pin::pin;
27use std::rc::{Rc, Weak};
28use std::task::Poll::{Pending, Ready};
29use std::task::{Context, Waker};
30use std::time::{Duration, Instant};
31
/// Wrapper around an inner system `S` that keeps bookkeeping for tasks
/// waiting on file descriptors, timeouts, and signals.
///
/// Pending operations store their wakers in `state`; `select` and `peek`
/// consult the inner system and wake the tasks whose condition was reported.
#[derive(Clone, Debug, Default)]
pub struct Concurrent<S> {
    /// The wrapped system that performs the actual operations.
    inner: S,
    /// Wakers and signal bookkeeping shared by all pending operations.
    state: RefCell<State>,
}
103
/// Bookkeeping for tasks that are waiting for some condition.
#[derive(Clone, Debug, Default)]
struct State {
    /// Wakers of tasks waiting to read, keyed by file descriptor.
    reads: HashMap<Fd, WakerSet>,
    /// Wakers of tasks waiting to write, keyed by file descriptor.
    writes: HashMap<Fd, WakerSet>,
    /// Wakers of sleeping tasks, queued together with their deadlines.
    timeouts: ScheduledWakerQueue,

    /// Wakers of tasks waiting for signals.
    catches: WakerSet,
    /// Signal list shared by the tasks currently waiting for signals.
    /// Only a weak reference is kept here; the waiting tasks own the list.
    signals: Weak<SignalList>,
    /// Signal mask handed to the inner `select` while some task is waiting
    /// for signals.
    /// NOTE(review): nothing in this part of the file assigns this field —
    /// presumably a submodule maintains it; confirm.
    select_mask: Option<Vec<crate::signal::Number>>,
}
132
133impl<S> Concurrent<S> {
134 #[must_use]
136 pub fn new(inner: S) -> Self {
137 let state = Default::default();
138 Self { inner, state }
139 }
140}
141
142impl<S> Read for Rc<Concurrent<S>>
152where
153 S: Fcntl + Read,
154{
155 fn read<'a>(
156 &self,
157 fd: Fd,
158 buffer: &'a mut [u8],
159 ) -> impl Future<Output = Result<usize>> + use<'a, S> {
160 let this = Rc::clone(self);
161 async move {
162 let this = TemporaryNonBlockingGuard::new(&this, fd);
163 let waker = LazyCell::default();
164 loop {
165 match this.inner.read(fd, buffer).await {
166 #[allow(unreachable_patterns)]
168 Err(Errno::EAGAIN | Errno::EWOULDBLOCK | Errno::EINTR) => {
169 this.yield_for_read(fd, &waker).await
170 }
171
172 result => return result,
173 }
174 }
175 }
176 }
177}
178
179impl<S> Write for Rc<Concurrent<S>>
189where
190 S: Fcntl + Write,
191{
192 fn write<'a>(
193 &self,
194 fd: Fd,
195 buffer: &'a [u8],
196 ) -> impl Future<Output = Result<usize>> + use<'a, S> {
197 let this = Rc::clone(self);
198 async move {
199 let this = TemporaryNonBlockingGuard::new(&this, fd);
200 let waker = LazyCell::default();
201 loop {
202 match this.inner.write(fd, buffer).await {
203 #[allow(unreachable_patterns)]
205 Err(Errno::EAGAIN | Errno::EWOULDBLOCK | Errno::EINTR) => {
206 this.yield_for_write(fd, &waker).await
207 }
208
209 result => return result,
210 }
211 }
212 }
213 }
214}
215
216impl<S> Concurrent<S> {
217 async fn yield_for_read<F>(&self, fd: Fd, waker: &LazyCell<Rc<Cell<Option<Waker>>>, F>)
218 where
219 F: FnOnce() -> Rc<Cell<Option<Waker>>>,
220 {
221 self.yield_once(fd, waker, |state| &mut state.reads).await
222 }
223
224 async fn yield_for_write<F>(&self, fd: Fd, waker: &LazyCell<Rc<Cell<Option<Waker>>>, F>)
225 where
226 F: FnOnce() -> Rc<Cell<Option<Waker>>>,
227 {
228 self.yield_once(fd, waker, |state| &mut state.writes).await
229 }
230
231 async fn yield_once<F, G>(
234 &self,
235 fd: Fd,
236 waker: &LazyCell<Rc<Cell<Option<Waker>>>, F>,
237 target: G,
238 ) where
239 F: FnOnce() -> Rc<Cell<Option<Waker>>>,
240 G: Fn(&mut State) -> &mut HashMap<Fd, WakerSet>,
241 {
242 let mut first_time = true;
243 poll_fn(|context| {
244 if first_time {
245 first_time = false;
246 waker.set(Some(context.waker().clone()));
247 target(&mut self.state.borrow_mut())
248 .entry(fd)
249 .or_default()
250 .insert(Rc::downgrade(waker));
251 Pending
252 } else {
253 Ready(())
254 }
255 })
256 .await
257 }
258}
259
260impl<S> Concurrent<S>
261where
262 S: Clock,
263{
264 pub async fn sleep_until(&self, deadline: Instant) {
269 let waker: LazyCell<Rc<Cell<Option<Waker>>>> = LazyCell::default();
270 poll_fn(|context| {
271 if self.inner.now() >= deadline {
272 Ready(())
273 } else {
274 waker.set(Some(context.waker().clone()));
275 self.state
276 .borrow_mut()
277 .timeouts
278 .push(deadline, Rc::downgrade(&waker));
279 Pending
280 }
281 })
282 .await
283 }
284
285 pub async fn sleep(&self, duration: Duration) {
290 let now = self.inner.now();
291 let deadline = now + duration;
292 self.sleep_until(deadline).await;
293 }
294}
295
296impl<S> Concurrent<S> {
297 pub async fn wait_for_signals(&self) -> Rc<SignalList> {
313 let signals = {
314 let mut state = self.state.borrow_mut();
315 state.signals.upgrade().unwrap_or_else(|| {
316 let signals = Rc::new(SignalList::new());
317 state.signals = Rc::downgrade(&signals);
318 signals
319 })
320 };
321
322 let waker: LazyCell<Rc<Cell<Option<Waker>>>> = LazyCell::default();
323 poll_fn(|context| {
324 if signals.0.get().is_some() {
325 Ready(())
326 } else {
327 waker.set(Some(context.waker().clone()));
328 self.state
329 .borrow_mut()
330 .catches
331 .insert(Rc::downgrade(&waker));
332 Pending
333 }
334 })
335 .await;
336
337 signals
338 }
339}
340
impl<S> Concurrent<S>
where
    S: CaughtSignals + Clock + Select,
{
    /// Checks the registered conditions once with a zero timeout and wakes
    /// the tasks whose conditions were reported.
    ///
    /// The internal future is polled exactly once with a no-op waker; debug
    /// builds assert that it completed on that poll.
    pub fn peek(&self) {
        let select = pin!(self.select_impl(true));
        let poll = select.poll(&mut Context::from_waker(Waker::noop()));
        debug_assert_eq!(poll, Ready(()), "peek should not block");
    }

    /// Waits on the inner system's `select` for the registered conditions
    /// and wakes the tasks whose conditions were reported.
    ///
    /// The timeout is the time left until the earliest queued wake time, or
    /// none if the timeout queue reports no wake time.
    pub async fn select(&self) {
        self.select_impl(false).await;
    }

    /// Shared implementation of `peek` (`peek == true`, zero timeout) and
    /// `select` (`peek == false`).
    // The `RefCell` borrow of the state is deliberately held across the await
    // below, so any other access to the state while this future is suspended
    // would panic.
    #[allow(clippy::await_holding_refcell_ref)]
    async fn select_impl(&self, peek: bool) {
        let mut state = self.state.borrow_mut();

        // Every fd that has a registered waker set is passed to the inner select.
        let mut readers = state.reads.keys().cloned().collect();
        let mut writers = state.writes.keys().cloned().collect();
        let timeout = if peek {
            Some(Duration::ZERO)
        } else {
            state
                .timeouts
                .next_wake_time()
                .map(|target| target.saturating_duration_since(self.inner.now()))
        };
        // The signal mask is only passed while some task still holds the
        // shared signal list, i.e. is waiting for signals.
        let signal_mask = (state.signals.strong_count() > 0)
            .then(|| state.select_mask.as_deref())
            .flatten();

        let result = self
            .inner
            .select(&mut readers, &mut writers, timeout, signal_mask)
            .await;

        // On EINTR no read/write task is woken. For every other result
        // (including other errors) the tasks of all fds still listed in
        // `readers`/`writers` are woken.
        // NOTE(review): presumably the inner select trims the lists to the
        // ready fds on success and leaves them untouched on error — confirm.
        if result != Err(Errno::EINTR) {
            wake_tasks_for_ready_fds(&mut state.reads, &readers);
            wake_tasks_for_ready_fds(&mut state.writes, &writers);
        }
        if !state.timeouts.is_empty() {
            state.timeouts.wake(self.inner.now());
        }
        // Caught signals are only fetched from the inner system while a task
        // is waiting for them.
        if let Some(signal_list) = state.signals.upgrade() {
            let signals = self.inner.caught_signals();
            if !signals.is_empty() {
                let set_result = signal_list.0.set(signals);
                debug_assert_eq!(set_result, Ok(()), "SignalList should not be filled yet");
                state.catches.wake_all();
                // Forget the filled list so that later waiters get a fresh one.
                state.signals = Weak::new();
            }
        }
    }
}
439
440fn wake_tasks_for_ready_fds(task_map: &mut HashMap<Fd, WakerSet>, ready_fds: &[Fd]) {
441 task_map.retain(|fd, wakers| {
442 if ready_fds.contains(fd) {
443 wakers.wake_all();
444 false
445 } else {
446 true
447 }
448 })
449}
450
/// Guard that keeps a file descriptor in non-blocking mode while it is
/// alive and turns that mode off again on drop if it was off before.
#[derive(Debug)]
struct TemporaryNonBlockingGuard<'a, S: Fcntl> {
    /// System used to change the descriptor's mode.
    system: &'a Concurrent<S>,
    /// The file descriptor whose mode is being managed.
    fd: Fd,
    /// Whether the descriptor was already non-blocking when the guard was
    /// created.
    original_nonblocking: bool,
}
459
460impl<'a, S: Fcntl> TemporaryNonBlockingGuard<'a, S> {
461 fn new(system: &'a Concurrent<S>, fd: Fd) -> Self {
462 Self {
463 system,
464 fd,
465 original_nonblocking: system.inner.get_and_set_nonblocking(fd, true) == Ok(true),
466 }
467 }
468}
469
470impl<'a, S: Fcntl> Drop for TemporaryNonBlockingGuard<'a, S> {
471 fn drop(&mut self) {
472 if !self.original_nonblocking {
473 self.system
474 .inner
475 .get_and_set_nonblocking(self.fd, false)
476 .ok();
477 }
478 }
479}
480
/// Lets the guard be used wherever the guarded `Concurrent` system is
/// needed.
impl<'a, S: Fcntl> Deref for TemporaryNonBlockingGuard<'a, S> {
    type Target = Concurrent<S>;

    /// Returns the system this guard was created for.
    fn deref(&self) -> &Self::Target {
        self.system
    }
}
488
/// List of caught signals handed to tasks waiting in `wait_for_signals`.
///
/// The inner cell starts empty and is filled at most once. Dereferencing
/// the list before it has been filled panics.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SignalList(OnceCell<Vec<crate::signal::Number>>);
497
498impl Deref for SignalList {
499 type Target = Vec<crate::signal::Number>;
500
501 fn deref(&self) -> &Vec<crate::signal::Number> {
502 self.0.get().unwrap()
504 }
505}
506
507impl DerefMut for SignalList {
508 fn deref_mut(&mut self) -> &mut Vec<crate::signal::Number> {
509 self.0.get_mut().unwrap()
511 }
512}
513
514impl SignalList {
515 #[must_use]
516 fn new() -> Self {
517 Self(OnceCell::new())
518 }
519
520 pub fn into_vec(self) -> Vec<crate::signal::Number> {
522 self.0.into_inner().unwrap()
524 }
525}
526
527mod delegates;
528mod run;
529mod rw_all;
530mod signal;
531
532#[cfg(test)]
533mod tests {
534 use super::super::{
535 Close as _, Disposition, Mode, OfdAccess, Open as _, OpenFlag, Pipe as _, SendSignal,
536 };
537 use super::*;
538 use crate::system::r#virtual::{PIPE_SIZE, SIGCHLD, SIGINT, SIGUSR2, VirtualSystem};
539 use crate::test_helper::WakeFlag;
540 use crate::trap::SignalSystem as _;
541 use assert_matches::assert_matches;
542 use futures_util::FutureExt as _;
543 use std::sync::Arc;
544 use std::task::Poll::{Pending, Ready};
545
/// `peek` on a system with nothing registered must return without blocking
/// (in debug builds `peek` itself asserts that it did not block).
#[test]
fn peek_with_no_conditions_returns_immediately() {
    let system = Concurrent::new(VirtualSystem::new());
    system.peek();
}
551
/// `select` with nothing registered stays pending and does not wake the
/// polling task.
#[test]
fn select_with_no_conditions_never_completes() {
    let system = Concurrent::new(VirtualSystem::new());

    let future = pin!(system.select());

    let wake_flag = Arc::new(WakeFlag::new());
    let waker = Waker::from(wake_flag.clone());
    let mut context = Context::from_waker(&waker);
    assert_eq!(future.poll(&mut context), Pending);
    assert!(!wake_flag.is_woken());
}
564
/// Reading a freshly created (empty) regular file completes on the first
/// poll with zero bytes and without using the waker.
#[test]
fn regular_file_read_completes_immediately() {
    let system = Rc::new(Concurrent::new(VirtualSystem::new()));
    let fd = system
        .open(
            c"/foo",
            OfdAccess::ReadOnly,
            OpenFlag::Create.into(),
            Mode::empty(),
        )
        .now_or_never()
        .unwrap()
        .unwrap();

    let mut buffer = [0; 4];
    let future = pin!(system.read(fd, &mut buffer));

    let wake_flag = Arc::new(WakeFlag::new());
    let waker = Waker::from(wake_flag.clone());
    let mut context = Context::from_waker(&waker);
    assert_eq!(future.poll(&mut context), Ready(Ok(0)));
    assert!(!wake_flag.is_woken());
}
588
/// Two reads pending on the same empty pipe are both woken by `select`
/// once data has been written to the pipe.
#[test]
fn pipe_read_becomes_ready_on_data_available() {
    let system = Rc::new(Concurrent::new(VirtualSystem::new()));
    let (read_fd, write_fd) = system.pipe().unwrap();

    let mut buffer1 = [0; 4];
    let mut buffer2 = [0; 4];
    let mut read1 = pin!(system.read(read_fd, &mut buffer1));
    let mut read2 = pin!(system.read(read_fd, &mut buffer2));

    let wake_flag1 = Arc::new(WakeFlag::new());
    let wake_flag2 = Arc::new(WakeFlag::new());
    let waker1 = Waker::from(wake_flag1.clone());
    let waker2 = Waker::from(wake_flag2.clone());
    let mut context1 = Context::from_waker(&waker1);
    let mut context2 = Context::from_waker(&waker2);
    // The pipe is empty, so both reads have to wait.
    assert_eq!(read1.as_mut().poll(&mut context1), Pending);
    assert_eq!(read2.as_mut().poll(&mut context2), Pending);

    let mut select = pin!(system.select());
    let mut context3 = Context::from_waker(Waker::noop());
    assert_eq!(select.as_mut().poll(&mut context3), Pending);
    assert!(!wake_flag1.is_woken());
    assert!(!wake_flag2.is_woken());

    // Make the read end readable.
    system
        .write(write_fd, &[1, 2, 3, 4])
        .now_or_never()
        .unwrap()
        .unwrap();
    assert_eq!(select.as_mut().poll(&mut context3), Ready(()));
    assert!(wake_flag1.is_woken());
    assert!(wake_flag2.is_woken());
}
624
/// With reads pending on two different pipes, `select` wakes only the
/// reader whose pipe received data.
#[test]
fn select_wakes_only_read_tasks_with_ready_fd() {
    let system = Rc::new(Concurrent::new(VirtualSystem::new()));
    let (read_fd1, write_fd1) = system.pipe().unwrap();
    let (read_fd2, _write_fd2) = system.pipe().unwrap();

    let mut buffer1 = [0; 4];
    let mut buffer2 = [0; 4];
    let mut read1 = pin!(system.read(read_fd1, &mut buffer1));
    let mut read2 = pin!(system.read(read_fd2, &mut buffer2));

    let wake_flag1 = Arc::new(WakeFlag::new());
    let wake_flag2 = Arc::new(WakeFlag::new());
    let waker1 = Waker::from(wake_flag1.clone());
    let waker2 = Waker::from(wake_flag2.clone());
    let mut context1 = Context::from_waker(&waker1);
    let mut context2 = Context::from_waker(&waker2);
    assert_eq!(read1.as_mut().poll(&mut context1), Pending);
    assert_eq!(read2.as_mut().poll(&mut context2), Pending);

    let mut select = pin!(system.select());
    let mut context3 = Context::from_waker(Waker::noop());
    assert_eq!(select.as_mut().poll(&mut context3), Pending);
    assert!(!wake_flag1.is_woken());
    assert!(!wake_flag2.is_woken());

    // Only the first pipe becomes readable.
    system
        .write(write_fd1, &[1, 2, 3, 4])
        .now_or_never()
        .unwrap()
        .unwrap();
    assert_eq!(select.as_mut().poll(&mut context3), Ready(()));
    assert!(wake_flag1.is_woken());
    assert!(!wake_flag2.is_woken());
}
661
/// After a read completes, the descriptor is back in the mode it had
/// before the read, whether that was blocking or non-blocking.
#[test]
fn read_preserves_fd_blocking_mode() {
    let system = Rc::new(Concurrent::new(VirtualSystem::new()));
    let fd = system
        .open(
            c"/foo",
            OfdAccess::ReadOnly,
            OpenFlag::Create.into(),
            Mode::empty(),
        )
        .now_or_never()
        .unwrap()
        .unwrap();

    let mut buffer = [0; 4];
    system
        .read(fd, &mut buffer)
        .now_or_never()
        .unwrap()
        .unwrap();

    // The previous mode reported here must still be "blocking".
    assert_eq!(system.inner.get_and_set_nonblocking(fd, false), Ok(false));

    // Repeat with the descriptor in non-blocking mode beforehand.
    system.inner.get_and_set_nonblocking(fd, true).ok();
    system
        .read(fd, &mut buffer)
        .now_or_never()
        .unwrap()
        .unwrap();
    assert_eq!(system.inner.get_and_set_nonblocking(fd, true), Ok(true));
}
697
/// Writing to a regular file completes on the first poll with the full
/// byte count and without using the waker.
#[test]
fn regular_file_write_completes_immediately() {
    let system = Rc::new(Concurrent::new(VirtualSystem::new()));
    let fd = system
        .open(
            c"/foo",
            OfdAccess::WriteOnly,
            OpenFlag::Create.into(),
            Mode::empty(),
        )
        .now_or_never()
        .unwrap()
        .unwrap();

    let buffer = [1, 2, 3, 4];
    let future = pin!(system.write(fd, &buffer));

    let wake_flag = Arc::new(WakeFlag::new());
    let waker = Waker::from(wake_flag.clone());
    let mut context = Context::from_waker(&waker);
    assert_eq!(future.poll(&mut context), Ready(Ok(4)));
    assert!(!wake_flag.is_woken());
}
721
/// Two writes pending on the same full pipe are both woken by `select`
/// once the pipe has been drained.
#[test]
fn pipe_write_becomes_ready_on_buffer_space() {
    let system = Rc::new(Concurrent::new(VirtualSystem::new()));
    let (read_fd, write_fd) = system.pipe().unwrap();
    // Fill the pipe so that further writes have to wait.
    system
        .write(write_fd, &[0; PIPE_SIZE])
        .now_or_never()
        .unwrap()
        .unwrap();

    let buffer1 = [1, 2, 3, 4];
    let buffer2 = [5, 6, 7, 8];
    let mut write1 = pin!(system.write(write_fd, &buffer1));
    let mut write2 = pin!(system.write(write_fd, &buffer2));

    let wake_flag1 = Arc::new(WakeFlag::new());
    let wake_flag2 = Arc::new(WakeFlag::new());
    let waker1 = Waker::from(wake_flag1.clone());
    let waker2 = Waker::from(wake_flag2.clone());
    let mut context1 = Context::from_waker(&waker1);
    let mut context2 = Context::from_waker(&waker2);
    assert_eq!(write1.as_mut().poll(&mut context1), Pending);
    assert_eq!(write2.as_mut().poll(&mut context2), Pending);

    let mut select = pin!(system.select());
    let mut context3 = Context::from_waker(Waker::noop());
    assert_eq!(select.as_mut().poll(&mut context3), Pending);
    assert!(!wake_flag1.is_woken());
    assert!(!wake_flag2.is_woken());

    let mut read_buffer = [0; PIPE_SIZE];
    // Drain the pipe to make room for the pending writes.
    system
        .read(read_fd, &mut read_buffer)
        .now_or_never()
        .unwrap()
        .unwrap();
    assert_eq!(select.as_mut().poll(&mut context3), Ready(()));
    assert!(wake_flag1.is_woken());
    assert!(wake_flag2.is_woken());
}
764
/// With writes pending on two full pipes, `select` wakes only the writer
/// whose pipe was drained.
#[test]
fn select_wakes_only_write_tasks_with_ready_fd() {
    let system = Rc::new(Concurrent::new(VirtualSystem::new()));
    let (read_fd1, write_fd1) = system.pipe().unwrap();
    let (_read_fd2, write_fd2) = system.pipe().unwrap();
    // Fill both pipes so that further writes have to wait.
    system
        .write(write_fd1, &[0; PIPE_SIZE])
        .now_or_never()
        .unwrap()
        .unwrap();
    system
        .write(write_fd2, &[0; PIPE_SIZE])
        .now_or_never()
        .unwrap()
        .unwrap();

    let buffer1 = [1, 2, 3, 4];
    let buffer2 = [5, 6, 7, 8];
    let mut write1 = pin!(system.write(write_fd1, &buffer1));
    let mut write2 = pin!(system.write(write_fd2, &buffer2));

    let wake_flag1 = Arc::new(WakeFlag::new());
    let wake_flag2 = Arc::new(WakeFlag::new());
    let waker1 = Waker::from(wake_flag1.clone());
    let waker2 = Waker::from(wake_flag2.clone());
    let mut context1 = Context::from_waker(&waker1);
    let mut context2 = Context::from_waker(&waker2);
    assert_eq!(write1.as_mut().poll(&mut context1), Pending);
    assert_eq!(write2.as_mut().poll(&mut context2), Pending);

    let mut select = pin!(system.select());
    let mut context3 = Context::from_waker(Waker::noop());
    assert_eq!(select.as_mut().poll(&mut context3), Pending);
    assert!(!wake_flag1.is_woken());
    assert!(!wake_flag2.is_woken());

    let mut read_buffer = [0; PIPE_SIZE];
    // Drain only the first pipe.
    system
        .read(read_fd1, &mut read_buffer)
        .now_or_never()
        .unwrap()
        .unwrap();
    assert_eq!(select.as_mut().poll(&mut context3), Ready(()));
    assert!(wake_flag1.is_woken());
    assert!(!wake_flag2.is_woken());
}
813
/// After a write completes, the descriptor is back in the mode it had
/// before the write, whether that was blocking or non-blocking.
#[test]
fn write_preserves_fd_blocking_mode() {
    let system = Rc::new(Concurrent::new(VirtualSystem::new()));
    let fd = system
        .open(
            c"/foo",
            OfdAccess::WriteOnly,
            OpenFlag::Create.into(),
            Mode::empty(),
        )
        .now_or_never()
        .unwrap()
        .unwrap();

    let buffer = [1, 2, 3, 4];
    system.write(fd, &buffer).now_or_never().unwrap().unwrap();

    // The previous mode reported here must still be "blocking".
    assert_eq!(system.inner.get_and_set_nonblocking(fd, false), Ok(false));

    // Repeat with the descriptor in non-blocking mode beforehand.
    system.inner.get_and_set_nonblocking(fd, true).ok();
    system.write(fd, &buffer).now_or_never().unwrap().unwrap();
    assert_eq!(system.inner.get_and_set_nonblocking(fd, true), Ok(true));
}
841
/// A sleeping task is woken by `select` once the virtual clock has reached
/// the deadline, and the sleep then completes without another wake-up.
#[test]
fn sleep_completes_after_duration() {
    let system = VirtualSystem::new();
    let state = system.state.clone();
    // Pin the virtual clock so the test controls the passage of time.
    let now = Instant::now();
    state.borrow_mut().now = Some(now);
    let system = Concurrent::new(system);

    let mut sleep = pin!(system.sleep(Duration::from_secs(1)));

    let wake_flag = Arc::new(WakeFlag::new());
    let waker = Waker::from(wake_flag.clone());
    let mut context = Context::from_waker(&waker);
    assert_eq!(sleep.as_mut().poll(&mut context), Pending);

    let mut select = pin!(system.select());
    assert_eq!(select.as_mut().poll(&mut context), Pending);
    assert!(!wake_flag.is_woken());

    // Advance the clock to the deadline.
    state
        .borrow_mut()
        .advance_time(now + Duration::from_secs(1));
    assert_eq!(select.as_mut().poll(&mut context), Ready(()));
    assert!(wake_flag.is_woken());

    // A fresh flag shows that completing the sleep wakes nobody.
    let wake_flag = Arc::new(WakeFlag::new());
    let waker = Waker::from(wake_flag.clone());
    let mut context = Context::from_waker(&waker);
    assert_eq!(sleep.poll(&mut context), Ready(()));
    assert!(!wake_flag.is_woken());
}
874
/// A task waiting for signals is woken by `select` after signals were
/// raised and receives exactly the raised signals (in either order).
#[test]
fn signal_wait_completes_on_signal() {
    let system = Rc::new(Concurrent::new(VirtualSystem::new()));
    system
        .set_disposition(SIGINT, Disposition::Catch)
        .now_or_never()
        .unwrap()
        .unwrap();
    system
        .set_disposition(SIGCHLD, Disposition::Catch)
        .now_or_never()
        .unwrap()
        .unwrap();
    system
        .set_disposition(SIGUSR2, Disposition::Catch)
        .now_or_never()
        .unwrap()
        .unwrap();

    let mut wait = pin!(system.wait_for_signals());

    let wake_flag = Arc::new(WakeFlag::new());
    let waker = Waker::from(wake_flag.clone());
    let mut context = Context::from_waker(&waker);
    assert_eq!(wait.as_mut().poll(&mut context), Pending);

    let mut select = pin!(system.select());
    let mut null_context = Context::from_waker(Waker::noop());
    assert_eq!(select.as_mut().poll(&mut null_context), Pending);
    assert!(!wake_flag.is_woken());

    // Raise two of the three caught signals; SIGUSR2 must not be reported.
    system.raise(SIGINT).now_or_never().unwrap().unwrap();
    system.raise(SIGCHLD).now_or_never().unwrap().unwrap();
    assert_eq!(select.as_mut().poll(&mut null_context), Ready(()));
    assert!(wake_flag.is_woken());

    let wake_flag = Arc::new(WakeFlag::new());
    let waker = Waker::from(wake_flag.clone());
    let mut context = Context::from_waker(&waker);
    assert_matches!(wait.poll(&mut context), Ready(signals) => {
        assert_matches!(***signals, [SIGINT, SIGCHLD] | [SIGCHLD, SIGINT]);
    });
}
919
/// A signal raised before any task waits for signals survives a `select`
/// call made for another reason and is delivered to a later waiter.
#[test]
fn select_does_not_consume_caught_signals_until_tasks_are_waiting_for_signals() {
    let system = Rc::new(Concurrent::new(VirtualSystem::new()));
    let (read_fd, write_fd) = system.pipe().unwrap();
    system
        .set_disposition(SIGCHLD, Disposition::Catch)
        .now_or_never()
        .unwrap()
        .unwrap();
    system.raise(SIGCHLD).now_or_never().unwrap().unwrap();

    let mut buffer = [0; 4];
    let mut read = pin!(system.read(read_fd, &mut buffer));

    let mut null_context = Context::from_waker(Waker::noop());
    assert_eq!(read.as_mut().poll(&mut null_context), Pending);

    // This select completes because of the pipe, with no signal waiter yet.
    system
        .write(write_fd, b"foo")
        .now_or_never()
        .unwrap()
        .unwrap();
    system.select().now_or_never().unwrap();

    let mut wait = pin!(system.wait_for_signals());
    assert_eq!(wait.as_mut().poll(&mut null_context), Pending);

    // Now that a task is waiting, the earlier signal must be delivered.
    system.select().now_or_never().unwrap();
    assert_matches!(wait.poll(&mut null_context), Ready(signals) => {
        assert_eq!(**signals, &[SIGCHLD]);
    });
}
952
/// A wait started after an earlier wait was satisfied gets its own list:
/// each wait sees only the signal raised while it was waiting.
#[test]
fn wait_for_signals_can_be_used_many_times() {
    let system = Rc::new(Concurrent::new(VirtualSystem::new()));
    system
        .set_disposition(SIGINT, Disposition::Catch)
        .now_or_never()
        .unwrap()
        .unwrap();
    system
        .set_disposition(SIGCHLD, Disposition::Catch)
        .now_or_never()
        .unwrap()
        .unwrap();

    let mut wait1 = pin!(system.wait_for_signals());
    let mut null_context = Context::from_waker(Waker::noop());
    assert_eq!(wait1.as_mut().poll(&mut null_context), Pending);

    system.raise(SIGCHLD).now_or_never().unwrap().unwrap();
    system.select().now_or_never().unwrap();

    // The first list is filled by now, so this wait starts a new one.
    let mut wait2 = pin!(system.wait_for_signals());
    assert_eq!(wait2.as_mut().poll(&mut null_context), Pending);

    system.raise(SIGINT).now_or_never().unwrap().unwrap();
    system.select().now_or_never().unwrap();

    assert_matches!(wait1.poll(&mut null_context), Ready(signals) => {
        assert_eq!(**signals, &[SIGCHLD]);
    });
    assert_matches!(wait2.poll(&mut null_context), Ready(signals) => {
        assert_eq!(**signals, &[SIGINT]);
    });
}
987
/// With a sleep, a pipe read, and a signal wait pending at the same time,
/// each `select` call completes as soon as one condition is met and wakes
/// only the task for that condition: first the read, then the sleep, then
/// the signal wait.
#[test]
fn select_completes_when_any_condition_is_ready() {
    let system = VirtualSystem::new();
    let state = system.state.clone();
    // Pin the virtual clock so the test controls the passage of time.
    let now = Instant::now();
    state.borrow_mut().now = Some(now);
    let system = Rc::new(Concurrent::new(system));
    let (read_fd, write_fd) = system.pipe().unwrap();
    let mut buffer = [0; 4];
    system
        .set_disposition(SIGINT, Disposition::Catch)
        .now_or_never()
        .unwrap()
        .unwrap();

    let mut sleep = pin!(system.sleep(Duration::from_secs(3)));
    let mut read = pin!(system.read(read_fd, &mut buffer));
    let mut wait = pin!(system.wait_for_signals());

    let wake_sleep = Arc::new(WakeFlag::new());
    let wake_read = Arc::new(WakeFlag::new());
    let wake_wait = Arc::new(WakeFlag::new());
    let sleep_waker = Waker::from(wake_sleep.clone());
    let read_waker = Waker::from(wake_read.clone());
    let wait_waker = Waker::from(wake_wait.clone());
    let mut sleep_context = Context::from_waker(&sleep_waker);
    let mut read_context = Context::from_waker(&read_waker);
    let mut wait_context = Context::from_waker(&wait_waker);
    assert_eq!(sleep.as_mut().poll(&mut sleep_context), Pending);
    assert_eq!(read.as_mut().poll(&mut read_context), Pending);
    assert_eq!(wait.as_mut().poll(&mut wait_context), Pending);

    let mut select = pin!(system.select());

    let wake_select = Arc::new(WakeFlag::new());
    let select_waker = Waker::from(wake_select.clone());
    let mut select_context = Context::from_waker(&select_waker);
    assert_eq!(select.as_mut().poll(&mut select_context), Pending);
    assert!(!wake_sleep.is_woken());
    assert!(!wake_read.is_woken());
    assert!(!wake_wait.is_woken());
    assert!(!wake_select.is_woken());

    // Phase 1: data arrives on the pipe; only the reader is woken.
    system
        .write(write_fd, b"foo")
        .now_or_never()
        .unwrap()
        .unwrap();
    assert!(wake_select.is_woken());

    let wake_select = Arc::new(WakeFlag::new());
    let select_waker = Waker::from(wake_select.clone());
    let mut select_context = Context::from_waker(&select_waker);
    assert_eq!(select.as_mut().poll(&mut select_context), Ready(()));
    assert!(!wake_sleep.is_woken());
    assert!(wake_read.is_woken());
    assert!(!wake_wait.is_woken());
    assert!(!wake_select.is_woken());

    assert_eq!(read.now_or_never().unwrap(), Ok(3));

    let mut select = pin!(system.select());

    let wake_select = Arc::new(WakeFlag::new());
    let select_waker = Waker::from(wake_select.clone());
    let mut select_context = Context::from_waker(&select_waker);
    assert_eq!(select.as_mut().poll(&mut select_context), Pending);
    assert!(!wake_sleep.is_woken());
    assert!(!wake_wait.is_woken());
    assert!(!wake_select.is_woken());

    // Phase 2: the clock reaches the deadline; only the sleeper is woken.
    state
        .borrow_mut()
        .advance_time(now + Duration::from_secs(3));
    assert!(wake_select.is_woken());

    let wake_select = Arc::new(WakeFlag::new());
    let select_waker = Waker::from(wake_select.clone());
    let mut select_context = Context::from_waker(&select_waker);
    assert_eq!(select.as_mut().poll(&mut select_context), Ready(()));
    assert!(wake_sleep.is_woken());
    assert!(!wake_wait.is_woken());
    assert!(!wake_select.is_woken());

    sleep.now_or_never().unwrap();

    let mut select = pin!(system.select());

    let wake_select = Arc::new(WakeFlag::new());
    let select_waker = Waker::from(wake_select.clone());
    let mut select_context = Context::from_waker(&select_waker);
    assert_eq!(select.as_mut().poll(&mut select_context), Pending);
    assert!(!wake_wait.is_woken());
    assert!(!wake_select.is_woken());

    // Phase 3: a signal is raised; the signal waiter is woken.
    system.raise(SIGINT).now_or_never().unwrap().unwrap();
    assert!(wake_select.is_woken());

    let wake_select = Arc::new(WakeFlag::new());
    let select_waker = Waker::from(wake_select.clone());
    let mut select_context = Context::from_waker(&select_waker);
    assert_eq!(select.as_mut().poll(&mut select_context), Ready(()));
    assert!(wake_wait.is_woken());
    assert!(!wake_select.is_woken());

    assert_eq!(**wait.now_or_never().unwrap(), &[SIGINT]);
}
1095
/// When a file descriptor being selected on is closed, `select` completes
/// and wakes every pending reader and writer, not just the one using the
/// closed descriptor.
#[test]
fn select_wakes_all_reads_and_writes_on_ebadf() {
    let system = Rc::new(Concurrent::new(VirtualSystem::new()));
    let (read_fd1, _write_fd1) = system.pipe().unwrap();
    let (_read_fd2, write_fd2) = system.pipe().unwrap();
    // Fill the second pipe so that the write below has to wait.
    system
        .write(write_fd2, &[0; PIPE_SIZE])
        .now_or_never()
        .unwrap()
        .unwrap();

    let mut read_buffer = [0; 4];
    let mut read = pin!(system.read(read_fd1, &mut read_buffer));
    let mut write = pin!(system.write(write_fd2, &[1, 2, 3, 4]));

    let wake_flag1 = Arc::new(WakeFlag::new());
    let wake_flag2 = Arc::new(WakeFlag::new());
    let waker1 = Waker::from(wake_flag1.clone());
    let waker2 = Waker::from(wake_flag2.clone());
    let mut context1 = Context::from_waker(&waker1);
    let mut context2 = Context::from_waker(&waker2);
    assert_eq!(read.as_mut().poll(&mut context1), Pending);
    assert_eq!(write.as_mut().poll(&mut context2), Pending);

    let mut select = pin!(system.select());

    let wake_select = Arc::new(WakeFlag::new());
    let select_waker = Waker::from(wake_select.clone());
    let mut select_context = Context::from_waker(&select_waker);
    assert_eq!(select.as_mut().poll(&mut select_context), Pending);
    assert!(!wake_flag1.is_woken());
    assert!(!wake_flag2.is_woken());
    assert!(!wake_select.is_woken());

    // Closing a selected descriptor wakes the pending select.
    system.close(read_fd1).unwrap();
    assert!(wake_select.is_woken());

    let wake_select = Arc::new(WakeFlag::new());
    let select_waker = Waker::from(wake_select.clone());
    let mut select_context = Context::from_waker(&select_waker);
    assert_eq!(select.as_mut().poll(&mut select_context), Ready(()));
    assert!(wake_flag1.is_woken());
    assert!(wake_flag2.is_woken());
    assert!(!wake_select.is_woken());
}
1143
/// When `select` completes because of a caught signal, the pending reader
/// and writer are not woken.
#[test]
fn select_does_not_wake_reads_or_writes_on_eintr() {
    let system = Rc::new(Concurrent::new(VirtualSystem::new()));
    let (read_fd1, _write_fd1) = system.pipe().unwrap();
    let (_read_fd2, write_fd2) = system.pipe().unwrap();
    // Fill the second pipe so that the write below has to wait.
    system
        .write(write_fd2, &[0; PIPE_SIZE])
        .now_or_never()
        .unwrap()
        .unwrap();
    system
        .set_disposition(SIGUSR2, Disposition::Catch)
        .now_or_never()
        .unwrap()
        .unwrap();

    let mut read_buffer = [0; 4];
    let mut read = pin!(system.read(read_fd1, &mut read_buffer));
    let mut write = pin!(system.write(write_fd2, &[1]));
    let mut wait = pin!(system.wait_for_signals());

    let wake_flag1 = Arc::new(WakeFlag::new());
    let wake_flag2 = Arc::new(WakeFlag::new());
    let wake_flag3 = Arc::new(WakeFlag::new());
    let waker1 = Waker::from(wake_flag1.clone());
    let waker2 = Waker::from(wake_flag2.clone());
    let waker3 = Waker::from(wake_flag3.clone());
    let mut context1 = Context::from_waker(&waker1);
    let mut context2 = Context::from_waker(&waker2);
    let mut context3 = Context::from_waker(&waker3);
    assert_eq!(read.as_mut().poll(&mut context1), Pending);
    assert_eq!(write.as_mut().poll(&mut context2), Pending);
    assert_eq!(wait.as_mut().poll(&mut context3), Pending);
    assert!(!wake_flag1.is_woken());
    assert!(!wake_flag2.is_woken());
    assert!(!wake_flag3.is_woken());

    // The signal is already pending when select is first polled.
    system.raise(SIGUSR2).now_or_never().unwrap().unwrap();

    let mut select_fut = pin!(system.select());
    let mut context4 = Context::from_waker(Waker::noop());
    assert_eq!(select_fut.as_mut().poll(&mut context4), Ready(()));
    assert!(!wake_flag1.is_woken());
    assert!(!wake_flag2.is_woken());
}
1191
/// `peek` leaves a signal waiter alone while no signal has been raised,
/// and wakes it with exactly the raised signals once some have been.
#[test]
fn signal_wait_is_made_ready_by_peek_after_caught() {
    let system = Rc::new(Concurrent::new(VirtualSystem::new()));
    system
        .set_disposition(SIGINT, Disposition::Catch)
        .now_or_never()
        .unwrap()
        .unwrap();
    system
        .set_disposition(SIGCHLD, Disposition::Catch)
        .now_or_never()
        .unwrap()
        .unwrap();
    system
        .set_disposition(SIGUSR2, Disposition::Catch)
        .now_or_never()
        .unwrap()
        .unwrap();

    let mut wait = pin!(system.wait_for_signals());

    let wake_flag = Arc::new(WakeFlag::new());
    let waker = Waker::from(wake_flag.clone());
    let mut context = Context::from_waker(&waker);
    assert_eq!(wait.as_mut().poll(&mut context), Pending);

    // No signal raised yet: peek must not wake the waiter.
    system.peek();
    assert!(!wake_flag.is_woken());

    // Raise two of the three caught signals; SIGUSR2 must not be reported.
    system.raise(SIGINT).now_or_never().unwrap().unwrap();
    system.raise(SIGCHLD).now_or_never().unwrap().unwrap();
    system.peek();
    assert!(wake_flag.is_woken());

    let wake_flag = Arc::new(WakeFlag::new());
    let waker = Waker::from(wake_flag.clone());
    let mut context = Context::from_waker(&waker);
    assert_matches!(wait.poll(&mut context), Ready(signals) => {
        assert_matches!(***signals, [SIGINT, SIGCHLD] | [SIGCHLD, SIGINT]);
    });
}
1234}