use std::sync::Arc;
use reifydb_core::{
common::CommitVersion,
encoded::key::{EncodedKey, EncodedKeyRange},
interface::{
catalog::{
authentication::{Authentication, AuthenticationId},
config::{Config, ConfigKey},
dictionary::Dictionary,
flow::{Flow, FlowId},
handler::Handler,
id::{
HandlerId, MigrationId, NamespaceId, ProcedureId, RingBufferId, SeriesId, SinkId,
SourceId, TableId, TestId, ViewId,
},
identity::{GrantedRole, Identity, Role, RoleId},
migration::Migration,
namespace::Namespace,
policy::{Policy, PolicyId},
procedure::Procedure,
ringbuffer::RingBuffer,
series::Series,
shape::ShapeId,
sink::Sink,
source::Source,
sumtype::SumType,
table::Table,
test::Test,
view::View,
},
store::{MultiVersionBatch, MultiVersionRow},
},
row::RowTtl,
};
use reifydb_type::{
Result,
params::Params,
value::{dictionary::DictionaryId, frame::frame::Frame, identity::IdentityId, sumtype::SumTypeId},
};
use tracing::instrument;
use crate::{
TransactionId,
change::{
TransactionalAuthenticationChanges, TransactionalChanges, TransactionalConfigChanges,
TransactionalDictionaryChanges, TransactionalFlowChanges, TransactionalGrantedRoleChanges,
TransactionalHandlerChanges, TransactionalIdentityChanges, TransactionalMigrationChanges,
TransactionalNamespaceChanges, TransactionalPolicyChanges, TransactionalProcedureChanges,
TransactionalRingBufferChanges, TransactionalRoleChanges, TransactionalRowTtlChanges,
TransactionalSeriesChanges, TransactionalSinkChanges, TransactionalSourceChanges,
TransactionalSumTypeChanges, TransactionalTableChanges, TransactionalTestChanges,
TransactionalViewChanges,
},
multi::transaction::read::MultiReadTransaction,
single::{SingleTransaction, read::SingleReadTransaction},
transaction::{RqlExecutor, Transaction},
};
pub struct QueryTransaction {
pub(crate) multi: MultiReadTransaction,
pub(crate) single: SingleTransaction,
pub identity: IdentityId,
pub(crate) executor: Option<Arc<dyn RqlExecutor>>,
}
impl QueryTransaction {
#[instrument(name = "transaction::query::new", level = "debug", skip_all)]
pub fn new(multi: MultiReadTransaction, single: SingleTransaction, identity: IdentityId) -> Self {
Self {
multi,
single,
identity,
executor: None,
}
}
pub fn set_executor(&mut self, executor: Arc<dyn RqlExecutor>) {
self.executor = Some(executor);
}
pub fn rql(&mut self, rql: &str, params: Params) -> Result<Vec<Frame>> {
let executor = self.executor.clone().expect("RqlExecutor not set");
executor.rql(&mut Transaction::Query(self), rql, params)
}
#[inline]
pub fn version(&self) -> CommitVersion {
self.multi.version()
}
#[inline]
pub fn id(&self) -> TransactionId {
self.multi.tm.id()
}
#[inline]
pub fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionRow>> {
Ok(self.multi.get(key)?.map(|v| v.into_multi_version_row()))
}
#[inline]
pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
self.multi.contains_key(key)
}
#[inline]
pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
self.multi.prefix(prefix)
}
#[inline]
pub fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
self.multi.prefix_rev(prefix)
}
#[inline]
pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<()> {
self.multi.read_as_of_version_exclusive(version);
Ok(())
}
#[inline]
pub fn range(
&self,
range: EncodedKeyRange,
batch_size: usize,
) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
self.multi.range(range, batch_size)
}
#[inline]
pub fn range_rev(
&self,
range: EncodedKeyRange,
batch_size: usize,
) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
self.multi.range_rev(range, batch_size)
}
#[instrument(name = "transaction::query::with_single_query", level = "trace", skip(self, keys, f))]
pub fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
where
I: IntoIterator<Item = &'a EncodedKey> + Send,
F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R> + Send,
R: Send,
{
self.single.with_query(keys, f)
}
#[instrument(name = "transaction::query::with_multi_query", level = "trace", skip(self, f))]
pub fn with_multi_query<F, R>(&mut self, f: F) -> Result<R>
where
F: FnOnce(&mut MultiReadTransaction) -> Result<R>,
{
f(&mut self.multi)
}
#[instrument(name = "transaction::query::begin_single_query", level = "trace", skip(self, keys))]
pub fn begin_single_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
where
I: IntoIterator<Item = &'a EncodedKey>,
{
self.single.begin_query(keys)
}
}
impl TransactionalDictionaryChanges for QueryTransaction {
fn find_dictionary(&self, _id: DictionaryId) -> Option<&Dictionary> {
None
}
fn find_dictionary_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Dictionary> {
None
}
fn is_dictionary_deleted(&self, _id: DictionaryId) -> bool {
false
}
fn is_dictionary_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
false
}
}
impl TransactionalFlowChanges for QueryTransaction {
fn find_flow(&self, _id: FlowId) -> Option<&Flow> {
None
}
fn find_flow_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Flow> {
None
}
fn is_flow_deleted(&self, _id: FlowId) -> bool {
false
}
fn is_flow_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
false
}
}
impl TransactionalNamespaceChanges for QueryTransaction {
fn find_namespace(&self, _id: NamespaceId) -> Option<&Namespace> {
None
}
fn find_namespace_by_name(&self, _name: &str) -> Option<&Namespace> {
None
}
fn is_namespace_deleted(&self, _id: NamespaceId) -> bool {
false
}
fn is_namespace_deleted_by_name(&self, _name: &str) -> bool {
false
}
}
impl TransactionalProcedureChanges for QueryTransaction {
fn find_procedure(&self, _id: ProcedureId) -> Option<&Procedure> {
None
}
fn find_procedure_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Procedure> {
None
}
fn is_procedure_deleted(&self, _id: ProcedureId) -> bool {
false
}
fn is_procedure_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
false
}
}
impl TransactionalTestChanges for QueryTransaction {
fn find_test(&self, _id: TestId) -> Option<&Test> {
None
}
fn find_test_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Test> {
None
}
fn is_test_deleted(&self, _id: TestId) -> bool {
false
}
fn is_test_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
false
}
}
impl TransactionalRingBufferChanges for QueryTransaction {
fn find_ringbuffer(&self, _id: RingBufferId) -> Option<&RingBuffer> {
None
}
fn find_ringbuffer_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&RingBuffer> {
None
}
fn is_ringbuffer_deleted(&self, _id: RingBufferId) -> bool {
false
}
fn is_ringbuffer_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
false
}
}
impl TransactionalSeriesChanges for QueryTransaction {
fn find_series(&self, _id: SeriesId) -> Option<&Series> {
None
}
fn find_series_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Series> {
None
}
fn is_series_deleted(&self, _id: SeriesId) -> bool {
false
}
fn is_series_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
false
}
}
impl TransactionalTableChanges for QueryTransaction {
fn find_table(&self, _id: TableId) -> Option<&Table> {
None
}
fn find_table_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Table> {
None
}
fn is_table_deleted(&self, _id: TableId) -> bool {
false
}
fn is_table_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
false
}
}
impl TransactionalViewChanges for QueryTransaction {
fn find_view(&self, _id: ViewId) -> Option<&View> {
None
}
fn find_view_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&View> {
None
}
fn is_view_deleted(&self, _id: ViewId) -> bool {
false
}
fn is_view_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
false
}
}
impl TransactionalSumTypeChanges for QueryTransaction {
fn find_sumtype(&self, _id: SumTypeId) -> Option<&SumType> {
None
}
fn find_sumtype_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&SumType> {
None
}
fn is_sumtype_deleted(&self, _id: SumTypeId) -> bool {
false
}
fn is_sumtype_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
false
}
}
impl TransactionalHandlerChanges for QueryTransaction {
fn find_handler_by_id(&self, _id: HandlerId) -> Option<&Handler> {
None
}
fn find_handler_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Handler> {
None
}
fn is_handler_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
false
}
}
impl TransactionalIdentityChanges for QueryTransaction {
fn find_identity(&self, _id: IdentityId) -> Option<&Identity> {
None
}
fn find_identity_by_name(&self, _name: &str) -> Option<&Identity> {
None
}
fn is_identity_deleted(&self, _id: IdentityId) -> bool {
false
}
fn is_identity_deleted_by_name(&self, _name: &str) -> bool {
false
}
}
impl TransactionalRoleChanges for QueryTransaction {
fn find_role(&self, _id: RoleId) -> Option<&Role> {
None
}
fn find_role_by_name(&self, _name: &str) -> Option<&Role> {
None
}
fn is_role_deleted(&self, _id: RoleId) -> bool {
false
}
fn is_role_deleted_by_name(&self, _name: &str) -> bool {
false
}
}
impl TransactionalGrantedRoleChanges for QueryTransaction {
fn find_granted_role(&self, _identity: IdentityId, _role: RoleId) -> Option<&GrantedRole> {
None
}
fn find_granted_roles_for_identity(&self, _identity: IdentityId) -> Vec<&GrantedRole> {
Vec::new()
}
fn is_granted_role_deleted(&self, _identity: IdentityId, _role: RoleId) -> bool {
false
}
}
impl TransactionalPolicyChanges for QueryTransaction {
fn find_policy(&self, _id: PolicyId) -> Option<&Policy> {
None
}
fn find_policy_by_name(&self, _name: &str) -> Option<&Policy> {
None
}
fn is_policy_deleted(&self, _id: PolicyId) -> bool {
false
}
fn is_policy_deleted_by_name(&self, _name: &str) -> bool {
false
}
}
impl TransactionalMigrationChanges for QueryTransaction {
fn find_migration(&self, _id: MigrationId) -> Option<&Migration> {
None
}
fn find_migration_by_name(&self, _name: &str) -> Option<&Migration> {
None
}
fn is_migration_deleted(&self, _id: MigrationId) -> bool {
false
}
fn is_migration_deleted_by_name(&self, _name: &str) -> bool {
false
}
}
impl TransactionalAuthenticationChanges for QueryTransaction {
fn find_authentication(&self, _id: AuthenticationId) -> Option<&Authentication> {
None
}
fn find_authentication_by_identity_and_method(
&self,
_identity: IdentityId,
_method: &str,
) -> Option<&Authentication> {
None
}
fn is_authentication_deleted(&self, _id: AuthenticationId) -> bool {
false
}
}
impl TransactionalSourceChanges for QueryTransaction {
fn find_source(&self, _id: SourceId) -> Option<&Source> {
None
}
fn find_source_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Source> {
None
}
fn is_source_deleted(&self, _id: SourceId) -> bool {
false
}
fn is_source_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
false
}
}
impl TransactionalSinkChanges for QueryTransaction {
fn find_sink(&self, _id: SinkId) -> Option<&Sink> {
None
}
fn find_sink_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Sink> {
None
}
fn is_sink_deleted(&self, _id: SinkId) -> bool {
false
}
fn is_sink_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
false
}
}
impl TransactionalConfigChanges for QueryTransaction {
fn find_config(&self, _key: ConfigKey) -> Option<&Config> {
None
}
}
impl TransactionalRowTtlChanges for QueryTransaction {
fn find_row_ttl(&self, _shape: ShapeId) -> Option<&RowTtl> {
None
}
fn is_row_ttl_deleted(&self, _shape: ShapeId) -> bool {
false
}
}
impl TransactionalChanges for QueryTransaction {}