use std::sync::Arc;
use reifydb_cdc::consume::checkpoint::CdcCheckpoint;
use reifydb_core::{
common::CommitVersion,
interface::flow::{FlowLagRow, FlowLagsProvider},
};
use reifydb_engine::engine::StandardEngine;
use reifydb_transaction::transaction::Transaction;
use reifydb_type::value::identity::IdentityId;
use super::tracker::ShapeVersionTracker;
use crate::catalog::FlowCatalog;
/// Reports, for every registered flow, how far its CDC checkpoint trails the
/// tracked version of each shape.
///
/// The actual computation lives in the `FlowLagsProvider` implementation for
/// this type.
pub struct FlowLags {
/// Supplies the tracked `(shape, version)` pairs via `all()`.
primitive_tracker: Arc<ShapeVersionTracker>,
/// Used to open the query transaction in which flow checkpoints are read.
engine: StandardEngine,
/// Supplies the ids of the registered flows via `get_flow_ids()`.
catalog: FlowCatalog,
}
impl FlowLags {
pub fn new(primitive_tracker: Arc<ShapeVersionTracker>, engine: StandardEngine, catalog: FlowCatalog) -> Self {
Self {
primitive_tracker,
engine,
catalog,
}
}
}
impl FlowLagsProvider for FlowLags {
    /// Returns one [`FlowLagRow`] for every (registered flow, tracked shape) pair.
    ///
    /// For each pair the lag is the tracked shape version minus the flow's CDC
    /// checkpoint version, clamped at zero with `saturating_sub` so a flow whose
    /// checkpoint is ahead of a shape's tracked version reports a lag of `0`
    /// instead of underflowing.
    ///
    /// This is best-effort and never fails:
    /// * if a query transaction cannot be started, an empty vector is returned;
    /// * if a flow's checkpoint fetch does not yield a version, the flow is
    ///   treated as being at version `0`.
    fn all_lags(&self) -> Vec<FlowLagRow> {
        let primitive_versions = self.primitive_tracker.all();

        let mut txn = match self.engine.begin_query(IdentityId::system()) {
            Ok(txn) => txn,
            // Without a transaction no checkpoint can be read; report nothing.
            Err(_) => return Vec::new(),
        };

        let mut rows = Vec::new();
        let registered = self.catalog.get_flow_ids();

        for flow_id in &registered {
            // The checkpoint is read once per flow and reused for every shape.
            let flow_version = CdcCheckpoint::fetch(&mut Transaction::Query(&mut txn), flow_id)
                .unwrap_or(CommitVersion(0))
                .0;

            for (shape_id, version) in &primitive_versions {
                let lag = version.0.saturating_sub(flow_version);
                rows.push(FlowLagRow {
                    flow_id: *flow_id,
                    shape_id: *shape_id,
                    lag,
                });
            }
        }

        rows
    }
}