Skip to main content

FlowTransaction

Enum FlowTransaction 

Source
pub enum FlowTransaction {
    Deferred {
        inner: FlowTransactionInner,
    },
    Transactional {
        inner: FlowTransactionInner,
        base_pending: Pending,
    },
}
Expand description

A transaction wrapper for flow processing with dual-version read semantics.

§Architecture

FlowTransaction provides dual-version reads critical for stateful flow processing:

  1. Source data - Read at CDC event version (snapshot isolation)
  2. Flow state - Read at latest version (state continuity across CDC events)
  3. Isolated writes - Local PendingWrites buffer returned to caller

This dual-version approach allows stateful operators (joins, aggregates, distinct) to:

  • Process source data at a consistent snapshot (the CDC event version)
  • Access their own state at the latest version to maintain continuity

§Dual-Version Read Routing

Reads are automatically routed to the correct query transaction based on key type:

┌─────────────────┐
│  FlowTransaction│
└────────┬────────┘
         │
         ├──► pending (flow-generated writes)
         │
         ├──► variant
         │    ├─ Deferred: skip
         │    └─ Transactional { base_pending }: check base_pending
         │
         ├──► primitive_query (at CDC version)
         │    - Source tables / views / regular data
         │
         └──► state_query (at latest version)
              - FlowNodeState / FlowNodeInternalState

§Construction

Use named constructors to enforce correct initialization:

§Write Path

All writes (set, remove) go to the local pending buffer:

§Thread Safety

FlowTransaction is Send because all fields are either Copy, owned, or

Variants§

§

Deferred

CDC-driven async flow processing. Reads only from committed storage + flow pending writes.

§

Transactional

Inline flow processing within a committing transaction. Can additionally read uncommitted writes from the parent transaction.

Fields

§base_pending: Pending

Read-only snapshot of the committing transaction’s KV writes.

Implementations§

Source§

impl FlowTransaction

Source

pub fn get(&mut self, key: &EncodedKey) -> Result<Option<EncodedRow>>

Get a value by key, checking pending writes first, then (if transactional) base_pending, then querying multi-version store

Source

pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool>

Check if a key exists

Source

pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch>

Prefix scan

Source

pub fn range( &mut self, range: EncodedKeyRange, batch_size: usize, ) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_>

Create an iterator for forward range queries.

This properly handles high version density by scanning until batch_size unique logical keys are collected. The iterator yields individual entries and maintains cursor state internally. Pending writes are merged with committed storage data.

Source

pub fn range_rev( &mut self, range: EncodedKeyRange, batch_size: usize, ) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_>

Create an iterator for reverse range queries.

This properly handles high version density by scanning until batch_size unique logical keys are collected. The iterator yields individual entries in reverse key order and maintains cursor state internally.

Source§

impl FlowTransaction

Source

pub fn state_get( &mut self, id: FlowNodeId, key: &EncodedKey, ) -> Result<Option<EncodedRow>>

Get state for a specific flow node and key

Source

pub fn state_set( &mut self, id: FlowNodeId, key: &EncodedKey, value: EncodedRow, ) -> Result<()>

Set state for a specific flow node and key

Source

pub fn state_remove(&mut self, id: FlowNodeId, key: &EncodedKey) -> Result<()>

Remove state for a specific flow node and key

Source

pub fn state_scan(&mut self, id: FlowNodeId) -> Result<MultiVersionBatch>

Scan all state for a specific flow node

Source

pub fn state_range( &mut self, id: FlowNodeId, range: EncodedKeyRange, ) -> Result<MultiVersionBatch>

Range query on state for a specific flow node

Source

pub fn state_clear(&mut self, id: FlowNodeId) -> Result<()>

Clear all state for a specific flow node

Source

pub fn load_or_create_row( &mut self, id: FlowNodeId, key: &EncodedKey, schema: &RowSchema, ) -> Result<EncodedRow>

Load state for a key, creating if not exists

Source

pub fn save_row( &mut self, id: FlowNodeId, key: &EncodedKey, row: EncodedRow, ) -> Result<()>

Save state encoded

Source§

impl FlowTransaction

Source

pub fn set(&mut self, key: &EncodedKey, value: EncodedRow) -> Result<()>

Set a value, buffering it in pending writes

Source

pub fn remove(&mut self, key: &EncodedKey) -> Result<()>

Remove a key, buffering the deletion in pending operations

Source§

impl FlowTransaction

Source

pub fn deferred( parent: &AdminTransaction, version: CommitVersion, catalog: Catalog, interceptors: Interceptors, ) -> Self

Create a deferred (CDC) FlowTransaction from a parent transaction.

Used by the async worker path. Reads only from committed storage + flow-generated pending writes — no base pending from a parent transaction.

Source

