Skip to main content

WindowOperator

Struct WindowOperator 

Source
pub struct WindowOperator {
Show 13 fields pub parent: Arc<Operators>, pub node: FlowNodeId, pub kind: WindowKind, pub group_by: Vec<Expression>, pub aggregations: Vec<Expression>, pub ts: Option<String>, pub compiled_group_by: Vec<CompiledExpr>, pub compiled_aggregations: Vec<CompiledExpr>, pub layout: RowSchema, pub functions: Functions, pub row_number_provider: RowNumberProvider, pub runtime_context: RuntimeContext, pub projected_columns: Vec<String>,
}
Expand description

The main window operator

Fields§

§ parent: Arc<Operators>
§ node: FlowNodeId
§ kind: WindowKind
§ group_by: Vec<Expression>
§ aggregations: Vec<Expression>
§ ts: Option<String>
§ compiled_group_by: Vec<CompiledExpr>
§ compiled_aggregations: Vec<CompiledExpr>
§ layout: RowSchema
§ functions: Functions
§ row_number_provider: RowNumberProvider
§ runtime_context: RuntimeContext
§ projected_columns: Vec<String>

projected_columns: the column names needed by the group_by and aggregations expressions. When empty, no projection is applied (all columns are stored).

Implementations§

Source§

impl WindowOperator

Source

pub fn evict_old_events(&self, state: &mut WindowState, current_timestamp: u64)

Evict old events from the rolling window to maintain its size limit.

Source

pub fn tick_rolling_eviction( &self, txn: &mut FlowTransaction, current_timestamp: u64, ) -> Result<Vec<Diff>>

Tick-based eviction for duration-based rolling windows. Scans all operator state, finds “win:” keys, and evicts old events.

Source§

impl WindowOperator

Source

pub fn tick_session_expiration( &self, txn: &mut FlowTransaction, current_timestamp: u64, ) -> Result<Vec<Diff>>

Tick-based session expiration. Scans all operator state, finds “win:” keys with expired sessions.

Source§

impl WindowOperator

Source

pub fn get_sliding_window_ids(&self, timestamp_or_row_index: u64) -> Vec<u64>

Determine which windows an event belongs to for sliding windows

Source

pub fn set_sliding_window_start(&self, timestamp: u64, window_id: u64) -> u64

Set window start time for sliding windows (aligned to slide boundaries)

Source§

impl WindowOperator

Source

pub fn get_tumbling_window_id(&self, timestamp: u64) -> u64

Determine which window an event belongs to for tumbling windows

Source

pub fn set_tumbling_window_start(&self, timestamp: u64) -> u64

Set window start time for tumbling windows (aligned to window boundaries)

Source§

impl WindowOperator

Source

pub fn new( parent: Arc<Operators>, node: FlowNodeId, kind: WindowKind, group_by: Vec<Expression>, aggregations: Vec<Expression>, ts: Option<String>, runtime_context: RuntimeContext, functions: Functions, ) -> Self

Source

pub fn current_timestamp(&self) -> u64

Get the current timestamp in milliseconds

Source

pub fn project_columns(&self, columns: &Columns) -> Columns

Project a single-row Columns down to only the columns needed by window expressions.

Source

pub fn is_count_based(&self) -> bool

Whether this is a count-based window

Source

pub fn size_duration(&self) -> Option<Duration>

Get the window size as duration (if time-based)

Source

pub fn size_count(&self) -> Option<u64>

Get the window size as count (if count-based)

Source

pub fn compute_group_keys(&self, columns: &Columns) -> Result<Vec<Hash128>>

Compute group keys for all rows in Columns

Source

pub fn resolve_event_timestamps( &self, columns: &Columns, row_count: usize, ) -> Result<Vec<u64>>

Resolve event timestamps for all rows. When ts is configured, reads from the named DateTime column. Otherwise falls back to processing time (current clock).

Source

pub fn create_window_key( &self, group_hash: Hash128, window_id: u64, ) -> EncodedKey

Create a window key for storage

Source

pub fn store_row_index( &self, txn: &mut FlowTransaction, group_hash: Hash128, row_number: RowNumber, window_id: u64, ) -> Result<()>

Store a row_number → window_ids mapping. Appends window_id to the existing list (supports sliding windows with multiple windows per event).

Source

pub fn extract_group_values( &self, window_layout: &WindowLayout, events: &[WindowEvent], ) -> Result<(Vec<Value>, Vec<String>)>

Extract group values from window events (all events in a group have the same group values). Evaluates compiled_group_by expressions on the first row of the events.

Source

pub fn events_to_columns( &self, window_layout: &WindowLayout, events: &[WindowEvent], ) -> Result<Columns>

Convert window events to columnar format for aggregation

Source

pub fn apply_aggregations( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, window_layout: &WindowLayout, events: &[WindowEvent], ) -> Result<Option<(Row, bool)>>

Apply aggregations to all events in a window

Source

pub fn process_expired_windows( &self, txn: &mut FlowTransaction, current_timestamp: u64, ) -> Result<Vec<Diff>>

Process expired windows: emit Remove diffs for each, then delete state. Uses the group registry for per-group targeted expiration.

Source

