pub struct StreamHandle { /* private fields */ }Expand description
A handle to an inbound stream that can be cloned for multiple consumers.
Each clone maintains its own read position, allowing independent consumers to read the stream at different rates.
The design separates the lock-free buffer from synchronization state:
buffer: Lock-free fragment storage usingOnceLock<Bytes>slotssync: Cancelled flag and wakers (requires lock)
Fragment insertion goes directly to the buffer without acquiring any lock,
and the buffer’s built-in Notify handles async notification.
Implementations§
Source§impl StreamHandle
impl StreamHandle
Sourcepub fn new(stream_id: StreamId, total_bytes: u64) -> Self
pub fn new(stream_id: StreamId, total_bytes: u64) -> Self
Creates a new stream handle.
This is public for use by OrphanStreamRegistry which needs to create
handles and for transport layer stream registration.
Sourcepub fn total_bytes(&self) -> u64
pub fn total_bytes(&self) -> u64
Returns the total expected bytes.
Sourcepub fn is_complete(&self) -> bool
pub fn is_complete(&self) -> bool
Returns true if all fragments have been received.
Sourcepub fn received_fragments(&self) -> usize
pub fn received_fragments(&self) -> usize
Returns the number of fragments received so far.
Sourcepub fn total_fragments(&self) -> usize
pub fn total_fragments(&self) -> usize
Returns the total expected number of fragments.
Sourcepub fn stream(&self) -> StreamingInboundStream
pub fn stream(&self) -> StreamingInboundStream
Creates a new streaming view starting from fragment 1.
Each call creates an independent stream with its own read position.
Fragments are cloned but remain in the buffer, allowing multiple
consumers to read the same data via fork().
§Memory Behavior
All fragments remain in the buffer until the stream is dropped or explicitly cleared. This means:
- Pro: Multiple consumers can read the same data independently
- Pro: Safe to use with
fork()for parallel processing - Con: Memory usage equals full stream size until completion
§When to Use
Use stream() when:
- Multiple consumers need to read the same data
- You need to fork the stream for parallel processing
- Memory is not a concern for the stream size
- You might need to re-read fragments
For single-consumer scenarios with large streams, prefer
stream_with_reclaim() for better memory efficiency.
Sourcepub fn stream_with_reclaim(&self) -> StreamingInboundStream
pub fn stream_with_reclaim(&self) -> StreamingInboundStream
Creates a streaming view with automatic memory reclamation.
Unlike stream(), this version takes ownership of
fragments as they are read, freeing memory progressively.
§Memory Behavior
Each fragment is removed from the buffer immediately after being read:
- Pro: Memory usage stays constant regardless of stream size
- Pro: Ideal for processing large streams (10MB+) without OOM
- Con: Fragments cannot be read again once consumed
- Con: Incompatible with
fork()or multiple consumers
§When to Use
Use stream_with_reclaim() when:
- Only one consumer will read the stream
- The stream is large and memory is a concern
- Data is processed once and discarded (e.g., forwarding, hashing)
- You don’t need
fork()or parallel consumers
§Warning
Do not use with fork() or multiple consumers. Once a fragment is
read by this stream, it is permanently removed from the buffer. Other
consumers (including forked handles) will wait forever for fragments
that have already been consumed.
§Example
// Single consumer processing a large file
let mut stream = handle.stream_with_reclaim();
let mut hasher = Sha256::new();
while let Some(chunk) = stream.next().await {
hasher.update(&chunk?);
// Memory for this chunk is freed immediately
}Sourcepub fn fork(&self) -> Self
pub fn fork(&self) -> Self
Forks this handle, creating an independent consumer.
The forked handle shares the same underlying buffer but maintains
its own read position when used with .stream().
Sourcepub fn try_assemble(&self) -> Option<Vec<u8>>
pub fn try_assemble(&self) -> Option<Vec<u8>>
Assembles the complete data if all fragments are present.
This is a convenience method for waiting until the stream is complete and then getting all data at once.
Trait Implementations§
Source§impl Clone for StreamHandle
impl Clone for StreamHandle
Source§fn clone(&self) -> StreamHandle
fn clone(&self) -> StreamHandle
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read moreAuto Trait Implementations§
impl Freeze for StreamHandle
impl !RefUnwindSafe for StreamHandle
impl Send for StreamHandle
impl Sync for StreamHandle
impl Unpin for StreamHandle
impl !UnwindSafe for StreamHandle
Blanket Implementations§
Source§impl<T> ArchivePointee for T
impl<T> ArchivePointee for T
Source§type ArchivedMetadata = ()
type ArchivedMetadata = ()
Source§fn pointer_metadata(
_: &<T as ArchivePointee>::ArchivedMetadata,
) -> <T as Pointee>::Metadata
fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self> ⓘ
fn into_either(self, into_left: bool) -> Either<Self, Self> ⓘ
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self> ⓘ
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self> ⓘ
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§impl<T> LayoutRaw for T
impl<T> LayoutRaw for T
Source§fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
Source§impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
Source§unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool
unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool
Source§fn resolve_niched(out: Place<NichedOption<T, N1>>)
fn resolve_niched(out: Place<NichedOption<T, N1>>)
out indicating that a T is niched.