pub enum FlowTransaction {
Deferred {
inner: FlowTransactionInner,
},
Transactional {
inner: FlowTransactionInner,
base_pending: Pending,
},
}
A transaction wrapper for flow processing with dual-version read semantics.
§Architecture
FlowTransaction provides the dual-version reads that stateful flow processing depends on, together with isolated writes:
- Source data - Read at CDC event version (snapshot isolation)
- Flow state - Read at latest version (state continuity across CDC events)
- Isolated writes - Local PendingWrites buffer returned to caller
This dual-version approach allows stateful operators (joins, aggregates, distinct) to:
- Process source data at a consistent snapshot (the CDC event version)
- Access their own state at the latest version to maintain continuity
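To make the two read versions concrete, here is a minimal sketch over a toy multi-version store. `VersionedStore`, `get_at` and `get_latest` are hypothetical names invented for this illustration and are not part of the FlowTransaction API; the sketch only shows how one store can answer a snapshot read (source data at the CDC version) and a latest read (flow state).

```rust
use std::collections::BTreeMap;

/// Hypothetical toy store: each key holds (version, value) pairs,
/// appended in ascending version order.
#[derive(Default)]
struct VersionedStore {
    entries: BTreeMap<String, Vec<(u64, String)>>,
}

impl VersionedStore {
    /// Append a new version of `key`. Callers must use increasing versions.
    fn put(&mut self, key: &str, version: u64, value: &str) {
        self.entries
            .entry(key.to_string())
            .or_default()
            .push((version, value.to_string()));
    }

    /// Snapshot read: newest value whose version is <= `version`.
    fn get_at(&self, key: &str, version: u64) -> Option<&str> {
        self.entries
            .get(key)?
            .iter()
            .rev()
            .find(|(v, _)| *v <= version)
            .map(|(_, value)| value.as_str())
    }

    /// Latest read: newest value regardless of version.
    fn get_latest(&self, key: &str) -> Option<&str> {
        self.get_at(key, u64::MAX)
    }
}
```

In this model a stateful operator would call `get_at` with the CDC event version for source rows and `get_latest` for its own state, so state written while handling an earlier event stays visible.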
§Dual-Version Read Routing
Reads are automatically routed to the correct query transaction based on key type:
┌─────────────────┐
│ FlowTransaction │
└────────┬────────┘
         │
         ├──► pending (flow-generated writes)
         │
         ├──► variant
         │    ├─ Deferred: skip
         │    └─ Transactional { base_pending }: check base_pending
         │
         ├──► primitive_query (at CDC version)
         │    - Source tables / views / regular data
         │
         └──► state_query (at latest version)
              - FlowNodeState / FlowNodeInternalState
§Construction
Use named constructors to enforce correct initialization:
- FlowTransaction::deferred: CDC path (no base pending)
- FlowTransaction::transactional: inline pre-commit path (with base pending)
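The named-constructor pattern can be sketched on a reduced model of the enum. Every `Toy*` type below is a hypothetical stand-in, and the real constructors take more arguments (query transactions, catalog, interceptors), as listed under Implementations. The point is only that each constructor fixes the variant, so callers cannot build a Transactional value without a base pending buffer.

```rust
/// Hypothetical stand-in for the pending-write buffer.
#[derive(Debug, Default, Clone, PartialEq)]
struct ToyPending(Vec<(String, String)>);

/// Hypothetical stand-in for the shared inner state.
#[derive(Debug)]
struct ToyInner {
    version: u64,
    pending: ToyPending,
}

#[derive(Debug)]
enum ToyFlowTransaction {
    Deferred { inner: ToyInner },
    Transactional { inner: ToyInner, base_pending: ToyPending },
}

impl ToyFlowTransaction {
    /// CDC path: no base pending exists, so none can be passed.
    fn deferred(version: u64) -> Self {
        Self::Deferred {
            inner: ToyInner { version, pending: ToyPending::default() },
        }
    }

    /// Inline pre-commit path: base pending is a required argument.
    fn transactional(version: u64, base_pending: ToyPending) -> Self {
        Self::Transactional {
            inner: ToyInner { version, pending: ToyPending::default() },
            base_pending,
        }
    }

    /// Shared accessor that works for both variants.
    fn version(&self) -> u64 {
        match self {
            Self::Deferred { inner } | Self::Transactional { inner, .. } => inner.version,
        }
    }

    /// Only the transactional variant carries a base pending buffer.
    fn base_pending(&self) -> Option<&ToyPending> {
        match self {
            Self::Deferred { .. } => None,
            Self::Transactional { base_pending, .. } => Some(base_pending),
        }
    }
}
```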
§Write Path
All writes (set, remove) go to the local pending buffer:
- Reads check pending buffer first, then delegate to query transactions
- Pending writes are extracted via FlowTransaction::take_pending
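The write path above can be sketched with a toy buffer. `ToyTxn` and `PendingOp` are hypothetical names, and recording a removal as a tombstone entry is an assumption of this sketch, not something the documentation states about the real Pending type. Extraction uses `std::mem::take`, which leaves an empty buffer behind, matching the description of take_pending.

```rust
use std::collections::BTreeMap;

/// Hypothetical buffered operation: a value to write or a tombstone.
#[derive(Debug, Clone, PartialEq)]
enum PendingOp {
    Set(String),
    Remove,
}

/// Hypothetical transaction with a local buffer over committed data.
#[derive(Default)]
struct ToyTxn {
    pending: BTreeMap<String, PendingOp>,
    committed: BTreeMap<String, String>,
}

impl ToyTxn {
    /// Buffer a write; committed data is not touched.
    fn set(&mut self, key: &str, value: &str) {
        self.pending
            .insert(key.to_string(), PendingOp::Set(value.to_string()));
    }

    /// Buffer a delete as a tombstone (assumption of this sketch).
    fn remove(&mut self, key: &str) {
        self.pending.insert(key.to_string(), PendingOp::Remove);
    }

    /// Read the buffer first, then fall back to committed data.
    fn get(&self, key: &str) -> Option<String> {
        match self.pending.get(key) {
            Some(PendingOp::Set(value)) => Some(value.clone()),
            Some(PendingOp::Remove) => None,
            None => self.committed.get(key).cloned(),
        }
    }

    /// Hand the buffered writes to the caller, leaving an empty buffer.
    fn take_pending(&mut self) -> BTreeMap<String, PendingOp> {
        std::mem::take(&mut self.pending)
    }
}
```

Because the buffer is returned rather than applied, the caller decides when and whether the writes are committed.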
§Thread Safety
FlowTransaction is Send because all fields are either Copy, owned, or
Variants§
Deferred
CDC-driven async flow processing. Reads only from committed storage + flow pending writes.
Fields
inner: FlowTransactionInner
Transactional
Inline flow processing within a committing transaction. Can additionally read uncommitted writes from the parent transaction.
Fields
inner: FlowTransactionInner
base_pending: Pending
Implementations§
impl FlowTransaction
pub fn get(&mut self, key: &EncodedKey) -> Result<Option<EncodedRow>>
Get a value by key, checking pending writes first, then (if transactional) base_pending, then querying the multi-version store.
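The lookup order described for get can be sketched as a layered read. `Layer`, `Mode` and `layered_get` are hypothetical names, and representing a buffered delete as `None` is an assumption of this sketch. What it illustrates is the precedence: the transaction's own pending writes win over base_pending, which wins over committed data.

```rust
use std::collections::HashMap;

/// Buffered writes: `Some(value)` is a set, `None` is a buffered delete.
type Layer = HashMap<String, Option<String>>;

/// Hypothetical stand-in for the two transaction variants.
enum Mode {
    Deferred,
    Transactional { base_pending: Layer },
}

/// Layered lookup: own pending writes, then base_pending (transactional
/// only), then committed data.
fn layered_get(
    key: &str,
    pending: &Layer,
    mode: &Mode,
    committed: &HashMap<String, String>,
) -> Option<String> {
    // 1. Flow-generated writes always take precedence.
    if let Some(hit) = pending.get(key) {
        return hit.clone();
    }
    // 2. Uncommitted writes of the parent transaction, if any.
    if let Mode::Transactional { base_pending } = mode {
        if let Some(hit) = base_pending.get(key) {
            return hit.clone();
        }
    }
    // 3. Committed storage.
    committed.get(key).cloned()
}
```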
pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool>
Check whether a key exists.
pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch>
Scan all entries whose key starts with the given prefix.
pub fn range(
    &mut self,
    range: EncodedKeyRange,
    batch_size: usize,
) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_>
Create an iterator for forward range queries.
This properly handles high version density by scanning until batch_size unique logical keys are collected. The iterator yields individual entries and maintains cursor state internally. Pending writes are merged with committed storage data.
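The phrase "scanning until batch_size unique logical keys are collected" can be illustrated with a small sketch. `Versioned` and `next_batch` are hypothetical, and the storage layout assumed here (entries sorted by key ascending, then version descending) is an assumption of the sketch rather than a documented property of the store. It shows why counting logical keys instead of physical entries keeps batches full when a key has many versions.

```rust
/// Hypothetical physical entry: one logical key at one version.
#[derive(Debug, Clone, PartialEq)]
struct Versioned {
    key: String,
    version: u64,
    value: String,
}

/// Scan `entries` (assumed sorted by key ascending, version descending)
/// and return the newest visible version of up to `batch_size` distinct
/// keys after `after`, plus the cursor for the next call.
fn next_batch(
    entries: &[Versioned],
    after: Option<&str>,
    read_version: u64,
    batch_size: usize,
) -> (Vec<Versioned>, Option<String>) {
    let mut out: Vec<Versioned> = Vec::new();
    for entry in entries {
        // Skip everything at or before the cursor.
        if let Some(cursor) = after {
            if entry.key.as_str() <= cursor {
                continue;
            }
        }
        // Skip versions newer than the read version.
        if entry.version > read_version {
            continue;
        }
        // Skip older versions of a key that was already emitted.
        if out.last().map_or(false, |prev| prev.key == entry.key) {
            continue;
        }
        // Stop once enough distinct logical keys were collected.
        if out.len() == batch_size {
            break;
        }
        out.push(entry.clone());
    }
    let cursor = out.last().map(|entry| entry.key.clone());
    (out, cursor)
}
```

Passing the returned cursor back as `after` resumes the scan at the next logical key, which is one way an iterator could maintain cursor state between batches.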
pub fn range_rev(
    &mut self,
    range: EncodedKeyRange,
    batch_size: usize,
) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_>
Create an iterator for reverse range queries.
This properly handles high version density by scanning until batch_size unique logical keys are collected. The iterator yields individual entries in reverse key order and maintains cursor state internally.
impl FlowTransaction
pub fn state_get(
    &mut self,
    id: FlowNodeId,
    key: &EncodedKey,
) -> Result<Option<EncodedRow>>
Get state for a specific flow node and key
pub fn state_set(
    &mut self,
    id: FlowNodeId,
    key: &EncodedKey,
    value: EncodedRow,
) -> Result<()>
Set state for a specific flow node and key
pub fn state_remove(&mut self, id: FlowNodeId, key: &EncodedKey) -> Result<()>
Remove state for a specific flow node and key
pub fn state_scan(&mut self, id: FlowNodeId) -> Result<MultiVersionBatch>
Scan all state for a specific flow node
pub fn state_range(
    &mut self,
    id: FlowNodeId,
    range: EncodedKeyRange,
) -> Result<MultiVersionBatch>
Range query on state for a specific flow node
pub fn state_clear(&mut self, id: FlowNodeId) -> Result<()>
Clear all state for a specific flow node
pub fn load_or_create_row(
    &mut self,
    id: FlowNodeId,
    key: &EncodedKey,
    schema: &RowSchema,
) -> Result<EncodedRow>
Load the state row for a key, creating it if it does not exist.
pub fn save_row(
    &mut self,
    id: FlowNodeId,
    key: &EncodedKey,
    row: EncodedRow,
) -> Result<()>
Save an encoded state row for a specific flow node and key.
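As a rough mental model of the state methods above, per-node state can be pictured as a map keyed by (node id, key). `ToyState` is hypothetical: the real methods operate on EncodedKey and EncodedRow and return Result, and the documentation does not say how state keys are encoded. The sketch also assumes that load_or_create_row returns a fresh zeroed row without persisting it, which is a guess. It is meant only to show how scans and clears stay scoped to one flow node.

```rust
use std::collections::BTreeMap;

/// Hypothetical per-node operator state, keyed by (node id, key bytes).
#[derive(Default)]
struct ToyState {
    rows: BTreeMap<(u64, Vec<u8>), Vec<u8>>,
}

impl ToyState {
    fn state_get(&self, id: u64, key: &[u8]) -> Option<Vec<u8>> {
        self.rows.get(&(id, key.to_vec())).cloned()
    }

    fn state_set(&mut self, id: u64, key: &[u8], value: Vec<u8>) {
        self.rows.insert((id, key.to_vec()), value);
    }

    fn state_remove(&mut self, id: u64, key: &[u8]) {
        self.rows.remove(&(id, key.to_vec()));
    }

    /// All (key, value) pairs of one node, in key order.
    fn state_scan(&self, id: u64) -> Vec<(Vec<u8>, Vec<u8>)> {
        self.rows
            .range((id, Vec::new())..)
            .take_while(|((node, _), _)| *node == id)
            .map(|((_, key), value)| (key.clone(), value.clone()))
            .collect()
    }

    /// Drop every entry of one node; other nodes are untouched.
    fn state_clear(&mut self, id: u64) {
        self.rows.retain(|(node, _), _| *node != id);
    }

    /// Return the stored row, or a zeroed row of `row_len` bytes.
    /// Not persisting the fresh row is an assumption of this sketch.
    fn load_or_create_row(&self, id: u64, key: &[u8], row_len: usize) -> Vec<u8> {
        self.state_get(id, key)
            .unwrap_or_else(|| vec![0u8; row_len])
    }
}
```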
impl FlowTransaction
pub fn set(&mut self, key: &EncodedKey, value: EncodedRow) -> Result<()>
Set a value, buffering it in pending writes
pub fn remove(&mut self, key: &EncodedKey) -> Result<()>
Remove a key, buffering the deletion in pending operations
impl FlowTransaction
pub fn deferred(
    parent: &AdminTransaction,
    version: CommitVersion,
    catalog: Catalog,
    interceptors: Interceptors,
) -> Self
Create a deferred (CDC) FlowTransaction from a parent transaction.
Used by the async worker path. Reads only from committed storage + flow-generated pending writes — no base pending from a parent transaction.
pub fn deferred_from_parts(
    version: CommitVersion,
    pending: Pending,
    primitive_query: MultiReadTransaction,
    state_query: MultiReadTransaction,
    catalog: Catalog,
    interceptors: Interceptors,
) -> Self
Create a deferred (CDC) FlowTransaction from pre-built parts.
Used by the worker actor which creates its own query transactions.
pub fn transactional(
    version: CommitVersion,
    pending: Pending,
    base_pending: Pending,
    primitive_query: MultiReadTransaction,
    state_query: MultiReadTransaction,
    catalog: Catalog,
    interceptors: Interceptors,
) -> Self
Create a transactional (inline) FlowTransaction.
Used by the pre-commit interceptor path. base_pending is a read-only
snapshot of the committing transaction’s KV writes so that flow operators
can see uncommitted row data.
pub fn version(&self) -> CommitVersion
Get the transaction version.
pub fn take_pending(&mut self) -> Pending
Extract pending writes, replacing them with an empty buffer.
pub fn track_flow_change(&mut self, change: Change)
Track a view-level flow change in this transaction’s accumulator.
pub fn take_accumulator_entries(&mut self) -> Vec<(SchemaId, Diff)>
Drain the accumulator entries collected during flow processing.
pub fn update_version(&mut self, new_version: CommitVersion)
Update the transaction to read at a new version
Trait Implementations§
impl WithInterceptors for FlowTransaction
fn table_row_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn TableRowPreInsertInterceptor + Send + Sync>
fn table_row_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn TableRowPostInsertInterceptor + Send + Sync>
fn table_row_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn TableRowPreUpdateInterceptor + Send + Sync>
fn table_row_post_update_interceptors( &mut self, ) -> &mut Chain<dyn TableRowPostUpdateInterceptor + Send + Sync>
fn table_row_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn TableRowPreDeleteInterceptor + Send + Sync>
fn table_row_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn TableRowPostDeleteInterceptor + Send + Sync>
fn ringbuffer_row_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferRowPreInsertInterceptor + Send + Sync>
fn ringbuffer_row_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferRowPostInsertInterceptor + Send + Sync>
fn ringbuffer_row_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferRowPreUpdateInterceptor + Send + Sync>
fn ringbuffer_row_post_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferRowPostUpdateInterceptor + Send + Sync>
fn ringbuffer_row_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferRowPreDeleteInterceptor + Send + Sync>
fn ringbuffer_row_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferRowPostDeleteInterceptor + Send + Sync>
fn pre_commit_interceptors( &mut self, ) -> &mut Chain<dyn PreCommitInterceptor + Send + Sync>
fn post_commit_interceptors( &mut self, ) -> &mut Chain<dyn PostCommitInterceptor + Send + Sync>
fn namespace_post_create_interceptors( &mut self, ) -> &mut Chain<dyn NamespacePostCreateInterceptor + Send + Sync>
fn namespace_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn NamespacePreUpdateInterceptor + Send + Sync>
fn namespace_post_update_interceptors( &mut self, ) -> &mut Chain<dyn NamespacePostUpdateInterceptor + Send + Sync>
fn namespace_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn NamespacePreDeleteInterceptor + Send + Sync>
fn table_post_create_interceptors( &mut self, ) -> &mut Chain<dyn TablePostCreateInterceptor + Send + Sync>
fn table_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn TablePreUpdateInterceptor + Send + Sync>
fn table_post_update_interceptors( &mut self, ) -> &mut Chain<dyn TablePostUpdateInterceptor + Send + Sync>
fn table_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn TablePreDeleteInterceptor + Send + Sync>
fn view_row_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn ViewRowPreInsertInterceptor + Send + Sync>
fn view_row_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn ViewRowPostInsertInterceptor + Send + Sync>
fn view_row_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewRowPreUpdateInterceptor + Send + Sync>
fn view_row_post_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewRowPostUpdateInterceptor + Send + Sync>
fn view_row_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn ViewRowPreDeleteInterceptor + Send + Sync>
fn view_row_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn ViewRowPostDeleteInterceptor + Send + Sync>
fn view_post_create_interceptors( &mut self, ) -> &mut Chain<dyn ViewPostCreateInterceptor + Send + Sync>
fn view_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewPreUpdateInterceptor + Send + Sync>
fn view_post_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewPostUpdateInterceptor + Send + Sync>
fn view_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn ViewPreDeleteInterceptor + Send + Sync>
fn ringbuffer_post_create_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPostCreateInterceptor + Send + Sync>
fn ringbuffer_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPreUpdateInterceptor + Send + Sync>
fn ringbuffer_post_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPostUpdateInterceptor + Send + Sync>
fn ringbuffer_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPreDeleteInterceptor + Send + Sync>
fn dictionary_row_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryRowPreInsertInterceptor + Send + Sync>
fn dictionary_row_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryRowPostInsertInterceptor + Send + Sync>
fn dictionary_row_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryRowPreUpdateInterceptor + Send + Sync>
fn dictionary_row_post_update_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryRowPostUpdateInterceptor + Send + Sync>
fn dictionary_row_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryRowPreDeleteInterceptor + Send + Sync>
fn dictionary_row_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryRowPostDeleteInterceptor + Send + Sync>
fn dictionary_post_create_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryPostCreateInterceptor + Send + Sync>
fn dictionary_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryPreUpdateInterceptor + Send + Sync>
fn dictionary_post_update_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryPostUpdateInterceptor + Send + Sync>
fn dictionary_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryPreDeleteInterceptor + Send + Sync>
fn series_row_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn SeriesRowPreInsertInterceptor + Send + Sync>
fn series_row_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn SeriesRowPostInsertInterceptor + Send + Sync>
fn series_row_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn SeriesRowPreUpdateInterceptor + Send + Sync>
fn series_row_post_update_interceptors( &mut self, ) -> &mut Chain<dyn SeriesRowPostUpdateInterceptor + Send + Sync>
fn series_row_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn SeriesRowPreDeleteInterceptor + Send + Sync>
fn series_row_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn SeriesRowPostDeleteInterceptor + Send + Sync>
fn series_post_create_interceptors( &mut self, ) -> &mut Chain<dyn SeriesPostCreateInterceptor + Send + Sync>
fn series_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn SeriesPreUpdateInterceptor + Send + Sync>
fn series_post_update_interceptors( &mut self, ) -> &mut Chain<dyn SeriesPostUpdateInterceptor + Send + Sync>
fn series_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn SeriesPreDeleteInterceptor + Send + Sync>
fn identity_post_create_interceptors( &mut self, ) -> &mut Chain<dyn IdentityPostCreateInterceptor + Send + Sync>
fn identity_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn IdentityPreUpdateInterceptor + Send + Sync>
fn identity_post_update_interceptors( &mut self, ) -> &mut Chain<dyn IdentityPostUpdateInterceptor + Send + Sync>
fn identity_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn IdentityPreDeleteInterceptor + Send + Sync>
fn role_post_create_interceptors( &mut self, ) -> &mut Chain<dyn RolePostCreateInterceptor + Send + Sync>
fn role_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn RolePreUpdateInterceptor + Send + Sync>
fn role_post_update_interceptors( &mut self, ) -> &mut Chain<dyn RolePostUpdateInterceptor + Send + Sync>
fn role_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn RolePreDeleteInterceptor + Send + Sync>
fn granted_role_post_create_interceptors( &mut self, ) -> &mut Chain<dyn GrantedRolePostCreateInterceptor + Send + Sync>
fn granted_role_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn GrantedRolePreDeleteInterceptor + Send + Sync>
fn authentication_post_create_interceptors( &mut self, ) -> &mut Chain<dyn AuthenticationPostCreateInterceptor + Send + Sync>
fn authentication_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn AuthenticationPreDeleteInterceptor + Send + Sync>
Auto Trait Implementations§
impl Freeze for FlowTransaction
impl !RefUnwindSafe for FlowTransaction
impl Send for FlowTransaction
impl Sync for FlowTransaction
impl Unpin for FlowTransaction
impl UnsafeUnpin for FlowTransaction
impl !UnwindSafe for FlowTransaction
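Auto traits such as Send and Sync are derived from a type's fields, so a common way to keep them from regressing is a compile-time assertion. The sketch below uses a hypothetical `StandIn` type, because FlowTransaction's real fields are not shown here. The helper functions do nothing at run time; the check happens when the generic bounds are resolved.

```rust
/// Compiles only if `T` is Send.
fn assert_send<T: Send>() {}

/// Compiles only if `T` is Sync.
fn assert_sync<T: Sync>() {}

/// Hypothetical stand-in whose fields are all owned, thread-safe data.
struct StandIn {
    version: u64,
    pending: Vec<(Vec<u8>, Vec<u8>)>,
}

/// Fails to compile if `StandIn` ever gains a field that is not
/// Send or not Sync (for example an `Rc<T>`).
fn check_auto_traits() {
    assert_send::<StandIn>();
    assert_sync::<StandIn>();
}
```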
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> GetSetFdFlags for T
fn get_fd_flags(&self) -> Result<FdFlags, Error>
where
    T: AsFilelike,
self file descriptor.
fn new_set_fd_flags(&self, fd_flags: FdFlags) -> Result<SetFdFlags<T>, Error>
where
    T: AsFilelike,
fn set_fd_flags(&mut self, set_fd_flags: SetFdFlags<T>) -> Result<(), Error>
where
    T: AsFilelike,
self file descriptor.
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
T in a tonic::Request