use std::mem;
use pending::{Pending, PendingWrite};
use reifydb_catalog::catalog::Catalog;
use reifydb_core::{
common::CommitVersion,
interface::{
catalog::schema::SchemaId,
change::{Change, ChangeOrigin, Diff},
},
};
use reifydb_transaction::{
change_accumulator::ChangeAccumulator,
interceptor::{
WithInterceptors,
authentication::{AuthenticationPostCreateInterceptor, AuthenticationPreDeleteInterceptor},
chain::InterceptorChain as Chain,
dictionary::{
DictionaryPostCreateInterceptor, DictionaryPostUpdateInterceptor,
DictionaryPreDeleteInterceptor, DictionaryPreUpdateInterceptor,
},
dictionary_row::{
DictionaryRowPostDeleteInterceptor, DictionaryRowPostInsertInterceptor,
DictionaryRowPostUpdateInterceptor, DictionaryRowPreDeleteInterceptor,
DictionaryRowPreInsertInterceptor, DictionaryRowPreUpdateInterceptor,
},
granted_role::{GrantedRolePostCreateInterceptor, GrantedRolePreDeleteInterceptor},
identity::{
IdentityPostCreateInterceptor, IdentityPostUpdateInterceptor, IdentityPreDeleteInterceptor,
IdentityPreUpdateInterceptor,
},
interceptors::Interceptors,
namespace::{
NamespacePostCreateInterceptor, NamespacePostUpdateInterceptor, NamespacePreDeleteInterceptor,
NamespacePreUpdateInterceptor,
},
ringbuffer::{
RingBufferPostCreateInterceptor, RingBufferPostUpdateInterceptor,
RingBufferPreDeleteInterceptor, RingBufferPreUpdateInterceptor,
},
ringbuffer_row::{
RingBufferRowPostDeleteInterceptor, RingBufferRowPostInsertInterceptor,
RingBufferRowPostUpdateInterceptor, RingBufferRowPreDeleteInterceptor,
RingBufferRowPreInsertInterceptor, RingBufferRowPreUpdateInterceptor,
},
role::{
RolePostCreateInterceptor, RolePostUpdateInterceptor, RolePreDeleteInterceptor,
RolePreUpdateInterceptor,
},
series::{
SeriesPostCreateInterceptor, SeriesPostUpdateInterceptor, SeriesPreDeleteInterceptor,
SeriesPreUpdateInterceptor,
},
series_row::{
SeriesRowPostDeleteInterceptor, SeriesRowPostInsertInterceptor, SeriesRowPostUpdateInterceptor,
SeriesRowPreDeleteInterceptor, SeriesRowPreInsertInterceptor, SeriesRowPreUpdateInterceptor,
},
table::{
TablePostCreateInterceptor, TablePostUpdateInterceptor, TablePreDeleteInterceptor,
TablePreUpdateInterceptor,
},
table_row::{
TableRowPostDeleteInterceptor, TableRowPostInsertInterceptor, TableRowPostUpdateInterceptor,
TableRowPreDeleteInterceptor, TableRowPreInsertInterceptor, TableRowPreUpdateInterceptor,
},
transaction::{PostCommitInterceptor, PreCommitInterceptor},
view::{
ViewPostCreateInterceptor, ViewPostUpdateInterceptor, ViewPreDeleteInterceptor,
ViewPreUpdateInterceptor,
},
view_row::{
ViewRowPostDeleteInterceptor, ViewRowPostInsertInterceptor, ViewRowPostUpdateInterceptor,
ViewRowPreDeleteInterceptor, ViewRowPreInsertInterceptor, ViewRowPreUpdateInterceptor,
},
},
multi::transaction::read::MultiReadTransaction,
transaction::admin::AdminTransaction,
};
use tracing::instrument;
pub mod pending;
pub mod range;
pub mod read;
pub mod state;
pub mod write;
/// State shared by every [`FlowTransaction`] variant.
///
/// The constructors on `FlowTransaction` populate all fields; `accumulator`
/// always starts out as `ChangeAccumulator::new()`.
pub struct FlowTransactionInner {
/// Commit version this transaction is associated with. `update_version`
/// overwrites it and re-targets `primitive_query` with the same value.
pub version: CommitVersion,
/// Pending writes collected so far; `take_pending` moves them out and
/// leaves the `Default` value behind.
pub pending: Pending,
/// Read transaction that `deferred` and `update_version` configure via
/// `read_as_of_version_inclusive(version)`.
pub primitive_query: MultiReadTransaction,
/// Second read transaction; nothing in this file applies a version to it.
/// NOTE(review): presumably used for reading operator state — confirm in the
/// `state` submodule.
pub state_query: MultiReadTransaction,
/// Catalog handle returned by `FlowTransaction::catalog`.
pub catalog: Catalog,
/// Interceptor chains exposed through the `WithInterceptors` impl.
pub interceptors: Interceptors,
/// Collects `(SchemaId, Diff)` entries recorded by `track_flow_change` and
/// drained by `take_accumulator_entries`.
pub accumulator: ChangeAccumulator,
}
/// A flow transaction in one of two flavours, both wrapping a
/// [`FlowTransactionInner`].
///
/// All accessors in this file go through the shared `inner` value, so they
/// behave the same for either variant.
pub enum FlowTransaction {
/// Variant carrying only the shared inner state. Built by `deferred` and
/// `deferred_from_parts`.
Deferred {
inner: FlowTransactionInner,
},
/// Variant carrying the shared inner state plus an additional `Pending`
/// value. Built by `transactional`.
/// NOTE(review): `base_pending` is not read anywhere in this file —
/// presumably consulted by the read/write submodules; confirm there.
Transactional {
inner: FlowTransactionInner,
base_pending: Pending,
},
}
impl FlowTransaction {
    /// Shared-state accessor: both variants hold a `FlowTransactionInner`.
    fn inner(&self) -> &FlowTransactionInner {
        match self {
            Self::Deferred { inner, .. } => inner,
            Self::Transactional { inner, .. } => inner,
        }
    }