pub fn deferred_from_parts( version: CommitVersion, pending: Pending, primitive_query: MultiReadTransaction, state_query: MultiReadTransaction, catalog: Catalog, interceptors: Interceptors, ) -> Self

Create a deferred (CDC) FlowTransaction from pre-built parts.

Used by the worker actor which creates its own query transactions.

Source

pub fn transactional( version: CommitVersion, pending: Pending, base_pending: Pending, primitive_query: MultiReadTransaction, state_query: MultiReadTransaction, catalog: Catalog, interceptors: Interceptors, ) -> Self

Create a transactional (inline) FlowTransaction.

Used by the pre-commit interceptor path. base_pending is a read-only snapshot of the committing transaction’s KV writes so that flow operators can see uncommitted row data.

Source

pub fn version(&self) -> CommitVersion

Get the transaction version.

Source

pub fn take_pending(&mut self) -> Pending

Extract pending writes, replacing them with an empty buffer.

Source

pub fn track_flow_change(&mut self, change: Change)

Track a view-level flow change in this transaction’s accumulator.

Source

pub fn take_accumulator_entries(&mut self) -> Vec<(SchemaId, Diff)>

Drain the accumulator entries collected during flow processing.

Source

pub fn update_version(&mut self, new_version: CommitVersion)

Update the transaction to read at a new version

Source

pub fn catalog(&self) -> &Catalog

Get access to the catalog for reading metadata

Trait Implementations§

Source§

impl WithInterceptors for FlowTransaction

Source§

fn table_row_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn TableRowPreInsertInterceptor + Send + Sync>

Source§

fn table_row_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn TableRowPostInsertInterceptor + Send + Sync>

Source§

fn table_row_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn TableRowPreUpdateInterceptor + Send + Sync>

Source§

fn table_row_post_update_interceptors( &mut self, ) -> &mut Chain<dyn TableRowPostUpdateInterceptor + Send + Sync>

Source§

fn table_row_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn TableRowPreDeleteInterceptor + Send + Sync>

Source§

fn table_row_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn TableRowPostDeleteInterceptor + Send + Sync>

Source§

fn ringbuffer_row_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferRowPreInsertInterceptor + Send + Sync>

Source§

fn ringbuffer_row_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferRowPostInsertInterceptor + Send + Sync>

Source§

fn ringbuffer_row_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferRowPreUpdateInterceptor + Send + Sync>

Source§

fn ringbuffer_row_post_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferRowPostUpdateInterceptor + Send + Sync>

Source§

fn ringbuffer_row_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferRowPreDeleteInterceptor + Send + Sync>

Source§

fn ringbuffer_row_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferRowPostDeleteInterceptor + Send + Sync>

Source§

fn pre_commit_interceptors( &mut self, ) -> &mut Chain<dyn PreCommitInterceptor + Send + Sync>

Source§

fn post_commit_interceptors( &mut self, ) -> &mut Chain<dyn PostCommitInterceptor + Send + Sync>

Source§

fn namespace_post_create_interceptors( &mut self, ) -> &mut Chain<dyn NamespacePostCreateInterceptor + Send + Sync>

Source§

fn namespace_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn NamespacePreUpdateInterceptor + Send + Sync>

Source§

fn namespace_post_update_interceptors( &mut self, ) -> &mut Chain<dyn NamespacePostUpdateInterceptor + Send + Sync>

Source§

fn namespace_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn NamespacePreDeleteInterceptor + Send + Sync>

Source§

fn table_post_create_interceptors( &mut self, ) -> &mut Chain<dyn TablePostCreateInterceptor + Send + Sync>

Source§

fn table_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn TablePreUpdateInterceptor + Send + Sync>

Source§

fn table_post_update_interceptors( &mut self, ) -> &mut Chain<dyn TablePostUpdateInterceptor + Send + Sync>

Source§

fn table_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn TablePreDeleteInterceptor + Send + Sync>

Source§

fn view_row_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn ViewRowPreInsertInterceptor + Send + Sync>

Source§

fn view_row_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn ViewRowPostInsertInterceptor + Send + Sync>

Source§

fn view_row_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewRowPreUpdateInterceptor + Send + Sync>

Source§

fn view_row_post_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewRowPostUpdateInterceptor + Send + Sync>

Source§

fn view_row_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn ViewRowPreDeleteInterceptor + Send + Sync>

Source§

fn view_row_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn ViewRowPostDeleteInterceptor + Send + Sync>

Source§

fn view_post_create_interceptors( &mut self, ) -> &mut Chain<dyn ViewPostCreateInterceptor + Send + Sync>

Source§

fn view_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewPreUpdateInterceptor + Send + Sync>

Source§

fn view_post_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewPostUpdateInterceptor + Send + Sync>

Source§

