Struct StreamJoinOperator 

pub struct StreamJoinOperator { /* private fields */ }

Stream-stream join operator.

Joins events from two streams on a key column and a time bound. Two events match when they share a key value and their timestamps differ by no more than the configured time bound.
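
As a rough illustration of the matching rule, the sketch below checks a pair of events against a key and a time bound. `SketchEvent` and `events_match` are hypothetical names that are not part of this crate, and treating the bound as inclusive is an assumption.

```rust
/// Hypothetical stand-in for a stream event: a join key plus an
/// event-time timestamp in milliseconds.
#[derive(Debug, Clone, PartialEq)]
pub struct SketchEvent {
    pub key: String,
    pub timestamp_ms: i64,
}

/// Returns true when both events carry the same key and their timestamps
/// differ by at most `time_bound_ms`. The inclusive comparison is an
/// assumption; a negative bound never matches.
pub fn events_match(left: &SketchEvent, right: &SketchEvent, time_bound_ms: i64) -> bool {
    // abs_diff avoids overflow for timestamps near the i64 limits.
    let diff = left.timestamp_ms.abs_diff(right.timestamp_ms);
    left.key == right.key
        && u64::try_from(time_bound_ms).map_or(false, |bound| diff <= bound)
}
```

The check is symmetric, so it does not matter which stream an event arrived on.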

§State Management

Events from both sides are stored in state until they can no longer produce matches, that is, until the watermark passes timestamp + time_bound. Timers then clean up the expired state automatically.
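
The retention rule can be sketched as a buffer ordered by expiry time. This is a simplified model under stated assumptions: `SketchSideBuffer` is a hypothetical type, payloads are plain strings, and eviction runs as an explicit call with a watermark value rather than through the operator's timers.

```rust
use std::collections::BTreeMap;

/// Hypothetical buffer for one join side, ordered by expiry time
/// (event timestamp + time bound), both in milliseconds.
#[derive(Debug, Default)]
pub struct SketchSideBuffer {
    by_expiry_ms: BTreeMap<i64, Vec<String>>,
}

impl SketchSideBuffer {
    /// Stores an event payload under the time at which it stops being joinable.
    pub fn insert(&mut self, payload: &str, timestamp_ms: i64, time_bound_ms: i64) {
        let expiry_ms = timestamp_ms.saturating_add(time_bound_ms);
        self.by_expiry_ms.entry(expiry_ms).or_default().push(payload.to_string());
    }

    /// Drops every event whose expiry lies strictly below the watermark and
    /// returns how many events were removed.
    pub fn evict_expired(&mut self, watermark_ms: i64) -> usize {
        // split_off returns the entries with expiry >= watermark; those are kept.
        let retained = self.by_expiry_ms.split_off(&watermark_ms);
        let expired = std::mem::replace(&mut self.by_expiry_ms, retained);
        expired.values().map(Vec::len).sum()
    }

    /// Number of events still buffered.
    pub fn len(&self) -> usize {
        self.by_expiry_ms.values().map(Vec::len).sum()
    }
}
```

Ordering by expiry keeps eviction proportional to the number of expired entries instead of the full buffer size.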

§Performance Considerations

  • State grows linearly with the number of events within the time window
  • For high-cardinality joins, consider using shorter time bounds
  • Inner joins use less state than outer joins (no unmatched tracking)
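
For capacity planning, the linear growth noted above can be turned into a back-of-the-envelope estimate. The function below is a hypothetical helper, not part of this crate, and it assumes a steady arrival rate and a fixed per-event footprint, which real workloads rarely have.

```rust
use std::time::Duration;

/// Rough upper bound on buffered bytes for one join side, assuming a steady
/// arrival rate and a fixed per-event footprint (both are simplifications).
pub fn estimate_side_state_bytes(
    events_per_sec: u64,
    time_bound: Duration,
    bytes_per_event: u64,
) -> u64 {
    // events held at once ~= arrival rate * retention time
    events_per_sec
        .saturating_mul(time_bound.as_secs())
        .saturating_mul(bytes_per_event)
}
```

Under these assumptions, 1,000 events per second with a one-hour bound and 200 bytes per event comes to about 720 MB per side, and halving the time bound halves the estimate.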

§Optimizations (F057)

  • CPU-Friendly Encoding: Use JoinRowEncoding::CpuFriendly for 30-50% faster access
  • Asymmetric Compaction: Automatically skips compaction on finished/idle sides
  • Per-Key Tracking: Aggressive cleanup for sparse key patterns
  • Build-Side Pruning: Early pruning based on probe-side watermark progress

Implementations

impl StreamJoinOperator

pub fn new(
    left_key_column: String,
    right_key_column: String,
    time_bound: Duration,
    join_type: JoinType,
) -> Self

Creates a new stream join operator.

§Arguments
  • left_key_column - Name of the key column in left stream events
  • right_key_column - Name of the key column in right stream events
  • time_bound - Maximum time difference for matching events
  • join_type - Type of join to perform

pub fn with_id(
    left_key_column: String,
    right_key_column: String,
    time_bound: Duration,
    join_type: JoinType,
    operator_id: String,
) -> Self

Creates a new stream join operator with a custom operator ID.

pub fn from_config(config: StreamJoinConfig) -> Self

Creates a new stream join operator from configuration (F057).

This is the recommended constructor for production use, allowing fine-grained control over optimization settings.

§Example
use laminar_core::operator::stream_join::{
    StreamJoinOperator, StreamJoinConfig, JoinType, JoinRowEncoding,
};
use std::time::Duration;

let config = StreamJoinConfig::builder()
    .left_key_column("order_id")
    .right_key_column("order_id")
    .time_bound(Duration::from_secs(3600))
    .join_type(JoinType::Inner)
    .row_encoding(JoinRowEncoding::CpuFriendly)
    .build();

let operator = StreamJoinOperator::from_config(config);

pub fn join_type(&self) -> JoinType

Returns the join type.

pub fn time_bound_ms(&self) -> i64

Returns the time bound in milliseconds.

pub fn metrics(&self) -> &JoinMetrics

Returns the metrics.

pub fn reset_metrics(&mut self)

Resets the metrics.

pub fn row_encoding(&self) -> JoinRowEncoding

Returns the row encoding strategy (F057).

pub fn asymmetric_compaction_enabled(&self) -> bool

Returns whether asymmetric compaction is enabled (F057).

pub fn per_key_tracking_enabled(&self) -> bool

Returns whether per-key tracking is enabled (F057).

pub fn left_stats(&self) -> &SideStats

Returns the left-side statistics (F057).

pub fn right_stats(&self) -> &SideStats

Returns the right-side statistics (F057).

pub fn tracked_key_count(&self) -> usize

Returns the number of tracked keys (F057).

pub fn is_side_idle(&self, side: JoinSide, current_time: i64) -> bool

Checks if a side is considered “finished” (idle) (F057).

pub fn effective_build_side(&self) -> JoinSide

Determines the effective build side based on configuration or heuristics (F057).

pub fn process_side(
    &mut self,
    event: &Event,
    side: JoinSide,
    ctx: &mut OperatorContext<'_>,
) -> OutputVec

Processes an event from either the left or right side.

This is the main entry point for the join operator. Call this with the appropriate JoinSide to indicate which stream the event came from.
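
One common way to structure such an entry point is a symmetric hash join: each event probes the opposite side's buffer and is then stored on its own side. The sketch below illustrates that idea only; `SketchJoin` and `SketchSide` are hypothetical, events are reduced to a key and a timestamp, and it does not reflect how this operator is actually implemented.

```rust
use std::collections::HashMap;

/// Hypothetical side marker, standing in for the crate's `JoinSide`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SketchSide {
    Left,
    Right,
}

/// Hypothetical symmetric join state: per-key timestamps for each side.
#[derive(Debug, Default)]
pub struct SketchJoin {
    left: HashMap<String, Vec<i64>>,
    right: HashMap<String, Vec<i64>>,
    time_bound_ms: u64,
}

impl SketchJoin {
    pub fn new(time_bound_ms: u64) -> Self {
        Self { time_bound_ms, ..Self::default() }
    }

    /// Probes the opposite side for matches, then buffers the event on its
    /// own side. Returns (left_timestamp, right_timestamp) pairs.
    pub fn process_side(&mut self, key: &str, timestamp_ms: i64, side: SketchSide) -> Vec<(i64, i64)> {
        let bound = self.time_bound_ms;
        let (own, other) = match side {
            SketchSide::Left => (&mut self.left, &self.right),
            SketchSide::Right => (&mut self.right, &self.left),
        };
        let matches: Vec<(i64, i64)> = other
            .get(key)
            .into_iter()
            .flatten()
            .filter(|&&ts| ts.abs_diff(timestamp_ms) <= bound)
            .map(|&ts| match side {
                SketchSide::Left => (timestamp_ms, ts),
                SketchSide::Right => (ts, timestamp_ms),
            })
            .collect();
        own.entry(key.to_string()).or_default().push(timestamp_ms);
        matches
    }
}
```

Because every event is both probed and stored, a match is emitted exactly once, by whichever of the two events arrives second.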

pub fn scan_idle_keys(&mut self, ctx: &mut OperatorContext<'_>)

F057: Scans for idle keys and cleans them up aggressively.

Called periodically (e.g., on timer) to identify keys with no recent activity and remove their state entries.
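
A periodic idle-key scan might look like the following sketch. `SketchKeyTracker` and its idle threshold are assumptions made for illustration; this page does not show how the operator tracks key activity or which threshold it uses.

```rust
use std::collections::HashMap;

/// Hypothetical per-key activity tracker: remembers when each key was last seen.
#[derive(Debug)]
pub struct SketchKeyTracker {
    last_seen_ms: HashMap<String, i64>,
    idle_threshold_ms: i64,
}

impl SketchKeyTracker {
    pub fn new(idle_threshold_ms: i64) -> Self {
        Self { last_seen_ms: HashMap::new(), idle_threshold_ms }
    }

    /// Records activity for a key at the given time.
    pub fn touch(&mut self, key: &str, now_ms: i64) {
        self.last_seen_ms.insert(key.to_string(), now_ms);
    }

    /// Removes keys with no activity for longer than the threshold and
    /// returns them (sorted) so the caller can delete their state entries.
    pub fn scan_idle(&mut self, now_ms: i64) -> Vec<String> {
        let threshold = self.idle_threshold_ms;
        let mut removed = Vec::new();
        self.last_seen_ms.retain(|key, last| {
            let idle = now_ms.saturating_sub(*last) > threshold;
            if idle {
                removed.push(key.clone());
            }
            !idle
        });
        removed.sort();
        removed
    }

    /// Number of keys currently tracked.
    pub fn tracked_key_count(&self) -> usize {
        self.last_seen_ms.len()
    }
}
```

Each scan visits every tracked key, so the scan interval trades cleanup latency against per-scan cost.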

pub fn should_skip_compaction(&self, side: JoinSide, current_time: i64) -> bool

F057: Checks if compaction should be skipped for a side due to asymmetric optimization.
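
The decision can be pictured as an idleness check gated by a feature flag. The sketch below is an assumption-heavy illustration: `SketchSideActivity`, the idle threshold, and the rule that a side with no events is not yet idle are all hypothetical, since the fields of `SideStats` are not shown on this page.

```rust
/// Hypothetical activity record for one join side.
#[derive(Debug, Clone, Copy)]
pub struct SketchSideActivity {
    /// Timestamp (ms) of the most recent event, if any has arrived.
    pub last_event_ms: Option<i64>,
}

/// A side counts as idle once no event has arrived for at least
/// `idle_threshold_ms`. A side that has never produced an event is treated
/// as not idle (an assumption).
pub fn side_is_idle(
    activity: &SketchSideActivity,
    current_time_ms: i64,
    idle_threshold_ms: i64,
) -> bool {
    match activity.last_event_ms {
        Some(last) => current_time_ms.saturating_sub(last) >= idle_threshold_ms,
        None => false,
    }
}

/// Compaction is skipped only when the optimization is enabled and the side is idle.
pub fn skip_compaction(
    asymmetric_enabled: bool,
    activity: &SketchSideActivity,
    current_time_ms: i64,
    idle_threshold_ms: i64,
) -> bool {
    asymmetric_enabled && side_is_idle(activity, current_time_ms, idle_threshold_ms)
}
```

Gating on the flag keeps behavior unchanged when the optimization is turned off.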

Trait Implementations

impl Operator for StreamJoinOperator

fn process(&mut self, event: &Event, ctx: &mut OperatorContext<'_>) -> OutputVec

Process an incoming event.

fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext<'_>) -> OutputVec

Handle timer expiration.

fn checkpoint(&self) -> OperatorState

Checkpoint the operator’s state.

fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError>

Restore from a checkpoint.

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> ArchivePointee for T

type ArchivedMetadata = ()

The archived version of the pointer metadata for this type.

fn pointer_metadata(
    _: &<T as ArchivePointee>::ArchivedMetadata,
) -> <T as Pointee>::Metadata

Converts some archived metadata to the pointer metadata for itself.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> LayoutRaw for T

fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>

Returns the layout of the type.

impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
where T: SharedNiching<N1, N2>, N1: Niching<T>, N2: Niching<T>,

unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool

Returns whether the given value has been niched.

fn resolve_niched(out: Place<NichedOption<T, N1>>)

Writes data to out indicating that a T is niched.

impl<T> Pointee for T

type Metadata = ()

The metadata type for pointers and references to this type.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> Allocation for T
where T: RefUnwindSafe + Send + Sync,