Struct FlowTransaction 

Source
pub struct FlowTransaction { /* private fields */ }

A transaction wrapper for flow processing with dual-version read semantics.

§Architecture

FlowTransaction combines dual-version reads with an isolated write buffer, both critical for stateful flow processing:

  1. Source data - Read at CDC event version (snapshot isolation)
  2. Flow state - Read at latest version (state continuity across CDC events)
  3. Isolated writes - Local PendingWrites buffer returned to caller

This dual-version approach allows stateful operators (joins, aggregates, distinct) to:

  • Process source data at a consistent snapshot (the CDC event version)
  • Access their own state at the latest version to maintain continuity

§Dual-Version Read Routing

Reads are automatically routed to the correct query transaction based on key type:

┌─────────────────┐
│  FlowTransaction│
└────────┬────────┘
         │
         ├──► primitive_query (at CDC version)
         │    - Source tables
         │    - Source views
         │    - Regular data
         │
         └──► state_query (at latest version)
              - FlowNodeState
              - FlowNodeInternalState
              - Stateful operator state
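
The routing in the diagram can be sketched with standard-library types only. This is a minimal illustration, not the crate's implementation: KeyKind, Store, read_at and Router are hypothetical names, and the real transaction presumably derives the key kind from the encoded key rather than taking it as an argument.

```rust
use std::collections::BTreeMap;

/// Hypothetical key classification (assumption: the real crate infers this
/// from the encoded key itself).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum KeyKind {
    /// Source tables, source views, regular data.
    Source,
    /// FlowNodeState, FlowNodeInternalState, stateful operator state.
    FlowNodeState,
}

/// Hypothetical stand-in for a multi-version store: key -> (version -> value).
type Store = BTreeMap<&'static str, BTreeMap<u64, &'static str>>;

/// Returns the newest value for `key` whose version is <= `version`.
fn read_at(store: &Store, key: &str, version: u64) -> Option<&'static str> {
    store.get(key)?.range(..=version).next_back().map(|(_, v)| *v)
}

/// Hypothetical router holding the two read versions.
struct Router<'a> {
    store: &'a Store,
    cdc_version: u64,
    latest_version: u64,
}

impl Router<'_> {
    /// Source keys are read at the CDC version, state keys at the latest version.
    fn get(&self, kind: KeyKind, key: &str) -> Option<&'static str> {
        let version = match kind {
            KeyKind::Source => self.cdc_version,
            KeyKind::FlowNodeState => self.latest_version,
        };
        read_at(self.store, key, version)
    }
}
```

With a source row written at versions 100 and 150 and a router at CDC version 100, a Source read returns the version-100 value while a FlowNodeState read returns whatever was written most recently.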

§Why Dual Versions Matter

§Example: Join Operator

// CDC event arrives at version 100
let mut flow_txn = FlowTransaction::new(&parent, CommitVersion(100), catalog, interceptors);

// Join operator processes the event:
// 1. Reads source table at version 100 (snapshot)
let source_row = flow_txn.get(&source_key)?;

// 2. Reads join state at LATEST version (e.g., 150)
//    This state contains results from ALL previous CDC events
let join_state = flow_txn.state_get(node_id, &state_key)?;

// Without dual versions, join state would be stale at version 100

§Example: Distinct Operator

// Maintains a set of seen values across ALL CDC events
let seen = flow_txn.state_get(node_id, &value_key)?;

// If state were read at the CDC version, it would "forget" values seen in later events
// Dual-version reads ensure we see ALL distinct values accumulated so far

§Current Usage Pattern

FlowTransaction is used in worker threads to process CDC batches:

// In flow worker thread
let primitive_query = engine.multi().begin_query_at_version(batch.version)?;
let state_query = engine.multi().begin_query_at_version(state_version)?;

let mut txn = FlowTransaction {
    version: batch.version,
    pending: PendingWrites::new(),
    primitive_query,
    state_query,
    catalog: catalog.clone(),
};

for change in batch.changes {
    flow_engine.process(&mut txn, change, flow_id)?;
}

// Extract pending writes to merge into parent transaction
let pending = txn.pending;

§Write Path

All writes (set, remove) go to the local pending buffer:

  • Reads check pending buffer first, then delegate to query transactions
  • Pending writes are extracted and applied to parent transaction by caller
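
The write path described above can be sketched as an overlay on top of committed data. This is a minimal illustration under stated assumptions: BufferedTxn, Pending and into_pending are hypothetical names, and a plain HashMap stands in for the query transactions.

```rust
use std::collections::HashMap;

/// Hypothetical pending entry: a buffered write or a buffered deletion.
#[derive(Debug, Clone, PartialEq)]
enum Pending {
    Set(Vec<u8>),
    Remove,
}

/// Hypothetical transaction that never touches committed data directly.
struct BufferedTxn<'a> {
    committed: &'a HashMap<Vec<u8>, Vec<u8>>,
    pending: HashMap<Vec<u8>, Pending>,
}

impl<'a> BufferedTxn<'a> {
    fn new(committed: &'a HashMap<Vec<u8>, Vec<u8>>) -> Self {
        Self { committed, pending: HashMap::new() }
    }

    /// Buffers a write locally.
    fn set(&mut self, key: &[u8], value: Vec<u8>) {
        self.pending.insert(key.to_vec(), Pending::Set(value));
    }

    /// Buffers a deletion locally as a tombstone.
    fn remove(&mut self, key: &[u8]) {
        self.pending.insert(key.to_vec(), Pending::Remove);
    }

    /// Checks the pending buffer first, then falls back to committed data.
    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        match self.pending.get(key) {
            Some(Pending::Set(value)) => Some(value.clone()),
            Some(Pending::Remove) => None,
            None => self.committed.get(key).cloned(),
        }
    }

    /// Hands the buffer to the caller, who applies it to the parent transaction.
    fn into_pending(self) -> HashMap<Vec<u8>, Pending> {
        self.pending
    }
}
```

Recording removals as tombstones rather than dropping the entry is what lets a buffered delete hide a committed value from later reads in the same transaction.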

§Thread Safety

FlowTransaction is Send because all fields are either Copy, owned, or

Implementations§

Source§

impl FlowTransaction

Source

pub fn get(&mut self, key: &EncodedKey) -> Result<Option<EncodedValues>>

Get a value by key, checking pending writes first, then querying the multi-version store

Source

pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool>

Check if a key exists

Source

pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch>

Prefix scan

Source

pub fn range( &mut self, range: EncodedKeyRange, batch_size: usize, ) -> Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>

Create an iterator for forward range queries.

To handle high version density (many stored versions per key), the scan continues until batch_size unique logical keys are collected. The iterator yields individual entries and maintains cursor state internally. Pending writes are merged with committed storage data.
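
The idea of counting unique logical keys rather than stored versions can be sketched as follows. This is an illustration only: VersionedEntry, entry and next_batch are hypothetical, the input is assumed to be sorted by key ascending and by version descending within a key, and the merge with pending writes is left out.

```rust
/// One stored version of a logical key (hypothetical layout).
#[derive(Debug, Clone, PartialEq)]
struct VersionedEntry {
    key: String,
    version: u64,
    value: String,
}

/// Small constructor to keep the usage readable.
fn entry(key: &str, version: u64, value: &str) -> VersionedEntry {
    VersionedEntry { key: key.to_string(), version, value: value.to_string() }
}

/// Collects up to `batch_size` unique logical keys starting at `cursor`.
/// For each key, the newest version that is <= `read_version` is emitted;
/// newer and older versions of the same key are skipped without counting
/// towards the batch. Returns the batch and the cursor for the next call.
fn next_batch(
    entries: &[VersionedEntry],
    cursor: usize,
    read_version: u64,
    batch_size: usize,
) -> (Vec<VersionedEntry>, usize) {
    let mut batch: Vec<VersionedEntry> = Vec::new();
    let mut pos = cursor;
    while pos < entries.len() {
        let candidate = &entries[pos];
        let already_emitted = batch.last().map_or(false, |last| last.key == candidate.key);
        if !already_emitted && candidate.version <= read_version {
            if batch.len() == batch_size {
                // Batch is full; stop at the first entry of the next key.
                break;
            }
            batch.push(candidate.clone());
        }
        pos += 1;
    }
    (batch, pos)
}
```

Because skipped versions do not count towards batch_size, a key with hundreds of stored versions still contributes exactly one entry to the batch.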

Source

pub fn range_rev( &mut self, range: EncodedKeyRange, batch_size: usize, ) -> Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>

Create an iterator for reverse range queries.

To handle high version density (many stored versions per key), the scan continues until batch_size unique logical keys are collected. The iterator yields individual entries in reverse key order and maintains cursor state internally.

Source§

impl FlowTransaction

Source

pub fn state_get( &mut self, id: FlowNodeId, key: &EncodedKey, ) -> Result<Option<EncodedValues>>

Get state for a specific flow node and key

Source

pub fn state_set( &mut self, id: FlowNodeId, key: &EncodedKey, value: EncodedValues, ) -> Result<()>

Set state for a specific flow node and key

Source

pub fn state_remove(&mut self, id: FlowNodeId, key: &EncodedKey) -> Result<()>

Remove state for a specific flow node and key

Source

pub fn state_scan(&mut self, id: FlowNodeId) -> Result<MultiVersionBatch>

Scan all state for a specific flow node

Source

pub fn state_range( &mut self, id: FlowNodeId, range: EncodedKeyRange, ) -> Result<MultiVersionBatch>

Range query on state for a specific flow node

Source

pub fn state_clear(&mut self, id: FlowNodeId) -> Result<()>

Clear all state for a specific flow node
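
The per-node state operations above (get, set, remove, scan, clear) can be pictured as a map keyed by node and key. The sketch below is an assumption about the shape of the API, not the crate's storage layout: NodeId and NodeState are hypothetical, and versioning and the pending buffer are ignored.

```rust
use std::collections::BTreeMap;

/// Hypothetical node identifier; stands in for FlowNodeId.
type NodeId = u64;

/// Hypothetical state store keyed by (node, key), so each node only ever
/// sees its own entries.
#[derive(Default)]
struct NodeState {
    entries: BTreeMap<(NodeId, Vec<u8>), Vec<u8>>,
}

impl NodeState {
    fn set(&mut self, id: NodeId, key: &[u8], value: Vec<u8>) {
        self.entries.insert((id, key.to_vec()), value);
    }

    fn get(&self, id: NodeId, key: &[u8]) -> Option<&Vec<u8>> {
        self.entries.get(&(id, key.to_vec()))
    }

    fn remove(&mut self, id: NodeId, key: &[u8]) {
        self.entries.remove(&(id, key.to_vec()));
    }

    /// All entries belonging to `id`, in key order.
    fn scan(&self, id: NodeId) -> Vec<(Vec<u8>, Vec<u8>)> {
        self.entries
            .iter()
            .filter(|((node, _), _)| *node == id)
            .map(|((_, key), value)| (key.clone(), value.clone()))
            .collect()
    }

    /// Drops every entry belonging to `id` and leaves other nodes untouched.
    fn clear(&mut self, id: NodeId) {
        self.entries.retain(|(node, _), _| *node != id);
    }
}
```

Scoping every operation by node id is what allows two operators to use the same key bytes without colliding.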

Source

pub fn load_or_create_row( &mut self, id: FlowNodeId, key: &EncodedKey, schema: &Schema, ) -> Result<EncodedValues>

Load the state row for a key, creating it if it does not exist

Source

pub fn save_row( &mut self, id: FlowNodeId, key: &EncodedKey, row: EncodedValues, ) -> Result<()>

Save an encoded state row
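
load_or_create_row and save_row suggest a read-modify-write cycle for operator state. The sketch below shows that cycle with hypothetical types: RowState, NodeId and count_event are not part of the crate, a byte length stands in for the Schema argument, and it is an assumption that a freshly created row is only persisted once save_row is called.

```rust
use std::collections::HashMap;

/// Hypothetical node identifier; stands in for FlowNodeId.
type NodeId = u64;

/// Hypothetical row store keyed by (node, key).
#[derive(Default)]
struct RowState {
    rows: HashMap<(NodeId, Vec<u8>), Vec<u8>>,
}

impl RowState {
    /// Returns the stored row, or a zero-filled row of `row_len` bytes.
    /// Assumption: the new row is not persisted until `save_row` is called.
    fn load_or_create_row(&self, id: NodeId, key: &[u8], row_len: usize) -> Vec<u8> {
        self.rows
            .get(&(id, key.to_vec()))
            .cloned()
            .unwrap_or_else(|| vec![0u8; row_len])
    }

    /// Stores the row, replacing any previous value for (id, key).
    fn save_row(&mut self, id: NodeId, key: &[u8], row: Vec<u8>) {
        self.rows.insert((id, key.to_vec()), row);
    }
}

/// Read-modify-write: bumps a one-byte counter kept in the first byte of the row.
fn count_event(state: &mut RowState, id: NodeId, key: &[u8]) -> u8 {
    let mut row = state.load_or_create_row(id, key, 1);
    row[0] = row[0].wrapping_add(1);
    let count = row[0];
    state.save_row(id, key, row);
    count
}
```

An aggregate operator would follow the same pattern once per incoming change, with the row holding its running totals.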

Source§

impl FlowTransaction

Source

pub fn set(&mut self, key: &EncodedKey, value: EncodedValues) -> Result<()>

Set a value, buffering it in pending writes

Source

pub fn remove(&mut self, key: &EncodedKey) -> Result<()>

Remove a key, buffering the deletion in pending writes

Source§

impl FlowTransaction

Source

pub fn new( parent: &AdminTransaction, version: CommitVersion, catalog: Catalog, interceptors: Interceptors, ) -> Self

Create a new FlowTransaction from a parent transaction at a specific CDC version.

Creates dual query transactions:

  • primitive_query: Reads at the specified CDC version (snapshot isolation)
  • state_query: Reads at the latest version (state continuity)

§Parameters

  • parent - The parent transaction (an AdminTransaction) to derive from
  • version - The CDC event version for snapshot isolation (NOT parent.version())
  • catalog - The catalog for metadata access
  • interceptors - The interceptors to attach to the transaction

Source

pub fn update_version(&mut self, new_version: CommitVersion)

Update the transaction to read at a new version

Trait Implementations§

Source§

impl WithInterceptors for FlowTransaction

Source§

fn table_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn TablePreInsertInterceptor + Send + Sync>

Access table pre-insert interceptor chain
Source§

fn table_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn TablePostInsertInterceptor + Send + Sync>

Access table post-insert interceptor chain
Source§

fn table_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn TablePreUpdateInterceptor + Send + Sync>

Access table pre-update interceptor chain
Source§

fn table_post_update_interceptors( &mut self, ) -> &mut Chain<dyn TablePostUpdateInterceptor + Send + Sync>

Access table post-update interceptor chain
Source§

fn table_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn TablePreDeleteInterceptor + Send + Sync>

Access table pre-delete interceptor chain
Source§

fn table_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn TablePostDeleteInterceptor + Send + Sync>

Access table post-delete interceptor chain
Source§

fn ringbuffer_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPreInsertInterceptor + Send + Sync>

Access ring buffer pre-insert interceptor chain
Source§

fn ringbuffer_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPostInsertInterceptor + Send + Sync>

Access ring buffer post-insert interceptor chain
Source§

fn ringbuffer_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPreUpdateInterceptor + Send + Sync>

Access ring buffer pre-update interceptor chain
Source§

fn ringbuffer_post_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPostUpdateInterceptor + Send + Sync>

Access ring buffer post-update interceptor chain
Source§

fn ringbuffer_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPreDeleteInterceptor + Send + Sync>

Access ring buffer pre-delete interceptor chain
Source§

fn ringbuffer_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPostDeleteInterceptor + Send + Sync>

Access ring buffer post-delete interceptor chain
Source§

fn pre_commit_interceptors( &mut self, ) -> &mut Chain<dyn PreCommitInterceptor + Send + Sync>

Access pre-commit interceptor chain
Source§

fn post_commit_interceptors( &mut self, ) -> &mut Chain<dyn PostCommitInterceptor + Send + Sync>

Access post-commit interceptor chain
Source§

fn namespace_def_post_create_interceptors( &mut self, ) -> &mut Chain<dyn NamespaceDefPostCreateInterceptor + Send + Sync>

Access namespace definition post-create interceptor chain
Source§

fn namespace_def_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn NamespaceDefPreUpdateInterceptor + Send + Sync>

Access namespace definition pre-update interceptor chain
Source§

fn namespace_def_post_update_interceptors( &mut self, ) -> &mut Chain<dyn NamespaceDefPostUpdateInterceptor + Send + Sync>

Access namespace definition post-update interceptor chain
Source§

fn namespace_def_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn NamespaceDefPreDeleteInterceptor + Send + Sync>

Access namespace definition pre-delete interceptor chain
Source§

fn table_def_post_create_interceptors( &mut self, ) -> &mut Chain<dyn TableDefPostCreateInterceptor + Send + Sync>

Access table definition post-create interceptor chain
Source§

fn table_def_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn TableDefPreUpdateInterceptor + Send + Sync>

Access table definition pre-update interceptor chain
Source§

fn table_def_post_update_interceptors( &mut self, ) -> &mut Chain<dyn TableDefPostUpdateInterceptor + Send + Sync>

Access table definition post-update interceptor chain
Source§

fn table_def_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn TableDefPreDeleteInterceptor + Send + Sync>

Access table definition pre-delete interceptor chain
Source§

fn view_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn ViewPreInsertInterceptor + Send + Sync>

Access view pre-insert interceptor chain
Source§

fn view_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn ViewPostInsertInterceptor + Send + Sync>

Access view post-insert interceptor chain
Source§

fn view_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewPreUpdateInterceptor + Send + Sync>

Access view pre-update interceptor chain
Source§

fn view_post_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewPostUpdateInterceptor + Send + Sync>

Access view post-update interceptor chain
Source§

fn view_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn ViewPreDeleteInterceptor + Send + Sync>

Access view pre-delete interceptor chain
Source§

fn view_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn ViewPostDeleteInterceptor + Send + Sync>

Access view post-delete interceptor chain
Source§

fn view_def_post_create_interceptors( &mut self, ) -> &mut Chain<dyn ViewDefPostCreateInterceptor + Send + Sync>

Access view definition post-create interceptor chain
Source§

fn view_def_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewDefPreUpdateInterceptor + Send + Sync>

Access view definition pre-update interceptor chain
Source§

fn view_def_post_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewDefPostUpdateInterceptor + Send + Sync>

Access view definition post-update interceptor chain
Source§

fn view_def_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn ViewDefPreDeleteInterceptor + Send + Sync>

Access view definition pre-delete interceptor chain
Source§

fn ringbuffer_def_post_create_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferDefPostCreateInterceptor + Send + Sync>

Access ring buffer definition post-create interceptor chain
Source§

fn ringbuffer_def_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferDefPreUpdateInterceptor + Send + Sync>

Access ring buffer definition pre-update interceptor chain
Source§

fn ringbuffer_def_post_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferDefPostUpdateInterceptor + Send + Sync>

Access ring buffer definition post-update interceptor chain
Source§

fn ringbuffer_def_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferDefPreDeleteInterceptor + Send + Sync>

Access ring buffer definition pre-delete interceptor chain

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.