pub struct ChannelStreamSource { /* private fields */ }
A streaming source that receives data through a channel.
This is the primary integration point between LaminarDB’s push-based
Reactor and DataFusion’s pull-based query execution. Data is pushed
into the source via BridgeSender, and DataFusion pulls it through
the stream.
§Important Usage Pattern
The sender must be taken (not cloned) to ensure proper channel closure:
// Create the source and take the sender
let source = ChannelStreamSource::new(schema);
let sender = source.take_sender().expect("sender available");
// Register with `DataFusion`
let provider = StreamingTableProvider::new("events", Arc::new(source));
ctx.register_table("events", Arc::new(provider))?;
// Push data from Reactor
sender.send(batch).await?;
// IMPORTANT: Drop the sender to close the channel before querying
drop(sender);
// Execute query
let df = ctx.sql("SELECT * FROM events").await?;
let results = df.collect().await?;
§Thread Safety
The source is thread-safe and can be shared across threads. The sender can be cloned after being taken to allow multiple producers.
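The multi-producer pattern can be sketched with the standard library alone. This is an analogy, not LaminarDB code: `std::sync::mpsc::sync_channel` stands in for the bridge, and `run_producers` is a hypothetical helper. It shows the rule that matters here: the consumer sees the end of the stream only after every clone of the sender has been dropped.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `producers` threads that each send `per_producer` items through a
/// clone of one sender, then returns how many items the consumer received.
fn run_producers(producers: usize, per_producer: usize) -> usize {
    // Bounded std channel standing in for the bridge (assumption of this sketch).
    let (sender, receiver) = mpsc::sync_channel::<usize>(8);

    let handles: Vec<_> = (0..producers)
        .map(|id| {
            let tx = sender.clone();
            thread::spawn(move || {
                for n in 0..per_producer {
                    tx.send(id * per_producer + n).expect("receiver alive");
                }
                // `tx` is dropped here, when the producer thread finishes.
            })
        })
        .collect();

    // Drop the original so that only the producers' clones keep the channel open.
    drop(sender);

    // The iterator ends once every sender clone has been dropped.
    let received = receiver.iter().count();

    for handle in handles {
        handle.join().expect("producer panicked");
    }
    received
}

fn main() {
    println!("received {} items", run_producers(3, 4));
}
```

If the original sender were kept alive, `receiver.iter()` would never finish, which is the situation the usage pattern above is designed to avoid.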
Implementations§
impl ChannelStreamSource
pub fn new(schema: SchemaRef) -> Self
Creates a new channel stream source with default capacity.
§Arguments
schema - Schema of the RecordBatch instances that will be pushed
pub fn with_capacity(schema: SchemaRef, capacity: usize) -> Self
Creates a new channel stream source with specified capacity.
§Arguments
schema - Schema of the RecordBatch instances that will be pushed
capacity - Maximum number of batches that can be buffered
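What a buffering limit means in practice can be illustrated with a bounded standard-library channel. This sketch is an assumption-laden analogy: it uses `std::sync::mpsc::sync_channel`, not the LaminarDB bridge, and `accepted_without_consumer` is a hypothetical helper. How `BridgeSender` itself reacts to a full buffer (waiting or returning an error) is not stated in this page.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to enqueue `attempts` items into a channel with room for `capacity`
/// items while nothing is consuming, and returns how many were accepted.
fn accepted_without_consumer(capacity: usize, attempts: usize) -> usize {
    // `_rx` keeps the receiving half alive but never reads from it.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    for item in 0..attempts {
        match tx.try_send(item) {
            Ok(()) => accepted += 1,
            // The buffer is full: a real producer would wait or back off here.
            Err(TrySendError::Full(_)) => break,
            // The consumer is gone: nothing more can be delivered.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

fn main() {
    println!("accepted {} of 10", accepted_without_consumer(4, 10));
}
```

A larger capacity lets a bursty producer run further ahead of the query, at the cost of holding more batches in memory.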
pub fn with_ordering(self, ordering: Vec<SortColumn>) -> Self
Declares that this source produces data in the given sort order.
When set, DataFusion can elide SortExec for ORDER BY queries
that match the declared ordering.
§Arguments
ordering - The columns that the output is sorted by
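The intuition behind sort elision can be sketched as a prefix check: a requested ordering is already satisfied when it is a leading prefix of the declared one. The `SortKey` type and `satisfies` function below are hypothetical and do not mirror `SortColumn` or DataFusion's actual ordering analysis, which also considers details such as null placement and column equivalences.

```rust
/// Hypothetical sort key; not the real `SortColumn` type.
#[derive(Debug, Clone, PartialEq)]
struct SortKey {
    column: String,
    descending: bool,
}

fn key(column: &str, descending: bool) -> SortKey {
    SortKey { column: column.to_string(), descending }
}

/// Returns true when `requested` is a leading prefix of `declared`, meaning
/// the data already arrives in the requested order.
fn satisfies(declared: &[SortKey], requested: &[SortKey]) -> bool {
    declared.starts_with(requested)
}

fn main() {
    let declared = vec![key("event_time", false), key("id", false)];
    // Sorting by the leading column only: already satisfied.
    println!("{}", satisfies(&declared, &[key("event_time", false)]));
    // Sorting by a non-leading column: a sort is still needed.
    println!("{}", satisfies(&declared, &[key("id", false)]));
}
```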
pub fn take_sender(&self) -> Option<BridgeSender>
Takes the sender for pushing batches into this source.
This method can only be called once. The sender is moved out of the source to ensure the caller has full ownership and can close the channel by dropping the sender.
The returned sender can be cloned to allow multiple producers.
Returns None if the sender was already taken.
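A take-once method behind `&self` is commonly built on `Mutex<Option<T>>`. The sketch below shows that pattern with a standard-library channel. `TakeOnceSource` is a hypothetical stand-in, and nothing here is confirmed to match how `ChannelStreamSource` is implemented internally.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::Mutex;

/// Hypothetical stand-in for a source that hands out its sender exactly once.
struct TakeOnceSource {
    sender: Mutex<Option<SyncSender<i32>>>,
}

impl TakeOnceSource {
    fn new(capacity: usize) -> (Self, Receiver<i32>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { sender: Mutex::new(Some(tx)) }, rx)
    }

    /// Moves the sender out through `&self`; every later call returns `None`.
    fn take_sender(&self) -> Option<SyncSender<i32>> {
        self.sender.lock().expect("lock poisoned").take()
    }
}

/// Takes the sender twice, sends two items, drops the sender, and drains.
fn take_twice_then_drain() -> (bool, Vec<i32>) {
    let (source, receiver) = TakeOnceSource::new(4);
    let sender = source.take_sender().expect("first take succeeds");
    let second_is_none = source.take_sender().is_none();

    sender.send(1).expect("receiver alive");
    sender.send(2).expect("receiver alive");
    // The source holds no copy, so this drop closes the channel.
    drop(sender);

    // Terminates because no sender remains anywhere.
    let drained = receiver.iter().collect();
    (second_is_none, drained)
}

fn main() {
    println!("{:?}", take_twice_then_drain());
}
```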
pub fn sender(&self) -> Option<BridgeSender>
Returns a clone of the sender if it hasn’t been taken yet.
Warning: Using this method can lead to channel leak issues if
the original sender is never dropped. Prefer take_sender() for
proper channel lifecycle management.
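The leak described in the warning can be reproduced with a standard-library channel. This is an analogy under stated assumptions: `sync_channel` stands in for the bridge, and `state_after_producer_drop` is a hypothetical helper. When the producer only holds a clone, dropping it leaves the original alive, so the consumer sees an empty but still open channel and a blocking read would wait forever.

```rust
use std::sync::mpsc::{sync_channel, TryRecvError};

/// Returns what the receiver observes right after the producer's handle is
/// dropped, depending on whether the producer held a clone or the original.
fn state_after_producer_drop(hand_out_clone: bool) -> Result<i32, TryRecvError> {
    let (original, receiver) = sync_channel::<i32>(4);

    if hand_out_clone {
        // Analogous to `sender()`: the producer gets a clone.
        let clone = original.clone();
        drop(clone);
        // `original` is still alive, so the channel is open but empty.
        let state = receiver.try_recv();
        drop(original);
        state
    } else {
        // Analogous to `take_sender()`: the producer owns the only sender.
        drop(original);
        receiver.try_recv()
    }
}

fn main() {
    println!("clone:    {:?}", state_after_producer_drop(true));
    println!("original: {:?}", state_after_producer_drop(false));
}
```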
pub fn reset(&self) -> BridgeSender
Resets the source with a new bridge and sender.
This is useful when you need to reuse the source after the previous stream has been consumed. Any data sent before the reset but not yet consumed will be lost.
Returns the new sender.
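The data-loss caveat follows from replacing the channel: whatever sat in the old buffer goes away with the old receiver. The sketch below models this with a standard-library channel. `ResettableSource` is hypothetical, it takes `&mut self` for brevity where the real method takes `&self`, and the failing send on the stale sender is a property of `std::sync::mpsc`, not a documented behaviour of `BridgeSender`.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Hypothetical stand-in: owns the receiving half and can swap in a new channel.
struct ResettableSource {
    receiver: Receiver<i32>,
    capacity: usize,
}

impl ResettableSource {
    fn new(capacity: usize) -> (Self, SyncSender<i32>) {
        let (tx, rx) = sync_channel(capacity);
        (Self { receiver: rx, capacity }, tx)
    }

    /// Replaces the channel. The old receiver is dropped together with any
    /// items that were buffered but not yet consumed.
    fn reset(&mut self) -> SyncSender<i32> {
        let (tx, rx) = sync_channel(self.capacity);
        self.receiver = rx;
        tx
    }

    /// Collects everything currently buffered without blocking.
    fn drain(&self) -> Vec<i32> {
        self.receiver.try_iter().collect()
    }
}

/// Sends before and after a reset; returns what is readable afterwards and
/// whether sending on the stale sender failed.
fn send_around_reset() -> (Vec<i32>, bool) {
    let (mut source, old_sender) = ResettableSource::new(4);
    old_sender.send(1).expect("receiver alive"); // buffered, never consumed

    let new_sender = source.reset();
    new_sender.send(2).expect("receiver alive");

    // The old receiver no longer exists, so the stale sender reports an error.
    let stale_send_failed = old_sender.send(3).is_err();
    (source.drain(), stale_send_failed)
}

fn main() {
    println!("{:?}", send_around_reset());
}
```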
Trait Implementations§
impl Debug for ChannelStreamSource
impl StreamSource for ChannelStreamSource
fn schema(&self) -> SchemaRef
fn output_ordering(&self) -> Option<Vec<SortColumn>>
Auto Trait Implementations§
impl !Freeze for ChannelStreamSource
impl !RefUnwindSafe for ChannelStreamSource
impl Send for ChannelStreamSource
impl Sync for ChannelStreamSource
impl Unpin for ChannelStreamSource
impl UnsafeUnpin for ChannelStreamSource
impl UnwindSafe for ChannelStreamSource
Blanket Implementations§
impl<T> ArchivePointee for T
type ArchivedMetadata = ()
fn pointer_metadata(_: &<T as ArchivePointee>::ArchivedMetadata) -> <T as Pointee>::Metadata
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> LayoutRaw for T
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool
fn resolve_niched(out: Place<NichedOption<T, N1>>)
Writes data to out indicating that a T is niched.