Enum FlowTransaction 

Source
pub enum FlowTransaction {
    Deferred {
        inner: FlowTransactionInner,
    },
    Transactional {
        inner: FlowTransactionInner,
        base_pending: Pending,
        view_overlay: Arc<Vec<Change>>,
    },
    Ephemeral {
        inner: FlowTransactionInner,
        state: HashMap<EncodedKey, EncodedRow>,
    },
}

Variants§

Deferred

CDC-driven asynchronous flow processing. Reads only from committed storage plus the flow's own pending writes.

Transactional

Inline flow processing within a committing transaction. Can additionally read uncommitted writes from the parent transaction.

Fields

§base_pending: Pending

Read-only snapshot of the committing transaction’s KV writes.

§view_overlay: Arc<Vec<Change>>

View outputs produced by sibling flows in earlier execution levels of this pre-commit. Consulted by view-reading pull paths to overlay in-transaction writes on top of the read_version snapshot. Empty for the first level.

Ephemeral

Ephemeral subscription flow processing.

Operator state lives in an in-memory HashMap instead of the multi-version store; source reads go through query at the CDC version. No writes are committed to persistent storage.

Fields

§state: HashMap<EncodedKey, EncodedRow>

In-memory operator state, replacing state_query for FlowNodeState keys.

Implementations§

Source§

impl FlowTransaction

Source

pub fn get(&mut self, key: &EncodedKey) -> Result<Option<EncodedRow>>

Get a value by key, checking pending writes first, then (if transactional) base_pending, then the multi-version store.
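
The lookup order described for get can be sketched with plain maps. This is a minimal illustration, not the crate's implementation: the `Layers` and `Write` types and the `Vec<u8>` keys and values are hypothetical stand-ins, and the sketch assumes that a buffered removal in an upper layer hides any value below it.

```rust
use std::collections::HashMap;

/// A buffered write: either a new value or a tombstone for a removed key.
#[derive(Clone, Debug, PartialEq)]
enum Write {
    Set(Vec<u8>),
    Remove,
}

/// Hypothetical stand-in for the read layers consulted by `get`.
struct Layers {
    /// Writes buffered by the flow itself.
    pending: HashMap<Vec<u8>, Write>,
    /// Parent transaction writes; `Some` only in the transactional case.
    base_pending: Option<HashMap<Vec<u8>, Write>>,
    /// Stand-in for the committed multi-version store at the read version.
    committed: HashMap<Vec<u8>, Vec<u8>>,
}

impl Layers {
    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        // 1. The flow's own pending writes take priority.
        if let Some(write) = self.pending.get(key) {
            return Self::resolve(write);
        }
        // 2. Then the parent's uncommitted writes, if there are any.
        if let Some(write) = self.base_pending.as_ref().and_then(|base| base.get(key)) {
            return Self::resolve(write);
        }
        // 3. Finally fall back to committed data.
        self.committed.get(key).cloned()
    }

    /// A tombstone hides the key; a set yields its value.
    fn resolve(write: &Write) -> Option<Vec<u8>> {
        match write {
            Write::Set(value) => Some(value.clone()),
            Write::Remove => None,
        }
    }
}
```

With `base_pending` set to `None` the same function models the deferred case, where step 2 is skipped.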

Source

pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool>

Check whether a key exists.

Source

pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch>

Prefix scan

Source

pub fn range( &mut self, range: EncodedKeyRange, batch_size: usize, ) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_>

Create an iterator for forward range queries.

This properly handles high version density by scanning until batch_size unique logical keys are collected. The iterator yields individual entries and maintains cursor state internally. Pending writes are merged with committed storage data.
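
Why counting unique logical keys matters can be shown with a small model. The sketch below is an assumption about the general technique, not the crate's iterator: `Versioned` and `next_batch` are hypothetical, entries are assumed sorted by key ascending and version descending, and the merge with pending writes is left out.

```rust
/// One physical entry in a hypothetical multi-version store.
#[derive(Clone, Debug, PartialEq)]
struct Versioned {
    key: String,
    version: u64,
    value: String,
}

/// Scan `entries` (sorted by key ascending, then version descending) from
/// index `start`. Returns the newest entry visible at `read_version` for up to
/// `batch_size` distinct keys, plus the index where the next batch resumes.
fn next_batch(
    entries: &[Versioned],
    start: usize,
    read_version: u64,
    batch_size: usize,
) -> (Vec<Versioned>, usize) {
    let mut batch: Vec<Versioned> = Vec::new();
    let mut i = start;
    while i < entries.len() {
        let entry = &entries[i];
        // Versions newer than the snapshot are not visible to this reader.
        let invisible = entry.version > read_version;
        // Older versions of a key that was already emitted are skipped.
        let already_emitted = batch.last().map_or(false, |prev| prev.key == entry.key);
        if !invisible && !already_emitted {
            if batch.len() == batch_size {
                break; // `i` is the first entry of the next batch
            }
            batch.push(entry.clone());
        }
        i += 1;
    }
    (batch, i)
}
```

A scan that stopped after `batch_size` physical entries could return fewer logical keys than requested whenever one key has many versions. Counting distinct keys avoids that.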

Source

pub fn range_rev( &mut self, range: EncodedKeyRange, batch_size: usize, ) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_>

Create an iterator for reverse range queries.

This properly handles high version density by scanning until batch_size unique logical keys are collected. The iterator yields individual entries in reverse key order and maintains cursor state internally.

Source§

impl FlowTransaction

Source

pub fn state_get( &mut self, id: FlowNodeId, key: &EncodedKey, ) -> Result<Option<EncodedRow>>

Source

pub fn state_set( &mut self, id: FlowNodeId, key: &EncodedKey, value: EncodedRow, ) -> Result<()>

Source

pub fn state_remove(&mut self, id: FlowNodeId, key: &EncodedKey) -> Result<()>

Source

pub fn state_scan(&mut self, id: FlowNodeId) -> Result<MultiVersionBatch>

Source

pub fn state_range( &mut self, id: FlowNodeId, range: EncodedKeyRange, ) -> Result<MultiVersionBatch>

Source

pub fn state_clear(&mut self, id: FlowNodeId) -> Result<()>

Source

pub fn load_or_create_row( &mut self, id: FlowNodeId, key: &EncodedKey, shape: &RowShape, ) -> Result<EncodedRow>

Source

pub fn save_row( &mut self, id: FlowNodeId, key: &EncodedKey, row: EncodedRow, ) -> Result<()>

Source§

impl FlowTransaction

Source

pub fn set(&mut self, key: &EncodedKey, value: EncodedRow) -> Result<()>

Set a value, buffering it in pending writes.

Source

pub fn remove(&mut self, key: &EncodedKey) -> Result<()>

Remove a key, buffering the deletion in pending writes.

Source§

impl FlowTransaction

Source

pub fn deferred( parent: &AdminTransaction, version: CommitVersion, catalog: Catalog, interceptors: Interceptors, clock: Clock, ) -> Self

Create a deferred (CDC) FlowTransaction from a parent transaction.

Used by the async worker path. Reads only from committed storage plus flow-generated pending writes; there is no base pending from a parent transaction.

Source

pub fn deferred_from_parts( version: CommitVersion, pending: Pending, query: MultiReadTransaction, state_query: MultiReadTransaction, catalog: Catalog, interceptors: Interceptors, clock: Clock, ) -> Self

Create a deferred (CDC) FlowTransaction from pre-built parts.

Used by the worker actor which creates its own query transactions.

Source

pub fn transactional(params: TransactionalParams) -> Self

Create a transactional (inline) FlowTransaction.

Used by the pre-commit interceptor path. base_pending is a read-only snapshot of the committing transaction’s KV writes so that flow operators can see uncommitted row data.

Source

pub fn view_overlay(&self) -> Option<Arc<Vec<Change>>>

Return a (cheap) clone of the in-transaction view overlay, if any. Returns None for Deferred / Ephemeral transactions (which read everything from committed storage). Operators reading from a view parent overlay these changes on top of their storage reads so sibling transactional view outputs produced earlier in the same pre-commit are visible.
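
How an operator might apply such an overlay on top of a snapshot read can be sketched as follows. The shape of `Change` is not shown on this page, so the enum below is a hypothetical stand-in, as are `read_view` and the `BTreeMap` snapshot.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

/// Hypothetical change record; the real `Change` type may look different.
#[derive(Clone, Debug, PartialEq)]
enum Change {
    Upsert { key: u64, row: String },
    Delete { key: u64 },
}

/// Read a view: start from the snapshot rows, then replay the overlay in order
/// so that outputs written earlier in the same pre-commit become visible.
fn read_view(
    snapshot: &BTreeMap<u64, String>,
    overlay: Option<Arc<Vec<Change>>>,
) -> BTreeMap<u64, String> {
    let mut rows = snapshot.clone();
    // `None` models the Deferred / Ephemeral case: nothing to overlay.
    if let Some(changes) = overlay {
        for change in changes.iter() {
            match change {
                Change::Upsert { key, row } => {
                    rows.insert(*key, row.clone());
                }
                Change::Delete { key } => {
                    rows.remove(key);
                }
            }
        }
    }
    rows
}
```

Cloning the `Arc` only bumps a reference count, which is why handing the overlay to each reader is cheap regardless of how many changes it holds.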

Source

pub fn ephemeral( version: CommitVersion, query: MultiReadTransaction, catalog: Catalog, state: HashMap<EncodedKey, EncodedRow>, clock: Clock, ) -> Self

Create an ephemeral (subscription) FlowTransaction.

Operator state is backed by an in-memory HashMap. Source data reads go through query at the specified version. State reads go to state instead of the multi-version store.

Source

pub fn merge_state(&mut self)

Merge pending state writes back into the ephemeral state HashMap.

After flow processing, pending writes contain both state mutations and subscription output writes. This method merges state mutations (keys matching FlowNodeState/FlowNodeInternalState) back into state and clears pending.

Only applicable to the Ephemeral variant; a no-op for the others.
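
The merge step can be sketched as a filter over the pending buffer. Everything here is a simplified assumption: `Key`, `Write`, and the free function `merge_state` are hypothetical, and the sketch simply drops non-state writes when it clears pending, without modelling how subscription output reaches the subscriber.

```rust
use std::collections::HashMap;

/// Hypothetical key kinds; the real crate classifies `EncodedKey` values.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
enum Key {
    FlowNodeState(u64, Vec<u8>),
    SubscriptionOutput(Vec<u8>),
}

/// A buffered write: a new value or a removal.
enum Write {
    Set(Vec<u8>),
    Remove,
}

/// Apply state mutations from `pending` to `state`, then leave `pending` empty.
fn merge_state(state: &mut HashMap<Key, Vec<u8>>, pending: &mut Vec<(Key, Write)>) {
    // `drain(..)` empties the buffer, which models "clears pending".
    for (key, write) in pending.drain(..) {
        // Only operator state keys are merged back; other writes are skipped.
        if !matches!(key, Key::FlowNodeState(..)) {
            continue;
        }
        match write {
            Write::Set(value) => {
                state.insert(key, value);
            }
            Write::Remove => {
                state.remove(&key);
            }
        }
    }
}
```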

Source

pub fn take_state(&mut self) -> HashMap<EncodedKey, EncodedRow>

Extract the ephemeral state HashMap, consuming the state from this transaction.

Used to persist ephemeral state across CDC batches. Only applicable to the Ephemeral variant; returns an empty HashMap for the others.
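
Carrying state from one batch to the next might look like the loop below. This is a toy model under stated assumptions: `EphemeralTxn`, its counting `process` step, and `run_batches` are hypothetical, and the real constructor takes more arguments than a state map.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for an ephemeral transaction holding only state.
struct EphemeralTxn {
    state: HashMap<String, u64>,
}

impl EphemeralTxn {
    fn new(state: HashMap<String, u64>) -> Self {
        Self { state }
    }

    /// Toy operator: count how many events were seen per key.
    fn process(&mut self, batch: &[&str]) {
        for key in batch {
            *self.state.entry(key.to_string()).or_insert(0) += 1;
        }
    }

    /// Hand the state map back to the caller, leaving an empty map behind.
    fn take_state(&mut self) -> HashMap<String, u64> {
        std::mem::take(&mut self.state)
    }
}

/// Run one transaction per batch, threading the state through each of them.
fn run_batches(batches: &[Vec<&str>]) -> HashMap<String, u64> {
    let mut carried = HashMap::new();
    for batch in batches {
        let mut txn = EphemeralTxn::new(carried);
        txn.process(batch);
        // Nothing is written to persistent storage; the map is the only state.
        carried = txn.take_state();
    }
    carried
}
```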

Source

pub fn version(&self) -> CommitVersion

Get the transaction version.

Source

pub fn take_pending(&mut self) -> Pending

Extract pending writes, replacing them with an empty buffer.

Source

pub fn take_pending_shapes(&mut self) -> Vec<RowShape>

Extract pending shapes, replacing them with an empty buffer.

Source

pub fn track_flow_change(&mut self, change: Change)

Track a view-level flow change in this transaction’s accumulator.

Source

pub fn take_accumulator_entries(&mut self) -> Vec<(ShapeId, Diff)>

Drain the accumulator entries collected during flow processing.

Source

pub fn update_version(&mut self, new_version: CommitVersion)

Update the transaction to read at a new version.

Source

pub fn catalog(&self) -> &Catalog

Get access to the catalog for reading metadata.

Source

pub fn clock(&self) -> &Clock

Get access to the clock for timestamp generation.

Source

pub fn operator_state<S, F>( &mut self, node: FlowNodeId, load: F, ) -> Result<&mut S>
where S: 'static + Send, F: FnOnce(&mut Self) -> Result<(S, PersistFn)>,

Get or initialise the cached operator state for node.

On first access in this transaction, load is invoked to build (state, persist): state is held boxed and persist is stashed for flush_operator_states. On subsequent calls, the cached state is returned directly without re-decoding, regardless of how many batches have flowed through.

Operators that mutate the returned &mut S must call mark_state_dirty(node) so the slot is persisted at flush time.

Source

pub fn mark_state_dirty(&mut self, node: FlowNodeId)

Mark the cached state for node as dirty so it is persisted on flush. No-op if the slot does not exist (e.g. operator never touched state).
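
The caching behaviour of operator_state and mark_state_dirty can be sketched with a type-erased slot map. This is a simplified assumption, not the crate's code: `StateCache`, `Slot`, and `dirty_nodes` are hypothetical, the loader takes no arguments and cannot fail, and the persist closure is left out.

```rust
use std::any::Any;
use std::collections::HashMap;

type NodeId = u64;

/// One cached operator state plus its dirty flag.
struct Slot {
    state: Box<dyn Any + Send>,
    dirty: bool,
}

#[derive(Default)]
struct StateCache {
    slots: HashMap<NodeId, Slot>,
    /// How many times a loader actually ran (for illustration only).
    loads: usize,
}

impl StateCache {
    /// Return the cached state for `node`, running `load` only on first access.
    fn operator_state<S, F>(&mut self, node: NodeId, load: F) -> &mut S
    where
        S: 'static + Send,
        F: FnOnce() -> S,
    {
        if !self.slots.contains_key(&node) {
            self.loads += 1;
            let slot = Slot { state: Box::new(load()), dirty: false };
            self.slots.insert(node, slot);
        }
        self.slots
            .get_mut(&node)
            .expect("slot exists after the check above")
            .state
            .downcast_mut::<S>()
            .expect("operator state requested with a different type")
    }

    /// Flag the slot for persistence; does nothing if the slot is missing.
    fn mark_state_dirty(&mut self, node: NodeId) {
        if let Some(slot) = self.slots.get_mut(&node) {
            slot.dirty = true;
        }
    }

    /// Nodes whose state would be persisted at flush time, in ascending order.
    fn dirty_nodes(&self) -> Vec<NodeId> {
        let mut nodes: Vec<NodeId> = self
            .slots
            .iter()
            .filter(|(_, slot)| slot.dirty)
            .map(|(node, _)| *node)
            .collect();
        nodes.sort_unstable();
        nodes
    }
}
```

Mutating through the returned reference does not set the dirty flag on its own, which is why the explicit mark_state_dirty call is required.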

Source

pub fn take_operator_state<S, F>( &mut self, node: FlowNodeId, load: F, ) -> Result<(S, PersistFn)>
where S: 'static + Send, F: FnOnce(&mut Self) -> Result<(S, PersistFn)>,

Take the cached state for node, returning the owned value and the existing persist closure. Used by operators whose helpers need &mut FlowTransaction alongside &mut State (e.g. Take calling parent.pull(txn, ...) while mutating its TakeState). Pair every take_operator_state with put_operator_state before returning so the slot is restored for the next batch.

Source

pub fn put_operator_state<S>( &mut self, node: FlowNodeId, state: S, persist: PersistFn, )
where S: 'static + Send,

Restore a state slot taken by take_operator_state, marking it dirty since the operator just mutated it.
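
The reason for the take/put pair is a borrow conflict: a reference obtained from the transaction cannot stay alive while the transaction is borrowed mutably again. A minimal sketch of the pattern, with a hypothetical `Txn`, a concrete `Vec<u32>` state, and no persist closure:

```rust
use std::collections::HashMap;

type NodeId = u64;

/// Hypothetical transaction: cached slots plus a counter standing in for
/// upstream data.
#[derive(Default)]
struct Txn {
    /// Cached operator state per node: (state, dirty flag).
    slots: HashMap<NodeId, (Vec<u32>, bool)>,
    pulled: u32,
}

impl Txn {
    /// Move the state out of its slot, or start empty if there is none.
    fn take_operator_state(&mut self, node: NodeId) -> Vec<u32> {
        self.slots.remove(&node).map(|(state, _)| state).unwrap_or_default()
    }

    /// Restore the slot and mark it dirty.
    fn put_operator_state(&mut self, node: NodeId, state: Vec<u32>) {
        self.slots.insert(node, (state, true));
    }

    /// A helper that needs the whole transaction mutably, like a parent pull.
    fn pull(&mut self) -> u32 {
        self.pulled += 1;
        self.pulled
    }
}

/// Fill the operator's state up to `limit` rows by pulling from the parent.
fn take_operator(txn: &mut Txn, node: NodeId, limit: usize) {
    let mut state = txn.take_operator_state(node);
    while state.len() < limit {
        // Allowed because `state` is owned here, not borrowed from `txn`.
        let row = txn.pull();
        state.push(row);
    }
    // Always put the state back so the next batch finds the slot.
    txn.put_operator_state(node, state);
}
```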

Source

pub fn flush_operator_states(&mut self) -> Result<()>

Drain dirty slots and persist them. Must be called before take_pending so the resulting state writes are included in the pending buffer that the caller later commits.
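
The ordering requirement can be illustrated with two buffers. The `Txn` below is a hypothetical stand-in with string keys and values; only the call order is the point.

```rust
/// Hypothetical transaction with just the two buffers that matter here.
#[derive(Default)]
struct Txn {
    /// Dirty operator state waiting to be persisted: (node id, encoded state).
    dirty_state: Vec<(u64, String)>,
    /// Buffered key/value writes that the caller will commit.
    pending: Vec<(String, String)>,
}

impl Txn {
    /// Turn every dirty slot into a pending write.
    fn flush_operator_states(&mut self) {
        for (node, encoded) in self.dirty_state.drain(..) {
            self.pending.push((format!("state/{}", node), encoded));
        }
    }

    /// Hand the pending writes to the caller, leaving an empty buffer.
    fn take_pending(&mut self) -> Vec<(String, String)> {
        std::mem::take(&mut self.pending)
    }
}

/// Correct order: flush first, so state writes are part of what gets committed.
fn finish(mut txn: Txn) -> Vec<(String, String)> {
    txn.flush_operator_states();
    txn.take_pending()
}
```

Calling `take_pending` first would return the row writes only; the state writes would land in a buffer that nobody commits.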

Trait Implementations§

Source§

impl WithInterceptors for FlowTransaction

Source§

fn table_row_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn TableRowPreInsertInterceptor + Send + Sync>

Source§

fn table_row_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn TableRowPostInsertInterceptor + Send + Sync>

Source§

fn table_row_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn TableRowPreUpdateInterceptor + Send + Sync>

Source§

fn table_row_post_update_interceptors( &mut self, ) -> &mut Chain<dyn TableRowPostUpdateInterceptor + Send + Sync>

Source§

fn table_row_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn TableRowPreDeleteInterceptor + Send + Sync>

Source§

fn table_row_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn TableRowPostDeleteInterceptor + Send + Sync>

Source§

fn ringbuffer_row_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferRowPreInsertInterceptor + Send + Sync>

Source§

fn ringbuffer_row_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferRowPostInsertInterceptor + Send + Sync>

Source§

fn ringbuffer_row_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferRowPreUpdateInterceptor + Send + Sync>

Source§

fn ringbuffer_row_post_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferRowPostUpdateInterceptor + Send + Sync>

Source§

fn ringbuffer_row_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferRowPreDeleteInterceptor + Send + Sync>

Source§

fn ringbuffer_row_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferRowPostDeleteInterceptor + Send + Sync>

Source§

fn pre_commit_interceptors( &mut self, ) -> &mut Chain<dyn PreCommitInterceptor + Send + Sync>

Source§

fn post_commit_interceptors( &mut self, ) -> &mut Chain<dyn PostCommitInterceptor + Send + Sync>

Source§

fn namespace_post_create_interceptors( &mut self, ) -> &mut Chain<dyn NamespacePostCreateInterceptor + Send + Sync>

Source§

fn namespace_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn NamespacePreUpdateInterceptor + Send + Sync>

Source§

fn namespace_post_update_interceptors( &mut self, ) -> &mut Chain<dyn NamespacePostUpdateInterceptor + Send + Sync>

Source§

fn namespace_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn NamespacePreDeleteInterceptor + Send + Sync>

Source§

fn table_post_create_interceptors( &mut self, ) -> &mut Chain<dyn TablePostCreateInterceptor + Send + Sync>

Source§

fn table_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn TablePreUpdateInterceptor + Send + Sync>

Source§

fn table_post_update_interceptors( &mut self, ) -> &mut Chain<dyn TablePostUpdateInterceptor + Send + Sync>

Source§

fn table_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn TablePreDeleteInterceptor + Send + Sync>

Source§

fn view_row_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn ViewRowPreInsertInterceptor + Send + Sync>

Source§

fn view_row_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn ViewRowPostInsertInterceptor + Send + Sync>

Source§

fn view_row_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewRowPreUpdateInterceptor + Send + Sync>

Source§

fn view_row_post_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewRowPostUpdateInterceptor + Send + Sync>

Source§

fn view_row_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn ViewRowPreDeleteInterceptor + Send + Sync>

Source§

fn view_row_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn ViewRowPostDeleteInterceptor + Send + Sync>

Source§

fn view_post_create_interceptors( &mut self, ) -> &mut Chain<dyn ViewPostCreateInterceptor + Send + Sync>

Source§

fn view_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewPreUpdateInterceptor + Send + Sync>

Source§

fn view_post_update_interceptors( &mut self, ) -> &mut Chain<dyn ViewPostUpdateInterceptor + Send + Sync>

Source§

fn view_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn ViewPreDeleteInterceptor + Send + Sync>

Source§

fn ringbuffer_post_create_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPostCreateInterceptor + Send + Sync>

Source§

fn ringbuffer_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPreUpdateInterceptor + Send + Sync>

Source§

fn ringbuffer_post_update_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPostUpdateInterceptor + Send + Sync>

Source§

fn ringbuffer_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn RingBufferPreDeleteInterceptor + Send + Sync>

Source§

fn dictionary_row_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryRowPreInsertInterceptor + Send + Sync>

Source§

fn dictionary_row_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryRowPostInsertInterceptor + Send + Sync>

Source§

fn dictionary_row_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryRowPreUpdateInterceptor + Send + Sync>

Source§

fn dictionary_row_post_update_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryRowPostUpdateInterceptor + Send + Sync>

Source§

fn dictionary_row_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryRowPreDeleteInterceptor + Send + Sync>

Source§

fn dictionary_row_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryRowPostDeleteInterceptor + Send + Sync>

Source§

fn dictionary_post_create_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryPostCreateInterceptor + Send + Sync>

Source§

fn dictionary_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryPreUpdateInterceptor + Send + Sync>

Source§

fn dictionary_post_update_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryPostUpdateInterceptor + Send + Sync>

Source§

fn dictionary_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn DictionaryPreDeleteInterceptor + Send + Sync>

Source§

fn series_row_pre_insert_interceptors( &mut self, ) -> &mut Chain<dyn SeriesRowPreInsertInterceptor + Send + Sync>

Source§

fn series_row_post_insert_interceptors( &mut self, ) -> &mut Chain<dyn SeriesRowPostInsertInterceptor + Send + Sync>

Source§

fn series_row_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn SeriesRowPreUpdateInterceptor + Send + Sync>

Source§

fn series_row_post_update_interceptors( &mut self, ) -> &mut Chain<dyn SeriesRowPostUpdateInterceptor + Send + Sync>

Source§

fn series_row_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn SeriesRowPreDeleteInterceptor + Send + Sync>

Source§

fn series_row_post_delete_interceptors( &mut self, ) -> &mut Chain<dyn SeriesRowPostDeleteInterceptor + Send + Sync>

Source§

fn series_post_create_interceptors( &mut self, ) -> &mut Chain<dyn SeriesPostCreateInterceptor + Send + Sync>

Source§

fn series_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn SeriesPreUpdateInterceptor + Send + Sync>

Source§

fn series_post_update_interceptors( &mut self, ) -> &mut Chain<dyn SeriesPostUpdateInterceptor + Send + Sync>

Source§

fn series_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn SeriesPreDeleteInterceptor + Send + Sync>

Source§

fn identity_post_create_interceptors( &mut self, ) -> &mut Chain<dyn IdentityPostCreateInterceptor + Send + Sync>

Source§

fn identity_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn IdentityPreUpdateInterceptor + Send + Sync>

Source§

fn identity_post_update_interceptors( &mut self, ) -> &mut Chain<dyn IdentityPostUpdateInterceptor + Send + Sync>

Source§

fn identity_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn IdentityPreDeleteInterceptor + Send + Sync>

Source§

fn role_post_create_interceptors( &mut self, ) -> &mut Chain<dyn RolePostCreateInterceptor + Send + Sync>

Source§

fn role_pre_update_interceptors( &mut self, ) -> &mut Chain<dyn RolePreUpdateInterceptor + Send + Sync>

Source§

fn role_post_update_interceptors( &mut self, ) -> &mut Chain<dyn RolePostUpdateInterceptor + Send + Sync>

Source§

fn role_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn RolePreDeleteInterceptor + Send + Sync>

Source§

fn granted_role_post_create_interceptors( &mut self, ) -> &mut Chain<dyn GrantedRolePostCreateInterceptor + Send + Sync>

Source§

fn granted_role_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn GrantedRolePreDeleteInterceptor + Send + Sync>

Source§

fn authentication_post_create_interceptors( &mut self, ) -> &mut Chain<dyn AuthenticationPostCreateInterceptor + Send + Sync>

Source§

fn authentication_pre_delete_interceptors( &mut self, ) -> &mut Chain<dyn AuthenticationPreDeleteInterceptor + Send + Sync>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> GetSetFdFlags for T

Source§

fn get_fd_flags(&self) -> Result<FdFlags, Error>
where T: AsFilelike,

Query the “status” flags for the self file descriptor.
Source§

fn new_set_fd_flags(&self, fd_flags: FdFlags) -> Result<SetFdFlags<T>, Error>
where T: AsFilelike,

Create a new SetFdFlags value for use with set_fd_flags.
Source§

fn set_fd_flags(&mut self, set_fd_flags: SetFdFlags<T>) -> Result<(), Error>
where T: AsFilelike,

Set the “status” flags for the self file descriptor.
Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.
Source§

impl<T> IntoRequest<T> for T

Source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
Source§

impl<L> LayerExt<L> for L

Source§

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.
Source§

impl<T> Pointee for T

Source§

type Pointer = u32

Source§

fn debug( pointer: <T as Pointee>::Pointer, f: &mut Formatter<'_>, ) -> Result<(), Error>

Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.