    /// Mutable counterpart of [`Self::inner`].
    fn inner_mut(&mut self) -> &mut FlowTransactionInner {
        match self {
            Self::Deferred { inner, .. } => inner,
            Self::Transactional { inner, .. } => inner,
        }
    }

    /// Builds a `Deferred` transaction from an admin transaction.
    ///
    /// Two query transactions are opened on `parent.multi`. The first becomes
    /// `primitive_query` and has `read_as_of_version_inclusive(version)`
    /// applied; the second becomes `state_query` and is used as returned.
    /// Pending writes and the change accumulator start out empty.
    ///
    /// # Panics
    ///
    /// Panics if either `parent.multi.begin_query()` call fails, because both
    /// results are unwrapped.
    #[instrument(name = "flow::transaction::deferred", level = "debug", skip(parent, catalog, interceptors), fields(version = version.0))]
    pub fn deferred(
        parent: &AdminTransaction,
        version: CommitVersion,
        catalog: Catalog,
        interceptors: Interceptors,
    ) -> Self {
        let mut primitive_query = parent.multi.begin_query().unwrap();
        primitive_query.read_as_of_version_inclusive(version);
        let state_query = parent.multi.begin_query().unwrap();

        Self::deferred_from_parts(version, Pending::new(), primitive_query, state_query, catalog, interceptors)
    }

    /// Assembles a `Deferred` transaction from already prepared components.
    ///
    /// The supplied queries are stored untouched; the accumulator is fresh.
    pub fn deferred_from_parts(
        version: CommitVersion,
        pending: Pending,
        primitive_query: MultiReadTransaction,
        state_query: MultiReadTransaction,
        catalog: Catalog,
        interceptors: Interceptors,
    ) -> Self {
        let inner = FlowTransactionInner {
            version,
            pending,
            primitive_query,
            state_query,
            catalog,
            interceptors,
            accumulator: ChangeAccumulator::new(),
        };
        Self::Deferred { inner }
    }

    /// Assembles a `Transactional` transaction from already prepared
    /// components, keeping `base_pending` alongside the shared inner state.
    ///
    /// The supplied queries are stored untouched; the accumulator is fresh.
    pub fn transactional(
        version: CommitVersion,
        pending: Pending,
        base_pending: Pending,
        primitive_query: MultiReadTransaction,
        state_query: MultiReadTransaction,
        catalog: Catalog,
        interceptors: Interceptors,
    ) -> Self {
        let inner = FlowTransactionInner {
            version,
            pending,
            primitive_query,
            state_query,
            catalog,
            interceptors,
            accumulator: ChangeAccumulator::new(),
        };
        Self::Transactional { inner, base_pending }
    }

