pub struct WindowOperator {
pub parent: Arc<Operators>,
pub node: FlowNodeId,
pub window_type: WindowType,
pub size: WindowSize,
pub slide: Option<WindowSlide>,
pub group_by: Vec<Expression>,
pub aggregations: Vec<Expression>,
pub compiled_group_by: Vec<CompiledExpr>,
pub compiled_aggregations: Vec<CompiledExpr>,
pub layout: Schema,
pub functions: Functions,
pub row_number_provider: RowNumberProvider,
pub min_events: usize,
pub max_window_count: Option<usize>,
pub max_window_age: Option<Duration>,
pub clock: Clock,
}
The main window operator
Fields§
§parent: Arc<Operators>
§node: FlowNodeId
§window_type: WindowType
§size: WindowSize
§slide: Option<WindowSlide>
§group_by: Vec<Expression>
§aggregations: Vec<Expression>
§compiled_group_by: Vec<CompiledExpr>
§compiled_aggregations: Vec<CompiledExpr>
§layout: Schema
§functions: Functions
§row_number_provider: RowNumberProvider
§min_events: usize
§max_window_count: Option<usize>
§max_window_age: Option<Duration>
§clock: Clock
Implementations§
Source§impl WindowOperator
impl WindowOperator
Sourcepub fn should_evict_rolling_window(
&self,
state: &WindowState,
current_timestamp: u64,
) -> bool
pub fn should_evict_rolling_window( &self, state: &WindowState, current_timestamp: u64, ) -> bool
Check if rolling window should evict old events
Sourcepub fn evict_old_events(&self, state: &mut WindowState, current_timestamp: u64)
pub fn evict_old_events(&self, state: &mut WindowState, current_timestamp: u64)
Evict old events from rolling window to maintain size limit
Source§impl WindowOperator
impl WindowOperator
Sourcepub fn get_sliding_window_ids(&self, timestamp_or_row_index: u64) -> Vec<u64>
pub fn get_sliding_window_ids(&self, timestamp_or_row_index: u64) -> Vec<u64>
Determine which windows an event belongs to for sliding windows. For time-based windows, pass the event timestamp. For count-based windows, pass the row index (0-based).
Sourcepub fn set_sliding_window_start(&self, timestamp: u64, window_id: u64) -> u64
pub fn set_sliding_window_start(&self, timestamp: u64, window_id: u64) -> u64
Set window start time for sliding windows (aligned to slide boundaries)
Source§impl WindowOperator
impl WindowOperator
Sourcepub fn get_tumbling_window_id(&self, timestamp: u64) -> u64
pub fn get_tumbling_window_id(&self, timestamp: u64) -> u64
Determine which window an event belongs to for tumbling windows
Sourcepub fn set_tumbling_window_start(&self, timestamp: u64) -> u64
pub fn set_tumbling_window_start(&self, timestamp: u64) -> u64
Set window start time for tumbling windows (aligned to window boundaries)
Sourcepub fn should_start_new_tumbling_window(
&self,
current_window_start: u64,
event_timestamp: u64,
) -> bool
pub fn should_start_new_tumbling_window( &self, current_window_start: u64, event_timestamp: u64, ) -> bool
Check if tumbling window should be moved to a new window due to time boundaries
Sourcepub fn should_expire_tumbling_window(
&self,
state: &WindowState,
current_timestamp: u64,
) -> bool
pub fn should_expire_tumbling_window( &self, state: &WindowState, current_timestamp: u64, ) -> bool
Check if a tumbling window should be expired (closed)
Source§impl WindowOperator
impl WindowOperator
pub fn new( parent: Arc<Operators>, node: FlowNodeId, window_type: WindowType, size: WindowSize, slide: Option<WindowSlide>, group_by: Vec<Expression>, aggregations: Vec<Expression>, min_events: usize, max_window_count: Option<usize>, max_window_age: Option<Duration>, clock: Clock, functions: Functions, ) -> Self
Sourcepub fn current_timestamp(&self) -> u64
pub fn current_timestamp(&self) -> u64
Get the current timestamp in milliseconds
Sourcepub fn compute_group_keys(&self, columns: &Columns) -> Result<Vec<Hash128>>
pub fn compute_group_keys(&self, columns: &Columns) -> Result<Vec<Hash128>>
Compute group keys for all rows in Columns
Sourcepub fn extract_timestamps(&self, columns: &Columns) -> Result<Vec<u64>>
pub fn extract_timestamps(&self, columns: &Columns) -> Result<Vec<u64>>
Extract timestamps for all rows in Columns
Sourcepub fn create_window_key(
&self,
group_hash: Hash128,
window_id: u64,
) -> EncodedKey
pub fn create_window_key( &self, group_hash: Hash128, window_id: u64, ) -> EncodedKey
Create a window key for storage
Sourcepub fn extract_timestamp_from_row(&self, row: &Row) -> Result<u64>
pub fn extract_timestamp_from_row(&self, row: &Row) -> Result<u64>
Extract timestamp from row data
Sourcepub fn extract_group_values(
&self,
events: &[WindowEvent],
) -> Result<(Vec<Value>, Vec<String>)>
pub fn extract_group_values( &self, events: &[WindowEvent], ) -> Result<(Vec<Value>, Vec<String>)>
Extract group values from window events (all events in a group have the same group values). TODO: Refactor to use column-based evaluation when window operator is needed.
Sourcepub fn events_to_columns(&self, events: &[WindowEvent]) -> Result<Columns>
pub fn events_to_columns(&self, events: &[WindowEvent]) -> Result<Columns>
Convert window events to columnar format for aggregation
Sourcepub fn apply_aggregations(
&self,
txn: &mut FlowTransaction,
window_key: &EncodedKey,
events: &[WindowEvent],
) -> Result<Option<(Row, bool)>>
pub fn apply_aggregations( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, events: &[WindowEvent], ) -> Result<Option<(Row, bool)>>
Apply aggregations to all events in a window
Sourcepub fn process_expired_windows(
&self,
txn: &mut FlowTransaction,
current_timestamp: u64,
) -> Result<Vec<Diff>>
pub fn process_expired_windows( &self, txn: &mut FlowTransaction, current_timestamp: u64, ) -> Result<Vec<Diff>>
Process expired windows and clean up state
Sourcepub fn load_window_state(
&self,
txn: &mut FlowTransaction,
window_key: &EncodedKey,
) -> Result<WindowState>
pub fn load_window_state( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, ) -> Result<WindowState>
Load window state from storage
Sourcepub fn save_window_state(
&self,
txn: &mut FlowTransaction,
window_key: &EncodedKey,
state: &WindowState,
) -> Result<()>
pub fn save_window_state( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, state: &WindowState, ) -> Result<()>
Save window state to storage
Sourcepub fn get_and_increment_global_count(
&self,
txn: &mut FlowTransaction,
group_hash: Hash128,
) -> Result<u64>
pub fn get_and_increment_global_count( &self, txn: &mut FlowTransaction, group_hash: Hash128, ) -> Result<u64>
Get and increment global event count for count-based windows
Sourcepub fn create_count_key(&self, group_hash: Hash128) -> EncodedKey
pub fn create_count_key(&self, group_hash: Hash128) -> EncodedKey
Create a count key for global event counting
Source§impl WindowOperator
Additional helper methods for window triggering
impl WindowOperator
Additional helper methods for window triggering
Sourcepub fn should_trigger_window(
&self,
state: &WindowState,
current_timestamp: u64,
) -> bool
pub fn should_trigger_window( &self, state: &WindowState, current_timestamp: u64, ) -> bool
Check if a window should be triggered (emitted)
Trait Implementations§
Source§impl Operator for WindowOperator
impl Operator for WindowOperator
Source§impl RawStatefulOperator for WindowOperator
impl RawStatefulOperator for WindowOperator
Source§fn state_get(
&self,
txn: &mut FlowTransaction,
key: &EncodedKey,
) -> Result<Option<EncodedValues>>
fn state_get( &self, txn: &mut FlowTransaction, key: &EncodedKey, ) -> Result<Option<EncodedValues>>
Source§fn state_set(
&self,
txn: &mut FlowTransaction,
key: &EncodedKey,
value: EncodedValues,
) -> Result<()>
fn state_set( &self, txn: &mut FlowTransaction, key: &EncodedKey, value: EncodedValues, ) -> Result<()>
Source§fn state_remove(
&self,
txn: &mut FlowTransaction,
key: &EncodedKey,
) -> Result<()>
fn state_remove( &self, txn: &mut FlowTransaction, key: &EncodedKey, ) -> Result<()>
Source§fn state_scan(&self, txn: &mut FlowTransaction) -> Result<StateIterator>
fn state_scan(&self, txn: &mut FlowTransaction) -> Result<StateIterator>
Source§fn state_range(
&self,
txn: &mut FlowTransaction,
range: EncodedKeyRange,
) -> Result<StateIterator>
fn state_range( &self, txn: &mut FlowTransaction, range: EncodedKeyRange, ) -> Result<StateIterator>
Source§fn state_clear(&self, txn: &mut FlowTransaction) -> Result<()>
fn state_clear(&self, txn: &mut FlowTransaction) -> Result<()>
Source§impl WindowStateful for WindowOperator
impl WindowStateful for WindowOperator
Source§fn create_state(&self) -> EncodedValues
fn create_state(&self) -> EncodedValues
Source§fn load_state(
&self,
txn: &mut FlowTransaction,
window_key: &EncodedKey,
) -> Result<EncodedValues>
fn load_state( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, ) -> Result<EncodedValues>
Source§fn save_state(
&self,
txn: &mut FlowTransaction,
window_key: &EncodedKey,
row: EncodedValues,
) -> Result<()>
fn save_state( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, row: EncodedValues, ) -> Result<()>
Source§fn expire_range(
&self,
txn: &mut FlowTransaction,
range: EncodedKeyRange,
) -> Result<u32>
fn expire_range( &self, txn: &mut FlowTransaction, range: EncodedKeyRange, ) -> Result<u32>
Auto Trait Implementations§
impl Freeze for WindowOperator
impl !RefUnwindSafe for WindowOperator
impl Send for WindowOperator
impl Sync for WindowOperator
impl Unpin for WindowOperator
impl UnsafeUnpin for WindowOperator
impl !UnwindSafe for WindowOperator
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request