pub struct Concurrent<S: Sigmask> { /* private fields */ }
Decorator for systems that makes blocking I/O operations concurrency-friendly
This struct wraps a system to enable concurrent execution of multiple,
possibly blocking, I/O tasks on a single thread. The
inner system is expected to implement the Read, Write, and
super::Select traits with synchronous (blocking) behavior. This struct leaves
Futures returned by I/O methods pending until the I/O operation is ready
to avoid blocking the entire process. This allows you to start multiple I/O
tasks and wait for them to complete concurrently on a single thread. This
struct also provides methods for waiting for signals and waiting for a
specified duration, which are represented as Futures as well. The
select method of this struct consolidates blocking
behavior into a single system call so that the process can resume execution
as soon as any of the specified events occurs.
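The idea of leaving a future pending until the operation is ready can be sketched with the standard library alone. This is a minimal illustration, not the crate's implementation: `Readiness`, `WaitReady`, `mark_ready`, and `CountingWaker` are hypothetical names, and `mark_ready` merely stands in for what a `select`-like call might do once a file descriptor becomes ready.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical readiness record for one file descriptor (not a yash-env type).
#[derive(Default)]
struct Readiness {
    ready: bool,
    waker: Option<Waker>,
}

/// Future that stays pending until the shared record is marked ready.
struct WaitReady(Rc<RefCell<Readiness>>);

impl Future for WaitReady {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.0.borrow_mut();
        if state.ready {
            Poll::Ready(())
        } else {
            // Remember whom to wake instead of blocking the thread.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Stand-in for the step a `select`-like call performs once the
/// descriptor is ready: set the flag and wake the waiting task.
fn mark_ready(state: &Rc<RefCell<Readiness>>) {
    let waker = {
        let mut state = state.borrow_mut();
        state.ready = true;
        state.waker.take()
    };
    if let Some(waker) = waker {
        waker.wake();
    }
}

/// Waker that only counts how many times it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls once before and once after readiness. Returns whether each poll
/// was ready, followed by the number of wake-ups observed.
fn poll_before_and_after() -> (bool, bool, usize) {
    let state = Rc::new(RefCell::new(Readiness::default()));
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(WaitReady(state.clone()));

    let first = future.as_mut().poll(&mut cx).is_ready();
    mark_ready(&state);
    let second = future.as_mut().poll(&mut cx).is_ready();
    (first, second, counter.0.load(Ordering::SeqCst))
}
```

In this sketch the first poll reports pending without blocking, and the task is woken only after readiness is recorded, which is the behavior the paragraph above attributes to the I/O futures.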
For system calls that do not block, such as Pipe, the wrapper directly
forwards the call to the inner system without any modification.
This struct is designed to be used in an Rc to allow multiple tasks to
share the same concurrent system. Some traits, such as Read and
Write, are implemented for Rc<Concurrent<S>> instead of
Concurrent<S> to allow the methods to return futures that capture a clone
of the Rc and keep it alive until the operation is finished. This is
necessary because the futures need to access the internal state of the
Concurrent system without capturing a reference to the original
Concurrent struct, which may not live long enough.
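The reason for implementing traits on the `Rc` rather than on the struct itself can be shown with a small standard-library sketch. The names `Shared`, `Fetch`, and `strong_counts` are hypothetical and only model the pattern; the real traits and their signatures may differ.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;

/// Hypothetical stand-in for the wrapper's internal state.
struct Shared {
    value: Cell<u32>,
}

/// Hypothetical trait playing the role of an I/O trait such as `Read`.
trait Fetch {
    /// The boxed future is `'static`, so it cannot borrow from `self`.
    fn fetch(&self) -> Pin<Box<dyn Future<Output = u32>>>;
}

// Implementing the trait for `Rc<Shared>` rather than `Shared` lets the
// method clone the `Rc` and move the clone into the returned future.
impl Fetch for Rc<Shared> {
    fn fetch(&self) -> Pin<Box<dyn Future<Output = u32>>> {
        let this = Rc::clone(self);
        Box::pin(async move { this.value.get() })
    }
}

/// Returns the strong reference counts before the future is created,
/// while it is alive, and after it is dropped.
fn strong_counts() -> (usize, usize, usize) {
    let shared = Rc::new(Shared { value: Cell::new(7) });
    let before = Rc::strong_count(&shared);
    let future = shared.fetch();
    let during = Rc::strong_count(&shared);
    drop(future);
    let after = Rc::strong_count(&shared);
    (before, during, after)
}
```

The count rises while the future exists, which shows that the future owns its own handle to the shared state instead of borrowing from the caller.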
The following example illustrates how multiple concurrent tasks are run in a single-threaded pool:
use futures_util::task::LocalSpawnExt as _;
use std::rc::Rc;
// Imports of Concurrent, VirtualSystem, and the system traits are omitted here.

let system = Rc::new(Concurrent::new(VirtualSystem::new()));
let system2 = system.clone();
let system3 = system.clone();
let (reader, writer) = system.pipe().unwrap();
let mut executor = futures_executor::LocalPool::new();

// We add a task that tries to read from the pipe, but nothing has been
// written to it, so the task is stalled.
let read_task = executor.spawner().spawn_local_with_handle(async move {
    let mut buffer = [0; 1];
    system.read(reader, &mut buffer).await.unwrap();
    buffer[0]
});
executor.run_until_stalled();

// Let's add a task that writes to the pipe.
executor.spawner().spawn_local(async move {
    system2.write_all(writer, &[123]).await.unwrap();
});
executor.run_until_stalled();

// The write task has written a byte to the pipe, but the read task is still
// stalled. We need to wake it up by calling `select` or `peek`.
system3.peek();

// Now the read task can proceed to the end.
let number = executor.run_until(read_task.unwrap());
assert_eq!(number, 123);
Implementations
impl<S> Concurrent<S>
pub fn new_child_process(&self) -> Result<ChildProcessStarter<S>>
Deprecated since 0.14.0: use the Fork::run_in_child_process method instead
Creates a new child process.
Returns the ChildProcessStarter<S> returned by the inner system’s
Fork::new_child_process method. This method is an inherent method of
Concurrent<S> rather than an implementation of the Fork trait because
the return type does not match that of the inner system S.
impl Concurrent<RealSystem>
pub fn run_real<F, T>(&self, task: F) -> T
where
    F: Future<Output = T>,
Runs the given task with concurrency support.
This function implements the main loop of the shell process. It runs the
given task while also calling select to handle signals
and other events. The task is expected to perform I/O operations using
the methods of this Concurrent instance, so that it can yield when the
operations would block. The function returns the output of the task when
it completes.
This method supports concurrency only inside the task. Other tasks
created outside the task will not be run concurrently.
This method blocks the current thread until the task completes, so it
should only be called in the main function of the shell process.
See the run_virtual method for the
VirtualSystem counterpart.
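The shape of such a main loop can be sketched with the standard library alone. This is an illustration under stated assumptions, not the actual implementation of run_real: `run_to_completion`, `NoopWaker`, and `run_demo` are hypothetical names, and the `wait_for_event` closure stands in for a blocking `select` call that also updates whatever state the task is waiting on.

```rust
use std::cell::Cell;
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; the loop below re-polls after every wait.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Drives `task` to completion, calling `wait_for_event` whenever the
/// task cannot make progress.
fn run_to_completion<F, T>(task: F, mut wait_for_event: impl FnMut()) -> T
where
    F: Future<Output = T>,
{
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut task = pin!(task);
    loop {
        match task.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // Nothing can proceed until some event occurs, so this is
            // the single place where the thread is allowed to block.
            Poll::Pending => wait_for_event(),
        }
    }
}

/// Runs a task that needs three events. Returns the task output and the
/// number of times the loop had to wait.
fn run_demo() -> (u32, u32) {
    let events = Rc::new(Cell::new(0u32));
    let waits = Cell::new(0u32);
    let seen = Rc::clone(&events);
    let task = async move {
        poll_fn(|_cx| {
            if seen.get() >= 3 {
                Poll::Ready(seen.get())
            } else {
                Poll::Pending
            }
        })
        .await
    };
    let output = run_to_completion(task, || {
        waits.set(waits.get() + 1);
        events.set(events.get() + 1);
    });
    (output, waits.get())
}
```

Because the loop polls only the one task it was given, any concurrency has to happen inside that task, which matches the restriction described above.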
impl Concurrent<VirtualSystem>
pub async fn run_virtual<F>(&self, task: F)
Runs the given task with concurrency support.
This function implements the main loop of the shell process. It runs the
given task while also calling select to handle signals
and other events. The task is expected to perform I/O operations using
the methods of this Concurrent instance, so that it can yield when the
operations would block. The function returns when the task completes or
the process is terminated.
This is the VirtualSystem counterpart for the
run_real method. To allow VirtualSystem to run
multiple tasks concurrently, this method is asynchronous and returns a
future that completes when the task finishes or the process is
terminated.
Trait Implementations
impl<S> Chdir for Concurrent<S>
impl<S> Clock for Concurrent<S>
impl<S: Clone + Sigmask> Clone for Concurrent<S>
fn clone(&self) -> Concurrent<S>
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl<S> Close for Concurrent<S>
impl<S: Default + Sigmask> Default for Concurrent<S>
fn default() -> Concurrent<S>
impl<S> Dup for Concurrent<S>
impl<S> Exec for Concurrent<S>
impl<S> Exit for Concurrent<S>
fn exit(
    &self,
    exit_status: ExitStatus,
) -> impl Future<Output = Infallible> + use<S>
impl<S> Fcntl for Concurrent<S>
fn ofd_access(&self, fd: Fd) -> Result<OfdAccess>
fn get_and_set_nonblocking(&self, fd: Fd, nonblocking: bool) -> Result<bool>
impl<S> Fstat for Concurrent<S>
fn fstatat(
    &self,
    dir_fd: Fd,
    path: &CStr,
    follow_symlinks: bool,
) -> Result<Self::Stat>
fn is_directory(&self, path: &CStr) -> bool
fn fd_is_pipe(&self, fd: Fd) -> bool
impl<S> GetCwd for Concurrent<S>
impl<S> GetPid for Concurrent<S>
impl<S> GetPw for Concurrent<S>
impl<S> GetRlimit for Concurrent<S>
impl<S> GetSigaction for Concurrent<S>
where
    S: GetSigaction + Sigmask,
fn get_sigaction(&self, signal: Number) -> Result<Disposition>
impl<S> GetUid for Concurrent<S>
impl<S> IsExecutableFile for Concurrent<S>
where
    S: IsExecutableFile + Sigmask,
fn is_executable_file(&self, path: &CStr) -> bool
impl<S> Isatty for Concurrent<S>
impl<S> Open for Concurrent<S>
This implementation does not (yet) support non-blocking open operations.
fn open(
    &self,
    path: &CStr,
    access: OfdAccess,
    flags: EnumSet<OpenFlag>,
    mode: Mode,
) -> impl Future<Output = Result<Fd>> + use<S>
fn open_tmpfile(&self, parent_dir: &Path) -> Result<Fd>
impl<S> Pipe for Concurrent<S>
impl<S> ReadAll for Concurrent<S>
impl<S> Seek for Concurrent<S>
impl<S> Select for Concurrent<S>
impl<S> SendSignal for Concurrent<S>
where
    S: SendSignal + Sigmask,
impl<S> SetPgid for Concurrent<S>
impl<S> SetRlimit for Concurrent<S>
impl<S> ShellPath for Concurrent<S>
fn shell_path(&self) -> CString
impl<S> Sigaction for Concurrent<S>
Exposes the inner system’s sigaction method.
This implementation of Sigaction simply delegates to the inner system’s
sigaction method, which bypasses the internal state of Concurrent and
may prevent the peek and
select methods from responding to received
signals without race conditions. To ensure that signal dispositions are
configured in a way that allows Concurrent to respond to signals
correctly, direct calls to sigaction should be avoided, and, if necessary,
only used to temporarily change the signal disposition for specific
operations while ensuring that the original disposition is restored
afterward, before the next call to peek, select, or set_disposition.
The standard way to set a signal disposition with Concurrent is to use the
set_disposition method provided by the SignalSystem trait, which
ensures that the signal disposition and the signal mask are updated
consistently.
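Where a temporary change through sigaction is unavoidable, the "change, operate, restore" discipline described above could be modeled with a drop guard. The following is only a sketch under simplifying assumptions: `FakeSystem`, `Signal`, `Restore`, `with_disposition`, and the three-variant `Disposition` enum are hypothetical stand-ins, not the crate's types, and the fake `sigaction` returns the old disposition directly instead of a `Result`.

```rust
use std::cell::RefCell;
use std::collections::HashMap;

/// Simplified stand-in for a signal disposition (hypothetical).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Disposition {
    Default,
    Ignore,
    Catch,
}

/// Simplified stand-in for a signal number (hypothetical).
type Signal = i32;

/// Minimal model of a system whose `sigaction` returns the old disposition.
#[derive(Default)]
struct FakeSystem {
    dispositions: RefCell<HashMap<Signal, Disposition>>,
}

impl FakeSystem {
    fn sigaction(&self, signal: Signal, disposition: Disposition) -> Disposition {
        self.dispositions
            .borrow_mut()
            .insert(signal, disposition)
            .unwrap_or(Disposition::Default)
    }

    fn current(&self, signal: Signal) -> Disposition {
        self.dispositions
            .borrow()
            .get(&signal)
            .copied()
            .unwrap_or(Disposition::Default)
    }
}

/// Guard that restores the previous disposition when dropped.
struct Restore<'a> {
    system: &'a FakeSystem,
    signal: Signal,
    previous: Disposition,
}

impl Drop for Restore<'_> {
    fn drop(&mut self) {
        self.system.sigaction(self.signal, self.previous);
    }
}

/// Runs `operation` with `signal` set to `temporary`, then restores the
/// original disposition, even if `operation` panics.
fn with_disposition<T>(
    system: &FakeSystem,
    signal: Signal,
    temporary: Disposition,
    operation: impl FnOnce() -> T,
) -> T {
    let previous = system.sigaction(signal, temporary);
    let _restore = Restore { system, signal, previous };
    operation()
}

/// Returns the disposition seen inside the operation and the one in
/// effect afterward. The signal number 2 is an arbitrary choice.
fn restore_demo() -> (Disposition, Disposition) {
    let system = FakeSystem::default();
    system.sigaction(2, Disposition::Catch);
    let inside = with_disposition(&system, 2, Disposition::Ignore, || system.current(2));
    (inside, system.current(2))
}
```

Tying the restore step to a guard means the original disposition is back in place by the time control returns to code that might call peek, select, or set_disposition.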
fn sigaction(
    &self,
    signal: Number,
    disposition: Disposition,
) -> Result<Disposition>
impl<S> Sigmask for Concurrent<S>
where
    S: Sigmask,
Exposes the inner system’s sigmask method.
This implementation of Sigmask simply delegates to the inner system’s
sigmask method, which bypasses the internal state of Concurrent and may
prevent the peek and
select methods from responding to received
signals without race conditions. To ensure that the signal mask is
configured in a way that allows Concurrent to respond to signals
correctly, direct calls to sigmask should be avoided, and, if necessary,
only used to temporarily change the signal mask for specific operations
while ensuring that the original mask is restored afterward, before the next
call to peek, select, or set_disposition.
impl<S> Signals for Concurrent<S>
where
    S: Sigmask,
const SIGCLD: Option<Number> = S::SIGCLD
SIGCLD, if available on the system
const SIGEMT: Option<Number> = S::SIGEMT
SIGEMT, if available on the system
const SIGINFO: Option<Number> = S::SIGINFO
SIGINFO, if available on the system
const SIGIO: Option<Number> = S::SIGIO
SIGIO, if available on the system
const SIGLOST: Option<Number> = S::SIGLOST
SIGLOST, if available on the system
const SIGPOLL: Option<Number> = S::SIGPOLL
SIGPOLL, if available on the system
const SIGPWR: Option<Number> = S::SIGPWR
SIGPWR, if available on the system
const SIGSTKFLT: Option<Number> = S::SIGSTKFLT
SIGSTKFLT, if available on the system
const SIGTHR: Option<Number> = S::SIGTHR
SIGTHR, if available on the system
const NAMED_SIGNALS: &'static [(&'static str, Option<Number>)] = S::NAMED_SIGNALS
fn sigrt_range(&self) -> Option<RangeInclusive<Number>>
fn iter_sigrt(&self) -> impl DoubleEndedIterator<Item = Number> + use<S>
fn to_signal_number<N: Into<RawNumber>>(&self, number: N) -> Option<Number>
fn sig2str<N: Into<RawNumber>>(&self, signal: N) -> Option<Cow<'static, str>>
fn str2sig(&self, name: &str) -> Option<Number>
fn validate_signal(&self, number: RawNumber) -> Option<(Name, Number)>
impl<S> Sleep for Concurrent<S>
impl<S> Sysconf for Concurrent<S>
fn confstr_path(&self) -> Result<UnixString>
$PATH value where all standard utilities are expected to be found.
impl<S> TcGetPgrp for Concurrent<S>
impl<S> TcSetPgrp for Concurrent<S>
impl<S> Times for Concurrent<S>
impl<S> Umask for Concurrent<S>
impl<S> Wait for Concurrent<S>
impl<S: Sigmask> WaitForSignals for Concurrent<S>
async fn wait_for_signals(&self) -> Rc<SignalList>
impl<S> WriteAll for Concurrent<S>
Auto Trait Implementations
impl<S> !Freeze for Concurrent<S>
impl<S> !RefUnwindSafe for Concurrent<S>
impl<S> !Send for Concurrent<S>
impl<S> !Sync for Concurrent<S>
impl<S> Unpin for Concurrent<S>
impl<S> UnsafeUnpin for Concurrent<S>
impl<S> !UnwindSafe for Concurrent<S>
Blanket Implementations
impl<S> BlockSignals for S
type SavedMask = <S as Sigmask>::Sigset
async fn block_sigint_sigquit(
    &self,
) -> Result<<S as BlockSignals>::SavedMask, Errno>
async fn restore_sigmask(
    &self,
    mask: <S as BlockSignals>::SavedMask,
) -> Result<(), Errno>
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.