use std::sync::{Arc, RwLock};
use reifydb_catalog::catalog::Catalog;
use reifydb_core::interface::{
catalog::flow::FlowId,
change::{Change, ChangeOrigin},
};
use reifydb_engine::engine::StandardEngine;
use reifydb_transaction::{
change::OperationType,
interceptor::{
interceptors::Interceptors,
transaction::{PostCommitContext, PostCommitInterceptor, PreCommitContext, PreCommitInterceptor},
},
multi::transaction::read::MultiReadTransaction,
};
use reifydb_type::Result;
use tracing::warn;
use crate::{
engine::FlowEngine,
transaction::{
FlowTransaction,
pending::{Pending, PendingWrite},
},
transactional::registrar::TransactionalFlowRegistrar,
};
/// Pre-commit interceptor that runs transactional flows inline, before the
/// surrounding transaction commits.
///
/// Its `PreCommitInterceptor::intercept` implementation read-locks
/// `flow_engine` and hands all three fields to `execute_inline_flow_changes`.
pub struct TransactionalFlowPreCommitInterceptor {
/// Shared flow engine; read-locked for the duration of each intercept call.
pub flow_engine: Arc<RwLock<FlowEngine>>,
/// Engine used to open read queries and to create interceptors for each flow transaction.
pub engine: StandardEngine,
/// Catalog handle; a clone is passed into every flow transaction.
pub catalog: Catalog,
}
impl PreCommitInterceptor for TransactionalFlowPreCommitInterceptor {
    /// Runs the inline flow pass for the transaction described by `ctx`.
    ///
    /// A read lock on the shared flow engine is held until the pass returns;
    /// the actual work is delegated to [`execute_inline_flow_changes`].
    ///
    /// # Errors
    ///
    /// Returns whatever error [`execute_inline_flow_changes`] returns.
    ///
    /// # Panics
    ///
    /// Panics if the flow-engine lock is poisoned.
    fn intercept(&self, ctx: &mut PreCommitContext) -> Result<()> {
        let Self {
            flow_engine,
            engine,
            catalog,
        } = self;
        let guard = flow_engine.read().unwrap();
        execute_inline_flow_changes(&guard, engine, catalog, ctx)
    }
}
/// Runs the registered flows inline against the changes staged in
/// `ctx.flow_changes`.
///
/// Flows are visited in the order returned by
/// `FlowEngine::calculate_execution_order`. For each flow that is interested
/// in at least one available change (see [`flow_is_interested_in`]):
///
/// 1. two read queries and a fresh set of interceptors are obtained from
///    `engine`;
/// 2. a base `Pending` is rebuilt from `ctx.transaction_writes` (`Some` is
///    inserted, `None` is removed);
/// 3. every relevant change is fed to `FlowEngine::process`;
/// 4. the accumulator entries taken from the flow transaction are appended
///    to the pool of available changes, one `Change` per entry, so flows
///    later in the order can consume them, and are also added to
///    `ctx.view_entries`;
/// 5. the flow's pending writes are appended to `ctx.pending_writes` in
///    `iter_sorted` order (`Set` becomes `Some`, `Remove` becomes `None`).
///
/// Every change in the pool carries the version of a read query opened at
/// the start of the pass.
///
/// Returns `Ok(())` without touching `ctx` when there are no staged flow
/// changes or when the execution order is empty.
///
/// # Errors
///
/// Propagates errors from `begin_query` and from `FlowEngine::process`.
pub(crate) fn execute_inline_flow_changes(
    flow_engine: &FlowEngine,
    engine: &StandardEngine,
    catalog: &Catalog,
    ctx: &mut PreCommitContext,
) -> Result<()> {
    // Nothing staged for flows: skip the pass entirely.
    if ctx.flow_changes.is_empty() {
        return Ok(());
    }

    let ordered_flows = flow_engine.calculate_execution_order();
    if ordered_flows.is_empty() {
        return Ok(());
    }

    // The probe query exists only to learn the current read version; it is
    // dropped as soon as this block ends.
    let snapshot_version = {
        let probe: MultiReadTransaction = engine.multi().begin_query()?;
        probe.version()
    };

    // Pool of changes visible to flows: the staged changes re-stamped with
    // the snapshot version, later extended with the output of each flow.
    let mut pool: Vec<Change> = Vec::new();
    for staged in ctx.flow_changes.iter() {
        let mut stamped = staged.clone();
        stamped.version = snapshot_version;
        pool.push(stamped);
    }

    for flow_id in ordered_flows {
        // Owned copies are needed: `process` consumes each change and the
        // pool itself is extended further down.
        let mut inputs: Vec<Change> = Vec::new();
        for candidate in pool.iter() {
            if flow_is_interested_in(candidate, flow_id, flow_engine) {
                inputs.push(candidate.clone());
            }
        }
        if inputs.is_empty() {
            continue;
        }

        let primitive_query: MultiReadTransaction = engine.multi().begin_query()?;
        let state_query: MultiReadTransaction = engine.multi().begin_query()?;
        let interceptors: Interceptors = engine.create_interceptors();

        // Replay the transaction's own writes into the base pending set
        // handed to the flow transaction.
        let mut txn_view = Pending::new();
        for (key, value) in &ctx.transaction_writes {
            if let Some(v) = value {
                txn_view.insert(key.clone(), v.clone());
            } else {
                txn_view.remove(key.clone());
            }
        }

        let mut flow_txn = FlowTransaction::transactional(
            snapshot_version,
            Pending::new(),
            txn_view,
            primitive_query,
            state_query,
            catalog.clone(),
            interceptors,
        );

        for change in inputs {
            flow_engine.process(&mut flow_txn, change, flow_id)?;
        }

        // Each accumulator entry becomes its own change in the pool.
        let produced = flow_txn.take_accumulator_entries();
        for (id, diff) in &produced {
            let emitted = Change {
                origin: ChangeOrigin::Schema(id.clone()),
                version: snapshot_version,
                diffs: vec![diff.clone()],
            };
            pool.push(emitted);
        }
        ctx.view_entries.extend(produced);

        // Hand the flow's writes back to the commit context.
        let staged_writes = flow_txn.take_pending();
        for (key, write) in staged_writes.iter_sorted() {
            let value = match write {
                PendingWrite::Set(v) => Some(v.clone()),
                PendingWrite::Remove => None,
            };
            ctx.pending_writes.push((key.clone(), value));
        }
    }

    Ok(())
}
/// Reports whether `flow_id` is registered for the source that produced
/// `change`.
///
/// Only changes whose origin is `ChangeOrigin::Schema` can match: the source
/// is looked up in `engine.sources` and the result is `true` when any
/// registration for that source carries `flow_id`. Every other origin, and
/// any source with no entry in `engine.sources`, yields `false`.
fn flow_is_interested_in(change: &Change, flow_id: FlowId, engine: &FlowEngine) -> bool {
    match change.origin {
        ChangeOrigin::Schema(source) => {
            let registered = engine.sources.get(&source);
            registered
                .map(|entries| entries.iter().any(|(candidate, _)| *candidate == flow_id))
                .unwrap_or(false)
        }
        _ => false,
    }
}
/// Post-commit interceptor that registers newly created flows.
///
/// Its `PostCommitInterceptor::intercept` implementation calls
/// `try_register_by_id` on `registrar` for each committed flow change whose
/// operation is `OperationType::Create`.
pub struct TransactionalFlowPostCommitInterceptor {
/// Registrar asked to register each newly created flow by id.
pub registrar: TransactionalFlowRegistrar,
}
impl PostCommitInterceptor for TransactionalFlowPostCommitInterceptor {
    /// Registers every flow created by the committed transaction.
    ///
    /// Walks `ctx.changes.flow`, skipping entries that are not
    /// `OperationType::Create` or that carry no `post` state, and asks the
    /// registrar to register the rest by id. Registration is best-effort: a
    /// failure is logged as a warning and does not fail the interceptor, so
    /// this method always returns `Ok(())`.
    fn intercept(&self, ctx: &mut PostCommitContext) -> Result<()> {
        for entry in &ctx.changes.flow {
            if entry.op != OperationType::Create {
                continue;
            }
            let flow = match &entry.post {
                Some(created) => created,
                None => continue,
            };
            match self.registrar.try_register_by_id(flow.id) {
                Ok(_) => {}
                Err(e) => {
                    warn!(
                        flow_id = flow.id.0,
                        error = %e,
                        "failed to register transactional flow on commit"
                    );
                }
            }
        }
        Ok(())
    }
}