pub struct FlowTransaction { /* private fields */ }
A transaction wrapper for flow processing with dual-version read semantics.
§Architecture
FlowTransaction provides dual-version reads critical for stateful flow processing:
- Source data - Read at CDC event version (snapshot isolation)
- Flow state - Read at latest version (state continuity across CDC events)
- Isolated writes - Local PendingWrites buffer returned to caller
This dual-version approach allows stateful operators (joins, aggregates, distinct) to:
- Process source data at a consistent snapshot (the CDC event version)
- Access their own state at the latest version to maintain continuity
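As a rough illustration of the two read paths described above, the following self-contained sketch models a multi-version store with standard-library types only. `VersionedStore`, `get_at`, `get_latest`, `dual_read`, and the string keys are hypothetical stand-ins, not part of the FlowTransaction API; only the idea (source data at the CDC version, state at the latest version) is taken from the description.

```rust
use std::collections::BTreeMap;

/// Hypothetical multi-version store: each key keeps one value per commit version.
#[derive(Default)]
struct VersionedStore {
    data: BTreeMap<(String, u64), String>,
}

impl VersionedStore {
    fn commit(&mut self, key: &str, version: u64, value: &str) {
        self.data.insert((key.to_string(), version), value.to_string());
    }

    /// Newest value of `key` whose commit version is <= `version`.
    fn get_at(&self, key: &str, version: u64) -> Option<&String> {
        self.data
            .range((key.to_string(), 0)..=(key.to_string(), version))
            .next_back()
            .map(|(_, value)| value)
    }

    /// Newest value of `key` at any version.
    fn get_latest(&self, key: &str) -> Option<&String> {
        self.get_at(key, u64::MAX)
    }
}

/// Reads source data at the CDC version and operator state at the latest version.
fn dual_read(store: &VersionedStore, cdc_version: u64) -> (Option<String>, Option<String>) {
    let source = store.get_at("source/row1", cdc_version).cloned();
    let state = store.get_latest("state/join").cloned();
    (source, state)
}

fn demo_store() -> VersionedStore {
    let mut store = VersionedStore::default();
    store.commit("source/row1", 100, "a");
    store.commit("source/row1", 120, "b");
    store.commit("state/join", 90, "s0");
    store.commit("state/join", 150, "s1");
    store
}

fn main() {
    let store = demo_store();
    let (source, state) = dual_read(&store, 100);
    println!("source at v100: {:?}, state at latest: {:?}", source, state);
}
```

With a CDC version of 100, the source read sees the value committed at version 100 even though a newer one exists at 120, while the state read sees the value committed at 150.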
§Dual-Version Read Routing
Reads are automatically routed to the correct query transaction based on key type:
┌─────────────────┐
│ FlowTransaction │
└────────┬────────┘
         │
         ├──► primitive_query (at CDC version)
         │      - Source tables
         │      - Source views
         │      - Regular data
         │
         └──► state_query (at latest version)
                - FlowNodeState
                - FlowNodeInternalState
                - Stateful operator state
§Why Dual Versions Matter
§Example: Join Operator
// CDC event arrives at version 100
let mut flow_txn = FlowTransaction::new(&parent, CommitVersion(100), catalog, interceptors);
// Join operator processes the event:
// 1. Reads source table at version 100 (snapshot)
let source_row = flow_txn.get(&source_key)?;
// 2. Reads join state at LATEST version (e.g., 150)
// This state contains results from ALL previous CDC events
let join_state = flow_txn.state_get(node_id, &state_key)?;
// Without dual versions, join state would be stale at version 100
§Example: Distinct Operator
// Maintains a set of seen values across ALL CDC events
let seen = flow_txn.state_get(node_id, &value_key)?;
// If read at CDC version, would "forget" values seen in later events
// Dual-version ensures we see ALL distinct values accumulated so far
§Current Usage Pattern
FlowTransaction is used in worker threads to process CDC batches:
// In flow worker thread
let primitive_query = engine.multi().begin_query_at_version(batch.version)?;
let state_query = engine.multi().begin_query_at_version(state_version)?;
let mut txn = FlowTransaction {
version: batch.version,
pending: PendingWrites::new(),
primitive_query,
state_query,
catalog: catalog.clone(),
};
for change in batch.changes {
flow_engine.process(&mut txn, change, flow_id)?;
}
// Extract pending writes to merge into parent transaction
let pending = txn.pending;
§Write Path
All writes (set, remove) go to the local pending buffer:
- Reads check pending buffer first, then delegate to query transactions
- Pending writes are extracted and applied to parent transaction by caller
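The write path above can be sketched with a small overlay structure. This is a minimal model under stated assumptions: `Pending`, `OverlayTxn`, and `apply` are hypothetical names, and the real `PendingWrites` layout is not shown in this documentation. The sketch only demonstrates the two rules listed: reads consult the pending buffer before the committed data, and the caller applies the extracted buffer to the parent.

```rust
use std::collections::BTreeMap;

/// Hypothetical buffered operation (the real `PendingWrites` layout is not documented here).
#[derive(Clone, Debug, PartialEq)]
enum Pending {
    Set(String),
    Remove,
}

/// Hypothetical transaction: a read-only committed snapshot plus a local write buffer.
struct OverlayTxn<'a> {
    committed: &'a BTreeMap<String, String>,
    pending: BTreeMap<String, Pending>,
}

impl<'a> OverlayTxn<'a> {
    fn new(committed: &'a BTreeMap<String, String>) -> Self {
        Self { committed, pending: BTreeMap::new() }
    }

    fn set(&mut self, key: &str, value: &str) {
        self.pending.insert(key.to_string(), Pending::Set(value.to_string()));
    }

    fn remove(&mut self, key: &str) {
        self.pending.insert(key.to_string(), Pending::Remove);
    }

    /// Checks the pending buffer first, then falls back to the committed snapshot.
    fn get(&self, key: &str) -> Option<String> {
        match self.pending.get(key) {
            Some(Pending::Set(value)) => Some(value.clone()),
            Some(Pending::Remove) => None,
            None => self.committed.get(key).cloned(),
        }
    }
}

/// The caller applies the extracted buffer to its own (parent) store.
fn apply(parent: &mut BTreeMap<String, String>, pending: BTreeMap<String, Pending>) {
    for (key, op) in pending {
        match op {
            Pending::Set(value) => {
                parent.insert(key, value);
            }
            Pending::Remove => {
                parent.remove(&key);
            }
        }
    }
}

fn run_demo() -> (Option<String>, Option<String>, BTreeMap<String, String>) {
    let mut parent = BTreeMap::new();
    parent.insert("a".to_string(), "1".to_string());
    parent.insert("b".to_string(), "2".to_string());

    let snapshot = parent.clone();
    let mut txn = OverlayTxn::new(&snapshot);
    txn.set("a", "10");
    txn.remove("b");
    let a = txn.get("a"); // served from the pending buffer
    let b = txn.get("b"); // hidden by the buffered removal

    let pending = txn.pending; // extracted by the caller
    apply(&mut parent, pending);
    (a, b, parent)
}

fn main() {
    let (a, b, parent) = run_demo();
    println!("a = {:?}, b = {:?}, parent = {:?}", a, b, parent);
}
```

Nothing reaches the parent store until `apply` runs, which is the isolation property the pending buffer is meant to provide.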
§Thread Safety
FlowTransaction is Send because all fields are either Copy, owned, or
Implementations§
impl FlowTransaction
pub fn get(&mut self, key: &EncodedKey) -> Result<Option<EncodedValues>>
Get a value by key, checking pending writes first, then querying the multi-version store.
pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool>
Check whether a key exists.
pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch>
Scan all entries whose key starts with the given prefix.
pub fn range(
    &mut self,
    range: EncodedKeyRange,
    batch_size: usize,
) -> Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>
Create an iterator for forward range queries.
This handles high version density by scanning until batch_size unique logical keys are collected. The iterator yields individual entries and maintains cursor state internally. Pending writes are merged with committed storage data.
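To make "scanning until batch_size unique logical keys are collected" concrete, here is a simplified sketch of a batched scan over a store in which one logical key may have many physical versions. Everything in it is an assumption for illustration: the `Store` layout, `BatchedScan`, and its fields are hypothetical, and the merge with pending writes is left out to keep the example short.

```rust
use std::collections::{BTreeMap, VecDeque};
use std::ops::Bound;

/// Physical layout assumed for this sketch: (logical key, commit version) -> value.
type Store = BTreeMap<(String, u64), String>;

/// Hypothetical batched scan: one entry per logical key, newest version <= `version`.
struct BatchedScan<'a> {
    store: &'a Store,
    version: u64,
    batch_size: usize,
    cursor: Option<String>, // last logical key already buffered
    buffer: VecDeque<(String, String)>,
    exhausted: bool,
}

impl<'a> BatchedScan<'a> {
    fn new(store: &'a Store, version: u64, batch_size: usize) -> Self {
        Self { store, version, batch_size, cursor: None, buffer: VecDeque::new(), exhausted: false }
    }

    /// Scans physical entries until `batch_size` distinct logical keys are buffered.
    fn fill(&mut self) {
        let start = match &self.cursor {
            // Skip every version of the key we stopped at.
            Some(key) => Bound::Excluded((key.clone(), u64::MAX)),
            None => Bound::Unbounded,
        };
        let bounds: (Bound<(String, u64)>, Bound<(String, u64)>) = (start, Bound::Unbounded);
        let mut batch: Vec<(String, String)> = Vec::new();
        for ((key, version), value) in self.store.range(bounds) {
            if *version > self.version {
                continue; // not visible at this snapshot
            }
            let same_key = batch.last().map_or(false, |(last, _)| last == key);
            if same_key {
                // Versions arrive in ascending order, so the later one wins.
                if let Some(entry) = batch.last_mut() {
                    entry.1 = value.clone();
                }
            } else {
                if batch.len() == self.batch_size {
                    break; // batch is full and a new logical key starts here
                }
                batch.push((key.clone(), value.clone()));
            }
        }
        match batch.last() {
            Some((key, _)) => self.cursor = Some(key.clone()),
            None => self.exhausted = true,
        }
        self.buffer.extend(batch);
    }
}

impl<'a> Iterator for BatchedScan<'a> {
    type Item = (String, String);

    fn next(&mut self) -> Option<Self::Item> {
        if self.buffer.is_empty() && !self.exhausted {
            self.fill();
        }
        self.buffer.pop_front()
    }
}

fn demo_store() -> Store {
    let mut store = Store::new();
    // Key "a" has 50 versions; only the newest visible one should be yielded.
    for v in 1..=50u64 {
        store.insert(("a".to_string(), v), format!("a{}", v));
    }
    store.insert(("b".to_string(), 7), "b7".to_string());
    store.insert(("c".to_string(), 60), "c60".to_string()); // newer than the snapshot
    store.insert(("d".to_string(), 3), "d3".to_string());
    store
}

fn main() {
    let store = demo_store();
    for (key, value) in BatchedScan::new(&store, 50, 2) {
        println!("{} = {}", key, value);
    }
}
```

Counting logical keys rather than physical entries is the point of the design: a batch limit applied to raw entries could be used up entirely by the 50 versions of a single key.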
pub fn range_rev(
    &mut self,
    range: EncodedKeyRange,
    batch_size: usize,
) -> Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>
Create an iterator for reverse range queries.
This handles high version density by scanning until batch_size unique logical keys are collected. The iterator yields individual entries in reverse key order and maintains cursor state internally.
impl FlowTransaction
pub fn state_get(
    &mut self,
    id: FlowNodeId,
    key: &EncodedKey,
) -> Result<Option<EncodedValues>>
Get state for a specific flow node and key.
pub fn state_set(
    &mut self,
    id: FlowNodeId,
    key: &EncodedKey,
    value: EncodedValues,
) -> Result<()>
Set state for a specific flow node and key.
pub fn state_remove(&mut self, id: FlowNodeId, key: &EncodedKey) -> Result<()>
Remove state for a specific flow node and key.
pub fn state_scan(&mut self, id: FlowNodeId) -> Result<MultiVersionBatch>
Scan all state for a specific flow node.
pub fn state_range(
    &mut self,
    id: FlowNodeId,
    range: EncodedKeyRange,
) -> Result<MultiVersionBatch>
Range query on state for a specific flow node.
pub fn state_clear(&mut self, id: FlowNodeId) -> Result<()>
Clear all state for a specific flow node.
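One plausible way to scope state to a flow node is to prefix every state key with the node id, so that a scan or clear for one node becomes a prefix operation. The sketch below assumes exactly that; the 8-byte big-endian prefix, `state_key`, and `StateStore` are hypothetical, and the actual key encoding used by FlowTransaction is not described on this page.

```rust
use std::collections::BTreeMap;

/// Hypothetical encoding: 8-byte big-endian node id followed by the operator's key bytes.
fn state_key(node: u64, key: &[u8]) -> Vec<u8> {
    let mut out = node.to_be_bytes().to_vec();
    out.extend_from_slice(key);
    out
}

/// Hypothetical state store shared by all flow nodes.
#[derive(Default)]
struct StateStore {
    data: BTreeMap<Vec<u8>, Vec<u8>>,
}

impl StateStore {
    fn state_set(&mut self, node: u64, key: &[u8], value: &[u8]) {
        self.data.insert(state_key(node, key), value.to_vec());
    }

    fn state_get(&self, node: u64, key: &[u8]) -> Option<&Vec<u8>> {
        self.data.get(&state_key(node, key))
    }

    fn state_remove(&mut self, node: u64, key: &[u8]) {
        self.data.remove(&state_key(node, key));
    }

    /// All entries of one node: a prefix scan over the node-id prefix.
    /// Returned keys have the prefix stripped.
    fn state_scan(&self, node: u64) -> Vec<(Vec<u8>, Vec<u8>)> {
        let prefix = node.to_be_bytes().to_vec();
        self.data
            .range(prefix.clone()..)
            .take_while(|(key, _)| key.starts_with(&prefix))
            .map(|(key, value)| (key[prefix.len()..].to_vec(), value.clone()))
            .collect()
    }

    /// Removes every entry of one node and leaves other nodes untouched.
    fn state_clear(&mut self, node: u64) {
        for (key, _) in self.state_scan(node) {
            self.state_remove(node, &key);
        }
    }
}

fn demo() -> StateStore {
    let mut store = StateStore::default();
    store.state_set(1, b"k1", b"v1");
    store.state_set(1, b"k2", b"v2");
    store.state_set(2, b"k1", b"other");
    store
}

fn main() {
    let mut store = demo();
    println!("node 1 entries: {}", store.state_scan(1).len());
    store.state_clear(1);
    println!("node 1 entries after clear: {}", store.state_scan(1).len());
    println!("node 2 still has k1: {}", store.state_get(2, b"k1").is_some());
}
```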
pub fn load_or_create_row(
    &mut self,
    id: FlowNodeId,
    key: &EncodedKey,
    schema: &Schema,
) -> Result<EncodedValues>
Load the state row for a key, creating it if it does not exist.
pub fn save_row(
    &mut self,
    id: FlowNodeId,
    key: &EncodedKey,
    row: EncodedValues,
) -> Result<()>
Save an encoded state row for a key.
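The two methods above suggest a load, modify, save cycle for stateful operators. The following sketch shows that cycle for a counting aggregate. All types are simplified stand-ins (this `Schema` is just a column count, a row is a `Vec<i64>`), and whether the real `load_or_create_row` persists a newly created row before returning it is not stated here; this sketch does not persist it until `save_row` is called.

```rust
use std::collections::HashMap;

/// Hypothetical stand-ins: a schema is only a column count, a row a vector of integers.
struct Schema {
    columns: usize,
}
type Row = Vec<i64>;

/// Hypothetical per-node state, keyed by (node id, group key).
#[derive(Default)]
struct NodeState {
    rows: HashMap<(u64, String), Row>,
}

impl NodeState {
    /// Returns the stored row, or a zero-filled row shaped by `schema` if none exists yet.
    fn load_or_create_row(&self, node: u64, key: &str, schema: &Schema) -> Row {
        self.rows
            .get(&(node, key.to_string()))
            .cloned()
            .unwrap_or_else(|| vec![0; schema.columns])
    }

    fn save_row(&mut self, node: u64, key: &str, row: Row) {
        self.rows.insert((node, key.to_string()), row);
    }
}

/// One step of a counting aggregate: load, update, save.
fn count_event(state: &mut NodeState, node: u64, group: &str, amount: i64) {
    let schema = Schema { columns: 2 }; // columns: [count, sum]
    let mut row = state.load_or_create_row(node, group, &schema);
    row[0] += 1;
    row[1] += amount;
    state.save_row(node, group, row);
}

fn main() {
    let mut state = NodeState::default();
    count_event(&mut state, 1, "eu", 5);
    count_event(&mut state, 1, "eu", 7);
    let schema = Schema { columns: 2 };
    println!("eu = {:?}", state.load_or_create_row(1, "eu", &schema));
}
```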
impl FlowTransaction
pub fn set(&mut self, key: &EncodedKey, value: EncodedValues) -> Result<()>
Set a value, buffering it in pending writes.
pub fn remove(&mut self, key: &EncodedKey) -> Result<()>
Remove a key, buffering the deletion in pending writes.
impl FlowTransaction
pub fn new(
    parent: &AdminTransaction,
    version: CommitVersion,
    catalog: Catalog,
    interceptors: Interceptors,
) -> Self
Create a new FlowTransaction from a parent transaction at a specific CDC version.
Creates dual query transactions:
- primitive_query: Reads at the specified CDC version (snapshot isolation)
- state_query: Reads at the latest version (state continuity)
§Parameters
- parent - The parent command transaction to derive from
- version - The CDC event version for snapshot isolation (NOT parent.version())
- catalog - The catalog for metadata access
pub fn update_version(&mut self, new_version: CommitVersion)
Update the transaction to read at a new version.
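A small sketch of what moving a snapshot version can look like, assuming a reader pinned to one version at a time that is reused across consecutive CDC batches. `SnapshotReader`, `replay`, and the integer log are hypothetical; which of the two internal query transactions the real `update_version` re-pins is not specified on this page.

```rust
use std::collections::BTreeMap;

/// Hypothetical reader pinned to one snapshot version at a time.
struct SnapshotReader<'a> {
    log: &'a BTreeMap<u64, i64>, // commit version -> value written at that version
    version: u64,
}

impl<'a> SnapshotReader<'a> {
    fn update_version(&mut self, new_version: u64) {
        self.version = new_version;
    }

    /// Latest value committed at or before the pinned version.
    fn read(&self) -> Option<i64> {
        self.log.range(..=self.version).next_back().map(|(_, value)| *value)
    }
}

/// Reuses one reader for several batches, re-pinning it before each read.
fn replay(log: &BTreeMap<u64, i64>, batch_versions: &[u64]) -> Vec<Option<i64>> {
    let mut reader = SnapshotReader { log, version: 0 };
    batch_versions
        .iter()
        .map(|&version| {
            reader.update_version(version);
            reader.read()
        })
        .collect()
}

fn main() {
    let mut log = BTreeMap::new();
    log.insert(10, 1);
    log.insert(20, 2);
    log.insert(30, 3);
    println!("{:?}", replay(&log, &[5, 10, 25, 40]));
}
```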
Trait Implementations§
impl WithInterceptors for FlowTransaction
fn table_pre_insert_interceptors(&mut self) -> &mut Chain<dyn TablePreInsertInterceptor + Send + Sync>
fn table_post_insert_interceptors(&mut self) -> &mut Chain<dyn TablePostInsertInterceptor + Send + Sync>
fn table_pre_update_interceptors(&mut self) -> &mut Chain<dyn TablePreUpdateInterceptor + Send + Sync>
fn table_post_update_interceptors(&mut self) -> &mut Chain<dyn TablePostUpdateInterceptor + Send + Sync>
fn table_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TablePreDeleteInterceptor + Send + Sync>
fn table_post_delete_interceptors(&mut self) -> &mut Chain<dyn TablePostDeleteInterceptor + Send + Sync>
fn ringbuffer_pre_insert_interceptors(&mut self) -> &mut Chain<dyn RingBufferPreInsertInterceptor + Send + Sync>
fn ringbuffer_post_insert_interceptors(&mut self) -> &mut Chain<dyn RingBufferPostInsertInterceptor + Send + Sync>
fn ringbuffer_pre_update_interceptors(&mut self) -> &mut Chain<dyn RingBufferPreUpdateInterceptor + Send + Sync>
fn ringbuffer_post_update_interceptors(&mut self) -> &mut Chain<dyn RingBufferPostUpdateInterceptor + Send + Sync>
fn ringbuffer_pre_delete_interceptors(&mut self) -> &mut Chain<dyn RingBufferPreDeleteInterceptor + Send + Sync>
fn ringbuffer_post_delete_interceptors(&mut self) -> &mut Chain<dyn RingBufferPostDeleteInterceptor + Send + Sync>
fn pre_commit_interceptors(&mut self) -> &mut Chain<dyn PreCommitInterceptor + Send + Sync>
fn post_commit_interceptors(&mut self) -> &mut Chain<dyn PostCommitInterceptor + Send + Sync>
fn namespace_def_post_create_interceptors(&mut self) -> &mut Chain<dyn NamespaceDefPostCreateInterceptor + Send + Sync>
fn namespace_def_pre_update_interceptors(&mut self) -> &mut Chain<dyn NamespaceDefPreUpdateInterceptor + Send + Sync>
fn namespace_def_post_update_interceptors(&mut self) -> &mut Chain<dyn NamespaceDefPostUpdateInterceptor + Send + Sync>
fn namespace_def_pre_delete_interceptors(&mut self) -> &mut Chain<dyn NamespaceDefPreDeleteInterceptor + Send + Sync>
fn table_def_post_create_interceptors(&mut self) -> &mut Chain<dyn TableDefPostCreateInterceptor + Send + Sync>
fn table_def_pre_update_interceptors(&mut self) -> &mut Chain<dyn TableDefPreUpdateInterceptor + Send + Sync>
fn table_def_post_update_interceptors(&mut self) -> &mut Chain<dyn TableDefPostUpdateInterceptor + Send + Sync>
fn table_def_pre_delete_interceptors(&mut self) -> &mut Chain<dyn TableDefPreDeleteInterceptor + Send + Sync>
fn view_pre_insert_interceptors(&mut self) -> &mut Chain<dyn ViewPreInsertInterceptor + Send + Sync>
fn view_post_insert_interceptors(&mut self) -> &mut Chain<dyn ViewPostInsertInterceptor + Send + Sync>
fn view_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewPreUpdateInterceptor + Send + Sync>
fn view_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewPostUpdateInterceptor + Send + Sync>
fn view_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewPreDeleteInterceptor + Send + Sync>
fn view_post_delete_interceptors(&mut self) -> &mut Chain<dyn ViewPostDeleteInterceptor + Send + Sync>
fn view_def_post_create_interceptors(&mut self) -> &mut Chain<dyn ViewDefPostCreateInterceptor + Send + Sync>
fn view_def_pre_update_interceptors(&mut self) -> &mut Chain<dyn ViewDefPreUpdateInterceptor + Send + Sync>
fn view_def_post_update_interceptors(&mut self) -> &mut Chain<dyn ViewDefPostUpdateInterceptor + Send + Sync>
fn view_def_pre_delete_interceptors(&mut self) -> &mut Chain<dyn ViewDefPreDeleteInterceptor + Send + Sync>
fn ringbuffer_def_post_create_interceptors(&mut self) -> &mut Chain<dyn RingBufferDefPostCreateInterceptor + Send + Sync>
fn ringbuffer_def_pre_update_interceptors(&mut self) -> &mut Chain<dyn RingBufferDefPreUpdateInterceptor + Send + Sync>
fn ringbuffer_def_post_update_interceptors(&mut self) -> &mut Chain<dyn RingBufferDefPostUpdateInterceptor + Send + Sync>
fn ringbuffer_def_pre_delete_interceptors(&mut self) -> &mut Chain<dyn RingBufferDefPreDeleteInterceptor + Send + Sync>
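Each accessor above hands out a mutable reference to a chain of boxed trait objects. The sketch below shows how such an accessor might be used to register and run an interceptor. It is illustrative only: this `Chain`, the `PreInsert` trait with its `before_insert` method, `RejectEmpty`, and `Txn` are hypothetical, since the methods of the real `Chain` and interceptor traits are not shown on this page.

```rust
/// Hypothetical interceptor trait (the real traits' methods are not documented here).
trait PreInsert {
    fn before_insert(&mut self, row: &str) -> Result<(), String>;
}

/// Hypothetical chain: an ordered list of boxed interceptors.
struct Chain<T: ?Sized> {
    items: Vec<Box<T>>,
}

impl<T: ?Sized> Chain<T> {
    fn new() -> Self {
        Self { items: Vec::new() }
    }

    fn add(&mut self, item: Box<T>) {
        self.items.push(item);
    }
}

impl Chain<dyn PreInsert + Send + Sync> {
    /// Runs every interceptor in registration order and stops at the first error.
    fn run(&mut self, row: &str) -> Result<(), String> {
        for interceptor in self.items.iter_mut() {
            interceptor.before_insert(row)?;
        }
        Ok(())
    }
}

/// Example interceptor that rejects empty rows.
struct RejectEmpty;

impl PreInsert for RejectEmpty {
    fn before_insert(&mut self, row: &str) -> Result<(), String> {
        if row.is_empty() {
            Err("empty row".to_string())
        } else {
            Ok(())
        }
    }
}

/// Hypothetical transaction that owns one chain.
struct Txn {
    pre_insert: Chain<dyn PreInsert + Send + Sync>,
}

impl Txn {
    /// Same accessor shape as the trait methods listed above.
    fn table_pre_insert_interceptors(&mut self) -> &mut Chain<dyn PreInsert + Send + Sync> {
        &mut self.pre_insert
    }
}

fn demo() -> (Result<(), String>, Result<(), String>) {
    let mut txn = Txn { pre_insert: Chain::new() };
    txn.table_pre_insert_interceptors().add(Box::new(RejectEmpty));
    let ok = txn.table_pre_insert_interceptors().run("row-1");
    let err = txn.table_pre_insert_interceptors().run("");
    (ok, err)
}

fn main() {
    let (ok, err) = demo();
    println!("non-empty row: {:?}, empty row: {:?}", ok, err);
}
```

The `Send + Sync` bounds on the trait objects are what allow a type holding these chains to remain `Send` and `Sync` itself.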
Auto Trait Implementations§
impl Freeze for FlowTransaction
impl !RefUnwindSafe for FlowTransaction
impl Send for FlowTransaction
impl Sync for FlowTransaction
impl Unpin for FlowTransaction
impl UnsafeUnpin for FlowTransaction
impl !UnwindSafe for FlowTransaction
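The `Send` and `Sync` entries above are what make it possible to move a transaction into a worker thread. A common way to check such auto traits at compile time is a pair of empty generic functions, sketched here against a hypothetical `WorkerTxn` stand-in (FlowTransaction itself is not used, so the block stays self-contained).

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in with only owned, thread-safe fields.
struct WorkerTxn {
    version: u64,
    pending: BTreeMap<Vec<u8>, Vec<u8>>,
}

/// These compile only if `T` implements the named auto trait.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

/// Moves the transaction into a worker thread and returns the number of buffered writes.
fn process_on_worker(mut txn: WorkerTxn) -> usize {
    std::thread::spawn(move || {
        txn.pending.insert(b"k".to_vec(), txn.version.to_be_bytes().to_vec());
        txn.pending.len()
    })
    .join()
    .expect("worker thread panicked")
}

fn main() {
    assert_send::<WorkerTxn>();
    assert_sync::<WorkerTxn>();
    let buffered = process_on_worker(WorkerTxn { version: 100, pending: BTreeMap::new() });
    println!("buffered writes: {}", buffered);
}
```

If a field that is not `Send` (for example an `Rc`) were added to the struct, the `assert_send` call and the `thread::spawn` call would both fail to compile.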
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.