Struct ChannelStreamSource 

pub struct ChannelStreamSource { /* private fields */ }

A streaming source that receives data through a channel.

This is the primary integration point between LaminarDB’s push-based Reactor and DataFusion’s pull-based query execution. Data is pushed into the source via BridgeSender, and DataFusion pulls it through the stream.

§Important Usage Pattern

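Take the sender with take_sender() instead of cloning it with sender(). The channel closes only once every sender has been dropped, and a clone leaves the original inside the source: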

// Create the source and take the sender
let source = ChannelStreamSource::new(schema);
let sender = source.take_sender().expect("sender available");

// Register with DataFusion
let provider = StreamingTableProvider::new("events", Arc::new(source));
ctx.register_table("events", Arc::new(provider))?;

// Push data from Reactor
sender.send(batch).await?;

// IMPORTANT: Drop the sender to close the channel before querying
drop(sender);

// Execute query
let df = ctx.sql("SELECT * FROM events").await?;
let results = df.collect().await?;

§Thread Safety

The source is thread-safe and can be shared across threads. The sender can be cloned after being taken to allow multiple producers.
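
The multi-producer pattern can be modelled with the standard library alone. The sketch below is not LaminarDB code: std::sync::mpsc stands in for the bridge, and fan_in is a hypothetical helper. It shows the general channel rule that the consumer's stream ends only after the original sender and every clone have been dropped.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `producers` threads that each send `per_producer` values through a
/// clone of one sender, then returns everything the receiver saw, sorted.
fn fan_in(producers: usize, per_producer: usize) -> Vec<usize> {
    // Bounded channel standing in for the bridge (assumption: capacity 4).
    let (tx, rx) = mpsc::sync_channel::<usize>(4);

    let handles: Vec<_> = (0..producers)
        .map(|id| {
            let tx = tx.clone();
            thread::spawn(move || {
                for i in 0..per_producer {
                    tx.send(id * 1000 + i).expect("receiver alive");
                }
                // This producer's clone is dropped when the closure returns.
            })
        })
        .collect();

    // Drop the original as well; otherwise the iteration below never ends.
    drop(tx);

    // `iter()` yields until every sender clone has been dropped.
    let mut seen: Vec<usize> = rx.iter().collect();
    for handle in handles {
        handle.join().expect("producer panicked");
    }
    seen.sort_unstable();
    seen
}
```

With a real BridgeSender the same discipline applies: hand one clone to each producer and make sure none of them outlives the data it has to send.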

Implementations§

impl ChannelStreamSource

pub fn new(schema: SchemaRef) -> Self

Creates a new channel stream source with default capacity.

§Arguments
  • schema - Schema of the RecordBatch instances that will be pushed

pub fn with_capacity(schema: SchemaRef, capacity: usize) -> Self

Creates a new channel stream source with specified capacity.

§Arguments
  • schema - Schema of the RecordBatch instances that will be pushed
  • capacity - Maximum number of batches that can be buffered
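
What a bounded buffer means for producers can be illustrated with a standard-library channel. This is a sketch under assumptions: sync_channel stands in for the bridge, accepted_without_consumer is a hypothetical helper, and this page does not state whether a full BridgeSender waits or returns an error. The example only shows that a buffer of a given capacity stops accepting items once it is full and nothing is consuming.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to enqueue `attempts` items into a channel that buffers at most
/// `capacity` items while nothing consumes them. Returns how many fit.
fn accepted_without_consumer(capacity: usize, attempts: usize) -> usize {
    // `_rx` keeps the receiving half alive but never reads from it.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    for item in 0..attempts {
        match tx.try_send(item) {
            Ok(()) => accepted += 1,
            // Buffer is full: a blocking or async send would wait here.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

A larger capacity lets producers run further ahead of the query at the cost of holding more batches in memory.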

pub fn with_ordering(self, ordering: Vec<SortColumn>) -> Self

Declares that this source produces data in the given sort order.

When set, DataFusion can elide SortExec for ORDER BY queries that match the declared ordering.

§Arguments
  • ordering - The columns that the output is sorted by
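
One way to read "match the declared ordering" is the prefix rule: data sorted by (ts, id) is also sorted by (ts), but not by (id) alone. The sketch below illustrates only that rule. SortKey and ordering_satisfies are hypothetical names, the fields of SortColumn are not shown on this page, and DataFusion's actual planner check is more involved.

```rust
/// Hypothetical stand-in for a sort key; not LaminarDB's `SortColumn`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SortKey {
    column: String,
    descending: bool,
}

impl SortKey {
    fn asc(column: &str) -> Self {
        Self { column: column.to_string(), descending: false }
    }

    fn desc(column: &str) -> Self {
        Self { column: column.to_string(), descending: true }
    }
}

/// Returns true when data sorted by `declared` is necessarily sorted by
/// `requested`, that is, when `requested` is a leading prefix of `declared`.
fn ordering_satisfies(declared: &[SortKey], requested: &[SortKey]) -> bool {
    declared.starts_with(requested)
}
```

Declaring an ordering that the pushed batches do not actually follow would make such a check pass while the data is unsorted, so the declaration is a promise the producer has to keep.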

pub fn take_sender(&self) -> Option<BridgeSender>

Takes the sender for pushing batches into this source.

The sender can be taken only once. It is moved out of the source so that the caller owns it outright and can close the channel by dropping it, together with any clones made from it.

The returned sender can be cloned to allow multiple producers.

Returns None if the sender was already taken.
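
Take-once semantics behind a shared reference are commonly built from a Mutex<Option<_>>. The following is a minimal model of that idea, assuming nothing about how ChannelStreamSource is implemented internally; ToySource and drain are hypothetical, and a std channel stands in for the bridge.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Toy source that hands out its sender through `&self`, at most once.
struct ToySource {
    sender: Mutex<Option<Sender<u32>>>,
    receiver: Mutex<Receiver<u32>>,
}

impl ToySource {
    fn new() -> Self {
        let (tx, rx) = channel();
        Self {
            sender: Mutex::new(Some(tx)),
            receiver: Mutex::new(rx),
        }
    }

    /// Moves the sender out, leaving `None` behind for later callers.
    fn take_sender(&self) -> Option<Sender<u32>> {
        self.sender.lock().expect("lock poisoned").take()
    }

    /// Collects values until the channel closes (all senders dropped).
    fn drain(&self) -> Vec<u32> {
        self.receiver.lock().expect("lock poisoned").iter().collect()
    }
}
```

Because the source keeps no copy after the take, dropping the returned sender is enough for drain to finish.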

pub fn sender(&self) -> Option<BridgeSender>

Returns a clone of the sender if it hasn’t been taken yet.

Warning: Using this method can lead to channel leak issues if the original sender is never dropped. Prefer take_sender() for proper channel lifecycle management.
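
The leak described in the warning can be reproduced with a standard-library channel. In this sketch (state_after_drop is a hypothetical helper, not part of LaminarDB), cloning models sender() and Option::take models take_sender(). After the caller drops its handle, the consumer sees a closed channel only in the take case; in the clone case the channel merely looks empty, so a consumer waiting for end of stream would wait indefinitely.

```rust
use std::sync::mpsc::{channel, TryRecvError};

/// Reports what a consumer observes after the caller's sender is dropped.
/// `keep_original == true` models `sender()`: the source keeps the original.
fn state_after_drop(keep_original: bool) -> TryRecvError {
    let (original, rx) = channel::<u32>();
    let mut slot = Some(original); // the sender stored inside the source

    let callers_handle = if keep_original {
        slot.as_ref().expect("sender present").clone() // like `sender()`
    } else {
        slot.take().expect("sender present") // like `take_sender()`
    };
    drop(callers_handle);

    // `slot` is still alive here, so a retained original keeps the channel open.
    let state = rx.try_recv().expect_err("nothing was sent");
    drop(slot);
    state
}
```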

pub fn reset(&self) -> BridgeSender

Resets the source with a new bridge and sender.

This is useful when you need to reuse the source after the previous stream has been consumed. Any data sent before the reset but not yet consumed will be lost.

Returns the new sender.
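
Why unconsumed data is lost can be seen in a small model where resetting swaps in a fresh channel. This is an illustration under assumptions, not the actual implementation: ResettableSource and drain are hypothetical, and whether a stale BridgeSender reports an error after a reset is not stated on this page (in this model it does).

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Toy source whose receiving half can be swapped for a fresh channel.
struct ResettableSource {
    receiver: Mutex<Receiver<u32>>,
}

impl ResettableSource {
    fn new() -> (Self, Sender<u32>) {
        let (tx, rx) = channel();
        (Self { receiver: Mutex::new(rx) }, tx)
    }

    /// Replaces the channel. The old receiver is dropped here, along with
    /// anything that was still buffered in it.
    fn reset(&self) -> Sender<u32> {
        let (tx, rx) = channel();
        *self.receiver.lock().expect("lock poisoned") = rx;
        tx
    }

    /// Collects values until the current channel closes.
    fn drain(&self) -> Vec<u32> {
        self.receiver.lock().expect("lock poisoned").iter().collect()
    }
}
```

In practice this suggests draining or deliberately discarding the previous stream before calling reset, and replacing every old sender with the one it returns.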

Trait Implementations§

impl Debug for ChannelStreamSource

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl StreamSource for ChannelStreamSource

fn schema(&self) -> SchemaRef

Returns the schema of records produced by this source.

fn output_ordering(&self) -> Option<Vec<SortColumn>>

Returns the output ordering of this source, if any.

fn stream(&self, projection: Option<Vec<usize>>, _filters: Vec<Expr>) -> Result<SendableRecordBatchStream, DataFusionError>

Creates a stream of RecordBatch instances.

fn supports_filters(&self, filters: &[Expr]) -> Vec<bool>

Returns which filters this source can apply.

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> ArchivePointee for T

type ArchivedMetadata = ()

The archived version of the pointer metadata for this type.

fn pointer_metadata(_: &<T as ArchivePointee>::ArchivedMetadata) -> <T as Pointee>::Metadata

Converts some archived metadata to the pointer metadata for itself.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> LayoutRaw for T

fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>

Returns the layout of the type.

impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
where T: SharedNiching<N1, N2>, N1: Niching<T>, N2: Niching<T>,

unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool

Returns whether the given value has been niched.

fn resolve_niched(out: Place<NichedOption<T, N1>>)

Writes data to out indicating that a T is niched.

impl<T> Pointee for T

type Metadata = ()

The metadata type for pointers and references to this type.

impl<T> Same for T

type Output = T

Should always be Self

impl<T> Scope for T

fn with<F, R>(self, f: F) -> R
where Self: Sized, F: FnOnce(Self) -> R,

Scoped with ownership.

fn with_ref<F, R>(&self, f: F) -> R
where F: FnOnce(&Self) -> R,

Scoped with reference.

fn with_mut<F, R>(&mut self, f: F) -> R
where F: FnOnce(&mut Self) -> R,

Scoped with mutable reference.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> Value for T
where T: Send + Sync + 'static,