use std::sync::Arc;
use reifydb_core::interface::{
catalog::flow::FlowId,
change::{Change, ChangeOrigin},
};
use reifydb_rql::flow::{
flow::FlowDag,
node::{FlowNode, FlowNodeType::SourceInlineData},
};
use reifydb_type::{Result, value::datetime::DateTime};
use tracing::{Span, field, instrument};
use crate::{engine::FlowEngine, transaction::FlowTransaction};
impl FlowEngine {
/// Routes an incoming `change` into the flow identified by `flow_id`.
///
/// The routing depends on `change.origin`:
///
/// * `ChangeOrigin::Shape(source)`: every `(flow, node)` pair registered in `self.sources` for
///   `source` whose flow equals `flow_id` receives the change. The diffs are cloned and
///   re-wrapped with `Change::from_flow` under the receiving node's id, keeping the original
///   `version` and `changed_at`.
/// * `ChangeOrigin::Flow(node_id)`: the change is handed unchanged to `node_id` inside `flow_id`.
///
/// A source without registrations, an unknown flow, or an unknown node is skipped silently.
/// The number of entry nodes that were actually driven is recorded on the current span as
/// `nodes_processed`.
///
/// # Errors
///
/// Returns the first error produced by `process_change`; nodes after the failing one are not
/// processed.
#[instrument(name = "flow::engine::process", level = "debug", skip(self, txn), fields(
    flow_id = ?flow_id,
    origin = ?change.origin,
    version = change.version.0,
    diff_count = change.diffs.len(),
    nodes_processed = field::Empty
))]
pub fn process(&self, txn: &mut FlowTransaction, change: Change, flow_id: FlowId) -> Result<()> {
    let mut nodes_processed = 0;
    match change.origin {
        ChangeOrigin::Shape(source) => {
            // Iterate over a cloned copy of the registrations for this source.
            let node_registrations = self.sources.get(&source).cloned();
            if let Some(node_registrations) = node_registrations {
                for (registered_flow_id, node_id) in node_registrations {
                    // Registrations of other flows for the same source are not this call's
                    // concern.
                    if registered_flow_id != flow_id {
                        continue;
                    }
                    // Clone flow and node out of the lookup before processing.
                    let flow_and_node = self.flows.get(&registered_flow_id).and_then(|flow| {
                        flow.get_node(&node_id).map(|node| (flow.clone(), node.clone()))
                    });
                    if let Some((flow, node)) = flow_and_node {
                        // Each entry node gets its own change, re-addressed to that node.
                        self.process_change(
                            txn,
                            &flow,
                            &node,
                            Arc::new(Change::from_flow(
                                node_id,
                                change.version,
                                change.diffs.clone(),
                                change.changed_at,
                            )),
                        )?;
                        nodes_processed += 1;
                    }
                }
            }
        }
        ChangeOrigin::Flow(node_id) => {
            let flow_and_node = self.flows.get(&flow_id).and_then(|flow| {
                flow.get_node(&node_id).map(|node| (flow.clone(), node.clone()))
            });
            if let Some((flow, node)) = flow_and_node {
                // The change already targets a flow node, so it is forwarded as-is.
                self.process_change(txn, &flow, &node, Arc::new(change))?;
                nodes_processed += 1;
            }
        }
    }
    Span::current().record("nodes_processed", nodes_processed);
    Ok(())
}
/// Runs the operator registered for `node` on `change` and returns the operator's output.
///
/// The shared `change` is turned into an owned value first: if this is the only reference the
/// value is moved out of the `Arc`, otherwise it is cloned.
///
/// Three fields are recorded on the current span: `lock_wait_us` (time spent looking up and
/// cloning the operator handle), `apply_time_us` (time spent inside the operator) and
/// `output_diffs` (number of diffs in the result).
///
/// # Errors
///
/// Propagates any error returned by the operator.
///
/// # Panics
///
/// Panics if `self.operators` has no entry for `node.id`.
#[instrument(name = "flow::engine::apply", level = "trace", skip(self, txn), fields(
    node_id = ?node.id,
    node_type = ?node.ty,
    input_diffs = change.diffs.len(),
    output_diffs = field::Empty,
    lock_wait_us = field::Empty,
    apply_time_us = field::Empty
))]
fn apply(&self, txn: &mut FlowTransaction, node: &FlowNode, change: Arc<Change>) -> Result<Change> {
    // Operator lookup, timed separately from the operator call itself.
    let lookup_started = self.runtime_context.clock.instant();
    let operator = self.operators.get(&node.id).unwrap().clone();
    let lookup_micros = lookup_started.elapsed().as_micros() as u64;
    Span::current().record("lock_wait_us", lookup_micros);

    // Take ownership without copying when nobody else holds the change.
    let input = match Arc::try_unwrap(change) {
        Ok(exclusive) => exclusive,
        Err(shared) => (*shared).clone(),
    };

    let apply_started = self.runtime_context.clock.instant();
    let output = operator.apply(txn, input)?;
    let apply_micros = apply_started.elapsed().as_micros() as u64;
    Span::current().record("apply_time_us", apply_micros);
    Span::current().record("output_diffs", output.diffs.len());

    Ok(output)
}
/// Applies `node` to `change` and recursively pushes the result to every node listed in
/// `node.outputs`.
///
/// The result is shared through an `Arc`: every downstream node except the last receives a
/// clone of the handle, and the last one receives the handle itself, so the final consumer may
/// end up holding the only reference.
///
/// The number of produced diffs is recorded on the current span as `output_diffs`, and the time
/// spent in downstream propagation as `propagation_time_us`.
///
/// # Errors
///
/// Returns the first error from applying this node or from any downstream node; remaining
/// downstream nodes are not visited.
///
/// # Panics
///
/// Panics if `node` is a `SourceInlineData` node (not implemented), or if an id in
/// `node.outputs` cannot be found in `flow`.
#[instrument(name = "flow::engine::process_change", level = "trace", skip(self, txn, flow), fields(
    flow_id = ?flow.id,
    node_id = ?node.id,
    input_diffs = change.diffs.len(),
    output_diffs = field::Empty,
    downstream_count = node.outputs.len(),
    propagation_time_us = field::Empty
))]
fn process_change(
    &self,
    txn: &mut FlowTransaction,
    flow: &FlowDag,
    node: &FlowNode,
    change: Arc<Change>,
) -> Result<()> {
    let downstream = &node.outputs;

    let produced: Arc<Change> = match &node.ty {
        SourceInlineData {} => unimplemented!(),
        _ => {
            let applied = self.apply(txn, node, change)?;
            Span::current().record("output_diffs", applied.diffs.len());
            Arc::new(applied)
        }
    };

    let propagation_start = self.runtime_context.clock.instant();
    // `split_last` yields nothing for a leaf node and an empty `rest` for a single output, so
    // one branch covers the zero, one and many cases.
    if let Some((last, rest)) = downstream.split_last() {
        for output_id in rest {
            let target = flow.get_node(output_id).unwrap();
            self.process_change(txn, flow, target, Arc::clone(&produced))?;
        }
        // The final output takes the handle by move instead of another clone.
        let target = flow.get_node(last).unwrap();
        self.process_change(txn, flow, target, produced)?;
    }
    Span::current().record("propagation_time_us", propagation_start.elapsed().as_micros() as u64);

    Ok(())
}
/// Delivers a tick at `timestamp` to every operator of the flow `flow_id`.
///
/// Nodes are visited in the order returned by `flow.topological_order()`. Nodes without a
/// registered operator are skipped. When an operator's `tick` yields a change, that change is
/// sent to each of the node's outputs through `process_change`; the ticking node itself is not
/// re-applied.
///
/// An unknown `flow_id` is not an error: the call returns `Ok(())` without doing anything.
///
/// # Errors
///
/// Propagates errors from `topological_order`, from an operator's `tick`, or from
/// `process_change`.
///
/// # Panics
///
/// Panics if a node id from the topological order, or an id listed in a node's outputs, cannot
/// be found in the flow.
#[instrument(name = "flow::engine::process_tick", level = "debug", skip(self, txn), fields(
    flow_id = ?flow_id,
    timestamp = %timestamp
))]
pub fn process_tick(&self, txn: &mut FlowTransaction, flow_id: FlowId, timestamp: DateTime) -> Result<()> {
    let flow = match self.flows.get(&flow_id) {
        None => return Ok(()),
        Some(found) => found.clone(),
    };

    for node_id in flow.topological_order()? {
        let operator = match self.operators.get(&node_id) {
            None => continue,
            Some(registered) => registered.clone(),
        };

        // Most ticks produce nothing; only an emitted change needs to be propagated.
        let emitted = match operator.tick(txn, timestamp)? {
            None => continue,
            Some(change) => change,
        };

        let ticking_node = flow.get_node(&node_id).unwrap();
        // Empty outputs: nothing to do. Otherwise all but the last output get a cloned handle
        // and the last output receives the handle by move.
        if let Some((last, rest)) = ticking_node.outputs.split_last() {
            let shared = Arc::new(emitted);
            for output_id in rest {
                let target = flow.get_node(output_id).unwrap();
                self.process_change(txn, &flow, target, Arc::clone(&shared))?;
            }
            let target = flow.get_node(last).unwrap();
            self.process_change(txn, &flow, target, shared)?;
        }
    }

    Ok(())
}
}