pub fn load_window_state( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, ) -> Result<WindowState>

Load window state from storage

Source

pub fn save_window_state( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, state: &WindowState, ) -> Result<()>

Save window state to storage

Source

pub fn get_and_increment_global_count( &self, txn: &mut FlowTransaction, group_hash: Hash128, ) -> Result<u64>

Get and increment global event count for count-based windows

Source

pub fn create_count_key(&self, group_hash: Hash128) -> EncodedKey

Create a count key for global event counting

Source

pub fn load_group_registry( &self, txn: &mut FlowTransaction, ) -> Result<Vec<Hash128>>

Load the set of active group hashes from the registry.

Source

pub fn register_group( &self, txn: &mut FlowTransaction, group_hash: Hash128, ) -> Result<()>

Register a group hash in the registry if not already present.

Source

pub fn tick_expire_windows( &self, txn: &mut FlowTransaction, current_timestamp: u64, ) -> Result<Vec<Diff>>

Tick-based window expiration for tumbling/sliding windows. Scans all operator state, finds expired “win:” windows, emits Remove and cleans up.

Source

pub fn process_insert( &self, txn: &mut FlowTransaction, columns: &Columns, group_fn: impl Fn(&WindowOperator, &mut FlowTransaction, &Columns, Hash128) -> Result<Vec<Diff>>, ) -> Result<Vec<Diff>>

Shared: partition columns by group keys and call group_fn for each group.

Source

pub fn apply_window_change( &self, txn: &mut FlowTransaction, change: &Change, expire: bool, process_fn: impl Fn(&WindowOperator, &mut FlowTransaction, &Columns) -> Result<Vec<Diff>>, ) -> Result<Vec<Diff>>

Shared: iterate change diffs and process inserts/updates via process_fn. Optionally runs expiration first (all kinds except rolling).

Source

pub fn emit_aggregation_diff( aggregated_row: &Row, is_new: bool, previous_aggregation: Option<(Row, bool)>, ) -> Diff

Shared: emit an Insert or Update diff for an aggregation result. previous_aggregation is the pre-update state (if the window already existed).

Trait Implementations§

Source§

impl Operator for WindowOperator

Source§

fn id(&self) -> FlowNodeId

Source§

fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change>

Source§

fn tick( &self, txn: &mut FlowTransaction, timestamp: DateTime, ) -> Result<Option<Change>>

Periodic tick for time-based maintenance (e.g., window eviction). Returns Some(Change) with diffs if maintenance produced changes.
Source§

fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns>

Source§

impl RawStatefulOperator for WindowOperator

Source§

fn state_get( &self, txn: &mut FlowTransaction, key: &EncodedKey, ) -> Result<Option<EncodedRow>>

Get raw bytes for a key
Source§

fn state_set( &self, txn: &mut FlowTransaction, key: &EncodedKey, value: EncodedRow, ) -> Result<()>

Set raw bytes for a key
Source§

fn state_remove( &self, txn: &mut FlowTransaction, key: &EncodedKey, ) -> Result<()>

Remove a key
Source§

fn state_scan(&self, txn: &mut FlowTransaction) -> Result<StateIterator>

Scan all keys for this operator
Source§

fn state_range( &self, txn: &mut FlowTransaction, range: EncodedKeyRange, ) -> Result<StateIterator>

Range query between keys
Source§

fn state_clear(&self, txn: &mut FlowTransaction) -> Result<()>

Clear all state for this operator
Source§

impl WindowStateful for WindowOperator

Source§

fn layout(&self) -> RowSchema

Get or create the layout for state rows
Source§

fn create_state(&self) -> EncodedRow

Create a new state encoded with default values
Source§

fn load_state( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, ) -> Result<EncodedRow>

Load state for a window
Source§

fn save_state( &self, txn: &mut FlowTransaction, window_key: &EncodedKey, row: EncodedRow, ) -> Result<()>

Save state for a window
Source§

fn scan_keys_in_range( &self, txn: &mut FlowTransaction, range: &EncodedKeyRange, ) -> Result<Vec<EncodedKey>>

Scan keys within a range without removing them (read-only)
Source§

fn expire_range( &self, txn: &mut FlowTransaction, range: EncodedKeyRange, ) -> Result<u32>

Expire windows within a given range. The range should be constructed by the caller based on their window ordering semantics.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> GetSetFdFlags for T

Source§

fn get_fd_flags(&self) -> Result<FdFlags, Error>
where T: AsFilelike,

Query the “status” flags for the self file descriptor.
Source§

fn new_set_fd_flags(&self, fd_flags: FdFlags) -> Result<SetFdFlags<T>, Error>
where T: AsFilelike,

Create a new SetFdFlags value for use with set_fd_flags. Read more
Source§

fn set_fd_flags(&mut self, set_fd_flags: SetFdFlags<T>) -> Result<(), Error>
where T: AsFilelike,

Set the “status” flags for the self file descriptor. Read more
Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
Source§

impl<L> LayerExt<L> for L

Source§

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes an object with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> Pointee for T

Source§

type Pointer = u32

Source§

fn debug( pointer: <T as Pointee>::Pointer, f: &mut Formatter<'_>, ) -> Result<(), Error>

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more