pub enum InputReaderCommand {
Replay {
metadata: Value,
data: Value,
},
Extend,
Pause,
Queue {
checkpoint_requested: bool,
},
Disconnect,
}Expand description
Commands for an InputReader to execute.
§Transitions
The following diagram shows the possible order in which the controller can issue commands to InputReaders:
┌─⯇─ (start) ─⯈──┐
│ │ │
│ │ ┌───┐ │
│ ▼ ▼ │ │
├─⯇─ Replay ──┘ │
│ │ │
│ ▼ │
├─⯇─ Extend⯇─────┤
│ │ │
│ │ ┌───┐ │
│ ▼ ▼ │ │
├─⯇─ Queue ──┘ │
│ │ │
│ ▼ │
├─⯇─ Pause ─⯈────┘
│ │
│ ▼
└───⯈Disconnect
│
▼
(end)§Stalls
When the controller issues a InputReaderCommand::Replay or
InputReaderCommand::Queue command to an input adapter, it waits for the
input adapter to respond to them. Until it receives a reply, the next step
cannot proceed. An input adapter that does not respond to one of these
commands will stall the entire pipeline. However, the controller also uses
InputReader::is_closed to detect that an input adapter has died due to an
error or reaching end-of-input, so input adapters for which it is difficult
to handle errors gracefully can report that they have died using
is_closed, if necessary, as described in more detail below.
§End-of-input handling
If an input adapter reaches the end of its input, and it isn’t implemented to wait for and pass along further input, then it should:
-
Make sure that it has already indicated that it has buffered all of its data, via InputConsumer::buffered.
-
Call InputConsumer::eoi to indicate that it has reached end of input.
-
Respond to InputReaderCommand::Queue until it has queued all of its input and has none left.
-
Optionally, at this point, it may exit and start returning
truefromInputReader::is_closed.
§Error handling
If an input adapter encounters a fatal error that keeps it from continuing
to obtain input, then it should report the error via InputConsumer::error
with true for fatal. Afterward, it may exit and start returning true
from InputReader::is_closed.
§Additional requirement
An input adapter should ensure that, if it flushes any records to the circuit in response to InputReaderCommand::Replay or InputReaderCommand::Queue, then it finishes up and responds to the consumer using InputConsumer::replayed or InputConsumer::extended, respectively. If it instead dies mid-way, then the controller will not record the step properly and fault tolerance replay will be incorrect.
Variants§
Replay
Tells the input reader to replay the step described by metadata and
data by reading and flushing buffers for the data in the step, and
then InputConsumer::replayed to signal completion.
The input reader should report the data that it queues to InputConsumer::buffered as it does the replay.
The input reader doesn’t have to process other commands while it does the replay.
§Constraints
Only fault-tolerant input readers need to accept this. It will be issued zero or more times, before any other command.
Extend
Tells the input reader to accept further input. The first time it
receives this command, the reader should start from the resume point
passed as resume_info when the endpoint was opened, if any, and
otherwise from the beginning of input.
The input reader should report the data that it queues to InputConsumer::buffered as it queues it.
§Constraints
The controller will not call this function:
-
Twice on a given reader without an intervening InputReaderCommand::Pause.
-
If it requested a replay (with InputReaderCommand::Replay) and the reader hasn’t yet reported that the replay is complete.
Pause
Tells the input reader to stop reading more input.
The controller uses this to limit the number of buffered records and to respond to user requests to pause the pipeline.
§Constraints
The controller issues this only after a paired InputReaderCommand::Extend.
Queue
Tells the input reader to flush input buffers to the circuit.
The input reader can call InputConsumer::max_batch_size to find out how many records it should flush. When it’s done, it must call InputConsumer::extended to report it.
The checkpoint_requested flag indicates that the controller is trying
to checkpoint or suspend the pipeline. This serves as a hint to the reader
to try to clear the checkpoint barrier by returning Resume::Seek or
Resume::Replay if possible. For instance, if the reader has multiple
buffers queued, it can choose to stop flushing them after reaching the first
buffer that corresponds to a seekable position in the input stream.
§Constraints
The controller won’t issue this command before it first issues InputReaderCommand::Extend.
Disconnect
Tells the reader it’s going to be dropped soon and should clean up.
The reader can continue to queue some data buffers afterward if that’s the easiest implementation.
§Constraints
The controller calls this only once and won’t call any other functions for a given reader after it calls this one.
Implementations§
Source§impl InputReaderCommand
impl InputReaderCommand
Sourcepub fn as_nonft(&self) -> Option<NonFtInputReaderCommand>
pub fn as_nonft(&self) -> Option<NonFtInputReaderCommand>
Returns this command translated to a NonFtInputReaderCommand, or
None if that is not possible (because this command is only for
fault-tolerant endpoints).
Trait Implementations§
Auto Trait Implementations§
impl Freeze for InputReaderCommand
impl RefUnwindSafe for InputReaderCommand
impl Send for InputReaderCommand
impl Sync for InputReaderCommand
impl Unpin for InputReaderCommand
impl UnsafeUnpin for InputReaderCommand
impl UnwindSafe for InputReaderCommand
Blanket Implementations§
impl<T> Allocation for T
Source§impl<T> ArchivePointee for T
impl<T> ArchivePointee for T
Source§type ArchivedMetadata = ()
type ArchivedMetadata = ()
Source§fn pointer_metadata(
_: &<T as ArchivePointee>::ArchivedMetadata,
) -> <T as Pointee>::Metadata
fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<F, W, T, D> Deserialize<With<T, W>, D> for F
impl<F, W, T, D> Deserialize<With<T, W>, D> for F
impl<T> ErasedDestructor for Twhere
T: 'static,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more