    /// Returns the commit version currently stored on the transaction.
    pub fn version(&self) -> CommitVersion {
        let state = self.inner();
        state.version
    }

    /// Moves the pending writes out, leaving `Pending::default()` in place.
    pub fn take_pending(&mut self) -> Pending {
        let state = self.inner_mut();
        mem::take(&mut state.pending)
    }

    /// Records every diff of `change` in the accumulator, keyed by the
    /// schema id of its origin.
    ///
    /// Changes whose origin is not `ChangeOrigin::Schema` are dropped without
    /// being recorded.
    pub fn track_flow_change(&mut self, change: Change) {
        if let ChangeOrigin::Schema(id) = change.origin {
            // Borrow the accumulator once instead of re-resolving it per diff.
            let accumulator = &mut self.inner_mut().accumulator;
            for diff in change.diffs {
                accumulator.track(id, diff);
            }
        }
    }

    /// Returns a copy of the accumulator entries starting at index 0 and
    /// then clears the accumulator.
    pub fn take_accumulator_entries(&mut self) -> Vec<(SchemaId, Diff)> {
        let accumulator = &mut self.inner_mut().accumulator;
        let drained: Vec<(SchemaId, Diff)> = accumulator.entries_from(0).to_vec();
        accumulator.clear();
        drained
    }

    /// Test-only view of the pending writes.
    #[cfg(test)]
    pub fn pending(&self) -> &Pending {
        let state = self.inner();
        &state.pending
    }

    /// Stores `new_version` and re-applies it to `primitive_query` via
    /// `read_as_of_version_inclusive`. `state_query` is not modified.
    pub fn update_version(&mut self, new_version: CommitVersion) {
        let FlowTransactionInner {
            version,
            primitive_query,
            ..
        } = self.inner_mut();
        *version = new_version;
        primitive_query.read_as_of_version_inclusive(new_version);
    }

    /// Borrows the catalog handle held by this transaction.
    pub fn catalog(&self) -> &Catalog {
        let state = self.inner();
        &state.catalog
    }
}
// Expands to a single accessor method:
//
//     fn <accessor>(&mut self) -> &mut Chain<dyn <Interceptor> + Send + Sync>
//
// which hands out the `<chain_field>` member of `self.inner_mut().interceptors`.
// Arguments are positional: method name, field name, interceptor trait.
macro_rules! interceptor_method {
    ($accessor:ident, $chain_field:ident, $interceptor:ident) => {
        fn $accessor(&mut self) -> &mut Chain<dyn $interceptor + Send + Sync> {
            let registry = &mut self.inner_mut().interceptors;
            &mut registry.$chain_field
        }
    };
}
// Every accessor below is generated by `interceptor_method!` and returns the
// like-named chain stored in the transaction's `Interceptors`. Accessors are
// grouped by entity: definition-level hooks first, then row-level hooks.
impl WithInterceptors for FlowTransaction {
    // --- transaction commit ---
    interceptor_method!(pre_commit_interceptors, pre_commit, PreCommitInterceptor);
    interceptor_method!(post_commit_interceptors, post_commit, PostCommitInterceptor);

    // --- namespace ---
    interceptor_method!(namespace_post_create_interceptors, namespace_post_create, NamespacePostCreateInterceptor);
    interceptor_method!(namespace_pre_update_interceptors, namespace_pre_update, NamespacePreUpdateInterceptor);
    interceptor_method!(namespace_post_update_interceptors, namespace_post_update, NamespacePostUpdateInterceptor);
    interceptor_method!(namespace_pre_delete_interceptors, namespace_pre_delete, NamespacePreDeleteInterceptor);

    // --- table ---
    interceptor_method!(table_post_create_interceptors, table_post_create, TablePostCreateInterceptor);
    interceptor_method!(table_pre_update_interceptors, table_pre_update, TablePreUpdateInterceptor);
    interceptor_method!(table_post_update_interceptors, table_post_update, TablePostUpdateInterceptor);
    interceptor_method!(table_pre_delete_interceptors, table_pre_delete, TablePreDeleteInterceptor);

