pub struct WindowOperator {
pub parent: Arc<Operators>,
pub node: FlowNodeId,
pub kind: WindowKind,
pub group_by: Vec<Expression>,
pub aggregations: Vec<Expression>,
pub ts: Option<String>,
pub compiled_group_by: Vec<CompiledExpr>,
pub compiled_aggregations: Vec<CompiledExpr>,
pub layout: RowSchema,
pub functions: Functions,
pub row_number_provider: RowNumberProvider,
pub runtime_context: RuntimeContext,
pub projected_columns: Vec<String>,
}
The main window operator.
Fields§
§parent: Arc<Operators>
§node: FlowNodeId
§kind: WindowKind
§group_by: Vec<Expression>
§aggregations: Vec<Expression>
§ts: Option<String>
§compiled_group_by: Vec<CompiledExpr>
§compiled_aggregations: Vec<CompiledExpr>
§layout: RowSchema
§functions: Functions
§row_number_provider: RowNumberProvider
§runtime_context: RuntimeContext
§projected_columns: Vec<String>
Column names needed by the group_by and aggregations expressions. When empty, no projection is applied (all columns are stored).
Implementations§
Source§impl WindowOperator
impl WindowOperator
Sourcepub fn evict_old_events(&self, state: &mut WindowState, current_timestamp: u64)
pub fn evict_old_events(&self, state: &mut WindowState, current_timestamp: u64)
Evict old events from the rolling window to maintain its size limit.
Sourcepub fn tick_rolling_eviction(
&self,
txn: &mut FlowTransaction,
current_timestamp: u64,
) -> Result<Vec<Diff>>
pub fn tick_rolling_eviction( &self, txn: &mut FlowTransaction, current_timestamp: u64, ) -> Result<Vec<Diff>>
Tick-based eviction for duration-based rolling windows. Scans all operator state, finds “win:” keys, and evicts old events.
Source§impl WindowOperator
impl WindowOperator
Sourcepub fn tick_session_expiration(
&self,
txn: &mut FlowTransaction,
current_timestamp: u64,
) -> Result<Vec<Diff>>
pub fn tick_session_expiration( &self, txn: &mut FlowTransaction, current_timestamp: u64, ) -> Result<Vec<Diff>>
Tick-based session expiration. Scans all operator state, finds “win:” keys with expired sessions.
Source§impl WindowOperator
impl WindowOperator
Sourcepub fn get_sliding_window_ids(&self, timestamp_or_row_index: u64) -> Vec<u64>
pub fn get_sliding_window_ids(&self, timestamp_or_row_index: u64) -> Vec<u64>
Determine which windows an event belongs to for sliding windows
Sourcepub fn set_sliding_window_start(&self, timestamp: u64, window_id: u64) -> u64
pub fn set_sliding_window_start(&self, timestamp: u64, window_id: u64) -> u64
Set window start time for sliding windows (aligned to slide boundaries)
Source§impl WindowOperator
impl WindowOperator
Sourcepub fn get_tumbling_window_id(&self, timestamp: u64) -> u64
pub fn get_tumbling_window_id(&self, timestamp: u64) -> u64
Determine which window an event belongs to for tumbling windows
Sourcepub fn set_tumbling_window_start(&self, timestamp: u64) -> u64
pub fn set_tumbling_window_start(&self, timestamp: u64) -> u64
Set window start time for tumbling windows (aligned to window boundaries)
Source§impl WindowOperator
impl WindowOperator
pub fn new( parent: Arc<Operators>, node: FlowNodeId, kind: WindowKind, group_by: Vec<Expression>, aggregations: Vec<Expression>, ts: Option<String>, runtime_context: RuntimeContext, functions: Functions, ) -> Self
Sourcepub fn current_timestamp(&self) -> u64
pub fn current_timestamp(&self) -> u64
Get the current timestamp in milliseconds
Sourcepub fn project_columns(&self, columns: &Columns) -> Columns
pub fn project_columns(&self, columns: &Columns) -> Columns
Project a single-row Columns down to only the columns needed by window expressions.
Sourcepub fn is_count_based(&self) -> bool
pub fn is_count_based(&self) -> bool
Whether this is a count-based window
Sourcepub fn size_duration(&self) -> Option<Duration>
pub fn size_duration(&self) -> Option<Duration>
Get the window size as duration (if time-based)
Sourcepub fn size_count(&self) -> Option<u64>
pub fn size_count(&self) -> Option<u64>
Get the window size as count (if count-based)
Sourcepub fn compute_group_keys(&self, columns: &Columns) -> Result<Vec<Hash128>>
pub fn compute_group_keys(&self, columns: &Columns) -> Result<Vec<Hash128>>
Compute group keys for all rows in Columns
Sourcepub fn resolve_event_timestamps(
&self,
columns: &Columns,
row_count: usize,
) -> Result<Vec<u64>>
pub fn resolve_event_timestamps( &self, columns: &Columns, row_count: usize, ) -> Result<Vec<u64>>
Resolve event timestamps for all rows.
When ts is configured, reads from the named DateTime column.
Otherwise falls back to processing time (current clock).
Sourcepub fn create_window_key(
&self,
group_hash: Hash128,
window_id: u64,
) -> EncodedKey
pub fn create_window_key( &self, group_hash: Hash128, window_id: u64, ) -> EncodedKey
Create a window key for storage
Sourcepub fn store_row_index(
&self,
txn: &mut FlowTransaction,
group_hash: Hash128,
row_number: RowNumber,
window_id: u64,
) -> Result<()>
pub fn store_row_index( &self, txn: &mut FlowTransaction, group_hash: Hash128, row_number: RowNumber, window_id: u64, ) -> Result<()>
Store a row_number → window_ids mapping. Appends window_id to the existing list (supports sliding windows with multiple windows per event).
Sourcepub fn extract_group_values(
&self,
window_layout: &WindowLayout,
events: &[WindowEvent],
) -> Result<(Vec<Value>, Vec<String>)>
pub fn extract_group_values( &self, window_layout: &WindowLayout, events: &[WindowEvent], ) -> Result<(Vec<Value>, Vec<String>)>
Extract group values from window events (all events in a group have the same group values). Evaluates compiled_group_by expressions on the first row of the events.
Sourcepub fn events_to_columns(
&self,
window_layout: &WindowLayout,
events: &[WindowEvent],
) -> Result<Columns>
pub fn events_to_columns( &self, window_layout: &WindowLayout, events: &[WindowEvent], ) -> Result<Columns>
Convert window events to columnar format for aggregation
Sourcepub fn apply_aggregations(
&self,
txn: &mut FlowTransaction,
window_key: &EncodedKey,
window_layout: &WindowLayout,
events: &[WindowEvent],
) -> Result<Option<(Row, bool)>>
pub fn apply_aggregations( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, window_layout: &WindowLayout, events: &[WindowEvent], ) -> Result<Option<(Row, bool)>>
Apply aggregations to all events in a window
Sourcepub fn process_expired_windows(
&self,
txn: &mut FlowTransaction,
current_timestamp: u64,
) -> Result<Vec<Diff>>
pub fn process_expired_windows( &self, txn: &mut FlowTransaction, current_timestamp: u64, ) -> Result<Vec<Diff>>
Process expired windows: emit Remove diffs for each, then delete state. Uses the group registry for per-group targeted expiration.
Sourcepub fn load_window_state(
&self,
txn: &mut FlowTransaction,
window_key: &EncodedKey,
) -> Result<WindowState>
pub fn load_window_state( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, ) -> Result<WindowState>
Load window state from storage
Sourcepub fn save_window_state(
&self,
txn: &mut FlowTransaction,
window_key: &EncodedKey,
state: &WindowState,
) -> Result<()>
pub fn save_window_state( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, state: &WindowState, ) -> Result<()>
Save window state to storage
Sourcepub fn get_and_increment_global_count(
&self,
txn: &mut FlowTransaction,
group_hash: Hash128,
) -> Result<u64>
pub fn get_and_increment_global_count( &self, txn: &mut FlowTransaction, group_hash: Hash128, ) -> Result<u64>
Get and increment global event count for count-based windows
Sourcepub fn create_count_key(&self, group_hash: Hash128) -> EncodedKey
pub fn create_count_key(&self, group_hash: Hash128) -> EncodedKey
Create a count key for global event counting
Sourcepub fn load_group_registry(
&self,
txn: &mut FlowTransaction,
) -> Result<Vec<Hash128>>
pub fn load_group_registry( &self, txn: &mut FlowTransaction, ) -> Result<Vec<Hash128>>
Load the set of active group hashes from the registry.
Sourcepub fn register_group(
&self,
txn: &mut FlowTransaction,
group_hash: Hash128,
) -> Result<()>
pub fn register_group( &self, txn: &mut FlowTransaction, group_hash: Hash128, ) -> Result<()>
Register a group hash in the registry if not already present.
Sourcepub fn tick_expire_windows(
&self,
txn: &mut FlowTransaction,
current_timestamp: u64,
) -> Result<Vec<Diff>>
pub fn tick_expire_windows( &self, txn: &mut FlowTransaction, current_timestamp: u64, ) -> Result<Vec<Diff>>
Tick-based window expiration for tumbling/sliding windows. Scans all operator state, finds expired “win:” windows, emits Remove and cleans up.
Sourcepub fn process_insert(
&self,
txn: &mut FlowTransaction,
columns: &Columns,
group_fn: impl Fn(&WindowOperator, &mut FlowTransaction, &Columns, Hash128) -> Result<Vec<Diff>>,
) -> Result<Vec<Diff>>
pub fn process_insert( &self, txn: &mut FlowTransaction, columns: &Columns, group_fn: impl Fn(&WindowOperator, &mut FlowTransaction, &Columns, Hash128) -> Result<Vec<Diff>>, ) -> Result<Vec<Diff>>
Shared: partition columns by group keys and call group_fn for each group.
Sourcepub fn apply_window_change(
&self,
txn: &mut FlowTransaction,
change: &Change,
expire: bool,
process_fn: impl Fn(&WindowOperator, &mut FlowTransaction, &Columns) -> Result<Vec<Diff>>,
) -> Result<Vec<Diff>>
pub fn apply_window_change( &self, txn: &mut FlowTransaction, change: &Change, expire: bool, process_fn: impl Fn(&WindowOperator, &mut FlowTransaction, &Columns) -> Result<Vec<Diff>>, ) -> Result<Vec<Diff>>
Shared: iterate change diffs and process inserts/updates via process_fn.
Optionally runs expiration first (all kinds except rolling).
Trait Implementations§
Source§impl Operator for WindowOperator
impl Operator for WindowOperator
fn id(&self) -> FlowNodeId
fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change>
Source§fn tick(
&self,
txn: &mut FlowTransaction,
timestamp: DateTime,
) -> Result<Option<Change>>
fn tick( &self, txn: &mut FlowTransaction, timestamp: DateTime, ) -> Result<Option<Change>>
fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns>
Source§impl RawStatefulOperator for WindowOperator
impl RawStatefulOperator for WindowOperator
Source§fn state_get(
&self,
txn: &mut FlowTransaction,
key: &EncodedKey,
) -> Result<Option<EncodedRow>>
fn state_get( &self, txn: &mut FlowTransaction, key: &EncodedKey, ) -> Result<Option<EncodedRow>>
Source§fn state_set(
&self,
txn: &mut FlowTransaction,
key: &EncodedKey,
value: EncodedRow,
) -> Result<()>
fn state_set( &self, txn: &mut FlowTransaction, key: &EncodedKey, value: EncodedRow, ) -> Result<()>
Source§fn state_remove(
&self,
txn: &mut FlowTransaction,
key: &EncodedKey,
) -> Result<()>
fn state_remove( &self, txn: &mut FlowTransaction, key: &EncodedKey, ) -> Result<()>
Source§fn state_scan(&self, txn: &mut FlowTransaction) -> Result<StateIterator>
fn state_scan(&self, txn: &mut FlowTransaction) -> Result<StateIterator>
Source§fn state_range(
&self,
txn: &mut FlowTransaction,
range: EncodedKeyRange,
) -> Result<StateIterator>
fn state_range( &self, txn: &mut FlowTransaction, range: EncodedKeyRange, ) -> Result<StateIterator>
Source§fn state_clear(&self, txn: &mut FlowTransaction) -> Result<()>
fn state_clear(&self, txn: &mut FlowTransaction) -> Result<()>
Source§impl WindowStateful for WindowOperator
impl WindowStateful for WindowOperator
Source§fn create_state(&self) -> EncodedRow
fn create_state(&self) -> EncodedRow
Source§fn load_state(
&self,
txn: &mut FlowTransaction,
window_key: &EncodedKey,
) -> Result<EncodedRow>
fn load_state( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, ) -> Result<EncodedRow>
Source§fn save_state(
&self,
txn: &mut FlowTransaction,
window_key: &EncodedKey,
row: EncodedRow,
) -> Result<()>
fn save_state( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, row: EncodedRow, ) -> Result<()>
Source§fn scan_keys_in_range(
&self,
txn: &mut FlowTransaction,
range: &EncodedKeyRange,
) -> Result<Vec<EncodedKey>>
fn scan_keys_in_range( &self, txn: &mut FlowTransaction, range: &EncodedKeyRange, ) -> Result<Vec<EncodedKey>>
Source§fn expire_range(
&self,
txn: &mut FlowTransaction,
range: EncodedKeyRange,
) -> Result<u32>
fn expire_range( &self, txn: &mut FlowTransaction, range: EncodedKeyRange, ) -> Result<u32>
Auto Trait Implementations§
impl Freeze for WindowOperator
impl !RefUnwindSafe for WindowOperator
impl Send for WindowOperator
impl Sync for WindowOperator
impl Unpin for WindowOperator
impl UnsafeUnpin for WindowOperator
impl !UnwindSafe for WindowOperator
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T
where
T: ?Sized,
impl<T> BorrowMut<T> for T
where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> GetSetFdFlags for T
impl<T> GetSetFdFlags for T
Source§fn get_fd_flags(&self) -> Result<FdFlags, Error>
where
T: AsFilelike,
fn get_fd_flags(&self) -> Result<FdFlags, Error>
where
T: AsFilelike,
Query the “status” flags for the self file descriptor.
Source§fn new_set_fd_flags(&self, fd_flags: FdFlags) -> Result<SetFdFlags<T>, Error>
where
T: AsFilelike,
fn new_set_fd_flags(&self, fd_flags: FdFlags) -> Result<SetFdFlags<T>, Error>
where
T: AsFilelike,
Source§fn set_fd_flags(&mut self, set_fd_flags: SetFdFlags<T>) -> Result<(), Error>
where
T: AsFilelike,
fn set_fd_flags(&mut self, set_fd_flags: SetFdFlags<T>) -> Result<(), Error>
where
T: AsFilelike,
Set the “status” flags for the self file descriptor. Read more
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request