pub struct FlowTransaction { /* private fields */ }
A transaction wrapper for parallel flow processing with snapshot isolation.
§Architecture
FlowTransaction enables parallel processing of independent data flows by providing:
- Snapshot reads - via a wrapped StandardQueryTransaction reading at a fixed version
- Isolated writes - via a local PendingWrites buffer unique to this transaction
- Sequential merge - buffered writes are applied back to parent at commit time
§Read Path
All reads go through the wrapped query transaction, which provides a consistent
snapshot view of the database at version. The query transaction is read-only and
cannot modify the underlying storage.
For keys that have been modified locally:
- Reads check the pending buffer first
- If found there (or marked for removal), return the local value
- Otherwise, delegate to the query transaction for the snapshot value
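The lookup order above can be sketched with plain standard-library maps. This is a simplified model, not the crate's implementation: `ReadPathSketch`, `Pending`, and the byte-vector keys are hypothetical stand-ins for FlowTransaction, PendingWrites, and EncodedKey, and treating a buffered removal as "not found" is an assumption.

```rust
use std::collections::{BTreeMap, HashMap};

/// A locally buffered operation (hypothetical stand-in for a PendingWrites entry).
#[derive(Clone, Debug, PartialEq)]
enum Pending {
    Set(Vec<u8>),
    Remove,
}

/// Simplified model: `snapshot` plays the role of the read-only query
/// transaction, `pending` the role of the local write buffer.
struct ReadPathSketch {
    snapshot: BTreeMap<Vec<u8>, Vec<u8>>,
    pending: HashMap<Vec<u8>, Pending>,
}

impl ReadPathSketch {
    /// Check the pending buffer first; fall back to the snapshot otherwise.
    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        match self.pending.get(key) {
            Some(Pending::Set(value)) => Some(value.clone()),
            // Assumption: a buffered removal hides the snapshot value.
            Some(Pending::Remove) => None,
            None => self.snapshot.get(key).cloned(),
        }
    }
}
```

In this model a key that was never touched locally always resolves to the snapshot value, regardless of what other transactions have buffered.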
§Write Path
All writes (set, remove) go to the local pending buffer only:
- Writes are NOT visible to the parent transaction
- Writes are NOT visible to other FlowTransactions
- Writes are NOT persisted to storage
The pending buffer is committed back to the parent transaction via commit().
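As an illustration of the write path, the following sketch buffers sets and removals locally and only touches the parent when commit is called. `WriteBufferSketch` and the `Store` alias are hypothetical; the real types work on EncodedKey and EncodedValues and return Result.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the parent transaction's visible state.
type Store = BTreeMap<String, String>;

/// Hypothetical stand-in for a FlowTransaction's write side.
#[derive(Default)]
struct WriteBufferSketch {
    // `Some(value)` is a buffered set, `None` a buffered removal.
    pending: BTreeMap<String, Option<String>>,
}

impl WriteBufferSketch {
    fn set(&mut self, key: &str, value: &str) {
        self.pending.insert(key.to_string(), Some(value.to_string()));
    }

    fn remove(&mut self, key: &str) {
        self.pending.insert(key.to_string(), None);
    }

    /// Apply the buffered operations to the parent; the buffer itself is kept.
    fn commit(&self, parent: &mut Store) {
        for (key, op) in &self.pending {
            match op {
                Some(value) => {
                    parent.insert(key.clone(), value.clone());
                }
                None => {
                    parent.remove(key);
                }
            }
        }
    }
}
```

Until `commit` runs, the parent map is untouched, which mirrors the three "NOT visible" guarantees listed above.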
§Parallel Processing Pattern
use rayon::prelude::*;

let mut parent = engine.begin_command()?;

// Create multiple FlowTransactions from a shared parent reference.
// Each uses the CDC event version for proper snapshot isolation.
let flow_txns: Vec<FlowTransaction> = cdc_events
    .iter()
    .map(|cdc| FlowTransaction::new(&parent, cdc.version))
    .collect();

// Process in parallel (e.g., using rayon).
let results: Vec<FlowTransaction> = flow_txns
    .into_par_iter()
    .map(|mut txn| {
        // Process the flow, making reads and writes.
        process_flow(&mut txn)?;
        Ok(txn)
    })
    // Name the target type so that `?` can unwrap the collected Result.
    .collect::<Result<Vec<_>>>()?;

// Sequential merge back to the parent.
// `commit` takes `&mut self`, so the binding must be mutable.
for mut flow_txn in results {
    flow_txn.commit(&mut parent)?;
}

// Atomic commit of all changes.
parent.commit()?;
§Thread Safety
FlowTransaction implements Send because:
- version is Copy
- query wraps an Arc-based multi-version transaction (Send + Sync)
- pending and metrics are owned and not shared
This allows FlowTransactions to be moved to worker threads for parallel processing.
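The Send reasoning can be checked at compile time with a small helper. The struct below is a hypothetical shape with the same kinds of fields (a Copy value, an Arc-backed handle, owned buffers); it is not the real FlowTransaction.

```rust
use std::sync::Arc;

/// Hypothetical stand-in with the same kinds of fields as described above.
struct FlowTxnShape {
    version: u64,                     // Copy
    query: Arc<Vec<u8>>,              // Arc-based handle, Send + Sync
    pending: Vec<(Vec<u8>, Vec<u8>)>, // owned, not shared
}

/// Compiles only if `T` is `Send`.
fn assert_send<T: Send>() {}

/// Move the value into a worker thread and compute something from it there.
fn move_to_worker(txn: FlowTxnShape) -> u64 {
    std::thread::spawn(move || {
        txn.version + txn.pending.len() as u64 + txn.query.len() as u64
    })
    .join()
    .expect("worker thread panicked")
}
```

Calling `assert_send::<FlowTxnShape>()` fails to compile if any field is not Send, for example if the Arc were replaced by an Rc.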
§Implementations
impl FlowTransaction
pub fn commit(
    &mut self,
    parent: &mut StandardCommandTransaction,
) -> Result<FlowTransactionMetrics>
Commit all pending writes and removes to the parent transaction.
Takes the parent transaction as a mutable reference so that the buffered operations can be applied to it. Because commit borrows the FlowTransaction (&mut self) rather than consuming it, the transaction can be reused for subsequent units of work. The pending buffer is NOT cleared, which maintains read-your-own-writes semantics.
Returns the transaction metrics.
§Errors
Returns an error if any key in this FlowTransaction overlaps with keys already written by another FlowTransaction to the same parent. FlowTransactions must operate on non-overlapping keyspaces.
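One way to picture the overlap rule is a parent that remembers which keys earlier flow commits wrote and rejects a later batch that touches any of them. This is only a sketch of the rule as stated; `ParentSketch`, its `written` set, and the error string are hypothetical, and the crate may detect overlaps differently.

```rust
use std::collections::HashSet;

/// Hypothetical parent that remembers which keys earlier flow commits wrote.
#[derive(Default)]
struct ParentSketch {
    written: HashSet<String>,
}

impl ParentSketch {
    /// Accept a batch of keys only if none was written by an earlier batch.
    fn merge(&mut self, keys: &[&str]) -> Result<(), String> {
        if let Some(dup) = keys.iter().find(|k| self.written.contains(**k)) {
            return Err(format!("key overlap: {}", dup));
        }
        // No overlap: record every key of this batch.
        self.written.extend(keys.iter().map(|k| k.to_string()));
        Ok(())
    }
}
```

In this sketch a rejected batch leaves the parent unchanged, because the check runs before any key is recorded.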
impl FlowTransaction
pub fn get(&mut self, key: &EncodedKey) -> Result<Option<EncodedValues>>
Get a value by key, checking pending writes first, then querying the multi-version store
pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool>
Check if a key exists
pub fn range(
    &mut self,
    range: EncodedKeyRange,
) -> Result<BoxedMultiVersionIter<'_>>
Range query
pub fn range_batched(
    &mut self,
    range: EncodedKeyRange,
    batch_size: u64,
) -> Result<BoxedMultiVersionIter<'_>>
Range query with batching
pub fn prefix(
    &mut self,
    prefix: &EncodedKey,
) -> Result<BoxedMultiVersionIter<'_>>
Prefix scan
impl FlowTransaction
pub fn state_get(
    &mut self,
    id: FlowNodeId,
    key: &EncodedKey,
) -> Result<Option<EncodedValues>>
Get state for a specific flow node and key
pub fn state_set(
    &mut self,
    id: FlowNodeId,
    key: &EncodedKey,
    value: EncodedValues,
) -> Result<()>
Set state for a specific flow node and key
pub fn state_remove(&mut self, id: FlowNodeId, key: &EncodedKey) -> Result<()>
Remove state for a specific flow node and key
pub fn state_scan(
    &mut self,
    id: FlowNodeId,
) -> Result<BoxedMultiVersionIter<'_>>
Scan all state for a specific flow node
pub fn state_range(
    &mut self,
    id: FlowNodeId,
    range: EncodedKeyRange,
) -> Result<BoxedMultiVersionIter<'_>>
Range query on state for a specific flow node
pub fn state_clear(&mut self, id: FlowNodeId) -> Result<()>
Clear all state for a specific flow node
pub fn load_or_create_row(
    &mut self,
    id: FlowNodeId,
    key: &EncodedKey,
    layout: &EncodedValuesLayout,
) -> Result<EncodedValues>
Load the state row for a key, creating it if it does not exist
pub fn save_row(
    &mut self,
    id: FlowNodeId,
    key: &EncodedKey,
    row: EncodedValues,
) -> Result<()>
Save an encoded state row for a specific flow node and key
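load_or_create_row and save_row pair naturally into a read-modify-write step. The sketch below models that with a hypothetical `StateSketch` whose rows are plain byte buffers and whose node ids are `u64`. Whether the real load_or_create_row stores the freshly created row before save_row is called is not stated here, so the sketch assumes it does not.

```rust
use std::collections::HashMap;

/// Hypothetical per-node state store; rows are plain byte buffers here.
#[derive(Default)]
struct StateSketch {
    rows: HashMap<(u64, Vec<u8>), Vec<u8>>,
}

impl StateSketch {
    /// Return the stored row, or a zero-filled row of `width` bytes if absent.
    /// Assumption: the freshly created row is not stored until `save_row`.
    fn load_or_create_row(&self, node: u64, key: &[u8], width: usize) -> Vec<u8> {
        self.rows
            .get(&(node, key.to_vec()))
            .cloned()
            .unwrap_or_else(|| vec![0u8; width])
    }

    /// Store the row under the (node, key) pair, replacing any earlier row.
    fn save_row(&mut self, node: u64, key: &[u8], row: Vec<u8>) {
        self.rows.insert((node, key.to_vec()), row);
    }
}

/// Read-modify-write: increment a one-byte counter kept in the row.
fn bump_counter(state: &mut StateSketch, node: u64, key: &[u8]) -> u8 {
    let mut row = state.load_or_create_row(node, key, 1);
    row[0] = row[0].wrapping_add(1);
    let count = row[0];
    state.save_row(node, key, row);
    count
}
```

Keying rows by the (node, key) pair keeps the state of different flow nodes separate even when they use the same key bytes.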
impl FlowTransaction
pub fn set(&mut self, key: &EncodedKey, value: EncodedValues) -> Result<()>
Set a value, buffering it in pending writes
pub fn remove(&mut self, key: &EncodedKey) -> Result<()>
Remove a key, buffering the deletion in pending operations
pub fn insert_view(
    &mut self,
    key: &EncodedKey,
    view: ViewDef,
    row_number: RowNumber,
    row: EncodedValues,
) -> Result<()>
Insert a row into a view, tracking the operation for interceptor calls at commit time.
This method buffers the write in pending (for storage) and also tracks the view operation separately so that ViewInterceptor::pre_insert/post_insert can be called on the parent transaction during commit().
pub fn update_view(
    &mut self,
    old_key: &EncodedKey,
    new_key: &EncodedKey,
    view: ViewDef,
    old_row_number: RowNumber,
    new_row_number: RowNumber,
    row: EncodedValues,
) -> Result<()>
Update a row in a view (remove old, insert new), tracking for interceptor calls.
This method buffers both the remove and set in pending (for storage) and tracks the view operation for ViewInterceptor::pre_update/post_update calls during commit().
pub fn remove_view(
    &mut self,
    key: &EncodedKey,
    view: ViewDef,
    row_number: RowNumber,
) -> Result<()>
Remove a row from a view, tracking the operation for interceptor calls at commit time.
This method buffers the removal in pending (for storage) and tracks the view operation for ViewInterceptor::pre_delete/post_delete calls during commit().
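The three view methods share one pattern: buffer the raw storage write and separately remember the view-level operation, so that interceptor hooks can be invoked later. A minimal sketch of that bookkeeping follows. `ViewTrackingSketch`, `ViewOp`, and `hooks_at_commit` are hypothetical names, and the hook names are returned as strings instead of calling a real ViewInterceptor.

```rust
/// Hypothetical record of a view-level operation, kept alongside the raw writes.
#[derive(Debug, Clone, PartialEq)]
enum ViewOp {
    Insert { row_number: u64 },
    Update { old_row_number: u64, new_row_number: u64 },
    Remove { row_number: u64 },
}

#[derive(Default)]
struct ViewTrackingSketch {
    // Raw storage operations: `Some` is a set, `None` a removal.
    pending: Vec<(String, Option<String>)>,
    // View operations remembered for interceptor calls at commit time.
    view_ops: Vec<ViewOp>,
}

impl ViewTrackingSketch {
    fn insert_view(&mut self, key: &str, row_number: u64, row: &str) {
        self.pending.push((key.to_string(), Some(row.to_string())));
        self.view_ops.push(ViewOp::Insert { row_number });
    }

    fn update_view(&mut self, old_key: &str, new_key: &str, old_rn: u64, new_rn: u64, row: &str) {
        // An update buffers a removal of the old key and a set of the new key.
        self.pending.push((old_key.to_string(), None));
        self.pending.push((new_key.to_string(), Some(row.to_string())));
        self.view_ops.push(ViewOp::Update { old_row_number: old_rn, new_row_number: new_rn });
    }

    fn remove_view(&mut self, key: &str, row_number: u64) {
        self.pending.push((key.to_string(), None));
        self.view_ops.push(ViewOp::Remove { row_number });
    }

    /// Names of the interceptor hooks that would fire at commit, in order.
    fn hooks_at_commit(&self) -> Vec<&'static str> {
        self.view_ops
            .iter()
            .flat_map(|op| match op {
                ViewOp::Insert { .. } => ["pre_insert", "post_insert"],
                ViewOp::Update { .. } => ["pre_update", "post_update"],
                ViewOp::Remove { .. } => ["pre_delete", "post_delete"],
            })
            .collect()
    }
}
```

Keeping the view operations in their own list preserves what the storage writes alone cannot express, for instance that a removal followed by a set was a single update.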
impl FlowTransaction
pub fn new(parent: &StandardCommandTransaction, version: CommitVersion) -> Self
Create a new FlowTransaction from a parent transaction at a specific CDC version
Takes a shared reference to the parent, allowing multiple FlowTransactions to be created for parallel processing.
§Parameters
- parent - The parent command transaction to derive from
- version - The CDC event version for snapshot isolation (NOT parent.version())
pub fn version(&self) -> CommitVersion
Get the version this transaction is reading at
pub fn update_version(&mut self, new_version: CommitVersion)
Update the transaction to read at a new version
pub fn metrics(&self) -> &FlowTransactionMetrics
Get immutable reference to the metrics