Enum FlowTransaction 

pub enum FlowTransaction {
    Deferred {
        version: CommitVersion,
        pending: Pending,
        primitive_query: MultiReadTransaction,
        state_query: MultiReadTransaction,
        catalog: Catalog,
        interceptors: Interceptors,
    },
    Transactional {
        version: CommitVersion,
        pending: Pending,
        base_pending: Pending,
        primitive_query: MultiReadTransaction,
        state_query: MultiReadTransaction,
        catalog: Catalog,
        interceptors: Interceptors,
    },
}

A transaction wrapper for flow processing with dual-version read semantics.

§Architecture

FlowTransaction combines dual-version reads with isolated writes, both critical for stateful flow processing:

  1. Source data - Read at the CDC event version (snapshot isolation)
  2. Flow state - Read at the latest version (state continuity across CDC events)
  3. Isolated writes - Buffered in a local Pending buffer that is handed back to the caller (see take_pending)

This dual-version approach allows stateful operators (joins, aggregates, distinct) to:

  • Process source data at a consistent snapshot (the CDC event version)
  • Access their own state at the latest version to maintain continuity

§Dual-Version Read Routing

Reads are automatically routed to the correct query transaction based on key type:

┌─────────────────┐
│  FlowTransaction│
└────────┬────────┘
         │
         ├──► pending (flow-generated writes)
         │
         ├──► variant
         │    ├─ Deferred: skip
         │    └─ Transactional { base_pending }: check base_pending
         │
         ├──► primitive_query (at CDC version)
         │    - Source tables / views / regular data
         │
         └──► state_query (at latest version)
              - FlowNodeState / FlowNodeInternalState
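
The lookup order in the diagram can be sketched with a toy model. Everything below is hypothetical and only illustrates the routing idea: `ToyFlowTxn`, `Key`, `Write`, and the `HashMap`-backed stores are stand-ins, not the crate's types, and treating a buffered removal as "key absent" is an assumption of this sketch.

```rust
use std::collections::HashMap;

/// Toy key kinds (hypothetical); the real crate routes on the encoded key type.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum Key {
    /// Source table / view / regular data.
    Row(u64),
    /// Stands in for FlowNodeState / FlowNodeInternalState keys.
    FlowState(u64),
}

/// A buffered operation: either a new value or a deletion.
#[derive(Clone, Debug, PartialEq)]
enum Write {
    Set(String),
    Remove,
}

type Store = HashMap<Key, String>;
type PendingMap = HashMap<Key, Write>;

/// Toy counterpart of the two variants; `primitive` models the snapshot at
/// the CDC version and `state` models the snapshot at the latest version.
enum ToyFlowTxn {
    Deferred { pending: PendingMap, primitive: Store, state: Store },
    Transactional { pending: PendingMap, base_pending: PendingMap, primitive: Store, state: Store },
}

/// Turn a buffered operation into the value a reader would observe.
fn resolve(write: &Write) -> Option<String> {
    match write {
        Write::Set(value) => Some(value.clone()),
        Write::Remove => None, // assumption: a buffered removal hides committed data
    }
}

impl ToyFlowTxn {
    fn get(&self, key: &Key) -> Option<String> {
        let (pending, base, primitive, state) = match self {
            ToyFlowTxn::Deferred { pending, primitive, state } => (pending, None, primitive, state),
            ToyFlowTxn::Transactional { pending, base_pending, primitive, state } => {
                (pending, Some(base_pending), primitive, state)
            }
        };
        // 1. Flow-generated writes win.
        if let Some(write) = pending.get(key) {
            return resolve(write);
        }
        // 2. Transactional only: uncommitted writes of the parent transaction.
        if let Some(write) = base.and_then(|b| b.get(key)) {
            return resolve(write);
        }
        // 3. Fall through to committed data, routed by key type.
        match key {
            Key::FlowState(_) => state.get(key).cloned(),
            Key::Row(_) => primitive.get(key).cloned(),
        }
    }
}
```

In this model a `Deferred` transaction never sees a parent's uncommitted rows, while a `Transactional` one does, which mirrors the "skip" versus "check base_pending" branch in the diagram.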

§Construction

Use the named constructors (deferred, deferred_from_parts, transactional) to enforce correct initialization.

§Write Path

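All writes (set, remove) go to the local pending buffer. Nothing is written to storage by FlowTransaction itself; the caller extracts the buffer with take_pending.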
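
A minimal sketch of the buffered write path, assuming a map-backed buffer. `ToyPending`, `ToyTxn`, and `PendingOp` are hypothetical names introduced for illustration; the real `Pending` type may be organised quite differently.

```rust
use std::collections::BTreeMap;
use std::mem;

/// A buffered operation (hypothetical): a new value or a tombstone.
#[derive(Debug, Clone, PartialEq)]
enum PendingOp {
    Set(Vec<u8>),
    Remove,
}

/// Toy stand-in for the pending buffer, keyed by encoded key bytes.
#[derive(Debug, Default)]
struct ToyPending {
    ops: BTreeMap<Vec<u8>, PendingOp>,
}

/// Toy transaction that only models the write side.
#[derive(Debug, Default)]
struct ToyTxn {
    pending: ToyPending,
}

impl ToyTxn {
    /// Buffer a value; nothing reaches storage here.
    fn set(&mut self, key: &[u8], value: Vec<u8>) {
        self.pending.ops.insert(key.to_vec(), PendingOp::Set(value));
    }

    /// Buffer a deletion as a tombstone so later reads can observe it.
    fn remove(&mut self, key: &[u8]) {
        self.pending.ops.insert(key.to_vec(), PendingOp::Remove);
    }

    /// Hand the buffer to the caller and leave an empty one behind.
    fn take_pending(&mut self) -> ToyPending {
        mem::take(&mut self.pending)
    }
}
```

Keeping the last operation per key means a `set` followed by a `remove` leaves a single tombstone in the buffer, and the caller decides when (or whether) the taken buffer is committed.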

§Thread Safety

FlowTransaction is Send because all fields are either Copy, owned, or

Variants§

§

Deferred

CDC-driven async flow processing. Reads only from committed storage + flow pending writes.

Fields

§version: CommitVersion
§pending: Pending
§primitive_query: MultiReadTransaction
§state_query: MultiReadTransaction
§catalog: Catalog
§interceptors: Interceptors
§

Transactional

Inline flow processing within a committing transaction. Can additionally read uncommitted writes from the parent transaction.

Fields

§version: CommitVersion
§pending: Pending
§base_pending: Pending

Read-only snapshot of the committing transaction’s KV writes.

§primitive_query: MultiReadTransaction
§state_query: MultiReadTransaction
§catalog: Catalog
§interceptors: Interceptors

Implementations§

Source§

impl FlowTransaction

Source

pub fn get(&mut self, key: &EncodedKey) -> Result<Option<EncodedValues>>

Get a value by key. Pending writes are checked first, then (for the Transactional variant) base_pending, and finally the multi-version store.

Source

pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool>

Check if a key exists

Source

pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch>

Prefix scan

Source

pub fn range( &mut self, range: EncodedKeyRange, batch_size: usize, ) -> Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>

Create an iterator for forward range queries.

This properly handles high version density by scanning until batch_size unique logical keys are collected. The iterator yields individual entries and maintains cursor state internally. Pending writes are merged with committed storage data.
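
One way to picture "scanning until batch_size unique logical keys are collected" is the sketch below. It is an illustration under stated assumptions, not the crate's algorithm: `Versioned` and `next_batch` are hypothetical, the committed entries are assumed to be sorted by key ascending and version descending, and merging with pending writes is left out.

```rust
/// One stored version of a logical key (hypothetical toy type).
#[derive(Debug, Clone, PartialEq)]
struct Versioned {
    key: String,
    version: u64,
    value: String,
}

impl Versioned {
    fn new(key: &str, version: u64, value: &str) -> Self {
        Versioned { key: key.to_string(), version, value: value.to_string() }
    }
}

/// Collect up to `batch_size` unique logical keys, starting at index `start`.
///
/// For each key, only the newest version visible at `read_version` is kept.
/// Returns the batch and the cursor (index) at which the next call resumes.
fn next_batch(
    committed: &[Versioned],
    start: usize,
    read_version: u64,
    batch_size: usize,
) -> (Vec<Versioned>, usize) {
    let mut batch: Vec<Versioned> = Vec::new();
    let mut i = start;
    while i < committed.len() {
        let entry = &committed[i];
        // True until a visible version of this key has been emitted.
        let is_new_key = batch.last().map_or(true, |last| last.key != entry.key);
        // Stop only at a key boundary, so a key's versions are never split.
        if is_new_key && batch.len() == batch_size {
            break;
        }
        // Emit the first (newest) version that the reader is allowed to see.
        if is_new_key && entry.version <= read_version {
            batch.push(entry.clone());
        }
        i += 1;
    }
    (batch, i)
}
```

Counting logical keys rather than raw entries means a key with many stored versions still contributes one item to the batch, so batches do not shrink as version density grows.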

Source

pub fn range_rev( &mut self, range: EncodedKeyRange, batch_size: usize, ) -> Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>

Create an iterator for reverse range queries.

This properly handles high version density by scanning until batch_size unique logical keys are collected. The iterator yields individual entries in reverse key order and maintains cursor state internally.

Source§

impl FlowTransaction

Source

pub fn state_get( &mut self, id: FlowNodeId, key: &EncodedKey, ) -> Result<Option<EncodedValues>>

Get state for a specific flow node and key

Source

pub fn state_set( &mut self, id: FlowNodeId, key: &EncodedKey, value: EncodedValues, ) -> Result<()>

Set state for a specific flow node and key

Source

pub fn state_remove(&mut self, id: FlowNodeId, key: &EncodedKey) -> Result<()>

Remove state for a specific flow node and key

Source

pub fn state_scan(&mut self, id: FlowNodeId) -> Result<MultiVersionBatch>

Scan all state for a specific flow node

Source

pub fn state_range( &mut self, id: FlowNodeId, range: EncodedKeyRange, ) -> Result<MultiVersionBatch>

Range query on state for a specific flow node

Source

pub fn state_clear(&mut self, id: FlowNodeId) -> Result<()>

Clear all state for a specific flow node

Source

pub fn load_or_create_row( &mut self, id: FlowNodeId, key: &EncodedKey, schema: &Schema, ) -> Result<EncodedValues>

Load the state row for a key, creating it if it does not exist

Source

pub fn save_row( &mut self, id: FlowNodeId, key: &EncodedKey, row: EncodedValues, ) -> Result<()>

Save an encoded row as the state for a specific flow node and key
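
The load/modify/save cycle that these two methods suggest can be sketched as follows. All names here (`ToyState`, `ToySchema`, `ToyRow`) are hypothetical, and two behaviours are assumptions of the sketch rather than documented facts: a missing row is created as zeroes sized by the schema, and it is not persisted until `save_row` is called.

```rust
use std::collections::HashMap;

/// Hypothetical schema: only the number of fields matters in this sketch.
#[derive(Debug, Clone, PartialEq)]
struct ToySchema {
    field_count: usize,
}

/// Hypothetical row: one integer per schema field.
type ToyRow = Vec<i64>;

/// Toy per-node state, keyed by (flow node id, key).
#[derive(Debug, Default)]
struct ToyState {
    rows: HashMap<(u64, String), ToyRow>,
}

impl ToyState {
    /// Return the stored row, or a fresh zeroed row shaped by `schema`.
    fn load_or_create_row(&mut self, node: u64, key: &str, schema: &ToySchema) -> ToyRow {
        match self.rows.get(&(node, key.to_string())) {
            Some(row) => row.clone(),
            // Assumption: the new row is only persisted by a later save_row.
            None => vec![0; schema.field_count],
        }
    }

    /// Persist the row for this node and key, replacing any previous value.
    fn save_row(&mut self, node: u64, key: &str, row: ToyRow) {
        self.rows.insert((node, key.to_string()), row);
    }
}
```

An aggregate operator would typically load the row for a group key, update a field, and save it back; because state is keyed by node id, two operators using the same key do not collide.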

Source§

impl FlowTransaction

Source

pub fn set(&mut self, key: &EncodedKey, value: EncodedValues) -> Result<()>

Set a value, buffering it in pending writes

Source

pub fn remove(&mut self, key: &EncodedKey) -> Result<()>

Remove a key, buffering the deletion in pending operations

Source§

impl FlowTransaction

Source

pub fn deferred( parent: &AdminTransaction, version: CommitVersion, catalog: Catalog, interceptors: Interceptors, ) -> Self

Create a deferred (CDC) FlowTransaction from a parent transaction.

Used by the async worker path. Reads only from committed storage + flow-generated pending writes — no base pending from a parent transaction.

Source

pub fn deferred_from_parts( version: CommitVersion, pending: Pending, primitive_query: MultiReadTransaction, state_query: MultiReadTransaction, catalog: Catalog, interceptors: Interceptors, ) -> Self

Create a deferred (CDC) FlowTransaction from pre-built parts.

Used by the worker actor which creates its own query transactions.

Source

pub fn transactional( version: CommitVersion, pending: Pending, base_pending: Pending, primitive_query: MultiReadTransaction, state_query: MultiReadTransaction, catalog: Catalog, interceptors: Interceptors, ) -> Self

Create a transactional (inline) FlowTransaction.

Used by the pre-commit interceptor path. base_pending is a read-only snapshot of the committing transaction’s KV writes so that flow operators can see uncommitted row data.

Source

pub fn version(&self) -> CommitVersion

Get the transaction version.

Source

pub fn take_pending(&mut self) -> Pending

Extract pending writes, replacing them with an empty buffer.

Source

pub fn take_view_changes(&mut self) -> ViewChanges

Drain all generated view changes, returning them.

Source

pub fn push_view_change(&mut self, change: Change)

Append a view change (used by SinkViewOperator).

Source

pub fn update_version(&mut self, new_version: CommitVersion)

Update the transaction to read at a new version

Trait Implementations§

Source§

impl WithInterceptors for FlowTransaction

Source§

fn table_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn TablePreInsertInterceptor + Send + Sync>

Access table pre-insert interceptor chain
Source§

fn table_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn TablePostInsertInterceptor + Send + Sync>

Access table post-insert interceptor chain
Source§

fn table_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn TablePreUpdateInterceptor + Send + Sync>

Access table pre-update interceptor chain
Source§

fn table_post_update_interceptors( &mut self, ) -> &mut Chain<dyn TablePostUpdateInterceptor + Send + Sync>

Access table post-update interceptor chain
Source§

fn table_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn TablePreDeleteInterceptor + Send + Sync>

Access table pre-delete interceptor chain
Source§

fn table_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn TablePostDeleteInterceptor + Send + Sync>

Access table post-delete interceptor chain
Source§

fn ringbuffer_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPreInsertInterceptor + Send + Sync>

Access ring buffer pre-insert interceptor chain
Source§

fn ringbuffer_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPostInsertInterceptor + Send + Sync>

Access ring buffer post-insert interceptor chain
Source§

fn ringbuffer_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPreUpdateInterceptor + Send + Sync>

Access ring buffer pre-update interceptor chain
Source§

fn ringbuffer_post_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPostUpdateInterceptor + Send + Sync>

Access ring buffer post-update interceptor chain
Source§

fn ringbuffer_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPreDeleteInterceptor + Send + Sync>

Access ring buffer pre-delete interceptor chain
Source§

fn ringbuffer_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPostDeleteInterceptor + Send + Sync>

Access ring buffer post-delete interceptor chain
Source§

fn pre_commit_interceptors( &mut self, ) -> &mut Chain<dyn PreCommitInterceptor + Send + Sync>

Access pre-commit interceptor chain
Source§

fn post_commit_interceptors( &mut self, ) -> &mut Chain<dyn PostCommitInterceptor + Send + Sync>

Access post-commit interceptor chain
Source§

fn namespace_def_post_create_interceptors( &mut self, ) -> &mut Chain<dyn NamespaceDefPostCreateInterceptor + Send + Sync>

Access namespace post-create interceptor chain
Source§

fn namespace_def_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn NamespaceDefPreUpdateInterceptor + Send + Sync>

Access namespace pre-update interceptor chain
Source§

fn namespace_def_post_update_interceptors( &mut self, ) -> &mut Chain<dyn NamespaceDefPostUpdateInterceptor + Send + Sync>

Access namespace post-update interceptor chain
Source§

fn namespace_def_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn NamespaceDefPreDeleteInterceptor + Send + Sync>

Access namespace pre-delete interceptor chain
Source§

fn table_def_post_create_interceptors( &mut self, ) -> &mut Chain<dyn TableDefPostCreateInterceptor + Send + Sync>

Access table definition post-create interceptor chain
Source§

fn table_def_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn TableDefPreUpdateInterceptor + Send + Sync>

Access table definition pre-update interceptor chain
Source§

fn table_def_post_update_interceptors( &mut self, ) -> &mut Chain<dyn TableDefPostUpdateInterceptor + Send + Sync>

Access table definition post-update interceptor chain
Source§

fn table_def_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn TableDefPreDeleteInterceptor + Send + Sync>

Access table definition pre-delete interceptor chain
Source§

fn view_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn ViewPreInsertInterceptor + Send + Sync>

Access view pre-insert interceptor chain
Source§

fn view_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn ViewPostInsertInterceptor + Send + Sync>

Access view post-insert interceptor chain
Source§

fn view_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewPreUpdateInterceptor + Send + Sync>

Access view pre-update interceptor chain
Source§

fn view_post_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewPostUpdateInterceptor + Send + Sync>

Access view post-update interceptor chain
Source§

fn view_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn ViewPreDeleteInterceptor + Send + Sync>

Access view pre-delete interceptor chain
Source§

fn view_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn ViewPostDeleteInterceptor + Send + Sync>

Access view post-delete interceptor chain
Source§

fn view_def_post_create_interceptors( &mut self, ) -> &mut Chain<dyn ViewDefPostCreateInterceptor + Send + Sync>

Access view definition post-create interceptor chain
Source§

fn view_def_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewDefPreUpdateInterceptor + Send + Sync>

Access view definition pre-update interceptor chain
Source§

fn view_def_post_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewDefPostUpdateInterceptor + Send + Sync>

Access view definition post-update interceptor chain
Source§

fn view_def_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn ViewDefPreDeleteInterceptor + Send + Sync>

Access view definition pre-delete interceptor chain
Source§

fn ringbuffer_def_post_create_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferDefPostCreateInterceptor + Send + Sync>

Access ring buffer definition post-create interceptor chain
Source§

fn ringbuffer_def_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferDefPreUpdateInterceptor + Send + Sync>

Access ring buffer definition pre-update interceptor chain
Source§

fn ringbuffer_def_post_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferDefPostUpdateInterceptor + Send + Sync>

Access ring buffer definition post-update interceptor chain
Source§

fn ringbuffer_def_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferDefPreDeleteInterceptor + Send + Sync>

Access ring buffer definition pre-delete interceptor chain

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more