    // --- table rows ---
    interceptor_method!(table_row_pre_insert_interceptors, table_row_pre_insert, TableRowPreInsertInterceptor);
    interceptor_method!(table_row_post_insert_interceptors, table_row_post_insert, TableRowPostInsertInterceptor);
    interceptor_method!(table_row_pre_update_interceptors, table_row_pre_update, TableRowPreUpdateInterceptor);
    interceptor_method!(table_row_post_update_interceptors, table_row_post_update, TableRowPostUpdateInterceptor);
    interceptor_method!(table_row_pre_delete_interceptors, table_row_pre_delete, TableRowPreDeleteInterceptor);
    interceptor_method!(table_row_post_delete_interceptors, table_row_post_delete, TableRowPostDeleteInterceptor);

    // --- view ---
    interceptor_method!(view_post_create_interceptors, view_post_create, ViewPostCreateInterceptor);
    interceptor_method!(view_pre_update_interceptors, view_pre_update, ViewPreUpdateInterceptor);
    interceptor_method!(view_post_update_interceptors, view_post_update, ViewPostUpdateInterceptor);
    interceptor_method!(view_pre_delete_interceptors, view_pre_delete, ViewPreDeleteInterceptor);

    // --- view rows ---
    interceptor_method!(view_row_pre_insert_interceptors, view_row_pre_insert, ViewRowPreInsertInterceptor);
    interceptor_method!(view_row_post_insert_interceptors, view_row_post_insert, ViewRowPostInsertInterceptor);
    interceptor_method!(view_row_pre_update_interceptors, view_row_pre_update, ViewRowPreUpdateInterceptor);
    interceptor_method!(view_row_post_update_interceptors, view_row_post_update, ViewRowPostUpdateInterceptor);
    interceptor_method!(view_row_pre_delete_interceptors, view_row_pre_delete, ViewRowPreDeleteInterceptor);
    interceptor_method!(view_row_post_delete_interceptors, view_row_post_delete, ViewRowPostDeleteInterceptor);

    // --- ring buffer ---
    interceptor_method!(
        ringbuffer_post_create_interceptors,
        ringbuffer_post_create,
        RingBufferPostCreateInterceptor
    );
    interceptor_method!(ringbuffer_pre_update_interceptors, ringbuffer_pre_update, RingBufferPreUpdateInterceptor);
    interceptor_method!(
        ringbuffer_post_update_interceptors,
        ringbuffer_post_update,
        RingBufferPostUpdateInterceptor
    );
    interceptor_method!(ringbuffer_pre_delete_interceptors, ringbuffer_pre_delete, RingBufferPreDeleteInterceptor);

    // --- ring buffer rows ---
    interceptor_method!(
        ringbuffer_row_pre_insert_interceptors,
        ringbuffer_row_pre_insert,
        RingBufferRowPreInsertInterceptor
    );
    interceptor_method!(
        ringbuffer_row_post_insert_interceptors,
        ringbuffer_row_post_insert,
        RingBufferRowPostInsertInterceptor
    );
    interceptor_method!(
        ringbuffer_row_pre_update_interceptors,
        ringbuffer_row_pre_update,
        RingBufferRowPreUpdateInterceptor
    );
    interceptor_method!(
        ringbuffer_row_post_update_interceptors,
        ringbuffer_row_post_update,
        RingBufferRowPostUpdateInterceptor
    );
    interceptor_method!(
        ringbuffer_row_pre_delete_interceptors,
        ringbuffer_row_pre_delete,
        RingBufferRowPreDeleteInterceptor
    );
    interceptor_method!(
        ringbuffer_row_post_delete_interceptors,
        ringbuffer_row_post_delete,
        RingBufferRowPostDeleteInterceptor
    );

    // --- dictionary ---
    interceptor_method!(
        dictionary_post_create_interceptors,
        dictionary_post_create,
        DictionaryPostCreateInterceptor
    );
    interceptor_method!(dictionary_pre_update_interceptors, dictionary_pre_update, DictionaryPreUpdateInterceptor);
    interceptor_method!(
        dictionary_post_update_interceptors,
        dictionary_post_update,
        DictionaryPostUpdateInterceptor
    );
    interceptor_method!(dictionary_pre_delete_interceptors, dictionary_pre_delete, DictionaryPreDeleteInterceptor);

