Struct BlockSynchronizer

pub struct BlockSynchronizer<S> { /* private fields */ }

Aligns multiple StateSynchronizers on the block dimension.

§Purpose

The purpose of this component is to handle streams from multiple state synchronizers and align/merge them according to their blocks. Ideally this should be done in a fault-tolerant way, meaning we can recover from a state synchronizer suffering from timing issues. E.g. a delayed or unresponsive state synchronizer might recover again, or an advanced state synchronizer can be included again once we reach the block it is at.

§Limitations

  • Supports only chains with fixed block times for now, due to the lock-step mechanism.

§Initialisation

Queries all registered synchronizers for their first message and evaluates the state of each synchronizer. If a synchronizer’s first message is an older block, it is marked as delayed. If no message is received within the startup timeout, the synchronizer is marked as stale and is closed.
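The startup evaluation can be sketched as follows. This is a minimal illustration and not the crate's implementation: `StartupState` and `classify_startup` are hypothetical names, and the sketch assumes that "older" means older than the newest first message received from any synchronizer. A `None` entry stands for a synchronizer that sent nothing within the startup timeout.

```rust
/// Hypothetical startup classification; the crate's internal states may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StartupState {
    Ready,
    Delayed,
    Stale,
}

/// Classifies each synchronizer from the block number of its first message.
/// `None` means no message arrived within the startup timeout.
/// Assumption: "older" is measured against the newest first message seen.
pub fn classify_startup(first_blocks: &[Option<u64>]) -> Vec<StartupState> {
    // Newest block among the synchronizers that answered at all.
    let newest = first_blocks.iter().flatten().copied().max();

    first_blocks
        .iter()
        .map(|first| match (first, newest) {
            // Answered, but with a block behind the newest one.
            (Some(block), Some(newest)) if *block < newest => StartupState::Delayed,
            // Answered with the newest block.
            (Some(_), _) => StartupState::Ready,
            // Never answered: marked stale (and closed in the real component).
            (None, _) => StartupState::Stale,
        })
        .collect()
}
```

With first messages at blocks 100 and 99 and one silent synchronizer, the sketch yields ready, delayed and stale respectively.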

§Main loop

Once started, the synchronizers are queried concurrently for messages in lock step: the main loop queries all synchronizers in the ready state for their last emitted data, builds the FeedMessage and emits it, then schedules the wait procedure for the next block.

§Synchronization Logic

To classify a synchronizer as delayed, we need to first define the current block. The highest block number of all ready synchronizers is considered the current block.

Once we have the current block, we can easily determine which block we expect next. If a synchronizer delivers an older block, we classify it as delayed.

If any synchronizer is not in the ready state we will try to bring it back to the ready state. This is done by trying to empty any buffers of a delayed synchronizer or waiting to reach the height of an advanced synchronizer (and flagging it as such in the meantime).
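The classification rules above can be illustrated with a small sketch. The names `SyncState`, `current_block` and `classify` are hypothetical and chosen for illustration only. The sketch also assumes that the expected next block is the current block plus one, and that a block beyond the expected one marks a synchronizer as advanced.

```rust
use std::cmp::Ordering;
use std::collections::HashMap;

/// Hypothetical state labels; the crate's internal types may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncState {
    Ready,
    Delayed,
    Advanced,
}

/// The current block is the highest block number among all synchronizers
/// that are in the ready state. Returns `None` if none is ready.
pub fn current_block(states: &HashMap<&str, (SyncState, u64)>) -> Option<u64> {
    states
        .values()
        .filter(|(state, _)| *state == SyncState::Ready)
        .map(|(_, block)| *block)
        .max()
}

/// Classifies a delivered block against the block expected next.
/// Assumption: the expected block is `current + 1`.
pub fn classify(current: u64, delivered: u64) -> SyncState {
    let expected = current + 1;
    match delivered.cmp(&expected) {
        Ordering::Less => SyncState::Delayed,     // older than expected
        Ordering::Equal => SyncState::Ready,      // exactly the next block
        Ordering::Greater => SyncState::Advanced, // ahead of the others
    }
}
```

Note that an advanced synchronizer does not raise the current block in this sketch, since only ready synchronizers are considered when computing it.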

Of course, we can’t wait forever for a synchronizer to reply or recover. All of this must happen within the block production step of the blockchain. The wait procedure consists of waiting for any of the receivers to emit a new message, within a maximum timeout of several multiples of the block time. Once a message is received, a very short timeout starts for the remaining synchronizers to deliver a message. Any synchronizer failing to do so is transitioned to delayed.
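The two-phase wait can be sketched with the standard library alone. This is a simplified, blocking illustration and not the crate's async code: `wait_round` is a hypothetical function, and it assumes all synchronizers are fanned into a single channel carrying `(synchronizer index, block number)` pairs, whereas the real component holds one receiver per synchronizer.

```rust
use std::sync::mpsc::Receiver;
use std::time::{Duration, Instant};

/// Runs one wait round. Returns, per synchronizer index, the block it
/// delivered, or `None` if it missed the window (it would become delayed).
pub fn wait_round(
    rx: &Receiver<(usize, u64)>,
    n_synchronizers: usize,
    max_timeout: Duration,
    grace: Duration,
) -> Vec<Option<u64>> {
    let mut delivered: Vec<Option<u64>> = vec![None; n_synchronizers];

    // Phase 1: wait up to `max_timeout` for the first message from anyone.
    match rx.recv_timeout(max_timeout) {
        Ok((id, block)) => {
            if let Some(slot) = delivered.get_mut(id) {
                *slot = Some(block);
            }
        }
        // Nobody answered in time: every synchronizer missed this round.
        Err(_) => return delivered,
    }

    // Phase 2: a short grace period for the remaining synchronizers.
    let deadline = Instant::now() + grace;
    while delivered.iter().any(Option::is_none) {
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok((id, block)) => {
                if let Some(slot) = delivered.get_mut(id) {
                    *slot = Some(block);
                }
            }
            // Grace period elapsed or all senders are gone.
            Err(_) => break,
        }
    }
    delivered
}
```

Entries left as `None` correspond to synchronizers that would be transitioned to delayed.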

§Note

The process described above is the goal; it is not currently implemented that way. Instead, we simply wait for block_time + wait_time, and synchronizers are expected to respond within that timeout. This is simpler but only works well on chains with fixed block times.
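The simplified timing rule amounts to a fixed per-round deadline. The sketch below uses hypothetical helper names, and it is an assumption that `wait_time` is a grace period added on top of the block interval. How it relates to the `max_wait` constructor parameter is not stated here.

```rust
use std::time::Duration;

/// Fixed per-round deadline: one block interval plus a grace period.
pub fn round_deadline(block_time: Duration, wait_time: Duration) -> Duration {
    block_time + wait_time
}

/// True if a synchronizer that answered after `elapsed` met the deadline.
pub fn responded_in_time(elapsed: Duration, block_time: Duration, wait_time: Duration) -> bool {
    elapsed <= round_deadline(block_time, wait_time)
}
```

For a 12 s block time and a 1.5 s wait time, a reply after 13 s is in time and a reply after 14 s is not. On a chain with variable block times, no single `block_time` fits every round, which is why this rule only works well with fixed block times.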

Implementations§

impl<S> BlockSynchronizer<S>

pub fn new(block_time: Duration, max_wait: Duration, max_missed_blocks: u64) -> Self

pub fn max_messages(&mut self, val: usize)

pub fn register_synchronizer(self, id: ExtractorIdentity, synchronizer: S) -> Self

pub async fn run(self) -> Result<(JoinHandle<()>, Receiver<FeedMessage>), BlockSynchronizerError>

Auto Trait Implementations§

impl<S> Freeze for BlockSynchronizer<S>

impl<S> RefUnwindSafe for BlockSynchronizer<S>
where S: RefUnwindSafe,

impl<S> Send for BlockSynchronizer<S>
where S: Send,

impl<S> Sync for BlockSynchronizer<S>
where S: Sync,

impl<S> Unpin for BlockSynchronizer<S>
where S: Unpin,

impl<S> UnwindSafe for BlockSynchronizer<S>
where S: UnwindSafe,

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> ErasedDestructor for T
where T: 'static,