pub struct Concurrent<S> { /* private fields */ }
Decorator for systems that makes blocking I/O operations concurrency-friendly.
This struct wraps a system to enable concurrent execution of multiple,
possibly blocking, I/O tasks on a single thread. The
inner system is expected to implement the Read, Write, and
Select traits with synchronous (blocking) behavior. This struct leaves
Futures returned by I/O methods pending until the I/O operation is ready
to avoid blocking the entire process. This allows you to start multiple I/O
tasks and wait for them to complete concurrently on a single thread. This
struct also provides methods for waiting for signals and waiting for a
specified duration, which are represented as Futures as well. The
select method of this struct consolidates blocking
behavior into a single system call so that the process can resume execution
as soon as any of the specified events occurs.
For system calls that do not block, such as Pipe, the wrapper directly
forwards the call to the inner system without any modification.
This struct is designed to be used in an Rc to allow multiple tasks to
share the same concurrent system. Some traits, such as Read and
Write, are implemented for Rc<Concurrent<S>> instead of
Concurrent<S> to allow the methods to return futures that capture a clone
of the Rc and keep it alive until the operation is finished. This is
necessary because the futures need to access the internal state of the
Concurrent system without capturing a reference to the original
Concurrent struct, which may not live long enough.
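The reason for implementing traits on the Rc can be illustrated with a small, self-contained sketch. It does not use this crate: Shared, ReadByte, ReadOne, and push are hypothetical names chosen for illustration, and the future is written by hand with only the standard library. The point it shows is that the returned future owns a clone of the Rc, so it stays valid after the handle that created it is dropped.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the wrapper's internal state.
struct Shared {
    bytes: RefCell<Vec<u8>>,
    waker: RefCell<Option<Waker>>,
}

impl Shared {
    /// Makes a byte available and wakes the task waiting for it, if any.
    fn push(&self, byte: u8) {
        self.bytes.borrow_mut().push(byte);
        if let Some(waker) = self.waker.borrow_mut().take() {
            waker.wake();
        }
    }
}

/// Future that owns a clone of the `Rc` instead of borrowing the wrapper.
struct ReadByte {
    shared: Rc<Shared>,
}

impl Future for ReadByte {
    type Output = u8;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u8> {
        match self.shared.bytes.borrow_mut().pop() {
            Some(byte) => Poll::Ready(byte),
            None => {
                // Remember which task to wake once data arrives.
                *self.shared.waker.borrow_mut() = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Hypothetical trait, implemented for `Rc<Shared>` rather than `Shared`.
trait ReadOne {
    fn read_one(&self) -> ReadByte;
}

impl ReadOne for Rc<Shared> {
    fn read_one(&self) -> ReadByte {
        ReadByte { shared: Rc::clone(self) }
    }
}

/// Waker that does nothing; enough for polling by hand in this sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the future before and after a byte becomes available.
fn example() -> (Poll<u8>, Poll<u8>) {
    let handle = Rc::new(Shared {
        bytes: RefCell::new(Vec::new()),
        waker: RefCell::new(None),
    });
    let writer = Rc::clone(&handle);
    let mut future = handle.read_one();
    // The future holds its own clone, so it does not borrow `handle`.
    drop(handle);

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let first = Pin::new(&mut future).poll(&mut cx);
    writer.push(42);
    let second = Pin::new(&mut future).poll(&mut cx);
    (first, second)
}
```

Had read_one been defined on Shared itself and returned a future borrowing &self, the future could not be moved into a spawned task that outlives the caller's reference.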
The following example illustrates how multiple concurrent tasks are run in a single-threaded pool:
let system = Rc::new(Concurrent::new(VirtualSystem::new()));
let system2 = system.clone();
let system3 = system.clone();
let (reader, writer) = system.pipe().unwrap();
let mut executor = futures_executor::LocalPool::new();
// We add a task that tries to read from the pipe, but nothing has been
// written to it, so the task is stalled.
let read_task = executor.spawner().spawn_local_with_handle(async move {
    let mut buffer = [0; 1];
    system.read(reader, &mut buffer).await.unwrap();
    buffer[0]
});
executor.run_until_stalled();
// Let's add a task that writes to the pipe.
executor.spawner().spawn_local(async move {
    system2.write_all(writer, &[123]).await.unwrap();
});
executor.run_until_stalled();
// The write task has written a byte to the pipe, but the read task is still
// stalled. We need to wake it up by calling `select` or `peek`.
system3.peek();
// Now the read task can proceed to the end.
let number = executor.run_until(read_task.unwrap());
assert_eq!(number, 123);
Implementations
impl<S> Concurrent<S> where S: Fork
pub fn new_child_process(&self) -> Result<ChildProcessStarter<S>>
Creates a new child process.
Returns the ChildProcessStarter<S> returned by the inner system’s
Fork::new_child_process method. This method is an inherent method of
Concurrent<S> instead of an implementation of the Fork trait because
the return type does not match that of the inner system S.
impl Concurrent<RealSystem>
pub fn run_real<F, T>(&self, task: F) -> T where F: Future<Output = T>
Runs the given task with concurrency support.
This function implements the main loop of the shell process. It runs the
given task while also calling select to handle signals
and other events. The task is expected to perform I/O operations using
the methods of this Concurrent instance, so that it can yield when the
operations would block. The function returns the output of the task when
it completes.
This method supports concurrency only inside the task. Other tasks
created outside the task will not be run concurrently.
This method blocks the current thread until the task completes, so it
should only be called in the main function of the shell process.
See the run_virtual method for the
VirtualSystem counterpart.
impl Concurrent<VirtualSystem>
pub async fn run_virtual<F>(&self, task: F)
Runs the given task with concurrency support.
This function implements the main loop of the shell process. It runs the
given task while also calling select to handle signals
and other events. The task is expected to perform I/O operations using
the methods of this Concurrent instance, so that it can yield when the
operations would block. The function returns the output of the task when
it completes.
This is the VirtualSystem counterpart for the
run_real method. To allow VirtualSystem to run
multiple tasks concurrently, this method is asynchronous and returns a
future that completes when the task finishes or the process is
terminated.
impl<S> Concurrent<S>
pub async fn read_all_to(&self, fd: Fd, buffer: &mut Vec<u8>) -> Result<(), Errno>
Reads from the file descriptor until EOF is reached, appending the data to the provided buffer.
In case of an error, the buffer will contain all data read up to the point of failure.
Use read_all if you don’t have an existing buffer to
append to.
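The append-until-EOF behavior, including what the buffer holds after a failure, can be sketched with a synchronous analogue built on std::io::Read. This is not the crate's implementation: the free function read_all_to below, the small chunk size, and the FailAfter reader are assumptions made for illustration.

```rust
use std::io::{self, Read};

/// Reads from `reader` until EOF, appending the data to `buffer`.
/// On error, `buffer` keeps everything read before the failure.
fn read_all_to<R: Read>(reader: &mut R, buffer: &mut Vec<u8>) -> io::Result<()> {
    // A deliberately small chunk so that several reads are needed.
    let mut chunk = [0u8; 4];
    loop {
        match reader.read(&mut chunk) {
            Ok(0) => return Ok(()), // EOF
            Ok(n) => buffer.extend_from_slice(&chunk[..n]),
            Err(error) => return Err(error),
        }
    }
}

/// Hypothetical reader that yields its data and then fails instead of
/// reporting EOF.
struct FailAfter<'a> {
    data: &'a [u8],
}

impl<'a> Read for FailAfter<'a> {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        if self.data.is_empty() {
            return Err(io::Error::new(io::ErrorKind::Other, "simulated failure"));
        }
        let n = self.data.len().min(out.len());
        out[..n].copy_from_slice(&self.data[..n]);
        self.data = &self.data[n..];
        Ok(n)
    }
}
```

Because the data is appended chunk by chunk, a caller that receives an error can still inspect or report the partial contents of the buffer.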
impl<S> Concurrent<S>
pub async fn write_all(&self, fd: Fd, data: &[u8]) -> Result<(), Errno>
Writes all data from the provided buffer to the file descriptor.
This method ensures that all data is written, even if multiple write operations are required due to partial writes.
If the data is empty, this method will return immediately without performing write operations.
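The handling of partial writes can be sketched with a synchronous analogue built on std::io::Write. This is an illustration under stated assumptions, not the crate's code: the Trickle sink and the free function write_all are hypothetical, and the zero-progress check is an extra guard added here rather than documented behavior.

```rust
use std::io::{self, Write};

/// Hypothetical sink that accepts at most `limit` bytes per call,
/// simulating partial writes.
struct Trickle {
    written: Vec<u8>,
    limit: usize,
    calls: usize,
}

impl Write for Trickle {
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
        self.calls += 1;
        let n = data.len().min(self.limit);
        self.written.extend_from_slice(&data[..n]);
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Keeps writing the unwritten remainder until nothing is left.
fn write_all<W: Write>(writer: &mut W, mut data: &[u8]) -> io::Result<()> {
    // With empty data the loop body never runs, so no write is issued.
    while !data.is_empty() {
        let n = writer.write(data)?;
        if n == 0 {
            // Assumption of this sketch: treat zero progress as an error
            // rather than looping forever.
            return Err(io::Error::new(io::ErrorKind::WriteZero, "no progress"));
        }
        data = &data[n..];
    }
    Ok(())
}
```

With a limit of 3 bytes per call, writing the 11 bytes of "hello world" takes four calls (3 + 3 + 3 + 2), and writing an empty slice takes none.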
pub async fn print_error<T: AsRef<[u8]>>(&self, message: T)
Writes the given message to standard error.
This is a convenience method that calls write_all
with Fd::STDERR.
impl<S> Concurrent<S> where S: Clock
pub async fn sleep_until(&self, deadline: Instant)
Waits until the specified deadline.
The returned future will be pending until the specified deadline is reached, at which point it will complete.
impl<S> Concurrent<S>
pub async fn wait_for_signals(&self) -> Rc<SignalList>
Waits for signals to be caught.
The returned future will be pending until any signal is caught, at which point it will complete with a list of caught signals. The list is shared among all tasks waiting for signals, so that they can see the same list of caught signals when they are woken up.
Before calling this method, the caller needs to call set_disposition for
the signals it wants to catch.
If this Concurrent system is used in an Env, you should call
Env::wait_for_signals instead of this
method, so that the trap set can handle the signals properly.
impl<S> Concurrent<S>
pub fn peek(&self)
Peeks for any ready events without blocking.
This method performs a select system call with the file descriptors
and timeout of pending tasks, and wakes the tasks whose events are
ready. This method is similar to select, but it
does not block and returns immediately.
pub async fn select(&self)
Waits for any of the pending tasks to become ready.
This method performs a select system call with the file descriptors
and timeout of pending tasks, and wakes the tasks whose events are
ready. This method should be called in the main loop of the process to
ensure that tasks can make progress. In a typical use case, the main
loop would look like this:
loop {
    // Run ready tasks until they yield again
    run_ready_tasks();
    // Wait for any pending task to become ready
    concurrent.select().await;
}
The run_real and run_virtual
methods provide a convenient way to implement such a main loop.
The future returned by this method will be pending if and only if the
future returned by the inner system’s select method
is pending.
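The shape of such a main loop can be tried out with a toy model that uses only the standard library. Everything here is hypothetical and much simpler than the real thing: Reactor stands in for the wrapped system with a virtual clock, its select jumps to the earliest registered deadline instead of performing a system call, SleepUntil is a hand-written future, and the loop re-polls every pending task rather than tracking wakers.

```rust
use std::cell::{Cell, RefCell};
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical reactor with a virtual clock.
struct Reactor {
    now: Cell<u32>,
    deadlines: RefCell<Vec<u32>>,
}

impl Reactor {
    /// One consolidated wait: advance the clock to the earliest deadline
    /// registered by a pending task.
    fn select(&self) {
        let mut deadlines = self.deadlines.borrow_mut();
        if let Some(earliest) = deadlines.iter().copied().min() {
            self.now.set(self.now.get().max(earliest));
        }
        // Pending tasks register again the next time they are polled.
        deadlines.clear();
    }
}

/// Toy task that completes once the virtual clock reaches `deadline`.
struct SleepUntil {
    reactor: Rc<Reactor>,
    deadline: u32,
}

impl Future for SleepUntil {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        if self.reactor.now.get() >= self.deadline {
            Poll::Ready(self.deadline)
        } else {
            self.reactor.deadlines.borrow_mut().push(self.deadline);
            Poll::Pending
        }
    }
}

/// Waker that does nothing; this sketch re-polls all pending tasks instead.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Runs one task per deadline and returns the deadlines in completion order.
fn run_all(reactor: &Rc<Reactor>, deadlines: &[u32]) -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut tasks: Vec<SleepUntil> = deadlines
        .iter()
        .map(|&deadline| SleepUntil { reactor: Rc::clone(reactor), deadline })
        .collect();
    let mut finished = Vec::new();
    while !tasks.is_empty() {
        // Run tasks until they yield again, dropping the finished ones.
        tasks.retain_mut(|task| match Pin::new(task).poll(&mut cx) {
            Poll::Ready(deadline) => {
                finished.push(deadline);
                false
            }
            Poll::Pending => true,
        });
        // Wait for any pending task to become ready.
        if !tasks.is_empty() {
            reactor.select();
        }
    }
    finished
}
```

Each pass through the loop performs at most one wait, however many tasks are pending, which is the property the select method is described as providing.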
Trait Implementations
impl<S> Chdir for Concurrent<S> where S: Chdir
impl<S> Clock for Concurrent<S> where S: Clock
impl<S: Clone> Clone for Concurrent<S>
fn clone(&self) -> Concurrent<S>
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl<S> Close for Concurrent<S> where S: Close
impl<S: Debug> Debug for Concurrent<S>
impl<S: Default> Default for Concurrent<S>
fn default() -> Concurrent<S>
impl<S> Dup for Concurrent<S> where S: Dup
impl<S> Exec for Concurrent<S> where S: Exec
impl<S> Exit for Concurrent<S> where S: Exit
fn exit(&self, exit_status: ExitStatus) -> impl Future<Output = Infallible> + use<S>
impl<S> Fcntl for Concurrent<S> where S: Fcntl
fn ofd_access(&self, fd: Fd) -> Result<OfdAccess>
fn get_and_set_nonblocking(&self, fd: Fd, nonblocking: bool) -> Result<bool>
impl<S> Fstat for Concurrent<S> where S: Fstat
fn fstatat(&self, dir_fd: Fd, path: &CStr, follow_symlinks: bool) -> Result<Self::Stat>
fn is_directory(&self, path: &CStr) -> bool
fn fd_is_pipe(&self, fd: Fd) -> bool
impl<S> GetCwd for Concurrent<S> where S: GetCwd
impl<S> GetPid for Concurrent<S> where S: GetPid
impl<S> GetPw for Concurrent<S> where S: GetPw
impl<S> GetRlimit for Concurrent<S> where S: GetRlimit
impl<S> GetSigaction for Concurrent<S> where S: GetSigaction
fn get_sigaction(&self, signal: Number) -> Result<Disposition>
impl<S> GetUid for Concurrent<S> where S: GetUid
impl<S> IsExecutableFile for Concurrent<S> where S: IsExecutableFile
fn is_executable_file(&self, path: &CStr) -> bool
impl<S> Isatty for Concurrent<S> where S: Isatty
impl<S> Open for Concurrent<S> where S: Open
This implementation does not (yet) support non-blocking open operations.
fn open(&self, path: &CStr, access: OfdAccess, flags: EnumSet<OpenFlag>, mode: Mode) -> impl Future<Output = Result<Fd>> + use<S>
fn open_tmpfile(&self, parent_dir: &Path) -> Result<Fd>
impl<S> Pipe for Concurrent<S> where S: Pipe
impl<S> Seek for Concurrent<S> where S: Seek
impl<S> SendSignal for Concurrent<S> where S: SendSignal
impl<S> SetPgid for Concurrent<S> where S: SetPgid
impl<S> SetRlimit for Concurrent<S> where S: SetRlimit
impl<S> ShellPath for Concurrent<S> where S: ShellPath
fn shell_path(&self) -> CString
impl<S> Sigaction for Concurrent<S> where S: Sigaction
Exposes the inner system’s sigaction method.
This implementation of Sigaction simply delegates to the inner system’s
sigaction method, which bypasses the internal state of Concurrent and
may prevent the peek and
select methods from responding to received signals
without race conditions. To ensure that signal dispositions are configured
in a way that allows Concurrent to respond to signals correctly, direct
calls to sigaction should be avoided, and, if necessary, only used to
temporarily change the signal disposition for specific operations while
ensuring that the original disposition is restored afterward, before the next
call to peek, select, or set_disposition.
The standard way to set a signal disposition on Concurrent is to use the
set_disposition method provided by the SignalSystem trait, which
ensures that the signal disposition and the signal mask are updated
consistently.
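Where a temporary change through sigaction is unavoidable, one way to make sure the original disposition comes back is a scope guard. The sketch below is an assumption-laden illustration, not this crate's API: ToySystem tracks a single signal, its sigaction takes no signal number, and Disposition, Restore, and with_temporary_disposition are hypothetical names.

```rust
use std::cell::Cell;

/// Simplified disposition type for this sketch (not the crate's own type).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Disposition {
    Default,
    Ignore,
}

/// Hypothetical system that tracks the disposition of a single signal.
struct ToySystem {
    disposition: Cell<Disposition>,
}

impl ToySystem {
    /// Sets a new disposition and returns the previous one.
    fn sigaction(&self, new: Disposition) -> Disposition {
        self.disposition.replace(new)
    }
}

/// Guard that restores the original disposition when dropped, including
/// when the operation panics and the stack unwinds.
struct Restore<'a> {
    system: &'a ToySystem,
    original: Disposition,
}

impl<'a> Drop for Restore<'a> {
    fn drop(&mut self) {
        self.system.sigaction(self.original);
    }
}

/// Runs `operation` with a temporary disposition in effect.
fn with_temporary_disposition<T, F>(
    system: &ToySystem,
    temporary: Disposition,
    operation: F,
) -> T
where
    F: FnOnce() -> T,
{
    let original = system.sigaction(temporary);
    let _restore = Restore { system, original };
    operation()
    // `_restore` is dropped here, before control returns to the caller.
}
```

The operation passed in should not call peek, select, or set_disposition, since the guard restores the disposition only when the operation has finished.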
fn sigaction(&self, signal: Number, disposition: Disposition) -> Result<Disposition>
impl<S> Sigmask for Concurrent<S> where S: Sigmask
Exposes the inner system’s sigmask method.
This implementation of Sigmask simply delegates to the inner system’s
sigmask method, which bypasses the internal state of Concurrent and may
prevent the peek and select
methods from responding to received signals without race conditions. To
ensure that the signal mask is configured in a way that allows Concurrent
to respond to signals correctly, direct calls to sigmask should be
avoided, and, if necessary, only used to temporarily change the signal mask
for specific operations while ensuring that the original mask is restored
afterward, before the next call to peek, select, or set_disposition.
impl<S> Signals for Concurrent<S> where S: Signals
const SIGCLD: Option<Number> = S::SIGCLD
SIGCLD, if available on the system
const SIGEMT: Option<Number> = S::SIGEMT
SIGEMT, if available on the system
const SIGINFO: Option<Number> = S::SIGINFO
SIGINFO, if available on the system
const SIGIO: Option<Number> = S::SIGIO
SIGIO, if available on the system
const SIGLOST: Option<Number> = S::SIGLOST
SIGLOST, if available on the system
const SIGPOLL: Option<Number> = S::SIGPOLL
SIGPOLL, if available on the system
const SIGPWR: Option<Number> = S::SIGPWR
SIGPWR, if available on the system
const SIGSTKFLT: Option<Number> = S::SIGSTKFLT
SIGSTKFLT, if available on the system
const SIGTHR: Option<Number> = S::SIGTHR
SIGTHR, if available on the system
const NAMED_SIGNALS: &'static [(&'static str, Option<Number>)] = S::NAMED_SIGNALS
fn sigrt_range(&self) -> Option<RangeInclusive<Number>>
fn iter_sigrt(&self) -> impl DoubleEndedIterator<Item = Number> + use<S>
fn to_signal_number<N: Into<RawNumber>>(&self, number: N) -> Option<Number>
fn sig2str<N: Into<RawNumber>>(&self, signal: N) -> Option<Cow<'static, str>>
fn str2sig(&self, name: &str) -> Option<Number>
fn validate_signal(&self, number: RawNumber) -> Option<(Name, Number)>
impl<S> Sysconf for Concurrent<S> where S: Sysconf
fn confstr_path(&self) -> Result<UnixString>
Returns the $PATH value where all standard utilities are expected to be found.
impl<S> TcGetPgrp for Concurrent<S> where S: TcGetPgrp
impl<S> TcSetPgrp for Concurrent<S> where S: TcSetPgrp
impl<S> Times for Concurrent<S> where S: Times
impl<S> Umask for Concurrent<S> where S: Umask
Auto Trait Implementations
impl<S> !Freeze for Concurrent<S>
impl<S> !RefUnwindSafe for Concurrent<S>
impl<S> !Send for Concurrent<S>
impl<S> !Sync for Concurrent<S>
impl<S> Unpin for Concurrent<S> where S: Unpin
impl<S> UnsafeUnpin for Concurrent<S> where S: UnsafeUnpin
impl<S> !UnwindSafe for Concurrent<S>
Blanket Implementations
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T where T: Clone
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.