Struct tsyncp::util::stream_pool::StreamPool
pub struct StreamPool<S, const N: usize = 0> { /* private fields */ }
Pool of byte streams (e.g. TcpStream) that manages receiving from and sending to the underlying connections.
The first generic parameter S is the type of the underlying byte stream, and the const generic N is
the number of streams when the pool is backed by an array instead of a Vec. When a Vec is used, N
defaults to 0.
The pool can be backed by either a Vec of streams or an array of streams.
When it is an array, the streams are stored inline in the pool (on the stack) rather than in a separate
heap allocation, which may allow better performance. The array's length is the upper limit on the number
of connections the pool can hold, and the current number of streams is kept in a field len.
Unused array slots are left as uninitialized MaybeUninit elements.
This implementation takes care that uninitialized memory is never accessed; however,
I do understand that part of the safety concern of using uninitialized memory is that, in the
future, I may forget which part of the code does what and accidentally expose the unsafe code.
Therefore, if I find that using an array has no real effect on performance, I will remove the array implementation completely.
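The array-backed layout described above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the crate's actual code: ArraySlots and its methods are hypothetical names, and the real pool stores framed streams with their addresses rather than a generic T. The invariant is that exactly the first len slots are initialized, so every unsafe access is guarded by a comparison against len.

```rust
use std::mem::MaybeUninit;

/// Hypothetical stand-in for the array-backed pool:
/// `N` slots, of which only the first `len` are initialized.
pub struct ArraySlots<T, const N: usize> {
    slots: [MaybeUninit<T>; N],
    len: usize,
}

impl<T, const N: usize> ArraySlots<T, N> {
    pub fn new() -> Self {
        Self {
            // Every slot starts out uninitialized.
            slots: std::array::from_fn(|_| MaybeUninit::uninit()),
            len: 0,
        }
    }

    /// Hands the value back as `Err` when all `N` slots are in use.
    pub fn push(&mut self, value: T) -> Result<(), T> {
        if self.len == N {
            return Err(value);
        }
        self.slots[self.len].write(value);
        self.len += 1;
        Ok(())
    }

    pub fn pop(&mut self) -> Option<T> {
        if self.len == 0 {
            return None;
        }
        self.len -= 1;
        // SAFETY: the slot at the old `len - 1` was initialized by `push`,
        // and shrinking `len` first means it is never read again.
        Some(unsafe { self.slots[self.len].assume_init_read() })
    }

    pub fn get(&self, index: usize) -> Option<&T> {
        if index < self.len {
            // SAFETY: `index < len`, so this slot is initialized.
            Some(unsafe { self.slots[index].assume_init_ref() })
        } else {
            None
        }
    }

    pub fn len(&self) -> usize {
        self.len
    }
}

impl<T, const N: usize> Drop for ArraySlots<T, N> {
    fn drop(&mut self) {
        for slot in &mut self.slots[..self.len] {
            // SAFETY: only the first `len` slots are initialized, and only those are dropped.
            unsafe { slot.assume_init_drop() };
        }
    }
}
```

Pushing into a full ArraySlots returns the rejected value instead of touching memory past the array, which mirrors the limit behaviour described for the array variant.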
For streaming data from connections, the pool tries to poll every stream in the pool on every poll event. However, when one of the streams returns a result, the pool saves the index of that stream and starts polling at the next index on the next poll event.
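The rotating start index can be illustrated with a synchronous sketch. It is a simplification under assumptions: RoundRobin and poll_once are hypothetical names, and each VecDeque stands in for the frames already buffered on one stream, whereas the real pool polls asynchronous streams. Starting after the stream that last produced a result presumably keeps a busy stream at a low index from always being served first.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the pool's read side.
pub struct RoundRobin<T> {
    queues: Vec<VecDeque<T>>,
    /// Index at which the next poll starts.
    next_start: usize,
}

impl<T> RoundRobin<T> {
    pub fn new(queues: Vec<VecDeque<T>>) -> Self {
        Self { queues, next_start: 0 }
    }

    /// Visits every queue at most once, starting at `next_start`, and returns
    /// the first available item together with the index it came from.
    pub fn poll_once(&mut self) -> Option<(usize, T)> {
        let count = self.queues.len();
        for offset in 0..count {
            let index = (self.next_start + offset) % count;
            if let Some(item) = self.queues[index].pop_front() {
                // Resume after the stream that just produced a result.
                self.next_start = (index + 1) % count;
                return Some((index, item));
            }
        }
        None
    }
}
```

With three queues where the first holds two items and the third holds one, successive calls alternate between them instead of draining the first queue before the third is visited.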
For sending data to connections, the pool tries to send to all connections even when some of them return an error. It collects all the errors encountered while sending and returns them after the data is flushed. When an error is encountered, the pool removes the errored stream from the pool, stores the error, then continues with the next stream.
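The send behaviour can be sketched synchronously with std::io::Write. This is an approximation under assumptions: send_to_all and MockConn are hypothetical names used only here, and the real pool works through the asynchronous Sink interface and reports SinkErrors rather than a Vec of io::Error.

```rust
use std::io::{self, Write};

/// Hypothetical in-memory connection used only for this sketch.
pub struct MockConn {
    pub fail: bool,
    pub received: Vec<u8>,
}

impl Write for MockConn {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if self.fail {
            return Err(io::Error::new(io::ErrorKind::BrokenPipe, "peer closed"));
        }
        self.received.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Writes `frame` to every writer. A writer that fails is removed from
/// `writers`, its error is stored, and the loop moves on to the next writer.
pub fn send_to_all<W: Write>(writers: &mut Vec<W>, frame: &[u8]) -> Vec<io::Error> {
    let mut errors = Vec::new();
    let mut index = 0;
    while index < writers.len() {
        let writer = &mut writers[index];
        let result = writer.write_all(frame).and_then(|()| writer.flush());
        match result {
            Ok(()) => index += 1,
            Err(error) => {
                // `swap_remove` moves the last writer into `index`,
                // so `index` is not advanced and that writer is tried next.
                writers.swap_remove(index);
                errors.push(error);
            }
        }
    }
    errors
}
```

One failing connection does not prevent the others from receiving the frame; the caller gets the healthy connections back in the Vec and the failures in the returned list.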
Implementations
impl<S, const N: usize> StreamPool<S, N>
pub fn push_stream(&mut self, stream: S, addr: SocketAddr) -> Result<(), StreamPoolError>
Wraps the given byte stream in Framed, then pushes it into the pool.
impl<S, const N: usize> StreamPool<S, N>
pub fn array<const M: usize>() -> StreamPool<S, M>
Returns a pool backed by an array of streams with limit M.
Attempting to push a stream when the pool already holds M streams returns an error.
pub fn with_capacity(capacity: usize) -> Self
Returns a pool backed by a Vec of streams with the given initial capacity and no limit.
pub fn with_limit(limit: usize) -> Self
Returns a pool backed by a Vec of streams with a limit.
Attempting to push a stream when the pool has already reached the limit returns an error.
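The difference between a capacity and a limit can be shown with a small stand-alone sketch. BoundedVec and PoolFull are hypothetical names for illustration; the real pool reports a StreamPoolError, whose variants are not shown on this page.

```rust
/// Hypothetical error for this sketch; the real pool returns `StreamPoolError`.
#[derive(Debug, PartialEq, Eq)]
pub struct PoolFull {
    pub limit: usize,
}

/// Hypothetical Vec-backed container with an optional upper bound.
pub struct BoundedVec<T> {
    items: Vec<T>,
    limit: Option<usize>,
}

impl<T> BoundedVec<T> {
    /// Preallocates room for `capacity` items but may grow past it.
    pub fn with_capacity(capacity: usize) -> Self {
        Self { items: Vec::with_capacity(capacity), limit: None }
    }

    /// Rejects pushes once `limit` items are stored.
    pub fn with_limit(limit: usize) -> Self {
        Self { items: Vec::new(), limit: Some(limit) }
    }

    pub fn push(&mut self, item: T) -> Result<(), PoolFull> {
        match self.limit {
            Some(limit) if self.items.len() >= limit => Err(PoolFull { limit }),
            _ => {
                self.items.push(item);
                Ok(())
            }
        }
    }

    pub fn len(&self) -> usize {
        self.items.len()
    }
}
```

A capacity is only an allocation hint, so pushing beyond it succeeds; a limit is enforced, so the push that would exceed it fails and leaves the contents unchanged.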
pub fn addrs(&self) -> Vec<SocketAddr>
Returns remote addresses of all streams in the pool.
pub fn get(&self, index: usize) -> Option<&(Framed<S>, SocketAddr)>
Returns a reference to the stream and address at the index.
pub fn get_mut(&mut self, index: usize) -> Option<&mut (Framed<S>, SocketAddr)>
Returns a mutable reference to the stream and address at the index.
pub fn push(&mut self, stream: Framed<S>, addr: SocketAddr) -> Result<(), StreamPoolError>
Pushes a stream into the pool.
Returns an error if the pool is already full.
pub fn pop(&mut self) -> Option<(Framed<S>, SocketAddr)>
Pops and returns a stream from the pool.
pub fn swap_remove(&mut self, index: usize) -> Option<(Framed<S>, SocketAddr)>
Swaps the stream at the index with the last element, then pops and returns that stream.
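The same swap-then-pop technique exists on Vec, and a short sketch shows its effect on ordering. swap_remove_checked is a hypothetical helper; returning None for an out-of-range index is an assumption inferred from the Option return type above, not something this page states.

```rust
/// Removes the element at `index` in O(1) by moving the last element into its place.
/// Returns `None` for an out-of-range index, where `Vec::swap_remove` would panic.
pub fn swap_remove_checked<T>(items: &mut Vec<T>, index: usize) -> Option<T> {
    if index < items.len() {
        Some(items.swap_remove(index))
    } else {
        None
    }
}
```

Removal this way is constant-time but does not preserve order: the element that used to be last now sits at the removed index, so indices obtained earlier may refer to a different stream afterwards.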
Trait Implementations
impl<S: AsyncWrite + Unpin, const N: usize> Sink<Bytes> for StreamPool<S, N>
type Error = SinkErrors
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
Attempts to prepare the Sink to receive a value.
fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error>
Begins the process of sending a value to the sink. Each call to this function must be preceded by a successful call to poll_ready which returned Poll::Ready(Ok(())).
impl<S, const N: usize> Split for StreamPool<S, N>
Splits the pool into a pool of read-only streams and a pool of write-only streams.
type Left = StreamPool<<S as Split>::Left, N>
type Right = StreamPool<<S as Split>::Right, N>
type Error = UnsplitError<<S as Split>::Error>
Error returned by unsplit(_, _).
impl<S: AsyncRead + Unpin, const N: usize> Stream for StreamPool<S, N>
Auto Trait Implementations
impl<S, const N: usize> Freeze for StreamPool<S, N> where S: Freeze,
impl<S, const N: usize = 0> !RefUnwindSafe for StreamPool<S, N>
impl<S, const N: usize> Send for StreamPool<S, N> where S: Send,
impl<S, const N: usize> Sync for StreamPool<S, N> where S: Sync,
impl<S, const N: usize> Unpin for StreamPool<S, N>
impl<S, const N: usize = 0> !UnwindSafe for StreamPool<S, N>
Blanket Implementations
impl<T> BorrowMut<T> for T where T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T, Item> SinkExt<Item> for T
fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
fn sink_err_into<E>(self) -> SinkErrInto<Self, Item, E>
Maps this sink's error to a different error type using the Into trait.
fn buffer(self, capacity: usize) -> Buffer<Self, Item> where Self: Sized,
fn flush(&mut self) -> Flush<'_, Self, Item> where Self: Unpin,
fn send(&mut self, item: Item) -> Send<'_, Self, Item> where Self: Unpin,
fn feed(&mut self, item: Item) -> Feed<'_, Self, Item> where Self: Unpin,
fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
fn right_sink<Si1>(self) -> Either<Si1, Self>
fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> where Self: Unpin,
A convenience method for calling Sink::poll_ready on Unpin sink types.
fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error> where Self: Unpin,
A convenience method for calling Sink::start_send on Unpin sink types.
impl<T> StreamExt for T
fn next(&mut self) -> Next<'_, Self> where Self: Unpin,
fn into_future(self) -> StreamFuture<Self>
fn map<T, F>(self, f: F) -> Map<Self, F>
fn enumerate(self) -> Enumerate<Self> where Self: Sized,
fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
fn collect<C>(self) -> Collect<Self, C>
fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB>
fn concat(self) -> Concat<Self>
fn count(self) -> Count<Self> where Self: Sized,
fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F>
fn any<Fut, F>(self, f: F) -> Any<Self, Fut, F>
Executes a predicate over the asynchronous stream and returns true if any element in the stream satisfied the predicate.
fn all<Fut, F>(self, f: F) -> All<Self, Fut, F>
Executes a predicate over the asynchronous stream and returns true if all elements in the stream satisfied the predicate.
fn flatten(self) -> Flatten<Self>
fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnorderedWithFlowController<Self, ()>
fn flat_map_unordered<U, F>(self, limit: impl Into<Option<usize>>, f: F) -> FlatMapUnordered<Self, U, F>
Maps a stream like StreamExt::map but flattens nested Streams and polls them concurrently, yielding items in any order, as they are made available.
fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F>
Combinator similar to StreamExt::fold that holds internal state and produces a new stream.
fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F>
Skips elements on this stream while the provided asynchronous predicate resolves to true.
fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F>
Takes elements from this stream while the provided asynchronous predicate resolves to true.
fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut>
fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F>
fn for_each_concurrent<Fut, F>(self, limit: impl Into<Option<usize>>, f: F) -> ForEachConcurrent<Self, Fut, F>
fn take(self, n: usize) -> Take<Self> where Self: Sized,
Creates a new stream of at most n items of the underlying stream.
fn skip(self, n: usize) -> Skip<Self> where Self: Sized,
Creates a new stream which skips n items of the underlying stream.
fn catch_unwind(self) -> CatchUnwind<Self> where Self: Sized + UnwindSafe,
fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>> where Self: Sized + 'a,
fn buffered(self, n: usize) -> Buffered<Self>
fn buffer_unordered(self, n: usize) -> BufferUnordered<Self>
fn zip<St>(self, other: St) -> Zip<Self, St>
fn peekable(self) -> Peekable<Self> where Self: Sized,
Creates a new stream which exposes a peek method.
fn chunks(self, capacity: usize) -> Chunks<Self> where Self: Sized,
fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self> where Self: Sized,
fn forward<S>(self, sink: S) -> Forward<Self, S>
fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>)
fn inspect<F>(self, f: F) -> Inspect<Self, F>
fn left_stream<B>(self) -> Either<Self, B>
fn right_stream<B>(self) -> Either<B, Self>
fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> where Self: Unpin,
A convenience method for calling Stream::poll_next on Unpin stream types.