BlockSynchronizer

Struct BlockSynchronizer 

Source
pub struct BlockSynchronizer<S> { /* private fields */ }
Expand description

Aligns multiple StateSynchronizers on the block dimension.

§Purpose

The purpose of this component is to handle streams from multiple state synchronizers and align/merge them according to their blocks. Ideally this should be done in a fault-tolerant way, meaning we can recover from a state synchronizer suffering from timing issues. E.g. a delayed or unresponsive state synchronizer might recover again, or an advanced state synchronizer can be included again once we reach the block it is at.

§Limitations

  • Supports only chains with fixed blocks time for now due to the lock step mechanism.

§Initialisation

Queries all registered synchronizers for their first message and evaluates the state of each synchronizer. If a synchronizer’s first message is an older block, it is marked as delayed. If no message is received within the startup timeout, the synchronizer is marked as stale and is closed.

§Main loop

Once started, the synchronizers are queried concurrently for messages in lock step: the main loop queries all synchronizers in ready for the last emitted data, builds the FeedMessage and emits it, then it schedules the wait procedure for the next block.

§Synchronization Logic

To classify a synchronizer as delayed, we need to first define the current block. The highest block number of all ready synchronizers is considered the current block.

Once we have the current block we can easily determine which block we expect next. And if a synchronizer delivers an older block we can classify it as delayed.

If any synchronizer is not in the ready state we will try to bring it back to the ready state. This is done by trying to empty any buffers of a delayed synchronizer or waiting to reach the height of an advanced synchronizer (and flagging it as such in the meantime).

Of course, we can’t wait forever for a synchronizer to reply/recover. All of this must happen within the block production step of the blockchain: The wait procedure consists of waiting for any of the individual ProtocolStateSynchronizers to emit a new message (within a max timeout - several multiples of the block time). Once a message is received a very short timeout starts for the remaining synchronizers, to deliver a message. Any synchronizer failing to do so is transitioned to delayed.

§Note

The described process above is the goal. It is currently not implemented like that. Instead we simply wait block_time + wait_time. Synchronizers are expected to respond within that timeout. This is simpler but only works well on chains with fixed block times.

Implementations§

Source§

impl<S> BlockSynchronizer<S>

Source

pub fn new( block_time: Duration, latency_buffer: Duration, max_missed_blocks: u64, ) -> Self

Source

pub fn max_messages(&mut self, val: usize)

Limits the stream to emit a maximum number of messages.

After the stream emitted max messages it will end. This is only useful for testing purposes or if you only want to process a fixed amount of messages and then terminate cleanly.

Source

pub fn startup_timeout(self, val: Duration)

Sets timeout for the first message of a protocol.

Time to wait for the full first message, including snapshot retrieval.

Source

pub fn register_synchronizer( self, id: ExtractorIdentity, synchronizer: S, ) -> Self

Source

pub async fn run( self, ) -> Result<(JoinHandle<()>, Receiver<Result<FeedMessage<BlockHeader>, BlockSynchronizerError>>), BlockSynchronizerError>

Starts the synchronization of streams.

Will error directly if the startup fails. Once the startup is complete, it will communicate any fatal errors through the stream before closing it.

Auto Trait Implementations§

§

impl<S> Freeze for BlockSynchronizer<S>

§

impl<S> RefUnwindSafe for BlockSynchronizer<S>
where S: RefUnwindSafe,

§

impl<S> Send for BlockSynchronizer<S>
where S: Send,

§

impl<S> Sync for BlockSynchronizer<S>
where S: Sync,

§

impl<S> Unpin for BlockSynchronizer<S>
where S: Unpin,

§

impl<S> UnwindSafe for BlockSynchronizer<S>
where S: UnwindSafe,

Blanket Implementations§

§

impl<T> Any for T
where T: 'static + ?Sized,

§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
§

impl<T> Borrow<T> for T
where T: ?Sized,

§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
§

impl<T> BorrowMut<T> for T
where T: ?Sized,

§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
§

impl<T> From<T> for T

§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
§

impl<T, U> Into<U> for T
where U: From<T>,

§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,