reifydb-sub-flow 0.4.5

Flow subsystem for stream processing and data flows
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

//! Provides flow lag data for the system.flow_lags virtual table.

use std::sync::Arc;

use reifydb_cdc::consume::checkpoint::CdcCheckpoint;
use reifydb_core::{
	common::CommitVersion,
	interface::flow::{FlowLagRow, FlowLagsProvider},
};
use reifydb_engine::engine::StandardEngine;
use reifydb_transaction::transaction::Transaction;
use reifydb_type::value::identity::IdentityId;

use super::tracker::SchemaVersionTracker;
use crate::catalog::FlowCatalog;

/// Provides flow lag data for virtual table queries.
///
/// Each flow's progress is tracked individually via per-flow CDC checkpoints.
/// This enables accurate per-flow lag reporting and supports exactly-once
/// processing semantics during backfill restarts.
pub struct FlowLags {
	/// Source of the latest tracked version for each source primitive; its
	/// `all()` snapshot supplies the "current" side of every lag calculation.
	primitive_tracker: Arc<SchemaVersionTracker>,
	/// Engine used to open the system-identity query transaction through which
	/// per-flow CDC checkpoints are read.
	engine: StandardEngine,
	/// Catalog that supplies the IDs of the flows to report lag for.
	catalog: FlowCatalog,
}

impl FlowLags {
	/// Create a new flow lags provider.
	pub fn new(primitive_tracker: Arc<SchemaVersionTracker>, engine: StandardEngine, catalog: FlowCatalog) -> Self {
		Self {
			primitive_tracker,
			engine,
			catalog,
		}
	}
}

impl FlowLagsProvider for FlowLags {
	/// Collects lag rows for every registered flow against every tracked primitive.
	///
	/// The result holds one [`FlowLagRow`] per (flow, primitive) pair. A row's lag
	/// is the primitive's latest tracked version minus the flow's CDC checkpoint
	/// version, clamped at zero.
	///
	/// This is best-effort:
	/// * If a query transaction cannot be opened, an empty list is returned.
	/// * If no checkpoint version can be obtained for a flow, that flow is treated
	///   as being at version 0.
	fn all_lags(&self) -> Vec<FlowLagRow> {
		// Snapshot the latest known version of each source primitive up front.
		let primitive_versions = self.primitive_tracker.all();

		let Ok(mut query_txn) = self.engine.begin_query(IdentityId::system()) else {
			return Vec::new();
		};

		let flow_ids = self.catalog.get_flow_ids();
		let mut lag_rows = Vec::new();

		for flow_id in &flow_ids {
			// When `fetch` yields no version, fall back to 0. The flow is then
			// reported as lagging by each primitive's full version.
			let processed = CdcCheckpoint::fetch(&mut Transaction::Query(&mut query_txn), flow_id)
				.unwrap_or(CommitVersion(0))
				.0;

			for (object_id, latest) in &primitive_versions {
				lag_rows.push(FlowLagRow {
					flow_id: *flow_id,
					object_id: *object_id,
					// Saturate so a checkpoint ahead of the tracker reports 0 rather than wrapping.
					lag: latest.0.saturating_sub(processed),
				});
			}
		}

		lag_rows
	}
}