pub struct StreamJoinOperator { /* private fields */ }
Stream-stream join operator.
Joins events from two streams based on a key column and time bound. Events are matched if they share a key value and their timestamps are within the specified time window.
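The matching rule can be written as a small predicate. This is a minimal sketch, not the operator's actual code: `KeyedEvent` and `is_match` are hypothetical names, and the inclusive comparison against the time bound is an assumption.

```rust
/// Illustrative event: a join key plus an event-time timestamp in milliseconds.
/// (Hypothetical stand-in, not the crate's `Event` type.)
#[derive(Debug, Clone, PartialEq)]
pub struct KeyedEvent {
    pub key: String,
    pub timestamp_ms: i64,
}

/// Returns true when both events share a key and their timestamps differ
/// by at most `time_bound_ms`. The inclusive `<=` is an assumption.
pub fn is_match(left: &KeyedEvent, right: &KeyedEvent, time_bound_ms: i64) -> bool {
    left.key == right.key
        && (left.timestamp_ms - right.timestamp_ms).abs() <= time_bound_ms
}
```

The absolute difference makes the rule symmetric, so either side may arrive first.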
§State Management
Events from both sides are stored in state until they can no longer
produce matches, that is, until the watermark passes
timestamp + time_bound. Timers clean up expired state automatically.
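The expiry condition can be sketched as follows. The function names are hypothetical, the strict `>` is one reading of "passes", and the operator itself relies on timers rather than the linear scan used here for brevity.

```rust
/// Returns true once an event stored at `timestamp_ms` can no longer match:
/// the watermark has passed `timestamp_ms + time_bound_ms`.
/// The strict `>` is an assumption about how "passes" is interpreted.
pub fn is_expired(watermark_ms: i64, timestamp_ms: i64, time_bound_ms: i64) -> bool {
    watermark_ms > timestamp_ms.saturating_add(time_bound_ms)
}

/// Drops expired timestamps from one side's buffer and returns how many
/// entries were removed.
pub fn evict_expired(buffer: &mut Vec<i64>, watermark_ms: i64, time_bound_ms: i64) -> usize {
    let before = buffer.len();
    buffer.retain(|&ts| !is_expired(watermark_ms, ts, time_bound_ms));
    before - buffer.len()
}
```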
§Performance Considerations
- State grows linearly with the number of events within the time window
- For high-cardinality joins, consider using shorter time bounds
- Inner joins use less state than outer joins (no unmatched tracking)
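A back-of-envelope estimate shows why shorter time bounds reduce state. This is a rough sketch under simplifying assumptions: steady arrival rates, every event retained for exactly the time bound, and no watermark lag. The function name is hypothetical.

```rust
use std::time::Duration;

/// Rough count of events buffered across both sides, assuming each event
/// is retained for exactly `time_bound` and arrival rates are constant.
pub fn estimated_buffered_events(
    left_rate_per_sec: u64,
    right_rate_per_sec: u64,
    time_bound: Duration,
) -> u64 {
    left_rate_per_sec
        .saturating_add(right_rate_per_sec)
        .saturating_mul(time_bound.as_secs())
}
```

Under these assumptions, halving the time bound halves the estimate.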
§Optimizations (F057)
- CPU-Friendly Encoding: Use JoinRowEncoding::CpuFriendly for 30-50% faster access
- Asymmetric Compaction: Automatically skips compaction on finished/idle sides
- Per-Key Tracking: Aggressive cleanup for sparse key patterns
- Build-Side Pruning: Early pruning based on probe-side watermark progress
Implementations§
impl StreamJoinOperator
pub fn new(
    left_key_column: String,
    right_key_column: String,
    time_bound: Duration,
    join_type: JoinType,
) -> Self
Creates a new stream join operator.
§Arguments
- left_key_column - Name of the key column in left stream events
- right_key_column - Name of the key column in right stream events
- time_bound - Maximum time difference for matching events
- join_type - Type of join to perform
pub fn with_id(
    left_key_column: String,
    right_key_column: String,
    time_bound: Duration,
    join_type: JoinType,
    operator_id: String,
) -> Self
Creates a new stream join operator with a custom operator ID.
pub fn from_config(config: StreamJoinConfig) -> Self
Creates a new stream join operator from configuration (F057).
This is the recommended constructor for production use, allowing fine-grained control over optimization settings.
§Example
use laminar_core::operator::stream_join::{
StreamJoinOperator, StreamJoinConfig, JoinType, JoinRowEncoding,
};
use std::time::Duration;
let config = StreamJoinConfig::builder()
.left_key_column("order_id")
.right_key_column("order_id")
.time_bound(Duration::from_secs(3600))
.join_type(JoinType::Inner)
.row_encoding(JoinRowEncoding::CpuFriendly)
.build();
let operator = StreamJoinOperator::from_config(config);
pub fn time_bound_ms(&self) -> i64
Returns the time bound in milliseconds.
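For reference, converting a `Duration` to signed milliseconds can look like the sketch below. The helper name and the saturating behaviour are assumptions, not necessarily what the operator does internally.

```rust
use std::convert::TryFrom;
use std::time::Duration;

/// Converts a `Duration` time bound to whole milliseconds as `i64`,
/// saturating at `i64::MAX` when the duration is too large to represent.
pub fn duration_to_millis_i64(bound: Duration) -> i64 {
    i64::try_from(bound.as_millis()).unwrap_or(i64::MAX)
}
```

With this helper, the one-hour bound from the example above corresponds to 3,600,000 ms.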
pub fn metrics(&self) -> &JoinMetrics
Returns the metrics.
pub fn reset_metrics(&mut self)
Resets the metrics.
pub fn row_encoding(&self) -> JoinRowEncoding
Returns the row encoding strategy (F057).
pub fn asymmetric_compaction_enabled(&self) -> bool
Returns whether asymmetric compaction is enabled (F057).
pub fn per_key_tracking_enabled(&self) -> bool
Returns whether per-key tracking is enabled (F057).
pub fn left_stats(&self) -> &SideStats
Returns the left-side statistics (F057).
pub fn right_stats(&self) -> &SideStats
Returns the right-side statistics (F057).
pub fn tracked_key_count(&self) -> usize
Returns the number of tracked keys (F057).
pub fn is_side_idle(&self, side: JoinSide, current_time: i64) -> bool
Checks if a side is considered “finished” (idle) (F057).
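One plausible shape for an idle check is sketched below. `Side`, `SideActivity`, and the explicit `idle_threshold_ms` parameter are hypothetical; the real method takes only the side and the current time, so the threshold presumably comes from configuration. Treating a side that has never received an event as not idle is also an assumption.

```rust
/// Which input of the join an event came from
/// (stand-in for the crate's `JoinSide`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Side {
    Left,
    Right,
}

/// Hypothetical per-side bookkeeping: time of the last event seen, if any.
#[derive(Debug, Default)]
pub struct SideActivity {
    pub left_last_event_ms: Option<i64>,
    pub right_last_event_ms: Option<i64>,
}

impl SideActivity {
    /// A side counts as idle when it has seen at least one event and
    /// nothing has arrived for `idle_threshold_ms` or longer.
    pub fn is_side_idle(&self, side: Side, current_time_ms: i64, idle_threshold_ms: i64) -> bool {
        let last = match side {
            Side::Left => self.left_last_event_ms,
            Side::Right => self.right_last_event_ms,
        };
        match last {
            Some(ts) => current_time_ms.saturating_sub(ts) >= idle_threshold_ms,
            None => false,
        }
    }
}
```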
pub fn effective_build_side(&self) -> JoinSide
Determines the effective build side based on configuration or heuristics (F057).
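The documentation does not say which heuristic is used. As an illustration only, the sketch below prefers an explicit configuration and otherwise picks the side with fewer buffered events, a common choice in hash joins. All names are hypothetical.

```rust
/// Which input of the join is meant (stand-in for the crate's `JoinSide`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Side {
    Left,
    Right,
}

/// Picks the build side: an explicit configuration wins; otherwise the
/// side with fewer buffered events is chosen (assumed heuristic), with
/// ties going to the left side.
pub fn choose_build_side(configured: Option<Side>, left_events: u64, right_events: u64) -> Side {
    match configured {
        Some(side) => side,
        None if right_events < left_events => Side::Right,
        None => Side::Left,
    }
}
```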
pub fn process_side(
    &mut self,
    event: &Event,
    side: JoinSide,
    ctx: &mut OperatorContext<'_>,
) -> OutputVec
Processes an event from either the left or right side.
This is the main entry point for the join operator. Call this with
the appropriate JoinSide to indicate which stream the event came from.
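The store-then-probe pattern behind a symmetric stream join can be sketched with plain hash maps. `MiniJoin` and `Side` are hypothetical stand-ins that work on bare keys and timestamps; the real operator works with `Event`, `OperatorContext`, and managed state, and also handles outer joins and cleanup.

```rust
use std::collections::HashMap;

/// Which stream an event came from (stand-in for the crate's `JoinSide`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Side {
    Left,
    Right,
}

/// Toy inner join that buffers timestamps per key for each side.
pub struct MiniJoin {
    time_bound_ms: i64,
    left: HashMap<String, Vec<i64>>,
    right: HashMap<String, Vec<i64>>,
}

impl MiniJoin {
    pub fn new(time_bound_ms: i64) -> Self {
        MiniJoin { time_bound_ms, left: HashMap::new(), right: HashMap::new() }
    }

    /// Stores the event on its own side, then probes the opposite side.
    /// Returns `(left_ts, right_ts)` pairs for every match.
    pub fn process_side(&mut self, key: &str, timestamp_ms: i64, side: Side) -> Vec<(i64, i64)> {
        let bound = self.time_bound_ms;
        let (own, other) = match side {
            Side::Left => (&mut self.left, &self.right),
            Side::Right => (&mut self.right, &self.left),
        };
        own.entry(key.to_string()).or_insert_with(Vec::new).push(timestamp_ms);

        other
            .get(key)
            .into_iter()
            .flatten()
            .filter(|&&other_ts| (timestamp_ms - other_ts).abs() <= bound)
            .map(|&other_ts| match side {
                Side::Left => (timestamp_ms, other_ts),
                Side::Right => (other_ts, timestamp_ms),
            })
            .collect()
    }
}
```

Because each event is stored before probing the other side, a pair is emitted exactly once, when its later event arrives.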
pub fn scan_idle_keys(&mut self, ctx: &mut OperatorContext<'_>)
F057: Scans for idle keys and cleans them up aggressively.
Called periodically (e.g., on timer) to identify keys with no recent activity and remove their state entries.
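A per-key idle scan might look like the sketch below. `KeyTracker`, `touch`, and the explicit threshold parameter are hypothetical; the real method takes an `OperatorContext` and removes state entries itself rather than returning the keys.

```rust
use std::collections::HashMap;

/// Hypothetical tracker mapping each join key to its last activity time.
pub struct KeyTracker {
    last_activity_ms: HashMap<String, i64>,
}

impl KeyTracker {
    pub fn new() -> Self {
        KeyTracker { last_activity_ms: HashMap::new() }
    }

    /// Records activity for `key` at `now_ms`.
    pub fn touch(&mut self, key: &str, now_ms: i64) {
        self.last_activity_ms.insert(key.to_string(), now_ms);
    }

    pub fn tracked_key_count(&self) -> usize {
        self.last_activity_ms.len()
    }

    /// Removes keys with no activity for at least `idle_threshold_ms` and
    /// returns them (sorted) so the caller can delete the matching state.
    pub fn scan_idle_keys(&mut self, now_ms: i64, idle_threshold_ms: i64) -> Vec<String> {
        let mut idle: Vec<String> = self
            .last_activity_ms
            .iter()
            .filter(|&(_, &last)| now_ms.saturating_sub(last) >= idle_threshold_ms)
            .map(|(key, _)| key.clone())
            .collect();
        for key in &idle {
            self.last_activity_ms.remove(key);
        }
        idle.sort();
        idle
    }
}
```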
pub fn should_skip_compaction(&self, side: JoinSide, current_time: i64) -> bool
F057: Checks if compaction should be skipped for a side due to asymmetric optimization.
Trait Implementations§
impl Operator for StreamJoinOperator
fn process(&mut self, event: &Event, ctx: &mut OperatorContext<'_>) -> OutputVec
fn on_timer(&mut self, timer: Timer, ctx: &mut OperatorContext<'_>) -> OutputVec
fn checkpoint(&self) -> OperatorState
fn restore(&mut self, state: OperatorState) -> Result<(), OperatorError>
Auto Trait Implementations§
impl Freeze for StreamJoinOperator
impl RefUnwindSafe for StreamJoinOperator
impl Send for StreamJoinOperator
impl Sync for StreamJoinOperator
impl Unpin for StreamJoinOperator
impl UnwindSafe for StreamJoinOperator
Blanket Implementations§
impl<T> ArchivePointee for T
type ArchivedMetadata = ()
fn pointer_metadata(_: &<T as ArchivePointee>::ArchivedMetadata) -> <T as Pointee>::Metadata
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> LayoutRaw for T
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool
fn resolve_niched(out: Place<NichedOption<T, N1>>)
Writes data to out indicating that a T is niched.