pub struct ReplicaClient { /* private fields */ }Expand description
One blocking TCP connection to a primary’s per-shard replication
listener. After Self::connect completes the handshake, the
client behaves as an Iterator<Item = Result<DecodedFrame, ReplicaError>>
yielding frames in offset order until the peer disconnects or a
hard error surfaces.
Implementations§
Source§impl ReplicaClient
impl ReplicaClient
Sourcepub fn connect<A: ToSocketAddrs>(
addr: A,
replica_id: &str,
from_offset: u64,
) -> Result<Self, ReplicaError>
pub fn connect<A: ToSocketAddrs>( addr: A, replica_id: &str, from_offset: u64, ) -> Result<Self, ReplicaError>
Connect to addr, send REPLICATE FROM <from_offset> ID <replica_id>,
read the +ACK <offset> reply, and return a ready-to-iterate
client. Blocks until the handshake completes or the connect
times out (connect_timeout argument).
Sourcepub fn connect_with_timeout<A: ToSocketAddrs>(
addr: A,
replica_id: &str,
from_offset: u64,
connect_timeout: Duration,
) -> Result<Self, ReplicaError>
pub fn connect_with_timeout<A: ToSocketAddrs>( addr: A, replica_id: &str, from_offset: u64, connect_timeout: Duration, ) -> Result<Self, ReplicaError>
Self::connect with an explicit connect timeout. Useful for
tests that don’t want to wait the default 5 s when a port is
closed.
Sourcepub fn primary_offset_at_handshake(&self) -> u64
pub fn primary_offset_at_handshake(&self) -> u64
Offset the primary reported at handshake (+ACK <N> value).
Informational — exposed so callers can log + future T1.22
snapshot-ship logic can compare against the local applied
offset to decide resume vs full-sync.
Sourcepub fn socket_handle(&self) -> Result<TcpStream>
pub fn socket_handle(&self) -> Result<TcpStream>
Return a try_clone’d handle on the underlying socket. The
clone shares the same kernel file description, so calling
shutdown(Shutdown::Both) on it unblocks any in-flight
blocking read on the original (and vice versa). T1.29.5 uses
this to interrupt a runner thread parked in next_event when
REPLICAOF retargets or REPLICAOF NO ONE demotes — without
this handle, the runner stays blocked until the upstream peer
closes the connection.
Sourcepub fn expected_offset(&self) -> u64
pub fn expected_offset(&self) -> u64
The offset the next frame should carry. Advances on every
successful next().
Sourcepub fn next_frame(&mut self) -> Option<Result<DecodedFrame, ReplicaError>>
pub fn next_frame(&mut self) -> Option<Result<DecodedFrame, ReplicaError>>
Pull the next frame from the stream. Frame-only convenience —
returns ReplicaError::SnapshotInProgress if the primary is
sending a snapshot. Callers that need the snapshot-aware
surface (T1.22) must use Self::next_event instead.
Returns None on clean peer EOF (no buffered bytes left).
Source§impl ReplicaClient
impl ReplicaClient
Sourcepub fn next_event(&mut self) -> Option<Result<ReplicaEvent, ReplicaError>>
pub fn next_event(&mut self) -> Option<Result<ReplicaEvent, ReplicaError>>
Snapshot-aware iterator step. Returns one ReplicaEvent per
call — a live Frame, a SnapshotBegin/SnapshotChunk/
SnapshotEnd, or one of the ReplicaError variants.
Returns None on clean peer EOF.
Snapshot bookkeeping:
- Entering
SnapshotBeginsetsin_snapshot = true; chunk bytes are valid untilSnapshotEnd. SnapshotEnd { ack_offset }setsexpected_offset = ack_offset(so the next liveFramehas no gap) and clearsin_snapshot.- Live
*2\r\nbytes during a snapshot returnReplicaError::UnexpectedInSnapshot(v1.18 forbids interleaving — seedocs/snapshot.md).
Trait Implementations§
Source§impl Iterator for ReplicaClient
impl Iterator for ReplicaClient
Source§fn next(&mut self) -> Option<Self::Item>
fn next(&mut self) -> Option<Self::Item>
Frame-only iterator. Use ReplicaClient::next_event for the
snapshot-aware surface.
Source§type Item = Result<DecodedFrame, ReplicaError>
type Item = Result<DecodedFrame, ReplicaError>
Source§fn next_chunk<const N: usize>(
&mut self,
) -> Result<[Self::Item; N], IntoIter<Self::Item, N>>where
Self: Sized,
fn next_chunk<const N: usize>(
&mut self,
) -> Result<[Self::Item; N], IntoIter<Self::Item, N>>where
Self: Sized,
iter_next_chunk)N values. Read more1.0.0 (const: unstable) · Source§fn size_hint(&self) -> (usize, Option<usize>)
fn size_hint(&self) -> (usize, Option<usize>)
1.0.0 (const: unstable) · Source§fn count(self) -> usizewhere
Self: Sized,
fn count(self) -> usizewhere
Self: Sized,
1.0.0 (const: unstable) · Source§fn last(self) -> Option<Self::Item>where
Self: Sized,
fn last(self) -> Option<Self::Item>where
Self: Sized,
Source§fn advance_by(&mut self, n: usize) -> Result<(), NonZero<usize>>
fn advance_by(&mut self, n: usize) -> Result<(), NonZero<usize>>
iter_advance_by)n elements. Read more1.0.0 (const: unstable) · Source§fn nth(&mut self, n: usize) -> Option<Self::Item>
fn nth(&mut self, n: usize) -> Option<Self::Item>
nth element of the iterator. Read more1.28.0 (const: unstable) · Source§fn step_by(self, step: usize) -> StepBy<Self>where
Self: Sized,
fn step_by(self, step: usize) -> StepBy<Self>where
Self: Sized,
1.0.0 (const: unstable) · Source§fn chain<U>(self, other: U) -> Chain<Self, <U as IntoIterator>::IntoIter>
fn chain<U>(self, other: U) -> Chain<Self, <U as IntoIterator>::IntoIter>
1.0.0 (const: unstable) · Source§fn zip<U>(self, other: U) -> Zip<Self, <U as IntoIterator>::IntoIter>where
Self: Sized,
U: IntoIterator,
fn zip<U>(self, other: U) -> Zip<Self, <U as IntoIterator>::IntoIter>where
Self: Sized,
U: IntoIterator,
Source§fn intersperse(self, separator: Self::Item) -> Intersperse<Self>
fn intersperse(self, separator: Self::Item) -> Intersperse<Self>
iter_intersperse)separator between items
of the original iterator. Read moreSource§fn intersperse_with<G>(self, separator: G) -> IntersperseWith<Self, G>
fn intersperse_with<G>(self, separator: G) -> IntersperseWith<Self, G>
iter_intersperse)separator
between items of the original iterator. Read more1.0.0 (const: unstable) · Source§fn map<B, F>(self, f: F) -> Map<Self, F>
fn map<B, F>(self, f: F) -> Map<Self, F>
1.21.0 (const: unstable) · Source§fn for_each<F>(self, f: F)
fn for_each<F>(self, f: F)
1.0.0 (const: unstable) · Source§fn filter<P>(self, predicate: P) -> Filter<Self, P>
fn filter<P>(self, predicate: P) -> Filter<Self, P>
1.0.0 (const: unstable) · Source§fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F>
fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F>
1.0.0 (const: unstable) · Source§fn enumerate(self) -> Enumerate<Self>where
Self: Sized,
fn enumerate(self) -> Enumerate<Self>where
Self: Sized,
1.0.0 (const: unstable) · Source§fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
1.0.0 (const: unstable) · Source§fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
1.57.0 (const: unstable) · Source§fn map_while<B, P>(self, predicate: P) -> MapWhile<Self, P>
fn map_while<B, P>(self, predicate: P) -> MapWhile<Self, P>
1.0.0 (const: unstable) · Source§fn skip(self, n: usize) -> Skip<Self>where
Self: Sized,
fn skip(self, n: usize) -> Skip<Self>where
Self: Sized,
n elements. Read more1.0.0 (const: unstable) · Source§fn take(self, n: usize) -> Take<Self>where
Self: Sized,
fn take(self, n: usize) -> Take<Self>where
Self: Sized,
n elements, or fewer
if the underlying iterator ends sooner. Read more1.0.0 (const: unstable) · Source§fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
1.0.0 (const: unstable) · Source§fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
1.29.0 (const: unstable) · Source§fn flatten(self) -> Flatten<Self>
fn flatten(self) -> Flatten<Self>
Source§fn map_windows<F, R, const N: usize>(self, f: F) -> MapWindows<Self, F, N>
fn map_windows<F, R, const N: usize>(self, f: F) -> MapWindows<Self, F, N>
iter_map_windows)f for each contiguous window of size N over
self and returns an iterator over the outputs of f. Like slice::windows(),
the windows during mapping overlap as well. Read more1.0.0 (const: unstable) · Source§fn inspect<F>(self, f: F) -> Inspect<Self, F>
fn inspect<F>(self, f: F) -> Inspect<Self, F>
1.0.0 (const: unstable) · Source§fn by_ref(&mut self) -> &mut Selfwhere
Self: Sized,
fn by_ref(&mut self) -> &mut Selfwhere
Self: Sized,
Iterator. Read more1.0.0 (const: unstable) · Source§fn collect<B>(self) -> B
fn collect<B>(self) -> B
Source§fn try_collect<B>(
&mut self,
) -> <<Self::Item as Try>::Residual as Residual<B>>::TryType
fn try_collect<B>( &mut self, ) -> <<Self::Item as Try>::Residual as Residual<B>>::TryType
iterator_try_collect)Source§fn collect_into<E>(self, collection: &mut E) -> &mut E
fn collect_into<E>(self, collection: &mut E) -> &mut E
iter_collect_into)1.0.0 (const: unstable) · Source§fn partition<B, F>(self, f: F) -> (B, B)
fn partition<B, F>(self, f: F) -> (B, B)
Source§fn is_partitioned<P>(self, predicate: P) -> bool
fn is_partitioned<P>(self, predicate: P) -> bool
iter_is_partitioned)true precede all those that return false. Read more1.27.0 (const: unstable) · Source§fn try_fold<B, F, R>(&mut self, init: B, f: F) -> R
fn try_fold<B, F, R>(&mut self, init: B, f: F) -> R
1.27.0 (const: unstable) · Source§fn try_for_each<F, R>(&mut self, f: F) -> R
fn try_for_each<F, R>(&mut self, f: F) -> R
1.0.0 (const: unstable) · Source§fn fold<B, F>(self, init: B, f: F) -> B
fn fold<B, F>(self, init: B, f: F) -> B
1.51.0 (const: unstable) · Source§fn reduce<F>(self, f: F) -> Option<Self::Item>
fn reduce<F>(self, f: F) -> Option<Self::Item>
Source§fn try_reduce<R>(
&mut self,
f: impl FnMut(Self::Item, Self::Item) -> R,
) -> <<R as Try>::Residual as Residual<Option<<R as Try>::Output>>>::TryType
fn try_reduce<R>( &mut self, f: impl FnMut(Self::Item, Self::Item) -> R, ) -> <<R as Try>::Residual as Residual<Option<<R as Try>::Output>>>::TryType
iterator_try_reduce)1.0.0 (const: unstable) · Source§fn all<F>(&mut self, f: F) -> bool
fn all<F>(&mut self, f: F) -> bool
1.0.0 (const: unstable) · Source§fn any<F>(&mut self, f: F) -> bool
fn any<F>(&mut self, f: F) -> bool
1.0.0 (const: unstable) · Source§fn find<P>(&mut self, predicate: P) -> Option<Self::Item>
fn find<P>(&mut self, predicate: P) -> Option<Self::Item>
1.30.0 (const: unstable) · Source§fn find_map<B, F>(&mut self, f: F) -> Option<B>
fn find_map<B, F>(&mut self, f: F) -> Option<B>
Source§fn try_find<R>(
&mut self,
f: impl FnMut(&Self::Item) -> R,
) -> <<R as Try>::Residual as Residual<Option<Self::Item>>>::TryType
fn try_find<R>( &mut self, f: impl FnMut(&Self::Item) -> R, ) -> <<R as Try>::Residual as Residual<Option<Self::Item>>>::TryType
try_find)1.0.0 (const: unstable) · Source§fn position<P>(&mut self, predicate: P) -> Option<usize>
fn position<P>(&mut self, predicate: P) -> Option<usize>
1.0.0 (const: unstable) · Source§fn max(self) -> Option<Self::Item>
fn max(self) -> Option<Self::Item>
1.0.0 (const: unstable) · Source§fn min(self) -> Option<Self::Item>
fn min(self) -> Option<Self::Item>
1.6.0 (const: unstable) · Source§fn max_by_key<B, F>(self, f: F) -> Option<Self::Item>
fn max_by_key<B, F>(self, f: F) -> Option<Self::Item>
1.15.0 (const: unstable) · Source§fn max_by<F>(self, compare: F) -> Option<Self::Item>
fn max_by<F>(self, compare: F) -> Option<Self::Item>
1.6.0 (const: unstable) · Source§fn min_by_key<B, F>(self, f: F) -> Option<Self::Item>
fn min_by_key<B, F>(self, f: F) -> Option<Self::Item>
1.15.0 (const: unstable) · Source§fn min_by<F>(self, compare: F) -> Option<Self::Item>
fn min_by<F>(self, compare: F) -> Option<Self::Item>
1.0.0 (const: unstable) · Source§fn unzip<A, B, FromA, FromB>(self) -> (FromA, FromB)
fn unzip<A, B, FromA, FromB>(self) -> (FromA, FromB)
1.36.0 (const: unstable) · Source§fn copied<'a, T>(self) -> Copied<Self>
fn copied<'a, T>(self) -> Copied<Self>
Source§fn array_chunks<const N: usize>(self) -> ArrayChunks<Self, N>where
Self: Sized,
fn array_chunks<const N: usize>(self) -> ArrayChunks<Self, N>where
Self: Sized,
iter_array_chunks)N elements of the iterator at a time. Read more1.11.0 (const: unstable) · Source§fn product<P>(self) -> P
fn product<P>(self) -> P
Source§fn cmp_by<I, F>(self, other: I, cmp: F) -> Ordering
fn cmp_by<I, F>(self, other: I, cmp: F) -> Ordering
iter_order_by)Iterator with those
of another with respect to the specified comparison function. Read more1.5.0 (const: unstable) · Source§fn partial_cmp<I>(self, other: I) -> Option<Ordering>
fn partial_cmp<I>(self, other: I) -> Option<Ordering>
PartialOrd elements of
this Iterator with those of another. The comparison works like short-circuit
evaluation, returning a result without comparing the remaining elements.
As soon as an order can be determined, the evaluation stops and a result is returned. Read moreSource§fn partial_cmp_by<I, F>(self, other: I, partial_cmp: F) -> Option<Ordering>where
Self: Sized,
I: IntoIterator,
F: FnMut(Self::Item, <I as IntoIterator>::Item) -> Option<Ordering>,
fn partial_cmp_by<I, F>(self, other: I, partial_cmp: F) -> Option<Ordering>where
Self: Sized,
I: IntoIterator,
F: FnMut(Self::Item, <I as IntoIterator>::Item) -> Option<Ordering>,
iter_order_by)Iterator with those
of another with respect to the specified comparison function. Read moreSource§fn eq_by<I, F>(self, other: I, eq: F) -> bool
fn eq_by<I, F>(self, other: I, eq: F) -> bool
iter_order_by)1.5.0 (const: unstable) · Source§fn lt<I>(self, other: I) -> bool
fn lt<I>(self, other: I) -> bool
Iterator are lexicographically
less than those of another. Read more1.5.0 (const: unstable) · Source§fn le<I>(self, other: I) -> bool
fn le<I>(self, other: I) -> bool
Iterator are lexicographically
less or equal to those of another. Read more1.5.0 (const: unstable) · Source§fn gt<I>(self, other: I) -> bool
fn gt<I>(self, other: I) -> bool
Iterator are lexicographically
greater than those of another. Read more1.5.0 (const: unstable) · Source§fn ge<I>(self, other: I) -> bool
fn ge<I>(self, other: I) -> bool
Iterator are lexicographically
greater than or equal to those of another. Read more