    // --- dictionary rows ---
    interceptor_method!(
        dictionary_row_pre_insert_interceptors,
        dictionary_row_pre_insert,
        DictionaryRowPreInsertInterceptor
    );
    interceptor_method!(
        dictionary_row_post_insert_interceptors,
        dictionary_row_post_insert,
        DictionaryRowPostInsertInterceptor
    );
    interceptor_method!(
        dictionary_row_pre_update_interceptors,
        dictionary_row_pre_update,
        DictionaryRowPreUpdateInterceptor
    );
    interceptor_method!(
        dictionary_row_post_update_interceptors,
        dictionary_row_post_update,
        DictionaryRowPostUpdateInterceptor
    );
    interceptor_method!(
        dictionary_row_pre_delete_interceptors,
        dictionary_row_pre_delete,
        DictionaryRowPreDeleteInterceptor
    );
    interceptor_method!(
        dictionary_row_post_delete_interceptors,
        dictionary_row_post_delete,
        DictionaryRowPostDeleteInterceptor
    );

    // --- series ---
    interceptor_method!(series_post_create_interceptors, series_post_create, SeriesPostCreateInterceptor);
    interceptor_method!(series_pre_update_interceptors, series_pre_update, SeriesPreUpdateInterceptor);
    interceptor_method!(series_post_update_interceptors, series_post_update, SeriesPostUpdateInterceptor);
    interceptor_method!(series_pre_delete_interceptors, series_pre_delete, SeriesPreDeleteInterceptor);

    // --- series rows ---
    interceptor_method!(series_row_pre_insert_interceptors, series_row_pre_insert, SeriesRowPreInsertInterceptor);
    interceptor_method!(
        series_row_post_insert_interceptors,
        series_row_post_insert,
        SeriesRowPostInsertInterceptor
    );
    interceptor_method!(series_row_pre_update_interceptors, series_row_pre_update, SeriesRowPreUpdateInterceptor);
    interceptor_method!(
        series_row_post_update_interceptors,
        series_row_post_update,
        SeriesRowPostUpdateInterceptor
    );
    interceptor_method!(series_row_pre_delete_interceptors, series_row_pre_delete, SeriesRowPreDeleteInterceptor);
    interceptor_method!(
        series_row_post_delete_interceptors,
        series_row_post_delete,
        SeriesRowPostDeleteInterceptor
    );

    // --- identity ---
    interceptor_method!(identity_post_create_interceptors, identity_post_create, IdentityPostCreateInterceptor);
    interceptor_method!(identity_pre_update_interceptors, identity_pre_update, IdentityPreUpdateInterceptor);
    interceptor_method!(identity_post_update_interceptors, identity_post_update, IdentityPostUpdateInterceptor);
    interceptor_method!(identity_pre_delete_interceptors, identity_pre_delete, IdentityPreDeleteInterceptor);

    // --- role ---
    interceptor_method!(role_post_create_interceptors, role_post_create, RolePostCreateInterceptor);
    interceptor_method!(role_pre_update_interceptors, role_pre_update, RolePreUpdateInterceptor);
    interceptor_method!(role_post_update_interceptors, role_post_update, RolePostUpdateInterceptor);
    interceptor_method!(role_pre_delete_interceptors, role_pre_delete, RolePreDeleteInterceptor);

    // --- granted role ---
    interceptor_method!(
        granted_role_post_create_interceptors,
        granted_role_post_create,
        GrantedRolePostCreateInterceptor
    );
    interceptor_method!(
        granted_role_pre_delete_interceptors,
        granted_role_pre_delete,
        GrantedRolePreDeleteInterceptor
    );

    // --- authentication ---
    interceptor_method!(
        authentication_post_create_interceptors,
        authentication_post_create,
        AuthenticationPostCreateInterceptor
    );
    interceptor_method!(
        authentication_pre_delete_interceptors,
        authentication_pre_delete,
        AuthenticationPreDeleteInterceptor
    );
}