fn view_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn ViewPreDeleteInterceptor + Send + Sync>

Source§

fn ringbuffer_post_create_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPostCreateInterceptor + Send + Sync>

Source§

fn ringbuffer_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPreUpdateInterceptor + Send + Sync>

Source§

fn ringbuffer_post_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPostUpdateInterceptor + Send + Sync>

Source§

fn ringbuffer_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPreDeleteInterceptor + Send + Sync>

Source§

fn dictionary_row_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryRowPreInsertInterceptor + Send + Sync>

Source§

fn dictionary_row_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryRowPostInsertInterceptor + Send + Sync>

Source§

fn dictionary_row_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryRowPreUpdateInterceptor + Send + Sync>

Source§

fn dictionary_row_post_update_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryRowPostUpdateInterceptor + Send + Sync>

Source§

fn dictionary_row_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryRowPreDeleteInterceptor + Send + Sync>

Source§

fn dictionary_row_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryRowPostDeleteInterceptor + Send + Sync>

Source§

fn dictionary_post_create_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryPostCreateInterceptor + Send + Sync>

Source§

fn dictionary_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryPreUpdateInterceptor + Send + Sync>

Source§

fn dictionary_post_update_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryPostUpdateInterceptor + Send + Sync>

Source§

fn dictionary_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryPreDeleteInterceptor + Send + Sync>

Source§

fn series_row_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn SeriesRowPreInsertInterceptor + Send + Sync>

Source§

fn series_row_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn SeriesRowPostInsertInterceptor + Send + Sync>

Source§

fn series_row_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn SeriesRowPreUpdateInterceptor + Send + Sync>

Source§

fn series_row_post_update_interceptors( &mut self, ) -> &mut Chain<dyn SeriesRowPostUpdateInterceptor + Send + Sync>

Source§

fn series_row_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn SeriesRowPreDeleteInterceptor + Send + Sync>

Source§

fn series_row_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn SeriesRowPostDeleteInterceptor + Send + Sync>

Source§

fn series_post_create_interceptors( &mut self, ) -> &mut Chain<dyn SeriesPostCreateInterceptor + Send + Sync>

Source§

fn series_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn SeriesPreUpdateInterceptor + Send + Sync>

Source§

fn series_post_update_interceptors( &mut self, ) -> &mut Chain<dyn SeriesPostUpdateInterceptor + Send + Sync>

Source§

fn series_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn SeriesPreDeleteInterceptor + Send + Sync>

Source§

fn identity_post_create_interceptors( &mut self, ) -> &mut Chain<dyn IdentityPostCreateInterceptor + Send + Sync>

Source§

fn identity_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn IdentityPreUpdateInterceptor + Send + Sync>

Source§

fn identity_post_update_interceptors( &mut self, ) -> &mut Chain<dyn IdentityPostUpdateInterceptor + Send + Sync>

Source§

fn identity_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn IdentityPreDeleteInterceptor + Send + Sync>

Source§

fn role_post_create_interceptors( &mut self, ) -> &mut Chain<dyn RolePostCreateInterceptor + Send + Sync>

Source§

fn role_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn RolePreUpdateInterceptor + Send + Sync>

Source§

fn role_post_update_interceptors( &mut self, ) -> &mut Chain<dyn RolePostUpdateInterceptor + Send + Sync>

Source§

fn role_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn RolePreDeleteInterceptor + Send + Sync>

Source§

fn granted_role_post_create_interceptors( &mut self, ) -> &mut Chain<dyn GrantedRolePostCreateInterceptor + Send + Sync>

Source§

fn granted_role_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn GrantedRolePreDeleteInterceptor + Send + Sync>

Source§

fn authentication_post_create_interceptors( &mut self, ) -> &mut Chain<dyn AuthenticationPostCreateInterceptor + Send + Sync>

Source§

fn authentication_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn AuthenticationPreDeleteInterceptor + Send + Sync>

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> GetSetFdFlags for T

Source§

fn get_fd_flags(&self) -> Result<FdFlags, Error>
where T: AsFilelike,

Query the “status” flags for the self file descriptor.
Source§

fn new_set_fd_flags(&self, fd_flags: FdFlags) -> Result<SetFdFlags<T>, Error>
where T: AsFilelike,

Create a new SetFdFlags value for use with set_fd_flags. Read more
Source§

fn set_fd_flags(&mut self, set_fd_flags: SetFdFlags<T>) -> Result<(), Error>
where T: AsFilelike,

Set the “status” flags for the self file descriptor. Read more
Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
Source§

impl<L> LayerExt<L> for L

Source§

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> Pointee for T

Source§

type Pointer = u32

Source§

fn debug( pointer: <T as Pointee>::Pointer, f: &mut Formatter<'_>, ) -> Result<(), Error>

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more