FlowTransaction

Struct FlowTransaction 

pub struct FlowTransaction { /* private fields */ }

A transaction wrapper for parallel flow processing with snapshot isolation.

§Architecture

FlowTransaction enables parallel processing of independent data flows by providing:

  1. Snapshot reads - via a wrapped StandardQueryTransaction reading at a fixed version
  2. Isolated writes - via a local PendingWrites buffer unique to this transaction
  3. Sequential merge - buffered writes are applied back to the parent at commit time

§Read Path

All reads go through the wrapped query transaction, which provides a consistent snapshot view of the database at the transaction's fixed version. The query transaction is read-only and cannot modify the underlying storage.

For keys that have been modified locally:

  • Reads check the pending buffer first
  • If found there (or marked for removal), return the local value
  • Otherwise, delegate to the query transaction for the snapshot value
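The lookup order above can be sketched with plain standard-library maps. This is a simplified model, not the crate's implementation: ReadModel, Pending, and the byte-vector keys are hypothetical stand-ins for the query transaction, the pending buffer, and EncodedKey / EncodedValues.

```rust
use std::collections::HashMap;

/// A locally buffered operation (hypothetical stand-in for a pending-buffer entry).
#[derive(Clone, Debug, PartialEq)]
enum Pending {
    Set(Vec<u8>),
    Remove,
}

/// Simplified model: `snapshot` plays the role of the query transaction's
/// fixed-version view, `pending` the role of the local write buffer.
struct ReadModel {
    snapshot: HashMap<Vec<u8>, Vec<u8>>,
    pending: HashMap<Vec<u8>, Pending>,
}

impl ReadModel {
    /// Pending entries win; a buffered removal hides the snapshot value.
    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        match self.pending.get(key) {
            Some(Pending::Set(value)) => Some(value.clone()),
            Some(Pending::Remove) => None,
            // Not modified locally: fall back to the snapshot.
            None => self.snapshot.get(key).cloned(),
        }
    }
}
```

In this model a key marked for removal reads as absent even though the snapshot still holds a value for it.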

§Write Path

All writes (set, remove) go to the local pending buffer only:

  • Writes are NOT visible to the parent transaction
  • Writes are NOT visible to other FlowTransactions
  • Writes are NOT persisted to storage

The pending buffer is committed back to the parent transaction via commit().
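A minimal sketch of this write isolation, assuming a parent that can be modeled as an ordered map. Buffer, Op, and the map-based parent are hypothetical names for illustration only; the real commit() takes a StandardCommandTransaction.

```rust
use std::collections::BTreeMap;

/// Buffered operation kinds (hypothetical stand-in for pending-buffer entries).
#[derive(Debug)]
enum Op {
    Set(Vec<u8>),
    Remove,
}

/// Local write buffer: nothing here is visible to the parent until `commit`.
#[derive(Default)]
struct Buffer {
    ops: BTreeMap<Vec<u8>, Op>,
}

impl Buffer {
    fn set(&mut self, key: &[u8], value: Vec<u8>) {
        self.ops.insert(key.to_vec(), Op::Set(value));
    }

    fn remove(&mut self, key: &[u8]) {
        self.ops.insert(key.to_vec(), Op::Remove);
    }

    /// Apply every buffered operation to the parent. The buffer is left
    /// intact, mirroring the documented "NOT cleared" behaviour.
    fn commit(&self, parent: &mut BTreeMap<Vec<u8>, Vec<u8>>) {
        for (key, op) in &self.ops {
            match op {
                Op::Set(value) => {
                    parent.insert(key.clone(), value.clone());
                }
                Op::Remove => {
                    parent.remove(key);
                }
            }
        }
    }
}
```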

§Parallel Processing Pattern

let mut parent = engine.begin_command()?;

// Create one FlowTransaction per CDC event from a shared parent reference.
// Each uses the CDC event version for proper snapshot isolation.
// FlowTransaction::new is async, so it is awaited in a loop rather than inside map().
let mut flow_txns: Vec<FlowTransaction> = Vec::with_capacity(cdc_events.len());
for cdc in cdc_events.iter() {
    flow_txns.push(FlowTransaction::new(&parent, cdc.version).await);
}

// Process in parallel (e.g., using rayon)
let results: Vec<FlowTransaction> = flow_txns
    .into_par_iter()
    .map(|mut txn| {
        // Process flow, making reads and writes
        process_flow(&mut txn)?;
        Ok(txn)
    })
    .collect::<Result<Vec<_>>>()?;

// Sequential merge back to parent (commit is async and takes &mut self)
for mut flow_txn in results {
    flow_txn.commit(&mut parent).await?;
}

// Atomic commit of all changes
parent.commit()?;

§Thread Safety

FlowTransaction implements Send because:

  • version is Copy
  • query wraps an Arc-based multi-version transaction (Send + Sync)
  • pending and metrics are owned and not shared

This allows FlowTransactions to be moved to worker threads for parallel processing.
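The Send reasoning can be checked at compile time with a small helper. The sketch below uses a hypothetical SketchTxn whose fields mirror the ones listed above (a Copy version, an Arc-wrapped read view, an owned buffer); it is not the real struct layout.

```rust
use std::collections::HashMap;
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in with the same shape as the fields described above.
struct SketchTxn {
    version: u64,                        // Copy
    query: Arc<HashMap<String, String>>, // shared, read-only view
    pending: HashMap<String, String>,    // owned, not shared
}

/// Compiles only if `T` is `Send`; calling it is a zero-cost static check.
fn assert_send<T: Send>() {}

/// Move the transaction to a worker thread, mutate its local buffer there,
/// and hand it back to the caller.
fn run_on_worker(mut txn: SketchTxn) -> SketchTxn {
    thread::spawn(move || {
        txn.pending
            .insert("k".to_string(), format!("v{}", txn.version));
        txn
    })
    .join()
    .expect("worker thread panicked")
}
```

Calling assert_send::<SketchTxn>() fails to compile if any field stops being Send, for example if the Arc were replaced with an Rc.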

Implementations§

impl FlowTransaction

pub async fn commit( &mut self, parent: &mut StandardCommandTransaction, ) -> Result<FlowTransactionMetrics>

Commit all pending writes and removes to the parent transaction

Takes the parent transaction as a mutable reference to apply buffered operations. This allows the FlowTransaction to be reused for subsequent units of work. The pending buffer is NOT cleared to maintain read-your-own-writes semantics.

Returns the transaction metrics.

§Errors

Returns an error if any key in this FlowTransaction overlaps with keys already written by another FlowTransaction to the same parent. FlowTransactions must operate on non-overlapping keyspaces.
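One way such an overlap check could work is to have the parent remember which transaction first wrote each key. This is a sketch under that assumption; ParentSketch, KeyOverlap, and the numeric txn_id are hypothetical and do not describe the crate's actual bookkeeping.

```rust
use std::collections::BTreeMap;

/// Error carrying the first conflicting key (hypothetical type).
#[derive(Debug, PartialEq)]
struct KeyOverlap(Vec<u8>);

/// Parent model that records which transaction wrote each key.
#[derive(Default)]
struct ParentSketch {
    data: BTreeMap<Vec<u8>, Vec<u8>>,
    owners: BTreeMap<Vec<u8>, u32>,
}

impl ParentSketch {
    /// Merge one transaction's buffer; `None` values are removals.
    fn merge(
        &mut self,
        txn_id: u32,
        pending: &BTreeMap<Vec<u8>, Option<Vec<u8>>>,
    ) -> Result<(), KeyOverlap> {
        // Validate first so a rejected merge leaves the parent untouched.
        for key in pending.keys() {
            if let Some(owner) = self.owners.get(key) {
                if *owner != txn_id {
                    return Err(KeyOverlap(key.clone()));
                }
            }
        }
        for (key, value) in pending {
            match value {
                Some(v) => {
                    self.data.insert(key.clone(), v.clone());
                }
                None => {
                    self.data.remove(key);
                }
            }
            self.owners.insert(key.clone(), txn_id);
        }
        Ok(())
    }
}
```

Because ownership is tracked per transaction, the same transaction can merge again after further work without tripping over its own keys.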

impl FlowTransaction

pub async fn get(&mut self, key: &EncodedKey) -> Result<Option<EncodedValues>>

Get a value by key, checking pending writes first, then querying the multi-version store

pub async fn contains_key(&mut self, key: &EncodedKey) -> Result<bool>

Check if a key exists

pub async fn range( &mut self, range: EncodedKeyRange, ) -> Result<MultiVersionBatch>

Range query

pub async fn range_batched( &mut self, range: EncodedKeyRange, batch_size: u64, ) -> Result<MultiVersionBatch>

Range query with batching

pub async fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch>

Prefix scan
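A prefix scan is commonly expressed as a range query whose upper bound is the smallest key that no longer starts with the prefix. The sketch below shows that relationship on a std BTreeMap; prefix_end and prefix_scan are hypothetical helpers, and nothing here is confirmed to match how prefix() or range() are implemented in this crate.

```rust
use std::collections::BTreeMap;
use std::ops::Bound;

/// Exclusive upper bound for all keys starting with `prefix`:
/// drop trailing 0xFF bytes, then increment the last remaining byte.
/// An empty or all-0xFF prefix has no finite upper bound.
fn prefix_end(prefix: &[u8]) -> Bound<Vec<u8>> {
    let mut end = prefix.to_vec();
    while let Some(last) = end.pop() {
        if last < u8::MAX {
            end.push(last + 1);
            return Bound::Excluded(end);
        }
    }
    Bound::Unbounded
}

/// Return every entry whose key starts with `prefix`, in key order.
fn prefix_scan<'a>(
    store: &'a BTreeMap<Vec<u8>, Vec<u8>>,
    prefix: &[u8],
) -> Vec<(&'a [u8], &'a [u8])> {
    store
        .range::<Vec<u8>, _>((Bound::Included(prefix.to_vec()), prefix_end(prefix)))
        .map(|(k, v)| (k.as_slice(), v.as_slice()))
        .collect()
}
```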

impl FlowTransaction

pub async fn state_get( &mut self, id: FlowNodeId, key: &EncodedKey, ) -> Result<Option<EncodedValues>>

Get state for a specific flow node and key

pub fn state_set( &mut self, id: FlowNodeId, key: &EncodedKey, value: EncodedValues, ) -> Result<()>

Set state for a specific flow node and key

pub fn state_remove(&mut self, id: FlowNodeId, key: &EncodedKey) -> Result<()>

Remove state for a specific flow node and key

pub async fn state_scan(&mut self, id: FlowNodeId) -> Result<MultiVersionBatch>

Scan all state for a specific flow node

pub async fn state_range( &mut self, id: FlowNodeId, range: EncodedKeyRange, ) -> Result<MultiVersionBatch>

Range query on state for a specific flow node

pub async fn state_clear(&mut self, id: FlowNodeId) -> Result<()>

Clear all state for a specific flow node

pub async fn load_or_create_row( &mut self, id: FlowNodeId, key: &EncodedKey, layout: &EncodedValuesLayout, ) -> Result<EncodedValues>

Load the state row for a key, creating it if it does not exist

pub fn save_row( &mut self, id: FlowNodeId, key: &EncodedKey, row: EncodedValues, ) -> Result<()>

Save an encoded row as state for a specific flow node and key
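The per-node state methods can be pictured as a map keyed by (node id, key). The sketch below is an illustration under that assumption: NodeState, the u64 node id, and the row width parameter are hypothetical stand-ins for FlowNodeId and EncodedValuesLayout. In particular, the sketch's load_or_create_row only returns a zeroed row when nothing is stored; whether the real method also buffers the new row is not stated here.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for FlowNodeId.
type NodeId = u64;

/// State rows scoped by the flow node that owns them.
#[derive(Default)]
struct NodeState {
    rows: BTreeMap<(NodeId, Vec<u8>), Vec<u8>>,
}

impl NodeState {
    fn state_set(&mut self, id: NodeId, key: &[u8], value: Vec<u8>) {
        self.rows.insert((id, key.to_vec()), value);
    }

    fn state_get(&self, id: NodeId, key: &[u8]) -> Option<&Vec<u8>> {
        self.rows.get(&(id, key.to_vec()))
    }

    /// Drop every row owned by `id`, leaving other nodes untouched.
    fn state_clear(&mut self, id: NodeId) {
        self.rows.retain(|(node, _), _| *node != id);
    }

    /// Return the stored row, or a zero-filled row of `width` bytes
    /// (standing in for a row built from a layout) when none exists.
    fn load_or_create_row(&self, id: NodeId, key: &[u8], width: usize) -> Vec<u8> {
        self.state_get(id, key)
            .cloned()
            .unwrap_or_else(|| vec![0u8; width])
    }
}
```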

impl FlowTransaction

pub fn set(&mut self, key: &EncodedKey, value: EncodedValues) -> Result<()>

Set a value, buffering it in pending writes

pub fn remove(&mut self, key: &EncodedKey) -> Result<()>

Remove a key, buffering the deletion in pending operations

impl FlowTransaction

pub async fn new( parent: &StandardCommandTransaction, version: CommitVersion, ) -> Self

Create a new FlowTransaction from a parent transaction at a specific CDC version

Takes a shared reference to the parent, allowing multiple FlowTransactions to be created for parallel processing.

§Parameters
  • parent - The parent command transaction to derive from
  • version - The CDC event version for snapshot isolation (NOT parent.version())

pub fn version(&self) -> CommitVersion

Get the version this transaction is reading at

pub async fn update_version(&mut self, new_version: CommitVersion)

Update the transaction to read at a new version
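To illustrate what reading "at a version" means, the sketch below models a multi-version store in which each key keeps a history of values by commit version, and a reader sees the newest entry at or below its own version. VersionedStore and Reader are hypothetical; the real storage layout and the effect of update_version on the pending buffer are not described here.

```rust
use std::collections::BTreeMap;

/// Toy multi-version store: key -> (commit version -> value).
/// A `None` value marks a deletion at that version.
#[derive(Default)]
struct VersionedStore {
    history: BTreeMap<Vec<u8>, BTreeMap<u64, Option<Vec<u8>>>>,
}

impl VersionedStore {
    fn put(&mut self, key: &[u8], version: u64, value: Option<Vec<u8>>) {
        self.history
            .entry(key.to_vec())
            .or_default()
            .insert(version, value);
    }

    /// Newest value written at or before `version`, if any.
    fn get_at(&self, key: &[u8], version: u64) -> Option<&Vec<u8>> {
        self.history
            .get(key)?
            .range(..=version)
            .next_back()?
            .1
            .as_ref()
    }
}

/// Reader pinned to one version until explicitly moved.
struct Reader {
    version: u64,
}

impl Reader {
    fn update_version(&mut self, new_version: u64) {
        self.version = new_version;
    }

    fn get<'a>(&self, store: &'a VersionedStore, key: &[u8]) -> Option<&'a Vec<u8>> {
        store.get_at(key, self.version)
    }
}
```

Moving the reader forward changes which history entry it observes without touching the store itself.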

pub fn metrics(&self) -> &FlowTransactionMetrics

Get an immutable reference to the metrics
