Skip to main content

WindowOperator

Struct WindowOperator 

Source
pub struct WindowOperator {
Show 16 fields pub parent: Arc<Operators>, pub node: FlowNodeId, pub window_type: WindowType, pub size: WindowSize, pub slide: Option<WindowSlide>, pub group_by: Vec<Expression>, pub aggregations: Vec<Expression>, pub compiled_group_by: Vec<CompiledExpr>, pub compiled_aggregations: Vec<CompiledExpr>, pub layout: Schema, pub functions: Functions, pub row_number_provider: RowNumberProvider, pub min_events: usize, pub max_window_count: Option<usize>, pub max_window_age: Option<Duration>, pub clock: Clock,
}
Expand description

The main window operator

Fields§

§parent: Arc<Operators>§node: FlowNodeId§window_type: WindowType§size: WindowSize§slide: Option<WindowSlide>§group_by: Vec<Expression>§aggregations: Vec<Expression>§compiled_group_by: Vec<CompiledExpr>§compiled_aggregations: Vec<CompiledExpr>§layout: Schema§functions: Functions§row_number_provider: RowNumberProvider§min_events: usize§max_window_count: Option<usize>§max_window_age: Option<Duration>§clock: Clock

Implementations§

Source§

impl WindowOperator

Source

pub fn should_evict_rolling_window( &self, state: &WindowState, current_timestamp: u64, ) -> bool

Check if rolling window should evict old events

Source

pub fn evict_old_events(&self, state: &mut WindowState, current_timestamp: u64)

Evict old events from rolling window to maintain size limit

Source§

impl WindowOperator

Source

pub fn get_sliding_window_ids(&self, timestamp_or_row_index: u64) -> Vec<u64>

Determine which windows an event belongs to for sliding windows For time-based windows, pass the event timestamp For count-based windows, pass the row index (0-based)

Source

pub fn set_sliding_window_start(&self, timestamp: u64, window_id: u64) -> u64

Set window start time for sliding windows (aligned to slide boundaries)

Source§

impl WindowOperator

Source

pub fn get_tumbling_window_id(&self, timestamp: u64) -> u64

Determine which window an event belongs to for tumbling windows

Source

pub fn set_tumbling_window_start(&self, timestamp: u64) -> u64

Set window start time for tumbling windows (aligned to window boundaries)

Source

pub fn should_start_new_tumbling_window( &self, current_window_start: u64, event_timestamp: u64, ) -> bool

Check if tumbling window should be moved to a new window due to time boundaries

Source

pub fn should_expire_tumbling_window( &self, state: &WindowState, current_timestamp: u64, ) -> bool

Check if a tumbling window should be expired (closed)

Source§

impl WindowOperator

Source

pub fn new( parent: Arc<Operators>, node: FlowNodeId, window_type: WindowType, size: WindowSize, slide: Option<WindowSlide>, group_by: Vec<Expression>, aggregations: Vec<Expression>, min_events: usize, max_window_count: Option<usize>, max_window_age: Option<Duration>, clock: Clock, functions: Functions, ) -> Self

Source

pub fn current_timestamp(&self) -> u64

Get the current timestamp in milliseconds

Source

pub fn compute_group_keys(&self, columns: &Columns) -> Result<Vec<Hash128>>

Compute group keys for all rows in Columns

Source

pub fn extract_timestamps(&self, columns: &Columns) -> Result<Vec<u64>>

Extract timestamps for all rows in Columns

Source

pub fn create_window_key( &self, group_hash: Hash128, window_id: u64, ) -> EncodedKey

Create a window key for storage

Source

pub fn extract_timestamp_from_row(&self, row: &Row) -> Result<u64>

Extract timestamp from row data

Source

pub fn extract_group_values( &self, events: &[WindowEvent], ) -> Result<(Vec<Value>, Vec<String>)>

Extract group values from window events (all events in a group have the same group values) TODO: Refactor to use column-based evaluation when window operator is needed

Source

pub fn events_to_columns(&self, events: &[WindowEvent]) -> Result<Columns>

Convert window events to columnar format for aggregation

Source

pub fn apply_aggregations( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, events: &[WindowEvent], ) -> Result<Option<(Row, bool)>>

Apply aggregations to all events in a window

Source

pub fn process_expired_windows( &self, txn: &mut FlowTransaction, current_timestamp: u64, ) -> Result<Vec<Diff>>

Process expired windows and clean up state

Source

pub fn load_window_state( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, ) -> Result<WindowState>

Load window state from storage

Source

pub fn save_window_state( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, state: &WindowState, ) -> Result<()>

Save window state to storage

Source

pub fn get_and_increment_global_count( &self, txn: &mut FlowTransaction, group_hash: Hash128, ) -> Result<u64>

Get and increment global event count for count-based windows

Source

pub fn create_count_key(&self, group_hash: Hash128) -> EncodedKey

Create a count key for global event counting

Source§

impl WindowOperator

Additional helper methods for window triggering

Source

pub fn should_trigger_window( &self, state: &WindowState, current_timestamp: u64, ) -> bool

Check if a window should be triggered (emitted)

Trait Implementations§

Source§

impl Operator for WindowOperator

Source§

fn id(&self) -> FlowNodeId

Source§

fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change>

Source§

fn pull( &self, _txn: &mut FlowTransaction, _rows: &[RowNumber], ) -> Result<Columns>

Source§

impl RawStatefulOperator for WindowOperator

Source§

fn state_get( &self, txn: &mut FlowTransaction, key: &EncodedKey, ) -> Result<Option<EncodedValues>>

Get raw bytes for a key
Source§

fn state_set( &self, txn: &mut FlowTransaction, key: &EncodedKey, value: EncodedValues, ) -> Result<()>

Set raw bytes for a key
Source§

fn state_remove( &self, txn: &mut FlowTransaction, key: &EncodedKey, ) -> Result<()>

Remove a key
Source§

fn state_scan(&self, txn: &mut FlowTransaction) -> Result<StateIterator>

Scan all keys for this operator
Source§

fn state_range( &self, txn: &mut FlowTransaction, range: EncodedKeyRange, ) -> Result<StateIterator>

Range query between keys
Source§

fn state_clear(&self, txn: &mut FlowTransaction) -> Result<()>

Clear all state for this operator
Source§

impl WindowStateful for WindowOperator

Source§

fn layout(&self) -> Schema

Get or create the layout for state rows
Source§

fn create_state(&self) -> EncodedValues

Create a new state encoded with default values
Source§

fn load_state( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, ) -> Result<EncodedValues>

Load state for a window
Source§

fn save_state( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, row: EncodedValues, ) -> Result<()>

Save state for a window
Source§

fn expire_range( &self, txn: &mut FlowTransaction, range: EncodedKeyRange, ) -> Result<u32>

Expire windows within a given range The range should be constructed by the caller based on their window ordering semantics

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more