use reifydb_core::{
interface::catalog::{
change::CatalogTrackFlowChangeOperations,
flow::{Flow, FlowEdgeId, FlowId, FlowNodeId, FlowStatus},
id::NamespaceId,
},
internal,
};
use reifydb_transaction::{
change::TransactionalFlowChanges,
transaction::{Transaction, admin::AdminTransaction},
};
use reifydb_type::{error, fragment::Fragment, value::duration::Duration};
use tracing::{instrument, warn};
use crate::{
CatalogStore, Result,
catalog::Catalog,
store::{flow::create::FlowToCreate as StoreFlowToCreate, sequence::flow as flow_sequence},
};
/// Catalog-level description of a flow to be created.
///
/// Passed to `Catalog::create_flow` / `Catalog::create_flow_with_id` and
/// converted field-for-field into the store-level `FlowToCreate` through the
/// `From` implementation in this file.
#[derive(Debug, Clone)]
pub struct FlowToCreate {
/// Name of the flow.
pub name: Fragment,
/// Namespace the flow is created in.
pub namespace: NamespaceId,
/// Status the flow is created with.
pub status: FlowStatus,
/// Optional tick duration, forwarded unchanged to the store.
/// NOTE(review): presumably a periodic interval for the flow — confirm
/// against the store / flow runtime.
pub tick: Option<Duration>,
}
impl From<FlowToCreate> for StoreFlowToCreate {
fn from(to_create: FlowToCreate) -> Self {
StoreFlowToCreate {
name: to_create.name,
namespace: to_create.namespace,
status: to_create.status,
tick: to_create.tick,
}
}
}
impl Catalog {
/// Looks up a flow by id, returning `Ok(None)` when it cannot be found.
///
/// The lookup order depends on the transaction variant:
///
/// * `Command`, `Query`, `Replica`: the materialized catalog at the
///   transaction's version, then `CatalogStore::find_flow`.
/// * `Admin`: flow changes tracked on the transaction first (a flow recorded
///   there is returned as a clone, a flow marked deleted yields `None`), then
///   the materialized catalog, then `CatalogStore::find_flow`.
/// * `Test`: flow changes tracked on the inner transaction (same rules as
///   `Admin`), then `CatalogStore::find_flow` directly; the materialized
///   catalog is not consulted.
///
/// Whenever a non-`Test` lookup only succeeds in storage, a warning is logged
/// because the materialized catalog did not contain the flow.
///
/// # Errors
///
/// Propagates any error returned by `CatalogStore::find_flow`.
#[instrument(name = "catalog::flow::find", level = "trace", skip(self, txn))]
pub fn find_flow(&self, txn: &mut Transaction<'_>, id: FlowId) -> Result<Option<Flow>> {
    match txn.reborrow() {
        Transaction::Command(cmd) => {
            let cached = self.materialized.find_flow_at(id, cmd.version());
            if cached.is_some() {
                return Ok(cached);
            }

            // Not in the materialized catalog: fall back to storage.
            let stored = CatalogStore::find_flow(&mut Transaction::Command(&mut *cmd), id)?;
            if stored.is_some() {
                warn!("Flow with ID {:?} found in storage but not in MaterializedCatalog", id);
            }
            Ok(stored)
        }
        Transaction::Admin(admin) => {
            // Changes made inside this transaction take precedence.
            if let Some(pending) = TransactionalFlowChanges::find_flow(admin, id) {
                return Ok(Some(pending.clone()));
            }
            if TransactionalFlowChanges::is_flow_deleted(admin, id) {
                return Ok(None);
            }

            let cached = self.materialized.find_flow_at(id, admin.version());
            if cached.is_some() {
                return Ok(cached);
            }

            let stored = CatalogStore::find_flow(&mut Transaction::Admin(&mut *admin), id)?;
            if stored.is_some() {
                warn!("Flow with ID {:?} found in storage but not in MaterializedCatalog", id);
            }
            Ok(stored)
        }
        Transaction::Query(qry) => {
            let cached = self.materialized.find_flow_at(id, qry.version());
            if cached.is_some() {
                return Ok(cached);
            }

            let stored = CatalogStore::find_flow(&mut Transaction::Query(&mut *qry), id)?;
            if stored.is_some() {
                warn!("Flow with ID {:?} found in storage but not in MaterializedCatalog", id);
            }
            Ok(stored)
        }
        Transaction::Test(mut t) => {
            if let Some(pending) = TransactionalFlowChanges::find_flow(t.inner, id) {
                return Ok(Some(pending.clone()));
            }
            if TransactionalFlowChanges::is_flow_deleted(t.inner, id) {
                return Ok(None);
            }

            // Test transactions go straight to storage, without a warning.
            let stored = CatalogStore::find_flow(&mut Transaction::Test(Box::new(t.reborrow())), id)?;
            Ok(stored)
        }
        Transaction::Replica(rep) => {
            let cached = self.materialized.find_flow_at(id, rep.version());
            if cached.is_some() {
                return Ok(cached);
            }

            let stored = CatalogStore::find_flow(&mut Transaction::Replica(&mut *rep), id)?;
            if stored.is_some() {
                warn!("Flow with ID {:?} found in storage but not in MaterializedCatalog", id);
            }
            Ok(stored)
        }
    }
}
/// Looks up a flow by `name` within `namespace`, returning `Ok(None)` when it
/// cannot be found.
///
/// The lookup order depends on the transaction variant:
///
/// * `Command`, `Query`, `Replica`: the materialized catalog at the
///   transaction's version, then `CatalogStore::find_flow_by_name`.
/// * `Admin`: flow changes tracked on the transaction first (a flow recorded
///   there is returned as a clone, a name marked deleted yields `None`), then
///   the materialized catalog, then `CatalogStore::find_flow_by_name`.
/// * `Test`: flow changes tracked on the inner transaction (same rules as
///   `Admin`), then `CatalogStore::find_flow_by_name` directly; the
///   materialized catalog is not consulted.
///
/// Whenever a non-`Test` lookup only succeeds in storage, a warning is logged
/// because the materialized catalog did not contain the flow.
///
/// # Errors
///
/// Propagates any error returned by `CatalogStore::find_flow_by_name`.
#[instrument(name = "catalog::flow::find_by_name", level = "trace", skip(self, txn, name))]
pub fn find_flow_by_name(
    &self,
    txn: &mut Transaction<'_>,
    namespace: NamespaceId,
    name: &str,
) -> Result<Option<Flow>> {
    match txn.reborrow() {
        Transaction::Command(cmd) => {
            let cached = self.materialized.find_flow_by_name_at(namespace, name, cmd.version());
            if cached.is_some() {
                return Ok(cached);
            }

            // Not in the materialized catalog: fall back to storage.
            let stored =
                CatalogStore::find_flow_by_name(&mut Transaction::Command(&mut *cmd), namespace, name)?;
            if stored.is_some() {
                warn!(
                    "Flow '{}' in namespace {:?} found in storage but not in MaterializedCatalog",
                    name, namespace
                );
            }
            Ok(stored)
        }
        Transaction::Admin(admin) => {
            // Changes made inside this transaction take precedence.
            if let Some(pending) = TransactionalFlowChanges::find_flow_by_name(admin, namespace, name) {
                return Ok(Some(pending.clone()));
            }
            if TransactionalFlowChanges::is_flow_deleted_by_name(admin, namespace, name) {
                return Ok(None);
            }

            let cached = self.materialized.find_flow_by_name_at(namespace, name, admin.version());
            if cached.is_some() {
                return Ok(cached);
            }

            let stored =
                CatalogStore::find_flow_by_name(&mut Transaction::Admin(&mut *admin), namespace, name)?;
            if stored.is_some() {
                warn!(
                    "Flow '{}' in namespace {:?} found in storage but not in MaterializedCatalog",
                    name, namespace
                );
            }
            Ok(stored)
        }
        Transaction::Query(qry) => {
            let cached = self.materialized.find_flow_by_name_at(namespace, name, qry.version());
            if cached.is_some() {
                return Ok(cached);
            }

            let stored =
                CatalogStore::find_flow_by_name(&mut Transaction::Query(&mut *qry), namespace, name)?;
            if stored.is_some() {
                warn!(
                    "Flow '{}' in namespace {:?} found in storage but not in MaterializedCatalog",
                    name, namespace
                );
            }
            Ok(stored)
        }
        Transaction::Test(mut t) => {
            if let Some(pending) = TransactionalFlowChanges::find_flow_by_name(t.inner, namespace, name) {
                return Ok(Some(pending.clone()));
            }
            if TransactionalFlowChanges::is_flow_deleted_by_name(t.inner, namespace, name) {
                return Ok(None);
            }

            // Test transactions go straight to storage, without a warning.
            let stored = CatalogStore::find_flow_by_name(
                &mut Transaction::Test(Box::new(t.reborrow())),
                namespace,
                name,
            )?;
            Ok(stored)
        }
        Transaction::Replica(rep) => {
            let cached = self.materialized.find_flow_by_name_at(namespace, name, rep.version());
            if cached.is_some() {
                return Ok(cached);
            }

            let stored =
                CatalogStore::find_flow_by_name(&mut Transaction::Replica(&mut *rep), namespace, name)?;
            if stored.is_some() {
                warn!(
                    "Flow '{}' in namespace {:?} found in storage but not in MaterializedCatalog",
                    name, namespace
                );
            }
            Ok(stored)
        }
    }
}
/// Returns the flow with the given id, treating absence as an error.
///
/// Uses the same lookup as `find_flow`.
///
/// # Errors
///
/// Returns an internal error when no flow with `id` is found, and propagates
/// any error from `find_flow`.
#[instrument(name = "catalog::flow::get", level = "trace", skip(self, txn))]
pub fn get_flow(&self, txn: &mut Transaction<'_>, id: FlowId) -> Result<Flow> {
    // Built lazily so the error is only constructed when the flow is missing.
    let missing = || {
        error!(internal!(
            "Flow with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
            id
        ))
    };

    self.find_flow(txn, id)?.ok_or_else(missing)
}
/// Creates a flow in the catalog store and records the creation on the
/// transaction.
///
/// `to_create` is converted into the store-level request, handed to
/// `CatalogStore::create_flow`, and a clone of the resulting flow is passed to
/// `track_flow_created` before the flow is returned.
///
/// # Errors
///
/// Propagates errors from the store call and from change tracking.
#[instrument(name = "catalog::flow::create", level = "debug", skip(self, txn, to_create))]
pub fn create_flow(&self, txn: &mut AdminTransaction, to_create: FlowToCreate) -> Result<Flow> {
    let created = CatalogStore::create_flow(txn, to_create.into())?;

    // The tracker takes ownership, so hand it a copy and return the original.
    let tracked = created.clone();
    txn.track_flow_created(tracked)?;

    Ok(created)
}
/// Creates a flow under a caller-supplied `flow_id` and records the creation
/// on the transaction.
///
/// `to_create` is converted into the store-level request, handed to
/// `CatalogStore::create_flow_with_id` together with `flow_id`, and a clone of
/// the resulting flow is passed to `track_flow_created` before the flow is
/// returned.
///
/// # Errors
///
/// Propagates errors from the store call and from change tracking.
#[instrument(name = "catalog::flow::create_with_id", level = "debug", skip(self, txn, to_create))]
pub fn create_flow_with_id(
    &self,
    txn: &mut AdminTransaction,
    flow_id: FlowId,
    to_create: FlowToCreate,
) -> Result<Flow> {
    let created = CatalogStore::create_flow_with_id(txn, flow_id, to_create.into())?;

    // The tracker takes ownership, so hand it a copy and return the original.
    let tracked = created.clone();
    txn.track_flow_created(tracked)?;

    Ok(created)
}
/// Drops `flow` from the catalog store and records the deletion on the
/// transaction.
///
/// The store is asked to drop the flow by its id first; only if that succeeds
/// is the flow handed to `track_flow_deleted`.
///
/// # Errors
///
/// Propagates errors from the store call and from change tracking.
#[instrument(name = "catalog::flow::drop", level = "debug", skip(self, txn))]
pub fn drop_flow(&self, txn: &mut AdminTransaction, flow: Flow) -> Result<()> {
    let flow_id = flow.id;
    CatalogStore::drop_flow(txn, flow_id)?;

    // `flow` is moved into the tracker once the store deletion went through.
    txn.track_flow_deleted(flow)?;
    Ok(())
}
/// Lists flows by delegating to `CatalogStore::list_flows_all`.
///
/// NOTE(review): unlike `find_flow`, nothing in this method consults the
/// materialized catalog or the flow changes tracked on the transaction; whether
/// the store call accounts for those is not visible here — confirm.
///
/// # Errors
///
/// Returns whatever error the store call produces.
#[instrument(name = "catalog::flow::list_all", level = "debug", skip(self, txn))]
pub fn list_flows_all(&self, txn: &mut Transaction<'_>) -> Result<Vec<Flow>> {
CatalogStore::list_flows_all(txn)
}
/// Renames the flow identified by `flow_id` by delegating to
/// `CatalogStore::update_flow_name`.
///
/// NOTE(review): unlike `create_flow` and `drop_flow`, this method does not
/// record anything on the transaction's tracked flow changes — confirm that
/// this is intended.
///
/// # Errors
///
/// Returns whatever error the store call produces.
#[instrument(name = "catalog::flow::update_name", level = "debug", skip(self, txn))]
pub fn update_flow_name(&self, txn: &mut AdminTransaction, flow_id: FlowId, new_name: String) -> Result<()> {
CatalogStore::update_flow_name(txn, flow_id, new_name)
}
/// Sets the status of the flow identified by `flow_id` by delegating to
/// `CatalogStore::update_flow_status`.
///
/// NOTE(review): unlike `create_flow` and `drop_flow`, this method does not
/// record anything on the transaction's tracked flow changes — confirm that
/// this is intended.
///
/// # Errors
///
/// Returns whatever error the store call produces.
#[instrument(name = "catalog::flow::update_status", level = "debug", skip(self, txn))]
pub fn update_flow_status(
&self,
txn: &mut AdminTransaction,
flow_id: FlowId,
status: FlowStatus,
) -> Result<()> {
CatalogStore::update_flow_status(txn, flow_id, status)
}
/// Obtains the next `FlowId` by delegating to `flow_sequence::next_flow_id`
/// within the given admin transaction.
///
/// # Errors
///
/// Returns whatever error the sequence call produces.
#[instrument(name = "catalog::flow::next_id", level = "trace", skip(self, txn))]
pub fn next_flow_id(&self, txn: &mut AdminTransaction) -> Result<FlowId> {
flow_sequence::next_flow_id(txn)
}
/// Obtains the next `FlowNodeId` by delegating to
/// `flow_sequence::next_flow_node_id` within the given admin transaction.
///
/// # Errors
///
/// Returns whatever error the sequence call produces.
#[instrument(name = "catalog::flow::next_node_id", level = "trace", skip(self, txn))]
pub fn next_flow_node_id(&self, txn: &mut AdminTransaction) -> Result<FlowNodeId> {
flow_sequence::next_flow_node_id(txn)
}
/// Obtains the next `FlowEdgeId` by delegating to
/// `flow_sequence::next_flow_edge_id` within the given admin transaction.
///
/// # Errors
///
/// Returns whatever error the sequence call produces.
#[instrument(name = "catalog::flow::next_edge_id", level = "trace", skip(self, txn))]
pub fn next_flow_edge_id(&self, txn: &mut AdminTransaction) -> Result<FlowEdgeId> {
flow_sequence::next_flow_edge_id(txn)
}
}